gproc.erl 100 KB


  1. %% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*-
  2. %% --------------------------------------------------
  3. %% This file is provided to you under the Apache License,
  4. %% Version 2.0 (the "License"); you may not use this file
  5. %% except in compliance with the License. You may obtain
  6. %% a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing,
  11. %% software distributed under the License is distributed on an
  12. %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  13. %% KIND, either express or implied. See the License for the
  14. %% specific language governing permissions and limitations
  15. %% under the License.
  16. %% --------------------------------------------------
  17. %%
  18. %% @author Ulf Wiger <ulf@wiger.net>
  19. %%
  20. %% @doc Extended process registry
  21. %% This module implements an extended process registry
  22. %%
  23. %% For a detailed description, see
  24. %% <a href="erlang07-wiger.pdf">erlang07-wiger.pdf</a>.
  25. %%
  26. %% <b>NOTE:</b> The functions in the Gproc API expect the Gproc application
  27. %% to be running.
  28. %%
  29. %% <h2>Tuning Gproc performance</h2>
  30. %%
  31. %% Gproc relies on a central server and an ordered-set ets table.
  32. %% Effort is made to perform as much work as possible in the client without
  33. %% sacrificing consistency. A few things can be tuned by setting the following
  34. %% application environment variables in the top application of `gproc'
  35. %% (usually `gproc'):
  36. %%
  37. %% * `{ets_options, list()}' - Currently, the options `{write_concurrency, F}'
  38. %% and `{read_concurrency, F}' are allowed. The default is
  39. %% `[{write_concurrency, true}, {read_concurrency, true}]'
  40. %% * `{server_options, list()}' - These will be passed as spawn options when
  41. %% starting the `gproc' and `gproc_dist' servers. Default is `[]'. It is
  42. %% likely that `{priority, high | max}' and/or increasing `min_heap_size'
  43. %% will improve performance.
  44. %%
  45. %% @end
  46. -module(gproc).
  47. -behaviour(gen_server).
  48. -export([start_link/0,
  49. reg/1, reg/2, reg/3, unreg/1, set_attributes/2,
  50. reg_other/2, reg_other/3, reg_other/4, unreg_other/2,
  51. reg_or_locate/1, reg_or_locate/2, reg_or_locate/3,
  52. reg_shared/1, reg_shared/2, reg_shared/3, unreg_shared/1,
  53. set_attributes_shared/2, set_value_shared/2,
  54. ensure_reg/1, ensure_reg/2, ensure_reg/3,
  55. ensure_reg_other/2, ensure_reg_other/3, ensure_reg_other/4,
  56. mreg/3,
  57. munreg/3,
  58. set_value/2,
  59. get_value/1, get_value/2, get_value_shared/1,
  60. get_attribute/2, get_attribute/3, get_attribute_shared/2,
  61. get_attributes/1, get_attributes/2,
  62. where/1,
  63. await/1, await/2, await/3,
  64. wide_await/3,
  65. nb_wait/1, nb_wait/2,
  66. cancel_wait/2, cancel_wait/3,
  67. cancel_wait_or_monitor/1,
  68. monitor/1, monitor/2,
  69. demonitor/2,
  70. lookup_pid/1,
  71. lookup_pids/1,
  72. lookup_value/1,
  73. lookup_values/1,
  74. update_counter/2, update_counter/3,
  75. update_counters/2,
  76. reset_counter/1,
  77. update_shared_counter/2,
  78. give_away/2,
  79. goodbye/0,
  80. send/2,
  81. bcast/2, bcast/3,
  82. info/1, info/2,
  83. i/0,
  84. select/1, select/2, select/3,
  85. select_count/1, select_count/2,
  86. first/1,
  87. next/2,
  88. prev/2,
  89. last/1,
  90. table/0, table/1, table/2]).
  91. %% Environment handling
  92. -export([get_env/3, get_env/4,
  93. get_set_env/3, get_set_env/4,
  94. set_env/5]).
  95. %% Convenience functions
  96. -export([add_local_name/1,
  97. add_global_name/1,
  98. add_local_property/2,
  99. add_global_property/2,
  100. add_local_counter/2,
  101. add_global_counter/2,
  102. add_local_aggr_counter/1,
  103. add_global_aggr_counter/1,
  104. add_shared_local_counter/2,
  105. lookup_local_name/1,
  106. lookup_global_name/1,
  107. lookup_local_properties/1,
  108. lookup_global_properties/1,
  109. lookup_local_counters/1,
  110. lookup_global_counters/1,
  111. lookup_local_aggr_counter/1,
  112. lookup_global_aggr_counter/1]).
  113. %% Callbacks for behaviour support
  114. -export([whereis_name/1,
  115. register_name/2,
  116. unregister_name/1]).
  117. -export([default/1]).
  118. %%% internal exports
  119. -export([init/1,
  120. handle_cast/2,
  121. handle_call/3,
  122. handle_info/2,
  123. code_change/3,
  124. terminate/2]).
  125. %% this shouldn't be necessary
  126. -export([audit_process/1]).
  127. -include("gproc_int.hrl").
  128. -include("gproc.hrl").
  129. -export_type([scope/0, type/0, key/0,
  130. context/0, sel_pattern/0, sel_scope/0, sel_context/0,
  131. reg_id/0, unique_id/0, monitor_type/0]).
  132. -type type() :: n | p | c | a | r | rc.
  133. -type scope() :: l | g.
  134. -type context() :: {scope(),type()} | type().
  135. -type sel_type() :: type()
  136. | names | props | counters | aggr_counters
  137. | resources | resource_counters.
  138. -type sel_var() :: '_' | atom().
  139. -type keypat() :: {sel_type() | sel_var(), l | g | sel_var(), any()}.
  140. -type pidpat() :: pid() | sel_var().
  141. -type headpat() :: {keypat(), pidpat(), any()}.
  142. -type key() :: {type(), scope(), any()}.
  143. -type value() :: any().
  144. -type attr() :: {atom(), any()}.
  145. -type attrs() :: [attr()].
  146. -type sel_pattern() :: [{headpat(), list(), list()}].
  147. -type reg_id() :: {type(), scope(), any()}.
  148. -type unique_id() :: {n | a, scope(), any()}.
  149. -type monitor_type() :: info | standby | follow.
  150. -type sel_scope() :: scope | all | global | local.
  151. -type sel_context() :: {scope(), type()} | type().
  152. %% update_counter increment
  153. -type ctr_incr() :: integer().
  154. -type ctr_thr() :: integer().
  155. -type ctr_setval() :: integer().
  156. -type ctr_update() :: ctr_incr()
  157. | {ctr_incr(), ctr_thr(), ctr_setval()}.
  158. -type increment() :: ctr_incr() | ctr_update() | [ctr_update()].
  159. -define(SERVER, ?MODULE).
  160. %%-define(l, l(?LINE)). % when activated, calls a traceable empty function
  161. -define(l, ignore).
  162. -define(CHK_DIST,
  163. case whereis(gproc_dist) of
  164. undefined ->
  165. ?THROW_GPROC_ERROR(local_only);
  166. _ ->
  167. ok
  168. end).
  169. -define(PID_IS_DEAD(Pid),
  170. (node(Pid) == node() andalso is_process_alive(Pid) == false)).
  171. -record(state, {}).
  172. %% @spec () -> {ok, pid()}
  173. %%
  174. %% @doc Starts the gproc server.
  175. %%
  176. %% This function is intended to be called from gproc_sup, as part of
  177. %% starting the gproc application.
  178. %% @end
  179. start_link() ->
  180. _ = create_tabs(),
  181. SpawnOpts = gproc_lib:valid_opts(server_options, []),
  182. gen_server:start_link({local, ?SERVER}, ?MODULE, [],
  183. [{spawn_opt, SpawnOpts}]).
  184. %% spec(Name::any()) -> true
  185. %%
  186. %% @doc Registers a local (unique) name. @equiv reg({n,l,Name})
  187. %% @end
  188. %%
  189. add_local_name(Name) ->
  190. ?CATCH_GPROC_ERROR(reg1({n,l,Name}, undefined, [], reg), [Name]).
  191. %% spec(Name::any()) -> true
  192. %%
  193. %% @doc Registers a global (unique) name. @equiv reg({n,g,Name})
  194. %% @end
  195. %%
  196. add_global_name(Name) ->
  197. ?CATCH_GPROC_ERROR(reg1({n,g,Name}, undefined, [], reg), [Name]).
  198. %% spec(Name::any(), Value::any()) -> true
  199. %%
  200. %% @doc Registers a local (non-unique) property. @equiv reg({p,l,Name},Value)
  201. %% @end
  202. %%
  203. add_local_property(Name , Value) ->
  204. ?CATCH_GPROC_ERROR(reg1({p,l,Name}, Value, [], reg), [Name, Value]).
  205. %% spec(Name::any(), Value::any()) -> true
  206. %%
  207. %% @doc Registers a global (non-unique) property. @equiv reg({p,g,Name},Value)
  208. %% @end
  209. %%
  210. add_global_property(Name, Value) ->
  211. ?CATCH_GPROC_ERROR(reg1({p,g,Name}, Value, [], reg), [Name, Value]).
  212. %% spec(Name::any(), Initial::integer()) -> true
  213. %%
  214. %% @doc Registers a local (non-unique) counter. @equiv reg({c,l,Name},Value)
  215. %% @end
  216. %%
  217. add_local_counter(Name, Initial) when is_integer(Initial) ->
  218. ?CATCH_GPROC_ERROR(reg1({c,l,Name}, Initial, [], reg), [Name, Initial]).
  219. %% spec(Name::any(), Initial::integer()) -> true
  220. %%
  221. %% @doc Registers a local shared (unique) counter.
  222. %% @equiv reg_shared({c,l,Name},Value)
  223. %% @end
  224. %%
  225. add_shared_local_counter(Name, Initial) when is_integer(Initial) ->
  226. reg_shared({c,l,Name}, Initial).
  227. %% spec(Name::any(), Initial::integer()) -> true
  228. %%
  229. %% @doc Registers a global (non-unique) counter. @equiv reg({c,g,Name},Value)
  230. %% @end
  231. %%
  232. add_global_counter(Name, Initial) when is_integer(Initial) ->
  233. ?CATCH_GPROC_ERROR(reg1({c,g,Name}, Initial, [], reg), [Name, Initial]).
  234. %% spec(Name::any()) -> true
  235. %%
  236. %% @doc Registers a local (unique) aggregated counter.
  237. %% @equiv reg({a,l,Name})
  238. %% @end
  239. %%
  240. add_local_aggr_counter(Name) -> ?CATCH_GPROC_ERROR(reg1({a,l,Name}), [Name]).
  241. %% spec(Name::any()) -> true
  242. %%
  243. %% @doc Registers a global (unique) aggregated counter.
  244. %% @equiv reg({a,g,Name})
  245. %% @end
  246. %%
  247. add_global_aggr_counter(Name) ->
  248. ?CATCH_GPROC_ERROR(reg1({a,g,Name}), [Name]).
  249. %% @spec (Name::any()) -> pid()
  250. %%
  251. %% @doc Lookup a local unique name. Fails if there is no such name.
  252. %% @equiv where({n,l,Name})
  253. %% @end
  254. %%
  255. lookup_local_name(Name) -> where({n,l,Name}).
  256. %% @spec (Name::any()) -> pid()
  257. %%
  258. %% @doc Lookup a global unique name. Fails if there is no such name.
  259. %% @equiv where({n,g,Name})
  260. %% @end
  261. %%
  262. lookup_global_name(Name) -> where({n,g,Name}).
  263. %% @spec (Name::any()) -> integer()
  264. %%
  265. %% @doc Lookup a local (unique) aggregated counter and returns its value.
  266. %% Fails if there is no such object.
  267. %% @equiv where({a,l,Name})
  268. %% @end
  269. %%
  270. lookup_local_aggr_counter(Name) -> lookup_value({a,l,Name}).
  271. %% @spec (Name::any()) -> integer()
  272. %%
  273. %% @doc Lookup a global (unique) aggregated counter and returns its value.
  274. %% Fails if there is no such object.
  275. %% @equiv lookup_value({a,g,Name})
  276. %% @end
  277. %%
  278. lookup_global_aggr_counter(Name) -> lookup_value({a,g,Name}).
  279. %% @spec (Property::any()) -> [{pid(), Value}]
  280. %%
  281. %% @doc Look up all local (non-unique) instances of a given Property.
  282. %% Returns a list of {Pid, Value} tuples for all matching objects.
  283. %% @equiv lookup_values({p, l, Property})
  284. %% @end
  285. %%
  286. lookup_local_properties(P) -> lookup_values({p,l,P}).
  287. %% @spec (Property::any()) -> [{pid(), Value}]
  288. %%
  289. %% @doc Look up all global (non-unique) instances of a given Property.
  290. %% Returns a list of {Pid, Value} tuples for all matching objects.
  291. %% @equiv lookup_values({p, g, Property})
  292. %% @end
  293. %%
  294. lookup_global_properties(P) -> lookup_values({p,g,P}).
  295. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  296. %%
  297. %% @doc Look up all local (non-unique) instances of a given Counter.
  298. %% Returns a list of {Pid, Value} tuples for all matching objects.
  299. %% @equiv lookup_values({c, l, Counter})
  300. %% @end
  301. %%
  302. lookup_local_counters(P) -> lookup_values({c,l,P}).
  303. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  304. %%
  305. %% @doc Look up all global (non-unique) instances of a given Counter.
  306. %% Returns a list of {Pid, Value} tuples for all matching objects.
  307. %% @equiv lookup_values({c, g, Counter})
  308. %% @end
  309. %%
  310. lookup_global_counters(P) -> lookup_values({c,g,P}).
  311. %% @spec get_env(Scope::scope(), App::atom(), Key::atom()) -> term()
  312. %% @equiv get_env(Scope, App, Key, [app_env])
  313. get_env(Scope, App, Key) ->
  314. get_env(Scope, App, Key, [app_env]).
  315. %% @spec (Scope::scope(), App::atom(), Key::atom(), Strategy) -> term()
  316. %% Strategy = [Alternative]
  317. %% Alternative = app_env
  318. %% | os_env
  319. %% | inherit | {inherit, pid()} | {inherit, unique_id()}
  320. %% | init_arg
  321. %% | {mnesia, ActivityType, Oid, Pos}
  322. %% | {default, term()}
  323. %% | error
  324. %% @doc Read an environment value, potentially cached as a `gproc_env' property.
  325. %%
  326. %% This function first tries to read the value of a cached property,
  327. %% `{p, Scope, {gproc_env, App, Key}}'. If this fails, it will try the provided
  328. %% alternative strategy. `Strategy' is a list of alternatives, tried in order.
  329. %% Each alternative can be one of:
  330. %%
  331. %% * `app_env' - try `application:get_env(App, Key)'
  332. %% * `os_env' - try `os:getenv(ENV)', where `ENV' is `Key' converted into an
  333. %% uppercase string
  334. %% * `{os_env, ENV}' - try `os:getenv(ENV)'
  335. %% * `inherit' - inherit the cached value, if any, held by the parent process.
  336. %% * `{inherit, Pid}' - inherit the cached value, if any, held by `Pid'.
  337. %% * `{inherit, Id}' - inherit the cached value, if any, held by the process
  338. %% registered in `gproc' as `Id'.
  339. %% * `init_arg' - try `init:get_argument(Key)'; expects a single value, if any.
  340. %% * `{mnesia, ActivityType, Oid, Pos}' - try
  341. %% `mnesia:activity(ActivityType, fun() -> mnesia:read(Oid) end)'; retrieve
  342. %% the value in position `Pos' if object found.
  343. %% * `{default, Value}' - set a default value to return once alternatives have
  344. %% been exhausted; if not set, `undefined' will be returned.
  345. %% * `error' - raise an exception, `erlang:error(gproc_env, [App, Key, Scope])'.
  346. %%
  347. %% While any alternative can occur more than once, the only one that might make
  348. %% sense to use multiple times is `{default, Value}'.
  349. %%
  350. %% The return value will be one of:
  351. %%
  352. %% * The value of the first matching alternative, or `error' eception,
  353. %% whichever comes first
  354. %% * The last instance of `{default, Value}', or `undefined', if there is no
  355. %% matching alternative, default or `error' entry in the list.
  356. %%
  357. %% The `error' option can be used to assert that a value has been previously
  358. %% cached. Alternatively, it can be used to assert that a value is either cached
  359. %% or at least defined somewhere,
  360. %% e.g. `get_env(l, mnesia, dir, [app_env, error])'.
  361. %% @end
  362. get_env(Scope, App, Key, Strategy)
  363. when Scope==l, is_atom(App), is_atom(Key);
  364. Scope==g, is_atom(App), is_atom(Key) ->
  365. do_get_env(Scope, App, Key, Strategy, false).
  366. %% @spec get_set_env(Scope::scope(), App::atom(), Key::atom()) -> term()
  367. %% @equiv get_set_env(Scope, App, Key, [app_env])
  368. get_set_env(Scope, App, Key) ->
  369. get_set_env(Scope, App, Key, [app_env]).
  370. %% @spec get_set_env(Scope::scope(), App::atom(), Key::atom(), Strategy) ->
  371. %% Value
  372. %% @doc Fetch and cache an environment value, if not already cached.
  373. %%
  374. %% This function does the same thing as {@link get_env/4}, but also updates the
  375. %% cache. Note that the cache will be updated even if the result of the lookup
  376. %% is `undefined'.
  377. %%
  378. %% @see get_env/4.
  379. %% @end
  380. %%
  381. get_set_env(Scope, App, Key, Strategy)
  382. when Scope==l, is_atom(App), is_atom(Key);
  383. Scope==g, is_atom(App), is_atom(Key) ->
  384. do_get_env(Scope, App, Key, Strategy, true).
  385. do_get_env(Context, App, Key, Alternatives, Set) ->
  386. case lookup_env(Context, App, Key, self()) of
  387. undefined ->
  388. check_alternatives(Alternatives, Context, App, Key, undefined, Set);
  389. {ok, Value} ->
  390. Value
  391. end.
  392. %% @spec set_env(Scope::scope(), App::atom(),
  393. %% Key::atom(), Value::term(), Strategy) -> Value
  394. %% Strategy = [Alternative]
  395. %% Alternative = app_env | os_env | {os_env, VAR}
  396. %% | {mnesia, ActivityType, Oid, Pos}
  397. %%
  398. %% @doc Updates the cached value as well as underlying environment.
  399. %%
  400. %% This function should be exercised with caution, as it affects the larger
  401. %% environment outside gproc. This function modifies the cached value, and then
  402. %% proceeds to update the underlying environment (OS environment variable or
  403. %% application environment variable).
  404. %%
  405. %% When the `mnesia' alternative is used, gproc will try to update any existing
  406. %% object, changing only the `Pos' position. If no such object exists, it will
  407. %% create a new object, setting any other attributes (except `Pos' and the key)
  408. %% to `undefined'.
  409. %% @end
  410. %%
  411. set_env(Scope, App, Key, Value, Strategy)
  412. when Scope==l, is_atom(App), is_atom(Key);
  413. Scope==g, is_atom(App), is_atom(Key) ->
  414. case is_valid_set_strategy(Strategy, Value) of
  415. true ->
  416. update_cached_env(Scope, App, Key, Value),
  417. set_strategy(Strategy, App, Key, Value);
  418. false ->
  419. erlang:error(badarg)
  420. end.
  421. check_alternatives([{default, Val}|Alts], Scope, App, Key, _, Set) ->
  422. check_alternatives(Alts, Scope, App, Key, Val, Set);
  423. check_alternatives([H|T], Scope, App, Key, Def, Set) ->
  424. case try_alternative(H, App, Key, Scope) of
  425. undefined ->
  426. check_alternatives(T, Scope, App, Key, Def, Set);
  427. {ok, Value} ->
  428. if Set ->
  429. cache_env(Scope, App, Key, Value),
  430. Value;
  431. true ->
  432. Value
  433. end
  434. end;
  435. check_alternatives([], Scope, App, Key, Def, Set) ->
  436. if Set ->
  437. cache_env(Scope, App, Key, Def);
  438. true ->
  439. ok
  440. end,
  441. Def.
  442. try_alternative(error, App, Key, Scope) ->
  443. erlang:error(gproc_env, [App, Key, Scope]);
  444. try_alternative(inherit, App, Key, Scope) ->
  445. case get('$ancestors') of
  446. [P|_] ->
  447. lookup_env(Scope, App, Key, P);
  448. _ ->
  449. undefined
  450. end;
  451. try_alternative({inherit, P}, App, Key, Scope) when is_pid(P) ->
  452. lookup_env(Scope, App, Key, P);
  453. try_alternative({inherit, P}, App, Key, Scope) ->
  454. case where(P) of
  455. undefined -> undefined;
  456. Pid when is_pid(Pid) ->
  457. lookup_env(Scope, App, Key, Pid)
  458. end;
  459. try_alternative(app_env, App, Key, _Scope) ->
  460. case application:get_env(App, Key) of
  461. undefined -> undefined;
  462. {ok, undefined} -> undefined;
  463. {ok, Value} -> {ok, Value}
  464. end;
  465. try_alternative(os_env, _App, Key, _) ->
  466. case os:getenv(os_env_key(Key)) of
  467. false -> undefined;
  468. Val -> {ok, Val}
  469. end;
  470. try_alternative({os_env, Key}, _, _, _) ->
  471. case os:getenv(Key) of
  472. false -> undefined;
  473. Val -> {ok, Val}
  474. end;
  475. try_alternative(init_arg, _, Key, _) ->
  476. case init:get_argument(Key) of
  477. {ok, [[Value]]} ->
  478. {ok, Value};
  479. error ->
  480. undefined
  481. end;
  482. try_alternative({mnesia,Type,Key,Pos}, _, _, _) ->
  483. case mnesia:activity(Type, fun() -> mnesia:read(Key) end) of
  484. [] -> undefined;
  485. [Found] ->
  486. {ok, element(Pos, Found)}
  487. end.
  488. os_env_key(Key) ->
  489. string:to_upper(atom_to_list(Key)).
  490. lookup_env(Scope, App, Key, P) ->
  491. case ets:lookup(?TAB, {{p, Scope, {gproc_env, App, Key}}, P}) of
  492. [] ->
  493. undefined;
  494. [{_, _, Value}] ->
  495. {ok, Value}
  496. end.
  497. cache_env(Scope, App, Key, Value) ->
  498. ?CATCH_GPROC_ERROR(
  499. reg1({p, Scope, {gproc_env, App, Key}}, Value, [], reg),
  500. [Scope,App,Key,Value]).
  501. update_cached_env(Scope, App, Key, Value) ->
  502. case lookup_env(Scope, App, Key, self()) of
  503. undefined ->
  504. cache_env(Scope, App, Key, Value);
  505. {ok, _} ->
  506. set_value({p, Scope, {gproc_env, App, Key}}, Value)
  507. end.
  508. is_valid_set_strategy([os_env|T], Value) ->
  509. is_string(Value) andalso is_valid_set_strategy(T, Value);
  510. is_valid_set_strategy([{os_env, _}|T], Value) ->
  511. is_string(Value) andalso is_valid_set_strategy(T, Value);
  512. is_valid_set_strategy([app_env|T], Value) ->
  513. is_valid_set_strategy(T, Value);
  514. is_valid_set_strategy([{mnesia,_Type,_Oid,_Pos}|T], Value) ->
  515. is_valid_set_strategy(T, Value);
  516. is_valid_set_strategy([], _) ->
  517. true;
  518. is_valid_set_strategy(_, _) ->
  519. false.
  520. set_strategy([H|T], App, Key, Value) ->
  521. case H of
  522. app_env ->
  523. application:set_env(App, Key, Value);
  524. os_env ->
  525. os:putenv(os_env_key(Key), Value);
  526. {os_env, ENV} ->
  527. os:putenv(ENV, Value);
  528. {mnesia,Type,Oid,Pos} ->
  529. mnesia:activity(
  530. Type,
  531. fun() ->
  532. Rec = case mnesia:read(Oid) of
  533. [] ->
  534. {Tab,K} = Oid,
  535. Tag = mnesia:table_info(Tab, record_name),
  536. Attrs = mnesia:table_info(Tab, attributes),
  537. list_to_tuple(
  538. [Tag,K |
  539. [undefined || _ <- tl(Attrs)]]);
  540. [Old] ->
  541. Old
  542. end,
  543. mnesia:write(setelement(Pos, Rec, Value))
  544. end)
  545. end,
  546. set_strategy(T, App, Key, Value);
  547. set_strategy([], _, _, Value) ->
  548. Value.
  549. is_string(S) ->
  550. try begin _ = iolist_to_binary(S),
  551. true
  552. end
  553. catch
  554. error:_ ->
  555. false
  556. end.
  557. %% @spec reg(Key::key()) -> true
  558. %%
  559. %% @doc
  560. %% @equiv reg(Key, default(Key), [])
  561. %% @end
  562. reg(Key) ->
  563. ?CATCH_GPROC_ERROR(reg1(Key), [Key]).
  564. reg1(Key) ->
  565. reg1(Key, default(Key), [], reg).
  566. %% @spec reg_or_locate(Key::key()) -> {pid(), NewValue}
  567. %%
  568. %% @doc
  569. %% @equiv reg_or_locate(Key, default(Key))
  570. %% @end
  571. reg_or_locate(Key) ->
  572. ?CATCH_GPROC_ERROR(reg_or_locate1(Key), [Key]).
  573. reg_or_locate1(Key) ->
  574. reg_or_locate1(Key, default(Key), self()).
  575. default({T,_,_}) when T==c -> 0;
  576. default(_) -> undefined.
  577. %% @spec await(Key::key()) -> {pid(),Value}
  578. %% @equiv await(Key,infinity)
  579. %%
  580. await(Key) ->
  581. ?CATCH_GPROC_ERROR(await1(Key, infinity), [Key]).
  582. %% @spec await(Key::key(), Timeout) -> {pid(),Value}
  583. %% Timeout = integer() | infinity
  584. %%
  585. %% @doc Wait for a name or aggregated counter to be registered.
  586. %% The function raises an exception if the timeout expires. Timeout must be
  587. %% either an interger &gt; 0 or 'infinity'.
  588. %% A small optimization: we first perform a lookup, to see if the name
  589. %% is already registered. This way, the cost of the operation will be
  590. %% roughly the same as of where/1 in the case where the name is already
  591. %% registered (the difference: await/2 also returns the value).
  592. %% @end
  593. %%
  594. await(Key, Timeout) ->
  595. ?CATCH_GPROC_ERROR(await1(Key, Timeout), [Key, Timeout]).
  596. %% @spec await(Node::node(), Key::key(), Timeout) -> {pid(),Value}
  597. %% Timeout = integer() | infinity
  598. %%
  599. %% @doc Wait for a name or aggregated counter to be registered on `Node'.
  600. %% This function works exactly like {@link await/2}, but queries a remote
  601. %% node instead. An exception is thrown if `Node' cannot be reached. If gproc
  602. %% is not running on a given node, this is treated the same as the node being
  603. %% down.
  604. %% @end
  605. %%
  606. await(Node, Key, Timeout) when Node == node() ->
  607. await(Key, Timeout);
  608. await(Node, Key, Timeout) when is_atom(Node) ->
  609. ?CATCH_GPROC_ERROR(await1(Node, Key, Timeout), [Node, Key, Timeout]).
  610. await1({T,g,_} = Key, Timeout) when T=:=n; T=:=a; T=:=rc ->
  611. ?CHK_DIST,
  612. request_wait(Key, Timeout);
  613. await1({T,l,_} = Key, Timeout) when T=:=n; T=:=a; T=:=rc ->
  614. case ets:lookup(?TAB, {Key, T}) of
  615. [{_, Pid, Value}] ->
  616. case is_process_alive(Pid) of
  617. true ->
  618. {Pid, Value};
  619. false ->
  620. %% we can send an asynchronous audit request, since the purpose is
  621. %% only to ensure that the server handles the audit before it serves
  622. %% our 'await' request. Strictly speaking, we could allow the bad Pid
  623. %% to be returned, as there are no guarantees that whatever Pid we return
  624. %% will still be alive when addressed. Still, we don't want to knowingly
  625. %% serve bad data.
  626. nb_audit_process(Pid),
  627. request_wait(Key, Timeout)
  628. end;
  629. _ ->
  630. request_wait(Key, Timeout)
  631. end;
  632. await1(_, _) ->
  633. ?THROW_GPROC_ERROR(badarg).
  634. await1(N, {n,l,_} = Key, Timeout) when is_atom(N) ->
  635. request_wait(N, Key, Timeout);
  636. await1(_, _, _) ->
  637. ?THROW_GPROC_ERROR(badarg).
  638. request_wait({_,g,_} = Key, Timeout) ->
  639. request_wait(undefined, Key, Timeout);
  640. request_wait(Key, Timeout) ->
  641. request_wait(node(), Key, Timeout).
  642. request_wait(N, {_,C,_} = Key, Timeout) when C==l; C==g ->
  643. TRef = case Timeout of
  644. infinity -> no_timer;
  645. T when is_integer(T), T > 0 ->
  646. erlang:start_timer(T, self(), gproc_timeout);
  647. _ ->
  648. ?THROW_GPROC_ERROR(badarg)
  649. end,
  650. WRef = case {call(N, {await,Key,self()}, C), C} of
  651. {{R, {Kg,Pg,Vg}}, g} ->
  652. self() ! {gproc, R, registered, {Kg,Pg,Vg}},
  653. R;
  654. {R,_} ->
  655. R
  656. end,
  657. receive
  658. {gproc, WRef, registered, {_K, Pid, V}} ->
  659. _ = case TRef of
  660. no_timer -> ignore;
  661. _ -> erlang:cancel_timer(TRef)
  662. end,
  663. {Pid, V};
  664. {timeout, TRef, gproc_timeout} ->
  665. cancel_wait(N, Key, WRef),
  666. ?THROW_GPROC_ERROR(timeout)
  667. end.
  668. %% @spec wide_await(Nodes::[node()], Key::key(), Timeout) -> {pid(),Value}
  669. %% Timeout = integer() | infinity
  670. %%
  671. %% @doc Wait for a local name to be registered on any of `Nodes'.
  672. %% This function works rather like {@link await/2}, but queries all nodes in
  673. %% the `Nodes' list at the same time. The first node to respond with a
  674. %% process registered as `Key' will provide the result. Other results are
  675. %% ignored. `Key' must be a unique name with local scope, i.e. `{n,l,Name}'.
  676. %%
  677. %% An exception is thrown upon timeout, or if no node can be reached (if gproc is
  678. %% not running on a given node, this is treated the same as the node being down).
  679. %% @end
  680. %%
  681. wide_await(Nodes, Key, Timeout) ->
  682. ?CATCH_GPROC_ERROR(wide_await1(Nodes, Key, Timeout), [Nodes, Key, Timeout]).
  683. wide_await1(Nodes, {T,l,_} = Key, Timeout) when T=:=n; T=:=a ->
  684. {_, Ref} = spawn_monitor(fun() ->
  685. wide_request_wait(Nodes, Key, Timeout)
  686. end),
  687. receive
  688. {'DOWN', Ref, _, _, Reason} ->
  689. case Reason of
  690. {ok, {gproc,_,registered,{_,Pid,V}}} ->
  691. {Pid, V};
  692. Other ->
  693. ?THROW_GPROC_ERROR(Other)
  694. end
  695. end;
  696. wide_await1(_, _, _) ->
  697. ?THROW_GPROC_ERROR(badarg).
  698. wide_request_wait(Nodes, {Tk,l,_} = Key, Timeout) when Tk=:=n; Tk=:=a ->
  699. TRef = case Timeout of
  700. infinity -> no_timer;
  701. T when is_integer(T), T > 0 ->
  702. erlang:start_timer(T, self(), gproc_timeout);
  703. _ ->
  704. exit(badarg)
  705. end,
  706. Req = {await, Key, self()},
  707. Refs = lists:map(
  708. fun(Node) ->
  709. S = {?MODULE, Node},
  710. Ref = erlang:monitor(process, S),
  711. ?MAY_FAIL(erlang:send(S, {'$gen_call', {self(), Ref}, Req},
  712. [noconnect])),
  713. {Node, Ref}
  714. end, Nodes),
  715. collect_replies(Refs, Key, TRef).
  716. collect_replies(Refs, Key, TRef) ->
  717. receive
  718. {gproc, _Ref, registered, {_, _, _}} = Result ->
  719. exit({ok, Result});
  720. {'DOWN', Ref, _, _, _} ->
  721. case lists:keydelete(Ref, 2, Refs) of
  722. [] ->
  723. exit(nodedown);
  724. Refs1 ->
  725. collect_replies(Refs1, Key, TRef)
  726. end;
  727. {timeout, TRef, gproc_timeout} ->
  728. exit(timeout);
  729. {Ref, Ref} ->
  730. %% ignore
  731. collect_replies(Refs, Key, TRef)
  732. end.
  733. %% @spec nb_wait(Key::key()) -> Ref
  734. %%
  735. %% @doc Wait for a name or aggregated counter to be registered.
  736. %% The caller can expect to receive a message,
  737. %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
  738. %% @end
  739. %%
  740. nb_wait(Key) ->
  741. ?CATCH_GPROC_ERROR(nb_wait1(Key), [Key]).
  742. %% @spec nb_wait(Node::node(), Key::key()) -> Ref
  743. %%
  744. %% @doc Wait for a name or aggregated counter to be registered on `Node'.
  745. %% The caller can expect to receive a message,
  746. %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
  747. %% @end
  748. %%
  749. nb_wait(Node, Key) ->
  750. ?CATCH_GPROC_ERROR(nb_wait1(Node, Key), [Node, Key]).
  751. nb_wait1({T,g,_} = Key) when T=:=n; T=:=a; T=:=rc ->
  752. ?CHK_DIST,
  753. call({await, Key, self()}, g);
  754. nb_wait1({T,l,_} = Key) when T=:=n; T=:=a; T=:=rc ->
  755. call({await, Key, self()}, l);
  756. nb_wait1(_) ->
  757. ?THROW_GPROC_ERROR(badarg).
  758. nb_wait1(Node, {T,l,_} = Key) when is_atom(Node), T=:=n;
  759. is_atom(Node), T=:=a;
  760. is_atom(Node), T=:=rc ->
  761. call(Node, {await, Key, self()}, l).
  762. %% @spec cancel_wait(Key::key(), Ref) -> ok
  763. %% Ref = all | reference()
  764. %%
  765. %% @doc Cancels a previous call to nb_wait/1
  766. %%
  767. %% If `Ref = all', all wait requests on `Key' from the calling process
  768. %% are canceled.
  769. %% @end
  770. %%
  771. cancel_wait(Key, Ref) ->
  772. ?CATCH_GPROC_ERROR(cancel_wait1(Key, Ref), [Key, Ref]).
  773. %% @spec cancel_wait(Node::node(), Key::key(), Ref) -> ok
  774. %% Ref = all | reference()
  775. %%
  776. %% @doc Cancels a previous call to nb_wait/2
  777. %%
  778. %% This function works just like {@link cancel_wait/2}, but talks to a remote
  779. %% node.
  780. %% @end
  781. %%
  782. cancel_wait(N, Key, Ref) when N == node() ->
  783. cancel_wait(Key, Ref);
  784. cancel_wait(N, Key, Ref) ->
  785. ?CATCH_GPROC_ERROR(cancel_wait1(N, Key, Ref), [N, Key, Ref]).
  786. cancel_wait1({_,g,_} = Key, Ref) ->
  787. ?CHK_DIST,
  788. cast({cancel_wait, self(), Key, Ref}, g),
  789. ok;
  790. cancel_wait1({_,l,_} = Key, Ref) ->
  791. cast({cancel_wait, self(), Key, Ref}, l),
  792. ok.
  793. cancel_wait1(undefined, {_,g,_} = Key, Ref) ->
  794. cast({cancel_wait, self(), Key, Ref}, g);
  795. cancel_wait1(N, {_,l,_} = Key, Ref) ->
  796. cast(N, {cancel_wait, self(), Key, Ref}, l).
  797. cancel_wait_or_monitor(Key) ->
  798. ?CATCH_GPROC_ERROR(cancel_wait_or_monitor1(Key), [Key]).
  799. cancel_wait_or_monitor1({_,g,_} = Key) ->
  800. ?CHK_DIST,
  801. cast({cancel_wait_or_monitor, self(), Key}, g),
  802. ok;
  803. cancel_wait_or_monitor1({_,l,_} = Key) ->
  804. cast({cancel_wait_or_monitor, self(), Key}, l),
  805. ok.
  806. %% @equiv monitor(Key, info)
  807. monitor(Key) ->
  808. ?CATCH_GPROC_ERROR(monitor1(Key, info), [Key]).
  809. %% @spec monitor(key(), monitor_type()) -> reference()
  810. %%
  811. %% @doc monitor a registered name
  812. %% `monitor(Key, info)' works much like erlang:monitor(process, Pid), but monitors
  813. %% a unique name registered via gproc. A message, `{gproc, unreg, Ref, Key}'
  814. %% will be sent to the requesting process, if the name is unregistered or
  815. %% the registered process dies. If there is a standby monitor (see below), a
  816. %% message `{gproc, {failover, ToPid}, Ref, Key}' is sent to all monitors.
  817. %% If the name is passed to another process using {@link give_away/2}, the event
  818. %% `{gproc, {migrated, ToPid}, Ref, Key}' is sent to all monitors.
  819. %%
  820. %% `monitor(Key, standby)' sets up the monitoring process as a standby for the
  821. %% registered name. If the registered process dies, the first standby process
  822. %% inherits the name, and a message `{gproc, {failover, ToPid}, Ref, Key}' is
  823. %% sent to all monitors, including the one that inherited the name.
  824. %%
  825. %% If the name is not yet registered, the unreg event is sent immediately.
  826. %% If the calling process in this case tried to start a `standby' monitoring,
  827. %% it receives the registered name and the failover event immediately.
  828. %%
  829. %% `monitor(Key, follow)' keeps monitoring the registered name even if it is
  830. %% temporarily unregistered. The messages received are the same as for the other
  831. %% monitor types, but `{gproc, registered, Ref, Key}' is also sent when a new
  832. %% process registers the name.
  833. %% @end
  834. monitor(Key, Type) when Type==info;
  835. Type==follow;
  836. Type==standby ->
  837. ?CATCH_GPROC_ERROR(monitor1(Key, Type), [Key, Type]).
  838. monitor1({T,g,_} = Key, Type) when T==n; T==a ->
  839. ?CHK_DIST,
  840. gproc_dist:monitor(Key, Type);
  841. monitor1({T,l,_} = Key, Type) when T==n; T==a ->
  842. call({monitor, Key, self(), Type}, l);
  843. monitor1(_, _) ->
  844. ?THROW_GPROC_ERROR(badarg).
  845. %% @spec demonitor(key(), reference()) -> ok
  846. %%
  847. %% @doc Remove a monitor on a registered name
  848. %% This function is the reverse of monitor/1. It removes a monitor previously
  849. %% set on a unique name. This function always succeeds given legal input.
  850. %% @end
  851. demonitor(Key, Ref) ->
  852. ?CATCH_GPROC_ERROR(demonitor1(Key, Ref), [Key, Ref]).
  853. demonitor1({T,g,_} = Key, Ref) when T==n; T==a ->
  854. ?CHK_DIST,
  855. gproc_dist:demonitor(Key, Ref);
  856. demonitor1({T,l,_} = Key, Ref) when T==n; T==a ->
  857. call({demonitor, Key, Ref, self()}, l);
  858. demonitor1(_, _) ->
  859. ?THROW_GPROC_ERROR(badarg).
  860. %% @spec reg(Key::key(), Value::value()) -> true
  861. %%
  862. %% @doc Register a name or property for the current process
  863. %%
  864. %%
  865. reg(Key, Value) ->
  866. ?CATCH_GPROC_ERROR(reg1(Key, Value, [], reg), [Key, Value]).
  867. %% @spec reg(Key::key(), Value::value(), Attrs::attrs()) -> true
  868. %%
  869. %% @doc Register a name or property for the current process
  870. %% `Attrs' (default: `[]') can be inspected using {@link get_attribute/2}.
  871. %%
  872. %% The structure of `Key' is `{Type, Context, Name}', where:
  873. %%
  874. %% * `Context :: l | g' - `l' means 'local' context; `g' means 'global'
  875. %% * `Type :: p | n | c | a | r | rc' specifies the type of entry
  876. %%
  877. %% The semantics of the different types:
  878. %%
  879. %% * `p' - 'property', is non-unique, i.e. different processes can each
  880. %% register a property with the same name.
  881. %% * `n' - 'name, is unique within the given context (local or global).
  882. %% * `c' - 'counter', is similar to a property, but has a numeric value
  883. %% and behaves roughly as an ets counter (see {@link update_counter/2}.)
  884. %% * `a' - 'aggregated counter', is automatically updated by gproc, and
  885. %% reflects the sum of all counter objects with the same name in the given
  886. %% scope. The initial value for an aggregated counter must be `undefined'.
  887. %% * `r' - 'resource property', behaves like a property, but can be tracked
  888. %% with a 'resource counter'.
  889. %% * `rc' - 'resource counter', tracks the number of resource properties
  890. %% with the same name. When the resource count reaches `0', any triggers
  891. %% specified using an `on_zero' attribute may be executed (see below).
  892. %%
  893. %% On-zero triggers:
  894. %%
  895. %% `Msg = {gproc, resource_on_zero, Context, Name, Pid}'
  896. %%
  897. %% * `{send, Key}' - run `gproc:send(Key, Msg)'
  898. %% * `{bcast, Key}' - run `gproc:bcast(Key, Msg)'
  899. %% * `publish' - run
  900. %% `gproc_ps:publish(Context, gproc_resource_on_zero, {Context, Name, Pid})'
  901. %% * `{unreg_shared, Type, Name}' - unregister the shared key
  902. %% `{Type, Context, Name}'
  903. %% @end
  904. reg(Key, Value, Attrs) ->
  905. ?CATCH_GPROC_ERROR(reg1(Key, Value, Attrs, reg), [Key, Value, Attrs]).
  906. %% @equiv ensure_reg(Key, default(Key), [])
  907. ensure_reg(Key) ->
  908. ?CATCH_GPROC_ERROR(reg1(Key, ensure), [Key]).
  909. %% @equiv ensure_reg(Key, Value, [])
  910. -spec ensure_reg(key(), value()) -> new | updated.
  911. ensure_reg(Key, Value) ->
  912. ?CATCH_GPROC_ERROR(reg1(Key, Value, ensure), [Key, Value]).
  913. %% @spec ensure_reg(Key::key(), Value::value(), Attrs::attrs()) ->
  914. %% new | updated
  915. %%
  916. %% @doc Registers a new name or property unless such and entry (by key) has
  917. %% already been registered by the current process. If `Key' already exists,
  918. %% the entry will be updated with the given `Value' and `Attrs'.
  919. %%
  920. %% This function allows the caller to efficiently register an entry without
  921. %% first checking whether it has already been registered. An exception is
  922. %% raised if the name or property is already registered by someone else.
  923. %% @end
  924. -spec ensure_reg(key(), value(), attrs()) -> new | updated.
  925. ensure_reg(Key, Value, Attrs) ->
  926. ?CATCH_GPROC_ERROR(reg1(Key, Value, Attrs, ensure), [Key, Value, Attrs]).
  927. reg1(Key, Op) ->
  928. reg1(Key, default(Key), [], Op).
  929. reg1(Key, Value, Op) ->
  930. reg1(Key, Value, [], Op).
  931. reg1({T,g,_} = Key, Value, As, Op) when T==p; T==a; T==c; T==n; T==r; T==rc ->
  932. %% anything global
  933. ?CHK_DIST,
  934. gproc_dist:reg(Key, Value, As, Op);
  935. reg1({p,l,_} = Key, Value, As, Op) ->
  936. local_reg(Key, Value, As, Op);
  937. reg1({a,l,_} = Key, undefined, As, Op) ->
  938. call({reg, Key, undefined, As, Op});
  939. reg1({c,l,_} = Key, Value, As, Op) when is_integer(Value) ->
  940. call({reg, Key, Value, As, Op});
  941. reg1({n,l,_} = Key, Value, As, Op) ->
  942. call({reg, Key, Value, As, Op});
  943. reg1({r,l,_} = Key, Value, As, Op) ->
  944. call({reg, Key, Value, As, Op});
  945. reg1({rc,l,_} = Key, Value, As, Op) ->
  946. call({reg, Key, Value, As, Op});
  947. reg1(_, _, _, _) ->
  948. ?THROW_GPROC_ERROR(badarg).
  949. %% @equiv reg_other(Key, Pid, default(Key), [])
  950. reg_other(Key, Pid) ->
  951. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, reg), [Key, Pid]).
  952. %% @equiv reg_other(Key, Pid, Value, [])
  953. reg_other(Key, Pid, Value) ->
  954. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, [], reg), [Key, Pid, Value]).
  955. %% @spec reg_other(Key, Pid, Value, Attrs) -> true
  956. %% @doc Register name or property to another process.
  957. %%
  958. %% Equivalent to {@link reg/3}, but allows for registration of another process
  959. %% instead of the current process.
  960. %%
  961. %% Note that registering other processes introduces the possibility of
  962. %% confusing race conditions in user code. Letting each process register
  963. %% its own resources is highly recommended.
  964. %%
  965. %% Only the following resource types can be registered through this function:
  966. %%
  967. %% * `n' - unique names
  968. %% * `a' - aggregated counters
  969. %% * `r' - resource properties
  970. %% * `rc' - resource counters
  971. %% @end
  972. reg_other(Key, Pid, Value, Attrs) ->
  973. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, Attrs, reg),
  974. [Key, Pid, Value, Attrs]).
  975. ensure_reg_other(Key, Pid) ->
  976. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, ensure), [Key, Pid]).
  977. %% @equiv ensure_reg_other(Key, Pid, Value, [])
  978. ensure_reg_other(Key, Pid, Value) ->
  979. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, [], ensure),
  980. [Key, Pid, Value]).
  981. %% @spec ensure_reg_other(Key::key(), Pid::pid(),
  982. %% Value::value(), Attrs::attrs()) ->
  983. %% new | updated
  984. %%
  985. %% @doc Register or update name or property to another process.
  986. %%
  987. %% Equivalent to {@link reg_other/3}, but allows for registration of another
  988. %% process instead of the current process. Also see {@link ensure_reg/3}.
  989. %% @end
  990. ensure_reg_other(Key, Pid, Value, Attrs) ->
  991. ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, Attrs, ensure),
  992. [Key, Pid, Value, Attrs]).
  993. reg_other1(Key, Pid, Op) ->
  994. reg_other1(Key, Pid, default(Key), [], Op).
  995. reg_other1({_,g,_} = Key, Pid, Value, As, Op) when is_pid(Pid) ->
  996. ?CHK_DIST,
  997. gproc_dist:reg_other(Key, Pid, Value, As, Op);
  998. reg_other1({T,l,_} = Key, Pid, Value, As, Op) when is_pid(Pid) ->
  999. if T==n; T==a; T==r; T==rc ->
  1000. call({reg_other, Key, Pid, Value, As, Op});
  1001. true ->
  1002. ?THROW_GPROC_ERROR(badarg)
  1003. end.
  1004. %% @spec reg_or_locate(Key::key(), Value) -> {pid(), NewValue}
  1005. %%
  1006. %% @doc Try registering a unique name, or return existing registration.
  1007. %%
  1008. %% This function tries to register the name `Key', if available.
  1009. %% If such a registration object already exists, the pid and value of
  1010. %% the current registration is returned instead.
  1011. %% @end
  1012. reg_or_locate(Key, Value) ->
  1013. ?CATCH_GPROC_ERROR(reg_or_locate1(Key, Value, self()), [Key, Value]).
  1014. %% @spec reg_or_locate(Key::key(), Value, Fun::fun()) -> {pid(), NewValue}
  1015. %%
  1016. %% @doc Spawn a process with a registered name, or return existing registration.
  1017. %%
  1018. %% This function checks whether a local name is registered; if not, it spawns
  1019. %% a new process (with `spawn(Fun)') and gives it the name.
  1020. %% The pid and value of the resulting registration is returned.
  1021. %%
  1022. %% When a global name is registered in this fashion, the process is
  1023. %% spawned on the caller's node, and the group_leader of the spawned
  1024. %% process is set to the group_leader of the calling process.
  1025. %% @end
  1026. reg_or_locate({n,_,_} = Key, Value, F) when is_function(F, 0) ->
  1027. ?CATCH_GPROC_ERROR(reg_or_locate1(Key, Value, F), [Key, Value, F]).
  1028. reg_or_locate1({_,g,_} = Key, Value, P) ->
  1029. ?CHK_DIST,
  1030. gproc_dist:reg_or_locate(Key, Value, P);
  1031. reg_or_locate1({T,l,_} = Key, Value, P) when T==n; T==a; T==rc ->
  1032. call({reg_or_locate, Key, Value, P});
  1033. reg_or_locate1(_, _, _) ->
  1034. ?THROW_GPROC_ERROR(badarg).
  1035. %% @spec reg_shared(Key::key()) -> true
  1036. %%
  1037. %% @doc Register a resource, but don't tie it to a particular process.
  1038. %%
  1039. %% `reg_shared({c,l,C}) -> reg_shared({c,l,C}, 0).'
  1040. %% `reg_shared({a,l,A}) -> reg_shared({a,l,A}, undefined).'
  1041. %% @end
  1042. reg_shared(Key) ->
  1043. ?CATCH_GPROC_ERROR(reg_shared1(Key), [Key]).
  1044. %% @private
  1045. reg_shared1({T,_,_} = Key) when T==a; T==p; T==c ->
  1046. reg_shared(Key, default(Key)).
  1047. %% @spec reg_shared(Key::key(), Value) -> true
  1048. %%
  1049. %% @doc Register a resource, but don't tie it to a particular process.
  1050. %%
  1051. %% Shared resources are all unique. They remain until explicitly unregistered
  1052. %% (using {@link unreg_shared/1}). The types of shared resources currently
  1053. %% supported are `counter' and `aggregated counter'. In listings and query
  1054. %% results, shared resources appear as other similar resources, except that
  1055. %% `Pid == shared'. To wit, update_counter({c,l,myCounter}, shared, 1) would
  1056. %% increment the shared counter `myCounter' with 1, provided it exists.
  1057. %%
  1058. %% A shared aggregated counter will track updates in exactly the same way as
  1059. %% an aggregated counter which is owned by a process.
  1060. %% @end
  1061. %%
  1062. reg_shared(Key, Value) ->
  1063. ?CATCH_GPROC_ERROR(reg_shared1(Key, Value, []), [Key, Value]).
  1064. reg_shared(Key, Value, Attrs) when is_list(Attrs) ->
  1065. ?CATCH_GPROC_ERROR(reg_shared1(Key, Value, Attrs), [Key, Value, Attrs]).
  1066. %% @private
  1067. reg_shared1({_,g,_} = Key, Value, As) ->
  1068. %% anything global
  1069. ?CHK_DIST,
  1070. gproc_dist:reg_shared(Key, Value, As);
  1071. reg_shared1({a,l,_} = Key, undefined, As) ->
  1072. call({reg_shared, Key, undefined, As, reg});
  1073. reg_shared1({c,l,_} = Key, Value, As) when is_integer(Value) ->
  1074. call({reg_shared, Key, Value, As, reg});
  1075. reg_shared1({p,l,_} = Key, Value, As) ->
  1076. call({reg_shared, Key, Value, As, reg});
  1077. reg_shared1({rc,l,_} = Key, undefined, As) ->
  1078. call({reg_shared, Key, undefined, As, reg});
  1079. reg_shared1(_, _, _) ->
  1080. ?THROW_GPROC_ERROR(badarg).
  1081. %% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
  1082. %%
  1083. %% @doc Register multiple {Key,Value} pairs of a given type and scope.
  1084. %%
  1085. %% This function is more efficient than calling {@link reg/2} repeatedly.
  1086. %% It is also atomic in regard to unique names; either all names are registered
  1087. %% or none are.
  1088. %% @end
  1089. mreg(T, C, KVL) ->
  1090. ?CATCH_GPROC_ERROR(mreg1(T, C, KVL), [T, C, KVL]).
  1091. mreg1(T, g, KVL) ->
  1092. ?CHK_DIST,
  1093. gproc_dist:mreg(T, KVL);
  1094. mreg1(T, l, KVL) when T==a; T==n ->
  1095. if is_list(KVL) ->
  1096. call({mreg, T, l, KVL});
  1097. true ->
  1098. erlang:error(badarg)
  1099. end;
  1100. mreg1(p, l, KVL) ->
  1101. local_mreg(p, KVL);
  1102. mreg1(_, _, _) ->
  1103. ?THROW_GPROC_ERROR(badarg).
  1104. %% @spec munreg(type(), scope(), [Key::any()]) -> true
  1105. %%
  1106. %% @doc Unregister multiple Key items of a given type and scope.
  1107. %%
  1108. %% This function is usually more efficient than calling {@link unreg/1}
  1109. %% repeatedly.
  1110. %% @end
  1111. munreg(T, C, L) ->
  1112. ?CATCH_GPROC_ERROR(munreg1(T, C, L), [T, C, L]).
  1113. munreg1(T, g, L) ->
  1114. ?CHK_DIST,
  1115. gproc_dist:munreg(T, existing(T,g,L));
  1116. munreg1(T, l, L) when T==a; T==n ->
  1117. if is_list(L) ->
  1118. call({munreg, T, l, existing(T,l,L)});
  1119. true ->
  1120. erlang:error(badarg)
  1121. end;
  1122. munreg1(p, l, L) ->
  1123. local_munreg(p, existing(p,l,L));
  1124. munreg1(_, _, _) ->
  1125. ?THROW_GPROC_ERROR(badarg).
  1126. existing(T,Scope,L) ->
  1127. Keys = if T==p; T==c ->
  1128. [{{T,Scope,K}, self()} || K <- L];
  1129. T==a; T==n ->
  1130. [{{T,Scope,K}, T} || K <- L]
  1131. end,
  1132. _ = [case ets:member(?TAB, K) of
  1133. false -> erlang:error(badarg);
  1134. true -> true
  1135. end || K <- Keys],
  1136. L.
  1137. %% @spec (Key:: key()) -> true
  1138. %%
  1139. %% @doc Unregister a name or property.
  1140. %% @end
  1141. unreg(Key) ->
  1142. ?CATCH_GPROC_ERROR(unreg1(Key), [Key]).
  1143. unreg1(Key) ->
  1144. case Key of
  1145. {_, g, _} ->
  1146. ?CHK_DIST,
  1147. gproc_dist:unreg(Key);
  1148. {T, l, _} when T == n; T == a; T == r; T == rc ->
  1149. call({unreg, Key});
  1150. {_, l, _} ->
  1151. case ets:member(?TAB, {Key,self()}) of
  1152. true ->
  1153. _ = gproc_lib:remove_reg(Key, self(), unreg),
  1154. true;
  1155. false ->
  1156. ?THROW_GPROC_ERROR(badarg)
  1157. end
  1158. end.
  1159. %% @spec unreg_other(key(), pid()) -> true
  1160. %% @doc Unregister a name registered to another process.
  1161. %%
  1162. %% This function is equivalent to {@link unreg/1}, but specifies another
  1163. %% process as the holder of the registration. An exception is raised if the
  1164. %% name or property is not registered to the given process.
  1165. %% @end
  1166. unreg_other(Key, Pid) ->
  1167. ?CATCH_GPROC_ERROR(unreg_other1(Key, Pid), [Key, Pid]).
  1168. unreg_other1({_,g,_} = Key, Pid) ->
  1169. ?CHK_DIST,
  1170. gproc_dist:unreg_other(Key, Pid);
  1171. unreg_other1({T,l,_} = Key, Pid) when is_pid(Pid) ->
  1172. if T==n; T==a; T==r; T==rc ->
  1173. call({unreg_other, Key, Pid});
  1174. true ->
  1175. ?THROW_GPROC_ERROR(badarg)
  1176. end.
  1177. %% @spec (Key::key(), Props::[{atom(), any()}]) -> true
  1178. %%
  1179. %% @doc Add/modify `{Key, Value}' attributes associated with a registration.
  1180. %%
  1181. %% Gproc registration objects can have `{Key, Value}' attributes associated with
  1182. %% them. These are stored in a way that doesn't affect the cost of name lookup.
  1183. %%
  1184. %% Attributs can be retrieved using `gproc:get_attribute/3' or
  1185. %% `gproc:get_attributes/2'.
  1186. %% @end
  1187. set_attributes(Key, Props) ->
  1188. ?CATCH_GPROC_ERROR(set_attributes1(Key, Props), [Key, Props]).
  1189. set_attributes1(Key, Props) ->
  1190. case Key of
  1191. {_, g, _} ->
  1192. ?CHK_DIST,
  1193. gproc_dist:set_attributes(Key, Props);
  1194. {_, l, _} ->
  1195. call({set_attributes, Key, Props})
  1196. end.
  1197. %% @spec (Key:: key()) -> true
  1198. %%
  1199. %% @doc Unregister a shared resource.
  1200. %% @end
  1201. unreg_shared(Key) ->
  1202. ?CATCH_GPROC_ERROR(unreg_shared1(Key), [Key]).
  1203. %% @private
  1204. unreg_shared1(Key) ->
  1205. case Key of
  1206. {_, g, _} ->
  1207. ?CHK_DIST,
  1208. gproc_dist:unreg_shared(Key);
  1209. {T, l, _} when T == c;
  1210. T == a;
  1211. T == p;
  1212. T == rc -> call({unreg_shared, Key});
  1213. _ ->
  1214. ?THROW_GPROC_ERROR(badarg)
  1215. end.
  1216. %% @spec (Key::key(), Props::[{K,V}]) -> true
  1217. %% @doc Add/modify `{Key, Value}' attributes associated with a shared registration.
  1218. %%
  1219. %% Gproc registration objects can have `{Key, Value}' attributes associated with
  1220. %% them. These are stored in a way that doesn't affect the cost of name lookup.
  1221. %%
  1222. %% Attributes can be retrieved using `gproc:get_attribute/3' or
  1223. %% `gproc:get_attributes/2'.
  1224. %% @end
  1225. %%
  1226. set_attributes_shared(Key, Attrs) ->
  1227. ?CATCH_GPROC_ERROR(set_attributes_shared1(Key, Attrs), [Key, Attrs]).
  1228. set_attributes_shared1(Key, Attrs) ->
  1229. case Key of
  1230. {_, g, _} ->
  1231. ?CHK_DIST,
  1232. gproc_dist:set_attributes_shared(Key, Attrs);
  1233. {_, l, _} ->
  1234. call({set_attributes_shared, Key, Attrs})
  1235. end.
  1236. %% @spec (key(), pid()) -> yes | no
  1237. %%
  1238. %% @doc Behaviour support callback
  1239. %% @end
  1240. register_name({n,_,_} = Name, Pid) when Pid == self() ->
  1241. try reg(Name), yes
  1242. catch
  1243. error:_ ->
  1244. no
  1245. end.
  1246. %% @equiv unreg/1
  1247. unregister_name(Key) ->
  1248. unreg(Key).
  1249. %% @spec select(Arg) -> [Match] | {[Match], Continuation} | '$end_of_table'
  1250. %% where Arg = Continuation
  1251. %% | sel_pattern()
  1252. %% Match = {Key, Pid, Value}
  1253. %% @doc Perform a select operation on the process registry
  1254. %%
  1255. %% When Arg = Contination, resume a gproc:select/1 operation
  1256. %% (see {@link //stdlib/ets:select/1. ets:select/1}
  1257. %%
  1258. %% When Arg = {@type sel_pattern()}, this function executes a select operation,
  1259. %% emulating ets:select/1
  1260. %%
  1261. %% {@link select/2} offers the opportunity to narrow the search
  1262. %% (by limiting to only global or local scope, or a single type of object).
  1263. %% When only a pattern as single argument is given, both global and local scope,
  1264. %% as well as all types of object can be searched. Note that the pattern may
  1265. %% still limit the select operation so that scanning the entire table is avoided.
  1266. %%
  1267. %% The physical representation in the registry may differ from the above,
  1268. %% but the select patterns are transformed appropriately. The logical
  1269. %% representation for the gproc select operations is given by
  1270. %% {@type headpat()}.
  1271. %% @end
  1272. select({?TAB, _, _, _, _, _, _, _} = Continuation) ->
  1273. ets:select(Continuation);
  1274. select(Pat) ->
  1275. select(all, Pat).
  1276. %% @spec (Context::sel_context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
  1277. %%
  1278. %% @doc Perform a select operation with limited context on the process registry
  1279. %%
  1280. %% The physical representation in the registry may differ from the above,
  1281. %% but the select patterns are transformed appropriately.
  1282. %%
  1283. %% Note that limiting the context is just a convenience function, allowing you
  1284. %% to write a simpler select pattern and still avoid searching the entire
  1285. %% registry. Whenever variables are used in the head pattern, this will result
  1286. %% in a wider scan, even if the values are restricted through a guard (e.g.
  1287. %% <code>select([{'$1','$2','$3'}, [{'==', {element,1,'$1'}, p}], ...])</code>
  1288. %% will count as a wild pattern on the key and result in a full scan).
  1289. %% In this case, specifying a Context will allow gproc to perform some
  1290. %% variable substitution and ensure that the scan is limited.
  1291. %% @end
  1292. select(Context, Pat) ->
  1293. ets:select(?TAB, pattern(Pat, Context)).
  1294. %% @spec (Context::context(), Pat::sel_patten(), Limit::integer()) ->
  1295. %% {[Match],Continuation} | '$end_of_table'
  1296. %% @doc Like {@link select/2} but returns Limit objects at a time.
  1297. %%
  1298. %% See [http://www.erlang.org/doc/man/ets.html#select-3].
  1299. %% @end
  1300. select(Context, Pat, Limit) ->
  1301. ets:select(?TAB, pattern(Pat, Context), Limit).
  1302. %% @spec (sel_pattern()) -> list(sel_object())
  1303. %% @doc
  1304. %% @equiv select_count(all, Pat)
  1305. %% @end
  1306. select_count(Pat) ->
  1307. select_count(all, Pat).
  1308. %% @spec (Context::context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
  1309. %%
  1310. %% @doc Perform a select_count operation on the process registry.
  1311. %%
  1312. %% The physical representation in the registry may differ from the above,
  1313. %% but the select patterns are transformed appropriately.
  1314. %% @end
  1315. select_count(Context, Pat) ->
  1316. ets:select_count(?TAB, pattern(Pat, Context)).
  1317. %%% Local properties can be registered in the local process, since
  1318. %%% no other process can interfere.
  1319. %%%
  1320. local_reg({_,Scope,_} = Key, Value, As, Op) ->
  1321. case gproc_lib:insert_reg(Key, Value, self(), l) of
  1322. false ->
  1323. case ets:member(?TAB, {Key, self()}) of
  1324. true when Op == ensure ->
  1325. gproc_lib:do_set_value(Key, Value, self()),
  1326. set_attrs(As, Key, self()),
  1327. updated;
  1328. _ ->
  1329. ?THROW_GPROC_ERROR(badarg)
  1330. end;
  1331. true ->
  1332. monitor_me(),
  1333. if As =/= [] ->
  1334. gproc_lib:insert_attr(Key, As, self(), Scope),
  1335. regged_new(Op);
  1336. true ->
  1337. regged_new(Op)
  1338. end
  1339. end.
  1340. regged_new(reg ) -> true;
  1341. regged_new(ensure) -> new.
  1342. local_mreg(_, []) -> true;
  1343. local_mreg(T, [_|_] = KVL) ->
  1344. case gproc_lib:insert_many(T, l, KVL, self()) of
  1345. false -> ?THROW_GPROC_ERROR(badarg);
  1346. {true,_} -> monitor_me()
  1347. end.
  1348. local_munreg(T, L) when T==p; T==c ->
  1349. _ = [gproc_lib:remove_reg({T,l,K}, self(), unreg) || K <- L],
  1350. true.
  1351. %% @spec (Key :: key(), Value) -> true
  1352. %% @doc Sets the value of the registration given by Key
  1353. %%
  1354. %% Key is assumed to exist and belong to the calling process.
  1355. %% If it doesn't, this function will exit.
  1356. %%
  1357. %% Value can be any term, unless the object is a counter, in which case
  1358. %% it must be an integer.
  1359. %% @end
  1360. %%
  1361. set_value(Key, Value) ->
  1362. ?CATCH_GPROC_ERROR(set_value1(Key, Value), [Key, Value]).
  1363. %% @spec (Key :: key(), Value) -> true
  1364. %% @doc Sets the value of the shared registration given by Key
  1365. %%
  1366. %% Key is assumed to exist as a shared entity.
  1367. %% If it doesn't, this function will exit.
  1368. %%
  1369. %% Value can be any term, unless the object is a counter, in which case
  1370. %% it must be an integer.
  1371. %% @end
  1372. %%
  1373. set_value_shared({T,_,_} = Key, Value) when T == c;
  1374. T == a;
  1375. T == p;
  1376. T == r ->
  1377. ?CATCH_GPROC_ERROR(set_value_shared1(Key, Value), [Key, Value]).
  1378. set_value1({_,g,_} = Key, Value) ->
  1379. ?CHK_DIST,
  1380. gproc_dist:set_value(Key, Value);
  1381. set_value1({a,l,_} = Key, Value) when is_integer(Value) ->
  1382. call({set, Key, Value});
  1383. set_value1({n,l,_} = Key, Value) ->
  1384. %% we cannot do this locally, since we have to check that the object
  1385. %% exists first - not an atomic update.
  1386. call({set, Key, Value});
  1387. set_value1({p,l,_} = Key, Value) ->
  1388. %% we _can_ to this locally, since there is no race condition - no
  1389. %% other process can update our properties.
  1390. case gproc_lib:do_set_value(Key, Value, self()) of
  1391. true -> true;
  1392. false ->
  1393. erlang:error(badarg)
  1394. end;
  1395. set_value1({c,l,_} = Key, Value) when is_integer(Value) ->
  1396. gproc_lib:do_set_counter_value(Key, Value, self());
  1397. set_value1(_, _) ->
  1398. ?THROW_GPROC_ERROR(badarg).
  1399. set_value_shared1({_,g,_} = Key, Value) ->
  1400. ?CHK_DIST,
  1401. gproc_dist:set_value_shared(Key, Value);
  1402. set_value_shared1({_,l,_} = Key, Value) ->
  1403. call({set_shared, Key, Value}).
  1404. %% @spec (Key) -> Value
  1405. %% @doc Reads the value stored with a key registered to the current process.
  1406. %%
  1407. %% If no such key is registered to the current process, this function exits.
  1408. %% @end
  1409. get_value(Key) ->
  1410. ?CATCH_GPROC_ERROR(get_value1(Key, self()), [Key]).
  1411. %% @spec (Key) -> Value
  1412. %% @doc Reads the value stored with a shared key.
  1413. %%
  1414. %% If no such shared key is registered, this function exits.
  1415. %% @end
  1416. get_value_shared(Key) ->
  1417. ?CATCH_GPROC_ERROR(get_value1(Key, shared), [Key]).
  1418. %% @spec (Key, Pid) -> Value
  1419. %% @doc Reads the value stored with a key registered to the process Pid.
  1420. %%
  1421. %% If `Pid == shared', the value of a shared key (see {@link reg_shared/1})
  1422. %% will be read.
  1423. %% @end
  1424. %%
  1425. get_value(Key, Pid) ->
  1426. ?CATCH_GPROC_ERROR(get_value1(Key, Pid), [Key, Pid]).
  1427. get_value1({T,_,_} = Key, Pid) when is_pid(Pid) ->
  1428. if T==n; T==a; T==rc ->
  1429. case ets:lookup(?TAB, {Key, T}) of
  1430. [{_, P, Value}] when P == Pid -> Value;
  1431. _ -> ?THROW_GPROC_ERROR(badarg)
  1432. end;
  1433. true ->
  1434. ets:lookup_element(?TAB, {Key, Pid}, 3)
  1435. end;
  1436. get_value1({T,_,_} = K, shared) when T==c; T==a; T==p; T==r ->
  1437. Key = case T of
  1438. c -> {K, shared};
  1439. p -> {K, shared};
  1440. r -> {K, shared};
  1441. a -> {K, a}
  1442. end,
  1443. case ets:lookup(?TAB, Key) of
  1444. [{_, shared, Value}] -> Value;
  1445. _ -> ?THROW_GPROC_ERROR(badarg)
  1446. end;
  1447. get_value1(_, _) ->
  1448. ?THROW_GPROC_ERROR(badarg).
  1449. %% @spec (Key, Attribute::atom()) -> Value
  1450. %% @doc Get attribute value of `Attr' associated with `Key' for most likely Pid.
  1451. %%
  1452. %% The most likely Pid in this case is `self()' for properties and counters,
  1453. %% and the current registration holder in case of names or aggregated counters.
  1454. %% An exception is raised if `Key' is not registered for the given process.
  1455. %% @end
  1456. get_attribute(Key, A) ->
  1457. Pid = case Key of
  1458. {T,_,_} when T==n; T==a; T==rc ->
  1459. where(Key);
  1460. {T,_,_} when T==p; T==c; T==r ->
  1461. self()
  1462. end,
  1463. ?CATCH_GPROC_ERROR(get_attribute1(Key, Pid, A), [Key, A]).
  1464. %% @spec (Key, Pid::pid() | shared, Attr::atom()) -> Value
  1465. %% @doc Get the attribute value of `Attr' associated with `Key' for process Pid.
  1466. %%
  1467. %% If `Pid == shared', the attribute of a shared key (see {@link reg_shared/1})
  1468. %% will be read.
  1469. %% @end
  1470. %%
  1471. get_attribute(Key, Pid, A) ->
  1472. ?CATCH_GPROC_ERROR(get_attribute1(Key, Pid, A), [Key, Pid, A]).
  1473. %% @spec (Key, Attr::atom()) -> Value
  1474. %% @doc Get the attribute value of `Attr' associated with the shared `Key'.
  1475. %%
  1476. %% Equivalent to `get_attribute(Key, shared, Attr)'
  1477. %% (see {@link get_attribute/3}).
  1478. %% @end
  1479. get_attribute_shared(Key, Attr) ->
  1480. ?CATCH_GPROC_ERROR(get_attribute1(Key, shared, Attr), [Key, Attr]).
  1481. %% @private
  1482. get_attribute1({_,_,_} = Key, Pid, A) when is_pid(Pid); Pid==shared ->
  1483. case ets:lookup(?TAB, {Pid, Key}) of
  1484. [{_, Attrs}] ->
  1485. case lists:keyfind(attrs, 1, Attrs) of
  1486. false -> undefined;
  1487. {_, As} ->
  1488. case lists:keyfind(A, 1, As) of
  1489. false -> undefined;
  1490. {_, V} -> V
  1491. end
  1492. end;
  1493. _ -> ?THROW_GPROC_ERROR(badarg)
  1494. end;
  1495. get_attribute1(_, _, _) ->
  1496. ?THROW_GPROC_ERROR(badarg).
  1497. %% @spec get_attributes(Key::key()) -> [{K, V}]
  1498. %% @doc Get attributes associated with registration.
  1499. %% @equiv get_attributes(Key, self())
  1500. %%
  1501. get_attributes(Key) ->
  1502. ?CATCH_GPROC_ERROR(get_attributes1(Key, self()), [Key]).
  1503. %% @spec (Key::key(), Pid::pid() | shared) -> [{K, V}]
  1504. %%
  1505. %% @doc Returns the list of attributes associated with the registration.
  1506. %%
  1507. %% This function raises a `badarg' exception if there is no corresponding
  1508. %% registration.
  1509. %%
  1510. get_attributes(Key, Pid) ->
  1511. ?CATCH_GPROC_ERROR(get_attributes1(Key, Pid), [Key, Pid]).
  1512. get_attributes1({_,_,_} = Key, Pid) when is_pid(Pid); Pid==shared ->
  1513. case ets:lookup(?TAB, {Pid, Key}) of
  1514. [{_, Attrs}] ->
  1515. case lists:keyfind(attrs, 1, Attrs) of
  1516. false -> [];
  1517. {_, As} -> As
  1518. end;
  1519. _ -> ?THROW_GPROC_ERROR(badarg)
  1520. end;
  1521. get_attributes1(_, _) ->
  1522. ?THROW_GPROC_ERROR(badarg).
  1523. %% @spec (Key) -> Pid
  1524. %% @doc Lookup the Pid stored with a key.
  1525. %%
  1526. %% This function raises a `badarg' exception if `Key' is not registered.
  1527. %% @end
  1528. lookup_pid({_T,_,_} = Key) ->
  1529. case where(Key) of
  1530. undefined -> erlang:error(badarg);
  1531. P -> P
  1532. end.
  1533. %% @spec (Key) -> Value
  1534. %% @doc Lookup the value stored with a key.
  1535. %%
  1536. %% This function raises a `badarg' exception if `Key' is not registered.
  1537. %% @end
  1538. lookup_value({T,_,_} = Key) ->
  1539. if T==n orelse T==a orelse T==rc ->
  1540. ets:lookup_element(?TAB, {Key,T}, 3);
  1541. true ->
  1542. erlang:error(badarg)
  1543. end.
  1544. %% @spec (Key::key()) -> pid() | undefined
  1545. %%
  1546. %% @doc Returns the pid registered as Key
  1547. %%
  1548. %% The type of registration must be either name or aggregated counter.
  1549. %% Otherwise this function will raise a `badarg' exception.
  1550. %% Use {@link lookup_pids/1} in these cases.
  1551. %% @end
  1552. %%
  1553. where(Key) ->
  1554. ?CATCH_GPROC_ERROR(where1(Key), [Key]).
  1555. where1({T,_,_}=Key) ->
  1556. if T==n orelse T==a orelse T==rc ->
  1557. case ets:lookup(?TAB, {Key,T}) of
  1558. [{_, P, _Value}] ->
  1559. case my_is_process_alive(P) of
  1560. true -> P;
  1561. false ->
  1562. undefined
  1563. end;
  1564. _ -> % may be [] or [{Key,Waiters}]
  1565. undefined
  1566. end;
  1567. true ->
  1568. ?THROW_GPROC_ERROR(badarg)
  1569. end.
  1570. %% @equiv where/1
  1571. whereis_name(Key) ->
  1572. ?CATCH_GPROC_ERROR(where1(Key), [Key]).
  1573. %% @spec (Key::key()) -> [pid()]
  1574. %%
  1575. %% @doc Returns a list of pids with the published key Key
  1576. %%
  1577. %% If the type of registration is either name or aggregated counter,
  1578. %% this function will return either an empty list, or a list of one pid.
  1579. %% For non-unique types, the return value can be a list of any length.
  1580. %%
  1581. %% Note: shared resources are not associated with any pid, and will
  1582. %% therefore be excluded.
  1583. %% @end
  1584. %%
  1585. lookup_pids({T,_,_} = Key) ->
  1586. L = if T==n orelse T==a orelse T==rc ->
  1587. ets:select(?TAB, [{{{Key,T}, '$1', '_'},
  1588. [{is_pid, '$1'}], ['$1']}]);
  1589. true ->
  1590. ets:select(?TAB, [{{{Key,'_'}, '$1', '_'},
  1591. [{is_pid, '$1'}], ['$1']}])
  1592. end,
  1593. [P || P <- L, my_is_process_alive(P)].
  1594. %% @spec (pid()) -> boolean()
  1595. %%
  1596. my_is_process_alive(P) when node(P) =:= node() ->
  1597. is_process_alive(P);
  1598. my_is_process_alive(_) ->
  1599. %% remote pid - assume true (too costly to find out)
  1600. true.
  1601. %% @spec (Key::key()) -> [{pid(), Value}]
  1602. %%
  1603. %% @doc Retrieve the `{Pid,Value}' pairs corresponding to Key.
  1604. %%
  1605. %% Key refer to any type of registry object. If it refers to a unique
  1606. %% object, the list will be of length 0 or 1. If it refers to a non-unique
  1607. %% object, the return value can be a list of any length.
  1608. %% @end
  1609. %%
  1610. lookup_values({T,_,_} = Key) ->
  1611. L = if T==n orelse T==a orelse T==rc ->
  1612. ets:select(?TAB, [{{{Key,T}, '$1', '$2'},[],[{{'$1','$2'}}]}]);
  1613. true ->
  1614. ets:select(?TAB, [{{{Key,'_'}, '$1', '$2'},[],[{{'$1','$2'}}]}])
  1615. end,
  1616. [Pair || {P,_} = Pair <- L, my_is_process_alive(P)].
  1617. %% @ spec (Key::key(), Incr) -> integer() | [integer()]
  1618. %% Incr = IncrVal | UpdateOp | [UpdateOp]
  1619. %% UpdateOp = IncrVal | {IncrVal, Threshold, SetValue}
  1620. %% IncrVal = integer()
  1621. %%
  1622. %% @doc Updates the counter registered as Key for the current process.
  1623. %%
  1624. %% This function works almost exactly like ets:update_counter/3
  1625. %% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
  1626. %% will fail if the type of object referred to by Key is not a counter or
  1627. %% a unique name (update_counter/2 can be performed on names as well, but they
  1628. %% do not count as counter objects, and do not affect aggregated counters).
  1629. %%
  1630. %% Aggregated counters with the same name will be updated automatically.
  1631. %% The `UpdateOp' patterns are the same as for `ets:update_counter/3', except
  1632. %% that the position is omitted; in gproc, the value position is always `3'.
  1633. %%
  1634. %% If `Key' refers to a unique name, the operation will depend on the value
  1635. %% part of the registration being an integer(). While non-integer values are
  1636. %% not permitted at all for counter objects, it is the user's responsibility to
  1637. %% ensure that a name, on which `update_counter/2' is to be performed, has the
  1638. %% appropriate value type.
  1639. %% @end
  1640. %%
  1641. -spec update_counter(key(), increment()) -> integer() | [integer()].
  1642. update_counter(Key, Incr) ->
  1643. Pid = case Key of
  1644. {n,_,_} -> n;
  1645. {c,_,_} -> self()
  1646. end,
  1647. ?CATCH_GPROC_ERROR(update_counter1(Key, Pid, Incr), [Key, Incr]).
  1648. update_counter(Key, Pid, Incr) when is_pid(Pid);
  1649. Pid == shared; Pid == n ->
  1650. ?CATCH_GPROC_ERROR(update_counter1(Key, Pid, Incr), [Key, Pid, Incr]).
  1651. update_counter1({T,l,_} = Key, Pid, Incr) when T==c; T==n ->
  1652. gproc_lib:update_counter(Key, Incr, Pid);
  1653. update_counter1({T,g,_} = Key, Pid, Incr) when T==c; T==n ->
  1654. ?CHK_DIST,
  1655. gproc_dist:update_counter(Key, Pid, Incr);
  1656. update_counter1(_, _, _) ->
  1657. ?THROW_GPROC_ERROR(badarg).
  1658. %% @doc Update a list of counters
  1659. %%
  1660. %% This function is not atomic, except (in a sense) for global counters. For local counters,
  1661. %% it is more of a convenience function. For global counters, it is much more efficient
  1662. %% than calling `gproc:update_counter/2' for each individual counter.
  1663. %%
  1664. %% The return value is the corresponding list of `[{Counter, Pid, NewValue}]'.
  1665. %% @end
  1666. -spec update_counters(scope(), [{key(), pid(), increment()}]) ->
  1667. [{key(), pid(), integer()}].
  1668. update_counters(_, []) ->
  1669. [];
  1670. update_counters(l, [_|_] = Cs) ->
  1671. ?CATCH_GPROC_ERROR(update_counters1(Cs), [Cs]);
  1672. update_counters(g, [_|_] = Cs) ->
  1673. ?CHK_DIST,
  1674. gproc_dist:update_counters(Cs).
  1675. update_counters1([{{c,l,_} = Key, Pid, Incr}|T]) ->
  1676. [{Key, Pid, gproc_lib:update_counter(Key, Incr, Pid)}|update_counters1(T)];
  1677. update_counters1([]) ->
  1678. [];
  1679. update_counters1(_) ->
  1680. ?THROW_GPROC_ERROR(badarg).
  1681. %% @spec (Key) -> {ValueBefore, ValueAfter}
  1682. %% Key = {c, Scope, Name} | {n, Scope, Name}
  1683. %% Scope = l | g
  1684. %% ValueBefore = integer()
  1685. %% ValueAfter = integer()
  1686. %%
  1687. %% @doc Reads and resets a counter in a "thread-safe" way
  1688. %%
  1689. %% This function reads the current value of a counter and then resets it to its
  1690. %% initial value. The reset operation is done using {@link update_counter/2},
  1691. %% which allows for concurrent calls to {@link update_counter/2} without losing
  1692. %% updates. Aggregated counters are updated accordingly.
  1693. %%
  1694. %% If `Key' refers to a unique name, the operation will depend on the value
  1695. %% part of the registration being an integer(). While non-integer values are
  1696. %% not permitted at all for counter objects, it is the user's responsibility to
  1697. %% ensure that a name, on which `reset_counter/1' is to be performed, has the
  1698. %% appropriate value type.
  1699. %% @end
  1700. %%
  1701. reset_counter(Key) ->
  1702. ?CATCH_GPROC_ERROR(reset_counter1(Key), [Key]).
  1703. reset_counter1({T,g,_} = Key) when T==c; T==n ->
  1704. ?CHK_DIST,
  1705. gproc_dist:reset_counter(Key);
  1706. reset_counter1({n,l,_} = Key) ->
  1707. [{_, Pid, Current}] = ets:lookup(?TAB, {Key, n}),
  1708. {Current, update_counter(Key, get_initial(Pid, Key) - Current)};
  1709. reset_counter1({c,l,_} = Key) ->
  1710. Current = ets:lookup_element(?TAB, {Key, self()}, 3),
  1711. {Current, update_counter(Key, get_initial(self(), Key) - Current)}.
  1712. get_initial(Pid, Key) ->
  1713. case ets:lookup(?TAB, {Pid, Key}) of
  1714. [{_, r}] -> 0;
  1715. [{_, Opts}] ->
  1716. proplists:get_value(initial, Opts, 0)
  1717. end.
  1718. %% @spec (Key::key(), Incr) -> integer() | [integer()]
  1719. %% Incr = IncrVal | UpdateOp | [UpdateOp]
  1720. %% UpdateOp = IncrVal | {IncrVal, Threshold, SetValue}
  1721. %% IncrVal = integer()
  1722. %%
  1723. %% @doc Updates the shared counter registered as Key.
  1724. %%
  1725. %% This function works almost exactly like ets:update_counter/3
  1726. %% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
  1727. %% will fail if the type of object referred to by Key is not a counter.
  1728. %%
  1729. %% Aggregated counters with the same name will be updated automatically.
  1730. %% The `UpdateOp' patterns are the same as for `ets:update_counter/3', except
  1731. %% that the position is omitted; in gproc, the value position is always `3'.
  1732. %% @end
  1733. %%
  1734. update_shared_counter(Key, Incr) ->
  1735. ?CATCH_GPROC_ERROR(update_shared_counter1(Key, Incr), [Key, Incr]).
  1736. update_shared_counter1({c,g,_} = Key, Incr) ->
  1737. ?CHK_DIST,
  1738. gproc_dist:update_shared_counter(Key, Incr);
  1739. update_shared_counter1({c,l,_} = Key, Incr) ->
  1740. gproc_lib:update_counter(Key, Incr, shared).
  1741. %% @spec (From::key(), To::pid() | key()) -> undefined | pid()
  1742. %%
  1743. %% @doc Atomically transfers the key `From' to the process identified by `To'.
  1744. %%
  1745. %% This function transfers any gproc key (name, property, counter, aggr counter)
  1746. %% from one process to another, and returns the pid of the new owner.
  1747. %%
  1748. %% `To' must be either a pid or a unique name (name or aggregated counter), but
  1749. %% does not necessarily have to resolve to an existing process. If there is
  1750. %% no process registered with the `To' key, `give_away/2' returns `undefined',
  1751. %% and the `From' key is effectively unregistered.
  1752. %%
  1753. %% It is allowed to give away a key to oneself, but of course, this operation
  1754. %% will have no effect.
  1755. %%
  1756. %% Fails with `badarg' if the calling process does not have a `From' key
  1757. %% registered.
  1758. %% @end
  1759. give_away(Key, ToPid) ->
  1760. ?CATCH_GPROC_ERROR(give_away1(Key, ToPid), [Key, ToPid]).
  1761. give_away1({_,l,_} = Key, ToPid) when is_pid(ToPid), node(ToPid) == node() ->
  1762. call({give_away, Key, ToPid});
  1763. give_away1({_,l,_} = Key, {n,l,_} = ToKey) ->
  1764. call({give_away, Key, ToKey});
  1765. give_away1({_,g,_} = Key, To) ->
  1766. ?CHK_DIST,
  1767. gproc_dist:give_away(Key, To).
  1768. %% @spec () -> ok
  1769. %%
  1770. %% @doc Unregister all items of the calling process and inform gproc
  1771. %% to forget about the calling process.
  1772. %%
  1773. %% This function is more efficient than letting gproc perform these
  1774. %% cleanup operations.
  1775. %% @end
  1776. goodbye() ->
  1777. process_is_down(self()).
  1778. %% @spec (Key::process() | key(), Msg::any()) -> Msg
  1779. %%
  1780. %% @doc Sends a message to the process, or processes, corresponding to Key.
  1781. %%
  1782. %% If Key belongs to a unique object (name or aggregated counter), this
  1783. %% function will send a message to the corresponding process, or fail if there
  1784. %% is no such process. If Key is for a non-unique object type (counter or
  1785. %% property), Msg will be send to all processes that have such an object.
  1786. %%
  1787. %% Key can also be anything that the erlang:send/2, or '!' operator accepts as a process
  1788. %% identifier, namely a pid(), an atom(), or `{Name::atom(), Node::atom()}'.
  1789. %% @end
  1790. %%
  1791. send(P, Msg) when is_pid(P); is_atom(P) ->
  1792. P ! Msg;
  1793. send({Name, Node} = P, Msg) when is_atom(Node), is_atom(Name) ->
  1794. P ! Msg;
  1795. send(Key, Msg) ->
  1796. ?CATCH_GPROC_ERROR(send1(Key, Msg), [Key, Msg]).
  1797. send1({T,C,_} = Key, Msg) when C==l; C==g ->
  1798. if T == n orelse T == a orelse T == rc ->
  1799. case ets:lookup(?TAB, {Key, T}) of
  1800. [{_, Pid, _}] ->
  1801. Pid ! Msg;
  1802. _ ->
  1803. ?THROW_GPROC_ERROR(badarg)
  1804. end;
  1805. T==p orelse T==c orelse T==r ->
  1806. %% BUG - if the key part contains select wildcards, we may end up
  1807. %% sending multiple messages to the same pid
  1808. lists:foreach(fun(Pid) ->
  1809. Pid ! Msg
  1810. end, lookup_pids(Key)),
  1811. Msg;
  1812. true ->
  1813. erlang:error(badarg)
  1814. end;
  1815. send1(_, _) ->
  1816. ?THROW_GPROC_ERROR(badarg).
  1817. %% @spec (Key::key(), Msg::any()) -> Msg
  1818. %%
  1819. %% @equiv bcast(nodes(), Key, Msg)
  1820. %% @end
  1821. %%
  1822. bcast(Key, Msg) ->
  1823. bcast(nodes(), Key, Msg).
  1824. %% @spec (Nodes::[atom()], Key::key(), Msg::any()) -> Msg
  1825. %%
  1826. %% @doc Sends a message to processes corresponding to Key on Nodes.
  1827. %%
  1828. %% This function complements `send/2' and works on locally registered resources
  1829. %% that `send/2' supports. Messages are routed via a special broadcast server
  1830. %% on each node to ensure that ordering is preserved. Distributed delivery
  1831. %% is asynchronous and carries the same guarantees as normal message passing
  1832. %% (with the added proviso that the broadcast server also needs to be available).
  1833. %% @see send/2
  1834. %% @end
  1835. %%
  1836. bcast(Ns, Key, Msg) ->
  1837. ?CATCH_GPROC_ERROR(bcast1(Ns, Key, Msg), [Key, Msg]).
  1838. bcast1(Ns, {T,l,_} = Key, Msg) when T==p; T==a; T==c; T==n; T==r; T==rc ->
  1839. send1(Key, Msg),
  1840. gen_server:abcast(Ns -- [node()], gproc_bcast, {send, Key, Msg}),
  1841. Msg.
  1842. %% @spec (Context :: context()) -> key() | '$end_of_table'
  1843. %%
  1844. %% @doc Behaves as ets:first(Tab) for a given type of registration.
  1845. %%
  1846. %% See [http://www.erlang.org/doc/man/ets.html#first-1].
  1847. %% The registry behaves as an ordered_set table.
  1848. %% @end
  1849. %%
  1850. first(Context) ->
  1851. {S, T} = get_s_t(Context),
  1852. {HeadPat,_} = headpat({S, T}, '_', '_', '_'),
  1853. case ets:select(?TAB, [{HeadPat,[],[{element,1,'$_'}]}], 1) of
  1854. {[First], _} ->
  1855. First;
  1856. _ ->
  1857. '$end_of_table'
  1858. end.
  1859. %% @spec (Context :: context()) -> key() | '$end_of_table'
  1860. %%
  1861. %% @doc Behaves as ets:last(Tab) for a given type of registration.
  1862. %%
  1863. %% See [http://www.erlang.org/doc/man/ets.html#last-1].
  1864. %% The registry behaves as an ordered_set table.
  1865. %% @end
  1866. %%
  1867. last(Context) ->
  1868. {S, T} = get_s_t(Context),
  1869. S1 = if S == '_'; S == l -> m; % 'm' comes after 'l'
  1870. S == g -> h % 'h' comes between 'g' & 'l'
  1871. end,
  1872. Beyond = {{T,S1,[]},[]},
  1873. step(ets:prev(?TAB, Beyond), S, T).
  1874. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  1875. %%
  1876. %% @doc Behaves as ets:next(Tab,Key) for a given type of registration.
  1877. %%
  1878. %% See [http://www.erlang.org/doc/man/ets.html#next-2].
  1879. %% The registry behaves as an ordered_set table.
  1880. %% @end
  1881. %%
  1882. next(Context, K) ->
  1883. {S,T} = get_s_t(Context),
  1884. {Prev,Unwrap} =
  1885. case K of
  1886. {{_,_,_},_} ->
  1887. {K, false};
  1888. {_,_,_} ->
  1889. {{K,[]}, true} % [] is higher than pid(), shared, p, c...
  1890. end,
  1891. unwrap(Unwrap, step(ets:next(?TAB,Prev), S, T)).
  1892. unwrap(true, {{_,_,_} = R,_}) ->
  1893. R;
  1894. unwrap(_, R) ->
  1895. R.
  1896. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  1897. %%
  1898. %% @doc Behaves as ets:prev(Tab,Key) for a given type of registration.
  1899. %%
  1900. %% See [http://www.erlang.org/doc/man/ets.html#prev-2].
  1901. %% The registry behaves as an ordered_set table.
  1902. %% @end
  1903. %%
  1904. prev(Context, K) ->
  1905. {S, T} = get_s_t(Context),
  1906. {Prev, Unwrap} =
  1907. case K of
  1908. {{_,_,_},_} -> {K, false};
  1909. {_,_,_} ->
  1910. {{K,1}, true}
  1911. end,
  1912. unwrap(Unwrap, step(ets:prev(?TAB, Prev), S, T)).
  1913. step(Key, '_', '_') ->
  1914. case Key of
  1915. {{_,_,_},_} -> Key;
  1916. _ -> '$end_of_table'
  1917. end;
  1918. step(Key, '_', T) ->
  1919. case Key of
  1920. {{T,_,_},_} -> Key;
  1921. _ -> '$end_of_table'
  1922. end;
  1923. step(Key, S, '_') ->
  1924. case Key of
  1925. {{_, S, _}, _} -> Key;
  1926. _ -> '$end_of_table'
  1927. end;
  1928. step(Key, S, T) ->
  1929. case Key of
  1930. {{T, S, _}, _} -> Key;
  1931. _ -> '$end_of_table'
  1932. end.
  1933. %% @spec (Pid::pid()) -> ProcessInfo
  1934. %% ProcessInfo = [{gproc, [{Key,Value}]} | ProcessInfo]
  1935. %%
  1936. %% @doc Similar to `process_info(Pid)' but with additional gproc info.
  1937. %%
  1938. %% Returns the same information as process_info(Pid), but with the
  1939. %% addition of a `gproc' information item, containing the `{Key,Value}'
  1940. %% pairs registered to the process.
  1941. %% @end
  1942. info(Pid) when is_pid(Pid) ->
  1943. Items = [?MODULE | [ I || {I,_} <- process_info(self())]],
  1944. [info(Pid,I) || I <- Items].
  1945. %% @spec (Pid::pid(), Item::atom()) -> {Item, Info}
  1946. %%
  1947. %% @doc Similar to process_info(Pid, Item), but with additional gproc info.
  1948. %%
  1949. %% For `Item = gproc', this function returns a list of `{Key, Value}' pairs
  1950. %% registered to the process Pid. For other values of Item, it returns the
  1951. %% same as [http://www.erlang.org/doc/man/erlang.html#process_info-2].
  1952. %% @end
  1953. info(Pid, gproc) ->
  1954. gproc_info(Pid, '_');
  1955. info(Pid, {gproc, Pat}) ->
  1956. gproc_info(Pid, Pat);
  1957. info(Pid, current_function) ->
  1958. {_, T} = process_info(Pid, backtrace),
  1959. info_cur_f(T, process_info(Pid, current_function));
  1960. info(Pid, I) ->
  1961. process_info(Pid, I).
  1962. %% We don't want to return the internal gproc:info() function as current
  1963. %% function, so we grab the 'backtrace' and extract the call stack from it,
  1964. %% filtering out the functions gproc:info/_ and gproc:'-info/1-lc...' entries.
  1965. %%
  1966. %% This is really an indication that wrapping the process_info() BIF was a
  1967. %% bad idea to begin with... :P
  1968. %%
  1969. info_cur_f(T, Default) ->
  1970. {match, Matches} = re:run(T,<<"\\(([^\\)]+):(.+)/([0-9]+)">>,
  1971. [global,{capture,[1,2,3],list}]),
  1972. case lists:dropwhile(fun(["gproc","info",_]) -> true;
  1973. (["gproc","'-info/1-lc" ++ _, _]) -> true;
  1974. (_) -> false
  1975. end, Matches) of
  1976. [] ->
  1977. Default;
  1978. [[M,F,A]|_] ->
  1979. {current_function,
  1980. {to_atom(M), to_atom(F), list_to_integer(A)}}
  1981. end.
  1982. to_atom(S) ->
  1983. case erl_scan:string(S) of
  1984. {ok, [{atom,_,A}|_],_} ->
  1985. A;
  1986. _ ->
  1987. list_to_atom(S)
  1988. end.
  1989. gproc_info(Pid, Pat) ->
  1990. Keys = ets:select(?TAB, [{ {{Pid,Pat}, '_'}, [], [{element,2,
  1991. {element,1,'$_'}}] }]),
  1992. {?MODULE, lists:zf(
  1993. fun(K) ->
  1994. try V = get_value(K, Pid),
  1995. {true, {K,V}}
  1996. catch
  1997. error:_ ->
  1998. false
  1999. end
  2000. end, Keys)}.
  2001. %% @spec () -> ok
  2002. %%
  2003. %% @doc Similar to the built-in shell command `i()' but inserts information
  2004. %% about names and properties registered in Gproc, where applicable.
  2005. %% @end
  2006. i() ->
  2007. gproc_info:i().
  2008. %%% ==========================================================
  2009. %% @hidden
  2010. handle_cast({monitor_me, Pid}, S) ->
  2011. erlang:monitor(process, Pid),
  2012. {noreply, S};
  2013. handle_cast({audit_process, Pid}, S) ->
  2014. case is_process_alive(Pid) of
  2015. false ->
  2016. process_is_down(Pid);
  2017. true ->
  2018. ignore
  2019. end,
  2020. {noreply, S};
  2021. handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
  2022. _ = case ets:lookup(?TAB, {Key,T}) of
  2023. [{_, Waiters}] ->
  2024. gproc_lib:remove_wait(Key, Pid, Ref, Waiters);
  2025. _ ->
  2026. ignore
  2027. end,
  2028. {noreply, S};
  2029. handle_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S) ->
  2030. _ = case ets:lookup(?TAB, {Key, T}) of
  2031. [{_, Waiters}] ->
  2032. gproc_lib:remove_wait(Key, Pid, all, Waiters);
  2033. [{_, OtherPid, _}] ->
  2034. gproc_lib:remove_monitors(Key, OtherPid, Pid);
  2035. _ ->
  2036. ok
  2037. end,
  2038. {noreply, S}.
  2039. %% @hidden
  2040. handle_call({reg, {_T,l,_} = Key, Val, Attrs, Op}, {Pid,_}, S) ->
  2041. handle_reg_call(Key, Pid, Val, Attrs, Op, S);
  2042. handle_call({reg_other, {_T,l,_} = Key, Pid, Val, Attrs, Op}, _, S) ->
  2043. handle_reg_call(Key, Pid, Val, Attrs, Op, S);
  2044. handle_call({set_attributes, {_,l,_} = Key, Attrs}, {Pid,_}, S) ->
  2045. case gproc_lib:insert_attr(Key, Attrs, Pid, l) of
  2046. false -> {reply, badarg, S};
  2047. L when is_list(L) ->
  2048. {reply, true, S}
  2049. end;
  2050. handle_call({set_attributes_shared, {_,l,_} = Key, Attrs}, _, S) ->
  2051. case gproc_lib:insert_attr(Key, Attrs, shared, l) of
  2052. false ->
  2053. {reply, badarg, S};
  2054. L when is_list(L) ->
  2055. {reply, true, S}
  2056. end;
  2057. handle_call({reg_or_locate, {T,l,_} = Key, Val, P}, _, S) ->
  2058. Reg = fun() ->
  2059. Pid = if is_function(P, 0) ->
  2060. spawn(P);
  2061. is_pid(P) ->
  2062. P
  2063. end,
  2064. true = gproc_lib:insert_reg(Key, Val, Pid, l),
  2065. _ = gproc_lib:ensure_monitor(Pid, l),
  2066. {reply, {Pid, Val}, S}
  2067. end,
  2068. case ets:lookup(?TAB, {Key, T}) of
  2069. [] ->
  2070. Reg();
  2071. [{_, _Waiters}] ->
  2072. Reg();
  2073. [{_, OtherPid, OtherValue}] ->
  2074. {reply, {OtherPid, OtherValue}, S}
  2075. end;
  2076. handle_call({monitor, {T,l,_} = Key, Pid, Type}, _From, S)
  2077. when T==n; T==a ->
  2078. Ref = make_ref(),
  2079. Lookup = ets:lookup(?TAB, {Key, T}),
  2080. IsRegged = is_regged(Lookup),
  2081. _ = case {IsRegged, Type} of
  2082. {false, info} ->
  2083. Pid ! {gproc, unreg, Ref, Key};
  2084. {false, follow} ->
  2085. Pid ! {gproc, unreg, Ref, Key},
  2086. _ = gproc_lib:ensure_monitor(Pid, l),
  2087. case Lookup of
  2088. [{K, Waiters}] ->
  2089. NewWaiters = [{Pid,Ref,follow}|Waiters],
  2090. ets:insert(?TAB, {K, NewWaiters}),
  2091. ets:insert_new(?TAB, {{Pid,Key}, []});
  2092. [] ->
  2093. ets:insert(?TAB, {{Key,T}, [{Pid,Ref,follow}]}),
  2094. ets:insert_new(?TAB, {{Pid,Key}, []})
  2095. end;
  2096. {false, standby} ->
  2097. Evt = {failover, Pid},
  2098. true = gproc_lib:insert_reg(Key, undefined, Pid, l, Evt),
  2099. Pid ! {gproc, Evt, Ref, Key},
  2100. _ = gproc_lib:ensure_monitor(Pid, l);
  2101. {true, _} ->
  2102. [{_, RegPid, _}] = Lookup,
  2103. _ = gproc_lib:ensure_monitor(Pid, l),
  2104. case ets:lookup(?TAB, {RegPid, Key}) of
  2105. [{K,r}] ->
  2106. ets:insert(?TAB, {K, [{monitor, [{Pid,Ref,Type}]}]}),
  2107. ets:insert_new(?TAB, {{Pid,Key}, []});
  2108. [{K, Opts}] ->
  2109. ets:insert(?TAB, {K, gproc_lib:add_monitor(
  2110. Opts, Pid, Ref, Type)}),
  2111. ets:insert_new(?TAB, {{Pid,Key}, []})
  2112. end
  2113. end,
  2114. {reply, Ref, S};
  2115. handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S)
  2116. when T==n; T==a; T==rc ->
  2117. _ = case ets:lookup(?TAB, {Key, T}) of
  2118. [] ->
  2119. ok; % be nice
  2120. [{_, Waiters}] ->
  2121. case lists:filter(fun({P, R, _}) ->
  2122. P =/= Pid orelse R =/= Ref
  2123. end, Waiters) of
  2124. [] ->
  2125. ets:delete(?TAB, {Pid, Key}),
  2126. ets:delete(?TAB, {Key, T});
  2127. NewWaiters ->
  2128. case lists:keymember(Pid, 1, NewWaiters) of
  2129. true ->
  2130. ok;
  2131. false ->
  2132. ets:delete(?TAB, {Pid, Key})
  2133. end,
  2134. ets:insert(?TAB, {{Key, T}, Waiters})
  2135. end;
  2136. [{_, RegPid, _}] ->
  2137. case ets:lookup(?TAB, {RegPid, Key}) of
  2138. [{_K,r}] ->
  2139. ok; % be nice
  2140. [{K, Opts}] ->
  2141. Opts1 = gproc_lib:remove_monitor(Opts, Pid, Ref),
  2142. ets:insert(?TAB, {K, Opts1}),
  2143. case gproc_lib:does_pid_monitor(Pid, Opts) of
  2144. true ->
  2145. ok;
  2146. false ->
  2147. ets:delete(?TAB, {Pid, Key})
  2148. end
  2149. end
  2150. end,
  2151. {reply, ok, S};
  2152. handle_call({reg_shared, {_T,l,_} = Key, Val, Attrs, Op}, _From, S) ->
  2153. case try_insert_reg(Key, Val, shared) of
  2154. true ->
  2155. _ = if Attrs =/= [] ->
  2156. gproc_lib:insert_attr(Key, Attrs, shared, l);
  2157. true -> true
  2158. end,
  2159. {reply, true, S};
  2160. already_registered when Op == ensure ->
  2161. case gproc_lib:do_set_value(Key, Val, shared) of
  2162. true ->
  2163. {reply, updated, S};
  2164. false ->
  2165. {reply, badarg, S}
  2166. end;
  2167. _ ->
  2168. {reply, badarg, S}
  2169. end;
  2170. handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
  2171. handle_unreg_call(Key, Pid, S);
  2172. handle_call({unreg_other, {_,l,_} = Key, Pid}, _, S) ->
  2173. handle_unreg_call(Key, Pid, S);
  2174. handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
  2175. _ = case ets:lookup(?TAB, {shared, Key}) of
  2176. [{_, r}] ->
  2177. _ = gproc_lib:remove_reg(Key, shared, unreg, []);
  2178. [{_, Opts}] ->
  2179. _ = gproc_lib:remove_reg(Key, shared, unreg, Opts);
  2180. [] ->
  2181. %% don't crash if shared key already unregged.
  2182. ok
  2183. end,
  2184. {reply, true, S};
  2185. handle_call({await, {_,l,_} = Key, Pid}, From, S) ->
  2186. %% Passing the pid explicitly is needed when leader_call is used,
  2187. %% since the Pid given as From in the leader is the local gen_leader
  2188. %% instance on the calling node.
  2189. case gproc_lib:await(Key, Pid, From) of
  2190. noreply ->
  2191. {noreply, S};
  2192. {reply, Reply, _} ->
  2193. {reply, Reply, S}
  2194. end;
  2195. handle_call({mreg, T, l, L}, {Pid,_}, S) ->
  2196. try gproc_lib:insert_many(T, l, L, Pid) of
  2197. {true,_} -> {reply, true, S};
  2198. false -> {reply, badarg, S}
  2199. catch
  2200. error:_ -> {reply, badarg, S}
  2201. end;
  2202. handle_call({munreg, T, l, L}, {Pid,_}, S) ->
  2203. _ = gproc_lib:remove_many(T, l, L, Pid),
  2204. {reply, true, S};
  2205. handle_call({set, {_,l,_} = Key, Value}, {Pid,_}, S) ->
  2206. case gproc_lib:do_set_value(Key, Value, Pid) of
  2207. true ->
  2208. {reply, true, S};
  2209. false ->
  2210. {reply, badarg, S}
  2211. end;
  2212. handle_call({set_shared, {_,l,_} = Key, Value}, {_,_}, S) ->
  2213. case gproc_lib:do_set_value(Key, Value, shared) of
  2214. true ->
  2215. {reply, true, S};
  2216. false ->
  2217. {reply, badarg, S}
  2218. end;
  2219. handle_call({audit_process, Pid}, _, S) ->
  2220. _ = case is_process_alive(Pid) of
  2221. false ->
  2222. process_is_down(Pid);
  2223. true ->
  2224. ignore
  2225. end,
  2226. {reply, ok, S};
  2227. handle_call({give_away, Key, To}, {Pid,_}, S) ->
  2228. Reply = do_give_away(Key, To, Pid),
  2229. {reply, Reply, S};
  2230. handle_call(_, _, S) ->
  2231. {reply, badarg, S}.
  2232. %% @hidden
  2233. handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
  2234. _ = process_is_down(Pid),
  2235. {noreply, S};
  2236. handle_info(_, S) ->
  2237. {noreply, S}.
  2238. %% @hidden
  2239. code_change(_FromVsn, S, _Extra) ->
  2240. %% We have changed local monitor markers from {Pid} to {Pid,l}.
  2241. _ = case ets:select(?TAB, [{{'$1'},[],['$1']}]) of
  2242. [] ->
  2243. ok;
  2244. Pids ->
  2245. ets:insert(?TAB, [{P,l} || P <- Pids]),
  2246. ets:select_delete(?TAB, [{{'_'},[],[true]}])
  2247. end,
  2248. {ok, S}.
  2249. %% @hidden
  2250. terminate(_Reason, _S) ->
  2251. ok.
  2252. %% handle_call body common to reg and reg_other.
  2253. %%
  2254. handle_reg_call(Key, Pid, Val, Attrs, Op, S) ->
  2255. case try_insert_reg(Key, Val, Pid) of
  2256. true ->
  2257. _ = gproc_lib:ensure_monitor(Pid,l),
  2258. _ = set_attrs(Attrs, Key, Pid),
  2259. {reply, regged_new(Op), S};
  2260. already_registered when Op == ensure ->
  2261. case gproc_lib:do_set_value(Key, Val, Pid) of
  2262. true ->
  2263. _ = set_attrs(Attrs, Key, Pid),
  2264. {reply, updated, S};
  2265. false ->
  2266. %% actually pretty bad, if it ever happens
  2267. {reply, badarg, S}
  2268. end;
  2269. false ->
  2270. {reply, badarg, S}
  2271. end.
  2272. set_attrs([], _, _) ->
  2273. true;
  2274. set_attrs([_|_] = Attrs, Key, Pid) ->
  2275. gproc_lib:insert_attr(Key, Attrs, Pid, l).
  2276. handle_unreg_call(Key, Pid, S) ->
  2277. case ets:lookup(?TAB, {Pid,Key}) of
  2278. [{_, r}] ->
  2279. _ = gproc_lib:remove_reg(Key, Pid, unreg, []),
  2280. {reply, true, S};
  2281. [{_, Opts}] when is_list(Opts) ->
  2282. _ = gproc_lib:remove_reg(Key, Pid, unreg, Opts),
  2283. {reply, true, S};
  2284. [] ->
  2285. {reply, badarg, S}
  2286. end.
  2287. call(Req) ->
  2288. call(Req, l).
  2289. call(Req, l) ->
  2290. chk_reply(gen_server:call(?MODULE, Req));
  2291. call(Req, g) ->
  2292. chk_reply(gproc_dist:leader_call(Req)).
  2293. call(N, Req, l) ->
  2294. chk_reply(gen_server:call({?MODULE, N}, Req));
  2295. call(undefined, Req, g) ->
  2296. %% we always call the leader
  2297. chk_reply(gproc_dist:leader_call(Req)).
  2298. chk_reply(Reply) ->
  2299. case Reply of
  2300. badarg -> ?THROW_GPROC_ERROR(badarg);
  2301. _ -> Reply
  2302. end.
  2303. cast(Msg) ->
  2304. cast(Msg, l).
  2305. cast(Msg, l) ->
  2306. gen_server:cast(?MODULE, Msg);
  2307. cast(Msg, g) ->
  2308. gproc_dist:leader_cast(Msg).
  2309. cast(N, Msg, l) ->
  2310. gen_server:cast({?MODULE, N}, Msg).
  2311. try_insert_reg({T,l,_} = Key, Val, Pid) ->
  2312. case gproc_lib:insert_reg(Key, Val, Pid, l) of
  2313. false ->
  2314. case ets:lookup(?TAB, {Key,T}) of
  2315. %% In this particular case, the lookup cannot result in
  2316. %% [{_, Waiters}], since the insert_reg/4 function would
  2317. %% have succeeded then.
  2318. [{_, Pid, _}] ->
  2319. already_registered;
  2320. [{_, OtherPid, _}] ->
  2321. case is_process_alive(OtherPid) of
  2322. true ->
  2323. false;
  2324. false ->
  2325. process_is_down(OtherPid), % may result in failover
  2326. try_insert_reg(Key, Val, Pid)
  2327. end;
  2328. [] ->
  2329. false
  2330. end;
  2331. true ->
  2332. true
  2333. end.
  2334. %% try_insert_shared({c,l,_} = Key, Val) ->
  2335. %% ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
  2336. %% try_insert_shared({a,l,_} = Key, Val) ->
  2337. %% ets:insert_new(?TAB, [{{Key, a}, shared, Val}, {{shared, Key}, []}]).
  2338. -spec audit_process(pid()) -> ok.
  2339. audit_process(Pid) when is_pid(Pid) ->
  2340. ok = gen_server:call(gproc, {audit_process, Pid}, infinity).
  2341. nb_audit_process(Pid) when is_pid(Pid) ->
  2342. ok = gen_server:cast(gproc, {audit_process, Pid}).
  2343. -spec process_is_down(pid()) -> ok.
  2344. process_is_down(Pid) when is_pid(Pid) ->
  2345. %% delete the monitor marker
  2346. %% io:fwrite(user, "process_is_down(~p) - ~p~n", [Pid,ets:tab2list(?TAB)]),
  2347. Marker = {Pid,l},
  2348. case ets:member(?TAB, Marker) of
  2349. false ->
  2350. ok;
  2351. true ->
  2352. Revs = ets:select(?TAB, [{{{Pid,'$1'}, '$2'},
  2353. [{'==',{element,2,'$1'},l}],
  2354. [{{'$1','$2'}}]}]),
  2355. lists:foreach(
  2356. fun({{n,l,_}=K, R}) ->
  2357. Key = {K,n},
  2358. case ets:lookup(?TAB, Key) of
  2359. [{_, Pid, V}] ->
  2360. ets:delete(?TAB, Key),
  2361. opt_notify(R, K, Pid, V);
  2362. [{_, Waiters}] ->
  2363. case [W || W <- Waiters,
  2364. element(1,W) =/= Pid] of
  2365. [] ->
  2366. ets:delete(?TAB, Key);
  2367. Waiters1 ->
  2368. ets:insert(?TAB, {Key, Waiters1})
  2369. end;
  2370. [{_, OtherPid, _}] when Pid =/= OtherPid ->
  2371. case ets:lookup(?TAB, {OtherPid, K}) of
  2372. [{RK, Opts}] when is_list(Opts) ->
  2373. Opts1 = gproc_lib:remove_monitor_pid(
  2374. Opts, Pid),
  2375. ets:insert(?TAB, {RK, Opts1});
  2376. _ ->
  2377. true
  2378. end;
  2379. [] ->
  2380. true
  2381. end;
  2382. ({{c,l,C} = K, _}) ->
  2383. Key = {K, Pid},
  2384. [{_, _, Value}] = ets:lookup(?TAB, Key),
  2385. ets:delete(?TAB, Key),
  2386. gproc_lib:update_aggr_counter(l, C, -Value);
  2387. ({{r,l,Rsrc} = K, _}) ->
  2388. Key = {K, Pid},
  2389. ets:delete(?TAB, Key),
  2390. gproc_lib:decrement_resource_count(l, Rsrc);
  2391. ({{rc,l,_} = K, R}) ->
  2392. remove_aggregate(rc, K, R, Pid);
  2393. ({{a,l,_} = K, R}) ->
  2394. remove_aggregate(a, K, R, Pid);
  2395. ({{p,_,_} = K, _}) ->
  2396. ets:delete(?TAB, {K, Pid})
  2397. end, Revs),
  2398. ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
  2399. ets:delete(?TAB, Marker),
  2400. ok
  2401. end.
  2402. remove_aggregate(T, K, R, Pid) ->
  2403. case ets:lookup(?TAB, {K,T}) of
  2404. [{_, Pid, V}] ->
  2405. ets:delete(?TAB, {K,T}),
  2406. opt_notify(R, K, Pid, V);
  2407. [{_, OtherPid, _}] when Pid =/= OtherPid ->
  2408. case ets:lookup(?TAB, {OtherPid, K}) of
  2409. [{RK, Opts}] when is_list(Opts) ->
  2410. Opts1 = gproc_lib:remove_monitor_pid(
  2411. Opts, Pid),
  2412. ets:insert(?TAB, {RK, Opts1});
  2413. _ ->
  2414. true
  2415. end;
  2416. [] ->
  2417. opt_notify(R, K, Pid, undefined)
  2418. end.
  2419. opt_notify(r, _, _, _) ->
  2420. ok;
  2421. opt_notify(Opts, {T,_,_} = Key, Pid, Value) ->
  2422. case gproc_lib:standbys(Opts) of
  2423. [] ->
  2424. keep_followers(Opts, Key),
  2425. gproc_lib:notify(unreg, Key, Opts);
  2426. SBs ->
  2427. case pick_standby(SBs) of
  2428. false ->
  2429. keep_followers(Opts, Key),
  2430. gproc_lib:notify(unreg, Key, Opts),
  2431. ok;
  2432. {ToPid, Ref} ->
  2433. ets:insert(?TAB, [{{Key,T}, ToPid, Value},
  2434. {{ToPid, Key},
  2435. gproc_lib:remove_monitor(
  2436. Opts, ToPid, Ref)}]),
  2437. _ = gproc_lib:remove_reverse_mapping(
  2438. {failover,ToPid}, Pid, Key),
  2439. _ = gproc_lib:ensure_monitor(ToPid, l),
  2440. ok
  2441. end
  2442. end.
  2443. keep_followers(Opts, {T,_,_} = Key) ->
  2444. case gproc_lib:followers(Opts) of
  2445. [] ->
  2446. ok;
  2447. [_|_] = F ->
  2448. ets:insert(?TAB, {{Key,T}, F})
  2449. end.
  2450. pick_standby([{Pid, Ref, standby}|T]) when node(Pid) =:= node() ->
  2451. case is_process_alive(Pid) of
  2452. true ->
  2453. {Pid, Ref};
  2454. false ->
  2455. pick_standby(T)
  2456. end;
  2457. pick_standby([_|T]) ->
  2458. pick_standby(T);
  2459. pick_standby([]) ->
  2460. false.
  2461. do_give_away({T,l,_} = K, To, Pid) when T==n; T==a; T==rc ->
  2462. Key = {K, T},
  2463. case ets:lookup(?TAB, Key) of
  2464. [{_, Pid, Value}] ->
  2465. %% Pid owns the reg; allowed to give_away
  2466. case pid_to_give_away_to(To) of
  2467. Pid ->
  2468. %% Give away to ourselves? Why not? We'll allow it,
  2469. %% but nothing needs to be done.
  2470. Pid;
  2471. ToPid when is_pid(ToPid) ->
  2472. ets:insert(?TAB, [{Key, ToPid, Value},
  2473. {{ToPid, K}, []}]),
  2474. _ = gproc_lib:remove_reverse_mapping({migrated,ToPid}, Pid, K),
  2475. _ = gproc_lib:ensure_monitor(ToPid, l),
  2476. ToPid;
  2477. undefined ->
  2478. _ = gproc_lib:remove_reg(K, Pid, unreg),
  2479. undefined
  2480. end;
  2481. _ ->
  2482. badarg
  2483. end;
  2484. do_give_away({T,l,_} = K, To, Pid) when T==c; T==p; T==r ->
  2485. Key = {K, Pid},
  2486. case ets:lookup(?TAB, Key) of
  2487. [{_, Pid, Value}] ->
  2488. case pid_to_give_away_to(To) of
  2489. ToPid when is_pid(ToPid) ->
  2490. ToKey = {K, ToPid},
  2491. case ets:member(?TAB, ToKey) of
  2492. true ->
  2493. badarg;
  2494. false ->
  2495. ets:insert(?TAB, [{ToKey, ToPid, Value},
  2496. {{ToPid, K}, []}]),
  2497. ets:delete(?TAB, {Pid, K}),
  2498. ets:delete(?TAB, Key),
  2499. _ = gproc_lib:ensure_monitor(ToPid, l),
  2500. ToPid
  2501. end;
  2502. undefined ->
  2503. _ = gproc_lib:remove_reg(K, Pid, {migrated, undefined}),
  2504. undefined
  2505. end;
  2506. _ ->
  2507. badarg
  2508. end.
  2509. pid_to_give_away_to(P) when is_pid(P), node(P) == node() ->
  2510. P;
  2511. pid_to_give_away_to({T,l,_} = Key) when T==n; T==a; T==rc ->
  2512. case ets:lookup(?TAB, {Key, T}) of
  2513. [{_, Pid, _}] ->
  2514. Pid;
  2515. _ ->
  2516. undefined
  2517. end.
  2518. create_tabs() ->
  2519. Opts = gproc_lib:valid_opts(ets_options, [{write_concurrency,true},
  2520. {read_concurrency, true}]),
  2521. case ets:info(?TAB, name) of
  2522. undefined ->
  2523. ets:new(?TAB, [ordered_set, public, named_table | Opts]);
  2524. _ ->
  2525. ok
  2526. end.
  2527. %% @hidden
  2528. init([]) ->
  2529. set_monitors(),
  2530. {ok, #state{}}.
  2531. set_monitors() ->
  2532. set_monitors(ets:select(?TAB, [{{{'$1',l}},[],['$1']}], 100)).
  2533. set_monitors('$end_of_table') ->
  2534. ok;
  2535. set_monitors({Pids, Cont}) ->
  2536. _ = [erlang:monitor(process,Pid) || Pid <- Pids],
  2537. set_monitors(ets:select(Cont)).
  2538. monitor_me() ->
  2539. case ets:insert_new(?TAB, {{self(),l}}) of
  2540. false -> true;
  2541. true ->
  2542. cast({monitor_me,self()}),
  2543. true
  2544. end.
  2545. pattern([{'_', Gs, As}], T) ->
  2546. ?l,
  2547. {HeadPat, Vs} = headpat(T, '$1', '$2', '$3'),
  2548. [{HeadPat, rewrite(Gs,Vs), rewrite(As,Vs)}];
  2549. pattern([{{A,B,C},Gs,As}], Scope) ->
  2550. ?l,
  2551. {HeadPat, Vars} = headpat(Scope, A,B,C),
  2552. [{HeadPat, rewrite(Gs,Vars), rewrite(As,Vars)}];
  2553. pattern([{Head, Gs, As}], Scope) ->
  2554. ?l,
  2555. {S, T} = get_s_t(Scope),
  2556. case is_var(Head) of
  2557. {true,_N} ->
  2558. HeadPat = {{{T,S,'_'},'_'},'_','_'},
  2559. Vs = [{Head, obj_prod()}],
  2560. %% the headpat function should somehow verify that Head is
  2561. %% consistent with Scope (or should we add a guard?)
  2562. [{HeadPat, rewrite(Gs, Vs), rewrite(As, Vs)}];
  2563. false ->
  2564. erlang:error(badarg)
  2565. end.
  2566. %% This is the expression to use in guards and the RHS to address the whole
  2567. %% object, in its logical representation.
  2568. obj_prod() ->
  2569. {{ {element,1,{element,1,'$_'}},
  2570. {element,2,'$_'},
  2571. {element,3,'$_'} }}.
  2572. obj_prod_l() ->
  2573. [ {element,1,{element,1,'$_'}},
  2574. {element,2,'$_'},
  2575. {element,3,'$_'} ].
  2576. headpat({S, T}, V1,V2,V3) ->
  2577. headpat(type(T), scope(S), V1,V2,V3);
  2578. headpat(T, V1, V2, V3) when is_atom(T) ->
  2579. headpat(type(T), l, V1, V2, V3);
  2580. headpat(_, _, _, _) -> erlang:error(badarg).
  2581. headpat(T, C, V1,V2,V3) ->
  2582. Rf = fun(Pos) ->
  2583. {element,Pos,{element,1,{element,1,'$_'}}}
  2584. end,
  2585. K2 = if T==n orelse T==a orelse T==rc -> T;
  2586. true -> '_'
  2587. end,
  2588. {Kp,Vars} = case V1 of
  2589. {Vt,Vc,Vn} ->
  2590. ?l,
  2591. {T1,Vs1} = subst(T,Vt,fun() -> Rf(1) end,[]),
  2592. {C1,Vs2} = subst(C,Vc,fun() -> Rf(2) end,Vs1),
  2593. {{T1,C1,Vn}, Vs2};
  2594. '_' ->
  2595. ?l,
  2596. {{T,C,'_'}, []};
  2597. _ ->
  2598. ?l,
  2599. case is_var(V1) of
  2600. {true,_} ->
  2601. {{T,C,V1}, [{V1, {element,1,
  2602. {element,1,'$_'}}}]};
  2603. false ->
  2604. erlang:error(badarg)
  2605. end
  2606. end,
  2607. {{{Kp,K2},V2,V3}, Vars}.
  2608. %% l(L) -> L.
  2609. subst(X, '_', _F, Vs) ->
  2610. {X, Vs};
  2611. subst(X, V, F, Vs) ->
  2612. case is_var(V) of
  2613. {true,_} ->
  2614. {X, [{V,F()}|Vs]};
  2615. false ->
  2616. {V, Vs}
  2617. end.
  2618. scope('_') -> '_';
  2619. scope(all) -> '_';
  2620. scope(global) -> g;
  2621. scope(local) -> l;
  2622. scope(S) when S==l; S==g -> S.
  2623. type('_') -> '_';
  2624. type(all) -> '_';
  2625. type(T) when T==n; T==p; T==c; T==a -> T;
  2626. type(names) -> n;
  2627. type(props) -> p;
  2628. type(resources) -> r;
  2629. type(counters) -> c;
  2630. type(aggr_counters) -> a;
  2631. type(resource_counters) -> rc.
  2632. rev_keypat(Context) ->
  2633. {S,T} = get_s_t(Context),
  2634. {T,S,'_'}.
  2635. get_s_t({S,T}) -> {scope(S), type(T)};
  2636. get_s_t(T) when is_atom(T) ->
  2637. {scope(all), type(T)}.
  2638. is_var('$1') -> {true,1};
  2639. is_var('$2') -> {true,2};
  2640. is_var('$3') -> {true,3};
  2641. is_var('$4') -> {true,4};
  2642. is_var('$5') -> {true,5};
  2643. is_var('$6') -> {true,6};
  2644. is_var('$7') -> {true,7};
  2645. is_var('$8') -> {true,8};
  2646. is_var('$9') -> {true,9};
  2647. is_var(X) when is_atom(X) ->
  2648. case atom_to_list(X) of
  2649. "\$" ++ Tl ->
  2650. try N = list_to_integer(Tl),
  2651. {true,N}
  2652. catch
  2653. error:_ ->
  2654. false
  2655. end;
  2656. _ ->
  2657. false
  2658. end;
  2659. is_var(_) -> false.
  2660. rewrite(Gs, R) ->
  2661. [rewrite1(G, R) || G <- Gs].
  2662. rewrite1('$_', _) ->
  2663. obj_prod();
  2664. rewrite1('$$', _) ->
  2665. obj_prod_l();
  2666. rewrite1(Guard, R) when is_tuple(Guard) ->
  2667. list_to_tuple([rewrite1(G, R) || G <- tuple_to_list(Guard)]);
  2668. rewrite1(Exprs, R) when is_list(Exprs) ->
  2669. [rewrite1(E, R) || E <- Exprs];
  2670. rewrite1(V, R) when is_atom(V) ->
  2671. case is_var(V) of
  2672. {true,_N} ->
  2673. case lists:keysearch(V, 1, R) of
  2674. {value, {_, V1}} ->
  2675. V1;
  2676. false ->
  2677. V
  2678. end;
  2679. false ->
  2680. V
  2681. end;
  2682. rewrite1(Expr, _) ->
  2683. Expr.
  2684. %% @spec () -> any()
  2685. %%
  2686. %% @doc
  2687. %% @equiv table({all, all})
  2688. %% @end
  2689. table() ->
  2690. table({all, all}).
  2691. %% @spec (Context::context()) -> any()
  2692. %%
  2693. %% @doc
  2694. %% @equiv table(Context, [])
  2695. %% @end
  2696. %%
  2697. table(Context) ->
  2698. table(Context, []).
  2699. %% @spec (Context::context(), Opts) -> any()
  2700. %%
  2701. %% @doc QLC table generator for the gproc registry.
  2702. %% Context specifies which subset of the registry should be queried.
  2703. %% See [http://www.erlang.org/doc/man/qlc.html].
  2704. %%
  2705. %% NOTE: By default, the gproc table generator will not filter out entries
  2706. %% belonging to processes that have just died, but which have yet to be cleared
  2707. %% out of the registry. Use the option `check_pids' (or `{check_pids, true}')
  2708. %% if you want to filter out dead entries already in the query. There will be
  2709. %% some overhead associated with doing so, and given that the process monitoring
  2710. %% is asynchronous, there can never be any guarantee that there are no dead
  2711. %% entries in the list by the time your program processes it.
  2712. %%
  2713. %% @end
  2714. table(Context, Opts) ->
  2715. Ctxt = {_, Type} = get_s_t(Context),
  2716. [Traverse, NObjs] = [proplists:get_value(K,Opts,Def) ||
  2717. {K,Def} <- [{traverse,select}, {n_objects,100}]],
  2718. CheckPids = proplists:get_bool(check_pids, Opts),
  2719. TF = case Traverse of
  2720. first_next ->
  2721. fun() -> qlc_next(Ctxt, first(Ctxt), CheckPids) end;
  2722. last_prev -> fun() -> qlc_prev(Ctxt, last(Ctxt), CheckPids) end;
  2723. select ->
  2724. fun(MS) -> qlc_select(
  2725. CheckPids,
  2726. select(Ctxt, wrap_qlc_ms_prod(CheckPids, MS),
  2727. NObjs))
  2728. end;
  2729. {select,MS} ->
  2730. fun() -> qlc_select(
  2731. CheckPids,
  2732. select(Ctxt, wrap_qlc_ms_prod(CheckPids, MS),
  2733. NObjs))
  2734. end;
  2735. _ ->
  2736. erlang:error(badarg, [Ctxt,Opts])
  2737. end,
  2738. InfoFun = fun(indices) -> [2];
  2739. (is_unique_objects) -> is_unique(Type);
  2740. (keypos) -> 1;
  2741. (is_sorted_key) -> true;
  2742. (num_of_objects) ->
  2743. %% this is just a guesstimate.
  2744. trunc(ets:info(?TAB,size) / 2.5)
  2745. end,
  2746. LookupFun =
  2747. case Traverse of
  2748. {select, _MS} -> undefined;
  2749. _ -> fun(Pos, Ks) -> qlc_lookup(Ctxt, Pos, Ks, CheckPids) end
  2750. end,
  2751. qlc:table(TF, [{info_fun, InfoFun},
  2752. {lookup_fun, LookupFun}] ++ [{K,V} || {K,V} <- Opts,
  2753. K =/= traverse,
  2754. K =/= n_objects]).
  2755. wrap_qlc_ms_prod(false, Pats) ->
  2756. Pats;
  2757. wrap_qlc_ms_prod(true, Pats) ->
  2758. [ wrap_qlc_ms_prod_(P) || P <- Pats ].
  2759. wrap_qlc_ms_prod_({H, Gs, [P]}) ->
  2760. {H, Gs, [{{ {element, 2, '$_'}, P }}]}.
  2761. qlc_lookup(_Scope, 1, Keys, Check) ->
  2762. lists:flatmap(
  2763. fun(Key) ->
  2764. remove_dead(
  2765. Check,
  2766. ets:select(?TAB, [{ {{Key,'_'},'_','_'}, [],
  2767. [{{ {element,1,{element,1,'$_'}},
  2768. {element,2,'$_'},
  2769. {element,3,'$_'} }}] }]))
  2770. end, Keys);
  2771. qlc_lookup(Scope, 2, Pids, Check) ->
  2772. lists:flatmap(fun(Pid) ->
  2773. qlc_lookup_pid(Pid, Scope, Check)
  2774. end, Pids).
  2775. remove_dead(false, Objs) ->
  2776. Objs;
  2777. remove_dead(true, Objs) ->
  2778. [ Reg || {_, Pid, _} = Reg <- Objs,
  2779. not ?PID_IS_DEAD(Pid) ].
  2780. %% While it may seem obsessive not to do the sensible pid liveness check here
  2781. %% every time, we make it optional for consistency; this way, we can devise
  2782. %% a test case that verifies the difference between having the option and not.
  2783. qlc_lookup_pid(Pid, Scope, Check) ->
  2784. case Check andalso ?PID_IS_DEAD(Pid) of
  2785. true ->
  2786. [];
  2787. false ->
  2788. Found =
  2789. ets:select(?TAB, [{{{Pid, rev_keypat(Scope)}, '_'},
  2790. [], ['$_']}]),
  2791. lists:flatmap(
  2792. fun({{_,{T,_,_}=K}, _}) ->
  2793. K2 = if T==n orelse T==a -> T;
  2794. true -> Pid
  2795. end,
  2796. case ets:lookup(?TAB, {K,K2}) of
  2797. [{{Key,_},_,Value}] ->
  2798. [{Key, Pid, Value}];
  2799. [] ->
  2800. []
  2801. end
  2802. end, Found)
  2803. end.
  2804. qlc_next(_, '$end_of_table', _) -> [];
  2805. qlc_next(Scope, K, Check) ->
  2806. case ets:lookup(?TAB, K) of
  2807. [{{Key,_}, Pid, V}] ->
  2808. case Check andalso ?PID_IS_DEAD(Pid) of
  2809. true ->
  2810. qlc_next(Scope, next(Scope, K), Check);
  2811. false ->
  2812. [{Key,Pid,V}] ++ fun() ->
  2813. qlc_next(Scope, next(Scope, K),
  2814. Check)
  2815. end
  2816. end;
  2817. [] ->
  2818. qlc_next(Scope, next(Scope, K), Check)
  2819. end.
  2820. qlc_prev(_, '$end_of_table', _) -> [];
  2821. qlc_prev(Scope, K, Check) ->
  2822. case ets:lookup(?TAB, K) of
  2823. [{{Key,_},Pid,V}] ->
  2824. case Check andalso ?PID_IS_DEAD(Pid) of
  2825. true ->
  2826. qlc_prev(Scope, prev(Scope, K), Check);
  2827. false ->
  2828. [{Key,Pid,V}] ++ fun() ->
  2829. qlc_prev(Scope, prev(Scope, K),
  2830. Check)
  2831. end
  2832. end;
  2833. [] ->
  2834. qlc_prev(Scope, prev(Scope, K), Check)
  2835. end.
  2836. qlc_select(_, '$end_of_table') ->
  2837. [];
  2838. qlc_select(true, {Objects, Cont}) ->
  2839. case [O || {Pid,O} <- Objects,
  2840. not ?PID_IS_DEAD(Pid)] of
  2841. [] ->
  2842. %% re-run search
  2843. qlc_select(true, ets:select(Cont));
  2844. Found ->
  2845. Found ++ fun() -> qlc_select(true, ets:select(Cont)) end
  2846. end;
  2847. qlc_select(false, {Objects, Cont}) ->
  2848. Objects ++ fun() -> qlc_select(false, ets:select(Cont)) end.
  2849. is_unique(n) -> true;
  2850. is_unique(a) -> true;
  2851. is_unique(_) -> false.
  2852. is_regged([{_, _, _}]) ->
  2853. true;
  2854. is_regged(_) ->
  2855. false.