syn_groups.erl 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_groups).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([join/2, join/3]).
  31. -export([leave/2]).
  32. -export([get_members/1, get_members/2]).
  33. -export([member/2]).
  34. -export([get_local_members/1, get_local_members/2]).
  35. -export([local_member/2]).
  36. -export([publish/2]).
  37. -export([publish_to_local/2]).
  38. -export([multi_call/2, multi_call/3, multi_call_reply/2]).
  39. %% sync API
  40. -export([sync_join/4, sync_leave/3]).
  41. -export([sync_get_local_group_tuples/1]).
  42. %% gen_server callbacks
  43. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  44. %% internal
  45. -export([multi_call_and_receive/4]).
  46. %% records
  %% gen_server state. The only field is the module handed to
  %% syn_event_handler when a monitored group process exits.
  -record(state, {
      custom_event_handler = undefined :: module()
  }).
  50. %% macros
  51. -define(DEFAULT_EVENT_HANDLER_MODULE, syn_event_handler).
  52. -define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
  53. %% includes
  54. -include("syn.hrl").
  55. %% ===================================================================
  56. %% API
  57. %% ===================================================================
  %% Starts the groups gen_server, registered locally under this module's name.
  %% No gen_server options are passed.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  %% Adds Pid to the group GroupName with `undefined' as metadata.
  %% Delegates to join/3, so the result is `ok' on success or
  %% `{error, not_alive}' when the server on Pid's node finds the process dead.
  -spec join(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  join(GroupName, Pid) ->
      join(GroupName, Pid, undefined).
  %% Adds Pid to the group GroupName, attaching Meta to the membership.
  %% The request is served by the syn_groups gen_server running on the node
  %% that owns Pid. Returns `ok', or `{error, not_alive}' when that server
  %% reports the process as not alive.
  -spec join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
  join(GroupName, Pid, Meta) when is_pid(Pid) ->
      Node = node(Pid),
      gen_server:call({?MODULE, Node}, {join_on_node, GroupName, Pid, Meta}).
  %% Removes Pid from the group GroupName.
  %% The local table is checked first: if no membership is found the call
  %% returns `{error, not_in_group}' without contacting any server; otherwise
  %% the leave is executed by the gen_server on the node that owns Pid.
  -spec leave(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  leave(GroupName, Pid) ->
      case find_process_entry_by_name_and_pid(GroupName, Pid) =:= undefined of
          true ->
              {error, not_in_group};
          false ->
              gen_server:call({?MODULE, node(Pid)}, {leave_on_node, GroupName, Pid})
      end.
  %% Returns the sorted list of pids stored under GroupName in the groups table.
  -spec get_members(Name :: any()) -> [pid()].
  get_members(GroupName) ->
      lists:sort(
          lists:map(
              fun(Entry) -> Entry#syn_groups_table.pid end,
              mnesia:dirty_read(syn_groups_table, GroupName)
          )
      ).
  %% Returns the sorted list of `{Pid, Meta}' pairs stored under GroupName
  %% in the groups table.
  -spec get_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
  get_members(GroupName, with_meta) ->
      lists:sort(
          lists:map(
              fun(Entry) -> {Entry#syn_groups_table.pid, Entry#syn_groups_table.meta} end,
              mnesia:dirty_read(syn_groups_table, GroupName)
          )
      ).
  %% Tells whether Pid currently has an entry for GroupName in the groups table.
  %% Note the argument order: Pid first, then GroupName.
  -spec member(Pid :: pid(), GroupName :: any()) -> boolean().
  member(Pid, GroupName) ->
      find_process_entry_by_name_and_pid(GroupName, Pid) =/= undefined.
  %% Returns the sorted list of pids that are members of GroupName and whose
  %% table entry is attributed to the local node.
  %%
  %% The group name is injected into the guard through `{const, GroupName}' so
  %% it is always treated as a literal term. Embedding it directly is only safe
  %% for some terms: tuples nested inside lists would be read as guard function
  %% calls, and atoms such as '$1' or '_' would be read as match variables.
  %% Exact comparison (`=:=') is used for every name.
  -spec get_local_members(Name :: any()) -> [pid()].
  get_local_members(GroupName) ->
      MatchHead = #syn_groups_table{name = '$1', node = '$2', pid = '$3', _ = '_'},
      Guards = [
          {'=:=', '$1', {const, GroupName}},
          {'=:=', '$2', node()}
      ],
      Pids = mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, ['$3']}]),
      lists:sort(Pids).
  %% Returns the `{Pid, Meta}' pairs of the members of GroupName whose table
  %% entry is attributed to the local node, sorted by pid.
  %%
  %% The group name is injected into the guard through `{const, GroupName}' so
  %% it is always treated as a literal term. Embedding it directly is only safe
  %% for some terms: tuples nested inside lists would be read as guard function
  %% calls, and atoms such as '$1' or '_' would be read as match variables.
  %% Exact comparison (`=:=') is used for every name.
  -spec get_local_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
  get_local_members(GroupName, with_meta) ->
      MatchHead = #syn_groups_table{name = '$1', node = '$2', pid = '$3', meta = '$4', _ = '_'},
      Guards = [
          {'=:=', '$1', {const, GroupName}},
          {'=:=', '$2', node()}
      ],
      %% double braces: build a 2-tuple in the match spec result
      Result = {{'$3', '$4'}},
      PidsWithMeta = mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]),
      lists:keysort(1, PidsWithMeta).
  %% Tells whether Pid is a member of GroupName AND its table entry is
  %% attributed to the local node. Note the argument order: Pid first.
  -spec local_member(Pid :: pid(), GroupName :: any()) -> boolean().
  local_member(Pid, GroupName) ->
      case find_process_entry_by_name_and_pid(GroupName, Pid) of
          #syn_groups_table{node = EntryNode} -> EntryNode =:= node();
          _ -> false
      end.
  %% Sends Message to every member of GroupName and returns how many
  %% pids it was sent to.
  -spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
  publish(GroupName, Message) ->
      Recipients = get_members(GroupName),
      lists:foreach(fun(Recipient) -> Recipient ! Message end, Recipients),
      {ok, length(Recipients)}.
  %% Sends Message to the members of GroupName returned by get_local_members/1
  %% and returns how many pids it was sent to.
  -spec publish_to_local(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
  publish_to_local(GroupName, Message) ->
      Recipients = get_local_members(GroupName),
      lists:foreach(fun(Recipient) -> Recipient ! Message end, Recipients),
      {ok, length(Recipients)}.
  %% Same as multi_call/3, using ?DEFAULT_MULTI_CALL_TIMEOUT_MS as the
  %% per-member timeout.
  -spec multi_call(GroupName :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  multi_call(GroupName, Message) ->
      Timeout = ?DEFAULT_MULTI_CALL_TIMEOUT_MS,
      multi_call(GroupName, Message, Timeout).
  %% Calls every member of GroupName with Message and gathers the outcomes.
  %% One linked helper process is spawned per member; each helper reports
  %% back to the caller, which then blocks in collect_replies/1 until every
  %% member has been accounted for.
  %% Returns `{Replies, BadPids}'.
  -spec multi_call(GroupName :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
      {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  multi_call(GroupName, Message, Timeout) ->
      Caller = self(),
      Members = get_members(GroupName),
      lists:foreach(
          fun(Member) ->
              spawn_link(?MODULE, multi_call_and_receive, [Caller, Member, Message, Timeout])
          end,
          Members
      ),
      collect_replies(Members).
  %% Sends `{syn_multi_call_reply, self(), Reply}' to CallerPid and returns
  %% the message that was sent.
  -spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
  multi_call_reply(CallerPid, Reply) ->
      ReplyMessage = {syn_multi_call_reply, self(), Reply},
      erlang:send(CallerPid, ReplyMessage).
  %% Asynchronously tells the syn_groups server on RemoteNode that Pid joined
  %% GroupName with Meta (fire-and-forget cast).
  -spec sync_join(RemoteNode :: node(), GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
  sync_join(RemoteNode, GroupName, Pid, Meta) ->
      RemoteServer = {?MODULE, RemoteNode},
      gen_server:cast(RemoteServer, {sync_join, GroupName, Pid, Meta}).
  %% Asynchronously tells the syn_groups server on RemoteNode that Pid left
  %% GroupName (fire-and-forget cast).
  -spec sync_leave(RemoteNode :: node(), GroupName :: any(), Pid :: pid()) -> ok.
  sync_leave(RemoteNode, GroupName, Pid) ->
      RemoteServer = {?MODULE, RemoteNode},
      gen_server:cast(RemoteServer, {sync_leave, GroupName, Pid}).
  %% Returns `{Name, Pid, Meta}' tuples for every table entry attributed to the
  %% local node. FromNode is only used for logging. The nodeup handler in this
  %% module invokes it through rpc on the remote node.
  -spec sync_get_local_group_tuples(FromNode :: node()) -> list(syn_group_tuple()).
  sync_get_local_group_tuples(FromNode) ->
      error_logger:info_msg("Syn(~p): Received request of local group tuples from remote node: ~p~n", [node(), FromNode]),
      MatchSpec = [{
          #syn_groups_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
          [{'=:=', '$3', node()}],
          %% double braces: build a 3-tuple in the match spec result
          [{{'$1', '$2', '$4'}}]
      }],
      mnesia:dirty_select(syn_groups_table, MatchSpec).
  176. %% ===================================================================
  177. %% Callbacks
  178. %% ===================================================================
  179. %% ----------------------------------------------------------------------------------------------------------
  180. %% Init
  181. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server init callback.
  %% Waits up to 10 seconds for the groups table. On success it subscribes to
  %% node up/down notifications and resolves the event handler module from the
  %% `syn' application environment (key `event_handler'), falling back to
  %% ?DEFAULT_EVENT_HANDLER_MODULE. Any other wait result stops the server.
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      case mnesia:wait_for_tables([syn_groups_table], 10000) of
          ok ->
              ok = net_kernel:monitor_nodes(true),
              Handler = application:get_env(syn, event_handler, ?DEFAULT_EVENT_HANDLER_MODULE),
              %% ensure that it is loaded (not using code:ensure_loaded/1 to support embedded mode);
              %% failures are deliberately ignored
              catch Handler:module_info(exports),
              {ok, #state{custom_event_handler = Handler}};
          Reason ->
              {stop, {error_waiting_for_groups_table, Reason}}
      end.
  204. %% ----------------------------------------------------------------------------------------------------------
  205. %% Call messages
  206. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server call callback.
  %%
  %% {join_on_node, GroupName, Pid, Meta}: if Pid is alive, records the
  %%   membership locally, multicasts it to the other nodes and replies `ok';
  %%   otherwise replies `{error, not_alive}'.
  %% {leave_on_node, GroupName, Pid}: removes the membership locally; on
  %%   success multicasts the leave and replies `ok', otherwise replies the
  %%   error returned by leave_on_node/2.
  %% Anything else is logged and answered with `undefined'.
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call({join_on_node, GroupName, Pid, Meta}, _From, State) ->
      %% check if pid is alive
      case is_process_alive(Pid) of
          true ->
              join_on_node(GroupName, Pid, Meta),
              %% multicast
              multicast_join(GroupName, Pid, Meta),
              %% return
              {reply, ok, State};
          _ ->
              {reply, {error, not_alive}, State}
      end;
  handle_call({leave_on_node, GroupName, Pid}, _From, State) ->
      case leave_on_node(GroupName, Pid) of
          ok ->
              %% multicast
              multicast_leave(GroupName, Pid),
              %% return
              {reply, ok, State};
          {error, Reason} ->
              %% return
              {reply, {error, Reason}, State}
      end;
  handle_call(Request, From, State) ->
      %% argument order must follow the format string: node, sender, then request
      error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), From, Request]),
      {reply, undefined, State}.
  240. %% ----------------------------------------------------------------------------------------------------------
  241. %% Cast messages
  242. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server cast callback.
  %%
  %% {sync_join, GroupName, Pid, Meta}: writes the membership to the local
  %%   table with no monitor reference.
  %% {sync_leave, GroupName, Pid}: removes the membership from the local table.
  %% Anything else is logged and ignored.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast({sync_join, GroupName, Pid, Meta}, State) ->
      NoMonitorRef = undefined,
      add_to_local_table(GroupName, Pid, Meta, NoMonitorRef),
      {noreply, State};
  handle_cast({sync_leave, GroupName, Pid}, State) ->
      remove_from_local_table(GroupName, Pid),
      {noreply, State};
  handle_cast(Msg, State) ->
      error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
      {noreply, State}.
  260. %% ----------------------------------------------------------------------------------------------------------
  261. %% All non Call / Cast messages
  262. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server info callback.
  %%
  %% 'DOWN' for a process: for every group entry of Pid, runs the exit
  %%   handling, deletes the entry and multicasts the leave. If Pid has no
  %%   entries, only the "unmonitored process" warning path is taken.
  %% {nodeup, RemoteNode}: under a global lock, fetches the remote node's
  %%   local group tuples via rpc and merges them into the local table.
  %%   A `{badrpc, Reason}' reply is logged and the merge is skipped, instead
  %%   of crashing the server when taking the length of a non-list.
  %% {nodedown, RemoteNode}: purges every entry attributed to that node.
  %% Anything else is logged and ignored.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
      case find_processes_entry_by_pid(Pid) of
          [] ->
              %% handle
              handle_process_down(undefined, Pid, undefined, Reason, State);
          Entries ->
              lists:foreach(fun(Entry) ->
                  %% get process info
                  GroupName = Entry#syn_groups_table.name,
                  Meta = Entry#syn_groups_table.meta,
                  %% handle
                  handle_process_down(GroupName, Pid, Meta, Reason, State),
                  %% remove from table
                  remove_from_local_table(Entry),
                  %% multicast
                  multicast_leave(GroupName, Pid)
              end, Entries)
      end,
      %% return
      {noreply, State};
  handle_info({nodeup, RemoteNode}, State) ->
      error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
      global:trans({{?MODULE, auto_merge_groups}, self()},
          fun() ->
              error_logger:warning_msg("Syn(~p): GROUPS AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
              %% get group tuples from remote node
              case rpc:call(RemoteNode, ?MODULE, sync_get_local_group_tuples, [node()]) of
                  GroupTuples when is_list(GroupTuples) ->
                      error_logger:warning_msg(
                          "Syn(~p): Received ~p group entrie(s) from remote node ~p, writing to local~n",
                          [node(), length(GroupTuples), RemoteNode]
                      ),
                      sync_group_tuples(RemoteNode, GroupTuples),
                      %% exit
                      error_logger:warning_msg("Syn(~p): GROUPS AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode]);
                  {badrpc, BadRpcReason} ->
                      %% the remote node went away or the call failed: nothing to merge
                      error_logger:error_msg(
                          "Syn(~p): GROUPS AUTOMERGE <---- Aborted for remote node ~p, rpc failed with reason: ~p~n",
                          [node(), RemoteNode, BadRpcReason]
                      )
              end
          end
      ),
      %% resume
      {noreply, State};
  handle_info({nodedown, RemoteNode}, State) ->
      error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing group entries on local~n", [node(), RemoteNode]),
      purge_group_entries_for_remote_node(RemoteNode),
      {noreply, State};
  handle_info(Info, State) ->
      error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
      {noreply, State}.
  312. %% ----------------------------------------------------------------------------------------------------------
  313. %% Terminate
  314. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server terminate callback: logs the termination reason and returns
  %% the atom `terminated'.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(Reason, _State) ->
      LogArgs = [node(), Reason],
      error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", LogArgs),
      terminated.
  319. %% ----------------------------------------------------------------------------------------------------------
  320. %% Convert process state when code is changed.
  321. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server code_change callback: the state is carried over unchanged.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_OldVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  325. %% ===================================================================
  326. %% Internal
  327. %% ===================================================================
  %% Spawns a linked process that sends a sync_join for this membership to
  %% every node returned by nodes() at the time the process runs.
  %% Returns the pid of the spawned process.
  -spec multicast_join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> pid().
  multicast_join(GroupName, Pid, Meta) ->
      NotifyNode = fun(RemoteNode) -> sync_join(RemoteNode, GroupName, Pid, Meta) end,
      spawn_link(fun() -> lists:foreach(NotifyNode, nodes()) end).
  %% Spawns a linked process that sends a sync_leave for this membership to
  %% every node returned by nodes() at the time the process runs.
  %% Returns the pid of the spawned process.
  -spec multicast_leave(GroupName :: any(), Pid :: pid()) -> pid().
  multicast_leave(GroupName, Pid) ->
      NotifyNode = fun(RemoteNode) -> sync_leave(RemoteNode, GroupName, Pid) end,
      spawn_link(fun() -> lists:foreach(NotifyNode, nodes()) end).
  %% Records Pid as a member of GroupName in the local table.
  %% A process is monitored only once: if it already has an entry in some
  %% group, the monitor reference of the first entry found is reused;
  %% otherwise a new monitor is created.
  -spec join_on_node(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
  join_on_node(GroupName, Pid, Meta) ->
      ExistingEntries = find_processes_entry_by_pid(Pid),
      MonitorRef = case ExistingEntries of
          [#syn_groups_table{monitor_ref = KnownRef} | _] ->
              KnownRef;
          [] ->
              erlang:monitor(process, Pid)
      end,
      add_to_local_table(GroupName, Pid, Meta, MonitorRef).
  %% Removes Pid's membership of GroupName from the local table.
  %% Returns `{error, not_in_group}' when there is no such entry. When the
  %% entry being removed is the only one left for Pid, the process is
  %% demonitored first. Only entries carrying a monitor reference are handled;
  %% an entry with `undefined' as reference matches no clause.
  -spec leave_on_node(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  leave_on_node(GroupName, Pid) ->
      case find_process_entry_by_name_and_pid(GroupName, Pid) of
          undefined ->
              {error, not_in_group};
          #syn_groups_table{monitor_ref = MonitorRef} = Entry when MonitorRef =/= undefined ->
              %% is this the last group the process is in?
              case find_processes_entry_by_pid(Pid) =:= [Entry] of
                  true -> erlang:demonitor(MonitorRef);
                  false -> ok
              end,
              remove_from_local_table(Entry)
      end.
  %% Writes the membership of Pid in GroupName to the local table, replacing
  %% any previous entry for the same name/pid pair. The entry's node field is
  %% derived from Pid.
  -spec add_to_local_table(GroupName :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
  add_to_local_table(GroupName, Pid, Meta, MonitorRef) ->
      %% drop a previous entry first, if there is one
      remove_from_local_table(GroupName, Pid),
      NewEntry = #syn_groups_table{
          name = GroupName,
          pid = Pid,
          node = node(Pid),
          meta = Meta,
          monitor_ref = MonitorRef
      },
      mnesia:dirty_write(NewEntry).
  %% Deletes the entry for the GroupName/Pid pair from the local table.
  %% Returns `{error, not_in_group}' when no such entry exists.
  -spec remove_from_local_table(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  remove_from_local_table(GroupName, Pid) ->
      Found = find_process_entry_by_name_and_pid(GroupName, Pid),
      case Found of
          undefined -> {error, not_in_group};
          _ -> remove_from_local_table(Found)
      end.
  %% Deletes exactly the given record from the groups table.
  -spec remove_from_local_table(Entry :: #syn_groups_table{}) -> ok.
  remove_from_local_table(Entry) ->
      Table = syn_groups_table,
      mnesia:dirty_delete_object(Table, Entry).
  %% Returns every group entry of Pid, looked up through the table's index on
  %% the pid field.
  -spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: list(#syn_groups_table{}).
  find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
      PidIndex = #syn_groups_table.pid,
      mnesia:dirty_index_read(syn_groups_table, Pid, PidIndex).
  %% Returns the single table entry for the GroupName/Pid pair, or `undefined'
  %% when there is none. More than one matching entry matches no clause.
  -spec find_process_entry_by_name_and_pid(GroupName :: any(), Pid :: pid()) -> Entry :: #syn_groups_table{} | undefined.
  find_process_entry_by_name_and_pid(GroupName, Pid) ->
      Pattern = #syn_groups_table{name = GroupName, pid = Pid, _ = '_'},
      %% no guards; '$_' returns the whole matching record
      case mnesia:dirty_select(syn_groups_table, [{Pattern, [], ['$_']}]) of
          [] -> undefined;
          [Entry] -> Entry
      end.
  %% Reacts to the exit of a process.
  %% With `undefined' as group name (no table entry was found for the pid) a
  %% warning is logged; otherwise the exit is forwarded to syn_event_handler
  %% together with the custom handler module taken from the state.
  -spec handle_process_down(GroupName :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
  handle_process_down(undefined, Pid, _Meta, Reason, #state{}) ->
      error_logger:warning_msg(
          "Syn(~p): Received a DOWN message from an unmonitored group process ~p with reason: ~p~n",
          [node(), Pid, Reason]
      );
  handle_process_down(GroupName, Pid, Meta, Reason, #state{custom_event_handler = CustomEventHandler}) ->
      syn_event_handler:do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler).
  %% Merges the group tuples received from RemoteNode into the local table.
  %% Entries already attributed to RemoteNode are purged first (again here, to
  %% cover race conditions), then every `{Name, Pid, Meta}' tuple is joined
  %% locally through join_on_node/3.
  %% GroupTuples is the list produced by sync_get_local_group_tuples/1, hence
  %% the syn_group_tuple() element type.
  -spec sync_group_tuples(RemoteNode :: node(), GroupTuples :: [syn_group_tuple()]) -> ok.
  sync_group_tuples(RemoteNode, GroupTuples) ->
      %% ensure that groups don't hold any of the joining node's entries
      purge_group_entries_for_remote_node(RemoteNode),
      %% add to table
      lists:foreach(
          fun({Name, RemotePid, RemoteMeta}) ->
              join_on_node(Name, RemotePid, RemoteMeta)
          end,
          GroupTuples
      ).
  %% Deletes every table entry attributed to Node.
  %% NB: no demonitoring is done, which is why the guard only accepts a node
  %% other than the local one.
  -spec purge_group_entries_for_remote_node(Node :: atom()) -> ok.
  purge_group_entries_for_remote_node(Node) when Node =/= node() ->
      StaleEntries = mnesia:dirty_match_object(
          syn_groups_table,
          #syn_groups_table{node = Node, _ = '_'}
      ),
      lists:foreach(
          fun(StaleEntry) -> mnesia:dirty_delete_object(syn_groups_table, StaleEntry) end,
          StaleEntries
      ).
  %% Helper run in its own process for each member during a multi call.
  %% Monitors Pid, sends it `{syn_multi_call, self(), Message}' and waits for
  %% one of three outcomes, which is then reported to CollectorPid:
  %%   - a `syn_multi_call_reply' from Pid  -> `{reply, Pid, Reply}'
  %%   - the monitor firing                 -> `{bad_pid, Pid}'
  %%   - Timeout milliseconds elapsing      -> `{bad_pid, Pid}'
  %% Returns the message sent to CollectorPid.
  -spec multi_call_and_receive(
      CollectorPid :: pid(),
      Pid :: pid(),
      Message :: any(),
      Timeout :: non_neg_integer()
  ) -> any().
  multi_call_and_receive(CollectorPid, Pid, Message, Timeout) ->
      MonitorRef = monitor(process, Pid),
      Pid ! {syn_multi_call, self(), Message},
      Outcome = receive
          {syn_multi_call_reply, Pid, Reply} ->
              {reply, Pid, Reply};
          {'DOWN', MonitorRef, _, _, _} ->
              {bad_pid, Pid}
      after Timeout ->
          {bad_pid, Pid}
      end,
      CollectorPid ! Outcome.
  %% Waits for one outcome message per member pid and returns
  %% `{Replies, BadPids}'.
  -spec collect_replies(MemberPids :: [pid()]) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  collect_replies(MemberPids) ->
      collect_replies(MemberPids, [], []).

  %% Accumulating loop: each `{reply, Pid, Reply}' or `{bad_pid, Pid}' message
  %% removes Pid from the pending list; the loop ends when that list is empty.
  %% Results are accumulated by prepending, so the most recently received
  %% outcome comes first. There is no timeout in the receive.
  -spec collect_replies(MemberPids :: [pid()], [{pid(), Reply :: any()}], [pid()]) ->
      {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  collect_replies([], Replies, BadPids) ->
      {Replies, BadPids};
  collect_replies(Pending, Replies, BadPids) ->
      receive
          {reply, Pid, Reply} ->
              collect_replies(lists:delete(Pid, Pending), [{Pid, Reply} | Replies], BadPids);
          {bad_pid, Pid} ->
              collect_replies(lists:delete(Pid, Pending), Replies, [Pid | BadPids])
      end.
  471. end.