Browse Source

Add syn:force_cluster_sync/1.

Roberto Ostinelli 5 years ago
parent
commit
11d49566cb
6 changed files with 52 additions and 85 deletions
  1. 4 15
      README.md
  2. 6 6
      src/syn.erl
  3. 19 22
      src/syn_groups.erl
  4. 20 23
      src/syn_registry.erl
  5. 1 9
      test/syn_groups_SUITE.erl
  6. 2 10
      test/syn_registry_SUITE.erl

+ 4 - 15
README.md

@@ -2,6 +2,7 @@
 
 
 
+
 [![Build Status](https://travis-ci.org/ostinelli/syn.svg?branch=master)](https://travis-ci.org/ostinelli/syn) [![Hex pm](https://img.shields.io/hexpm/v/syn.svg)](https://hex.pm/packages/syn)
 
 
@@ -523,25 +524,13 @@ In `sys.config` you can specify your anti-entropy settings:
 `interval` specifies in seconds the interval between every anti-entropy syncing, while `interval_max_deviation` the max deviation in seconds from the `interval`. For instance, with an `interval` of 300 seconds and an `interval_max_deviation` of 60, anti-entropy will be called with an interval range of 240 to 360 seconds.
 
 ### Manual sync
-You can force an anti-entropy sync with a remote node by calling the following:
-
-```erlang
-syn:sync_from_node(Module, RemoteNode) -> ok | {error, Reason}.
-
-Types:
-    Module = registry | groups
-    RemoteNode = node()
-    Reason = not_remote_node
-```
-
-This is a unidirectional sync, that will sync the data of all the processes running on `RemoteNode` to the local node. If you want to sync data in both ways, you can do so with a RPC, for instance:
+You can force an anti-entropy sync on the whole cluster by calling the following:
 
 ```erlang
-ok = syn:sync_from_node(registry, 'remote@example.com'),
-ok = rpc:call('remote@example.com', syn, sync_from_node, [registry, node()]).
+syn:force_cluster_sync(registry | groups) -> ok.
 ```
 
-> As per the notes above, in normal conditions Syn doesn't need to be manually synced. Use these functions as a last resort.
+> As per the notes above, in normal conditions Syn doesn't need to be manually synced. This function will force a full mesh sync on all of the cluster. Use it as a last resort.
 
 ## Internals
 As of v2.1, Syn uses ETS for memory storage and doesn't have any external dependency. Syn has its own replication and net splits conflict resolution mechanisms.

+ 6 - 6
src/syn.erl

@@ -41,7 +41,7 @@
 -export([publish/2]).
 -export([publish_to_local/2]).
 -export([multi_call/2, multi_call/3, multi_call_reply/2]).
--export([sync_from_node/2]).
+-export([force_cluster_sync/1]).
 
 %% gen_server via interface
 -export([register_name/2, unregister_name/1, whereis_name/1, send/2]).
@@ -184,8 +184,8 @@ multi_call_reply(CallerPid, Reply) ->
     syn_groups:multi_call_reply(CallerPid, Reply).
 
 %% ----- \/ anti entropy ---------------------------------------------
--spec sync_from_node(registry | groups, RemoteNode :: node()) -> ok | {error, Reason :: any()}.
-sync_from_node(registry, RemoteNode) ->
-    syn_registry:sync_from_node(RemoteNode);
-sync_from_node(groups, RemoteNode) ->
-    syn_groups:sync_from_node(RemoteNode).
+-spec force_cluster_sync(registry | groups) -> ok.
+force_cluster_sync(registry) ->
+    syn_registry:force_cluster_sync();
+force_cluster_sync(groups) ->
+    syn_groups:force_cluster_sync().

+ 19 - 22
src/syn_groups.erl

@@ -41,8 +41,8 @@
 %% sync API
 -export([sync_join/4, sync_leave/3]).
 -export([sync_get_local_groups_tuples/1]).
+-export([force_cluster_sync/0]).
 -export([remove_from_local_table/2]).
--export([sync_from_node/1]).
 
 %% tests
 -ifdef(TEST).
@@ -200,12 +200,11 @@ sync_get_local_groups_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local group tuples from remote node: ~p~n", [node(), FromNode]),
     get_groups_tuples_for_node(node()).
 
--spec sync_from_node(RemoteNode :: node()) -> ok | {error, Reason :: any()}.
-sync_from_node(RemoteNode) ->
-    case RemoteNode =:= node() of
-        true -> {error, not_remote_node};
-        _ -> gen_server:cast(?MODULE, {sync_from_node, RemoteNode})
-    end.
+-spec force_cluster_sync() -> ok.
+force_cluster_sync() ->
+    lists:foreach(fun(RemoteNode) ->
+        gen_server:cast({?MODULE, RemoteNode}, force_cluster_sync)
+    end, [node() | nodes()]).
 
 %% ===================================================================
 %% Callbacks
@@ -235,7 +234,7 @@ init([]) ->
         anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
     },
     %% send message to initiate full cluster sync
-    timer:send_after(0, self(), sync_full_cluster),
+    timer:send_after(0, self(), sync_from_full_cluster),
     %% start anti-entropy
     set_timer_for_anti_entropy(State),
     %% init
@@ -301,9 +300,9 @@ handle_cast({sync_leave, GroupName, Pid}, State) ->
     %% return
     {noreply, State};
 
-handle_cast({sync_from_node, RemoteNode}, State) ->
-    error_logger:info_msg("Syn(~p): Initiating forced GROUPS sync for node ~p~n", [node(), RemoteNode]),
-    groups_automerge(RemoteNode),
+handle_cast(force_cluster_sync, State) ->
+    error_logger:info_msg("Syn(~p): Initiating full cluster groups FORCED sync for nodes: ~p~n", [node(), nodes()]),
+    do_sync_from_full_cluster(),
     {noreply, State};
 
 handle_cast(Msg, State) ->
@@ -348,17 +347,9 @@ handle_info({nodedown, RemoteNode}, State) ->
     raw_purge_group_entries_for_node(RemoteNode),
     {noreply, State};
 
-handle_info(sync_full_cluster, State) ->
-    case length(nodes()) > 0 of
-        true ->
-            error_logger:info_msg("Syn(~p): Initiating full cluster groups sync for nodes: ~p~n", [node(), nodes()]);
-
-        _ ->
-            ok
-    end,
-    lists:foreach(fun(RemoteNode) ->
-        groups_automerge(RemoteNode)
-    end, nodes()),
+handle_info(sync_from_full_cluster, State) ->
+    error_logger:info_msg("Syn(~p): Initiating full cluster groups sync for nodes: ~p~n", [node(), nodes()]),
+    do_sync_from_full_cluster(),
     {noreply, State};
 
 handle_info(sync_anti_entropy, State) ->
@@ -568,6 +559,12 @@ groups_automerge(RemoteNode) ->
         end
     ).
 
+-spec do_sync_from_full_cluster() -> ok.
+do_sync_from_full_cluster() ->
+    lists:foreach(fun(RemoteNode) ->
+        groups_automerge(RemoteNode)
+    end, nodes()).
+
 -spec raw_purge_group_entries_for_node(Node :: atom()) -> ok.
 raw_purge_group_entries_for_node(Node) ->
     %% NB: no demonitoring is done, this is why it's raw

+ 20 - 23
src/syn_registry.erl

@@ -36,10 +36,10 @@
 
 %% sync API
 -export([sync_register/4, sync_unregister/3]).
+-export([sync_demonitor_and_kill_on_node/5]).
 -export([sync_get_local_registry_tuples/1]).
--export([sync_from_node/1]).
+-export([force_cluster_sync/0]).
 -export([add_to_local_table/4, remove_from_local_table/2]).
--export([sync_demonitor_and_kill_on_node/5]).
 -export([find_monitor_for_pid/1]).
 
 %% gen_server callbacks
@@ -164,12 +164,11 @@ sync_get_local_registry_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node ~p~n", [node(), FromNode]),
     get_registry_tuples_for_node(node()).
 
--spec sync_from_node(RemoteNode :: node()) -> ok | {error, Reason :: any()}.
-sync_from_node(RemoteNode) ->
-    case RemoteNode =:= node() of
-        true -> {error, not_remote_node};
-        _ -> gen_server:cast(?MODULE, {sync_from_node, RemoteNode})
-    end.
+-spec force_cluster_sync() -> ok.
+force_cluster_sync() ->
+    lists:foreach(fun(RemoteNode) ->
+        gen_server:cast({?MODULE, RemoteNode}, force_cluster_sync)
+    end, [node() | nodes()]).
 
 %% ===================================================================
 %% Callbacks
@@ -199,7 +198,7 @@ init([]) ->
         anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
     },
     %% send message to initiate full cluster sync
-    timer:send_after(0, self(), sync_full_cluster),
+    timer:send_after(0, self(), sync_from_full_cluster),
     %% start anti-entropy
     set_timer_for_anti_entropy(State),
     %% init
@@ -333,9 +332,9 @@ handle_cast({sync_unregister, Name, Pid}, State) ->
     %% return
     {noreply, State};
 
-handle_cast({sync_from_node, RemoteNode}, State) ->
-    error_logger:info_msg("Syn(~p): Initiating REGISTRY forced sync for node ~p~n", [node(), RemoteNode]),
-    registry_automerge(RemoteNode, State),
+handle_cast(force_cluster_sync, State) ->
+    error_logger:info_msg("Syn(~p): Initiating full cluster FORCED registry sync for nodes: ~p~n", [node(), nodes()]),
+    do_sync_from_full_cluster(State),
     {noreply, State};
 
 handle_cast({sync_demonitor_and_kill_on_node, Name, Pid, Meta, MonitorRef, Kill}, State) ->
@@ -394,17 +393,9 @@ handle_info({nodedown, RemoteNode}, State) ->
     raw_purge_registry_entries_for_remote_node(RemoteNode),
     {noreply, State};
 
-handle_info(sync_full_cluster, State) ->
-    case length(nodes()) > 0 of
-        true ->
-            error_logger:info_msg("Syn(~p): Initiating full cluster registry sync for nodes: ~p~n", [node(), nodes()]);
-
-        _ ->
-            ok
-    end,
-    lists:foreach(fun(RemoteNode) ->
-        registry_automerge(RemoteNode, State)
-    end, nodes()),
+handle_info(sync_from_full_cluster, State) ->
+    error_logger:info_msg("Syn(~p): Initiating full cluster registry sync for nodes: ~p~n", [node(), nodes()]),
+    do_sync_from_full_cluster(State),
     {noreply, State};
 
 handle_info(sync_anti_entropy, State) ->
@@ -764,6 +755,12 @@ resolve_conflict(
             undefined
     end.
 
+-spec do_sync_from_full_cluster(#state{}) -> ok.
+do_sync_from_full_cluster(State) ->
+    lists:foreach(fun(RemoteNode) ->
+        registry_automerge(RemoteNode, State)
+    end, nodes()).
+
 -spec raw_purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
 raw_purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
     %% NB: no demonitoring is done, this is why it's raw

+ 1 - 9
test/syn_groups_SUITE.erl

@@ -1148,15 +1148,7 @@ three_nodes_anti_entropy_manual(Config) ->
     ok = rpc:call(SlaveNode2, syn_groups, add_to_local_table, ["my-group", Pid2, SlaveNode2, undefined]),
     ok = rpc:call(SlaveNode2, syn_groups, add_to_local_table, ["my-group-isolated", Pid2Isolated, SlaveNode2, undefined]),
     %% call anti entropy
-    {error, not_remote_node} = syn:sync_from_node(groups, node()),
-    ok = syn:sync_from_node(groups, SlaveNode1),
-    ok = syn:sync_from_node(groups, SlaveNode2),
-    {error, not_remote_node} = rpc:call(SlaveNode1, syn, sync_from_node, [groups, SlaveNode1]),
-    ok = rpc:call(SlaveNode1, syn, sync_from_node, [groups, node()]),
-    ok = rpc:call(SlaveNode1, syn, sync_from_node, [groups, SlaveNode2]),
-    {error, not_remote_node} = rpc:call(SlaveNode2, syn, sync_from_node, [groups, SlaveNode2]),
-    ok = rpc:call(SlaveNode2, syn, sync_from_node, [groups, node()]),
-    ok = rpc:call(SlaveNode2, syn, sync_from_node, [groups, SlaveNode1]),
+    ok = syn:force_cluster_sync(groups),
     timer:sleep(5000),
     %% check
     Members = lists:sort([

+ 2 - 10
test/syn_registry_SUITE.erl

@@ -1087,16 +1087,8 @@ three_nodes_anti_entropy_manual(Config) ->
     ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["conflict", Pid1Conflict, keep_this_one, undefined]),
     ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["conflict", Pid2Conflict, SlaveNode2, undefined]),
     %% call anti entropy
-    {error, not_remote_node} = syn:sync_from_node(registry, node()),
-    ok = syn:sync_from_node(registry, SlaveNode1),
-    ok = syn:sync_from_node(registry, SlaveNode2),
-    {error, not_remote_node} = rpc:call(SlaveNode1, syn, sync_from_node, [registry, SlaveNode1]),
-    ok = rpc:call(SlaveNode1, syn, sync_from_node, [registry, node()]),
-    ok = rpc:call(SlaveNode1, syn, sync_from_node, [registry, SlaveNode2]),
-    {error, not_remote_node} = rpc:call(SlaveNode2, syn, sync_from_node, [registry, SlaveNode2]),
-    ok = rpc:call(SlaveNode2, syn, sync_from_node, [registry, node()]),
-    ok = rpc:call(SlaveNode2, syn, sync_from_node, [registry, SlaveNode1]),
-    timer:sleep(500),
+    ok = syn:force_cluster_sync(registry),
+    timer:sleep(1000),
     %% check
     Node = node(),
     {Pid0, Node} = syn:whereis("pid0", with_meta),