Browse Source

Merge pull request #86 from uwiger/uw-pool-fixes

various fixes to gproc_pool
Ulf Wiger 10 years ago
parent
commit
7655bddc43
3 changed files with 138 additions and 32 deletions
  1. 20 8
      src/gproc.erl
  2. 15 3
      src/gproc_lib.erl
  3. 103 21
      src/gproc_pool.erl

+ 20 - 8
src/gproc.erl

@@ -1667,7 +1667,7 @@ update_counters1(_) ->
 
 
 %% @spec (Key) -> {ValueBefore, ValueAfter}
-%%   Key   = {c, Scope, Name}
+%%   Key   = {c, Scope, Name} | {n, Scope, Name}
 %%   Scope = l | g
 %%   ValueBefore = integer()
 %%   ValueAfter  = integer()
@@ -1678,22 +1678,34 @@ update_counters1(_) ->
 %% initial value. The reset operation is done using {@link update_counter/2},
 %% which allows for concurrent calls to {@link update_counter/2} without losing
 %% updates. Aggregated counters are updated accordingly.
+%%
+%% If `Key' refers to a unique name, the operation will depend on the value
+%% part of the registration being an integer(). While non-integer values are
+%% not permitted at all for counter objects, it is the user's responsibility to
+%% ensure that a name, on which `reset_counter/1' is to be performed, has the
+%% appropriate value type.
 %% @end
 %%
 reset_counter(Key) ->
     ?CATCH_GPROC_ERROR(reset_counter1(Key), [Key]).
 
-reset_counter1({c,g,_} = Key) ->
+reset_counter1({T,g,_} = Key) when T==c; T==n ->
     ?CHK_DIST,
     gproc_dist:reset_counter(Key);
+reset_counter1({n,l,_} = Key) ->
+    [{_, Pid, Current}] = ets:lookup(?TAB, {Key, n}),
+    {Current, update_counter(Key, get_initial(Pid, Key) - Current)};
 reset_counter1({c,l,_} = Key) ->
     Current = ets:lookup_element(?TAB, {Key, self()}, 3),
-    Initial = case ets:lookup(?TAB, {self(), Key}) of
-		  [{_, r}] -> 0;
-		  [{_, Opts}] ->
-		      proplists:get_value(initial, Opts, 0)
-	      end,
-    {Current, update_counter(Key, Initial - Current)}.
+    {Current, update_counter(Key, get_initial(self(), Key) - Current)}.
+
+get_initial(Pid, Key) ->
+    case ets:lookup(?TAB, {Pid, Key}) of
+	[{_, r}] -> 0;
+	[{_, Opts}] ->
+	    proplists:get_value(initial, Opts, 0)
+    end.
+
 
 %% @spec (Key::key(), Incr) -> integer() | [integer()]
 %%    Incr = IncrVal | UpdateOp | [UpdateOp]

+ 15 - 3
src/gproc_lib.erl

@@ -460,14 +460,22 @@ do_set_counter_value({_,C,N} = Key, Value, Pid) ->
 update_counter({T,l,Ctr} = Key, Incr, Pid) when is_integer(Incr), T==c;
 						is_integer(Incr), T==n ->
     Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
-    update_aggr_counter(l, Ctr, Incr),
+    if T==c ->
+	    update_aggr_counter(l, Ctr, Incr);
+       true ->
+	    ok
+    end,
     Res;
 update_counter({T,l,Ctr} = Key, {Incr, Threshold, SetValue}, Pid)
   when is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==c;
        is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==n ->
     [Prev, New] = ets:update_counter(?TAB, {Key, Pid},
 				     [{3, 0}, {3, Incr, Threshold, SetValue}]),
-    update_aggr_counter(l, Ctr, New - Prev),
+    if T==c ->
+	    update_aggr_counter(l, Ctr, New - Prev);
+       true ->
+	    ok
+    end,
     New;
 update_counter({T,l,Ctr} = Key, Ops, Pid) when is_list(Ops), T==c;
 					       is_list(Ops), T==n ->
@@ -477,7 +485,11 @@ update_counter({T,l,Ctr} = Key, Ops, Pid) when is_list(Ops), T==c;
 	    [];
 	[Prev | Rest] ->
 	    [New | _] = lists:reverse(Rest),
-	    update_aggr_counter(l, Ctr, New - Prev),
+	    if T==c ->
+		    update_aggr_counter(l, Ctr, New - Prev);
+	       true ->
+		    ok
+	    end,
 	    Rest
     end;
 update_counter(_, _, _) ->

+ 103 - 21
src/gproc_pool.erl

@@ -73,6 +73,7 @@
          pick_worker/1,        % (Pool)
          pick_worker/2,        % (Pool, Value)
          claim/2,              % (Pool, Fun)
+	 claim/3,              % (Pool, Fun, Wait)
          log/1,                % (WorkerId)
          randomize/1]).        % (Pool)
 
@@ -85,7 +86,8 @@
          code_change/3]).
 
 -export([test/1, test/3, ptest/4, test_run/2, test_run1/2, test_run2/2,
-         test_run0/2, setup_test_pool/3, remove_test_pool/1]).
+         test_run0/2, setup_test_pool/3, setup_test_pool/4,
+	 remove_test_pool/1]).
 
 -define(POOL(Pool), {p,l,{?MODULE,Pool}}).
 -define(POOL_CUR(Pool), {c,l,{?MODULE,Pool,cur}}).
@@ -406,8 +408,15 @@ ret(Name, pid) ->
             false
     end.
 
+%% @equiv claim(Pool, F, nowait)
+claim(Pool, F) when is_function(F, 2) ->
+    claim(Pool, F, nowait).
 
-%% @spec claim(Pool::any(), Fun::function()) -> {true, Res} | false
+%% @spec claim(Pool, Fun, Wait) -> {true, Res} | false
+%%         Pool = any()
+%%         Fun  = function()
+%%         Wait = nowait | {busy_wait, integer()}
+%%
 %% @doc Picks the first available worker in the pool and applies `Fun'.
 %%
 %% A `claim' pool allows the caller to "claim" a worker during a short span
@@ -417,21 +426,49 @@ ret(Name, pid) ->
 %% The gproc name of the worker serves as a mutex, where its value is 0 (zero)
 %% if the worker is free, and 1 (one) if it is busy. The mutex operation is
 %% implemented using `gproc:update_counter/2'.
+%%
+%% `Wait == nowait' means that the call will return `false' immediately if
+%% there is no available worker.
+%%
+%% `Wait == {busy_wait, Timeout}' will keep repeating the claim attempt
+%% for `Timeout' milliseconds. If still no worker is available, it will
+%% return `false'.
 %% @end
-claim(Pool, F) when is_function(F, 2) ->
+claim(Pool, F, Wait) ->
     case gproc:get_value(?POOL(Pool), shared) of
-        {0, _} -> false;
-        {_, claim} ->
-            claim_(Pool, F);
-        _ ->
-            error(badarg)
+	{0, _} -> false;
+	{_, claim} ->
+	    W = setup_wait(Wait, Pool),
+	    claim_w(Pool, F, W);
+	_ ->
+	    error(badarg)
+    end.
+
+claim_w(_Pool, _F, timeout) ->
+    false;
+claim_w(Pool, F, W) ->
+    case claim_(Pool, F) of
+	false ->
+	    claim_w(Pool, F, do_wait(W));
+	Other ->
+	    clear_wait(W),
+	    Other
     end.
 
+%% Define how many workers to select in each chunk. We want to strike
+%% a good compromise between the cost of succeeding on the first try
+%% (likely a common event) and the cost of retrying. In my measurements,
+%% if a chunk size of 1 costs ca 30 us (on my Macbook), a chunk size of 5
+%% adds only ca 20% to the cost, i.e. a few us.
+-define(CLAIM_CHUNK, 5).
+
 claim_(Pool, F) ->
-    case gproc:select({l,n}, [{ {{n,l,[?MODULE,Pool,'$1','_']}, '_', 0}, [],
-                                [{{ {element,1,'$_'}, '$1' }}]}], 1) of
-        {[{K, Pid}], Cont} ->
-            case try_claim(K, Pid, F) of
+    %% Sorry, but we use ets:select/3 here in order to shave off a few us.
+    case ets:select(gproc, [{ {{{n,l,[?MODULE,Pool,'_','_']},n}, '$1', 0}, [],
+			      [{{ {element,1,{element,1,'$_'}}, '$1' }}]}],
+		     ?CLAIM_CHUNK) of
+        {[_|_] = Workers, Cont} ->
+            case try_claim(Workers, F) of
                 {true, _} = True ->
                     True;
                 false ->
@@ -441,10 +478,12 @@ claim_(Pool, F) ->
             false
     end.
 
+claim_cont('$end_of_table', _) ->
+    false;
 claim_cont(Cont, F) ->
-    case gproc:select(Cont) of
-        {[{K, Pid}], Cont1} ->
-            case try_claim(K, Pid, F) of
+    case ets:select(Cont) of
+        {[_|_] = Workers, Cont1} ->
+            case try_claim(Workers, F) of
                 {true, _} = True ->
                     True;
                 false ->
@@ -454,6 +493,16 @@ claim_cont(Cont, F) ->
             false
     end.
 
+try_claim([], _) ->
+    false;
+try_claim([{K,Pid}|T], F) ->
+    case try_claim(K, Pid, F) of
+	false ->
+	    try_claim(T, F);
+	Other ->
+	    Other
+    end.
+
 try_claim(K, Pid, F) ->
     case gproc:update_counter(K, [0, {1, 1, 1}]) of
         [0, 1] ->
@@ -461,13 +510,39 @@ try_claim(K, Pid, F) ->
             try  Res = F(K, Pid),
                  {true, Res}
             after
-                gproc:set_value(K, 0)
+                gproc:reset_counter(K)
             end;
         [1, 1] ->
             %% no
             false
     end.
 
+setup_wait(nowait, _) ->
+    nowait;
+setup_wait({busy_wait, MS}, Pool) ->
+    Ref = erlang:send_after(MS, self(), {claim, Pool}),
+    {busy_wait, Ref}.
+
+do_wait(nowait) ->
+    timeout;
+do_wait({busy_wait, Ref} = W) ->
+    %% Yielding here serves two purposes:
+    %% 1) Increase the chance that whoever's before us can finish
+    %% 2) The value of read_timer/1 only refreshes after yield (so I've heard)
+    erlang:yield(),
+    case erlang:read_timer(Ref) of
+	false ->
+	    erlang:cancel_timer(Ref),
+	    timeout;
+	_ ->
+	    W
+    end.
+
+clear_wait(nowait) ->
+    ok;
+clear_wait({busy_wait, Ref}) ->
+    erlang:cancel_timer(Ref),
+    ok.
 
 %% @spec log(GprocKey) -> integer()
 %% @doc Update a counter associated with a worker name.
@@ -928,14 +1003,18 @@ f(_) ->
 
 
 %% @private
-setup_test_pool(P, Type0, Opts) ->
+setup_test_pool(P, Type, Opts) ->
+    setup_test_pool(P, Type, Opts, test_workers()).
+
+setup_test_pool(P, Type0, Opts, Workers) ->
     Type = case Type0 of {_, T} -> T; T when is_atom(T) -> T end,
     new(P, Type, Opts),
     [begin R = add_worker(P, W),
            io:fwrite("add_worker(~p, ~p) -> ~p; Ws = ~p~n",
                      [P, W, R, get_workers_(?POOL(P))]),
            connect_worker(P, W)
-     end || W <- test_workers()].
+     end || W <- Workers].
+
 
 %% @private
 remove_test_pool(P) ->
@@ -978,11 +1057,14 @@ test_run1(_, _, S, M) ->
 
 %% @private
 test_run2(N, P) ->
-    test_run2(N, P, fun(K,_) -> log(K) end, 0, 0).
+    test_run2(N, P, fun(K,_) ->
+			    R = log(K),
+			    timer:sleep(crypto:rand_uniform(1,50)),
+			    R
+		    end, 0, 0).
 
 test_run2(N, P, F, S, M) when N > 0 ->
-    {T, {true, _}} = timer:tc(?MODULE, claim, [P, F]),
-    timer:sleep(crypto:rand_uniform(1,50)),
+    {T, {true, _}} = timer:tc(?MODULE, claim, [P, F, {busy_wait, 5000}]),
     test_run2(N-1, P, F, S+T, M+1);
 test_run2(_, _, _, S, M) ->
     S/M.