syn_groups.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_groups).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([join/2, join/3]).
  31. -export([leave/2]).
  32. -export([get_members/1, get_members/2]).
  33. -export([member/2]).
  34. %% sync API
  35. -export([sync_join/3, sync_leave/2]).
  36. %% gen_server callbacks
  37. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  38. %% records
  39. -record(state, {}).
  40. %% includes
  41. -include("syn.hrl").
  42. %% ===================================================================
  43. %% API
  44. %% ===================================================================
  45. -spec start_link() -> {ok, pid()} | {error, any()}.
  46. start_link() ->
  47. Options = [],
  48. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  49. -spec join(GroupName :: term(), Pid :: pid()) -> ok.
  50. join(GroupName, Pid) ->
  51. join(GroupName, Pid, undefined).
  52. -spec join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
  53. join(GroupName, Pid, Meta) when is_pid(Pid) ->
  54. Node = node(Pid),
  55. gen_server:call({?MODULE, Node}, {join_on_node, GroupName, Pid, Meta}).
  56. -spec leave(GroupName :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
  57. leave(GroupName, Pid) ->
  58. case find_process_entry_by_name_and_pid(GroupName, Pid) of
  59. undefined ->
  60. {error, not_in_group};
  61. _ ->
  62. Node = node(Pid),
  63. gen_server:call({?MODULE, Node}, {leave_on_node, GroupName, Pid})
  64. end.
  65. -spec get_members(Name :: any()) -> [pid()].
  66. get_members(GroupName) ->
  67. Entries = mnesia:dirty_read(syn_groups_table, GroupName),
  68. Pids = [Entry#syn_groups_table.pid || Entry <- Entries],
  69. lists:sort(Pids).
  70. -spec get_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
  71. get_members(GroupName, with_meta) ->
  72. Entries = mnesia:dirty_read(syn_groups_table, GroupName),
  73. Pids = [{Entry#syn_groups_table.pid, Entry#syn_groups_table.meta} || Entry <- Entries],
  74. lists:sort(Pids).
  75. -spec member(Pid :: pid(), GroupName :: term()) -> boolean().
  76. member(Pid, GroupName) ->
  77. case find_process_entry_by_name_and_pid(GroupName, Pid) of
  78. undefined -> false;
  79. _ -> true
  80. end.
  81. -spec sync_join(GroupName :: term(), Pid :: pid(), Meta :: term()) -> ok.
  82. sync_join(GroupName, Pid, Meta) ->
  83. gen_server:cast(?MODULE, {sync_join, GroupName, Pid, Meta}).
  84. -spec sync_leave(GroupName :: term(), Pid :: pid()) -> ok.
  85. sync_leave(GroupName, Pid) ->
  86. gen_server:cast(?MODULE, {sync_leave, GroupName, Pid}).
  87. %% ===================================================================
  88. %% Callbacks
  89. %% ===================================================================
  90. %% ----------------------------------------------------------------------------------------------------------
  91. %% Init
  92. %% ----------------------------------------------------------------------------------------------------------
  93. -spec init([]) ->
  94. {ok, #state{}} |
  95. {ok, #state{}, Timeout :: non_neg_integer()} |
  96. ignore |
  97. {stop, Reason :: any()}.
  98. init([]) ->
  99. %% wait for table
  100. case mnesia:wait_for_tables([syn_groups_table], 10000) of
  101. ok ->
  102. %% monitor nodes
  103. ok = net_kernel:monitor_nodes(true),
  104. %% init
  105. {ok, #state{}};
  106. Reason ->
  107. {stop, {error_waiting_for_groups_table, Reason}}
  108. end.
  109. %% ----------------------------------------------------------------------------------------------------------
  110. %% Call messages
  111. %% ----------------------------------------------------------------------------------------------------------
  112. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  113. {reply, Reply :: any(), #state{}} |
  114. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  115. {noreply, #state{}} |
  116. {noreply, #state{}, Timeout :: non_neg_integer()} |
  117. {stop, Reason :: any(), Reply :: any(), #state{}} |
  118. {stop, Reason :: any(), #state{}}.
  119. handle_call({join_on_node, GroupName, Pid, Meta}, _From, State) ->
  120. join_on_node(GroupName, Pid, Meta),
  121. %% multicast
  122. rpc:eval_everywhere(nodes(), ?MODULE, sync_join, [GroupName, Pid, Meta]),
  123. %% return
  124. {reply, ok, State};
  125. handle_call({leave_on_node, GroupName, Pid}, _From, State) ->
  126. case leave_on_node(GroupName, Pid) of
  127. ok ->
  128. %% multicast
  129. rpc:eval_everywhere(nodes(), ?MODULE, sync_leave, [GroupName, Pid]),
  130. %% return
  131. {reply, ok, State};
  132. {error, Reason} ->
  133. %% return
  134. {reply, {error, Reason}, State}
  135. end;
  136. handle_call(Request, From, State) ->
  137. error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), Request, From]),
  138. {reply, undefined, State}.
  139. %% ----------------------------------------------------------------------------------------------------------
  140. %% Cast messages
  141. %% ----------------------------------------------------------------------------------------------------------
  142. -spec handle_cast(Msg :: any(), #state{}) ->
  143. {noreply, #state{}} |
  144. {noreply, #state{}, Timeout :: non_neg_integer()} |
  145. {stop, Reason :: any(), #state{}}.
  146. handle_cast({sync_join, GroupName, Pid, Meta}, State) ->
  147. %% add to table
  148. add_to_local_table(GroupName, Pid, Meta, undefined),
  149. %% return
  150. {noreply, State};
  151. handle_cast({sync_leave, GroupName, Pid}, State) ->
  152. %% remove entry
  153. remove_from_local_table(GroupName, Pid),
  154. %% return
  155. {noreply, State};
  156. handle_cast(Msg, State) ->
  157. error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
  158. {noreply, State}.
  159. %% ----------------------------------------------------------------------------------------------------------
  160. %% All non Call / Cast messages
  161. %% ----------------------------------------------------------------------------------------------------------
  162. -spec handle_info(Info :: any(), #state{}) ->
  163. {noreply, #state{}} |
  164. {noreply, #state{}, Timeout :: non_neg_integer()} |
  165. {stop, Reason :: any(), #state{}}.
  166. handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
  167. case find_processes_entry_by_pid(Pid) of
  168. [] ->
  169. %% log
  170. log_process_exit(undefined, Pid, Reason);
  171. Entries ->
  172. lists:foreach(fun(Entry) ->
  173. %% get process info
  174. GroupName = Entry#syn_groups_table.name,
  175. %% log
  176. log_process_exit(GroupName, Pid, Reason),
  177. %% remove from table
  178. remove_from_local_table(Entry),
  179. %% multicast
  180. rpc:eval_everywhere(nodes(), ?MODULE, sync_leave, [GroupName, Pid])
  181. end, Entries)
  182. end,
  183. %% return
  184. {noreply, State};
  185. handle_info(Info, State) ->
  186. error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
  187. {noreply, State}.
  188. %% ----------------------------------------------------------------------------------------------------------
  189. %% Terminate
  190. %% ----------------------------------------------------------------------------------------------------------
  191. -spec terminate(Reason :: any(), #state{}) -> terminated.
  192. terminate(Reason, _State) ->
  193. error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", [node(), Reason]),
  194. terminated.
  195. %% ----------------------------------------------------------------------------------------------------------
  196. %% Convert process state when code is changed.
  197. %% ----------------------------------------------------------------------------------------------------------
  198. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  199. code_change(_OldVsn, State, _Extra) ->
  200. {ok, State}.
  201. %% ===================================================================
  202. %% Internal
  203. %% ===================================================================
  204. -spec join_on_node(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
  205. join_on_node(GroupName, Pid, Meta) ->
  206. MonitorRef = case find_processes_entry_by_pid(Pid) of
  207. [] ->
  208. %% process is not monitored yet, add
  209. erlang:monitor(process, Pid);
  210. [Entry | _] ->
  211. Entry#syn_groups_table.monitor_ref
  212. end,
  213. %% add to table
  214. add_to_local_table(GroupName, Pid, Meta, MonitorRef).
  215. -spec leave_on_node(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: term()}.
  216. leave_on_node(GroupName, Pid) ->
  217. case find_process_entry_by_name_and_pid(GroupName, Pid) of
  218. undefined ->
  219. {error, not_in_group};
  220. Entry when Entry#syn_groups_table.monitor_ref =/= undefined ->
  221. %% is this the last group process is in?
  222. case find_processes_entry_by_pid(Pid) of
  223. [Entry] ->
  224. %% demonitor
  225. erlang:demonitor(Entry#syn_groups_table.monitor_ref);
  226. _ ->
  227. ok
  228. end,
  229. %% remove from table
  230. remove_from_local_table(Entry)
  231. end.
  232. -spec add_to_local_table(GroupName :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
  233. add_to_local_table(GroupName, Pid, Meta, MonitorRef) ->
  234. %% clean if any
  235. remove_from_local_table(GroupName, Pid),
  236. %% write
  237. mnesia:dirty_write(#syn_groups_table{
  238. name = GroupName,
  239. pid = Pid,
  240. node = node(Pid),
  241. meta = Meta,
  242. monitor_ref = MonitorRef
  243. }).
  244. -spec remove_from_local_table(GroupName :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
  245. remove_from_local_table(GroupName, Pid) ->
  246. case find_process_entry_by_name_and_pid(GroupName, Pid) of
  247. undefined ->
  248. {error, not_in_group};
  249. Entry ->
  250. %% remove from table
  251. remove_from_local_table(Entry)
  252. end.
  253. -spec remove_from_local_table(Entry :: #syn_groups_table{}) -> ok.
  254. remove_from_local_table(Entry) ->
  255. mnesia:dirty_delete_object(syn_groups_table, Entry).
  256. -spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: list(#syn_groups_table{}).
  257. find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
  258. mnesia:dirty_index_read(syn_groups_table, Pid, #syn_groups_table.pid).
  259. -spec find_process_entry_by_name_and_pid(GroupName :: term(), Pid :: pid()) -> Entry :: #syn_groups_table{} | undefined.
  260. find_process_entry_by_name_and_pid(GroupName, Pid) ->
  261. %% build match specs
  262. MatchHead = #syn_groups_table{name = GroupName, pid = Pid, _ = '_'},
  263. Guards = [],
  264. Result = '$_',
  265. %% select
  266. case mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]) of
  267. [Entry] -> Entry;
  268. [] -> undefined
  269. end.
  270. -spec log_process_exit(Name :: term(), Pid :: pid(), Reason :: term()) -> ok.
  271. log_process_exit(GroupName, Pid, Reason) ->
  272. case Reason of
  273. normal -> ok;
  274. shutdown -> ok;
  275. {shutdown, _} -> ok;
  276. killed -> ok;
  277. _ ->
  278. case GroupName of
  279. undefined ->
  280. error_logger:error_msg(
  281. "Syn(~p): Received a DOWN message from an unmonitored process ~p with reason: ~p~n",
  282. [node(), Pid, Reason]
  283. );
  284. _ ->
  285. error_logger:error_msg(
  286. "Syn(~p): Process in group ~p and pid ~p exited with reason: ~p~n",
  287. [node(), GroupName, Pid, Reason]
  288. )
  289. end
  290. end.