Roberto Ostinelli 3 лет назад
Родитель
Сommit
26dec88f52
3 измененных файлов с 111 добавлено и 83 удалено
  1. 6 6
      src/syn_backbone.erl
  2. 57 47
      src/syn_groups.erl
  3. 48 30
      src/syn_registry.erl

+ 6 - 6
src/syn_backbone.erl

@@ -99,10 +99,10 @@ init([]) ->
     {stop, Reason :: any(), State :: map()}.
 handle_call({create_tables_for_scope, Scope}, _From, State) ->
     error_logger:info_msg("SYN[~s] Creating tables for scope '~s'", [?MODULE, Scope]),
-    ensure_table_exists(set, syn_registry_by_name, Scope),
-    ensure_table_exists(bag, syn_registry_by_pid, Scope),
-    ensure_table_exists(ordered_set, syn_groups_by_name, Scope),
-    ensure_table_exists(ordered_set, syn_groups_by_pid, Scope),
+    ensure_table_existence(set, syn_registry_by_name, Scope),
+    ensure_table_existence(bag, syn_registry_by_pid, Scope),
+    ensure_table_existence(ordered_set, syn_groups_by_name, Scope),
+    ensure_table_existence(ordered_set, syn_groups_by_pid, Scope),
     {reply, ok, State};
 
 handle_call(Request, From, State) ->
@@ -150,8 +150,8 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec ensure_table_exists(Type :: ets:type(), TableId :: atom(), Scope :: atom()) -> ok.
-ensure_table_exists(Type, TableId, Scope) ->
+-spec ensure_table_existence(Type :: ets:type(), TableId :: atom(), Scope :: atom()) -> ok.
+ensure_table_existence(Type, TableId, Scope) ->
     %% build name
     TableIdBin = atom_to_binary(TableId),
     ScopeBin = atom_to_binary(Scope),

+ 57 - 47
src/syn_groups.erl

@@ -142,7 +142,7 @@ join(Scope, GroupName, Pid) when is_pid(Pid) ->
 -spec join(Scope :: atom(), GroupName :: term(), Pid :: pid(), Meta :: term()) -> ok.
 join(Scope, GroupName, Pid, Meta) ->
     Node = node(Pid),
-    case syn_gen_scope:call(?MODULE, Node, Scope, {join_on_owner, node(), GroupName, Pid, Meta}) of
+    case syn_gen_scope:call(?MODULE, Node, Scope, {join_on_node, node(), GroupName, Pid, Meta}) of
         {ok, {CallbackMethod, Time, TableByName, TableByPid}} when Node =/= node() ->
             %% update table on caller node immediately so that subsequent calls have an updated registry
             add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
@@ -167,7 +167,7 @@ leave(Scope, GroupName, Pid) ->
 
         TableByName ->
             Node = node(Pid),
-            case syn_gen_scope:call(?MODULE, Node, Scope, {leave_on_owner, node(), GroupName, Pid}) of
+            case syn_gen_scope:call(?MODULE, Node, Scope, {leave_on_node, node(), GroupName, Pid}) of
                 {ok, {Meta, TableByPid}} when Node =/= node() ->
                     %% remove table on caller node immediately so that subsequent calls have an updated registry
                     remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
@@ -183,29 +183,21 @@ leave(Scope, GroupName, Pid) ->
 
 -spec count(Scope :: atom()) -> non_neg_integer().
 count(Scope) ->
-    case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
-        undefined ->
-            error({invalid_scope, Scope});
-
-        TableByName ->
-            Entries = ets:select(TableByName, [{
-                {{'$1', '_'}, '_', '_', '_', '_'},
-                [],
-                ['$1']
-            }]),
-            Set = sets:from_list(Entries),
-            sets:size(Set)
-    end.
+    do_count(Scope, '_').
 
 -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
 count(Scope, Node) ->
+    do_count(Scope, Node).
+
+-spec do_count(Scope :: atom(), NodeParam :: atom()) -> non_neg_integer().
+do_count(Scope, NodeParam) ->
     case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
         undefined ->
             error({invalid_scope, Scope});
 
         TableByName ->
             Entries = ets:select(TableByName, [{
-                {{'$1', '_'}, '_', '_', '_', Node},
+                {{'$1', '_'}, '_', '_', '_', NodeParam},
                 [],
                 ['$1']
             }]),
@@ -220,10 +212,7 @@ publish(GroupName, Message) ->
 -spec publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
 publish(Scope, GroupName, Message) ->
     Members = members(Scope, GroupName),
-    lists:foreach(fun({Pid, _Meta}) ->
-        Pid ! Message
-    end, Members),
-    {ok, length(Members)}.
+    do_publish(Members, Message).
 
 -spec local_publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
 local_publish(GroupName, Message) ->
@@ -232,6 +221,11 @@ local_publish(GroupName, Message) ->
 -spec local_publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
 local_publish(Scope, GroupName, Message) ->
     Members = local_members(Scope, GroupName),
+    do_publish(Members, Message).
+
+-spec do_publish(Members :: [{Pid :: pid(), Meta :: term()}], Message :: any()) ->
+    {ok, RecipientCount :: non_neg_integer()}.
+do_publish(Members, Message) ->
     lists:foreach(fun({Pid, _Meta}) ->
         Pid ! Message
     end, Members),
@@ -262,8 +256,7 @@ init(State) ->
     {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
     {stop, Reason :: term(), Reply :: term(), #state{}} |
     {stop, Reason :: term(), #state{}}.
-handle_call({join_on_owner, RequesterNode, GroupName, Pid, Meta}, _From, #state{
-    scope = Scope,
+handle_call({join_on_node, RequesterNode, GroupName, Pid, Meta}, _From, #state{
     table_by_name = TableByName,
     table_by_pid = TableByPid
 } = State) ->
@@ -276,38 +269,21 @@ handle_call({join_on_owner, RequesterNode, GroupName, Pid, Meta}, _From, #state{
                         undefined -> erlang:monitor(process, Pid);  %% process is not monitored yet, create
                         MRef0 -> MRef0
                     end,
-                    Time = erlang:system_time(),
-                    %% add to local table
-                    add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
-                    %% callback
-                    syn_event_handler:call_event_handler(on_process_joined, [Scope, GroupName, Pid, Meta]),
-                    %% broadcast
-                    syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time}, [RequesterNode], State),
-                    %% return
-                    {reply, {ok, {on_process_joined, Time, TableByName, TableByPid}}, State};
+                    do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, on_process_joined, State);
 
                 {{_, Meta}, _, _, _, _} ->
                     %% re-joined with same meta
                     {ok, noop};
 
                 {{_, _}, _, _, MRef, _} ->
-                    %% meta updated
-                    Time = erlang:system_time(),
-                    %% add to local table
-                    add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
-                    %% callback
-                    syn_event_handler:call_event_handler(on_group_process_updated, [Scope, GroupName, Pid, Meta]),
-                    %% broadcast
-                    syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time}, [RequesterNode], State),
-                    %% return
-                    {reply, {ok, {on_group_process_updated, Time, TableByName, TableByPid}}, State}
+                    do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, on_group_process_updated, State)
             end;
 
         false ->
             {reply, {{error, not_alive}, undefined}, State}
     end;
 
-handle_call({leave_on_owner, RequesterNode, GroupName, Pid}, _From, #state{
+handle_call({leave_on_node, RequesterNode, GroupName, Pid}, _From, #state{
     scope = Scope,
     table_by_name = TableByName,
     table_by_pid = TableByPid
@@ -386,7 +362,7 @@ handle_info(Info, #state{scope = Scope} = State) ->
     {noreply, State}.
 
 %% ----------------------------------------------------------------------------------------------------------
-%% Data
+%% Data callbacks
 %% ----------------------------------------------------------------------------------------------------------
 -spec get_local_data(State :: term()) -> {ok, Data :: any()} | undefined.
 get_local_data(#state{table_by_name = TableByName}) ->
@@ -441,6 +417,40 @@ do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMonitorRefs, #state{
             do_rebuild_monitors(T, NewMonitorRefs, State)
     end.
 
+-spec do_join_on_node(
+    GroupName :: term(),
+    Pid :: pid(),
+    Meta :: term(),
+    MRef :: reference() | undefined,
+    RequesterNode :: node(),
+    CallbackMethod :: atom(),
+    #state{}
+) ->
+    {
+        reply,
+        {
+            CallbackMethod :: atom(),
+            Time :: non_neg_integer(),
+            TableByName :: atom(),
+            TableByPid :: atom()
+        },
+        #state{}
+    }.
+do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, CallbackMethod, #state{
+    scope = Scope,
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+} = State) ->
+    Time = erlang:system_time(),
+    %% add to local table
+    add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
+    %% callback
+    syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta]),
+    %% broadcast
+    syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time}, [RequesterNode], State),
+    %% return
+    {reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
+
 -spec get_groups_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_groups_tuple()].
 get_groups_tuples_for_node(Node, TableByName) ->
     ets:select(TableByName, [{
@@ -452,7 +462,7 @@ get_groups_tuples_for_node(Node, TableByName) ->
 -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
 find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
     %% we use select instead of lookup to limit the results and thus cover the case
-    %% when a process is registered with a considerable amount of names
+    %% when a process is in multiple groups
     case ets:select(TableByPid, [{
         {{Pid, '_'}, '_', '_', '$5', '_'},
         [],
@@ -470,7 +480,7 @@ find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
         [Entry] -> Entry
     end.
 
--spec find_groups_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupTuples :: [syn_groups_tuple()].
+-spec find_groups_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupEntries :: [syn_groups_entry()].
 find_groups_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
     ets:select(TableByPid, [{
         {{Pid, '_'}, '_', '_', '_', '_'},
@@ -482,7 +492,7 @@ find_groups_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
 maybe_demonitor(Pid, TableByPid) ->
     %% select 2: if only 1 is returned it means that no other aliases exist for the Pid
     %% we use select instead of lookup to limit the results and thus cover the case
-    %% when a process is registered with a considerable amount of names
+    %% when a process is in multiple groups
     case ets:select(TableByPid, [{
         {{Pid, '_'}, '_', '_', '$5', '_'},
         [],
@@ -522,7 +532,7 @@ remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
     true = ets:delete(TableByPid, {Pid, GroupName}).
 
 -spec purge_groups_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
-purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid) ->
+purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
     %% loop elements for callback in a separate process to free scope process
     GroupsTuples = get_groups_tuples_for_node(Node, TableByName),
     spawn(fun() ->

+ 48 - 30
src/syn_registry.erl

@@ -95,7 +95,7 @@ register(Scope, Name, Pid) when is_pid(Pid) ->
 -spec register(Scope :: atom(), Name :: term(), Pid :: pid(), Meta :: term()) -> ok | {error, Reason :: term()}.
 register(Scope, Name, Pid, Meta) ->
     Node = node(Pid),
-    case syn_gen_scope:call(?MODULE, Node, Scope, {register_on_owner, node(), Name, Pid, Meta}) of
+    case syn_gen_scope:call(?MODULE, Node, Scope, {register_on_node, node(), Name, Pid, Meta}) of
         {ok, {CallbackMethod, Time, TableByName, TableByPid}} when Node =/= node() ->
             %% update table on caller node immediately so that subsequent calls have an updated registry
             add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
@@ -126,7 +126,7 @@ unregister(Scope, Name) ->
 
                 {Name, Pid, Meta, _, _, _} ->
                     Node = node(Pid),
-                    case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_owner, node(), Name, Pid}) of
+                    case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_node, node(), Name, Pid}) of
                         {ok, TableByPid} when Node =/= node() ->
                             %% remove table on caller node immediately so that subsequent calls have an updated registry
                             remove_from_local_table(Name, Pid, TableByName, TableByPid),
@@ -188,7 +188,7 @@ init(State) ->
     {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
     {stop, Reason :: term(), Reply :: term(), #state{}} |
     {stop, Reason :: term(), #state{}}.
-handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
+handle_call({register_on_node, RequesterNode, Name, Pid, Meta}, _From, #state{
     scope = Scope,
     table_by_name = TableByName,
     table_by_pid = TableByPid
@@ -202,30 +202,14 @@ handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
                         undefined -> erlang:monitor(process, Pid);  %% process is not monitored yet, add
                         MRef0 -> MRef0
                     end,
-                    %% add to local table
-                    Time = erlang:system_time(),
-                    add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
-                    %% callback
-                    syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]),
-                    %% broadcast
-                    syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time}, [RequesterNode], State),
-                    %% return
-                    {reply, {ok, {on_process_registered, Time, TableByName, TableByPid}}, State};
+                    do_register_on_node(Name, Pid, Meta, MRef, RequesterNode, on_process_registered, State);
 
                 {Name, Pid, Meta, _, _, _} ->
                     %% same pid, same meta
                     {reply, {ok, noop}, State};
 
                 {Name, Pid, _, _, MRef, _} ->
-                    %% same pid, new meta
-                    Time = erlang:system_time(),
-                    add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
-                    %% callback
-                    syn_event_handler:call_event_handler(on_registry_process_updated, [Scope, Name, Pid, Meta]),
-                    %% broadcast
-                    syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time}, State),
-                    %% return
-                    {reply, {ok, {on_registry_process_updated, Time, TableByName, TableByPid}}, State};
+                    do_register_on_node(Name, Pid, Meta, MRef, RequesterNode, on_registry_process_updated, State);
 
                 _ ->
                     {reply, {{error, taken}, undefined}, State}
@@ -235,13 +219,13 @@ handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
             {reply, {{error, not_alive}, undefined}, State}
     end;
 
-handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{
+handle_call({unregister_on_node, RequesterNode, Name, Pid}, _From, #state{
     scope = Scope,
     table_by_name = TableByName,
     table_by_pid = TableByPid
 } = State) ->
     case find_registry_entry_by_name(Name, TableByName) of
-        {Name, Pid, Meta, _Time, _MRef, _Node} ->
+        {Name, Pid, Meta, _, _, _} ->
             %% demonitor if the process is not registered under other names
             maybe_demonitor(Pid, TableByPid),
             %% remove from table
@@ -253,7 +237,7 @@ handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{
             %% return
             {reply, {ok, TableByPid}, State};
 
-        {Name, _TablePid, _Meta, _Time, _MRef, _Node} ->
+        {Name, _, _, _, _, _} ->
             %% process is registered locally with another pid: race condition, wait for sync to happen & return error
             {reply, {{error, race_condition}, undefined}, State};
 
@@ -300,7 +284,7 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
             );
 
         Entries ->
-            lists:foreach(fun({_Pid, Name, Meta, _, _, _}) ->
+            lists:foreach(fun({_, Name, Meta, _, _, _}) ->
                 %% remove from table
                 remove_from_local_table(Name, Pid, TableByName, TableByPid),
                 %% callback
@@ -317,7 +301,7 @@ handle_info(Info, #state{scope = Scope} = State) ->
     {noreply, State}.
 
 %% ----------------------------------------------------------------------------------------------------------
-%% Data
+%% Data callbacks
 %% ----------------------------------------------------------------------------------------------------------
 -spec get_local_data(#state{}) -> {ok, Data :: term()} | undefined.
 get_local_data(#state{table_by_name = TableByName}) ->
@@ -372,6 +356,40 @@ do_rebuild_monitors([{Name, Pid, Meta, Time} | T], NewMonitorRefs, #state{
             do_rebuild_monitors(T, NewMonitorRefs, State)
     end.
 
+-spec do_register_on_node(
+    Name :: term(),
+    Pid :: pid(),
+    Meta :: term(),
+    MRef :: reference() | undefined,
+    RequesterNode :: node(),
+    CallbackMethod :: atom(),
+    #state{}
+) ->
+    {
+        reply,
+        {
+            CallbackMethod :: atom(),
+            Time :: non_neg_integer(),
+            TableByName :: atom(),
+            TableByPid :: atom()
+        },
+        #state{}
+    }.
+do_register_on_node(Name, Pid, Meta, MRef, RequesterNode, CallbackMethod, #state{
+    scope = Scope,
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+} = State) ->
+    %% add to local table
+    Time = erlang:system_time(),
+    add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
+    %% callback
+    syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta]),
+    %% broadcast
+    syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time}, [RequesterNode], State),
+    %% return
+    {reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
+
 -spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
 get_registry_tuples_for_node(Node, TableByName) ->
     ets:select(TableByName, [{
@@ -499,7 +517,7 @@ handle_registry_sync(Name, Pid, Meta, Time, #state{
             %% callback
             syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]);
 
-        {Name, Pid, TableMeta, _TableTime, MRef, _TableNode} ->
+        {_, Pid, TableMeta, _, MRef, _} ->
             %% same pid, more recent (because it comes from the same node, which means that it's sequential)
             %% maybe updated meta or time only
             add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
@@ -509,7 +527,7 @@ handle_registry_sync(Name, Pid, Meta, Time, #state{
                 _ -> ok
             end;
 
-        {Name, TablePid, TableMeta, TableTime, TableMRef, _TableNode} when node(TablePid) =:= node() ->
+        {_, TablePid, TableMeta, TableTime, TableMRef, TableNode} when TableNode =:= node() ->
             %% current node runs a conflicting process -> resolve
             %% * the conflict is resolved by the two nodes that own the conflicting processes
             %% * when a process is chosen, the time is updated
@@ -517,14 +535,14 @@ handle_registry_sync(Name, Pid, Meta, Time, #state{
             %% * recipients check that the time is more recent that what they have to ensure that there are no race conditions
             resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State);
 
-        {Name, TablePid, TableMeta, TableTime, _TableMRef, _TableNode} when TableTime < Time ->
+        {_, TablePid, TableMeta, TableTime, _, _} when TableTime < Time ->
             %% current node does not own any of the conflicting processes, update
             update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
             %% callbacks
             syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, TablePid, TableMeta]),
             syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]);
 
-        {Name, _TablePid, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
+        {_, _, _, _, _, _} ->
             %% race condition: incoming data is older, ignore
             ok
     end.