Browse Source

Improve concurrency sync unregister's consistency.

Roberto Ostinelli 5 years ago
parent
commit
bffd990c47
2 changed files with 86 additions and 37 deletions
  1. 43 33
      src/syn_registry.erl
  2. 43 4
      test/syn_registry_SUITE.erl

+ 43 - 33
src/syn_registry.erl

@@ -34,9 +34,9 @@
 -export([count/0, count/1]).
 
 %% sync API
--export([sync_register/4, sync_unregister/2]).
+-export([sync_register/4, sync_unregister/3]).
 -export([sync_get_local_registry_tuples/1]).
--export([add_to_local_table/4, remove_from_local_table/1]).
+-export([add_to_local_table/4, remove_from_local_table/2]).
 -export([sync_from_node/1]).
 
 %% gen_server callbacks
@@ -110,9 +110,9 @@ count(Node) ->
 sync_register(RemoteNode, Name, RemotePid, RemoteMeta) ->
     gen_server:cast({?MODULE, RemoteNode}, {sync_register, Name, RemotePid, RemoteMeta}).
 
--spec sync_unregister(RemoteNode :: node(), Name :: any()) -> ok.
-sync_unregister(RemoteNode, Name) ->
-    gen_server:cast({?MODULE, RemoteNode}, {sync_unregister, Name}).
+-spec sync_unregister(RemoteNode :: node(), Name :: any(), Pid :: pid()) -> ok.
+sync_unregister(RemoteNode, Name, Pid) ->
+    gen_server:cast({?MODULE, RemoteNode}, {sync_unregister, Name, Pid}).
 
 -spec sync_get_local_registry_tuples(FromNode :: node()) -> [syn_registry_tuple()].
 sync_get_local_registry_tuples(FromNode) ->
@@ -200,8 +200,8 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
 
 handle_call({unregister_on_node, Name}, _From, State) ->
     case unregister_on_node(Name) of
-        ok ->
-            multicast_unregister(Name),
+        {ok, RemovedPid} ->
+            multicast_unregister(Name, RemovedPid),
             %% return
             {reply, ok, State};
         {error, Reason} ->
@@ -259,9 +259,9 @@ handle_cast({sync_register, Name, RemotePid, RemoteMeta}, State) ->
     %% return
     {noreply, State};
 
-handle_cast({sync_unregister, Name}, State) ->
+handle_cast({sync_unregister, Name, Pid}, State) ->
     %% remove
-    remove_from_local_table(Name),
+    remove_from_local_table(Name, Pid),
     %% return
     {noreply, State};
 
@@ -293,9 +293,9 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
                 %% handle
                 handle_process_down(Name, Pid, Meta, Reason, State),
                 %% remove from table
-                remove_from_local_table(Name),
+                remove_from_local_table(Name, Pid),
                 %% multicast
-                multicast_unregister(Name)
+                multicast_unregister(Name, Pid)
             end, Entries)
     end,
     %% return
@@ -372,11 +372,11 @@ multicast_register(Name, Pid, Meta) ->
         end, nodes())
     end).
 
--spec multicast_unregister(Name :: any()) -> pid().
-multicast_unregister(Name) ->
+-spec multicast_unregister(Name :: any(), Pid :: pid()) -> pid().
+multicast_unregister(Name, Pid) ->
     spawn_link(fun() ->
         lists:foreach(fun(RemoteNode) ->
-            sync_unregister(RemoteNode, Name)
+            sync_unregister(RemoteNode, Name, Pid)
         end, nodes())
     end).
 
@@ -393,30 +393,34 @@ register_on_node(Name, Pid, Meta) ->
     %% add to table
     add_to_local_table(Name, Pid, Meta, MonitorRef).
 
--spec unregister_on_node(Name :: any()) -> ok | {error, Reason :: any()}.
+-spec unregister_on_node(Name :: any()) -> {ok, RemovedPid :: pid()} | {error, Reason :: any()}.
 unregister_on_node(Name) ->
     case find_registry_entry_by_name(Name) of
         undefined ->
             {error, undefined};
 
-        {Name, _Pid, _Meta, MonitorRef, _Node} when MonitorRef =/= undefined ->
+        {Name, Pid, _Meta, MonitorRef, _Node} when MonitorRef =/= undefined ->
             %% demonitor
             erlang:demonitor(MonitorRef, [flush]),
             %% remove from table
-            remove_from_local_table(Name);
+            remove_from_local_table(Name, Pid),
+            %% return
+            {ok, Pid};
 
-        {Name, _Pid, _Meta, _MonitorRef, Node} = RegistryTuple when Node =:= node() ->
+        {Name, Pid, _Meta, _MonitorRef, Node} = RegistryTuple when Node =:= node() ->
             error_logger:error_msg(
                 "Syn(~p): INTERNAL ERROR | Registry entry ~p has no monitor but it's running on node~n",
                 [node(), RegistryTuple]
             ),
             %% remove from table
-            remove_from_local_table(Name);
+            remove_from_local_table(Name, Pid),
+            %% return
+            {ok, Pid};
 
         _ ->
             %% race condition: un-registration request but entry in table is not a local pid (has no monitor)
-            %% ignore it, sync messages will take care of it
-            ok
+            %% sync messages will take care of it
+            {error, remote_pid}
     end.
 
 -spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
@@ -434,15 +438,21 @@ add_to_local_table(Name, Pid, Meta, MonitorRef) ->
     ets:insert(syn_registry_by_pid, {{Pid, Name}, Meta, MonitorRef, node(Pid)}),
     ok.
 
--spec remove_from_local_table(Name :: any()) -> ok.
-remove_from_local_table(Name) ->
-    case ets:take(syn_registry_by_name, Name) of
-        [{Name, Pid, _, _, _}] ->
-            ets:match_delete(syn_registry_by_pid, {{Pid, Name}, '_', '_', '_'}),
+-spec remove_from_local_table(Name :: any(), Pid :: pid()) -> ok.
+remove_from_local_table(Name, Pid) ->
+    case find_registry_tuple_by_name(Name) of
+        undefined ->
             ok;
 
-        _ ->
-            ok
+        {Name, Pid, _} ->
+            ets:delete(syn_registry_by_name, Name),
+            ets:match_delete(syn_registry_by_pid, {{Pid, Name}, '_', '_', '_'}),
+            ok;
+        {Name, TablePid, _} ->
+            error_logger:info_msg(
+                "Syn(~p): Request to delete registry name ~p for pid ~p but locally have ~p, ignoring~n",
+                [node(), Name, Pid, TablePid]
+            )
     end.
 
 -spec find_registry_tuple_by_name(Name :: any()) -> RegistryTuple :: syn_registry_tuple() | undefined.
@@ -568,7 +578,7 @@ resolve_tuple(Name, RemotePid, RemoteMeta, RemoteNode, State) ->
                 true ->
                     add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
                 _ ->
-                    ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
+                    ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid])
             end;
 
         {Name, LocalPid, LocalMeta} ->
@@ -579,7 +589,7 @@ resolve_tuple(Name, RemotePid, RemoteMeta, RemoteNode, State) ->
 
             CallbackIfLocal = fun() ->
                 %% keeping local: remote data still on remote node, remove there
-                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
+                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid])
             end,
             resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, CallbackIfLocal, State)
     end.
@@ -656,9 +666,9 @@ resolve_conflict(
                 "Syn(~p): Removing both processes' ~p and ~p data from local and remote tables~n",
                 [node(), RemotePid, TablePid]
             ),
-            remove_from_local_table(Name),
+            remove_from_local_table(Name, TablePid),
             RemoteNode = node(RemotePid),
-            ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]);
+            ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid]);
 
         Other ->
             error_logger:error_msg(
@@ -687,7 +697,7 @@ rebuild_monitors() ->
                 %% overwrite
                 add_to_local_table(Name, Pid, Meta, MonitorRef);
             _ ->
-                remove_from_local_table(Name)
+                remove_from_local_table(Name, Pid)
         end
     end, Entries).
 

+ 43 - 4
test/syn_registry_SUITE.erl

@@ -58,13 +58,15 @@
     three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution/1,
     three_nodes_registration_race_condition_custom_conflict_resolution/1,
     three_nodes_anti_entropy/1,
-    three_nodes_anti_entropy_manual/1
+    three_nodes_anti_entropy_manual/1,
+    three_nodes_concurrent_registration_unregistration/1
 ]).
 
 %% support
 -export([
     start_syn_delayed_and_register_local_process/3,
-    start_syn_delayed_with_custom_handler_register_local_process/4
+    start_syn_delayed_with_custom_handler_register_local_process/4,
+    seq_unregister_register/3
 ]).
 
 %% include
@@ -129,7 +131,8 @@ groups() ->
             three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution,
             three_nodes_registration_race_condition_custom_conflict_resolution,
             three_nodes_anti_entropy,
-            three_nodes_anti_entropy_manual
+            three_nodes_anti_entropy_manual,
+            three_nodes_concurrent_registration_unregistration
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -388,7 +391,7 @@ single_node_ensure_callback_process_exit_is_called_if_process_killed(_Config) ->
     TestPid = self(),
     ok = syn:register(Name, Pid, {some_meta, TestPid}),
     %% remove from table to simulate conflict resolution
-    syn_registry:remove_from_local_table(Name),
+    syn_registry:remove_from_local_table(Name, Pid),
     %% kill
     exit(Pid, {syn_resolve_kill, Name, {some_meta, TestPid}}),
     receive
@@ -1045,6 +1048,38 @@ three_nodes_anti_entropy_manual(Config) ->
     {Pid2, SlaveNode2} = rpc:call(SlaveNode2, syn, whereis, ["pid2", with_meta]),
     {Pid1Conflict, keep_this_one} = rpc:call(SlaveNode2, syn, whereis, ["conflict", with_meta]).
 
+three_nodes_concurrent_registration_unregistration(Config) ->
+    CommonName = "common-name",
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(100),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
+    _Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
+    timer:sleep(100),
+    %% register on 0
+    ok = syn:register(CommonName, Pid0, node()),
+    timer:sleep(250),
+    %% check
+    Node = node(),
+    {Pid0, Node} = syn:whereis(CommonName, with_meta),
+    {Pid0, Node} = rpc:call(SlaveNode1, syn, whereis, [CommonName, with_meta]),
+    {Pid0, Node} = rpc:call(SlaveNode2, syn, whereis, [CommonName, with_meta]),
+    %% simulate a stale unregistration: the name is held by Pid0, but the request names Pid1
+    syn_registry:sync_unregister(SlaveNode1, CommonName, Pid1),
+    timer:sleep(250),
+    %% check: the mismatching request must have been ignored on every node
+    Node = node(),
+    {Pid0, Node} = syn:whereis(CommonName, with_meta),
+    {Pid0, Node} = rpc:call(SlaveNode1, syn, whereis, [CommonName, with_meta]),
+    {Pid0, Node} = rpc:call(SlaveNode2, syn, whereis, [CommonName, with_meta]).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================
@@ -1073,3 +1108,7 @@ start_syn_delayed_with_custom_handler_register_local_process(Name, Pid, Meta, Ms
         syn:start(),
         ok = syn:register(Name, Pid, Meta)
     end).
+
+seq_unregister_register(Name, Pid, Meta) ->
+    syn:unregister(Name),
+    syn:register(Name, Pid, Meta).