Browse Source

Add on_group_process_exit callback.

Roberto Ostinelli 5 years ago
parent
commit
b5175de250
6 changed files with 205 additions and 100 deletions
  1. 86 63
      README.md
  2. 32 2
      src/syn_event_handler.erl
  3. 27 25
      src/syn_groups.erl
  4. 3 8
      src/syn_registry.erl
  5. 45 2
      test/syn_groups_SUITE.erl
  6. 12 0
      test/syn_test_event_handler.erl

+ 86 - 63
README.md

@@ -1,41 +1,41 @@
 
-[![Build Status](https://travis-ci.org/ostinelli/syn.svg?branch=master)](https://travis-ci.org/ostinelli/syn)   [![Hex pm](https://img.shields.io/hexpm/v/syn.svg)](https://hex.pm/packages/syn)  
-  
-  
+[![Build Status](https://travis-ci.org/ostinelli/syn.svg?branch=master)](https://travis-ci.org/ostinelli/syn) [![Hex pm](https://img.shields.io/hexpm/v/syn.svg)](https://hex.pm/packages/syn)
+
+
 # Syn (v2)
-**Syn** (short for _synonym_) is a global Process Registry and Process Group manager for Erlang and Elixir. Syn automatically manages addition / removal of nodes from the cluster, and is also able to recover from net splits. 
-
-## Introduction  
-  
-##### What is a Process Registry?  
-A global Process Registry allows registering a process on all the nodes of a cluster with a single Key. Consider this the process equivalent of a DNS server: in the same way you can retrieve an IP address from a domain name, you can retrieve a process from its Key.  
-  
-Typical Use Case: registering on a system a process that handles a physical device (using its serial number).  
-  
-##### What is a Process Group?  
-A global Process Group is a named group which contains many processes, possibly running on different nodes. With the group Name, you can retrieve on any cluster node the list of these processes, or publish a message to all of them. This mechanism allows for Publish / Subscribe patterns.  
-  
-Typical Use Case: a chatroom.  
-  
-##### What is Syn?  
-Syn is a Process Registry and Process Group manager that has the following features:  
-  
- * Global Process Registry (i.e. a process is uniquely identified with a Key across all the nodes of a cluster).  
- * Global Process Group manager (i.e. a group is uniquely identified with a Name across all the nodes of a cluster).  
- * Any term can be used as Key and Name.  
- * A message can be published to all members of a Process Group (PubSub mechanism).  
- * Fast writes.  
- * Automatically handles conflict resolution (such as net splits).  
- * Configurable callbacks.  
- * Processes are automatically monitored and removed from the Process Registry and Process Groups if they die.  
-    
-## Notes  
-In any distributed system you are faced with a consistency challenge, which is often resolved by having one master arbiter performing all write operations (chosen with a mechanism of [leader election](http://en.wikipedia.org/wiki/Leader_election)), or through [atomic transactions](http://en.wikipedia.org/wiki/Atomicity_(database_systems)).  
-  
-Syn was born for applications of the [IoT](http://en.wikipedia.org/wiki/Internet_of_Things) field. In this context, Keys used to identify a process are often the physical object's unique identifier (for instance, its serial or MAC address), and are therefore already defined and unique _before_ hitting the system.  The consistency challenge is less of a problem in this case, since the likelihood of concurrent incoming requests that would register processes with the same Key is extremely low and, in most cases, acceptable.  
-  
-In addition, write speeds were a determining factor in the architecture of Syn.  
-  
+**Syn** (short for _synonym_) is a global Process Registry and Process Group manager for Erlang and Elixir. Syn automatically manages addition / removal of nodes from the cluster, and is also able to recover from net splits.
+
+## Introduction
+
+##### What is a Process Registry?
+A global Process Registry allows registering a process on all the nodes of a cluster with a single Key. Consider this the process equivalent of a DNS server: in the same way you can retrieve an IP address from a domain name, you can retrieve a process from its Key.
+
+Typical Use Case: registering on a system a process that handles a physical device (using its serial number).
+
+##### What is a Process Group?
+A global Process Group is a named group which contains many processes, possibly running on different nodes. With the group Name, you can retrieve on any cluster node the list of these processes, or publish a message to all of them. This mechanism allows for Publish / Subscribe patterns.
+
+Typical Use Case: a chatroom.
+
+##### What is Syn?
+Syn is a Process Registry and Process Group manager that has the following features:
+
+ * Global Process Registry (i.e. a process is uniquely identified with a Key across all the nodes of a cluster).
+ * Global Process Group manager (i.e. a group is uniquely identified with a Name across all the nodes of a cluster).
+ * Any term can be used as Key and Name.
+ * A message can be published to all members of a Process Group (PubSub mechanism).
+ * Fast writes.
+ * Automatically handles conflict resolution (such as net splits).
+ * Configurable callbacks.
+ * Processes are automatically monitored and removed from the Process Registry and Process Groups if they die.
+
+## Notes
+In any distributed system you are faced with a consistency challenge, which is often resolved by having one master arbiter performing all write operations (chosen with a mechanism of [leader election](http://en.wikipedia.org/wiki/Leader_election)), or through [atomic transactions](http://en.wikipedia.org/wiki/Atomicity_(database_systems)).
+
+Syn was born for applications of the [IoT](http://en.wikipedia.org/wiki/Internet_of_Things) field. In this context, Keys used to identify a process are often the physical object's unique identifier (for instance, its serial or MAC address), and are therefore already defined and unique _before_ hitting the system.  The consistency challenge is less of a problem in this case, since the likelihood of concurrent incoming requests that would register processes with the same Key is extremely low and, in most cases, acceptable.
+
+In addition, write speeds were a determining factor in the architecture of Syn.
+
 Therefore, Availability has been chosen over Consistency and Syn is [eventually consistent](http://en.wikipedia.org/wiki/Eventual_consistency).
 
 ## Setup
@@ -89,7 +89,7 @@ Ensure that `syn` is started with your application, for example by adding it in
     ]},
     %% ...
 ]}.
-``` 
+```
 
 ## API
 
@@ -353,12 +353,12 @@ config :syn,
   event_handler: MyCustomEventHandler
 ```
 
-In your module you then need to specify the behavior and the callbacks. Both callbacks are _optional_, so you just need to define the ones you need.
+In your module you then need to specify the behavior and the callbacks. All callbacks are _optional_, so you just need to define the ones you need.
 
 ```elixir
 defmodule MyCustomEventHandler do
   @behaviour :syn_event_handler
-  
+
   @impl true
   @spec on_process_exit(
     name :: any(),
@@ -368,7 +368,17 @@ defmodule MyCustomEventHandler do
   ) :: any()
   def on_process_exit(name, pid, meta, reason) do
   end
-  
+
+  @impl true
+  @spec on_group_process_exit(
+    group_name :: any(),
+    pid :: pid(),
+    meta :: any(),
+    reason :: any()
+  ) :: any()
+  def on_group_process_exit(group_name, pid, meta, reason) do
+  end
+
   @impl true
   @spec resolve_registry_conflict(
     name :: any(),
@@ -391,30 +401,40 @@ In `sys.config` you can specify your callback module:
 ]}
 ```
 
-In your module you then need to specify the behavior and the callbacks. Both callbacks are _optional_, so you just need to define the ones you need.
+In your module you then need to specify the behavior and the callbacks. All callbacks are _optional_, so you just need to define the ones you need.
 
 ```erlang
--module(my_custom_event_handler).  
--behaviour(syn_event_handler).  
-
--export([on_process_exit/4]).  
--export([resolve_registry_conflict/3]).  
-  
--spec on_process_exit(  
-    Name :: any(),  
-    Pid :: pid(),  
-    Meta :: any(),  
-    Reason :: any()  
-) -> any().  
-on_process_exit(Name, Pid, Meta, Reason) ->  
-    ok.  
-  
--spec resolve_registry_conflict(  
-    Name :: any(),  
-    {Pid1 :: pid(), Meta1 :: any()},  
-    {Pid2 :: pid(), Meta2 :: any()}  
-) -> PidToKeep :: pid().  
-resolve_registry_conflict(Name, {Pid1, Meta1}, {Pid2, Meta2}) ->  
+-module(my_custom_event_handler).
+-behaviour(syn_event_handler).
+
+-export([on_process_exit/4]).
+-export([on_group_process_exit/4]).
+-export([resolve_registry_conflict/3]).
+
+-spec on_process_exit(
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Reason :: any()
+) -> any().
+on_process_exit(Name, Pid, Meta, Reason) ->
+    ok.
+
+-spec on_group_process_exit(
+    GroupName :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Reason :: any()
+) -> any().
+on_group_process_exit(GroupName, Pid, Meta, Reason) ->
+    ok.
+
+-spec resolve_registry_conflict(
+    Name :: any(),
+    {Pid1 :: pid(), Meta1 :: any()},
+    {Pid2 :: pid(), Meta2 :: any()}
+) -> PidToKeep :: pid().
+resolve_registry_conflict(Name, {Pid1, Meta1}, {Pid2, Meta2}) ->
     Pid1.
 ```
 
@@ -423,7 +443,10 @@ See details about the callback methods here below.
 ### Callback methods
 
 #### `on_process_exit/4`
-Called when a registered process exits. It will be called only on the node where the process was running. If a process was registered under multiple names, this callback will be called multiple times.
+Called when a registered process exits. It will be called only on the node where the process was running. If a process was registered under _n_ names, this callback will be called _n_ times (1 per registered name).
+
+#### `on_group_process_exit/4`
+Called when a process in a group exits. It will be called only on the node where the process was running. If a process was part of _n_ groups, this callback will be called _n_ times (1 per joined group).
 
 #### `resolve_registry_conflict/3`
 In case of net splits, a specific Name might get registered simultaneously on two different nodes. In this case, the cluster experiences a registry naming conflict.

+ 32 - 2
src/syn_event_handler.erl

@@ -29,6 +29,7 @@
 -module(syn_event_handler).
 
 -export([do_on_process_exit/5]).
+-export([do_on_group_process_exit/5]).
 -export([do_resolve_registry_conflict/4]).
 
 -callback on_process_exit(
@@ -38,13 +39,20 @@
     Reason :: any()
 ) -> any().
 
+-callback on_group_process_exit(
+    GroupName :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Reason :: any()
+) -> any().
+
 -callback resolve_registry_conflict(
     Name :: any(),
     {Pid1 :: pid(), Meta1 :: any()},
     {Pid2 :: pid(), Meta2 :: any()}
 ) -> PidToKeep :: pid() | undefined.
 
--optional_callbacks([on_process_exit/4, resolve_registry_conflict/3]).
+-optional_callbacks([on_process_exit/4, on_group_process_exit/4, resolve_registry_conflict/3]).
 
 %% ===================================================================
 %% API
@@ -58,7 +66,29 @@
 ) -> any().
 do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler) ->
     spawn(fun() ->
-        CustomEventHandler:on_process_exit(Name, Pid, Meta, Reason)
+        case erlang:function_exported(CustomEventHandler, on_process_exit, 4) of
+            true ->
+                CustomEventHandler:on_process_exit(Name, Pid, Meta, Reason);
+            _ ->
+                ok
+        end
+    end).
+
+-spec do_on_group_process_exit(
+    GroupName :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Reason :: any(),
+    CustomEventHandler :: module()
+) -> any().
+do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler) ->
+    spawn(fun() ->
+        case erlang:function_exported(CustomEventHandler, on_group_process_exit, 4) of
+            true ->
+                CustomEventHandler:on_group_process_exit(GroupName, Pid, Meta, Reason);
+            _ ->
+                ok
+        end
     end).
 
 -spec do_resolve_registry_conflict(

+ 27 - 25
src/syn_groups.erl

@@ -49,9 +49,12 @@
 -export([multi_call_and_receive/4]).
 
 %% records
--record(state, {}).
+-record(state, {
+    custom_event_handler = undefined :: module()
+}).
 
 %% macros
+-define(DEFAULT_EVENT_HANDLER_MODULE, syn_event_handler).
 -define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
 
 %% includes
@@ -214,8 +217,14 @@ init([]) ->
         ok ->
             %% monitor nodes
             ok = net_kernel:monitor_nodes(true),
+            %% get handler
+            CustomEventHandler = application:get_env(syn, event_handler, ?DEFAULT_EVENT_HANDLER_MODULE),
+            %% ensure that is it loaded (not using code:ensure_loaded/1 to support embedded mode)
+            catch CustomEventHandler:module_info(exports),
             %% init
-            {ok, #state{}};
+            {ok, #state{
+                custom_event_handler = CustomEventHandler
+            }};
         Reason ->
             {stop, {error_waiting_for_groups_table, Reason}}
     end.
@@ -295,15 +304,16 @@ handle_cast(Msg, State) ->
 handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
     case find_processes_entry_by_pid(Pid) of
         [] ->
-            %% log
-            log_process_exit(undefined, Pid, Reason);
+            %% handle
+            handle_process_down(undefined, Pid, undefined, Reason, State);
 
         Entries ->
             lists:foreach(fun(Entry) ->
                 %% get process info
                 GroupName = Entry#syn_groups_table.name,
-                %% log
-                log_process_exit(GroupName, Pid, Reason),
+                Meta = Entry#syn_groups_table.meta,
+                %% handle
+                handle_process_down(GroupName, Pid, Meta, Reason, State),
                 %% remove from table
                 remove_from_local_table(Entry),
                 %% multicast
@@ -449,26 +459,18 @@ find_process_entry_by_name_and_pid(GroupName, Pid) ->
         [] -> undefined
     end.
 
--spec log_process_exit(Name :: any(), Pid :: pid(), Reason :: any()) -> ok.
-log_process_exit(GroupName, Pid, Reason) ->
-    case Reason of
-        normal -> ok;
-        shutdown -> ok;
-        {shutdown, _} -> ok;
-        killed -> ok;
+-spec handle_process_down(GroupName :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
+handle_process_down(GroupName, Pid, Meta, Reason, #state{
+    custom_event_handler = CustomEventHandler
+}) ->
+    case GroupName of
+        undefined ->
+            error_logger:warning_msg(
+                "Syn(~p): Received a DOWN message from an unmonitored group process ~p with reason: ~p~n",
+                [node(), Pid, Reason]
+            );
         _ ->
-            case GroupName of
-                undefined ->
-                    error_logger:error_msg(
-                        "Syn(~p): Received a DOWN message from an unmonitored process ~p with reason: ~p~n",
-                        [node(), Pid, Reason]
-                    );
-                _ ->
-                    error_logger:error_msg(
-                        "Syn(~p): Process in group ~p and pid ~p exited with reason: ~p~n",
-                        [node(), GroupName, Pid, Reason]
-                    )
-            end
+            syn_event_handler:do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler)
     end.
 
 -spec sync_group_tuples(RemoteNode :: node(), GroupTuples :: [syn_registry_tuple()]) -> ok.

+ 3 - 8
src/syn_registry.erl

@@ -244,7 +244,7 @@ handle_cast(Msg, State) ->
 handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
     case find_processes_entry_by_pid(Pid) of
         [] ->
-            %% log
+            %% handle
             handle_process_down(undefined, Pid, undefined, Reason, State);
 
         Entries ->
@@ -253,7 +253,7 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
                 Name = Entry#syn_registry_table.name,
                 Pid = Entry#syn_registry_table.pid,
                 Meta = Entry#syn_registry_table.meta,
-                %% log
+                %% handle
                 handle_process_down(Name, Pid, Meta, Reason, State),
                 %% remove from table
                 remove_from_local_table(Name),
@@ -388,12 +388,7 @@ handle_process_down(Name, Pid, Meta, Reason, #state{
                 [node(), Pid, Reason]
             );
         _ ->
-            case erlang:function_exported(CustomEventHandler, on_process_exit, 4) of
-                true ->
-                    syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler);
-                _ ->
-                    ok
-            end
+            syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler)
     end.
 
 -spec sync_registry_tuples(RemoteNode :: node(), RegistryTuples :: [syn_registry_tuple()], #state{}) -> ok.

+ 45 - 2
test/syn_groups_SUITE.erl

@@ -38,7 +38,8 @@
     single_node_join_errors/1,
     single_node_publish/1,
     single_node_multicall/1,
-    single_node_multicall_with_custom_timeout/1
+    single_node_multicall_with_custom_timeout/1,
+    single_node_callback_on_process_exit/1
 ]).
 -export([
     two_nodes_join_monitor_and_unregister/1,
@@ -93,7 +94,8 @@ groups() ->
             single_node_join_errors,
             single_node_publish,
             single_node_multicall,
-            single_node_multicall_with_custom_timeout
+            single_node_multicall_with_custom_timeout,
+            single_node_callback_on_process_exit
         ]},
         {two_nodes_groups, [shuffle], [
             two_nodes_join_monitor_and_unregister,
@@ -391,6 +393,47 @@ single_node_multicall_with_custom_timeout(_Config) ->
     syn_test_suite_helper:kill_process(PidTakesLong),
     syn_test_suite_helper:kill_process(PidUnresponsive).
 
+single_node_callback_on_process_exit(_Config) ->
+    %% use custom handler
+    syn_test_suite_helper:use_custom_handler(),
+    %% start
+    ok = syn:start(),
+    %% start processes
+    Pid = syn_test_suite_helper:start_process(),
+    Pid2 = syn_test_suite_helper:start_process(),
+    %% join
+    TestPid = self(),
+    ok = syn:join(group_1, Pid, {pid_group_1, TestPid}),
+    ok = syn:join(group_2, Pid, {pid_group_2, TestPid}),
+    ok = syn:join(group_1, Pid2, {pid2, TestPid}),
+    %% kill 1
+    syn_test_suite_helper:kill_process(Pid),
+    receive
+        {received_event_on, pid_group_1} ->
+            ok;
+        {received_event_on, pid2} ->
+            ok = callback_on_process_exit_was_received_by_pid2
+    after 1000 ->
+        ok = callback_on_process_exit_was_not_received_by_pid
+    end,
+    receive
+        {received_event_on, pid_group_2} ->
+            ok;
+        {received_event_on, pid2} ->
+            ok = callback_on_process_exit_was_received_by_pid2
+    after 1000 ->
+        ok = callback_on_process_exit_was_not_received_by_pid
+    end,
+    %% unregister & kill 2
+    ok = syn:leave(group_1, Pid2),
+    syn_test_suite_helper:kill_process(Pid2),
+    receive
+        {received_event_on, pid2} ->
+            ok = callback_on_process_exit_was_received_by_pid2
+    after 1000 ->
+        ok
+    end.
+
 two_nodes_join_monitor_and_unregister(Config) ->
     GroupName = "my group",
     %% get slave

+ 12 - 0
test/syn_test_event_handler.erl

@@ -28,6 +28,7 @@
 
 %% API
 -export([on_process_exit/4]).
+-export([on_group_process_exit/4]).
 -export([resolve_registry_conflict/3]).
 
 %% ===================================================================
@@ -44,6 +45,17 @@ on_process_exit(_Name, _Pid, {PidId, TestPid}, _Reason) when is_pid(TestPid) ->
 on_process_exit(_Name, _Pid, _Meta, _Reason) ->
     ok.
 
+-spec on_group_process_exit(
+    GroupName :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Reason :: any()
+) -> any().
+on_group_process_exit(_GroupName, _Pid, {PidId, TestPid}, _Reason) when is_pid(TestPid) ->
+    TestPid ! {received_event_on, PidId};
+on_group_process_exit(_GroupName, _Pid, _Meta, _Reason) ->
+    ok.
+
 -spec resolve_registry_conflict(
     Name :: any(),
     {Pid1 :: pid(), Meta1 :: any()},