syn_backbone.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. %% ==========================================================================================================
  2. %% Syn - A global process registry.
  3. %%
  4. %% Copyright (C) 2015, Roberto Ostinelli <roberto@ostinelli.net>.
  5. %% All rights reserved.
  6. %%
  7. %% The MIT License (MIT)
  8. %%
  9. %% Copyright (c) 2015 Roberto Ostinelli
  10. %%
  11. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  12. %% of this software and associated documentation files (the "Software"), to deal
  13. %% in the Software without restriction, including without limitation the rights
  14. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15. %% copies of the Software, and to permit persons to whom the Software is
  16. %% furnished to do so, subject to the following conditions:
  17. %%
  18. %% The above copyright notice and this permission notice shall be included in
  19. %% all copies or substantial portions of the Software.
  20. %%
  21. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  24. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27. %% THE SOFTWARE.
  28. %% ==========================================================================================================
  29. -module(syn_backbone).
  30. -behaviour(gen_server).
  31. %% API
  32. -export([start_link/0]).
  33. -export([register/2, unregister/1]).
  34. -export([find_by_key/1, find_by_pid/1]).
  35. -export([count/0, count/1]).
  36. %% gen_server callbacks
  37. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  38. %% records
  39. -record(state, {}).
  40. %% include
  41. -include("syn.hrl").
  42. %% ===================================================================
  43. %% API
  44. %% ===================================================================
  45. -spec start_link() -> {ok, pid()} | {error, any()}.
  46. start_link() ->
  47. Options = [],
  48. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  49. -spec find_by_key(Key :: any()) -> pid() | undefined.
  50. find_by_key(Key) ->
  51. case mnesia:dirty_read(syn_processes_table, Key) of
  52. [Process] -> return_pid_if_on_connected_node(Process);
  53. _ -> undefined
  54. end.
  55. -spec find_by_pid(Pid :: pid()) -> Key :: any() | undefined.
  56. find_by_pid(Pid) ->
  57. case mnesia:dirty_index_read(syn_processes_table, Pid, #syn_processes_table.pid) of
  58. [Process] -> return_key_if_on_connected_node(Process);
  59. _ -> undefined
  60. end.
  61. -spec register(Key :: any(), Pid :: pid()) -> ok | {error, taken}.
  62. register(Key, Pid) ->
  63. case find_by_key(Key) of
  64. undefined ->
  65. %% get processes's node
  66. Node = node(Pid),
  67. %% add to table
  68. mnesia:dirty_write(#syn_processes_table{
  69. key = Key,
  70. pid = Pid,
  71. node = Node
  72. }),
  73. %% link
  74. gen_server:call({?MODULE, Node}, {link_process, Pid});
  75. _ ->
  76. {error, taken}
  77. end.
  78. -spec unregister(Key :: any()) -> ok | {error, undefined}.
  79. unregister(Key) ->
  80. case find_by_key(Key) of
  81. undefined ->
  82. {error, undefined};
  83. Pid ->
  84. remove_process_by_key(Key),
  85. %% unlink
  86. gen_server:call(?MODULE, {unlink_process, Pid})
  87. end.
  88. -spec count() -> non_neg_integer().
  89. count() ->
  90. mnesia:table_info(syn_processes_table, size).
  91. -spec count(Node :: atom()) -> non_neg_integer().
  92. count(Node) ->
  93. %% build match specs
  94. MatchHead = #syn_processes_table{node = '$2', _ = '_'},
  95. Guard = {'=:=', '$2', Node},
  96. Result = '$2',
  97. %% select
  98. Processes = mnesia:dirty_select(syn_processes_table, [{MatchHead, [Guard], [Result]}]),
  99. length(Processes).
  100. %% ===================================================================
  101. %% Callbacks
  102. %% ===================================================================
  103. %% ----------------------------------------------------------------------------------------------------------
  104. %% Init
  105. %% ----------------------------------------------------------------------------------------------------------
  106. -spec init([]) ->
  107. {ok, #state{}} |
  108. {ok, #state{}, Timeout :: non_neg_integer()} |
  109. ignore |
  110. {stop, Reason :: any()}.
  111. init([]) ->
  112. %% trap linked processes signal
  113. process_flag(trap_exit, true),
  114. %% init
  115. case initdb() of
  116. ok ->
  117. {ok, #state{}};
  118. Other ->
  119. {stop, Other}
  120. end.
  121. %% ----------------------------------------------------------------------------------------------------------
  122. %% Call messages
  123. %% ----------------------------------------------------------------------------------------------------------
  124. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  125. {reply, Reply :: any(), #state{}} |
  126. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  127. {noreply, #state{}} |
  128. {noreply, #state{}, Timeout :: non_neg_integer()} |
  129. {stop, Reason :: any(), Reply :: any(), #state{}} |
  130. {stop, Reason :: any(), #state{}}.
  131. handle_call({link_process, Pid}, _From, State) ->
  132. erlang:link(Pid),
  133. {reply, ok, State};
  134. handle_call({unlink_process, Pid}, _From, State) ->
  135. erlang:unlink(Pid),
  136. {reply, ok, State};
  137. handle_call(Request, From, State) ->
  138. error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [Request, From]),
  139. {reply, undefined, State}.
  140. %% ----------------------------------------------------------------------------------------------------------
  141. %% Cast messages
  142. %% ----------------------------------------------------------------------------------------------------------
  143. -spec handle_cast(Msg :: any(), #state{}) ->
  144. {noreply, #state{}} |
  145. {noreply, #state{}, Timeout :: non_neg_integer()} |
  146. {stop, Reason :: any(), #state{}}.
  147. handle_cast(Msg, State) ->
  148. error_logger:warning_msg("Received an unknown cast message: ~p~n", [Msg]),
  149. {noreply, State}.
  150. %% ----------------------------------------------------------------------------------------------------------
  151. %% All non Call / Cast messages
  152. %% ----------------------------------------------------------------------------------------------------------
  153. -spec handle_info(Info :: any(), #state{}) ->
  154. {noreply, #state{}} |
  155. {noreply, #state{}, Timeout :: non_neg_integer()} |
  156. {stop, Reason :: any(), #state{}}.
  157. handle_info({'EXIT', Pid, Reason}, State) ->
  158. %% do not lock backbone
  159. spawn(fun() ->
  160. %% check if pid is in table
  161. case find_by_pid(Pid) of
  162. undefined ->
  163. case Reason of
  164. normal -> ok;
  165. killed -> ok;
  166. _ ->
  167. error_logger:warning_msg("Received a crash message from an unlinked process ~p with reason: ~p~n", [Pid, Reason])
  168. end;
  169. Key ->
  170. %% delete from table
  171. remove_process_by_key(Key),
  172. %% log
  173. case Reason of
  174. normal -> ok;
  175. killed -> ok;
  176. _ -> error_logger:error_msg("Process with key ~p crashed with reason: ~p~n", [Key, Reason])
  177. end
  178. end
  179. end),
  180. %% return
  181. {noreply, State};
  182. handle_info(Info, State) ->
  183. error_logger:warning_msg("Received an unknown info message: ~p~n", [Info]),
  184. {noreply, State}.
  185. %% ----------------------------------------------------------------------------------------------------------
  186. %% Terminate
  187. %% ----------------------------------------------------------------------------------------------------------
  188. -spec terminate(Reason :: any(), #state{}) -> terminated.
  189. terminate(Reason, _State) ->
  190. error_logger:info_msg("Terminating syn backbone with reason: ~p~n", [Reason]),
  191. terminated.
  192. %% ----------------------------------------------------------------------------------------------------------
  193. %% Convert process state when code is changed.
  194. %% ----------------------------------------------------------------------------------------------------------
  195. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  196. code_change(_OldVsn, State, _Extra) ->
  197. {ok, State}.
  198. %% ===================================================================
  199. %% Internal
  200. %% ===================================================================
  201. -spec initdb() -> ok | {error, any()}.
  202. initdb() ->
  203. %% ensure all nodes are added - this covers when mnesia is in ram only mode
  204. mnesia:change_config(extra_db_nodes, [node() | nodes()]),
  205. %% ensure table exists
  206. CurrentNode = node(),
  207. case mnesia:create_table(syn_processes_table, [
  208. {type, set},
  209. {ram_copies, [node() | nodes()]},
  210. {attributes, record_info(fields, syn_processes_table)},
  211. {index, [#syn_processes_table.pid]},
  212. {storage_properties, [{ets, [{read_concurrency, true}]}]}
  213. ]) of
  214. {atomic, ok} ->
  215. error_logger:info_msg("syn_processes_table was successfully created.~n"),
  216. ok;
  217. {aborted, {already_exists, syn_processes_table}} ->
  218. %% table already exists, try to add current node as copy
  219. add_table_copy_to_local_node();
  220. {aborted, {already_exists, syn_processes_table, CurrentNode}} ->
  221. %% table already exists, try to add current node as copy
  222. add_table_copy_to_local_node();
  223. Other ->
  224. error_logger:error_msg("Error while creating syn_processes_table: ~p~n", [Other]),
  225. {error, Other}
  226. end.
  227. -spec add_table_copy_to_local_node() -> ok | {error, any()}.
  228. add_table_copy_to_local_node() ->
  229. CurrentNode = node(),
  230. case mnesia:add_table_copy(syn_processes_table, node(), ram_copies) of
  231. {atomic, ok} ->
  232. error_logger:info_msg("Copy of syn_processes_table was successfully added to current node.~n"),
  233. ok;
  234. {aborted, {already_exists, syn_processes_table}} ->
  235. error_logger:info_msg("Copy of syn_processes_table is already added to current node.~n"),
  236. ok;
  237. {aborted, {already_exists, syn_processes_table, CurrentNode}} ->
  238. error_logger:info_msg("Copy of syn_processes_table is already added to current node.~n"),
  239. ok;
  240. {aborted, Reason} ->
  241. error_logger:error_msg("Error while creating copy of syn_processes_table: ~p~n", [Reason]),
  242. {error, Reason}
  243. end.
  244. -spec return_pid_if_on_connected_node(Process :: #syn_processes_table{}) -> pid() | undefined.
  245. return_pid_if_on_connected_node(Process) ->
  246. case lists:member(Process#syn_processes_table.node, [node() | nodes()]) of
  247. true -> Process#syn_processes_table.pid;
  248. _ -> undefined
  249. end.
  250. -spec return_key_if_on_connected_node(Process :: #syn_processes_table{}) -> pid() | undefined.
  251. return_key_if_on_connected_node(Process) ->
  252. case lists:member(Process#syn_processes_table.node, [node() | nodes()]) of
  253. true -> Process#syn_processes_table.key;
  254. _ -> undefined
  255. end.
  256. -spec remove_process_by_key(Key :: any()) -> ok.
  257. remove_process_by_key(Key) ->
  258. mnesia:dirty_delete(syn_processes_table, Key).