Просмотр исходного кода

Resolve conflicts for registration race conditions.

Roberto Ostinelli 5 лет назад
Родитель
Коммит
7db6714104
2 изменённых файла: 162 добавлено и 13 удалено
  1. 102 6
      src/syn_registry.erl
  2. 60 7
      test/syn_registry_SUITE.erl

+ 102 - 6
src/syn_registry.erl

@@ -35,6 +35,7 @@
 
 %% sync API
 -export([sync_get_local_registry_tuples/1]).
+-export([add_remote_to_local_table/3]).
 -export([add_to_local_table/4]).
 -export([remove_from_local_table/1]).
 
@@ -274,7 +275,7 @@ code_change(_OldVsn, State, _Extra) ->
 %% Spawns a linked process that asks every node returned by nodes() to evaluate the
 %% registration of Name / Pid / Meta through rpc:eval_everywhere, and returns the
 %% pid of that spawned process. The remote calls are fire-and-forget: their results
 %% are not collected here.
 -spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any()) -> pid().
 multicast_register(Name, Pid, Meta) ->
     spawn_link(fun() ->
-        rpc:eval_everywhere(nodes(), ?MODULE, add_to_local_table, [Name, Pid, Meta, undefined])
+        rpc:eval_everywhere(nodes(), ?MODULE, add_remote_to_local_table, [Name, Pid, Meta])
     end).
 
 -spec multicast_unregister(Name :: any()) -> pid().
@@ -318,6 +319,86 @@ add_to_local_table(Name, Pid, Meta, MonitorRef) ->
         monitor_ref = MonitorRef
     }).
 
+%% Stores on this node a registration that was multicast by a remote node, resolving
+%% a conflict first when Name is already present in the local table.
+%%
+%% Name       - the name being registered.
+%% RemotePid  - the pid announced by the remote node.
+%% RemoteMeta - the metadata announced together with RemotePid.
+%%
+%% When Name is free the entry is written straight away. Otherwise the resolution is
+%% serialized through a global transaction keyed on {?MODULE, race_condition_registry}.
+-spec add_remote_to_local_table(Name :: any(), RemotePid :: pid(), RemoteMeta :: any()) -> ok.
+add_remote_to_local_table(Name, RemotePid, RemoteMeta) ->
+    %% check for conflicts
+    case find_process_entry_by_name(Name) of
+        undefined ->
+            %% no conflict
+            add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+        _Entry ->
+            %% conflict found, enter resolution
+            global:trans({{?MODULE, race_condition_registry}, self()},
+                fun() -> resolve_multicast_conflict(Name, RemotePid, RemoteMeta) end
+            )
+    end.
+
+%% Body of the global transaction: re-reads the entry for Name (it may have changed
+%% while waiting for the lock) and decides which process keeps the name.
+-spec resolve_multicast_conflict(Name :: any(), RemotePid :: pid(), RemoteMeta :: any()) -> ok.
+resolve_multicast_conflict(Name, RemotePid, RemoteMeta) ->
+    case find_process_entry_by_name(Name) of
+        undefined ->
+            %% the entry seen before taking the lock is gone, so there is nothing left
+            %% to resolve: store the remote registration instead of crashing on a
+            %% record access over 'undefined'
+            add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+        #syn_registry_table{pid = RemotePid} ->
+            %% RemotePid is already bound, so this clause only matches the same pid
+            error_logger:info_msg(
+                "Syn(~p): Conflicting name from multicast ~p already resolved, skipping~n",
+                [node(), Name]
+            );
+        #syn_registry_table{pid = PidInTable, meta = MetaInTable} ->
+            error_logger:warning_msg(
+                "Syn(~p): Conflicting name from multicast found for: ~p, processes are ~p, ~p~n",
+                [node(), Name, PidInTable, RemotePid]
+            ),
+            %% NOTE(review): the custom event handler is hard-coded to 'undefined'
+            %% here, so a user-supplied handler is not consulted on this path
+            CustomEventHandler = undefined,
+            %% call conflict resolution
+            {PidToKeep, KillOther} = syn_event_handler:do_resolve_registry_conflict(
+                Name,
+                {PidInTable, MetaInTable},
+                {RemotePid, RemoteMeta},
+                CustomEventHandler
+            ),
+            apply_multicast_resolution(
+                Name, PidToKeep, KillOther, {PidInTable, MetaInTable}, {RemotePid, RemoteMeta}
+            )
+    end.
+
+%% Applies the outcome of the conflict resolution: keeps the chosen process registered
+%% and, when KillOther is true, kills the other one.
+-spec apply_multicast_resolution(
+    Name :: any(),
+    PidToKeep :: any(),
+    KillOther :: any(),
+    {PidInTable :: pid(), MetaInTable :: any()},
+    {RemotePid :: pid(), RemoteMeta :: any()}
+) -> ok.
+apply_multicast_resolution(Name, PidToKeep, KillOther, {PidInTable, MetaInTable}, {RemotePid, RemoteMeta}) ->
+    case PidToKeep of
+        PidInTable ->
+            %% keep local
+            error_logger:error_msg(
+                "Syn(~p): Keeping local process ~p, killing remote ~p~n",
+                [node(), PidInTable, RemotePid]
+            ),
+            %% overwrite the entry on the node that owns the remote pid
+            RemoteNode = node(RemotePid),
+            ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, PidInTable, MetaInTable, undefined]),
+            maybe_kill_loser(KillOther, RemotePid);
+
+        RemotePid ->
+            %% keep remote
+            error_logger:error_msg(
+                "Syn(~p): Keeping remote process ~p, killing local ~p~n",
+                [node(), RemotePid, PidInTable]
+            ),
+            add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
+            maybe_kill_loser(KillOther, PidInTable);
+
+        Other ->
+            error_logger:error_msg(
+                "Syn(~p): Custom handler returned ~p, valid options were ~p and ~p~n",
+                [node(), Other, PidInTable, RemotePid]
+            )
+    end.
+
+%% Kills Pid only when the resolution asked for it; always returns ok so that the
+%% public function honours its spec (exit/2 itself returns true).
+-spec maybe_kill_loser(KillOther :: any(), Pid :: pid()) -> ok.
+maybe_kill_loser(true, Pid) ->
+    exit(Pid, kill),
+    ok;
+maybe_kill_loser(_KillOther, _Pid) ->
+    ok.
+
 %% Deletes the entry stored under Name from the syn_registry_table mnesia table on
 %% this node, using a dirty (non-transactional) delete.
 -spec remove_from_local_table(Name :: any()) -> ok.
 remove_from_local_table(Name) ->
     mnesia:dirty_delete(syn_registry_table, Name).
@@ -348,10 +429,17 @@ handle_process_down(Name, Pid, Meta, Reason, #state{
 }) ->
     case Name of
         undefined ->
-            error_logger:warning_msg(
-                "Syn(~p): Received a DOWN message from an unmonitored process ~p with reason: ~p~n",
-                [node(), Pid, Reason]
-            );
+            case Reason of
+                normal -> ok;
+                shutdown -> ok;
+                {shutdown, _} -> ok;
+                killed -> ok;
+                _ ->
+                    error_logger:warning_msg(
+                        "Syn(~p): Received a DOWN message from an unmonitored process ~p with reason: ~p~n",
+                        [node(), Pid, Reason]
+                    )
+            end;
         _ ->
             syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler)
     end.
@@ -375,7 +463,7 @@ sync_registry_tuples(RemoteNode, RegistryTuples, #state{
                 LocalMeta = Entry#syn_registry_table.meta,
 
                 error_logger:warning_msg(
-                    "Syn(~p): Conflicting name process found for: ~p, processes are ~p, ~p~n",
+                    "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
                     [node(), Name, LocalPid, RemotePid]
                 ),
 
@@ -391,6 +479,10 @@ sync_registry_tuples(RemoteNode, RegistryTuples, #state{
                 case PidToKeep of
                     LocalPid ->
                         %% keep local
+                        error_logger:error_msg(
+                            "Syn(~p): Keeping local process ~p, killing remote ~p~n",
+                            [node(), LocalPid, RemotePid]
+                        ),
                         ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
                         case KillOther of
                             true -> exit(RemotePid, kill);
@@ -399,6 +491,10 @@ sync_registry_tuples(RemoteNode, RegistryTuples, #state{
 
                     RemotePid ->
                         %% keep remote
+                        error_logger:error_msg(
+                            "Syn(~p): Keeping remote process ~p, killing local ~p~n",
+                            [node(), RemotePid, LocalPid]
+                        ),
                         remove_from_local_table(Name),
                         add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
                         case KillOther of

+ 60 - 7
test/syn_registry_SUITE.erl

@@ -43,7 +43,8 @@
 ]).
 -export([
     two_nodes_register_monitor_and_unregister/1,
-    two_nodes_registry_count/1
+    two_nodes_registry_count/1,
+    two_nodes_registration_race_condition_conflict_resolution/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -55,11 +56,13 @@
 %% support
 -export([
     start_syn_delayed_and_register_local_process/3,
-    start_syn_delayed_with_custom_handler_register_local_process/4
+    start_syn_delayed_with_custom_handler_register_local_process/4,
+    inject_add_to_local_node/3
 ]).
 
 %% include
 -include_lib("common_test/include/ct.hrl").
+-include_lib("../src/syn.hrl").
 
 %% ===================================================================
 %% Callbacks
@@ -104,7 +107,8 @@ groups() ->
         ]},
         {two_nodes_process_registration, [shuffle], [
             two_nodes_register_monitor_and_unregister,
-            two_nodes_registry_count
+            two_nodes_registry_count,
+            two_nodes_registration_race_condition_conflict_resolution
         ]},
         {three_nodes_process_registration, [shuffle], [
             three_nodes_partial_netsplit_consistency,
@@ -456,6 +460,45 @@ two_nodes_registry_count(Config) ->
     syn_test_suite_helper:kill_process(RemotePid),
     syn_test_suite_helper:kill_process(PidUnregistered).
 
+%% Simulates two nodes registering the same name concurrently and checks that, after
+%% resolution, both nodes agree on a single winner and the losing process is dead.
+two_nodes_registration_race_condition_conflict_resolution(Config) ->
+    SharedName = "COMMON",
+    %% the slave node comes from the test configuration
+    Slave = proplists:get_value(slave_node, Config),
+    %% bring syn up on both nodes and let them settle
+    ok = syn:start(),
+    ok = rpc:call(Slave, syn, start, []),
+    timer:sleep(100),
+    %% one candidate process per node
+    MasterPid = syn_test_suite_helper:start_process(),
+    SlavePid = syn_test_suite_helper:start_process(Slave),
+    %% write the slave's entry directly into its table, so that the registration
+    %% below collides with it as a concurrent registration would
+    ok = rpc:call(Slave, ?MODULE, inject_add_to_local_node, [SharedName, SlavePid, Slave]),
+    %% registering on the master node is what triggers the conflict resolution
+    ok = syn:register(SharedName, MasterPid, node()),
+    timer:sleep(1000),
+    %% whoever won, the metadata must name the winner's node and both nodes must agree
+    Winner = syn:whereis(SharedName, with_meta),
+    case Winner of
+        {MasterPid, Meta} ->
+            Meta = node(),
+            %% the slave node must report the very same entry
+            Winner = rpc:call(Slave, syn, whereis, [SharedName, with_meta]),
+            %% the master process survives, the slave process was killed by syn
+            true = is_process_alive(MasterPid),
+            false = rpc:call(Slave, erlang, is_process_alive, [SlavePid]);
+        {SlavePid, Meta} ->
+            Meta = Slave,
+            %% the slave node must report the very same entry
+            Winner = rpc:call(Slave, syn, whereis, [SharedName, with_meta]),
+            %% the slave process survives, the master process was killed by syn
+            false = is_process_alive(MasterPid),
+            true = rpc:call(Slave, erlang, is_process_alive, [SlavePid]);
+        _ ->
+            %% deliberate badmatch: fail the test when nothing is registered
+            ok = no_process_is_registered_with_conflicting_name
+    end,
+    %% clean up both candidates
+    syn_test_suite_helper:kill_process(MasterPid),
+    syn_test_suite_helper:kill_process(SlavePid).
+
 three_nodes_partial_netsplit_consistency(Config) ->
     %% get slaves
     SlaveNode1 = proplists:get_value(slave_node_1, Config),
@@ -690,10 +733,9 @@ three_nodes_start_syn_before_connecting_cluster_with_conflict(Config) ->
     case syn:whereis(ConflictingName, with_meta) of
         {Pid0, Meta} ->
             CurrentNode = node(),
-            CurrentNode = Meta,
             %% check that other nodes' data corresponds
-            {Pid0, Meta} = rpc:call(SlaveNode1, syn, whereis, [ConflictingName, with_meta]),
-            {Pid0, Meta} = rpc:call(SlaveNode2, syn, whereis, [ConflictingName, with_meta]),
+            {Pid0, CurrentNode} = rpc:call(SlaveNode1, syn, whereis, [ConflictingName, with_meta]),
+            {Pid0, CurrentNode} = rpc:call(SlaveNode2, syn, whereis, [ConflictingName, with_meta]),
             %% check that other processes are not alive because syn killed them
             true = is_process_alive(Pid0),
             false = rpc:call(SlaveNode1, erlang, is_process_alive, [Pid1]),
@@ -715,7 +757,9 @@ three_nodes_start_syn_before_connecting_cluster_with_conflict(Config) ->
             %% check that other processes are not alive because syn killed them
             false = is_process_alive(Pid0),
             false = rpc:call(SlaveNode1, erlang, is_process_alive, [Pid1]),
-            true = rpc:call(SlaveNode2, erlang, is_process_alive, [Pid2])
+            true = rpc:call(SlaveNode2, erlang, is_process_alive, [Pid2]);
+        _ ->
+            ok = no_process_is_registered_with_conflicting_name
     end,
     %% kill processes
     syn_test_suite_helper:kill_process(Pid0),
@@ -806,3 +850,12 @@ start_syn_delayed_with_custom_handler_register_local_process(Name, Pid, Meta, Ms
         syn:start(),
         ok = syn:register(Name, Pid, Meta)
     end).
+
+%% Test support: writes a registry entry for Name directly into the local
+%% syn_registry_table with a dirty mnesia write. The node field is derived from Pid
+%% and no monitor reference is stored.
+inject_add_to_local_node(Name, Pid, Meta) ->
+    OwnerNode = node(Pid),
+    Entry = #syn_registry_table{
+        name = Name,
+        pid = Pid,
+        node = OwnerNode,
+        meta = Meta,
+        monitor_ref = undefined
+    },
+    mnesia:dirty_write(Entry).