Browse Source

w.i.p. Add resource counting entries

Ulf Wiger 10 years ago
parent
commit
f22155521f
2 changed files with 186 additions and 71 deletions
  1. 79 42
      src/gproc.erl
  2. 107 29
      src/gproc_lib.erl

+ 79 - 42
src/gproc.erl

@@ -960,6 +960,10 @@ reg1({c,l,_} = Key, Value) when is_integer(Value) ->
     call({reg, Key, Value});
 reg1({n,l,_} = Key, Value) ->
     call({reg, Key, Value});
+reg1({r,l,_} = Key, Value) ->
+    call({reg, Key, Value});
+reg1({rc,l,_} = Key, Value) ->
+    call({reg, Key, Value});
 reg1(_, _) ->
     ?THROW_GPROC_ERROR(badarg).
 
@@ -992,7 +996,7 @@ reg_or_locate({n,_,_} = Key, Value, F) when is_function(F, 0) ->
 reg_or_locate1({_,g,_} = Key, Value, P) ->
     ?CHK_DIST,
     gproc_dist:reg_or_locate(Key, Value, P);
-reg_or_locate1({n,l,_} = Key, Value, P) ->
+reg_or_locate1({T,l,_} = Key, Value, P) when T==n; T==a; T==rc ->
     call({reg_or_locate, Key, Value, P});
 reg_or_locate1(_, _, _) ->
     ?THROW_GPROC_ERROR(badarg).
@@ -1040,6 +1044,8 @@ reg_shared1({c,l,_} = Key, Value) when is_integer(Value) ->
     call({reg_shared, Key, Value});
 reg_shared1({p,l,_} = Key, Value) ->
     call({reg_shared, Key, Value});
+reg_shared1({rc,l,_} = Key, undefined) ->
+    call({reg_shared, Key, undefined});
 reg_shared1(_, _) ->
     ?THROW_GPROC_ERROR(badarg).
 
@@ -1117,8 +1123,8 @@ unreg1(Key) ->
         {_, g, _} ->
             ?CHK_DIST,
             gproc_dist:unreg(Key);
-        {T, l, _} when T == n;
-                       T == a -> call({unreg, Key});
+        {T, l, _} when T == n; T == a; T == r; T == rc ->
+            call({unreg, Key});
         {_, l, _} ->
             case ets:member(?TAB, {Key,self()}) of
                 true ->
@@ -1166,7 +1172,8 @@ unreg_shared1(Key) ->
             gproc_dist:unreg_shared(Key);
         {T, l, _} when T == c;
                        T == a;
-		       T == p -> call({unreg_shared, Key});
+		       T == p;
+                       T == rc -> call({unreg_shared, Key});
         _ ->
 	    ?THROW_GPROC_ERROR(badarg)
     end.
@@ -1328,7 +1335,8 @@ set_value(Key, Value) ->
 %%
 set_value_shared({T,_,_} = Key, Value) when T == c;
 					    T == a;
-					    T == p->
+					    T == p;
+                                            T == r ->
     ?CATCH_GPROC_ERROR(set_value_shared1(Key, Value), [Key, Value]).
 
 set_value1({_,g,_} = Key, Value) ->
@@ -1387,7 +1395,7 @@ get_value(Key, Pid) ->
     ?CATCH_GPROC_ERROR(get_value1(Key, Pid), [Key, Pid]).
 
 get_value1({T,_,_} = Key, Pid) when is_pid(Pid) ->
-    if T==n orelse T==a ->
+    if T==n; T==a; T==rc ->
             case ets:lookup(?TAB, {Key, T}) of
                 [{_, P, Value}] when P == Pid -> Value;
                 _ -> ?THROW_GPROC_ERROR(badarg)
@@ -1395,11 +1403,13 @@ get_value1({T,_,_} = Key, Pid) when is_pid(Pid) ->
        true ->
             ets:lookup_element(?TAB, {Key, Pid}, 3)
     end;
-get_value1({T,_,_} = K, shared) when T==c; T==a; T==p ->
+get_value1({T,_,_} = K, shared) when T==c; T==a; T==p; T==r ->
     Key = case T of
-	      c -> {K, shared};
-	      p -> {K, shared};
-	      a -> {K, a}
+	      c  -> {K, shared};
+	      p  -> {K, shared};
+              r  -> {K, shared};
+	      a  -> {K, a};
+              rc -> {K, rc}
 	  end,
     case ets:lookup(?TAB, Key) of
 	[{_, shared, Value}] -> Value;
@@ -1418,9 +1428,9 @@ get_value1(_, _) ->
 %% @end
 get_attribute(Key, A) ->
     Pid = case Key of
-	      {T,_,_} when T==n; T==a ->
+	      {T,_,_} when T==n; T==a; T==rc ->
 		  where(Key);
-	      {T,_,_} when T==p; T==c ->
+	      {T,_,_} when T==p; T==c; T==r ->
 		  self()
 	  end,
     ?CATCH_GPROC_ERROR(get_attribute1(Key, Pid, A), [Key, A]).
@@ -1508,7 +1518,7 @@ lookup_pid({_T,_,_} = Key) ->
 %% This function raises a `badarg' exception if `Key' is not registered.
 %% @end
 lookup_value({T,_,_} = Key) ->
-    if T==n orelse T==a ->
+    if T==n orelse T==a orelse T==rc ->
             ets:lookup_element(?TAB, {Key,T}, 3);
        true ->
             erlang:error(badarg)
@@ -1527,7 +1537,7 @@ where(Key) ->
     ?CATCH_GPROC_ERROR(where1(Key), [Key]).
 
 where1({T,_,_}=Key) ->
-    if T==n orelse T==a ->
+    if T==n orelse T==a orelse T==rc ->
             case ets:lookup(?TAB, {Key,T}) of
                 [{_, P, _Value}] ->
                     case my_is_process_alive(P) of
@@ -1559,7 +1569,7 @@ whereis_name(Key) ->
 %% @end
 %%
 lookup_pids({T,_,_} = Key) ->
-    L = if T==n orelse T==a ->
+    L = if T==n orelse T==a orelse T==rc ->
                 ets:select(?TAB, [{{{Key,T}, '$1', '_'},
 				   [{is_pid, '$1'}], ['$1']}]);
            true ->
@@ -1587,7 +1597,7 @@ my_is_process_alive(_) ->
 %% @end
 %%
 lookup_values({T,_,_} = Key) ->
-    L = if T==n orelse T==a ->
+    L = if T==n orelse T==a orelse T==rc ->
                 ets:select(?TAB, [{{{Key,T}, '$1', '$2'},[],[{{'$1','$2'}}]}]);
            true ->
                 ets:select(?TAB, [{{{Key,'_'}, '$1', '$2'},[],[{{'$1','$2'}}]}])
@@ -1794,14 +1804,14 @@ send(Key, Msg) ->
     ?CATCH_GPROC_ERROR(send1(Key, Msg), [Key, Msg]).
 
 send1({T,C,_} = Key, Msg) when C==l; C==g ->
-    if T == n orelse T == a ->
+    if T == n orelse T == a orelse T == rc ->
             case ets:lookup(?TAB, {Key, T}) of
                 [{_, Pid, _}] ->
                     Pid ! Msg;
                 _ ->
                     ?THROW_GPROC_ERROR(badarg)
             end;
-       T==p orelse T==c ->
+       T==p orelse T==c orelse T==r ->
             %% BUG - if the key part contains select wildcards, we may end up
             %% sending multiple messages to the same pid
             lists:foreach(fun(Pid) ->
@@ -1837,7 +1847,7 @@ bcast(Key, Msg) ->
 bcast(Ns, Key, Msg) ->
     ?CATCH_GPROC_ERROR(bcast1(Ns, Key, Msg), [Key, Msg]).
 
-bcast1(Ns, {T,l,_} = Key, Msg) when T==p; T==a; T==c; T==n ->
+bcast1(Ns, {T,l,_} = Key, Msg) when T==p; T==a; T==c; T==n; T==r; T==rc ->
     send1(Key, Msg),
     gen_server:abcast(Ns -- [node()], gproc_bcast, {send, Key, Msg}),
     Msg.
@@ -2137,7 +2147,7 @@ handle_call({monitor, {T,l,_} = Key, Pid, Type}, _From, S)
 	end,
     {reply, Ref, S};
 handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S)
-  when T==n; T==a ->
+  when T==n; T==a; T==rc ->
     _ = case where(Key) of
 	    undefined ->
 		ok;  % be nice
@@ -2163,6 +2173,7 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
     case ets:lookup(?TAB, {Pid,Key}) of
         [{_, r}] ->
             _ = gproc_lib:remove_reg(Key, Pid, unreg, []),
+            flush_unregs(),
             {reply, true, S};
         [{_, Opts}] when is_list(Opts) ->
             _ = gproc_lib:remove_reg(Key, Pid, unreg, Opts),
@@ -2173,7 +2184,8 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
 handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
     _ = case ets:lookup(?TAB, {shared, Key}) of
 	    [{_, r}] ->
-		_ = gproc_lib:remove_reg(Key, shared, unreg, []);
+		_ = gproc_lib:remove_reg(Key, shared, unreg, []),
+                flush_unregs();
 	    [{_, Opts}] ->
 		_ = gproc_lib:remove_reg(Key, shared, unreg, Opts);
 	    [] ->
@@ -2200,6 +2212,7 @@ handle_call({mreg, T, l, L}, {Pid,_}, S) ->
     end;
 handle_call({munreg, T, l, L}, {Pid,_}, S) ->
     _ = gproc_lib:remove_many(T, l, L, Pid),
+    flush_unregs(),
     {reply, true, S};
 handle_call({set, {_,l,_} = Key, Value}, {Pid,_}, S) ->
     case gproc_lib:do_set_value(Key, Value, Pid) of
@@ -2369,23 +2382,14 @@ process_is_down(Pid) when is_pid(Pid) ->
                       [{_, _, Value}] = ets:lookup(?TAB, Key),
                       ets:delete(?TAB, Key),
                       gproc_lib:update_aggr_counter(l, C, -Value);
+                 ({{r,l,Rsrc} = K, _}) ->
+                      Key = {K, Pid},
+                      ets:delete(?TAB, Key),
+                      update_refcount(l, Rsrc, -1);
+                 ({{rc,l,_} = K, R}) ->
+                      remove_aggregate(rc, K, R, Pid);
                  ({{a,l,_} = K, R}) ->
-                      case ets:lookup(?TAB, {K,a}) of
-                          [{_, Pid, V}] ->
-                              ets:delete(?TAB, {K,a}),
-                              opt_notify(R, K, Pid, V);
-                          [{_, OtherPid, _}] when Pid =/= OtherPid ->
-                              case ets:lookup(?TAB, {OtherPid, K}) of
-                                  [{RK, Opts}] when is_list(Opts) ->
-                                      Opts1 = gproc_lib:remove_monitor_pid(
-                                                Opts, Pid),
-                                      ets:insert(?TAB, {RK, Opts1});
-                                  _ ->
-                                      true
-                              end;
-                          [] ->
-                              opt_notify(R, K, Pid, undefined)
-                      end;
+                      remove_aggregate(a, K, R, Pid);
                  ({{p,_,_} = K, _}) ->
                       ets:delete(?TAB, {K, Pid})
               end, Revs),
@@ -2394,6 +2398,37 @@ process_is_down(Pid) when is_pid(Pid) ->
             ok
     end.
 
+update_refcount(C, R, V) ->
+    gproc_lib:update_refcount(C, R, V),
+    flush_unregs().
+
+flush_unregs() ->
+    receive
+        {gproc_unreg, _} ->
+            flush_unregs()
+    after 0 ->
+            ok
+    end.
+
+remove_aggregate(T, K, R, Pid) ->
+    case ets:lookup(?TAB, {K,T}) of
+        [{_, Pid, V}] ->
+            ets:delete(?TAB, {K,T}),
+            opt_notify(R, K, Pid, V);
+        [{_, OtherPid, _}] when Pid =/= OtherPid ->
+            case ets:lookup(?TAB, {OtherPid, K}) of
+                [{RK, Opts}] when is_list(Opts) ->
+                    Opts1 = gproc_lib:remove_monitor_pid(
+                              Opts, Pid),
+                    ets:insert(?TAB, {RK, Opts1});
+                _ ->
+                    true
+            end;
+        [] ->
+            opt_notify(R, K, Pid, undefined)
+    end.
+
+
 opt_notify(r, _, _, _) ->
     ok;
 opt_notify(Opts, {T,_,_} = Key, Pid, Value) ->
@@ -2442,7 +2477,7 @@ pick_standby([]) ->
 
 
 
-do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
+do_give_away({T,l,_} = K, To, Pid) when T==n; T==a; T==rc ->
     Key = {K, T},
     case ets:lookup(?TAB, Key) of
         [{_, Pid, Value}] ->
@@ -2465,7 +2500,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
         _ ->
             badarg
     end;
-do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
+do_give_away({T,l,_} = K, To, Pid) when T==c; T==p; T==r ->
     Key = {K, Pid},
     case ets:lookup(?TAB, Key) of
         [{_, Pid, Value}] ->
@@ -2494,7 +2529,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
 
 pid_to_give_away_to(P) when is_pid(P), node(P) == node() ->
     P;
-pid_to_give_away_to({T,l,_} = Key) when T==n; T==a ->
+pid_to_give_away_to({T,l,_} = Key) when T==n; T==a; T==rc ->
     case ets:lookup(?TAB, {Key, T}) of
         [{_, Pid, _}] ->
             Pid;
@@ -2584,7 +2619,7 @@ headpat(T, C, V1,V2,V3) ->
     Rf = fun(Pos) ->
                  {element,Pos,{element,1,{element,1,'$_'}}}
          end,
-    K2 = if T==n orelse T==a -> T;
+    K2 = if T==n orelse T==a orelse T==rc -> T;
             true -> '_'
          end,
     {Kp,Vars} = case V1 of
@@ -2633,8 +2668,10 @@ type(all)   -> '_';
 type(T) when T==n; T==p; T==c; T==a -> T;
 type(names) -> n;
 type(props) -> p;
+type(resources) -> r;
 type(counters) -> c;
-type(aggr_counters) -> a.
+type(aggr_counters) -> a;
+type(resource_counters) -> rc.
 
 rev_keypat(Context) ->
     {S,T} = get_s_t(Context),

+ 107 - 29
src/gproc_lib.erl

@@ -66,28 +66,17 @@ dbg(Mods) ->
 insert_reg(K, Value, Pid, Scope) ->
     insert_reg(K, Value, Pid, Scope, registered).
 
-insert_reg({T,_,Name} = K, Value, Pid, Scope, Event) when T==a; T==n ->
-    MaybeScan = fun() ->
-                        if T==a ->
-                                Initial = scan_existing_counters(Scope, Name),
-                                ets:insert(?TAB, {{K,a}, Pid, Initial});
-                           true ->
-                                true
-                        end
-                end,
-    case ets:insert_new(?TAB, {{K,T}, Pid, Value}) of
-        true ->
-            _ = ets:insert_new(?TAB, {{Pid,K}, []}),
-            MaybeScan();
-        false ->
-            if T==n; T==a ->
-                    Res = maybe_waiters(K, Pid, Value, T, Event),
-                    MaybeScan(),
-                    Res;
-               true ->
-                    false
-            end
-    end;
+insert_reg({T,_,Name} = K, Value, Pid, Scope, Event) when T==a; T==n; T==rc ->
+    Res = case ets:insert_new(?TAB, {{K,T}, Pid, Value}) of
+              true ->
+                  %% Use insert_new to avoid overwriting existing entry
+                  _ = ets:insert_new(?TAB, {{Pid,K}, []}),
+                  true;
+              false ->
+                  maybe_waiters(K, Pid, Value, T, Event)
+          end,
+    maybe_scan(T, Pid, Scope, Name, K),
+    Res;
 insert_reg({p,Scope,_} = K, Value, shared, Scope, _E)
   when Scope == g; Scope == l ->
     %% shared properties are unique
@@ -105,12 +94,31 @@ insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope, _E) when Scope==l; Scope==g -
             ignore
     end,
     Res;
+insert_reg({r,Scope,R} = Key, Value, Pid, Scope, _E) when Scope==l; Scope==g ->
+    K = {Key, Pid},
+    Kr = {Pid, Key},
+    Res = ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, [{initial, Value}]}]),
+    case Res of
+        true ->
+            update_resource_count(Scope, R, 1);
+        false ->
+            ignore
+    end,
+    Res;
 insert_reg({_,_,_} = Key, Value, Pid, _Scope, _E) when is_pid(Pid) ->
     %% Non-unique keys; store Pid in the key part
     K = {Key, Pid},
     Kr = {Pid, Key},
     ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, []}]).
 
+maybe_scan(a, Pid, Scope, Name, K) ->
+    Initial = scan_existing_counters(Scope, Name),
+    ets:insert(?TAB, {{K,a}, Pid, Initial});
+maybe_scan(rc, Pid, Scope, Name, K) ->
+    Initial = scan_existing_resources(Scope, Name),
+    ets:insert(?TAB, {{K,rc}, Pid, Initial});
+maybe_scan(_, _, _, _, _) ->
+    true.
 
 insert_attr({_,Scope,_} = Key, Attrs, Pid, Scope) when Scope==l;
 						       Scope==g ->
@@ -127,6 +135,24 @@ insert_attr({_,Scope,_} = Key, Attrs, Pid, Scope) when Scope==l;
 	    false
     end.
 
+get_attr(Attr, Pid, {_,_,_} = Key, Default) ->
+    case ets:lookup(?TAB, {Pid, Key}) of
+        [{_, Opts}] when is_list(Opts) ->
+            case lists:keyfind(attrs, 1, Opts) of
+                {_, Attrs} ->
+                    case lists:keyfind(Attr, 1, Attrs) of
+                        {_, Val} ->
+                            Val;
+                        _ ->
+                            Default
+                    end;
+                _ ->
+                    Default
+            end;
+        _ ->
+            Default
+    end.
+
 -spec insert_many(type(), scope(), [{key(),any()}], pid()) ->
           {true,list()} | false.
 
@@ -424,11 +450,11 @@ unreg_opts(Key, Pid) ->
 remove_reg_1({c,_,_} = Key, Pid) ->
     remove_counter_1(Key, ets:lookup_element(?TAB, Reg = {Key,Pid}, 3), Pid),
     Reg;
-remove_reg_1({a,_,_} = Key, _Pid) ->
-    ets:delete(?TAB, Reg = {Key,a}),
+remove_reg_1({r,_,_} = Key, Pid) ->
+    remove_resource_1(Key, ets:lookup_element(?TAB, Reg = {Key,Pid}, 3), Pid),
     Reg;
-remove_reg_1({n,_,_} = Key, _Pid) ->
-    ets:delete(?TAB, Reg = {Key,n}),
+remove_reg_1({T,_,_} = Key, _Pid) when T==a; T==n; T==rc ->
+    ets:delete(?TAB, Reg = {Key,T}),
     Reg;
 remove_reg_1({_,_,_} = Key, Pid) ->
     ets:delete(?TAB, Reg = {Key, Pid}),
@@ -439,9 +465,14 @@ remove_counter_1({c,C,N} = Key, Val, Pid) ->
     update_aggr_counter(C, N, -Val),
     Res.
 
+remove_resource_1({r,C,N} = Key, _, Pid) ->
+    Res = ets:delete(?TAB, {Key, Pid}),
+    update_resource_count(C, N, -1),
+    Res.
+
 do_set_value({T,_,_} = Key, Value, Pid) ->
     K2 = if Pid == shared -> shared;
-	    T==n orelse T==a -> T;
+	    T==n orelse T==a orelse T==rc -> T;
 	    true -> Pid
          end,
     try ets:lookup_element(?TAB, {Key,K2}, 2) of
@@ -480,6 +511,7 @@ update_counter({T,l,Ctr} = Key, {Incr, Threshold, SetValue}, Pid)
     end,
     New;
 update_counter({T,l,Ctr} = Key, Ops, Pid) when is_list(Ops), T==c;
+                                               is_list(Ops), T==r;
 					       is_list(Ops), T==n ->
     case ets:update_counter(?TAB, {Key, Pid},
 			    [{3, 0} | expand_ops(Ops)]) of
@@ -507,18 +539,64 @@ expand_ops([]) ->
 expand_ops(_) ->
     ?THROW_GPROC_ERROR(badarg).
 
+update_aggr_counter(C, N, Val) ->
+    ?MAY_FAIL(ets:update_counter(?TAB, {{a,C,N},a}, {3, Val})).
 
+update_resource_count(C, N, Val) ->
+    try ets:update_counter(?TAB, {{rc,C,N},rc}, {3, Val}) of
+        0 ->
+            resource_count_zero(C, N);
+        _ ->
+            ok
+    catch
+        _:_ -> ok
+    end.
+
+resource_count_zero(C, N) ->
+    case ets:lookup(?TAB, {K = {rc,C,N},rc}) of
+        [{_, Pid, _}] ->
+            case get_attr(on_zero, Pid, K, undefined) of
+                undefined -> ok;
+                Actions ->
+                    perform_on_zero(Actions, C, N, Pid)
+            end;
+        _ -> ok
+    end.
+
+perform_on_zero(Actions, C, N, Pid) ->
+    lists:foreach(
+      fun(A) ->
+              perform_on_zero_(A, C, N, Pid)
+      end, Actions).
+
+perform_on_zero_({send, ToProc}, C, N, Pid) ->
+    gproc:send(ToProc, {gproc, resource_on_zero, C, N, Pid}),
+    [];
+perform_on_zero_(publish, C, N, Pid) ->
+    gproc_ps:publish(C, gproc_resource_on_zero, {C, N, Pid}),
+    [];
+perform_on_zero_({unreg_shared, T,N}, C, _, _) ->
+    K = {T, C, N},
+    case ets:member(?TAB, {K, shared}) of
+        true ->
+            self() ! {gproc_unreg, remove_reg(K, shared, unreg)};
+        false ->
+            []
+    end;
+perform_on_zero_(_, _, _, _) ->
+    [].
 
 
 
-update_aggr_counter(C, N, Val) ->
-    ?MAY_FAIL(ets:update_counter(?TAB, {{a,C,N},a}, {3, Val})).
 
 scan_existing_counters(Ctxt, Name) ->
     Head = {{{c,Ctxt,Name},'_'},'_','$1'},
     Cs = ets:select(?TAB, [{Head, [], ['$1']}]),
     lists:sum(Cs).
 
+scan_existing_resources(Ctxt, Name) ->
+    Head = {{{r,Ctxt,Name},'_'},'_','_'},
+    ets:select_count(?TAB, [{Head, [], [true]}]).
 
 valid_opts(Type, Default) ->
     Opts = get_app_env(Type, Default),