Browse Source

Add anti-entropy for groups.

Roberto Ostinelli 5 years ago
parent
commit
a393ebaa22
3 changed files with 90 additions and 8 deletions
  1. 46 4
      src/syn_groups.erl
  2. 2 2
      src/syn_registry.erl
  3. 42 2
      test/syn_groups_SUITE.erl

+ 46 - 4
src/syn_groups.erl

@@ -43,6 +43,11 @@
 -export([sync_get_local_group_tuples/1]).
 -export([remove_from_local_table/2]).
 
+%% tests
+-ifdef(TEST).
+-export([add_to_local_table/4]).
+-endif.
+
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
@@ -51,7 +56,9 @@
 
 %% records
 -record(state, {
-    custom_event_handler = undefined :: module()
+    custom_event_handler = undefined :: module(),
+    anti_entropy_interval_ms = undefined :: undefined | non_neg_integer(), %% undefined: no anti-entropy timer is set
+    anti_entropy_interval_max_deviation_ms = undefined :: undefined | non_neg_integer() %% upper bound of the random extra delay
 }).
 
 %% macros
@@ -211,12 +218,20 @@ init([]) ->
     rebuild_monitors(),
     %% get handler
     CustomEventHandler = syn_backbone:get_event_handler_module(),
+    %% get anti-entropy interval
+    {AntiEntropyIntervalMs, AntiEntropyIntervalMaxDeviationMs} = syn_backbone:get_anti_entropy_settings(groups),
+    %% build state
+    State = #state{
+        custom_event_handler = CustomEventHandler,
+        anti_entropy_interval_ms = AntiEntropyIntervalMs,
+        anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
+    },
     %% send message to initiate full cluster sync
     timer:send_after(0, self(), sync_full_cluster),
+    %% start anti-entropy
+    set_timer_for_anti_entropy(State),
     %% init
-    {ok, #state{
-        custom_event_handler = CustomEventHandler
-    }}.
+    {ok, State}.
 
 %% ----------------------------------------------------------------------------------------------------------
 %% Call messages
@@ -333,6 +348,23 @@ handle_info(sync_full_cluster, State) ->
     end, nodes()),
     {noreply, State};
 
+handle_info(sync_anti_entropy, State) ->
+    %% reconcile groups with one randomly chosen connected node, if there is any
+    case nodes() of
+        [] ->
+            %% no remote node is connected: nothing to sync with
+            ok;
+
+        RemoteNodes ->
+            RandomRemoteNode = lists:nth(rand:uniform(length(RemoteNodes)), RemoteNodes),
+            error_logger:info_msg("Syn(~p): Initiating anti-entropy sync for node ~p~n", [node(), RandomRemoteNode]),
+            groups_automerge(RandomRemoteNode)
+    end,
+    %% re-arm the timer so that the next anti-entropy round gets scheduled
+    set_timer_for_anti_entropy(State),
+    %% return
+    {noreply, State};
+
 handle_info(Info, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
     {noreply, State}.
@@ -580,3 +612,13 @@ rebuild_monitors() ->
                 multicast_leave(GroupName, Pid)
         end
     end, GroupTuples).
+
+%% Schedules one `sync_anti_entropy` message to self(); does nothing when the interval is undefined.
+-spec set_timer_for_anti_entropy(#state{}) -> ok.
+set_timer_for_anti_entropy(#state{anti_entropy_interval_ms = undefined}) -> ok;
+set_timer_for_anti_entropy(#state{} = State) ->
+    BaseMs = State#state.anti_entropy_interval_ms,
+    %% random extra delay: rand:uniform/0 yields a float in [0.0, 1.0), scaled by the max deviation
+    JitterMs = rand:uniform() * State#state.anti_entropy_interval_max_deviation_ms,
+    {ok, _} = timer:send_after(round(BaseMs + JitterMs), self(), sync_anti_entropy),
+    ok.

+ 2 - 2
src/syn_registry.erl

@@ -139,14 +139,14 @@ init([]) ->
     CustomEventHandler = syn_backbone:get_event_handler_module(),
     %% get anti-entropy interval
     {AntiEntropyIntervalMs, AntiEntropyIntervalMaxDeviationMs} = syn_backbone:get_anti_entropy_settings(registry),
-    %% send message to initiate full cluster sync
-    timer:send_after(0, self(), sync_full_cluster),
     %% build state
     State = #state{
         custom_event_handler = CustomEventHandler,
         anti_entropy_interval_ms = AntiEntropyIntervalMs,
         anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
     },
+    %% send message to initiate full cluster sync
+    timer:send_after(0, self(), sync_full_cluster),
     %% start anti-entropy
     set_timer_for_anti_entropy(State),
     %% init

+ 42 - 2
test/syn_groups_SUITE.erl

@@ -49,7 +49,8 @@
     two_nodes_local_publish/1,
     two_nodes_multicall/1,
     two_nodes_groups_full_cluster_sync_on_boot_node_added_later/1,
-    two_nodes_groups_full_cluster_sync_on_boot_syn_started_later/1
+    two_nodes_groups_full_cluster_sync_on_boot_syn_started_later/1,
+    three_nodes_anti_entropy/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -111,7 +112,8 @@ groups() ->
         ]},
         {three_nodes_groups, [shuffle], [
             three_nodes_partial_netsplit_consistency,
-            three_nodes_full_netsplit_consistency
+            three_nodes_full_netsplit_consistency,
+            three_nodes_anti_entropy
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -1083,3 +1085,41 @@ three_nodes_full_netsplit_consistency(Config) ->
     true = rpc:call(SlaveNode2, syn, member, [Pid1, GroupName]),
     true = rpc:call(SlaveNode2, syn, member, [Pid2, GroupName]),
     false = rpc:call(SlaveNode2, syn, member, [OtherPid, GroupName]).
+
+three_nodes_anti_entropy(Config) ->
+    %% slave node names come from the test config
+    Slave1 = proplists:get_value(slave_node_1, Config),
+    Slave2 = proplists:get_value(slave_node_2, Config),
+    %% enable anti-entropy for groups on all three nodes with a very low interval (0.25 seconds)
+    syn_test_suite_helper:use_anti_entropy(groups, 0.25),
+    rpc:call(Slave1, syn_test_suite_helper, use_anti_entropy, [groups, 0.25]),
+    rpc:call(Slave2, syn_test_suite_helper, use_anti_entropy, [groups, 0.25]),
+    %% start syn locally and on both slaves
+    ok = syn:start(),
+    ok = rpc:call(Slave1, syn, start, []),
+    ok = rpc:call(Slave2, syn, start, []),
+    timer:sleep(100),
+    %% one process per node, plus a second process on the second slave
+    LocalPid = syn_test_suite_helper:start_process(),
+    RemotePid1 = syn_test_suite_helper:start_process(Slave1),
+    RemotePid2 = syn_test_suite_helper:start_process(Slave2),
+    IsolatedPid = syn_test_suite_helper:start_process(Slave2),
+    timer:sleep(100),
+    %% inject data straight into each node's local table to simulate latent conflicts
+    %% (presumably nothing is propagated to the other nodes here -- confirm in add_to_local_table/4)
+    ok = syn_groups:add_to_local_table("my-group", LocalPid, node(), undefined),
+    ok = rpc:call(Slave1, syn_groups, add_to_local_table, ["my-group", RemotePid1, Slave1, undefined]),
+    ok = rpc:call(Slave2, syn_groups, add_to_local_table, ["my-group", RemotePid2, Slave2, undefined]),
+    ok = rpc:call(Slave2, syn_groups, add_to_local_table, ["my-group-isolated", IsolatedPid, Slave2, undefined]),
+    %% give anti-entropy time to run before checking
+    timer:sleep(5000),
+    %% check: every node must report the same three members for "my-group"
+    ExpectedMembers = lists:sort([{LocalPid, node()}, {RemotePid1, Slave1}, {RemotePid2, Slave2}]),
+    ExpectedMembers = syn:get_members("my-group", with_meta),
+    ExpectedMembers = rpc:call(Slave1, syn, get_members, ["my-group", with_meta]),
+    ExpectedMembers = rpc:call(Slave2, syn, get_members, ["my-group", with_meta]),
+    %% check: the group injected only on the second slave must be reported by every node
+    ExpectedIsolated = [{IsolatedPid, Slave2}],
+    ExpectedIsolated = syn:get_members("my-group-isolated", with_meta),
+    ExpectedIsolated = rpc:call(Slave1, syn, get_members, ["my-group-isolated", with_meta]),
+    ExpectedIsolated = rpc:call(Slave2, syn, get_members, ["my-group-isolated", with_meta]).