syn_groups.erl 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_groups).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([join/2, join/3]).
  31. -export([leave/2]).
  32. -export([get_members/1, get_members/2]).
  33. -export([member/2]).
  34. -export([get_local_members/1, get_local_members/2]).
  35. -export([local_member/2]).
  36. -export([publish/2]).
  37. -export([publish_to_local/2]).
  38. -export([multi_call/2, multi_call/3, multi_call_reply/2]).
  39. %% sync API
  40. -export([sync_join/4, sync_leave/3]).
  41. -export([sync_get_local_groups_tuples/1]).
  42. -export([force_cluster_sync/0]).
  43. -export([remove_from_local_table/2]).
  44. %% tests
  45. -ifdef(TEST).
  46. -export([add_to_local_table/4]).
  47. -endif.
  48. %% gen_server callbacks
  49. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  50. %% internal
  51. -export([multi_call_and_receive/4]).
  52. %% records
  %% gen_server state, built once in init/1.
  -record(state, {
      %% module returned by syn_backbone:get_event_handler_module/0
      custom_event_handler :: undefined | module(),
      %% base delay of the anti-entropy timer; `undefined` means no timer is set
      anti_entropy_interval_ms :: undefined | non_neg_integer(),
      %% scaled by a random factor and added to the base delay
      anti_entropy_interval_max_deviation_ms :: undefined | non_neg_integer()
  }).
  58. %% macros
  59. -define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
  60. %% includes
  61. -include("syn.hrl").
  62. %% ===================================================================
  63. %% API
  64. %% ===================================================================
  %% Starts the groups server, linked to the caller and registered locally
  %% under the module name.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  %% Adds Pid to GroupName without metadata (Meta is `undefined`). See join/3.
  -spec join(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  join(GroupName, Pid) ->
      join(GroupName, Pid, undefined).

  %% Adds Pid to GroupName with the given Meta.
  %% The request is a synchronous call to the groups server registered on the
  %% node Pid runs on. That server replies `ok`, or `{error, not_alive}` when
  %% Pid is not alive, which is why the spec includes the error tuple.
  -spec join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
  join(GroupName, Pid, Meta) when is_pid(Pid) ->
      Node = node(Pid),
      gen_server:call({?MODULE, Node}, {join_on_node, GroupName, Pid, Meta}).
  %% Removes Pid from GroupName.
  %% Returns `{error, not_in_group}` when the local table has no such entry;
  %% otherwise the request is forwarded to the groups server on Pid's node.
  -spec leave(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  leave(GroupName, Pid) ->
      IsMember = find_groups_entry_by_name_and_pid(GroupName, Pid) =/= undefined,
      case IsMember of
          false ->
              {error, not_in_group};
          true ->
              gen_server:call({?MODULE, node(Pid)}, {leave_on_node, GroupName, Pid})
      end.
  %% Returns the sorted list of pids stored under GroupName in the
  %% syn_groups_by_name table.
  -spec get_members(Name :: any()) -> [pid()].
  get_members(GroupName) ->
      MatchHead = {{GroupName, '$1'}, '_', '_', '_'},
      Pids = ets:select(syn_groups_by_name, [{MatchHead, [], ['$1']}]),
      lists:sort(Pids).
  %% Returns the sorted list of `{Pid, Meta}` pairs stored under GroupName in
  %% the syn_groups_by_name table.
  -spec get_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
  get_members(GroupName, with_meta) ->
      MatchHead = {{GroupName, '$1'}, '$2', '_', '_'},
      PidsWithMeta = ets:select(syn_groups_by_name, [{MatchHead, [], [{{'$1', '$2'}}]}]),
      lists:sort(PidsWithMeta).
  %% Tells whether the local table has an entry for Pid in GroupName.
  -spec member(Pid :: pid(), GroupName :: any()) -> boolean().
  member(Pid, GroupName) ->
      find_groups_entry_by_name_and_pid(GroupName, Pid) =/= undefined.
  %% Returns the sorted list of pids stored under GroupName whose recorded
  %% node is the calling node.
  -spec get_local_members(Name :: any()) -> [pid()].
  get_local_members(GroupName) ->
      MatchHead = {{GroupName, '$1'}, '_', '_', node()},
      LocalPids = ets:select(syn_groups_by_name, [{MatchHead, [], ['$1']}]),
      lists:sort(LocalPids).
  %% Returns the sorted list of `{Pid, Meta}` pairs stored under GroupName
  %% whose recorded node is the calling node.
  -spec get_local_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
  get_local_members(GroupName, with_meta) ->
      MatchHead = {{GroupName, '$1'}, '$2', '_', node()},
      LocalPidsWithMeta = ets:select(syn_groups_by_name, [{MatchHead, [], [{{'$1', '$2'}}]}]),
      lists:sort(LocalPidsWithMeta).
  %% Tells whether Pid is recorded in GroupName with the calling node as its
  %% node.
  -spec local_member(Pid :: pid(), GroupName :: any()) -> boolean().
  local_member(Pid, GroupName) ->
      LocalNode = node(),
      case find_groups_entry_by_name_and_pid(GroupName, Pid) of
          {GroupName, Pid, _Meta, _MonitorRef, LocalNode} -> true;
          _ -> false
      end.
  %% Sends Message to every member of GroupName and returns how many pids it
  %% was sent to.
  -spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
  publish(GroupName, Message) ->
      Recipients = get_members(GroupName),
      _ = [Recipient ! Message || Recipient <- Recipients],
      {ok, length(Recipients)}.
  %% Sends Message to every member of GroupName recorded on the calling node
  %% and returns how many pids it was sent to.
  -spec publish_to_local(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
  publish_to_local(GroupName, Message) ->
      Recipients = get_local_members(GroupName),
      _ = [Recipient ! Message || Recipient <- Recipients],
      {ok, length(Recipients)}.
  %% Same as multi_call/3, using ?DEFAULT_MULTI_CALL_TIMEOUT_MS as the timeout.
  -spec multi_call(GroupName :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  multi_call(GroupName, Message) ->
      DefaultTimeout = ?DEFAULT_MULTI_CALL_TIMEOUT_MS,
      multi_call(GroupName, Message, DefaultTimeout).
  %% Sends Message to every member of GroupName and gathers the answers.
  %% One linked helper process is spawned per member; each runs
  %% multi_call_and_receive/4 and reports back to the caller, which then
  %% collects one outcome per member.
  %% Returns `{Replies, BadPids}`.
  -spec multi_call(GroupName :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
      {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  multi_call(GroupName, Message, Timeout) ->
      Collector = self(),
      Members = get_members(GroupName),
      _ = [
          spawn_link(?MODULE, multi_call_and_receive, [Collector, Member, Message, Timeout])
          || Member <- Members
      ],
      collect_replies(Members).
  %% Answers a multi call: sends `{syn_multi_call_reply, self(), Reply}` to
  %% CallerPid and returns that same tuple.
  -spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
  multi_call_reply(CallerPid, Reply) ->
      Envelope = {syn_multi_call_reply, self(), Reply},
      CallerPid ! Envelope.
  %% Casts a `sync_join` notification for Pid / GroupName / Meta to the groups
  %% server registered on RemoteNode.
  -spec sync_join(RemoteNode :: node(), GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
  sync_join(RemoteNode, GroupName, Pid, Meta) ->
      Target = {?MODULE, RemoteNode},
      gen_server:cast(Target, {sync_join, GroupName, Pid, Meta}).
  %% Casts a `sync_leave` notification for Pid / GroupName to the groups
  %% server registered on RemoteNode.
  -spec sync_leave(RemoteNode :: node(), GroupName :: any(), Pid :: pid()) -> ok.
  sync_leave(RemoteNode, GroupName, Pid) ->
      Target = {?MODULE, RemoteNode},
      gen_server:cast(Target, {sync_leave, GroupName, Pid}).
  %% Logs the request coming from FromNode and returns the
  %% `{GroupName, Pid, Meta}` tuples recorded for the calling node.
  -spec sync_get_local_groups_tuples(FromNode :: node()) -> list(syn_groups_tuple()).
  sync_get_local_groups_tuples(FromNode) ->
      LocalNode = node(),
      error_logger:info_msg("Syn(~p): Received request of local group tuples from remote node: ~p~n", [LocalNode, FromNode]),
      get_groups_tuples_for_node(LocalNode).
  %% Casts `force_cluster_sync` to the groups server on this node and on every
  %% connected node.
  -spec force_cluster_sync() -> ok.
  force_cluster_sync() ->
      Targets = [node() | nodes()],
      _ = [gen_server:cast({?MODULE, Target}, force_cluster_sync) || Target <- Targets],
      ok.
  176. %% ===================================================================
  177. %% Callbacks
  178. %% ===================================================================
  179. %% ----------------------------------------------------------------------------------------------------------
  180. %% Init
  181. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server init callback.
  %% Subscribes to node up/down notifications, rebuilds the monitors for the
  %% entries already recorded for this node, reads the event handler and the
  %% anti-entropy settings from syn_backbone, then schedules the initial
  %% full-cluster sync and the first anti-entropy timer.
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      %% receive {nodeup, Node} / {nodedown, Node} in handle_info/2
      ok = net_kernel:monitor_nodes(true),
      %% re-create monitors for entries that are already in the tables
      rebuild_monitors(),
      %% settings
      Handler = syn_backbone:get_event_handler_module(),
      {IntervalMs, MaxDeviationMs} = syn_backbone:get_anti_entropy_settings(groups),
      InitialState = #state{
          custom_event_handler = Handler,
          anti_entropy_interval_ms = IntervalMs,
          anti_entropy_interval_max_deviation_ms = MaxDeviationMs
      },
      %% the full cluster sync runs from handle_info/2, after init has returned
      timer:send_after(0, self(), sync_from_full_cluster),
      %% first anti-entropy tick (no-op when the interval is undefined)
      set_timer_for_anti_entropy(InitialState),
      {ok, InitialState}.
  208. %% ----------------------------------------------------------------------------------------------------------
  209. %% Call messages
  210. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server call callback.
  %%   {join_on_node, GroupName, Pid, Meta}: when Pid is alive, records the
  %%       membership locally, multicasts it to the other nodes and replies
  %%       `ok`; otherwise replies `{error, not_alive}`.
  %%   {leave_on_node, GroupName, Pid}: removes the membership locally; on
  %%       success multicasts the removal and replies `ok`, otherwise replies
  %%       the `{error, Reason}` returned by leave_on_node/2.
  %%   anything else: logs a warning and replies `undefined`.
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call({join_on_node, GroupName, Pid, Meta}, _From, State) ->
      %% check if pid is alive
      case is_process_alive(Pid) of
          true ->
              join_on_node(GroupName, Pid, Meta),
              %% multicast
              multicast_join(GroupName, Pid, Meta),
              %% return
              {reply, ok, State};
          _ ->
              {reply, {error, not_alive}, State}
      end;
  handle_call({leave_on_node, GroupName, Pid}, _From, State) ->
      case leave_on_node(GroupName, Pid) of
          ok ->
              %% multicast
              multicast_leave(GroupName, Pid),
              %% return
              {reply, ok, State};
          {error, Reason} ->
              %% return
              {reply, {error, Reason}, State}
      end;
  handle_call(Request, From, State) ->
      %% arguments follow the placeholders: node, then sender, then message
      error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), From, Request]),
      {reply, undefined, State}.
  244. %% ----------------------------------------------------------------------------------------------------------
  245. %% Cast messages
  246. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server cast callback.
  %%   {sync_join, GroupName, Pid, Meta}: stores the entry with no monitor.
  %%   {sync_leave, GroupName, Pid}: drops the entry.
  %%   force_cluster_sync: runs a sync against every connected node.
  %%   anything else: logs a warning.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast({sync_join, GroupName, Pid, Meta}, State) ->
      %% entries received through sync carry `undefined` as monitor reference
      NoMonitor = undefined,
      add_to_local_table(GroupName, Pid, Meta, NoMonitor),
      {noreply, State};
  handle_cast({sync_leave, GroupName, Pid}, State) ->
      remove_from_local_table(GroupName, Pid),
      {noreply, State};
  handle_cast(force_cluster_sync, State) ->
      ConnectedNodes = nodes(),
      error_logger:info_msg("Syn(~p): Initiating full cluster groups FORCED sync for nodes: ~p~n", [node(), ConnectedNodes]),
      do_sync_from_full_cluster(),
      {noreply, State};
  handle_cast(Msg, State) ->
      error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
      {noreply, State}.
  268. %% ----------------------------------------------------------------------------------------------------------
  269. %% All non Call / Cast messages
  270. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server info callback.
  %%   'DOWN' for a process: removes every membership of the pid, runs the
  %%       exit handling for each and multicasts the removals.
  %%   {nodeup, Node}: merges the groups of the node that joined.
  %%   {nodedown, Node}: purges the entries recorded for that node.
  %%   sync_from_full_cluster: syncs with every connected node.
  %%   sync_anti_entropy: syncs with one randomly picked connected node (if
  %%       any) and re-arms the timer.
  %%   anything else: logs a warning.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
      case find_groups_tuples_by_pid(Pid) of
          [] ->
              %% no membership recorded for this pid, only report it
              handle_process_down(undefined, Pid, undefined, Reason, State);
          Memberships ->
              Cleanup = fun({GroupName, _Pid, Meta}) ->
                  remove_from_local_table(GroupName, Pid),
                  handle_process_down(GroupName, Pid, Meta, Reason, State),
                  multicast_leave(GroupName, Pid)
              end,
              lists:foreach(Cleanup, Memberships)
      end,
      {noreply, State};
  handle_info({nodeup, RemoteNode}, State) ->
      error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
      groups_automerge(RemoteNode),
      {noreply, State};
  handle_info({nodedown, RemoteNode}, State) ->
      error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing group entries on local~n", [node(), RemoteNode]),
      raw_purge_group_entries_for_node(RemoteNode),
      {noreply, State};
  handle_info(sync_from_full_cluster, State) ->
      error_logger:info_msg("Syn(~p): Initiating full cluster groups sync for nodes: ~p~n", [node(), nodes()]),
      do_sync_from_full_cluster(),
      {noreply, State};
  handle_info(sync_anti_entropy, State) ->
      case nodes() of
          [] ->
              %% nobody to sync with
              ok;
          RemoteNodes ->
              Index = rand:uniform(length(RemoteNodes)),
              RandomRemoteNode = lists:nth(Index, RemoteNodes),
              error_logger:info_msg("Syn(~p): Initiating anti-entropy sync for node ~p~n", [node(), RandomRemoteNode]),
              groups_automerge(RandomRemoteNode)
      end,
      %% schedule the next round
      set_timer_for_anti_entropy(State),
      {noreply, State};
  handle_info(Info, State) ->
      error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
      {noreply, State}.
  323. %% ----------------------------------------------------------------------------------------------------------
  324. %% Terminate
  325. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server terminate callback: logs the termination reason and returns
  %% `terminated`.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(Reason, _State) ->
      LogArgs = [node(), Reason],
      error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", LogArgs),
      terminated.
  330. %% ----------------------------------------------------------------------------------------------------------
  331. %% Convert process state when code is changed.
  332. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server code_change callback: the state is returned unchanged.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_OldVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  336. %% ===================================================================
  337. %% Internal
  338. %% ===================================================================
  %% Spawns a linked process that sends a sync_join for the membership to
  %% every connected node; returns the pid of that process.
  -spec multicast_join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> pid().
  multicast_join(GroupName, Pid, Meta) ->
      Broadcast = fun() ->
          %% nodes() is read inside the spawned process
          _ = [sync_join(RemoteNode, GroupName, Pid, Meta) || RemoteNode <- nodes()],
          ok
      end,
      spawn_link(Broadcast).
  %% Spawns a linked process that sends a sync_leave for the membership to
  %% every connected node; returns the pid of that process.
  -spec multicast_leave(GroupName :: any(), Pid :: pid()) -> pid().
  multicast_leave(GroupName, Pid) ->
      Broadcast = fun() ->
          %% nodes() is read inside the spawned process
          _ = [sync_leave(RemoteNode, GroupName, Pid) || RemoteNode <- nodes()],
          ok
      end,
      spawn_link(Broadcast).
  %% Records Pid as a member of GroupName, reusing the monitor reference
  %% already stored for Pid when there is one and creating a monitor otherwise.
  -spec join_on_node(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
  join_on_node(GroupName, Pid, Meta) ->
      KnownRef = find_monitor_for_pid(Pid),
      MonitorRef = case KnownRef of
          undefined ->
              %% first membership with a monitor for this pid
              erlang:monitor(process, Pid);
          _ ->
              KnownRef
      end,
      add_to_local_table(GroupName, Pid, Meta, MonitorRef).
  %% Removes the membership of Pid in GroupName from the local tables.
  %%   - no entry: `{error, not_in_group}`.
  %%   - entry with a monitor: the monitor is released when this is the only
  %%     membership recorded for the pid, then the entry is removed.
  %%   - entry without a monitor on this node: logged as an internal error,
  %%     then removed.
  %%   - entry without a monitor on another node: left alone, returns `ok`.
  -spec leave_on_node(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  leave_on_node(GroupName, Pid) ->
      LocalNode = node(),
      case find_groups_entry_by_name_and_pid(GroupName, Pid) of
          undefined ->
              {error, not_in_group};
          {GroupName, Pid, _Meta, undefined, LocalNode} = GroupsEntry ->
              error_logger:error_msg(
                  "Syn(~p): INTERNAL ERROR | Group entry ~p has no monitor but it's running on node~n",
                  [node(), GroupsEntry]
              ),
              remove_from_local_table(GroupName, Pid);
          {GroupName, Pid, _Meta, undefined, _OtherNode} ->
              %% race condition: leave request for an entry that is not a local pid
              %% (has no monitor); sync messages will take care of it
              ok;
          {GroupName, Pid, _Meta, MonitorRef, _Node} ->
              %% the entry about to be deleted is still counted here, so a single
              %% tuple means this is the last group the process is in
              IsLastGroup = length(find_groups_tuples_by_pid(Pid)) =:= 1,
              IsLastGroup andalso erlang:demonitor(MonitorRef, [flush]),
              remove_from_local_table(GroupName, Pid);
          _ ->
              ok
      end.
  %% Writes the membership to both tables: keyed by `{GroupName, Pid}` in
  %% syn_groups_by_name and by `{Pid, GroupName}` in syn_groups_by_pid. The
  %% node stored with the entry is the node of Pid.
  -spec add_to_local_table(GroupName :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
  add_to_local_table(GroupName, Pid, Meta, MonitorRef) ->
      PidNode = node(Pid),
      ByName = {{GroupName, Pid}, Meta, MonitorRef, PidNode},
      ByPid = {{Pid, GroupName}, Meta, MonitorRef, PidNode},
      true = ets:insert(syn_groups_by_name, ByName),
      true = ets:insert(syn_groups_by_pid, ByPid),
      ok.
  %% Deletes the membership of Pid in GroupName from both tables.
  %% Returns `{error, not_in_group}` when syn_groups_by_name has no entry for
  %% `{GroupName, Pid}`, `ok` otherwise.
  -spec remove_from_local_table(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  remove_from_local_table(GroupName, Pid) ->
      case ets:lookup(syn_groups_by_name, {GroupName, Pid}) of
          [] ->
              {error, not_in_group};
          _ ->
              ets:match_delete(syn_groups_by_name, {{GroupName, Pid}, '_', '_', '_'}),
              %% the `{Pid, GroupName}` key lives in the by-pid table; deleting it
              %% from the by-name table would leave a stale by-pid row behind
              ets:match_delete(syn_groups_by_pid, {{Pid, GroupName}, '_', '_', '_'}),
              ok
      end.
  %% Returns one `{GroupName, Pid, Meta}` tuple per entry stored for Pid in
  %% the syn_groups_by_pid table.
  -spec find_groups_tuples_by_pid(Pid :: pid()) -> GroupTuples :: list(syn_groups_tuple()).
  find_groups_tuples_by_pid(Pid) when is_pid(Pid) ->
      MatchHead = {{Pid, '$1'}, '$2', '_', '_'},
      Result = {{'$1', Pid, '$2'}},
      ets:select(syn_groups_by_pid, [{MatchHead, [], [Result]}]).
  %% Looks up the membership of Pid in GroupName.
  %% Returns `{GroupName, Pid, Meta, MonitorRef, Node}` when exactly one entry
  %% is stored under the `{GroupName, Pid}` key of syn_groups_by_name, and
  %% `undefined` otherwise.
  %% A plain key lookup is used rather than a match specification: the group
  %% name is then always treated as data, so nested tuples are accepted and
  %% atoms such as '_' or '$1' are not interpreted as match variables.
  -spec find_groups_entry_by_name_and_pid(GroupName :: any(), Pid :: pid()) -> Entry :: syn_groups_entry() | undefined.
  find_groups_entry_by_name_and_pid(GroupName, Pid) ->
      case ets:lookup(syn_groups_by_name, {GroupName, Pid}) of
          [{_Key, Meta, MonitorRef, Node}] -> {GroupName, Pid, Meta, MonitorRef, Node};
          _ -> undefined
      end.
  %% Returns the monitor reference stored with the first entry found for Pid
  %% in syn_groups_by_pid, or `undefined` when there is no entry.
  -spec find_monitor_for_pid(Pid :: pid()) -> reference() | undefined.
  find_monitor_for_pid(Pid) when is_pid(Pid) ->
      MatchSpec = [{{{Pid, '_'}, '_', '$1', '_'}, [], ['$1']}],
      %% a limit of 1: a single entry is enough
      case ets:select(syn_groups_by_pid, MatchSpec, 1) of
          {[StoredRef], _Continuation} -> StoredRef;
          _ -> undefined
      end.
  %% Returns one `{GroupName, Pid, Meta}` tuple per entry of syn_groups_by_name
  %% whose recorded node is Node.
  -spec get_groups_tuples_for_node(Node :: node()) -> [syn_groups_tuple()].
  get_groups_tuples_for_node(Node) ->
      MatchHead = {{'$1', '$2'}, '$3', '_', Node},
      Result = {{'$1', '$2', '$3'}},
      ets:select(syn_groups_by_name, [{MatchHead, [], [Result]}]).
  %% Reacts to the exit of a monitored process.
  %% With an `undefined` group name (no membership was found for the pid) a
  %% warning is logged; otherwise the exit is passed to syn_event_handler
  %% together with the custom event handler taken from the state.
  -spec handle_process_down(GroupName :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
  handle_process_down(undefined, Pid, _Meta, Reason, #state{}) ->
      error_logger:warning_msg(
          "Syn(~p): Received a DOWN message from an unjoined group process ~p with reason: ~p~n",
          [node(), Pid, Reason]
      );
  handle_process_down(GroupName, Pid, Meta, Reason, #state{custom_event_handler = Handler}) ->
      syn_event_handler:do_on_group_process_exit(GroupName, Pid, Meta, Reason, Handler).
  %% Replaces the local view of RemoteNode's groups with the tuples that node
  %% reports, inside a global:trans/2 transaction.
  %% When the rpc to RemoteNode fails the merge is skipped with a log line.
  %% Otherwise the entries recorded for RemoteNode are purged and each
  %% received tuple is either stored locally (pid reported alive by its node)
  %% or removed on RemoteNode.
  -spec groups_automerge(RemoteNode :: node()) -> ok.
  groups_automerge(RemoteNode) ->
      LockId = {{?MODULE, auto_merge_groups}, self()},
      Merge = fun() ->
          error_logger:info_msg("Syn(~p): GROUPS AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
          %% ask the remote node for its own group tuples
          case rpc:call(RemoteNode, ?MODULE, sync_get_local_groups_tuples, [node()]) of
              {badrpc, _} ->
                  error_logger:info_msg("Syn(~p): GROUPS AUTOMERGE <---- Syn not ready on remote node ~p, postponing~n", [node(), RemoteNode]);
              GroupTuples ->
                  error_logger:info_msg(
                      "Syn(~p): Received ~p group tuple(s) from remote node ~p~n",
                      [node(), length(GroupTuples), RemoteNode]
                  ),
                  %% start from a clean slate for the remote node
                  raw_purge_group_entries_for_node(RemoteNode),
                  ImportTuple = fun({GroupName, RemotePid, RemoteMeta}) ->
                      case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
                          true ->
                              add_to_local_table(GroupName, RemotePid, RemoteMeta, undefined);
                          _ ->
                              ok = rpc:call(RemoteNode, syn_groups, remove_from_local_table, [GroupName, RemotePid])
                      end
                  end,
                  lists:foreach(ImportTuple, GroupTuples),
                  error_logger:info_msg("Syn(~p): GROUPS AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
          end
      end,
      global:trans(LockId, Merge).
  %% Runs groups_automerge/1 against every connected node, one after another.
  -spec do_sync_from_full_cluster() -> ok.
  do_sync_from_full_cluster() ->
      lists:foreach(fun groups_automerge/1, nodes()).
  %% Deletes every entry recorded for Node from both tables.
  %% "Raw" because no demonitoring is done for the deleted entries.
  -spec raw_purge_group_entries_for_node(Node :: atom()) -> ok.
  raw_purge_group_entries_for_node(Node) ->
      %% both tables share the same row layout, so one pattern serves both
      Pattern = {{'_', '_'}, '_', '_', Node},
      true = ets:match_delete(syn_groups_by_name, Pattern),
      true = ets:match_delete(syn_groups_by_pid, Pattern),
      ok.
  %% Body of the helper process spawned by multi_call/3 for a single member.
  %% Monitors Pid, sends it `{syn_multi_call, self(), Message}` and reports
  %% the outcome to CollectorPid: `{reply, Pid, Reply}` when the member
  %% answers, `{bad_pid, Pid}` when it goes down or Timeout expires first.
  -spec multi_call_and_receive(
      CollectorPid :: pid(),
      Pid :: pid(),
      Message :: any(),
      Timeout :: non_neg_integer()
  ) -> any().
  multi_call_and_receive(CollectorPid, Pid, Message, Timeout) ->
      MonitorRef = monitor(process, Pid),
      Pid ! {syn_multi_call, self(), Message},
      Outcome = receive
          {syn_multi_call_reply, Pid, Reply} -> {reply, Pid, Reply};
          {'DOWN', MonitorRef, _, _, _} -> {bad_pid, Pid}
      after Timeout ->
          {bad_pid, Pid}
      end,
      CollectorPid ! Outcome.
  %% Waits for one outcome message per pid in MemberPids and returns
  %% `{Replies, BadPids}`.
  -spec collect_replies(MemberPids :: [pid()]) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  collect_replies(MemberPids) ->
      collect_replies(MemberPids, [], []).

  %% Accumulator loop behind collect_replies/1: every `{reply, Pid, Reply}` or
  %% `{bad_pid, Pid}` message takes Pid off the pending list; the loop ends
  %% when the pending list is empty.
  -spec collect_replies(MemberPids :: [pid()], [{pid(), Reply :: any()}], [pid()]) ->
      {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  collect_replies([], Replies, BadPids) ->
      {Replies, BadPids};
  collect_replies(Pending, Replies, BadPids) ->
      receive
          {reply, Pid, Reply} ->
              collect_replies(Pending -- [Pid], [{Pid, Reply} | Replies], BadPids);
          {bad_pid, Pid} ->
              collect_replies(Pending -- [Pid], Replies, [Pid | BadPids])
      end.
  %% Re-creates the entries recorded for the local node.
  %% The current tuples for this node are read and purged from the tables;
  %% each pid that is still alive is joined again (which sets up its monitor),
  %% while a leave is multicast for every pid that is no longer alive.
  -spec rebuild_monitors() -> ok.
  rebuild_monitors() ->
      LocalNode = node(),
      LocalTuples = get_groups_tuples_for_node(LocalNode),
      %% drop the local entries before re-adding the live ones
      raw_purge_group_entries_for_node(LocalNode),
      Restore = fun({GroupName, Pid, Meta}) ->
          case erlang:is_process_alive(Pid) of
              true -> join_on_node(GroupName, Pid, Meta);
              _ -> multicast_leave(GroupName, Pid)
          end
      end,
      lists:foreach(Restore, LocalTuples).
  %% Schedules the next `sync_anti_entropy` message to the calling process.
  %% Nothing is scheduled when the interval is `undefined`. Otherwise the
  %% delay is the interval plus a random fraction of the max deviation,
  %% rounded to an integer number of milliseconds.
  -spec set_timer_for_anti_entropy(#state{}) -> ok.
  set_timer_for_anti_entropy(#state{anti_entropy_interval_ms = undefined}) ->
      ok;
  set_timer_for_anti_entropy(#state{
      anti_entropy_interval_ms = BaseMs,
      anti_entropy_interval_max_deviation_ms = MaxDeviationMs
  }) ->
      Jitter = rand:uniform() * MaxDeviationMs,
      DelayMs = round(BaseMs + Jitter),
      {ok, _TRef} = timer:send_after(DelayMs, self(), sync_anti_entropy),
      ok.
  553. ok.