syn_groups.erl 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THxE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_groups).
  27. -behaviour(syn_gen_scope).
  28. %% API
  29. -export([start_link/1]).
  30. -export([get_subcluster_nodes/1]).
  31. -export([join/2, join/3, join/4]).
  32. -export([leave/2, leave/3]).
  33. -export([members/1, members/2]).
  34. -export([is_member/2, is_member/3]).
  35. -export([local_members/1, local_members/2]).
  36. -export([is_local_member/2, is_local_member/3]).
  37. -export([count/0, count/1, count/2]).
  38. -export([local_count/0, local_count/1]).
  39. -export([group_names/0, group_names/1, group_names/2]).
  40. -export([local_group_names/0, local_group_names/1]).
  41. -export([publish/2, publish/3]).
  42. -export([local_publish/2, local_publish/3]).
  43. -export([multi_call/2, multi_call/3, multi_call/4, multi_call_reply/2]).
  44. %% syn_gen_scope callbacks
  45. -export([
  46. init/1,
  47. handle_call/3,
  48. handle_info/2,
  49. save_remote_data/2,
  50. get_local_data/1,
  51. purge_local_data_for_node/2
  52. ]).
  53. %% internal
  54. -export([multi_call_and_receive/5]).
  55. %% macros
  56. -define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
  57. %% includes
  58. -include("syn.hrl").
  59. %% ===================================================================
  60. %% API
  61. %% ===================================================================
  62. -spec start_link(Scope :: atom()) ->
  63. {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
  64. start_link(Scope) when is_atom(Scope) ->
  65. syn_gen_scope:start_link(?MODULE, Scope).
  66. -spec get_subcluster_nodes(Scope :: atom()) -> [node()].
  67. get_subcluster_nodes(Scope) ->
  68. syn_gen_scope:get_subcluster_nodes(?MODULE, Scope).
  69. -spec members(GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
  70. members(GroupName) ->
  71. members(?DEFAULT_SCOPE, GroupName).
  72. -spec members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
  73. members(Scope, GroupName) ->
  74. do_get_members(Scope, GroupName, '_').
  75. -spec is_member(GroupName :: term(), Pid :: pid()) -> boolean().
  76. is_member(GroupName, Pid) ->
  77. is_member(?DEFAULT_SCOPE, GroupName, Pid).
  78. -spec is_member(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> boolean().
  79. is_member(Scope, GroupName, Pid) ->
  80. case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
  81. undefined ->
  82. error({invalid_scope, Scope});
  83. TableByName ->
  84. case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  85. undefined -> false;
  86. _ -> true
  87. end
  88. end.
  89. -spec local_members(GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
  90. local_members(GroupName) ->
  91. local_members(?DEFAULT_SCOPE, GroupName).
  92. -spec local_members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
  93. local_members(Scope, GroupName) ->
  94. do_get_members(Scope, GroupName, node()).
  95. -spec do_get_members(Scope :: atom(), GroupName :: term(), NodeParam :: atom()) -> [{Pid :: pid(), Meta :: term()}].
  96. do_get_members(Scope, GroupName, NodeParam) ->
  97. case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
  98. undefined ->
  99. error({invalid_scope, Scope});
  100. TableByName ->
  101. ets:select(TableByName, [{
  102. {{GroupName, '$2'}, '$3', '_', '_', NodeParam},
  103. [],
  104. [{{'$2', '$3'}}]
  105. }])
  106. end.
  107. -spec is_local_member(GroupName :: term(), Pid :: pid()) -> boolean().
  108. is_local_member(GroupName, Pid) ->
  109. is_local_member(?DEFAULT_SCOPE, GroupName, Pid).
  110. -spec is_local_member(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> boolean().
  111. is_local_member(Scope, GroupName, Pid) ->
  112. case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
  113. undefined ->
  114. error({invalid_scope, Scope});
  115. TableByName ->
  116. case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  117. {{_, _}, _, _, _, Node} when Node =:= node() -> true;
  118. _ -> false
  119. end
  120. end.
  121. -spec join(GroupName :: term(), Pid :: pid()) -> ok.
  122. join(GroupName, Pid) ->
  123. join(GroupName, Pid, undefined).
  124. -spec join(GroupNameOrScope :: term(), PidOrGroupName :: term(), MetaOrPid :: term()) -> ok.
  125. join(GroupName, Pid, Meta) when is_pid(Pid) ->
  126. join(?DEFAULT_SCOPE, GroupName, Pid, Meta);
  127. join(Scope, GroupName, Pid) when is_pid(Pid) ->
  128. join(Scope, GroupName, Pid, undefined).
  129. -spec join(Scope :: atom(), GroupName :: term(), Pid :: pid(), Meta :: term()) -> ok.
  130. join(Scope, GroupName, Pid, Meta) ->
  131. Node = node(Pid),
  132. case syn_gen_scope:call(?MODULE, Node, Scope, {join_on_node, node(), GroupName, Pid, Meta}) of
  133. {ok, {CallbackMethod, Time, TableByName, TableByPid}} when Node =/= node() ->
  134. %% update table on caller node immediately so that subsequent calls have an updated registry
  135. add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
  136. %% callback
  137. syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta]),
  138. %% return
  139. ok;
  140. {Response, _} ->
  141. Response
  142. end.
  143. -spec leave(GroupName :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
  144. leave(GroupName, Pid) ->
  145. leave(?DEFAULT_SCOPE, GroupName, Pid).
  146. -spec leave(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
  147. leave(Scope, GroupName, Pid) ->
  148. case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
  149. undefined ->
  150. error({invalid_scope, Scope});
  151. TableByName ->
  152. Node = node(Pid),
  153. case syn_gen_scope:call(?MODULE, Node, Scope, {leave_on_node, node(), GroupName, Pid}) of
  154. {ok, {Meta, TableByPid}} when Node =/= node() ->
  155. %% remove table on caller node immediately so that subsequent calls have an updated registry
  156. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  157. %% callback
  158. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta]),
  159. %% return
  160. ok;
  161. {Response, _} ->
  162. Response
  163. end
  164. end.
  165. -spec count() -> non_neg_integer().
  166. count() ->
  167. count(?DEFAULT_SCOPE).
  168. -spec count(Scope :: atom()) -> non_neg_integer().
  169. count(Scope) ->
  170. Set = group_names_ordset(Scope, '_'),
  171. ordsets:size(Set).
  172. -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
  173. count(Scope, Node) ->
  174. Set = group_names_ordset(Scope, Node),
  175. ordsets:size(Set).
  176. -spec local_count() -> non_neg_integer().
  177. local_count() ->
  178. count(?DEFAULT_SCOPE, node()).
  179. -spec local_count(Scope :: atom()) -> non_neg_integer().
  180. local_count(Scope) ->
  181. count(Scope, node()).
  182. -spec group_names() -> [GroupName :: term()].
  183. group_names() ->
  184. group_names(?DEFAULT_SCOPE).
  185. -spec group_names(Scope :: atom()) -> [GroupName :: term()].
  186. group_names(Scope) ->
  187. Set = group_names_ordset(Scope, '_'),
  188. ordsets:to_list(Set).
  189. -spec group_names(Scope :: atom(), Node :: node()) -> [GroupName :: term()].
  190. group_names(Scope, Node) ->
  191. Set = group_names_ordset(Scope, Node),
  192. ordsets:to_list(Set).
  193. -spec local_group_names() -> [GroupName :: term()].
  194. local_group_names() ->
  195. group_names(?DEFAULT_SCOPE, node()).
  196. -spec local_group_names(Scope :: atom()) -> [GroupName :: term()].
  197. local_group_names(Scope) ->
  198. group_names(Scope, node()).
  199. -spec group_names_ordset(Scope :: atom(), Node :: node()) -> ordsets:ordset(GroupName :: term()).
  200. group_names_ordset(Scope, NodeParam) ->
  201. case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
  202. undefined ->
  203. error({invalid_scope, Scope});
  204. TableByName ->
  205. Groups = ets:select(TableByName, [{
  206. {{'$1', '_'}, '_', '_', '_', NodeParam},
  207. [],
  208. ['$1']
  209. }]),
  210. ordsets:from_list(Groups)
  211. end.
  212. -spec publish(GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  213. publish(GroupName, Message) ->
  214. publish(?DEFAULT_SCOPE, GroupName, Message).
  215. -spec publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  216. publish(Scope, GroupName, Message) ->
  217. Members = members(Scope, GroupName),
  218. do_publish(Members, Message).
  219. -spec local_publish(GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  220. local_publish(GroupName, Message) ->
  221. local_publish(?DEFAULT_SCOPE, GroupName, Message).
  222. -spec local_publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  223. local_publish(Scope, GroupName, Message) ->
  224. Members = local_members(Scope, GroupName),
  225. do_publish(Members, Message).
  226. -spec do_publish(Members :: [{Pid :: pid(), Meta :: term()}], Message :: term()) ->
  227. {ok, RecipientCount :: non_neg_integer()}.
  228. do_publish(Members, Message) ->
  229. lists:foreach(fun({Pid, _Meta}) ->
  230. Pid ! Message
  231. end, Members),
  232. {ok, length(Members)}.
  233. -spec multi_call(GroupName :: term(), Message :: term()) -> {[{pid(), Reply :: term()}], [BadPid :: pid()]}.
  234. multi_call(GroupName, Message) ->
  235. multi_call(?DEFAULT_SCOPE, GroupName, Message).
  236. -spec multi_call(Scope :: atom(), GroupName :: term(), Message :: term()) -> {[{pid(), Reply :: term()}], [BadPid :: pid()]}.
  237. multi_call(Scope, GroupName, Message) ->
  238. multi_call(Scope, GroupName, Message, ?DEFAULT_MULTI_CALL_TIMEOUT_MS).
  239. -spec multi_call(Scope :: atom(), GroupName :: term(), Message :: term(), Timeout :: non_neg_integer()) ->
  240. {[{pid(), Reply :: term()}], [BadPid :: pid()]}.
  241. multi_call(Scope, GroupName, Message, Timeout) ->
  242. Self = self(),
  243. Members = members(Scope, GroupName),
  244. lists:foreach(fun({Pid, Meta}) ->
  245. spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Meta, Message, Timeout])
  246. end, Members),
  247. collect_replies(orddict:from_list(Members)).
  248. -spec multi_call_reply(CallerPid :: pid(), Reply :: term()) -> {syn_multi_call_reply, pid(), Reply :: term()}.
  249. multi_call_reply(CallerPid, Reply) ->
  250. CallerPid ! {syn_multi_call_reply, self(), Reply}.
  251. %% ===================================================================
  252. %% Callbacks
  253. %% ===================================================================
  254. %% ----------------------------------------------------------------------------------------------------------
  255. %% Init
  256. %% ----------------------------------------------------------------------------------------------------------
  257. -spec init(#state{}) -> {ok, HandlerState :: term()}.
  258. init(State) ->
  259. HandlerState = #{},
  260. %% rebuild
  261. rebuild_monitors(State),
  262. %% init
  263. {ok, HandlerState}.
  264. %% ----------------------------------------------------------------------------------------------------------
  265. %% Call messages
  266. %% ----------------------------------------------------------------------------------------------------------
  267. -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
  268. {reply, Reply :: term(), #state{}} |
  269. {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
  270. {noreply, #state{}} |
  271. {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
  272. {stop, Reason :: term(), Reply :: term(), #state{}} |
  273. {stop, Reason :: term(), #state{}}.
  274. handle_call({join_on_node, RequesterNode, GroupName, Pid, Meta}, _From, #state{
  275. table_by_name = TableByName,
  276. table_by_pid = TableByPid
  277. } = State) ->
  278. case is_process_alive(Pid) of
  279. true ->
  280. case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  281. undefined ->
  282. %% add
  283. MRef = case find_monitor_for_pid(Pid, TableByPid) of
  284. undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, create
  285. MRef0 -> MRef0
  286. end,
  287. do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, on_process_joined, State);
  288. {{_, Meta}, _, _, _, _} ->
  289. %% re-joined with same meta
  290. {ok, noop};
  291. {{_, _}, _, _, MRef, _} ->
  292. do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, on_group_process_updated, State)
  293. end;
  294. false ->
  295. {reply, {{error, not_alive}, undefined}, State}
  296. end;
  297. handle_call({leave_on_node, RequesterNode, GroupName, Pid}, _From, #state{
  298. scope = Scope,
  299. table_by_name = TableByName,
  300. table_by_pid = TableByPid
  301. } = State) ->
  302. case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  303. undefined ->
  304. {reply, {{error, not_in_group}, undefined}, State};
  305. {{_, _}, Meta, _, _, _} ->
  306. %% is this the last group process is in?
  307. maybe_demonitor(Pid, TableByPid),
  308. %% remove from table
  309. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  310. %% callback
  311. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta]),
  312. %% broadcast
  313. syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta}, [RequesterNode], State),
  314. %% return
  315. {reply, {ok, {Meta, TableByPid}}, State}
  316. end;
  317. handle_call(Request, From, #state{scope = Scope} = State) ->
  318. error_logger:warning_msg("SYN[~s<~s>] Received from ~p an unknown call message: ~p", [?MODULE, Scope, From, Request]),
  319. {reply, undefined, State}.
  320. %% ----------------------------------------------------------------------------------------------------------
  321. %% Info messages
  322. %% ----------------------------------------------------------------------------------------------------------
  323. -spec handle_info(Info :: timeout | term(), #state{}) ->
  324. {noreply, #state{}} |
  325. {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
  326. {stop, Reason :: term(), #state{}}.
  327. handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time}, State) ->
  328. handle_groups_sync(GroupName, Pid, Meta, Time, State),
  329. {noreply, State};
  330. handle_info({'3.0', sync_leave, GroupName, Pid, Meta}, #state{
  331. scope = Scope,
  332. table_by_name = TableByName,
  333. table_by_pid = TableByPid
  334. } = State) ->
  335. %% remove from table
  336. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  337. %% callback
  338. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta]),
  339. %% return
  340. {noreply, State};
  341. handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
  342. scope = Scope,
  343. table_by_name = TableByName,
  344. table_by_pid = TableByPid
  345. } = State) ->
  346. case find_groups_entries_by_pid(Pid, TableByPid) of
  347. [] ->
  348. error_logger:warning_msg(
  349. "SYN[~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
  350. [?MODULE, Scope, Pid, Reason]
  351. );
  352. Entries ->
  353. lists:foreach(fun({{_Pid, GroupName}, Meta, _, _, _}) ->
  354. %% remove from table
  355. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  356. %% callback
  357. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta]),
  358. %% broadcast
  359. syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta}, State)
  360. end, Entries)
  361. end,
  362. %% return
  363. {noreply, State};
  364. handle_info(Info, #state{scope = Scope} = State) ->
  365. error_logger:warning_msg("SYN[~s<~s>] Received an unknown info message: ~p", [?MODULE, Scope, Info]),
  366. {noreply, State}.
  367. %% ----------------------------------------------------------------------------------------------------------
  368. %% Data callbacks
  369. %% ----------------------------------------------------------------------------------------------------------
  370. -spec get_local_data(State :: term()) -> {ok, Data :: term()} | undefined.
  371. get_local_data(#state{table_by_name = TableByName}) ->
  372. {ok, get_groups_tuples_for_node(node(), TableByName)}.
  373. -spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
  374. save_remote_data(GroupsTuplesOfRemoteNode, State) ->
  375. %% insert tuples
  376. lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
  377. handle_groups_sync(GroupName, Pid, Meta, Time, State)
  378. end, GroupsTuplesOfRemoteNode).
  379. -spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
  380. purge_local_data_for_node(Node, #state{
  381. scope = Scope,
  382. table_by_name = TableByName,
  383. table_by_pid = TableByPid
  384. }) ->
  385. purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid).
  386. %% ===================================================================
  387. %% Internal
  388. %% ===================================================================
  389. -spec rebuild_monitors(#state{}) -> ok.
  390. rebuild_monitors(#state{
  391. table_by_name = TableByName
  392. } = State) ->
  393. GroupsTuples = get_groups_tuples_for_node(node(), TableByName),
  394. do_rebuild_monitors(GroupsTuples, #{}, State).
  395. -spec do_rebuild_monitors([syn_groups_tuple()], #{pid() => reference()}, #state{}) -> ok.
  396. do_rebuild_monitors([], _, _) -> ok;
  397. do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, #state{
  398. table_by_name = TableByName,
  399. table_by_pid = TableByPid
  400. } = State) ->
  401. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  402. case is_process_alive(Pid) of
  403. true ->
  404. case maps:find(Pid, NewMRefs) of
  405. error ->
  406. MRef = erlang:monitor(process, Pid),
  407. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
  408. do_rebuild_monitors(T, maps:put(Pid, MRef, NewMRefs), State);
  409. {ok, MRef} ->
  410. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
  411. do_rebuild_monitors(T, NewMRefs, State)
  412. end;
  413. _ ->
  414. do_rebuild_monitors(T, NewMRefs, State)
  415. end.
  416. -spec do_join_on_node(
  417. GroupName :: term(),
  418. Pid :: pid(),
  419. Meta :: term(),
  420. MRef :: reference() | undefined,
  421. RequesterNode :: node(),
  422. CallbackMethod :: atom(),
  423. #state{}
  424. ) ->
  425. {
  426. reply,
  427. {ok, {
  428. CallbackMethod :: atom(),
  429. Time :: non_neg_integer(),
  430. TableByName :: atom(),
  431. TableByPid :: atom()
  432. }},
  433. #state{}
  434. }.
  435. do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, CallbackMethod, #state{
  436. scope = Scope,
  437. table_by_name = TableByName,
  438. table_by_pid = TableByPid
  439. } = State) ->
  440. Time = erlang:system_time(),
  441. %% add to local table
  442. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
  443. %% callback
  444. syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta]),
  445. %% broadcast
  446. syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time}, [RequesterNode], State),
  447. %% return
  448. {reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
  449. -spec get_groups_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_groups_tuple()].
  450. get_groups_tuples_for_node(Node, TableByName) ->
  451. ets:select(TableByName, [{
  452. {{'$1', '$2'}, '$3', '$4', '_', Node},
  453. [],
  454. [{{'$1', '$2', '$3', '$4'}}]
  455. }]).
  456. -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
  457. find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
  458. %% we use select instead of lookup to limit the results and thus cover the case
  459. %% when a process is in multiple groups
  460. case ets:select(TableByPid, [{
  461. {{Pid, '_'}, '_', '_', '$5', '_'},
  462. [],
  463. ['$5']
  464. }], 1) of
  465. {[MRef], _} -> MRef;
  466. '$end_of_table' -> undefined
  467. end.
  468. -spec find_groups_entry_by_name_and_pid(GroupName :: term(), Pid :: pid(), TableByName :: atom()) ->
  469. Entry :: syn_groups_entry() | undefined.
  470. find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
  471. case ets:lookup(TableByName, {GroupName, Pid}) of
  472. [] -> undefined;
  473. [Entry] -> Entry
  474. end.
  475. -spec find_groups_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupEntries :: [syn_groups_entry()].
  476. find_groups_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
  477. ets:select(TableByPid, [{
  478. {{Pid, '_'}, '_', '_', '_', '_'},
  479. [],
  480. ['$_']
  481. }]).
  482. -spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
  483. maybe_demonitor(Pid, TableByPid) ->
  484. %% select 2: if only 1 is returned it means that no other aliases exist for the Pid
  485. %% we use select instead of lookup to limit the results and thus cover the case
  486. %% when a process is in multiple groups
  487. case ets:select(TableByPid, [{
  488. {{Pid, '_'}, '_', '_', '$5', '_'},
  489. [],
  490. ['$5']
  491. }], 2) of
  492. {[MRef], _} when is_reference(MRef) ->
  493. %% no other aliases, demonitor
  494. erlang:demonitor(MRef, [flush]),
  495. ok;
  496. _ ->
  497. ok
  498. end.
  499. -spec add_to_local_table(
  500. GroupName :: term(),
  501. Pid :: pid(),
  502. Meta :: term(),
  503. Time :: integer(),
  504. MRef :: undefined | reference(),
  505. TableByName :: atom(),
  506. TableByPid :: atom()
  507. ) -> true.
  508. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
  509. %% insert
  510. ets:insert(TableByName, {{GroupName, Pid}, Meta, Time, MRef, node(Pid)}),
  511. ets:insert(TableByPid, {{Pid, GroupName}, Meta, Time, MRef, node(Pid)}).
  512. -spec remove_from_local_table(
  513. Name :: term(),
  514. Pid :: pid(),
  515. TableByName :: atom(),
  516. TableByPid :: atom()
  517. ) -> true.
  518. remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
  519. true = ets:delete(TableByName, {GroupName, Pid}),
  520. true = ets:delete(TableByPid, {Pid, GroupName}).
  521. -spec purge_groups_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
  522. purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
  523. %% loop elements for callback in a separate process to free scope process
  524. GroupsTuples = get_groups_tuples_for_node(Node, TableByName),
  525. spawn(fun() ->
  526. lists:foreach(fun({GroupName, Pid, Meta, _Time}) ->
  527. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta])
  528. end, GroupsTuples)
  529. end),
  530. ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
  531. ets:match_delete(TableByPid, {{'_', '_'}, '_', '_', '_', Node}).
  532. -spec handle_groups_sync(
  533. GroupName :: term(),
  534. Pid :: pid(),
  535. Meta :: term(),
  536. Time :: non_neg_integer(),
  537. #state{}
  538. ) -> any().
  539. handle_groups_sync(GroupName, Pid, Meta, Time, #state{
  540. scope = Scope,
  541. table_by_name = TableByName,
  542. table_by_pid = TableByPid
  543. }) ->
  544. case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  545. undefined ->
  546. %% new
  547. add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
  548. %% callback
  549. syn_event_handler:call_event_handler(on_process_joined, [Scope, GroupName, Pid, Meta]);
  550. {{GroupName, Pid}, TableMeta, TableTime, _MRef, _TableNode} when Time > TableTime ->
  551. %% maybe updated meta or time only
  552. add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
  553. %% callback (call only if meta update)
  554. case TableMeta =/= Meta of
  555. true -> syn_event_handler:call_event_handler(on_group_process_updated, [Scope, GroupName, Pid, Meta]);
  556. _ -> ok
  557. end;
  558. {{GroupName, Pid}, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
  559. %% race condition: incoming data is older, ignore
  560. ok
  561. end.
  562. -spec multi_call_and_receive(
  563. CollectorPid :: pid(),
  564. Pid :: pid(),
  565. Meta :: term(),
  566. Message :: term(),
  567. Timeout :: non_neg_integer()
  568. ) -> any().
  569. multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
  570. %% monitor
  571. MRef = monitor(process, Pid),
  572. %% send
  573. Pid ! {syn_multi_call, Message, self(), Meta},
  574. %% wait for reply
  575. receive
  576. {syn_multi_call_reply, Pid, Reply} ->
  577. CollectorPid ! {reply, Pid, Reply};
  578. {'DOWN', MRef, _, _, _} ->
  579. CollectorPid ! {bad_pid, Pid}
  580. after Timeout ->
  581. CollectorPid ! {bad_pid, Pid}
  582. end.
  583. -spec collect_replies(MembersOD :: orddict:orddict({pid(), Meta :: term()})) ->
  584. {
  585. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  586. BadReplies :: [{pid(), Meta :: term()}]
  587. }.
  588. collect_replies(MembersOD) ->
  589. collect_replies(MembersOD, [], []).
  590. -spec collect_replies(
  591. MembersOD :: orddict:orddict({pid(), Meta :: term()}),
  592. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  593. BadReplies :: [{pid(), Meta :: term()}]
  594. ) ->
  595. {
  596. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  597. BadReplies :: [{pid(), Meta :: term()}]
  598. }.
  599. collect_replies([], Replies, BadReplies) -> {Replies, BadReplies};
  600. collect_replies(MembersOD, Replies, BadReplies) ->
  601. receive
  602. {reply, Pid, Reply} ->
  603. {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
  604. collect_replies(MembersOD1, [{{Pid, Meta}, Reply} | Replies], BadReplies);
  605. {bad_pid, Pid} ->
  606. {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
  607. collect_replies(MembersOD1, Replies, [{Pid, Meta} | BadReplies])
  608. end.