Browse Source

Merge pull request #90 from seriyps/optimize-worker-start

Optimize worker start
Sergey Prokhorov 2 years ago
parent
commit
8b578d618d
5 changed files with 132 additions and 57 deletions
  1. 14 15
      src/pooler.erl
  2. 1 1
      src/pooler.hrl
  3. 41 28
      src/pooler_starter.erl
  4. 4 5
      src/pooler_starter_sup.erl
  5. 72 8
      test/bench_take_return.erl

+ 14 - 15
src/pooler.erl

@@ -238,9 +238,9 @@ pool_child_spec(PoolConfig) ->
     pooler_sup:pool_child_spec(PoolConfig).
 
 %% @doc For INTERNAL use. Adds `MemberPid' to the pool.
--spec accept_member(atom() | pid(), pid() | {noproc, _}) -> ok.
-accept_member(PoolName, MemberPid) ->
-    gen_server:call(PoolName, {accept_member, MemberPid}).
+-spec accept_member(pool_name(), pooler_starter:start_result()) -> ok.
+accept_member(PoolName, StartResult) ->
+    gen_server:call(PoolName, {accept_member, StartResult}).
 
 %% @doc Obtain exclusive access to a member from `PoolName'.
 %%
@@ -430,8 +430,8 @@ handle_call({take_member, Timeout}, From = {APid, _}, #pool{} = Pool) when is_pi
     maybe_reply(take_member_from_pool_queued(Pool, From, Timeout));
 handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
     {reply, ok, do_return_member(Pid, Status, Pool)};
-handle_call({accept_member, Pid}, _From, Pool) ->
-    {reply, ok, do_accept_member(Pid, Pool)};
+handle_call({accept_member, StartResult}, _From, Pool) ->
+    {reply, ok, do_accept_member(StartResult, Pool)};
 handle_call(stop, _From, Pool) ->
     {stop, normal, stop_ok, Pool};
 handle_call(pool_stats, _From, Pool) ->
@@ -514,7 +514,7 @@ do_accept_member(
     Pool1 =
         #pool{starting_members = StartingMembers} =
         remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
-    case lists:keymember(StarterPid, 1, StartingMembers) of
+    case lists:keytake(StarterPid, 1, StartingMembers) of
         false ->
             %% A starter completed even though we invalidated the pid
             %% Ask the starter to kill the child and stop. In most cases, the
@@ -524,8 +524,7 @@ do_accept_member(
             %% In this case, we should cleanup.
             pooler_starter:stop_member_async(StarterPid),
             Pool1;
-        true ->
-            StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
+        {value, _, StartingMembers1} ->
             MRef = erlang:monitor(process, Pid),
             Entry = {MRef, free, os:timestamp()},
             AllMembers1 = store_all_members(Pid, Entry, AllMembers),
@@ -628,7 +627,7 @@ take_member_bookkeeping(
 
 -spec remove_stale_starting_members(
     #pool{},
-    [{reference(), erlang:timestamp()}],
+    [{pid(), erlang:timestamp()}],
     time_spec()
 ) -> #pool{}.
 remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
@@ -660,11 +659,11 @@ accumulate_starting_member_not_stale(Pool, Now, SM = {Pid, StartTime}, MaxAgeSec
             AccIn
     end.
 
-init_members_sync(N, #pool{name = PoolName} = Pool) ->
+init_members_sync(N, #pool{name = PoolName, member_sup = MemberSup} = Pool) ->
     Self = self(),
     StartTime = os:timestamp(),
     StartRefs = [
-        {pooler_starter:start_member(Pool, Self), StartTime}
+        {pooler_starter:start_member(PoolName, MemberSup, Self), StartTime}
      || _I <- lists:seq(1, N)
     ],
     Pool1 = Pool#pool{starting_members = StartRefs},
@@ -690,8 +689,8 @@ collect_init_members(#pool{starting_members = Empty} = Pool) when
 collect_init_members(#pool{member_start_timeout = StartTimeout} = Pool) ->
     Timeout = time_as_millis(StartTimeout),
     receive
-        {accept_member, {Ref, Member}} ->
-            collect_init_members(do_accept_member({Ref, Member}, Pool))
+        {accept_member, {_, _} = StartResult} ->
+            collect_init_members(do_accept_member(StartResult, Pool))
     after Timeout ->
         timeout
     end.
@@ -783,10 +782,10 @@ take_member_from_pool_queued(
 %% @doc Add `Count' members to `Pool' asynchronously. Returns updated
 %% `Pool' record with the starter pids of the new in-flight members added to field
 %% `starting_members'.
-add_members_async(Count, #pool{starting_members = StartingMembers} = Pool) ->
+add_members_async(Count, #pool{name = PoolName, member_sup = MemberSup, starting_members = StartingMembers} = Pool) ->
     StartTime = os:timestamp(),
     StartRefs = [
-        {pooler_starter:start_member(Pool), StartTime}
+        {pooler_starter:start_member(PoolName, MemberSup), StartTime}
      || _I <- lists:seq(1, Count)
     ],
     Pool#pool{starting_members = StartRefs ++ StartingMembers}.

+ 1 - 1
src/pooler.hrl

@@ -60,7 +60,7 @@
     %% new member start requests that are in-flight. The
     %% timestamp records when the start request was initiated
     %% and is used to implement start timeout.
-    starting_members = [] :: [{reference(), erlang:timestamp()}],
+    starting_members = [] :: [{pid(), erlang:timestamp()}],
 
     %% The maximum amount of time to allow for member start.
     member_start_timeout = ?DEFAULT_MEMBER_START_TIMEOUT :: pooler:time_spec(),

+ 41 - 28
src/pooler_starter.erl

@@ -13,9 +13,9 @@
 %% ------------------------------------------------------------------
 
 -export([
-    start_link/2,
-    start_member/1,
+    start_link/1,
     start_member/2,
+    start_member/3,
     stop_member_async/1,
     stop/1
 ]).
@@ -26,6 +26,7 @@
 
 -export([
     init/1,
+    handle_continue/2,
     handle_call/3,
     handle_cast/2,
     handle_info/2,
@@ -33,12 +34,20 @@
     code_change/3
 ]).
 
+-export_type([start_spec/0, start_result/0]).
+
+-type pool_member_sup() :: pid() | atom().
+-type parent() :: pid() | pool.
+-type start_result() :: {StarterPid :: pid(), Result :: pid() | {error, _}}.
+-opaque start_spec() :: {pooler:pool_name(), pool_member_sup(), parent()}.
+
 %% ------------------------------------------------------------------
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
-start_link(Pool, Parent) ->
-    gen_server:start_link(?MODULE, {Pool, Parent}, []).
+-spec start_link(start_spec()) -> {ok, pid()}.
+start_link({_, _, _} = Spec) ->
+    gen_server:start_link(?MODULE, Spec, []).
 
 stop(Starter) ->
     gen_server:cast(Starter, stop).
@@ -53,9 +62,9 @@ stop(Starter) ->
 %% Each call starts a single use `pooler_starter' instance via
 %% `pooler_starter_sup'. The instance terminates normally after
 %% creating a single member.
--spec start_member(#pool{}) -> pid().
-start_member(Pool) ->
-    {ok, Pid} = pooler_starter_sup:new_starter(Pool, pool),
+-spec start_member(pooler:pool_name(), pool_member_sup()) -> pid().
+start_member(PoolName, PoolMemberSup) ->
+    {ok, Pid} = pooler_starter_sup:new_starter({PoolName, PoolMemberSup, pool}),
     Pid.
 
 %% @doc Same as {@link start_member/2} except that instead of calling
@@ -66,8 +75,9 @@ start_member(Pool) ->
 %%
 %% This is used by the init function in the `pooler' to start the
 %% initial set of pool members in parallel.
-start_member(Pool, Parent) ->
-    {ok, Pid} = pooler_starter_sup:new_starter(Pool, Parent),
+-spec start_member(pooler:pool_name(), pool_member_sup(), pid()) -> pid().
+start_member(PoolName, PoolMemberSup, Parent) ->
+    {ok, Pid} = pooler_starter_sup:new_starter({PoolName, PoolMemberSup, Parent}),
     Pid.
 
 %% @doc Stop a member in the pool
@@ -84,26 +94,34 @@ stop_member_async(Pid) ->
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
 -record(starter, {
-    pool :: #pool{},
-    parent :: pid() | atom(),
-    msg :: term()
+    parent :: parent(),
+    pool_name :: pooler:pool_name(),
+    pool_member_sup :: pool_member_sup(),
+    msg :: start_result() | undefined
 }).
 
--spec init({#pool{}, pid() | atom()}) -> {'ok', #starter{}, 0}.
-init({Pool, Parent}) ->
-    %% trigger immediate timeout message, which we'll use to trigger
-    %% the member start.
-    {ok, #starter{pool = Pool, parent = Parent}, 0}.
+-spec init(start_spec()) -> {ok, #starter{}, {continue, start}}.
+init({PoolName, PoolMemberSup, Parent}) ->
+    {ok, #starter{pool_name = PoolName, pool_member_sup = PoolMemberSup, parent = Parent}, {continue, start}}.
+
+handle_continue(
+    start,
+    #starter{pool_member_sup = PoolSup, pool_name = PoolName} = State
+) ->
+    Msg = do_start_member(PoolSup, PoolName),
+    %% Done asynchronously so that a potential `stop*' message can still be received.
+    accept_member_async(self()),
+    {noreply, State#starter{msg = Msg}}.
 
 handle_call(_Request, _From, State) ->
     {noreply, State}.
 
-handle_cast(stop_member, #starter{msg = {_Me, Pid}, pool = #pool{member_sup = MemberSup}} = State) ->
+handle_cast(stop_member, #starter{msg = {_Me, Pid}, pool_member_sup = MemberSup} = State) ->
     %% The process we were starting is no longer valid for the pool.
     %% Cleanup the process and stop normally.
     supervisor:terminate_child(MemberSup, Pid),
     {stop, normal, State};
-handle_cast(accept_member, #starter{msg = Msg, parent = Parent, pool = #pool{name = PoolName}} = State) ->
+handle_cast(accept_member, #starter{msg = Msg, parent = Parent, pool_name = PoolName} = State) ->
     %% Process creation has succeeded. Send the member to the pooler
     %% gen_server to be accepted. Pooler gen_server will notify
     %% us if the member was accepted or needs to be cleaned up.
@@ -114,14 +132,6 @@ handle_cast(stop, State) ->
 handle_cast(_Request, State) ->
     {noreply, State}.
 
--spec handle_info(_, _) -> {'noreply', _}.
-handle_info(
-    timeout,
-    #starter{pool = Pool} = State
-) ->
-    Msg = do_start_member(Pool),
-    accept_member_async(self()),
-    {noreply, State#starter{msg = Msg}};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -133,7 +143,7 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-do_start_member(#pool{member_sup = PoolSup, name = PoolName}) ->
+do_start_member(PoolSup, PoolName) ->
     case supervisor:start_child(PoolSup, []) of
         {ok, Pid} ->
             {self(), Pid};
@@ -149,9 +159,12 @@ do_start_member(#pool{member_sup = PoolSup, name = PoolName}) ->
             {self(), Error}
     end.
 
+-spec send_accept_member(parent(), pooler:pool_name(), start_result()) -> ok.
 send_accept_member(pool, PoolName, Msg) ->
+    %% used to grow pool
     pooler:accept_member(PoolName, Msg);
 send_accept_member(Pid, _PoolName, Msg) ->
+    %% used during pool initialization
     Pid ! {accept_member, Msg},
     ok.
 

+ 4 - 5
src/pooler_starter_sup.erl

@@ -7,15 +7,14 @@
 -behaviour(supervisor).
 
 -export([
-    new_starter/2,
+    new_starter/1,
     start_link/0,
     init/1
 ]).
 
--include("pooler.hrl").
-
-new_starter(Pool, Parent) ->
-    supervisor:start_child(?MODULE, [Pool, Parent]).
+-spec new_starter(pooler_starter:start_spec()) -> {ok, pid()}.
+new_starter(Spec) ->
+    supervisor:start_child(?MODULE, [Spec]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).

+ 72 - 8
test/bench_take_return.erl

@@ -14,7 +14,9 @@
     size_500_clients_50_take_return_all/1,
     bench_size_500_clients_50_take_return_all/2,
     size_0_max_500_take_return_all/1,
-    bench_size_0_max_500_take_return_all/2
+    bench_size_0_max_500_take_return_all/2,
+    size_0_max_500_clients_50_take_return_all/1,
+    bench_size_0_max_500_clients_50_take_return_all/2
 ]).
 
 %% @doc Pool of fixed size 5 - try to take just one member and instantly return
@@ -102,9 +104,9 @@ size_500_clients_50_take_return_all(init) ->
     PerClient = PoolSize div NumClients,
     start_fixed(?FUNCTION_NAME, 500),
     Clients = [
-        erlang:spawn(
+        erlang:spawn_link(
             fun() ->
-                client(?FUNCTION_NAME, PerClient)
+                client(?FUNCTION_NAME, 0, PerClient)
             end
         )
      || _ <- lists:seq(1, NumClients)
@@ -113,7 +115,13 @@ size_500_clients_50_take_return_all(init) ->
 size_500_clients_50_take_return_all({input, {_Pool, _Size, _Clients}}) ->
     [];
 size_500_clients_50_take_return_all({stop, {PoolName, _Size, Clients}}) ->
-    [exit(Pid, shutdown) || Pid <- Clients],
+    [
+        begin
+            unlink(Pid),
+            exit(Pid, shutdown)
+        end
+     || Pid <- Clients
+    ],
     stop(PoolName).
 
 bench_size_500_clients_50_take_return_all(_Input, {_PoolName, _Size, Clients}) ->
@@ -132,7 +140,7 @@ bench_size_500_clients_50_take_return_all(_Input, {_PoolName, _Size, Clients}) -
         Clients
     ).
 
-%% @doc Artificial example: pool with init_count=500, max_count=500 that is culled to 0 on each iteration.
+%% @doc Artificial example: pool with init_count=0, max_count=500 that is culled to 0 on each iteration.
 %% Try to take 500 workers sequentially and instantly return them (and trigger culling).
 %% This benchmark is quite unrealistic (why have a pool that instantly kills its workers?), but it
 %% helps to test worker spawn/kill routines.
@@ -158,6 +166,62 @@ bench_size_0_max_500_take_return_all(_Input, {PoolName, Size}) ->
     0 = proplists:get_value(free_count, Utilization),
     0 = proplists:get_value(in_use_count, Utilization).
 
+%% @doc Pool with init_count=0, max_count=500 that is culled to 0 on each iteration; members are taken
+%% by 50 concurrent clients.
+%%
+%% Artificial example (because the pool is instantly culled), but with parallel consumers we can exercise
+%% the concurrent pool worker starter.
+size_0_max_500_clients_50_take_return_all(init) ->
+    PoolSize = 500,
+    NumClients = 50,
+    PerClient = PoolSize div NumClients,
+    start([
+        {name, ?FUNCTION_NAME},
+        {init_count, 0},
+        {max_count, PoolSize},
+        {max_age, {0, sec}},
+        {queue_max, 500}
+    ]),
+    Clients = [
+        erlang:spawn_link(
+            fun() ->
+                client(?FUNCTION_NAME, 500, PerClient)
+            end
+        )
+     || _ <- lists:seq(1, NumClients)
+    ],
+    {?FUNCTION_NAME, 500, Clients};
+size_0_max_500_clients_50_take_return_all({input, {_Pool, _Size, _Clients}}) ->
+    [];
+size_0_max_500_clients_50_take_return_all({stop, {PoolName, _Size, Clients}}) ->
+    [
+        begin
+            unlink(Pid),
+            exit(Pid, shutdown)
+        end
+     || Pid <- Clients
+    ],
+    stop(PoolName).
+
+bench_size_0_max_500_clients_50_take_return_all(_Input, {PoolName, _Size, Clients}) ->
+    Ref = erlang:make_ref(),
+    Self = self(),
+    lists:foreach(fun(C) -> C ! {do, Self, Ref} end, Clients),
+    lists:foreach(
+        fun(_C) ->
+            receive
+                {done, RecRef} ->
+                    RecRef = Ref
+            after 5000 ->
+                error(timeout)
+            end
+        end,
+        Clients
+    ),
+    whereis(PoolName) ! cull_pool,
+    Utilization = pooler:pool_utilization(PoolName),
+    0 = proplists:get_value(free_count, Utilization),
+    0 = proplists:get_value(in_use_count, Utilization).
+
 %% Internal
 
 start_fixed(Name, Size) ->
@@ -188,11 +252,11 @@ take_n(PoolName, Timeout, N) ->
     true = is_pid(Member),
     [Member | take_n(PoolName, Timeout, N - 1)].
 
-client(Pool, N) ->
+client(Pool, Timeout, N) ->
     receive
         {do, From, Ref} ->
-            Taken = take_n(Pool, N),
+            Taken = take_n(Pool, Timeout, N),
             [pooler:return_member(Pool, Member) || Member <- Taken],
             From ! {done, Ref}
     end,
-    client(Pool, N).
+    client(Pool, Timeout, N).