Browse Source

Allow to manually start a sync from a remote node.

Roberto Ostinelli 5 years ago
parent
commit
5ef070b948
5 changed files with 140 additions and 4 deletions
  1. 8 0
      src/syn.erl
  2. 13 0
      src/syn_groups.erl
  3. 13 0
      src/syn_registry.erl
  4. 48 2
      test/syn_groups_SUITE.erl
  5. 58 2
      test/syn_registry_SUITE.erl

+ 8 - 0
src/syn.erl

@@ -40,6 +40,7 @@
 -export([publish/2]).
 -export([publish_to_local/2]).
 -export([multi_call/2, multi_call/3, multi_call_reply/2]).
+-export([sync_from_node/2]).
 
 %% gen_server via interface
 -export([register_name/2, unregister_name/1, whereis_name/1, send/2]).
@@ -172,3 +173,10 @@ multi_call(GroupName, Message, Timeout) ->
 -spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
 multi_call_reply(CallerPid, Reply) ->
     syn_groups:multi_call_reply(CallerPid, Reply).
+
+%% ----- \/ anti entropy ---------------------------------------------
+-spec sync_from_node(registry | groups, RemoteNode :: node()) -> ok | {error, Reason :: any()}.
+sync_from_node(registry, RemoteNode) ->
+    syn_registry:sync_from_node(RemoteNode);
+sync_from_node(groups, RemoteNode) ->
+    syn_groups:sync_from_node(RemoteNode).

+ 13 - 0
src/syn_groups.erl

@@ -42,6 +42,7 @@
 -export([sync_join/4, sync_leave/3]).
 -export([sync_get_local_group_tuples/1]).
 -export([remove_from_local_table/2]).
+-export([sync_from_node/1]).
 
 %% tests
 -ifdef(TEST).
@@ -199,6 +200,13 @@ sync_get_local_group_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local group tuples from remote node: ~p~n", [node(), FromNode]),
     get_group_tuples_for_node(node()).
 
+-spec sync_from_node(RemoteNode :: node()) -> ok | {error, Reason :: any()}.
+sync_from_node(RemoteNode) ->
+    case RemoteNode =:= node() of
+        true -> {error, not_remote_node};
+        _ -> gen_server:cast(?MODULE, {sync_from_node, RemoteNode})
+    end.
+
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
@@ -293,6 +301,11 @@ handle_cast({sync_leave, GroupName, Pid}, State) ->
     %% return
     {noreply, State};
 
+handle_cast({sync_from_node, RemoteNode}, State) ->
+    error_logger:info_msg("Syn(~p): Initiating forced GROUPS sync for node ~p~n", [node(), RemoteNode]),
+    groups_automerge(RemoteNode),
+    {noreply, State};
+
 handle_cast(Msg, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
     {noreply, State}.

+ 13 - 0
src/syn_registry.erl

@@ -37,6 +37,7 @@
 -export([sync_register/4, sync_unregister/2]).
 -export([sync_get_local_registry_tuples/1]).
 -export([add_to_local_table/4, remove_from_local_table/1]).
+-export([sync_from_node/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -118,6 +119,13 @@ sync_get_local_registry_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node ~p~n", [node(), FromNode]),
     get_registry_tuples_for_node(node()).
 
+-spec sync_from_node(RemoteNode :: node()) -> ok | {error, Reason :: any()}.
+sync_from_node(RemoteNode) ->
+    case RemoteNode =:= node() of
+        true -> {error, not_remote_node};
+        _ -> gen_server:cast(?MODULE, {sync_from_node, RemoteNode})
+    end.
+
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
@@ -257,6 +265,11 @@ handle_cast({sync_unregister, Name}, State) ->
     %% return
     {noreply, State};
 
+handle_cast({sync_from_node, RemoteNode}, State) ->
+    error_logger:info_msg("Syn(~p): Initiating REGISTRY forced sync for node ~p~n", [node(), RemoteNode]),
+    registry_automerge(RemoteNode, State),
+    {noreply, State};
+
 handle_cast(Msg, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
     {noreply, State}.

+ 48 - 2
test/syn_groups_SUITE.erl

@@ -50,7 +50,8 @@
     two_nodes_multicall/1,
     two_nodes_groups_full_cluster_sync_on_boot_node_added_later/1,
     two_nodes_groups_full_cluster_sync_on_boot_syn_started_later/1,
-    three_nodes_anti_entropy/1
+    three_nodes_anti_entropy/1,
+    three_nodes_anti_entropy_manual/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -113,7 +114,8 @@ groups() ->
         {three_nodes_groups, [shuffle], [
             three_nodes_partial_netsplit_consistency,
             three_nodes_full_netsplit_consistency,
-            three_nodes_anti_entropy
+            three_nodes_anti_entropy,
+            three_nodes_anti_entropy_manual
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -1123,3 +1125,47 @@ three_nodes_anti_entropy(Config) ->
     [{Pid2Isolated, SlaveNode2}] = syn:get_members("my-group-isolated", with_meta),
     [{Pid2Isolated, SlaveNode2}] = rpc:call(SlaveNode1, syn, get_members, ["my-group-isolated", with_meta]),
     [{Pid2Isolated, SlaveNode2}] = rpc:call(SlaveNode2, syn, get_members, ["my-group-isolated", with_meta]).
+
+three_nodes_anti_entropy_manual(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(100),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
+    Pid2Isolated = syn_test_suite_helper:start_process(SlaveNode2),
+    timer:sleep(100),
+    %% inject data to simulate latent conflicts
+    ok = syn_groups:add_to_local_table("my-group", Pid0, node(), undefined),
+    ok = rpc:call(SlaveNode1, syn_groups, add_to_local_table, ["my-group", Pid1, SlaveNode1, undefined]),
+    ok = rpc:call(SlaveNode2, syn_groups, add_to_local_table, ["my-group", Pid2, SlaveNode2, undefined]),
+    ok = rpc:call(SlaveNode2, syn_groups, add_to_local_table, ["my-group-isolated", Pid2Isolated, SlaveNode2, undefined]),
+    %% call anti entropy
+    {error, not_remote_node} = syn:sync_from_node(groups, node()),
+    ok = syn:sync_from_node(groups, SlaveNode1),
+    ok = syn:sync_from_node(groups, SlaveNode2),
+    {error, not_remote_node} = rpc:call(SlaveNode1, syn, sync_from_node, [groups, SlaveNode1]),
+    ok = rpc:call(SlaveNode1, syn, sync_from_node, [groups, node()]),
+    ok = rpc:call(SlaveNode1, syn, sync_from_node, [groups, SlaveNode2]),
+    {error, not_remote_node} = rpc:call(SlaveNode2, syn, sync_from_node, [groups, SlaveNode2]),
+    ok = rpc:call(SlaveNode2, syn, sync_from_node, [groups, node()]),
+    ok = rpc:call(SlaveNode2, syn, sync_from_node, [groups, SlaveNode1]),
+    timer:sleep(500),
+    %% check
+    Members = lists:sort([
+        {Pid0, node()},
+        {Pid1, SlaveNode1},
+        {Pid2, SlaveNode2}
+    ]),
+    Members = syn:get_members("my-group", with_meta),
+    Members = rpc:call(SlaveNode1, syn, get_members, ["my-group", with_meta]),
+    Members = rpc:call(SlaveNode2, syn, get_members, ["my-group", with_meta]),
+    [{Pid2Isolated, SlaveNode2}] = syn:get_members("my-group-isolated", with_meta),
+    [{Pid2Isolated, SlaveNode2}] = rpc:call(SlaveNode1, syn, get_members, ["my-group-isolated", with_meta]),
+    [{Pid2Isolated, SlaveNode2}] = rpc:call(SlaveNode2, syn, get_members, ["my-group-isolated", with_meta]).

+ 58 - 2
test/syn_registry_SUITE.erl

@@ -57,7 +57,8 @@
     three_nodes_start_syn_before_connecting_cluster_with_conflict/1,
     three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution/1,
     three_nodes_registration_race_condition_custom_conflict_resolution/1,
-    three_nodes_anti_entropy/1
+    three_nodes_anti_entropy/1,
+    three_nodes_anti_entropy_manual/1
 ]).
 
 %% support
@@ -127,7 +128,8 @@ groups() ->
             three_nodes_start_syn_before_connecting_cluster_with_conflict,
             three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution,
             three_nodes_registration_race_condition_custom_conflict_resolution,
-            three_nodes_anti_entropy
+            three_nodes_anti_entropy,
+            three_nodes_anti_entropy_manual
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -989,6 +991,60 @@ three_nodes_anti_entropy(Config) ->
     {Pid2, SlaveNode2} = rpc:call(SlaveNode2, syn, whereis, ["pid2", with_meta]),
     {Pid1Conflict, keep_this_one} = rpc:call(SlaveNode2, syn, whereis, ["conflict", with_meta]).
 
+three_nodes_anti_entropy_manual(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% use customer handler
+    syn_test_suite_helper:use_custom_handler(),
+    rpc:call(SlaveNode1, syn_test_suite_helper, use_custom_handler, []),
+    rpc:call(SlaveNode2, syn_test_suite_helper, use_custom_handler, []),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(100),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
+    Pid0Conflict = syn_test_suite_helper:start_process(),
+    Pid1Conflict = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2Conflict = syn_test_suite_helper:start_process(SlaveNode2),
+    timer:sleep(100),
+    %% inject data to simulate latent conflicts
+    ok = syn_registry:add_to_local_table("pid0", Pid0, node(), undefined),
+    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["pid1", Pid1, SlaveNode1, undefined]),
+    ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["pid2", Pid2, SlaveNode2, undefined]),
+    ok = syn_registry:add_to_local_table("conflict", Pid0Conflict, node(), undefined),
+    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["conflict", Pid1Conflict, keep_this_one, undefined]),
+    ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["conflict", Pid2Conflict, SlaveNode2, undefined]),
+    %% call anti entropy
+    {error, not_remote_node} = syn:sync_from_node(registry, node()),
+    ok = syn:sync_from_node(registry, SlaveNode1),
+    ok = syn:sync_from_node(registry, SlaveNode2),
+    {error, not_remote_node} = rpc:call(SlaveNode1, syn, sync_from_node, [registry, SlaveNode1]),
+    ok = rpc:call(SlaveNode1, syn, sync_from_node, [registry, node()]),
+    ok = rpc:call(SlaveNode1, syn, sync_from_node, [registry, SlaveNode2]),
+    {error, not_remote_node} = rpc:call(SlaveNode2, syn, sync_from_node, [registry, SlaveNode2]),
+    ok = rpc:call(SlaveNode2, syn, sync_from_node, [registry, node()]),
+    ok = rpc:call(SlaveNode2, syn, sync_from_node, [registry, SlaveNode1]),
+    timer:sleep(500),
+    %% check
+    Node = node(),
+    {Pid0, Node} = syn:whereis("pid0", with_meta),
+    {Pid1, SlaveNode1} = syn:whereis("pid1", with_meta),
+    {Pid2, SlaveNode2} = syn:whereis("pid2", with_meta),
+    {Pid1Conflict, keep_this_one} = syn:whereis("conflict", with_meta),
+    {Pid0, Node} = rpc:call(SlaveNode1, syn, whereis, ["pid0", with_meta]),
+    {Pid1, SlaveNode1} = rpc:call(SlaveNode1, syn, whereis, ["pid1", with_meta]),
+    {Pid2, SlaveNode2} = rpc:call(SlaveNode1, syn, whereis, ["pid2", with_meta]),
+    {Pid1Conflict, keep_this_one} = rpc:call(SlaveNode1, syn, whereis, ["conflict", with_meta]),
+    {Pid0, Node} = rpc:call(SlaveNode2, syn, whereis, ["pid0", with_meta]),
+    {Pid1, SlaveNode1} = rpc:call(SlaveNode2, syn, whereis, ["pid1", with_meta]),
+    {Pid2, SlaveNode2} = rpc:call(SlaveNode2, syn, whereis, ["pid2", with_meta]),
+    {Pid1Conflict, keep_this_one} = rpc:call(SlaveNode2, syn, whereis, ["conflict", with_meta]).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================