Просмотр исходного кода

Avoid double sync with remote reg/unregistering node.

Roberto Ostinelli 3 лет назад
Родитель
Сommit
da1f99ae1b
2 измененных файлов с 15 добавлено и 9 удалено
  1. 14 8
      src/syn_registry.erl
  2. 1 1
      test/syn_registry_SUITE.erl

+ 14 - 8
src/syn_registry.erl

@@ -102,7 +102,7 @@ register(Scope, Name, Pid) when is_pid(Pid) ->
 register(Scope, Name, Pid, Meta) ->
     ProcessName = get_process_name_for_scope(Scope),
     Node = node(Pid),
-    try gen_server:call({ProcessName, Node}, {register_on_node, Name, Pid, Meta}) of
+    try gen_server:call({ProcessName, Node}, {register_on_owner, node(), Name, Pid, Meta}) of
         {ok, {TablePid, TableMeta, Time}} when Node =/= node() ->
             %% update table on caller node immediately so that subsequent calls have an updated registry
             add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
@@ -131,7 +131,7 @@ unregister(Scope, Name) ->
         {{Name, Pid}, Meta, _, _, _} ->
             ProcessName = get_process_name_for_scope(Scope),
             Node = node(Pid),
-            case gen_server:call({ProcessName, Node}, {unregister_on_node, Name, Pid}) of
+            case gen_server:call({ProcessName, Node}, {unregister_on_owner, node(), Name, Pid}) of
                 ok when Node =/= node() ->
                     %% remove table on caller node immediately so that subsequent calls have an updated registry
                     remove_from_local_table(Scope, Name, Pid),
@@ -206,7 +206,7 @@ handle_call(get_subcluster_nodes, _From, #state{
 } = State) ->
     {reply, Nodes, State};
 
-handle_call({register_on_node, Name, Pid, Meta}, _From, #state{
+handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
     scope = Scope
 } = State) ->
     case is_process_alive(Pid) of
@@ -224,7 +224,7 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, #state{
                     %% callback
                     syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta}),
                     %% broadcast
-                    broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
+                    broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, [RequesterNode], State),
                     %% return
                     {reply, {ok, {undefined, undefined, Time}}, State};
 
@@ -247,7 +247,7 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, #state{
             {reply, {{error, not_alive}, undefined}, State}
     end;
 
-handle_call({unregister_on_node, Name, Pid}, _From, #state{scope = Scope} = State) ->
+handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{scope = Scope} = State) ->
     case find_registry_entry_by_name(Scope, Name) of
         {{Name, Pid}, Meta, _Time, _MRef, _Node} ->
             %% demonitor if the process is not registered under other names
@@ -257,7 +257,7 @@ handle_call({unregister_on_node, Name, Pid}, _From, #state{scope = Scope} = Stat
             %% callback
             syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
             %% broadcast
-            broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State),
+            broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
             %% return
             {reply, ok, State};
 
@@ -436,8 +436,14 @@ rebuild_monitors(Scope) ->
     end, RegistryTuples).
 
 -spec broadcast(Message :: any(), #state{}) -> any().
-broadcast(Message, #state{process_name = ProcessName, nodes = Nodes}) ->
-    lists:foreach(fun(RemoteNode) -> gen_server:cast({ProcessName, RemoteNode}, Message) end, maps:keys(Nodes)).
+broadcast(Message, State) ->
+    broadcast(Message, [], State).
+
+-spec broadcast(Message :: any(), ExcludedNodes :: [node()], #state{}) -> any().
+broadcast(Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}) ->
+    lists:foreach(fun(RemoteNode) ->
+        gen_server:cast({ProcessName, RemoteNode}, Message)
+    end, maps:keys(Nodes) -- ExcludedNodes).
 
 -spec broadcast_all(Message :: any(), #state{}) -> any().
 broadcast_all(Message, #state{process_name = ProcessName}) ->

+ 1 - 1
test/syn_registry_SUITE.erl

@@ -44,7 +44,7 @@
 
 %% include
 -include_lib("common_test/include/ct.hrl").
--include_lib("../src/syn.hrl").
+-include_lib("syn/src/syn.hrl").
 
 %% ===================================================================
 %% Callbacks