gproc_monitor.erl 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@feuerlabs.com>
  17. %%
  18. %% @doc
  19. %% This module implements a notification system for gproc names.
  20. %% When a process subscribes to notifications for a given name, a message
  21. %% will be sent each time that name is registered or unregistered.
  -module(gproc_monitor).
  -behaviour(gen_server).

  %% Public subscription API
  -export([subscribe/1, unsubscribe/1]).

  %% Entry point used to start the server process
  -export([start_link/0]).

  %% Callbacks required by the gen_server behaviour
  -export([
      init/1,
      handle_call/3,
      handle_cast/2,
      handle_info/2,
      terminate/2,
      code_change/3
  ]).

  %% Registered name of the server process.
  -define(SERVER, ?MODULE).
  %% Name of the ETS table holding subscriptions, waits and monitors.
  -define(TAB, ?MODULE).

  %% All bookkeeping lives in the ETS table, so the gen_server state
  %% carries no fields.
  -record(state, {}).
  35. %%%===================================================================
  36. %%% API
  37. %%%===================================================================
  %%--------------------------------------------------------------------
  %% @spec subscribe(key()) -> ok
  %%
  %% @doc
  %% Start watching a name for registration changes.
  %%
  %% `Key' must be a `{Type, Scope, Name}' tuple where `Type' is `n' or `a'
  %% and `Scope' is `g' or `l'; anything else fails with `function_clause'.
  %%
  %% The caller is sent `{gproc_monitor, Key, Pid}' whenever a process
  %% registers under the key and `{gproc_monitor, Key, undefined}' when the
  %% key is unregistered, whether explicitly or because its owner died.
  %% The current status of the key is reported with one such message as
  %% soon as the server has processed the subscription.
  %% @end
  %%--------------------------------------------------------------------
  subscribe({Type, Scope, _} = Key) when
      (Type =:= n orelse Type =:= a) andalso (Scope =:= g orelse Scope =:= l)
  ->
      %% Notifications are delivered through this local property. A badarg
      %% from gproc:reg/1 is deliberately ignored (presumably the property
      %% is already registered by this process -- confirm against gproc).
      try gproc:reg({p, l, {?MODULE, Key}}) of
          _ -> ok
      catch
          error:badarg -> ok
      end,
      gen_server:cast(?SERVER, {subscribe, self(), Key}).
  %%--------------------------------------------------------------------
  %% @spec unsubscribe(key()) -> ok
  %%
  %% @doc
  %% Stop watching a name; the inverse of subscribe/1.
  %%
  %% `Key' must be a `{Type, Scope, Name}' tuple where `Type' is `n' or `a'
  %% and `Scope' is `g' or `l'; anything else fails with `function_clause'.
  %% The caller's notification property is unregistered and the server is
  %% asked to drop the subscription.
  %% @end
  %%--------------------------------------------------------------------
  unsubscribe({Type, Scope, _} = Key) when
      (Type =:= n orelse Type =:= a) andalso (Scope =:= g orelse Scope =:= l)
  ->
      %% A badarg from gproc:unreg/1 is deliberately ignored (presumably
      %% the caller never subscribed -- confirm against gproc).
      try gproc:unreg({p, l, {?MODULE, Key}}) of
          _ -> ok
      catch
          error:badarg -> ok
      end,
      gen_server:cast(?SERVER, {unsubscribe, self(), Key}).
  %%--------------------------------------------------------------------
  %% @doc
  %% Starts the server and hands it the subscription table.
  %%
  %% The calling process creates the named ETS table if it does not exist
  %% yet, registering itself as heir, and then gives the table away to the
  %% freshly started server. If the table exists it must already be owned
  %% by the caller; any other owner makes the call fail with `case_clause'.
  %%
  %% @spec start_link() -> {ok, Pid}
  %% @end
  %%--------------------------------------------------------------------
  start_link() ->
      Caller = self(),
      case ets:info(?TAB, owner) of
          undefined ->
              TableOpts = [ordered_set, protected, named_table, {heir, Caller, []}],
              _ = ets:new(?TAB, TableOpts),
              ok;
          Caller ->
              %% The caller already owns the table; reuse it.
              ok
      end,
      {ok, Server} = proc_lib:start_link(?MODULE, init, [Caller]),
      %% init/1 blocks until it receives this transfer.
      ets:give_away(?TAB, Server, []),
      {ok, Server}.
  95. %%%===================================================================
  96. %%% gen_server callbacks
  97. %%%===================================================================
  %%--------------------------------------------------------------------
  %% @private
  %% @doc
  %% Process entry point, spawned through proc_lib by start_link/0.
  %%
  %% Raises the process priority, registers the server name, acknowledges
  %% the start to `Parent', waits for ownership of the ETS table to arrive
  %% and finally enters the gen_server loop. This function does not return.
  %%
  %% @spec init(Parent::pid()) -> no_return()
  %% @end
  %%--------------------------------------------------------------------
  init(Parent) ->
      Self = self(),
      process_flag(priority, high),
      register(?SERVER, Self),
      proc_lib:init_ack(Parent, {ok, Self}),
      %% start_link/0 gives the table away only after the ack above, so
      %% block here until the transfer message shows up.
      receive
          {'ETS-TRANSFER', ?TAB, _FromPid, _GiftData} -> ok
      end,
      InitialState = #state{},
      gen_server:enter_loop(?MODULE, [], InitialState).
  %%--------------------------------------------------------------------
  %% @private
  %% @doc
  %% Handling call messages
  %%
  %% The server defines no synchronous requests; every call is answered
  %% with `ok' and the state is left untouched.
  %%
  %% @spec handle_call(Request, From, State) -> {reply, ok, State}
  %% @end
  %%--------------------------------------------------------------------
  handle_call(_Request, _From, State) ->
      {reply, ok, State}.
  %%--------------------------------------------------------------------
  %% @private
  %% @doc
  %% Handling cast messages
  %%
  %% `{subscribe, Pid, Key}' records the subscription, makes sure the key
  %% is being waited for or monitored, reports the key's current status to
  %% `Pid' and starts monitoring `Pid'.
  %% `{unsubscribe, Pid, Key}' removes the subscription.
  %%
  %% @spec handle_cast(Msg, State) -> {noreply, State}
  %% @end
  %%--------------------------------------------------------------------
  handle_cast({subscribe, Subscriber, Key}, State) ->
      %% Either the pid currently registered under Key, or undefined.
      Current = gproc:where(Key),
      add_subscription(Subscriber, Key),
      do_monitor(Key, Current),
      erlang:send(Subscriber, {?MODULE, Key, Current}),
      monitor_pid(Subscriber),
      {noreply, State};
  handle_cast({unsubscribe, Subscriber, Key}, State) ->
      del_subscription(Subscriber, Key),
      {noreply, State}.
  %%--------------------------------------------------------------------
  %% @private
  %% @doc
  %% Handling all non call/cast messages
  %%
  %% Translates gproc monitor and wait messages into subscriber
  %% notifications and re-arms the matching wait or monitor. `DOWN'
  %% messages for subscribers trigger cleanup of their subscriptions.
  %% Every other message is ignored.
  %%
  %% @spec handle_info(Info, State) -> {noreply, State}
  %% @end
  %%--------------------------------------------------------------------
  handle_info({gproc, unreg, _Ref, Name}, State) ->
      %% The name is gone: drop the monitor row and wait for it to return.
      ets:delete(?TAB, {m, Name}),
      notify(Name, undefined),
      do_monitor(Name, undefined),
      {noreply, State};
  handle_info({gproc, {Event, NewPid}, _Ref, Name}, State) when
      Event =:= migrated; Event =:= failover
  ->
      %% The name moved to NewPid; subscribers receive the tagged event.
      ets:delete(?TAB, {m, Name}),
      notify(Name, {Event, NewPid}),
      do_monitor(Name, NewPid),
      {noreply, State};
  handle_info({gproc, _Ref, registered, {{Type, _, _} = Name, Pid, _Value}}, State) when
      Type =:= n orelse Type =:= a
  ->
      %% A name we were waiting for got registered: switch to monitoring it.
      ets:delete(?TAB, {w, Name}),
      notify(Name, Pid),
      do_monitor(Name, Pid),
      {noreply, State};
  handle_info({'DOWN', _MonRef, process, Pid, _Reason}, State) ->
      pid_is_down(Pid),
      {noreply, State};
  handle_info(_Other, State) ->
      {noreply, State}.
  %%--------------------------------------------------------------------
  %% @private
  %% @doc
  %% Called by gen_server when the server is about to terminate.
  %% There is nothing to clean up here, so this simply returns `ok'.
  %%
  %% @spec terminate(Reason, State) -> ok
  %% @end
  %%--------------------------------------------------------------------
  terminate(_Why, _FinalState) ->
      ok.
  %%--------------------------------------------------------------------
  %% @private
  %% @doc
  %% Code upgrade hook. The state needs no conversion and is returned
  %% unchanged.
  %%
  %% @spec code_change(OldVsn, State, Extra) -> {ok, State}
  %% @end
  %%--------------------------------------------------------------------
  code_change(_FromVsn, State, _Extra) ->
      {ok, State}.
  211. %%%===================================================================
  212. %%% Internal functions
  213. %%%===================================================================
  %% Record that Pid subscribes to Key. Two key-only rows are stored in
  %% one insert: {s, Key, Pid} for lookup by key and {r, Pid, Key} for
  %% lookup by subscriber.
  add_subscription(Pid, {_, _, _} = Key) when is_pid(Pid) ->
      ByKey = {{s, Key, Pid}},
      ByPid = {{r, Pid, Key}},
      ets:insert(?TAB, [ByKey, ByPid]).
  %% Remove Pid's subscription to Key, then cancel the wait/monitor on Key
  %% if nobody is subscribed to it any more.
  del_subscription(Pid, Key) ->
      %% ets:delete/2 takes the row KEY. add_subscription/2 stores the rows
      %% as 1-tuples {{s, Key, Pid}} and {{r, Pid, Key}}, whose keys are the
      %% inner 3-tuples; passing the whole 1-tuple would match nothing and
      %% leave the subscription in the table.
      ets:delete(?TAB, {s, Key, Pid}),
      ets:delete(?TAB, {r, Pid, Key}),
      maybe_cancel_wait(Key).
  %% Make sure Name is being tracked, based on its current status.
  %% When the name is unregistered (undefined) a non-blocking wait is
  %% ordered and stored under {w, Name}; when it is held by a pid a gproc
  %% monitor is set and stored under {m, Name}. An existing row means the
  %% wait or monitor is already in place, so nothing is done.
  do_monitor(Name, undefined) ->
      WaitKey = {w, Name},
      case ets:member(?TAB, WaitKey) of
          true ->
              ok;
          false ->
              WaitRef = gproc:nb_wait(Name),
              ets:insert(?TAB, {WaitKey, WaitRef})
      end;
  do_monitor(Name, Pid) when is_pid(Pid) ->
      MonKey = {m, Name},
      case ets:member(?TAB, MonKey) of
          false ->
              MonRef = gproc:monitor(Name),
              ets:insert(?TAB, {MonKey, MonRef});
          true ->
              ok
      end.
  %% Monitor a subscriber process once; the monitor reference is kept
  %% under {p, Pid} so repeated subscriptions do not add more monitors.
  monitor_pid(Pid) when is_pid(Pid) ->
      PidKey = {p, Pid},
      case ets:member(?TAB, PidKey) of
          true ->
              ok;
          false ->
              MonRef = erlang:monitor(process, Pid),
              ets:insert(?TAB, {PidKey, MonRef})
      end.
  %% Clean up after a subscriber that has died: collect every key it was
  %% subscribed to, drop its reverse and forward rows, cancel waits and
  %% monitors that are no longer needed, and forget its monitor row.
  pid_is_down(Pid) ->
      ReverseRow = {{r, Pid, '$1'}},
      Keys = ets:select(?TAB, [{ReverseRow, [], ['$1']}]),
      ets:select_delete(?TAB, [{ReverseRow, [], [true]}]),
      Forget = fun(Key) ->
          ets:delete(?TAB, {s, Key, Pid}),
          maybe_cancel_wait(Key)
      end,
      lists:foreach(Forget, Keys),
      ets:delete(?TAB, {p, Pid}).
  %% Cancel the gproc wait/monitor on K unless some process is still
  %% subscribed to it.
  %%
  %% Subscriptions are rows keyed {s, K, Pid} in an ordered_set table, so
  %% the first subscriber of K (if any) is the key that follows the probe
  %% {s, K, 0}: numbers sort before pids, hence the probe sorts immediately
  %% before every {s, K, Pid}. The probe has to be a 3-tuple because tuples
  %% are ordered by size first; starting from the 2-tuple {s, K} would
  %% yield a {w, _} row or the first {r, _, _} row instead, and the
  %% wait/monitor would be cancelled while subscribers remain.
  maybe_cancel_wait(K) ->
      case ets:next(?TAB, {s, K, 0}) of
          {s, K, P} when is_pid(P) ->
              %% At least one subscriber is left; keep the wait/monitor.
              ok;
          _ ->
              gproc:cancel_wait_or_monitor(K),
              ets:delete(?TAB, {m, K}),
              ets:delete(?TAB, {w, K})
      end.
  %% Deliver {gproc_monitor, Name, Where} to every process holding the
  %% local notification property for Name (set up by subscribe/1).
  notify(Name, Where) ->
      Subscribers = {p, l, {?MODULE, Name}},
      Event = {?MODULE, Name, Where},
      gproc:send(Subscribers, Event).