syn_registry.erl

%% ==========================================================================================================
%% Syn - A global Process Registry and Process Group manager.
%%
%% The MIT License (MIT)
%%
%% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in
%% all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
%% THE SOFTWARE.
%% ==========================================================================================================
-module(syn_registry).
-behaviour(syn_gen_scope).

%% API
-export([start_link/1]).
-export([get_subcluster_nodes/1]).
-export([lookup/1, lookup/2]).
-export([register/2, register/3, register/4]).
-export([unregister/1, unregister/2]).
-export([count/1, count/2]).

%% syn_gen_scope callbacks
-export([
    init/1,
    handle_call/3,
    handle_info/2,
    save_remote_data/2,
    get_local_data/1,
    purge_local_data_for_node/2
]).

%% tests
-ifdef(TEST).
-export([add_to_local_table/7, remove_from_local_table/4]).
-endif.

%% includes
-include("syn.hrl").

%% ===================================================================
%% API
%% ===================================================================
-spec start_link(Scope :: atom()) ->
    {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
start_link(Scope) when is_atom(Scope) ->
    syn_gen_scope:start_link(?MODULE, Scope).

-spec get_subcluster_nodes(Scope :: atom()) -> [node()].
get_subcluster_nodes(Scope) ->
    syn_gen_scope:get_subcluster_nodes(?MODULE, Scope).

-spec lookup(Name :: term()) -> {pid(), Meta :: term()} | undefined.
lookup(Name) ->
    lookup(default, Name).

-spec lookup(Scope :: atom(), Name :: term()) -> {pid(), Meta :: term()} | undefined.
lookup(Scope, Name) ->
    case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
        undefined ->
            error({invalid_scope, Scope});

        TableByName ->
            case find_registry_entry_by_name(Name, TableByName) of
                undefined -> undefined;
                {Name, Pid, Meta, _, _, _} -> {Pid, Meta}
            end
    end.
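
%% A minimal usage sketch, assuming the scope's processes and tables have been
%% started (e.g. via syn's supervision tree) and with hypothetical name, pid
%% and metadata values:
%%
%%   ok = syn_registry:register(default, <<"my-device">>, Pid, #{type => sensor}),
%%   {Pid, #{type := sensor}} = syn_registry:lookup(default, <<"my-device">>),
%%   undefined = syn_registry:lookup(default, <<"unknown">>).
%%
%% Lookups read the local ETS replica of the registry and never leave the node.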

-spec register(Name :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
register(Name, Pid) ->
    register(default, Name, Pid, undefined).

-spec register(NameOrScope :: term(), PidOrName :: term(), MetaOrPid :: term()) -> ok | {error, Reason :: term()}.
register(Name, Pid, Meta) when is_pid(Pid) ->
    register(default, Name, Pid, Meta);
register(Scope, Name, Pid) when is_pid(Pid) ->
    register(Scope, Name, Pid, undefined).

-spec register(Scope :: atom(), Name :: term(), Pid :: pid(), Meta :: term()) -> ok | {error, Reason :: term()}.
register(Scope, Name, Pid, Meta) ->
    Node = node(Pid),
    case syn_gen_scope:call(?MODULE, Node, Scope, {register_on_owner, node(), Name, Pid, Meta}) of
        {ok, {TablePid, TableMeta, Time, TableByName, TableByPid}} when Node =/= node() ->
            %% update the table on the caller node immediately so that subsequent calls have an updated registry
            add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
            %% callback
            syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
            %% return
            ok;

        {Response, _} ->
            Response
    end.
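
%% Registration is serialized through the scope process on the node that owns
%% Pid: register/4 issues a syn_gen_scope:call/4 to that node, which monitors
%% the process, writes its own tables, and broadcasts the new entry to the
%% rest of the subcluster. When the owner is a remote node, the caller also
%% writes its local tables right away (above), so a lookup on this node
%% observes the registration without waiting for the broadcast to arrive.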

-spec unregister(Name :: term()) -> ok | {error, Reason :: term()}.
unregister(Name) ->
    unregister(default, Name).

-spec unregister(Scope :: atom(), Name :: term()) -> ok | {error, Reason :: term()}.
unregister(Scope, Name) ->
    case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
        undefined ->
            error({invalid_scope, Scope});

        TableByName ->
            %% get the registered process' owner node
            case find_registry_entry_by_name(Name, TableByName) of
                undefined ->
                    {error, undefined};

                {Name, Pid, Meta, _, _, _} ->
                    Node = node(Pid),
                    case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_owner, node(), Name, Pid}) of
                        {ok, TableByPid} when Node =/= node() ->
                            %% remove from the table on the caller node immediately so that subsequent calls have an updated registry
                            remove_from_local_table(Name, Pid, TableByName, TableByPid),
                            %% callback
                            syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
                            %% return
                            ok;

                        {Response, _} ->
                            Response
                    end
            end
    end.
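
%% Like register/4, unregistration is routed to the node that owns the
%% registered pid. A sketch of the possible returns (hypothetical names):
%%
%%   ok = syn_registry:unregister(default, <<"my-device">>),
%%   {error, undefined} = syn_registry:unregister(default, <<"never-registered">>).
%%
%% {error, race_condition} is returned when the owner node already sees the
%% name registered to a different pid, i.e. a re-registration sync is in flight.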

-spec count(Scope :: atom()) -> non_neg_integer().
count(Scope) ->
    TableByName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
    case ets:info(TableByName, size) of
        undefined -> error({invalid_scope, Scope});
        Value -> Value
    end.

-spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
count(Scope, Node) ->
    case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
        undefined ->
            error({invalid_scope, Scope});

        TableByName ->
            ets:select_count(TableByName, [{
                {'_', '_', '_', '_', '_', Node},
                [],
                [true]
            }])
    end.
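
%% Every registration is one row in each table. Rows in the by-name table are
%% laid out as {Name, Pid, Meta, Time, MRef, Node}, so the match specification
%% in count/2 filters on the Node field (6th position) alone. For example,
%% counting the entries owned by the local node:
%%
%%   LocalCount = syn_registry:count(default, node()).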

%% ===================================================================
%% Callbacks
%% ===================================================================

%% ----------------------------------------------------------------------------------------------------------
%% Init
%% ----------------------------------------------------------------------------------------------------------
-spec init(#state{}) -> {ok, HandlerState :: term()}.
init(State) ->
    HandlerState = #{},
    %% rebuild monitors
    rebuild_monitors(State),
    %% init
    {ok, HandlerState}.

%% ----------------------------------------------------------------------------------------------------------
%% Call messages
%% ----------------------------------------------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
    {reply, Reply :: term(), #state{}} |
    {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
    {noreply, #state{}} |
    {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
    {stop, Reason :: term(), Reply :: term(), #state{}} |
    {stop, Reason :: term(), #state{}}.
handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
    scope = Scope,
    table_by_name = TableByName,
    table_by_pid = TableByPid
} = State) ->
    case is_process_alive(Pid) of
        true ->
            case find_registry_entry_by_name(Name, TableByName) of
                undefined ->
                    %% name is available
                    MRef = case find_monitor_for_pid(Pid, TableByPid) of
                        undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, add
                        MRef0 -> MRef0
                    end,
                    %% add to local table
                    Time = erlang:system_time(),
                    add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
                    %% callback
                    syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta}),
                    %% broadcast
                    syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time}, [RequesterNode], State),
                    %% return
                    {reply, {ok, {undefined, undefined, Time, TableByName, TableByPid}}, State};

                {Name, Pid, TableMeta, _TableTime, MRef, _TableNode} ->
                    %% same pid, possibly new meta or time: overwrite
                    Time = erlang:system_time(),
                    add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
                    %% callback
                    syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta}),
                    %% broadcast
                    syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time}, State),
                    %% return
                    {reply, {ok, {Pid, TableMeta, Time, TableByName, TableByPid}}, State};

                _ ->
                    {reply, {{error, taken}, undefined}, State}
            end;

        false ->
            {reply, {{error, not_alive}, undefined}, State}
    end;

handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{
    scope = Scope,
    table_by_name = TableByName,
    table_by_pid = TableByPid
} = State) ->
    case find_registry_entry_by_name(Name, TableByName) of
        {Name, Pid, Meta, _Time, _MRef, _Node} ->
            %% demonitor if the process is not registered under other names
            maybe_demonitor(Pid, TableByPid),
            %% remove from table
            remove_from_local_table(Name, Pid, TableByName, TableByPid),
            %% callback
            syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
            %% broadcast
            syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
            %% return
            {reply, {ok, TableByPid}, State};

        {Name, _TablePid, _Meta, _Time, _MRef, _Node} ->
            %% the name is registered locally to another pid: race condition, wait for the sync to happen & return an error
            {reply, {{error, race_condition}, undefined}, State};

        undefined ->
            {reply, {{error, undefined}, undefined}, State}
    end;

handle_call(Request, From, State) ->
    error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), From, Request]),
    {reply, undefined, State}.
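
%% The '3.0' tag versions the wire format of subcluster messages: a node
%% running an incompatible version will not match these tuples, and they fall
%% through to the unknown-message clause of handle_info/2 below. Registrations
%% travel as {'3.0', sync_register, Name, Pid, Meta, Time} and removals as
%% {'3.0', sync_unregister, Name, Pid, Meta}.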

%% ----------------------------------------------------------------------------------------------------------
%% Info messages
%% ----------------------------------------------------------------------------------------------------------
-spec handle_info(Info :: timeout | term(), #state{}) ->
    {noreply, #state{}} |
    {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
    {stop, Reason :: term(), #state{}}.
handle_info({'3.0', sync_register, Name, Pid, Meta, Time}, State) ->
    handle_registry_sync(Name, Pid, Meta, Time, State),
    {noreply, State};

handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{
    scope = Scope,
    table_by_name = TableByName,
    table_by_pid = TableByPid
} = State) ->
    remove_from_local_table(Name, Pid, TableByName, TableByPid),
    %% callback
    syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
    %% return
    {noreply, State};

handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
    scope = Scope,
    table_by_name = TableByName,
    table_by_pid = TableByPid
} = State) ->
    case find_registry_entries_by_pid(Pid, TableByPid) of
        [] ->
            error_logger:warning_msg(
                "SYN[~s] Received a DOWN message from an unknown process ~p with reason: ~p",
                [node(), Pid, Reason]
            );

        Entries ->
            lists:foreach(fun({_Pid, Name, Meta, _, _, _}) ->
                %% remove from table
                remove_from_local_table(Name, Pid, TableByName, TableByPid),
                %% callback
                syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
                %% broadcast
                syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State)
            end, Entries)
    end,
    %% return
    {noreply, State};

handle_info(Info, State) ->
    error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [node(), Info]),
    {noreply, State}.
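
%% A single DOWN message may cover several registrations: the dead pid is
%% looked up in the by-pid table and every name it held is removed, the
%% unregister callback fires once per name, and one sync_unregister broadcast
%% is sent per name so that the rest of the subcluster converges.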

%% ----------------------------------------------------------------------------------------------------------
%% Data
%% ----------------------------------------------------------------------------------------------------------
-spec get_local_data(#state{}) -> {ok, Data :: term()} | undefined.
get_local_data(#state{table_by_name = TableByName}) ->
    {ok, get_registry_tuples_for_node(node(), TableByName)}.

-spec save_remote_data(RemoteData :: term(), #state{}) -> any().
save_remote_data(RegistryTuplesOfRemoteNode, State) ->
    %% insert tuples
    lists:foreach(fun({Name, Pid, Meta, Time}) ->
        handle_registry_sync(Name, Pid, Meta, Time, State)
    end, RegistryTuplesOfRemoteNode).

-spec purge_local_data_for_node(Node :: node(), #state{}) -> any().
purge_local_data_for_node(Node, #state{
    scope = Scope,
    table_by_name = TableByName,
    table_by_pid = TableByPid
}) ->
    purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid).

%% ===================================================================
%% Internal
%% ===================================================================
-spec rebuild_monitors(#state{}) -> ok.
rebuild_monitors(#state{
    table_by_name = TableByName
} = State) ->
    RegistryTuples = get_registry_tuples_for_node(node(), TableByName),
    do_rebuild_monitors(RegistryTuples, #{}, State).

-spec do_rebuild_monitors([syn_registry_tuple()], #{pid() => reference()}, #state{}) -> ok.
do_rebuild_monitors([], _, _) -> ok;
do_rebuild_monitors([{Name, Pid, Meta, Time} | T], NewMonitorRefs, #state{
    table_by_name = TableByName,
    table_by_pid = TableByPid
} = State) ->
    remove_from_local_table(Name, Pid, TableByName, TableByPid),
    case is_process_alive(Pid) of
        true ->
            case maps:find(Pid, NewMonitorRefs) of
                error ->
                    MRef = erlang:monitor(process, Pid),
                    add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
                    do_rebuild_monitors(T, maps:put(Pid, MRef, NewMonitorRefs), State);

                {ok, MRef} ->
                    add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
                    do_rebuild_monitors(T, NewMonitorRefs, State)
            end;

        _ ->
            do_rebuild_monitors(T, NewMonitorRefs, State)
    end.
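
%% Monitors are owned by the scope process and are therefore lost if it
%% crashes and restarts; rebuild_monitors/1 re-reads every local entry, drops
%% the entries whose process died in the meantime, and re-monitors the
%% survivors, creating a single monitor per pid even when the pid is
%% registered under many names.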

-spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
get_registry_tuples_for_node(Node, TableByName) ->
    ets:select(TableByName, [{
        {'$1', '$2', '$3', '$4', '_', Node},
        [],
        [{{'$1', '$2', '$3', '$4'}}]
    }]).
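
%% The match specification above selects {Name, Pid, Meta, Time} (positions 1
%% to 4) of every row whose Node field (position 6) matches: this is the
%% syn_registry_tuple() shape that nodes exchange when syncing.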

-spec find_registry_entry_by_name(Name :: term(), TableByName :: atom()) ->
    Entry :: syn_registry_entry() | undefined.
find_registry_entry_by_name(Name, TableByName) ->
    case ets:lookup(TableByName, Name) of
        [] -> undefined;
        [Entry] -> Entry
    end.

-spec find_registry_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> RegistryEntriesByPid :: [syn_registry_entry_by_pid()].
find_registry_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
    ets:lookup(TableByPid, Pid).

-spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
    %% we use select instead of lookup to limit the results and thus cover the case
    %% when a process is registered with a considerable amount of names
    case ets:select(TableByPid, [{
        {Pid, '_', '_', '_', '$5', '_'},
        [],
        ['$5']
    }], 1) of
        {[MRef], _} -> MRef;
        '$end_of_table' -> undefined
    end.

-spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
maybe_demonitor(Pid, TableByPid) ->
    %% select with a limit of 2: if only 1 result is returned, no other aliases exist for the Pid
    %% we use select instead of lookup to limit the results and thus cover the case
    %% when a process is registered with a considerable amount of names
    case ets:select(TableByPid, [{
        {Pid, '_', '_', '_', '$5', '_'},
        [],
        ['$5']
    }], 2) of
        {[MRef], _} when is_reference(MRef) ->
            %% no other aliases, demonitor
            erlang:demonitor(MRef, [flush]),
            ok;
        _ ->
            ok
    end.

-spec add_to_local_table(
    Name :: term(),
    Pid :: pid(),
    Meta :: term(),
    Time :: integer(),
    MRef :: undefined | reference(),
    TableByName :: atom(),
    TableByPid :: atom()
) -> true.
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
    %% insert
    true = ets:insert(TableByName, {Name, Pid, Meta, Time, MRef, node(Pid)}),
    %% the by-pid table is of type bag, so we need to manually ensure that the {Pid, Name} pair stays unique
    true = ets:match_delete(TableByPid, {Pid, Name, '_', '_', '_', '_'}),
    true = ets:insert(TableByPid, {Pid, Name, Meta, Time, MRef, node(Pid)}).
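
%% The two writes keep the tables mirrored: the by-name table is keyed on Name
%% and holds at most one row per name, while the by-pid table is keyed on Pid
%% and may hold many rows per process (one per name it is registered under),
%% hence the manual match_delete above before re-inserting.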

-spec remove_from_local_table(
    Name :: term(),
    Pid :: pid(),
    TableByName :: atom(),
    TableByPid :: atom()
) -> true.
remove_from_local_table(Name, Pid, TableByName, TableByPid) ->
    true = ets:match_delete(TableByName, {Name, Pid, '_', '_', '_', '_'}),
    true = ets:match_delete(TableByPid, {Pid, Name, '_', '_', '_', '_'}).

-spec update_local_table(
    Name :: term(),
    PreviousPid :: pid(),
    {
        Pid :: pid(),
        Meta :: term(),
        Time :: integer(),
        MRef :: undefined | reference()
    },
    TableByName :: atom(),
    TableByPid :: atom()
) -> true.
update_local_table(Name, PreviousPid, {Pid, Meta, Time, MRef}, TableByName, TableByPid) ->
    maybe_demonitor(PreviousPid, TableByPid),
    remove_from_local_table(Name, PreviousPid, TableByName, TableByPid),
    add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid).

-spec purge_registry_for_remote_node(Scope :: atom(), Node :: node(), TableByName :: atom(), TableByPid :: atom()) -> true.
purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
    %% loop over the entries in a separate process so that the callbacks do not block the scope process
    RegistryTuples = get_registry_tuples_for_node(Node, TableByName),
    spawn(fun() ->
        lists:foreach(fun({Name, Pid, Meta, _Time}) ->
            syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta)
        end, RegistryTuples)
    end),
    %% remove all of the node's entries from both tables
    true = ets:match_delete(TableByName, {'_', '_', '_', '_', '_', Node}),
    true = ets:match_delete(TableByPid, {'_', '_', '_', '_', '_', Node}).

-spec handle_registry_sync(
    Name :: term(),
    Pid :: pid(),
    Meta :: term(),
    Time :: non_neg_integer(),
    #state{}
) -> any().
handle_registry_sync(Name, Pid, Meta, Time, #state{
    scope = Scope,
    table_by_name = TableByName,
    table_by_pid = TableByPid
} = State) ->
    case find_registry_entry_by_name(Name, TableByName) of
        undefined ->
            %% no conflict
            add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
            %% callback
            syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta});

        {Name, Pid, TableMeta, _TableTime, MRef, _TableNode} ->
            %% same pid: the incoming entry is more recent (updates from the owning node are sequential), overwrite
            add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
            %% callback
            syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta});

        {Name, TablePid, TableMeta, TableTime, TableMRef, _TableNode} when node(TablePid) =:= node() ->
            %% the current node runs a conflicting process -> resolve
            %% * the conflict is resolved by the two nodes that own the conflicting processes
            %% * when a process is chosen, the time is updated
            %% * the node that runs the process that is kept sends the sync_register message
            %% * recipients check that the time is more recent than what they hold, to ensure that there are no race conditions
            resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State);

        {Name, TablePid, TableMeta, TableTime, _TableMRef, _TableNode} when TableTime < Time ->
            %% the current node does not own any of the conflicting processes: update
            update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
            %% callbacks
            syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
            syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});

        {Name, _TablePid, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
            %% race condition: the incoming data is older, ignore
            ok
    end.
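
%% Ordering rule: when the local node owns neither of the conflicting
%% processes, the entry with the greater Time wins and incoming data with an
%% older Time is ignored, so stale or re-ordered broadcasts cannot clobber
%% newer registrations. When the local node does own the conflicting process,
%% resolution is escalated to resolve_conflict/5 below.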

-spec resolve_conflict(
    Scope :: atom(),
    Name :: term(),
    {Pid :: pid(), Meta :: term(), Time :: non_neg_integer()},
    {TablePid :: pid(), TableMeta :: term(), TableTime :: non_neg_integer(), TableMRef :: reference()},
    #state{}
) -> any().
resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, #state{
    table_by_name = TableByName,
    table_by_pid = TableByPid
} = State) ->
    %% call conflict resolution
    PidToKeep = syn_event_handler:do_resolve_registry_conflict(
        Scope,
        Name,
        {Pid, Meta, Time},
        {TablePid, TableMeta, TableTime}
    ),
    %% resolve
    case PidToKeep of
        Pid ->
            %% -> we keep the remote pid
            error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p vs ~p -> keeping remote: ~p",
                [node(), Name, Scope, Pid, TablePid, Pid]
            ),
            %% update locally; the incoming sync_register will overwrite the entry with the time set by the remote node
            update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
            %% kill
            exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
            %% callbacks
            syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
            syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});

        TablePid ->
            %% -> we keep the local pid
            error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p vs ~p -> keeping local: ~p",
                [node(), Name, Scope, Pid, TablePid, TablePid]
            ),
            %% overwrite with an updated time
            ResolveTime = erlang:system_time(),
            add_to_local_table(Name, TablePid, TableMeta, ResolveTime, TableMRef, TableByName, TableByPid),
            %% broadcast to the subcluster
            syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime}, State);

        Invalid ->
            error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p vs ~p -> none chosen (got: ~p)",
                [node(), Name, Scope, Pid, TablePid, Invalid]
            ),
            %% remove
            maybe_demonitor(TablePid, TableByPid),
            remove_from_local_table(Name, TablePid, TableByName, TableByPid),
            %% kill the local process; the remote one will be killed by the other node performing the same resolve
            exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
            %% callback
            syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta)
    end.
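
%% A minimal sketch of a custom conflict resolution callback, assuming an
%% event handler module configured through syn's event handler mechanism
%% (e.g. syn:set_event_handler/1 in syn 3.x; adjust to the version in use)
%% and a resolve_registry_conflict/4 callback mirroring the arguments passed
%% to do_resolve_registry_conflict/4 above. Here, purely as an illustration,
%% the process registered first wins:
%%
%%   -module(my_syn_event_handler).
%%   -behaviour(syn_event_handler).
%%   -export([resolve_registry_conflict/4]).
%%
%%   resolve_registry_conflict(_Scope, _Name, {Pid1, _Meta1, Time1}, {Pid2, _Meta2, Time2}) ->
%%       case Time1 =< Time2 of
%%           true -> Pid1;
%%           false -> Pid2
%%       end.
%%
%% Whichever pid the callback returns is kept; the discarded process receives
%% an exit signal with reason {syn_resolve_kill, Name, Meta}, and any other
%% return value causes both conflicting processes to be killed.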