gproc.erl 100 KB


  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Extended process registry
  19. %% This module implements an extended process registry
  20. %%
  21. %% For a detailed description, see
  22. %% <a href="erlang07-wiger.pdf">erlang07-wiger.pdf</a>.
  23. %%
  24. %% <b>NOTE:</b> The functions in the Gproc API expect the Gproc application
  25. %% to be running.
  26. %%
  27. %% <h2>Tuning Gproc performance</h2>
  28. %%
  29. %% Gproc relies on a central server and an ordered-set ets table.
  30. %% Effort is made to perform as much work as possible in the client without
  31. %% sacrificing consistency. A few things can be tuned by setting the following
  32. %% application environment variables in the top application of `gproc'
  33. %% (usually `gproc'):
  34. %%
  35. %% * `{ets_options, list()}' - Currently, the options `{write_concurrency, F}'
  36. %% and `{read_concurrency, F}' are allowed. The default is
  37. %% `[{write_concurrency, true}, {read_concurrency, true}]'
  38. %% * `{server_options, list()}' - These will be passed as spawn options when
  39. %% starting the `gproc' and `gproc_dist' servers. Default is `[]'. It is
  40. %% likely that `{priority, high | max}' and/or increasing `min_heap_size'
  41. %% will improve performance.
  42. %%
  43. %% @end
  44. -module(gproc).
  45. -behaviour(gen_server).
  46. -export([start_link/0,
  47. reg/1, reg/2, reg/3, unreg/1, set_attributes/2,
  48. reg_other/2, reg_other/3, reg_other/4, unreg_other/2,
  49. reg_or_locate/1, reg_or_locate/2, reg_or_locate/3,
  50. reg_shared/1, reg_shared/2, reg_shared/3, unreg_shared/1,
  51. set_attributes_shared/2, set_value_shared/2,
  52. ensure_reg/1, ensure_reg/2, ensure_reg/3,
  53. ensure_reg_other/2, ensure_reg_other/3, ensure_reg_other/4,
  54. mreg/3,
  55. munreg/3,
  56. set_value/2,
  57. get_value/1, get_value/2, get_value_shared/1,
  58. get_attribute/2, get_attribute/3, get_attribute_shared/2,
  59. get_attributes/1, get_attributes/2,
  60. where/1,
  61. await/1, await/2, await/3,
  62. wide_await/3,
  63. nb_wait/1, nb_wait/2,
  64. cancel_wait/2, cancel_wait/3,
  65. cancel_wait_or_monitor/1,
  66. monitor/1, monitor/2,
  67. demonitor/2,
  68. lookup_pid/1,
  69. lookup_pids/1,
  70. lookup_value/1,
  71. lookup_values/1,
  72. update_counter/2, update_counter/3,
  73. update_counters/2,
  74. reset_counter/1,
  75. update_shared_counter/2,
  76. give_away/2,
  77. goodbye/0,
  78. send/2,
  79. bcast/2, bcast/3,
  80. info/1, info/2,
  81. i/0,
  82. select/1, select/2, select/3,
  83. select_count/1, select_count/2,
  84. first/1,
  85. next/2,
  86. prev/2,
  87. last/1,
  88. table/0, table/1, table/2]).
  89. %% Environment handling
  90. -export([get_env/3, get_env/4,
  91. get_set_env/3, get_set_env/4,
  92. set_env/5]).
  93. %% Convenience functions
  94. -export([add_local_name/1,
  95. add_global_name/1,
  96. add_local_property/2,
  97. add_global_property/2,
  98. add_local_counter/2,
  99. add_global_counter/2,
  100. add_local_aggr_counter/1,
  101. add_global_aggr_counter/1,
  102. add_shared_local_counter/2,
  103. lookup_local_name/1,
  104. lookup_global_name/1,
  105. lookup_local_properties/1,
  106. lookup_global_properties/1,
  107. lookup_local_counters/1,
  108. lookup_global_counters/1,
  109. lookup_local_aggr_counter/1,
  110. lookup_global_aggr_counter/1]).
  111. %% Callbacks for behaviour support
  112. -export([whereis_name/1,
  113. register_name/2,
  114. unregister_name/1]).
  115. -export([default/1]).
  116. %%% internal exports
  117. -export([init/1,
  118. handle_cast/2,
  119. handle_call/3,
  120. handle_info/2,
  121. code_change/3,
  122. terminate/2]).
  123. %% this shouldn't be necessary
  124. -export([audit_process/1]).
  125. -include("gproc_int.hrl").
  126. -include("gproc.hrl").
  127. -export_type([scope/0, type/0, key/0,
  128. context/0, sel_pattern/0, sel_scope/0, sel_context/0,
  129. reg_id/0, unique_id/0, monitor_type/0]).
  130. -type type() :: n | p | c | a | r | rc.
  131. -type scope() :: l | g.
  132. -type context() :: {scope(),type()} | type().
  133. -type sel_type() :: type()
  134. | names | props | counters | aggr_counters
  135. | resources | resource_counters.
  136. -type sel_var() :: '_' | atom().
  137. -type keypat() :: {sel_type() | sel_var(), l | g | sel_var(), any()}.
  138. -type pidpat() :: pid() | sel_var().
  139. -type headpat() :: {keypat(), pidpat(), any()}.
  140. -type key() :: {type(), scope(), any()}.
  141. -type value() :: any().
  142. -type attr() :: {atom(), any()}.
  143. -type attrs() :: [attr()].
  144. -type sel_pattern() :: [{headpat(), list(), list()}].
  145. -type reg_id() :: {type(), scope(), any()}.
  146. -type unique_id() :: {n | a, scope(), any()}.
  147. -type monitor_type() :: info | standby | follow.
  148. -type sel_scope() :: scope | all | global | local.
  149. -type sel_context() :: {scope(), type()} | type().
  150. %% update_counter increment
  151. -type ctr_incr() :: integer().
  152. -type ctr_thr() :: integer().
  153. -type ctr_setval() :: integer().
  154. -type ctr_update() :: ctr_incr()
  155. | {ctr_incr(), ctr_thr(), ctr_setval()}.
  156. -type increment() :: ctr_incr() | ctr_update() | [ctr_update()].
  157. -define(SERVER, ?MODULE).
  158. %%-define(l, l(?LINE)). % when activated, calls a traceable empty function
  159. -define(l, ignore).
  160. -define(CHK_DIST,
  161. case whereis(gproc_dist) of
  162. undefined ->
  163. ?THROW_GPROC_ERROR(local_only);
  164. _ ->
  165. ok
  166. end).
  167. -define(PID_IS_DEAD(Pid),
  168. (node(Pid) == node() andalso is_process_alive(Pid) == false)).
  169. -record(state, {}).
  170. %% @spec () -> {ok, pid()}
  171. %%
  172. %% @doc Starts the gproc server.
  173. %%
  174. %% This function is intended to be called from gproc_sup, as part of
  175. %% starting the gproc application.
  176. %% @end
  177. start_link() ->
  178. _ = create_tabs(),
  179. SpawnOpts = gproc_lib:valid_opts(server_options, []),
  180. gen_server:start_link({local, ?SERVER}, ?MODULE, [],
  181. [{spawn_opt, SpawnOpts}]).
  182. %% spec(Name::any()) -> true
  183. %%
  184. %% @doc Registers a local (unique) name. @equiv reg({n,l,Name})
  185. %% @end
  186. %%
  187. add_local_name(Name) ->
  188. ?CATCH_GPROC_ERROR(reg1({n,l,Name}, undefined, [], reg), [Name]).
  189. %% spec(Name::any()) -> true
  190. %%
  191. %% @doc Registers a global (unique) name. @equiv reg({n,g,Name})
  192. %% @end
  193. %%
  194. add_global_name(Name) ->
  195. ?CATCH_GPROC_ERROR(reg1({n,g,Name}, undefined, [], reg), [Name]).
  196. %% spec(Name::any(), Value::any()) -> true
  197. %%
  198. %% @doc Registers a local (non-unique) property. @equiv reg({p,l,Name},Value)
  199. %% @end
  200. %%
  201. add_local_property(Name , Value) ->
  202. ?CATCH_GPROC_ERROR(reg1({p,l,Name}, Value, [], reg), [Name, Value]).
  203. %% spec(Name::any(), Value::any()) -> true
  204. %%
  205. %% @doc Registers a global (non-unique) property. @equiv reg({p,g,Name},Value)
  206. %% @end
  207. %%
  208. add_global_property(Name, Value) ->
  209. ?CATCH_GPROC_ERROR(reg1({p,g,Name}, Value, [], reg), [Name, Value]).
  210. %% spec(Name::any(), Initial::integer()) -> true
  211. %%
  212. %% @doc Registers a local (non-unique) counter. @equiv reg({c,l,Name},Value)
  213. %% @end
  214. %%
  215. add_local_counter(Name, Initial) when is_integer(Initial) ->
  216. ?CATCH_GPROC_ERROR(reg1({c,l,Name}, Initial, [], reg), [Name, Initial]).
  217. %% spec(Name::any(), Initial::integer()) -> true
  218. %%
  219. %% @doc Registers a local shared (unique) counter.
  220. %% @equiv reg_shared({c,l,Name},Value)
  221. %% @end
  222. %%
  223. add_shared_local_counter(Name, Initial) when is_integer(Initial) ->
  224. reg_shared({c,l,Name}, Initial).
  225. %% spec(Name::any(), Initial::integer()) -> true
  226. %%
  227. %% @doc Registers a global (non-unique) counter. @equiv reg({c,g,Name},Value)
  228. %% @end
  229. %%
  230. add_global_counter(Name, Initial) when is_integer(Initial) ->
  231. ?CATCH_GPROC_ERROR(reg1({c,g,Name}, Initial, [], reg), [Name, Initial]).
  232. %% spec(Name::any()) -> true
  233. %%
  234. %% @doc Registers a local (unique) aggregated counter.
  235. %% @equiv reg({a,l,Name})
  236. %% @end
  237. %%
  238. add_local_aggr_counter(Name) -> ?CATCH_GPROC_ERROR(reg1({a,l,Name}), [Name]).
  239. %% spec(Name::any()) -> true
  240. %%
  241. %% @doc Registers a global (unique) aggregated counter.
  242. %% @equiv reg({a,g,Name})
  243. %% @end
  244. %%
  245. add_global_aggr_counter(Name) ->
  246. ?CATCH_GPROC_ERROR(reg1({a,g,Name}), [Name]).
  247. %% @spec (Name::any()) -> pid()
  248. %%
  249. %% @doc Lookup a local unique name. Fails if there is no such name.
  250. %% @equiv where({n,l,Name})
  251. %% @end
  252. %%
  253. lookup_local_name(Name) -> where({n,l,Name}).
  254. %% @spec (Name::any()) -> pid()
  255. %%
  256. %% @doc Lookup a global unique name. Fails if there is no such name.
  257. %% @equiv where({n,g,Name})
  258. %% @end
  259. %%
  260. lookup_global_name(Name) -> where({n,g,Name}).
  261. %% @spec (Name::any()) -> integer()
  262. %%
  263. %% @doc Lookup a local (unique) aggregated counter and returns its value.
  264. %% Fails if there is no such object.
  265. %% @equiv where({a,l,Name})
  266. %% @end
  267. %%
  268. lookup_local_aggr_counter(Name) -> lookup_value({a,l,Name}).
  269. %% @spec (Name::any()) -> integer()
  270. %%
  271. %% @doc Lookup a global (unique) aggregated counter and returns its value.
  272. %% Fails if there is no such object.
  273. %% @equiv lookup_value({a,g,Name})
  274. %% @end
  275. %%
  276. lookup_global_aggr_counter(Name) -> lookup_value({a,g,Name}).
  277. %% @spec (Property::any()) -> [{pid(), Value}]
  278. %%
  279. %% @doc Look up all local (non-unique) instances of a given Property.
  280. %% Returns a list of {Pid, Value} tuples for all matching objects.
  281. %% @equiv lookup_values({p, l, Property})
  282. %% @end
  283. %%
  284. lookup_local_properties(P) -> lookup_values({p,l,P}).
  285. %% @spec (Property::any()) -> [{pid(), Value}]
  286. %%
  287. %% @doc Look up all global (non-unique) instances of a given Property.
  288. %% Returns a list of {Pid, Value} tuples for all matching objects.
  289. %% @equiv lookup_values({p, g, Property})
  290. %% @end
  291. %%
  292. lookup_global_properties(P) -> lookup_values({p,g,P}).
  293. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  294. %%
  295. %% @doc Look up all local (non-unique) instances of a given Counter.
  296. %% Returns a list of {Pid, Value} tuples for all matching objects.
  297. %% @equiv lookup_values({c, l, Counter})
  298. %% @end
  299. %%
  300. lookup_local_counters(P) -> lookup_values({c,l,P}).
  301. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  302. %%
  303. %% @doc Look up all global (non-unique) instances of a given Counter.
  304. %% Returns a list of {Pid, Value} tuples for all matching objects.
  305. %% @equiv lookup_values({c, g, Counter})
  306. %% @end
  307. %%
  308. lookup_global_counters(P) -> lookup_values({c,g,P}).
  309. %% @spec get_env(Scope::scope(), App::atom(), Key::atom()) -> term()
  310. %% @equiv get_env(Scope, App, Key, [app_env])
  311. get_env(Scope, App, Key) ->
  312. get_env(Scope, App, Key, [app_env]).
  313. %% @spec (Scope::scope(), App::atom(), Key::atom(), Strategy) -> term()
  314. %% Strategy = [Alternative]
  315. %% Alternative = app_env
  316. %% | os_env
  317. %% | inherit | {inherit, pid()} | {inherit, unique_id()}
  318. %% | init_arg
  319. %% | {mnesia, ActivityType, Oid, Pos}
  320. %% | {default, term()}
  321. %% | error
  322. %% @doc Read an environment value, potentially cached as a `gproc_env' property.
  323. %%
  324. %% This function first tries to read the value of a cached property,
  325. %% `{p, Scope, {gproc_env, App, Key}}'. If this fails, it will try the provided
  326. %% alternative strategy. `Strategy' is a list of alternatives, tried in order.
  327. %% Each alternative can be one of:
  328. %%
  329. %% * `app_env' - try `application:get_env(App, Key)'
  330. %% * `os_env' - try `os:getenv(ENV)', where `ENV' is `Key' converted into an
  331. %% uppercase string
  332. %% * `{os_env, ENV}' - try `os:getenv(ENV)'
  333. %% * `inherit' - inherit the cached value, if any, held by the parent process.
  334. %% * `{inherit, Pid}' - inherit the cached value, if any, held by `Pid'.
  335. %% * `{inherit, Id}' - inherit the cached value, if any, held by the process
  336. %% registered in `gproc' as `Id'.
  337. %% * `init_arg' - try `init:get_argument(Key)'; expects a single value, if any.
  338. %% * `{mnesia, ActivityType, Oid, Pos}' - try
  339. %% `mnesia:activity(ActivityType, fun() -> mnesia:read(Oid) end)'; retrieve
  340. %% the value in position `Pos' if object found.
  341. %% * `{default, Value}' - set a default value to return once alternatives have
  342. %% been exhausted; if not set, `undefined' will be returned.
  343. %% * `error' - raise an exception, `erlang:error(gproc_env, [App, Key, Scope])'.
  344. %%
  345. %% While any alternative can occur more than once, the only one that might make
  346. %% sense to use multiple times is `{default, Value}'.
  347. %%
  348. %% The return value will be one of:
  349. %%
  350. %% * The value of the first matching alternative, or `error' eception,
  351. %% whichever comes first
  352. %% * The last instance of `{default, Value}', or `undefined', if there is no
  353. %% matching alternative, default or `error' entry in the list.
  354. %%
  355. %% The `error' option can be used to assert that a value has been previously
  356. %% cached. Alternatively, it can be used to assert that a value is either cached
  357. %% or at least defined somewhere,
  358. %% e.g. `get_env(l, mnesia, dir, [app_env, error])'.
  359. %% @end
  360. get_env(Scope, App, Key, Strategy)
  361. when Scope==l, is_atom(App), is_atom(Key);
  362. Scope==g, is_atom(App), is_atom(Key) ->
  363. do_get_env(Scope, App, Key, Strategy, false).
  364. %% @spec get_set_env(Scope::scope(), App::atom(), Key::atom()) -> term()
  365. %% @equiv get_set_env(Scope, App, Key, [app_env])
  366. get_set_env(Scope, App, Key) ->
  367. get_set_env(Scope, App, Key, [app_env]).
  368. %% @spec get_set_env(Scope::scope(), App::atom(), Key::atom(), Strategy) ->
  369. %% Value
  370. %% @doc Fetch and cache an environment value, if not already cached.
  371. %%
  372. %% This function does the same thing as {@link get_env/4}, but also updates the
  373. %% cache. Note that the cache will be updated even if the result of the lookup
  374. %% is `undefined'.
  375. %%
  376. %% @see get_env/4.
  377. %% @end
  378. %%
  379. get_set_env(Scope, App, Key, Strategy)
  380. when Scope==l, is_atom(App), is_atom(Key);
  381. Scope==g, is_atom(App), is_atom(Key) ->
  382. do_get_env(Scope, App, Key, Strategy, true).
  383. do_get_env(Context, App, Key, Alternatives, Set) ->
  384. case lookup_env(Context, App, Key, self()) of
  385. undefined ->
  386. check_alternatives(Alternatives, Context, App, Key, undefined, Set);
  387. {ok, Value} ->
  388. Value
  389. end.
  390. %% @spec set_env(Scope::scope(), App::atom(),
  391. %% Key::atom(), Value::term(), Strategy) -> Value
  392. %% Strategy = [Alternative]
  393. %% Alternative = app_env | os_env | {os_env, VAR}
  394. %% | {mnesia, ActivityType, Oid, Pos}
  395. %%
  396. %% @doc Updates the cached value as well as underlying environment.
  397. %%
  398. %% This function should be exercised with caution, as it affects the larger
  399. %% environment outside gproc. This function modifies the cached value, and then
  400. %% proceeds to update the underlying environment (OS environment variable or
  401. %% application environment variable).
  402. %%
  403. %% When the `mnesia' alternative is used, gproc will try to update any existing
  404. %% object, changing only the `Pos' position. If no such object exists, it will
  405. %% create a new object, setting any other attributes (except `Pos' and the key)
  406. %% to `undefined'.
  407. %% @end
  408. %%
  409. set_env(Scope, App, Key, Value, Strategy)
  410. when Scope==l, is_atom(App), is_atom(Key);
  411. Scope==g, is_atom(App), is_atom(Key) ->
  412. case is_valid_set_strategy(Strategy, Value) of
  413. true ->
  414. update_cached_env(Scope, App, Key, Value),
  415. set_strategy(Strategy, App, Key, Value);
  416. false ->
  417. erlang:error(badarg)
  418. end.
  419. check_alternatives([{default, Val}|Alts], Scope, App, Key, _, Set) ->
  420. check_alternatives(Alts, Scope, App, Key, Val, Set);
  421. check_alternatives([H|T], Scope, App, Key, Def, Set) ->
  422. case try_alternative(H, App, Key, Scope) of
  423. undefined ->
  424. check_alternatives(T, Scope, App, Key, Def, Set);
  425. {ok, Value} ->
  426. if Set ->
  427. cache_env(Scope, App, Key, Value),
  428. Value;
  429. true ->
  430. Value
  431. end
  432. end;
  433. check_alternatives([], Scope, App, Key, Def, Set) ->
  434. if Set ->
  435. cache_env(Scope, App, Key, Def);
  436. true ->
  437. ok
  438. end,
  439. Def.
  440. try_alternative(error, App, Key, Scope) ->
  441. erlang:error(gproc_env, [App, Key, Scope]);
  442. try_alternative(inherit, App, Key, Scope) ->
  443. case get('$ancestors') of
  444. [P|_] ->
  445. lookup_env(Scope, App, Key, P);
  446. _ ->
  447. undefined
  448. end;
  449. try_alternative({inherit, P}, App, Key, Scope) when is_pid(P) ->
  450. lookup_env(Scope, App, Key, P);
  451. try_alternative({inherit, P}, App, Key, Scope) ->
  452. case where(P) of
  453. undefined -> undefined;
  454. Pid when is_pid(Pid) ->
  455. lookup_env(Scope, App, Key, Pid)
  456. end;
  457. try_alternative(app_env, App, Key, _Scope) ->
  458. case application:get_env(App, Key) of
  459. undefined -> undefined;
  460. {ok, undefined} -> undefined;
  461. {ok, Value} -> {ok, Value}
  462. end;
  463. try_alternative(os_env, _App, Key, _) ->
  464. case os:getenv(os_env_key(Key)) of
  465. false -> undefined;
  466. Val -> {ok, Val}
  467. end;
  468. try_alternative({os_env, Key}, _, _, _) ->
  469. case os:getenv(Key) of
  470. false -> undefined;
  471. Val -> {ok, Val}
  472. end;
  473. try_alternative(init_arg, _, Key, _) ->
  474. case init:get_argument(Key) of
  475. {ok, [[Value]]} ->
  476. {ok, Value};
  477. error ->
  478. undefined
  479. end;
  480. try_alternative({mnesia,Type,Key,Pos}, _, _, _) ->
  481. case mnesia:activity(Type, fun() -> mnesia:read(Key) end) of
  482. [] -> undefined;
  483. [Found] ->
  484. {ok, element(Pos, Found)}
  485. end.
  486. os_env_key(Key) ->
  487. string:to_upper(atom_to_list(Key)).
  488. lookup_env(Scope, App, Key, P) ->
  489. case ets:lookup(?TAB, {{p, Scope, {gproc_env, App, Key}}, P}) of
  490. [] ->
  491. undefined;
  492. [{_, _, Value}] ->
  493. {ok, Value}
  494. end.
  495. cache_env(Scope, App, Key, Value) ->
  496. ?CATCH_GPROC_ERROR(
  497. reg1({p, Scope, {gproc_env, App, Key}}, Value, [], reg),
  498. [Scope,App,Key,Value]).
  499. update_cached_env(Scope, App, Key, Value) ->
  500. case lookup_env(Scope, App, Key, self()) of
  501. undefined ->
  502. cache_env(Scope, App, Key, Value);
  503. {ok, _} ->
  504. set_value({p, Scope, {gproc_env, App, Key}}, Value)
  505. end.
  506. is_valid_set_strategy([os_env|T], Value) ->
  507. is_string(Value) andalso is_valid_set_strategy(T, Value);
  508. is_valid_set_strategy([{os_env, _}|T], Value) ->
  509. is_string(Value) andalso is_valid_set_strategy(T, Value);
  510. is_valid_set_strategy([app_env|T], Value) ->
  511. is_valid_set_strategy(T, Value);
  512. is_valid_set_strategy([{mnesia,_Type,_Oid,_Pos}|T], Value) ->
  513. is_valid_set_strategy(T, Value);
  514. is_valid_set_strategy([], _) ->
  515. true;
  516. is_valid_set_strategy(_, _) ->
  517. false.
  518. set_strategy([H|T], App, Key, Value) ->
  519. case H of
  520. app_env ->
  521. application:set_env(App, Key, Value);
  522. os_env ->
  523. os:putenv(os_env_key(Key), Value);
  524. {os_env, ENV} ->
  525. os:putenv(ENV, Value);
  526. {mnesia,Type,Oid,Pos} ->
  527. mnesia:activity(
  528. Type,
  529. fun() ->
  530. Rec = case mnesia:read(Oid) of
  531. [] ->
  532. {Tab,K} = Oid,
  533. Tag = mnesia:table_info(Tab, record_name),
  534. Attrs = mnesia:table_info(Tab, attributes),
  535. list_to_tuple(
  536. [Tag,K |
  537. [undefined || _ <- tl(Attrs)]]);
  538. [Old] ->
  539. Old
  540. end,
  541. mnesia:write(setelement(Pos, Rec, Value))
  542. end)
  543. end,
  544. set_strategy(T, App, Key, Value);
  545. set_strategy([], _, _, Value) ->
  546. Value.
  547. is_string(S) ->
  548. try begin _ = iolist_to_binary(S),
  549. true
  550. end
  551. catch
  552. error:_ ->
  553. false
  554. end.
  555. %% @spec reg(Key::key()) -> true
  556. %%
  557. %% @doc
  558. %% @equiv reg(Key, default(Key), [])
  559. %% @end
  560. reg(Key) ->
  561. ?CATCH_GPROC_ERROR(reg1(Key), [Key]).
  562. reg1(Key) ->
  563. reg1(Key, default(Key), [], reg).
  564. %% @spec reg_or_locate(Key::key()) -> {pid(), NewValue}
  565. %%
  566. %% @doc
  567. %% @equiv reg_or_locate(Key, default(Key))
  568. %% @end
  569. reg_or_locate(Key) ->
  570. ?CATCH_GPROC_ERROR(reg_or_locate1(Key), [Key]).
  571. reg_or_locate1(Key) ->
  572. reg_or_locate1(Key, default(Key), self()).
  573. default({T,_,_}) when T==c -> 0;
  574. default(_) -> undefined.
  575. %% @spec await(Key::key()) -> {pid(),Value}
  576. %% @equiv await(Key,infinity)
  577. %%
  578. await(Key) ->
  579. ?CATCH_GPROC_ERROR(await1(Key, infinity), [Key]).
  580. %% @spec await(Key::key(), Timeout) -> {pid(),Value}
  581. %% Timeout = integer() | infinity
  582. %%
  583. %% @doc Wait for a name or aggregated counter to be registered.
  584. %% The function raises an exception if the timeout expires. Timeout must be
  585. %% either an interger &gt; 0 or 'infinity'.
  586. %% A small optimization: we first perform a lookup, to see if the name
  587. %% is already registered. This way, the cost of the operation will be
  588. %% roughly the same as of where/1 in the case where the name is already
  589. %% registered (the difference: await/2 also returns the value).
  590. %% @end
  591. %%
  592. await(Key, Timeout) ->
  593. ?CATCH_GPROC_ERROR(await1(Key, Timeout), [Key, Timeout]).
  594. %% @spec await(Node::node(), Key::key(), Timeout) -> {pid(),Value}
  595. %% Timeout = integer() | infinity
  596. %%
  597. %% @doc Wait for a name or aggregated counter to be registered on `Node'.
  598. %% This function works exactly like {@link await/2}, but queries a remote
  599. %% node instead. An exception is thrown if `Node' cannot be reached. If gproc
  600. %% is not running on a given node, this is treated the same as the node being
  601. %% down.
  602. %% @end
  603. %%
  604. await(Node, Key, Timeout) when Node == node() ->
  605. await(Key, Timeout);
  606. await(Node, Key, Timeout) when is_atom(Node) ->
  607. ?CATCH_GPROC_ERROR(await1(Node, Key, Timeout), [Node, Key, Timeout]).
  608. await1({T,g,_} = Key, Timeout) when T=:=n; T=:=a; T=:=rc ->
  609. ?CHK_DIST,
  610. request_wait(Key, Timeout);
  611. await1({T,l,_} = Key, Timeout) when T=:=n; T=:=a; T=:=rc ->
  612. case ets:lookup(?TAB, {Key, T}) of
  613. [{_, Pid, Value}] ->
  614. case is_process_alive(Pid) of
  615. true ->
  616. {Pid, Value};
  617. false ->
  618. %% we can send an asynchronous audit request, since the purpose is
  619. %% only to ensure that the server handles the audit before it serves
  620. %% our 'await' request. Strictly speaking, we could allow the bad Pid
  621. %% to be returned, as there are no guarantees that whatever Pid we return
  622. %% will still be alive when addressed. Still, we don't want to knowingly
  623. %% serve bad data.
  624. nb_audit_process(Pid),
  625. request_wait(Key, Timeout)
  626. end;
  627. _ ->
  628. request_wait(Key, Timeout)
  629. end;
  630. await1(_, _) ->
  631. ?THROW_GPROC_ERROR(badarg).
  632. await1(N, {n,l,_} = Key, Timeout) when is_atom(N) ->
  633. request_wait(N, Key, Timeout);
  634. await1(_, _, _) ->
  635. ?THROW_GPROC_ERROR(badarg).
  636. request_wait({_,g,_} = Key, Timeout) ->
  637. request_wait(undefined, Key, Timeout);
  638. request_wait(Key, Timeout) ->
  639. request_wait(node(), Key, Timeout).
  640. request_wait(N, {_,C,_} = Key, Timeout) when C==l; C==g ->
  641. TRef = case Timeout of
  642. infinity -> no_timer;
  643. T when is_integer(T), T > 0 ->
  644. erlang:start_timer(T, self(), gproc_timeout);
  645. _ ->
  646. ?THROW_GPROC_ERROR(badarg)
  647. end,
  648. WRef = case {call(N, {await,Key,self()}, C), C} of
  649. {{R, {Kg,Pg,Vg}}, g} ->
  650. self() ! {gproc, R, registered, {Kg,Pg,Vg}},
  651. R;
  652. {R,_} ->
  653. R
  654. end,
  655. receive
  656. {gproc, WRef, registered, {_K, Pid, V}} ->
  657. _ = case TRef of
  658. no_timer -> ignore;
  659. _ -> erlang:cancel_timer(TRef)
  660. end,
  661. {Pid, V};
  662. {timeout, TRef, gproc_timeout} ->
  663. cancel_wait(N, Key, WRef),
  664. ?THROW_GPROC_ERROR(timeout)
  665. end.
  666. %% @spec wide_await(Nodes::[node()], Key::key(), Timeout) -> {pid(),Value}
  667. %% Timeout = integer() | infinity
  668. %%
  669. %% @doc Wait for a local name to be registered on any of `Nodes'.
  670. %% This function works rather like {@link await/2}, but queries all nodes in
  671. %% the `Nodes' list at the same time. The first node to respond with a
  672. %% process registered as `Key' will provide the result. Other results are
  673. %% ignored. `Key' must be a unique name with local scope, i.e. `{n,l,Name}'.
  674. %%
  675. %% An exception is thrown upon timeout, or if no node can be reached (if gproc is
  676. %% not running on a given node, this is treated the same as the node being down).
  677. %% @end
  678. %%
  679. wide_await(Nodes, Key, Timeout) ->
  680. ?CATCH_GPROC_ERROR(wide_await1(Nodes, Key, Timeout), [Nodes, Key, Timeout]).
  681. wide_await1(Nodes, {T,l,_} = Key, Timeout) when T=:=n; T=:=a ->
  682. {_, Ref} = spawn_monitor(fun() ->
  683. wide_request_wait(Nodes, Key, Timeout)
  684. end),
  685. receive
  686. {'DOWN', Ref, _, _, Reason} ->
  687. case Reason of
  688. {ok, {gproc,_,registered,{_,Pid,V}}} ->
  689. {Pid, V};
  690. Other ->
  691. ?THROW_GPROC_ERROR(Other)
  692. end
  693. end;
  694. wide_await1(_, _, _) ->
  695. ?THROW_GPROC_ERROR(badarg).
  696. wide_request_wait(Nodes, {Tk,l,_} = Key, Timeout) when Tk=:=n; Tk=:=a ->
  697. TRef = case Timeout of
  698. infinity -> no_timer;
  699. T when is_integer(T), T > 0 ->
  700. erlang:start_timer(T, self(), gproc_timeout);
  701. _ ->
  702. exit(badarg)
  703. end,
  704. Req = {await, Key, self()},
  705. Refs = lists:map(
  706. fun(Node) ->
  707. S = {?MODULE, Node},
  708. Ref = erlang:monitor(process, S),
  709. ?MAY_FAIL(erlang:send(S, {'$gen_call', {self(), Ref}, Req},
  710. [noconnect])),
  711. {Node, Ref}
  712. end, Nodes),
  713. collect_replies(Refs, Key, TRef).
  714. collect_replies(Refs, Key, TRef) ->
  715. receive
  716. {gproc, _Ref, registered, {_, _, _}} = Result ->
  717. exit({ok, Result});
  718. {'DOWN', Ref, _, _, _} ->
  719. case lists:keydelete(Ref, 2, Refs) of
  720. [] ->
  721. exit(nodedown);
  722. Refs1 ->
  723. collect_replies(Refs1, Key, TRef)
  724. end;
  725. {timeout, TRef, gproc_timeout} ->
  726. exit(timeout);
  727. {Ref, Ref} ->
  728. %% ignore
  729. collect_replies(Refs, Key, TRef)
  730. end.
  731. %% @spec nb_wait(Key::key()) -> Ref
  732. %%
  733. %% @doc Wait for a name or aggregated counter to be registered.
  734. %% The caller can expect to receive a message,
  735. %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
  736. %% @end
  737. %%
  738. nb_wait(Key) ->
  739. ?CATCH_GPROC_ERROR(nb_wait1(Key), [Key]).
  740. %% @spec nb_wait(Node::node(), Key::key()) -> Ref
  741. %%
  742. %% @doc Wait for a name or aggregated counter to be registered on `Node'.
  743. %% The caller can expect to receive a message,
  744. %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
  745. %% @end
  746. %%
  747. nb_wait(Node, Key) ->
  748. ?CATCH_GPROC_ERROR(nb_wait1(Node, Key), [Node, Key]).
  749. nb_wait1({T,g,_} = Key) when T=:=n; T=:=a; T=:=rc ->
  750. ?CHK_DIST,
  751. call({await, Key, self()}, g);
  752. nb_wait1({T,l,_} = Key) when T=:=n; T=:=a; T=:=rc ->
  753. call({await, Key, self()}, l);
  754. nb_wait1(_) ->
  755. ?THROW_GPROC_ERROR(badarg).
  756. nb_wait1(Node, {T,l,_} = Key) when is_atom(Node), T=:=n;
  757. is_atom(Node), T=:=a;
  758. is_atom(Node), T=:=rc ->
  759. call(Node, {await, Key, self()}, l).
  760. %% @spec cancel_wait(Key::key(), Ref) -> ok
  761. %% Ref = all | reference()
  762. %%
  763. %% @doc Cancels a previous call to nb_wait/1
  764. %%
  765. %% If `Ref = all', all wait requests on `Key' from the calling process
  766. %% are canceled.
  767. %% @end
  768. %%
  769. cancel_wait(Key, Ref) ->
  770. ?CATCH_GPROC_ERROR(cancel_wait1(Key, Ref), [Key, Ref]).
  771. %% @spec cancel_wait(Node::node(), Key::key(), Ref) -> ok
  772. %% Ref = all | reference()
  773. %%
  774. %% @doc Cancels a previous call to nb_wait/2
  775. %%
  776. %% This function works just like {@link cancel_wait/2}, but talks to a remote
  777. %% node.
  778. %% @end
  779. %%
  780. cancel_wait(N, Key, Ref) when N == node() ->
  781. cancel_wait(Key, Ref);
  782. cancel_wait(N, Key, Ref) ->
  783. ?CATCH_GPROC_ERROR(cancel_wait1(N, Key, Ref), [N, Key, Ref]).
  784. cancel_wait1({_,g,_} = Key, Ref) ->
  785. ?CHK_DIST,
  786. cast({cancel_wait, self(), Key, Ref}, g),
  787. ok;
  788. cancel_wait1({_,l,_} = Key, Ref) ->
  789. cast({cancel_wait, self(), Key, Ref}, l),
  790. ok.
  791. cancel_wait1(undefined, {_,g,_} = Key, Ref) ->
  792. cast({cancel_wait, self(), Key, Ref}, g);
  793. cancel_wait1(N, {_,l,_} = Key, Ref) ->
  794. cast(N, {cancel_wait, self(), Key, Ref}, l).
  795. cancel_wait_or_monitor(Key) ->
  796. ?CATCH_GPROC_ERROR(cancel_wait_or_monitor1(Key), [Key]).
  797. cancel_wait_or_monitor1({_,g,_} = Key) ->
  798. ?CHK_DIST,
  799. cast({cancel_wait_or_monitor, self(), Key}, g),
  800. ok;
  801. cancel_wait_or_monitor1({_,l,_} = Key) ->
  802. cast({cancel_wait_or_monitor, self(), Key}, l),
  803. ok.
  804. %% @equiv monitor(Key, info)
  805. monitor(Key) ->
  806. ?CATCH_GPROC_ERROR(monitor1(Key, info), [Key]).
  807. %% @spec monitor(key(), monitor_type()) -> reference()
  808. %%
  809. %% @doc monitor a registered name
  810. %% `monitor(Key, info)' works much like erlang:monitor(process, Pid), but monitors
  811. %% a unique name registered via gproc. A message, `{gproc, unreg, Ref, Key}'
  812. %% will be sent to the requesting process, if the name is unregistered or
  813. %% the registered process dies. If there is a standby monitor (see below), a
  814. %% message `{gproc, {failover, ToPid}, Ref, Key}' is sent to all monitors.
  815. %% If the name is passed to another process using {@link give_away/2}, the event
  816. %% `{gproc, {migrated, ToPid}, Ref, Key}' is sent to all monitors.
  817. %%
  818. %% `monitor(Key, standby)' sets up the monitoring process as a standby for the
  819. %% registered name. If the registered process dies, the first standby process
  820. %% inherits the name, and a message `{gproc, {failover, ToPid}, Ref, Key}' is
  821. %% sent to all monitors, including the one that inherited the name.
  822. %%
  823. %% If the name is not yet registered, the unreg event is sent immediately.
  824. %% If the calling process in this case tried to start a `standby' monitoring,
  825. %% it receives the registered name and the failover event immediately.
  826. %%
  827. %% `monitor(Key, follow)' keeps monitoring the registered name even if it is
  828. %% temporarily unregistered. The messages received are the same as for the other
  829. %% monitor types, but `{gproc, registered, Ref, Key}' is also sent when a new
  830. %% process registers the name.
  831. %% @end
  832. monitor(Key, Type) when Type==info;
  833. Type==follow;
  834. Type==standby ->
  835. ?CATCH_GPROC_ERROR(monitor1(Key, Type), [Key, Type]).
  836. monitor1({T,g,_} = Key, Type) when T==n; T==a ->
  837. ?CHK_DIST,
  838. gproc_dist:monitor(Key, Type);
  839. monitor1({T,l,_} = Key, Type) when T==n; T==a ->
  840. call({monitor, Key, self(), Type}, l);
  841. monitor1(_, _) ->
  842. ?THROW_GPROC_ERROR(badarg).
  843. %% @spec demonitor(key(), reference()) -> ok
  844. %%
  845. %% @doc Remove a monitor on a registered name
  846. %% This function is the reverse of monitor/1. It removes a monitor previously
  847. %% set on a unique name. This function always succeeds given legal input.
  848. %% @end
  849. demonitor(Key, Ref) ->
  850. ?CATCH_GPROC_ERROR(demonitor1(Key, Ref), [Key, Ref]).
  851. demonitor1({T,g,_} = Key, Ref) when T==n; T==a ->
  852. ?CHK_DIST,
  853. gproc_dist:demonitor(Key, Ref);
  854. demonitor1({T,l,_} = Key, Ref) when T==n; T==a ->
  855. call({demonitor, Key, Ref, self()}, l);
  856. demonitor1(_, _) ->
  857. ?THROW_GPROC_ERROR(badarg).
  858. %% @spec reg(Key::key(), Value::value()) -> true
  859. %%
  860. %% @doc Register a name or property for the current process
  861. %%
  862. %%
  863. reg(Key, Value) ->
  864. ?CATCH_GPROC_ERROR(reg1(Key, Value, [], reg), [Key, Value]).
  865. %% @spec reg(Key::key(), Value::value(), Attrs::attrs()) -> true
  866. %%
  867. %% @doc Register a name or property for the current process
  868. %% `Attrs' (default: `[]') can be inspected using {@link get_attribute/2}.
  869. %%
  870. %% The structure of `Key' is `{Type, Context, Name}', where:
  871. %%
  872. %% * `Context :: l | g' - `l' means 'local' context; `g' means 'global'
  873. %% * `Type :: p | n | c | a | r | rc' specifies the type of entry
  874. %%
  875. %% The semantics of the different types:
  876. %%
  877. %% * `p' - 'property', is non-unique, i.e. different processes can each
  878. %% register a property with the same name.
  879. %% * `n' - 'name, is unique within the given context (local or global).
  880. %% * `c' - 'counter', is similar to a property, but has a numeric value
  881. %% and behaves roughly as an ets counter (see {@link update_counter/2}.)
  882. %% * `a' - 'aggregated counter', is automatically updated by gproc, and
  883. %% reflects the sum of all counter objects with the same name in the given
  884. %% scope. The initial value for an aggregated counter must be `undefined'.
  885. %% * `r' - 'resource property', behaves like a property, but can be tracked
  886. %% with a 'resource counter'.
  887. %% * `rc' - 'resource counter', tracks the number of resource properties
  888. %% with the same name. When the resource count reaches `0', any triggers
  889. %% specified using an `on_zero' attribute may be executed (see below).
  890. %%
  891. %% On-zero triggers:
  892. %%
  893. %% `Msg = {gproc, resource_on_zero, Context, Name, Pid}'
  894. %%
  895. %% * `{send, Key}' - run `gproc:send(Key, Msg)'
  896. %% * `{bcast, Key}' - run `gproc:bcast(Key, Msg)'
  897. %% * `publish' - run
  898. %% `gproc_ps:publish(Context, gproc_resource_on_zero, {Context, Name, Pid})'
  899. %% * `{unreg_shared, Type, Name}' - unregister the shared key
  900. %% `{Type, Context, Name}'
  901. %% @end
  902. reg(Key, Value, Attrs) ->
  903. ?CATCH_GPROC_ERROR(reg1(Key, Value, Attrs, reg), [Key, Value, Attrs]).
  904. %% @equiv ensure_reg(Key, default(Key), [])
  905. ensure_reg(Key) ->
  906. ?CATCH_GPROC_ERROR(reg1(Key, ensure), [Key]).
  907. %% @equiv ensure_reg(Key, Value, [])
  908. -spec ensure_reg(key(), value()) -> new | updated.
  909. ensure_reg(Key, Value) ->
  910. ?CATCH_GPROC_ERROR(reg1(Key, Value, ensure), [Key, Value]).
  911. %% @spec ensure_reg(Key::key(), Value::value(), Attrs::attrs()) ->
  912. %% new | updated
  913. %%
  914. %% @doc Registers a new name or property unless such and entry (by key) has
  915. %% already been registered by the current process. If `Key' already exists,
  916. %% the entry will be updated with the given `Value' and `Attrs'.
  917. %%
  918. %% This function allows the caller to efficiently register an entry without
  919. %% first checking whether it has already been registered. An exception is
  920. %% raised if the name or property is already registered by someone else.
  921. %% @end
  922. -spec ensure_reg(key(), value(), attrs()) -> new | updated.
  923. ensure_reg(Key, Value, Attrs) ->
  924. ?CATCH_GPROC_ERROR(reg1(Key, Value, Attrs, ensure), [Key, Value, Attrs]).
  925. reg1(Key, Op) ->
  926. reg1(Key, default(Key), [], Op).
  927. reg1(Key, Value, Op) ->
  928. reg1(Key, Value, [], Op).
  929. reg1({T,g,_} = Key, Value, As, Op) when T==p; T==a; T==c; T==n; T==r; T==rc ->
  930. %% anything global
  931. ?CHK_DIST,
  932. gproc_dist:reg(Key, Value, As, Op);
  933. reg1({p,l,_} = Key, Value, As, Op) ->
  934. local_reg(Key, Value, As, Op);
  935. reg1({a,l,_} = Key, undefined, As, Op) ->
  936. call({reg, Key, undefined, As, Op});
  937. reg1({c,l,_} = Key, Value, As, Op) when is_integer(Value) ->
  938. call({reg, Key, Value, As, Op});
  939. reg1({n,l,_} = Key, Value, As, Op) ->
  940. call({reg, Key, Value, As, Op});
  941. reg1({r,l,_} = Key, Value, As, Op) ->
  942. call({reg, Key, Value, As, Op});
  943. reg1({rc,l,_} = Key, Value, As, Op) ->
  944. call({reg, Key, Value, As, Op});
  945. reg1(_, _, _, _) ->
  946. ?THROW_GPROC_ERROR(badarg).
  947. %% @equiv reg_other(Key, Pid, default(Key), [])
  948. reg_other(Key, Pid) ->
  949. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, reg), [Key, Pid]).
  950. %% @equiv reg_other(Key, Pid, Value, [])
  951. reg_other(Key, Pid, Value) ->
  952. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, [], reg), [Key, Pid, Value]).
  953. %% @spec reg_other(Key, Pid, Value, Attrs) -> true
  954. %% @doc Register name or property to another process.
  955. %%
  956. %% Equivalent to {@link reg/3}, but allows for registration of another process
  957. %% instead of the current process.
  958. %%
  959. %% Note that registering other processes introduces the possibility of
  960. %% confusing race conditions in user code. Letting each process register
  961. %% its own resources is highly recommended.
  962. %%
  963. %% Only the following resource types can be registered through this function:
  964. %%
  965. %% * `n' - unique names
  966. %% * `a' - aggregated counters
  967. %% * `r' - resource properties
  968. %% * `rc' - resource counters
  969. %% @end
  970. reg_other(Key, Pid, Value, Attrs) ->
  971. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, Attrs, reg),
  972. [Key, Pid, Value, Attrs]).
  973. ensure_reg_other(Key, Pid) ->
  974. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, ensure), [Key, Pid]).
  975. %% @equiv ensure_reg_other(Key, Pid, Value, [])
  976. ensure_reg_other(Key, Pid, Value) ->
  977. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, [], ensure),
  978. [Key, Pid, Value]).
  979. %% @spec ensure_reg_other(Key::key(), Pid::pid(),
  980. %% Value::value(), Attrs::attrs()) ->
  981. %% new | updated
  982. %%
  983. %% @doc Register or update name or property to another process.
  984. %%
  985. %% Equivalent to {@link reg_other/3}, but allows for registration of another
  986. %% process instead of the current process. Also see {@link ensure_reg/3}.
  987. %% @end
  988. ensure_reg_other(Key, Pid, Value, Attrs) ->
  989. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, Attrs, ensure),
  990. [Key, Pid, Value, Attrs]).
  991. reg_other1(Key, Pid, Op) ->
  992. reg_other1(Key, Pid, default(Key), [], Op).
  993. reg_other1({_,g,_} = Key, Pid, Value, As, Op) when is_pid(Pid) ->
  994. ?CHK_DIST,
  995. gproc_dist:reg_other(Key, Pid, Value, As, Op);
  996. reg_other1({T,l,_} = Key, Pid, Value, As, Op) when is_pid(Pid) ->
  997. if T==n; T==a; T==r; T==rc ->
  998. call({reg_other, Key, Pid, Value, As, Op});
  999. true ->
  1000. ?THROW_GPROC_ERROR(badarg)
  1001. end.
  1002. %% @spec reg_or_locate(Key::key(), Value) -> {pid(), NewValue}
  1003. %%
  1004. %% @doc Try registering a unique name, or return existing registration.
  1005. %%
  1006. %% This function tries to register the name `Key', if available.
  1007. %% If such a registration object already exists, the pid and value of
  1008. %% the current registration is returned instead.
  1009. %% @end
  1010. reg_or_locate(Key, Value) ->
  1011. ?CATCH_GPROC_ERROR(reg_or_locate1(Key, Value, self()), [Key, Value]).
  1012. %% @spec reg_or_locate(Key::key(), Value, Fun::fun()) -> {pid(), NewValue}
  1013. %%
  1014. %% @doc Spawn a process with a registered name, or return existing registration.
  1015. %%
  1016. %% This function checks whether a local name is registered; if not, it spawns
  1017. %% a new process (with `spawn(Fun)') and gives it the name.
  1018. %% The pid and value of the resulting registration is returned.
  1019. %%
  1020. %% When a global name is registered in this fashion, the process is
  1021. %% spawned on the caller's node, and the group_leader of the spawned
  1022. %% process is set to the group_leader of the calling process.
  1023. %% @end
  1024. reg_or_locate({n,_,_} = Key, Value, F) when is_function(F, 0) ->
  1025. ?CATCH_GPROC_ERROR(reg_or_locate1(Key, Value, F), [Key, Value, F]).
  1026. reg_or_locate1({_,g,_} = Key, Value, P) ->
  1027. ?CHK_DIST,
  1028. gproc_dist:reg_or_locate(Key, Value, P);
  1029. reg_or_locate1({T,l,_} = Key, Value, P) when T==n; T==a; T==rc ->
  1030. call({reg_or_locate, Key, Value, P});
  1031. reg_or_locate1(_, _, _) ->
  1032. ?THROW_GPROC_ERROR(badarg).
  1033. %% @spec reg_shared(Key::key()) -> true
  1034. %%
  1035. %% @doc Register a resource, but don't tie it to a particular process.
  1036. %%
  1037. %% `reg_shared({c,l,C}) -> reg_shared({c,l,C}, 0).'
  1038. %% `reg_shared({a,l,A}) -> reg_shared({a,l,A}, undefined).'
  1039. %% @end
  1040. reg_shared(Key) ->
  1041. ?CATCH_GPROC_ERROR(reg_shared1(Key), [Key]).
  1042. %% @private
  1043. reg_shared1({T,_,_} = Key) when T==a; T==p; T==c ->
  1044. reg_shared(Key, default(Key)).
  1045. %% @spec reg_shared(Key::key(), Value) -> true
  1046. %%
  1047. %% @doc Register a resource, but don't tie it to a particular process.
  1048. %%
  1049. %% Shared resources are all unique. They remain until explicitly unregistered
  1050. %% (using {@link unreg_shared/1}). The types of shared resources currently
  1051. %% supported are `counter' and `aggregated counter'. In listings and query
  1052. %% results, shared resources appear as other similar resources, except that
  1053. %% `Pid == shared'. To wit, update_counter({c,l,myCounter}, shared, 1) would
  1054. %% increment the shared counter `myCounter' with 1, provided it exists.
  1055. %%
  1056. %% A shared aggregated counter will track updates in exactly the same way as
  1057. %% an aggregated counter which is owned by a process.
  1058. %% @end
  1059. %%
  1060. reg_shared(Key, Value) ->
  1061. ?CATCH_GPROC_ERROR(reg_shared1(Key, Value, []), [Key, Value]).
  1062. reg_shared(Key, Value, Attrs) when is_list(Attrs) ->
  1063. ?CATCH_GPROC_ERROR(reg_shared1(Key, Value, Attrs), [Key, Value, Attrs]).
  1064. %% @private
  1065. reg_shared1({_,g,_} = Key, Value, As) ->
  1066. %% anything global
  1067. ?CHK_DIST,
  1068. gproc_dist:reg_shared(Key, Value, As);
  1069. reg_shared1({a,l,_} = Key, undefined, As) ->
  1070. call({reg_shared, Key, undefined, As, reg});
  1071. reg_shared1({c,l,_} = Key, Value, As) when is_integer(Value) ->
  1072. call({reg_shared, Key, Value, As, reg});
  1073. reg_shared1({p,l,_} = Key, Value, As) ->
  1074. call({reg_shared, Key, Value, As, reg});
  1075. reg_shared1({rc,l,_} = Key, undefined, As) ->
  1076. call({reg_shared, Key, undefined, As, reg});
  1077. reg_shared1(_, _, _) ->
  1078. ?THROW_GPROC_ERROR(badarg).
  1079. %% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
  1080. %%
  1081. %% @doc Register multiple {Key,Value} pairs of a given type and scope.
  1082. %%
  1083. %% This function is more efficient than calling {@link reg/2} repeatedly.
  1084. %% It is also atomic in regard to unique names; either all names are registered
  1085. %% or none are.
  1086. %% @end
  1087. mreg(T, C, KVL) ->
  1088. ?CATCH_GPROC_ERROR(mreg1(T, C, KVL), [T, C, KVL]).
  1089. mreg1(T, g, KVL) ->
  1090. ?CHK_DIST,
  1091. gproc_dist:mreg(T, KVL);
  1092. mreg1(T, l, KVL) when T==a; T==n ->
  1093. if is_list(KVL) ->
  1094. call({mreg, T, l, KVL});
  1095. true ->
  1096. erlang:error(badarg)
  1097. end;
  1098. mreg1(p, l, KVL) ->
  1099. local_mreg(p, KVL);
  1100. mreg1(_, _, _) ->
  1101. ?THROW_GPROC_ERROR(badarg).
  1102. %% @spec munreg(type(), scope(), [Key::any()]) -> true
  1103. %%
  1104. %% @doc Unregister multiple Key items of a given type and scope.
  1105. %%
  1106. %% This function is usually more efficient than calling {@link unreg/1}
  1107. %% repeatedly.
  1108. %% @end
  1109. munreg(T, C, L) ->
  1110. ?CATCH_GPROC_ERROR(munreg1(T, C, L), [T, C, L]).
  1111. munreg1(T, g, L) ->
  1112. ?CHK_DIST,
  1113. gproc_dist:munreg(T, existing(T,g,L));
  1114. munreg1(T, l, L) when T==a; T==n ->
  1115. if is_list(L) ->
  1116. call({munreg, T, l, existing(T,l,L)});
  1117. true ->
  1118. erlang:error(badarg)
  1119. end;
  1120. munreg1(p, l, L) ->
  1121. local_munreg(p, existing(p,l,L));
  1122. munreg1(_, _, _) ->
  1123. ?THROW_GPROC_ERROR(badarg).
  1124. existing(T,Scope,L) ->
  1125. Keys = if T==p; T==c ->
  1126. [{{T,Scope,K}, self()} || K <- L];
  1127. T==a; T==n ->
  1128. [{{T,Scope,K}, T} || K <- L]
  1129. end,
  1130. _ = [case ets:member(?TAB, K) of
  1131. false -> erlang:error(badarg);
  1132. true -> true
  1133. end || K <- Keys],
  1134. L.
  1135. %% @spec (Key:: key()) -> true
  1136. %%
  1137. %% @doc Unregister a name or property.
  1138. %% @end
  1139. unreg(Key) ->
  1140. ?CATCH_GPROC_ERROR(unreg1(Key), [Key]).
  1141. unreg1(Key) ->
  1142. case Key of
  1143. {_, g, _} ->
  1144. ?CHK_DIST,
  1145. gproc_dist:unreg(Key);
  1146. {T, l, _} when T == n; T == a; T == r; T == rc ->
  1147. call({unreg, Key});
  1148. {_, l, _} ->
  1149. case ets:member(?TAB, {Key,self()}) of
  1150. true ->
  1151. _ = gproc_lib:remove_reg(Key, self(), unreg),
  1152. true;
  1153. false ->
  1154. ?THROW_GPROC_ERROR(badarg)
  1155. end
  1156. end.
  1157. %% @spec unreg_other(key(), pid()) -> true
  1158. %% @doc Unregister a name registered to another process.
  1159. %%
  1160. %% This function is equivalent to {@link unreg/1}, but specifies another
  1161. %% process as the holder of the registration. An exception is raised if the
  1162. %% name or property is not registered to the given process.
  1163. %% @end
  1164. unreg_other(Key, Pid) ->
  1165. ?CATCH_GPROC_ERROR(unreg_other1(Key, Pid), [Key, Pid]).
  1166. unreg_other1({_,g,_} = Key, Pid) ->
  1167. ?CHK_DIST,
  1168. gproc_dist:unreg_other(Key, Pid);
  1169. unreg_other1({T,l,_} = Key, Pid) when is_pid(Pid) ->
  1170. if T==n; T==a; T==r; T==rc ->
  1171. call({unreg_other, Key, Pid});
  1172. true ->
  1173. ?THROW_GPROC_ERROR(badarg)
  1174. end.
  1175. %% @spec (Key::key(), Props::[{atom(), any()}]) -> true
  1176. %%
  1177. %% @doc Add/modify `{Key, Value}' attributes associated with a registration.
  1178. %%
  1179. %% Gproc registration objects can have `{Key, Value}' attributes associated with
  1180. %% them. These are stored in a way that doesn't affect the cost of name lookup.
  1181. %%
  1182. %% Attributs can be retrieved using `gproc:get_attribute/3' or
  1183. %% `gproc:get_attributes/2'.
  1184. %% @end
  1185. set_attributes(Key, Props) ->
  1186. ?CATCH_GPROC_ERROR(set_attributes1(Key, Props), [Key, Props]).
  1187. set_attributes1(Key, Props) ->
  1188. case Key of
  1189. {_, g, _} ->
  1190. ?CHK_DIST,
  1191. gproc_dist:set_attributes(Key, Props);
  1192. {_, l, _} ->
  1193. call({set_attributes, Key, Props})
  1194. end.
  1195. %% @spec (Key:: key()) -> true
  1196. %%
  1197. %% @doc Unregister a shared resource.
  1198. %% @end
  1199. unreg_shared(Key) ->
  1200. ?CATCH_GPROC_ERROR(unreg_shared1(Key), [Key]).
  1201. %% @private
  1202. unreg_shared1(Key) ->
  1203. case Key of
  1204. {_, g, _} ->
  1205. ?CHK_DIST,
  1206. gproc_dist:unreg_shared(Key);
  1207. {T, l, _} when T == c;
  1208. T == a;
  1209. T == p;
  1210. T == rc -> call({unreg_shared, Key});
  1211. _ ->
  1212. ?THROW_GPROC_ERROR(badarg)
  1213. end.
  1214. %% @spec (Key::key(), Props::[{K,V}]) -> true
  1215. %% @doc Add/modify `{Key, Value}' attributes associated with a shared registration.
  1216. %%
  1217. %% Gproc registration objects can have `{Key, Value}' attributes associated with
  1218. %% them. These are stored in a way that doesn't affect the cost of name lookup.
  1219. %%
  1220. %% Attributes can be retrieved using `gproc:get_attribute/3' or
  1221. %% `gproc:get_attributes/2'.
  1222. %% @end
  1223. %%
  1224. set_attributes_shared(Key, Attrs) ->
  1225. ?CATCH_GPROC_ERROR(set_attributes_shared1(Key, Attrs), [Key, Attrs]).
  1226. set_attributes_shared1(Key, Attrs) ->
  1227. case Key of
  1228. {_, g, _} ->
  1229. ?CHK_DIST,
  1230. gproc_dist:set_attributes_shared(Key, Attrs);
  1231. {_, l, _} ->
  1232. call({set_attributes_shared, Key, Attrs})
  1233. end.
  1234. %% @spec (key(), pid()) -> yes | no
  1235. %%
  1236. %% @doc Behaviour support callback
  1237. %% @end
  1238. register_name({n,_,_} = Name, Pid) when Pid == self() ->
  1239. try reg(Name), yes
  1240. catch
  1241. error:_ ->
  1242. no
  1243. end.
  1244. %% @equiv unreg/1
  1245. unregister_name(Key) ->
  1246. unreg(Key).
  1247. %% @spec select(Arg) -> [Match] | {[Match], Continuation} | '$end_of_table'
  1248. %% where Arg = Continuation
  1249. %% | sel_pattern()
  1250. %% Match = {Key, Pid, Value}
  1251. %% @doc Perform a select operation on the process registry
  1252. %%
  1253. %% When Arg = Contination, resume a gproc:select/1 operation
  1254. %% (see {@link //stdlib/ets:select/1. ets:select/1}
  1255. %%
  1256. %% When Arg = {@type sel_pattern()}, this function executes a select operation,
  1257. %% emulating ets:select/1
  1258. %%
  1259. %% {@link select/2} offers the opportunity to narrow the search
  1260. %% (by limiting to only global or local scope, or a single type of object).
  1261. %% When only a pattern as single argument is given, both global and local scope,
  1262. %% as well as all types of object can be searched. Note that the pattern may
  1263. %% still limit the select operation so that scanning the entire table is avoided.
  1264. %%
  1265. %% The physical representation in the registry may differ from the above,
  1266. %% but the select patterns are transformed appropriately. The logical
  1267. %% representation for the gproc select operations is given by
  1268. %% {@type headpat()}.
  1269. %% @end
  1270. select({?TAB, _, _, _, _, _, _, _} = Continuation) ->
  1271. ets:select(Continuation);
  1272. select(Pat) ->
  1273. select(all, Pat).
  1274. %% @spec (Context::sel_context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
  1275. %%
  1276. %% @doc Perform a select operation with limited context on the process registry
  1277. %%
  1278. %% The physical representation in the registry may differ from the above,
  1279. %% but the select patterns are transformed appropriately.
  1280. %%
  1281. %% Note that limiting the context is just a convenience function, allowing you
  1282. %% to write a simpler select pattern and still avoid searching the entire
  1283. %% registry. Whenever variables are used in the head pattern, this will result
  1284. %% in a wider scan, even if the values are restricted through a guard (e.g.
  1285. %% <code>select([{'$1','$2','$3'}, [{'==', {element,1,'$1'}, p}], ...])</code>
  1286. %% will count as a wild pattern on the key and result in a full scan).
  1287. %% In this case, specifying a Context will allow gproc to perform some
  1288. %% variable substitution and ensure that the scan is limited.
  1289. %% @end
  1290. select(Context, Pat) ->
  1291. ets:select(?TAB, pattern(Pat, Context)).
  1292. %% @spec (Context::context(), Pat::sel_patten(), Limit::integer()) ->
  1293. %% {[Match],Continuation} | '$end_of_table'
  1294. %% @doc Like {@link select/2} but returns Limit objects at a time.
  1295. %%
  1296. %% See [http://www.erlang.org/doc/man/ets.html#select-3].
  1297. %% @end
  1298. select(Context, Pat, Limit) ->
  1299. ets:select(?TAB, pattern(Pat, Context), Limit).
  1300. %% @spec (sel_pattern()) -> list(sel_object())
  1301. %% @doc
  1302. %% @equiv select_count(all, Pat)
  1303. %% @end
  1304. select_count(Pat) ->
  1305. select_count(all, Pat).
  1306. %% @spec (Context::context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
  1307. %%
  1308. %% @doc Perform a select_count operation on the process registry.
  1309. %%
  1310. %% The physical representation in the registry may differ from the above,
  1311. %% but the select patterns are transformed appropriately.
  1312. %% @end
  1313. select_count(Context, Pat) ->
  1314. ets:select_count(?TAB, pattern(Pat, Context)).
  1315. %%% Local properties can be registered in the local process, since
  1316. %%% no other process can interfere.
  1317. %%%
  1318. local_reg({_,Scope,_} = Key, Value, As, Op) ->
  1319. case gproc_lib:insert_reg(Key, Value, self(), l) of
  1320. false ->
  1321. case ets:member(?TAB, {Key, self()}) of
  1322. true when Op == ensure ->
  1323. gproc_lib:do_set_value(Key, Value, self()),
  1324. set_attrs(As, Key, self()),
  1325. updated;
  1326. _ ->
  1327. ?THROW_GPROC_ERROR(badarg)
  1328. end;
  1329. true ->
  1330. monitor_me(),
  1331. if As =/= [] ->
  1332. gproc_lib:insert_attr(Key, As, self(), Scope),
  1333. regged_new(Op);
  1334. true ->
  1335. regged_new(Op)
  1336. end
  1337. end.
  1338. regged_new(reg ) -> true;
  1339. regged_new(ensure) -> new.
  1340. local_mreg(_, []) -> true;
  1341. local_mreg(T, [_|_] = KVL) ->
  1342. case gproc_lib:insert_many(T, l, KVL, self()) of
  1343. false -> ?THROW_GPROC_ERROR(badarg);
  1344. {true,_} -> monitor_me()
  1345. end.
  1346. local_munreg(T, L) when T==p; T==c ->
  1347. _ = [gproc_lib:remove_reg({T,l,K}, self(), unreg) || K <- L],
  1348. true.
  1349. %% @spec (Key :: key(), Value) -> true
  1350. %% @doc Sets the value of the registration given by Key
  1351. %%
  1352. %% Key is assumed to exist and belong to the calling process.
  1353. %% If it doesn't, this function will exit.
  1354. %%
  1355. %% Value can be any term, unless the object is a counter, in which case
  1356. %% it must be an integer.
  1357. %% @end
  1358. %%
  1359. set_value(Key, Value) ->
  1360. ?CATCH_GPROC_ERROR(set_value1(Key, Value), [Key, Value]).
  1361. %% @spec (Key :: key(), Value) -> true
  1362. %% @doc Sets the value of the shared registration given by Key
  1363. %%
  1364. %% Key is assumed to exist as a shared entity.
  1365. %% If it doesn't, this function will exit.
  1366. %%
  1367. %% Value can be any term, unless the object is a counter, in which case
  1368. %% it must be an integer.
  1369. %% @end
  1370. %%
  1371. set_value_shared({T,_,_} = Key, Value) when T == c;
  1372. T == a;
  1373. T == p;
  1374. T == r ->
  1375. ?CATCH_GPROC_ERROR(set_value_shared1(Key, Value), [Key, Value]).
  1376. set_value1({_,g,_} = Key, Value) ->
  1377. ?CHK_DIST,
  1378. gproc_dist:set_value(Key, Value);
  1379. set_value1({a,l,_} = Key, Value) when is_integer(Value) ->
  1380. call({set, Key, Value});
  1381. set_value1({n,l,_} = Key, Value) ->
  1382. %% we cannot do this locally, since we have to check that the object
  1383. %% exists first - not an atomic update.
  1384. call({set, Key, Value});
  1385. set_value1({p,l,_} = Key, Value) ->
  1386. %% we _can_ to this locally, since there is no race condition - no
  1387. %% other process can update our properties.
  1388. case gproc_lib:do_set_value(Key, Value, self()) of
  1389. true -> true;
  1390. false ->
  1391. erlang:error(badarg)
  1392. end;
  1393. set_value1({c,l,_} = Key, Value) when is_integer(Value) ->
  1394. gproc_lib:do_set_counter_value(Key, Value, self());
  1395. set_value1(_, _) ->
  1396. ?THROW_GPROC_ERROR(badarg).
  1397. set_value_shared1({_,g,_} = Key, Value) ->
  1398. ?CHK_DIST,
  1399. gproc_dist:set_value_shared(Key, Value);
  1400. set_value_shared1({_,l,_} = Key, Value) ->
  1401. call({set_shared, Key, Value}).
  1402. %% @spec (Key) -> Value
  1403. %% @doc Reads the value stored with a key registered to the current process.
  1404. %%
  1405. %% If no such key is registered to the current process, this function exits.
  1406. %% @end
  1407. get_value(Key) ->
  1408. ?CATCH_GPROC_ERROR(get_value1(Key, self()), [Key]).
  1409. %% @spec (Key) -> Value
  1410. %% @doc Reads the value stored with a shared key.
  1411. %%
  1412. %% If no such shared key is registered, this function exits.
  1413. %% @end
  1414. get_value_shared(Key) ->
  1415. ?CATCH_GPROC_ERROR(get_value1(Key, shared), [Key]).
  1416. %% @spec (Key, Pid) -> Value
  1417. %% @doc Reads the value stored with a key registered to the process Pid.
  1418. %%
  1419. %% If `Pid == shared', the value of a shared key (see {@link reg_shared/1})
  1420. %% will be read.
  1421. %% @end
  1422. %%
  1423. get_value(Key, Pid) ->
  1424. ?CATCH_GPROC_ERROR(get_value1(Key, Pid), [Key, Pid]).
  1425. get_value1({T,_,_} = Key, Pid) when is_pid(Pid) ->
  1426. if T==n; T==a; T==rc ->
  1427. case ets:lookup(?TAB, {Key, T}) of
  1428. [{_, P, Value}] when P == Pid -> Value;
  1429. _ -> ?THROW_GPROC_ERROR(badarg)
  1430. end;
  1431. true ->
  1432. ets:lookup_element(?TAB, {Key, Pid}, 3)
  1433. end;
  1434. get_value1({T,_,_} = K, shared) when T==c; T==a; T==p; T==r ->
  1435. Key = case T of
  1436. c -> {K, shared};
  1437. p -> {K, shared};
  1438. r -> {K, shared};
  1439. a -> {K, a};
  1440. rc -> {K, rc}
  1441. end,
  1442. case ets:lookup(?TAB, Key) of
  1443. [{_, shared, Value}] -> Value;
  1444. _ -> ?THROW_GPROC_ERROR(badarg)
  1445. end;
  1446. get_value1(_, _) ->
  1447. ?THROW_GPROC_ERROR(badarg).
  1448. %% @spec (Key, Attribute::atom()) -> Value
  1449. %% @doc Get attribute value of `Attr' associated with `Key' for most likely Pid.
  1450. %%
  1451. %% The most likely Pid in this case is `self()' for properties and counters,
  1452. %% and the current registration holder in case of names or aggregated counters.
  1453. %% An exception is raised if `Key' is not registered for the given process.
  1454. %% @end
  1455. get_attribute(Key, A) ->
  1456. Pid = case Key of
  1457. {T,_,_} when T==n; T==a; T==rc ->
  1458. where(Key);
  1459. {T,_,_} when T==p; T==c; T==r ->
  1460. self()
  1461. end,
  1462. ?CATCH_GPROC_ERROR(get_attribute1(Key, Pid, A), [Key, A]).
  1463. %% @spec (Key, Pid::pid() | shared, Attr::atom()) -> Value
  1464. %% @doc Get the attribute value of `Attr' associated with `Key' for process Pid.
  1465. %%
  1466. %% If `Pid == shared', the attribute of a shared key (see {@link reg_shared/1})
  1467. %% will be read.
  1468. %% @end
  1469. %%
  1470. get_attribute(Key, Pid, A) ->
  1471. ?CATCH_GPROC_ERROR(get_attribute1(Key, Pid, A), [Key, Pid, A]).
  1472. %% @spec (Key, Attr::atom()) -> Value
  1473. %% @doc Get the attribute value of `Attr' associated with the shared `Key'.
  1474. %%
  1475. %% Equivalent to `get_attribute(Key, shared, Attr)'
  1476. %% (see {@link get_attribute/3}).
  1477. %% @end
  1478. get_attribute_shared(Key, Attr) ->
  1479. ?CATCH_GPROC_ERROR(get_attribute1(Key, shared, Attr), [Key, Attr]).
  1480. %% @private
  1481. get_attribute1({_,_,_} = Key, Pid, A) when is_pid(Pid); Pid==shared ->
  1482. case ets:lookup(?TAB, {Pid, Key}) of
  1483. [{_, Attrs}] ->
  1484. case lists:keyfind(attrs, 1, Attrs) of
  1485. false -> undefined;
  1486. {_, As} ->
  1487. case lists:keyfind(A, 1, As) of
  1488. false -> undefined;
  1489. {_, V} -> V
  1490. end
  1491. end;
  1492. _ -> ?THROW_GPROC_ERROR(badarg)
  1493. end;
  1494. get_attribute1(_, _, _) ->
  1495. ?THROW_GPROC_ERROR(badarg).
  1496. %% @spec get_attributes(Key::key()) -> [{K, V}]
  1497. %% @doc Get attributes associated with registration.
  1498. %% @equiv get_attributes(Key, self())
  1499. %%
  1500. get_attributes(Key) ->
  1501. ?CATCH_GPROC_ERROR(get_attributes1(Key, self()), [Key]).
  1502. %% @spec (Key::key(), Pid::pid() | shared) -> [{K, V}]
  1503. %%
  1504. %% @doc Returns the list of attributes associated with the registration.
  1505. %%
  1506. %% This function raises a `badarg' exception if there is no corresponding
  1507. %% registration.
  1508. %%
  1509. get_attributes(Key, Pid) ->
  1510. ?CATCH_GPROC_ERROR(get_attributes1(Key, Pid), [Key, Pid]).
  1511. get_attributes1({_,_,_} = Key, Pid) when is_pid(Pid); Pid==shared ->
  1512. case ets:lookup(?TAB, {Pid, Key}) of
  1513. [{_, Attrs}] ->
  1514. case lists:keyfind(attrs, 1, Attrs) of
  1515. false -> [];
  1516. {_, As} -> As
  1517. end;
  1518. _ -> ?THROW_GPROC_ERROR(badarg)
  1519. end;
  1520. get_attributes1(_, _) ->
  1521. ?THROW_GPROC_ERROR(badarg).
  1522. %% @spec (Key) -> Pid
  1523. %% @doc Lookup the Pid stored with a key.
  1524. %%
  1525. %% This function raises a `badarg' exception if `Key' is not registered.
  1526. %% @end
  1527. lookup_pid({_T,_,_} = Key) ->
  1528. case where(Key) of
  1529. undefined -> erlang:error(badarg);
  1530. P -> P
  1531. end.
  1532. %% @spec (Key) -> Value
  1533. %% @doc Lookup the value stored with a key.
  1534. %%
  1535. %% This function raises a `badarg' exception if `Key' is not registered.
  1536. %% @end
  1537. lookup_value({T,_,_} = Key) ->
  1538. if T==n orelse T==a orelse T==rc ->
  1539. ets:lookup_element(?TAB, {Key,T}, 3);
  1540. true ->
  1541. erlang:error(badarg)
  1542. end.
  1543. %% @spec (Key::key()) -> pid() | undefined
  1544. %%
  1545. %% @doc Returns the pid registered as Key
  1546. %%
  1547. %% The type of registration must be either name or aggregated counter.
  1548. %% Otherwise this function will raise a `badarg' exception.
  1549. %% Use {@link lookup_pids/1} in these cases.
  1550. %% @end
  1551. %%
  1552. where(Key) ->
  1553. ?CATCH_GPROC_ERROR(where1(Key), [Key]).
  1554. where1({T,_,_}=Key) ->
  1555. if T==n orelse T==a orelse T==rc ->
  1556. case ets:lookup(?TAB, {Key,T}) of
  1557. [{_, P, _Value}] ->
  1558. case my_is_process_alive(P) of
  1559. true -> P;
  1560. false ->
  1561. undefined
  1562. end;
  1563. _ -> % may be [] or [{Key,Waiters}]
  1564. undefined
  1565. end;
  1566. true ->
  1567. ?THROW_GPROC_ERROR(badarg)
  1568. end.
  1569. %% @equiv where/1
  1570. whereis_name(Key) ->
  1571. ?CATCH_GPROC_ERROR(where1(Key), [Key]).
  1572. %% @spec (Key::key()) -> [pid()]
  1573. %%
  1574. %% @doc Returns a list of pids with the published key Key
  1575. %%
  1576. %% If the type of registration is either name or aggregated counter,
  1577. %% this function will return either an empty list, or a list of one pid.
  1578. %% For non-unique types, the return value can be a list of any length.
  1579. %%
  1580. %% Note: shared resources are not associated with any pid, and will
  1581. %% therefore be excluded.
  1582. %% @end
  1583. %%
  1584. lookup_pids({T,_,_} = Key) ->
  1585. L = if T==n orelse T==a orelse T==rc ->
  1586. ets:select(?TAB, [{{{Key,T}, '$1', '_'},
  1587. [{is_pid, '$1'}], ['$1']}]);
  1588. true ->
  1589. ets:select(?TAB, [{{{Key,'_'}, '$1', '_'},
  1590. [{is_pid, '$1'}], ['$1']}])
  1591. end,
  1592. [P || P <- L, my_is_process_alive(P)].
  1593. %% @spec (pid()) -> boolean()
  1594. %%
  1595. my_is_process_alive(P) when node(P) =:= node() ->
  1596. is_process_alive(P);
  1597. my_is_process_alive(_) ->
  1598. %% remote pid - assume true (too costly to find out)
  1599. true.
  1600. %% @spec (Key::key()) -> [{pid(), Value}]
  1601. %%
  1602. %% @doc Retrieve the `{Pid,Value}' pairs corresponding to Key.
  1603. %%
  1604. %% Key refer to any type of registry object. If it refers to a unique
  1605. %% object, the list will be of length 0 or 1. If it refers to a non-unique
  1606. %% object, the return value can be a list of any length.
  1607. %% @end
  1608. %%
  1609. lookup_values({T,_,_} = Key) ->
  1610. L = if T==n orelse T==a orelse T==rc ->
  1611. ets:select(?TAB, [{{{Key,T}, '$1', '$2'},[],[{{'$1','$2'}}]}]);
  1612. true ->
  1613. ets:select(?TAB, [{{{Key,'_'}, '$1', '$2'},[],[{{'$1','$2'}}]}])
  1614. end,
  1615. [Pair || {P,_} = Pair <- L, my_is_process_alive(P)].
  1616. %% @ spec (Key::key(), Incr) -> integer() | [integer()]
  1617. %% Incr = IncrVal | UpdateOp | [UpdateOp]
  1618. %% UpdateOp = IncrVal | {IncrVal, Threshold, SetValue}
  1619. %% IncrVal = integer()
  1620. %%
  1621. %% @doc Updates the counter registered as Key for the current process.
  1622. %%
  1623. %% This function works almost exactly like ets:update_counter/3
  1624. %% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
  1625. %% will fail if the type of object referred to by Key is not a counter or
  1626. %% a unique name (update_counter/2 can be performed on names as well, but they
  1627. %% do not count as counter objects, and do not affect aggregated counters).
  1628. %%
  1629. %% Aggregated counters with the same name will be updated automatically.
  1630. %% The `UpdateOp' patterns are the same as for `ets:update_counter/3', except
  1631. %% that the position is omitted; in gproc, the value position is always `3'.
  1632. %%
  1633. %% If `Key' refers to a unique name, the operation will depend on the value
  1634. %% part of the registration being an integer(). While non-integer values are
  1635. %% not permitted at all for counter objects, it is the user's responsibility to
  1636. %% ensure that a name, on which `update_counter/2' is to be performed, has the
  1637. %% appropriate value type.
  1638. %% @end
  1639. %%
  1640. -spec update_counter(key(), increment()) -> integer().
  1641. update_counter(Key, Incr) ->
  1642. Pid = case Key of
  1643. {n,_,_} -> n;
  1644. {c,_,_} -> self()
  1645. end,
  1646. ?CATCH_GPROC_ERROR(update_counter1(Key, Pid, Incr), [Key, Incr]).
  1647. update_counter(Key, Pid, Incr) when is_pid(Pid);
  1648. Pid == shared; Pid == n ->
  1649. ?CATCH_GPROC_ERROR(update_counter1(Key, Pid, Incr), [Key, Pid, Incr]).
  1650. update_counter1({T,l,_} = Key, Pid, Incr) when T==c; T==n ->
  1651. gproc_lib:update_counter(Key, Incr, Pid);
  1652. update_counter1({T,g,_} = Key, Pid, Incr) when T==c; T==n ->
  1653. ?CHK_DIST,
  1654. gproc_dist:update_counter(Key, Pid, Incr);
  1655. update_counter1(_, _, _) ->
  1656. ?THROW_GPROC_ERROR(badarg).
  1657. %% @doc Update a list of counters
  1658. %%
  1659. %% This function is not atomic, except (in a sense) for global counters. For local counters,
  1660. %% it is more of a convenience function. For global counters, it is much more efficient
  1661. %% than calling `gproc:update_counter/2' for each individual counter.
  1662. %%
  1663. %% The return value is the corresponding list of `[{Counter, Pid, NewValue}]'.
  1664. %% @end
  1665. -spec update_counters(scope(), [{key(), pid(), increment()}]) ->
  1666. [{key(), pid(), integer()}].
  1667. update_counters(_, []) ->
  1668. [];
  1669. update_counters(l, [_|_] = Cs) ->
  1670. ?CATCH_GPROC_ERROR(update_counters1(Cs), [Cs]);
  1671. update_counters(g, [_|_] = Cs) ->
  1672. ?CHK_DIST,
  1673. gproc_dist:update_counters(Cs).
  1674. update_counters1([{{c,l,_} = Key, Pid, Incr}|T]) ->
  1675. [{Key, Pid, gproc_lib:update_counter(Key, Incr, Pid)}|update_counters1(T)];
  1676. update_counters1([]) ->
  1677. [];
  1678. update_counters1(_) ->
  1679. ?THROW_GPROC_ERROR(badarg).
  1680. %% @spec (Key) -> {ValueBefore, ValueAfter}
  1681. %% Key = {c, Scope, Name} | {n, Scope, Name}
  1682. %% Scope = l | g
  1683. %% ValueBefore = integer()
  1684. %% ValueAfter = integer()
  1685. %%
  1686. %% @doc Reads and resets a counter in a "thread-safe" way
  1687. %%
  1688. %% This function reads the current value of a counter and then resets it to its
  1689. %% initial value. The reset operation is done using {@link update_counter/2},
  1690. %% which allows for concurrent calls to {@link update_counter/2} without losing
  1691. %% updates. Aggregated counters are updated accordingly.
  1692. %%
  1693. %% If `Key' refers to a unique name, the operation will depend on the value
  1694. %% part of the registration being an integer(). While non-integer values are
  1695. %% not permitted at all for counter objects, it is the user's responsibility to
  1696. %% ensure that a name, on which `reset_counter/1' is to be performed, has the
  1697. %% appropriate value type.
  1698. %% @end
  1699. %%
  1700. reset_counter(Key) ->
  1701. ?CATCH_GPROC_ERROR(reset_counter1(Key), [Key]).
  1702. reset_counter1({T,g,_} = Key) when T==c; T==n ->
  1703. ?CHK_DIST,
  1704. gproc_dist:reset_counter(Key);
  1705. reset_counter1({n,l,_} = Key) ->
  1706. [{_, Pid, Current}] = ets:lookup(?TAB, {Key, n}),
  1707. {Current, update_counter(Key, get_initial(Pid, Key) - Current)};
  1708. reset_counter1({c,l,_} = Key) ->
  1709. Current = ets:lookup_element(?TAB, {Key, self()}, 3),
  1710. {Current, update_counter(Key, get_initial(self(), Key) - Current)}.
  1711. get_initial(Pid, Key) ->
  1712. case ets:lookup(?TAB, {Pid, Key}) of
  1713. [{_, r}] -> 0;
  1714. [{_, Opts}] ->
  1715. proplists:get_value(initial, Opts, 0)
  1716. end.
  1717. %% @spec (Key::key(), Incr) -> integer() | [integer()]
  1718. %% Incr = IncrVal | UpdateOp | [UpdateOp]
  1719. %% UpdateOp = IncrVal | {IncrVal, Threshold, SetValue}
  1720. %% IncrVal = integer()
  1721. %%
  1722. %% @doc Updates the shared counter registered as Key.
  1723. %%
  1724. %% This function works almost exactly like ets:update_counter/3
  1725. %% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
  1726. %% will fail if the type of object referred to by Key is not a counter.
  1727. %%
  1728. %% Aggregated counters with the same name will be updated automatically.
  1729. %% The `UpdateOp' patterns are the same as for `ets:update_counter/3', except
  1730. %% that the position is omitted; in gproc, the value position is always `3'.
  1731. %% @end
  1732. %%
  1733. update_shared_counter(Key, Incr) ->
  1734. ?CATCH_GPROC_ERROR(update_shared_counter1(Key, Incr), [Key, Incr]).
  1735. update_shared_counter1({c,g,_} = Key, Incr) ->
  1736. ?CHK_DIST,
  1737. gproc_dist:update_shared_counter(Key, Incr);
  1738. update_shared_counter1({c,l,_} = Key, Incr) ->
  1739. gproc_lib:update_counter(Key, Incr, shared).
  1740. %% @spec (From::key(), To::pid() | key()) -> undefined | pid()
  1741. %%
  1742. %% @doc Atomically transfers the key `From' to the process identified by `To'.
  1743. %%
  1744. %% This function transfers any gproc key (name, property, counter, aggr counter)
  1745. %% from one process to another, and returns the pid of the new owner.
  1746. %%
  1747. %% `To' must be either a pid or a unique name (name or aggregated counter), but
  1748. %% does not necessarily have to resolve to an existing process. If there is
  1749. %% no process registered with the `To' key, `give_away/2' returns `undefined',
  1750. %% and the `From' key is effectively unregistered.
  1751. %%
  1752. %% It is allowed to give away a key to oneself, but of course, this operation
  1753. %% will have no effect.
  1754. %%
  1755. %% Fails with `badarg' if the calling process does not have a `From' key
  1756. %% registered.
  1757. %% @end
  1758. give_away(Key, ToPid) ->
  1759. ?CATCH_GPROC_ERROR(give_away1(Key, ToPid), [Key, ToPid]).
  1760. give_away1({_,l,_} = Key, ToPid) when is_pid(ToPid), node(ToPid) == node() ->
  1761. call({give_away, Key, ToPid});
  1762. give_away1({_,l,_} = Key, {n,l,_} = ToKey) ->
  1763. call({give_away, Key, ToKey});
  1764. give_away1({_,g,_} = Key, To) ->
  1765. ?CHK_DIST,
  1766. gproc_dist:give_away(Key, To).
  1767. %% @spec () -> ok
  1768. %%
  1769. %% @doc Unregister all items of the calling process and inform gproc
  1770. %% to forget about the calling process.
  1771. %%
  1772. %% This function is more efficient than letting gproc perform these
  1773. %% cleanup operations.
  1774. %% @end
  1775. goodbye() ->
  1776. process_is_down(self()).
  1777. %% @spec (Key::process() | key(), Msg::any()) -> Msg
  1778. %%
  1779. %% @doc Sends a message to the process, or processes, corresponding to Key.
  1780. %%
  1781. %% If Key belongs to a unique object (name or aggregated counter), this
  1782. %% function will send a message to the corresponding process, or fail if there
  1783. %% is no such process. If Key is for a non-unique object type (counter or
  1784. %% property), Msg will be send to all processes that have such an object.
  1785. %%
  1786. %% Key can also be anything that the erlang:send/2, or '!' operator accepts as a process
  1787. %% identifier, namely a pid(), an atom(), or `{Name::atom(), Node::atom()}'.
  1788. %% @end
  1789. %%
  1790. send(P, Msg) when is_pid(P); is_atom(P) ->
  1791. P ! Msg;
  1792. send({Name, Node} = P, Msg) when is_atom(Node), is_atom(Name) ->
  1793. P ! Msg;
  1794. send(Key, Msg) ->
  1795. ?CATCH_GPROC_ERROR(send1(Key, Msg), [Key, Msg]).
  1796. send1({T,C,_} = Key, Msg) when C==l; C==g ->
  1797. if T == n orelse T == a orelse T == rc ->
  1798. case ets:lookup(?TAB, {Key, T}) of
  1799. [{_, Pid, _}] ->
  1800. Pid ! Msg;
  1801. _ ->
  1802. ?THROW_GPROC_ERROR(badarg)
  1803. end;
  1804. T==p orelse T==c orelse T==r ->
  1805. %% BUG - if the key part contains select wildcards, we may end up
  1806. %% sending multiple messages to the same pid
  1807. lists:foreach(fun(Pid) ->
  1808. Pid ! Msg
  1809. end, lookup_pids(Key)),
  1810. Msg;
  1811. true ->
  1812. erlang:error(badarg)
  1813. end;
  1814. send1(_, _) ->
  1815. ?THROW_GPROC_ERROR(badarg).
  1816. %% @spec (Key::key(), Msg::any()) -> Msg
  1817. %%
  1818. %% @equiv bcast(nodes(), Key, Msg)
  1819. %% @end
  1820. %%
  1821. bcast(Key, Msg) ->
  1822. bcast(nodes(), Key, Msg).
  1823. %% @spec (Nodes::[atom()], Key::key(), Msg::any()) -> Msg
  1824. %%
  1825. %% @doc Sends a message to processes corresponding to Key on Nodes.
  1826. %%
  1827. %% This function complements `send/2' and works on locally registered resources
  1828. %% that `send/2' supports. Messages are routed via a special broadcast server
  1829. %% on each node to ensure that ordering is preserved. Distributed delivery
  1830. %% is asynchronous and carries the same guarantees as normal message passing
  1831. %% (with the added proviso that the broadcast server also needs to be available).
  1832. %% @see send/2
  1833. %% @end
  1834. %%
  1835. bcast(Ns, Key, Msg) ->
  1836. ?CATCH_GPROC_ERROR(bcast1(Ns, Key, Msg), [Key, Msg]).
  1837. bcast1(Ns, {T,l,_} = Key, Msg) when T==p; T==a; T==c; T==n; T==r; T==rc ->
  1838. send1(Key, Msg),
  1839. gen_server:abcast(Ns -- [node()], gproc_bcast, {send, Key, Msg}),
  1840. Msg.
  1841. %% @spec (Context :: context()) -> key() | '$end_of_table'
  1842. %%
  1843. %% @doc Behaves as ets:first(Tab) for a given type of registration.
  1844. %%
  1845. %% See [http://www.erlang.org/doc/man/ets.html#first-1].
  1846. %% The registry behaves as an ordered_set table.
  1847. %% @end
  1848. %%
  1849. first(Context) ->
  1850. {S, T} = get_s_t(Context),
  1851. {HeadPat,_} = headpat({S, T}, '_', '_', '_'),
  1852. case ets:select(?TAB, [{HeadPat,[],[{element,1,'$_'}]}], 1) of
  1853. {[First], _} ->
  1854. First;
  1855. _ ->
  1856. '$end_of_table'
  1857. end.
  1858. %% @spec (Context :: context()) -> key() | '$end_of_table'
  1859. %%
  1860. %% @doc Behaves as ets:last(Tab) for a given type of registration.
  1861. %%
  1862. %% See [http://www.erlang.org/doc/man/ets.html#last-1].
  1863. %% The registry behaves as an ordered_set table.
  1864. %% @end
  1865. %%
  1866. last(Context) ->
  1867. {S, T} = get_s_t(Context),
  1868. S1 = if S == '_'; S == l -> m; % 'm' comes after 'l'
  1869. S == g -> h % 'h' comes between 'g' & 'l'
  1870. end,
  1871. Beyond = {{T,S1,[]},[]},
  1872. step(ets:prev(?TAB, Beyond), S, T).
  1873. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  1874. %%
  1875. %% @doc Behaves as ets:next(Tab,Key) for a given type of registration.
  1876. %%
  1877. %% See [http://www.erlang.org/doc/man/ets.html#next-2].
  1878. %% The registry behaves as an ordered_set table.
  1879. %% @end
  1880. %%
  1881. next(Context, K) ->
  1882. {S,T} = get_s_t(Context),
  1883. {Prev,Unwrap} =
  1884. case K of
  1885. {{_,_,_},_} ->
  1886. {K, false};
  1887. {_,_,_} ->
  1888. {{K,[]}, true} % [] is higher than pid(), shared, p, c...
  1889. end,
  1890. unwrap(Unwrap, step(ets:next(?TAB,Prev), S, T)).
  1891. unwrap(true, {{_,_,_} = R,_}) ->
  1892. R;
  1893. unwrap(_, R) ->
  1894. R.
  1895. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  1896. %%
  1897. %% @doc Behaves as ets:prev(Tab,Key) for a given type of registration.
  1898. %%
  1899. %% See [http://www.erlang.org/doc/man/ets.html#prev-2].
  1900. %% The registry behaves as an ordered_set table.
  1901. %% @end
  1902. %%
  1903. prev(Context, K) ->
  1904. {S, T} = get_s_t(Context),
  1905. {Prev, Unwrap} =
  1906. case K of
  1907. {{_,_,_},_} -> {K, false};
  1908. {_,_,_} ->
  1909. {{K,1}, true}
  1910. end,
  1911. unwrap(Unwrap, step(ets:prev(?TAB, Prev), S, T)).
  1912. step(Key, '_', '_') ->
  1913. case Key of
  1914. {{_,_,_},_} -> Key;
  1915. _ -> '$end_of_table'
  1916. end;
  1917. step(Key, '_', T) ->
  1918. case Key of
  1919. {{T,_,_},_} -> Key;
  1920. _ -> '$end_of_table'
  1921. end;
  1922. step(Key, S, '_') ->
  1923. case Key of
  1924. {{_, S, _}, _} -> Key;
  1925. _ -> '$end_of_table'
  1926. end;
  1927. step(Key, S, T) ->
  1928. case Key of
  1929. {{T, S, _}, _} -> Key;
  1930. _ -> '$end_of_table'
  1931. end.
  1932. %% @spec (Pid::pid()) -> ProcessInfo
  1933. %% ProcessInfo = [{gproc, [{Key,Value}]} | ProcessInfo]
  1934. %%
  1935. %% @doc Similar to `process_info(Pid)' but with additional gproc info.
  1936. %%
  1937. %% Returns the same information as process_info(Pid), but with the
  1938. %% addition of a `gproc' information item, containing the `{Key,Value}'
  1939. %% pairs registered to the process.
  1940. %% @end
  1941. info(Pid) when is_pid(Pid) ->
  1942. Items = [?MODULE | [ I || {I,_} <- process_info(self())]],
  1943. [info(Pid,I) || I <- Items].
  1944. %% @spec (Pid::pid(), Item::atom()) -> {Item, Info}
  1945. %%
  1946. %% @doc Similar to process_info(Pid, Item), but with additional gproc info.
  1947. %%
  1948. %% For `Item = gproc', this function returns a list of `{Key, Value}' pairs
  1949. %% registered to the process Pid. For other values of Item, it returns the
  1950. %% same as [http://www.erlang.org/doc/man/erlang.html#process_info-2].
  1951. %% @end
  1952. info(Pid, gproc) ->
  1953. gproc_info(Pid, '_');
  1954. info(Pid, {gproc, Pat}) ->
  1955. gproc_info(Pid, Pat);
  1956. info(Pid, current_function) ->
  1957. {_, T} = process_info(Pid, backtrace),
  1958. info_cur_f(T, process_info(Pid, current_function));
  1959. info(Pid, I) ->
  1960. process_info(Pid, I).
  1961. %% We don't want to return the internal gproc:info() function as current
  1962. %% function, so we grab the 'backtrace' and extract the call stack from it,
  1963. %% filtering out the functions gproc:info/_ and gproc:'-info/1-lc...' entries.
  1964. %%
  1965. %% This is really an indication that wrapping the process_info() BIF was a
  1966. %% bad idea to begin with... :P
  1967. %%
  1968. info_cur_f(T, Default) ->
  1969. {match, Matches} = re:run(T,<<"\\(([^\\)]+):(.+)/([0-9]+)">>,
  1970. [global,{capture,[1,2,3],list}]),
  1971. case lists:dropwhile(fun(["gproc","info",_]) -> true;
  1972. (["gproc","'-info/1-lc" ++ _, _]) -> true;
  1973. (_) -> false
  1974. end, Matches) of
  1975. [] ->
  1976. Default;
  1977. [[M,F,A]|_] ->
  1978. {current_function,
  1979. {to_atom(M), to_atom(F), list_to_integer(A)}}
  1980. end.
  1981. to_atom(S) ->
  1982. case erl_scan:string(S) of
  1983. {ok, [{atom,_,A}|_],_} ->
  1984. A;
  1985. _ ->
  1986. list_to_atom(S)
  1987. end.
  1988. gproc_info(Pid, Pat) ->
  1989. Keys = ets:select(?TAB, [{ {{Pid,Pat}, '_'}, [], [{element,2,
  1990. {element,1,'$_'}}] }]),
  1991. {?MODULE, lists:zf(
  1992. fun(K) ->
  1993. try V = get_value(K, Pid),
  1994. {true, {K,V}}
  1995. catch
  1996. error:_ ->
  1997. false
  1998. end
  1999. end, Keys)}.
  2000. %% @spec () -> ok
  2001. %%
  2002. %% @doc Similar to the built-in shell command `i()' but inserts information
  2003. %% about names and properties registered in Gproc, where applicable.
  2004. %% @end
  2005. i() ->
  2006. gproc_info:i().
  2007. %%% ==========================================================
  2008. %% @hidden
  2009. handle_cast({monitor_me, Pid}, S) ->
  2010. erlang:monitor(process, Pid),
  2011. {noreply, S};
  2012. handle_cast({audit_process, Pid}, S) ->
  2013. case is_process_alive(Pid) of
  2014. false ->
  2015. process_is_down(Pid);
  2016. true ->
  2017. ignore
  2018. end,
  2019. {noreply, S};
  2020. handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
  2021. _ = case ets:lookup(?TAB, {Key,T}) of
  2022. [{_, Waiters}] ->
  2023. gproc_lib:remove_wait(Key, Pid, Ref, Waiters);
  2024. _ ->
  2025. ignore
  2026. end,
  2027. {noreply, S};
  2028. handle_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S) ->
  2029. _ = case ets:lookup(?TAB, {Key, T}) of
  2030. [{_, Waiters}] ->
  2031. gproc_lib:remove_wait(Key, Pid, all, Waiters);
  2032. [{_, OtherPid, _}] ->
  2033. gproc_lib:remove_monitors(Key, OtherPid, Pid);
  2034. _ ->
  2035. ok
  2036. end,
  2037. {noreply, S}.
  2038. %% @hidden
  2039. handle_call({reg, {_T,l,_} = Key, Val, Attrs, Op}, {Pid,_}, S) ->
  2040. handle_reg_call(Key, Pid, Val, Attrs, Op, S);
  2041. handle_call({reg_other, {_T,l,_} = Key, Pid, Val, Attrs, Op}, _, S) ->
  2042. handle_reg_call(Key, Pid, Val, Attrs, Op, S);
  2043. handle_call({set_attributes, {_,l,_} = Key, Attrs}, {Pid,_}, S) ->
  2044. case gproc_lib:insert_attr(Key, Attrs, Pid, l) of
  2045. false -> {reply, badarg, S};
  2046. L when is_list(L) ->
  2047. {reply, true, S}
  2048. end;
  2049. handle_call({set_attributes_shared, {_,l,_} = Key, Attrs}, _, S) ->
  2050. case gproc_lib:insert_attr(Key, Attrs, shared, l) of
  2051. false ->
  2052. {reply, badarg, S};
  2053. L when is_list(L) ->
  2054. {reply, true, S}
  2055. end;
  2056. handle_call({reg_or_locate, {T,l,_} = Key, Val, P}, _, S) ->
  2057. Reg = fun() ->
  2058. Pid = if is_function(P, 0) ->
  2059. spawn(P);
  2060. is_pid(P) ->
  2061. P
  2062. end,
  2063. true = gproc_lib:insert_reg(Key, Val, Pid, l),
  2064. _ = gproc_lib:ensure_monitor(Pid, l),
  2065. {reply, {Pid, Val}, S}
  2066. end,
  2067. case ets:lookup(?TAB, {Key, T}) of
  2068. [] ->
  2069. Reg();
  2070. [{_, _Waiters}] ->
  2071. Reg();
  2072. [{_, OtherPid, OtherValue}] ->
  2073. {reply, {OtherPid, OtherValue}, S}
  2074. end;
  2075. handle_call({monitor, {T,l,_} = Key, Pid, Type}, _From, S)
  2076. when T==n; T==a ->
  2077. Ref = make_ref(),
  2078. Lookup = ets:lookup(?TAB, {Key, T}),
  2079. IsRegged = is_regged(Lookup),
  2080. _ = case {IsRegged, Type} of
  2081. {false, info} ->
  2082. Pid ! {gproc, unreg, Ref, Key};
  2083. {false, follow} ->
  2084. Pid ! {gproc, unreg, Ref, Key},
  2085. _ = gproc_lib:ensure_monitor(Pid, l),
  2086. case Lookup of
  2087. [{K, Waiters}] ->
  2088. NewWaiters = [{Pid,Ref,follow}|Waiters],
  2089. ets:insert(?TAB, {K, NewWaiters}),
  2090. ets:insert_new(?TAB, {{Pid,Key}, []});
  2091. [] ->
  2092. ets:insert(?TAB, {{Key,T}, [{Pid,Ref,follow}]}),
  2093. ets:insert_new(?TAB, {{Pid,Key}, []})
  2094. end;
  2095. {false, standby} ->
  2096. Evt = {failover, Pid},
  2097. true = gproc_lib:insert_reg(Key, undefined, Pid, l, Evt),
  2098. Pid ! {gproc, Evt, Ref, Key},
  2099. _ = gproc_lib:ensure_monitor(Pid, l);
  2100. {true, _} ->
  2101. [{_, RegPid, _}] = Lookup,
  2102. _ = gproc_lib:ensure_monitor(Pid, l),
  2103. case ets:lookup(?TAB, {RegPid, Key}) of
  2104. [{K,r}] ->
  2105. ets:insert(?TAB, {K, [{monitor, [{Pid,Ref,Type}]}]}),
  2106. ets:insert_new(?TAB, {{Pid,Key}, []});
  2107. [{K, Opts}] ->
  2108. ets:insert(?TAB, {K, gproc_lib:add_monitor(
  2109. Opts, Pid, Ref, Type)}),
  2110. ets:insert_new(?TAB, {{Pid,Key}, []})
  2111. end
  2112. end,
  2113. {reply, Ref, S};
  2114. handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S)
  2115. when T==n; T==a; T==rc ->
  2116. _ = case ets:lookup(?TAB, {Key, T}) of
  2117. [] ->
  2118. ok; % be nice
  2119. [{_, Waiters}] ->
  2120. case lists:filter(fun({P, R, _}) ->
  2121. P =/= Pid orelse R =/= Ref
  2122. end, Waiters) of
  2123. [] ->
  2124. ets:delete(?TAB, {Pid, Key}),
  2125. ets:delete(?TAB, {Key, T});
  2126. NewWaiters ->
  2127. case lists:keymember(Pid, 1, NewWaiters) of
  2128. true ->
  2129. ok;
  2130. false ->
  2131. ets:delete(?TAB, {Pid, Key})
  2132. end,
  2133. ets:insert(?TAB, {{Key, T}, Waiters})
  2134. end;
  2135. [{_, RegPid, _}] ->
  2136. case ets:lookup(?TAB, {RegPid, Key}) of
  2137. [{_K,r}] ->
  2138. ok; % be nice
  2139. [{K, Opts}] ->
  2140. Opts1 = gproc_lib:remove_monitor(Opts, Pid, Ref),
  2141. ets:insert(?TAB, {K, Opts1}),
  2142. case gproc_lib:does_pid_monitor(Pid, Opts) of
  2143. true ->
  2144. ok;
  2145. false ->
  2146. ets:delete(?TAB, {Pid, Key})
  2147. end
  2148. end
  2149. end,
  2150. {reply, ok, S};
  2151. handle_call({reg_shared, {_T,l,_} = Key, Val, Attrs, Op}, _From, S) ->
  2152. case try_insert_reg(Key, Val, shared) of
  2153. true ->
  2154. _ = if Attrs =/= [] ->
  2155. gproc_lib:insert_attr(Key, Attrs, shared, l);
  2156. true -> true
  2157. end,
  2158. {reply, true, S};
  2159. already_registered when Op == ensure ->
  2160. case gproc_lib:do_set_value(Key, Val, shared) of
  2161. true ->
  2162. {reply, updated, S};
  2163. false ->
  2164. {reply, badarg, S}
  2165. end;
  2166. _ ->
  2167. {reply, badarg, S}
  2168. end;
  2169. handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
  2170. handle_unreg_call(Key, Pid, S);
  2171. handle_call({unreg_other, {_,l,_} = Key, Pid}, _, S) ->
  2172. handle_unreg_call(Key, Pid, S);
  2173. handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
  2174. _ = case ets:lookup(?TAB, {shared, Key}) of
  2175. [{_, r}] ->
  2176. _ = gproc_lib:remove_reg(Key, shared, unreg, []);
  2177. [{_, Opts}] ->
  2178. _ = gproc_lib:remove_reg(Key, shared, unreg, Opts);
  2179. [] ->
  2180. %% don't crash if shared key already unregged.
  2181. ok
  2182. end,
  2183. {reply, true, S};
  2184. handle_call({await, {_,l,_} = Key, Pid}, From, S) ->
  2185. %% Passing the pid explicitly is needed when leader_call is used,
  2186. %% since the Pid given as From in the leader is the local gen_leader
  2187. %% instance on the calling node.
  2188. case gproc_lib:await(Key, Pid, From) of
  2189. noreply ->
  2190. {noreply, S};
  2191. {reply, Reply, _} ->
  2192. {reply, Reply, S}
  2193. end;
  2194. handle_call({mreg, T, l, L}, {Pid,_}, S) ->
  2195. try gproc_lib:insert_many(T, l, L, Pid) of
  2196. {true,_} -> {reply, true, S};
  2197. false -> {reply, badarg, S}
  2198. catch
  2199. error:_ -> {reply, badarg, S}
  2200. end;
  2201. handle_call({munreg, T, l, L}, {Pid,_}, S) ->
  2202. _ = gproc_lib:remove_many(T, l, L, Pid),
  2203. {reply, true, S};
  2204. handle_call({set, {_,l,_} = Key, Value}, {Pid,_}, S) ->
  2205. case gproc_lib:do_set_value(Key, Value, Pid) of
  2206. true ->
  2207. {reply, true, S};
  2208. false ->
  2209. {reply, badarg, S}
  2210. end;
  2211. handle_call({set_shared, {_,l,_} = Key, Value}, {_,_}, S) ->
  2212. case gproc_lib:do_set_value(Key, Value, shared) of
  2213. true ->
  2214. {reply, true, S};
  2215. false ->
  2216. {reply, badarg, S}
  2217. end;
  2218. handle_call({audit_process, Pid}, _, S) ->
  2219. _ = case is_process_alive(Pid) of
  2220. false ->
  2221. process_is_down(Pid);
  2222. true ->
  2223. ignore
  2224. end,
  2225. {reply, ok, S};
  2226. handle_call({give_away, Key, To}, {Pid,_}, S) ->
  2227. Reply = do_give_away(Key, To, Pid),
  2228. {reply, Reply, S};
  2229. handle_call(_, _, S) ->
  2230. {reply, badarg, S}.
  2231. %% @hidden
  2232. handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
  2233. _ = process_is_down(Pid),
  2234. {noreply, S};
  2235. handle_info(_, S) ->
  2236. {noreply, S}.
  2237. %% @hidden
  2238. code_change(_FromVsn, S, _Extra) ->
  2239. %% We have changed local monitor markers from {Pid} to {Pid,l}.
  2240. _ = case ets:select(?TAB, [{{'$1'},[],['$1']}]) of
  2241. [] ->
  2242. ok;
  2243. Pids ->
  2244. ets:insert(?TAB, [{P,l} || P <- Pids]),
  2245. ets:select_delete(?TAB, [{{'_'},[],[true]}])
  2246. end,
  2247. {ok, S}.
  2248. %% @hidden
  2249. terminate(_Reason, _S) ->
  2250. ok.
  2251. %% handle_call body common to reg and reg_other.
  2252. %%
  2253. handle_reg_call(Key, Pid, Val, Attrs, Op, S) ->
  2254. case try_insert_reg(Key, Val, Pid) of
  2255. true ->
  2256. _ = gproc_lib:ensure_monitor(Pid,l),
  2257. _ = set_attrs(Attrs, Key, Pid),
  2258. {reply, regged_new(Op), S};
  2259. already_registered when Op == ensure ->
  2260. case gproc_lib:do_set_value(Key, Val, Pid) of
  2261. true ->
  2262. _ = set_attrs(Attrs, Key, Pid),
  2263. {reply, updated, S};
  2264. false ->
  2265. %% actually pretty bad, if it ever happens
  2266. {reply, badarg, S}
  2267. end;
  2268. false ->
  2269. {reply, badarg, S}
  2270. end.
  2271. set_attrs([], _, _) ->
  2272. true;
  2273. set_attrs([_|_] = Attrs, Key, Pid) ->
  2274. gproc_lib:insert_attr(Key, Attrs, Pid, l).
  2275. handle_unreg_call(Key, Pid, S) ->
  2276. case ets:lookup(?TAB, {Pid,Key}) of
  2277. [{_, r}] ->
  2278. _ = gproc_lib:remove_reg(Key, Pid, unreg, []),
  2279. {reply, true, S};
  2280. [{_, Opts}] when is_list(Opts) ->
  2281. _ = gproc_lib:remove_reg(Key, Pid, unreg, Opts),
  2282. {reply, true, S};
  2283. [] ->
  2284. {reply, badarg, S}
  2285. end.
  2286. call(Req) ->
  2287. call(Req, l).
  2288. call(Req, l) ->
  2289. chk_reply(gen_server:call(?MODULE, Req));
  2290. call(Req, g) ->
  2291. chk_reply(gproc_dist:leader_call(Req)).
  2292. call(N, Req, l) ->
  2293. chk_reply(gen_server:call({?MODULE, N}, Req));
  2294. call(undefined, Req, g) ->
  2295. %% we always call the leader
  2296. chk_reply(gproc_dist:leader_call(Req)).
  2297. chk_reply(Reply) ->
  2298. case Reply of
  2299. badarg -> ?THROW_GPROC_ERROR(badarg);
  2300. _ -> Reply
  2301. end.
  2302. cast(Msg) ->
  2303. cast(Msg, l).
  2304. cast(Msg, l) ->
  2305. gen_server:cast(?MODULE, Msg);
  2306. cast(Msg, g) ->
  2307. gproc_dist:leader_cast(Msg).
  2308. cast(N, Msg, l) ->
  2309. gen_server:cast({?MODULE, N}, Msg).
  2310. try_insert_reg({T,l,_} = Key, Val, Pid) ->
  2311. case gproc_lib:insert_reg(Key, Val, Pid, l) of
  2312. false ->
  2313. case ets:lookup(?TAB, {Key,T}) of
  2314. %% In this particular case, the lookup cannot result in
  2315. %% [{_, Waiters}], since the insert_reg/4 function would
  2316. %% have succeeded then.
  2317. [{_, Pid, _}] ->
  2318. already_registered;
  2319. [{_, OtherPid, _}] ->
  2320. case is_process_alive(OtherPid) of
  2321. true ->
  2322. false;
  2323. false ->
  2324. process_is_down(OtherPid), % may result in failover
  2325. try_insert_reg(Key, Val, Pid)
  2326. end;
  2327. [] ->
  2328. false
  2329. end;
  2330. true ->
  2331. true
  2332. end.
  2333. %% try_insert_shared({c,l,_} = Key, Val) ->
  2334. %% ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
  2335. %% try_insert_shared({a,l,_} = Key, Val) ->
  2336. %% ets:insert_new(?TAB, [{{Key, a}, shared, Val}, {{shared, Key}, []}]).
  2337. -spec audit_process(pid()) -> ok.
  2338. audit_process(Pid) when is_pid(Pid) ->
  2339. ok = gen_server:call(gproc, {audit_process, Pid}, infinity).
  2340. nb_audit_process(Pid) when is_pid(Pid) ->
  2341. ok = gen_server:cast(gproc, {audit_process, Pid}).
  2342. -spec process_is_down(pid()) -> ok.
  2343. process_is_down(Pid) when is_pid(Pid) ->
  2344. %% delete the monitor marker
  2345. %% io:fwrite(user, "process_is_down(~p) - ~p~n", [Pid,ets:tab2list(?TAB)]),
  2346. Marker = {Pid,l},
  2347. case ets:member(?TAB, Marker) of
  2348. false ->
  2349. ok;
  2350. true ->
  2351. Revs = ets:select(?TAB, [{{{Pid,'$1'}, '$2'},
  2352. [{'==',{element,2,'$1'},l}],
  2353. [{{'$1','$2'}}]}]),
  2354. lists:foreach(
  2355. fun({{n,l,_}=K, R}) ->
  2356. Key = {K,n},
  2357. case ets:lookup(?TAB, Key) of
  2358. [{_, Pid, V}] ->
  2359. ets:delete(?TAB, Key),
  2360. opt_notify(R, K, Pid, V);
  2361. [{_, Waiters}] ->
  2362. case [W || W <- Waiters,
  2363. element(1,W) =/= Pid] of
  2364. [] ->
  2365. ets:delete(?TAB, Key);
  2366. Waiters1 ->
  2367. ets:insert(?TAB, {Key, Waiters1})
  2368. end;
  2369. [{_, OtherPid, _}] when Pid =/= OtherPid ->
  2370. case ets:lookup(?TAB, {OtherPid, K}) of
  2371. [{RK, Opts}] when is_list(Opts) ->
  2372. Opts1 = gproc_lib:remove_monitor_pid(
  2373. Opts, Pid),
  2374. ets:insert(?TAB, {RK, Opts1});
  2375. _ ->
  2376. true
  2377. end;
  2378. [] ->
  2379. true
  2380. end;
  2381. ({{c,l,C} = K, _}) ->
  2382. Key = {K, Pid},
  2383. [{_, _, Value}] = ets:lookup(?TAB, Key),
  2384. ets:delete(?TAB, Key),
  2385. gproc_lib:update_aggr_counter(l, C, -Value);
  2386. ({{r,l,Rsrc} = K, _}) ->
  2387. Key = {K, Pid},
  2388. ets:delete(?TAB, Key),
  2389. gproc_lib:decrement_resource_count(l, Rsrc);
  2390. ({{rc,l,_} = K, R}) ->
  2391. remove_aggregate(rc, K, R, Pid);
  2392. ({{a,l,_} = K, R}) ->
  2393. remove_aggregate(a, K, R, Pid);
  2394. ({{p,_,_} = K, _}) ->
  2395. ets:delete(?TAB, {K, Pid})
  2396. end, Revs),
  2397. ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
  2398. ets:delete(?TAB, Marker),
  2399. ok
  2400. end.
  2401. remove_aggregate(T, K, R, Pid) ->
  2402. case ets:lookup(?TAB, {K,T}) of
  2403. [{_, Pid, V}] ->
  2404. ets:delete(?TAB, {K,T}),
  2405. opt_notify(R, K, Pid, V);
  2406. [{_, OtherPid, _}] when Pid =/= OtherPid ->
  2407. case ets:lookup(?TAB, {OtherPid, K}) of
  2408. [{RK, Opts}] when is_list(Opts) ->
  2409. Opts1 = gproc_lib:remove_monitor_pid(
  2410. Opts, Pid),
  2411. ets:insert(?TAB, {RK, Opts1});
  2412. _ ->
  2413. true
  2414. end;
  2415. [] ->
  2416. opt_notify(R, K, Pid, undefined)
  2417. end.
  2418. opt_notify(r, _, _, _) ->
  2419. ok;
  2420. opt_notify(Opts, {T,_,_} = Key, Pid, Value) ->
  2421. case gproc_lib:standbys(Opts) of
  2422. [] ->
  2423. keep_followers(Opts, Key),
  2424. gproc_lib:notify(unreg, Key, Opts);
  2425. SBs ->
  2426. case pick_standby(SBs) of
  2427. false ->
  2428. keep_followers(Opts, Key),
  2429. gproc_lib:notify(unreg, Key, Opts),
  2430. ok;
  2431. {ToPid, Ref} ->
  2432. ets:insert(?TAB, [{{Key,T}, ToPid, Value},
  2433. {{ToPid, Key},
  2434. gproc_lib:remove_monitor(
  2435. Opts, ToPid, Ref)}]),
  2436. _ = gproc_lib:remove_reverse_mapping(
  2437. {failover,ToPid}, Pid, Key),
  2438. _ = gproc_lib:ensure_monitor(ToPid, l),
  2439. ok
  2440. end
  2441. end.
  2442. keep_followers(Opts, {T,_,_} = Key) ->
  2443. case gproc_lib:followers(Opts) of
  2444. [] ->
  2445. ok;
  2446. [_|_] = F ->
  2447. ets:insert(?TAB, {{Key,T}, F})
  2448. end.
  2449. pick_standby([{Pid, Ref, standby}|T]) when node(Pid) =:= node() ->
  2450. case is_process_alive(Pid) of
  2451. true ->
  2452. {Pid, Ref};
  2453. false ->
  2454. pick_standby(T)
  2455. end;
  2456. pick_standby([_|T]) ->
  2457. pick_standby(T);
  2458. pick_standby([]) ->
  2459. false.
  2460. do_give_away({T,l,_} = K, To, Pid) when T==n; T==a; T==rc ->
  2461. Key = {K, T},
  2462. case ets:lookup(?TAB, Key) of
  2463. [{_, Pid, Value}] ->
  2464. %% Pid owns the reg; allowed to give_away
  2465. case pid_to_give_away_to(To) of
  2466. Pid ->
  2467. %% Give away to ourselves? Why not? We'll allow it,
  2468. %% but nothing needs to be done.
  2469. Pid;
  2470. ToPid when is_pid(ToPid) ->
  2471. ets:insert(?TAB, [{Key, ToPid, Value},
  2472. {{ToPid, K}, []}]),
  2473. _ = gproc_lib:remove_reverse_mapping({migrated,ToPid}, Pid, K),
  2474. _ = gproc_lib:ensure_monitor(ToPid, l),
  2475. ToPid;
  2476. undefined ->
  2477. _ = gproc_lib:remove_reg(K, Pid, unreg),
  2478. undefined
  2479. end;
  2480. _ ->
  2481. badarg
  2482. end;
  2483. do_give_away({T,l,_} = K, To, Pid) when T==c; T==p; T==r ->
  2484. Key = {K, Pid},
  2485. case ets:lookup(?TAB, Key) of
  2486. [{_, Pid, Value}] ->
  2487. case pid_to_give_away_to(To) of
  2488. ToPid when is_pid(ToPid) ->
  2489. ToKey = {K, ToPid},
  2490. case ets:member(?TAB, ToKey) of
  2491. true ->
  2492. badarg;
  2493. false ->
  2494. ets:insert(?TAB, [{ToKey, ToPid, Value},
  2495. {{ToPid, K}, []}]),
  2496. ets:delete(?TAB, {Pid, K}),
  2497. ets:delete(?TAB, Key),
  2498. _ = gproc_lib:ensure_monitor(ToPid, l),
  2499. ToPid
  2500. end;
  2501. undefined ->
  2502. _ = gproc_lib:remove_reg(K, Pid, {migrated, undefined}),
  2503. undefined
  2504. end;
  2505. _ ->
  2506. badarg
  2507. end.
  2508. pid_to_give_away_to(P) when is_pid(P), node(P) == node() ->
  2509. P;
  2510. pid_to_give_away_to({T,l,_} = Key) when T==n; T==a; T==rc ->
  2511. case ets:lookup(?TAB, {Key, T}) of
  2512. [{_, Pid, _}] ->
  2513. Pid;
  2514. _ ->
  2515. undefined
  2516. end.
  2517. create_tabs() ->
  2518. Opts = gproc_lib:valid_opts(ets_options, [{write_concurrency,true},
  2519. {read_concurrency, true}]),
  2520. case ets:info(?TAB, name) of
  2521. undefined ->
  2522. ets:new(?TAB, [ordered_set, public, named_table | Opts]);
  2523. _ ->
  2524. ok
  2525. end.
  2526. %% @hidden
  2527. init([]) ->
  2528. set_monitors(),
  2529. {ok, #state{}}.
  2530. set_monitors() ->
  2531. set_monitors(ets:select(?TAB, [{{{'$1',l}},[],['$1']}], 100)).
  2532. set_monitors('$end_of_table') ->
  2533. ok;
  2534. set_monitors({Pids, Cont}) ->
  2535. _ = [erlang:monitor(process,Pid) || Pid <- Pids],
  2536. set_monitors(ets:select(Cont)).
  2537. monitor_me() ->
  2538. case ets:insert_new(?TAB, {{self(),l}}) of
  2539. false -> true;
  2540. true ->
  2541. cast({monitor_me,self()}),
  2542. true
  2543. end.
  2544. pattern([{'_', Gs, As}], T) ->
  2545. ?l,
  2546. {HeadPat, Vs} = headpat(T, '$1', '$2', '$3'),
  2547. [{HeadPat, rewrite(Gs,Vs), rewrite(As,Vs)}];
  2548. pattern([{{A,B,C},Gs,As}], Scope) ->
  2549. ?l,
  2550. {HeadPat, Vars} = headpat(Scope, A,B,C),
  2551. [{HeadPat, rewrite(Gs,Vars), rewrite(As,Vars)}];
  2552. pattern([{Head, Gs, As}], Scope) ->
  2553. ?l,
  2554. {S, T} = get_s_t(Scope),
  2555. case is_var(Head) of
  2556. {true,_N} ->
  2557. HeadPat = {{{T,S,'_'},'_'},'_','_'},
  2558. Vs = [{Head, obj_prod()}],
  2559. %% the headpat function should somehow verify that Head is
  2560. %% consistent with Scope (or should we add a guard?)
  2561. [{HeadPat, rewrite(Gs, Vs), rewrite(As, Vs)}];
  2562. false ->
  2563. erlang:error(badarg)
  2564. end.
  2565. %% This is the expression to use in guards and the RHS to address the whole
  2566. %% object, in its logical representation.
  2567. obj_prod() ->
  2568. {{ {element,1,{element,1,'$_'}},
  2569. {element,2,'$_'},
  2570. {element,3,'$_'} }}.
  2571. obj_prod_l() ->
  2572. [ {element,1,{element,1,'$_'}},
  2573. {element,2,'$_'},
  2574. {element,3,'$_'} ].
  2575. headpat({S, T}, V1,V2,V3) ->
  2576. headpat(type(T), scope(S), V1,V2,V3);
  2577. headpat(T, V1, V2, V3) when is_atom(T) ->
  2578. headpat(type(T), l, V1, V2, V3);
  2579. headpat(_, _, _, _) -> erlang:error(badarg).
  2580. headpat(T, C, V1,V2,V3) ->
  2581. Rf = fun(Pos) ->
  2582. {element,Pos,{element,1,{element,1,'$_'}}}
  2583. end,
  2584. K2 = if T==n orelse T==a orelse T==rc -> T;
  2585. true -> '_'
  2586. end,
  2587. {Kp,Vars} = case V1 of
  2588. {Vt,Vc,Vn} ->
  2589. ?l,
  2590. {T1,Vs1} = subst(T,Vt,fun() -> Rf(1) end,[]),
  2591. {C1,Vs2} = subst(C,Vc,fun() -> Rf(2) end,Vs1),
  2592. {{T1,C1,Vn}, Vs2};
  2593. '_' ->
  2594. ?l,
  2595. {{T,C,'_'}, []};
  2596. _ ->
  2597. ?l,
  2598. case is_var(V1) of
  2599. {true,_} ->
  2600. {{T,C,V1}, [{V1, {element,1,
  2601. {element,1,'$_'}}}]};
  2602. false ->
  2603. erlang:error(badarg)
  2604. end
  2605. end,
  2606. {{{Kp,K2},V2,V3}, Vars}.
  2607. %% l(L) -> L.
  2608. subst(X, '_', _F, Vs) ->
  2609. {X, Vs};
  2610. subst(X, V, F, Vs) ->
  2611. case is_var(V) of
  2612. {true,_} ->
  2613. {X, [{V,F()}|Vs]};
  2614. false ->
  2615. {V, Vs}
  2616. end.
  2617. scope('_') -> '_';
  2618. scope(all) -> '_';
  2619. scope(global) -> g;
  2620. scope(local) -> l;
  2621. scope(S) when S==l; S==g -> S.
  2622. type('_') -> '_';
  2623. type(all) -> '_';
  2624. type(T) when T==n; T==p; T==c; T==a -> T;
  2625. type(names) -> n;
  2626. type(props) -> p;
  2627. type(resources) -> r;
  2628. type(counters) -> c;
  2629. type(aggr_counters) -> a;
  2630. type(resource_counters) -> rc.
  2631. rev_keypat(Context) ->
  2632. {S,T} = get_s_t(Context),
  2633. {T,S,'_'}.
  2634. get_s_t({S,T}) -> {scope(S), type(T)};
  2635. get_s_t(T) when is_atom(T) ->
  2636. {scope(all), type(T)}.
  2637. is_var('$1') -> {true,1};
  2638. is_var('$2') -> {true,2};
  2639. is_var('$3') -> {true,3};
  2640. is_var('$4') -> {true,4};
  2641. is_var('$5') -> {true,5};
  2642. is_var('$6') -> {true,6};
  2643. is_var('$7') -> {true,7};
  2644. is_var('$8') -> {true,8};
  2645. is_var('$9') -> {true,9};
  2646. is_var(X) when is_atom(X) ->
  2647. case atom_to_list(X) of
  2648. "\$" ++ Tl ->
  2649. try N = list_to_integer(Tl),
  2650. {true,N}
  2651. catch
  2652. error:_ ->
  2653. false
  2654. end;
  2655. _ ->
  2656. false
  2657. end;
  2658. is_var(_) -> false.
  2659. rewrite(Gs, R) ->
  2660. [rewrite1(G, R) || G <- Gs].
  2661. rewrite1('$_', _) ->
  2662. obj_prod();
  2663. rewrite1('$$', _) ->
  2664. obj_prod_l();
  2665. rewrite1(Guard, R) when is_tuple(Guard) ->
  2666. list_to_tuple([rewrite1(G, R) || G <- tuple_to_list(Guard)]);
  2667. rewrite1(Exprs, R) when is_list(Exprs) ->
  2668. [rewrite1(E, R) || E <- Exprs];
  2669. rewrite1(V, R) when is_atom(V) ->
  2670. case is_var(V) of
  2671. {true,_N} ->
  2672. case lists:keysearch(V, 1, R) of
  2673. {value, {_, V1}} ->
  2674. V1;
  2675. false ->
  2676. V
  2677. end;
  2678. false ->
  2679. V
  2680. end;
  2681. rewrite1(Expr, _) ->
  2682. Expr.
  2683. %% @spec () -> any()
  2684. %%
  2685. %% @doc
  2686. %% @equiv table({all, all})
  2687. %% @end
  2688. table() ->
  2689. table({all, all}).
  2690. %% @spec (Context::context()) -> any()
  2691. %%
  2692. %% @doc
  2693. %% @equiv table(Context, [])
  2694. %% @end
  2695. %%
  2696. table(Context) ->
  2697. table(Context, []).
  2698. %% @spec (Context::context(), Opts) -> any()
  2699. %%
  2700. %% @doc QLC table generator for the gproc registry.
  2701. %% Context specifies which subset of the registry should be queried.
  2702. %% See [http://www.erlang.org/doc/man/qlc.html].
  2703. %%
  2704. %% NOTE: By default, the gproc table generator will not filter out entries
  2705. %% belonging to processes that have just died, but which have yet to be cleared
  2706. %% out of the registry. Use the option `check_pids' (or `{check_pids, true}')
  2707. %% if you want to filter out dead entries already in the query. There will be
  2708. %% some overhead associated with doing so, and given that the process monitoring
  2709. %% is asynchronous, there can never be any guarantee that there are no dead
  2710. %% entries in the list by the time your program processes it.
  2711. %%
  2712. %% @end
  2713. table(Context, Opts) ->
  2714. Ctxt = {_, Type} = get_s_t(Context),
  2715. [Traverse, NObjs] = [proplists:get_value(K,Opts,Def) ||
  2716. {K,Def} <- [{traverse,select}, {n_objects,100}]],
  2717. CheckPids = proplists:get_bool(check_pids, Opts),
  2718. TF = case Traverse of
  2719. first_next ->
  2720. fun() -> qlc_next(Ctxt, first(Ctxt), CheckPids) end;
  2721. last_prev -> fun() -> qlc_prev(Ctxt, last(Ctxt), CheckPids) end;
  2722. select ->
  2723. fun(MS) -> qlc_select(
  2724. CheckPids,
  2725. select(Ctxt, wrap_qlc_ms_prod(CheckPids, MS),
  2726. NObjs))
  2727. end;
  2728. {select,MS} ->
  2729. fun() -> qlc_select(
  2730. CheckPids,
  2731. select(Ctxt, wrap_qlc_ms_prod(CheckPids, MS),
  2732. NObjs))
  2733. end;
  2734. _ ->
  2735. erlang:error(badarg, [Ctxt,Opts])
  2736. end,
  2737. InfoFun = fun(indices) -> [2];
  2738. (is_unique_objects) -> is_unique(Type);
  2739. (keypos) -> 1;
  2740. (is_sorted_key) -> true;
  2741. (num_of_objects) ->
  2742. %% this is just a guesstimate.
  2743. trunc(ets:info(?TAB,size) / 2.5)
  2744. end,
  2745. LookupFun =
  2746. case Traverse of
  2747. {select, _MS} -> undefined;
  2748. _ -> fun(Pos, Ks) -> qlc_lookup(Ctxt, Pos, Ks, CheckPids) end
  2749. end,
  2750. qlc:table(TF, [{info_fun, InfoFun},
  2751. {lookup_fun, LookupFun}] ++ [{K,V} || {K,V} <- Opts,
  2752. K =/= traverse,
  2753. K =/= n_objects]).
  2754. wrap_qlc_ms_prod(false, Pats) ->
  2755. Pats;
  2756. wrap_qlc_ms_prod(true, Pats) ->
  2757. [ wrap_qlc_ms_prod_(P) || P <- Pats ].
  2758. wrap_qlc_ms_prod_({H, Gs, [P]}) ->
  2759. {H, Gs, [{{ {element, 2, '$_'}, P }}]}.
  2760. qlc_lookup(_Scope, 1, Keys, Check) ->
  2761. lists:flatmap(
  2762. fun(Key) ->
  2763. remove_dead(
  2764. Check,
  2765. ets:select(?TAB, [{ {{Key,'_'},'_','_'}, [],
  2766. [{{ {element,1,{element,1,'$_'}},
  2767. {element,2,'$_'},
  2768. {element,3,'$_'} }}] }]))
  2769. end, Keys);
  2770. qlc_lookup(Scope, 2, Pids, Check) ->
  2771. lists:flatmap(fun(Pid) ->
  2772. qlc_lookup_pid(Pid, Scope, Check)
  2773. end, Pids).
  2774. remove_dead(false, Objs) ->
  2775. Objs;
  2776. remove_dead(true, Objs) ->
  2777. [ Reg || {_, Pid, _} = Reg <- Objs,
  2778. not ?PID_IS_DEAD(Pid) ].
  2779. %% While it may seem obsessive not to do the sensible pid liveness check here
  2780. %% every time, we make it optional for consistency; this way, we can devise
  2781. %% a test case that verifies the difference between having the option and not.
  2782. qlc_lookup_pid(Pid, Scope, Check) ->
  2783. case Check andalso ?PID_IS_DEAD(Pid) of
  2784. true ->
  2785. [];
  2786. false ->
  2787. Found =
  2788. ets:select(?TAB, [{{{Pid, rev_keypat(Scope)}, '_'},
  2789. [], ['$_']}]),
  2790. lists:flatmap(
  2791. fun({{_,{T,_,_}=K}, _}) ->
  2792. K2 = if T==n orelse T==a -> T;
  2793. true -> Pid
  2794. end,
  2795. case ets:lookup(?TAB, {K,K2}) of
  2796. [{{Key,_},_,Value}] ->
  2797. [{Key, Pid, Value}];
  2798. [] ->
  2799. []
  2800. end
  2801. end, Found)
  2802. end.
  2803. qlc_next(_, '$end_of_table', _) -> [];
  2804. qlc_next(Scope, K, Check) ->
  2805. case ets:lookup(?TAB, K) of
  2806. [{{Key,_}, Pid, V}] ->
  2807. case Check andalso ?PID_IS_DEAD(Pid) of
  2808. true ->
  2809. qlc_next(Scope, next(Scope, K), Check);
  2810. false ->
  2811. [{Key,Pid,V}] ++ fun() ->
  2812. qlc_next(Scope, next(Scope, K),
  2813. Check)
  2814. end
  2815. end;
  2816. [] ->
  2817. qlc_next(Scope, next(Scope, K), Check)
  2818. end.
  2819. qlc_prev(_, '$end_of_table', _) -> [];
  2820. qlc_prev(Scope, K, Check) ->
  2821. case ets:lookup(?TAB, K) of
  2822. [{{Key,_},Pid,V}] ->
  2823. case Check andalso ?PID_IS_DEAD(Pid) of
  2824. true ->
  2825. qlc_prev(Scope, prev(Scope, K), Check);
  2826. false ->
  2827. [{Key,Pid,V}] ++ fun() ->
  2828. qlc_prev(Scope, prev(Scope, K),
  2829. Check)
  2830. end
  2831. end;
  2832. [] ->
  2833. qlc_prev(Scope, prev(Scope, K), Check)
  2834. end.
  2835. qlc_select(_, '$end_of_table') ->
  2836. [];
  2837. qlc_select(true, {Objects, Cont}) ->
  2838. case [O || {Pid,O} <- Objects,
  2839. not ?PID_IS_DEAD(Pid)] of
  2840. [] ->
  2841. %% re-run search
  2842. qlc_select(true, ets:select(Cont));
  2843. Found ->
  2844. Found ++ fun() -> qlc_select(true, ets:select(Cont)) end
  2845. end;
  2846. qlc_select(false, {Objects, Cont}) ->
  2847. Objects ++ fun() -> qlc_select(false, ets:select(Cont)) end.
  2848. is_unique(n) -> true;
  2849. is_unique(a) -> true;
  2850. is_unique(_) -> false.
  2851. is_regged([{_, _, _}]) ->
  2852. true;
  2853. is_regged(_) ->
  2854. false.