syn_registry.erl 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_registry).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([register/2, register/3]).
  31. -export([reregister/2, reregister/3]).
  32. -export([unregister/1]).
  33. -export([whereis/1, whereis/2]).
  34. -export([count/0, count/1]).
  35. %% sync API
  36. -export([sync_register/5, sync_unregister/3]).
  37. -export([sync_demonitor_and_kill_on_node/5]).
  38. -export([sync_get_local_registry_tuples/1]).
  39. -export([force_cluster_sync/0]).
  40. -export([add_to_local_table/5, remove_from_local_table/2]).
  41. -export([find_monitor_for_pid/1]).
  42. %% gen_server callbacks
  43. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  44. %% records
  45. -record(state, {
  46. custom_event_handler :: undefined | module(),
  47. anti_entropy_interval_ms :: undefined | non_neg_integer(),
  48. anti_entropy_interval_max_deviation_ms :: undefined | non_neg_integer()
  49. }).
  50. %% includes
  51. -include("syn.hrl").
  52. %% ===================================================================
  53. %% API
  54. %% ===================================================================
  55. -spec start_link() -> {ok, pid()} | {error, any()}.
  56. start_link() ->
  57. Options = [],
  58. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  59. -spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  60. register(Name, Pid) ->
  61. register(Name, Pid, undefined).
  62. -spec register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
  63. register(Name, Pid, Meta) when is_pid(Pid) ->
  64. Node = node(Pid),
  65. gen_server:call({?MODULE, Node}, {register_on_node, Name, Pid, Meta}).
  66. -spec reregister(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  67. reregister(Name, Pid) ->
  68. reregister(Name, Pid, undefined).
  69. -spec reregister(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
  70. reregister(Name, Pid, Meta) when is_pid(Pid) ->
  71. reregister(Name, Pid, Meta, 0).
  72. -spec reregister(Name :: any(), Pid :: pid(), Meta :: any(), RetryCount :: non_neg_integer()) ->
  73. ok | {error, Reason :: any()}.
  74. reregister(Name, Pid, Meta, RetryCount) when RetryCount > 40 ->
  75. exit(self(), {timeout, {gen_server, call, [?MODULE, reregister, {Name, Pid, Meta}]}});
  76. reregister(Name, Pid, Meta, RetryCount) when is_pid(Pid) ->
  77. ?MODULE:unregister(Name),
  78. case find_registry_tuple_by_name(Name) of
  79. undefined ->
  80. case ?MODULE:register(Name, Pid, Meta) of
  81. {error, taken} ->
  82. %% race conditions, retry
  83. timer:sleep(100),
  84. reregister(Name, Pid, Meta, RetryCount + 1);
  85. Result ->
  86. Result
  87. end;
  88. {Name, _, _, _} ->
  89. timer:sleep(100),
  90. reregister(Name, Pid, Meta, RetryCount + 1)
  91. end.
  92. -spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
  93. unregister(Name) ->
  94. % get process' node
  95. case find_registry_tuple_by_name(Name) of
  96. undefined ->
  97. {error, undefined};
  98. {Name, Pid, _, _} ->
  99. Node = node(Pid),
  100. gen_server:call({?MODULE, Node}, {unregister_on_node, Name})
  101. end.
  102. -spec whereis(Name :: any()) -> pid() | undefined.
  103. whereis(Name) ->
  104. case find_registry_tuple_by_name(Name) of
  105. undefined -> undefined;
  106. {Name, Pid, _, _} -> Pid
  107. end.
  108. -spec whereis(Name :: any(), with_meta) -> {pid(), Meta :: any()} | undefined.
  109. whereis(Name, with_meta) ->
  110. case find_registry_tuple_by_name(Name) of
  111. undefined -> undefined;
  112. {Name, Pid, Meta, _} -> {Pid, Meta}
  113. end.
  114. -spec count() -> non_neg_integer().
  115. count() ->
  116. ets:info(syn_registry_by_name, size).
  117. -spec count(Node :: node()) -> non_neg_integer().
  118. count(Node) ->
  119. ets:select_count(syn_registry_by_name, [{
  120. {'_', '_', '_', '_', '_', Node},
  121. [],
  122. [true]
  123. }]).
  124. -spec sync_register(RemoteNode :: node(), Name :: any(), RemotePid :: pid(), RemoteMeta :: any(), RemoteTime :: integer()) ->
  125. ok.
  126. sync_register(RemoteNode, Name, RemotePid, RemoteMeta, RemoteTime) ->
  127. gen_server:cast({?MODULE, RemoteNode}, {sync_register, Name, RemotePid, RemoteMeta, RemoteTime}).
  128. -spec sync_unregister(RemoteNode :: node(), Name :: any(), Pid :: pid()) -> ok.
  129. sync_unregister(RemoteNode, Name, Pid) ->
  130. gen_server:cast({?MODULE, RemoteNode}, {sync_unregister, Name, Pid}).
  131. -spec sync_demonitor_and_kill_on_node(
  132. Name :: any(),
  133. Pid :: pid(),
  134. Meta :: any(),
  135. MonitorRef :: reference(),
  136. Kill :: boolean()
  137. ) -> ok.
  138. sync_demonitor_and_kill_on_node(Name, Pid, Meta, MonitorRef, Kill) ->
  139. RemoteNode = node(Pid),
  140. gen_server:cast({?MODULE, RemoteNode}, {sync_demonitor_and_kill_on_node, Name, Pid, Meta, MonitorRef, Kill}).
  141. -spec sync_get_local_registry_tuples(FromNode :: node()) -> [syn_registry_tuple()].
  142. sync_get_local_registry_tuples(FromNode) ->
  143. error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node ~p~n", [node(), FromNode]),
  144. get_registry_tuples_for_node(node()).
  145. -spec force_cluster_sync() -> ok.
  146. force_cluster_sync() ->
  147. lists:foreach(fun(RemoteNode) ->
  148. gen_server:cast({?MODULE, RemoteNode}, force_cluster_sync)
  149. end, [node() | nodes()]).
  150. %% ===================================================================
  151. %% Callbacks
  152. %% ===================================================================
  153. %% ----------------------------------------------------------------------------------------------------------
  154. %% Init
  155. %% ----------------------------------------------------------------------------------------------------------
  156. -spec init([]) ->
  157. {ok, #state{}} |
  158. {ok, #state{}, Timeout :: non_neg_integer()} |
  159. ignore |
  160. {stop, Reason :: any()}.
  161. init([]) ->
  162. %% monitor nodes
  163. ok = net_kernel:monitor_nodes(true),
  164. %% rebuild monitors (if coming after a crash)
  165. rebuild_monitors(),
  166. %% get handler
  167. CustomEventHandler = syn_backbone:get_event_handler_module(),
  168. %% get anti-entropy interval
  169. {AntiEntropyIntervalMs, AntiEntropyIntervalMaxDeviationMs} = syn_backbone:get_anti_entropy_settings(registry),
  170. %% build state
  171. State = #state{
  172. custom_event_handler = CustomEventHandler,
  173. anti_entropy_interval_ms = AntiEntropyIntervalMs,
  174. anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
  175. },
  176. %% send message to initiate full cluster sync
  177. timer:send_after(0, self(), sync_from_full_cluster),
  178. %% start anti-entropy
  179. set_timer_for_anti_entropy(State),
  180. %% init
  181. {ok, State}.
  182. %% ----------------------------------------------------------------------------------------------------------
  183. %% Call messages
  184. %% ----------------------------------------------------------------------------------------------------------
  185. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  186. {reply, Reply :: any(), #state{}} |
  187. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  188. {noreply, #state{}} |
  189. {noreply, #state{}, Timeout :: non_neg_integer()} |
  190. {stop, Reason :: any(), Reply :: any(), #state{}} |
  191. {stop, Reason :: any(), #state{}}.
  192. handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
  193. %% check if pid is alive
  194. case is_process_alive(Pid) of
  195. true ->
  196. %% check if name available
  197. case find_registry_tuple_by_name(Name) of
  198. undefined ->
  199. {ok, Time} = register_on_node(Name, Pid, Meta),
  200. %% multicast
  201. multicast_register(Name, Pid, Meta, Time),
  202. %% return
  203. {reply, ok, State};
  204. {Name, Pid, _, _} ->
  205. {ok, Time} = register_on_node(Name, Pid, Meta),
  206. %% multicast
  207. multicast_register(Name, Pid, Meta, Time),
  208. %% return
  209. {reply, ok, State};
  210. _ ->
  211. {reply, {error, taken}, State}
  212. end;
  213. _ ->
  214. {reply, {error, not_alive}, State}
  215. end;
  216. handle_call({unregister_on_node, Name}, _From, State) ->
  217. case unregister_on_node(Name) of
  218. {ok, RemovedPid} ->
  219. multicast_unregister(Name, RemovedPid),
  220. %% return
  221. {reply, ok, State};
  222. {error, Reason} ->
  223. %% return
  224. {reply, {error, Reason}, State}
  225. end;
  226. handle_call(Request, From, State) ->
  227. error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), Request, From]),
  228. {reply, undefined, State}.
  229. %% ----------------------------------------------------------------------------------------------------------
  230. %% Cast messages
  231. %% ----------------------------------------------------------------------------------------------------------
  232. -spec handle_cast(Msg :: any(), #state{}) ->
  233. {noreply, #state{}} |
  234. {noreply, #state{}, Timeout :: non_neg_integer()} |
  235. {stop, Reason :: any(), #state{}}.
  236. handle_cast({sync_register, Name, RemotePid, RemoteMeta, RemoteTime}, State) ->
  237. %% check for conflicts
  238. case find_registry_tuple_by_name(Name) of
  239. undefined ->
  240. %% no conflict
  241. add_to_local_table(Name, RemotePid, RemoteMeta, RemoteTime, undefined);
  242. {Name, RemotePid, _, _} ->
  243. %% same process, no conflict, overwrite
  244. add_to_local_table(Name, RemotePid, RemoteMeta, RemoteTime, undefined);
  245. {Name, TablePid, TableMeta, TableTime} ->
  246. %% different pid, we have a conflict
  247. global:trans({{?MODULE, {inconsistent_name, Name}}, self()},
  248. fun() ->
  249. error_logger:warning_msg(
  250. "Syn(~p): REGISTRY INCONSISTENCY (name: ~p for ~p and ~p) ----> Received from remote node ~p~n",
  251. [node(), Name, {TablePid, TableMeta, TableTime}, {RemotePid, RemoteMeta, RemoteTime}, node(RemotePid)]
  252. ),
  253. case resolve_conflict(Name, {TablePid, TableMeta, TableTime}, {RemotePid, RemoteMeta, RemoteTime}, State) of
  254. {TablePid, KillOtherPid} ->
  255. %% keep local
  256. %% demonitor
  257. MonitorRef = rpc:call(node(RemotePid), syn_registry, find_monitor_for_pid, [RemotePid]),
  258. sync_demonitor_and_kill_on_node(Name, RemotePid, RemoteMeta, MonitorRef, KillOtherPid),
  259. %% overwrite local data to all remote nodes, except TablePid's node
  260. NodesExceptLocalAndTablePidNode = nodes() -- [node(TablePid)],
  261. lists:foreach(fun(RNode) ->
  262. ok = rpc:call(RNode,
  263. syn_registry, add_to_local_table,
  264. [Name, TablePid, TableMeta, TableTime, undefined]
  265. )
  266. end, NodesExceptLocalAndTablePidNode);
  267. {RemotePid, KillOtherPid} ->
  268. %% keep remote
  269. %% demonitor
  270. MonitorRef = rpc:call(node(TablePid), syn_registry, find_monitor_for_pid, [TablePid]),
  271. sync_demonitor_and_kill_on_node(Name, TablePid, TableMeta, MonitorRef, KillOtherPid),
  272. %% overwrite remote data to all other nodes (including local), except RemotePid's node
  273. NodesExceptRemoteNode = [node() | nodes()] -- [node(RemotePid)],
  274. lists:foreach(fun(RNode) ->
  275. ok = rpc:call(RNode,
  276. syn_registry, add_to_local_table,
  277. [Name, RemotePid, RemoteMeta, RemoteTime, undefined]
  278. )
  279. end, NodesExceptRemoteNode);
  280. undefined ->
  281. AllNodes = [node() | nodes()],
  282. %% both are dead, remove from all nodes
  283. lists:foreach(fun(RNode) ->
  284. ok = rpc:call(RNode, syn_registry, remove_from_local_table, [Name, RemotePid])
  285. end, AllNodes)
  286. end,
  287. error_logger:info_msg(
  288. "Syn(~p): REGISTRY INCONSISTENCY (name: ~p) <---- Done on all cluster~n",
  289. [node(), Name]
  290. )
  291. end
  292. )
  293. end,
  294. %% return
  295. {noreply, State};
  296. handle_cast({sync_unregister, Name, Pid}, State) ->
  297. %% remove
  298. remove_from_local_table(Name, Pid),
  299. %% return
  300. {noreply, State};
  301. handle_cast(force_cluster_sync, State) ->
  302. error_logger:info_msg("Syn(~p): Initiating full cluster FORCED registry sync for nodes: ~p~n", [node(), nodes()]),
  303. do_sync_from_full_cluster(State),
  304. {noreply, State};
  305. handle_cast({sync_demonitor_and_kill_on_node, Name, Pid, Meta, MonitorRef, Kill}, State) ->
  306. error_logger:info_msg("Syn(~p): Sync demonitoring pid ~p~n", [node(), Pid]),
  307. %% demonitor
  308. catch erlang:demonitor(MonitorRef, [flush]),
  309. %% kill
  310. case Kill of
  311. true ->
  312. exit(Pid, {syn_resolve_kill, Name, Meta});
  313. _ ->
  314. ok
  315. end,
  316. {noreply, State};
  317. handle_cast(Msg, State) ->
  318. error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
  319. {noreply, State}.
  320. %% ----------------------------------------------------------------------------------------------------------
  321. %% All non Call / Cast messages
  322. %% ----------------------------------------------------------------------------------------------------------
  323. -spec handle_info(Info :: any(), #state{}) ->
  324. {noreply, #state{}} |
  325. {noreply, #state{}, Timeout :: non_neg_integer()} |
  326. {stop, Reason :: any(), #state{}}.
  327. handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
  328. case find_registry_tuples_by_pid(Pid) of
  329. [] ->
  330. %% handle
  331. handle_process_down(undefined, Pid, undefined, Reason, State);
  332. Entries ->
  333. lists:foreach(fun({Name, _Pid, Meta, _Time}) ->
  334. %% handle
  335. handle_process_down(Name, Pid, Meta, Reason, State),
  336. %% remove from table
  337. remove_from_local_table(Name, Pid),
  338. %% multicast
  339. multicast_unregister(Name, Pid)
  340. end, Entries)
  341. end,
  342. %% return
  343. {noreply, State};
  344. handle_info({nodeup, RemoteNode}, State) ->
  345. error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
  346. registry_automerge(RemoteNode, State),
  347. %% resume
  348. {noreply, State};
  349. handle_info({nodedown, RemoteNode}, State) ->
  350. error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing registry entries on local~n", [node(), RemoteNode]),
  351. raw_purge_registry_entries_for_remote_node(RemoteNode),
  352. {noreply, State};
  353. handle_info(sync_from_full_cluster, State) ->
  354. error_logger:info_msg("Syn(~p): Initiating full cluster registry sync for nodes: ~p~n", [node(), nodes()]),
  355. do_sync_from_full_cluster(State),
  356. {noreply, State};
  357. handle_info(sync_anti_entropy, State) ->
  358. %% sync
  359. RemoteNodes = nodes(),
  360. case length(RemoteNodes) > 0 of
  361. true ->
  362. RandomRemoteNode = lists:nth(rand:uniform(length(RemoteNodes)), RemoteNodes),
  363. error_logger:info_msg("Syn(~p): Initiating anti-entropy sync for node ~p~n", [node(), RandomRemoteNode]),
  364. registry_automerge(RandomRemoteNode, State);
  365. _ ->
  366. ok
  367. end,
  368. %% set timer
  369. set_timer_for_anti_entropy(State),
  370. %% return
  371. {noreply, State};
  372. handle_info(Info, State) ->
  373. error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
  374. {noreply, State}.
  375. %% ----------------------------------------------------------------------------------------------------------
  376. %% Terminate
  377. %% ----------------------------------------------------------------------------------------------------------
  378. -spec terminate(Reason :: any(), #state{}) -> terminated.
  379. terminate(Reason, _State) ->
  380. error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", [node(), Reason]),
  381. terminated.
  382. %% ----------------------------------------------------------------------------------------------------------
  383. %% Convert process state when code is changed.
  384. %% ----------------------------------------------------------------------------------------------------------
  385. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  386. code_change(_OldVsn, State, _Extra) ->
  387. {ok, State}.
  388. %% ===================================================================
  389. %% Internal
  390. %% ===================================================================
  391. -spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any(), Time :: integer()) -> pid().
  392. multicast_register(Name, Pid, Meta, Time) ->
  393. spawn_link(fun() ->
  394. lists:foreach(fun(RemoteNode) ->
  395. sync_register(RemoteNode, Name, Pid, Meta, Time)
  396. end, nodes())
  397. end).
  398. -spec multicast_unregister(Name :: any(), Pid :: pid()) -> pid().
  399. multicast_unregister(Name, Pid) ->
  400. spawn_link(fun() ->
  401. lists:foreach(fun(RemoteNode) ->
  402. sync_unregister(RemoteNode, Name, Pid)
  403. end, nodes())
  404. end).
  405. -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> {ok, Time :: integer()}.
  406. register_on_node(Name, Pid, Meta) ->
  407. MonitorRef = case find_monitor_for_pid(Pid) of
  408. undefined ->
  409. %% process is not monitored yet, add
  410. erlang:monitor(process, Pid);
  411. MRef ->
  412. MRef
  413. end,
  414. %% add to table
  415. Time = erlang:system_time(),
  416. add_to_local_table(Name, Pid, Meta, Time, MonitorRef),
  417. {ok, Time}.
  418. -spec unregister_on_node(Name :: any()) -> {ok, RemovedPid :: pid()} | {error, Reason :: any()}.
  419. unregister_on_node(Name) ->
  420. case find_registry_entry_by_name(Name) of
  421. undefined ->
  422. {error, undefined};
  423. {Name, Pid, _Meta, _Clock, MonitorRef, _Node} when MonitorRef =/= undefined ->
  424. %% demonitor
  425. erlang:demonitor(MonitorRef, [flush]),
  426. %% remove from table
  427. remove_from_local_table(Name, Pid),
  428. %% return
  429. {ok, Pid};
  430. {Name, Pid, _Meta, _Clock, _MonitorRef, Node} = RegistryEntry when Node =:= node() ->
  431. error_logger:error_msg(
  432. "Syn(~p): INTERNAL ERROR | Registry entry ~p has no monitor but it's running on node~n",
  433. [node(), RegistryEntry]
  434. ),
  435. %% remove from table
  436. remove_from_local_table(Name, Pid),
  437. %% return
  438. {ok, Pid};
  439. RegistryEntry ->
  440. %% race condition: un-registration request but entry in table is not a local pid (has no monitor)
  441. %% sync messages will take care of it
  442. error_logger:info_msg(
  443. "Syn(~p): Registry entry ~p is not monitored and it's not running on node~n",
  444. [node(), RegistryEntry]
  445. ),
  446. {error, remote_pid}
  447. end.
  448. -spec add_to_local_table(
  449. Name :: any(),
  450. Pid :: pid(),
  451. Meta :: any(),
  452. Time :: integer(),
  453. MonitorRef :: undefined | reference()
  454. ) -> ok.
  455. add_to_local_table(Name, Pid, Meta, Time, MonitorRef) ->
  456. %% remove entry if previous exists
  457. case find_registry_tuple_by_name(Name) of
  458. undefined ->
  459. undefined;
  460. {Name, OldPid, _, _} ->
  461. ets:delete(syn_registry_by_pid, {OldPid, Name})
  462. end,
  463. %% overwrite & add
  464. ets:insert(syn_registry_by_name, {Name, Pid, Meta, Time, MonitorRef, node(Pid)}),
  465. ets:insert(syn_registry_by_pid, {{Pid, Name}, Meta, Time, MonitorRef, node(Pid)}),
  466. ok.
  467. -spec remove_from_local_table(Name :: any(), Pid :: pid()) -> ok.
  468. remove_from_local_table(Name, Pid) ->
  469. case find_registry_tuple_by_name(Name) of
  470. undefined ->
  471. ok;
  472. {Name, Pid, _, _} ->
  473. ets:delete(syn_registry_by_name, Name),
  474. ets:delete(syn_registry_by_pid, {Pid, Name}),
  475. ok;
  476. {Name, TablePid, _, _} ->
  477. error_logger:info_msg(
  478. "Syn(~p): Request to delete registry name ~p for pid ~p but locally have ~p, ignoring~n",
  479. [node(), Name, Pid, TablePid]
  480. )
  481. end.
  482. -spec find_registry_tuple_by_name(Name :: any()) -> RegistryTuple :: syn_registry_tuple() | undefined.
  483. find_registry_tuple_by_name(Name) ->
  484. MatchBody = case is_tuple(Name) of
  485. true -> {{{Name}, '$2', '$3', '$4'}};
  486. _ -> {{Name, '$2', '$3', '$4'}}
  487. end,
  488. case ets:select(syn_registry_by_name, [{
  489. {Name, '$2', '$3', '$4', '_', '_'},
  490. [],
  491. [MatchBody]
  492. }]) of
  493. [RegistryTuple] -> RegistryTuple;
  494. _ -> undefined
  495. end.
  496. -spec find_registry_entry_by_name(Name :: any()) -> Entry :: syn_registry_entry() | undefined.
  497. find_registry_entry_by_name(Name) ->
  498. case ets:select(syn_registry_by_name, [{
  499. {Name, '$2', '$3', '_', '_', '_'},
  500. [],
  501. ['$_']
  502. }]) of
  503. [RegistryTuple] -> RegistryTuple;
  504. _ -> undefined
  505. end.
  506. -spec find_monitor_for_pid(Pid :: pid()) -> reference() | undefined.
  507. find_monitor_for_pid(Pid) when is_pid(Pid) ->
  508. case ets:select(syn_registry_by_pid, [{
  509. {{Pid, '_'}, '_', '_', '$5', '_'},
  510. [],
  511. ['$5']
  512. }], 1) of
  513. {[MonitorRef], _} -> MonitorRef;
  514. _ -> undefined
  515. end.
  516. -spec find_registry_tuples_by_pid(Pid :: pid()) -> Entries :: [syn_registry_tuple()].
  517. find_registry_tuples_by_pid(Pid) when is_pid(Pid) ->
  518. ets:select(syn_registry_by_pid, [{
  519. {{Pid, '$2'}, '$3', '$4', '_', '_'},
  520. [],
  521. [{{'$2', Pid, '$3', '$4'}}]
  522. }]).
  523. -spec get_registry_tuples_for_node(Node :: node()) -> [syn_registry_tuple()].
  524. get_registry_tuples_for_node(Node) ->
  525. ets:select(syn_registry_by_name, [{
  526. {'$1', '$2', '$3', '$4', '_', Node},
  527. [],
  528. [{{'$1', '$2', '$3', '$4'}}]
  529. }]).
  530. -spec handle_process_down(Name :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
  531. handle_process_down(Name, Pid, Meta, Reason, #state{
  532. custom_event_handler = CustomEventHandler
  533. }) ->
  534. case Name of
  535. undefined ->
  536. case Reason of
  537. {syn_resolve_kill, KillName, KillMeta} ->
  538. syn_event_handler:do_on_process_exit(KillName, Pid, KillMeta, syn_resolve_kill, CustomEventHandler);
  539. _ ->
  540. error_logger:warning_msg(
  541. "Syn(~p): Received a DOWN message from an unregistered process ~p with reason: ~p~n",
  542. [node(), Pid, Reason]
  543. )
  544. end;
  545. _ ->
  546. syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler)
  547. end.
  548. -spec registry_automerge(RemoteNode :: node(), #state{}) -> ok.
  549. registry_automerge(RemoteNode, State) ->
  550. global:trans({{?MODULE, auto_merge_registry}, self()},
  551. fun() ->
  552. error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
  553. %% get registry tuples from remote node
  554. case rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]) of
  555. {badrpc, _} ->
  556. error_logger:info_msg(
  557. "Syn(~p): REGISTRY AUTOMERGE <---- Syn not ready on remote node ~p, postponing~n",
  558. [node(), RemoteNode]
  559. );
  560. Entries ->
  561. error_logger:info_msg(
  562. "Syn(~p): Received ~p registry tuple(s) from remote node ~p~n",
  563. [node(), length(Entries), RemoteNode]
  564. ),
  565. %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
  566. raw_purge_registry_entries_for_remote_node(RemoteNode),
  567. %% loop
  568. F = fun({Name, RemotePid, RemoteMeta, RemoteTime}) ->
  569. resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteTime, State)
  570. end,
  571. %% add to table
  572. lists:foreach(F, Entries),
  573. %% exit
  574. error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
  575. end
  576. end
  577. ).
  578. -spec resolve_tuple_in_automerge(
  579. Name :: any(),
  580. RemotePid :: pid(),
  581. RemoteMeta :: any(),
  582. RemoteTime :: integer(),
  583. #state{}
  584. ) -> any().
  585. resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteTime, State) ->
  586. %% check if same name is registered
  587. case find_registry_tuple_by_name(Name) of
  588. undefined ->
  589. %% no conflict
  590. add_to_local_table(Name, RemotePid, RemoteMeta, RemoteTime, undefined);
  591. {Name, TablePid, TableMeta, TableTime} ->
  592. error_logger:warning_msg(
  593. "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
  594. [node(), Name, {TablePid, TableMeta, TableTime}, {RemotePid, RemoteMeta, RemoteTime}]
  595. ),
  596. case resolve_conflict(Name, {TablePid, TableMeta, TableTime}, {RemotePid, RemoteMeta, RemoteTime}, State) of
  597. {TablePid, KillOtherPid} ->
  598. %% keep local
  599. %% demonitor
  600. MonitorRef = rpc:call(node(RemotePid), syn_registry, find_monitor_for_pid, [RemotePid]),
  601. sync_demonitor_and_kill_on_node(Name, RemotePid, RemoteMeta, MonitorRef, KillOtherPid),
  602. %% remote data still on remote node, remove there
  603. ok = rpc:call(node(RemotePid), syn_registry, remove_from_local_table, [Name, RemotePid]);
  604. {RemotePid, KillOtherPid} ->
  605. %% keep remote
  606. %% demonitor
  607. MonitorRef = rpc:call(node(TablePid), syn_registry, find_monitor_for_pid, [TablePid]),
  608. sync_demonitor_and_kill_on_node(Name, TablePid, TableMeta, MonitorRef, KillOtherPid),
  609. %% overwrite remote data to local
  610. add_to_local_table(Name, RemotePid, RemoteMeta, RemoteTime, undefined);
  611. undefined ->
  612. %% both are dead, remove from local & remote
  613. remove_from_local_table(Name, TablePid),
  614. ok = rpc:call(node(RemotePid), syn_registry, remove_from_local_table, [Name, RemotePid])
  615. end
  616. end.
  617. -spec resolve_conflict(
  618. Name :: any(),
  619. {TablePid :: pid(), TableMeta :: any(), TableTime :: integer()},
  620. {RemotePid :: pid(), RemoteMeta :: any(), RemoteTime :: integer()},
  621. #state{}
  622. ) -> {PidToKeep :: pid(), KillOtherPid :: boolean() | undefined} | undefined.
  623. resolve_conflict(
  624. Name,
  625. {TablePid, TableMeta, TableTime},
  626. {RemotePid, RemoteMeta, RemoteTime},
  627. #state{custom_event_handler = CustomEventHandler}
  628. ) ->
  629. TablePidAlive = rpc:call(node(TablePid), erlang, is_process_alive, [TablePid]),
  630. RemotePidAlive = rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]),
  631. %% check if pids are alive (race conditions if pid dies during resolution)
  632. {PidToKeep, KillOtherPid} = case {TablePidAlive, RemotePidAlive} of
  633. {true, true} ->
  634. %% call conflict resolution
  635. syn_event_handler:do_resolve_registry_conflict(
  636. Name,
  637. {TablePid, TableMeta, TableTime},
  638. {RemotePid, RemoteMeta, RemoteTime},
  639. CustomEventHandler
  640. );
  641. {true, false} ->
  642. %% keep only alive process
  643. {TablePid, false};
  644. {false, true} ->
  645. %% keep only alive process
  646. {RemotePid, false};
  647. {false, false} ->
  648. %% remove both
  649. {undefined, false}
  650. end,
  651. %% keep chosen one
  652. case PidToKeep of
  653. TablePid ->
  654. %% keep local
  655. error_logger:info_msg(
  656. "Syn(~p): Keeping process in table ~p over remote process ~p~n",
  657. [node(), TablePid, RemotePid]
  658. ),
  659. {TablePid, KillOtherPid};
  660. RemotePid ->
  661. %% keep remote
  662. error_logger:info_msg(
  663. "Syn(~p): Keeping remote process ~p over process in table ~p~n",
  664. [node(), RemotePid, TablePid]
  665. ),
  666. {RemotePid, KillOtherPid};
  667. undefined ->
  668. error_logger:info_msg(
  669. "Syn(~p): Removing both processes' ~p and ~p data from local and remote tables~n",
  670. [node(), RemotePid, TablePid]
  671. ),
  672. undefined;
  673. Other ->
  674. error_logger:error_msg(
  675. "Syn(~p): Custom handler returned ~p, valid options were ~p and ~p, removing both~n",
  676. [node(), Other, TablePid, RemotePid]
  677. ),
  678. undefined
  679. end.
  680. -spec do_sync_from_full_cluster(#state{}) -> ok.
  681. do_sync_from_full_cluster(State) ->
  682. lists:foreach(fun(RemoteNode) ->
  683. registry_automerge(RemoteNode, State)
  684. end, nodes()).
  685. -spec raw_purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
  686. raw_purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
  687. %% NB: no demonitoring is done, this is why it's raw
  688. ets:match_delete(syn_registry_by_name, {'_', '_', '_', '_', '_', Node}),
  689. ets:match_delete(syn_registry_by_pid, {{'_', '_'}, '_', '_', '_', Node}),
  690. ok.
  691. -spec rebuild_monitors() -> ok.
  692. rebuild_monitors() ->
  693. Entries = get_registry_tuples_for_node(node()),
  694. lists:foreach(fun({Name, Pid, Meta, Time}) ->
  695. case is_process_alive(Pid) of
  696. true ->
  697. MonitorRef = erlang:monitor(process, Pid),
  698. %% overwrite
  699. add_to_local_table(Name, Pid, Meta, Time, MonitorRef);
  700. _ ->
  701. remove_from_local_table(Name, Pid)
  702. end
  703. end, Entries).
  704. -spec set_timer_for_anti_entropy(#state{}) -> ok.
  705. set_timer_for_anti_entropy(#state{anti_entropy_interval_ms = undefined}) -> ok;
  706. set_timer_for_anti_entropy(#state{
  707. anti_entropy_interval_ms = AntiEntropyIntervalMs,
  708. anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
  709. }) ->
  710. IntervalMs = round(AntiEntropyIntervalMs + rand:uniform() * AntiEntropyIntervalMaxDeviationMs),
  711. {ok, _} = timer:send_after(IntervalMs, self(), sync_anti_entropy),
  712. ok.