Browse Source

Resources & resource_counters work in both scopes (test cases added)
- Fixed broken type docs
- Added gproc:reg/4 (specify attributes at registration time)
- Types moved into gproc.erl and exported from there

Ulf Wiger 10 years ago
parent
commit
6c0ab17fdb
7 changed files with 361 additions and 167 deletions
  1. 0 23
      include/gproc.hrl
  2. 146 101
      src/gproc.erl
  3. 48 28
      src/gproc_dist.erl
  4. 24 14
      src/gproc_lib.erl
  5. 74 0
      test/gproc_dist_tests.erl
  6. 14 1
      test/gproc_test_lib.erl
  7. 55 0
      test/gproc_tests.erl

+ 0 - 23
include/gproc.hrl

@@ -18,26 +18,3 @@
 %% gproc.hrl: Common definitions
 %% gproc.hrl: Common definitions
 
 
 -define(TAB, gproc).
 -define(TAB, gproc).
-
-
--type type()     :: n | p | c | a.
--type scope()    :: l | g.
--type context()  :: {scope(),type()} | type().
--type sel_type() :: n | p | c | a |
-                    names | props | counters | aggr_counters.
-
--type sel_var() :: '_' | atom().
--type keypat()  :: {sel_type() | sel_var(), l | g | sel_var(), any()}.
--type pidpat()  :: pid() | sel_var().
--type headpat() :: {keypat(),pidpat(),any()}.
--type key()     :: {type(), scope(), any()}.
-
--type sel_pattern() :: [{headpat(), list(), list()}].
-
-%% update_counter increment
--type ctr_incr()   :: integer().
--type ctr_thr()    :: integer().
--type ctr_setval() :: integer().
--type ctr_update()  :: ctr_incr()
-		     | {ctr_incr(), ctr_thr(), ctr_setval()}.
--type increment() :: ctr_incr() | ctr_update() | [ctr_update()].

+ 146 - 101
src/gproc.erl

@@ -42,43 +42,13 @@
 %%
 %%
 %% @end
 %% @end
 
 
-%% @type type()  = n | p | c | a. n = name; p = property; c = counter;
-%%                                a = aggregate_counter
-%% @type scope() = l | g. l = local registration; g = global registration
-%%
-%% @type reg_id() = {type(), scope(), any()}.
-%% @type unique_id() = {n | a, scope(), any()}.
-%%
-%% @type monitor_type() = info | standby | follow.
-%%
-%% @type sel_scope() = scope | all | global | local.
-%% @type sel_type() = type() | names | props | counters | aggr_counters.
-%% @type context() = {scope(), type()} | type(). {'all','all'} is the default
-%%
-%% @type headpat() = {keypat(),pidpat(),ValPat}.
-%% @type keypat() = {sel_type() | sel_var(),
-%%                   l | g | sel_var(),
-%%                   any()}.
-%% @type pidpat() = pid() | sel_var().
-%% @type sel_var() = DollarVar | '_'.
-%% @type sel_pattern() = [{headpat(), Guards, Prod}].
-%% @type key()   = {type(), scope(), any()}.
-%%
-%% update_counter increment
-%% @type ctr_incr()   = integer().
-%% @type ctr_thr()    = integer().
-%% @type ctr_setval() = integer().
-%% @type ctr_update()  = ctr_incr()
-%% 		     | {ctr_incr(), ctr_thr(), ctr_setval()}.
-%% @type increment() = ctr_incr() | ctr_update() | [ctr_update()].
-
 -module(gproc).
 -module(gproc).
 -behaviour(gen_server).
 -behaviour(gen_server).
 
 
 -export([start_link/0,
 -export([start_link/0,
-         reg/1, reg/2, unreg/1, set_attributes/2,
+         reg/1, reg/2, reg/3, unreg/1, set_attributes/2,
 	 reg_or_locate/1, reg_or_locate/2, reg_or_locate/3,
 	 reg_or_locate/1, reg_or_locate/2, reg_or_locate/3,
-	 reg_shared/1, reg_shared/2, unreg_shared/1,
+	 reg_shared/1, reg_shared/2, reg_shared/3, unreg_shared/1,
 	 set_attributes_shared/2, set_value_shared/2,
 	 set_attributes_shared/2, set_value_shared/2,
          mreg/3,
          mreg/3,
          munreg/3,
          munreg/3,
@@ -161,6 +131,40 @@
 -include("gproc_int.hrl").
 -include("gproc_int.hrl").
 -include("gproc.hrl").
 -include("gproc.hrl").
 
 
+-export_type([scope/0, type/0, key/0,
+              context/0, sel_pattern/0, sel_scope/0, sel_context/0,
+              reg_id/0, unique_id/0, monitor_type/0]).
+
+-type type()     :: n | p | c | a | r | rc.
+-type scope()    :: l | g.
+-type context()  :: {scope(),type()} | type().
+-type sel_type() :: type()
+                    | names | props | counters | aggr_counters
+                    | resources | resource_counters.
+
+-type sel_var() :: '_' | atom().
+-type keypat()  :: {sel_type() | sel_var(), l | g | sel_var(), any()}.
+-type pidpat()  :: pid() | sel_var().
+-type headpat() :: {keypat(), pidpat(), any()}.
+-type key()     :: {type(), scope(), any()}.
+
+-type sel_pattern() :: [{headpat(), list(), list()}].
+
+-type reg_id() :: {type(), scope(), any()}.
+-type unique_id() :: {n | a, scope(), any()}.
+-type monitor_type() :: info | standby | follow.
+-type sel_scope() :: scope | all | global | local.
+-type sel_context() :: {scope(), type()} | type().
+
+%% update_counter increment
+-type ctr_incr()   :: integer().
+-type ctr_thr()    :: integer().
+-type ctr_setval() :: integer().
+-type ctr_update()  :: ctr_incr()
+		     | {ctr_incr(), ctr_thr(), ctr_setval()}.
+-type increment() :: ctr_incr() | ctr_update() | [ctr_update()].
+
+
 -define(SERVER, ?MODULE).
 -define(SERVER, ?MODULE).
 %%-define(l, l(?LINE)). % when activated, calls a traceable empty function
 %%-define(l, l(?LINE)). % when activated, calls a traceable empty function
 -define(l, ignore).
 -define(l, ignore).
@@ -196,7 +200,8 @@ start_link() ->
 %% @doc Registers a local (unique) name. @equiv reg({n,l,Name})
 %% @doc Registers a local (unique) name. @equiv reg({n,l,Name})
 %% @end
 %% @end
 %%
 %%
-add_local_name(Name)  -> ?CATCH_GPROC_ERROR(reg1({n,l,Name}, undefined), [Name]).
+add_local_name(Name)  ->
+    ?CATCH_GPROC_ERROR(reg1({n,l,Name}, undefined, []), [Name]).
 
 
 
 
 %% spec(Name::any()) -> true
 %% spec(Name::any()) -> true
@@ -204,7 +209,8 @@ add_local_name(Name)  -> ?CATCH_GPROC_ERROR(reg1({n,l,Name}, undefined), [Name])
 %% @doc Registers a global (unique) name. @equiv reg({n,g,Name})
 %% @doc Registers a global (unique) name. @equiv reg({n,g,Name})
 %% @end
 %% @end
 %%
 %%
-add_global_name(Name) -> ?CATCH_GPROC_ERROR(reg1({n,g,Name}, undefined), [Name]).
+add_global_name(Name) ->
+    ?CATCH_GPROC_ERROR(reg1({n,g,Name}, undefined, []), [Name]).
 
 
 
 
 %% spec(Name::any(), Value::any()) -> true
 %% spec(Name::any(), Value::any()) -> true
@@ -213,7 +219,7 @@ add_global_name(Name) -> ?CATCH_GPROC_ERROR(reg1({n,g,Name}, undefined), [Name])
 %% @end
 %% @end
 %%
 %%
 add_local_property(Name , Value) ->
 add_local_property(Name , Value) ->
-    ?CATCH_GPROC_ERROR(reg1({p,l,Name}, Value), [Name, Value]).
+    ?CATCH_GPROC_ERROR(reg1({p,l,Name}, Value, []), [Name, Value]).
 
 
 %% spec(Name::any(), Value::any()) -> true
 %% spec(Name::any(), Value::any()) -> true
 %%
 %%
@@ -221,7 +227,7 @@ add_local_property(Name , Value) ->
 %% @end
 %% @end
 %%
 %%
 add_global_property(Name, Value) ->
 add_global_property(Name, Value) ->
-    ?CATCH_GPROC_ERROR(reg1({p,g,Name}, Value), [Name, Value]).
+    ?CATCH_GPROC_ERROR(reg1({p,g,Name}, Value, []), [Name, Value]).
 
 
 %% spec(Name::any(), Initial::integer()) -> true
 %% spec(Name::any(), Initial::integer()) -> true
 %%
 %%
@@ -229,7 +235,7 @@ add_global_property(Name, Value) ->
 %% @end
 %% @end
 %%
 %%
 add_local_counter(Name, Initial) when is_integer(Initial) ->
 add_local_counter(Name, Initial) when is_integer(Initial) ->
-    ?CATCH_GPROC_ERROR(reg1({c,l,Name}, Initial), [Name, Initial]).
+    ?CATCH_GPROC_ERROR(reg1({c,l,Name}, Initial, []), [Name, Initial]).
 
 
 
 
 %% spec(Name::any(), Initial::integer()) -> true
 %% spec(Name::any(), Initial::integer()) -> true
@@ -248,7 +254,7 @@ add_shared_local_counter(Name, Initial) when is_integer(Initial) ->
 %% @end
 %% @end
 %%
 %%
 add_global_counter(Name, Initial) when is_integer(Initial) ->
 add_global_counter(Name, Initial) when is_integer(Initial) ->
-    ?CATCH_GPROC_ERROR(reg1({c,g,Name}, Initial), [Name, Initial]).
+    ?CATCH_GPROC_ERROR(reg1({c,g,Name}, Initial, []), [Name, Initial]).
 
 
 %% spec(Name::any()) -> true
 %% spec(Name::any()) -> true
 %%
 %%
@@ -538,7 +544,7 @@ lookup_env(Scope, App, Key, P) ->
 
 
 cache_env(Scope, App, Key, Value) ->
 cache_env(Scope, App, Key, Value) ->
     ?CATCH_GPROC_ERROR(
     ?CATCH_GPROC_ERROR(
-       reg1({p, Scope, {gproc_env, App, Key}}, Value),
+       reg1({p, Scope, {gproc_env, App, Key}}, Value, []),
        [Scope,App,Key,Value]).
        [Scope,App,Key,Value]).
 
 
 update_cached_env(Scope, App, Key, Value) ->
 update_cached_env(Scope, App, Key, Value) ->
@@ -604,13 +610,13 @@ is_string(S) ->
 %% @spec reg(Key::key()) -> true
 %% @spec reg(Key::key()) -> true
 %%
 %%
 %% @doc
 %% @doc
-%% @equiv reg(Key, default(Key))
+%% @equiv reg(Key, default(Key), [])
 %% @end
 %% @end
 reg(Key) ->
 reg(Key) ->
     ?CATCH_GPROC_ERROR(reg1(Key), [Key]).
     ?CATCH_GPROC_ERROR(reg1(Key), [Key]).
 
 
 reg1(Key) ->
 reg1(Key) ->
-    reg1(Key, default(Key)).
+    reg1(Key, default(Key), []).
 
 
 %% @spec reg_or_locate(Key::key()) -> {pid(), NewValue}
 %% @spec reg_or_locate(Key::key()) -> {pid(), NewValue}
 %%
 %%
@@ -662,10 +668,10 @@ await(Node, Key, Timeout) when Node == node() ->
 await(Node, Key, Timeout) when is_atom(Node) ->
 await(Node, Key, Timeout) when is_atom(Node) ->
     ?CATCH_GPROC_ERROR(await1(Node, Key, Timeout), [Node, Key, Timeout]).
     ?CATCH_GPROC_ERROR(await1(Node, Key, Timeout), [Node, Key, Timeout]).
 
 
-await1({T,g,_} = Key, Timeout) when T=:=n; T=:=a ->
+await1({T,g,_} = Key, Timeout) when T=:=n; T=:=a; T=:=rc ->
     ?CHK_DIST,
     ?CHK_DIST,
     request_wait(Key, Timeout);
     request_wait(Key, Timeout);
-await1({T,l,_} = Key, Timeout) when T=:=n; T=:=a ->
+await1({T,l,_} = Key, Timeout) when T=:=n; T=:=a; T=:=rc ->
     case ets:lookup(?TAB, {Key, T}) of
     case ets:lookup(?TAB, {Key, T}) of
         [{_, Pid, Value}] ->
         [{_, Pid, Value}] ->
 	    case is_process_alive(Pid) of
 	    case is_process_alive(Pid) of
@@ -685,12 +691,12 @@ await1({T,l,_} = Key, Timeout) when T=:=n; T=:=a ->
             request_wait(Key, Timeout)
             request_wait(Key, Timeout)
     end;
     end;
 await1(_, _) ->
 await1(_, _) ->
-    throw(badarg).
+    ?THROW_GPROC_ERROR(badarg).
 
 
 await1(N, {n,l,_} = Key, Timeout) when is_atom(N) ->
 await1(N, {n,l,_} = Key, Timeout) when is_atom(N) ->
     request_wait(N, Key, Timeout);
     request_wait(N, Key, Timeout);
 await1(_, _, _) ->
 await1(_, _, _) ->
-    throw(badarg).
+    ?THROW_GPROC_ERROR(badarg).
 
 
 
 
 request_wait({_,g,_} = Key, Timeout) ->
 request_wait({_,g,_} = Key, Timeout) ->
@@ -816,16 +822,17 @@ nb_wait(Key) ->
 nb_wait(Node, Key) ->
 nb_wait(Node, Key) ->
     ?CATCH_GPROC_ERROR(nb_wait1(Node, Key), [Node, Key]).
     ?CATCH_GPROC_ERROR(nb_wait1(Node, Key), [Node, Key]).
 
 
-nb_wait1({T,g,_} = Key) when T=:=n; T=:=a ->
+nb_wait1({T,g,_} = Key) when T=:=n; T=:=a; T=:=rc ->
     ?CHK_DIST,
     ?CHK_DIST,
     call({await, Key, self()}, g);
     call({await, Key, self()}, g);
-nb_wait1({T,l,_} = Key) when T=:=n; T=:=a ->
+nb_wait1({T,l,_} = Key) when T=:=n; T=:=a; T=:=rc ->
     call({await, Key, self()}, l);
     call({await, Key, self()}, l);
 nb_wait1(_) ->
 nb_wait1(_) ->
     ?THROW_GPROC_ERROR(badarg).
     ?THROW_GPROC_ERROR(badarg).
 
 
 nb_wait1(Node, {T,l,_} = Key) when is_atom(Node), T=:=n;
 nb_wait1(Node, {T,l,_} = Key) when is_atom(Node), T=:=n;
-                                   is_atom(Node), T=:=a ->
+                                   is_atom(Node), T=:=a;
+                                   is_atom(Node), T=:=rc ->
     call(Node, {await, Key, self()}, l).
     call(Node, {await, Key, self()}, l).
 
 
 
 
@@ -946,25 +953,65 @@ demonitor1(_, _) ->
 %%
 %%
 %%
 %%
 reg(Key, Value) ->
 reg(Key, Value) ->
-    ?CATCH_GPROC_ERROR(reg1(Key, Value), [Key, Value]).
+    ?CATCH_GPROC_ERROR(reg1(Key, Value, []), [Key, Value]).
 
 
-reg1({_,g,_} = Key, Value) ->
+%% @spec reg(Key::key(), Value, Attrs::[{atom(),any()}]) -> true
+%%
+%% @doc Register a name or property for the current process
+%% `Attrs' (default: `[]') can be inspected using {@link get_attribute/2}.
+%%
+%% The structure of `Key' is `{Type, Context, Name}', where:
+%%
+%% * `Context :: l | g' - `l' means 'local' context; `g' means 'global'
+%% * `Type :: p | n | c | a | r | rc' specifies the type of entry
+%%
+%% The semantics of the different types:
+%%
+%% * `p' - 'property', is non-unique, i.e. different processes can each
+%%    register a property with the same name.
+%% * `n' - 'name, is unique within the given context (local or global).
+%% * `c' - 'counter', is similar to a property, but has a numeric value
+%%    and behaves roughly as an ets counter (see {@link update_counter/2}.)
+%% * `a' - 'aggregated counter', is automatically updated by gproc, and
+%%    reflects the sum of all counter objects with the same name in the given
+%%    scope. The initial value for an aggregated counter must be `undefined'.
+%% * `r' - 'resource property', behaves like a property, but can be tracked
+%%    with a 'resource counter'.
+%% * `rc' - 'resource counter', tracks the number of resource properties
+%%    with the same name. When the resource count reaches `0', any triggers
+%%    specified using an `on_zero' attribute may be executed (see below).
+%%
+%% On-zero triggers:
+%%
+%% `Msg = {gproc, resource_on_zero, Context, Name, Pid}'
+%%
+%% * `{send, Key}' - run `gproc:send(Key, Msg)'
+%% * `{bcast, Key}' - run `gproc:bcast(Key, Msg)'
+%% * `publish' - run
+%%  `gproc_ps:publish(Context, gproc_resource_on_zero, {Context, Name, Pid})'
+%% * `{unreg_shared, Type, Name}' - unregister the shared key
+%%  `{Type, Context, Name}'
+%% @end
+reg(Key, Value, Attrs) ->
+    ?CATCH_GPROC_ERROR(reg1(Key, Value, Attrs), [Key, Value, Attrs]).
+
+reg1({T,g,_} = Key, Value, As) when T==p; T==a; T==c; T==n; T==r; T==rc ->
     %% anything global
     %% anything global
     ?CHK_DIST,
     ?CHK_DIST,
-    gproc_dist:reg(Key, Value);
-reg1({p,l,_} = Key, Value) ->
-    local_reg(Key, Value);
-reg1({a,l,_} = Key, undefined) ->
-    call({reg, Key, undefined});
-reg1({c,l,_} = Key, Value) when is_integer(Value) ->
-    call({reg, Key, Value});
-reg1({n,l,_} = Key, Value) ->
-    call({reg, Key, Value});
-reg1({r,l,_} = Key, Value) ->
-    call({reg, Key, Value});
-reg1({rc,l,_} = Key, Value) ->
-    call({reg, Key, Value});
-reg1(_, _) ->
+    gproc_dist:reg(Key, Value, As);
+reg1({p,l,_} = Key, Value, As) ->
+    local_reg(Key, Value, As);
+reg1({a,l,_} = Key, undefined, As) ->
+    call({reg, Key, undefined, As});
+reg1({c,l,_} = Key, Value, As) when is_integer(Value) ->
+    call({reg, Key, Value, As});
+reg1({n,l,_} = Key, Value, As) ->
+    call({reg, Key, Value, As});
+reg1({r,l,_} = Key, Value, As) ->
+    call({reg, Key, Value, As});
+reg1({rc,l,_} = Key, Value, As) ->
+    call({reg, Key, Value, As});
+reg1(_, _, _) ->
     ?THROW_GPROC_ERROR(badarg).
     ?THROW_GPROC_ERROR(badarg).
 
 
 %% @spec reg_or_locate(Key::key(), Value) -> {pid(), NewValue}
 %% @spec reg_or_locate(Key::key(), Value) -> {pid(), NewValue}
@@ -1031,22 +1078,25 @@ reg_shared1({T,_,_} = Key) when T==a; T==p; T==c ->
 %% @end
 %% @end
 %%
 %%
 reg_shared(Key, Value) ->
 reg_shared(Key, Value) ->
-    ?CATCH_GPROC_ERROR(reg_shared1(Key, Value), [Key, Value]).
+    ?CATCH_GPROC_ERROR(reg_shared1(Key, Value, []), [Key, Value]).
+
+reg_shared(Key, Value, Attrs) when is_list(Attrs) ->
+    ?CATCH_GPROC_ERROR(reg_shared1(Key, Value, Attrs), [Key, Value, Attrs]).
 
 
 %% @private
 %% @private
-reg_shared1({_,g,_} = Key, Value) ->
+reg_shared1({_,g,_} = Key, Value, As) ->
     %% anything global
     %% anything global
     ?CHK_DIST,
     ?CHK_DIST,
-    gproc_dist:reg_shared(Key, Value);
-reg_shared1({a,l,_} = Key, undefined) ->
-    call({reg_shared, Key, undefined});
-reg_shared1({c,l,_} = Key, Value) when is_integer(Value) ->
-    call({reg_shared, Key, Value});
-reg_shared1({p,l,_} = Key, Value) ->
-    call({reg_shared, Key, Value});
-reg_shared1({rc,l,_} = Key, undefined) ->
-    call({reg_shared, Key, undefined});
-reg_shared1(_, _) ->
+    gproc_dist:reg_shared(Key, Value, As);
+reg_shared1({a,l,_} = Key, undefined, As) ->
+    call({reg_shared, Key, undefined, As});
+reg_shared1({c,l,_} = Key, Value, As) when is_integer(Value) ->
+    call({reg_shared, Key, Value, As});
+reg_shared1({p,l,_} = Key, Value, As) ->
+    call({reg_shared, Key, Value, As});
+reg_shared1({rc,l,_} = Key, undefined, As) ->
+    call({reg_shared, Key, undefined, As});
+reg_shared1(_, _, _) ->
     ?THROW_GPROC_ERROR(badarg).
     ?THROW_GPROC_ERROR(badarg).
 
 
 %% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
 %% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
@@ -1243,7 +1293,7 @@ select({?TAB, _, _, _, _, _, _, _} = Continuation) ->
 select(Pat) ->
 select(Pat) ->
     select(all, Pat).
     select(all, Pat).
 
 
-%% @spec (Context::context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
+%% @spec (Context::sel_context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
 %%
 %%
 %% @doc Perform a select operation with limited context on the process registry
 %% @doc Perform a select operation with limited context on the process registry
 %%
 %%
@@ -1293,10 +1343,15 @@ select_count(Context, Pat) ->
 %%% Local properties can be registered in the local process, since
 %%% Local properties can be registered in the local process, since
 %%% no other process can interfere.
 %%% no other process can interfere.
 %%%
 %%%
-local_reg(Key, Value) ->
+local_reg({_,Scope,_} = Key, Value, As) ->
     case gproc_lib:insert_reg(Key, Value, self(), l) of
     case gproc_lib:insert_reg(Key, Value, self(), l) of
         false -> ?THROW_GPROC_ERROR(badarg);
         false -> ?THROW_GPROC_ERROR(badarg);
-        true  -> monitor_me()
+        true  ->
+            monitor_me(),
+            if As =/= [] ->
+                    gproc_lib:insert_attr(Key, As, self(), Scope);
+               true -> true
+            end
     end.
     end.
 
 
 local_mreg(_, []) -> true;
 local_mreg(_, []) -> true;
@@ -2067,10 +2122,14 @@ handle_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S) ->
     {noreply, S}.
     {noreply, S}.
 
 
 %% @hidden
 %% @hidden
-handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
+handle_call({reg, {_T,l,_} = Key, Val, Attrs}, {Pid,_}, S) ->
     case try_insert_reg(Key, Val, Pid) of
     case try_insert_reg(Key, Val, Pid) of
         true ->
         true ->
             _ = gproc_lib:ensure_monitor(Pid,l),
             _ = gproc_lib:ensure_monitor(Pid,l),
+            _ = if Attrs =/= [] ->
+                        gproc_lib:insert_attr(Key, Attrs, Pid, l);
+                   true -> true
+                end,
             {reply, true, S};
             {reply, true, S};
         false ->
         false ->
             {reply, badarg, S}
             {reply, badarg, S}
@@ -2161,10 +2220,13 @@ handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S)
 		end
 		end
 	end,
 	end,
     {reply, ok, S};
     {reply, ok, S};
-handle_call({reg_shared, {_T,l,_} = Key, Val}, _From, S) ->
+handle_call({reg_shared, {_T,l,_} = Key, Val, Attrs}, _From, S) ->
     case try_insert_reg(Key, Val, shared) of
     case try_insert_reg(Key, Val, shared) of
-    %% case try_insert_shared(Key, Val) of
 	true ->
 	true ->
+            _ = if Attrs =/= [] ->
+                        gproc_lib:insert_attr(Key, Attrs, shared, l);
+                   true -> true
+                end,
 	    {reply, true, S};
 	    {reply, true, S};
 	false ->
 	false ->
 	    {reply, badarg, S}
 	    {reply, badarg, S}
@@ -2173,7 +2235,6 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
     case ets:lookup(?TAB, {Pid,Key}) of
     case ets:lookup(?TAB, {Pid,Key}) of
         [{_, r}] ->
         [{_, r}] ->
             _ = gproc_lib:remove_reg(Key, Pid, unreg, []),
             _ = gproc_lib:remove_reg(Key, Pid, unreg, []),
-            flush_unregs(),
             {reply, true, S};
             {reply, true, S};
         [{_, Opts}] when is_list(Opts) ->
         [{_, Opts}] when is_list(Opts) ->
             _ = gproc_lib:remove_reg(Key, Pid, unreg, Opts),
             _ = gproc_lib:remove_reg(Key, Pid, unreg, Opts),
@@ -2184,8 +2245,7 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
 handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
 handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
     _ = case ets:lookup(?TAB, {shared, Key}) of
     _ = case ets:lookup(?TAB, {shared, Key}) of
 	    [{_, r}] ->
 	    [{_, r}] ->
-		_ = gproc_lib:remove_reg(Key, shared, unreg, []),
-                flush_unregs();
+		_ = gproc_lib:remove_reg(Key, shared, unreg, []);
 	    [{_, Opts}] ->
 	    [{_, Opts}] ->
 		_ = gproc_lib:remove_reg(Key, shared, unreg, Opts);
 		_ = gproc_lib:remove_reg(Key, shared, unreg, Opts);
 	    [] ->
 	    [] ->
@@ -2212,7 +2272,6 @@ handle_call({mreg, T, l, L}, {Pid,_}, S) ->
     end;
     end;
 handle_call({munreg, T, l, L}, {Pid,_}, S) ->
 handle_call({munreg, T, l, L}, {Pid,_}, S) ->
     _ = gproc_lib:remove_many(T, l, L, Pid),
     _ = gproc_lib:remove_many(T, l, L, Pid),
-    flush_unregs(),
     {reply, true, S};
     {reply, true, S};
 handle_call({set, {_,l,_} = Key, Value}, {Pid,_}, S) ->
 handle_call({set, {_,l,_} = Key, Value}, {Pid,_}, S) ->
     case gproc_lib:do_set_value(Key, Value, Pid) of
     case gproc_lib:do_set_value(Key, Value, Pid) of
@@ -2323,7 +2382,6 @@ try_insert_reg({T,l,_} = Key, Val, Pid) ->
             true
             true
     end.
     end.
 
 
-
 %% try_insert_shared({c,l,_} = Key, Val) ->
 %% try_insert_shared({c,l,_} = Key, Val) ->
 %%     ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
 %%     ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
 %% try_insert_shared({a,l,_} = Key, Val) ->
 %% try_insert_shared({a,l,_} = Key, Val) ->
@@ -2385,7 +2443,7 @@ process_is_down(Pid) when is_pid(Pid) ->
                  ({{r,l,Rsrc} = K, _}) ->
                  ({{r,l,Rsrc} = K, _}) ->
                       Key = {K, Pid},
                       Key = {K, Pid},
                       ets:delete(?TAB, Key),
                       ets:delete(?TAB, Key),
-                      update_resource_count(l, Rsrc, -1);
+                      gproc_lib:decrement_resource_count(l, Rsrc);
                  ({{rc,l,_} = K, R}) ->
                  ({{rc,l,_} = K, R}) ->
                       remove_aggregate(rc, K, R, Pid);
                       remove_aggregate(rc, K, R, Pid);
                  ({{a,l,_} = K, R}) ->
                  ({{a,l,_} = K, R}) ->
@@ -2398,18 +2456,6 @@ process_is_down(Pid) when is_pid(Pid) ->
             ok
             ok
     end.
     end.
 
 
-update_resource_count(C, R, V) ->
-    gproc_lib:update_resource_count(C, R, V),
-    flush_unregs().
-
-flush_unregs() ->
-    receive
-        {gproc_unreg, _} ->
-            flush_unregs()
-    after 0 ->
-            ok
-    end.
-
 remove_aggregate(T, K, R, Pid) ->
 remove_aggregate(T, K, R, Pid) ->
     case ets:lookup(?TAB, {K,T}) of
     case ets:lookup(?TAB, {K,T}) of
         [{_, Pid, V}] ->
         [{_, Pid, V}] ->
@@ -2428,7 +2474,6 @@ remove_aggregate(T, K, R, Pid) ->
             opt_notify(R, K, Pid, undefined)
             opt_notify(R, K, Pid, undefined)
     end.
     end.
 
 
-
 opt_notify(r, _, _, _) ->
 opt_notify(r, _, _, _) ->
     ok;
     ok;
 opt_notify(Opts, {T,_,_} = Key, Pid, Value) ->
 opt_notify(Opts, {T,_,_} = Key, Pid, Value) ->

+ 48 - 28
src/gproc_dist.erl

@@ -23,9 +23,9 @@
 -behaviour(gen_leader).
 -behaviour(gen_leader).
 
 
 -export([start_link/0, start_link/1,
 -export([start_link/0, start_link/1,
-         reg/1, reg/2, unreg/1,
+         reg/1, reg/3, unreg/1,
 	 reg_or_locate/3,
 	 reg_or_locate/3,
-	 reg_shared/2, unreg_shared/1,
+	 reg_shared/3, unreg_shared/1,
          monitor/2,
          monitor/2,
          demonitor/2,
          demonitor/2,
 	 set_attributes/2,
 	 set_attributes/2,
@@ -91,7 +91,7 @@ start_link({Nodes, Opts}) ->
 %% {@see gproc:reg/1}
 %% {@see gproc:reg/1}
 %%
 %%
 reg(Key) ->
 reg(Key) ->
-    reg(Key, gproc:default(Key)).
+    reg(Key, gproc:default(Key), []).
 
 
 %% {@see gproc:reg_or_locate/2}
 %% {@see gproc:reg_or_locate/2}
 %%
 %%
@@ -117,15 +117,15 @@ reg_or_locate(_, _, _) ->
 %%%          | a  - aggregated counter
 %%%          | a  - aggregated counter
 %%%    Scope = l | g (global or local)
 %%%    Scope = l | g (global or local)
 %%% @end
 %%% @end
-reg({_,g,_} = Key, Value) ->
+reg({_,g,_} = Key, Value, Attrs) ->
     %% anything global
     %% anything global
-    leader_call({reg, Key, Value, self()});
-reg(_, _) ->
+    leader_call({reg, Key, Value, self(), Attrs});
+reg(_, _, _) ->
     ?THROW_GPROC_ERROR(badarg).
     ?THROW_GPROC_ERROR(badarg).
 
 
-reg_shared({_,g,_} = Key, Value) ->
-    leader_call({reg, Key, Value, shared});
-reg_shared(_, _) ->
+reg_shared({_,g,_} = Key, Value, Attrs) ->
+    leader_call({reg, Key, Value, shared, Attrs});
+reg_shared(_, _, _) ->
     ?THROW_GPROC_ERROR(badarg).
     ?THROW_GPROC_ERROR(badarg).
 
 
 monitor({_,g,_} = Key, Type) when Type==info;
 monitor({_,g,_} = Key, Type) when Type==info;
@@ -255,6 +255,8 @@ handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
     ets:delete(?TAB, {Pid, g}),
     ets:delete(?TAB, {Pid, g}),
     leader_cast({pid_is_DOWN, Pid}),
     leader_cast({pid_is_DOWN, Pid}),
     {ok, S};
     {ok, S};
+handle_info({gproc_unreg, Objs}, S) ->
+    {ok, [{delete, Objs}], S};
 handle_info(_, S) ->
 handle_info(_, S) ->
     {ok, S}.
     {ok, S}.
 
 
@@ -332,23 +334,17 @@ handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
             GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
             GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
             {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
             {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
     end;
     end;
-handle_leader_call({reg, {_C,g,_Name} = K, Value, Pid}, _From, S, _E) ->
+handle_leader_call({reg, {_C,g,_Name} = K, Value, Pid, As}, _From, S, _E) ->
     case gproc_lib:insert_reg(K, Value, Pid, g) of
     case gproc_lib:insert_reg(K, Value, Pid, g) of
         false ->
         false ->
             {reply, badarg, S};
             {reply, badarg, S};
         true ->
         true ->
             _ = gproc_lib:ensure_monitor(Pid,g),
             _ = gproc_lib:ensure_monitor(Pid,g),
+            _ = if As =/= [] ->
+                        gproc_lib:insert_attr(K, As, Pid, g);
+                   true -> []
+                end,
 	    Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
 	    Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
-            %% Vals =
-            %%     if C == a ->
-            %%             ets:lookup(?TAB, {K,a});
-            %%        C == c ->
-            %%             [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
-            %%        C == n ->
-            %%             [{{K,n},Pid,Value}];
-            %%        true ->
-            %%             [{{K,Pid},Pid,Value}]
-            %%     end,
             {reply, true, [{insert, Vals}], S}
             {reply, true, [{insert, Vals}], S}
     end;
     end;
 handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
 handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
@@ -484,7 +480,7 @@ handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
 	    {reply, badarg, S}
 	    {reply, badarg, S}
     end;
     end;
 handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
 handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
-    Key = if T == n; T == a -> {K,T};
+    Key = if T == n; T == a; T == rc -> {K,T};
              true -> {K, Pid}
              true -> {K, Pid}
           end,
           end,
     case ets:member(?TAB, Key) of
     case ets:member(?TAB, Key) of
@@ -499,6 +495,14 @@ handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
                         [] ->
                         [] ->
                             {reply, true, [{delete, [Key, {Pid,K}]}], S}
                             {reply, true, [{delete, [Key, {Pid,K}]}], S}
                     end;
                     end;
+               T == r ->
+                    case ets:lookup(?TAB, {{rc,g,Name},rc}) of
+                        [RC] ->
+                            {reply, true, [{delete,[Key, {Pid,K}]},
+                                           {insert, [RC]}], S};
+                        [] ->
+                            {reply, true, [{delete, [Key, {Pid, K}]}], S}
+                    end;
                true ->
                true ->
                     {reply, true, [{notify, [{K, Pid, unreg}]},
                     {reply, true, [{notify, [{K, Pid, unreg}]},
                                    {delete, [Key, {Pid,K}]}], S}
                                    {delete, [Key, {Pid,K}]}], S}
@@ -507,7 +511,7 @@ handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
             {reply, badarg, S}
             {reply, badarg, S}
     end;
     end;
 handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
 handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
-  when T == a; T == n ->
+  when T == a; T == n; T == rc ->
     Key = {K, T},
     Key = {K, T},
     case ets:lookup(?TAB, Key) of
     case ets:lookup(?TAB, Key) of
         [{_, Pid, Value}] ->
         [{_, Pid, Value}] ->
@@ -537,7 +541,7 @@ handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
             {reply, badarg, S}
             {reply, badarg, S}
     end;
     end;
 handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
 handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
-    if T==p; T==n ->
+    if T==p; T==n; T==r ->
             try gproc_lib:insert_many(T, g, L, Pid) of
             try gproc_lib:insert_many(T, g, L, Pid) of
                 {true,Objs} -> {reply, true, [{insert,Objs}], S};
                 {true,Objs} -> {reply, true, [{insert,Objs}], S};
                 false       -> {reply, badarg, S}
                 false       -> {reply, badarg, S}
@@ -655,11 +659,14 @@ handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
 mk_broadcast_insert_vals(Objs) ->
 mk_broadcast_insert_vals(Objs) ->
     lists:flatmap(
     lists:flatmap(
       fun({{C, g, Name} = K, Pid, Value}) ->
       fun({{C, g, Name} = K, Pid, Value}) ->
-	      if C == a ->
-		      ets:lookup(?TAB, {K,a}) ++ ets:lookup(?TAB, {Pid,K});
+	      if C == a; C == rc ->
+		      ets:lookup(?TAB, {K,C}) ++ ets:lookup(?TAB, {Pid,K});
 		 C == c ->
 		 C == c ->
 		      [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})]
 		      [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})]
 			  ++ ets:lookup(?TAB, {Pid,K});
 			  ++ ets:lookup(?TAB, {Pid,K});
+                 C == r ->
+                      [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{rc,g,Name},rc})]
+                          ++ ets:lookup(?TAB, {Pid, K});
 		 C == n ->
 		 C == n ->
 		      [{{K,n},Pid,Value}| ets:lookup(?TAB, {Pid,K})];
 		      [{{K,n},Pid,Value}| ets:lookup(?TAB, {Pid,K})];
 		 true ->
 		 true ->
@@ -671,7 +678,7 @@ mk_broadcast_insert_vals(Objs) ->
 process_globals(Globals) ->
 process_globals(Globals) ->
     {Modified, Notifications} =
     {Modified, Notifications} =
         lists:foldl(
         lists:foldl(
-          fun({{T,_,_} = Key, Pid}, A) when T==n; T==a ->
+          fun({{T,_,_} = Key, Pid}, A) when T==n; T==a; T==rc ->
                   case ets:lookup(?TAB, {Pid,Key}) of
                   case ets:lookup(?TAB, {Pid,Key}) of
                       [{_, Opts}] when is_list(Opts) ->
                       [{_, Opts}] when is_list(Opts) ->
                           maybe_failover(Key, Pid, Opts, A);
                           maybe_failover(Key, Pid, Opts, A);
@@ -683,6 +690,8 @@ process_globals(Globals) ->
                             c ->
                             c ->
                                 Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
                                 Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
                                 update_aggr_counter(Key, -Incr) ++ MA;
                                 update_aggr_counter(Key, -Incr) ++ MA;
+                            r ->
+                                decrement_resource_count(Key, []) ++ MA;
                             _ ->
                             _ ->
                                MA
                                MA
                         end,
                         end,
@@ -813,7 +822,7 @@ do_notify([]) ->
     ok.
     ok.
 
 
 
 
-ets_key({T,_,_} = K, _) when T==n; T==a ->
+ets_key({T,_,_} = K, _) when T==n; T==a; T==rc ->
     {K, T};
     {K, T};
 ets_key(K, Pid) ->
 ets_key(K, Pid) ->
     {K, Pid}.
     {K, Pid}.
@@ -951,6 +960,17 @@ update_aggr_counter({c,g,Ctr}, Incr, Acc) ->
             [New|Acc]
             [New|Acc]
     end.
     end.
 
 
+decrement_resource_count({r,g,Rsrc}, Acc) ->
+    Key = {{rc,g,Rsrc},rc},
+    case ets:member(?TAB, Key) of
+        false ->
+            Acc;
+        true ->
+            %% Call the lib function, which might trigger events
+            gproc_lib:decrement_resource_count(g, Rsrc),
+            ets:lookup(?TAB, Key) ++ Acc
+    end.
+
 pid_to_give_away_to(P) when is_pid(P) ->
 pid_to_give_away_to(P) when is_pid(P) ->
     P;
     P;
 pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
 pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
@@ -962,7 +982,7 @@ pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
     end.
     end.
 
 
 insert_reg([{_, Waiters}], K, Val, Pid, Event) ->
 insert_reg([{_, Waiters}], K, Val, Pid, Event) ->
-    gproc_lib:insert_reg(K, Val, Pid, g, []),
+    gproc_lib:insert_reg(K, Val, Pid, g),
     tell_waiters(Waiters, K, Pid, Val, Event).
     tell_waiters(Waiters, K, Pid, Val, Event).
 
 
 tell_waiters([{P,R}|T], K, Pid, V, Event) ->
 tell_waiters([{P,R}|T], K, Pid, V, Event) ->

+ 24 - 14
src/gproc_lib.erl

@@ -42,6 +42,7 @@
 	 remove_wait/4,
 	 remove_wait/4,
          update_aggr_counter/3,
          update_aggr_counter/3,
          update_counter/3,
          update_counter/3,
+         decrement_resource_count/2,
 	 valid_opts/2]).
 	 valid_opts/2]).
 
 
 -export([dbg/1]).
 -export([dbg/1]).
@@ -62,7 +63,7 @@ dbg(Mods) ->
 %% Pid around as payload as well. This is a bit redundant, but
 %% Pid around as payload as well. This is a bit redundant, but
 %% symmetric.
 %% symmetric.
 %%
 %%
--spec insert_reg(key(), any(), pid() | shared, scope()) -> boolean().
+-spec insert_reg(gproc:key(), any(), pid() | shared, gproc:scope()) -> boolean().
 insert_reg(K, Value, Pid, Scope) ->
 insert_reg(K, Value, Pid, Scope) ->
     insert_reg(K, Value, Pid, Scope, registered).
     insert_reg(K, Value, Pid, Scope, registered).
 
 
@@ -153,7 +154,7 @@ get_attr(Attr, Pid, {_,_,_} = Key, Default) ->
             Default
             Default
     end.
     end.
 
 
--spec insert_many(type(), scope(), [{key(),any()}], pid()) ->
+-spec insert_many(gproc:type(), gproc:scope(), [{gproc:key(),any()}], pid()) ->
           {true,list()} | false.
           {true,list()} | false.
 
 
 insert_many(T, Scope, KVL, Pid) ->
 insert_many(T, Scope, KVL, Pid) ->
@@ -183,7 +184,7 @@ insert_many(T, Scope, KVL, Pid) ->
             end
             end
     end.
     end.
 
 
--spec insert_objects([{key(), pid(), any()}]) -> ok.
+-spec insert_objects([{gproc:key(), pid(), any()}]) -> ok.
 
 
 insert_objects(Objs) ->
 insert_objects(Objs) ->
     lists:foreach(
     lists:foreach(
@@ -239,7 +240,7 @@ maybe_waiters(K, Pid, Value, T, Event) ->
             false
             false
     end.
     end.
 
 
--spec notify_waiters([{pid(), reference()}], key(), pid(), any(), any()) -> ok.
+-spec notify_waiters([{pid(), reference()}], gproc:key(), pid(), any(), any()) -> ok.
 notify_waiters([{P, Ref}|T], K, Pid, V, E) ->
 notify_waiters([{P, Ref}|T], K, Pid, V, E) ->
     P ! {gproc, Ref, registered, {K, Pid, V}},
     P ! {gproc, Ref, registered, {K, Pid, V}},
     notify_waiters(T, K, Pid, V, E);
     notify_waiters(T, K, Pid, V, E);
@@ -302,7 +303,7 @@ remove_monitors(Key, Pid, MPid) ->
     end.
     end.
 
 
 
 
-mk_reg_objs(T, Scope, Pid, L) when T==n; T==a ->
+mk_reg_objs(T, Scope, Pid, L) when T==n; T==a; T==rc ->
     lists:map(fun({K,V}) ->
     lists:map(fun({K,V}) ->
                       {{{T,Scope,K},T}, Pid, V};
                       {{{T,Scope,K},T}, Pid, V};
                  (_) ->
                  (_) ->
@@ -542,6 +543,9 @@ expand_ops(_) ->
 update_aggr_counter(C, N, Val) ->
 update_aggr_counter(C, N, Val) ->
     ?MAY_FAIL(ets:update_counter(?TAB, {{a,C,N},a}, {3, Val})).
     ?MAY_FAIL(ets:update_counter(?TAB, {{a,C,N},a}, {3, Val})).
 
 
+decrement_resource_count(C, N) ->
+    update_resource_count(C, N, -1).
+
 update_resource_count(C, N, Val) ->
 update_resource_count(C, N, Val) ->
     try ets:update_counter(?TAB, {{rc,C,N},rc}, {3, Val}) of
     try ets:update_counter(?TAB, {{rc,C,N},rc}, {3, Val}) of
         0 ->
         0 ->
@@ -566,28 +570,34 @@ resource_count_zero(C, N) ->
 perform_on_zero(Actions, C, N, Pid) ->
 perform_on_zero(Actions, C, N, Pid) ->
     lists:foreach(
     lists:foreach(
       fun(A) ->
       fun(A) ->
-              perform_on_zero_(A, C, N, Pid)
+              try perform_on_zero_(A, C, N, Pid)
+              catch error:_ -> ignore
+              end
       end, Actions).
       end, Actions).
 
 
 perform_on_zero_({send, ToProc}, C, N, Pid) ->
 perform_on_zero_({send, ToProc}, C, N, Pid) ->
     gproc:send(ToProc, {gproc, resource_on_zero, C, N, Pid}),
     gproc:send(ToProc, {gproc, resource_on_zero, C, N, Pid}),
-    [];
+    ok;
+perform_on_zero_({bcast, ToProc}, C, N, Pid) ->
+    gproc:bcast(ToProc, {gproc, resource_on_zero, C, N, Pid}),
+    ok;
 perform_on_zero_(publish, C, N, Pid) ->
 perform_on_zero_(publish, C, N, Pid) ->
     gproc_ps:publish(C, gproc_resource_on_zero, {C, N, Pid}),
     gproc_ps:publish(C, gproc_resource_on_zero, {C, N, Pid}),
-    [];
+    ok;
 perform_on_zero_({unreg_shared, T,N}, C, _, _) ->
 perform_on_zero_({unreg_shared, T,N}, C, _, _) ->
     K = {T, C, N},
     K = {T, C, N},
     case ets:member(?TAB, {K, shared}) of
     case ets:member(?TAB, {K, shared}) of
         true ->
         true ->
-            self() ! {gproc_unreg, remove_reg(K, shared, unreg)};
+            Objs = remove_reg(K, shared, unreg),
+            _ = if C == g -> self() ! {gproc_unreg, Objs};
+                   true   -> ok
+                end,
+            ok;
         false ->
         false ->
-            []
+            ok
     end;
     end;
 perform_on_zero_(_, _, _, _) ->
 perform_on_zero_(_, _, _, _) ->
-    [].
-
-
-
+    ok.
 
 
 scan_existing_counters(Ctxt, Name) ->
 scan_existing_counters(Ctxt, Name) ->
     Head = {{{c,Ctxt,Name},'_'},'_','$1'},
     Head = {{{c,Ctxt,Name},'_'},'_','$1'},

+ 74 - 0
test/gproc_dist_tests.erl

@@ -64,6 +64,15 @@ dist_test_() ->
                                        ?debugVal(t_awaited_aggr_counter(Ns))
                                        ?debugVal(t_awaited_aggr_counter(Ns))
                                end,
                                end,
                                fun() ->
                                fun() ->
+                                       ?debugVal(t_simple_resource_count(Ns))
+                               end,
+                               fun() ->
+                                       ?debugVal(t_awaited_resource_count(Ns))
+                               end,
+                               fun() ->
+                                       ?debugVal(t_resource_count_on_zero(Ns))
+                               end,
+                               fun() ->
                                        ?debugVal(t_update_counters(Ns))
                                        ?debugVal(t_update_counters(Ns))
                                end,
                                end,
                                fun() ->
                                fun() ->
@@ -130,6 +139,7 @@ run_dist_tests() ->
 -define(T_NAME, {n, g, {?MODULE, ?LINE, erlang:now()}}).
 -define(T_NAME, {n, g, {?MODULE, ?LINE, erlang:now()}}).
 -define(T_KVL, [{foo, "foo"}, {bar, "bar"}]).
 -define(T_KVL, [{foo, "foo"}, {bar, "bar"}]).
 -define(T_COUNTER, {c, g, {?MODULE, ?LINE}}).
 -define(T_COUNTER, {c, g, {?MODULE, ?LINE}}).
+-define(T_RESOURCE, {r, g, {?MODULE, ?LINE}}).
 -define(T_PROP, {p, g, ?MODULE}).
 -define(T_PROP, {p, g, ?MODULE}).
 
 
 t_simple_reg([H|_] = Ns) ->
 t_simple_reg([H|_] = Ns) ->
@@ -227,6 +237,69 @@ t_awaited_aggr_counter([H1,H2|_] = Ns) ->
     flush_down(Ref),
     flush_down(Ref),
     ?assertMatch(ok, t_call(P1, die)).
     ?assertMatch(ok, t_call(P1, die)).
 
 
+t_simple_resource_count([H1,H2|_] = Ns) ->
+    {r,g,Nm} = R = ?T_RESOURCE,
+    RC = {rc,g,Nm},
+    Pr1 = t_spawn_reg(H1, R, 3),
+    Prc = t_spawn_reg(H2, RC),
+    ?assertMatch(ok, t_read_everywhere(R, Pr1, Ns, 3)),
+    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 1)),
+    Pr2 = t_spawn_reg(H2, R, 4),
+    ?assertMatch(ok, t_read_everywhere(R, Pr2, Ns, 4)),
+    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 2)),
+    ?assertMatch(ok, t_call(Pr1, die)),
+    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 1)),
+    ?assertMatch(ok, t_call(Pr2, die)),
+    ?assertMatch(ok, t_call(Prc, die)).
+
+t_awaited_resource_count([H1,H2|_] = Ns) ->
+    {r,g,Nm} = R = ?T_RESOURCE,
+    RC = {rc,g,Nm},
+    Pr1 = t_spawn_reg(H1, R, 3),
+    P = t_spawn(H2),
+    Ref = erlang:monitor(process, P),
+    P ! {self(), Ref, {apply, gproc, await, [RC]}},
+    t_sleep(),
+    P1 = t_spawn_reg(H2, RC),
+    ?assert(P1 == receive
+                      {P, Ref, Res} ->
+                          element(1, Res);
+                      {'DOWN', Ref, _, _, Reason} ->
+                          erlang:error(Reason);
+                      Other ->
+                          erlang:error({received, Other})
+                  end),
+    ?assertMatch(ok, t_read_everywhere(RC, P1, Ns, 1)),
+    ?assertMatch(ok, t_call(Pr1, die)),
+    ?assertMatch(ok, t_call(P, die)),
+    flush_down(Ref),
+    ?assertMatch(ok, t_call(P1, die)).
+
+t_resource_count_on_zero([H1,H2|_] = Ns) ->
+    {r,g,Nm} = R = ?T_RESOURCE,
+    Prop = ?T_PROP,
+    RC = {rc,g,Nm},
+    Pr1 = t_spawn_reg(H1, R, 3),
+    Pp = t_spawn_reg(H2, Prop),
+    ?assertMatch(ok, t_call(Pp, {selective, true})),
+    Prc = t_spawn_reg(H2, RC, undefined, [{on_zero, [{send, Prop}]}]),
+    ?assertMatch(ok, t_read_everywhere(R, Pr1, Ns, 3)),
+    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 1)),
+    ?assertMatch(ok, t_call(Pr1, die)),
+    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 0)),
+    ?assertMatch({gproc, resource_on_zero, g, Nm, Prc},
+                 t_call(Pp, {apply_fun, fun() ->
+                                                receive
+                                                    {gproc, _, _, _, _} = M ->
+                                                        M
+                                                after 10000 ->
+                                                        timeout
+                                                end
+                                        end})),
+    ?assertMatch(ok, t_call(Pp, {selective, false})),
+    ?assertMatch(ok, t_call(Pp, die)),
+    ?assertMatch(ok, t_call(Prc, die)).
+
 t_update_counters([H1,H2|_] = Ns) ->
 t_update_counters([H1,H2|_] = Ns) ->
     {c,g,N1} = C1 = ?T_COUNTER,
     {c,g,N1} = C1 = ?T_COUNTER,
     A1 = {a,g,N1},
     A1 = {a,g,N1},
@@ -493,6 +566,7 @@ t_spawn(Node, Selective) -> gproc_test_lib:t_spawn(Node, Selective).
 t_spawn_mreg(Node, KVL) -> gproc_test_lib:t_spawn_mreg(Node, KVL).
 t_spawn_mreg(Node, KVL) -> gproc_test_lib:t_spawn_mreg(Node, KVL).
 t_spawn_reg(Node, N) -> gproc_test_lib:t_spawn_reg(Node, N).
 t_spawn_reg(Node, N) -> gproc_test_lib:t_spawn_reg(Node, N).
 t_spawn_reg(Node, N, V) -> gproc_test_lib:t_spawn_reg(Node, N, V).
 t_spawn_reg(Node, N, V) -> gproc_test_lib:t_spawn_reg(Node, N, V).
+t_spawn_reg(Node, N, V, As) -> gproc_test_lib:t_spawn_reg(Node, N, V, As).
 t_spawn_reg_shared(Node, N, V) -> gproc_test_lib:t_spawn_reg_shared(Node, N, V).
 t_spawn_reg_shared(Node, N, V) -> gproc_test_lib:t_spawn_reg_shared(Node, N, V).
 got_msg(P) -> gproc_test_lib:got_msg(P).
 got_msg(P) -> gproc_test_lib:got_msg(P).
 got_msg(P, Tag) -> gproc_test_lib:got_msg(P, Tag).
 got_msg(P, Tag) -> gproc_test_lib:got_msg(P, Tag).

+ 14 - 1
test/gproc_test_lib.erl

@@ -1,7 +1,7 @@
 -module(gproc_test_lib).
 -module(gproc_test_lib).
 
 
 -export([t_spawn/1, t_spawn/2,
 -export([t_spawn/1, t_spawn/2,
-         t_spawn_reg/2, t_spawn_reg/3,
+         t_spawn_reg/2, t_spawn_reg/3, t_spawn_reg/4,
          t_spawn_reg_shared/3,
          t_spawn_reg_shared/3,
          t_spawn_mreg/2,
          t_spawn_mreg/2,
          t_call/2,
          t_call/2,
@@ -42,6 +42,19 @@ t_spawn_reg(Node, Name, Value) ->
             erlang:error({timeout, t_spawn_reg, [Node, Name, Value]})
             erlang:error({timeout, t_spawn_reg, [Node, Name, Value]})
     end.
     end.
 
 
+t_spawn_reg(Node, Name, Value, Attrs) ->
+    Me = self(),
+    P = spawn(Node, fun() ->
+                            ?assertMatch(true, gproc:reg(Name, Value, Attrs)),
+                            Me ! {self(), ok},
+                            t_loop()
+                    end),
+    receive
+	{P, ok} -> P
+    after 1000 ->
+            erlang:error({timeout, t_spawn_reg, [Node, Name, Value]})
+    end.
+
 t_spawn_mreg(Node, KVL) ->
 t_spawn_mreg(Node, KVL) ->
     Me = self(),
     Me = self(),
     P = spawn(Node, fun() ->
     P = spawn(Node, fun() ->

+ 55 - 0
test/gproc_tests.erl

@@ -86,6 +86,12 @@ reg_test_() ->
       , ?_test(t_is_clean())
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_awaited_aggr_counter()))}
       , {spawn, ?_test(?debugVal(t_awaited_aggr_counter()))}
       , ?_test(t_is_clean())
       , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_simple_resource_count()))}
+      , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_awaited_resource_count()))}
+      , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_resource_count_on_zero_send()))}
+      , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_update_counters()))}
       , {spawn, ?_test(?debugVal(t_update_counters()))}
       , ?_test(t_is_clean())
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_simple_prop()))}
       , {spawn, ?_test(?debugVal(t_simple_prop()))}
@@ -241,6 +247,55 @@ t_awaited_aggr_counter() ->
     end,
     end,
     ?assertMatch(3, gproc:get_value({a,l,c1})).
     ?assertMatch(3, gproc:get_value({a,l,c1})).
 
 
+t_simple_resource_count() ->
+    ?assert(gproc:reg({r,l,r1}, 1) =:= true),
+    ?assert(gproc:reg({rc,l,r1}) =:= true),
+    ?assert(gproc:get_value({rc,l,r1}) =:= 1),
+    P = self(),
+    P1 = spawn_link(fun() ->
+                            gproc:reg({r,l,r1}, 1),
+                            P ! {self(), ok},
+                            receive
+                                {P, goodbye} -> ok
+                            end
+                    end),
+    receive {P1, ok} -> ok end,
+    ?assert(gproc:get_value({rc,l,r1}) =:= 2),
+    P1 ! {self(), goodbye},
+    R = erlang:monitor(process, P1),
+    receive {'DOWN', R, _, _, _} ->
+            gproc:audit_process(P1)
+    end,
+    ?assert(gproc:get_value({rc,l,r1}) =:= 1).
+
+t_awaited_resource_count() ->
+    ?assert(gproc:reg({r,l,r1}, 3) =:= true),
+    ?assert(gproc:reg({r,l,r2}, 3) =:= true),
+    ?assert(gproc:reg({r,l,r3}, 3) =:= true),
+    gproc:nb_wait({rc,l,r1}),
+    ?assert(gproc:reg({rc,l,r1}) =:= true),
+    receive {gproc,_,registered,{{rc,l,r1},_,_}} -> ok
+    after 1000 ->
+            error(timeout)
+    end,
+    ?assertMatch(1, gproc:get_value({rc,l,r1})).
+
+t_resource_count_on_zero_send() ->
+    Me = self(),
+    ?assertMatch(true, gproc:reg({p,l,myp})),
+    ?assertMatch(true, gproc:reg({r,l,r1})),
+    ?assertMatch(true, gproc:reg({rc,l,r1}, 1, [{on_zero,
+                                                 [{send, {p,l,myp}}]}])),
+    ?assertMatch(1, gproc:get_value({rc,l,r1})),
+    ?assertMatch(true, gproc:unreg({r,l,r1})),
+    ?assertMatch(0, gproc:get_value({rc,l,r1})),
+    receive
+        {gproc, resource_on_zero, l, r1, Me} ->
+            ok
+    after 1000 ->
+            error(timeout)
+    end.
+
 t_update_counters() ->
 t_update_counters() ->
     ?assert(gproc:reg({c,l,c1}, 3) =:= true),
     ?assert(gproc:reg({c,l,c1}, 3) =:= true),
     ?assert(gproc:reg({a,l,c1}) =:= true),
     ?assert(gproc:reg({a,l,c1}) =:= true),