123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- %% ``The contents of this file are subject to the Erlang Public License,
- %% Version 1.1, (the "License"); you may not use this file except in
- %% compliance with the License. You should have received a copy of the
- %% Erlang Public License along with this software. If not, it can be
- %% retrieved via the world wide web at http://www.erlang.org/.
- %%
- %% Software distributed under the License is distributed on an "AS IS"
- %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- %% the License for the specific language governing rights and limitations
- %% under the License.
- %%
- %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
- %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
- %% AB. All Rights Reserved.''
- %%
- %% @author Ulf Wiger <ulf.wiger@feuerlabs.com>
- %%
- %% @doc
- %% This module implements a notification system for gproc names
- %% When a process subscribes to notifications for a given name, a message
- %% will be sent each time that name is registered
- -module(gproc_monitor).
- -behaviour(gen_server).
- %% API
- -export([subscribe/1,
- unsubscribe/1]).
- %% Process start function
- -export([start_link/0]).
- %% gen_server callbacks
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
- -define(SERVER, ?MODULE).
- -define(TAB, ?MODULE).
- -record(state, {}).
- %%%===================================================================
- %%% API
- %%%===================================================================
- %%--------------------------------------------------------------------
- %% @spec subscribe(key()) -> ok
- %%
- %% @doc
- %% Subscribe to registration events for a certain name
- %%
- %% The subscribing process will receive a `{gproc_monitor, Name, Pid}' message
- %% whenever a process registers under the given name, and a
- %% `{gproc_monitor, Name, undefined}' message when the name is unregistered,
- %% either explicitly, or because the registered process dies.
- %%
- %% When the subscription is first ordered, one of the above messages will be
- %% sent immediately, indicating the current status of the name.
- %% @end
- %%--------------------------------------------------------------------
- subscribe({T,S,_} = Key) when (T==n orelse T==a)
- andalso (S==g orelse S==l) ->
- try gproc:reg({p,l,{?MODULE,Key}})
- catch
- error:badarg -> ok
- end,
- gen_server:cast(?SERVER, {subscribe, self(), Key}).
- %%--------------------------------------------------------------------
- %% @spec unsubscribe(key()) -> ok
- %%
- %% @doc
- %% Unsubscribe from registration events for a certain name
- %%
- %% This function is the reverse of subscribe/1. It removes the subscription.
- %% @end
- %%--------------------------------------------------------------------
- unsubscribe({T,S,_} = Key) when (T==n orelse T==a)
- andalso (S==g orelse S==l) ->
- try gproc:unreg({p, l, {?MODULE,Key}})
- catch
- error:badarg -> ok
- end,
- gen_server:cast(?SERVER, {unsubscribe, self(), Key}).
- %%--------------------------------------------------------------------
- %% @doc
- %% Starts the server
- %%
- %% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
- %% @end
- %%--------------------------------------------------------------------
- start_link() ->
- Me = self(),
- _ = case ets:info(?TAB, owner) of
- undefined ->
- ets:new(?TAB, [ordered_set, protected, named_table,
- {heir, self(), []}]);
- Me ->
- ok
- end,
- {ok, Pid} = proc_lib:start_link(?MODULE, init, [Me]),
- ets:give_away(?TAB, Pid, []),
- {ok, Pid}.
- %%%===================================================================
- %%% gen_server callbacks
- %%%===================================================================
- %%--------------------------------------------------------------------
- %% @private
- %% @doc
- %% Initializes the server
- %%
- %% @spec init(Args) -> {ok, State} |
- %% {ok, State, Timeout} |
- %% ignore |
- %% {stop, Reason}
- %% @end
- %%--------------------------------------------------------------------
- init(Parent) ->
- process_flag(priority, high),
- register(?SERVER, self()),
- proc_lib:init_ack(Parent, {ok, self()}),
- receive {'ETS-TRANSFER',?TAB,_,_} -> ok end,
- gen_server:enter_loop(?MODULE, [], #state{}).
- %%--------------------------------------------------------------------
- %% @private
- %% @doc
- %% Handling call messages
- %%
- %% @spec handle_call(Request, From, State) ->
- %% {reply, Reply, State} |
- %% {reply, Reply, State, Timeout} |
- %% {noreply, State} |
- %% {noreply, State, Timeout} |
- %% {stop, Reason, Reply, State} |
- %% {stop, Reason, State}
- %% @end
- %%--------------------------------------------------------------------
- handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
- %%--------------------------------------------------------------------
- %% @private
- %% @doc
- %% Handling cast messages
- %%
- %% @spec handle_cast(Msg, State) -> {noreply, State} |
- %% {noreply, State, Timeout} |
- %% {stop, Reason, State}
- %% @end
- %%--------------------------------------------------------------------
- handle_cast({subscribe, Pid, Key}, State) ->
- Status = gproc:where(Key),
- add_subscription(Pid, Key),
- do_monitor(Key, Status),
- Pid ! {?MODULE, Key, Status},
- monitor_pid(Pid),
- {noreply, State};
- handle_cast({unsubscribe, Pid, Key}, State) ->
- del_subscription(Pid, Key),
- {noreply, State}.
- %%--------------------------------------------------------------------
- %% @private
- %% @doc
- %% Handling all non call/cast messages
- %%
- %% @spec handle_info(Info, State) -> {noreply, State} |
- %% {noreply, State, Timeout} |
- %% {stop, Reason, State}
- %% @end
- %%--------------------------------------------------------------------
- handle_info({gproc, unreg, _Ref, Name}, State) ->
- ets:delete(?TAB, {m, Name}),
- notify(Name, undefined),
- do_monitor(Name, undefined),
- {noreply, State};
- handle_info({gproc, {migrated,ToPid}, _Ref, Name}, State) ->
- ets:delete(?TAB, {m, Name}),
- notify(Name, {migrated, ToPid}),
- do_monitor(Name, ToPid),
- {noreply, State};
- handle_info({gproc, {failover,ToPid}, _Ref, Name}, State) ->
- ets:delete(?TAB, {m, Name}),
- notify(Name, {failover, ToPid}),
- do_monitor(Name, ToPid),
- {noreply, State};
- handle_info({gproc, _, registered, {{T,_,_} = Name, Pid, _}}, State)
- when T==n; T==a ->
- ets:delete(?TAB, {w, Name}),
- notify(Name, Pid),
- do_monitor(Name, Pid),
- {noreply, State};
- handle_info({'DOWN', _, process, Pid, _}, State) ->
- pid_is_down(Pid),
- {noreply, State};
- handle_info(_Msg, State) ->
- {noreply, State}.
- %%--------------------------------------------------------------------
- %% @private
- %% @doc
- %% This function is called by a gen_server when it is about to
- %% terminate. It should be the opposite of Module:init/1 and do any
- %% necessary cleaning up. When it returns, the gen_server terminates
- %% with Reason. The return value is ignored.
- %%
- %% @spec terminate(Reason, State) -> void()
- %% @end
- %%--------------------------------------------------------------------
- terminate(_Reason, _State) ->
- ok.
- %%--------------------------------------------------------------------
- %% @private
- %% @doc
- %% Convert process state when code is changed
- %%
- %% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
- %% @end
- %%--------------------------------------------------------------------
- code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
- %%%===================================================================
- %%% Internal functions
- %%%===================================================================
- add_subscription(Pid, {_,_,_} = Key) when is_pid(Pid) ->
- ets:insert(?TAB, [{{s, Key, Pid}}, {{r, Pid, Key}}]).
- del_subscription(Pid, Key) ->
- ets:delete(?TAB, {{s, Key, Pid}}),
- ets:delete(?TAB, {{r, Pid, Key}}),
- maybe_cancel_wait(Key).
- do_monitor(Name, undefined) ->
- case ets:member(?TAB, {w, Name}) of
- false ->
- Ref = gproc:nb_wait(Name),
- ets:insert(?TAB, {{w, Name}, Ref});
- true ->
- ok
- end;
- do_monitor(Name, Pid) when is_pid(Pid) ->
- case ets:member(?TAB, {m, Name}) of
- true ->
- ok;
- _ ->
- Ref = gproc:monitor(Name),
- ets:insert(?TAB, {{m, Name}, Ref})
- end.
- monitor_pid(Pid) when is_pid(Pid) ->
- case ets:member(?TAB, {p,Pid}) of
- false ->
- Ref = erlang:monitor(process, Pid),
- ets:insert(?TAB, {{p,Pid}, Ref});
- true ->
- ok
- end.
- pid_is_down(Pid) ->
- Keys = ets:select(?TAB, [{ {{r, Pid, '$1'}}, [], ['$1'] }]),
- ets:select_delete(?TAB, [{ {{r, Pid, '$1'}}, [], [true] }]),
- lists:foreach(fun(K) ->
- ets:delete(?TAB, {s,K,Pid}),
- maybe_cancel_wait(K)
- end, Keys),
- ets:delete(?TAB, {p, Pid}).
- maybe_cancel_wait(K) ->
- case ets:next(?TAB, {s,K}) of
- {s,K,P} when is_pid(P) ->
- ok;
- _ ->
- gproc:cancel_wait_or_monitor(K),
- ets:delete(?TAB, {m, K}),
- ets:delete(?TAB, {w, K})
- end.
- notify(Name, Where) ->
- gproc:send({p, l, {?MODULE, Name}}, {?MODULE, Name, Where}).
|