|
@@ -33,7 +33,7 @@
|
|
|
-export([member/2]).
|
|
|
-export([get_members/1]).
|
|
|
-export([publish/2]).
|
|
|
--export([multi_call/2]).
|
|
|
+-export([multi_call/2, multi_call/3]).
|
|
|
-export([multi_call_reply/2]).
|
|
|
|
|
|
%% gen_server callbacks
|
|
@@ -43,7 +43,7 @@
|
|
|
-record(state, {}).
|
|
|
|
|
|
%% macros
|
|
|
--define(MULTI_CALL_TIMEOUT_MS, 5000).
|
|
|
+-define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
|
|
|
|
|
|
%% include
|
|
|
-include("syn.hrl").
|
|
@@ -84,16 +84,20 @@ publish(Name, Message) ->
|
|
|
lists:foreach(FSend, MemberPids),
|
|
|
{ok, length(MemberPids)}.
|
|
|
|
|
|
--spec multi_call(Name :: any(), Message :: any()) ->
|
|
|
- {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
|
|
|
+-spec multi_call(Name :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
|
|
|
multi_call(Name, Message) ->
|
|
|
+ multi_call(Name, Message, ?DEFAULT_MULTI_CALL_TIMEOUT_MS).
|
|
|
+
|
|
|
+-spec multi_call(Name :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
|
|
|
+ {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
|
|
|
+multi_call(Name, Message, Timeout) ->
|
|
|
Self = self(),
|
|
|
MemberPids = i_get_members(Name),
|
|
|
FSend = fun(Pid) ->
|
|
|
Pid ! {syn_multi_call, Self, Message}
|
|
|
end,
|
|
|
lists:foreach(FSend, MemberPids),
|
|
|
- collect_replies(MemberPids).
|
|
|
+ collect_replies(MemberPids, Timeout).
|
|
|
|
|
|
-spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> ok.
|
|
|
multi_call_reply(CallerPid, Reply) ->
|
|
@@ -274,19 +278,19 @@ i_find_by_pid(Pid) ->
|
|
|
remove_process(Process) ->
|
|
|
mnesia:dirty_delete_object(syn_groups_table, Process).
|
|
|
|
|
|
--spec collect_replies(MemberPids :: [pid()]) ->
|
|
|
+-spec collect_replies(MemberPids :: [pid()], Timeout :: non_neg_integer()) ->
|
|
|
{[{pid(), Reply :: any()}], [BadPid :: pid()]}.
|
|
|
-collect_replies(MemberPids) ->
|
|
|
- collect_replies(MemberPids, []).
|
|
|
+collect_replies(MemberPids, Timeout) ->
|
|
|
+ collect_replies(MemberPids, Timeout, []).
|
|
|
|
|
|
--spec collect_replies(MemberPids :: [pid()], Replies :: [{pid(), Reply :: any()}]) ->
|
|
|
+-spec collect_replies(MemberPids :: [pid()], Timeout :: non_neg_integer(), Replies :: [{pid(), Reply :: any()}]) ->
|
|
|
{[{pid(), Reply :: any()}], [BadPid :: pid()]}.
|
|
|
-collect_replies([], Replies) -> {Replies, []};
|
|
|
-collect_replies(MemberPids, Replies) ->
|
|
|
+collect_replies([], _Timeout, Replies) -> {Replies, []};
|
|
|
+collect_replies(MemberPids, Timeout, Replies) ->
|
|
|
receive
|
|
|
{syn_multi_call_reply, Pid, Reply} ->
|
|
|
MemberPids1 = lists:delete(Pid, MemberPids),
|
|
|
- collect_replies(MemberPids1, [{Pid, Reply} | Replies])
|
|
|
- after ?MULTI_CALL_TIMEOUT_MS ->
|
|
|
+ collect_replies(MemberPids1, Timeout, [{Pid, Reply} | Replies])
|
|
|
+ after Timeout ->
|
|
|
{Replies, MemberPids}
|
|
|
end.
|