Browse Source

Add anti-entropy for registry.

Roberto Ostinelli 5 years ago
parent
commit
b4bce71874
4 changed files with 141 additions and 6 deletions
  1. 32 0
      src/syn_backbone.erl
  2. 40 4
      src/syn_registry.erl
  3. 53 2
      test/syn_registry_SUITE.erl
  4. 16 0
      test/syn_test_suite_helper.erl

+ 32 - 0
src/syn_backbone.erl

@@ -29,12 +29,14 @@
 %% API
 %% API
 -export([start_link/0]).
 -export([start_link/0]).
 -export([get_event_handler_module/0]).
 -export([get_event_handler_module/0]).
+-export([get_anti_entropy_settings/1]).
 
 
 %% gen_server callbacks
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
 
 %% macros
 %% macros
 -define(DEFAULT_EVENT_HANDLER_MODULE, syn_event_handler).
 -define(DEFAULT_EVENT_HANDLER_MODULE, syn_event_handler).
+-define(DEFAULT_ANTI_ENTROPY_MAX_DEVIATION_MS, 60000).
 
 
 %% records
 %% records
 -record(state, {}).
 -record(state, {}).
@@ -59,6 +61,36 @@ get_event_handler_module() ->
     %% return
     %% return
     CustomEventHandler.
     CustomEventHandler.
 
 
+%% @doc Returns the anti-entropy timing for `Module' as `{IntervalMs, IntervalMaxDeviationMs}'.
+%%
+%% Settings are read from the `anti_entropy' key of the `syn' application environment, e.g.
+%% `[{registry, [{interval, 300}, {interval_max_deviation, 60}]}]'. Configured values are multiplied
+%% by 1000 before being returned (seconds to milliseconds, going by the `Ms' names).
+%% `{undefined, undefined}' is returned when the key, the module entry or its `interval' is missing.
+%%
+%% When `interval_max_deviation' is not configured, ?DEFAULT_ANTI_ENTROPY_MAX_DEVIATION_MS is returned
+%% as is: it is already expressed in milliseconds and must not be scaled again.
+%%
+%% NOTE: no rounding is done here, so fractional settings (e.g. 0.25) yield floats (250.0).
+-spec get_anti_entropy_settings(Module :: registry | groups) ->
+    {IntervalMs :: non_neg_integer() | undefined, IntervalMaxDeviationMs :: non_neg_integer() | undefined}.
+get_anti_entropy_settings(Module) ->
+    case application:get_env(syn, anti_entropy, undefined) of
+        undefined ->
+            {undefined, undefined};
+
+        AntiEntropySettings ->
+            anti_entropy_module_settings(proplists:get_value(Module, AntiEntropySettings))
+    end.
+
+%% Converts the settings proplist of a single module into `{IntervalMs, IntervalMaxDeviationMs}'.
+anti_entropy_module_settings(undefined) ->
+    {undefined, undefined};
+anti_entropy_module_settings(ModSettings) ->
+    case proplists:get_value(interval, ModSettings) of
+        undefined ->
+            {undefined, undefined};
+
+        Interval ->
+            {Interval * 1000, anti_entropy_max_deviation_ms(ModSettings)}
+    end.
+
+%% Returns the max deviation in milliseconds: the configured value scaled by 1000, or the default
+%% (already in milliseconds) when `interval_max_deviation' is not set.
+anti_entropy_max_deviation_ms(ModSettings) ->
+    case proplists:get_value(interval_max_deviation, ModSettings) of
+        undefined ->
+            ?DEFAULT_ANTI_ENTROPY_MAX_DEVIATION_MS;
+
+        MaxDeviation ->
+            MaxDeviation * 1000
+    end.
+
 %% ===================================================================
 %% ===================================================================
 %% Callbacks
 %% Callbacks
 %% ===================================================================
 %% ===================================================================

+ 40 - 4
src/syn_registry.erl

@@ -43,7 +43,9 @@
 
 
 %% records
 %% records
 -record(state, {
 -record(state, {
-    custom_event_handler = undefined :: module()
+    custom_event_handler = undefined :: module(),
+    anti_entropy_interval_ms = undefined :: non_neg_integer(),
+    anti_entropy_interval_max_deviation_ms = undefined :: non_neg_integer()
 }).
 }).
 
 
 %% includes
 %% includes
@@ -135,12 +137,20 @@ init([]) ->
     rebuild_monitors(),
     rebuild_monitors(),
     %% get handler
     %% get handler
     CustomEventHandler = syn_backbone:get_event_handler_module(),
     CustomEventHandler = syn_backbone:get_event_handler_module(),
+    %% get anti-entropy interval
+    {AntiEntropyIntervalMs, AntiEntropyIntervalMaxDeviationMs} = syn_backbone:get_anti_entropy_settings(registry),
     %% send message to initiate full cluster sync
     %% send message to initiate full cluster sync
     timer:send_after(0, self(), sync_full_cluster),
     timer:send_after(0, self(), sync_full_cluster),
+    %% build state
+    State = #state{
+        custom_event_handler = CustomEventHandler,
+        anti_entropy_interval_ms = AntiEntropyIntervalMs,
+        anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
+    },
+    %% start anti-entropy
+    set_timer_for_anti_entropy(State),
     %% init
     %% init
-    {ok, #state{
-        custom_event_handler = CustomEventHandler
-    }}.
+    {ok, State}.
 
 
 %% ----------------------------------------------------------------------------------------------------------
 %% ----------------------------------------------------------------------------------------------------------
 %% Call messages
 %% Call messages
@@ -302,6 +312,23 @@ handle_info(sync_full_cluster, State) ->
     end, nodes()),
     end, nodes()),
     {noreply, State};
     {noreply, State};
 
 
+%% Periodic anti-entropy tick: run registry_automerge/2 against one randomly chosen connected node
+%% (if there is any), then schedule the next tick.
+handle_info(sync_anti_entropy, State) ->
+    %% sync; match on the node list itself rather than testing its length
+    case nodes() of
+        [] ->
+            %% no connected nodes, nothing to sync with
+            ok;
+
+        RemoteNodes ->
+            RandomRemoteNode = lists:nth(rand:uniform(length(RemoteNodes)), RemoteNodes),
+            error_logger:info_msg("Syn(~p): Initiating anti-entropy sync for node ~p~n", [node(), RandomRemoteNode]),
+            registry_automerge(RandomRemoteNode, State)
+    end,
+    %% set timer for the next round (a no-op when anti-entropy is not configured)
+    set_timer_for_anti_entropy(State),
+    %% return
+    {noreply, State};
+
 handle_info(Info, State) ->
 handle_info(Info, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
     error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
     {noreply, State}.
     {noreply, State}.
@@ -651,3 +678,12 @@ rebuild_monitors() ->
         end
         end
     end, Entries).
     end, Entries).
 
 
+%% Schedules the next `sync_anti_entropy' message to the calling process.
+%% Does nothing when no anti-entropy interval is set in the state. Otherwise the delay is the
+%% configured interval plus a random fraction of the max deviation, rounded to whole milliseconds.
+-spec set_timer_for_anti_entropy(#state{}) -> ok.
+set_timer_for_anti_entropy(#state{anti_entropy_interval_ms = undefined}) ->
+    ok;
+set_timer_for_anti_entropy(#state{} = State) ->
+    BaseMs = State#state.anti_entropy_interval_ms,
+    MaxDeviationMs = State#state.anti_entropy_interval_max_deviation_ms,
+    %% random jitter so that ticks do not all fire after exactly the same delay
+    JitterMs = rand:uniform() * MaxDeviationMs,
+    DelayMs = round(BaseMs + JitterMs),
+    {ok, _} = timer:send_after(DelayMs, self(), sync_anti_entropy),
+    ok.

+ 53 - 2
test/syn_registry_SUITE.erl

@@ -56,7 +56,8 @@
     three_nodes_full_netsplit_consistency/1,
     three_nodes_full_netsplit_consistency/1,
     three_nodes_start_syn_before_connecting_cluster_with_conflict/1,
     three_nodes_start_syn_before_connecting_cluster_with_conflict/1,
     three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution/1,
     three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution/1,
-    three_nodes_registration_race_condition_custom_conflict_resolution/1
+    three_nodes_registration_race_condition_custom_conflict_resolution/1,
+    three_nodes_anti_entropy/1
 ]).
 ]).
 
 
 %% support
 %% support
@@ -125,7 +126,8 @@ groups() ->
             three_nodes_full_netsplit_consistency,
             three_nodes_full_netsplit_consistency,
             three_nodes_start_syn_before_connecting_cluster_with_conflict,
             three_nodes_start_syn_before_connecting_cluster_with_conflict,
             three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution,
             three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution,
-            three_nodes_registration_race_condition_custom_conflict_resolution
+            three_nodes_registration_race_condition_custom_conflict_resolution,
+            three_nodes_anti_entropy
         ]}
         ]}
     ].
     ].
 %% -------------------------------------------------------------------
 %% -------------------------------------------------------------------
@@ -938,6 +940,55 @@ three_nodes_registration_race_condition_custom_conflict_resolution(Config) ->
     true = rpc:call(SlaveNode1, erlang, is_process_alive, [Pid1]),
     true = rpc:call(SlaveNode1, erlang, is_process_alive, [Pid1]),
     true = rpc:call(SlaveNode2, erlang, is_process_alive, [Pid2]).
     true = rpc:call(SlaveNode2, erlang, is_process_alive, [Pid2]).
 
 
+three_nodes_anti_entropy(Config) ->
+    %% checks that anti-entropy spreads entries injected into local tables to all three nodes; first, get slaves from config
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% use custom handler on every node (presumably it resolves conflicts in favor of meta keep_this_one; confirm in handler)
+    syn_test_suite_helper:use_custom_handler(),
+    rpc:call(SlaveNode1, syn_test_suite_helper, use_custom_handler, []),
+    rpc:call(SlaveNode2, syn_test_suite_helper, use_custom_handler, []),
+    %% set registry anti-entropy with a very low interval (0.25 seconds) on every node, before syn is started
+    syn_test_suite_helper:use_anti_entropy(registry, 0.25),
+    rpc:call(SlaveNode1, syn_test_suite_helper, use_anti_entropy, [registry, 0.25]),
+    rpc:call(SlaveNode2, syn_test_suite_helper, use_anti_entropy, [registry, 0.25]),
+    %% start syn on all three nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(100),
+    %% start processes: one regular and one "conflict" process per node
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
+    Pid0Conflict = syn_test_suite_helper:start_process(),
+    Pid1Conflict = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2Conflict = syn_test_suite_helper:start_process(SlaveNode2),
+    timer:sleep(100),
+    %% inject data into each node's local table to simulate latent conflicts ("conflict" maps to a different pid on every node)
+    ok = syn_registry:add_to_local_table("pid0", Pid0, node(), undefined),
+    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["pid1", Pid1, SlaveNode1, undefined]),
+    ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["pid2", Pid2, SlaveNode2, undefined]),
+    ok = syn_registry:add_to_local_table("conflict", Pid0Conflict, node(), undefined),
+    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["conflict", Pid1Conflict, keep_this_one, undefined]),
+    ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["conflict", Pid2Conflict, SlaveNode2, undefined]),
+    %% wait to let anti-entropy settle (5 seconds leaves room for many 0.25 second sync rounds)
+    timer:sleep(5000),
+    %% check: every node resolves all three names, and "conflict" maps to Pid1Conflict (meta keep_this_one) everywhere
+    Node = node(),
+    {Pid0, Node} = syn:whereis("pid0", with_meta),
+    {Pid1, SlaveNode1} = syn:whereis("pid1", with_meta),
+    {Pid2, SlaveNode2} = syn:whereis("pid2", with_meta),
+    {Pid1Conflict, keep_this_one} = syn:whereis("conflict", with_meta),
+    {Pid0, Node} = rpc:call(SlaveNode1, syn, whereis, ["pid0", with_meta]),
+    {Pid1, SlaveNode1} = rpc:call(SlaveNode1, syn, whereis, ["pid1", with_meta]),
+    {Pid2, SlaveNode2} = rpc:call(SlaveNode1, syn, whereis, ["pid2", with_meta]),
+    {Pid1Conflict, keep_this_one} = rpc:call(SlaveNode1, syn, whereis, ["conflict", with_meta]),
+    {Pid0, Node} = rpc:call(SlaveNode2, syn, whereis, ["pid0", with_meta]),
+    {Pid1, SlaveNode1} = rpc:call(SlaveNode2, syn, whereis, ["pid1", with_meta]),
+    {Pid2, SlaveNode2} = rpc:call(SlaveNode2, syn, whereis, ["pid2", with_meta]),
+    {Pid1Conflict, keep_this_one} = rpc:call(SlaveNode2, syn, whereis, ["conflict", with_meta]).
+
 %% ===================================================================
 %% ===================================================================
 %% Internal
 %% Internal
 %% ===================================================================
 %% ===================================================================

+ 16 - 0
test/syn_test_suite_helper.erl

@@ -33,6 +33,7 @@
 -export([start_process/0, start_process/1, start_process/2]).
 -export([start_process/0, start_process/1, start_process/2]).
 -export([kill_process/1]).
 -export([kill_process/1]).
 -export([use_custom_handler/0]).
 -export([use_custom_handler/0]).
+-export([use_anti_entropy/2]).
 -export([send_error_logger_to_disk/0]).
 -export([send_error_logger_to_disk/0]).
 
 
 %% internal
 %% internal
@@ -98,6 +99,21 @@ kill_process(Pid) ->
 use_custom_handler() ->
 use_custom_handler() ->
     application:set_env(syn, event_handler, syn_test_event_handler).
     application:set_env(syn, event_handler, syn_test_event_handler).
 
 
+%% Enables anti-entropy for `Module' (`registry' or `groups') with the given `Interval' and a fixed
+%% `interval_max_deviation' of 0.1. The whole `anti_entropy' env value of the `syn' application is
+%% replaced, so only the settings of the module passed in the latest call remain.
+use_anti_entropy(Module, Interval) when Module =:= registry; Module =:= groups ->
+    ModuleSettings = [
+        {interval, Interval},
+        {interval_max_deviation, 0.1}
+    ],
+    application:set_env(syn, anti_entropy, [{Module, ModuleSettings}]).
+
 send_error_logger_to_disk() ->
 send_error_logger_to_disk() ->
     error_logger:logfile({open, atom_to_list(node())}).
     error_logger:logfile({open, atom_to_list(node())}).