pooler.erl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. %% @author Seth Falcon <seth@userprimary.net>
  2. %% @copyright 2011 Seth Falcon
  3. %% @doc This is the main interface to the pooler application
  4. %%
  5. %% To integrate with your application, you probably want to call
  6. %% application:start(pooler) after having specified appropriate
  7. %% configuration for the pooler application (either via a config file
  8. %% or appropriate calls to the application module to set the
  9. %% application's config).
  10. %%
  11. -module(pooler).
  12. -behaviour(gen_server).
  13. -define(SERVER, ?MODULE).
  14. -include_lib("eunit/include/eunit.hrl").
  %% Per-pool configuration and bookkeeping kept by the pooler server.
  -record(pool, {
            %% Pool name; used as the key of the pools and pool_sups dicts.
            name             :: string(),
            %% Upper bound on free + in-use members (checked in add_pids/3).
            max_count = 100  :: non_neg_integer(),
            %% Number of members started for this pool by init/1.
            init_count = 10  :: non_neg_integer(),
            %% {M, F, A} handed to pooler_pool_sup when the pool's member
            %% supervisor is started.
            start_mfa        :: {atom(), atom(), [term()]},
            %% Members currently available to be taken.
            free_pids = []   :: [pid()],
            %% Number of members currently handed out to consumers.
            in_use_count = 0 :: non_neg_integer(),
            %% Counter maintained alongside free_pids.
            free_count = 0   :: non_neg_integer()
           }).
  %% gen_server state of the pooler process.
  -record(state, {
            %% Number of configured pools (length of the pool list in init/1).
            npools                       :: non_neg_integer(),
            %% Pool name -> #pool{}.
            pools = dict:new()           :: dict(),
            %% Pool name -> pid of the supervisor started for that pool.
            pool_sups = dict:new()       :: dict(),
            %% Member pid -> {PoolName, free | ConsumerPid, Timestamp}.
            all_members = dict:new()     :: dict(),
            %% Consumer pid -> list of member pids it currently holds.
            consumer_to_pid = dict:new() :: dict(),
            %% Array of pool names, indexed from 0, used to pick a pool at
            %% random in the take_member call.
            pool_selector                :: array()
           }).
  %% Shorthand for proplists lookups: ?gv(Key, Proplist) yields `undefined'
  %% when Key is absent; ?gv(Key, Proplist, Default) yields Default instead.
  -define(gv(X, Y), proplists:get_value(X, Y)).
  -define(gv(X, Y, D), proplists:get_value(X, Y, D)).
  34. %% ------------------------------------------------------------------
  35. %% API Function Exports
  36. %% ------------------------------------------------------------------
  37. -export([start/1,
  38. start_link/1,
  39. stop/0,
  40. take_member/0,
  41. return_member/2,
  42. % remove_pool/2,
  43. % add_pool/1,
  44. pool_stats/0]).
  45. %% ------------------------------------------------------------------
  46. %% gen_server Function Exports
  47. %% ------------------------------------------------------------------
  48. -export([init/1,
  49. handle_call/3,
  50. handle_cast/2,
  51. handle_info/2,
  52. terminate/2,
  53. code_change/3]).
  54. %% ------------------------------------------------------------------
  55. %% API Function Definitions
  56. %% ------------------------------------------------------------------
  %% @doc Start the pooler server linked to the calling process.
  %%
  %% The server is registered locally under the module name and `Config'
  %% is passed unchanged to init/1.
  start_link(Config) ->
      ServerName = {local, ?SERVER},
      gen_server:start_link(ServerName, ?MODULE, Config, []).
  %% @doc Start the pooler server without linking it to the caller.
  %%
  %% Same registration and configuration handling as start_link/1, but
  %% uses gen_server:start/4 so the caller and the server do not share
  %% fate. Use start_link/1 when running under a supervisor.
  start(Config) ->
      gen_server:start({local, ?SERVER}, ?MODULE, Config, []).
  %% @doc Ask the registered pooler server to shut down.
  %%
  %% This is a synchronous call; the server answers `stop_ok' and then
  %% terminates with reason `normal'.
  stop() ->
      Reply = gen_server:call(?SERVER, stop),
      Reply.
  %% @doc Check out a pool member for exclusive use by the caller.
  %%
  %% The server first tries a randomly chosen pool. When that pool cannot
  %% supply a member, the pool with the most free members is tried
  %% instead. The atom `error_no_members' is returned when no pool has a
  %% member to give.
  %%
  -spec take_member() -> pid() | error_no_members.
  take_member() ->
      Result = gen_server:call(?SERVER, take_member),
      Result.
  %% @doc Hand a member back to the pooler.
  %%
  %% With `Status' equal to `ok' the member becomes available again.
  %% With `Status' equal to `fail' the member is destroyed and the pool
  %% starts a replacement. The request is sent as a cast, so this
  %% function returns `ok' without waiting for the server.
  -spec return_member(pid(), ok | fail) -> ok.
  return_member(Pid, Status) when Status =:= ok orelse Status =:= fail ->
      %% The caller's pid travels with the request as the consumer id.
      Request = {return_member, Pid, Status, self()},
      ok = gen_server:cast(?SERVER, Request).
  82. % TODO:
  83. % remove_pool(Name, How) when How == graceful; How == immediate ->
  84. % gen_server:call(?SERVER, {remove_pool, Name, How}).
  85. % TODO:
  86. % add_pool(Pool) ->
  87. % gen_server:call(?SERVER, {add_pool, Pool}).
  %% @doc Fetch a snapshot of the members tracked by the server.
  %%
  %% The reply is the server's member table converted to a list, one
  %% `{MemberPid, Info}' tuple per member. The shape of the result may
  %% change in future versions.
  -spec pool_stats() -> [tuple()].
  pool_stats() ->
      Stats = gen_server:call(?SERVER, pool_stats),
      Stats.
  94. %% ------------------------------------------------------------------
  95. %% gen_server Function Definitions
  96. %% ------------------------------------------------------------------
  %% @doc gen_server callback: build the server state from `Config'.
  %%
  %% `Config' is a proplist whose `pools' entry is a list of pool
  %% proplists (see props_to_pool/1). For every pool a member supervisor
  %% is started under `pooler_pool_sup' and `init_count' members are
  %% started and linked. Any failure along the way crashes init/1 through
  %% a failed match.
  -spec init([any()]) -> {'ok', #state{npools::'undefined' | non_neg_integer(),
                                       pools::dict(),
                                       pool_sups::dict(),
                                       all_members::dict(),
                                       consumer_to_pid::dict(),
                                       pool_selector::'undefined' | array()}}.
  init(Config) ->
      %% Trap exits *before* any member is started and linked. Otherwise a
      %% member that dies during start-up takes this server down instead
      %% of being reported through an 'EXIT' message.
      process_flag(trap_exit, true),
      PoolRecs = [ props_to_pool(P) || P <- ?gv(pools, Config) ],
      Pools = [ {Pool#pool.name, Pool} || Pool <- PoolRecs ],
      PoolSups =
          lists:map(
            fun(#pool{name = Name, start_mfa = MFA}) ->
                    {ok, SupPid} = supervisor:start_child(pooler_pool_sup, [MFA]),
                    {Name, SupPid}
            end, PoolRecs),
      State0 = #state{npools = length(Pools),
                      pools = dict:from_list(Pools),
                      pool_sups = dict:from_list(PoolSups),
                      pool_selector = array:from_list([PN || {PN, _} <- Pools])
                     },
      %% Start the initial members pool by pool; the {ok, _} pattern in
      %% the fun head makes any non-ok result from add_pids/3 a crash.
      {ok, State} = lists:foldl(
                      fun(#pool{name = PName, init_count = N}, {ok, AccState}) ->
                              add_pids(PName, N, AccState)
                      end, {ok, State0}, PoolRecs),
      {ok, State}.
  %% @doc gen_server callback for synchronous requests.
  %%
  %% Requests handled:
  %%   take_member - reply with a member pid or `error_no_members'.
  %%   stop        - reply `stop_ok' and terminate with reason `normal'.
  %%   pool_stats  - reply with the member table as a list.
  %% Any other request is ignored without a reply.
  -spec handle_call(_, _, _) -> {'noreply', _} |
                                {'reply',
                                 'error_no_members' | pid() | [{_,_}],
                                 #state{npools::'undefined' | non_neg_integer(),
                                        pools::dict(),
                                        pool_sups::dict(),
                                        all_members::dict(),
                                        consumer_to_pid::dict(),
                                        pool_selector::'undefined' | array()}}
                                    | {'stop','normal','stop_ok', _}.
  handle_call(take_member, _From, State = #state{npools = 0}) ->
      % With no pools configured there is nothing to select from; the
      % random pick below would crash on an empty range.
      {reply, error_no_members, State};
  handle_call(take_member, {CPid, _Tag},
              State = #state{pool_selector = PS, npools = NP}) ->
      % attempt to return a member from a randomly selected pool. If
      % that pool has no members, find the pool with most free members
      % and return a member from there.
      PoolName = array:get(crypto:rand_uniform(0, NP), PS),
      case take_member(PoolName, CPid, State) of
          {error_no_members, NewState} ->
              case max_free_pool(NewState#state.pools) of
                  error_no_members ->
                      {reply, error_no_members, NewState};
                  MaxFreePoolName ->
                      {NewPid, State2} = take_member(MaxFreePoolName, CPid, NewState),
                      {reply, NewPid, State2}
              end;
          {NewPid, NewState} ->
              {reply, NewPid, NewState}
      end;
  handle_call(stop, _From, State) ->
      {stop, normal, stop_ok, State};
  handle_call(pool_stats, _From, State) ->
      {reply, dict:to_list(State#state.all_members), State};
  handle_call(_Request, _From, State) ->
      % A three-element noreply tuple is {noreply, NewState, Timeout}, so
      % the former {noreply, ok, State} replaced the server state with
      % the atom 'ok'. Keep the state untouched instead.
      {noreply, State}.
  %% @doc gen_server callback for asynchronous requests.
  %%
  %% Only `{return_member, Pid, Status, ConsumerPid}' is acted upon; every
  %% other message leaves the state unchanged.
  -spec handle_cast(_,_) -> {'noreply', _}.
  handle_cast({return_member, Pid, Status, _CPid}, State) ->
      NewState = do_return_member(Pid, Status, State),
      {noreply, NewState};
  handle_cast(_Other, State) ->
      {noreply, State}.
  %% @doc gen_server callback for plain messages.
  %%
  %% Reacts to 'EXIT' signals from linked processes:
  %%   - the pid is a tracked member: it is returned with status `fail';
  %%   - the pid is a consumer holding members: each held member is
  %%     returned with `ok' if the consumer exited normally, `fail'
  %%     otherwise;
  %%   - anything else is ignored.
  -spec handle_info(_, _) -> {'noreply', _}.
  handle_info({'EXIT', Pid, Reason}, State) ->
      #state{all_members = Members, consumer_to_pid = Consumers} = State,
      NewState =
          case {dict:find(Pid, Members), dict:find(Pid, Consumers)} of
              {{ok, {_PoolName, _ConsumerPid, _Time}}, _} ->
                  % a pool member died
                  do_return_member(Pid, fail, State);
              {error, {ok, HeldMembers}} ->
                  % a consumer went away while holding members
                  Status = case Reason of
                               normal -> ok;
                               _Crash -> fail
                           end,
                  GiveBack = fun(Member, Acc) ->
                                     do_return_member(Member, Status, Acc)
                             end,
                  lists:foldl(GiveBack, State, HeldMembers);
              {error, error} ->
                  State
          end,
      {noreply, NewState};
  handle_info(_Other, State) ->
      {noreply, State}.
  %% @doc gen_server callback invoked on shutdown; performs no cleanup.
  -spec terminate(_, _) -> 'ok'.
  terminate(_Why, _FinalState) ->
      ok.
  %% @doc gen_server callback for hot code upgrades; the state is carried
  %% over unchanged.
  -spec code_change(_, _, _) -> {'ok', _}.
  code_change(_FromVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  191. %% ------------------------------------------------------------------
  192. %% Internal Function Definitions
  193. %% ------------------------------------------------------------------
  % @doc Convert a pool configuration proplist into a #pool{} record.
  %
  % Recognised keys: `name', `max_count', `init_count' and `start_mfa'.
  % When `max_count' or `init_count' is absent, the default declared in
  % the #pool{} record is used rather than `undefined'.
  %
  -spec props_to_pool([{atom(), term()}]) -> #pool{}.
  props_to_pool(P) ->
      Defaults = #pool{},
      #pool{ name = ?gv(name, P),
             max_count = ?gv(max_count, P, Defaults#pool.max_count),
             init_count = ?gv(init_count, P, Defaults#pool.init_count),
             start_mfa = ?gv(start_mfa, P)}.
  % @doc Start `N' new members for the pool named `PoolName'.
  %
  % Returns `{ok, NewState}' when the members were started,
  % `{max_count_reached, State}' when adding `N' members would exceed the
  % pool's `max_count', and `{bad_pool_name, State}' when the pool name
  % is the atom `error'.
  %
  % FIXME: creation of new pids should probably happen
  % in a spawned process to avoid tying up the loop.
  -spec add_pids(error | string(), non_neg_integer(), #state{}) ->
                        {bad_pool_name | max_count_reached | ok, #state{}}.
  add_pids(error, _N, State) ->
      {bad_pool_name, State};
  add_pids(PoolName, N, #state{pools = Pools,
                               pool_sups = PoolSups,
                               all_members = Members0} = State) ->
      Pool = dict:fetch(PoolName, Pools),
      #pool{max_count = Limit, free_pids = FreePids,
            in_use_count = InUse, free_count = FreeCount} = Pool,
      WouldHave = FreeCount + InUse + N,
      case WouldHave > Limit of
          true ->
              {max_count_reached, State};
          false ->
              Sup = dict:fetch(PoolName, PoolSups),
              {Members1, Started} = start_n_pids(N, PoolName, Sup, Members0),
              % should we sanity check or take length of the new free
              % list as free_count?
              Grown = Pool#pool{free_pids = FreePids ++ Started,
                                free_count = FreeCount + N},
              {ok, State#state{pools = dict:store(PoolName, Grown, Pools),
                               all_members = Members1}}
      end.
  % @doc Check out one member of pool `PoolName' on behalf of consumer
  % `From'.
  %
  % A free member is handed out when there is one. When the free list is
  % empty and the pool is below `max_count', one new member is started
  % and the checkout is retried. Otherwise `error_no_members' is returned
  % together with the unchanged state.
  %
  -spec take_member(string(), pid(), #state{}) ->
                           {error_no_members | pid(), #state{}}.
  take_member(PoolName, From, #state{pools = Pools,
                                     consumer_to_pid = Consumers} = State) ->
      Pool = dict:fetch(PoolName, Pools),
      #pool{max_count = Limit, free_pids = FreePids, in_use_count = InUse,
            free_count = FreeCount} = Pool,
      case FreePids of
          [Member | Remaining] ->
              % Link to the consumer so its exit is reported to the server.
              erlang:link(From),
              Taken = Pool#pool{free_pids = Remaining,
                                in_use_count = InUse + 1,
                                free_count = FreeCount - 1},
              Prepend = fun(Held) -> [Member | Held] end,
              Consumers1 = dict:update(From, Prepend, [Member], Consumers),
              MarkInUse = fun({PName, free, Time}) -> {PName, From, Time} end,
              Members1 = dict:update(Member, MarkInUse,
                                     State#state.all_members),
              {Member, State#state{pools = dict:store(PoolName, Taken, Pools),
                                   consumer_to_pid = Consumers1,
                                   all_members = Members1}};
          [] when InUse == Limit ->
              {error_no_members, State};
          [] when InUse < Limit ->
              case add_pids(PoolName, 1, State) of
                  {ok, Grown} ->
                      take_member(PoolName, From, Grown);
                  {max_count_reached, _} ->
                      {error_no_members, State}
              end
      end.
  % @doc Process the return of member `Pid' with the given status.
  %
  % `ok': the member goes back on its pool's free list, is marked `free'
  % in all_members, and is removed from the consumer's entry in
  % consumer_to_pid so that a later exit of the consumer does not return
  % it a second time. Returning a member that is already free, or a pid
  % the server does not track, leaves the state unchanged.
  %
  % `fail': the member is removed and killed, and a replacement is
  % started when the pool's max_count permits.
  %
  -spec do_return_member(pid(), ok | fail, #state{}) -> #state{}.
  do_return_member(Pid, ok, State = #state{all_members = AllMembers}) ->
      case dict:find(Pid, AllMembers) of
          {ok, {_PoolName, free, _}} ->
              % already returned; adding it again would duplicate it in
              % the free list and skew the counters.
              State;
          {ok, {PoolName, CPid, _}} ->
              Pool = dict:fetch(PoolName, State#state.pools),
              #pool{free_pids = Free, in_use_count = NumInUse,
                    free_count = NumFree} = Pool,
              Pool1 = Pool#pool{free_pids = [Pid | Free],
                                in_use_count = NumInUse - 1,
                                free_count = NumFree + 1},
              State#state{pools = dict:store(PoolName, Pool1, State#state.pools),
                          all_members = dict:store(Pid,
                                                   {PoolName, free, os:timestamp()},
                                                   AllMembers),
                          consumer_to_pid = cpmap_remove(Pid, CPid,
                                                         State#state.consumer_to_pid)};
          error ->
              % unknown pid (e.g. a stray return_member cast); ignore it
              % rather than crash the server.
              State
      end;
  do_return_member(Pid, fail, State = #state{all_members = AllMembers}) ->
      % for the fail case, perhaps the member crashed and was already
      % removed, so use find instead of fetch and ignore missing.
      case dict:find(Pid, AllMembers) of
          {ok, {PoolName, _, _}} ->
              State1 = remove_pid(Pid, State),
              {Status, State2} = add_pids(PoolName, 1, State1),
              case Status =:= ok orelse Status =:= max_count_reached of
                  true ->
                      State2;
                  false ->
                      erlang:error({error, "unexpected return from add_pid",
                                    Status, erlang:get_stacktrace()})
              end;
          error ->
              State
      end.
  % @doc Remove `Pid' from the pid list associated with `CPid' in the
  % consumer to member map given by `CPMap'.
  %
  % If `Pid' is the last element in `CPid's pid list, then the `CPid'
  % entry is removed entirely and the link to `CPid' is dropped. While
  % the consumer still holds other members the link is kept, so that its
  % exit is still reported to the server.
  %
  -spec cpmap_remove(pid(), pid(), dict()) -> dict().
  cpmap_remove(Pid, CPid, CPMap) ->
      case dict:find(CPid, CPMap) of
          {ok, Pids0} ->
              case lists:delete(Pid, Pids0) of
                  [_H|_T] = Pids1 ->
                      dict:store(CPid, Pids1, CPMap);
                  [] ->
                      unlink(CPid), % FIXME: flush msg queue here?
                      dict:erase(CPid, CPMap)
              end;
          error ->
              % FIXME: this shouldn't happen, should we log or error?
              CPMap
      end.
  % @doc Remove and kill a pool member.
  %
  % Handles in-use and free members. A free member is taken off its
  % pool's free list and the free count is decremented; an in-use member
  % decrements the in-use count and is removed from its consumer's entry
  % in consumer_to_pid. In both cases the member is sent a `kill' exit
  % signal and dropped from all_members. Logs an error if the pid is not
  % tracked in state.all_members.
  %
  -spec remove_pid(pid(), #state{}) -> #state{}.
  remove_pid(Pid, State) ->
      #state{all_members = AllMembers, pools = Pools,
             consumer_to_pid = CPMap} = State,
      case dict:find(Pid, AllMembers) of
          {ok, {PoolName, free, _Time}} ->
              % remove an unused member; the pool record itself (not just
              % the pid list) must be stored back into the pools dict.
              Pool = dict:fetch(PoolName, Pools),
              FreePids = lists:delete(Pid, Pool#pool.free_pids),
              Pool1 = Pool#pool{free_pids = FreePids,
                                free_count = Pool#pool.free_count - 1},
              exit(Pid, kill),
              State#state{pools = dict:store(PoolName, Pool1, Pools),
                          all_members = dict:erase(Pid, AllMembers)};
          {ok, {PoolName, CPid, _Time}} ->
              % remove a member currently held by consumer CPid
              Pool = dict:fetch(PoolName, Pools),
              Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
              exit(Pid, kill),
              State#state{pools = dict:store(PoolName, Pool1, Pools),
                          consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
                          all_members = dict:erase(Pid, AllMembers)};
          error ->
              error_logger:error_report({unknown_pid, Pid,
                                         erlang:get_stacktrace()}),
              State
      end.
  % @doc Name of the pool with the highest free count.
  %
  % `Pools' maps pool names to #pool{} records. Returns
  % `error_no_members' when no pool has a free count above zero.
  %
  -spec max_free_pool(dict()) -> error_no_members | string().
  max_free_pool(Pools) ->
      Start = {"", 0},
      Best = dict:fold(fun fold_max_free_count/3, Start, Pools),
      case Best of
          {"", 0} ->
              error_no_members;
          {PoolName, _FreeCount} ->
              PoolName
      end.
  % @doc Fold step for max_free_pool/1.
  %
  % Keeps the accumulated `{Name, FreeCount}' pair unless `Pool' has a
  % strictly larger free count, in which case that pool takes its place.
  %
  -spec fold_max_free_count(string(), #pool{}, {string(), non_neg_integer()}) ->
                                   {string(), non_neg_integer()}.
  fold_max_free_count(Name, Pool, {BestName, BestCount}) ->
      FreeCount = Pool#pool.free_count,
      if
          FreeCount > BestCount -> {Name, FreeCount};
          true -> {BestName, BestCount}
      end.
  % @doc Start `N' members under supervisor `PoolSup' and register them.
  %
  % Each new member is linked to the calling process and recorded in
  % `AllMembers' as `{PoolName, free, Timestamp}'. Returns the updated
  % member dict together with the list of new pids. A failed child start
  % crashes the caller through the `{ok, Pid}' match.
  %
  -spec start_n_pids(non_neg_integer(), string(), pid(), dict()) ->
                            {dict(), [pid()]}.
  start_n_pids(N, PoolName, PoolSup, AllMembers) ->
      StartOne = fun() ->
                         {ok, Member} = supervisor:start_child(PoolSup, []),
                         % FIXME: race condition here if child
                         % crashes early.
                         erlang:link(Member),
                         Member
                 end,
      Started = [ StartOne() || _ <- lists:seq(1, N) ],
      Register = fun(Member, Acc) ->
                         Stamp = os:timestamp(),
                         dict:store(Member, {PoolName, free, Stamp}, Acc)
                 end,
      {lists:foldl(Register, AllMembers, Started), Started}.
  365. {AllMembers1, NewPids}.