Browse Source

Don't send whole pool to `pooler_starter`, only send necessary fields

Sergey Prokhorov 2 years ago
parent
commit
e64f07de9d
4 changed files with 58 additions and 46 deletions
  1. 12 12
      src/pooler.erl
  2. 1 1
      src/pooler.hrl
  3. 41 28
      src/pooler_starter.erl
  4. 4 5
      src/pooler_starter_sup.erl

+ 12 - 12
src/pooler.erl

@@ -238,9 +238,9 @@ pool_child_spec(PoolConfig) ->
     pooler_sup:pool_child_spec(PoolConfig).
 
 %% @doc For INTERNAL use. Adds `MemberPid' to the pool.
--spec accept_member(atom() | pid(), pid() | {noproc, _}) -> ok.
-accept_member(PoolName, MemberPid) ->
-    gen_server:call(PoolName, {accept_member, MemberPid}).
+-spec accept_member(pool_name(), pooler_starter:start_result()) -> ok.
+accept_member(PoolName, StartResult) ->
+    gen_server:call(PoolName, {accept_member, StartResult}).
 
 %% @doc Obtain exclusive access to a member from `PoolName'.
 %%
@@ -430,8 +430,8 @@ handle_call({take_member, Timeout}, From = {APid, _}, #pool{} = Pool) when is_pi
     maybe_reply(take_member_from_pool_queued(Pool, From, Timeout));
 handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
     {reply, ok, do_return_member(Pid, Status, Pool)};
-handle_call({accept_member, Pid}, _From, Pool) ->
-    {reply, ok, do_accept_member(Pid, Pool)};
+handle_call({accept_member, StartResult}, _From, Pool) ->
+    {reply, ok, do_accept_member(StartResult, Pool)};
 handle_call(stop, _From, Pool) ->
     {stop, normal, stop_ok, Pool};
 handle_call(pool_stats, _From, Pool) ->
@@ -628,7 +628,7 @@ take_member_bookkeeping(
 
 -spec remove_stale_starting_members(
     #pool{},
-    [{reference(), erlang:timestamp()}],
+    [{pid(), erlang:timestamp()}],
     time_spec()
 ) -> #pool{}.
 remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
@@ -660,11 +660,11 @@ accumulate_starting_member_not_stale(Pool, Now, SM = {Pid, StartTime}, MaxAgeSec
             AccIn
     end.
 
-init_members_sync(N, #pool{name = PoolName} = Pool) ->
+init_members_sync(N, #pool{name = PoolName, member_sup = MemberSup} = Pool) ->
     Self = self(),
     StartTime = os:timestamp(),
     StartRefs = [
-        {pooler_starter:start_member(Pool, Self), StartTime}
+        {pooler_starter:start_member(PoolName, MemberSup, Self), StartTime}
      || _I <- lists:seq(1, N)
     ],
     Pool1 = Pool#pool{starting_members = StartRefs},
@@ -690,8 +690,8 @@ collect_init_members(#pool{starting_members = Empty} = Pool) when
 collect_init_members(#pool{member_start_timeout = StartTimeout} = Pool) ->
     Timeout = time_as_millis(StartTimeout),
     receive
-        {accept_member, {Ref, Member}} ->
-            collect_init_members(do_accept_member({Ref, Member}, Pool))
+        {accept_member, {_, _} = StartResult} ->
+            collect_init_members(do_accept_member(StartResult, Pool))
     after Timeout ->
         timeout
     end.
@@ -783,10 +783,10 @@ take_member_from_pool_queued(
 %% @doc Add `Count' members to `Pool' asynchronously. Returns updated
 %% `Pool' record with starting member refs added to field
 %% `starting_members'.
-add_members_async(Count, #pool{starting_members = StartingMembers} = Pool) ->
+add_members_async(Count, #pool{name = PoolName, member_sup = MemberSup, starting_members = StartingMembers} = Pool) ->
     StartTime = os:timestamp(),
     StartRefs = [
-        {pooler_starter:start_member(Pool), StartTime}
+        {pooler_starter:start_member(PoolName, MemberSup), StartTime}
      || _I <- lists:seq(1, Count)
     ],
     Pool#pool{starting_members = StartRefs ++ StartingMembers}.

+ 1 - 1
src/pooler.hrl

@@ -60,7 +60,7 @@
     %% new member start requests that are in-flight. The
     %% timestamp records when the start request was initiated
     %% and is used to implement start timeout.
-    starting_members = [] :: [{reference(), erlang:timestamp()}],
+    starting_members = [] :: [{pid(), erlang:timestamp()}],
 
     %% The maximum amount of time to allow for member start.
     member_start_timeout = ?DEFAULT_MEMBER_START_TIMEOUT :: pooler:time_spec(),

+ 41 - 28
src/pooler_starter.erl

@@ -13,9 +13,9 @@
 %% ------------------------------------------------------------------
 
 -export([
-    start_link/2,
-    start_member/1,
+    start_link/1,
     start_member/2,
+    start_member/3,
     stop_member_async/1,
     stop/1
 ]).
@@ -26,6 +26,7 @@
 
 -export([
     init/1,
+    handle_continue/2,
     handle_call/3,
     handle_cast/2,
     handle_info/2,
@@ -33,12 +34,20 @@
     code_change/3
 ]).
 
+-export_type([start_spec/0, start_result/0]).
+
+-type pool_member_sup() :: pid() | atom().
+-type parent() :: pid() | pool.
+-type start_result() :: {StarterPid :: pid(), Result :: pid() | {error, _}}.
+-opaque start_spec() :: {pooler:pool_name(), pool_member_sup(), parent()}.
+
 %% ------------------------------------------------------------------
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
-start_link(Pool, Parent) ->
-    gen_server:start_link(?MODULE, {Pool, Parent}, []).
+-spec start_link(start_spec()) -> {ok, pid()}.
+start_link({_, _, _} = Spec) ->
+    gen_server:start_link(?MODULE, Spec, []).
 
 stop(Starter) ->
     gen_server:cast(Starter, stop).
@@ -53,9 +62,9 @@ stop(Starter) ->
 %% Each call starts a single use `pooler_starter' instance via
 %% `pooler_starter_sup'. The instance terminates normally after
 %% creating a single member.
--spec start_member(#pool{}) -> pid().
-start_member(Pool) ->
-    {ok, Pid} = pooler_starter_sup:new_starter(Pool, pool),
+-spec start_member(pooler:pool_name(), pool_member_sup()) -> pid().
+start_member(PoolName, PoolMemberSup) ->
+    {ok, Pid} = pooler_starter_sup:new_starter({PoolName, PoolMemberSup, pool}),
     Pid.
 
 %% @doc Same as {@link start_member/1} except that instead of calling
@@ -66,8 +75,9 @@ start_member(Pool) ->
 %%
 %% This is used by the init function in the `pooler' to start the
 %% initial set of pool members in parallel.
-start_member(Pool, Parent) ->
-    {ok, Pid} = pooler_starter_sup:new_starter(Pool, Parent),
+-spec start_member(pooler:pool_name(), pool_member_sup(), pid()) -> pid().
+start_member(PoolName, PoolMemberSup, Parent) ->
+    {ok, Pid} = pooler_starter_sup:new_starter({PoolName, PoolMemberSup, Parent}),
     Pid.
 
 %% @doc Stop a member in the pool
@@ -84,26 +94,34 @@ stop_member_async(Pid) ->
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
 -record(starter, {
-    pool :: #pool{},
-    parent :: pid() | atom(),
-    msg :: term()
+    parent :: parent(),
+    pool_name :: pooler:pool_name(),
+    pool_member_sup :: pool_member_sup(),
+    msg :: start_result() | undefined
 }).
 
--spec init({#pool{}, pid() | atom()}) -> {'ok', #starter{}, 0}.
-init({Pool, Parent}) ->
-    %% trigger immediate timeout message, which we'll use to trigger
-    %% the member start.
-    {ok, #starter{pool = Pool, parent = Parent}, 0}.
+-spec init(start_spec()) -> {ok, #starter{}, {continue, start}}.
+init({PoolName, PoolMemberSup, Parent}) ->
+    {ok, #starter{pool_name = PoolName, pool_member_sup = PoolMemberSup, parent = Parent}, {continue, start}}.
+
+handle_continue(
+    start,
+    #starter{pool_member_sup = PoolSup, pool_name = PoolName} = State
+) ->
+    Msg = do_start_member(PoolSup, PoolName),
+    % asynchronously in order to receive potential `stop*'
+    accept_member_async(self()),
+    {noreply, State#starter{msg = Msg}}.
 
 handle_call(_Request, _From, State) ->
     {noreply, State}.
 
-handle_cast(stop_member, #starter{msg = {_Me, Pid}, pool = #pool{member_sup = MemberSup}} = State) ->
+handle_cast(stop_member, #starter{msg = {_Me, Pid}, pool_member_sup = MemberSup} = State) ->
     %% The process we were starting is no longer valid for the pool.
     %% Cleanup the process and stop normally.
     supervisor:terminate_child(MemberSup, Pid),
     {stop, normal, State};
-handle_cast(accept_member, #starter{msg = Msg, parent = Parent, pool = #pool{name = PoolName}} = State) ->
+handle_cast(accept_member, #starter{msg = Msg, parent = Parent, pool_name = PoolName} = State) ->
     %% Process creation has succeeded. Send the member to the pooler
     %% gen_server to be accepted. Pooler gen_server will notify
     %% us if the member was accepted or needs to cleaned up.
@@ -114,14 +132,6 @@ handle_cast(stop, State) ->
 handle_cast(_Request, State) ->
     {noreply, State}.
 
--spec handle_info(_, _) -> {'noreply', _}.
-handle_info(
-    timeout,
-    #starter{pool = Pool} = State
-) ->
-    Msg = do_start_member(Pool),
-    accept_member_async(self()),
-    {noreply, State#starter{msg = Msg}};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -133,7 +143,7 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-do_start_member(#pool{member_sup = PoolSup, name = PoolName}) ->
+do_start_member(PoolSup, PoolName) ->
     case supervisor:start_child(PoolSup, []) of
         {ok, Pid} ->
             {self(), Pid};
@@ -149,9 +159,12 @@ do_start_member(#pool{member_sup = PoolSup, name = PoolName}) ->
             {self(), Error}
     end.
 
+-spec send_accept_member(parent(), pooler:pool_name(), start_result()) -> ok.
 send_accept_member(pool, PoolName, Msg) ->
+    %% used to grow pool
     pooler:accept_member(PoolName, Msg);
 send_accept_member(Pid, _PoolName, Msg) ->
+    %% used during pool initialization
     Pid ! {accept_member, Msg},
     ok.
 

+ 4 - 5
src/pooler_starter_sup.erl

@@ -7,15 +7,14 @@
 -behaviour(supervisor).
 
 -export([
-    new_starter/2,
+    new_starter/1,
     start_link/0,
     init/1
 ]).
 
--include("pooler.hrl").
-
-new_starter(Pool, Parent) ->
-    supervisor:start_child(?MODULE, [Pool, Parent]).
+-spec new_starter(pooler_starter:start_spec()) -> {ok, pid()}.
+new_starter(Spec) ->
+    supervisor:start_child(?MODULE, [Spec]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).