Просмотр исходного кода

Ensure that process in conflict is killed only after removal from tables.

This is to avoid for monitoring removing processes.
Roberto Ostinelli 5 лет назад
Родитель
Сommit
fd917a9151
2 измененных файлов с 48 добавлено и 47 удалено
  1. 42 28
      src/syn_registry.erl
  2. 6 19
      test/syn_registry_SUITE.erl

+ 42 - 28
src/syn_registry.erl

@@ -199,21 +199,23 @@ handle_call(Request, From, State) ->
     {stop, Reason :: any(), #state{}}.
 
 handle_cast({sync_add, Name, RemotePid, RemoteMeta}, State) ->
+    %% get remote node
+    RemoteNode = node(RemotePid),
     %% check for conflicts
     case find_process_entry_by_name(Name) of
         undefined ->
             %% no conflict
             add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+
         Entry when Entry#syn_registry_table.pid =:= RemotePid ->
-            %% no conflict - TODO: we still could have a conflict on meta, use vclocks?
+            %% same process, no conflict
             add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+
         Entry ->
             %% different pid, we have a conflict
-            RemoteNode = node(RemotePid),
-
             global:trans({{?MODULE, {inconsistent_name, Name}}, self()},
                 fun() ->
-                    error_logger:info_msg(
+                    error_logger:warning_msg(
                         "Syn(~p): REGISTRY NAME INCONSISTENCY FOR ~p ----> Initiating for remote node ~p",
                         [node(), Name, RemoteNode]
                     ),
@@ -221,17 +223,24 @@ handle_cast({sync_add, Name, RemotePid, RemoteMeta}, State) ->
                     TableMeta = Entry#syn_registry_table.meta,
 
                     case resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, State) of
-                        PidToKeep when PidToKeep =:= TablePid ->
-                            %% overwrite
-                            ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined]);
+                        {PidToKeep, PidToKill} when PidToKeep =:= TablePid ->
+                            ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined]),
+                            syn_kill(PidToKill, Name, RemoteMeta);
 
-                        PidToKeep when PidToKeep =:= RemotePid ->
+                        {PidToKeep, PidToKill} when PidToKeep =:= RemotePid ->
                             %% overwrite
-                            add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+                            add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
+                            syn_kill(PidToKill, Name, TableMeta);
 
                         _ ->
+                            %% no process is alive, monitors will remove them from tables
                             ok
-                    end
+                    end,
+
+                    error_logger:warning_msg(
+                        "Syn(~p): REGISTRY NAME INCONSISTENCY FOR ~p <---- Done for remote node ~p",
+                        [node(), Name, RemoteNode]
+                    )
                 end
             )
     end,
@@ -283,7 +292,7 @@ handle_info({nodeup, RemoteNode}, State) ->
     error_logger:warning_msg("Syn(~p): Node ~p has joined the cluster", [node(), RemoteNode]),
     global:trans({{?MODULE, auto_merge_registry}, self()},
         fun() ->
-            error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p", [node(), RemoteNode]),
+            error_logger:warning_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p", [node(), RemoteNode]),
             %% get registry tuples from remote node
             RegistryTuples = rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]),
             error_logger:info_msg(
@@ -309,17 +318,19 @@ handle_info({nodeup, RemoteNode}, State) ->
                         LocalPid = Entry#syn_registry_table.pid,
                         LocalMeta = Entry#syn_registry_table.meta,
 
-                        error_logger:warning_msg(
+                        error_logger:info_msg(
                             "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p",
                             [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
                         ),
 
                         case resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, State) of
-                            PidToKeep when PidToKeep =:= LocalPid ->
-                                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]);
+                            {PidToKeep, PidToKill} when PidToKeep =:= LocalPid ->
+                                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
+                                syn_kill(PidToKill, Name, RemoteMeta);
 
-                            PidToKeep when PidToKeep =:= RemotePid ->
-                                add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+                            {PidToKeep, PidToKill} when PidToKeep =:= RemotePid ->
+                                add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
+                                syn_kill(PidToKill, Name, LocalMeta);
 
                             _ ->
                                 ok
@@ -329,7 +340,7 @@ handle_info({nodeup, RemoteNode}, State) ->
             %% add to table
             lists:foreach(F, RegistryTuples),
             %% exit
-            error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p", [node(), RemoteNode])
+            error_logger:warning_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p", [node(), RemoteNode])
         end
     ),
     %% resume
@@ -462,7 +473,7 @@ handle_process_down(Name, Pid, Meta, Reason, #state{
     {LocalPid :: pid(), LocalMeta :: any()},
     {RemotePid :: pid(), RemoteMeta :: any()},
     #state{}
-) -> pid() | undefined.
+) -> {PidToKeep :: pid() | undefined, PidToKill :: pid() | undefined}.
 resolve_conflict(
     Name,
     {TablePid, TableMeta},
@@ -504,12 +515,12 @@ resolve_conflict(
                 "Syn(~p): Keeping local process ~p, killing remote ~p",
                 [node(), TablePid, RemotePid]
             ),
-            case KillOther of
-                true -> exit(RemotePid, {syn_resolve_kill, Name, RemoteMeta});
-                _ -> ok
+            PidToKill = case KillOther of
+                true -> RemotePid;
+                _ -> undefined
             end,
             %% return
-            PidToKeep;
+            {PidToKeep, PidToKill};
 
         RemotePid ->
             %% keep remote
@@ -517,19 +528,19 @@ resolve_conflict(
                 "Syn(~p): Keeping remote process ~p, killing local ~p",
                 [node(), RemotePid, TablePid]
             ),
-            case KillOther of
-                true -> exit(TablePid, {syn_resolve_kill, Name, TableMeta});
-                _ -> ok
+            PidToKill = case KillOther of
+                true -> TablePid;
+                _ -> undefined
             end,
             %% return
-            PidToKeep;
+            {PidToKeep, PidToKill};
 
         none ->
             remove_from_local_table(Name),
             RemoteNode = node(RemotePid),
             ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
             %% return
-            undefined;
+            {undefined, undefined};
 
         Other ->
             error_logger:error_msg(
@@ -537,8 +548,11 @@ resolve_conflict(
                 [node(), Other, TablePid, RemotePid]
             ),
             %% return
-            undefined
+            {undefined, undefined}
     end.
+-spec syn_kill(PidToKill :: pid(), Name :: any(), Meta :: any()) -> true.
+syn_kill(undefined, _, _) -> true;
+syn_kill(PidToKill, Name, Meta) -> exit(PidToKill, {syn_resolve_kill, Name, Meta}).
 
 -spec raw_purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
 raw_purge_registry_entries_for_remote_node(Node) when Node =/= node() ->

+ 6 - 19
test/syn_registry_SUITE.erl

@@ -496,25 +496,12 @@ two_nodes_registration_race_condition_conflict_resolution(Config) ->
     %% register on master node to trigger conflict resolution
     ok = syn:register(ConflictingName, Pid0, node()),
     timer:sleep(1000),
-    %% check metadata
-    case syn:whereis(ConflictingName, with_meta) of
-        {Pid0, Meta} ->
-            Meta = node(),
-            %% check that other nodes' data corresponds
-            {Pid0, Meta} = rpc:call(SlaveNode, syn, whereis, [ConflictingName, with_meta]),
-            %% check that other processes are not alive because syn killed them
-            true = is_process_alive(Pid0),
-            false = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]);
-        {Pid1, Meta} ->
-            SlaveNode = Meta,
-            %% check that other nodes' data corresponds
-            {Pid1, Meta} = rpc:call(SlaveNode, syn, whereis, [ConflictingName, with_meta]),
-            %% check that other processes are not alive because syn killed them
-            false = is_process_alive(Pid0),
-            true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]);
-        _ ->
-            ok = no_process_is_registered_with_conflicting_name
-    end.
+    %% check metadata, resolution happens on master node
+    {Pid1, SlaveNode} = syn:whereis(ConflictingName, with_meta),
+    {Pid1, SlaveNode} = rpc:call(SlaveNode, syn, whereis, [ConflictingName, with_meta]),
+    %% check that other processes are not alive because syn killed them
+    false = is_process_alive(Pid0),
+    true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]).
 
 two_nodes_registration_race_condition_conflict_resolution_when_process_died(Config) ->
     ConflictingName = "COMMON",