
Merge branch 'multi-pool' into pooler-1-0

Seth Falcon, 12 years ago
Commit f98f3b5a08
9 changed files with 626 additions and 442 deletions
  1. README.org (+80, -39)
  2. src/pooler.erl (+238, -313)
  3. src/pooler.hrl (+50, -0)
  4. src/pooler_config.erl (+32, -0)
  5. src/pooler_pool_sup.erl (+27, -9)
  6. src/pooler_pooled_worker_sup.erl (+5, -2)
  7. src/pooler_sup.erl (+25, -6)
  8. test/pooler_perf_test.erl (+3, -3)
  9. test/pooler_tests.erl (+166, -70)

README.org (+80, -39)

@@ -11,8 +11,8 @@ with exclusive access to pool members using =pooler:take_member=.
 
 *** Protects the members of a pool from being used concurrently
 
-The main pooler interface is =pooler:take_member/0= and
-=pooler:return_member/2=.  The pooler server will keep track of which
+The main pooler interface is =pooler:take_member/1= and
+=pooler:return_member/3=.  The pooler server will keep track of which
 members are *in use* and which are *free*.  There is no need to call
 =pooler:return_member= if the consumer is a short-lived process; in
 this case, pooler will detect the consumer's normal exit and reclaim
@@ -25,25 +25,26 @@ out the member pid to another worker process.
 
 You specify an initial and a maximum number of members in the pool.
 Pooler will create new members on demand until the maximum member
-count is reached.  New pool members are added to replace member that
+count is reached.  New pool members are added to replace members that
 crash.  If a consumer crashes, the member it was using will be
 destroyed and replaced.  You can configure Pooler to periodically
-check for and remove members that have not been used recently using to
+check for and remove members that have not been used recently to
 reduce the member count back to its initial size.
 
 *** Manage multiple pools
 
-A common configuration is to have each pool contain client processes
-connected to a particular node in a cluster (think database read
-slaves).  Pooler will randomly select a pool to fetch a member from.
-If the randomly selected pool has no free members, pooler will select
-a member from the pool with the most free members.  If there is no
-pool with available members, pooler will return =error_no_members=.
-
-You can ask for a member from a specified pool using
-=pooler:take_member/1=. If ensure your code always asks for members by
-pool name, you can use pooler to pool clients for different backend
-services.
+You can use pooler to manage multiple independent pools and multiple
+grouped pools. Independent pools allow you to pool clients for
+different backend services (e.g. postgresql and redis). Grouped pools
+can optionally be accessed using =pooler:take_group_member/1= to
+provide load balancing of the pools in the group. A typical use of
+grouped pools is to have each pool contain clients connected to a
+particular node in a cluster (think database read slaves).  Pooler's
+=take_group_member= function will randomly select a pool in the group
+to fetch a member from.  If the randomly selected pool has no free
+members, pooler will attempt to obtain a member from each pool in the
+group.  If there is no pool with available members, pooler will return
+=error_no_members=.
 
 ** Motivation
 
@@ -70,6 +71,10 @@ continue in the face of Riak node failures, consumers should spread
 their requests across clients connected to each node.  The client pool
 provides an easy way to load balance.
 
+Since writing pooler, I've seen it used to pool database connections
+for PostgreSQL, MySQL, and Redis. These uses led to a redesign to
+better support multiple independent pools.
+
 ** Usage and API
 
 *** Pool Configuration
@@ -77,8 +82,9 @@ provides an easy way to load balance.
 Pool configuration is specified in the pooler application's
 environment.  This can be provided in a config file using =-config= or
 set at startup using =application:set_env(pooler, pools,
-Pools)=. Here's an example config file that creates three pools of
-Riak pb clients each talking to a different node in a local cluster:
+Pools)=. Here's an example config file that creates two pools of
+Riak pb clients each talking to a different node in a local cluster
+and one pool talking to a PostgreSQL database:
 
 #+BEGIN_SRC erlang
   % pooler.config
@@ -88,23 +94,25 @@ Riak pb clients each talking to a different node in a local cluster:
   [
    {pooler, [
            {pools, [
-                    [{name, "rc8081"},
+                    [{name, rc8081},
+                     {group, riak},
                      {max_count, 5},
                      {init_count, 2},
                      {start_mfa,
                       {riakc_pb_socket, start_link, ["localhost", 8081]}}],
 
-                    [{name, "rc8082"},
+                    [{name, rc8082},
+                     {group, riak},
                      {max_count, 5},
                      {init_count, 2},
                      {start_mfa,
                       {riakc_pb_socket, start_link, ["localhost", 8082]}}],
 
-                    [{name, "rc8083"},
-                     {max_count, 5},
+                    [{name, pg_db1},
+                     {max_count, 10},
                      {init_count, 2},
                      {start_mfa,
-                      {riakc_pb_socket, start_link, ["localhost", 8083]}}]
+                      {my_pg_sql_driver, start_link, ["db_host"]}}]
                    ]}
              %% if you want to enable metrics, set this to a module with
              %% an API conformant to the folsom_metrics module.
@@ -114,10 +122,12 @@ Riak pb clients each talking to a different node in a local cluster:
   ].
 #+END_SRC
 
-Each pool has a unique name, an initial and maximum number of members,
+Each pool has a unique name (specified as an atom), an initial and maximum number of members,
 and an ={M, F, A}= describing how to start members of the pool.  When
 pooler starts, it will create members in each pool according to
-=init_count=.
+=init_count=. Optionally, you can indicate that a pool is part of a
+group. You can use pooler to load balance across pools labeled with
+the same group tag.
 
 **** Culling stale members
 
@@ -135,7 +145,7 @@ examples are valid:
 #+END_SRC
 
 The =cull_interval= determines the schedule when a check will be made
-for stale members. Checks are scheduling using =erlang:send_after/3=
+for stale members. Checks are scheduled using =erlang:send_after/3=
 which provides a light-weight timing mechanism. The next check is
 scheduled after the prior check completes.
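 
 For illustration, a pool entry combining these settings might look
 like this (a sketch: =cull_interval= and =max_age= take the
 =time_spec()= values defined in pooler.hrl, and =my_pg_sql_driver= is
 the hypothetical module from the config example above):
 
 #+BEGIN_SRC erlang
   %% check every minute; cull free members idle for more than 30
   %% seconds, down to init_count
   [{name, pg_db1},
    {max_count, 10},
    {init_count, 2},
    {start_mfa, {my_pg_sql_driver, start_link, ["db_host"]}},
    {cull_interval, {1, min}},
    {max_age, {30, sec}}]
 #+END_SRC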
 
@@ -163,23 +173,29 @@ Here's an example session:
 
 #+BEGIN_SRC erlang
 application:start(pooler).
-P = pooler:take_member(),
+P = pooler:take_member(mysql),
 % use P
-pooler:return_member(P, ok).
+pooler:return_member(mysql, P, ok).
 #+END_SRC
 
 Once started, the main interaction you will have with pooler is
-through two functions, =take_member/0= (or =take_member/1=) and
-=return_member/2= (or =return_member/1=).
-
-Call =pooler:take_member()= to obtain a member from a randomly
-selected pool.  When you are done with it, return it to the pool using
-=pooler:return_member(Pid, ok)=.  If you encountered an error using
-the member, you can pass =fail= as the second argument.  In this case,
-pooler will permanently remove that member from the pool and start a
-new member to replace it.  If your process is short lived, you can
-omit the call to =return_member=.  In this case, pooler will detect
-the normal exit of the consumer and reclaim the member.
+through two functions, =take_member/1= and =return_member/3= (or
+=return_member/2=).
+
+Call =pooler:take_member(Pool)= to obtain the pid belonging to a
+member of the pool =Pool=.  When you are done with it, return it to
+the pool using =pooler:return_member(Pool, Pid, ok)=.  If you
+encountered an error using the member, you can pass =fail= as the
+third argument.  In this case, pooler will permanently remove that
+member from the pool and start a new member to replace it.  If your
+process is short lived, you can omit the call to =return_member=.  In
+this case, pooler will detect the normal exit of the consumer and
+reclaim the member.
+
+If you would like to obtain a member from a randomly selected pool in
+a group, call =pooler:take_group_member(Group)=. This returns a member
+pid or =error_no_members=. Return such a member using
+=pooler:return_group_member/2= or =pooler:return_group_member/3=;
+pooler tracks which pool each group member came from.
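+
+For example, with the =riak= group from the configuration above (a
+sketch):
+
+#+BEGIN_SRC erlang
+P = pooler:take_group_member(riak),
+%% use P
+pooler:return_group_member(riak, P, ok).
+#+END_SRC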
 
 *** pooler as an included application
 
@@ -191,7 +207,7 @@ cause problems. One way to work around this is to specify pooler as an
 included application in your app. This means you will call pooler's
 top-level supervisor in your app's top-level supervisor and can regain
 control over the application start order. To do this, you would remove
-pooler from the list of applications in your_app.app add
+pooler from the list of applications in your_app.app and add
 it to the included_application key:
 
 #+BEGIN_SRC erlang
@@ -265,6 +281,31 @@ When enabled, the following metrics will be tracked:
    ok
    #+END_EXAMPLE
 
+** Implementation Notes
+*** Overview of supervision
+
+The top-level supervisor is pooler_sup. It supervises one supervisor
+for each pool configured in pooler's app config.
+
+At startup, a pooler_NAME_pool_sup is started for each pool described in
+pooler's app config with NAME matching the name attribute of the
+config.
+
+The pooler_NAME_pool_sup starts the pooler gen_server for the pool,
+which registers under the pool's name, as well as a
+pooler_pooled_worker_sup (registered as pooler_NAME_member_sup) that
+is used to start and supervise the members of this pool.
+
+pooler_sup:                one_for_one
+pooler_NAME_pool_sup:      one_for_all
+pooler_pooled_worker_sup:  simple_one_for_one
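+
+For example, a pool named =rc8081= yields these locally registered
+names (a sketch based on pool_sup_name/1 and member_sup_name/1 in
+pooler_pool_sup.erl below):
+
+#+BEGIN_EXAMPLE
+pooler_rc8081_pool_sup     (the one_for_all pool supervisor)
+pooler_rc8081_member_sup   (the simple_one_for_one member supervisor)
+#+END_EXAMPLE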
+
+Groups of pools are managed using the pg2 module. This imposes a
+requirement to set a configuration parameter on the kernel application
+in an OTP release, for example in sys.config:
+#+BEGIN_SRC erlang
+{kernel, [{start_pg2, true}]}
+#+END_SRC
+
 ** License
 Pooler is licensed under the Apache License Version 2.0.  See the
 [[file:LICENSE][LICENSE]] file for details.

src/pooler.erl (+238, -313)

@@ -10,21 +10,12 @@
 %%
 -module(pooler).
 -behaviour(gen_server).
--define(SERVER, ?MODULE).
-
--define(DEFAULT_ADD_RETRY, 1).
--define(DEFAULT_CULL_INTERVAL, {0, min}).
--define(DEFAULT_MAX_AGE, {0, min}).
 
+-include("pooler.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
--type member_info() :: {string(), free | pid(), {_, _, _}}.
--type free_member_info() :: {string(), free, {_, _, _}}.
--type time_unit() :: min | sec | ms | mu.
--type time_spec() :: {non_neg_integer(), time_unit()}.
 
 %% type specs for pool metrics
--type metric_label() :: binary().
 -type metric_value() :: 'unknown_pid' |
                         non_neg_integer() |
                         {'add_pids_failed', non_neg_integer(), non_neg_integer()} |
@@ -32,56 +23,18 @@
                         'error_no_members'.
 -type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
 
--record(pool, {
-          name             :: string(),
-          max_count = 100  :: non_neg_integer(),
-          init_count = 10  :: non_neg_integer(),
-          start_mfa        :: {atom(), atom(), [term()]},
-          free_pids = []   :: [pid()],
-          in_use_count = 0 :: non_neg_integer(),
-          free_count = 0   :: non_neg_integer(),
-          %% The number times to attempt adding a pool member if the
-          %% pool size is below max_count and there are no free
-          %% members. After this many tries, error_no_members will be
-          %% returned by a call to take_member. NOTE: this value
-          %% should be >= 2 or else the pool will not grow on demand
-          %% when max_count is larger than init_count.
-          add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer(),
-
-          %% The interval to schedule a cull message. Both
-          %% 'cull_interval' and 'max_age' are specified using a
-          %% `time_spec()' type.
-          cull_interval = ?DEFAULT_CULL_INTERVAL :: time_spec(),
-          %% The maximum age for members.
-          max_age = ?DEFAULT_MAX_AGE             :: time_spec()
-         }).
-
--record(state, {
-          npools                       :: non_neg_integer(),
-          pools = dict:new()           :: dict(),
-          pool_sups = dict:new()       :: dict(),
-          all_members = dict:new()     :: dict(),
-          consumer_to_pid = dict:new() :: dict(),
-          pool_selector                :: array()
-         }).
-
--define(gv(X, Y), proplists:get_value(X, Y)).
--define(gv(X, Y, D), proplists:get_value(X, Y, D)).
-
 %% ------------------------------------------------------------------
 %% API Function Exports
 %% ------------------------------------------------------------------
 
--export([start/1,
-         start_link/1,
-         stop/0,
-         take_member/0,
+-export([start_link/1,
          take_member/1,
-         return_member/1,
+         take_group_member/1,
+         return_group_member/2,
+         return_group_member/3,
          return_member/2,
-         % remove_pool/2,
-         % add_pool/1,
-         pool_stats/0]).
+         return_member/3,
+         pool_stats/1]).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Exports
@@ -103,142 +56,167 @@
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
-start_link(Config) ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, Config, []).
-
-start(Config) ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, Config, []).
-
-stop() ->
-    gen_server:call(?SERVER, stop).
-
-%% @doc Obtain exclusive access to a member from a randomly selected pool.
-%%
-%% If there are no free members in the randomly selected pool, then a
-%% member will be returned from the pool with the most free members.
-%% If no free members are available, 'error_no_members' is returned.
-%%
--spec take_member() -> pid() | error_no_members.
-take_member() ->
-    gen_server:call(?SERVER, take_member, infinity).
+start_link(#pool{name = Name} = Pool) ->
+    gen_server:start_link({local, Name}, ?MODULE, Pool, []).
 
 %% @doc Obtain exclusive access to a member from `PoolName'.
 %%
 %% If no free members are available, 'error_no_members' is returned.
 %%
--spec take_member(string()) -> pid() | error_no_members | error_no_pool.
-take_member(PoolName) when is_list(PoolName) ->
-    gen_server:call(?SERVER, {take_member, PoolName}, infinity).
+-spec take_member(atom() | pid()) -> pid() | error_no_members.
+take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
+    gen_server:call(PoolName, take_member, infinity).
+
+%% @doc Take a member from a randomly selected pool in the group
+%% `GroupName'. Returns `MemberPid' or `error_no_members'.  If no
+%% members are available in the randomly chosen pool, all other pools
+%% in the group are tried in order.
+-spec take_group_member(atom()) -> pid() | error_no_members |
+                                   {error, {no_such_group, atom()}}.
+take_group_member(GroupName) ->
+    case pg2:get_local_members(GroupName) of
+        {error, {no_such_group, GroupName}} = Error ->
+            Error;
+        Members ->
+            %% Put a random member at the front of the list and then
+            %% return the first member you can while walking the list.
+            Idx = crypto:rand_uniform(1, length(Members) + 1),
+            {Pid, Rest} = extract_nth(Idx, Members),
+            take_first_member([Pid | Rest])
+    end.
+
+take_first_member([Pid | Rest]) ->
+    case take_member(Pid) of
+        error_no_members ->
+            take_first_member(Rest);
+        Member ->
+            ets:insert(?POOLER_GROUP_TABLE, {Member, Pid}),
+            Member
+    end;
+take_first_member([]) ->
+    error_no_members.
+
+%% this helper function returns `{Nth_Elt, Rest}' where `Nth_Elt' is
+%% the nth element of `L' and `Rest' is `L -- [Nth_Elt]'.
+extract_nth(N, L) ->
+    extract_nth(N, L, []).
+
+extract_nth(1, [H | T], Acc) ->
+    {H, Acc ++ T};
+extract_nth(N, [H | T], Acc) ->
+    extract_nth(N - 1, T, [H | Acc]);
+extract_nth(_, [], _) ->
+    error(badarg).
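+
+%% For example: extract_nth(2, [a, b, c]) =:= {b, [a, c]}. Note that
+%% the order of `Rest' is not preserved.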
+
+%% @doc Return a member that was taken from the group
+%% `GroupName'. This is a convenience function for
+%% `return_group_member/3' with `Status' of `ok'.
+-spec return_group_member(atom(), pid() | error_no_members) -> ok.
+return_group_member(GroupName, MemberPid) ->
+    return_group_member(GroupName, MemberPid, ok).
+
+%% @doc Return a member that was taken from the group `GroupName'. If
+%% `Status' is `ok' the member is returned to the pool from which it
+%% came. If `Status' is `fail' the member will be terminated and a new
+%% member added to the appropriate pool.
+-spec return_group_member(atom(), pid() | error_no_members, ok | fail) -> ok.
+return_group_member(_, error_no_members, _) ->
+    ok;
+return_group_member(_GroupName, MemberPid, Status) ->
+    case ets:lookup(?POOLER_GROUP_TABLE, MemberPid) of
+        [{MemberPid, PoolPid}] ->
+            return_member(PoolPid, MemberPid, Status);
+        [] ->
+            ok
+    end.
 
 %% @doc Return a member to the pool so it can be reused.
 %%
 %% If `Status' is 'ok', the member is returned to the pool.  If
 %% `Status' is 'fail', the member is destroyed and a new member is
 %% added to the pool in its place.
--spec return_member(pid() | error_no_members, ok | fail) -> ok.
-return_member(Pid, Status) when is_pid(Pid) andalso
-                                (Status =:= ok orelse Status =:= fail) ->
-    gen_server:call(?SERVER, {return_member, Pid, Status}, infinity),
+-spec return_member(atom() | pid(), pid() | error_no_members, ok | fail) -> ok.
+return_member(PoolName, Pid, Status) when is_pid(Pid) andalso
+                                          (is_atom(PoolName) orelse
+                                           is_pid(PoolName)) andalso
+                                          (Status =:= ok orelse
+                                           Status =:= fail) ->
+    gen_server:call(PoolName, {return_member, Pid, Status}, infinity),
     ok;
-return_member(error_no_members, _) ->
+return_member(_, error_no_members, _) ->
     ok.
 
 %% @doc Return a member to the pool so it can be reused.
 %%
--spec return_member(pid() | error_no_members) -> ok.
-return_member(Pid) when is_pid(Pid) ->
-    gen_server:call(?SERVER, {return_member, Pid, ok}, infinity),
+-spec return_member(atom() | pid(), pid() | error_no_members) -> ok.
+return_member(PoolName, Pid) when is_pid(Pid) andalso
+                                  (is_atom(PoolName) orelse is_pid(PoolName)) ->
+    gen_server:call(PoolName, {return_member, Pid, ok}, infinity),
     ok;
-return_member(error_no_members) ->
+return_member(_, error_no_members) ->
     ok.
 
-% TODO:
-% remove_pool(Name, How) when How == graceful; How == immediate ->
-%     gen_server:call(?SERVER, {remove_pool, Name, How}).
-
-% TODO:
-% add_pool(Pool) ->
-%     gen_server:call(?SERVER, {add_pool, Pool}).
-
 %% @doc Obtain runtime state info for all pools.
 %%
 %% Format of the return value is subject to change.
--spec pool_stats() -> [tuple()].
-pool_stats() ->
-    gen_server:call(?SERVER, pool_stats).
+-spec pool_stats(atom() | pid()) -> [tuple()].
+pool_stats(PoolName) ->
+    gen_server:call(PoolName, pool_stats).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
 
--spec init([any()]) -> {'ok', #state{npools::'undefined' | non_neg_integer(),
-                                     pools::dict(),
-                                     pool_sups::dict(),
-                                     all_members::dict(),
-                                     consumer_to_pid::dict(),
-                                     pool_selector::'undefined' | array()}}.
-init(Config) ->
+-spec init(#pool{}) -> {'ok', #pool{}, 0}.
+init(#pool{}=Pool) ->
+    %% FIXME: change to a monitor only model so that this doesn't have
+    %% to be a system process.
     process_flag(trap_exit, true),
-    PoolRecs = [ props_to_pool(P) || P <- ?gv(pools, Config) ],
-    Pools = [ {Pool#pool.name, Pool} || Pool <-  PoolRecs ],
-    PoolSups = [ begin
-                  {ok, SupPid} = supervisor:start_child(pooler_pool_sup, [MFA]),
-                  {Name, SupPid}
-                 end || #pool{name = Name, start_mfa = MFA} <- PoolRecs ],
-    State0 = #state{npools = length(Pools),
-                    pools = dict:from_list(Pools),
-                    pool_sups = dict:from_list(PoolSups),
-                    pool_selector = array:from_list([PN || {PN, _} <- Pools])
-                  },
-
-    lists:foldl(fun(#pool{name = PName, init_count = N}, {ok, AccState}) ->
-                        AccState1 = cull_members(PName, AccState),
-                        add_pids(PName, N, AccState1)
-                end, {ok, State0}, PoolRecs).
-
-handle_call(take_member, {CPid, _Tag},
-            #state{pool_selector = PS, npools = NP} = State) ->
-    % attempt to return a member from a randomly selected pool.  If
-    % that pool has no members, find the pool with most free members
-    % and return a member from there.
-    PoolName = array:get(crypto:rand_uniform(0, NP), PS),
-    case take_member(PoolName, CPid, State) of
-        {error_no_members, NewState} ->
-            case max_free_pool(State#state.pools) of
-                error_no_members ->
-                    {reply, error_no_members, NewState};
-                MaxFreePoolName ->
-                    {NewPid, State2} = take_member(MaxFreePoolName, CPid,
-                                                   NewState),
-                    {reply, NewPid, State2}
-            end;
-        {NewPid, NewState} ->
-            {reply, NewPid, NewState}
-    end;
-handle_call({take_member, PoolName}, {CPid, _Tag}, #state{} = State) ->
-    {Member, NewState} = take_member(PoolName, CPid, State),
-    {reply, Member, NewState};
-handle_call({return_member, Pid, Status}, {_CPid, _Tag}, State) ->
-    {reply, ok, do_return_member(Pid, Status, State)};
-handle_call(stop, _From, State) ->
-    {stop, normal, stop_ok, State};
-handle_call(pool_stats, _From, State) ->
-    {reply, dict:to_list(State#state.all_members), State};
-handle_call(_Request, _From, State) ->
-    {noreply, State}.
+    #pool{init_count = N} = Pool,
+    MemberSup = pooler_pool_sup:member_sup_name(Pool),
+    Pool1 = set_member_sup(Pool, MemberSup),
+    Pool2 = cull_members_from_pool(Pool1),
+    {ok, NewPool} = add_pids(N, Pool2),
+    %% trigger an immediate timeout, handled by handle_info to allow
+    %% us to register with pg2. We use the timeout mechanism to ensure
+    %% that a server is added to a group only when it is ready to
+    %% process messages.
+    {ok, NewPool, 0}.
+
+set_member_sup(#pool{} = Pool, MemberSup) ->
+    Pool#pool{member_sup = MemberSup}.
+
+handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
+    Retries = pool_add_retries(Pool),
+    {Member, NewPool} = take_member_from_pool(Pool, CPid, Retries),
+    {reply, Member, NewPool};
+handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
+    {reply, ok, do_return_member(Pid, Status, Pool)};
+handle_call(stop, _From, Pool) ->
+    {stop, normal, stop_ok, Pool};
+handle_call(pool_stats, _From, Pool) ->
+    {reply, dict:to_list(Pool#pool.all_members), Pool};
+handle_call(_Request, _From, Pool) ->
+    {noreply, Pool}.
 
 -spec handle_cast(_,_) -> {'noreply', _}.
-handle_cast(_Msg, State) ->
-    {noreply, State}.
+handle_cast(_Msg, Pool) ->
+    {noreply, Pool}.
 
 -spec handle_info(_, _) -> {'noreply', _}.
+handle_info(timeout, #pool{group = undefined} = Pool) ->
+    %% ignore
+    {noreply, Pool};
+handle_info(timeout, #pool{group = Group} = Pool) ->
+    ok = pg2:create(Group),
+    ok = pg2:join(Group, self()),
+    {noreply, Pool};
 handle_info({'EXIT', Pid, Reason}, State) ->
     State1 =
-        case dict:find(Pid, State#state.all_members) of
+        case dict:find(Pid, State#pool.all_members) of
             {ok, {_PoolName, _ConsumerPid, _Time}} ->
                 do_return_member(Pid, fail, State);
             error ->
-                case dict:find(Pid, State#state.consumer_to_pid) of
+                case dict:find(Pid, State#pool.consumer_to_pid) of
                     {ok, Pids} ->
                         IsOk = case Reason of
                                    normal -> ok;
@@ -252,8 +230,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
                 end
         end,
     {noreply, State1};
-handle_info({cull_pool, PoolName}, State) ->
-    {noreply, cull_members(PoolName, State)};
+handle_info(cull_pool, Pool) ->
+    {noreply, cull_members_from_pool(Pool)};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -269,31 +247,19 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
--spec props_to_pool([{atom(), term()}]) -> #pool{}.
-props_to_pool(P) ->
-    #pool{      name = ?gv(name, P),
-           max_count = ?gv(max_count, P),
-          init_count = ?gv(init_count, P),
-           start_mfa = ?gv(start_mfa, P),
-    add_member_retry = ?gv(add_member_retry, P, ?DEFAULT_ADD_RETRY),
-       cull_interval = ?gv(cull_interval, P, ?DEFAULT_CULL_INTERVAL),
-             max_age = ?gv(max_age, P, ?DEFAULT_MAX_AGE)}.
-
 % FIXME: creation of new pids should probably happen
 % in a spawned process to avoid tying up the loop.
--spec add_pids(error | string(), non_neg_integer(), #state{}) ->
-    {bad_pool_name | max_count_reached | ok, #state{}}.
-add_pids(error, _N, State) ->
-    {bad_pool_name, State};
-add_pids(PoolName, N, State) ->
-    #state{pools = Pools, all_members = AllMembers} = State,
-    Pool = fetch_pool(PoolName, Pools),
+-spec add_pids(non_neg_integer(), #pool{}) ->
+    {max_count_reached | ok, #pool{}}.
+add_pids(N, Pool) ->
     #pool{max_count = Max, free_pids = Free,
-          in_use_count = NumInUse, free_count = NumFree} = Pool,
+          in_use_count = NumInUse, free_count = NumFree,
+          member_sup = PoolSup,
+          all_members = AllMembers} = Pool,
     Total = NumFree + NumInUse,
+    PoolName = Pool#pool.name,
     case Total + N =< Max of
         true ->
-            PoolSup = dict:fetch(PoolName, State#state.pool_sups),
             {AllMembers1, NewPids} = start_n_pids(N, PoolName, PoolSup,
                                                   AllMembers),
             %% start_n_pids may return fewer than N if errors were
@@ -302,110 +268,105 @@ add_pids(PoolName, N, State) ->
             case NewPidCount =:= N of
                 true -> ok;
                 false ->
-                    error_logger:error_msg("tried to add ~B members, only added ~B~n",
-                                           [N, NewPidCount]),
-                    send_metric(<<"pooler.events">>,
+                    error_logger:error_msg("pool '~s' tried to add ~B members, only added ~B~n",
+                                           [PoolName, N, NewPidCount]),
+                    %% consider changing this to a count?
+                    send_metric(Pool, events,
                                 {add_pids_failed, N, NewPidCount}, history)
             end,
             Pool1 = Pool#pool{free_pids = Free ++ NewPids,
                               free_count = length(Free) + NewPidCount},
-            {ok, State#state{pools = store_pool(PoolName, Pool1, Pools),
-                             all_members = AllMembers1}};
+            {ok, Pool1#pool{all_members = AllMembers1}};
         false ->
-            {max_count_reached, State}
+            {max_count_reached, Pool}
     end.
 
--spec take_member(string(), {pid(), _}, #state{}) ->
-    {error_no_pool | error_no_members | pid(), #state{}}.
-take_member(PoolName, From, #state{pools = Pools} = State) ->
-    Pool = fetch_pool(PoolName, Pools),
-    take_member_from_pool(Pool, From, State, pool_add_retries(Pool)).
-
--spec take_member_from_pool(error_no_pool | #pool{}, {pid(), term()}, #state{},
+-spec take_member_from_pool(#pool{}, {pid(), term()},
                             non_neg_integer()) ->
-                                   {error_no_pool | error_no_members | pid(), #state{}}.
-take_member_from_pool(error_no_pool, _From, State, _) ->
-    {error_no_pool, State};
-take_member_from_pool(#pool{name = PoolName,
-                            max_count = Max,
+                                   {error_no_members | pid(), #pool{}}.
+take_member_from_pool(#pool{max_count = Max,
                             free_pids = Free,
                             in_use_count = NumInUse,
-                            free_count = NumFree} = Pool,
+                            free_count = NumFree,
+                            consumer_to_pid = CPMap} = Pool,
                       From,
-                      #state{pools = Pools, consumer_to_pid = CPMap} = State,
                       Retries) ->
-    send_metric(pool_metric(PoolName, take_rate), 1, meter),
+    send_metric(Pool, take_rate, 1, meter),
     case Free of
         [] when NumInUse =:= Max ->
-            send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
-            send_metric(<<"pooler.events">>, error_no_members, history),
-            {error_no_members, State};
+            send_metric(Pool, error_no_members_count, {inc, 1}, counter),
+            send_metric(Pool, events, error_no_members, history),
+            {error_no_members, Pool};
         [] when NumInUse < Max andalso Retries > 0 ->
-            case add_pids(PoolName, 1, State) of
-                {ok, State1} ->
+            case add_pids(1, Pool) of
+                {ok, Pool1} ->
                     %% add_pids may have updated our pool
-                    Pool1 = fetch_pool(PoolName, State1#state.pools),
-                    take_member_from_pool(Pool1, From, State1, Retries - 1);
+                    take_member_from_pool(Pool1, From, Retries - 1);
                 {max_count_reached, _} ->
-                    send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
-                    send_metric(<<"pooler.events">>, error_no_members, history),
-                    {error_no_members, State}
+                    send_metric(Pool, error_no_members_count, {inc, 1}, counter),
+                    send_metric(Pool, events, error_no_members, history),
+                    {error_no_members, Pool}
             end;
         [] when Retries =:= 0 ->
             %% max retries reached
-            send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
-            {error_no_members, State};
+            send_metric(Pool, error_no_members_count, {inc, 1}, counter),
+            send_metric(Pool, events, error_no_members, history),
+            {error_no_members, Pool};
         [Pid|Rest] ->
             erlang:link(From),
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
                               free_count = NumFree - 1},
-            send_metric(pool_metric(PoolName, in_use_count), Pool1#pool.in_use_count, histogram),
-            send_metric(pool_metric(PoolName, free_count), Pool1#pool.free_count, histogram),
-            {Pid, State#state{
-                    pools = store_pool(PoolName, Pool1, Pools),
+            send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
+            send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
+            {Pid, Pool1#pool{
                     consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
                     all_members = set_cpid_for_member(Pid, From,
-                                                      State#state.all_members)
+                                                      Pool1#pool.all_members)
                    }}
     end.
 
--spec do_return_member(pid(), ok | fail, #state{}) -> #state{}.
-do_return_member(Pid, ok, #state{all_members = AllMembers} = State) ->
+-spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
+do_return_member(Pid, ok, #pool{all_members = AllMembers} = Pool) ->
+    clean_group_table(Pid, Pool),
     case dict:find(Pid, AllMembers) of
         {ok, {PoolName, CPid, _}} ->
-            Pool = fetch_pool(PoolName, State#state.pools),
             #pool{free_pids = Free, in_use_count = NumInUse,
                   free_count = NumFree} = Pool,
             Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1,
                               free_count = NumFree + 1},
             Entry = {PoolName, free, os:timestamp()},
-            State#state{pools = store_pool(PoolName, Pool1, State#state.pools),
-                        all_members = store_all_members(Pid, Entry, AllMembers),
-                        consumer_to_pid = cpmap_remove(Pid, CPid,
-                                                       State#state.consumer_to_pid)};
+            Pool1#pool{all_members = store_all_members(Pid, Entry, AllMembers),
+                       consumer_to_pid = cpmap_remove(Pid, CPid,
+                                                      Pool1#pool.consumer_to_pid)};
         error ->
-            State
+            Pool
     end;
-do_return_member(Pid, fail, #state{all_members = AllMembers} = State) ->
+do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
     % for the fail case, perhaps the member crashed and was already
     % removed, so use find instead of fetch and ignore missing.
+    clean_group_table(Pid, Pool),
     case dict:find(Pid, AllMembers) of
-        {ok, {PoolName, _, _}} ->
-            State1 = remove_pid(Pid, State),
-            case add_pids(PoolName, 1, State1) of
-                {Status, State2} when Status =:= ok;
-                                      Status =:= max_count_reached ->
-                    State2;
+        {ok, {_PoolName, _, _}} ->
+            Pool1 = remove_pid(Pid, Pool),
+            case add_pids(1, Pool1) of
+                {Status, Pool2} when Status =:= ok;
+                                     Status =:= max_count_reached ->
+                    Pool2;
                 {Status, _} ->
                     erlang:error({error, "unexpected return from add_pid",
                                   Status, erlang:get_stacktrace()}),
-                    send_metric(<<"pooler.events">>, bad_return_from_add_pid,
+                    send_metric(Pool1, events, bad_return_from_add_pid,
                                 history)
             end;
         error ->
-            State
+            Pool
     end.
 
+clean_group_table(_MemberPid, #pool{group = undefined}) ->
+    ok;
+clean_group_table(MemberPid, #pool{group = _GroupName}) ->
+    ets:delete(?POOLER_GROUP_TABLE, MemberPid).
+
 % @doc Remove `Pid' from the pid list associated with `CPid' in the
 % consumer to member map given by `CPMap'.
 %
@@ -436,58 +397,40 @@ cpmap_remove(Pid, CPid, CPMap) ->
 % Handles in-use and free members.  Logs an error if the pid is not
 % tracked in state.all_members.
 %
--spec remove_pid(pid(), #state{}) -> #state{}.
-remove_pid(Pid, State) ->
-    #state{all_members = AllMembers, pools = Pools,
-           consumer_to_pid = CPMap} = State,
+-spec remove_pid(pid(), #pool{}) -> #pool{}.
+remove_pid(Pid, Pool) ->
+    #pool{name = PoolName,
+          all_members = AllMembers,
+          consumer_to_pid = CPMap} = Pool,
     case dict:find(Pid, AllMembers) of
         {ok, {PoolName, free, _Time}} ->
             % remove an unused member
-            Pool = fetch_pool(PoolName, Pools),
             FreePids = lists:delete(Pid, Pool#pool.free_pids),
             NumFree = Pool#pool.free_count - 1,
             Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
             exit(Pid, kill),
-            send_metric(<<"pooler.killed_free_count">>, {inc, 1}, counter),
-            State#state{pools = store_pool(PoolName, Pool1, Pools),
-                        all_members = dict:erase(Pid, AllMembers)};
+            send_metric(Pool1, killed_free_count, {inc, 1}, counter),
+            Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
         {ok, {PoolName, CPid, _Time}} ->
-            Pool = fetch_pool(PoolName, Pools),
             Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
             exit(Pid, kill),
-            send_metric(<<"pooler.killed_in_use_count">>, {inc, 1}, counter),
-            State#state{pools = store_pool(PoolName, Pool1, Pools),
-                        consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
-                        all_members = dict:erase(Pid, AllMembers)};
+            send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
+            Pool1#pool{consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
+                       all_members = dict:erase(Pid, AllMembers)};
         error ->
-            error_logger:error_report({unknown_pid, Pid,
+            error_logger:error_report({{pool, PoolName}, unknown_pid, Pid,
                                        erlang:get_stacktrace()}),
-            send_metric(<<"pooler.event">>, unknown_pid, history),
-            State
-    end.
-
--spec max_free_pool(dict()) -> error_no_members | string().
-max_free_pool(Pools) ->
-    case dict:fold(fun fold_max_free_count/3, {"", 0}, Pools) of
-        {"", 0} -> error_no_members;
-        {MaxFreePoolName, _} -> MaxFreePoolName
-    end.
-
--spec fold_max_free_count(string(), #pool{}, {string(), non_neg_integer()}) ->
-    {string(), non_neg_integer()}.
-fold_max_free_count(Name, Pool, {CName, CMax}) ->
-    case Pool#pool.free_count > CMax of
-        true -> {Name, Pool#pool.free_count};
-        false -> {CName, CMax}
+            send_metric(Pool, events, unknown_pid, history),
+            Pool
     end.
 
-
--spec start_n_pids(non_neg_integer(), string(), pid(), dict()) ->
+-spec start_n_pids(non_neg_integer(), atom() | pid(), pid(), dict()) ->
     {dict(), [pid()]}.
 start_n_pids(N, PoolName, PoolSup, AllMembers) ->
     NewPids = do_n(N, fun(Acc) ->
                               case supervisor:start_child(PoolSup, []) of
                                   {ok, Pid} ->
+                                      %% FIXME: we should monitor instead
                                       erlang:link(Pid),
                                       [Pid | Acc];
                                   _Else ->
@@ -506,22 +449,8 @@ do_n(0, _Fun, Acc) ->
 do_n(N, Fun, Acc) ->
     do_n(N - 1, Fun, Fun(Acc)).
 
-
--spec fetch_pool(string(), dict()) -> #pool{} | error_no_pool.
-fetch_pool(PoolName, Pools) ->
-    case dict:find(PoolName, Pools) of
-        {ok, Pool} -> Pool;
-        error -> error_no_pool
-    end.
-
 pool_add_retries(#pool{add_member_retry = Retries}) ->
-    Retries;
-pool_add_retries(error_no_pool) ->
-    0.
-
--spec store_pool(string(), #pool{}, dict()) -> dict().
-store_pool(PoolName, Pool = #pool{}, Pools) ->
-    dict:store(PoolName, Pool, Pools).
+    Retries.
 
 -spec store_all_members(pid(),
                         {string(), free | pid(), {_, _, _}}, dict()) -> dict().
@@ -539,39 +468,34 @@ set_cpid_for_member(MemberPid, CPid, AllMembers) ->
 add_member_to_consumer(MemberPid, CPid, CPMap) ->
     dict:update(CPid, fun(O) -> [MemberPid|O] end, [MemberPid], CPMap).
 
--spec cull_members(string(), #state{}) -> #state{}.
-cull_members(PoolName, #state{pools = Pools} = State) ->
-    cull_members_from_pool(fetch_pool(PoolName, Pools), State).
-
--spec cull_members_from_pool(#pool{}, #state{}) -> #state{}.
-cull_members_from_pool(error_no_pool, State) ->
-    State;
-cull_members_from_pool(#pool{cull_interval = {0, _}}, State) ->
+-spec cull_members_from_pool(#pool{}) -> #pool{}.
+cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->
     %% 0 cull_interval means do not cull
-    State;
+    Pool;
 cull_members_from_pool(#pool{name = PoolName,
                              free_count = FreeCount,
                              init_count = InitCount,
                              in_use_count = InUseCount,
                              cull_interval = Delay,
-                             max_age = MaxAge} = Pool,
-                       #state{all_members = AllMembers} = State) ->
+                             max_age = MaxAge,
+                             all_members = AllMembers} = Pool) ->
     MaxCull = FreeCount - (InitCount - InUseCount),
-    State1 = case MaxCull > 0 of
-                 true ->
-                     MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
-                     ExpiredMembers =
-                         expired_free_members(MemberInfo, os:timestamp(), MaxAge),
-                     CullList = lists:sublist(ExpiredMembers, MaxCull),
-                     lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
-                                 State, CullList);
-                 false ->
-                     State
-             end,
+    Pool1 = case MaxCull > 0 of
+                true ->
+                    MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
+                    ExpiredMembers =
+                        expired_free_members(MemberInfo, os:timestamp(), MaxAge),
+                    CullList = lists:sublist(ExpiredMembers, MaxCull),
+                    lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
+                                Pool, CullList);
+                false ->
+                    Pool
+            end,
     schedule_cull(PoolName, Delay),
-    State1.
+    Pool1.
 
--spec schedule_cull(PoolName :: string(), Delay :: time_spec()) -> reference().
+-spec schedule_cull(PoolName :: atom() | pid(),
+                    Delay :: time_spec()) -> reference().
 %% @doc Schedule a pool cleaning or "cull" for `PoolName' in which
 %% members older than `max_age' will be removed until the pool has
 %% `init_count' members. Uses `erlang:send_after/3' for light-weight
@@ -580,7 +504,7 @@ schedule_cull(PoolName, Delay) ->
     DelayMillis = time_as_millis(Delay),
     %% use pid instead of server name atom to take advantage of
     %% automatic cancelling
-    erlang:send_after(DelayMillis, self(), {cull_pool, PoolName}).
+    erlang:send_after(DelayMillis, PoolName, cull_pool).
 
 -spec member_info([pid()], dict()) -> [{pid(), member_info()}].
 member_info(Pids, AllMembers) ->
@@ -594,22 +518,23 @@ expired_free_members(Members, Now, MaxAge) ->
     [ MI || MI = {_, {_, free, LastReturn}} <- Members,
             timer:now_diff(Now, LastReturn) >= MaxMicros ].
 
--spec send_metric(Name :: metric_label(),
-                  Value :: metric_value(),
-                  Type :: metric_type()) -> ok.
 %% Send a metric using the metrics module from application config or
 %% do nothing.
-send_metric(Name, Value, Type) ->
-    case application:get_env(pooler, metrics_module) of
-        undefined -> ok;
-        {ok, Mod} -> Mod:notify(Name, Value, Type)
-    end,
+-spec send_metric(Pool  :: #pool{},
+                  Label :: atom(),
+                  Value :: metric_value(),
+                  Type  :: metric_type()) -> ok.
+send_metric(#pool{metrics_mod = undefined}, _Label, _Value, _Type) ->
+    ok;
+send_metric(#pool{name = PoolName, metrics_mod = MetricsMod}, Label, Value, Type) ->
+    MetricName = pool_metric(PoolName, Label),
+    MetricsMod:notify(MetricName, Value, Type),
     ok.
 
--spec pool_metric(string(), 'free_count' | 'in_use_count' | 'take_rate') -> binary().
+-spec pool_metric(atom(), atom()) -> binary().
 pool_metric(PoolName, Metric) ->
-    iolist_to_binary([<<"pooler.">>, PoolName, ".",
-                      atom_to_binary(Metric, utf8)]).
+    iolist_to_binary([<<"pooler.">>, atom_to_binary(PoolName, utf8),
+                      ".", atom_to_binary(Metric, utf8)]).
 
 -spec time_as_millis(time_spec()) -> non_neg_integer().
 %% @doc Convert time unit into milliseconds.

src/pooler.hrl (+50, -0)

@@ -0,0 +1,50 @@
+-define(DEFAULT_ADD_RETRY, 1).
+-define(DEFAULT_CULL_INTERVAL, {0, min}).
+-define(DEFAULT_MAX_AGE, {0, min}).
+
+-define(POOLER_GROUP_TABLE, pooler_group_table).
+
+-type member_info() :: {string(), free | pid(), {_, _, _}}.
+-type free_member_info() :: {string(), free, {_, _, _}}.
+-type time_unit() :: min | sec | ms | mu.
+-type time_spec() :: {non_neg_integer(), time_unit()}.
+
+-record(pool, {
+          name             :: atom(),
+          group            :: atom(),
+          max_count = 100  :: non_neg_integer(),
+          init_count = 10  :: non_neg_integer(),
+          start_mfa        :: {atom(), atom(), [term()]},
+          free_pids = []   :: [pid()],
+          in_use_count = 0 :: non_neg_integer(),
+          free_count = 0   :: non_neg_integer(),
+          %% The number of times to attempt adding a pool member if the
+          %% pool size is below max_count and there are no free
+          %% members. After this many tries, error_no_members will be
+          %% returned by a call to take_member. NOTE: this value
+          %% should be >= 2 or else the pool will not grow on demand
+          %% when max_count is larger than init_count.
+          add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer(),
+
+          %% The interval to schedule a cull message. Both
+          %% 'cull_interval' and 'max_age' are specified using a
+          %% `time_spec()' type.
+          cull_interval = ?DEFAULT_CULL_INTERVAL :: time_spec(),
+          %% The maximum age for members.
+          max_age = ?DEFAULT_MAX_AGE             :: time_spec(),
+
+          member_sup,
+          all_members = dict:new()     :: dict(),
+          consumer_to_pid = dict:new() :: dict(),
+
+          %% The module to use for collecting metrics. If set to
+          %% 'pooler_no_metrics', then metric sending calls do
+          %% nothing. A typical value to actually capture metrics is
+          %% folsom_metrics.
+          metrics_mod = pooler_no_metrics :: atom()
+         }).
+
+-define(gv(X, Y), proplists:get_value(X, Y)).
+-define(gv(X, Y, D), proplists:get_value(X, Y, D)).
+
+

src/pooler_config.erl (+32, -0)

@@ -0,0 +1,32 @@
+%% @author Seth Falcon <seth@userprimary.net>
+%% @copyright 2012 Seth Falcon
+%% @doc Helper module to transform app config proplists into pool records
+
+-module(pooler_config).
+
+-export([list_to_pool/1]).
+
+-include("pooler.hrl").
+
+-spec list_to_pool([{atom(), term()}]) -> #pool{}.
+list_to_pool(P) ->
+    #pool{
+       name              = req(name, P),
+       group             = ?gv(group, P),
+       max_count         = req(max_count, P),
+       init_count        = req(init_count, P),
+       start_mfa         = req(start_mfa, P),
+       add_member_retry  = ?gv(add_member_retry, P, ?DEFAULT_ADD_RETRY),
+       cull_interval     = ?gv(cull_interval, P, ?DEFAULT_CULL_INTERVAL),
+       max_age           = ?gv(max_age, P, ?DEFAULT_MAX_AGE),
+       metrics_mod       = ?gv(metrics_mod, P, pooler_no_metrics)}.
+
+%% Return `Value' for `Key' in proplist `P' or crashes with an
+%% informative message if no value is found.
+req(Key, P) ->
+    case lists:keyfind(Key, 1, P) of
+        false ->
+            error({missing_required_config, Key, P});
+        {Key, Value} ->
+            Value
+    end.
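+
+%% Usage sketch (illustrative): build a #pool{} record from a config
+%% proplist; req/2 crashes with missing_required_config if a required
+%% key is absent. my_pg_sql_driver is the hypothetical module from the
+%% README example.
+%%
+%%   Pool = pooler_config:list_to_pool(
+%%            [{name, pg_db1},
+%%             {max_count, 10},
+%%             {init_count, 2},
+%%             {start_mfa, {my_pg_sql_driver, start_link, ["db_host"]}}]).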

src/pooler_pool_sup.erl (+27, -9)

@@ -2,14 +2,32 @@
 
 -behaviour(supervisor).
 
--export([start_link/0, init/1]).
+-export([start_link/1, init/1,
+         pool_sup_name/1,
+         member_sup_name/1]).
 
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+-include("pooler.hrl").
 
-init([]) ->
-    Worker = {pooler_pooled_worker_sup,
-              {pooler_pooled_worker_sup, start_link, []},
-              temporary, 5000, supervisor, [pooler_pooled_worker_sup]},
-    Restart = {simple_one_for_one, 1, 1},
-    {ok, {Restart, [Worker]}}.
+start_link(#pool{} = Pool) ->
+    SupName = pool_sup_name(Pool),
+    supervisor:start_link({local, SupName}, ?MODULE, Pool).
+
+init(#pool{} = Pool) ->
+    PoolerSpec = {pooler,
+                  {pooler, start_link, [Pool]},
+                  transient,  5000, worker, [pooler]},
+    MemberSupName = member_sup_name(Pool),
+    MemberSupSpec = {MemberSupName,
+                     {pooler_pooled_worker_sup, start_link, [Pool]},
+                     transient, 5000, supervisor, [pooler_pooled_worker_sup]},
+
+    %% five restarts in 60 seconds, then shutdown
+    Restart = {one_for_all, 5, 60},
+    {ok, {Restart, [MemberSupSpec, PoolerSpec]}}.
+
+
+member_sup_name(#pool{name = PoolName}) ->
+    list_to_atom("pooler_" ++ atom_to_list(PoolName) ++ "_member_sup").
+
+pool_sup_name(#pool{name = PoolName}) ->
+    list_to_atom("pooler_" ++ atom_to_list(PoolName) ++ "_pool_sup").

src/pooler_pooled_worker_sup.erl (+5, -2)

@@ -4,8 +4,11 @@
 
 -export([start_link/1, init/1]).
 
-start_link(Config) ->
-    supervisor:start_link(?MODULE, Config).
+-include("pooler.hrl").
+
+start_link(#pool{start_mfa = {_, _, _} = MFA} = Pool) ->
+    SupName = pooler_pool_sup:member_sup_name(Pool),
+    supervisor:start_link({local, SupName}, ?MODULE, MFA).
 
 init({Mod, Fun, Args}) ->
     Worker = {Mod, {Mod, Fun, Args}, temporary, brutal_kill, worker, [Mod]},

src/pooler_sup.erl (+25, -6)

@@ -4,13 +4,32 @@
 
 -export([start_link/0, init/1]).
 
+-include("pooler.hrl").
+
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    Config = application:get_all_env(pooler),
-    Pooler = {pooler, {pooler, start_link, [Config]},
-            permanent, 5000, worker, [pooler]},
-    PoolerPool = {pooler_pool_sup, {pooler_pool_sup, start_link, []},
-                permanent, 5000, supervisor, [pooler_pool_sup]},
-    {ok, {{one_for_one, 5, 10}, [PoolerPool, Pooler]}}.
+    %% a list of pool configs
+    {ok, Config} = application:get_env(pooler, pools),
+    MetricsConfig = {metrics_mod, metrics_module()},
+    Pools = [ pooler_config:list_to_pool([MetricsConfig | L]) || L <- Config ],
+    PoolSupSpecs = [ pool_sup_spec(Pool) || Pool <- Pools ],
+    ets:new(?POOLER_GROUP_TABLE, [set, public, named_table, {write_concurrency, true}]),
+    {ok, {{one_for_one, 5, 60}, PoolSupSpecs}}.
+
+pool_sup_spec(#pool{name = Name} = Pool) ->
+    SupName = pool_sup_name(Name),
+    {SupName, {pooler_pool_sup, start_link, [Pool]},
+     transient, 5000, supervisor, [pooler_pool_sup]}.
+
+pool_sup_name(Name) ->
+    list_to_atom("pooler_" ++ atom_to_list(Name) ++ "_pool_sup").
+
+metrics_module() ->
+    case application:get_env(pooler, metrics_module) of
+        {ok, Mod} ->
+            Mod;
+        undefined ->
+            pooler_no_metrics
+    end.
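+
+%% Illustrative app environment consumed by init/1 above (a sketch;
+%% folsom_metrics is one possible metrics_module, per the README):
+%%
+%%   {pooler, [{pools, [PoolProplist1, PoolProplist2]},
+%%             {metrics_module, folsom_metrics}]}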

test/pooler_perf_test.erl (+3, -3)

@@ -13,7 +13,7 @@ setup(InitCount, MaxCount, NumPools) ->
                        N = integer_to_list(I),
                        Name = "p" ++ N,
                        Arg0 = "pool-" ++ Name,
-                       [{name, Name},
+                       [{name, list_to_atom(Name)},
                         {max_count, MaxCount},
                         {init_count, InitCount},
                         {start_mfa,
@@ -27,11 +27,11 @@ consumer_cycle(N) ->
     consumer_cycle(N, 0, 0).
 
 consumer_cycle(N, NumOk, NumFail) when N > 0 ->
-    P = pooler:take_member(),
+    P = pooler:take_member(p1),
     case P of
         Pid when is_pid(Pid) ->
             true = is_process_alive(P),
-            pooler:return_member(P, ok),
+            pooler:return_member(p1, P, ok),
             consumer_cycle(N - 1, NumOk + 1, NumFail);
         _ ->
             consumer_cycle(N - 1, NumOk, NumFail + 1)

test/pooler_test.erl → test/pooler_tests.erl (+166, -70)

@@ -1,4 +1,4 @@
--module(pooler_test).
+-module(pooler_tests).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -28,7 +28,7 @@ user_crash(Pid) ->
     Pid ! crash.
 
 user_loop(Atom) when Atom =:= error_no_members orelse Atom =:= start ->
-    user_loop(pooler:take_member());
+    user_loop(pooler:take_member(test_pool_1));
 user_loop(MyTC) ->
     receive
         {get_tc_id, From} ->
@@ -41,11 +41,11 @@ user_loop(MyTC) ->
             From ! pooled_gs:ping_count(MyTC),
             user_loop(MyTC);
         new_tc ->
-            pooler:return_member(MyTC, ok),
-            MyNewTC = pooler:take_member(),
+            pooler:return_member(test_pool_1, MyTC, ok),
+            MyNewTC = pooler:take_member(test_pool_1),
             user_loop(MyNewTC);
         stop ->
-            pooler:return_member(MyTC, ok),
+            pooler:return_member(test_pool_1, MyTC, ok),
             stopped;
         crash ->
             erlang:error({user_loop, kaboom})
@@ -113,13 +113,14 @@ pooler_basics_test_() ->
     {foreach,
      % setup
      fun() ->
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 3},
                        {init_count, 2},
                        {start_mfa,
                         {pooled_gs, start_link, [{"type-0"}]}}]],
              application:set_env(pooler, pools, Pools),
              error_logger:delete_report_handler(error_logger_tty_h),
+             application:start(crypto),
              application:start(pooler)
      end,
      fun(_X) ->
@@ -128,34 +129,35 @@ pooler_basics_test_() ->
      [
       {"there are init_count members at start",
        fun() ->
-               Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
+               Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
                ?assertEqual(2, length(Stats))
        end},
 
       {"take and return one",
        fun() ->
-               P = pooler:take_member(),
+               P = pooler:take_member(test_pool_1),
                ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
-               ok = pooler:return_member(P, ok)
+               ok = pooler:return_member(test_pool_1, P, ok)
        end},
 
       {"take and return one, named pool",
        fun() ->
-               P = pooler:take_member("p1"),
+               P = pooler:take_member(test_pool_1),
                ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
-               ok, pooler:return_member(P)
+               ok = pooler:return_member(test_pool_1, P)
        end},
 
       {"attempt to take form unknown pool",
        fun() ->
-               ?assertEqual(error_no_pool, pooler:take_member("bad_pool_name"))
+               %% since pools are now servers, an unknown pool name exits with noproc
+               ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
        end},
 
       {"pids are created on demand until max",
        fun() ->
-               Pids = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
-               ?assertEqual(error_no_members, pooler:take_member()),
-               ?assertEqual(error_no_members, pooler:take_member()),
+               Pids = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
+               ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
+               ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
                % no duplicates
                ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
@@ -164,20 +166,20 @@ pooler_basics_test_() ->
 
       {"pids are reused most recent return first",
        fun() ->
-               P1 = pooler:take_member(),
-               P2 = pooler:take_member(),
+               P1 = pooler:take_member(test_pool_1),
+               P2 = pooler:take_member(test_pool_1),
                ?assertNot(P1 == P2),
-               ok = pooler:return_member(P1, ok),
-               ok = pooler:return_member(P2, ok),
+               ok = pooler:return_member(test_pool_1, P1, ok),
+               ok = pooler:return_member(test_pool_1, P2, ok),
                % pids are reused most recent first
-               ?assertEqual(P2, pooler:take_member()),
-               ?assertEqual(P1, pooler:take_member())
+               ?assertEqual(P2, pooler:take_member(test_pool_1)),
+               ?assertEqual(P1, pooler:take_member(test_pool_1))
        end},
 
       {"if an in-use pid crashes it is replaced",
        fun() ->
-               Pids0 = [pooler:take_member(), pooler:take_member(),
-                        pooler:take_member()],
+               Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1),
+                        pooler:take_member(test_pool_1)],
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % crash them all
                [ pooled_gs:crash(P) || P <- Pids0 ],
@@ -189,7 +191,7 @@ pooler_basics_test_() ->
 
       {"if a free pid crashes it is replaced",
        fun() ->
-               FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
+               FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
                [ exit(P, kill) || P <- FreePids ],
                Pids1 = get_n_pids(3, []),
                ?assertEqual(3, length(Pids1))
@@ -197,10 +199,10 @@ pooler_basics_test_() ->
 
       {"if a pid is returned with bad status it is replaced",
        fun() ->
-               Pids0 = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
+               Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % return them all marking as bad
-               [ pooler:return_member(P, fail) || P <- Pids0 ],
+               [ pooler:return_member(test_pool_1, P, fail) || P <- Pids0 ],
                Pids1 = get_n_pids(3, []),
                Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
@@ -222,16 +224,16 @@ pooler_basics_test_() ->
        fun() ->
                Bogus1 = spawn(fun() -> ok end),
                Bogus2 = spawn(fun() -> ok end),
-               ?assertEqual(ok, pooler:return_member(Bogus1, ok)),
-               ?assertEqual(ok, pooler:return_member(Bogus2, fail))
+               ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus1, ok)),
+               ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus2, fail))
        end
       },
 
       {"calling return_member on error_no_members is ignored",
        fun() ->
-               ?assertEqual(ok, pooler:return_member(error_no_members)),
-               ?assertEqual(ok, pooler:return_member(error_no_members, ok)),
-               ?assertEqual(ok, pooler:return_member(error_no_members, fail))
+               ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members)),
+               ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, ok)),
+               ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, fail))
        end
       },
 
@@ -239,36 +241,130 @@ pooler_basics_test_() ->
        fun() ->
                %% exercise the API to ensure we have certain keys reported as metrics
                fake_metrics:reset_metrics(),
-               Pids = [ pooler:take_member() || _I <- lists:seq(1, 10) ],
-               [ pooler:return_member(P) || P <- Pids ],
-               pooler:take_member("bad_pool_name"),
+               Pids = [ pooler:take_member(test_pool_1) || _I <- lists:seq(1, 10) ],
+               [ pooler:return_member(test_pool_1, P) || P <- Pids ],
+               catch pooler:take_member(bad_pool_name),
               %% kill an unused member
                exit(hd(Pids), kill),
                %% kill a used member
-               KillMe = pooler:take_member("p1"),
+               KillMe = pooler:take_member(test_pool_1),
                exit(KillMe, kill),
                %% FIXME: We need to wait for pooler to process the
                %% exit message. This is ugly, will fix later.
                timer:sleep(200),                % :(
-               ExpectKeys = [<<"pooler.error_no_members_count">>,
-                             <<"pooler.events">>,
-                             <<"pooler.killed_free_count">>,
-                             <<"pooler.killed_in_use_count">>,
-                             <<"pooler.p1.free_count">>,
-                             <<"pooler.p1.in_use_count">>,
-                             <<"pooler.p1.take_rate">>],
+               ExpectKeys = lists:sort([<<"pooler.test_pool_1.error_no_members_count">>,
+                                        <<"pooler.test_pool_1.events">>,
+                                        <<"pooler.test_pool_1.free_count">>,
+                                        <<"pooler.test_pool_1.in_use_count">>,
+                                        <<"pooler.test_pool_1.killed_free_count">>,
+                                        <<"pooler.test_pool_1.killed_in_use_count">>,
+                                        <<"pooler.test_pool_1.take_rate">>]),
                Metrics = fake_metrics:get_metrics(),
                GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
                ?assertEqual(ExpectKeys, GotKeys)
        end}
      ]}}.
 
+pooler_groups_test_() ->
+    {setup,
+     fun() ->
+             application:set_env(pooler, metrics_module, fake_metrics),
+             fake_metrics:start_link()
+     end,
+     fun(_X) ->
+             fake_metrics:stop()
+     end,
+    {foreach,
+     % setup
+     fun() ->
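+             %% two pools share group_1; test_pool_3 is configured without a group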
+             Pools = [[{name, test_pool_1},
+                       {group, group_1},
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-1-1"}]}}],
+                      [{name, test_pool_2},
+                       {group, group_1},
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-1-2"}]}}],
+                      %% test_pool_3 not part of the group
+                      [{name, test_pool_3},
+                       {group, undefined},
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-3"}]}}]
+                     ],
+             application:set_env(pooler, pools, Pools),
+             %% error_logger:delete_report_handler(error_logger_tty_h),
+             application:start(crypto),
+             pg2:start(),
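+             %% pool groups are tracked via pg2, so it must be running before pooler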
+             application:start(pooler)
+     end,
+     fun(_X) ->
+             application:stop(pooler)
+     end,
+     [
+      {"take and return one group member (repeated)",
+       fun() ->
+               Types = [ begin
+                             Pid = pooler:take_group_member(group_1),
+                             {Type, _} = pooled_gs:get_id(Pid),
+                             ?assertMatch("type-1" ++ _, Type),
+                             ok = pooler:return_group_member(group_1, Pid, ok),
+                             Type
+                         end
+                         || _I <- lists:seq(1, 50) ],
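+               %% 50 random takes should draw members from both pools in the group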
+               Type_1_1 = [ X || "type-1-1" = X <- Types ],
+               Type_1_2 = [ X || "type-1-2" = X <- Types ],
+               ?assert(length(Type_1_1) > 0),
+               ?assert(length(Type_1_2) > 0)
+       end},
+
+      {"take member from unknown group",
+       fun() ->
+               ?assertEqual({error, {no_such_group, not_a_group}},
+                            pooler:take_group_member(not_a_group))
+       end},
+
+      {"return member to group, implied ok",
+       fun() ->
+               Pid = pooler:take_group_member(group_1),
+               ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
+       end},
+
+      {"return error_no_member to group",
+       fun() ->
+               ?assertEqual(ok, pooler:return_group_member(group_1, error_no_members))
+       end},
+
+      {"exhaust pools in group",
+       fun() ->
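+               %% group_1 holds two pools with max_count 3 each, so 6 takes exhaust it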
+               Pids = [ pooler:take_group_member(group_1) || _I <- lists:seq(1, 6) ],
+               %% they should all be pids
+               [ begin
+                     {Type, _} = pooled_gs:get_id(P),
+                     ?assertMatch("type-1" ++ _, Type),
+                     ok
+                 end || P <- Pids ],
+               %% further attempts should be error
+               [error_no_members,
+                error_no_members,
+                error_no_members] = [ pooler:take_group_member(group_1)
+                                      || _I <- lists:seq(1, 3) ]
+       end}
+     ]}}.
+
 pooler_limit_failed_adds_test_() ->
     %% verify that pooler crashes completely if too many failures are
     %% encountered while trying to add pids.
     {setup,
      fun() ->
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 10},
                        {init_count, 10},
                        {start_mfa,
@@ -280,8 +376,8 @@ pooler_limit_failed_adds_test_() ->
      end,
      fun() ->
              application:start(pooler),
-             ?assertEqual(error_no_members, pooler:take_member()),
-             ?assertEqual(error_no_members, pooler:take_member("p1"))
+             ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
+             ?assertEqual(error_no_members, pooler:take_member(test_pool_1))
      end}.
 
 pooler_scheduled_cull_test_() ->
@@ -289,13 +385,13 @@ pooler_scheduled_cull_test_() ->
      fun() ->
              application:set_env(pooler, metrics_module, fake_metrics),
              fake_metrics:start_link(),
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 10},
                        {init_count, 2},
                        {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
                        {cull_interval, {200, ms}}]],
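+             %% the 200ms cull_interval lets the test observe culling after a short sleep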
              application:set_env(pooler, pools, Pools),
-             error_logger:delete_report_handler(error_logger_tty_h),
+             %% error_logger:delete_report_handler(error_logger_tty_h),
              application:start(pooler)
      end,
      fun(_X) ->
@@ -305,52 +401,52 @@ pooler_scheduled_cull_test_() ->
      [{"excess members are culled repeatedly",
        fun() ->
                %% take all members
-               Pids1 = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
+               Pids1 = [ pooler:take_member(test_pool_1) || _X <- lists:seq(1, 10) ],
                %% return all
-               [ pooler:return_member(P) || P <- Pids1 ],
-               ?assertEqual(10, length(pooler:pool_stats())),
+               [ pooler:return_member(test_pool_1, P) || P <- Pids1 ],
+               ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
                %% wait for longer than cull delay
                timer:sleep(250),
-               ?assertEqual(2, length(pooler:pool_stats())),
+               ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
 
                %% repeat the test to verify that culling gets rescheduled.
-               Pids2 = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
+               Pids2 = [ pooler:take_member(test_pool_1) || _X <- lists:seq(1, 10) ],
                %% return all
-               [ pooler:return_member(P) || P <- Pids2 ],
-               ?assertEqual(10, length(pooler:pool_stats())),
+               [ pooler:return_member(test_pool_1, P) || P <- Pids2 ],
+               ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
                %% wait for longer than cull delay
                timer:sleep(250),
-               ?assertEqual(2, length(pooler:pool_stats()))
+               ?assertEqual(2, length(pooler:pool_stats(test_pool_1)))
        end
       },
 
       {"non-excess members are not culled",
        fun() ->
-               [P1, P2] = [pooler:take_member("p1") || _X <- [1, 2] ],
-               [pooler:return_member(P) || P <- [P1, P2] ],
-               ?assertEqual(2, length(pooler:pool_stats())),
+               [P1, P2] = [pooler:take_member(test_pool_1) || _X <- [1, 2] ],
+               [pooler:return_member(test_pool_1, P) || P <- [P1, P2] ],
+               ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
                timer:sleep(250),
-               ?assertEqual(2, length(pooler:pool_stats()))
+               ?assertEqual(2, length(pooler:pool_stats(test_pool_1)))
        end
       },
 
       {"in-use members are not culled",
        fun() ->
                %% take all members
-               Pids = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
+               Pids = [ pooler:take_member(test_pool_1) || _X <- lists:seq(1, 10) ],
                %% don't return any
-               ?assertEqual(10, length(pooler:pool_stats())),
+               ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
                %% wait for longer than cull delay
                timer:sleep(250),
-               ?assertEqual(10, length(pooler:pool_stats())),
-               [ pooler:return_member(P) || P <- Pids ]
+               ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
+               [ pooler:return_member(test_pool_1, P) || P <- Pids ]
        end}
      ]}.
 
 random_message_test_() ->
     {setup,
      fun() ->
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 2},
                        {init_count, 1},
                        {start_mfa,
@@ -360,9 +456,9 @@ random_message_test_() ->
              application:start(pooler),
              %% now send some bogus messages
              %% do the call in a throw-away process to avoid timeout error
-             spawn(fun() -> catch gen_server:call(pooler, {unexpected_garbage_msg, 5}) end),
-             gen_server:cast(pooler, {unexpected_garbage_msg, 6}),
-            whereis(pooler) ! {unexpected_garbage_msg, 7},
+             spawn(fun() -> catch gen_server:call(test_pool_1, {unexpected_garbage_msg, 5}) end),
+             gen_server:cast(test_pool_1, {unexpected_garbage_msg, 6}),
+             whereis(test_pool_1) ! {unexpected_garbage_msg, 7},
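+             %% each pool is now a registered gen_server, so test_pool_1 gets these directly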
              ok
      end,
      fun(_) ->
@@ -370,7 +466,7 @@ random_message_test_() ->
      end,
     [
      fun() ->
-             Pid = pooler:take_member("p1"),
+             Pid = pooler:take_member(test_pool_1),
              {Type, _} =  pooled_gs:get_id(Pid),
              ?assertEqual("type-0", Type)
      end
@@ -380,7 +476,7 @@ pooler_integration_test_() ->
     {foreach,
      % setup
      fun() ->
-             Pools = [[{name, "p1"},
+             Pools = [[{name, test_pool_1},
                        {max_count, 10},
                        {init_count, 10},
                        {start_mfa,
@@ -460,7 +556,7 @@ time_as_micros_test_() ->
 get_n_pids(0, Acc) ->
     Acc;
 get_n_pids(N, Acc) ->
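+    %% keep taking members, retrying on error_no_members, until N pids are collected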
-    case pooler:take_member() of
+    case pooler:take_member(test_pool_1) of
         error_no_members ->
             get_n_pids(N, Acc);
         Pid ->