  %% File: gproc_pool.erl
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Load balancing functions based on Gproc.
  19. %%
  20. %% This module implements support for load-balancing server pools. It was
  21. %% originally intended mainly as an example of how to use various Gproc
  22. %% resources (e.g. counters and shared properties), but is fully integrated
  23. %% into Gproc, and fully functional.
  24. %%
  25. %% <h2>Concepts</h2>
  26. %%
  27. %% Each pool has a list of 'named' workers (defined using `add_worker/2') and
  28. %% a load-balancing strategy. Processes can then 'connect' to the pool (with
  29. %% `connect_worker/2'), using one of the defined names.
  30. %%
  31. %% Users then 'pick' one of the currently connected processes in the pool. Which
  32. %% process is picked depends on the load-balancing strategy.
  33. %%
  34. %% The whole representation of the pool and its connected workers is in gproc.
  35. %% The server `gproc_pool' is used to serialize pool management updates, but
  36. %% worker selection is performed entirely in the calling process, and can be
  37. %% performed by several processes concurrently.
  38. %%
  39. %% <h3>Load-balancing strategies</h3>
  40. %%
  41. %% * `round_robin' is the default. A wrapping gproc counter keeps track of the
  42. %% latest worker picked, and `gproc:next()' is used to find the next worker.
  43. %% * `random' picks a random worker from the pool.
  44. %% * `hash' requires a value (`pick/2'), and picks a worker based on the hash of
  45. %% that value.
  46. %% * `direct' takes an integer as an argument, and picks the next worker (modulo
  47. %% the size of the pool). This is mainly for implementations that implement
  48. %% a load-balancing strategy on top of `gproc_pool'.
  49. %% * `claim' picks the first available worker and 'claims' it while executing
  50. %% a user-provided fun. This means that the number of concurrently executing
  51. %% jobs will not exceed the size of the pool.
  52. %% @end
  53. -module(gproc_pool).
  54. -behavior(gen_server).
  55. %% gproc round-robin name lookup
  56. -export([new/1, % (Pool) -> (Pool, round_robin, [])
  57. new/3, % (Pool, Type, Opts)
  58. delete/1, % (Pool)
  59. force_delete/1, % (Pool)
  60. add_worker/2, % (Pool, Name) -> Pos
  61. add_worker/3, % (Pool, Name, Pos) -> Pos
  62. remove_worker/2, % (Pool, Name)
  63. connect_worker/2, % (Pool, Name)
  64. disconnect_worker/2, % (Pool, Name)
  65. whereis_worker/2, % (Pool, Name)
  66. worker_id/2, % (Pool, Name)
  67. active_workers/1, % (Pool)
  68. defined_workers/1, % (Pool)
  69. worker_pool/1, % (Pool)
  70. pick/1, % (Pool)
  71. pick/2, % (Pool, Value)
  72. claim/2, % (Pool, Fun)
  73. log/1, % (WorkerId)
  74. randomize/1]). % (Pool)
  75. -export([start_link/0]).
  76. -export([init/1,
  77. handle_call/3,
  78. handle_cast/2,
  79. handle_info/2,
  80. terminate/2,
  81. code_change/3]).
  82. -export([test/1, test/3, ptest/4, test_run/2, test_run1/2, test_run2/2,
  83. test_run0/2, setup_test_pool/3, remove_test_pool/1]).
  84. -define(POOL(Pool), {p,l,{?MODULE,Pool}}).
  85. -define(POOL_CUR(Pool), {c,l,{?MODULE,Pool,cur}}).
  86. -define(POOL_WRK(Pool,Name), {c,l,{?MODULE,Pool,w,Name}}).
  87. -record(st, {}).
  %% @spec new(Pool::any()) -> ok
  %%
  %% @doc Create a `round_robin' pool with no options.
  %%
  %% The request is served by the `gproc_pool' server, whose reply to a
  %% successful `new' request is the atom `ok'.
  %% @equiv new(Pool, round_robin, [])
  new(Pool) ->
      new(Pool, round_robin, []).
  %% @spec new(Pool::any(), Type, Opts) -> ok
  %%   Type = round_robin | random | hash | direct | claim
  %%   Opts = [{size, integer()} | {auto_size, boolean()}]
  %%
  %% @doc Create a new pool.
  %%
  %% The pool starts out empty. If a size is not given, the pool size is set to
  %% 0 initially. `auto_size' is `true' by default if size is not specified, but
  %% `false' by default otherwise. If `auto_size == true', the pool will be
  %% enlarged to accommodate new workers, when necessary. Otherwise, trying to
  %% add a worker when the pool is full will raise an exception, as will trying
  %% to add a worker on a specific position beyond the current size of the pool.
  %%
  %% Calling this function with a `Type' outside the list above fails with a
  %% `function_clause' error in the caller, before the server is contacted.
  %% On success the server's reply, `ok', is returned.
  %%
  %% If the given pool already exists, this function will raise an exception.
  %% @end
  new(Pool, Type, Opts) when Type == round_robin;
                             Type == random;
                             Type == hash;
                             Type == direct;
                             Type == claim ->
      call({new, Pool, Type, Opts}).
  %% @spec delete(Pool::any()) -> ok
  %% @doc Delete an existing pool.
  %%
  %% This function will delete a pool, only if there are no named workers left
  %% in it; otherwise the server raises `not_empty', which is re-raised as an
  %% error in the caller. Ensure that workers have been disconnected and removed
  %% before deleting the pool. On success the server's reply, `ok', is returned.
  %% @end
  %%
  delete(Pool) ->
      call({delete, Pool}).
  %% @spec force_delete(Pool::any()) -> true
  %% @doc Forcibly remove a pool, terminating all active workers
  %%
  %% Intended for cleaning up pools that may have become inconsistent. All
  %% gproc resources belonging to the pool are cleared, and every connected
  %% worker other than the calling process is sent `exit(Pid, kill)'.
  %% @end
  %%
  force_delete(Pool) ->
      %% Deliberately executed in the caller rather than in the gproc_pool
      %% server: races are not a concern for this cleanup operation, and the
      %% server keeps no state of its own about the pool.
      Result = force_delete_(Pool),
      Result.
  %% @spec add_worker(Pool::any(), Name::any()) -> integer()
  %%
  %% @doc Assign a worker name to the pool, returning the worker's position.
  %%
  %% A name must be added before any process can connect to it. When no explicit
  %% position is given (see {@link add_worker/3}), the position is chosen by the
  %% pool's fill strategy: round_robin, direct and claim pools pack names tightly
  %% from the beginning, while hash and random pools fill slots as sparsely as
  %% possible, to keep the likelihood of hitting each worker even.
  %%
  %% An exception is raised if the pool is full (and `auto_size' is false), or if
  %% `Name' already exists in the pool.
  %%
  %% Before a worker can be used, a process must connect to it (see
  %% {@link connect_worker/2}).
  %% @end
  add_worker(Pool, Name) ->
      Request = {add_worker, Pool, Name},
      call(Request).
  %% @spec add_worker(Pool::any(), Name::any(), Slot::integer()) -> integer()
  %%
  %% @doc Assign a worker name to a given slot in the pool, returning the slot.
  %%
  %% Lets the pool maintainer place each worker exactly. An exception is raised
  %% if the slot is already taken. If `Slot' is larger than the current size of
  %% the pool, an exception is raised iff `auto_size' is `false'; otherwise the
  %% pool is expanded to accommodate the new position.
  %% @end
  add_worker(Pool, Name, Slot) ->
      Request = {add_worker, Pool, Name, Slot},
      call(Request).
  %% @spec connect_worker(Pool::any(), Name::any()) -> true
  %% @doc Connect the current process to `Name' in `Pool'.
  %%
  %% Typically called by a server as it starts, much like registering a name.
  %% The calling process is registered with gproc under the name returned by
  %% {@link worker_id/2}, i.e. `{n,l,[gproc_pool,Pool,N,Name]}', where `N' is
  %% the position of `Name' in the pool, with the initial value 0.
  %%
  %% This function raises an exception if `Name' does not exist in `Pool' (or
  %% there is no such pool), or if another worker is already connected to
  %% `Name'.
  %% @end
  %%
  connect_worker(Pool, Name) ->
      WorkerId = worker_id(Pool, Name),
      gproc:reg(WorkerId, 0).
  %% @spec disconnect_worker(Pool, Name) -> true
  %%
  %% @doc Disconnect the current process from `Name' in `Pool'.
  %%
  %% Unregisters the gproc name returned by {@link worker_id/2}, which makes it
  %% possible for another process to connect to `Name'.
  %%
  %% An exception is raised if there is no prior connection between `Pool',
  %% `Name' and the current process.
  %% @end
  %%
  disconnect_worker(Pool, Name) ->
      WorkerId = worker_id(Pool, Name),
      gproc:unreg(WorkerId).
  %% @spec remove_worker(Pool::any(), Name::any()) -> true
  %% @doc Remove a previously added worker.
  %%
  %% Any connected worker must be disconnected first: if a process is still
  %% connected to `Name', the server raises `{worker_connected, Pid}', which is
  %% re-raised as an error in the caller.
  %%
  %% NOTE(review): earlier documentation stated that `true' is returned when
  %% `Name' was never added to the pool; the removal path looks the name up
  %% before removing it, so that case should be confirmed against gproc.
  %% @end
  remove_worker(Pool, Name) ->
      Request = {remove_worker, Pool, Name},
      call(Request).
  %% @spec whereis_worker(Pool::any(), Name::any()) -> pid() | undefined
  %% @doc Look up the pid of a connected worker.
  %%
  %% Works like `gproc:where/1' applied to the worker's gproc name: the pid of
  %% the process connected as `Pool / Name' is returned if there is one,
  %% otherwise `undefined'. An exception is raised if `Name' has not been added
  %% to the pool.
  %% @end
  whereis_worker(Pool, Name) ->
      gproc:where(worker_id(Pool, Name)).
  %% @spec worker_id(Pool, Name) -> GprocName
  %% @doc Return the unique gproc name corresponding to a name in the pool.
  %%
  %% `Name' is assumed to have been added to `Pool'. Its position is read from
  %% the `n' attribute of the worker's shared counter, and the result is the
  %% name a connected worker would be registered as. That does not mean such a
  %% connected worker actually exists.
  %% @end
  worker_id(Pool, Name) ->
      Position = gproc:get_attribute(?POOL_WRK(Pool, Name), shared, n),
      {n, l, [?MODULE, Pool, Position, Name]}.
  %% @spec active_workers(Pool::any()) -> [{Name, Pid}]
  %% @doc Return a list of currently connected workers in the pool.
  %%
  %% Selects every local gproc name of the form
  %% `{n,l,[gproc_pool,Pool,N,Name]}' where `N' is an integer.
  %%
  active_workers(Pool) ->
      Head = {{n, l, [?MODULE, Pool, '$1', '$2']}, '$3', '_'},
      Guards = [{is_integer, '$1'}],
      Body = [{{'$2', '$3'}}],
      gproc:select({l, n}, [{Head, Guards, Body}]).
  %% @spec defined_workers(Pool::any()) -> [{Name, Pos, Count}]
  %% @doc Return a list of added workers in the pool.
  %%
  %% The added workers are the slots that have been given names, and thus can be
  %% connected to. Whether a process is actually connected is not reported.
  %% Unnamed slots are left out of the result.
  %%
  %% Each element is `{Name, Pos, Count}': the worker's name, its position in
  %% the pool, and the current value of its shared counter, which represents the
  %% number of times the worker has been picked (assuming callers keep count by
  %% explicitly calling {@link log/1}).
  %% @end
  defined_workers(Pool) ->
      Slots = get_workers_(?POOL(Pool)),
      lists:filtermap(
        fun({Name, Pos}) ->
                Count = gproc:get_value(?POOL_WRK(Pool, Name), shared),
                {true, {Name, Pos, Count}};
           (_UnnamedSlot) ->
                false
        end, Slots).
  %% @spec worker_pool(Pool::any()) -> [integer() | {Name, Pos}]
  %% @doc Return a list of slots and/or named workers in the pool.
  %%
  %% Mainly for testing, but also useful when implementing a custom worker
  %% placement algorithm on top of gproc_pool.
  %%
  %% A plain integer represents an unfilled slot, and `{Name, Pos}' represents an
  %% added worker. The pool is always filled to the current size.
  %% @end
  worker_pool(Pool) ->
      PoolKey = ?POOL(Pool),
      get_workers_(PoolKey).
  %% @spec pick(Pool::any()) -> GprocName | false
  %% @doc Pick a worker from the pool given the pool's load-balancing algorithm.
  %%
  %% Only `round_robin' and `random' pools can be picked from without an extra
  %% argument; for any other pool type a `badarg' error is raised. `false' is
  %% returned if the pool size is 0 or no worker is available.
  %% @end
  pick(Pool) ->
      PoolInfo = gproc:get_value(?POOL(Pool), shared),
      case PoolInfo of
          {0, _} ->
              false;
          {Size, Type} when Type =:= round_robin orelse Type =:= random ->
              pick(Pool, Size, Type);
          _ ->
              error(badarg)
      end.
  %% @spec pick(Pool::any(), Value::any()) -> GprocName | false
  %% @doc Pick a worker from the pool based on `Value'.
  %%
  %% Only `hash' and `direct' pools can be picked from with an extra argument;
  %% for any other pool type a `badarg' error is raised. `false' is returned if
  %% the pool size is 0 or no worker is available.
  %%
  %% If the pool is of type `direct', `Value' must be a positive integer
  %% corresponding to a position in the pool (modulo the size of the pool). If
  %% the type is `hash', `Value' may be any term, and its hash value will serve
  %% as a guide for selecting a worker.
  %% @end
  pick(Pool, N) ->
      PoolInfo = gproc:get_value(?POOL(Pool), shared),
      case PoolInfo of
          {0, _} ->
              false;
          {Size, Type} when Type =:= hash orelse Type =:= direct ->
              pick(Pool, Size, Type, N);
          _ ->
              error(badarg)
      end.
  %% pick(Pool, Sz, Type) -> GprocName | false
  %%
  %% Worker selection for the pool types that need no extra argument.
  %%
  %% round_robin: bump the pool's shared "current" counter by one and ask gproc
  %% for the next registered worker name at or after that position. If the
  %% worker found lies more than one slot ahead, the counter is moved forward by
  %% the difference so the skipped (unconnected) slots are not retried. If
  %% nothing is found after the counter, the search restarts from position 0 and
  %% the counter is advanced to the worker found there. Returns `false' when no
  %% worker is registered in the pool.
  %%
  %% random: draw a slot uniformly from 1..Sz and take the nearest connected
  %% worker. `rand:uniform/1' is used because `crypto:rand_uniform/2' is no
  %% longer available in current OTP releases; both yield an integer in 1..Sz.
  pick(Pool, Sz, round_robin) ->
      Next = incr(Pool, 1, Sz),
      case gproc:next({l,n}, {n,l,[?MODULE,Pool,Next]}) of
          {n,l,[?MODULE,Pool,Actual,_Name]} = Pick ->
              case Actual - Next of
                  Diff when Diff > 1 ->
                      %% Skip over the gap of unconnected slots.
                      gproc:update_counter(
                        ?POOL_CUR(Pool), shared, {Diff, Sz, 1}),
                      Pick;
                  _ ->
                      Pick
              end;
          _ ->
              %% Nothing after Next: wrap and search from the start.
              case gproc:next({l,n}, {n,l,[?MODULE,Pool,0]}) of
                  {n,l,[?MODULE,Pool,Actual1,_Name1]} = Pick ->
                      incr(Pool, Sz-Next+Actual1, Sz),
                      Pick;
                  _ ->
                      false
              end
      end;
  pick(Pool, Sz, random) ->
      pick_near(Pool, rand:uniform(Sz)).
  %% pick(Pool, Sz, Type, Value) -> GprocName | false
  %%
  %% Worker selection for the pool types that take an extra argument.
  %% hash:   the slot is the hash of Value, mapped onto 1..Sz.
  %% direct: the slot is N modulo Sz, where a remainder of 0 means slot Sz.
  %%         Only positive integers are accepted.
  pick(Pool, Sz, hash, Val) ->
      Slot = erlang:phash2(Val, Sz) + 1,
      pick_near(Pool, Slot);
  pick(Pool, Sz, direct, N) when is_integer(N), N > 0 ->
      Slot = case N rem Sz of
                 0 -> Sz;
                 Rem -> Rem
             end,
      pick_near(Pool, Slot).
  %% pick_near(Pool, N) -> GprocName | false
  %%
  %% Return the first registered worker name that gproc reports after slot N in
  %% Pool. If there is none, wrap around and search again from slot 1. Returns
  %% `false' when neither search finds a worker belonging to Pool.
  pick_near(Pool, N) ->
      case next_pool_worker(Pool, N) of
          false ->
              %% wrap
              next_pool_worker(Pool, 1);
          Pick ->
              Pick
      end.

  %% Ask gproc for the name following {n,l,[?MODULE,Pool,Pos]} and accept it
  %% only if it is a worker name in the same pool.
  next_pool_worker(Pool, Pos) ->
      case gproc:next({l,n}, {n,l,[?MODULE,Pool,Pos]}) of
          {n,l,[?MODULE,Pool,_,_]} = Pick ->
              Pick;
          _ ->
              false
      end.
  %% @spec claim(Pool::any(), Fun::function()) -> {true, Res} | false
  %% @doc Picks the first available worker in the pool and applies `Fun'.
  %%
  %% A `claim' pool lets the caller "claim" a worker for a short span: a lock is
  %% set, `Fun(Name, Pid)' is called, and the lock is released as soon as `Fun'
  %% returns. `Name' is the unique gproc name of the worker and `Pid' its
  %% process identifier. The value registered under the worker's gproc name
  %% serves as the mutex: 0 (zero) if the worker is free, 1 (one) if it is busy.
  %% The mutex operation is implemented using `gproc:update_counter/2'.
  %%
  %% Returns `false' if the pool size is 0 or no free worker could be claimed,
  %% and raises `badarg' if the pool is not of type `claim'.
  %% @end
  claim(Pool, F) when is_function(F, 2) ->
      PoolInfo = gproc:get_value(?POOL(Pool), shared),
      case PoolInfo of
          {0, _} ->
              false;
          {_, claim} ->
              claim_(Pool, F);
          _ ->
              error(badarg)
      end.
  %% claim_(Pool, F) -> {true, Res} | false
  %%
  %% Start a one-at-a-time gproc select over the workers of Pool whose value is
  %% 0 (free), and try to claim each candidate in turn until one succeeds.
  %%
  %% Each select result is `{GprocName, Pid}'. The pid is taken from the second
  %% element of the matched gproc entry; binding '$1' inside the name, as was
  %% done previously, yields the worker's slot number rather than its pid, so
  %% the user fun would be handed an integer instead of a process identifier.
  claim_(Pool, F) ->
      MatchSpec = [{ {{n,l,[?MODULE,Pool,'_','_']}, '_', 0}, [],
                     [{{ {element,1,'$_'}, {element,2,'$_'} }}]}],
      case gproc:select({l,n}, MatchSpec, 1) of
          {[{K, Pid}], Cont} ->
              case try_claim(K, Pid, F) of
                  {true, _} = True ->
                      True;
                  false ->
                      %% Lost the race for this worker; try the next one.
                      claim_cont(Cont, F)
              end;
          _ ->
              false
      end.
  %% claim_cont(Cont, F) -> {true, Res} | false
  %%
  %% Continue a select started by claim_/2: fetch the next candidate worker and
  %% try to claim it, recursing until a claim succeeds or the select yields no
  %% further candidate.
  claim_cont(Cont, F) ->
      case gproc:select(Cont) of
          {[{K, Pid}], NextCont} ->
              Outcome = try_claim(K, Pid, F),
              case Outcome of
                  {true, _} ->
                      Outcome;
                  false ->
                      claim_cont(NextCont, F)
              end;
          _ ->
              false
      end.
  %% try_claim(K, Pid, F) -> {true, Res} | false
  %%
  %% Attempt to take the lock stored as the value of gproc name K. The counter
  %% update returns the value before and after the increment: [0, 1] means the
  %% lock was free and is now held, [1, 1] means it was already taken.
  %% While holding the lock, F(K, Pid) is run; the value is reset to 0 afterwards
  %% even if F raises.
  try_claim(K, Pid, F) ->
      case gproc:update_counter(K, [0, {1, 1, 1}]) of
          [1, 1] ->
              %% no
              false;
          [0, 1] ->
              %% have lock
              try
                  {true, F(K, Pid)}
              after
                  gproc:set_value(K, 0)
              end
      end.
  %% @spec log(GprocKey) -> integer()
  %% @doc Update a counter associated with a worker name.
  %%
  %% Every added worker has a shared gproc counter, which can be used e.g. to
  %% count how many times the worker has been picked. This function increments
  %% it by one. The counter belongs to the named 'slot' rather than to the
  %% connected process, so its value persists even if the currently connected
  %% worker dies.
  %% @end
  log({n,l,[?MODULE,Pool,_,Name]}) ->
      SlotCounter = ?POOL_WRK(Pool, Name),
      gproc:update_shared_counter(SlotCounter, 1).
  %% @spec randomize(Pool::any()) -> integer()
  %% @doc Randomizes the "next" pointer for the pool.
  %%
  %% This function only has an effect for `round_robin' pools, which have a
  %% reference to the next worker to be picked. Without randomizing, the load
  %% balancing will always start with the first worker in the pool.
  %%
  %% Pools of size 0 or 1 are left untouched and the size is returned. For
  %% larger pools the pointer is advanced by a random step in `0..Size-1' and
  %% the new counter value is returned. The step is drawn with `rand:uniform/1',
  %% since `crypto:rand_uniform/2' is no longer available in current OTP
  %% releases.
  %% @end
  randomize(Pool) ->
      case pool_size(Pool) of
          0 -> 0;
          1 -> 1;
          Sz ->
              %% rand:uniform(Sz) is in 1..Sz; shift to 0..Sz-1.
              incr(Pool, rand:uniform(Sz) - 1, Sz)
      end.
  %% @spec pool_size(Pool::any()) -> integer()
  %% @doc Return the size of the pool.
  %%
  %% The size is the first element of the pool's shared `{Size, Type}' value.
  %%
  pool_size(Pool) ->
      element(1, {_Size, _Type} = gproc:get_value(?POOL(Pool), shared)).
  430. %% ===================================================================
  431. %% Start, stop, call gen_server
  %% @private
  %% Start the pool management server, locally registered under the module name.
  start_link() ->
      ServerName = {local, ?MODULE},
      gen_server:start_link(ServerName, ?MODULE, [], []).
  %% @private
  %% gen_server callback: the server state is an empty #st{} record.
  init([]) ->
      InitialState = #st{},
      {ok, InitialState}.
  %% @private
  %% Send Req to the gproc_pool server and translate error replies back into
  %% exceptions in the calling process: a bare `badarg' reply becomes
  %% error(badarg), and `{badarg, Reason}' becomes error(Reason). Any other
  %% reply is returned unchanged.
  call(Req) ->
      Reply = gen_server:call(?MODULE, Req),
      case Reply of
          badarg ->
              error(badarg);
          {badarg, Reason} ->
              error(Reason);
          _ ->
              Reply
      end.
  448. %% ===================================================================
  449. %% Gen_server callbacks
  %% @private
  %% gen_server callback. Delegates to handle_call_/3 and converts any `error'
  %% exception raised there into a `{badarg, Reason}' reply (which call/1 turns
  %% back into an exception in the client), so that a failing request does not
  %% take the server down. The stacktrace is printed for diagnostics.
  %%
  %% The stacktrace is bound in the catch pattern; `erlang:get_stacktrace/0' is
  %% deprecated and no longer returns a usable stacktrace in current OTP
  %% releases.
  handle_call(Req, From, S) ->
      try handle_call_(Req, From, S)
      catch
          error:Reason:Stacktrace ->
              io:fwrite("server backtrace: ~p~n", [Stacktrace]),
              {reply, {badarg, Reason}, S}
      end.
  %% handle_call_(Request, From, State) -> {reply, Reply, State}
  %%
  %% Dispatch of the pool management requests. Each clause performs the update
  %% through the corresponding internal function; errors raised there are
  %% caught by handle_call/3.
  %%
  %% Replies: `ok' for new/delete/force_delete, the worker's position for
  %% add_worker, and `true' for set_pool_size/remove_worker. set_pool_size
  %% replies `badarg' if a named worker sits beyond the requested size.
  handle_call_({new, Pool, Type, Opts}, _, S) ->
      new_(Pool, Type, Opts),
      {reply, ok, S};
  handle_call_({delete, Pool}, _, S) ->
      delete_(Pool),
      {reply, ok, S};
  handle_call_({force_delete, Pool}, _, S) ->
      force_delete_(Pool),
      {reply, ok, S};
  handle_call_({add_worker, Pool, Name}, _, S) ->
      N = add_worker_(Pool, Name),
      {reply, N, S};
  handle_call_({add_worker, Pool, Name, Pos}, _, S) ->
      N = add_worker_(Pool, Name, Pos),
      {reply, N, S};
  handle_call_({set_pool_size, Pool, Sz}, _, S) ->
      %% get_workers_/1 expects the pool's gproc key, not the bare pool name.
      K = ?POOL(Pool),
      Workers = get_workers_(K),
      case get_last_worker_n(Workers) of
          N when N > Sz ->
              {reply, badarg, S};
          _ ->
              set_pool_size_(K, Sz, Workers),
              {reply, true, S}
      end;
  handle_call_({remove_worker, Pool, Name}, _, S) ->
      ok = remove_worker_(Pool, Name),
      {reply, true, S}.
  %% @private
  %% gen_server callback: casts are not used; any cast is ignored.
  handle_cast(_Msg, State) ->
      {noreply, State}.
  %% @private
  %% gen_server callback: unexpected messages are dropped.
  handle_info(_Info, State) ->
      {noreply, State}.
  %% @private
  %% gen_server callback: nothing to clean up on termination.
  terminate(_Reason, _State) ->
      ok.
  %% @private
  %% gen_server callback: the state is carried over unchanged on code upgrade.
  code_change(_OldVsn, State, _Extra) ->
      {ok, State}.
  497. %% ===================================================================
  498. %% Internal functions
  %% new_(Pool, Type, Opts)
  %%
  %% Create the gproc resources that make up a pool:
  %%  - the shared property ?POOL(Pool) with value {Size, Type}, carrying the
  %%    options and the `workers' slot list (initially 1..Size) as attributes;
  %%  - the shared counter ?POOL_CUR(Pool), initialised to Size.
  %%
  %% `auto_size' defaults to `true' when no `size' option is given and to
  %% `false' otherwise. All option handling is done before anything is
  %% registered, so that a malformed option (e.g. a non-boolean `auto_size')
  %% fails without leaving a partially created pool behind.
  new_(Pool, Type, Opts) ->
      valid_type(Type),
      Size = proplists:get_value(size, Opts, 0),
      Workers = lists:seq(1, Size),
      Opts1 =
          case lists:keyfind(auto_size, 1, Opts) of
              false ->
                  Opts ++ [{auto_size, not lists:keymember(size, 1, Opts)}];
              {_, Bool} when is_boolean(Bool) ->
                  Opts
          end,
      K = ?POOL(Pool),
      gproc:reg_shared(K, {Size, Type}),
      gproc:set_attributes_shared(K, Opts1),
      set_workers(K, Workers),
      gproc:reg_shared(?POOL_CUR(Pool), Size).
  %% valid_type(Type) -> true
  %%
  %% Returns `true' for a supported pool type and raises `invalid_type' for
  %% anything else.
  valid_type(T) ->
      case lists:member(T, [round_robin, hash, random, direct, claim]) of
          true -> true;
          false -> error(invalid_type)
      end.
  %% set_pool_size_(K, Sz, Workers)
  %%
  %% Store a slot list of exactly Sz elements for the pool with key K, then
  %% update the size part of the pool's shared {Size, Type} value. A list that
  %% is too long is truncated to its first Sz elements; a list that is too short
  %% is padded with the missing slot numbers.
  set_pool_size_(K, Sz, Workers) ->
      {_, Type} = gproc:get_value(K, shared),
      Resized =
          case length(Workers) of
              Len when Len =:= Sz ->
                  Workers;
              Len when Len > Sz ->
                  lists:sublist(Workers, Sz);
              Len when Len < Sz ->
                  Workers ++ lists:seq(Len + 1, Sz)
          end,
      set_workers(K, Resized),
      gproc:set_value_shared(K, {Sz, Type}).
  %% delete_(Pool)
  %%
  %% Unregister the pool's shared property and its "current" counter, provided
  %% that no slot in the pool is named; otherwise raise `not_empty'.
  delete_(Pool) ->
      K = ?POOL(Pool),
      IsNamed = fun({_, _}) -> true;
                   (_) -> false
                end,
      case lists:any(IsNamed, get_workers_(K)) of
          true ->
              error(not_empty);
          false ->
              gproc:unreg_shared(K),
              gproc:unreg_shared(?POOL_CUR(Pool))
      end.
  %% force_delete_(Pool) -> true
  %%
  %% Remove everything belonging to Pool: collect the pool property, the
  %% "current" counter and all per-worker counters, unregister or kill every
  %% connected worker, and finally unregister the collected shared resources.
  %% The calling process is unregistered rather than killed.
  force_delete_(Pool) ->
      Props = gproc:select({l,p}, [{ {?POOL(Pool), '_', '_'}, [], ['$_']}]),
      Cur = gproc:select({l,c}, [{ {?POOL_CUR(Pool), '_', '_'}, [], ['$_']}]),
      Workers = gproc:select(
                  {l,c}, [{ {?POOL_WRK(Pool,'_'), '_', '_'}, [], ['$_']}]),
      Names = find_names(Pool, '_'),
      Self = self(),
      Disconnect = fun({Key, Pid, _}) when Pid == Self -> gproc:unreg(Key);
                      ({_, Pid, _}) when is_pid(Pid) -> exit(Pid, kill)
                   end,
      lists:foreach(Disconnect, Names),
      %% Only entries owned by `shared' are unregistered; anything else is skipped.
      Unregister = fun({W, shared, _}) -> gproc:unreg_shared(W);
                      (_Other) -> ok
                   end,
      lists:foreach(Unregister, Cur ++ Props ++ Workers),
      true.
  %% find_names(Pool, Pos) -> [{GprocName, Pid, Value}]
  %%
  %% Select the complete gproc entries of all worker names registered in Pool
  %% whose third name element matches the second argument ('_' for any).
  find_names(Pool, Pid) ->
      NamePattern = {n, l, [?MODULE, Pool, Pid, '_']},
      gproc:select({l,n}, [{ {NamePattern, '_', '_'}, [], ['$_'] }]).
  %% add_worker_(Pool, Name) -> Position
  %%
  %% Add Name to the pool at a position chosen by the pool's fill strategy.
  %% Raises `exists' if the name is already in the slot list and `pool_full' if
  %% no slot can be found. If the chosen position lies beyond the current size,
  %% the pool is resized; otherwise only the slot list is stored. Finally the
  %% worker's shared counter is registered.
  add_worker_(Pool, Name) ->
      K = ?POOL(Pool),
      {Sz, Type} = gproc:get_value(K, shared),
      AutoSz = gproc:get_attribute(K, shared, auto_size),
      Ws0 = get_workers_(K),
      lists:keymember(Name, 1, Ws0) andalso error(exists),
      {N, Ws1} =
          case find_slot(Name, K, Ws0, Sz, Type, AutoSz) of
              false ->
                  error(pool_full);
              {_, _} = Slot ->
                  Slot
          end,
      case N > Sz of
          true ->
              %% also stores the slot list
              set_pool_size_(K, N, Ws1);
          false ->
              %% size not changed
              set_workers(K, Ws1)
      end,
      reg_worker(Pool, Name, N),
      N.
  %% add_worker_(Pool, Name, Pos) -> Pos
  %%
  %% Add Name to the pool at the explicit position Pos.
  %%
  %% Raises `exists' if Name is already in the slot list or if the slot at Pos
  %% is already named, and `out_of_range' if Pos lies beyond the pool size while
  %% `auto_size' is false. With `auto_size' true the pool is extended up to Pos,
  %% the slots in between being left unnamed.
  %%
  %% The duplicate-name check is done before the slot list is rewritten, in the
  %% same way as in add_worker_/2, so that a rejected request cannot leave the
  %% same name stored in two slots.
  add_worker_(Pool, Name, Pos) ->
      K = ?POOL(Pool),
      {Sz, _} = gproc:get_value(K, shared),
      Ws0 = get_workers_(K),
      lists:keymember(Name, 1, Ws0) andalso error(exists),
      if Pos > Sz ->
              case gproc:get_attribute(K, shared, auto_size) of
                  true ->
                      Ws1 = Ws0 ++ lists:seq(Sz+1,Pos-1) ++ [{Name, Pos}],
                      set_pool_size_(K, Pos, Ws1);
                  false ->
                      error(out_of_range)
              end;
         true ->
              case lists:nth(Pos, Ws0) of
                  {_,_} -> error(exists);
                  P when is_integer(P) ->
                      Ws1 = set_pos(Pos, Ws0, {Name, Pos}),
                      set_workers(K, Ws1)
              end
      end,
      reg_worker(Pool, Name, Pos),
      Pos.
  %% reg_worker(Pool, Name, Pos)
  %%
  %% Register the shared counter for worker Name, starting at 0, and record the
  %% worker's position in its `n' attribute (read back by worker_id/2).
  reg_worker(Pool, Name, Pos) ->
      Counter = ?POOL_WRK(Pool, Name),
      gproc:reg_shared(Counter, 0),
      gproc:set_attributes_shared(Counter, [{n, Pos}]).
  %% remove_worker_(Pool, Name) -> ok
  %%
  %% Remove Name from the pool unless a process is still connected to it, in
  %% which case `{worker_connected, Pid}' is raised.
  remove_worker_(Pool, Name) ->
      case whereis_worker(Pool, Name) of
          undefined ->
              do_remove_worker_(Pool, Name);
          Pid when is_pid(Pid) ->
              error({worker_connected, Pid})
      end.
  %% do_remove_worker_(Pool, Name) -> ok
  %%
  %% Take Name out of the slot list, unregister its shared counter, shrink the
  %% recorded pool size if the slot list became shorter (the removed worker was
  %% the last element), and store the new slot list.
  do_remove_worker_(Pool, Name) ->
      K = ?POOL(Pool),
      Ws0 = get_workers_(K),
      Ws1 = del_slot(Name, Ws0),
      gproc:unreg_shared(?POOL_WRK(Pool, Name)),
      NewLen = length(Ws1),
      case NewLen - length(Ws0) of
          Diff when Diff < 0 ->
              {_, Type} = gproc:get_value(K, shared),
              gproc:set_value_shared(K, {NewLen, Type});
          0 ->
              ok
      end,
      gproc:set_attributes_shared(K, [{workers, Ws1}]),
      ok.
  %% del_slot(Name, Slots) -> Slots1
  %%
  %% Remove the worker Name from a slot list. If the worker occupies the last
  %% element, that element is dropped (the list gets shorter); otherwise its
  %% entry is replaced by the bare position number, i.e. an unnamed slot.
  %% Fails with `function_clause' if Name is not in the list.
  del_slot(Name, [{Name, _}]) ->
      [];
  del_slot(Name, [{Name, Pos} | Rest]) ->
      [Pos | Rest];
  del_slot(Name, [Other | Rest]) ->
      [Other | del_slot(Name, Rest)].
  %% find_slot(Name, Key, Workers, Sz, Type, AutoSz) -> {Pos, Workers1} | false
  %%
  %% Choose a position for a new worker. With an empty slot list the worker goes
  %% into position 1, unless the pool has size 0 and may not grow. Otherwise the
  %% pool's fill strategy decides between packed and sparse placement.
  find_slot(_Name, _, [], 0, _, false) ->
      false;
  find_slot(Name, _, [], _Sz, _, _Auto) ->
      {1, [{Name, 1}]};
  find_slot(Name, Key, Workers, Sz, Type, AutoSz) ->
      Strategy = get_strategy(Key, Type),
      case Strategy of
          sparse ->
              find_slot_sparse(Name, Workers, Sz, AutoSz);
          packed ->
              find_slot_packed(Name, Workers, AutoSz)
      end.
  645. %% find_slot(Name, Key, Workers, Sz, Type, AutoSz, Strategy).
  646. %% find_slot(Name, []) ->
  647. %% {1, [{Name, 1}]};
  648. %% find_slot(Name, Slots) ->
  649. %% find_slot(Name, Slots, []).
  %% get_last_worker_n(Slots) -> integer()
  %%
  %% Return the 1-based list index of the last named slot (a `{Name, Pos}'
  %% tuple) in Slots, or 0 if no slot is named. Elements must be 2-tuples or
  %% integers.
  get_last_worker_n(Ws) ->
      Step = fun({_, _}, {_Last, Index}) ->
                     {Index, Index + 1};
                (Slot, {Last, Index}) when is_integer(Slot) ->
                     {Last, Index + 1}
             end,
      {LastNamed, _NextIndex} = lists:foldl(Step, {0, 1}, Ws),
      LastNamed.
  %% find_slot_packed(Name, Workers, AutoSz) -> {Pos, Workers1} | false
  %%
  %% Packed placement: put the new worker into the first unnamed slot. If every
  %% slot is named, append a new slot after the last one when the pool may grow
  %% (AutoSz is `true'), or return `false' when it may not.
  find_slot_packed(Name, Workers, AutoSz) ->
      find_slot_packed(Name, Workers, AutoSz, []).

  %% Seen holds the named slots passed so far, in reverse order.
  find_slot_packed(Name, [Free | Rest], _, Seen) when is_integer(Free) ->
      %% empty slot
      {Free, lists:reverse(Seen, [{Name, Free} | Rest])};
  find_slot_packed(Name, [{_, LastPos} = Last], true, Seen) ->
      %% last elem; expand pool
      NewPos = LastPos + 1,
      {NewPos, lists:reverse(Seen, [Last, {Name, NewPos}])};
  find_slot_packed(_, [_], false, _) ->
      false;
  find_slot_packed(Name, [{_, _} = Named | Rest], Auto, Seen) ->
      find_slot_packed(Name, Rest, Auto, [Named | Seen]).
  %% find_slot_sparse(Name, Ws, Sz, Auto) -> {Pos, Ws1} | false
  %%
  %% Sparse placement: put the new worker in the middle of the largest run of
  %% unnamed slots, so that named workers stay spread out over the pool.
  %%
  %% The fold walks the slot list and produces either [] (no named slot seen) or
  %% {Gap, StartP, First, Last, Max, MaxP}, where Gap is the length of the
  %% current run of unnamed slots, StartP the position of the named slot
  %% preceding it, First/Last the positions of the first and last named slots,
  %% Max the longest run seen after a named slot and MaxP the position of the
  %% named slot preceding that run. Unnamed slots before the first named slot
  %% are not counted here; together with the unnamed slots after the last named
  %% one they form the "wrap" gap.
  %%
  %% When the wrap gap is chosen, the target position is computed modulo the
  %% pool size within the range 1..Sz. (Reducing modulo Sz+1 instead can yield
  %% position 0, which is not a slot; e.g. for [1,{a,2},3] in a pool of size 3.)
  find_slot_sparse(Name, Ws, Sz, Auto) ->
      %% Collect the position of the first and last filled slots, as well as
      %% the largest gap between filled slots
      case lists:foldl(
             fun(N, {Prev, StartP, First, Last, Max, MaxP}) when is_integer(N) ->
                     case Prev+1 of
                         Gap when Gap > Max ->
                             {Gap, StartP, First, Last, Gap, StartP};
                         Gap ->
                             {Gap, StartP, First, Last, Max, MaxP}
                     end;
                (N, []) when is_integer(N) ->
                     %% skip
                     [];
                ({_, Pos}, []) ->
                     {0, Pos, _First = Pos, _Last = Pos, 0, 0};
                ({_, Pos}, {Prev, StartP, First, _PrevLast, Max, MaxP}) ->
                     if Prev > Max ->
                             {0, Pos, First, Pos, Prev, StartP};
                        true ->
                             {0, Pos, First, Pos, Max, MaxP}
                     end
             end, [], Ws) of
          [] ->
              %% all empty slots
              case {Sz, Auto} of
                  {0, false} ->
                      false;
                  {0, true} ->
                      {1, [{Name, 1}]};
                  {_, _} when is_integer(Sz), Sz > 0 ->
                      {1, [{Name, 1}|tl(Ws)]}
              end;
          {_, _, 1, Last, 0, _} ->
              %% Pool full
              if Auto ->
                      NewPos = Last + 1,
                      {NewPos, Ws ++ [{Name, NewPos}]};
                 true ->
                      false
              end;
          {_, _, First, Last, MaxGap, StartPos} ->
              WrapGap = (Sz - Last) + First - 1,
              NewPos = if WrapGap >= MaxGap ->
                               %% Middle of the wrap gap, counted from Last and
                               %% wrapped around into 1..Sz.
                               ((Last + (WrapGap div 2)) rem Sz) + 1;
                          true ->
                               %% Middle of the largest inner gap; this never
                               %% exceeds Sz, so the reduction is a no-op.
                               (StartPos + (MaxGap div 2) + 1) rem (Sz+1)
                       end,
              {NewPos, set_pos(NewPos, Ws, {Name, NewPos})}
      end.

  %% set_pos(P, List, X) -> List1
  %%
  %% Replace the P:th element (1-based) of List with X. Fails with
  %% `function_clause' if P is not positive or lies beyond the end of the list.
  set_pos(P, L, X) when P > 0, is_list(L) ->
      set_pos(P, 1, L, X).

  %% C is the index of the head of the remaining list.
  set_pos(P, P, [_|T], X) ->
      [X|T];
  set_pos(P, C, [H|T], X) when C < P ->
      [H|set_pos(P, C+1, T, X)].
  %% get_workers_(K) -> [integer() | {Name, Pos}]
  %%
  %% Read the slot list stored in the `workers' attribute of the shared pool
  %% property K. A missing attribute is treated as an empty list.
  get_workers_(K) ->
      Stored = gproc:get_attribute(K, shared, workers),
      case Stored of
          undefined ->
              [];
          _ when is_list(Stored) ->
              Stored
      end.
  %% set_workers(K, Slots)
  %%
  %% Store the slot list in the `workers' attribute of the shared pool
  %% property K.
  set_workers(K, L) when is_list(L) ->
      Attrs = [{workers, L}],
      gproc:set_attributes_shared(K, Attrs).
  %% get_strategy(Key, Type) -> packed | sparse | term()
  %%
  %% Return the pool's fill strategy: the `fill_strategy' attribute of the pool
  %% property if one is set, otherwise the default for the pool type (packed for
  %% round_robin, direct and claim; sparse for random and hash).
  get_strategy(Key, Type) ->
      Default =
          case Type of
              T when T =:= round_robin; T =:= direct; T =:= claim ->
                  packed;
              T when T =:= random; T =:= hash ->
                  sparse
          end,
      attribute(Key, fill_strategy, Default).
  %% attribute(Key, A, Default) -> Value
  %%
  %% Read attribute A of the shared gproc object Key, falling back to Default
  %% when the attribute lookup yields `undefined'.
  attribute(Key, A, Default) ->
      Found = gproc:get_attribute(Key, shared, A),
      if Found =:= undefined -> Default;
         true -> Found
      end.
  %% incr(Pool, Incr, Sz) -> integer()
  %%
  %% Advance the pool's shared "current" counter by Incr, passing Sz and 1 as the
  %% remaining elements of the gproc update operation (presumably the threshold
  %% and the value to restart from -- confirm against gproc:update_counter/3).
  incr(Pool, Incr, Sz) ->
      UpdateOp = {Incr, Sz, 1},
      gproc:update_counter(?POOL_CUR(Pool), shared, UpdateOp).
  750. %% find_worker(Pool, Name) ->
  751. %% case gproc:select(n, [{ {{n, l, {?MODULE, Pool, '_'}}, '_', Name},
  752. %% [], ['$_'] }]) of
  753. %% [] ->
  754. %% undefined;
  755. %% [{{n,l,{?MODULE,_,N}}, Pid, _}] ->
  756. %% {N, Pid}
  757. %% end.
  758. %% ============================= Test code ===========================
  %% @private
  %% Run the timing test with N picks against a round_robin pool.
  test(N) when N > 0 ->
      Opts = [],
      test(N, round_robin, Opts).
  %% @private
  %% Set up a temporary pool of the given type, time the matching test runner
  %% (see f/1) for N iterations with timer:tc/3, and tear the pool down again
  %% whether or not the run succeeds. The pool is named after the source line.
  test(N, Type, Opts) when Type==round_robin;
                           Type==random;
                           Type==hash;
                           Type==direct;
                           Type==claim ->
      P = ?LINE,
      setup_test_pool(P, Type, Opts),
      try
          Runner = f(Type),
          timer:tc(?MODULE, Runner, [N, P])
      after
          remove_test_pool(P)
      end.
  %% ptest(N, I, Type, Opts) -> {Times, AvgTime, AvgOfAverages}
  %%
  %% Parallel variant of test/3: N monitored processes each time I iterations of
  %% the runner selected by f/1 and exit with `{ok, Result}'. The results are
  %% gathered by collect/1, and the temporary pool is removed afterwards.
  ptest(N, I, Type, Opts) ->
      P = ?LINE,
      setup_test_pool(P, Type, Opts),
      F = f(Type),
      Job = fun() -> exit({ok, timer:tc(?MODULE, F, [I, P])}) end,
      Pids = lists:map(fun(_) -> spawn_monitor(Job) end, lists:seq(1, N)),
      try
          collect(Pids)
      after
          remove_test_pool(P)
      end.
  %% collect(Pids) -> {Times, MeanTime, MeanAvg}
  %%
  %% Wait, in order, for the 'DOWN' message of each `{Pid, MonitorRef}' in Pids.
  %% Every exit reason must be `{ok, {Time, Avg}}'. Returns the list of times,
  %% their mean, and the mean of the averages.
  collect(Pids) ->
      AwaitDown = fun({_, Ref}) ->
                          receive
                              {'DOWN', Ref, _, _, Reason} ->
                                  Reason
                          end
                  end,
      Results = lists:map(AwaitDown, Pids),
      Pairs = lists:map(fun({ok, {T, Avg}}) -> {T, Avg} end, Results),
      {Times, Avgs} = lists:unzip(Pairs),
      MeanTime = lists:sum(Times) / length(Times),
      MeanAvg = lists:sum(Avgs) / length(Avgs),
      {Times, MeanTime, MeanAvg}.
  %% f(Type) -> RunnerFunctionName
  %%
  %% Map a pool type to the name of the exported test runner that exercises it:
  %% hash/direct pools need a value to pick by, claim pools use claim/2, the
  %% `{empty, _}' form selects the no-op runner, and anything else uses the
  %% plain pick/1 runner.
  f(hash) -> test_run1;
  f(direct) -> test_run1;
  f(claim) -> test_run2;
  f({empty, _}) -> test_run0;
  f(_) -> test_run.
  %% @private
  %% Create pool P and add and connect the calling process as each of the test
  %% workers, printing the slot list after every addition. Type0 is either a
  %% pool type or a `{Tag, Type}' pair.
  setup_test_pool(P, Type0, Opts) ->
      Type = case Type0 of
                 {_, T} -> T;
                 T when is_atom(T) -> T
             end,
      new(P, Type, Opts),
      AddAndConnect =
          fun(W) ->
                  R = add_worker(P, W),
                  io:fwrite("add_worker(~p, ~p) -> ~p; Ws = ~p~n",
                            [P, W, R, get_workers_(?POOL(P))]),
                  connect_worker(P, W)
          end,
      lists:map(AddAndConnect, test_workers()).
  %% @private
  %% Print the per-worker counters of pool P, then disconnect and remove each
  %% test worker and delete the pool.
  remove_test_pool(P) ->
      Stats = gproc:select(
                {l,c},
                [{ {{c,l,{?MODULE,P,w,'$1'}},'_','$2'}, [],
                   [{{'$1','$2'}}] }]),
      io:fwrite("worker stats (~p):~n"
                "~p~n", [P, Stats]),
      lists:foreach(
        fun(W) ->
                disconnect_worker(P, W),
                remove_worker(P, W)
        end, test_workers()),
      delete(P).
  %% Names of the workers used by the test pools.
  test_workers() ->
      [a, b, c, d, e, f].
  %% @private
  %% Test runner for pools picked from with pick/1. Performs N picks, logging
  %% each picked worker and sleeping 1..49 ms in between, and returns the
  %% average pick time in microseconds as measured by timer:tc/3.
  %%
  %% The sleep time is drawn with rand:uniform/1; crypto:rand_uniform/2 is no
  %% longer available in current OTP releases.
  test_run(N, P) ->
      test_run(N, P, 0, 0).

  %% S accumulates the measured time, M counts the iterations done.
  test_run(N, P, S, M) when N > 0 ->
      {T, Worker} = timer:tc(?MODULE, pick, [P]),
      true = (Worker =/= false),
      log(Worker),
      timer:sleep(rand:uniform(49)),
      test_run(N-1, P, S+T, M+1);
  test_run(_, _, S, M) ->
      S/M.
  %% @private
  %% Test runner for pools picked from with pick/2 (hash and direct). Performs N
  %% picks, using the remaining iteration count as the value to pick by, logging
  %% each picked worker and sleeping 1..49 ms in between. Returns the average
  %% pick time in microseconds as measured by timer:tc/3.
  %%
  %% The sleep time is drawn with rand:uniform/1; crypto:rand_uniform/2 is no
  %% longer available in current OTP releases.
  test_run1(N, P) ->
      test_run1(N, P, 0, 0).

  %% S accumulates the measured time, M counts the iterations done.
  test_run1(N, P, S, M) when N > 0 ->
      {T, Worker} = timer:tc(?MODULE, pick, [P, N]),
      true = (Worker =/= false),
      log(Worker),
      timer:sleep(rand:uniform(49)),
      test_run1(N-1, P, S+T, M+1);
  test_run1(_, _, S, M) ->
      S/M.
  %% @private
  %% Test runner for claim pools. Performs N claims, each of which logs the
  %% claimed worker, sleeping 1..49 ms in between. Every claim must succeed.
  %% Returns the average claim time in microseconds as measured by timer:tc/3.
  %%
  %% The sleep time is drawn with rand:uniform/1; crypto:rand_uniform/2 is no
  %% longer available in current OTP releases.
  test_run2(N, P) ->
      test_run2(N, P, fun(K,_) -> log(K) end, 0, 0).

  %% S accumulates the measured time, M counts the iterations done.
  test_run2(N, P, F, S, M) when N > 0 ->
      {T, {true, _}} = timer:tc(?MODULE, claim, [P, F]),
      timer:sleep(rand:uniform(49)),
      test_run2(N-1, P, F, S+T, M+1);
  test_run2(_, _, _, S, M) ->
      S/M.
  %% No-op test runner: counts N down to zero without touching the pool, which
  %% gives a baseline for the loop overhead. Always returns `ok'.
  test_run0(N, X) when N > 0 ->
      Remaining = N - 1,
      test_run0(Remaining, X);
  test_run0(_, _) ->
      ok.