Просмотр исходного кода

Add on process joined callback.

Roberto Ostinelli 3 лет назад
Родитель
Сommit
bc33e57747
4 измененных файлов с 81 добавлено и 17 удалено
  1. 13 2
      src/syn_event_handler.erl
  2. 13 9
      src/syn_groups.erl
  3. 51 6
      test/syn_groups_SUITE.erl
  4. 4 0
      test/syn_test_event_handler_callbacks.erl

+ 13 - 2
src/syn_event_handler.erl

@@ -30,8 +30,8 @@
 
 %% API
 -export([ensure_event_handler_loaded/0]).
--export([do_on_process_registered/4]).
--export([do_on_process_unregistered/4]).
+-export([do_on_process_registered/4, do_on_process_unregistered/4]).
+-export([do_on_process_joined/4]).
 -export([do_resolve_registry_conflict/4]).
 
 -callback on_process_registered(
@@ -86,6 +86,17 @@ do_on_process_registered(Scope, Name, {_TablePid, _TableMeta}, {Pid, Meta}) ->
 do_on_process_unregistered(Scope, Name, Pid, Meta) ->
     call_callback_event(on_process_unregistered, Scope, Name, Pid, Meta).
 
+-spec do_on_process_joined(
+    Scope :: atom(),
+    Name :: any(),
+    {TablePid :: pid() | undefined, TableMeta :: any()},
+    {Pid :: pid(), Meta :: any()}
+) -> any().
+do_on_process_joined(_Scope, _GroupName, {TableMeta}, {_Pid, Meta})
+    when TableMeta =:= Meta -> ok;
+do_on_process_joined(Scope, GroupName, {_TableMeta}, {Pid, Meta}) ->
+    call_callback_event(on_process_joined, Scope, GroupName, Pid, Meta).
+
 -spec do_resolve_registry_conflict(
     Scope :: atom(),
     Name :: any(),

+ 13 - 9
src/syn_groups.erl

@@ -92,11 +92,11 @@ join(Scope, GroupName, Pid) when is_pid(Pid) ->
 join(Scope, GroupName, Pid, Meta) ->
     Node = node(Pid),
     case syn_gen_scope:call(?MODULE, Node, Scope, {join_on_owner, node(), GroupName, Pid, Meta}) of
-        {ok, {Time, TableByName, TableByPid}} when Node =/= node() ->
+        {ok, {TableMeta, Time, TableByName, TableByPid}} when Node =/= node() ->
             %% update table on caller node immediately so that subsequent calls have an updated registry
             add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
             %% callback
-            %%syn_event_handler:do_on_process_joined(Scope, GroupName, {TablePid, TableMeta}, {Pid, Meta}),
+            syn_event_handler:do_on_process_joined(Scope, GroupName, {TableMeta}, {Pid, Meta}),
             %% return
             ok;
 
@@ -188,6 +188,7 @@ init(State) ->
     {stop, Reason :: term(), Reply :: term(), #state{}} |
     {stop, Reason :: term(), #state{}}.
 handle_call({join_on_owner, RequesterNode, GroupName, Pid, Meta}, _From, #state{
+    scope = Scope,
     table_by_name = TableByName,
     table_by_pid = TableByPid
 } = State) ->
@@ -201,12 +202,16 @@ handle_call({join_on_owner, RequesterNode, GroupName, Pid, Meta}, _From, #state{
             %% add to local table
             Time = erlang:system_time(),
             add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
+            TableMeta = case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
+                {{_, M}, _, _, _, _} -> M;
+                _ -> undefined
+            end,
             %% callback
-            %%syn_event_handler:do_on_process_joined(Scope, GroupName, {undefined, undefined}, {Pid, Meta}),
+            syn_event_handler:do_on_process_joined(Scope, GroupName, {TableMeta}, {Pid, Meta}),
             %% broadcast
             syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time}, [RequesterNode], State),
             %% return
-            {reply, {ok, {Time, TableByName, TableByPid}}, State};
+            {reply, {ok, {TableMeta, Time, TableByName, TableByPid}}, State};
 
         false ->
             {reply, {{error, not_alive}, undefined}, State}
@@ -439,6 +444,7 @@ purge_groups_for_remote_node(_Scope, Node, TableByName, TableByPid) ->
     #state{}
 ) -> any().
 handle_groups_sync(GroupName, Pid, Meta, Time, #state{
+    scope = Scope,
     table_by_name = TableByName,
     table_by_pid = TableByPid
 }) ->
@@ -447,15 +453,13 @@ handle_groups_sync(GroupName, Pid, Meta, Time, #state{
             %% new
             add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
             %% callback
-            %%syn_event_handler:do_on_process_joined(Scope, GroupName, {undefined, undefined}, {Pid, Meta});
-            ok;
+            syn_event_handler:do_on_process_joined(Scope, GroupName, {undefined}, {Pid, Meta});
 
-        {{GroupName, Pid}, _TableMeta, TableTime, _MRef, _TableNode} when Time > TableTime ->
+        {{GroupName, Pid}, TableMeta, TableTime, _MRef, _TableNode} when Time > TableTime ->
             %% update meta
             add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
             %% callback
-            %%syn_event_handler:do_on_process_joined(Scope, GroupName, {undefined, undefined}, {Pid, Meta});
-            ok;
+            syn_event_handler:do_on_process_joined(Scope, GroupName, {TableMeta}, {Pid, Meta});
 
         {{GroupName, Pid}, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
             %% race condition: incoming data is older, ignore

+ 51 - 6
test/syn_groups_SUITE.erl

@@ -37,7 +37,8 @@
     three_nodes_discover_custom_scope/1,
     three_nodes_join_leave_and_monitor_default_scope/1,
     three_nodes_join_leave_and_monitor_custom_scope/1,
-    three_nodes_cluster_changes/1
+    three_nodes_cluster_changes/1,
+    three_nodes_custom_event_handler_joined_left/1
 ]).
 
 %% include
@@ -75,11 +76,12 @@ all() ->
 groups() ->
     [
         {three_nodes_groups, [shuffle], [
-            three_nodes_discover_default_scope,
-            three_nodes_discover_custom_scope,
-            three_nodes_join_leave_and_monitor_default_scope,
-            three_nodes_join_leave_and_monitor_custom_scope,
-            three_nodes_cluster_changes
+%%            three_nodes_discover_default_scope,
+%%            three_nodes_discover_custom_scope,
+%%            three_nodes_join_leave_and_monitor_default_scope,
+%%            three_nodes_join_leave_and_monitor_custom_scope,
+%%            three_nodes_cluster_changes,
+            three_nodes_custom_event_handler_joined_left
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -940,3 +942,46 @@ three_nodes_cluster_changes(Config) ->
     0 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, node()]),
     1 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, SlaveNode1]),
     1 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, SlaveNode2]).
+
+three_nodes_custom_event_handler_joined_left(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+
+    %% add custom handler for callbacks
+    syn:set_event_handler(syn_test_event_handler_callbacks),
+    rpc:call(SlaveNode1, syn, set_event_handler, [syn_test_event_handler_callbacks]),
+    rpc:call(SlaveNode2, syn, set_event_handler, [syn_test_event_handler_callbacks]),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+
+    %% init
+    CurrentNode = node(),
+
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+
+    %% ---> on join
+    ok = syn:join("my-group", Pid, {recipient, self(), <<"meta">>}),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_process_joined, CurrentNode, default, "my-group", Pid, <<"meta">>},
+        {on_process_joined, SlaveNode1, default, "my-group", Pid, <<"meta">>},
+        {on_process_joined, SlaveNode2, default, "my-group", Pid, <<"meta">>}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% ---> on meta update
+    ok = syn:join("my-group", Pid, {recipient, self(), <<"new-meta">>}),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_process_joined, CurrentNode, default, "my-group", Pid, <<"new-meta">>},
+        {on_process_joined, SlaveNode1, default, "my-group", Pid, <<"new-meta">>},
+        {on_process_joined, SlaveNode2, default, "my-group", Pid, <<"new-meta">>}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()).

+ 4 - 0
test/syn_test_event_handler_callbacks.erl

@@ -28,9 +28,13 @@
 
 -export([on_process_registered/4]).
 -export([on_process_unregistered/4]).
+-export([on_process_joined/4]).
 
 on_process_registered(Scope, Name, Pid, {recipient, RecipientPid, AdditionalMeta}) ->
     RecipientPid ! {on_process_registered, node(), Scope, Name, Pid, AdditionalMeta}.
 
 on_process_unregistered(Scope, Name, Pid, {recipient, RecipientPid, AdditionalMeta}) ->
     RecipientPid ! {on_process_unregistered, node(), Scope, Name, Pid, AdditionalMeta}.
+
+on_process_joined(Scope, GroupName, Pid, {recipient, RecipientPid, AdditionalMeta}) ->
+    RecipientPid ! {on_process_joined, node(), Scope, GroupName, Pid, AdditionalMeta}.