Browse Source

Rebuild monitors if registry or groups were to exit.

Roberto Ostinelli 5 years ago
parent
commit
42ca07476a
5 changed files with 106 additions and 39 deletions
  1. 1 1
      src/syn.hrl
  2. 29 19
      src/syn_groups.erl
  3. 29 15
      src/syn_registry.erl
  4. 25 2
      test/syn_groups_SUITE.erl
  5. 22 2
      test/syn_registry_SUITE.erl

+ 1 - 1
src/syn.hrl

@@ -46,7 +46,7 @@
     Meta :: any()
 }.
 -type syn_group_tuple() :: {
-    Name :: any(),
+    GroupName :: any(),
     Pid :: pid(),
     Meta :: any()
 }.

+ 29 - 19
src/syn_groups.erl

@@ -184,12 +184,7 @@ multi_call_reply(CallerPid, Reply) ->
 -spec sync_get_local_group_tuples(FromNode :: node()) -> list(syn_group_tuple()).
 sync_get_local_group_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local group tuples from remote node: ~p~n", [node(), FromNode]),
-    %% build match specs
-    MatchHead = #syn_groups_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
-    Guard = {'=:=', '$3', node()},
-    GroupTupleFormat = {{'$1', '$2', '$4'}},
-    %% select
-    mnesia:dirty_select(syn_groups_table, [{MatchHead, [Guard], [GroupTupleFormat]}]).
+    get_group_tuples_for_node(node()).
 
 %% ===================================================================
 %% Callbacks
@@ -204,6 +199,8 @@ sync_get_local_group_tuples(FromNode) ->
     ignore |
     {stop, Reason :: any()}.
 init([]) ->
+    %% rebuild
+    rebuild_monitors(),
     %% monitor nodes
     ok = net_kernel:monitor_nodes(true),
     %% get handler
@@ -306,7 +303,7 @@ handle_info({nodeup, RemoteNode}, State) ->
                 "Syn(~p): Received ~p group entrie(s) from remote node ~p, writing to local~n",
                 [node(), length(GroupTuples), RemoteNode]
             ),
-            sync_group_tuples(RemoteNode, GroupTuples),
+            write_group_tuples_for_node(GroupTuples, RemoteNode),
             %% exit
             error_logger:warning_msg("Syn(~p): GROUPS AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
         end
@@ -316,7 +313,7 @@ handle_info({nodeup, RemoteNode}, State) ->
 
 handle_info({nodedown, RemoteNode}, State) ->
     error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing group entries on local~n", [node(), RemoteNode]),
-    purge_group_entries_for_remote_node(RemoteNode),
+    raw_purge_group_entries_for_node(RemoteNode),
     {noreply, State};
 
 handle_info(Info, State) ->
@@ -427,6 +424,15 @@ find_process_entry_by_name_and_pid(GroupName, Pid) ->
         [] -> undefined
     end.
 
+-spec get_group_tuples_for_node(Node :: node()) -> [syn_group_tuple()].
+get_group_tuples_for_node(Node) ->
+    %% build match specs
+    MatchHead = #syn_groups_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
+    Guard = {'=:=', '$3', Node},
+    GroupTupleFormat = {{'$1', '$2', '$4'}},
+    %% select
+    mnesia:dirty_select(syn_groups_table, [{MatchHead, [Guard], [GroupTupleFormat]}]).
+
 -spec handle_process_down(GroupName :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
 handle_process_down(GroupName, Pid, Meta, Reason, #state{
     custom_event_handler = CustomEventHandler
@@ -441,19 +447,17 @@ handle_process_down(GroupName, Pid, Meta, Reason, #state{
             syn_event_handler:do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler)
     end.
 
--spec sync_group_tuples(RemoteNode :: node(), GroupTuples :: [syn_registry_tuple()]) -> ok.
-sync_group_tuples(RemoteNode, GroupTuples) ->
-    %% ensure that groups doesn't have any joining node's entries (here again for race conditions)
-    purge_group_entries_for_remote_node(RemoteNode),
-    %% loop
-    F = fun({Name, RemotePid, RemoteMeta}) ->
+-spec write_group_tuples_for_node(GroupTuples :: [syn_registry_tuple()], RemoteNode :: node()) -> ok.
+write_group_tuples_for_node(GroupTuples, RemoteNode) ->
+    %% ensure that groups doesn't have any joining node's entries
+    raw_purge_group_entries_for_node(RemoteNode),
+    %% add
+    lists:foreach(fun({Name, RemotePid, RemoteMeta}) ->
         join_on_node(Name, RemotePid, RemoteMeta)
-    end,
-    %% add to table
-    lists:foreach(F, GroupTuples).
+    end, GroupTuples).
 
--spec purge_group_entries_for_remote_node(Node :: atom()) -> ok.
-purge_group_entries_for_remote_node(Node) when Node =/= node() ->
+-spec raw_purge_group_entries_for_node(Node :: atom()) -> ok.
+raw_purge_group_entries_for_node(Node) ->
     %% NB: no demonitoring is done, hence why this needs to run for a remote node
     %% build match specs
     Pattern = #syn_groups_table{node = Node, _ = '_'},
@@ -497,3 +501,9 @@ collect_replies(MemberPids, Replies, BadPids) ->
             MemberPids1 = lists:delete(Pid, MemberPids),
             collect_replies(MemberPids1, Replies, [Pid | BadPids])
     end.
+
+-spec rebuild_monitors() -> ok.
+rebuild_monitors() ->
+    GroupTuples = get_group_tuples_for_node(node()),
+    %% remove all
+    write_group_tuples_for_node(GroupTuples, node()).

+ 29 - 15
src/syn_registry.erl

@@ -97,23 +97,13 @@ count() ->
 
 -spec count(Node :: node()) -> non_neg_integer().
 count(Node) ->
-    %% build match specs
-    MatchHead = #syn_registry_table{node = '$2', _ = '_'},
-    Guard = {'=:=', '$2', Node},
-    Result = '$2',
-    %% select
-    Processes = mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [Result]}]),
-    length(Processes).
+    RegistryTuples = get_registry_tuples_for_node(Node),
+    length(RegistryTuples).
 
--spec sync_get_local_registry_tuples(FromNode :: node()) -> list(syn_registry_tuple()).
+-spec sync_get_local_registry_tuples(FromNode :: node()) -> [syn_registry_tuple()].
 sync_get_local_registry_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node: ~p~n", [node(), FromNode]),
-    %% build match specs
-    MatchHead = #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
-    Guard = {'=:=', '$3', node()},
-    RegistryTupleFormat = {{'$1', '$2', '$4'}},
-    %% select
-    mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [RegistryTupleFormat]}]).
+    get_registry_tuples_for_node(node()).
 
 %% ===================================================================
 %% Callbacks
@@ -128,6 +118,8 @@ sync_get_local_registry_tuples(FromNode) ->
     ignore |
     {stop, Reason :: any()}.
 init([]) ->
+    %% rebuild monitors (if coming after a crash)
+    rebuild_monitors(),
     %% monitor nodes
     ok = net_kernel:monitor_nodes(true),
     %% get handler
@@ -330,7 +322,7 @@ add_to_local_table(Name, Pid, Meta, MonitorRef) ->
 remove_from_local_table(Name) ->
     mnesia:dirty_delete(syn_registry_table, Name).
 
--spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: list(#syn_registry_table{}).
+-spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: [#syn_registry_table{}].
 find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
     mnesia:dirty_index_read(syn_registry_table, Pid, #syn_registry_table.pid).
 
@@ -341,6 +333,15 @@ find_process_entry_by_name(Name) ->
         _ -> undefined
     end.
 
+-spec get_registry_tuples_for_node(Node :: node()) -> [syn_registry_tuple()].
+get_registry_tuples_for_node(Node) ->
+    %% build match specs
+    MatchHead = #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
+    Guard = {'=:=', '$3', Node},
+    RegistryTupleFormat = {{'$1', '$2', '$4'}},
+    %% select
+    mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [RegistryTupleFormat]}]).
+
 -spec handle_process_down(Name :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
 handle_process_down(Name, Pid, Meta, Reason, #state{
     custom_event_handler = CustomEventHandler
@@ -427,3 +428,16 @@ purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
     NodePids = mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [IdFormat]}]),
     DelF = fun(Id) -> mnesia:dirty_delete({syn_registry_table, Id}) end,
     lists:foreach(DelF, NodePids).
+
+-spec rebuild_monitors() -> ok.
+rebuild_monitors() ->
+    RegistryTuples = get_registry_tuples_for_node(node()),
+    lists:foreach(fun({Name, Pid, Meta}) ->
+        case is_process_alive(Pid) of
+            true ->
+                MonitorRef = erlang:monitor(process, Pid),
+                add_to_local_table(Name, Pid, Meta, MonitorRef);
+            _ ->
+                remove_from_local_table(Name)
+        end
+    end, RegistryTuples).

+ 25 - 2
test/syn_groups_SUITE.erl

@@ -39,7 +39,8 @@
     single_node_publish/1,
     single_node_multicall/1,
     single_node_multicall_with_custom_timeout/1,
-    single_node_callback_on_process_exit/1
+    single_node_callback_on_process_exit/1,
+    single_node_monitor_after_group_crash/1
 ]).
 -export([
     two_nodes_join_monitor_and_unregister/1,
@@ -95,7 +96,8 @@ groups() ->
             single_node_publish,
             single_node_multicall,
             single_node_multicall_with_custom_timeout,
-            single_node_callback_on_process_exit
+            single_node_callback_on_process_exit,
+            single_node_monitor_after_group_crash
         ]},
         {two_nodes_groups, [shuffle], [
             two_nodes_join_monitor_and_unregister,
@@ -436,6 +438,27 @@ single_node_callback_on_process_exit(_Config) ->
         ok
     end.
 
+single_node_monitor_after_group_crash(_Config) ->
+    GroupName = "my group",
+    %% start
+    ok = syn:start(),
+    %% start processes
+    Pid = syn_test_suite_helper:start_process(),
+    %% join
+    ok = syn:join(GroupName, Pid),
+    %% kill groups
+    exit(whereis(syn_groups), kill),
+    timer:sleep(200),
+    %% retrieve
+    true = syn:member(Pid, GroupName),
+    [Pid] = syn:get_members(GroupName),
+    %% kill process
+    syn_test_suite_helper:kill_process(Pid),
+    timer:sleep(200),
+    %% retrieve
+    false = syn:member(Pid, GroupName),
+    [] = syn:get_members(GroupName).
+
 two_nodes_join_monitor_and_unregister(Config) ->
     GroupName = "my group",
     %% get slave

+ 22 - 2
test/syn_registry_SUITE.erl

@@ -38,7 +38,8 @@
     single_node_registration_errors/1,
     single_node_registry_count/1,
     single_node_register_gen_server/1,
-    single_node_callback_on_process_exit/1
+    single_node_callback_on_process_exit/1,
+    single_node_monitor_after_registry_crash/1
 ]).
 -export([
     two_nodes_register_monitor_and_unregister/1,
@@ -98,7 +99,8 @@ groups() ->
             single_node_registration_errors,
             single_node_registry_count,
             single_node_register_gen_server,
-            single_node_callback_on_process_exit
+            single_node_callback_on_process_exit,
+            single_node_monitor_after_registry_crash
         ]},
         {two_nodes_process_registration, [shuffle], [
             two_nodes_register_monitor_and_unregister,
@@ -357,6 +359,24 @@ single_node_callback_on_process_exit(_Config) ->
         ok
     end.
 
+single_node_monitor_after_registry_crash(_Config) ->
+    %% start
+    ok = syn:start(),
+    %% start processes
+    Pid = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:register(<<"my proc">>, Pid),
+    %% kill registry
+    exit(whereis(syn_registry), kill),
+    timer:sleep(200),
+    %% retrieve
+    Pid = syn:whereis(<<"my proc">>),
+    %% kill process
+    syn_test_suite_helper:kill_process(Pid),
+    timer:sleep(200),
+    %% retrieve
+    undefined = syn:whereis(<<"my proc 2">>).
+
 two_nodes_register_monitor_and_unregister(Config) ->
     %% get slave
     SlaveNode = proplists:get_value(slave_node, Config),