syn_registry.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_registry).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([register/2, register/3]).
  31. -export([unregister/1]).
  32. -export([find_by_key/1, find_by_key/2]).
  33. -export([find_by_pid/1, find_by_pid/2]).
  34. -export([count/0, count/1]).
  35. %% gen_server callbacks
  36. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  37. %% records
  38. -record(state, {
  39. registry_process_exit_callback_module = undefined :: atom(),
  40. registry_process_exit_callback_function = undefined :: atom()
  41. }).
  42. %% include
  43. -include("syn.hrl").
  44. %% ===================================================================
  45. %% API
  46. %% ===================================================================
  47. -spec start_link() -> {ok, pid()} | {error, any()}.
  48. start_link() ->
  49. Options = [],
  50. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  51. -spec find_by_key(Key :: any()) -> pid() | undefined.
  52. find_by_key(Key) ->
  53. case i_find_by_key(on_connected_node, Key) of
  54. undefined -> undefined;
  55. Process -> Process#syn_registry_table.pid
  56. end.
  57. -spec find_by_key(Key :: any(), with_meta) -> {pid(), Meta :: any()} | undefined.
  58. find_by_key(Key, with_meta) ->
  59. case i_find_by_key(on_connected_node, Key) of
  60. undefined -> undefined;
  61. Process -> {Process#syn_registry_table.pid, Process#syn_registry_table.meta}
  62. end.
  63. -spec find_by_pid(Pid :: pid()) -> Key :: any() | undefined.
  64. find_by_pid(Pid) when is_pid(Pid) ->
  65. case i_find_by_pid(on_connected_node, Pid) of
  66. undefined -> undefined;
  67. Process -> Process#syn_registry_table.key
  68. end.
  69. -spec find_by_pid(Pid :: pid(), with_meta) -> {Key :: any(), Meta :: any()} | undefined.
  70. find_by_pid(Pid, with_meta) when is_pid(Pid) ->
  71. case i_find_by_pid(on_connected_node, Pid) of
  72. undefined -> undefined;
  73. Process -> {Process#syn_registry_table.key, Process#syn_registry_table.meta}
  74. end.
  75. -spec register(Key :: any(), Pid :: pid()) -> ok | {error, taken | pid_already_registered}.
  76. register(Key, Pid) when is_pid(Pid) ->
  77. register(Key, Pid, undefined).
  78. -spec register(Key :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, taken | pid_already_registered}.
  79. register(Key, Pid, Meta) when is_pid(Pid) ->
  80. Node = node(Pid),
  81. gen_server:call({?MODULE, Node}, {register_on_node, Key, Pid, Meta}).
  82. -spec unregister(Key :: any()) -> ok | {error, undefined}.
  83. unregister(Key) ->
  84. case i_find_by_key(Key) of
  85. undefined ->
  86. {error, undefined};
  87. Process ->
  88. Node = node(Process#syn_registry_table.pid),
  89. gen_server:call({?MODULE, Node}, {unregister_on_node, Key})
  90. end.
  91. -spec count() -> non_neg_integer().
  92. count() ->
  93. mnesia:table_info(syn_registry_table, size).
  94. -spec count(Node :: atom()) -> non_neg_integer().
  95. count(Node) ->
  96. %% build match specs
  97. MatchHead = #syn_registry_table{node = '$2', _ = '_'},
  98. Guard = {'=:=', '$2', Node},
  99. Result = '$2',
  100. %% select
  101. Processes = mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [Result]}]),
  102. length(Processes).
  103. %% ===================================================================
  104. %% Callbacks
  105. %% ===================================================================
  106. %% ----------------------------------------------------------------------------------------------------------
  107. %% Init
  108. %% ----------------------------------------------------------------------------------------------------------
  109. -spec init([]) ->
  110. {ok, #state{}} |
  111. {ok, #state{}, Timeout :: non_neg_integer()} |
  112. ignore |
  113. {stop, Reason :: any()}.
  114. init([]) ->
  115. %% trap linked processes signal
  116. process_flag(trap_exit, true),
  117. %% get options
  118. {ok, [ProcessExitCallbackModule, ProcessExitCallbackFunction]} = syn_utils:get_env_value(
  119. registry_process_exit_callback,
  120. [undefined, undefined]
  121. ),
  122. %% build state
  123. {ok, #state{
  124. registry_process_exit_callback_module = ProcessExitCallbackModule,
  125. registry_process_exit_callback_function = ProcessExitCallbackFunction
  126. }}.
  127. %% ----------------------------------------------------------------------------------------------------------
  128. %% Call messages
  129. %% ----------------------------------------------------------------------------------------------------------
  130. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  131. {reply, Reply :: any(), #state{}} |
  132. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  133. {noreply, #state{}} |
  134. {noreply, #state{}, Timeout :: non_neg_integer()} |
  135. {stop, Reason :: any(), Reply :: any(), #state{}} |
  136. {stop, Reason :: any(), #state{}}.
  137. handle_call({register_on_node, Key, Pid, Meta}, _From, State) ->
  138. %% check & register in gen_server process to ensure atomicity at node level without transaction lock
  139. %% atomicity is obviously not at cluster level, which is covered by syn_consistency.
  140. case i_find_by_key(Key) of
  141. undefined ->
  142. case i_find_by_pid(Pid) of
  143. undefined ->
  144. %% add to table
  145. mnesia:dirty_write(#syn_registry_table{
  146. key = Key,
  147. pid = Pid,
  148. node = node(),
  149. meta = Meta
  150. }),
  151. %% link
  152. erlang:link(Pid),
  153. %% return
  154. {reply, ok, State};
  155. _ ->
  156. {reply, {error, pid_already_registered}, State}
  157. end;
  158. _ ->
  159. {reply, {error, taken}, State}
  160. end;
  161. handle_call({unregister_on_node, Key}, _From, State) ->
  162. %% we check again for key to return the correct response regardless of race conditions
  163. case i_find_by_key(Key) of
  164. undefined ->
  165. {reply, {error, undefined}, State};
  166. Process ->
  167. %% remove from table
  168. remove_process_by_key(Key),
  169. %% unlink
  170. Pid = Process#syn_registry_table.pid,
  171. erlang:unlink(Pid),
  172. %% reply
  173. {reply, ok, State}
  174. end;
  175. handle_call({unlink_process, Pid}, _From, State) ->
  176. erlang:unlink(Pid),
  177. {reply, ok, State};
  178. handle_call(Request, From, State) ->
  179. error_logger:warning_msg("Received from ~p an unknown call message: ~p", [Request, From]),
  180. {reply, undefined, State}.
  181. %% ----------------------------------------------------------------------------------------------------------
  182. %% Cast messages
  183. %% ----------------------------------------------------------------------------------------------------------
  184. -spec handle_cast(Msg :: any(), #state{}) ->
  185. {noreply, #state{}} |
  186. {noreply, #state{}, Timeout :: non_neg_integer()} |
  187. {stop, Reason :: any(), #state{}}.
  188. handle_cast(Msg, State) ->
  189. error_logger:warning_msg("Received an unknown cast message: ~p", [Msg]),
  190. {noreply, State}.
  191. %% ----------------------------------------------------------------------------------------------------------
  192. %% All non Call / Cast messages
  193. %% ----------------------------------------------------------------------------------------------------------
  194. -spec handle_info(Info :: any(), #state{}) ->
  195. {noreply, #state{}} |
  196. {noreply, #state{}, Timeout :: non_neg_integer()} |
  197. {stop, Reason :: any(), #state{}}.
  198. handle_info({'EXIT', Pid, Reason}, #state{
  199. registry_process_exit_callback_module = ProcessExitCallbackModule,
  200. registry_process_exit_callback_function = ProcessExitCallbackFunction
  201. } = State) ->
  202. %% do not lock backbone
  203. spawn(fun() ->
  204. %% check if pid is in table
  205. {Key, Meta} = case i_find_by_pid(Pid) of
  206. undefined ->
  207. %% log
  208. case Reason of
  209. normal -> ok;
  210. killed -> ok;
  211. _ ->
  212. error_logger:error_msg("Received an exit message from an unlinked process ~p with reason: ~p", [Pid, Reason])
  213. end,
  214. %% return
  215. {undefined, undefined};
  216. Process ->
  217. %% get process info
  218. Key0 = Process#syn_registry_table.key,
  219. Meta0 = Process#syn_registry_table.meta,
  220. %% log
  221. case Reason of
  222. normal -> ok;
  223. killed -> ok;
  224. _ ->
  225. error_logger:error_msg("Process with key ~p and pid ~p exited with reason: ~p", [Key0, Pid, Reason])
  226. end,
  227. %% delete from table
  228. remove_process_by_key(Key0),
  229. %% return
  230. {Key0, Meta0}
  231. end,
  232. %% callback
  233. case ProcessExitCallbackModule of
  234. undefined -> ok;
  235. _ -> ProcessExitCallbackModule:ProcessExitCallbackFunction(Key, Pid, Meta, Reason)
  236. end
  237. end),
  238. %% return
  239. {noreply, State};
  240. handle_info(Info, State) ->
  241. error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
  242. {noreply, State}.
  243. %% ----------------------------------------------------------------------------------------------------------
  244. %% Terminate
  245. %% ----------------------------------------------------------------------------------------------------------
  246. -spec terminate(Reason :: any(), #state{}) -> terminated.
  247. terminate(Reason, _State) ->
  248. error_logger:info_msg("Terminating syn_registry with reason: ~p", [Reason]),
  249. terminated.
  250. %% ----------------------------------------------------------------------------------------------------------
  251. %% Convert process state when code is changed.
  252. %% ----------------------------------------------------------------------------------------------------------
  253. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  254. code_change(_OldVsn, State, _Extra) ->
  255. {ok, State}.
  256. %% ===================================================================
  257. %% Internal
  258. %% ===================================================================
  259. -spec i_find_by_key(on_connected_node, Key :: any()) -> Process :: #syn_registry_table{} | undefined.
  260. i_find_by_key(on_connected_node, Key) ->
  261. case i_find_by_key(Key) of
  262. undefined -> undefined;
  263. Process -> return_if_on_connected_node(Process)
  264. end.
  265. -spec i_find_by_key(Key :: any()) -> Process :: #syn_registry_table{} | undefined.
  266. i_find_by_key(Key) ->
  267. case mnesia:dirty_read(syn_registry_table, Key) of
  268. [Process] -> Process;
  269. _ -> undefined
  270. end.
  271. -spec i_find_by_pid(on_connected_node, Pid :: pid()) -> Process :: #syn_registry_table{} | undefined.
  272. i_find_by_pid(on_connected_node, Pid) ->
  273. case i_find_by_pid(Pid) of
  274. undefined -> undefined;
  275. Process -> return_if_on_connected_node(Process)
  276. end.
  277. -spec i_find_by_pid(Pid :: pid()) -> Process :: #syn_registry_table{} | undefined.
  278. i_find_by_pid(Pid) ->
  279. case mnesia:dirty_index_read(syn_registry_table, Pid, #syn_registry_table.pid) of
  280. [Process] -> Process;
  281. _ -> undefined
  282. end.
  283. -spec return_if_on_connected_node(Process :: #syn_registry_table{}) -> Process :: #syn_registry_table{} | undefined.
  284. return_if_on_connected_node(Process) ->
  285. case lists:member(Process#syn_registry_table.node, [node() | nodes()]) of
  286. true -> Process;
  287. _ -> undefined
  288. end.
  289. -spec remove_process_by_key(Key :: any()) -> ok.
  290. remove_process_by_key(Key) ->
  291. mnesia:dirty_delete(syn_registry_table, Key).