%% pooler.erl
  1. %% @author Seth Falcon <seth@userprimary.net>
  2. %% @copyright 2011-2013 Seth Falcon
  3. %% @doc This is the main interface to the pooler application
  4. %%
  5. %% To integrate with your application, you probably want to call
  6. %% application:start(pooler) after having specified appropriate
  7. %% configuration for the pooler application (either via a config file
  8. %% or appropriate calls to the application module to set the
  9. %% application's config).
  10. %%
  11. -module(pooler).
  12. -behaviour(gen_server).
  13. -include("pooler.hrl").
  14. -include_lib("eunit/include/eunit.hrl").
  15. %% type specs for pool metrics
  16. -type metric_value() :: 'unknown_pid' |
  17. non_neg_integer() |
  18. {'add_pids_failed', non_neg_integer(), non_neg_integer()} |
  19. {'inc',1} |
  20. 'error_no_members'.
  21. -type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
  22. %% ------------------------------------------------------------------
  23. %% API Function Exports
  24. %% ------------------------------------------------------------------
  25. -export([accept_member/2,
  26. start_link/1,
  27. take_member/1,
  28. take_group_member/1,
  29. return_group_member/2,
  30. return_group_member/3,
  31. return_member/2,
  32. return_member/3,
  33. pool_stats/1,
  34. manual_start/0,
  35. new_pool/1,
  36. pool_child_spec/1,
  37. rm_pool/1,
  38. rm_group/1]).
  39. %% ------------------------------------------------------------------
  40. %% gen_server Function Exports
  41. %% ------------------------------------------------------------------
  42. -export([init/1,
  43. handle_call/3,
  44. handle_cast/2,
  45. handle_info/2,
  46. terminate/2,
  47. code_change/3]).
  48. %% To help with testing internal functions
  49. -ifdef(TEST).
  50. -compile([export_all]).
  51. -endif.
  52. %% ------------------------------------------------------------------
  53. %% API Function Definitions
  54. %% ------------------------------------------------------------------
%% @doc Start the pool's gen_server, registered locally under the
%% pool's name atom so callers can address it by name or pid.
start_link(#pool{name = Name} = Pool) ->
    gen_server:start_link({local, Name}, ?MODULE, Pool, []).
%% @doc Convenience for starting the pooler application (and sasl)
%% from a shell. Return values of application:start/1 are ignored, so
%% already-started applications are not treated as errors.
manual_start() ->
    application:start(sasl),
    application:start(pooler).
%% @doc Start a new pool described by the proplist `PoolConfig'. The
%% following keys are required in the proplist:
%%
%% <dl>
%% <dt>`name'</dt>
%% <dd>An atom giving the name of the pool.</dd>
%% <dt>`init_count'</dt>
%% <dd>Number of members to add to the pool at start. When the pool is
%% started, `init_count' members will be started in parallel.</dd>
%% <dt>`max_count'</dt>
%% <dd>Maximum number of members in the pool.</dd>
%% <dt>`start_mfa'</dt>
%% <dd>A tuple of the form `{Mod, Fun, Args}' describing how to start
%% new pool members.</dd>
%% </dl>
%%
%% In addition, you can specify any of the following optional
%% configuration options:
%%
%% <dl>
%% <dt>`group'</dt>
%% <dd>An atom giving the name of the group this pool belongs
%% to. Pools sharing a common `group' value can be accessed using
%% {@link take_group_member/1} and {@link return_group_member/2}.</dd>
%% <dt>`cull_interval'</dt>
%% <dd>Time between checks for stale pool members. Specified as
%% `{Time, Unit}' where `Time' is a non-negative integer and `Unit' is
%% one of `min', `sec', `ms', or `mu'. The default value of `{1, min}'
%% triggers a once per minute check to remove members that have not
%% been accessed in `max_age' time units. Culling can be disabled by
%% specifying a zero time value (e.g. `{0, min}'). Culling will also be
%% disabled if `init_count' is the same as `max_count'.</dd>
%% <dt>`max_age'</dt>
%% <dd>Members idle longer than `max_age' time units are removed from
%% the pool when stale checking is enabled via
%% `cull_interval'. Culling of idle members will never reduce the pool
%% below `init_count'. The value is specified as `{Time, Unit}'. Note
%% that timers are not set on individual pool members and may remain
%% in the pool beyond the configured `max_age' value since members are
%% only removed on the interval configured via `cull_interval'. The
%% default value is `{30, sec}'.</dd>
%% <dt>`member_start_timeout'</dt>
%% <dd>Time limit for member starts. Specified as `{Time,
%% Unit}'. Defaults to `{1, min}'.</dd>
%% </dl>
new_pool(PoolConfig) ->
    pooler_sup:new_pool(PoolConfig).
%% @doc Terminate the named pool. Delegates to the pooler supervisor,
%% which owns the pool's child spec.
rm_pool(PoolName) ->
    pooler_sup:rm_pool(PoolName).
%% @doc Terminates the group and all pools in that group.
%%
%% If termination of any member pool fails, `rm_group/1` returns
%% `{error, {failed_rm_pools, Pools}}`, where `Pools` is a list
%% of pools that failed to terminate.
%%
%% The group is NOT terminated if any member pool did not
%% successfully terminate.
%%
-spec rm_group(atom()) -> ok | {error, {failed_rm_pools, [atom()]}}.
rm_group(GroupName) ->
    case pg2:get_local_members(GroupName) of
        {error, {no_such_group, GroupName}} ->
            %% unknown group: nothing to remove, treat as success
            ok;
        Pools ->
            case rm_group_members(Pools) of
                [] ->
                    %% every pool removed; now drop the pg2 group itself
                    pg2:delete(GroupName);
                Failures ->
                    {error, {failed_rm_pools, Failures}}
            end
    end.
%% Attempt to remove each pool whose gen_server pid appears in
%% `MemberPids'. Returns the names of pools whose removal failed
%% ([] means full success). The pool name is recovered by asking each
%% pool server to dump its #pool{} state (the `dump_pool' call).
-spec rm_group_members([pid()]) -> [atom()].
rm_group_members(MemberPids) ->
    lists:foldl(
      fun(MemberPid, Acc) ->
              Pool = gen_server:call(MemberPid, dump_pool),
              PoolName = Pool#pool.name,
              case pooler_sup:rm_pool(PoolName) of
                  ok -> Acc;
                  _ -> [PoolName | Acc]
              end
      end,
      [],
      MemberPids).
%% @doc Get child spec described by the proplist `PoolConfig'.
%%
%% See {@link pooler:new_pool/1} for info about `PoolConfig'.
-spec pool_child_spec([{atom(), term()}]) -> supervisor:child_spec().
pool_child_spec(PoolConfig) ->
    pooler_sup:pool_child_spec(PoolConfig).
%% @doc For INTERNAL use. Adds `MemberPid' to the pool.
%% Called by pooler_starter with either the started pid or a
%% `{noproc, _}' failure; the pool's handle_call routes this to
%% do_accept_member/2.
-spec accept_member(atom() | pid(), pid() | {noproc, _}) -> ok.
accept_member(PoolName, MemberPid) ->
    gen_server:call(PoolName, {accept_member, MemberPid}).
%% @doc Obtain exclusive access to a member from `PoolName'.
%%
%% If no free members are available, 'error_no_members' is returned.
%%
%% Note: the call uses an `infinity' timeout; the pool server replies
%% immediately (it never blocks waiting for a member).
-spec take_member(atom() | pid()) -> pid() | error_no_members.
take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
    gen_server:call(PoolName, take_member, infinity).
  162. %% @doc Take a member from a randomly selected member of the group
  163. %% `GroupName'. Returns `MemberPid' or `error_no_members'. If no
  164. %% members are available in the randomly chosen pool, all other pools
  165. %% in the group are tried in order.
  166. -spec take_group_member(atom()) -> pid() | error_no_members | {error_no_group, atom()}.
  167. take_group_member(GroupName) ->
  168. case pg2:get_local_members(GroupName) of
  169. {error, {no_such_group, GroupName}} ->
  170. {error_no_group, GroupName};
  171. [] ->
  172. error_no_members;
  173. Pools ->
  174. %% Put a random member at the front of the list and then
  175. %% return the first member you can walking the list.
  176. {_, _, X} = erlang:now(),
  177. Idx = (X rem length(Pools)) + 1,
  178. {PoolPid, Rest} = extract_nth(Idx, Pools),
  179. take_first_pool([PoolPid | Rest])
  180. end.
%% Walk the candidate pool list and return the first member obtained.
%% A successful take is recorded in ?POOLER_GROUP_TABLE so that
%% return_group_member/3 can later map the member pid back to the pool
%% it came from.
take_first_pool([PoolPid | Rest]) ->
    case take_member(PoolPid) of
        error_no_members ->
            %% this pool is exhausted; fall through to the next one
            take_first_pool(Rest);
        Member ->
            ets:insert(?POOLER_GROUP_TABLE, {Member, PoolPid}),
            Member
    end;
take_first_pool([]) ->
    error_no_members.
  191. %% this helper function returns `{Nth_Elt, Rest}' where `Nth_Elt' is
  192. %% the nth element of `L' and `Rest' is `L -- [Nth_Elt]'.
  193. extract_nth(N, L) ->
  194. extract_nth(N, L, []).
  195. extract_nth(1, [H | T], Acc) ->
  196. {H, Acc ++ T};
  197. extract_nth(N, [H | T], Acc) ->
  198. extract_nth(N - 1, T, [H | Acc]);
  199. extract_nth(_, [], _) ->
  200. error(badarg).
%% @doc Return a member that was taken from the group
%% `GroupName'. This is a convenience function for
%% `return_group_member/3' with `Status' of `ok'.
-spec return_group_member(atom(), pid() | error_no_members) -> ok.
return_group_member(GroupName, MemberPid) ->
    return_group_member(GroupName, MemberPid, ok).
%% @doc Return a member that was taken from the group `GroupName'. If
%% `Status' is `ok' the member is returned to the pool from which it
%% came. If `Status' is `fail' the member will be terminated and a new
%% member added to the appropriate pool.
-spec return_group_member(atom(), pid() | error_no_members, ok | fail) -> ok.
return_group_member(_, error_no_members, _) ->
    %% accept the error token from take_group_member/1 as a no-op
    ok;
return_group_member(_GroupName, MemberPid, Status) ->
    %% the member -> pool mapping was recorded by take_first_pool/1
    case ets:lookup(?POOLER_GROUP_TABLE, MemberPid) of
        [{MemberPid, PoolPid}] ->
            return_member(PoolPid, MemberPid, Status);
        [] ->
            %% unknown member pid; nothing to return
            ok
    end.
%% @doc Return a member to the pool so it can be reused.
%%
%% If `Status' is 'ok', the member is returned to the pool. If
%% `Status' is 'fail', the member is destroyed and a new member is
%% added to the pool in its place.
-spec return_member(atom() | pid(), pid() | error_no_members, ok | fail) -> ok.
return_member(PoolName, Pid, Status) when is_pid(Pid) andalso
                                          (is_atom(PoolName) orelse
                                           is_pid(PoolName)) andalso
                                          (Status =:= ok orelse
                                           Status =:= fail) ->
    %% synchronous call so the pool state is updated before we return
    gen_server:call(PoolName, {return_member, Pid, Status}, infinity),
    ok;
return_member(_, error_no_members, _) ->
    %% accept take_member/1's error token as a no-op
    ok.
%% @doc Return a member to the pool so it can be reused.
%%
%% Equivalent to `return_member(PoolName, Pid, ok)'.
-spec return_member(atom() | pid(), pid() | error_no_members) -> ok.
return_member(PoolName, Pid) when is_pid(Pid) andalso
                                  (is_atom(PoolName) orelse is_pid(PoolName)) ->
    gen_server:call(PoolName, {return_member, Pid, ok}, infinity),
    ok;
return_member(_, error_no_members) ->
    ok.
%% @doc Obtain runtime state info for all pools.
%%
%% Format of the return value is subject to change.
-spec pool_stats(atom() | pid()) -> [tuple()].
pool_stats(PoolName) ->
    gen_server:call(PoolName, pool_stats).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

%% Initialize the pool: resolve the member supervisor name, schedule
%% culling (if configured), and synchronously start `init_count'
%% members before the server begins serving requests.
-spec init(#pool{}) -> {'ok', #pool{}, 0}.
init(#pool{}=Pool) ->
    #pool{init_count = N} = Pool,
    MemberSup = pooler_pool_sup:member_sup_name(Pool),
    Pool1 = set_member_sup(Pool, MemberSup),
    %% This schedules the next cull when the pool is configured for
    %% such and is otherwise a no-op.
    Pool2 = cull_members_from_pool(Pool1),
    {ok, NewPool} = init_members_sync(N, Pool2),
    %% trigger an immediate timeout, handled by handle_info to allow
    %% us to register with pg2. We use the timeout mechanism to ensure
    %% that a server is added to a group only when it is ready to
    %% process messages.
    {ok, NewPool, 0}.
%% Record the member supervisor's registered name in the pool state.
set_member_sup(#pool{} = Pool, MemberSup) ->
    Pool#pool{member_sup = MemberSup}.
%% gen_server call dispatch. `take_member' uses the caller's pid
%% (`CPid') so the pool can monitor the consumer; `dump_pool' exposes
%% the raw #pool{} record (used by rm_group_members/1).
handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
    {Member, NewPool} = take_member_from_pool(Pool, CPid),
    {reply, Member, NewPool};
handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
    {reply, ok, do_return_member(Pid, Status, Pool)};
handle_call({accept_member, Pid}, _From, Pool) ->
    {reply, ok, do_accept_member(Pid, Pool)};
handle_call(stop, _From, Pool) ->
    {stop, normal, stop_ok, Pool};
handle_call(pool_stats, _From, Pool) ->
    {reply, dict:to_list(Pool#pool.all_members), Pool};
handle_call(dump_pool, _From, Pool) ->
    {reply, Pool, Pool};
handle_call(_Request, _From, Pool) ->
    %% NOTE(review): {noreply, _} on the catch-all means an unknown
    %% call is never answered, so the caller blocks until its call
    %% timeout — confirm this is intentional.
    {noreply, Pool}.
-spec handle_cast(_,_) -> {'noreply', _}.
%% No casts are part of the protocol; drain and ignore.
handle_cast(_Msg, Pool) ->
    {noreply, Pool}.
-spec handle_info(_, _) -> {'noreply', _}.
%% The zero timeout set in init/1 arrives here: join the pg2 group
%% (if any) now that the server can process messages.
handle_info(timeout, #pool{group = undefined} = Pool) ->
    %% ignore
    {noreply, Pool};
handle_info(timeout, #pool{group = Group} = Pool) ->
    ok = pg2:create(Group),
    ok = pg2:join(Group, self()),
    {noreply, Pool};
%% A monitored process died: either a pool member (recycle it via a
%% `fail' return) or a consumer (return every member it held; a
%% normal consumer exit returns them as `ok', a crash as `fail').
handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
    State1 =
        case dict:find(Pid, State#pool.all_members) of
            {ok, {_PoolName, _ConsumerPid, _Time}} ->
                do_return_member(Pid, fail, State);
            error ->
                case dict:find(Pid, State#pool.consumer_to_pid) of
                    {ok, {MRef, Pids}} ->
                        IsOk = case Reason of
                                   normal -> ok;
                                   _Crash -> fail
                               end,
                        lists:foldl(
                          fun(P, S) -> do_return_member(P, IsOk, S) end,
                          State, Pids);
                    error ->
                        State
                end
        end,
    {noreply, State1};
%% periodic cull timer (see schedule_cull/2)
handle_info(cull_pool, Pool) ->
    {noreply, cull_members_from_pool(Pool)};
handle_info(_Info, State) ->
    {noreply, State}.
-spec terminate(_, _) -> 'ok'.
%% No explicit cleanup; members are supervised elsewhere.
terminate(_Reason, _State) ->
    ok.
-spec code_change(_, _, _) -> {'ok', _}.
%% State record is carried through unchanged on upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

%% Handle a start result delivered by a pooler_starter. A `{Ref, Pid}'
%% tuple is a successful start; any other second element is a start
%% failure reason (second clause). Successful members are monitored
%% and appended to the free list.
do_accept_member({Ref, Pid},
                 #pool{
                    all_members = AllMembers,
                    free_pids = Free,
                    free_count = NumFree,
                    starting_members = StartingMembers0,
                    member_start_timeout = StartTimeout
                   } = Pool) when is_pid(Pid) ->
    %% make sure we don't accept a timedout member
    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
                                                    StartTimeout),
    case lists:keymember(Ref, 1, StartingMembers) of
        false ->
            %% a pid we didn't ask to start, ignore it.
            %% should we log it?
            Pool;
        true ->
            StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
            %% monitor so the member's death is seen in handle_info
            MRef = erlang:monitor(process, Pid),
            Entry = {MRef, free, os:timestamp()},
            AllMembers1 = store_all_members(Pid, Entry, AllMembers),
            Pool#pool{free_pids = Free ++ [Pid],
                      free_count = NumFree + 1,
                      all_members = AllMembers1,
                      starting_members = StartingMembers1}
    end;
do_accept_member({Ref, _Reason}, #pool{starting_members = StartingMembers0,
                                       member_start_timeout = StartTimeout} = Pool) ->
    %% member start failed, remove in-flight ref and carry on.
    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
                                                    StartTimeout),
    StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
    Pool#pool{starting_members = StartingMembers1}.
  362. -spec remove_stale_starting_members(#pool{}, [{reference(), erlang:timestamp()}],
  363. time_spec()) -> [{reference(), erlang:timestamp()}].
  364. remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
  365. Now = os:timestamp(),
  366. MaxAgeSecs = time_as_secs(MaxAge),
  367. lists:filter(fun(SM) ->
  368. starting_member_not_stale(Pool, Now, SM, MaxAgeSecs)
  369. end, StartingMembers).
%% Predicate used when filtering starting_members: true while the
%% start is younger than `MaxAgeSecs'. A stale start is logged and
%% counted via the starting_member_timeout metric before being
%% filtered out.
starting_member_not_stale(Pool, Now, {_Ref, StartTime}, MaxAgeSecs) ->
    case secs_between(StartTime, Now) < MaxAgeSecs of
        true ->
            true;
        false ->
            error_logger:error_msg("pool '~s': starting member timeout", [Pool#pool.name]),
            send_metric(Pool, starting_member_timeout, {inc, 1}, counter),
            false
    end.
%% Start `N' members in parallel and block until all have reported in
%% (or the member_start_timeout elapses, in which case init fails with
%% `{timeout, ...}' and the pool does not come up).
init_members_sync(N, #pool{name = PoolName} = Pool) ->
    Self = self(),
    StartTime = os:timestamp(),
    %% each starter will send {accept_member, ...} back to Self
    StartRefs = [ {pooler_starter:start_member(Pool, Self), StartTime}
                  || _I <- lists:seq(1, N) ],
    Pool1 = Pool#pool{starting_members = StartRefs},
    case collect_init_members(Pool1) of
        timeout ->
            error_logger:error_msg("pool '~s': exceeded timeout waiting for ~B members",
                                   [PoolName, Pool1#pool.init_count]),
            error({timeout, "unable to start members"});
        #pool{} = Pool2 ->
            {ok, Pool2}
    end.
%% Receive start results directly (init/1 runs before the gen_server
%% loop, so we consume the starters' messages here) until no starts
%% remain in flight. Returns the updated pool, or `timeout' if a
%% result does not arrive within member_start_timeout.
collect_init_members(#pool{starting_members = []} = Pool) ->
    Pool;
collect_init_members(#pool{member_start_timeout = StartTimeout} = Pool) ->
    Timeout = time_as_millis(StartTimeout),
    receive
        {accept_member, {Ref, Member}} ->
            collect_init_members(do_accept_member({Ref, Member}, Pool))
    after
        Timeout ->
            timeout
    end.
-spec take_member_from_pool(#pool{}, {pid(), term()}) ->
                                   {error_no_members | pid(), #pool{}}.
%% Core take logic. Hands out the head of the free list when one is
%% available; otherwise returns `error_no_members' immediately,
%% kicking off an async batch of new member starts if capacity
%% remains. `From' is the consumer pid, which becomes monitored via
%% add_member_to_consumer/3.
take_member_from_pool(#pool{init_count = InitCount,
                            max_count = Max,
                            free_pids = Free,
                            in_use_count = NumInUse,
                            free_count = NumFree,
                            consumer_to_pid = CPMap,
                            starting_members = StartingMembers0,
                            member_start_timeout = StartTimeout} = Pool,
                      From) ->
    send_metric(Pool, take_rate, 1, meter),
    StartingMembers = remove_stale_starting_members(Pool, StartingMembers0,
                                                    StartTimeout),
    %% in-flight starts count against max_count to avoid overshooting
    NumCanAdd = Max - (NumInUse + NumFree + length(StartingMembers)),
    case Free of
        [] when NumCanAdd =< 0 ->
            send_metric(Pool, error_no_members_count, {inc, 1}, counter),
            send_metric(Pool, events, error_no_members, history),
            {error_no_members, Pool};
        [] when NumCanAdd > 0 ->
            %% Limit concurrently starting members to init_count. Add
            %% up to init_count members. Starting members here means
            %% we always return an error_no_members for a take request
            %% when all members are in-use. By adding a batch of new
            %% members, the pool should reach a steady state with
            %% unused members culled over time (if scheduled cull is
            %% enabled).
            NumToAdd = min(InitCount - length(StartingMembers), NumCanAdd),
            Pool1 = add_members_async(NumToAdd, Pool),
            send_metric(Pool, error_no_members_count, {inc, 1}, counter),
            send_metric(Pool, events, error_no_members, history),
            {error_no_members, Pool1};
        [Pid|Rest] ->
            Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
                              free_count = NumFree - 1},
            send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
            send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
            {Pid, Pool1#pool{
                    consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
                    all_members = set_cpid_for_member(Pid, From,
                                                      Pool1#pool.all_members)
                   }}
    end.
%% @doc Add `Count' members to `Pool' asynchronously. Returns updated
%% `Pool' record with starting member refs added to field
%% `starting_members'. Results arrive later as `accept_member' calls.
add_members_async(Count, #pool{starting_members = StartingMembers} = Pool) ->
    StartTime = os:timestamp(),
    StartRefs = [ {pooler_starter:start_member(Pool), StartTime}
                  || _I <- lists:seq(1, Count) ],
    Pool#pool{starting_members = StartRefs ++ StartingMembers}.
-spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
%% Put a healthy member back on the free list with a fresh last-used
%% timestamp, detaching it from its consumer. Unknown pids are
%% ignored.
do_return_member(Pid, ok, #pool{all_members = AllMembers} = Pool) ->
    clean_group_table(Pid, Pool),
    case dict:find(Pid, AllMembers) of
        {ok, {MRef, CPid, _}} ->
            #pool{free_pids = Free, in_use_count = NumInUse,
                  free_count = NumFree} = Pool,
            Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1,
                              free_count = NumFree + 1},
            Entry = {MRef, free, os:timestamp()},
            Pool1#pool{all_members = store_all_members(Pid, Entry, AllMembers),
                       consumer_to_pid = cpmap_remove(Pid, CPid,
                                                      Pool1#pool.consumer_to_pid)};
        error ->
            Pool
    end;
%% A failed member is killed and replaced by a single async start.
do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
    % for the fail case, perhaps the member crashed and was already
    % removed, so use find instead of fetch and ignore missing.
    clean_group_table(Pid, Pool),
    case dict:find(Pid, AllMembers) of
        {ok, {_MRef, _, _}} ->
            Pool1 = remove_pid(Pid, Pool),
            add_members_async(1, Pool1);
        error ->
            Pool
    end.
%% Remove the member's group-table entry (written by take_first_pool/1)
%% so stale member -> pool mappings don't accumulate. No-op for pools
%% that are not in a group.
clean_group_table(_MemberPid, #pool{group = undefined}) ->
    ok;
clean_group_table(MemberPid, #pool{group = _GroupName}) ->
    ets:delete(?POOLER_GROUP_TABLE, MemberPid).
  487. % @doc Remove `Pid' from the pid list associated with `CPid' in the
  488. % consumer to member map given by `CPMap'.
  489. %
  490. % If `Pid' is the last element in `CPid's pid list, then the `CPid'
  491. % entry is removed entirely.
  492. %
  493. -spec cpmap_remove(pid(), pid() | free, p_dict()) -> p_dict().
  494. cpmap_remove(_Pid, free, CPMap) ->
  495. CPMap;
  496. cpmap_remove(Pid, CPid, CPMap) ->
  497. case dict:find(CPid, CPMap) of
  498. {ok, {MRef, Pids0}} ->
  499. Pids1 = lists:delete(Pid, Pids0),
  500. case Pids1 of
  501. [_H|_T] ->
  502. dict:store(CPid, {MRef, Pids1}, CPMap);
  503. [] ->
  504. %% no more members for this consumer
  505. erlang:demonitor(MRef, [flush]),
  506. dict:erase(CPid, CPMap)
  507. end;
  508. error ->
  509. % FIXME: this shouldn't happen, should we log or error?
  510. CPMap
  511. end.
  512. % @doc Remove and kill a pool member.
  513. %
  514. % Handles in-use and free members. Logs an error if the pid is not
  515. % tracked in state.all_members.
  516. %
  517. -spec remove_pid(pid(), #pool{}) -> #pool{}.
  518. remove_pid(Pid, Pool) ->
  519. #pool{name = PoolName,
  520. all_members = AllMembers,
  521. consumer_to_pid = CPMap} = Pool,
  522. case dict:find(Pid, AllMembers) of
  523. {ok, {MRef, free, _Time}} ->
  524. % remove an unused member
  525. erlang:demonitor(MRef, [flush]),
  526. FreePids = lists:delete(Pid, Pool#pool.free_pids),
  527. NumFree = Pool#pool.free_count - 1,
  528. Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
  529. exit(Pid, kill),
  530. send_metric(Pool1, killed_free_count, {inc, 1}, counter),
  531. Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
  532. {ok, {MRef, CPid, _Time}} ->
  533. %% remove a member being consumed. No notice is sent to
  534. %% the consumer.
  535. erlang:demonitor(MRef, [flush]),
  536. Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
  537. exit(Pid, kill),
  538. send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
  539. Pool1#pool{consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
  540. all_members = dict:erase(Pid, AllMembers)};
  541. error ->
  542. error_logger:error_report({{pool, PoolName}, unknown_pid, Pid,
  543. erlang:get_stacktrace()}),
  544. send_metric(Pool, events, unknown_pid, history),
  545. Pool
  546. end.
-spec store_all_members(pid(),
                        {reference(), free | pid(), {_, _, _}}, p_dict()) -> p_dict().
%% Insert or overwrite the tracking entry for a member pid. The entry
%% is {MonitorRef, free | ConsumerPid, LastTouchedTimestamp}.
store_all_members(Pid, Val = {_MRef, _CPid, _Time}, AllMembers) ->
    dict:store(Pid, Val, AllMembers).
-spec set_cpid_for_member(pid(), pid(), p_dict()) -> p_dict().
%% Mark a member as in-use by consumer `CPid'. The update fun only
%% matches a `free' entry, so handing out a member that is already
%% in use (or untracked) crashes the pool server — intentional.
set_cpid_for_member(MemberPid, CPid, AllMembers) ->
    dict:update(MemberPid,
                fun({MRef, free, Time = {_, _, _}}) ->
                        {MRef, CPid, Time}
                end, AllMembers).
-spec add_member_to_consumer(pid(), pid(), p_dict()) -> p_dict().
%% Record that `CPid' holds `MemberPid'. A consumer is monitored the
%% first time it takes a member; the monitor lives until its last
%% member is returned (see cpmap_remove/3).
add_member_to_consumer(MemberPid, CPid, CPMap) ->
    %% we can't use dict:update here because we need to create the
    %% monitor if we aren't already tracking this consumer.
    case dict:find(CPid, CPMap) of
        {ok, {MRef, MList}} ->
            dict:store(CPid, {MRef, [MemberPid | MList]}, CPMap);
        error ->
            MRef = erlang:monitor(process, CPid),
            dict:store(CPid, {MRef, [MemberPid]}, CPMap)
    end.
-spec cull_members_from_pool(#pool{}) -> #pool{}.
%% Remove expired free members (never dropping below init_count) and
%% reschedule the next cull. The first two clauses disable culling
%% entirely.
cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->
    %% 0 cull_interval means do not cull
    Pool;
cull_members_from_pool(#pool{init_count = C, max_count = C} = Pool) ->
    %% if init_count matches max_count, then we will not dynamically
    %% add capacity and should not schedule culling regardless of
    %% cull_interval config.
    Pool;
cull_members_from_pool(#pool{name = PoolName,
                             free_count = FreeCount,
                             init_count = InitCount,
                             in_use_count = InUseCount,
                             cull_interval = Delay,
                             max_age = MaxAge,
                             all_members = AllMembers} = Pool) ->
    %% never cull below init_count total members (free + in-use)
    MaxCull = FreeCount - (InitCount - InUseCount),
    Pool1 = case MaxCull > 0 of
                true ->
                    MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
                    ExpiredMembers =
                        expired_free_members(MemberInfo, os:timestamp(), MaxAge),
                    CullList = lists:sublist(ExpiredMembers, MaxCull),
                    lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
                                Pool, CullList);
                false ->
                    Pool
            end,
    schedule_cull(PoolName, Delay),
    Pool1.
-spec schedule_cull(PoolName :: atom() | pid(),
                    Delay :: time_spec()) -> reference().
%% @doc Schedule a pool cleaning or "cull" for `PoolName' in which
%% members older than `max_age' will be removed until the pool has
%% `init_count' members. Uses `erlang:send_after/3' for light-weight
%% timer that will be auto-cancelled upon pooler shutdown.
schedule_cull(PoolName, Delay) ->
    DelayMillis = time_as_millis(Delay),
    %% use pid instead of server name atom to take advantage of
    %% automatic cancelling
    erlang:send_after(DelayMillis, PoolName, cull_pool).
  609. -spec member_info([pid()], p_dict()) -> [{pid(), member_info()}].
  610. member_info(Pids, AllMembers) ->
  611. [ {P, dict:fetch(P, AllMembers)} || P <- Pids ].
  612. -spec expired_free_members(Members :: [{pid(), member_info()}],
  613. Now :: {_, _, _},
  614. MaxAge :: time_spec()) -> [{pid(), free_member_info()}].
  615. expired_free_members(Members, Now, MaxAge) ->
  616. MaxMicros = time_as_micros(MaxAge),
  617. [ MI || MI = {_, {_, free, LastReturn}} <- Members,
  618. timer:now_diff(Now, LastReturn) >= MaxMicros ].
%% Send a metric using the metrics module from application config or
%% do nothing. `pooler_no_metrics' is the configured sentinel for
%% "metrics disabled"; otherwise the module is called dynamically as
%% MetricsMod:notify/3.
-spec send_metric(Pool :: #pool{},
                  Label :: atom(),
                  Value :: metric_value(),
                  Type :: metric_type()) -> ok.
send_metric(#pool{metrics_mod = pooler_no_metrics}, _Label, _Value, _Type) ->
    ok;
send_metric(#pool{name = PoolName, metrics_mod = MetricsMod}, Label, Value, Type) ->
    MetricName = pool_metric(PoolName, Label),
    MetricsMod:notify(MetricName, Value, Type),
    ok.
  631. -spec pool_metric(atom(), atom()) -> binary().
  632. pool_metric(PoolName, Metric) ->
  633. iolist_to_binary([<<"pooler.">>, atom_to_binary(PoolName, utf8),
  634. ".", atom_to_binary(Metric, utf8)]).
-spec time_as_secs(time_spec()) -> non_neg_integer().
%% @doc Convert time unit into seconds (truncating).
time_as_secs({Time, Unit}) ->
    time_as_micros({Time, Unit}) div 1000000.
-spec time_as_millis(time_spec()) -> non_neg_integer().
%% @doc Convert time unit into milliseconds (truncating).
time_as_millis({Time, Unit}) ->
    time_as_micros({Time, Unit}) div 1000.
  642. -spec time_as_micros(time_spec()) -> non_neg_integer().
  643. %% @doc Convert time unit into microseconds
  644. time_as_micros({Time, min}) ->
  645. 60 * 1000 * 1000 * Time;
  646. time_as_micros({Time, sec}) ->
  647. 1000 * 1000 * Time;
  648. time_as_micros({Time, ms}) ->
  649. 1000 * Time;
  650. time_as_micros({Time, mu}) ->
  651. Time.
  652. secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) ->
  653. (Mega2 - Mega1) * 1000000 + (Secs2 - Secs1).