pooler.erl 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965
  1. %% @author Seth Falcon <seth@userprimary.net>
  2. %% @copyright 2011-2013 Seth Falcon
  3. %% @doc This is the main interface to the pooler application
  4. %%
  5. %% To integrate with your application, you probably want to call
  6. %% application:start(pooler) after having specified appropriate
  7. %% configuration for the pooler application (either via a config file
  8. %% or appropriate calls to the application module to set the
  9. %% application's config).
  10. %%
  -module(pooler).
  -behaviour(gen_server).
  -include("pooler.hrl").
  %% type specs for pool metrics
  %%
  %% metric_value(): payloads this module reports through send_metric/4
  %% (defined outside this view): plain samples such as counts
  %% (non_neg_integer()), counter increments ({inc, 1}) and event marker
  %% atoms ('error_no_members', 'unknown_pid').
  -type metric_value() :: 'unknown_pid' |
  non_neg_integer() |
  {'add_pids_failed', non_neg_integer(), non_neg_integer()} |
  {'inc',1} |
  'error_no_members'.
  %% metric_type(): the kind of metric, passed as the last argument of
  %% send_metric/4 in this module.
  -type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
  %% ------------------------------------------------------------------
  %% API Function Exports
  %% ------------------------------------------------------------------
  %% Application lifecycle helpers.
  -export([start/0,
  stop/0]).
  %% Public pool API (plus accept_member/2, which is for internal use).
  -export([accept_member/2,
  start_link/1,
  take_member/1,
  take_member/2,
  take_group_member/1,
  return_group_member/2,
  return_group_member/3,
  return_member/2,
  return_member/3,
  pool_stats/1,
  pool_utilization/1,
  manual_start/0,
  new_pool/1,
  pool_child_spec/1,
  rm_pool/1,
  rm_group/1,
  call_free_members/2,
  call_free_members/3
  ]).
  %% ------------------------------------------------------------------
  %% gen_server Function Exports
  %% ------------------------------------------------------------------
  -export([init/1,
  handle_call/3,
  handle_cast/2,
  handle_info/2,
  terminate/2,
  code_change/3]).
  54. %% ------------------------------------------------------------------
  55. %% Application API
  56. %% ------------------------------------------------------------------
  %% @doc Start the pooler application, first starting every application
  %% it depends on. Crashes with a badmatch if startup fails.
  -spec start() -> 'ok'.
  start() ->
      {ok, _StartedApps} = application:ensure_all_started(pooler),
      ok.
  %% @doc Stop the pooler application. Crashes with a badmatch unless
  %% `application:stop/1' reports `ok'.
  -spec stop() -> 'ok'.
  stop() ->
      StopResult = application:stop(pooler),
      ok = StopResult.
  64. %% ------------------------------------------------------------------
  65. %% API Function Definitions
  66. %% ------------------------------------------------------------------
  %% @doc Start a pool server linked to the caller, registered locally
  %% under the pool's `name'.
  start_link(Pool = #pool{name = PoolName}) ->
      gen_server:start_link({local, PoolName}, ?MODULE, Pool, []).
  %% @doc Start `sasl' and then `pooler'. The result of starting `sasl' is
  %% ignored; the result of `application:start(pooler)' is returned.
  manual_start() ->
      _ = application:start(sasl),
      application:start(pooler).
  %% @doc Start a new pool described by the proplist `PoolConfig'. The
  %% following keys are required in the proplist:
  %%
  %% <dl>
  %% <dt>`name'</dt>
  %% <dd>An atom giving the name of the pool.</dd>
  %% <dt>`init_count'</dt>
  %% <dd>Number of members to add to the pool at start. When the pool is
  %% started, `init_count' members will be started in parallel.</dd>
  %% <dt>`max_count'</dt>
  %% <dd>Maximum number of members in the pool.</dd>
  %% <dt>`start_mfa'</dt>
  %% <dd>A tuple of the form `{Mod, Fun, Args}' describing how to start
  %% new pool members.</dd>
  %% </dl>
  %%
  %% In addition, you can specify any of the following optional
  %% configuration options:
  %%
  %% <dl>
  %% <dt>`group'</dt>
  %% <dd>An atom giving the name of the group this pool belongs
  %% to. Pools sharing a common `group' value can be accessed using
  %% {@link take_group_member/1} and {@link return_group_member/2}.</dd>
  %% <dt>`cull_interval'</dt>
  %% <dd>Time between checks for stale pool members. Specified as
  %% `{Time, Unit}' where `Time' is a non-negative integer and `Unit' is
  %% one of `min', `sec', `ms', or `mu'. The default value of `{1, min}'
  %% triggers a once per minute check to remove members that have not
  %% been accessed in `max_age' time units. Culling can be disabled by
  %% specifying a zero time value (e.g. `{0, min}'). Culling will also be
  %% disabled if `init_count' is the same as `max_count'.</dd>
  %% <dt>`max_age'</dt>
  %% <dd>Members idle longer than `max_age' time units are removed from
  %% the pool when stale checking is enabled via
  %% `cull_interval'. Culling of idle members will never reduce the pool
  %% below `init_count'. The value is specified as `{Time, Unit}'. Note
  %% that timers are not set on individual pool members and may remain
  %% in the pool beyond the configured `max_age' value since members are
  %% only removed on the interval configured via `cull_interval'. The
  %% default value is `{30, sec}'.</dd>
  %% <dt>`member_start_timeout'</dt>
  %% <dd>Time limit for member starts. Specified as `{Time,
  %% Unit}'. Defaults to `{1, min}'.</dd>
  %% </dl>
  %%
  %% The request is delegated to `pooler_sup:new_pool/1', whose result is
  %% returned unchanged.
  new_pool(PoolConfig) ->
      pooler_sup:new_pool(PoolConfig).
  %% @doc Terminate the pool named `PoolName'. Delegates to
  %% `pooler_sup:rm_pool/1' and returns its result.
  rm_pool(PoolName) ->
      pooler_sup:rm_pool(PoolName).
  %% @doc Terminates the group `GroupName' and all pools in that group.
  %%
  %% Returns `ok' when the group does not exist. Otherwise every local
  %% member pool is removed via `pooler_sup:rm_pool/1'.
  %%
  %% If termination of any member pool fails, `rm_group/1' returns
  %% `{error, {failed_rm_pools, Pools}}', where `Pools' is a list of the
  %% names of the pools that failed to terminate.
  %%
  %% The group is NOT terminated if any member pool did not
  %% successfully terminate. When all pools were removed, the result of
  %% `pg2:delete/1' on the group is returned.
  %%
  -spec rm_group(atom()) -> ok | {error, {failed_rm_pools, [atom()]}}.
  rm_group(GroupName) ->
      case pg2:get_local_members(GroupName) of
          {error, {no_such_group, GroupName}} ->
              ok;
          Pools ->
              case rm_group_members(Pools) of
                  [] ->
                      pg2:delete(GroupName);
                  Failures ->
                      {error, {failed_rm_pools, Failures}}
              end
      end.
  %% Remove, one after another, the pool served by each pid in
  %% `MemberPids'. Returns the names of the pools whose removal did not
  %% return `ok', most recently attempted first.
  -spec rm_group_members([pid()]) -> [atom()].
  rm_group_members(MemberPids) ->
      RemoveOne =
          fun(MemberPid, FailedSoFar) ->
                  PoolRecord = gen_server:call(MemberPid, dump_pool),
                  Name = PoolRecord#pool.name,
                  case pooler_sup:rm_pool(Name) of
                      ok -> FailedSoFar;
                      _NotOk -> [Name | FailedSoFar]
                  end
          end,
      lists:foldl(RemoveOne, [], MemberPids).
  %% @doc Build the supervisor child spec for the pool described by the
  %% proplist `PoolConfig', by delegating to `pooler_sup:pool_child_spec/1'.
  %%
  %% See {@link pooler:new_pool/1} for info about `PoolConfig'.
  -spec pool_child_spec([{atom(), term()}]) -> supervisor:child_spec().
  pool_child_spec(PoolConfig) ->
      pooler_sup:pool_child_spec(PoolConfig).
  %% @doc For INTERNAL use. Reports the outcome of a member start to the
  %% pool `PoolName'.
  %%
  %% `MemberPid' is a `{StarterPid, Result}' pair. When `Result' is a pid
  %% and the start is still being tracked, it is added to the pool as a
  %% new member. Any other `Result' is treated as a failed start. Always
  %% returns `ok'.
  -spec accept_member(atom() | pid(), {StarterPid :: pid(), Result :: pid() | term()}) -> ok.
  accept_member(PoolName, MemberPid) ->
      gen_server:call(PoolName, {accept_member, MemberPid}).
  %% @doc Obtain exclusive access to a member of `PoolName' without waiting.
  %%
  %% Returns the member pid, or `error_no_members' when no member is free
  %% right now.
  %%
  -spec take_member(atom() | pid()) -> pid() | error_no_members.
  take_member(PoolName) when is_atom(PoolName); is_pid(PoolName) ->
      NoWait = 0,
      gen_server:call(PoolName, {take_member, NoWait}, infinity).
  %% @doc Obtain exclusive access to a member of `PoolName', waiting if
  %% necessary.
  %%
  %% When no member is free, the caller is queued (FIFO) for at most
  %% `Timeout'. `Timeout' is either an integer number of milliseconds or a
  %% `{Duration, TimeUnit}' tuple. Returns `error_no_members' if no member
  %% became available in time.
  %%
  -spec take_member(atom() | pid(), non_neg_integer() | time_spec()) -> pid() | error_no_members.
  take_member(PoolName, Timeout) when is_atom(PoolName); is_pid(PoolName) ->
      WaitMillis = time_as_millis(Timeout),
      gen_server:call(PoolName, {take_member, WaitMillis}, infinity).
  %% @doc Take a member from a pseudo-randomly chosen pool of the group
  %% `GroupName'. Returns `MemberPid' or `error_no_members'. If the chosen
  %% pool has no free member, the remaining pools of the group are tried
  %% one after another. Returns `{error_no_group, GroupName}' when the
  %% group does not exist.
  -spec take_group_member(atom()) -> pid() | error_no_members | {error_no_group, atom()}.
  take_group_member(GroupName) ->
      case pg2:get_local_members(GroupName) of
          {error, {no_such_group, GroupName}} ->
              {error_no_group, GroupName};
          [] ->
              error_no_members;
          Pools ->
              %% Use the microsecond part of the clock as a cheap source of
              %% randomness to pick the starting pool.
              {_MegaSecs, _Secs, MicroSecs} = os:timestamp(),
              StartIdx = 1 + MicroSecs rem length(Pools),
              {FirstPool, OtherPools} = extract_nth(StartIdx, Pools),
              take_first_pool([FirstPool | OtherPools])
      end.
  %% Walk the pool pids in order and return the first member obtained.
  %% The member is recorded in the group table together with the pool it
  %% came from, so return_group_member/3 can route it back.
  take_first_pool([]) ->
      error_no_members;
  take_first_pool([PoolPid | RemainingPools]) ->
      case take_member(PoolPid) of
          error_no_members ->
              take_first_pool(RemainingPools);
          MemberPid ->
              ets:insert(?POOLER_GROUP_TABLE, {MemberPid, PoolPid}),
              MemberPid
      end.
  %% Return `{NthElt, Rest}' where `NthElt' is the `N'th (1-based) element
  %% of `L' and `Rest' is `L' with that element removed. The remaining
  %% elements keep their original relative order (equal to `L -- [NthElt]'
  %% when the elements are unique). Raises `badarg' when `L' has fewer
  %% than `N' elements.
  extract_nth(N, L) ->
      extract_nth(N, L, []).
  extract_nth(1, [H | T], Acc) ->
      %% Acc holds the preceding elements in reverse order; restore their
      %% original order in front of the tail.
      {H, lists:reverse(Acc, T)};
  extract_nth(N, [H | T], Acc) ->
      extract_nth(N - 1, T, [H | Acc]);
  extract_nth(_, [], _) ->
      error(badarg).
  %% @doc Return a member obtained from the group `GroupName' as healthy.
  %% Shorthand for `return_group_member/3' with a `Status' of `ok'.
  -spec return_group_member(atom(), pid() | error_no_members) -> ok.
  return_group_member(GroupName, MemberPid) ->
      DefaultStatus = ok,
      return_group_member(GroupName, MemberPid, DefaultStatus).
  %% @doc Return a member that was taken from the group `GroupName'.
  %%
  %% The owning pool is looked up in the group table. With `Status' `ok'
  %% the member goes back to the pool it came from. With `Status' `fail'
  %% the member is terminated and the owning pool starts a replacement.
  %% Unknown members and `error_no_members' are ignored. Always returns `ok'.
  -spec return_group_member(atom(), pid() | error_no_members, ok | fail) -> ok.
  return_group_member(_GroupName, MemberPid, Status) when is_pid(MemberPid) ->
      case ets:lookup(?POOLER_GROUP_TABLE, MemberPid) of
          [] ->
              ok;
          [{MemberPid, OwnerPool}] ->
              return_member(OwnerPool, MemberPid, Status)
      end;
  return_group_member(_GroupName, error_no_members, _Status) ->
      ok.
  %% @doc Return a member to the pool `PoolName'.
  %%
  %% With `Status' `ok' the member becomes available for reuse. With
  %% `fail' the pool destroys it and adds a replacement. Passing
  %% `error_no_members' (the failure result of a take) is a no-op.
  -spec return_member(atom() | pid(), pid() | error_no_members, ok | fail) -> ok.
  return_member(_PoolName, error_no_members, _Status) ->
      ok;
  return_member(PoolName, Pid, Status)
    when is_pid(Pid),
         (is_atom(PoolName) orelse is_pid(PoolName)),
         (Status =:= ok orelse Status =:= fail) ->
      _ = gen_server:call(PoolName, {return_member, Pid, Status}, infinity),
      ok.
  %% @doc Return a healthy member to the pool `PoolName' so it can be
  %% reused. Passing `error_no_members' is a no-op.
  %%
  -spec return_member(atom() | pid(), pid() | error_no_members) -> ok.
  return_member(_PoolName, error_no_members) ->
      ok;
  return_member(PoolName, Pid)
    when is_pid(Pid), (is_atom(PoolName) orelse is_pid(PoolName)) ->
      _ = gen_server:call(PoolName, {return_member, Pid, ok}, infinity),
      ok.
  %% @doc Obtain runtime state info for every member of the pool `PoolName'.
  %%
  %% The result is the pool's member table as a list of
  %% `{MemberPid, {MonitorRef, free | ConsumerPid, Timestamp}}' tuples.
  %% Format of the return value is subject to change.
  -spec pool_stats(atom() | pid()) -> [tuple()].
  pool_stats(PoolName) ->
      gen_server:call(PoolName, pool_stats).
  %% @doc Obtain utilization info for the pool `PoolName'.
  %%
  %% Returned as a proplist (kept for backwards compatibility with R16);
  %% the exact format is subject to change.
  -spec pool_utilization(atom() | pid()) -> [{atom(), integer()}].
  pool_utilization(PoolName) ->
      Request = pool_utilization,
      gen_server:call(PoolName, Request).
  %% @doc Apply the unary `Fun' to every free member of the pool
  %% `PoolName', waiting indefinitely for the pool to answer.
  %%
  -spec call_free_members(atom() | pid(), fun((pid()) -> term())) -> Res when
        Res :: [{ok, term()} | {error, term()}].
  call_free_members(PoolName, Fun)
    when is_function(Fun, 1), (is_atom(PoolName) orelse is_pid(PoolName)) ->
      NoTimeout = infinity,
      call_free_members(PoolName, Fun, NoTimeout).
  %% @doc Apply the unary `Fun' to every free member of the pool
  %% `PoolName'. `Timeout' bounds the underlying `gen_server:call/3'.
  -spec call_free_members(atom() | pid(), Fun, timeout()) -> Res when
        Fun :: fun((pid()) -> term()),
        Res :: [{ok, term()} | {error, term()}].
  call_free_members(PoolName, Fun, Timeout)
    when is_function(Fun, 1), (is_atom(PoolName) orelse is_pid(PoolName)) ->
      Request = {call_free_members, Fun},
      gen_server:call(PoolName, Request, Timeout).
  295. %% ------------------------------------------------------------------
  296. %% gen_server Function Definitions
  297. %% ------------------------------------------------------------------
  %% gen_server init: record the member supervisor name, schedule culling
  %% (a no-op unless configured), then synchronously start `init_count'
  %% members. Returns a zero timeout so handle_info(timeout, ...) can join
  %% the pg2 group once the server is ready to process messages.
  -spec init(#pool{}) -> {'ok', #pool{}, 0}.
  init(#pool{} = Pool0) ->
      #pool{init_count = InitCount} = Pool0,
      SupName = pooler_pool_sup:member_sup_name(Pool0),
      WithSup = set_member_sup(Pool0, SupName),
      Scheduled = cull_members_from_pool(WithSup),
      {ok, Ready} = init_members_sync(InitCount, Scheduled),
      {ok, Ready, 0}.
  %% Store the member supervisor name in the pool record.
  set_member_sup(Pool = #pool{}, SupName) ->
      Pool#pool{member_sup = SupName}.
  %% Synchronous pool requests. Unrecognised requests are never answered,
  %% so such a caller blocks until its own call timeout expires.
  handle_call({take_member, Timeout}, {CallerPid, _} = From, #pool{} = Pool)
    when is_pid(CallerPid) ->
      Outcome = take_member_from_pool_queued(Pool, From, Timeout),
      maybe_reply(Outcome);
  handle_call({return_member, MemberPid, Status}, {_CallerPid, _Tag}, Pool) ->
      NewPool = do_return_member(MemberPid, Status, Pool),
      {reply, ok, NewPool};
  handle_call({accept_member, StartResult}, _From, Pool) ->
      NewPool = do_accept_member(StartResult, Pool),
      {reply, ok, NewPool};
  handle_call(stop, _From, Pool) ->
      {stop, normal, stop_ok, Pool};
  handle_call(pool_stats, _From, Pool) ->
      Members = Pool#pool.all_members,
      {reply, dict:to_list(Members), Pool};
  handle_call(pool_utilization, _From, Pool) ->
      Utilization = compute_utilization(Pool),
      {reply, Utilization, Pool};
  handle_call(dump_pool, _From, Pool) ->
      {reply, Pool, Pool};
  handle_call({call_free_members, Fun}, _From, #pool{free_pids = FreePids} = Pool) ->
      Results = do_call_free_members(Fun, FreePids),
      {reply, Results, Pool};
  handle_call(_Unknown, _From, Pool) ->
      {noreply, Pool}.
  %% No casts are handled; every message is ignored.
  -spec handle_cast(_,_) -> {'noreply', _}.
  handle_cast(_Ignored, State) ->
      {noreply, State}.
  -spec handle_info(_, _) -> {'noreply', _}.
  %% A queued take_member/2 caller ran out of time: drop it from the queue
  %% and reply `error_no_members'. If the requestor was already served (and
  %% so already removed from the queue), nothing matches and no reply is sent.
  handle_info({requestor_timeout, From}, Pool = #pool{ queued_requestors = RequestorQueue }) ->
      NewQueue = queue:filter(fun({RequestorFrom, _TRef}) when RequestorFrom =:= From ->
                                      gen_server:reply(RequestorFrom, error_no_members),
                                      false;
                                 ({_, _}) ->
                                      true
                              end, RequestorQueue),
      {noreply, Pool#pool{ queued_requestors = NewQueue} };
  %% The zero timeout returned by init/1. A pool without a group has
  %% nothing to do.
  handle_info(timeout, #pool{group = undefined} = Pool) ->
      %% ignore
      {noreply, Pool};
  %% The zero timeout returned by init/1: create the pg2 group (if needed)
  %% and join it now that this server is able to process messages.
  handle_info(timeout, #pool{group = Group} = Pool) ->
      ok = pg2:create(Group),
      ok = pg2:join(Group, self()),
      {noreply, Pool};
  %% A monitored process went down. If it is a pool member, it is treated as
  %% a failed return (removed, and a replacement started asynchronously).
  %% If it is a consumer holding members, each of its members is returned:
  %% as `ok' when the consumer exited `normal', otherwise as `fail'.
  handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
      State1 =
          case dict:find(Pid, State#pool.all_members) of
              %% NOTE(review): the first element of an all_members entry is
              %% the member's monitor ref (see store_all_members/3), not a
              %% pool name; the variable name below is misleading.
              {ok, {_PoolName, _ConsumerPid, _Time}} ->
                  do_return_member(Pid, fail, State);
              error ->
                  %% MRef is already bound by the message, so this clause only
                  %% matches when the stored consumer monitor equals the one
                  %% that fired. NOTE(review): a consumer entry with a
                  %% different ref would raise case_clause here.
                  case dict:find(Pid, State#pool.consumer_to_pid) of
                      {ok, {MRef, Pids}} ->
                          IsOk = case Reason of
                                     normal -> ok;
                                     _Crash -> fail
                                 end,
                          lists:foldl(
                            fun(P, S) -> do_return_member(P, IsOk, S) end,
                            State, Pids);
                      error ->
                          State
                  end
          end,
      {noreply, State1};
  %% Periodic cull tick (scheduled by cull_members_from_pool/1).
  handle_info(cull_pool, Pool) ->
      {noreply, cull_members_from_pool(Pool)};
  %% Any other message is ignored.
  handle_info(_Info, State) ->
      {noreply, State}.
  %% No cleanup is needed when the pool server terminates.
  -spec terminate(_, _) -> 'ok'.
  terminate(_Why, _Pool) ->
      ok.
  %% Hot code upgrade: the pool state is carried over unchanged.
  -spec code_change(_, _, _) -> {'ok', _}.
  code_change(_PreviousVsn, Pool, _UpgradeInfo) ->
      {ok, Pool}.
  381. %% ------------------------------------------------------------------
  382. %% Internal Function Definitions
  383. %% ------------------------------------------------------------------
  %% Handle the outcome of a member start reported by a starter process.
  %%
  %% `{StarterPid, Pid}' with a pid result: stale starting entries are
  %% discarded first. If the start is still tracked in `starting_members',
  %% the new member is monitored, recorded as free, and either handed to a
  %% queued requestor or pushed onto the free list; the starter is then
  %% stopped. If it is no longer tracked, the starter is asked to stop the
  %% member.
  %%
  %% `{StarterPid, _Reason}' with a non-pid result: the start failed. The
  %% starter is stopped and its in-flight entry dropped.
  %%
  %% Returns the updated pool record.
  do_accept_member({StarterPid, Pid},
                   #pool{
                      all_members = AllMembers,
                      starting_members = StartingMembers0,
                      member_start_timeout = StartTimeout
                     } = Pool) when is_pid(Pid) ->
      %% make sure we don't accept a timedout member
      Pool1 = #pool{starting_members = StartingMembers} =
          remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
      case lists:keymember(StarterPid, 1, StartingMembers) of
          false ->
              %% A starter completed even though we invalidated the pid
              %% Ask the starter to kill the child and stop. In most cases, the
              %% starter has already received this message. However, when pools
              %% are dynamically re-created with the same name, it is possible
              %% to receive an accept from a pool that has since gone away.
              %% In this case, we should cleanup.
              pooler_starter:stop_member_async(StarterPid),
              Pool1;
          true ->
              StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
              %% The member's monitor ref is stored in its all_members entry
              %% so 'DOWN' handling and removal can find it.
              MRef = erlang:monitor(process, Pid),
              Entry = {MRef, free, os:timestamp()},
              AllMembers1 = store_all_members(Pid, Entry, AllMembers),
              pooler_starter:stop(StarterPid),
              maybe_reply_with_pid(Pid, Pool1#pool{all_members = AllMembers1,
                                                   starting_members = StartingMembers1})
      end;
  do_accept_member({StarterPid, _Reason},
                   #pool{starting_members = StartingMembers0,
                         member_start_timeout = StartTimeout} = Pool) ->
      %% member start failed, remove in-flight ref and carry on.
      pooler_starter:stop(StarterPid),
      Pool1 = #pool{starting_members = StartingMembers} =
          remove_stale_starting_members(Pool, StartingMembers0,
                                        StartTimeout),
      StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
      Pool1#pool{starting_members = StartingMembers1}.
  %% Hand the newly free member `Pid' to the longest-waiting queued
  %% requestor if there is one; otherwise push it onto the free list and
  %% bump the free count. Returns the updated pool.
  maybe_reply_with_pid(Pid,
                       Pool = #pool{queued_requestors = Waiting,
                                    free_pids = FreePids,
                                    free_count = FreeCount}) when is_pid(Pid) ->
      case queue:out(Waiting) of
          {{value, {{WaiterPid, _} = From, TRef}}, StillWaiting} when is_pid(WaiterPid) ->
              reply_to_queued_requestor(TRef, Pid, From, StillWaiting, Pool);
          {empty, _} ->
              Pool#pool{free_pids = [Pid | FreePids],
                        free_count = FreeCount + 1}
      end.
  %% Give member `Pid' directly to the queued requestor `From'.
  %%
  %% Cancels the requestor's timeout timer, records the member as in use
  %% by the requestor (installing `NewQueuedRequestors' as the remaining
  %% queue), emits metrics, and replies to the requestor with the member
  %% pid. Returns the updated pool.
  %%
  %% If the timer has already fired, the pending {requestor_timeout, From}
  %% message finds no matching queue entry when it is handled.
  %%
  %% NOTE(review): an `events'/`error_no_members' history metric is sent
  %% below even though the requestor is served successfully. Confirm this
  %% is intended.
  reply_to_queued_requestor(TRef, Pid, From = {APid, _}, NewQueuedRequestors, Pool) when is_pid(APid) ->
      erlang:cancel_timer(TRef),
      Pool1 = take_member_bookkeeping(Pid, From, NewQueuedRequestors, Pool),
      send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
      send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
      send_metric(Pool, events, error_no_members, history),
      gen_server:reply(From, Pid),
      Pool1.
  %% Record that `MemberPid' is now in use by the consumer in `From'.
  %%
  %% Free-list variant (third argument is a list): it is what remains of
  %% the free list after `MemberPid' was popped, so the free count drops
  %% by one. Queue variant (anything else): the member never went onto the
  %% free list, so only the in-use count changes, and the argument becomes
  %% the new requestor queue.
  -spec take_member_bookkeeping(pid(),
                                {pid(), _},
                                [pid()] | p_requestor_queue(),
                                #pool{}) -> #pool{}.
  take_member_bookkeeping(MemberPid,
                          {ConsumerPid, _},
                          RemainingFree,
                          Pool = #pool{in_use_count = InUse,
                                       free_count = FreeCount,
                                       consumer_to_pid = ConsumerMap,
                                       all_members = Members})
    when is_pid(MemberPid), is_pid(ConsumerPid), is_list(RemainingFree) ->
      Pool#pool{free_pids = RemainingFree,
                in_use_count = InUse + 1,
                free_count = FreeCount - 1,
                consumer_to_pid = add_member_to_consumer(MemberPid, ConsumerPid, ConsumerMap),
                all_members = set_cpid_for_member(MemberPid, ConsumerPid, Members)};
  take_member_bookkeeping(MemberPid,
                          {RequestorPid, _Tag},
                          RemainingQueue,
                          Pool = #pool{in_use_count = InUse,
                                       all_members = Members,
                                       consumer_to_pid = ConsumerMap}) ->
      Pool#pool{in_use_count = InUse + 1,
                all_members = set_cpid_for_member(MemberPid, RequestorPid, Members),
                consumer_to_pid = add_member_to_consumer(MemberPid, RequestorPid, ConsumerMap),
                queued_requestors = RemainingQueue}.
  %% Drop starting-member entries older than `MaxAge', asking their
  %% starters to stop. Returns `Pool' with `starting_members' replaced by
  %% the surviving entries (in reverse of their original order).
  -spec remove_stale_starting_members(#pool{}, [{reference(), erlang:timestamp()}],
                                      time_spec()) -> #pool{}.
  remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
      Now = os:timestamp(),
      LimitSecs = time_as_secs(MaxAge),
      KeepIfFresh =
          fun(Entry, Kept) ->
                  accumulate_starting_member_not_stale(Pool, Now, Entry, LimitSecs, Kept)
          end,
      Fresh = lists:foldl(KeepIfFresh, [], StartingMembers),
      Pool#pool{starting_members = Fresh}.
  %% Fold step: keep a starting-member entry younger than `MaxAgeSecs'.
  %% Otherwise log a timeout, bump the starting_member_timeout counter,
  %% ask the starter to stop the member, and drop the entry.
  accumulate_starting_member_not_stale(Pool, Now, {Starter, StartedAt} = Entry, MaxAgeSecs, Kept) ->
      AgeSecs = secs_between(StartedAt, Now),
      if
          AgeSecs < MaxAgeSecs ->
              [Entry | Kept];
          true ->
              error_logger:error_msg("pool '~s': starting member timeout", [Pool#pool.name]),
              send_metric(Pool, starting_member_timeout, {inc, 1}, counter),
              pooler_starter:stop_member_async(Starter),
              Kept
      end.
  %% Start `N' members in parallel and block until all have reported back.
  %% Returns `{ok, Pool}'. Raises `{timeout, "unable to start members"}'
  %% if a report does not arrive within `member_start_timeout'.
  init_members_sync(N, #pool{name = PoolName} = Pool) ->
      Owner = self(),
      StartedAt = os:timestamp(),
      Starting = [{pooler_starter:start_member(Pool, Owner), StartedAt}
                  || _ <- lists:seq(1, N)],
      WithStarters = Pool#pool{starting_members = Starting},
      case collect_init_members(WithStarters) of
          #pool{} = Ready ->
              {ok, Ready};
          timeout ->
              error_logger:error_msg("pool '~s': exceeded timeout waiting for ~B members",
                                     [PoolName, WithStarters#pool.init_count]),
              error({timeout, "unable to start members"})
      end.
  %% Receive `accept_member' reports until no starting members remain.
  %% Returns the updated pool, or `timeout' if no report arrives within
  %% `member_start_timeout' of the previous one. The wait restarts with
  %% every message.
  collect_init_members(#pool{starting_members = []} = Pool) ->
      Pool;
  collect_init_members(#pool{member_start_timeout = StartTimeout} = Pool) ->
      WaitMillis = time_as_millis(StartTimeout),
      receive
          {accept_member, {StarterRef, Result}} ->
              Next = do_accept_member({StarterRef, Result}, Pool),
              collect_init_members(Next)
      after WaitMillis ->
          timeout
      end.
  %% Try to give a free member of `Pool' to the caller `From' without
  %% waiting.
  %%
  %% Returns `{MemberPid, NewPool}' on success, or
  %% `{error_no_members, NewPool}' when no member is free.
  %%
  %% Stale starting members (older than `member_start_timeout') are
  %% discarded first, so they do not count against `max_count'. When no
  %% member is free but capacity remains, a batch of members is started
  %% asynchronously; the caller still gets `error_no_members'. When a member
  %% is taken, `auto_grow_threshold' is an integer, and the free count has
  %% dropped to or below it, more members are started in the background.
  -spec take_member_from_pool(#pool{}, {pid(), term()}) ->
                                     {error_no_members | pid(), #pool{}}.
  take_member_from_pool(#pool{init_count = InitCount,
                              max_count = Max,
                              free_pids = Free,
                              in_use_count = NumInUse,
                              free_count = NumFree,
                              starting_members = StartingMembers,
                              member_start_timeout = StartTimeout} = Pool,
                        From) ->
      send_metric(Pool, take_rate, 1, meter),
      Pool1 = remove_stale_starting_members(Pool, StartingMembers, StartTimeout),
      NonStaleStartingMemberCount = length(Pool1#pool.starting_members),
      %% Remaining headroom under max_count: in-use, free and still-starting
      %% members all count towards the limit.
      NumCanAdd = Max - (NumInUse + NumFree + NonStaleStartingMemberCount),
      case Free of
          [] when NumCanAdd =< 0 ->
              send_metric(Pool, error_no_members_count, {inc, 1}, counter),
              send_metric(Pool, events, error_no_members, history),
              {error_no_members, Pool1};
          [] when NumCanAdd > 0 ->
              %% Limit concurrently starting members to init_count. Add
              %% up to init_count members. Starting members here means
              %% we always return an error_no_members for a take request
              %% when all members are in-use. By adding a batch of new
              %% members, the pool should reach a steady state with
              %% unused members culled over time (if scheduled cull is
              %% enabled).
              %% At least one member is always started in this branch.
              NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 1),
              Pool2 = add_members_async(NumToAdd, Pool1),
              send_metric(Pool, error_no_members_count, {inc, 1}, counter),
              send_metric(Pool, events, error_no_members, history),
              {error_no_members, Pool2};
          [Pid|Rest] ->
              Pool2 = take_member_bookkeeping(Pid, From, Rest, Pool1),
              Pool3 = case Pool2#pool.auto_grow_threshold of
                          N when is_integer(N) andalso
                                 Pool2#pool.free_count =< N andalso
                                 NumCanAdd > 0 ->
                              %% May be 0 when init_count members are already
                              %% starting, in which case nothing is added.
                              NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 0),
                              add_members_async(NumToAdd, Pool2);
                          _ ->
                              Pool2
                      end,
              send_metric(Pool, in_use_count, Pool3#pool.in_use_count, histogram),
              send_metric(Pool, free_count, Pool3#pool.free_count, histogram),
              {Pid, Pool3}
      end.
  %% Like take_member_from_pool/2, but may park the caller in a FIFO queue
  %% instead of failing immediately.
  %%
  %% Returns `{Pid, Pool}' when a member was free. Returns
  %% `{error_no_members, Pool}' when the queue already holds `queue_max'
  %% requestors, or when `Timeout' is 0. Otherwise the caller is queued
  %% with a `{requestor_timeout, From}' timer and `{queued, Pool}' is
  %% returned.
  -spec take_member_from_pool_queued(#pool{},
                                     {pid(), _},
                                     non_neg_integer()) ->
                                            {error_no_members | queued | pid(), #pool{}}.
  take_member_from_pool_queued(Pool0 = #pool{queue_max = QMax,
                                             queued_requestors = Requestors},
                               From = {CPid, _},
                               Timeout) when is_pid(CPid) ->
      WaitingCount = queue:len(Requestors),
      case take_member_from_pool(Pool0, From) of
          {Member, NewPool} when is_pid(Member) ->
              {Member, NewPool};
          {error_no_members, Pool1} when WaitingCount >= QMax ->
              send_metric(Pool1, events, error_no_members, history),
              send_metric(Pool1, queue_max_reached, {inc, 1}, counter),
              {error_no_members, Pool1};
          {error_no_members, Pool1} when Timeout =:= 0 ->
              {error_no_members, Pool1};
          {error_no_members, Pool1 = #pool{queued_requestors = Waiting}} ->
              TRef = erlang:send_after(Timeout, self(), {requestor_timeout, From}),
              send_metric(Pool1, queue_count, WaitingCount, histogram),
              {queued, Pool1#pool{queued_requestors = queue:in({From, TRef}, Waiting)}}
      end.
  %% @doc Start `Count' new members asynchronously. Each starter is
  %% recorded with the current timestamp at the front of
  %% `starting_members'. Returns the updated pool record.
  add_members_async(Count, #pool{starting_members = AlreadyStarting} = Pool) ->
      StartedAt = os:timestamp(),
      NewStarters = [{pooler_starter:start_member(Pool), StartedAt}
                     || _ <- lists:seq(1, Count)],
      Pool#pool{starting_members = NewStarters ++ AlreadyStarting}.
  %% Take back member `Pid' from its consumer. Its group-table entry (if
  %% the pool has a group) is removed in both cases.
  %%
  %% Status `ok': returning a member that is already free is logged and
  %% ignored. An in-use member is marked free (timestamp refreshed) and
  %% detached from its consumer. It is then handed to the front of the
  %% requestor queue (the longest-waiting requestor) if anyone is waiting,
  %% otherwise pushed onto the free list. Unknown pids are ignored.
  %%
  %% Status `fail': the member is removed and killed via remove_pid/2, and
  %% one replacement is started asynchronously. Unknown pids are ignored.
  -spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
  do_return_member(Pid, ok, #pool{name = PoolName,
                                  all_members = AllMembers,
                                  queued_requestors = QueuedRequestors} = Pool) ->
      clean_group_table(Pid, Pool),
      case dict:find(Pid, AllMembers) of
          {ok, {_, free, _}} ->
              Fmt = "pool '~s': ignored return of free member ~p",
              error_logger:warning_msg(Fmt, [PoolName, Pid]),
              Pool;
          {ok, {MRef, CPid, _}} ->
              #pool{free_pids = Free, in_use_count = NumInUse,
                    free_count = NumFree} = Pool,
              Pool1 = Pool#pool{in_use_count = NumInUse - 1},
              Entry = {MRef, free, os:timestamp()},
              Pool2 = Pool1#pool{all_members = store_all_members(Pid, Entry, AllMembers),
                                 consumer_to_pid = cpmap_remove(Pid, CPid,
                                                                Pool1#pool.consumer_to_pid)},
              case queue:out(QueuedRequestors) of
                  {empty, _ } ->
                      Pool2#pool{free_pids = [Pid | Free], free_count = NumFree + 1};
                  {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
                      reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool2)
              end;
          error ->
              Pool
      end;
  do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
      % for the fail case, perhaps the member crashed and was alerady
      % removed, so use find instead of fetch and ignore missing.
      clean_group_table(Pid, Pool),
      case dict:find(Pid, AllMembers) of
          {ok, {_MRef, _, _}} ->
              Pool1 = remove_pid(Pid, Pool),
              add_members_async(1, Pool1);
          error ->
              Pool
      end.
  %% Forget which pool `MemberPid' was taken from. Only pools that belong
  %% to a group have entries in the group table; for the others this
  %% returns `ok' without touching the table.
  clean_group_table(MemberPid, #pool{group = Group}) ->
      case Group of
          undefined -> ok;
          _GroupName -> ets:delete(?POOLER_GROUP_TABLE, MemberPid)
      end.
  % @doc Detach member `Pid' from consumer `CPid' in the consumer-to-member
  % map `CPMap'.
  %
  % When `CPid' is `free' the map is returned untouched. When `Pid' was
  % the consumer's last member, the consumer's monitor is removed
  % (flushing any pending 'DOWN') and its entry is erased. An unknown
  % consumer leaves the map unchanged.
  %
  -spec cpmap_remove(pid(), pid() | free, p_dict()) -> p_dict().
  cpmap_remove(_Pid, free, CPMap) ->
      CPMap;
  cpmap_remove(Pid, CPid, CPMap) ->
      case dict:find(CPid, CPMap) of
          error ->
              % FIXME: this shouldn't happen, should we log or error?
              CPMap;
          {ok, {MRef, Members}} ->
              case lists:delete(Pid, Members) of
                  [] ->
                      %% no more members for this consumer
                      erlang:demonitor(MRef, [flush]),
                      dict:erase(CPid, CPMap);
                  Remaining ->
                      dict:store(CPid, {MRef, Remaining}, CPMap)
              end
      end.
  % @doc Remove and kill a pool member.
  %
  % Works for both free and in-use members. In both cases:
  %
  % - the member's monitor is dropped (flushing any pending 'DOWN');
  % - the member is terminated with the pool's `stop_mfa';
  % - a kill counter metric is sent;
  % - the pid is erased from `all_members'.
  %
  % A free member is also taken out of `free_pids' and `free_count'. An
  % in-use member decrements `in_use_count' and is detached from its
  % consumer; the consumer is not notified.
  %
  % A pid missing from `all_members' is reported with `error_logger', an
  % `unknown_pid' history event is sent, and the pool is returned as-is.
  %
  -spec remove_pid(pid(), #pool{}) -> #pool{}.
  remove_pid(Pid, Pool) ->
      #pool{name = Name,
            all_members = Members,
            consumer_to_pid = ConsumerMap,
            stop_mfa = StopMFA} = Pool,
      case dict:find(Pid, Members) of
          {ok, {MonRef, free, _LastReturn}} ->
              erlang:demonitor(MonRef, [flush]),
              Shrunk = Pool#pool{free_pids = lists:delete(Pid, Pool#pool.free_pids),
                                 free_count = Pool#pool.free_count - 1},
              terminate_pid(Name, Pid, StopMFA),
              send_metric(Shrunk, killed_free_count, {inc, 1}, counter),
              Shrunk#pool{all_members = dict:erase(Pid, Members)};
          {ok, {MonRef, Consumer, _LastReturn}} ->
              erlang:demonitor(MonRef, [flush]),
              Shrunk = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
              terminate_pid(Name, Pid, StopMFA),
              send_metric(Shrunk, killed_in_use_count, {inc, 1}, counter),
              Shrunk#pool{consumer_to_pid = cpmap_remove(Pid, Consumer, ConsumerMap),
                          all_members = dict:erase(Pid, Members)};
          error ->
              error_logger:error_report({{pool, Name}, unknown_pid, Pid,
                                         ?GET_STACKTRACE}),
              send_metric(Pool, events, unknown_pid, history),
              Pool
      end.
  %% Record `Val' (a `{MRef, free | ConsumerPid, Timestamp}' triple) as the
  %% bookkeeping entry for member `Pid'. Anything but a 3-tuple is rejected
  %% with `function_clause'.
  -spec store_all_members(pid(),
                          {reference(), free | pid(), {_, _, _}}, p_dict()) -> p_dict().
  store_all_members(Pid, Val, AllMembers) when tuple_size(Val) =:= 3 ->
      dict:store(Pid, Val, AllMembers).
  %% Mark the currently free member `MemberPid' as checked out by consumer
  %% `CPid', keeping its monitor reference and timestamp. The entry must
  %% exist and be `free'; otherwise `dict:update/3' or the update fun crashes.
  -spec set_cpid_for_member(pid(), pid(), p_dict()) -> p_dict().
  set_cpid_for_member(MemberPid, CPid, AllMembers) ->
      Claim = fun({MRef, free, {_, _, _} = Time}) -> {MRef, CPid, Time} end,
      dict:update(MemberPid, Claim, AllMembers).
  %% Prepend `MemberPid' to the member list of consumer `CPid'.
  %% The first time a consumer is seen, a process monitor is created for it
  %% and stored alongside its member list. This side effect is why a plain
  %% `dict:update/4' with an initial value is not used here.
  -spec add_member_to_consumer(pid(), pid(), p_dict()) -> p_dict().
  add_member_to_consumer(MemberPid, CPid, CPMap) ->
      Entry = case dict:find(CPid, CPMap) of
                  {ok, {MRef, Members}} ->
                      {MRef, [MemberPid | Members]};
                  error ->
                      {erlang:monitor(process, CPid), [MemberPid]}
              end,
      dict:store(CPid, Entry, CPMap).
  -spec cull_members_from_pool(#pool{}) -> #pool{}.
  %% Periodic cull: remove free members whose age exceeds `max_age', but
  %% never shrink the pool below `init_count' members in total. Then
  %% schedule the next cull.
  cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->
      %% a zero cull_interval disables culling; no further cull is scheduled
      Pool;
  cull_members_from_pool(#pool{init_count = Fixed, max_count = Fixed} = Pool) ->
      %% init_count == max_count means the pool never grows dynamically, so
      %% there is never anything to cull and no cull is scheduled, whatever
      %% cull_interval says.
      Pool;
  cull_members_from_pool(#pool{name = PoolName,
                               free_count = FreeCount,
                               init_count = InitCount,
                               in_use_count = InUseCount,
                               cull_interval = Delay,
                               max_age = MaxAge,
                               free_pids = FreePids,
                               all_members = AllMembers} = Pool) ->
      %% number of free members that can go while keeping init_count total
      Surplus = FreeCount - (InitCount - InUseCount),
      Culled = if
                   Surplus > 0 ->
                       Candidates = member_info(FreePids, AllMembers),
                       Expired = expired_free_members(Candidates, os:timestamp(), MaxAge),
                       Victims = lists:sublist(Expired, Surplus),
                       lists:foldl(fun({Victim, _Info}, Acc) -> remove_pid(Victim, Acc) end,
                                   Pool, Victims);
                   true ->
                       Pool
               end,
      schedule_cull(PoolName, Delay),
      Culled.
  %% @doc Schedule a pool cleaning or "cull" for `PoolName'. The cull
  %% removes free members older than `max_age' until the pool is back to
  %% `init_count' members.
  %%
  %% The `cull_pool' message is sent with `erlang:send_after/3' after
  %% `Delay', and the timer reference is returned. Such a timer is cancelled
  %% automatically only when its destination is a pid and that process
  %% exits.
  %%
  %% When `PoolName' is a registered-name atom, the timer stays armed. When
  %% it fires, the message goes to whichever process then holds the name,
  %% or is silently dropped if none does. Pass the pool's pid (e.g. `self()'
  %% from inside the pool process) to get automatic cancellation on
  %% shutdown.
  -spec schedule_cull(PoolName :: atom() | pid(),
                      Delay :: time_spec()) -> reference().
  schedule_cull(PoolName, Delay) ->
      DelayMillis = time_as_millis(Delay),
      erlang:send_after(DelayMillis, PoolName, cull_pool).
  %% Pair each pid in `Pids' with its `all_members' entry, keeping the order
  %% of `Pids'. A pid with no entry makes `dict:fetch/2' raise `badarg'.
  -spec member_info([pid()], p_dict()) -> [{pid(), member_info()}].
  member_info(Pids, AllMembers) ->
      lists:map(fun(P) -> {P, dict:fetch(P, AllMembers)} end, Pids).
  %% Keep the `{Pid, Info}' entries whose member is `free' and was last
  %% returned at least `MaxAge' before `Now'. In-use entries are skipped;
  %% input order is preserved.
  -spec expired_free_members(Members :: [{pid(), member_info()}],
                             Now :: {_, _, _},
                             MaxAge :: time_spec()) -> [{pid(), free_member_info()}].
  expired_free_members(Members, Now, MaxAge) ->
      Threshold = time_as_micros(MaxAge),
      lists:filter(fun({_Pid, {_MRef, free, Returned}}) ->
                           timer:now_diff(Now, Returned) >= Threshold;
                      (_Other) ->
                           false
                   end, Members).
  %% Report a metric for `Pool' through its configured metrics module.
  %% Always returns `ok'. Behavior by configuration:
  %% - `metrics_mod = pooler_no_metrics': nothing is sent.
  %% - exometer:
  %%   - `{inc, N}' counters are reported via `update_or_create/4' with `N';
  %%   - `history' metrics are dropped (exometer has no such type);
  %%   - anything else is passed through unchanged.
  %% - folsom: `notify/3' is called with a binary metric name.
  send_metric(#pool{metrics_mod = pooler_no_metrics}, _Label, _Value, _Type) ->
      ok;
  send_metric(#pool{name = Name, metrics_mod = Mod, metrics_api = exometer},
              Label, Value, Type) ->
      case {Value, Type} of
          {{inc, Amount}, counter} ->
              Mod:update_or_create(pool_metric_exometer(Name, Label), Amount, counter, []),
              ok;
          {_, history} ->
              % Exometer does not support 'history' type metrics right now.
              ok;
          _ ->
              Mod:update_or_create(pool_metric_exometer(Name, Label), Value, Type, []),
              ok
      end;
  %% folsom API is the default one.
  send_metric(#pool{name = Name, metrics_mod = Mod, metrics_api = folsom},
              Label, Value, Type) ->
      Mod:notify(pool_metric(Name, Label), Value, Type),
      ok.
  %% Folsom metric name: the binary <<"pooler.PoolName.Metric">>.
  -spec pool_metric(atom(), atom()) -> binary().
  pool_metric(PoolName, Metric) ->
      Pool = atom_to_binary(PoolName, utf8),
      Name = atom_to_binary(Metric, utf8),
      <<"pooler.", Pool/binary, ".", Name/binary>>.
  %% Exometer metric names are lists of binary segments rather than a single
  %% binary: [<<"pooler">>, PoolName, Metric].
  -spec pool_metric_exometer(atom(), atom()) -> nonempty_list(binary()).
  pool_metric_exometer(PoolName, Metric) ->
      [<<"pooler">> | [atom_to_binary(Segment, utf8) || Segment <- [PoolName, Metric]]].
  -spec time_as_secs(time_spec()) -> non_neg_integer().
  %% Convert a `{Time, Unit}' spec to whole seconds (integer division).
  time_as_secs({_Time, _Unit} = Spec) ->
      Micros = time_as_micros(Spec),
      Micros div 1000000.
  -spec time_as_millis(time_spec()) -> non_neg_integer().
  %% @doc Convert a time spec into milliseconds (integer division).
  %% A bare integer is returned unchanged, i.e. it is taken to already be
  %% in milliseconds, so callers can convert blindly.
  time_as_millis(Time) when is_integer(Time) ->
      Time;
  time_as_millis({_Time, _Unit} = Spec) ->
      time_as_micros(Spec) div 1000.
  -spec time_as_micros(time_spec()) -> non_neg_integer().
  %% @doc Convert a `{Time, Unit}' spec into microseconds.
  %% Supported units: `min', `sec', `ms' and `mu' (microseconds).
  time_as_micros({Minutes, min}) ->
      Minutes * 60000000;
  time_as_micros({Seconds, sec}) ->
      Seconds * 1000000;
  time_as_micros({Millis, ms}) ->
      Millis * 1000;
  time_as_micros({Micros, mu}) ->
      Micros.
  %% Whole seconds from the `{Mega, Secs, Micro}' timestamp `From' to `To'.
  %% The microsecond parts are ignored; the result is negative when `To'
  %% precedes `From'.
  secs_between({MegaFrom, SecsFrom, _}, {MegaTo, SecsTo, _}) ->
      MegaDelta = MegaTo - MegaFrom,
      SecsDelta = SecsTo - SecsFrom,
      MegaDelta * 1000000 + SecsDelta.
  %% Turn a `{Outcome, Pool}' pair into a gen_server callback result.
  %% A `queued' requestor gets no reply yet; a member pid or
  %% `error_no_members' is replied immediately.
  -spec maybe_reply({'queued' | 'error_no_members' | pid(), #pool{}}) ->
                           {noreply, #pool{}} | {reply, 'error_no_members' | pid(), #pool{}}.
  maybe_reply({Member, NewPool}) ->
      case Member of
          queued ->
              {noreply, NewPool};
          _ when Member =:= error_no_members; is_pid(Member) ->
              {reply, Member, NewPool}
      end.
  %% Best-effort termination of pool member `Pid'.
  %%
  %% Applies the stop MFA `{Mod, Fun, Args}' after substituting placeholder
  %% atoms in `Args':
  %% - ?POOLER_POOL_NAME becomes the member supervisor name built from
  %%   `PoolName';
  %% - ?POOLER_PID becomes `Pid'.
  %% The MFA's return value is ignored.
  %%
  %% If applying the MFA raises (error, exit or throw), ?DEFAULT_STOP_MFA is
  %% tried once instead. If the default MFA itself raises, the failure is
  %% ignored. Retrying it would recurse forever, e.g. when the member
  %% supervisor is already gone.
  %%
  %% Always returns `ok'.
  -spec terminate_pid(atom(), pid(), {atom(), atom(), [term()]}) -> ok.
  terminate_pid(PoolName, Pid, {Mod, Fun, Args} = StopMFA) when is_list(Args) ->
      NewArgs = replace_placeholders(PoolName, Pid, Args),
      %% try ... of: only the apply itself is protected
      try erlang:apply(Mod, Fun, NewArgs) of
          _Result ->
              ok
      catch
          _Class:_Reason ->
              DefaultMFA = ?DEFAULT_STOP_MFA,
              case StopMFA =:= DefaultMFA of
                  true ->
                      %% the fallback itself failed: give up rather than
                      %% retrying the same MFA in an endless loop
                      ok;
                  false ->
                      terminate_pid(PoolName, Pid, DefaultMFA)
              end
      end.
  %% Substitute the placeholder atoms in a stop-MFA argument list:
  %% - ?POOLER_POOL_NAME becomes the member supervisor name for `Name'
  %%   (via `pooler_pool_sup:build_member_sup_name/1');
  %% - ?POOLER_PID becomes `Pid';
  %% - every other argument passes through unchanged.
  replace_placeholders(Name, Pid, Args) ->
      Substitute = fun(?POOLER_POOL_NAME) ->
                           pooler_pool_sup:build_member_sup_name(Name);
                      (?POOLER_PID) ->
                           Pid;
                      (Other) ->
                           Other
                   end,
      lists:map(Substitute, Args).
  %% Snapshot of the pool's capacity figures as a proplist:
  %% `max_count', `in_use_count', `free_count', `queued_count' (the number
  %% of waiting requestors) and `queue_max'.
  compute_utilization(#pool{max_count = MaxCount,
                            in_use_count = InUseCount,
                            free_count = FreeCount,
                            queued_requestors = Queue,
                            queue_max = QueueMax}) ->
      [{max_count, MaxCount},
       {in_use_count, InUseCount},
       {free_count, FreeCount},
       %% Note: queue:len/1 is O(n) in the queue length, so this can be
       %% expensive when very many requestors are queued.
       {queued_count, queue:len(Queue)},
       {queue_max, QueueMax}].
  %% Apply `Fun' to every pid in `Pids' via do_call_free_member/2, returning
  %% one `{ok, Result} | {error, Reason}' per pid in the same order.
  do_call_free_members(Fun, Pids) ->
      lists:map(fun(P) -> do_call_free_member(Fun, P) end, Pids).
  %% Call `Fun(Pid)' and wrap the outcome as `{ok, Result}'. Any raised
  %% exception (error, exit or throw) becomes `{error, Reason}', so one
  %% failing member does not abort the whole batch.
  do_call_free_member(Fun, Pid) ->
      try Fun(Pid) of
          Result ->
              {ok, Result}
      catch
          _Class:Reason ->
              {error, Reason}
      end.