Просмотр исходного кода

Ensure that data is sync'ed as soon as a new node joins.

Roberto Ostinelli 5 лет назад
Родитель
Сommit
d4b28885df
4 измененных файлов с 159 добавлено и 81 удалено
  1. 44 24
      src/syn_groups.erl
  2. 73 53
      src/syn_registry.erl
  3. 21 2
      test/syn_groups_SUITE.erl
  4. 21 2
      test/syn_registry_SUITE.erl

+ 44 - 24
src/syn_groups.erl

@@ -213,6 +213,8 @@ init([]) ->
     ok = net_kernel:monitor_nodes(true),
     %% get handler
     CustomEventHandler = syn_backbone:get_event_handler_module(),
+    %% start first sync
+    self() ! sync_all,
     %% init
     {ok, #state{
         custom_event_handler = CustomEventHandler
@@ -312,32 +314,18 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
     %% return
     {noreply, State};
 
+handle_info(sync_all, State) ->
+    error_logger:info_msg("Syn(~p): Start first groups sync~n", [node()]),
+    %% loop all nodes
+    lists:foreach(fun(RemoteNode) ->
+        group_manager_automerge(RemoteNode)
+    end, nodes()),
+    %% return
+    {noreply, State};
+
 handle_info({nodeup, RemoteNode}, State) ->
     error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
-    global:trans({{?MODULE, auto_merge_groups}, self()},
-        fun() ->
-            error_logger:info_msg("Syn(~p): GROUP MANAGER AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
-            %% get group tuples from remote node
-            GroupTuples = rpc:call(RemoteNode, ?MODULE, sync_get_local_group_tuples, [node()]),
-            error_logger:info_msg(
-                "Syn(~p): Received ~p group tuple(s) from remote node ~p~n",
-                [node(), length(GroupTuples), RemoteNode]
-            ),
-            %% ensure that groups doesn't have any joining node's entries
-            raw_purge_group_entries_for_node(RemoteNode),
-            %% add
-            lists:foreach(fun({GroupName, RemotePid, RemoteMeta}) ->
-                case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
-                    true ->
-                        add_to_local_table(GroupName, RemotePid, RemoteMeta, undefined);
-                    _ ->
-                        ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [GroupName, RemotePid])
-                end
-            end, GroupTuples),
-            %% exit
-            error_logger:info_msg("Syn(~p): GROUP MANAGER AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
-        end
-    ),
+    group_manager_automerge(RemoteNode),
     %% resume
     {noreply, State};
 
@@ -494,6 +482,38 @@ handle_process_down(GroupName, Pid, Meta, Reason, #state{
             syn_event_handler:do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler)
     end.
 
+-spec group_manager_automerge(RemoteNode :: node()) -> ok.
+group_manager_automerge(RemoteNode) ->
+    global:trans({{?MODULE, auto_merge_groups}, self()},
+        fun() ->
+            error_logger:info_msg("Syn(~p): GROUP MANAGER AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
+            %% get group tuples from remote node
+            case rpc:call(RemoteNode, ?MODULE, sync_get_local_group_tuples, [node()]) of
+                {badrpc, _} ->
+                    error_logger:info_msg("Syn(~p): GROUP MANAGER AUTOMERGE <---- Syn not ready on remote node ~p, aborting~n", [node(), RemoteNode]);
+
+                GroupTuples ->
+                    error_logger:info_msg(
+                        "Syn(~p): Received ~p group tuple(s) from remote node ~p~n",
+                        [node(), length(GroupTuples), RemoteNode]
+                    ),
+                    %% ensure that groups doesn't have any joining node's entries
+                    raw_purge_group_entries_for_node(RemoteNode),
+                    %% add
+                    lists:foreach(fun({GroupName, RemotePid, RemoteMeta}) ->
+                        case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
+                            true ->
+                                add_to_local_table(GroupName, RemotePid, RemoteMeta, undefined);
+                            _ ->
+                                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [GroupName, RemotePid])
+                        end
+                    end, GroupTuples),
+                    %% exit
+                    error_logger:info_msg("Syn(~p): GROUP MANAGER AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
+            end
+        end
+    ).
+
 -spec raw_purge_group_entries_for_node(Node :: atom()) -> ok.
 raw_purge_group_entries_for_node(Node) ->
     %% NB: no demonitoring is done, this is why it's raw

+ 73 - 53
src/syn_registry.erl

@@ -132,6 +132,8 @@ init([]) ->
     ok = net_kernel:monitor_nodes(true),
     %% get handler
     CustomEventHandler = syn_backbone:get_event_handler_module(),
+    %% start first sync
+    self() ! sync_all,
     %% init
     {ok, #state{
         custom_event_handler = CustomEventHandler
@@ -289,61 +291,18 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
     %% return
     {noreply, State};
 
+handle_info(sync_all, State) ->
+    error_logger:info_msg("Syn(~p): Start first registry sync~n", [node()]),
+    %% loop all nodes
+    lists:foreach(fun(RemoteNode) ->
+        registry_automerge(RemoteNode, State)
+    end, nodes()),
+    %% return
+    {noreply, State};
+
 handle_info({nodeup, RemoteNode}, State) ->
     error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
-    global:trans({{?MODULE, auto_merge_registry}, self()},
-        fun() ->
-            error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
-            %% get registry tuples from remote node
-            RegistryTuples = rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]),
-            error_logger:info_msg(
-                "Syn(~p): Received ~p registry tuple(s) from remote node ~p~n",
-                [node(), length(RegistryTuples), RemoteNode]
-            ),
-            %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
-            raw_purge_registry_entries_for_remote_node(RemoteNode),
-            %% loop
-            F = fun({Name, RemotePid, RemoteMeta}) ->
-                %% check if same name is registered
-                case find_process_entry_by_name(Name) of
-                    undefined ->
-                        %% no conflict
-                        case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
-                            true ->
-                                add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
-                            _ ->
-                                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
-                        end;
-
-                    Entry ->
-                        LocalPid = Entry#syn_registry_table.pid,
-                        LocalMeta = Entry#syn_registry_table.meta,
-
-                        error_logger:warning_msg(
-                            "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
-                            [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
-                        ),
-
-                        case resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, State) of
-                            {PidToKeep, PidToKill} when PidToKeep =:= LocalPid ->
-                                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
-                                syn_kill(PidToKill, Name, RemoteMeta);
-
-                            {PidToKeep, PidToKill} when PidToKeep =:= RemotePid ->
-                                add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
-                                syn_kill(PidToKill, Name, LocalMeta);
-
-                            _ ->
-                                ok
-                        end
-                end
-            end,
-            %% add to table
-            lists:foreach(F, RegistryTuples),
-            %% exit
-            error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
-        end
-    ),
+    registry_automerge(RemoteNode, State),
     %% resume
     {noreply, State};
 
@@ -482,6 +441,67 @@ handle_process_down(Name, Pid, Meta, Reason, #state{
             syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler)
     end.
 
+-spec registry_automerge(RemoteNode :: node(), #state{}) -> ok.
+registry_automerge(RemoteNode, State) ->
+    global:trans({{?MODULE, auto_merge_registry}, self()},
+        fun() ->
+            error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
+            %% get registry tuples from remote node
+            case rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]) of
+                {badrpc, _} ->
+                    error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Syn not ready on remote node ~p, aborting~n", [node(), RemoteNode]);
+
+                RegistryTuples ->
+                    error_logger:info_msg(
+                        "Syn(~p): Received ~p registry tuple(s) from remote node ~p~n",
+                        [node(), length(RegistryTuples), RemoteNode]
+                    ),
+                    %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
+                    raw_purge_registry_entries_for_remote_node(RemoteNode),
+                    %% loop
+                    F = fun({Name, RemotePid, RemoteMeta}) ->
+                        %% check if same name is registered
+                        case find_process_entry_by_name(Name) of
+                            undefined ->
+                                %% no conflict
+                                case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
+                                    true ->
+                                        add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+                                    _ ->
+                                        ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
+                                end;
+
+                            Entry ->
+                                LocalPid = Entry#syn_registry_table.pid,
+                                LocalMeta = Entry#syn_registry_table.meta,
+
+                                error_logger:warning_msg(
+                                    "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
+                                    [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
+                                ),
+
+                                case resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, State) of
+                                    {PidToKeep, PidToKill} when PidToKeep =:= LocalPid ->
+                                        ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
+                                        syn_kill(PidToKill, Name, RemoteMeta);
+
+                                    {PidToKeep, PidToKill} when PidToKeep =:= RemotePid ->
+                                        add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
+                                        syn_kill(PidToKill, Name, LocalMeta);
+
+                                    _ ->
+                                        ok
+                                end
+                        end
+                    end,
+                    %% add to table
+                    lists:foreach(F, RegistryTuples),
+                    %% exit
+                    error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
+            end
+        end
+    ).
+
 -spec resolve_conflict(
     Name :: any(),
     {LocalPid :: pid(), LocalMeta :: any()},

+ 21 - 2
test/syn_groups_SUITE.erl

@@ -47,7 +47,8 @@
     two_nodes_local_members/1,
     two_nodes_publish/1,
     two_nodes_local_publish/1,
-    two_nodes_multicall/1
+    two_nodes_multicall/1,
+    two_nodes_groups_wait_for_syn_up/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -104,7 +105,8 @@ groups() ->
             two_nodes_local_members,
             two_nodes_publish,
             two_nodes_local_publish,
-            two_nodes_multicall
+            two_nodes_multicall,
+            two_nodes_groups_wait_for_syn_up
         ]},
         {three_nodes_groups, [shuffle], [
             three_nodes_partial_netsplit_consistency,
@@ -726,6 +728,23 @@ two_nodes_multicall(Config) ->
     ]) =:= lists:sort(Replies),
     [PidUnresponsive] = BadPids.
 
+two_nodes_groups_wait_for_syn_up(_Config) ->
+    %% stop slave
+    syn_test_suite_helper:stop_slave(syn_slave),
+    %% start syn on local node
+    ok = syn:start(),
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+    %% join
+    ok = syn:join(<<"group">>, Pid),
+    %% start remote node and syn
+    {ok, SlaveNode} = syn_test_suite_helper:start_slave(syn_slave),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(1000),
+    %% check
+    [Pid] = syn:get_members(<<"group">>),
+    [Pid] = rpc:call(SlaveNode, syn, get_members, [<<"group">>]).
+
 three_nodes_partial_netsplit_consistency(Config) ->
     GroupName = "my group",
     %% get slaves

+ 21 - 2
test/syn_registry_SUITE.erl

@@ -46,7 +46,8 @@
     two_nodes_register_monitor_and_unregister/1,
     two_nodes_registry_count/1,
     two_nodes_registration_race_condition_conflict_resolution/1,
-    two_nodes_registration_race_condition_conflict_resolution_when_process_died/1
+    two_nodes_registration_race_condition_conflict_resolution_when_process_died/1,
+    two_nodes_registry_wait_for_syn_up/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -112,7 +113,8 @@ groups() ->
             two_nodes_register_monitor_and_unregister,
             two_nodes_registry_count,
             two_nodes_registration_race_condition_conflict_resolution,
-            two_nodes_registration_race_condition_conflict_resolution_when_process_died
+            two_nodes_registration_race_condition_conflict_resolution_when_process_died,
+            two_nodes_registry_wait_for_syn_up
         ]},
         {three_nodes_process_registration, [shuffle], [
             three_nodes_partial_netsplit_consistency,
@@ -530,6 +532,23 @@ two_nodes_registration_race_condition_conflict_resolution_when_process_died(Conf
     %% check that process is alive
     true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]).
 
+two_nodes_registry_wait_for_syn_up(_Config) ->
+    %% stop slave
+    syn_test_suite_helper:stop_slave(syn_slave),
+    %% start syn on local node
+    ok = syn:start(),
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:register(<<"proc">>, Pid),
+    %% start remote node and syn
+    {ok, SlaveNode} = syn_test_suite_helper:start_slave(syn_slave),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(1000),
+    %% check
+    Pid = syn:whereis(<<"proc">>),
+    Pid = rpc:call(SlaveNode, syn, whereis, [<<"proc">>]).
+
 three_nodes_partial_netsplit_consistency(Config) ->
     %% get slaves
     SlaveNode1 = proplists:get_value(slave_node_1, Config),