|
@@ -73,6 +73,9 @@
|
|
await/1, await/2,
|
|
await/1, await/2,
|
|
nb_wait/1,
|
|
nb_wait/1,
|
|
cancel_wait/2,
|
|
cancel_wait/2,
|
|
|
|
+ cancel_wait_or_monitor/1,
|
|
|
|
+ monitor/1,
|
|
|
|
+ demonitor/2,
|
|
lookup_pid/1,
|
|
lookup_pid/1,
|
|
lookup_pids/1,
|
|
lookup_pids/1,
|
|
lookup_value/1,
|
|
lookup_value/1,
|
|
@@ -119,6 +122,7 @@
|
|
|
|
|
|
%% Callbacks for behaviour support
|
|
%% Callbacks for behaviour support
|
|
-export([whereis_name/1,
|
|
-export([whereis_name/1,
|
|
|
|
+ register_name/2,
|
|
unregister_name/1]).
|
|
unregister_name/1]).
|
|
|
|
|
|
-export([default/1]).
|
|
-export([default/1]).
|
|
@@ -656,6 +660,15 @@ nb_wait({n,l,_} = Key) ->
|
|
nb_wait(Key) ->
|
|
nb_wait(Key) ->
|
|
erlang:error(badarg, [Key]).
|
|
erlang:error(badarg, [Key]).
|
|
|
|
|
|
|
|
+%% @spec cancel_wait(Key::key(), Ref) -> ok
|
|
|
|
+%% Ref = all | reference()
|
|
|
|
+%%
|
|
|
|
+%% @doc Cancels a previous call to nb_wait/1
|
|
|
|
+%%
|
|
|
|
+%% If `Ref = all', all wait requests on `Key' from the calling process
|
|
|
|
+%% are canceled.
|
|
|
|
+%% @end
|
|
|
|
+%%
|
|
cancel_wait({_,g,_} = Key, Ref) ->
|
|
cancel_wait({_,g,_} = Key, Ref) ->
|
|
?CHK_DIST,
|
|
?CHK_DIST,
|
|
cast({cancel_wait, self(), Key, Ref}, g),
|
|
cast({cancel_wait, self(), Key, Ref}, g),
|
|
@@ -664,6 +677,46 @@ cancel_wait({_,l,_} = Key, Ref) ->
|
|
cast({cancel_wait, self(), Key, Ref}, l),
|
|
cast({cancel_wait, self(), Key, Ref}, l),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
+cancel_wait_or_monitor({_,g,_} = Key) ->
|
|
|
|
+ ?CHK_DIST,
|
|
|
|
+ cast({cancel_wait_or_monitor, self(), Key}, g),
|
|
|
|
+ ok;
|
|
|
|
+cancel_wait_or_monitor({_,l,_} = Key) ->
|
|
|
|
+ cast({cancel_wait_or_monitor, self(), Key}, l),
|
|
|
|
+ ok.
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+%% @spec monitor(key()) -> reference()
|
|
|
|
+%%
|
|
|
|
+%% @doc monitor a registered name
|
|
|
|
+%% This function works much like erlang:monitor(process, Pid), but monitors
|
|
|
|
+%% a unique name registered via gproc. A message, `{gproc, unreg, Ref, Key}'
|
|
|
|
+%% will be sent to the requesting process, if the name is unregistered or
|
|
|
|
+%% the registered process dies.
|
|
|
|
+%%
|
|
|
|
+%% If the name is not yet registered, the same message is sent immediately.
|
|
|
|
+%% @end
|
|
|
|
+monitor({T,g,_} = Key) when T==n; T==a ->
|
|
|
|
+ ?CHK_DIST,
|
|
|
|
+ call({monitor, Key, self()}, g);
|
|
|
|
+monitor({T,l,_} = Key) when T==n; T==a ->
|
|
|
|
+ call({monitor, Key, self()}, l);
|
|
|
|
+monitor(Key) ->
|
|
|
|
+ erlang:error(badarg, [Key]).
|
|
|
|
+
|
|
|
|
+%% @spec demonitor(key(), reference()) -> ok
|
|
|
|
+%%
|
|
|
|
+%% @doc Remove a monitor on a registered name
|
|
|
|
+%% This function is the reverse of monitor/1. It removes a monitor previously
|
|
|
|
+%% set on a unique name. This function always succeeds given legal input.
|
|
|
|
+%% @end
|
|
|
|
+demonitor({T,g,_} = Key, Ref) when T==n; T==a ->
|
|
|
|
+ ?CHK_DIST,
|
|
|
|
+ call({demonitor, Key, Ref, self()}, g);
|
|
|
|
+demonitor({T,l,_} = Key, Ref) when T==n; T==a ->
|
|
|
|
+ call({demonitor, Key, Ref, self()}, l);
|
|
|
|
+demonitor(Key, Ref) ->
|
|
|
|
+ erlang:error(badarg, [Key, Ref]).
|
|
|
|
|
|
%% @spec reg(Key::key(), Value) -> true
|
|
%% @spec reg(Key::key(), Value) -> true
|
|
%%
|
|
%%
|
|
@@ -795,7 +848,7 @@ unreg(Key) ->
|
|
{_, l, _} ->
|
|
{_, l, _} ->
|
|
case ets:member(?TAB, {Key,self()}) of
|
|
case ets:member(?TAB, {Key,self()}) of
|
|
true ->
|
|
true ->
|
|
- _ = gproc_lib:remove_reg(Key, self()),
|
|
|
|
|
|
+ _ = gproc_lib:remove_reg(Key, self(), unreg),
|
|
true;
|
|
true;
|
|
false ->
|
|
false ->
|
|
erlang:error(badarg)
|
|
erlang:error(badarg)
|
|
@@ -817,6 +870,17 @@ unreg_shared(Key) ->
|
|
erlang:error(badarg)
|
|
erlang:error(badarg)
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
+%% @spec (key(), pid()) -> yes | no
|
|
|
|
+%%
|
|
|
|
+%% @doc Behaviour support callback
|
|
|
|
+%% @end
|
|
|
|
+register_name({n,_,_} = Name, Pid) when Pid == self() ->
|
|
|
|
+ try reg(Name), yes
|
|
|
|
+ catch
|
|
|
|
+ error:_ ->
|
|
|
|
+ no
|
|
|
|
+ end.
|
|
|
|
+
|
|
%% @equiv unreg/1
|
|
%% @equiv unreg/1
|
|
unregister_name(Key) ->
|
|
unregister_name(Key) ->
|
|
unreg(Key).
|
|
unreg(Key).
|
|
@@ -890,7 +954,7 @@ local_mreg(T, [_|_] = KVL) ->
|
|
end.
|
|
end.
|
|
|
|
|
|
local_munreg(T, L) when T==p; T==c ->
|
|
local_munreg(T, L) when T==p; T==c ->
|
|
- _ = [gproc_lib:remove_reg({T,l,K}, self()) || K <- L],
|
|
|
|
|
|
+ _ = [gproc_lib:remove_reg({T,l,K}, self(), unreg) || K <- L],
|
|
true.
|
|
true.
|
|
|
|
|
|
%% @spec (Key :: key(), Value) -> true
|
|
%% @spec (Key :: key(), Value) -> true
|
|
@@ -1342,27 +1406,22 @@ handle_cast({monitor_me, Pid}, S) ->
|
|
erlang:monitor(process, Pid),
|
|
erlang:monitor(process, Pid),
|
|
{noreply, S};
|
|
{noreply, S};
|
|
handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
|
|
handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
|
|
- Rev = {Pid,Key},
|
|
|
|
case ets:lookup(?TAB, {Key,T}) of
|
|
case ets:lookup(?TAB, {Key,T}) of
|
|
- [{K, Waiters}] ->
|
|
|
|
- case Waiters -- [{Pid,Ref}] of
|
|
|
|
- [] ->
|
|
|
|
- ets:delete(?TAB, K),
|
|
|
|
- ets:delete(?TAB, Rev);
|
|
|
|
- NewWaiters ->
|
|
|
|
- ets:insert(?TAB, {K, NewWaiters}),
|
|
|
|
- case lists:keymember(Pid, 1, NewWaiters) of
|
|
|
|
- true ->
|
|
|
|
- %% should be extremely unlikely
|
|
|
|
- ok;
|
|
|
|
- false ->
|
|
|
|
- %% delete the reverse entry
|
|
|
|
- ets:delete(?TAB, Rev)
|
|
|
|
- end
|
|
|
|
- end;
|
|
|
|
|
|
+ [{_, Waiters}] ->
|
|
|
|
+ gproc_lib:remove_wait(Key, Pid, Ref, Waiters);
|
|
_ ->
|
|
_ ->
|
|
ignore
|
|
ignore
|
|
end,
|
|
end,
|
|
|
|
+ {noreply, S};
|
|
|
|
+handle_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S) ->
|
|
|
|
+ case ets:lookup(?TAB, {Key, T}) of
|
|
|
|
+ [{_, Waiters}] ->
|
|
|
|
+ gproc_lib:remove_wait(Key, Pid, all, Waiters);
|
|
|
|
+ [{_, OtherPid, _}] ->
|
|
|
|
+ gproc_lib:remove_monitors(Key, OtherPid, Pid);
|
|
|
|
+ _ ->
|
|
|
|
+ ok
|
|
|
|
+ end,
|
|
{noreply, S}.
|
|
{noreply, S}.
|
|
|
|
|
|
%% @hidden
|
|
%% @hidden
|
|
@@ -1374,6 +1433,36 @@ handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
|
|
false ->
|
|
false ->
|
|
{reply, badarg, S}
|
|
{reply, badarg, S}
|
|
end;
|
|
end;
|
|
|
|
+handle_call({monitor, {T,l,_} = Key, Pid}, _From, S)
|
|
|
|
+ when T==n; T==a ->
|
|
|
|
+ Ref = make_ref(),
|
|
|
|
+ case where(Key) of
|
|
|
|
+ undefined ->
|
|
|
|
+ Pid ! {gproc, unreg, Ref, Key};
|
|
|
|
+ RegPid ->
|
|
|
|
+ case ets:lookup(?TAB, {RegPid, Key}) of
|
|
|
|
+ [{K,r}] ->
|
|
|
|
+ ets:insert(?TAB, {K, [{monitor, [{Pid,Ref}]}]});
|
|
|
|
+ [{K, Opts}] ->
|
|
|
|
+ ets:insert(?TAB, {K, gproc_lib:add_monitor(Opts, Pid, Ref)})
|
|
|
|
+ end
|
|
|
|
+ end,
|
|
|
|
+ {reply, Ref, S};
|
|
|
|
+handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S)
|
|
|
|
+ when T==n; T==a ->
|
|
|
|
+ case where(Key) of
|
|
|
|
+ undefined ->
|
|
|
|
+ ok; % be nice
|
|
|
|
+ RegPid ->
|
|
|
|
+ case ets:lookup(?TAB, {RegPid, Key}) of
|
|
|
|
+ [{_K,r}] ->
|
|
|
|
+ ok; % be nice
|
|
|
|
+ [{K, Opts}] ->
|
|
|
|
+ ets:insert(?TAB, {K, gproc_lib:remove_monitor(
|
|
|
|
+ Opts, Pid, Ref)})
|
|
|
|
+ end
|
|
|
|
+ end,
|
|
|
|
+ {reply, ok, S};
|
|
handle_call({reg_shared, {_T,l,_} = Key, Val}, _From, S) ->
|
|
handle_call({reg_shared, {_T,l,_} = Key, Val}, _From, S) ->
|
|
case try_insert_reg(Key, Val, shared) of
|
|
case try_insert_reg(Key, Val, shared) of
|
|
%% case try_insert_shared(Key, Val) of
|
|
%% case try_insert_shared(Key, Val) of
|
|
@@ -1383,15 +1472,26 @@ handle_call({reg_shared, {_T,l,_} = Key, Val}, _From, S) ->
|
|
{reply, badarg, S}
|
|
{reply, badarg, S}
|
|
end;
|
|
end;
|
|
handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
|
|
handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
|
|
- case ets:member(?TAB, {Pid,Key}) of
|
|
|
|
- true ->
|
|
|
|
- _ = gproc_lib:remove_reg(Key, Pid),
|
|
|
|
|
|
+ case ets:lookup(?TAB, {Pid,Key}) of
|
|
|
|
+ [{_, r}] ->
|
|
|
|
+ _ = gproc_lib:remove_reg(Key, Pid, unreg),
|
|
{reply, true, S};
|
|
{reply, true, S};
|
|
- false ->
|
|
|
|
|
|
+ [{_, Opts}] when is_list(Opts) ->
|
|
|
|
+ _ = gproc_lib:remove_reg(Key, Pid, unreg, Opts),
|
|
|
|
+ {reply, true, S};
|
|
|
|
+ [] ->
|
|
{reply, badarg, S}
|
|
{reply, badarg, S}
|
|
end;
|
|
end;
|
|
handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
|
|
handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
|
|
- _ = gproc_lib:remove_reg(Key, shared),
|
|
|
|
|
|
+ case ets:lookup(?TAB, {shared, Key}) of
|
|
|
|
+ [{_, r}] ->
|
|
|
|
+ _ = gproc_lib:remove_reg(Key, shared, unreg);
|
|
|
|
+ [{_, Opts}] ->
|
|
|
|
+ _ = gproc_lib:remove_reg(Key, shared, unreg, Opts);
|
|
|
|
+ [] ->
|
|
|
|
+ %% don't crash if shared key already unregged.
|
|
|
|
+ ok
|
|
|
|
+ end,
|
|
{reply, true, S};
|
|
{reply, true, S};
|
|
handle_call({await, {_,l,_} = Key, Pid}, From, S) ->
|
|
handle_call({await, {_,l,_} = Key, Pid}, From, S) ->
|
|
%% Passing the pid explicitly is needed when leader_call is used,
|
|
%% Passing the pid explicitly is needed when leader_call is used,
|
|
@@ -1504,6 +1604,7 @@ try_insert_reg({T,l,_} = Key, Val, Pid) ->
|
|
true
|
|
true
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
+
|
|
%% try_insert_shared({c,l,_} = Key, Val) ->
|
|
%% try_insert_shared({c,l,_} = Key, Val) ->
|
|
%% ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
|
|
%% ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
|
|
%% try_insert_shared({a,l,_} = Key, Val) ->
|
|
%% try_insert_shared({a,l,_} = Key, Val) ->
|
|
@@ -1525,14 +1626,16 @@ process_is_down(Pid) when is_pid(Pid) ->
|
|
false ->
|
|
false ->
|
|
ok;
|
|
ok;
|
|
true ->
|
|
true ->
|
|
- Revs = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
|
|
|
|
- [{'==',{element,2,'$1'},l}], ['$1']}]),
|
|
|
|
|
|
+ Revs = ets:select(?TAB, [{{{Pid,'$1'}, '$2'},
|
|
|
|
+ [{'==',{element,2,'$1'},l}],
|
|
|
|
+ [{{'$1','$2'}}]}]),
|
|
lists:foreach(
|
|
lists:foreach(
|
|
- fun({n,l,_}=K) ->
|
|
|
|
|
|
+ fun({{n,l,_}=K, R}) ->
|
|
Key = {K,n},
|
|
Key = {K,n},
|
|
case ets:lookup(?TAB, Key) of
|
|
case ets:lookup(?TAB, Key) of
|
|
[{_, Pid, _}] ->
|
|
[{_, Pid, _}] ->
|
|
- ets:delete(?TAB, Key);
|
|
|
|
|
|
+ ets:delete(?TAB, Key),
|
|
|
|
+ opt_notify(R, K);
|
|
[{_, Waiters}] ->
|
|
[{_, Waiters}] ->
|
|
case [W || {P,_} = W <- Waiters,
|
|
case [W || {P,_} = W <- Waiters,
|
|
P =/= Pid] of
|
|
P =/= Pid] of
|
|
@@ -1544,14 +1647,15 @@ process_is_down(Pid) when is_pid(Pid) ->
|
|
[] ->
|
|
[] ->
|
|
true
|
|
true
|
|
end;
|
|
end;
|
|
- ({c,l,C} = K) ->
|
|
|
|
|
|
+ ({{c,l,C} = K, _}) ->
|
|
Key = {K, Pid},
|
|
Key = {K, Pid},
|
|
[{_, _, Value}] = ets:lookup(?TAB, Key),
|
|
[{_, _, Value}] = ets:lookup(?TAB, Key),
|
|
ets:delete(?TAB, Key),
|
|
ets:delete(?TAB, Key),
|
|
gproc_lib:update_aggr_counter(l, C, -Value);
|
|
gproc_lib:update_aggr_counter(l, C, -Value);
|
|
- ({a,l,_} = K) ->
|
|
|
|
- ets:delete(?TAB, {K,a});
|
|
|
|
- ({p,_,_} = K) ->
|
|
|
|
|
|
+ ({{a,l,_} = K, R}) ->
|
|
|
|
+ ets:delete(?TAB, {K,a}),
|
|
|
|
+ opt_notify(R, K);
|
|
|
|
+ ({{p,_,_} = K, _}) ->
|
|
ets:delete(?TAB, {K, Pid})
|
|
ets:delete(?TAB, {K, Pid})
|
|
end, Revs),
|
|
end, Revs),
|
|
ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
|
|
ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
|
|
@@ -1559,6 +1663,12 @@ process_is_down(Pid) when is_pid(Pid) ->
|
|
ok
|
|
ok
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
+opt_notify(r, _) ->
|
|
|
|
+ ok;
|
|
|
|
+opt_notify(Opts, Key) ->
|
|
|
|
+ gproc_lib:notify(Key, Opts).
|
|
|
|
+
|
|
|
|
+
|
|
do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
|
|
do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
|
|
Key = {K, T},
|
|
Key = {K, T},
|
|
case ets:lookup(?TAB, Key) of
|
|
case ets:lookup(?TAB, Key) of
|
|
@@ -1572,11 +1682,11 @@ do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
|
|
ToPid when is_pid(ToPid) ->
|
|
ToPid when is_pid(ToPid) ->
|
|
ets:insert(?TAB, [{Key, ToPid, Value},
|
|
ets:insert(?TAB, [{Key, ToPid, Value},
|
|
{{ToPid, K}, []}]),
|
|
{{ToPid, K}, []}]),
|
|
- ets:delete(?TAB, {Pid, K}),
|
|
|
|
|
|
+ gproc_lib:remove_reverse_mapping({migrated,ToPid}, Pid, K),
|
|
_ = gproc_lib:ensure_monitor(ToPid, l),
|
|
_ = gproc_lib:ensure_monitor(ToPid, l),
|
|
ToPid;
|
|
ToPid;
|
|
undefined ->
|
|
undefined ->
|
|
- _ = gproc_lib:remove_reg(K, Pid),
|
|
|
|
|
|
+ _ = gproc_lib:remove_reg(K, Pid, unreg),
|
|
undefined
|
|
undefined
|
|
end;
|
|
end;
|
|
_ ->
|
|
_ ->
|
|
@@ -1601,7 +1711,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
|
|
ToPid
|
|
ToPid
|
|
end;
|
|
end;
|
|
undefined ->
|
|
undefined ->
|
|
- _ = gproc_lib:remove_reg(K, Pid),
|
|
|
|
|
|
+ _ = gproc_lib:remove_reg(K, Pid, migrated),
|
|
undefined
|
|
undefined
|
|
end;
|
|
end;
|
|
_ ->
|
|
_ ->
|
|
@@ -1645,8 +1755,6 @@ set_monitors({Pids, Cont}) ->
|
|
_ = [erlang:monitor(process,Pid) || Pid <- Pids],
|
|
_ = [erlang:monitor(process,Pid) || Pid <- Pids],
|
|
set_monitors(ets:select(Cont)).
|
|
set_monitors(ets:select(Cont)).
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
monitor_me() ->
|
|
monitor_me() ->
|
|
case ets:insert_new(?TAB, {{self(),l}}) of
|
|
case ets:insert_new(?TAB, {{self(),l}}) of
|
|
false -> true;
|
|
false -> true;
|