Просмотр исходного кода

Unmonitor processes discarded during conflict resolution.

Roberto Ostinelli 5 лет назад
Родитель
Сommit
097bcaed9d
3 измененных файлов с 115 добавлено и 53 удалено
  1. 4 4
      src/syn_event_handler.erl
  2. 66 37
      src/syn_registry.erl
  3. 45 12
      test/syn_registry_SUITE.erl

+ 4 - 4
src/syn_event_handler.erl

@@ -102,17 +102,17 @@ do_resolve_registry_conflict(Name, {Pid1, Meta1}, {Pid2, Meta2}, CustomEventHand
         true ->
             try CustomEventHandler:resolve_registry_conflict(Name, {Pid1, Meta1}, {Pid2, Meta2}) of
                 PidToKeep when is_pid(PidToKeep) ->
-                    {PidToKeep, undefined};
+                    {PidToKeep, false};
                 _ ->
-                    {undefined, undefined}
+                    {undefined, false}
             catch Exception:Reason ->
                 error_logger:error_msg(
                     "Syn(~p): Error ~p in custom handler resolve_registry_conflict: ~p~n",
                     [node(), Exception, Reason]
                 ),
-                {undefined, undefined}
+                {undefined, false}
             end;
         _ ->
             %% by default, keep pid that generated the conflict & kill the one in the local table
-            {Pid2, Pid1}
+            {Pid2, true}
     end.

+ 66 - 37
src/syn_registry.erl

@@ -37,8 +37,10 @@
 %% sync API
 -export([sync_register/4, sync_unregister/3]).
 -export([sync_get_local_registry_tuples/1]).
--export([add_to_local_table/4, remove_from_local_table/2]).
 -export([sync_from_node/1]).
+-export([add_to_local_table/4, remove_from_local_table/2]).
+-export([sync_demonitor_and_kill_on_node/5]).
+-export([find_monitor_for_pid/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -146,6 +148,17 @@ sync_register(RemoteNode, Name, RemotePid, RemoteMeta) ->
 sync_unregister(RemoteNode, Name, Pid) ->
     gen_server:cast({?MODULE, RemoteNode}, {sync_unregister, Name, Pid}).
 
+-spec sync_demonitor_and_kill_on_node(
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    MonitorRef :: reference(),
+    Kill :: boolean()
+) -> ok.
+sync_demonitor_and_kill_on_node(Name, Pid, Meta, MonitorRef, Kill) ->
+    RemoteNode = node(Pid),
+    gen_server:cast({?MODULE, RemoteNode}, {sync_demonitor_and_kill_on_node, Name, Pid, Meta, MonitorRef, Kill}).
+
 -spec sync_get_local_registry_tuples(FromNode :: node()) -> [syn_registry_tuple()].
 sync_get_local_registry_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node ~p~n", [node(), FromNode]),
@@ -276,31 +289,33 @@ handle_cast({sync_register, Name, RemotePid, RemoteMeta}, State) ->
                     ),
 
                     case resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, State) of
-                        {TablePid, PidToKill} ->
-                            %% keep local: overwrite local data to all remote nodes
+                        {TablePid, KillOtherPid} ->
+                            %% keep local
+                            %% demonitor
+                            MonitorRef = rpc:call(RemoteNode, syn_registry, find_monitor_for_pid, [RemotePid]),
+                            sync_demonitor_and_kill_on_node(Name, RemotePid, RemoteMeta, MonitorRef, KillOtherPid),
+                            %% overwrite local data to all remote nodes
                             lists:foreach(fun(RNode) ->
                                 ok = rpc:call(RNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined])
-                            end, nodes()),
-                            %% kill
-                            syn_kill(Name, PidToKill, RemoteMeta);
-
-                        {RemotePid, PidToKill} ->
-
+                            end, nodes());
+
+                        {RemotePid, KillOtherPid} ->
+                            %% keep remote
+                            %% demonitor
+                            MonitorRef = rpc:call(node(TablePid), syn_registry, find_monitor_for_pid, [TablePid]),
+                            sync_demonitor_and_kill_on_node(Name, TablePid, TableMeta, MonitorRef, KillOtherPid),
+                            %% overwrite remote data to all other nodes (including local)
                             NodesExceptRemoteNode = [node() | nodes()] -- [RemoteNode],
-                            %% keep remote: overwrite remote data to all other nodes (including local)
                             lists:foreach(fun(RNode) ->
                                 ok = rpc:call(RNode, syn_registry, add_to_local_table, [Name, RemotePid, RemoteMeta, undefined])
-                            end, NodesExceptRemoteNode),
-                            %% kill
-                            syn_kill(Name, PidToKill, TableMeta);
+                            end, NodesExceptRemoteNode);
 
                         undefined ->
                             AllNodes = [node() | nodes()],
                             %% both are dead, remove from all nodes
                             lists:foreach(fun(RNode) ->
                                 ok = rpc:call(RNode, syn_registry, remove_from_local_table, [Name, RemotePid])
-                            end, AllNodes),
-                            remove_from_local_table(Name, TablePid)
+                            end, AllNodes)
                     end,
 
                     error_logger:info_msg(
@@ -324,6 +339,20 @@ handle_cast({sync_from_node, RemoteNode}, State) ->
     registry_automerge(RemoteNode, State),
     {noreply, State};
 
+handle_cast({sync_demonitor_and_kill_on_node, Name, Pid, Meta, MonitorRef, Kill}, State) ->
+    error_logger:info_msg("Syn(~p): Sync demonitoring pid ~p~n", [node(), Pid]),
+    %% demonitor
+    catch erlang:demonitor(MonitorRef, [flush]),
+    %% kill
+    case Kill of
+        true ->
+            exit(Pid, {syn_resolve_kill, Name, Meta});
+
+        _ ->
+            ok
+    end,
+    {noreply, State};
+
 handle_cast(Msg, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
     {noreply, State}.
@@ -651,17 +680,21 @@ resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteNode, State) ->
             ),
 
             case resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, State) of
-                {TablePid, PidToKill} ->
-                    %% keep local: remote data still on remote node, remove there
-                    ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid]),
-                    %% kill
-                    syn_kill(Name, PidToKill, RemoteMeta);
-
-                {RemotePid, PidToKill} ->
-                    %% keep remote: overwrite remote data to local
-                    add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
-                    %% kill
-                    syn_kill(Name, PidToKill, TableMeta);
+                {TablePid, KillOtherPid} ->
+                    %% keep local
+                    %% demonitor
+                    MonitorRef = rpc:call(RemoteNode, syn_registry, find_monitor_for_pid, [RemotePid]),
+                    sync_demonitor_and_kill_on_node(Name, RemotePid, RemoteMeta, MonitorRef, KillOtherPid),
+                    %% remote data still on remote node, remove there
+                    ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid]);
+
+                {RemotePid, KillOtherPid} ->
+                    %% keep remote
+                    %% demonitor
+                    MonitorRef = rpc:call(RemoteNode, syn_registry, find_monitor_for_pid, [TablePid]),
+                    sync_demonitor_and_kill_on_node(Name, TablePid, TableMeta, MonitorRef, KillOtherPid),
+                    %% overwrite remote data to local
+                    add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
 
                 undefined ->
                     %% both are dead, remove from local & remote
@@ -675,7 +708,7 @@ resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteNode, State) ->
     {TablePid :: pid(), TableMeta :: any()},
     {RemotePid :: pid(), RemoteMeta :: any()},
     #state{}
-) -> {PidToKeep :: pid(), PidToKill :: pid() | undefined} | undefined.
+) -> {PidToKeep :: pid(), KillOtherPid :: boolean() | undefined} | undefined.
 resolve_conflict(
     Name,
     {TablePid, TableMeta},
@@ -686,7 +719,7 @@ resolve_conflict(
     RemotePidAlive = rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]),
 
     %% check if pids are alive (race conditions if pid dies during resolution)
-    {PidToKeep, PidToKill} = case {TablePidAlive, RemotePidAlive} of
+    {PidToKeep, KillOtherPid} = case {TablePidAlive, RemotePidAlive} of
         {true, true} ->
             %% call conflict resolution
             syn_event_handler:do_resolve_registry_conflict(
@@ -698,15 +731,15 @@ resolve_conflict(
 
         {true, false} ->
             %% keep only alive process
-            {TablePid, undefined};
+            {TablePid, false};
 
         {false, true} ->
             %% keep only alive process
-            {RemotePid, undefined};
+            {RemotePid, false};
 
         {false, false} ->
             %% remove both
-            {undefined, undefined}
+            {undefined, false}
     end,
 
     %% keep chosen one
@@ -717,7 +750,7 @@ resolve_conflict(
                 "Syn(~p): Keeping process in table ~p over remote process ~p~n",
                 [node(), TablePid, RemotePid]
             ),
-            {TablePid, PidToKill};
+            {TablePid, KillOtherPid};
 
         RemotePid ->
             %% keep remote
@@ -725,7 +758,7 @@ resolve_conflict(
                 "Syn(~p): Keeping remote process ~p over process in table ~p~n",
                 [node(), RemotePid, TablePid]
             ),
-            {RemotePid, PidToKill};
+            {RemotePid, KillOtherPid};
 
         undefined ->
             error_logger:info_msg(
@@ -742,10 +775,6 @@ resolve_conflict(
             undefined
     end.
 
--spec syn_kill(Name :: any(), PidToKill :: pid() | undefined, Meta :: any()) -> true.
-syn_kill(_Name, undefined, _Meta) -> ok;
-syn_kill(Name, PidToKill, Meta) -> exit(PidToKill, {syn_resolve_kill, Name, Meta}).
-
 -spec raw_purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
 raw_purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
     %% NB: no demonitoring is done, this is why it's raw

+ 45 - 12
test/syn_registry_SUITE.erl

@@ -47,6 +47,7 @@
     two_nodes_register_monitor_and_unregister/1,
     two_nodes_registry_count/1,
     two_nodes_registration_race_condition_conflict_resolution_keep_remote/1,
+    two_nodes_registration_race_condition_conflict_resolution_keep_remote_with_custom_handler/1,
     two_nodes_registration_race_condition_conflict_resolution_keep_local_with_custom_handler/1,
     two_nodes_registration_race_condition_conflict_resolution_when_process_died/1,
     two_nodes_registry_full_cluster_sync_on_boot_node_added_later/1,
@@ -89,9 +90,9 @@
 %% -------------------------------------------------------------------
 all() ->
     [
-        {group, single_node_process_registration}
-%%        {group, two_nodes_process_registration},
-%%        {group, three_nodes_process_registration}
+        {group, single_node_process_registration},
+        {group, two_nodes_process_registration},
+        {group, three_nodes_process_registration}
     ].
 
 %% -------------------------------------------------------------------
@@ -109,20 +110,21 @@ all() ->
 groups() ->
     [
         {single_node_process_registration, [shuffle], [
-%%            single_node_register_and_monitor,
-%%            single_node_register_and_unregister,
-%%            single_node_registration_errors,
-%%            single_node_registry_count,
-%%            single_node_register_gen_server,
-%%            single_node_callback_on_process_exit,
-%%            single_node_ensure_callback_process_exit_is_called_if_process_killed,
-%%            single_node_monitor_after_registry_crash,
+            single_node_register_and_monitor,
+            single_node_register_and_unregister,
+            single_node_registration_errors,
+            single_node_registry_count,
+            single_node_register_gen_server,
+            single_node_callback_on_process_exit,
+            single_node_ensure_callback_process_exit_is_called_if_process_killed,
+            single_node_monitor_after_registry_crash,
             single_node_keep_monitor_reference_for_pid_if_there
         ]},
         {two_nodes_process_registration, [shuffle], [
             two_nodes_register_monitor_and_unregister,
             two_nodes_registry_count,
             two_nodes_registration_race_condition_conflict_resolution_keep_remote,
+            two_nodes_registration_race_condition_conflict_resolution_keep_remote_with_custom_handler,
             two_nodes_registration_race_condition_conflict_resolution_keep_local_with_custom_handler,
             two_nodes_registration_race_condition_conflict_resolution_when_process_died,
             two_nodes_registry_full_cluster_sync_on_boot_node_added_later,
@@ -545,6 +547,34 @@ two_nodes_registration_race_condition_conflict_resolution_keep_remote(Config) ->
     false = is_process_alive(Pid0),
     true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]).
 
+two_nodes_registration_race_condition_conflict_resolution_keep_remote_with_custom_handler(Config) ->
+    ConflictingName = "COMMON",
+    %% get slaves
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% use customer handler
+    syn_test_suite_helper:use_custom_handler(),
+    rpc:call(SlaveNode, syn_test_suite_helper, use_custom_handler, []),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(1000),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode),
+    %% inject into syn to simulate concurrent registration
+    ok = syn:register(ConflictingName, Pid0, node()),
+    %% trigger conflict resolution on master node
+    ok = syn_registry:sync_register(node(), ConflictingName, Pid1, keep_this_one),
+    timer:sleep(1000),
+    %% check metadata, resolution happens on master node
+    {Pid1, keep_this_one} = syn:whereis(ConflictingName, with_meta),
+    %% check that other processes are not alive because syn killed them
+    true = is_process_alive(Pid0),
+    true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]),
+    %% check that discarded process is not monitored
+    {monitored_by, Monitors} = erlang:process_info(Pid0, monitored_by),
+    0 = length(Monitors).
+
 two_nodes_registration_race_condition_conflict_resolution_keep_local_with_custom_handler(Config) ->
     ConflictingName = "COMMON",
     %% get slaves
@@ -569,7 +599,10 @@ two_nodes_registration_race_condition_conflict_resolution_keep_local_with_custom
     {Pid0, keep_this_one} = rpc:call(SlaveNode, syn, whereis, [ConflictingName, with_meta]),
     %% check that other processes are not alive because syn killed them
     true = is_process_alive(Pid0),
-    true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]).
+    true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]),
+    %% check that discarded process is not monitored
+    {monitored_by, Monitors} = rpc:call(SlaveNode, erlang, process_info, [Pid1, monitored_by]),
+    0 = length(Monitors).
 
 two_nodes_registration_race_condition_conflict_resolution_when_process_died(Config) ->
     ConflictingName = "COMMON",