pooler.erl 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
  1. %% @author Seth Falcon <seth@userprimary.net>
  2. %% @copyright 2011-2012 Seth Falcon
  3. %% @doc This is the main interface to the pooler application
  4. %%
  5. %% To integrate with your application, you probably want to call
  6. %% application:start(pooler) after having specified appropriate
  7. %% configuration for the pooler application (either via a config file
  8. %% or appropriate calls to the application module to set the
  9. %% application's config).
  10. %%
  11. -module(pooler).
  12. -behaviour(gen_server).
  13. -include("pooler.hrl").
  14. -include_lib("eunit/include/eunit.hrl").
  15. %% type specs for pool metrics
  16. -type metric_value() :: 'unknown_pid' |
  17. non_neg_integer() |
  18. {'add_pids_failed', non_neg_integer(), non_neg_integer()} |
  19. {'inc',1} |
  20. 'error_no_members'.
  21. -type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
  22. %% ------------------------------------------------------------------
  23. %% API Function Exports
  24. %% ------------------------------------------------------------------
  25. -export([accept_member/2,
  26. start_link/1,
  27. take_member/1,
  28. take_group_member/1,
  29. return_group_member/2,
  30. return_group_member/3,
  31. return_member/2,
  32. return_member/3,
  33. pool_stats/1]).
  34. %% ------------------------------------------------------------------
  35. %% gen_server Function Exports
  36. %% ------------------------------------------------------------------
  37. -export([init/1,
  38. handle_call/3,
  39. handle_cast/2,
  40. handle_info/2,
  41. terminate/2,
  42. code_change/3]).
  43. %% To help with testing internal functions
  44. -ifdef(TEST).
  45. -compile([export_all]).
  46. -endif.
  47. %% ------------------------------------------------------------------
  48. %% API Function Definitions
  49. %% ------------------------------------------------------------------
  50. start_link(#pool{name = Name} = Pool) ->
  51. gen_server:start_link({local, Name}, ?MODULE, Pool, []).
  52. %% @doc For INTERNAL use. Adds `MemberPid' to the pool.
  53. -spec accept_member(atom() | pid(), pid() | {noproc, _}) -> ok.
  54. accept_member(PoolName, MemberPid) ->
  55. gen_server:call(PoolName, {accept_member, MemberPid}).
  56. %% @doc Obtain exclusive access to a member from `PoolName'.
  57. %%
  58. %% If no free members are available, 'error_no_members' is returned.
  59. %%
  60. -spec take_member(atom() | pid()) -> pid() | error_no_members.
  61. take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
  62. gen_server:call(PoolName, take_member, infinity).
  63. %% @doc Take a member from a randomly selected member of the group
  64. %% `GroupName'. Returns `MemberPid' or `error_no_members'. If no
  65. %% members are available in the randomly chosen pool, all other pools
  66. %% in the group are tried in order.
  67. -spec take_group_member(atom()) -> pid() | error_no_members.
  68. take_group_member(GroupName) ->
  69. case pg2:get_local_members(GroupName) of
  70. {error, {no_such_group, GroupName}} = Error ->
  71. Error;
  72. Members ->
  73. %% Put a random member at the front of the list and then
  74. %% return the first member you can walking the list.
  75. Idx = crypto:rand_uniform(1, length(Members) + 1),
  76. {Pid, Rest} = extract_nth(Idx, Members),
  77. take_first_member([Pid | Rest])
  78. end.
  79. take_first_member([Pid | Rest]) ->
  80. case take_member(Pid) of
  81. error_no_members ->
  82. take_first_member(Rest);
  83. Member ->
  84. ets:insert(?POOLER_GROUP_TABLE, {Member, Pid}),
  85. Member
  86. end;
  87. take_first_member([]) ->
  88. error_no_members.
  89. %% this helper function returns `{Nth_Elt, Rest}' where `Nth_Elt' is
  90. %% the nth element of `L' and `Rest' is `L -- [Nth_Elt]'.
  91. extract_nth(N, L) ->
  92. extract_nth(N, L, []).
  93. extract_nth(1, [H | T], Acc) ->
  94. {H, Acc ++ T};
  95. extract_nth(N, [H | T], Acc) ->
  96. extract_nth(N - 1, T, [H | Acc]);
  97. extract_nth(_, [], _) ->
  98. error(badarg).
  99. %% @doc Return a member that was taken from the group
  100. %% `GroupName'. This is a convenience function for
  101. %% `return_group_member/3' with `Status' of `ok'.
  102. -spec return_group_member(atom(), pid() | error_no_members) -> ok.
  103. return_group_member(GroupName, MemberPid) ->
  104. return_group_member(GroupName, MemberPid, ok).
  105. %% @doc Return a member that was taken from the group `GroupName'. If
  106. %% `Status' is `ok' the member is returned to the pool from which is
  107. %% came. If `Status' is `fail' the member will be terminated and a new
  108. %% member added to the appropriate pool.
  109. -spec return_group_member(atom(), pid() | error_no_members, ok | fail) -> ok.
  110. return_group_member(_, error_no_members, _) ->
  111. ok;
  112. return_group_member(_GroupName, MemberPid, Status) ->
  113. case ets:lookup(?POOLER_GROUP_TABLE, MemberPid) of
  114. [{MemberPid, PoolPid}] ->
  115. return_member(PoolPid, MemberPid, Status);
  116. [] ->
  117. ok
  118. end.
  119. %% @doc Return a member to the pool so it can be reused.
  120. %%
  121. %% If `Status' is 'ok', the member is returned to the pool. If
  122. %% `Status' is 'fail', the member is destroyed and a new member is
  123. %% added to the pool in its place.
  124. -spec return_member(atom() | pid(), pid() | error_no_members, ok | fail) -> ok.
  125. return_member(PoolName, Pid, Status) when is_pid(Pid) andalso
  126. (is_atom(PoolName) orelse
  127. is_pid(PoolName)) andalso
  128. (Status =:= ok orelse
  129. Status =:= fail) ->
  130. gen_server:call(PoolName, {return_member, Pid, Status}, infinity),
  131. ok;
  132. return_member(_, error_no_members, _) ->
  133. ok.
  134. %% @doc Return a member to the pool so it can be reused.
  135. %%
  136. -spec return_member(atom() | pid(), pid() | error_no_members) -> ok.
  137. return_member(PoolName, Pid) when is_pid(Pid) andalso
  138. (is_atom(PoolName) orelse is_pid(PoolName)) ->
  139. gen_server:call(PoolName, {return_member, Pid, ok}, infinity),
  140. ok;
  141. return_member(_, error_no_members) ->
  142. ok.
  143. %% @doc Obtain runtime state info for all pools.
  144. %%
  145. %% Format of the return value is subject to change.
  146. -spec pool_stats(atom() | pid()) -> [tuple()].
  147. pool_stats(PoolName) ->
  148. gen_server:call(PoolName, pool_stats).
  149. %% ------------------------------------------------------------------
  150. %% gen_server Function Definitions
  151. %% ------------------------------------------------------------------
  152. -spec init(#pool{}) -> {'ok', #pool{}, 0}.
  153. init(#pool{}=Pool) ->
  154. #pool{init_count = N} = Pool,
  155. MemberSup = pooler_pool_sup:member_sup_name(Pool),
  156. Pool1 = set_member_sup(Pool, MemberSup),
  157. Pool2 = cull_members_from_pool(Pool1),
  158. {ok, NewPool} = add_pids(N, Pool2),
  159. %% trigger an immediate timeout, handled by handle_info to allow
  160. %% us to register with pg2. We use the timeout mechanism to ensure
  161. %% that a server is added to a group only when it is ready to
  162. %% process messages.
  163. {ok, NewPool, 0}.
  164. set_member_sup(#pool{} = Pool, MemberSup) ->
  165. Pool#pool{member_sup = MemberSup}.
  166. handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
  167. {Member, NewPool} = take_member_from_pool(Pool, CPid),
  168. {reply, Member, NewPool};
  169. handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
  170. {reply, ok, do_return_member(Pid, Status, Pool)};
  171. handle_call({accept_member, Pid}, _From, Pool) ->
  172. {reply, ok, do_accept_member(Pid, Pool)};
  173. handle_call(stop, _From, Pool) ->
  174. {stop, normal, stop_ok, Pool};
  175. handle_call(pool_stats, _From, Pool) ->
  176. {reply, dict:to_list(Pool#pool.all_members), Pool};
  177. handle_call(dump_pool, _From, Pool) ->
  178. {reply, Pool, Pool};
  179. handle_call(_Request, _From, Pool) ->
  180. {noreply, Pool}.
  181. -spec handle_cast(_,_) -> {'noreply', _}.
  182. handle_cast(_Msg, Pool) ->
  183. {noreply, Pool}.
  184. -spec handle_info(_, _) -> {'noreply', _}.
  185. handle_info(timeout, #pool{group = undefined} = Pool) ->
  186. %% ignore
  187. {noreply, Pool};
  188. handle_info(timeout, #pool{group = Group} = Pool) ->
  189. ok = pg2:create(Group),
  190. ok = pg2:join(Group, self()),
  191. {noreply, Pool};
  192. handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
  193. State1 =
  194. case dict:find(Pid, State#pool.all_members) of
  195. {ok, {_PoolName, _ConsumerPid, _Time}} ->
  196. do_return_member(Pid, fail, State);
  197. error ->
  198. case dict:find(Pid, State#pool.consumer_to_pid) of
  199. {ok, {MRef, Pids}} ->
  200. IsOk = case Reason of
  201. normal -> ok;
  202. _Crash -> fail
  203. end,
  204. lists:foldl(
  205. fun(P, S) -> do_return_member(P, IsOk, S) end,
  206. State, Pids);
  207. error ->
  208. State
  209. end
  210. end,
  211. {noreply, State1};
  212. handle_info(cull_pool, Pool) ->
  213. {noreply, cull_members_from_pool(Pool)};
  214. handle_info(_Info, State) ->
  215. {noreply, State}.
  216. -spec terminate(_, _) -> 'ok'.
  217. terminate(_Reason, _State) ->
  218. ok.
  219. -spec code_change(_, _, _) -> {'ok', _}.
  220. code_change(_OldVsn, State, _Extra) ->
  221. {ok, State}.
  222. %% ------------------------------------------------------------------
  223. %% Internal Function Definitions
  224. %% ------------------------------------------------------------------
  225. do_accept_member({Ref, Pid},
  226. #pool{
  227. all_members = AllMembers,
  228. free_pids = Free,
  229. free_count = NumFree,
  230. starting_members = StartingMembers
  231. } = Pool) when is_pid(Pid) ->
  232. case lists:keymember(Ref, 1, StartingMembers) of
  233. false ->
  234. %% a pid we didn't ask to start, ignore it.
  235. %% should we log it?
  236. Pool;
  237. true ->
  238. StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
  239. MRef = erlang:monitor(process, Pid),
  240. Entry = {MRef, free, os:timestamp()},
  241. AllMembers1 = store_all_members(Pid, Entry, AllMembers),
  242. Pool#pool{free_pids = Free ++ [Pid],
  243. free_count = NumFree + 1,
  244. all_members = AllMembers1,
  245. starting_members = StartingMembers1}
  246. end;
  247. do_accept_member({Ref, _Reason}, #pool{starting_members = StartingMembers} = Pool) ->
  248. %% member start failed, remove in-flight ref and carry on.
  249. StartingMembers1 = lists:keydelete(Ref, 1, StartingMembers),
  250. Pool#pool{starting_members = StartingMembers1}.
  251. -spec remove_stale_starting_members([{reference(), erlang:timestamp()}], time_spec()) -> [{reference(), erlang:timestamp()}].
  252. remove_stale_starting_members(StartingMembers, MaxAge) ->
  253. Now = calendar:time_to_seconds(os:timestamp()),
  254. lists:filter(fun({_Ref, StartTime}) ->
  255. StartSecs = calendar:time_to_seconds(StartTime),
  256. (Now - StartSecs) < MaxAge
  257. end, StartingMembers).
  258. % FIXME: creation of new pids should probably happen
  259. % in a spawned process to avoid tying up the loop.
  260. -spec add_pids(non_neg_integer(), #pool{}) ->
  261. {max_count_reached | ok, #pool{}}.
  262. add_pids(N, Pool) ->
  263. #pool{max_count = Max, free_pids = Free,
  264. in_use_count = NumInUse, free_count = NumFree,
  265. member_sup = PoolSup,
  266. all_members = AllMembers} = Pool,
  267. Total = NumFree + NumInUse,
  268. PoolName = Pool#pool.name,
  269. case Total + N =< Max of
  270. true ->
  271. {AllMembers1, NewPids} = start_n_pids(N, PoolSup, AllMembers),
  272. %% start_n_pids may return fewer than N if errors were
  273. %% encountered.
  274. NewPidCount = length(NewPids),
  275. case NewPidCount =:= N of
  276. true -> ok;
  277. false ->
  278. error_logger:error_msg("pool '~s' tried to add ~B members, only added ~B~n",
  279. [PoolName, N, NewPidCount]),
  280. %% consider changing this to a count?
  281. send_metric(Pool, events,
  282. {add_pids_failed, N, NewPidCount}, history)
  283. end,
  284. Pool1 = Pool#pool{free_pids = Free ++ NewPids,
  285. free_count = length(Free) + NewPidCount},
  286. {ok, Pool1#pool{all_members = AllMembers1}};
  287. false ->
  288. {max_count_reached, Pool}
  289. end.
  290. -spec take_member_from_pool(#pool{}, {pid(), term()}) ->
  291. {error_no_members | pid(), #pool{}}.
  292. take_member_from_pool(#pool{max_count = Max,
  293. free_pids = Free,
  294. in_use_count = NumInUse,
  295. free_count = NumFree,
  296. consumer_to_pid = CPMap,
  297. starting_members = StartingMembers} = Pool,
  298. From) ->
  299. send_metric(Pool, take_rate, 1, meter),
  300. CanAdd = (NumInUse + length(StartingMembers)) < Max,
  301. case Free of
  302. [] when CanAdd =:= false ->
  303. send_metric(Pool, error_no_members_count, {inc, 1}, counter),
  304. send_metric(Pool, events, error_no_members, history),
  305. {error_no_members, Pool};
  306. [] when CanAdd =:= true ->
  307. %% request async member creation, return error for now.
  308. %% also should verify that starting_members length will
  309. %% not exceed max size if all come back success. need a
  310. %% reference to the starter supervisor or else we reuse a
  311. %% starter for now.
  312. {ok, Starter} = pooler_starter_sup:new_starter(),
  313. StartRef = pooler_starter:start_member(Starter, Pool),
  314. %% case add_pids(1, Pool) of
  315. %% {ok, Pool1} ->
  316. %% %% add_pids may have updated our pool
  317. %% take_member_from_pool(Pool1, From, Retries - 1);
  318. %% {max_count_reached, _} ->
  319. %% send_metric(Pool, error_no_members_count, {inc, 1}, counter),
  320. %% send_metric(Pool, events, error_no_members, history),
  321. %% {error_no_members, Pool}
  322. %% end,
  323. send_metric(Pool, error_no_members_count, {inc, 1}, counter),
  324. send_metric(Pool, events, error_no_members, history),
  325. StartingMembers1 = [{StartRef, os:timestamp()} | StartingMembers],
  326. {error_no_members, Pool#pool{starting_members = StartingMembers1}};
  327. %% [] when Retries =:= 0 ->
  328. %% %% max retries reached
  329. %% send_metric(Pool, error_no_members_count, {inc, 1}, counter),
  330. %% send_metric(Pool, events, error_no_members, history),
  331. %% {error_no_members, Pool};
  332. [Pid|Rest] ->
  333. Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
  334. free_count = NumFree - 1},
  335. send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
  336. send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
  337. {Pid, Pool1#pool{
  338. consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
  339. all_members = set_cpid_for_member(Pid, From,
  340. Pool1#pool.all_members)
  341. }}
  342. end.
  343. -spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
  344. do_return_member(Pid, ok, #pool{all_members = AllMembers} = Pool) ->
  345. clean_group_table(Pid, Pool),
  346. case dict:find(Pid, AllMembers) of
  347. {ok, {MRef, CPid, _}} ->
  348. #pool{free_pids = Free, in_use_count = NumInUse,
  349. free_count = NumFree} = Pool,
  350. Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1,
  351. free_count = NumFree + 1},
  352. Entry = {MRef, free, os:timestamp()},
  353. Pool1#pool{all_members = store_all_members(Pid, Entry, AllMembers),
  354. consumer_to_pid = cpmap_remove(Pid, CPid,
  355. Pool1#pool.consumer_to_pid)};
  356. error ->
  357. Pool
  358. end;
  359. do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
  360. % for the fail case, perhaps the member crashed and was alerady
  361. % removed, so use find instead of fetch and ignore missing.
  362. clean_group_table(Pid, Pool),
  363. case dict:find(Pid, AllMembers) of
  364. {ok, {_MRef, _, _}} ->
  365. Pool1 = remove_pid(Pid, Pool),
  366. case add_pids(1, Pool1) of
  367. {Status, Pool2} when Status =:= ok;
  368. Status =:= max_count_reached ->
  369. Pool2;
  370. {Status, _} ->
  371. erlang:error({error, "unexpected return from add_pid",
  372. Status, erlang:get_stacktrace()}),
  373. send_metric(Pool1, events, bad_return_from_add_pid,
  374. history)
  375. end;
  376. error ->
  377. Pool
  378. end.
  379. clean_group_table(_MemberPid, #pool{group = undefined}) ->
  380. ok;
  381. clean_group_table(MemberPid, #pool{group = _GroupName}) ->
  382. ets:delete(?POOLER_GROUP_TABLE, MemberPid).
  383. % @doc Remove `Pid' from the pid list associated with `CPid' in the
  384. % consumer to member map given by `CPMap'.
  385. %
  386. % If `Pid' is the last element in `CPid's pid list, then the `CPid'
  387. % entry is removed entirely.
  388. %
  389. -spec cpmap_remove(pid(), pid() | free, dict()) -> dict().
  390. cpmap_remove(_Pid, free, CPMap) ->
  391. CPMap;
  392. cpmap_remove(Pid, CPid, CPMap) ->
  393. case dict:find(CPid, CPMap) of
  394. {ok, {MRef, Pids0}} ->
  395. Pids1 = lists:delete(Pid, Pids0),
  396. case Pids1 of
  397. [_H|_T] ->
  398. dict:store(CPid, {MRef, Pids1}, CPMap);
  399. [] ->
  400. %% no more members for this consumer
  401. erlang:demonitor(MRef),
  402. dict:erase(CPid, CPMap)
  403. end;
  404. error ->
  405. % FIXME: this shouldn't happen, should we log or error?
  406. CPMap
  407. end.
  408. % @doc Remove and kill a pool member.
  409. %
  410. % Handles in-use and free members. Logs an error if the pid is not
  411. % tracked in state.all_members.
  412. %
  413. -spec remove_pid(pid(), #pool{}) -> #pool{}.
  414. remove_pid(Pid, Pool) ->
  415. #pool{name = PoolName,
  416. all_members = AllMembers,
  417. consumer_to_pid = CPMap} = Pool,
  418. case dict:find(Pid, AllMembers) of
  419. {ok, {MRef, free, _Time}} ->
  420. % remove an unused member
  421. erlang:demonitor(MRef),
  422. FreePids = lists:delete(Pid, Pool#pool.free_pids),
  423. NumFree = Pool#pool.free_count - 1,
  424. Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
  425. exit(Pid, kill),
  426. send_metric(Pool1, killed_free_count, {inc, 1}, counter),
  427. Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
  428. {ok, {MRef, CPid, _Time}} ->
  429. %% remove a member being consumed. No notice is sent to
  430. %% the consumer.
  431. erlang:demonitor(MRef),
  432. Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
  433. exit(Pid, kill),
  434. send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
  435. Pool1#pool{consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
  436. all_members = dict:erase(Pid, AllMembers)};
  437. error ->
  438. error_logger:error_report({{pool, PoolName}, unknown_pid, Pid,
  439. erlang:get_stacktrace()}),
  440. send_metric(Pool, events, unknown_pid, history),
  441. Pool
  442. end.
  443. -spec start_n_pids(non_neg_integer(), pid(), dict()) -> {dict(), [pid()]}.
  444. start_n_pids(N, PoolSup, AllMembers) ->
  445. NewPidsWith = do_n(N, fun(Acc) ->
  446. case supervisor:start_child(PoolSup, []) of
  447. {ok, Pid} ->
  448. MRef = erlang:monitor(process, Pid),
  449. [{MRef, Pid} | Acc];
  450. _Else ->
  451. Acc
  452. end
  453. end, []),
  454. AllMembers1 = lists:foldl(
  455. fun({MRef, Pid}, Dict) ->
  456. Entry = {MRef, free, os:timestamp()},
  457. store_all_members(Pid, Entry, Dict)
  458. end, AllMembers, NewPidsWith),
  459. NewPids = [ Pid || {_MRef, Pid} <- NewPidsWith ],
  460. {AllMembers1, NewPids}.
  461. do_n(0, _Fun, Acc) ->
  462. Acc;
  463. do_n(N, Fun, Acc) ->
  464. do_n(N - 1, Fun, Fun(Acc)).
  465. pool_add_retries(#pool{add_member_retry = Retries}) ->
  466. Retries.
  467. -spec store_all_members(pid(),
  468. {reference(), free | pid(), {_, _, _}}, dict()) -> dict().
  469. store_all_members(Pid, Val = {_MRef, _CPid, _Time}, AllMembers) ->
  470. dict:store(Pid, Val, AllMembers).
  471. -spec set_cpid_for_member(pid(), pid(), dict()) -> dict().
  472. set_cpid_for_member(MemberPid, CPid, AllMembers) ->
  473. dict:update(MemberPid,
  474. fun({MRef, free, Time = {_, _, _}}) ->
  475. {MRef, CPid, Time}
  476. end, AllMembers).
  477. -spec add_member_to_consumer(pid(), pid(), dict()) -> dict().
  478. add_member_to_consumer(MemberPid, CPid, CPMap) ->
  479. %% we can't use dict:update here because we need to create the
  480. %% monitor if we aren't already tracking this consumer.
  481. case dict:find(CPid, CPMap) of
  482. {ok, {MRef, MList}} ->
  483. dict:store(CPid, {MRef, [MemberPid | MList]}, CPMap);
  484. error ->
  485. MRef = erlang:monitor(process, CPid),
  486. dict:store(CPid, {MRef, [MemberPid]}, CPMap)
  487. end.
  488. -spec cull_members_from_pool(#pool{}) -> #pool{}.
  489. cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->
  490. %% 0 cull_interval means do not cull
  491. Pool;
  492. cull_members_from_pool(#pool{name = PoolName,
  493. free_count = FreeCount,
  494. init_count = InitCount,
  495. in_use_count = InUseCount,
  496. cull_interval = Delay,
  497. max_age = MaxAge,
  498. all_members = AllMembers} = Pool) ->
  499. MaxCull = FreeCount - (InitCount - InUseCount),
  500. Pool1 = case MaxCull > 0 of
  501. true ->
  502. MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
  503. ExpiredMembers =
  504. expired_free_members(MemberInfo, os:timestamp(), MaxAge),
  505. CullList = lists:sublist(ExpiredMembers, MaxCull),
  506. lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
  507. Pool, CullList);
  508. false ->
  509. Pool
  510. end,
  511. schedule_cull(PoolName, Delay),
  512. Pool1.
  513. -spec schedule_cull(PoolName :: atom() | pid(),
  514. Delay :: time_spec()) -> reference().
  515. %% @doc Schedule a pool cleaning or "cull" for `PoolName' in which
  516. %% members older than `max_age' will be removed until the pool has
  517. %% `init_count' members. Uses `erlang:send_after/3' for light-weight
  518. %% timer that will be auto-cancelled upon pooler shutdown.
  519. schedule_cull(PoolName, Delay) ->
  520. DelayMillis = time_as_millis(Delay),
  521. %% use pid instead of server name atom to take advantage of
  522. %% automatic cancelling
  523. erlang:send_after(DelayMillis, PoolName, cull_pool).
  524. -spec member_info([pid()], dict()) -> [{pid(), member_info()}].
  525. member_info(Pids, AllMembers) ->
  526. [ {P, dict:fetch(P, AllMembers)} || P <- Pids ].
  527. -spec expired_free_members(Members :: [{pid(), member_info()}],
  528. Now :: {_, _, _},
  529. MaxAge :: time_spec()) -> [{pid(), free_member_info()}].
  530. expired_free_members(Members, Now, MaxAge) ->
  531. MaxMicros = time_as_micros(MaxAge),
  532. [ MI || MI = {_, {_, free, LastReturn}} <- Members,
  533. timer:now_diff(Now, LastReturn) >= MaxMicros ].
  534. %% Send a metric using the metrics module from application config or
  535. %% do nothing.
  536. -spec send_metric(Pool :: #pool{},
  537. Label :: atom(),
  538. Value :: metric_value(),
  539. Type :: metric_type()) -> ok.
  540. send_metric(#pool{metrics_mod = pooler_no_metrics}, _Label, _Value, _Type) ->
  541. ok;
  542. send_metric(#pool{name = PoolName, metrics_mod = MetricsMod}, Label, Value, Type) ->
  543. MetricName = pool_metric(PoolName, Label),
  544. MetricsMod:notify(MetricName, Value, Type),
  545. ok.
  546. -spec pool_metric(atom(), atom()) -> binary().
  547. pool_metric(PoolName, Metric) ->
  548. iolist_to_binary([<<"pooler.">>, atom_to_binary(PoolName, utf8),
  549. ".", atom_to_binary(Metric, utf8)]).
  550. -spec time_as_secs(time_spec()) -> non_neg_integer().
  551. time_as_secs({Time, Unit}) ->
  552. time_as_micros({Time, Unit}) div 1000000.
  553. -spec time_as_millis(time_spec()) -> non_neg_integer().
  554. %% @doc Convert time unit into milliseconds.
  555. time_as_millis({Time, Unit}) ->
  556. time_as_micros({Time, Unit}) div 1000.
  557. -spec time_as_micros(time_spec()) -> non_neg_integer().
  558. %% @doc Convert time unit into microseconds
  559. time_as_micros({Time, min}) ->
  560. 60 * 1000 * 1000 * Time;
  561. time_as_micros({Time, sec}) ->
  562. 1000 * 1000 * Time;
  563. time_as_micros({Time, ms}) ->
  564. 1000 * Time;
  565. time_as_micros({Time, mu}) ->
  566. Time.