Browse Source

Merge pull request #31 from oferrigni/of/kill_stale_started_members

Of/kill stale started members
Oliver Ferrigni 10 years ago
parent
commit
0ecc29bb8e
5 changed files with 167 additions and 73 deletions
  1. 40 33
      src/pooler.erl
  2. 62 35
      src/pooler_starter.erl
  3. 3 3
      src/pooler_starter_sup.erl
  4. 9 0
      test/pooled_gs.erl
  5. 53 2
      test/pooler_tests.erl

+ 40 - 33
src/pooler.erl

@@ -297,7 +297,6 @@ init(#pool{}=Pool) ->
 
 set_member_sup(#pool{} = Pool, MemberSup) ->
     Pool#pool{member_sup = MemberSup}.
-
 handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
     {Member, NewPool} = take_member_from_pool(Pool, CPid),
     {reply, Member, NewPool};
@@ -363,7 +362,7 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
-do_accept_member({Ref, Pid},
+do_accept_member({StarterPid, Pid},
                  #pool{
                     all_members = AllMembers,
                     free_pids = Free,
@@ -372,49 +371,57 @@ do_accept_member({Ref, Pid},
                     member_start_timeout = StartTimeout
                    } = Pool) when is_pid(Pid) ->
     %% make sure we don't accept a timedout member
-    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
-                                                    StartTimeout),
-    case lists:keymember(Ref, 1, StartingMembers) of
+    Pool1 = #pool{starting_members = StartingMembers}= remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
+    case lists:keymember(StarterPid, 1, StartingMembers) of
         false ->
-            %% a pid we didn't ask to start, ignore it.
-            %% should we log it?
-            Pool;
+            %% A starter completed even though we invalidated the pid
+            %% Ask the starter to kill the child and stop. In most cases, the
+            %% starter has already received this message. However, when pools
+            %% are dynamically re-created with the same name, it is possible
+            %% to receive an accept from a pool that has since gone away.
+            %% In this case, we should cleanup.
+            pooler_starter:stop_member_async(StarterPid),
+            Pool1;
         true ->
-            StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
+            StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
             MRef = erlang:monitor(process, Pid),
             Entry = {MRef, free, os:timestamp()},
             AllMembers1 = store_all_members(Pid, Entry, AllMembers),
+            pooler_starter:stop(StarterPid),
             Pool#pool{free_pids = Free ++ [Pid],
                       free_count = NumFree + 1,
                       all_members = AllMembers1,
                       starting_members = StartingMembers1}
     end;
-do_accept_member({Ref, _Reason}, #pool{starting_members = StartingMembers0,
+do_accept_member({StarterPid, _Reason}, #pool{starting_members = StartingMembers0,
                                        member_start_timeout = StartTimeout} = Pool) ->
     %% member start failed, remove in-flight ref and carry on.
-    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
+    pooler_starter:stop(StarterPid),
+    Pool1 = #pool{starting_members = StartingMembers} = remove_stale_starting_members(Pool, StartingMembers0,
                                                     StartTimeout),
-    StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
-    Pool#pool{starting_members = StartingMembers1}.
+    StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
+    Pool1#pool{starting_members = StartingMembers1}.
 
 
 -spec remove_stale_starting_members(#pool{}, [{reference(), erlang:timestamp()}],
-                                    time_spec()) -> [{reference(), erlang:timestamp()}].
+                                    time_spec()) -> #pool{}.
 remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
     Now = os:timestamp(),
     MaxAgeSecs = time_as_secs(MaxAge),
-    lists:filter(fun(SM) ->
-                         starting_member_not_stale(Pool, Now, SM, MaxAgeSecs)
-                 end, StartingMembers).
+    FilteredStartingMembers = lists:foldl(fun(SM, AccIn) ->
+                         accumulate_starting_member_not_stale(Pool, Now, SM, MaxAgeSecs, AccIn)
+                 end, [], StartingMembers),
+    Pool#pool{starting_members = FilteredStartingMembers}.
 
-starting_member_not_stale(Pool, Now, {_Ref, StartTime}, MaxAgeSecs) ->
+accumulate_starting_member_not_stale(Pool, Now, SM = {Pid, StartTime}, MaxAgeSecs, AccIn) ->
     case secs_between(StartTime, Now) < MaxAgeSecs of
         true ->
-            true;
+            [SM | AccIn];
         false ->
             error_logger:error_msg("pool '~s': starting member timeout", [Pool#pool.name]),
             send_metric(Pool, starting_member_timeout, {inc, 1}, counter),
-            false
+            pooler_starter:stop_member_async(Pid),
+            AccIn
     end.
 
 init_members_sync(N, #pool{name = PoolName} = Pool) ->
@@ -452,18 +459,18 @@ take_member_from_pool(#pool{init_count = InitCount,
                             in_use_count = NumInUse,
                             free_count = NumFree,
                             consumer_to_pid = CPMap,
-                            starting_members = StartingMembers0,
+                            starting_members = StartingMembers,
                             member_start_timeout = StartTimeout} = Pool,
                       From) ->
     send_metric(Pool, take_rate, 1, meter),
-    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
-                                                    StartTimeout),
-    NumCanAdd = Max - (NumInUse + NumFree + length(StartingMembers)),
+    Pool1 = remove_stale_starting_members(Pool, StartingMembers, StartTimeout),
+    NonStaleStartingMemberCount = length(Pool1#pool.starting_members),
+    NumCanAdd = Max - (NumInUse + NumFree + NonStaleStartingMemberCount),
     case Free of
         [] when NumCanAdd =< 0  ->
             send_metric(Pool, error_no_members_count, {inc, 1}, counter),
             send_metric(Pool, events, error_no_members, history),
-            {error_no_members, Pool};
+            {error_no_members, Pool1};
         [] when NumCanAdd > 0 ->
             %% Limit concurrently starting members to init_count. Add
             %% up to init_count members. Starting members here means
@@ -472,20 +479,20 @@ take_member_from_pool(#pool{init_count = InitCount,
             %% members, the pool should reach a steady state with
             %% unused members culled over time (if scheduled cull is
             %% enabled).
-            NumToAdd = min(InitCount - length(StartingMembers), NumCanAdd),
-            Pool1 = add_members_async(NumToAdd, Pool),
+            NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 1),
+            Pool2 = add_members_async(NumToAdd, Pool1),
             send_metric(Pool, error_no_members_count, {inc, 1}, counter),
             send_metric(Pool, events, error_no_members, history),
-            {error_no_members, Pool1};
+            {error_no_members, Pool2};
         [Pid|Rest] ->
-            Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
+            Pool2 = Pool1#pool{free_pids = Rest, in_use_count = NumInUse + 1,
                               free_count = NumFree - 1},
-            send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
-            send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
-            {Pid, Pool1#pool{
+            send_metric(Pool, in_use_count, Pool2#pool.in_use_count, histogram),
+            send_metric(Pool, free_count, Pool2#pool.free_count, histogram),
+            {Pid, Pool2#pool{
                     consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
                     all_members = set_cpid_for_member(Pid, From,
-                                                      Pool1#pool.all_members)
+                                                      Pool2#pool.all_members)
                    }}
     end.
 

+ 62 - 35
src/pooler_starter.erl

@@ -12,9 +12,10 @@
 %% API Function Exports
 %% ------------------------------------------------------------------
 
--export([start_link/3,
+-export([start_link/2,
          start_member/1,
          start_member/2,
+         stop_member_async/1,
          stop/1]).
 
 %% ------------------------------------------------------------------
@@ -37,66 +38,91 @@
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
-start_link(Pool, Ref, Parent) ->
-    gen_server:start_link(?MODULE, {Pool, Ref, Parent}, []).
+start_link(Pool, Parent) ->
+    gen_server:start_link(?MODULE, {Pool, Parent}, []).
 
 stop(Starter) ->
-    gen_server:call(Starter, stop).
+    gen_server:cast(Starter, stop).
 
 %% @doc Start a member for the specified `Pool'.
 %%
 %% Member creation with this call is async. This function returns
-%% immediately with a reference. When the member has been created it
-%% is sent to the specified pool via {@link pooler:accept_member/2}.
+%% immediately with the created process's pid. When the member has been
+%% created it is sent to the specified pool via
+%% {@link pooler:accept_member/2}.
 %%
 %% Each call starts a single use `pooler_starter' instance via
 %% `pooler_starter_sup'. The instance terminates normally after
 %% creating a single member.
--spec start_member(#pool{}) -> reference().
+-spec start_member(#pool{}) -> pid().
 start_member(Pool) ->
-    Ref = make_ref(),
-    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, pool),
-    Ref.
+    {ok, Pid} = pooler_starter_sup:new_starter(Pool, pool),
+    Pid.
 
 %% @doc Same as {@link start_member/1} except that instead of calling
 %% {@link pooler:accept_member/2} a raw message is sent to `Parent' of
%% the form `{accept_member, {Ref, Member}}'. Where `Member' will
 %% either be the member pid or an error term and `Ref' will be the
-%% reference returned from this function.
+%% Pid of the starter.
 %%
 %% This is used by the init function in the `pooler' to start the
 %% initial set of pool members in parallel.
 start_member(Pool, Parent) ->
-    Ref = make_ref(),
-    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, Parent),
-    Ref.
+    {ok, Pid} = pooler_starter_sup:new_starter(Pool, Parent),
+    Pid.
+
+%% @doc Stop a member in the pool
+
+%% Member creation can take too long. In this case, the starter
+%% needs to be informed that even if creation succeeds, the
+%% started child should not be sent back and should be
+%% cleaned up
+-spec stop_member_async(pid()) -> ok.
+stop_member_async(Pid) ->
+    gen_server:cast(Pid, stop_member).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
 -record(starter, {pool,
-                  ref,
-                  parent}).
+                  parent,
+                  msg}).
 
--spec init({#pool{}, reference(), pid() | atom()}) -> {'ok', #starter{}, 0}.
-init({Pool, Ref, Parent}) ->
+-spec init({#pool{}, pid() | atom()}) -> {'ok', #starter{}, 0}.
+init({Pool, Parent}) ->
     %% trigger immediate timeout message, which we'll use to trigger
     %% the member start.
-    {ok, #starter{pool = Pool, ref = Ref, parent = Parent}, 0}.
+    {ok, #starter{pool = Pool, parent = Parent}, 0}.
 
-handle_call(stop, _From, State) ->
-    {stop, normal, stop_ok, State};
 handle_call(_Request, _From, State) ->
     {noreply, State}.
 
+handle_cast(stop_member, #starter{msg = {_Me, Pid}, pool = #pool{member_sup = MemberSup}} = State) ->
+    %% The process we were starting is no longer valid for the pool.
+    %% Cleanup the process and stop normally.
+    supervisor:terminate_child(MemberSup, Pid),
+    {stop, normal, State};
+
+handle_cast(accept_member, #starter{msg = Msg, parent = Parent, pool = #pool{name = PoolName}} = State) ->
+    %% Process creation has succeeded. Send the member to the pooler
+    %% gen_server to be accepted. Pooler gen_server will notify
+    %% us if the member was accepted or needs to be cleaned up.
+    send_accept_member(Parent, PoolName, Msg),
+    {noreply, State};
+
+handle_cast(stop, State) ->
+    {stop, normal, stop_ok, State};
+
+
 handle_cast(_Request, State) ->
     {noreply, State}.
 
 -spec handle_info(_, _) -> {'noreply', _}.
 handle_info(timeout,
-            #starter{pool = Pool, ref = Ref, parent = Parent} = State) ->
-    ok = do_start_member(Pool, Ref, Parent),
-    {stop, normal, State};
+            #starter{pool = Pool} = State) ->
+    Msg = do_start_member(Pool),
+    accept_member_async(self()),
+    {noreply, State#starter{msg = Msg}};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -108,20 +134,21 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-do_start_member(#pool{name = PoolName, member_sup = PoolSup}, Ref, Parent) ->
-    Msg = case supervisor:start_child(PoolSup, []) of
-              {ok, Pid} ->
-                  {Ref, Pid};
-              Error ->
-                  error_logger:error_msg("pool '~s' failed to start member: ~p",
-                                         [PoolName, Error]),
-                  {Ref, Error}
-          end,
-    send_accept_member(Parent, PoolName, Msg),
-    ok.
+do_start_member(#pool{member_sup = PoolSup, name = PoolName}) ->
+    case supervisor:start_child(PoolSup, []) of
+        {ok, Pid} ->
+            {self(), Pid};
+        Error ->
+            error_logger:error_msg("pool '~s' failed to start member: ~p",
+                                   [PoolName, Error]),
+            {self(), Error}
+    end.
 
 send_accept_member(pool, PoolName, Msg) ->
     pooler:accept_member(PoolName, Msg);
 send_accept_member(Pid, _PoolName, Msg) ->
     Pid ! {accept_member, Msg},
     ok.
+
+accept_member_async(Pid) ->
+    gen_server:cast(Pid, accept_member).

+ 3 - 3
src/pooler_starter_sup.erl

@@ -6,14 +6,14 @@
 
 -behaviour(supervisor).
 
--export([new_starter/3,
+-export([new_starter/2,
          start_link/0,
          init/1]).
 
 -include("pooler.hrl").
 
-new_starter(Pool, Ref, Parent) ->
-    supervisor:start_child(?MODULE, [Pool, Ref, Parent]).
+new_starter(Pool, Parent) ->
+    supervisor:start_child(?MODULE, [Pool, Parent]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).

+ 9 - 0
test/pooled_gs.erl

@@ -36,6 +36,10 @@
 
 start_link(Args ={_Type}) ->
     % not registered
+    gen_server:start_link(?MODULE, Args, []);
+
+start_link(Args ={_Type, _InitFun}) ->
+    % not registered
     gen_server:start_link(?MODULE, Args, []).
 
 %% @doc return the type argument passed to this worker at start time
@@ -73,8 +77,13 @@ stop(S) ->
          }).
 
 init({Type}) ->
+    {ok, #state{type = Type, id = make_ref()}};
+init({Type, StartFun}) ->
+    StartFun(),
     {ok, #state{type = Type, id = make_ref()}}.
 
+
+
 handle_call(get_id, _From, State) ->
     {reply, {State#state.type, State#state.id}, State};
 handle_call({do_work, T}, _From, State) ->

+ 53 - 2
test/pooler_tests.erl

@@ -371,8 +371,8 @@ basic_tests() ->
       {"accept bad member is handled",
        fun() ->
                Bad = spawn(fun() -> ok end),
-               Ref = erlang:make_ref(),
-               ?assertEqual(ok, pooler:accept_member(test_pool_1, {Ref, Bad}))
+               FakeStarter = spawn(fun() -> starter end),
+               ?assertEqual(ok, pooler:accept_member(test_pool_1, {FakeStarter, Bad}))
        end}
       ].
 
@@ -684,6 +684,51 @@ random_message_test_() ->
      end
     ]}.
 
+pooler_integration_long_init_test_() ->
+    {foreach,
+     % setup
+     fun() ->
+             Pool = [{name, test_pool_1},
+                       {max_count, 10},
+                       {init_count, 0},
+                       {member_start_timeout, {10, ms}},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-0", fun() -> timer:sleep(15) end}]}}],
+
+             application:set_env(pooler, pools, [Pool]),
+             application:start(pooler)
+     end,
+     % cleanup
+     fun(_) ->
+             application:stop(pooler)
+     end,
+     %
+     [
+      fun(_) ->
+              % Test what happens when pool members take too long to start.
+              % The pooler_starter should kill off stale members, thereby
+              % reducing the number of children of the member_sup. This
+              % activity occurs both during take member and accept member.
+              % Accordingly, the count should go to zero once all starters
+              % check in.
+              fun() ->
+                      ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
+                      [begin
+                           ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
+                           ?assertEqual(1, starting_members(test_pool_1))
+                       end
+                               || _ <- lists:seq(1,10)],
+                      ?assertEqual(10, children_count(pooler_test_pool_1_member_sup)),
+
+                      timer:sleep(150),
+                      ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
+                      ?assertEqual(0, starting_members(test_pool_1))
+              end
+      end
+     ]
+     }.
+    
+
 pooler_integration_test_() ->
     {foreach,
      % setup
@@ -787,3 +832,9 @@ get_n_pids_group(Group, N, Acc) ->
         Pid ->
             get_n_pids_group(Group, N - 1, [Pid|Acc])
     end.
+
+children_count(SupId) ->
+    length(supervisor:which_children(SupId)).
+
+starting_members(PoolName) ->
+    length((gen_server:call(PoolName, dump_pool))#pool.starting_members).