Просмотр исходного кода

Resolve conflicts for registration race conditions with custom handler.

Roberto Ostinelli 5 лет назад
Родитель
Коммит
f17a819110
2 изменённых файла: 87 добавлений и 21 удаление
  1. 42 19
      src/syn_registry.erl
  2. 45 2
      test/syn_registry_SUITE.erl

+ 42 - 19
src/syn_registry.erl

@@ -34,8 +34,8 @@
 -export([count/0, count/1]).
 
 %% sync API
+-export([sync_register/4, sync_unregister/2]).
 -export([sync_get_local_registry_tuples/1]).
--export([add_remote_to_local_table/3]).
 -export([add_to_local_table/4]).
 -export([remove_from_local_table/1]).
 
@@ -101,6 +101,14 @@ count(Node) ->
     RegistryTuples = get_registry_tuples_for_node(Node),
     length(RegistryTuples).
 
+-spec sync_register(RemoteNode :: node(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
+sync_register(RemoteNode, Name, Pid, Meta) -> % sends a {sync_register, Name, Pid, Meta} message to the process registered as ?MODULE on RemoteNode
+    gen_server:cast({?MODULE, RemoteNode}, {sync_register, Name, Pid, Meta}). % asynchronous cast: returns ok without waiting for the remote side
+
+-spec sync_unregister(RemoteNode :: node(), Name :: any()) -> ok.
+sync_unregister(RemoteNode, Name) -> % sends a {sync_unregister, Name} message to the process registered as ?MODULE on RemoteNode
+    gen_server:cast({?MODULE, RemoteNode}, {sync_unregister, Name}). % asynchronous cast: returns ok without waiting for the remote side
+
 -spec sync_get_local_registry_tuples(FromNode :: node()) -> [syn_registry_tuple()].
 sync_get_local_registry_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node: ~p~n", [node(), FromNode]),
@@ -191,6 +199,18 @@ handle_call(Request, From, State) ->
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
 
+handle_cast({sync_register, Name, Pid, Meta}, State) ->
+    %% message sent by sync_register/4: add the remote entry to the local table; State is handed over so that a name conflict can be resolved with it
+    add_remote_to_local_table(Name, Pid, Meta, State),
+    %% the server state itself is returned unchanged
+    {noreply, State};
+
+handle_cast({sync_unregister, Name}, State) ->
+    %% message sent by sync_unregister/2: remove the entry for Name from the local table
+    remove_from_local_table(Name),
+    %% the server state itself is returned unchanged
+    {noreply, State};
+
 handle_cast(Msg, State) ->
     error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
     {noreply, State}.
@@ -275,13 +295,17 @@ code_change(_OldVsn, State, _Extra) ->
 -spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any()) -> pid().
 multicast_register(Name, Pid, Meta) ->
     spawn_link(fun() ->
-        rpc:eval_everywhere(nodes(), ?MODULE, add_remote_to_local_table, [Name, Pid, Meta])
+        lists:foreach(fun(RemoteNode) ->
+            sync_register(RemoteNode, Name, Pid, Meta) % one asynchronous cast per node
+        end, nodes()) % nodes() lists the currently connected nodes, not the local one
     end).
 
 -spec multicast_unregister(Name :: any()) -> pid().
 multicast_unregister(Name) ->
     spawn_link(fun() ->
-        rpc:eval_everywhere(nodes(), ?MODULE, remove_from_local_table, [Name])
+        lists:foreach(fun(RemoteNode) ->
+            sync_unregister(RemoteNode, Name) % one asynchronous cast per node
+        end, nodes()) % nodes() lists the currently connected nodes, not the local one
     end).
 
 -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
@@ -319,8 +343,8 @@ add_to_local_table(Name, Pid, Meta, MonitorRef) ->
         monitor_ref = MonitorRef
     }).
 
--spec add_remote_to_local_table(Name :: any(), RemotePid :: pid(), RemoteMeta :: any()) -> ok.
-add_remote_to_local_table(Name, RemotePid, RemoteMeta) ->
+-spec add_remote_to_local_table(Name :: any(), RemotePid :: pid(), RemoteMeta :: any(), State :: #state{}) -> ok.
+add_remote_to_local_table(Name, RemotePid, RemoteMeta, State) ->
     %% check for conflicts
     case find_process_entry_by_name(Name) of
         undefined ->
@@ -342,23 +366,21 @@ add_remote_to_local_table(Name, RemotePid, RemoteMeta) ->
                         false ->
                             LocalPid = Entry#syn_registry_table.pid,
                             LocalMeta = Entry#syn_registry_table.meta,
+
                             error_logger:warning_msg(
                                 "Syn(~p): Conflicting name from multicast found for: ~p, processes are ~p, ~p~n",
                                 [node(), Name, LocalPid, RemotePid]
                             ),
 
-                            %% TODO: get handler
-                            CustomEventHandler = undefined,
-
-
-                            resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, CustomEventHandler,
+                            resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta},
                                 fun() ->
                                     RemoteNode = node(RemotePid),
                                     ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, LocalPid, LocalMeta, undefined])
                                 end,
                                 fun() ->
                                     add_to_local_table(Name, RemotePid, RemoteMeta, undefined)
-                                end
+                                end,
+                                State
                             )
                     end
                 end
@@ -369,11 +391,13 @@ add_remote_to_local_table(Name, RemotePid, RemoteMeta) ->
     Name :: any(),
     {LocalPid :: pid(), LocalMeta :: any()},
     {RemotePid :: pid(), RemoteMeta :: any()},
-    CustomEventHandler :: module(),
     KeepLocalFun :: fun(),
-    KeepRemoteFun :: fun()
+    KeepRemoteFun :: fun(),
+    #state{}
 ) -> ok.
-resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, CustomEventHandler, KeepLocalFun, KeepRemoteFun) ->
+resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, KeepLocalFun, KeepRemoteFun, #state{
+    custom_event_handler = CustomEventHandler
+}) ->
     %% call conflict resolution
     {PidToKeep, KillOther} = syn_event_handler:do_resolve_registry_conflict(
         Name,
@@ -461,9 +485,7 @@ handle_process_down(Name, Pid, Meta, Reason, #state{
     end.
 
 -spec sync_registry_tuples(RemoteNode :: node(), RegistryTuples :: [syn_registry_tuple()], #state{}) -> ok.
-sync_registry_tuples(RemoteNode, RegistryTuples, #state{
-    custom_event_handler = CustomEventHandler
-}) ->
+sync_registry_tuples(RemoteNode, RegistryTuples, State) ->
     %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
     purge_registry_entries_for_remote_node(RemoteNode),
     %% loop
@@ -483,13 +505,14 @@ sync_registry_tuples(RemoteNode, RegistryTuples, #state{
                     [node(), Name, LocalPid, RemotePid]
                 ),
 
-                resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, CustomEventHandler,
+                resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta},
                     fun() ->
                         ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
                     end,
                     fun() ->
                         add_to_local_table(Name, RemotePid, RemoteMeta, undefined)
-                    end
+                    end,
+                    State
                 )
         end
     end,

+ 45 - 2
test/syn_registry_SUITE.erl

@@ -50,7 +50,8 @@
     three_nodes_partial_netsplit_consistency/1,
     three_nodes_full_netsplit_consistency/1,
     three_nodes_start_syn_before_connecting_cluster_with_conflict/1,
-    three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution/1
+    three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution/1,
+    three_nodes_registration_race_condition_custom_conflict_resolution/1
 ]).
 
 %% support
@@ -114,7 +115,8 @@ groups() ->
             three_nodes_partial_netsplit_consistency,
             three_nodes_full_netsplit_consistency,
             three_nodes_start_syn_before_connecting_cluster_with_conflict,
-            three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution
+            three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution,
+            three_nodes_registration_race_condition_custom_conflict_resolution
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -822,6 +824,47 @@ three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution(
     syn_test_suite_helper:kill_process(Pid1),
     syn_test_suite_helper:kill_process(Pid2).
 
+three_nodes_registration_race_condition_custom_conflict_resolution(Config) ->
+    ConflictingName = "COMMON",
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% use custom handler
+    syn_test_suite_helper:use_custom_handler(),
+    rpc:call(SlaveNode1, syn_test_suite_helper, use_custom_handler, []),
+    rpc:call(SlaveNode2, syn_test_suite_helper, use_custom_handler, []),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(500),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
+    %% inject into syn to simulate concurrent registration
+    ok = rpc:call(SlaveNode1, ?MODULE, inject_add_to_local_node, [ConflictingName, Pid1, keep_this_one]),
+    ok = rpc:call(SlaveNode2, ?MODULE, inject_add_to_local_node, [ConflictingName, Pid2, SlaveNode2]),
+    %% register on master node to trigger conflict resolution
+    ok = syn:register(ConflictingName, Pid0, node()),
+    timer:sleep(1000),
+    %% retrieve
+    true = lists:member(syn:whereis(ConflictingName), [Pid0, Pid1, Pid2]),
+    true = lists:member(rpc:call(SlaveNode1, syn, whereis, [ConflictingName]), [Pid0, Pid1, Pid2]),
+    true = lists:member(rpc:call(SlaveNode2, syn, whereis, [ConflictingName]), [Pid0, Pid1, Pid2]),
+    %% check metadata that we kept the correct process on all nodes (NOTE(review): the third check below queries SlaveNode1 a second time, so SlaveNode2's entry is never asserted here -- confirm whether this is intended)
+    {Pid1, keep_this_one} = syn:whereis(ConflictingName, with_meta),
+    {Pid1, keep_this_one} = rpc:call(SlaveNode1, syn, whereis, [ConflictingName, with_meta]),
+    {Pid1, keep_this_one} = rpc:call(SlaveNode1, syn, whereis, [ConflictingName, with_meta]),
+    %% check that other processes are still alive because we didn't kill them
+    true = is_process_alive(Pid0),
+    true = rpc:call(SlaveNode1, erlang, is_process_alive, [Pid1]),
+    true = rpc:call(SlaveNode2, erlang, is_process_alive, [Pid2]),
+    %% kill processes
+    syn_test_suite_helper:kill_process(Pid0),
+    syn_test_suite_helper:kill_process(Pid1),
+    syn_test_suite_helper:kill_process(Pid2).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================