Browse Source

Ensure that syn syncs to the full cluster on boot.

Roberto Ostinelli 5 years ago
parent
commit
d4a655ec1a
4 changed files with 147 additions and 50 deletions
  1. 15 6
      src/syn_groups.erl
  2. 54 40
      src/syn_registry.erl
  3. 39 2
      test/syn_groups_SUITE.erl
  4. 39 2
      test/syn_registry_SUITE.erl

+ 15 - 6
src/syn_groups.erl

@@ -213,6 +213,8 @@ init([]) ->
     ok = net_kernel:monitor_nodes(true),
     %% get handler
     CustomEventHandler = syn_backbone:get_event_handler_module(),
+    %% send message to initiate full cluster sync
+    timer:send_after(0, self(), sync_full_cluster),
     %% init
     {ok, #state{
         custom_event_handler = CustomEventHandler
@@ -314,7 +316,7 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
 
 handle_info({nodeup, RemoteNode}, State) ->
     error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
-    group_manager_automerge(RemoteNode),
+    groups_automerge(RemoteNode),
     %% resume
     {noreply, State};
 
@@ -323,6 +325,13 @@ handle_info({nodedown, RemoteNode}, State) ->
     raw_purge_group_entries_for_node(RemoteNode),
     {noreply, State};
 
+handle_info(sync_full_cluster, State) ->
+    error_logger:info_msg("Syn(~p): Initiating full cluster groups sync for nodes: ~p~n", [node(), nodes()]),
+    lists:foreach(fun(RemoteNode) ->
+        groups_automerge(RemoteNode)
+    end, nodes()),
+    {noreply, State};
+
 handle_info(Info, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
     {noreply, State}.
@@ -471,15 +480,15 @@ handle_process_down(GroupName, Pid, Meta, Reason, #state{
             syn_event_handler:do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler)
     end.
 
--spec group_manager_automerge(RemoteNode :: node()) -> ok.
-group_manager_automerge(RemoteNode) ->
+-spec groups_automerge(RemoteNode :: node()) -> ok.
+groups_automerge(RemoteNode) ->
     global:trans({{?MODULE, auto_merge_groups}, self()},
         fun() ->
-            error_logger:info_msg("Syn(~p): GROUP MANAGER AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
+            error_logger:info_msg("Syn(~p): GROUPS AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
             %% get group tuples from remote node
             case rpc:call(RemoteNode, ?MODULE, sync_get_local_group_tuples, [node()]) of
                 {badrpc, _} ->
-                    error_logger:info_msg("Syn(~p): GROUP MANAGER AUTOMERGE <---- Syn not ready on remote node ~p, aborting~n", [node(), RemoteNode]);
+                    error_logger:info_msg("Syn(~p): GROUPS AUTOMERGE <---- Syn not ready on remote node ~p, postponing~n", [node(), RemoteNode]);
 
                 GroupTuples ->
                     error_logger:info_msg(
@@ -498,7 +507,7 @@ group_manager_automerge(RemoteNode) ->
                         end
                     end, GroupTuples),
                     %% exit
-                    error_logger:info_msg("Syn(~p): GROUP MANAGER AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
+                    error_logger:info_msg("Syn(~p): GROUPS AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
             end
         end
     ).

+ 54 - 40
src/syn_registry.erl

@@ -132,6 +132,8 @@ init([]) ->
     ok = net_kernel:monitor_nodes(true),
     %% get handler
     CustomEventHandler = syn_backbone:get_event_handler_module(),
+    %% send message to initiate full cluster sync
+    timer:send_after(0, self(), sync_full_cluster),
     %% init
     {ok, #state{
         custom_event_handler = CustomEventHandler
@@ -291,6 +293,13 @@ handle_info({nodedown, RemoteNode}, State) ->
     raw_purge_registry_entries_for_remote_node(RemoteNode),
     {noreply, State};
 
+handle_info(sync_full_cluster, State) ->
+    error_logger:info_msg("Syn(~p): Initiating full cluster registry sync for nodes: ~p~n", [node(), nodes()]),
+    lists:foreach(fun(RemoteNode) ->
+        registry_automerge(RemoteNode, State)
+    end, nodes()),
+    {noreply, State};
+
 handle_info(Info, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
     {noreply, State}.
@@ -427,46 +436,51 @@ registry_automerge(RemoteNode, State) ->
         fun() ->
             error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
             %% get registry tuples from remote node
-            RegistryTuples = rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]),
-            error_logger:info_msg(
-                "Syn(~p): Received ~p registry tuple(s) from remote node ~p~n",
-                [node(), length(RegistryTuples), RemoteNode]
-            ),
-            %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
-            raw_purge_registry_entries_for_remote_node(RemoteNode),
-            %% loop
-            F = fun({Name, RemotePid, RemoteMeta}) ->
-                %% check if same name is registered
-                case find_process_entry_by_name(Name) of
-                    undefined ->
-                        %% no conflict
-                        case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
-                            true ->
-                                add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
-                            _ ->
-                                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
-                        end;
-
-                    Entry ->
-                        LocalPid = Entry#syn_registry_table.pid,
-                        LocalMeta = Entry#syn_registry_table.meta,
-
-                        error_logger:warning_msg(
-                            "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
-                            [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
-                        ),
-
-                        CallbackIfLocal = fun() ->
-                            %% keeping local: remote data still on remote node, remove there
-                            ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
-                        end,
-                        resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, CallbackIfLocal, State)
-                end
-            end,
-            %% add to table
-            lists:foreach(F, RegistryTuples),
-            %% exit
-            error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
+            case rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]) of
+                {badrpc, _} ->
+                    error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Syn not ready on remote node ~p, postponing~n", [node(), RemoteNode]);
+
+                RegistryTuples ->
+                    error_logger:info_msg(
+                        "Syn(~p): Received ~p registry tuple(s) from remote node ~p~n",
+                        [node(), length(RegistryTuples), RemoteNode]
+                    ),
+                    %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
+                    raw_purge_registry_entries_for_remote_node(RemoteNode),
+                    %% loop
+                    F = fun({Name, RemotePid, RemoteMeta}) ->
+                        %% check if same name is registered
+                        case find_process_entry_by_name(Name) of
+                            undefined ->
+                                %% no conflict
+                                case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
+                                    true ->
+                                        add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+                                    _ ->
+                                        ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
+                                end;
+
+                            Entry ->
+                                LocalPid = Entry#syn_registry_table.pid,
+                                LocalMeta = Entry#syn_registry_table.meta,
+
+                                error_logger:warning_msg(
+                                    "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
+                                    [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
+                                ),
+
+                                CallbackIfLocal = fun() ->
+                                    %% keeping local: remote data still on remote node, remove there
+                                    ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
+                                end,
+                                resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, CallbackIfLocal, State)
+                        end
+                    end,
+                    %% add to table
+                    lists:foreach(F, RegistryTuples),
+                    %% exit
+                    error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
+            end
         end
     ).
 

+ 39 - 2
test/syn_groups_SUITE.erl

@@ -47,7 +47,9 @@
     two_nodes_local_members/1,
     two_nodes_publish/1,
     two_nodes_local_publish/1,
-    two_nodes_multicall/1
+    two_nodes_multicall/1,
+    two_nodes_groups_full_cluster_sync_on_boot_node_added_later/1,
+    two_nodes_groups_full_cluster_sync_on_boot_syn_started_later/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -104,7 +106,9 @@ groups() ->
             two_nodes_local_members,
             two_nodes_publish,
             two_nodes_local_publish,
-            two_nodes_multicall
+            two_nodes_multicall,
+            two_nodes_groups_full_cluster_sync_on_boot_node_added_later,
+            two_nodes_groups_full_cluster_sync_on_boot_syn_started_later
         ]},
         {three_nodes_groups, [shuffle], [
             three_nodes_partial_netsplit_consistency,
@@ -726,6 +730,39 @@ two_nodes_multicall(Config) ->
     ]) =:= lists:sort(Replies),
     [PidUnresponsive] = BadPids.
 
+two_nodes_groups_full_cluster_sync_on_boot_node_added_later(_Config) ->
+    %% stop slave
+    syn_test_suite_helper:stop_slave(syn_slave),
+    %% start syn on local node
+    ok = syn:start(),
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:join(<<"group">>, Pid),
+    %% start remote node and syn
+    {ok, SlaveNode} = syn_test_suite_helper:start_slave(syn_slave),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(1000),
+    %% check
+    [Pid] = syn:get_members(<<"group">>),
+    [Pid] = rpc:call(SlaveNode, syn, get_members, [<<"group">>]).
+
+two_nodes_groups_full_cluster_sync_on_boot_syn_started_later(Config) ->
+    %% get slaves
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% start syn on local node
+    ok = syn:start(),
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:join(<<"group">>, Pid),
+    %% start ib remote syn
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(500),
+    %% check
+    [Pid] = syn:get_members(<<"group">>),
+    [Pid] = rpc:call(SlaveNode, syn, get_members, [<<"group">>]).
+
 three_nodes_partial_netsplit_consistency(Config) ->
     GroupName = "my group",
     %% get slaves

+ 39 - 2
test/syn_registry_SUITE.erl

@@ -47,7 +47,9 @@
     two_nodes_registry_count/1,
     two_nodes_registration_race_condition_conflict_resolution_keep_local/1,
     two_nodes_registration_race_condition_conflict_resolution_keep_remote/1,
-    two_nodes_registration_race_condition_conflict_resolution_when_process_died/1
+    two_nodes_registration_race_condition_conflict_resolution_when_process_died/1,
+    two_nodes_registry_full_cluster_sync_on_boot_node_added_later/1,
+    two_nodes_registry_full_cluster_sync_on_boot_syn_started_later/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -114,7 +116,9 @@ groups() ->
             two_nodes_registry_count,
             two_nodes_registration_race_condition_conflict_resolution_keep_local,
             two_nodes_registration_race_condition_conflict_resolution_keep_remote,
-            two_nodes_registration_race_condition_conflict_resolution_when_process_died
+            two_nodes_registration_race_condition_conflict_resolution_when_process_died,
+            two_nodes_registry_full_cluster_sync_on_boot_node_added_later,
+            two_nodes_registry_full_cluster_sync_on_boot_syn_started_later
         ]},
         {three_nodes_process_registration, [shuffle], [
             three_nodes_partial_netsplit_consistency,
@@ -559,6 +563,39 @@ two_nodes_registration_race_condition_conflict_resolution_when_process_died(Conf
     %% check that process is alive
     true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]).
 
+two_nodes_registry_full_cluster_sync_on_boot_node_added_later(_Config) ->
+    %% stop slave
+    syn_test_suite_helper:stop_slave(syn_slave),
+    %% start syn on local node
+    ok = syn:start(),
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:register(<<"proc">>, Pid),
+    %% start remote node and syn
+    {ok, SlaveNode} = syn_test_suite_helper:start_slave(syn_slave),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(1000),
+    %% check
+    Pid = syn:whereis(<<"proc">>),
+    Pid = rpc:call(SlaveNode, syn, whereis, [<<"proc">>]).
+
+two_nodes_registry_full_cluster_sync_on_boot_syn_started_later(Config) ->
+    %% get slaves
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% start syn on local node
+    ok = syn:start(),
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:register(<<"proc">>, Pid),
+    %% start ib remote syn
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(500),
+    %% check
+    Pid = syn:whereis(<<"proc">>),
+    Pid = rpc:call(SlaveNode, syn, whereis, [<<"proc">>]).
+
 three_nodes_partial_netsplit_consistency(Config) ->
     %% get slaves
     SlaveNode1 = proplists:get_value(slave_node_1, Config),