syn_registry.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_registry).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([register/2, register/3]).
  31. -export([unregister/1]).
  32. -export([whereis/1, whereis/2]).
  33. -export([count/0, count/1]).
  34. %% sync API
  35. -export([sync_register/3, sync_unregister/1]).
  36. -export([sync_get_local_registry_tuples/1]).
  37. -export([unregister_on_node/1]).
  38. %% gen_server callbacks
  39. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  40. %% records
  41. -record(state, {}).
  42. %% includes
  43. -include("syn.hrl").
  44. %% ===================================================================
  45. %% API
  46. %% ===================================================================
  %% Starts the registry gen_server, linked to the caller and registered
  %% locally under the module name.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  %% Registers Pid under Name with no metadata (Meta is `undefined`).
  -spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  register(Name, Pid) ->
      NoMeta = undefined,
      register(Name, Pid, NoMeta).
  %% Registers Pid under Name with the given Meta.
  %% The request is sent to the registry process running on the node that
  %% hosts Pid, and that process's reply is returned.
  -spec register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
  register(Name, Pid, Meta) when is_pid(Pid) ->
      OwnerRegistry = {?MODULE, node(Pid)},
      gen_server:call(OwnerRegistry, {register_on_node, Name, Pid, Meta}).
  %% Unregisters Name.
  %% Returns {error, undefined} when no entry exists for Name in the local
  %% table; otherwise forwards the request to the registry process on the
  %% node of the registered pid and returns its reply.
  -spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
  unregister(Name) ->
      case find_process_entry_by_name(Name) of
          #syn_registry_table{pid = RegisteredPid} ->
              OwnerRegistry = {?MODULE, node(RegisteredPid)},
              gen_server:call(OwnerRegistry, {unregister_on_node, Name});
          undefined ->
              {error, undefined}
      end.
  %% Looks up the pid registered under Name in the local table.
  %% Returns `undefined` when there is no entry for Name.
  -spec whereis(Name :: any()) -> pid() | undefined.
  whereis(Name) ->
      case find_process_entry_by_name(Name) of
          #syn_registry_table{pid = RegisteredPid} -> RegisteredPid;
          undefined -> undefined
      end.
  %% Same lookup as whereis/1, but returns the pid together with the
  %% metadata stored in the entry.
  -spec whereis(Name :: any(), with_meta) -> {pid(), Meta :: any()} | undefined.
  whereis(Name, with_meta) ->
      case find_process_entry_by_name(Name) of
          #syn_registry_table{pid = RegisteredPid, meta = StoredMeta} ->
              {RegisteredPid, StoredMeta};
          undefined ->
              undefined
      end.
  %% Returns the number of entries currently held in the registry table.
  -spec count() -> non_neg_integer().
  count() ->
      TableSize = mnesia:table_info(syn_registry_table, size),
      TableSize.
  %% Returns the number of registry entries whose `node` field equals Node.
  -spec count(Node :: node()) -> non_neg_integer().
  count(Node) ->
      %% one result element is produced per matching entry, so the length of
      %% the selection is the number of entries on Node
      EntriesOnNode = [{
          #syn_registry_table{node = '$1', _ = '_'},
          [{'=:=', '$1', Node}],
          ['$1']
      }],
      length(mnesia:dirty_select(syn_registry_table, EntriesOnNode)).
  %% Asynchronously asks the local registry process to record the given
  %% registration in its table (handled by the `sync_register` cast).
  -spec sync_register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
  sync_register(Name, Pid, Meta) ->
      SyncMsg = {sync_register, Name, Pid, Meta},
      gen_server:cast(?MODULE, SyncMsg).
  %% Asynchronously asks the local registry process to drop the entry for
  %% Name from its table (handled by the `sync_unregister` cast).
  -spec sync_unregister(Name :: any()) -> ok.
  sync_unregister(Name) ->
      SyncMsg = {sync_unregister, Name},
      gen_server:cast(?MODULE, SyncMsg).
  %% Returns a {Name, Pid, Meta} tuple for every registry entry whose `node`
  %% field is this node. FromNode is only used for the log line.
  -spec sync_get_local_registry_tuples(FromNode :: node()) -> list(syn_registry_tuple()).
  sync_get_local_registry_tuples(FromNode) ->
      error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node: ~p~n", [node(), FromNode]),
      %% select the entries of this node, shaped as registry tuples
      LocalEntries = #syn_registry_table{name = '$1', pid = '$2', meta = '$3', node = '$4', _ = '_'},
      OnThisNode = {'=:=', '$4', node()},
      AsRegistryTuple = {{'$1', '$2', '$3'}},
      mnesia:dirty_select(syn_registry_table, [{LocalEntries, [OnThisNode], [AsRegistryTuple]}]).
  107. %% ===================================================================
  108. %% Callbacks
  109. %% ===================================================================
  110. %% ----------------------------------------------------------------------------------------------------------
  111. %% Init
  112. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server init callback.
  %% Waits up to 10 seconds for the registry table; once it is available,
  %% subscribes to node up/down notifications and starts with an empty state.
  %% Any other wait result stops the server.
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      WaitResult = mnesia:wait_for_tables([syn_registry_table], 10000),
      case WaitResult of
          ok ->
              %% receive {nodeup, Node} / {nodedown, Node} messages
              ok = net_kernel:monitor_nodes(true),
              {ok, #state{}};
          NotReady ->
              {stop, {error_waiting_for_registry_table, NotReady}}
      end.
  129. %% ----------------------------------------------------------------------------------------------------------
  130. %% Call messages
  131. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server call callback.
  %%
  %% {register_on_node, Name, Pid, Meta}
  %%   Replies {error, not_alive} when is_process_alive(Pid) is false and
  %%   {error, taken} when Name is already registered to a different pid.
  %%   Otherwise (name free, or already registered to this same Pid) the entry
  %%   is written locally, a sync_register is evaluated on every node in
  %%   nodes(), and the reply is ok.
  %%
  %% {unregister_on_node, Name}
  %%   Removes the local entry; on success a sync_unregister is evaluated on
  %%   every node in nodes(). Replies with the result of unregister_on_node/1.
  %%
  %% Any other request is logged as a warning and answered with `undefined`.
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
      %% check if pid is alive
      case is_process_alive(Pid) of
          true ->
              %% check if name available
              case find_process_entry_by_name(Name) of
                  #syn_registry_table{pid = OtherPid} when OtherPid =/= Pid ->
                      {reply, {error, taken}, State};
                  _FreeOrSamePid ->
                      %% name is free, or this is a re-registration of the same pid
                      register_on_node(Name, Pid, Meta),
                      %% multicast
                      rpc:eval_everywhere(nodes(), ?MODULE, sync_register, [Name, Pid, Meta]),
                      %% return
                      {reply, ok, State}
              end;
          _ ->
              {reply, {error, not_alive}, State}
      end;
  handle_call({unregister_on_node, Name}, _From, State) ->
      case unregister_on_node(Name) of
          ok ->
              %% multicast
              rpc:eval_everywhere(nodes(), ?MODULE, sync_unregister, [Name]),
              %% return
              {reply, ok, State};
          {error, Reason} ->
              %% return
              {reply, {error, Reason}, State}
      end;
  handle_call(Request, From, State) ->
      %% argument order must follow the format string: node, sender, then message
      error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), From, Request]),
      {reply, undefined, State}.
  177. %% ----------------------------------------------------------------------------------------------------------
  178. %% Cast messages
  179. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server cast callback.
  %% Applies registrations / unregistrations multicast by other nodes to the
  %% local table only: nothing is monitored (the monitor ref is stored as
  %% `undefined`) and nothing is multicast further.
  %% Unknown casts are logged as warnings and ignored.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast({sync_register, Name, Pid, Meta}, State) ->
      NoMonitorRef = undefined,
      add_to_local_table(Name, Pid, Meta, NoMonitorRef),
      {noreply, State};
  handle_cast({sync_unregister, Name}, State) ->
      remove_from_local_table(Name),
      {noreply, State};
  handle_cast(UnknownMsg, State) ->
      error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), UnknownMsg]),
      {noreply, State}.
  197. %% ----------------------------------------------------------------------------------------------------------
  198. %% All non Call / Cast messages
  199. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server info callback.
  %%
  %% {'DOWN', _, process, Pid, Reason}
  %%   For every entry registered for Pid: logs the exit, removes the entry
  %%   from the local table and evaluates sync_unregister on every node in
  %%   nodes(). When no entry is found only the exit is logged.
  %%
  %% {nodeup, RemoteNode}
  %%   Inside a global:trans/2 lock, fetches the remote node's own registry
  %%   tuples over rpc and merges them into the local table. If the rpc fails
  %%   ({badrpc, Reason}) the failure is logged and the merge is skipped, so
  %%   the registry process keeps running.
  %%
  %% {nodedown, RemoteNode}
  %%   Removes every local entry that belongs to RemoteNode.
  %%
  %% Any other message is logged as a warning and ignored.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
      case find_processes_entry_by_pid(Pid) of
          [] ->
              %% log
              log_process_exit(undefined, Pid, Reason);
          Entries ->
              lists:foreach(fun(Entry) ->
                  %% get process info
                  Name = Entry#syn_registry_table.name,
                  %% log
                  log_process_exit(Name, Pid, Reason),
                  %% remove from table
                  remove_from_local_table(Name),
                  %% multicast
                  rpc:eval_everywhere(nodes(), ?MODULE, sync_unregister, [Name])
              end, Entries)
      end,
      %% return
      {noreply, State};
  handle_info({nodeup, RemoteNode}, State) ->
      error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
      global:trans({{?MODULE, auto_merge_node_up}, self()},
          fun() ->
              error_logger:warning_msg("Syn(~p): AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
              %% get processes info from remote node
              case rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]) of
                  {badrpc, BadRpcReason} ->
                      %% the remote node could not be queried: do not attempt a merge
                      error_logger:error_msg(
                          "Syn(~p): AUTOMERGE <---- Failed for remote node ~p, could not get its registry entries: ~p~n",
                          [node(), RemoteNode, BadRpcReason]
                      );
                  RegistryTuples when is_list(RegistryTuples) ->
                      error_logger:warning_msg(
                          "Syn(~p): Received ~p registry entrie(s) from remote node ~p, writing to local~n",
                          [node(), length(RegistryTuples), RemoteNode]
                      ),
                      sync_registry_tuples(RemoteNode, RegistryTuples),
                      %% exit
                      error_logger:warning_msg("Syn(~p): AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
              end
          end
      ),
      %% resume
      {noreply, State};
  handle_info({nodedown, RemoteNode}, State) ->
      error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing its entries on local~n", [node(), RemoteNode]),
      purge_registry_entries_for_remote_node(RemoteNode),
      {noreply, State};
  handle_info(Info, State) ->
      error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
      {noreply, State}.
  248. %% ----------------------------------------------------------------------------------------------------------
  249. %% Terminate
  250. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server terminate callback: logs the termination reason.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(Reason, _State) ->
      LogArgs = [node(), Reason],
      error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", LogArgs),
      terminated.
  255. %% ----------------------------------------------------------------------------------------------------------
  256. %% Convert process state when code is changed.
  257. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server code_change callback: the state is carried over unchanged.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_PreviousVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  261. %% ===================================================================
  262. %% Internal
  263. %% ===================================================================
  %% Writes the entry for Name into the local table, making sure Pid is
  %% monitored by the calling process.
  %% If Pid already has at least one entry, the monitor reference stored in
  %% the first one is reused; otherwise a new monitor is created.
  -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
  register_on_node(Name, Pid, Meta) ->
      EntriesForPid = find_processes_entry_by_pid(Pid),
      MonitorRef = case EntriesForPid of
          [#syn_registry_table{monitor_ref = KnownRef} | _] ->
              %% pid already has an entry, share its monitor reference
              KnownRef;
          [] ->
              %% first entry for this pid, start monitoring it
              erlang:monitor(process, Pid)
      end,
      add_to_local_table(Name, Pid, Meta, MonitorRef).
  %% Removes the entry for Name from the local table.
  %%
  %% Returns {error, undefined} when Name has no entry.
  %%
  %% An entry stored without a monitor reference is simply deleted (this used
  %% to raise a case_clause error).
  %%
  %% An entry with a monitor reference shares that reference with every other
  %% name registered for the same pid (see register_on_node/3), so the pid is
  %% only demonitored when Name is its last remaining entry. Otherwise the
  %% monitor is kept so the remaining names are still cleaned up when the
  %% process goes down.
  -spec unregister_on_node(Name :: any()) -> ok | {error, Reason :: any()}.
  unregister_on_node(Name) ->
      case find_process_entry_by_name(Name) of
          undefined ->
              {error, undefined};
          #syn_registry_table{monitor_ref = undefined} ->
              %% nothing to demonitor, just remove from table
              remove_from_local_table(Name);
          #syn_registry_table{pid = Pid, monitor_ref = MonitorRef} ->
              case find_processes_entry_by_pid(Pid) of
                  [_OnlyThisName] ->
                      %% last name of this pid: demonitor, and flush so that a
                      %% DOWN message already delivered is not handled later
                      erlang:demonitor(MonitorRef, [flush]);
                  _OtherNamesRemain ->
                      %% keep monitoring on behalf of the other names
                      true
              end,
              %% remove from table
              remove_from_local_table(Name)
      end.
  %% Writes (or overwrites) the entry for Name in the local registry table.
  %% The `node` field is derived from Pid.
  -spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
  add_to_local_table(Name, Pid, Meta, MonitorRef) ->
      PidNode = node(Pid),
      NewEntry = #syn_registry_table{
          name = Name,
          pid = Pid,
          node = PidNode,
          meta = Meta,
          monitor_ref = MonitorRef
      },
      mnesia:dirty_write(NewEntry).
  %% Deletes the entry keyed by Name from the local registry table.
  -spec remove_from_local_table(Name :: any()) -> ok.
  remove_from_local_table(Name) ->
      mnesia:dirty_delete({syn_registry_table, Name}).
  %% Returns every registry entry whose `pid` field is Pid, using the index
  %% on that field. The list is empty when Pid has no entries.
  -spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: list(#syn_registry_table{}).
  find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
      PidFieldPos = #syn_registry_table.pid,
      mnesia:dirty_index_read(syn_registry_table, Pid, PidFieldPos).
  %% Reads the registry entry keyed by Name.
  %% Returns the entry when exactly one is found, `undefined` otherwise.
  -spec find_process_entry_by_name(Name :: any()) -> Entry :: #syn_registry_table{} | undefined.
  find_process_entry_by_name(Name) ->
      case mnesia:dirty_read({syn_registry_table, Name}) of
          [SingleEntry] -> SingleEntry;
          _NoneOrMany -> undefined
      end.
  %% Logs the exit of a process as an error, unless the exit reason is one of
  %% normal, shutdown, {shutdown, _} or killed, in which case nothing is logged.
  %% A Name of `undefined` means the DOWN message came from a process that has
  %% no registry entry, and a different message is logged for it.
  -spec log_process_exit(Name :: any(), Pid :: pid(), Reason :: any()) -> ok.
  log_process_exit(_Name, _Pid, normal) ->
      ok;
  log_process_exit(_Name, _Pid, shutdown) ->
      ok;
  log_process_exit(_Name, _Pid, {shutdown, _}) ->
      ok;
  log_process_exit(_Name, _Pid, killed) ->
      ok;
  log_process_exit(undefined, Pid, Reason) ->
      error_logger:error_msg(
          "Syn(~p): Received a DOWN message from an unmonitored process ~p with reason: ~p~n",
          [node(), Pid, Reason]
      );
  log_process_exit(Name, Pid, Reason) ->
      error_logger:error_msg(
          "Syn(~p): Process with name ~p and pid ~p exited with reason: ~p~n",
          [node(), Name, Pid, Reason]
      ).
  %% Merges the registry tuples received from RemoteNode into the local table.
  %%
  %% All local entries belonging to RemoteNode are purged first. Then, for
  %% each {Name, Pid, Meta} tuple:
  %%   - if Name is not registered locally, the remote process is registered;
  %%   - if Name is already registered, the conflict is logged, the name is
  %%     unregistered both locally and on RemoteNode, the remote process is
  %%     killed and the previously registered local entry is registered again.
  -spec sync_registry_tuples(RemoteNode :: node(), RegistryTuples :: [syn_registry_tuple()]) -> ok.
  sync_registry_tuples(RemoteNode, RegistryTuples) ->
      %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
      purge_registry_entries_for_remote_node(RemoteNode),
      lists:foreach(
          fun({Name, RemotePid, RemoteMeta}) ->
              case find_process_entry_by_name(Name) of
                  undefined ->
                      %% name is free locally, take the remote registration
                      register_on_node(Name, RemotePid, RemoteMeta);
                  #syn_registry_table{pid = KeptPid, meta = KeptMeta} ->
                      error_logger:warning_msg(
                          "Syn(~p): Conflicting name process found for: ~p, processes are ~p, ~p~n",
                          [node(), Name, KeptPid, RemotePid]
                      ),
                      %% drop the name on both sides
                      unregister_on_node(Name),
                      ok = rpc:call(RemoteNode, syn_registry, unregister_on_node, [Name]),
                      %% TODO: call conflict resolution fun, for now kill the remote one
                      exit(RemotePid, kill),
                      %% the process that was already registered here keeps the name
                      register_on_node(Name, KeptPid, KeptMeta)
              end
          end,
          RegistryTuples
      ).
  %% Deletes from the local table every entry whose `node` field is Node.
  %% Only defined for a node other than the local one: the entries are
  %% deleted without demonitoring anything.
  -spec purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
  purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
      %% collect the names registered on Node
      NamesOnNode = [{
          #syn_registry_table{name = '$1', node = '$2', _ = '_'},
          [{'=:=', '$2', Node}],
          ['$1']
      }],
      StaleNames = mnesia:dirty_select(syn_registry_table, NamesOnNode),
      %% delete them one by one
      lists:foreach(
          fun(StaleName) -> mnesia:dirty_delete({syn_registry_table, StaleName}) end,
          StaleNames
      ).