Roberto Ostinelli 3 лет назад
Родитель
Сommit
fc66b1402a
3 измененных файлов с 26 добавлено и 14 удалено
  1. 2 1
      src/syn.hrl
  2. 21 10
      src/syn_groups.erl
  3. 3 3
      src/syn_registry.erl

+ 2 - 1
src/syn.hrl

@@ -63,7 +63,8 @@
 -type syn_groups_tuple() :: {
     GroupName :: any(),
     Pid :: pid(),
-    Meta :: any()
+    Meta :: any(),
+    Time :: non_neg_integer()
 }.
 
 %% records

+ 21 - 10
src/syn_groups.erl

@@ -422,7 +422,7 @@ rebuild_monitors(#state{
     GroupsTuples = get_groups_tuples_for_node(node(), TableByName),
     do_rebuild_monitors(GroupsTuples, #{}, State).
 
--spec do_rebuild_monitors([syn_groups_tuple()], [reference()], #state{}) -> ok.
+-spec do_rebuild_monitors([syn_groups_tuple()], #{pid() => reference()}, #state{}) -> ok.
 do_rebuild_monitors([], _, _) -> ok;
 do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMonitorRefs, #state{
     table_by_name = TableByName,
@@ -457,12 +457,12 @@ do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMonitorRefs, #state{
 ) ->
     {
         reply,
-        {
+        {ok, {
             CallbackMethod :: atom(),
             Time :: non_neg_integer(),
             TableByName :: atom(),
             TableByPid :: atom()
-        },
+        }},
         #state{}
     }.
 do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, CallbackMethod, #state{
@@ -629,20 +629,31 @@ multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
         CollectorPid ! {bad_pid, Pid}
     end.
 
--spec collect_replies(Members :: [{pid(), Meta :: any()}]) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+-spec collect_replies(Members :: [{pid(), Meta :: any()}]) ->
+    {
+        Replies :: [{{pid(), Meta :: term()}, Reply :: any()}],
+        BadReplies :: [{pid(), Meta :: term()}]
+    }.
 collect_replies(Members) ->
     collect_replies(Members, [], []).
 
--spec collect_replies(MemberPids :: [{pid(), Meta :: any()}], [{pid(), Reply :: any()}], [pid()]) ->
-    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
-collect_replies([], Replies, BadPids) -> {Replies, BadPids};
-collect_replies(Members, Replies, BadPids) ->
+-spec collect_replies(
+    MemberPids :: [{pid(), Meta :: any()}],
+    Replies :: [{{pid(), Meta :: term()}, Reply :: any()}],
+    BadReplies :: [{pid(), Meta :: term()}]
+) ->
+    {
+        Replies :: [{{pid(), Meta :: term()}, Reply :: any()}],
+        BadReplies :: [{pid(), Meta :: term()}]
+    }.
+collect_replies([], Replies, BadReplies) -> {Replies, BadReplies};
+collect_replies(Members, Replies, BadReplies) ->
     receive
         {reply, Pid, Reply} ->
             {value, {Pid, Meta}, Members1} = lists:keytake(Pid, 1, Members),
-            collect_replies(Members1, [{{Pid, Meta}, Reply} | Replies], BadPids);
+            collect_replies(Members1, [{{Pid, Meta}, Reply} | Replies], BadReplies);
 
         {bad_pid, Pid} ->
             {value, {Pid, Meta}, Members1} = lists:keytake(Pid, 1, Members),
-            collect_replies(Members1, Replies, [{Pid, Meta} | BadPids])
+            collect_replies(Members1, Replies, [{Pid, Meta} | BadReplies])
     end.

+ 3 - 3
src/syn_registry.erl

@@ -331,7 +331,7 @@ rebuild_monitors(#state{
     RegistryTuples = get_registry_tuples_for_node(node(), TableByName),
     do_rebuild_monitors(RegistryTuples, #{}, State).
 
--spec do_rebuild_monitors([syn_registry_tuple()], [reference()], #state{}) -> ok.
+-spec do_rebuild_monitors([syn_registry_tuple()], #{pid() => reference()}, #state{}) -> ok.
 do_rebuild_monitors([], _, _) -> ok;
 do_rebuild_monitors([{Name, Pid, Meta, Time} | T], NewMonitorRefs, #state{
     table_by_name = TableByName,
@@ -366,12 +366,12 @@ do_rebuild_monitors([{Name, Pid, Meta, Time} | T], NewMonitorRefs, #state{
 ) ->
     {
         reply,
-        {
+        {ok, {
             CallbackMethod :: atom(),
             Time :: non_neg_integer(),
             TableByName :: atom(),
             TableByPid :: atom()
-        },
+        }},
         #state{}
     }.
 do_register_on_node(Name, Pid, Meta, MRef, RequesterNode, CallbackMethod, #state{