
start members asynchronously using pooler_starter

Members are started using a pooler_starter server and returned to the
pool via pooler:accept_member.

Tests pass after adding retries via get_n_pids where needed, since
member addition is now async and triggered by a take_member call.
Seth Falcon, 12 years ago
commit b2cd30a72f
7 changed files with 259 additions and 40 deletions
  1. src/pooler.erl (+82 −23)
  2. src/pooler.hrl (+11 −1)
  3. src/pooler_starter.erl (+100 −0)
  4. src/pooler_starter_sup.erl (+26 −0)
  5. src/pooler_sup.erl (+5 −1)
  6. test/pooler_perf_test.erl (+4 −1)
  7. test/pooler_tests.erl (+31 −14)
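
Since member addition is now asynchronous and only triggered by a take_member call, a caller that exhausts the pool sees error_no_members first and must ask again, which is what the updated tests do via get_n_pids. A minimal consumer-side sketch of that retry pattern (take_with_retry is a hypothetical helper, not part of pooler's API):

-module(take_retry).
-export([take_with_retry/3]).

%% Ask the pool for a member up to Retries times, sleeping between
%% attempts to give the async starter time to hand the new member
%% back via pooler:accept_member/2. Returns a pid or error_no_members.
take_with_retry(_Pool, 0, _SleepMs) ->
    error_no_members;
take_with_retry(Pool, Retries, SleepMs) ->
    case pooler:take_member(Pool) of
        error_no_members ->
            timer:sleep(SleepMs),
            take_with_retry(Pool, Retries - 1, SleepMs);
        Pid when is_pid(Pid) ->
            Pid
    end.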

src/pooler.erl (+82 −23)

@@ -27,7 +27,8 @@
 %% API Function Exports
 %% ------------------------------------------------------------------
 
--export([start_link/1,
+-export([accept_member/2,
+         start_link/1,
          take_member/1,
          take_group_member/1,
          return_group_member/2,
@@ -59,6 +60,11 @@
 start_link(#pool{name = Name} = Pool) ->
     gen_server:start_link({local, Name}, ?MODULE, Pool, []).
 
+%% @doc For INTERNAL use. Adds `MemberPid' to the pool.
+-spec accept_member(atom() | pid(), {reference(), pid() | _}) -> ok.
+accept_member(PoolName, MemberPid) ->
+    gen_server:call(PoolName, {accept_member, MemberPid}).
+
 %% @doc Obtain exclusive access to a member from `PoolName'.
 %%
 %% If no free members are available, 'error_no_members' is returned.
@@ -183,15 +189,18 @@ set_member_sup(#pool{} = Pool, MemberSup) ->
     Pool#pool{member_sup = MemberSup}.
 
 handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
-    Retries = pool_add_retries(Pool),
-    {Member, NewPool} = take_member_from_pool(Pool, CPid, Retries),
+    {Member, NewPool} = take_member_from_pool(Pool, CPid),
     {reply, Member, NewPool};
 handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
     {reply, ok, do_return_member(Pid, Status, Pool)};
+handle_call({accept_member, Pid}, _From, Pool) ->
+    {reply, ok, do_accept_member(Pid, Pool)};
 handle_call(stop, _From, Pool) ->
     {stop, normal, stop_ok, Pool};
 handle_call(pool_stats, _From, Pool) ->
     {reply, dict:to_list(Pool#pool.all_members), Pool};
+handle_call(dump_pool, _From, Pool) ->
+    {reply, Pool, Pool};
 handle_call(_Request, _From, Pool) ->
     {noreply, Pool}.
 
@@ -244,6 +253,41 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
+do_accept_member({Ref, Pid},
+                 #pool{
+                    all_members = AllMembers,
+                    free_pids = Free,
+                    free_count = NumFree,
+                    starting_members = StartingMembers
+                   } = Pool) when is_pid(Pid) ->
+    case lists:keymember(Ref, 1, StartingMembers) of
+        false ->
+            %% a pid we didn't ask to start, ignore it.
+            %% should we log it?
+            Pool;
+        true ->
+            StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
+            MRef = erlang:monitor(process, Pid),
+            Entry = {MRef, free, os:timestamp()},
+            AllMembers1 = store_all_members(Pid, Entry, AllMembers),
+            Pool#pool{free_pids = Free ++ [Pid],
+                      free_count = NumFree + 1,
+                      all_members = AllMembers1,
+                      starting_members = StartingMembers1}
+    end;
+do_accept_member({Ref, _Reason}, #pool{starting_members = StartingMembers} = Pool) ->
+    %% member start failed, remove in-flight ref and carry on.
+    StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
+    Pool#pool{starting_members = StartingMembers1}.
+
+-spec remove_stale_starting_members([{reference(), erlang:timestamp()}], time_spec()) -> [{reference(), erlang:timestamp()}].
+remove_stale_starting_members(StartingMembers, MaxAge) ->
+    Now = os:timestamp(),
+    MaxAgeMicros = time_as_micros(MaxAge),
+    lists:filter(fun({_Ref, StartTime}) ->
+                         timer:now_diff(Now, StartTime) < MaxAgeMicros
+                 end, StartingMembers).
+
 % FIXME: creation of new pids should probably happen
 % in a spawned process to avoid tying up the loop.
 -spec add_pids(non_neg_integer(), #pool{}) ->
@@ -277,37 +321,48 @@ add_pids(N, Pool) ->
             {max_count_reached, Pool}
     end.
 
--spec take_member_from_pool(#pool{}, {pid(), term()},
-                            non_neg_integer()) ->
+-spec take_member_from_pool(#pool{}, {pid(), term()}) ->
                                    {error_no_members | pid(), #pool{}}.
 take_member_from_pool(#pool{max_count = Max,
                             free_pids = Free,
                             in_use_count = NumInUse,
                             free_count = NumFree,
-                            consumer_to_pid = CPMap} = Pool,
-                      From,
-                      Retries) ->
+                            consumer_to_pid = CPMap,
+                            starting_members = StartingMembers} = Pool,
+                      From) ->
     send_metric(Pool, take_rate, 1, meter),
+    CanAdd = (NumInUse + length(StartingMembers)) < Max,
     case Free of
-        [] when NumInUse =:= Max ->
+        [] when CanAdd =:= false ->
             send_metric(Pool, error_no_members_count, {inc, 1}, counter),
             send_metric(Pool, events, error_no_members, history),
             {error_no_members, Pool};
-        [] when NumInUse < Max andalso Retries > 0 ->
-            case add_pids(1, Pool) of
-                {ok, Pool1} ->
-                    %% add_pids may have updated our pool
-                    take_member_from_pool(Pool1, From, Retries - 1);
-                {max_count_reached, _} ->
-                    send_metric(Pool, error_no_members_count, {inc, 1}, counter),
-                    send_metric(Pool, events, error_no_members, history),
-                    {error_no_members, Pool}
-            end;
-        [] when Retries =:= 0 ->
-            %% max retries reached
+        [] when CanAdd =:= true ->
+            %% Request async member creation; return error_no_members
+            %% for now. Should also verify that starting_members will
+            %% not exceed max size if all in-flight starts succeed, and
+            %% get a reference to the starter supervisor (or reuse a
+            %% single starter) rather than creating one per request.
+            {ok, Starter} = pooler_starter_sup:new_starter(),
+            StartRef = pooler_starter:start_member(Starter, Pool),
+            %% case add_pids(1, Pool) of
+            %%     {ok, Pool1} ->
+            %%         %% add_pids may have updated our pool
+            %%         take_member_from_pool(Pool1, From, Retries - 1);
+            %%     {max_count_reached, _} ->
+            %%         send_metric(Pool, error_no_members_count, {inc, 1}, counter),
+            %%         send_metric(Pool, events, error_no_members, history),
+            %%         {error_no_members, Pool}
+            %% end,
             send_metric(Pool, error_no_members_count, {inc, 1}, counter),
             send_metric(Pool, events, error_no_members, history),
-            {error_no_members, Pool};
+            StartingMembers1 = [{StartRef, os:timestamp()} | StartingMembers],
+            {error_no_members, Pool#pool{starting_members = StartingMembers1}};
+        %% [] when Retries =:= 0 ->
+        %%     %% max retries reached
+        %%     send_metric(Pool, error_no_members_count, {inc, 1}, counter),
+        %%     send_metric(Pool, events, error_no_members, history),
+        %%     {error_no_members, Pool};
         [Pid|Rest] ->
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
                               free_count = NumFree - 1},
@@ -531,7 +586,7 @@ expired_free_members(Members, Now, MaxAge) ->
                   Label :: atom(),
                   Value :: metric_value(),
                   Type  :: metric_type()) -> ok.
-send_metric(#pool{metrics_mod = undefined}, _Label, _Value, _Type) ->
+send_metric(#pool{metrics_mod = pooler_no_metrics}, _Label, _Value, _Type) ->
     ok;
 send_metric(#pool{name = PoolName, metrics_mod = MetricsMod}, Label, Value, Type) ->
     MetricName = pool_metric(PoolName, Label),
@@ -543,6 +598,10 @@ pool_metric(PoolName, Metric) ->
     iolist_to_binary([<<"pooler.">>, atom_to_binary(PoolName, utf8),
                       ".", atom_to_binary(Metric, utf8)]).
 
+-spec time_as_secs(time_spec()) -> non_neg_integer().
+time_as_secs({Time, Unit}) ->
+    time_as_micros({Time, Unit}) div 1000000.
+
 -spec time_as_millis(time_spec()) -> non_neg_integer().
 %% @doc Convert time unit into milliseconds.
 time_as_millis({Time, Unit}) ->
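
Condensed, the new empty-free-list branch works like this (a sketch restating the code above, not a literal excerpt; request_async_start is an illustrative name):

%% Pool side: kick off one async start and remember the in-flight ref;
%% the current caller still receives error_no_members.
request_async_start(#pool{starting_members = Starting} = Pool) ->
    {ok, Starter} = pooler_starter_sup:new_starter(),
    Ref = pooler_starter:start_member(Starter, Pool),
    Pool#pool{starting_members = [{Ref, os:timestamp()} | Starting]}.
%% The member arrives later: the starter calls
%% pooler:accept_member(PoolName, {Ref, Pid}) on success or
%% pooler:accept_member(PoolName, {Ref, Error}) on failure, and
%% do_accept_member either adds the pid to free_pids or drops the ref.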

src/pooler.hrl (+11 −1)

@@ -1,7 +1,7 @@
 -define(DEFAULT_ADD_RETRY, 1).
 -define(DEFAULT_CULL_INTERVAL, {0, min}).
 -define(DEFAULT_MAX_AGE, {0, min}).
-
+-define(DEFAULT_MEMBER_START_TIMEOUT, {1, min}).
 -define(POOLER_GROUP_TABLE, pooler_group_table).
 
 -type member_info() :: {string(), free | pid(), {_, _, _}}.
@@ -36,6 +36,10 @@
           %% The supervisor used to start new members
           member_sup :: atom() | pid(),
 
+          %% The supervisor used to start starter servers that start
+          %% new members. This is what enables async member starts.
+          starter_sup :: atom() | pid(),
+
           %% Maps member pid to a tuple of the form:
           %% {MonitorRef, Status, Time},
          %% where MonitorRef is a monitor reference for the member,
@@ -50,6 +54,12 @@
           %% members being consumed.
           consumer_to_pid = dict:new() :: dict(),
 
+          %% A list of `{Reference, Timestamp}' tuples representing
+          %% new member start requests that are in-flight. The
+          %% timestamp records when the start request was initiated
+          %% and is used to implement start timeout.
+          starting_members = [] :: [{reference(), erlang:timestamp()}],
+
           %% The module to use for collecting metrics. If set to
           %% 'pooler_no_metrics', then metric sending calls do
           %% nothing. A typical value to actually capture metrics is

src/pooler_starter.erl (+100 −0)

@@ -0,0 +1,100 @@
+%% @author Seth Falcon <seth@userprimary.net>
+%% @copyright 2012 Seth Falcon
+%% @doc Helper gen_server to start pool members
+%%
+-module(pooler_starter).
+-behaviour(gen_server).
+
+-include("pooler.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([start_link/0,
+         start_member/2]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+
+%% To help with testing internal functions
+-ifdef(TEST).
+-compile([export_all]).
+-endif.
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link(?MODULE, [], []).
+
+%% @doc Start a member for the specified `Pool'.
+%%
+%% The start-member request is sent as a cast to the starter
+%% server. The starter server's mailbox is treated as the member-start
+%% work queue. Members are started serially and sent back to the
+%% requesting pool via `pooler:accept_member/2'. It is expected that
+%% callers keep track of, and limit, their start requests so that the
+%% starter's queue doesn't grow unbounded. A likely enhancement would
+%% be to allow parallel starts, either by having the starter spawn and
+%% manage a subprocess or by using pg2 to group a number of starter
+%% servers. Note that timeout could be handled client-side using the
+%% `gen_server:call/3' timeout value.
+-spec start_member(atom() | pid(), #pool{}) -> reference().
+start_member(Starter, #pool{} = Pool) ->
+    Ref = make_ref(),
+    gen_server:cast(Starter, {start_member, Pool, Ref}),
+    Ref.
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+
+-spec init([]) -> {'ok', {}}.
+init([]) ->
+    {ok, {}}.
+
+handle_call(stop, _From, Pool) ->
+    {stop, normal, stop_ok, Pool};
+handle_call(_Request, _From, Pool) ->
+    {noreply, Pool}.
+
+handle_cast({start_member, Pool, Ref}, State) ->
+    ok = do_start_member(Pool, Ref),
+    {noreply, State}.
+
+do_start_member(#pool{name = PoolName,
+                      member_sup = PoolSup},
+                Ref) ->
+    case supervisor:start_child(PoolSup, []) of
+        {ok, Pid} ->
+            ok = pooler:accept_member(PoolName, {Ref, Pid}),
+            ok;
+        Error ->
+            error_logger:error_msg("pool '~s' failed to start member: ~p",
+                                   [PoolName, Error]),
+            pooler:accept_member(PoolName, {Ref, Error}),
+            ok
+    end.
+            
+-spec handle_info(_, _) -> {'noreply', _}.
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+-spec terminate(_, _) -> 'ok'.
+terminate(_Reason, _State) ->
+    ok.
+
+-spec code_change(_, _, _) -> {'ok', _}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
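
Because start requests are casts, a single starter's mailbox naturally serializes several of them; a sketch (assumes Pool is bound to a #pool{} record whose member_sup points at a running member supervisor):

%% Queue three member starts on one starter; each request returns its
%% own ref, and the starter works through the mailbox one cast at a
%% time, reporting each result with pooler:accept_member/2.
{ok, Starter} = pooler_starter:start_link(),
Refs = [pooler_starter:start_member(Starter, Pool) || _ <- lists:seq(1, 3)],
3 = length(lists:usort(Refs)).   %% a unique ref per request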

src/pooler_starter_sup.erl (+26 −0)

@@ -0,0 +1,26 @@
+%% @doc Simple one for one supervisor for pooler_starter.
+%%
+%% This supervisor is shared by all pools since pooler_starter is a
+%% generic helper to facilitate async member start.
+-module(pooler_starter_sup).
+
+-behaviour(supervisor).
+
+-export([new_starter/0,
+         start_link/0,
+         init/1]).
+
+-include("pooler.hrl").
+
+new_starter() ->
+    supervisor:start_child(?MODULE, []).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    Worker = {pooler_starter, {pooler_starter, start_link, []},
+              temporary, brutal_kill, worker, [pooler_starter]},
+    Specs = [Worker],
+    Restart = {simple_one_for_one, 1, 1},
+    {ok, {Restart, Specs}}.
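
Each new_starter/0 call spawns a fresh worker under this shared supervisor; because the child spec is temporary with brutal_kill shutdown, a finished or crashed starter is dropped rather than restarted. A small sketch (assumes the pooler application is running, so pooler_starter_sup is registered):

{ok, S1} = pooler_starter_sup:new_starter(),
{ok, S2} = pooler_starter_sup:new_starter(),
true = (S1 =/= S2).   %% a distinct, disposable starter per request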

src/pooler_sup.erl (+5 −1)

@@ -16,7 +16,11 @@ init([]) ->
     Pools = [ pooler_config:list_to_pool([MetricsConfig | L]) || L <- Config ],
     PoolSupSpecs = [ pool_sup_spec(Pool) || Pool <- Pools ],
     ets:new(?POOLER_GROUP_TABLE, [set, public, named_table, {write_concurrency, true}]),
-    {ok, {{one_for_one, 5, 60}, PoolSupSpecs}}.
+    {ok, {{one_for_one, 5, 60}, [starter_sup_spec() | PoolSupSpecs]}}.
+
+starter_sup_spec() ->
+    {pooler_starter_sup, {pooler_starter_sup, start_link, []},
+     transient, 5000, supervisor, [pooler_starter_sup]}.
 
 pool_sup_spec(#pool{name = Name} = Pool) ->
     SupName = pool_sup_name(Name),

test/pooler_perf_test.erl (+4 −1)

@@ -84,7 +84,7 @@ pooler_take_return_test_() ->
     {foreach,
      % setup
      fun() ->
-             InitCount = 10,
+             InitCount = 100,
              MaxCount = 100,
              NumPools = 5,
              error_logger:delete_report_handler(error_logger_tty_h),
@@ -114,6 +114,9 @@ pooler_take_return_test_() ->
                    lists:foldr(fun({_, L}, {O, F}) ->
                                        {O + ?gv(ok, L), F + ?gv(fail, L)}
                                end, {0, 0}, Res),
+               %% not sure what to test here now. We expect some
+               %% failures if init count is less than max count
+               %% because of async start.
                ?assertEqual(0, NumFail),
                ?assertEqual(100*100, NumOk)
        end}

test/pooler_tests.erl (+31 −14)

@@ -153,9 +153,14 @@ pooler_basics_test_() ->
                ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
        end},
 
-      {"pids are created on demand until max",
+      {"members creation is triggered after pool exhaustion until max",
        fun() ->
-               Pids = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
+               %% init count is 2
+               Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
+               %% since new member creation is async, we can only assert
+               %% that we will get a pid, though maybe not on the first try.
+               Pids = get_n_pids(1, Pids0),
+               %% pool is at max now, requests should give error
                ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
@@ -178,8 +183,7 @@ pooler_basics_test_() ->
 
       {"if an in-use pid crashes it is replaced",
        fun() ->
-               Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1),
-                        pooler:take_member(test_pool_1)],
+               Pids0 = get_n_pids(3, []),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % crash them all
                [ pooled_gs:crash(P) || P <- Pids0 ],
@@ -199,7 +203,7 @@ pooler_basics_test_() ->
 
       {"if a pid is returned with bad status it is replaced",
        fun() ->
-               Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
+               Pids0 = get_n_pids(3, []),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % return them all marking as bad
                [ pooler:return_member(test_pool_1, P, fail) || P <- Pids0 ],
@@ -343,7 +347,7 @@ pooler_groups_test_() ->
 
       {"exhaust pools in group",
        fun() ->
-               Pids = [ pooler:take_group_member(group_1) || _I <- lists:seq(1, 6) ],
+               Pids = get_n_pids_group(group_1, 6, []),
                %% they should all be pids
                [ begin
                      {Type, _} = pooled_gs:get_id(P),
@@ -401,7 +405,7 @@ pooler_scheduled_cull_test_() ->
      [{"excess members are culled repeatedly",
        fun() ->
                %% take all members
-               Pids1 = [ pooler:take_member(test_pool_1) || _X <- lists:seq(1, 10) ],
+               Pids1 = get_n_pids(test_pool_1, 10, []),
                %% return all
                [ pooler:return_member(test_pool_1, P) || P <- Pids1 ],
                ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
@@ -410,7 +414,7 @@ pooler_scheduled_cull_test_() ->
                ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
 
                %% repeat the test to verify that culling gets rescheduled.
-               Pids2 = [ pooler:take_member(test_pool_1) || _X <- lists:seq(1, 10) ],
+               Pids2 = get_n_pids(test_pool_1, 10, []),
                %% return all
                [ pooler:return_member(test_pool_1, P) || P <- Pids2 ],
                ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
@@ -433,7 +437,7 @@ pooler_scheduled_cull_test_() ->
       {"in-use members are not culled",
        fun() ->
                %% take all members
-               Pids = [ pooler:take_member(test_pool_1) || _X <- lists:seq(1, 10) ],
+               Pids = get_n_pids(test_pool_1, 10, []),
                %% don't return any
                ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
                %% wait for longer than cull delay
@@ -553,12 +557,25 @@ time_as_micros_test_() ->
 % testing crash recovery means race conditions when either pids
% haven't yet crashed or pooler hasn't recovered. So this helper loops
% forever until N pids are obtained, ignoring error_no_members.
-get_n_pids(0, Acc) ->
-    Acc;
 get_n_pids(N, Acc) ->
-    case pooler:take_member(test_pool_1) of
+    get_n_pids(test_pool_1, N, Acc).
+
+get_n_pids(_Pool, 0, Acc) ->
+    Acc;
+get_n_pids(Pool, N, Acc) ->
+    case pooler:take_member(Pool) of
+        error_no_members ->
+            get_n_pids(Pool, N, Acc);
+        Pid ->
+            get_n_pids(Pool, N - 1, [Pid|Acc])
+    end.
+
+get_n_pids_group(_Group, 0, Acc) ->
+    Acc;
+get_n_pids_group(Group, N, Acc) ->
+    case pooler:take_group_member(Group) of
         error_no_members ->
-            get_n_pids(N, Acc);
+            get_n_pids_group(Group, N, Acc);
         Pid ->
-            get_n_pids(N - 1, [Pid|Acc])
+            get_n_pids_group(Group, N - 1, [Pid|Acc])
     end.