syn_backbone.erl 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. %% ==========================================================================================================
  2. %% Syn - A global process registry.
  3. %%
  4. %% Copyright (C) 2015, Roberto Ostinelli <roberto@ostinelli.net>.
  5. %% All rights reserved.
  6. %%
  7. %% The MIT License (MIT)
  8. %%
  9. %% Copyright (c) 2015 Roberto Ostinelli
  10. %%
  11. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  12. %% of this software and associated documentation files (the "Software"), to deal
  13. %% in the Software without restriction, including without limitation the rights
  14. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15. %% copies of the Software, and to permit persons to whom the Software is
  16. %% furnished to do so, subject to the following conditions:
  17. %%
  18. %% The above copyright notice and this permission notice shall be included in
  19. %% all copies or substantial portions of the Software.
  20. %%
  21. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  24. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27. %% THE SOFTWARE.
  28. %% ==========================================================================================================
  29. -module(syn_backbone).
  30. -behaviour(gen_server).
  31. %% API
  32. -export([start_link/0]).
  33. -export([initdb/0]).
  34. -export([register/2, register/3]).
  35. -export([unregister/1]).
  36. -export([find_by_key/1, find_by_key/2]).
  37. -export([find_by_pid/1, find_by_pid/2]).
  38. -export([count/0, count/1]).
  39. %% gen_server callbacks
  40. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  41. %% records
  -record(state, {
      %% module of the process-exit callback; 'undefined' means no callback is invoked
      process_exit_callback_module = undefined :: atom(),
      %% function (in the module above) called when a registered process exits
      process_exit_callback_function = undefined :: atom()
  }).
  46. %% include
  47. -include("syn.hrl").
  48. %% ===================================================================
  49. %% API
  50. %% ===================================================================
  %% Starts the backbone gen_server linked to the caller and registers it
  %% locally under the module name. No start arguments or options are passed.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  %% Public entry point for preparing the mnesia table used by the registry.
  %% All the work is delegated to initdb_do/0.
  -spec initdb() -> ok | {error, any()}.
  initdb() ->
      initdb_do().
  %% Looks up the pid registered under Key.
  %% Returns 'undefined' when the internal lookup finds no usable entry.
  -spec find_by_key(Key :: any()) -> pid() | undefined.
  find_by_key(Key) ->
      case i_find_by_key(Key) of
          #syn_processes_table{pid = Pid} -> Pid;
          undefined -> undefined
      end.
  %% Same lookup as find_by_key/1, but also returns the metadata stored
  %% alongside the pid. The second argument must be the atom 'with_meta'.
  -spec find_by_key(Key :: any(), with_meta) -> {pid(), Meta :: any()} | undefined.
  find_by_key(Key, with_meta) ->
      case i_find_by_key(Key) of
          #syn_processes_table{pid = Pid, meta = Meta} -> {Pid, Meta};
          undefined -> undefined
      end.
  %% Reverse lookup: returns the key under which Pid is registered,
  %% or 'undefined' when the internal lookup finds no usable entry.
  -spec find_by_pid(Pid :: pid()) -> Key :: any() | undefined.
  find_by_pid(Pid) ->
      case i_find_by_pid(Pid) of
          #syn_processes_table{key = Key} -> Key;
          undefined -> undefined
      end.
  %% Same reverse lookup as find_by_pid/1, but also returns the stored
  %% metadata. The second argument must be the atom 'with_meta'.
  -spec find_by_pid(Pid :: pid(), with_meta) -> {Key :: any(), Meta :: any()} | undefined.
  find_by_pid(Pid, with_meta) ->
      case i_find_by_pid(Pid) of
          #syn_processes_table{key = Key, meta = Meta} -> {Key, Meta};
          undefined -> undefined
      end.
  %% Registers Pid under Key without metadata.
  %% Shorthand for register/3 with Meta set to 'undefined'.
  -spec register(Key :: any(), Pid :: pid()) -> ok | {error, taken | pid_already_registered}.
  register(Key, Pid) ->
      register(Key, Pid, undefined).
  %% Registers Pid under Key together with Meta.
  %%
  %% The request is sent to the backbone process running on the node that owns
  %% Pid, so the check-and-write happens there.
  %%
  %% Returns:
  %%   ok                              - the entry was written
  %%   {error, taken}                  - Key is already registered
  %%   {error, pid_already_registered} - Pid is already registered under a key
  %%
  %% The spec lists both error reasons, matching what the register_on_node
  %% handler replies and what register/2 already declares.
  -spec register(Key :: any(), Pid :: pid(), Meta :: any()) ->
      ok | {error, taken | pid_already_registered}.
  register(Key, Pid, Meta) ->
      Node = node(Pid),
      gen_server:call({?MODULE, Node}, {register_on_node, Key, Pid, Meta}).
  %% Removes the registration for Key.
  %% Returns {error, undefined} when Key is not found; otherwise the removal is
  %% requested from the backbone process on the node that owns the pid.
  -spec unregister(Key :: any()) -> ok | {error, undefined}.
  unregister(Key) ->
      case find_by_key(Key) of
          undefined ->
              {error, undefined};
          Pid ->
              gen_server:call({?MODULE, node(Pid)}, {unregister_on_node, Key, Pid})
      end.
  %% Total number of entries in the processes table, as reported by mnesia.
  -spec count() -> non_neg_integer().
  count() ->
      mnesia:table_info(syn_processes_table, size).
  %% Number of table entries whose node field equals Node.
  %% Selects one element per matching entry and counts the resulting list.
  -spec count(Node :: atom()) -> non_neg_integer().
  count(Node) ->
      Pattern = #syn_processes_table{node = '$1', _ = '_'},
      MatchSpec = [{Pattern, [{'=:=', '$1', Node}], ['$1']}],
      length(mnesia:dirty_select(syn_processes_table, MatchSpec)).
  110. %% ===================================================================
  111. %% Callbacks
  112. %% ===================================================================
  113. %% ----------------------------------------------------------------------------------------------------------
  114. %% Init
  115. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server init callback.
  %% Enables exit trapping so that exits of linked (registered) processes arrive
  %% as 'EXIT' messages, then reads the process_exit_callback setting
  %% (defaulting to [undefined, undefined]) into the server state.
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      process_flag(trap_exit, true),
      {ok, [CallbackModule, CallbackFunction]} =
          syn_utils:get_env_value(process_exit_callback, [undefined, undefined]),
      InitialState = #state{
          process_exit_callback_module = CallbackModule,
          process_exit_callback_function = CallbackFunction
      },
      {ok, InitialState}.
  134. %% ----------------------------------------------------------------------------------------------------------
  135. %% Call messages
  136. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server call callback. Handled requests:
  %%
  %%   {register_on_node, Key, Pid, Meta}
  %%       Writes a new entry for Key/Pid and links to Pid. Replies 'ok',
  %%       {error, taken} when Key is already registered, or
  %%       {error, pid_already_registered} when Pid is already registered.
  %%   {unregister_on_node, Key, Pid}
  %%       Deletes the entry for Key and unlinks from Pid. Always replies 'ok'.
  %%   {unlink_process, Pid}
  %%       Unlinks from Pid. Always replies 'ok'.
  %%
  %% Any other request is logged as a warning and answered with 'undefined'.
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call({register_on_node, Key, Pid, Meta}, _From, State) ->
      %% The lookups and the write all run inside this one server process, which
      %% makes check-and-register atomic on this node without a transaction lock.
      %% This gives no cluster-wide atomicity; the original note states that is
      %% covered by syn_consistency.
      case i_find_by_key(Key) of
          undefined ->
              case i_find_by_pid(Pid) of
                  undefined ->
                      %% add to table
                      mnesia:dirty_write(#syn_processes_table{
                          key = Key,
                          pid = Pid,
                          node = node(),
                          meta = Meta
                      }),
                      %% link, so the exit of Pid is delivered to handle_info/2
                      erlang:link(Pid),
                      {reply, ok, State};
                  _ ->
                      {reply, {error, pid_already_registered}, State}
              end;
          _ ->
              {reply, {error, taken}, State}
      end;
  handle_call({unregister_on_node, Key, Pid}, _From, State) ->
      remove_process_by_key(Key),
      erlang:unlink(Pid),
      {reply, ok, State};
  handle_call({unlink_process, Pid}, _From, State) ->
      erlang:unlink(Pid),
      {reply, ok, State};
  handle_call(Request, From, State) ->
      %% argument order follows the format string: sender first, then the message
      error_logger:warning_msg("Received from ~p an unknown call message: ~p", [From, Request]),
      {reply, undefined, State}.
  178. %% ----------------------------------------------------------------------------------------------------------
  179. %% Cast messages
  180. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server cast callback.
  %% No cast messages are part of the protocol: every cast is logged as a
  %% warning and the state is returned unchanged.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast(Msg, State) ->
      error_logger:warning_msg("Received an unknown cast message: ~p", [Msg]),
      {noreply, State}.
  188. %% ----------------------------------------------------------------------------------------------------------
  189. %% All non Call / Cast messages
  190. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server info callback.
  %%
  %% {'EXIT', Pid, Reason}: a process exited. The clean-up runs in a separate
  %% process so the backbone is not blocked. If Pid is found in the table its
  %% entry is deleted, the exit is logged (unless the reason is 'normal' or
  %% 'killed') and the configured exit callback, if any, is called with
  %% (Key, Pid, Meta, Reason). If Pid is not found, only a warning is logged
  %% (again skipped for 'normal' and 'killed').
  %%
  %% Any other message is logged as a warning and otherwise ignored.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_info({'EXIT', Pid, Reason}, #state{
      process_exit_callback_module = CallbackModule,
      process_exit_callback_function = CallbackFunction
  } = State) ->
      %% The worker is deliberately not linked: this server traps exits, so a
      %% linked worker's own exit would be delivered back to this clause.
      spawn(fun() ->
          IsQuietReason = lists:member(Reason, [normal, killed]),
          case find_by_pid(Pid, with_meta) of
              {Key, Meta} ->
                  %% delete from table
                  remove_process_by_key(Key),
                  %% log
                  IsQuietReason orelse
                      error_logger:error_msg("Process with key ~p exited with reason: ~p", [Key, Reason]),
                  %% callback
                  case CallbackModule of
                      undefined -> ok;
                      _ -> CallbackModule:CallbackFunction(Key, Pid, Meta, Reason)
                  end;
              undefined ->
                  IsQuietReason orelse
                      error_logger:warning_msg("Received an exit message from an unlinked process ~p with reason: ~p", [Pid, Reason])
          end
      end),
      {noreply, State};
  handle_info(Info, State) ->
      error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
      {noreply, State}.
  232. %% ----------------------------------------------------------------------------------------------------------
  233. %% Terminate
  234. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server terminate callback.
  %% Logs the termination reason and returns the atom 'terminated'.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(Reason, _State) ->
      error_logger:info_msg("Terminating syn backbone with reason: ~p", [Reason]),
      terminated.
  239. %% ----------------------------------------------------------------------------------------------------------
  240. %% Convert process state when code is changed.
  241. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server code_change callback.
  %% The state needs no conversion, so it is handed back unchanged.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_OldVsn, State, _Extra) ->
      {ok, State}.
  245. %% ===================================================================
  246. %% Internal
  247. %% ===================================================================
  %% Ensures the processes table exists and that this node holds a copy of it.
  %%
  %% Adds this node and all connected nodes as extra mnesia db nodes, then tries
  %% to create the table as a RAM-only set indexed on the pid field. If mnesia
  %% reports that the table already exists, a copy is added to the current node
  %% instead. Any other result is logged and returned as {error, Result}.
  -spec initdb_do() -> ok | {error, any()}.
  initdb_do() ->
      ThisNode = node(),
      AllNodes = [ThisNode | nodes()],
      %% ensure all nodes are added
      mnesia:change_config(extra_db_nodes, AllNodes),
      TableOptions = [
          {type, set},
          {ram_copies, AllNodes},
          {attributes, record_info(fields, syn_processes_table)},
          {index, [#syn_processes_table.pid]},
          {storage_properties, [{ets, [{read_concurrency, true}]}]}
      ],
      case mnesia:create_table(syn_processes_table, TableOptions) of
          {atomic, ok} ->
              error_logger:info_msg("syn_processes_table was successfully created"),
              ok;
          {aborted, {already_exists, syn_processes_table}} ->
              %% table is already there: make sure this node has a copy
              add_table_copy_to_current_node();
          {aborted, {already_exists, syn_processes_table, ThisNode}} ->
              %% same situation, reported with the node included
              add_table_copy_to_current_node();
          Unexpected ->
              error_logger:error_msg("Error while creating syn_processes_table: ~p", [Unexpected]),
              {error, Unexpected}
      end.
  %% Adds a RAM copy of the processes table to the current node.
  %%
  %% Waits up to 10 seconds for the table first; the outcome of that wait is
  %% not inspected. An "already exists" answer from mnesia counts as success.
  %% Any other abort reason is logged and returned as {error, Reason}.
  -spec add_table_copy_to_current_node() -> ok | {error, any()}.
  add_table_copy_to_current_node() ->
      mnesia:wait_for_tables([syn_processes_table], 10000),
      ThisNode = node(),
      CopyResult = mnesia:add_table_copy(syn_processes_table, ThisNode, ram_copies),
      case CopyResult of
          {atomic, ok} ->
              error_logger:info_msg("Copy of syn_processes_table was successfully added to current node"),
              ok;
          {aborted, {already_exists, syn_processes_table}} ->
              error_logger:info_msg("Copy of syn_processes_table is already added to current node"),
              ok;
          {aborted, {already_exists, syn_processes_table, ThisNode}} ->
              error_logger:info_msg("Copy of syn_processes_table is already added to current node"),
              ok;
          {aborted, Reason} ->
              error_logger:error_msg("Error while creating copy of syn_processes_table: ~p", [Reason]),
              {error, Reason}
      end.
  %% Dirty-reads the table entry stored under Key.
  %% A single hit is returned only if its node is currently connected; any
  %% other read result yields 'undefined'.
  -spec i_find_by_key(Key :: any()) -> Process :: #syn_processes_table{} | undefined.
  i_find_by_key(Key) ->
      Rows = mnesia:dirty_read(syn_processes_table, Key),
      case Rows of
          [Process] -> return_if_on_connected_node(Process);
          _ -> undefined
      end.
  %% Dirty-reads the table entry for Pid through the index on the pid field.
  %% A single hit is returned only if its node is currently connected; any
  %% other read result yields 'undefined'.
  -spec i_find_by_pid(Pid :: pid()) -> Process :: #syn_processes_table{} | undefined.
  i_find_by_pid(Pid) ->
      Rows = mnesia:dirty_index_read(syn_processes_table, Pid, #syn_processes_table.pid),
      case Rows of
          [Process] -> return_if_on_connected_node(Process);
          _ -> undefined
      end.
  %% Filters out entries that belong to nodes which are not reachable.
  %% Returns Process when its node is the local node or one of the currently
  %% connected nodes, and 'undefined' otherwise.
  -spec return_if_on_connected_node(Process :: #syn_processes_table{}) -> Process :: #syn_processes_table{} | undefined.
  return_if_on_connected_node(Process) ->
      OwnerNode = Process#syn_processes_table.node,
      ReachableNodes = [node() | nodes()],
      case lists:member(OwnerNode, ReachableNodes) of
          true -> Process;
          false -> undefined
      end.
  %% Deletes the table entry stored under Key with a dirty (non-transactional) delete.
  -spec remove_process_by_key(Key :: any()) -> ok.
  remove_process_by_key(Key) ->
      mnesia:dirty_delete(syn_processes_table, Key).