|
@@ -64,10 +64,11 @@
|
|
|
|
|
|
-export([start_link/0,
|
|
|
reg/1, reg/2, unreg/1,
|
|
|
+ reg_shared/1, reg_shared/2, unreg_shared/1,
|
|
|
mreg/3,
|
|
|
munreg/3,
|
|
|
set_value/2,
|
|
|
- get_value/1,
|
|
|
+ get_value/1, get_value/2,
|
|
|
where/1,
|
|
|
await/1, await/2,
|
|
|
nb_wait/1,
|
|
@@ -77,6 +78,8 @@
|
|
|
lookup_value/1,
|
|
|
lookup_values/1,
|
|
|
update_counter/2,
|
|
|
+ reset_counter/1,
|
|
|
+ update_shared_counter/2,
|
|
|
give_away/2,
|
|
|
goodbye/0,
|
|
|
send/2,
|
|
@@ -104,6 +107,7 @@
|
|
|
add_global_counter/2,
|
|
|
add_local_aggr_counter/1,
|
|
|
add_global_aggr_counter/1,
|
|
|
+ add_shared_local_counter/2,
|
|
|
lookup_local_name/1,
|
|
|
lookup_global_name/1,
|
|
|
lookup_local_properties/1,
|
|
@@ -202,6 +206,16 @@ add_local_counter(Name, Initial) when is_integer(Initial) ->
|
|
|
|
|
|
%% spec(Name::any(), Initial::integer()) -> true
|
|
|
%%
|
|
|
+%% @doc Registers a local shared (unique) counter.
|
|
|
+%% @equiv reg_shared({c,l,Name},Value)
|
|
|
+%% @end
|
|
|
+%%
|
|
|
+add_shared_local_counter(Name, Initial) when is_integer(Initial) ->
|
|
|
+ reg_shared({c,l,Name}, Initial).
|
|
|
+
|
|
|
+
|
|
|
+%% spec(Name::any(), Initial::integer()) -> true
|
|
|
+%%
|
|
|
%% @doc Registers a global (non-unique) counter. @equiv reg({c,g,Name},Value)
|
|
|
%% @end
|
|
|
%%
|
|
@@ -671,6 +685,46 @@ reg({n,l,_} = Key, Value) ->
|
|
|
reg(_, _) ->
|
|
|
erlang:error(badarg).
|
|
|
|
|
|
+
|
|
|
+%% @spec reg_shared(Key::key()) -> true
|
|
|
+%%
|
|
|
+%% @doc Register a resource, but don't tie it to a particular process.
|
|
|
+%%
|
|
|
+%% `reg_shared({c,l,C}) -> reg_shared({c,l,C}, 0).'
|
|
|
+%% `reg_shared({a,l,A}) -> reg_shared({a,l,A}, undefined).'
|
|
|
+%% @end
|
|
|
+reg_shared({c,_,_} = Key) ->
|
|
|
+ reg_shared(Key, 0);
|
|
|
+reg_shared({a,_,_} = Key) ->
|
|
|
+ reg_shared(Key, undefined).
|
|
|
+
|
|
|
+
|
|
|
+%% @spec reg_shared(Key::key(), Value) -> true
|
|
|
+%%
|
|
|
+%% @doc Register a resource, but don't tie it to a particular process.
|
|
|
+%%
|
|
|
+%% Shared resources are all unique. They remain until explicitly unregistered
|
|
|
+%% (using {@link unreg_shared/1}). The types of shared resources currently
|
|
|
+%% supported are `counter' and `aggregated counter'. In listings and query
|
|
|
+%% results, shared resources appear as other similar resources, except that
|
|
|
+%% `Pid == shared'. To wit, update_counter({c,l,myCounter}, 1, shared) would
|
|
|
+%% increment the shared counter `myCounter' with 1, provided it exists.
|
|
|
+%%
|
|
|
+%% A shared aggregated counter will track updates in exactly the same way as
|
|
|
+%% an aggregated counter which is owned by a process.
|
|
|
+%% @end
|
|
|
+%%
|
|
|
+reg_shared({_,g,_} = Key, Value) ->
|
|
|
+ %% anything global
|
|
|
+ ?CHK_DIST,
|
|
|
+ gproc_dist:reg_shared(Key, Value);
|
|
|
+reg_shared({a,l,_} = Key, undefined) ->
|
|
|
+ call({reg_shared, Key, undefined});
|
|
|
+reg_shared({c,l,_} = Key, Value) when is_integer(Value) ->
|
|
|
+ call({reg_shared, Key, Value});
|
|
|
+reg_shared(_, _) ->
|
|
|
+ erlang:error(badarg).
|
|
|
+
|
|
|
%% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
|
|
|
%%
|
|
|
%% @doc Register multiple {Key,Value} pairs of a given type and scope.
|
|
@@ -748,6 +802,21 @@ unreg(Key) ->
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
+%% @spec (Key:: key()) -> true
|
|
|
+%%
|
|
|
+%% @doc Unregister a shared resource.
|
|
|
+%% @end
|
|
|
+unreg_shared(Key) ->
|
|
|
+ case Key of
|
|
|
+ {_, g, _} ->
|
|
|
+ ?CHK_DIST,
|
|
|
+ gproc_dist:unreg_shared(Key);
|
|
|
+ {T, l, _} when T == c;
|
|
|
+ T == a -> call({unreg_shared, Key});
|
|
|
+ _ ->
|
|
|
+ erlang:error(badarg)
|
|
|
+ end.
|
|
|
+
|
|
|
%% @equiv unreg/1
|
|
|
unregister_name(Key) ->
|
|
|
unreg(Key).
|
|
@@ -850,13 +919,20 @@ set_value(_, _) ->
|
|
|
erlang:error(badarg).
|
|
|
|
|
|
%% @spec (Key) -> Value
|
|
|
-%% @doc Read the value stored with a key registered to the current process.
|
|
|
+%% @doc Reads the value stored with a key registered to the current process.
|
|
|
%%
|
|
|
%% If no such key is registered to the current process, this function exits.
|
|
|
%% @end
|
|
|
get_value(Key) ->
|
|
|
get_value(Key, self()).
|
|
|
|
|
|
+%% @spec (Key, Pid) -> Value
|
|
|
+%% @doc Reads the value stored with a key registered to the process Pid.
|
|
|
+%%
|
|
|
+%% If `Pid == shared', the value of a shared key (see {@link reg_shared/1})
|
|
|
+%% will be read.
|
|
|
+%% @end
|
|
|
+%%
|
|
|
get_value({T,_,_} = Key, Pid) when is_pid(Pid) ->
|
|
|
if T==n orelse T==a ->
|
|
|
case ets:lookup(?TAB, {Key, T}) of
|
|
@@ -866,6 +942,15 @@ get_value({T,_,_} = Key, Pid) when is_pid(Pid) ->
|
|
|
true ->
|
|
|
ets:lookup_element(?TAB, {Key, Pid}, 3)
|
|
|
end;
|
|
|
+get_value({T,_,_} = K, shared) when T==c; T==a ->
|
|
|
+ Key = case T of
|
|
|
+ c -> {K, shared};
|
|
|
+ a -> {K, a}
|
|
|
+ end,
|
|
|
+ case ets:lookup(?TAB, Key) of
|
|
|
+ [{_, shared, Value}] -> Value;
|
|
|
+ _ -> erlang:error(badarg)
|
|
|
+ end;
|
|
|
get_value(_, _) ->
|
|
|
erlang:error(badarg).
|
|
|
|
|
@@ -978,6 +1063,40 @@ update_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
|
|
|
update_counter(_, _) ->
|
|
|
erlang:error(badarg).
|
|
|
|
|
|
+
|
|
|
+%% @spec (Key) -> {ValueBefore, ValueAfter}
|
|
|
+%% Key = {c, Scope, Name}
|
|
|
+%% Scope = l | g
|
|
|
+%% ValueBefore = integer()
|
|
|
+%% ValueAfter = integer()
|
|
|
+%%
|
|
|
+%% @doc Reads and resets a counter in a "thread-safe" way
|
|
|
+%%
|
|
|
+%% This function reads the current value of a counter and then resets it to its
|
|
|
+%% initial value. The reset operation is done using {@link update_counter/2},
|
|
|
+%% which allows for concurrent calls to {@link update_counter/2} without losing
|
|
|
+%% updates. Aggregated counters are updated accordingly.
|
|
|
+%% @end
|
|
|
+%%
|
|
|
+reset_counter({c,g,_} = Key) ->
|
|
|
+ ?CHK_DIST,
|
|
|
+ gproc_dist:reset_counter(Key);
|
|
|
+reset_counter({c,l,_} = Key) ->
|
|
|
+ Current = ets:lookup_element(?TAB, {Key, self()}, 3),
|
|
|
+ Initial = case ets:lookup(?TAB, {self(), Key}) of
|
|
|
+ [{_, r}] -> 0;
|
|
|
+ [{_, Opts}] ->
|
|
|
+ proplists:get_value(initial, Opts, 0)
|
|
|
+ end,
|
|
|
+ {Current, update_counter(Key, Initial - Current)}.
|
|
|
+
|
|
|
+
|
|
|
+update_shared_counter({c,g,_} = Key, Incr) ->
|
|
|
+ ?CHK_DIST,
|
|
|
+ gproc_dist:update_shared_counter(Key, Incr);
|
|
|
+update_shared_counter({c,l,_} = Key, Incr) ->
|
|
|
+ gproc_lib:update_counter(Key, Incr, shared).
|
|
|
+
|
|
|
%% @spec (From::key(), To::pid() | key()) -> undefined | pid()
|
|
|
%%
|
|
|
%% @doc Atomically transfers the key `From' to the process identified by `To'.
|
|
@@ -1150,7 +1269,7 @@ info(Pid) when is_pid(Pid) ->
|
|
|
%% same as [http://www.erlang.org/doc/man/erlang.html#process_info-2].
|
|
|
%% @end
|
|
|
info(Pid, ?MODULE) ->
|
|
|
- Keys = ets:select(?TAB, [{ {{Pid,'$1'}, r}, [], ['$1'] }]),
|
|
|
+ Keys = ets:select(?TAB, [{ {{Pid,'$1'}, '_'}, [], ['$1'] }]),
|
|
|
{?MODULE, lists:zf(
|
|
|
fun(K) ->
|
|
|
try V = get_value(K, Pid),
|
|
@@ -1210,6 +1329,14 @@ handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
|
|
|
false ->
|
|
|
{reply, badarg, S}
|
|
|
end;
|
|
|
+handle_call({reg_shared, {_T,l,_} = Key, Val}, _From, S) ->
|
|
|
+ case try_insert_reg(Key, Val, shared) of
|
|
|
+ %% case try_insert_shared(Key, Val) of
|
|
|
+ true ->
|
|
|
+ {reply, true, S};
|
|
|
+ false ->
|
|
|
+ {reply, badarg, S}
|
|
|
+ end;
|
|
|
handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
|
|
|
case ets:member(?TAB, {Pid,Key}) of
|
|
|
true ->
|
|
@@ -1218,6 +1345,9 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
|
|
|
false ->
|
|
|
{reply, badarg, S}
|
|
|
end;
|
|
|
+handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
|
|
|
+ _ = gproc_lib:remove_reg(Key, shared),
|
|
|
+ {reply, true, S};
|
|
|
handle_call({await, {_,l,_} = Key, Pid}, From, S) ->
|
|
|
%% Passing the pid explicitly is needed when leader_call is used,
|
|
|
%% since the Pid given as From in the leader is the local gen_leader
|
|
@@ -1307,9 +1437,6 @@ cast(Msg, l) ->
|
|
|
cast(Msg, g) ->
|
|
|
gproc_dist:leader_cast(Msg).
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
try_insert_reg({T,l,_} = Key, Val, Pid) ->
|
|
|
case gproc_lib:insert_reg(Key, Val, Pid, l) of
|
|
|
false ->
|
|
@@ -1332,6 +1459,10 @@ try_insert_reg({T,l,_} = Key, Val, Pid) ->
|
|
|
true
|
|
|
end.
|
|
|
|
|
|
+%% try_insert_shared({c,l,_} = Key, Val) ->
|
|
|
+%% ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
|
|
|
+%% try_insert_shared({a,l,_} = Key, Val) ->
|
|
|
+%% ets:insert_new(?TAB, [{{Key, a}, shared, Val}, {{shared, Key}, []}]).
|
|
|
|
|
|
-spec audit_process(pid()) -> ok.
|
|
|
|
|
@@ -1349,7 +1480,7 @@ process_is_down(Pid) when is_pid(Pid) ->
|
|
|
false ->
|
|
|
ok;
|
|
|
true ->
|
|
|
- Revs = ets:select(?TAB, [{{{Pid,'$1'},r},
|
|
|
+ Revs = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
|
|
|
[{'==',{element,2,'$1'},l}], ['$1']}]),
|
|
|
lists:foreach(
|
|
|
fun({n,l,_}=K) ->
|
|
@@ -1395,7 +1526,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
|
|
|
Pid;
|
|
|
ToPid when is_pid(ToPid) ->
|
|
|
ets:insert(?TAB, [{Key, ToPid, Value},
|
|
|
- {{ToPid, K}, r}]),
|
|
|
+ {{ToPid, K}, []}]),
|
|
|
ets:delete(?TAB, {Pid, K}),
|
|
|
_ = gproc_lib:ensure_monitor(ToPid, l),
|
|
|
ToPid;
|
|
@@ -1418,7 +1549,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
|
|
|
badarg;
|
|
|
false ->
|
|
|
ets:insert(?TAB, [{ToKey, ToPid, Value},
|
|
|
- {{ToPid, K}, r}]),
|
|
|
+ {{ToPid, K}, []}]),
|
|
|
ets:delete(?TAB, {Pid, K}),
|
|
|
ets:delete(?TAB, Key),
|
|
|
_ = gproc_lib:ensure_monitor(ToPid, l),
|
|
@@ -1598,7 +1729,7 @@ is_var('$8') -> {true,8};
|
|
|
is_var('$9') -> {true,9};
|
|
|
is_var(X) when is_atom(X) ->
|
|
|
case atom_to_list(X) of
|
|
|
- "$" ++ Tl ->
|
|
|
+ "\$" ++ Tl ->
|
|
|
try N = list_to_integer(Tl),
|
|
|
{true,N}
|
|
|
catch
|
|
@@ -1704,10 +1835,10 @@ qlc_lookup(_Scope, 1, Keys) ->
|
|
|
qlc_lookup(Scope, 2, Pids) ->
|
|
|
lists:flatmap(fun(Pid) ->
|
|
|
Found =
|
|
|
- ets:select(?TAB, [{ {{Pid, rev_keypat(Scope)}, r},
|
|
|
- [], ['$_']}]),
|
|
|
+ ets:select(?TAB, [{{{Pid, rev_keypat(Scope)}, '_'},
|
|
|
+ [], ['$_']}]),
|
|
|
lists:flatmap(
|
|
|
- fun({{_,{T,_,_}=K}, r}) ->
|
|
|
+ fun({{_,{T,_,_}=K}, _}) ->
|
|
|
K2 = if T==n orelse T==a -> T;
|
|
|
true -> Pid
|
|
|
end,
|