|
@@ -78,9 +78,10 @@
|
|
|
set_value/2,
|
|
|
get_value/1, get_value/2,
|
|
|
where/1,
|
|
|
- await/1, await/2,
|
|
|
- nb_wait/1,
|
|
|
- cancel_wait/2,
|
|
|
+ await/1, await/2, await/3,
|
|
|
+ wide_await/3,
|
|
|
+ nb_wait/1, nb_wait/2,
|
|
|
+ cancel_wait/2, cancel_wait/3,
|
|
|
cancel_wait_or_monitor/1,
|
|
|
monitor/1,
|
|
|
demonitor/2,
|
|
@@ -624,6 +625,21 @@ await(Key) ->
|
|
|
await(Key, Timeout) ->
|
|
|
?CATCH_GPROC_ERROR(await1(Key, Timeout), [Key, Timeout]).
|
|
|
|
|
|
+%% @spec await(Node::node(), Key::key(), Timeout) -> {pid(),Value}
|
|
|
+%% Timeout = integer() | infinity
|
|
|
+%%
|
|
|
+%% @doc Wait for a local name to be registered on `Node'.
|
|
|
+%% This function works exactly like {@link await/2}, but queries a remote
|
|
|
+%% node instead. An exception is thrown if `Node' cannot be reached. If gproc
|
|
|
+%% is not running on a given node, this is treated the same as the node being
|
|
|
+%% down.
|
|
|
+%% @end
|
|
|
+%%
|
|
|
+await(Node, Key, Timeout) when Node == node() ->
|
|
|
+ await(Key, Timeout);
|
|
|
+await(Node, Key, Timeout) when is_atom(Node) ->
|
|
|
+ ?CATCH_GPROC_ERROR(await1(Node, Key, Timeout), [Node, Key, Timeout]).
|
|
|
+
|
|
|
await1({n,g,_} = Key, Timeout) ->
|
|
|
?CHK_DIST,
|
|
|
request_wait(Key, Timeout);
|
|
@@ -649,7 +665,16 @@ await1({n,l,_} = Key, Timeout) ->
|
|
|
await1(_, _) ->
|
|
|
throw(badarg).
|
|
|
|
|
|
-request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
|
|
|
+await1(N, {n,l,_} = Key, Timeout) when is_atom(N) ->
|
|
|
+ request_wait(N, Key, Timeout);
|
|
|
+await1(_, _, _) ->
|
|
|
+ throw(badarg).
|
|
|
+
|
|
|
+
|
|
|
+request_wait(Key, Timeout) ->
|
|
|
+ request_wait(node(), Key, Timeout).
|
|
|
+
|
|
|
+request_wait(N, {n,C,_} = Key, Timeout) when C==l; C==g ->
|
|
|
TRef = case Timeout of
|
|
|
infinity -> no_timer;
|
|
|
T when is_integer(T), T > 0 ->
|
|
@@ -657,7 +682,7 @@ request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
|
|
|
_ ->
|
|
|
?THROW_GPROC_ERROR(badarg)
|
|
|
end,
|
|
|
- WRef = case {call({await,Key,self()}, C), C} of
|
|
|
+ WRef = case {call(N, {await,Key,self()}, C), C} of
|
|
|
{{R, {Kg,Pg,Vg}}, g} ->
|
|
|
self() ! {gproc, R, registered, {Kg,Pg,Vg}},
|
|
|
R;
|
|
@@ -672,10 +697,80 @@ request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
|
|
|
end,
|
|
|
{Pid, V};
|
|
|
{timeout, TRef, gproc_timeout} ->
|
|
|
- cancel_wait(Key, WRef),
|
|
|
+ cancel_wait(N, Key, WRef),
|
|
|
?THROW_GPROC_ERROR(timeout)
|
|
|
end.
|
|
|
|
|
|
+%% @spec wide_await(Nodes::[node()], Key::key(), Timeout) -> {pid(),Value}
|
|
|
+%% Timeout = integer() | infinity
|
|
|
+%%
|
|
|
+%% @doc Wait for a local name to be registered on any of `Nodes'.
|
|
|
+%% This function works rather like {@link await/2}, but queries all nodes in
|
|
|
+%% the `Nodes' list at the same time. The first node to respond with a
|
|
|
+%% process registered as `Key' will provide the result. Other results are
|
|
|
+%% ignored. `Key' must be a unique name with local scope, i.e. `{n,l,Name}'.
|
|
|
+%%
|
|
|
+%% An exception is thrown upon timeout, or if no node can be reached (if gproc is
|
|
|
+%% not running on a given node, this is treated the same as the node being down).
|
|
|
+%% @end
|
|
|
+%%
|
|
|
+wide_await(Nodes, Key, Timeout) ->
|
|
|
+ ?CATCH_GPROC_ERROR(wide_await1(Nodes, Key, Timeout), [Nodes, Key, Timeout]).
|
|
|
+
|
|
|
+wide_await1(Nodes, {n,l,_} = Key, Timeout) ->
|
|
|
+ {_, Ref} = spawn_monitor(fun() ->
|
|
|
+ wide_request_wait(Nodes, Key, Timeout)
|
|
|
+ end),
|
|
|
+ receive
|
|
|
+ {'DOWN', Ref, _, _, Reason} ->
|
|
|
+ case Reason of
|
|
|
+ {ok, {gproc,_,registered,{_,Pid,V}}} ->
|
|
|
+ {Pid, V};
|
|
|
+ Other ->
|
|
|
+ ?THROW_GPROC_ERROR(Other)
|
|
|
+ end
|
|
|
+ end;
|
|
|
+wide_await1(_, _, _) ->
|
|
|
+ ?THROW_GPROC_ERROR(badarg).
|
|
|
+
|
|
|
+
|
|
|
+wide_request_wait(Nodes, {n,l,_} = Key, Timeout) ->
|
|
|
+ TRef = case Timeout of
|
|
|
+ infinity -> no_timer;
|
|
|
+ T when is_integer(T), T > 0 ->
|
|
|
+ erlang:start_timer(T, self(), gproc_timeout);
|
|
|
+ _ ->
|
|
|
+ exit(badarg)
|
|
|
+ end,
|
|
|
+ Req = {await, Key, self()},
|
|
|
+ Refs = lists:map(
|
|
|
+ fun(Node) ->
|
|
|
+ S = {?MODULE, Node},
|
|
|
+ Ref = erlang:monitor(process, S),
|
|
|
+ catch erlang:send(S, {'$gen_call', {self(), Ref}, Req},
|
|
|
+ [noconnect]),
|
|
|
+ {Node, Ref}
|
|
|
+ end, Nodes),
|
|
|
+ collect_replies(Refs, Key, TRef).
|
|
|
+
|
|
|
+collect_replies(Refs, Key, TRef) ->
|
|
|
+ receive
|
|
|
+ {gproc, _Ref, registered, {_, _, _}} = Result ->
|
|
|
+ exit({ok, Result});
|
|
|
+ {'DOWN', Ref, _, _, _} ->
|
|
|
+ case lists:keydelete(Ref, 2, Refs) of
|
|
|
+ [] ->
|
|
|
+ exit(nodedown);
|
|
|
+ Refs1 ->
|
|
|
+ collect_replies(Refs1, Key, TRef)
|
|
|
+ end;
|
|
|
+ {timeout, TRef, gproc_timeout} ->
|
|
|
+ exit(timeout);
|
|
|
+ {Ref, Ref} ->
|
|
|
+ %% ignore
|
|
|
+ collect_replies(Refs, Key, TRef)
|
|
|
+ end.
|
|
|
+
|
|
|
|
|
|
%% @spec nb_wait(Key::key()) -> Ref
|
|
|
%%
|
|
@@ -687,6 +782,16 @@ request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
|
|
|
nb_wait(Key) ->
|
|
|
?CATCH_GPROC_ERROR(nb_wait1(Key), [Key]).
|
|
|
|
|
|
+%% @spec nb_wait(Node::node(), Key::key()) -> Ref
|
|
|
+%%
|
|
|
+%% @doc Wait for a local name to be registered on `Node'.
|
|
|
+%% The caller can expect to receive a message,
|
|
|
+%% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
|
|
|
+%% @end
|
|
|
+%%
|
|
|
+nb_wait(Node, {n,l,_} = Key) when is_atom(Node) ->
|
|
|
+ ?CATCH_GPROC_ERROR(nb_wait1(Node, Key), [Node, Key]).
|
|
|
+
|
|
|
nb_wait1({n,g,_} = Key) ->
|
|
|
?CHK_DIST,
|
|
|
call({await, Key, self()}, g);
|
|
@@ -695,6 +800,12 @@ nb_wait1({n,l,_} = Key) ->
|
|
|
nb_wait1(_) ->
|
|
|
?THROW_GPROC_ERROR(badarg).
|
|
|
|
|
|
+nb_wait1(Node, {n,l,_} = Key) when is_atom(Node) ->
|
|
|
+ call(Node, {await, Key, self()}, l);
|
|
|
+nb_wait1(_, _) ->
|
|
|
+ ?THROW_GPROC_ERROR(badarg).
|
|
|
+
|
|
|
+
|
|
|
%% @spec cancel_wait(Key::key(), Ref) -> ok
|
|
|
%% Ref = all | reference()
|
|
|
%%
|
|
@@ -707,6 +818,21 @@ nb_wait1(_) ->
|
|
|
cancel_wait(Key, Ref) ->
|
|
|
?CATCH_GPROC_ERROR(cancel_wait1(Key, Ref), [Key, Ref]).
|
|
|
|
|
|
+%% @spec cancel_wait(Node::node(), Key::key(), Ref) -> ok
|
|
|
+%% Ref = all | reference()
|
|
|
+%%
|
|
|
+%% @doc Cancels a previous call to nb_wait/2
|
|
|
+%%
|
|
|
+%% This function works just like {@link cancel_wait/2}, but talks to a remote
|
|
|
+%% node.
|
|
|
+%% @end
|
|
|
+%%
|
|
|
+cancel_wait(N, Key, Ref) when N == node() ->
|
|
|
+ cancel_wait(Key, Ref);
|
|
|
+cancel_wait(N, Key, Ref) ->
|
|
|
+ ?CATCH_GPROC_ERROR(cancel_wait1(N, Key, Ref), [N, Key, Ref]).
|
|
|
+
|
|
|
+
|
|
|
cancel_wait1({_,g,_} = Key, Ref) ->
|
|
|
?CHK_DIST,
|
|
|
cast({cancel_wait, self(), Key, Ref}, g),
|
|
@@ -715,6 +841,9 @@ cancel_wait1({_,l,_} = Key, Ref) ->
|
|
|
cast({cancel_wait, self(), Key, Ref}, l),
|
|
|
ok.
|
|
|
|
|
|
+cancel_wait1(N, {_,l,_} = Key, Ref) ->
|
|
|
+ cast(N, {cancel_wait, self(), Key, Ref}, l).
|
|
|
+
|
|
|
cancel_wait_or_monitor(Key) ->
|
|
|
?CATCH_GPROC_ERROR(cancel_wait_or_monitor1(Key), [Key]).
|
|
|
|
|
@@ -1770,6 +1899,9 @@ call(Req, l) ->
|
|
|
call(Req, g) ->
|
|
|
chk_reply(gproc_dist:leader_call(Req)).
|
|
|
|
|
|
+call(N, Req, l) ->
|
|
|
+ chk_reply(gen_server:call({?MODULE, N}, Req)).
|
|
|
+
|
|
|
chk_reply(Reply) ->
|
|
|
case Reply of
|
|
|
badarg -> ?THROW_GPROC_ERROR(badarg);
|
|
@@ -1785,6 +1917,9 @@ cast(Msg, l) ->
|
|
|
cast(Msg, g) ->
|
|
|
gproc_dist:leader_cast(Msg).
|
|
|
|
|
|
+cast(N, Msg, l) ->
|
|
|
+ gen_server:cast({?MODULE, N}, Msg).
|
|
|
+
|
|
|
try_insert_reg({T,l,_} = Key, Val, Pid) ->
|
|
|
case gproc_lib:insert_reg(Key, Val, Pid, l) of
|
|
|
false ->
|