syn_registry.erl 17 KB


  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_registry).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([register/2, register/3]).
  31. -export([unregister/1]).
  32. -export([whereis/1, whereis/2]).
  33. -export([count/0, count/1]).
  34. %% sync API
  35. -export([sync_get_local_registry_tuples/1]).
  36. -export([add_to_local_table/4]).
  37. -export([remove_from_local_table/1]).
  38. %% gen_server callbacks
  39. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  40. %% records
  %% Server state. `custom_event_handler' is populated in init/1; a field
  %% without an explicit initializer defaults to `undefined'.
  -record(state, {
      custom_event_handler :: undefined | module()
  }).
  44. %% includes
  45. -include("syn.hrl").
  46. %% ===================================================================
  47. %% API
  48. %% ===================================================================
  %% Starts the registry gen_server, registered locally under the module name,
  %% with no init arguments and no start options.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  %% Registers Pid under Name without metadata (Meta is `undefined').
  -spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
  register(Name, Pid) ->
      NoMeta = undefined,
      register(Name, Pid, NoMeta).
  %% Registers Pid under Name with Meta. The request is sent to the registry
  %% server running on the node that hosts Pid.
  -spec register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
  register(Name, Pid, Meta) when is_pid(Pid) ->
      OwnerRegistry = {?MODULE, node(Pid)},
      gen_server:call(OwnerRegistry, {register_on_node, Name, Pid, Meta}).
  %% Unregisters Name. The request is sent to the registry server on the node
  %% hosting the registered process; returns `{error, undefined}' when Name is
  %% not found in the local table.
  -spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
  unregister(Name) ->
      case find_process_entry_by_name(Name) of
          #syn_registry_table{pid = Pid} ->
              OwnerRegistry = {?MODULE, node(Pid)},
              gen_server:call(OwnerRegistry, {unregister_on_node, Name});
          undefined ->
              {error, undefined}
      end.
  %% Returns the pid registered under Name, or `undefined' when there is none.
  -spec whereis(Name :: any()) -> pid() | undefined.
  whereis(Name) ->
      case find_process_entry_by_name(Name) of
          #syn_registry_table{pid = Pid} -> Pid;
          undefined -> undefined
      end.
  %% Same lookup as whereis/1, but returns the pid together with its metadata.
  -spec whereis(Name :: any(), with_meta) -> {pid(), Meta :: any()} | undefined.
  whereis(Name, with_meta) ->
      case find_process_entry_by_name(Name) of
          #syn_registry_table{pid = Pid, meta = Meta} -> {Pid, Meta};
          undefined -> undefined
      end.
  %% Returns the number of entries in the local copy of the registry table.
  -spec count() -> non_neg_integer().
  count() ->
      TableSize = mnesia:table_info(syn_registry_table, size),
      TableSize.
  %% Returns the number of registry entries whose process runs on Node.
  -spec count(Node :: node()) -> non_neg_integer().
  count(Node) ->
      length(get_registry_tuples_for_node(Node)).
  %% Returns the registry tuples of the processes running on this node.
  %% FromNode is only used for logging; the nodeup handler invokes this
  %% function on the remote node through rpc.
  -spec sync_get_local_registry_tuples(FromNode :: node()) -> [syn_registry_tuple()].
  sync_get_local_registry_tuples(FromNode) ->
      LocalNode = node(),
      error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node: ~p~n", [LocalNode, FromNode]),
      get_registry_tuples_for_node(LocalNode).
  93. %% ===================================================================
  94. %% Callbacks
  95. %% ===================================================================
  96. %% ----------------------------------------------------------------------------------------------------------
  97. %% Init
  98. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server init callback: restores process monitors, subscribes to node
  %% status messages and stores the configured event handler in the state.
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      %% re-attach monitors to the processes registered on this node
      %% (needed when the server is restarted after a crash)
      rebuild_monitors(),
      %% ask net_kernel for nodeup / nodedown messages
      ok = net_kernel:monitor_nodes(true),
      %% initial state carries the event handler module
      InitialState = #state{
          custom_event_handler = syn_backbone:get_event_handler_module()
      },
      {ok, InitialState}.
  115. %% ----------------------------------------------------------------------------------------------------------
  116. %% Call messages
  117. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server call callback.
  %%
  %% `{register_on_node, Name, Pid, Meta}': registers Pid under Name when the
  %% process is alive and Name is either free or already held by the same Pid
  %% (in which case the entry is rewritten with the new Meta). Replies `ok',
  %% `{error, taken}' or `{error, not_alive}'.
  %%
  %% `{unregister_on_node, Name}': removes the local entry and, on success,
  %% multicasts the removal to the other nodes.
  %%
  %% Any other request is logged and answered with `undefined'.
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
      %% check if pid is alive
      case is_process_alive(Pid) of
          true ->
              %% check if name available
              case find_process_entry_by_name(Name) of
                  #syn_registry_table{pid = OtherPid} when OtherPid =/= Pid ->
                      %% name is held by a different process
                      {reply, {error, taken}, State};
                  _FreeOrSamePid ->
                      %% name is free, or is re-registered by its current owner
                      register_on_node(Name, Pid, Meta),
                      %% multicast
                      multicast_register(Name, Pid, Meta),
                      %% return
                      {reply, ok, State}
              end;
          _ ->
              {reply, {error, not_alive}, State}
      end;
  handle_call({unregister_on_node, Name}, _From, State) ->
      case unregister_on_node(Name) of
          ok ->
              multicast_unregister(Name),
              %% return
              {reply, ok, State};
          {error, Reason} ->
              %% return
              {reply, {error, Reason}, State}
      end;
  handle_call(Request, From, State) ->
      %% argument order follows the format string: node, sender, message
      error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), From, Request]),
      {reply, undefined, State}.
  162. %% ----------------------------------------------------------------------------------------------------------
  163. %% Cast messages
  164. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server cast callback: no casts are expected, so every message is
  %% logged as unknown and the state is left untouched.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast(Msg, State) ->
      LogArgs = [node(), Msg],
      error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", LogArgs),
      {noreply, State}.
  172. %% ----------------------------------------------------------------------------------------------------------
  173. %% All non Call / Cast messages
  174. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server info callback.
  %%
  %% `DOWN': a monitored process exited. Every name registered for that pid is
  %% passed to handle_process_down/5, removed from the local table and the
  %% removal is multicast to the other nodes.
  %%
  %% `nodeup': merges the registry entries of the joining node into the local
  %% table, inside a global transaction.
  %%
  %% `nodedown': drops the local entries of the node that left.
  %%
  %% Any other message is logged as unknown.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
      case find_processes_entry_by_pid(Pid) of
          [] ->
              %% no entry for this pid: handle_process_down/5 only logs
              handle_process_down(undefined, Pid, undefined, Reason, State);
          Entries ->
              lists:foreach(fun(Entry) ->
                  %% Pid is already bound, so this also asserts the entry's pid
                  #syn_registry_table{name = Name, pid = Pid, meta = Meta} = Entry,
                  %% handle
                  handle_process_down(Name, Pid, Meta, Reason, State),
                  %% remove from table
                  remove_from_local_table(Name),
                  %% multicast
                  multicast_unregister(Name)
              end, Entries)
      end,
      %% return
      {noreply, State};
  handle_info({nodeup, RemoteNode}, State) ->
      error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
      global:trans({{?MODULE, auto_merge_registry}, self()},
          fun() -> auto_merge_registry(RemoteNode, State) end
      ),
      %% resume
      {noreply, State};
  handle_info({nodedown, RemoteNode}, State) ->
      error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing registry entries on local~n", [node(), RemoteNode]),
      purge_registry_entries_for_remote_node(RemoteNode),
      {noreply, State};
  handle_info(Info, State) ->
      error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
      {noreply, State}.

  %% Fetches the registry tuples of RemoteNode through rpc and writes them to
  %% the local table. rpc:call/4 returns `{badrpc, Reason}' when the call
  %% fails (for example when the node went away again); that case is logged
  %% instead of being passed on as if it were a list of tuples, which would
  %% crash the registry server.
  -spec auto_merge_registry(RemoteNode :: node(), #state{}) -> ok.
  auto_merge_registry(RemoteNode, State) ->
      error_logger:warning_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
      %% get registry tuples from remote node
      case rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]) of
          {badrpc, Reason} ->
              error_logger:error_msg(
                  "Syn(~p): REGISTRY AUTOMERGE <---- Aborted for remote node ~p, could not get registry entries: ~p~n",
                  [node(), RemoteNode, Reason]
              );
          RegistryTuples ->
              error_logger:warning_msg(
                  "Syn(~p): Received ~p registry entrie(s) from remote node ~p, writing to local~n",
                  [node(), length(RegistryTuples), RemoteNode]
              ),
              sync_registry_tuples(RemoteNode, RegistryTuples, State),
              %% exit
              error_logger:warning_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
      end.
  225. %% ----------------------------------------------------------------------------------------------------------
  226. %% Terminate
  227. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server terminate callback: logs the termination reason.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(Reason, _State) ->
      LogArgs = [node(), Reason],
      error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", LogArgs),
      terminated.
  232. %% ----------------------------------------------------------------------------------------------------------
  233. %% Convert process state when code is changed.
  234. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server code_change callback: the state is carried over unchanged.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_PreviousVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  238. %% ===================================================================
  239. %% Internal
  240. %% ===================================================================
  %% Spawns a linked process that asks every connected node to write the entry
  %% to its local table. The entry is written without a monitor reference
  %% (`undefined'). Returns the pid of the spawned process.
  -spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any()) -> pid().
  multicast_register(Name, Pid, Meta) ->
      Broadcast = fun() ->
          rpc:eval_everywhere(nodes(), ?MODULE, add_to_local_table, [Name, Pid, Meta, undefined])
      end,
      spawn_link(Broadcast).
  %% Spawns a linked process that asks every connected node to delete Name
  %% from its local table. Returns the pid of the spawned process.
  -spec multicast_unregister(Name :: any()) -> pid().
  multicast_unregister(Name) ->
      Broadcast = fun() ->
          rpc:eval_everywhere(nodes(), ?MODULE, remove_from_local_table, [Name])
      end,
      spawn_link(Broadcast).
  %% Writes the entry for Name to the local table together with a monitor
  %% reference for Pid. When Pid already has an entry, that entry's reference
  %% is reused; otherwise a new monitor is created.
  -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
  register_on_node(Name, Pid, Meta) ->
      MonitorRef = case find_processes_entry_by_pid(Pid) of
          [#syn_registry_table{monitor_ref = ExistingRef} | _] ->
              %% pid is already registered under another name: share its reference
              ExistingRef;
          [] ->
              %% first registration for this pid: start monitoring it
              erlang:monitor(process, Pid)
      end,
      %% add to table
      add_to_local_table(Name, Pid, Meta, MonitorRef).
  %% Removes the entry for Name from the local table.
  %%
  %% Returns `{error, undefined}' when Name is not registered. Only entries
  %% that carry a monitor reference are accepted; any other entry raises a
  %% case_clause error.
  %%
  %% register_on_node/3 reuses one monitor reference for all the names of a
  %% pid, so the monitor is released only once the last entry for that pid is
  %% gone. Releasing it earlier would leave the remaining names without a DOWN
  %% notification when the process exits.
  -spec unregister_on_node(Name :: any()) -> ok | {error, Reason :: any()}.
  unregister_on_node(Name) ->
      case find_process_entry_by_name(Name) of
          undefined ->
              {error, undefined};
          Entry when Entry#syn_registry_table.monitor_ref =/= undefined ->
              %% remove from table
              ok = remove_from_local_table(Name),
              %% demonitor, unless the pid still has other names
              demonitor_if_last_entry(Entry)
      end.

  %% Releases the monitor stored in Entry when its pid has no entries left in
  %% the local table. The `flush' option also drops a DOWN message that may
  %% already be queued for that reference.
  -spec demonitor_if_last_entry(Entry :: #syn_registry_table{}) -> ok.
  demonitor_if_last_entry(#syn_registry_table{pid = Pid, monitor_ref = MonitorRef}) ->
      case find_processes_entry_by_pid(Pid) of
          [] ->
              true = erlang:demonitor(MonitorRef, [flush]),
              ok;
          [_ | _] ->
              ok
      end.
  %% Writes (or overwrites) the entry for Name in the local registry table
  %% with a dirty mnesia write. The `node' field is derived from Pid.
  -spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
  add_to_local_table(Name, Pid, Meta, MonitorRef) ->
      Entry = #syn_registry_table{
          name = Name,
          pid = Pid,
          node = node(Pid),
          meta = Meta,
          monitor_ref = MonitorRef
      },
      mnesia:dirty_write(Entry).
  %% Deletes the entry for Name from the local registry table with a dirty
  %% mnesia delete.
  -spec remove_from_local_table(Name :: any()) -> ok.
  remove_from_local_table(Name) ->
      mnesia:dirty_delete({syn_registry_table, Name}).
  %% Returns all the registry entries of Pid, looked up through the index on
  %% the `pid' field of the table.
  -spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: [#syn_registry_table{}].
  find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
      PidFieldPosition = #syn_registry_table.pid,
      mnesia:dirty_index_read(syn_registry_table, Pid, PidFieldPosition).
  %% Returns the registry entry stored under Name, or `undefined' when the
  %% read does not yield exactly one record.
  -spec find_process_entry_by_name(Name :: any()) -> Entry :: #syn_registry_table{} | undefined.
  find_process_entry_by_name(Name) ->
      case mnesia:dirty_read({syn_registry_table, Name}) of
          [Entry] -> Entry;
          _ -> undefined
      end.
  %% Returns `{Name, Pid, Meta}' for every registry entry whose `node' field
  %% is exactly Node.
  -spec get_registry_tuples_for_node(Node :: node()) -> [syn_registry_tuple()].
  get_registry_tuples_for_node(Node) ->
      MatchSpec = [{
          %% head: bind name, pid, node and meta; ignore the other fields
          #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
          %% guard: keep only the entries of Node
          [{'=:=', '$3', Node}],
          %% result: a {Name, Pid, Meta} tuple
          [{{'$1', '$2', '$4'}}]
      }],
      mnesia:dirty_select(syn_registry_table, MatchSpec).
  %% Reacts to the exit of a process. With Name `undefined' (no registry entry
  %% was found for the pid) only a warning is logged; otherwise the exit is
  %% forwarded to syn_event_handler with the handler module kept in the state.
  -spec handle_process_down(Name :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
  handle_process_down(undefined, Pid, _Meta, Reason, #state{}) ->
      error_logger:warning_msg(
          "Syn(~p): Received a DOWN message from an unmonitored process ~p with reason: ~p~n",
          [node(), Pid, Reason]
      );
  handle_process_down(Name, Pid, Meta, Reason, #state{custom_event_handler = CustomEventHandler}) ->
      syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler).
  %% Merges the registry tuples received from RemoteNode into the local table.
  %%
  %% Existing local entries of RemoteNode are purged first. Each received
  %% tuple is then written locally, or, when the name is already taken, passed
  %% to conflict resolution.
  %%
  %% Remote entries are stored without a monitor reference, as is done by
  %% multicast_register/3 and by the "keep remote" conflict branch: the
  %% purge function does no demonitoring, so a local monitor on a remote pid
  %% would never be released.
  -spec sync_registry_tuples(RemoteNode :: node(), RegistryTuples :: [syn_registry_tuple()], #state{}) -> ok.
  sync_registry_tuples(RemoteNode, RegistryTuples, #state{
      custom_event_handler = CustomEventHandler
  }) ->
      %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
      purge_registry_entries_for_remote_node(RemoteNode),
      %% add to table
      lists:foreach(fun({Name, RemotePid, RemoteMeta}) ->
          %% check if same name is registered
          case find_process_entry_by_name(Name) of
              undefined ->
                  %% no conflict
                  add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
              LocalEntry ->
                  resolve_sync_conflict(RemoteNode, Name, LocalEntry, {RemotePid, RemoteMeta}, CustomEventHandler)
          end
      end, RegistryTuples).

  %% Resolves a name registered both locally and on RemoteNode. The event
  %% handler returns `{PidToKeep, KillOther}'; the entry of the other process
  %% is dropped (on RemoteNode through rpc, or locally) and that process is
  %% killed when KillOther is `true'. A PidToKeep that is neither of the two
  %% candidates is logged and leaves both tables untouched.
  -spec resolve_sync_conflict(
      RemoteNode :: node(),
      Name :: any(),
      LocalEntry :: #syn_registry_table{},
      Remote :: {pid(), Meta :: any()},
      CustomEventHandler :: module()
  ) -> any().
  resolve_sync_conflict(RemoteNode, Name, LocalEntry, {RemotePid, RemoteMeta}, CustomEventHandler) ->
      #syn_registry_table{pid = LocalPid, meta = LocalMeta} = LocalEntry,
      error_logger:warning_msg(
          "Syn(~p): Conflicting name process found for: ~p, processes are ~p, ~p~n",
          [node(), Name, LocalPid, RemotePid]
      ),
      %% call conflict resolution
      {PidToKeep, KillOther} = syn_event_handler:do_resolve_registry_conflict(
          Name,
          {LocalPid, LocalMeta},
          {RemotePid, RemoteMeta},
          CustomEventHandler
      ),
      %% keep chosen one
      case PidToKeep of
          LocalPid ->
              %% keep local
              ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name]),
              case KillOther of
                  true -> exit(RemotePid, kill);
                  _ -> ok
              end;
          RemotePid ->
              %% keep remote
              remove_from_local_table(Name),
              add_to_local_table(Name, RemotePid, RemoteMeta, undefined),
              case KillOther of
                  true -> exit(LocalPid, kill);
                  _ -> ok
              end;
          Other ->
              error_logger:error_msg(
                  "Syn(~p): Custom handler returned ~p, valid options were ~p and ~p",
                  [node(), Other, LocalPid, RemotePid]
              )
      end.
  %% Deletes from the local table every entry whose `node' field is Node.
  %% NB: no demonitoring is done, hence why this needs to run for a remote
  %% node; the guard rejects the local node.
  -spec purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
  purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
      MatchSpec = [{
          %% head: bind name and node; ignore the other fields
          #syn_registry_table{name = '$1', node = '$2', _ = '_'},
          %% guard: keep only the entries of Node
          [{'=:=', '$2', Node}],
          %% result: the entry name, which is the table key
          ['$1']
      }],
      Names = mnesia:dirty_select(syn_registry_table, MatchSpec),
      %% delete
      lists:foreach(fun(Name) -> mnesia:dirty_delete({syn_registry_table, Name}) end, Names).
  %% Restores the monitors of the processes registered on this node; called
  %% from init/1. Entries of processes that are no longer alive are removed.
  %%
  %% One monitor is created per pid and shared by all its names, as
  %% register_on_node/3 does. A monitor per name would deliver several DOWN
  %% messages for a single exit.
  -spec rebuild_monitors() -> ok.
  rebuild_monitors() ->
      RegistryTuples = get_registry_tuples_for_node(node()),
      lists:foldl(fun rebuild_monitor/2, #{}, RegistryTuples),
      ok.

  %% Rewrites one entry with a fresh monitor reference, or removes it when the
  %% process is dead. Monitors maps each pid already handled to its reference.
  -spec rebuild_monitor(RegistryTuple :: {Name :: any(), Pid :: pid(), Meta :: any()}, Monitors :: map()) -> map().
  rebuild_monitor({Name, Pid, Meta}, Monitors) ->
      case is_process_alive(Pid) of
          true ->
              MonitorRef = case maps:find(Pid, Monitors) of
                  {ok, ExistingRef} -> ExistingRef;
                  error -> erlang:monitor(process, Pid)
              end,
              ok = add_to_local_table(Name, Pid, Meta, MonitorRef),
              Monitors#{Pid => MonitorRef};
          false ->
              ok = remove_from_local_table(Name),
              Monitors
      end.