syn_pg.erl 28 KB


  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2022 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. %% @private
  27. -module(syn_pg).
  28. -behaviour(syn_gen_scope).
  29. %% API
  30. -export([start_link/1]).
  31. -export([subcluster_nodes/1]).
  32. -export([join/4]).
  33. -export([leave/3]).
  34. -export([members/2]).
  35. -export([member/3]).
  36. -export([is_member/3]).
  37. -export([update_member/4]).
  38. -export([local_members/2]).
  39. -export([is_local_member/3]).
  40. -export([count/1, count/2]).
  41. -export([group_names/1, group_names/2]).
  42. -export([publish/3]).
  43. -export([local_publish/3]).
  44. -export([multi_call/4, multi_call_reply/2]).
  45. %% syn_gen_scope callbacks
  46. -export([
  47. init/1,
  48. handle_call/3,
  49. handle_info/2,
  50. save_remote_data/2,
  51. get_local_data/1,
  52. purge_local_data_for_node/2
  53. ]).
  54. %% internal
  55. -export([multi_call_and_receive/5]).
  56. %% macros
  57. -define(MODULE_LOG_NAME, pg).
  58. %% tests
  59. -ifdef(TEST).
  60. -export([add_to_local_table/7]).
  61. -endif.
  62. %% includes
  63. -include("syn.hrl").
  64. %% ===================================================================
  65. %% API
  66. %% ===================================================================
  %% Starts the pg scope process for `Scope'.
  %% Delegates to syn_gen_scope:start_link/3, handing over this module as the
  %% callback handler together with ?MODULE_LOG_NAME.
  -spec start_link(Scope :: atom()) ->
      {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
  start_link(Scope) when is_atom(Scope) ->
      Handler = ?MODULE,
      LogName = ?MODULE_LOG_NAME,
      syn_gen_scope:start_link(Handler, LogName, Scope).
  %% Returns whatever syn_gen_scope:subcluster_nodes/2 reports for this handler
  %% module and the given `Scope'.
  -spec subcluster_nodes(Scope :: atom()) -> [node()].
  subcluster_nodes(Scope) ->
      Handler = ?MODULE,
      syn_gen_scope:subcluster_nodes(Handler, Scope).
  %% Lists every `{Pid, Meta}' registered under `GroupName' in `Scope',
  %% without restricting the lookup to a specific pid or node.
  -spec members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
  members(Scope, GroupName) ->
      AnyPid = undefined,
      AnyNode = undefined,
      do_get_members(Scope, GroupName, AnyPid, AnyNode).
  %% Looks up a single process in a group.
  %% Returns `{Pid, Meta}' when `Pid' is a member of `GroupName', `undefined' otherwise.
  -spec member(Scope :: atom(), GroupName :: term(), pid()) -> {pid(), Meta :: term()} | undefined.
  member(Scope, GroupName, Pid) ->
      Found = do_get_members(Scope, GroupName, Pid, undefined),
      case Found of
          [Member] -> Member;
          [] -> undefined
      end.
  %% Tells whether `Pid' is currently a member of `GroupName' in `Scope'.
  %% Raises `error({invalid_scope, Scope})' when no by-name table exists for the scope.
  -spec is_member(Scope :: atom(), GroupName :: term(), pid()) -> boolean().
  is_member(Scope, GroupName, Pid) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              %% any entry at all means the pid is in the group
              find_pg_entry_by_name_and_pid(GroupName, Pid, Table) =/= undefined
      end.
  %% Lists the `{Pid, Meta}' members of `GroupName' whose entry is tagged with the local node.
  -spec local_members(Scope :: atom(), GroupName :: term()) -> [{pid(), Meta :: term()}].
  local_members(Scope, GroupName) ->
      LocalNode = node(),
      do_get_members(Scope, GroupName, undefined, LocalNode).
  %% Shared member lookup behind members/2, member/3 and local_members/2.
  %% `Pid' and `Node' act as optional filters: `undefined' means "do not filter".
  %% Raises `error({invalid_scope, Scope})' when no by-name table exists for the scope.
  -spec do_get_members(Scope :: atom(), GroupName :: term(), pid() | undefined, Node :: node() | undefined) ->
      [{pid(), Meta :: term()}].
  do_get_members(Scope, GroupName, Pid, Node) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              %% with no pid filter, bind the pid to '$2' so it can be returned in the result
              PidPattern = if
                  Pid =:= undefined -> '$2';
                  true -> Pid
              end,
              NodePattern = if
                  Node =:= undefined -> '_';
                  true -> Node
              end,
              %% entries are {{GroupName, Pid}, Meta, Time, MRef, Node}
              Head = {{GroupName, PidPattern}, '$3', '_', '_', NodePattern},
              Result = [{{PidPattern, '$3'}}],
              ets:select(Table, [{Head, [], Result}])
      end.
  %% Tells whether `Pid' is a member of `GroupName' AND its entry is tagged with the local node.
  %% Raises `error({invalid_scope, Scope})' when no by-name table exists for the scope.
  -spec is_local_member(Scope :: atom(), GroupName :: term(), pid()) -> boolean().
  is_local_member(Scope, GroupName, Pid) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              LocalNode = node(),
              %% matching on the already-bound LocalNode is an exact-equality check
              case find_pg_entry_by_name_and_pid(GroupName, Pid, Table) of
                  {{_, _}, _, _, _, LocalNode} -> true;
                  _ -> false
              end
      end.
  %% Adds `Pid' to `GroupName' in `Scope' with the given `Meta'.
  %%
  %% Returns `ok' on success (including a re-join, with the same or with different meta)
  %% and `{error, Reason}' when join_or_update/4 reports a failure, e.g. `not_self'
  %% in strict mode or `not_alive' when the process is dead.
  %%
  %% NOTE: `Meta' is forwarded unchanged to join_or_update/4, where a fun is treated
  %% as an update function rather than as metadata.
  -spec join(Scope :: atom(), GroupName :: term(), Pid :: pid(), Meta :: term()) ->
      ok | {error, Reason :: term()}.
  join(Scope, GroupName, Pid, Meta) ->
      case join_or_update(Scope, GroupName, Pid, Meta) of
          {ok, _} -> ok;
          {error, Reason} -> {error, Reason}
      end.
  %% Updates the metadata of an existing group member by applying `Fun' to its current meta.
  %% Returns `{ok, {Pid, NewMeta}}' or `{error, Reason}' as produced by join_or_update/4.
  -spec update_member(Scope :: atom(), GroupName :: term(), Pid :: pid(), Fun :: function()) ->
      {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
  update_member(Scope, GroupName, Pid, Fun) when is_function(Fun) ->
      UpdateFun = Fun,
      join_or_update(Scope, GroupName, Pid, UpdateFun).
  %% Common implementation of join/4 and update_member/4.
  %%
  %% The request is sent to the scope process on the node that owns `Pid'. When that node
  %% is not the caller's node, the caller's tables are updated right away and the event
  %% handler callback is invoked locally.
  %% In strict mode only the process itself may join or update: otherwise `{error, not_self}'.
  -spec join_or_update(Scope :: atom(), GroupName :: term(), Pid :: pid(), MetaOrFun :: term() | function()) ->
      {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
  join_or_update(Scope, GroupName, Pid, MetaOrFun) ->
      RejectedByStrictMode = syn_backbone:is_strict_mode() =:= true andalso Pid =/= self(),
      case RejectedByStrictMode of
          true ->
              {error, not_self};
          false ->
              OwnerNode = node(Pid),
              Request = {'3.0', join_or_update_on_node, node(), GroupName, Pid, MetaOrFun},
              case syn_gen_scope:call(?MODULE, OwnerNode, Scope, Request) of
                  {ok, {Callback, Meta, Time, TableByName, TableByPid}} when OwnerNode =/= node() ->
                      %% remote owner: mirror the change locally now, so that the caller's
                      %% subsequent reads already see the updated group
                      add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
                      syn_event_handler:call_event_handler(Callback, [Scope, GroupName, Pid, Meta, normal]),
                      {ok, {Pid, Meta}};
                  {ok, {_, Meta, _, _, _}} ->
                      %% local owner: the scope process has already done everything
                      {ok, {Pid, Meta}};
                  {noop, Meta} ->
                      %% nothing changed
                      {ok, {Pid, Meta}};
                  {{error, Reason}, _} ->
                      {error, Reason}
              end
      end.
  %% Removes `Pid' from `GroupName' in `Scope'.
  %%
  %% The request is sent to the scope process on the node that owns `Pid'. When that node
  %% is not the caller's node, the entry is also removed from the caller's tables right
  %% away and the `on_process_left' callback is invoked locally.
  %% Raises `error({invalid_scope, Scope})' when no by-name table exists for the scope.
  -spec leave(Scope :: atom(), GroupName :: term(), pid()) -> ok | {error, Reason :: term()}.
  leave(Scope, GroupName, Pid) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          TableByName ->
              OwnerNode = node(Pid),
              Request = {'3.0', leave_on_node, node(), GroupName, Pid},
              case syn_gen_scope:call(?MODULE, OwnerNode, Scope, Request) of
                  {ok, {Meta, TableByPid}} when OwnerNode =/= node() ->
                      %% remote owner: drop the local copy now, so that the caller's
                      %% subsequent reads already see the updated group
                      remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
                      syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
                      ok;
                  {Response, _} ->
                      %% local owner, or an error: pass the response through
                      Response
              end
      end.
  %% Number of distinct group names known in `Scope', across all nodes.
  -spec count(Scope :: atom()) -> non_neg_integer().
  count(Scope) ->
      ordsets:size(group_names_ordset(Scope, '_')).

  %% Number of distinct group names in `Scope' that have at least one member on `Node'.
  -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
  count(Scope, Node) ->
      ordsets:size(group_names_ordset(Scope, Node)).
  %% Distinct group names known in `Scope', across all nodes.
  -spec group_names(Scope :: atom()) -> [GroupName :: term()].
  group_names(Scope) ->
      ordsets:to_list(group_names_ordset(Scope, '_')).

  %% Distinct group names in `Scope' that have at least one member on `Node'.
  -spec group_names(Scope :: atom(), Node :: node()) -> [GroupName :: term()].
  group_names(Scope, Node) ->
      ordsets:to_list(group_names_ordset(Scope, Node)).
  %% Collects the distinct group names of `Scope' as an ordset.
  %% `NodeParam' is placed verbatim in the node position of the match pattern:
  %% a node name restricts the result to that node, '_' matches every node.
  %% Raises `error({invalid_scope, Scope})' when no by-name table exists for the scope.
  -spec group_names_ordset(Scope :: atom(), Node :: node()) -> ordsets:ordset(GroupName :: term()).
  group_names_ordset(Scope, NodeParam) ->
      case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              Head = {{'$1', '_'}, '_', '_', '_', NodeParam},
              %% one row per member, so a group name shows up once per member
              NamesWithDuplicates = ets:select(Table, [{Head, [], ['$1']}]),
              ordsets:from_list(NamesWithDuplicates)
      end.
  %% Sends `Message' to every member of `GroupName' in `Scope'.
  %% Returns `{ok, N}' where N is the number of members the message was sent to.
  -spec publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  publish(Scope, GroupName, Message) ->
      do_publish(members(Scope, GroupName), Message).
  %% Sends `Message' to the members of `GroupName' in `Scope' that live on the local node.
  %% Returns `{ok, N}' where N is the number of members the message was sent to.
  -spec local_publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  local_publish(Scope, GroupName, Message) ->
      do_publish(local_members(Scope, GroupName), Message).
  %% Sends `Message' to the pid of each `{Pid, Meta}' in `Members', in list order,
  %% and returns `{ok, N}' with N the number of members.
  -spec do_publish(Members :: [{Pid :: pid(), Meta :: term()}], Message :: term()) ->
      {ok, RecipientCount :: non_neg_integer()}.
  do_publish(Members, Message) ->
      %% send and count in a single pass
      SendAndCount = fun({Pid, _Meta}, Sent) ->
          Pid ! Message,
          Sent + 1
      end,
      {ok, lists:foldl(SendAndCount, 0, Members)}.
  %% Calls every member of `GroupName' in `Scope' with `Message' and gathers the answers.
  %%
  %% One linked helper process is spawned per member (running multi_call_and_receive/5);
  %% each helper reports back either a reply or a bad reply, and this function blocks
  %% until every member has been accounted for.
  %% Returns `{Replies, BadReplies}'.
  -spec multi_call(Scope :: atom(), GroupName :: term(), Message :: term(), Timeout :: non_neg_integer()) ->
      {
          Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
          BadReplies :: [{pid(), Meta :: term()}]
      }.
  multi_call(Scope, GroupName, Message, Timeout) ->
      Collector = self(),
      Members = members(Scope, GroupName),
      SpawnCaller = fun({Pid, Meta}) ->
          spawn_link(?MODULE, multi_call_and_receive, [Collector, Pid, Meta, Message, Timeout])
      end,
      lists:foreach(SpawnCaller, Members),
      %% key the pending members by pid so each incoming answer can be ticked off
      Pending = orddict:from_list(Members),
      collect_replies(Pending).
  %% Answers a multi_call: sends `Reply', tagged with the request reference, back to
  %% the process that issued the call. The first argument is the `From' tuple a member
  %% receives inside the `{syn_multi_call, Message, From, Meta}' message.
  -spec multi_call_reply({CallerPid :: term(), reference()}, Reply :: term()) -> any().
  multi_call_reply({CallerPid, Ref}, Reply) ->
      erlang:send(CallerPid, {syn_multi_call_reply, Ref, Reply}).
  241. %% ===================================================================
  242. %% Callbacks
  243. %% ===================================================================
  244. %% ----------------------------------------------------------------------------------------------------------
  245. %% Init
  246. %% ----------------------------------------------------------------------------------------------------------
  %% syn_gen_scope callback, invoked with the scope state.
  %% Drops every entry that belongs to a remote node, re-creates the monitors for the
  %% local entries, and starts with an empty map as handler state.
  -spec init(#state{}) -> {ok, HandlerState :: term()}.
  init(#state{scope = Scope, table_by_name = ByName, table_by_pid = ByPid}) ->
      %% remote data first, then local monitors
      purge_pg_for_remote_nodes(Scope, ByName, ByPid),
      rebuild_monitors(Scope, ByName, ByPid),
      {ok, #{}}.
  259. %% ----------------------------------------------------------------------------------------------------------
  260. %% Call messages
  261. %% ----------------------------------------------------------------------------------------------------------
  %% syn_gen_scope callback for synchronous requests.
  %%
  %% Handles two '3.0' protocol requests, both replying with a 2-tuple whose first
  %% element is `ok', `noop' or `{error, Reason}':
  %%   * join_or_update_on_node: add a process to a group, or change its meta
  %%     (directly, or through an update fun applied to the current meta);
  %%   * leave_on_node: remove a process from a group.
  %% Any other request is logged and answered with `undefined'.
  -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
      {reply, Reply :: term(), #state{}} |
      {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), Reply :: term(), #state{}} |
      {stop, Reason :: term(), #state{}}.
  handle_call({'3.0', join_or_update_on_node, RequesterNode, GroupName, Pid, MetaOrFun}, _From, #state{
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      case is_process_alive(Pid) of
          false ->
              {reply, {{error, not_alive}, undefined}, State};
          true ->
              Entry = find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName),
              case {Entry, is_function(MetaOrFun)} of
                  {undefined, true} ->
                      %% an update fun needs an existing membership to work on
                      {reply, {{error, undefined}, undefined}, State};
                  {undefined, false} ->
                      %% first join of this group: reuse the monitor the pid may already
                      %% have through another group, otherwise create one
                      MRef = case find_monitor_for_pid(Pid, TableByPid) of
                          undefined -> erlang:monitor(process, Pid);
                          ExistingMRef -> ExistingMRef
                      end,
                      do_join_on_node(GroupName, Pid, MetaOrFun, MRef, normal, RequesterNode, on_process_joined, State);
                  {{{_, _}, CurrentMeta, _, MRef, _}, true} ->
                      %% update through a fun; only the fun call itself is protected
                      try MetaOrFun(CurrentMeta) of
                          CurrentMeta ->
                              %% fun returned the very same meta
                              {reply, {noop, CurrentMeta}, State};
                          NewMeta ->
                              do_join_on_node(GroupName, Pid, NewMeta, MRef, normal, RequesterNode, on_group_process_updated, State)
                      catch Class:Reason:Stacktrace ->
                          error_logger:error_msg(
                              "SYN[~s] Error ~p:~p in pg update function: ~p",
                              [node(), Class, Reason, Stacktrace]
                          ),
                          {reply, {{error, {update_fun, {Reason, Stacktrace}}}, undefined}, State}
                      end;
                  {{{_, _}, MetaOrFun, _, _, _}, false} ->
                      %% re-join with identical meta
                      {reply, {noop, MetaOrFun}, State};
                  {{{_, _}, _, _, MRef, _}, false} ->
                      %% re-join with different meta
                      do_join_on_node(GroupName, Pid, MetaOrFun, MRef, normal, RequesterNode, on_group_process_updated, State)
              end
      end;
  handle_call({'3.0', leave_on_node, RequesterNode, GroupName, Pid}, _From, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
          {{_, _}, Meta, _, _, _} ->
              %% must run BEFORE the removal: it demonitors only when this is the
              %% single entry left for the pid
              maybe_demonitor(Pid, TableByPid),
              remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
              syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
              %% RequesterNode is passed as the second argument of broadcast/3
              syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, normal}, [RequesterNode], State),
              {reply, {ok, {Meta, TableByPid}}, State};
          undefined ->
              {reply, {{error, not_in_group}, undefined}, State}
      end;
  handle_call(Request, From, #state{scope = Scope} = State) ->
      LogArgs = [node(), ?MODULE_LOG_NAME, Scope, From, Request],
      error_logger:warning_msg("SYN[~s|~s<~s>] Received from ~p an unknown call message: ~p", LogArgs),
      {reply, undefined, State}.
  334. %% ----------------------------------------------------------------------------------------------------------
  335. %% Info messages
  336. %% ----------------------------------------------------------------------------------------------------------
  %% syn_gen_scope callback for asynchronous messages.
  %%
  %%   * '3.0' sync_join: applied only when the pid's node is present in `nodes_map';
  %%   * '3.0' sync_leave: the entry, if present, is removed and `on_process_left' is called;
  %%   * 'DOWN' for a monitored process: all its group entries are removed, with a
  %%     callback and a sync_leave broadcast for each of them;
  %%   * anything else is logged and ignored.
  -spec handle_info(Info :: timeout | term(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), #state{}}.
  handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) ->
      %% a join for a node missing from the map is a race condition and is ignored
      _ = maps:is_key(node(Pid), NodesMap)
          andalso handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State),
      {noreply, State};
  handle_info({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
          undefined ->
              %% not in the table: nothing to remove
              ok;
          _Entry ->
              remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
              syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason])
      end,
      {noreply, State};
  handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      %% remove one membership, then notify locally and remotely
      DropEntry = fun({{_Pid, GroupName}, Meta, _, _, _}) ->
          remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
          syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason]),
          syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, State)
      end,
      case find_pg_entries_by_pid(Pid, TableByPid) of
          [] ->
              error_logger:warning_msg(
                  "SYN[~s|~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
                  [node(), ?MODULE_LOG_NAME, Scope, Pid, Reason]
              );
          Entries ->
              lists:foreach(DropEntry, Entries)
      end,
      {noreply, State};
  handle_info(Info, #state{scope = Scope} = State) ->
      LogArgs = [node(), ?MODULE_LOG_NAME, Scope, Info],
      error_logger:warning_msg("SYN[~s|~s<~s>] Received an unknown info message: ~p", LogArgs),
      {noreply, State}.
  393. %% ----------------------------------------------------------------------------------------------------------
  394. %% Data callbacks
  395. %% ----------------------------------------------------------------------------------------------------------
  %% syn_gen_scope callback: returns `{ok, Tuples}' with one
  %% `{GroupName, Pid, Meta, Time}' tuple per entry tagged with the local node.
  -spec get_local_data(State :: term()) -> {ok, Data :: term()} | undefined.
  get_local_data(#state{table_by_name = TableByName}) ->
      LocalTuples = get_pg_tuples_for_node(node(), TableByName),
      {ok, LocalTuples}.
  %% syn_gen_scope callback: merges the pg tuples received from a remote node into the
  %% local tables, one handle_pg_sync/6 call per tuple. The reason handed to the
  %% callbacks is `{syn_remote_scope_node_up, Scope, NodeOfPid}'.
  -spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
  save_remote_data(PgTuplesOfRemoteNode, #state{scope = Scope} = State) ->
      SyncTuple = fun({GroupName, Pid, Meta, Time}) ->
          Reason = {syn_remote_scope_node_up, Scope, node(Pid)},
          handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State)
      end,
      lists:foreach(SyncTuple, PgTuplesOfRemoteNode).
  %% syn_gen_scope callback: forgets everything known about `Node' by delegating
  %% to purge_pg_for_remote_node/4 with the scope's tables.
  -spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
  purge_local_data_for_node(Node, #state{scope = Scope, table_by_name = ByName, table_by_pid = ByPid}) ->
      purge_pg_for_remote_node(Scope, Node, ByName, ByPid).
  412. %% ===================================================================
  413. %% Internal
  414. %% ===================================================================
  %% Re-creates the process monitors for every entry tagged with the local node.
  %% Called from init/1.
  -spec rebuild_monitors(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> ok.
  rebuild_monitors(Scope, TableByName, TableByPid) ->
      LocalTuples = get_pg_tuples_for_node(node(), TableByName),
      NoMonitorsYet = #{},
      do_rebuild_monitors(LocalTuples, NoMonitorsYet, Scope, TableByName, TableByPid).
  %% Walks the given pg tuples in order. Each entry is first removed from the tables, then:
  %%   * if the process is alive it is re-inserted with a monitor reference; a pid that
  %%     appears in several groups is monitored once and the reference is shared
  %%     (`NewMRefs' maps pid to the reference created during this walk);
  %%   * otherwise the entry stays removed and `on_process_left' is called with
  %%     reason `undefined'.
  -spec do_rebuild_monitors(
      [syn_pg_tuple()],
      #{pid() => reference()},
      Scope :: atom(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> ok.
  do_rebuild_monitors(PgTuples, NewMRefs, Scope, TableByName, TableByPid) ->
      Rebuild = fun({GroupName, Pid, Meta, Time}, MRefs) ->
          remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
          case is_process_alive(Pid) of
              true ->
                  {MRef, MRefs1} = case MRefs of
                      #{Pid := KnownMRef} ->
                          {KnownMRef, MRefs};
                      #{} ->
                          FreshMRef = erlang:monitor(process, Pid),
                          {FreshMRef, MRefs#{Pid => FreshMRef}}
                  end,
                  add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
                  MRefs1;
              _ ->
                  %% process died meanwhile, callback
                  %% the remote callbacks will have been called when the scope process crash triggered them
                  syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, undefined]),
                  MRefs
          end
      end,
      _ = lists:foldl(Rebuild, NewMRefs, PgTuples),
      ok.
  %% Performs a join (or meta update) on the node that owns the process:
  %% timestamps it, writes the entry to both tables, invokes the given event handler
  %% callback and broadcasts a sync_join message. `RequesterNode' is passed as the
  %% second argument of syn_gen_scope:broadcast/3.
  %% Returns the `{reply, ...}' tuple that handle_call/3 hands back to the caller.
  -spec do_join_on_node(
      GroupName :: term(),
      Pid :: pid(),
      Meta :: term(),
      MRef :: reference() | undefined,
      Reason :: term(),
      RequesterNode :: node(),
      CallbackMethod :: atom(),
      #state{}
  ) ->
      {
          reply,
          {ok, {
              CallbackMethod :: atom(),
              Meta :: term(),
              Time :: non_neg_integer(),
              TableByName :: atom(),
              TableByPid :: atom()
          }},
          #state{}
      }.
  do_join_on_node(GroupName, Pid, Meta, MRef, Reason, RequesterNode, CallbackMethod, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      Now = erlang:system_time(),
      add_to_local_table(GroupName, Pid, Meta, Now, MRef, TableByName, TableByPid),
      CallbackArgs = [Scope, GroupName, Pid, Meta, Reason],
      syn_event_handler:call_event_handler(CallbackMethod, CallbackArgs),
      SyncMessage = {'3.0', sync_join, GroupName, Pid, Meta, Now, Reason},
      syn_gen_scope:broadcast(SyncMessage, [RequesterNode], State),
      Reply = {ok, {CallbackMethod, Meta, Now, TableByName, TableByPid}},
      {reply, Reply, State}.
  %% Returns `{GroupName, Pid, Meta, Time}' for every entry of `TableByName'
  %% whose node field equals `Node'.
  -spec get_pg_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_pg_tuple()].
  get_pg_tuples_for_node(Node, TableByName) ->
      %% entries are {{GroupName, Pid}, Meta, Time, MRef, Node}
      Head = {{'$1', '$2'}, '$3', '$4', '_', Node},
      Result = [{{'$1', '$2', '$3', '$4'}}],
      ets:select(TableByName, [{Head, [], Result}]).
  %% Returns the monitor reference stored with one of `Pid''s entries in `TableByPid',
  %% or `undefined' when the pid has no entry.
  -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
  find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
      %% a pid can be in several groups: a select limited to one result is enough,
      %% whereas a lookup would need the full {Pid, GroupName} key
      Head = {{Pid, '_'}, '_', '_', '$5', '_'},
      case ets:select(TableByPid, [{Head, [], ['$5']}], 1) of
          '$end_of_table' -> undefined;
          {[MRef], _Continuation} -> MRef
      end.
  %% Fetches the entry stored under the key `{GroupName, Pid}' in `TableByName'.
  %% Returns the whole entry tuple, or `undefined' when there is none.
  -spec find_pg_entry_by_name_and_pid(GroupName :: term(), Pid :: pid(), TableByName :: atom()) ->
      Entry :: syn_pg_entry() | undefined.
  find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
      Key = {GroupName, Pid},
      case ets:lookup(TableByName, Key) of
          [Entry] -> Entry;
          [] -> undefined
      end.
  %% Returns every entry of `TableByPid' whose key starts with `Pid',
  %% i.e. one entry per group the process is in.
  -spec find_pg_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupEntries :: [syn_pg_entry()].
  find_pg_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
      Head = {{Pid, '_'}, '_', '_', '_', '_'},
      %% '$_' returns the whole matched object
      ets:select(TableByPid, [{Head, [], ['$_']}]).
  %% Stops monitoring `Pid' when exactly one entry is left for it in `TableByPid',
  %% i.e. when the group being left is its last one. Always returns `ok'.
  %% Must be called before the entry is removed from the table.
  -spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
  maybe_demonitor(Pid, TableByPid) ->
      %% ask for at most 2 results: getting back a single one proves that
      %% the pid is in no other group
      Head = {{Pid, '_'}, '_', '_', '$5', '_'},
      AtMostTwo = ets:select(TableByPid, [{Head, [], ['$5']}], 2),
      case AtMostTwo of
          {[MRef], _} when is_reference(MRef) ->
              %% flush also drops a 'DOWN' message that may already be queued
              erlang:demonitor(MRef, [flush]);
          _ ->
              ok
      end,
      ok.
  %% Writes one membership to both tables. The same payload
  %% `Meta, Time, MRef, node(Pid)' is stored under the key `{GroupName, Pid}' in
  %% `TableByName' and under `{Pid, GroupName}' in `TableByPid'.
  -spec add_to_local_table(
      GroupName :: term(),
      Pid :: pid(),
      Meta :: term(),
      Time :: integer(),
      MRef :: undefined | reference(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
      PidNode = node(Pid),
      ByNameEntry = {{GroupName, Pid}, Meta, Time, MRef, PidNode},
      ByPidEntry = {{Pid, GroupName}, Meta, Time, MRef, PidNode},
      ets:insert(TableByName, ByNameEntry),
      ets:insert(TableByPid, ByPidEntry).
  %% Deletes one membership from both tables: key `{GroupName, Pid}' from
  %% `TableByName' and key `{Pid, GroupName}' from `TableByPid'.
  -spec remove_from_local_table(
      Name :: term(),
      Pid :: pid(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
      ByNameKey = {GroupName, Pid},
      ByPidKey = {Pid, GroupName},
      true = ets:delete(TableByName, ByNameKey),
      true = ets:delete(TableByPid, ByPidKey).
  %% Purges the entries of every node, other than the local one, that appears in
  %% `TableByName'. Each such node is handed once, in sorted order, to
  %% purge_pg_for_remote_node/4.
  -spec purge_pg_for_remote_nodes(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> any().
  purge_pg_for_remote_nodes(Scope, TableByName, TableByPid) ->
      LocalNode = node(),
      %% one row per membership, so a node shows up once per entry it owns
      NodesWithDuplicates = ets:select(TableByName, [{
          {{'_', '_'}, '_', '_', '_', '$6'},
          [{'=/=', '$6', LocalNode}],
          ['$6']
      }]),
      PurgeNode = fun(RemoteNode, _Previous) ->
          purge_pg_for_remote_node(Scope, RemoteNode, TableByName, TableByPid)
      end,
      lists:foldl(PurgeNode, undefined, lists:usort(NodesWithDuplicates)).
  %% Forgets every membership owned by the remote `Node'.
  %% `on_process_left' is invoked for each of its entries with reason
  %% `{syn_remote_scope_node_down, Scope, Node}', then the entries are deleted
  %% from both tables. Only defined for nodes other than the local one.
  -spec purge_pg_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
  purge_pg_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
      %% snapshot the entries first: the callbacks need their data
      Reason = {syn_remote_scope_node_down, Scope, Node},
      NotifyLeft = fun({GroupName, Pid, Meta, _Time}) ->
          syn_event_handler:call_event_handler(on_process_left,
              [Scope, GroupName, Pid, Meta, Reason]
          )
      end,
      lists:foreach(NotifyLeft, get_pg_tuples_for_node(Node, TableByName)),
      %% the same pattern fits both tables: only the node field is constrained
      NodeEntries = {{'_', '_'}, '_', '_', '_', Node},
      ets:match_delete(TableByName, NodeEntries),
      ets:match_delete(TableByPid, NodeEntries).
  %% Applies membership data received from another node.
  %%   * unknown entry: stored, then `on_process_joined' is called;
  %%   * known entry with an older timestamp: overwritten, and
  %%     `on_group_process_updated' is called only if the meta actually changed;
  %%   * otherwise the incoming data is not newer and is ignored.
  %% Entries are always written with an `undefined' monitor reference.
  -spec handle_pg_sync(
      GroupName :: term(),
      Pid :: pid(),
      Meta :: term(),
      Time :: non_neg_integer(),
      Reason :: term(),
      #state{}
  ) -> any().
  handle_pg_sync(GroupName, Pid, Meta, Time, Reason, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  }) ->
      CallbackArgs = [Scope, GroupName, Pid, Meta, Reason],
      case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
          undefined ->
              add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
              syn_event_handler:call_event_handler(on_process_joined, CallbackArgs);
          {{GroupName, Pid}, KnownMeta, KnownTime, _, _} when Time > KnownTime ->
              add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
              case Meta of
                  KnownMeta ->
                      %% only the timestamp moved forward
                      ok;
                  _ ->
                      syn_event_handler:call_event_handler(on_group_process_updated, CallbackArgs)
              end;
          _ ->
              %% race condition: incoming data is older, ignore
              ok
      end.
  %% Body of the helper process spawned by multi_call/4 for one group member.
  %%
  %% Sends `{syn_multi_call, Message, From, Meta}' to `Pid', where `From' is
  %% `{self(), Ref}', then waits for the matching `{syn_multi_call_reply, Ref, Reply}'.
  %% Reports to `CollectorPid':
  %%   * `{syn_reply, Pid, Reply}' when the member answered;
  %%   * `{syn_bad_reply, Pid}' when the member went down or `Timeout' ms elapsed.
  -spec multi_call_and_receive(
      CollectorPid :: pid(),
      Pid :: pid(),
      Meta :: term(),
      Message :: term(),
      Timeout :: non_neg_integer()
  ) -> any().
  multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
      %% monitor before sending, so that a member that is already dead is noticed
      MRef = monitor(process, Pid),
      Ref = make_ref(),
      Pid ! {syn_multi_call, Message, {self(), Ref}, Meta},
      Outcome = receive
          {syn_multi_call_reply, Ref, Reply} ->
              erlang:demonitor(MRef, [flush]),
              {syn_reply, Pid, Reply};
          {'DOWN', MRef, _, _, _} ->
              {syn_bad_reply, Pid}
      after Timeout ->
          erlang:demonitor(MRef, [flush]),
          {syn_bad_reply, Pid}
      end,
      CollectorPid ! Outcome.
  %% Waits for the outcome of every pending multi_call member.
  %% `MembersOD' is an orddict mapping each member pid to its meta.
  %% Returns `{Replies, BadReplies}'.
  -spec collect_replies(MembersOD :: orddict:orddict(pid(), Meta :: term())) ->
      {
          Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
          BadReplies :: [{pid(), Meta :: term()}]
      }.
  collect_replies(MembersOD) ->
      collect_replies(MembersOD, [], []).

  %% Accumulating loop behind collect_replies/1.
  %% Each `{syn_reply, Pid, Reply}' or `{syn_bad_reply, Pid}' message removes `Pid' from
  %% the pending orddict; the loop ends when nothing is pending. Results are prepended,
  %% so both lists come out in reverse order of arrival.
  %% There is no `after' clause: termination relies on one message arriving per pending pid.
  -spec collect_replies(
      MembersOD :: orddict:orddict(pid(), Meta :: term()),
      Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
      BadReplies :: [{pid(), Meta :: term()}]
  ) ->
      {
          Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
          BadReplies :: [{pid(), Meta :: term()}]
      }.
  collect_replies([], Replies, BadReplies) -> {Replies, BadReplies};
  collect_replies(MembersOD, Replies, BadReplies) ->
      receive
          {syn_reply, Pid, Reply} ->
              {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
              collect_replies(MembersOD1, [{{Pid, Meta}, Reply} | Replies], BadReplies);
          {syn_bad_reply, Pid} ->
              {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
              collect_replies(MembersOD1, Replies, [{Pid, Meta} | BadReplies])
      end.