syn_registry.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_registry).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([whereis/1, whereis/2]).
  31. -export([register/2, register/3]).
  32. -export([unregister/1]).
  33. -export([count/0, count/1]).
  34. %% sync API
  35. -export([sync_register/3, sync_unregister/1]).
  36. -export([sync_get_local_registry_tuples/1]).
  37. -export([unregister_on_node/1]).
  38. %% gen_server callbacks
  39. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  40. %% records
  41. -record(state, {}).
  42. %% includes
  43. -include("syn.hrl").
  44. %% ===================================================================
  45. %% API
  46. %% ===================================================================
  47. -spec start_link() -> {ok, pid()} | {error, any()}.
  48. start_link() ->
  49. Options = [],
  50. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  51. -spec whereis(Name :: term()) -> pid() | undefined.
  52. whereis(Name) ->
  53. case find_process_entry_by_name(Name) of
  54. undefined -> undefined;
  55. Entry -> Entry#syn_registry_table.pid
  56. end.
  57. -spec whereis(Name :: term(), with_meta) -> {pid(), Meta :: term()} | undefined.
  58. whereis(Name, with_meta) ->
  59. case find_process_entry_by_name(Name) of
  60. undefined -> undefined;
  61. Entry -> {Entry#syn_registry_table.pid, Entry#syn_registry_table.meta}
  62. end.
  63. -spec register(Name :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
  64. register(Name, Pid) ->
  65. register(Name, Pid, undefined).
  66. -spec register(Name :: term(), Pid :: pid(), Meta :: term()) -> ok | {error, Reason :: term()}.
  67. register(Name, Pid, Meta) when is_pid(Pid) ->
  68. Node = node(Pid),
  69. gen_server:call({?MODULE, Node}, {register_on_node, Name, Pid, Meta}).
  70. -spec unregister(Name :: term()) -> ok | {error, Reason :: term()}.
  71. unregister(Name) ->
  72. % get process' node
  73. case find_process_entry_by_name(Name) of
  74. undefined ->
  75. {error, undefined};
  76. Entry ->
  77. Node = node(Entry#syn_registry_table.pid),
  78. gen_server:call({?MODULE, Node}, {unregister_on_node, Name})
  79. end.
  80. -spec count() -> non_neg_integer().
  81. count() ->
  82. mnesia:table_info(syn_registry_table, size).
  83. -spec count(Node :: node()) -> non_neg_integer().
  84. count(Node) ->
  85. %% build match specs
  86. MatchHead = #syn_registry_table{node = '$2', _ = '_'},
  87. Guard = {'=:=', '$2', Node},
  88. Result = '$2',
  89. %% select
  90. Processes = mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [Result]}]),
  91. length(Processes).
  92. -spec sync_register(Name :: term(), Pid :: pid(), Meta :: term()) -> ok.
  93. sync_register(Name, Pid, Meta) ->
  94. gen_server:cast(?MODULE, {sync_register, Name, Pid, Meta}).
  95. -spec sync_unregister(Name :: term()) -> ok.
  96. sync_unregister(Name) ->
  97. gen_server:cast(?MODULE, {sync_unregister, Name}).
  98. -spec sync_get_local_registry_tuples(FromNode :: node()) -> list(syn_registry_tuple()).
  99. sync_get_local_registry_tuples(FromNode) ->
  100. error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node: ~p~n", [node(), FromNode]),
  101. %% build match specs
  102. MatchHead = #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
  103. Guard = {'=:=', '$3', node()},
  104. RegistryTupleFormat = {{'$1', '$2', '$4'}},
  105. %% select
  106. mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [RegistryTupleFormat]}]).
  107. %% ===================================================================
  108. %% Callbacks
  109. %% ===================================================================
  110. %% ----------------------------------------------------------------------------------------------------------
  111. %% Init
  112. %% ----------------------------------------------------------------------------------------------------------
  113. -spec init([]) ->
  114. {ok, #state{}} |
  115. {ok, #state{}, Timeout :: non_neg_integer()} |
  116. ignore |
  117. {stop, Reason :: any()}.
  118. init([]) ->
  119. %% wait for table
  120. case mnesia:wait_for_tables([syn_registry_table], 10000) of
  121. ok ->
  122. %% monitor nodes
  123. ok = net_kernel:monitor_nodes(true),
  124. %% init
  125. {ok, #state{}};
  126. Reason ->
  127. {stop, {error_waiting_for_registry_table, Reason}}
  128. end.
  129. %% ----------------------------------------------------------------------------------------------------------
  130. %% Call messages
  131. %% ----------------------------------------------------------------------------------------------------------
  132. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  133. {reply, Reply :: any(), #state{}} |
  134. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  135. {noreply, #state{}} |
  136. {noreply, #state{}, Timeout :: non_neg_integer()} |
  137. {stop, Reason :: any(), Reply :: any(), #state{}} |
  138. {stop, Reason :: any(), #state{}}.
  139. handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
  140. %% check if pid is alive
  141. case is_process_alive(Pid) of
  142. true ->
  143. %% check if name available
  144. case find_process_entry_by_name(Name) of
  145. undefined ->
  146. register_on_node(Name, Pid, Meta),
  147. %% multicast
  148. rpc:eval_everywhere(nodes(), ?MODULE, sync_register, [Name, Pid, Meta]),
  149. %% return
  150. {reply, ok, State};
  151. Entry when Entry#syn_registry_table.pid == Pid ->
  152. register_on_node(Name, Pid, Meta),
  153. %% multicast
  154. rpc:eval_everywhere(nodes(), ?MODULE, sync_register, [Name, Pid, Meta]),
  155. %% return
  156. {reply, ok, State};
  157. _ ->
  158. {reply, {error, taken}, State}
  159. end;
  160. _ ->
  161. {reply, {error, not_alive}, State}
  162. end;
  163. handle_call({unregister_on_node, Name}, _From, State) ->
  164. unregister_on_node(Name),
  165. %% multicast
  166. rpc:eval_everywhere(nodes(), ?MODULE, sync_unregister, [Name]),
  167. %% return
  168. {reply, ok, State};
  169. handle_call(Request, From, State) ->
  170. error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), Request, From]),
  171. {reply, undefined, State}.
  172. %% ----------------------------------------------------------------------------------------------------------
  173. %% Cast messages
  174. %% ----------------------------------------------------------------------------------------------------------
  175. -spec handle_cast(Msg :: any(), #state{}) ->
  176. {noreply, #state{}} |
  177. {noreply, #state{}, Timeout :: non_neg_integer()} |
  178. {stop, Reason :: any(), #state{}}.
  179. handle_cast({sync_register, Name, Pid, Meta}, State) ->
  180. %% add to table
  181. add_to_local_table(Name, Pid, Meta, undefined),
  182. %% return
  183. {noreply, State};
  184. handle_cast({sync_unregister, Name}, State) ->
  185. %% remove from table
  186. remove_from_local_table(Name),
  187. %% return
  188. {noreply, State};
  189. handle_cast(Msg, State) ->
  190. error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
  191. {noreply, State}.
  192. %% ----------------------------------------------------------------------------------------------------------
  193. %% All non Call / Cast messages
  194. %% ----------------------------------------------------------------------------------------------------------
  195. -spec handle_info(Info :: any(), #state{}) ->
  196. {noreply, #state{}} |
  197. {noreply, #state{}, Timeout :: non_neg_integer()} |
  198. {stop, Reason :: any(), #state{}}.
  199. handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
  200. case find_processes_entry_by_pid(Pid) of
  201. [] ->
  202. %% log
  203. log_process_exit(undefined, Pid, Reason);
  204. Entries ->
  205. lists:foreach(fun(Entry) ->
  206. %% get process info
  207. Name = Entry#syn_registry_table.name,
  208. %% log
  209. log_process_exit(Name, Pid, Reason),
  210. %% remove from table
  211. remove_from_local_table(Name),
  212. %% multicast
  213. rpc:eval_everywhere(nodes(), ?MODULE, sync_unregister, [Name])
  214. end, Entries)
  215. end,
  216. %% return
  217. {noreply, State};
  218. handle_info({nodeup, RemoteNode}, State) ->
  219. error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
  220. global:trans({{?MODULE, auto_merge_node_up}, self()},
  221. fun() ->
  222. error_logger:warning_msg("Syn(~p): AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
  223. %% get processes info from remote node
  224. RegistryTuples = rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]),
  225. error_logger:warning_msg(
  226. "Syn(~p): Received ~p registry entrie(s) from remote node ~p, writing to local~n",
  227. [node(), length(RegistryTuples), RemoteNode]
  228. ),
  229. sync_registry_tuples(RemoteNode, RegistryTuples),
  230. %% exit
  231. error_logger:warning_msg("Syn(~p): AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
  232. end
  233. ),
  234. %% resume
  235. {noreply, State};
  236. handle_info({nodedown, RemoteNode}, State) ->
  237. error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing its entries on local~n", [node(), RemoteNode]),
  238. purge_registry_entries_for_remote_node(RemoteNode),
  239. {noreply, State};
  240. handle_info(Info, State) ->
  241. error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
  242. {noreply, State}.
  243. %% ----------------------------------------------------------------------------------------------------------
  244. %% Terminate
  245. %% ----------------------------------------------------------------------------------------------------------
  246. -spec terminate(Reason :: any(), #state{}) -> terminated.
  247. terminate(Reason, _State) ->
  248. error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", [node(), Reason]),
  249. terminated.
  250. %% ----------------------------------------------------------------------------------------------------------
  251. %% Convert process state when code is changed.
  252. %% ----------------------------------------------------------------------------------------------------------
  253. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  254. code_change(_OldVsn, State, _Extra) ->
  255. {ok, State}.
  256. %% ===================================================================
  257. %% Internal
  258. %% ===================================================================
  259. -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
  260. register_on_node(Name, Pid, Meta) ->
  261. MonitorRef = case find_processes_entry_by_pid(Pid) of
  262. [] ->
  263. %% process is not monitored yet, add
  264. erlang:monitor(process, Pid);
  265. [Entry | _] ->
  266. Entry#syn_registry_table.monitor_ref
  267. end,
  268. %% add to table
  269. add_to_local_table(Name, Pid, Meta, MonitorRef).
  270. -spec unregister_on_node(Name :: any()) -> ok.
  271. unregister_on_node(Name) ->
  272. case find_process_entry_by_name(Name) of
  273. undefined ->
  274. {error, undefined};
  275. Entry when Entry#syn_registry_table.monitor_ref =/= undefined ->
  276. %% demonitor
  277. erlang:demonitor(Entry#syn_registry_table.monitor_ref),
  278. %% remove from table
  279. remove_from_local_table(Name)
  280. end.
  281. -spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
  282. add_to_local_table(Name, Pid, Meta, MonitorRef) ->
  283. mnesia:dirty_write(#syn_registry_table{
  284. name = Name,
  285. pid = Pid,
  286. node = node(Pid),
  287. meta = Meta,
  288. monitor_ref = MonitorRef
  289. }).
  290. -spec remove_from_local_table(Name :: any()) -> ok.
  291. remove_from_local_table(Name) ->
  292. mnesia:dirty_delete(syn_registry_table, Name).
  293. -spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: list(#syn_registry_table{}).
  294. find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
  295. mnesia:dirty_index_read(syn_registry_table, Pid, #syn_registry_table.pid).
  296. -spec find_process_entry_by_name(Name :: term()) -> Entry :: #syn_registry_table{} | undefined.
  297. find_process_entry_by_name(Name) ->
  298. case mnesia:dirty_read(syn_registry_table, Name) of
  299. [Entry] -> Entry;
  300. _ -> undefined
  301. end.
  302. -spec log_process_exit(Name :: term(), Pid :: pid(), Reason :: term()) -> ok.
  303. log_process_exit(Name, Pid, Reason) ->
  304. case Reason of
  305. normal -> ok;
  306. shutdown -> ok;
  307. {shutdown, _} -> ok;
  308. killed -> ok;
  309. _ ->
  310. case Name of
  311. undefined ->
  312. error_logger:error_msg(
  313. "Syn(~p): Received a DOWN message from an unmonitored process ~p with reason: ~p~n",
  314. [node(), Pid, Reason]
  315. );
  316. _ ->
  317. error_logger:error_msg(
  318. "Syn(~p): Process with name ~p and pid ~p exited with reason: ~p~n",
  319. [node(), Name, Pid, Reason]
  320. )
  321. end
  322. end.
  323. -spec sync_registry_tuples(RemoteNode :: node(), RegistryTuples :: [syn_registry_tuple()]) -> ok.
  324. sync_registry_tuples(RemoteNode, RegistryTuples) ->
  325. %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
  326. purge_registry_entries_for_remote_node(RemoteNode),
  327. %% loop
  328. F = fun({Name, RemotePid, RemoteMeta}) ->
  329. %% check if same name is registered
  330. case find_process_entry_by_name(Name) of
  331. undefined ->
  332. %% no conflict
  333. register_on_node(Name, RemotePid, RemoteMeta);
  334. Entry ->
  335. error_logger:warning_msg(
  336. "Syn(~p): Conflicting name process found for: ~p, processes are ~p, ~p~n",
  337. [node(), Name, Entry#syn_registry_table.pid, RemotePid]
  338. ),
  339. %% unregister local
  340. unregister_on_node(Name),
  341. %% unregister remote
  342. ok = rpc:call(RemoteNode, syn_registry, unregister_on_node, [Name]),
  343. %% TODO: call conflict resolution fun, for now kill the remote one
  344. exit(RemotePid, kill),
  345. %% register local
  346. register_on_node(Name, Entry#syn_registry_table.pid, Entry#syn_registry_table.meta)
  347. end
  348. end,
  349. %% add to table
  350. lists:foreach(F, RegistryTuples).
  351. -spec purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
  352. purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
  353. %% NB: no demonitoring is done, hence why this needs to run for a remote node
  354. %% build match specs
  355. MatchHead = #syn_registry_table{name = '$1', node = '$2', _ = '_'},
  356. Guard = {'=:=', '$2', Node},
  357. NameFormat = '$1',
  358. %% delete
  359. NodePids = mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [NameFormat]}]),
  360. DelF = fun(Id) -> mnesia:dirty_delete({syn_registry_table, Id}) end,
  361. lists:foreach(DelF, NodePids).