Browse Source

Refactor conflict resolution.

Roberto Ostinelli 5 years ago
parent
commit
572afc0c64
3 changed files with 73 additions and 81 deletions
  1. 33 51
      src/syn_registry.erl
  2. 2 21
      test/syn_groups_SUITE.erl
  3. 38 9
      test/syn_registry_SUITE.erl

+ 33 - 51
src/syn_registry.erl

@@ -215,28 +215,19 @@ handle_cast({sync_register, Name, RemotePid, RemoteMeta}, State) ->
             %% different pid, we have a conflict
             global:trans({{?MODULE, {inconsistent_name, Name}}, self()},
                 fun() ->
-                    error_logger:warning_msg(
-                        "Syn(~p): REGISTRY INCONSISTENCY (name: ~p) ----> Initiating for remote node ~p~n",
-                        [node(), Name, RemoteNode]
-                    ),
-
                     TablePid = Entry#syn_registry_table.pid,
                     TableMeta = Entry#syn_registry_table.meta,
 
-                    case resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, State) of
-                        {PidToKeep, PidToKill} when PidToKeep =:= TablePid ->
-                            ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined]),
-                            syn_kill(PidToKill, Name, RemoteMeta);
-
-                        {PidToKeep, PidToKill} when PidToKeep =:= RemotePid ->
-                            %% overwrite
-                            add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
-                            syn_kill(PidToKill, Name, TableMeta);
+                    error_logger:warning_msg(
+                        "Syn(~p): REGISTRY INCONSISTENCY (name: ~p for ~p and ~p) ----> Initiating for remote node ~p~n",
+                        [node(), Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, RemoteNode]
+                    ),
 
-                        _ ->
-                            %% no process is alive, monitors will remove them from tables
-                            ok
+                    CallbackIfLocal = fun() ->
+                        %% keeping local: overwrite local data to remote node
+                        ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined])
                     end,
+                    resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, CallbackIfLocal, State),
 
                     error_logger:info_msg(
                         "Syn(~p): REGISTRY INCONSISTENCY (name: ~p)  <---- Done for remote node ~p~n",
@@ -465,20 +456,11 @@ registry_automerge(RemoteNode, State) ->
                             [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
                         ),
 
-                        case resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, State) of
-                            {PidToKeep, PidToKill} when PidToKeep =:= LocalPid ->
-%%                                        ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
-                                ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, LocalPid, LocalMeta, undefined]),
-
-                                syn_kill(PidToKill, Name, RemoteMeta);
-
-                            {PidToKeep, PidToKill} when PidToKeep =:= RemotePid ->
-                                add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
-                                syn_kill(PidToKill, Name, LocalMeta);
-
-                            _ ->
-                                ok
-                        end
+                        CallbackIfLocal = fun() ->
+                            %% keeping local: remote data still on remote node, remove there
+                            ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
+                        end,
+                        resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, CallbackIfLocal, State)
                 end
             end,
             %% add to table
@@ -492,12 +474,14 @@ registry_automerge(RemoteNode, State) ->
     Name :: any(),
     {LocalPid :: pid(), LocalMeta :: any()},
     {RemotePid :: pid(), RemoteMeta :: any()},
+    CallbackIfLocal :: fun(),
     #state{}
-) -> {PidToKeep :: pid() | undefined, PidToKill :: pid() | undefined}.
+) -> any().
 resolve_conflict(
     Name,
     {TablePid, TableMeta},
     {RemotePid, RemoteMeta},
+    CallbackIfLocal,
     #state{custom_event_handler = CustomEventHandler}
 ) ->
     TablePidAlive = rpc:call(node(TablePid), erlang, is_process_alive, [TablePid]),
@@ -532,43 +516,41 @@ resolve_conflict(
         TablePid ->
             %% keep local
             error_logger:info_msg(
-                "Syn(~p): Keeping process in table ~p, killing remote process ~p~n",
+                "Syn(~p): Keeping process in table ~p over remote process ~p~n",
                 [node(), TablePid, RemotePid]
             ),
-            PidToKill = case KillOther of
-                true -> RemotePid;
+            %% callback: keeping local
+            CallbackIfLocal(),
+            %% kill?
+            case KillOther of
+                true -> syn_kill(RemotePid, Name, RemoteMeta);
                 _ -> undefined
-            end,
-            %% return
-            {PidToKeep, PidToKill};
+            end;
 
         RemotePid ->
             %% keep remote
             error_logger:info_msg(
-                "Syn(~p): Keeping remote process ~p, killing process in table ~p~n",
+                "Syn(~p): Keeping remote process ~p over process in table ~p~n",
                 [node(), RemotePid, TablePid]
             ),
-            PidToKill = case KillOther of
-                true -> TablePid;
-                _ -> undefined
-            end,
-            %% return
-            {PidToKeep, PidToKill};
+            %% keeping remote: overwrite remote data to local
+            %% no process killing necessary because we kill remote only if in a custom handler
+            add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
 
         none ->
+            error_logger:info_msg(
+                "Syn(~p): Removing both processes' ~p and ~p data from local and remote tables~n",
+                [node(), RemotePid, TablePid]
+            ),
             remove_from_local_table(Name),
             RemoteNode = node(RemotePid),
-            ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
-            %% return
-            {undefined, undefined};
+            ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]);
 
         Other ->
             error_logger:error_msg(
                 "Syn(~p): Custom handler returned ~p, valid options were ~p and ~p~n",
                 [node(), Other, TablePid, RemotePid]
-            ),
-            %% return
-            {undefined, undefined}
+            )
     end.
 
 -spec syn_kill(PidToKill :: pid(), Name :: any(), Meta :: any()) -> true.

+ 2 - 21
test/syn_groups_SUITE.erl

@@ -47,8 +47,7 @@
     two_nodes_local_members/1,
     two_nodes_publish/1,
     two_nodes_local_publish/1,
-    two_nodes_multicall/1,
-    two_nodes_groups_wait_for_syn_up/1
+    two_nodes_multicall/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -105,8 +104,7 @@ groups() ->
             two_nodes_local_members,
             two_nodes_publish,
             two_nodes_local_publish,
-            two_nodes_multicall,
-            two_nodes_groups_wait_for_syn_up
+            two_nodes_multicall
         ]},
         {three_nodes_groups, [shuffle], [
             three_nodes_partial_netsplit_consistency,
@@ -728,23 +726,6 @@ two_nodes_multicall(Config) ->
     ]) =:= lists:sort(Replies),
     [PidUnresponsive] = BadPids.
 
-two_nodes_groups_wait_for_syn_up(_Config) ->
-    %% stop slave
-    syn_test_suite_helper:stop_slave(syn_slave),
-    %% start syn on local node
-    ok = syn:start(),
-    %% start process
-    Pid = syn_test_suite_helper:start_process(),
-    %% join
-    ok = syn:join(<<"group">>, Pid),
-    %% start remote node and syn
-    {ok, SlaveNode} = syn_test_suite_helper:start_slave(syn_slave),
-    ok = rpc:call(SlaveNode, syn, start, []),
-    timer:sleep(1000),
-    %% check
-    [Pid] = syn:get_members(<<"group">>),
-    [Pid] = rpc:call(SlaveNode, syn, get_members, [<<"group">>]).
-
 three_nodes_partial_netsplit_consistency(Config) ->
     GroupName = "my group",
     %% get slaves

+ 38 - 9
test/syn_registry_SUITE.erl

@@ -45,7 +45,8 @@
 -export([
     two_nodes_register_monitor_and_unregister/1,
     two_nodes_registry_count/1,
-    two_nodes_registration_race_condition_conflict_resolution/1,
+    two_nodes_registration_race_condition_conflict_resolution_keep_local/1,
+    two_nodes_registration_race_condition_conflict_resolution_keep_remote/1,
     two_nodes_registration_race_condition_conflict_resolution_when_process_died/1
 ]).
 -export([
@@ -111,7 +112,8 @@ groups() ->
         {two_nodes_process_registration, [shuffle], [
             two_nodes_register_monitor_and_unregister,
             two_nodes_registry_count,
-            two_nodes_registration_race_condition_conflict_resolution,
+            two_nodes_registration_race_condition_conflict_resolution_keep_local,
+            two_nodes_registration_race_condition_conflict_resolution_keep_remote,
             two_nodes_registration_race_condition_conflict_resolution_when_process_died
         ]},
         {three_nodes_process_registration, [shuffle], [
@@ -480,7 +482,7 @@ two_nodes_registry_count(Config) ->
     0 = syn:registry_count(node()),
     0 = syn:registry_count(SlaveNode).
 
-two_nodes_registration_race_condition_conflict_resolution(Config) ->
+two_nodes_registration_race_condition_conflict_resolution_keep_local(Config) ->
     ConflictingName = "COMMON",
     %% get slaves
     SlaveNode = proplists:get_value(slave_node, Config),
@@ -492,15 +494,42 @@ two_nodes_registration_race_condition_conflict_resolution(Config) ->
     Pid0 = syn_test_suite_helper:start_process(),
     Pid1 = syn_test_suite_helper:start_process(SlaveNode),
     %% inject into syn to simulate concurrent registration
-    ok = rpc:call(SlaveNode, syn_registry, add_to_local_table, [ConflictingName, Pid1, SlaveNode, undefined]),
-    %% register on master node to trigger conflict resolution
-    ok = syn:register(ConflictingName, Pid0, node()),
+    ok = syn_registry:add_to_local_table(ConflictingName, Pid0, node(), undefined),
+    %% register on slave node to trigger conflict resolution on master node
+    ok = rpc:call(SlaveNode, syn, register, [ConflictingName, Pid1, SlaveNode]),
     timer:sleep(1000),
     %% check metadata, resolution happens on master node
-    {Pid1, SlaveNode} = syn:whereis(ConflictingName, with_meta),
-    {Pid1, SlaveNode} = rpc:call(SlaveNode, syn, whereis, [ConflictingName, with_meta]),
+    Node = node(),
+    {Pid0, Node} = syn:whereis(ConflictingName, with_meta),
+    {Pid0, Node} = rpc:call(SlaveNode, syn, whereis, [ConflictingName, with_meta]),
+    %% check that other processes are not alive because syn killed them
+    true = is_process_alive(Pid0),
+    false = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]).
+
+two_nodes_registration_race_condition_conflict_resolution_keep_remote(Config) ->
+    ConflictingName = "COMMON",
+    %% get slaves
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% use customer handler
+    syn_test_suite_helper:use_custom_handler(),
+    rpc:call(SlaveNode, syn_test_suite_helper, use_custom_handler, []),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(100),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode),
+    %% inject into syn to simulate concurrent registration
+    ok = syn_registry:add_to_local_table(ConflictingName, Pid0, node(), undefined),
+    %% register on slave node to trigger conflict resolution on master node
+    ok = rpc:call(SlaveNode, syn, register, [ConflictingName, Pid1, keep_this_one]),
+    timer:sleep(1000),
+    %% check metadata, resolution happens on master node
+    {Pid1, keep_this_one} = syn:whereis(ConflictingName, with_meta),
+    {Pid1, keep_this_one} = rpc:call(SlaveNode, syn, whereis, [ConflictingName, with_meta]),
     %% check that other processes are not alive because syn killed them
-    false = is_process_alive(Pid0),
+    true = is_process_alive(Pid0),
     true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]).
 
 two_nodes_registration_race_condition_conflict_resolution_when_process_died(Config) ->