|
@@ -254,7 +254,7 @@ multi_call(Scope, GroupName, Message, Timeout) ->
|
|
|
lists:foreach(fun({Pid, Meta}) ->
|
|
|
spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Meta, Message, Timeout])
|
|
|
end, Members),
|
|
|
- collect_replies(Members).
|
|
|
+ collect_replies(orddict:from_list(Members)).
|
|
|
|
|
|
-spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
|
|
|
multi_call_reply(CallerPid, Reply) ->
|
|
@@ -424,26 +424,26 @@ rebuild_monitors(#state{
|
|
|
|
|
|
-spec do_rebuild_monitors([syn_groups_tuple()], #{pid() => reference()}, #state{}) -> ok.
|
|
|
do_rebuild_monitors([], _, _) -> ok;
|
|
|
-do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMonitorRefs, #state{
|
|
|
+do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, #state{
|
|
|
table_by_name = TableByName,
|
|
|
table_by_pid = TableByPid
|
|
|
} = State) ->
|
|
|
remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
|
|
|
case is_process_alive(Pid) of
|
|
|
true ->
|
|
|
- case maps:find(Pid, NewMonitorRefs) of
|
|
|
+ case maps:find(Pid, NewMRefs) of
|
|
|
error ->
|
|
|
MRef = erlang:monitor(process, Pid),
|
|
|
add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
|
- do_rebuild_monitors(T, maps:put(Pid, MRef, NewMonitorRefs), State);
|
|
|
+ do_rebuild_monitors(T, maps:put(Pid, MRef, NewMRefs), State);
|
|
|
|
|
|
{ok, MRef} ->
|
|
|
add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
|
- do_rebuild_monitors(T, NewMonitorRefs, State)
|
|
|
+ do_rebuild_monitors(T, NewMRefs, State)
|
|
|
end;
|
|
|
|
|
|
_ ->
|
|
|
- do_rebuild_monitors(T, NewMonitorRefs, State)
|
|
|
+ do_rebuild_monitors(T, NewMRefs, State)
|
|
|
end.
|
|
|
|
|
|
-spec do_join_on_node(
|
|
@@ -614,7 +614,7 @@ handle_groups_sync(GroupName, Pid, Meta, Time, #state{
|
|
|
) -> any().
|
|
|
multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
|
|
|
%% monitor
|
|
|
- MonitorRef = monitor(process, Pid),
|
|
|
+ MRef = monitor(process, Pid),
|
|
|
%% send
|
|
|
Pid ! {syn_multi_call, Message, self(), Meta},
|
|
|
%% wait for reply
|
|
@@ -622,23 +622,23 @@ multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
|
|
|
{syn_multi_call_reply, Pid, Reply} ->
|
|
|
CollectorPid ! {reply, Pid, Reply};
|
|
|
|
|
|
- {'DOWN', MonitorRef, _, _, _} ->
|
|
|
+ {'DOWN', MRef, _, _, _} ->
|
|
|
CollectorPid ! {bad_pid, Pid}
|
|
|
|
|
|
after Timeout ->
|
|
|
CollectorPid ! {bad_pid, Pid}
|
|
|
end.
|
|
|
|
|
|
--spec collect_replies(Members :: [{pid(), Meta :: any()}]) ->
|
|
|
+-spec collect_replies(MembersOD :: orddict:orddict({pid(), Meta :: any()})) ->
|
|
|
{
|
|
|
Replies :: [{{pid(), Meta :: term()}, Reply :: any()}],
|
|
|
BadReplies :: [{pid(), Meta :: term()}]
|
|
|
}.
|
|
|
-collect_replies(Members) ->
|
|
|
- collect_replies(Members, [], []).
|
|
|
+collect_replies(MembersOD) ->
|
|
|
+ collect_replies(MembersOD, [], []).
|
|
|
|
|
|
-spec collect_replies(
|
|
|
- MemberPids :: [{pid(), Meta :: any()}],
|
|
|
+ MembersOD :: orddict:orddict({pid(), Meta :: any()}),
|
|
|
Replies :: [{{pid(), Meta :: term()}, Reply :: any()}],
|
|
|
BadReplies :: [{pid(), Meta :: term()}]
|
|
|
) ->
|
|
@@ -647,13 +647,13 @@ collect_replies(Members) ->
|
|
|
BadReplies :: [{pid(), Meta :: term()}]
|
|
|
}.
|
|
|
collect_replies([], Replies, BadReplies) -> {Replies, BadReplies};
|
|
|
-collect_replies(Members, Replies, BadReplies) ->
|
|
|
+collect_replies(MembersOD, Replies, BadReplies) ->
|
|
|
receive
|
|
|
{reply, Pid, Reply} ->
|
|
|
- {value, {Pid, Meta}, Members1} = lists:keytake(Pid, 1, Members),
|
|
|
- collect_replies(Members1, [{{Pid, Meta}, Reply} | Replies], BadReplies);
|
|
|
+ {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
|
|
|
+ collect_replies(MembersOD1, [{{Pid, Meta}, Reply} | Replies], BadReplies);
|
|
|
|
|
|
{bad_pid, Pid} ->
|
|
|
- {value, {Pid, Meta}, Members1} = lists:keytake(Pid, 1, Members),
|
|
|
- collect_replies(Members1, Replies, [{Pid, Meta} | BadReplies])
|
|
|
+ {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
|
|
|
+ collect_replies(MembersOD1, Replies, [{Pid, Meta} | BadReplies])
|
|
|
end.
|