syn_registry.erl 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2022 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. %% @private
  27. -module(syn_registry).
  28. -behaviour(syn_gen_scope).
  29. %% API
  30. -export([start_link/1]).
  31. -export([subcluster_nodes/1]).
  32. -export([lookup/2]).
  33. -export([register/4]).
  34. -export([update/3]).
  35. -export([unregister/2]).
  36. -export([count/1, count/2]).
  37. %% syn_gen_scope callbacks
  38. -export([
  39. init/1,
  40. handle_call/3,
  41. handle_info/2,
  42. save_remote_data/2,
  43. get_local_data/1,
  44. purge_local_data_for_node/2
  45. ]).
  46. %% macros
  47. -define(MODULE_LOG_NAME, registry).
  48. %% tests
  49. -ifdef(TEST).
  50. -export([add_to_local_table/7, remove_from_local_table/4]).
  51. -endif.
  52. %% includes
  53. -include("syn.hrl").
  54. %% ===================================================================
  55. %% API
  56. %% ===================================================================
  %% Starts the registry scope process for `Scope'.
  %% Delegates to syn_gen_scope:start_link/3 with this module as the handler
  %% module and ?MODULE_LOG_NAME as the log name. A non-atom scope fails with
  %% function_clause because of the guard.
  -spec start_link(Scope :: atom()) ->
      {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
  start_link(Scope) when is_atom(Scope) ->
      Handler = ?MODULE,
      syn_gen_scope:start_link(Handler, ?MODULE_LOG_NAME, Scope).
  %% Returns the nodes of the subcluster for `Scope', as reported by
  %% syn_gen_scope:subcluster_nodes/2 for this handler module.
  -spec subcluster_nodes(Scope :: atom()) -> [node()].
  subcluster_nodes(Scope) ->
      Handler = ?MODULE,
      syn_gen_scope:subcluster_nodes(Handler, Scope).
  %% Looks up `Name' in the by-name table of `Scope'.
  %% Returns {Pid, Meta} when an entry exists and `undefined' otherwise.
  %% Raises error({invalid_scope, Scope}) when no table name is known for the scope.
  -spec lookup(Scope :: atom(), Name :: term()) -> {pid(), Meta :: term()} | undefined.
  lookup(Scope, Name) ->
      case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              case find_registry_entry_by_name(Name, Table) of
                  {Name, Pid, Meta, _, _, _} -> {Pid, Meta};
                  undefined -> undefined
              end
      end.
  %% Registers `Pid' under `Name' with metadata `Meta' in `Scope'.
  %% Thin wrapper around register_or_update/4 that collapses a successful
  %% result to `ok' and passes error tuples through unchanged.
  -spec register(Scope :: atom(), Name :: term(), Pid :: pid(), Meta :: term()) -> ok | {error, Reason :: term()}.
  register(Scope, Name, Pid, Meta) ->
      case register_or_update(Scope, Name, Pid, Meta) of
          {error, _Reason} = Error -> Error;
          {ok, _} -> ok
      end.
  %% Updates the metadata of the process registered as `Name' by applying `Fun'.
  %% The pid currently stored for `Name' is read from the local by-name table and
  %% the update is then routed through register_or_update/4.
  %% Returns {error, undefined} when `Name' is not registered; raises
  %% error({invalid_scope, Scope}) when no table name is known for the scope.
  -spec update(Scope :: atom(), Name :: term(), Fun :: function()) ->
      {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
  update(Scope, Name, Fun) ->
      case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              %% the registered pid tells us which node must perform the update
              case find_registry_entry_by_name(Name, Table) of
                  {Name, RegisteredPid, _, _, _, _} ->
                      register_or_update(Scope, Name, RegisteredPid, Fun);
                  undefined ->
                      {error, undefined}
              end
      end.
  %% Registers `Pid' under `Name', or updates its metadata, on the node that runs `Pid'.
  %% `MetaOrFun' is either the new metadata or an update fun (see the
  %% register_or_update_on_node call handler).
  %% In strict mode only the calling process itself may be registered; any other
  %% pid yields {error, not_self}.
  %% An exception raised on the owning node is re-raised here with its original
  %% class, reason and stacktrace.
  -spec register_or_update(Scope :: atom(), Name :: term(), Pid :: pid(), MetaOrFun :: term() | function()) ->
      {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
  register_or_update(Scope, Name, Pid, MetaOrFun) ->
      StrictModeViolation = syn_backbone:is_strict_mode() =:= true andalso Pid =/= self(),
      case StrictModeViolation of
          true ->
              {error, not_self};
          false ->
              OwnerNode = node(Pid),
              Request = {'3.0', register_or_update_on_node, node(), Name, Pid, MetaOrFun},
              case syn_gen_scope:call(?MODULE, OwnerNode, Scope, Request) of
                  {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}} when OwnerNode =/= node() ->
                      %% the owner is remote: mirror the change in the caller's tables right
                      %% away so that subsequent local calls see the updated registry
                      add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
                      %% fire the event handler on the caller node too
                      syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta, normal]),
                      {ok, {Pid, Meta}};
                  {ok, {_, Meta, _, _, _}} ->
                      %% the owner is the local node: the call handler already did the work
                      {ok, {Pid, Meta}};
                  {noop, Meta} ->
                      {ok, {Pid, Meta}};
                  {{error, Reason}, _} ->
                      {error, Reason};
                  {raise, Class, Reason, Stacktrace} ->
                      erlang:raise(Class, Reason, Stacktrace)
              end
      end.
  %% Removes the registration of `Name' in `Scope'.
  %% The request is sent to the node running the registered pid. When that node
  %% is remote, the local tables are updated and the unregistered callback is
  %% invoked here as well.
  %% Returns {error, undefined} when `Name' is not registered locally; raises
  %% error({invalid_scope, Scope}) when no table name is known for the scope.
  -spec unregister(Scope :: atom(), Name :: term()) -> ok | {error, Reason :: term()}.
  unregister(Scope, Name) ->
      case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          TableByName ->
              %% the registered pid tells us which node must perform the removal
              case find_registry_entry_by_name(Name, TableByName) of
                  undefined ->
                      {error, undefined};
                  {Name, Pid, Meta, _, _, _} ->
                      OwnerNode = node(Pid),
                      Request = {'3.0', unregister_on_node, node(), Name, Pid},
                      case syn_gen_scope:call(?MODULE, OwnerNode, Scope, Request) of
                          {ok, TableByPid} when OwnerNode =/= node() ->
                              %% drop the entry on the caller node immediately so that
                              %% subsequent local calls see the updated registry
                              remove_from_local_table(Name, Pid, TableByName, TableByPid),
                              syn_event_handler:call_event_handler(
                                  on_process_unregistered, [Scope, Name, Pid, Meta, normal]
                              ),
                              ok;
                          {Response, _} ->
                              Response
                      end
              end
      end.
  %% Returns the number of entries in the by-name registry table of `Scope'.
  %% Raises error({invalid_scope, Scope}) when no table name is known for the
  %% scope, or when the named table does not exist.
  %% The table name is checked before it is handed to ets:info/2, matching
  %% count/2 and lookup/2; the result therefore no longer depends on there being
  %% no ETS table registered under the atom `undefined'.
  -spec count(Scope :: atom()) -> non_neg_integer().
  count(Scope) ->
      case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          TableByName ->
              case ets:info(TableByName, size) of
                  %% ets:info/2 returns `undefined' for a table that does not exist
                  undefined -> error({invalid_scope, Scope});
                  Size -> Size
              end
      end.
  %% Returns the number of by-name entries of `Scope' whose node field (the last
  %% tuple element) equals `Node'.
  %% Raises error({invalid_scope, Scope}) when no table name is known for the scope.
  -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
  count(Scope, Node) ->
      case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              %% match every entry stored for Node; `true' makes select_count count it
              EntriesOnNode = [{{'_', '_', '_', '_', '_', Node}, [], [true]}],
              ets:select_count(Table, EntriesOnNode)
      end.
  166. %% ===================================================================
  167. %% Callbacks
  168. %% ===================================================================
  169. %% ----------------------------------------------------------------------------------------------------------
  170. %% Init
  171. %% ----------------------------------------------------------------------------------------------------------
  %% Handler initialisation.
  %% First purges every entry that belongs to a remote node, then rebuilds the
  %% monitors for the entries of the local node. The handler state is an empty map.
  -spec init(#state{}) -> {ok, HandlerState :: term()}.
  init(#state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid}) ->
      purge_registry_for_remote_nodes(Scope, TableByName, TableByPid),
      rebuild_monitors(Scope, TableByName, TableByPid),
      {ok, #{}}.
  184. %% ----------------------------------------------------------------------------------------------------------
  185. %% Call messages
  186. %% ----------------------------------------------------------------------------------------------------------
  %% Call handler of the registry scope process.
  %%
  %% register_or_update_on_node: registers `Pid' under `Name' or updates its
  %% metadata. `MetaOrFun' is either the new metadata or a fun (Pid, CurrentMeta) -> NewMeta.
  %% Replies with {ok, {...}} on change, {noop, Meta} when nothing changed,
  %% {{error, Reason}, undefined} on failure, or {raise, Class, Reason, Stacktrace}
  %% when the update fun raised.
  %%
  %% unregister_on_node: removes the registration of `Name' when it is held by `Pid'.
  %%
  %% Any other request is logged and answered with `undefined'.
  -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
      {reply, Reply :: term(), #state{}} |
      {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), Reply :: term(), #state{}} |
      {stop, Reason :: term(), #state{}}.
  handle_call(
      {'3.0', register_or_update_on_node, RequesterNode, Name, Pid, MetaOrFun},
      _From,
      #state{table_by_name = TableByName, table_by_pid = TableByPid} = State
  ) ->
      case is_process_alive(Pid) of
          false ->
              {reply, {{error, not_alive}, undefined}, State};
          true ->
              case find_registry_entry_by_name(Name, TableByName) of
                  undefined when is_function(MetaOrFun) ->
                      %% an update was requested for a name that is not registered
                      {reply, {{error, undefined}, undefined}, State};
                  undefined ->
                      %% name is free: reuse the monitor of the pid if it is already
                      %% registered under another name, otherwise start monitoring it
                      MRef =
                          case find_monitor_for_pid(Pid, TableByPid) of
                              undefined -> erlang:monitor(process, Pid);
                              ExistingMRef -> ExistingMRef
                          end,
                      do_register_on_node(
                          Name, Pid, MetaOrFun, MRef, normal, RequesterNode, on_process_registered, State
                      );
                  {Name, Pid, CurrentMeta, _, MRef, _} when is_function(MetaOrFun) ->
                      %% same pid, metadata computed by the caller-supplied fun
                      try MetaOrFun(Pid, CurrentMeta) of
                          CurrentMeta ->
                              %% the fun returned the stored metadata unchanged
                              {reply, {noop, CurrentMeta}, State};
                          NewMeta ->
                              do_register_on_node(
                                  Name, Pid, NewMeta, MRef, normal, RequesterNode, on_registry_process_updated, State
                              )
                      catch
                          Class:Reason:Stacktrace ->
                              error_logger:error_msg(
                                  "SYN[~s] Error ~p:~p in registry update function: ~p",
                                  [node(), Class, Reason, Stacktrace]
                              ),
                              %% hand the exception back so the caller can re-raise it
                              {reply, {raise, Class, Reason, Stacktrace}, State}
                      end;
                  {Name, Pid, MetaOrFun, _, _, _} ->
                      %% same pid and identical metadata: nothing to do
                      {reply, {noop, MetaOrFun}, State};
                  {Name, Pid, _, _, MRef, _} ->
                      %% same pid, metadata differs
                      do_register_on_node(
                          Name, Pid, MetaOrFun, MRef, normal, RequesterNode, on_registry_process_updated, State
                      );
                  _ ->
                      %% the name is held by a different pid
                      {reply, {{error, taken}, undefined}, State}
              end
      end;
  handle_call(
      {'3.0', unregister_on_node, RequesterNode, Name, Pid},
      _From,
      #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid} = State
  ) ->
      case find_registry_entry_by_name(Name, TableByName) of
          undefined ->
              {reply, {{error, undefined}, undefined}, State};
          {Name, Pid, Meta, _, _, _} ->
              %% stop monitoring unless the pid is still registered under other names
              maybe_demonitor(Pid, TableByPid),
              remove_from_local_table(Name, Pid, TableByName, TableByPid),
              syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta, normal]),
              %% RequesterNode is passed as the exclusion list of the broadcast
              syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta, normal}, [RequesterNode], State),
              {reply, {ok, TableByPid}, State};
          _ ->
              %% the name is registered locally with another pid: race condition,
              %% report an error and let the sync settle the tables
              {reply, {{error, race_condition}, undefined}, State}
      end;
  handle_call(Request, From, #state{scope = Scope} = State) ->
      error_logger:warning_msg(
          "SYN[~s|~s<~s>] Received from ~p an unknown call message: ~p",
          [node(), ?MODULE_LOG_NAME, Scope, From, Request]
      ),
      {reply, undefined, State}.
  264. %% ----------------------------------------------------------------------------------------------------------
  265. %% Info messages
  266. %% ----------------------------------------------------------------------------------------------------------
  %% Info handler of the registry scope process.
  %%
  %% sync_register: applies a registration received from another node, but only
  %% when the node of `Pid' is present in the nodes map.
  %% sync_unregister: removes the entry for `Name' when it is held by `Pid'.
  %% 'DOWN': a monitored process died; every name it held is removed, the
  %% unregistered callback is invoked and the removal is broadcast.
  %% Anything else is logged as a warning.
  -spec handle_info(Info :: timeout | term(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), #state{}}.
  handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) ->
      case maps:is_key(node(Pid), NodesMap) of
          true ->
              handle_registry_sync(Name, Pid, Meta, Time, Reason, State);
          false ->
              %% race condition: the sending node is not in the nodes map, ignore
              ok
      end,
      {noreply, State};
  handle_info(
      {'3.0', sync_unregister, Name, Pid, Meta, Reason},
      #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid} = State
  ) ->
      case find_registry_entry_by_name(Name, TableByName) of
          {_, Pid, _, _, _, _} ->
              remove_from_local_table(Name, Pid, TableByName, TableByPid),
              syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta, Reason]);
          _ ->
              %% no entry for this name and pid, nothing to remove
              ok
      end,
      {noreply, State};
  handle_info(
      {'DOWN', _MRef, process, Pid, Reason},
      #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid} = State
  ) ->
      case find_registry_entries_by_pid(Pid, TableByPid) of
          [] ->
              error_logger:warning_msg(
                  "SYN[~s|~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
                  [node(), ?MODULE_LOG_NAME, Scope, Pid, Reason]
              );
          Entries ->
              %% one by-pid entry exists per name held by the dead process
              UnregisterEntry = fun({_, Name, Meta, _, _, _}) ->
                  remove_from_local_table(Name, Pid, TableByName, TableByPid),
                  syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta, Reason]),
                  syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta, Reason}, State)
              end,
              lists:foreach(UnregisterEntry, Entries)
      end,
      {noreply, State};
  handle_info(Info, #state{scope = Scope} = State) ->
      error_logger:warning_msg(
          "SYN[~s|~s<~s>] Received an unknown info message: ~p",
          [node(), ?MODULE_LOG_NAME, Scope, Info]
      ),
      {noreply, State}.
  322. %% ----------------------------------------------------------------------------------------------------------
  323. %% Data callbacks
  324. %% ----------------------------------------------------------------------------------------------------------
  %% Returns {ok, Tuples} where Tuples are the {Name, Pid, Meta, Time} registry
  %% tuples stored for the local node.
  -spec get_local_data(#state{}) -> {ok, Data :: term()} | undefined.
  get_local_data(#state{table_by_name = TableByName}) ->
      LocalTuples = get_registry_tuples_for_node(node(), TableByName),
      {ok, LocalTuples}.
  %% Applies the registry tuples received from a remote node.
  %% Each {Name, Pid, Meta, Time} tuple goes through handle_registry_sync/6 with
  %% the reason {syn_remote_scope_node_up, Scope, node(Pid)}.
  -spec save_remote_data(RemoteData :: term(), #state{}) -> any().
  save_remote_data(RegistryTuplesOfRemoteNode, #state{scope = Scope} = State) ->
      SyncTuple = fun({Name, Pid, Meta, Time}) ->
          Reason = {syn_remote_scope_node_up, Scope, node(Pid)},
          handle_registry_sync(Name, Pid, Meta, Time, Reason, State)
      end,
      lists:foreach(SyncTuple, RegistryTuplesOfRemoteNode).
  %% Drops every registry entry stored for `Node' by delegating to
  %% purge_registry_for_remote_node/4 with the scope and tables from the state.
  -spec purge_local_data_for_node(Node :: node(), #state{}) -> any().
  purge_local_data_for_node(Node, #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid}) ->
      purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid).
  341. %% ===================================================================
  342. %% Internal
  343. %% ===================================================================
  %% Re-creates the monitors for every registry entry of the local node.
  %% Starts the recursive helper with an empty pid-to-monitor map.
  -spec rebuild_monitors(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> ok.
  rebuild_monitors(Scope, TableByName, TableByPid) ->
      LocalTuples = get_registry_tuples_for_node(node(), TableByName),
      NoMonitorsYet = #{},
      do_rebuild_monitors(LocalTuples, NoMonitorsYet, Scope, TableByName, TableByPid).
  %% Walks the local registry tuples and re-inserts each with a fresh monitor.
  %% `NewMRefs' maps a pid to the monitor created for it during this walk, so a
  %% pid registered under several names is monitored only once.
  %% Entries of processes that are no longer alive are removed and reported
  %% through the unregistered callback with reason `undefined'.
  -spec do_rebuild_monitors(
      [syn_registry_tuple()],
      #{pid() => reference()},
      Scope :: atom(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> ok.
  do_rebuild_monitors([], _, _, _, _) ->
      ok;
  do_rebuild_monitors([{Name, Pid, Meta, Time} | Rest], NewMRefs, Scope, TableByName, TableByPid) ->
      %% always drop the stale entry first; it is re-added below if the process lives
      remove_from_local_table(Name, Pid, TableByName, TableByPid),
      case is_process_alive(Pid) of
          true ->
              {MRef, NextMRefs} =
                  case NewMRefs of
                      #{Pid := KnownMRef} ->
                          {KnownMRef, NewMRefs};
                      _ ->
                          FreshMRef = erlang:monitor(process, Pid),
                          {FreshMRef, NewMRefs#{Pid => FreshMRef}}
                  end,
              add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
              do_rebuild_monitors(Rest, NextMRefs, Scope, TableByName, TableByPid);
          false ->
              %% the process died in the meantime: invoke the callback locally.
              %% Remote callbacks will have been triggered by the scope process crash.
              syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta, undefined]),
              do_rebuild_monitors(Rest, NewMRefs, Scope, TableByName, TableByPid)
      end.
  %% Stores a registration on the local node and announces it.
  %% Steps, in order: stamp the entry with erlang:system_time(), write it to the
  %% local tables, invoke `CallbackMethod' on the event handler, broadcast a
  %% sync_register message with `RequesterNode' in the exclusion list.
  %% Returns the gen-server style reply tuple carrying the callback name, the
  %% metadata, the timestamp and both table names.
  -spec do_register_on_node(
      Name :: term(),
      Pid :: pid(),
      Meta :: term(),
      MRef :: reference() | undefined,
      Reason :: term(),
      RequesterNode :: node(),
      CallbackMethod :: atom(),
      #state{}
  ) ->
      {
          reply,
          {ok, {
              CallbackMethod :: atom(),
              Meta :: term(),
              Time :: non_neg_integer(),
              TableByName :: atom(),
              TableByPid :: atom()
          }},
          #state{}
      }.
  do_register_on_node(
      Name,
      Pid,
      Meta,
      MRef,
      Reason,
      RequesterNode,
      CallbackMethod,
      #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid} = State
  ) ->
      Time = erlang:system_time(),
      add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
      syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta, Reason]),
      SyncMessage = {'3.0', sync_register, Name, Pid, Meta, Time, Reason},
      syn_gen_scope:broadcast(SyncMessage, [RequesterNode], State),
      Reply = {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}},
      {reply, Reply, State}.
  %% Selects every by-name entry whose node field equals `Node' and returns the
  %% first four fields of each as a {Name, Pid, Meta, Time} tuple.
  -spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
  get_registry_tuples_for_node(Node, TableByName) ->
      EntryPattern = {'$1', '$2', '$3', '$4', '_', Node},
      ResultTuple = {{'$1', '$2', '$3', '$4'}},
      ets:select(TableByName, [{EntryPattern, [], [ResultTuple]}]).
  %% Returns the entry stored under `Name' in the by-name table, or `undefined'
  %% when there is none. More than one entry for a name is not expected and
  %% crashes with case_clause.
  -spec find_registry_entry_by_name(Name :: term(), TableByName :: atom()) ->
      Entry :: syn_registry_entry() | undefined.
  find_registry_entry_by_name(Name, TableByName) ->
      case ets:lookup(TableByName, Name) of
          [Found] -> Found;
          [] -> undefined
      end.
  %% Returns every entry stored under `Pid' in the by-pid table (an empty list
  %% when there is none). Non-pid arguments fail with function_clause.
  -spec find_registry_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> RegistryEntriesByPid :: [syn_registry_entry_by_pid()].
  find_registry_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
      EntriesForPid = ets:lookup(TableByPid, Pid),
      EntriesForPid.
  %% Returns the monitor reference stored in one by-pid entry of `Pid', or
  %% `undefined' when the pid has no entry.
  %% A select limited to a single result is used instead of a lookup, so that a
  %% process registered under a considerable number of names does not cause all
  %% of its entries to be fetched.
  -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
  find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
      MonitorOfPid = [{{Pid, '_', '_', '_', '$5', '_'}, [], ['$5']}],
      case ets:select(TableByPid, MonitorOfPid, 1) of
          '$end_of_table' -> undefined;
          {[MRef], _Continuation} -> MRef
      end.
  %% Removes the monitor on `Pid' when the pid is registered under exactly one name.
  %% Up to two by-pid entries are selected: a single result means no other alias
  %% exists for the pid, so its monitor is removed (flushing a pending 'DOWN').
  %% The limited select is used instead of a lookup to cope with processes
  %% registered under a considerable number of names. Always returns `ok'.
  -spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
  maybe_demonitor(Pid, TableByPid) ->
      MonitorOfPid = [{{Pid, '_', '_', '_', '$5', '_'}, [], ['$5']}],
      case ets:select(TableByPid, MonitorOfPid, 2) of
          {[OnlyMRef], _} when is_reference(OnlyMRef) ->
              %% last remaining name for this pid: stop monitoring it
              erlang:demonitor(OnlyMRef, [flush]),
              ok;
          _ ->
              %% no entry, several names, or no monitor stored
              ok
      end.
  %% Writes a registration to both local tables.
  %% The by-name table receives {Name, Pid, Meta, Time, MRef, Node} and the
  %% by-pid table receives {Pid, Name, Meta, Time, MRef, Node}, where Node is
  %% node(Pid). Any previous by-pid entry for the same {Pid, Name} is deleted
  %% first: the by-pid table is of type bag, so uniqueness of that pair has to be
  %% enforced manually.
  -spec add_to_local_table(
      Name :: term(),
      Pid :: pid(),
      Meta :: term(),
      Time :: integer(),
      MRef :: undefined | reference(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
      PidNode = node(Pid),
      EntryByName = {Name, Pid, Meta, Time, MRef, PidNode},
      EntryByPid = {Pid, Name, Meta, Time, MRef, PidNode},
      true = ets:insert(TableByName, EntryByName),
      true = ets:match_delete(TableByPid, {Pid, Name, '_', '_', '_', '_'}),
      true = ets:insert(TableByPid, EntryByPid).
  %% Deletes the registration of `Name' held by `Pid' from both local tables.
  %% Entries for the same name held by a different pid are left untouched,
  %% because the delete patterns include both the name and the pid.
  -spec remove_from_local_table(
      Name :: term(),
      Pid :: pid(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  remove_from_local_table(Name, Pid, TableByName, TableByPid) ->
      ByNamePattern = {Name, Pid, '_', '_', '_', '_'},
      ByPidPattern = {Pid, Name, '_', '_', '_', '_'},
      true = ets:match_delete(TableByName, ByNamePattern),
      true = ets:match_delete(TableByPid, ByPidPattern).
  %% Replaces the registration of `Name' held by `PreviousPid' with a new one.
  %% In order: demonitor the previous pid if this was its only name, remove its
  %% entry, then add the entry described by {Pid, Meta, Time, MRef}.
  -spec update_local_table(
      Name :: term(),
      PreviousPid :: pid(),
      {
          Pid :: pid(),
          Meta :: term(),
          Time :: integer(),
          MRef :: undefined | reference()
      },
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  update_local_table(Name, PreviousPid, NewEntry, TableByName, TableByPid) ->
      {Pid, Meta, Time, MRef} = NewEntry,
      %% must run before the removal: it inspects the entries still stored for PreviousPid
      maybe_demonitor(PreviousPid, TableByPid),
      remove_from_local_table(Name, PreviousPid, TableByName, TableByPid),
      add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid).
  %% Purges the registry entries of every node other than the local one.
  %% The node field of all non-local by-name entries is collected, duplicates
  %% are removed, and each remaining node is purged in turn. Returns the result
  %% of the last purge, or `undefined' when there was nothing to purge.
  -spec purge_registry_for_remote_nodes(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> any().
  purge_registry_for_remote_nodes(Scope, TableByName, TableByPid) ->
      NonLocalNodeOfEntry = [{
          {'_', '_', '_', '_', '_', '$6'},
          [{'=/=', '$6', node()}],
          ['$6']
      }],
      %% one result per entry, so the same node shows up many times: sort and dedupe
      RemoteNodes = lists:usort(ets:select(TableByName, NonLocalNodeOfEntry)),
      PurgeNode = fun(RemoteNode, _Previous) ->
          purge_registry_for_remote_node(Scope, RemoteNode, TableByName, TableByPid)
      end,
      lists:foldl(PurgeNode, undefined, RemoteNodes).
  %% Removes every registry entry stored for the remote node `Node'.
  %% The unregistered callback is invoked for each entry with the reason
  %% {syn_remote_scope_node_down, Scope, Node} before the entries are deleted
  %% from both the by-name and the by-pid table.
  %% Calling this with the local node fails with function_clause.
  -spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
  purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
      Reason = {syn_remote_scope_node_down, Scope, Node},
      NotifyUnregistered = fun({Name, Pid, Meta, _Time}) ->
          syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta, Reason])
      end,
      lists:foreach(NotifyUnregistered, get_registry_tuples_for_node(Node, TableByName)),
      %% the node is the last field in the entries of both tables
      EntriesOfNode = {'_', '_', '_', '_', '_', Node},
      true = ets:match_delete(TableByName, EntriesOfNode),
      true = ets:match_delete(TableByPid, EntriesOfNode).
  %% Applies a registration received from another node to the local tables.
  %% The outcome depends on what is stored locally for `Name':
  %%   * nothing: the entry is added and the registered callback is invoked;
  %%   * same pid: the entry is overwritten, and the updated callback is invoked
  %%     only when the metadata changed;
  %%   * another pid running on the local node: the conflict is resolved;
  %%   * another pid with an older timestamp: the entry is replaced;
  %%   * anything else: the incoming data is ignored.
  -spec handle_registry_sync(
      Name :: term(),
      Pid :: pid(),
      Meta :: term(),
      Time :: non_neg_integer(),
      Reason :: term(),
      #state{}
  ) -> any().
  handle_registry_sync(
      Name,
      Pid,
      Meta,
      Time,
      Reason,
      #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid} = State
  ) ->
      LocalNode = node(),
      case find_registry_entry_by_name(Name, TableByName) of
          undefined ->
              %% no conflict: the name is not known locally
              add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
              syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta, Reason]);
          {_, Pid, StoredMeta, _, StoredMRef, _} ->
              %% same pid: the message comes from the same node and is therefore
              %% sequential, so it is more recent. Only meta or time may have changed.
              add_to_local_table(Name, Pid, Meta, Time, StoredMRef, TableByName, TableByPid),
              %% invoke the callback only when the metadata actually changed
              case Meta of
                  StoredMeta ->
                      ok;
                  _ ->
                      syn_event_handler:call_event_handler(
                          on_registry_process_updated, [Scope, Name, Pid, Meta, Reason]
                      )
              end;
          {_, StoredPid, StoredMeta, StoredTime, StoredMRef, LocalNode} ->
              %% the local node runs a conflicting process -> resolve
              %% * the conflict is resolved by the two nodes that own the conflicting processes
              %% * when a process is chosen, the time is updated
              %% * the node that runs the kept process sends the sync_register message
              %% * recipients check that the time is more recent than what they have,
              %%   to ensure that there are no race conditions
              resolve_conflict(Scope, Name, {Pid, Meta, Time}, {StoredPid, StoredMeta, StoredTime, StoredMRef}, State);
          {_, StoredPid, StoredMeta, StoredTime, _, _} when StoredTime < Time ->
              %% the local node owns neither process and the incoming data is newer: replace
              update_local_table(Name, StoredPid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
              syn_event_handler:call_event_handler(
                  on_process_unregistered, [Scope, Name, StoredPid, StoredMeta, Reason]
              ),
              syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta, Reason]);
          _ ->
              %% race condition: the incoming data is older than what is stored, ignore
              ok
      end.
  %% Resolves a conflict between an incoming remote registration and a process
  %% registered under the same name on the local node.
  %% syn_event_handler:do_resolve_registry_conflict/4 returns the pid to keep and
  %% a flag telling whether the discarded pid must be killed. Three outcomes:
  %%   * the remote pid is kept: the local entry is replaced, the local process
  %%     is optionally killed, and both callbacks are invoked;
  %%   * the local pid is kept: its entry is re-stamped with a new time and
  %%     broadcast to all nodes;
  %%   * neither pid is returned: the local entry is removed, the local process
  %%     is optionally killed, and the removal is broadcast to all nodes except
  %%     the node of the remote pid.
  -spec resolve_conflict(
      Scope :: atom(),
      Name :: term(),
      {Pid :: pid(), Meta :: term(), Time :: non_neg_integer()},
      {TablePid :: pid(), TableMeta :: term(), TableTime :: non_neg_integer(), TableMRef :: reference()},
      #state{}
  ) -> any().
  resolve_conflict(
      Scope,
      Name,
      {Pid, Meta, Time},
      {TablePid, TableMeta, TableTime, TableMRef},
      #state{table_by_name = TableByName, table_by_pid = TableByPid} = State
  ) ->
      Incoming = {Pid, Meta, Time},
      Stored = {TablePid, TableMeta, TableTime},
      {PidToKeep, KillOtherPid} =
          syn_event_handler:do_resolve_registry_conflict(Scope, Name, Incoming, Stored),
      case PidToKeep of
          Pid ->
              %% the remote pid wins
              error_logger:info_msg(
                  "SYN[~s|~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> keeping remote: ~p",
                  [node(), ?MODULE_LOG_NAME, Scope, Name, {Pid, Meta}, {TablePid, TableMeta}, Pid]
              ),
              %% update locally; the incoming sync_register will later overwrite the
              %% entry with the time coming from the remote node
              update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
              %% kill the discarded local process if the resolver asked for it
              case KillOtherPid of
                  true -> exit(TablePid, {syn_resolve_kill, Name, TableMeta});
                  false -> ok
              end,
              syn_event_handler:call_event_handler(
                  on_process_unregistered, [Scope, Name, TablePid, TableMeta, syn_conflict_resolution]
              ),
              syn_event_handler:call_event_handler(
                  on_process_registered, [Scope, Name, Pid, Meta, syn_conflict_resolution]
              );
          TablePid ->
              %% the local pid wins; the remote pid is killed by the other node in the conflict
              error_logger:info_msg(
                  "SYN[~s|~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> keeping local: ~p",
                  [node(), ?MODULE_LOG_NAME, Scope, Name, {Pid, Meta}, {TablePid, TableMeta}, TablePid]
              ),
              %% overwrite the local entry with an updated time
              ResolveTime = erlang:system_time(),
              add_to_local_table(Name, TablePid, TableMeta, ResolveTime, TableMRef, TableByName, TableByPid),
              %% broadcast to all nodes, including the remote one, so that it updates the time
              syn_gen_scope:broadcast(
                  {'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime, syn_conflict_resolution},
                  State
              );
          Invalid ->
              %% the resolver returned neither of the two pids
              error_logger:info_msg(
                  "SYN[~s|~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> none chosen (got: ~p)",
                  [node(), ?MODULE_LOG_NAME, Scope, Name, {Pid, Meta}, {TablePid, TableMeta}, Invalid]
              ),
              maybe_demonitor(TablePid, TableByPid),
              remove_from_local_table(Name, TablePid, TableByName, TableByPid),
              %% kill the local process; the remote one is killed by the other node
              %% performing the same resolution
              case KillOtherPid of
                  true -> exit(TablePid, {syn_resolve_kill, Name, TableMeta});
                  false -> ok
              end,
              syn_event_handler:call_event_handler(
                  on_process_unregistered, [Scope, Name, TablePid, TableMeta, syn_conflict_resolution]
              ),
              %% broadcast to all but the remote node, which removes its own entry
              %% during its conflict resolution
              syn_gen_scope:broadcast(
                  {'3.0', sync_unregister, Name, TablePid, TableMeta, syn_conflict_resolution},
                  [node(Pid)],
                  State
              )
      end.