syn_registry.erl 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_registry).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/1]).
  30. -export([get_subcluster_nodes/1]).
  31. -export([lookup/1, lookup/2]).
  32. -export([register/2, register/3, register/4]).
  33. -export([unregister/1, unregister/2]).
  34. -export([count/1, count/2]).
  35. %% gen_server callbacks
  36. -export([
  37. init/1,
  38. handle_call/3,
  39. handle_cast/2,
  40. handle_info/2,
  41. handle_continue/2,
  42. terminate/2,
  43. code_change/3
  44. ]).
  45. %% tests
  46. -ifdef(TEST).
  47. -export([add_to_local_table/6, remove_from_local_table/3]).
  48. -endif.
  49. %% records
  %% State of one registry process (one process per scope).
  %% * scope: the scope this process serves (set from the init arguments).
  %% * process_name: the locally registered name of this process; the same name
  %%   is used to address the registry process on other nodes.
  %% * nodes: remote nodes known to this process, mapped to the pid of the
  %%   registry process on that node.
  50. -record(state, {
  51. scope = default :: atom(),
  52. process_name = syn_registry_default :: atom(),
  53. nodes = #{} :: #{node() => pid()}
  54. }).
  55. %% includes
  56. -include("syn.hrl").
  57. %% ===================================================================
  58. %% API
  59. %% ===================================================================
  %% Starts the registry process for `Scope` and links it to the caller.
  %% The process is registered locally under the name derived from the scope
  %% and receives `[Scope, Name]` as its init arguments.
  -spec start_link(Scope :: atom()) ->
      {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
  start_link(Scope) when is_atom(Scope) ->
      RegisteredName = get_process_name_for_scope(Scope),
      gen_server:start_link({local, RegisteredName}, ?MODULE, [Scope, RegisteredName], []).
  %% Returns the remote nodes known to the local registry process of `Scope`.
  %% The reply is the `nodes` field of the server state as-is: a map from each
  %% remote node to the pid of the registry process running on it (not a list).
  %% Exits if no registry process is running locally for `Scope`.
  -spec get_subcluster_nodes(Scope :: atom()) -> #{node() => pid()}.
  get_subcluster_nodes(Scope) ->
      ProcessName = get_process_name_for_scope(Scope),
      gen_server:call(ProcessName, get_subcluster_nodes).
  %% Looks up `Name` in the `default` scope.
  -spec lookup(Name :: any()) -> {pid(), Meta :: any()} | undefined.
  lookup(Name) ->
      lookup(default, Name).

  %% Looks up `Name` in `Scope`.
  %% Returns `{Pid, Meta}` taken from the registry entry, or `undefined` when
  %% there is no entry. A `badarg` raised while reading the registry table is
  %% re-raised as `error({invalid_scope, Scope})`.
  -spec lookup(Scope :: atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
  lookup(Scope, Name) ->
      try find_registry_entry_by_name(Scope, Name) of
          {{Name, RegisteredPid}, RegisteredMeta, _Time, _MRef, _Node} ->
              {RegisteredPid, RegisteredMeta};
          undefined ->
              undefined
      catch
          error:badarg ->
              error({invalid_scope, Scope})
      end.
  %% Registers `Pid` under `Name` in the `default` scope with `undefined` meta.
  -spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  register(Name, Pid) ->
      register(default, Name, Pid, undefined).

  %% Three-argument form; the meaning of the arguments depends on which one is a pid:
  %% * second argument is a pid -> `(Name, Pid, Meta)` in the `default` scope;
  %% * otherwise, third argument is a pid -> `(Scope, Name, Pid)` with `undefined` meta.
  %% When both are pids the first reading wins.
  -spec register(NameOrScope :: any(), PidOrName :: any(), MetaOrPid :: any()) -> ok | {error, Reason :: any()}.
  register(Name, Pid, Meta) when is_pid(Pid) ->
      register(default, Name, Pid, Meta);
  register(Scope, Name, Pid) when is_pid(Pid) ->
      register(Scope, Name, Pid, undefined).
  %% Registers `Pid` under `Name` in `Scope` with metadata `Meta`.
  %% The request is sent to the registry process on the node that runs `Pid`.
  %% When that node is not the calling node and the registration succeeded, the
  %% caller's local table is updated right away and the registration callback is
  %% invoked here as well. Any other reply is a `{Result, _}` pair whose first
  %% element is returned.
  %% Raises `error({invalid_scope, Scope})` when the call exits with `noproc`.
  -spec register(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
  register(Scope, Name, Pid, Meta) ->
      OwnerProcess = get_process_name_for_scope(Scope),
      OwnerNode = node(Pid),
      Request = {register_on_owner, node(), Name, Pid, Meta},
      try gen_server:call({OwnerProcess, OwnerNode}, Request) of
          {ok, {PreviousPid, PreviousMeta, Time}} when OwnerNode =/= node() ->
              %% mirror the registration locally so that subsequent local calls see it
              add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
              syn_event_handler:do_on_process_registered(Scope, Name, {PreviousPid, PreviousMeta}, {Pid, Meta}),
              ok;
          {Result, _Details} ->
              Result
      catch
          exit:{noproc, {gen_server, call, _}} ->
              error({invalid_scope, Scope})
      end.
  %% Unregisters `Name` from the `default` scope.
  -spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
  unregister(Name) ->
      unregister(default, Name).

  %% Unregisters `Name` from `Scope`.
  %% The name is first resolved in the local table to find the pid that owns it;
  %% `{error, undefined}` is returned when there is no such entry. Otherwise the
  %% request is sent to the registry process on the pid's node.
  %% Raises `error({invalid_scope, Scope})` when reading the table fails with
  %% `badarg` or when the owner's registry process does not exist.
  -spec unregister(Scope :: atom(), Name :: any()) -> ok | {error, Reason :: any()}.
  unregister(Scope, Name) ->
      try find_registry_entry_by_name(Scope, Name) of
          undefined ->
              {error, undefined};
          {{Name, Pid}, Meta, _Time, _MRef, _Node} ->
              call_unregister_on_owner(Scope, Name, Pid, Meta)
      catch
          error:badarg ->
              error({invalid_scope, Scope})
      end.

  %% Asks the registry process on the node of `Pid` to unregister `Name`.
  %% The call lives in its own `try` because the `of` section of the lookup
  %% above is not protected by that expression's `catch`; without this a missing
  %% scope process would exit the caller instead of raising `invalid_scope`.
  -spec call_unregister_on_owner(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) ->
      ok | {error, Reason :: any()}.
  call_unregister_on_owner(Scope, Name, Pid, Meta) ->
      ProcessName = get_process_name_for_scope(Scope),
      OwnerNode = node(Pid),
      try gen_server:call({ProcessName, OwnerNode}, {unregister_on_owner, node(), Name, Pid}) of
          ok when OwnerNode =/= node() ->
              %% remove from the caller's table immediately so that subsequent calls see the change
              remove_from_local_table(Scope, Name, Pid),
              syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
              ok;
          Response ->
              Response
      catch
          exit:{noproc, {gen_server, call, _}} ->
              error({invalid_scope, Scope})
      end.
  %% Returns the number of entries in the by-name registry table of `Scope`.
  %% Raises `error({invalid_scope, Scope})` when the table does not exist.
  -spec count(Scope :: atom()) -> non_neg_integer().
  count(Scope) ->
      case ets:info(syn_backbone:get_table_name(syn_registry_by_name, Scope), size) of
          undefined -> error({invalid_scope, Scope});
          Value -> Value
      end.

  %% Returns the number of entries of `Scope` whose node field equals `Node`.
  %% Raises `error({invalid_scope, Scope})` when the table cannot be read.
  %% Uses `try` rather than `case catch` so that no `{'EXIT', _}` tuple can ever
  %% be returned in place of a count.
  -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
  count(Scope, Node) ->
      MatchSpec = [{{{'_', '_'}, '_', '_', '_', Node}, [], [true]}],
      try
          ets:select_count(syn_backbone:get_table_name(syn_registry_by_name, Scope), MatchSpec)
      catch
          error:badarg ->
              error({invalid_scope, Scope})
      end.
  149. %% ===================================================================
  150. %% Callbacks
  151. %% ===================================================================
  152. %% ----------------------------------------------------------------------------------------------------------
  153. %% Init
  154. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server init callback.
  %% Subscribes to node up/down notifications, re-creates the monitors for the
  %% entries of this node already present in the table (relevant after a crash),
  %% and schedules the `after_init` continuation.
  -spec init(Args :: term()) ->
      {ok, State :: term()} | {ok, State :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term()} | ignore.
  init([Scope, ProcessName]) ->
      ok = net_kernel:monitor_nodes(true),
      rebuild_monitors(Scope),
      InitialState = #state{scope = Scope, process_name = ProcessName},
      {ok, InitialState, {continue, after_init}}.
  170. %% ----------------------------------------------------------------------------------------------------------
  171. %% Call messages
  172. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server call callback. Handles:
  %% * `get_subcluster_nodes`: replies with the `nodes` map of the state.
  %% * `{register_on_owner, RequesterNode, Name, Pid, Meta}`: registers a process
  %%   running on this node. Replies `{ok, {PreviousPid, PreviousMeta, Time}}` on
  %%   success, `{{error, taken}, undefined}` when the name belongs to another
  %%   pid, `{{error, not_alive}, undefined}` when `Pid` is not alive.
  %% * `{unregister_on_owner, RequesterNode, Name, Pid}`: removes the entry when
  %%   it is held by `Pid`; replies `ok`, `{error, race_condition}` or
  %%   `{error, undefined}`.
  %% In both registry operations the requester node is left out of the broadcast
  %% because the API functions update the requester's table and run its callback
  %% themselves once the call returns.
  %% Any other request is logged and answered with `undefined`.
  -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
      State :: term()) ->
      {reply, Reply :: term(), NewState :: term()} |
      {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
      {stop, Reason :: term(), NewState :: term()}.
  handle_call(get_subcluster_nodes, _From, #state{
      nodes = Nodes
  } = State) ->
      {reply, Nodes, State};

  handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
      scope = Scope
  } = State) ->
      case is_process_alive(Pid) of
          true ->
              case find_registry_entry_by_name(Scope, Name) of
                  undefined ->
                      %% available: reuse the existing monitor if the pid is already registered
                      %% under another name, otherwise start monitoring it
                      MRef = case find_monitor_for_pid(Scope, Pid) of
                          undefined -> erlang:monitor(process, Pid);
                          MRef0 -> MRef0
                      end,
                      %% add to local table
                      Time = erlang:system_time(),
                      add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
                      %% callback
                      syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta}),
                      %% broadcast
                      broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, [RequesterNode], State),
                      %% return
                      {reply, {ok, {undefined, undefined, Time}}, State};

                  {{Name, Pid}, TableMeta, _TableTime, MRef, _TableNode} ->
                      %% same pid, possibly new meta or time, overwrite
                      Time = erlang:system_time(),
                      add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
                      %% callback
                      syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta}),
                      %% broadcast; skip the requester, otherwise it would process this
                      %% registration (and run its callback) twice
                      broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, [RequesterNode], State),
                      %% return
                      {reply, {ok, {Pid, TableMeta, Time}}, State};

                  _ ->
                      {reply, {{error, taken}, undefined}, State}
              end;

          false ->
              {reply, {{error, not_alive}, undefined}, State}
      end;

  handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{scope = Scope} = State) ->
      case find_registry_entry_by_name(Scope, Name) of
          {{Name, Pid}, Meta, _Time, _MRef, _Node} ->
              %% demonitor if the process is not registered under other names
              maybe_demonitor(Scope, Pid),
              %% remove from table
              remove_from_local_table(Scope, Name, Pid),
              %% callback
              syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
              %% broadcast
              broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
              %% return
              {reply, ok, State};

          {{Name, _TablePid}, _Meta, _Time, _MRef, _Node} ->
              %% the name is held locally by another pid: race condition, wait for sync & return error
              {reply, {error, race_condition}, State};

          undefined ->
              {reply, {error, undefined}, State}
      end;

  handle_call(Request, From, State) ->
      %% argument order follows the format string: sender first, then the message
      error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), From, Request]),
      {reply, undefined, State}.
  244. %% ----------------------------------------------------------------------------------------------------------
  245. %% Cast messages
  246. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server cast callback. No casts are expected: the message is logged as a
  %% warning and the state is left untouched.
  -spec handle_cast(Request :: term(), State :: term()) ->
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), NewState :: term()}.
  handle_cast(Unexpected, State) ->
      LogArgs = [node(), Unexpected],
      error_logger:warning_msg("SYN[~s] Received an unknown cast message: ~p", LogArgs),
      {noreply, State}.
  254. %% ----------------------------------------------------------------------------------------------------------
  255. %% Info messages
  256. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server info callback. Handles:
  %% * `sync_register` / `sync_unregister` messages from other registry processes;
  %% * `discover` and `ack_sync`, the handshake through which registry processes
  %%   learn about each other and exchange the entries of their own node;
  %% * `'DOWN'` from a remote registry process (its node's entries are purged)
  %%   and from a local registered process (its entries are removed and the
  %%   removal is broadcast);
  %% * `nodeup` (a discover is sent to the new node) and `nodedown` (ignored,
  %%   the monitor `'DOWN'` message is handled instead).
  %% Anything else is logged as a warning.
  -spec handle_info(Info :: timeout | term(), State :: term()) ->
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), NewState :: term()}.
  handle_info({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State) ->
      handle_registry_sync(Scope, Name, Pid, Meta, Time, State),
      {noreply, State};

  handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{scope = Scope} = State) ->
      remove_from_local_table(Scope, Name, Pid),
      syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
      {noreply, State};

  handle_info({'3.0', discover, RemoteScopePid}, #state{scope = Scope, nodes = Nodes} = State) ->
      RemoteScopeNode = node(RemoteScopePid),
      error_logger:info_msg("SYN[~s] Received DISCOVER request from node '~s' and scope '~s'", [node(), RemoteScopeNode, Scope]),
      %% always answer with the entries owned by this node
      LocalTuples = get_registry_tuples_for_node(Scope, node()),
      send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalTuples}, State),
      case Nodes of
          #{RemoteScopeNode := _KnownPid} ->
              %% already known, nothing to track
              {noreply, State};
          _ ->
              %% new node: monitor its registry process and remember it
              _MRef = monitor(process, RemoteScopePid),
              {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
      end;

  handle_info({'3.0', ack_sync, RemoteScopePid, RemoteTuples}, #state{scope = Scope, nodes = Nodes} = State) ->
      RemoteScopeNode = node(RemoteScopePid),
      error_logger:info_msg("SYN[~s] Received ACK SYNC (~w entries) from node '~s' and scope '~s'",
          [node(), length(RemoteTuples), RemoteScopeNode, Scope]
      ),
      %% merge the remote entries
      MergeEntry = fun({Name, Pid, Meta, Time}) ->
          handle_registry_sync(Scope, Name, Pid, Meta, Time, State)
      end,
      lists:foreach(MergeEntry, RemoteTuples),
      case Nodes of
          #{RemoteScopeNode := _KnownPid} ->
              %% already known, nothing to track
              {noreply, State};
          _ ->
              %% new node: monitor it, send back the local entries and remember it
              _MRef = monitor(process, RemoteScopePid),
              LocalTuples = get_registry_tuples_for_node(Scope, node()),
              send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalTuples}, State),
              {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
      end;

  handle_info({'DOWN', _MRef, process, Pid, _Reason}, #state{scope = Scope, nodes = Nodes} = State)
      when node(Pid) =/= node() ->
      %% a remote registry process went down
      RemoteNode = node(Pid),
      case maps:take(RemoteNode, Nodes) of
          {Pid, RemainingNodes} ->
              error_logger:info_msg("SYN[~s] Scope Process ~p is DOWN on node '~s'", [node(), Scope, RemoteNode]),
              purge_registry_for_remote_node(Scope, RemoteNode),
              {noreply, State#state{nodes = RemainingNodes}};
          error ->
              error_logger:warning_msg("SYN[~s] Received DOWN message from unknown pid: ~p", [node(), Pid]),
              {noreply, State}
      end;

  handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State) ->
      %% a local process went down: drop every name it was registered under
      case find_registry_entries_by_pid(Scope, Pid) of
          [] ->
              error_logger:warning_msg(
                  "SYN[~s] Received a DOWN message from an unknown process ~p with reason: ~p",
                  [node(), Pid, Reason]
              );
          Entries ->
              DropEntry = fun({{Name, _EntryPid}, Meta, _, _, _}) ->
                  remove_from_local_table(Scope, Name, Pid),
                  syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
                  broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State)
              end,
              lists:foreach(DropEntry, Entries)
      end,
      {noreply, State};

  handle_info({nodedown, _Node}, State) ->
      %% ignore & wait for the monitor DOWN message
      {noreply, State};

  handle_info({nodeup, RemoteNode}, #state{scope = Scope} = State) ->
      error_logger:info_msg("SYN[~s] Node '~s' has joined the cluster, sending discover message for scope '~s'",
          [node(), RemoteNode, Scope]
      ),
      send_to_node(RemoteNode, {'3.0', discover, self()}, State),
      {noreply, State};

  handle_info(Unexpected, State) ->
      error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [node(), Unexpected]),
      {noreply, State}.
  361. %% ----------------------------------------------------------------------------------------------------------
  362. %% Continue messages
  363. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server continue callback.
  %% `after_init` announces this registry process to every connected node by
  %% sending them a `discover` message carrying its own pid.
  -spec handle_continue(Info :: term(), State :: term()) ->
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), NewState :: term()}.
  handle_continue(after_init, #state{scope = Scope} = State) ->
      error_logger:info_msg("SYN[~s] Discovering the cluster with scope '~s'", [node(), Scope]),
      Discover = {'3.0', discover, self()},
      broadcast_all(Discover, State),
      {noreply, State}.
  372. %% ----------------------------------------------------------------------------------------------------------
  373. %% Terminate
  374. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server terminate callback: logs the termination reason and returns
  %% the atom `terminated`.
  -spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: term()) -> term().
  terminate(Reason, _State) ->
      LogArgs = [node(), Reason],
      error_logger:info_msg("SYN[~s] Terminating with reason: ~p", LogArgs),
      terminated.
  379. %% ----------------------------------------------------------------------------------------------------------
  380. %% Convert process state when code is changed.
  381. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server code change callback: the state needs no conversion and is
  %% returned unchanged.
  -spec code_change(OldVsn :: (term() | {down, term()}), State :: term(),
      Extra :: term()) ->
      {ok, NewState :: term()} | {error, Reason :: term()}.
  code_change(_PreviousVersion, CurrentState, _Extra) ->
      {ok, CurrentState}.
  387. %% ===================================================================
  388. %% Internal
  389. %% ===================================================================
  %% Builds the registered name of the registry process for `Scope`:
  %% the module name, an underscore, then the scope name.
  -spec get_process_name_for_scope(Scope :: atom()) -> atom().
  get_process_name_for_scope(Scope) ->
      NameParts = [atom_to_binary(?MODULE), <<"_">>, atom_to_binary(Scope)],
      binary_to_atom(iolist_to_binary(NameParts)).
  %% Re-creates the monitors for the entries of the local node found in the
  %% tables of `Scope`. Each entry is removed first; it is re-inserted with a
  %% fresh monitor reference only when its process is still alive.
  -spec rebuild_monitors(Scope :: atom()) -> ok.
  rebuild_monitors(Scope) ->
      Remonitor = fun({Name, Pid, Meta, Time}) ->
          remove_from_local_table(Scope, Name, Pid),
          case is_process_alive(Pid) of
              true ->
                  FreshMRef = erlang:monitor(process, Pid),
                  add_to_local_table(Scope, Name, Pid, Meta, Time, FreshMRef);
              _ ->
                  ok
          end
      end,
      lists:foreach(Remonitor, get_registry_tuples_for_node(Scope, node())).
  %% Sends `Message` to the registry process on every known remote node.
  -spec broadcast(Message :: any(), #state{}) -> any().
  broadcast(Message, State) ->
      broadcast(Message, [], State).

  %% Sends `Message` to the registry process on every known remote node except
  %% those listed in `ExcludedNodes`. Messages are sent with `noconnect`.
  -spec broadcast(Message :: any(), ExcludedNodes :: [node()], #state{}) -> any().
  broadcast(Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}) ->
      Recipients = maps:keys(Nodes) -- ExcludedNodes,
      SendTo = fun(RemoteNode) ->
          erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
      end,
      lists:foreach(SendTo, Recipients).
  %% Sends `Message` to the process registered under this scope's process name
  %% on every node returned by `nodes()`, whether or not that node is already
  %% tracked in the state. Messages are sent with `noconnect`.
  -spec broadcast_all(Message :: any(), #state{}) -> any().
  broadcast_all(Message, #state{process_name = ProcessName}) ->
      SendTo = fun(RemoteNode) ->
          erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
      end,
      lists:foreach(SendTo, nodes()).
  %% Sends `Message` to the process registered under this scope's process name
  %% on `RemoteNode`, using the `noconnect` send option.
  -spec send_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
  send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
      Destination = {ProcessName, RemoteNode},
      erlang:send(Destination, Message, [noconnect]).
  %% Returns `{Name, Pid, Meta, Time}` for every entry of the by-name table of
  %% `Scope` whose node field equals `Node`.
  -spec get_registry_tuples_for_node(Scope :: atom(), Node :: node()) -> [syn_registry_tuple()].
  get_registry_tuples_for_node(Scope, Node) ->
      Table = syn_backbone:get_table_name(syn_registry_by_name, Scope),
      EntryPattern = {{'$1', '$2'}, '$3', '$4', '_', Node},
      ResultTuple = {{'$1', '$2', '$3', '$4'}},
      ets:select(Table, [{EntryPattern, [], [ResultTuple]}]).
  %% Returns the full entry stored under `Name` in the by-name table of `Scope`,
  %% or `undefined` when there is none. More than one match is not expected and
  %% makes the function crash.
  -spec find_registry_entry_by_name(Scope :: atom(), Name :: any()) -> Entry :: syn_registry_entry() | undefined.
  find_registry_entry_by_name(Scope, Name) ->
      Table = syn_backbone:get_table_name(syn_registry_by_name, Scope),
      MatchSpec = [{{{Name, '_'}, '_', '_', '_', '_'}, [], ['$_']}],
      case ets:select(Table, MatchSpec) of
          [] -> undefined;
          [Entry] -> Entry
      end.
  %% Returns every entry registered for `Pid` in `Scope`. Entries are read from
  %% the by-pid table and returned with the key rearranged to `{Name, Pid}`,
  %% i.e. in the same shape as the entries of the by-name table.
  -spec find_registry_entries_by_pid(Scope :: atom(), Pid :: pid()) -> RegistryEntries :: [syn_registry_entry()].
  find_registry_entries_by_pid(Scope, Pid) when is_pid(Pid) ->
      Table = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
      EntryPattern = {{Pid, '$2'}, '$3', '$4', '$5', '$6'},
      ByNameShape = {{{{'$2', Pid}}, '$3', '$4', '$5', '$6'}},
      ets:select(Table, [{EntryPattern, [], [ByNameShape]}]).
  %% Returns the monitor field of one entry registered for `Pid` in `Scope`
  %% (at most one entry is requested from the table), or `undefined` when
  %% `Pid` has no entries.
  -spec find_monitor_for_pid(Scope :: atom(), Pid :: pid()) -> reference() | undefined.
  find_monitor_for_pid(Scope, Pid) when is_pid(Pid) ->
      Table = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
      MatchSpec = [{{{Pid, '_'}, '_', '_', '$5', '_'}, [], ['$5']}],
      case ets:select(Table, MatchSpec, 1) of
          '$end_of_table' -> undefined;
          {[MRef], _Continuation} -> MRef
      end.
  %% Stops monitoring `Pid` when it is registered under a single name only.
  %% Up to two entries are requested for the pid: getting back exactly one whose
  %% monitor field is a reference means no other name refers to the process, so
  %% the monitor is removed and any pending `'DOWN'` message is flushed.
  %% In every other case nothing is done.
  -spec maybe_demonitor(Scope :: atom(), Pid :: pid()) -> ok.
  maybe_demonitor(Scope, Pid) ->
      Table = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
      MatchSpec = [{{{Pid, '_'}, '_', '_', '$5', '_'}, [], ['$5']}],
      case ets:select(Table, MatchSpec, 2) of
          {[OnlyMRef], _Continuation} when is_reference(OnlyMRef) ->
              erlang:demonitor(OnlyMRef, [flush]),
              ok;
          _ ->
              ok
      end.
  %% Inserts a registry entry into both tables of `Scope`: keyed by `{Name, Pid}`
  %% in the by-name table and by `{Pid, Name}` in the by-pid table. The stored
  %% fields are the meta, the time, the monitor reference (or `undefined`) and
  %% the node of `Pid`.
  -spec add_to_local_table(
      Scope :: atom(),
      Name :: any(),
      Pid :: pid(),
      Meta :: any(),
      Time :: integer(),
      MRef :: undefined | reference()
  ) -> true.
  add_to_local_table(Scope, Name, Pid, Meta, Time, MRef) ->
      ByNameTable = syn_backbone:get_table_name(syn_registry_by_name, Scope),
      true = ets:insert(ByNameTable, {{Name, Pid}, Meta, Time, MRef, node(Pid)}),
      ByPidTable = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
      true = ets:insert(ByPidTable, {{Pid, Name}, Meta, Time, MRef, node(Pid)}).
  %% Deletes the entry of `Name`/`Pid` from both tables of `Scope`
  %% (key `{Name, Pid}` in the by-name table, `{Pid, Name}` in the by-pid table).
  -spec remove_from_local_table(Scope :: atom(), Name :: any(), Pid :: pid()) -> true.
  remove_from_local_table(Scope, Name, Pid) ->
      ByNameTable = syn_backbone:get_table_name(syn_registry_by_name, Scope),
      true = ets:delete(ByNameTable, {Name, Pid}),
      ByPidTable = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
      true = ets:delete(ByPidTable, {Pid, Name}).
  %% Replaces the entry held by `PreviousPid` for `Name` with a new entry:
  %% the previous pid is demonitored if this was its only name, its entry is
  %% removed, and the new `{Pid, Meta, Time, MRef}` entry is inserted.
  -spec update_local_table(
      Scope :: atom(),
      Name :: any(),
      PreviousPid :: pid(),
      {
          Pid :: pid(),
          Meta :: any(),
          Time :: integer(),
          MRef :: undefined | reference()
      }
  ) -> true.
  update_local_table(Scope, Name, PreviousPid, {NewPid, NewMeta, NewTime, NewMRef}) ->
      maybe_demonitor(Scope, PreviousPid),
      remove_from_local_table(Scope, Name, PreviousPid),
      add_to_local_table(Scope, Name, NewPid, NewMeta, NewTime, NewMRef).
  %% Removes every entry that belongs to the remote node `Node` from both tables
  %% of `Scope`. The entries are collected beforehand and the unregistration
  %% callback is invoked for each of them from a separately spawned process, so
  %% that the registry process is not kept busy by the callbacks.
  -spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom()) -> true.
  purge_registry_for_remote_node(Scope, Node) when Node =/= node() ->
      PurgedTuples = get_registry_tuples_for_node(Scope, Node),
      NotifyAll = fun() ->
          lists:foreach(fun({Name, Pid, Meta, _Time}) ->
              syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta)
          end, PurgedTuples)
      end,
      spawn(NotifyAll),
      %% drop the node's entries from both tables
      NodePattern = {{'_', '_'}, '_', '_', '_', Node},
      true = ets:match_delete(syn_backbone:get_table_name(syn_registry_by_name, Scope), NodePattern),
      true = ets:match_delete(syn_backbone:get_table_name(syn_registry_by_pid, Scope), NodePattern).
  %% Applies a registration received from another node to the local tables.
  %% Depending on what is currently stored under `Name`:
  %% * nothing: the entry is inserted;
  %% * the same pid: the entry is overwritten with the incoming meta and time;
  %% * another pid running on this node: a conflict, delegated to resolve_conflict/5;
  %% * another pid on a different node with an older time: the entry is replaced;
  %% * anything else: the incoming data is older and is ignored.
  %% The registration/unregistration callbacks are invoked for every change.
  -spec handle_registry_sync(
      Scope :: atom(),
      Name :: any(),
      Pid :: pid(),
      Meta :: any(),
      Time :: non_neg_integer(),
      #state{}
  ) -> any().
  handle_registry_sync(Scope, Name, Pid, Meta, Time, State) ->
      case find_registry_entry_by_name(Scope, Name) of
          undefined ->
              %% no conflict
              add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
              syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta});

          {{Name, Pid}, KnownMeta, _KnownTime, KnownMRef, _KnownNode} ->
              %% same pid: data coming from the same node is sequential, hence more recent
              add_to_local_table(Scope, Name, Pid, Meta, Time, KnownMRef),
              syn_event_handler:do_on_process_registered(Scope, Name, {Pid, KnownMeta}, {Pid, Meta});

          {{Name, KnownPid}, KnownMeta, KnownTime, KnownMRef, _KnownNode} when node(KnownPid) =:= node() ->
              %% this node runs a conflicting process -> resolve
              %% * the conflict is resolved by the two nodes that own the conflicting processes
              %% * when a process is chosen, the time is updated
              %% * the node that runs the kept process sends the sync_register message
              %% * recipients check that the time is more recent than what they have, to avoid race conditions
              resolve_conflict(Scope, Name, {Pid, Meta, Time}, {KnownPid, KnownMeta, KnownTime, KnownMRef}, State);

          {{Name, KnownPid}, KnownMeta, KnownTime, _KnownMRef, _KnownNode} when KnownTime < Time ->
              %% this node owns none of the conflicting processes: take the more recent one
              update_local_table(Scope, Name, KnownPid, {Pid, Meta, Time, undefined}),
              syn_event_handler:do_on_process_unregistered(Scope, Name, KnownPid, KnownMeta),
              syn_event_handler:do_on_process_registered(Scope, Name, {KnownPid, KnownMeta}, {Pid, Meta});

          {{Name, _KnownPid}, _KnownMeta, _KnownTime, _KnownMRef, _KnownNode} ->
              %% race condition: incoming data is older, ignore
              ok
      end.
  %% Resolves a naming conflict between a remote process (`Pid`) and a process
  %% running on this node (`TablePid`). The event handler is asked which pid to keep:
  %% * the remote pid: the local entry is replaced, callbacks run and the local
  %%   process is killed with reason `{syn_resolve_kill, Name, TableMeta}`;
  %% * the local pid: its entry is rewritten with a fresh time and re-broadcast;
  %% * any other answer: the local entry is removed and the local process killed.
  %% The outcome is logged in every case.
  -spec resolve_conflict(
      Scope :: atom(),
      Name :: any(),
      {Pid :: pid(), Meta :: any(), Time :: non_neg_integer()},
      {TablePid :: pid(), TableMeta :: any(), TableTime :: non_neg_integer(), TableMRef :: reference()},
      #state{}
  ) -> any().
  resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State) ->
      Remote = {Pid, Meta, Time},
      Local = {TablePid, TableMeta, TableTime},
      case syn_event_handler:do_resolve_registry_conflict(Scope, Name, Remote, Local) of
          Pid ->
              %% keep the remote pid
              %% update locally, the incoming sync_register will bring the time of the remote node
              update_local_table(Scope, Name, TablePid, {Pid, Meta, Time, undefined}),
              syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
              syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
              %% kill the local process
              exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
              error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> chosen: ~p",
                  [node(), Name, Scope, Pid, TablePid, Pid]
              );

          TablePid ->
              %% keep the local pid: overwrite with an updated time and tell the others
              ResolveTime = erlang:system_time(),
              add_to_local_table(Scope, Name, TablePid, TableMeta, ResolveTime, TableMRef),
              broadcast({'3.0', sync_register, Scope, Name, TablePid, TableMeta, ResolveTime}, State),
              error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> chosen: ~p",
                  [node(), Name, Scope, Pid, TablePid, TablePid]
              );

          Invalid ->
              %% neither pid was chosen: drop the local entry
              maybe_demonitor(Scope, TablePid),
              remove_from_local_table(Scope, Name, TablePid),
              syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
              %% kill local, remote will be killed by the other node performing the same resolve
              exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
              error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> none chosen (got: ~p)",
                  [node(), Name, Scope, Pid, TablePid, Invalid]
              )
      end.