gproc.erl 57 KB


  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@erlang-consulting.com>
  17. %%
  18. %% @doc Extended process registry
  19. %% This module implements an extended process registry
  20. %%
  21. %% For a detailed description, see
  22. %% <a href="erlang07-wiger.pdf">erlang07-wiger.pdf</a>.
  23. %%
  24. %% <h2>Tuning Gproc performance</h2>
  25. %%
  26. %% Gproc relies on a central server and an ordered-set ets table.
  27. %% Effort is made to perform as much work as possible in the client without
  28. %% sacrificing consistency. A few things can be tuned by setting the following
  29. %% application environment variables in the top application of `gproc'
  30. %% (usually `gproc'):
  31. %%
  32. %% * `{ets_options, list()}' - Currently, the options `{write_concurrency, F}'
  33. %% and `{read_concurrency, F}' are allowed. The default is
  34. %% `[{write_concurrency, true}, {read_concurrency, true}]'
  35. %% * `{server_options, list()}' - These will be passed as spawn options when
  36. %% starting the `gproc' and `gproc_dist' servers. Default is `[]'. It is
  37. %% likely that `{priority, high | max}' and/or increasing `min_heap_size'
  38. %% will improve performance.
  39. %%
  40. %% @end
  41. %% @type type() = n | p | c | a. n = name; p = property; c = counter;
  42. %% a = aggregate_counter
  43. %% @type scope() = l | g. l = local registration; g = global registration
  44. %%
  45. %% @type reg_id() = {type(), scope(), any()}.
  46. %% @type unique_id() = {n | a, scope(), any()}.
  47. %%
  48. %% @type sel_scope() = scope | all | global | local.
  49. %% @type sel_type() = type() | names | props | counters | aggr_counters.
  50. %% @type context() = {scope(), type()} | type(). {'all','all'} is the default
  51. %%
  52. %% @type headpat() = {keypat(),pidpat(),ValPat}.
  53. %% @type keypat() = {sel_type() | sel_var(),
  54. %% l | g | sel_var(),
  55. %% any()}.
  56. %% @type pidpat() = pid() | sel_var().
  57. %% sel_var() = DollarVar | '_'.
  58. %% @type sel_pattern() = [{headpat(), Guards, Prod}].
  59. %% @type key() = {type(), scope(), any()}.
  60. -module(gproc).
  61. -behaviour(gen_server).
  62. -export([start_link/0,
  63. reg/1, reg/2, unreg/1,
  64. reg_shared/1, reg_shared/2, unreg_shared/1,
  65. mreg/3,
  66. munreg/3,
  67. set_value/2,
  68. get_value/1, get_value/2,
  69. where/1,
  70. await/1, await/2,
  71. nb_wait/1,
  72. cancel_wait/2,
  73. lookup_pid/1,
  74. lookup_pids/1,
  75. lookup_value/1,
  76. lookup_values/1,
  77. update_counter/2,
  78. reset_counter/1,
  79. update_shared_counter/2,
  80. give_away/2,
  81. goodbye/0,
  82. send/2,
  83. info/1, info/2,
  84. i/0,
  85. select/1, select/2, select/3,
  86. select_count/1, select_count/2,
  87. first/1,
  88. next/2,
  89. prev/2,
  90. last/1,
  91. table/0, table/1, table/2]).
  92. %% Environment handling
  93. -export([get_env/3, get_env/4,
  94. get_set_env/3, get_set_env/4,
  95. set_env/5]).
  96. %% Convenience functions
  97. -export([add_local_name/1,
  98. add_global_name/1,
  99. add_local_property/2,
  100. add_global_property/2,
  101. add_local_counter/2,
  102. add_global_counter/2,
  103. add_local_aggr_counter/1,
  104. add_global_aggr_counter/1,
  105. add_shared_local_counter/2,
  106. lookup_local_name/1,
  107. lookup_global_name/1,
  108. lookup_local_properties/1,
  109. lookup_global_properties/1,
  110. lookup_local_counters/1,
  111. lookup_global_counters/1,
  112. lookup_local_aggr_counter/1,
  113. lookup_global_aggr_counter/1]).
  114. %% Callbacks for behaviour support
  115. -export([whereis_name/1,
  116. unregister_name/1]).
  117. -export([default/1]).
  118. %%% internal exports
  119. -export([init/1,
  120. handle_cast/2,
  121. handle_call/3,
  122. handle_info/2,
  123. code_change/3,
  124. terminate/2]).
  125. %% this shouldn't be necessary
  126. -export([audit_process/1]).
  127. -include("gproc.hrl").
  128. -include_lib("eunit/include/eunit.hrl").
  129. -define(SERVER, ?MODULE).
  130. %%-define(l, l(?LINE)). % when activated, calls a traceable empty function
  131. -define(l, ignore).
  132. -define(CHK_DIST,
  133. case whereis(gproc_dist) of
  134. undefined ->
  135. erlang:error(local_only);
  136. _ ->
  137. ok
  138. end).
  139. -record(state, {}).
  140. %% @spec () -> {ok, pid()}
  141. %%
  142. %% @doc Starts the gproc server.
  143. %%
  144. %% This function is intended to be called from gproc_sup, as part of
  145. %% starting the gproc application.
  146. %% @end
  147. start_link() ->
  148. _ = create_tabs(),
  149. SpawnOpts = gproc_lib:valid_opts(server_options, []),
  150. gen_server:start_link({local, ?SERVER}, ?MODULE, [],
  151. [{spawn_opt, SpawnOpts}]).
  152. %% spec(Name::any()) -> true
  153. %%
  154. %% @doc Registers a local (unique) name. @equiv reg({n,l,Name})
  155. %% @end
  156. %%
  157. add_local_name(Name) -> reg({n,l,Name}, undefined).
  158. %% spec(Name::any()) -> true
  159. %%
  160. %% @doc Registers a global (unique) name. @equiv reg({n,g,Name})
  161. %% @end
  162. %%
  163. add_global_name(Name) -> reg({n,g,Name}, undefined).
  164. %% spec(Name::any(), Value::any()) -> true
  165. %%
  166. %% @doc Registers a local (non-unique) property. @equiv reg({p,l,Name},Value)
  167. %% @end
  168. %%
  169. add_local_property(Name , Value) -> reg({p,l,Name}, Value).
  170. %% spec(Name::any(), Value::any()) -> true
  171. %%
  172. %% @doc Registers a global (non-unique) property. @equiv reg({p,g,Name},Value)
  173. %% @end
  174. %%
  175. add_global_property(Name, Value) -> reg({p,g,Name}, Value).
  176. %% spec(Name::any(), Initial::integer()) -> true
  177. %%
  178. %% @doc Registers a local (non-unique) counter. @equiv reg({c,l,Name},Value)
  179. %% @end
  180. %%
  181. add_local_counter(Name, Initial) when is_integer(Initial) ->
  182. reg({c,l,Name}, Initial).
  183. %% spec(Name::any(), Initial::integer()) -> true
  184. %%
  185. %% @doc Registers a local shared (unique) counter.
  186. %% @equiv reg_shared({c,l,Name},Value)
  187. %% @end
  188. %%
  189. add_shared_local_counter(Name, Initial) when is_integer(Initial) ->
  190. reg_shared({c,l,Name}, Initial).
  191. %% spec(Name::any(), Initial::integer()) -> true
  192. %%
  193. %% @doc Registers a global (non-unique) counter. @equiv reg({c,g,Name},Value)
  194. %% @end
  195. %%
  196. add_global_counter(Name, Initial) when is_integer(Initial) ->
  197. reg({c,g,Name}, Initial).
  198. %% spec(Name::any()) -> true
  199. %%
  200. %% @doc Registers a local (unique) aggregated counter.
  201. %% @equiv reg({a,l,Name})
  202. %% @end
  203. %%
  204. add_local_aggr_counter(Name) -> reg({a,l,Name}).
  205. %% spec(Name::any()) -> true
  206. %%
  207. %% @doc Registers a global (unique) aggregated counter.
  208. %% @equiv reg({a,g,Name})
  209. %% @end
  210. %%
  211. add_global_aggr_counter(Name) -> reg({a,g,Name}).
  212. %% @spec (Name::any()) -> pid()
  213. %%
  214. %% @doc Lookup a local unique name. Fails if there is no such name.
  215. %% @equiv where({n,l,Name})
  216. %% @end
  217. %%
  218. lookup_local_name(Name) -> where({n,l,Name}).
  219. %% @spec (Name::any()) -> pid()
  220. %%
  221. %% @doc Lookup a global unique name. Fails if there is no such name.
  222. %% @equiv where({n,g,Name})
  223. %% @end
  224. %%
  225. lookup_global_name(Name) -> where({n,g,Name}).
  226. %% @spec (Name::any()) -> integer()
  227. %%
  228. %% @doc Lookup a local (unique) aggregated counter and returns its value.
  229. %% Fails if there is no such object.
  230. %% @equiv where({a,l,Name})
  231. %% @end
  232. %%
  233. lookup_local_aggr_counter(Name) -> lookup_value({a,l,Name}).
  234. %% @spec (Name::any()) -> integer()
  235. %%
  236. %% @doc Lookup a global (unique) aggregated counter and returns its value.
  237. %% Fails if there is no such object.
  238. %% @equiv where({a,g,Name})
  239. %% @end
  240. %%
  241. lookup_global_aggr_counter(Name) -> lookup_value({a,g,Name}).
  242. %% @spec (Property::any()) -> [{pid(), Value}]
  243. %%
  244. %% @doc Look up all local (non-unique) instances of a given Property.
  245. %% Returns a list of {Pid, Value} tuples for all matching objects.
  246. %% @equiv lookup_values({p, l, Property})
  247. %% @end
  248. %%
  249. lookup_local_properties(P) -> lookup_values({p,l,P}).
  250. %% @spec (Property::any()) -> [{pid(), Value}]
  251. %%
  252. %% @doc Look up all global (non-unique) instances of a given Property.
  253. %% Returns a list of {Pid, Value} tuples for all matching objects.
  254. %% @equiv lookup_values({p, g, Property})
  255. %% @end
  256. %%
  257. lookup_global_properties(P) -> lookup_values({p,g,P}).
  258. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  259. %%
  260. %% @doc Look up all local (non-unique) instances of a given Counter.
  261. %% Returns a list of {Pid, Value} tuples for all matching objects.
  262. %% @equiv lookup_values({c, l, Counter})
  263. %% @end
  264. %%
  265. lookup_local_counters(P) -> lookup_values({c,l,P}).
  266. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  267. %%
  268. %% @doc Look up all global (non-unique) instances of a given Counter.
  269. %% Returns a list of {Pid, Value} tuples for all matching objects.
  270. %% @equiv lookup_values({c, g, Counter})
  271. %% @end
  272. %%
  273. lookup_global_counters(P) -> lookup_values({c,g,P}).
  274. %% @spec get_env(Scope::scope(), App::atom(), Key::atom()) -> term()
  275. %% @equiv get_env(Scope, App, Key, [app_env])
  276. get_env(Scope, App, Key) ->
  277. get_env(Scope, App, Key, [app_env]).
  278. %% @spec (Scope::scope(), App::atom(), Key::atom(), Strategy) -> term()
  279. %% Strategy = [Alternative]
  280. %% Alternative = app_env
  281. %% | os_env
  282. %% | inherit | {inherit, pid()} | {inherit, unique_id()}
  283. %% | init_arg
  284. %% | {mnesia, ActivityType, Oid, Pos}
  285. %% | {default, term()}
  286. %% | error
  287. %% @doc Read an environment value, potentially cached as a `gproc_env' property.
  288. %%
  289. %% This function first tries to read the value of a cached property,
  290. %% `{p, Scope, {gproc_env, App, Key}}'. If this fails, it will try the provided
  291. %% alternative strategy. `Strategy' is a list of alternatives, tried in order.
  292. %% Each alternative can be one of:
  293. %%
  294. %% * `app_env' - try `application:get_env(App, Key)'
  295. %% * `os_env' - try `os:getenv(ENV)', where `ENV' is `Key' converted into an
  296. %% uppercase string
  297. %% * `{os_env, ENV}' - try `os:getenv(ENV)'
  298. %% * `inherit' - inherit the cached value, if any, held by the parent process.
  299. %% * `{inherit, Pid}' - inherit the cached value, if any, held by `Pid'.
  300. %% * `{inherit, Id}' - inherit the cached value, if any, held by the process
  301. %% registered in `gproc' as `Id'.
  302. %% * `init_arg' - try `init:get_argument(Key)'; expects a single value, if any.
  303. %% * `{mnesia, ActivityType, Oid, Pos}' - try
  304. %% `mnesia:activity(ActivityType, fun() -> mnesia:read(Oid) end)'; retrieve
  305. %% the value in position `Pos' if object found.
  306. %% * `{default, Value}' - set a default value to return once alternatives have
  307. %% been exhausted; if not set, `undefined' will be returned.
  308. %% * `error' - raise an exception, `erlang:error(gproc_env, [App, Key, Scope])'.
  309. %%
  310. %% While any alternative can occur more than once, the only one that might make
  311. %% sense to use multiple times is `{default, Value}'.
  312. %%
  313. %% The return value will be one of:
  314. %%
  315. %% * The value of the first matching alternative, or `error' eception,
  316. %% whichever comes first
  317. %% * The last instance of `{default, Value}', or `undefined', if there is no
  318. %% matching alternative, default or `error' entry in the list.
  319. %%
  320. %% The `error' option can be used to assert that a value has been previously
  321. %% cached. Alternatively, it can be used to assert that a value is either cached
  322. %% or at least defined somewhere,
  323. %% e.g. `get_env(l, mnesia, dir, [app_env, error])'.
  324. %% @end
  325. get_env(Scope, App, Key, Strategy)
  326. when Scope==l, is_atom(App), is_atom(Key);
  327. Scope==g, is_atom(App), is_atom(Key) ->
  328. do_get_env(Scope, App, Key, Strategy, false).
  329. %% @spec get_set_env(Scope::scope(), App::atom(), Key::atom()) -> term()
  330. %% @equiv get_set_env(Scope, App, Key, [app_env])
  331. get_set_env(Scope, App, Key) ->
  332. get_set_env(Scope, App, Key, [app_env]).
  333. %% @spec get_set_env(Scope::scope(), App::atom(), Key::atom(), Strategy) ->
  334. %% Value
  335. %% @doc Fetch and cache an environment value, if not already cached.
  336. %%
  337. %% This function does the same thing as {@link get_env/4}, but also updates the
  338. %% cache. Note that the cache will be updated even if the result of the lookup
  339. %% is `undefined'.
  340. %%
  341. %% @see get_env/4.
  342. %% @end
  343. %%
  344. get_set_env(Scope, App, Key, Strategy)
  345. when Scope==l, is_atom(App), is_atom(Key);
  346. Scope==g, is_atom(App), is_atom(Key) ->
  347. do_get_env(Scope, App, Key, Strategy, true).
  348. do_get_env(Context, App, Key, Alternatives, Set) ->
  349. case lookup_env(Context, App, Key, self()) of
  350. undefined ->
  351. check_alternatives(Alternatives, Context, App, Key, undefined, Set);
  352. {ok, Value} ->
  353. Value
  354. end.
  355. %% @spec set_env(Scope::scope(), App::atom(),
  356. %% Key::atom(), Value::term(), Strategy) -> Value
  357. %% Strategy = [Alternative]
  358. %% Alternative = app_env | os_env | {os_env, VAR}
  359. %% | {mnesia, ActivityType, Oid, Pos}
  360. %%
  361. %% @doc Updates the cached value as well as underlying environment.
  362. %%
  363. %% This function should be exercised with caution, as it affects the larger
  364. %% environment outside gproc. This function modifies the cached value, and then
  365. %% proceeds to update the underlying environment (OS environment variable or
  366. %% application environment variable).
  367. %%
  368. %% When the `mnesia' alternative is used, gproc will try to update any existing
  369. %% object, changing only the `Pos' position. If no such object exists, it will
  370. %% create a new object, setting any other attributes (except `Pos' and the key)
  371. %% to `undefined'.
  372. %% @end
  373. %%
  374. set_env(Scope, App, Key, Value, Strategy)
  375. when Scope==l, is_atom(App), is_atom(Key);
  376. Scope==g, is_atom(App), is_atom(Key) ->
  377. case is_valid_set_strategy(Strategy, Value) of
  378. true ->
  379. update_cached_env(Scope, App, Key, Value),
  380. set_strategy(Strategy, App, Key, Value);
  381. false ->
  382. erlang:error(badarg)
  383. end.
  384. check_alternatives([{default, Val}|Alts], Scope, App, Key, _, Set) ->
  385. check_alternatives(Alts, Scope, App, Key, Val, Set);
  386. check_alternatives([H|T], Scope, App, Key, Def, Set) ->
  387. case try_alternative(H, App, Key, Scope) of
  388. undefined ->
  389. check_alternatives(T, Scope, App, Key, Def, Set);
  390. {ok, Value} ->
  391. if Set ->
  392. cache_env(Scope, App, Key, Value),
  393. Value;
  394. true ->
  395. Value
  396. end
  397. end;
  398. check_alternatives([], Scope, App, Key, Def, Set) ->
  399. if Set ->
  400. cache_env(Scope, App, Key, Def);
  401. true ->
  402. ok
  403. end,
  404. Def.
  405. try_alternative(error, App, Key, Scope) ->
  406. erlang:error(gproc_env, [App, Key, Scope]);
  407. try_alternative(inherit, App, Key, Scope) ->
  408. case get('$ancestors') of
  409. [P|_] ->
  410. lookup_env(Scope, App, Key, P);
  411. _ ->
  412. undefined
  413. end;
  414. try_alternative({inherit, P}, App, Key, Scope) when is_pid(P) ->
  415. lookup_env(Scope, App, Key, P);
  416. try_alternative({inherit, P}, App, Key, Scope) ->
  417. case where(P) of
  418. undefined -> undefined;
  419. Pid when is_pid(Pid) ->
  420. lookup_env(Scope, App, Key, Pid)
  421. end;
  422. try_alternative(app_env, App, Key, _Scope) ->
  423. case application:get_env(App, Key) of
  424. undefined -> undefined;
  425. {ok, undefined} -> undefined;
  426. {ok, Value} -> {ok, Value}
  427. end;
  428. try_alternative(os_env, _App, Key, _) ->
  429. case os:getenv(os_env_key(Key)) of
  430. false -> undefined;
  431. Val -> {ok, Val}
  432. end;
  433. try_alternative({os_env, Key}, _, _, _) ->
  434. case os:getenv(Key) of
  435. false -> undefined;
  436. Val -> {ok, Val}
  437. end;
  438. try_alternative(init_arg, _, Key, _) ->
  439. case init:get_argument(Key) of
  440. {ok, [[Value]]} ->
  441. {ok, Value};
  442. error ->
  443. undefined
  444. end;
  445. try_alternative({mnesia,Type,Key,Pos}, _, _, _) ->
  446. case mnesia:activity(Type, fun() -> mnesia:read(Key) end) of
  447. [] -> undefined;
  448. [Found] ->
  449. {ok, element(Pos, Found)}
  450. end.
  451. os_env_key(Key) ->
  452. string:to_upper(atom_to_list(Key)).
  453. lookup_env(Scope, App, Key, P) ->
  454. case ets:lookup(?TAB, {{p, Scope, {gproc_env, App, Key}}, P}) of
  455. [] ->
  456. undefined;
  457. [{_, _, Value}] ->
  458. {ok, Value}
  459. end.
  460. cache_env(Scope, App, Key, Value) ->
  461. reg({p, Scope, {gproc_env, App, Key}}, Value).
  462. update_cached_env(Scope, App, Key, Value) ->
  463. case lookup_env(Scope, App, Key, self()) of
  464. undefined ->
  465. cache_env(Scope, App, Key, Value);
  466. {ok, _} ->
  467. set_value({p, Scope, {gproc_env, App, Key}}, Value)
  468. end.
  469. is_valid_set_strategy([os_env|T], Value) ->
  470. is_string(Value) andalso is_valid_set_strategy(T, Value);
  471. is_valid_set_strategy([{os_env, _}|T], Value) ->
  472. is_string(Value) andalso is_valid_set_strategy(T, Value);
  473. is_valid_set_strategy([app_env|T], Value) ->
  474. is_valid_set_strategy(T, Value);
  475. is_valid_set_strategy([{mnesia,_Type,_Oid,_Pos}|T], Value) ->
  476. is_valid_set_strategy(T, Value);
  477. is_valid_set_strategy([], _) ->
  478. true;
  479. is_valid_set_strategy(_, _) ->
  480. false.
  481. set_strategy([H|T], App, Key, Value) ->
  482. case H of
  483. app_env ->
  484. application:set_env(App, Key, Value);
  485. os_env ->
  486. os:putenv(os_env_key(Key), Value);
  487. {os_env, ENV} ->
  488. os:putenv(ENV, Value);
  489. {mnesia,Type,Oid,Pos} ->
  490. mnesia:activity(
  491. Type,
  492. fun() ->
  493. Rec = case mnesia:read(Oid) of
  494. [] ->
  495. {Tab,K} = Oid,
  496. Tag = mnesia:table_info(Tab, record_name),
  497. Attrs = mnesia:table_info(Tab, attributes),
  498. list_to_tuple(
  499. [Tag,K |
  500. [undefined || _ <- tl(Attrs)]]);
  501. [Old] ->
  502. Old
  503. end,
  504. mnesia:write(setelement(Pos, Rec, Value))
  505. end)
  506. end,
  507. set_strategy(T, App, Key, Value);
  508. set_strategy([], _, _, Value) ->
  509. Value.
  510. is_string(S) ->
  511. try begin _ = iolist_to_binary(S),
  512. true
  513. end
  514. catch
  515. error:_ ->
  516. false
  517. end.
  518. %% @spec reg(Key::key()) -> true
  519. %%
  520. %% @doc
  521. %% @equiv reg(Key, default(Key))
  522. %% @end
  523. reg(Key) ->
  524. reg(Key, default(Key)).
  525. default({T,_,_}) when T==c -> 0;
  526. default(_) -> undefined.
  527. %% @spec await(Key::key()) -> {pid(),Value}
  528. %% @equiv await(Key,infinity)
  529. %%
  530. await(Key) ->
  531. await(Key, infinity).
  532. %% @spec await(Key::key(), Timeout) -> {pid(),Value}
  533. %% Timeout = integer() | infinity
  534. %%
  535. %% @doc Wait for a local name to be registered.
  536. %% The function raises an exception if the timeout expires. Timeout must be
  537. %% either an interger &gt; 0 or 'infinity'.
  538. %% A small optimization: we first perform a lookup, to see if the name
  539. %% is already registered. This way, the cost of the operation will be
  540. %% roughly the same as of where/1 in the case where the name is already
  541. %% registered (the difference: await/2 also returns the value).
  542. %% @end
  543. %%
  544. await({n,g,_} = Key, Timeout) ->
  545. ?CHK_DIST,
  546. request_wait(Key, Timeout);
  547. await({n,l,_} = Key, Timeout) ->
  548. case ets:lookup(?TAB, {Key, n}) of
  549. [{_, Pid, Value}] ->
  550. {Pid, Value};
  551. _ ->
  552. request_wait(Key, Timeout)
  553. end;
  554. await(K, T) ->
  555. erlang:error(badarg, [K, T]).
  556. request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
  557. TRef = case Timeout of
  558. infinity -> no_timer;
  559. T when is_integer(T), T > 0 ->
  560. erlang:start_timer(T, self(), gproc_timeout);
  561. _ ->
  562. erlang:error(badarg, [Key, Timeout])
  563. end,
  564. WRef = case {call({await,Key,self()}, C), C} of
  565. {{R, {Kg,Pg,Vg}}, g} ->
  566. self() ! {gproc, R, registered, {Kg,Pg,Vg}},
  567. R;
  568. {R,_} ->
  569. R
  570. end,
  571. receive
  572. {gproc, WRef, registered, {_K, Pid, V}} ->
  573. _ = case TRef of
  574. no_timer -> ignore;
  575. _ -> erlang:cancel_timer(TRef)
  576. end,
  577. {Pid, V};
  578. {timeout, TRef, gproc_timeout} ->
  579. cancel_wait(Key, WRef),
  580. erlang:error(timeout, [Key, Timeout])
  581. end.
  582. %% @spec nb_wait(Key::key()) -> Ref
  583. %%
  584. %% @doc Wait for a local name to be registered.
  585. %% The caller can expect to receive a message,
  586. %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
  587. %% @end
  588. %%
  589. nb_wait({n,g,_} = Key) ->
  590. ?CHK_DIST,
  591. call({await, Key, self()}, g);
  592. nb_wait({n,l,_} = Key) ->
  593. call({await, Key, self()}, l);
  594. nb_wait(Key) ->
  595. erlang:error(badarg, [Key]).
  596. cancel_wait({_,g,_} = Key, Ref) ->
  597. ?CHK_DIST,
  598. cast({cancel_wait, self(), Key, Ref}, g),
  599. ok;
  600. cancel_wait({_,l,_} = Key, Ref) ->
  601. cast({cancel_wait, self(), Key, Ref}, l),
  602. ok.
  603. %% @spec reg(Key::key(), Value) -> true
  604. %%
  605. %% @doc Register a name or property for the current process
  606. %%
  607. %%
  608. reg({_,g,_} = Key, Value) ->
  609. %% anything global
  610. ?CHK_DIST,
  611. gproc_dist:reg(Key, Value);
  612. reg({p,l,_} = Key, Value) ->
  613. local_reg(Key, Value);
  614. reg({a,l,_} = Key, undefined) ->
  615. call({reg, Key, undefined});
  616. reg({c,l,_} = Key, Value) when is_integer(Value) ->
  617. call({reg, Key, Value});
  618. reg({n,l,_} = Key, Value) ->
  619. call({reg, Key, Value});
  620. reg(_, _) ->
  621. erlang:error(badarg).
  622. %% @spec reg_shared(Key::key()) -> true
  623. %%
  624. %% @doc Register a resource, but don't tie it to a particular process.
  625. %%
  626. %% `reg_shared({c,l,C}) -> reg_shared({c,l,C}, 0).'
  627. %% `reg_shared({a,l,A}) -> reg_shared({a,l,A}, undefined).'
  628. %% @end
  629. reg_shared({c,_,_} = Key) ->
  630. reg_shared(Key, 0);
  631. reg_shared({a,_,_} = Key) ->
  632. reg_shared(Key, undefined).
  633. %% @spec reg_shared(Key::key(), Value) -> true
  634. %%
  635. %% @doc Register a resource, but don't tie it to a particular process.
  636. %%
  637. %% Shared resources are all unique. They remain until explicitly unregistered
  638. %% (using {@link unreg_shared/1}). The types of shared resources currently
  639. %% supported are `counter' and `aggregated counter'. In listings and query
  640. %% results, shared resources appear as other similar resources, except that
  641. %% `Pid == shared'. To wit, update_counter({c,l,myCounter}, 1, shared) would
  642. %% increment the shared counter `myCounter' with 1, provided it exists.
  643. %%
  644. %% A shared aggregated counter will track updates in exactly the same way as
  645. %% an aggregated counter which is owned by a process.
  646. %% @end
  647. %%
  648. reg_shared({_,g,_} = Key, Value) ->
  649. %% anything global
  650. ?CHK_DIST,
  651. gproc_dist:reg_shared(Key, Value);
  652. reg_shared({a,l,_} = Key, undefined) ->
  653. call({reg_shared, Key, undefined});
  654. reg_shared({c,l,_} = Key, Value) when is_integer(Value) ->
  655. call({reg_shared, Key, Value});
  656. reg_shared(_, _) ->
  657. erlang:error(badarg).
  658. %% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
  659. %%
  660. %% @doc Register multiple {Key,Value} pairs of a given type and scope.
  661. %%
  662. %% This function is more efficient than calling {@link reg/2} repeatedly.
  663. %% It is also atomic in regard to unique names; either all names are registered
  664. %% or none are.
  665. %% @end
  666. mreg(T, g, KVL) ->
  667. ?CHK_DIST,
  668. gproc_dist:mreg(T, KVL);
  669. mreg(T, l, KVL) when T==a; T==n ->
  670. if is_list(KVL) ->
  671. call({mreg, T, l, KVL});
  672. true ->
  673. erlang:error(badarg)
  674. end;
  675. mreg(p, l, KVL) ->
  676. local_mreg(p, KVL);
  677. mreg(_, _, _) ->
  678. erlang:error(badarg).
  679. %% @spec munreg(type(), scope(), [Key::any()]) -> true
  680. %%
  681. %% @doc Unregister multiple Key items of a given type and scope.
  682. %%
  683. %% This function is usually more efficient than calling {@link unreg/1}
  684. %% repeatedly.
  685. %% @end
  686. munreg(T, g, L) ->
  687. ?CHK_DIST,
  688. gproc_dist:munreg(T, existing(T,g,L));
  689. munreg(T, l, L) when T==a; T==n ->
  690. if is_list(L) ->
  691. call({munreg, T, l, existing(T,l,L)});
  692. true ->
  693. erlang:error(badarg)
  694. end;
  695. munreg(p, l, L) ->
  696. local_munreg(p, existing(p,l,L));
  697. munreg(_, _, _) ->
  698. erlang:error(badarg).
  699. existing(T,Scope,L) ->
  700. Keys = if T==p; T==c ->
  701. [{{T,Scope,K}, self()} || K <- L];
  702. T==a; T==n ->
  703. [{{T,Scope,K}, T} || K <- L]
  704. end,
  705. _ = [case ets:member(?TAB, K) of
  706. false -> erlang:error(badarg);
  707. true -> true
  708. end || K <- Keys],
  709. L.
  710. %% @spec (Key:: key()) -> true
  711. %%
  712. %% @doc Unregister a name or property.
  713. %% @end
  714. unreg(Key) ->
  715. case Key of
  716. {_, g, _} ->
  717. ?CHK_DIST,
  718. gproc_dist:unreg(Key);
  719. {T, l, _} when T == n;
  720. T == a -> call({unreg, Key});
  721. {_, l, _} ->
  722. case ets:member(?TAB, {Key,self()}) of
  723. true ->
  724. _ = gproc_lib:remove_reg(Key, self()),
  725. true;
  726. false ->
  727. erlang:error(badarg)
  728. end
  729. end.
  730. %% @spec (Key:: key()) -> true
  731. %%
  732. %% @doc Unregister a shared resource.
  733. %% @end
  734. unreg_shared(Key) ->
  735. case Key of
  736. {_, g, _} ->
  737. ?CHK_DIST,
  738. gproc_dist:unreg_shared(Key);
  739. {T, l, _} when T == c;
  740. T == a -> call({unreg_shared, Key});
  741. _ ->
  742. erlang:error(badarg)
  743. end.
  744. %% @equiv unreg/1
  745. unregister_name(Key) ->
  746. unreg(Key).
  747. %% @spec (Continuation ::term()) -> {[Match],Continuation} | '$end_of_table'
  748. %% @doc
  749. %% see http://www.erlang.org/doc/man/ets.html#select-1
  750. %% @end
  751. select({?TAB, _, _, _, _, _, _, _} = Continuation) ->
  752. ets:select(Continuation);
  753. %% @spec (select_pattern()) -> list(sel_object())
  754. %% @doc
  755. %% @equiv select(all, Pat)
  756. %% @end
  757. select(Pat) ->
  758. select(all, Pat).
  759. %% @spec (Context::context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
  760. %%
  761. %% @doc Perform a select operation on the process registry.
  762. %%
  763. %% The physical representation in the registry may differ from the above,
  764. %% but the select patterns are transformed appropriately.
  765. %% @end
  766. select(Context, Pat) ->
  767. ets:select(?TAB, pattern(Pat, Context)).
  768. %% @spec (Context::context(), Pat::sel_patten(), Limit::integer()) ->
  769. %% {[Match],Continuation} | '$end_of_table'
  770. %% @doc Like {@link select/2} but returns Limit objects at a time.
  771. %%
  772. %% See [http://www.erlang.org/doc/man/ets.html#select-3].
  773. %% @end
  774. select(Context, Pat, Limit) ->
  775. ets:select(?TAB, pattern(Pat, Context), Limit).
  776. %% @spec (select_pattern()) -> list(sel_object())
  777. %% @doc
  778. %% @equiv select_count(all, Pat)
  779. %% @end
  780. select_count(Pat) ->
  781. select_count(all, Pat).
  782. %% @spec (Context::context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
  783. %%
  784. %% @doc Perform a select_count operation on the process registry.
  785. %%
  786. %% The physical representation in the registry may differ from the above,
  787. %% but the select patterns are transformed appropriately.
  788. %% @end
  789. select_count(Context, Pat) ->
  790. ets: select_count(?TAB, pattern(Pat, Context)).
  791. %%% Local properties can be registered in the local process, since
  792. %%% no other process can interfere.
  793. %%%
  794. local_reg(Key, Value) ->
  795. case gproc_lib:insert_reg(Key, Value, self(), l) of
  796. false -> erlang:error(badarg);
  797. true -> monitor_me()
  798. end.
  799. local_mreg(_, []) -> true;
  800. local_mreg(T, [_|_] = KVL) ->
  801. case gproc_lib:insert_many(T, l, KVL, self()) of
  802. false -> erlang:error(badarg);
  803. {true,_} -> monitor_me()
  804. end.
  805. local_munreg(T, L) when T==p; T==c ->
  806. _ = [gproc_lib:remove_reg({T,l,K}, self()) || K <- L],
  807. true.
  808. %% @spec (Key :: key(), Value) -> true
  809. %% @doc Sets the value of the registeration entry given by Key
  810. %%
  811. %% Key is assumed to exist and belong to the calling process.
  812. %% If it doesn't, this function will exit.
  813. %%
  814. %% Value can be any term, unless the object is a counter, in which case
  815. %% it must be an integer.
  816. %% @end
  817. %%
  818. set_value({_,g,_} = Key, Value) ->
  819. ?CHK_DIST,
  820. gproc_dist:set_value(Key, Value);
  821. set_value({a,l,_} = Key, Value) when is_integer(Value) ->
  822. call({set, Key, Value});
  823. set_value({n,l,_} = Key, Value) ->
  824. %% we cannot do this locally, since we have to check that the object
  825. %% exists first - not an atomic update.
  826. call({set, Key, Value});
  827. set_value({p,l,_} = Key, Value) ->
  828. %% we _can_ to this locally, since there is no race condition - no
  829. %% other process can update our properties.
  830. case gproc_lib:do_set_value(Key, Value, self()) of
  831. true -> true;
  832. false ->
  833. erlang:error(badarg)
  834. end;
  835. set_value({c,l,_} = Key, Value) when is_integer(Value) ->
  836. gproc_lib:do_set_counter_value(Key, Value, self());
  837. set_value(_, _) ->
  838. erlang:error(badarg).
  839. %% @spec (Key) -> Value
  840. %% @doc Reads the value stored with a key registered to the current process.
  841. %%
  842. %% If no such key is registered to the current process, this function exits.
  843. %% @end
  844. get_value(Key) ->
  845. get_value(Key, self()).
  846. %% @spec (Key, Pid) -> Value
  847. %% @doc Reads the value stored with a key registered to the process Pid.
  848. %%
  849. %% If `Pid == shared', the value of a shared key (see {@link reg_shared/1})
  850. %% will be read.
  851. %% @end
  852. %%
  853. get_value({T,_,_} = Key, Pid) when is_pid(Pid) ->
  854. if T==n orelse T==a ->
  855. case ets:lookup(?TAB, {Key, T}) of
  856. [{_, P, Value}] when P == Pid -> Value;
  857. _ -> erlang:error(badarg)
  858. end;
  859. true ->
  860. ets:lookup_element(?TAB, {Key, Pid}, 3)
  861. end;
  862. get_value({T,_,_} = K, shared) when T==c; T==a ->
  863. Key = case T of
  864. c -> {K, shared};
  865. a -> {K, a}
  866. end,
  867. case ets:lookup(?TAB, Key) of
  868. [{_, shared, Value}] -> Value;
  869. _ -> erlang:error(badarg)
  870. end;
  871. get_value(_, _) ->
  872. erlang:error(badarg).
  873. %% @spec (Key) -> Pid
  874. %% @doc Lookup the Pid stored with a key.
  875. %%
  876. lookup_pid({_T,_,_} = Key) ->
  877. case where(Key) of
  878. undefined -> erlang:error(badarg);
  879. P -> P
  880. end.
  881. %% @spec (Key) -> Value
  882. %% @doc Lookup the value stored with a key.
  883. %%
  884. lookup_value({T,_,_} = Key) ->
  885. if T==n orelse T==a ->
  886. ets:lookup_element(?TAB, {Key,T}, 3);
  887. true ->
  888. erlang:error(badarg)
  889. end.
  890. %% @spec (Key::key()) -> pid()
  891. %%
  892. %% @doc Returns the pid registered as Key
  893. %%
  894. %% The type of registration entry must be either name or aggregated counter.
  895. %% Otherwise this function will exit. Use {@link lookup_pids/1} in these
  896. %% cases.
  897. %% @end
  898. %%
  899. where({T,_,_}=Key) ->
  900. if T==n orelse T==a ->
  901. case ets:lookup(?TAB, {Key,T}) of
  902. [{_, P, _Value}] ->
  903. case my_is_process_alive(P) of
  904. true -> P;
  905. false ->
  906. undefined
  907. end;
  908. _ -> % may be [] or [{Key,Waiters}]
  909. undefined
  910. end;
  911. true ->
  912. erlang:error(badarg)
  913. end.
  914. %% @equiv where/1
  915. whereis_name(Key) ->
  916. where(Key).
  917. %% @spec (Key::key()) -> [pid()]
  918. %%
  919. %% @doc Returns a list of pids with the published key Key
  920. %%
  921. %% If the type of registration entry is either name or aggregated counter,
  922. %% this function will return either an empty list, or a list of one pid.
  923. %% For non-unique types, the return value can be a list of any length.
  924. %% @end
  925. %%
  926. lookup_pids({T,_,_} = Key) ->
  927. L = if T==n orelse T==a ->
  928. ets:select(?TAB, [{{{Key,T}, '$1', '_'},[],['$1']}]);
  929. true ->
  930. ets:select(?TAB, [{{{Key,'_'}, '$1', '_'},[],['$1']}])
  931. end,
  932. [P || P <- L, my_is_process_alive(P)].
  933. %% @spec (pid()) -> boolean()
  934. %%
  935. my_is_process_alive(P) when node(P) =:= node() ->
  936. is_process_alive(P);
  937. my_is_process_alive(_) ->
  938. %% remote pid - assume true (too costly to find out)
  939. true.
  940. %% @spec (Key::key()) -> [{pid(), Value}]
  941. %%
  942. %% @doc Retrieve the `{Pid,Value}' pairs corresponding to Key.
  943. %%
  944. %% Key refer to any type of registry object. If it refers to a unique
  945. %% object, the list will be of length 0 or 1. If it refers to a non-unique
  946. %% object, the return value can be a list of any length.
  947. %% @end
  948. %%
  949. lookup_values({T,_,_} = Key) ->
  950. L = if T==n orelse T==a ->
  951. ets:select(?TAB, [{{{Key,T}, '$1', '$2'},[],[{{'$1','$2'}}]}]);
  952. true ->
  953. ets:select(?TAB, [{{{Key,'_'}, '$1', '$2'},[],[{{'$1','$2'}}]}])
  954. end,
  955. [Pair || {P,_} = Pair <- L, my_is_process_alive(P)].
  956. %% @spec (Key::key(), Incr::integer()) -> integer()
  957. %%
  958. %% @doc Updates the counter registered as Key for the current process.
  959. %%
  960. %% This function works like ets:update_counter/3
  961. %% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
  962. %% will fail if the type of object referred to by Key is not a counter.
  963. %% @end
  964. %%
  965. update_counter({c,l,_} = Key, Incr) when is_integer(Incr) ->
  966. gproc_lib:update_counter(Key, Incr, self());
  967. update_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
  968. ?CHK_DIST,
  969. gproc_dist:update_counter(Key, Incr);
  970. update_counter(_, _) ->
  971. erlang:error(badarg).
  972. %% @spec (Key) -> {ValueBefore, ValueAfter}
  973. %% Key = {c, Scope, Name}
  974. %% Scope = l | g
  975. %% ValueBefore = integer()
  976. %% ValueAfter = integer()
  977. %%
  978. %% @doc Reads and resets a counter in a "thread-safe" way
  979. %%
  980. %% This function reads the current value of a counter and then resets it to its
  981. %% initial value. The reset operation is done using {@link update_counter/2},
  982. %% which allows for concurrent calls to {@link update_counter/2} without losing
  983. %% updates. Aggregated counters are updated accordingly.
  984. %% @end
  985. %%
  986. reset_counter({c,g,_} = Key) ->
  987. ?CHK_DIST,
  988. gproc_dist:reset_counter(Key);
  989. reset_counter({c,l,_} = Key) ->
  990. Current = ets:lookup_element(?TAB, {Key, self()}, 3),
  991. Initial = case ets:lookup(?TAB, {self(), Key}) of
  992. [{_, r}] -> 0;
  993. [{_, Opts}] ->
  994. proplists:get_value(initial, Opts, 0)
  995. end,
  996. {Current, update_counter(Key, Initial - Current)}.
  997. update_shared_counter({c,g,_} = Key, Incr) ->
  998. ?CHK_DIST,
  999. gproc_dist:update_shared_counter(Key, Incr);
  1000. update_shared_counter({c,l,_} = Key, Incr) ->
  1001. gproc_lib:update_counter(Key, Incr, shared).
  1002. %% @spec (From::key(), To::pid() | key()) -> undefined | pid()
  1003. %%
  1004. %% @doc Atomically transfers the key `From' to the process identified by `To'.
  1005. %%
  1006. %% This function transfers any gproc key (name, property, counter, aggr counter)
  1007. %% from one process to another, and returns the pid of the new owner.
  1008. %%
  1009. %% `To' must be either a pid or a unique name (name or aggregated counter), but
  1010. %% does not necessarily have to resolve to an existing process. If there is
  1011. %% no process registered with the `To' key, `give_away/2' returns `undefined',
  1012. %% and the `From' key is effectively unregistered.
  1013. %%
  1014. %% It is allowed to give away a key to oneself, but of course, this operation
  1015. %% will have no effect.
  1016. %%
  1017. %% Fails with `badarg' if the calling process does not have a `From' key
  1018. %% registered.
  1019. %% @end
  1020. give_away({_,l,_} = Key, ToPid) when is_pid(ToPid), node(ToPid) == node() ->
  1021. call({give_away, Key, ToPid});
  1022. give_away({_,l,_} = Key, {n,l,_} = ToKey) ->
  1023. call({give_away, Key, ToKey});
  1024. give_away({_,g,_} = Key, To) ->
  1025. ?CHK_DIST,
  1026. gproc_dist:give_away(Key, To).
  1027. %% @spec () -> ok
  1028. %%
  1029. %% @doc Unregister all items of the calling process and inform gproc
  1030. %% to forget about the calling process.
  1031. %%
  1032. %% This function is more efficient than letting gproc perform these
  1033. %% cleanup operations.
  1034. %% @end
  1035. goodbye() ->
  1036. process_is_down(self()).
  1037. %% @spec (Key::key(), Msg::any()) -> Msg
  1038. %%
  1039. %% @doc Sends a message to the process, or processes, corresponding to Key.
  1040. %%
  1041. %% If Key belongs to a unique object (name or aggregated counter), this
  1042. %% function will send a message to the corresponding process, or fail if there
  1043. %% is no such process. If Key is for a non-unique object type (counter or
  1044. %% property), Msg will be send to all processes that have such an object.
  1045. %% @end
  1046. %%
  1047. send({T,C,_} = Key, Msg) when C==l; C==g ->
  1048. if T == n orelse T == a ->
  1049. case ets:lookup(?TAB, {Key, T}) of
  1050. [{_, Pid, _}] ->
  1051. Pid ! Msg;
  1052. _ ->
  1053. erlang:error(badarg)
  1054. end;
  1055. T==p orelse T==c ->
  1056. %% BUG - if the key part contains select wildcards, we may end up
  1057. %% sending multiple messages to the same pid
  1058. lists:foreach(fun(Pid) ->
  1059. Pid ! Msg
  1060. end, lookup_pids(Key)),
  1061. Msg;
  1062. true ->
  1063. erlang:error(badarg)
  1064. end;
  1065. send(_, _) ->
  1066. erlang:error(badarg).
  1067. %% @spec (Context :: context()) -> key() | '$end_of_table'
  1068. %%
  1069. %% @doc Behaves as ets:first(Tab) for a given type of registration object.
  1070. %%
  1071. %% See [http://www.erlang.org/doc/man/ets.html#first-1].
  1072. %% The registry behaves as an ordered_set table.
  1073. %% @end
  1074. %%
  1075. first(Context) ->
  1076. {S, T} = get_s_t(Context),
  1077. {HeadPat,_} = headpat({S, T}, '_', '_', '_'),
  1078. case ets:select(?TAB, [{HeadPat,[],[{element,1,'$_'}]}], 1) of
  1079. {[First], _} ->
  1080. First;
  1081. _ ->
  1082. '$end_of_table'
  1083. end.
  1084. %% @spec (Context :: context()) -> key() | '$end_of_table'
  1085. %%
  1086. %% @doc Behaves as ets:last(Tab) for a given type of registration object.
  1087. %%
  1088. %% See [http://www.erlang.org/doc/man/ets.html#last-1].
  1089. %% The registry behaves as an ordered_set table.
  1090. %% @end
  1091. %%
  1092. last(Context) ->
  1093. {S, T} = get_s_t(Context),
  1094. S1 = if S == '_'; S == l -> m; % 'm' comes after 'l'
  1095. S == g -> h % 'h' comes between 'g' & 'l'
  1096. end,
  1097. Beyond = {{T,S1,[]},[]},
  1098. step(ets:prev(?TAB, Beyond), S, T).
  1099. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  1100. %%
  1101. %% @doc Behaves as ets:next(Tab,Key) for a given type of registration object.
  1102. %%
  1103. %% See [http://www.erlang.org/doc/man/ets.html#next-2].
  1104. %% The registry behaves as an ordered_set table.
  1105. %% @end
  1106. %%
  1107. next(Context, K) ->
  1108. {S,T} = get_s_t(Context),
  1109. step(ets:next(?TAB,K), S, T).
  1110. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  1111. %%
  1112. %% @doc Behaves as ets:prev(Tab,Key) for a given type of registration object.
  1113. %%
  1114. %% See [http://www.erlang.org/doc/man/ets.html#prev-2].
  1115. %% The registry behaves as an ordered_set table.
  1116. %% @end
  1117. %%
  1118. prev(Context, K) ->
  1119. {S, T} = get_s_t(Context),
  1120. step(ets:prev(?TAB, K), S, T).
  1121. step(Key, '_', '_') ->
  1122. case Key of
  1123. {{_,_,_},_} -> Key;
  1124. _ -> '$end_of_table'
  1125. end;
  1126. step(Key, '_', T) ->
  1127. case Key of
  1128. {{T,_,_},_} -> Key;
  1129. _ -> '$end_of_table'
  1130. end;
  1131. step(Key, S, '_') ->
  1132. case Key of
  1133. {{_, S, _}, _} -> Key;
  1134. _ -> '$end_of_table'
  1135. end;
  1136. step(Key, S, T) ->
  1137. case Key of
  1138. {{T, S, _}, _} -> Key;
  1139. _ -> '$end_of_table'
  1140. end.
  1141. %% @spec (Pid::pid()) -> ProcessInfo
  1142. %% ProcessInfo = [{gproc, [{Key,Value}]} | ProcessInfo]
  1143. %%
  1144. %% @doc Similar to `process_info(Pid)' but with additional gproc info.
  1145. %%
  1146. %% Returns the same information as process_info(Pid), but with the
  1147. %% addition of a `gproc' information item, containing the `{Key,Value}'
  1148. %% pairs registered to the process.
  1149. %% @end
  1150. info(Pid) when is_pid(Pid) ->
  1151. Items = [?MODULE | [ I || {I,_} <- process_info(self())]],
  1152. [info(Pid,I) || I <- Items].
  1153. %% @spec (Pid::pid(), Item::atom()) -> {Item, Info}
  1154. %%
  1155. %% @doc Similar to process_info(Pid, Item), but with additional gproc info.
  1156. %%
  1157. %% For `Item = gproc', this function returns a list of `{Key, Value}' pairs
  1158. %% registered to the process Pid. For other values of Item, it returns the
  1159. %% same as [http://www.erlang.org/doc/man/erlang.html#process_info-2].
  1160. %% @end
  1161. info(Pid, ?MODULE) ->
  1162. Keys = ets:select(?TAB, [{ {{Pid,'$1'}, '_'}, [], ['$1'] }]),
  1163. {?MODULE, lists:zf(
  1164. fun(K) ->
  1165. try V = get_value(K, Pid),
  1166. {true, {K,V}}
  1167. catch
  1168. error:_ ->
  1169. false
  1170. end
  1171. end, Keys)};
  1172. info(Pid, I) ->
  1173. process_info(Pid, I).
  1174. %% @spec () -> ok
  1175. %%
  1176. %% @doc Similar to the built-in shell command `i()' but inserts information
  1177. %% about names and properties registered in Gproc, where applicable.
  1178. %% @end
  1179. i() ->
  1180. gproc_info:i().
  1181. %%% ==========================================================
  1182. %% @hidden
  1183. handle_cast({monitor_me, Pid}, S) ->
  1184. erlang:monitor(process, Pid),
  1185. {noreply, S};
  1186. handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
  1187. Rev = {Pid,Key},
  1188. case ets:lookup(?TAB, {Key,T}) of
  1189. [{K, Waiters}] ->
  1190. case Waiters -- [{Pid,Ref}] of
  1191. [] ->
  1192. ets:delete(?TAB, K),
  1193. ets:delete(?TAB, Rev);
  1194. NewWaiters ->
  1195. ets:insert(?TAB, {K, NewWaiters}),
  1196. case lists:keymember(Pid, 1, NewWaiters) of
  1197. true ->
  1198. %% should be extremely unlikely
  1199. ok;
  1200. false ->
  1201. %% delete the reverse entry
  1202. ets:delete(?TAB, Rev)
  1203. end
  1204. end;
  1205. _ ->
  1206. ignore
  1207. end,
  1208. {noreply, S}.
  1209. %% @hidden
  1210. handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
  1211. case try_insert_reg(Key, Val, Pid) of
  1212. true ->
  1213. _ = gproc_lib:ensure_monitor(Pid,l),
  1214. {reply, true, S};
  1215. false ->
  1216. {reply, badarg, S}
  1217. end;
  1218. handle_call({reg_shared, {_T,l,_} = Key, Val}, _From, S) ->
  1219. case try_insert_reg(Key, Val, shared) of
  1220. %% case try_insert_shared(Key, Val) of
  1221. true ->
  1222. {reply, true, S};
  1223. false ->
  1224. {reply, badarg, S}
  1225. end;
  1226. handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
  1227. case ets:member(?TAB, {Pid,Key}) of
  1228. true ->
  1229. _ = gproc_lib:remove_reg(Key, Pid),
  1230. {reply, true, S};
  1231. false ->
  1232. {reply, badarg, S}
  1233. end;
  1234. handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
  1235. _ = gproc_lib:remove_reg(Key, shared),
  1236. {reply, true, S};
  1237. handle_call({await, {_,l,_} = Key, Pid}, From, S) ->
  1238. %% Passing the pid explicitly is needed when leader_call is used,
  1239. %% since the Pid given as From in the leader is the local gen_leader
  1240. %% instance on the calling node.
  1241. case gproc_lib:await(Key, Pid, From) of
  1242. noreply ->
  1243. {noreply, S};
  1244. {reply, Reply, _} ->
  1245. {reply, Reply, S}
  1246. end;
  1247. handle_call({mreg, T, l, L}, {Pid,_}, S) ->
  1248. try gproc_lib:insert_many(T, l, L, Pid) of
  1249. {true,_} -> {reply, true, S};
  1250. false -> {reply, badarg, S}
  1251. catch
  1252. error:_ -> {reply, badarg, S}
  1253. end;
  1254. handle_call({munreg, T, l, L}, {Pid,_}, S) ->
  1255. _ = gproc_lib:remove_many(T, l, L, Pid),
  1256. {reply, true, S};
  1257. handle_call({set, {_,l,_} = Key, Value}, {Pid,_}, S) ->
  1258. case gproc_lib:do_set_value(Key, Value, Pid) of
  1259. true ->
  1260. {reply, true, S};
  1261. false ->
  1262. {reply, badarg, S}
  1263. end;
  1264. handle_call({audit_process, Pid}, _, S) ->
  1265. case is_process_alive(Pid) of
  1266. false ->
  1267. process_is_down(Pid);
  1268. true ->
  1269. ignore
  1270. end,
  1271. {reply, ok, S};
  1272. handle_call({give_away, Key, To}, {Pid,_}, S) ->
  1273. Reply = do_give_away(Key, To, Pid),
  1274. {reply, Reply, S};
  1275. handle_call(_, _, S) ->
  1276. {reply, badarg, S}.
  1277. %% @hidden
  1278. handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
  1279. process_is_down(Pid),
  1280. {noreply, S};
  1281. handle_info(_, S) ->
  1282. {noreply, S}.
  1283. %% @hidden
  1284. code_change(_FromVsn, S, _Extra) ->
  1285. %% We have changed local monitor markers from {Pid} to {Pid,l}.
  1286. _ = case ets:select(?TAB, [{{'$1'},[],['$1']}]) of
  1287. [] ->
  1288. ok;
  1289. Pids ->
  1290. ets:insert(?TAB, [{P,l} || P <- Pids]),
  1291. ets:select_delete(?TAB, [{{'_'},[],[true]}])
  1292. end,
  1293. {ok, S}.
  1294. %% @hidden
  1295. terminate(_Reason, _S) ->
  1296. ok.
  1297. call(Req) ->
  1298. call(Req, l).
  1299. call(Req, l) ->
  1300. chk_reply(gen_server:call(?MODULE, Req), Req);
  1301. call(Req, g) ->
  1302. chk_reply(gproc_dist:leader_call(Req), Req).
  1303. chk_reply(Reply, Req) ->
  1304. case Reply of
  1305. badarg -> erlang:error(badarg, Req);
  1306. _ -> Reply
  1307. end.
  1308. cast(Msg) ->
  1309. cast(Msg, l).
  1310. cast(Msg, l) ->
  1311. gen_server:cast(?MODULE, Msg);
  1312. cast(Msg, g) ->
  1313. gproc_dist:leader_cast(Msg).
  1314. try_insert_reg({T,l,_} = Key, Val, Pid) ->
  1315. case gproc_lib:insert_reg(Key, Val, Pid, l) of
  1316. false ->
  1317. case ets:lookup(?TAB, {Key,T}) of
  1318. %% In this particular case, the lookup cannot result in
  1319. %% [{_, Waiters}], since the insert_reg/4 function would
  1320. %% have succeeded then.
  1321. [{_, OtherPid, _}] ->
  1322. case is_process_alive(OtherPid) of
  1323. true ->
  1324. false;
  1325. false ->
  1326. process_is_down(OtherPid),
  1327. true = gproc_lib:insert_reg(Key, Val, Pid, l)
  1328. end;
  1329. [] ->
  1330. false
  1331. end;
  1332. true ->
  1333. true
  1334. end.
  1335. %% try_insert_shared({c,l,_} = Key, Val) ->
  1336. %% ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
  1337. %% try_insert_shared({a,l,_} = Key, Val) ->
  1338. %% ets:insert_new(?TAB, [{{Key, a}, shared, Val}, {{shared, Key}, []}]).
  1339. -spec audit_process(pid()) -> ok.
  1340. audit_process(Pid) when is_pid(Pid) ->
  1341. ok = gen_server:call(gproc, {audit_process, Pid}, infinity).
  1342. -spec process_is_down(pid()) -> ok.
  1343. process_is_down(Pid) when is_pid(Pid) ->
  1344. %% delete the monitor marker
  1345. %% io:fwrite(user, "process_is_down(~p) - ~p~n", [Pid,ets:tab2list(?TAB)]),
  1346. Marker = {Pid,l},
  1347. case ets:member(?TAB, Marker) of
  1348. false ->
  1349. ok;
  1350. true ->
  1351. Revs = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
  1352. [{'==',{element,2,'$1'},l}], ['$1']}]),
  1353. lists:foreach(
  1354. fun({n,l,_}=K) ->
  1355. Key = {K,n},
  1356. case ets:lookup(?TAB, Key) of
  1357. [{_, Pid, _}] ->
  1358. ets:delete(?TAB, Key);
  1359. [{_, Waiters}] ->
  1360. case [W || {P,_} = W <- Waiters,
  1361. P =/= Pid] of
  1362. [] ->
  1363. ets:delete(?TAB, Key);
  1364. Waiters1 ->
  1365. ets:insert(?TAB, {Key, Waiters1})
  1366. end;
  1367. [] ->
  1368. true
  1369. end;
  1370. ({c,l,C} = K) ->
  1371. Key = {K, Pid},
  1372. [{_, _, Value}] = ets:lookup(?TAB, Key),
  1373. ets:delete(?TAB, Key),
  1374. gproc_lib:update_aggr_counter(l, C, -Value);
  1375. ({a,l,_} = K) ->
  1376. ets:delete(?TAB, {K,a});
  1377. ({p,_,_} = K) ->
  1378. ets:delete(?TAB, {K, Pid})
  1379. end, Revs),
  1380. ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
  1381. ets:delete(?TAB, Marker),
  1382. ok
  1383. end.
  1384. do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
  1385. Key = {K, T},
  1386. case ets:lookup(?TAB, Key) of
  1387. [{_, Pid, Value}] ->
  1388. %% Pid owns the reg; allowed to give_away
  1389. case pid_to_give_away_to(To) of
  1390. Pid ->
  1391. %% Give away to ourselves? Why not? We'll allow it,
  1392. %% but nothing needs to be done.
  1393. Pid;
  1394. ToPid when is_pid(ToPid) ->
  1395. ets:insert(?TAB, [{Key, ToPid, Value},
  1396. {{ToPid, K}, []}]),
  1397. ets:delete(?TAB, {Pid, K}),
  1398. _ = gproc_lib:ensure_monitor(ToPid, l),
  1399. ToPid;
  1400. undefined ->
  1401. _ = gproc_lib:remove_reg(K, Pid),
  1402. undefined
  1403. end;
  1404. _ ->
  1405. badarg
  1406. end;
  1407. do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
  1408. Key = {K, Pid},
  1409. case ets:lookup(?TAB, Key) of
  1410. [{_, Pid, Value}] ->
  1411. case pid_to_give_away_to(To) of
  1412. ToPid when is_pid(ToPid) ->
  1413. ToKey = {K, ToPid},
  1414. case ets:member(?TAB, ToKey) of
  1415. true ->
  1416. badarg;
  1417. false ->
  1418. ets:insert(?TAB, [{ToKey, ToPid, Value},
  1419. {{ToPid, K}, []}]),
  1420. ets:delete(?TAB, {Pid, K}),
  1421. ets:delete(?TAB, Key),
  1422. _ = gproc_lib:ensure_monitor(ToPid, l),
  1423. ToPid
  1424. end;
  1425. undefined ->
  1426. _ = gproc_lib:remove_reg(K, Pid),
  1427. undefined
  1428. end;
  1429. _ ->
  1430. badarg
  1431. end.
  1432. pid_to_give_away_to(P) when is_pid(P), node(P) == node() ->
  1433. P;
  1434. pid_to_give_away_to({T,l,_} = Key) when T==n; T==a ->
  1435. case ets:lookup(?TAB, {Key, T}) of
  1436. [{_, Pid, _}] ->
  1437. Pid;
  1438. _ ->
  1439. undefined
  1440. end.
  1441. create_tabs() ->
  1442. Opts = gproc_lib:valid_opts(ets_options, [{write_concurrency,true},
  1443. {read_concurrency, true}]),
  1444. case ets:info(?TAB, name) of
  1445. undefined ->
  1446. ets:new(?TAB, [ordered_set, public, named_table | Opts]);
  1447. _ ->
  1448. ok
  1449. end.
  1450. %% @hidden
  1451. init([]) ->
  1452. set_monitors(),
  1453. {ok, #state{}}.
  1454. set_monitors() ->
  1455. set_monitors(ets:select(?TAB, [{{{'$1',l}},[],['$1']}], 100)).
  1456. set_monitors('$end_of_table') ->
  1457. ok;
  1458. set_monitors({Pids, Cont}) ->
  1459. _ = [erlang:monitor(process,Pid) || Pid <- Pids],
  1460. set_monitors(ets:select(Cont)).
  1461. monitor_me() ->
  1462. case ets:insert_new(?TAB, {{self(),l}}) of
  1463. false -> true;
  1464. true ->
  1465. cast({monitor_me,self()}),
  1466. true
  1467. end.
  1468. pattern([{'_', Gs, As}], T) ->
  1469. ?l,
  1470. {HeadPat, Vs} = headpat(T, '$1', '$2', '$3'),
  1471. [{HeadPat, rewrite(Gs,Vs), rewrite(As,Vs)}];
  1472. pattern([{{A,B,C},Gs,As}], Scope) ->
  1473. ?l,
  1474. {HeadPat, Vars} = headpat(Scope, A,B,C),
  1475. [{HeadPat, rewrite(Gs,Vars), rewrite(As,Vars)}];
  1476. pattern([{Head, Gs, As}], Scope) ->
  1477. ?l,
  1478. {S, T} = get_s_t(Scope),
  1479. case is_var(Head) of
  1480. {true,_N} ->
  1481. HeadPat = {{{T,S,'_'},'_'},'_','_'},
  1482. Vs = [{Head, obj_prod()}],
  1483. %% the headpat function should somehow verify that Head is
  1484. %% consistent with Scope (or should we add a guard?)
  1485. [{HeadPat, rewrite(Gs, Vs), rewrite(As, Vs)}];
  1486. false ->
  1487. erlang:error(badarg)
  1488. end.
  1489. %% This is the expression to use in guards and the RHS to address the whole
  1490. %% object, in its logical representation.
  1491. obj_prod() ->
  1492. {{ {element,1,{element,1,'$_'}},
  1493. {element,2,'$_'},
  1494. {element,3,'$_'} }}.
  1495. obj_prod_l() ->
  1496. [ {element,1,{element,1,'$_'}},
  1497. {element,2,'$_'},
  1498. {element,3,'$_'} ].
  1499. headpat({S, T}, V1,V2,V3) ->
  1500. headpat(type(T), scope(S), V1,V2,V3);
  1501. headpat(T, V1, V2, V3) when is_atom(T) ->
  1502. headpat(type(T), l, V1, V2, V3);
  1503. headpat(_, _, _, _) -> erlang:error(badarg).
  1504. headpat(T, C, V1,V2,V3) ->
  1505. Rf = fun(Pos) ->
  1506. {element,Pos,{element,1,{element,1,'$_'}}}
  1507. end,
  1508. K2 = if T==n orelse T==a -> T;
  1509. true -> '_'
  1510. end,
  1511. {Kp,Vars} = case V1 of
  1512. {Vt,Vc,Vn} ->
  1513. ?l,
  1514. {T1,Vs1} = subst(T,Vt,fun() -> Rf(1) end,[]),
  1515. {C1,Vs2} = subst(C,Vc,fun() -> Rf(2) end,Vs1),
  1516. {{T1,C1,Vn}, Vs2};
  1517. '_' ->
  1518. ?l,
  1519. {{T,C,'_'}, []};
  1520. _ ->
  1521. ?l,
  1522. case is_var(V1) of
  1523. {true,_} ->
  1524. {{T,C,V1}, [{V1, {element,1,
  1525. {element,1,'$_'}}}]};
  1526. false ->
  1527. erlang:error(badarg)
  1528. end
  1529. end,
  1530. {{{Kp,K2},V2,V3}, Vars}.
  1531. %% l(L) -> L.
  1532. subst(X, '_', _F, Vs) ->
  1533. {X, Vs};
  1534. subst(X, V, F, Vs) ->
  1535. case is_var(V) of
  1536. {true,_} ->
  1537. {X, [{V,F()}|Vs]};
  1538. false ->
  1539. {V, Vs}
  1540. end.
  1541. scope('_') -> '_';
  1542. scope(all) -> '_';
  1543. scope(global) -> g;
  1544. scope(local) -> l;
  1545. scope(S) when S==l; S==g -> S.
  1546. type('_') -> '_';
  1547. type(all) -> '_';
  1548. type(T) when T==n; T==p; T==c; T==a -> T;
  1549. type(names) -> n;
  1550. type(props) -> p;
  1551. type(counters) -> c;
  1552. type(aggr_counters) -> a.
  1553. rev_keypat(Context) ->
  1554. {S,T} = get_s_t(Context),
  1555. {T,S,'_'}.
  1556. get_s_t({S,T}) -> {scope(S), type(T)};
  1557. get_s_t(T) when is_atom(T) ->
  1558. {scope(all), type(T)}.
  1559. is_var('$1') -> {true,1};
  1560. is_var('$2') -> {true,2};
  1561. is_var('$3') -> {true,3};
  1562. is_var('$4') -> {true,4};
  1563. is_var('$5') -> {true,5};
  1564. is_var('$6') -> {true,6};
  1565. is_var('$7') -> {true,7};
  1566. is_var('$8') -> {true,8};
  1567. is_var('$9') -> {true,9};
  1568. is_var(X) when is_atom(X) ->
  1569. case atom_to_list(X) of
  1570. "\$" ++ Tl ->
  1571. try N = list_to_integer(Tl),
  1572. {true,N}
  1573. catch
  1574. error:_ ->
  1575. false
  1576. end;
  1577. _ ->
  1578. false
  1579. end;
  1580. is_var(_) -> false.
  1581. rewrite(Gs, R) ->
  1582. [rewrite1(G, R) || G <- Gs].
  1583. rewrite1('$_', _) ->
  1584. obj_prod();
  1585. rewrite1('$$', _) ->
  1586. obj_prod_l();
  1587. rewrite1(Guard, R) when is_tuple(Guard) ->
  1588. list_to_tuple([rewrite1(G, R) || G <- tuple_to_list(Guard)]);
  1589. rewrite1(Exprs, R) when is_list(Exprs) ->
  1590. [rewrite1(E, R) || E <- Exprs];
  1591. rewrite1(V, R) when is_atom(V) ->
  1592. case is_var(V) of
  1593. {true,_N} ->
  1594. case lists:keysearch(V, 1, R) of
  1595. {value, {_, V1}} ->
  1596. V1;
  1597. false ->
  1598. V
  1599. end;
  1600. false ->
  1601. V
  1602. end;
  1603. rewrite1(Expr, _) ->
  1604. Expr.
  1605. %% @spec () -> any()
  1606. %%
  1607. %% @doc
  1608. %% @equiv table({all, all})
  1609. %% @end
  1610. table() ->
  1611. table({all, all}).
  1612. %% @spec (Context::context()) -> any()
  1613. %%
  1614. %% @doc
  1615. %% @equiv table(Context, [])
  1616. %% @end
  1617. %%
  1618. table(Context) ->
  1619. table(Context, []).
  1620. %% @spec (Context::context(), Opts) -> any()
  1621. %%
  1622. %% @doc QLC table generator for the gproc registry.
  1623. %% Context specifies which subset of the registry should be queried.
  1624. %% See [http://www.erlang.org/doc/man/qlc.html].
  1625. %% @end
  1626. table(Context, Opts) ->
  1627. Ctxt = {_, Type} = get_s_t(Context),
  1628. [Traverse, NObjs] = [proplists:get_value(K,Opts,Def) ||
  1629. {K,Def} <- [{traverse,select}, {n_objects,100}]],
  1630. TF = case Traverse of
  1631. first_next ->
  1632. fun() -> qlc_next(Ctxt, first(Ctxt)) end;
  1633. last_prev -> fun() -> qlc_prev(Ctxt, last(Ctxt)) end;
  1634. select ->
  1635. fun(MS) -> qlc_select(select(Ctxt, MS, NObjs)) end;
  1636. {select,MS} ->
  1637. fun() -> qlc_select(select(Ctxt, MS, NObjs)) end;
  1638. _ ->
  1639. erlang:error(badarg, [Ctxt,Opts])
  1640. end,
  1641. InfoFun = fun(indices) -> [2];
  1642. (is_unique_objects) -> is_unique(Type);
  1643. (keypos) -> 1;
  1644. (is_sorted_key) -> true;
  1645. (num_of_objects) ->
  1646. %% this is just a guesstimate.
  1647. trunc(ets:info(?TAB,size) / 2.5)
  1648. end,
  1649. LookupFun =
  1650. case Traverse of
  1651. {select, _MS} -> undefined;
  1652. _ -> fun(Pos, Ks) -> qlc_lookup(Ctxt, Pos, Ks) end
  1653. end,
  1654. qlc:table(TF, [{info_fun, InfoFun},
  1655. {lookup_fun, LookupFun}] ++ [{K,V} || {K,V} <- Opts,
  1656. K =/= traverse,
  1657. K =/= n_objects]).
  1658. qlc_lookup(_Scope, 1, Keys) ->
  1659. lists:flatmap(
  1660. fun(Key) ->
  1661. ets:select(?TAB, [{ {{Key,'_'},'_','_'}, [],
  1662. [{{ {element,1,{element,1,'$_'}},
  1663. {element,2,'$_'},
  1664. {element,3,'$_'} }}] }])
  1665. end, Keys);
  1666. qlc_lookup(Scope, 2, Pids) ->
  1667. lists:flatmap(fun(Pid) ->
  1668. Found =
  1669. ets:select(?TAB, [{{{Pid, rev_keypat(Scope)}, '_'},
  1670. [], ['$_']}]),
  1671. lists:flatmap(
  1672. fun({{_,{T,_,_}=K}, _}) ->
  1673. K2 = if T==n orelse T==a -> T;
  1674. true -> Pid
  1675. end,
  1676. case ets:lookup(?TAB, {K,K2}) of
  1677. [{{Key,_},_,Value}] ->
  1678. [{Key, Pid, Value}];
  1679. [] ->
  1680. []
  1681. end
  1682. end, Found)
  1683. end, Pids).
  1684. qlc_next(_, '$end_of_table') -> [];
  1685. qlc_next(Scope, K) ->
  1686. case ets:lookup(?TAB, K) of
  1687. [{{Key,_}, Pid, V}] ->
  1688. [{Key,Pid,V}] ++ fun() -> qlc_next(Scope, next(Scope, K)) end;
  1689. [] ->
  1690. qlc_next(Scope, next(Scope, K))
  1691. end.
  1692. qlc_prev(_, '$end_of_table') -> [];
  1693. qlc_prev(Scope, K) ->
  1694. case ets:lookup(?TAB, K) of
  1695. [{{Key,_},Pid,V}] ->
  1696. [{Key,Pid,V}] ++ fun() -> qlc_prev(Scope, prev(Scope, K)) end;
  1697. [] ->
  1698. qlc_prev(Scope, prev(Scope, K))
  1699. end.
  1700. qlc_select('$end_of_table') ->
  1701. [];
  1702. qlc_select({Objects, Cont}) ->
  1703. Objects ++ fun() -> qlc_select(ets:select(Cont)) end.
  1704. is_unique(n) -> true;
  1705. is_unique(a) -> true;
  1706. is_unique(_) -> false.