|
@@ -323,7 +323,8 @@ add_pids(N, Pool) ->
|
|
|
|
|
|
-spec take_member_from_pool(#pool{}, {pid(), term()}) ->
|
|
|
{error_no_members | pid(), #pool{}}.
|
|
|
-take_member_from_pool(#pool{max_count = Max,
|
|
|
+take_member_from_pool(#pool{init_count = InitCount,
|
|
|
+ max_count = Max,
|
|
|
free_pids = Free,
|
|
|
in_use_count = NumInUse,
|
|
|
free_count = NumFree,
|
|
@@ -331,38 +332,29 @@ take_member_from_pool(#pool{max_count = Max,
|
|
|
starting_members = StartingMembers} = Pool,
|
|
|
From) ->
|
|
|
send_metric(Pool, take_rate, 1, meter),
|
|
|
- CanAdd = (NumInUse + length(StartingMembers)) < Max,
|
|
|
+ NumCanAdd = Max - (NumInUse + NumFree + length(StartingMembers)),
|
|
|
case Free of
|
|
|
- [] when CanAdd =:= false ->
|
|
|
+ [] when NumCanAdd =< 0 ->
|
|
|
send_metric(Pool, error_no_members_count, {inc, 1}, counter),
|
|
|
send_metric(Pool, events, error_no_members, history),
|
|
|
{error_no_members, Pool};
|
|
|
- [] when CanAdd =:= true ->
|
|
|
- %% request async member creation, return error for now.
|
|
|
- %% also should verify that starting_members length will
|
|
|
- %% not exceed max size if all come back success. need a
|
|
|
- %% reference to the starter supervisor or else we reuse a
|
|
|
- %% starter for now.
|
|
|
+ [] when NumCanAdd > 0 ->
|
|
|
{ok, Starter} = pooler_starter_sup:new_starter(),
|
|
|
- StartRef = pooler_starter:start_member(Starter, Pool),
|
|
|
- %% case add_pids(1, Pool) of
|
|
|
- %% {ok, Pool1} ->
|
|
|
- %% %% add_pids may have updated our pool
|
|
|
- %% take_member_from_pool(Pool1, From, Retries - 1);
|
|
|
- %% {max_count_reached, _} ->
|
|
|
- %% send_metric(Pool, error_no_members_count, {inc, 1}, counter),
|
|
|
- %% send_metric(Pool, events, error_no_members, history),
|
|
|
- %% {error_no_members, Pool}
|
|
|
- %% end,
|
|
|
+ %% Limit concurrently starting members to init_count. Add
|
|
|
+ %% up to init_count members. Starting members here means
|
|
|
+ %% we always return an error_no_members for a take request
|
|
|
+ %% when all members are in-use. By adding a batch of new
|
|
|
+ %% members, the pool should reach a steady state with
|
|
|
+ %% unused members culled over time (if scheduled cull is
|
|
|
+ %% enabled).
|
|
|
+ NumToAdd = min(InitCount - length(StartingMembers), NumCanAdd),
|
|
|
+ StartTime = os:timestamp(),
|
|
|
+ StartRefs = [ {pooler_starter:start_member(Starter, Pool), StartTime}
|
|
|
+ || _I <- lists:seq(1, NumToAdd) ],
|
|
|
send_metric(Pool, error_no_members_count, {inc, 1}, counter),
|
|
|
send_metric(Pool, events, error_no_members, history),
|
|
|
- StartingMembers1 = [{StartRef, os:timestamp()} | StartingMembers],
|
|
|
+ StartingMembers1 = StartRefs ++ StartingMembers,
|
|
|
{error_no_members, Pool#pool{starting_members = StartingMembers1}};
|
|
|
- %% [] when Retries =:= 0 ->
|
|
|
- %% %% max retries reached
|
|
|
- %% send_metric(Pool, error_no_members_count, {inc, 1}, counter),
|
|
|
- %% send_metric(Pool, events, error_no_members, history),
|
|
|
- %% {error_no_members, Pool};
|
|
|
[Pid|Rest] ->
|
|
|
Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
|
|
|
free_count = NumFree - 1},
|