Просмотр исходного кода

Put consistency logic in registry for monitoring reasons.

Roberto Ostinelli 5 лет назад
Родитель
Commit
273f6e15c8
4 измененных файлов с 190 добавлено и 117 удалено
  1. 0 63
      src/syn_backbone.erl
  2. 1 1
      src/syn_records.hrl
  3. 120 45
      src/syn_registry.erl
  4. 69 8
      test/syn_registry_SUITE.erl

+ 0 - 63
src/syn_backbone.erl

@@ -28,7 +28,6 @@
 
 %% API
 -export([start_link/0]).
--export([resume_local_syn_registry/0]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -47,10 +46,6 @@ start_link() ->
     Options = [],
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
 
--spec resume_local_syn_registry() -> ok.
-resume_local_syn_registry() ->
-    sys:resume(syn_registry).
-
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
@@ -67,8 +62,6 @@ init([]) ->
     error_logger:info_msg("Syn(~p): Creating tables", [node()]),
     case create_ram_tables() of
         ok ->
-            %% monitor nodes
-            ok = net_kernel:monitor_nodes(true),
             %% init
             {ok, #state{}};
         Other ->
@@ -110,27 +103,6 @@ handle_cast(Msg, State) ->
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
 
-handle_info({nodeup, RemoteNode}, State) ->
-    error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
-    global:trans({{?MODULE, auto_merge_node_up}, self()},
-        fun() ->
-            error_logger:warning_msg("Syn(~p): AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
-            %% request remote node process info & suspend remote registry
-            RegistryTuples = rpc:call(RemoteNode, syn_registry, get_local_registry_tuples_and_suspend, [node()]),
-            sync_registry_tuples(RemoteNode, RegistryTuples),
-            error_logger:warning_msg("Syn(~p): AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
-        end
-    ),
-    %% resume remote processes able to modify tables
-    ok = rpc:call(RemoteNode, sys, resume, [syn_registry]),
-    %% resume
-    {noreply, State};
-
-handle_info({nodedown, RemoteNode}, State) ->
-    error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing its entries on local~n", [node(), RemoteNode]),
-    purge_registry_entries_for_node(RemoteNode),
-    {noreply, State};
-
 handle_info(Info, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
     {noreply, State}.
@@ -189,38 +161,3 @@ delete_ram_tables() ->
     mnesia:delete_table(syn_registry_table),
     mnesia:delete_table(syn_groups_table),
     ok.
-
-sync_registry_tuples(RemoteNode, RegistryTuples) ->
-    %% ensure that registry doesn't have any joining node's entries
-    purge_registry_entries_for_node(RemoteNode),
-    %% loop
-    F = fun({Name, RemotePid, _RemoteNode, RemoteMeta}) ->
-        %% check if same name is registered
-        case syn_registry:find_process_entry_by_name(Name) of
-            undefined ->
-                %% no conflict
-                ok;
-            Entry ->
-                error_logger:warning_msg(
-                    "Syn(~p): Conflicting name process found for: ~p, processes are ~p, ~p, killing local~n",
-                    [node(), Name, Entry#syn_registry_table.pid, RemotePid]
-                ),
-                %% kill the local one
-                exit(Entry#syn_registry_table.pid, kill)
-        end,
-        %% enqueue registration (to be done on syn_registry for monitor)
-        syn_registry:sync_register(Name, RemotePid, RemoteMeta)
-    end,
-    %% add to table
-    lists:foreach(F, RegistryTuples).
-
--spec purge_registry_entries_for_node(Node :: atom()) -> ok.
-purge_registry_entries_for_node(Node) ->
-    %% build match specs
-    MatchHead = #syn_registry_table{name = '$1', node = '$2', _ = '_'},
-    Guard = {'=:=', '$2', Node},
-    IdFormat = '$1',
-    %% delete
-    NodePids = mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [IdFormat]}]),
-    DelF = fun(Id) -> mnesia:dirty_delete({syn_registry_table, Id}) end,
-    lists:foreach(DelF, NodePids).

+ 1 - 1
src/syn_records.hrl

@@ -37,5 +37,5 @@
     node = undefined :: atom(),
     meta = undefined :: any()
 }).
--type syn_registry_tuple() :: {Name :: term(), Pid :: pid(), Node :: node(), Meta :: term()}.
+-type syn_registry_tuple() :: {Name :: term(), Pid :: pid(), Meta :: term()}.
 -export_type([syn_registry_tuple/0]).

+ 120 - 45
src/syn_registry.erl

@@ -35,10 +35,7 @@
 
 %% sync API
 -export([sync_register/3, sync_unregister/1]).
--export([get_local_registry_tuples_and_suspend/1]).
-
-%% internal
--export([find_process_entry_by_name/1]).
+-export([sync_get_local_registry_tuples/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -113,13 +110,15 @@ sync_register(Name, Pid, Meta) ->
 sync_unregister(Name) ->
     gen_server:cast(?MODULE, {sync_unregister, Name}).
 
--spec get_local_registry_tuples_and_suspend(FromNode :: node()) -> list(syn_registry_tuple()).
-get_local_registry_tuples_and_suspend(FromNode) ->
+-spec sync_get_local_registry_tuples(FromNode :: node()) -> list(syn_registry_tuple()).
+sync_get_local_registry_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node: ~p~n", [node(), FromNode]),
-    %% suspend self to not modify table
-    sys:suspend(?MODULE),
-    %% get tuples
-    get_registry_tuples_of_current_node().
+    %% build match specs
+    MatchHead = #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
+    Guard = {'=:=', '$3', node()},
+    RegistryTupleFormat = {{'$1', '$2', '$4'}},
+    %% select
+    mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [RegistryTupleFormat]}]).
 
 %% ===================================================================
 %% Callbacks
@@ -137,6 +136,9 @@ init([]) ->
     %% wait for table
     case mnesia:wait_for_tables([syn_registry_table], 10000) of
         ok ->
+            %% monitor nodes
+            ok = net_kernel:monitor_nodes(true),
+            %% init
             {ok, #state{}};
         Reason ->
             {stop, {error_waiting_for_process_registry_table, Reason}}
@@ -160,16 +162,7 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
             %% check if name available
             case find_process_entry_by_name(Name) of
                 undefined ->
-                    MonitorRef = case find_processes_entry_by_pid(Pid) of
-                        [] ->
-                            %% process is not monitored yet, add
-                            erlang:monitor(process, Pid);
-                        [Entry | _] ->
-                            Entry#syn_registry_table.monitor_ref
-                    end,
-
-                    %% add to table
-                    register_on_node(Name, Pid, Meta, MonitorRef),
+                    register_on_node(Name, Pid, Meta),
                     %% multicast
                     rpc:eval_everywhere(nodes(), ?MODULE, sync_register, [Name, Pid, Meta]),
                     %% return
@@ -182,12 +175,16 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
     end;
 
 handle_call({unregister_on_node, Name}, _From, State) ->
-    %% remove from table
-    unregister_on_node(Name),
-    %% multicast
-    rpc:eval_everywhere(nodes(), ?MODULE, sync_unregister, [Name]),
-    %% return
-    {reply, ok, State};
+    case unregister_on_node(Name) of
+        {error, Error} ->
+            {reply, {error, Error}, State};
+
+        ok ->
+            %% multicast
+            rpc:eval_everywhere(nodes(), ?MODULE, sync_unregister, [Name]),
+            %% return
+            {reply, ok, State}
+    end;
 
 handle_call(Request, From, State) ->
     error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), Request, From]),
@@ -203,13 +200,13 @@ handle_call(Request, From, State) ->
 
 handle_cast({sync_register, Name, Pid, Meta}, State) ->
     %% add to table
-    register_on_node(Name, Pid, Meta, undefined),
+    add_to_local_table(Name, Pid, Meta, undefined),
     %% return
     {noreply, State};
 
 handle_cast({sync_unregister, Name}, State) ->
-    %% add to table
-    unregister_on_node(Name),
+    %% remove from table
+    remove_from_local_table(Name),
     %% return
     {noreply, State};
 
@@ -237,8 +234,8 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
                 Name = Entry#syn_registry_table.name,
                 %% log
                 log_process_exit(Name, Pid, Reason),
-                %% delete from table
-                unregister_on_node(Name),
+                %% remove from table
+                remove_from_local_table(Name),
                 %% multicast
                 rpc:eval_everywhere(nodes(), ?MODULE, sync_unregister, [Name])
             end, Entries)
@@ -246,6 +243,30 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
     %% return
     {noreply, State};
 
+handle_info({nodeup, RemoteNode}, State) ->
+    error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
+    global:trans({{?MODULE, auto_merge_node_up}, self()},
+        fun() ->
+            error_logger:warning_msg("Syn(~p): AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
+            %% get processes info from remote node
+            RegistryTuples = rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]),
+            error_logger:warning_msg(
+                "Syn(~p): Received ~p registry entrie(s) from remote node ~p, writing to local~n",
+                [node(), length(RegistryTuples), RemoteNode]
+            ),
+            sync_registry_tuples(RemoteNode, RegistryTuples),
+            %% exit
+            error_logger:warning_msg("Syn(~p): AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
+        end
+    ),
+    %% resume
+    {noreply, State};
+
+handle_info({nodedown, RemoteNode}, State) ->
+    error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing its entries on local~n", [node(), RemoteNode]),
+    purge_registry_entries_for_remote_node(RemoteNode),
+    {noreply, State};
+
 handle_info(Info, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
     {noreply, State}.
@@ -268,9 +289,31 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec register_on_node(Name :: any(), Pid :: pid(), Node :: atom(), Meta :: any()) -> true.
-register_on_node(Name, Pid, Meta, MonitorRef) ->
+register_on_node(Name, Pid, Meta) ->
+    MonitorRef = case find_processes_entry_by_pid(Pid) of
+        [] ->
+            %% process is not monitored yet, add
+            erlang:monitor(process, Pid);
+        [Entry | _] ->
+            Entry#syn_registry_table.monitor_ref
+    end,
     %% add to table
+    add_to_local_table(Name, Pid, Meta, MonitorRef).
+
+unregister_on_node(Name) ->
+    case find_process_entry_by_name(Name) of
+        undefined ->
+            {error, undefined};
+
+        Entry when Entry#syn_registry_table.monitor_ref =/= undefined ->
+            %% demonitor
+            erlang:demonitor(Entry#syn_registry_table.monitor_ref),
+            %% remove from table
+            remove_from_local_table(Name)
+    end.
+
+-spec add_to_local_table(Name :: any(), Pid :: pid(), Node :: atom(), Meta :: any()) -> true.
+add_to_local_table(Name, Pid, Meta, MonitorRef) ->
     mnesia:dirty_write(#syn_registry_table{
         name = Name,
         pid = Pid,
@@ -279,10 +322,9 @@ register_on_node(Name, Pid, Meta, MonitorRef) ->
         monitor_ref = MonitorRef
     }).
 
--spec unregister_on_node(Name :: any()) -> ok.
-unregister_on_node(Name) ->
+-spec remove_from_local_table(Name :: any()) -> ok.
+remove_from_local_table(Name) ->
     mnesia:dirty_delete(syn_registry_table, Name).
-%% TODO: unmonitor process!
 
 -spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: list(#syn_registry_table{}).
 find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
@@ -295,15 +337,6 @@ find_process_entry_by_name(Name) ->
         _ -> undefined
     end.
 
--spec get_registry_tuples_of_current_node() -> list(syn_registry_tuple()).
-get_registry_tuples_of_current_node() ->
-    %% build match specs
-    MatchHead = #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
-    Guard = {'=:=', '$3', node()},
-    RegistryTupleFormat = {{'$1', '$2', '$3', '$4'}},
-    %% select
-    mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [RegistryTupleFormat]}]).
-
 -spec log_process_exit(Name :: term(), Pid :: pid(), Reason :: term()) -> ok.
 log_process_exit(Name, Pid, Reason) ->
     case Reason of
@@ -327,3 +360,45 @@ log_process_exit(Name, Pid, Reason) ->
                     )
             end
     end.
+
+
+sync_registry_tuples(RemoteNode, RegistryTuples) ->
+    %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
+    purge_registry_entries_for_remote_node(RemoteNode),
+    %% loop
+    F = fun({Name, RemotePid, RemoteMeta}) ->
+        %% check if same name is registered
+        case find_process_entry_by_name(Name) of
+            undefined ->
+                %% no conflict
+                register_on_node(Name, RemotePid, RemoteMeta);
+            Entry ->
+                error_logger:warning_msg(
+                    "Syn(~p): Conflicting name process found for: ~p, processes are ~p, ~p~n",
+                    [node(), Name, Entry#syn_registry_table.pid, RemotePid]
+                ),
+                %% remove from local table
+                unregister_on_node(Name),
+                %% remove from remote table
+                ok = rpc:call(RemoteNode, syn_registry, unregister_on_node, [Name]),
+
+                %% TODO: call conflict resolution fun, for now kill the local one
+                exit(Entry#syn_registry_table.pid, kill),
+                register_on_node(Name, RemotePid, RemoteMeta)
+                %% TODO
+        end
+    end,
+    %% add to table
+    lists:foreach(F, RegistryTuples).
+
+-spec purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
+purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
+    %% NB: no demonitoring is done, hence why this needs to run for a remote node
+    %% build match specs
+    MatchHead = #syn_registry_table{name = '$1', node = '$2', _ = '_'},
+    Guard = {'=:=', '$2', Node},
+    IdFormat = '$1',
+    %% delete
+    NodePids = mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [IdFormat]}]),
+    DelF = fun(Id) -> mnesia:dirty_delete({syn_registry_table, Id}) end,
+    lists:foreach(DelF, NodePids).

+ 69 - 8
test/syn_registry_SUITE.erl

@@ -43,8 +43,14 @@
     two_nodes_registry_count/1
 ]).
 -export([
-    three_nodes_consistency_partial_net_split/1,
-    three_nodes_consistency_full_net_split/1
+    three_nodes_partial_netsplit_consistency/1,
+    three_nodes_full_netsplit_consistency/1,
+    three_nodes_start_syn_before_connecting_cluster/1
+]).
+
+%% support
+-export([
+    start_syn_delayed_and_register_local_process/3
 ]).
 
 %% include
@@ -94,8 +100,9 @@ groups() ->
             two_nodes_registry_count
         ]},
         {three_nodes_process_registration, [shuffle], [
-            three_nodes_consistency_partial_net_split,
-            three_nodes_consistency_full_net_split
+            three_nodes_partial_netsplit_consistency,
+            three_nodes_full_netsplit_consistency,
+            three_nodes_start_syn_before_connecting_cluster
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -146,14 +153,16 @@ init_per_group(_GroupName, Config) ->
 end_per_group(two_nodes_process_registration, Config) ->
     SlaveNode = proplists:get_value(slave_node, Config),
     syn_test_suite_helper:connect_node(SlaveNode),
-    syn_test_suite_helper:stop_slave(syn_slave);
+    syn_test_suite_helper:stop_slave(syn_slave),
+    timer:sleep(1000);
 end_per_group(three_nodes_process_registration, Config) ->
     SlaveNode1 = proplists:get_value(slave_node_1, Config),
     syn_test_suite_helper:connect_node(SlaveNode1),
     SlaveNode2 = proplists:get_value(slave_node_2, Config),
     syn_test_suite_helper:connect_node(SlaveNode2),
     syn_test_suite_helper:stop_slave(syn_slave_1),
-    syn_test_suite_helper:stop_slave(syn_slave_2);
+    syn_test_suite_helper:stop_slave(syn_slave_2),
+    timer:sleep(1000);
 end_per_group(_GroupName, _Config) ->
     ok.
 
@@ -350,7 +359,7 @@ two_nodes_registry_count(Config) ->
     %% kill proc
     syn_test_suite_helper:kill_process(RemotePid).
 
-three_nodes_consistency_partial_net_split(Config) ->
+three_nodes_partial_netsplit_consistency(Config) ->
     %% get slaves
     SlaveNode1 = proplists:get_value(slave_node_1, Config),
     SlaveNode2 = proplists:get_value(slave_node_2, Config),
@@ -441,7 +450,7 @@ three_nodes_consistency_partial_net_split(Config) ->
     syn_test_suite_helper:kill_process(Pid1),
     syn_test_suite_helper:kill_process(Pid2).
 
-three_nodes_consistency_full_net_split(Config) ->
+three_nodes_full_netsplit_consistency(Config) ->
     %% get slaves
     SlaveNode1 = proplists:get_value(slave_node_1, Config),
     SlaveNode2 = proplists:get_value(slave_node_2, Config),
@@ -546,3 +555,55 @@ three_nodes_consistency_full_net_split(Config) ->
     syn_test_suite_helper:kill_process(Pid0b),
     syn_test_suite_helper:kill_process(Pid1),
     syn_test_suite_helper:kill_process(Pid2).
+
+three_nodes_start_syn_before_connecting_cluster(Config) ->
+    ConflictingName = "COMMON",
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
+    %% start delayed
+    start_syn_delayed_and_register_local_process(ConflictingName, Pid0, 1500),
+    rpc:cast(SlaveNode1, ?MODULE, start_syn_delayed_and_register_local_process, [ConflictingName, Pid1, 1500]),
+    rpc:cast(SlaveNode2, ?MODULE, start_syn_delayed_and_register_local_process, [ConflictingName, Pid2, 1500]),
+    timer:sleep(500),
+    %% disconnect all
+    rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
+    syn_test_suite_helper:disconnect_node(SlaveNode1),
+    syn_test_suite_helper:disconnect_node(SlaveNode2),
+    timer:sleep(1000),
+    [] = nodes(),
+    %% reconnect all
+    syn_test_suite_helper:connect_node(SlaveNode1),
+    syn_test_suite_helper:connect_node(SlaveNode2),
+    rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
+    timer:sleep(1500),
+    %% count
+    1 = syn:registry_count(),
+    1 = rpc:call(SlaveNode1, syn, registry_count, []),
+    1 = rpc:call(SlaveNode2, syn, registry_count, []),
+    %% retrieve
+    true = lists:member(syn:whereis(ConflictingName), [Pid0, Pid1, Pid2]),
+    true = lists:member(rpc:call(SlaveNode1, syn, whereis, [ConflictingName]), [Pid0, Pid1, Pid2]),
+    true = lists:member(rpc:call(SlaveNode2, syn, whereis, [ConflictingName]), [Pid0, Pid1, Pid2]),
+    %% kill processes
+    syn_test_suite_helper:kill_process(Pid0),
+    syn_test_suite_helper:kill_process(Pid1),
+    syn_test_suite_helper:kill_process(Pid2).
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+start_syn_delayed_and_register_local_process(Name, Pid, Ms) ->
+    spawn(fun() ->
+        lists:foreach(fun(Node) ->
+            syn_test_suite_helper:disconnect_node(Node)
+        end, nodes()),
+        timer:sleep(Ms),
+        [] = nodes(),
+        syn:start(),
+        ok = syn:register(Name, Pid, node())
+    end).