syn_groups.erl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2016 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_groups).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([start_link/0]).
  30. -export([join/2, join/3]).
  31. -export([leave/2]).
  32. -export([member/2]).
  33. -export([get_members/1, get_members/2]).
  34. -export([get_local_members/1, get_local_members/2]).
  35. -export([publish/2]).
  36. -export([publish_to_local/2]).
  37. -export([multi_call/2, multi_call/3]).
  38. -export([multi_call_reply/2]).
  39. %% gen_server callbacks
  40. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  41. %% internal
  42. -export([multi_call_and_receive/4]).
  43. %% records
  44. -record(state, {
  45. process_groups_process_exit_callback_module = undefined :: atom(),
  46. process_groups_process_exit_callback_function = undefined :: atom()
  47. }).
  48. %% macros
  49. -define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
  50. %% include
  51. -include("syn.hrl").
  52. %% ===================================================================
  53. %% API
  54. %% ===================================================================
  55. -spec start_link() -> {ok, pid()} | {error, any()}.
  56. start_link() ->
  57. Options = [],
  58. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  59. -spec join(Name :: any(), Pid :: pid()) -> ok.
  60. join(Name, Pid) ->
  61. join(Name, Pid, undefined).
  62. -spec join(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
  63. join(Name, Pid, Meta) when is_pid(Pid) ->
  64. Node = node(Pid),
  65. gen_server:call({?MODULE, Node}, {join, Name, Pid, Meta}).
  66. -spec leave(Name :: any(), Pid :: pid()) -> ok | {error, pid_not_in_group}.
  67. leave(Name, Pid) when is_pid(Pid) ->
  68. Node = node(Pid),
  69. gen_server:call({?MODULE, Node}, {leave, Name, Pid}).
  70. -spec member(Pid :: pid(), Name :: any()) -> boolean().
  71. member(Pid, Name) when is_pid(Pid) ->
  72. i_member(Pid, Name).
  73. -spec get_members(Name :: any()) -> [pid()].
  74. get_members(Name) ->
  75. i_get_members(Name).
  76. -spec get_members(Name :: any(), with_meta) -> [{pid(), Meta :: any()}].
  77. get_members(Name, with_meta) ->
  78. i_get_members(Name, with_meta).
  79. -spec get_local_members(Name :: any()) -> [pid()].
  80. get_local_members(Name) ->
  81. i_get_local_members(Name).
  82. -spec get_local_members(Name :: any(), with_meta) -> [{pid(), Meta :: any()}].
  83. get_local_members(Name, with_meta) ->
  84. i_get_local_members(Name, with_meta).
  85. -spec publish(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
  86. publish(Name, Message) ->
  87. MemberPids = i_get_members(Name),
  88. FSend = fun(Pid) ->
  89. Pid ! Message
  90. end,
  91. lists:foreach(FSend, MemberPids),
  92. {ok, length(MemberPids)}.
  93. -spec publish_to_local(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
  94. publish_to_local(Name, Message) ->
  95. MemberPids = i_get_local_members(Name),
  96. FSend = fun(Pid) ->
  97. Pid ! Message
  98. end,
  99. lists:foreach(FSend, MemberPids),
  100. {ok, length(MemberPids)}.
  101. -spec multi_call(Name :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  102. multi_call(Name, Message) ->
  103. multi_call(Name, Message, ?DEFAULT_MULTI_CALL_TIMEOUT_MS).
  104. -spec multi_call(Name :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
  105. {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  106. multi_call(Name, Message, Timeout) ->
  107. Self = self(),
  108. MemberPids = i_get_members(Name),
  109. FSend = fun(Pid) ->
  110. spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Message, Timeout])
  111. end,
  112. lists:foreach(FSend, MemberPids),
  113. collect_replies(MemberPids).
  114. -spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
  115. multi_call_reply(CallerPid, Reply) ->
  116. CallerPid ! {syn_multi_call_reply, self(), Reply}.
  117. %% ===================================================================
  118. %% Callbacks
  119. %% ===================================================================
  120. %% ----------------------------------------------------------------------------------------------------------
  121. %% Init
  122. %% ----------------------------------------------------------------------------------------------------------
  123. -spec init([]) ->
  124. {ok, #state{}} |
  125. {ok, #state{}, Timeout :: non_neg_integer()} |
  126. ignore |
  127. {stop, Reason :: any()}.
  128. init([]) ->
  129. %% trap linked processes signal
  130. process_flag(trap_exit, true),
  131. %% get options
  132. {ok, [ProcessExitCallbackModule, ProcessExitCallbackFunction]} = syn_utils:get_env_value(
  133. process_groups_process_exit_callback,
  134. [undefined, undefined]
  135. ),
  136. %% build state
  137. {ok, #state{
  138. process_groups_process_exit_callback_module = ProcessExitCallbackModule,
  139. process_groups_process_exit_callback_function = ProcessExitCallbackFunction
  140. }}.
  141. %% ----------------------------------------------------------------------------------------------------------
  142. %% Call messages
  143. %% ----------------------------------------------------------------------------------------------------------
  144. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  145. {reply, Reply :: any(), #state{}} |
  146. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  147. {noreply, #state{}} |
  148. {noreply, #state{}, Timeout :: non_neg_integer()} |
  149. {stop, Reason :: any(), Reply :: any(), #state{}} |
  150. {stop, Reason :: any(), #state{}}.
  151. handle_call({join, Name, Pid, Meta}, _From, State) ->
  152. %% check if pid is already in group
  153. case find_by_pid_and_name(Pid, Name) of
  154. undefined ->
  155. ok;
  156. Process ->
  157. %% remove old reference
  158. mnesia:dirty_delete_object(Process)
  159. end,
  160. %% add to group
  161. mnesia:dirty_write(#syn_groups_table{
  162. name = Name,
  163. pid = Pid,
  164. node = node(),
  165. meta = Meta
  166. }),
  167. %% link
  168. erlang:link(Pid),
  169. %% return
  170. {reply, ok, State};
  171. handle_call({leave, Name, Pid}, _From, State) ->
  172. case find_by_pid_and_name(Pid, Name) of
  173. undefined ->
  174. {reply, {error, pid_not_in_group}, State};
  175. Process ->
  176. %% remove from table
  177. remove_process(Process),
  178. %% unlink only when process is no more in groups
  179. case find_groups_by_pid(Pid) of
  180. [] -> erlang:unlink(Pid);
  181. _ -> nop
  182. end,
  183. %% reply
  184. {reply, ok, State}
  185. end;
  186. handle_call(Request, From, State) ->
  187. error_logger:warning_msg("Received from ~p an unknown call message: ~p", [Request, From]),
  188. {reply, undefined, State}.
  189. %% ----------------------------------------------------------------------------------------------------------
  190. %% Cast messages
  191. %% ----------------------------------------------------------------------------------------------------------
  192. -spec handle_cast(Msg :: any(), #state{}) ->
  193. {noreply, #state{}} |
  194. {noreply, #state{}, Timeout :: non_neg_integer()} |
  195. {stop, Reason :: any(), #state{}}.
  196. handle_cast(Msg, State) ->
  197. error_logger:warning_msg("Received an unknown cast message: ~p", [Msg]),
  198. {noreply, State}.
  199. %% ----------------------------------------------------------------------------------------------------------
  200. %% All non Call / Cast messages
  201. %% ----------------------------------------------------------------------------------------------------------
  202. -spec handle_info(Info :: any(), #state{}) ->
  203. {noreply, #state{}} |
  204. {noreply, #state{}, Timeout :: non_neg_integer()} |
  205. {stop, Reason :: any(), #state{}}.
  206. handle_info({'EXIT', Pid, Reason}, #state{
  207. process_groups_process_exit_callback_module = ProcessExitCallbackModule,
  208. process_groups_process_exit_callback_function = ProcessExitCallbackFunction
  209. } = State) ->
  210. %% check if pid is in table
  211. case find_groups_by_pid(Pid) of
  212. [] ->
  213. %% log
  214. case Reason of
  215. normal -> ok;
  216. killed -> ok;
  217. _ ->
  218. error_logger:error_msg("Received an exit message from an unlinked process ~p with reason: ~p", [Pid, Reason])
  219. end;
  220. Processes ->
  221. F = fun(Process) ->
  222. %% get group & meta
  223. Name = Process#syn_groups_table.name,
  224. Meta = Process#syn_groups_table.meta,
  225. %% log
  226. case Reason of
  227. normal -> ok;
  228. killed -> ok;
  229. _ ->
  230. error_logger:error_msg("Process of group ~p and pid ~p exited with reason: ~p", [Name, Pid, Reason])
  231. end,
  232. %% delete from table
  233. remove_process(Process),
  234. %% callback in separate process
  235. case ProcessExitCallbackModule of
  236. undefined ->
  237. ok;
  238. _ ->
  239. spawn(fun() ->
  240. ProcessExitCallbackModule:ProcessExitCallbackFunction(Name, Pid, Meta, Reason)
  241. end)
  242. end
  243. end,
  244. lists:foreach(F, Processes)
  245. end,
  246. %% return
  247. {noreply, State};
  248. handle_info(Info, State) ->
  249. error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
  250. {noreply, State}.
  251. %% ----------------------------------------------------------------------------------------------------------
  252. %% Terminate
  253. %% ----------------------------------------------------------------------------------------------------------
  254. -spec terminate(Reason :: any(), #state{}) -> terminated.
  255. terminate(Reason, _State) ->
  256. error_logger:info_msg("Terminating syn_groups with reason: ~p", [Reason]),
  257. terminated.
  258. %% ----------------------------------------------------------------------------------------------------------
  259. %% Convert process state when code is changed.
  260. %% ----------------------------------------------------------------------------------------------------------
  261. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  262. code_change(_OldVsn, State, _Extra) ->
  263. {ok, State}.
  264. %% ===================================================================
  265. %% Internal
  266. %% ===================================================================
  267. -spec find_by_pid_and_name(Pid :: pid(), Name :: any()) -> Process :: #syn_groups_table{} | undefined.
  268. find_by_pid_and_name(Pid, Name) ->
  269. %% build match specs
  270. MatchHead = #syn_groups_table{name = Name, pid = Pid, _ = '_'},
  271. Guards = [],
  272. Result = '$_',
  273. %% select
  274. case mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]) of
  275. [] -> undefined;
  276. [Process] -> Process
  277. end.
  278. -spec i_member(Pid :: pid(), Name :: any()) -> boolean().
  279. i_member(Pid, Name) ->
  280. case find_by_pid_and_name(Pid, Name) of
  281. undefined -> false;
  282. _ -> true
  283. end.
  284. -spec i_get_members(Name :: any()) -> [pid()].
  285. i_get_members(Name) ->
  286. Processes = mnesia:dirty_read(syn_groups_table, Name),
  287. Pids = lists:map(fun(Process) ->
  288. Process#syn_groups_table.pid
  289. end, Processes),
  290. lists:sort(Pids).
  291. -spec i_get_members(Name :: any(), with_meta) -> [{pid(), Meta :: any()}].
  292. i_get_members(Name, with_meta) ->
  293. Processes = mnesia:dirty_read(syn_groups_table, Name),
  294. PidsWithMeta = lists:map(fun(Process) ->
  295. {Process#syn_groups_table.pid, Process#syn_groups_table.meta}
  296. end, Processes),
  297. lists:keysort(1, PidsWithMeta).
  298. -spec i_get_local_members(Name :: any()) -> [pid()].
  299. i_get_local_members(Name) ->
  300. %% build name guard
  301. NameGuard = case is_tuple(Name) of
  302. true -> {'==', '$1', {Name}};
  303. _ -> {'=:=', '$1', Name}
  304. end,
  305. %% build match specs
  306. MatchHead = #syn_groups_table{name = '$1', node = '$2', pid = '$3', _ = '_'},
  307. Guards = [NameGuard, {'=:=', '$2', node()}],
  308. Result = '$3',
  309. %% select
  310. Pids = mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]),
  311. lists:sort(Pids).
  312. -spec i_get_local_members(Name :: any(), with_meta) -> [{pid(), Meta :: any()}].
  313. i_get_local_members(Name, with_meta) ->
  314. %% build name guard
  315. NameGuard = case is_tuple(Name) of
  316. true -> {'==', '$1', {Name}};
  317. _ -> {'=:=', '$1', Name}
  318. end,
  319. %% build match specs
  320. MatchHead = #syn_groups_table{name = '$1', node = '$2', pid = '$3', meta = '$4', _ = '_'},
  321. Guards = [NameGuard, {'=:=', '$2', node()}],
  322. Result = {{'$3', '$4'}},
  323. %% select
  324. PidsWithMeta = mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]),
  325. lists:keysort(1, PidsWithMeta).
  326. -spec find_groups_by_pid(Pid :: pid()) -> [Process :: #syn_groups_table{}].
  327. find_groups_by_pid(Pid) ->
  328. mnesia:dirty_index_read(syn_groups_table, Pid, #syn_groups_table.pid).
  329. -spec remove_process(Process :: #syn_groups_table{}) -> ok.
  330. remove_process(Process) ->
  331. mnesia:dirty_delete_object(syn_groups_table, Process).
  332. -spec multi_call_and_receive(
  333. CollectorPid :: pid(),
  334. Pid :: pid(),
  335. Message :: any(),
  336. Timeout :: non_neg_integer()
  337. ) -> any().
  338. multi_call_and_receive(CollectorPid, Pid, Message, Timeout) ->
  339. MonitorRef = monitor(process, Pid),
  340. Pid ! {syn_multi_call, self(), Message},
  341. receive
  342. {syn_multi_call_reply, Pid, Reply} ->
  343. CollectorPid ! {reply, Pid, Reply};
  344. {'DOWN', MonitorRef, _, _, _} ->
  345. CollectorPid ! {bad_pid, Pid}
  346. after Timeout ->
  347. CollectorPid ! {bad_pid, Pid}
  348. end.
  349. -spec collect_replies(MemberPids :: [pid()]) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  350. collect_replies(MemberPids) ->
  351. collect_replies(MemberPids, [], []).
  352. -spec collect_replies(MemberPids :: [pid()], [{pid(), Reply :: any()}], [pid()]) ->
  353. {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  354. collect_replies([], Replies, BadPids) -> {Replies, BadPids};
  355. collect_replies(MemberPids, Replies, BadPids) ->
  356. receive
  357. {reply, Pid, Reply} ->
  358. MemberPids1 = lists:delete(Pid, MemberPids),
  359. collect_replies(MemberPids1, [{Pid, Reply} | Replies], BadPids);
  360. {bad_pid, Pid} ->
  361. MemberPids1 = lists:delete(Pid, MemberPids),
  362. collect_replies(MemberPids1, Replies, [Pid | BadPids])
  363. end.