|
@@ -10,18 +10,10 @@
|
|
|
%%
|
|
|
-module(pooler).
|
|
|
-behaviour(gen_server).
|
|
|
--define(SERVER, ?MODULE).
|
|
|
-
|
|
|
--define(DEFAULT_ADD_RETRY, 1).
|
|
|
--define(DEFAULT_CULL_INTERVAL, {0, min}).
|
|
|
--define(DEFAULT_MAX_AGE, {0, min}).
|
|
|
|
|
|
+-include("pooler.hrl").
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
|
--type member_info() :: {string(), free | pid(), {_, _, _}}.
|
|
|
--type free_member_info() :: {string(), free, {_, _, _}}.
|
|
|
--type time_unit() :: min | sec | ms | mu.
|
|
|
--type time_spec() :: {non_neg_integer(), time_unit()}.
|
|
|
|
|
|
%% type specs for pool metrics
|
|
|
-type metric_label() :: binary().
|
|
@@ -32,56 +24,17 @@
|
|
|
'error_no_members'.
|
|
|
-type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
|
|
|
|
|
|
--record(pool, {
|
|
|
- name :: string(),
|
|
|
- max_count = 100 :: non_neg_integer(),
|
|
|
- init_count = 10 :: non_neg_integer(),
|
|
|
- start_mfa :: {atom(), atom(), [term()]},
|
|
|
- free_pids = [] :: [pid()],
|
|
|
- in_use_count = 0 :: non_neg_integer(),
|
|
|
- free_count = 0 :: non_neg_integer(),
|
|
|
- %% The number times to attempt adding a pool member if the
|
|
|
- %% pool size is below max_count and there are no free
|
|
|
- %% members. After this many tries, error_no_members will be
|
|
|
- %% returned by a call to take_member. NOTE: this value
|
|
|
- %% should be >= 2 or else the pool will not grow on demand
|
|
|
- %% when max_count is larger than init_count.
|
|
|
- add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer(),
|
|
|
-
|
|
|
- %% The interval to schedule a cull message. Both
|
|
|
- %% 'cull_interval' and 'max_age' are specified using a
|
|
|
- %% `time_spec()' type.
|
|
|
- cull_interval = ?DEFAULT_CULL_INTERVAL :: time_spec(),
|
|
|
- %% The maximum age for members.
|
|
|
- max_age = ?DEFAULT_MAX_AGE :: time_spec()
|
|
|
- }).
|
|
|
-
|
|
|
--record(state, {
|
|
|
- npools :: non_neg_integer(),
|
|
|
- pools = dict:new() :: dict(),
|
|
|
- pool_sups = dict:new() :: dict(),
|
|
|
- all_members = dict:new() :: dict(),
|
|
|
- consumer_to_pid = dict:new() :: dict(),
|
|
|
- pool_selector :: array()
|
|
|
- }).
|
|
|
-
|
|
|
--define(gv(X, Y), proplists:get_value(X, Y)).
|
|
|
--define(gv(X, Y, D), proplists:get_value(X, Y, D)).
|
|
|
-
|
|
|
%% ------------------------------------------------------------------
|
|
|
%% API Function Exports
|
|
|
%% ------------------------------------------------------------------
|
|
|
|
|
|
-export([start/1,
|
|
|
start_link/1,
|
|
|
- stop/0,
|
|
|
- take_member/0,
|
|
|
+ stop/1,
|
|
|
take_member/1,
|
|
|
- return_member/1,
|
|
|
return_member/2,
|
|
|
- % remove_pool/2,
|
|
|
- % add_pool/1,
|
|
|
- pool_stats/0]).
|
|
|
+ return_member/3,
|
|
|
+ pool_stats/1]).
|
|
|
|
|
|
%% ------------------------------------------------------------------
|
|
|
%% gen_server Function Exports
|
|
@@ -103,142 +56,99 @@
|
|
|
%% API Function Definitions
|
|
|
%% ------------------------------------------------------------------
|
|
|
|
|
|
-start_link(Config) ->
|
|
|
- gen_server:start_link({local, ?SERVER}, ?MODULE, Config, []).
|
|
|
-
|
|
|
-start(Config) ->
|
|
|
- gen_server:start_link({local, ?SERVER}, ?MODULE, Config, []).
|
|
|
+start_link(#pool{name = Name} = Pool) ->
|
|
|
+ gen_server:start_link({local, Name}, ?MODULE, Pool, []).
|
|
|
|
|
|
-stop() ->
|
|
|
- gen_server:call(?SERVER, stop).
|
|
|
+start(#pool{name = Name} = Pool) ->
|
|
|
+ gen_server:start({local, Name}, ?MODULE, Pool, []).
|
|
|
|
|
|
-%% @doc Obtain exclusive access to a member from a randomly selected pool.
|
|
|
-%%
|
|
|
-%% If there are no free members in the randomly selected pool, then a
|
|
|
-%% member will be returned from the pool with the most free members.
|
|
|
-%% If no free members are available, 'error_no_members' is returned.
|
|
|
-%%
|
|
|
--spec take_member() -> pid() | error_no_members.
|
|
|
-take_member() ->
|
|
|
- gen_server:call(?SERVER, take_member, infinity).
|
|
|
+stop(Name) ->
|
|
|
+ gen_server:call(Name, stop).
|
|
|
|
|
|
%% @doc Obtain exclusive access to a member from `PoolName'.
|
|
|
%%
|
|
|
%% If no free members are available, 'error_no_members' is returned.
|
|
|
%%
|
|
|
--spec take_member(string()) -> pid() | error_no_members | error_no_pool.
|
|
|
-take_member(PoolName) when is_list(PoolName) ->
|
|
|
- gen_server:call(?SERVER, {take_member, PoolName}, infinity).
|
|
|
+-spec take_member(atom() | pid()) -> pid() | error_no_members.
|
|
|
+take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
|
|
|
+ gen_server:call(PoolName, take_member, infinity).
|
|
|
|
|
|
%% @doc Return a member to the pool so it can be reused.
|
|
|
%%
|
|
|
%% If `Status' is 'ok', the member is returned to the pool. If
|
|
|
%% `Status' is 'fail', the member is destroyed and a new member is
|
|
|
%% added to the pool in its place.
|
|
|
--spec return_member(pid() | error_no_members, ok | fail) -> ok.
|
|
|
-return_member(Pid, Status) when is_pid(Pid) andalso
|
|
|
- (Status =:= ok orelse Status =:= fail) ->
|
|
|
- gen_server:call(?SERVER, {return_member, Pid, Status}, infinity),
|
|
|
+-spec return_member(atom() | pid(), pid() | error_no_members, ok | fail) -> ok.
|
|
|
+return_member(PoolName, Pid, Status) when is_pid(Pid) andalso
|
|
|
+ (is_atom(PoolName) orelse
|
|
|
+ is_pid(PoolName)) andalso
|
|
|
+ (Status =:= ok orelse
|
|
|
+ Status =:= fail) ->
|
|
|
+ gen_server:call(PoolName, {return_member, Pid, Status}, infinity),
|
|
|
ok;
|
|
|
-return_member(error_no_members, _) ->
|
|
|
+return_member(_, error_no_members, _) ->
|
|
|
ok.
|
|
|
|
|
|
%% @doc Return a member to the pool so it can be reused.
|
|
|
%%
|
|
|
--spec return_member(pid() | error_no_members) -> ok.
|
|
|
-return_member(Pid) when is_pid(Pid) ->
|
|
|
- gen_server:call(?SERVER, {return_member, Pid, ok}, infinity),
|
|
|
+-spec return_member(atom() | pid(), pid() | error_no_members) -> ok.
|
|
|
+return_member(PoolName, Pid) when is_pid(Pid) andalso
|
|
|
+ (is_atom(PoolName) orelse is_pid(PoolName)) ->
|
|
|
+ gen_server:call(PoolName, {return_member, Pid, ok}, infinity),
|
|
|
ok;
|
|
|
-return_member(error_no_members) ->
|
|
|
+return_member(_, error_no_members) ->
|
|
|
ok.
|
|
|
|
|
|
-% TODO:
|
|
|
-% remove_pool(Name, How) when How == graceful; How == immediate ->
|
|
|
-% gen_server:call(?SERVER, {remove_pool, Name, How}).
|
|
|
-
|
|
|
-% TODO:
|
|
|
-% add_pool(Pool) ->
|
|
|
-% gen_server:call(?SERVER, {add_pool, Pool}).
|
|
|
-
|
|
|
%% @doc Obtain runtime state info for all pools.
|
|
|
%%
|
|
|
%% Format of the return value is subject to change.
|
|
|
--spec pool_stats() -> [tuple()].
|
|
|
-pool_stats() ->
|
|
|
- gen_server:call(?SERVER, pool_stats).
|
|
|
+-spec pool_stats(atom() | pid()) -> [tuple()].
|
|
|
+pool_stats(PoolName) ->
|
|
|
+ gen_server:call(PoolName, pool_stats).
|
|
|
|
|
|
%% ------------------------------------------------------------------
|
|
|
%% gen_server Function Definitions
|
|
|
%% ------------------------------------------------------------------
|
|
|
|
|
|
--spec init([any()]) -> {'ok', #state{npools::'undefined' | non_neg_integer(),
|
|
|
- pools::dict(),
|
|
|
- pool_sups::dict(),
|
|
|
- all_members::dict(),
|
|
|
- consumer_to_pid::dict(),
|
|
|
- pool_selector::'undefined' | array()}}.
|
|
|
-init(Config) ->
|
|
|
+-spec init(#pool{}) -> {'ok', #pool{}}.
|
|
|
+init(#pool{}=Pool) ->
|
|
|
+ %% FIXME: change to a monitor only model so that this doesn't have
|
|
|
+ %% to be a system process.
|
|
|
process_flag(trap_exit, true),
|
|
|
- PoolRecs = [ props_to_pool(P) || P <- ?gv(pools, Config) ],
|
|
|
- Pools = [ {Pool#pool.name, Pool} || Pool <- PoolRecs ],
|
|
|
- PoolSups = [ begin
|
|
|
- {ok, SupPid} = supervisor:start_child(pooler_pool_sup, [MFA]),
|
|
|
- {Name, SupPid}
|
|
|
- end || #pool{name = Name, start_mfa = MFA} <- PoolRecs ],
|
|
|
- State0 = #state{npools = length(Pools),
|
|
|
- pools = dict:from_list(Pools),
|
|
|
- pool_sups = dict:from_list(PoolSups),
|
|
|
- pool_selector = array:from_list([PN || {PN, _} <- Pools])
|
|
|
- },
|
|
|
-
|
|
|
- lists:foldl(fun(#pool{name = PName, init_count = N}, {ok, AccState}) ->
|
|
|
- AccState1 = cull_members(PName, AccState),
|
|
|
- add_pids(PName, N, AccState1)
|
|
|
- end, {ok, State0}, PoolRecs).
|
|
|
-
|
|
|
-handle_call(take_member, {CPid, _Tag},
|
|
|
- #state{pool_selector = PS, npools = NP} = State) ->
|
|
|
- % attempt to return a member from a randomly selected pool. If
|
|
|
- % that pool has no members, find the pool with most free members
|
|
|
- % and return a member from there.
|
|
|
- PoolName = array:get(crypto:rand_uniform(0, NP), PS),
|
|
|
- case take_member(PoolName, CPid, State) of
|
|
|
- {error_no_members, NewState} ->
|
|
|
- case max_free_pool(State#state.pools) of
|
|
|
- error_no_members ->
|
|
|
- {reply, error_no_members, NewState};
|
|
|
- MaxFreePoolName ->
|
|
|
- {NewPid, State2} = take_member(MaxFreePoolName, CPid,
|
|
|
- NewState),
|
|
|
- {reply, NewPid, State2}
|
|
|
- end;
|
|
|
- {NewPid, NewState} ->
|
|
|
- {reply, NewPid, NewState}
|
|
|
- end;
|
|
|
-handle_call({take_member, PoolName}, {CPid, _Tag}, #state{} = State) ->
|
|
|
- {Member, NewState} = take_member(PoolName, CPid, State),
|
|
|
- {reply, Member, NewState};
|
|
|
-handle_call({return_member, Pid, Status}, {_CPid, _Tag}, State) ->
|
|
|
- {reply, ok, do_return_member(Pid, Status, State)};
|
|
|
-handle_call(stop, _From, State) ->
|
|
|
- {stop, normal, stop_ok, State};
|
|
|
-handle_call(pool_stats, _From, State) ->
|
|
|
- {reply, dict:to_list(State#state.all_members), State};
|
|
|
-handle_call(_Request, _From, State) ->
|
|
|
- {noreply, State}.
|
|
|
+ #pool{init_count = N} = Pool,
|
|
|
+ MemberSup = pooler_pool_sup:member_sup_name(Pool),
|
|
|
+ Pool1 = set_member_sup(Pool, MemberSup),
|
|
|
+ Pool2 = cull_members_from_pool(Pool1),
|
|
|
+ add_pids(N, Pool2).
|
|
|
+
|
|
|
+set_member_sup(#pool{} = Pool, MemberSup) ->
|
|
|
+ Pool#pool{member_sup = MemberSup}.
|
|
|
+
|
|
|
+handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
|
|
|
+ Retries = pool_add_retries(Pool),
|
|
|
+ {Member, NewPool} = take_member_from_pool(Pool, CPid, Retries),
|
|
|
+ {reply, Member, NewPool};
|
|
|
+handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
|
|
|
+ {reply, ok, do_return_member(Pid, Status, Pool)};
|
|
|
+handle_call(stop, _From, Pool) ->
|
|
|
+ {stop, normal, stop_ok, Pool};
|
|
|
+handle_call(pool_stats, _From, Pool) ->
|
|
|
+ {reply, dict:to_list(Pool#pool.all_members), Pool};
|
|
|
+handle_call(_Request, _From, Pool) ->
|
|
|
+ {noreply, Pool}.
|
|
|
|
|
|
-spec handle_cast(_,_) -> {'noreply', _}.
|
|
|
-handle_cast(_Msg, State) ->
|
|
|
- {noreply, State}.
|
|
|
+handle_cast(_Msg, Pool) ->
|
|
|
+ {noreply, Pool}.
|
|
|
|
|
|
-spec handle_info(_, _) -> {'noreply', _}.
|
|
|
handle_info({'EXIT', Pid, Reason}, State) ->
|
|
|
State1 =
|
|
|
- case dict:find(Pid, State#state.all_members) of
|
|
|
+ case dict:find(Pid, State#pool.all_members) of
|
|
|
{ok, {_PoolName, _ConsumerPid, _Time}} ->
|
|
|
do_return_member(Pid, fail, State);
|
|
|
error ->
|
|
|
- case dict:find(Pid, State#state.consumer_to_pid) of
|
|
|
+ case dict:find(Pid, State#pool.consumer_to_pid) of
|
|
|
{ok, Pids} ->
|
|
|
IsOk = case Reason of
|
|
|
normal -> ok;
|
|
@@ -252,8 +162,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
|
|
|
end
|
|
|
end,
|
|
|
{noreply, State1};
|
|
|
-handle_info({cull_pool, PoolName}, State) ->
|
|
|
- {noreply, cull_members(PoolName, State)};
|
|
|
+handle_info(cull_pool, Pool) ->
|
|
|
+ {noreply, cull_members_from_pool(Pool)};
|
|
|
handle_info(_Info, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
@@ -281,19 +191,17 @@ props_to_pool(P) ->
|
|
|
|
|
|
% FIXME: creation of new pids should probably happen
|
|
|
% in a spawned process to avoid tying up the loop.
|
|
|
--spec add_pids(error | string(), non_neg_integer(), #state{}) ->
|
|
|
- {bad_pool_name | max_count_reached | ok, #state{}}.
|
|
|
-add_pids(error, _N, State) ->
|
|
|
- {bad_pool_name, State};
|
|
|
-add_pids(PoolName, N, State) ->
|
|
|
- #state{pools = Pools, all_members = AllMembers} = State,
|
|
|
- Pool = fetch_pool(PoolName, Pools),
|
|
|
+-spec add_pids(non_neg_integer(), #pool{}) ->
|
|
|
+ {max_count_reached | ok, #pool{}}.
|
|
|
+add_pids(N, Pool) ->
|
|
|
#pool{max_count = Max, free_pids = Free,
|
|
|
- in_use_count = NumInUse, free_count = NumFree} = Pool,
|
|
|
+ in_use_count = NumInUse, free_count = NumFree,
|
|
|
+ member_sup = PoolSup,
|
|
|
+ all_members = AllMembers} = Pool,
|
|
|
Total = NumFree + NumInUse,
|
|
|
+ PoolName = Pool#pool.name,
|
|
|
case Total + N =< Max of
|
|
|
true ->
|
|
|
- PoolSup = dict:fetch(PoolName, State#state.pool_sups),
|
|
|
{AllMembers1, NewPids} = start_n_pids(N, PoolName, PoolSup,
|
|
|
AllMembers),
|
|
|
%% start_n_pids may return fewer than N if errors were
|
|
@@ -302,6 +210,7 @@ add_pids(PoolName, N, State) ->
|
|
|
case NewPidCount =:= N of
|
|
|
true -> ok;
|
|
|
false ->
|
|
|
+ %% FIXME: pool NAME specific
|
|
|
error_logger:error_msg("tried to add ~B members, only added ~B~n",
|
|
|
[N, NewPidCount]),
|
|
|
send_metric(<<"pooler.events">>,
|
|
@@ -309,93 +218,82 @@ add_pids(PoolName, N, State) ->
|
|
|
end,
|
|
|
Pool1 = Pool#pool{free_pids = Free ++ NewPids,
|
|
|
free_count = length(Free) + NewPidCount},
|
|
|
- {ok, State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
|
- all_members = AllMembers1}};
|
|
|
+ {ok, Pool1#pool{all_members = AllMembers1}};
|
|
|
false ->
|
|
|
- {max_count_reached, State}
|
|
|
+ {max_count_reached, Pool}
|
|
|
end.
|
|
|
|
|
|
--spec take_member(string(), {pid(), _}, #state{}) ->
|
|
|
- {error_no_pool | error_no_members | pid(), #state{}}.
|
|
|
-take_member(PoolName, From, #state{pools = Pools} = State) ->
|
|
|
- Pool = fetch_pool(PoolName, Pools),
|
|
|
- take_member_from_pool(Pool, From, State, pool_add_retries(Pool)).
|
|
|
-
|
|
|
--spec take_member_from_pool(error_no_pool | #pool{}, {pid(), term()}, #state{},
|
|
|
+-spec take_member_from_pool(#pool{}, {pid(), term()},
|
|
|
non_neg_integer()) ->
|
|
|
- {error_no_pool | error_no_members | pid(), #state{}}.
|
|
|
-take_member_from_pool(error_no_pool, _From, State, _) ->
|
|
|
- {error_no_pool, State};
|
|
|
+ {error_no_members | pid(), #pool{}}.
|
|
|
take_member_from_pool(#pool{name = PoolName,
|
|
|
max_count = Max,
|
|
|
free_pids = Free,
|
|
|
in_use_count = NumInUse,
|
|
|
- free_count = NumFree} = Pool,
|
|
|
+ free_count = NumFree,
|
|
|
+ consumer_to_pid = CPMap} = Pool,
|
|
|
From,
|
|
|
- #state{pools = Pools, consumer_to_pid = CPMap} = State,
|
|
|
Retries) ->
|
|
|
send_metric(pool_metric(PoolName, take_rate), 1, meter),
|
|
|
case Free of
|
|
|
[] when NumInUse =:= Max ->
|
|
|
send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
|
|
|
send_metric(<<"pooler.events">>, error_no_members, history),
|
|
|
- {error_no_members, State};
|
|
|
+ {error_no_members, Pool};
|
|
|
[] when NumInUse < Max andalso Retries > 0 ->
|
|
|
- case add_pids(PoolName, 1, State) of
|
|
|
- {ok, State1} ->
|
|
|
+ case add_pids(1, Pool) of
|
|
|
+ {ok, Pool1} ->
|
|
|
%% add_pids may have updated our pool
|
|
|
- Pool1 = fetch_pool(PoolName, State1#state.pools),
|
|
|
- take_member_from_pool(Pool1, From, State1, Retries - 1);
|
|
|
+ take_member_from_pool(Pool1, From, Retries - 1);
|
|
|
{max_count_reached, _} ->
|
|
|
+ %% FIXME: make pool NAME specific
|
|
|
send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
|
|
|
send_metric(<<"pooler.events">>, error_no_members, history),
|
|
|
- {error_no_members, State}
|
|
|
+ {error_no_members, Pool}
|
|
|
end;
|
|
|
[] when Retries =:= 0 ->
|
|
|
%% max retries reached
|
|
|
+ %% FIXME: make pool NAME specific
|
|
|
send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
|
|
|
- {error_no_members, State};
|
|
|
+ {error_no_members, Pool};
|
|
|
[Pid|Rest] ->
|
|
|
erlang:link(From),
|
|
|
Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
|
|
|
free_count = NumFree - 1},
|
|
|
send_metric(pool_metric(PoolName, in_use_count), Pool1#pool.in_use_count, histogram),
|
|
|
send_metric(pool_metric(PoolName, free_count), Pool1#pool.free_count, histogram),
|
|
|
- {Pid, State#state{
|
|
|
- pools = store_pool(PoolName, Pool1, Pools),
|
|
|
+ {Pid, Pool1#pool{
|
|
|
consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
|
|
|
all_members = set_cpid_for_member(Pid, From,
|
|
|
- State#state.all_members)
|
|
|
+ Pool1#pool.all_members)
|
|
|
}}
|
|
|
end.
|
|
|
|
|
|
--spec do_return_member(pid(), ok | fail, #state{}) -> #state{}.
|
|
|
-do_return_member(Pid, ok, #state{all_members = AllMembers} = State) ->
|
|
|
+-spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
|
|
|
+do_return_member(Pid, ok, #pool{all_members = AllMembers} = Pool) ->
|
|
|
case dict:find(Pid, AllMembers) of
|
|
|
{ok, {PoolName, CPid, _}} ->
|
|
|
- Pool = fetch_pool(PoolName, State#state.pools),
|
|
|
#pool{free_pids = Free, in_use_count = NumInUse,
|
|
|
free_count = NumFree} = Pool,
|
|
|
Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1,
|
|
|
free_count = NumFree + 1},
|
|
|
Entry = {PoolName, free, os:timestamp()},
|
|
|
- State#state{pools = store_pool(PoolName, Pool1, State#state.pools),
|
|
|
- all_members = store_all_members(Pid, Entry, AllMembers),
|
|
|
- consumer_to_pid = cpmap_remove(Pid, CPid,
|
|
|
- State#state.consumer_to_pid)};
|
|
|
+ Pool1#pool{all_members = store_all_members(Pid, Entry, AllMembers),
|
|
|
+ consumer_to_pid = cpmap_remove(Pid, CPid,
|
|
|
+ Pool1#pool.consumer_to_pid)};
|
|
|
error ->
|
|
|
- State
|
|
|
+ Pool
|
|
|
end;
|
|
|
-do_return_member(Pid, fail, #state{all_members = AllMembers} = State) ->
|
|
|
+do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
|
|
|
% for the fail case, perhaps the member crashed and was alerady
|
|
|
% removed, so use find instead of fetch and ignore missing.
|
|
|
case dict:find(Pid, AllMembers) of
|
|
|
- {ok, {PoolName, _, _}} ->
|
|
|
- State1 = remove_pid(Pid, State),
|
|
|
- case add_pids(PoolName, 1, State1) of
|
|
|
- {Status, State2} when Status =:= ok;
|
|
|
- Status =:= max_count_reached ->
|
|
|
- State2;
|
|
|
+ {ok, {_PoolName, _, _}} ->
|
|
|
+ Pool1 = remove_pid(Pid, Pool),
|
|
|
+ case add_pids(1, Pool1) of
|
|
|
+ {Status, Pool2} when Status =:= ok;
|
|
|
+ Status =:= max_count_reached ->
|
|
|
+ Pool2;
|
|
|
{Status, _} ->
|
|
|
erlang:error({error, "unexpected return from add_pid",
|
|
|
Status, erlang:get_stacktrace()}),
|
|
@@ -403,7 +301,7 @@ do_return_member(Pid, fail, #state{all_members = AllMembers} = State) ->
|
|
|
history)
|
|
|
end;
|
|
|
error ->
|
|
|
- State
|
|
|
+ Pool
|
|
|
end.
|
|
|
|
|
|
% @doc Remove `Pid' from the pid list associated with `CPid' in the
|
|
@@ -436,34 +334,33 @@ cpmap_remove(Pid, CPid, CPMap) ->
|
|
|
% Handles in-use and free members. Logs an error if the pid is not
|
|
|
% tracked in state.all_members.
|
|
|
%
|
|
|
--spec remove_pid(pid(), #state{}) -> #state{}.
|
|
|
-remove_pid(Pid, State) ->
|
|
|
- #state{all_members = AllMembers, pools = Pools,
|
|
|
- consumer_to_pid = CPMap} = State,
|
|
|
+-spec remove_pid(pid(), #pool{}) -> #pool{}.
|
|
|
+remove_pid(Pid, Pool) ->
|
|
|
+ #pool{all_members = AllMembers,
|
|
|
+ consumer_to_pid = CPMap} = Pool,
|
|
|
case dict:find(Pid, AllMembers) of
|
|
|
- {ok, {PoolName, free, _Time}} ->
|
|
|
+ {ok, {_PoolName, free, _Time}} ->
|
|
|
% remove an unused member
|
|
|
- Pool = fetch_pool(PoolName, Pools),
|
|
|
FreePids = lists:delete(Pid, Pool#pool.free_pids),
|
|
|
NumFree = Pool#pool.free_count - 1,
|
|
|
Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
|
|
|
exit(Pid, kill),
|
|
|
+ %% FIXME: make this pool NAME specific
|
|
|
send_metric(<<"pooler.killed_free_count">>, {inc, 1}, counter),
|
|
|
- State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
|
- all_members = dict:erase(Pid, AllMembers)};
|
|
|
- {ok, {PoolName, CPid, _Time}} ->
|
|
|
- Pool = fetch_pool(PoolName, Pools),
|
|
|
+ Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
|
|
|
+ {ok, {_PoolName, CPid, _Time}} ->
|
|
|
Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
|
|
|
exit(Pid, kill),
|
|
|
+ %% FIXME: also fix to make pool NAME specific
|
|
|
send_metric(<<"pooler.killed_in_use_count">>, {inc, 1}, counter),
|
|
|
- State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
|
- consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
|
|
|
- all_members = dict:erase(Pid, AllMembers)};
|
|
|
+ Pool1#pool{consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
|
|
|
+ all_members = dict:erase(Pid, AllMembers)};
|
|
|
error ->
|
|
|
+ %% FIXME: make msg and metric pool NAME specific
|
|
|
error_logger:error_report({unknown_pid, Pid,
|
|
|
erlang:get_stacktrace()}),
|
|
|
send_metric(<<"pooler.event">>, unknown_pid, history),
|
|
|
- State
|
|
|
+ Pool
|
|
|
end.
|
|
|
|
|
|
-spec max_free_pool(dict()) -> error_no_members | string().
|
|
@@ -482,12 +379,13 @@ fold_max_free_count(Name, Pool, {CName, CMax}) ->
|
|
|
end.
|
|
|
|
|
|
|
|
|
--spec start_n_pids(non_neg_integer(), string(), pid(), dict()) ->
|
|
|
+-spec start_n_pids(non_neg_integer(), atom() | pid(), pid(), dict()) ->
|
|
|
{dict(), [pid()]}.
|
|
|
start_n_pids(N, PoolName, PoolSup, AllMembers) ->
|
|
|
NewPids = do_n(N, fun(Acc) ->
|
|
|
case supervisor:start_child(PoolSup, []) of
|
|
|
{ok, Pid} ->
|
|
|
+ %% FIXME: we should monitor instead
|
|
|
erlang:link(Pid),
|
|
|
[Pid | Acc];
|
|
|
_Else ->
|
|
@@ -506,23 +404,11 @@ do_n(0, _Fun, Acc) ->
|
|
|
do_n(N, Fun, Acc) ->
|
|
|
do_n(N - 1, Fun, Fun(Acc)).
|
|
|
|
|
|
-
|
|
|
--spec fetch_pool(string(), dict()) -> #pool{} | error_no_pool.
|
|
|
-fetch_pool(PoolName, Pools) ->
|
|
|
- case dict:find(PoolName, Pools) of
|
|
|
- {ok, Pool} -> Pool;
|
|
|
- error -> error_no_pool
|
|
|
- end.
|
|
|
-
|
|
|
pool_add_retries(#pool{add_member_retry = Retries}) ->
|
|
|
Retries;
|
|
|
pool_add_retries(error_no_pool) ->
|
|
|
0.
|
|
|
|
|
|
--spec store_pool(string(), #pool{}, dict()) -> dict().
|
|
|
-store_pool(PoolName, Pool = #pool{}, Pools) ->
|
|
|
- dict:store(PoolName, Pool, Pools).
|
|
|
-
|
|
|
-spec store_all_members(pid(),
|
|
|
{string(), free | pid(), {_, _, _}}, dict()) -> dict().
|
|
|
store_all_members(Pid, Val = {_PoolName, _CPid, _Time}, AllMembers) ->
|
|
@@ -539,39 +425,34 @@ set_cpid_for_member(MemberPid, CPid, AllMembers) ->
|
|
|
add_member_to_consumer(MemberPid, CPid, CPMap) ->
|
|
|
dict:update(CPid, fun(O) -> [MemberPid|O] end, [MemberPid], CPMap).
|
|
|
|
|
|
--spec cull_members(string(), #state{}) -> #state{}.
|
|
|
-cull_members(PoolName, #state{pools = Pools} = State) ->
|
|
|
- cull_members_from_pool(fetch_pool(PoolName, Pools), State).
|
|
|
-
|
|
|
--spec cull_members_from_pool(#pool{}, #state{}) -> #state{}.
|
|
|
-cull_members_from_pool(error_no_pool, State) ->
|
|
|
- State;
|
|
|
-cull_members_from_pool(#pool{cull_interval = {0, _}}, State) ->
|
|
|
+-spec cull_members_from_pool(#pool{}) -> #pool{}.
|
|
|
+cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->
|
|
|
%% 0 cull_interval means do not cull
|
|
|
- State;
|
|
|
+ Pool;
|
|
|
cull_members_from_pool(#pool{name = PoolName,
|
|
|
free_count = FreeCount,
|
|
|
init_count = InitCount,
|
|
|
in_use_count = InUseCount,
|
|
|
cull_interval = Delay,
|
|
|
- max_age = MaxAge} = Pool,
|
|
|
- #state{all_members = AllMembers} = State) ->
|
|
|
+ max_age = MaxAge,
|
|
|
+ all_members = AllMembers} = Pool) ->
|
|
|
MaxCull = FreeCount - (InitCount - InUseCount),
|
|
|
- State1 = case MaxCull > 0 of
|
|
|
- true ->
|
|
|
- MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
|
|
|
- ExpiredMembers =
|
|
|
- expired_free_members(MemberInfo, os:timestamp(), MaxAge),
|
|
|
- CullList = lists:sublist(ExpiredMembers, MaxCull),
|
|
|
- lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
|
|
|
- State, CullList);
|
|
|
- false ->
|
|
|
- State
|
|
|
- end,
|
|
|
+ Pool1 = case MaxCull > 0 of
|
|
|
+ true ->
|
|
|
+ MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
|
|
|
+ ExpiredMembers =
|
|
|
+ expired_free_members(MemberInfo, os:timestamp(), MaxAge),
|
|
|
+ CullList = lists:sublist(ExpiredMembers, MaxCull),
|
|
|
+ lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
|
|
|
+ Pool, CullList);
|
|
|
+ false ->
|
|
|
+ Pool
|
|
|
+ end,
|
|
|
schedule_cull(PoolName, Delay),
|
|
|
- State1.
|
|
|
+ Pool1.
|
|
|
|
|
|
--spec schedule_cull(PoolName :: string(), Delay :: time_spec()) -> reference().
|
|
|
+-spec schedule_cull(PoolName :: atom() | pid(),
|
|
|
+ Delay :: time_spec()) -> reference().
|
|
|
%% @doc Schedule a pool cleaning or "cull" for `PoolName' in which
|
|
|
%% members older than `max_age' will be removed until the pool has
|
|
|
%% `init_count' members. Uses `erlang:send_after/3' for light-weight
|
|
@@ -580,7 +461,7 @@ schedule_cull(PoolName, Delay) ->
|
|
|
DelayMillis = time_as_millis(Delay),
|
|
|
%% use pid instead of server name atom to take advantage of
|
|
|
%% automatic cancelling
|
|
|
- erlang:send_after(DelayMillis, self(), {cull_pool, PoolName}).
|
|
|
+ erlang:send_after(DelayMillis, PoolName, cull_pool).
|
|
|
|
|
|
-spec member_info([pid()], dict()) -> [{pid(), member_info()}].
|
|
|
member_info(Pids, AllMembers) ->
|
|
@@ -606,10 +487,10 @@ send_metric(Name, Value, Type) ->
|
|
|
end,
|
|
|
ok.
|
|
|
|
|
|
--spec pool_metric(string(), 'free_count' | 'in_use_count' | 'take_rate') -> binary().
|
|
|
+-spec pool_metric(atom(), 'free_count' | 'in_use_count' | 'take_rate') -> binary().
|
|
|
pool_metric(PoolName, Metric) ->
|
|
|
- iolist_to_binary([<<"pooler.">>, PoolName, ".",
|
|
|
- atom_to_binary(Metric, utf8)]).
|
|
|
+ iolist_to_binary([<<"pooler.">>, atom_to_binary(PoolName, utf8),
|
|
|
+ ".", atom_to_binary(Metric, utf8)]).
|
|
|
|
|
|
-spec time_as_millis(time_spec()) -> non_neg_integer().
|
|
|
%% @doc Convert time unit into milliseconds.
|