Browse Source

support awaiting aggr ctrs + minor optimizations

Ulf Wiger 10 years ago
parent
commit
03af3660db
6 changed files with 71 additions and 25 deletions
  1. 17 16
      src/gproc.erl
  2. 3 1
      src/gproc_bcast.erl
  3. 4 0
      src/gproc_int.hrl
  4. 7 5
      src/gproc_lib.erl
  5. 28 3
      test/gproc_dist_tests.erl
  6. 12 0
      test/gproc_tests.erl

+ 17 - 16
src/gproc.erl

@@ -635,7 +635,7 @@ await(Key) ->
 %% @spec await(Key::key(), Timeout) -> {pid(),Value}
 %%   Timeout = integer() | infinity
 %%
-%% @doc Wait for a local name to be registered.
+%% @doc Wait for a name or aggregated counter to be registered.
 %% The function raises an exception if the timeout expires. Timeout must be
 %% either an interger > 0 or 'infinity'.
 %% A small optimization: we first perform a lookup, to see if the name
@@ -650,7 +650,7 @@ await(Key, Timeout) ->
 %% @spec await(Node::node(), Key::key(), Timeout) -> {pid(),Value}
 %%   Timeout = integer() | infinity
 %%
-%% @doc Wait for a local name to be registered on `Node'.
+%% @doc Wait for a name or aggregated counter to be registered on `Node'.
 %% This function works exactly like {@link await/2}, but queries a remote
 %% node instead. An exception is thrown if `Node' cannot be reached. If gproc
 %% is not running on a given node, this is treated the same as the node being
@@ -662,11 +662,11 @@ await(Node, Key, Timeout) when Node == node() ->
 await(Node, Key, Timeout) when is_atom(Node) ->
     ?CATCH_GPROC_ERROR(await1(Node, Key, Timeout), [Node, Key, Timeout]).
 
-await1({n,g,_} = Key, Timeout) ->
+await1({T,g,_} = Key, Timeout) when T=:=n; T=:=a ->
     ?CHK_DIST,
     request_wait(Key, Timeout);
-await1({n,l,_} = Key, Timeout) ->
-    case ets:lookup(?TAB, {Key, n}) of
+await1({T,l,_} = Key, Timeout) when T=:=n; T=:=a ->
+    case ets:lookup(?TAB, {Key, T}) of
         [{_, Pid, Value}] ->
 	    case is_process_alive(Pid) of
 		true ->
@@ -698,7 +698,7 @@ request_wait({_,g,_} = Key, Timeout) ->
 request_wait(Key, Timeout) ->
     request_wait(node(), Key, Timeout).
 
-request_wait(N, {n,C,_} = Key, Timeout) when C==l; C==g ->
+request_wait(N, {_,C,_} = Key, Timeout) when C==l; C==g ->
     TRef = case Timeout of
                infinity -> no_timer;
                T when is_integer(T), T > 0 ->
@@ -741,7 +741,7 @@ request_wait(N, {n,C,_} = Key, Timeout) when C==l; C==g ->
 wide_await(Nodes, Key, Timeout) ->
     ?CATCH_GPROC_ERROR(wide_await1(Nodes, Key, Timeout), [Nodes, Key, Timeout]).
 
-wide_await1(Nodes, {n,l,_} = Key, Timeout) ->
+wide_await1(Nodes, {T,l,_} = Key, Timeout) when T=:=n; T=:=a ->
     {_, Ref} = spawn_monitor(fun() ->
 				     wide_request_wait(Nodes, Key, Timeout)
 			     end),
@@ -758,7 +758,7 @@ wide_await1(_, _, _) ->
     ?THROW_GPROC_ERROR(badarg).
 
 
-wide_request_wait(Nodes, {n,l,_} = Key, Timeout) ->
+wide_request_wait(Nodes, {Tk,l,_} = Key, Timeout) when Tk=:=n; Tk=:=a ->
         TRef = case Timeout of
                infinity -> no_timer;
                T when is_integer(T), T > 0 ->
@@ -771,8 +771,8 @@ wide_request_wait(Nodes, {n,l,_} = Key, Timeout) ->
 	     fun(Node) ->
 		     S = {?MODULE, Node},
 		     Ref = erlang:monitor(process, S),
-		     catch erlang:send(S, {'$gen_call', {self(), Ref}, Req},
-				       [noconnect]),
+		     ?MAY_FAIL(erlang:send(S, {'$gen_call', {self(), Ref}, Req},
+                                           [noconnect])),
 		     {Node, Ref}
 	     end, Nodes),
     collect_replies(Refs, Key, TRef).
@@ -798,7 +798,7 @@ collect_replies(Refs, Key, TRef) ->
 
 %% @spec nb_wait(Key::key()) -> Ref
 %%
-%% @doc Wait for a local name to be registered.
+%% @doc Wait for a name or aggregated counter to be registered.
 %% The caller can expect to receive a message,
 %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
 %% @end
@@ -808,23 +808,24 @@ nb_wait(Key) ->
 
 %% @spec nb_wait(Node::node(), Key::key()) -> Ref
 %%
-%% @doc Wait for a local name to be registered on `Node'.
+%% @doc Wait for a name or aggregated counter to be registered on `Node'.
 %% The caller can expect to receive a message,
 %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
 %% @end
 %%
-nb_wait(Node, {n,l,_} = Key) when is_atom(Node) ->
+nb_wait(Node, Key) ->
     ?CATCH_GPROC_ERROR(nb_wait1(Node, Key), [Node, Key]).
 
-nb_wait1({n,g,_} = Key) ->
+nb_wait1({T,g,_} = Key) when T=:=n; T=:=a ->
     ?CHK_DIST,
     call({await, Key, self()}, g);
-nb_wait1({n,l,_} = Key) ->
+nb_wait1({T,l,_} = Key) when T=:=n; T=:=a ->
     call({await, Key, self()}, l);
 nb_wait1(_) ->
     ?THROW_GPROC_ERROR(badarg).
 
-nb_wait1(Node, {n,l,_} = Key) when is_atom(Node) ->
+nb_wait1(Node, {T,l,_} = Key) when is_atom(Node), T=:=n;
+                                   is_atom(Node), T=:=a ->
     call(Node, {await, Key, self()}, l).
 
 

+ 3 - 1
src/gproc_bcast.erl

@@ -35,6 +35,8 @@
 	 terminate/2,
 	 code_change/3]).
 
+-include("gproc_int.hrl").
+
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
@@ -45,7 +47,7 @@ handle_call(_, _, S) ->
     {reply, {error, unknown_call}, S}.
 
 handle_cast({send, Key, Msg}, S) ->
-    catch gproc:send(Key, Msg),
+    ?MAY_FAIL(gproc:send(Key, Msg)),
     {noreply, S};
 handle_cast(_, S) ->
     {noreply, S}.

+ 4 - 0
src/gproc_int.hrl

@@ -25,3 +25,7 @@
 	end).
 
 -define(THROW_GPROC_ERROR(E), throw({gproc_error, E})).
+
+%% Used to wrap operations that may fail, but we ignore the exception.
+%% Use instead of catch, to avoid building a stacktrace unnecessarily.
+-define(MAY_FAIL(Expr), try (Expr) catch _:_ -> '$caught_exception' end).

+ 7 - 5
src/gproc_lib.erl

@@ -81,7 +81,9 @@ insert_reg({T,_,Name} = K, Value, Pid, Scope, Event) when T==a; T==n ->
             MaybeScan();
         false ->
             if T==n; T==a ->
-                    maybe_waiters(K, Pid, Value, T, Event);
+                    Res = maybe_waiters(K, Pid, Value, T, Event),
+                    MaybeScan(),
+                    Res;
                true ->
                     false
             end
@@ -442,13 +444,13 @@ do_set_value({T,_,_} = Key, Value, Pid) ->
 	    T==n orelse T==a -> T;
 	    true -> Pid
          end,
-    case (catch ets:lookup_element(?TAB, {Key,K2}, 2)) of
-        {'EXIT', {badarg, _}} ->
-            false;
+    try ets:lookup_element(?TAB, {Key,K2}, 2) of
         Pid ->
             ets:insert(?TAB, {{Key, K2}, Pid, Value});
         _ ->
             false
+    catch
+        error:_ -> false
     end.
 
 do_set_counter_value({_,C,N} = Key, Value, Pid) ->
@@ -510,7 +512,7 @@ expand_ops(_) ->
 
 
 update_aggr_counter(C, N, Val) ->
-    catch ets:update_counter(?TAB, {{a,C,N},a}, {3, Val}).
+    ?MAY_FAIL(ets:update_counter(?TAB, {{a,C,N},a}, {3, Val})).
 
 scan_existing_counters(Ctxt, Name) ->
     Head = {{{c,Ctxt,Name},'_'},'_','$1'},

+ 28 - 3
test/gproc_dist_tests.erl

@@ -61,6 +61,9 @@ dist_test_() ->
                                        ?debugVal(t_aggr_counter(Ns))
                                end,
                                fun() ->
+                                       ?debugVal(t_awaited_aggr_counter(Ns))
+                               end,
+                               fun() ->
                                        ?debugVal(t_update_counters(Ns))
                                end,
                                fun() ->
@@ -201,6 +204,29 @@ t_aggr_counter([H1,H2|_] = Ns) ->
     ?assertMatch(ok, t_call(Pc2, die)),
     ?assertMatch(ok, t_call(Pa, die)).
 
+t_awaited_aggr_counter([H1,H2|_] = Ns) ->
+    {c,g,Nm} = Ctr = ?T_COUNTER,
+    Aggr = {a,g,Nm},
+    Pc1 = t_spawn_reg(H1, Ctr, 3),
+    P = t_spawn(H2),
+    Ref = erlang:monitor(process, P),
+    P ! {self(), Ref, {apply, gproc, await, [Aggr]}},
+    t_sleep(),
+    P1 = t_spawn_reg(H2, Aggr),
+    ?assert(P1 == receive
+                      {P, Ref, Res} ->
+                          element(1, Res);
+                      {'DOWN', Ref, _, _, Reason} ->
+                          erlang:error(Reason);
+                      Other ->
+                          erlang:error({received, Other})
+                  end),
+    ?assertMatch(ok, t_read_everywhere(Aggr, P1, Ns, 3)),
+    ?assertMatch(ok, t_call(Pc1, die)),
+    ?assertMatch(ok, t_call(P, die)),
+    flush_down(Ref),
+    ?assertMatch(ok, t_call(P1, die)).
+
 t_update_counters([H1,H2|_] = Ns) ->
     {c,g,N1} = C1 = ?T_COUNTER,
     A1 = {a,g,N1},
@@ -213,7 +239,6 @@ t_update_counters([H1,H2|_] = Ns) ->
     ?assertMatch(ok, t_read_everywhere(C1, P12, Ns, 2)),
     ?assertMatch(ok, t_read_everywhere(C2, P2, Ns, 1)),
     ?assertMatch(ok, t_read_everywhere(A1, Pa1, Ns, 4)),
-    ?debugFmt("code:which(gproc_dist) = ~p~n", [code:which(gproc_dist)]),
     ?assertMatch([{C1,P1, 3},
 		  {C1,P12,4},
 		  {C2,P2, 0}], t_call(P1, {apply, gproc, update_counters,
@@ -347,12 +372,12 @@ t_standby_monitor([A,B|_] = Ns) ->
     ?assertMatch({gproc,unreg,Ref1,Na}, got_msg(Pc, gproc)),
     ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)).
 
-t_follow_monitor([A,B|_] = Ns) ->
+t_follow_monitor([A,B|_]) ->
     Na = ?T_NAME,
     Pa = t_spawn(A, _Selective = true),
     Ref = t_call(Pa, {apply, gproc, monitor, [Na, follow]}),
     {gproc,unreg,Ref,Na} = got_msg(Pa),
-    Pb = t_spawn_reg(A, Na),
+    Pb = t_spawn_reg(B, Na),
     {gproc,registered,Ref,Na} = got_msg(Pa),
     ok = t_call(Pb, die),
     ok = t_call(Pa, die).

+ 12 - 0
test/gproc_tests.erl

@@ -84,6 +84,8 @@ reg_test_() ->
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_simple_aggr_counter()))}
       , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_awaited_aggr_counter()))}
+      , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_update_counters()))}
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_simple_prop()))}
@@ -229,6 +231,16 @@ t_simple_aggr_counter() ->
     end,
     ?assert(gproc:get_value({a,l,c1}) =:= 7).
 
+t_awaited_aggr_counter() ->
+    ?assert(gproc:reg({c,l,c1}, 3) =:= true),
+    gproc:nb_wait({a,l,c1}),
+    ?assert(gproc:reg({a,l,c1}) =:= true),
+    receive {gproc,_,registered,{{a,l,c1},_,_}} -> ok
+    after 1000 ->
+            error(timeout)
+    end,
+    ?assertMatch(3, gproc:get_value({a,l,c1})).
+
 t_update_counters() ->
     ?assert(gproc:reg({c,l,c1}, 3) =:= true),
     ?assert(gproc:reg({a,l,c1}) =:= true),