Browse Source

Add scope registration, unregistration & monitoring.

No errors treatment yet.
Roberto Ostinelli 3 years ago
parent
commit
4a47a22f78
7 changed files with 926 additions and 17 deletions
  1. 20 0
      src/syn.erl
  2. 4 0
      src/syn.hrl
  3. 10 9
      src/syn_backbone.erl
  4. 62 0
      src/syn_event_handler.erl
  5. 421 8
      src/syn_registry.erl
  6. 286 0
      test/syn_registry_SUITE.erl
  7. 123 0
      test/syn_test_suite_helper.erl

+ 20 - 0
src/syn.erl

@@ -27,6 +27,9 @@
 
 
 %% API
 %% API
 -export([start/0, stop/0]).
 -export([start/0, stop/0]).
+-export([register/2, register/3]).
+-export([lookup/1]).
+-export([unregister/1]).
 
 
 %% ===================================================================
 %% ===================================================================
 %% API
 %% API
@@ -39,3 +42,20 @@ start() ->
 -spec stop() -> ok | {error, Reason :: any()}.
 -spec stop() -> ok | {error, Reason :: any()}.
 stop() ->
 stop() ->
     application:stop(syn).
     application:stop(syn).
+
+%% ----- \/ registry -------------------------------------------------
+-spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
+register(Name, Pid) ->
+    syn_registry:register(Name, Pid).
+
+-spec register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
+register(Name, Pid, Meta) ->
+    syn_registry:register(Name, Pid, Meta).
+
+-spec lookup(Name :: any()) -> {pid(), Meta :: term()} | undefined.
+lookup(Name) ->
+    syn_registry:lookup(Name).
+
+-spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
+unregister(Name) ->
+    syn_registry:unregister(Name).

+ 4 - 0
src/syn.hrl

@@ -24,6 +24,10 @@
 %% THE SOFTWARE.
 %% THE SOFTWARE.
 %% ==========================================================================================================
 %% ==========================================================================================================
 %% types
 %% types
+-type syn_cluster_api_version() :: {
+    ApiCall :: atom(),
+    Version :: atom()
+}.
 -type syn_registry_entry() :: {
 -type syn_registry_entry() :: {
     {
     {
         Name :: any(),
         Name :: any(),

+ 10 - 9
src/syn_backbone.erl

@@ -59,15 +59,16 @@ start_link() ->
     ignore |
     ignore |
     {stop, Reason :: any()}.
     {stop, Reason :: any()}.
 init([]) ->
 init([]) ->
-    %% create ETS tables
+    %% create default ETS tables
+
     %% entries have structure {{Name, Pid}, Meta, Clock, MonitorRef, Node}
     %% entries have structure {{Name, Pid}, Meta, Clock, MonitorRef, Node}
-    ets:new(syn_registry_by_name, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
+    ets:new(syn_registry_by_name_default, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
     %% entries have format {{Pid, Name}, Meta, Clock, MonitorRef, Node}
     %% entries have format {{Pid, Name}, Meta, Clock, MonitorRef, Node}
-    ets:new(syn_registry_by_pid, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
+    ets:new(syn_registry_by_pid_default, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
     %% entries have format {{GroupName, Pid}, Meta, MonitorRef, Node}
     %% entries have format {{GroupName, Pid}, Meta, MonitorRef, Node}
-    ets:new(syn_groups_by_name, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
+    ets:new(syn_groups_by_name_default, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
     %% entries have format {{Pid, GroupName}, Meta, MonitorRef, Node}
     %% entries have format {{Pid, GroupName}, Meta, MonitorRef, Node}
-    ets:new(syn_groups_by_pid, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
+    ets:new(syn_groups_by_pid_default, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
     %% init
     %% init
     {ok, #state{}}.
     {ok, #state{}}.
 
 
@@ -117,10 +118,10 @@ handle_info(Info, State) ->
 terminate(Reason, _State) ->
 terminate(Reason, _State) ->
     error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", [node(), Reason]),
     error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", [node(), Reason]),
     %% delete ETS tables
     %% delete ETS tables
-    ets:delete(syn_registry_by_name),
-    ets:delete(syn_registry_by_pid),
-    ets:delete(syn_groups_by_name),
-    ets:delete(syn_groups_by_pid),
+    ets:delete(syn_registry_by_name_default),
+    ets:delete(syn_registry_by_pid_default),
+    ets:delete(syn_groups_by_name_default),
+    ets:delete(syn_groups_by_pid_default),
     %% return
     %% return
     terminated.
     terminated.
 
 

+ 62 - 0
src/syn_event_handler.erl

@@ -0,0 +1,62 @@
+%% ==========================================================================================================
+%% Syn - A global Process Registry and Process Group manager.
+%%
+%% The MIT License (MIT)
+%%
+%% Copyright (c) 2019-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
+%%
+%% Portions of code from Ulf Wiger's unsplit server module:
+%% <https://github.com/uwiger/unsplit/blob/master/src/unsplit_server.erl>
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+%% ==========================================================================================================
+-module(syn_event_handler).
+
+-export([on_process_unregistered/5]).
+
+-callback on_process_unregistered(
+    Scope :: atom(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Reason :: any()
+) -> any().
+
+-optional_callbacks([on_process_unregistered/5]).
+
+%% ===================================================================
+%% API
+%% ===================================================================
+-spec on_process_unregistered(
+    Scope :: atom(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Reason :: any()
+) -> any().
+on_process_unregistered(Scope, Name, Pid, Meta, Reason) ->
+    CustomEventHandler = undefined,
+    case erlang:function_exported(CustomEventHandler, on_process_unregistered, 5) of
+        true ->
+            spawn(fun() ->
+                CustomEventHandler:on_process_unregistered(Scope, Name, Pid, Meta, Reason)
+            end);
+        _ ->
+            ok
+    end.

+ 421 - 8
src/syn_registry.erl

@@ -28,12 +28,25 @@
 
 
 %% API
 %% API
 -export([start_link/1]).
 -export([start_link/1]).
+-export([register/2, register/3, register/4]).
+-export([unregister/1, unregister/2]).
+-export([lookup/1]).
 
 
 %% gen_server callbacks
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
 
+%% tests
+-ifdef(TEST).
+-export([get_nodes/1]).
+-endif.
+
 %% records
 %% records
--record(state, {}).
+-record(state, {
+    scope = default :: atom(),
+    process_name = syn_registry_default :: atom(),
+    nodes = #{} :: #{node() => pid()},
+    remote_scope_monitors = #{} :: #{pid() => reference()}
+}).
 
 
 %% includes
 %% includes
 -include("syn.hrl").
 -include("syn.hrl").
@@ -43,10 +56,76 @@
 %% ===================================================================
 %% ===================================================================
 -spec start_link(Scope :: atom()) -> {ok, pid()} | {error, any()}.
 -spec start_link(Scope :: atom()) -> {ok, pid()} | {error, any()}.
 start_link(Scope) when is_atom(Scope) ->
 start_link(Scope) when is_atom(Scope) ->
-    Args = [],
     ProcessName = get_process_name(Scope),
     ProcessName = get_process_name(Scope),
+    Args = [Scope, ProcessName],
     gen_server:start_link({local, ProcessName}, ?MODULE, Args, []).
     gen_server:start_link({local, ProcessName}, ?MODULE, Args, []).
 
 
+-spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
+register(Name, Pid) ->
+    register(default, Name, Pid, undefined).
+
+-spec register(Name :: any(), Pid :: pid(), Meta :: term()) -> ok | {error, Reason :: any()}.
+register(Name, Pid, Meta) ->
+    register(default, Name, Pid, Meta).
+
+-spec register(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
+register(Scope, Name, Pid, Meta) when is_pid(Pid) ->
+    ProcessName = get_process_name(Scope),
+    Node = node(Pid),
+    gen_server:call({ProcessName, Node}, {register_on_node, Name, Pid, Meta}).
+
+-spec lookup(Name :: any()) -> {pid(), Meta :: any()} | undefined.
+lookup(Name) ->
+    lookup(default, Name).
+
+-spec lookup(Scope :: atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
+lookup(Scope, Name) ->
+    case find_registry_tuple_by_scope_and_name(Scope, Name) of
+        undefined -> undefined;
+        {Name, Pid, Meta, _} -> {Pid, Meta}
+    end.
+
+-spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
+unregister(Name) ->
+    unregister(default, Name).
+
+-spec unregister(Scope :: atom(), Name :: any()) -> ok | {error, Reason :: any()}.
+unregister(Scope, Name) ->
+    % get process' node
+    case find_registry_tuple_by_scope_and_name(Scope, Name) of
+        undefined ->
+            {error, undefined};
+        {Name, Pid, _, _} ->
+            ProcessName = get_process_name(Scope),
+            Node = node(Pid),
+            gen_server:call({ProcessName, Node}, {unregister_on_node, Name})
+    end.
+
+%% ----- \/ cluster API ----------------------------------------------
+-spec sync_register(
+    RemoteNode :: node(),
+    Scope :: atom(),
+    Name :: any(),
+    RemotePid :: pid(),
+    RemoteMeta :: any(),
+    RemoteTime :: integer()
+) -> ok.
+sync_register(RemoteNode, Scope, Name, RemotePid, RemoteMeta, RemoteTime) ->
+    ProcessName = get_process_name(Scope),
+    gen_server:cast({ProcessName, RemoteNode}, {sync_register, Name, RemotePid, RemoteMeta, RemoteTime}).
+
+-spec sync_unregister(RemoteNode :: node(), Scope :: atom(), Name :: any(), Pid :: pid()) -> ok.
+sync_unregister(RemoteNode, Scope, Name, Pid) ->
+    ProcessName = get_process_name(Scope),
+    gen_server:cast({ProcessName, RemoteNode}, {sync_unregister, Name, Pid}).
+
+%% ----- \/ TESTS ----------------------------------------------------
+-ifdef(TEST).
+get_nodes(Scope) ->
+    ProcessName = get_process_name(Scope),
+    gen_server:call(ProcessName, get_nodes).
+-endif.
+
 %% ===================================================================
 %% ===================================================================
 %% Callbacks
 %% Callbacks
 %% ===================================================================
 %% ===================================================================
@@ -54,16 +133,19 @@ start_link(Scope) when is_atom(Scope) ->
 %% ----------------------------------------------------------------------------------------------------------
 %% ----------------------------------------------------------------------------------------------------------
 %% Init
 %% Init
 %% ----------------------------------------------------------------------------------------------------------
 %% ----------------------------------------------------------------------------------------------------------
--spec init([]) ->
+-spec init([term()]) ->
     {ok, #state{}} |
     {ok, #state{}} |
     {ok, #state{}, Timeout :: non_neg_integer()} |
     {ok, #state{}, Timeout :: non_neg_integer()} |
     ignore |
     ignore |
     {stop, Reason :: any()}.
     {stop, Reason :: any()}.
-init([]) ->
+init([Scope, ProcessName]) ->
     %% build state
     %% build state
-    State = #state{},
-    %% init
-    {ok, State}.
+    State = #state{
+        scope = Scope,
+        process_name = ProcessName
+    },
+    %% init with 0 timeout
+    {ok, State, 0}.
 
 
 %% ----------------------------------------------------------------------------------------------------------
 %% ----------------------------------------------------------------------------------------------------------
 %% Call messages
 %% Call messages
@@ -75,6 +157,31 @@ init([]) ->
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), Reply :: any(), #state{}} |
     {stop, Reason :: any(), Reply :: any(), #state{}} |
     {stop, Reason :: any(), #state{}}.
     {stop, Reason :: any(), #state{}}.
+handle_call({register_on_node, Name, Pid, Meta}, _From, #state{scope = Scope} = State) ->
+    %% available
+    {ok, Time} = register_on_node(Scope, Name, Pid, Meta),
+    %% multicast
+    multicast_register(Scope, Name, Pid, Meta, Time, State),
+    %% return
+    {reply, ok, State};
+
+handle_call({unregister_on_node, Name}, _From, #state{scope = Scope} = State) ->
+    case unregister_on_node(Scope, Name) of
+        {ok, RemovedPid} ->
+            multicast_unregister(Scope, Name, RemovedPid, State),
+            %% return
+            {reply, ok, State};
+
+        {error, Reason} ->
+            %% return
+            {reply, {error, Reason}, State}
+    end;
+
+handle_call(get_nodes, _From, #state{
+    nodes = Nodes
+} = State) ->
+    {reply, Nodes, State};
+
 handle_call(Request, From, State) ->
 handle_call(Request, From, State) ->
     error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), Request, From]),
     error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), Request, From]),
     {reply, undefined, State}.
     {reply, undefined, State}.
@@ -86,17 +193,106 @@ handle_call(Request, From, State) ->
     {noreply, #state{}} |
     {noreply, #state{}} |
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
     {stop, Reason :: any(), #state{}}.
+handle_cast({sync_register, Name, RemotePid, RemoteMeta, RemoteTime}, #state{scope = Scope} = State) ->
+    % check for conflicts
+    case find_registry_tuple_by_scope_and_name(Scope, Name) of
+        undefined ->
+            %% no conflict
+            add_to_local_table(Scope, Name, RemotePid, RemoteMeta, RemoteTime, undefined);
+
+        {Name, RemotePid, _, _} ->
+            %% same process, no conflict, overwrite
+            add_to_local_table(Scope, Name, RemotePid, RemoteMeta, RemoteTime, undefined)
+    end,
+    {noreply, State};
+
+handle_cast({sync_unregister, Name, Pid}, #state{scope = Scope} = State) ->
+    %% remove
+    remove_from_local_table(Scope, Name, Pid),
+    %% return
+    {noreply, State};
+
 handle_cast(Msg, State) ->
 handle_cast(Msg, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
     error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
     {noreply, State}.
     {noreply, State}.
 
 
 %% ----------------------------------------------------------------------------------------------------------
 %% ----------------------------------------------------------------------------------------------------------
-%% All non Call / Cast messages
+%% Info messages
 %% ----------------------------------------------------------------------------------------------------------
 %% ----------------------------------------------------------------------------------------------------------
 -spec handle_info(Info :: any(), #state{}) ->
 -spec handle_info(Info :: any(), #state{}) ->
     {noreply, #state{}} |
     {noreply, #state{}} |
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
     {stop, Reason :: any(), #state{}}.
+handle_info(timeout, #state{
+    scope = Scope,
+    process_name = ProcessName
+} = State) ->
+    error_logger:info_msg("Syn(~p): Discovering nodes in the cluster with scope: ~p~n", [node(), Scope]),
+    lists:foreach(fun(RemoteNode) ->
+        %% send discover message to identically named process on other node
+        {ProcessName, RemoteNode} ! {discover, self()}
+    end, nodes()),
+    {noreply, State};
+
+handle_info({discover, RemoteScopePid}, #state{
+    scope = Scope
+} = State) ->
+    error_logger:info_msg("Syn(~p): Received discover REQ from node ~p with scope: ~p~n",
+        [node(), node(RemoteScopePid), Scope]
+    ),
+    case add_remote_scope_process(RemoteScopePid, State) of
+        {true, State1} ->
+            %% new remote scope process, send sync data back
+            %% TODO: add data to send
+            RemoteScopePid ! {sync, self(), []},
+            %% return
+            {noreply, State1};
+
+        {false, State1} ->
+            %% known scope process, do not sync
+            %% return
+            {noreply, State1}
+    end;
+
+handle_info({sync, RemoteScopePid, _Data}, #state{
+    scope = Scope
+} = State) ->
+    error_logger:info_msg("Syn(~p): Received SYNC from node ~p with scope: ~p~n",
+        [node(), node(RemoteScopePid), Scope]
+    ),
+    case add_remote_scope_process(RemoteScopePid, State) of
+        {true, State1} ->
+            %% new remote scope process, send sync data back
+            %% TODO: add data to send
+            RemoteScopePid ! {sync, self(), []},
+            %% return
+            {noreply, State1};
+
+        {false, State1} ->
+            %% known scope process, do not sync
+            %% return
+            {noreply, State1}
+    end;
+
+handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, #state{scope = Scope} = State) ->
+    case find_registry_tuples_by_scope_and_pid(Scope, Pid) of
+        [] ->
+            %% handle
+            handle_process_down(Scope, undefined, Pid, undefined, Reason, State);
+
+        Entries ->
+            lists:foreach(fun({Name, _Pid, Meta, _Time}) ->
+                %% handle
+                handle_process_down(Scope, Name, Pid, Meta, Reason, State),
+                %% remove from table
+                remove_from_local_table(Scope, Name, Pid),
+                %% multicast
+                multicast_unregister(Scope, Name, Pid, State)
+            end, Entries)
+    end,
+    %% return
+    {noreply, State};
+
 handle_info(Info, State) ->
 handle_info(Info, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
     error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
     {noreply, State}.
     {noreply, State}.
@@ -124,3 +320,220 @@ get_process_name(Scope) ->
     ModuleBin = atom_to_binary(?MODULE),
     ModuleBin = atom_to_binary(?MODULE),
     ScopeBin = atom_to_binary(Scope),
     ScopeBin = atom_to_binary(Scope),
     binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).
     binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).
+
+-spec get_table_name(TableName :: atom(), Scope :: atom()) -> atom().
+get_table_name(TableName, Scope) ->
+    TableNameBin = atom_to_binary(TableName),
+    ScopeBin = atom_to_binary(Scope),
+    binary_to_atom(<<TableNameBin/binary, "_", ScopeBin/binary>>).
+
+-spec add_remote_scope_process(RemoteScopePid :: pid(), #state{}) -> {IsNew :: boolean(), #state{}}.
+add_remote_scope_process(RemoteScopePid, #state{
+    nodes = Nodes,
+    remote_scope_monitors = RemoteScopeMonitors
+} = State) ->
+    %% add remote node (always update in case of race-conditions, when new pid is received)
+    Nodes1 = maps:put(node(RemoteScopePid), RemoteScopePid, Nodes),
+
+    %% add monitor if necessary
+    case maps:find(RemoteScopePid, RemoteScopeMonitors) of
+        error ->
+            %% monitor does not exist, add
+            MonitorRef = monitor(process, RemoteScopePid),
+            RemoteScopeMonitors1 = maps:put(RemoteScopePid, MonitorRef, RemoteScopeMonitors),
+            %% update state
+            {true, State#state{nodes = Nodes1, remote_scope_monitors = RemoteScopeMonitors1}};
+
+        {ok, _Ref} ->
+            %% monitor already exists, return existing map
+            {false, State#state{nodes = Nodes1}}
+    end.
+
+-spec find_registry_tuple_by_scope_and_name(Scope :: atom(), Name :: any()) ->
+    RegistryTuple :: syn_registry_tuple() | undefined.
+find_registry_tuple_by_scope_and_name(Scope, Name) ->
+    TableName = get_table_name(syn_registry_by_name, Scope),
+    case ets:select(TableName, [{
+        {{Name, '$2'}, '$3', '$4', '_', '_'},
+        [],
+        [{{{const, Name}, '$2', '$3', '$4'}}]
+    }]) of
+        [RegistryTuple] -> RegistryTuple;
+        _ -> undefined
+    end.
+
+-spec find_registry_entry_by_scope_and_name(Scope :: atom(), Name :: any()) -> Entry :: syn_registry_entry() | undefined.
+find_registry_entry_by_scope_and_name(Scope, Name) ->
+    TableName = get_table_name(syn_registry_by_name, Scope),
+    case ets:select(TableName, [{
+        {{Name, '$2'}, '$3', '_', '_', '_'},
+        [],
+        ['$_']
+    }]) of
+        [RegistryTuple] -> RegistryTuple;
+        _ -> undefined
+    end.
+
+-spec find_registry_tuples_by_scope_and_pid(Scope :: atom(), Pid :: pid()) -> RegistryTuples :: [syn_registry_tuple()].
+find_registry_tuples_by_scope_and_pid(Scope, Pid) when is_pid(Pid) ->
+    TableName = get_table_name(syn_registry_by_pid, Scope),
+    ets:select(TableName, [{
+        {{Pid, '$2'}, '$3', '$4', '_', '_'},
+        [],
+        [{{'$2', Pid, '$3', '$4'}}]
+    }]).
+
+-spec find_monitor_for_scope_and_pid(Scope :: atom(), Pid :: pid()) -> reference() | undefined.
+find_monitor_for_scope_and_pid(Scope, Pid) when is_pid(Pid) ->
+    TableName = get_table_name(syn_registry_by_pid, Scope),
+    case ets:select(TableName, [{
+        {{Pid, '_'}, '_', '_', '$5', '_'},
+        [],
+        ['$5']
+    }], 1) of
+        {[MonitorRef], _} -> MonitorRef;
+        _ -> undefined
+    end.
+
+-spec register_on_node(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) -> {ok, Time :: integer()}.
+register_on_node(Scope, Name, Pid, Meta) ->
+    MonitorRef = case find_monitor_for_scope_and_pid(Scope, Pid) of
+        undefined ->
+            %% process is not monitored yet, add
+            erlang:monitor(process, Pid);
+
+        MRef ->
+            MRef
+    end,
+    %% add to table
+    Time = erlang:system_time(),
+    add_to_local_table(Scope, Name, Pid, Meta, Time, MonitorRef),
+    {ok, Time}.
+
+-spec unregister_on_node(Scope :: atom(), Name :: any()) -> {ok, RemovedPid :: pid()} | {error, Reason :: any()}.
+unregister_on_node(Scope, Name) ->
+    case find_registry_entry_by_scope_and_name(Scope, Name) of
+        undefined ->
+            {error, undefined};
+
+        {{Name, Pid}, _Meta, _Clock, MonitorRef, _Node} when MonitorRef =/= undefined ->
+            %% demonitor if the process is not registered under other names
+            maybe_demonitor(Scope, Pid),
+            %% remove from table
+            remove_from_local_table(Scope, Name, Pid),
+            %% return
+            {ok, Pid};
+
+        {{Name, Pid}, _Meta, _Clock, _MonitorRef, Node} = RegistryEntry when Node =:= node() ->
+            error_logger:error_msg(
+                "Syn(~p): INTERNAL ERROR | Registry entry ~p has no monitor but it's running on node~n",
+                [node(), RegistryEntry]
+            ),
+            %% remove from table
+            remove_from_local_table(Scope, Name, Pid),
+            %% return
+            {ok, Pid};
+
+        RegistryEntry ->
+            %% race condition: un-registration request but entry in table is not a local pid (has no monitor)
+            %% sync messages will take care of it
+            error_logger:info_msg(
+                "Syn(~p): Registry entry ~p is not monitored and it's not running on node~n",
+                [node(), RegistryEntry]
+            ),
+            {error, remote_pid}
+    end.
+
+-spec handle_process_down(
+    Scope :: atom(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Reason :: any(),
+    #state{}
+) -> ok.
+handle_process_down(Scope, Name, Pid, Meta, Reason, _State) ->
+    case Name of
+        undefined ->
+            case Reason of
+                {syn_resolve_kill, KillName, KillMeta} ->
+                    syn_event_handler:on_process_unregistered(Scope, KillName, Pid, KillMeta, syn_resolve_kill);
+
+                _ ->
+                    error_logger:warning_msg(
+                        "Syn(~p): Received a DOWN message from an unregistered process ~p with reason: ~p~n",
+                        [node(), Pid, Reason]
+                    )
+            end;
+
+        _ ->
+            syn_event_handler:on_process_unregistered(Scope, Name, Pid, Meta, Reason)
+    end.
+
+-spec maybe_demonitor(Scope :: atom(), Pid :: pid()) -> ok.
+maybe_demonitor(Scope, Pid) ->
+    %% try to retrieve 2 items
+    %% if only 1 is returned it means that no other aliases exist for the Pid
+    TableName = get_table_name(syn_registry_by_pid, Scope),
+    case ets:select(TableName, [{
+        {{Pid, '_'}, '_', '_', '$5', '_'},
+        [],
+        ['$5']
+    }], 2) of
+        {[MonitorRef], _} ->
+            %% no other aliases, demonitor
+            erlang:demonitor(MonitorRef, [flush]),
+            ok;
+        _ ->
+            ok
+    end.
+
+-spec add_to_local_table(
+    Scope :: atom(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Time :: integer(),
+    MonitorRef :: undefined | reference()
+) -> ok.
+add_to_local_table(Scope, Name, Pid, Meta, Time, MonitorRef) ->
+    ets:insert(get_table_name(syn_registry_by_name, Scope), {{Name, Pid}, Meta, Time, MonitorRef, node(Pid)}),
+    ets:insert(get_table_name(syn_registry_by_pid, Scope), {{Pid, Name}, Meta, Time, MonitorRef, node(Pid)}),
+    ok.
+
+-spec remove_from_local_table(Scope :: atom(), Name :: any(), Pid :: pid()) -> ok.
+remove_from_local_table(Scope, Name, Pid) ->
+    ets:delete(get_table_name(syn_registry_by_name, Scope), {Name, Pid}),
+    ets:delete(get_table_name(syn_registry_by_pid, Scope), {Pid, Name}),
+    ok.
+
+%% ----- \/ multicast ------------------------------------------------
+-spec multicast_register(
+    Scope :: atom(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Time :: integer(),
+    #state{}
+) -> any().
+multicast_register(Scope, Name, Pid, Meta, Time, #state{
+    nodes = Nodes
+}) ->
+    lists:foreach(fun(RemoteNode) ->
+        sync_register(RemoteNode, Scope, Name, Pid, Meta, Time)
+    end, maps:keys(Nodes)),
+    ok.
+
+-spec multicast_unregister(
+    Scope :: atom(),
+    Name :: any(),
+    Pid :: pid(),
+    #state{}
+) -> any().
+multicast_unregister(Scope, Name, Pid, #state{
+    nodes = Nodes
+}) ->
+    lists:foreach(fun(RemoteNode) ->
+        sync_unregister(RemoteNode, Scope, Name, Pid)
+    end, maps:keys(Nodes)),
+    ok.

+ 286 - 0
test/syn_registry_SUITE.erl

@@ -0,0 +1,286 @@
+%% ==========================================================================================================
+%% Syn - A global Process Registry and Process Group manager.
+%%
+%% The MIT License (MIT)
+%%
+%% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+%% ==========================================================================================================
+-module(syn_registry_SUITE).
+
+%% callbacks
+-export([all/0]).
+-export([init_per_suite/1, end_per_suite/1]).
+-export([groups/0, init_per_group/2, end_per_group/2]).
+-export([init_per_testcase/2, end_per_testcase/2]).
+
+%% tests
+-export([
+    three_nodes_discover_default_scope/1,
+    three_nodes_register_unregister_and_monitor_default_scope/1
+]).
+
+%% include
+-include_lib("common_test/include/ct.hrl").
+-include_lib("../src/syn.hrl").
+
+%% ===================================================================
+%% Callbacks
+%% ===================================================================
+
+%% -------------------------------------------------------------------
+%% Function: all() -> GroupsAndTestCases | {skip,Reason}
+%% GroupsAndTestCases = [{group,GroupName} | TestCase]
+%% GroupName = atom()
+%% TestCase = atom()
+%% Reason = any()
+%% -------------------------------------------------------------------
+all() ->
+    [
+        {group, three_nodes_process_registration}
+    ].
+
+%% -------------------------------------------------------------------
+%% Function: groups() -> [Group]
+%% Group = {GroupName,Properties,GroupsAndTestCases}
+%% GroupName =  atom()
+%% Properties = [parallel | sequence | Shuffle | {RepeatType,N}]
+%% GroupsAndTestCases = [Group | {group,GroupName} | TestCase]
+%% TestCase = atom()
+%% Shuffle = shuffle | {shuffle,{integer(),integer(),integer()}}
+%% RepeatType = repeat | repeat_until_all_ok | repeat_until_all_fail |
+%%			   repeat_until_any_ok | repeat_until_any_fail
+%% N = integer() | forever
+%% -------------------------------------------------------------------
+groups() ->
+    [
+        {three_nodes_process_registration, [shuffle], [
+            three_nodes_discover_default_scope,
+            three_nodes_register_unregister_and_monitor_default_scope
+        ]}
+    ].
+%% -------------------------------------------------------------------
+%% Function: init_per_suite(Config0) ->
+%%				Config1 | {skip,Reason} |
+%%              {skip_and_save,Reason,Config1}
+%% Config0 = Config1 = [tuple()]
+%% Reason = any()
+%% -------------------------------------------------------------------
+init_per_suite(Config) ->
+    Config.
+
+%% -------------------------------------------------------------------
+%% Function: end_per_suite(Config0) -> void() | {save_config,Config1}
+%% Config0 = Config1 = [tuple()]
+%% -------------------------------------------------------------------
+end_per_suite(_Config) ->
+    ok.
+
+%% -------------------------------------------------------------------
+%% Function: init_per_group(GroupName, Config0) ->
+%%				Config1 | {skip,Reason} |
+%%              {skip_and_save,Reason,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% Reason = any()
+%% -------------------------------------------------------------------
+init_per_group(three_nodes_process_registration, Config) ->
+    %% start slave
+    {ok, SlaveNode1} = syn_test_suite_helper:start_slave(syn_slave_1),
+    {ok, SlaveNode2} = syn_test_suite_helper:start_slave(syn_slave_2),
+    %% wait full cluster
+    case syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1, SlaveNode2]) of
+        ok ->
+            %% config
+            [{slave_node_1, SlaveNode1}, {slave_node_2, SlaveNode2} | Config];
+
+        Other ->
+            ct:pal("*********** Could not get full cluster, skipping"),
+            end_per_group(three_nodes_process_registration, Config),
+            {skip, Other}
+    end;
+
+init_per_group(_GroupName, Config) ->
+    Config.
+
+%% -------------------------------------------------------------------
+%% Function: end_per_group(GroupName, Config0) ->
+%%				void() | {save_config,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% -------------------------------------------------------------------
+end_per_group(three_nodes_process_registration, Config) ->
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    syn_test_suite_helper:connect_node(SlaveNode1),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    syn_test_suite_helper:connect_node(SlaveNode2),
+    syn_test_suite_helper:clean_after_test(),
+    syn_test_suite_helper:stop_slave(syn_slave_1),
+    syn_test_suite_helper:stop_slave(syn_slave_2),
+    timer:sleep(1000);
+end_per_group(_GroupName, _Config) ->
+    syn_test_suite_helper:clean_after_test().
+
+%% -------------------------------------------------------------------
+%% Function: init_per_testcase(TestCase, Config0) ->
+%%				Config1 | {skip,Reason} | {skip_and_save,Reason,Config1}
+%% TestCase = atom()
+%% Config0 = Config1 = [tuple()]
+%% Reason = any()
+%% -------------------------------------------------------------------
+init_per_testcase(TestCase, Config) ->
+    ct:pal("Starting test: ~p", [TestCase]),
+    Config.
+
+%% -------------------------------------------------------------------
+%% Function: end_per_testcase(TestCase, Config0) ->
+%%				void() | {save_config,Config1} | {fail,Reason}
+%% TestCase = atom()
+%% Config0 = Config1 = [tuple()]
+%% Reason = any()
+%% -------------------------------------------------------------------
+end_per_testcase(_, _Config) ->
+    syn_test_suite_helper:clean_after_test().
+
+%% ===================================================================
+%% Tests
+%% ===================================================================
+three_nodes_discover_default_scope(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(50),
+
+    %% check scope network
+    NodesMap = syn_registry:get_nodes(default),
+    Nodes = maps:keys(NodesMap),
+    2 = length(Nodes),
+    true = lists:member(SlaveNode1, Nodes),
+    true = lists:member(SlaveNode2, Nodes),
+    NodesMap1 = rpc:call(SlaveNode1, syn_registry, get_nodes, [default]),
+    Nodes1 = maps:keys(NodesMap1),
+    2 = length(Nodes1),
+    true = lists:member(node(), Nodes1),
+    true = lists:member(SlaveNode2, Nodes1),
+    NodesMap2 = rpc:call(SlaveNode2, syn_registry, get_nodes, [default]),
+    Nodes2 = maps:keys(NodesMap2),
+    2 = length(Nodes2),
+    true = lists:member(node(), Nodes2),
+    true = lists:member(SlaveNode1, Nodes2).
+
+three_nodes_register_unregister_and_monitor_default_scope(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(50),
+
+    %% start processes
+    Pid = syn_test_suite_helper:start_process(),
+    PidWithMeta = syn_test_suite_helper:start_process(),
+    PidRemote1 = syn_test_suite_helper:start_process(SlaveNode1),
+
+    %% retrieve
+    undefined = syn:lookup(<<"my proc">>),
+    undefined = rpc:call(SlaveNode1, syn, lookup, [<<"my proc">>]),
+    undefined = rpc:call(SlaveNode2, syn, lookup, [<<"my proc">>]),
+    undefined = syn:lookup({"my proc 2"}),
+    undefined = rpc:call(SlaveNode1, syn, lookup, [{"my proc 2"}]),
+    undefined = rpc:call(SlaveNode2, syn, lookup, [{"my proc 2"}]),
+    undefined = syn:lookup(<<"my proc with meta">>),
+    undefined = rpc:call(SlaveNode1, syn, lookup, [<<"my proc with meta">>]),
+    undefined = rpc:call(SlaveNode2, syn, lookup, [<<"my proc with meta">>]),
+    undefined = syn:lookup({remote_pid_on, slave_1}),
+    undefined = rpc:call(SlaveNode1, syn, lookup, [{remote_pid_on, slave_1}]),
+    undefined = rpc:call(SlaveNode2, syn, lookup, [{remote_pid_on, slave_1}]),
+%%    0 = syn:registry_count(),
+%%    0 = syn:registry_count(node()),
+%%    0 = syn:registry_count(SlaveNode1),
+%%    0 = syn:registry_count(SlaveNode2),
+
+    %% register
+    ok = syn:register(<<"my proc">>, Pid),
+    ok = syn:register({"my proc 2"}, Pid), %% same pid, different name
+    ok = syn:register(<<"my proc with meta">>, PidWithMeta, {meta, <<"meta">>}), %% pid with meta
+    ok = rpc:call(SlaveNode1, syn, register, [{remote_pid_on, slave_1}, PidRemote1]), %% remote on slave 1
+    timer:sleep(100),
+
+    %% retrieve
+    {Pid, undefined} = syn:lookup(<<"my proc">>),
+    {Pid, undefined} = rpc:call(SlaveNode1, syn, lookup, [<<"my proc">>]),
+    {Pid, undefined} = rpc:call(SlaveNode2, syn, lookup, [<<"my proc">>]),
+    {Pid, undefined} = syn:lookup({"my proc 2"}),
+    {Pid, undefined} = rpc:call(SlaveNode1, syn, lookup, [{"my proc 2"}]),
+    {Pid, undefined} = rpc:call(SlaveNode2, syn, lookup, [{"my proc 2"}]),
+    {PidWithMeta, {meta, <<"meta">>}} = syn:lookup(<<"my proc with meta">>),
+    {PidWithMeta, {meta, <<"meta">>}} = rpc:call(SlaveNode1, syn, lookup, [<<"my proc with meta">>]),
+    {PidWithMeta, {meta, <<"meta">>}} = rpc:call(SlaveNode2, syn, lookup, [<<"my proc with meta">>]),
+    {PidRemote1, undefined} = syn:lookup({remote_pid_on, slave_1}),
+    {PidRemote1, undefined} = rpc:call(SlaveNode1, syn, lookup, [{remote_pid_on, slave_1}]),
+    {PidRemote1, undefined} = rpc:call(SlaveNode2, syn, lookup, [{remote_pid_on, slave_1}]),
+%%    4 = syn:registry_count(),
+%%    3 = syn:registry_count(node()),
+%%    1 = syn:registry_count(SlaveNode1),
+%%    0 = syn:registry_count(SlaveNode2),
+
+    %% re-register to edit meta
+    ok = syn:register(<<"my proc with meta">>, PidWithMeta, {meta2, <<"meta2">>}),
+    ok = rpc:call(SlaveNode2, syn, register, [{remote_pid_on, slave_1}, PidRemote1, added_meta]), %% updated on slave 2
+    timer:sleep(50),
+
+    %% retrieve
+    {PidWithMeta, {meta2, <<"meta2">>}} = syn:lookup(<<"my proc with meta">>),
+    {PidWithMeta, {meta2, <<"meta2">>}} = rpc:call(SlaveNode1, syn, lookup, [<<"my proc with meta">>]),
+    {PidWithMeta, {meta2, <<"meta2">>}} = rpc:call(SlaveNode2, syn, lookup, [<<"my proc with meta">>]),
+    {PidRemote1, added_meta} = syn:lookup({remote_pid_on, slave_1}),
+    {PidRemote1, added_meta} = rpc:call(SlaveNode1, syn, lookup, [{remote_pid_on, slave_1}]),
+    {PidRemote1, added_meta} = rpc:call(SlaveNode2, syn, lookup, [{remote_pid_on, slave_1}]),
+
+    %% kill process
+    syn_test_suite_helper:kill_process(Pid),
+    syn_test_suite_helper:kill_process(PidRemote1),
+    %% unregister process
+    syn:unregister(<<"my proc with meta">>),
+    timer:sleep(50),
+
+    %% retrieve
+    undefined = syn:lookup(<<"my proc">>),
+    undefined = rpc:call(SlaveNode1, syn, lookup, [<<"my proc">>]),
+    undefined = rpc:call(SlaveNode2, syn, lookup, [<<"my proc">>]),
+    undefined = syn:lookup({"my proc 2"}),
+    undefined = rpc:call(SlaveNode1, syn, lookup, [{"my proc 2"}]),
+    undefined = rpc:call(SlaveNode2, syn, lookup, [{"my proc 2"}]),
+    undefined = syn:lookup(<<"my proc with meta">>),
+    undefined = rpc:call(SlaveNode1, syn, lookup, [<<"my proc with meta">>]),
+    undefined = rpc:call(SlaveNode2, syn, lookup, [<<"my proc with meta">>]),
+    undefined = syn:lookup({remote_pid_on, slave_1}),
+    undefined = rpc:call(SlaveNode1, syn, lookup, [{remote_pid_on, slave_1}]),
+    undefined = rpc:call(SlaveNode2, syn, lookup, [{remote_pid_on, slave_1}]).
+%%    0 = syn:registry_count(),
+%%    0 = syn:registry_count(node()),
+%%    0 = syn:registry_count(SlaveNode1),
+%%    0 = syn:registry_count(SlaveNode2).

+ 123 - 0
test/syn_test_suite_helper.erl

@@ -0,0 +1,123 @@
+%% ==========================================================================================================
+%% Syn - A global Process Registry and Process Group manager.
+%%
+%% The MIT License (MIT)
+%%
+%% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+%% ==========================================================================================================
+-module(syn_test_suite_helper).
+
+%% API
+-export([start_slave/1, start_slave/4]).
+-export([stop_slave/1, stop_slave/2]).
+-export([connect_node/1, disconnect_node/1]).
+-export([clean_after_test/0]).
+-export([start_process/0, start_process/1, start_process/2]).
+-export([kill_process/1]).
+-export([wait_cluster_connected/1]).
+
+%% internal
+-export([process_main/0]).
+
+%% ===================================================================
+%% API
+%% ===================================================================
+start_slave(NodeShortName) ->
+    {ok, Node} = ct_slave:start(NodeShortName, [{boot_timeout, 10}]),
+    CodePath = code:get_path(),
+    true = rpc:call(Node, code, set_path, [CodePath]),
+    {ok, Node}.
+start_slave(NodeShortName, Host, Username, Password) ->
+    {ok, Node} = ct_slave:start(Host, NodeShortName, [
+        {boot_timeout, 10},
+        {username, Username},
+        {password, Password}
+    ]),
+    CodePath = code:get_path(),
+    true = rpc:call(Node, code, set_path, [CodePath]),
+    {ok, Node}.
+
+stop_slave(NodeShortName) ->
+    {ok, _} = ct_slave:stop(NodeShortName).
+stop_slave(Host, NodeShortName) ->
+    {ok, _} = ct_slave:stop(Host, NodeShortName).
+
+connect_node(Node) ->
+    net_kernel:connect_node(Node).
+
+disconnect_node(Node) ->
+    erlang:disconnect_node(Node).
+
+clean_after_test() ->
+    Nodes = [node() | nodes()],
+    %% shutdown
+    lists:foreach(fun(Node) ->
+        %% close syn
+        rpc:call(Node, application, stop, [syn])
+    end, Nodes).
+
+start_process() ->
+    Pid = spawn(fun process_main/0),
+    Pid.
+start_process(Node) when is_atom(Node) ->
+    Pid = spawn(Node, fun process_main/0),
+    Pid;
+start_process(Loop) when is_function(Loop) ->
+    Pid = spawn(Loop),
+    Pid.
+start_process(Node, Loop) ->
+    Pid = spawn(Node, Loop),
+    Pid.
+
+kill_process(Pid) ->
+    exit(Pid, kill).
+
+wait_cluster_connected(Nodes) ->
+    wait_cluster_connected(Nodes, os:system_time(millisecond)).
+wait_cluster_connected(Nodes, StartAt) ->
+    AllSynced = lists:all(fun(Node) ->
+        RemoteNodes = rpc:call(Node, erlang, nodes, []),
+        AllNodes = [Node | RemoteNodes],
+        lists:sort(AllNodes) == lists:sort(Nodes)
+    end, Nodes),
+
+    case AllSynced of
+        true ->
+            ok;
+
+        false ->
+            case os:system_time(millisecond) - StartAt > 5000 of
+                true ->
+                    {error, {could_not_init_cluster, Nodes}};
+                false ->
+                    timer:sleep(1000),
+                    wait_cluster_connected(Nodes, StartAt)
+            end
+    end.
+
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+process_main() ->
+    receive
+        _ -> process_main()
+    end.