Browse Source

Change return value of update_counters/1; add gproc_ps:notify_single_if_true

update_counters(Cs) now returns [{Ctr,Pid,NewValue}] instead of just [NewValue]

gproc_ps:notify_single_if_true(Scope,Event,Test,Msg) enables/creates a
single-shot subscription and immediately executes a

  tell_singles(Scope,Event,Msg)

for that subscriber only if Test() -> true. Otherwise, the subscription will
stay active until a message is published for Event 'the normal way'.
Ulf Wiger 13 years ago
parent
commit
f9dcbb4f73
6 changed files with 50 additions and 14 deletions
  1. 6 2
      doc/gproc.md
  2. 9 4
      src/gproc.erl
  3. 3 3
      src/gproc_dist.erl
  4. 24 1
      src/gproc_ps.erl
  5. 4 2
      test/gproc_dist_tests.erl
  6. 4 2
      test/gproc_tests.erl

+ 6 - 2
doc/gproc.md

@@ -1388,7 +1388,7 @@ that the position is omitted; in gproc, the value position is always `3`.<a name
 
 
 
-<pre>update_counters(X1::<a href="#type-scope">scope()</a>, Cs::[{<a href="#type-key">key()</a>, pid(), <a href="#type-increment">increment()</a>}]) -> [integer()]</pre>
+<pre>update_counters(X1::<a href="#type-scope">scope()</a>, Cs::[{<a href="#type-key">key()</a>, pid(), <a href="#type-increment">increment()</a>}]) -> [{<a href="#type-key">key()</a>, pid(), integer()}]</pre>
 <br></br>
 
 
@@ -1398,9 +1398,13 @@ that the position is omitted; in gproc, the value position is always `3`.<a name
 
 Update a list of counters
 
+
+
 This function is not atomic, except (in a sense) for global counters. For local counters,
 it is more of a convenience function. For global counters, it is much more efficient
-than calling `gproc:update_counter/2` for each individual counter.<a name="update_shared_counter-2"></a>
+than calling `gproc:update_counter/2` for each individual counter.
+
+The return value is the corresponding list of `[{Counter, Pid, NewValue}]`.<a name="update_shared_counter-2"></a>
 
 ###update_shared_counter/2##
 

+ 9 - 4
src/gproc.erl

@@ -1226,17 +1226,22 @@ update_counter1(_, _) ->
 %% This function is not atomic, except (in a sense) for global counters. For local counters,
 %% it is more of a convenience function. For global counters, it is much more efficient
 %% than calling `gproc:update_counter/2' for each individual counter.
+%%
+%% The return value is the corresponding list of `[{Counter, Pid, NewValue}]'.
 %% @end
--spec update_counters(scope(), [{key(), pid(), increment()}]) -> [integer()].
-update_counters(l, Cs) ->
+-spec update_counters(scope(), [{key(), pid(), increment()}]) ->
+			     [{key(), pid(), integer()}].
+update_counters(_, []) ->
+    [];
+update_counters(l, [_|_] = Cs) ->
     ?CATCH_GPROC_ERROR(update_counters1(Cs), [Cs]);
-update_counters(g, Cs) ->
+update_counters(g, [_|_] = Cs) ->
     ?CHK_DIST,
     gproc_dist:update_counters(Cs).
 
 
 update_counters1([{{c,l,_} = Key, Pid, Incr}|T]) ->
-    [gproc_lib:update_counter(Key, Incr, Pid)|update_counters1(T)];
+    [{Key, Pid, gproc_lib:update_counter(Key, Incr, Pid)}|update_counters1(T)];
 update_counters1([]) ->
     [];
 update_counters1(_) ->

+ 3 - 3
src/gproc_dist.erl

@@ -653,10 +653,10 @@ batch_update_counters(Cs) ->
 batch_update_counters([{{c,g,_} = Key, Pid, Incr}|T], Returns, Updates) ->
     case update_counter_g(Key, Incr, Pid) of
 	[{_,_,_} = A, {_, _, V} = C] ->
-	    batch_update_counters(T, [V|Returns], add_object(
-						    A, add_object(C, Updates)));
+	    batch_update_counters(T, [{Key,Pid,V}|Returns], add_object(
+							      A, add_object(C, Updates)));
 	[{_, _, V} = C] ->
-	    batch_update_counters(T, [V|Returns], add_object(C, Updates))
+	    batch_update_counters(T, [{Key,Pid,V}|Returns], add_object(C, Updates))
     end;
 batch_update_counters([], Returns, Updates) ->
     {lists:reverse(Returns), Updates}.

+ 24 - 1
src/gproc_ps.erl

@@ -45,6 +45,7 @@
 	 disable_single/2,
 	 enable_single/2,
 	 tell_singles/3,
+	 notify_single_if_true/4,
 	 list_singles/2]).
 
 -define(ETag, gproc_ps_event).
@@ -177,7 +178,7 @@ tell_singles(Scope, Event, Msg) when Scope==l; Scope==g ->
 	     {Scope,c},
 	     [{ {{c,Scope,{?ETag,Event}}, '$1', 1}, [],
 		[{{ {{c,Scope, {{?ETag,Event}} }}, '$1', {{-1,0,0}} }}] }]),
-    gproc:update_counters(Scope, Subs),
+    _ = gproc:update_counters(Scope, Subs),
     [begin P ! {?ETag, Event, Msg}, P end || {_,P,_} <- Subs].
 
 -spec list_singles(scope(), event()) -> [{pid(), status()}].
@@ -186,3 +187,25 @@ tell_singles(Scope, Event, Msg) when Scope==l; Scope==g ->
 list_singles(Scope, Event) ->
     gproc:select({Scope,c}, [{ {{c,Scope,{?ETag,Event}}, '$1', '$2'},
 			       [], [{{'$1','$2'}}] }]).
+
+-spec notify_single_if_true(scope(), event(), fun(() -> boolean()), msg()) -> ok.
+%% @doc Create/enable a single subscription for event; notify at once if F() -> true
+%%
+%% This function is a convenience function, wrapping a single-shot pub/sub around a
+%% user-provided boolean test. `Msg' should be what the publisher will send later, if the
+%% immediate test returns `false'.
+%% @end
+notify_single_if_true(Scope, Event, F, Msg) ->
+    try enable_single(Scope, Event)
+    catch
+	error:_ ->
+	    create_single(Scope, Event)
+    end,
+    case F() of
+	true ->
+	    disable_single(Scope, Event),
+	    self() ! {?ETag, Event, Msg},
+	    ok;
+	false ->
+	    ok
+    end.

+ 4 - 2
test/gproc_dist_tests.erl

@@ -156,8 +156,10 @@ t_update_counters([H1,H2|_] = Ns) ->
     ?assertMatch(ok, t_read_everywhere(C2, P2, Ns, 1)),
     ?assertMatch(ok, t_read_everywhere(A1, Pa1, Ns, 4)),
     ?debugFmt("code:which(gproc_dist) = ~p~n", [code:which(gproc_dist)]),
-    ?assertMatch([3,4,0], t_call(P1, {apply, gproc, update_counters,
-				      [g, [{C1,P1,1},{C1,P12,2},{C2,P2,{-2,0,0}}]]})),
+    ?assertMatch([{C1,P1, 3},
+		  {C1,P12,4},
+		  {C2,P2, 0}], t_call(P1, {apply, gproc, update_counters,
+					   [g, [{C1,P1,1},{C1,P12,2},{C2,P2,{-2,0,0}}]]})),
     ?assertMatch(ok, t_read_everywhere(C1, P1, Ns, 3)),
     ?assertMatch(ok, t_read_everywhere(C1, P12, Ns, 4)),
     ?assertMatch(ok, t_read_everywhere(C2, P2, Ns, 0)),

+ 4 - 2
test/gproc_tests.erl

@@ -174,8 +174,10 @@ t_update_counters() ->
 		    end),
     receive {P1, ok} -> ok end,
     ?assert(gproc:get_value({a,l,c1}) =:= 8),
-    ?assertEqual([7,8], gproc:update_counters(l, [{{c,l,c1}, self(), 4},
-						  {{c,l,c1}, P1, 3}])),
+    Me = self(),
+    ?assertEqual([{{c,l,c1},Me,7},
+		  {{c,l,c1},P1,8}], gproc:update_counters(l, [{{c,l,c1}, Me, 4},
+							      {{c,l,c1}, P1, 3}])),
     ?assert(gproc:get_value({a,l,c1}) =:= 15),
     P1 ! {self(), goodbye},
     R = erlang:monitor(process, P1),