gproc_pool.erl 34 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Load balancing functions based on Gproc.
  19. %%
  20. %% This module implements support for load-balancing server pools. It was
  21. %% originally intended mainly as an example of how to use various Gproc
  22. %% resources (e.g. counters and shared properties), but is fully integrated
  23. %% into Gproc, and fully functional.
  24. %%
  25. %% <h2>Concepts</h2>
  26. %%
  27. %% Each pool has a list of 'named' workers (defined using `add_worker/2') and
  28. %% a load-balancing strategy. Processes can then 'connect' to the pool (with
  29. %% `connect_worker/2'), using one of the defined names.
  30. %%
  31. %% Users then 'pick' one of the currently connected processes in the pool. Which
  32. %% process is picked depends on the load-balancing strategy.
  33. %%
  34. %% The whole representation of the pool and its connected workers is in gproc.
  35. %% The server `gproc_pool' is used to serialize pool management updates, but
  36. %% worker selection is performed entirely in the calling process, and can be
  37. %% performed by several processes concurrently.
  38. %%
  39. %% <h3>Load-balancing strategies</h3>
  40. %%
  41. %% * `round_robin' is the default. A wrapping gproc counter keeps track of the
  42. %% latest worker picked, and `gproc:next()' is used to find the next worker.
  43. %% * `random' picks a random worker from the pool.
  44. %% * `hash' requires a value (`pick/2'), and picks a worker based on the hash of
  45. %% that value.
  46. %% * `direct' takes an integer as an argument, and picks the next worker (modulo
  47. %% the size of the pool). This is mainly for implementations that implement
  48. %% a load-balancing strategy on top of `gproc_pool'.
  49. %% * `claim' picks the first available worker and 'claims' it while executing
  50. %% a user-provided fun. This means that the number of concurrently executing
  51. %% jobs will not exceed the size of the pool.
  52. %% @end
  53. -module(gproc_pool).
  54. -behavior(gen_server).
  55. %% gproc round-robin name lookup
  56. -export([new/1, % (Pool) -> (Pool, round_robin, [])
  57. new/3, % (Pool, Type, Opts)
  58. delete/1, % (Pool)
  59. force_delete/1, % (Pool)
  60. add_worker/2, % (Pool, Name) -> Pos
  61. add_worker/3, % (Pool, Name, Pos) -> Pos
  62. remove_worker/2, % (Pool, Name)
  63. connect_worker/2, % (Pool, Name)
  64. disconnect_worker/2, % (Pool, Name)
  65. whereis_worker/2, % (Pool, Name)
  66. worker_id/2, % (Pool, Name)
  67. active_workers/1, % (Pool)
  68. defined_workers/1, % (Pool)
  69. worker_pool/1, % (Pool)
  70. pick/1, % (Pool)
  71. pick/2, % (Pool, Value)
  72. pick_worker/1, % (Pool)
  73. pick_worker/2, % (Pool, Value)
  74. claim/2, % (Pool, Fun)
  75. claim/3, % (Pool, Fun, Wait)
  76. log/1, % (WorkerId)
  77. randomize/1]). % (Pool)
  78. -export([start_link/0]).
  79. -export([init/1,
  80. handle_call/3,
  81. handle_cast/2,
  82. handle_info/2,
  83. terminate/2,
  84. code_change/3]).
  85. -export([test/1, test/3, ptest/4, test_run/2, test_run1/2, test_run2/2,
  86. test_run0/2, setup_test_pool/3, setup_test_pool/4,
  87. remove_test_pool/1]).
  88. -define(POOL(Pool), {p,l,{?MODULE,Pool}}).
  89. -define(POOL_CUR(Pool), {c,l,{?MODULE,Pool,cur}}).
  90. -define(POOL_WRK(Pool,Name), {c,l,{?MODULE,Pool,w,Name}}).
  91. -record(st, {}).
  92. %% @spec new(Pool::any()) -> ok
  93. %%
  94. %% @equiv new(Pool, round_robin, [])
  95. new(Pool) ->
  96. new(Pool, round_robin, []).
  97. %% @spec new(Pool::any(), Type, Opts) -> true
  98. %% Type = round_robin | random | hash | direct | claim
  99. %% Opts = [{size, integer()} | {auto_size, boolean()}]
  100. %%
  101. %% @doc Create a new pool.
  102. %%
  103. %% The pool starts out empty. If a size is not given, the pool size is set to
  104. %% 0 initially. `auto_size' is `true' by default if size is not specified, but
  105. %% `false' by default otherwise. If `auto_size == true', the pool will be
  106. %% enlarged to accomodate new workers, when necessary. Otherwise, trying to add
  107. %% a worker when the pool is full will raise an exception, as will trying to add
  108. %% a worker on a specific position beyond the current size of the pool.
  109. %%
  110. %% If the given pool already exists, this function will raise an exception.
  111. %% @end
  112. new(Pool, Type, Opts) when Type == round_robin;
  113. Type == random;
  114. Type == hash;
  115. Type == direct;
  116. Type == claim ->
  117. call({new, Pool, Type, Opts}).
  118. %% @spec delete(Pool::any()) -> true
  119. %% @doc Delete an existing pool.
  120. %%
  121. %% This function will delete a pool, only if there are no connected workers.
  122. %% Ensure that workers have been disconnected before deleting the pool.
  123. %% @end
  124. %%
  125. delete(Pool) ->
  126. call({delete, Pool}).
  127. %% @spec force_delete(Pool::any()) -> true
  128. %% @doc Forcibly remove a pool, terminating all active workers
  129. %%
  130. %% This function is primarily intended for cleanup of any pools that might have
  131. %% become inconsistent (for whatever reason). It will clear out all resources
  132. %% belonging to the pool and send `exit(Pid, kill)' signals to all connected
  133. %% workers (except the calling process).
  134. %% @end
  135. %%
  136. force_delete(Pool) ->
  137. %% This is not pretty, but this function is mainly intended to clean up
  138. %% a pool that's not used, with no regard to connected workers, except self(),
  139. %% (that is, we kill each connected worker). We don't worry about races,
  140. %% so don't go to the server (which doesn't have own state on the pool
  141. %% anyway).
  142. force_delete_(Pool).
  143. %% @spec add_worker(Pool::any(), Name::any()) -> integer()
  144. %%
  145. %% @doc Assign a worker name to the pool, returning the worker's position.
  146. %%
  147. %% Before a worker can connect to the pool, its name must be added. If no explicit
  148. %% position is given (see {@link add_worker/3}), the most suitable position,
  149. %% depending on load-balancing algorithm, is selected: for round_robin and direct
  150. %% pools, names are packed tightly from the beginning; for hash and random pools,
  151. %% slots are filled as sparsely as possible, in order to maintain an even
  152. %% likelihood of hitting each worker.
  153. %%
  154. %% An exception is raised if the pool is full (and `auto_size' is false), or if
  155. %% `Name' already exists in the pool.
  156. %%
  157. %% Before a worker can be used, a process must connect to it (see
  158. %% {@link connect_worker/2}.
  159. %% @end
  160. add_worker(Pool, Name) ->
  161. call({add_worker, Pool, Name}).
  162. %% @spec add_worker(Pool::any(), Name::any(), Slot::integer()) -> integer()
  163. %%
  164. %% @doc Assign a worker name to a given slot in the pool, returning the slot.
  165. %%
  166. %% This function allows the pool maintainer to exactly position each worker
  167. %% inside the pool. An exception is raised if the position is already taken,
  168. %% or if `Name' already exists in the pool. If `Slot' is larger than the current
  169. %% size of the pool, an exception is raised iff `auto_size' is `false';
  170. %% otherwise the pool is expanded to accomodate the new position.
  171. %% @end
  172. add_worker(Pool, Name, Slot) ->
  173. call({add_worker, Pool, Name, Slot}).
  174. %% @spec connect_worker(Pool::any(), Name::any()) -> true
  175. %% @doc Connect the current process to `Name' in `Pool'.
  176. %%
  177. %% Typically, a server will call this function as it starts, similarly to when
  178. %% it registers itself. In fact, calling `connect_worker/2' leads to the process
  179. %% being registered as `{n,l,[gproc_pool,N,Name]}', where `N' is the position of
  180. %% `Name' in the pool. This means (a) that gproc monitors the worker, and
  181. %% removes the connection automatically if it dies, and (b) that the registered
  182. %% names can be listed in order of their positions in the pool.
  183. %%
  184. %% This function raises an exception if `Name' does not exist in `Pool' (or
  185. %% there is no such pool), or if another worker is already connected to
  186. %% `Name'.
  187. %% @end
  188. %%
  189. connect_worker(Pool, Name) ->
  190. gproc:reg(worker_id(Pool, Name), 0).
  191. %% @spec disconnect_worker(Pool, Name) -> true
  192. %%
  193. %% @doc Disconnect the current process from `Name' in `Pool'.
  194. %%
  195. %% This function is similar to a `gproc:unreg()' call. It removes the
  196. %% connection between `Pool', `Name' and pid, and makes it possible for another
  197. %% process to connect to `Name'.
  198. %%
  199. %% An exception is raised if there is no prior connection between `Pool',
  200. %% `Name' and the current process.
  201. %% @end
  202. %%
  203. disconnect_worker(Pool, Name) ->
  204. gproc:unreg(worker_id(Pool, Name)).
  205. %% @spec remove_worker(Pool::any(), Name::any()) -> true
  206. %% @doc Remove a previously added worker.
  207. %%
  208. %% This function will assume that any connected worker is disconnected first.
  209. %% It will fail if there is no such pool, but will return `true' in the case
  210. %% when `Name' did not exist in the pool in the first place.
  211. %% @end
  212. remove_worker(Pool, Name) ->
  213. call({remove_worker, Pool, Name}).
  214. %% @spec whereis_worker(Pool::any(), Name::any()) -> pid() | undefined
  215. %% @doc Look up the pid of a connected worker.
  216. %%
  217. %% This function works similarly to `gproc:where/1': it will return the pid
  218. %% of the worker connected as `Pool / Name', if there is such a worker; otherwise
  219. %% it will return `undefined'. It will raise an exception if `Name' has not been
  220. %% added to the pool.
  221. %% @end
  222. whereis_worker(Pool, Name) ->
  223. ID = worker_id(Pool, Name),
  224. gproc:where(ID).
  225. %% @spec worker_id(Pool, Name) -> GprocName
  226. %% @doc Return the unique gproc name corresponding to a name in the pool.
  227. %%
  228. %% This function assumes that `Name' has been added to `Pool'. It returns the
  229. %% unique name that a connected worker will be registered as. This doesn't mean
  230. %% that there is, in fact, such a connected worker.
  231. %% @end
  232. worker_id(Pool, Name) ->
  233. N = gproc:get_attribute(?POOL_WRK(Pool, Name), shared, n),
  234. {n, l, [?MODULE, Pool, N, Name]}.
  235. %% @spec active_workers(Pool::any()) -> [{Name, Pid}]
  236. %% @doc Return a list of currently connected workers in the pool.
  237. %%
  238. active_workers(Pool) ->
  239. gproc:select(
  240. {l,n},
  241. [{ {{n,l,[?MODULE,Pool,'$1','$2']},'$3','_'}, [{is_integer, '$1'}],
  242. [{{'$2', '$3'}}] }]).
  243. %% @spec defined_workers(Pool::any()) -> [{Name, Pos, Count}]
  244. %% @doc Return a list of added workers in the pool.
  245. %%
  246. %% The added workers are slots in the pool that have been given names, and thus
  247. %% can be connected to. This function doesn't detect whether or not there are
  248. %% any connected (active) workers.
  249. %%
  250. %% The list contains `{Name, Pos, Count}', where `Name' is the name of the added
  251. %% worker, `Pos' is its position in the pool, and `Count' represents the number
  252. %% of times the worker has been picked (assuming callers keep count by explicitly
  253. %% calling {@link log/1}).
  254. %% @end
  255. defined_workers(Pool) ->
  256. K = ?POOL(Pool),
  257. [{N, Pos, gproc:get_value(?POOL_WRK(Pool, N), shared)}
  258. || {N, Pos} <- get_workers_(K)].
  259. %% @spec worker_pool(Pool::any()) -> [integer() | {Name, Pos}]
  260. %% @doc Return a list of slots and/or named workers in the pool.
  261. %%
  262. %% This function is mainly for testing, but can also be useful when implementing
  263. %% your own worker placement algorithm on top of gproc_pool.
  264. %%
  265. %% A plain integer represents an unfilled slot, and `{Name, Pos}' represents an
  266. %% added worker. The pool is always filled to the current size.
  267. %% @end
  268. worker_pool(Pool) ->
  269. get_workers_(?POOL(Pool)).
  270. %% @spec pick(Pool::any()) -> GprocName | false
  271. %% @doc Pick a worker from the pool given the pool's load-balancing algorithm.
  272. %%
  273. %% The pool types that allows picking without an extra argument are
  274. %% round_robin and random. This function returns `false' if there is no available
  275. %% worker, or if `Pool' is not a valid pool.
  276. %% @end
  277. pick(Pool) ->
  278. case gproc:get_value(?POOL(Pool), shared) of
  279. {0, _} -> false;
  280. {Sz, Type} when Type == round_robin; Type == random ->
  281. pick(Pool, Sz, Type, name);
  282. _ ->
  283. error(badarg)
  284. end.
  285. %% @spec pick_worker(Pool::any()) -> pid() | false
  286. %% @doc Pick a worker pid from the pool given the pool's load-balancing algorithm.
  287. %%
  288. %% Like {@link pick/1}, but returns the worker pid instead of the name.
  289. %% @end
  290. pick_worker(Pool) ->
  291. case gproc:get_value(?POOL(Pool), shared) of
  292. {0, _} -> false;
  293. {Sz, Type} when Type == round_robin; Type == random ->
  294. pick(Pool, Sz, Type, pid);
  295. _ ->
  296. error(badarg)
  297. end.
  298. %% @spec pick(Pool::any(), Value::any()) -> GprocName | false
  299. %% @doc Pick a worker from the pool based on `Value'.
  300. %%
  301. %% The pool types that allows picking based on an extra argument are
  302. %% hash and direct. This function returns `false' if there is no available
  303. %% worker, or if `Pool' is not a valid pool.
  304. %%
  305. %% If the pool is of type `direct', `Value' must be an integer corresponding to
  306. %% a position in the pool (modulo the size of the pool). If the type is
  307. %% `hash', `Value' may be any term, and its hash value will serve as a guide for
  308. %% selecting a worker.
  309. %% @end
  310. pick(Pool, N) ->
  311. case gproc:get_value(?POOL(Pool), shared) of
  312. {0, _} -> false;
  313. {Sz, Type} when Type == hash; Type == direct ->
  314. pick(Pool, Sz, Type, N, name);
  315. _ ->
  316. error(badarg)
  317. end.
  318. %% @spec pick_worker(Pool::any(), Value::any()) -> pid() | false
  319. %% @doc Pick a worker pid from the pool given the pool's load-balancing algorithm.
  320. %%
  321. %% Like {@link pick/2}, but returns the worker pid instead of the name.
  322. %% @end
  323. pick_worker(Pool, N) ->
  324. case gproc:get_value(?POOL(Pool), shared) of
  325. {0, _} -> false;
  326. {Sz, Type} when Type == hash; Type == direct ->
  327. pick(Pool, Sz, Type, N, pid);
  328. _ ->
  329. error(badarg)
  330. end.
  331. pick(Pool, Sz, round_robin, Ret) ->
  332. Next = incr(Pool, 1, Sz),
  333. case ets:next(gproc, {{n,l,[?MODULE,Pool,Next]},n}) of
  334. {{n,l,[?MODULE,Pool,Actual,_Name]} = Pick, _} ->
  335. case Actual - Next of
  336. Diff when Diff > 1 ->
  337. gproc:update_counter(
  338. ?POOL_CUR(Pool), shared, {Diff, Sz, 1}),
  339. ret(Pick, Ret);
  340. _ ->
  341. ret(Pick, Ret)
  342. end;
  343. _ ->
  344. case ets:next(gproc, {{n,l,[?MODULE,Pool,0]}, n}) of
  345. {{n,l,[?MODULE,Pool,Actual1,_Name1]} = Pick, _} ->
  346. incr(Pool, Sz-Next+Actual1, Sz),
  347. %% gproc:update_counter(
  348. %% ?POOL_CUR(Pool), shared, {Sz-Next+Actual1, Sz, 1}),
  349. ret(Pick, Ret);
  350. _ ->
  351. false
  352. end
  353. end;
  354. pick(Pool, Sz, random, Ret) ->
  355. pick_near(Pool, crypto:rand_uniform(1, Sz + 1), Ret).
  356. pick(Pool, Sz, hash, Val, Ret) ->
  357. pick_near(Pool, erlang:phash2(Val, Sz) + 1, Ret);
  358. pick(Pool, Sz, direct, N, Ret) when is_integer(N), N > 0 ->
  359. pick_near(Pool, case (N rem Sz-1) + 1 of 0 -> Sz; N1 -> N1 end, Ret).
  360. pick_near(Pool, N, Ret) ->
  361. case ets:next(gproc, {{n,l,[?MODULE,Pool,N]}, n}) of
  362. {{n,l,[?MODULE,Pool,_,_]} = Pick, _} ->
  363. ret(Pick, Ret);
  364. _ ->
  365. %% wrap
  366. case ets:next(gproc, {{n,l,[?MODULE,Pool,1]}, n}) of
  367. {{n,l,[?MODULE,Pool,_,_]} = Pick, _} ->
  368. ret(Pick, Ret);
  369. _ ->
  370. false
  371. end
  372. end.
  373. ret(Name, name) ->
  374. Name;
  375. ret(Name, pid) ->
  376. case ets:lookup(gproc, {Name,n}) of
  377. [{_, Pid, _}] ->
  378. Pid;
  379. [] ->
  380. %% possible race
  381. false
  382. end.
  383. %% @equiv claim(Pool, F, nowait)
  384. claim(Pool, F) when is_function(F, 2) ->
  385. claim(Pool, F, nowait).
  386. %% @spec claim(Pool, Fun, Wait) -> {true, Res} | false
  387. %% Pool = any()
  388. %% Fun = function()
  389. %% Wait = nowait | {busy_wait, integer()}
  390. %%
  391. %% @doc Picks the first available worker in the pool and applies `Fun'.
  392. %%
  393. %% A `claim' pool allows the caller to "claim" a worker during a short span
  394. %% (essentially, a lock is set and released as soon as `Fun' returns).
  395. %% Once a worker is selected, `Fun(Name, Pid)' is called, where `Name' is a
  396. %% unique gproc name of the worker, and `Pid' is its process identifier.
  397. %% The gproc name of the worker serves as a mutex, where its value is 0 (zero)
  398. %% if the worker is free, and 1 (one) if it is busy. The mutex operation is
  399. %% implemented using `gproc:update_counter/2'.
  400. %%
  401. %% `Wait == nowait' means that the call will return `false' immediately if
  402. %% there is no available worker.
  403. %%
  404. %% `Wait == {busy_wait, Timeout}' will keep repeating the claim attempt
  405. %% for `Timeout' milliseconds. If still no worker is available, it will
  406. %% return `false'.
  407. %% @end
  408. claim(Pool, F, Wait) ->
  409. case gproc:get_value(?POOL(Pool), shared) of
  410. {0, _} -> false;
  411. {_, claim} ->
  412. W = setup_wait(Wait, Pool),
  413. claim_w(Pool, F, W);
  414. _ ->
  415. error(badarg)
  416. end.
  417. claim_w(_Pool, _F, timeout) ->
  418. false;
  419. claim_w(Pool, F, W) ->
  420. case claim_(Pool, F) of
  421. false ->
  422. claim_w(Pool, F, do_wait(W));
  423. Other ->
  424. clear_wait(W),
  425. Other
  426. end.
  427. %% Define how many workers to select in each chunk. We want to strike
  428. %% a good compromise between the cost of succeeding on the first try
  429. %% (likely a common event) and the cost of retrying. In my measurements,
  430. %% if a chunk size of 1 costs ca 30 us (on my Macbook), a chunk size of 5
  431. %% adds only ca 20% to the cost, i.e. a few us.
  432. -define(CLAIM_CHUNK, 5).
  433. claim_(Pool, F) ->
  434. %% Sorry, but we use ets:select/3 here in order to shave off a few us.
  435. case ets:select(gproc, [{ {{{n,l,[?MODULE,Pool,'_','_']},n}, '$1', 0}, [],
  436. [{{ {element,1,{element,1,'$_'}}, '$1' }}]}],
  437. ?CLAIM_CHUNK) of
  438. {[_|_] = Workers, Cont} ->
  439. case try_claim(Workers, F) of
  440. {true, _} = True ->
  441. True;
  442. false ->
  443. claim_cont(Cont, F)
  444. end;
  445. _ ->
  446. false
  447. end.
  448. claim_cont('$end_of_table', _) ->
  449. false;
  450. claim_cont(Cont, F) ->
  451. case ets:select(Cont) of
  452. {[_|_] = Workers, Cont1} ->
  453. case try_claim(Workers, F) of
  454. {true, _} = True ->
  455. True;
  456. false ->
  457. claim_cont(Cont1, F)
  458. end;
  459. _ ->
  460. false
  461. end.
  462. try_claim([], _) ->
  463. false;
  464. try_claim([{K,Pid}|T], F) ->
  465. case try_claim(K, Pid, F) of
  466. false ->
  467. try_claim(T, F);
  468. Other ->
  469. Other
  470. end.
  471. try_claim(K, Pid, F) ->
  472. case gproc:update_counter(K, [0, {1, 1, 1}]) of
  473. [0, 1] ->
  474. %% have lock
  475. try Res = F(K, Pid),
  476. {true, Res}
  477. after
  478. gproc:reset_counter(K)
  479. end;
  480. [1, 1] ->
  481. %% no
  482. false
  483. end.
  484. setup_wait(nowait, _) ->
  485. nowait;
  486. setup_wait({busy_wait, MS}, Pool) ->
  487. Ref = erlang:send_after(MS, self(), {claim, Pool}),
  488. {busy_wait, Ref}.
  489. do_wait(nowait) ->
  490. timeout;
  491. do_wait({busy_wait, Ref} = W) ->
  492. %% Yielding here serves two purposes:
  493. %% 1) Increase the chance that whoever's before us can finish
  494. %% 2) The value of read_timer/1 only refreshes after yield (so I've heard)
  495. erlang:yield(),
  496. case erlang:read_timer(Ref) of
  497. false ->
  498. erlang:cancel_timer(Ref),
  499. timeout;
  500. _ ->
  501. W
  502. end.
  503. clear_wait(nowait) ->
  504. ok;
  505. clear_wait({busy_wait, Ref}) ->
  506. erlang:cancel_timer(Ref),
  507. ok.
  508. %% @spec log(GprocKey) -> integer()
  509. %% @doc Update a counter associated with a worker name.
  510. %%
  511. %% Each added worker has a gproc counter that can be used e.g. to keep track of
  512. %% the number of times the worker has been picked. Since it's associated with the
  513. %% named 'slot', and not to the connected worker, its value will persist even
  514. %% if the currently connected worker dies.
  515. %% @end
  516. log({n,l,[?MODULE,Pool,_,Name]}) ->
  517. gproc:update_shared_counter(?POOL_WRK(Pool,Name), 1).
  518. %% @spec randomize(Pool::any()) -> integer()
  519. %% @doc Randomizes the "next" pointer for the pool.
  520. %%
  521. %% This function only has an effect for `round_robin' pools, which have a
  522. %% reference to the next worker to be picked. Without randomizing, the load
  523. %% balancing will always start with the first worker in the pool.
  524. %% @end
  525. randomize(Pool) ->
  526. case pool_size(Pool) of
  527. 0 -> 0;
  528. 1 -> 1;
  529. Sz ->
  530. incr(Pool, crypto:rand_uniform(0, Sz), Sz)
  531. end.
  532. %% @spec pool_size(Pool::any()) -> integer()
  533. %% @doc Return the size of the pool.
  534. %%
  535. pool_size(Pool) ->
  536. {Sz, _} = gproc:get_value(?POOL(Pool), shared),
  537. Sz.
  538. %% ===================================================================
  539. %% Start, stop, call gen_server
  540. %% @private
  541. start_link() ->
  542. gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  543. %% @private
  544. init([]) ->
  545. {ok, #st{}}.
  546. %% @private
  547. call(Req) ->
  548. case gen_server:call(?MODULE, Req) of
  549. badarg ->
  550. error(badarg);
  551. {badarg, Reason} ->
  552. error(Reason);
  553. Reply ->
  554. Reply
  555. end.
  556. %% ===================================================================
  557. %% Gen_server callbacks
  558. %% @private
  559. handle_call(Req, From, S) ->
  560. try handle_call_(Req, From, S)
  561. catch
  562. error:Reason ->
  563. {reply, {badarg, Reason}, S}
  564. end.
  565. handle_call_({new, Pool, Type, Opts}, _, S) ->
  566. new_(Pool, Type, Opts),
  567. {reply, ok, S};
  568. handle_call_({delete, Pool}, _, S) ->
  569. delete_(Pool),
  570. {reply, ok, S};
  571. handle_call_({force_delete, Pool}, _, S) ->
  572. force_delete_(Pool),
  573. {reply, ok, S};
  574. handle_call_({add_worker, Pool, Name}, _, S) ->
  575. N = add_worker_(Pool, Name),
  576. {reply, N, S};
  577. handle_call_({add_worker, Pool, Name, Pos}, _, S) ->
  578. N = add_worker_(Pool, Name, Pos),
  579. {reply, N, S};
  580. handle_call_({set_pool_size, Pool, Sz}, _, S) ->
  581. Workers = get_workers_(Pool),
  582. case get_last_worker_n(Workers) of
  583. N when N > Sz ->
  584. {reply, badarg, S};
  585. _ ->
  586. set_pool_size_(?POOL(Pool), Sz, Workers),
  587. {reply, true, S}
  588. end;
  589. handle_call_({remove_worker, Pool, Name}, _, S) ->
  590. ok = remove_worker_(Pool, Name),
  591. {reply, true, S}.
  592. %% @private
  593. handle_cast(_, S) ->
  594. {noreply, S}.
  595. %% @private
  596. handle_info(_, S) ->
  597. {noreply, S}.
  598. %% @private
  599. terminate(_, _) ->
  600. ok.
  601. %% @private
  602. code_change(_, S, _) ->
  603. {ok, S}.
  604. %% ===================================================================
  605. %% Internal functions
  606. new_(Pool, Type, Opts) ->
  607. valid_type(Type),
  608. Size = proplists:get_value(size, Opts, 0),
  609. Workers = lists:seq(1, Size),
  610. K = ?POOL(Pool),
  611. try gproc:reg_shared(K, {Size, Type})
  612. catch
  613. error:_ -> error(exists)
  614. end,
  615. Opts1 =
  616. case lists:keyfind(auto_size, 1, Opts) of
  617. false ->
  618. Opts ++ [{auto_size, not lists:keymember(size, 1, Opts)}];
  619. {_, Bool} when is_boolean(Bool) ->
  620. Opts
  621. end,
  622. gproc:set_attributes_shared(K, Opts1),
  623. set_workers(K, Workers),
  624. gproc:reg_shared(?POOL_CUR(Pool), Size).
  625. valid_type(T) when T==round_robin; T==hash; T==random; T==direct; T==claim ->
  626. true;
  627. valid_type(_) ->
  628. error(invalid_type).
  629. set_pool_size_(K, Sz, Workers) ->
  630. {_, Type} = gproc:get_value(K, shared),
  631. case length(Workers) of
  632. Sz ->
  633. set_workers(K, Workers);
  634. Len when Len > Sz ->
  635. Workers1 = lists:sublist(Workers, 1, Sz),
  636. set_workers(K, Workers1);
  637. Len when Len < Sz ->
  638. Workers1 = Workers ++ lists:seq(Len+1, Sz),
  639. set_workers(K, Workers1)
  640. end,
  641. gproc:set_value_shared(K, {Sz, Type}).
  642. delete_(Pool) ->
  643. K = ?POOL(Pool),
  644. Ws = get_workers_(K),
  645. case [1 || {_,_} <- Ws] of
  646. [] ->
  647. gproc:unreg_shared(K),
  648. gproc:unreg_shared(?POOL_CUR(Pool));
  649. [_|_] ->
  650. error(not_empty)
  651. end.
  652. force_delete_(Pool) ->
  653. Props = gproc:select({l,p}, [{ {?POOL(Pool), '_', '_'}, [], ['$_']}]),
  654. Cur = gproc:select({l,c}, [{ {?POOL_CUR(Pool), '_', '_'}, [], ['$_']}]),
  655. Workers = gproc:select(
  656. {l,c}, [{ {?POOL_WRK(Pool,'_'), '_', '_'}, [], ['$_']}]),
  657. Names = find_names(Pool, '_'),
  658. lists:foreach(
  659. fun({Key, Pid, _}) when Pid == self() -> gproc:unreg(Key);
  660. ({_, Pid, _}) when is_pid(Pid) -> exit(Pid, kill)
  661. end, Names),
  662. [gproc:unreg_shared(W) || {W,shared,_} <- Cur ++ Props ++ Workers],
  663. true.
  664. find_names(Pool, Pid) ->
  665. gproc:select(
  666. {l,n}, [{ {{n,l,[?MODULE,Pool,Pid,'_']}, '_', '_'}, [], ['$_']}]).
  667. add_worker_(Pool, Name) ->
  668. K = ?POOL(Pool),
  669. {Sz, Type} = gproc:get_value(K, shared),
  670. AutoSz = gproc:get_attribute(K, shared, auto_size),
  671. Ws0 = get_workers_(K),
  672. {N,Ws1} =
  673. case lists:keymember(Name, 1, Ws0) of
  674. false ->
  675. case find_slot(Name, K, Ws0, Sz, Type, AutoSz) of
  676. {_, _} = Res ->
  677. Res;
  678. false ->
  679. error(pool_full)
  680. end;
  681. true ->
  682. error(exists)
  683. end,
  684. if N > Sz ->
  685. set_pool_size_(K, N, Ws1); % also calls set_workers/2
  686. true ->
  687. %% size not changed
  688. set_workers(K, Ws1)
  689. end,
  690. reg_worker(Pool, Name, N),
  691. N.
  692. add_worker_(Pool, Name, Pos) ->
  693. K = ?POOL(Pool),
  694. {Sz, _} = gproc:get_value(K, shared),
  695. Ws0 = get_workers_(K),
  696. if Pos > Sz ->
  697. case gproc:get_attribute(K, shared, auto_size) of
  698. true ->
  699. Ws1 = Ws0 ++ lists:seq(Sz+1,Pos-1) ++ [{Name, Pos}],
  700. set_pool_size_(K, Pos, Ws1);
  701. false ->
  702. error(out_of_range)
  703. end;
  704. true ->
  705. case lists:nth(Pos, Ws0) of
  706. {_,_} -> error(exists);
  707. P when is_integer(P) ->
  708. Ws1 = set_pos(Pos, Ws0, {Name, Pos}),
  709. set_workers(K, Ws1)
  710. end
  711. end,
  712. reg_worker(Pool, Name, Pos),
  713. Pos.
  714. reg_worker(Pool, Name, Pos) ->
  715. gproc:reg_shared(Wrk = ?POOL_WRK(Pool, Name), 0),
  716. gproc:set_attributes_shared(Wrk, [{n, Pos}]).
  717. remove_worker_(Pool, Name) ->
  718. case whereis_worker(Pool, Name) of
  719. Pid when is_pid(Pid) ->
  720. error({worker_connected, Pid});
  721. undefined ->
  722. do_remove_worker_(Pool, Name)
  723. end.
  724. do_remove_worker_(Pool, Name) ->
  725. K = ?POOL(Pool),
  726. Ws0 = get_workers_(K),
  727. Ws1 = del_slot(Name, Ws0),
  728. gproc:unreg_shared(?POOL_WRK(Pool, Name)),
  729. case (NewLen = length(Ws1)) - length(Ws0) of
  730. 0 -> ok;
  731. Diff when Diff < 0 ->
  732. {_, Type} = gproc:get_value(K, shared),
  733. gproc:set_value_shared(K, {NewLen, Type})
  734. end,
  735. gproc:set_attributes_shared(K, [{workers, Ws1}]),
  736. ok.
  737. del_slot(Name, [{Name,_}]) ->
  738. [];
  739. del_slot(Name, [{Name, Pos}|T]) ->
  740. [Pos|T];
  741. del_slot(Name, [H|T]) ->
  742. [H|del_slot(Name, T)].
  743. find_slot(Name, _, [], Sz, _, Auto) ->
  744. case {Sz, Auto} of
  745. {0, false} -> false;
  746. {_, _} ->
  747. {1, [{Name, 1}]}
  748. end;
  749. find_slot(Name, Key, Workers, Sz, Type, AutoSz) ->
  750. case get_strategy(Key, Type) of
  751. packed ->
  752. find_slot_packed(Name, Workers, AutoSz);
  753. sparse ->
  754. find_slot_sparse(Name, Workers, Sz, AutoSz)
  755. end.
  756. %% find_slot(Name, Key, Workers, Sz, Type, AutoSz, Strategy).
  757. %% find_slot(Name, []) ->
  758. %% {1, [{Name, 1}]};
  759. %% find_slot(Name, Slots) ->
  760. %% find_slot(Name, Slots, []).
  761. get_last_worker_n(Ws) ->
  762. get_last_worker_n(Ws, 0, 1).
  763. get_last_worker_n([{_,_}|T], _, P) ->
  764. get_last_worker_n(T, P, P+1);
  765. get_last_worker_n([H|T], Last, P) when is_integer(H) ->
  766. get_last_worker_n(T, Last, P+1);
  767. get_last_worker_n([], Last, _) ->
  768. Last.
  769. find_slot_packed(Name, Workers, AutoSz) ->
  770. find_slot_packed(Name, Workers, AutoSz, []).
  771. find_slot_packed(Name, [N|T], _, Acc) when is_integer(N) -> % empty slot
  772. {N, lists:reverse(Acc) ++ [{Name, N}|T]};
  773. find_slot_packed(Name, [{_,Prev} = Last], true, Acc) -> % last elem; expand pool
  774. New = Prev+1,
  775. {New, lists:reverse([{Name, New}, Last|Acc])};
  776. find_slot_packed(_, [_], false, _) ->
  777. false;
  778. find_slot_packed(Name, [{_,_} = H|T], Auto, Acc) ->
  779. find_slot_packed(Name, T, Auto, [H|Acc]).
  780. find_slot_sparse(Name, Ws, Sz, Auto) ->
  781. %% Collect the position of the first and last filled slots, as well as
  782. %% the largest gap between filled slots
  783. case lists:foldl(
  784. fun(N, {Prev, StartP, First, Last, Max, MaxP}) when is_integer(N) ->
  785. case Prev+1 of
  786. Gap when Gap > Max ->
  787. {Gap, StartP, First, Last, Gap, StartP};
  788. Gap ->
  789. {Gap, StartP, First, Last, Max, MaxP}
  790. end;
  791. (N, []) when is_integer(N) ->
  792. %% skip
  793. [];
  794. ({_, Pos}, []) ->
  795. {0, Pos, _First = Pos, _Last = Pos, 0, 0};
  796. ({_, Pos}, {Prev, StartP, First, _PrevLast, Max, MaxP}) ->
  797. if Prev > Max ->
  798. {0, Pos, First, Pos, Prev, StartP};
  799. true ->
  800. {0, Pos, First, Pos, Max, MaxP}
  801. end
  802. end, [], Ws) of
  803. [] ->
  804. %% all empty slots
  805. case {Sz, Auto} of
  806. {0, false} ->
  807. false;
  808. {0, true} ->
  809. {1, [{Name, 1}]};
  810. {_, _} when is_integer(Sz), Sz > 0 ->
  811. {1, [{Name, 1}|tl(Ws)]}
  812. end;
  813. {_, _, 1, Last, 0, _} ->
  814. %% Pool full
  815. if Auto ->
  816. NewPos = Last + 1,
  817. {NewPos, Ws ++ [{Name, NewPos}]};
  818. true ->
  819. false
  820. end;
  821. {_, _, First, Last, MaxGap, StartPos} ->
  822. WrapGap = (Sz - Last) + First - 1,
  823. NewPos = if WrapGap >= MaxGap ->
  824. (Last + (WrapGap div 2) + 1) rem (Sz+1);
  825. true ->
  826. (StartPos + (MaxGap div 2) + 1) rem (Sz+1)
  827. end,
  828. {NewPos, set_pos(NewPos, Ws, {Name, NewPos})}
  829. end.
  830. set_pos(P, L, X) when P > 0, is_list(L) ->
  831. set_pos(P, 1, L, X).
  832. set_pos(P, P, [_|T], X) ->
  833. [X|T];
  834. set_pos(P, C, [H|T], X) when C < P ->
  835. [H|set_pos(P, C+1, T, X)].
  836. get_workers_(K) ->
  837. case gproc:get_attribute(K, shared, workers) of
  838. undefined ->
  839. [];
  840. L when is_list(L) ->
  841. L
  842. end.
  843. set_workers(K, L) when is_list(L) ->
  844. gproc:set_attributes_shared(K, [{workers, L}]).
  845. get_strategy(Key, Type) ->
  846. Default = case Type of
  847. round_robin -> packed;
  848. random -> sparse;
  849. hash -> sparse;
  850. direct -> packed;
  851. claim -> packed
  852. end,
  853. attribute(Key, fill_strategy, Default).
  854. attribute(Key, A, Default) ->
  855. case gproc:get_attribute(Key, shared, A) of
  856. undefined -> Default;
  857. Value -> Value
  858. end.
  859. incr(Pool, Incr, Sz) ->
  860. gproc:update_counter(?POOL_CUR(Pool), shared, {Incr, Sz, 1}).
  861. %% find_worker(Pool, Name) ->
  862. %% case gproc:select(n, [{ {{n, l, {?MODULE, Pool, '_'}}, '_', Name},
  863. %% [], ['$_'] }]) of
  864. %% [] ->
  865. %% undefined;
  866. %% [{{n,l,{?MODULE,_,N}}, Pid, _}] ->
  867. %% {N, Pid}
  868. %% end.
  869. %% ============================= Test code ===========================
  870. %% @private
  871. test(N) when N > 0 ->
  872. test(N, round_robin, []).
  873. %% @private
  874. test(N, Type, Opts) when Type==round_robin;
  875. Type==random;
  876. Type==hash;
  877. Type==direct;
  878. Type==claim ->
  879. P = ?LINE,
  880. setup_test_pool(P, Type, Opts),
  881. try timer:tc(?MODULE, f(Type), [N, P])
  882. after
  883. remove_test_pool(P)
  884. end.
  885. ptest(N, I, Type, Opts) ->
  886. P = ?LINE,
  887. setup_test_pool(P, Type, Opts),
  888. F = f(Type),
  889. Pids =
  890. [spawn_monitor(fun() -> exit({ok, timer:tc(?MODULE, F, [I, P])}) end)
  891. || _ <- lists:seq(1, N)],
  892. try collect(Pids)
  893. after
  894. remove_test_pool(P)
  895. end.
  896. collect(Pids) ->
  897. Results = [receive
  898. {'DOWN', Ref, _, _, Reason} ->
  899. Reason
  900. end || {_, Ref} <- Pids],
  901. {Times, Avgs} = lists:foldr(fun({ok, {T, Avg}}, {A,B}) ->
  902. {[T|A], [Avg|B]} end,
  903. {[],[]}, Results),
  904. {Times, lists:sum(Times)/length(Times),
  905. lists:sum(Avgs)/length(Avgs)}.
  906. f(Type) when Type==hash; Type==direct ->
  907. test_run1;
  908. f(Type) when Type==claim ->
  909. test_run2;
  910. f({empty,_}) ->
  911. test_run0;
  912. f(_) ->
  913. test_run.
  914. %% @private
  915. setup_test_pool(P, Type, Opts) ->
  916. setup_test_pool(P, Type, Opts, test_workers()).
  917. setup_test_pool(P, Type0, Opts, Workers) ->
  918. Type = case Type0 of {_, T} -> T; T when is_atom(T) -> T end,
  919. new(P, Type, Opts),
  920. [begin R = add_worker(P, W),
  921. io:fwrite("add_worker(~p, ~p) -> ~p; Ws = ~p~n",
  922. [P, W, R, get_workers_(?POOL(P))]),
  923. connect_worker(P, W)
  924. end || W <- Workers].
  925. %% @private
  926. remove_test_pool(P) ->
  927. io:fwrite("worker stats (~p):~n"
  928. "~p~n", [P, gproc:select(
  929. {l,c},
  930. [{ {{c,l,{?MODULE,P,w,'$1'}},'_','$2'}, [],
  931. [{{'$1','$2'}}] }])]),
  932. [begin disconnect_worker(P, W),
  933. remove_worker(P, W)
  934. end || W <- test_workers()],
  935. delete(P).
  936. test_workers() -> [a,b,c,d,e,f].
  937. %% @private
  938. test_run(N, P) ->
  939. test_run(N, P, 0, 0).
  940. test_run(N, P, S, M) when N > 0 ->
  941. {T, Worker} = timer:tc(?MODULE, pick, [P]),
  942. true = (Worker =/= false),
  943. log(Worker),
  944. timer:sleep(crypto:rand_uniform(1,50)),
  945. test_run(N-1, P, S+T, M+1);
  946. test_run(_, _, S, M) ->
  947. S/M.
  948. %% @private
  949. test_run1(N, P) ->
  950. test_run1(N, P, 0, 0).
  951. test_run1(N, P, S, M) when N > 0 ->
  952. {T, Worker} = timer:tc(?MODULE, pick, [P, N]),
  953. true = (Worker =/= false),
  954. log(Worker),
  955. timer:sleep(crypto:rand_uniform(1,50)),
  956. test_run1(N-1, P, S+T, M+1);
  957. test_run1(_, _, S, M) ->
  958. S/M.
  959. %% @private
  960. test_run2(N, P) ->
  961. test_run2(N, P, fun(K,_) ->
  962. R = log(K),
  963. timer:sleep(crypto:rand_uniform(1,50)),
  964. R
  965. end, 0, 0).
  966. test_run2(N, P, F, S, M) when N > 0 ->
  967. {T, {true, _}} = timer:tc(?MODULE, claim, [P, F, {busy_wait, 5000}]),
  968. test_run2(N-1, P, F, S+T, M+1);
  969. test_run2(_, _, _, S, M) ->
  970. S/M.
  971. test_run0(N, X) when N > 0 ->
  972. test_run0(N-1, X);
  973. test_run0(_, _) ->
  974. ok.