Browse Source

Add on_process_registered callback.

Roberto Ostinelli 3 years ago
parent
commit
bc895b41e4

+ 3 - 0
src/syn_app.erl

@@ -37,6 +37,9 @@
     StartArgs :: any()
 ) -> {ok, pid()} | {ok, pid(), State :: any()} | {error, any()}.
 start(_StartType, _StartArgs) ->
+    %% ensure event handler is loaded
+    syn_event_handler:ensure_event_handler_loaded(),
+    %% start main sup
     syn_sup:start_link().
 
 -spec stop(State :: any()) -> ok.

+ 57 - 8
src/syn_event_handler.erl

@@ -28,7 +28,17 @@
 %% ==========================================================================================================
 -module(syn_event_handler).
 
--export([do_resolve_registry_conflict/5]).
+%% API
+-export([ensure_event_handler_loaded/0]).
+-export([do_on_process_registered/4]).
+-export([do_resolve_registry_conflict/4]).
+
+-callback on_process_registered(
+    Scope :: any(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any()
+) -> any().
 
 -callback resolve_registry_conflict(
     Name :: any(),
@@ -36,26 +46,58 @@
     {Pid2 :: pid(), Meta2 :: any()}
 ) -> PidToKeep :: pid() | undefined.
 
--optional_callbacks([resolve_registry_conflict/3]).
+-optional_callbacks([on_process_registered/4, resolve_registry_conflict/3]).
+
+%% ===================================================================
+%% API
+%% ===================================================================
+-spec ensure_event_handler_loaded() -> module().
+ensure_event_handler_loaded() ->
+    %% get handler
+    CustomEventHandler = get_custom_event_handler(),
+    %% ensure that is it loaded (not using code:ensure_loaded/1 to support embedded mode)
+    catch CustomEventHandler:module_info(exports).
+
+-spec do_on_process_registered(
+    Scope :: atom(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any()
+) -> any().
+do_on_process_registered(Scope, Name, Pid, Meta) ->
+    CustomEventHandler = get_custom_event_handler(),
+    case erlang:function_exported(CustomEventHandler, on_process_registered, 4) of
+        true ->
+            try CustomEventHandler:on_process_registered(Scope, Name, Pid, Meta)
+            catch Class:Reason:Stacktrace ->
+                error_logger:error_msg(
+                    "Syn(~p): Error ~p:~p in custom handler on_process_registered: ~p~n",
+                    [node(), Class, Reason, Stacktrace]
+                )
+            end;
+
+        _ ->
+            ok
+    end.
 
 -spec do_resolve_registry_conflict(
     Scope :: atom(),
     Name :: any(),
     {Pid1 :: pid(), Meta1 :: any(), Time1 :: non_neg_integer()},
-    {Pid2 :: pid(), Meta2 :: any(), Time2 :: non_neg_integer()},
-    CustomEventHandler :: module() | undefined
+    {Pid2 :: pid(), Meta2 :: any(), Time2 :: non_neg_integer()}
 ) -> PidToKeep :: pid() | undefined.
-do_resolve_registry_conflict(Scope, Name, {Pid1, Meta1, Time1}, {Pid2, Meta2, Time2}, CustomEventHandler) ->
+do_resolve_registry_conflict(Scope, Name, {Pid1, Meta1, Time1}, {Pid2, Meta2, Time2}) ->
+    CustomEventHandler = get_custom_event_handler(),
     case erlang:function_exported(CustomEventHandler, resolve_registry_conflict, 4) of
         true ->
-            try CustomEventHandler:resolve_registry_conflict(Scope, Name, {Pid1, Meta1}, {Pid2, Meta2}) of
+            try CustomEventHandler:resolve_registry_conflict(Scope, Name, {Pid1, Meta1, Time1}, {Pid2, Meta2, Time2}) of
                 PidToKeep when is_pid(PidToKeep) -> PidToKeep;
                 _ -> undefined
 
-            catch Exception:Reason ->
+            catch Class:Reason ->
                 error_logger:error_msg(
                     "Syn(~p): Error ~p in custom handler resolve_registry_conflict: ~p~n",
-                    [node(), Exception, Reason]
+                    [node(), Class, Reason]
                 ),
                 undefined
             end;
@@ -70,3 +112,10 @@ do_resolve_registry_conflict(Scope, Name, {Pid1, Meta1, Time1}, {Pid2, Meta2, Ti
             end,
             PidToKeep
     end.
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+-spec get_custom_event_handler() -> undefined | {ok, CustomEventHandler :: atom()}.
+get_custom_event_handler() ->
+    application:get_env(syn, event_handler, undefined).

+ 8 - 7
src/syn_registry.erl

@@ -497,14 +497,17 @@ maybe_demonitor(Scope, Pid) ->
     Meta :: any(),
     Time :: integer(),
     MRef :: undefined | reference()
-) -> true.
+) -> any().
 add_to_local_table(Scope, Name, Pid, Meta, Time, MRef) ->
+    %% insert
     true = ets:insert(syn_backbone:get_table_name(syn_registry_by_name, Scope),
         {{Name, Pid}, Meta, Time, MRef, node(Pid)}
     ),
     true = ets:insert(syn_backbone:get_table_name(syn_registry_by_pid, Scope),
         {{Pid, Name}, Meta, Time, MRef, node(Pid)}
-    ).
+    ),
+    %% callback
+    syn_event_handler:do_on_process_registered(Scope, Name, Pid, Meta).
 
 -spec remove_from_local_table(Scope :: atom(), Name :: any(), Pid :: pid()) -> true.
 remove_from_local_table(Scope, Name, Pid) ->
@@ -573,16 +576,14 @@ handle_registry_sync(Scope, Name, Pid, Meta, Time, State) ->
     {Pid :: pid(), Meta :: any(), Time :: non_neg_integer()},
     {TablePid :: pid(), TableMeta :: any(), TableTime :: non_neg_integer(), TableMRef :: reference()},
     #state{}
-) -> KeptPid :: pid().
+) -> any().
 resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State) ->
-    CustomEventHandler = undefined,
     %% call conflict resolution
     PidToKeep = syn_event_handler:do_resolve_registry_conflict(
         Scope,
         Name,
         {Pid, Meta, Time},
-        {TablePid, TableMeta, TableTime},
-        CustomEventHandler
+        {TablePid, TableMeta, TableTime}
     ),
     %% resolve
     case PidToKeep of
@@ -610,7 +611,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
         Invalid ->
             maybe_demonitor(Scope, TablePid),
             remove_from_local_table(Scope, Name, TablePid),
-            %% kill local, remote will be killed by other node performing the resolve
+            %% kill local, remote will be killed by other node performing the same resolve
             exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
             error_logger:info_msg("SYN[~p] Registry CONFLICT for name ~p@~p: ~p ~p -> none chosen (got: ~p)~n",
                 [node(), Name, Scope, Pid, TablePid, Invalid]

+ 67 - 2
test/syn_registry_SUITE.erl

@@ -38,7 +38,8 @@
     three_nodes_register_unregister_and_monitor_default_scope/1,
     three_nodes_register_unregister_and_monitor_custom_scope/1,
     three_nodes_cluster_changes/1,
-    three_nodes_cluster_conflicts/1
+    three_nodes_cluster_conflicts/1,
+    three_nodes_custom_event_handler/1
 ]).
 
 %% include
@@ -81,7 +82,8 @@ groups() ->
             three_nodes_register_unregister_and_monitor_default_scope,
             three_nodes_register_unregister_and_monitor_custom_scope,
             three_nodes_cluster_changes,
-            three_nodes_cluster_conflicts
+            three_nodes_cluster_conflicts,
+            three_nodes_custom_event_handler
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -854,6 +856,69 @@ three_nodes_cluster_conflicts(Config) ->
     true = rpc:call(SlaveNode1, erlang, is_process_alive, [PidCustom1]),
     false = rpc:call(SlaveNode2, erlang, is_process_alive, [PidCustom2]).
 
+three_nodes_custom_event_handler(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+
+    %% add custom handler
+    syn_test_suite_helper:use_custom_handler(),
+    rpc:call(SlaveNode1, syn_test_suite_helper, use_custom_handler, []),
+    rpc:call(SlaveNode2, syn_test_suite_helper, use_custom_handler, []),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(250),
+
+    %% register test process to receive messages back from test handler
+    global:register_name(syn_test_main_process, self()),
+
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+    ok = syn:register("proc-handler", Pid, <<"my-meta">>),
+
+    %% check callbacks on_process_registered called on all nodes
+    CurrentNode = node(),
+    receive
+        {on_process_registered, CurrentNode, default, "proc-handler", Pid, <<"my-meta">>} -> ok
+    after 1000 ->
+        ok = on_process_registered_not_called_on_main_node
+    end,
+    receive
+        {on_process_registered, SlaveNode1, default, "proc-handler", Pid, <<"my-meta">>} -> ok
+    after 1000 ->
+        ok = on_process_registered_not_called_on_slave_1_node
+    end,
+    receive
+        {on_process_registered, SlaveNode2, default, "proc-handler", Pid, <<"my-meta">>} -> ok
+    after 1000 ->
+        ok = on_process_registered_not_called_on_slave_2_node
+    end,
+
+    ok = syn:register("proc-handler", Pid, <<"my-new-meta">>),
+
+    %% check callbacks on_process_registered are called on nodes because of change of meta
+    receive
+        {on_process_registered, CurrentNode, default, "proc-handler", Pid, <<"my-new-meta">>} -> ok
+    after 1000 ->
+        ok = on_process_registered_not_called_on_main_node
+    end,
+    receive
+        {on_process_registered, SlaveNode1, default, "proc-handler", Pid, <<"my-new-meta">>} -> ok
+    after 1000 ->
+        ok = on_process_registered_not_called_on_slave_1_node
+    end,
+    receive
+        {on_process_registered, SlaveNode2, default, "proc-handler", Pid, <<"my-new-meta">>} -> ok
+    after 1000 ->
+        ok = on_process_registered_not_called_on_slave_2_node
+    end,
+
+    %% clean
+    global:unregister_name(syn_test_main_process).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================

+ 43 - 0
test/syn_test_event_handler.erl

@@ -0,0 +1,43 @@
+%% ==========================================================================================================
+%% Syn - A global Process Registry and Process Group manager.
+%%
+%% The MIT License (MIT)
+%%
+%% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+%% ==========================================================================================================
+-module(syn_test_event_handler).
+-behaviour(syn_event_handler).
+
+-export([on_process_registered/4]).
+
+on_process_registered(Scope, Name, Pid, Meta) ->
+    global:send(syn_test_main_process, {on_process_registered, node(), Scope, Name, Pid, Meta}).
+
+%%-export([resolve_registry_conflict/4]).
+%%
+%%-spec resolve_registry_conflict(
+%%    Scope ::atom(),
+%%    Name :: any(),
+%%    {LocalPid :: pid(), LocalMeta :: any()},
+%%    {RemotePid :: pid(), RemoteMeta :: any()}
+%%) -> PidToKeep :: pid().
+%%resolve_registry_conflict(_Scope, _Name, {Pid1, _Meta1, _Time1}, {_Pid2, _Meta2, _Time2}) ->
+%%    Pid1.

+ 7 - 1
test/syn_test_suite_helper.erl

@@ -33,6 +33,7 @@
 -export([start_process/0, start_process/1, start_process/2]).
 -export([kill_process/1]).
 -export([wait_cluster_connected/1]).
+-export([use_custom_handler/0]).
 -export([send_error_logger_to_disk/0]).
 
 %% internal
@@ -72,7 +73,9 @@ clean_after_test() ->
     %% shutdown
     lists:foreach(fun(Node) ->
         %% close syn
-        rpc:call(Node, application, stop, [syn])
+        rpc:call(Node, application, stop, [syn]),
+        %% clean env
+        rpc:call(Node, application, unset_env, [syn, event_handler])
     end, Nodes).
 
 start_process() ->
@@ -116,6 +119,9 @@ wait_cluster_connected(Nodes, StartAt) ->
             end
     end.
 
+use_custom_handler() ->
+    application:set_env(syn, event_handler, syn_test_event_handler).
+
 send_error_logger_to_disk() ->
     error_logger:logfile({open, atom_to_list(node())}).