|
@@ -3,7 +3,7 @@
|
|
|
%% @doc This is the main interface to the pooler application
|
|
|
%%
|
|
|
%% To integrate with your application, you probably want to call
|
|
|
-%% application:start(pooler) after having specified appropriate
|
|
|
+%% `application:start(pooler)' after having specified appropriate
|
|
|
%% configuration for the pooler application (either via a config file
|
|
|
%% or appropriate calls to the application module to set the
|
|
|
%% application's config).
|
|
@@ -14,15 +14,6 @@
|
|
|
-include("pooler.hrl").
|
|
|
-include_lib("kernel/include/logger.hrl").
|
|
|
|
|
|
-%% type specs for pool metrics
|
|
|
--type metric_value() ::
|
|
|
- 'unknown_pid'
|
|
|
- | non_neg_integer()
|
|
|
- | {'add_pids_failed', non_neg_integer(), non_neg_integer()}
|
|
|
- | {'inc', 1}
|
|
|
- | 'error_no_members'.
|
|
|
--type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
|
|
|
-
|
|
|
%% ------------------------------------------------------------------
|
|
|
%% API Function Exports
|
|
|
%% ------------------------------------------------------------------
|
|
@@ -71,10 +62,18 @@
|
|
|
%% ------------------------------------------------------------------
|
|
|
%% Types
|
|
|
%% ------------------------------------------------------------------
|
|
|
--export_type([pool_config/0, pool_name/0, group_name/0]).
|
|
|
+-export_type([pool_config/0, pool_name/0, group_name/0, member_info/0, time_unit/0, time_spec/0]).
|
|
|
+
|
|
|
+% Internal
|
|
|
+-export_type([members_map/0, consumers_map/0, requestor_queue/0]).
|
|
|
|
|
|
-type pool_name() :: atom().
|
|
|
+%% The name of the pool
|
|
|
-type group_name() :: atom().
|
|
|
+%% The name of the group pool belongs to
|
|
|
+-type time_unit() :: min | sec | ms | mu.
|
|
|
+-type time_spec() :: {non_neg_integer(), time_unit()}.
|
|
|
+%% Human-friendly way to specify the amount of time
|
|
|
|
|
|
-type pool_config() :: [
|
|
|
{name, pool_name()}
|
|
@@ -92,6 +91,32 @@
|
|
|
| {auto_grow_threshold, non_neg_integer()}
|
|
|
| {add_member_retry, non_neg_integer()}
|
|
|
].
|
|
|
+%% See {@link pooler:new_pool/1}
|
|
|
+
|
|
|
+-type free_member_info() :: {reference(), free, erlang:timestamp()}.
|
|
|
+-type member_info() :: {reference(), free | pid(), erlang:timestamp()}.
|
|
|
+-type members_map() :: #{pid() => member_info()}.
|
|
|
+%% Internal
|
|
|
+-type consumers_map() :: #{pid() => {reference(), [pid()]}}.
|
|
|
+%% Internal
|
|
|
+
|
|
|
+-if(?OTP_RELEASE >= 25).
|
|
|
+-type gen_server_from() :: gen_server:from().
|
|
|
+-else.
|
|
|
+-type gen_server_from() :: {pid(), any()}.
|
|
|
+-endif.
|
|
|
+
|
|
|
+-type requestor_queue() :: queue:queue({gen_server_from(), reference()}).
|
|
|
+%% Internal
|
|
|
+
|
|
|
+% type specs for pool metrics
|
|
|
+-type metric_value() ::
|
|
|
+ 'unknown_pid'
|
|
|
+ | non_neg_integer()
|
|
|
+ | {'add_pids_failed', non_neg_integer(), non_neg_integer()}
|
|
|
+ | {'inc', 1}
|
|
|
+ | 'error_no_members'.
|
|
|
+-type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
|
|
|
|
|
|
%% ------------------------------------------------------------------
|
|
|
%% Application API
|
|
@@ -410,7 +435,7 @@ handle_call({accept_member, Pid}, _From, Pool) ->
|
|
|
handle_call(stop, _From, Pool) ->
|
|
|
{stop, normal, stop_ok, Pool};
|
|
|
handle_call(pool_stats, _From, Pool) ->
|
|
|
- {reply, dict:to_list(Pool#pool.all_members), Pool};
|
|
|
+ {reply, maps:to_list(Pool#pool.all_members), Pool};
|
|
|
handle_call(pool_utilization, _From, Pool) ->
|
|
|
{reply, compute_utilization(Pool), Pool};
|
|
|
handle_call(dump_pool, _From, Pool) ->
|
|
@@ -439,12 +464,12 @@ handle_info({requestor_timeout, From}, Pool = #pool{queued_requestors = Requesto
|
|
|
{noreply, Pool#pool{queued_requestors = NewQueue}};
|
|
|
handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
|
|
|
State1 =
|
|
|
- case dict:find(Pid, State#pool.all_members) of
|
|
|
- {ok, {_PoolName, _ConsumerPid, _Time}} ->
|
|
|
+ case maps:get(Pid, State#pool.all_members, undefined) of
|
|
|
+ {_PoolName, _ConsumerPid, _Time} ->
|
|
|
do_return_member(Pid, fail, State);
|
|
|
- error ->
|
|
|
- case dict:find(Pid, State#pool.consumer_to_pid) of
|
|
|
- {ok, {MRef, Pids}} ->
|
|
|
+ undefined ->
|
|
|
+ case maps:get(Pid, State#pool.consumer_to_pid, undefined) of
|
|
|
+ {MRef, Pids} ->
|
|
|
IsOk =
|
|
|
case Reason of
|
|
|
normal -> ok;
|
|
@@ -455,7 +480,7 @@ handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
|
|
|
State,
|
|
|
Pids
|
|
|
);
|
|
|
- error ->
|
|
|
+ undefined ->
|
|
|
State
|
|
|
end
|
|
|
end,
|
|
@@ -558,8 +583,8 @@ reply_to_queued_requestor(TRef, Pid, From = {APid, _}, NewQueuedRequestors, Pool
|
|
|
|
|
|
-spec take_member_bookkeeping(
|
|
|
pid(),
|
|
|
- {pid(), _},
|
|
|
- [pid()] | p_requestor_queue(),
|
|
|
+ gen_server_from(),
|
|
|
+ [pid()] | requestor_queue(),
|
|
|
#pool{}
|
|
|
) -> #pool{}.
|
|
|
take_member_bookkeeping(
|
|
@@ -671,7 +696,7 @@ collect_init_members(#pool{member_start_timeout = StartTimeout} = Pool) ->
|
|
|
timeout
|
|
|
end.
|
|
|
|
|
|
--spec take_member_from_pool(#pool{}, {pid(), term()}) ->
|
|
|
+-spec take_member_from_pool(#pool{}, gen_server_from()) ->
|
|
|
{error_no_members | pid(), #pool{}}.
|
|
|
take_member_from_pool(
|
|
|
#pool{
|
|
@@ -728,7 +753,7 @@ take_member_from_pool(
|
|
|
|
|
|
-spec take_member_from_pool_queued(
|
|
|
#pool{},
|
|
|
- {pid(), _},
|
|
|
+ gen_server_from(),
|
|
|
non_neg_integer()
|
|
|
) ->
|
|
|
{error_no_members | queued | pid(), #pool{}}.
|
|
@@ -777,8 +802,8 @@ do_return_member(
|
|
|
} = Pool
|
|
|
) ->
|
|
|
clean_group_table(Pid, Pool),
|
|
|
- case dict:find(Pid, AllMembers) of
|
|
|
- {ok, {_, free, _}} ->
|
|
|
+ case maps:get(Pid, AllMembers, undefined) of
|
|
|
+ {_, free, _} ->
|
|
|
?LOG_WARNING(
|
|
|
#{
|
|
|
label => "ignored return of free member",
|
|
@@ -788,7 +813,7 @@ do_return_member(
|
|
|
#{domain => [pooler]}
|
|
|
),
|
|
|
Pool;
|
|
|
- {ok, {MRef, CPid, _}} ->
|
|
|
+ {MRef, CPid, _} ->
|
|
|
#pool{
|
|
|
free_pids = Free,
|
|
|
in_use_count = NumInUse,
|
|
@@ -810,18 +835,18 @@ do_return_member(
|
|
|
{{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
|
|
|
reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool2)
|
|
|
end;
|
|
|
- error ->
|
|
|
+ undefined ->
|
|
|
Pool
|
|
|
end;
|
|
|
do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
|
|
|
% for the fail case, perhaps the member crashed and was alerady
|
|
|
% removed, so use find instead of fetch and ignore missing.
|
|
|
clean_group_table(Pid, Pool),
|
|
|
- case dict:find(Pid, AllMembers) of
|
|
|
- {ok, {_MRef, _, _}} ->
|
|
|
+ case maps:get(Pid, AllMembers, undefined) of
|
|
|
+ {_MRef, _, _} ->
|
|
|
Pool1 = remove_pid(Pid, Pool),
|
|
|
add_members_async(1, Pool1);
|
|
|
- error ->
|
|
|
+ undefined ->
|
|
|
Pool
|
|
|
end.
|
|
|
|
|
@@ -836,22 +861,22 @@ clean_group_table(MemberPid, #pool{group = _GroupName}) ->
|
|
|
% If `Pid' is the last element in `CPid's pid list, then the `CPid'
|
|
|
% entry is removed entirely.
|
|
|
%
|
|
|
--spec cpmap_remove(pid(), pid() | free, dict:dict()) -> dict:dict().
|
|
|
+-spec cpmap_remove(pid(), pid() | free, consumers_map()) -> consumers_map().
|
|
|
cpmap_remove(_Pid, free, CPMap) ->
|
|
|
CPMap;
|
|
|
cpmap_remove(Pid, CPid, CPMap) ->
|
|
|
- case dict:find(CPid, CPMap) of
|
|
|
- {ok, {MRef, Pids0}} ->
|
|
|
+ case maps:get(CPid, CPMap, undefined) of
|
|
|
+ {MRef, Pids0} ->
|
|
|
Pids1 = lists:delete(Pid, Pids0),
|
|
|
case Pids1 of
|
|
|
[_H | _T] ->
|
|
|
- dict:store(CPid, {MRef, Pids1}, CPMap);
|
|
|
+ CPMap#{CPid => {MRef, Pids1}};
|
|
|
[] ->
|
|
|
%% no more members for this consumer
|
|
|
erlang:demonitor(MRef, [flush]),
|
|
|
- dict:erase(CPid, CPMap)
|
|
|
+ maps:remove(CPid, CPMap)
|
|
|
end;
|
|
|
- error ->
|
|
|
+ undefined ->
|
|
|
% FIXME: this shouldn't happen, should we log or error?
|
|
|
CPMap
|
|
|
end.
|
|
@@ -869,8 +894,8 @@ remove_pid(Pid, Pool) ->
|
|
|
consumer_to_pid = CPMap,
|
|
|
stop_mfa = StopMFA
|
|
|
} = Pool,
|
|
|
- case dict:find(Pid, AllMembers) of
|
|
|
- {ok, {MRef, free, _Time}} ->
|
|
|
+ case maps:get(Pid, AllMembers, undefined) of
|
|
|
+ {MRef, free, _Time} ->
|
|
|
% remove an unused member
|
|
|
erlang:demonitor(MRef, [flush]),
|
|
|
FreePids = lists:delete(Pid, Pool#pool.free_pids),
|
|
@@ -878,8 +903,8 @@ remove_pid(Pid, Pool) ->
|
|
|
Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
|
|
|
terminate_pid(PoolName, Pid, StopMFA),
|
|
|
send_metric(Pool1, killed_free_count, {inc, 1}, counter),
|
|
|
- Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
|
|
|
- {ok, {MRef, CPid, _Time}} ->
|
|
|
+ Pool1#pool{all_members = maps:remove(Pid, AllMembers)};
|
|
|
+ {MRef, CPid, _Time} ->
|
|
|
%% remove a member being consumed. No notice is sent to
|
|
|
%% the consumer.
|
|
|
erlang:demonitor(MRef, [flush]),
|
|
@@ -888,9 +913,9 @@ remove_pid(Pid, Pool) ->
|
|
|
send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
|
|
|
Pool1#pool{
|
|
|
consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
|
|
|
- all_members = dict:erase(Pid, AllMembers)
|
|
|
+ all_members = maps:remove(Pid, AllMembers)
|
|
|
};
|
|
|
- error ->
|
|
|
+ undefined ->
|
|
|
?LOG_ERROR(
|
|
|
#{
|
|
|
label => unknown_pid,
|
|
@@ -905,15 +930,15 @@ remove_pid(Pid, Pool) ->
|
|
|
|
|
|
-spec store_all_members(
|
|
|
pid(),
|
|
|
- {reference(), free | pid(), {_, _, _}},
|
|
|
- dict:dict()
|
|
|
-) -> dict:dict().
|
|
|
+ member_info(),
|
|
|
+ members_map()
|
|
|
+) -> members_map().
|
|
|
store_all_members(Pid, Val = {_MRef, _CPid, _Time}, AllMembers) ->
|
|
|
- dict:store(Pid, Val, AllMembers).
|
|
|
+ AllMembers#{Pid => Val}.
|
|
|
|
|
|
--spec set_cpid_for_member(pid(), pid(), dict:dict()) -> dict:dict().
|
|
|
+-spec set_cpid_for_member(pid(), pid(), members_map()) -> members_map().
|
|
|
set_cpid_for_member(MemberPid, CPid, AllMembers) ->
|
|
|
- dict:update(
|
|
|
+ maps:update_with(
|
|
|
MemberPid,
|
|
|
fun({MRef, free, Time = {_, _, _}}) ->
|
|
|
{MRef, CPid, Time}
|
|
@@ -921,16 +946,16 @@ set_cpid_for_member(MemberPid, CPid, AllMembers) ->
|
|
|
AllMembers
|
|
|
).
|
|
|
|
|
|
--spec add_member_to_consumer(pid(), pid(), dict:dict()) -> dict:dict().
|
|
|
+-spec add_member_to_consumer(pid(), pid(), consumers_map()) -> consumers_map().
|
|
|
add_member_to_consumer(MemberPid, CPid, CPMap) ->
|
|
|
- %% we can't use dict:update here because we need to create the
|
|
|
+ %% we can't use maps:update_with here because we need to create the
|
|
|
%% monitor if we aren't already tracking this consumer.
|
|
|
- case dict:find(CPid, CPMap) of
|
|
|
- {ok, {MRef, MList}} ->
|
|
|
- dict:store(CPid, {MRef, [MemberPid | MList]}, CPMap);
|
|
|
- error ->
|
|
|
+ case maps:get(CPid, CPMap, undefined) of
|
|
|
+ {MRef, MList} ->
|
|
|
+ CPMap#{CPid => {MRef, [MemberPid | MList]}};
|
|
|
+ undefined ->
|
|
|
MRef = erlang:monitor(process, CPid),
|
|
|
- dict:store(CPid, {MRef, [MemberPid]}, CPMap)
|
|
|
+ CPMap#{CPid => {MRef, [MemberPid]}}
|
|
|
end.
|
|
|
|
|
|
-spec cull_members_from_pool(#pool{}) -> #pool{}.
|
|
@@ -987,9 +1012,9 @@ schedule_cull(Pool, Delay) ->
|
|
|
DelayMillis = time_as_millis(Delay),
|
|
|
erlang:send_after(DelayMillis, Pool, cull_pool).
|
|
|
|
|
|
--spec member_info([pid()], dict:dict()) -> [{pid(), member_info()}].
|
|
|
+-spec member_info([pid()], members_map()) -> [{pid(), member_info()}].
|
|
|
member_info(Pids, AllMembers) ->
|
|
|
- [{P, dict:fetch(P, AllMembers)} || P <- Pids].
|
|
|
+ maps:to_list(maps:with(Pids, AllMembers)).
|
|
|
|
|
|
-spec expired_free_members(
|
|
|
Members :: [{pid(), member_info()}],
|