syn_pg.erl 29 KB


  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2022 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THxE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. %% @private
  27. -module(syn_pg).
  28. -behaviour(syn_gen_scope).
  29. %% API
  30. -export([start_link/1]).
  31. -export([subcluster_nodes/1]).
  32. -export([join/4]).
  33. -export([leave/3]).
  34. -export([members/2]).
  35. -export([member/3]).
  36. -export([member_count/2]).
  37. -export([is_member/3]).
  38. -export([update_member/4]).
  39. -export([local_members/2]).
  40. -export([is_local_member/3]).
  41. -export([count/1, count/2]).
  42. -export([group_names/1, group_names/2]).
  43. -export([publish/3]).
  44. -export([local_publish/3]).
  45. -export([multi_call/4, multi_call_reply/2]).
  46. %% syn_gen_scope callbacks
  47. -export([
  48. init/1,
  49. handle_call/3,
  50. handle_info/2,
  51. save_remote_data/2,
  52. get_local_data/1,
  53. purge_local_data_for_node/2
  54. ]).
  55. %% internal
  56. -export([multi_call_and_receive/5]).
  57. %% macros
  58. -define(MODULE_LOG_NAME, pg).
  59. %% tests
  60. -ifdef(TEST).
  61. -export([add_to_local_table/7]).
  62. -endif.
  63. %% includes
  64. -include("syn.hrl").
  65. %% ===================================================================
  66. %% API
  67. %% ===================================================================
  68. -spec start_link(Scope :: atom()) ->
  69. {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
  70. start_link(Scope) when is_atom(Scope) ->
  71. syn_gen_scope:start_link(?MODULE, ?MODULE_LOG_NAME, Scope).
  72. -spec subcluster_nodes(Scope :: atom()) -> [node()].
  73. subcluster_nodes(Scope) ->
  74. syn_gen_scope:subcluster_nodes(?MODULE, Scope).
  75. -spec members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
  76. members(Scope, GroupName) ->
  77. do_get_members(Scope, GroupName, undefined, undefined).
  78. -spec member(Scope :: atom(), GroupName :: term(), pid()) -> {pid(), Meta :: term()} | undefined.
  79. member(Scope, GroupName, Pid) ->
  80. case do_get_members(Scope, GroupName, Pid, undefined) of
  81. [] -> undefined;
  82. [Member] -> Member
  83. end.
  84. -spec is_member(Scope :: atom(), GroupName :: term(), pid()) -> boolean().
  85. is_member(Scope, GroupName, Pid) ->
  86. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  87. undefined ->
  88. error({invalid_scope, Scope});
  89. TableByName ->
  90. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  91. undefined -> false;
  92. _ -> true
  93. end
  94. end.
  95. -spec local_members(Scope :: atom(), GroupName :: term()) -> [{pid(), Meta :: term()}].
  96. local_members(Scope, GroupName) ->
  97. do_get_members(Scope, GroupName, undefined, node()).
  98. -spec member_count(Scope :: atom(), GroupName :: term()) -> non_neg_integer().
  99. member_count(Scope, GroupName) ->
  100. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  101. undefined ->
  102. error({invalid_scope, Scope});
  103. TableByName ->
  104. ets:select_count(TableByName, [{
  105. {{GroupName, '$1'}, '_', '_', '_', '_'},
  106. [],
  107. [true]
  108. }])
  109. end.
  110. -spec do_get_members(Scope :: atom(), GroupName :: term(), pid() | undefined, Node :: node() | undefined) ->
  111. [{pid(), Meta :: term()}].
  112. do_get_members(Scope, GroupName, Pid, Node) ->
  113. PidParam = case Pid of
  114. undefined -> '$2';
  115. _ -> Pid
  116. end,
  117. NodeParam = case Node of
  118. undefined -> '_';
  119. _ -> Node
  120. end,
  121. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  122. undefined ->
  123. error({invalid_scope, Scope});
  124. TableByName ->
  125. ets:select(TableByName, [{
  126. {{GroupName, PidParam}, '$3', '_', '_', NodeParam},
  127. [],
  128. [{{PidParam, '$3'}}]
  129. }])
  130. end.
  131. -spec is_local_member(Scope :: atom(), GroupName :: term(), pid()) -> boolean().
  132. is_local_member(Scope, GroupName, Pid) ->
  133. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  134. undefined ->
  135. error({invalid_scope, Scope});
  136. TableByName ->
  137. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  138. {{_, _}, _, _, _, Node} when Node =:= node() -> true;
  139. _ -> false
  140. end
  141. end.
  142. -spec join(Scope :: atom(), GroupName :: term(), Pid :: pid(), Meta :: term()) -> ok.
  143. join(Scope, GroupName, Pid, Meta) ->
  144. case join_or_update(Scope, GroupName, Pid, Meta) of
  145. {ok, _} -> ok;
  146. {error, Reason} -> {error, Reason}
  147. end.
  148. -spec update_member(Scope :: atom(), GroupName :: term(), Pid :: pid(), Fun :: function()) ->
  149. {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
  150. update_member(Scope, GroupName, Pid, Fun) when is_function(Fun) ->
  151. join_or_update(Scope, GroupName, Pid, Fun).
  152. -spec join_or_update(Scope :: atom(), GroupName :: term(), Pid :: pid(), MetaOrFun :: term() | function()) ->
  153. {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
  154. join_or_update(Scope, GroupName, Pid, MetaOrFun) ->
  155. case syn_backbone:is_strict_mode() of
  156. true when Pid =/= self() ->
  157. {error, not_self};
  158. _ ->
  159. Node = node(Pid),
  160. case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', join_or_update_on_node, node(), GroupName, Pid, MetaOrFun}) of
  161. {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}} when Node =/= node() ->
  162. %% update table on caller node immediately so that subsequent calls have an updated pg
  163. add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
  164. %% callback
  165. syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, normal]),
  166. %% return
  167. {ok, {Pid, Meta}};
  168. {ok, {_, Meta, _, _, _}} ->
  169. {ok, {Pid, Meta}};
  170. {noop, Meta} ->
  171. {ok, {Pid, Meta}};
  172. {error, Reason} ->
  173. {error, Reason};
  174. {raise, Class, Reason, Stacktrace} ->
  175. erlang:raise(Class, Reason, Stacktrace)
  176. end
  177. end.
  178. -spec leave(Scope :: atom(), GroupName :: term(), pid()) -> ok | {error, Reason :: term()}.
  179. leave(Scope, GroupName, Pid) ->
  180. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  181. undefined ->
  182. error({invalid_scope, Scope});
  183. TableByName ->
  184. Node = node(Pid),
  185. case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', leave_on_node, node(), GroupName, Pid}) of
  186. {ok, {Meta, TableByPid}} when Node =/= node() ->
  187. %% remove table on caller node immediately so that subsequent calls have an updated registry
  188. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  189. %% callback
  190. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
  191. %% return
  192. ok;
  193. {ok, _} ->
  194. ok;
  195. {error, Reason} ->
  196. {error, Reason}
  197. end
  198. end.
  199. -spec count(Scope :: atom()) -> non_neg_integer().
  200. count(Scope) ->
  201. Set = group_names_ordset(Scope, '_'),
  202. ordsets:size(Set).
  203. -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
  204. count(Scope, Node) ->
  205. Set = group_names_ordset(Scope, Node),
  206. ordsets:size(Set).
  207. -spec group_names(Scope :: atom()) -> [GroupName :: term()].
  208. group_names(Scope) ->
  209. Set = group_names_ordset(Scope, '_'),
  210. ordsets:to_list(Set).
  211. -spec group_names(Scope :: atom(), Node :: node()) -> [GroupName :: term()].
  212. group_names(Scope, Node) ->
  213. Set = group_names_ordset(Scope, Node),
  214. ordsets:to_list(Set).
  215. -spec group_names_ordset(Scope :: atom(), Node :: node()) -> ordsets:ordset(GroupName :: term()).
  216. group_names_ordset(Scope, NodeParam) ->
  217. case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
  218. undefined ->
  219. error({invalid_scope, Scope});
  220. TableByName ->
  221. DuplicatedGroups = ets:select(TableByName, [{
  222. {{'$1', '_'}, '_', '_', '_', NodeParam},
  223. [],
  224. ['$1']
  225. }]),
  226. ordsets:from_list(DuplicatedGroups)
  227. end.
  228. -spec publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  229. publish(Scope, GroupName, Message) ->
  230. Members = members(Scope, GroupName),
  231. do_publish(Members, Message).
  232. -spec local_publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
  233. local_publish(Scope, GroupName, Message) ->
  234. Members = local_members(Scope, GroupName),
  235. do_publish(Members, Message).
  236. -spec do_publish(Members :: [{Pid :: pid(), Meta :: term()}], Message :: term()) ->
  237. {ok, RecipientCount :: non_neg_integer()}.
  238. do_publish(Members, Message) ->
  239. lists:foreach(fun({Pid, _Meta}) ->
  240. Pid ! Message
  241. end, Members),
  242. {ok, length(Members)}.
  243. -spec multi_call(Scope :: atom(), GroupName :: term(), Message :: term(), Timeout :: non_neg_integer()) ->
  244. {
  245. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  246. BadReplies :: [{pid(), Meta :: term()}]
  247. }.
  248. multi_call(Scope, GroupName, Message, Timeout) ->
  249. Self = self(),
  250. Members = members(Scope, GroupName),
  251. lists:foreach(fun({Pid, Meta}) ->
  252. spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Meta, Message, Timeout])
  253. end, Members),
  254. collect_replies(orddict:from_list(Members)).
  255. -spec multi_call_reply({CallerPid :: term(), reference()}, Reply :: term()) -> any().
  256. multi_call_reply({CallerPid, Ref}, Reply) ->
  257. CallerPid ! {syn_multi_call_reply, Ref, Reply}.
  258. %% ===================================================================
  259. %% Callbacks
  260. %% ===================================================================
  261. %% ----------------------------------------------------------------------------------------------------------
  262. %% Init
  263. %% ----------------------------------------------------------------------------------------------------------
  264. -spec init(#state{}) -> {ok, HandlerState :: term()}.
  265. init(#state{
  266. scope = Scope,
  267. table_by_name = TableByName,
  268. table_by_pid = TableByPid
  269. }) ->
  270. %% purge remote & rebuild
  271. purge_pg_for_remote_nodes(Scope, TableByName, TableByPid),
  272. rebuild_monitors(Scope, TableByName, TableByPid),
  273. %% init
  274. HandlerState = #{},
  275. {ok, HandlerState}.
  276. %% ----------------------------------------------------------------------------------------------------------
  277. %% Call messages
  278. %% ----------------------------------------------------------------------------------------------------------
  279. -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
  280. {reply, Reply :: term(), #state{}} |
  281. {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
  282. {noreply, #state{}} |
  283. {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
  284. {stop, Reason :: term(), Reply :: term(), #state{}} |
  285. {stop, Reason :: term(), #state{}}.
  286. handle_call({'3.0', join_or_update_on_node, RequesterNode, GroupName, Pid, MetaOrFun}, _From, #state{
  287. table_by_name = TableByName,
  288. table_by_pid = TableByPid
  289. } = State) ->
  290. case is_process_alive(Pid) of
  291. true ->
  292. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  293. undefined when is_function(MetaOrFun) ->
  294. {reply, {error, undefined}, State};
  295. undefined ->
  296. %% add
  297. MRef = case find_monitor_for_pid(Pid, TableByPid) of
  298. undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, create
  299. MRef0 -> MRef0
  300. end,
  301. do_join_on_node(GroupName, Pid, MetaOrFun, MRef, normal, RequesterNode, on_process_joined, State);
  302. {{_, _}, TableMeta, _, MRef, _} when is_function(MetaOrFun) ->
  303. %% update with fun
  304. try MetaOrFun(TableMeta) of
  305. Meta when Meta =:= TableMeta ->
  306. {reply, {noop, TableMeta}, State};
  307. Meta ->
  308. do_join_on_node(GroupName, Pid, Meta, MRef, normal, RequesterNode, on_group_process_updated, State)
  309. catch Class:Reason:Stacktrace ->
  310. error_logger:error_msg(
  311. "SYN[~s] Error ~p:~p in pg update function: ~p",
  312. [node(), Class, Reason, Stacktrace]
  313. ),
  314. {reply, {raise, Class, Reason, Stacktrace}, State}
  315. end;
  316. {{_, _}, MetaOrFun, _, _, _} ->
  317. %% re-joined with same meta
  318. {reply, {noop, MetaOrFun}, State};
  319. {{_, _}, _, _, MRef, _} ->
  320. %% re-joined with different meta
  321. do_join_on_node(GroupName, Pid, MetaOrFun, MRef, normal, RequesterNode, on_group_process_updated, State)
  322. end;
  323. false ->
  324. {reply, {error, not_alive}, State}
  325. end;
  326. handle_call({'3.0', leave_on_node, RequesterNode, GroupName, Pid}, _From, #state{
  327. scope = Scope,
  328. table_by_name = TableByName,
  329. table_by_pid = TableByPid
  330. } = State) ->
  331. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  332. undefined ->
  333. {reply, {error, not_in_group}, State};
  334. {{_, _}, Meta, _, _, _} ->
  335. %% is this the last group process is in?
  336. maybe_demonitor(Pid, TableByPid),
  337. %% remove from table
  338. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  339. %% callback
  340. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
  341. %% broadcast
  342. syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, normal}, [RequesterNode], State),
  343. %% return
  344. {reply, {ok, {Meta, TableByPid}}, State}
  345. end;
  346. handle_call(Request, From, #state{scope = Scope} = State) ->
  347. error_logger:warning_msg("SYN[~s|~s<~s>] Received from ~p an unknown call message: ~p",
  348. [node(), ?MODULE_LOG_NAME, Scope, From, Request]
  349. ),
  350. {reply, undefined, State}.
  351. %% ----------------------------------------------------------------------------------------------------------
  352. %% Info messages
  353. %% ----------------------------------------------------------------------------------------------------------
  354. -spec handle_info(Info :: timeout | term(), #state{}) ->
  355. {noreply, #state{}} |
  356. {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
  357. {stop, Reason :: term(), #state{}}.
  358. handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) ->
  359. case maps:is_key(node(Pid), NodesMap) of
  360. true ->
  361. handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State);
  362. false ->
  363. %% ignore, race condition
  364. ok
  365. end,
  366. {noreply, State};
  367. handle_info({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, #state{
  368. scope = Scope,
  369. table_by_name = TableByName,
  370. table_by_pid = TableByPid
  371. } = State) ->
  372. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  373. undefined ->
  374. %% not in table, nothing to do
  375. ok;
  376. _ ->
  377. %% remove from table
  378. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  379. %% callback
  380. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason])
  381. end,
  382. %% return
  383. {noreply, State};
  384. handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
  385. scope = Scope,
  386. table_by_name = TableByName,
  387. table_by_pid = TableByPid
  388. } = State) ->
  389. case find_pg_entries_by_pid(Pid, TableByPid) of
  390. [] ->
  391. error_logger:warning_msg(
  392. "SYN[~s|~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
  393. [node(), ?MODULE_LOG_NAME, Scope, Pid, Reason]
  394. );
  395. Entries ->
  396. lists:foreach(fun({{_Pid, GroupName}, Meta, _, _, _}) ->
  397. %% remove from table
  398. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  399. %% callback
  400. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason]),
  401. %% broadcast
  402. syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, State)
  403. end, Entries)
  404. end,
  405. %% return
  406. {noreply, State};
  407. handle_info(Info, #state{scope = Scope} = State) ->
  408. error_logger:warning_msg("SYN[~s|~s<~s>] Received an unknown info message: ~p", [node(), ?MODULE_LOG_NAME, Scope, Info]),
  409. {noreply, State}.
  410. %% ----------------------------------------------------------------------------------------------------------
  411. %% Data callbacks
  412. %% ----------------------------------------------------------------------------------------------------------
  413. -spec get_local_data(State :: term()) -> {ok, Data :: term()} | undefined.
  414. get_local_data(#state{table_by_name = TableByName}) ->
  415. {ok, get_pg_tuples_for_node(node(), TableByName)}.
  416. -spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
  417. save_remote_data(PgTuplesOfRemoteNode, #state{scope = Scope} = State) ->
  418. %% insert tuples
  419. lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
  420. handle_pg_sync(GroupName, Pid, Meta, Time, {syn_remote_scope_node_up, Scope, node(Pid)}, State)
  421. end, PgTuplesOfRemoteNode).
  422. -spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
  423. purge_local_data_for_node(Node, #state{
  424. scope = Scope,
  425. table_by_name = TableByName,
  426. table_by_pid = TableByPid
  427. }) ->
  428. purge_pg_for_remote_node(Scope, Node, TableByName, TableByPid).
  429. %% ===================================================================
  430. %% Internal
  431. %% ===================================================================
  432. -spec rebuild_monitors(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> ok.
  433. rebuild_monitors(Scope, TableByName, TableByPid) ->
  434. PgTuples = get_pg_tuples_for_node(node(), TableByName),
  435. do_rebuild_monitors(PgTuples, #{}, Scope, TableByName, TableByPid).
  436. -spec do_rebuild_monitors(
  437. [syn_pg_tuple()],
  438. #{pid() => reference()},
  439. Scope :: atom(),
  440. TableByName :: atom(),
  441. TableByPid :: atom()
  442. ) -> ok.
  443. do_rebuild_monitors([], _, _, _, _) -> ok;
  444. do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, Scope, TableByName, TableByPid) ->
  445. remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
  446. case is_process_alive(Pid) of
  447. true ->
  448. case maps:find(Pid, NewMRefs) of
  449. error ->
  450. MRef = erlang:monitor(process, Pid),
  451. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
  452. do_rebuild_monitors(T, maps:put(Pid, MRef, NewMRefs), Scope, TableByName, TableByPid);
  453. {ok, MRef} ->
  454. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
  455. do_rebuild_monitors(T, NewMRefs, Scope, TableByName, TableByPid)
  456. end;
  457. _ ->
  458. %% process died meanwhile, callback
  459. %% the remote callbacks will have been called when the scope process crash triggered them
  460. syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, undefined]),
  461. %% loop
  462. do_rebuild_monitors(T, NewMRefs, Scope, TableByName, TableByPid)
  463. end.
  464. -spec do_join_on_node(
  465. GroupName :: term(),
  466. Pid :: pid(),
  467. Meta :: term(),
  468. MRef :: reference() | undefined,
  469. Reason :: term(),
  470. RequesterNode :: node(),
  471. CallbackMethod :: atom(),
  472. #state{}
  473. ) ->
  474. {
  475. reply,
  476. {ok, {
  477. CallbackMethod :: atom(),
  478. Meta :: term(),
  479. Time :: non_neg_integer(),
  480. TableByName :: atom(),
  481. TableByPid :: atom()
  482. }},
  483. #state{}
  484. }.
  485. do_join_on_node(GroupName, Pid, Meta, MRef, Reason, RequesterNode, CallbackMethod, #state{
  486. scope = Scope,
  487. table_by_name = TableByName,
  488. table_by_pid = TableByPid
  489. } = State) ->
  490. Time = erlang:system_time(),
  491. %% add to local table
  492. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
  493. %% callback
  494. syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, Reason]),
  495. %% broadcast
  496. syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, [RequesterNode], State),
  497. %% return
  498. {reply, {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}}, State}.
  499. -spec get_pg_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_pg_tuple()].
  500. get_pg_tuples_for_node(Node, TableByName) ->
  501. ets:select(TableByName, [{
  502. {{'$1', '$2'}, '$3', '$4', '_', Node},
  503. [],
  504. [{{'$1', '$2', '$3', '$4'}}]
  505. }]).
  506. -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
  507. find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
  508. %% we use select instead of lookup to limit the results and thus cover the case
  509. %% when a process is in multiple groups
  510. case ets:select(TableByPid, [{
  511. {{Pid, '_'}, '_', '_', '$5', '_'},
  512. [],
  513. ['$5']
  514. }], 1) of
  515. {[MRef], _} -> MRef;
  516. '$end_of_table' -> undefined
  517. end.
  518. -spec find_pg_entry_by_name_and_pid(GroupName :: term(), Pid :: pid(), TableByName :: atom()) ->
  519. Entry :: syn_pg_entry() | undefined.
  520. find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
  521. case ets:lookup(TableByName, {GroupName, Pid}) of
  522. [] -> undefined;
  523. [Entry] -> Entry
  524. end.
  525. -spec find_pg_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupEntries :: [syn_pg_entry()].
  526. find_pg_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
  527. ets:select(TableByPid, [{
  528. {{Pid, '_'}, '_', '_', '_', '_'},
  529. [],
  530. ['$_']
  531. }]).
  532. -spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
  533. maybe_demonitor(Pid, TableByPid) ->
  534. %% select 2: if only 1 is returned it means that no other aliases exist for the Pid
  535. %% we use select instead of lookup to limit the results and thus cover the case
  536. %% when a process is in multiple groups
  537. case ets:select(TableByPid, [{
  538. {{Pid, '_'}, '_', '_', '$5', '_'},
  539. [],
  540. ['$5']
  541. }], 2) of
  542. {[MRef], _} when is_reference(MRef) ->
  543. %% no other aliases, demonitor
  544. erlang:demonitor(MRef, [flush]),
  545. ok;
  546. _ ->
  547. ok
  548. end.
  549. -spec add_to_local_table(
  550. GroupName :: term(),
  551. Pid :: pid(),
  552. Meta :: term(),
  553. Time :: integer(),
  554. MRef :: undefined | reference(),
  555. TableByName :: atom(),
  556. TableByPid :: atom()
  557. ) -> true.
  558. add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
  559. %% insert
  560. ets:insert(TableByName, {{GroupName, Pid}, Meta, Time, MRef, node(Pid)}),
  561. ets:insert(TableByPid, {{Pid, GroupName}, Meta, Time, MRef, node(Pid)}).
  562. -spec remove_from_local_table(
  563. Name :: term(),
  564. Pid :: pid(),
  565. TableByName :: atom(),
  566. TableByPid :: atom()
  567. ) -> true.
  568. remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
  569. true = ets:delete(TableByName, {GroupName, Pid}),
  570. true = ets:delete(TableByPid, {Pid, GroupName}).
  571. -spec purge_pg_for_remote_nodes(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> any().
  572. purge_pg_for_remote_nodes(Scope, TableByName, TableByPid) ->
  573. LocalNode = node(),
  574. DuplicatedRemoteNodes = ets:select(TableByName, [{
  575. {{'_', '_'}, '_', '_', '_', '$6'},
  576. [{'=/=', '$6', LocalNode}],
  577. ['$6']
  578. }]),
  579. RemoteNodes = ordsets:from_list(DuplicatedRemoteNodes),
  580. ordsets:fold(fun(RemoteNode, _) ->
  581. purge_pg_for_remote_node(Scope, RemoteNode, TableByName, TableByPid)
  582. end, undefined, RemoteNodes).
  583. -spec purge_pg_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
  584. purge_pg_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
  585. %% loop elements for callback in a separate process to free scope process
  586. PgTuples = get_pg_tuples_for_node(Node, TableByName),
  587. lists:foreach(fun({GroupName, Pid, Meta, _Time}) ->
  588. syn_event_handler:call_event_handler(on_process_left,
  589. [Scope, GroupName, Pid, Meta, {syn_remote_scope_node_down, Scope, Node}]
  590. )
  591. end, PgTuples),
  592. ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
  593. ets:match_delete(TableByPid, {{'_', '_'}, '_', '_', '_', Node}).
  594. -spec handle_pg_sync(
  595. GroupName :: term(),
  596. Pid :: pid(),
  597. Meta :: term(),
  598. Time :: non_neg_integer(),
  599. Reason :: term(),
  600. #state{}
  601. ) -> any().
  602. handle_pg_sync(GroupName, Pid, Meta, Time, Reason, #state{
  603. scope = Scope,
  604. table_by_name = TableByName,
  605. table_by_pid = TableByPid
  606. }) ->
  607. case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
  608. undefined ->
  609. %% new
  610. add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
  611. %% callback
  612. syn_event_handler:call_event_handler(on_process_joined, [Scope, GroupName, Pid, Meta, Reason]);
  613. {{GroupName, Pid}, TableMeta, TableTime, _, _} when Time > TableTime ->
  614. %% maybe updated meta or time only
  615. add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
  616. %% callback (call only if meta update)
  617. case TableMeta =/= Meta of
  618. true ->
  619. syn_event_handler:call_event_handler(on_group_process_updated, [Scope, GroupName, Pid, Meta, Reason]);
  620. _ -> ok
  621. end;
  622. _ ->
  623. %% race condition: incoming data is older, ignore
  624. ok
  625. end.
  626. -spec multi_call_and_receive(
  627. CollectorPid :: pid(),
  628. Pid :: pid(),
  629. Meta :: term(),
  630. Message :: term(),
  631. Timeout :: non_neg_integer()
  632. ) -> any().
  633. multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
  634. %% monitor
  635. MRef = monitor(process, Pid),
  636. %% send
  637. Ref = make_ref(),
  638. From = {self(), Ref},
  639. Pid ! {syn_multi_call, Message, From, Meta},
  640. %% wait for reply
  641. receive
  642. {syn_multi_call_reply, Ref, Reply} ->
  643. erlang:demonitor(MRef, [flush]),
  644. CollectorPid ! {syn_reply, Pid, Reply};
  645. {'DOWN', MRef, _, _, _} ->
  646. CollectorPid ! {syn_bad_reply, Pid}
  647. after Timeout ->
  648. erlang:demonitor(MRef, [flush]),
  649. CollectorPid ! {syn_bad_reply, Pid}
  650. end.
  651. -spec collect_replies(MembersOD :: orddict:orddict({pid(), Meta :: term()})) ->
  652. {
  653. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  654. BadReplies :: [{pid(), Meta :: term()}]
  655. }.
  656. collect_replies(MembersOD) ->
  657. collect_replies(MembersOD, [], []).
  658. -spec collect_replies(
  659. MembersOD :: orddict:orddict({pid(), Meta :: term()}),
  660. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  661. BadReplies :: [{pid(), Meta :: term()}]
  662. ) ->
  663. {
  664. Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
  665. BadReplies :: [{pid(), Meta :: term()}]
  666. }.
  667. collect_replies([], Replies, BadReplies) -> {Replies, BadReplies};
  668. collect_replies(MembersOD, Replies, BadReplies) ->
  669. receive
  670. {syn_reply, Pid, Reply} ->
  671. {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
  672. collect_replies(MembersOD1, [{{Pid, Meta}, Reply} | Replies], BadReplies);
  673. {syn_bad_reply, Pid} ->
  674. {Meta, MembersOD1} = orddict:take(Pid, MembersOD),
  675. collect_replies(MembersOD1, Replies, [{Pid, Meta} | BadReplies])
  676. end.