Просмотр исходного кода

Ensure that on process exit is called with all data even in conflict resolution.

Roberto Ostinelli 5 лет назад
Родитель
Сommit
3457459787
3 измененных файлов с 62 добавлено и 40 удалено
  1. 32 26
      src/syn_registry.erl
  2. 28 14
      test/syn_registry_SUITE.erl
  3. 2 0
      test/syn_test_event_handler.erl

+ 32 - 26
src/syn_registry.erl

@@ -212,12 +212,12 @@ handle_cast({inconsistent_name_data, OriginatingNode, Name, RemotePid, RemoteMet
                         "Syn(~p): REGISTRY NAME MERGE ----> Initiating for originating node ~p~n",
                         [node(), OriginatingNode]
                     ),
-                    PidInTable = Entry#syn_registry_table.pid,
-                    MetaInTable = Entry#syn_registry_table.meta,
-                    resolve_conflict(Name, {PidInTable, MetaInTable}, {RemotePid, RemoteMeta},
+                    TablePid = Entry#syn_registry_table.pid,
+                    TableMeta = Entry#syn_registry_table.meta,
+                    resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta},
                         %% keep currently in table
                         fun() ->
-                            ok = rpc:call(OriginatingNode, syn_registry, add_to_local_table, [Name, PidInTable, MetaInTable, undefined])
+                            ok = rpc:call(OriginatingNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined])
                         end,
                         %% keep remote
                         fun() ->
@@ -395,16 +395,6 @@ unregister_on_node(Name) ->
             remove_from_local_table(Name)
     end.
 
--spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
-add_to_local_table(Name, Pid, Meta, MonitorRef) ->
-    mnesia:dirty_write(#syn_registry_table{
-        name = Name,
-        pid = Pid,
-        node = node(Pid),
-        meta = Meta,
-        monitor_ref = MonitorRef
-    }).
-
 -spec add_remote_to_local_table(Name :: any(), RemotePid :: pid(), RemoteMeta :: any()) -> ok.
 add_remote_to_local_table(Name, RemotePid, RemoteMeta) ->
     %% check for conflicts
@@ -417,6 +407,16 @@ add_remote_to_local_table(Name, RemotePid, RemoteMeta) ->
             multicast_inconsistent_name_data(Name, Entry#syn_registry_table.pid, Entry#syn_registry_table.meta)
     end.
 
+-spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
+add_to_local_table(Name, Pid, Meta, MonitorRef) ->
+    mnesia:dirty_write(#syn_registry_table{
+        name = Name,
+        pid = Pid,
+        node = node(Pid),
+        meta = Meta,
+        monitor_ref = MonitorRef
+    }).
+
 -spec remove_from_local_table(Name :: any()) -> ok.
 remove_from_local_table(Name) ->
     mnesia:dirty_delete(syn_registry_table, Name).
@@ -447,10 +447,16 @@ handle_process_down(Name, Pid, Meta, Reason, #state{
 }) ->
     case Name of
         undefined ->
-            error_logger:warning_msg(
-                "Syn(~p): Received a DOWN message from an unregistered process ~p with reason: ~p~n",
-                [node(), Pid, Reason]
-            );
+            case Reason of
+                {kill, KillName, KillMeta} ->
+                    syn_event_handler:do_on_process_exit(KillName, Pid, KillMeta, killed, CustomEventHandler);
+                _ ->
+                    error_logger:warning_msg(
+                        "Syn(~p): Received a DOWN message from an unregistered process ~p with reason: ~p~n",
+                        [node(), Pid, Reason]
+                    )
+            end;
+
         _ ->
             syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler)
     end.
@@ -463,27 +469,27 @@ handle_process_down(Name, Pid, Meta, Reason, #state{
     KeepRemoteFun :: fun(),
     #state{}
 ) -> ok.
-resolve_conflict(Name, {PidInTable, MetaInTable}, {RemotePid, RemoteMeta}, KeepTableFun, KeepRemoteFun, #state{
+resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, KeepTableFun, KeepRemoteFun, #state{
     custom_event_handler = CustomEventHandler
 }) ->
     %% call conflict resolution
     {PidToKeep, KillOther} = syn_event_handler:do_resolve_registry_conflict(
         Name,
-        {PidInTable, MetaInTable},
+        {TablePid, TableMeta},
         {RemotePid, RemoteMeta},
         CustomEventHandler
     ),
 
     %% keep chosen one
     case PidToKeep of
-        PidInTable ->
+        TablePid ->
             %% keep local
             error_logger:error_msg(
                 "Syn(~p): Keeping local process ~p, killing remote ~p~n",
-                [node(), PidInTable, RemotePid]
+                [node(), TablePid, RemotePid]
             ),
             case KillOther of
-                true -> exit(RemotePid, kill);
+                true -> exit(RemotePid, {kill, Name, RemoteMeta});
                 _ -> ok
             end,
             KeepTableFun();
@@ -492,10 +498,10 @@ resolve_conflict(Name, {PidInTable, MetaInTable}, {RemotePid, RemoteMeta}, KeepT
             %% keep remote
             error_logger:error_msg(
                 "Syn(~p): Keeping remote process ~p, killing local ~p~n",
-                [node(), RemotePid, PidInTable]
+                [node(), RemotePid, TablePid]
             ),
             case KillOther of
-                true -> exit(PidInTable, kill);
+                true -> exit(TablePid, {kill, Name, TableMeta});
                 _ -> ok
             end,
             KeepRemoteFun();
@@ -503,7 +509,7 @@ resolve_conflict(Name, {PidInTable, MetaInTable}, {RemotePid, RemoteMeta}, KeepT
         Other ->
             error_logger:error_msg(
                 "Syn(~p): Custom handler returned ~p, valid options were ~p and ~p~n",
-                [node(), Other, PidInTable, RemotePid]
+                [node(), Other, TablePid, RemotePid]
             )
     end.
 

+ 28 - 14
test/syn_registry_SUITE.erl

@@ -39,6 +39,7 @@
     single_node_registry_count/1,
     single_node_register_gen_server/1,
     single_node_callback_on_process_exit/1,
+    single_node_ensure_callback_process_exit_is_called_if_process_killed/1,
     single_node_monitor_after_registry_crash/1
 ]).
 -export([
@@ -57,8 +58,7 @@
 %% support
 -export([
     start_syn_delayed_and_register_local_process/3,
-    start_syn_delayed_with_custom_handler_register_local_process/4,
-    inject_add_to_local_node/3
+    start_syn_delayed_with_custom_handler_register_local_process/4
 ]).
 
 %% include
@@ -104,6 +104,7 @@ groups() ->
             single_node_registry_count,
             single_node_register_gen_server,
             single_node_callback_on_process_exit,
+            single_node_ensure_callback_process_exit_is_called_if_process_killed,
             single_node_monitor_after_registry_crash
         ]},
         {two_nodes_process_registration, [shuffle], [
@@ -363,6 +364,28 @@ single_node_callback_on_process_exit(_Config) ->
         ok
     end.
 
+single_node_ensure_callback_process_exit_is_called_if_process_killed(_Config) ->
+    Name = <<"my proc">>,
+    %% use custom handler
+    syn_test_suite_helper:use_custom_handler(),
+    %% start
+    ok = syn:start(),
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+    %% register
+    TestPid = self(),
+    ok = syn:register(Name, Pid, {some_meta, TestPid}),
+    %% remove from table to simulate conflict resolution
+    syn_registry:remove_from_local_table(Name),
+    %% kill
+    exit(Pid, {kill, Name, {some_meta, TestPid}}),
+    receive
+        {received_event_on, some_meta} ->
+            ok
+    after 1000 ->
+        ok = callback_on_process_exit_was_not_received_by_pid
+    end.
+
 single_node_monitor_after_registry_crash(_Config) ->
     %% start
     ok = syn:start(),
@@ -467,7 +490,7 @@ two_nodes_registration_race_condition_conflict_resolution(Config) ->
     Pid0 = syn_test_suite_helper:start_process(),
     Pid1 = syn_test_suite_helper:start_process(SlaveNode),
     %% inject into syn to simulate concurrent registration
-    ok = rpc:call(SlaveNode, ?MODULE, inject_add_to_local_node, [ConflictingName, Pid1, SlaveNode]),
+    ok = rpc:call(SlaveNode, syn_registry, add_to_local_table, [ConflictingName, Pid1, SlaveNode, undefined]),
     %% register on master node to trigger conflict resolution
     ok = syn:register(ConflictingName, Pid0, node()),
     timer:sleep(1000),
@@ -816,8 +839,8 @@ three_nodes_registration_race_condition_custom_conflict_resolution(Config) ->
     Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
     Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
     %% inject into syn to simulate concurrent registration
-    ok = rpc:call(SlaveNode1, ?MODULE, inject_add_to_local_node, [ConflictingName, Pid1, keep_this_one]),
-    ok = rpc:call(SlaveNode2, ?MODULE, inject_add_to_local_node, [ConflictingName, Pid2, SlaveNode2]),
+    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, [ConflictingName, Pid1, keep_this_one, undefined]),
+    ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, [ConflictingName, Pid2, SlaveNode2, undefined]),
     %% register on master node to trigger conflict resolution
     ok = syn:register(ConflictingName, Pid0, node()),
     timer:sleep(1000),
@@ -862,12 +885,3 @@ start_syn_delayed_with_custom_handler_register_local_process(Name, Pid, Meta, Ms
         syn:start(),
         ok = syn:register(Name, Pid, Meta)
     end).
-
-inject_add_to_local_node(Name, Pid, Meta) ->
-    mnesia:dirty_write(#syn_registry_table{
-        name = Name,
-        pid = Pid,
-        node = node(Pid),
-        meta = Meta,
-        monitor_ref = undefined
-    }).

+ 2 - 0
test/syn_test_event_handler.erl

@@ -42,6 +42,8 @@
 ) -> any().
 on_process_exit(_Name, _Pid, {PidId, TestPid}, _Reason) when is_pid(TestPid) ->
     TestPid ! {received_event_on, PidId};
+on_process_exit(_Name, _Pid, {Meta, TestPid}, _Reason) when is_pid(TestPid) ->
+    TestPid ! {received_event_on, Meta};
 on_process_exit(_Name, _Pid, _Meta, _Reason) ->
     ok.