|
@@ -30,6 +30,7 @@
|
|
|
-export([accept_member/2,
|
|
|
start_link/1,
|
|
|
take_member/1,
|
|
|
+ take_member/2,
|
|
|
take_group_member/1,
|
|
|
return_group_member/2,
|
|
|
return_group_member/3,
|
|
@@ -40,7 +41,8 @@
|
|
|
new_pool/1,
|
|
|
pool_child_spec/1,
|
|
|
rm_pool/1,
|
|
|
- rm_group/1]).
|
|
|
+ rm_group/1
|
|
|
+ ]).
|
|
|
|
|
|
%% ------------------------------------------------------------------
|
|
|
%% gen_server Function Exports
|
|
@@ -176,7 +178,19 @@ accept_member(PoolName, MemberPid) ->
|
|
|
%%
|
|
|
-spec take_member(atom() | pid()) -> pid() | error_no_members.
|
|
|
take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
|
|
|
- gen_server:call(PoolName, take_member, infinity).
|
|
|
+ gen_server:call(PoolName, {take_member, 0}, infinity).
|
|
|
+
|
|
|
+%% @doc Obtain exclusive access to a member of 'PoolName'.
|
|
|
+%%
|
|
|
+%% If no members are available, wait for up to Timeout milliseconds for a member
|
|
|
+%% to become available. Waiting requests are served in FIFO order. If no member
|
|
|
+%% is available within the specified timeout, error_no_members is returned.
|
|
|
+%% `Timeout' can be either milliseconds as integer or `{duration, time_unit}'
|
|
|
+%%
|
|
|
+-spec take_member(atom() | pid(), non_neg_integer() | time_spec()) -> pid() | error_no_members.
|
|
|
+take_member(PoolName, Timeout) when is_atom(PoolName) orelse is_pid(PoolName) ->
|
|
|
+ gen_server:call(PoolName, {take_member, time_as_millis(Timeout)}, infinity).
|
|
|
+
|
|
|
|
|
|
%% @doc Take a member from a randomly selected member of the group
|
|
|
%% `GroupName'. Returns `MemberPid' or `error_no_members'. If no
|
|
@@ -297,9 +311,10 @@ init(#pool{}=Pool) ->
|
|
|
|
|
|
set_member_sup(#pool{} = Pool, MemberSup) ->
|
|
|
Pool#pool{member_sup = MemberSup}.
|
|
|
-handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
|
|
|
- {Member, NewPool} = take_member_from_pool(Pool, CPid),
|
|
|
- {reply, Member, NewPool};
|
|
|
+
|
|
|
+handle_call({take_member, Timeout}, From = {APid, _}, #pool{} = Pool) when is_pid(APid) ->
|
|
|
+ maybe_reply(take_member_from_pool_queued(Pool, From, Timeout));
|
|
|
+
|
|
|
handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
|
|
|
{reply, ok, do_return_member(Pid, Status, Pool)};
|
|
|
handle_call({accept_member, Pid}, _From, Pool) ->
|
|
@@ -318,6 +333,14 @@ handle_cast(_Msg, Pool) ->
|
|
|
{noreply, Pool}.
|
|
|
|
|
|
-spec handle_info(_, _) -> {'noreply', _}.
|
|
|
+handle_info({requestor_timeout, From}, Pool = #pool{ queued_requestors = RequestorQueue }) ->
|
|
|
+ NewQueue = queue:filter(fun({RequestorFrom, _TRef}) when RequestorFrom =:= From ->
|
|
|
+ gen_server:reply(RequestorFrom, error_no_members),
|
|
|
+ false;
|
|
|
+ ({_, _}) ->
|
|
|
+ true
|
|
|
+ end, RequestorQueue),
|
|
|
+ {noreply, Pool#pool{ queued_requestors = NewQueue} };
|
|
|
handle_info(timeout, #pool{group = undefined} = Pool) ->
|
|
|
%% ignore
|
|
|
{noreply, Pool};
|
|
@@ -365,13 +388,12 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
do_accept_member({StarterPid, Pid},
|
|
|
#pool{
|
|
|
all_members = AllMembers,
|
|
|
- free_pids = Free,
|
|
|
- free_count = NumFree,
|
|
|
starting_members = StartingMembers0,
|
|
|
member_start_timeout = StartTimeout
|
|
|
} = Pool) when is_pid(Pid) ->
|
|
|
%% make sure we don't accept a timedout member
|
|
|
- Pool1 = #pool{starting_members = StartingMembers}= remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
|
|
|
+ Pool1 = #pool{starting_members = StartingMembers} =
|
|
|
+ remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
|
|
|
case lists:keymember(StarterPid, 1, StartingMembers) of
|
|
|
false ->
|
|
|
%% A starter completed even though we invalidated the pid
|
|
@@ -388,21 +410,74 @@ do_accept_member({StarterPid, Pid},
|
|
|
Entry = {MRef, free, os:timestamp()},
|
|
|
AllMembers1 = store_all_members(Pid, Entry, AllMembers),
|
|
|
pooler_starter:stop(StarterPid),
|
|
|
- Pool#pool{free_pids = Free ++ [Pid],
|
|
|
- free_count = NumFree + 1,
|
|
|
- all_members = AllMembers1,
|
|
|
- starting_members = StartingMembers1}
|
|
|
+ maybe_reply_with_pid(Pid, Pool1#pool{all_members = AllMembers1,
|
|
|
+ starting_members = StartingMembers1})
|
|
|
end;
|
|
|
-do_accept_member({StarterPid, _Reason}, #pool{starting_members = StartingMembers0,
|
|
|
- member_start_timeout = StartTimeout} = Pool) ->
|
|
|
+do_accept_member({StarterPid, _Reason},
|
|
|
+ #pool{starting_members = StartingMembers0,
|
|
|
+ member_start_timeout = StartTimeout} = Pool) ->
|
|
|
%% member start failed, remove in-flight ref and carry on.
|
|
|
pooler_starter:stop(StarterPid),
|
|
|
- Pool1 = #pool{starting_members = StartingMembers} = remove_stale_starting_members(Pool, StartingMembers0,
|
|
|
- StartTimeout),
|
|
|
+ Pool1 = #pool{starting_members = StartingMembers} =
|
|
|
+ remove_stale_starting_members(Pool, StartingMembers0,
|
|
|
+ StartTimeout),
|
|
|
StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
|
|
|
Pool1#pool{starting_members = StartingMembers1}.
|
|
|
|
|
|
|
|
|
+maybe_reply_with_pid(Pid,
|
|
|
+ Pool = #pool{queued_requestors = QueuedRequestors,
|
|
|
+ free_pids = Free,
|
|
|
+ free_count = NumFree}) when is_pid(Pid) ->
|
|
|
+ case queue:out(QueuedRequestors) of
|
|
|
+ {empty, _} ->
|
|
|
+ Pool#pool{free_pids = [Pid | Free],
|
|
|
+ free_count = NumFree + 1};
|
|
|
+ {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
|
|
|
+ timer:cancel(TRef),
|
|
|
+ Pool1 = take_member_bookkeeping(Pid, From, NewQueuedRequestors, Pool),
|
|
|
+ send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
|
|
|
+ send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
|
|
|
+ send_metric(Pool, events, error_no_members, history),
|
|
|
+ gen_server:reply(From, Pid),
|
|
|
+ Pool1
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec take_member_bookkeeping(pid(),
|
|
|
+ {pid(), _},
|
|
|
+ [pid()] | p_requestor_queue(),
|
|
|
+ #pool{}) -> #pool{}.
|
|
|
+take_member_bookkeeping(MemberPid,
|
|
|
+ {CPid, _},
|
|
|
+ Rest,
|
|
|
+ Pool = #pool{in_use_count = NumInUse,
|
|
|
+ free_count = NumFree,
|
|
|
+ consumer_to_pid = CPMap,
|
|
|
+ all_members = AllMembers})
|
|
|
+ when is_pid(MemberPid),
|
|
|
+ is_pid(CPid),
|
|
|
+ is_list(Rest) ->
|
|
|
+ Pool#pool{free_pids = Rest,
|
|
|
+ in_use_count = NumInUse + 1,
|
|
|
+ free_count = NumFree - 1,
|
|
|
+ consumer_to_pid = add_member_to_consumer(MemberPid, CPid, CPMap),
|
|
|
+ all_members = set_cpid_for_member(MemberPid, CPid, AllMembers)
|
|
|
+ };
|
|
|
+take_member_bookkeeping(MemberPid,
|
|
|
+ {ReplyPid, _Tag},
|
|
|
+ NewQueuedRequestors,
|
|
|
+ Pool = #pool{
|
|
|
+ in_use_count = NumInUse,
|
|
|
+ all_members = AllMembers,
|
|
|
+ consumer_to_pid = CPMap
|
|
|
+ }) ->
|
|
|
+ Pool#pool{
|
|
|
+ in_use_count = NumInUse + 1,
|
|
|
+ all_members = set_cpid_for_member(MemberPid, ReplyPid, AllMembers),
|
|
|
+ consumer_to_pid = add_member_to_consumer(MemberPid, ReplyPid, CPMap),
|
|
|
+ queued_requestors = NewQueuedRequestors
|
|
|
+ }.
|
|
|
+
|
|
|
-spec remove_stale_starting_members(#pool{}, [{reference(), erlang:timestamp()}],
|
|
|
time_spec()) -> #pool{}.
|
|
|
remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
|
|
@@ -458,7 +533,6 @@ take_member_from_pool(#pool{init_count = InitCount,
|
|
|
free_pids = Free,
|
|
|
in_use_count = NumInUse,
|
|
|
free_count = NumFree,
|
|
|
- consumer_to_pid = CPMap,
|
|
|
starting_members = StartingMembers,
|
|
|
member_start_timeout = StartTimeout} = Pool,
|
|
|
From) ->
|
|
@@ -485,15 +559,31 @@ take_member_from_pool(#pool{init_count = InitCount,
|
|
|
send_metric(Pool, events, error_no_members, history),
|
|
|
{error_no_members, Pool2};
|
|
|
[Pid|Rest] ->
|
|
|
- Pool2 = Pool1#pool{free_pids = Rest, in_use_count = NumInUse + 1,
|
|
|
- free_count = NumFree - 1},
|
|
|
+ Pool2 = take_member_bookkeeping(Pid, From, Rest, Pool1),
|
|
|
send_metric(Pool, in_use_count, Pool2#pool.in_use_count, histogram),
|
|
|
send_metric(Pool, free_count, Pool2#pool.free_count, histogram),
|
|
|
- {Pid, Pool2#pool{
|
|
|
- consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
|
|
|
- all_members = set_cpid_for_member(Pid, From,
|
|
|
- Pool2#pool.all_members)
|
|
|
- }}
|
|
|
+ {Pid, Pool2}
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec take_member_from_pool_queued(#pool{},
|
|
|
+ {pid(), _},
|
|
|
+ non_neg_integer()) ->
|
|
|
+ {error_no_members | queued | pid(), #pool{}}.
|
|
|
+take_member_from_pool_queued(Pool0 = #pool{queue_max = QMax,
|
|
|
+ queued_requestors = Requestors},
|
|
|
+ From = {CPid, _},
|
|
|
+ Timeout) when is_pid(CPid) ->
|
|
|
+ case {take_member_from_pool(Pool0, From), queue:len(Requestors)} of
|
|
|
+ {{error_no_members, Pool1}, QLen} when QLen >= QMax ->
|
|
|
+ send_metric(Pool1, events, error_no_members, history),
|
|
|
+ send_metric(Pool1, queue_max_reached, {inc, 1}, counter),
|
|
|
+ {error_no_members, Pool1};
|
|
|
+ {{error_no_members, Pool1 = #pool{queued_requestors = QueuedRequestors}}, QueueCount} ->
|
|
|
+ {ok, TRef} = timer:send_after(Timeout, {requestor_timeout, From}),
|
|
|
+ send_metric(Pool1, queue_count, QueueCount, histogram),
|
|
|
+ {queued, Pool1#pool{queued_requestors = queue:in({From, TRef}, QueuedRequestors)}};
|
|
|
+ {{Member, NewPool}, _} when is_pid(Member) ->
|
|
|
+ {Member, NewPool}
|
|
|
end.
|
|
|
|
|
|
%% @doc Add `Count' members to `Pool' asynchronously. Returns updated
|
|
@@ -737,7 +827,11 @@ time_as_secs({Time, Unit}) ->
|
|
|
-spec time_as_millis(time_spec()) -> non_neg_integer().
|
|
|
%% @doc Convert time unit into milliseconds.
|
|
|
time_as_millis({Time, Unit}) ->
|
|
|
- time_as_micros({Time, Unit}) div 1000.
|
|
|
+ time_as_micros({Time, Unit}) div 1000;
|
|
|
+%% Allows blind convert
|
|
|
+time_as_millis(Time) when is_integer(Time) ->
|
|
|
+ Time.
|
|
|
+
|
|
|
|
|
|
-spec time_as_micros(time_spec()) -> non_neg_integer().
|
|
|
%% @doc Convert time unit into microseconds
|
|
@@ -752,3 +846,15 @@ time_as_micros({Time, mu}) ->
|
|
|
|
|
|
secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) ->
|
|
|
(Mega2 - Mega1) * 1000000 + (Secs2 - Secs1).
|
|
|
+
|
|
|
+-spec maybe_reply({'queued' | 'error_no_members' | pid(), #pool{}}) ->
|
|
|
+ {noreply, #pool{}} | {reply, 'error_no_members' | pid(), #pool{}}.
|
|
|
+maybe_reply({Member, NewPool}) ->
|
|
|
+ case Member of
|
|
|
+ queued ->
|
|
|
+ {noreply, NewPool};
|
|
|
+ error_no_members ->
|
|
|
+ {reply, error_no_members, NewPool};
|
|
|
+ Member when is_pid(Member) ->
|
|
|
+ {reply, Member, NewPool}
|
|
|
+ end.
|