syn_pg.erl 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_pg).
  27. -behaviour(syn_gen_scope).
  28. %% API
  29. -export([start_link/1]).
  30. -export([subcluster_nodes/1]).
  31. -export([join/4]).
  32. -export([leave/3]).
  33. -export([members/2]).
  34. -export([is_member/3]).
  35. -export([local_members/2]).
  36. -export([is_local_member/3]).
  37. -export([count/1, count/2]).
  38. -export([group_names/1, group_names/2]).
  39. -export([publish/3]).
  40. -export([local_publish/3]).
  41. -export([multi_call/4, multi_call_reply/2]).
  42. %% syn_gen_scope callbacks
  43. -export([
  44. init/1,
  45. handle_call/3,
  46. handle_info/2,
  47. save_remote_data/2,
  48. get_local_data/1,
  49. purge_local_data_for_node/2
  50. ]).
  51. %% internal
  52. -export([multi_call_and_receive/5]).
  53. %% includes
  54. -include("syn.hrl").
  55. %% ===================================================================
  56. %% API
  57. %% ===================================================================
  %% @doc Starts the process-group scope process for `Scope'.
  %% Delegates to `syn_gen_scope:start_link/2', passing this module as the handler.
  %% A non-atom `Scope' fails the guard and raises `function_clause'.
  -spec start_link(Scope :: atom()) ->
      {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
  start_link(Scope) when is_atom(Scope) ->
      Handler = ?MODULE,
      syn_gen_scope:start_link(Handler, Scope).
  %% @doc Returns the nodes reported by `syn_gen_scope:subcluster_nodes/2' for
  %% this handler module and `Scope'.
  -spec subcluster_nodes(Scope :: atom()) -> [node()].
  subcluster_nodes(Scope) ->
      Handler = ?MODULE,
      syn_gen_scope:subcluster_nodes(Handler, Scope).
  %% @doc Lists `{Pid, Meta}' for every member of `GroupName' in `Scope',
  %% regardless of the node the member runs on.
  -spec members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
  members(Scope, GroupName) ->
      %% '_' is the ETS wildcard: match entries of any node
      AnyNode = '_',
      do_get_members(Scope, GroupName, AnyNode).
  %% @doc Tells whether `Pid' is currently recorded as a member of `GroupName'
  %% in the by-name table of `Scope'.
  %% Raises `{invalid_scope, Scope}' when no table exists for the scope.
  -spec is_member(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> boolean().
  is_member(Scope, GroupName, Pid) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              %% any entry at all means the pid is in the group
              find_pg_entry_by_name_and_pid(GroupName, Pid, Table) =/= undefined
      end.
  %% @doc Lists `{Pid, Meta}' for the members of `GroupName' in `Scope' whose
  %% entry is recorded for the local node.
  -spec local_members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
  local_members(Scope, GroupName) ->
      LocalNode = node(),
      do_get_members(Scope, GroupName, LocalNode).
  %% @doc Selects `{Pid, Meta}' pairs for `GroupName' from the by-name table of `Scope'.
  %% `NodeParam' is either a node name (restrict to that node) or `'_'' (any node).
  %% Raises `{invalid_scope, Scope}' when no table exists for the scope.
  %% NOTE: `GroupName' is placed verbatim in the match head, so match-spec atoms
  %% inside it (such as `'_'') are interpreted by ETS as patterns.
  -spec do_get_members(Scope :: atom(), GroupName :: term(), NodeParam :: atom()) -> [{Pid :: pid(), Meta :: term()}].
  do_get_members(Scope, GroupName, NodeParam) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              %% entry layout: {{GroupName, Pid}, Meta, Time, MRef, Node}
              MatchHead = {{GroupName, '$2'}, '$3', '_', '_', NodeParam},
              Result = [{{'$2', '$3'}}],
              ets:select(Table, [{MatchHead, [], Result}])
      end.
  %% @doc Tells whether `Pid' is a member of `GroupName' in `Scope' AND its
  %% entry is recorded for the local node.
  %% Raises `{invalid_scope, Scope}' when no table exists for the scope.
  -spec is_local_member(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> boolean().
  is_local_member(Scope, GroupName, Pid) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              LocalNode = node(),
              %% the last tuple element is the node stored with the entry
              case find_pg_entry_by_name_and_pid(GroupName, Pid, Table) of
                  {{_, _}, _, _, _, LocalNode} -> true;
                  _ -> false
              end
      end.
  %% @doc Adds `Pid' to `GroupName' in `Scope' with metadata `Meta'.
  %%
  %% The request is sent through `syn_gen_scope:call/4' to the node `Pid' runs on.
  %% When that node is not the caller's node, a successful reply is also applied
  %% to the caller's local tables and the event handler callback is invoked
  %% locally, so that subsequent local reads see the new member immediately.
  %%
  %% Returns `ok' on success, or the error carried by the reply; the scope
  %% process replies `{error, not_alive}' when `Pid' is not alive, which is
  %% why the spec includes the error tuple.
  -spec join(Scope :: atom(), GroupName :: term(), Pid :: pid(), Meta :: term()) ->
      ok | {error, Reason :: term()}.
  join(Scope, GroupName, Pid, Meta) ->
      Node = node(Pid),
      case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', join_on_node, node(), GroupName, Pid, Meta}) of
          {ok, {CallbackMethod, Time, TableByName, TableByPid}} when Node =/= node() ->
              %% update table on caller node immediately so that subsequent calls have an updated registry
              %% (no monitor reference here: the pid is remote)
              add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
              %% callback
              syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, normal]),
              %% return
              ok;
          {Response, _} ->
              %% local join, or any error: the first element of the reply is the result
              Response
      end.
  %% @doc Removes `Pid' from `GroupName' in `Scope'.
  %%
  %% The request is sent through `syn_gen_scope:call/4' to the node `Pid' runs on.
  %% When that node is not the caller's node, a successful reply is also applied
  %% to the caller's local tables and `on_process_left' is invoked locally.
  %% Raises `{invalid_scope, Scope}' when no table exists for the scope.
  -spec leave(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
  leave(Scope, GroupName, Pid) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          TableByName ->
              PidNode = node(Pid),
              Request = {'3.0', leave_on_node, node(), GroupName, Pid},
              case syn_gen_scope:call(?MODULE, PidNode, Scope, Request) of
                  {ok, {Meta, TableByPid}} when PidNode =/= node() ->
                      %% drop the entry on the caller node right away so later local reads are up to date
                      remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
                      syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
                      ok;
                  {Result, _} ->
                      %% local leave, or any error: the first element of the reply is the result
                      Result
              end
      end.
  %% @doc Number of distinct group names known in `Scope', across all nodes.
  -spec count(Scope :: atom()) -> non_neg_integer().
  count(Scope) ->
      ordsets:size(group_names_ordset(Scope, '_')).
  %% @doc Number of distinct group names in `Scope' that have at least one
  %% entry recorded for `Node'.
  -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
  count(Scope, Node) ->
      ordsets:size(group_names_ordset(Scope, Node)).
  %% @doc Distinct group names known in `Scope', across all nodes.
  -spec group_names(Scope :: atom()) -> [GroupName :: term()].
  group_names(Scope) ->
      ordsets:to_list(group_names_ordset(Scope, '_')).
  %% @doc Distinct group names in `Scope' that have at least one entry
  %% recorded for `Node'.
  -spec group_names(Scope :: atom(), Node :: node()) -> [GroupName :: term()].
  group_names(Scope, Node) ->
      ordsets:to_list(group_names_ordset(Scope, Node)).
  %% @doc Builds the ordered set of group names found in the by-name table of `Scope'.
  %% `NodeParam' is either a node name (restrict to that node) or `'_'' (any node).
  %% Raises `{invalid_scope, Scope}' when no table exists for the scope.
  -spec group_names_ordset(Scope :: atom(), Node :: node()) -> ordsets:ordset(GroupName :: term()).
  group_names_ordset(Scope, NodeParam) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              %% one row per {group, pid}, so a group name shows up once per member:
              %% the ordset conversion collapses the duplicates
              MatchHead = {{'$1', '_'}, '_', '_', '_', NodeParam},
              ordsets:from_list(ets:select(Table, [{MatchHead, [], ['$1']}]))
      end.
  %% @doc Sends `Message' to every member of `GroupName' in `Scope' and
  %% returns `{ok, Count}' with the number of recipients.
  -spec publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  publish(Scope, GroupName, Message) ->
      do_publish(members(Scope, GroupName), Message).
  %% @doc Sends `Message' to the members of `GroupName' in `Scope' recorded for
  %% the local node and returns `{ok, Count}' with the number of recipients.
  -spec local_publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  local_publish(Scope, GroupName, Message) ->
      do_publish(local_members(Scope, GroupName), Message).
  %% @doc Sends `Message' to the pid of each `{Pid, Meta}' in `Members'.
  %% Returns `{ok, Count}' where `Count' is the number of members messaged.
  -spec do_publish(Members :: [{Pid :: pid(), Meta :: term()}], Message :: term()) ->
      {ok, RecipientCount :: non_neg_integer()}.
  do_publish(Members, Message) ->
      %% send and count in a single pass over the member list
      SendAndCount = fun({Pid, _Meta}, Sent) ->
          Pid ! Message,
          Sent + 1
      end,
      {ok, lists:foldl(SendAndCount, 0, Members)}.
  %% @doc Sends `Message' to every member of `GroupName' in `Scope' and gathers the answers.
  %%
  %% One linked helper process per member runs `multi_call_and_receive/5', which
  %% reports back to the calling process. The result is `{Replies, BadReplies}':
  %% members that answered, and members reported as a bad reply by their helper
  %% (no answer within `Timeout' milliseconds, or the member went down).
  -spec multi_call(Scope :: atom(), GroupName :: term(), Message :: term(), Timeout :: non_neg_integer()) ->
      {
          Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
          BadReplies :: [{pid(), Meta :: term()}]
      }.
  multi_call(Scope, GroupName, Message, Timeout) ->
      Collector = self(),
      Members = members(Scope, GroupName),
      StartHelper = fun({Pid, Meta}) ->
          spawn_link(?MODULE, multi_call_and_receive, [Collector, Pid, Meta, Message, Timeout])
      end,
      lists:foreach(StartHelper, Members),
      %% index members by pid so each helper outcome can be paired with its meta
      PendingByPid = orddict:from_list(Members),
      collect_replies(PendingByPid).
  %% @doc Answers a multi call: sends `{syn_multi_call_reply, Ref, Reply}' to the
  %% pid carried in the `{Ref, CallerPid}' handle and returns the message sent.
  -spec multi_call_reply({reference(), ClientPid :: pid()}, Reply :: term()) ->
      {syn_multi_call_reply, reference(), Reply :: term()}.
  multi_call_reply({Ref, CallerPid}, Reply) ->
      ReplyMsg = {syn_multi_call_reply, Ref, Reply},
      erlang:send(CallerPid, ReplyMsg).
  198. %% ===================================================================
  199. %% Callbacks
  200. %% ===================================================================
  201. %% ----------------------------------------------------------------------------------------------------------
  202. %% Init
  203. %% ----------------------------------------------------------------------------------------------------------
  %% @doc Scope initialisation callback.
  %% Purges entries belonging to remote nodes, rebuilds the monitors for local
  %% entries, and returns an empty map as the handler state.
  -spec init(#state{}) -> {ok, HandlerState :: term()}.
  init(#state{} = State) ->
      Scope = State#state.scope,
      TableByName = State#state.table_by_name,
      TableByPid = State#state.table_by_pid,
      %% purge remote & rebuild
      purge_pg_for_remote_nodes(Scope, TableByName, TableByPid),
      rebuild_monitors(Scope, TableByName, TableByPid),
      {ok, #{}}.
  216. %% ----------------------------------------------------------------------------------------------------------
  217. %% Call messages
  218. %% ----------------------------------------------------------------------------------------------------------
  %% @doc Synchronous requests handled by the scope process.
  %%
  %% `join_on_node': adds `Pid' to `GroupName' if it is alive. A pid not yet in the
  %% group is monitored (reusing an existing monitor when the pid is already in
  %% another group) and joined with `on_process_joined'. A pid already in the
  %% group with the very same `Meta' is a no-op. A pid already in the group with
  %% a different `Meta' is updated with `on_group_process_updated'.
  %% Replies `{{error, not_alive}, undefined}' when the pid is not alive.
  %%
  %% `leave_on_node': removes `Pid' from `GroupName', demonitoring it when this was
  %% its last group, invokes `on_process_left' and broadcasts the leave.
  %% Replies `{{error, not_in_group}, undefined}' when there is no such entry.
  %%
  %% Any other request is logged and answered with `undefined'.
  -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
      {reply, Reply :: term(), #state{}} |
      {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), Reply :: term(), #state{}} |
      {stop, Reason :: term(), #state{}}.
  handle_call({'3.0', join_on_node, RequesterNode, GroupName, Pid, Meta}, _From, #state{
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      case is_process_alive(Pid) of
          true ->
              %% entry layout: {{GroupName, Pid}, Meta, Time, MRef, Node}
              case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
                  undefined ->
                      %% add
                      MRef = case find_monitor_for_pid(Pid, TableByPid) of
                          undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, create
                          MRef0 -> MRef0
                      end,
                      do_join_on_node(GroupName, Pid, Meta, MRef, normal, RequesterNode, on_process_joined, State);
                  {{_, _}, Meta, _, _, _} ->
                      %% re-joined with same meta: the stored meta (2nd element, not part of the key)
                      %% equals the requested one, so there is nothing to write, notify or broadcast.
                      %% The reply must be a proper gen_server return; join/4 maps {ok, _} to ok.
                      {reply, {ok, noop}, State};
                  {{_, _}, _, _, MRef, _} ->
                      %% already a member but with different meta: update, keeping the existing monitor
                      do_join_on_node(GroupName, Pid, Meta, MRef, normal, RequesterNode, on_group_process_updated, State)
              end;
          false ->
              {reply, {{error, not_alive}, undefined}, State}
      end;
  handle_call({'3.0', leave_on_node, RequesterNode, GroupName, Pid}, _From, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
          undefined ->
              {reply, {{error, not_in_group}, undefined}, State};
          {{_, _}, Meta, _, _, _} ->
              %% is this the last group process is in?
              %% (must run before the entry is deleted: it counts the entries still present)
              maybe_demonitor(Pid, TableByPid),
              %% remove from table
              remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
              %% callback
              syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
              %% broadcast
              syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, normal}, [RequesterNode], State),
              %% return
              {reply, {ok, {Meta, TableByPid}}, State}
      end;
  handle_call(Request, From, #state{scope = Scope} = State) ->
      error_logger:warning_msg("SYN[~s<~s>] Received from ~p an unknown call message: ~p", [?MODULE, Scope, From, Request]),
      {reply, undefined, State}.
  272. %% ----------------------------------------------------------------------------------------------------------
  273. %% Info messages
  274. %% ----------------------------------------------------------------------------------------------------------
  %% @doc Asynchronous messages handled by the scope process.
  %%
  %% `sync_join': applies a join for `Pid' when its node is a key of the nodes
  %% map; otherwise the message is dropped.
  %% `sync_leave': removes the entry if present and invokes `on_process_left'.
  %% `DOWN': removes the pid from every group it is in, invoking `on_process_left'
  %% and broadcasting a leave for each group.
  %% Anything else is logged and ignored.
  -spec handle_info(Info :: timeout | term(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), #state{}}.
  handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) ->
      IsKnownNode = maps:is_key(node(Pid), NodesMap),
      if
          IsKnownNode ->
              handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State);
          true ->
              %% ignore, race condition
              ok
      end,
      {noreply, State};
  handle_info({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, #state{} = State) ->
      #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid} = State,
      case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
          undefined ->
              %% not in table, nothing to do
              ok;
          _Entry ->
              remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
              syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason])
      end,
      {noreply, State};
  handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{} = State) ->
      #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid} = State,
      %% by-pid entries are keyed {Pid, GroupName}
      LeaveGroup = fun({{_Pid, GroupName}, Meta, _, _, _}) ->
          %% remove from table
          remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
          %% callback
          syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason]),
          %% broadcast
          syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, State)
      end,
      case find_pg_entries_by_pid(Pid, TableByPid) of
          [] ->
              error_logger:warning_msg(
                  "SYN[~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
                  [?MODULE, Scope, Pid, Reason]
              );
          Entries ->
              lists:foreach(LeaveGroup, Entries)
      end,
      {noreply, State};
  handle_info(Info, #state{scope = Scope} = State) ->
      error_logger:warning_msg("SYN[~s<~s>] Received an unknown info message: ~p", [?MODULE, Scope, Info]),
      {noreply, State}.
  331. %% ----------------------------------------------------------------------------------------------------------
  332. %% Data callbacks
  333. %% ----------------------------------------------------------------------------------------------------------
  %% @doc Data callback: returns `{ok, Tuples}' with the
  %% `{GroupName, Pid, Meta, Time}' tuples recorded for the local node.
  -spec get_local_data(State :: term()) -> {ok, Data :: term()} | undefined.
  get_local_data(#state{table_by_name = TableByName}) ->
      LocalTuples = get_pg_tuples_for_node(node(), TableByName),
      {ok, LocalTuples}.
  %% @doc Data callback: applies every `{GroupName, Pid, Meta, Time}' tuple received
  %% from a remote node through `handle_pg_sync/6', using
  %% `{syn_remote_scope_node_up, Scope, node(Pid)}' as the reason.
  -spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
  save_remote_data(PgTuplesOfRemoteNode, #state{scope = Scope} = State) ->
      SyncTuple = fun({GroupName, Pid, Meta, Time}) ->
          Reason = {syn_remote_scope_node_up, Scope, node(Pid)},
          handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State)
      end,
      lists:foreach(SyncTuple, PgTuplesOfRemoteNode).
  %% @doc Data callback: drops every local entry recorded for `Node' by
  %% delegating to `purge_pg_for_remote_node/4'.
  -spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
  purge_local_data_for_node(Node, #state{} = State) ->
      purge_pg_for_remote_node(
          State#state.scope,
          Node,
          State#state.table_by_name,
          State#state.table_by_pid
      ).
  350. %% ===================================================================
  351. %% Internal
  352. %% ===================================================================
  %% @doc Re-creates the monitors for every entry recorded for the local node,
  %% starting from an empty pid-to-monitor map.
  -spec rebuild_monitors(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> ok.
  rebuild_monitors(Scope, TableByName, TableByPid) ->
      LocalTuples = get_pg_tuples_for_node(node(), TableByName),
      NoMonitorsYet = #{},
      do_rebuild_monitors(LocalTuples, NoMonitorsYet, Scope, TableByName, TableByPid).
  %% @doc Walks the local pg tuples and rebuilds each entry.
  %%
  %% Every entry is first removed. If its pid is alive it is re-inserted with a
  %% monitor reference: a single monitor is created per pid and shared by all
  %% the groups of that pid (tracked in the pid-to-reference map). If the pid is
  %% not alive the entry stays removed and `on_process_left' is invoked with
  %% reason `undefined'.
  -spec do_rebuild_monitors(
      [syn_pg_tuple()],
      #{pid() => reference()},
      Scope :: atom(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> ok.
  do_rebuild_monitors([], _, _, _, _) ->
      ok;
  do_rebuild_monitors([{GroupName, Pid, Meta, Time} | Rest], MRefs, Scope, TableByName, TableByPid) ->
      remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
      NextMRefs = case is_process_alive(Pid) of
          true ->
              {MRef, MRefs1} = case maps:find(Pid, MRefs) of
                  {ok, Known} ->
                      %% pid already monitored for another group: reuse the reference
                      {Known, MRefs};
                  error ->
                      Fresh = erlang:monitor(process, Pid),
                      {Fresh, maps:put(Pid, Fresh, MRefs)}
              end,
              add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
              MRefs1;
          _ ->
              %% process died meanwhile, callback
              %% the remote callbacks will have been called when the scope process crash triggered them
              syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, undefined]),
              MRefs
      end,
      %% loop
      do_rebuild_monitors(Rest, NextMRefs, Scope, TableByName, TableByPid).
  %% @doc Performs a join (or meta update) on the node owning the pid.
  %%
  %% Stamps the entry with `erlang:system_time()', writes it to both tables,
  %% invokes `CallbackMethod' on the event handler, broadcasts a `sync_join'
  %% (passing `[RequesterNode]' as the extra argument of `syn_gen_scope:broadcast/3')
  %% and builds the gen_server reply carrying the callback name, the timestamp
  %% and the table names.
  -spec do_join_on_node(
      GroupName :: term(),
      Pid :: pid(),
      Meta :: term(),
      MRef :: reference() | undefined,
      Reason :: term(),
      RequesterNode :: node(),
      CallbackMethod :: atom(),
      #state{}
  ) ->
      {
          reply,
          {ok, {
              CallbackMethod :: atom(),
              Time :: non_neg_integer(),
              TableByName :: atom(),
              TableByPid :: atom()
          }},
          #state{}
      }.
  do_join_on_node(GroupName, Pid, Meta, MRef, Reason, RequesterNode, CallbackMethod, #state{} = State) ->
      #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid} = State,
      Now = erlang:system_time(),
      %% add to local table
      add_to_local_table(GroupName, Pid, Meta, Now, MRef, TableByName, TableByPid),
      %% callback
      syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, Reason]),
      %% broadcast
      SyncMsg = {'3.0', sync_join, GroupName, Pid, Meta, Now, Reason},
      syn_gen_scope:broadcast(SyncMsg, [RequesterNode], State),
      %% return
      Reply = {ok, {CallbackMethod, Now, TableByName, TableByPid}},
      {reply, Reply, State}.
  %% @doc Returns `{GroupName, Pid, Meta, Time}' for every entry of `TableByName'
  %% whose stored node equals `Node'.
  -spec get_pg_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_pg_tuple()].
  get_pg_tuples_for_node(Node, TableByName) ->
      %% entry layout: {{GroupName, Pid}, Meta, Time, MRef, Node}
      MatchHead = {{'$1', '$2'}, '$3', '$4', '_', Node},
      Result = [{{'$1', '$2', '$3', '$4'}}],
      ets:select(TableByName, [{MatchHead, [], Result}]).
  %% @doc Returns the monitor reference stored with the first by-pid entry found
  %% for `Pid', or `undefined' when the table holds no entry for it.
  -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
  find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
      %% a pid may be in several groups; a select limited to 1 result is enough
      %% since only one stored reference is needed
      MatchSpec = [{{{Pid, '_'}, '_', '_', '$5', '_'}, [], ['$5']}],
      case ets:select(TableByPid, MatchSpec, 1) of
          '$end_of_table' -> undefined;
          {[MRef], _Continuation} -> MRef
      end.
  %% @doc Looks up the entry stored under key `{GroupName, Pid}' in `TableByName'.
  %% Returns the entry tuple, or `undefined' when there is none.
  -spec find_pg_entry_by_name_and_pid(GroupName :: term(), Pid :: pid(), TableByName :: atom()) ->
      Entry :: syn_pg_entry() | undefined.
  find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
      Key = {GroupName, Pid},
      case ets:lookup(TableByName, Key) of
          [Entry] -> Entry;
          [] -> undefined
      end.
  %% @doc Returns every full entry of `TableByPid' whose key is `{Pid, _}',
  %% i.e. one entry per group the pid is in.
  -spec find_pg_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupEntries :: [syn_pg_entry()].
  find_pg_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
      %% entry layout: {{Pid, GroupName}, Meta, Time, MRef, Node}
      AnyGroupOfPid = {{Pid, '_'}, '_', '_', '_', '_'},
      ets:match_object(TableByPid, AnyGroupOfPid).
  %% @doc Demonitors `Pid' only when exactly one by-pid entry exists for it and
  %% that entry holds a monitor reference. Always returns `ok'.
  -spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
  maybe_demonitor(Pid, TableByPid) ->
      %% ask for at most 2 results: getting back a single one means the pid is in
      %% no other group, so the monitor is no longer needed once this entry goes
      MatchSpec = [{{{Pid, '_'}, '_', '_', '$5', '_'}, [], ['$5']}],
      case ets:select(TableByPid, MatchSpec, 2) of
          {[OnlyMRef], _Continuation} when is_reference(OnlyMRef) ->
              %% flush also drops a DOWN message that may already be in the mailbox
              erlang:demonitor(OnlyMRef, [flush]),
              ok;
          _ ->
              ok
      end.
  %% @doc Writes one group membership to both tables.
  %% The by-name table is keyed `{GroupName, Pid}', the by-pid table `{Pid, GroupName}';
  %% both rows carry `Meta', `Time', `MRef' and `node(Pid)'.
  %% Returns the result of the second `ets:insert/2'.
  -spec add_to_local_table(
      GroupName :: term(),
      Pid :: pid(),
      Meta :: term(),
      Time :: integer(),
      MRef :: undefined | reference(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
      PidNode = node(Pid),
      ByNameRow = {{GroupName, Pid}, Meta, Time, MRef, PidNode},
      ByPidRow = {{Pid, GroupName}, Meta, Time, MRef, PidNode},
      ets:insert(TableByName, ByNameRow),
      ets:insert(TableByPid, ByPidRow).
  %% @doc Deletes one group membership from both tables: key `{GroupName, Pid}'
  %% in the by-name table and key `{Pid, GroupName}' in the by-pid table.
  -spec remove_from_local_table(
      Name :: term(),
      Pid :: pid(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
      ByNameKey = {GroupName, Pid},
      ByPidKey = {Pid, GroupName},
      true = ets:delete(TableByName, ByNameKey),
      true = ets:delete(TableByPid, ByPidKey).
  %% @doc Purges the entries of every node other than the local one that
  %% appears in `TableByName'. Each such node is purged exactly once.
  -spec purge_pg_for_remote_nodes(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> any().
  purge_pg_for_remote_nodes(Scope, TableByName, TableByPid) ->
      LocalNode = node(),
      %% one result per entry, so node names repeat: dedupe before purging
      RemoteNodeOfEntry = {
          {{'_', '_'}, '_', '_', '_', '$6'},
          [{'=/=', '$6', LocalNode}],
          ['$6']
      },
      RemoteNodes = lists:usort(ets:select(TableByName, [RemoteNodeOfEntry])),
      PurgeNode = fun(RemoteNode, _Previous) ->
          purge_pg_for_remote_node(Scope, RemoteNode, TableByName, TableByPid)
      end,
      lists:foldl(PurgeNode, undefined, RemoteNodes).
  %% @doc Removes every entry recorded for the remote `Node' from both tables.
  %% `on_process_left' is invoked for each removed membership, with reason
  %% `{syn_remote_scope_node_down, Scope, Node}', from a separately spawned process.
  %% The guard makes a call with the local node fail with `function_clause'.
  -spec purge_pg_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
  purge_pg_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
      %% snapshot the memberships before deleting them, the callbacks need the data
      PgTuples = get_pg_tuples_for_node(Node, TableByName),
      Reason = {syn_remote_scope_node_down, Scope, Node},
      NotifyLeft = fun({GroupName, Pid, Meta, _Time}) ->
          syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason])
      end,
      %% run the callbacks in a separate process to free the scope process
      spawn(fun() -> lists:foreach(NotifyLeft, PgTuples) end),
      EntriesOfNode = {{'_', '_'}, '_', '_', '_', Node},
      ets:match_delete(TableByName, EntriesOfNode),
      ets:match_delete(TableByPid, EntriesOfNode).
  %% @doc Applies a membership received from another node.
  %%
  %% No local entry: insert it and invoke `on_process_joined'.
  %% Local entry older than `Time': overwrite it, and invoke
  %% `on_group_process_updated' only when the meta actually changed.
  %% Otherwise (incoming data is not newer): do nothing.
  %% Entries are written with an `undefined' monitor reference.
  -spec handle_pg_sync(
      GroupName :: term(),
      Pid :: pid(),
      Meta :: term(),
      Time :: non_neg_integer(),
      Reason :: term(),
      #state{}
  ) -> any().
  handle_pg_sync(GroupName, Pid, Meta, Time, Reason, #state{} = State) ->
      #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid} = State,
      case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
          {{GroupName, Pid}, TableMeta, TableTime, _, _} when Time > TableTime ->
              %% maybe updated meta or time only
              add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
              %% callback (call only if meta update)
              if
                  TableMeta =/= Meta ->
                      syn_event_handler:call_event_handler(
                          on_group_process_updated, [Scope, GroupName, Pid, Meta, Reason]
                      );
                  true ->
                      ok
              end;
          undefined ->
              %% new
              add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
              syn_event_handler:call_event_handler(on_process_joined, [Scope, GroupName, Pid, Meta, Reason]);
          _ ->
              %% race condition: incoming data is older, ignore
              ok
      end.
  %% @doc Body of the helper process spawned per member by `multi_call/4'.
  %%
  %% Monitors `Pid', sends it `{syn_multi_call, Message, {Ref, self()}, Meta}' and
  %% waits for the matching `{syn_multi_call_reply, Ref, Reply}'. Reports
  %% `{syn_reply, Pid, Reply}' to `CollectorPid' on an answer, or
  %% `{syn_bad_reply, Pid}' when `Pid' goes down or `Timeout' milliseconds elapse.
  -spec multi_call_and_receive(
      CollectorPid :: pid(),
      Pid :: pid(),
      Meta :: term(),
      Message :: term(),
      Timeout :: non_neg_integer()
  ) -> any().
  multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
      MRef = monitor(process, Pid),
      %% the fresh reference correlates the reply with this very request
      Ref = make_ref(),
      Pid ! {syn_multi_call, Message, {Ref, self()}, Meta},
      Outcome =
          receive
              {syn_multi_call_reply, Ref, Reply} -> {syn_reply, Pid, Reply};
              {'DOWN', MRef, _, _, _} -> {syn_bad_reply, Pid}
          after Timeout ->
              {syn_bad_reply, Pid}
          end,
      CollectorPid ! Outcome.
  %% @doc Waits for one outcome per member of `MembersOD' (an orddict from pid
  %% to meta) and returns `{Replies, BadReplies}', starting from empty accumulators.
  -spec collect_replies(MembersOD :: orddict:orddict({pid(), Meta :: term()})) ->
      {
          Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
          BadReplies :: [{pid(), Meta :: term()}]
      }.
  collect_replies(MembersOD) ->
      NoReplies = [],
      NoBadReplies = [],
      collect_replies(MembersOD, NoReplies, NoBadReplies).
  %% @doc Receive loop gathering the helper outcomes of a multi call.
  %%
  %% Each `{syn_reply, Pid, Reply}' or `{syn_bad_reply, Pid}' message removes
  %% `Pid' from the pending orddict and is accumulated (with the pid's meta) in
  %% the matching list. The loop ends when no member is pending. Both result
  %% lists are built by prepending, so they are in reverse arrival order.
  -spec collect_replies(
      MembersOD :: orddict:orddict({pid(), Meta :: term()}),
      Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
      BadReplies :: [{pid(), Meta :: term()}]
  ) ->
      {
          Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
          BadReplies :: [{pid(), Meta :: term()}]
      }.
  collect_replies([], Replies, BadReplies) ->
      {Replies, BadReplies};
  collect_replies(Pending, Replies, BadReplies) ->
      receive
          {syn_bad_reply, Pid} ->
              {Meta, StillPending} = orddict:take(Pid, Pending),
              collect_replies(StillPending, Replies, [{Pid, Meta} | BadReplies]);
          {syn_reply, Pid, Reply} ->
              {Meta, StillPending} = orddict:take(Pid, Pending),
              collect_replies(StillPending, [{{Pid, Meta}, Reply} | Replies], BadReplies)
      end.