Browse Source

pooler_starter should cleanup slow process starts

Pooler/_starter previously had a bug where if a started process took longer
than the start_member_timeout but did not crash, the process would get
orphaned. This fix includes modifications to the starter that change its
lifecycle. The starter will now wait to be stopped by the pooler gen_server via
either the stop_member_async or stop apis. The stop_member_async api is called
on the error path and the stop api is called on the success path.

Additionally, pooler gen_server bookkeeping is fixed to correctly update the
pool state with regard to starting members. Previously, pool record was not
updated with the starting members list returned from the non_stale filter.
Oliver Ferrigni 10 years ago
parent
commit
4c0a457336
5 changed files with 167 additions and 73 deletions
  1. 40 33
      src/pooler.erl
  2. 62 35
      src/pooler_starter.erl
  3. 3 3
      src/pooler_starter_sup.erl
  4. 9 0
      test/pooled_gs.erl
  5. 53 2
      test/pooler_tests.erl

+ 40 - 33
src/pooler.erl

@@ -297,7 +297,6 @@ init(#pool{}=Pool) ->
 
 set_member_sup(#pool{} = Pool, MemberSup) ->
     Pool#pool{member_sup = MemberSup}.
-
 handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
     {Member, NewPool} = take_member_from_pool(Pool, CPid),
     {reply, Member, NewPool};
@@ -363,7 +362,7 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
-do_accept_member({Ref, Pid},
+do_accept_member({StarterPid, Pid},
                  #pool{
                     all_members = AllMembers,
                     free_pids = Free,
@@ -372,49 +371,57 @@ do_accept_member({Ref, Pid},
                     member_start_timeout = StartTimeout
                    } = Pool) when is_pid(Pid) ->
     %% make sure we don't accept a timedout member
-    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
-                                                    StartTimeout),
-    case lists:keymember(Ref, 1, StartingMembers) of
+    Pool1 = #pool{starting_members = StartingMembers}= remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
+    case lists:keymember(StarterPid, 1, StartingMembers) of
         false ->
-            %% a pid we didn't ask to start, ignore it.
-            %% should we log it?
-            Pool;
+            %% A starter completed even though we invalidated the pid
+            %% Ask the starter to kill the child and stop. In most cases, the
+            %% starter has already received this message. However, when pools
+            %% are dynamically re-created with the same name, it is possible
+            %% to receive an accept from a pool that has since gone away.
+            %% In this case, we should cleanup.
+            pooler_starter:stop_member_async(StarterPid),
+            Pool1;
         true ->
-            StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
+            StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
             MRef = erlang:monitor(process, Pid),
             Entry = {MRef, free, os:timestamp()},
             AllMembers1 = store_all_members(Pid, Entry, AllMembers),
+            pooler_starter:stop(StarterPid),
             Pool#pool{free_pids = Free ++ [Pid],
                       free_count = NumFree + 1,
                       all_members = AllMembers1,
                       starting_members = StartingMembers1}
     end;
-do_accept_member({Ref, _Reason}, #pool{starting_members = StartingMembers0,
+do_accept_member({StarterPid, _Reason}, #pool{starting_members = StartingMembers0,
                                        member_start_timeout = StartTimeout} = Pool) ->
     %% member start failed, remove in-flight ref and carry on.
-    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
+    pooler_starter:stop(StarterPid),
+    Pool1 = #pool{starting_members = StartingMembers} = remove_stale_starting_members(Pool, StartingMembers0,
                                                     StartTimeout),
-    StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
-    Pool#pool{starting_members = StartingMembers1}.
+    StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
+    Pool1#pool{starting_members = StartingMembers1}.
 
 
 -spec remove_stale_starting_members(#pool{}, [{reference(), erlang:timestamp()}],
-                                    time_spec()) -> [{reference(), erlang:timestamp()}].
+                                    time_spec()) -> #pool{}.
 remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
     Now = os:timestamp(),
     MaxAgeSecs = time_as_secs(MaxAge),
-    lists:filter(fun(SM) ->
-                         starting_member_not_stale(Pool, Now, SM, MaxAgeSecs)
-                 end, StartingMembers).
+    FilteredStartingMembers = lists:foldl(fun(SM, AccIn) ->
+                         accumulate_starting_member_not_stale(Pool, Now, SM, MaxAgeSecs, AccIn)
+                 end, [], StartingMembers),
+    Pool#pool{starting_members = FilteredStartingMembers}.
 
-starting_member_not_stale(Pool, Now, {_Ref, StartTime}, MaxAgeSecs) ->
+accumulate_starting_member_not_stale(Pool, Now, SM = {Pid, StartTime}, MaxAgeSecs, AccIn) ->
     case secs_between(StartTime, Now) < MaxAgeSecs of
         true ->
-            true;
+            [SM | AccIn];
         false ->
             error_logger:error_msg("pool '~s': starting member timeout", [Pool#pool.name]),
             send_metric(Pool, starting_member_timeout, {inc, 1}, counter),
-            false
+            pooler_starter:stop_member_async(Pid),
+            AccIn
     end.
 
 init_members_sync(N, #pool{name = PoolName} = Pool) ->
@@ -452,18 +459,18 @@ take_member_from_pool(#pool{init_count = InitCount,
                             in_use_count = NumInUse,
                             free_count = NumFree,
                             consumer_to_pid = CPMap,
-                            starting_members = StartingMembers0,
+                            starting_members = StartingMembers,
                             member_start_timeout = StartTimeout} = Pool,
                       From) ->
     send_metric(Pool, take_rate, 1, meter),
-    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
-                                                    StartTimeout),
-    NumCanAdd = Max - (NumInUse + NumFree + length(StartingMembers)),
+    Pool1 = remove_stale_starting_members(Pool, StartingMembers, StartTimeout),
+    NonStaleStartingMemberCount = length(Pool1#pool.starting_members),
+    NumCanAdd = Max - (NumInUse + NumFree + NonStaleStartingMemberCount),
     case Free of
         [] when NumCanAdd =< 0  ->
             send_metric(Pool, error_no_members_count, {inc, 1}, counter),
             send_metric(Pool, events, error_no_members, history),
-            {error_no_members, Pool};
+            {error_no_members, Pool1};
         [] when NumCanAdd > 0 ->
             %% Limit concurrently starting members to init_count. Add
             %% up to init_count members. Starting members here means
@@ -472,20 +479,20 @@ take_member_from_pool(#pool{init_count = InitCount,
             %% members, the pool should reach a steady state with
             %% unused members culled over time (if scheduled cull is
             %% enabled).
-            NumToAdd = min(InitCount - length(StartingMembers), NumCanAdd),
-            Pool1 = add_members_async(NumToAdd, Pool),
+            NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 1),
+            Pool2 = add_members_async(NumToAdd, Pool1),
             send_metric(Pool, error_no_members_count, {inc, 1}, counter),
             send_metric(Pool, events, error_no_members, history),
-            {error_no_members, Pool1};
+            {error_no_members, Pool2};
         [Pid|Rest] ->
-            Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
+            Pool2 = Pool1#pool{free_pids = Rest, in_use_count = NumInUse + 1,
                               free_count = NumFree - 1},
-            send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
-            send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
-            {Pid, Pool1#pool{
+            send_metric(Pool, in_use_count, Pool2#pool.in_use_count, histogram),
+            send_metric(Pool, free_count, Pool2#pool.free_count, histogram),
+            {Pid, Pool2#pool{
                     consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
                     all_members = set_cpid_for_member(Pid, From,
-                                                      Pool1#pool.all_members)
+                                                      Pool2#pool.all_members)
                    }}
     end.
 

+ 62 - 35
src/pooler_starter.erl

@@ -12,9 +12,10 @@
 %% API Function Exports
 %% ------------------------------------------------------------------
 
--export([start_link/3,
+-export([start_link/2,
          start_member/1,
          start_member/2,
+         stop_member_async/1,
          stop/1]).
 
 %% ------------------------------------------------------------------
@@ -37,66 +38,91 @@
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
-start_link(Pool, Ref, Parent) ->
-    gen_server:start_link(?MODULE, {Pool, Ref, Parent}, []).
+start_link(Pool, Parent) ->
+    gen_server:start_link(?MODULE, {Pool, Parent}, []).
 
 stop(Starter) ->
-    gen_server:call(Starter, stop).
+    gen_server:cast(Starter, stop).
 
 %% @doc Start a member for the specified `Pool'.
 %%
 %% Member creation with this call is async. This function returns
-%% immediately with a reference. When the member has been created it
-%% is sent to the specified pool via {@link pooler:accept_member/2}.
+%% immediately with create process' pid. When the member has been
+%% created it is sent to the specified pool via
+%% {@link pooler:accept_member/2}.
 %%
 %% Each call starts a single use `pooler_starter' instance via
 %% `pooler_starter_sup'. The instance terminates normally after
 %% creating a single member.
--spec start_member(#pool{}) -> reference().
+-spec start_member(#pool{}) -> pid().
 start_member(Pool) ->
-    Ref = make_ref(),
-    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, pool),
-    Ref.
+    {ok, Pid} = pooler_starter_sup:new_starter(Pool, pool),
+    Pid.
 
 %% @doc Same as {@link start_member/1} except that instead of calling
 %% {@link pooler:accept_member/2} a raw message is sent to `Parent' of
 %% the form `{accept_member, {Ref, Member}'. Where `Member' will
 %% either be the member pid or an error term and `Ref' will be the
-%% reference returned from this function.
+%% Pid of the starter.
 %%
 %% This is used by the init function in the `pooler' to start the
 %% initial set of pool members in parallel.
 start_member(Pool, Parent) ->
-    Ref = make_ref(),
-    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, Parent),
-    Ref.
+    {ok, Pid} = pooler_starter_sup:new_starter(Pool, Parent),
+    Pid.
+
+%% @doc Stop a member in the pool
+
+%% Member creation can take too long. In this case, the starter
+%% needs to be informed that even if creation succeeds, the
+%% started child should be not be sent back and should be
+%% cleaned up
+-spec stop_member_async(pid()) -> ok.
+stop_member_async(Pid) ->
+    gen_server:cast(Pid, stop_member).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
 -record(starter, {pool,
-                  ref,
-                  parent}).
+                  parent,
+                  msg}).
 
--spec init({#pool{}, reference(), pid() | atom()}) -> {'ok', #starter{}, 0}.
-init({Pool, Ref, Parent}) ->
+-spec init({#pool{}, pid() | atom()}) -> {'ok', #starter{}, 0}.
+init({Pool, Parent}) ->
     %% trigger immediate timeout message, which we'll use to trigger
     %% the member start.
-    {ok, #starter{pool = Pool, ref = Ref, parent = Parent}, 0}.
+    {ok, #starter{pool = Pool, parent = Parent}, 0}.
 
-handle_call(stop, _From, State) ->
-    {stop, normal, stop_ok, State};
 handle_call(_Request, _From, State) ->
     {noreply, State}.
 
+handle_cast(stop_member, #starter{msg = {_Me, Pid}, pool = #pool{member_sup = MemberSup}} = State) ->
+    %% The process we were starting is no longer valid for the pool.
+    %% Cleanup the process and stop normally.
+    supervisor:terminate_child(MemberSup, Pid),
+    {stop, normal, State};
+
+handle_cast(accept_member, #starter{msg = Msg, parent = Parent, pool = #pool{name = PoolName}} = State) ->
+    %% Process creation has succeeded. Send the member to the pooler
+    %% gen_server to be accepted. Pooler gen_server will notify
+    %% us if the member was accepted or needs to cleaned up.
+    send_accept_member(Parent, PoolName, Msg),
+    {noreply, State};
+
+handle_cast(stop, State) ->
+    {stop, normal, stop_ok, State};
+
+
 handle_cast(_Request, State) ->
     {noreply, State}.
 
 -spec handle_info(_, _) -> {'noreply', _}.
 handle_info(timeout,
-            #starter{pool = Pool, ref = Ref, parent = Parent} = State) ->
-    ok = do_start_member(Pool, Ref, Parent),
-    {stop, normal, State};
+            #starter{pool = Pool} = State) ->
+    Msg = do_start_member(Pool),
+    accept_member_async(self()),
+    {noreply, State#starter{msg = Msg}};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -108,20 +134,21 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-do_start_member(#pool{name = PoolName, member_sup = PoolSup}, Ref, Parent) ->
-    Msg = case supervisor:start_child(PoolSup, []) of
-              {ok, Pid} ->
-                  {Ref, Pid};
-              Error ->
-                  error_logger:error_msg("pool '~s' failed to start member: ~p",
-                                         [PoolName, Error]),
-                  {Ref, Error}
-          end,
-    send_accept_member(Parent, PoolName, Msg),
-    ok.
+do_start_member(#pool{member_sup = PoolSup, name = PoolName}) ->
+    case supervisor:start_child(PoolSup, []) of
+        {ok, Pid} ->
+            {self(), Pid};
+        Error ->
+            error_logger:error_msg("pool '~s' failed to start member: ~p",
+                                   [PoolName, Error]),
+            {self(), Error}
+    end.
 
 send_accept_member(pool, PoolName, Msg) ->
     pooler:accept_member(PoolName, Msg);
 send_accept_member(Pid, _PoolName, Msg) ->
     Pid ! {accept_member, Msg},
     ok.
+
+accept_member_async(Pid) ->
+    gen_server:cast(Pid, accept_member).

+ 3 - 3
src/pooler_starter_sup.erl

@@ -6,14 +6,14 @@
 
 -behaviour(supervisor).
 
--export([new_starter/3,
+-export([new_starter/2,
          start_link/0,
          init/1]).
 
 -include("pooler.hrl").
 
-new_starter(Pool, Ref, Parent) ->
-    supervisor:start_child(?MODULE, [Pool, Ref, Parent]).
+new_starter(Pool, Parent) ->
+    supervisor:start_child(?MODULE, [Pool, Parent]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).

+ 9 - 0
test/pooled_gs.erl

@@ -36,6 +36,10 @@
 
 start_link(Args ={_Type}) ->
     % not registered
+    gen_server:start_link(?MODULE, Args, []);
+
+start_link(Args ={_Type, _InitFun}) ->
+    % not registered
     gen_server:start_link(?MODULE, Args, []).
 
 %% @doc return the type argument passed to this worker at start time
@@ -73,8 +77,13 @@ stop(S) ->
          }).
 
 init({Type}) ->
+    {ok, #state{type = Type, id = make_ref()}};
+init({Type, StartFun}) ->
+    StartFun(),
     {ok, #state{type = Type, id = make_ref()}}.
 
+
+
 handle_call(get_id, _From, State) ->
     {reply, {State#state.type, State#state.id}, State};
 handle_call({do_work, T}, _From, State) ->

+ 53 - 2
test/pooler_tests.erl

@@ -371,8 +371,8 @@ basic_tests() ->
       {"accept bad member is handled",
        fun() ->
                Bad = spawn(fun() -> ok end),
-               Ref = erlang:make_ref(),
-               ?assertEqual(ok, pooler:accept_member(test_pool_1, {Ref, Bad}))
+               FakeStarter = spawn(fun() -> starter end),
+               ?assertEqual(ok, pooler:accept_member(test_pool_1, {FakeStarter, Bad}))
        end}
       ].
 
@@ -684,6 +684,51 @@ random_message_test_() ->
      end
     ]}.
 
+pooler_integration_long_init_test_() ->
+    {foreach,
+     % setup
+     fun() ->
+             Pool = [{name, test_pool_1},
+                       {max_count, 10},
+                       {init_count, 0},
+                       {member_start_timeout, {10, ms}},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-0", fun() -> timer:sleep(15) end}]}}],
+
+             application:set_env(pooler, pools, [Pool]),
+             application:start(pooler)
+     end,
+     % cleanup
+     fun(_) ->
+             application:stop(pooler)
+     end,
+     %
+     [
+      fun(_) ->
+              % Test what happens when pool members take too long to start.
+              % The pooler_starter should kill off stale members, there by
+              % reducing the number of children of the member_sup. This
+              % activity occurs both during take member and accept member.
+              % Accordingly, the count should go to zero once all starters
+              % check in.
+              fun() ->
+                      ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
+                      [begin
+                           ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
+                           ?assertEqual(1, starting_members(test_pool_1))
+                       end
+                               || _ <- lists:seq(1,10)],
+                      ?assertEqual(10, children_count(pooler_test_pool_1_member_sup)),
+
+                      timer:sleep(150),
+                      ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
+                      ?assertEqual(0, starting_members(test_pool_1))
+              end
+      end
+     ]
+     }.
+    
+
 pooler_integration_test_() ->
     {foreach,
      % setup
@@ -787,3 +832,9 @@ get_n_pids_group(Group, N, Acc) ->
         Pid ->
             get_n_pids_group(Group, N - 1, [Pid|Acc])
     end.
+
+children_count(SupId) ->
+    length(supervisor:which_children(SupId)).
+
+starting_members(PoolName) ->
+    length((gen_server:call(PoolName, dump_pool))#pool.starting_members).