  1. %% @author Seth Falcon <seth@userprimary.net>
  2. %% @copyright 2011-2013 Seth Falcon
  3. %% @doc This is the main interface to the pooler application
  4. %%
  5. %% To integrate with your application, you probably want to call
  6. %% `application:start(pooler)' after having specified appropriate
  7. %% configuration for the pooler application (either via a config file
  8. %% or appropriate calls to the application module to set the
  9. %% application's config).
  10. %%
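%% As a minimal sketch, configuration could be supplied via `sys.config'
%% (the pool name, group and member module below are hypothetical):
%% ```
%% {pooler, [
%%     {pools, [
%%         [{name, example_pool},
%%          {group, example_group},
%%          {init_count, 2},
%%          {max_count, 10},
%%          {start_mfa, {example_member, start_link, []}}]
%%     ]}
%% ]}
%% '''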
  11. -module(pooler).
  12. -behaviour(gen_server).
  13. -include_lib("kernel/include/logger.hrl").
  14. %% ------------------------------------------------------------------
  15. %% API Function Exports
  16. %% ------------------------------------------------------------------
  17. -export([
  18. start/0,
  19. stop/0
  20. ]).
  21. -export([
  22. accept_member/2,
  23. start_link/1,
  24. take_member/1,
  25. take_member/2,
  26. take_group_member/1,
  27. return_group_member/2,
  28. return_group_member/3,
  29. group_pools/1,
  30. return_member/2,
  31. return_member/3,
  32. pool_stats/1,
  33. pool_utilization/1,
  34. manual_start/0,
  35. new_pool/1,
  36. pool_child_spec/1,
  37. rm_pool/1,
  38. rm_group/1,
  39. call_free_members/2,
  40. call_free_members/3
  41. ]).
  42. -export([create_group_table/0, config_as_map/1]).
  43. %% ------------------------------------------------------------------
  44. %% gen_server Function Exports
  45. %% ------------------------------------------------------------------
  46. -export([
  47. init/1,
  48. handle_continue/2,
  49. handle_call/3,
  50. handle_cast/2,
  51. handle_info/2,
  52. terminate/2,
  53. code_change/3
  54. ]).
  55. %% ------------------------------------------------------------------
  56. %% Types
  57. %% ------------------------------------------------------------------
  58. -export_type([pool_config/0, pool_name/0, group_name/0, member_info/0, time_unit/0, time_spec/0]).
  59. -define(DEFAULT_ADD_RETRY, 1).
  60. -define(DEFAULT_CULL_INTERVAL, {1, min}).
  61. -define(DEFAULT_MAX_AGE, {30, sec}).
  62. -define(DEFAULT_MEMBER_START_TIMEOUT, {1, min}).
  63. -define(DEFAULT_AUTO_GROW_THRESHOLD, undefined).
  64. -define(POOLER_GROUP_TABLE, pooler_group_table).
  65. -define(DEFAULT_POOLER_QUEUE_MAX, 50).
  66. -define(POOLER_POOL_NAME, '$pooler_pool_name').
  67. -define(POOLER_PID, '$pooler_pid').
  68. -define(DEFAULT_STOP_MFA, {supervisor, terminate_child, [?POOLER_POOL_NAME, ?POOLER_PID]}).
  69. -record(pool, {
  70. name :: atom(),
  71. group :: atom(),
  72. max_count = 100 :: non_neg_integer(),
  73. init_count = 10 :: non_neg_integer(),
  74. start_mfa :: {atom(), atom(), [term()]},
  75. free_pids = [] :: [pid()],
  76. in_use_count = 0 :: non_neg_integer(),
  77. free_count = 0 :: non_neg_integer(),
  78. %% The number of times to attempt adding a pool member if the
  79. %% pool size is below max_count and there are no free
  80. %% members. After this many tries, error_no_members will be
  81. %% returned by a call to take_member. NOTE: this value
  82. %% should be >= 2 or else the pool will not grow on demand
  83. %% when max_count is larger than init_count.
  84. add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer(),
  85. %% The interval to schedule a cull message. Both
  86. %% 'cull_interval' and 'max_age' are specified using a
  87. %% `time_spec()' type.
  88. cull_interval = ?DEFAULT_CULL_INTERVAL :: time_spec(),
  89. %% The maximum age for members.
  90. max_age = ?DEFAULT_MAX_AGE :: time_spec(),
  91. cull_timer :: reference() | undefined,
  92. %% The supervisor used to start new members
  93. member_sup :: atom() | pid(),
  94. %% The supervisor used to start starter servers that start
  95. %% new members. This is what enables async member starts.
  96. starter_sup :: atom() | pid(),
  97. %% Maps member pid to a tuple of the form:
  98. %% {MonitorRef, Status, Time},
  99. %% where MonitorRef is a monitor reference for the member,
  100. %% Status is either 'free' or the consumer pid, and Time is
  101. %% an Erlang timestamp that records when the member became
  102. %% free.
  103. all_members = #{} :: members_map(),
  104. %% Maps consumer pid to a tuple of the form:
  105. %% {MonitorRef, MemberList} where MonitorRef is a monitor
  106. %% reference for the consumer and MemberList is a list of
  107. %% members being consumed.
  108. consumer_to_pid = #{} :: consumers_map(),
  109. %% A list of `{StarterPid, Timestamp}' tuples representing
  110. %% new member start requests that are in-flight. The
  111. %% timestamp records when the start request was initiated
  112. %% and is used to implement start timeout.
  113. starting_members = [] :: [{pid(), erlang:timestamp()}],
  114. %% The maximum amount of time to allow for member start.
  115. member_start_timeout = ?DEFAULT_MEMBER_START_TIMEOUT :: time_spec(),
  116. %% The optional threshold at which more members will be started if
  117. %% free_count drops to this value. Normally undefined, but may be
  118. %% set to a non-negative integer in order to enable "anticipatory"
  119. %% behavior (start members before they're actually needed).
  120. auto_grow_threshold = ?DEFAULT_AUTO_GROW_THRESHOLD :: undefined | non_neg_integer(),
  121. %% Stop callback to gracefully attempt to terminate pool members.
  122. %% The list of arguments must contain the fixed atom '$pooler_pid'.
  123. stop_mfa = ?DEFAULT_STOP_MFA :: {atom(), atom(), [term()]},
  124. %% The module to use for collecting metrics. If set to
  125. %% 'pooler_no_metrics', then metric sending calls do
  126. %% nothing. A typical value to actually capture metrics is
  127. %% folsom_metrics.
  128. metrics_mod = pooler_no_metrics :: atom(),
  129. %% The API used to call the metrics system. It supports both Folsom
  130. %% and Exometer format.
  131. metrics_api = folsom :: 'folsom' | 'exometer',
  132. %% A queue of requestors for blocking take member requests
  133. queued_requestors = queue:new() :: requestor_queue(),
  134. %% The max depth of the queue
  135. queue_max = 50 :: non_neg_integer()
  136. }).
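%% As an illustration of the metrics fields above, a pool that reports to
%% folsom might be created with (pool and member module names are hypothetical):
%% ```
%% pooler:new_pool(#{name => metered_pool,
%%                   init_count => 2,
%%                   max_count => 10,
%%                   start_mfa => {example_member, start_link, []},
%%                   metrics_mod => folsom_metrics,
%%                   metrics_api => folsom}).
%% '''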
  137. -type pool_name() :: atom().
  138. %% The name of the pool
  139. -type group_name() :: atom().
  140. %% The name of the group the pool belongs to
  141. -type time_unit() :: min | sec | ms | mu.
  142. -type time_spec() :: {non_neg_integer(), time_unit()}.
  143. %% Human-friendly way to specify the amount of time
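%% For example, `{30, sec}', `{30000, ms}' and `{30000000, mu}' all describe
%% thirty seconds.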
  144. -type pool_config() ::
  145. #{
  146. name := pool_name(),
  147. init_count := non_neg_integer(),
  148. max_count := non_neg_integer(),
  149. start_mfa := {module(), atom(), [any()]},
  150. group => group_name(),
  151. cull_interval => time_spec(),
  152. max_age => time_spec(),
  153. member_start_timeout => time_spec(),
  154. queue_max => non_neg_integer(),
  155. metrics_api => folsom | exometer,
  156. metrics_mod => module(),
  157. stop_mfa => {module(), atom(), ['$pooler_pid' | any(), ...]},
  158. auto_grow_threshold => non_neg_integer(),
  159. add_member_retry => non_neg_integer()
  160. }.
  161. %% See {@link pooler:new_pool/1}
  162. -type pool_config_legacy() :: [{atom(), any()}].
  163. -type free_member_info() :: {reference(), free, erlang:timestamp()}.
  164. -type member_info() :: {reference(), free | pid(), erlang:timestamp()}.
  165. -type members_map() :: #{pid() => member_info()}.
  166. -type consumers_map() :: #{pid() => {reference(), [pid()]}}.
  167. -if(?OTP_RELEASE >= 25).
  168. -type gen_server_from() :: gen_server:from().
  169. -else.
  170. -type gen_server_from() :: {pid(), any()}.
  171. -endif.
  172. -type requestor_queue() :: queue:queue({gen_server_from(), reference()}).
  173. %% Internal
  174. % type specs for pool metrics
  175. -type metric_value() ::
  176. 'unknown_pid'
  177. | non_neg_integer()
  178. | {'add_pids_failed', non_neg_integer(), non_neg_integer()}
  179. | {'inc', 1}
  180. | 'error_no_members'.
  181. -type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
  182. %% ------------------------------------------------------------------
  183. %% Application API
  184. %% ------------------------------------------------------------------
  185. -spec start() -> 'ok'.
  186. start() ->
  187. {ok, _} = application:ensure_all_started(pooler),
  188. ok.
  189. -spec stop() -> 'ok'.
  190. stop() ->
  191. ok = application:stop(pooler).
  192. %% ------------------------------------------------------------------
  193. %% API Function Definitions
  194. %% ------------------------------------------------------------------
  195. -spec start_link(pool_config()) -> {ok, pid()} | {error, any()}.
  196. start_link(#{name := Name, max_count := _, init_count := _, start_mfa := _} = PoolConfig) ->
  197. %% PoolConfig may include `metrics_mod' and `metrics_api' at this point
  198. gen_server:start_link({local, Name}, ?MODULE, PoolConfig, []).
  199. manual_start() ->
  200. application:start(sasl),
  201. application:start(pooler).
  202. %% @private
  203. create_group_table() ->
  204. ets:new(?POOLER_GROUP_TABLE, [set, public, named_table, {write_concurrency, true}]).
  205. %% @doc Start a new pool described by the map `PoolConfig'. The
  206. %% following keys are required in the map:
  207. %%
  208. %% <dl>
  209. %% <dt>`name'</dt>
  210. %% <dd>An atom giving the name of the pool.</dd>
  211. %% <dt>`init_count'</dt>
  212. %% <dd>Number of members to add to the pool at start. When the pool is
  213. %% started, `init_count' members will be started in parallel.</dd>
  214. %% <dt>`max_count'</dt>
  215. %% <dd>Maximum number of members in the pool.</dd>
  216. %% <dt>`start_mfa'</dt>
  217. %% <dd>A tuple of the form `{Mod, Fun, Args}' describing how to start
  218. %% new pool members.</dd>
  219. %% </dl>
  220. %%
  221. %% In addition, you can specify any of the following optional
  222. %% configuration options:
  223. %%
  224. %% <dl>
  225. %% <dt>`group'</dt>
  226. %% <dd>An atom giving the name of the group this pool belongs
  227. %% to. Pools sharing a common `group' value can be accessed using
  228. %% {@link take_group_member/1} and {@link return_group_member/2}.</dd>
  229. %% <dt>`cull_interval'</dt>
  230. %% <dd>Time between checks for stale pool members. Specified as
  231. %% `{Time, Unit}' where `Time' is a non-negative integer and `Unit' is
  232. %% one of `min', `sec', `ms', or `mu'. The default value of `{1, min}'
  233. %% triggers a once per minute check to remove members that have not
  234. %% been accessed in `max_age' time units. Culling can be disabled by
  235. %% specifying a zero time value (e.g. `{0, min}'). Culling will also be
  236. %% disabled if `init_count' is the same as `max_count'.</dd>
  237. %% <dt>`max_age'</dt>
  238. %% <dd>Members idle longer than `max_age' time units are removed from
  239. %% the pool when stale checking is enabled via
  240. %% `cull_interval'. Culling of idle members will never reduce the pool
  241. %% below `init_count'. The value is specified as `{Time, Unit}'. Note
  242. %% that timers are not set on individual pool members and may remain
  243. %% in the pool beyond the configured `max_age' value since members are
  244. %% only removed on the interval configured via `cull_interval'. The
  245. %% default value is `{30, sec}'.</dd>
  246. %% <dt>`member_start_timeout'</dt>
  247. %% <dd>Time limit for member starts. Specified as `{Time,
  248. %% Unit}'. Defaults to `{1, min}'.</dd>
  249. %% </dl>
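%%
%% As a sketch, the required keys plus a few of the optional ones (the
%% member module here is hypothetical) might be combined as:
%% ```
%% pooler:new_pool(#{name => example_pool,
%%                   group => example_group,
%%                   init_count => 2,
%%                   max_count => 10,
%%                   cull_interval => {1, min},
%%                   max_age => {30, sec},
%%                   start_mfa => {example_member, start_link, []}}).
%% '''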
  250. -spec new_pool(pool_config() | pool_config_legacy()) -> {ok, pid()} | {error, {already_started, pid()}}.
  251. new_pool(PoolConfig) ->
  252. pooler_sup:new_pool(config_as_map(PoolConfig)).
  253. %% @doc Terminate the named pool.
  254. -spec rm_pool(pool_name()) -> ok | {error, not_found | running | restarting}.
  255. rm_pool(PoolName) ->
  256. pooler_sup:rm_pool(PoolName).
  257. %% @doc Terminates the group and all pools in that group.
  258. %%
  259. %% If termination of any member pool fails, `rm_group/1' returns
  260. %% `{error, {failed_rm_pools, Pools}}', where `Pools' is a list
  261. %% of pools that failed to terminate.
  262. %%
  263. %% The group is NOT terminated if any member pool did not
  264. %% successfully terminate.
  265. %%
  266. -spec rm_group(group_name()) -> ok | {error, {failed_rm_pools, [atom()]}}.
  267. rm_group(GroupName) ->
  268. Pools = pg_get_local_members(GroupName),
  269. case rm_group_members(Pools) of
  270. [] ->
  271. pg_delete(GroupName);
  272. Failures ->
  273. {error, {failed_rm_pools, Failures}}
  274. end.
  275. -spec rm_group_members([pid()]) -> [atom()].
  276. rm_group_members(MemberPids) ->
  277. lists:foldl(
  278. fun(MemberPid, Acc) ->
  279. #{name := PoolName} = gen_server:call(MemberPid, dump_pool),
  280. case pooler_sup:rm_pool(PoolName) of
  281. ok -> Acc;
  282. _ -> [PoolName | Acc]
  283. end
  284. end,
  285. [],
  286. MemberPids
  287. ).
  288. %% @doc Get child spec described by the proplist `PoolConfig'.
  289. %%
  290. %% See {@link pooler:new_pool/1} for info about `PoolConfig'.
  291. -spec pool_child_spec(pool_config() | pool_config_legacy()) -> supervisor:child_spec().
  292. pool_child_spec(PoolConfig) ->
  293. pooler_sup:pool_child_spec(config_as_map(PoolConfig)).
  294. %% @doc For INTERNAL use. Adds `MemberPid' to the pool.
  295. -spec accept_member(pool_name(), pooler_starter:start_result()) -> ok.
  296. accept_member(PoolName, StartResult) ->
  297. gen_server:call(PoolName, {accept_member, StartResult}).
  298. %% @doc Obtain exclusive access to a member from `PoolName'.
  299. %%
  300. %% If no free members are available, 'error_no_members' is returned.
  301. %%
  302. -spec take_member(pool_name() | pid()) -> pid() | error_no_members.
  303. take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
  304. gen_server:call(PoolName, {take_member, 0}, infinity).
  305. %% @doc Obtain exclusive access to a member of `PoolName'.
  306. %%
  307. %% If no members are available, wait for up to Timeout milliseconds for a member
  308. %% to become available. Waiting requests are served in FIFO order. If no member
  309. %% is available within the specified timeout, error_no_members is returned.
  310. %% `Timeout' is either an integer number of milliseconds or a `time_spec()' such as `{5, sec}'
  311. %%
  312. -spec take_member(pool_name() | pid(), non_neg_integer() | time_spec()) -> pid() | error_no_members.
  313. take_member(PoolName, Timeout) when is_atom(PoolName) orelse is_pid(PoolName) ->
  314. gen_server:call(PoolName, {take_member, time_as_millis(Timeout)}, infinity).
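%% A sketch of a checkout/check-in cycle, assuming a pool named
%% `example_pool' whose members answer a `do_work' gen_server call:
%% ```
%% case pooler:take_member(example_pool, {1, sec}) of
%%     error_no_members ->
%%         {error, busy};
%%     Pid when is_pid(Pid) ->
%%         try gen_server:call(Pid, do_work)
%%         after pooler:return_member(example_pool, Pid, ok)
%%         end
%% end.
%% '''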
  315. %% @doc Take a member from a randomly selected member of the group
  316. %% `GroupName'. Returns `MemberPid' or `error_no_members'. If no
  317. %% members are available in the randomly chosen pool, all other pools
  318. %% in the group are tried in order.
  319. -spec take_group_member(group_name()) -> pid() | error_no_members.
  320. take_group_member(GroupName) ->
  321. case pg_get_local_members(GroupName) of
  322. [] ->
  323. error_no_members;
  324. Pools ->
  325. %% Put a random member at the front of the list and then
  326. %% return the first member you can while walking the list.
  327. {_, _, X} = os:timestamp(),
  328. Idx = (X rem length(Pools)) + 1,
  329. {PoolPid, Rest} = extract_nth(Idx, Pools),
  330. take_first_pool([PoolPid | Rest])
  331. end.
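%% A sketch of group usage, assuming two or more pools were created with
%% `group => example_group':
%% ```
%% Member = pooler:take_group_member(example_group),
%% %% ... use Member (or handle `error_no_members') ...
%% ok = pooler:return_group_member(example_group, Member, ok).
%% '''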
  332. take_first_pool([PoolPid | Rest]) ->
  333. case take_member(PoolPid) of
  334. error_no_members ->
  335. take_first_pool(Rest);
  336. Member ->
  337. ets:insert(?POOLER_GROUP_TABLE, {Member, PoolPid}),
  338. Member
  339. end;
  340. take_first_pool([]) ->
  341. error_no_members.
  342. %% this helper function returns `{Nth_Elt, Rest}' where `Nth_Elt' is
  343. %% the nth element of `L' and `Rest' is `L -- [Nth_Elt]'.
  344. extract_nth(N, L) ->
  345. extract_nth(N, L, []).
  346. extract_nth(1, [H | T], Acc) ->
  347. {H, Acc ++ T};
  348. extract_nth(N, [H | T], Acc) ->
  349. extract_nth(N - 1, T, [H | Acc]);
  350. extract_nth(_, [], _) ->
  351. error(badarg).
  352. %% @doc Return a member that was taken from the group
  353. %% `GroupName'. This is a convenience function for
  354. %% `return_group_member/3' with `Status' of `ok'.
  355. -spec return_group_member(group_name(), pid() | error_no_members) -> ok.
  356. return_group_member(GroupName, MemberPid) ->
  357. return_group_member(GroupName, MemberPid, ok).
  358. %% @doc Return a member that was taken from the group `GroupName'. If
  359. %% `Status' is `ok' the member is returned to the pool from which it
  360. %% came. If `Status' is `fail' the member will be terminated and a new
  361. %% member added to the appropriate pool.
  362. -spec return_group_member(group_name(), pid() | error_no_members, ok | fail) -> ok.
  363. return_group_member(_, error_no_members, _) ->
  364. ok;
  365. return_group_member(_GroupName, MemberPid, Status) when is_pid(MemberPid) ->
  366. case ets:lookup(?POOLER_GROUP_TABLE, MemberPid) of
  367. [{MemberPid, PoolPid}] ->
  368. return_member(PoolPid, MemberPid, Status);
  369. [] ->
  370. ok
  371. end.
  372. %% @doc Return a member to the pool so it can be reused.
  373. %%
  374. %% If `Status' is 'ok', the member is returned to the pool. If
  375. %% `Status' is 'fail', the member is destroyed and a new member is
  376. %% added to the pool in its place.
  377. -spec return_member(pool_name() | pid(), pid() | error_no_members, ok | fail) -> ok.
  378. return_member(PoolName, Pid, Status) when
  379. is_pid(Pid) andalso
  380. (is_atom(PoolName) orelse
  381. is_pid(PoolName)) andalso
  382. (Status =:= ok orelse
  383. Status =:= fail)
  384. ->
  385. gen_server:call(PoolName, {return_member, Pid, Status}, infinity),
  386. ok;
  387. return_member(_, error_no_members, _) ->
  388. ok.
  389. %% @doc Return a member to the pool so it can be reused.
  390. %%
  391. -spec return_member(pool_name() | pid(), pid() | error_no_members) -> ok.
  392. return_member(PoolName, Pid) when
  393. is_pid(Pid) andalso
  394. (is_atom(PoolName) orelse is_pid(PoolName))
  395. ->
  396. gen_server:call(PoolName, {return_member, Pid, ok}, infinity),
  397. ok;
  398. return_member(_, error_no_members) ->
  399. ok.
  400. %% @doc Obtain runtime state info for all workers.
  401. %%
  402. %% Format of the return value is subject to change.
  403. -spec pool_stats(pool_name() | pid()) -> [{pid(), {reference(), free | pid(), erlang:timestamp()}}].
  404. pool_stats(PoolName) ->
  405. gen_server:call(PoolName, pool_stats).
  406. %% @doc Obtain the pids of all pools which are members of the group.
  407. -spec group_pools(group_name()) -> [pid()].
  408. group_pools(GroupName) ->
  409. pg_get_local_members(GroupName).
  410. %% @doc Obtain utilization info for a pool.
  411. %%
  412. %% Format of the return value is subject to change, but for now it
  413. %% will be a proplist to maintain backcompat with R16.
  414. -spec pool_utilization(pool_name() | pid()) ->
  415. [
  416. {max_count, pos_integer()}
  417. | {in_use_count, non_neg_integer()}
  418. | {free_count, non_neg_integer()}
  419. | {starting_count, non_neg_integer()}
  420. | {queued_count, non_neg_integer()}
  421. | {queue_max, non_neg_integer()}
  422. ].
  423. pool_utilization(PoolName) ->
  424. gen_server:call(PoolName, pool_utilization).
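%% For a freshly started pool with `init_count => 2' and `max_count => 10'
%% (and the default `queue_max' of 50), the result would look roughly like:
%% ```
%% [{max_count, 10}, {in_use_count, 0}, {free_count, 2},
%%  {starting_count, 0}, {queued_count, 0}, {queue_max, 50}]
%% '''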
  425. %% @doc Invokes `Fun' with arity 1 over all free members in pool with `PoolName'.
  426. %%
  427. -spec call_free_members(pool_name() | pid(), fun((pid()) -> term())) -> Res when
  428. Res :: [{ok, term()} | {error, term()}].
  429. call_free_members(PoolName, Fun) when
  430. (is_atom(PoolName) orelse is_pid(PoolName)) andalso is_function(Fun, 1)
  431. ->
  432. call_free_members(PoolName, Fun, infinity).
  433. %% @doc Invokes `Fun' with arity 1 over all free members in pool with `PoolName'.
  434. %% `Timeout' sets the timeout of gen_server call.
  435. -spec call_free_members(pool_name() | pid(), Fun, timeout()) -> Res when
  436. Fun :: fun((pid()) -> term()),
  437. Res :: [{ok, term()} | {error, term()}].
  438. call_free_members(PoolName, Fun, Timeout) when
  439. (is_atom(PoolName) orelse is_pid(PoolName)) andalso is_function(Fun, 1)
  440. ->
  441. gen_server:call(PoolName, {call_free_members, Fun}, Timeout).
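%% For example, pinging every currently free member, assuming the members
%% are gen_servers that understand a `ping' call:
%% ```
%% Results = pooler:call_free_members(example_pool,
%%                                    fun(Pid) -> gen_server:call(Pid, ping) end,
%%                                    5000),
%% [R || {ok, R} <- Results].
%% '''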
  442. %% ------------------------------------------------------------------
  443. %% gen_server Function Definitions
  444. %% ------------------------------------------------------------------
  445. init(#{name := Name, max_count := MaxCount, init_count := InitCount, start_mfa := StartMFA} = P) ->
  446. Pool = #pool{
  447. name = Name,
  448. group = maps:get(group, P, undefined),
  449. max_count = MaxCount,
  450. init_count = InitCount,
  451. start_mfa = StartMFA,
  452. add_member_retry = maps:get(add_member_retry, P, ?DEFAULT_ADD_RETRY),
  453. cull_interval = maps:get(cull_interval, P, ?DEFAULT_CULL_INTERVAL),
  454. max_age = maps:get(max_age, P, ?DEFAULT_MAX_AGE),
  455. member_start_timeout = maps:get(member_start_timeout, P, ?DEFAULT_MEMBER_START_TIMEOUT),
  456. auto_grow_threshold = maps:get(auto_grow_threshold, P, ?DEFAULT_AUTO_GROW_THRESHOLD),
  457. stop_mfa = maps:get(stop_mfa, P, ?DEFAULT_STOP_MFA),
  458. metrics_mod = maps:get(metrics_mod, P, pooler_no_metrics),
  459. metrics_api = maps:get(metrics_api, P, folsom),
  460. queue_max = maps:get(queue_max, P, ?DEFAULT_POOLER_QUEUE_MAX)
  461. },
  462. MemberSup = pooler_pool_sup:build_member_sup_name(Name),
  463. Pool1 = set_member_sup(Pool, MemberSup),
  464. %% This schedules the next cull when the pool is configured for
  465. %% such and is otherwise a no-op.
  466. Pool2 = cull_members_from_pool(Pool1),
  467. {ok, NewPool} = init_members_sync(InitCount, Pool2),
  468. {ok, NewPool, {continue, join_group}}.
  469. handle_continue(join_group, #pool{group = undefined} = Pool) ->
  470. %% ignore
  471. {noreply, Pool};
  472. handle_continue(join_group, #pool{group = Group} = Pool) ->
  473. ok = pg_create(Group),
  474. ok = pg_join(Group, self()),
  475. {noreply, Pool}.
  476. set_member_sup(#pool{} = Pool, MemberSup) ->
  477. Pool#pool{member_sup = MemberSup}.
  478. handle_call({take_member, Timeout}, From = {APid, _}, #pool{} = Pool) when is_pid(APid) ->
  479. maybe_reply(take_member_from_pool_queued(Pool, From, Timeout));
  480. handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
  481. {reply, ok, do_return_member(Pid, Status, Pool)};
  482. handle_call({accept_member, StartResult}, _From, Pool) ->
  483. {reply, ok, do_accept_member(StartResult, Pool)};
  484. handle_call(stop, _From, Pool) ->
  485. {stop, normal, stop_ok, Pool};
  486. handle_call(pool_stats, _From, Pool) ->
  487. {reply, maps:to_list(Pool#pool.all_members), Pool};
  488. handle_call(pool_utilization, _From, Pool) ->
  489. {reply, compute_utilization(Pool), Pool};
  490. handle_call(dump_pool, _From, Pool) ->
  491. {reply, to_map(Pool), Pool};
  492. handle_call({call_free_members, Fun}, _From, #pool{free_pids = Pids} = Pool) ->
  493. {reply, do_call_free_members(Fun, Pids), Pool};
  494. handle_call(_Request, _From, Pool) ->
  495. {noreply, Pool}.
  496. -spec handle_cast(_, _) -> {'noreply', _}.
  497. handle_cast(_Msg, Pool) ->
  498. {noreply, Pool}.
  499. -spec handle_info(_, _) -> {'noreply', _}.
  500. handle_info({requestor_timeout, From}, Pool = #pool{queued_requestors = RequestorQueue}) ->
  501. NewQueue = queue:filter(
  502. fun
  503. ({RequestorFrom, _TRef}) when RequestorFrom =:= From ->
  504. gen_server:reply(RequestorFrom, error_no_members),
  505. false;
  506. ({_, _}) ->
  507. true
  508. end,
  509. RequestorQueue
  510. ),
  511. {noreply, Pool#pool{queued_requestors = NewQueue}};
  512. handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
  513. State1 =
  514. case maps:get(Pid, State#pool.all_members, undefined) of
  515. {_PoolName, _ConsumerPid, _Time} ->
  516. do_return_member(Pid, fail, State);
  517. undefined ->
  518. case maps:get(Pid, State#pool.consumer_to_pid, undefined) of
  519. {MRef, Pids} ->
  520. IsOk =
  521. case Reason of
  522. normal -> ok;
  523. _Crash -> fail
  524. end,
  525. lists:foldl(
  526. fun(P, S) -> do_return_member(P, IsOk, S) end,
  527. State,
  528. Pids
  529. );
  530. undefined ->
  531. State
  532. end
  533. end,
  534. {noreply, State1};
  535. handle_info(cull_pool, Pool) ->
  536. {noreply, cull_members_from_pool(Pool)};
  537. handle_info(_Info, State) ->
  538. {noreply, State}.
  539. -spec terminate(_, _) -> 'ok'.
  540. terminate(_Reason, _State) ->
  541. ok.
  542. -spec code_change(_, _, _) -> {'ok', _}.
  543. code_change(_OldVsn, State, _Extra) ->
  544. {ok, State}.
  545. %% ------------------------------------------------------------------
  546. %% Internal Function Definitions
  547. %% ------------------------------------------------------------------
  548. do_accept_member(
  549. {StarterPid, Pid},
  550. #pool{
  551. all_members = AllMembers,
  552. starting_members = StartingMembers0,
  553. member_start_timeout = StartTimeout
  554. } = Pool
  555. ) when is_pid(Pid) ->
  556. %% make sure we don't accept a timed-out member
  557. Pool1 =
  558. #pool{starting_members = StartingMembers} =
  559. remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
  560. case lists:keytake(StarterPid, 1, StartingMembers) of
  561. false ->
  562. %% A starter completed even though we invalidated the pid
  563. %% Ask the starter to kill the child and stop. In most cases, the
  564. %% starter has already received this message. However, when pools
  565. %% are dynamically re-created with the same name, it is possible
  566. %% to receive an accept from a pool that has since gone away.
  567. %% In this case, we should clean up.
  568. pooler_starter:stop_member_async(StarterPid),
  569. Pool1;
  570. {value, _, StartingMembers1} ->
  571. MRef = erlang:monitor(process, Pid),
  572. Entry = {MRef, free, os:timestamp()},
  573. AllMembers1 = store_all_members(Pid, Entry, AllMembers),
  574. pooler_starter:stop(StarterPid),
  575. maybe_reply_with_pid(Pid, Pool1#pool{
  576. all_members = AllMembers1,
  577. starting_members = StartingMembers1
  578. })
  579. end;
  580. do_accept_member(
  581. {StarterPid, _Reason},
  582. #pool{
  583. starting_members = StartingMembers0,
  584. member_start_timeout = StartTimeout
  585. } = Pool
  586. ) ->
  587. %% member start failed, remove in-flight ref and carry on.
  588. pooler_starter:stop(StarterPid),
  589. Pool1 =
  590. #pool{starting_members = StartingMembers} =
  591. remove_stale_starting_members(
  592. Pool,
  593. StartingMembers0,
  594. StartTimeout
  595. ),
  596. StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
  597. Pool1#pool{starting_members = StartingMembers1}.
  598. maybe_reply_with_pid(
  599. Pid,
  600. Pool = #pool{
  601. queued_requestors = QueuedRequestors,
  602. free_pids = Free,
  603. free_count = NumFree
  604. }
  605. ) when is_pid(Pid) ->
  606. case queue:out(QueuedRequestors) of
  607. {empty, _} ->
  608. Pool#pool{
  609. free_pids = [Pid | Free],
  610. free_count = NumFree + 1
  611. };
  612. {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
  613. reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool)
  614. end.
  615. reply_to_queued_requestor(TRef, Pid, From = {APid, _}, NewQueuedRequestors, Pool) when is_pid(APid) ->
  616. erlang:cancel_timer(TRef),
  617. Pool1 = take_member_bookkeeping(Pid, From, NewQueuedRequestors, Pool),
  618. send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
  619. send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
  620. send_metric(Pool, events, error_no_members, history),
  621. gen_server:reply(From, Pid),
  622. Pool1.
  623. -spec take_member_bookkeeping(
  624. pid(),
  625. gen_server_from(),
  626. [pid()] | requestor_queue(),
  627. #pool{}
  628. ) -> #pool{}.
  629. take_member_bookkeeping(
  630. MemberPid,
  631. {CPid, _},
  632. Rest,
  633. Pool = #pool{
  634. in_use_count = NumInUse,
  635. free_count = NumFree,
  636. consumer_to_pid = CPMap,
  637. all_members = AllMembers
  638. }
  639. ) when
  640. is_pid(MemberPid),
  641. is_pid(CPid),
  642. is_list(Rest)
  643. ->
  644. Pool#pool{
  645. free_pids = Rest,
  646. in_use_count = NumInUse + 1,
  647. free_count = NumFree - 1,
  648. consumer_to_pid = add_member_to_consumer(MemberPid, CPid, CPMap),
  649. all_members = set_cpid_for_member(MemberPid, CPid, AllMembers)
  650. };
  651. take_member_bookkeeping(
  652. MemberPid,
  653. {ReplyPid, _Tag},
  654. NewQueuedRequestors,
  655. Pool = #pool{
  656. in_use_count = NumInUse,
  657. all_members = AllMembers,
  658. consumer_to_pid = CPMap
  659. }
  660. ) ->
  661. Pool#pool{
  662. in_use_count = NumInUse + 1,
  663. all_members = set_cpid_for_member(MemberPid, ReplyPid, AllMembers),
  664. consumer_to_pid = add_member_to_consumer(MemberPid, ReplyPid, CPMap),
  665. queued_requestors = NewQueuedRequestors
  666. }.
  667. -spec remove_stale_starting_members(
  668. #pool{},
  669. [{pid(), erlang:timestamp()}],
  670. time_spec()
  671. ) -> #pool{}.
  672. remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
  673. Now = os:timestamp(),
  674. MaxAgeSecs = time_as_secs(MaxAge),
  675. FilteredStartingMembers = lists:foldl(
  676. fun(SM, AccIn) ->
  677. accumulate_starting_member_not_stale(Pool, Now, SM, MaxAgeSecs, AccIn)
  678. end,
  679. [],
  680. StartingMembers
  681. ),
  682. Pool#pool{starting_members = FilteredStartingMembers}.
  683. accumulate_starting_member_not_stale(Pool, Now, SM = {Pid, StartTime}, MaxAgeSecs, AccIn) ->
  684. case secs_between(StartTime, Now) < MaxAgeSecs of
  685. true ->
  686. [SM | AccIn];
  687. false ->
  688. ?LOG_ERROR(
  689. #{
  690. label => "starting member timeout",
  691. pool => Pool#pool.name
  692. },
  693. #{domain => [pooler]}
  694. ),
  695. send_metric(Pool, starting_member_timeout, {inc, 1}, counter),
  696. pooler_starter:stop_member_async(Pid),
  697. AccIn
  698. end.
  699. init_members_sync(N, #pool{name = PoolName, member_sup = MemberSup} = Pool) ->
  700. Self = self(),
  701. StartTime = os:timestamp(),
  702. StartRefs = [
  703. {pooler_starter:start_member(PoolName, MemberSup, Self), StartTime}
  704. || _I <- lists:seq(1, N)
  705. ],
  706. Pool1 = Pool#pool{starting_members = StartRefs},
  707. case collect_init_members(Pool1) of
  708. timeout ->
  709. ?LOG_ERROR(
  710. #{
  711. label => "exceeded timeout waiting for members",
  712. pool => PoolName,
  713. init_count => Pool1#pool.init_count
  714. },
  715. #{domain => [pooler]}
  716. ),
  717. error({timeout, "unable to start members"});
  718. #pool{} = Pool2 ->
  719. {ok, Pool2}
  720. end.
  721. collect_init_members(#pool{starting_members = Empty} = Pool) when
  722. Empty =:= []
  723. ->
  724. Pool;
  725. collect_init_members(#pool{member_start_timeout = StartTimeout} = Pool) ->
  726. Timeout = time_as_millis(StartTimeout),
  727. receive
  728. {accept_member, {_, _} = StartResult} ->
  729. collect_init_members(do_accept_member(StartResult, Pool))
  730. after Timeout ->
  731. timeout
  732. end.
  733. -spec take_member_from_pool(#pool{}, gen_server_from()) ->
  734. {error_no_members | pid(), #pool{}}.
  735. take_member_from_pool(
  736. #pool{
  737. init_count = InitCount,
  738. max_count = Max,
  739. free_pids = Free,
  740. in_use_count = NumInUse,
  741. free_count = NumFree,
  742. starting_members = StartingMembers,
  743. member_start_timeout = StartTimeout
  744. } = Pool,
  745. From
  746. ) ->
  747. send_metric(Pool, take_rate, 1, meter),
  748. Pool1 = remove_stale_starting_members(Pool, StartingMembers, StartTimeout),
  749. NonStaleStartingMemberCount = length(Pool1#pool.starting_members),
  750. NumCanAdd = Max - (NumInUse + NumFree + NonStaleStartingMemberCount),
  751. case Free of
  752. [] when NumCanAdd =< 0 ->
  753. send_metric(Pool, error_no_members_count, {inc, 1}, counter),
  754. send_metric(Pool, events, error_no_members, history),
  755. {error_no_members, Pool1};
  756. [] when NumCanAdd > 0 ->
  757. %% Limit concurrently starting members to init_count. Add
  758. %% up to init_count members. Starting members here means
  759. %% we always return an error_no_members for a take request
  760. %% when all members are in-use. By adding a batch of new
  761. %% members, the pool should reach a steady state with
  762. %% unused members culled over time (if scheduled cull is
  763. %% enabled).
  764. NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 1),
  765. Pool2 = add_members_async(NumToAdd, Pool1),
  766. send_metric(Pool, error_no_members_count, {inc, 1}, counter),
  767. send_metric(Pool, events, error_no_members, history),
  768. {error_no_members, Pool2};
  769. [Pid | Rest] ->
  770. Pool2 = take_member_bookkeeping(Pid, From, Rest, Pool1),
  771. Pool3 =
  772. case Pool2#pool.auto_grow_threshold of
  773. N when
  774. is_integer(N) andalso
  775. Pool2#pool.free_count =< N andalso
  776. NumCanAdd > 0
  777. ->
  778. NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 0),
  779. add_members_async(NumToAdd, Pool2);
  780. _ ->
  781. Pool2
  782. end,
  783. send_metric(Pool, in_use_count, Pool3#pool.in_use_count, histogram),
  784. send_metric(Pool, free_count, Pool3#pool.free_count, histogram),
  785. {Pid, Pool3}
  786. end.
  787. -spec take_member_from_pool_queued(
  788. #pool{},
  789. gen_server_from(),
  790. non_neg_integer()
  791. ) ->
  792. {error_no_members | queued | pid(), #pool{}}.
  793. take_member_from_pool_queued(
  794. Pool0 = #pool{
  795. queue_max = QMax,
  796. queued_requestors = Requestors
  797. },
  798. From = {CPid, _},
  799. Timeout
  800. ) when is_pid(CPid) ->
  801. case {take_member_from_pool(Pool0, From), queue:len(Requestors)} of
  802. {{error_no_members, Pool1}, QLen} when QLen >= QMax ->
  803. send_metric(Pool1, events, error_no_members, history),
  804. send_metric(Pool1, queue_max_reached, {inc, 1}, counter),
  805. {error_no_members, Pool1};
  806. {{error_no_members, Pool1}, _} when Timeout =:= 0 ->
  807. {error_no_members, Pool1};
  808. {{error_no_members, Pool1 = #pool{queued_requestors = QueuedRequestors}}, QueueCount} ->
  809. TRef = erlang:send_after(Timeout, self(), {requestor_timeout, From}),
  810. send_metric(Pool1, queue_count, QueueCount, histogram),
  811. {queued, Pool1#pool{queued_requestors = queue:in({From, TRef}, QueuedRequestors)}};
  812. {{Member, NewPool}, _} when is_pid(Member) ->
  813. {Member, NewPool}
  814. end.
  815. %% @doc Add `Count' members to `Pool' asynchronously. Returns updated
  816. %% `Pool' record with starting member refs added to field
  817. %% `starting_members'.
  818. add_members_async(Count, #pool{name = PoolName, member_sup = MemberSup, starting_members = StartingMembers} = Pool) ->
  819. StartTime = os:timestamp(),
  820. StartRefs = [
  821. {pooler_starter:start_member(PoolName, MemberSup), StartTime}
  822. || _I <- lists:seq(1, Count)
  823. ],
  824. Pool#pool{starting_members = StartRefs ++ StartingMembers}.
  825. -spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
  826. do_return_member(
  827. Pid,
  828. ok,
  829. #pool{
  830. name = PoolName,
  831. all_members = AllMembers,
  832. queued_requestors = QueuedRequestors
  833. } = Pool
  834. ) ->
  835. clean_group_table(Pid, Pool),
  836. case maps:get(Pid, AllMembers, undefined) of
  837. {_, free, _} ->
  838. ?LOG_WARNING(
  839. #{
  840. label => "ignored return of free member",
  841. pool => PoolName,
  842. pid => Pid
  843. },
  844. #{domain => [pooler]}
  845. ),
  846. Pool;
  847. {MRef, CPid, _} ->
  848. #pool{
  849. free_pids = Free,
  850. in_use_count = NumInUse,
  851. free_count = NumFree
  852. } = Pool,
  853. Pool1 = Pool#pool{in_use_count = NumInUse - 1},
  854. Entry = {MRef, free, os:timestamp()},
  855. Pool2 = Pool1#pool{
  856. all_members = store_all_members(Pid, Entry, AllMembers),
  857. consumer_to_pid = cpmap_remove(
  858. Pid,
  859. CPid,
  860. Pool1#pool.consumer_to_pid
  861. )
  862. },
  863. case queue:out(QueuedRequestors) of
  864. {empty, _} ->
  865. Pool2#pool{free_pids = [Pid | Free], free_count = NumFree + 1};
  866. {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
  867. reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool2)
  868. end;
  869. undefined ->
  870. Pool
  871. end;
  872. do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
  873. % for the fail case, perhaps the member crashed and was already
  874. % removed, so look it up with a default and ignore a missing entry.
  875. clean_group_table(Pid, Pool),
  876. case maps:get(Pid, AllMembers, undefined) of
  877. {_MRef, _, _} ->
  878. Pool1 = remove_pid(Pid, Pool),
  879. add_members_async(1, Pool1);
  880. undefined ->
  881. Pool
  882. end.
  883. clean_group_table(_MemberPid, #pool{group = undefined}) ->
  884. ok;
  885. clean_group_table(MemberPid, #pool{group = _GroupName}) ->
  886. ets:delete(?POOLER_GROUP_TABLE, MemberPid).
  887. % @doc Remove `Pid' from the pid list associated with `CPid' in the
  888. % consumer to member map given by `CPMap'.
  889. %
  890. % If `Pid' is the last element in `CPid's pid list, then the `CPid'
  891. % entry is removed entirely.
  892. %
  893. -spec cpmap_remove(pid(), pid() | free, consumers_map()) -> consumers_map().
  894. cpmap_remove(_Pid, free, CPMap) ->
  895. CPMap;
  896. cpmap_remove(Pid, CPid, CPMap) ->
  897. case maps:get(CPid, CPMap, undefined) of
  898. {MRef, Pids0} ->
  899. Pids1 = lists:delete(Pid, Pids0),
  900. case Pids1 of
  901. [_H | _T] ->
  902. CPMap#{CPid => {MRef, Pids1}};
  903. [] ->
  904. %% no more members for this consumer
  905. erlang:demonitor(MRef, [flush]),
  906. maps:remove(CPid, CPMap)
  907. end;
  908. undefined ->
  909. % FIXME: this shouldn't happen, should we log or error?
  910. CPMap
  911. end.
  912. % @doc Remove and kill a pool member.
  913. %
  914. % Handles in-use and free members. Logs an error if the pid is not
  915. % tracked in state.all_members.
  916. %
  917. -spec remove_pid(pid(), #pool{}) -> #pool{}.
  918. remove_pid(Pid, Pool) ->
  919. #pool{
  920. name = PoolName,
  921. all_members = AllMembers,
  922. consumer_to_pid = CPMap,
  923. stop_mfa = StopMFA
  924. } = Pool,
  925. case maps:get(Pid, AllMembers, undefined) of
  926. {MRef, free, _Time} ->
  927. % remove an unused member
  928. erlang:demonitor(MRef, [flush]),
  929. FreePids = lists:delete(Pid, Pool#pool.free_pids),
  930. NumFree = Pool#pool.free_count - 1,
  931. Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
  932. terminate_pid(PoolName, Pid, StopMFA),
  933. send_metric(Pool1, killed_free_count, {inc, 1}, counter),
  934. Pool1#pool{all_members = maps:remove(Pid, AllMembers)};
  935. {MRef, CPid, _Time} ->
  936. %% remove a member being consumed. No notice is sent to
  937. %% the consumer.
  938. erlang:demonitor(MRef, [flush]),
  939. Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
  940. terminate_pid(PoolName, Pid, StopMFA),
  941. send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
  942. Pool1#pool{
  943. consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
  944. all_members = maps:remove(Pid, AllMembers)
  945. };
  946. undefined ->
  947. ?LOG_ERROR(
  948. #{
  949. label => unknown_pid,
  950. pool => PoolName,
  951. pid => Pid
  952. },
  953. #{domain => [pooler]}
  954. ),
  955. send_metric(Pool, events, unknown_pid, history),
  956. Pool
  957. end.
  958. -spec store_all_members(
  959. pid(),
  960. member_info(),
  961. members_map()
  962. ) -> members_map().
  963. store_all_members(Pid, Val = {_MRef, _CPid, _Time}, AllMembers) ->
  964. AllMembers#{Pid => Val}.
  965. -spec set_cpid_for_member(pid(), pid(), members_map()) -> members_map().
  966. set_cpid_for_member(MemberPid, CPid, AllMembers) ->
  967. maps:update_with(
  968. MemberPid,
  969. fun({MRef, free, Time = {_, _, _}}) ->
  970. {MRef, CPid, Time}
  971. end,
  972. AllMembers
  973. ).
  974. -spec add_member_to_consumer(pid(), pid(), consumers_map()) -> consumers_map().
  975. add_member_to_consumer(MemberPid, CPid, CPMap) ->
  976. %% we can't use maps:update_with here because we need to create the
  977. %% monitor if we aren't already tracking this consumer.
  978. case maps:get(CPid, CPMap, undefined) of
  979. {MRef, MList} ->
  980. CPMap#{CPid => {MRef, [MemberPid | MList]}};
  981. undefined ->
  982. MRef = erlang:monitor(process, CPid),
  983. CPMap#{CPid => {MRef, [MemberPid]}}
  984. end.
  985. -spec cull_members_from_pool(#pool{}) -> #pool{}.
  986. cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->
  987. %% 0 cull_interval means do not cull
  988. Pool;
  989. cull_members_from_pool(#pool{init_count = C, max_count = C} = Pool) ->
  990. %% if init_count matches max_count, then we will not dynamically
  991. %% add capacity and should not schedule culling regardless of
  992. %% cull_interval config.
  993. Pool;
  994. cull_members_from_pool(
  995. #pool{
  996. free_count = FreeCount,
  997. init_count = InitCount,
  998. in_use_count = InUseCount,
  999. cull_interval = Delay,
  1000. cull_timer = CullTRef,
  1001. max_age = MaxAge,
  1002. all_members = AllMembers
  1003. } = Pool
  1004. ) ->
  1005. case is_reference(CullTRef) of
  1006. true -> erlang:cancel_timer(CullTRef);
  1007. false -> noop
  1008. end,
  1009. MaxCull = FreeCount - (InitCount - InUseCount),
  1010. Pool1 =
  1011. case MaxCull > 0 of
  1012. true ->
  1013. MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
  1014. ExpiredMembers =
  1015. expired_free_members(MemberInfo, os:timestamp(), MaxAge),
  1016. CullList = lists:sublist(ExpiredMembers, MaxCull),
  1017. lists:foldl(
  1018. fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
  1019. Pool,
  1020. CullList
  1021. );
  1022. false ->
  1023. Pool
  1024. end,
  1025. Pool1#pool{cull_timer = schedule_cull(self(), Delay)}.
  1026. -spec schedule_cull(
  1027. Pool :: atom() | pid(),
  1028. Delay :: time_spec()
  1029. ) -> reference().
  1030. %% @doc Schedule a pool cleaning or "cull" for `PoolName' in which
  1031. %% members older than `max_age' will be removed until the pool has
  1032. %% `init_count' members. Uses `erlang:send_after/3' for a light-weight
  1033. %% timer that will be auto-cancelled upon pooler shutdown.
  1034. schedule_cull(Pool, Delay) ->
  1035. DelayMillis = time_as_millis(Delay),
  1036. erlang:send_after(DelayMillis, Pool, cull_pool).
  1037. -spec member_info([pid()], members_map()) -> [{pid(), member_info()}].
  1038. member_info(Pids, AllMembers) ->
  1039. maps:to_list(maps:with(Pids, AllMembers)).
  1040. -spec expired_free_members(
  1041. Members :: [{pid(), member_info()}],
  1042. Now :: {_, _, _},
  1043. MaxAge :: time_spec()
  1044. ) -> [{pid(), free_member_info()}].
  1045. expired_free_members(Members, Now, MaxAge) ->
  1046. MaxMicros = time_as_micros(MaxAge),
  1047. [
  1048. MI
  1049. || MI = {_, {_, free, LastReturn}} <- Members,
  1050. timer:now_diff(Now, LastReturn) >= MaxMicros
  1051. ].
  1052. %% Send a metric using the metrics module from application config or
  1053. %% do nothing.
  1054. -spec send_metric(
  1055. Pool :: #pool{},
  1056. Label :: atom(),
  1057. Value :: metric_value(),
  1058. Type :: metric_type()
  1059. ) -> ok.
  1060. send_metric(#pool{metrics_mod = pooler_no_metrics}, _Label, _Value, _Type) ->
  1061. ok;
  1062. send_metric(
  1063. #pool{
  1064. name = PoolName,
  1065. metrics_mod = MetricsMod,
  1066. metrics_api = exometer
  1067. },
  1068. Label,
  1069. {inc, Value},
  1070. counter
  1071. ) ->
  1072. MetricName = pool_metric_exometer(PoolName, Label),
  1073. MetricsMod:update_or_create(MetricName, Value, counter, []),
  1074. ok;
  1075. % Exometer does not support 'history' type metrics right now.
  1076. send_metric(
  1077. #pool{
  1078. name = _PoolName,
  1079. metrics_mod = _MetricsMod,
  1080. metrics_api = exometer
  1081. },
  1082. _Label,
  1083. _Value,
  1084. history
  1085. ) ->
  1086. ok;
  1087. send_metric(
  1088. #pool{
  1089. name = PoolName,
  1090. metrics_mod = MetricsMod,
  1091. metrics_api = exometer
  1092. },
  1093. Label,
  1094. Value,
  1095. Type
  1096. ) ->
  1097. MetricName = pool_metric_exometer(PoolName, Label),
  1098. MetricsMod:update_or_create(MetricName, Value, Type, []),
  1099. ok;
  1100. % folsom API is the default one.
  1101. send_metric(
  1102. #pool{name = PoolName, metrics_mod = MetricsMod, metrics_api = folsom},
  1103. Label,
  1104. Value,
  1105. Type
  1106. ) ->
  1107. MetricName = pool_metric(PoolName, Label),
  1108. MetricsMod:notify(MetricName, Value, Type),
  1109. ok.
  1110. -spec pool_metric(atom(), atom()) -> binary().
  1111. pool_metric(PoolName, Metric) ->
  1112. iolist_to_binary([
  1113. <<"pooler.">>,
  1114. atom_to_binary(PoolName, utf8),
  1115. ".",
  1116. atom_to_binary(Metric, utf8)
  1117. ]).
  1118. %% Exometer metric names are lists, not binaries.
  1119. -spec pool_metric_exometer(atom(), atom()) -> nonempty_list(binary()).
  1120. pool_metric_exometer(PoolName, Metric) ->
  1121. [
  1122. <<"pooler">>,
  1123. atom_to_binary(PoolName, utf8),
  1124. atom_to_binary(Metric, utf8)
  1125. ].
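%% For a pool named `example_pool' and the metric `take_rate', these yield
%% `<<"pooler.example_pool.take_rate">>' for folsom and
%% `[<<"pooler">>, <<"example_pool">>, <<"take_rate">>]' for exometer.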
  1126. -spec time_as_secs(time_spec()) -> non_neg_integer().
  1127. time_as_secs({Time, Unit}) ->
  1128. time_as_micros({Time, Unit}) div 1000000.
  1129. -spec time_as_millis(time_spec()) -> non_neg_integer().
  1130. %% @doc Convert time unit into milliseconds.
  1131. time_as_millis({Time, Unit}) ->
  1132. time_as_micros({Time, Unit}) div 1000;
  1133. %% A plain integer is treated as milliseconds and passed through
  1134. time_as_millis(Time) when is_integer(Time) ->
  1135. Time.
  1136. -spec time_as_micros(time_spec()) -> non_neg_integer().
  1137. %% @doc Convert time unit into microseconds
  1138. time_as_micros({Time, min}) ->
  1139. 60 * 1000 * 1000 * Time;
  1140. time_as_micros({Time, sec}) ->
  1141. 1000 * 1000 * Time;
  1142. time_as_micros({Time, ms}) ->
  1143. 1000 * Time;
  1144. time_as_micros({Time, mu}) ->
  1145. Time.
  1146. secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) ->
  1147. (Mega2 - Mega1) * 1000000 + (Secs2 - Secs1).
  1148. -spec maybe_reply({'queued' | 'error_no_members' | pid(), #pool{}}) ->
  1149. {noreply, #pool{}} | {reply, 'error_no_members' | pid(), #pool{}}.
  1150. maybe_reply({Member, NewPool}) ->
  1151. case Member of
  1152. queued ->
  1153. {noreply, NewPool};
  1154. error_no_members ->
  1155. {reply, error_no_members, NewPool};
  1156. Member when is_pid(Member) ->
  1157. {reply, Member, NewPool}
  1158. end.
  1159. %% Implementation of a best-effort termination for a pool member:
  1160. %% Terminates the pid's pool member given an MFA that gets applied. The list
  1161. %% of arguments must contain the fixed atom ?POOLER_PID, which is replaced
  1162. %% by the target pid. Failure to provide a valid MFA will result in a fallback
  1163. %% to the default callback.
  1164. -spec terminate_pid(atom(), pid(), {atom(), atom(), [term()]}) -> ok.
  1165. terminate_pid(PoolName, Pid, {Mod, Fun, Args}) when is_list(Args) ->
  1166. NewArgs = replace_placeholders(PoolName, Pid, Args),
  1167. case catch erlang:apply(Mod, Fun, NewArgs) of
  1168. {'EXIT', _} ->
  1169. terminate_pid(PoolName, Pid, ?DEFAULT_STOP_MFA);
  1170. _Result ->
  1171. ok
  1172. end.
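%% As an illustration, a pool whose members should be stopped directly via
%% `gen_server:stop/1' instead of through the member supervisor could be
%% configured with:
%% ```
%% stop_mfa => {gen_server, stop, ['$pooler_pid']}
%% '''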
  1173. replace_placeholders(Name, Pid, Args) ->
  1174. [
  1175. case Arg of
  1176. ?POOLER_POOL_NAME ->
  1177. pooler_pool_sup:build_member_sup_name(Name);
  1178. ?POOLER_PID ->
  1179. Pid;
  1180. _ ->
  1181. Arg
  1182. end
  1183. || Arg <- Args
  1184. ].
  1185. compute_utilization(#pool{
  1186. max_count = MaxCount,
  1187. in_use_count = InUseCount,
  1188. free_count = FreeCount,
  1189. starting_members = Starting,
  1190. queued_requestors = Queue,
  1191. queue_max = QueueMax
  1192. }) ->
  1193. [
  1194. {max_count, MaxCount},
  1195. {in_use_count, InUseCount},
  1196. {free_count, FreeCount},
  1197. {starting_count, length(Starting)},
  1198. %% Note: `queue:len/1' is not O(1), so in pathological cases this might be expensive
  1199. {queued_count, queue:len(Queue)},
  1200. {queue_max, QueueMax}
  1201. ].
  1202. do_call_free_members(Fun, Pids) ->
  1203. [do_call_free_member(Fun, P) || P <- Pids].
  1204. do_call_free_member(Fun, Pid) ->
  1205. try
  1206. {ok, Fun(Pid)}
  1207. catch
  1208. _Class:Reason ->
  1209. {error, Reason}
  1210. end.
  1211. to_map(#pool{} = Pool) ->
  1212. [Name | Values] = tuple_to_list(Pool),
  1213. maps:from_list(
  1214. [{'$record_name', Name} | lists:zip(record_info(fields, pool), Values)]
  1215. ).
  1216. %% @private
  1217. -spec config_as_map(pool_config() | pool_config_legacy()) -> pool_config().
  1218. config_as_map(Conf) when is_map(Conf) ->
  1219. Conf;
  1220. config_as_map(LegacyConf) when is_list(LegacyConf) ->
  1221. maps:from_list(LegacyConf).
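%% For example, the legacy proplist form and the map form describe the same
%% pool (the member module is hypothetical):
%% ```
%% [{name, example_pool}, {init_count, 2}, {max_count, 10},
%%  {start_mfa, {example_member, start_link, []}}]
%% %% is equivalent to
%% #{name => example_pool, init_count => 2, max_count => 10,
%%   start_mfa => {example_member, start_link, []}}
%% '''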
  1222. % >= OTP-21
  1223. -ifdef(OTP_RELEASE).
  1224. -if(?OTP_RELEASE >= 23).
  1225. -define(USE_PG_NOT_PG2, true).
  1226. -else.
  1227. -undef(USE_PG_NOT_PG2).
  1228. -endif.
  1229. % < OTP-21
  1230. -else.
  1231. -undef(USE_PG_NOT_PG2).
  1232. -endif.
  1233. -ifdef(USE_PG_NOT_PG2).
  1234. pg_get_local_members(GroupName) ->
  1235. pg:get_local_members(GroupName).
  1236. pg_delete(_GroupName) ->
  1237. ok.
  1238. pg_create(_Group) ->
  1239. ok.
  1240. pg_join(Group, Pid) ->
  1241. pg:join(Group, Pid).
  1242. -else.
  1243. pg_get_local_members(GroupName) ->
  1244. case pg2:get_local_members(GroupName) of
  1245. {error, {no_such_group, GroupName}} -> [];
  1246. Pids -> Pids
  1247. end.
  1248. pg_delete(GroupName) ->
  1249. pg2:delete(GroupName).
  1250. pg_create(Group) ->
  1251. pg2:create(Group).
  1252. pg_join(Group, Pid) ->
  1253. pg2:join(Group, Pid).
  1254. -endif.