syn_groups.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2016 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_groups).
  27. %% API
  28. -export([start_link/0]).
  29. -export([join/2]).
  30. -export([leave/2]).
  31. -export([member/2]).
  32. -export([get_members/1]).
  33. -export([publish/2]).
  34. -export([multi_call/2]).
  35. -export([multi_call_reply/2]).
  36. %% gen_server callbacks
  37. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  38. %% records
  39. -record(state, {}).
  40. %% macros
  41. -define(MULTI_CALL_TIMEOUT_MS, 5000).
  42. %% include
  43. -include("syn.hrl").
  44. %% ===================================================================
  45. %% API
  46. %% ===================================================================
  47. -spec start_link() -> {ok, pid()} | {error, any()}.
  48. start_link() ->
  49. Options = [],
  50. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  51. -spec join(Name :: any(), Pid :: pid()) -> ok | {error, pid_already_in_group}.
  52. join(Name, Pid) ->
  53. Node = node(Pid),
  54. gen_server:call({?MODULE, Node}, {join, Name, Pid}).
  55. -spec leave(Name :: any(), Pid :: pid()) -> ok | {error, undefined | pid_not_in_group}.
  56. leave(Name, Pid) ->
  57. Node = node(Pid),
  58. gen_server:call({?MODULE, Node}, {leave, Name, Pid}).
  59. -spec member(Pid :: pid(), Name :: any()) -> boolean().
  60. member(Pid, Name) ->
  61. i_member(Pid, Name).
  62. -spec get_members(Name :: any()) -> [pid()].
  63. get_members(Name) ->
  64. i_get_members(Name).
  65. -spec publish(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
  66. publish(Name, Message) ->
  67. MemberPids = i_get_members(Name),
  68. FSend = fun(Pid) ->
  69. Pid ! Message
  70. end,
  71. lists:foreach(FSend, MemberPids),
  72. {ok, length(MemberPids)}.
  73. -spec multi_call(Name :: any(), Message :: any()) ->
  74. {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  75. multi_call(Name, Message) ->
  76. Self = self(),
  77. MemberPids = i_get_members(Name),
  78. FSend = fun(Pid) ->
  79. Pid ! {syn_multi_call, Self, Message}
  80. end,
  81. lists:foreach(FSend, MemberPids),
  82. collect_replies(MemberPids).
  83. -spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> ok.
  84. multi_call_reply(CallerPid, Reply) ->
  85. CallerPid ! {syn_multi_call_reply, self(), Reply}.
  86. %% ===================================================================
  87. %% Callbacks
  88. %% ===================================================================
  89. %% ----------------------------------------------------------------------------------------------------------
  90. %% Init
  91. %% ----------------------------------------------------------------------------------------------------------
  92. -spec init([]) ->
  93. {ok, #state{}} |
  94. {ok, #state{}, Timeout :: non_neg_integer()} |
  95. ignore |
  96. {stop, Reason :: any()}.
  97. init([]) ->
  98. %% trap linked processes signal
  99. process_flag(trap_exit, true),
  100. %% build state
  101. {ok, #state{}}.
  102. %% ----------------------------------------------------------------------------------------------------------
  103. %% Call messages
  104. %% ----------------------------------------------------------------------------------------------------------
  105. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  106. {reply, Reply :: any(), #state{}} |
  107. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  108. {noreply, #state{}} |
  109. {noreply, #state{}, Timeout :: non_neg_integer()} |
  110. {stop, Reason :: any(), Reply :: any(), #state{}} |
  111. {stop, Reason :: any(), #state{}}.
  112. handle_call({join, Name, Pid}, _From, State) ->
  113. case i_member(Pid, Name) of
  114. false ->
  115. %% add to table
  116. mnesia:dirty_write(#syn_groups_table{
  117. name = Name,
  118. pid = Pid,
  119. node = node()
  120. }),
  121. %% link
  122. erlang:link(Pid),
  123. %% return
  124. {reply, ok, State};
  125. _ ->
  126. {reply, pid_already_in_group, State}
  127. end;
  128. handle_call({leave, Name, Pid}, _From, State) ->
  129. %% we check again to return the correct response regardless of race conditions
  130. case i_find_by_pid(Pid) of
  131. undefined ->
  132. {reply, {error, undefined}, State};
  133. Process when Process#syn_groups_table.name =/= Name ->
  134. {error, pid_not_in_group};
  135. Process ->
  136. %% remove from table
  137. remove_process(Process),
  138. %% unlink
  139. erlang:unlink(Pid),
  140. %% reply
  141. {reply, ok, State}
  142. end;
  143. handle_call(Request, From, State) ->
  144. error_logger:warning_msg("Received from ~p an unknown call message: ~p", [Request, From]),
  145. {reply, undefined, State}.
  146. %% ----------------------------------------------------------------------------------------------------------
  147. %% Cast messages
  148. %% ----------------------------------------------------------------------------------------------------------
  149. -spec handle_cast(Msg :: any(), #state{}) ->
  150. {noreply, #state{}} |
  151. {noreply, #state{}, Timeout :: non_neg_integer()} |
  152. {stop, Reason :: any(), #state{}}.
  153. handle_cast(Msg, State) ->
  154. error_logger:warning_msg("Received an unknown cast message: ~p", [Msg]),
  155. {noreply, State}.
  156. %% ----------------------------------------------------------------------------------------------------------
  157. %% All non Call / Cast messages
  158. %% ----------------------------------------------------------------------------------------------------------
  159. -spec handle_info(Info :: any(), #state{}) ->
  160. {noreply, #state{}} |
  161. {noreply, #state{}, Timeout :: non_neg_integer()} |
  162. {stop, Reason :: any(), #state{}}.
  163. handle_info({'EXIT', Pid, Reason}, State) ->
  164. %% check if pid is in table
  165. case i_find_by_pid(Pid) of
  166. undefined ->
  167. %% log
  168. case Reason of
  169. normal -> ok;
  170. killed -> ok;
  171. _ ->
  172. error_logger:error_msg("Received an exit message from an unlinked process ~p with reason: ~p", [Pid, Reason])
  173. end;
  174. Process ->
  175. %% get group
  176. Name = Process#syn_groups_table.name,
  177. %% log
  178. case Reason of
  179. normal -> ok;
  180. killed -> ok;
  181. _ ->
  182. error_logger:error_msg("Process of group ~p and pid ~p exited with reason: ~p", [Name, Pid, Reason])
  183. end,
  184. %% delete from table
  185. remove_process(Process)
  186. end,
  187. %% return
  188. {noreply, State};
  189. handle_info(Info, State) ->
  190. error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
  191. {noreply, State}.
  192. %% ----------------------------------------------------------------------------------------------------------
  193. %% Terminate
  194. %% ----------------------------------------------------------------------------------------------------------
  195. -spec terminate(Reason :: any(), #state{}) -> terminated.
  196. terminate(Reason, _State) ->
  197. error_logger:info_msg("Terminating syn_groups with reason: ~p", [Reason]),
  198. terminated.
  199. %% ----------------------------------------------------------------------------------------------------------
  200. %% Convert process state when code is changed.
  201. %% ----------------------------------------------------------------------------------------------------------
  202. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  203. code_change(_OldVsn, State, _Extra) ->
  204. {ok, State}.
  205. %% ===================================================================
  206. %% Internal
  207. %% ===================================================================
  208. -spec i_member(Pid :: pid(), Name :: any()) -> boolean().
  209. i_member(Pid, Name) when is_tuple(Name) ->
  210. i_member_check(Pid, {'==', '$1', {Name}});
  211. i_member(Pid, Name) ->
  212. i_member_check(Pid, {'=:=', '$1', Name}).
  213. -spec i_member_check(Pid :: pid(), NameGuard :: any()) -> boolean().
  214. i_member_check(Pid, NameGuard) ->
  215. %% build match specs
  216. MatchHead = #syn_groups_table{name = '$1', pid = '$2', _ = '_'},
  217. Guards = [NameGuard, {'=:=', '$2', Pid}],
  218. Result = '$2',
  219. %% select
  220. case mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]) of
  221. [] -> false;
  222. _ -> true
  223. end.
  224. -spec i_get_members(Name :: any()) -> [Process :: #syn_groups_table{}].
  225. i_get_members(Name) ->
  226. Processes = mnesia:dirty_read(syn_groups_table, Name),
  227. lists:map(fun(Process) ->
  228. Process#syn_groups_table.pid
  229. end, Processes).
  230. -spec i_find_by_pid(Pid :: pid()) -> Process :: #syn_groups_table{} | undefined.
  231. i_find_by_pid(Pid) ->
  232. case mnesia:dirty_index_read(syn_groups_table, Pid, #syn_groups_table.pid) of
  233. [Process] -> Process;
  234. _ -> undefined
  235. end.
  236. -spec remove_process(Process :: #syn_groups_table{}) -> ok.
  237. remove_process(Process) ->
  238. mnesia:dirty_delete_object(syn_groups_table, Process).
  239. -spec collect_replies(MemberPids :: [pid()]) ->
  240. {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  241. collect_replies(MemberPids) ->
  242. collect_replies(MemberPids, []).
  243. -spec collect_replies(MemberPids :: [pid()], Replies :: [{pid(), Reply :: any()}]) ->
  244. {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
  245. collect_replies([], Replies) -> {Replies, []};
  246. collect_replies(MemberPids, Replies) ->
  247. receive
  248. {syn_multi_call_reply, Pid, Reply} ->
  249. MemberPids1 = lists:delete(Pid, MemberPids),
  250. collect_replies(MemberPids1, [{Pid, Reply} | Replies])
  251. after ?MULTI_CALL_TIMEOUT_MS ->
  252. {Replies, MemberPids}
  253. end.