syn_pg.erl 28 KB


  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2022 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. %% @private
  27. -module(syn_pg).
  28. -behaviour(syn_gen_scope).
  29. %% API
  30. -export([start_link/1]).
  31. -export([subcluster_nodes/1]).
  32. -export([join/4]).
  33. -export([leave/3]).
  34. -export([members/2]).
  35. -export([member/3]).
  36. -export([is_member/3]).
  37. -export([update_member/4]).
  38. -export([local_members/2]).
  39. -export([is_local_member/3]).
  40. -export([count/1, count/2]).
  41. -export([group_names/1, group_names/2]).
  42. -export([publish/3]).
  43. -export([local_publish/3]).
  44. -export([multi_call/4, multi_call_reply/2]).
  45. %% syn_gen_scope callbacks
  46. -export([
  47. init/1,
  48. handle_call/3,
  49. handle_info/2,
  50. save_remote_data/2,
  51. get_local_data/1,
  52. purge_local_data_for_node/2
  53. ]).
  54. %% internal
  55. -export([multi_call_and_receive/5]).
  56. %% macros
  57. -define(MODULE_LOG_NAME, pg).
  58. %% tests
  59. -ifdef(TEST).
  60. -export([add_to_local_table/7]).
  61. -endif.
  62. %% includes
  63. -include("syn.hrl").
  64. %% ===================================================================
  65. %% API
  66. %% ===================================================================
  67. -spec start_link(Scope :: atom()) ->
  68. {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
  69. start_link(Scope) when is_atom(Scope) ->
  70. syn_gen_scope:start_link(?MODULE, ?MODULE_LOG_NAME, Scope).
  71. -spec subcluster_nodes(Scope :: atom()) -> [node()].
  72. subcluster_nodes(Scope) ->
  73. syn_gen_scope:subcluster_nodes(?MODULE, Scope).
  74. -spec members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
  75. members(Scope, GroupName) ->
  76. do_get_members(Scope, GroupName, undefined, undefined).
  77. -spec member(Scope :: atom(), GroupName :: term(), pid()) -> {pid(), Meta :: term()} | undefined.
  78. member(Scope, GroupName, Pid) ->
  79. case do_get_members(Scope, GroupName, Pid, undefined) of
  80. [] -> undefined;
  81. [Member] -> Member
  82. end.
  83. -spec is_member(Scope :: atom(), GroupName :: term(), pid()) -> boolean().
  84. is_member(Scope, GroupName, Pid) ->
  85. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  86. undefined ->
  87. error({invalid_scope, Scope});
  88. TableByName ->
  89. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  90. undefined -> false;
  91. _ -> true
  92. end
  93. end.
  94. -spec local_members(Scope :: atom(), GroupName :: term()) -> [{pid(), Meta :: term()}].
  95. local_members(Scope, GroupName) ->
  96. do_get_members(Scope, GroupName, undefined, node()).
  97. -spec do_get_members(Scope :: atom(), GroupName :: term(), pid() | undefined, Node :: node() | undefined) ->
  98. [{pid(), Meta :: term()}].
  99. do_get_members(Scope, GroupName, Pid, Node) ->
  100. PidParam = case Pid of
  101. undefined -> '$2';
  102. _ -> Pid
  103. end,
  104. NodeParam = case Node of
  105. undefined -> '_';
  106. _ -> Node
  107. end,
  108. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  109. undefined ->
  110. error({invalid_scope, Scope});
  111. TableByName ->
  112. ets:select(TableByName, [{
  113. {{GroupName, PidParam}, '$3', '_', '_', NodeParam},
  114. [],
  115. [{{PidParam, '$3'}}]
  116. }])
  117. end.
  118. -spec is_local_member(Scope :: atom(), GroupName :: term(), pid()) -> boolean().
  119. is_local_member(Scope, GroupName, Pid) ->
  120. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  121. undefined ->
  122. error({invalid_scope, Scope});
  123. TableByName ->
  124. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  125. {{_, _}, _, _, _, Node} when Node =:= node() -> true;
  126. _ -> false
  127. end
  128. end.
  129. -spec join(Scope :: atom(), GroupName :: term(), Pid :: pid(), Meta :: term()) -> ok.
  130. join(Scope, GroupName, Pid, Meta) ->
  131. case join_or_update(Scope, GroupName, Pid, Meta) of
  132. {ok, _} -> ok;
  133. {error, Reason} -> {error, Reason}
  134. end.
  135. -spec update_member(Scope :: atom(), GroupName :: term(), Pid :: pid(), Fun :: function()) ->
  136. {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
  137. update_member(Scope, GroupName, Pid, Fun) when is_function(Fun) ->
  138. join_or_update(Scope, GroupName, Pid, Fun).
  139. -spec join_or_update(Scope :: atom(), GroupName :: term(), Pid :: pid(), MetaOrFun :: term() | function()) ->
  140. {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
  141. join_or_update(Scope, GroupName, Pid, MetaOrFun) ->
  142. case syn_backbone:is_strict_mode() of
  143. true when Pid =/= self() ->
  144. {error, not_self};
  145. _ ->
  146. Node = node(Pid),
  147. case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', join_or_update_on_node, node(), GroupName, Pid, MetaOrFun}) of
  148. {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}} when Node =/= node() ->
  149. %% update table on caller node immediately so that subsequent calls have an updated pg
  150. add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
  151. %% callback
  152. syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, normal]),
  153. %% return
  154. {ok, {Pid, Meta}};
  155. {ok, {_, Meta, _, _, _}} ->
  156. {ok, {Pid, Meta}};
  157. {noop, Meta} ->
  158. {ok, {Pid, Meta}};
  159. {error, Reason} ->
  160. {error, Reason};
  161. {raise, Class, Reason, Stacktrace} ->
  162. erlang:raise(Class, Reason, Stacktrace)
  163. end
  164. end.
  165. -spec leave(Scope :: atom(), GroupName :: term(), pid()) -> ok | {error, Reason :: term()}.
  166. leave(Scope, GroupName, Pid) ->
  167. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  168. undefined ->
  169. error({invalid_scope, Scope});
  170. TableByName ->
  171. Node = node(Pid),
  172. case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', leave_on_node, node(), GroupName, Pid}) of
  173. {ok, {Meta, TableByPid}} when Node =/= node() ->
  174. %% remove table on caller node immediately so that subsequent calls have an updated registry
  175. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  176. %% callback
  177. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
  178. %% return
  179. ok;
  180. {ok, _} ->
  181. ok;
  182. {error, Reason} ->
  183. {error, Reason}
  184. end
  185. end.
  186. -spec count(Scope :: atom()) -> non_neg_integer().
  187. count(Scope) ->
  188. Set = group_names_ordset(Scope, '_'),
  189. ordsets:size(Set).
  190. -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
  191. count(Scope, Node) ->
  192. Set = group_names_ordset(Scope, Node),
  193. ordsets:size(Set).
  194. -spec group_names(Scope :: atom()) -> [GroupName :: term()].
  195. group_names(Scope) ->
  196. Set = group_names_ordset(Scope, '_'),
  197. ordsets:to_list(Set).
  198. -spec group_names(Scope :: atom(), Node :: node()) -> [GroupName :: term()].
  199. group_names(Scope, Node) ->
  200. Set = group_names_ordset(Scope, Node),
  201. ordsets:to_list(Set).
  202. -spec group_names_ordset(Scope :: atom(), Node :: node()) -> ordsets:ordset(GroupName :: term()).
  203. group_names_ordset(Scope, NodeParam) ->
  204. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  205. undefined ->
  206. error({invalid_scope, Scope});
  207. TableByName ->
  208. DuplicatedGroups = ets:select(TableByName, [{
  209. {{'$1', '_'}, '_', '_', '_', NodeParam},
  210. [],
  211. ['$1']
  212. }]),
  213. ordsets:from_list(DuplicatedGroups)
  214. end.
  215. -spec publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  216. publish(Scope, GroupName, Message) ->
  217. Members = members(Scope, GroupName),
  218. do_publish(Members, Message).
  219. -spec local_publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  220. local_publish(Scope, GroupName, Message) ->
  221. Members = local_members(Scope, GroupName),
  222. do_publish(Members, Message).
  223. -spec do_publish(Members :: [{Pid :: pid(), Meta :: term()}], Message :: term()) ->
  224. {ok, RecipientCount :: non_neg_integer()}.
  225. do_publish(Members, Message) ->
  226. lists:foreach(fun({Pid, _Meta}) ->
  227. Pid ! Message
  228. end, Members),
  229. {ok, length(Members)}.
  230. -spec multi_call(Scope :: atom(), GroupName :: term(), Message :: term(), Timeout :: non_neg_integer()) ->
  231. {
  232. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  233. BadReplies :: [{pid(), Meta :: term()}]
  234. }.
  235. multi_call(Scope, GroupName, Message, Timeout) ->
  236. Self = self(),
  237. Members = members(Scope, GroupName),
  238. lists:foreach(fun({Pid, Meta}) ->
  239. spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Meta, Message, Timeout])
  240. end, Members),
  241. collect_replies(orddict:from_list(Members)).
  242. -spec multi_call_reply({CallerPid :: term(), reference()}, Reply :: term()) -> any().
  243. multi_call_reply({CallerPid, Ref}, Reply) ->
  244. CallerPid ! {syn_multi_call_reply, Ref, Reply}.
  245. %% ===================================================================
  246. %% Callbacks
  247. %% ===================================================================
  248. %% ----------------------------------------------------------------------------------------------------------
  249. %% Init
  250. %% ----------------------------------------------------------------------------------------------------------
  251. -spec init(#state{}) -> {ok, HandlerState :: term()}.
  252. init(#state{
  253. scope = Scope,
  254. table_by_name = TableByName,
  255. table_by_pid = TableByPid
  256. }) ->
  257. %% purge remote & rebuild
  258. purge_pg_for_remote_nodes(Scope, TableByName, TableByPid),
  259. rebuild_monitors(Scope, TableByName, TableByPid),
  260. %% init
  261. HandlerState = #{},
  262. {ok, HandlerState}.
  263. %% ----------------------------------------------------------------------------------------------------------
  264. %% Call messages
  265. %% ----------------------------------------------------------------------------------------------------------
  266. -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
  267. {reply, Reply :: term(), #state{}} |
  268. {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
  269. {noreply, #state{}} |
  270. {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
  271. {stop, Reason :: term(), Reply :: term(), #state{}} |
  272. {stop, Reason :: term(), #state{}}.
  273. handle_call({'3.0', join_or_update_on_node, RequesterNode, GroupName, Pid, MetaOrFun}, _From, #state{
  274. table_by_name = TableByName,
  275. table_by_pid = TableByPid
  276. } = State) ->
  277. case is_process_alive(Pid) of
  278. true ->
  279. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  280. undefined when is_function(MetaOrFun) ->
  281. {reply, {error, undefined}, State};
  282. undefined ->
  283. %% add
  284. MRef = case find_monitor_for_pid(Pid, TableByPid) of
  285. undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, create
  286. MRef0 -> MRef0
  287. end,
  288. do_join_on_node(GroupName, Pid, MetaOrFun, MRef, normal, RequesterNode, on_process_joined, State);
  289. {{_, _}, TableMeta, _, MRef, _} when is_function(MetaOrFun) ->
  290. %% update with fun
  291. try MetaOrFun(TableMeta) of
  292. Meta when Meta =:= TableMeta ->
  293. {reply, {noop, TableMeta}, State};
  294. Meta ->
  295. do_join_on_node(GroupName, Pid, Meta, MRef, normal, RequesterNode, on_group_process_updated, State)
  296. catch Class:Reason:Stacktrace ->
  297. error_logger:error_msg(
  298. "SYN[~s] Error ~p:~p in pg update function: ~p",
  299. [node(), Class, Reason, Stacktrace]
  300. ),
  301. {reply, {raise, Class, Reason, Stacktrace}, State}
  302. end;
  303. {{_, _}, MetaOrFun, _, _, _} ->
  304. %% re-joined with same meta
  305. {reply, {noop, MetaOrFun}, State};
  306. {{_, _}, _, _, MRef, _} ->
  307. %% re-joined with different meta
  308. do_join_on_node(GroupName, Pid, MetaOrFun, MRef, normal, RequesterNode, on_group_process_updated, State)
  309. end;
  310. false ->
  311. {reply, {error, not_alive}, State}
  312. end;
  313. handle_call({'3.0', leave_on_node, RequesterNode, GroupName, Pid}, _From, #state{
  314. scope = Scope,
  315. table_by_name = TableByName,
  316. table_by_pid = TableByPid
  317. } = State) ->
  318. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  319. undefined ->
  320. {reply, {error, not_in_group}, State};
  321. {{_, _}, Meta, _, _, _} ->
  322. %% is this the last group process is in?
  323. maybe_demonitor(Pid, TableByPid),
  324. %% remove from table
  325. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  326. %% callback
  327. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
  328. %% broadcast
  329. syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, normal}, [RequesterNode], State),
  330. %% return
  331. {reply, {ok, {Meta, TableByPid}}, State}
  332. end;
  333. handle_call(Request, From, #state{scope = Scope} = State) ->
  334. error_logger:warning_msg("SYN[~s|~s<~s>] Received from ~p an unknown call message: ~p",
  335. [node(), ?MODULE_LOG_NAME, Scope, From, Request]
  336. ),
  337. {reply, undefined, State}.
  338. %% ----------------------------------------------------------------------------------------------------------
  339. %% Info messages
  340. %% ----------------------------------------------------------------------------------------------------------
  341. -spec handle_info(Info :: timeout | term(), #state{}) ->
  342. {noreply, #state{}} |
  343. {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
  344. {stop, Reason :: term(), #state{}}.
  345. handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) ->
  346. case maps:is_key(node(Pid), NodesMap) of
  347. true ->
  348. handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State);
  349. false ->
  350. %% ignore, race condition
  351. ok
  352. end,
  353. {noreply, State};
  354. handle_info({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, #state{
  355. scope = Scope,
  356. table_by_name = TableByName,
  357. table_by_pid = TableByPid
  358. } = State) ->
  359. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  360. undefined ->
  361. %% not in table, nothing to do
  362. ok;
  363. _ ->
  364. %% remove from table
  365. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  366. %% callback
  367. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason])
  368. end,
  369. %% return
  370. {noreply, State};
  371. handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
  372. scope = Scope,
  373. table_by_name = TableByName,
  374. table_by_pid = TableByPid
  375. } = State) ->
  376. case find_pg_entries_by_pid(Pid, TableByPid) of
  377. [] ->
  378. error_logger:warning_msg(
  379. "SYN[~s|~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
  380. [node(), ?MODULE_LOG_NAME, Scope, Pid, Reason]
  381. );
  382. Entries ->
  383. lists:foreach(fun({{_Pid, GroupName}, Meta, _, _, _}) ->
  384. %% remove from table
  385. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  386. %% callback
  387. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason]),
  388. %% broadcast
  389. syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, State)
  390. end, Entries)
  391. end,
  392. %% return
  393. {noreply, State};
  394. handle_info(Info, #state{scope = Scope} = State) ->
  395. error_logger:warning_msg("SYN[~s|~s<~s>] Received an unknown info message: ~p", [node(), ?MODULE_LOG_NAME, Scope, Info]),
  396. {noreply, State}.
  397. %% ----------------------------------------------------------------------------------------------------------
  398. %% Data callbacks
  399. %% ----------------------------------------------------------------------------------------------------------
  400. -spec get_local_data(State :: term()) -> {ok, Data :: term()} | undefined.
  401. get_local_data(#state{table_by_name = TableByName}) ->
  402. {ok, get_pg_tuples_for_node(node(), TableByName)}.
  403. -spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
  404. save_remote_data(PgTuplesOfRemoteNode, #state{scope = Scope} = State) ->
  405. %% insert tuples
  406. lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
  407. handle_pg_sync(GroupName, Pid, Meta, Time, {syn_remote_scope_node_up, Scope, node(Pid)}, State)
  408. end, PgTuplesOfRemoteNode).
  409. -spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
  410. purge_local_data_for_node(Node, #state{
  411. scope = Scope,
  412. table_by_name = TableByName,
  413. table_by_pid = TableByPid
  414. }) ->
  415. purge_pg_for_remote_node(Scope, Node, TableByName, TableByPid).
  416. %% ===================================================================
  417. %% Internal
  418. %% ===================================================================
  419. -spec rebuild_monitors(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> ok.
  420. rebuild_monitors(Scope, TableByName, TableByPid) ->
  421. PgTuples = get_pg_tuples_for_node(node(), TableByName),
  422. do_rebuild_monitors(PgTuples, #{}, Scope, TableByName, TableByPid).
  423. -spec do_rebuild_monitors(
  424. [syn_pg_tuple()],
  425. #{pid() => reference()},
  426. Scope :: atom(),
  427. TableByName :: atom(),
  428. TableByPid :: atom()
  429. ) -> ok.
  430. do_rebuild_monitors([], _, _, _, _) -> ok;
  431. do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, Scope, TableByName, TableByPid) ->
  432. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  433. case is_process_alive(Pid) of
  434. true ->
  435. case maps:find(Pid, NewMRefs) of
  436. error ->
  437. MRef = erlang:monitor(process, Pid),
  438. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
  439. do_rebuild_monitors(T, maps:put(Pid, MRef, NewMRefs), Scope, TableByName, TableByPid);
  440. {ok, MRef} ->
  441. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
  442. do_rebuild_monitors(T, NewMRefs, Scope, TableByName, TableByPid)
  443. end;
  444. _ ->
  445. %% process died meanwhile, callback
  446. %% the remote callbacks will have been called when the scope process crash triggered them
  447. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, undefined]),
  448. %% loop
  449. do_rebuild_monitors(T, NewMRefs, Scope, TableByName, TableByPid)
  450. end.
  451. -spec do_join_on_node(
  452. GroupName :: term(),
  453. Pid :: pid(),
  454. Meta :: term(),
  455. MRef :: reference() | undefined,
  456. Reason :: term(),
  457. RequesterNode :: node(),
  458. CallbackMethod :: atom(),
  459. #state{}
  460. ) ->
  461. {
  462. reply,
  463. {ok, {
  464. CallbackMethod :: atom(),
  465. Meta :: term(),
  466. Time :: non_neg_integer(),
  467. TableByName :: atom(),
  468. TableByPid :: atom()
  469. }},
  470. #state{}
  471. }.
  472. do_join_on_node(GroupName, Pid, Meta, MRef, Reason, RequesterNode, CallbackMethod, #state{
  473. scope = Scope,
  474. table_by_name = TableByName,
  475. table_by_pid = TableByPid
  476. } = State) ->
  477. Time = erlang:system_time(),
  478. %% add to local table
  479. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
  480. %% callback
  481. syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, Reason]),
  482. %% broadcast
  483. syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, [RequesterNode], State),
  484. %% return
  485. {reply, {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}}, State}.
  486. -spec get_pg_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_pg_tuple()].
  487. get_pg_tuples_for_node(Node, TableByName) ->
  488. ets:select(TableByName, [{
  489. {{'$1', '$2'}, '$3', '$4', '_', Node},
  490. [],
  491. [{{'$1', '$2', '$3', '$4'}}]
  492. }]).
  493. -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
  494. find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
  495. %% we use select instead of lookup to limit the results and thus cover the case
  496. %% when a process is in multiple groups
  497. case ets:select(TableByPid, [{
  498. {{Pid, '_'}, '_', '_', '$5', '_'},
  499. [],
  500. ['$5']
  501. }], 1) of
  502. {[MRef], _} -> MRef;
  503. '$end_of_table' -> undefined
  504. end.
  505. -spec find_pg_entry_by_name_and_pid(GroupName :: term(), Pid :: pid(), TableByName :: atom()) ->
  506. Entry :: syn_pg_entry() | undefined.
  507. find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
  508. case ets:lookup(TableByName, {GroupName, Pid}) of
  509. [] -> undefined;
  510. [Entry] -> Entry
  511. end.
  512. -spec find_pg_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupEntries :: [syn_pg_entry()].
  513. find_pg_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
  514. ets:select(TableByPid, [{
  515. {{Pid, '_'}, '_', '_', '_', '_'},
  516. [],
  517. ['$_']
  518. }]).
  519. -spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
  520. maybe_demonitor(Pid, TableByPid) ->
  521. %% select 2: if only 1 is returned it means that no other aliases exist for the Pid
  522. %% we use select instead of lookup to limit the results and thus cover the case
  523. %% when a process is in multiple groups
  524. case ets:select(TableByPid, [{
  525. {{Pid, '_'}, '_', '_', '$5', '_'},
  526. [],
  527. ['$5']
  528. }], 2) of
  529. {[MRef], _} when is_reference(MRef) ->
  530. %% no other aliases, demonitor
  531. erlang:demonitor(MRef, [flush]),
  532. ok;
  533. _ ->
  534. ok
  535. end.
  536. -spec add_to_local_table(
  537. GroupName :: term(),
  538. Pid :: pid(),
  539. Meta :: term(),
  540. Time :: integer(),
  541. MRef :: undefined | reference(),
  542. TableByName :: atom(),
  543. TableByPid :: atom()
  544. ) -> true.
  545. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
  546. %% insert
  547. ets:insert(TableByName, {{GroupName, Pid}, Meta, Time, MRef, node(Pid)}),
  548. ets:insert(TableByPid, {{Pid, GroupName}, Meta, Time, MRef, node(Pid)}).
  549. -spec remove_from_local_table(
  550. Name :: term(),
  551. Pid :: pid(),
  552. TableByName :: atom(),
  553. TableByPid :: atom()
  554. ) -> true.
  555. remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
  556. true = ets:delete(TableByName, {GroupName, Pid}),
  557. true = ets:delete(TableByPid, {Pid, GroupName}).
  558. -spec purge_pg_for_remote_nodes(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> any().
  559. purge_pg_for_remote_nodes(Scope, TableByName, TableByPid) ->
  560. LocalNode = node(),
  561. DuplicatedRemoteNodes = ets:select(TableByName, [{
  562. {{'_', '_'}, '_', '_', '_', '$6'},
  563. [{'=/=', '$6', LocalNode}],
  564. ['$6']
  565. }]),
  566. RemoteNodes = ordsets:from_list(DuplicatedRemoteNodes),
  567. ordsets:fold(fun(RemoteNode, _) ->
  568. purge_pg_for_remote_node(Scope, RemoteNode, TableByName, TableByPid)
  569. end, undefined, RemoteNodes).
  570. -spec purge_pg_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
  571. purge_pg_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
  572. %% loop elements for callback in a separate process to free scope process
  573. PgTuples = get_pg_tuples_for_node(Node, TableByName),
  574. lists:foreach(fun({GroupName, Pid, Meta, _Time}) ->
  575. syn_event_handler:call_event_handler(on_process_left,
  576. [Scope, GroupName, Pid, Meta, {syn_remote_scope_node_down, Scope, Node}]
  577. )
  578. end, PgTuples),
  579. ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
  580. ets:match_delete(TableByPid, {{'_', '_'}, '_', '_', '_', Node}).
  581. -spec handle_pg_sync(
  582. GroupName :: term(),
  583. Pid :: pid(),
  584. Meta :: term(),
  585. Time :: non_neg_integer(),
  586. Reason :: term(),
  587. #state{}
  588. ) -> any().
  589. handle_pg_sync(GroupName, Pid, Meta, Time, Reason, #state{
  590. scope = Scope,
  591. table_by_name = TableByName,
  592. table_by_pid = TableByPid
  593. }) ->
  594. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  595. undefined ->
  596. %% new
  597. add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
  598. %% callback
  599. syn_event_handler:call_event_handler(on_process_joined, [Scope, GroupName, Pid, Meta, Reason]);
  600. {{GroupName, Pid}, TableMeta, TableTime, _, _} when Time > TableTime ->
  601. %% maybe updated meta or time only
  602. add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
  603. %% callback (call only if meta update)
  604. case TableMeta =/= Meta of
  605. true ->
  606. syn_event_handler:call_event_handler(on_group_process_updated, [Scope, GroupName, Pid, Meta, Reason]);
  607. _ -> ok
  608. end;
  609. _ ->
  610. %% race condition: incoming data is older, ignore
  611. ok
  612. end.
  613. -spec multi_call_and_receive(
  614. CollectorPid :: pid(),
  615. Pid :: pid(),
  616. Meta :: term(),
  617. Message :: term(),
  618. Timeout :: non_neg_integer()
  619. ) -> any().
  620. multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
  621. %% monitor
  622. MRef = monitor(process, Pid),
  623. %% send
  624. Ref = make_ref(),
  625. From = {self(), Ref},
  626. Pid ! {syn_multi_call, Message, From, Meta},
  627. %% wait for reply
  628. receive
  629. {syn_multi_call_reply, Ref, Reply} ->
  630. erlang:demonitor(MRef, [flush]),
  631. CollectorPid ! {syn_reply, Pid, Reply};
  632. {'DOWN', MRef, _, _, _} ->
  633. CollectorPid ! {syn_bad_reply, Pid}
  634. after Timeout ->
  635. erlang:demonitor(MRef, [flush]),
  636. CollectorPid ! {syn_bad_reply, Pid}
  637. end.
  638. -spec collect_replies(MembersOD :: orddict:orddict({pid(), Meta :: term()})) ->
  639. {
  640. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  641. BadReplies :: [{pid(), Meta :: term()}]
  642. }.
  643. collect_replies(MembersOD) ->
  644. collect_replies(MembersOD, [], []).
  645. -spec collect_replies(
  646. MembersOD :: orddict:orddict({pid(), Meta :: term()}),
  647. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  648. BadReplies :: [{pid(), Meta :: term()}]
  649. ) ->
  650. {
  651. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  652. BadReplies :: [{pid(), Meta :: term()}]
  653. }.
  654. collect_replies([], Replies, BadReplies) -> {Replies, BadReplies};
  655. collect_replies(MembersOD, Replies, BadReplies) ->
  656. receive
  657. {syn_reply, Pid, Reply} ->
  658. {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
  659. collect_replies(MembersOD1, [{{Pid, Meta}, Reply} | Replies], BadReplies);
  660. {syn_bad_reply, Pid} ->
  661. {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
  662. collect_replies(MembersOD1, Replies, [{Pid, Meta} | BadReplies])
  663. end.