Browse Source

Abstract to syn_gen_scope.

Roberto Ostinelli 3 years ago
parent
commit
7ba1ef57c1
5 changed files with 361 additions and 212 deletions
  1. 9 0
      src/syn.hrl
  2. 20 23
      src/syn_backbone.erl
  3. 298 0
      src/syn_gen_scope.erl
  4. 0 3
      src/syn_groups.erl
  5. 34 186
      src/syn_registry.erl

+ 9 - 0
src/syn.hrl

@@ -56,3 +56,12 @@
     Pid :: pid(),
     Meta :: any()
 }.
+
+%% records
+-record(state, {
+    handler = undefined :: undefined | module(),
+    handler_state = undefined :: any(),
+    scope = default :: atom(),
+    process_name = syn_registry_default :: atom(),
+    nodes = #{} :: #{node() => pid()}
+}).

+ 20 - 23
src/syn_backbone.erl

@@ -34,9 +34,6 @@
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
-%% records
--record(state, {}).
-
 %% includes
 -include("syn.hrl").
 
@@ -66,24 +63,24 @@ get_table_name(TableName, Scope) ->
 %% Init
 %% ----------------------------------------------------------------------------------------------------------
 -spec init([]) ->
-    {ok, #state{}} |
-    {ok, #state{}, Timeout :: non_neg_integer()} |
+    {ok, State :: map()} |
+    {ok, State :: map(), Timeout :: non_neg_integer()} |
     ignore |
     {stop, Reason :: any()}.
 init([]) ->
     %% init
-    {ok, #state{}}.
+    {ok, #{}}.
 
 %% ----------------------------------------------------------------------------------------------------------
 %% Call messages
 %% ----------------------------------------------------------------------------------------------------------
--spec handle_call(Request :: any(), From :: any(), #state{}) ->
-    {reply, Reply :: any(), #state{}} |
-    {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
-    {noreply, #state{}} |
-    {noreply, #state{}, Timeout :: non_neg_integer()} |
-    {stop, Reason :: any(), Reply :: any(), #state{}} |
-    {stop, Reason :: any(), #state{}}.
+-spec handle_call(Request :: any(), From :: any(), State :: map()) ->
+    {reply, Reply :: any(), State :: map()} |
+    {reply, Reply :: any(), State :: map(), Timeout :: non_neg_integer()} |
+    {noreply, State :: map()} |
+    {noreply, State :: map(), Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), Reply :: any(), State :: map()} |
+    {stop, Reason :: any(), State :: map()}.
 handle_call({create_tables_for_scope, Scope}, _From, State) ->
     error_logger:warning_msg("SYN[~p] Creating tables for scope: ~p", [node(), Scope]),
     ensure_table_exists(get_table_name(syn_registry_by_name, Scope)),
@@ -99,10 +96,10 @@ handle_call(Request, From, State) ->
 %% ----------------------------------------------------------------------------------------------------------
 %% Cast messages
 %% ----------------------------------------------------------------------------------------------------------
--spec handle_cast(Msg :: any(), #state{}) ->
-    {noreply, #state{}} |
-    {noreply, #state{}, Timeout :: non_neg_integer()} |
-    {stop, Reason :: any(), #state{}}.
+-spec handle_cast(Msg :: any(), State :: map()) ->
+    {noreply, State :: map()} |
+    {noreply, State :: map(), Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), State :: map()}.
 handle_cast(Msg, State) ->
     error_logger:warning_msg("SYN[~p] Received an unknown cast message: ~p", [node(), Msg]),
     {noreply, State}.
@@ -110,10 +107,10 @@ handle_cast(Msg, State) ->
 %% ----------------------------------------------------------------------------------------------------------
 %% All non Call / Cast messages
 %% ----------------------------------------------------------------------------------------------------------
--spec handle_info(Info :: any(), #state{}) ->
-    {noreply, #state{}} |
-    {noreply, #state{}, Timeout :: non_neg_integer()} |
-    {stop, Reason :: any(), #state{}}.
+-spec handle_info(Info :: any(), State :: map()) ->
+    {noreply, State :: map()} |
+    {noreply, State :: map(), Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), State :: map()}.
 handle_info(Info, State) ->
     error_logger:warning_msg("SYN[~p] Received an unknown info message: ~p", [node(), Info]),
     {noreply, State}.
@@ -121,7 +118,7 @@ handle_info(Info, State) ->
 %% ----------------------------------------------------------------------------------------------------------
 %% Terminate
 %% ----------------------------------------------------------------------------------------------------------
--spec terminate(Reason :: any(), #state{}) -> terminated.
+-spec terminate(Reason :: any(), State :: map()) -> terminated.
 terminate(Reason, _State) ->
     error_logger:info_msg("SYN[~p] Terminating with reason: ~p", [node(), Reason]),
     %% return
@@ -130,7 +127,7 @@ terminate(Reason, _State) ->
 %% ----------------------------------------------------------------------------------------------------------
 %% Convert process state when code is changed.
 %% ----------------------------------------------------------------------------------------------------------
--spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
+-spec code_change(OldVsn :: any(), State :: map(), Extra :: any()) -> {ok, State :: map()}.
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 

+ 298 - 0
src/syn_gen_scope.erl

@@ -0,0 +1,298 @@
+%% ==========================================================================================================
+%% Syn - A global Process Registry and Process Group manager.
+%%
+%% The MIT License (MIT)
+%%
+%% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THxE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+%% ==========================================================================================================
+-module(syn_gen_scope).
+-behaviour(gen_server).
+
+%% API
+-export([
+    start_link/3,
+    get_subcluster_nodes/2,
+    call/3, call/4
+]).
+-export([
+    broadcast/2, broadcast/3,
+    broadcast_all_cluster/2,
+    send_to_node/3
+]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    handle_continue/2,
+    terminate/2,
+    code_change/3
+]).
+
+%% callbacks
+-callback init(Args :: term()) ->
+    {ok, State :: term()}.
+-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+    State :: term()) ->
+    {reply, Reply :: term(), NewState :: term()} |
+    {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} |
+    {noreply, NewState :: term()} |
+    {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
+    {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
+    {stop, Reason :: term(), NewState :: term()}.
+-callback handle_info(Info :: timeout | term(), State :: term()) ->
+    {noreply, NewState :: term()} |
+    {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
+    {stop, Reason :: term(), NewState :: term()}.
+-callback save_remote_data(RemoteScopePid :: pid(), RemoteData :: any(), State :: term()) -> any().
+-callback get_local_data(State :: term()) -> {ok, Data :: any()} | undefined.
+-callback purge_local_data_for_node(Node :: node(), State :: term()) -> any().
+
+%% includes
+-include("syn.hrl").
+
+%% ===================================================================
+%% API
+%% ===================================================================
+-spec start_link(Handler :: module(), Scope :: atom(), Args :: [any()]) ->
+    {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
+start_link(Handler, Scope, Args) when is_atom(Scope) ->
+    ProcessName = get_process_name_for_scope(Handler, Scope),
+    gen_server:start_link({local, ProcessName}, ?MODULE, [Handler, Scope, ProcessName, Args], []).
+
+-spec get_subcluster_nodes(Handler :: module(), Scope :: atom()) -> [node()].
+get_subcluster_nodes(Handler, Scope) ->
+    ProcessName = get_process_name_for_scope(Handler, Scope),
+    gen_server:call(ProcessName, get_subcluster_nodes).
+
+-spec call(Handler :: module(), Scope :: atom(), Message :: any()) -> Response :: any().
+call(Handler, Scope, Message) ->
+    call(Handler, node(), Scope, Message).
+
+-spec call(Handler :: module(), Node :: atom(), Scope :: atom(), Message :: any()) -> Response :: any().
+call(Handler,Node, Scope, Message) ->
+    ProcessName = get_process_name_for_scope(Handler, Scope),
+    gen_server:call({ProcessName, Node}, Message).
+
+%% ===================================================================
+%% In-Process API
+%% ===================================================================
+-spec broadcast(Message :: any(), #state{}) -> any().
+broadcast(Message, State) ->
+    broadcast(Message, [], State).
+
+-spec broadcast(Message :: any(), ExcludedNodes :: [node()], #state{}) -> any().
+broadcast(Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}) ->
+    lists:foreach(fun(RemoteNode) ->
+        erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
+    end, maps:keys(Nodes) -- ExcludedNodes).
+
+-spec broadcast_all_cluster(Message :: any(), #state{}) -> any().
+broadcast_all_cluster(Message, #state{process_name = ProcessName}) ->
+    lists:foreach(fun(RemoteNode) ->
+        erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
+    end, nodes()).
+
+-spec send_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
+send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
+    erlang:send({ProcessName, RemoteNode}, Message, [noconnect]).
+
+%% ===================================================================
+%% Callbacks
+%% ===================================================================
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Init
+%% ----------------------------------------------------------------------------------------------------------
+-spec init([term()]) ->
+    {ok, #state{}} |
+    {ok, #state{}, Timeout :: non_neg_integer()} |
+    ignore |
+    {stop, Reason :: any()} |
+    {continue, any()}.
+init([Handler, Scope, ProcessName, Args]) ->
+    %% call init
+    {ok, HandlerState} = Handler:init(Args),
+    %% monitor nodes
+    ok = net_kernel:monitor_nodes(true),
+    %% build state
+    State = #state{
+        handler = Handler,
+        handler_state = HandlerState,
+        scope = Scope,
+        process_name = ProcessName
+    },
+    {ok, State, {continue, after_init}}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Call messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+    State :: term()) ->
+    {reply, Reply :: term(), NewState :: term()} |
+    {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} |
+    {noreply, NewState :: term()} |
+    {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
+    {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
+    {stop, Reason :: term(), NewState :: term()}.
+handle_call(get_subcluster_nodes, _From, #state{
+    nodes = Nodes
+} = State) ->
+    {reply, Nodes, State};
+
+handle_call(Request, From, #state{handler = Handler} = State) ->
+    Handler:handle_call(Request, From, State).
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Cast messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_cast(Request :: term(), State :: term()) ->
+    {noreply, NewState :: term()} |
+    {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
+    {stop, Reason :: term(), NewState :: term()}.
+handle_cast(Msg, #state{handler = Handler} = State) ->
+    Handler:handle_cast(Msg, State).
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Info messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_info(Info :: timeout | term(), State :: term()) ->
+    {noreply, NewState :: term()} |
+    {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
+    {stop, Reason :: term(), NewState :: term()}.
+handle_info({'3.0', discover, RemoteScopePid}, #state{
+    handler = Handler,
+    scope = Scope,
+    nodes = Nodes
+} = State) ->
+    RemoteScopeNode = node(RemoteScopePid),
+    error_logger:info_msg("SYN[~s] Received DISCOVER request from node '~s' and scope '~s'",
+        [node(), RemoteScopeNode, Scope]
+    ),
+    %% send local data to remote
+    {ok, LocalData} = Handler:get_local_data(State),
+    send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
+    %% is this a new node?
+    case maps:is_key(RemoteScopeNode, Nodes) of
+        true ->
+            %% already known, ignore
+            {noreply, State};
+
+        false ->
+            %% monitor
+            _MRef = monitor(process, RemoteScopePid),
+            {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
+    end;
+
+handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
+    handler = Handler,
+    nodes = Nodes,
+    scope = Scope
+} = State) ->
+    RemoteScopeNode = node(RemoteScopePid),
+    error_logger:info_msg("SYN[~s] Received ACK SYNC from node '~s' and scope '~s'",
+        [node(), RemoteScopeNode, Scope]
+    ),
+    %% save remote data
+    Handler:save_remote_data(RemoteScopePid, Data, State),
+    %% is this a new node?
+    case maps:is_key(RemoteScopeNode, Nodes) of
+        true ->
+            %% already known
+            {noreply, State};
+
+        false ->
+            %% monitor
+            _MRef = monitor(process, RemoteScopePid),
+            %% send local to remote
+            {ok, LocalData} = Handler:get_local_data(State),
+            send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
+            %% return
+            {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
+    end;
+
+handle_info({'DOWN', MRef, process, Pid, Reason}, #state{
+    handler = Handler,
+    scope = Scope,
+    nodes = Nodes
+} = State) when node(Pid) =/= node() ->
+    %% scope process down
+    RemoteNode = node(Pid),
+    case maps:take(RemoteNode, Nodes) of
+        {Pid, Nodes1} ->
+            error_logger:info_msg("SYN[~s] Scope Process '~s' is DOWN on node '~s': ~p",
+                [node(), Scope, RemoteNode, Reason]
+            ),
+            Handler:purge_local_data_for_node(RemoteNode, State),
+            {noreply, State#state{nodes = Nodes1}};
+
+        error ->
+            %% relay to handler
+            Handler:handle_info({'DOWN', MRef, process, Pid, Reason}, State)
+    end;
+
+handle_info({nodedown, _Node}, State) ->
+    %% ignore & wait for monitor DOWN message
+    {noreply, State};
+
+handle_info({nodeup, RemoteNode}, #state{scope = Scope} = State) ->
+    error_logger:info_msg("SYN[~s] Node '~s' has joined the cluster, sending discover message for scope '~s'",
+        [node(), RemoteNode, Scope]
+    ),
+    send_to_node(RemoteNode, {'3.0', discover, self()}, State),
+    {noreply, State};
+
+handle_info(Info, #state{handler = Handler} = State) ->
+    Handler:handle_info(Info, State).
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Continue messages
+%% ----------------------------------------------------------------------------------------------------------
+handle_continue(after_init, #state{scope = Scope} = State) ->
+    error_logger:info_msg("SYN[~s] Discovering the cluster with scope '~s'", [node(), Scope]),
+    broadcast_all_cluster({'3.0', discover, self()}, State),
+    {noreply, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Terminate
+%% ----------------------------------------------------------------------------------------------------------
+-spec terminate(Reason :: any(), #state{}) -> terminated.
+terminate(Reason, _State) ->
+    error_logger:info_msg("SYN[~s] Terminating with reason: ~p", [node(), Reason]),
+    terminated.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Convert process state when code is changed.
+%% ----------------------------------------------------------------------------------------------------------
+-spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+-spec get_process_name_for_scope(Handler :: module(), Scope :: atom()) -> atom().
+get_process_name_for_scope(Handler, Scope) ->
+    ModuleBin = atom_to_binary(Handler),
+    ScopeBin = atom_to_binary(Scope),
+    binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).

+ 0 - 3
src/syn_groups.erl

@@ -32,9 +32,6 @@
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
-%% records
--record(state, {}).
-
 %% includes
 -include("syn.hrl").
 

+ 34 - 186
src/syn_registry.erl

@@ -24,7 +24,7 @@
 %% THE SOFTWARE.
 %% ==========================================================================================================
 -module(syn_registry).
--behaviour(gen_server).
+-behaviour(syn_gen_scope).
 
 %% API
 -export([start_link/1]).
@@ -34,15 +34,14 @@
 -export([unregister/1, unregister/2]).
 -export([count/1, count/2]).
 
-%% gen_server callbacks
+%% syn_gen_scope callbacks
 -export([
     init/1,
     handle_call/3,
-    handle_cast/2,
     handle_info/2,
-    handle_continue/2,
-    terminate/2,
-    code_change/3
+    save_remote_data/3,
+    get_local_data/1,
+    purge_local_data_for_node/2
 ]).
 
 %% tests
@@ -50,13 +49,6 @@
 -export([add_to_local_table/6, remove_from_local_table/3]).
 -endif.
 
-%% records
--record(state, {
-    scope = default :: atom(),
-    process_name = syn_registry_default :: atom(),
-    nodes = #{} :: #{node() => pid()}
-}).
-
 %% includes
 -include("syn.hrl").
 
@@ -66,14 +58,11 @@
 -spec start_link(Scope :: atom()) ->
     {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
 start_link(Scope) when is_atom(Scope) ->
-    ProcessName = get_process_name_for_scope(Scope),
-    Args = [Scope, ProcessName],
-    gen_server:start_link({local, ProcessName}, ?MODULE, Args, []).
+    syn_gen_scope:start_link(?MODULE, Scope, [Scope]).
 
--spec get_subcluster_nodes(Scope :: atom()) -> [node()].
-get_subcluster_nodes(Scope) ->
-    ProcessName = get_process_name_for_scope(Scope),
-    gen_server:call(ProcessName, get_subcluster_nodes).
+-spec get_subcluster_nodes(#state{}) -> [node()].
+get_subcluster_nodes(State) ->
+    syn_gen_scope:get_subcluster_nodes(?MODULE, State).
 
 -spec lookup(Name :: any()) -> {pid(), Meta :: any()} | undefined.
 lookup(Name) ->
@@ -101,9 +90,8 @@ register(Scope, Name, Pid) when is_pid(Pid) ->
 
 -spec register(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
 register(Scope, Name, Pid, Meta) ->
-    ProcessName = get_process_name_for_scope(Scope),
     Node = node(Pid),
-    try gen_server:call({ProcessName, Node}, {register_on_owner, node(), Name, Pid, Meta}) of
+    try syn_gen_scope:call(?MODULE, Node, Scope, {register_on_owner, node(), Name, Pid, Meta}) of
         {ok, {TablePid, TableMeta, Time}} when Node =/= node() ->
             %% update table on caller node immediately so that subsequent calls have an updated registry
             add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
@@ -130,9 +118,8 @@ unregister(Scope, Name) ->
             {error, undefined};
 
         {{Name, Pid}, Meta, _, _, _} ->
-            ProcessName = get_process_name_for_scope(Scope),
             Node = node(Pid),
-            case gen_server:call({ProcessName, Node}, {unregister_on_owner, node(), Name, Pid}) of
+            case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_owner, node(), Name, Pid}) of
                 ok when Node =/= node() ->
                     %% remove table on caller node immediately so that subsequent calls have an updated registry
                     remove_from_local_table(Scope, Name, Pid),
@@ -174,21 +161,13 @@ count(Scope, Node) ->
 %% ----------------------------------------------------------------------------------------------------------
 %% Init
 %% ----------------------------------------------------------------------------------------------------------
--spec init(Args :: term()) ->
-    {ok, State :: term()} | {ok, State :: term(), timeout() | hibernate | {continue, term()}} |
-    {stop, Reason :: term()} | ignore.
-init([Scope, ProcessName]) ->
-    %% monitor nodes
-    ok = net_kernel:monitor_nodes(true),
-    %% rebuild monitors (if after crash)
+-spec init(Args :: term()) -> {ok, State :: term()}.
+init([Scope]) ->
+    HandlerState = #{},
+    %% rebuild
     rebuild_monitors(Scope),
-    %% build state
-    State = #state{
-        scope = Scope,
-        process_name = ProcessName
-    },
     %% init
-    {ok, State, {continue, after_init}}.
+    {ok, HandlerState}.
 
 %% ----------------------------------------------------------------------------------------------------------
 %% Call messages
@@ -201,11 +180,6 @@ init([Scope, ProcessName]) ->
     {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
     {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
     {stop, Reason :: term(), NewState :: term()}.
-handle_call(get_subcluster_nodes, _From, #state{
-    nodes = Nodes
-} = State) ->
-    {reply, Nodes, State};
-
 handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
     scope = Scope
 } = State) ->
@@ -224,7 +198,7 @@ handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
                     %% callback
                     syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta}),
                     %% broadcast
-                    broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, [RequesterNode], State),
+                    syn_gen_scope:broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, [RequesterNode], State),
                     %% return
                     {reply, {ok, {undefined, undefined, Time}}, State};
 
@@ -235,7 +209,7 @@ handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
                     %% callback
                     syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta}),
                     %% broadcast
-                    broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
+                    syn_gen_scope:broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
                     %% return
                     {reply, {ok, {Pid, TableMeta, Time}}, State};
 
@@ -257,7 +231,7 @@ handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{scope
             %% callback
             syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
             %% broadcast
-            broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
+            syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
             %% return
             {reply, ok, State};
 
@@ -274,17 +248,6 @@ handle_call(Request, From, State) ->
     {reply, undefined, State}.
 
 %% ----------------------------------------------------------------------------------------------------------
-%% Cast messages
-%% ----------------------------------------------------------------------------------------------------------
--spec handle_cast(Request :: term(), State :: term()) ->
-    {noreply, NewState :: term()} |
-    {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
-    {stop, Reason :: term(), NewState :: term()}.
-handle_cast(Msg, State) ->
-    error_logger:warning_msg("SYN[~s] Received an unknown cast message: ~p", [node(), Msg]),
-    {noreply, State}.
-
-%% ----------------------------------------------------------------------------------------------------------
 %% Info messages
 %% ----------------------------------------------------------------------------------------------------------
 -spec handle_info(Info :: timeout | term(), State :: term()) ->
@@ -301,73 +264,6 @@ handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{scope = Scope} = S
     syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
     %% return
     {noreply, State};
-
-handle_info({'3.0', discover, RemoteScopePid}, #state{
-    scope = Scope,
-    nodes = Nodes
-} = State) ->
-    RemoteScopeNode = node(RemoteScopePid),
-    error_logger:info_msg("SYN[~s] Received DISCOVER request from node '~s' and scope '~s'", [node(), RemoteScopeNode, Scope]),
-    %% send data
-    RegistryTuplesOfLocalNode = get_registry_tuples_for_node(Scope, node()),
-    send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
-    %% is this a new node?
-    case maps:is_key(RemoteScopeNode, Nodes) of
-        true ->
-            %% already known, ignore
-            {noreply, State};
-
-        false ->
-            %% monitor
-            _MRef = monitor(process, RemoteScopePid),
-            {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
-    end;
-
-handle_info({'3.0', ack_sync, RemoteScopePid, RegistryTuplesOfRemoteNode}, #state{
-    scope = Scope,
-    nodes = Nodes
-} = State) ->
-    RemoteScopeNode = node(RemoteScopePid),
-    error_logger:info_msg("SYN[~s] Received ACK SYNC (~w entries) from node '~s' and scope '~s'",
-        [node(), length(RegistryTuplesOfRemoteNode), RemoteScopeNode, Scope]
-    ),
-    %% insert tuples
-    lists:foreach(fun({Name, Pid, Meta, Time}) ->
-        handle_registry_sync(Scope, Name, Pid, Meta, Time, State)
-    end, RegistryTuplesOfRemoteNode),
-    %% is this a new node?
-    case maps:is_key(RemoteScopeNode, Nodes) of
-        true ->
-            %% already known, ignore
-            {noreply, State};
-
-        false ->
-            %% monitor
-            _MRef = monitor(process, RemoteScopePid),
-            %% send data
-            RegistryTuplesOfLocalNode = get_registry_tuples_for_node(Scope, node()),
-            send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
-            %% return
-            {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
-    end;
-
-handle_info({'DOWN', _MRef, process, Pid, _Reason}, #state{
-    scope = Scope,
-    nodes = Nodes
-} = State) when node(Pid) =/= node() ->
-    %% scope process down
-    RemoteNode = node(Pid),
-    case maps:take(RemoteNode, Nodes) of
-        {Pid, Nodes1} ->
-            error_logger:info_msg("SYN[~s] Scope Process ~p is DOWN on node '~s'", [node(), Scope, RemoteNode]),
-            purge_registry_for_remote_node(Scope, RemoteNode),
-            {noreply, State#state{nodes = Nodes1}};
-
-        error ->
-            error_logger:warning_msg("SYN[~s] Received DOWN message from unknown pid: ~p", [node(), Pid]),
-            {noreply, State}
-    end;
-
 handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State) ->
     case find_registry_entries_by_pid(Scope, Pid) of
         [] ->
@@ -383,65 +279,37 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State
                 %% callback
                 syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
                 %% broadcast
-                broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State)
+                syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State)
             end, Entries)
     end,
     %% return
     {noreply, State};
 
-handle_info({nodedown, _Node}, State) ->
-    %% ignore & wait for monitor DOWN message
-    {noreply, State};
-
-handle_info({nodeup, RemoteNode}, #state{scope = Scope} = State) ->
-    error_logger:info_msg("SYN[~s] Node '~s' has joined the cluster, sending discover message for scope '~s'",
-        [node(), RemoteNode, Scope]
-    ),
-    send_to_node(RemoteNode, {'3.0', discover, self()}, State),
-    {noreply, State};
-
 handle_info(Info, State) ->
     error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [node(), Info]),
     {noreply, State}.
 
 %% ----------------------------------------------------------------------------------------------------------
-%% Continue messages
+%% Data
 %% ----------------------------------------------------------------------------------------------------------
--spec handle_continue(Info :: term(), State :: term()) ->
-    {noreply, NewState :: term()} |
-    {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
-    {stop, Reason :: term(), NewState :: term()}.
-handle_continue(after_init, #state{scope = Scope} = State) ->
-    error_logger:info_msg("SYN[~s] Discovering the cluster with scope '~s'", [node(), Scope]),
-    broadcast_all({'3.0', discover, self()}, State),
-    {noreply, State}.
+-spec get_local_data(State :: term()) -> {ok, Data :: any()} | undefined.
+get_local_data(#state{scope = Scope}) ->
+    {ok, get_registry_tuples_for_node(Scope, node())}.
 
-%% ----------------------------------------------------------------------------------------------------------
-%% Terminate
-%% ----------------------------------------------------------------------------------------------------------
--spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: term()) -> term().
-terminate(Reason, _State) ->
-    error_logger:info_msg("SYN[~s] Terminating with reason: ~p", [node(), Reason]),
-    terminated.
+-spec save_remote_data(RemoteScopePid :: pid(), RemoteData :: any(), State :: term()) -> any().
+save_remote_data(RemoteScopePid, RegistryTuplesOfRemoteNode, #state{scope = Scope} = State) ->
+    %% insert tuples
+    lists:foreach(fun({Name, Pid, Meta, Time}) ->
+        handle_registry_sync(Scope, Name, Pid, Meta, Time, State)
+    end, RegistryTuplesOfRemoteNode).
 
-%% ----------------------------------------------------------------------------------------------------------
-%% Convert process state when code is changed.
-%% ----------------------------------------------------------------------------------------------------------
--spec code_change(OldVsn :: (term() | {down, term()}), State :: term(),
-    Extra :: term()) ->
-    {ok, NewState :: term()} | {error, Reason :: term()}.
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
+-spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
+purge_local_data_for_node(Node, #state{scope = Scope}) ->
+    purge_registry_for_remote_node(Scope, Node).
 
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec get_process_name_for_scope(Scope :: atom()) -> atom().
-get_process_name_for_scope(Scope) ->
-    ModuleBin = atom_to_binary(?MODULE),
-    ScopeBin = atom_to_binary(Scope),
-    binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).
-
 -spec rebuild_monitors(Scope :: atom()) -> ok.
 rebuild_monitors(Scope) ->
     RegistryTuples = get_registry_tuples_for_node(Scope, node()),
@@ -457,26 +325,6 @@ rebuild_monitors(Scope) ->
         end
     end, RegistryTuples).
 
--spec broadcast(Message :: any(), #state{}) -> any().
-broadcast(Message, State) ->
-    broadcast(Message, [], State).
-
--spec broadcast(Message :: any(), ExcludedNodes :: [node()], #state{}) -> any().
-broadcast(Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}) ->
-    lists:foreach(fun(RemoteNode) ->
-        erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
-    end, maps:keys(Nodes) -- ExcludedNodes).
-
--spec broadcast_all(Message :: any(), #state{}) -> any().
-broadcast_all(Message, #state{process_name = ProcessName}) ->
-    lists:foreach(fun(RemoteNode) ->
-        erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
-    end, nodes()).
-
--spec send_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
-send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
-    erlang:send({ProcessName, RemoteNode}, Message, [noconnect]).
-
 -spec get_registry_tuples_for_node(Scope :: atom(), Node :: node()) -> [syn_registry_tuple()].
 get_registry_tuples_for_node(Scope, Node) ->
     ets:select(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
@@ -661,7 +509,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
             ResolveTime = erlang:system_time(),
             add_to_local_table(Scope, Name, TablePid, TableMeta, ResolveTime, TableMRef),
             %% broadcast
-            broadcast({'3.0', sync_register, Scope, Name, TablePid, TableMeta, ResolveTime}, State),
+            syn_gen_scope:broadcast({'3.0', sync_register, Scope, Name, TablePid, TableMeta, ResolveTime}, State),
             error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> chosen: ~p",
                 [node(), Name, Scope, Pid, TablePid, TablePid]
             );