
Merge branch 'pooler-1-0'

Seth Falcon 12 years ago
commit 4596477ae5

+ 3 - 0
.gitignore

@@ -5,3 +5,6 @@ doc/*.html
 /doc/stylesheet.css
 /doc/erlang.png
 /doc/edoc-info
+/bench/tests
+/bench/deps
+

+ 3 - 0
Makefile

@@ -20,3 +20,6 @@ clean:
 
 distclean: clean
 	@rebar delete-deps
+
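+# Start a local shell with pooler running and the example pools from
+# pooler-example.config; handy for exploring the API by hand.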
+demo_shell: compile test
+	@erl -pa .eunit ebin -config pooler-example -s pooler manual_start

+ 48 - 0
NEWS.org

@@ -0,0 +1,48 @@
+* 1.0.0
+** Breaking Changes (upgrading from 0.0.x)
+*** pooler application config format changes
+Pool names in config must be atoms, not strings.
+*** API changes
+1. The function =pooler:take_member/0= has been removed.
+2. Pool names are now atoms, not strings. An atom matching a
+   configured pool name is expected by =pooler:take_member/1=.
+3. For load balancing a collection of related pools, you must use the
+   new group API functions: =pooler:take_group_member/1= and
+   =pooler:return_group_member/2=. A group attribute can be specified
+   as optional config for each pool. Pools with the same group name
+   form a group.
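+   A minimal migration sketch (=my_pool= is an illustrative pool
+   name):
+   #+BEGIN_SRC erlang
+   %% 0.0.x: P = pooler:take_member(), pooler:return_member(P, ok).
+   %% 1.0.0:
+   P = pooler:take_member(my_pool),
+   pooler:return_member(my_pool, P, ok).
+   #+END_SRC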
+** What's New
+*** Improved support for multiple independent pools
+Each pool is now serviced by its own =gen_server= with an independent
+supervision tree. This makes pooler a good fit when you need to pool
+different unrelated clients, for example Redis and Riak. Independent
+pools will not contend for the same server mailbox, as was the case
+in version 0.0.x, and the supervision structure should isolate
+failures such that a high crash rate in one pool does not take down
+an unrelated pool.
+*** Asynchronous and parallelized member creation
+Members are started and added to pools asynchronously. This is a major
+improvement when pooling members with substantial startup
+time. Instead of the entire pool being blocked while a new member is
+started, the pool can continue to process messages.
+
+When a pool is initialized, all =init_count= members are started in
+parallel. The pool does not start processing messages until all
+initial members have been added. This reduces the overall
+time-to-start for pooler compared to version 0.0.x where
+initialization of members was handled serially.
+
+Once running, new members are added in batches of size =init_count=
+up to =max_count=. A batch is added after the pool returns a single
+=error_no_members= value, so a pool will always return at least one
+=error_no_members= when growing beyond its =init_count= size. For
+example, with an =init_count= of 2 and a =max_count= of 5, a take
+request arriving while both members are in use returns
+=error_no_members= and triggers the start of two more members. This
+approach has the benefit of not penalizing a steady load of
+=init_count= members in use: if member addition were triggered before
+=init_count= members were in use, members would be added to the pool,
+never used, and culled after the configured timeout.
+*** The pooler server uses monitors, not links
+In pooler 0.0.x, =pooler= was a system process that trapped exits and
+linked to members and consumers. Monitors are now used instead to
+reduce the potential impact of a pooler related crash and to simplify
+the code.

+ 99 - 49
README.org

@@ -11,8 +11,8 @@ with exclusive access to pool members using =pooler:take_member=.
 
 *** Protects the members of a pool from being used concurrently
 
-The main pooler interface is =pooler:take_member/0= and
-=pooler:return_member/2=.  The pooler server will keep track of which
+The main pooler interface is =pooler:take_member/1= and
+=pooler:return_member/3=.  The pooler server will keep track of which
 members are *in use* and which are *free*.  There is no need to call
 =pooler:return_member= if the consumer is a short-lived process; in
 this case, pooler will detect the consumer's normal exit and reclaim
@@ -25,25 +25,26 @@ out the member pid to another worker process.
 
 You specify an initial and a maximum number of members in the pool.
 Pooler will create new members on demand until the maximum member
-count is reached.  New pool members are added to replace member that
+count is reached.  New pool members are added to replace members that
 crash.  If a consumer crashes, the member it was using will be
 destroyed and replaced.  You can configure Pooler to periodically
-check for and remove members that have not been used recently using to
+check for and remove members that have not been used recently to
 reduce the member count back to its initial size.
 
 *** Manage multiple pools
 
-A common configuration is to have each pool contain client processes
-connected to a particular node in a cluster (think database read
-slaves).  Pooler will randomly select a pool to fetch a member from.
-If the randomly selected pool has no free members, pooler will select
-a member from the pool with the most free members.  If there is no
-pool with available members, pooler will return =error_no_members=.
-
-You can ask for a member from a specified pool using
-=pooler:take_member/1=. If ensure your code always asks for members by
-pool name, you can use pooler to pool clients for different backend
-services.
+You can use pooler to manage multiple independent pools and multiple
+grouped pools. Independent pools allow you to pool clients for
+different backend services (e.g. PostgreSQL and Redis). Grouped pools
+can optionally be accessed using =pooler:take_group_member/1= to
+provide load balancing of the pools in the group. A typical use of
+grouped pools is to have each pool contain clients connected to a
+particular node in a cluster (think database read slaves).  Pooler's
+=take_group_member= function will randomly select a pool in the group
+to fetch a member from.  If the randomly selected pool has no free
+members, pooler will attempt to obtain a member from each pool in the
+group.  If there is no pool with available members, pooler will return
+=error_no_members=.
 
 ** Motivation
 
@@ -70,15 +71,20 @@ continue in the face of Riak node failures, consumers should spread
 their requests across clients connected to each node.  The client pool
 provides an easy way to load balance.
 
+Since writing pooler, I've seen it used to pool database connections
+for PostgreSQL, MySQL, and Redis. These uses led to a redesign to
+better support multiple independent pools.
+
 ** Usage and API
 
-*** Pool Configuration
+*** Pool Configuration via application environment
 
 Pool configuration is specified in the pooler application's
 environment.  This can be provided in a config file using =-config= or
 set at startup using =application:set_env(pooler, pools,
-Pools)=. Here's an example config file that creates three pools of
-Riak pb clients each talking to a different node in a local cluster:
+Pools)=. Here's an example config file that creates two pools of
+Riak pb clients each talking to a different node in a local cluster
+and one pool talking to a PostgreSQL database:
 
 #+BEGIN_SRC erlang
   % pooler.config
@@ -88,23 +94,25 @@ Riak pb clients each talking to a different node in a local cluster:
   [
    {pooler, [
            {pools, [
-                    [{name, "rc8081"},
+                    [{name, rc8081},
+                     {group, riak},
                      {max_count, 5},
                      {init_count, 2},
                      {start_mfa,
                       {riakc_pb_socket, start_link, ["localhost", 8081]}}],
 
-                    [{name, "rc8082"},
+                    [{name, rc8082},
+                     {group, riak},
                      {max_count, 5},
                      {init_count, 2},
                      {start_mfa,
                       {riakc_pb_socket, start_link, ["localhost", 8082]}}],
 
-                    [{name, "rc8083"},
-                     {max_count, 5},
+                    [{name, pg_db1},
+                     {max_count, 10},
                      {init_count, 2},
                      {start_mfa,
-                      {riakc_pb_socket, start_link, ["localhost", 8083]}}]
+                      {my_pg_sql_driver, start_link, ["db_host"]}}]
                    ]}
              %% if you want to enable metrics, set this to a module with
              %% an API conformant to the folsom_metrics module.
@@ -114,10 +122,12 @@ Riak pb clients each talking to a different node in a local cluster:
   ].
 #+END_SRC
 
-Each pool has a unique name, an initial and maximum number of members,
+Each pool has a unique name (specified as an atom), an initial and maximum number of members,
 and an ={M, F, A}= describing how to start members of the pool.  When
 pooler starts, it will create members in each pool according to
-=init_count=.
+=init_count=. Optionally, you can indicate that a pool is part of a
+group. You can use pooler to load balance across pools labeled with
+the same group tag.
 
 **** Culling stale members
 
@@ -135,7 +145,7 @@ examples are valid:
 #+END_SRC
 
 The =cull_interval= determines the schedule when a check will be made
-for stale members. Checks are scheduling using =erlang:send_after/3=
+for stale members. Checks are scheduled using =erlang:send_after/3=
 which provides a light-weight timing mechanism. The next check is
 scheduled after the prior check completes.
 
@@ -148,38 +158,48 @@ stale member checking entirely. The =max_age= parameter has the same
 default value which will cause any members beyond =init_count= to be
 removed if scheduled culling is enabled.
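+
+For example, to check every minute and cull members idle for more
+than five minutes, a pool's configuration proplist could include the
+following entries (a sketch; the values are illustrative):
+
+#+BEGIN_SRC erlang
+  {cull_interval, {1, min}},
+  {max_age, {5, min}}
+#+END_SRC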
 
-**** Retry behvaior when members do not start
-
-If there are no free members, but the pool size is less than
-=max_count=, pooler will attempt to add a new member to the pool to
-satisfy a =take_member= request. By default, pooler tries a single
-time to add a new member and will return =error_no_members= if this
-fails. You can increase the number of retries by specifying a value
-for the =add_member_retry= configuration parameter.
-
+*** Pool Configuration via =pooler:new_pool=
+You can create pools using =pooler:new_pool/1=, which accepts a
+proplist of pool configuration. Here's an example:
+#+BEGIN_SRC erlang
+PoolConfig = [{name, rc8081},
+              {group, riak},
+              {max_count, 5},
+              {init_count, 2},
+              {start_mfa,
+               {riakc_pb_socket,
+                start_link, ["localhost", 8081]}}],
+pooler:new_pool(PoolConfig).
+#+END_SRC
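+Pools created with =new_pool/1= can be shut down with
+=pooler:rm_pool/1=:
+#+BEGIN_SRC erlang
+pooler:rm_pool(rc8081).
+#+END_SRC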
 *** Using pooler
 
 Here's an example session:
 
 #+BEGIN_SRC erlang
 application:start(pooler).
-P = pooler:take_member(),
+P = pooler:take_member(pg_db1),
 % use P
-pooler:return_member(P, ok).
+pooler:return_member(pg_db1, P, ok).
 #+END_SRC
 
 Once started, the main interaction you will have with pooler is
-through two functions, =take_member/0= (or =take_member/1=) and
-=return_member/2= (or =return_member/1=).
-
-Call =pooler:take_member()= to obtain a member from a randomly
-selected pool.  When you are done with it, return it to the pool using
-=pooler:return_member(Pid, ok)=.  If you encountered an error using
-the member, you can pass =fail= as the second argument.  In this case,
-pooler will permanently remove that member from the pool and start a
-new member to replace it.  If your process is short lived, you can
-omit the call to =return_member=.  In this case, pooler will detect
-the normal exit of the consumer and reclaim the member.
+through two functions, =take_member/1= and =return_member/3= (or
+=return_member/2=).
+
+Call =pooler:take_member(Pool)= to obtain the pid belonging to a
+member of the pool =Pool=.  When you are done with it, return it to
+the pool using =pooler:return_member(Pool, Pid, ok)=.  If you
+encountered an error using the member, you can pass =fail= as the
+third argument.  In this case, pooler will permanently remove that
+member from the pool and start a new member to replace it.  If your
+process is short lived, you can omit the call to =return_member=.  In
+this case, pooler will detect the normal exit of the consumer and
+reclaim the member.
+
+If you would like to obtain a member from a randomly selected pool in
+a group, call =pooler:take_group_member(Group)=. This will return a
+member pid or =error_no_members=. Return group members using
+=pooler:return_group_member/2= or =pooler:return_group_member/3=;
+pooler keeps track of the pool each group member was taken from.
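+
+A short session using the group API (assuming the =riak= group from
+the example config above):
+
+#+BEGIN_SRC erlang
+P = pooler:take_group_member(riak),
+% use P
+pooler:return_group_member(riak, P, ok).
+#+END_SRC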
 
 *** pooler as an included application
 
@@ -191,7 +211,7 @@ cause problems. One way to work around this is to specify pooler as an
 included application in your app. This means you will call pooler's
 top-level supervisor in your app's top-level supervisor and can regain
 control over the application start order. To do this, you would remove
-pooler from the list of applications in your_app.app add
+pooler from the list of applications in your_app.app and add
 it to the included_application key:
 
 #+BEGIN_SRC erlang
@@ -265,6 +285,36 @@ When enabled, the following metrics will be tracked:
    ok
    #+END_EXAMPLE
 
+** Implementation Notes
+*** Overview of supervision
+
+[[./doc/pooler-sup-tree.png]]
+
+The top-level supervisor is pooler_sup. It supervises one supervisor
+for each pool configured in pooler's app config.
+
+At startup, a pooler_NAME_pool_sup is started for each pool described in
+pooler's app config with NAME matching the name attribute of the
+config.
+
+The pooler_NAME_pool_sup starts the gen_server that will register with
+pooler_NAME_pool as well as a pooler_NAME_member_sup that will be used
+to start and supervise the members of this pool. The
+pooler_starter_sup starts temporary workers that manage asynchronous
+member starts.
+
+pooler_sup:                one_for_one
+pooler_NAME_pool_sup:      one_for_all
+pooler_NAME_member_sup:    simple_one_for_one
+pooler_starter_sup:        simple_one_for_one
+
+Groups of pools are managed using the pg2 module from the kernel
+application. This imposes a requirement to set a configuration
+parameter on the kernel application in an OTP release. For example,
+in sys.config:
+#+BEGIN_SRC erlang
+{kernel, [{start_pg2, true}]}
+#+END_SRC
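+To verify that the pools of a group have registered, you can query
+pg2 directly from a shell (=riak= being the group name from the
+example config):
+#+BEGIN_SRC erlang
+pg2:get_local_members(riak).
+#+END_SRC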
+
 ** License
 Pooler is licensed under the Apache License Version 2.0.  See the
 [[file:LICENSE][LICENSE]] file for details.

+ 95 - 0
bench/Makefile

@@ -0,0 +1,95 @@
+DEPS = $(CURDIR)/deps
+
+DIALYZER_OPTS = -Wunderspecs
+
+# List dependencies that should be included in a cached dialyzer PLT file.
+# DIALYZER_DEPS = deps/app1/ebin \
+#                 deps/app2/ebin
+
+DEPS_PLT = bench.plt
+
+ERLANG_DIALYZER_APPS = asn1 \
+                       compiler \
+                       crypto \
+                       edoc \
+                       erts \
+                       eunit \
+                       gs \
+                       hipe \
+                       inets \
+                       kernel \
+                       mnesia \
+                       observer \
+                       public_key \
+                       runtime_tools \
+                       ssl \
+                       stdlib \
+                       syntax_tools \
+                       tools \
+                       webtool \
+                       xmerl
+
+all: compile
+
+# Clean ebin and .eunit of this project
+clean:
+	@rebar clean skip_deps=true
+
+# Clean this project and all deps
+allclean:
+	@rebar clean
+
+compile: $(DEPS)
+	@rebar compile
+
+compile_skip:
+	@rebar compile skip_deps=true
+
+test: compile deps/basho_bench/basho_bench
+	@deps/basho_bench/basho_bench pooler.config
+	@deps/basho_bench/priv/summary.r -i tests/current
+
+deps/basho_bench/basho_bench:
+	@(cd deps/basho_bench;$(MAKE))
+
+$(DEPS):
+	@rebar get-deps
+
+# Full clean and removal of all deps. Remove deps first to avoid
+# wasted effort of cleaning deps before nuking them.
+distclean:
+	@rm -rf deps $(DEPS_PLT)
+	@rebar clean
+
+# Only include local PLT if we have deps that we are going to analyze
+ifeq ($(strip $(DIALYZER_DEPS)),)
+dialyzer: ~/.dialyzer_plt
+	@dialyzer $(DIALYZER_OPTS) -r ebin
+else
+dialyzer: ~/.dialyzer_plt $(DEPS_PLT)
+	@dialyzer $(DIALYZER_OPTS) --plts ~/.dialyzer_plt $(DEPS_PLT) -r ebin
+
+$(DEPS_PLT):
+	@dialyzer --build_plt $(DIALYZER_DEPS) --output_plt $(DEPS_PLT)
+endif
+
+~/.dialyzer_plt:
+	@echo "ERROR: Missing ~/.dialyzer_plt. Please wait while a new PLT is compiled."
+	dialyzer --build_plt --apps $(ERLANG_DIALYZER_APPS)
+	@echo "now try your build again"
+
+doc:
+	@rebar doc skip_deps=true
+
+shell:
+	erl -pa deps/*/ebin ebin
+
+tags:
+	find src deps -name "*.[he]rl" -print | etags -
+
+.PHONY: all compile eunit test dialyzer clean allclean distclean doc tags

+ 19 - 0
bench/README.md

@@ -0,0 +1,19 @@
+# bench - Pooler basho_bench Test Rig #
+
+Welcome to pooler's basho_bench test rig.
+
+## member ##
+
+A pool member with a configurable start-up delay and the ability to
+crash on demand.
+
+## consumer ##
+
+Configurable think time, the ability to crash on demand (either the
+consumer itself or the member it holds), and a configurable number of
+take/return ops. The driver exposes simple, fast, and slow
+operations.
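+
+## Example ##
+
+A sketch of driving a consumer by hand (assumes pooler and the bench
+supervisors are running, as set up in `pooler_driver:init_driver/0`;
+the option values are illustrative):
+
+```erlang
+{ok, C} = consumer_sup:new_consumer(),
+ok = consumer:run(C, [{consumer_crash, false},
+                      {member_crash, false},
+                      {take_cycles, 10},
+                      {think_time, 50},
+                      {pool_name, p1}]).
+```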

+ 16 - 0
bench/pooler.config

@@ -0,0 +1,16 @@
+{mode, max}.
+
+{duration, 4}.
+
+{concurrent, 5}.
+
+{driver, pooler_driver}.
+
+{key_generator, {function, pooler_driver, pool_name, []}}.
+
+{value_generator, {fixed_bin, 10000}}.
+
+% {operations, [{simple, 1}, {fast, 4}, {slow, 1}]}.
+{operations, [{simple, 1}]}.
+
+{code_paths, ["ebin", "deps/pooler/ebin"]}.

+ 14 - 0
bench/rebar.config

@@ -0,0 +1,14 @@
+%% -*- mode: erlang -*-
+%% -*- tab-width: 4;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ts=4 sw=4 ft=erlang et
+
+{deps,
+ [
+  {pooler, ".*",
+   {git, "git://github.com/seth/pooler.git", {branch, "pooler-1-0"}}},
+
+  {basho_bench, ".*",
+   {git, "git://github.com/basho/basho_bench.git", {branch, "master"}}}
+ ]}.
+
+{cover_enabled, true}.

+ 15 - 0
bench/src/bench.app.src

@@ -0,0 +1,15 @@
+%% -*- mode: erlang -*-
+{application, bench,
+ [
+  {description, "pooler basho_bench test rig"},
+  {vsn, "0.0.1"},
+  {registered, []},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+  %% uncomment if this is an active application
+  %% {mod, { bench_app, []}},
+  {env, []}
+ ]}.
+%% vim: set filetype=erlang tabstop=2

+ 6 - 0
bench/src/bench.erl

@@ -0,0 +1,6 @@
+-module(bench).
+
+-export([hello/0]).
+
+hello() ->
+    howdy.

+ 101 - 0
bench/src/consumer.erl

@@ -0,0 +1,101 @@
+%% @doc A consumer of pool members used for perf testing pooler. The
+%% consumer has a configurable think time (how long it keeps a member
+%% checked out) and a configurable number of take/return cycles;
+%% jitter is added to the think time. You can also request that the
+%% consumer crash or that it crash the member it holds.
+-module(consumer).
+-behaviour(gen_server).
+-define(SERVER, ?MODULE).
+
+-export([start_link/0,
+         run/2
+        ]).
+
+-export([
+         code_change/3,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         init/1,
+         terminate/2
+        ]).
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+start_link() ->
+    % not registered
+    gen_server:start_link(?MODULE, [], []).
+
+run(S, Config) ->
+    SelfCrash = proplists:get_value(consumer_crash, Config) =:= true,
+    MemberCrash = proplists:get_value(member_crash, Config) =:= true,
+    TakeCycles = proplists:get_value(take_cycles, Config),
+    ThinkTime = proplists:get_value(think_time, Config),
+    PoolName = proplists:get_value(pool_name, Config),
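+    %% Allow headroom for jitter: each cycle sleeps at most twice the
+    %% think time, so 3x per cycle is a safe call timeout.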
+    gen_server:call(S, {run, PoolName, SelfCrash, MemberCrash,
+                        TakeCycles, ThinkTime},
+                    ThinkTime * 3 * TakeCycles).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+-record(state, {
+          id,
+          ping_count = 0
+         }).
+
+init([]) ->
+    Now = erlang:now(),
+    random:seed(Now),
+    {ok, #state{id = Now}}.
+
+handle_call({run, PoolName, SelfCrash, MemberCrash,
+             TakeCycles, ThinkTime}, _From, State) ->
+    CrashData = crash_data(SelfCrash, MemberCrash, TakeCycles),
+    run_cycles(ThinkTime, TakeCycles, CrashData, PoolName),
+    {reply, ok, State};
+handle_call(_Request, _From, State) ->
+    %% ignore unexpected calls; {noreply, ok, State} is not a valid
+    %% gen_server return and would crash the server
+    {noreply, State}.
+
+run_cycles(_ThinkTime, 0, _, _) ->
+    done;
+run_cycles(_ThinkTime, CrashIdx, {CrashIdx, _}, _) ->
+    %% self crash
+    erlang:error({consumer, self_crash_requested});
+run_cycles(ThinkTime, CrashIdx, {_, CrashIdx} = CrashData, PoolName) ->
+    %% member crash request
+    M = pooler:take_member(PoolName),
+    member:crash(M),
+    run_cycles(ThinkTime, CrashIdx - 1, CrashData, PoolName);
+run_cycles(ThinkTime, Idx, CrashData, PoolName) ->
+    M = pooler:take_member(PoolName),
+    Think = ThinkTime + random:uniform(ThinkTime),
+    timer:sleep(Think),
+    pooler:return_member(PoolName, M),
+    run_cycles(ThinkTime, Idx - 1, CrashData, PoolName).
+
+%% only support a single crash type. So if self crash is requested,
+%% we'll never crash the member.
+crash_data(false, false, _) ->
+    {never, never};
+crash_data(true, _, TakeCycles) ->
+    {random:uniform(TakeCycles), never};
+crash_data(false, true, TakeCycles) ->
+    {never, random:uniform(TakeCycles)}.
+
+handle_cast(crash, _State) ->
+    erlang:error({consumer, requested_crash});
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+

+ 23 - 0
bench/src/consumer_sup.erl

@@ -0,0 +1,23 @@
+-module(consumer_sup).
+
+-behaviour(supervisor).
+
+-export([
+         init/1,
+         new_consumer/0,
+         start_link/0
+        ]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(Args) ->
+    Worker = {consumer, {consumer, start_link, Args},
+              temporary,                        % never restart workers
+              brutal_kill, worker, [consumer]},
+    Specs = [Worker],
+    Restart = {simple_one_for_one, 1, 1},
+    {ok, {Restart, Specs}}.
+
+new_consumer() ->
+    supervisor:start_child(?MODULE, []).

+ 96 - 0
bench/src/member.erl

@@ -0,0 +1,96 @@
+%% @doc A pool member used for perf testing pooler. The member has a
+%% configurable start-up delay. You set a delay value and the actual
+%% start delay will be `delay + random:uniform(delay)'. The module
+%% supports a crash function to make the member crash.
+-module(member).
+-behaviour(gen_server).
+-define(SERVER, ?MODULE).
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([start_link/1,
+         ping/1,
+         ping_count/1,
+         crash/1,
+         stop/1
+        ]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+start_link(Config) ->
+    % not registered
+    gen_server:start_link(?MODULE, Config, []).
+
+ping(S) ->
+    gen_server:call(S, ping).
+
+ping_count(S) ->
+    gen_server:call(S, ping_count).
+
+crash(S) ->
+    gen_server:cast(S, crash),
+    sent_crash_request.
+
+stop(S) ->
+    gen_server:call(S, stop).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+-record(state, {
+          id,
+          ping_count = 0
+         }).
+
+init(Config) ->
+    start_up_delay(Config),
+    {ok, #state{id = make_ref()}}.
+
+%% pause server init based on start_up_delay config plus jitter (of up
+%% to 2x delay)
+start_up_delay(Config) ->
+    case proplists:get_value(start_up_delay, Config) of
+        T when is_integer(T) ->
+            random:seed(erlang:now()),
+            J = random:uniform(T),
+            timer:sleep(T + J),
+            ok;
+        _ ->
+            ok
+    end.
+
+handle_call(ping, _From, #state{ping_count = C } = State) ->
+    State1 = State#state{ping_count = C + 1},
+    {reply, pong, State1};
+handle_call(ping_count, _From, #state{ping_count = C } = State) ->
+    {reply, C, State};
+handle_call(stop, _From, State) ->
+    {stop, normal, stop_ok, State};
+handle_call(_Request, _From, State) ->
+    %% ignore unexpected calls rather than returning an invalid tuple
+    {noreply, State}.
+
+handle_cast(crash, _State) ->
+    erlang:error({member, requested_crash});
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+

+ 24 - 0
bench/src/member_sup.erl

@@ -0,0 +1,24 @@
+-module(member_sup).
+
+-behaviour(supervisor).
+
+-export([
+         init/1,
+         new_member/1,
+         start_link/0
+        ]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(Args) ->
+    Worker = {member, {member, start_link, Args},
+              temporary,                        % never restart workers
+              brutal_kill, worker, [member]},
+    Specs = [Worker],
+    Restart = {simple_one_for_one, 1, 1},
+    {ok, {Restart, Specs}}.
+
+new_member(Delay) ->
+    Config = [{start_up_delay, Delay}],
+    supervisor:start_child(?MODULE, [Config]).

+ 73 - 0
bench/src/pooler_driver.erl

@@ -0,0 +1,73 @@
+%% @doc basho_bench driver for pooler testing
+-module(pooler_driver).
+
+-export([
+         new/1,
+         pool_name/1,
+         run/4
+         ]).
+
+
+-record(state, {
+          %% integer id received from new/1
+          id = 0,
+
+          %% pid of consumer worker process
+          consumer = undefined
+         }).
+
+new(ID) ->
+    %% NB: new/1 runs once per concurrent basho_bench worker, so this
+    %% is called more than once; the already_started errors from the
+    %% repeat runs are ignored.
+    init_driver(),
+    {ok, Consumer} = consumer_sup:new_consumer(),
+    {ok, #state{id = ID, consumer = Consumer}}.
+
+%% KeyGen can be a function that returns a pool name atom.
+run(simple, PoolNameFun, _ValueGen, #state{consumer = _C} = State) ->
+    PoolName = PoolNameFun(),
+    case pooler:take_member(PoolName) of
+        error_no_members ->
+            {error, error_no_members, State};
+        Pid ->
+            pooler:return_member(PoolName, Pid),
+            {ok, State}
+    end;
+run(fast, PoolNameFun, _ValueGen, #state{consumer = C} = State) ->
+    PoolName = PoolNameFun(),
+    ConsumerOpts = [{consumer_crash, false},
+                    {member_crash, false},
+                    {take_cycles, 1},
+                    {think_time, 10},
+                    {pool_name, PoolName}
+                   ],
+    consumer:run(C, ConsumerOpts),
+    {ok, State};
+run(slow, PoolNameFun, _ValueGen, #state{consumer = C} = State) ->
+    PoolName = PoolNameFun(),
+    ConsumerOpts = [{consumer_crash, false},
+                    {member_crash, false},
+                    {take_cycles, 1},
+                    {think_time, 200},
+                    {pool_name, PoolName}
+                   ],
+    consumer:run(C, ConsumerOpts),
+    {ok, State}.
+
+
+
+%% gets called as the PoolNameFun aka key_generator via basho_bench config
+pool_name(_Id) ->
+    fun() -> p1 end.
+
+init_driver() ->
+    consumer_sup:start_link(),
+    member_sup:start_link(),
+    application:start(pooler),
+    Delay = 1000,
+    PoolConfig = [{name, p1},
+                  {max_count, 5},
+                  {init_count, 2},
+                  {start_mfa,
+                   {member_sup, new_member, [Delay]}}],
+    pooler:new_pool(PoolConfig),
+    ok.

+ 1 - 62
doc/overview.edoc

@@ -1,5 +1,5 @@
 @author Seth Falcon <seth@userprimary.net>
-@copyright 2011 Seth Falcon
+@copyright 2011-2013 Seth Falcon
 @title pooler - An OTP Process Pool Application
 @doc 
 The pooler application allows you to manage pools of OTP behaviors
@@ -8,64 +8,3 @@ with exclusive access to pool members using pooler:take_member.
 
 See the README.org file for a good introduction to what pooler is all
 about.
-
-== Pooler Configuration ==
-
-Pool configuration is specified in the pooler application's
-environment.  This can be provided in a config file using `-config' or
-set at startup using `application:set_env(pooler, pools, Pools)'.
-Here's an example config file that creates three pools of
-Riak pb clients each talking to a different node in a local cluster:
-
-```
-% pooler.config
-% Start Erlang as: erl -config pooler
-% -*- mode: erlang -*-
-% pooler app config
-[
- {pooler, [
-         {pools, [
-                  [{name, "rc8081"},
-                   {max_count, 5},
-                   {init_count, 2},
-                   {start_mfa,
-                    {riakc_pb_socket, start_link, ["localhost", 8081]}}],
-
-                  [{name, "rc8082"},
-                   {max_count, 5},
-                   {init_count, 2},
-                   {start_mfa,
-                    {riakc_pb_socket, start_link, ["localhost", 8082]}}],
-
-                  [{name, "rc8083"},
-                   {max_count, 5},
-                   {init_count, 2},
-                   {start_mfa,
-                    {riakc_pb_socket, start_link, ["localhost", 8083]}}]
-                 ]}
-        ]}
-].
-'''
-
-== Using pooler ==
-
-Here's an example session:
-
-```
-application:start(pooler).
-P = pooler:take_member(),
-% use P
-pooler:return_member(P, ok).
-'''
-
-Once started, the main interaction you will have with pooler is through
-two functions, `take_member/0' and `return_member/2'.
-
-Call `pooler:take_member()' to obtain a member from a randomly
-selected pool.  When you are done with it, return it to the pool using
-`pooler:return_member(Pid, ok)'.  If you encountered an error using
-the member, you can pass `fail' as the second argument.  In this case,
-pooler will permanently remove that member from the pool and start a
-new member to replace it.  If your process is short lived, you can
-omit the call to `return_member'.  In this case, pooler will detect
-the normal exit of the consumer and reclaim the member.

BIN
doc/pooler-sup-tree.png


+ 2 - 2
pooler.config.example → pooler-example.config

@@ -3,13 +3,13 @@
 [
  {pooler, [
            {pools, [
-                    [{name, "pool1"},
+                    [{name, pool1},
                      {max_count, 5},
                      {init_count, 2},
                      {start_mfa,
                       {pooled_gs, start_link, [{"p1"}]}}],
                     
-                    [{name, "pool2"},
+                    [{name, pool2},
                      {max_count, 5},
                      {init_count, 2},
                      {start_mfa,

+ 1 - 2
src/pooler.app.src

@@ -5,8 +5,7 @@
   {registered, []},
   {applications, [
                   kernel,
-                  stdlib,
-                  crypto
+                  stdlib
                  ]},
   {mod, { pooler_app, []}},
   {env, []}

+ 409 - 381
src/pooler.erl

@@ -1,5 +1,5 @@
 %% @author Seth Falcon <seth@userprimary.net>
-%% @copyright 2011-2012 Seth Falcon
+%% @copyright 2011-2013 Seth Falcon
 %% @doc This is the main interface to the pooler application
 %%
 %% To integrate with your application, you probably want to call
@@ -10,21 +10,12 @@
 %%
 -module(pooler).
 -behaviour(gen_server).
--define(SERVER, ?MODULE).
-
--define(DEFAULT_ADD_RETRY, 1).
--define(DEFAULT_CULL_INTERVAL, {0, min}).
--define(DEFAULT_MAX_AGE, {0, min}).
 
+-include("pooler.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
--type member_info() :: {string(), free | pid(), {_, _, _}}.
--type free_member_info() :: {string(), free, {_, _, _}}.
--type time_unit() :: min | sec | ms | mu.
--type time_spec() :: {non_neg_integer(), time_unit()}.
 
 %% type specs for pool metrics
--type metric_label() :: binary().
 -type metric_value() :: 'unknown_pid' |
                         non_neg_integer() |
                         {'add_pids_failed', non_neg_integer(), non_neg_integer()} |
@@ -32,56 +23,22 @@
                         'error_no_members'.
 -type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
 
--record(pool, {
-          name             :: string(),
-          max_count = 100  :: non_neg_integer(),
-          init_count = 10  :: non_neg_integer(),
-          start_mfa        :: {atom(), atom(), [term()]},
-          free_pids = []   :: [pid()],
-          in_use_count = 0 :: non_neg_integer(),
-          free_count = 0   :: non_neg_integer(),
-          %% The number times to attempt adding a pool member if the
-          %% pool size is below max_count and there are no free
-          %% members. After this many tries, error_no_members will be
-          %% returned by a call to take_member. NOTE: this value
-          %% should be >= 2 or else the pool will not grow on demand
-          %% when max_count is larger than init_count.
-          add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer(),
-
-          %% The interval to schedule a cull message. Both
-          %% 'cull_interval' and 'max_age' are specified using a
-          %% `time_spec()' type.
-          cull_interval = ?DEFAULT_CULL_INTERVAL :: time_spec(),
-          %% The maximum age for members.
-          max_age = ?DEFAULT_MAX_AGE             :: time_spec()
-         }).
-
--record(state, {
-          npools                       :: non_neg_integer(),
-          pools = dict:new()           :: dict(),
-          pool_sups = dict:new()       :: dict(),
-          all_members = dict:new()     :: dict(),
-          consumer_to_pid = dict:new() :: dict(),
-          pool_selector                :: array()
-         }).
-
--define(gv(X, Y), proplists:get_value(X, Y)).
--define(gv(X, Y, D), proplists:get_value(X, Y, D)).
-
 %% ------------------------------------------------------------------
 %% API Function Exports
 %% ------------------------------------------------------------------
 
--export([start/1,
+-export([accept_member/2,
          start_link/1,
-         stop/0,
-         take_member/0,
          take_member/1,
-         return_member/1,
+         take_group_member/1,
+         return_group_member/2,
+         return_group_member/3,
          return_member/2,
-         % remove_pool/2,
-         % add_pool/1,
-         pool_stats/0]).
+         return_member/3,
+         pool_stats/1,
+         manual_start/0,
+         new_pool/1,
+         rm_pool/1]).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Exports
@@ -103,143 +60,230 @@
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
-start_link(Config) ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, Config, []).
+start_link(#pool{name = Name} = Pool) ->
+    gen_server:start_link({local, Name}, ?MODULE, Pool, []).
 
-start(Config) ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, Config, []).
+manual_start() ->
+    application:start(sasl),
+    application:start(pooler).
 
-stop() ->
-    gen_server:call(?SERVER, stop).
-
-%% @doc Obtain exclusive access to a member from a randomly selected pool.
+%% @doc Start a new pool described by the proplist `PoolConfig'. The
+%% following keys are required in the proplist:
 %%
-%% If there are no free members in the randomly selected pool, then a
-%% member will be returned from the pool with the most free members.
-%% If no free members are available, 'error_no_members' is returned.
+%% <dl>
+%% <dt>`name'</dt>
+%% <dd>An atom giving the name of the pool.</dd>
+%% <dt>`init_count'</dt>
+%% <dd>Number of members to add to the pool at start. When the pool is
+%% started, `init_count' members will be started in parallel.</dd>
+%% <dt>`max_count'</dt>
+%% <dd>Maximum number of members in the pool.</dd>
+%% <dt>`start_mfa'</dt>
+%% <dd>A tuple of the form `{Mod, Fun, Args}' describing how to start
+%% new pool members.</dd>
+%% </dl>
 %%
--spec take_member() -> pid() | error_no_members.
-take_member() ->
-    gen_server:call(?SERVER, take_member, infinity).
+%% In addition, you can specify any of the following optional
+%% configuration options:
+%%
+%% <dl>
+%% <dt>`group'</dt>
+%% <dd>An atom giving the name of the group this pool belongs
+%% to. Pools sharing a common `group' value can be accessed using
+%% {@link take_group_member/1} and {@link return_group_member/2}.</dd>
+%% <dt>`cull_interval'</dt>
+%% <dd>Time between checks for stale pool members. Specified as
+%% `{Time, Unit}' where `Time' is a non-negative integer and `Unit'
+%% is one of `min', `sec', `ms', or `mu'. The default value of `{0,
+%% min}' disables stale member checking. When `Time' is greater than
+%% zero, a message will be sent to the pool at the configured interval
+%% to trigger the removal of members that have not been accessed in
+%% `max_age' time units.</dd>
+%% <dt>`max_age'</dt>
+%% <dd>Members idle longer than `max_age' time units are removed from
+%% the pool when stale checking is enabled via
+%% `cull_interval'. Culling of idle members will never reduce the pool
+%% below `init_count'. The value is specified as `{Time, Unit}'. Note
+%% that timers are not set on individual pool members and may remain
+%% in the pool beyond the configured `max_age' value since members are
+%% only removed on the interval configured via `cull_interval'.</dd>
+%% </dl>
+new_pool(PoolConfig) ->
+    pooler_sup:new_pool(PoolConfig).
+
+%% @doc Terminate the named pool.
+rm_pool(PoolName) ->
+    pooler_sup:rm_pool(PoolName).
+
+%% @doc For INTERNAL use. Adds `MemberPid' to the pool.
+-spec accept_member(atom() | pid(), pid() | {noproc, _}) -> ok.
+accept_member(PoolName, MemberPid) ->
+    gen_server:call(PoolName, {accept_member, MemberPid}).
 
 %% @doc Obtain exclusive access to a member from `PoolName'.
 %%
 %% If no free members are available, 'error_no_members' is returned.
 %%
--spec take_member(string()) -> pid() | error_no_members | error_no_pool.
-take_member(PoolName) when is_list(PoolName) ->
-    gen_server:call(?SERVER, {take_member, PoolName}, infinity).
+-spec take_member(atom() | pid()) -> pid() | error_no_members.
+take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
+    gen_server:call(PoolName, take_member, infinity).
+
+%% @doc Take a member from a randomly selected member of the group
+%% `GroupName'. Returns `MemberPid' or `error_no_members'.  If no
+%% members are available in the randomly chosen pool, all other pools
+%% in the group are tried in order.
+-spec take_group_member(atom()) -> pid() | error_no_members | {error_no_group, atom()}.
+take_group_member(GroupName) ->
+    case pg2:get_local_members(GroupName) of
+        {error, {no_such_group, GroupName}} ->
+            {error_no_group, GroupName};
+        [] ->
+            error_no_members;
+        Pools ->
+            %% Put a random member at the front of the list and then
+            %% return the first member you can walking the list.
+            {_, _, X} = erlang:now(),
+            Idx = (X rem length(Pools)) + 1,
+            {PoolPid, Rest} = extract_nth(Idx, Pools),
+            take_first_pool([PoolPid | Rest])
+    end.
+
+take_first_pool([PoolPid | Rest]) ->
+    case take_member(PoolPid) of
+        error_no_members ->
+            take_first_pool(Rest);
+        Member ->
+            ets:insert(?POOLER_GROUP_TABLE, {Member, PoolPid}),
+            Member
+    end;
+take_first_pool([]) ->
+    error_no_members.
+
+%% this helper function returns `{Nth_Elt, Rest}' where `Nth_Elt' is
+%% the nth element of `L' and `Rest' is `L -- [Nth_Elt]'.
+extract_nth(N, L) ->
+    extract_nth(N, L, []).
+
+extract_nth(1, [H | T], Acc) ->
+    {H, Acc ++ T};
+extract_nth(N, [H | T], Acc) ->
+    extract_nth(N - 1, T, [H | Acc]);
+extract_nth(_, [], _) ->
+    error(badarg).
+
+%% @doc Return a member that was taken from the group
+%% `GroupName'. This is a convenience function for
+%% `return_group_member/3' with `Status' of `ok'.
+-spec return_group_member(atom(), pid() | error_no_members) -> ok.
+return_group_member(GroupName, MemberPid) ->
+    return_group_member(GroupName, MemberPid, ok).
+
+%% @doc Return a member that was taken from the group `GroupName'. If
+%% `Status' is `ok' the member is returned to the pool from which it
+%% came. If `Status' is `fail' the member will be terminated and a new
+%% member added to the appropriate pool.
+-spec return_group_member(atom(), pid() | error_no_members, ok | fail) -> ok.
+return_group_member(_, error_no_members, _) ->
+    ok;
+return_group_member(_GroupName, MemberPid, Status) ->
+    case ets:lookup(?POOLER_GROUP_TABLE, MemberPid) of
+        [{MemberPid, PoolPid}] ->
+            return_member(PoolPid, MemberPid, Status);
+        [] ->
+            ok
+    end.
 
 %% @doc Return a member to the pool so it can be reused.
 %%
 %% If `Status' is 'ok', the member is returned to the pool.  If
 %% `Status' is 'fail', the member is destroyed and a new member is
 %% added to the pool in its place.
--spec return_member(pid() | error_no_members, ok | fail) -> ok.
-return_member(Pid, Status) when is_pid(Pid) andalso
-                                (Status =:= ok orelse Status =:= fail) ->
-    gen_server:call(?SERVER, {return_member, Pid, Status}, infinity),
+-spec return_member(atom() | pid(), pid() | error_no_members, ok | fail) -> ok.
+return_member(PoolName, Pid, Status) when is_pid(Pid) andalso
+                                          (is_atom(PoolName) orelse
+                                           is_pid(PoolName)) andalso
+                                          (Status =:= ok orelse
+                                           Status =:= fail) ->
+    gen_server:call(PoolName, {return_member, Pid, Status}, infinity),
     ok;
-return_member(error_no_members, _) ->
+return_member(_, error_no_members, _) ->
     ok.
 
 %% @doc Return a member to the pool so it can be reused.
 %%
--spec return_member(pid() | error_no_members) -> ok.
-return_member(Pid) when is_pid(Pid) ->
-    gen_server:call(?SERVER, {return_member, Pid, ok}, infinity),
+-spec return_member(atom() | pid(), pid() | error_no_members) -> ok.
+return_member(PoolName, Pid) when is_pid(Pid) andalso
+                                  (is_atom(PoolName) orelse is_pid(PoolName)) ->
+    gen_server:call(PoolName, {return_member, Pid, ok}, infinity),
     ok;
-return_member(error_no_members) ->
+return_member(_, error_no_members) ->
     ok.
 
-% TODO:
-% remove_pool(Name, How) when How == graceful; How == immediate ->
-%     gen_server:call(?SERVER, {remove_pool, Name, How}).
-
-% TODO:
-% add_pool(Pool) ->
-%     gen_server:call(?SERVER, {add_pool, Pool}).
-
 %% @doc Obtain runtime state info for all pools.
 %%
 %% Format of the return value is subject to change.
--spec pool_stats() -> [tuple()].
-pool_stats() ->
-    gen_server:call(?SERVER, pool_stats).
+-spec pool_stats(atom() | pid()) -> [tuple()].
+pool_stats(PoolName) ->
+    gen_server:call(PoolName, pool_stats).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
 
--spec init([any()]) -> {'ok', #state{npools::'undefined' | non_neg_integer(),
-                                     pools::dict(),
-                                     pool_sups::dict(),
-                                     all_members::dict(),
-                                     consumer_to_pid::dict(),
-                                     pool_selector::'undefined' | array()}}.
-init(Config) ->
-    process_flag(trap_exit, true),
-    PoolRecs = [ props_to_pool(P) || P <- ?gv(pools, Config) ],
-    Pools = [ {Pool#pool.name, Pool} || Pool <-  PoolRecs ],
-    PoolSups = [ begin
-                  {ok, SupPid} = supervisor:start_child(pooler_pool_sup, [MFA]),
-                  {Name, SupPid}
-                 end || #pool{name = Name, start_mfa = MFA} <- PoolRecs ],
-    State0 = #state{npools = length(Pools),
-                    pools = dict:from_list(Pools),
-                    pool_sups = dict:from_list(PoolSups),
-                    pool_selector = array:from_list([PN || {PN, _} <- Pools])
-                  },
-
-    lists:foldl(fun(#pool{name = PName, init_count = N}, {ok, AccState}) ->
-                        AccState1 = cull_members(PName, AccState),
-                        add_pids(PName, N, AccState1)
-                end, {ok, State0}, PoolRecs).
-
-handle_call(take_member, {CPid, _Tag},
-            #state{pool_selector = PS, npools = NP} = State) ->
-    % attempt to return a member from a randomly selected pool.  If
-    % that pool has no members, find the pool with most free members
-    % and return a member from there.
-    PoolName = array:get(crypto:rand_uniform(0, NP), PS),
-    case take_member(PoolName, CPid, State) of
-        {error_no_members, NewState} ->
-            case max_free_pool(State#state.pools) of
-                error_no_members ->
-                    {reply, error_no_members, NewState};
-                MaxFreePoolName ->
-                    {NewPid, State2} = take_member(MaxFreePoolName, CPid,
-                                                   NewState),
-                    {reply, NewPid, State2}
-            end;
-        {NewPid, NewState} ->
-            {reply, NewPid, NewState}
-    end;
-handle_call({take_member, PoolName}, {CPid, _Tag}, #state{} = State) ->
-    {Member, NewState} = take_member(PoolName, CPid, State),
-    {reply, Member, NewState};
-handle_call({return_member, Pid, Status}, {_CPid, _Tag}, State) ->
-    {reply, ok, do_return_member(Pid, Status, State)};
-handle_call(stop, _From, State) ->
-    {stop, normal, stop_ok, State};
-handle_call(pool_stats, _From, State) ->
-    {reply, dict:to_list(State#state.all_members), State};
-handle_call(_Request, _From, State) ->
-    {noreply, State}.
+-spec init(#pool{}) -> {'ok', #pool{}, 0}.
+init(#pool{}=Pool) ->
+    #pool{init_count = N} = Pool,
+    MemberSup = pooler_pool_sup:member_sup_name(Pool),
+    Pool1 = set_member_sup(Pool, MemberSup),
+    %% This schedules the next cull when the pool is configured for
+    %% such and is otherwise a no-op.
+    Pool2 = cull_members_from_pool(Pool1),
+    {ok, NewPool} = init_members_sync(N, Pool2),
+    %% trigger an immediate timeout, handled by handle_info to allow
+    %% us to register with pg2. We use the timeout mechanism to ensure
+    %% that a server is added to a group only when it is ready to
+    %% process messages.
+    {ok, NewPool, 0}.
+
+set_member_sup(#pool{} = Pool, MemberSup) ->
+    Pool#pool{member_sup = MemberSup}.
+
+handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
+    {Member, NewPool} = take_member_from_pool(Pool, CPid),
+    {reply, Member, NewPool};
+handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
+    {reply, ok, do_return_member(Pid, Status, Pool)};
+handle_call({accept_member, Pid}, _From, Pool) ->
+    {reply, ok, do_accept_member(Pid, Pool)};
+handle_call(stop, _From, Pool) ->
+    {stop, normal, stop_ok, Pool};
+handle_call(pool_stats, _From, Pool) ->
+    {reply, dict:to_list(Pool#pool.all_members), Pool};
+handle_call(dump_pool, _From, Pool) ->
+    {reply, Pool, Pool};
+handle_call(_Request, _From, Pool) ->
+    {noreply, Pool}.
 
 -spec handle_cast(_,_) -> {'noreply', _}.
-handle_cast(_Msg, State) ->
-    {noreply, State}.
+handle_cast(_Msg, Pool) ->
+    {noreply, Pool}.
 
 -spec handle_info(_, _) -> {'noreply', _}.
-handle_info({'EXIT', Pid, Reason}, State) ->
+handle_info(timeout, #pool{group = undefined} = Pool) ->
+    %% ignore
+    {noreply, Pool};
+handle_info(timeout, #pool{group = Group} = Pool) ->
+    ok = pg2:create(Group),
+    ok = pg2:join(Group, self()),
+    {noreply, Pool};
+handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
     State1 =
-        case dict:find(Pid, State#state.all_members) of
+        case dict:find(Pid, State#pool.all_members) of
             {ok, {_PoolName, _ConsumerPid, _Time}} ->
                 do_return_member(Pid, fail, State);
             error ->
-                case dict:find(Pid, State#state.consumer_to_pid) of
-                    {ok, Pids} ->
+                case dict:find(Pid, State#pool.consumer_to_pid) of
+                    {ok, {MRef, Pids}} ->
                         IsOk = case Reason of
                                    normal -> ok;
                                    _Crash -> fail
@@ -252,8 +296,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
                 end
         end,
     {noreply, State1};
-handle_info({cull_pool, PoolName}, State) ->
-    {noreply, cull_members(PoolName, State)};
+handle_info(cull_pool, Pool) ->
+    {noreply, cull_members_from_pool(Pool)};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -269,143 +313,171 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
--spec props_to_pool([{atom(), term()}]) -> #pool{}.
-props_to_pool(P) ->
-    #pool{      name = ?gv(name, P),
-           max_count = ?gv(max_count, P),
-          init_count = ?gv(init_count, P),
-           start_mfa = ?gv(start_mfa, P),
-    add_member_retry = ?gv(add_member_retry, P, ?DEFAULT_ADD_RETRY),
-       cull_interval = ?gv(cull_interval, P, ?DEFAULT_CULL_INTERVAL),
-             max_age = ?gv(max_age, P, ?DEFAULT_MAX_AGE)}.
-
-% FIXME: creation of new pids should probably happen
-% in a spawned process to avoid tying up the loop.
--spec add_pids(error | string(), non_neg_integer(), #state{}) ->
-    {bad_pool_name | max_count_reached | ok, #state{}}.
-add_pids(error, _N, State) ->
-    {bad_pool_name, State};
-add_pids(PoolName, N, State) ->
-    #state{pools = Pools, all_members = AllMembers} = State,
-    Pool = fetch_pool(PoolName, Pools),
-    #pool{max_count = Max, free_pids = Free,
-          in_use_count = NumInUse, free_count = NumFree} = Pool,
-    Total = NumFree + NumInUse,
-    case Total + N =< Max of
+do_accept_member({Ref, Pid},
+                 #pool{
+                    all_members = AllMembers,
+                    free_pids = Free,
+                    free_count = NumFree,
+                    starting_members = StartingMembers0
+                   } = Pool) when is_pid(Pid) ->
+    %% make sure we don't accept a timedout member
+    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
+                                                    ?DEFAULT_MEMBER_START_TIMEOUT),
+    case lists:keymember(Ref, 1, StartingMembers) of
+        false ->
+            %% a pid we didn't ask to start, ignore it.
+            %% should we log it?
+            Pool;
         true ->
-            PoolSup = dict:fetch(PoolName, State#state.pool_sups),
-            {AllMembers1, NewPids} = start_n_pids(N, PoolName, PoolSup,
-                                                  AllMembers),
-            %% start_n_pids may return fewer than N if errors were
-            %% encountered.
-            NewPidCount = length(NewPids),
-            case NewPidCount =:= N of
-                true -> ok;
-                false ->
-                    error_logger:error_msg("tried to add ~B members, only added ~B~n",
-                                           [N, NewPidCount]),
-                    send_metric(<<"pooler.events">>,
-                                {add_pids_failed, N, NewPidCount}, history)
-            end,
-            Pool1 = Pool#pool{free_pids = Free ++ NewPids,
-                              free_count = length(Free) + NewPidCount},
-            {ok, State#state{pools = store_pool(PoolName, Pool1, Pools),
-                             all_members = AllMembers1}};
+            StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
+            MRef = erlang:monitor(process, Pid),
+            Entry = {MRef, free, os:timestamp()},
+            AllMembers1 = store_all_members(Pid, Entry, AllMembers),
+            Pool#pool{free_pids = Free ++ [Pid],
+                      free_count = NumFree + 1,
+                      all_members = AllMembers1,
+                      starting_members = StartingMembers1}
+    end;
+do_accept_member({Ref, _Reason}, #pool{starting_members = StartingMembers0} = Pool) ->
+    %% member start failed, remove in-flight ref and carry on.
+    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
+                                                    ?DEFAULT_MEMBER_START_TIMEOUT),
+    StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
+    Pool#pool{starting_members = StartingMembers1}.
+
+
+-spec remove_stale_starting_members(#pool{}, [{reference(), erlang:timestamp()}],
+                                    time_spec()) -> [{reference(), erlang:timestamp()}].
+remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
+    Now = os:timestamp(),
+    MaxAgeSecs = time_as_secs(MaxAge),
+    lists:filter(fun(SM) ->
+                         starting_member_not_stale(Pool, Now, SM, MaxAgeSecs)
+                 end, StartingMembers).
+
+starting_member_not_stale(Pool, Now, {_Ref, StartTime}, MaxAgeSecs) ->
+    case secs_between(StartTime, Now) < MaxAgeSecs of
+        true ->
+            true;
         false ->
-            {max_count_reached, State}
+            error_logger:error_msg("pool '~s': starting member timeout", [Pool#pool.name]),
+            send_metric(Pool, starting_member_timeout, {inc, 1}, counter),
+            false
+    end.
+
+init_members_sync(N, #pool{name = PoolName} = Pool) ->
+    Self = self(),
+    StartTime = os:timestamp(),
+    StartRefs = [ {pooler_starter:start_member(Pool, Self), StartTime}
+                  || _I <- lists:seq(1, N) ],
+    Pool1 = Pool#pool{starting_members = StartRefs},
+    case collect_init_members(Pool1) of
+        timeout ->
+            error_logger:error_msg("pool '~s': exceeded timeout waiting for ~B members",
+                                   [PoolName, Pool1#pool.init_count]),
+            error({timeout, "unable to start members"});
+        #pool{} = Pool2 ->
+            {ok, Pool2}
+    end.
+
+collect_init_members(#pool{starting_members = []} = Pool) ->
+    Pool;
+collect_init_members(#pool{} = Pool) ->
+    Timeout = time_as_millis(?DEFAULT_MEMBER_START_TIMEOUT),
+    receive
+        {accept_member, {Ref, Member}} ->
+            collect_init_members(do_accept_member({Ref, Member}, Pool))
+    after
+        Timeout ->
+            timeout
     end.
 
--spec take_member(string(), {pid(), _}, #state{}) ->
-    {error_no_pool | error_no_members | pid(), #state{}}.
-take_member(PoolName, From, #state{pools = Pools} = State) ->
-    Pool = fetch_pool(PoolName, Pools),
-    take_member_from_pool(Pool, From, State, pool_add_retries(Pool)).
-
--spec take_member_from_pool(error_no_pool | #pool{}, {pid(), term()}, #state{},
-                            non_neg_integer()) ->
-                                   {error_no_pool | error_no_members | pid(), #state{}}.
-take_member_from_pool(error_no_pool, _From, State, _) ->
-    {error_no_pool, State};
-take_member_from_pool(#pool{name = PoolName,
+-spec take_member_from_pool(#pool{}, {pid(), term()}) ->
+                                   {error_no_members | pid(), #pool{}}.
+take_member_from_pool(#pool{init_count = InitCount,
                             max_count = Max,
                             free_pids = Free,
                             in_use_count = NumInUse,
-                            free_count = NumFree} = Pool,
-                      From,
-                      #state{pools = Pools, consumer_to_pid = CPMap} = State,
-                      Retries) ->
-    send_metric(pool_metric(PoolName, take_rate), 1, meter),
+                            free_count = NumFree,
+                            consumer_to_pid = CPMap,
+                            starting_members = StartingMembers0} = Pool,
+                      From) ->
+    send_metric(Pool, take_rate, 1, meter),
+    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
+                                                    ?DEFAULT_MEMBER_START_TIMEOUT),
+    NumCanAdd = Max - (NumInUse + NumFree + length(StartingMembers)),
     case Free of
-        [] when NumInUse =:= Max ->
-            send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
-            send_metric(<<"pooler.events">>, error_no_members, history),
-            {error_no_members, State};
-        [] when NumInUse < Max andalso Retries > 0 ->
-            case add_pids(PoolName, 1, State) of
-                {ok, State1} ->
-                    %% add_pids may have updated our pool
-                    Pool1 = fetch_pool(PoolName, State1#state.pools),
-                    take_member_from_pool(Pool1, From, State1, Retries - 1);
-                {max_count_reached, _} ->
-                    send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
-                    send_metric(<<"pooler.events">>, error_no_members, history),
-                    {error_no_members, State}
-            end;
-        [] when Retries =:= 0 ->
-            %% max retries reached
-            send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
-            {error_no_members, State};
+        [] when NumCanAdd =< 0 ->
+            send_metric(Pool, error_no_members_count, {inc, 1}, counter),
+            send_metric(Pool, events, error_no_members, history),
+            {error_no_members, Pool};
+        [] when NumCanAdd > 0 ->
+            %% Limit the number of concurrently starting members to
+            %% init_count by adding at most init_count new members.
+            %% Because member starts are asynchronous, a take request
+            %% made while all members are in use always returns
+            %% error_no_members. By adding a batch of new members, the
+            %% pool should reach a steady state with unused members
+            %% culled over time (if scheduled culling is enabled).
+            NumToAdd = min(InitCount - length(StartingMembers), NumCanAdd),
+            Pool1 = add_members_async(NumToAdd, Pool),
+            send_metric(Pool, error_no_members_count, {inc, 1}, counter),
+            send_metric(Pool, events, error_no_members, history),
+            {error_no_members, Pool1};
         [Pid|Rest] ->
-            erlang:link(From),
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
                               free_count = NumFree - 1},
-            send_metric(pool_metric(PoolName, in_use_count), Pool1#pool.in_use_count, histogram),
-            send_metric(pool_metric(PoolName, free_count), Pool1#pool.free_count, histogram),
-            {Pid, State#state{
-                    pools = store_pool(PoolName, Pool1, Pools),
+            send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
+            send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
+            {Pid, Pool1#pool{
                     consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
                     all_members = set_cpid_for_member(Pid, From,
-                                                      State#state.all_members)
+                                                      Pool1#pool.all_members)
                    }}
     end.
 
--spec do_return_member(pid(), ok | fail, #state{}) -> #state{}.
-do_return_member(Pid, ok, #state{all_members = AllMembers} = State) ->
+%% @doc Add `Count' members to `Pool' asynchronously. Returns updated
+%% `Pool' record with starting member refs added to field
+%% `starting_members'.
+add_members_async(Count, #pool{starting_members = StartingMembers} = Pool) ->
+    StartTime = os:timestamp(),
+    StartRefs = [ {pooler_starter:start_member(Pool), StartTime}
+                  || _I <- lists:seq(1, Count) ],
+    Pool#pool{starting_members = StartRefs ++ StartingMembers}.
+
+-spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
+do_return_member(Pid, ok, #pool{all_members = AllMembers} = Pool) ->
+    clean_group_table(Pid, Pool),
     case dict:find(Pid, AllMembers) of
-        {ok, {PoolName, CPid, _}} ->
-            Pool = fetch_pool(PoolName, State#state.pools),
+        {ok, {MRef, CPid, _}} ->
             #pool{free_pids = Free, in_use_count = NumInUse,
                   free_count = NumFree} = Pool,
             Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1,
                               free_count = NumFree + 1},
-            Entry = {PoolName, free, os:timestamp()},
-            State#state{pools = store_pool(PoolName, Pool1, State#state.pools),
-                        all_members = store_all_members(Pid, Entry, AllMembers),
-                        consumer_to_pid = cpmap_remove(Pid, CPid,
-                                                       State#state.consumer_to_pid)};
+            Entry = {MRef, free, os:timestamp()},
+            Pool1#pool{all_members = store_all_members(Pid, Entry, AllMembers),
+                       consumer_to_pid = cpmap_remove(Pid, CPid,
+                                                      Pool1#pool.consumer_to_pid)};
         error ->
-            State
+            Pool
     end;
-do_return_member(Pid, fail, #state{all_members = AllMembers} = State) ->
+do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
     % for the fail case, perhaps the member crashed and was already
     % removed, so use find instead of fetch and ignore missing.
+    clean_group_table(Pid, Pool),
     case dict:find(Pid, AllMembers) of
-        {ok, {PoolName, _, _}} ->
-            State1 = remove_pid(Pid, State),
-            case add_pids(PoolName, 1, State1) of
-                {Status, State2} when Status =:= ok;
-                                      Status =:= max_count_reached ->
-                    State2;
-                {Status, _} ->
-                    erlang:error({error, "unexpected return from add_pid",
-                                  Status, erlang:get_stacktrace()}),
-                    send_metric(<<"pooler.events">>, bad_return_from_add_pid,
-                                history)
-            end;
+        {ok, {_MRef, _, _}} ->
+            Pool1 = remove_pid(Pid, Pool),
+            add_members_async(1, Pool1);
         error ->
-            State
+            Pool
     end.
 
+clean_group_table(_MemberPid, #pool{group = undefined}) ->
+    ok;
+clean_group_table(MemberPid, #pool{group = _GroupName}) ->
+    ets:delete(?POOLER_GROUP_TABLE, MemberPid).
+
 % @doc Remove `Pid' from the pid list associated with `CPid' in the
 % consumer to member map given by `CPMap'.
 %
@@ -417,13 +489,14 @@ cpmap_remove(_Pid, free, CPMap) ->
     CPMap;
 cpmap_remove(Pid, CPid, CPMap) ->
     case dict:find(CPid, CPMap) of
-        {ok, Pids0} ->
-            unlink(CPid), % FIXME: flush msg queue here?
+        {ok, {MRef, Pids0}} ->
             Pids1 = lists:delete(Pid, Pids0),
             case Pids1 of
                 [_H|_T] ->
-                    dict:store(CPid, Pids1, CPMap);
+                    dict:store(CPid, {MRef, Pids1}, CPMap);
                 [] ->
+                    %% no more members for this consumer
+                    erlang:demonitor(MRef),
                     dict:erase(CPid, CPMap)
             end;
         error ->
@@ -436,142 +509,89 @@ cpmap_remove(Pid, CPid, CPMap) ->
 % Handles in-use and free members.  Logs an error if the pid is not
 % tracked in state.all_members.
 %
--spec remove_pid(pid(), #state{}) -> #state{}.
-remove_pid(Pid, State) ->
-    #state{all_members = AllMembers, pools = Pools,
-           consumer_to_pid = CPMap} = State,
+-spec remove_pid(pid(), #pool{}) -> #pool{}.
+remove_pid(Pid, Pool) ->
+    #pool{name = PoolName,
+          all_members = AllMembers,
+          consumer_to_pid = CPMap} = Pool,
     case dict:find(Pid, AllMembers) of
-        {ok, {PoolName, free, _Time}} ->
+        {ok, {MRef, free, _Time}} ->
             % remove an unused member
-            Pool = fetch_pool(PoolName, Pools),
+            erlang:demonitor(MRef),
             FreePids = lists:delete(Pid, Pool#pool.free_pids),
             NumFree = Pool#pool.free_count - 1,
             Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
             exit(Pid, kill),
-            send_metric(<<"pooler.killed_free_count">>, {inc, 1}, counter),
-            State#state{pools = store_pool(PoolName, Pool1, Pools),
-                        all_members = dict:erase(Pid, AllMembers)};
-        {ok, {PoolName, CPid, _Time}} ->
-            Pool = fetch_pool(PoolName, Pools),
+            send_metric(Pool1, killed_free_count, {inc, 1}, counter),
+            Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
+        {ok, {MRef, CPid, _Time}} ->
+            %% remove a member being consumed. No notice is sent to
+            %% the consumer.
+            erlang:demonitor(MRef),
             Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
             exit(Pid, kill),
-            send_metric(<<"pooler.killed_in_use_count">>, {inc, 1}, counter),
-            State#state{pools = store_pool(PoolName, Pool1, Pools),
-                        consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
-                        all_members = dict:erase(Pid, AllMembers)};
+            send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
+            Pool1#pool{consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
+                       all_members = dict:erase(Pid, AllMembers)};
         error ->
-            error_logger:error_report({unknown_pid, Pid,
+            error_logger:error_report({{pool, PoolName}, unknown_pid, Pid,
                                        erlang:get_stacktrace()}),
-            send_metric(<<"pooler.event">>, unknown_pid, history),
-            State
-    end.
-
--spec max_free_pool(dict()) -> error_no_members | string().
-max_free_pool(Pools) ->
-    case dict:fold(fun fold_max_free_count/3, {"", 0}, Pools) of
-        {"", 0} -> error_no_members;
-        {MaxFreePoolName, _} -> MaxFreePoolName
+            send_metric(Pool, events, unknown_pid, history),
+            Pool
     end.
 
--spec fold_max_free_count(string(), #pool{}, {string(), non_neg_integer()}) ->
-    {string(), non_neg_integer()}.
-fold_max_free_count(Name, Pool, {CName, CMax}) ->
-    case Pool#pool.free_count > CMax of
-        true -> {Name, Pool#pool.free_count};
-        false -> {CName, CMax}
-    end.
-
-
--spec start_n_pids(non_neg_integer(), string(), pid(), dict()) ->
-    {dict(), [pid()]}.
-start_n_pids(N, PoolName, PoolSup, AllMembers) ->
-    NewPids = do_n(N, fun(Acc) ->
-                              case supervisor:start_child(PoolSup, []) of
-                                  {ok, Pid} ->
-                                      erlang:link(Pid),
-                                      [Pid | Acc];
-                                  _Else ->
-                                      Acc
-                              end
-                      end, []),
-    AllMembers1 = lists:foldl(
-                    fun(M, Dict) ->
-                            Entry = {PoolName, free, os:timestamp()},
-                            store_all_members(M, Entry, Dict)
-                    end, AllMembers, NewPids),
-    {AllMembers1, NewPids}.
-
-do_n(0, _Fun, Acc) ->
-    Acc;
-do_n(N, Fun, Acc) ->
-    do_n(N - 1, Fun, Fun(Acc)).
-
-
--spec fetch_pool(string(), dict()) -> #pool{} | error_no_pool.
-fetch_pool(PoolName, Pools) ->
-    case dict:find(PoolName, Pools) of
-        {ok, Pool} -> Pool;
-        error -> error_no_pool
-    end.
-
-pool_add_retries(#pool{add_member_retry = Retries}) ->
-    Retries;
-pool_add_retries(error_no_pool) ->
-    0.
-
--spec store_pool(string(), #pool{}, dict()) -> dict().
-store_pool(PoolName, Pool = #pool{}, Pools) ->
-    dict:store(PoolName, Pool, Pools).
-
 -spec store_all_members(pid(),
-                        {string(), free | pid(), {_, _, _}}, dict()) -> dict().
-store_all_members(Pid, Val = {_PoolName, _CPid, _Time}, AllMembers) ->
+                        {reference(), free | pid(), {_, _, _}}, dict()) -> dict().
+store_all_members(Pid, Val = {_MRef, _CPid, _Time}, AllMembers) ->
     dict:store(Pid, Val, AllMembers).
 
 -spec set_cpid_for_member(pid(), pid(), dict()) -> dict().
 set_cpid_for_member(MemberPid, CPid, AllMembers) ->
     dict:update(MemberPid,
-                fun({PoolName, free, Time = {_, _, _}}) ->
-                        {PoolName, CPid, Time}
+                fun({MRef, free, Time = {_, _, _}}) ->
+                        {MRef, CPid, Time}
                 end, AllMembers).
 
 -spec add_member_to_consumer(pid(), pid(), dict()) -> dict().
 add_member_to_consumer(MemberPid, CPid, CPMap) ->
-    dict:update(CPid, fun(O) -> [MemberPid|O] end, [MemberPid], CPMap).
-
--spec cull_members(string(), #state{}) -> #state{}.
-cull_members(PoolName, #state{pools = Pools} = State) ->
-    cull_members_from_pool(fetch_pool(PoolName, Pools), State).
+    %% we can't use dict:update here because we need to create the
+    %% monitor if we aren't already tracking this consumer.
+    case dict:find(CPid, CPMap) of
+        {ok, {MRef, MList}} ->
+            dict:store(CPid, {MRef, [MemberPid | MList]}, CPMap);
+        error ->
+            MRef = erlang:monitor(process, CPid),
+            dict:store(CPid, {MRef, [MemberPid]}, CPMap)
+    end.
 
--spec cull_members_from_pool(#pool{}, #state{}) -> #state{}.
-cull_members_from_pool(error_no_pool, State) ->
-    State;
-cull_members_from_pool(#pool{cull_interval = {0, _}}, State) ->
+-spec cull_members_from_pool(#pool{}) -> #pool{}.
+cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->
     %% 0 cull_interval means do not cull
-    State;
+    Pool;
 cull_members_from_pool(#pool{name = PoolName,
                              free_count = FreeCount,
                              init_count = InitCount,
                              in_use_count = InUseCount,
                              cull_interval = Delay,
-                             max_age = MaxAge} = Pool,
-                       #state{all_members = AllMembers} = State) ->
+                             max_age = MaxAge,
+                             all_members = AllMembers} = Pool) ->
     MaxCull = FreeCount - (InitCount - InUseCount),
-    State1 = case MaxCull > 0 of
-                 true ->
-                     MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
-                     ExpiredMembers =
-                         expired_free_members(MemberInfo, os:timestamp(), MaxAge),
-                     CullList = lists:sublist(ExpiredMembers, MaxCull),
-                     lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
-                                 State, CullList);
-                 false ->
-                     State
-             end,
+    Pool1 = case MaxCull > 0 of
+                true ->
+                    MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
+                    ExpiredMembers =
+                        expired_free_members(MemberInfo, os:timestamp(), MaxAge),
+                    CullList = lists:sublist(ExpiredMembers, MaxCull),
+                    lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
+                                Pool, CullList);
+                false ->
+                    Pool
+            end,
     schedule_cull(PoolName, Delay),
-    State1.
+    Pool1.
 
--spec schedule_cull(PoolName :: string(), Delay :: time_spec()) -> reference().
+-spec schedule_cull(PoolName :: atom() | pid(),
+                    Delay :: time_spec()) -> reference().
 %% @doc Schedule a pool cleaning or "cull" for `PoolName' in which
 %% members older than `max_age' will be removed until the pool has
 %% `init_count' members. Uses `erlang:send_after/3' for light-weight
@@ -580,7 +600,7 @@ schedule_cull(PoolName, Delay) ->
     DelayMillis = time_as_millis(Delay),
     %% use pid instead of server name atom to take advantage of
     %% automatic cancelling
-    erlang:send_after(DelayMillis, self(), {cull_pool, PoolName}).
+    erlang:send_after(DelayMillis, PoolName, cull_pool).
 
 -spec member_info([pid()], dict()) -> [{pid(), member_info()}].
 member_info(Pids, AllMembers) ->
@@ -594,22 +614,27 @@ expired_free_members(Members, Now, MaxAge) ->
     [ MI || MI = {_, {_, free, LastReturn}} <- Members,
             timer:now_diff(Now, LastReturn) >= MaxMicros ].
 
--spec send_metric(Name :: metric_label(),
-                  Value :: metric_value(),
-                  Type :: metric_type()) -> ok.
 %% Send a metric using the pool's metrics module, or do nothing when
 %% metrics are disabled (metrics_mod = pooler_no_metrics).
-send_metric(Name, Value, Type) ->
-    case application:get_env(pooler, metrics_module) of
-        undefined -> ok;
-        {ok, Mod} -> Mod:notify(Name, Value, Type)
-    end,
+-spec send_metric(Pool  :: #pool{},
+                  Label :: atom(),
+                  Value :: metric_value(),
+                  Type  :: metric_type()) -> ok.
+send_metric(#pool{metrics_mod = pooler_no_metrics}, _Label, _Value, _Type) ->
+    ok;
+send_metric(#pool{name = PoolName, metrics_mod = MetricsMod}, Label, Value, Type) ->
+    MetricName = pool_metric(PoolName, Label),
+    MetricsMod:notify(MetricName, Value, Type),
     ok.
 
--spec pool_metric(string(), 'free_count' | 'in_use_count' | 'take_rate') -> binary().
+-spec pool_metric(atom(), atom()) -> binary().
 pool_metric(PoolName, Metric) ->
-    iolist_to_binary([<<"pooler.">>, PoolName, ".",
-                      atom_to_binary(Metric, utf8)]).
+    iolist_to_binary([<<"pooler.">>, atom_to_binary(PoolName, utf8),
+                      ".", atom_to_binary(Metric, utf8)]).
+
+-spec time_as_secs(time_spec()) -> non_neg_integer().
+time_as_secs({Time, Unit}) ->
+    time_as_micros({Time, Unit}) div 1000000.
 
 -spec time_as_millis(time_spec()) -> non_neg_integer().
 %% @doc Convert time unit into milliseconds.
@@ -626,3 +651,6 @@ time_as_micros({Time, ms}) ->
     1000 * Time;
 time_as_micros({Time, mu}) ->
     Time.
+
+secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) ->
+    (Mega2 - Mega1) * 1000000 + (Secs2 - Secs1).

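Since member starts are now asynchronous, the first take against an exhausted pool returns error_no_members while the replacement batch is still starting. A minimal consumer-side retry sketch (not part of pooler; the helper name and the 50 ms backoff are illustrative):

%% retry take_member a bounded number of times, sleeping briefly so
%% the async starters have a chance to add members to the pool
take_with_retry(_PoolName, 0) ->
    error_no_members;
take_with_retry(PoolName, Retries) ->
    case pooler:take_member(PoolName) of
        error_no_members ->
            timer:sleep(50),
            take_with_retry(PoolName, Retries - 1);
        Pid ->
            Pid
    end.
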
+ 73 - 0
src/pooler.hrl

@@ -0,0 +1,73 @@
+-define(DEFAULT_ADD_RETRY, 1).
+-define(DEFAULT_CULL_INTERVAL, {0, min}).
+-define(DEFAULT_MAX_AGE, {0, min}).
+-define(DEFAULT_MEMBER_START_TIMEOUT, {1, min}).
+-define(POOLER_GROUP_TABLE, pooler_group_table).
+
+-type member_info() :: {reference(), free | pid(), {_, _, _}}.
+-type free_member_info() :: {reference(), free, {_, _, _}}.
+-type time_unit() :: min | sec | ms | mu.
+-type time_spec() :: {non_neg_integer(), time_unit()}.
+
+-record(pool, {
+          name             :: atom(),
+          group            :: atom(),
+          max_count = 100  :: non_neg_integer(),
+          init_count = 10  :: non_neg_integer(),
+          start_mfa        :: {atom(), atom(), [term()]},
+          free_pids = []   :: [pid()],
+          in_use_count = 0 :: non_neg_integer(),
+          free_count = 0   :: non_neg_integer(),
+          %% The number of times to attempt adding a pool member if the
+          %% pool size is below max_count and there are no free
+          %% members. After this many tries, error_no_members will be
+          %% returned by a call to take_member. NOTE: this value
+          %% should be >= 2 or else the pool will not grow on demand
+          %% when max_count is larger than init_count.
+          add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer(),
+
+          %% The interval to schedule a cull message. Both
+          %% 'cull_interval' and 'max_age' are specified using a
+          %% `time_spec()' type.
+          cull_interval = ?DEFAULT_CULL_INTERVAL :: time_spec(),
+          %% The maximum age for members.
+          max_age = ?DEFAULT_MAX_AGE             :: time_spec(),
+
+          %% The supervisor used to start new members
+          member_sup :: atom() | pid(),
+
+          %% The supervisor used to start starter servers that start
+          %% new members. This is what enables async member starts.
+          starter_sup :: atom() | pid(),
+
+          %% Maps member pid to a tuple of the form:
+          %% {MonitorRef, Status, Time},
+          %% where MonitorRef is a monitor reference for the member,
+          %% Status is either 'free' or the consumer pid, and Time is
+          %% an Erlang timestamp that records when the member became
+          %% free.
+          all_members = dict:new()     :: dict(),
+
+          %% Maps consumer pid to a tuple of the form:
+          %% {MonitorRef, MemberList} where MonitorRef is a monitor
+          %% reference for the consumer and MemberList is a list of
+          %% members being consumed.
+          consumer_to_pid = dict:new() :: dict(),
+
+          %% A list of `{Reference, Timestamp}' tuples representing
+          %% new member start requests that are in-flight. The
+          %% timestamp records when the start request was initiated
+          %% and is used to implement start timeout.
+          starting_members = [] :: [{reference(), erlang:timestamp()}],
+
+          %% The module to use for collecting metrics. If set to
+          %% 'pooler_no_metrics', then metric sending calls do
+          %% nothing. A typical value to actually capture metrics is
+          %% folsom_metrics.
+          metrics_mod = pooler_no_metrics :: atom()
+         }).
+
+-define(gv(X, Y), proplists:get_value(X, Y)).
+-define(gv(X, Y, D), proplists:get_value(X, Y, D)).
+
+

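The metrics_mod configured above only needs to export notify/3; send_metric/4 in pooler.erl calls MetricsMod:notify(MetricName, Value, Type). A minimal sketch of a custom metrics module (the module name my_log_metrics is hypothetical; folsom_metrics ships a compatible notify/3):

-module(my_log_metrics).
-export([notify/3]).

%% Name is a binary such as <<"pooler.test_pool_1.take_rate">>;
%% Type is one of counter | histogram | history | meter.
notify(Name, Value, Type) ->
    error_logger:info_msg("metric ~s ~p ~p~n", [Name, Value, Type]),
    ok.
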
+ 32 - 0
src/pooler_config.erl

@@ -0,0 +1,32 @@
+%% @author Seth Falcon <seth@userprimary.net>
+%% @copyright 2012 Seth Falcon
+%% @doc Helper module to transform app config proplists into pool records
+
+-module(pooler_config).
+
+-export([list_to_pool/1]).
+
+-include("pooler.hrl").
+
+-spec list_to_pool([{atom(), term()}]) -> #pool{}.
+list_to_pool(P) ->
+    #pool{
+       name              = req(name, P),
+       group             = ?gv(group, P),
+       max_count         = req(max_count, P),
+       init_count        = req(init_count, P),
+       start_mfa         = req(start_mfa, P),
+       add_member_retry  = ?gv(add_member_retry, P, ?DEFAULT_ADD_RETRY),
+       cull_interval     = ?gv(cull_interval, P, ?DEFAULT_CULL_INTERVAL),
+       max_age           = ?gv(max_age, P, ?DEFAULT_MAX_AGE),
+       metrics_mod       = ?gv(metrics_mod, P, pooler_no_metrics)}.
+
+%% Return `Value' for `Key' in proplist `P' or crashes with an
+%% informative message if no value is found.
+req(Key, P) ->
+    case lists:keyfind(Key, 1, P) of
+        false ->
+            error({missing_required_config, Key, P});
+        {Key, Value} ->
+            Value
+    end.

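For illustration, a sketch of list_to_pool/1 in use: name, max_count, init_count, and start_mfa are required (req/2 crashes without them), while the remaining keys fall back to their defaults. The my_redis member module is hypothetical:

Pool = pooler_config:list_to_pool(
         [{name, redis_pool},
          {group, cache},
          {max_count, 20},
          {init_count, 5},
          {start_mfa, {my_redis, start_link, []}},
          {cull_interval, {1, min}},
          {max_age, {5, min}}]).
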
+ 27 - 9
src/pooler_pool_sup.erl

@@ -2,14 +2,32 @@
 
 -behaviour(supervisor).
 
--export([start_link/0, init/1]).
+-export([start_link/1, init/1,
+         pool_sup_name/1,
+         member_sup_name/1]).
 
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+-include("pooler.hrl").
 
-init([]) ->
-    Worker = {pooler_pooled_worker_sup,
-              {pooler_pooled_worker_sup, start_link, []},
-              temporary, 5000, supervisor, [pooler_pooled_worker_sup]},
-    Restart = {simple_one_for_one, 1, 1},
-    {ok, {Restart, [Worker]}}.
+start_link(#pool{} = Pool) ->
+    SupName = pool_sup_name(Pool),
+    supervisor:start_link({local, SupName}, ?MODULE, Pool).
+
+init(#pool{} = Pool) ->
+    PoolerSpec = {pooler,
+                  {pooler, start_link, [Pool]},
+                  transient,  5000, worker, [pooler]},
+    MemberSupName = member_sup_name(Pool),
+    MemberSupSpec = {MemberSupName,
+                     {pooler_pooled_worker_sup, start_link, [Pool]},
+                     transient, 5000, supervisor, [pooler_pooled_worker_sup]},
+
+    %% five restarts in 60 seconds, then shutdown
+    Restart = {one_for_all, 5, 60},
+    {ok, {Restart, [MemberSupSpec, PoolerSpec]}}.
+
+
+member_sup_name(#pool{name = PoolName}) ->
+    list_to_atom("pooler_" ++ atom_to_list(PoolName) ++ "_member_sup").
+
+pool_sup_name(#pool{name = PoolName}) ->
+    list_to_atom("pooler_" ++ atom_to_list(PoolName) ++ "_pool_sup").

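A shell sketch of the registered names these helpers derive for a hypothetical pool named my_pool:

1> rr("src/pooler.hrl").
[pool]
2> pooler_pool_sup:pool_sup_name(#pool{name = my_pool}).
pooler_my_pool_pool_sup
3> pooler_pool_sup:member_sup_name(#pool{name = my_pool}).
pooler_my_pool_member_sup
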
+ 5 - 2
src/pooler_pooled_worker_sup.erl

@@ -4,8 +4,11 @@
 
 -export([start_link/1, init/1]).
 
-start_link(Config) ->
-    supervisor:start_link(?MODULE, Config).
+-include("pooler.hrl").
+
+start_link(#pool{start_mfa = {_, _, _} = MFA} = Pool) ->
+    SupName = pooler_pool_sup:member_sup_name(Pool),
+    supervisor:start_link({local, SupName}, ?MODULE, MFA).
 
 init({Mod, Fun, Args}) ->
     Worker = {Mod, {Mod, Fun, Args}, temporary, brutal_kill, worker, [Mod]},

+ 127 - 0
src/pooler_starter.erl

@@ -0,0 +1,127 @@
+%% @author Seth Falcon <seth@userprimary.net>
+%% @copyright 2012-2013 Seth Falcon
+%% @doc Helper gen_server to start pool members
+%%
+-module(pooler_starter).
+-behaviour(gen_server).
+
+-include("pooler.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([start_link/3,
+         start_member/1,
+         start_member/2,
+         stop/1]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+
+%% To help with testing internal functions
+-ifdef(TEST).
+-compile([export_all]).
+-endif.
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+start_link(Pool, Ref, Parent) ->
+    gen_server:start_link(?MODULE, {Pool, Ref, Parent}, []).
+
+stop(Starter) ->
+    gen_server:call(Starter, stop).
+
+%% @doc Start a member for the specified `Pool'.
+%%
+%% Member creation with this call is async. This function returns
+%% immediately with a reference. When the member has been created it
+%% is sent to the specified pool via {@link pooler:accept_member/2}.
+%%
+%% Each call starts a single use `pooler_starter' instance via
+%% `pooler_starter_sup'. The instance terminates normally after
+%% creating a single member.
+-spec start_member(#pool{}) -> reference().
+start_member(Pool) ->
+    Ref = make_ref(),
+    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, pool),
+    Ref.
+
+%% @doc Same as {@link start_member/1} except that instead of calling
+%% {@link pooler:accept_member/2} a raw message is sent to `Parent' of
+%% the form `{accept_member, {Ref, Member}}', where `Member' is
+%% either the member pid or an error term and `Ref' is the
+%% reference returned from this function.
+%%
+%% This is used by the init function in the `pooler' to start the
+%% initial set of pool members in parallel.
+start_member(Pool, Parent) ->
+    Ref = make_ref(),
+    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, Parent),
+    Ref.
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+-record(starter, {pool,
+                  ref,
+                  parent}).
+
+-spec init({#pool{}, reference(), pid() | atom()}) -> {'ok', #starter{}, 0}.
+init({Pool, Ref, Parent}) ->
+    %% trigger immediate timeout message, which we'll use to trigger
+    %% the member start.
+    {ok, #starter{pool = Pool, ref = Ref, parent = Parent}, 0}.
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stop_ok, State};
+handle_call(_Request, _From, State) ->
+    {noreply, State}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+-spec handle_info(_, _) -> {'noreply', _}.
+handle_info(timeout,
+            #starter{pool = Pool, ref = Ref, parent = Parent} = State) ->
+    ok = do_start_member(Pool, Ref, Parent),
+    {stop, normal, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+-spec terminate(_, _) -> 'ok'.
+terminate(_Reason, _State) ->
+    ok.
+
+-spec code_change(_, _, _) -> {'ok', _}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+do_start_member(#pool{name = PoolName, member_sup = PoolSup}, Ref, Parent) ->
+    Msg = case supervisor:start_child(PoolSup, []) of
+              {ok, Pid} ->
+                  {Ref, Pid};
+              Error ->
+                  error_logger:error_msg("pool '~s' failed to start member: ~p",
+                                         [PoolName, Error]),
+                  {Ref, Error}
+          end,
+    send_accept_member(Parent, PoolName, Msg),
+    ok.
+
+send_accept_member(pool, PoolName, Msg) ->
+    pooler:accept_member(PoolName, Msg);
+send_accept_member(Pid, _PoolName, Msg) ->
+    Pid ! {accept_member, Msg},
+    ok.

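A sketch of the parent side of the start_member/2 protocol (assuming Pool is a #pool{} whose member_sup points at a running member supervisor); this mirrors what collect_init_members/1 in pooler.erl does for the initial batch:

Ref = pooler_starter:start_member(Pool, self()),
receive
    {accept_member, {Ref, Pid}} when is_pid(Pid) ->
        {ok, Pid};
    {accept_member, {Ref, Error}} ->
        {error, Error}
after 60000 ->
        timeout
end.
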
+ 26 - 0
src/pooler_starter_sup.erl

@@ -0,0 +1,26 @@
+%% @doc Simple one for one supervisor for pooler_starter.
+%%
+%% This supervisor is shared by all pools since pooler_starter is a
+%% generic helper to facilitate async member start.
+-module(pooler_starter_sup).
+
+-behaviour(supervisor).
+
+-export([new_starter/3,
+         start_link/0,
+         init/1]).
+
+-include("pooler.hrl").
+
+new_starter(Pool, Ref, Parent) ->
+    supervisor:start_child(?MODULE, [Pool, Ref, Parent]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    Worker = {pooler_starter, {pooler_starter, start_link, []},
+              temporary, brutal_kill, worker, [pooler_starter]},
+    Specs = [Worker],
+    Restart = {simple_one_for_one, 1, 1},
+    {ok, {Restart, Specs}}.

+ 58 - 7
src/pooler_sup.erl

@@ -2,15 +2,66 @@
 
 -behaviour(supervisor).
 
--export([start_link/0, init/1]).
+-export([init/1,
+         new_pool/1,
+         rm_pool/1,
+         start_link/0]).
+
+-include("pooler.hrl").
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    Config = application:get_all_env(pooler),
-    Pooler = {pooler, {pooler, start_link, [Config]},
-            permanent, 5000, worker, [pooler]},
-    PoolerPool = {pooler_pool_sup, {pooler_pool_sup, start_link, []},
-                permanent, 5000, supervisor, [pooler_pool_sup]},
-    {ok, {{one_for_one, 5, 10}, [PoolerPool, Pooler]}}.
+    %% a list of pool configs
+    Config = case application:get_env(pooler, pools) of
+                 {ok, C} ->
+                     C;
+                 undefined ->
+                     []
+             end,
+    MetricsConfig = {metrics_mod, metrics_module()},
+    Pools = [ pooler_config:list_to_pool([MetricsConfig | L]) || L <- Config ],
+    PoolSupSpecs = [ pool_sup_spec(Pool) || Pool <- Pools ],
+    ets:new(?POOLER_GROUP_TABLE, [set, public, named_table, {write_concurrency, true}]),
+    {ok, {{one_for_one, 5, 60}, [starter_sup_spec() | PoolSupSpecs]}}.
+
+%% @doc Create a new pool from proplist pool config `PoolConfig'. The
+%% public API for this functionality is {@link pooler:new_pool/1}.
+new_pool(PoolConfig) ->
+    MetricsConfig = {metrics_mod, metrics_module()},
+    NewPool = pooler_config:list_to_pool([MetricsConfig | PoolConfig]),
+    Spec = pool_sup_spec(NewPool),
+    supervisor:start_child(?MODULE, Spec).
+
+%% @doc Shut down the named pool.
+rm_pool(Name) ->
+    SupName = pool_sup_name(Name),
+    case supervisor:terminate_child(?MODULE, SupName) of
+        {error, not_found} ->
+            ok;
+        ok ->
+            %% remove the terminated child's spec so the name can be reused
+            supervisor:delete_child(?MODULE, SupName);
+        Error ->
+            Error
+    end.
+
+starter_sup_spec() ->
+    {pooler_starter_sup, {pooler_starter_sup, start_link, []},
+     transient, 5000, supervisor, [pooler_starter_sup]}.
+
+pool_sup_spec(#pool{name = Name} = Pool) ->
+    SupName = pool_sup_name(Name),
+    {SupName, {pooler_pool_sup, start_link, [Pool]},
+     transient, 5000, supervisor, [pooler_pool_sup]}.
+
+pool_sup_name(Name) ->
+    list_to_atom("pooler_" ++ atom_to_list(Name) ++ "_pool_sup").
+
+metrics_module() ->
+    case application:get_env(pooler, metrics_module) of
+        {ok, Mod} ->
+            Mod;
+        undefined ->
+            pooler_no_metrics
+    end.

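For reference, a sketch of the application environment that init/1 above consumes, e.g. from a sys.config; pool names must be atoms and metrics_module is optional (defaulting to pooler_no_metrics). The my_member module is hypothetical:

{pooler, [
    {pools, [
        [{name, pool1},
         {max_count, 5},
         {init_count, 2},
         {start_mfa, {my_member, start_link, []}}]
    ]},
    {metrics_module, folsom_metrics}
]}
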
+ 7 - 4
test/pooler_perf_test.erl

@@ -13,7 +13,7 @@ setup(InitCount, MaxCount, NumPools) ->
                        N = integer_to_list(I),
                        Name = "p" ++ N,
                        Arg0 = "pool-" ++ Name,
-                       [{name, Name},
+                       [{name, list_to_atom(Name)},
                         {max_count, MaxCount},
                         {init_count, InitCount},
                         {start_mfa,
@@ -27,11 +27,11 @@ consumer_cycle(N) ->
     consumer_cycle(N, 0, 0).
 
 consumer_cycle(N, NumOk, NumFail) when N > 0 ->
-    P = pooler:take_member(),
+    P = pooler:take_member(p1),
     case P of
         Pid when is_pid(Pid) ->
             true = is_process_alive(P),
-            pooler:return_member(P, ok),
+            pooler:return_member(p1, P, ok),
             consumer_cycle(N - 1, NumOk + 1, NumFail);
         _ ->
             consumer_cycle(N - 1, NumOk, NumFail + 1)
@@ -84,7 +84,7 @@ pooler_take_return_test_() ->
     {foreach,
      % setup
      fun() ->
-             InitCount = 10,
+             InitCount = 100,
              MaxCount = 100,
              NumPools = 5,
              error_logger:delete_report_handler(error_logger_tty_h),
@@ -114,6 +114,9 @@ pooler_take_return_test_() ->
                    lists:foldr(fun({_, L}, {O, F}) ->
                                        {O + ?gv(ok, L), F + ?gv(fail, L)}
                                end, {0, 0}, Res),
+               %% not sure what to test here now. We expect some
+               %% failures if init count is less than max count
+               %% because of async start.
                ?assertEqual(0, NumFail),
                ?assertEqual(100*100, NumOk)
        end}

+ 272 - 76
test/pooler_test.erl → test/pooler_tests.erl

@@ -1,4 +1,4 @@
--module(pooler_test).
+-module(pooler_tests).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -28,7 +28,7 @@ user_crash(Pid) ->
     Pid ! crash.
 
 user_loop(Atom) when Atom =:= error_no_members orelse Atom =:= start ->
-    user_loop(pooler:take_member());
+    user_loop(pooler:take_member(test_pool_1));
 user_loop(MyTC) ->
     receive
         {get_tc_id, From} ->
@@ -41,11 +41,11 @@ user_loop(MyTC) ->
             From ! pooled_gs:ping_count(MyTC),
             user_loop(MyTC);
         new_tc ->
-            pooler:return_member(MyTC, ok),
-            MyNewTC = pooler:take_member(),
+            pooler:return_member(test_pool_1, MyTC, ok),
+            MyNewTC = pooler:take_member(test_pool_1),
             user_loop(MyNewTC);
         stop ->
-            pooler:return_member(MyTC, ok),
+            pooler:return_member(test_pool_1, MyTC, ok),
             stopped;
         crash ->
             erlang:error({user_loop, kaboom})
@@ -101,7 +101,7 @@ assert_tc_valid(Pid) ->
 %     user_crash(User),
 %     stop_tc(Pid1).
 
-pooler_basics_test_() ->
+pooler_basics_via_config_test_() ->
     {setup,
      fun() ->
              application:set_env(pooler, metrics_module, fake_metrics),
@@ -113,7 +113,7 @@ pooler_basics_test_() ->
     {foreach,
      % setup
      fun() ->
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 3},
                        {init_count, 2},
                        {start_mfa,
@@ -125,37 +125,73 @@ pooler_basics_test_() ->
      fun(_X) ->
              application:stop(pooler)
      end,
+     basic_tests()}}.
+
+pooler_basics_dynamic_test_() ->
+    {setup,
+     fun() ->
+             application:set_env(pooler, metrics_module, fake_metrics),
+             fake_metrics:start_link()
+     end,
+     fun(_X) ->
+             fake_metrics:stop()
+     end,
+    {foreach,
+     % setup
+     fun() ->
+             Pool = [{name, test_pool_1},
+                     {max_count, 3},
+                     {init_count, 2},
+                     {start_mfa,
+                      {pooled_gs, start_link, [{"type-0"}]}}],
+             application:unset_env(pooler, pools),
+             error_logger:delete_report_handler(error_logger_tty_h),
+             application:start(pooler),
+             pooler:new_pool(Pool)
+     end,
+     fun(_X) ->
+             application:stop(pooler)
+     end,
+     basic_tests()}}.
+
+basic_tests() ->
      [
       {"there are init_count members at start",
        fun() ->
-               Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
+               Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
                ?assertEqual(2, length(Stats))
        end},
 
       {"take and return one",
        fun() ->
-               P = pooler:take_member(),
+               P = pooler:take_member(test_pool_1),
                ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
-               ok = pooler:return_member(P, ok)
+               ok = pooler:return_member(test_pool_1, P, ok)
        end},
 
       {"take and return one, named pool",
        fun() ->
-               P = pooler:take_member("p1"),
+               P = pooler:take_member(test_pool_1),
                ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
-               ok, pooler:return_member(P)
+               ok = pooler:return_member(test_pool_1, P)
        end},
 
       {"attempt to take form unknown pool",
        fun() ->
-               ?assertEqual(error_no_pool, pooler:take_member("bad_pool_name"))
+               %% since pools are now servers, a call to an unknown pool exits with noproc
+               ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
        end},
 
-      {"pids are created on demand until max",
+      {"members creation is triggered after pool exhaustion until max",
        fun() ->
-               Pids = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
-               ?assertEqual(error_no_members, pooler:take_member()),
-               ?assertEqual(error_no_members, pooler:take_member()),
+               %% init count is 2
+               Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
+               %% since new member creation is async, we can only
+               %% assert that we will eventually get a pid, though
+               %% perhaps not on the first try.
+               Pids = get_n_pids(1, Pids0),
+               %% pool is at max now, requests should give error
+               ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
+               ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
                % no duplicates
                ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
@@ -164,20 +200,19 @@ pooler_basics_test_() ->
 
       {"pids are reused most recent return first",
        fun() ->
-               P1 = pooler:take_member(),
-               P2 = pooler:take_member(),
+               P1 = pooler:take_member(test_pool_1),
+               P2 = pooler:take_member(test_pool_1),
                ?assertNot(P1 == P2),
-               ok = pooler:return_member(P1, ok),
-               ok = pooler:return_member(P2, ok),
+               ok = pooler:return_member(test_pool_1, P1, ok),
+               ok = pooler:return_member(test_pool_1, P2, ok),
                % pids are reused most recent first
-               ?assertEqual(P2, pooler:take_member()),
-               ?assertEqual(P1, pooler:take_member())
+               ?assertEqual(P2, pooler:take_member(test_pool_1)),
+               ?assertEqual(P1, pooler:take_member(test_pool_1))
        end},
 
       {"if an in-use pid crashes it is replaced",
        fun() ->
-               Pids0 = [pooler:take_member(), pooler:take_member(),
-                        pooler:take_member()],
+               Pids0 = get_n_pids(3, []),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % crash them all
                [ pooled_gs:crash(P) || P <- Pids0 ],
@@ -189,7 +224,7 @@ pooler_basics_test_() ->
 
       {"if a free pid crashes it is replaced",
        fun() ->
-               FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
+               FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
                [ exit(P, kill) || P <- FreePids ],
                Pids1 = get_n_pids(3, []),
                ?assertEqual(3, length(Pids1))
@@ -197,10 +232,10 @@ pooler_basics_test_() ->
 
       {"if a pid is returned with bad status it is replaced",
        fun() ->
-               Pids0 = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
+               Pids0 = get_n_pids(3, []),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % return them all marking as bad
-               [ pooler:return_member(P, fail) || P <- Pids0 ],
+               [ pooler:return_member(test_pool_1, P, fail) || P <- Pids0 ],
                Pids1 = get_n_pids(3, []),
                Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
@@ -222,53 +257,190 @@ pooler_basics_test_() ->
        fun() ->
                Bogus1 = spawn(fun() -> ok end),
                Bogus2 = spawn(fun() -> ok end),
-               ?assertEqual(ok, pooler:return_member(Bogus1, ok)),
-               ?assertEqual(ok, pooler:return_member(Bogus2, fail))
+               ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus1, ok)),
+               ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus2, fail))
        end
       },
 
       {"calling return_member on error_no_members is ignored",
        fun() ->
-               ?assertEqual(ok, pooler:return_member(error_no_members)),
-               ?assertEqual(ok, pooler:return_member(error_no_members, ok)),
-               ?assertEqual(ok, pooler:return_member(error_no_members, fail))
+               ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members)),
+               ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, ok)),
+               ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, fail))
        end
       },
 
+      {"dynamic pool creation",
+       fun() ->
+               {ok, SupPid} = pooler:new_pool([{name, dyn_pool_1},
+                                               {max_count, 3},
+                                               {init_count, 2},
+                                               {start_mfa,
+                                                {pooled_gs, start_link, [{"dyn-0"}]}}]),
+               ?assert(is_pid(SupPid)),
+               M = pooler:take_member(dyn_pool_1),
+               ?assertMatch({"dyn-0", _Id}, pooled_gs:get_id(M)),
+               ?assertEqual(ok, pooler:rm_pool(dyn_pool_1)),
+               ?assertExit({noproc, _}, pooler:take_member(dyn_pool_1)),
+               %% remove non-existing pool
+               ?assertEqual(ok, pooler:rm_pool(dyn_pool_X)),
+               ?assertEqual(ok, pooler:rm_pool(dyn_pool_1))
+       end},
+
       {"metrics have been called",
        fun() ->
                %% exercise the API to ensure we have certain keys reported as metrics
                fake_metrics:reset_metrics(),
-               Pids = [ pooler:take_member() || _I <- lists:seq(1, 10) ],
-               [ pooler:return_member(P) || P <- Pids ],
-               pooler:take_member("bad_pool_name"),
+               Pids = [ pooler:take_member(test_pool_1) || _I <- lists:seq(1, 10) ],
+               [ pooler:return_member(test_pool_1, P) || P <- Pids ],
+               catch pooler:take_member(bad_pool_name),
                %% kill an unused member
                exit(hd(Pids), kill),
                %% kill a used member
-               KillMe = pooler:take_member("p1"),
+               KillMe = pooler:take_member(test_pool_1),
                exit(KillMe, kill),
                %% FIXME: We need to wait for pooler to process the
                %% exit message. This is ugly, will fix later.
                timer:sleep(200),                % :(
-               ExpectKeys = [<<"pooler.error_no_members_count">>,
-                             <<"pooler.events">>,
-                             <<"pooler.killed_free_count">>,
-                             <<"pooler.killed_in_use_count">>,
-                             <<"pooler.p1.free_count">>,
-                             <<"pooler.p1.in_use_count">>,
-                             <<"pooler.p1.take_rate">>],
+               ExpectKeys = lists:sort([<<"pooler.test_pool_1.error_no_members_count">>,
+                                        <<"pooler.test_pool_1.events">>,
+                                        <<"pooler.test_pool_1.free_count">>,
+                                        <<"pooler.test_pool_1.in_use_count">>,
+                                        <<"pooler.test_pool_1.killed_free_count">>,
+                                        <<"pooler.test_pool_1.killed_in_use_count">>,
+                                        <<"pooler.test_pool_1.take_rate">>]),
                Metrics = fake_metrics:get_metrics(),
                GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
                ?assertEqual(ExpectKeys, GotKeys)
+       end},
+
+      {"accept bad member is handled",
+       fun() ->
+               Bad = spawn(fun() -> ok end),
+               Ref = erlang:make_ref(),
+               ?assertEqual(ok, pooler:accept_member(test_pool_1, {Ref, Bad}))
+       end}
+      ].
+
+pooler_groups_test_() ->
+    {setup,
+     fun() ->
+             application:set_env(pooler, metrics_module, fake_metrics),
+             fake_metrics:start_link()
+     end,
+     fun(_X) ->
+             fake_metrics:stop()
+     end,
+    {foreach,
+     % setup
+     fun() ->
+             Pools = [[{name, test_pool_1},
+                       {group, group_1},
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-1-1"}]}}],
+                      [{name, test_pool_2},
+                       {group, group_1},
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-1-2"}]}}],
+                      %% test_pool_3 not part of the group
+                      [{name, test_pool_3},
+                       {group, undefined},
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-3"}]}}]
+                     ],
+             application:set_env(pooler, pools, Pools),
+             %% error_logger:delete_report_handler(error_logger_tty_h),
+             pg2:start(),
+             application:start(pooler)
+     end,
+     fun(_X) ->
+             application:stop(pooler),
+             application:stop(pg2)
+     end,
+     [
+      {"take and return one group member (repeated)",
+       fun() ->
+               Types = [ begin
+                             Pid = pooler:take_group_member(group_1),
+                             {Type, _} = pooled_gs:get_id(Pid),
+                             ?assertMatch("type-1" ++ _, Type),
+                             ok = pooler:return_group_member(group_1, Pid, ok),
+                             Type
+                         end
+                         || _I <- lists:seq(1, 50) ],
+               Type_1_1 = [ X || "type-1-1" = X <- Types ],
+               Type_1_2 = [ X || "type-1-2" = X <- Types ],
+               ?assert(length(Type_1_1) > 0),
+               ?assert(length(Type_1_2) > 0)
+       end},
+
+      {"take member from unknown group",
+       fun() ->
+               ?assertEqual({error_no_group, not_a_group},
+                            pooler:take_group_member(not_a_group))
+       end},
+
+      {"return member to unknown group",
+       fun() ->
+               Pid = pooler:take_group_member(group_1),
+               ?assertEqual(ok, pooler:return_group_member(no_such_group, Pid))
+       end},
+
+      {"return member to wrong group",
+       fun() ->
+               Pid = pooler:take_member(test_pool_3),
+               ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
+       end},
+
+      {"take member from empty group",
+       fun() ->
+               %% artificially empty group member list
+               [ pg2:leave(group_1, M) || M <- pg2:get_members(group_1) ],
+               ?assertEqual(error_no_members, pooler:take_group_member(group_1))
+       end},
+
+      {"return member to group, implied ok",
+       fun() ->
+               Pid = pooler:take_group_member(group_1),
+               ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
+       end},
+
+      {"return error_no_member to group",
+       fun() ->
+               ?assertEqual(ok, pooler:return_group_member(group_1, error_no_members))
+       end},
+
+
+      {"exhaust pools in group",
+       fun() ->
+               Pids = get_n_pids_group(group_1, 6, []),
+               %% they should all be pids
+               [ begin
+                     {Type, _} = pooled_gs:get_id(P),
+                     ?assertMatch("type-1" ++ _, Type),
+                     ok
+                 end || P <- Pids ],
+               %% further attempts should be error
+               [error_no_members,
+                error_no_members,
+                error_no_members] = [ pooler:take_group_member(group_1)
+                                      || _I <- lists:seq(1, 3) ]
        end}
      ]}}.
+
 
 pooler_limit_failed_adds_test_() ->
     %% verify that pooler crashes completely if too many failures are
     %% encountered while trying to add pids.
     {setup,
      fun() ->
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 10},
                        {init_count, 10},
                        {start_mfa,
@@ -280,8 +452,8 @@ pooler_limit_failed_adds_test_() ->
      end,
      fun() ->
              application:start(pooler),
-             ?assertEqual(error_no_members, pooler:take_member()),
-             ?assertEqual(error_no_members, pooler:take_member("p1"))
+             ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
+             ?assertEqual(error_no_members, pooler:take_member(test_pool_1))
      end}.
 
 pooler_scheduled_cull_test_() ->
@@ -289,13 +461,13 @@ pooler_scheduled_cull_test_() ->
      fun() ->
              application:set_env(pooler, metrics_module, fake_metrics),
              fake_metrics:start_link(),
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 10},
                        {init_count, 2},
                        {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
                        {cull_interval, {200, ms}}]],
              application:set_env(pooler, pools, Pools),
-             error_logger:delete_report_handler(error_logger_tty_h),
+             %% error_logger:delete_report_handler(error_logger_tty_h),
              application:start(pooler)
      end,
      fun(_X) ->
@@ -305,52 +477,52 @@ pooler_scheduled_cull_test_() ->
      [{"excess members are culled repeatedly",
        fun() ->
                %% take all members
-               Pids1 = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
+               Pids1 = get_n_pids(test_pool_1, 10, []),
                %% return all
-               [ pooler:return_member(P) || P <- Pids1 ],
-               ?assertEqual(10, length(pooler:pool_stats())),
+               [ pooler:return_member(test_pool_1, P) || P <- Pids1 ],
+               ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
                %% wait for longer than cull delay
                timer:sleep(250),
-               ?assertEqual(2, length(pooler:pool_stats())),
+               ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
 
                %% repeat the test to verify that culling gets rescheduled.
-               Pids2 = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
+               Pids2 = get_n_pids(test_pool_1, 10, []),
                %% return all
-               [ pooler:return_member(P) || P <- Pids2 ],
-               ?assertEqual(10, length(pooler:pool_stats())),
+               [ pooler:return_member(test_pool_1, P) || P <- Pids2 ],
+               ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
                %% wait for longer than cull delay
                timer:sleep(250),
-               ?assertEqual(2, length(pooler:pool_stats()))
+               ?assertEqual(2, length(pooler:pool_stats(test_pool_1)))
        end
       },
 
       {"non-excess members are not culled",
        fun() ->
-               [P1, P2] = [pooler:take_member("p1") || _X <- [1, 2] ],
-               [pooler:return_member(P) || P <- [P1, P2] ],
-               ?assertEqual(2, length(pooler:pool_stats())),
+               [P1, P2] = [pooler:take_member(test_pool_1) || _X <- [1, 2] ],
+               [pooler:return_member(test_pool_1, P) || P <- [P1, P2] ],
+               ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
                timer:sleep(250),
-               ?assertEqual(2, length(pooler:pool_stats()))
+               ?assertEqual(2, length(pooler:pool_stats(test_pool_1)))
        end
       },
 
       {"in-use members are not culled",
        fun() ->
                %% take all members
-               Pids = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
+               Pids = get_n_pids(test_pool_1, 10, []),
                %% don't return any
-               ?assertEqual(10, length(pooler:pool_stats())),
+               ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
                %% wait for longer than cull delay
                timer:sleep(250),
-               ?assertEqual(10, length(pooler:pool_stats())),
-               [ pooler:return_member(P) || P <- Pids ]
+               ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
+               [ pooler:return_member(test_pool_1, P) || P <- Pids ]
        end}
      ]}.
 
 random_message_test_() ->
     {setup,
      fun() ->
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 2},
                        {init_count, 1},
                        {start_mfa,
@@ -360,9 +532,9 @@ random_message_test_() ->
              application:start(pooler),
              %% now send some bogus messages
              %% do the call in a throw-away process to avoid timeout error
-             spawn(fun() -> catch gen_server:call(pooler, {unexpected_garbage_msg, 5}) end),
-             gen_server:cast(pooler, {unexpected_garbage_msg, 6}),
-            whereis(pooler) ! {unexpected_garbage_msg, 7},
+             spawn(fun() -> catch gen_server:call(test_pool_1, {unexpected_garbage_msg, 5}) end),
+             gen_server:cast(test_pool_1, {unexpected_garbage_msg, 6}),
+             whereis(test_pool_1) ! {unexpected_garbage_msg, 7},
              ok
      end,
      fun(_) ->
@@ -370,9 +542,20 @@ random_message_test_() ->
      end,
     [
      fun() ->
-             Pid = pooler:take_member("p1"),
+             Pid = spawn(fun() -> ok end),
+             MonMsg = {'DOWN', erlang:make_ref(), process, Pid, because},
+             test_pool_1 ! MonMsg
+     end,
+
+     fun() ->
+             Pid = pooler:take_member(test_pool_1),
              {Type, _} =  pooled_gs:get_id(Pid),
              ?assertEqual("type-0", Type)
+     end,
+
+     fun() ->
+             RawPool = gen_server:call(test_pool_1, dump_pool),
+             ?assertEqual(pool, element(1, RawPool))
      end
     ]}.
 
@@ -380,7 +563,7 @@ pooler_integration_test_() ->
     {foreach,
      % setup
      fun() ->
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 10},
                        {init_count, 10},
                        {start_mfa,
@@ -457,12 +640,25 @@ time_as_micros_test_() ->
% testing crash recovery is inherently racy: pids may not have crashed
% yet, or pooler may not yet have recovered. So this helper loops
% forever until N pids are obtained, ignoring error_no_members.
-get_n_pids(0, Acc) ->
-    Acc;
 get_n_pids(N, Acc) ->
-    case pooler:take_member() of
+    get_n_pids(test_pool_1, N, Acc).
+
+get_n_pids(_Pool, 0, Acc) ->
+    Acc;
+get_n_pids(Pool, N, Acc) ->
+    case pooler:take_member(Pool) of
+        error_no_members ->
+            get_n_pids(Pool, N, Acc);
+        Pid ->
+            get_n_pids(Pool, N - 1, [Pid|Acc])
+    end.
+
+get_n_pids_group(_Group, 0, Acc) ->
+    Acc;
+get_n_pids_group(Group, N, Acc) ->
+    case pooler:take_group_member(Group) of
         error_no_members ->
-            get_n_pids(N, Acc);
+            get_n_pids_group(Group, N, Acc);
         Pid ->
-            get_n_pids(N - 1, [Pid|Acc])
+            get_n_pids_group(Group, N - 1, [Pid|Acc])
     end.