Browse Source

Add on_unregister callback and finalize on_register.

Roberto Ostinelli 3 years ago
parent
commit
a84c1926a3

+ 46 - 17
src/syn_event_handler.erl

@@ -31,6 +31,7 @@
 %% API
 -export([ensure_event_handler_loaded/0]).
 -export([do_on_process_registered/4]).
+-export([do_on_process_unregistered/4]).
 -export([do_resolve_registry_conflict/4]).
 
 -callback on_process_registered(
@@ -40,13 +41,20 @@
     Meta :: any()
 ) -> any().
 
+-callback on_process_unregistered(
+    Scope :: any(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any()
+) -> any().
+
 -callback resolve_registry_conflict(
     Name :: any(),
     {Pid1 :: pid(), Meta1 :: any()},
     {Pid2 :: pid(), Meta2 :: any()}
 ) -> PidToKeep :: pid() | undefined.
 
--optional_callbacks([on_process_registered/4, resolve_registry_conflict/3]).
+-optional_callbacks([on_process_registered/4, on_process_unregistered/4, resolve_registry_conflict/3]).
 
 %% ===================================================================
 %% API
@@ -61,24 +69,22 @@ ensure_event_handler_loaded() ->
 -spec do_on_process_registered(
     Scope :: atom(),
     Name :: any(),
-    Pid :: pid(),
-    Meta :: any()
+    {TablePid :: pid(), TableMeta :: any()},
+    {Pid :: pid(), Meta :: any()}
 ) -> any().
-do_on_process_registered(Scope, Name, Pid, Meta) ->
-    CustomEventHandler = get_custom_event_handler(),
-    case erlang:function_exported(CustomEventHandler, on_process_registered, 4) of
-        true ->
-            try CustomEventHandler:on_process_registered(Scope, Name, Pid, Meta)
-            catch Class:Reason:Stacktrace ->
-                error_logger:error_msg(
-                    "Syn(~p): Error ~p:~p in custom handler on_process_registered: ~p~n",
-                    [node(), Class, Reason, Stacktrace]
-                )
-            end;
+do_on_process_registered(_Scope, _Name, {TablePid, TableMeta}, {Pid, Meta})
+    when TablePid =:= Pid, TableMeta =:= Meta -> ok;
+do_on_process_registered(Scope, Name, {_TablePid, _TableMeta}, {Pid, Meta}) ->
+    call_callback_event(on_process_registered, Scope, Name, Pid, Meta).
 
-        _ ->
-            ok
-    end.
+-spec do_on_process_unregistered(
+    Scope :: atom(),
+    Name :: any(),
+    TablePid :: pid(),
+    TableMeta :: any()
+) -> any().
+do_on_process_unregistered(Scope, Name, Pid, Meta) ->
+    call_callback_event(on_process_unregistered, Scope, Name, Pid, Meta).
 
 -spec do_resolve_registry_conflict(
     Scope :: atom(),
@@ -119,3 +125,26 @@ do_resolve_registry_conflict(Scope, Name, {Pid1, Meta1, Time1}, {Pid2, Meta2, Ti
 -spec get_custom_event_handler() -> undefined | {ok, CustomEventHandler :: atom()}.
 get_custom_event_handler() ->
     application:get_env(syn, event_handler, undefined).
+
+-spec call_callback_event(
+    CallbackMethod :: atom(),
+    Scope :: atom(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any()
+) -> any().
+call_callback_event(CallbackMethod, Scope, Name, Pid, Meta) ->
+    CustomEventHandler = get_custom_event_handler(),
+    case erlang:function_exported(CustomEventHandler, CallbackMethod, 4) of
+        true ->
+            try CustomEventHandler:CallbackMethod(Scope, Name, Pid, Meta)
+            catch Class:Reason:Stacktrace ->
+                error_logger:error_msg(
+                    "Syn(~p): Error ~p:~p in custom handler ~p: ~p~n",
+                    [node(), Class, Reason, CallbackMethod, Stacktrace]
+                )
+            end;
+
+        _ ->
+            ok
+    end.

+ 55 - 19
src/syn_registry.erl

@@ -95,9 +95,12 @@ register(Scope, Name, Pid, Meta) ->
     ProcessName = get_process_name_for_scope(Scope),
     Node = node(Pid),
     try gen_server:call({ProcessName, Node}, {register_on_node, Name, Pid, Meta}) of
-        {ok, Time} when Node =/= node() ->
+        {ok, {TablePid, TableMeta, Time}} when Node =/= node() ->
             %% update table on caller node immediately so that subsequent calls have an updated registry
             add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
+            %% callback
+            syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
+            %% return
             ok;
 
         {Response, _} ->
@@ -117,13 +120,16 @@ unregister(Scope, Name) ->
         undefined ->
             {error, undefined};
 
-        {{Name, Pid}, _, _, _, _} ->
+        {{Name, Pid}, Meta, _, _, _} ->
             ProcessName = get_process_name_for_scope(Scope),
             Node = node(Pid),
             case gen_server:call({ProcessName, Node}, {unregister_on_node, Name, Pid}) of
                 ok when Node =/= node() ->
                     %% remove table on caller node immediately so that subsequent calls have an updated registry
                     remove_from_local_table(Scope, Name, Pid),
+                    %% callback
+                    syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
+                    %% return
                     ok;
 
                 Response ->
@@ -207,19 +213,23 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, #state{
                     %% add to local table
                     Time = erlang:system_time(),
                     add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
+                    %% callback
+                    syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta}),
                     %% broadcast
                     broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
                     %% return
-                    {reply, {ok, Time}, State};
+                    {reply, {ok, {undefined, undefined, Time}}, State};
 
-                {{Name, Pid}, _TableMeta, _TableTime, MRef, _TableNode} ->
+                {{Name, Pid}, TableMeta, _TableTime, MRef, _TableNode} ->
                     %% same pid, possibly new meta or time, overwrite
                     Time = erlang:system_time(),
                     add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
+                    %% callback
+                    syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta}),
                     %% broadcast
                     broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
                     %% return
-                    {reply, {ok, Time}, State};
+                    {reply, {ok, {Pid, TableMeta, Time}}, State};
 
                 _ ->
                     {reply, {{error, taken}, undefined}, State}
@@ -231,13 +241,15 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, #state{
 
 handle_call({unregister_on_node, Name, Pid}, _From, #state{scope = Scope} = State) ->
     case find_registry_entry_by_name(Scope, Name) of
-        {{Name, Pid}, _Meta, _Time, _MRef, _Node} ->
+        {{Name, Pid}, Meta, _Time, _MRef, _Node} ->
             %% demonitor if the process is not registered under other names
             maybe_demonitor(Scope, Pid),
             %% remove from table
             remove_from_local_table(Scope, Name, Pid),
+            %% callback
+            syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
             %% broadcast
-            broadcast({'3.0', sync_unregister, Name, Pid}, State),
+            broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State),
             %% return
             {reply, ok, State};
 
@@ -264,8 +276,11 @@ handle_cast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State) ->
     handle_registry_sync(Scope, Name, Pid, Meta, Time, State),
     {noreply, State};
 
-handle_cast({'3.0', sync_unregister, Name, Pid}, #state{scope = Scope} = State) ->
+handle_cast({'3.0', sync_unregister, Name, Pid, Meta}, #state{scope = Scope} = State) ->
     remove_from_local_table(Scope, Name, Pid),
+    %% callback
+    syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
+    %% return
     {noreply, State};
 
 handle_cast({'3.0', announce, RemoteScopePid}, #state{
@@ -359,11 +374,13 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State
             );
 
         Entries ->
-            lists:foreach(fun({{Name, _Pid}, _, _, _, _}) ->
+            lists:foreach(fun({{Name, _Pid}, Meta, _, _, _}) ->
                 %% remove from table
                 remove_from_local_table(Scope, Name, Pid),
+                %% callback
+                syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
                 %% broadcast
-                broadcast({'3.0', sync_unregister, Name, Pid}, State)
+                broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State)
             end, Entries)
     end,
     %% return
@@ -497,7 +514,7 @@ maybe_demonitor(Scope, Pid) ->
     Meta :: any(),
     Time :: integer(),
     MRef :: undefined | reference()
-) -> any().
+) -> true.
 add_to_local_table(Scope, Name, Pid, Meta, Time, MRef) ->
     %% insert
     true = ets:insert(syn_backbone:get_table_name(syn_registry_by_name, Scope),
@@ -505,9 +522,7 @@ add_to_local_table(Scope, Name, Pid, Meta, Time, MRef) ->
     ),
     true = ets:insert(syn_backbone:get_table_name(syn_registry_by_pid, Scope),
         {{Pid, Name}, Meta, Time, MRef, node(Pid)}
-    ),
-    %% callback
-    syn_event_handler:do_on_process_registered(Scope, Name, Pid, Meta).
+    ).
 
 -spec remove_from_local_table(Scope :: atom(), Name :: any(), Pid :: pid()) -> true.
 remove_from_local_table(Scope, Name, Pid) ->
@@ -532,6 +547,14 @@ update_local_table(Scope, Name, PreviousPid, {Pid, Meta, Time, MRef}) ->
 
 -spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom()) -> true.
 purge_registry_for_remote_node(Scope, Node) when Node =/= node() ->
+    %% loop elements for callback in a separate process to free scope process
+    RegistryTuples = get_registry_tuples_for_node(Scope, Node),
+    spawn(fun() ->
+        lists:foreach(fun({Name, Pid, Meta, _Time}) ->
+            syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta)
+        end, RegistryTuples)
+    end),
+    %% remove all from pid table
     true = ets:match_delete(syn_backbone:get_table_name(syn_registry_by_name, Scope), {{'_', '_'}, '_', '_', '_', Node}),
     true = ets:match_delete(syn_backbone:get_table_name(syn_registry_by_pid, Scope), {{'_', '_'}, '_', '_', '_', Node}).
 
@@ -547,11 +570,15 @@ handle_registry_sync(Scope, Name, Pid, Meta, Time, State) ->
     case find_registry_entry_by_name(Scope, Name) of
         undefined ->
             %% no conflict
-            add_to_local_table(Scope, Name, Pid, Meta, Time, undefined);
+            add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
+            %% callback
+            syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta});
 
-        {{Name, Pid}, _TableMeta, _TableTime, MRef, _TableNode} ->
+        {{Name, Pid}, TableMeta, _TableTime, MRef, _TableNode} ->
             %% same pid, more recent (because it comes from the same node, which means that it's sequential)
-            add_to_local_table(Scope, Name, Pid, Meta, Time, MRef);
+            add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
+            %% callback
+            syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta});
 
         {{Name, TablePid}, TableMeta, TableTime, TableMRef, _TableNode} when node(TablePid) =:= node() ->
             %% current node runs a conflicting process -> resolve
@@ -561,9 +588,12 @@ handle_registry_sync(Scope, Name, Pid, Meta, Time, State) ->
             %% * recipients check that the time is more recent that what they have to ensure that there are no race conditions
             resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State);
 
-        {{Name, TablePid}, _TableMeta, TableTime, _TableMRef, _TableNode} when TableTime < Time ->
+        {{Name, TablePid}, TableMeta, TableTime, _TableMRef, _TableNode} when TableTime < Time ->
             %% current node does not own any of the conflicting processes, update
-            update_local_table(Scope, Name, TablePid, {Pid, Meta, Time, undefined});
+            update_local_table(Scope, Name, TablePid, {Pid, Meta, Time, undefined}),
+            %% callbacks
+            syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
+            syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});
 
         {{Name, _TablePid}, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
             %% race condition: incoming data is older, ignore
@@ -591,6 +621,9 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
             %% -> we keep the remote pid
             %% update locally, the incoming sync_register will update with the time coming from remote node
             update_local_table(Scope, Name, TablePid, {Pid, Meta, Time, undefined}),
+            %% callbacks
+            syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
+            syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
             %% kill
             exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
             error_logger:info_msg("SYN[~p] Registry CONFLICT for name ~p@~p: ~p ~p -> chosen: ~p~n",
@@ -609,8 +642,11 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
             );
 
         Invalid ->
+            %% remove
             maybe_demonitor(Scope, TablePid),
             remove_from_local_table(Scope, Name, TablePid),
+            %% callback
+            syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
             %% kill local, remote will be killed by other node performing the same resolve
             exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
             error_logger:info_msg("SYN[~p] Registry CONFLICT for name ~p@~p: ~p ~p -> none chosen (got: ~p)~n",

+ 253 - 115
test/syn_registry_SUITE.erl

@@ -39,7 +39,7 @@
     three_nodes_register_unregister_and_monitor_custom_scope/1,
     three_nodes_cluster_changes/1,
     three_nodes_cluster_conflicts/1,
-    three_nodes_custom_event_handler/1
+    three_nodes_custom_event_handler_reg_unreg/1
 ]).
 
 %% include
@@ -83,7 +83,7 @@ groups() ->
             three_nodes_register_unregister_and_monitor_custom_scope,
             three_nodes_cluster_changes,
             three_nodes_cluster_conflicts,
-            three_nodes_custom_event_handler
+            three_nodes_custom_event_handler_reg_unreg
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -116,7 +116,7 @@ init_per_group(three_nodes_process_registration, Config) ->
     {ok, SlaveNode1} = syn_test_suite_helper:start_slave(syn_slave_1),
     {ok, SlaveNode2} = syn_test_suite_helper:start_slave(syn_slave_2),
     %% wait full cluster
-    case syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1, SlaveNode2]) of
+    case syn_test_suite_helper:wait_cluster_mesh_connected([node(), SlaveNode1, SlaveNode2]) of
         ok ->
             %% config
             [{slave_node_1, SlaveNode1}, {slave_node_2, SlaveNode2} | Config];
@@ -181,57 +181,76 @@ three_nodes_discover_default_scope(Config) ->
     ok = syn:start(),
     ok = rpc:call(SlaveNode1, syn, start, []),
     ok = rpc:call(SlaveNode2, syn, start, []),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% check
-    assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
-    assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
 
     %% simulate full netsplit
     rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
     syn_test_suite_helper:disconnect_node(SlaveNode1),
     syn_test_suite_helper:disconnect_node(SlaveNode2),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% check
-    assert_scope_subcluster(node(), default, []),
+    syn_test_suite_helper:assert_scope_subcluster(node(), default, []),
 
     %% reconnect slave 1
     syn_test_suite_helper:connect_node(SlaveNode1),
-    ok = syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1]),
+    ok = syn_test_suite_helper:wait_cluster_mesh_connected([node(), SlaveNode1]),
 
     %% check
-    assert_scope_subcluster(node(), default, [SlaveNode1]),
-    assert_scope_subcluster(SlaveNode1, default, [node()]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), default, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, default, [node()]),
 
     %% reconnect all
     syn_test_suite_helper:connect_node(SlaveNode2),
     rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
-    ok = syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1, SlaveNode2]),
+    ok = syn_test_suite_helper:wait_cluster_mesh_connected([node(), SlaveNode1, SlaveNode2]),
 
     %% check
-    assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
-    assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
+
+    %% simulate full netsplit, again
+    rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
+    syn_test_suite_helper:disconnect_node(SlaveNode1),
+    syn_test_suite_helper:disconnect_node(SlaveNode2),
+    timer:sleep(500),
+
+    %% check
+    syn_test_suite_helper:assert_scope_subcluster(node(), default, []),
+
+    %% reconnect all, again
+    syn_test_suite_helper:connect_node(SlaveNode2),
+    rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
+    ok = syn_test_suite_helper:wait_cluster_mesh_connected([node(), SlaveNode1, SlaveNode2]),
+
+    %% check
+    syn_test_suite_helper:assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
 
     %% crash the scope process on local
     syn_test_suite_helper:kill_process(syn_registry_default),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% check, it should have rebuilt after supervisor restarts it
-    assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
-    assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
 
     %% crash scopes supervisor on local
     syn_test_suite_helper:kill_process(syn_scopes_sup),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% check
-    assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
-    assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]).
+    syn_test_suite_helper:assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]).
 
 three_nodes_discover_custom_scope(Config) ->
     %% get slaves
@@ -242,83 +261,83 @@ three_nodes_discover_custom_scope(Config) ->
     ok = syn:start(),
     ok = rpc:call(SlaveNode1, syn, start, []),
     ok = rpc:call(SlaveNode2, syn, start, []),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% add custom scopes
     ok = syn:add_node_to_scope(custom_scope_ab),
     ok = syn:add_node_to_scope(custom_scope_all),
     ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[custom_scope_ab, custom_scope_bc, custom_scope_all]]),
     ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[custom_scope_bc, custom_scope_c, custom_scope_all]]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% check
-    assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
-    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
-    assert_scope_subcluster(SlaveNode2, custom_scope_bc, [SlaveNode1]),
-    assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
-    assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_bc, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]),
 
     %% check default
-    assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
-    assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
 
     %% disconnect node 2 (node 1 can still see node 2)
     syn_test_suite_helper:disconnect_node(SlaveNode2),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% check
-    assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
-    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
 
     %% reconnect node 2
     syn_test_suite_helper:connect_node(SlaveNode2),
-    ok = syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1, SlaveNode2]),
-    timer:sleep(250),
+    ok = syn_test_suite_helper:wait_cluster_mesh_connected([node(), SlaveNode1, SlaveNode2]),
+    timer:sleep(500),
 
     %% check
-    assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
-    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
-    assert_scope_subcluster(SlaveNode2, custom_scope_bc, [SlaveNode1]),
-    assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
-    assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_bc, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]),
 
     %% crash a scope process on 2
     rpc:call(SlaveNode2, syn_test_suite_helper, kill_process, [syn_registry_custom_scope_bc]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% check
-    assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
-    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
-    assert_scope_subcluster(SlaveNode2, custom_scope_bc, [SlaveNode1]),
-    assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
-    assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_bc, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]),
 
     %% crash scopes supervisor on local
     syn_test_suite_helper:kill_process(syn_scopes_sup),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% check
-    assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
-    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
-    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
-    assert_scope_subcluster(SlaveNode2, custom_scope_bc, [SlaveNode1]),
-    assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
-    assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]).
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_ab, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_ab, [node()]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_bc, [SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_bc, [SlaveNode1]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]).
 
 three_nodes_register_unregister_and_monitor_default_scope(Config) ->
     %% get slaves
@@ -329,7 +348,7 @@ three_nodes_register_unregister_and_monitor_default_scope(Config) ->
     ok = syn:start(),
     ok = rpc:call(SlaveNode1, syn, start, []),
     ok = rpc:call(SlaveNode2, syn, start, []),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% start processes
     Pid = syn_test_suite_helper:start_process(),
@@ -359,7 +378,7 @@ three_nodes_register_unregister_and_monitor_default_scope(Config) ->
     ok = syn:register({"my proc alias"}, Pid), %% same pid, different name
     ok = syn:register(<<"my proc with meta">>, PidWithMeta, {meta, <<"meta">>}), %% pid with meta
     ok = syn:register({remote_pid_on, slave_1}, PidRemoteOn1), %% remote on slave 1
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% errors
     {error, taken} = syn:register(<<"my proc">>, PidRemoteOn1),
@@ -386,7 +405,7 @@ three_nodes_register_unregister_and_monitor_default_scope(Config) ->
     %% re-register to edit meta
     ok = syn:register(<<"my proc with meta">>, PidWithMeta, {meta2, <<"meta2">>}),
     ok = rpc:call(SlaveNode2, syn, register, [{remote_pid_on, slave_1}, PidRemoteOn1, added_meta]), %% updated on slave 2
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% retrieve
     {PidWithMeta, {meta2, <<"meta2">>}} = syn:lookup(<<"my proc with meta">>),
@@ -402,14 +421,14 @@ three_nodes_register_unregister_and_monitor_default_scope(Config) ->
 
     %% crash scope process to ensure that monitors get recreated
     exit(whereis(syn_registry_default), kill),
-    timer:sleep(250), %$ wait for sup to restart it
+    timer:sleep(500), %$ wait for sup to restart it
 
     %% kill process
     syn_test_suite_helper:kill_process(Pid),
     syn_test_suite_helper:kill_process(PidRemoteOn1),
     %% unregister process
     ok = syn:unregister(<<"my proc with meta">>),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% retrieve
     undefined = syn:lookup(<<"my proc">>),
@@ -436,7 +455,7 @@ three_nodes_register_unregister_and_monitor_default_scope(Config) ->
     Pid1 = syn_test_suite_helper:start_process(),
     Pid2 = syn_test_suite_helper:start_process(),
     ok = syn:register(<<"my proc">>, Pid1),
-    timer:sleep(250),
+    timer:sleep(500),
     syn_registry:remove_from_local_table(default, <<"my proc">>, Pid1),
     syn_registry:add_to_local_table(default, <<"my proc">>, Pid2, undefined, 0, undefined),
     {error, race_condition} = rpc:call(SlaveNode1, syn, unregister, [<<"my proc">>]).
@@ -450,13 +469,13 @@ three_nodes_register_unregister_and_monitor_custom_scope(Config) ->
     ok = syn:start(),
     ok = rpc:call(SlaveNode1, syn, start, []),
     ok = rpc:call(SlaveNode2, syn, start, []),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% add custom scopes
     ok = syn:add_node_to_scope(custom_scope_ab),
     ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[custom_scope_ab, custom_scope_bc]]),
     ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[custom_scope_bc]]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% start processes
     Pid = syn_test_suite_helper:start_process(),
@@ -507,7 +526,7 @@ three_nodes_register_unregister_and_monitor_custom_scope(Config) ->
     {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:register(custom_scope_bc, "scope_a", Pid),
     {'EXIT', {{invalid_scope, non_existent_scope}, _}} = catch syn:register(non_existent_scope, "scope_a", Pid),
     ok = rpc:call(SlaveNode2, syn, register, [custom_scope_bc, {remote_scoped_bc}, PidRemoteWithMetaOn1, <<"with_meta 1">>]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% errors
     {error, taken} = syn:register(custom_scope_ab, "scope_a", PidWithMeta),
@@ -555,14 +574,14 @@ three_nodes_register_unregister_and_monitor_custom_scope(Config) ->
 
     %% re-register to edit meta
     ok = syn:register(custom_scope_ab, "scope_a_alias", PidWithMeta, <<"with_meta_updated">>),
-    timer:sleep(250),
+    timer:sleep(500),
     {PidWithMeta, <<"with_meta_updated">>} = syn:lookup(custom_scope_ab, "scope_a_alias"),
     {PidWithMeta, <<"with_meta_updated">>} = rpc:call(SlaveNode1, syn, lookup, [custom_scope_ab, "scope_a_alias"]),
     {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, lookup, [custom_scope_ab, "scope_a_alias"]),
 
     %% crash scope process to ensure that monitors get recreated
     exit(whereis(syn_registry_custom_scope_ab), kill),
-    timer:sleep(250), %$ wait for sup to restart it
+    timer:sleep(500), %$ wait for sup to restart it
 
     %% kill process
     syn_test_suite_helper:kill_process(Pid),
@@ -571,7 +590,7 @@ three_nodes_register_unregister_and_monitor_custom_scope(Config) ->
     {error, undefined} = catch syn:unregister(<<"my proc with meta">>),
     {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:unregister(custom_scope_bc, <<"my proc with meta">>),
     ok = rpc:call(SlaveNode1, syn, unregister, [custom_scope_bc, {remote_scoped_bc}]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% retrieve
     undefined = syn:lookup("scope_a"),
@@ -618,7 +637,7 @@ three_nodes_register_unregister_and_monitor_custom_scope(Config) ->
     Pid1 = syn_test_suite_helper:start_process(),
     Pid2 = syn_test_suite_helper:start_process(),
     ok = syn:register(custom_scope_ab, <<"my proc">>, Pid1),
-    timer:sleep(250),
+    timer:sleep(500),
     syn_registry:remove_from_local_table(custom_scope_ab, <<"my proc">>, Pid1),
     syn_registry:add_to_local_table(custom_scope_ab, <<"my proc">>, Pid2, undefined, 0, undefined),
     {error, race_condition} = rpc:call(SlaveNode1, syn, unregister, [custom_scope_ab, <<"my proc">>]).
@@ -638,7 +657,7 @@ three_nodes_cluster_changes(Config) ->
     %% add custom scopes
     ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[custom_scope_bc]]),
     ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[custom_scope_bc]]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% start processes
     PidRemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1),
@@ -649,13 +668,13 @@ three_nodes_cluster_changes(Config) ->
     ok = rpc:call(SlaveNode1, syn, register, ["proc-2", PidRemoteOn2, "meta-2"]),
     ok = rpc:call(SlaveNode1, syn, register, [custom_scope_bc, "BC-proc-1", PidRemoteOn1, "meta-1"]),
     ok = rpc:call(SlaveNode1, syn, register, [custom_scope_bc, "BC-proc-1 alias", PidRemoteOn1, "meta-1 alias"]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% form full cluster
     ok = syn:start(),
     rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
-    syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1, SlaveNode2]),
-    timer:sleep(250),
+    syn_test_suite_helper:wait_cluster_mesh_connected([node(), SlaveNode1, SlaveNode2]),
+    timer:sleep(500),
 
     %% retrieve
     {PidRemoteOn1, "meta-1"} = syn:lookup("proc-1"),
@@ -695,7 +714,7 @@ three_nodes_cluster_changes(Config) ->
 
     %% partial netsplit (1 cannot see 2)
     rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% retrieve
     {PidRemoteOn1, "meta-1"} = syn:lookup("proc-1"),
@@ -735,8 +754,8 @@ three_nodes_cluster_changes(Config) ->
 
     %% re-join
     rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
-    syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1, SlaveNode2]),
-    timer:sleep(250),
+    syn_test_suite_helper:wait_cluster_mesh_connected([node(), SlaveNode1, SlaveNode2]),
+    timer:sleep(500),
 
     %% retrieve
     {PidRemoteOn1, "meta-1"} = syn:lookup("proc-1"),
@@ -787,11 +806,11 @@ three_nodes_cluster_conflicts(Config) ->
     %% add custom scopes
     ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[custom_scope_bc]]),
     ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[custom_scope_bc]]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% partial netsplit (1 cannot see 2)
     rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
-    timer:sleep(250),
+    timer:sleep(500),
 
     %% start conflict processes
     Pid2RemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1),
@@ -805,8 +824,8 @@ three_nodes_cluster_conflicts(Config) ->
 
     %% re-join
     rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
-    syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1, SlaveNode2]),
-    timer:sleep(250),
+    syn_test_suite_helper:wait_cluster_mesh_connected([node(), SlaveNode1, SlaveNode2]),
+    timer:sleep(500),
 
     %% retrieve
     {Pid2RemoteOn2, "meta-2"} = syn:lookup("proc-confict"),
@@ -840,7 +859,7 @@ three_nodes_cluster_conflicts(Config) ->
     Pid2 = syn_test_suite_helper:start_process(SlaveNode1),
     rpc:call(SlaveNode1, syn_registry, add_to_local_table, [default, <<"my proc">>, Pid2, "meta-2", erlang:system_time(), undefined]),
     ok = syn:register(<<"my proc">>, Pid1, "meta-1"),
-    timer:sleep(250),
+    timer:sleep(500),
     {Pid1, "meta-1"} = syn:lookup(<<"my proc">>),
     {Pid1, "meta-1"} = rpc:call(SlaveNode1, syn, lookup, [<<"my proc">>]),
     {Pid1, "meta-1"} = rpc:call(SlaveNode2, syn, lookup, [<<"my proc">>]),
@@ -851,13 +870,13 @@ three_nodes_cluster_conflicts(Config) ->
     PidCustom2 = syn_test_suite_helper:start_process(SlaveNode2),
     rpc:call(SlaveNode2, syn_registry, add_to_local_table, [custom_scope_bc, <<"my proc">>, PidCustom2, "meta-2", erlang:system_time(), undefined]),
     ok = rpc:call(SlaveNode1, syn, register, [custom_scope_bc, <<"my proc">>, PidCustom1, "meta-1"]),
-    timer:sleep(250),
+    timer:sleep(500),
     {PidCustom1, "meta-1"} = rpc:call(SlaveNode1, syn, lookup, [custom_scope_bc, <<"my proc">>]),
     {PidCustom1, "meta-1"} = rpc:call(SlaveNode2, syn, lookup, [custom_scope_bc, <<"my proc">>]),
     true = rpc:call(SlaveNode1, erlang, is_process_alive, [PidCustom1]),
     false = rpc:call(SlaveNode2, erlang, is_process_alive, [PidCustom2]).
 
-three_nodes_custom_event_handler(Config) ->
+three_nodes_custom_event_handler_reg_unreg(Config) ->
     %% get slaves
     SlaveNode1 = proplists:get_value(slave_node_1, Config),
     SlaveNode2 = proplists:get_value(slave_node_2, Config),
@@ -871,61 +890,180 @@ three_nodes_custom_event_handler(Config) ->
     ok = syn:start(),
     ok = rpc:call(SlaveNode1, syn, start, []),
     ok = rpc:call(SlaveNode2, syn, start, []),
-    timer:sleep(250),
+    timer:sleep(500),
+
+    %% init
+    CurrentNode = node(),
 
     %% register test process to receive messages back from test handler
     global:register_name(syn_test_main_process, self()),
 
     %% start process
     Pid = syn_test_suite_helper:start_process(),
+
+    %% ---> on registration
     ok = syn:register("proc-handler", Pid, <<"my-meta">>),
 
     %% check callbacks on_process_registered called on all nodes
-    CurrentNode = node(),
     receive
         {on_process_registered, CurrentNode, default, "proc-handler", Pid, <<"my-meta">>} -> ok
     after 1000 ->
-        ok = on_process_registered_not_called_on_main_node
+        ct:fail({on_process_registered_not_called_on, node()})
     end,
     receive
         {on_process_registered, SlaveNode1, default, "proc-handler", Pid, <<"my-meta">>} -> ok
     after 1000 ->
-        ok = on_process_registered_not_called_on_slave_1_node
+        ct:fail({on_process_registered_not_called_on, SlaveNode1})
     end,
     receive
         {on_process_registered, SlaveNode2, default, "proc-handler", Pid, <<"my-meta">>} -> ok
     after 1000 ->
-        ok = on_process_registered_not_called_on_slave_2_node
+        ct:fail({on_process_registered_not_called_on, SlaveNode2})
     end,
+    %% no other messages
+    {message_queue_len, 0} = process_info(global:whereis_name(syn_test_main_process), message_queue_len),
 
+    %% ---> on meta update
     ok = syn:register("proc-handler", Pid, <<"my-new-meta">>),
 
     %% check callbacks on_process_registered are called on nodes because of change of meta
     receive
         {on_process_registered, CurrentNode, default, "proc-handler", Pid, <<"my-new-meta">>} -> ok
     after 1000 ->
-        ok = on_process_registered_not_called_on_main_node
+        ct:fail({on_process_registered_not_called_on, node()})
     end,
     receive
         {on_process_registered, SlaveNode1, default, "proc-handler", Pid, <<"my-new-meta">>} -> ok
     after 1000 ->
-        ok = on_process_registered_not_called_on_slave_1_node
+        ct:fail({on_process_registered_not_called_on, SlaveNode1})
     end,
     receive
         {on_process_registered, SlaveNode2, default, "proc-handler", Pid, <<"my-new-meta">>} -> ok
     after 1000 ->
-        ok = on_process_registered_not_called_on_slave_2_node
+        ct:fail({on_process_registered_not_called_on, SlaveNode2})
+    end,
+    %% no other messages
+    {message_queue_len, 0} = process_info(global:whereis_name(syn_test_main_process), message_queue_len),
+
+    %% ---> on unregister
+    ok = syn:unregister("proc-handler"),
+
+    %% check callbacks on_process_unregistered called on all nodes
+    receive
+        {on_process_unregistered, CurrentNode, default, "proc-handler", Pid, <<"my-new-meta">>} -> ok
+    after 1000 ->
+        ct:fail({on_process_unregistered_not_called_on, node()})
+    end,
+    receive
+        {on_process_unregistered, SlaveNode1, default, "proc-handler", Pid, <<"my-new-meta">>} -> ok
+    after 1000 ->
+        ct:fail({on_process_unregistered_not_called_on, SlaveNode1})
+    end,
+    receive
+        {on_process_unregistered, SlaveNode2, default, "proc-handler", Pid, <<"my-new-meta">>} -> ok
+    after 1000 ->
+        ct:fail({on_process_unregistered_not_called_on, SlaveNode2})
+    end,
+    %% no other messages
+    {message_queue_len, 0} = process_info(global:whereis_name(syn_test_main_process), message_queue_len),
+
+    %% clean
+    syn_test_suite_helper:kill_process(Pid),
+    timer:sleep(500),
+    syn_test_suite_helper:flush_inbox(),
+
+    %% ---> after a netsplit
+    PidRemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1),
+    syn:register(remote_on_1, PidRemoteOn1, {some, meta}),
+    timer:sleep(500),
+
+    %% flush inbox
+    syn_test_suite_helper:flush_inbox(),
+
+    %% partial netsplit (main cannot see 1)
+    syn_test_suite_helper:disconnect_node(SlaveNode1),
+    timer:sleep(500),
+
+    %% check callbacks on_process_unregistered called on all nodes
+    receive
+        {on_process_unregistered, CurrentNode, default, remote_on_1, PidRemoteOn1, {some, meta}} -> ok
+    after 1000 ->
+        ct:fail({on_process_unregistered_not_called_on, node()})
     end,
+    %% no other messages
+    {message_queue_len, 0} = process_info(global:whereis_name(syn_test_main_process), message_queue_len),
+
+    %% ---> after a re-join
+    %% re-join
+    syn_test_suite_helper:connect_node(SlaveNode1),
+    timer:sleep(500),
+
+    %% check callbacks on_process_registered called on all nodes
+    receive
+        {on_process_registered, CurrentNode, default, remote_on_1, PidRemoteOn1, {some, meta}} -> ok
+    after 1000 ->
+        ct:fail({on_process_registered_not_called_on, node()})
+    end,
+    %% no other messages
+    {message_queue_len, 0} = process_info(global:whereis_name(syn_test_main_process), message_queue_len),
+
+    %% clean
+    syn_test_suite_helper:kill_process(PidRemoteOn1),
+    timer:sleep(1000),
+    syn_test_suite_helper:flush_inbox(),
+
+    %% ---> after a conflict resolution
+    %% partial netsplit (1 cannot see 2)
+    rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
+    timer:sleep(1000),
+
+    %% check
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode1, default, [node()]),
+    syn_test_suite_helper:assert_scope_subcluster(SlaveNode2, default, [node()]),
+
+    %% start conflict processes
+    Pid2RemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2RemoteOn2 = syn_test_suite_helper:start_process(SlaveNode2),
+    ok = rpc:call(SlaveNode1, syn, register, ["proc-confict", Pid2RemoteOn1, <<"meta-1">>]),
+    ok = rpc:call(SlaveNode2, syn, register, ["proc-confict", Pid2RemoteOn2, <<"meta-2">>]),
+    timer:sleep(500),
+
+    %% flush inbox
+    syn_test_suite_helper:flush_inbox(),
+
+    %% re-join
+    rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
+    syn_test_suite_helper:wait_cluster_mesh_connected([node(), SlaveNode1, SlaveNode2]),
+    timer:sleep(500),
+
+    %% check callbacks on_process_unregistered called on all nodes
+    receive
+        {on_process_unregistered, SlaveNode1, default, "proc-confict", Pid2RemoteOn1, <<"meta-1">>} -> ok
+    after 1000 ->
+        ct:fail({on_process_unregistered_not_called_on, SlaveNode1})
+    end,
+    receive
+        {on_process_registered, SlaveNode1, default, "proc-confict", Pid2RemoteOn2, <<"meta-2">>} -> ok
+    after 1000 ->
+        ct:fail({on_process_registered_not_called_on, SlaveNode1})
+    end,
+    %% no other messages
+    {message_queue_len, 0} = process_info(global:whereis_name(syn_test_main_process), message_queue_len),
 
     %% clean
+    syn_test_suite_helper:kill_process(Pid2RemoteOn1),
+    syn_test_suite_helper:kill_process(Pid2RemoteOn2),
+    timer:sleep(500),
+    syn_test_suite_helper:flush_inbox(),
+
+    %% ---> don't call on monitor rebuild
+    %% crash the scope process on local
+    syn_test_suite_helper:kill_process(syn_registry_default),
+    timer:sleep(500),
+
+    %% no messages
+    {message_queue_len, 0} = process_info(global:whereis_name(syn_test_main_process), message_queue_len),
+
+    %% unregister test process
     global:unregister_name(syn_test_main_process).
 
-%% ===================================================================
-%% Internal
-%% ===================================================================
-assert_scope_subcluster(Node, Scope, ExpectedNodes) ->
-    NodesMap = rpc:call(Node, syn_registry, get_subcluster_nodes, [Scope]),
-    Nodes = maps:keys(NodesMap),
-    ExpectedCount = length(ExpectedNodes),
-    ExpectedCount = length(Nodes),
-    lists:foreach(fun(RemoteNode) -> true = lists:member(RemoteNode, Nodes) end, ExpectedNodes).

+ 4 - 0
test/syn_test_event_handler.erl

@@ -27,10 +27,14 @@
 -behaviour(syn_event_handler).
 
 -export([on_process_registered/4]).
+-export([on_process_unregistered/4]).
 
 on_process_registered(Scope, Name, Pid, Meta) ->
     global:send(syn_test_main_process, {on_process_registered, node(), Scope, Name, Pid, Meta}).
 
+on_process_unregistered(Scope, Name, Pid, Meta) ->
+    global:send(syn_test_main_process, {on_process_unregistered, node(), Scope, Name, Pid, Meta}).
+
 %%-export([resolve_registry_conflict/4]).
 %%
 %%-spec resolve_registry_conflict(

+ 50 - 9
test/syn_test_suite_helper.erl

@@ -29,11 +29,13 @@
 -export([start_slave/1, start_slave/4]).
 -export([stop_slave/1, stop_slave/2]).
 -export([connect_node/1, disconnect_node/1]).
+-export([use_custom_handler/0]).
 -export([clean_after_test/0]).
 -export([start_process/0, start_process/1, start_process/2]).
 -export([kill_process/1]).
--export([wait_cluster_connected/1]).
--export([use_custom_handler/0]).
+-export([flush_inbox/0]).
+-export([wait_cluster_mesh_connected/1]).
+-export([assert_scope_subcluster/3]).
 -export([send_error_logger_to_disk/0]).
 
 %% internal
@@ -68,6 +70,9 @@ connect_node(Node) ->
 disconnect_node(Node) ->
     erlang:disconnect_node(Node).
 
+use_custom_handler() ->
+    application:set_env(syn, event_handler, syn_test_event_handler).
+
 clean_after_test() ->
     Nodes = [node() | nodes()],
     %% shutdown
@@ -96,9 +101,16 @@ kill_process(Pid) when is_pid(Pid) ->
 kill_process(RegisteredName) when is_atom(RegisteredName) ->
     exit(whereis(RegisteredName), kill).
 
-wait_cluster_connected(Nodes) ->
-    wait_cluster_connected(Nodes, os:system_time(millisecond)).
-wait_cluster_connected(Nodes, StartAt) ->
+flush_inbox() ->
+    receive
+        _ -> flush_inbox()
+    after
+        0 -> ok
+    end.
+
+wait_cluster_mesh_connected(Nodes) ->
+    wait_cluster_mesh_connected(Nodes, os:system_time(millisecond)).
+wait_cluster_mesh_connected(Nodes, StartAt) ->
     AllSynced = lists:all(fun(Node) ->
         RemoteNodes = rpc:call(Node, erlang, nodes, []),
         AllNodes = [Node | RemoteNodes],
@@ -114,13 +126,37 @@ wait_cluster_connected(Nodes, StartAt) ->
                 true ->
                     {error, {could_not_init_cluster, Nodes}};
                 false ->
-                    timer:sleep(1000),
-                    wait_cluster_connected(Nodes, StartAt)
+                    timer:sleep(100),
+                    wait_cluster_mesh_connected(Nodes, StartAt)
             end
     end.
 
-use_custom_handler() ->
-    application:set_env(syn, event_handler, syn_test_event_handler).
+assert_scope_subcluster(Node, Scope, ExpectedNodes) ->
+    NodesMap = rpc:call(Node, syn_registry, get_subcluster_nodes, [Scope]),
+    Nodes = maps:keys(NodesMap),
+    ExpectedCount = length(ExpectedNodes),
+    %% count nodes
+    case length(Nodes) of
+        ExpectedCount ->
+            ok;
+
+        _ ->
+            ct:fail("~n\tInvalid subcluster~n\tExpected: ~p~n\tActual: ~p~n\tLine: ~p~n",
+                [ExpectedNodes, Nodes, get_line_from_stacktrace(2)]
+            )
+    end,
+    %% loop nodes
+    lists:foreach(fun(RemoteNode) ->
+        case lists:member(RemoteNode, Nodes) of
+            true ->
+                ok;
+
+            _ ->
+                ct:fail("~n\tInvalid subcluster~n\tExpected: ~p~n\tActual: ~p~n\tLine: ~p~n",
+                    [ExpectedNodes, Nodes, get_line_from_stacktrace(3)]
+                )
+        end
+    end, ExpectedNodes).
 
 send_error_logger_to_disk() ->
     error_logger:logfile({open, atom_to_list(node())}).
@@ -132,3 +168,8 @@ process_main() ->
     receive
         _ -> process_main()
     end.
+
+get_line_from_stacktrace(Position) ->
+    {current_stacktrace, Stacktrace} = process_info(self(), current_stacktrace),
+    {_, _, _, FileInfo} = lists:nth(Position, Stacktrace),
+    proplists:get_value(line, FileInfo).