Browse Source

Add on_process_left callback.

Roberto Ostinelli 3 years ago
parent
commit
9e4b516f87
4 changed files with 121 additions and 27 deletions
  1. 10 1
      src/syn_event_handler.erl
  2. 28 20
      src/syn_groups.erl
  3. 79 6
      test/syn_groups_SUITE.erl
  4. 4 0
      test/syn_test_event_handler_callbacks.erl

+ 10 - 1
src/syn_event_handler.erl

@@ -31,7 +31,7 @@
 %% API
 -export([ensure_event_handler_loaded/0]).
 -export([do_on_process_registered/4, do_on_process_unregistered/4]).
--export([do_on_process_joined/4]).
+-export([do_on_process_joined/4, do_on_process_left/4]).
 -export([do_resolve_registry_conflict/4]).
 
 -callback on_process_registered(
@@ -97,6 +97,15 @@ do_on_process_joined(_Scope, _GroupName, {TableMeta}, {_Pid, Meta})
 do_on_process_joined(Scope, GroupName, {_TableMeta}, {Pid, Meta}) ->
     call_callback_event(on_process_joined, Scope, GroupName, Pid, Meta).
 
+-spec do_on_process_left(
+    Scope :: atom(),
+    Name :: any(),
+    TablePid :: pid(),
+    TableMeta :: any()
+) -> any().
+do_on_process_left(Scope, GroupName, Pid, Meta) ->
+    call_callback_event(on_process_left, Scope, GroupName, Pid, Meta).
+
 -spec do_resolve_registry_conflict(
     Scope :: atom(),
     Name :: any(),

+ 28 - 20
src/syn_groups.erl

@@ -117,11 +117,11 @@ leave(Scope, GroupName, Pid) ->
         TableByName ->
             Node = node(Pid),
             case syn_gen_scope:call(?MODULE, Node, Scope, {leave_on_owner, node(), GroupName, Pid}) of
-                {ok, TableByPid} when Node =/= node() ->
+                {ok, {TableMeta, TableByPid}} when Node =/= node() ->
                     %% remove table on caller node immediately so that subsequent calls have an updated registry
                     remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
                     %% callback
-                    %%syn_event_handler:do_on_process_left(Scope, GroupName, Pid, Meta),
+                    syn_event_handler:do_on_process_left(Scope, GroupName, Pid, TableMeta),
                     %% return
                     ok;
 
@@ -199,13 +199,14 @@ handle_call({join_on_owner, RequesterNode, GroupName, Pid, Meta}, _From, #state{
                 undefined -> erlang:monitor(process, Pid);  %% process is not monitored yet, add
                 MRef0 -> MRef0
             end,
-            %% add to local table
-            Time = erlang:system_time(),
-            add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
+            %% get table meta
             TableMeta = case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
                 {{_, M}, _, _, _, _} -> M;
                 _ -> undefined
             end,
+            %% add to local table
+            Time = erlang:system_time(),
+            add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
             %% callback
             syn_event_handler:do_on_process_joined(Scope, GroupName, {TableMeta}, {Pid, Meta}),
             %% broadcast
@@ -218,6 +219,7 @@ handle_call({join_on_owner, RequesterNode, GroupName, Pid, Meta}, _From, #state{
     end;
 
 handle_call({leave_on_owner, RequesterNode, GroupName, Pid}, _From, #state{
+    scope = Scope,
     table_by_name = TableByName,
     table_by_pid = TableByPid
 } = State) ->
@@ -225,15 +227,17 @@ handle_call({leave_on_owner, RequesterNode, GroupName, Pid}, _From, #state{
         undefined ->
             {reply, {{error, not_in_group}, undefined}, State};
 
-        _ ->
+        {{_, _}, TableMeta, _, _, _} ->
             %% is this the last group process is in?
             maybe_demonitor(Pid, TableByPid),
             %% remove from table
             remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
+            %% callback
+            syn_event_handler:do_on_process_left(Scope, GroupName, Pid, TableMeta),
             %% broadcast
-            syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid}, [RequesterNode], State),
+            syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, TableMeta}, [RequesterNode], State),
             %% return
-            {reply, {ok, TableByPid}, State}
+            {reply, {ok, {TableMeta, TableByPid}}, State}
     end;
 
 handle_call(Request, From, State) ->
@@ -251,16 +255,20 @@ handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time}, State) ->
     handle_groups_sync(GroupName, Pid, Meta, Time, State),
     {noreply, State};
 
-handle_info({'3.0', sync_leave, GroupName, Pid}, #state{
+handle_info({'3.0', sync_leave, GroupName, Pid, Meta}, #state{
+    scope = Scope,
     table_by_name = TableByName,
     table_by_pid = TableByPid
 } = State) ->
     %% remove from table
     remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
+    %% callback
+    syn_event_handler:do_on_process_left(Scope, GroupName, Pid, Meta),
     %% return
     {noreply, State};
 
 handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
+    scope = Scope,
     table_by_name = TableByName,
     table_by_pid = TableByPid
 } = State) ->
@@ -272,13 +280,13 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
             );
 
         Entries ->
-            lists:foreach(fun({{_Pid, GroupName}, _Meta, _, _, _}) ->
+            lists:foreach(fun({{_Pid, GroupName}, Meta, _, _, _}) ->
                 %% remove from table
                 remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
                 %% callback
-                %%syn_event_handler:do_on_process_left(Scope, GroupName, Pid, Meta),
+                syn_event_handler:do_on_process_left(Scope, GroupName, Pid, Meta),
                 %% broadcast
-                syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid}, State)
+                syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta}, State)
             end, Entries)
     end,
     %% return
@@ -425,14 +433,14 @@ remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
     true = ets:delete(TableByPid, {Pid, GroupName}).
 
 -spec purge_groups_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
-purge_groups_for_remote_node(_Scope, Node, TableByName, TableByPid) ->
-%%    %% loop elements for callback in a separate process to free scope process
-%%    GroupsTuples = get_groups_tuples_for_node(Node, TableByName),
-%%    spawn(fun() ->
-%%        lists:foreach(fun({Name, Pid, Meta, _Time}) ->
-%%            syn_event_handler:do_on_process_left(Scope, Name, Pid, Meta)
-%%        end, GroupsTuples)
-%%    end),
+purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid) ->
+    %% loop elements for callback in a separate process to free scope process
+    GroupsTuples = get_groups_tuples_for_node(Node, TableByName),
+    spawn(fun() ->
+        lists:foreach(fun({GroupName, Pid, Meta, _Time}) ->
+            syn_event_handler:do_on_process_left(Scope, GroupName, Pid, Meta)
+        end, GroupsTuples)
+    end),
     ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
     ets:match_delete(TableByPid, {{'_', '_'}, '_', '_', '_', Node}).
 

+ 79 - 6
test/syn_groups_SUITE.erl

@@ -76,11 +76,11 @@ all() ->
 groups() ->
     [
         {three_nodes_groups, [shuffle], [
-%%            three_nodes_discover_default_scope,
-%%            three_nodes_discover_custom_scope,
-%%            three_nodes_join_leave_and_monitor_default_scope,
-%%            three_nodes_join_leave_and_monitor_custom_scope,
-%%            three_nodes_cluster_changes,
+            three_nodes_discover_default_scope,
+            three_nodes_discover_custom_scope,
+            three_nodes_join_leave_and_monitor_default_scope,
+            three_nodes_join_leave_and_monitor_custom_scope,
+            three_nodes_cluster_changes,
             three_nodes_custom_event_handler_joined_left
         ]}
     ].
@@ -984,4 +984,77 @@ three_nodes_custom_event_handler_joined_left(Config) ->
         {on_process_joined, SlaveNode1, default, "my-group", Pid, <<"new-meta">>},
         {on_process_joined, SlaveNode2, default, "my-group", Pid, <<"new-meta">>}
     ]),
-    syn_test_suite_helper:assert_empty_queue(self()).
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% ---> on left
+    ok = syn:leave("my-group", Pid),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_process_left, CurrentNode, default, "my-group", Pid, <<"new-meta">>},
+        {on_process_left, SlaveNode1, default, "my-group", Pid, <<"new-meta">>},
+        {on_process_left, SlaveNode2, default, "my-group", Pid, <<"new-meta">>}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% clean & check
+    syn_test_suite_helper:kill_process(Pid),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% ---> after a netsplit
+    PidRemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1),
+    syn:join(remote_on_1, PidRemoteOn1, {recipient, self(), <<"netsplit">>}),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_process_joined, CurrentNode, default, remote_on_1, PidRemoteOn1, <<"netsplit">>},
+        {on_process_joined, SlaveNode1, default, remote_on_1, PidRemoteOn1, <<"netsplit">>},
+        {on_process_joined, SlaveNode2, default, remote_on_1, PidRemoteOn1, <<"netsplit">>}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% partial netsplit (1 cannot see 2)
+    rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
+    syn_test_suite_helper:assert_cluster(node(), [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_cluster(SlaveNode1, [node()]),
+    syn_test_suite_helper:assert_cluster(SlaveNode2, [node()]),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_process_left, SlaveNode2, default, remote_on_1, PidRemoteOn1, <<"netsplit">>}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% ---> after a re-join
+    %% re-join
+    rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
+    syn_test_suite_helper:assert_cluster(node(), [SlaveNode1, SlaveNode2]),
+    syn_test_suite_helper:assert_cluster(SlaveNode1, [node(), SlaveNode2]),
+    syn_test_suite_helper:assert_cluster(SlaveNode2, [node(), SlaveNode1]),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_process_joined, SlaveNode2, default, remote_on_1, PidRemoteOn1, <<"netsplit">>}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% clean
+    syn_test_suite_helper:kill_process(PidRemoteOn1),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_process_left, CurrentNode, default, remote_on_1, PidRemoteOn1, <<"netsplit">>},
+        {on_process_left, SlaveNode1, default, remote_on_1, PidRemoteOn1, <<"netsplit">>},
+        {on_process_left, SlaveNode2, default, remote_on_1, PidRemoteOn1, <<"netsplit">>}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% ---> don't call on monitor rebuild
+    %% crash the scope process on local
+    syn_test_suite_helper:kill_process(syn_groups_default),
+
+    %% no messages
+    syn_test_suite_helper:assert_wait(
+        ok,
+        fun() -> syn_test_suite_helper:assert_empty_queue(self()) end
+    ).

+ 4 - 0
test/syn_test_event_handler_callbacks.erl

@@ -29,6 +29,7 @@
 -export([on_process_registered/4]).
 -export([on_process_unregistered/4]).
 -export([on_process_joined/4]).
+-export([on_process_left/4]).
 
 on_process_registered(Scope, Name, Pid, {recipient, RecipientPid, AdditionalMeta}) ->
     RecipientPid ! {on_process_registered, node(), Scope, Name, Pid, AdditionalMeta}.
@@ -38,3 +39,6 @@ on_process_unregistered(Scope, Name, Pid, {recipient, RecipientPid, AdditionalMe
 
 on_process_joined(Scope, GroupName, Pid, {recipient, RecipientPid, AdditionalMeta}) ->
     RecipientPid ! {on_process_joined, node(), Scope, GroupName, Pid, AdditionalMeta}.
+
+on_process_left(Scope, GroupName, Pid, {recipient, RecipientPid, AdditionalMeta}) ->
+    RecipientPid ! {on_process_left, node(), Scope, GroupName, Pid, AdditionalMeta}.