syn_registry.erl 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_registry).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([register/2, register/3]).
  31. -export([unregister/1]).
  32. -export([whereis/1, whereis/2]).
  33. -export([count/0, count/1]).
  34. %% sync API
  35. -export([sync_register/4, sync_unregister/2]).
  36. -export([sync_get_local_registry_tuples/1]).
  37. -export([remove_from_local_table/1]).
  38. %% gen_server callbacks
  39. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  40. %% records
  41. -record(state, {
  42. custom_event_handler = undefined :: module()
  43. }).
  44. %% macros
  45. -define(DEFAULT_EVENT_HANDLER_MODULE, syn_event_handler).
  46. %% includes
  47. -include("syn.hrl").
  48. %% ===================================================================
  49. %% API
  50. %% ===================================================================
  %% Starts the registry server as a linked gen_server, registered locally
  %% under the module name, with no init arguments and no start options.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  %% Registers Pid under Name without metadata: delegates to register/3 with
  %% Meta set to `undefined'.
  -spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  register(Name, Pid) ->
      NoMeta = undefined,
      register(Name, Pid, NoMeta).
  %% Registers Pid under Name with the given Meta.
  %% The request is sent as a synchronous call to the registry server running
  %% on the node that hosts Pid; the reply of that server is returned as is.
  -spec register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
  register(Name, Pid, Meta) when is_pid(Pid) ->
      OwnerServer = {?MODULE, node(Pid)},
      gen_server:call(OwnerServer, {register_on_node, Name, Pid, Meta}).
  %% Unregisters Name.
  %% Returns `{error, undefined}' when no entry for Name is found in the local
  %% table; otherwise forwards the request to the registry server on the node
  %% of the registered process and returns that server's reply.
  -spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
  unregister(Name) ->
      case find_process_entry_by_name(Name) of
          undefined ->
              {error, undefined};
          Found ->
              %% the node owning the process is the one that handles the removal
              OwnerNode = node(Found#syn_registry_table.pid),
              gen_server:call({?MODULE, OwnerNode}, {unregister_on_node, Name})
      end.
  %% Returns the pid registered under Name, or `undefined' when the local table
  %% has no entry for it.
  -spec whereis(Name :: any()) -> pid() | undefined.
  whereis(Name) ->
      Lookup = find_process_entry_by_name(Name),
      case Lookup of
          undefined ->
              undefined;
          _ ->
              Lookup#syn_registry_table.pid
      end.
  %% Returns `{Pid, Meta}' for the process registered under Name, or `undefined'
  %% when the local table has no entry for it.
  -spec whereis(Name :: any(), with_meta) -> {pid(), Meta :: any()} | undefined.
  whereis(Name, with_meta) ->
      Lookup = find_process_entry_by_name(Name),
      case Lookup of
          undefined ->
              undefined;
          _ ->
              RegisteredPid = Lookup#syn_registry_table.pid,
              RegisteredMeta = Lookup#syn_registry_table.meta,
              {RegisteredPid, RegisteredMeta}
      end.
  %% Returns the number of records mnesia reports for the registry table.
  -spec count() -> non_neg_integer().
  count() ->
      TableSize = mnesia:table_info(syn_registry_table, size),
      TableSize.
  %% Returns how many registry entries have their `node' field equal to Node.
  %% The count is taken by selecting one result per matching record and
  %% measuring the length of the returned list.
  -spec count(Node :: node()) -> non_neg_integer().
  count(Node) ->
      Pattern = #syn_registry_table{node = '$2', _ = '_'},
      OnNode = {'=:=', '$2', Node},
      MatchSpec = [{Pattern, [OnNode], ['$2']}],
      length(mnesia:dirty_select(syn_registry_table, MatchSpec)).
  %% Asynchronously tells the registry server on RemoteNode to record the
  %% Name / Pid / Meta registration (sent as a gen_server cast).
  -spec sync_register(RemoteNode :: node(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
  sync_register(RemoteNode, Name, Pid, Meta) ->
      RemoteServer = {?MODULE, RemoteNode},
      gen_server:cast(RemoteServer, {sync_register, Name, Pid, Meta}).
  %% Asynchronously tells the registry server on RemoteNode to drop the entry
  %% for Name (sent as a gen_server cast).
  -spec sync_unregister(RemoteNode :: node(), Name :: any()) -> ok.
  sync_unregister(RemoteNode, Name) ->
      RemoteServer = {?MODULE, RemoteNode},
      gen_server:cast(RemoteServer, {sync_unregister, Name}).
  %% Returns `{Name, Pid, Meta}' tuples for every registry entry whose `node'
  %% field is the node this function runs on. FromNode is only used for logging.
  -spec sync_get_local_registry_tuples(FromNode :: node()) -> list(syn_registry_tuple()).
  sync_get_local_registry_tuples(FromNode) ->
      error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node: ~p~n", [node(), FromNode]),
      Pattern = #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
      %% keep only the entries that belong to this node
      IsLocal = {'=:=', '$3', node()},
      %% the double braces make the match spec build a 3-tuple per record
      MatchSpec = [{Pattern, [IsLocal], [{{'$1', '$2', '$4'}}]}],
      mnesia:dirty_select(syn_registry_table, MatchSpec).
  111. %% ===================================================================
  112. %% Callbacks
  113. %% ===================================================================
  114. %% ----------------------------------------------------------------------------------------------------------
  115. %% Init
  116. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server init callback.
  %% Waits up to 10 seconds for the registry table, subscribes to node up/down
  %% notifications and stores the configured event handler module in the state.
  %% Stops with `{error_waiting_for_registry_table, Reason}' when the table does
  %% not become available.
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      case mnesia:wait_for_tables([syn_registry_table], 10000) of
          ok ->
              %% nodeup / nodedown messages are delivered to handle_info/2
              ok = net_kernel:monitor_nodes(true),
              Handler = application:get_env(syn, event_handler, ?DEFAULT_EVENT_HANDLER_MODULE),
              %% ensure that it is loaded (not using code:ensure_loaded/1 to support embedded mode);
              %% the outcome of the call is deliberately discarded
              catch Handler:module_info(exports),
              InitialState = #state{custom_event_handler = Handler},
              {ok, InitialState};
          WaitFailure ->
              {stop, {error_waiting_for_registry_table, WaitFailure}}
      end.
  139. %% ----------------------------------------------------------------------------------------------------------
  140. %% Call messages
  141. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server call callback.
  %%
  %% `{register_on_node, Name, Pid, Meta}': registers Pid under Name on this
  %% node and multicasts the registration. Replies `{error, not_alive}' when Pid
  %% is not alive and `{error, taken}' when Name is held by a different pid.
  %% Registering the same pid again under the same name is allowed and rewrites
  %% the entry (e.g. with new Meta).
  %%
  %% `{unregister_on_node, Name}': removes Name on this node and, on success,
  %% multicasts the removal. Errors from unregister_on_node/1 are replied as is.
  %%
  %% Any other request is logged and answered with `undefined'.
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
      case is_process_alive(Pid) of
          true ->
              case find_process_entry_by_name(Name) of
                  #syn_registry_table{pid = OtherPid} when OtherPid =/= Pid ->
                      %% name is held by a different process
                      {reply, {error, taken}, State};
                  _FreeOrSamePid ->
                      %% name is free, or already held by this very pid
                      register_on_node(Name, Pid, Meta),
                      multicast_register(Name, Pid, Meta),
                      {reply, ok, State}
              end;
          false ->
              {reply, {error, not_alive}, State}
      end;
  handle_call({unregister_on_node, Name}, _From, State) ->
      case unregister_on_node(Name) of
          ok ->
              multicast_unregister(Name),
              {reply, ok, State};
          {error, Reason} ->
              {reply, {error, Reason}, State}
      end;
  handle_call(Request, From, State) ->
      %% argument order follows the format string: node, sender, request
      error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), From, Request]),
      {reply, undefined, State}.
  186. %% ----------------------------------------------------------------------------------------------------------
  187. %% Cast messages
  188. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server cast callback.
  %%
  %% `{sync_register, Name, Pid, Meta}': writes the entry to the local table
  %% with an `undefined' monitor reference (no monitor is created here).
  %% `{sync_unregister, Name}': deletes the entry for Name from the local table.
  %% Any other message is logged and ignored.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast({sync_register, Name, Pid, Meta}, State) ->
      NoMonitor = undefined,
      add_to_local_table(Name, Pid, Meta, NoMonitor),
      {noreply, State};
  handle_cast({sync_unregister, Name}, State) ->
      remove_from_local_table(Name),
      {noreply, State};
  handle_cast(Unknown, State) ->
      error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Unknown]),
      {noreply, State}.
  206. %% ----------------------------------------------------------------------------------------------------------
  207. %% All non Call / Cast messages
  208. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server info callback.
  %%
  %% `DOWN' from a monitored process: for every name the pid is registered
  %% under, runs the exit handling, removes the entry locally and multicasts the
  %% removal. A DOWN for a pid with no entries is only logged.
  %%
  %% `nodeup': inside a global transaction, fetches the remote node's own
  %% registry tuples via rpc and merges them into the local table. A failed rpc
  %% (`{badrpc, Reason}') is logged and the merge is skipped, instead of
  %% crashing the registry server on `length/1' of a non-list.
  %%
  %% `nodedown': purges all local entries that belong to the departed node.
  %%
  %% Any other message is logged and ignored.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
      case find_processes_entry_by_pid(Pid) of
          [] ->
              %% log
              handle_process_exit(undefined, Pid, undefined, Reason, State);
          Entries ->
              lists:foreach(fun(Entry) ->
                  %% entries come from an index read on Pid, so only name and meta are needed
                  Name = Entry#syn_registry_table.name,
                  Meta = Entry#syn_registry_table.meta,
                  %% log
                  handle_process_exit(Name, Pid, Meta, Reason, State),
                  %% remove from table
                  remove_from_local_table(Name),
                  %% multicast
                  multicast_unregister(Name)
              end, Entries)
      end,
      {noreply, State};
  handle_info({nodeup, RemoteNode}, State) ->
      error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
      global:trans({{?MODULE, auto_merge_registry}, self()},
          fun() ->
              error_logger:warning_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
              %% get registry tuples from remote node
              case rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]) of
                  RegistryTuples when is_list(RegistryTuples) ->
                      error_logger:warning_msg(
                          "Syn(~p): Received ~p registry entrie(s) from remote node ~p, writing to local~n",
                          [node(), length(RegistryTuples), RemoteNode]
                      ),
                      sync_registry_tuples(RemoteNode, RegistryTuples, State),
                      error_logger:warning_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode]);
                  {badrpc, RpcReason} ->
                      %% remote node unreachable or remote call failed: nothing to merge
                      error_logger:error_msg(
                          "Syn(~p): REGISTRY AUTOMERGE <---- Failed for remote node ~p with reason: ~p~n",
                          [node(), RemoteNode, RpcReason]
                      )
              end
          end
      ),
      {noreply, State};
  handle_info({nodedown, RemoteNode}, State) ->
      error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing registry entries on local~n", [node(), RemoteNode]),
      purge_registry_entries_for_remote_node(RemoteNode),
      {noreply, State};
  handle_info(Info, State) ->
      error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
      {noreply, State}.
  259. %% ----------------------------------------------------------------------------------------------------------
  260. %% Terminate
  261. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server terminate callback: logs the termination reason and returns the
  %% atom `terminated'. The state is not inspected.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(Reason, _State) ->
      LogArgs = [node(), Reason],
      error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", LogArgs),
      terminated.
  266. %% ----------------------------------------------------------------------------------------------------------
  267. %% Convert process state when code is changed.
  268. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server code_change callback: the state is carried over unchanged.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_PreviousVersion, CurrentState, _Extra) ->
      {ok, CurrentState}.
  272. %% ===================================================================
  273. %% Internal
  274. %% ===================================================================
  %% Announces a registration to all connected nodes.
  %% Spawns a linked helper process that sends a sync_register cast to every
  %% node in `nodes()', so the caller is not held up by the fan-out.
  %% Returns the pid of the helper process (the value of spawn_link/1); the
  %% spec previously claimed `ok', which did not match the actual return.
  -spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any()) -> pid().
  multicast_register(Name, Pid, Meta) ->
      spawn_link(fun() ->
          lists:foreach(fun(RemoteNode) ->
              sync_register(RemoteNode, Name, Pid, Meta)
          end, nodes())
      end).
  %% Announces the removal of Name to all connected nodes.
  %% Spawns a linked helper process that sends a sync_unregister cast to every
  %% node in `nodes()', so the caller is not held up by the fan-out.
  %% Returns the pid of the helper process (the value of spawn_link/1); the
  %% spec previously claimed `ok', which did not match the actual return.
  -spec multicast_unregister(Name :: any()) -> pid().
  multicast_unregister(Name) ->
      spawn_link(fun() ->
          lists:foreach(fun(RemoteNode) ->
              sync_unregister(RemoteNode, Name)
          end, nodes())
      end).
  %% Writes the Name / Pid / Meta entry to the local table together with a
  %% monitor reference for Pid.
  %% When Pid already has at least one entry, the monitor reference stored in
  %% the first entry found is reused; otherwise a new monitor is created.
  -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
  register_on_node(Name, Pid, Meta) ->
      Ref = case find_processes_entry_by_pid(Pid) of
          [Known | _] ->
              %% pid is already tracked under another name: share its reference
              Known#syn_registry_table.monitor_ref;
          [] ->
              erlang:monitor(process, Pid)
      end,
      add_to_local_table(Name, Pid, Meta, Ref).
  %% Removes the entry for Name from the local table.
  %% Returns `{error, undefined}' when Name has no entry, `ok' otherwise.
  %%
  %% Monitor handling:
  %%  - register_on_node/3 shares one monitor reference between all names of
  %%    the same pid, so the monitor is released only once the pid has no
  %%    entries left; otherwise the remaining names would never be cleaned up
  %%    when the process exits.
  %%  - an entry stored without a monitor reference (as written by the
  %%    sync_register cast) is simply deleted instead of raising `case_clause'.
  %%  - the `flush' option discards a DOWN message that may already be queued,
  %%    so it is not later reported as coming from an unmonitored process.
  -spec unregister_on_node(Name :: any()) -> ok | {error, Reason :: any()}.
  unregister_on_node(Name) ->
      case find_process_entry_by_name(Name) of
          undefined ->
              {error, undefined};
          #syn_registry_table{pid = Pid, monitor_ref = MonitorRef} ->
              %% remove first, so the lookup below only sees the pid's other names
              ok = remove_from_local_table(Name),
              case find_processes_entry_by_pid(Pid) of
                  [] when is_reference(MonitorRef) ->
                      true = erlang:demonitor(MonitorRef, [flush]),
                      ok;
                  _ ->
                      ok
              end
      end.
  %% Writes (dirty) a registry record for Name to the table. The `node' field is
  %% derived from Pid; MonitorRef may be `undefined'.
  -spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
  add_to_local_table(Name, Pid, Meta, MonitorRef) ->
      Record = #syn_registry_table{
          name = Name,
          pid = Pid,
          node = node(Pid),
          meta = Meta,
          monitor_ref = MonitorRef
      },
      mnesia:dirty_write(Record).
  %% Deletes (dirty) the registry record keyed by Name.
  -spec remove_from_local_table(Name :: any()) -> ok.
  remove_from_local_table(Name) ->
      mnesia:dirty_delete({syn_registry_table, Name}).
  %% Returns all registry records whose `pid' field is Pid, using a dirty index
  %% read on that field. The list is empty when Pid has no entries.
  -spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: list(#syn_registry_table{}).
  find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
      PidFieldPos = #syn_registry_table.pid,
      mnesia:dirty_index_read(syn_registry_table, Pid, PidFieldPos).
  %% Returns the registry record stored under Name, or `undefined' when the
  %% dirty read does not yield exactly one record.
  -spec find_process_entry_by_name(Name :: any()) -> Entry :: #syn_registry_table{} | undefined.
  find_process_entry_by_name(Name) ->
      case mnesia:dirty_read({syn_registry_table, Name}) of
          [Single] ->
              Single;
          _ ->
              undefined
      end.
  %% Reacts to the exit of a process.
  %%
  %% With Name `undefined' (the pid had no registry entry) a warning is logged.
  %% Otherwise, when the configured handler module exports on_process_exit/4,
  %% the exit is passed to syn_event_handler:do_on_process_exit/5; when it does
  %% not, the exit is logged as an error unless Reason is `normal', `shutdown',
  %% `{shutdown, _}' or `killed'.
  -spec handle_process_exit(Name :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
  handle_process_exit(undefined, Pid, _Meta, Reason, #state{}) ->
      error_logger:warning_msg(
          "Syn(~p): Received a DOWN message from an unmonitored process ~p with reason: ~p~n",
          [node(), Pid, Reason]
      );
  handle_process_exit(Name, Pid, Meta, Reason, #state{custom_event_handler = Handler}) ->
      case erlang:function_exported(Handler, on_process_exit, 4) of
          true ->
              syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, Handler);
          false ->
              case Reason of
                  _ when Reason =:= normal; Reason =:= shutdown; Reason =:= killed ->
                      ok;
                  {shutdown, _} ->
                      ok;
                  _ ->
                      error_logger:error_msg(
                          "Syn(~p): Process with name ~p and pid ~p exited with reason: ~p~n",
                          [node(), Name, Pid, Reason]
                      )
              end
      end.
  %% Merges the `{Name, Pid, Meta}' tuples received from RemoteNode into the
  %% local table.
  %%
  %% Local entries belonging to RemoteNode are purged first. Then, per tuple:
  %%  - name not present locally: registered through register_on_node/3;
  %%  - name present locally: the conflict is handed to
  %%    syn_event_handler:do_resolve_registry_conflict/4 and the returned pid
  %%    decides the outcome: keeping the local pid removes the name from the
  %%    remote node's table and kills the remote pid; keeping the remote pid
  %%    rewrites the local entry (without monitor reference) and kills the local
  %%    pid; any other value is logged as an error and nothing is changed.
  -spec sync_registry_tuples(RemoteNode :: node(), RegistryTuples :: [syn_registry_tuple()], #state{}) -> ok.
  sync_registry_tuples(RemoteNode, RegistryTuples, #state{custom_event_handler = Handler}) ->
      %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
      purge_registry_entries_for_remote_node(RemoteNode),
      lists:foreach(
          fun({Name, RemotePid, RemoteMeta}) ->
              case find_process_entry_by_name(Name) of
                  undefined ->
                      %% no conflict
                      register_on_node(Name, RemotePid, RemoteMeta);
                  Existing ->
                      LocalPid = Existing#syn_registry_table.pid,
                      LocalMeta = Existing#syn_registry_table.meta,
                      error_logger:warning_msg(
                          "Syn(~p): Conflicting name process found for: ~p, processes are ~p, ~p~n",
                          [node(), Name, LocalPid, RemotePid]
                      ),
                      Winner = syn_event_handler:do_resolve_registry_conflict(
                          Name,
                          {LocalPid, LocalMeta},
                          {RemotePid, RemoteMeta},
                          Handler
                      ),
                      %% LocalPid and RemotePid are already bound: the clauses compare against them
                      case Winner of
                          LocalPid ->
                              ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
                              exit(RemotePid, kill);
                          RemotePid ->
                              remove_from_local_table(Name),
                              add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
                              exit(LocalPid, kill);
                          _ ->
                              error_logger:error_msg(
                                  "Syn(~p): Custom handler returned ~p, valid options were ~p and ~p",
                                  [node(), Winner, LocalPid, RemotePid]
                              )
                      end
              end
          end,
          RegistryTuples
      ).
  %% Deletes every local registry entry whose `node' field equals Node.
  %% Only accepts a node other than the local one.
  %% NB: no demonitoring is done, hence why this needs to run for a remote node
  -spec purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
  purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
      Pattern = #syn_registry_table{name = '$1', node = '$2', _ = '_'},
      OnNode = {'=:=', '$2', Node},
      %% the select yields the names (table keys) of the matching entries
      Names = mnesia:dirty_select(syn_registry_table, [{Pattern, [OnNode], ['$1']}]),
      lists:foreach(
          fun(Name) -> mnesia:dirty_delete(syn_registry_table, Name) end,
          Names
      ).