|
@@ -44,6 +44,9 @@
|
|
|
set_value/2,
|
|
|
get_value/1,
|
|
|
where/1,
|
|
|
+ await/2,
|
|
|
+ nb_wait/1,
|
|
|
+ cancel_wait/2,
|
|
|
lookup_pid/1,
|
|
|
lookup_pids/1,
|
|
|
lookup_values/1,
|
|
@@ -170,7 +173,7 @@ add_local_aggr_counter(Name) -> reg({a,l,Name}).
|
|
|
%% spec(Name::any()) -> true
|
|
|
%%
|
|
|
%% @doc Registers a global (unique) aggregated counter.
|
|
|
-%% @equiv reg({a,l,Name})
|
|
|
+%% @equiv reg({a,g,Name})
|
|
|
%% @end
|
|
|
%%
|
|
|
add_global_aggr_counter(Name) -> reg({a,g,Name}).
|
|
@@ -190,7 +193,7 @@ lookup_local_name(Name) -> where({n,l,Name}).
|
|
|
%% @equiv where({n,g,Name})
|
|
|
%% @end
|
|
|
%%
|
|
|
-lookup_global_name(Name) -> where({g,l,Name}).
|
|
|
+lookup_global_name(Name) -> where({n,g,Name}).
|
|
|
|
|
|
%% @spec (Name::any()) -> integer()
|
|
|
%%
|
|
@@ -244,7 +247,7 @@ lookup_local_counters(P) -> lookup_values({c,l,P}).
|
|
|
%%
|
|
|
%% @doc Look up all global (non-unique) instances of a given Counter.
|
|
|
%% Returns a list of {Pid, Value} tuples for all matching objects.
|
|
|
-%% @equiv lookup_values({c, l, Counter})
|
|
|
+%% @equiv lookup_values({c, g, Counter})
|
|
|
%% @end
|
|
|
%%
|
|
|
lookup_global_counters(P) -> lookup_values({c,g,P}).
|
|
@@ -263,6 +266,50 @@ default({T,_,_}) when T==c; T==a -> 0;
|
|
|
default(_) -> undefined.
|
|
|
|
|
|
|
|
|
+%% @spec await(Key::key(), Timeout) -> {pid(),Value}
|
|
|
+%% Timeout = integer() | infinity
|
|
|
+%%
|
|
|
+%% @doc Wait for a local name to be registered.
|
|
|
+%% The function raises an exception if the timeout expires. Timeout must be
|
|
|
+%% either an interger > 0 or 'infinity'.
|
|
|
+%% @end
|
|
|
+%%
|
|
|
+await({n,l,_} = Key, Timeout) ->
|
|
|
+ TRef = case Timeout of
|
|
|
+ infinity -> no_timer;
|
|
|
+ T when is_integer(T), T > 0 ->
|
|
|
+ erlang:start_timer(T, self(), timeout);
|
|
|
+ _ ->
|
|
|
+ erlang:error(badarg, [Key, Timeout])
|
|
|
+ end,
|
|
|
+ WRef = call({await,Key}),
|
|
|
+ receive
|
|
|
+ {gproc, WRef, registered, {_K, Pid, V}} ->
|
|
|
+ {Pid, V};
|
|
|
+ {timeout, TRef, timeout} ->
|
|
|
+ cancel_wait(Key, WRef),
|
|
|
+ erlang:error(timeout, [Key, Timeout])
|
|
|
+ end;
|
|
|
+await(K, T) ->
|
|
|
+ erlang:error(badarg, [K, T]).
|
|
|
+
|
|
|
+%% @spec nb_wait(Key::key()) -> Ref
|
|
|
+%%
|
|
|
+%% @doc Wait for a local name to be registered.
|
|
|
+%% The caller can expect to receive a message,
|
|
|
+%% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
|
|
|
+%% @end
|
|
|
+%%
|
|
|
+nb_wait({n,l,_} = Key) ->
|
|
|
+ call({await, Key});
|
|
|
+nb_wait(Key) ->
|
|
|
+ erlang:error(badarg, [Key]).
|
|
|
+
|
|
|
+cancel_wait(Key, Ref) ->
|
|
|
+ cast({cancel_wait, self(), Key, Ref}),
|
|
|
+ ok.
|
|
|
+
|
|
|
+
|
|
|
%% @spec reg(Key::key(), Value) -> true
|
|
|
%%
|
|
|
%% @doc Register a name or property for the current process
|
|
@@ -675,6 +722,20 @@ info(Pid, I) ->
|
|
|
%% @hidden
|
|
|
handle_cast({monitor_me, Pid}, S) ->
|
|
|
erlang:monitor(process, Pid),
|
|
|
+ {noreply, S};
|
|
|
+handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
|
|
|
+ case ets:lookup(?TAB, {Key,T}) of
|
|
|
+ [{K, Waiters}] ->
|
|
|
+ NewWaiters = Waiters -- [{Pid,Ref}],
|
|
|
+ %% for now, we don't remove the reverse entry. If we should do
|
|
|
+ %% that, we have to make sure that Pid doesn't have another
|
|
|
+ %% waiter (which it shouldn't have, given that the wait is
|
|
|
+ %% synchronous). Keeping it is not problematic - worst case, we
|
|
|
+ %% will get an unnecessary cleanup.
|
|
|
+ ets:insert(?TAB, {K, NewWaiters});
|
|
|
+ _ ->
|
|
|
+ ignore
|
|
|
+ end,
|
|
|
{noreply, S}.
|
|
|
|
|
|
%% @hidden
|
|
@@ -694,6 +755,24 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
|
|
|
false ->
|
|
|
{reply, badarg, S}
|
|
|
end;
|
|
|
+handle_call({await, {T,l,_} = Key}, {Pid,Ref} = From, S) ->
|
|
|
+ Rev = {{Pid,Key}, r},
|
|
|
+ case ets:lookup(?TAB, {Key,T}) of
|
|
|
+ [{_, P, Value}] ->
|
|
|
+ %% for symmetry, we always reply with Ref and then send a message
|
|
|
+ gen_server:reply(From, Ref),
|
|
|
+ Pid ! {gproc, Ref, registered, {Key, P, Value}},
|
|
|
+ {noreply, S};
|
|
|
+ [{K, Waiters}] ->
|
|
|
+ NewWaiters = [{Pid,Ref} | Waiters],
|
|
|
+ ets:insert(?TAB, [{K, NewWaiters}, Rev]),
|
|
|
+ gproc_lib:ensure_monitor(Pid),
|
|
|
+ {reply, Ref, S};
|
|
|
+ [] ->
|
|
|
+ ets:insert(?TAB, [{{Key,T}, [{Pid,Ref}]}, Rev]),
|
|
|
+ gproc_lib:ensure_monitor(Pid),
|
|
|
+ {reply, Ref, S}
|
|
|
+ end;
|
|
|
handle_call({mreg, T, l, L}, {Pid,_}, S) ->
|
|
|
try gproc_lib:insert_many(T, l, L, Pid) of
|
|
|
{true,_} -> {reply, true, S};
|
|
@@ -736,6 +815,7 @@ call(Req) ->
|
|
|
Reply -> Reply
|
|
|
end.
|
|
|
|
|
|
+
|
|
|
cast(Msg) ->
|
|
|
gen_server:cast(?MODULE, Msg).
|
|
|
|
|
@@ -762,11 +842,15 @@ try_insert_reg({T,l,_} = Key, Val, Pid) ->
|
|
|
end.
|
|
|
|
|
|
process_is_down(Pid) ->
|
|
|
- Keys = ets:select(?TAB, [{{{Pid,'$1'}},
|
|
|
- [{'==',{element,2,'$1'},l}], ['$1']}]),
|
|
|
+ Keys = ets:select(?TAB, [{{{Pid,'$1'},'$2'},
|
|
|
+ [{'==',{element,2,'$1'},l}], [{{'$1','$2'}}]}]),
|
|
|
ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}}}, [], [true]}]),
|
|
|
ets:delete(?TAB, Pid),
|
|
|
- lists:foreach(fun(Key) -> gproc_lib:remove_reg_1(Key, Pid) end, Keys).
|
|
|
+ lists:foreach(fun({Key,r}) ->
|
|
|
+ gproc_lib:remove_reg_1(Key, Pid);
|
|
|
+ ({Key,w}) ->
|
|
|
+ gproc_lib:remove_waiter(Key, Pid)
|
|
|
+ end, Keys).
|
|
|
|
|
|
|
|
|
create_tabs() ->
|