syn_registry.erl 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THxE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_registry).
  27. -behaviour(syn_gen_scope).
  28. %% API
  29. -export([start_link/1]).
  30. -export([get_subcluster_nodes/1]).
  31. -export([lookup/1, lookup/2]).
  32. -export([register/2, register/3, register/4]).
  33. -export([unregister/1, unregister/2]).
  34. -export([count/0, count/1, count/2]).
  35. -export([local_count/0, local_count/1]).
  36. %% syn_gen_scope callbacks
  37. -export([
  38. init/1,
  39. handle_call/3,
  40. handle_info/2,
  41. save_remote_data/2,
  42. get_local_data/1,
  43. purge_local_data_for_node/2
  44. ]).
  45. %% tests
  46. -ifdef(TEST).
  47. -export([add_to_local_table/7, remove_from_local_table/4]).
  48. -endif.
  49. %% includes
  50. -include("syn.hrl").
  51. %% ===================================================================
  52. %% API
  53. %% ===================================================================
  54. -spec start_link(Scope :: atom()) ->
  55. {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
  56. start_link(Scope) when is_atom(Scope) ->
  57. syn_gen_scope:start_link(?MODULE, Scope).
  58. -spec get_subcluster_nodes(Scope :: atom()) -> [node()].
  59. get_subcluster_nodes(Scope) ->
  60. syn_gen_scope:get_subcluster_nodes(?MODULE, Scope).
  61. -spec lookup(Name :: term()) -> {pid(), Meta :: term()} | undefined.
  62. lookup(Name) ->
  63. lookup(?DEFAULT_SCOPE, Name).
  64. -spec lookup(Scope :: atom(), Name :: term()) -> {pid(), Meta :: term()} | undefined.
  65. lookup(Scope, Name) ->
  66. case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
  67. undefined ->
  68. error({invalid_scope, Scope});
  69. TableByName ->
  70. case find_registry_entry_by_name(Name, TableByName) of
  71. undefined -> undefined;
  72. {Name, Pid, Meta, _, _, _} -> {Pid, Meta}
  73. end
  74. end.
  75. -spec register(Name :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
  76. register(Name, Pid) ->
  77. register(?DEFAULT_SCOPE, Name, Pid, undefined).
  78. -spec register(NameOrScope :: term(), PidOrName :: term(), MetaOrPid :: term()) -> ok | {error, Reason :: term()}.
  79. register(Name, Pid, Meta) when is_pid(Pid) ->
  80. register(?DEFAULT_SCOPE, Name, Pid, Meta);
  81. register(Scope, Name, Pid) when is_pid(Pid) ->
  82. register(Scope, Name, Pid, undefined).
  83. -spec register(Scope :: atom(), Name :: term(), Pid :: pid(), Meta :: term()) -> ok | {error, Reason :: term()}.
  84. register(Scope, Name, Pid, Meta) ->
  85. Node = node(Pid),
  86. case syn_gen_scope:call(?MODULE, Node, Scope, {register_on_node, node(), Name, Pid, Meta}) of
  87. {ok, {CallbackMethod, Time, TableByName, TableByPid}} when Node =/= node() ->
  88. %% update table on caller node immediately so that subsequent calls have an updated registry
  89. add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
  90. %% callback
  91. syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta]),
  92. %% return
  93. ok;
  94. {Response, _} ->
  95. Response
  96. end.
  97. -spec unregister(Name :: term()) -> ok | {error, Reason :: term()}.
  98. unregister(Name) ->
  99. unregister(?DEFAULT_SCOPE, Name).
  100. -spec unregister(Scope :: atom(), Name :: term()) -> ok | {error, Reason :: term()}.
  101. unregister(Scope, Name) ->
  102. case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
  103. undefined ->
  104. error({invalid_scope, Scope});
  105. TableByName ->
  106. % get process' node
  107. case find_registry_entry_by_name(Name, TableByName) of
  108. undefined ->
  109. {error, undefined};
  110. {Name, Pid, Meta, _, _, _} ->
  111. Node = node(Pid),
  112. case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_node, node(), Name, Pid}) of
  113. {ok, TableByPid} when Node =/= node() ->
  114. %% remove table on caller node immediately so that subsequent calls have an updated registry
  115. remove_from_local_table(Name, Pid, TableByName, TableByPid),
  116. %% callback
  117. syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta]),
  118. %% return
  119. ok;
  120. {Response, _} ->
  121. Response
  122. end
  123. end
  124. end.
  125. -spec count() -> non_neg_integer().
  126. count() ->
  127. count(?DEFAULT_SCOPE).
  128. -spec count(Scope :: atom()) -> non_neg_integer().
  129. count(Scope) ->
  130. TableByName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
  131. case ets:info(TableByName, size) of
  132. undefined -> error({invalid_scope, Scope});
  133. Value -> Value
  134. end.
  135. -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
  136. count(Scope, Node) ->
  137. case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
  138. undefined ->
  139. error({invalid_scope, Scope});
  140. TableByName ->
  141. ets:select_count(TableByName, [{
  142. {'_', '_', '_', '_', '_', Node},
  143. [],
  144. [true]
  145. }])
  146. end.
  147. -spec local_count() -> non_neg_integer().
  148. local_count() ->
  149. count(?DEFAULT_SCOPE, node()).
  150. -spec local_count(Scope :: atom()) -> non_neg_integer().
  151. local_count(Scope) ->
  152. count(Scope, node()).
  153. %% ===================================================================
  154. %% Callbacks
  155. %% ===================================================================
  156. %% ----------------------------------------------------------------------------------------------------------
  157. %% Init
  158. %% ----------------------------------------------------------------------------------------------------------
  159. -spec init(#state{}) -> {ok, HandlerState :: term()}.
  160. init(State) ->
  161. HandlerState = #{},
  162. %% rebuild
  163. rebuild_monitors(State),
  164. %% init
  165. {ok, HandlerState}.
  166. %% ----------------------------------------------------------------------------------------------------------
  167. %% Call messages
  168. %% ----------------------------------------------------------------------------------------------------------
  169. -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
  170. {reply, Reply :: term(), #state{}} |
  171. {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
  172. {noreply, #state{}} |
  173. {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
  174. {stop, Reason :: term(), Reply :: term(), #state{}} |
  175. {stop, Reason :: term(), #state{}}.
  176. handle_call({register_on_node, RequesterNode, Name, Pid, Meta}, _From, #state{
  177. table_by_name = TableByName,
  178. table_by_pid = TableByPid
  179. } = State) ->
  180. case is_process_alive(Pid) of
  181. true ->
  182. case find_registry_entry_by_name(Name, TableByName) of
  183. undefined ->
  184. %% available
  185. MRef = case find_monitor_for_pid(Pid, TableByPid) of
  186. undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, add
  187. MRef0 -> MRef0
  188. end,
  189. do_register_on_node(Name, Pid, Meta, MRef, RequesterNode, on_process_registered, State);
  190. {Name, Pid, Meta, _, _, _} ->
  191. %% same pid, same meta
  192. {reply, {ok, noop}, State};
  193. {Name, Pid, _, _, MRef, _} ->
  194. do_register_on_node(Name, Pid, Meta, MRef, RequesterNode, on_registry_process_updated, State);
  195. _ ->
  196. {reply, {{error, taken}, undefined}, State}
  197. end;
  198. false ->
  199. {reply, {{error, not_alive}, undefined}, State}
  200. end;
  201. handle_call({unregister_on_node, RequesterNode, Name, Pid}, _From, #state{
  202. scope = Scope,
  203. table_by_name = TableByName,
  204. table_by_pid = TableByPid
  205. } = State) ->
  206. case find_registry_entry_by_name(Name, TableByName) of
  207. {Name, Pid, Meta, _, _, _} ->
  208. %% demonitor if the process is not registered under other names
  209. maybe_demonitor(Pid, TableByPid),
  210. %% remove from table
  211. remove_from_local_table(Name, Pid, TableByName, TableByPid),
  212. %% callback
  213. syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta]),
  214. %% broadcast
  215. syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
  216. %% return
  217. {reply, {ok, TableByPid}, State};
  218. {Name, _, _, _, _, _} ->
  219. %% process is registered locally with another pid: race condition, wait for sync to happen & return error
  220. {reply, {{error, race_condition}, undefined}, State};
  221. undefined ->
  222. {reply, {{error, undefined}, undefined}, State}
  223. end;
  224. handle_call(Request, From, #state{scope = Scope} = State) ->
  225. error_logger:warning_msg("SYN[~s<~s>] Received from ~p an unknown call message: ~p", [?MODULE, Scope, From, Request]),
  226. {reply, undefined, State}.
  227. %% ----------------------------------------------------------------------------------------------------------
  228. %% Info messages
  229. %% ----------------------------------------------------------------------------------------------------------
  230. -spec handle_info(Info :: timeout | term(), #state{}) ->
  231. {noreply, #state{}} |
  232. {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
  233. {stop, Reason :: term(), #state{}}.
  234. handle_info({'3.0', sync_register, Name, Pid, Meta, Time}, State) ->
  235. handle_registry_sync(Name, Pid, Meta, Time, State),
  236. {noreply, State};
  237. handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{
  238. scope = Scope,
  239. table_by_name = TableByName,
  240. table_by_pid = TableByPid
  241. } = State) ->
  242. remove_from_local_table(Name, Pid, TableByName, TableByPid),
  243. %% callback
  244. syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta]),
  245. %% return
  246. {noreply, State};
  247. handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
  248. scope = Scope,
  249. table_by_name = TableByName,
  250. table_by_pid = TableByPid
  251. } = State) ->
  252. case find_registry_entries_by_pid(Pid, TableByPid) of
  253. [] ->
  254. error_logger:warning_msg(
  255. "SYN[~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
  256. [?MODULE, Scope, Pid, Reason]
  257. );
  258. Entries ->
  259. lists:foreach(fun({_, Name, Meta, _, _, _}) ->
  260. %% remove from table
  261. remove_from_local_table(Name, Pid, TableByName, TableByPid),
  262. %% callback
  263. syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta]),
  264. %% broadcast
  265. syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State)
  266. end, Entries)
  267. end,
  268. %% return
  269. {noreply, State};
  270. handle_info(Info, #state{scope = Scope} = State) ->
  271. error_logger:warning_msg("SYN[~s<~s>] Received an unknown info message: ~p", [?MODULE, Scope, Info]),
  272. {noreply, State}.
  273. %% ----------------------------------------------------------------------------------------------------------
  274. %% Data callbacks
  275. %% ----------------------------------------------------------------------------------------------------------
  276. -spec get_local_data(#state{}) -> {ok, Data :: term()} | undefined.
  277. get_local_data(#state{table_by_name = TableByName}) ->
  278. {ok, get_registry_tuples_for_node(node(), TableByName)}.
  279. -spec save_remote_data(RemoteData :: term(), #state{}) -> any().
  280. save_remote_data(RegistryTuplesOfRemoteNode, State) ->
  281. %% insert tuples
  282. lists:foreach(fun({Name, Pid, Meta, Time}) ->
  283. handle_registry_sync(Name, Pid, Meta, Time, State)
  284. end, RegistryTuplesOfRemoteNode).
  285. -spec purge_local_data_for_node(Node :: node(), #state{}) -> any().
  286. purge_local_data_for_node(Node, #state{
  287. scope = Scope,
  288. table_by_name = TableByName,
  289. table_by_pid = TableByPid
  290. }) ->
  291. purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid).
  292. %% ===================================================================
  293. %% Internal
  294. %% ===================================================================
  295. -spec rebuild_monitors(#state{}) -> ok.
  296. rebuild_monitors(#state{
  297. table_by_name = TableByName
  298. } = State) ->
  299. RegistryTuples = get_registry_tuples_for_node(node(), TableByName),
  300. do_rebuild_monitors(RegistryTuples, #{}, State).
  301. -spec do_rebuild_monitors([syn_registry_tuple()], #{pid() => reference()}, #state{}) -> ok.
  302. do_rebuild_monitors([], _, _) -> ok;
  303. do_rebuild_monitors([{Name, Pid, Meta, Time} | T], NewMonitorRefs, #state{
  304. table_by_name = TableByName,
  305. table_by_pid = TableByPid
  306. } = State) ->
  307. remove_from_local_table(Name, Pid, TableByName, TableByPid),
  308. case is_process_alive(Pid) of
  309. true ->
  310. case maps:find(Pid, NewMonitorRefs) of
  311. error ->
  312. MRef = erlang:monitor(process, Pid),
  313. add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
  314. do_rebuild_monitors(T, maps:put(Pid, MRef, NewMonitorRefs), State);
  315. {ok, MRef} ->
  316. add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
  317. do_rebuild_monitors(T, NewMonitorRefs, State)
  318. end;
  319. _ ->
  320. do_rebuild_monitors(T, NewMonitorRefs, State)
  321. end.
  322. -spec do_register_on_node(
  323. Name :: term(),
  324. Pid :: pid(),
  325. Meta :: term(),
  326. MRef :: reference() | undefined,
  327. RequesterNode :: node(),
  328. CallbackMethod :: atom(),
  329. #state{}
  330. ) ->
  331. {
  332. reply,
  333. {ok, {
  334. CallbackMethod :: atom(),
  335. Time :: non_neg_integer(),
  336. TableByName :: atom(),
  337. TableByPid :: atom()
  338. }},
  339. #state{}
  340. }.
  341. do_register_on_node(Name, Pid, Meta, MRef, RequesterNode, CallbackMethod, #state{
  342. scope = Scope,
  343. table_by_name = TableByName,
  344. table_by_pid = TableByPid
  345. } = State) ->
  346. %% add to local table
  347. Time = erlang:system_time(),
  348. add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
  349. %% callback
  350. syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta]),
  351. %% broadcast
  352. syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time}, [RequesterNode], State),
  353. %% return
  354. {reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
  355. -spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
  356. get_registry_tuples_for_node(Node, TableByName) ->
  357. ets:select(TableByName, [{
  358. {'$1', '$2', '$3', '$4', '_', Node},
  359. [],
  360. [{{'$1', '$2', '$3', '$4'}}]
  361. }]).
  362. -spec find_registry_entry_by_name(Name :: term(), TableByName :: atom()) ->
  363. Entry :: syn_registry_entry() | undefined.
  364. find_registry_entry_by_name(Name, TableByName) ->
  365. case ets:lookup(TableByName, Name) of
  366. [] -> undefined;
  367. [Entry] -> Entry
  368. end.
  369. -spec find_registry_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> RegistryEntriesByPid :: [syn_registry_entry_by_pid()].
  370. find_registry_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
  371. ets:lookup(TableByPid, Pid).
  372. -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
  373. find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
  374. %% we use select instead of lookup to limit the results and thus cover the case
  375. %% when a process is registered with a considerable amount of names
  376. case ets:select(TableByPid, [{
  377. {Pid, '_', '_', '_', '$5', '_'},
  378. [],
  379. ['$5']
  380. }], 1) of
  381. {[MRef], _} -> MRef;
  382. '$end_of_table' -> undefined
  383. end.
  384. -spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
  385. maybe_demonitor(Pid, TableByPid) ->
  386. %% select 2: if only 1 is returned it means that no other aliases exist for the Pid
  387. %% we use select instead of lookup to limit the results and thus cover the case
  388. %% when a process is registered with a considerable amount of names
  389. case ets:select(TableByPid, [{
  390. {Pid, '_', '_', '_', '$5', '_'},
  391. [],
  392. ['$5']
  393. }], 2) of
  394. {[MRef], _} when is_reference(MRef) ->
  395. %% no other aliases, demonitor
  396. erlang:demonitor(MRef, [flush]),
  397. ok;
  398. _ ->
  399. ok
  400. end.
  401. -spec add_to_local_table(
  402. Name :: term(),
  403. Pid :: pid(),
  404. Meta :: term(),
  405. Time :: integer(),
  406. MRef :: undefined | reference(),
  407. TableByName :: atom(),
  408. TableByPid :: atom()
  409. ) -> true.
  410. add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
  411. %% insert
  412. true = ets:insert(TableByName, {Name, Pid, Meta, Time, MRef, node(Pid)}),
  413. %% since we use a table of type bag, we need to manually ensure that the key Pid, Name is unique
  414. true = ets:match_delete(TableByPid, {Pid, Name, '_', '_', '_', '_'}),
  415. true = ets:insert(TableByPid, {Pid, Name, Meta, Time, MRef, node(Pid)}).
  416. -spec remove_from_local_table(
  417. Name :: term(),
  418. Pid :: pid(),
  419. TableByName :: atom(),
  420. TableByPid :: atom()
  421. ) -> true.
  422. remove_from_local_table(Name, Pid, TableByName, TableByPid) ->
  423. true = ets:match_delete(TableByName, {Name, Pid, '_', '_', '_', '_'}),
  424. true = ets:match_delete(TableByPid, {Pid, Name, '_', '_', '_', '_'}).
  425. -spec update_local_table(
  426. Name :: term(),
  427. PreviousPid :: pid(),
  428. {
  429. Pid :: pid(),
  430. Meta :: term(),
  431. Time :: integer(),
  432. MRef :: undefined | reference()
  433. },
  434. TableByName :: atom(),
  435. TableByPid :: atom()
  436. ) -> true.
  437. update_local_table(Name, PreviousPid, {Pid, Meta, Time, MRef}, TableByName, TableByPid) ->
  438. maybe_demonitor(PreviousPid, TableByPid),
  439. remove_from_local_table(Name, PreviousPid, TableByName, TableByPid),
  440. add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid).
  441. -spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
  442. purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
  443. %% loop elements for callback in a separate process to free scope process
  444. RegistryTuples = get_registry_tuples_for_node(Node, TableByName),
  445. spawn(fun() ->
  446. lists:foreach(fun({Name, Pid, Meta, _Time}) ->
  447. syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta])
  448. end, RegistryTuples)
  449. end),
  450. %% remove all from pid table
  451. true = ets:match_delete(TableByName, {'_', '_', '_', '_', '_', Node}),
  452. true = ets:match_delete(TableByPid, {'_', '_', '_', '_', '_', Node}).
  453. -spec handle_registry_sync(
  454. Name :: term(),
  455. Pid :: pid(),
  456. Meta :: term(),
  457. Time :: non_neg_integer(),
  458. #state{}
  459. ) -> any().
  460. handle_registry_sync(Name, Pid, Meta, Time, #state{
  461. scope = Scope,
  462. table_by_name = TableByName,
  463. table_by_pid = TableByPid
  464. } = State) ->
  465. case find_registry_entry_by_name(Name, TableByName) of
  466. undefined ->
  467. %% no conflict
  468. add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
  469. %% callback
  470. syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]);
  471. {_, Pid, TableMeta, _, MRef, _} ->
  472. %% same pid, more recent (because it comes from the same node, which means that it's sequential)
  473. %% maybe updated meta or time only
  474. add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
  475. %% callback (call only if meta update)
  476. case TableMeta =/= Meta of
  477. true -> syn_event_handler:call_event_handler(on_registry_process_updated, [Scope, Name, Pid, Meta]);
  478. _ -> ok
  479. end;
  480. {_, TablePid, TableMeta, TableTime, TableMRef, TableNode} when TableNode =:= node() ->
  481. %% current node runs a conflicting process -> resolve
  482. %% * the conflict is resolved by the two nodes that own the conflicting processes
  483. %% * when a process is chosen, the time is updated
  484. %% * the node that runs the process that is kept sends the sync_register message
  485. %% * recipients check that the time is more recent that what they have to ensure that there are no race conditions
  486. resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State);
  487. {_, TablePid, TableMeta, TableTime, _, _} when TableTime < Time ->
  488. %% current node does not own any of the conflicting processes, update
  489. update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
  490. %% callbacks
  491. syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, TablePid, TableMeta]),
  492. syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]);
  493. {_, _, _, _, _, _} ->
  494. %% race condition: incoming data is older, ignore
  495. ok
  496. end.
  497. -spec resolve_conflict(
  498. Scope :: atom(),
  499. Name :: term(),
  500. {Pid :: pid(), Meta :: term(), Time :: non_neg_integer()},
  501. {TablePid :: pid(), TableMeta :: term(), TableTime :: non_neg_integer(), TableMRef :: reference()},
  502. #state{}
  503. ) -> any().
  504. resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, #state{
  505. table_by_name = TableByName,
  506. table_by_pid = TableByPid
  507. } = State) ->
  508. %% call conflict resolution
  509. {PidToKeep, KillOtherPid} = syn_event_handler:do_resolve_registry_conflict(
  510. Scope,
  511. Name,
  512. {Pid, Meta, Time},
  513. {TablePid, TableMeta, TableTime}
  514. ),
  515. %% resolve
  516. case PidToKeep of
  517. Pid ->
  518. %% -> we keep the remote pid
  519. error_logger:info_msg("SYN[~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> keeping remote: ~p",
  520. [?MODULE, Scope, Name, Pid, TablePid, Pid]
  521. ),
  522. %% update locally, the incoming sync_register will update with the time coming from remote node
  523. update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
  524. %% kill
  525. case KillOtherPid of
  526. true -> exit(TablePid, {syn_resolve_kill, Name, TableMeta});
  527. false -> ok
  528. end,
  529. %% callbacks
  530. syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, TablePid, TableMeta]),
  531. syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]);
  532. TablePid ->
  533. %% -> we keep the local pid, remote pid will be killed by the other node in the conflict
  534. error_logger:info_msg("SYN[~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> keeping local: ~p",
  535. [?MODULE, Scope, Name, Pid, TablePid, TablePid]
  536. ),
  537. %% overwrite with updated time
  538. ResolveTime = erlang:system_time(),
  539. add_to_local_table(Name, TablePid, TableMeta, ResolveTime, TableMRef, TableByName, TableByPid),
  540. %% broadcast to all (including remote node to update the time)
  541. syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime}, State);
  542. Invalid ->
  543. error_logger:info_msg("SYN[~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> none chosen (got: ~p)",
  544. [?MODULE, Scope, Name, Pid, TablePid, Invalid]
  545. ),
  546. %% remove
  547. maybe_demonitor(TablePid, TableByPid),
  548. remove_from_local_table(Name, TablePid, TableByName, TableByPid),
  549. %% kill local, remote will be killed by other node performing the same resolve
  550. case KillOtherPid of
  551. true -> exit(TablePid, {syn_resolve_kill, Name, TableMeta});
  552. false -> ok
  553. end,
  554. %% callback
  555. syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, TablePid, TableMeta]),
  556. %% broadcast to all but remote node, which will remove it during conflict resolution
  557. syn_gen_scope:broadcast({'3.0', sync_unregister, Name, TablePid, TableMeta}, [node(Pid)], State)
  558. end.