Merge branch 'async-start' into pooler-1-0

Seth Falcon, 12 years ago
commit 04e39af87d
9 changed files with 375 additions and 111 deletions
  1. Makefile (+3, -0)
  2. pooler-example.config (+2, -2)
  3. src/pooler.erl (+136, -92)
  4. src/pooler.hrl (+11, -1)
  5. src/pooler_starter.erl (+127, -0)
  6. src/pooler_starter_sup.erl (+26, -0)
  7. src/pooler_sup.erl (+5, -1)
  8. test/pooler_perf_test.erl (+4, -1)
  9. test/pooler_tests.erl (+61, -14)

+ 3 - 0
Makefile

@@ -20,3 +20,6 @@ clean:
 
 distclean: clean
 	@rebar delete-deps
+
+demo_shell: compile test
+	@erl -pa .eunit ebin -config pooler-example -s pooler manual_start
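
A quick way to exercise the merged branch: `make demo_shell' compiles, runs the tests, and opens an Erlang shell with pooler-example.config loaded via `pooler:manual_start/0'. A minimal session sketch, assuming the `pool1' name from the example config:

    $ make demo_shell
    1> P = pooler:take_member(pool1).
    2> pooler:return_member(pool1, P, ok).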

+ 2 - 2
pooler.config.example → pooler-example.config

@@ -3,13 +3,13 @@
 [
  {pooler, [
            {pools, [
-                    [{name, "pool1"},
+                    [{name, pool1},
                      {max_count, 5},
                      {init_count, 2},
                      {start_mfa,
                       {pooled_gs, start_link, [{"p1"}]}}],
                     
-                    [{name, "pool2"},
+                    [{name, pool2},
                      {max_count, 5},
                      {init_count, 2},
                      {start_mfa,
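
Pool names change from strings to atoms. The reason is visible in src/pooler.erl below: each pool is registered locally under its name, and Erlang process registration only accepts atoms. A sketch of the registration the name must satisfy:

    %% From pooler:start_link/1: the pool's gen_server is registered
    %% as {local, Name}, so Name must be an atom such as pool1.
    gen_server:start_link({local, pool1}, pooler, Pool, []).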

+ 136 - 92
src/pooler.erl

@@ -27,14 +27,16 @@
 %% API Function Exports
 %% ------------------------------------------------------------------
 
--export([start_link/1,
+-export([accept_member/2,
+         start_link/1,
          take_member/1,
          take_group_member/1,
          return_group_member/2,
          return_group_member/3,
          return_member/2,
          return_member/3,
-         pool_stats/1]).
+         pool_stats/1,
+         manual_start/0]).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Exports
@@ -59,6 +61,16 @@
 start_link(#pool{name = Name} = Pool) ->
     gen_server:start_link({local, Name}, ?MODULE, Pool, []).
 
+manual_start() ->
+    application:start(sasl),
+    application:start(crypto),
+    application:start(pooler).
+
+%% @doc For INTERNAL use. Accepts a `{Ref, Pid}' or `{Ref, Error}' from a starter.
+-spec accept_member(atom() | pid(), {reference(), pid() | {noproc, _}}) -> ok.
+accept_member(PoolName, MemberPid) ->
+    gen_server:call(PoolName, {accept_member, MemberPid}).
+
 %% @doc Obtain exclusive access to a member from `PoolName'.
 %%
 %% If no free members are available, 'error_no_members' is returned.
@@ -171,8 +183,10 @@ init(#pool{}=Pool) ->
     #pool{init_count = N} = Pool,
     MemberSup = pooler_pool_sup:member_sup_name(Pool),
     Pool1 = set_member_sup(Pool, MemberSup),
+    %% Schedule the next cull if the pool is configured for culling;
+    %% otherwise this is a no-op.
     Pool2 = cull_members_from_pool(Pool1),
-    {ok, NewPool} = add_pids(N, Pool2),
+    {ok, NewPool} = init_members_sync(N, Pool2),
     %% trigger an immediate timeout, handled by handle_info to allow
     %% us to register with pg2. We use the timeout mechanism to ensure
     %% that a server is added to a group only when it is ready to
@@ -183,15 +197,18 @@ set_member_sup(#pool{} = Pool, MemberSup) ->
     Pool#pool{member_sup = MemberSup}.
 
 handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
-    Retries = pool_add_retries(Pool),
-    {Member, NewPool} = take_member_from_pool(Pool, CPid, Retries),
+    {Member, NewPool} = take_member_from_pool(Pool, CPid),
     {reply, Member, NewPool};
 handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
     {reply, ok, do_return_member(Pid, Status, Pool)};
+handle_call({accept_member, Pid}, _From, Pool) ->
+    {reply, ok, do_accept_member(Pid, Pool)};
 handle_call(stop, _From, Pool) ->
     {stop, normal, stop_ok, Pool};
 handle_call(pool_stats, _From, Pool) ->
     {reply, dict:to_list(Pool#pool.all_members), Pool};
+handle_call(dump_pool, _From, Pool) ->
+    {reply, Pool, Pool};
 handle_call(_Request, _From, Pool) ->
     {noreply, Pool}.
 
@@ -244,70 +261,117 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
-% FIXME: creation of new pids should probably happen
-% in a spawned process to avoid tying up the loop.
--spec add_pids(non_neg_integer(), #pool{}) ->
-    {max_count_reached | ok, #pool{}}.
-add_pids(N, Pool) ->
-    #pool{max_count = Max, free_pids = Free,
-          in_use_count = NumInUse, free_count = NumFree,
-          member_sup = PoolSup,
-          all_members = AllMembers} = Pool,
-    Total = NumFree + NumInUse,
-    PoolName = Pool#pool.name,
-    case Total + N =< Max of
+do_accept_member({Ref, Pid},
+                 #pool{
+                    all_members = AllMembers,
+                    free_pids = Free,
+                    free_count = NumFree,
+                    starting_members = StartingMembers0
+                   } = Pool) when is_pid(Pid) ->
+    %% make sure we don't accept a timed-out member
+    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
+                                                    ?DEFAULT_MEMBER_START_TIMEOUT),
+    case lists:keymember(Ref, 1, StartingMembers) of
+        false ->
+            %% a pid we didn't ask to start, ignore it.
+            %% should we log it?
+            Pool;
         true ->
-            {AllMembers1, NewPids} = start_n_pids(N, PoolSup, AllMembers),
-            %% start_n_pids may return fewer than N if errors were
-            %% encountered.
-            NewPidCount = length(NewPids),
-            case NewPidCount =:= N of
-                true -> ok;
-                false ->
-                    error_logger:error_msg("pool '~s' tried to add ~B members, only added ~B~n",
-                                           [PoolName, N, NewPidCount]),
-                    %% consider changing this to a count?
-                    send_metric(Pool, events,
-                                {add_pids_failed, N, NewPidCount}, history)
-            end,
-            Pool1 = Pool#pool{free_pids = Free ++ NewPids,
-                              free_count = length(Free) + NewPidCount},
-            {ok, Pool1#pool{all_members = AllMembers1}};
+            StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
+            MRef = erlang:monitor(process, Pid),
+            Entry = {MRef, free, os:timestamp()},
+            AllMembers1 = store_all_members(Pid, Entry, AllMembers),
+            Pool#pool{free_pids = Free ++ [Pid],
+                      free_count = NumFree + 1,
+                      all_members = AllMembers1,
+                      starting_members = StartingMembers1}
+    end;
+do_accept_member({Ref, _Reason}, #pool{starting_members = StartingMembers0} = Pool) ->
+    %% member start failed, remove in-flight ref and carry on.
+    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
+                                                    ?DEFAULT_MEMBER_START_TIMEOUT),
+    StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
+    Pool#pool{starting_members = StartingMembers1}.
+
+
+-spec remove_stale_starting_members(#pool{}, [{reference(), erlang:timestamp()}],
+                                    time_spec()) -> [{reference(), erlang:timestamp()}].
+remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
+    Now = os:timestamp(),
+    MaxAgeSecs = time_as_secs(MaxAge),
+    lists:filter(fun(SM) ->
+                         starting_member_not_stale(Pool, Now, SM, MaxAgeSecs)
+                 end, StartingMembers).
+
+starting_member_not_stale(Pool, Now, {_Ref, StartTime}, MaxAgeSecs) ->
+    case secs_between(StartTime, Now) < MaxAgeSecs of
+        true ->
+            true;
         false ->
-            {max_count_reached, Pool}
+            error_logger:error_msg("pool '~s': starting member timeout", [Pool#pool.name]),
+            send_metric(Pool, starting_member_timeout, {inc, 1}, counter),
+            false
+    end.
+
+init_members_sync(N, #pool{name = PoolName} = Pool) ->
+    Self = self(),
+    StartTime = os:timestamp(),
+    StartRefs = [ {pooler_starter:start_member(Pool, Self), StartTime}
+                  || _I <- lists:seq(1, N) ],
+    Pool1 = Pool#pool{starting_members = StartRefs},
+    case collect_init_members(Pool1) of
+        timeout ->
+            error_logger:error_msg("pool '~s': exceeded timeout waiting for ~B members",
+                                   [PoolName, Pool1#pool.init_count]),
+            error({timeout, "unable to start members"});
+        #pool{} = Pool2 ->
+            {ok, Pool2}
     end.
 
--spec take_member_from_pool(#pool{}, {pid(), term()},
-                            non_neg_integer()) ->
+collect_init_members(#pool{starting_members = []} = Pool) ->
+    Pool;
+collect_init_members(#pool{} = Pool) ->
+    Timeout = time_as_millis(?DEFAULT_MEMBER_START_TIMEOUT),
+    receive
+        {accept_member, {Ref, Member}} ->
+            collect_init_members(do_accept_member({Ref, Member}, Pool))
+    after
+        Timeout ->
+            timeout
+    end.
+
+-spec take_member_from_pool(#pool{}, {pid(), term()}) ->
                                    {error_no_members | pid(), #pool{}}.
-take_member_from_pool(#pool{max_count = Max,
+take_member_from_pool(#pool{init_count = InitCount,
+                            max_count = Max,
                             free_pids = Free,
                             in_use_count = NumInUse,
                             free_count = NumFree,
-                            consumer_to_pid = CPMap} = Pool,
-                      From,
-                      Retries) ->
+                            consumer_to_pid = CPMap,
+                            starting_members = StartingMembers0} = Pool,
+                      From) ->
     send_metric(Pool, take_rate, 1, meter),
+    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
+                                                    ?DEFAULT_MEMBER_START_TIMEOUT),
+    NumCanAdd = Max - (NumInUse + NumFree + length(StartingMembers)),
     case Free of
-        [] when NumInUse =:= Max ->
+        [] when NumCanAdd =< 0  ->
             send_metric(Pool, error_no_members_count, {inc, 1}, counter),
             send_metric(Pool, events, error_no_members, history),
             {error_no_members, Pool};
-        [] when NumInUse < Max andalso Retries > 0 ->
-            case add_pids(1, Pool) of
-                {ok, Pool1} ->
-                    %% add_pids may have updated our pool
-                    take_member_from_pool(Pool1, From, Retries - 1);
-                {max_count_reached, _} ->
-                    send_metric(Pool, error_no_members_count, {inc, 1}, counter),
-                    send_metric(Pool, events, error_no_members, history),
-                    {error_no_members, Pool}
-            end;
-        [] when Retries =:= 0 ->
-            %% max retries reached
+        [] when NumCanAdd > 0 ->
+            %% Limit the number of in-flight member starts to
+            %% init_count. Because starts are async, a take request
+            %% that finds no free member always gets error_no_members
+            %% immediately, while up to init_count new members start
+            %% in the background. The pool thus reaches a steady
+            %% state, with unused members culled over time (if a
+            %% scheduled cull is enabled).
+            NumToAdd = min(InitCount - length(StartingMembers), NumCanAdd),
+            Pool1 = add_members_async(NumToAdd, Pool),
             send_metric(Pool, error_no_members_count, {inc, 1}, counter),
             send_metric(Pool, events, error_no_members, history),
-            {error_no_members, Pool};
+            {error_no_members, Pool1};
         [Pid|Rest] ->
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
                               free_count = NumFree - 1},
@@ -320,6 +384,15 @@ take_member_from_pool(#pool{max_count = Max,
                    }}
     end.
 
+%% @doc Add `Count' members to `Pool' asynchronously. Returns updated
+%% `Pool' record with starting member refs added to field
+%% `starting_members'.
+add_members_async(Count, #pool{starting_members = StartingMembers} = Pool) ->
+    StartTime = os:timestamp(),
+    StartRefs = [ {pooler_starter:start_member(Pool), StartTime}
+                  || _I <- lists:seq(1, Count) ],
+    Pool#pool{starting_members = StartRefs ++ StartingMembers}.
+
 -spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
 do_return_member(Pid, ok, #pool{all_members = AllMembers} = Pool) ->
     clean_group_table(Pid, Pool),
@@ -343,16 +416,7 @@ do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
     case dict:find(Pid, AllMembers) of
         {ok, {_MRef, _, _}} ->
             Pool1 = remove_pid(Pid, Pool),
-            case add_pids(1, Pool1) of
-                {Status, Pool2} when Status =:= ok;
-                                     Status =:= max_count_reached ->
-                    Pool2;
-                {Status, _} ->
-                    erlang:error({error, "unexpected return from add_pid",
-                                  Status, erlang:get_stacktrace()}),
-                    send_metric(Pool1, events, bad_return_from_add_pid,
-                                history)
-            end;
+            add_members_async(1, Pool1);
         error ->
             Pool
     end.
@@ -424,33 +488,6 @@ remove_pid(Pid, Pool) ->
             Pool
     end.
 
--spec start_n_pids(non_neg_integer(), pid(), dict()) -> {dict(), [pid()]}.
-start_n_pids(N, PoolSup, AllMembers) ->
-    NewPidsWith = do_n(N, fun(Acc) ->
-                                  case supervisor:start_child(PoolSup, []) of
-                                      {ok, Pid} ->
-                                          MRef = erlang:monitor(process, Pid),
-                                          [{MRef, Pid} | Acc];
-                                      _Else ->
-                                          Acc
-                                  end
-                          end, []),
-    AllMembers1 = lists:foldl(
-                    fun({MRef, Pid}, Dict) ->
-                            Entry = {MRef, free, os:timestamp()},
-                            store_all_members(Pid, Entry, Dict)
-                    end, AllMembers, NewPidsWith),
-    NewPids = [ Pid || {_MRef, Pid} <- NewPidsWith ],
-    {AllMembers1, NewPids}.
-
-do_n(0, _Fun, Acc) ->
-    Acc;
-do_n(N, Fun, Acc) ->
-    do_n(N - 1, Fun, Fun(Acc)).
-
-pool_add_retries(#pool{add_member_retry = Retries}) ->
-    Retries.
-
 -spec store_all_members(pid(),
                         {reference(), free | pid(), {_, _, _}}, dict()) -> dict().
 store_all_members(Pid, Val = {_MRef, _CPid, _Time}, AllMembers) ->
@@ -531,7 +568,7 @@ expired_free_members(Members, Now, MaxAge) ->
                   Label :: atom(),
                   Value :: metric_value(),
                   Type  :: metric_type()) -> ok.
-send_metric(#pool{metrics_mod = undefined}, _Label, _Value, _Type) ->
+send_metric(#pool{metrics_mod = pooler_no_metrics}, _Label, _Value, _Type) ->
     ok;
 send_metric(#pool{name = PoolName, metrics_mod = MetricsMod}, Label, Value, Type) ->
     MetricName = pool_metric(PoolName, Label),
@@ -543,6 +580,10 @@ pool_metric(PoolName, Metric) ->
     iolist_to_binary([<<"pooler.">>, atom_to_binary(PoolName, utf8),
                       ".", atom_to_binary(Metric, utf8)]).
 
+-spec time_as_secs(time_spec()) -> non_neg_integer().
+time_as_secs({Time, Unit}) ->
+    time_as_micros({Time, Unit}) div 1000000.
+
 -spec time_as_millis(time_spec()) -> non_neg_integer().
 %% @doc Convert time unit into milliseconds.
 time_as_millis({Time, Unit}) ->
@@ -558,3 +599,6 @@ time_as_micros({Time, ms}) ->
     1000 * Time;
 time_as_micros({Time, mu}) ->
     Time.
+
+secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) ->
+    (Mega2 - Mega1) * 1000000 + (Secs2 - Secs1).
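
The net effect in this file: member creation no longer blocks the pool's gen_server loop. A take that finds no free member returns `error_no_members' immediately and schedules up to init_count async starts, which are folded in later via `accept_member/2'. Callers that relied on the old synchronous add-with-retries behavior now need to poll; a hypothetical caller-side helper (not part of the API), in the spirit of get_n_pids/3 in the test suite:

    %% Hypothetical retry wrapper: poll until a member is available
    %% or the retry budget is exhausted. The sleep interval is arbitrary.
    take_with_retry(_Pool, 0) ->
        error_no_members;
    take_with_retry(Pool, Retries) ->
        case pooler:take_member(Pool) of
            error_no_members ->
                timer:sleep(50),
                take_with_retry(Pool, Retries - 1);
            Pid when is_pid(Pid) ->
                Pid
        end.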

+ 11 - 1
src/pooler.hrl

@@ -1,7 +1,7 @@
 -define(DEFAULT_ADD_RETRY, 1).
 -define(DEFAULT_CULL_INTERVAL, {0, min}).
 -define(DEFAULT_MAX_AGE, {0, min}).
-
+-define(DEFAULT_MEMBER_START_TIMEOUT, {1, min}).
 -define(POOLER_GROUP_TABLE, pooler_group_table).
 
 -type member_info() :: {string(), free | pid(), {_, _, _}}.
@@ -36,6 +36,10 @@
           %% The supervisor used to start new members
           member_sup :: atom() | pid(),
 
+          %% The supervisor used to start starter servers that start
+          %% new members. This is what enables async member starts.
+          starter_sup :: atom() | pid(),
+
           %% Maps member pid to a tuple of the form:
           %% {MonitorRef, Status, Time},
          %% where MonitorRef is a monitor reference for the member,
@@ -50,6 +54,12 @@
           %% members being consumed.
           consumer_to_pid = dict:new() :: dict(),
 
+          %% A list of `{Reference, Timestamp}' tuples representing
+          %% new member start requests that are in-flight. The
+          %% timestamp records when the start request was initiated
+          %% and is used to implement start timeout.
+          starting_members = [] :: [{reference(), erlang:timestamp()}],
+
           %% The module to use for collecting metrics. If set to
           %% 'pooler_no_metrics', then metric sending calls do
           %% nothing. A typical value to actually capture metrics is
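
`?DEFAULT_MEMBER_START_TIMEOUT' is a time_spec() like the existing cull settings. A sketch of the conversions behind the stale-start check; the min and sec clauses are assumptions, since the pooler.erl hunk above only shows the ms and mu clauses of time_as_micros/1:

    %% Assumed clauses (not shown in the hunk above):
    %% time_as_micros({Time, min}) -> 60 * 1000 * 1000 * Time;
    %% time_as_micros({Time, sec}) -> 1000 * 1000 * Time;
    %% With those, the default start timeout works out to:
    %%   time_as_millis({1, min}) =:= 60000
    %%   time_as_secs({1, min})   =:= 60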

+ 127 - 0
src/pooler_starter.erl

@@ -0,0 +1,127 @@
+%% @author Seth Falcon <seth@userprimary.net>
+%% @copyright 2012 Seth Falcon
+%% @doc Helper gen_server to start pool members
+%%
+-module(pooler_starter).
+-behaviour(gen_server).
+
+-include("pooler.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([start_link/3,
+         start_member/1,
+         start_member/2,
+         stop/1]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+
+%% To help with testing internal functions
+-ifdef(TEST).
+-compile([export_all]).
+-endif.
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+start_link(Pool, Ref, Parent) ->
+    gen_server:start_link(?MODULE, {Pool, Ref, Parent}, []).
+
+stop(Starter) ->
+    gen_server:call(Starter, stop).
+
+%% @doc Start a member for the specified `Pool'.
+%%
+%% Member creation with this call is async. This function returns
+%% immediately with a reference. When the member has been created it
+%% is sent to the specified pool via {@link pooler:accept_member/2}.
+%%
+%% Each call starts a single-use `pooler_starter' instance via
+%% `pooler_starter_sup'. The instance terminates normally after
+%% creating a single member.
+-spec start_member(#pool{}) -> reference().
+start_member(Pool) ->
+    Ref = make_ref(),
+    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, pool),
+    Ref.
+
+%% @doc Same as {@link start_member/1} except that instead of calling
+%% {@link pooler:accept_member/2} a raw message is sent to `Parent' of
+%% the form `{accept_member, {Ref, Member}}', where `Member' is
+%% either the member pid or an error term and `Ref' is the
+%% reference returned from this function.
+%%
+%% This is used by the init function in `pooler' to start the
+%% initial set of pool members in parallel.
+start_member(Pool, Parent) ->
+    Ref = make_ref(),
+    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, Parent),
+    Ref.
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+-record(starter, {pool,
+                  ref,
+                  parent}).
+
+-spec init({#pool{}, reference(), pid() | atom()}) -> {'ok', #starter{}, 0}.
+init({Pool, Ref, Parent}) ->
+    %% trigger immediate timeout message, which we'll use to trigger
+    %% the member start.
+    {ok, #starter{pool = Pool, ref = Ref, parent = Parent}, 0}.
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stop_ok, State};
+handle_call(_Request, _From, State) ->
+    {noreply, State}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+-spec handle_info(_, _) -> {'noreply', _}.
+handle_info(timeout,
+            #starter{pool = Pool, ref = Ref, parent = Parent} = State) ->
+    ok = do_start_member(Pool, Ref, Parent),
+    {stop, normal, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+-spec terminate(_, _) -> 'ok'.
+terminate(_Reason, _State) ->
+    ok.
+
+-spec code_change(_, _, _) -> {'ok', _}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+do_start_member(#pool{name = PoolName, member_sup = PoolSup}, Ref, Parent) ->
+    Msg = case supervisor:start_child(PoolSup, []) of
+              {ok, Pid} ->
+                  {Ref, Pid};
+              Error ->
+                  error_logger:error_msg("pool '~s' failed to start member: ~p",
+                                         [PoolName, Error]),
+                  {Ref, Error}
+          end,
+    send_accept_member(Parent, PoolName, Msg),
+    ok.
+
+send_accept_member(pool, PoolName, Msg) ->
+    pooler:accept_member(PoolName, Msg);
+send_accept_member(Pid, _PoolName, Msg) ->
+    Pid ! {accept_member, Msg},
+    ok.
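
Taken together, the starter is a fire-and-forget worker: it starts one member, reports once, and terminates. A sketch of the pid-parent variant, mirroring what collect_init_members/1 in pooler does during init:

    %% Sketch: request one member and wait for the starter's report.
    Ref = pooler_starter:start_member(Pool, self()),
    receive
        {accept_member, {Ref, Pid}} when is_pid(Pid) ->
            {ok, Pid};
        {accept_member, {Ref, Error}} ->
            {error, Error}
    after 60000 ->  %% same magnitude as ?DEFAULT_MEMBER_START_TIMEOUT
        timeout
    end.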

+ 26 - 0
src/pooler_starter_sup.erl

@@ -0,0 +1,26 @@
+%% @doc Simple one for one supervisor for pooler_starter.
+%%
+%% This supervisor is shared by all pools since pooler_starter is a
+%% generic helper to facilitate async member start.
+-module(pooler_starter_sup).
+
+-behaviour(supervisor).
+
+-export([new_starter/3,
+         start_link/0,
+         init/1]).
+
+-include("pooler.hrl").
+
+new_starter(Pool, Ref, Parent) ->
+    supervisor:start_child(?MODULE, [Pool, Ref, Parent]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    Worker = {pooler_starter, {pooler_starter, start_link, []},
+              temporary, brutal_kill, worker, [pooler_starter]},
+    Specs = [Worker],
+    Restart = {simple_one_for_one, 1, 1},
+    {ok, {Restart, Specs}}.
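
Because the child spec is simple_one_for_one with an empty argument list, the arguments given to supervisor:start_child/2 are appended to it, so:

    %% new_starter(Pool, Ref, Parent) is equivalent to having the
    %% supervisor call:
    pooler_starter:start_link(Pool, Ref, Parent).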

+ 5 - 1
src/pooler_sup.erl

@@ -16,7 +16,11 @@ init([]) ->
     Pools = [ pooler_config:list_to_pool([MetricsConfig | L]) || L <- Config ],
     PoolSupSpecs = [ pool_sup_spec(Pool) || Pool <- Pools ],
     ets:new(?POOLER_GROUP_TABLE, [set, public, named_table, {write_concurrency, true}]),
-    {ok, {{one_for_one, 5, 60}, PoolSupSpecs}}.
+    {ok, {{one_for_one, 5, 60}, [starter_sup_spec() | PoolSupSpecs]}}.
+
+starter_sup_spec() ->
+    {pooler_starter_sup, {pooler_starter_sup, start_link, []},
+     transient, 5000, supervisor, [pooler_starter_sup]}.
 
 pool_sup_spec(#pool{name = Name} = Pool) ->
     SupName = pool_sup_name(Name),
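
Prepending starter_sup_spec() to the child list means the shared starter supervisor is running before any pool supervisor starts. The resulting tree, sketched for the two pools in the example config:

    %% pooler_sup (one_for_one)
    %% |-- pooler_starter_sup (simple_one_for_one, shared by all pools)
    %% |-- pool supervisor for pool1
    %% `-- pool supervisor for pool2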

+ 4 - 1
test/pooler_perf_test.erl

@@ -84,7 +84,7 @@ pooler_take_return_test_() ->
     {foreach,
      % setup
      fun() ->
-             InitCount = 10,
+             InitCount = 100,
              MaxCount = 100,
              NumPools = 5,
              error_logger:delete_report_handler(error_logger_tty_h),
@@ -114,6 +114,9 @@ pooler_take_return_test_() ->
                    lists:foldr(fun({_, L}, {O, F}) ->
                                        {O + ?gv(ok, L), F + ?gv(fail, L)}
                                end, {0, 0}, Res),
+               %% With async member start, some takes would fail when
+               %% init count is less than max count; the setup above
+               %% uses equal counts, so no failures are expected.
                ?assertEqual(0, NumFail),
                ?assertEqual(100*100, NumOk)
        end}

+ 61 - 14
test/pooler_tests.erl

@@ -153,9 +153,14 @@ pooler_basics_test_() ->
                ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
        end},
 
-      {"pids are created on demand until max",
+      {"members creation is triggered after pool exhaustion until max",
        fun() ->
-               Pids = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
+               %% init count is 2
+               Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
+               %% Since new member creation is async, we can only assert
+               %% that a pid eventually arrives, not on which try.
+               Pids = get_n_pids(1, Pids0),
+               %% pool is at max now, requests should give error
                ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
@@ -178,8 +183,7 @@ pooler_basics_test_() ->
 
       {"if an in-use pid crashes it is replaced",
        fun() ->
-               Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1),
-                        pooler:take_member(test_pool_1)],
+               Pids0 = get_n_pids(3, []),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % crash them all
                [ pooled_gs:crash(P) || P <- Pids0 ],
@@ -199,7 +203,7 @@ pooler_basics_test_() ->
 
       {"if a pid is returned with bad status it is replaced",
        fun() ->
-               Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
+               Pids0 = get_n_pids(3, []),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % return them all marking as bad
                [ pooler:return_member(test_pool_1, P, fail) || P <- Pids0 ],
@@ -262,6 +266,13 @@ pooler_basics_test_() ->
                Metrics = fake_metrics:get_metrics(),
                GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
                ?assertEqual(ExpectKeys, GotKeys)
+       end},
+
+      {"accept bad member is handled",
+       fun() ->
+               Bad = spawn(fun() -> ok end),
+               Ref = erlang:make_ref(),
+               ?assertEqual(ok, pooler:accept_member(test_pool_1, {Ref, Bad}))
        end}
      ]}}.
 
@@ -329,6 +340,18 @@ pooler_groups_test_() ->
                             pooler:take_group_member(not_a_group))
        end},
 
+      {"return member to unknown group",
+       fun() ->
+               Pid = pooler:take_group_member(group_1),
+               ?assertEqual(ok, pooler:return_group_member(no_such_group, Pid))
+       end},
+
+      {"return member to wrong group",
+       fun() ->
+               Pid = pooler:take_member(test_pool_3),
+               ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
+       end},
+
       {"return member to group, implied ok",
        fun() ->
                Pid = pooler:take_group_member(group_1),
@@ -343,7 +366,7 @@ pooler_groups_test_() ->
 
       {"exhaust pools in group",
        fun() ->
-               Pids = [ pooler:take_group_member(group_1) || _I <- lists:seq(1, 6) ],
+               Pids = get_n_pids_group(group_1, 6, []),
                %% they should all be pids
                [ begin
                      {Type, _} = pooled_gs:get_id(P),
@@ -401,7 +424,7 @@ pooler_scheduled_cull_test_() ->
      [{"excess members are culled repeatedly",
        fun() ->
                %% take all members
-               Pids1 = [ pooler:take_member(test_pool_1) || _X <- lists:seq(1, 10) ],
+               Pids1 = get_n_pids(test_pool_1, 10, []),
                %% return all
                [ pooler:return_member(test_pool_1, P) || P <- Pids1 ],
                ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
@@ -410,7 +433,7 @@ pooler_scheduled_cull_test_() ->
                ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
 
                %% repeat the test to verify that culling gets rescheduled.
-               Pids2 = [ pooler:take_member(test_pool_1) || _X <- lists:seq(1, 10) ],
+               Pids2 = get_n_pids(test_pool_1, 10, []),
                %% return all
                [ pooler:return_member(test_pool_1, P) || P <- Pids2 ],
                ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
@@ -433,7 +456,7 @@ pooler_scheduled_cull_test_() ->
       {"in-use members are not culled",
        fun() ->
                %% take all members
-               Pids = [ pooler:take_member(test_pool_1) || _X <- lists:seq(1, 10) ],
+               Pids = get_n_pids(test_pool_1, 10, []),
                %% don't return any
                ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
                %% wait for longer than cull delay
@@ -466,9 +489,20 @@ random_message_test_() ->
      end,
     [
      fun() ->
+             Pid = spawn(fun() -> ok end),
+             MonMsg = {'DOWN', erlang:make_ref(), process, Pid, because},
+             test_pool_1 ! MonMsg
+     end,
+
+     fun() ->
              Pid = pooler:take_member(test_pool_1),
              {Type, _} =  pooled_gs:get_id(Pid),
              ?assertEqual("type-0", Type)
+     end,
+
+     fun() ->
+             RawPool = gen_server:call(test_pool_1, dump_pool),
+             ?assertEqual(pool, element(1, RawPool))
      end
     ]}.
 
@@ -553,12 +587,25 @@ time_as_micros_test_() ->
 % Testing crash recovery is racy: pids may not have crashed yet, or
 % pooler may not have recovered. So this helper loops forever until
 % N pids are obtained, ignoring error_no_members.
-get_n_pids(0, Acc) ->
-    Acc;
 get_n_pids(N, Acc) ->
-    case pooler:take_member(test_pool_1) of
+    get_n_pids(test_pool_1, N, Acc).
+
+get_n_pids(_Pool, 0, Acc) ->
+    Acc;
+get_n_pids(Pool, N, Acc) ->
+    case pooler:take_member(Pool) of
+        error_no_members ->
+            get_n_pids(Pool, N, Acc);
+        Pid ->
+            get_n_pids(Pool, N - 1, [Pid|Acc])
+    end.
+
+get_n_pids_group(_Group, 0, Acc) ->
+    Acc;
+get_n_pids_group(Group, N, Acc) ->
+    case pooler:take_group_member(Group) of
         error_no_members ->
-            get_n_pids(N, Acc);
+            get_n_pids_group(Group, N, Acc);
         Pid ->
-            get_n_pids(N - 1, [Pid|Acc])
+            get_n_pids_group(Group, N - 1, [Pid|Acc])
     end.