syn_registry.erl 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_registry).
  27. -behaviour(syn_gen_scope).
  28. %% API
  29. -export([start_link/1]).
  30. -export([get_subcluster_nodes/1]).
  31. -export([lookup/1, lookup/2]).
  32. -export([register/2, register/3, register/4]).
  33. -export([unregister/1, unregister/2]).
  34. -export([count/1, count/2]).
  35. %% syn_gen_scope callbacks
  36. -export([
  37. init/1,
  38. handle_call/3,
  39. handle_info/2,
  40. save_remote_data/2,
  41. get_local_data/1,
  42. purge_local_data_for_node/2
  43. ]).
  44. %% tests
  45. -ifdef(TEST).
  46. -export([add_to_local_table/7, remove_from_local_table/4]).
  47. -endif.
  48. %% includes
  49. -include("syn.hrl").
  50. %% ===================================================================
  51. %% API
  52. %% ===================================================================
  %% Starts the registry process for `Scope' by delegating to
  %% syn_gen_scope:start_link/2 with this module as the callback module.
  %% Only an atom is accepted as scope; any other term fails with function_clause.
  -spec start_link(Scope :: atom()) ->
      {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
  start_link(Scope) when is_atom(Scope) ->
      syn_gen_scope:start_link(?MODULE, Scope).
  %% Returns whatever syn_gen_scope:get_subcluster_nodes/2 reports for this
  %% callback module and the given scope state.
  -spec get_subcluster_nodes(#state{}) -> [node()].
  get_subcluster_nodes(State) ->
      syn_gen_scope:get_subcluster_nodes(?MODULE, State).
  %% Looks `Name' up in the `default' scope; see lookup/2.
  -spec lookup(Name :: any()) -> {pid(), Meta :: any()} | undefined.
  lookup(Name) ->
      lookup(default, Name).
  %% Looks `Name' up in the by-name table of `Scope'.
  %%
  %% Returns `{Pid, Meta}' when an entry exists, `undefined' otherwise.
  %% Raises `error({invalid_scope, Scope})' when no table name is known for the scope.
  -spec lookup(Scope :: atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
  lookup(Scope, Name) ->
      case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          Table ->
              Found = find_registry_entry_by_name(Name, Table),
              case Found of
                  {Name, RegisteredPid, RegisteredMeta, _Time, _MRef, _Node} ->
                      {RegisteredPid, RegisteredMeta};
                  undefined ->
                      undefined
              end
      end.
  %% Registers `Pid' under `Name' in the `default' scope with `undefined' metadata.
  -spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  register(Name, Pid) ->
      register(default, Name, Pid, undefined).
  %% Three-argument registration, disambiguated by the position of the pid:
  %%   * second argument is a pid -> (Name, Pid, Meta) in the `default' scope;
  %%   * otherwise, third argument is a pid -> (Scope, Name, Pid) with `undefined' metadata.
  %% When both the second and third arguments are pids the first form wins.
  -spec register(NameOrScope :: any(), PidOrName :: any(), MetaOrPid :: any()) -> ok | {error, Reason :: any()}.
  register(Name, Pid, Meta) when is_pid(Pid) ->
      register(default, Name, Pid, Meta);
  register(Scope, Name, Pid) when is_pid(Pid) ->
      register(Scope, Name, Pid, undefined).
  %% Registers `Pid' under `Name' in `Scope' with metadata `Meta'.
  %%
  %% The request is sent through syn_gen_scope:call/4 to the node that runs `Pid'.
  %% When that node is not the caller's node and the reply is `{ok, ...}',
  %% the caller's tables are updated right away and the registration callback is
  %% invoked here. In every other case the first element of the reply tuple
  %% is returned as is.
  -spec register(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
  register(Scope, Name, Pid, Meta) ->
      OwnerNode = node(Pid),
      Request = {register_on_owner, node(), Name, Pid, Meta},
      case syn_gen_scope:call(?MODULE, OwnerNode, Scope, Request) of
          {ok, {PreviousPid, PreviousMeta, Time, TableByName, TableByPid}} when OwnerNode =/= node() ->
              %% mirror the entry locally so that lookups issued right after this call see it
              add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
              %% notify the event handler on the calling node
              syn_event_handler:do_on_process_registered(Scope, Name, {PreviousPid, PreviousMeta}, {Pid, Meta}),
              ok;
          {Response, _} ->
              Response
      end.
  %% Unregisters `Name' from the `default' scope; see unregister/2.
  -spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
  unregister(Name) ->
      unregister(default, Name).
  %% Unregisters `Name' from `Scope'.
  %%
  %% The entry is first looked up in the caller's by-name table to find the
  %% registered pid; the request is then sent through syn_gen_scope:call/4 to the
  %% node running that pid. Returns `{error, undefined}' when no local entry exists.
  %% Raises `error({invalid_scope, Scope})' when no table name is known for the scope.
  -spec unregister(Scope :: atom(), Name :: any()) -> ok | {error, Reason :: any()}.
  unregister(Scope, Name) ->
      case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          TableByName ->
              case find_registry_entry_by_name(Name, TableByName) of
                  undefined ->
                      {error, undefined};
                  {Name, Pid, Meta, _Time, _MRef, _EntryNode} ->
                      %% the request is addressed to the node that runs the registered process
                      OwnerNode = node(Pid),
                      Request = {unregister_on_owner, node(), Name, Pid},
                      case syn_gen_scope:call(?MODULE, OwnerNode, Scope, Request) of
                          {ok, TableByPid} when OwnerNode =/= node() ->
                              %% drop the entry locally so that lookups issued right after this call miss it
                              remove_from_local_table(Name, Pid, TableByName, TableByPid),
                              %% notify the event handler on the calling node
                              syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
                              ok;
                          {Response, _} ->
                              Response
                      end
              end
      end.
  %% Returns the number of entries in the by-name table of `Scope'.
  %% Raises `error({invalid_scope, Scope})' when ets:info/2 reports no such table.
  -spec count(Scope :: atom()) -> non_neg_integer().
  count(Scope) ->
      Table = syn_backbone:get_table_name(syn_registry_by_name, Scope),
      case ets:info(Table, size) of
          Size when is_integer(Size) -> Size;
          undefined -> error({invalid_scope, Scope})
      end.
  %% Returns the number of entries in the by-name table of `Scope' whose
  %% sixth field (the node) equals `Node'.
  %% Raises `error({invalid_scope, Scope})' when no table name is known for the scope.
  -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
  count(Scope, Node) ->
      case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
          undefined ->
              error({invalid_scope, Scope});
          TableByName ->
              EntriesOnNode = [{{'_', '_', '_', '_', '_', Node}, [], [true]}],
              ets:select_count(TableByName, EntriesOnNode)
      end.
  143. %% ===================================================================
  144. %% Callbacks
  145. %% ===================================================================
  146. %% ----------------------------------------------------------------------------------------------------------
  147. %% Init
  148. %% ----------------------------------------------------------------------------------------------------------
  %% syn_gen_scope callback: re-establishes monitors for the entries of this
  %% node found in the tables, then returns an empty map as handler state.
  -spec init(State :: term()) -> {ok, State :: term()}.
  init(State) ->
      rebuild_monitors(State),
      {ok, #{}}.
  156. %% ----------------------------------------------------------------------------------------------------------
  157. %% Call messages
  158. %% ----------------------------------------------------------------------------------------------------------
  %% syn_gen_scope callback for synchronous requests.
  %%
  %% `{register_on_owner, RequesterNode, Name, Pid, Meta}':
  %%   registers `Pid' under `Name' if the process is alive and the name is either
  %%   free or already held by the same pid (in which case meta and time are
  %%   overwritten). Replies `{ok, {PreviousPid, PreviousMeta, Time, TableByName, TableByPid}}',
  %%   `{{error, taken}, undefined}' or `{{error, not_alive}, undefined}'.
  %%   The sync_register broadcast always excludes `RequesterNode': a remote requester
  %%   updates its own tables and fires its own callback in register/4, so sending it
  %%   the sync message as well would make the registration callback fire twice there.
  %%
  %% `{unregister_on_owner, RequesterNode, Name, Pid}':
  %%   removes the entry if `Name' is held by `Pid'. Replies `{ok, TableByPid}',
  %%   `{{error, race_condition}, undefined}' when the name is held by another pid, or
  %%   `{{error, undefined}, undefined}' when the name is not registered.
  %%
  %% Any other request is logged and answered with `undefined'.
  -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
      State :: term()) ->
      {reply, Reply :: term(), NewState :: term()} |
      {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
      {stop, Reason :: term(), NewState :: term()}.
  handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      case is_process_alive(Pid) of
          true ->
              case find_registry_entry_by_name(Name, TableByName) of
                  undefined ->
                      %% available
                      MRef = case find_monitor_for_pid(Pid, TableByPid) of
                          undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, add
                          MRef0 -> MRef0
                      end,
                      %% add to local table
                      Time = erlang:system_time(),
                      add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
                      %% callback
                      syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta}),
                      %% broadcast to everyone but the requester, which updates itself in register/4
                      syn_gen_scope:broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, [RequesterNode], State),
                      %% return
                      {reply, {ok, {undefined, undefined, Time, TableByName, TableByPid}}, State};
                  {Name, Pid, TableMeta, _TableTime, MRef, _TableNode} ->
                      %% same pid, possibly new meta or time, overwrite
                      Time = erlang:system_time(),
                      add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
                      %% callback
                      syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta}),
                      %% broadcast to everyone but the requester (same reasoning as for a fresh
                      %% registration: the requester already applies the update and the callback itself)
                      syn_gen_scope:broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, [RequesterNode], State),
                      %% return
                      {reply, {ok, {Pid, TableMeta, Time, TableByName, TableByPid}}, State};
                  _ ->
                      %% name is held by a different pid
                      {reply, {{error, taken}, undefined}, State}
              end;
          false ->
              {reply, {{error, not_alive}, undefined}, State}
      end;
  handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      case find_registry_entry_by_name(Name, TableByName) of
          {Name, Pid, Meta, _Time, _MRef, _Node} ->
              %% demonitor if the process is not registered under other names
              maybe_demonitor(Pid, TableByPid),
              %% remove from table
              remove_from_local_table(Name, Pid, TableByName, TableByPid),
              %% callback
              syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
              %% broadcast to everyone but the requester, which updates itself in unregister/2
              syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
              %% return
              {reply, {ok, TableByPid}, State};
          {Name, _TablePid, _Meta, _Time, _MRef, _Node} ->
              %% process is registered locally with another pid: race condition, wait for sync to happen & return error
              {reply, {{error, race_condition}, undefined}, State};
          undefined ->
              {reply, {{error, undefined}, undefined}, State}
      end;
  handle_call(Request, From, State) ->
      error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), From, Request]),
      {reply, undefined, State}.
  232. %% ----------------------------------------------------------------------------------------------------------
  233. %% Info messages
  234. %% ----------------------------------------------------------------------------------------------------------
  %% syn_gen_scope callback for asynchronous messages.
  %%
  %%   * `{'3.0', sync_register, ...}'   -> applies a registration received from another node;
  %%   * `{'3.0', sync_unregister, ...}' -> removes the entry and fires the unregistered callback;
  %%   * `{'DOWN', ...}'                 -> for every name held by the dead pid: removes the entry,
  %%                                        fires the unregistered callback and broadcasts the removal;
  %%   * anything else                   -> logged and ignored.
  %%
  %% The state is never modified.
  -spec handle_info(Info :: timeout | term(), State :: term()) ->
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), NewState :: term()}.
  handle_info({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State) ->
      handle_registry_sync(Scope, Name, Pid, Meta, Time, State),
      {noreply, State};
  handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      remove_from_local_table(Name, Pid, TableByName, TableByPid),
      %% notify the event handler
      syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
      {noreply, State};
  handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
      scope = Scope,
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      %% per-entry cleanup: table removal, callback, then broadcast
      Unregister = fun({_EntryPid, Name, Meta, _, _, _}) ->
          remove_from_local_table(Name, Pid, TableByName, TableByPid),
          syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
          syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State)
      end,
      case find_registry_entries_by_pid(Pid, TableByPid) of
          [] ->
              error_logger:warning_msg(
                  "SYN[~s] Received a DOWN message from an unknown process ~p with reason: ~p",
                  [node(), Pid, Reason]
              );
          Entries ->
              lists:foreach(Unregister, Entries)
      end,
      {noreply, State};
  handle_info(Info, State) ->
      error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [node(), Info]),
      {noreply, State}.
  278. %% ----------------------------------------------------------------------------------------------------------
  279. %% Data
  280. %% ----------------------------------------------------------------------------------------------------------
  %% syn_gen_scope callback: returns `{ok, Tuples}' where `Tuples' are the
  %% `{Name, Pid, Meta, Time}' tuples of the by-name table whose node is the local node.
  -spec get_local_data(State :: term()) -> {ok, Data :: any()} | undefined.
  get_local_data(#state{table_by_name = TableByName}) ->
      LocalTuples = get_registry_tuples_for_node(node(), TableByName),
      {ok, LocalTuples}.
  %% syn_gen_scope callback: applies every `{Name, Pid, Meta, Time}' tuple received
  %% from another node through handle_registry_sync/6, in list order.
  -spec save_remote_data(RemoteData :: any(), State :: term()) -> any().
  save_remote_data(RegistryTuplesOfRemoteNode, #state{scope = Scope} = State) ->
      Sync = fun({Name, Pid, Meta, Time}) ->
          handle_registry_sync(Scope, Name, Pid, Meta, Time, State)
      end,
      lists:foreach(Sync, RegistryTuplesOfRemoteNode).
  %% syn_gen_scope callback: drops every entry that belongs to `Node' from the
  %% tables of this scope by delegating to purge_registry_for_remote_node/4.
  -spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
  purge_local_data_for_node(Node, #state{scope = Scope, table_by_name = TableByName, table_by_pid = TableByPid}) ->
      purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid).
  297. %% ===================================================================
  298. %% Internal
  299. %% ===================================================================
  %% Re-creates the monitors for every entry of the local node found in the
  %% by-name table. Each entry is first removed; it is re-inserted with a fresh
  %% monitor reference only if its process is still alive.
  -spec rebuild_monitors(#state{}) -> ok.
  rebuild_monitors(#state{table_by_name = TableByName, table_by_pid = TableByPid}) ->
      LocalTuples = get_registry_tuples_for_node(node(), TableByName),
      Rebuild = fun({Name, Pid, Meta, Time}) ->
          remove_from_local_table(Name, Pid, TableByName, TableByPid),
          case is_process_alive(Pid) of
              true ->
                  NewMRef = erlang:monitor(process, Pid),
                  add_to_local_table(Name, Pid, Meta, Time, NewMRef, TableByName, TableByPid);
              _ ->
                  %% dead process: the entry stays removed
                  ok
          end
      end,
      lists:foreach(Rebuild, LocalTuples).
  %% Selects from the by-name table every entry whose sixth field equals `Node'
  %% and returns the first four fields of each as a `{Name, Pid, Meta, Time}' tuple.
  -spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
  get_registry_tuples_for_node(Node, TableByName) ->
      EntryPattern = {'$1', '$2', '$3', '$4', '_', Node},
      Projection = {{'$1', '$2', '$3', '$4'}},
      ets:select(TableByName, [{EntryPattern, [], [Projection]}]).
  %% Returns the single entry stored under key `Name' in the by-name table,
  %% or `undefined' when the key is absent.
  -spec find_registry_entry_by_name(Name :: any(), TableByName :: atom()) ->
      Entry :: syn_registry_entry() | undefined.
  find_registry_entry_by_name(Name, TableByName) ->
      case ets:lookup(TableByName, Name) of
          [Entry] -> Entry;
          [] -> undefined
      end.
  %% Returns every entry stored under key `Pid' in the by-pid table
  %% (an empty list when there is none). `Pid' must be a pid.
  -spec find_registry_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> RegistryEntriesByPid :: [syn_registry_entry_by_pid()].
  find_registry_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
      ets:lookup(TableByPid, Pid).
  %% Returns the monitor field (fifth element) of one by-pid entry of `Pid',
  %% or `undefined' when the pid has no entry.
  %%
  %% A select limited to a single result is used instead of a lookup so that a
  %% process registered under a large number of names does not produce a large result.
  -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
  find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
      MonitorOfPid = [{{Pid, '_', '_', '_', '$5', '_'}, [], ['$5']}],
      case ets:select(TableByPid, MonitorOfPid, 1) of
          '$end_of_table' -> undefined;
          {[MRef], _Continuation} -> MRef
      end.
  %% Demonitors `Pid' only when it has exactly one entry left in the by-pid table
  %% and that entry carries a monitor reference; otherwise does nothing.
  %%
  %% The select is limited to two results: getting a single one back means that the
  %% pid is not registered under any other name. A limited select is preferred over a
  %% lookup to stay cheap for processes registered under a large number of names.
  -spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
  maybe_demonitor(Pid, TableByPid) ->
      MonitorsOfPid = [{{Pid, '_', '_', '_', '$5', '_'}, [], ['$5']}],
      case ets:select(TableByPid, MonitorsOfPid, 2) of
          {[OnlyMRef], _Continuation} when is_reference(OnlyMRef) ->
              %% last alias of the pid: drop the monitor and flush a pending DOWN message
              erlang:demonitor(OnlyMRef, [flush]),
              ok;
          _ ->
              ok
      end.
  %% Writes the entry for `Name' / `Pid' into both tables, storing the node of
  %% `Pid' as sixth field.
  %%
  %% The by-name table is written with a plain insert. In the by-pid table any
  %% existing object for the same `{Pid, Name}' pair is deleted first: per the
  %% original note that table is a bag, so uniqueness of the pair has to be
  %% enforced by hand.
  -spec add_to_local_table(
      Name :: any(),
      Pid :: pid(),
      Meta :: any(),
      Time :: integer(),
      MRef :: undefined | reference(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
      PidNode = node(Pid),
      ByNameEntry = {Name, Pid, Meta, Time, MRef, PidNode},
      ByPidEntry = {Pid, Name, Meta, Time, MRef, PidNode},
      true = ets:insert(TableByName, ByNameEntry),
      true = ets:match_delete(TableByPid, {Pid, Name, '_', '_', '_', '_'}),
      true = ets:insert(TableByPid, ByPidEntry).
  %% Deletes the entry for the `Name' / `Pid' pair from both tables.
  %% Entries of the same name held by a different pid, and other names of the
  %% same pid, are left untouched.
  -spec remove_from_local_table(
      Name :: any(),
      Pid :: pid(),
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  remove_from_local_table(Name, Pid, TableByName, TableByPid) ->
      ByNamePattern = {Name, Pid, '_', '_', '_', '_'},
      ByPidPattern = {Pid, Name, '_', '_', '_', '_'},
      true = ets:match_delete(TableByName, ByNamePattern),
      true = ets:match_delete(TableByPid, ByPidPattern).
  %% Replaces the holder of `Name': the entry of `PreviousPid' is removed (its
  %% monitor is dropped first when this was its last name) and the entry described
  %% by `{Pid, Meta, Time, MRef}' is written in its place.
  -spec update_local_table(
      Name :: any(),
      PreviousPid :: pid(),
      {
          Pid :: pid(),
          Meta :: any(),
          Time :: integer(),
          MRef :: undefined | reference()
      },
      TableByName :: atom(),
      TableByPid :: atom()
  ) -> true.
  update_local_table(Name, PreviousPid, {Pid, Meta, Time, MRef}, TableByName, TableByPid) ->
      %% the demonitor check has to run while the old entry is still in the by-pid table
      maybe_demonitor(PreviousPid, TableByPid),
      remove_from_local_table(Name, PreviousPid, TableByName, TableByPid),
      add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid).
  %% Removes every entry that belongs to `Node' from both tables. `Node' must not
  %% be the local node (any call with the local node fails with function_clause).
  %%
  %% The unregistered callbacks for the removed entries are run from a separately
  %% spawned process so that the calling process is not held up by them; the
  %% list of entries is captured before the tables are purged.
  -spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
  purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
      EntriesOfNode = get_registry_tuples_for_node(Node, TableByName),
      NotifyAll = fun() ->
          lists:foreach(fun({Name, Pid, Meta, _Time}) ->
              syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta)
          end, EntriesOfNode)
      end,
      spawn(NotifyAll),
      %% purge both the by-name and the by-pid table
      AnyEntryOfNode = {'_', '_', '_', '_', '_', Node},
      true = ets:match_delete(TableByName, AnyEntryOfNode),
      true = ets:match_delete(TableByPid, AnyEntryOfNode).
  %% Applies a registration of `Name' -> `Pid' received from another node.
  %%
  %% Depending on what the by-name table currently holds for `Name':
  %%   1. nothing             -> insert and fire the registered callback;
  %%   2. the same pid        -> overwrite meta/time, keep the stored monitor, fire the callback;
  %%   3. a pid of this node  -> conflict, delegated to resolve_conflict/5;
  %%   4. an older entry      -> replace it, fire unregistered then registered callbacks;
  %%   5. anything else       -> the incoming data is not newer, ignore it.
  %% The clauses are tried in this order.
  -spec handle_registry_sync(
      Scope :: atom(),
      Name :: any(),
      Pid :: pid(),
      Meta :: any(),
      Time :: non_neg_integer(),
      #state{}
  ) -> any().
  handle_registry_sync(Scope, Name, Pid, Meta, Time, #state{
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      Current = find_registry_entry_by_name(Name, TableByName),
      case Current of
          undefined ->
              %% name is free
              add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
              syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta});
          {Name, Pid, OldMeta, _OldTime, MRef, _OldNode} ->
              %% same pid: the update comes from the node that owns it and is therefore sequential
              add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
              syn_event_handler:do_on_process_registered(Scope, Name, {Pid, OldMeta}, {Pid, Meta});
          {Name, OldPid, OldMeta, OldTime, OldMRef, _OldNode} when node(OldPid) =:= node() ->
              %% this node runs one of the two conflicting processes:
              %%   * the conflict is resolved by the two nodes that own the processes
              %%   * the time is refreshed when a process is chosen
              %%   * the node running the surviving process sends the sync_register message
              %%   * recipients compare times so that stale messages are discarded
              resolve_conflict(Scope, Name, {Pid, Meta, Time}, {OldPid, OldMeta, OldTime, OldMRef}, State);
          {Name, OldPid, OldMeta, OldTime, _OldMRef, _OldNode} when OldTime < Time ->
              %% this node owns neither process and the incoming entry is newer
              update_local_table(Name, OldPid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
              syn_event_handler:do_on_process_unregistered(Scope, Name, OldPid, OldMeta),
              syn_event_handler:do_on_process_registered(Scope, Name, {OldPid, OldMeta}, {Pid, Meta});
          {Name, _OldPid, _OldMeta, _OldTime, _OldMRef, _OldNode} ->
              %% incoming entry is not newer than the stored one
              ok
      end.
  %% Resolves a conflict between an incoming registration (`Pid') and a process of
  %% this node (`TablePid') that both claim `Name'.
  %%
  %% The choice is made by syn_event_handler:do_resolve_registry_conflict/4:
  %%   * the incoming pid is chosen -> the tables are switched to it, the local
  %%     process is sent an exit signal and both callbacks are fired;
  %%   * the local pid is chosen    -> its entry is rewritten with a fresh time and
  %%     re-broadcast;
  %%   * anything else              -> the local entry is removed, the local process
  %%     is sent an exit signal and the unregistered callback is fired.
  -spec resolve_conflict(
      Scope :: atom(),
      Name :: any(),
      {Pid :: pid(), Meta :: any(), Time :: non_neg_integer()},
      {TablePid :: pid(), TableMeta :: any(), TableTime :: non_neg_integer(), TableMRef :: reference()},
      #state{}
  ) -> any().
  resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, #state{
      table_by_name = TableByName,
      table_by_pid = TableByPid
  } = State) ->
      Incoming = {Pid, Meta, Time},
      Local = {TablePid, TableMeta, TableTime},
      Winner = syn_event_handler:do_resolve_registry_conflict(Scope, Name, Incoming, Local),
      if
          Winner =:= Pid ->
              %% the remote process wins
              error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p vs ~p -> keeping remote: ~p",
                  [node(), Name, Scope, Pid, TablePid, Pid]
              ),
              %% switch the tables to the remote pid; a later sync_register from its node carries the final time
              update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
              %% terminate the losing local process
              exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
              syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
              syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});
          Winner =:= TablePid ->
              %% the local process wins
              error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p vs ~p -> keeping local: ~p",
                  [node(), Name, Scope, Pid, TablePid, TablePid]
              ),
              %% rewrite the local entry with a fresh time
              ResolveTime = erlang:system_time(),
              add_to_local_table(Name, TablePid, TableMeta, ResolveTime, TableMRef, TableByName, TableByPid),
              %% re-broadcast the refreshed entry (no exclusion list is passed)
              syn_gen_scope:broadcast({'3.0', sync_register, Scope, Name, TablePid, TableMeta, ResolveTime}, State);
          true ->
              %% the handler returned neither of the two pids
              error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p vs ~p -> none chosen (got: ~p)",
                  [node(), Name, Scope, Pid, TablePid, Winner]
              ),
              maybe_demonitor(TablePid, TableByPid),
              remove_from_local_table(Name, TablePid, TableByName, TableByPid),
              %% terminate the local process; the remote one is left to the node that runs it
              exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
              syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta)
      end.