Browse Source

Adding custom handler for conflict resolution.

Roberto Ostinelli 5 years ago
parent
commit
8a966036b5

+ 22 - 5
src/syn_event_handler.erl

@@ -28,7 +28,7 @@
 %% ==========================================================================================================
 -module(syn_event_handler).
 
--export([resolve_registry_conflict/4]).
+-export([do_resolve_registry_conflict/4]).
 
 -callback on_process_exit(
     Name :: any(),
@@ -52,11 +52,28 @@
 
 -optional_callbacks([on_process_exit/4, on_group_process_exit/4, resolve_registry_conflict/3]).
 
--spec resolve_registry_conflict(
+-spec do_resolve_registry_conflict(
     Name :: any(),
     {Pid1 :: pid(), Meta1 :: any()},
     {Pid2 :: pid(), Meta2 :: any()},
     CustomEventHandler :: module()
-) -> ok.
-resolve_registry_conflict(_Name, {LocalPid, _LocalMeta}, {_RemotePid, _RemoteMeta}, _CustomEventHandler) ->
-    LocalPid.
+) -> PidToKeep :: pid() | undefined.
+do_resolve_registry_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, CustomEventHandler) ->
+    case erlang:function_exported(CustomEventHandler, resolve_registry_conflict, 3) of
+        true ->
+            try CustomEventHandler:resolve_registry_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}) of
+                PidToKeep when is_pid(PidToKeep) ->
+                    PidToKeep;
+                _ ->
+                    undefined
+            catch Class:Reason:Stacktrace ->
+                error_logger:error_msg(
+                    "Syn(~p): Error in custom handler while selecting process to keep: ~p:~p:~p",
+                    [node(), Class, Reason, Stacktrace]
+                ),
+                undefined
+            end;
+        _ ->
+            %% by default, keep local pid
+            LocalPid
+    end.

+ 15 - 14
src/syn_registry.erl

@@ -36,7 +36,7 @@
 %% sync API
 -export([sync_register/4, sync_unregister/2]).
 -export([sync_get_local_registry_tuples/1]).
--export([unregister_on_node/1]).
+-export([remove_from_local_table/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -146,6 +146,8 @@ init([]) ->
             ok = net_kernel:monitor_nodes(true),
             %% get handler
             CustomEventHandler = application:get_env(syn, event_handler, ?DEFAULT_EVENT_HANDLER_MODULE),
+            %% ensure that is it loaded (not using code:ensure_loaded/1 to support embedded mode)
+            catch CustomEventHandler:module_info(exports),
             %% init
             {ok, #state{
                 custom_event_handler = CustomEventHandler
@@ -409,13 +411,9 @@ sync_registry_tuples(RemoteNode, RegistryTuples, #state{
                     "Syn(~p): Conflicting name process found for: ~p, processes are ~p, ~p~n",
                     [node(), Name, LocalPid, RemotePid]
                 ),
-                %% unregister local
-                unregister_on_node(Name),
-                %% unregister remote
-                ok = rpc:call(RemoteNode, syn_registry, unregister_on_node, [Name]),
 
                 %% call conflict resolution
-                PidToKeep = syn_event_handler:resolve_registry_conflict(
+                PidToKeep = syn_event_handler:do_resolve_registry_conflict(
                     Name,
                     {LocalPid, LocalMeta},
                     {RemotePid, RemoteMeta},
@@ -426,17 +424,20 @@ sync_registry_tuples(RemoteNode, RegistryTuples, #state{
                 case PidToKeep of
                     LocalPid ->
                         %% keep local
-                        exit(RemotePid, kill),
-                        register_on_node(Name, LocalPid, LocalMeta);
+                        ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
+                        exit(RemotePid, kill);
 
                     RemotePid ->
                         %% keep remote
-                        exit(LocalPid, kill),
-                        register_on_node(Name, RemotePid, RemoteMeta);
-
-                    _ ->
-                        % don't keep any of the two
-                        ok
+                        remove_from_local_table(Name),
+                        add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
+                        exit(LocalPid, kill);
+
+                    Other ->
+                        error_logger:error_msg(
+                            "Syn(~p): Custom handler returned ~p, valid options were ~p and ~p",
+                            [node(), Other, LocalPid, RemotePid]
+                        )
                 end
         end
     end,

+ 73 - 5
test/syn_registry_SUITE.erl

@@ -46,12 +46,14 @@
 -export([
     three_nodes_partial_netsplit_consistency/1,
     three_nodes_full_netsplit_consistency/1,
-    three_nodes_start_syn_before_connecting_cluster/1
+    three_nodes_start_syn_before_connecting_cluster_with_conflict/1,
+    three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution/1
 ]).
 
 %% support
 -export([
-    start_syn_delayed_and_register_local_process/3
+    start_syn_delayed_and_register_local_process/3,
+    start_syn_delayed_with_custom_handler_register_local_process/4
 ]).
 
 %% include
@@ -104,7 +106,8 @@ groups() ->
         {three_nodes_process_registration, [shuffle], [
             three_nodes_partial_netsplit_consistency,
             three_nodes_full_netsplit_consistency,
-            three_nodes_start_syn_before_connecting_cluster
+            three_nodes_start_syn_before_connecting_cluster_with_conflict,
+            three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -585,7 +588,7 @@ three_nodes_full_netsplit_consistency(Config) ->
     syn_test_suite_helper:kill_process(Pid1),
     syn_test_suite_helper:kill_process(Pid2).
 
-three_nodes_start_syn_before_connecting_cluster(Config) ->
+three_nodes_start_syn_before_connecting_cluster_with_conflict(Config) ->
     ConflictingName = "COMMON",
     %% get slaves
     SlaveNode1 = proplists:get_value(slave_node_1, Config),
@@ -603,7 +606,7 @@ three_nodes_start_syn_before_connecting_cluster(Config) ->
     rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
     syn_test_suite_helper:disconnect_node(SlaveNode1),
     syn_test_suite_helper:disconnect_node(SlaveNode2),
-    timer:sleep(1000),
+    timer:sleep(1500),
     [] = nodes(),
     %% reconnect all
     syn_test_suite_helper:connect_node(SlaveNode1),
@@ -633,6 +636,56 @@ three_nodes_start_syn_before_connecting_cluster(Config) ->
     syn_test_suite_helper:kill_process(Pid1),
     syn_test_suite_helper:kill_process(Pid2).
 
+three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution(Config) ->
+    ConflictingName = "COMMON",
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
+    Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
+    %% start delayed
+    start_syn_delayed_with_custom_handler_register_local_process(ConflictingName, Pid0, {node, node()}, 1500),
+    rpc:cast(
+        SlaveNode1,
+        ?MODULE,
+        start_syn_delayed_with_custom_handler_register_local_process,
+        [ConflictingName, Pid1, keep_this_one, 1500])
+    ,
+    rpc:cast(
+        SlaveNode2,
+        ?MODULE,
+        start_syn_delayed_with_custom_handler_register_local_process,
+        [ConflictingName, Pid2, {node, SlaveNode2}, 1500]
+    ),
+    timer:sleep(500),
+    %% disconnect all
+    rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
+    syn_test_suite_helper:disconnect_node(SlaveNode1),
+    syn_test_suite_helper:disconnect_node(SlaveNode2),
+    timer:sleep(1500),
+    [] = nodes(),
+    %% reconnect all
+    syn_test_suite_helper:connect_node(SlaveNode1),
+    syn_test_suite_helper:connect_node(SlaveNode2),
+    rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
+    timer:sleep(2500),
+    %% count
+    1 = syn:registry_count(),
+    1 = rpc:call(SlaveNode1, syn, registry_count, []),
+    1 = rpc:call(SlaveNode2, syn, registry_count, []),
+    %% retrieve
+    true = lists:member(syn:whereis(ConflictingName), [Pid0, Pid1, Pid2]),
+    true = lists:member(rpc:call(SlaveNode1, syn, whereis, [ConflictingName]), [Pid0, Pid1, Pid2]),
+    true = lists:member(rpc:call(SlaveNode2, syn, whereis, [ConflictingName]), [Pid0, Pid1, Pid2]),
+    %% check metadata that we kept the correct process
+    {Pid1, keep_this_one} = syn:whereis(ConflictingName, with_meta),
+    %% kill processes
+    syn_test_suite_helper:kill_process(Pid0),
+    syn_test_suite_helper:kill_process(Pid1),
+    syn_test_suite_helper:kill_process(Pid2).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================
@@ -643,6 +696,21 @@ start_syn_delayed_and_register_local_process(Name, Pid, Ms) ->
         end, nodes()),
         timer:sleep(Ms),
         [] = nodes(),
+        %%
         syn:start(),
         ok = syn:register(Name, Pid, node())
     end).
+
+start_syn_delayed_with_custom_handler_register_local_process(Name, Pid, Meta, Ms) ->
+    spawn(fun() ->
+        lists:foreach(fun(Node) ->
+            syn_test_suite_helper:disconnect_node(Node)
+        end, nodes()),
+        timer:sleep(Ms),
+        [] = nodes(),
+        %% use customer handler
+        syn_test_suite_helper:use_custom_handler(),
+        %%
+        syn:start(),
+        ok = syn:register(Name, Pid, Meta)
+    end).

+ 42 - 0
test/syn_test_event_handler.erl

@@ -0,0 +1,42 @@
+%% ==========================================================================================================
+%% Syn - A global Process Registry and Process Group manager.
+%%
+%% The MIT License (MIT)
+%%
+%% Copyright (c) 2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+%% ==========================================================================================================
+-module(syn_test_event_handler).
+-behaviour(syn_event_handler).
+
+%% API
+-export([resolve_registry_conflict/3]).
+
+-spec resolve_registry_conflict(
+    Name :: any(),
+    {Pid1 :: pid(), Meta1 :: any()},
+    {Pid2 :: pid(), Meta2 :: any()}
+) -> PidToKeep :: pid().
+resolve_registry_conflict(_Name, {LocalPid, keep_this_one}, {_RemotePid, _RemoteMeta}) ->
+    LocalPid;
+resolve_registry_conflict(_Name, {_LocalPid, _LocalMeta}, {RemotePid, keep_this_one}) ->
+    RemotePid;
+resolve_registry_conflict(_Name, {LocalPid, _LocalMeta}, {_RemotePid, _RemoteMeta}) ->
+    LocalPid.

+ 24 - 1
test/syn_test_suite_helper.erl

@@ -31,6 +31,8 @@
 -export([clean_after_test/0]).
 -export([start_process/0, start_process/1, start_process/2]).
 -export([kill_process/1]).
+-export([use_custom_handler/0]).
+-export([start_collecting_debug_data/0, send_debug_data/1, print_debug_data/0]).
 
 %% internal
 -export([process_main/0]).
@@ -40,7 +42,10 @@
 %% ===================================================================
 start_slave(NodeShortName) ->
     CodePath = code:get_path(),
-    {ok, Node} = ct_slave:start(NodeShortName, [{boot_timeout, 10}]),
+    {ok, Node} = ct_slave:start(NodeShortName, [
+        {boot_timeout, 10}
+%%        {erl_flags, ErlangFlags}
+    ]),
     true = rpc:call(Node, code, set_path, [CodePath]),
     {ok, Node}.
 
@@ -79,6 +84,24 @@ start_process(Node, Loop) ->
 kill_process(Pid) ->
     exit(Pid, kill).
 
+use_custom_handler() ->
+    application:set_env(syn, event_handler, syn_test_event_handler).
+
+start_collecting_debug_data() ->
+    global:register_name(syn_debug_process, self()).
+
+send_debug_data(Message) ->
+    global:send(syn_debug_process, Message).
+
+print_debug_data() ->
+    receive
+        Any ->
+            ct:pal("~p", [Any]),
+            print_debug_data()
+    after 1000 ->
+        ok
+    end.
+
 %% ===================================================================
 %% Internal
 %% ===================================================================