Browse Source

Extend sync registration conflict resolution to the whole cluster.

Roberto Ostinelli 5 years ago
parent
commit
c776150bda
3 changed files with 106 additions and 50 deletions
  1. 6 6
      src/syn_event_handler.erl
  2. 67 42
      src/syn_registry.erl
  3. 33 2
      test/syn_registry_SUITE.erl

+ 6 - 6
src/syn_event_handler.erl

@@ -96,23 +96,23 @@ do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler) ->
     {Pid1 :: pid(), Meta1 :: any()},
     {Pid2 :: pid(), Meta2 :: any()},
     CustomEventHandler :: module()
-) -> {PidToKeep :: pid() | undefined, KillOther :: boolean()}.
+) -> {PidToKeep :: pid() | undefined, PidToKill :: pid() | undefined}.
 do_resolve_registry_conflict(Name, {Pid1, Meta1}, {Pid2, Meta2}, CustomEventHandler) ->
     case erlang:function_exported(CustomEventHandler, resolve_registry_conflict, 3) of
         true ->
             try CustomEventHandler:resolve_registry_conflict(Name, {Pid1, Meta1}, {Pid2, Meta2}) of
                 PidToKeep when is_pid(PidToKeep) ->
-                    {PidToKeep, false};
+                    {PidToKeep, undefined};
                 _ ->
-                    {undefined, false}
+                    {undefined, undefined}
             catch Exception:Reason ->
                 error_logger:error_msg(
                     "Syn(~p): Error ~p in custom handler resolve_registry_conflict: ~p~n",
                     [node(), Exception, Reason]
                 ),
-                {undefined, false}
+                {undefined, undefined}
             end;
         _ ->
-            %% by default, keep pid that generated the conflict
-            {Pid2, true}
+            %% by default, keep pid that generated the conflict & kill the one in the local table
+            {Pid2, Pid1}
     end.

+ 67 - 42
src/syn_registry.erl

@@ -271,19 +271,41 @@ handle_cast({sync_register, Name, RemotePid, RemoteMeta}, State) ->
             global:trans({{?MODULE, {inconsistent_name, Name}}, self()},
                 fun() ->
                     error_logger:warning_msg(
-                        "Syn(~p): REGISTRY INCONSISTENCY (name: ~p for ~p and ~p) ----> Initiating for remote node ~p~n",
+                        "Syn(~p): REGISTRY INCONSISTENCY (name: ~p for ~p and ~p) ----> Received from remote node ~p~n",
                         [node(), Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, RemoteNode]
                     ),
 
-                    CallbackIfLocal = fun() ->
-                        %% keeping local: overwrite local data to remote node
-                        ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined])
+                    case resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, State) of
+                        {TablePid, PidToKill} ->
+                            %% keep local: overwrite local data to all remote nodes
+                            lists:foreach(fun(RNode) ->
+                                ok = rpc:call(RNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined])
+                            end, nodes()),
+                            %% kill
+                            syn_kill(Name, PidToKill, RemoteMeta);
+
+                        {RemotePid, PidToKill} ->
+
+                            NodesExceptRemoteNode = [node() | nodes()] -- [RemoteNode],
+                            %% keep remote: overwrite remote data to all other nodes (including local)
+                            lists:foreach(fun(RNode) ->
+                                ok = rpc:call(RNode, syn_registry, add_to_local_table, [Name, RemotePid, RemoteMeta, undefined])
+                            end, NodesExceptRemoteNode),
+                            %% kill
+                            syn_kill(Name, PidToKill, TableMeta);
+
+                        undefined ->
+                            AllNodes = [node() | nodes()],
+                            %% both are dead, remove from all nodes
+                            lists:foreach(fun(RNode) ->
+                                ok = rpc:call(RNode, syn_registry, remove_from_local_table, [Name, RemotePid])
+                            end, AllNodes),
+                            remove_from_local_table(Name, TablePid)
                     end,
-                    resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, CallbackIfLocal, State),
 
                     error_logger:info_msg(
-                        "Syn(~p): REGISTRY INCONSISTENCY (name: ~p)  <---- Done for remote node ~p~n",
-                        [node(), Name, RemoteNode]
+                        "Syn(~p): REGISTRY INCONSISTENCY (name: ~p)  <---- Done on all cluster~n",
+                        [node(), Name]
                     )
                 end
             )
@@ -589,7 +611,7 @@ registry_automerge(RemoteNode, State) ->
                     raw_purge_registry_entries_for_remote_node(RemoteNode),
                     %% loop
                     F = fun({Name, RemotePid, RemoteMeta}) ->
-                        resolve_tuple(Name, RemotePid, RemoteMeta, RemoteNode, State)
+                        resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteNode, State)
                     end,
                     %% add to table
                     lists:foreach(F, Entries),
@@ -599,52 +621,63 @@ registry_automerge(RemoteNode, State) ->
         end
     ).
 
--spec resolve_tuple(
+-spec resolve_tuple_in_automerge(
     Name :: any(),
     RemotePid :: pid(),
     RemoteMeta :: any(),
     RemoteNode :: node(),
     #state{}
 ) -> any().
-resolve_tuple(Name, RemotePid, RemoteMeta, RemoteNode, State) ->
+resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteNode, State) ->
     %% check if same name is registered
     case find_registry_tuple_by_name(Name) of
         undefined ->
             %% no conflict
             add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
 
-        {Name, LocalPid, LocalMeta} ->
+        {Name, TablePid, TableMeta} ->
             error_logger:warning_msg(
                 "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
-                [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
+                [node(), Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}]
             ),
 
-            CallbackIfLocal = fun() ->
-                %% keeping local: remote data still on remote node, remove there
-                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid])
-            end,
-            resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, CallbackIfLocal, State)
+            case resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, State) of
+                {TablePid, PidToKill} ->
+                    %% keep local: remote data still on remote node, remove there
+                    ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid]),
+                    %% kill
+                    syn_kill(Name, PidToKill, RemoteMeta);
+
+                {RemotePid, PidToKill} ->
+                    %% keep remote: overwrite remote data to local
+                    add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
+                    %% kill
+                    syn_kill(Name, PidToKill, TableMeta);
+
+                undefined ->
+                    %% both are dead, remove from local & remote
+                    remove_from_local_table(Name, TablePid),
+                    ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid])
+            end
     end.
 
 -spec resolve_conflict(
     Name :: any(),
-    {LocalPid :: pid(), LocalMeta :: any()},
+    {TablePid :: pid(), TableMeta :: any()},
     {RemotePid :: pid(), RemoteMeta :: any()},
-    CallbackIfLocal :: fun(),
     #state{}
-) -> any().
+) -> {PidToKeep :: pid(), PidToKill :: pid() | undefined} | undefined.
 resolve_conflict(
     Name,
     {TablePid, TableMeta},
     {RemotePid, RemoteMeta},
-    CallbackIfLocal,
     #state{custom_event_handler = CustomEventHandler}
 ) ->
     TablePidAlive = rpc:call(node(TablePid), erlang, is_process_alive, [TablePid]),
     RemotePidAlive = rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]),
 
     %% check if pids are alive (race conditions if pid dies during resolution)
-    {PidToKeep, KillOther} = case {TablePidAlive, RemotePidAlive} of
+    {PidToKeep, PidToKill} = case {TablePidAlive, RemotePidAlive} of
         {true, true} ->
             %% call conflict resolution
             syn_event_handler:do_resolve_registry_conflict(
@@ -656,15 +689,15 @@ resolve_conflict(
 
         {true, false} ->
             %% keep only alive process
-            {TablePid, false};
+            {TablePid, undefined};
 
         {false, true} ->
             %% keep only alive process
-            {RemotePid, false};
+            {RemotePid, undefined};
 
         {false, false} ->
             %% remove both
-            {none, false}
+            {undefined, undefined}
     end,
 
     %% keep chosen one
@@ -675,9 +708,7 @@ resolve_conflict(
                 "Syn(~p): Keeping process in table ~p over remote process ~p~n",
                 [node(), TablePid, RemotePid]
             ),
-            %% callback: keeping local
-            %% no process killing necessary because we kill local only if in a custom handler
-            CallbackIfLocal();
+            {TablePid, PidToKill};
 
         RemotePid ->
             %% keep remote
@@ -685,31 +716,25 @@ resolve_conflict(
                 "Syn(~p): Keeping remote process ~p over process in table ~p~n",
                 [node(), RemotePid, TablePid]
             ),
-            %% keeping remote: overwrite remote data to local
-            add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
-            %% kill?
-            case KillOther of
-                true -> syn_kill(Name, TablePid, TableMeta);
-                _ -> undefined
-            end;
+            {RemotePid, PidToKill};
 
-        none ->
+        undefined ->
             error_logger:info_msg(
                 "Syn(~p): Removing both processes' ~p and ~p data from local and remote tables~n",
                 [node(), RemotePid, TablePid]
             ),
-            remove_from_local_table(Name, TablePid),
-            RemoteNode = node(RemotePid),
-            ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid]);
+            undefined;
 
         Other ->
             error_logger:error_msg(
-                "Syn(~p): Custom handler returned ~p, valid options were ~p and ~p~n",
+                "Syn(~p): Custom handler returned ~p, valid options were ~p and ~p, removing both~n",
                 [node(), Other, TablePid, RemotePid]
-            )
+            ),
+            undefined
     end.
 
--spec syn_kill(Name :: any(), PidToKill :: pid(), Meta :: any()) -> true.
+-spec syn_kill(Name :: any(), PidToKill :: pid() | undefined, Meta :: any()) -> true.
+syn_kill(_Name, undefined, _Meta) -> ok;
 syn_kill(Name, PidToKill, Meta) -> exit(PidToKill, {syn_resolve_kill, Name, Meta}).
 
 -spec raw_purge_registry_entries_for_remote_node(Node :: atom()) -> ok.

+ 33 - 2
test/syn_registry_SUITE.erl

@@ -60,7 +60,8 @@
     three_nodes_registration_race_condition_custom_conflict_resolution/1,
     three_nodes_anti_entropy/1,
     three_nodes_anti_entropy_manual/1,
-    three_nodes_concurrent_registration_unregistration/1
+    three_nodes_concurrent_registration_unregistration/1,
+    three_nodes_resolve_conflict_on_all_nodes/1
 ]).
 
 %% support
@@ -134,7 +135,8 @@ groups() ->
             three_nodes_registration_race_condition_custom_conflict_resolution,
             three_nodes_anti_entropy,
             three_nodes_anti_entropy_manual,
-            three_nodes_concurrent_registration_unregistration
+            three_nodes_concurrent_registration_unregistration,
+            three_nodes_resolve_conflict_on_all_nodes
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -1104,6 +1106,35 @@ three_nodes_concurrent_registration_unregistration(Config) ->
     {Pid0, Node} = rpc:call(SlaveNode1, syn, whereis, [CommonName, with_meta]),
     {Pid0, Node} = rpc:call(SlaveNode2, syn, whereis, [CommonName, with_meta]).
 
+three_nodes_resolve_conflict_on_all_nodes(Config) ->
+    CommonName = "common-name",
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(100),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
+    timer:sleep(100),
+    %% register on slave 1begin
+    ok = rpc:call(SlaveNode1, syn, register, [CommonName, Pid1, SlaveNode1]),
+    timer:sleep(100),
+    %% check
+    {Pid1, SlaveNode1} = syn:whereis(CommonName, with_meta),
+    {Pid1, SlaveNode1} = rpc:call(SlaveNode1, syn, whereis, [CommonName, with_meta]),
+    {Pid1, SlaveNode1} = rpc:call(SlaveNode2, syn, whereis, [CommonName, with_meta]),
+    %% force  a sync registration conflict on master node from slave 2
+    syn_registry:sync_register(node(), CommonName, Pid2, SlaveNode2),
+    timer:sleep(1000),
+    %% check
+    {Pid2, SlaveNode2} = syn:whereis(CommonName, with_meta),
+    {Pid2, SlaveNode2} = rpc:call(SlaveNode1, syn, whereis, [CommonName, with_meta]).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================