Browse Source

Remove remote node data after a scope process crash to ensure consistency.

Roberto Ostinelli 3 years ago
parent
commit
6061316986
4 changed files with 108 additions and 50 deletions
  1. 36 18
      src/syn_pg.erl
  2. 43 27
      src/syn_registry.erl
  3. 11 0
      test/syn_pg_SUITE.erl
  4. 18 5
      test/syn_registry_SUITE.erl

+ 36 - 18
src/syn_pg.erl

@@ -234,11 +234,16 @@ multi_call_reply({Ref, CallerPid}, Reply) ->
 %% Init
 %% ----------------------------------------------------------------------------------------------------------
 -spec init(#state{}) -> {ok, HandlerState :: term()}.
-init(State) ->
-    HandlerState = #{},
-    %% rebuild
-    rebuild_monitors(State),
+init(#state{
+    scope = Scope,
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+}) ->
+    %% purge remote & rebuild
+    purge_groups_for_remote_nodes(Scope, TableByName, TableByPid),
+    rebuild_monitors(TableByName, TableByPid),
     %% init
+    HandlerState = #{},
     {ok, HandlerState}.
 
 %% ----------------------------------------------------------------------------------------------------------
@@ -395,19 +400,19 @@ purge_local_data_for_node(Node, #state{
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec rebuild_monitors(#state{}) -> ok.
-rebuild_monitors(#state{
-    table_by_name = TableByName
-} = State) ->
+-spec rebuild_monitors(TableByName :: atom(), TableByPid :: atom()) -> ok.
+rebuild_monitors(TableByName, TableByPid) ->
     GroupsTuples = get_groups_tuples_for_node(node(), TableByName),
-    do_rebuild_monitors(GroupsTuples, #{}, State).
+    do_rebuild_monitors(GroupsTuples, #{}, TableByName, TableByPid).
 
--spec do_rebuild_monitors([syn_pg_tuple()], #{pid() => reference()}, #state{}) -> ok.
-do_rebuild_monitors([], _, _) -> ok;
-do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, #state{
-    table_by_name = TableByName,
-    table_by_pid = TableByPid
-} = State) ->
+-spec do_rebuild_monitors(
+    [syn_pg_tuple()],
+    #{pid() => reference()},
+    TableByName :: atom(),
+    TableByPid :: atom()
+) -> ok.
+do_rebuild_monitors([], _, _, _) -> ok;
+do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, TableByName, TableByPid) ->
     remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
     case is_process_alive(Pid) of
         true ->
@@ -415,15 +420,15 @@ do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, #state{
                 error ->
                     MRef = erlang:monitor(process, Pid),
                     add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
-                    do_rebuild_monitors(T, maps:put(Pid, MRef, NewMRefs), State);
+                    do_rebuild_monitors(T, maps:put(Pid, MRef, NewMRefs), TableByName, TableByPid);
 
                 {ok, MRef} ->
                     add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
-                    do_rebuild_monitors(T, NewMRefs, State)
+                    do_rebuild_monitors(T, NewMRefs, TableByName, TableByPid)
             end;
 
         _ ->
-            do_rebuild_monitors(T, NewMRefs, State)
+            do_rebuild_monitors(T, NewMRefs, TableByName, TableByPid)
     end.
 
 -spec do_join_on_node(
@@ -541,6 +546,19 @@ remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
     true = ets:delete(TableByName, {GroupName, Pid}),
     true = ets:delete(TableByPid, {Pid, GroupName}).
 
+-spec purge_groups_for_remote_nodes(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> any().
+purge_groups_for_remote_nodes(Scope, TableByName, TableByPid) ->
+    LocalNode = node(),
+    RemoteNodesWithDoubles = ets:select(TableByName, [{
+        {{'_', '_'}, '_', '_', '_', '$6'},
+        [{'=/=', '$6', LocalNode}],
+        ['$6']
+    }]),
+    RemoteNodes = ordsets:from_list(RemoteNodesWithDoubles),
+    ordsets:fold(fun(RemoteNode, _) ->
+        purge_groups_for_remote_node(Scope, RemoteNode, TableByName, TableByPid)
+    end, undefined, RemoteNodes).
+
 -spec purge_groups_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
 purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
     %% loop elements for callback in a separate process to free scope process

+ 43 - 27
src/syn_registry.erl

@@ -152,11 +152,16 @@ count(Scope, Node) ->
 %% Init
 %% ----------------------------------------------------------------------------------------------------------
 -spec init(#state{}) -> {ok, HandlerState :: term()}.
-init(State) ->
-    HandlerState = #{},
-    %% rebuild
-    rebuild_monitors(State),
+init(#state{
+    scope = Scope,
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+}) ->
+    %% purge remote & rebuild
+    purge_registry_for_remote_nodes(Scope, TableByName, TableByPid),
+    rebuild_monitors(TableByName, TableByPid),
     %% init
+    HandlerState = #{},
     {ok, HandlerState}.
 
 %% ----------------------------------------------------------------------------------------------------------
@@ -319,35 +324,35 @@ purge_local_data_for_node(Node, #state{
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec rebuild_monitors(#state{}) -> ok.
-rebuild_monitors(#state{
-    table_by_name = TableByName
-} = State) ->
+-spec rebuild_monitors(TableByName :: atom(), TableByPid :: atom()) -> ok.
+rebuild_monitors(TableByName, TableByPid) ->
     RegistryTuples = get_registry_tuples_for_node(node(), TableByName),
-    do_rebuild_monitors(RegistryTuples, #{}, State).
+    do_rebuild_monitors(RegistryTuples, #{}, TableByName, TableByPid).
 
--spec do_rebuild_monitors([syn_registry_tuple()], #{pid() => reference()}, #state{}) -> ok.
-do_rebuild_monitors([], _, _) -> ok;
-do_rebuild_monitors([{Name, Pid, Meta, Time} | T], NewMonitorRefs, #state{
-    table_by_name = TableByName,
-    table_by_pid = TableByPid
-} = State) ->
+-spec do_rebuild_monitors(
+    [syn_registry_tuple()],
+    #{pid() => reference()},
+    TableByName :: atom(),
+    TableByPid :: atom()
+) -> ok.
+do_rebuild_monitors([], _, _, _) -> ok;
+do_rebuild_monitors([{Name, Pid, Meta, Time} | T], NewMRefs, TableByName, TableByPid) ->
     remove_from_local_table(Name, Pid, TableByName, TableByPid),
     case is_process_alive(Pid) of
         true ->
-            case maps:find(Pid, NewMonitorRefs) of
+            case maps:find(Pid, NewMRefs) of
                 error ->
                     MRef = erlang:monitor(process, Pid),
                     add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
-                    do_rebuild_monitors(T, maps:put(Pid, MRef, NewMonitorRefs), State);
+                    do_rebuild_monitors(T, maps:put(Pid, MRef, NewMRefs), TableByName, TableByPid);
 
                 {ok, MRef} ->
                     add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
-                    do_rebuild_monitors(T, NewMonitorRefs, State)
+                    do_rebuild_monitors(T, NewMRefs, TableByName, TableByPid)
             end;
 
         _ ->
-            do_rebuild_monitors(T, NewMonitorRefs, State)
+            do_rebuild_monitors(T, NewMRefs, TableByName, TableByPid)
     end.
 
 -spec do_register_on_node(
@@ -480,17 +485,28 @@ update_local_table(Name, PreviousPid, {Pid, Meta, Time, MRef}, TableByName, Tabl
     remove_from_local_table(Name, PreviousPid, TableByName, TableByPid),
     add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid).
 
+-spec purge_registry_for_remote_nodes(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> any().
+purge_registry_for_remote_nodes(Scope, TableByName, TableByPid) ->
+    LocalNode = node(),
+    RemoteNodesWithDoubles = ets:select(TableByName, [{
+        {'_', '_', '_', '_', '_', '$6'},
+        [{'=/=', '$6', LocalNode}],
+        ['$6']
+    }]),
+    RemoteNodes = ordsets:from_list(RemoteNodesWithDoubles),
+    ordsets:fold(fun(RemoteNode, _) ->
+        purge_registry_for_remote_node(Scope, RemoteNode, TableByName, TableByPid)
+    end, undefined, RemoteNodes).
+
 -spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
 purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
-    %% loop elements for callback in a separate process to free scope process
+    %% loop elements for callback
     RegistryTuples = get_registry_tuples_for_node(Node, TableByName),
-    spawn(fun() ->
-        lists:foreach(fun({Name, Pid, Meta, _Time}) ->
-            syn_event_handler:call_event_handler(on_process_unregistered,
-                [Scope, Name, Pid, Meta, {syn_remote_scope_node_down, Scope, Node}]
-            )
-        end, RegistryTuples)
-    end),
+    lists:foreach(fun({Name, Pid, Meta, _Time}) ->
+        syn_event_handler:call_event_handler(on_process_unregistered,
+            [Scope, Name, Pid, Meta, {syn_remote_scope_node_down, Scope, Node}]
+        )
+    end, RegistryTuples),
     %% remove all from pid table
     true = ets:match_delete(TableByName, {'_', '_', '_', '_', '_', Node}),
     true = ets:match_delete(TableByPid, {'_', '_', '_', '_', '_', Node}).

+ 11 - 0
test/syn_pg_SUITE.erl

@@ -566,10 +566,21 @@ three_nodes_join_leave_and_monitor(Config) ->
     1 = rpc:call(SlaveNode2, syn, group_count, [scope_bc, SlaveNode1]),
     0 = rpc:call(SlaveNode2, syn, group_count, [scope_bc, SlaveNode2]),
 
+    syn:join(scope_ab, {group, "two"}, PidRemoteOn1),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}, {PidRemoteOn1, undefined}]),
+        fun() -> lists:sort(syn:members(scope_ab, {group, "two"})) end
+    ),
+
     %% crash scope process to ensure that monitors get recreated
     exit(whereis(syn_pg_scope_ab), kill),
     syn_test_suite_helper:wait_process_name_ready(syn_pg_scope_ab),
 
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}, {PidRemoteOn1, undefined}]),
+        fun() -> lists:sort(syn:members(scope_ab, {group, "two"})) end
+    ),
+
     %% kill process
     syn_test_suite_helper:kill_process(Pid),
     syn_test_suite_helper:kill_process(PidRemoteOn1),

+ 18 - 5
test/syn_registry_SUITE.erl

@@ -432,10 +432,23 @@ three_nodes_register_unregister_and_monitor(Config) ->
     ),
     {badrpc, {'EXIT', {{invalid_scope, scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, lookup, [scope_ab, "scope_a_alias"]),
 
-    %% crash scope process to ensure that monitors get recreated
+    %% register remote
+    syn:register(scope_ab, "ab_on_1", PidRemoteWithMetaOn1, <<"ab-on-1">>),
+    syn_test_suite_helper:assert_wait(
+        {PidRemoteWithMetaOn1, <<"ab-on-1">>},
+        fun() -> syn:lookup(scope_ab, "ab_on_1") end
+    ),
+
+    %% crash scope process to ensure that monitors get recreated & data received from other nodes
     syn_test_suite_helper:kill_process(syn_registry_scope_ab),
     syn_test_suite_helper:wait_process_name_ready(syn_registry_scope_ab),
 
+    %% check remote has been sync'ed back
+    syn_test_suite_helper:assert_wait(
+        {PidRemoteWithMetaOn1, <<"ab-on-1">>},
+        fun() -> syn:lookup(scope_ab, "ab_on_1") end
+    ),
+
     %% kill process
     syn_test_suite_helper:kill_process(Pid),
     syn_test_suite_helper:kill_process(PidWithMeta),
@@ -472,17 +485,17 @@ three_nodes_register_unregister_and_monitor(Config) ->
         undefined,
         fun() -> rpc:call(SlaveNode2, syn, lookup, [scope_bc, {remote_scoped_bc}]) end
     ),
-    0 = syn:registry_count(scope_ab),
+    1 = syn:registry_count(scope_ab),
     0 = syn:registry_count(scope_ab, node()),
-    0 = syn:registry_count(scope_ab, SlaveNode1),
+    1 = syn:registry_count(scope_ab, SlaveNode1),
     0 = syn:registry_count(scope_ab, SlaveNode2),
     {'EXIT', {{invalid_scope, scope_bc}, _}} = catch syn:registry_count(scope_bc),
     {'EXIT', {{invalid_scope, scope_bc}, _}} = catch syn:registry_count(scope_bc, node()),
     {'EXIT', {{invalid_scope, scope_bc}, _}} = catch syn:registry_count(scope_bc, SlaveNode1),
     {'EXIT', {{invalid_scope, scope_bc}, _}} = catch syn:registry_count(scope_bc, SlaveNode2),
-    0 = rpc:call(SlaveNode1, syn, registry_count, [scope_ab]),
+    1 = rpc:call(SlaveNode1, syn, registry_count, [scope_ab]),
     0 = rpc:call(SlaveNode1, syn, registry_count, [scope_ab, node()]),
-    0 = rpc:call(SlaveNode1, syn, registry_count, [scope_ab, SlaveNode1]),
+    1 = rpc:call(SlaveNode1, syn, registry_count, [scope_ab, SlaveNode1]),
     0 = rpc:call(SlaveNode1, syn, registry_count, [scope_ab, SlaveNode2]),
     0 = rpc:call(SlaveNode1, syn, registry_count, [scope_bc]),
     0 = rpc:call(SlaveNode1, syn, registry_count, [scope_bc, node()]),