|
@@ -11,7 +11,6 @@
|
|
|
-module(pooler).
|
|
|
-behaviour(gen_server).
|
|
|
|
|
|
--include("pooler.hrl").
|
|
|
-include_lib("kernel/include/logger.hrl").
|
|
|
|
|
|
%% ------------------------------------------------------------------
|
|
@@ -44,6 +43,7 @@
|
|
|
call_free_members/2,
|
|
|
call_free_members/3
|
|
|
]).
|
|
|
+-export([create_group_table/0, config_as_map/1]).
|
|
|
|
|
|
%% ------------------------------------------------------------------
|
|
|
%% gen_server Function Exports
|
|
@@ -64,8 +64,98 @@
|
|
|
%% ------------------------------------------------------------------
|
|
|
-export_type([pool_config/0, pool_name/0, group_name/0, member_info/0, time_unit/0, time_spec/0]).
|
|
|
|
|
|
-% Internal
|
|
|
--export_type([members_map/0, consumers_map/0, requestor_queue/0, pool_state/0]).
|
|
|
+-define(DEFAULT_ADD_RETRY, 1).
|
|
|
+-define(DEFAULT_CULL_INTERVAL, {1, min}).
|
|
|
+-define(DEFAULT_MAX_AGE, {30, sec}).
|
|
|
+-define(DEFAULT_MEMBER_START_TIMEOUT, {1, min}).
|
|
|
+-define(DEFAULT_AUTO_GROW_THRESHOLD, undefined).
|
|
|
+-define(POOLER_GROUP_TABLE, pooler_group_table).
|
|
|
+-define(DEFAULT_POOLER_QUEUE_MAX, 50).
|
|
|
+-define(POOLER_POOL_NAME, '$pooler_pool_name').
|
|
|
+-define(POOLER_PID, '$pooler_pid').
|
|
|
+-define(DEFAULT_STOP_MFA, {supervisor, terminate_child, [?POOLER_POOL_NAME, ?POOLER_PID]}).
|
|
|
+
|
|
|
+-record(pool, {
|
|
|
+ name :: atom(),
|
|
|
+ group :: atom(),
|
|
|
+ max_count = 100 :: non_neg_integer(),
|
|
|
+ init_count = 10 :: non_neg_integer(),
|
|
|
+ start_mfa :: {atom(), atom(), [term()]},
|
|
|
+ free_pids = [] :: [pid()],
|
|
|
+ in_use_count = 0 :: non_neg_integer(),
|
|
|
+ free_count = 0 :: non_neg_integer(),
|
|
|
+ %% The number times to attempt adding a pool member if the
|
|
|
+ %% pool size is below max_count and there are no free
|
|
|
+ %% members. After this many tries, error_no_members will be
|
|
|
+ %% returned by a call to take_member. NOTE: this value
|
|
|
+ %% should be >= 2 or else the pool will not grow on demand
|
|
|
+ %% when max_count is larger than init_count.
|
|
|
+ add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer(),
|
|
|
+
|
|
|
+ %% The interval to schedule a cull message. Both
|
|
|
+ %% 'cull_interval' and 'max_age' are specified using a
|
|
|
+ %% `time_spec()' type.
|
|
|
+ cull_interval = ?DEFAULT_CULL_INTERVAL :: time_spec(),
|
|
|
+ %% The maximum age for members.
|
|
|
+ max_age = ?DEFAULT_MAX_AGE :: time_spec(),
|
|
|
+ cull_timer :: reference() | undefined,
|
|
|
+
|
|
|
+ %% The supervisor used to start new members
|
|
|
+ member_sup :: atom() | pid(),
|
|
|
+
|
|
|
+ %% The supervisor used to start starter servers that start
|
|
|
+ %% new members. This is what enables async member starts.
|
|
|
+ starter_sup :: atom() | pid(),
|
|
|
+
|
|
|
+ %% Maps member pid to a tuple of the form:
|
|
|
+ %% {MonitorRef, Status, Time},
|
|
|
+ %% where MonitorRef is a monitor reference for the member,,
|
|
|
+ %% Status is either 'free' or the consumer pid, and Time is
|
|
|
+ %% an Erlang timestamp that records when the member became
|
|
|
+ %% free.
|
|
|
+
|
|
|
+ all_members = #{} :: members_map(),
|
|
|
+
|
|
|
+ %% Maps consumer pid to a tuple of the form:
|
|
|
+ %% {MonitorRef, MemberList} where MonitorRef is a monitor
|
|
|
+ %% reference for the consumer and MemberList is a list of
|
|
|
+ %% members being consumed.
|
|
|
+ consumer_to_pid = #{} :: consumers_map(),
|
|
|
+
|
|
|
+ %% A list of `{References, Timestamp}' tuples representing
|
|
|
+ %% new member start requests that are in-flight. The
|
|
|
+ %% timestamp records when the start request was initiated
|
|
|
+ %% and is used to implement start timeout.
|
|
|
+ starting_members = [] :: [{pid(), erlang:timestamp()}],
|
|
|
+
|
|
|
+ %% The maximum amount of time to allow for member start.
|
|
|
+ member_start_timeout = ?DEFAULT_MEMBER_START_TIMEOUT :: time_spec(),
|
|
|
+
|
|
|
+ %% The optional threshold at which more members will be started if
|
|
|
+ %% free_count drops to this value. Normally undefined, but may be
|
|
|
+ %% set to a non-negative integer in order to enable "anticipatory"
|
|
|
+ %% behavior (start members before they're actually needed).
|
|
|
+ auto_grow_threshold = ?DEFAULT_AUTO_GROW_THRESHOLD :: undefined | non_neg_integer(),
|
|
|
+
|
|
|
+ %% Stop callback to gracefully attempt to terminate pool members.
|
|
|
+ %% The list of arguments must contain the fixed atom '$pooler_pid'.
|
|
|
+ stop_mfa = ?DEFAULT_STOP_MFA :: {atom(), atom(), [term()]},
|
|
|
+
|
|
|
+ %% The module to use for collecting metrics. If set to
|
|
|
+ %% 'pooler_no_metrics', then metric sending calls do
|
|
|
+ %% nothing. A typical value to actually capture metrics is
|
|
|
+ %% folsom_metrics.
|
|
|
+ metrics_mod = pooler_no_metrics :: atom(),
|
|
|
+
|
|
|
+ %% The API used to call the metrics system. It supports both Folsom
|
|
|
+ %% and Exometer format.
|
|
|
+ metrics_api = folsom :: 'folsom' | 'exometer',
|
|
|
+
|
|
|
+ %% A queue of requestors for blocking take member requests
|
|
|
+ queued_requestors = queue:new() :: requestor_queue(),
|
|
|
+ %% The max depth of the queue
|
|
|
+ queue_max = 50 :: non_neg_integer()
|
|
|
+}).
|
|
|
|
|
|
-type pool_name() :: atom().
|
|
|
%% The name of the pool
|
|
@@ -75,32 +165,31 @@
|
|
|
-type time_spec() :: {non_neg_integer(), time_unit()}.
|
|
|
%% Human-friendly way to specify the amount of time
|
|
|
|
|
|
--type pool_config() :: [
|
|
|
- {name, pool_name()}
|
|
|
- | {init_count, non_neg_integer()}
|
|
|
- | {max_count, non_neg_integer()}
|
|
|
- | {start_mfa, {module(), atom(), [any()]}}
|
|
|
- | {group, group_name()}
|
|
|
- | {cull_interval, time_spec()}
|
|
|
- | {max_age, time_spec()}
|
|
|
- | {member_start_timeout, time_spec()}
|
|
|
- | {queue_max, non_neg_integer()}
|
|
|
- | {metrics_api, folsom | exometer}
|
|
|
- | {metrics_mod, module()}
|
|
|
- | {stop_mfa, {module(), atom(), ['$pooler_pid' | any(), ...]}}
|
|
|
- | {auto_grow_threshold, non_neg_integer()}
|
|
|
- | {add_member_retry, non_neg_integer()}
|
|
|
-].
|
|
|
+-type pool_config() ::
|
|
|
+ #{
|
|
|
+ name := pool_name(),
|
|
|
+ init_count := non_neg_integer(),
|
|
|
+ max_count := non_neg_integer(),
|
|
|
+ start_mfa := {module(), atom(), [any()]},
|
|
|
+ group => group_name(),
|
|
|
+ cull_interval => time_spec(),
|
|
|
+ max_age => time_spec(),
|
|
|
+ member_start_timeout => time_spec(),
|
|
|
+ queue_max => non_neg_integer(),
|
|
|
+ metrics_api => folsom | exometer,
|
|
|
+ metrics_mod => module(),
|
|
|
+ stop_mfa => {module(), atom(), ['$pooler_pid' | any(), ...]},
|
|
|
+ auto_grow_threshold => non_neg_integer(),
|
|
|
+ add_member_retry => non_neg_integer()
|
|
|
+ }.
|
|
|
%% See {@link pooler:new_pool/1}
|
|
|
+-type pool_config_legacy() :: [{atom, any()}].
|
|
|
|
|
|
-type free_member_info() :: {reference(), free, erlang:timestamp()}.
|
|
|
-type member_info() :: {reference(), free | pid(), erlang:timestamp()}.
|
|
|
+
|
|
|
-type members_map() :: #{pid() => member_info()}.
|
|
|
-%% Internal
|
|
|
-type consumers_map() :: #{pid() => {reference(), [pid()]}}.
|
|
|
-%% Internal
|
|
|
--type pool_state() :: #pool{}.
|
|
|
-%% Internal
|
|
|
|
|
|
-if(?OTP_RELEASE >= 25).
|
|
|
-type gen_server_from() :: gen_server:from().
|
|
@@ -137,15 +226,21 @@ stop() ->
|
|
|
%% API Function Definitions
|
|
|
%% ------------------------------------------------------------------
|
|
|
|
|
|
-start_link(#pool{name = Name} = Pool) ->
|
|
|
- gen_server:start_link({local, Name}, ?MODULE, Pool, []).
|
|
|
+-spec start_link(pool_config()) -> {ok, pid()} | {error, any()}.
|
|
|
+start_link(#{name := Name, max_count := _, init_count := _, start_mfa := _} = PoolConfig) ->
|
|
|
+ %% PoolConfig may include `metrics_mod' and `metrics_api' at this point
|
|
|
+ gen_server:start_link({local, Name}, ?MODULE, PoolConfig, []).
|
|
|
|
|
|
manual_start() ->
|
|
|
application:start(sasl),
|
|
|
application:start(pooler).
|
|
|
|
|
|
-%% @doc Start a new pool described by the proplist `PoolConfig'. The
|
|
|
-%% following keys are required in the proplist:
|
|
|
+%% @private
|
|
|
+create_group_table() ->
|
|
|
+ ets:new(?POOLER_GROUP_TABLE, [set, public, named_table, {write_concurrency, true}]).
|
|
|
+
|
|
|
+%% @doc Start a new pool described by the map `PoolConfig'. The
|
|
|
+%% following keys are required in the map:
|
|
|
%%
|
|
|
%% <dl>
|
|
|
%% <dt>`name'</dt>
|
|
@@ -189,9 +284,9 @@ manual_start() ->
|
|
|
%% <dd>Time limit for member starts. Specified as `{Time,
|
|
|
%% Unit}'. Defaults to `{1, min}'.</dd>
|
|
|
%% </dl>
|
|
|
--spec new_pool(pool_config()) -> {ok, pid()} | {error, {already_started, pid()}}.
|
|
|
+-spec new_pool(pool_config() | pool_config_legacy()) -> {ok, pid()} | {error, {already_started, pid()}}.
|
|
|
new_pool(PoolConfig) ->
|
|
|
- pooler_sup:new_pool(PoolConfig).
|
|
|
+ pooler_sup:new_pool(config_as_map(PoolConfig)).
|
|
|
|
|
|
%% @doc Terminate the named pool.
|
|
|
-spec rm_pool(pool_name()) -> ok | {error, not_found | running | restarting}.
|
|
@@ -234,9 +329,9 @@ rm_group_members(MemberPids) ->
|
|
|
%% @doc Get child spec described by the proplist `PoolConfig'.
|
|
|
%%
|
|
|
%% See {@link pooler:new_pool/1} for info about `PoolConfig'.
|
|
|
--spec pool_child_spec(pool_config()) -> supervisor:child_spec().
|
|
|
+-spec pool_child_spec(pool_config() | pool_config_legacy()) -> supervisor:child_spec().
|
|
|
pool_child_spec(PoolConfig) ->
|
|
|
- pooler_sup:pool_child_spec(PoolConfig).
|
|
|
+ pooler_sup:pool_child_spec(config_as_map(PoolConfig)).
|
|
|
|
|
|
%% @doc For INTERNAL use. Adds `MemberPid' to the pool.
|
|
|
-spec accept_member(pool_name(), pooler_starter:start_result()) -> ok.
|
|
@@ -406,14 +501,29 @@ call_free_members(PoolName, Fun, Timeout) when
|
|
|
%% gen_server Function Definitions
|
|
|
%% ------------------------------------------------------------------
|
|
|
|
|
|
-init(#pool{} = Pool) ->
|
|
|
- #pool{init_count = N} = Pool,
|
|
|
- MemberSup = pooler_pool_sup:member_sup_name(Pool),
|
|
|
+init(#{name := Name, max_count := MaxCount, init_count := InitCount, start_mfa := StartMFA} = P) ->
|
|
|
+ Pool = #pool{
|
|
|
+ name = Name,
|
|
|
+ group = maps:get(group, P, undefined),
|
|
|
+ max_count = MaxCount,
|
|
|
+ init_count = InitCount,
|
|
|
+ start_mfa = StartMFA,
|
|
|
+ add_member_retry = maps:get(add_member_retry, P, ?DEFAULT_ADD_RETRY),
|
|
|
+ cull_interval = maps:get(cull_interval, P, ?DEFAULT_CULL_INTERVAL),
|
|
|
+ max_age = maps:get(max_age, P, ?DEFAULT_MAX_AGE),
|
|
|
+ member_start_timeout = maps:get(member_start_timeout, P, ?DEFAULT_MEMBER_START_TIMEOUT),
|
|
|
+ auto_grow_threshold = maps:get(auto_grow_threshold, P, ?DEFAULT_AUTO_GROW_THRESHOLD),
|
|
|
+ stop_mfa = maps:get(stop_mfa, P, ?DEFAULT_STOP_MFA),
|
|
|
+ metrics_mod = maps:get(metrics_mod, P, pooler_no_metrics),
|
|
|
+ metrics_api = maps:get(metrics_api, P, folsom),
|
|
|
+ queue_max = maps:get(queue_max, P, ?DEFAULT_POOLER_QUEUE_MAX)
|
|
|
+ },
|
|
|
+ MemberSup = pooler_pool_sup:build_member_sup_name(Name),
|
|
|
Pool1 = set_member_sup(Pool, MemberSup),
|
|
|
%% This schedules the next cull when the pool is configured for
|
|
|
%% such and is otherwise a no-op.
|
|
|
Pool2 = cull_members_from_pool(Pool1),
|
|
|
- {ok, NewPool} = init_members_sync(N, Pool2),
|
|
|
+ {ok, NewPool} = init_members_sync(InitCount, Pool2),
|
|
|
{ok, NewPool, {continue, join_group}}.
|
|
|
|
|
|
handle_continue(join_group, #pool{group = undefined} = Pool) ->
|
|
@@ -1207,6 +1317,13 @@ to_map(#pool{} = Pool) ->
|
|
|
[{'$record_name', Name} | lists:zip(record_info(fields, pool), Values)]
|
|
|
).
|
|
|
|
|
|
+%% @private
|
|
|
+-spec config_as_map(pool_config() | pool_config_legacy()) -> pool_config().
|
|
|
+config_as_map(Conf) when is_map(Conf) ->
|
|
|
+ Conf;
|
|
|
+config_as_map(LegacyConf) when is_list(LegacyConf) ->
|
|
|
+ maps:from_list(LegacyConf).
|
|
|
+
|
|
|
% >= OTP-21
|
|
|
-ifdef(OTP_RELEASE).
|
|
|
-if(?OTP_RELEASE >= 23).
|