Browse Source

Merge branch 'add_member_retry'

seth 13 years ago
parent
commit
02f3023c72
2 changed files with 52 additions and 9 deletions
  1. 32 9
      src/pooler.erl
  2. 20 0
      test/pooler_test.erl

+ 32 - 9
src/pooler.erl

@@ -11,6 +11,7 @@
 -module(pooler).
 -module(pooler).
 -behaviour(gen_server).
 -behaviour(gen_server).
 -define(SERVER, ?MODULE).
 -define(SERVER, ?MODULE).
+-define(DEFAULT_ADD_RETRY, 1).
 
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 
@@ -24,7 +25,14 @@
           start_mfa        :: {atom(), atom(), [term()]},
           start_mfa        :: {atom(), atom(), [term()]},
           free_pids = []   :: [pid()],
           free_pids = []   :: [pid()],
           in_use_count = 0 :: non_neg_integer(),
           in_use_count = 0 :: non_neg_integer(),
-          free_count = 0   :: non_neg_integer()
+          free_count = 0   :: non_neg_integer(),
+          %% The number times to attempt adding a pool member if the
+          %% pool size is below max_count and there are no free
+          %% members. After this many tries, error_no_members will be
+          %% returned by a call to take_member. NOTE: this value
+          %% should be >= 2 or else the pool will not grow on demand
+          %% when max_count is larger than init_count.
+          add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer()
          }).
          }).
 
 
 -record(state, {
 -record(state, {
@@ -250,7 +258,8 @@ props_to_pool(P) ->
     #pool{      name = ?gv(name, P),
     #pool{      name = ?gv(name, P),
            max_count = ?gv(max_count, P),
            max_count = ?gv(max_count, P),
           init_count = ?gv(init_count, P),
           init_count = ?gv(init_count, P),
-           start_mfa = ?gv(start_mfa, P)}.
+           start_mfa = ?gv(start_mfa, P),
+ add_member_retry = ?gv(add_member_retry, P, ?DEFAULT_ADD_RETRY)}.
 
 
 % FIXME: creation of new pids should probably happen
 % FIXME: creation of new pids should probably happen
 % in a spawned process to avoid tying up the loop.
 % in a spawned process to avoid tying up the loop.
@@ -288,14 +297,16 @@ add_pids(PoolName, N, State) ->
             {max_count_reached, State}
             {max_count_reached, State}
     end.
     end.
 
 
--spec take_member(string(), pid(), #state{}) ->
+-spec take_member(string(), {pid(), _}, #state{}) ->
     {error_no_pool | error_no_members | pid(), #state{}}.
     {error_no_pool | error_no_members | pid(), #state{}}.
 take_member(PoolName, From, #state{pools = Pools} = State) ->
 take_member(PoolName, From, #state{pools = Pools} = State) ->
-    take_member_from_pool(fetch_pool(PoolName, Pools), From, State).
+    Pool = fetch_pool(PoolName, Pools),
+    take_member_from_pool(Pool, From, State, pool_add_retries(Pool)).
 
 
--spec take_member_from_pool(error_no_pool | #pool{}, {pid(), term()}, #state{}) ->
+-spec take_member_from_pool(error_no_pool | #pool{}, {pid(), term()}, #state{},
+                            non_neg_integer()) ->
                                    {error_no_pool | error_no_members | pid(), #state{}}.
                                    {error_no_pool | error_no_members | pid(), #state{}}.
-take_member_from_pool(error_no_pool, _From, State) ->
+take_member_from_pool(error_no_pool, _From, State, _) ->
     {error_no_pool, State};
     {error_no_pool, State};
 take_member_from_pool(#pool{name = PoolName,
 take_member_from_pool(#pool{name = PoolName,
                             max_count = Max,
                             max_count = Max,
@@ -303,22 +314,29 @@ take_member_from_pool(#pool{name = PoolName,
                             in_use_count = NumInUse,
                             in_use_count = NumInUse,
                             free_count = NumFree} = Pool,
                             free_count = NumFree} = Pool,
                       From,
                       From,
-                      #state{pools = Pools, consumer_to_pid = CPMap} = State) ->
+                      #state{pools = Pools, consumer_to_pid = CPMap} = State,
+                      Retries) ->
     send_metric(pool_metric(PoolName, take_rate), 1, meter),
     send_metric(pool_metric(PoolName, take_rate), 1, meter),
     case Free of
     case Free of
         [] when NumInUse =:= Max ->
         [] when NumInUse =:= Max ->
             send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
             send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
             send_metric(<<"pooler.events">>, error_no_members, history),
             send_metric(<<"pooler.events">>, error_no_members, history),
             {error_no_members, State};
             {error_no_members, State};
-        [] when NumInUse < Max ->
+        [] when NumInUse < Max andalso Retries > 0 ->
             case add_pids(PoolName, 1, State) of
             case add_pids(PoolName, 1, State) of
                 {ok, State1} ->
                 {ok, State1} ->
-                    take_member(PoolName, From, State1);
+                    %% add_pids may have updated our pool
+                    Pool1 = fetch_pool(PoolName, State1#state.pools),
+                    take_member_from_pool(Pool1, From, State1, Retries - 1);
                 {max_count_reached, _} ->
                 {max_count_reached, _} ->
                     send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
                     send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
                     send_metric(<<"pooler.events">>, error_no_members, history),
                     send_metric(<<"pooler.events">>, error_no_members, history),
                     {error_no_members, State}
                     {error_no_members, State}
             end;
             end;
+        [] when Retries =:= 0 ->
+            %% max retries reached
+            send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
+            {error_no_members, State};
         [Pid|Rest] ->
         [Pid|Rest] ->
             erlang:link(From),
             erlang:link(From),
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
@@ -475,6 +493,11 @@ fetch_pool(PoolName, Pools) ->
         error -> error_no_pool
         error -> error_no_pool
     end.
     end.
 
 
+pool_add_retries(#pool{add_member_retry = Retries}) ->
+    Retries;
+pool_add_retries(error_no_pool) ->
+    0.
+
 -spec store_pool(string(), #pool{}, dict()) -> dict().
 -spec store_pool(string(), #pool{}, dict()) -> dict().
 store_pool(PoolName, Pool = #pool{}, Pools) ->
 store_pool(PoolName, Pool = #pool{}, Pools) ->
     dict:store(PoolName, Pool, Pools).
     dict:store(PoolName, Pool, Pools).

+ 20 - 0
test/pooler_test.erl

@@ -284,6 +284,26 @@ pooler_basics_test_() ->
        end}
        end}
      ]}}.
      ]}}.
 
 
+pooler_limit_failed_adds_test_() ->
+    %% verify that pooler crashes completely if too many failures are
+    %% encountered while trying to add pids.
+    {setup,
+     fun() ->
+             Pools = [[{name, "p1"},
+                       {max_count, 10},
+                       {init_count, 10},
+                       {start_mfa,
+                        {pooled_gs, start_link, [crash]}}]],
+             application:set_env(pooler, pools, Pools)
+     end,
+     fun(_) ->
+             application:stop(pooler)
+     end,
+     fun() ->
+             application:start(pooler),
+             ?assertEqual(error_no_members, pooler:take_member()),
+             ?assertEqual(error_no_members, pooler:take_member("p1"))
+     end}.
 
 
 random_message_test_() ->
 random_message_test_() ->
     {setup,
     {setup,