gproc_pool.erl 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114
  1. %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
  2. %% --------------------------------------------------
  3. %% This file is provided to you under the Apache License,
  4. %% Version 2.0 (the "License"); you may not use this file
  5. %% except in compliance with the License. You may obtain
  6. %% a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing,
  11. %% software distributed under the License is distributed on an
  12. %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  13. %% KIND, either express or implied. See the License for the
  14. %% specific language governing permissions and limitations
  15. %% under the License.
  16. %% --------------------------------------------------
  17. %%
  18. %% @author Ulf Wiger <ulf@wiger.net>
  19. %%
  20. %% @doc Load balancing functions based on Gproc.
  21. %%
  22. %% This module implements support for load-balancing server pools. It was
  23. %% originally intended mainly as an example of how to use various Gproc
  24. %% resources (e.g. counters and shared properties), but is fully integrated
  25. %% into Gproc, and fully functional.
  26. %%
  27. %% <h2>Concepts</h2>
  28. %%
  29. %% Each pool has a list of 'named' workers (defined using `add_worker/2') and
  30. %% a load-balancing strategy. Processes can then 'connect' to the pool (with
  31. %% `connect_worker/2'), using one of the defined names.
  32. %%
  33. %% Users then 'pick' one of the currently connected processes in the pool. Which
  34. %% process is picked depends on the load-balancing strategy.
  35. %%
  36. %% The whole representation of the pool and its connected workers is in gproc.
  37. %% The server `gproc_pool' is used to serialize pool management updates, but
  38. %% worker selection is performed entirely in the calling process, and can be
  39. %% performed by several processes concurrently.
  40. %%
  41. %% <h3>Load-balancing strategies</h3>
  42. %%
  43. %% * `round_robin' is the default. A wrapping gproc counter keeps track of the
  44. %% latest worker picked, and `gproc:next()' is used to find the next worker.
  45. %% * `random' picks a random worker from the pool.
  46. %% * `hash' requires a value (`pick/2'), and picks a worker based on the hash of
  47. %% that value.
  48. %% * `direct' takes an integer as an argument, and picks the next worker (modulo
  49. %% the size of the pool). This is mainly for implementations that implement
  50. %% a load-balancing strategy on top of `gproc_pool'.
  51. %% * `claim' picks the first available worker and 'claims' it while executing
  52. %% a user-provided fun. This means that the number of concurrently executing
  53. %% jobs will not exceed the size of the pool.
  54. %% @end
  55. -module(gproc_pool).
  56. -behavior(gen_server).
  57. %% gproc round-robin name lookup
  58. -export([new/1, % (Pool) -> (Pool, round_robin, [])
  59. new/3, % (Pool, Type, Opts)
  60. delete/1, % (Pool)
  61. force_delete/1, % (Pool)
  62. add_worker/2, % (Pool, Name) -> Pos
  63. add_worker/3, % (Pool, Name, Pos) -> Pos
  64. remove_worker/2, % (Pool, Name)
  65. connect_worker/2, % (Pool, Name)
  66. disconnect_worker/2, % (Pool, Name)
  67. whereis_worker/2, % (Pool, Name)
  68. worker_id/2, % (Pool, Name)
  69. active_workers/1, % (Pool)
  70. defined_workers/1, % (Pool)
  71. worker_pool/1, % (Pool)
  72. pick/1, % (Pool)
  73. pick/2, % (Pool, Value)
  74. pick_worker/1, % (Pool)
  75. pick_worker/2, % (Pool, Value)
  76. claim/2, % (Pool, Fun)
  77. claim/3, % (Pool, Fun, Wait)
  78. log/1, % (WorkerId)
  79. randomize/1]). % (Pool)
  80. -export([start_link/0]).
  81. -export([init/1,
  82. handle_call/3,
  83. handle_cast/2,
  84. handle_info/2,
  85. terminate/2,
  86. code_change/3]).
  87. -export([test/1, test/3, ptest/4, test_run/2, test_run1/2, test_run2/2,
  88. test_run0/2, setup_test_pool/3, setup_test_pool/4,
  89. remove_test_pool/1]).
  90. -define(POOL(Pool), {p,l,{?MODULE,Pool}}).
  91. -define(POOL_CUR(Pool), {c,l,{?MODULE,Pool,cur}}).
  92. -define(POOL_WRK(Pool,Name), {c,l,{?MODULE,Pool,w,Name}}).
  93. -record(st, {}).
  94. %% @spec new(Pool::any()) -> ok
  95. %%
  96. %% @equiv new(Pool, round_robin, [])
  new(Pool) ->
      %% Shorthand: a round_robin pool created with no options.
      Type = round_robin,
      Opts = [],
      new(Pool, Type, Opts).
  99. %% @spec new(Pool::any(), Type, Opts) -> ok
  100. %% Type = round_robin | random | hash | direct | claim
  101. %% Opts = [{size, integer()} | {auto_size, boolean()}]
  102. %%
  103. %% @doc Create a new pool.
  104. %%
  105. %% The pool starts out empty. If a size is not given, the pool size is set to
  106. %% 0 initially. `auto_size' is `true' by default if size is not specified, but
  107. %% `false' by default otherwise. If `auto_size == true', the pool will be
  108. %% enlarged to accommodate new workers, when necessary. Otherwise, trying to add
  109. %% a worker when the pool is full will raise an exception, as will trying to add
  110. %% a worker on a specific position beyond the current size of the pool.
  111. %%
  112. %% If the given pool already exists, this function will raise an exception.
  113. %% @end
  new(Pool, Type, Opts)
    when Type =:= round_robin orelse
         Type =:= random orelse
         Type =:= hash orelse
         Type =:= direct orelse
         Type =:= claim ->
      %% Any other Type fails with function_clause in the caller.
      %% The request itself is handled by the gproc_pool server.
      Request = {new, Pool, Type, Opts},
      call(Request).
  120. %% @spec delete(Pool::any()) -> ok
  121. %% @doc Delete an existing pool.
  122. %%
  123. %% This function will delete a pool, only if there are no connected workers.
  124. %% Ensure that workers have been disconnected before deleting the pool.
  125. %% @end
  126. %%
  delete(Pool) ->
      %% Handled by the gproc_pool server; errors are re-raised by call/1.
      Request = {delete, Pool},
      call(Request).
  129. %% @spec force_delete(Pool::any()) -> true
  130. %% @doc Forcibly remove a pool, terminating all active workers
  131. %%
  132. %% This function is primarily intended for cleanup of any pools that might have
  133. %% become inconsistent (for whatever reason). It will clear out all resources
  134. %% belonging to the pool and send `exit(Pid, kill)' signals to all connected
  135. %% workers (except the calling process).
  136. %% @end
  137. %%
  force_delete(Pool) ->
      %% Intentionally runs in the calling process instead of going through
      %% the gproc_pool server: the server keeps no state of its own about
      %% the pool, and this cleanup does not try to protect against races.
      %% Every connected worker other than self() is killed.
      Result = force_delete_(Pool),
      Result.
  145. %% @spec add_worker(Pool::any(), Name::any()) -> integer()
  146. %%
  147. %% @doc Assign a worker name to the pool, returning the worker's position.
  148. %%
  149. %% Before a worker can connect to the pool, its name must be added. If no explicit
  150. %% position is given (see {@link add_worker/3}), the most suitable position,
  151. %% depending on load-balancing algorithm, is selected: for round_robin and direct
  152. %% pools, names are packed tightly from the beginning; for hash and random pools,
  153. %% slots are filled as sparsely as possible, in order to maintain an even
  154. %% likelihood of hitting each worker.
  155. %%
  156. %% An exception is raised if the pool is full (and `auto_size' is false), or if
  157. %% `Name' already exists in the pool.
  158. %%
  159. %% Before a worker can be used, a process must connect to it (see
  160. %% {@link connect_worker/2}).
  161. %% @end
  add_worker(Pool, Name) ->
      %% The server picks the slot and replies with its position.
      Request = {add_worker, Pool, Name},
      call(Request).
  164. %% @spec add_worker(Pool::any(), Name::any(), Slot::integer()) -> integer()
  165. %%
  166. %% @doc Assign a worker name to a given slot in the pool, returning the slot.
  167. %%
  168. %% This function allows the pool maintainer to exactly position each worker
  169. %% inside the pool. An exception is raised if the position is already taken,
  170. %% or if `Name' already exists in the pool. If `Slot' is larger than the current
  171. %% size of the pool, an exception is raised iff `auto_size' is `false';
  172. %% otherwise the pool is expanded to accommodate the new position.
  173. %% @end
  add_worker(Pool, Name, Slot) ->
      %% Explicit placement: the server replies with the slot on success.
      Request = {add_worker, Pool, Name, Slot},
      call(Request).
  176. %% @spec connect_worker(Pool::any(), Name::any()) -> true
  177. %% @doc Connect the current process to `Name' in `Pool'.
  178. %%
  179. %% Typically, a server will call this function as it starts, similarly to when
  180. %% it registers itself. In fact, calling `connect_worker/2' leads to the process
  181. %% being registered as `{n,l,[gproc_pool,N,Name]}', where `N' is the position of
  182. %% `Name' in the pool. This means (a) that gproc monitors the worker, and
  183. %% removes the connection automatically if it dies, and (b) that the registered
  184. %% names can be listed in order of their positions in the pool.
  185. %%
  186. %% This function raises an exception if `Name' does not exist in `Pool' (or
  187. %% there is no such pool), or if another worker is already connected to
  188. %% `Name'.
  189. %% @end
  190. %%
  connect_worker(Pool, Name) ->
      %% Register the calling process under the worker's gproc name,
      %% with an initial value of 0.
      WorkerKey = worker_id(Pool, Name),
      gproc:reg(WorkerKey, 0).
  193. %% @spec disconnect_worker(Pool, Name) -> true
  194. %%
  195. %% @doc Disconnect the current process from `Name' in `Pool'.
  196. %%
  197. %% This function is similar to a `gproc:unreg()' call. It removes the
  198. %% connection between `Pool', `Name' and pid, and makes it possible for another
  199. %% process to connect to `Name'.
  200. %%
  201. %% An exception is raised if there is no prior connection between `Pool',
  202. %% `Name' and the current process.
  203. %% @end
  204. %%
  disconnect_worker(Pool, Name) ->
      %% Drop the calling process' registration for this worker name.
      WorkerKey = worker_id(Pool, Name),
      gproc:unreg(WorkerKey).
  207. %% @spec remove_worker(Pool::any(), Name::any()) -> true
  208. %% @doc Remove a previously added worker.
  209. %%
  210. %% This function will assume that any connected worker is disconnected first.
  211. %% It will fail if there is no such pool, but will return `true' in the case
  212. %% when `Name' did not exist in the pool in the first place.
  213. %% @end
  remove_worker(Pool, Name) ->
      %% Handled by the gproc_pool server, which replies `true' on success.
      Request = {remove_worker, Pool, Name},
      call(Request).
  216. %% @spec whereis_worker(Pool::any(), Name::any()) -> pid() | undefined
  217. %% @doc Look up the pid of a connected worker.
  218. %%
  219. %% This function works similarly to `gproc:where/1': it will return the pid
  220. %% of the worker connected as `Pool / Name', if there is such a worker; otherwise
  221. %% it will return `undefined'. It will raise an exception if `Name' has not been
  222. %% added to the pool.
  223. %% @end
  whereis_worker(Pool, Name) ->
      %% Resolve the worker's gproc name, then look up who holds it.
      gproc:where(worker_id(Pool, Name)).
  227. %% @spec worker_id(Pool, Name) -> GprocName
  228. %% @doc Return the unique gproc name corresponding to a name in the pool.
  229. %%
  230. %% This function assumes that `Name' has been added to `Pool'. It returns the
  231. %% unique name that a connected worker will be registered as. This doesn't mean
  232. %% that there is, in fact, such a connected worker.
  233. %% @end
  worker_id(Pool, Name) ->
      %% The slot position is kept as attribute `n' on the worker's shared
      %% counter (written by reg_worker/3).
      Counter = ?POOL_WRK(Pool, Name),
      Pos = gproc:get_attribute(Counter, shared, n),
      {n, l, [?MODULE, Pool, Pos, Name]}.
  237. %% @spec active_workers(Pool::any()) -> [{Name, Pid}]
  238. %% @doc Return a list of currently connected workers in the pool.
  239. %%
  active_workers(Pool) ->
      %% Select every local name shaped like a pool worker whose position
      %% ('$1') is an integer; return the worker name ('$2') together with
      %% the second field of the gproc entry ('$3').
      MatchHead = {{n,l,[?MODULE,Pool,'$1','$2']},'$3','_'},
      Guards = [{is_integer, '$1'}],
      Result = [{{'$2', '$3'}}],
      gproc:select({l,n}, [{MatchHead, Guards, Result}]).
  245. %% @spec defined_workers(Pool::any()) -> [{Name, Pos, Count}]
  246. %% @doc Return a list of added workers in the pool.
  247. %%
  248. %% The added workers are slots in the pool that have been given names, and thus
  249. %% can be connected to. This function doesn't detect whether or not there are
  250. %% any connected (active) workers.
  251. %%
  252. %% The list contains `{Name, Pos, Count}', where `Name' is the name of the added
  253. %% worker, `Pos' is its position in the pool, and `Count' represents the number
  254. %% of times the worker has been picked (assuming callers keep count by explicitly
  255. %% calling {@link log/1}).
  256. %% @end
  defined_workers(Pool) ->
      %% Only `{Name, Pos}' entries are named workers; anything else in the
      %% list is skipped. The third element is the current value of the
      %% worker's shared counter.
      Slots = get_workers_(?POOL(Pool)),
      lists:filtermap(
        fun({Name, Pos}) ->
                Count = gproc:get_value(?POOL_WRK(Pool, Name), shared),
                {true, {Name, Pos, Count}};
           (_) ->
                false
        end, Slots).
  261. %% @spec worker_pool(Pool::any()) -> [integer() | {Name, Pos}]
  262. %% @doc Return a list of slots and/or named workers in the pool.
  263. %%
  264. %% This function is mainly for testing, but can also be useful when implementing
  265. %% your own worker placement algorithm on top of gproc_pool.
  266. %%
  267. %% A plain integer represents an unfilled slot, and `{Name, Pos}' represents an
  268. %% added worker. The pool is always filled to the current size.
  269. %% @end
  worker_pool(Pool) ->
      %% The raw slot list stored for the pool, unfilled slots included.
      PoolKey = ?POOL(Pool),
      get_workers_(PoolKey).
  272. %% @spec pick(Pool::any()) -> GprocName | false
  273. %% @doc Pick a worker from the pool given the pool's load-balancing algorithm.
  274. %%
  275. %% The pool types that allow picking without an extra argument are
  276. %% round_robin and random. This function returns `false' if there is no available
  277. %% worker, or if `Pool' is not a valid pool.
  278. %% @end
  pick(Pool) ->
      %% The pool property holds {Size, Type}. An empty pool yields `false';
      %% only round_robin and random pools can be picked from without a value.
      PoolInfo = gproc:get_value(?POOL(Pool), shared),
      case PoolInfo of
          {0, _} ->
              false;
          {Size, round_robin} ->
              pick(Pool, Size, round_robin, name);
          {Size, random} ->
              pick(Pool, Size, random, name);
          _ ->
              error(badarg)
      end.
  287. %% @spec pick_worker(Pool::any()) -> pid() | false
  288. %% @doc Pick a worker pid from the pool given the pool's load-balancing algorithm.
  289. %%
  290. %% Like {@link pick/1}, but returns the worker pid instead of the name.
  291. %% @end
  pick_worker(Pool) ->
      %% Same selection as pick/1, but the result is resolved to a pid.
      PoolInfo = gproc:get_value(?POOL(Pool), shared),
      case PoolInfo of
          {0, _} ->
              false;
          {Size, round_robin} ->
              pick(Pool, Size, round_robin, pid);
          {Size, random} ->
              pick(Pool, Size, random, pid);
          _ ->
              error(badarg)
      end.
  300. %% @spec pick(Pool::any(), Value::any()) -> GprocName | false
  301. %% @doc Pick a worker from the pool based on `Value'.
  302. %%
  303. %% The pool types that allow picking based on an extra argument are
  304. %% hash and direct. This function returns `false' if there is no available
  305. %% worker, or if `Pool' is not a valid pool.
  306. %%
  307. %% If the pool is of type `direct', `Value' must be an integer corresponding to
  308. %% a position in the pool (modulo the size of the pool). If the type is
  309. %% `hash', `Value' may be any term, and its hash value will serve as a guide for
  310. %% selecting a worker.
  311. %% @end
  pick(Pool, N) ->
      %% Value-driven selection: only hash and direct pools accept a value.
      %% An empty pool yields `false'.
      PoolInfo = gproc:get_value(?POOL(Pool), shared),
      case PoolInfo of
          {0, _} ->
              false;
          {Size, hash} ->
              pick(Pool, Size, hash, N, name);
          {Size, direct} ->
              pick(Pool, Size, direct, N, name);
          _ ->
              error(badarg)
      end.
  320. %% @spec pick_worker(Pool::any(), Value::any()) -> pid() | false
  321. %% @doc Pick a worker pid from the pool given the pool's load-balancing algorithm.
  322. %%
  323. %% Like {@link pick/2}, but returns the worker pid instead of the name.
  324. %% @end
  pick_worker(Pool, N) ->
      %% Same selection as pick/2, but the result is resolved to a pid.
      PoolInfo = gproc:get_value(?POOL(Pool), shared),
      case PoolInfo of
          {0, _} ->
              false;
          {Size, hash} ->
              pick(Pool, Size, hash, N, pid);
          {Size, direct} ->
              pick(Pool, Size, direct, N, pid);
          _ ->
              error(badarg)
      end.
  %% pick(Pool, Sz, Type, Ret) -> GprocName | pid() | false
  %%
  %% Selection for the strategies that need no caller-supplied value.
  %% `Sz' is the (non-zero) pool size and `Ret' is `name' or `pid',
  %% deciding what ret/2 hands back.
  pick(Pool, Sz, round_robin, Ret) ->
      %% incr/3 is defined elsewhere in this module; it is used here to
      %% advance the pool's "current" pointer and to obtain its new value.
      Next = incr(Pool, 1, Sz),
      %% A key ending in [.., Next] sorts before [.., Next, Name], so
      %% ets:next/2 returns the worker at slot Next or the first one after it.
      case ets:next(gproc, {{n,l,[?MODULE,Pool,Next]},n}) of
          {{n,l,[?MODULE,Pool,Actual,_Name]} = Pick, _} ->
              case Actual - Next of
                  Diff when Diff > 0 ->
                      %% Slots were skipped: move the pointer up to the
                      %% slot that was actually picked.
                      incr(Pool, Diff, Sz),
                      ret(Pick, Ret);
                  _ ->
                      ret(Pick, Ret)
              end;
          _ ->
              %% Nothing at or after Next: wrap and search from the start.
              case ets:next(gproc, {{n,l,[?MODULE,Pool,0]}, n}) of
                  {{n,l,[?MODULE,Pool,Actual1,_Name1]} = Pick, _} ->
                      incr(Pool, Sz-Next+Actual1, Sz),
                      ret(Pick, Ret);
                  _ ->
                      false
              end
      end;
  pick(Pool, Sz, random, Ret) ->
      %% rand:uniform(Sz) yields 1..Sz, i.e. exactly the valid slots.
      %% Drawing from 1..Sz+1 would hit a slot that can never hold a worker,
      %% make pick_near/3 wrap, and so favour the first connected worker.
      pick_near(Pool, rand:uniform(Sz), Ret).
  pick(Pool, Sz, hash, Val, Ret) ->
      %% phash2/2 returns 0..Sz-1; slots are numbered from 1.
      Slot = erlang:phash2(Val, Sz) + 1,
      pick_near(Pool, Slot, Ret);
  pick(Pool, Sz, direct, N, Ret) when is_integer(N), N > 0 ->
      %% Map N onto 1..Sz: a remainder of zero means the last slot.
      Slot = case N rem Sz of
                 0 -> Sz;
                 Rem -> Rem
             end,
      pick_near(Pool, Slot, Ret).
  pick_near(Pool, N, Ret) ->
      %% Find the first connected worker at slot N or later; if none is
      %% found, wrap around and search again from slot 1.
      From = {{n,l,[?MODULE,Pool,N]}, n},
      case ets:next(gproc, From) of
          {{n,l,[?MODULE,Pool,_,_]} = Found, _} ->
              ret(Found, Ret);
          _ ->
              Start = {{n,l,[?MODULE,Pool,1]}, n},
              case ets:next(gproc, Start) of
                  {{n,l,[?MODULE,Pool,_,_]} = First, _} ->
                      ret(First, Ret);
                  _ ->
                      false
              end
      end.
  %% Shape the result of a pick: either the gproc name itself, or the pid
  %% currently found under that name in the gproc table.
  ret(Name, name) ->
      Name;
  ret(Name, pid) ->
      case ets:lookup(gproc, {Name,n}) of
          [] ->
              %% The entry may have vanished since it was picked.
              false;
          [{_, Pid, _}] ->
              Pid
      end.
  382. %% @equiv claim(Pool, F, nowait)
  claim(Pool, F) when is_function(F, 2) ->
      %% Default to a single attempt without waiting.
      Wait = nowait,
      claim(Pool, F, Wait).
  385. %% @spec claim(Pool, Fun, Wait) -> {true, Res} | false
  386. %% Pool = any()
  387. %% Fun = function()
  388. %% Wait = nowait | {busy_wait, integer()}
  389. %%
  390. %% @doc Picks the first available worker in the pool and applies `Fun'.
  391. %%
  392. %% A `claim' pool allows the caller to "claim" a worker during a short span
  393. %% (essentially, a lock is set and released as soon as `Fun' returns).
  394. %% Once a worker is selected, `Fun(Name, Pid)' is called, where `Name' is a
  395. %% unique gproc name of the worker, and `Pid' is its process identifier.
  396. %% The gproc name of the worker serves as a mutex, where its value is 0 (zero)
  397. %% if the worker is free, and 1 (one) if it is busy. The mutex operation is
  398. %% implemented using `gproc:update_counter/2'.
  399. %%
  400. %% `Wait == nowait' means that the call will return `false' immediately if
  401. %% there is no available worker.
  402. %%
  403. %% `Wait == {busy_wait, Timeout}' will keep repeating the claim attempt
  404. %% for `Timeout' milliseconds. If still no worker is available, it will
  405. %% return `false'.
  406. %% @end
  claim(Pool, F, Wait) ->
      %% Only pools of type `claim' may be claimed from. An empty pool
      %% yields `false' without setting up any wait state.
      PoolInfo = gproc:get_value(?POOL(Pool), shared),
      case PoolInfo of
          {0, _} ->
              false;
          {_, claim} ->
              claim_w(Pool, F, setup_wait(Wait, Pool));
          _ ->
              error(badarg)
      end.
  %% Retry loop around claim_/2. The third argument is the wait state from
  %% setup_wait/2 or do_wait/1; `timeout' means "give up".
  claim_w(_Pool, _F, timeout) ->
      false;
  claim_w(Pool, F, W) ->
      Attempt = claim_(Pool, F),
      case Attempt of
          false ->
              %% No free worker this round: wait (or time out), then retry.
              claim_w(Pool, F, do_wait(W));
          _ ->
              clear_wait(W),
              Attempt
      end.
  426. %% Define how many workers to select in each chunk. We want to strike
  427. %% a good compromise between the cost of succeeding on the first try
  428. %% (likely a common event) and the cost of retrying. In my measurements,
  429. %% if a chunk size of 1 costs ca 30 us (on my Macbook), a chunk size of 5
  430. %% adds only ca 20% to the cost, i.e. a few us.
  -define(CLAIM_CHUNK, 5).
  %% One pass over the pool's workers whose value is 0 (free), fetched
  %% ?CLAIM_CHUNK at a time. Returns {true, Res} once a claim succeeds,
  %% otherwise `false'.
  claim_(Pool, F) ->
      %% ets:select/3 is used directly on the gproc table for speed.
      %% Each hit is returned as {GprocName, Pid}.
      MatchHead = {{{n,l,[?MODULE,Pool,'_','_']},n}, '$1', 0},
      Result = [{{ {element,1,{element,1,'$_'}}, '$1' }}],
      case ets:select(gproc, [{MatchHead, [], Result}], ?CLAIM_CHUNK) of
          {[_|_] = Workers, Cont} ->
              case try_claim(Workers, F) of
                  false ->
                      claim_cont(Cont, F);
                  {true, _} = Claimed ->
                      Claimed
              end;
          _ ->
              false
      end.
  %% Continue a chunked select started by claim_/2, trying each further
  %% chunk of free workers until one is claimed or the table is exhausted.
  claim_cont('$end_of_table', _) ->
      false;
  claim_cont(Cont, F) ->
      case ets:select(Cont) of
          {[_|_] = Workers, NextCont} ->
              case try_claim(Workers, F) of
                  false ->
                      claim_cont(NextCont, F);
                  {true, _} = Claimed ->
                      Claimed
              end;
          _ ->
              false
      end.
  %% Walk a list of {GprocName, Pid} candidates and stop at the first one
  %% that can be claimed; `false' when none could.
  try_claim([{K,Pid}|Rest], F) ->
      case try_claim(K, Pid, F) of
          false ->
              try_claim(Rest, F);
          Claimed ->
              Claimed
      end;
  try_claim([], _) ->
      false.
  %% Try to take the lock on one worker. The update returns the value
  %% before and after: [0, 1] means the worker was free and is now ours;
  %% [1, 1] means somebody else already holds it.
  try_claim(K, Pid, F) ->
      case gproc:update_counter(K, [0, {1, 1, 1}]) of
          [1, 1] ->
              false;
          [0, 1] ->
              execute_claim(F, K, Pid)
      end.
  479. %% Wrapper to handle the case where the claimant gets killed by another
  480. %% process while executing within the critical section.
  481. %% This is likely a rare case, but if it happens, the claim would never
  482. %% get released.
  483. %% Solution:
  484. %% - spawn a monitoring process which resets the claim if the parent dies
  485. %% (spawn_link() might be more efficient, but we cannot enable trap_exit
  486. %% atomically, which introduces a race condition).
  487. %% - for all return types, kill the monitor and release the claim.
  488. %% - the one case where the monitor *isn't* killed is when Parent itself
  489. %% is killed before executing the `after' clause. In this case, it should
  490. %% be safe to release the claim from the monitoring process.
  491. %%
  492. %% Overhead in the normal case:
  493. %% - spawning the monitoring process
  494. %% - (possibly scheduling the monitoring process to set up the monitor)
  495. %% - killing the monitoring process (untrappably)
  496. %% Timing the overhead over 100,000 claims on a Core i7 MBP running OTP 17,
  497. %% this wrapper increases the cost of a minimal claim from ca 3 us to
  498. %% ca 7-8 us.
  execute_claim(F, K, Pid) ->
      Parent = self(),
      %% Watchdog: if the claimant dies inside the critical section, the
      %% watchdog sees the 'DOWN' message and releases the claim. A plain
      %% spawn (not spawn_link) is used on purpose, see the notes above.
      Watchdog =
          spawn(fun() ->
                        MRef = erlang:monitor(process, Parent),
                        receive
                            {'DOWN', MRef, _, _, _} ->
                                gproc:reset_counter(K)
                        end
                end),
      try
          {true, F(K, Pid)}
      after
          %% Normal path (return or exception): stop the watchdog, then
          %% release the claim ourselves.
          exit(Watchdog, kill),
          gproc:reset_counter(K)
      end.
  %% Build the wait state for claim_w/3. For busy_wait, a timer is started
  %% that sends {claim, Pool} to the caller after MS milliseconds; the
  %% timer reference doubles as the deadline marker.
  setup_wait({busy_wait, MS}, Pool) ->
      TimerRef = erlang:send_after(MS, self(), {claim, Pool}),
      {busy_wait, TimerRef};
  setup_wait(nowait, _) ->
      nowait.
  %% Decide whether claim_w/3 should try again: returns `timeout' to stop,
  %% or the unchanged wait state to keep going.
  do_wait(nowait) ->
      timeout;
  do_wait({busy_wait, Ref} = W) ->
      %% Yield first: it gives current lock holders a chance to finish, and
      %% (reportedly) lets read_timer/1 observe an up-to-date value.
      erlang:yield(),
      Remaining = erlang:read_timer(Ref),
      case Remaining of
          false ->
              %% NOTE(review): the {claim, Pool} message sent by the expired
              %% timer is not received anywhere in the visible code - confirm
              %% it is flushed elsewhere.
              erlang:cancel_timer(Ref),
              timeout;
          _ ->
              W
      end.
  %% Tear down the wait state after a successful claim: cancel the timer
  %% if one was started. Always returns ok.
  clear_wait({busy_wait, Ref}) ->
      _ = erlang:cancel_timer(Ref),
      ok;
  clear_wait(nowait) ->
      ok.
  541. %% @spec log(GprocKey) -> integer()
  542. %% @doc Update a counter associated with a worker name.
  543. %%
  544. %% Each added worker has a gproc counter that can be used e.g. to keep track of
  545. %% the number of times the worker has been picked. Since it's associated with the
  546. %% named 'slot', and not to the connected worker, its value will persist even
  547. %% if the currently connected worker dies.
  548. %% @end
  log({n,l,[?MODULE,Pool,_Pos,Name]}) ->
      %% Bump the per-slot shared counter by one; the slot position in the
      %% gproc name is not needed to locate it.
      SlotCounter = ?POOL_WRK(Pool,Name),
      gproc:update_shared_counter(SlotCounter, 1).
  551. %% @spec randomize(Pool::any()) -> integer()
  552. %% @doc Randomizes the "next" pointer for the pool.
  553. %%
  554. %% This function only has an effect for `round_robin' pools, which have a
  555. %% reference to the next worker to be picked. Without randomizing, the load
  556. %% balancing will always start with the first worker in the pool.
  557. %% @end
  randomize(Pool) ->
      case pool_size(Pool) of
          %% With zero or one slot there is nothing to randomize; the
          %% pointer is left untouched.
          0 -> 0;
          1 -> 1;
          Sz ->
              %% Advance the "next" pointer by a random offset in 0..Sz-1,
              %% one offset per slot. rand:uniform(Sz) returns 1..Sz, so
              %% subtracting one gives exactly that range; drawing from
              %% 1..Sz+1 would add a redundant offset of Sz.
              %% NOTE(review): incr/3 is defined outside the visible code -
              %% confirm how it treats an increment equal to the pool size.
              incr(Pool, rand:uniform(Sz) - 1, Sz)
      end.
  565. %% @spec pool_size(Pool::any()) -> integer()
  566. %% @doc Return the size of the pool.
  567. %%
  pool_size(Pool) ->
      %% The pool property value is {Size, Type}; only the size is needed.
      PoolKey = ?POOL(Pool),
      {Size, _Type} = gproc:get_value(PoolKey, shared),
      Size.
  571. %% ===================================================================
  572. %% Start, stop, call gen_server
  573. %% @private
  start_link() ->
      %% Registered locally under the module name, which call/1 relies on.
      ServerName = {local, ?MODULE},
      gen_server:start_link(ServerName, ?MODULE, [], []).
  576. %% @private
  init([]) ->
      %% The server only serializes updates; its state record is empty.
      State = #st{},
      {ok, State}.
  579. %% @private
  call(Req) ->
      %% Error replies from the server are turned back into exceptions in
      %% the calling process; anything else is returned as is.
      Reply = gen_server:call(?MODULE, Req),
      case Reply of
          badarg ->
              error(badarg);
          {badarg, Reason} ->
              error(Reason);
          _ ->
              Reply
      end.
  589. %% ===================================================================
  590. %% Gen_server callbacks
  591. %% @private
  handle_call(Req, From, S) ->
      %% Errors raised while serving a request are sent back to the caller
      %% as {badarg, Reason}, with the state unchanged; call/1 re-raises
      %% them on the client side.
      try handle_call_(Req, From, S) of
          Result ->
              Result
      catch
          error:Reason ->
              {reply, {badarg, Reason}, S}
      end.
  %% Serve one pool-management request. Every clause replies immediately
  %% and leaves the server state unchanged. Errors raised by the helpers
  %% propagate to handle_call/3, which reports them to the caller.
  handle_call_({new, Pool, Type, Opts}, _, S) ->
      new_(Pool, Type, Opts),
      {reply, ok, S};
  handle_call_({delete, Pool}, _, S) ->
      delete_(Pool),
      {reply, ok, S};
  handle_call_({force_delete, Pool}, _, S) ->
      force_delete_(Pool),
      {reply, ok, S};
  handle_call_({add_worker, Pool, Name}, _, S) ->
      N = add_worker_(Pool, Name),
      {reply, N, S};
  handle_call_({add_worker, Pool, Name, Pos}, _, S) ->
      N = add_worker_(Pool, Name, Pos),
      {reply, N, S};
  handle_call_({set_pool_size, Pool, Sz}, _, S) ->
      %% get_workers_/1 is called with the pool's gproc key everywhere
      %% else in this module, so pass ?POOL(Pool) here as well rather
      %% than the bare pool name.
      K = ?POOL(Pool),
      Workers = get_workers_(K),
      case get_last_worker_n(Workers) of
          N when N > Sz ->
              %% Shrinking below the last occupied slot is refused.
              {reply, badarg, S};
          _ ->
              set_pool_size_(K, Sz, Workers),
              {reply, true, S}
      end;
  handle_call_({remove_worker, Pool, Name}, _, S) ->
      ok = remove_worker_(Pool, Name),
      {reply, true, S}.
  625. %% @private
  handle_cast(_Msg, State) ->
      %% No casts are defined for this server; ignore whatever arrives.
      {noreply, State}.
  628. %% @private
  handle_info(_Info, State) ->
      %% Stray messages are dropped without touching the state.
      {noreply, State}.
  631. %% @private
  terminate(_Reason, _State) ->
      %% Nothing to clean up on shutdown.
      ok.
  634. %% @private
  code_change(_OldVsn, State, _Extra) ->
      %% The state needs no conversion between versions.
      {ok, State}.
  637. %% ===================================================================
  638. %% Internal functions
  %% Create the gproc resources for a new pool: the shared pool property
  %% holding {Size, Type}, its attributes (the options plus a resolved
  %% auto_size), the initial worker slots 1..Size, and the shared
  %% "current" counter. Raises `exists' if the pool property cannot be
  %% registered, and `invalid_type' for an unknown Type.
  new_(Pool, Type, Opts) ->
      valid_type(Type),
      Size = proplists:get_value(size, Opts, 0),
      Workers = lists:seq(1, Size),
      %% Resolve and validate the options before anything is registered,
      %% so that a malformed auto_size value cannot leave a half-created
      %% pool behind. auto_size defaults to `true' only when no explicit
      %% size was given.
      Opts1 =
          case lists:keyfind(auto_size, 1, Opts) of
              false ->
                  Opts ++ [{auto_size, not lists:keymember(size, 1, Opts)}];
              {_, Bool} when is_boolean(Bool) ->
                  Opts
          end,
      K = ?POOL(Pool),
      try gproc:reg_shared(K, {Size, Type})
      catch
          error:_ -> error(exists)
      end,
      gproc:set_attributes_shared(K, Opts1),
      set_workers(K, Workers),
      gproc:reg_shared(?POOL_CUR(Pool), Size).
  %% Accept the five supported load-balancing strategies; anything else
  %% raises `invalid_type'.
  valid_type(round_robin) -> true;
  valid_type(hash)        -> true;
  valid_type(random)      -> true;
  valid_type(direct)      -> true;
  valid_type(claim)       -> true;
  valid_type(_Other) ->
      error(invalid_type).
  %% Store a worker list of exactly Sz entries and record the new size:
  %% a longer list is truncated, a shorter one is padded with the missing
  %% slot numbers. The pool type is carried over unchanged.
  set_pool_size_(K, Sz, Workers) ->
      {_, Type} = gproc:get_value(K, shared),
      Resized =
          case length(Workers) of
              Sz ->
                  Workers;
              Len when Len > Sz ->
                  lists:sublist(Workers, 1, Sz);
              Len when Len < Sz ->
                  Workers ++ lists:seq(Len+1, Sz)
          end,
      set_workers(K, Resized),
      gproc:set_value_shared(K, {Sz, Type}).
  %% Remove the pool property and its "current" counter, but only if no
  %% slot still holds a named worker; otherwise raise `not_empty'.
  delete_(Pool) ->
      K = ?POOL(Pool),
      IsNamed = fun({_,_}) -> true;
                   (_) -> false
                end,
      case lists:any(IsNamed, get_workers_(K)) of
          false ->
              gproc:unreg_shared(K),
              gproc:unreg_shared(?POOL_CUR(Pool));
          true ->
              error(not_empty)
      end.
  %% Tear a pool down unconditionally: collect every gproc object that
  %% belongs to it, unregister or kill the connected workers, then drop
  %% the shared objects. Always returns `true'.
  force_delete_(Pool) ->
      Props = gproc:select({l,p}, [{ {?POOL(Pool), '_', '_'}, [], ['$_']}]),
      Cur = gproc:select({l,c}, [{ {?POOL_CUR(Pool), '_', '_'}, [], ['$_']}]),
      Workers = gproc:select(
                  {l,c}, [{ {?POOL_WRK(Pool,'_'), '_', '_'}, [], ['$_']}]),
      Names = find_names(Pool, '_'),
      Self = self(),
      %% The caller merely loses its own registration; every other
      %% connected worker is killed.
      Disconnect =
          fun({Key, Owner, _}) when Owner == Self -> gproc:unreg(Key);
             ({_, Owner, _}) when is_pid(Owner) -> exit(Owner, kill)
          end,
      lists:foreach(Disconnect, Names),
      lists:foreach(
        fun({W, shared, _}) -> gproc:unreg_shared(W);
           (_) -> ok
        end, Cur ++ Props ++ Workers),
      true.
  %% Select the full gproc entries of worker names in Pool. The second
  %% argument is matched against the position element of the name, so '_'
  %% selects every slot.
  find_names(Pool, Pid) ->
      MatchHead = {{n,l,[?MODULE,Pool,Pid,'_']}, '_', '_'},
      gproc:select({l,n}, [{MatchHead, [], ['$_']}]).
  %% Add Name to the pool in a slot chosen by find_slot/6 and return the
  %% slot number. Raises `exists' if the name is already present and
  %% `pool_full' if no slot could be found.
  add_worker_(Pool, Name) ->
      K = ?POOL(Pool),
      {Sz, Type} = gproc:get_value(K, shared),
      AutoSz = gproc:get_attribute(K, shared, auto_size),
      Ws0 = get_workers_(K),
      lists:keymember(Name, 1, Ws0) andalso error(exists),
      {N, Ws1} =
          case find_slot(Name, K, Ws0, Sz, Type, AutoSz) of
              {_, _} = Slot ->
                  Slot;
              false ->
                  error(pool_full)
          end,
      case N > Sz of
          true ->
              %% The pool grew; set_pool_size_/3 also stores the workers.
              set_pool_size_(K, N, Ws1);
          false ->
              set_workers(K, Ws1)
      end,
      reg_worker(Pool, Name, N),
      N.
  725. add_worker_(Pool, Name, Pos) ->
  726. K = ?POOL(Pool),
  727. {Sz, _} = gproc:get_value(K, shared),
  728. Ws0 = get_workers_(K),
  729. if Pos > Sz ->
  730. case gproc:get_attribute(K, shared, auto_size) of
  731. true ->
  732. Ws1 = Ws0 ++ lists:seq(Sz+1,Pos-1) ++ [{Name, Pos}],
  733. set_pool_size_(K, Pos, Ws1);
  734. false ->
  735. error(out_of_range)
  736. end;
  737. true ->
  738. case lists:nth(Pos, Ws0) of
  739. {_,_} -> error(exists);
  740. P when is_integer(P) ->
  741. Ws1 = set_pos(Pos, Ws0, {Name, Pos}),
  742. set_workers(K, Ws1)
  743. end
  744. end,
  745. reg_worker(Pool, Name, Pos),
  746. Pos.
  747. reg_worker(Pool, Name, Pos) ->
  748. gproc:reg_shared(Wrk = ?POOL_WRK(Pool, Name), 0),
  749. gproc:set_attributes_shared(Wrk, [{n, Pos}]).
  750. remove_worker_(Pool, Name) ->
  751. case whereis_worker(Pool, Name) of
  752. Pid when is_pid(Pid) ->
  753. error({worker_connected, Pid});
  754. undefined ->
  755. do_remove_worker_(Pool, Name)
  756. end.
  757. do_remove_worker_(Pool, Name) ->
  758. K = ?POOL(Pool),
  759. AutoSize = gproc:get_attribute(K, shared, auto_size),
  760. Ws0 = get_workers_(K),
  761. Ws1 = del_slot(Name, Ws0, AutoSize),
  762. gproc:unreg_shared(?POOL_WRK(Pool, Name)),
  763. case AutoSize of
  764. false -> ok;
  765. true ->
  766. case (NewLen = length(Ws1)) - length(Ws0) of
  767. 0 -> ok;
  768. Diff when Diff < 0 ->
  769. {_, Type} = gproc:get_value(K, shared),
  770. gproc:set_value_shared(K, {NewLen, Type})
  771. end
  772. end,
  773. gproc:set_attributes_shared(K, [{workers, Ws1}]),
  774. ok.
  775. del_slot(Name, [{Name,_}], true) ->
  776. [];
  777. del_slot(Name, [{Name, Pos}|T], _AutoSize) ->
  778. [Pos|T];
  779. del_slot(Name, [H|T], AutoSize) ->
  780. [H|del_slot(Name, T, AutoSize)].
  781. find_slot(Name, _, [], Sz, _, Auto) ->
  782. case {Sz, Auto} of
  783. {0, false} -> false;
  784. {_, _} ->
  785. {1, [{Name, 1}]}
  786. end;
  787. find_slot(Name, Key, Workers, Sz, Type, AutoSz) ->
  788. case get_strategy(Key, Type) of
  789. packed ->
  790. find_slot_packed(Name, Workers, AutoSz);
  791. sparse ->
  792. find_slot_sparse(Name, Workers, Sz, AutoSz)
  793. end.
  794. %% find_slot(Name, Key, Workers, Sz, Type, AutoSz, Strategy).
  795. %% find_slot(Name, []) ->
  796. %% {1, [{Name, 1}]};
  797. %% find_slot(Name, Slots) ->
  798. %% find_slot(Name, Slots, []).
  799. get_last_worker_n(Ws) ->
  800. get_last_worker_n(Ws, 0, 1).
  801. get_last_worker_n([{_,_}|T], _, P) ->
  802. get_last_worker_n(T, P, P+1);
  803. get_last_worker_n([H|T], Last, P) when is_integer(H) ->
  804. get_last_worker_n(T, Last, P+1);
  805. get_last_worker_n([], Last, _) ->
  806. Last.
  807. find_slot_packed(Name, Workers, AutoSz) ->
  808. find_slot_packed(Name, Workers, AutoSz, []).
  809. find_slot_packed(Name, [N|T], _, Acc) when is_integer(N) -> % empty slot
  810. {N, lists:reverse(Acc) ++ [{Name, N}|T]};
  811. find_slot_packed(Name, [{_,Prev} = Last], true, Acc) -> % last elem; expand pool
  812. New = Prev+1,
  813. {New, lists:reverse([{Name, New}, Last|Acc])};
  814. find_slot_packed(_, [_], false, _) ->
  815. false;
  816. find_slot_packed(Name, [{_,_} = H|T], Auto, Acc) ->
  817. find_slot_packed(Name, T, Auto, [H|Acc]).
  818. find_slot_sparse(Name, Ws, Sz, Auto) ->
  819. %% Collect the position of the first and last filled slots, as well as
  820. %% the largest gap between filled slots
  821. case lists:foldl(
  822. fun(N, {Prev, StartP, First, Last, Max, MaxP}) when is_integer(N) ->
  823. case Prev+1 of
  824. Gap when Gap > Max ->
  825. {Gap, StartP, First, Last, Gap, StartP};
  826. Gap ->
  827. {Gap, StartP, First, Last, Max, MaxP}
  828. end;
  829. (N, []) when is_integer(N) ->
  830. %% skip
  831. [];
  832. ({_, Pos}, []) ->
  833. {0, Pos, _First = Pos, _Last = Pos, 0, 0};
  834. ({_, Pos}, {Prev, StartP, First, _PrevLast, Max, MaxP}) ->
  835. if Prev > Max ->
  836. {0, Pos, First, Pos, Prev, StartP};
  837. true ->
  838. {0, Pos, First, Pos, Max, MaxP}
  839. end
  840. end, [], Ws) of
  841. [] ->
  842. %% all empty slots
  843. case {Sz, Auto} of
  844. {0, false} ->
  845. false;
  846. {0, true} ->
  847. {1, [{Name, 1}]};
  848. {_, _} when is_integer(Sz), Sz > 0 ->
  849. {1, [{Name, 1}|tl(Ws)]}
  850. end;
  851. {_, _, 1, Last, 0, _} ->
  852. %% Pool full
  853. if Auto ->
  854. NewPos = Last + 1,
  855. {NewPos, Ws ++ [{Name, NewPos}]};
  856. true ->
  857. false
  858. end;
  859. {_, _, First, Last, MaxGap, StartPos} ->
  860. WrapGap = (Sz - Last) + First - 1,
  861. NewPos = if WrapGap >= MaxGap ->
  862. (Last + (WrapGap div 2) + 1) rem (Sz+1);
  863. true ->
  864. (StartPos + (MaxGap div 2) + 1) rem (Sz+1)
  865. end,
  866. {NewPos, set_pos(NewPos, Ws, {Name, NewPos})}
  867. end.
  868. set_pos(P, L, X) when P > 0, is_list(L) ->
  869. set_pos(P, 1, L, X).
  870. set_pos(P, P, [_|T], X) ->
  871. [X|T];
  872. set_pos(P, C, [H|T], X) when C < P ->
  873. [H|set_pos(P, C+1, T, X)].
  874. get_workers_(K) ->
  875. case gproc:get_attribute(K, shared, workers) of
  876. undefined ->
  877. [];
  878. L when is_list(L) ->
  879. L
  880. end.
  881. set_workers(K, L) when is_list(L) ->
  882. gproc:set_attributes_shared(K, [{workers, L}]).
  883. get_strategy(Key, Type) ->
  884. Default = case Type of
  885. round_robin -> packed;
  886. random -> sparse;
  887. hash -> sparse;
  888. direct -> packed;
  889. claim -> packed
  890. end,
  891. attribute(Key, fill_strategy, Default).
  892. attribute(Key, A, Default) ->
  893. case gproc:get_attribute(Key, shared, A) of
  894. undefined -> Default;
  895. Value -> Value
  896. end.
  897. incr(Pool, Incr, Sz) ->
  898. gproc:update_counter(?POOL_CUR(Pool), shared, {Incr, Sz, 1}).
  899. %% find_worker(Pool, Name) ->
  900. %% case gproc:select(n, [{ {{n, l, {?MODULE, Pool, '_'}}, '_', Name},
  901. %% [], ['$_'] }]) of
  902. %% [] ->
  903. %% undefined;
  904. %% [{{n,l,{?MODULE,_,N}}, Pid, _}] ->
  905. %% {N, Pid}
  906. %% end.
  907. %% ============================= Test code ===========================
  908. %% @private
  909. test(N) when N > 0 ->
  910. test(N, round_robin, []).
  911. %% @private
  912. test(N, Type, Opts) when Type==round_robin;
  913. Type==random;
  914. Type==hash;
  915. Type==direct;
  916. Type==claim ->
  917. P = ?LINE,
  918. setup_test_pool(P, Type, Opts),
  919. try timer:tc(?MODULE, f(Type), [N, P])
  920. after
  921. remove_test_pool(P)
  922. end.
  923. ptest(N, I, Type, Opts) ->
  924. P = ?LINE,
  925. setup_test_pool(P, Type, Opts),
  926. F = f(Type),
  927. Pids =
  928. [spawn_monitor(fun() -> exit({ok, timer:tc(?MODULE, F, [I, P])}) end)
  929. || _ <- lists:seq(1, N)],
  930. try collect(Pids)
  931. after
  932. remove_test_pool(P)
  933. end.
  934. collect(Pids) ->
  935. Results = [receive
  936. {'DOWN', Ref, _, _, Reason} ->
  937. Reason
  938. end || {_, Ref} <- Pids],
  939. {Times, Avgs} = lists:foldr(fun({ok, {T, Avg}}, {A,B}) ->
  940. {[T|A], [Avg|B]} end,
  941. {[],[]}, Results),
  942. {Times, lists:sum(Times)/length(Times),
  943. lists:sum(Avgs)/length(Avgs)}.
  944. f(Type) when Type==hash; Type==direct ->
  945. test_run1;
  946. f(Type) when Type==claim ->
  947. test_run2;
  948. f({empty,_}) ->
  949. test_run0;
  950. f(_) ->
  951. test_run.
  952. %% @private
  953. setup_test_pool(P, Type, Opts) ->
  954. setup_test_pool(P, Type, Opts, test_workers()).
  955. setup_test_pool(P, Type0, Opts, Workers) ->
  956. Type = case Type0 of {_, T} -> T; T when is_atom(T) -> T end,
  957. new(P, Type, Opts),
  958. [begin R = add_worker(P, W),
  959. io:fwrite("add_worker(~p, ~p) -> ~p; Ws = ~p~n",
  960. [P, W, R, get_workers_(?POOL(P))]),
  961. connect_worker(P, W)
  962. end || W <- Workers].
  963. %% @private
  964. remove_test_pool(P) ->
  965. io:fwrite("worker stats (~p):~n"
  966. "~p~n", [P, gproc:select(
  967. {l,c},
  968. [{ {{c,l,{?MODULE,P,w,'$1'}},'_','$2'}, [],
  969. [{{'$1','$2'}}] }])]),
  970. [begin disconnect_worker(P, W),
  971. remove_worker(P, W)
  972. end || W <- test_workers()],
  973. delete(P).
  974. test_workers() -> [a,b,c,d,e,f].
  975. %% @private
  976. test_run(N, P) ->
  977. test_run(N, P, 0, 0).
  978. test_run(N, P, S, M) when N > 0 ->
  979. {T, Worker} = timer:tc(?MODULE, pick, [P]),
  980. true = (Worker =/= false),
  981. log(Worker),
  982. timer:sleep(rand:uniform(50)),
  983. test_run(N-1, P, S+T, M+1);
  984. test_run(_, _, S, M) ->
  985. S/M.
  986. %% @private
  987. test_run1(N, P) ->
  988. test_run1(N, P, 0, 0).
  989. test_run1(N, P, S, M) when N > 0 ->
  990. {T, Worker} = timer:tc(?MODULE, pick, [P, N]),
  991. true = (Worker =/= false),
  992. log(Worker),
  993. timer:sleep(rand:uniform(50)),
  994. test_run1(N-1, P, S+T, M+1);
  995. test_run1(_, _, S, M) ->
  996. S/M.
  997. %% @private
  998. test_run2(N, P) ->
  999. test_run2(N, P, fun(K,_) ->
  1000. R = log(K),
  1001. timer:sleep(rand:uniform(50)),
  1002. R
  1003. end, 0, 0).
  1004. test_run2(N, P, F, S, M) when N > 0 ->
  1005. {T, {true, _}} = timer:tc(?MODULE, claim, [P, F, {busy_wait, 5000}]),
  1006. test_run2(N-1, P, F, S+T, M+1);
  1007. test_run2(_, _, _, S, M) ->
  1008. S/M.
  1009. test_run0(N, X) when N > 0 ->
  1010. test_run0(N-1, X);
  1011. test_run0(_, _) ->
  1012. ok.