Browse Source

Start members in init in parallel

The pooler_starter server is now a single-use process. Calling
pooler_starter:start_member creates a new server which will start a
member, notify pooler, and then exit normally. This means we won't
leak pooler_starter processes and don't have to add bookkeeping to
track them.

In addition, pooler_starter now allows for a raw message mode in which
the created member is sent to a process as a raw message rather than
via pooler:accept_member.

The init function in pooler makes use of the raw message mode of
pooler_starter:start_member to initialize members on startup in
parallel. This approach allows reuse of existing code via
do_accept_member, ensures that all processes are supervised (the
proc_lib:spawn cheat in pooler_starter for sync start is now gone),
yet blocks init until the initial count of members has been added.
Seth Falcon 12 years ago
parent
commit
fe4916c193
3 changed files with 94 additions and 113 deletions
  1. 29 32
      src/pooler.erl
  2. 62 78
      src/pooler_starter.erl
  3. 3 3
      src/pooler_starter_sup.erl

+ 29 - 32
src/pooler.erl

@@ -177,8 +177,10 @@ init(#pool{}=Pool) ->
     #pool{init_count = N} = Pool,
     MemberSup = pooler_pool_sup:member_sup_name(Pool),
     Pool1 = set_member_sup(Pool, MemberSup),
+    %% This schedules the next cull when the pool is configured for
+    %% such and is otherwise a no-op.
     Pool2 = cull_members_from_pool(Pool1),
-    {ok, NewPool} = add_members_sync(N, Pool2),
+    {ok, NewPool} = init_members_sync(N, Pool2),
     %% trigger an immediate timeout, handled by handle_info to allow
     %% us to register with pg2. We use the timeout mechanism to ensure
     %% that a server is added to a group only when it is ready to
@@ -303,36 +305,32 @@ starting_member_not_stale(Pool, Now, {_Ref, StartTime}, MaxAgeSecs) ->
             false
     end.
 
-%% @doc Synchronously add `N' members to `Pool'.
--spec add_members_sync(non_neg_integer(), #pool{}) -> {ok, #pool{}}.
-add_members_sync(N, #pool{name = PoolName,
-                          free_pids = Free,
-                          all_members = AllMembers} = Pool) ->
-    {ok, Starter} = pooler_starter_sup:new_starter(),
-    %% start N members in parallel and wait for all to start.
-    NewPids = pooler_starter:start_members_sync(Starter, Pool, N),
-    NewPidCount = length(NewPids),
-    case NewPidCount =:= N of
-        true -> ok;
-        false ->
-            error_logger:error_msg("pool '~s' tried to add ~B members, only added ~B~n",
-                                   [PoolName, N, NewPidCount]),
-            %% consider changing this to a count?
-            send_metric(Pool, events,
-                        {add_pids_failed, N, NewPidCount}, history)
-    end,
+init_members_sync(N, #pool{name = PoolName} = Pool) ->
+    Self = self(),
     StartTime = os:timestamp(),
-    AllMembers1 = lists:foldl(
-                    fun(MPid, Dict) ->
-                            MRef = erlang:monitor(process, MPid),
-                            Entry = {MRef, free, StartTime},
-                            store_all_members(MPid, Entry, Dict)
-                    end, AllMembers, NewPids),
-
-    Pool1 = Pool#pool{free_pids = Free ++ NewPids,
-                      free_count = length(Free) + NewPidCount,
-                      all_members = AllMembers1},
-    {ok, Pool1}.
+    StartRefs = [ {pooler_starter:start_member(Pool, Self), StartTime}
+                  || _I <- lists:seq(1, N) ],
+    Pool1 = Pool#pool{starting_members = StartRefs},
+    case collect_init_members(Pool1) of
+        timeout ->
+            error_logger:error_msg("pool '~s': exceeded timeout waiting for ~B members",
+                                   [PoolName, Pool1#pool.init_count]),
+            error({timeout, "unable to start members"});
+        #pool{} = Pool2 ->
+            {ok, Pool2}
+    end.
+
+collect_init_members(#pool{starting_members = []} = Pool) ->
+    Pool;
+collect_init_members(#pool{} = Pool) ->
+    Timeout = time_as_millis(?DEFAULT_MEMBER_START_TIMEOUT),
+    receive
+        {accept_member, {Ref, Member}} ->
+            collect_init_members(do_accept_member({Ref, Member}, Pool))
+    after
+        Timeout ->
+            timeout
+    end.
 
 -spec take_member_from_pool(#pool{}, {pid(), term()}) ->
                                    {error_no_members | pid(), #pool{}}.
@@ -382,9 +380,8 @@ take_member_from_pool(#pool{init_count = InitCount,
 %% `Pool' record with starting member refs added to field
 %% `starting_members'.
 add_members_async(Count, #pool{starting_members = StartingMembers} = Pool) ->
-    {ok, Starter} = pooler_starter_sup:new_starter(),
     StartTime = os:timestamp(),
-    StartRefs = [ {pooler_starter:start_member(Starter, Pool), StartTime}
+    StartRefs = [ {pooler_starter:start_member(Pool), StartTime}
                   || _I <- lists:seq(1, Count) ],
     Pool#pool{starting_members = StartRefs ++ StartingMembers}.
 

+ 62 - 78
src/pooler_starter.erl

@@ -12,8 +12,10 @@
 %% API Function Exports
 %% ------------------------------------------------------------------
 
--export([start_link/0,
-         start_member/2]).
+-export([start_link/3,
+         start_member/1,
+         start_member/2,
+         stop/1]).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Exports
@@ -35,102 +37,66 @@
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
-start_link() ->
-    gen_server:start_link(?MODULE, [], []).
+start_link(Pool, Ref, Parent) ->
+    gen_server:start_link(?MODULE, {Pool, Ref, Parent}, []).
+
+stop(Starter) ->
+    gen_server:call(Starter, stop).
 
 %% @doc Start a member for the specified `Pool'.
 %%
-%% The start member request is a sent as a cast to the starter
-%% server. The starter server mailbox is treated as the member start
-%% work queue. Members are started serially and sent back to the
-%% requesting pool via `pooler:accept_member/2'. It is expected that
-%% callers keep track of, and limit, their start requests so that the
-%% starters queue doesn't grow unbounded. A likely enhancement would
-%% be to allow parallel starts either by having the starter spawn a
-%% subprocess and manage or by using pg2 to group a number of starter
-%% servers. Note that timeout could be handled client-side using the
-%% `gen_server:call/3' timeout value.
--spec start_member(atom() | pid(), #pool{}) -> reference().
-start_member(Starter, #pool{} = Pool) ->
+%% Member creation with this call is async. This function returns
+%% immediately with a reference. When the member has been created it
+%% is sent to the specified pool via {@link pooler:accept_member/2}.
+%%
+%% Each call starts a single use `pooler_starter' instance via
+%% `pooler_starter_sup'. The instance terminates normally after
+%% creating a single member.
+-spec start_member(#pool{}) -> reference().
+start_member(Pool) ->
     Ref = make_ref(),
-    gen_server:cast(Starter, {start_member, Pool, Ref}),
+    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, pool),
     Ref.
 
-%% @doc Start `Count' members in parallel and return a list of created
-%% members. The returned list may contain fewer than `Count' members
-%% if errors occured for some member starts.
--spec start_members_sync(atom() | pid(), #pool{},
-                         non_neg_integer()) -> [pid()].
-start_members_sync(Starter, #pool{} = Pool, Count) ->
-    gen_server:call(Starter, {start_members_sync, Pool, Count}, infinity).
+%% @doc Same as {@link start_member/1} except that instead of calling
+%% {@link pooler:accept_member/2} a raw message is sent to `Parent' of
+%% the form `{accept_member, {Ref, Member}}', where `Member' will
+%% either be the member pid or an error term and `Ref' will be the
+%% reference returned from this function.
+%%
+%% This is used by the init function in the `pooler' to start the
+%% initial set of pool members in parallel.
+start_member(Pool, Parent) ->
+    Ref = make_ref(),
+    {ok, _Pid} = pooler_starter_sup:new_starter(Pool, Ref, Parent),
+    Ref.
 
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
+-record(starter, {pool,
+                  ref,
+                  parent}).
 
--spec init([]) -> {'ok', {}}.
-init([]) ->
-    {ok, {}}.
+-spec init({#pool{}, reference(), pid() | atom()}) -> {'ok', #starter{}, 0}.
+init({Pool, Ref, Parent}) ->
+    %% trigger immediate timeout message, which we'll use to trigger
+    %% the member start.
+    {ok, #starter{pool = Pool, ref = Ref, parent = Parent}, 0}.
 
-handle_call({start_members_sync, Pool, Count}, _From, State) ->
-    {reply, do_start_members_sync(Pool, Count), State};
 handle_call(stop, _From, State) ->
     {stop, normal, stop_ok, State};
 handle_call(_Request, _From, State) ->
     {noreply, State}.
 
-handle_cast({start_member, Pool, Ref}, State) ->
-    ok = do_start_member(Pool, Ref),
+handle_cast(_Request, State) ->
     {noreply, State}.
 
-do_start_member(#pool{name = PoolName,
-                      member_sup = PoolSup},
-                Ref) ->
-    case supervisor:start_child(PoolSup, []) of
-        {ok, Pid} ->
-            ok = pooler:accept_member(PoolName, {Ref, Pid}),
-            ok;
-        Error ->
-            error_logger:error_msg("pool '~s' failed to start member: ~p",
-                                   [PoolName, Error]),
-            pooler:accept_member(PoolName, {Ref, Error}),
-            ok
-    end.
-
-do_start_members_sync(#pool{name = PoolName,
-                            member_sup = PoolSup}, Count) ->
-    Parent = self(),
-    StarterPids = [ launch_starter(Parent, PoolName, PoolSup)
-                    || _I <- lists:seq(1, Count) ],
-    gather_pids(StarterPids, []).
-
-launch_starter(Parent, PoolName, PoolSup) ->
-    proc_lib:spawn_link(fun() ->
-                                Result = start_or_log_error(PoolName, PoolSup),
-                                Parent ! {self(), Result}
-                        end).
-
-start_or_log_error(PoolName, PoolSup) ->
-    case supervisor:start_child(PoolSup, []) of
-        {ok, Pid} ->
-            {ok, Pid};
-        Error ->
-            error_logger:error_msg("pool '~s' failed to start member: ~p",
-                                   [PoolName, Error]),
-            error
-    end.
-
-gather_pids([Pid|Rest], Acc) ->
-    receive
-        {Pid, error} ->
-            gather_pids(Rest, Acc);
-        {Pid, {ok, MemberPid}} ->
-            gather_pids(Rest, [MemberPid | Acc])
-    end;
-gather_pids([], Acc) ->
-    Acc.
-
 -spec handle_info(_, _) -> {'noreply', _}.
+handle_info(timeout,
+            #starter{pool = Pool, ref = Ref, parent = Parent} = State) ->
+    ok = do_start_member(Pool, Ref, Parent),
+    {stop, normal, State};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -141,3 +107,21 @@ terminate(_Reason, _State) ->
 -spec code_change(_, _, _) -> {'ok', _}.
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
+
+do_start_member(#pool{name = PoolName, member_sup = PoolSup}, Ref, Parent) ->
+    Msg = case supervisor:start_child(PoolSup, []) of
+              {ok, Pid} ->
+                  {Ref, Pid};
+              Error ->
+                  error_logger:error_msg("pool '~s' failed to start member: ~p",
+                                         [PoolName, Error]),
+                  {Ref, Error}
+          end,
+    send_accept_member(Parent, PoolName, Msg),
+    ok.
+
+send_accept_member(pool, PoolName, Msg) ->
+    pooler:accept_member(PoolName, Msg);
+send_accept_member(Pid, _PoolName, Msg) ->
+    Pid ! {accept_member, Msg},
+    ok.

+ 3 - 3
src/pooler_starter_sup.erl

@@ -6,14 +6,14 @@
 
 -behaviour(supervisor).
 
--export([new_starter/0,
+-export([new_starter/3,
          start_link/0,
          init/1]).
 
 -include("pooler.hrl").
 
-new_starter() ->
-    supervisor:start_child(?MODULE, []).
+new_starter(Pool, Ref, Parent) ->
+    supervisor:start_child(?MODULE, [Pool, Ref, Parent]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).