%% syn_groups.erl (24 KB)
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_groups).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([join/2, join/3]).
  31. -export([leave/2]).
  32. -export([get_members/1, get_members/2]).
  33. -export([member/2]).
  34. -export([get_local_members/1, get_local_members/2]).
  35. -export([local_member/2]).
  36. -export([count/0, count/1]).
  37. -export([publish/2]).
  38. -export([publish_to_local/2]).
  39. -export([multi_call/2, multi_call/3, multi_call_reply/2]).
  40. %% sync API
  41. -export([sync_join/4, sync_leave/3]).
  42. -export([sync_get_local_groups_tuples/1]).
  43. -export([force_cluster_sync/0]).
  44. -export([remove_from_local_table/2]).
  45. %% tests
  46. -ifdef(TEST).
  47. -export([add_to_local_table/4]).
  48. -endif.
  49. %% gen_server callbacks
  50. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  51. %% internal
  52. -export([multi_call_and_receive/4]).
  53. %% records
  54. -record(state, {
  55. custom_event_handler :: undefined | module(),
  56. anti_entropy_interval_ms :: undefined | non_neg_integer(),
  57. anti_entropy_interval_max_deviation_ms :: undefined | non_neg_integer()
  58. }).
  59. %% macros
  60. -define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
  61. %% includes
  62. -include("syn.hrl").
  63. %% ===================================================================
  64. %% API
  65. %% ===================================================================
  66. -spec start_link() -> {ok, pid()} | {error, any()}.
  67. start_link() ->
  68. Options = [],
  69. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  70. -spec join(GroupName :: any(), Pid :: pid()) -> ok.
  71. join(GroupName, Pid) ->
  72. join(GroupName, Pid, undefined).
  73. -spec join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
  74. join(GroupName, Pid, Meta) when is_pid(Pid) ->
  75. Node = node(Pid),
  76. gen_server:call({?MODULE, Node}, {join_on_node, GroupName, Pid, Meta}).
  77. -spec leave(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  78. leave(GroupName, Pid) ->
  79. case find_groups_entry_by_name_and_pid(GroupName, Pid) of
  80. undefined ->
  81. {error, not_in_group};
  82. _ ->
  83. Node = node(Pid),
  84. gen_server:call({?MODULE, Node}, {leave_on_node, GroupName, Pid})
  85. end.
  86. -spec get_members(Name :: any()) -> [pid()].
  87. get_members(GroupName) ->
  88. ets:select(syn_groups_by_name, [{
  89. {{GroupName, '$2'}, '_', '_', '_'},
  90. [],
  91. ['$2']
  92. }]).
  93. -spec get_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
  94. get_members(GroupName, with_meta) ->
  95. ets:select(syn_groups_by_name, [{
  96. {{GroupName, '$2'}, '$3', '_', '_'},
  97. [],
  98. [{{'$2', '$3'}}]
  99. }]).
  100. -spec member(Pid :: pid(), GroupName :: any()) -> boolean().
  101. member(Pid, GroupName) ->
  102. case find_groups_entry_by_name_and_pid(GroupName, Pid) of
  103. undefined -> false;
  104. _ -> true
  105. end.
  106. -spec get_local_members(Name :: any()) -> [pid()].
  107. get_local_members(GroupName) ->
  108. Node = node(),
  109. ets:select(syn_groups_by_name, [{
  110. {{GroupName, '$2'}, '_', '_', Node},
  111. [],
  112. ['$2']
  113. }]).
  114. -spec get_local_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
  115. get_local_members(GroupName, with_meta) ->
  116. Node = node(),
  117. ets:select(syn_groups_by_name, [{
  118. {{GroupName, '$2'}, '$3', '_', Node},
  119. [],
  120. [{{'$2', '$3'}}]
  121. }]).
  122. -spec local_member(Pid :: pid(), GroupName :: any()) -> boolean().
  123. local_member(Pid, GroupName) ->
  124. case find_groups_entry_by_name_and_pid(GroupName, Pid) of
  125. {GroupName, Pid, _Meta, _MonitorRef, Node} when Node =:= node() ->
  126. true;
  127. _ ->
  128. false
  129. end.
  130. -spec count() -> non_neg_integer().
  131. count() ->
  132. Entries = ets:select(syn_groups_by_name, [{
  133. {{'$1', '_'}, '_', '_', '_'},
  134. [],
  135. ['$1']
  136. }]),
  137. Set = sets:from_list(Entries),
  138. sets:size(Set).
  139. -spec count(Node :: node()) -> non_neg_integer().
  140. count(Node) ->
  141. Entries = ets:select(syn_groups_by_name, [{
  142. {{'$1', '_'}, '_', '_', Node},
  143. [],
  144. ['$1']
  145. }]),
  146. Set = sets:from_list(Entries),
  147. sets:size(Set).
  148. -spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
  149. publish(GroupName, Message) ->
  150. MemberPids = get_members(GroupName),
  151. FSend = fun(Pid) ->
  152. Pid ! Message
  153. end,
  154. lists:foreach(FSend, MemberPids),
  155. {ok, length(MemberPids)}.
  156. -spec publish_to_local(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
  157. publish_to_local(GroupName, Message) ->
  158. MemberPids = get_local_members(GroupName),
  159. FSend = fun(Pid) ->
  160. Pid ! Message
  161. end,
  162. lists:foreach(FSend, MemberPids),
  163. {ok, length(MemberPids)}.
  164. -spec multi_call(GroupName :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  165. multi_call(GroupName, Message) ->
  166. multi_call(GroupName, Message, ?DEFAULT_MULTI_CALL_TIMEOUT_MS).
  167. -spec multi_call(GroupName :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
  168. {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  169. multi_call(GroupName, Message, Timeout) ->
  170. Self = self(),
  171. MemberPids = get_members(GroupName),
  172. FSend = fun(Pid) ->
  173. spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Message, Timeout])
  174. end,
  175. lists:foreach(FSend, MemberPids),
  176. collect_replies(MemberPids).
  177. -spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
  178. multi_call_reply(CallerPid, Reply) ->
  179. CallerPid ! {syn_multi_call_reply, self(), Reply}.
  180. -spec sync_join(RemoteNode :: node(), GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
  181. sync_join(RemoteNode, GroupName, Pid, Meta) ->
  182. gen_server:cast({?MODULE, RemoteNode}, {sync_join, GroupName, Pid, Meta}).
  183. -spec sync_leave(RemoteNode :: node(), GroupName :: any(), Pid :: pid()) -> ok.
  184. sync_leave(RemoteNode, GroupName, Pid) ->
  185. gen_server:cast({?MODULE, RemoteNode}, {sync_leave, GroupName, Pid}).
  186. -spec sync_get_local_groups_tuples(FromNode :: node()) -> list(syn_groups_tuple()).
  187. sync_get_local_groups_tuples(FromNode) ->
  188. error_logger:info_msg("Syn(~p): Received request of local group tuples from remote node: ~p~n", [node(), FromNode]),
  189. get_groups_tuples_for_node(node()).
  190. -spec force_cluster_sync() -> ok.
  191. force_cluster_sync() ->
  192. lists:foreach(fun(RemoteNode) ->
  193. gen_server:cast({?MODULE, RemoteNode}, force_cluster_sync)
  194. end, [node() | nodes()]).
  195. %% ===================================================================
  196. %% Callbacks
  197. %% ===================================================================
  198. %% ----------------------------------------------------------------------------------------------------------
  199. %% Init
  200. %% ----------------------------------------------------------------------------------------------------------
  201. -spec init([]) ->
  202. {ok, #state{}} |
  203. {ok, #state{}, Timeout :: non_neg_integer()} |
  204. ignore |
  205. {stop, Reason :: any()}.
  206. init([]) ->
  207. %% monitor nodes
  208. ok = net_kernel:monitor_nodes(true),
  209. %% rebuild
  210. rebuild_monitors(),
  211. %% get handler
  212. CustomEventHandler = syn_backbone:get_event_handler_module(),
  213. %% get anti-entropy interval
  214. {AntiEntropyIntervalMs, AntiEntropyIntervalMaxDeviationMs} = syn_backbone:get_anti_entropy_settings(groups),
  215. %% build state
  216. State = #state{
  217. custom_event_handler = CustomEventHandler,
  218. anti_entropy_interval_ms = AntiEntropyIntervalMs,
  219. anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
  220. },
  221. %% send message to initiate full cluster sync
  222. timer:send_after(0, self(), sync_from_full_cluster),
  223. %% start anti-entropy
  224. set_timer_for_anti_entropy(State),
  225. %% init
  226. {ok, State}.
  227. %% ----------------------------------------------------------------------------------------------------------
  228. %% Call messages
  229. %% ----------------------------------------------------------------------------------------------------------
  230. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  231. {reply, Reply :: any(), #state{}} |
  232. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  233. {noreply, #state{}} |
  234. {noreply, #state{}, Timeout :: non_neg_integer()} |
  235. {stop, Reason :: any(), Reply :: any(), #state{}} |
  236. {stop, Reason :: any(), #state{}}.
  237. handle_call({join_on_node, GroupName, Pid, Meta}, _From, State) ->
  238. %% check if pid is alive
  239. case is_process_alive(Pid) of
  240. true ->
  241. join_on_node(GroupName, Pid, Meta),
  242. %% multicast
  243. multicast_join(GroupName, Pid, Meta),
  244. %% return
  245. {reply, ok, State};
  246. _ ->
  247. {reply, {error, not_alive}, State}
  248. end;
  249. handle_call({leave_on_node, GroupName, Pid}, _From, State) ->
  250. case leave_on_node(GroupName, Pid) of
  251. ok ->
  252. %% multicast
  253. multicast_leave(GroupName, Pid),
  254. %% return
  255. {reply, ok, State};
  256. {error, Reason} ->
  257. %% return
  258. {reply, {error, Reason}, State}
  259. end;
  260. handle_call(Request, From, State) ->
  261. error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), Request, From]),
  262. {reply, undefined, State}.
  263. %% ----------------------------------------------------------------------------------------------------------
  264. %% Cast messages
  265. %% ----------------------------------------------------------------------------------------------------------
  266. -spec handle_cast(Msg :: any(), #state{}) ->
  267. {noreply, #state{}} |
  268. {noreply, #state{}, Timeout :: non_neg_integer()} |
  269. {stop, Reason :: any(), #state{}}.
  270. handle_cast({sync_join, GroupName, Pid, Meta}, State) ->
  271. %% add to table
  272. add_to_local_table(GroupName, Pid, Meta, undefined),
  273. %% return
  274. {noreply, State};
  275. handle_cast({sync_leave, GroupName, Pid}, State) ->
  276. %% remove entry
  277. remove_from_local_table(GroupName, Pid),
  278. %% return
  279. {noreply, State};
  280. handle_cast(force_cluster_sync, State) ->
  281. error_logger:info_msg("Syn(~p): Initiating full cluster groups FORCED sync for nodes: ~p~n", [node(), nodes()]),
  282. do_sync_from_full_cluster(),
  283. {noreply, State};
  284. handle_cast(Msg, State) ->
  285. error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
  286. {noreply, State}.
  287. %% ----------------------------------------------------------------------------------------------------------
  288. %% All non Call / Cast messages
  289. %% ----------------------------------------------------------------------------------------------------------
  290. -spec handle_info(Info :: any(), #state{}) ->
  291. {noreply, #state{}} |
  292. {noreply, #state{}, Timeout :: non_neg_integer()} |
  293. {stop, Reason :: any(), #state{}}.
  294. handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
  295. case find_groups_tuples_by_pid(Pid) of
  296. [] ->
  297. %% handle
  298. handle_process_down(undefined, Pid, undefined, Reason, State);
  299. GroupTuples ->
  300. lists:foreach(fun({GroupName, _Pid, Meta}) ->
  301. %% remove from table
  302. remove_from_local_table(GroupName, Pid),
  303. %% handle
  304. handle_process_down(GroupName, Pid, Meta, Reason, State),
  305. %% multicast
  306. multicast_leave(GroupName, Pid)
  307. end, GroupTuples)
  308. end,
  309. %% return
  310. {noreply, State};
  311. handle_info({nodeup, RemoteNode}, State) ->
  312. error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
  313. groups_automerge(RemoteNode),
  314. %% resume
  315. {noreply, State};
  316. handle_info({nodedown, RemoteNode}, State) ->
  317. error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing group entries on local~n", [node(), RemoteNode]),
  318. raw_purge_group_entries_for_node(RemoteNode),
  319. {noreply, State};
  320. handle_info(sync_from_full_cluster, State) ->
  321. error_logger:info_msg("Syn(~p): Initiating full cluster groups sync for nodes: ~p~n", [node(), nodes()]),
  322. do_sync_from_full_cluster(),
  323. {noreply, State};
  324. handle_info(sync_anti_entropy, State) ->
  325. %% sync
  326. RemoteNodes = nodes(),
  327. case length(RemoteNodes) > 0 of
  328. true ->
  329. RandomRemoteNode = lists:nth(rand:uniform(length(RemoteNodes)), RemoteNodes),
  330. error_logger:info_msg("Syn(~p): Initiating anti-entropy sync for node ~p~n", [node(), RandomRemoteNode]),
  331. groups_automerge(RandomRemoteNode);
  332. _ ->
  333. ok
  334. end,
  335. %% set timer
  336. set_timer_for_anti_entropy(State),
  337. %% return
  338. {noreply, State};
  339. handle_info(Info, State) ->
  340. error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
  341. {noreply, State}.
  342. %% ----------------------------------------------------------------------------------------------------------
  343. %% Terminate
  344. %% ----------------------------------------------------------------------------------------------------------
  345. -spec terminate(Reason :: any(), #state{}) -> terminated.
  346. terminate(Reason, _State) ->
  347. error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", [node(), Reason]),
  348. terminated.
  349. %% ----------------------------------------------------------------------------------------------------------
  350. %% Convert process state when code is changed.
  351. %% ----------------------------------------------------------------------------------------------------------
  352. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  353. code_change(_OldVsn, State, _Extra) ->
  354. {ok, State}.
  355. %% ===================================================================
  356. %% Internal
  357. %% ===================================================================
  358. -spec multicast_join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> pid().
  359. multicast_join(GroupName, Pid, Meta) ->
  360. spawn_link(fun() ->
  361. lists:foreach(fun(RemoteNode) ->
  362. sync_join(RemoteNode, GroupName, Pid, Meta)
  363. end, nodes())
  364. end).
  365. -spec multicast_leave(GroupName :: any(), Pid :: pid()) -> pid().
  366. multicast_leave(GroupName, Pid) ->
  367. spawn_link(fun() ->
  368. lists:foreach(fun(RemoteNode) ->
  369. sync_leave(RemoteNode, GroupName, Pid)
  370. end, nodes())
  371. end).
  372. -spec join_on_node(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
  373. join_on_node(GroupName, Pid, Meta) ->
  374. MonitorRef = case find_monitor_for_pid(Pid) of
  375. undefined ->
  376. %% process is not monitored yet, add
  377. erlang:monitor(process, Pid);
  378. MRef ->
  379. MRef
  380. end,
  381. %% add to table
  382. add_to_local_table(GroupName, Pid, Meta, MonitorRef).
  383. -spec leave_on_node(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  384. leave_on_node(GroupName, Pid) ->
  385. case find_groups_entry_by_name_and_pid(GroupName, Pid) of
  386. undefined ->
  387. {error, not_in_group};
  388. {GroupName, Pid, _Meta, MonitorRef, _Node} when MonitorRef =/= undefined ->
  389. %% is this the last group process is in?
  390. case find_groups_tuples_by_pid(Pid) of
  391. [_GroupTuple] ->
  392. %% only one left (the one we're about to delete), demonitor
  393. erlang:demonitor(MonitorRef, [flush]);
  394. _ ->
  395. ok
  396. end,
  397. %% remove from table
  398. remove_from_local_table(GroupName, Pid);
  399. {GroupName, Pid, _Meta, _MonitorRef, Node} = GroupsEntry when Node =:= node() ->
  400. error_logger:error_msg(
  401. "Syn(~p): INTERNAL ERROR | Group entry ~p has no monitor but it's running on node~n",
  402. [node(), GroupsEntry]
  403. ),
  404. %% remove from table
  405. remove_from_local_table(GroupName, Pid);
  406. _ ->
  407. %% race condition: leave request but entry in table is not a local pid (has no monitor)
  408. %% ignore it, sync messages will take care of it
  409. ok
  410. end.
  411. -spec add_to_local_table(GroupName :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
  412. add_to_local_table(GroupName, Pid, Meta, MonitorRef) ->
  413. ets:insert(syn_groups_by_name, {{GroupName, Pid}, Meta, MonitorRef, node(Pid)}),
  414. ets:insert(syn_groups_by_pid, {{Pid, GroupName}, Meta, MonitorRef, node(Pid)}),
  415. ok.
  416. -spec remove_from_local_table(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  417. remove_from_local_table(GroupName, Pid) ->
  418. case ets:lookup(syn_groups_by_name, {GroupName, Pid}) of
  419. [] ->
  420. {error, not_in_group};
  421. _ ->
  422. ets:match_delete(syn_groups_by_name, {{GroupName, Pid}, '_', '_', '_'}),
  423. ets:match_delete(syn_groups_by_name, {{Pid, GroupName}, '_', '_', '_'}),
  424. ok
  425. end.
  426. -spec find_groups_tuples_by_pid(Pid :: pid()) -> GroupTuples :: list(syn_groups_tuple()).
  427. find_groups_tuples_by_pid(Pid) when is_pid(Pid) ->
  428. ets:select(syn_groups_by_pid, [{
  429. {{Pid, '$2'}, '$3', '_', '_'},
  430. [],
  431. [{{'$2', Pid, '$3'}}]
  432. }]).
  433. -spec find_groups_entry_by_name_and_pid(GroupName :: any(), Pid :: pid()) -> Entry :: syn_groups_entry() | undefined.
  434. find_groups_entry_by_name_and_pid(GroupName, Pid) ->
  435. MatchBody = case is_tuple(GroupName) of
  436. true -> {{{GroupName}, Pid, '$3', '$4', '$5'}};
  437. _ -> {{GroupName, Pid, '$3', '$4', '$5'}}
  438. end,
  439. case ets:select(syn_groups_by_name, [{
  440. {{GroupName, Pid}, '$3', '$4', '$5'},
  441. [],
  442. [MatchBody]
  443. }]) of
  444. [RegistryTuple] -> RegistryTuple;
  445. _ -> undefined
  446. end.
  447. -spec find_monitor_for_pid(Pid :: pid()) -> reference() | undefined.
  448. find_monitor_for_pid(Pid) when is_pid(Pid) ->
  449. case ets:select(syn_groups_by_pid, [{
  450. {{Pid, '_'}, '_', '$4', '_'},
  451. [],
  452. ['$4']
  453. }], 1) of
  454. {[MonitorRef], _} -> MonitorRef;
  455. _ -> undefined
  456. end.
  457. -spec get_groups_tuples_for_node(Node :: node()) -> [syn_groups_tuple()].
  458. get_groups_tuples_for_node(Node) ->
  459. ets:select(syn_groups_by_name, [{
  460. {{'$1', '$2'}, '$3', '_', Node},
  461. [],
  462. [{{'$1', '$2', '$3'}}]
  463. }]).
  464. -spec handle_process_down(GroupName :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
  465. handle_process_down(GroupName, Pid, Meta, Reason, #state{
  466. custom_event_handler = CustomEventHandler
  467. }) ->
  468. case GroupName of
  469. undefined ->
  470. error_logger:warning_msg(
  471. "Syn(~p): Received a DOWN message from an unjoined group process ~p with reason: ~p~n",
  472. [node(), Pid, Reason]
  473. );
  474. _ ->
  475. syn_event_handler:do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler)
  476. end.
  477. -spec groups_automerge(RemoteNode :: node()) -> ok.
  478. groups_automerge(RemoteNode) ->
  479. global:trans({{?MODULE, auto_merge_groups}, self()},
  480. fun() ->
  481. error_logger:info_msg("Syn(~p): GROUPS AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
  482. %% get group tuples from remote node
  483. case rpc:call(RemoteNode, ?MODULE, sync_get_local_groups_tuples, [node()]) of
  484. {badrpc, _} ->
  485. error_logger:info_msg("Syn(~p): GROUPS AUTOMERGE <---- Syn not ready on remote node ~p, postponing~n", [node(), RemoteNode]);
  486. GroupTuples ->
  487. error_logger:info_msg(
  488. "Syn(~p): Received ~p group tuple(s) from remote node ~p~n",
  489. [node(), length(GroupTuples), RemoteNode]
  490. ),
  491. %% ensure that groups doesn't have any joining node's entries
  492. raw_purge_group_entries_for_node(RemoteNode),
  493. %% add
  494. lists:foreach(fun({GroupName, RemotePid, RemoteMeta}) ->
  495. case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
  496. true ->
  497. add_to_local_table(GroupName, RemotePid, RemoteMeta, undefined);
  498. _ ->
  499. ok = rpc:call(RemoteNode, syn_groups, remove_from_local_table, [GroupName, RemotePid])
  500. end
  501. end, GroupTuples),
  502. %% exit
  503. error_logger:info_msg("Syn(~p): GROUPS AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
  504. end
  505. end
  506. ).
  507. -spec do_sync_from_full_cluster() -> ok.
  508. do_sync_from_full_cluster() ->
  509. lists:foreach(fun(RemoteNode) ->
  510. groups_automerge(RemoteNode)
  511. end, nodes()).
  512. -spec raw_purge_group_entries_for_node(Node :: atom()) -> ok.
  513. raw_purge_group_entries_for_node(Node) ->
  514. %% NB: no demonitoring is done, this is why it's raw
  515. ets:match_delete(syn_groups_by_name, {{'_', '_'}, '_', '_', Node}),
  516. ets:match_delete(syn_groups_by_pid, {{'_', '_'}, '_', '_', Node}),
  517. ok.
  518. -spec multi_call_and_receive(
  519. CollectorPid :: pid(),
  520. Pid :: pid(),
  521. Message :: any(),
  522. Timeout :: non_neg_integer()
  523. ) -> any().
  524. multi_call_and_receive(CollectorPid, Pid, Message, Timeout) ->
  525. MonitorRef = monitor(process, Pid),
  526. Pid ! {syn_multi_call, self(), Message},
  527. receive
  528. {syn_multi_call_reply, Pid, Reply} ->
  529. CollectorPid ! {reply, Pid, Reply};
  530. {'DOWN', MonitorRef, _, _, _} ->
  531. CollectorPid ! {bad_pid, Pid}
  532. after Timeout ->
  533. CollectorPid ! {bad_pid, Pid}
  534. end.
  535. -spec collect_replies(MemberPids :: [pid()]) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  536. collect_replies(MemberPids) ->
  537. collect_replies(MemberPids, [], []).
  538. -spec collect_replies(MemberPids :: [pid()], [{pid(), Reply :: any()}], [pid()]) ->
  539. {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  540. collect_replies([], Replies, BadPids) -> {Replies, BadPids};
  541. collect_replies(MemberPids, Replies, BadPids) ->
  542. receive
  543. {reply, Pid, Reply} ->
  544. MemberPids1 = lists:delete(Pid, MemberPids),
  545. collect_replies(MemberPids1, [{Pid, Reply} | Replies], BadPids);
  546. {bad_pid, Pid} ->
  547. MemberPids1 = lists:delete(Pid, MemberPids),
  548. collect_replies(MemberPids1, Replies, [Pid | BadPids])
  549. end.
  550. -spec rebuild_monitors() -> ok.
  551. rebuild_monitors() ->
  552. GroupTuples = get_groups_tuples_for_node(node()),
  553. %% ensure that groups doesn't have any joining node's entries
  554. raw_purge_group_entries_for_node(node()),
  555. %% add
  556. lists:foreach(fun({GroupName, Pid, Meta}) ->
  557. case erlang:is_process_alive(Pid) of
  558. true ->
  559. join_on_node(GroupName, Pid, Meta);
  560. _ ->
  561. multicast_leave(GroupName, Pid)
  562. end
  563. end, GroupTuples).
  564. -spec set_timer_for_anti_entropy(#state{}) -> ok.
  565. set_timer_for_anti_entropy(#state{anti_entropy_interval_ms = undefined}) -> ok;
  566. set_timer_for_anti_entropy(#state{
  567. anti_entropy_interval_ms = AntiEntropyIntervalMs,
  568. anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
  569. }) ->
  570. IntervalMs = round(AntiEntropyIntervalMs + rand:uniform() * AntiEntropyIntervalMaxDeviationMs),
  571. {ok, _} = timer:send_after(IntervalMs, self(), sync_anti_entropy),
  572. ok.