pooler.erl 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222
  1. %% @author Seth Falcon <seth@userprimary.net>
  2. %% @copyright 2011-2013 Seth Falcon
  3. %% @doc This is the main interface to the pooler application
  4. %%
  5. %% To integrate with your application, you probably want to call
  6. %% application:start(pooler) after having specified appropriate
  7. %% configuration for the pooler application (either via a config file
  8. %% or appropriate calls to the application module to set the
  9. %% application's config).
  10. %%
  11. -module(pooler).
  12. -behaviour(gen_server).
  13. -include("pooler.hrl").
  14. -include_lib("kernel/include/logger.hrl").
  15. %% type specs for pool metrics
  16. -type metric_value() ::
  17. 'unknown_pid'
  18. | non_neg_integer()
  19. | {'add_pids_failed', non_neg_integer(), non_neg_integer()}
  20. | {'inc', 1}
  21. | 'error_no_members'.
  22. -type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
  23. %% ------------------------------------------------------------------
  24. %% API Function Exports
  25. %% ------------------------------------------------------------------
  26. -export([
  27. start/0,
  28. stop/0
  29. ]).
  30. -export([
  31. accept_member/2,
  32. start_link/1,
  33. take_member/1,
  34. take_member/2,
  35. take_group_member/1,
  36. return_group_member/2,
  37. return_group_member/3,
  38. group_pools/1,
  39. return_member/2,
  40. return_member/3,
  41. pool_stats/1,
  42. pool_utilization/1,
  43. manual_start/0,
  44. new_pool/1,
  45. pool_child_spec/1,
  46. rm_pool/1,
  47. rm_group/1,
  48. call_free_members/2,
  49. call_free_members/3
  50. ]).
  51. %% ------------------------------------------------------------------
  52. %% gen_server Function Exports
  53. %% ------------------------------------------------------------------
  54. -export([
  55. init/1,
  56. handle_continue/2,
  57. handle_call/3,
  58. handle_cast/2,
  59. handle_info/2,
  60. terminate/2,
  61. code_change/3
  62. ]).
  63. %% ------------------------------------------------------------------
  64. %% Types
  65. %% ------------------------------------------------------------------
  66. -export_type([pool_config/0, pool_name/0, group_name/0]).
  67. -type pool_name() :: atom().
  68. -type group_name() :: atom().
  69. -type pool_config() :: [
  70. {name, pool_name()}
  71. | {init_count, non_neg_integer()}
  72. | {max_count, non_neg_integer()}
  73. | {start_mfa, {module(), atom(), [any()]}}
  74. | {group, group_name()}
  75. | {cull_interval, time_spec()}
  76. | {max_age, time_spec()}
  77. | {member_start_timeout, time_spec()}
  78. | {queue_max, non_neg_integer()}
  79. | {metrics_api, folsom | exometer}
  80. | {metrics_mod, module()}
  81. | {stop_mfa, {module(), atom(), ['$pooler_pid' | any(), ...]}}
  82. | {auto_grow_threshold, non_neg_integer()}
  83. | {add_member_retry, non_neg_integer()}
  84. ].
  85. %% ------------------------------------------------------------------
  86. %% Application API
  87. %% ------------------------------------------------------------------
%% @doc Start the pooler application and all of its dependencies.
%% Crashes (badmatch) if anything in the dependency chain fails to start.
-spec start() -> 'ok'.
start() ->
    {ok, _} = application:ensure_all_started(pooler),
    ok.
%% @doc Stop the pooler application. Crashes (badmatch) if the stop fails.
-spec stop() -> 'ok'.
stop() ->
    ok = application:stop(pooler).
  95. %% ------------------------------------------------------------------
  96. %% API Function Definitions
  97. %% ------------------------------------------------------------------
%% @doc For INTERNAL use. Start a pool gen_server registered locally
%% under the pool's configured name.
start_link(#pool{name = Name} = Pool) ->
    gen_server:start_link({local, Name}, ?MODULE, Pool, []).
%% @doc Start sasl and pooler directly, for interactive/manual use.
%% Return values are intentionally not checked, so applications that are
%% already running do not cause a crash here.
manual_start() ->
    application:start(sasl),
    application:start(pooler).
  103. %% @doc Start a new pool described by the proplist `PoolConfig'. The
  104. %% following keys are required in the proplist:
  105. %%
  106. %% <dl>
  107. %% <dt>`name'</dt>
  108. %% <dd>An atom giving the name of the pool.</dd>
  109. %% <dt>`init_count'</dt>
  110. %% <dd>Number of members to add to the pool at start. When the pool is
  111. %% started, `init_count' members will be started in parallel.</dd>
  112. %% <dt>`max_count'</dt>
  113. %% <dd>Maximum number of members in the pool.</dd>
  114. %% <dt>`start_mfa'</dt>
  115. %% <dd>A tuple of the form `{Mod, Fun, Args}' describing how to start
  116. %% new pool members.</dd>
  117. %% </dl>
  118. %%
  119. %% In addition, you can specify any of the following optional
  120. %% configuration options:
  121. %%
  122. %% <dl>
  123. %% <dt>`group'</dt>
  124. %% <dd>An atom giving the name of the group this pool belongs
  125. %% to. Pools sharing a common `group' value can be accessed using
  126. %% {@link take_group_member/1} and {@link return_group_member/2}.</dd>
  127. %% <dt>`cull_interval'</dt>
  128. %% <dd>Time between checks for stale pool members. Specified as
  129. %% `{Time, Unit}' where `Time' is a non-negative integer and `Unit' is
  130. %% one of `min', `sec', `ms', or `mu'. The default value of `{1, min}'
  131. %% triggers a once per minute check to remove members that have not
  132. %% been accessed in `max_age' time units. Culling can be disabled by
%% specifying a zero time value (e.g. `{0, min}'). Culling will also be
  134. %% disabled if `init_count' is the same as `max_count'.</dd>
  135. %% <dt>`max_age'</dt>
  136. %% <dd>Members idle longer than `max_age' time units are removed from
  137. %% the pool when stale checking is enabled via
  138. %% `cull_interval'. Culling of idle members will never reduce the pool
  139. %% below `init_count'. The value is specified as `{Time, Unit}'. Note
  140. %% that timers are not set on individual pool members and may remain
  141. %% in the pool beyond the configured `max_age' value since members are
  142. %% only removed on the interval configured via `cull_interval'. The
  143. %% default value is `{30, sec}'.</dd>
  144. %% <dt>`member_start_timeout'</dt>
  145. %% <dd>Time limit for member starts. Specified as `{Time,
  146. %% Unit}'. Defaults to `{1, min}'.</dd>
  147. %% </dl>
-spec new_pool(pool_config()) -> {ok, pid()} | {error, {already_started, pid()}}.
new_pool(PoolConfig) ->
    %% Delegates to the top-level supervisor, which owns pool lifecycles.
    pooler_sup:new_pool(PoolConfig).
  151. %% @doc Terminate the named pool.
-spec rm_pool(pool_name()) -> ok | {error, not_found | running | restarting}.
rm_pool(PoolName) ->
    %% Delegates to the top-level supervisor, which owns pool lifecycles.
    pooler_sup:rm_pool(PoolName).
  155. %% @doc Terminates the group and all pools in that group.
  156. %%
  157. %% If termination of any member pool fails, `rm_group/1' returns
  158. %% `{error, {failed_delete_pools, Pools}}', where `Pools' is a list
  159. %% of pools that failed to terminate.
  160. %%
  161. %% The group is NOT terminated if any member pool did not
  162. %% successfully terminate.
  163. %%
  164. -spec rm_group(group_name()) -> ok | {error, {failed_rm_pools, [atom()]}}.
  165. rm_group(GroupName) ->
  166. Pools = pg_get_local_members(GroupName),
  167. case rm_group_members(Pools) of
  168. [] ->
  169. pg_delete(GroupName);
  170. Failures ->
  171. {error, {failed_rm_pools, Failures}}
  172. end.
  173. -spec rm_group_members([pid()]) -> [atom()].
  174. rm_group_members(MemberPids) ->
  175. lists:foldl(
  176. fun(MemberPid, Acc) ->
  177. Pool = gen_server:call(MemberPid, dump_pool),
  178. PoolName = Pool#pool.name,
  179. case pooler_sup:rm_pool(PoolName) of
  180. ok -> Acc;
  181. _ -> [PoolName | Acc]
  182. end
  183. end,
  184. [],
  185. MemberPids
  186. ).
  187. %% @doc Get child spec described by the proplist `PoolConfig'.
  188. %%
  189. %% See {@link pooler:new_pool/1} for info about `PoolConfig'.
-spec pool_child_spec(pool_config()) -> supervisor:child_spec().
pool_child_spec(PoolConfig) ->
    %% Lets callers embed a pool under their own supervision tree.
    pooler_sup:pool_child_spec(PoolConfig).
  193. %% @doc For INTERNAL use. Adds `MemberPid' to the pool.
%% Per the spec, the second argument may also carry a failed-start
%% result (`{noproc, _}') rather than a live member pid.
-spec accept_member(atom() | pid(), pid() | {noproc, _}) -> ok.
accept_member(PoolName, MemberPid) ->
    gen_server:call(PoolName, {accept_member, MemberPid}).
  197. %% @doc Obtain exclusive access to a member from `PoolName'.
  198. %%
  199. %% If no free members are available, 'error_no_members' is returned.
  200. %%
-spec take_member(pool_name() | pid()) -> pid() | error_no_members.
take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
    %% Queue timeout 0: do not wait for a member to become free.
    gen_server:call(PoolName, {take_member, 0}, infinity).
  204. %% @doc Obtain exclusive access to a member of 'PoolName'.
  205. %%
  206. %% If no members are available, wait for up to Timeout milliseconds for a member
  207. %% to become available. Waiting requests are served in FIFO order. If no member
  208. %% is available within the specified timeout, error_no_members is returned.
  209. %% `Timeout' can be either milliseconds as integer or `{duration, time_unit}'
  210. %%
-spec take_member(pool_name() | pid(), non_neg_integer() | time_spec()) -> pid() | error_no_members.
take_member(PoolName, Timeout) when is_atom(PoolName) orelse is_pid(PoolName) ->
    %% `Timeout' bounds how long the request may wait inside the pool's
    %% requestor queue; the gen_server call itself is unbounded.
    gen_server:call(PoolName, {take_member, time_as_millis(Timeout)}, infinity).
  214. %% @doc Take a member from a randomly selected member of the group
  215. %% `GroupName'. Returns `MemberPid' or `error_no_members'. If no
  216. %% members are available in the randomly chosen pool, all other pools
  217. %% in the group are tried in order.
  218. -spec take_group_member(group_name()) -> pid() | error_no_members.
  219. take_group_member(GroupName) ->
  220. case pg_get_local_members(GroupName) of
  221. [] ->
  222. error_no_members;
  223. Pools ->
  224. %% Put a random member at the front of the list and then
  225. %% return the first member you can walking the list.
  226. {_, _, X} = os:timestamp(),
  227. Idx = (X rem length(Pools)) + 1,
  228. {PoolPid, Rest} = extract_nth(Idx, Pools),
  229. take_first_pool([PoolPid | Rest])
  230. end.
  231. take_first_pool([PoolPid | Rest]) ->
  232. case take_member(PoolPid) of
  233. error_no_members ->
  234. take_first_pool(Rest);
  235. Member ->
  236. ets:insert(?POOLER_GROUP_TABLE, {Member, PoolPid}),
  237. Member
  238. end;
  239. take_first_pool([]) ->
  240. error_no_members.
  241. %% this helper function returns `{Nth_Elt, Rest}' where `Nth_Elt' is
  242. %% the nth element of `L' and `Rest' is `L -- [Nth_Elt]'.
  243. extract_nth(N, L) ->
  244. extract_nth(N, L, []).
  245. extract_nth(1, [H | T], Acc) ->
  246. {H, Acc ++ T};
  247. extract_nth(N, [H | T], Acc) ->
  248. extract_nth(N - 1, T, [H | Acc]);
  249. extract_nth(_, [], _) ->
  250. error(badarg).
  251. %% @doc Return a member that was taken from the group
  252. %% `GroupName'. This is a convenience function for
  253. %% `return_group_member/3' with `Status' of `ok'.
-spec return_group_member(group_name(), pid() | error_no_members) -> ok.
return_group_member(GroupName, MemberPid) ->
    %% Convenience wrapper: a plain return with `Status' = ok.
    return_group_member(GroupName, MemberPid, ok).
  257. %% @doc Return a member that was taken from the group `GroupName'. If
%% `Status' is `ok' the member is returned to the pool from which it
  259. %% came. If `Status' is `fail' the member will be terminated and a new
  260. %% member added to the appropriate pool.
  261. -spec return_group_member(group_name(), pid() | error_no_members, ok | fail) -> ok.
  262. return_group_member(_, error_no_members, _) ->
  263. ok;
  264. return_group_member(_GroupName, MemberPid, Status) when is_pid(MemberPid) ->
  265. case ets:lookup(?POOLER_GROUP_TABLE, MemberPid) of
  266. [{MemberPid, PoolPid}] ->
  267. return_member(PoolPid, MemberPid, Status);
  268. [] ->
  269. ok
  270. end.
  271. %% @doc Return a member to the pool so it can be reused.
  272. %%
  273. %% If `Status' is 'ok', the member is returned to the pool. If
  274. %% `Status' is 'fail', the member is destroyed and a new member is
  275. %% added to the pool in its place.
  276. -spec return_member(pool_name() | pid(), pid() | error_no_members, ok | fail) -> ok.
  277. return_member(PoolName, Pid, Status) when
  278. is_pid(Pid) andalso
  279. (is_atom(PoolName) orelse
  280. is_pid(PoolName)) andalso
  281. (Status =:= ok orelse
  282. Status =:= fail)
  283. ->
  284. gen_server:call(PoolName, {return_member, Pid, Status}, infinity),
  285. ok;
  286. return_member(_, error_no_members, _) ->
  287. ok.
  288. %% @doc Return a member to the pool so it can be reused.
  289. %%
  290. -spec return_member(pool_name() | pid(), pid() | error_no_members) -> ok.
  291. return_member(PoolName, Pid) when
  292. is_pid(Pid) andalso
  293. (is_atom(PoolName) orelse is_pid(PoolName))
  294. ->
  295. gen_server:call(PoolName, {return_member, Pid, ok}, infinity),
  296. ok;
  297. return_member(_, error_no_members) ->
  298. ok.
  299. %% @doc Obtain runtime state info for all workers.
  300. %%
  301. %% Format of the return value is subject to change.
-spec pool_stats(pool_name() | pid()) -> [{pid(), {reference(), free | pid(), erlang:timestamp()}}].
pool_stats(PoolName) ->
    %% Dumps the all_members bookkeeping as a list; see handle_call.
    gen_server:call(PoolName, pool_stats).
  305. %% @doc Obtain the pids of all pools which are members of the group.
-spec group_pools(group_name()) -> [pid()].
group_pools(GroupName) ->
    %% Only pools registered on this node are reported.
    pg_get_local_members(GroupName).
  309. %% @doc Obtain utilization info for a pool.
  310. %%
  311. %% Format of the return value is subject to change, but for now it
  312. %% will be a proplist to maintain backcompat with R16.
-spec pool_utilization(pool_name() | pid()) ->
    [
        {max_count, pos_integer()}
        | {in_use_count, non_neg_integer()}
        | {free_count, non_neg_integer()}
        | {starting_count, non_neg_integer()}
        | {queued_count, non_neg_integer()}
        | {queue_max, non_neg_integer()}
    ].
pool_utilization(PoolName) ->
    %% Computed inside the pool process; see the pool_utilization
    %% clause of handle_call/3.
    gen_server:call(PoolName, pool_utilization).
  324. %% @doc Invokes `Fun' with arity 1 over all free members in pool with `PoolName'.
  325. %%
-spec call_free_members(pool_name() | pid(), fun((pid()) -> term())) -> Res when
    Res :: [{ok, term()} | {error, term()}].
call_free_members(PoolName, Fun) when
    (is_atom(PoolName) orelse is_pid(PoolName)) andalso is_function(Fun, 1)
->
    %% Same as call_free_members/3 with an infinite call timeout.
    call_free_members(PoolName, Fun, infinity).
  332. %% @doc Invokes `Fun' with arity 1 over all free members in pool with `PoolName'.
  333. %% `Timeout' sets the timeout of gen_server call.
-spec call_free_members(pool_name() | pid(), Fun, timeout()) -> Res when
    Fun :: fun((pid()) -> term()),
    Res :: [{ok, term()} | {error, term()}].
call_free_members(PoolName, Fun, Timeout) when
    (is_atom(PoolName) orelse is_pid(PoolName)) andalso is_function(Fun, 1)
->
    %% Runs inside the pool process, so `Fun' blocks the pool while it
    %% executes over each free member.
    gen_server:call(PoolName, {call_free_members, Fun}, Timeout).
  341. %% ------------------------------------------------------------------
  342. %% gen_server Function Definitions
  343. %% ------------------------------------------------------------------
  344. init(#pool{} = Pool) ->
  345. #pool{init_count = N} = Pool,
  346. MemberSup = pooler_pool_sup:member_sup_name(Pool),
  347. Pool1 = set_member_sup(Pool, MemberSup),
  348. %% This schedules the next cull when the pool is configured for
  349. %% such and is otherwise a no-op.
  350. Pool2 = cull_members_from_pool(Pool1),
  351. {ok, NewPool} = init_members_sync(N, Pool2),
  352. {ok, NewPool, {continue, join_group}}.
%% Deferred from init/1: register this pool process in its pg group.
handle_continue(join_group, #pool{group = undefined} = Pool) ->
    %% Pool does not belong to a group; nothing to join.
    {noreply, Pool};
handle_continue(join_group, #pool{group = Group} = Pool) ->
    ok = pg_create(Group),
    ok = pg_join(Group, self()),
    {noreply, Pool}.
%% Record the member supervisor's registered name in the pool state.
set_member_sup(#pool{} = Pool, MemberSup) ->
    Pool#pool{member_sup = MemberSup}.
%% gen_server call dispatch.
handle_call({take_member, Timeout}, From = {APid, _}, #pool{} = Pool) when is_pid(APid) ->
    %% May reply immediately, or park the requestor (noreply) and
    %% answer later via gen_server:reply/2.
    maybe_reply(take_member_from_pool_queued(Pool, From, Timeout));
handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
    {reply, ok, do_return_member(Pid, Status, Pool)};
handle_call({accept_member, Pid}, _From, Pool) ->
    %% `Pid' here is actually a {StarterPid, PidOrError} tuple from a
    %% pooler_starter; see do_accept_member/2.
    {reply, ok, do_accept_member(Pid, Pool)};
handle_call(stop, _From, Pool) ->
    {stop, normal, stop_ok, Pool};
handle_call(pool_stats, _From, Pool) ->
    {reply, dict:to_list(Pool#pool.all_members), Pool};
handle_call(pool_utilization, _From, Pool) ->
    {reply, compute_utilization(Pool), Pool};
handle_call(dump_pool, _From, Pool) ->
    %% Used internally (e.g. by rm_group_members/1) to read pool state.
    {reply, Pool, Pool};
handle_call({call_free_members, Fun}, _From, #pool{free_pids = Pids} = Pool) ->
    {reply, do_call_free_members(Fun, Pids), Pool};
handle_call(_Request, _From, Pool) ->
    %% NOTE(review): unknown calls are never replied to, so the caller
    %% blocks until its own call timeout fires. Confirm this is the
    %% intended behavior rather than {reply, ...}.
    {noreply, Pool}.
-spec handle_cast(_, _) -> {'noreply', _}.
%% No casts are part of the pool protocol; anything received is ignored.
handle_cast(_Msg, Pool) ->
    {noreply, Pool}.
-spec handle_info(_, _) -> {'noreply', _}.
%% Out-of-band messages: requestor timeouts, monitor notifications for
%% members/consumers, and the periodic cull tick.
handle_info({requestor_timeout, From}, Pool = #pool{queued_requestors = RequestorQueue}) ->
    %% A queued take_member request exceeded its timeout: remove it
    %% from the queue and answer it with error_no_members. If the
    %% requestor was already served, the filter finds nothing and this
    %% is a no-op.
    NewQueue = queue:filter(
        fun
            ({RequestorFrom, _TRef}) when RequestorFrom =:= From ->
                gen_server:reply(RequestorFrom, error_no_members),
                false;
            ({_, _}) ->
                true
        end,
        RequestorQueue
    ),
    {noreply, Pool#pool{queued_requestors = NewQueue}};
handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
    %% Either a pool member or a consumer went down.
    State1 =
        case dict:find(Pid, State#pool.all_members) of
            {ok, {_PoolName, _ConsumerPid, _Time}} ->
                %% A member died: recycle its slot as a failure.
                do_return_member(Pid, fail, State);
            error ->
                %% Not a member — maybe a consumer. `MRef' is already
                %% bound from the message, so the match below only
                %% succeeds for the monitor we hold on that consumer.
                case dict:find(Pid, State#pool.consumer_to_pid) of
                    {ok, {MRef, Pids}} ->
                        %% Return every member the dead consumer held:
                        %% reusable on a normal exit, recycled on a crash.
                        IsOk =
                            case Reason of
                                normal -> ok;
                                _Crash -> fail
                            end,
                        lists:foldl(
                            fun(P, S) -> do_return_member(P, IsOk, S) end,
                            State,
                            Pids
                        );
                    error ->
                        State
                end
        end,
    {noreply, State1};
handle_info(cull_pool, Pool) ->
    %% Scheduled cull tick; cull_members_from_pool/1 also re-arms the
    %% timer when culling is configured.
    {noreply, cull_members_from_pool(Pool)};
handle_info(_Info, State) ->
    %% Drain anything unexpected so the mailbox cannot grow unbounded.
    {noreply, State}.
-spec terminate(_, _) -> 'ok'.
%% No explicit cleanup; members are supervised elsewhere.
terminate(_Reason, _State) ->
    ok.
-spec code_change(_, _, _) -> {'ok', _}.
%% State record is carried over unchanged on hot upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
  429. %% ------------------------------------------------------------------
  430. %% Internal Function Definitions
  431. %% ------------------------------------------------------------------
%% Handle the completion of an asynchronous member start.
%%
%% First clause: the starter delivered a live pid. If the starter ref
%% is still tracked in starting_members, the member is monitored,
%% recorded, and either parked as free or handed straight to a queued
%% requestor (maybe_reply_with_pid/2). Second clause: the start failed
%% (second element is not a pid); drop the in-flight ref and carry on.
do_accept_member(
    {StarterPid, Pid},
    #pool{
        all_members = AllMembers,
        starting_members = StartingMembers0,
        member_start_timeout = StartTimeout
    } = Pool
) when is_pid(Pid) ->
    %% make sure we don't accept a timed-out member
    Pool1 =
        #pool{starting_members = StartingMembers} =
        remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
    case lists:keymember(StarterPid, 1, StartingMembers) of
        false ->
            %% A starter completed even though we invalidated the pid
            %% Ask the starter to kill the child and stop. In most cases, the
            %% starter has already received this message. However, when pools
            %% are dynamically re-created with the same name, it is possible
            %% to receive an accept from a pool that has since gone away.
            %% In this case, we should cleanup.
            pooler_starter:stop_member_async(StarterPid),
            Pool1;
        true ->
            StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
            MRef = erlang:monitor(process, Pid),
            Entry = {MRef, free, os:timestamp()},
            AllMembers1 = store_all_members(Pid, Entry, AllMembers),
            pooler_starter:stop(StarterPid),
            maybe_reply_with_pid(Pid, Pool1#pool{
                all_members = AllMembers1,
                starting_members = StartingMembers1
            })
    end;
do_accept_member(
    {StarterPid, _Reason},
    #pool{
        starting_members = StartingMembers0,
        member_start_timeout = StartTimeout
    } = Pool
) ->
    %% member start failed, remove in-flight ref and carry on.
    pooler_starter:stop(StarterPid),
    Pool1 =
        #pool{starting_members = StartingMembers} =
        remove_stale_starting_members(
            Pool,
            StartingMembers0,
            StartTimeout
        ),
    StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
    Pool1#pool{starting_members = StartingMembers1}.
%% Place a newly available member: if requestors are parked in the
%% queue, hand the pid directly to the oldest one; otherwise push the
%% member onto the free list.
maybe_reply_with_pid(
    Pid,
    Pool = #pool{
        queued_requestors = QueuedRequestors,
        free_pids = Free,
        free_count = NumFree
    }
) when is_pid(Pid) ->
    case queue:out(QueuedRequestors) of
        {empty, _} ->
            Pool#pool{
                free_pids = [Pid | Free],
                free_count = NumFree + 1
            };
        {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
            reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool)
    end.
%% Hand `Pid' to a parked requestor: cancel its timeout timer, do the
%% take bookkeeping, emit metrics, and reply out-of-band. If the timer
%% already fired, the pending requestor_timeout message finds this
%% requestor gone from the queue and leaves it untouched.
%% NOTE(review): the `events'/`error_no_members' history metric below
%% is emitted on this success path — it looks like a copy/paste from
%% the failure paths; confirm before relying on that metric.
reply_to_queued_requestor(TRef, Pid, From = {APid, _}, NewQueuedRequestors, Pool) when is_pid(APid) ->
    erlang:cancel_timer(TRef),
    Pool1 = take_member_bookkeeping(Pid, From, NewQueuedRequestors, Pool),
    send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
    send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
    send_metric(Pool, events, error_no_members, history),
    gen_server:reply(From, Pid),
    Pool1.
%% Update pool accounting when `MemberPid' is handed to a consumer.
%%
%% First clause: the member came off the free list and `Rest' (a list)
%% is the remaining free list, so both free/in-use counters move.
%% Second clause: the member goes straight to a queued requestor; the
%% third argument is the updated requestor queue and the free counters
%% are untouched (the member was never on the free list).
-spec take_member_bookkeeping(
    pid(),
    {pid(), _},
    [pid()] | p_requestor_queue(),
    #pool{}
) -> #pool{}.
take_member_bookkeeping(
    MemberPid,
    {CPid, _},
    Rest,
    Pool = #pool{
        in_use_count = NumInUse,
        free_count = NumFree,
        consumer_to_pid = CPMap,
        all_members = AllMembers
    }
) when
    is_pid(MemberPid),
    is_pid(CPid),
    is_list(Rest)
->
    Pool#pool{
        free_pids = Rest,
        in_use_count = NumInUse + 1,
        free_count = NumFree - 1,
        consumer_to_pid = add_member_to_consumer(MemberPid, CPid, CPMap),
        all_members = set_cpid_for_member(MemberPid, CPid, AllMembers)
    };
take_member_bookkeeping(
    MemberPid,
    {ReplyPid, _Tag},
    NewQueuedRequestors,
    Pool = #pool{
        in_use_count = NumInUse,
        all_members = AllMembers,
        consumer_to_pid = CPMap
    }
) ->
    Pool#pool{
        in_use_count = NumInUse + 1,
        all_members = set_cpid_for_member(MemberPid, ReplyPid, AllMembers),
        consumer_to_pid = add_member_to_consumer(MemberPid, ReplyPid, CPMap),
        queued_requestors = NewQueuedRequestors
    }.
%% Drop starter refs whose start attempt has been in flight longer
%% than `MaxAge'. The per-entry side effects (logging, metrics, asking
%% the stalled starter to kill its child) live in
%% accumulate_starting_member_not_stale/5. Surviving refs come back in
%% reverse order, which callers treat as a set.
-spec remove_stale_starting_members(
    #pool{},
    [{reference(), erlang:timestamp()}],
    time_spec()
) -> #pool{}.
remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
    Now = os:timestamp(),
    MaxAgeSecs = time_as_secs(MaxAge),
    FilteredStartingMembers = lists:foldl(
        fun(SM, AccIn) ->
            accumulate_starting_member_not_stale(Pool, Now, SM, MaxAgeSecs, AccIn)
        end,
        [],
        StartingMembers
    ),
    Pool#pool{starting_members = FilteredStartingMembers}.
%% Keep `SM' (prepend to `AccIn') while its start attempt is younger
%% than `MaxAgeSecs'; otherwise log the timeout, bump the
%% starting_member_timeout counter, ask the starter to kill the
%% stalled child, and drop the entry.
accumulate_starting_member_not_stale(Pool, Now, SM = {Pid, StartTime}, MaxAgeSecs, AccIn) ->
    case secs_between(StartTime, Now) < MaxAgeSecs of
        true ->
            [SM | AccIn];
        false ->
            ?LOG_ERROR(
                #{
                    label => "starting member timeout",
                    pool => Pool#pool.name
                },
                #{domain => [pooler]}
            ),
            send_metric(Pool, starting_member_timeout, {inc, 1}, counter),
            pooler_starter:stop_member_async(Pid),
            AccIn
    end.
%% Kick off `N' member starts in parallel and wait for all of them to
%% report back (used from init/1 so the pool comes up populated).
%% Raises `{timeout, ...}' — crashing the pool process — if members do
%% not start within member_start_timeout.
init_members_sync(N, #pool{name = PoolName} = Pool) ->
    Self = self(),
    StartTime = os:timestamp(),
    StartRefs = [
        {pooler_starter:start_member(Pool, Self), StartTime}
        || _I <- lists:seq(1, N)
    ],
    Pool1 = Pool#pool{starting_members = StartRefs},
    case collect_init_members(Pool1) of
        timeout ->
            ?LOG_ERROR(
                #{
                    label => "exceeded timeout waiting for members",
                    pool => PoolName,
                    init_count => Pool1#pool.init_count
                },
                #{domain => [pooler]}
            ),
            error({timeout, "unable to start members"});
        #pool{} = Pool2 ->
            {ok, Pool2}
    end.
  606. collect_init_members(#pool{starting_members = Empty} = Pool) when
  607. Empty =:= []
  608. ->
  609. Pool;
  610. collect_init_members(#pool{member_start_timeout = StartTimeout} = Pool) ->
  611. Timeout = time_as_millis(StartTimeout),
  612. receive
  613. {accept_member, {Ref, Member}} ->
  614. collect_init_members(do_accept_member({Ref, Member}, Pool))
  615. after Timeout ->
  616. timeout
  617. end.
%% Core take logic (no queueing): returns `{MemberPid, Pool}' when a
%% free member exists, otherwise `{error_no_members, Pool}' after
%% possibly launching async member starts to refill the pool.
-spec take_member_from_pool(#pool{}, {pid(), term()}) ->
    {error_no_members | pid(), #pool{}}.
take_member_from_pool(
    #pool{
        init_count = InitCount,
        max_count = Max,
        free_pids = Free,
        in_use_count = NumInUse,
        free_count = NumFree,
        starting_members = StartingMembers,
        member_start_timeout = StartTimeout
    } = Pool,
    From
) ->
    send_metric(Pool, take_rate, 1, meter),
    %% In-flight starts count against max_count so the pool cannot
    %% overshoot while members are still booting.
    Pool1 = remove_stale_starting_members(Pool, StartingMembers, StartTimeout),
    NonStaleStartingMemberCount = length(Pool1#pool.starting_members),
    NumCanAdd = Max - (NumInUse + NumFree + NonStaleStartingMemberCount),
    case Free of
        [] when NumCanAdd =< 0 ->
            %% Pool exhausted and at capacity: fail the take.
            send_metric(Pool, error_no_members_count, {inc, 1}, counter),
            send_metric(Pool, events, error_no_members, history),
            {error_no_members, Pool1};
        [] when NumCanAdd > 0 ->
            %% Limit concurrently starting members to init_count. Add
            %% up to init_count members. Starting members here means
            %% we always return an error_no_members for a take request
            %% when all members are in-use. By adding a batch of new
            %% members, the pool should reach a steady state with
            %% unused members culled over time (if scheduled cull is
            %% enabled).
            NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 1),
            Pool2 = add_members_async(NumToAdd, Pool1),
            send_metric(Pool, error_no_members_count, {inc, 1}, counter),
            send_metric(Pool, events, error_no_members, history),
            {error_no_members, Pool2};
        [Pid | Rest] ->
            Pool2 = take_member_bookkeeping(Pid, From, Rest, Pool1),
            %% Optional auto-grow: when the free count has dropped to
            %% the configured threshold, proactively start members.
            Pool3 =
                case Pool2#pool.auto_grow_threshold of
                    N when
                        is_integer(N) andalso
                            Pool2#pool.free_count =< N andalso
                            NumCanAdd > 0
                    ->
                        NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 0),
                        add_members_async(NumToAdd, Pool2);
                    _ ->
                        Pool2
                end,
            send_metric(Pool, in_use_count, Pool3#pool.in_use_count, histogram),
            send_metric(Pool, free_count, Pool3#pool.free_count, histogram),
            {Pid, Pool3}
    end.
%% Take with queueing semantics:
%% - requestor queue already at queue_max: fail immediately;
%% - `Timeout' =:= 0: plain non-blocking take (no queueing);
%% - otherwise: park the requestor with a timeout timer; it is served
%%   later via reply_to_queued_requestor/5 or expired by the
%%   requestor_timeout clause of handle_info/2.
-spec take_member_from_pool_queued(
    #pool{},
    {pid(), _},
    non_neg_integer()
) ->
    {error_no_members | queued | pid(), #pool{}}.
take_member_from_pool_queued(
    Pool0 = #pool{
        queue_max = QMax,
        queued_requestors = Requestors
    },
    From = {CPid, _},
    Timeout
) when is_pid(CPid) ->
    case {take_member_from_pool(Pool0, From), queue:len(Requestors)} of
        {{error_no_members, Pool1}, QLen} when QLen >= QMax ->
            send_metric(Pool1, events, error_no_members, history),
            send_metric(Pool1, queue_max_reached, {inc, 1}, counter),
            {error_no_members, Pool1};
        {{error_no_members, Pool1}, _} when Timeout =:= 0 ->
            {error_no_members, Pool1};
        {{error_no_members, Pool1 = #pool{queued_requestors = QueuedRequestors}}, QueueCount} ->
            TRef = erlang:send_after(Timeout, self(), {requestor_timeout, From}),
            send_metric(Pool1, queue_count, QueueCount, histogram),
            {queued, Pool1#pool{queued_requestors = queue:in({From, TRef}, QueuedRequestors)}};
        {{Member, NewPool}, _} when is_pid(Member) ->
            {Member, NewPool}
    end.
  700. %% @doc Add `Count' members to `Pool' asynchronously. Returns updated
  701. %% `Pool' record with starting member refs added to field
  702. %% `starting_members'.
  703. add_members_async(Count, #pool{starting_members = StartingMembers} = Pool) ->
  704. StartTime = os:timestamp(),
  705. StartRefs = [
  706. {pooler_starter:start_member(Pool), StartTime}
  707. || _I <- lists:seq(1, Count)
  708. ],
  709. Pool#pool{starting_members = StartRefs ++ StartingMembers}.
-spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
%% @doc Return member `Pid' to the pool. With status `ok' the member is
%% made available again — either placed on the free list or handed
%% straight to a waiting queued requestor. With status `fail' the member
%% is removed and a replacement is started asynchronously.
do_return_member(
    Pid,
    ok,
    #pool{
        name = PoolName,
        all_members = AllMembers,
        queued_requestors = QueuedRequestors
    } = Pool
) ->
    clean_group_table(Pid, Pool),
    case dict:find(Pid, AllMembers) of
        {ok, {_, free, _}} ->
            %% Double return of a member that is already free; log and
            %% ignore rather than corrupting the counts.
            ?LOG_WARNING(
                #{
                    label => "ignored return of free member",
                    pool => PoolName,
                    pid => Pid
                },
                #{domain => [pooler]}
            ),
            Pool;
        {ok, {MRef, CPid, _}} ->
            #pool{
                free_pids = Free,
                in_use_count = NumInUse,
                free_count = NumFree
            } = Pool,
            Pool1 = Pool#pool{in_use_count = NumInUse - 1},
            %% Fresh timestamp so max_age culling is measured from the
            %% time of return, not the time of first acquisition.
            Entry = {MRef, free, os:timestamp()},
            Pool2 = Pool1#pool{
                all_members = store_all_members(Pid, Entry, AllMembers),
                consumer_to_pid = cpmap_remove(
                    Pid,
                    CPid,
                    Pool1#pool.consumer_to_pid
                )
            },
            case queue:out(QueuedRequestors) of
                {empty, _} ->
                    Pool2#pool{free_pids = [Pid | Free], free_count = NumFree + 1};
                {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
                    %% A requestor is waiting: hand the member over
                    %% directly instead of putting it on the free list.
                    reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool2)
            end;
        error ->
            %% Unknown pid (e.g. already removed); nothing to do.
            Pool
    end;
do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
    % for the fail case, perhaps the member crashed and was already
    % removed, so use find instead of fetch and ignore missing.
    clean_group_table(Pid, Pool),
    case dict:find(Pid, AllMembers) of
        {ok, {_MRef, _, _}} ->
            Pool1 = remove_pid(Pid, Pool),
            %% Start one replacement so the pool converges back toward
            %% its configured size.
            add_members_async(1, Pool1);
        error ->
            Pool
    end.
  768. clean_group_table(_MemberPid, #pool{group = undefined}) ->
  769. ok;
  770. clean_group_table(MemberPid, #pool{group = _GroupName}) ->
  771. ets:delete(?POOLER_GROUP_TABLE, MemberPid).
  772. % @doc Remove `Pid' from the pid list associated with `CPid' in the
  773. % consumer to member map given by `CPMap'.
  774. %
  775. % If `Pid' is the last element in `CPid's pid list, then the `CPid'
  776. % entry is removed entirely.
  777. %
  778. -spec cpmap_remove(pid(), pid() | free, dict:dict()) -> dict:dict().
  779. cpmap_remove(_Pid, free, CPMap) ->
  780. CPMap;
  781. cpmap_remove(Pid, CPid, CPMap) ->
  782. case dict:find(CPid, CPMap) of
  783. {ok, {MRef, Pids0}} ->
  784. Pids1 = lists:delete(Pid, Pids0),
  785. case Pids1 of
  786. [_H | _T] ->
  787. dict:store(CPid, {MRef, Pids1}, CPMap);
  788. [] ->
  789. %% no more members for this consumer
  790. erlang:demonitor(MRef, [flush]),
  791. dict:erase(CPid, CPMap)
  792. end;
  793. error ->
  794. % FIXME: this shouldn't happen, should we log or error?
  795. CPMap
  796. end.
% @doc Remove and kill a pool member.
%
% Handles in-use and free members. Logs an error if the pid is not
% tracked in state.all_members.
%
-spec remove_pid(pid(), #pool{}) -> #pool{}.
remove_pid(Pid, Pool) ->
    #pool{
        name = PoolName,
        all_members = AllMembers,
        consumer_to_pid = CPMap,
        stop_mfa = StopMFA
    } = Pool,
    case dict:find(Pid, AllMembers) of
        {ok, {MRef, free, _Time}} ->
            % remove an unused member
            %% [flush] clears any 'DOWN' already in the mailbox so the
            %% server never sees a DOWN for a member it forgot.
            erlang:demonitor(MRef, [flush]),
            FreePids = lists:delete(Pid, Pool#pool.free_pids),
            NumFree = Pool#pool.free_count - 1,
            Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
            terminate_pid(PoolName, Pid, StopMFA),
            send_metric(Pool1, killed_free_count, {inc, 1}, counter),
            Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
        {ok, {MRef, CPid, _Time}} ->
            %% remove a member being consumed. No notice is sent to
            %% the consumer.
            erlang:demonitor(MRef, [flush]),
            Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
            terminate_pid(PoolName, Pid, StopMFA),
            send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
            Pool1#pool{
                consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
                all_members = dict:erase(Pid, AllMembers)
            };
        error ->
            %% Pid is not tracked at all — should not happen; log it
            %% and leave the pool unchanged.
            ?LOG_ERROR(
                #{
                    label => unknown_pid,
                    pool => PoolName,
                    pid => Pid
                },
                #{domain => [pooler]}
            ),
            send_metric(Pool, events, unknown_pid, history),
            Pool
    end.
  843. -spec store_all_members(
  844. pid(),
  845. {reference(), free | pid(), {_, _, _}},
  846. dict:dict()
  847. ) -> dict:dict().
  848. store_all_members(Pid, Val = {_MRef, _CPid, _Time}, AllMembers) ->
  849. dict:store(Pid, Val, AllMembers).
  850. -spec set_cpid_for_member(pid(), pid(), dict:dict()) -> dict:dict().
  851. set_cpid_for_member(MemberPid, CPid, AllMembers) ->
  852. dict:update(
  853. MemberPid,
  854. fun({MRef, free, Time = {_, _, _}}) ->
  855. {MRef, CPid, Time}
  856. end,
  857. AllMembers
  858. ).
  859. -spec add_member_to_consumer(pid(), pid(), dict:dict()) -> dict:dict().
  860. add_member_to_consumer(MemberPid, CPid, CPMap) ->
  861. %% we can't use dict:update here because we need to create the
  862. %% monitor if we aren't already tracking this consumer.
  863. case dict:find(CPid, CPMap) of
  864. {ok, {MRef, MList}} ->
  865. dict:store(CPid, {MRef, [MemberPid | MList]}, CPMap);
  866. error ->
  867. MRef = erlang:monitor(process, CPid),
  868. dict:store(CPid, {MRef, [MemberPid]}, CPMap)
  869. end.
-spec cull_members_from_pool(#pool{}) -> #pool{}.
%% @doc Remove expired surplus free members and schedule the next cull.
%% Culling is disabled when cull_interval is zero, or when the pool
%% cannot grow past init_count (init_count =:= max_count) so there is
%% never surplus capacity to reclaim.
cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->
    %% 0 cull_interval means do not cull
    Pool;
cull_members_from_pool(#pool{init_count = C, max_count = C} = Pool) ->
    %% if init_count matches max_count, then we will not dynamically
    %% add capacity and should not schedule culling regardless of
    %% cull_interval config.
    Pool;
cull_members_from_pool(
    #pool{
        free_count = FreeCount,
        init_count = InitCount,
        in_use_count = InUseCount,
        cull_interval = Delay,
        cull_timer = CullTRef,
        max_age = MaxAge,
        all_members = AllMembers
    } = Pool
) ->
    %% Cancel any previously scheduled cull before rescheduling below,
    %% so at most one cull timer is outstanding.
    case is_reference(CullTRef) of
        true -> erlang:cancel_timer(CullTRef);
        false -> noop
    end,
    %% Only members beyond what init_count requires are candidates;
    %% in-use members count toward the init_count obligation.
    MaxCull = FreeCount - (InitCount - InUseCount),
    Pool1 =
        case MaxCull > 0 of
            true ->
                MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
                ExpiredMembers =
                    expired_free_members(MemberInfo, os:timestamp(), MaxAge),
                %% Cap the kill list so the pool never drops below its
                %% configured minimum.
                CullList = lists:sublist(ExpiredMembers, MaxCull),
                lists:foldl(
                    fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
                    Pool,
                    CullList
                );
            false ->
                Pool
        end,
    Pool1#pool{cull_timer = schedule_cull(self(), Delay)}.
  911. -spec schedule_cull(
  912. Pool :: atom() | pid(),
  913. Delay :: time_spec()
  914. ) -> reference().
  915. %% @doc Schedule a pool cleaning or "cull" for `PoolName' in which
  916. %% members older than `max_age' will be removed until the pool has
  917. %% `init_count' members. Uses `erlang:send_after/3' for light-weight
  918. %% timer that will be auto-cancelled upon pooler shutdown.
  919. schedule_cull(Pool, Delay) ->
  920. DelayMillis = time_as_millis(Delay),
  921. erlang:send_after(DelayMillis, Pool, cull_pool).
  922. -spec member_info([pid()], dict:dict()) -> [{pid(), member_info()}].
  923. member_info(Pids, AllMembers) ->
  924. [{P, dict:fetch(P, AllMembers)} || P <- Pids].
  925. -spec expired_free_members(
  926. Members :: [{pid(), member_info()}],
  927. Now :: {_, _, _},
  928. MaxAge :: time_spec()
  929. ) -> [{pid(), free_member_info()}].
  930. expired_free_members(Members, Now, MaxAge) ->
  931. MaxMicros = time_as_micros(MaxAge),
  932. [
  933. MI
  934. || MI = {_, {_, free, LastReturn}} <- Members,
  935. timer:now_diff(Now, LastReturn) >= MaxMicros
  936. ].
%% Send a metric using the metrics module from application config or
%% do nothing.
-spec send_metric(
    Pool :: #pool{},
    Label :: atom(),
    Value :: metric_value(),
    Type :: metric_type()
) -> ok.
%% No metrics backend configured: drop the metric silently.
send_metric(#pool{metrics_mod = pooler_no_metrics}, _Label, _Value, _Type) ->
    ok;
%% exometer counter: value arrives in the {inc, N} increment form.
send_metric(
    #pool{
        name = PoolName,
        metrics_mod = MetricsMod,
        metrics_api = exometer
    },
    Label,
    {inc, Value},
    counter
) ->
    MetricName = pool_metric_exometer(PoolName, Label),
    MetricsMod:update_or_create(MetricName, Value, counter, []),
    ok;
% Exometer does not support 'history' type metrics right now.
send_metric(
    #pool{
        name = _PoolName,
        metrics_mod = _MetricsMod,
        metrics_api = exometer
    },
    _Label,
    _Value,
    history
) ->
    ok;
%% Any other exometer metric type is forwarded as-is.
send_metric(
    #pool{
        name = PoolName,
        metrics_mod = MetricsMod,
        metrics_api = exometer
    },
    Label,
    Value,
    Type
) ->
    MetricName = pool_metric_exometer(PoolName, Label),
    MetricsMod:update_or_create(MetricName, Value, Type, []),
    ok;
%folsom API is the default one.
send_metric(
    #pool{name = PoolName, metrics_mod = MetricsMod, metrics_api = folsom},
    Label,
    Value,
    Type
) ->
    MetricName = pool_metric(PoolName, Label),
    MetricsMod:notify(MetricName, Value, Type),
    ok.
  995. -spec pool_metric(atom(), atom()) -> binary().
  996. pool_metric(PoolName, Metric) ->
  997. iolist_to_binary([
  998. <<"pooler.">>,
  999. atom_to_binary(PoolName, utf8),
  1000. ".",
  1001. atom_to_binary(Metric, utf8)
  1002. ]).
  1003. %% Exometer metric names are lists, not binaries.
  1004. -spec pool_metric_exometer(atom(), atom()) -> nonempty_list(binary()).
  1005. pool_metric_exometer(PoolName, Metric) ->
  1006. [
  1007. <<"pooler">>,
  1008. atom_to_binary(PoolName, utf8),
  1009. atom_to_binary(Metric, utf8)
  1010. ].
  1011. -spec time_as_secs(time_spec()) -> non_neg_integer().
  1012. time_as_secs({Time, Unit}) ->
  1013. time_as_micros({Time, Unit}) div 1000000.
  1014. -spec time_as_millis(time_spec()) -> non_neg_integer().
  1015. %% @doc Convert time unit into milliseconds.
  1016. time_as_millis({Time, Unit}) ->
  1017. time_as_micros({Time, Unit}) div 1000;
  1018. %% Allows blind convert
  1019. time_as_millis(Time) when is_integer(Time) ->
  1020. Time.
  1021. -spec time_as_micros(time_spec()) -> non_neg_integer().
  1022. %% @doc Convert time unit into microseconds
  1023. time_as_micros({Time, min}) ->
  1024. 60 * 1000 * 1000 * Time;
  1025. time_as_micros({Time, sec}) ->
  1026. 1000 * 1000 * Time;
  1027. time_as_micros({Time, ms}) ->
  1028. 1000 * Time;
  1029. time_as_micros({Time, mu}) ->
  1030. Time.
  1031. secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) ->
  1032. (Mega2 - Mega1) * 1000000 + (Secs2 - Secs1).
  1033. -spec maybe_reply({'queued' | 'error_no_members' | pid(), #pool{}}) ->
  1034. {noreply, #pool{}} | {reply, 'error_no_members' | pid(), #pool{}}.
  1035. maybe_reply({Member, NewPool}) ->
  1036. case Member of
  1037. queued ->
  1038. {noreply, NewPool};
  1039. error_no_members ->
  1040. {reply, error_no_members, NewPool};
  1041. Member when is_pid(Member) ->
  1042. {reply, Member, NewPool}
  1043. end.
%% Implementation of a best-effort termination for a pool member:
%% Terminates the pid's pool member given a MFA that gets applied. The list
%% of arguments must contain the fixed atom ?POOLER_PID, which is replaced
%% by the target pid. Failure to provide a valid MFA will lead to use the
%% default callback.
-spec terminate_pid(atom(), pid(), {atom(), atom(), [term()]}) -> ok.
terminate_pid(PoolName, Pid, {Mod, Fun, Args}) when is_list(Args) ->
    NewArgs = replace_placeholders(PoolName, Pid, Args),
    %% Old-style `catch': an error or exit from the callback yields
    %% {'EXIT', _} and falls back to the default stop MFA; a thrown
    %% value (or any normal return) counts as success.
    %% NOTE(review): a callback that *returns* a literal {'EXIT', _}
    %% tuple would also trigger the fallback — an inherent ambiguity
    %% of `catch' that is preserved here on purpose.
    case catch erlang:apply(Mod, Fun, NewArgs) of
        {'EXIT', _} ->
            terminate_pid(PoolName, Pid, ?DEFAULT_STOP_MFA);
        _Result ->
            ok
    end.
  1058. replace_placeholders(Name, Pid, Args) ->
  1059. [
  1060. case Arg of
  1061. ?POOLER_POOL_NAME ->
  1062. pooler_pool_sup:build_member_sup_name(Name);
  1063. ?POOLER_PID ->
  1064. Pid;
  1065. _ ->
  1066. Arg
  1067. end
  1068. || Arg <- Args
  1069. ].
  1070. compute_utilization(#pool{
  1071. max_count = MaxCount,
  1072. in_use_count = InUseCount,
  1073. free_count = FreeCount,
  1074. starting_members = Starting,
  1075. queued_requestors = Queue,
  1076. queue_max = QueueMax
  1077. }) ->
  1078. [
  1079. {max_count, MaxCount},
  1080. {in_use_count, InUseCount},
  1081. {free_count, FreeCount},
  1082. {starting_count, length(Starting)},
  1083. %% Note not O(n), so in pathological cases this might be expensive
  1084. {queued_count, queue:len(Queue)},
  1085. {queue_max, QueueMax}
  1086. ].
  1087. do_call_free_members(Fun, Pids) ->
  1088. [do_call_free_member(Fun, P) || P <- Pids].
  1089. do_call_free_member(Fun, Pid) ->
  1090. try
  1091. {ok, Fun(Pid)}
  1092. catch
  1093. _Class:Reason ->
  1094. {error, Reason}
  1095. end.
% >= OTP-21
-ifdef(OTP_RELEASE).
%% OTP_RELEASE is defined from OTP 21 onward; pg replaced pg2 in
%% OTP 23, so switch implementations on that boundary.
-if(?OTP_RELEASE >= 23).
-define(USE_PG_NOT_PG2, true).
-else.
-undef(USE_PG_NOT_PG2).
-endif.
% < OTP-21
-else.
-undef(USE_PG_NOT_PG2).
-endif.
-ifdef(USE_PG_NOT_PG2).
%% pg-based group operations (OTP >= 23). pg groups need no explicit
%% create/delete, so those calls are no-ops here.
pg_get_local_members(GroupName) ->
    pg:get_local_members(GroupName).
pg_delete(_GroupName) ->
    ok.
pg_create(_Group) ->
    ok.
pg_join(Group, Pid) ->
    pg:join(Group, Pid).
-else.
%% pg2-based group operations (OTP < 23).
pg_get_local_members(GroupName) ->
    %% pg2 returns an error tuple for unknown groups; normalize that
    %% to an empty member list.
    case pg2:get_local_members(GroupName) of
        {error, {no_such_group, GroupName}} -> [];
        Pids -> Pids
    end.
pg_delete(GroupName) ->
    pg2:delete(GroupName).
pg_create(Group) ->
    pg2:create(Group).
pg_join(Group, Pid) ->
    pg2:join(Group, Pid).
-endif.