gproc.erl 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@erlang-consulting.com>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see
  21. %% <a href="erlang07-wiger.pdf">erlang07-wiger.pdf</a>.</p>
  22. %%
  23. %% @type type() = n | p | c | a. n = name; p = property; c = counter;
  24. %% a = aggregate_counter
  25. %% @type scope() = l | g. l = local registration; g = global registration
  26. %% @type context() = {scope(), type()} | type(). Local scope is the default
  27. %% @type sel_type() = n | p | c | a |
  28. %% names | props | counters | aggr_counters.
  29. %% @type headpat() = {keypat(),pidpat(),ValPat}.
  30. %% @type keypat() = {sel_type() | sel_var(),
  31. %% l | g | sel_var(),
  32. %% any()}.
  33. %% @type pidpat() = pid() | sel_var().
  34. %% @type sel_var() = DollarVar | '_'.
  35. %% @type sel_pattern() = [{headpat(), Guards, Prod}].
  36. %% @type key() = {type(), scope(), any()}
  37. %% @end
  38. -module(gproc).
  39. -behaviour(gen_server).
  40. -export([start_link/0,
  41. reg/1, reg/2, unreg/1,
  42. mreg/3,
  43. set_value/2,
  44. get_value/1,
  45. where/1,
  46. await/1, await/2,
  47. nb_wait/1,
  48. cancel_wait/2,
  49. lookup_pid/1,
  50. lookup_pids/1,
  51. lookup_values/1,
  52. update_counter/2,
  53. send/2,
  54. info/1, info/2,
  55. select/1, select/2, select/3,
  56. first/1,
  57. next/2,
  58. prev/2,
  59. last/1,
  60. table/1, table/2]).
  61. %% Convenience functions
  62. -export([add_local_name/1,
  63. add_global_name/1,
  64. add_local_property/2,
  65. add_global_property/2,
  66. add_local_counter/2,
  67. add_global_counter/2,
  68. add_local_aggr_counter/1,
  69. add_global_aggr_counter/1,
  70. lookup_local_name/1,
  71. lookup_global_name/1,
  72. lookup_local_properties/1,
  73. lookup_global_properties/1,
  74. lookup_local_counters/1,
  75. lookup_global_counters/1,
  76. lookup_local_aggr_counter/1,
  77. lookup_global_aggr_counter/1]).
  78. -export([default/1]).
  79. %%% internal exports
  80. -export([init/1,
  81. handle_cast/2,
  82. handle_call/3,
  83. handle_info/2,
  84. code_change/3,
  85. terminate/2]).
  86. -include("gproc.hrl").
  87. -define(SERVER, ?MODULE).
  88. %%-define(l, l(?LINE)). % when activated, calls a traceable empty function
  89. -define(l, ignore).
  90. -define(CHK_DIST,
  91. case whereis(gproc_dist) of
  92. undefined ->
  93. erlang:error(local_only);
  94. _ ->
  95. ok
  96. end).
  97. -record(state, {}).
  98. %% @spec () -> {ok, pid()}
  99. %%
  100. %% @doc Starts the gproc server.
  101. %%
  102. %% This function is intended to be called from gproc_sup, as part of
  103. %% starting the gproc application.
  104. %% @end
  105. start_link() ->
  106. create_tabs(),
  107. gen_server:start({local, ?SERVER}, ?MODULE, [], []).
  108. %% spec(Name::any()) -> true
  109. %%
  110. %% @doc Registers a local (unique) name. @equiv reg({n,l,Name})
  111. %% @end
  112. %%
  113. add_local_name(Name) -> reg({n,l,Name}, undefined).
  114. %% spec(Name::any()) -> true
  115. %%
  116. %% @doc Registers a global (unique) name. @equiv reg({n,g,Name})
  117. %% @end
  118. %%
  119. add_global_name(Name) -> reg({n,g,Name}, undefined).
  120. %% spec(Name::any(), Value::any()) -> true
  121. %%
  122. %% @doc Registers a local (non-unique) property. @equiv reg({p,l,Name},Value)
  123. %% @end
  124. %%
  125. add_local_property(Name , Value) -> reg({p,l,Name}, Value).
  126. %% spec(Name::any(), Value::any()) -> true
  127. %%
  128. %% @doc Registers a global (non-unique) property. @equiv reg({p,g,Name},Value)
  129. %% @end
  130. %%
  131. add_global_property(Name, Value) -> reg({p,g,Name}, Value).
  132. %% spec(Name::any(), Initial::integer()) -> true
  133. %%
  134. %% @doc Registers a local (non-unique) counter. @equiv reg({c,l,Name},Value)
  135. %% @end
  136. %%
  137. add_local_counter(Name, Initial) when is_integer(Initial) ->
  138. reg({c,l,Name}, Initial).
  139. %% spec(Name::any(), Initial::integer()) -> true
  140. %%
  141. %% @doc Registers a global (non-unique) counter. @equiv reg({c,g,Name},Value)
  142. %% @end
  143. %%
  144. add_global_counter(Name, Initial) when is_integer(Initial) ->
  145. reg({c,g,Name}, Initial).
  146. %% spec(Name::any()) -> true
  147. %%
  148. %% @doc Registers a local (unique) aggregated counter.
  149. %% @equiv reg({a,l,Name})
  150. %% @end
  151. %%
  152. add_local_aggr_counter(Name) -> reg({a,l,Name}).
  153. %% spec(Name::any()) -> true
  154. %%
  155. %% @doc Registers a global (unique) aggregated counter.
  156. %% @equiv reg({a,g,Name})
  157. %% @end
  158. %%
  159. add_global_aggr_counter(Name) -> reg({a,g,Name}).
  160. %% @spec (Name::any()) -> pid()
  161. %%
  162. %% @doc Lookup a local unique name. Fails if there is no such name.
  163. %% @equiv where({n,l,Name})
  164. %% @end
  165. %%
  166. lookup_local_name(Name) -> where({n,l,Name}).
  167. %% @spec (Name::any()) -> pid()
  168. %%
  169. %% @doc Lookup a global unique name. Fails if there is no such name.
  170. %% @equiv where({n,g,Name})
  171. %% @end
  172. %%
  173. lookup_global_name(Name) -> where({n,g,Name}).
  174. %% @spec (Name::any()) -> integer()
  175. %%
  176. %% @doc Lookup a local (unique) aggregated counter and returns its value.
  177. %% Fails if there is no such object.
  178. %% @equiv where({a,l,Name})
  179. %% @end
  180. %%
  181. lookup_local_aggr_counter(Name) -> lookup_value({a,l,Name}).
  182. %% @spec (Name::any()) -> integer()
  183. %%
  184. %% @doc Lookup a global (unique) aggregated counter and returns its value.
  185. %% Fails if there is no such object.
  186. %% @equiv where({a,g,Name})
  187. %% @end
  188. %%
  189. lookup_global_aggr_counter(Name) -> lookup_value({a,g,Name}).
  190. %% @spec (Property::any()) -> [{pid(), Value}]
  191. %%
  192. %% @doc Look up all local (non-unique) instances of a given Property.
  193. %% Returns a list of {Pid, Value} tuples for all matching objects.
  194. %% @equiv lookup_values({p, l, Property})
  195. %% @end
  196. %%
  197. lookup_local_properties(P) -> lookup_values({p,l,P}).
  198. %% @spec (Property::any()) -> [{pid(), Value}]
  199. %%
  200. %% @doc Look up all global (non-unique) instances of a given Property.
  201. %% Returns a list of {Pid, Value} tuples for all matching objects.
  202. %% @equiv lookup_values({p, g, Property})
  203. %% @end
  204. %%
  205. lookup_global_properties(P) -> lookup_values({p,g,P}).
  206. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  207. %%
  208. %% @doc Look up all local (non-unique) instances of a given Counter.
  209. %% Returns a list of {Pid, Value} tuples for all matching objects.
  210. %% @equiv lookup_values({c, l, Counter})
  211. %% @end
  212. %%
  213. lookup_local_counters(P) -> lookup_values({c,l,P}).
  214. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  215. %%
  216. %% @doc Look up all global (non-unique) instances of a given Counter.
  217. %% Returns a list of {Pid, Value} tuples for all matching objects.
  218. %% @equiv lookup_values({c, g, Counter})
  219. %% @end
  220. %%
  221. lookup_global_counters(P) -> lookup_values({c,g,P}).
  222. %% @spec reg(Key::key()) -> true
  223. %%
  224. %% @doc
  225. %% @equiv reg(Key, default(Key))
  226. %% @end
  227. reg(Key) ->
  228. reg(Key, default(Key)).
  229. default({T,_,_}) when T==c -> 0;
  230. default(_) -> undefined.
  231. %% @spec await(Key::key(), Timeout) -> {pid(),Value}
  232. %% Timeout = integer() | infinity
  233. %%
  234. %% @doc Wait for a local name to be registered.
  235. %% The function raises an exception if the timeout expires. Timeout must be
  236. %% either an interger &gt; 0 or 'infinity'.
  237. %% A small optimization: we first perform a lookup, to see if the name
  238. %% is already registered. This way, the cost of the operation will be
  239. %% roughly the same as of where/1 in the case where the name is already
  240. %% registered (the difference: await/2 also returns the value).
  241. %% @end
  242. %%
  243. await(Key) ->
  244. await(Key, infinity).
  245. await({n,g,_} = Key, Timeout) ->
  246. ?CHK_DIST,
  247. request_wait(Key, Timeout);
  248. await({n,l,_} = Key, Timeout) ->
  249. case ets:lookup(?TAB, {Key, n}) of
  250. [{_, Pid, Value}] ->
  251. {Pid, Value};
  252. _ ->
  253. request_wait(Key, Timeout)
  254. end;
  255. await(K, T) ->
  256. erlang:error(badarg, [K, T]).
  257. request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
  258. TRef = case Timeout of
  259. infinity -> no_timer;
  260. T when is_integer(T), T > 0 ->
  261. erlang:start_timer(T, self(), timeout);
  262. _ ->
  263. erlang:error(badarg, [Key, Timeout])
  264. end,
  265. WRef = call({await,Key,self()}, C),
  266. receive
  267. {gproc, WRef, registered, {_K, Pid, V}} ->
  268. {Pid, V};
  269. {timeout, TRef, timeout} ->
  270. cancel_wait(Key, WRef),
  271. erlang:error(timeout, [Key, Timeout])
  272. end.
  273. %% @spec nb_wait(Key::key()) -> Ref
  274. %%
  275. %% @doc Wait for a local name to be registered.
  276. %% The caller can expect to receive a message,
  277. %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
  278. %% @end
  279. %%
  280. nb_wait({n,g,_} = Key) ->
  281. ?CHK_DIST,
  282. call({await, Key, self()}, g);
  283. nb_wait({n,l,_} = Key) ->
  284. call({await, Key, self()}, l);
  285. nb_wait(Key) ->
  286. erlang:error(badarg, [Key]).
  287. cancel_wait({_,g,_} = Key, Ref) ->
  288. ?CHK_DIST,
  289. cast({cancel_wait, self(), Key, Ref}, g),
  290. ok;
  291. cancel_wait({_,l,_} = Key, Ref) ->
  292. cast({cancel_wait, self(), Key, Ref}, l),
  293. ok.
  294. %% @spec reg(Key::key(), Value) -> true
  295. %%
  296. %% @doc Register a name or property for the current process
  297. %%
  298. %%
  299. reg({_,g,_} = Key, Value) ->
  300. %% anything global
  301. ?CHK_DIST,
  302. gproc_dist:reg(Key, Value);
  303. reg({p,l,_} = Key, Value) ->
  304. local_reg(Key, Value);
  305. reg({a,l,_} = Key, undefined) ->
  306. call({reg, Key, undefined});
  307. reg({c,l,_} = Key, Value) when is_integer(Value) ->
  308. call({reg, Key, Value});
  309. reg({n,l,_} = Key, Value) ->
  310. call({reg, Key, Value});
  311. reg(_, _) ->
  312. erlang:error(badarg).
  313. %% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
  314. %%
  315. %% @doc Register multiple {Key,Value} pairs of a given type and scope.
  316. %%
  317. %% This function is more efficient than calling {@link reg/2} repeatedly.
  318. %% @end
  319. mreg(T, g, KVL) ->
  320. ?CHK_DIST,
  321. gproc_dist:mreg(T, KVL);
  322. mreg(T, l, KVL) when T==a; T==n ->
  323. if is_list(KVL) ->
  324. call({mreg, T, l, KVL});
  325. true ->
  326. erlang:error(badarg)
  327. end;
  328. mreg(p, l, KVL) ->
  329. local_mreg(p, KVL);
  330. mreg(_, _, _) ->
  331. erlang:error(badarg).
  332. %% @spec (Key:: key()) -> true
  333. %%
  334. %% @doc Unregister a name or property.
  335. %% @end
  336. unreg(Key) ->
  337. case Key of
  338. {_, g, _} ->
  339. ?CHK_DIST,
  340. gproc_dist:unreg(Key);
  341. {T, l, _} when T == n;
  342. T == a -> call({unreg, Key});
  343. {_, l, _} ->
  344. case ets:member(?TAB, {Key,self()}) of
  345. true ->
  346. gproc_lib:remove_reg(Key, self());
  347. false ->
  348. erlang:error(badarg)
  349. end
  350. end.
  351. %% @spec (select_pattern()) -> list(sel_object())
  352. %% @doc
  353. %% @equiv select(all, Pat)
  354. %% @end
  355. select(Pat) ->
  356. select(all, Pat).
  357. %% @spec (Type::sel_type(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
  358. %%
  359. %% @doc Perform a select operation on the process registry.
  360. %%
  361. %% The physical representation in the registry may differ from the above,
  362. %% but the select patterns are transformed appropriately.
  363. %% @end
  364. select(Type, Pat) ->
  365. ets:select(?TAB, pattern(Pat, Type)).
  366. %% @spec (Type::sel_type(), Pat::sel_patten(), Limit::integer()) ->
  367. %% [{Key, Pid, Value}]
  368. %% @doc Like {@link select/2} but returns Limit objects at a time.
  369. %%
  370. %% See [http://www.erlang.org/doc/man/ets.html#select-3].
  371. %% @end
  372. select(Type, Pat, Limit) ->
  373. ets:select(?TAB, pattern(Pat, Type), Limit).
  374. %%% Local properties can be registered in the local process, since
  375. %%% no other process can interfere.
  376. %%%
  377. local_reg(Key, Value) ->
  378. case gproc_lib:insert_reg(Key, Value, self(), l) of
  379. false -> erlang:error(badarg);
  380. true -> monitor_me()
  381. end.
  382. local_mreg(_, []) -> true;
  383. local_mreg(T, [_|_] = KVL) ->
  384. case gproc_lib:insert_many(T, l, KVL, self()) of
  385. false -> erlang:error(badarg);
  386. {true,_} -> monitor_me()
  387. end.
  388. %% @spec (Key :: key(), Value) -> true
  389. %% @doc Sets the value of the registeration entry given by Key
  390. %%
  391. %% Key is assumed to exist and belong to the calling process.
  392. %% If it doesn't, this function will exit.
  393. %%
  394. %% Value can be any term, unless the object is a counter, in which case
  395. %% it must be an integer.
  396. %% @end
  397. %%
  398. set_value({_,g,_} = Key, Value) ->
  399. ?CHK_DIST,
  400. gproc_dist:set_value(Key, Value);
  401. set_value({a,l,_} = Key, Value) when is_integer(Value) ->
  402. call({set, Key, Value});
  403. set_value({n,l,_} = Key, Value) ->
  404. %% we cannot do this locally, since we have to check that the object
  405. %% exists first - not an atomic update.
  406. call({set, Key, Value});
  407. set_value({p,l,_} = Key, Value) ->
  408. %% we _can_ to this locally, since there is no race condition - no
  409. %% other process can update our properties.
  410. case gproc_lib:do_set_value(Key, Value, self()) of
  411. true -> true;
  412. false ->
  413. erlang:error(badarg)
  414. end;
  415. set_value({c,l,_} = Key, Value) when is_integer(Value) ->
  416. gproc_lib:do_set_counter_value(Key, Value, self());
  417. set_value(_, _) ->
  418. erlang:error(badarg).
  419. %% @spec (Key) -> Value
  420. %% @doc Read the value stored with a key registered to the current process.
  421. %%
  422. %% If no such key is registered to the current process, this function exits.
  423. %% @end
  424. get_value(Key) ->
  425. get_value(Key, self()).
  426. get_value({T,_,_} = Key, Pid) when is_pid(Pid) ->
  427. if T==n orelse T==a ->
  428. case ets:lookup(?TAB, {Key, T}) of
  429. [{_, P, Value}] when P == Pid -> Value;
  430. _ -> erlang:error(badarg)
  431. end;
  432. true ->
  433. ets:lookup_element(?TAB, {Key, Pid}, 3)
  434. end;
  435. get_value(_, _) ->
  436. erlang:error(badarg).
  437. %% @spec (Key) -> Pid
  438. %% @doc Lookup the Pid stored with a key.
  439. %%
  440. lookup_pid({_T,_,_} = Key) ->
  441. case where(Key) of
  442. undefined -> erlang:error(badarg);
  443. P -> P
  444. end.
  445. lookup_value({T,_,_} = Key) ->
  446. if T==n orelse T==a ->
  447. ets:lookup_element(?TAB, {Key,T}, 3);
  448. true ->
  449. erlang:error(badarg)
  450. end.
  451. %% @spec (Key::key()) -> pid()
  452. %%
  453. %% @doc Returns the pid registered as Key
  454. %%
  455. %% The type of registration entry must be either name or aggregated counter.
  456. %% Otherwise this function will exit. Use {@link lookup_pids/1} in these
  457. %% cases.
  458. %% @end
  459. %%
  460. where({T,_,_}=Key) ->
  461. if T==n orelse T==a ->
  462. case ets:lookup(?TAB, {Key,T}) of
  463. [{_, P, _Value}] ->
  464. P;
  465. _ -> % may be [] or [{Key,Waiters}]
  466. undefined
  467. end;
  468. true ->
  469. erlang:error(badarg)
  470. end.
  471. %% @spec (Key::key()) -> [pid()]
  472. %%
  473. %% @doc Returns a list of pids with the published key Key
  474. %%
  475. %% If the type of registration entry is either name or aggregated counter,
  476. %% this function will return either an empty list, or a list of one pid.
  477. %% For non-unique types, the return value can be a list of any length.
  478. %% @end
  479. %%
  480. lookup_pids({T,_,_} = Key) ->
  481. if T==n orelse T==a ->
  482. ets:select(?TAB, [{{{Key,T}, '$1', '_'},[],['$1']}]);
  483. true ->
  484. ets:select(?TAB, [{{{Key,'_'}, '$1', '_'},[],['$1']}])
  485. %%% true ->
  486. %%% erlang:error(badarg)
  487. end.
  488. %% @spec (Key::key()) -> [{pid(), Value}]
  489. %%
  490. %% @doc Retrieve the `{Pid,Value}' pairs corresponding to Key.
  491. %%
  492. %% Key refer to any type of registry object. If it refers to a unique
  493. %% object, the list will be of length 0 or 1. If it refers to a non-unique
  494. %% object, the return value can be a list of any length.
  495. %% @end
  496. %%
  497. lookup_values({T,_,_} = Key) ->
  498. if T==n orelse T==a ->
  499. ets:select(?TAB, [{{{Key,T}, '$1', '$2'},[],[{{'$1','$2'}}]}]);
  500. true ->
  501. ets:select(?TAB, [{{{Key,'_'}, '$1', '$2'},[],[{{'$1','$2'}}]}])
  502. end.
  503. %% @spec (Key::key(), Incr::integer()) -> integer()
  504. %%
  505. %% @doc Updates the counter registered as Key for the current process.
  506. %%
  507. %% This function works like ets:update_counter/3
  508. %% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
  509. %% will fail if the type of object referred to by Key is not a counter.
  510. %% @end
  511. %%
  512. update_counter({c,l,_} = Key, Incr) when is_integer(Incr) ->
  513. gproc_lib:update_counter(Key, Incr, self());
  514. update_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
  515. ?CHK_DIST,
  516. gproc_dist:update_counter(Key, Incr);
  517. update_counter(_, _) ->
  518. erlang:error(badarg).
  519. %% @spec (Key::key(), Msg::any()) -> Msg
  520. %%
  521. %% @doc Sends a message to the process, or processes, corresponding to Key.
  522. %%
  523. %% If Key belongs to a unique object (name or aggregated counter), this
  524. %% function will send a message to the corresponding process, or fail if there
  525. %% is no such process. If Key is for a non-unique object type (counter or
  526. %% property), Msg will be send to all processes that have such an object.
  527. %% @end
  528. %%
  529. send({T,C,_} = Key, Msg) when C==l; C==g ->
  530. if T == n orelse T == a ->
  531. case ets:lookup(?TAB, {Key, T}) of
  532. [{_, Pid, _}] ->
  533. Pid ! Msg;
  534. _ ->
  535. erlang:error(badarg)
  536. end;
  537. T==p orelse T==c ->
  538. %% BUG - if the key part contains select wildcards, we may end up
  539. %% sending multiple messages to the same pid
  540. lists:foreach(fun(Pid) ->
  541. Pid ! Msg
  542. end, lookup_pids(Key)),
  543. Msg;
  544. true ->
  545. erlang:error(badarg)
  546. end;
  547. send(_, _) ->
  548. erlang:error(badarg).
  549. %% @spec (Type :: type()) -> key() | '$end_of_table'
  550. %%
  551. %% @doc Behaves as ets:first(Tab) for a given type of registration object.
  552. %%
  553. %% See [http://www.erlang.org/doc/man/ets.html#first-1].
  554. %% The registry behaves as an ordered_set table.
  555. %% @end
  556. %%
  557. first(Type) ->
  558. {HeadPat,_} = headpat(Type, '_', '_', '_'),
  559. case ets:select(?TAB, [{HeadPat,[],[{element,1,'$_'}]}], 1) of
  560. {[First], _} ->
  561. First;
  562. _ ->
  563. '$end_of_table'
  564. end.
  565. %% @spec (Context :: context()) -> key() | '$end_of_table'
  566. %%
  567. %% @doc Behaves as ets:last(Tab) for a given type of registration object.
  568. %%
  569. %% See [http://www.erlang.org/doc/man/ets.html#last-1].
  570. %% The registry behaves as an ordered_set table.
  571. %% @end
  572. %%
  573. last(Context) ->
  574. {S, T} = get_s_t(Context),
  575. S1 = if S == '_'; S == l -> m;
  576. S == g -> h
  577. end,
  578. Beyond = {{T,S1,[]},[]},
  579. step(ets:prev(?TAB, Beyond), S, T).
  580. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  581. %%
  582. %% @doc Behaves as ets:next(Tab,Key) for a given type of registration object.
  583. %%
  584. %% See [http://www.erlang.org/doc/man/ets.html#next-2].
  585. %% The registry behaves as an ordered_set table.
  586. %% @end
  587. %%
  588. next(Context, K) ->
  589. {S,T} = get_s_t(Context),
  590. step(ets:next(?TAB,K), S, T).
  591. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  592. %%
  593. %% @doc Behaves as ets:prev(Tab,Key) for a given type of registration object.
  594. %%
  595. %% See [http://www.erlang.org/doc/man/ets.html#prev-2].
  596. %% The registry behaves as an ordered_set table.
  597. %% @end
  598. %%
  599. prev(Context, K) ->
  600. {S, T} = get_s_t(Context),
  601. step(ets:prev(?TAB, K), S, T).
  602. step(Key, '_', '_') ->
  603. case Key of
  604. {{_,_,_},_} -> Key;
  605. _ -> '$end_of_table'
  606. end;
  607. step(Key, '_', T) ->
  608. case Key of
  609. {{T,_,_},_} -> Key;
  610. _ -> '$end_of_table'
  611. end;
  612. step(Key, S, '_') ->
  613. case Key of
  614. {{_, S, _}, _} -> Key;
  615. _ -> '$end_of_table'
  616. end;
  617. step(Key, S, T) ->
  618. case Key of
  619. {{T, S, _}, _} -> Key;
  620. _ -> '$end_of_table'
  621. end.
  622. %% @spec (Pid::pid()) -> ProcessInfo
  623. %% ProcessInfo = [{gproc, [{Key,Value}]} | ProcessInfo]
  624. %%
  625. %% @doc Similar to `process_info(Pid)' but with additional gproc info.
  626. %%
  627. %% Returns the same information as process_info(Pid), but with the
  628. %% addition of a `gproc' information item, containing the `{Key,Value}'
  629. %% pairs registered to the process.
  630. %% @end
  631. info(Pid) when is_pid(Pid) ->
  632. Items = [?MODULE | [ I || {I,_} <- process_info(self())]],
  633. [info(Pid,I) || I <- Items].
  634. %% @spec (Pid::pid(), Item::atom()) -> {Item, Info}
  635. %%
  636. %% @doc Similar to process_info(Pid, Item), but with additional gproc info.
  637. %%
  638. %% For `Item = gproc', this function returns a list of `{Key, Value}' pairs
  639. %% registered to the process Pid. For other values of Item, it returns the
  640. %% same as [http://www.erlang.org/doc/man/erlang.html#process_info-2].
  641. %% @end
  642. info(Pid, ?MODULE) ->
  643. Keys = ets:select(?TAB, [{ {{Pid,'$1'}, r}, [], ['$1'] }]),
  644. {?MODULE, lists:zf(
  645. fun(K) ->
  646. try V = get_value(K, Pid),
  647. {true, {K,V}}
  648. catch
  649. error:_ ->
  650. false
  651. end
  652. end, Keys)};
  653. info(Pid, I) ->
  654. process_info(Pid, I).
  655. %%% ==========================================================
  656. %% @hidden
  657. handle_cast({monitor_me, Pid}, S) ->
  658. erlang:monitor(process, Pid),
  659. {noreply, S};
  660. handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
  661. case ets:lookup(?TAB, {Key,T}) of
  662. [{K, Waiters}] ->
  663. NewWaiters = Waiters -- [{Pid,Ref}],
  664. %% for now, we don't remove the reverse entry. If we should do
  665. %% that, we have to make sure that Pid doesn't have another
  666. %% waiter (which it shouldn't have, given that the wait is
  667. %% synchronous). Keeping it is not problematic - worst case, we
  668. %% will get an unnecessary cleanup.
  669. ets:insert(?TAB, {K, NewWaiters});
  670. _ ->
  671. ignore
  672. end,
  673. {noreply, S}.
  674. %% @hidden
  675. handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
  676. case try_insert_reg(Key, Val, Pid) of
  677. true ->
  678. gproc_lib:ensure_monitor(Pid,l),
  679. {reply, true, S};
  680. false ->
  681. {reply, badarg, S}
  682. end;
  683. handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
  684. case ets:member(?TAB, {Pid,Key}) of
  685. true ->
  686. gproc_lib:remove_reg(Key, Pid),
  687. {reply, true, S};
  688. false ->
  689. {reply, badarg, S}
  690. end;
  691. handle_call({await, {_,l,_} = Key, Pid}, {_, Ref}, S) ->
  692. %% Passing the pid explicitly is needed when leader_call is used,
  693. %% since the Pid given as From in the leader is the local gen_leader
  694. %% instance on the calling node.
  695. case gproc_lib:await(Key, {Pid, Ref}) of
  696. noreply ->
  697. {noreply, S};
  698. {reply, Reply, _} ->
  699. {reply, Reply, S}
  700. end;
  701. %% Rev = {{Pid,Key}, r},
  702. %% case ets:lookup(?TAB, {Key,T}) of
  703. %% [{_, P, Value}] ->
  704. %% %% for symmetry, we always reply with Ref and then send a message
  705. %% gen_server:reply(From, Ref),
  706. %% Pid ! {gproc, Ref, registered, {Key, P, Value}},
  707. %% {noreply, S};
  708. %% [{K, Waiters}] ->
  709. %% NewWaiters = [{Pid,Ref} | Waiters],
  710. %% ets:insert(?TAB, [{K, NewWaiters}, Rev]),
  711. %% gproc_lib:ensure_monitor(Pid,l),
  712. %% {reply, Ref, S};
  713. %% [] ->
  714. %% ets:insert(?TAB, [{{Key,T}, [{Pid,Ref}]}, Rev]),
  715. %% gproc_lib:ensure_monitor(Pid,l),
  716. %% {reply, Ref, S}
  717. %% end;
  718. handle_call({mreg, T, l, L}, {Pid,_}, S) ->
  719. try gproc_lib:insert_many(T, l, L, Pid) of
  720. {true,_} -> {reply, true, S};
  721. false -> {reply, badarg, S}
  722. catch
  723. error:_ -> {reply, badarg, S}
  724. end;
  725. handle_call({set, {_,l,_} = Key, Value}, {Pid,_}, S) ->
  726. case gproc_lib:do_set_value(Key, Value, Pid) of
  727. true ->
  728. {reply, true, S};
  729. false ->
  730. {reply, badarg, S}
  731. end;
  732. handle_call(_, _, S) ->
  733. {reply, badarg, S}.
  734. %% @hidden
  735. handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
  736. process_is_down(Pid),
  737. {noreply, S};
  738. handle_info(_, S) ->
  739. {noreply, S}.
  740. %% @hidden
  741. code_change(_FromVsn, S, _Extra) ->
  742. %% We have changed local monitor markers from {Pid} to {Pid,l}.
  743. case ets:select(?TAB, [{{'$1'},[],['$1']}]) of
  744. [] ->
  745. ok;
  746. Pids ->
  747. ets:insert(?TAB, [{P,l} || P <- Pids]),
  748. ets:select_delete(?TAB, [{{'_'},[],[true]}])
  749. end,
  750. {ok, S}.
  751. %% @hidden
  752. terminate(_Reason, _S) ->
  753. ok.
  754. call(Req) ->
  755. call(Req, l).
  756. call(Req, l) ->
  757. chk_reply(gen_server:call(?MODULE, Req), Req);
  758. call(Req, g) ->
  759. chk_reply(gproc_dist:leader_call(Req), Req).
  760. chk_reply(Reply, Req) ->
  761. case Reply of
  762. badarg -> erlang:error(badarg, Req);
  763. Reply -> Reply
  764. end.
  765. cast(Msg) ->
  766. cast(Msg, l).
  767. cast(Msg, l) ->
  768. gen_server:cast(?MODULE, Msg);
  769. cast(Msg, g) ->
  770. gproc_dist:leader_cast(Msg).
  771. try_insert_reg({T,l,_} = Key, Val, Pid) ->
  772. case gproc_lib:insert_reg(Key, Val, Pid, l) of
  773. false ->
  774. case ets:lookup(?TAB, {Key,T}) of
  775. %% In this particular case, the lookup cannot result in
  776. %% [{_, Waiters}], since the insert_reg/4 function would
  777. %% have succeeded then.
  778. [{_, OtherPid, _}] ->
  779. case is_process_alive(OtherPid) of
  780. true ->
  781. false;
  782. false ->
  783. process_is_down(Pid),
  784. true = gproc_lib:insert_reg(Key, Val, Pid, l)
  785. end;
  786. [] ->
  787. false
  788. end;
  789. true ->
  790. true
  791. end.
  792. process_is_down(Pid) ->
  793. Keys = ets:select(?TAB, [{{{Pid,'$1'},'$2'},
  794. [{'==',{element,2,'$1'},l}], [{{'$1','$2'}}]}]),
  795. ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
  796. ets:delete(?TAB, {Pid,l}),
  797. lists:foreach(fun({Key,r}) ->
  798. gproc_lib:remove_reg_1(Key, Pid);
  799. ({Key,w}) ->
  800. gproc_lib:remove_waiter(Key, Pid)
  801. end, Keys).
  802. create_tabs() ->
  803. ets:new(?MODULE, [ordered_set, public, named_table]).
  804. %% @hidden
  805. init([]) ->
  806. {ok, #state{}}.
  807. %% ensure_monitor(Pid) when node(Pid) == node() ->
  808. %% case ets:insert_new(?TAB, {Pid}) of
  809. %% false -> ok;
  810. %% true -> erlang:monitor(process, Pid)
  811. %% end;
  812. %% ensure_monitor(_) ->
  813. %% true.
  814. monitor_me() ->
  815. case ets:insert_new(?TAB, {{self(),l}}) of
  816. false -> true;
  817. true ->
  818. cast({monitor_me,self()}),
  819. true
  820. end.
  821. pattern([{'_', Gs, As}], T) ->
  822. ?l,
  823. {HeadPat, Vs} = headpat(T, '$1', '$2', '$3'),
  824. [{HeadPat, rewrite(Gs,Vs), rewrite(As,Vs)}];
  825. pattern([{{A,B,C},Gs,As}], Scope) ->
  826. ?l,
  827. {HeadPat, Vars} = headpat(Scope, A,B,C),
  828. [{HeadPat, rewrite(Gs,Vars), rewrite(As,Vars)}];
  829. pattern([{Head, Gs, As}], Scope) ->
  830. ?l,
  831. case is_var(Head) of
  832. {true,_N} ->
  833. HeadPat = {{{type(Scope),'_','_'},'_'},'_','_'},
  834. Vs = [{Head, obj_prod()}],
  835. %% {HeadPat, Vs} = headpat(Scope, A,B,C),
  836. %% the headpat function should somehow verify that Head is
  837. %% consistent with Scope (or should we add a guard?)
  838. [{HeadPat, rewrite(Gs, Vs), rewrite(As, Vs)}];
  839. false ->
  840. erlang:error(badarg)
  841. end.
  842. %% This is the expression to use in guards and the RHS to address the whole
  843. %% object, in its logical representation.
  844. obj_prod() ->
  845. {{ {element,1,{element,1,'$_'}},
  846. {element,2,'$_'},
  847. {element,3,'$_'} }}.
  848. obj_prod_l() ->
  849. [ {element,1,{element,1,'$_'}},
  850. {element,2,'$_'},
  851. {element,3,'$_'} ].
  852. headpat({S, T}, V1,V2,V3) when S==global; S==local; S==all ->
  853. headpat(type(T), scope(S), V1,V2,V3);
  854. headpat(T, V1, V2, V3) when is_atom(T) ->
  855. headpat(type(T), l, V1, V2, V3);
  856. headpat(_, _, _, _) -> erlang:error(badarg).
  857. headpat(T, C, V1,V2,V3) ->
  858. Rf = fun(Pos) ->
  859. {element,Pos,{element,1,{element,1,'$_'}}}
  860. end,
  861. K2 = if T==n orelse T==a -> T;
  862. true -> '_'
  863. end,
  864. {Kp,Vars} = case V1 of
  865. {Vt,Vc,Vn} ->
  866. ?l,
  867. {T1,Vs1} = subst(T,Vt,fun() -> Rf(1) end,[]),
  868. {C1,Vs2} = subst(C,Vc,fun() -> Rf(2) end,Vs1),
  869. {{T1,C1,Vn}, Vs2};
  870. '_' ->
  871. ?l,
  872. {{T,C,'_'}, []};
  873. _ ->
  874. ?l,
  875. case is_var(V1) of
  876. {true,_} ->
  877. {{T,C,V1}, [{V1, {element,1,
  878. {element,1,'$_'}}}]};
  879. false ->
  880. erlang:error(badarg)
  881. end
  882. end,
  883. {{{Kp,K2},V2,V3}, Vars}.
  884. %% l(L) -> L.
  885. subst(X, '_', _F, Vs) ->
  886. {X, Vs};
  887. subst(X, V, F, Vs) ->
  888. case is_var(V) of
  889. {true,_} ->
  890. {X, [{V,F()}|Vs]};
  891. false ->
  892. {V, Vs}
  893. end.
  894. scope(all) -> '_';
  895. scope(global) -> g;
  896. scope(local) -> l.
  897. type(all) -> '_';
  898. type(T) when T==n; T==p; T==c; T==a -> T;
  899. type(names) -> n;
  900. type(props) -> p;
  901. type(counters) -> c;
  902. type(aggr_counters) -> a.
  903. keypat(Context) ->
  904. {S,T} = get_s_t(Context),
  905. {{T,S,'_'},'_'}.
  906. get_s_t({S,T}) -> {scope(S), type(T)};
  907. get_s_t(T) when is_atom(T) ->
  908. {l, type(T)}.
  909. is_var('$1') -> {true,1};
  910. is_var('$2') -> {true,2};
  911. is_var('$3') -> {true,3};
  912. is_var('$4') -> {true,4};
  913. is_var('$5') -> {true,5};
  914. is_var('$6') -> {true,6};
  915. is_var('$7') -> {true,7};
  916. is_var('$8') -> {true,8};
  917. is_var('$9') -> {true,9};
  918. is_var(X) when is_atom(X) ->
  919. case atom_to_list(X) of
  920. "$" ++ Tl ->
  921. try N = list_to_integer(Tl),
  922. {true,N}
  923. catch
  924. error:_ ->
  925. false
  926. end;
  927. _ ->
  928. false
  929. end;
  930. is_var(_) -> false.
  931. rewrite(Gs, R) ->
  932. [rewrite1(G, R) || G <- Gs].
  933. rewrite1('$_', _) ->
  934. obj_prod();
  935. rewrite1('$$', _) ->
  936. obj_prod_l();
  937. rewrite1(Guard, R) when is_tuple(Guard) ->
  938. list_to_tuple([rewrite1(G, R) || G <- tuple_to_list(Guard)]);
  939. rewrite1(Exprs, R) when is_list(Exprs) ->
  940. [rewrite1(E, R) || E <- Exprs];
  941. rewrite1(V, R) when is_atom(V) ->
  942. case is_var(V) of
  943. {true,_N} ->
  944. case lists:keysearch(V, 1, R) of
  945. {value, {_, V1}} ->
  946. V1;
  947. false ->
  948. V
  949. end;
  950. false ->
  951. V
  952. end;
  953. rewrite1(Expr, _) ->
  954. Expr.
  955. %% @spec (Context::context()) -> any()
  956. %%
  957. %% @doc
  958. %% @equiv table(Context, [])
  959. %% @end
  960. %%
  961. table(Context) ->
  962. table(Context, []).
  963. %% @spec (Context::context(), Opts) -> any()
  964. %%
  965. %% @doc QLC table generator for the gproc registry.
  966. %% Context specifies which subset of the registry should be queried.
  967. %% See [http://www.erlang.org/doc/man/qlc.html].
  968. %% @end
  969. table(Ctxt, Opts) ->
  970. [Traverse, NObjs] = [proplists:get_value(K,Opts,Def) ||
  971. {K,Def} <- [{traverse,select}, {n_objects,100}]],
  972. TF = case Traverse of
  973. first_next ->
  974. fun() -> qlc_next(Ctxt, first(Ctxt)) end;
  975. last_prev -> fun() -> qlc_prev(Ctxt, last(Ctxt)) end;
  976. select ->
  977. fun(MS) -> qlc_select(select(Ctxt, MS, NObjs)) end;
  978. {select,MS} ->
  979. fun() -> qlc_select(select(Ctxt, MS, NObjs)) end;
  980. _ ->
  981. erlang:error(badarg, [Ctxt,Opts])
  982. end,
  983. InfoFun = fun(indices) -> [2];
  984. (is_unique_objects) -> is_unique(Ctxt);
  985. (keypos) -> 1;
  986. (is_sorted_key) -> true;
  987. (num_of_objects) ->
  988. %% this is just a guesstimate.
  989. trunc(ets:info(?TAB,size) / 2.5)
  990. end,
  991. LookupFun =
  992. case Traverse of
  993. {select, _MS} -> undefined;
  994. _ -> fun(Pos, Ks) -> qlc_lookup(Ctxt, Pos, Ks) end
  995. end,
  996. qlc:table(TF, [{info_fun, InfoFun},
  997. {lookup_fun, LookupFun}] ++ [{K,V} || {K,V} <- Opts,
  998. K =/= traverse,
  999. K =/= n_objects]).
  1000. qlc_lookup(_Scope, 1, Keys) ->
  1001. lists:flatmap(
  1002. fun(Key) ->
  1003. ets:select(?TAB, [{ {{Key,'_'},'_','_'}, [],
  1004. [{{ {element,1,{element,1,'$_'}},
  1005. {element,2,'$_'},
  1006. {element,3,'$_'} }}] }])
  1007. end, Keys);
  1008. qlc_lookup(Scope, 2, Pids) ->
  1009. lists:flatmap(fun(Pid) ->
  1010. Found =
  1011. ets:select(?TAB, [{ {{Pid,keypat(Scope)}},
  1012. [], ['$_']}]),
  1013. lists:flatmap(
  1014. fun({{_,{T,_,_}=K}}) ->
  1015. K2 = if T==n orelse T==a -> T;
  1016. true -> Pid
  1017. end,
  1018. case ets:lookup(?TAB, {K,K2}) of
  1019. [{{Key,_},_,Value}] ->
  1020. [{Key, Pid, Value}];
  1021. [] ->
  1022. []
  1023. end
  1024. end, Found)
  1025. end, Pids).
  1026. qlc_next(_, '$end_of_table') -> [];
  1027. qlc_next(Scope, K) ->
  1028. case ets:lookup(?TAB, K) of
  1029. [{{Key,_}, Pid, V}] ->
  1030. [{Key,Pid,V} | fun() -> qlc_next(Scope, next(Scope, K)) end];
  1031. [] ->
  1032. qlc_next(Scope, next(Scope, K))
  1033. end.
  1034. qlc_prev(_, '$end_of_table') -> [];
  1035. qlc_prev(Scope, K) ->
  1036. case ets:lookup(?TAB, K) of
  1037. [{{Key,_},Pid,V}] ->
  1038. [{Key,Pid,V} | fun() -> qlc_prev(Scope, prev(Scope, K)) end];
  1039. [] ->
  1040. qlc_prev(Scope, prev(Scope, K))
  1041. end.
  1042. qlc_select('$end_of_table') ->
  1043. [];
  1044. qlc_select({Objects, Cont}) ->
  1045. Objects ++ fun() -> qlc_select(ets:select(Cont)) end.
  1046. is_unique(names) -> true;
  1047. is_unique(aggr_counters) -> true;
  1048. is_unique({_, names}) -> true;
  1049. is_unique({_, aggr_counters}) -> true;
  1050. is_unique(n) -> true;
  1051. is_unique(a) -> true;
  1052. is_unique({_,n}) -> true;
  1053. is_unique({_,a}) -> true;
  1054. is_unique(_) -> false.