gproc.erl 68 KB


  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Extended process registry
  19. %% This module implements an extended process registry
  20. %%
  21. %% For a detailed description, see
  22. %% <a href="erlang07-wiger.pdf">erlang07-wiger.pdf</a>.
  23. %%
  24. %% <h2>Tuning Gproc performance</h2>
  25. %%
  26. %% Gproc relies on a central server and an ordered-set ets table.
  27. %% Effort is made to perform as much work as possible in the client without
  28. %% sacrificing consistency. A few things can be tuned by setting the following
  29. %% application environment variables in the top application of `gproc'
  30. %% (usually `gproc'):
  31. %%
  32. %% * `{ets_options, list()}' - Currently, the options `{write_concurrency, F}'
  33. %% and `{read_concurrency, F}' are allowed. The default is
  34. %% `[{write_concurrency, true}, {read_concurrency, true}]'
  35. %% * `{server_options, list()}' - These will be passed as spawn options when
  36. %% starting the `gproc' and `gproc_dist' servers. Default is `[]'. It is
  37. %% likely that `{priority, high | max}' and/or increasing `min_heap_size'
  38. %% will improve performance.
  39. %%
  40. %% @end
  41. %% @type type() = n | p | c | a. n = name; p = property; c = counter;
  42. %% a = aggregate_counter
  43. %% @type scope() = l | g. l = local registration; g = global registration
  44. %%
  45. %% @type reg_id() = {type(), scope(), any()}.
  46. %% @type unique_id() = {n | a, scope(), any()}.
  47. %%
  48. %% @type sel_scope() = scope | all | global | local.
  49. %% @type sel_type() = type() | names | props | counters | aggr_counters.
  50. %% @type context() = {scope(), type()} | type(). {'all','all'} is the default
  51. %%
  52. %% @type headpat() = {keypat(),pidpat(),ValPat}.
  53. %% @type keypat() = {sel_type() | sel_var(),
  54. %% l | g | sel_var(),
  55. %% any()}.
  56. %% @type pidpat() = pid() | sel_var().
  57. %% @type sel_var() = DollarVar | '_'.
  58. %% @type sel_pattern() = [{headpat(), Guards, Prod}].
  59. %% @type key() = {type(), scope(), any()}.
  60. %%
  61. %% update_counter increment
  62. %% @type ctr_incr() = integer().
  63. %% @type ctr_thr() = integer().
  64. %% @type ctr_setval() = integer().
  65. %% @type ctr_update() = ctr_incr()
  66. %% | {ctr_incr(), ctr_thr(), ctr_setval()}.
  67. %% @type increment() = ctr_incr() | ctr_update() | [ctr_update()].
  68. -module(gproc).
  69. -behaviour(gen_server).
  70. -export([start_link/0,
  71. reg/1, reg/2, unreg/1,
  72. reg_shared/1, reg_shared/2, unreg_shared/1,
  73. mreg/3,
  74. munreg/3,
  75. set_value/2,
  76. get_value/1, get_value/2,
  77. where/1,
  78. await/1, await/2,
  79. nb_wait/1,
  80. cancel_wait/2,
  81. cancel_wait_or_monitor/1,
  82. monitor/1,
  83. demonitor/2,
  84. lookup_pid/1,
  85. lookup_pids/1,
  86. lookup_value/1,
  87. lookup_values/1,
  88. update_counter/2,
  89. update_counters/2,
  90. reset_counter/1,
  91. update_shared_counter/2,
  92. give_away/2,
  93. goodbye/0,
  94. send/2,
  95. bcast/2, bcast/3,
  96. info/1, info/2,
  97. i/0,
  98. select/1, select/2, select/3,
  99. select_count/1, select_count/2,
  100. first/1,
  101. next/2,
  102. prev/2,
  103. last/1,
  104. table/0, table/1, table/2]).
  105. %% Environment handling
  106. -export([get_env/3, get_env/4,
  107. get_set_env/3, get_set_env/4,
  108. set_env/5]).
  109. %% Convenience functions
  110. -export([add_local_name/1,
  111. add_global_name/1,
  112. add_local_property/2,
  113. add_global_property/2,
  114. add_local_counter/2,
  115. add_global_counter/2,
  116. add_local_aggr_counter/1,
  117. add_global_aggr_counter/1,
  118. add_shared_local_counter/2,
  119. lookup_local_name/1,
  120. lookup_global_name/1,
  121. lookup_local_properties/1,
  122. lookup_global_properties/1,
  123. lookup_local_counters/1,
  124. lookup_global_counters/1,
  125. lookup_local_aggr_counter/1,
  126. lookup_global_aggr_counter/1]).
  127. %% Callbacks for behaviour support
  128. -export([whereis_name/1,
  129. register_name/2,
  130. unregister_name/1]).
  131. -export([default/1]).
  132. %%% internal exports
  133. -export([init/1,
  134. handle_cast/2,
  135. handle_call/3,
  136. handle_info/2,
  137. code_change/3,
  138. terminate/2]).
  139. %% this shouldn't be necessary
  140. -export([audit_process/1]).
  141. -include("gproc_int.hrl").
  142. -include("gproc.hrl").
  143. -include_lib("eunit/include/eunit.hrl").
  144. -define(SERVER, ?MODULE).
  145. %%-define(l, l(?LINE)). % when activated, calls a traceable empty function
  146. -define(l, ignore).
  147. -define(CHK_DIST,
  148. case whereis(gproc_dist) of
  149. undefined ->
  150. ?THROW_GPROC_ERROR(local_only);
  151. _ ->
  152. ok
  153. end).
  154. -record(state, {}).
  155. %% @spec () -> {ok, pid()}
  156. %%
  157. %% @doc Starts the gproc server.
  158. %%
  159. %% This function is intended to be called from gproc_sup, as part of
  160. %% starting the gproc application.
  161. %% @end
  162. start_link() ->
  163. _ = create_tabs(),
  164. SpawnOpts = gproc_lib:valid_opts(server_options, []),
  165. gen_server:start_link({local, ?SERVER}, ?MODULE, [],
  166. [{spawn_opt, SpawnOpts}]).
  167. %% spec(Name::any()) -> true
  168. %%
  169. %% @doc Registers a local (unique) name. @equiv reg({n,l,Name})
  170. %% @end
  171. %%
  172. add_local_name(Name) -> ?CATCH_GPROC_ERROR(reg1({n,l,Name}, undefined), [Name]).
  173. %% spec(Name::any()) -> true
  174. %%
  175. %% @doc Registers a global (unique) name. @equiv reg({n,g,Name})
  176. %% @end
  177. %%
  178. add_global_name(Name) -> ?CATCH_GPROC_ERROR(reg1({n,g,Name}, undefined), [Name]).
  179. %% spec(Name::any(), Value::any()) -> true
  180. %%
  181. %% @doc Registers a local (non-unique) property. @equiv reg({p,l,Name},Value)
  182. %% @end
  183. %%
  184. add_local_property(Name , Value) ->
  185. ?CATCH_GPROC_ERROR(reg1({p,l,Name}, Value), [Name, Value]).
  186. %% spec(Name::any(), Value::any()) -> true
  187. %%
  188. %% @doc Registers a global (non-unique) property. @equiv reg({p,g,Name},Value)
  189. %% @end
  190. %%
  191. add_global_property(Name, Value) ->
  192. ?CATCH_GPROC_ERROR(reg1({p,g,Name}, Value), [Name, Value]).
  193. %% spec(Name::any(), Initial::integer()) -> true
  194. %%
  195. %% @doc Registers a local (non-unique) counter. @equiv reg({c,l,Name},Value)
  196. %% @end
  197. %%
  198. add_local_counter(Name, Initial) when is_integer(Initial) ->
  199. ?CATCH_GPROC_ERROR(reg1({c,l,Name}, Initial), [Name, Initial]).
  200. %% spec(Name::any(), Initial::integer()) -> true
  201. %%
  202. %% @doc Registers a local shared (unique) counter.
  203. %% @equiv reg_shared({c,l,Name},Value)
  204. %% @end
  205. %%
  206. add_shared_local_counter(Name, Initial) when is_integer(Initial) ->
  207. reg_shared({c,l,Name}, Initial).
  208. %% spec(Name::any(), Initial::integer()) -> true
  209. %%
  210. %% @doc Registers a global (non-unique) counter. @equiv reg({c,g,Name},Value)
  211. %% @end
  212. %%
  213. add_global_counter(Name, Initial) when is_integer(Initial) ->
  214. ?CATCH_GPROC_ERROR(reg1({c,g,Name}, Initial), [Name, Initial]).
  215. %% spec(Name::any()) -> true
  216. %%
  217. %% @doc Registers a local (unique) aggregated counter.
  218. %% @equiv reg({a,l,Name})
  219. %% @end
  220. %%
  221. add_local_aggr_counter(Name) -> ?CATCH_GPROC_ERROR(reg1({a,l,Name}), [Name]).
  222. %% spec(Name::any()) -> true
  223. %%
  224. %% @doc Registers a global (unique) aggregated counter.
  225. %% @equiv reg({a,g,Name})
  226. %% @end
  227. %%
  228. add_global_aggr_counter(Name) -> ?CATCH_GPROC_ERROR(reg1({a,g,Name}), [Name]).
  229. %% @spec (Name::any()) -> pid()
  230. %%
  231. %% @doc Lookup a local unique name. Fails if there is no such name.
  232. %% @equiv where({n,l,Name})
  233. %% @end
  234. %%
  235. lookup_local_name(Name) -> where({n,l,Name}).
  236. %% @spec (Name::any()) -> pid()
  237. %%
  238. %% @doc Lookup a global unique name. Fails if there is no such name.
  239. %% @equiv where({n,g,Name})
  240. %% @end
  241. %%
  242. lookup_global_name(Name) -> where({n,g,Name}).
  243. %% @spec (Name::any()) -> integer()
  244. %%
  245. %% @doc Lookup a local (unique) aggregated counter and returns its value.
  246. %% Fails if there is no such object.
  247. %% @equiv where({a,l,Name})
  248. %% @end
  249. %%
  250. lookup_local_aggr_counter(Name) -> lookup_value({a,l,Name}).
  251. %% @spec (Name::any()) -> integer()
  252. %%
  253. %% @doc Lookup a global (unique) aggregated counter and returns its value.
  254. %% Fails if there is no such object.
  255. %% @equiv where({a,g,Name})
  256. %% @end
  257. %%
  258. lookup_global_aggr_counter(Name) -> lookup_value({a,g,Name}).
  259. %% @spec (Property::any()) -> [{pid(), Value}]
  260. %%
  261. %% @doc Look up all local (non-unique) instances of a given Property.
  262. %% Returns a list of {Pid, Value} tuples for all matching objects.
  263. %% @equiv lookup_values({p, l, Property})
  264. %% @end
  265. %%
  266. lookup_local_properties(P) -> lookup_values({p,l,P}).
  267. %% @spec (Property::any()) -> [{pid(), Value}]
  268. %%
  269. %% @doc Look up all global (non-unique) instances of a given Property.
  270. %% Returns a list of {Pid, Value} tuples for all matching objects.
  271. %% @equiv lookup_values({p, g, Property})
  272. %% @end
  273. %%
  274. lookup_global_properties(P) -> lookup_values({p,g,P}).
  275. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  276. %%
  277. %% @doc Look up all local (non-unique) instances of a given Counter.
  278. %% Returns a list of {Pid, Value} tuples for all matching objects.
  279. %% @equiv lookup_values({c, l, Counter})
  280. %% @end
  281. %%
  282. lookup_local_counters(P) -> lookup_values({c,l,P}).
  283. %% @spec (Counter::any()) -> [{pid(), Value::integer()}]
  284. %%
  285. %% @doc Look up all global (non-unique) instances of a given Counter.
  286. %% Returns a list of {Pid, Value} tuples for all matching objects.
  287. %% @equiv lookup_values({c, g, Counter})
  288. %% @end
  289. %%
  290. lookup_global_counters(P) -> lookup_values({c,g,P}).
  291. %% @spec get_env(Scope::scope(), App::atom(), Key::atom()) -> term()
  292. %% @equiv get_env(Scope, App, Key, [app_env])
  293. get_env(Scope, App, Key) ->
  294. get_env(Scope, App, Key, [app_env]).
  295. %% @spec (Scope::scope(), App::atom(), Key::atom(), Strategy) -> term()
  296. %% Strategy = [Alternative]
  297. %% Alternative = app_env
  298. %% | os_env
  299. %% | inherit | {inherit, pid()} | {inherit, unique_id()}
  300. %% | init_arg
  301. %% | {mnesia, ActivityType, Oid, Pos}
  302. %% | {default, term()}
  303. %% | error
  304. %% @doc Read an environment value, potentially cached as a `gproc_env' property.
  305. %%
  306. %% This function first tries to read the value of a cached property,
  307. %% `{p, Scope, {gproc_env, App, Key}}'. If this fails, it will try the provided
  308. %% alternative strategy. `Strategy' is a list of alternatives, tried in order.
  309. %% Each alternative can be one of:
  310. %%
  311. %% * `app_env' - try `application:get_env(App, Key)'
  312. %% * `os_env' - try `os:getenv(ENV)', where `ENV' is `Key' converted into an
  313. %% uppercase string
  314. %% * `{os_env, ENV}' - try `os:getenv(ENV)'
  315. %% * `inherit' - inherit the cached value, if any, held by the parent process.
  316. %% * `{inherit, Pid}' - inherit the cached value, if any, held by `Pid'.
  317. %% * `{inherit, Id}' - inherit the cached value, if any, held by the process
  318. %% registered in `gproc' as `Id'.
  319. %% * `init_arg' - try `init:get_argument(Key)'; expects a single value, if any.
  320. %% * `{mnesia, ActivityType, Oid, Pos}' - try
  321. %% `mnesia:activity(ActivityType, fun() -> mnesia:read(Oid) end)'; retrieve
  322. %% the value in position `Pos' if object found.
  323. %% * `{default, Value}' - set a default value to return once alternatives have
  324. %% been exhausted; if not set, `undefined' will be returned.
  325. %% * `error' - raise an exception, `erlang:error(gproc_env, [App, Key, Scope])'.
  326. %%
  327. %% While any alternative can occur more than once, the only one that might make
  328. %% sense to use multiple times is `{default, Value}'.
  329. %%
  330. %% The return value will be one of:
  331. %%
  332. %% * The value of the first matching alternative, or `error' eception,
  333. %% whichever comes first
  334. %% * The last instance of `{default, Value}', or `undefined', if there is no
  335. %% matching alternative, default or `error' entry in the list.
  336. %%
  337. %% The `error' option can be used to assert that a value has been previously
  338. %% cached. Alternatively, it can be used to assert that a value is either cached
  339. %% or at least defined somewhere,
  340. %% e.g. `get_env(l, mnesia, dir, [app_env, error])'.
  341. %% @end
  342. get_env(Scope, App, Key, Strategy)
  343. when Scope==l, is_atom(App), is_atom(Key);
  344. Scope==g, is_atom(App), is_atom(Key) ->
  345. do_get_env(Scope, App, Key, Strategy, false).
  346. %% @spec get_set_env(Scope::scope(), App::atom(), Key::atom()) -> term()
  347. %% @equiv get_set_env(Scope, App, Key, [app_env])
  348. get_set_env(Scope, App, Key) ->
  349. get_set_env(Scope, App, Key, [app_env]).
  350. %% @spec get_set_env(Scope::scope(), App::atom(), Key::atom(), Strategy) ->
  351. %% Value
  352. %% @doc Fetch and cache an environment value, if not already cached.
  353. %%
  354. %% This function does the same thing as {@link get_env/4}, but also updates the
  355. %% cache. Note that the cache will be updated even if the result of the lookup
  356. %% is `undefined'.
  357. %%
  358. %% @see get_env/4.
  359. %% @end
  360. %%
  361. get_set_env(Scope, App, Key, Strategy)
  362. when Scope==l, is_atom(App), is_atom(Key);
  363. Scope==g, is_atom(App), is_atom(Key) ->
  364. do_get_env(Scope, App, Key, Strategy, true).
  365. do_get_env(Context, App, Key, Alternatives, Set) ->
  366. case lookup_env(Context, App, Key, self()) of
  367. undefined ->
  368. check_alternatives(Alternatives, Context, App, Key, undefined, Set);
  369. {ok, Value} ->
  370. Value
  371. end.
  372. %% @spec set_env(Scope::scope(), App::atom(),
  373. %% Key::atom(), Value::term(), Strategy) -> Value
  374. %% Strategy = [Alternative]
  375. %% Alternative = app_env | os_env | {os_env, VAR}
  376. %% | {mnesia, ActivityType, Oid, Pos}
  377. %%
  378. %% @doc Updates the cached value as well as underlying environment.
  379. %%
  380. %% This function should be exercised with caution, as it affects the larger
  381. %% environment outside gproc. This function modifies the cached value, and then
  382. %% proceeds to update the underlying environment (OS environment variable or
  383. %% application environment variable).
  384. %%
  385. %% When the `mnesia' alternative is used, gproc will try to update any existing
  386. %% object, changing only the `Pos' position. If no such object exists, it will
  387. %% create a new object, setting any other attributes (except `Pos' and the key)
  388. %% to `undefined'.
  389. %% @end
  390. %%
  391. set_env(Scope, App, Key, Value, Strategy)
  392. when Scope==l, is_atom(App), is_atom(Key);
  393. Scope==g, is_atom(App), is_atom(Key) ->
  394. case is_valid_set_strategy(Strategy, Value) of
  395. true ->
  396. update_cached_env(Scope, App, Key, Value),
  397. set_strategy(Strategy, App, Key, Value);
  398. false ->
  399. erlang:error(badarg)
  400. end.
  401. check_alternatives([{default, Val}|Alts], Scope, App, Key, _, Set) ->
  402. check_alternatives(Alts, Scope, App, Key, Val, Set);
  403. check_alternatives([H|T], Scope, App, Key, Def, Set) ->
  404. case try_alternative(H, App, Key, Scope) of
  405. undefined ->
  406. check_alternatives(T, Scope, App, Key, Def, Set);
  407. {ok, Value} ->
  408. if Set ->
  409. cache_env(Scope, App, Key, Value),
  410. Value;
  411. true ->
  412. Value
  413. end
  414. end;
  415. check_alternatives([], Scope, App, Key, Def, Set) ->
  416. if Set ->
  417. cache_env(Scope, App, Key, Def);
  418. true ->
  419. ok
  420. end,
  421. Def.
  422. try_alternative(error, App, Key, Scope) ->
  423. erlang:error(gproc_env, [App, Key, Scope]);
  424. try_alternative(inherit, App, Key, Scope) ->
  425. case get('$ancestors') of
  426. [P|_] ->
  427. lookup_env(Scope, App, Key, P);
  428. _ ->
  429. undefined
  430. end;
  431. try_alternative({inherit, P}, App, Key, Scope) when is_pid(P) ->
  432. lookup_env(Scope, App, Key, P);
  433. try_alternative({inherit, P}, App, Key, Scope) ->
  434. case where(P) of
  435. undefined -> undefined;
  436. Pid when is_pid(Pid) ->
  437. lookup_env(Scope, App, Key, Pid)
  438. end;
  439. try_alternative(app_env, App, Key, _Scope) ->
  440. case application:get_env(App, Key) of
  441. undefined -> undefined;
  442. {ok, undefined} -> undefined;
  443. {ok, Value} -> {ok, Value}
  444. end;
  445. try_alternative(os_env, _App, Key, _) ->
  446. case os:getenv(os_env_key(Key)) of
  447. false -> undefined;
  448. Val -> {ok, Val}
  449. end;
  450. try_alternative({os_env, Key}, _, _, _) ->
  451. case os:getenv(Key) of
  452. false -> undefined;
  453. Val -> {ok, Val}
  454. end;
  455. try_alternative(init_arg, _, Key, _) ->
  456. case init:get_argument(Key) of
  457. {ok, [[Value]]} ->
  458. {ok, Value};
  459. error ->
  460. undefined
  461. end;
  462. try_alternative({mnesia,Type,Key,Pos}, _, _, _) ->
  463. case mnesia:activity(Type, fun() -> mnesia:read(Key) end) of
  464. [] -> undefined;
  465. [Found] ->
  466. {ok, element(Pos, Found)}
  467. end.
  468. os_env_key(Key) ->
  469. string:to_upper(atom_to_list(Key)).
  470. lookup_env(Scope, App, Key, P) ->
  471. case ets:lookup(?TAB, {{p, Scope, {gproc_env, App, Key}}, P}) of
  472. [] ->
  473. undefined;
  474. [{_, _, Value}] ->
  475. {ok, Value}
  476. end.
  477. cache_env(Scope, App, Key, Value) ->
  478. ?CATCH_GPROC_ERROR(
  479. reg1({p, Scope, {gproc_env, App, Key}}, Value),
  480. [Scope,App,Key,Value]).
  481. update_cached_env(Scope, App, Key, Value) ->
  482. case lookup_env(Scope, App, Key, self()) of
  483. undefined ->
  484. cache_env(Scope, App, Key, Value);
  485. {ok, _} ->
  486. set_value({p, Scope, {gproc_env, App, Key}}, Value)
  487. end.
  488. is_valid_set_strategy([os_env|T], Value) ->
  489. is_string(Value) andalso is_valid_set_strategy(T, Value);
  490. is_valid_set_strategy([{os_env, _}|T], Value) ->
  491. is_string(Value) andalso is_valid_set_strategy(T, Value);
  492. is_valid_set_strategy([app_env|T], Value) ->
  493. is_valid_set_strategy(T, Value);
  494. is_valid_set_strategy([{mnesia,_Type,_Oid,_Pos}|T], Value) ->
  495. is_valid_set_strategy(T, Value);
  496. is_valid_set_strategy([], _) ->
  497. true;
  498. is_valid_set_strategy(_, _) ->
  499. false.
  500. set_strategy([H|T], App, Key, Value) ->
  501. case H of
  502. app_env ->
  503. application:set_env(App, Key, Value);
  504. os_env ->
  505. os:putenv(os_env_key(Key), Value);
  506. {os_env, ENV} ->
  507. os:putenv(ENV, Value);
  508. {mnesia,Type,Oid,Pos} ->
  509. mnesia:activity(
  510. Type,
  511. fun() ->
  512. Rec = case mnesia:read(Oid) of
  513. [] ->
  514. {Tab,K} = Oid,
  515. Tag = mnesia:table_info(Tab, record_name),
  516. Attrs = mnesia:table_info(Tab, attributes),
  517. list_to_tuple(
  518. [Tag,K |
  519. [undefined || _ <- tl(Attrs)]]);
  520. [Old] ->
  521. Old
  522. end,
  523. mnesia:write(setelement(Pos, Rec, Value))
  524. end)
  525. end,
  526. set_strategy(T, App, Key, Value);
  527. set_strategy([], _, _, Value) ->
  528. Value.
  529. is_string(S) ->
  530. try begin _ = iolist_to_binary(S),
  531. true
  532. end
  533. catch
  534. error:_ ->
  535. false
  536. end.
  537. %% @spec reg(Key::key()) -> true
  538. %%
  539. %% @doc
  540. %% @equiv reg(Key, default(Key))
  541. %% @end
  542. reg(Key) ->
  543. ?CATCH_GPROC_ERROR(reg1(Key), [Key]).
  544. reg1(Key) ->
  545. reg1(Key, default(Key)).
  546. default({T,_,_}) when T==c -> 0;
  547. default(_) -> undefined.
  548. %% @spec await(Key::key()) -> {pid(),Value}
  549. %% @equiv await(Key,infinity)
  550. %%
  551. await(Key) ->
  552. ?CATCH_GPROC_ERROR(await1(Key, infinity), [Key]).
  553. %% @spec await(Key::key(), Timeout) -> {pid(),Value}
  554. %% Timeout = integer() | infinity
  555. %%
  556. %% @doc Wait for a local name to be registered.
  557. %% The function raises an exception if the timeout expires. Timeout must be
  558. %% either an interger &gt; 0 or 'infinity'.
  559. %% A small optimization: we first perform a lookup, to see if the name
  560. %% is already registered. This way, the cost of the operation will be
  561. %% roughly the same as of where/1 in the case where the name is already
  562. %% registered (the difference: await/2 also returns the value).
  563. %% @end
  564. %%
  565. await(Key, Timeout) ->
  566. ?CATCH_GPROC_ERROR(await1(Key, Timeout), [Key, Timeout]).
  567. await1({n,g,_} = Key, Timeout) ->
  568. ?CHK_DIST,
  569. request_wait(Key, Timeout);
  570. await1({n,l,_} = Key, Timeout) ->
  571. case ets:lookup(?TAB, {Key, n}) of
  572. [{_, Pid, Value}] ->
  573. case is_process_alive(Pid) of
  574. true ->
  575. {Pid, Value};
  576. false ->
  577. %% we can send an asynchronous audit request, since the purpose is
  578. %% only to ensure that the server handles the audit before it serves
  579. %% our 'await' request. Strictly speaking, we could allow the bad Pid
  580. %% to be returned, as there are no guarantees that whatever Pid we return
  581. %% will still be alive when addressed. Still, we don't want to knowingly
  582. %% serve bad data.
  583. nb_audit_process(Pid),
  584. request_wait(Key, Timeout)
  585. end;
  586. _ ->
  587. request_wait(Key, Timeout)
  588. end;
  589. await1(_, _) ->
  590. throw(badarg).
  591. request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
  592. TRef = case Timeout of
  593. infinity -> no_timer;
  594. T when is_integer(T), T > 0 ->
  595. erlang:start_timer(T, self(), gproc_timeout);
  596. _ ->
  597. ?THROW_GPROC_ERROR(badarg)
  598. end,
  599. WRef = case {call({await,Key,self()}, C), C} of
  600. {{R, {Kg,Pg,Vg}}, g} ->
  601. self() ! {gproc, R, registered, {Kg,Pg,Vg}},
  602. R;
  603. {R,_} ->
  604. R
  605. end,
  606. receive
  607. {gproc, WRef, registered, {_K, Pid, V}} ->
  608. _ = case TRef of
  609. no_timer -> ignore;
  610. _ -> erlang:cancel_timer(TRef)
  611. end,
  612. {Pid, V};
  613. {timeout, TRef, gproc_timeout} ->
  614. cancel_wait(Key, WRef),
  615. ?THROW_GPROC_ERROR(timeout)
  616. end.
  617. %% @spec nb_wait(Key::key()) -> Ref
  618. %%
  619. %% @doc Wait for a local name to be registered.
  620. %% The caller can expect to receive a message,
  621. %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
  622. %% @end
  623. %%
  624. nb_wait(Key) ->
  625. ?CATCH_GPROC_ERROR(nb_wait1(Key), [Key]).
  626. nb_wait1({n,g,_} = Key) ->
  627. ?CHK_DIST,
  628. call({await, Key, self()}, g);
  629. nb_wait1({n,l,_} = Key) ->
  630. call({await, Key, self()}, l);
  631. nb_wait1(_) ->
  632. ?THROW_GPROC_ERROR(badarg).
  633. %% @spec cancel_wait(Key::key(), Ref) -> ok
  634. %% Ref = all | reference()
  635. %%
  636. %% @doc Cancels a previous call to nb_wait/1
  637. %%
  638. %% If `Ref = all', all wait requests on `Key' from the calling process
  639. %% are canceled.
  640. %% @end
  641. %%
  642. cancel_wait(Key, Ref) ->
  643. ?CATCH_GPROC_ERROR(cancel_wait1(Key, Ref), [Key, Ref]).
  644. cancel_wait1({_,g,_} = Key, Ref) ->
  645. ?CHK_DIST,
  646. cast({cancel_wait, self(), Key, Ref}, g),
  647. ok;
  648. cancel_wait1({_,l,_} = Key, Ref) ->
  649. cast({cancel_wait, self(), Key, Ref}, l),
  650. ok.
  651. cancel_wait_or_monitor(Key) ->
  652. ?CATCH_GPROC_ERROR(cancel_wait_or_monitor1(Key), [Key]).
  653. cancel_wait_or_monitor1({_,g,_} = Key) ->
  654. ?CHK_DIST,
  655. cast({cancel_wait_or_monitor, self(), Key}, g),
  656. ok;
  657. cancel_wait_or_monitor1({_,l,_} = Key) ->
  658. cast({cancel_wait_or_monitor, self(), Key}, l),
  659. ok.
  660. %% @spec monitor(key()) -> reference()
  661. %%
  662. %% @doc monitor a registered name
  663. %% This function works much like erlang:monitor(process, Pid), but monitors
  664. %% a unique name registered via gproc. A message, `{gproc, unreg, Ref, Key}'
  665. %% will be sent to the requesting process, if the name is unregistered or
  666. %% the registered process dies.
  667. %%
  668. %% If the name is not yet registered, the same message is sent immediately.
  669. %% @end
  670. monitor(Key) ->
  671. ?CATCH_GPROC_ERROR(monitor1(Key), [Key]).
  672. monitor1({T,g,_} = Key) when T==n; T==a ->
  673. ?CHK_DIST,
  674. call({monitor, Key, self()}, g);
  675. monitor1({T,l,_} = Key) when T==n; T==a ->
  676. call({monitor, Key, self()}, l);
  677. monitor1(_) ->
  678. ?THROW_GPROC_ERROR(badarg).
  679. %% @spec demonitor(key(), reference()) -> ok
  680. %%
  681. %% @doc Remove a monitor on a registered name
  682. %% This function is the reverse of monitor/1. It removes a monitor previously
  683. %% set on a unique name. This function always succeeds given legal input.
  684. %% @end
  685. demonitor(Key, Ref) ->
  686. ?CATCH_GPROC_ERROR(demonitor1(Key, Ref), [Key, Ref]).
  687. demonitor1({T,g,_} = Key, Ref) when T==n; T==a ->
  688. ?CHK_DIST,
  689. call({demonitor, Key, Ref, self()}, g);
  690. demonitor1({T,l,_} = Key, Ref) when T==n; T==a ->
  691. call({demonitor, Key, Ref, self()}, l);
  692. demonitor1(_, _) ->
  693. ?THROW_GPROC_ERROR(badarg).
  694. %% @spec reg(Key::key(), Value) -> true
  695. %%
  696. %% @doc Register a name or property for the current process
  697. %%
  698. %%
  699. reg(Key, Value) ->
  700. ?CATCH_GPROC_ERROR(reg1(Key, Value), [Key, Value]).
  701. reg1({_,g,_} = Key, Value) ->
  702. %% anything global
  703. ?CHK_DIST,
  704. gproc_dist:reg(Key, Value);
  705. reg1({p,l,_} = Key, Value) ->
  706. local_reg(Key, Value);
  707. reg1({a,l,_} = Key, undefined) ->
  708. call({reg, Key, undefined});
  709. reg1({c,l,_} = Key, Value) when is_integer(Value) ->
  710. call({reg, Key, Value});
  711. reg1({n,l,_} = Key, Value) ->
  712. call({reg, Key, Value});
  713. reg1(_, _) ->
  714. ?THROW_GPROC_ERROR(badarg).
  715. %% @spec reg_shared(Key::key()) -> true
  716. %%
  717. %% @doc Register a resource, but don't tie it to a particular process.
  718. %%
  719. %% `reg_shared({c,l,C}) -> reg_shared({c,l,C}, 0).'
  720. %% `reg_shared({a,l,A}) -> reg_shared({a,l,A}, undefined).'
  721. %% @end
  722. reg_shared(Key) ->
  723. ?CATCH_GPROC_ERROR(reg_shared1(Key), [Key]).
  724. reg_shared1({c,_,_} = Key) ->
  725. reg_shared(Key, 0);
  726. reg_shared1({a,_,_} = Key) ->
  727. reg_shared(Key, undefined).
  728. %% @spec reg_shared(Key::key(), Value) -> true
  729. %%
  730. %% @doc Register a resource, but don't tie it to a particular process.
  731. %%
  732. %% Shared resources are all unique. They remain until explicitly unregistered
  733. %% (using {@link unreg_shared/1}). The types of shared resources currently
  734. %% supported are `counter' and `aggregated counter'. In listings and query
  735. %% results, shared resources appear as other similar resources, except that
  736. %% `Pid == shared'. To wit, update_counter({c,l,myCounter}, 1, shared) would
  737. %% increment the shared counter `myCounter' with 1, provided it exists.
  738. %%
  739. %% A shared aggregated counter will track updates in exactly the same way as
  740. %% an aggregated counter which is owned by a process.
  741. %% @end
  742. %%
  743. reg_shared(Key, Value) ->
  744. ?CATCH_GPROC_ERROR(reg_shared1(Key, Value), [Key, Value]).
  745. reg_shared1({_,g,_} = Key, Value) ->
  746. %% anything global
  747. ?CHK_DIST,
  748. gproc_dist:reg_shared(Key, Value);
  749. reg_shared1({a,l,_} = Key, undefined) ->
  750. call({reg_shared, Key, undefined});
  751. reg_shared1({c,l,_} = Key, Value) when is_integer(Value) ->
  752. call({reg_shared, Key, Value});
  753. reg_shared1(_, _) ->
  754. ?THROW_GPROC_ERROR(badarg).
  755. %% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
  756. %%
  757. %% @doc Register multiple {Key,Value} pairs of a given type and scope.
  758. %%
  759. %% This function is more efficient than calling {@link reg/2} repeatedly.
  760. %% It is also atomic in regard to unique names; either all names are registered
  761. %% or none are.
  762. %% @end
  763. mreg(T, C, KVL) ->
  764. ?CATCH_GPROC_ERROR(mreg1(T, C, KVL), [T, C, KVL]).
  765. mreg1(T, g, KVL) ->
  766. ?CHK_DIST,
  767. gproc_dist:mreg(T, KVL);
  768. mreg1(T, l, KVL) when T==a; T==n ->
  769. if is_list(KVL) ->
  770. call({mreg, T, l, KVL});
  771. true ->
  772. erlang:error(badarg)
  773. end;
  774. mreg1(p, l, KVL) ->
  775. local_mreg(p, KVL);
  776. mreg1(_, _, _) ->
  777. ?THROW_GPROC_ERROR(badarg).
  778. %% @spec munreg(type(), scope(), [Key::any()]) -> true
  779. %%
  780. %% @doc Unregister multiple Key items of a given type and scope.
  781. %%
  782. %% This function is usually more efficient than calling {@link unreg/1}
  783. %% repeatedly.
  784. %% @end
  785. munreg(T, C, L) ->
  786. ?CATCH_GPROC_ERROR(munreg1(T, C, L), [T, C, L]).
  787. munreg1(T, g, L) ->
  788. ?CHK_DIST,
  789. gproc_dist:munreg(T, existing(T,g,L));
  790. munreg1(T, l, L) when T==a; T==n ->
  791. if is_list(L) ->
  792. call({munreg, T, l, existing(T,l,L)});
  793. true ->
  794. erlang:error(badarg)
  795. end;
  796. munreg1(p, l, L) ->
  797. local_munreg(p, existing(p,l,L));
  798. munreg1(_, _, _) ->
  799. ?THROW_GPROC_ERROR(badarg).
  800. existing(T,Scope,L) ->
  801. Keys = if T==p; T==c ->
  802. [{{T,Scope,K}, self()} || K <- L];
  803. T==a; T==n ->
  804. [{{T,Scope,K}, T} || K <- L]
  805. end,
  806. _ = [case ets:member(?TAB, K) of
  807. false -> erlang:error(badarg);
  808. true -> true
  809. end || K <- Keys],
  810. L.
  811. %% @spec (Key:: key()) -> true
  812. %%
  813. %% @doc Unregister a name or property.
  814. %% @end
  815. unreg(Key) ->
  816. ?CATCH_GPROC_ERROR(unreg1(Key), [Key]).
  817. unreg1(Key) ->
  818. case Key of
  819. {_, g, _} ->
  820. ?CHK_DIST,
  821. gproc_dist:unreg(Key);
  822. {T, l, _} when T == n;
  823. T == a -> call({unreg, Key});
  824. {_, l, _} ->
  825. case ets:member(?TAB, {Key,self()}) of
  826. true ->
  827. _ = gproc_lib:remove_reg(Key, self(), unreg),
  828. true;
  829. false ->
  830. ?THROW_GPROC_ERROR(badarg)
  831. end
  832. end.
  833. %% @spec (Key:: key()) -> true
  834. %%
  835. %% @doc Unregister a shared resource.
  836. %% @end
  837. unreg_shared(Key) ->
  838. ?CATCH_GPROC_ERROR(unreg_shared1(Key), [Key]).
  839. unreg_shared1(Key) ->
  840. case Key of
  841. {_, g, _} ->
  842. ?CHK_DIST,
  843. gproc_dist:unreg_shared(Key);
  844. {T, l, _} when T == c;
  845. T == a -> call({unreg_shared, Key});
  846. _ ->
  847. ?THROW_GPROC_ERROR(badarg)
  848. end.
  849. %% @spec (key(), pid()) -> yes | no
  850. %%
  851. %% @doc Behaviour support callback
  852. %% @end
  853. register_name({n,_,_} = Name, Pid) when Pid == self() ->
  854. try reg(Name), yes
  855. catch
  856. error:_ ->
  857. no
  858. end.
  859. %% @equiv unreg/1
  860. unregister_name(Key) ->
  861. unreg(Key).
  862. %% @spec select(Arg) -> [Match] | {[Match], Continuation} | '$end_of_table'
  863. %% where Arg = Continuation
  864. %% | sel_pattern()
  865. %% Match = {Key, Pid, Value}
  866. %% @doc Perform a select operation on the process registry
  867. %%
  868. %% When Arg = Contination, resume a gproc:select/1 operation
  869. %% (see {@link //stdlib/ets:select/1. ets:select/1}
  870. %%
  871. %% When Arg = {@type sel_pattern()}, this function executes a select operation,
  872. %% emulating ets:select/1
  873. %%
  874. %% {@link select/2} offers the opportunity to narrow the search
  875. %% (by limiting to only global or local scope, or a single type of object).
  876. %% When only a pattern as single argument is given, both global and local scope,
  877. %% as well as all types of object can be searched. Note that the pattern may
  878. %% still limit the select operation so that scanning the entire table is avoided.
  879. %%
  880. %% The physical representation in the registry may differ from the above,
  881. %% but the select patterns are transformed appropriately. The logical
  882. %% representation for the gproc select operations is given by
  883. %% {@type headpat()}.
  884. %% @end
  885. select({?TAB, _, _, _, _, _, _, _} = Continuation) ->
  886. ets:select(Continuation);
  887. select(Pat) ->
  888. select(all, Pat).
  889. %% @spec (Context::context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
  890. %%
  891. %% @doc Perform a select operation with limited context on the process registry
  892. %%
  893. %% The physical representation in the registry may differ from the above,
  894. %% but the select patterns are transformed appropriately.
  895. %%
  896. %% Note that limiting the context is just a convenience function, allowing you
  897. %% to write a simpler select pattern and still avoid searching the entire
  898. %% registry. Whenever variables are used in the head pattern, this will result
  899. %% in a wider scan, even if the values are restricted through a guard (e.g.
  900. %% <code>select([{'$1','$2','$3'}, [{'==', '$1', p}], ...])</code> will count as a wild
  901. %% pattern on the key and result in a full scan). In this case, specifying a
  902. %% Context will allow gproc to perform some variable substitution and ensure
  903. %% that the scan is limited.
  904. %% @end
  905. select(Context, Pat) ->
  906. ets:select(?TAB, pattern(Pat, Context)).
  907. %% @spec (Context::context(), Pat::sel_patten(), Limit::integer()) ->
  908. %% {[Match],Continuation} | '$end_of_table'
  909. %% @doc Like {@link select/2} but returns Limit objects at a time.
  910. %%
  911. %% See [http://www.erlang.org/doc/man/ets.html#select-3].
  912. %% @end
  913. select(Context, Pat, Limit) ->
  914. ets:select(?TAB, pattern(Pat, Context), Limit).
  915. %% @spec (sel_pattern()) -> list(sel_object())
  916. %% @doc
  917. %% @equiv select_count(all, Pat)
  918. %% @end
  919. select_count(Pat) ->
  920. select_count(all, Pat).
  921. %% @spec (Context::context(), Pat::sel_pattern()) -> [{Key, Pid, Value}]
  922. %%
  923. %% @doc Perform a select_count operation on the process registry.
  924. %%
  925. %% The physical representation in the registry may differ from the above,
  926. %% but the select patterns are transformed appropriately.
  927. %% @end
  928. select_count(Context, Pat) ->
  929. ets:select_count(?TAB, pattern(Pat, Context)).
  930. %%% Local properties can be registered in the local process, since
  931. %%% no other process can interfere.
  932. %%%
  933. local_reg(Key, Value) ->
  934. case gproc_lib:insert_reg(Key, Value, self(), l) of
  935. false -> ?THROW_GPROC_ERROR(badarg);
  936. true -> monitor_me()
  937. end.
  938. local_mreg(_, []) -> true;
  939. local_mreg(T, [_|_] = KVL) ->
  940. case gproc_lib:insert_many(T, l, KVL, self()) of
  941. false -> ?THROW_GPROC_ERROR(badarg);
  942. {true,_} -> monitor_me()
  943. end.
  944. local_munreg(T, L) when T==p; T==c ->
  945. _ = [gproc_lib:remove_reg({T,l,K}, self(), unreg) || K <- L],
  946. true.
  947. %% @spec (Key :: key(), Value) -> true
  948. %% @doc Sets the value of the registeration entry given by Key
  949. %%
  950. %% Key is assumed to exist and belong to the calling process.
  951. %% If it doesn't, this function will exit.
  952. %%
  953. %% Value can be any term, unless the object is a counter, in which case
  954. %% it must be an integer.
  955. %% @end
  956. %%
  957. set_value(Key, Value) ->
  958. ?CATCH_GPROC_ERROR(set_value1(Key, Value), [Key, Value]).
  959. set_value1({_,g,_} = Key, Value) ->
  960. ?CHK_DIST,
  961. gproc_dist:set_value(Key, Value);
  962. set_value1({a,l,_} = Key, Value) when is_integer(Value) ->
  963. call({set, Key, Value});
  964. set_value1({n,l,_} = Key, Value) ->
  965. %% we cannot do this locally, since we have to check that the object
  966. %% exists first - not an atomic update.
  967. call({set, Key, Value});
  968. set_value1({p,l,_} = Key, Value) ->
  969. %% we _can_ to this locally, since there is no race condition - no
  970. %% other process can update our properties.
  971. case gproc_lib:do_set_value(Key, Value, self()) of
  972. true -> true;
  973. false ->
  974. erlang:error(badarg)
  975. end;
  976. set_value1({c,l,_} = Key, Value) when is_integer(Value) ->
  977. gproc_lib:do_set_counter_value(Key, Value, self());
  978. set_value1(_, _) ->
  979. ?THROW_GPROC_ERROR(badarg).
  980. %% @spec (Key) -> Value
  981. %% @doc Reads the value stored with a key registered to the current process.
  982. %%
  983. %% If no such key is registered to the current process, this function exits.
  984. %% @end
  985. get_value(Key) ->
  986. ?CATCH_GPROC_ERROR(get_value1(Key, self()), [Key]).
  987. %% @spec (Key, Pid) -> Value
  988. %% @doc Reads the value stored with a key registered to the process Pid.
  989. %%
  990. %% If `Pid == shared', the value of a shared key (see {@link reg_shared/1})
  991. %% will be read.
  992. %% @end
  993. %%
  994. get_value(Key, Pid) ->
  995. ?CATCH_GPROC_ERROR(get_value1(Key, Pid), [Key, Pid]).
  996. get_value1({T,_,_} = Key, Pid) when is_pid(Pid) ->
  997. if T==n orelse T==a ->
  998. case ets:lookup(?TAB, {Key, T}) of
  999. [{_, P, Value}] when P == Pid -> Value;
  1000. _ -> ?THROW_GPROC_ERROR(badarg)
  1001. end;
  1002. true ->
  1003. ets:lookup_element(?TAB, {Key, Pid}, 3)
  1004. end;
  1005. get_value1({T,_,_} = K, shared) when T==c; T==a ->
  1006. Key = case T of
  1007. c -> {K, shared};
  1008. a -> {K, a}
  1009. end,
  1010. case ets:lookup(?TAB, Key) of
  1011. [{_, shared, Value}] -> Value;
  1012. _ -> ?THROW_GPROC_ERROR(badarg)
  1013. end;
  1014. get_value1(_, _) ->
  1015. ?THROW_GPROC_ERROR(badarg).
  1016. %% @spec (Key) -> Pid
  1017. %% @doc Lookup the Pid stored with a key.
  1018. %%
  1019. lookup_pid({_T,_,_} = Key) ->
  1020. case where(Key) of
  1021. undefined -> erlang:error(badarg);
  1022. P -> P
  1023. end.
  1024. %% @spec (Key) -> Value
  1025. %% @doc Lookup the value stored with a key.
  1026. %%
  1027. lookup_value({T,_,_} = Key) ->
  1028. if T==n orelse T==a ->
  1029. ets:lookup_element(?TAB, {Key,T}, 3);
  1030. true ->
  1031. erlang:error(badarg)
  1032. end.
  1033. %% @spec (Key::key()) -> pid()
  1034. %%
  1035. %% @doc Returns the pid registered as Key
  1036. %%
  1037. %% The type of registration entry must be either name or aggregated counter.
  1038. %% Otherwise this function will exit. Use {@link lookup_pids/1} in these
  1039. %% cases.
  1040. %% @end
  1041. %%
  1042. where(Key) ->
  1043. ?CATCH_GPROC_ERROR(where1(Key), [Key]).
  1044. where1({T,_,_}=Key) ->
  1045. if T==n orelse T==a ->
  1046. case ets:lookup(?TAB, {Key,T}) of
  1047. [{_, P, _Value}] ->
  1048. case my_is_process_alive(P) of
  1049. true -> P;
  1050. false ->
  1051. undefined
  1052. end;
  1053. _ -> % may be [] or [{Key,Waiters}]
  1054. undefined
  1055. end;
  1056. true ->
  1057. ?THROW_GPROC_ERROR(badarg)
  1058. end.
  1059. %% @equiv where/1
  1060. whereis_name(Key) ->
  1061. ?CATCH_GPROC_ERROR(where1(Key), [Key]).
  1062. %% @spec (Key::key()) -> [pid()]
  1063. %%
  1064. %% @doc Returns a list of pids with the published key Key
  1065. %%
  1066. %% If the type of registration entry is either name or aggregated counter,
  1067. %% this function will return either an empty list, or a list of one pid.
  1068. %% For non-unique types, the return value can be a list of any length.
  1069. %% @end
  1070. %%
  1071. lookup_pids({T,_,_} = Key) ->
  1072. L = if T==n orelse T==a ->
  1073. ets:select(?TAB, [{{{Key,T}, '$1', '_'},[],['$1']}]);
  1074. true ->
  1075. ets:select(?TAB, [{{{Key,'_'}, '$1', '_'},[],['$1']}])
  1076. end,
  1077. [P || P <- L, my_is_process_alive(P)].
  1078. %% @spec (pid()) -> boolean()
  1079. %%
  1080. my_is_process_alive(P) when node(P) =:= node() ->
  1081. is_process_alive(P);
  1082. my_is_process_alive(_) ->
  1083. %% remote pid - assume true (too costly to find out)
  1084. true.
  1085. %% @spec (Key::key()) -> [{pid(), Value}]
  1086. %%
  1087. %% @doc Retrieve the `{Pid,Value}' pairs corresponding to Key.
  1088. %%
  1089. %% Key refer to any type of registry object. If it refers to a unique
  1090. %% object, the list will be of length 0 or 1. If it refers to a non-unique
  1091. %% object, the return value can be a list of any length.
  1092. %% @end
  1093. %%
  1094. lookup_values({T,_,_} = Key) ->
  1095. L = if T==n orelse T==a ->
  1096. ets:select(?TAB, [{{{Key,T}, '$1', '$2'},[],[{{'$1','$2'}}]}]);
  1097. true ->
  1098. ets:select(?TAB, [{{{Key,'_'}, '$1', '$2'},[],[{{'$1','$2'}}]}])
  1099. end,
  1100. [Pair || {P,_} = Pair <- L, my_is_process_alive(P)].
  1101. %% @ spec (Key::key(), Incr) -> integer() | [integer()]
  1102. %% Incr = IncrVal | UpdateOp | [UpdateOp]
  1103. %% UpdateOp = IncrVal | {IncrVal, Threshold, SetValue}
  1104. %% IncrVal = integer()
  1105. %%
  1106. %% @doc Updates the counter registered as Key for the current process.
  1107. %%
  1108. %% This function works almost exactly like ets:update_counter/3
  1109. %% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
  1110. %% will fail if the type of object referred to by Key is not a counter.
  1111. %%
  1112. %% Aggregated counters with the same name will be updated automatically.
  1113. %% The `UpdateOp' patterns are the same as for `ets:update_counter/3', except
  1114. %% that the position is omitted; in gproc, the value position is always `3'.
  1115. %% @end
  1116. %%
  1117. -spec update_counter(key(), increment()) -> integer().
  1118. update_counter(Key, Incr) ->
  1119. ?CATCH_GPROC_ERROR(update_counter1(Key, Incr), [Key, Incr]).
  1120. update_counter1({c,l,_} = Key, Incr) ->
  1121. gproc_lib:update_counter(Key, Incr, self());
  1122. update_counter1({c,g,_} = Key, Incr) ->
  1123. ?CHK_DIST,
  1124. gproc_dist:update_counter(Key, Incr);
  1125. update_counter1(_, _) ->
  1126. ?THROW_GPROC_ERROR(badarg).
  1127. %% @doc Update a list of counters
  1128. %%
  1129. %% This function is not atomic, except (in a sense) for global counters. For local counters,
  1130. %% it is more of a convenience function. For global counters, it is much more efficient
  1131. %% than calling `gproc:update_counter/2' for each individual counter.
  1132. %%
  1133. %% The return value is the corresponding list of `[{Counter, Pid, NewValue}]'.
  1134. %% @end
  1135. -spec update_counters(scope(), [{key(), pid(), increment()}]) ->
  1136. [{key(), pid(), integer()}].
  1137. update_counters(_, []) ->
  1138. [];
  1139. update_counters(l, [_|_] = Cs) ->
  1140. ?CATCH_GPROC_ERROR(update_counters1(Cs), [Cs]);
  1141. update_counters(g, [_|_] = Cs) ->
  1142. ?CHK_DIST,
  1143. gproc_dist:update_counters(Cs).
  1144. update_counters1([{{c,l,_} = Key, Pid, Incr}|T]) ->
  1145. [{Key, Pid, gproc_lib:update_counter(Key, Incr, Pid)}|update_counters1(T)];
  1146. update_counters1([]) ->
  1147. [];
  1148. update_counters1(_) ->
  1149. ?THROW_GPROC_ERROR(badarg).
  1150. %% @spec (Key) -> {ValueBefore, ValueAfter}
  1151. %% Key = {c, Scope, Name}
  1152. %% Scope = l | g
  1153. %% ValueBefore = integer()
  1154. %% ValueAfter = integer()
  1155. %%
  1156. %% @doc Reads and resets a counter in a "thread-safe" way
  1157. %%
  1158. %% This function reads the current value of a counter and then resets it to its
  1159. %% initial value. The reset operation is done using {@link update_counter/2},
  1160. %% which allows for concurrent calls to {@link update_counter/2} without losing
  1161. %% updates. Aggregated counters are updated accordingly.
  1162. %% @end
  1163. %%
  1164. reset_counter(Key) ->
  1165. ?CATCH_GPROC_ERROR(reset_counter1(Key), [Key]).
  1166. reset_counter1({c,g,_} = Key) ->
  1167. ?CHK_DIST,
  1168. gproc_dist:reset_counter(Key);
  1169. reset_counter1({c,l,_} = Key) ->
  1170. Current = ets:lookup_element(?TAB, {Key, self()}, 3),
  1171. Initial = case ets:lookup(?TAB, {self(), Key}) of
  1172. [{_, r}] -> 0;
  1173. [{_, Opts}] ->
  1174. proplists:get_value(initial, Opts, 0)
  1175. end,
  1176. {Current, update_counter(Key, Initial - Current)}.
  1177. %% @spec (Key::key(), Incr) -> integer() | [integer()]
  1178. %% Incr = IncrVal | UpdateOp | [UpdateOp]
  1179. %% UpdateOp = IncrVal | {IncrVal, Threshold, SetValue}
  1180. %% IncrVal = integer()
  1181. %%
  1182. %% @doc Updates the shared counter registered as Key.
  1183. %%
  1184. %% This function works almost exactly like ets:update_counter/3
  1185. %% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
  1186. %% will fail if the type of object referred to by Key is not a counter.
  1187. %%
  1188. %% Aggregated counters with the same name will be updated automatically.
  1189. %% The `UpdateOp' patterns are the same as for `ets:update_counter/3', except
  1190. %% that the position is omitted; in gproc, the value position is always `3'.
  1191. %% @end
  1192. %%
  1193. update_shared_counter(Key, Incr) ->
  1194. ?CATCH_GPROC_ERROR(update_shared_counter1(Key, Incr), [Key, Incr]).
  1195. update_shared_counter1({c,g,_} = Key, Incr) ->
  1196. ?CHK_DIST,
  1197. gproc_dist:update_shared_counter(Key, Incr);
  1198. update_shared_counter1({c,l,_} = Key, Incr) ->
  1199. gproc_lib:update_counter(Key, Incr, shared).
  1200. %% @spec (From::key(), To::pid() | key()) -> undefined | pid()
  1201. %%
  1202. %% @doc Atomically transfers the key `From' to the process identified by `To'.
  1203. %%
  1204. %% This function transfers any gproc key (name, property, counter, aggr counter)
  1205. %% from one process to another, and returns the pid of the new owner.
  1206. %%
  1207. %% `To' must be either a pid or a unique name (name or aggregated counter), but
  1208. %% does not necessarily have to resolve to an existing process. If there is
  1209. %% no process registered with the `To' key, `give_away/2' returns `undefined',
  1210. %% and the `From' key is effectively unregistered.
  1211. %%
  1212. %% It is allowed to give away a key to oneself, but of course, this operation
  1213. %% will have no effect.
  1214. %%
  1215. %% Fails with `badarg' if the calling process does not have a `From' key
  1216. %% registered.
  1217. %% @end
  1218. give_away(Key, ToPid) ->
  1219. ?CATCH_GPROC_ERROR(give_away1(Key, ToPid), [Key, ToPid]).
  1220. give_away1({_,l,_} = Key, ToPid) when is_pid(ToPid), node(ToPid) == node() ->
  1221. call({give_away, Key, ToPid});
  1222. give_away1({_,l,_} = Key, {n,l,_} = ToKey) ->
  1223. call({give_away, Key, ToKey});
  1224. give_away1({_,g,_} = Key, To) ->
  1225. ?CHK_DIST,
  1226. gproc_dist:give_away(Key, To).
  1227. %% @spec () -> ok
  1228. %%
  1229. %% @doc Unregister all items of the calling process and inform gproc
  1230. %% to forget about the calling process.
  1231. %%
  1232. %% This function is more efficient than letting gproc perform these
  1233. %% cleanup operations.
  1234. %% @end
  1235. goodbye() ->
  1236. process_is_down(self()).
  1237. %% @spec (Key::key(), Msg::any()) -> Msg
  1238. %%
  1239. %% @doc Sends a message to the process, or processes, corresponding to Key.
  1240. %%
  1241. %% If Key belongs to a unique object (name or aggregated counter), this
  1242. %% function will send a message to the corresponding process, or fail if there
  1243. %% is no such process. If Key is for a non-unique object type (counter or
  1244. %% property), Msg will be send to all processes that have such an object.
  1245. %% @end
  1246. %%
  1247. send(Key, Msg) ->
  1248. ?CATCH_GPROC_ERROR(send1(Key, Msg), [Key, Msg]).
  1249. send1({T,C,_} = Key, Msg) when C==l; C==g ->
  1250. if T == n orelse T == a ->
  1251. case ets:lookup(?TAB, {Key, T}) of
  1252. [{_, Pid, _}] ->
  1253. Pid ! Msg;
  1254. _ ->
  1255. ?THROW_GPROC_ERROR(badarg)
  1256. end;
  1257. T==p orelse T==c ->
  1258. %% BUG - if the key part contains select wildcards, we may end up
  1259. %% sending multiple messages to the same pid
  1260. lists:foreach(fun(Pid) ->
  1261. Pid ! Msg
  1262. end, lookup_pids(Key)),
  1263. Msg;
  1264. true ->
  1265. erlang:error(badarg)
  1266. end;
  1267. send1(_, _) ->
  1268. ?THROW_GPROC_ERROR(badarg).
  1269. %% @spec (Key::key(), Msg::any()) -> Msg
  1270. %%
  1271. %% @equiv bcast(nodes(), Key, Msg)
  1272. %% @end
  1273. %%
  1274. bcast(Key, Msg) ->
  1275. bcast(nodes(), Key, Msg).
  1276. %% @spec (Nodes::[atom()], Key::key(), Msg::any()) -> Msg
  1277. %%
  1278. %% @doc Sends a message to processes corresponding to Key on Nodes.
  1279. %%
  1280. %% This function complements `send/2' and works on locally registered resources
  1281. %% that `send/2' supports. Messages are routed via a special broadcast server
  1282. %% on each node to ensure that ordering is preserved. Distributed delivery
  1283. %% is asynchronous and carries the same guarantees as normal message passing
  1284. %% (with the added proviso that the broadcast server also needs to be available).
  1285. %% @see send/2
  1286. %% @end
  1287. %%
  1288. bcast(Ns, Key, Msg) ->
  1289. ?CATCH_GPROC_ERROR(bcast1(Ns, Key, Msg), [Key, Msg]).
  1290. bcast1(Ns, {T,l,_} = Key, Msg) when T==p; T==a; T==c; T==n; T==p ->
  1291. send1(Key, Msg),
  1292. gen_server:abcast(Ns -- [node()], gproc_bcast, {send, Key, Msg}),
  1293. Msg.
  1294. %% @spec (Context :: context()) -> key() | '$end_of_table'
  1295. %%
  1296. %% @doc Behaves as ets:first(Tab) for a given type of registration object.
  1297. %%
  1298. %% See [http://www.erlang.org/doc/man/ets.html#first-1].
  1299. %% The registry behaves as an ordered_set table.
  1300. %% @end
  1301. %%
  1302. first(Context) ->
  1303. {S, T} = get_s_t(Context),
  1304. {HeadPat,_} = headpat({S, T}, '_', '_', '_'),
  1305. case ets:select(?TAB, [{HeadPat,[],[{element,1,'$_'}]}], 1) of
  1306. {[First], _} ->
  1307. First;
  1308. _ ->
  1309. '$end_of_table'
  1310. end.
  1311. %% @spec (Context :: context()) -> key() | '$end_of_table'
  1312. %%
  1313. %% @doc Behaves as ets:last(Tab) for a given type of registration object.
  1314. %%
  1315. %% See [http://www.erlang.org/doc/man/ets.html#last-1].
  1316. %% The registry behaves as an ordered_set table.
  1317. %% @end
  1318. %%
  1319. last(Context) ->
  1320. {S, T} = get_s_t(Context),
  1321. S1 = if S == '_'; S == l -> m; % 'm' comes after 'l'
  1322. S == g -> h % 'h' comes between 'g' & 'l'
  1323. end,
  1324. Beyond = {{T,S1,[]},[]},
  1325. step(ets:prev(?TAB, Beyond), S, T).
  1326. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  1327. %%
  1328. %% @doc Behaves as ets:next(Tab,Key) for a given type of registration object.
  1329. %%
  1330. %% See [http://www.erlang.org/doc/man/ets.html#next-2].
  1331. %% The registry behaves as an ordered_set table.
  1332. %% @end
  1333. %%
  1334. next(Context, K) ->
  1335. {S,T} = get_s_t(Context),
  1336. step(ets:next(?TAB,K), S, T).
  1337. %% @spec (Context::context(), Key::key()) -> key() | '$end_of_table'
  1338. %%
  1339. %% @doc Behaves as ets:prev(Tab,Key) for a given type of registration object.
  1340. %%
  1341. %% See [http://www.erlang.org/doc/man/ets.html#prev-2].
  1342. %% The registry behaves as an ordered_set table.
  1343. %% @end
  1344. %%
  1345. prev(Context, K) ->
  1346. {S, T} = get_s_t(Context),
  1347. step(ets:prev(?TAB, K), S, T).
  1348. step(Key, '_', '_') ->
  1349. case Key of
  1350. {{_,_,_},_} -> Key;
  1351. _ -> '$end_of_table'
  1352. end;
  1353. step(Key, '_', T) ->
  1354. case Key of
  1355. {{T,_,_},_} -> Key;
  1356. _ -> '$end_of_table'
  1357. end;
  1358. step(Key, S, '_') ->
  1359. case Key of
  1360. {{_, S, _}, _} -> Key;
  1361. _ -> '$end_of_table'
  1362. end;
  1363. step(Key, S, T) ->
  1364. case Key of
  1365. {{T, S, _}, _} -> Key;
  1366. _ -> '$end_of_table'
  1367. end.
  1368. %% @spec (Pid::pid()) -> ProcessInfo
  1369. %% ProcessInfo = [{gproc, [{Key,Value}]} | ProcessInfo]
  1370. %%
  1371. %% @doc Similar to `process_info(Pid)' but with additional gproc info.
  1372. %%
  1373. %% Returns the same information as process_info(Pid), but with the
  1374. %% addition of a `gproc' information item, containing the `{Key,Value}'
  1375. %% pairs registered to the process.
  1376. %% @end
  1377. info(Pid) when is_pid(Pid) ->
  1378. Items = [?MODULE | [ I || {I,_} <- process_info(self())]],
  1379. [info(Pid,I) || I <- Items].
  1380. %% @spec (Pid::pid(), Item::atom()) -> {Item, Info}
  1381. %%
  1382. %% @doc Similar to process_info(Pid, Item), but with additional gproc info.
  1383. %%
  1384. %% For `Item = gproc', this function returns a list of `{Key, Value}' pairs
  1385. %% registered to the process Pid. For other values of Item, it returns the
  1386. %% same as [http://www.erlang.org/doc/man/erlang.html#process_info-2].
  1387. %% @end
  1388. info(Pid, gproc) ->
  1389. gproc_info(Pid, '_');
  1390. info(Pid, {gproc, Pat}) ->
  1391. gproc_info(Pid, Pat);
  1392. info(Pid, current_function) ->
  1393. {_, T} = process_info(Pid, backtrace),
  1394. info_cur_f(T, process_info(Pid, current_function));
  1395. info(Pid, I) ->
  1396. process_info(Pid, I).
  1397. %% We don't want to return the internal gproc:info() function as current
  1398. %% function, so we grab the 'backtrace' and extract the call stack from it,
  1399. %% filtering out the functions gproc:info/_ and gproc:'-info/1-lc...' entries.
  1400. %%
  1401. %% This is really an indication that wrapping the process_info() BIF was a
  1402. %% bad idea to begin with... :P
  1403. %%
  1404. info_cur_f(T, Default) ->
  1405. {match, Matches} = re:run(T,<<"\\(([^\\)]+):(.+)/([0-9]+)">>,
  1406. [global,{capture,[1,2,3],list}]),
  1407. case lists:dropwhile(fun(["gproc","info",_]) -> true;
  1408. (["gproc","'-info/1-lc" ++ _, _]) -> true;
  1409. (_) -> false
  1410. end, Matches) of
  1411. [] ->
  1412. Default;
  1413. [[M,F,A]|_] ->
  1414. {current_function,
  1415. {to_atom(M), to_atom(F), list_to_integer(A)}}
  1416. end.
  1417. to_atom(S) ->
  1418. case erl_scan:string(S) of
  1419. {ok, [{atom,_,A}|_],_} ->
  1420. A;
  1421. _ ->
  1422. list_to_atom(S)
  1423. end.
  1424. gproc_info(Pid, Pat) ->
  1425. Keys = ets:select(?TAB, [{ {{Pid,Pat}, '_'}, [], [{element,2,
  1426. {element,1,'$_'}}] }]),
  1427. {?MODULE, lists:zf(
  1428. fun(K) ->
  1429. try V = get_value(K, Pid),
  1430. {true, {K,V}}
  1431. catch
  1432. error:_ ->
  1433. false
  1434. end
  1435. end, Keys)}.
  1436. %% @spec () -> ok
  1437. %%
  1438. %% @doc Similar to the built-in shell command `i()' but inserts information
  1439. %% about names and properties registered in Gproc, where applicable.
  1440. %% @end
  1441. i() ->
  1442. gproc_info:i().
  1443. %%% ==========================================================
  1444. %% @hidden
  1445. handle_cast({monitor_me, Pid}, S) ->
  1446. erlang:monitor(process, Pid),
  1447. {noreply, S};
  1448. handle_cast({audit_process, Pid}, S) ->
  1449. case is_process_alive(Pid) of
  1450. false ->
  1451. process_is_down(Pid);
  1452. true ->
  1453. ignore
  1454. end,
  1455. {noreply, S};
  1456. handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
  1457. _ = case ets:lookup(?TAB, {Key,T}) of
  1458. [{_, Waiters}] ->
  1459. gproc_lib:remove_wait(Key, Pid, Ref, Waiters);
  1460. _ ->
  1461. ignore
  1462. end,
  1463. {noreply, S};
  1464. handle_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S) ->
  1465. _ = case ets:lookup(?TAB, {Key, T}) of
  1466. [{_, Waiters}] ->
  1467. gproc_lib:remove_wait(Key, Pid, all, Waiters);
  1468. [{_, OtherPid, _}] ->
  1469. gproc_lib:remove_monitors(Key, OtherPid, Pid);
  1470. _ ->
  1471. ok
  1472. end,
  1473. {noreply, S}.
  1474. %% @hidden
  1475. handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
  1476. case try_insert_reg(Key, Val, Pid) of
  1477. true ->
  1478. _ = gproc_lib:ensure_monitor(Pid,l),
  1479. {reply, true, S};
  1480. false ->
  1481. {reply, badarg, S}
  1482. end;
  1483. handle_call({monitor, {T,l,_} = Key, Pid}, _From, S)
  1484. when T==n; T==a ->
  1485. Ref = make_ref(),
  1486. _ = case where(Key) of
  1487. undefined ->
  1488. Pid ! {gproc, unreg, Ref, Key};
  1489. RegPid ->
  1490. case ets:lookup(?TAB, {RegPid, Key}) of
  1491. [{K,r}] ->
  1492. ets:insert(?TAB, {K, [{monitor, [{Pid,Ref}]}]});
  1493. [{K, Opts}] ->
  1494. ets:insert(?TAB, {K, gproc_lib:add_monitor(Opts, Pid, Ref)})
  1495. end
  1496. end,
  1497. {reply, Ref, S};
  1498. handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S)
  1499. when T==n; T==a ->
  1500. _ = case where(Key) of
  1501. undefined ->
  1502. ok; % be nice
  1503. RegPid ->
  1504. case ets:lookup(?TAB, {RegPid, Key}) of
  1505. [{_K,r}] ->
  1506. ok; % be nice
  1507. [{K, Opts}] ->
  1508. ets:insert(?TAB, {K, gproc_lib:remove_monitor(
  1509. Opts, Pid, Ref)})
  1510. end
  1511. end,
  1512. {reply, ok, S};
  1513. handle_call({reg_shared, {_T,l,_} = Key, Val}, _From, S) ->
  1514. case try_insert_reg(Key, Val, shared) of
  1515. %% case try_insert_shared(Key, Val) of
  1516. true ->
  1517. {reply, true, S};
  1518. false ->
  1519. {reply, badarg, S}
  1520. end;
  1521. handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
  1522. case ets:lookup(?TAB, {Pid,Key}) of
  1523. [{_, r}] ->
  1524. _ = gproc_lib:remove_reg(Key, Pid, unreg),
  1525. {reply, true, S};
  1526. [{_, Opts}] when is_list(Opts) ->
  1527. _ = gproc_lib:remove_reg(Key, Pid, unreg, Opts),
  1528. {reply, true, S};
  1529. [] ->
  1530. {reply, badarg, S}
  1531. end;
  1532. handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
  1533. _ = case ets:lookup(?TAB, {shared, Key}) of
  1534. [{_, r}] ->
  1535. _ = gproc_lib:remove_reg(Key, shared, unreg);
  1536. [{_, Opts}] ->
  1537. _ = gproc_lib:remove_reg(Key, shared, unreg, Opts);
  1538. [] ->
  1539. %% don't crash if shared key already unregged.
  1540. ok
  1541. end,
  1542. {reply, true, S};
  1543. handle_call({await, {_,l,_} = Key, Pid}, From, S) ->
  1544. %% Passing the pid explicitly is needed when leader_call is used,
  1545. %% since the Pid given as From in the leader is the local gen_leader
  1546. %% instance on the calling node.
  1547. case gproc_lib:await(Key, Pid, From) of
  1548. noreply ->
  1549. {noreply, S};
  1550. {reply, Reply, _} ->
  1551. {reply, Reply, S}
  1552. end;
  1553. handle_call({mreg, T, l, L}, {Pid,_}, S) ->
  1554. try gproc_lib:insert_many(T, l, L, Pid) of
  1555. {true,_} -> {reply, true, S};
  1556. false -> {reply, badarg, S}
  1557. catch
  1558. error:_ -> {reply, badarg, S}
  1559. end;
  1560. handle_call({munreg, T, l, L}, {Pid,_}, S) ->
  1561. _ = gproc_lib:remove_many(T, l, L, Pid),
  1562. {reply, true, S};
  1563. handle_call({set, {_,l,_} = Key, Value}, {Pid,_}, S) ->
  1564. case gproc_lib:do_set_value(Key, Value, Pid) of
  1565. true ->
  1566. {reply, true, S};
  1567. false ->
  1568. {reply, badarg, S}
  1569. end;
  1570. handle_call({audit_process, Pid}, _, S) ->
  1571. _ = case is_process_alive(Pid) of
  1572. false ->
  1573. process_is_down(Pid);
  1574. true ->
  1575. ignore
  1576. end,
  1577. {reply, ok, S};
  1578. handle_call({give_away, Key, To}, {Pid,_}, S) ->
  1579. Reply = do_give_away(Key, To, Pid),
  1580. {reply, Reply, S};
  1581. handle_call(_, _, S) ->
  1582. {reply, badarg, S}.
  1583. %% @hidden
  1584. handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
  1585. _ = process_is_down(Pid),
  1586. {noreply, S};
  1587. handle_info(_, S) ->
  1588. {noreply, S}.
  1589. %% @hidden
  1590. code_change(_FromVsn, S, _Extra) ->
  1591. %% We have changed local monitor markers from {Pid} to {Pid,l}.
  1592. _ = case ets:select(?TAB, [{{'$1'},[],['$1']}]) of
  1593. [] ->
  1594. ok;
  1595. Pids ->
  1596. ets:insert(?TAB, [{P,l} || P <- Pids]),
  1597. ets:select_delete(?TAB, [{{'_'},[],[true]}])
  1598. end,
  1599. {ok, S}.
  1600. %% @hidden
  1601. terminate(_Reason, _S) ->
  1602. ok.
  1603. call(Req) ->
  1604. call(Req, l).
  1605. call(Req, l) ->
  1606. chk_reply(gen_server:call(?MODULE, Req));
  1607. call(Req, g) ->
  1608. chk_reply(gproc_dist:leader_call(Req)).
  1609. chk_reply(Reply) ->
  1610. case Reply of
  1611. badarg -> ?THROW_GPROC_ERROR(badarg);
  1612. _ -> Reply
  1613. end.
  1614. cast(Msg) ->
  1615. cast(Msg, l).
  1616. cast(Msg, l) ->
  1617. gen_server:cast(?MODULE, Msg);
  1618. cast(Msg, g) ->
  1619. gproc_dist:leader_cast(Msg).
  1620. try_insert_reg({T,l,_} = Key, Val, Pid) ->
  1621. case gproc_lib:insert_reg(Key, Val, Pid, l) of
  1622. false ->
  1623. case ets:lookup(?TAB, {Key,T}) of
  1624. %% In this particular case, the lookup cannot result in
  1625. %% [{_, Waiters}], since the insert_reg/4 function would
  1626. %% have succeeded then.
  1627. [{_, OtherPid, _}] ->
  1628. case is_process_alive(OtherPid) of
  1629. true ->
  1630. false;
  1631. false ->
  1632. process_is_down(OtherPid),
  1633. true = gproc_lib:insert_reg(Key, Val, Pid, l)
  1634. end;
  1635. [] ->
  1636. false
  1637. end;
  1638. true ->
  1639. true
  1640. end.
  1641. %% try_insert_shared({c,l,_} = Key, Val) ->
  1642. %% ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
  1643. %% try_insert_shared({a,l,_} = Key, Val) ->
  1644. %% ets:insert_new(?TAB, [{{Key, a}, shared, Val}, {{shared, Key}, []}]).
  1645. -spec audit_process(pid()) -> ok.
  1646. audit_process(Pid) when is_pid(Pid) ->
  1647. ok = gen_server:call(gproc, {audit_process, Pid}, infinity).
  1648. nb_audit_process(Pid) when is_pid(Pid) ->
  1649. ok = gen_server:cast(gproc, {audit_process, Pid}).
  1650. -spec process_is_down(pid()) -> ok.
  1651. process_is_down(Pid) when is_pid(Pid) ->
  1652. %% delete the monitor marker
  1653. %% io:fwrite(user, "process_is_down(~p) - ~p~n", [Pid,ets:tab2list(?TAB)]),
  1654. Marker = {Pid,l},
  1655. case ets:member(?TAB, Marker) of
  1656. false ->
  1657. ok;
  1658. true ->
  1659. Revs = ets:select(?TAB, [{{{Pid,'$1'}, '$2'},
  1660. [{'==',{element,2,'$1'},l}],
  1661. [{{'$1','$2'}}]}]),
  1662. lists:foreach(
  1663. fun({{n,l,_}=K, R}) ->
  1664. Key = {K,n},
  1665. case ets:lookup(?TAB, Key) of
  1666. [{_, Pid, _}] ->
  1667. ets:delete(?TAB, Key),
  1668. opt_notify(R, K);
  1669. [{_, Waiters}] ->
  1670. case [W || {P,_} = W <- Waiters,
  1671. P =/= Pid] of
  1672. [] ->
  1673. ets:delete(?TAB, Key);
  1674. Waiters1 ->
  1675. ets:insert(?TAB, {Key, Waiters1})
  1676. end;
  1677. [] ->
  1678. true
  1679. end;
  1680. ({{c,l,C} = K, _}) ->
  1681. Key = {K, Pid},
  1682. [{_, _, Value}] = ets:lookup(?TAB, Key),
  1683. ets:delete(?TAB, Key),
  1684. gproc_lib:update_aggr_counter(l, C, -Value);
  1685. ({{a,l,_} = K, R}) ->
  1686. ets:delete(?TAB, {K,a}),
  1687. opt_notify(R, K);
  1688. ({{p,_,_} = K, _}) ->
  1689. ets:delete(?TAB, {K, Pid})
  1690. end, Revs),
  1691. ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
  1692. ets:delete(?TAB, Marker),
  1693. ok
  1694. end.
  1695. opt_notify(r, _) ->
  1696. ok;
  1697. opt_notify(Opts, Key) ->
  1698. gproc_lib:notify(Key, Opts).
  1699. do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
  1700. Key = {K, T},
  1701. case ets:lookup(?TAB, Key) of
  1702. [{_, Pid, Value}] ->
  1703. %% Pid owns the reg; allowed to give_away
  1704. case pid_to_give_away_to(To) of
  1705. Pid ->
  1706. %% Give away to ourselves? Why not? We'll allow it,
  1707. %% but nothing needs to be done.
  1708. Pid;
  1709. ToPid when is_pid(ToPid) ->
  1710. ets:insert(?TAB, [{Key, ToPid, Value},
  1711. {{ToPid, K}, []}]),
  1712. _ = gproc_lib:remove_reverse_mapping({migrated,ToPid}, Pid, K),
  1713. _ = gproc_lib:ensure_monitor(ToPid, l),
  1714. ToPid;
  1715. undefined ->
  1716. _ = gproc_lib:remove_reg(K, Pid, unreg),
  1717. undefined
  1718. end;
  1719. _ ->
  1720. badarg
  1721. end;
  1722. do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
  1723. Key = {K, Pid},
  1724. case ets:lookup(?TAB, Key) of
  1725. [{_, Pid, Value}] ->
  1726. case pid_to_give_away_to(To) of
  1727. ToPid when is_pid(ToPid) ->
  1728. ToKey = {K, ToPid},
  1729. case ets:member(?TAB, ToKey) of
  1730. true ->
  1731. badarg;
  1732. false ->
  1733. ets:insert(?TAB, [{ToKey, ToPid, Value},
  1734. {{ToPid, K}, []}]),
  1735. ets:delete(?TAB, {Pid, K}),
  1736. ets:delete(?TAB, Key),
  1737. _ = gproc_lib:ensure_monitor(ToPid, l),
  1738. ToPid
  1739. end;
  1740. undefined ->
  1741. _ = gproc_lib:remove_reg(K, Pid, {migrated, undefined}),
  1742. undefined
  1743. end;
  1744. _ ->
  1745. badarg
  1746. end.
  1747. pid_to_give_away_to(P) when is_pid(P), node(P) == node() ->
  1748. P;
  1749. pid_to_give_away_to({T,l,_} = Key) when T==n; T==a ->
  1750. case ets:lookup(?TAB, {Key, T}) of
  1751. [{_, Pid, _}] ->
  1752. Pid;
  1753. _ ->
  1754. undefined
  1755. end.
  1756. create_tabs() ->
  1757. Opts = gproc_lib:valid_opts(ets_options, [{write_concurrency,true},
  1758. {read_concurrency, true}]),
  1759. case ets:info(?TAB, name) of
  1760. undefined ->
  1761. ets:new(?TAB, [ordered_set, public, named_table | Opts]);
  1762. _ ->
  1763. ok
  1764. end.
  1765. %% @hidden
  1766. init([]) ->
  1767. set_monitors(),
  1768. {ok, #state{}}.
  1769. set_monitors() ->
  1770. set_monitors(ets:select(?TAB, [{{{'$1',l}},[],['$1']}], 100)).
  1771. set_monitors('$end_of_table') ->
  1772. ok;
  1773. set_monitors({Pids, Cont}) ->
  1774. _ = [erlang:monitor(process,Pid) || Pid <- Pids],
  1775. set_monitors(ets:select(Cont)).
  1776. monitor_me() ->
  1777. case ets:insert_new(?TAB, {{self(),l}}) of
  1778. false -> true;
  1779. true ->
  1780. cast({monitor_me,self()}),
  1781. true
  1782. end.
  1783. pattern([{'_', Gs, As}], T) ->
  1784. ?l,
  1785. {HeadPat, Vs} = headpat(T, '$1', '$2', '$3'),
  1786. [{HeadPat, rewrite(Gs,Vs), rewrite(As,Vs)}];
  1787. pattern([{{A,B,C},Gs,As}], Scope) ->
  1788. ?l,
  1789. {HeadPat, Vars} = headpat(Scope, A,B,C),
  1790. [{HeadPat, rewrite(Gs,Vars), rewrite(As,Vars)}];
  1791. pattern([{Head, Gs, As}], Scope) ->
  1792. ?l,
  1793. {S, T} = get_s_t(Scope),
  1794. case is_var(Head) of
  1795. {true,_N} ->
  1796. HeadPat = {{{T,S,'_'},'_'},'_','_'},
  1797. Vs = [{Head, obj_prod()}],
  1798. %% the headpat function should somehow verify that Head is
  1799. %% consistent with Scope (or should we add a guard?)
  1800. [{HeadPat, rewrite(Gs, Vs), rewrite(As, Vs)}];
  1801. false ->
  1802. erlang:error(badarg)
  1803. end.
  1804. %% This is the expression to use in guards and the RHS to address the whole
  1805. %% object, in its logical representation.
  1806. obj_prod() ->
  1807. {{ {element,1,{element,1,'$_'}},
  1808. {element,2,'$_'},
  1809. {element,3,'$_'} }}.
  1810. obj_prod_l() ->
  1811. [ {element,1,{element,1,'$_'}},
  1812. {element,2,'$_'},
  1813. {element,3,'$_'} ].
  1814. headpat({S, T}, V1,V2,V3) ->
  1815. headpat(type(T), scope(S), V1,V2,V3);
  1816. headpat(T, V1, V2, V3) when is_atom(T) ->
  1817. headpat(type(T), l, V1, V2, V3);
  1818. headpat(_, _, _, _) -> erlang:error(badarg).
  1819. headpat(T, C, V1,V2,V3) ->
  1820. Rf = fun(Pos) ->
  1821. {element,Pos,{element,1,{element,1,'$_'}}}
  1822. end,
  1823. K2 = if T==n orelse T==a -> T;
  1824. true -> '_'
  1825. end,
  1826. {Kp,Vars} = case V1 of
  1827. {Vt,Vc,Vn} ->
  1828. ?l,
  1829. {T1,Vs1} = subst(T,Vt,fun() -> Rf(1) end,[]),
  1830. {C1,Vs2} = subst(C,Vc,fun() -> Rf(2) end,Vs1),
  1831. {{T1,C1,Vn}, Vs2};
  1832. '_' ->
  1833. ?l,
  1834. {{T,C,'_'}, []};
  1835. _ ->
  1836. ?l,
  1837. case is_var(V1) of
  1838. {true,_} ->
  1839. {{T,C,V1}, [{V1, {element,1,
  1840. {element,1,'$_'}}}]};
  1841. false ->
  1842. erlang:error(badarg)
  1843. end
  1844. end,
  1845. {{{Kp,K2},V2,V3}, Vars}.
  1846. %% l(L) -> L.
  1847. subst(X, '_', _F, Vs) ->
  1848. {X, Vs};
  1849. subst(X, V, F, Vs) ->
  1850. case is_var(V) of
  1851. {true,_} ->
  1852. {X, [{V,F()}|Vs]};
  1853. false ->
  1854. {V, Vs}
  1855. end.
  1856. scope('_') -> '_';
  1857. scope(all) -> '_';
  1858. scope(global) -> g;
  1859. scope(local) -> l;
  1860. scope(S) when S==l; S==g -> S.
  1861. type('_') -> '_';
  1862. type(all) -> '_';
  1863. type(T) when T==n; T==p; T==c; T==a -> T;
  1864. type(names) -> n;
  1865. type(props) -> p;
  1866. type(counters) -> c;
  1867. type(aggr_counters) -> a.
  1868. rev_keypat(Context) ->
  1869. {S,T} = get_s_t(Context),
  1870. {T,S,'_'}.
  1871. get_s_t({S,T}) -> {scope(S), type(T)};
  1872. get_s_t(T) when is_atom(T) ->
  1873. {scope(all), type(T)}.
  1874. is_var('$1') -> {true,1};
  1875. is_var('$2') -> {true,2};
  1876. is_var('$3') -> {true,3};
  1877. is_var('$4') -> {true,4};
  1878. is_var('$5') -> {true,5};
  1879. is_var('$6') -> {true,6};
  1880. is_var('$7') -> {true,7};
  1881. is_var('$8') -> {true,8};
  1882. is_var('$9') -> {true,9};
  1883. is_var(X) when is_atom(X) ->
  1884. case atom_to_list(X) of
  1885. "\$" ++ Tl ->
  1886. try N = list_to_integer(Tl),
  1887. {true,N}
  1888. catch
  1889. error:_ ->
  1890. false
  1891. end;
  1892. _ ->
  1893. false
  1894. end;
  1895. is_var(_) -> false.
  1896. rewrite(Gs, R) ->
  1897. [rewrite1(G, R) || G <- Gs].
  1898. rewrite1('$_', _) ->
  1899. obj_prod();
  1900. rewrite1('$$', _) ->
  1901. obj_prod_l();
  1902. rewrite1(Guard, R) when is_tuple(Guard) ->
  1903. list_to_tuple([rewrite1(G, R) || G <- tuple_to_list(Guard)]);
  1904. rewrite1(Exprs, R) when is_list(Exprs) ->
  1905. [rewrite1(E, R) || E <- Exprs];
  1906. rewrite1(V, R) when is_atom(V) ->
  1907. case is_var(V) of
  1908. {true,_N} ->
  1909. case lists:keysearch(V, 1, R) of
  1910. {value, {_, V1}} ->
  1911. V1;
  1912. false ->
  1913. V
  1914. end;
  1915. false ->
  1916. V
  1917. end;
  1918. rewrite1(Expr, _) ->
  1919. Expr.
  1920. %% @spec () -> any()
  1921. %%
  1922. %% @doc
  1923. %% @equiv table({all, all})
  1924. %% @end
  1925. table() ->
  1926. table({all, all}).
  1927. %% @spec (Context::context()) -> any()
  1928. %%
  1929. %% @doc
  1930. %% @equiv table(Context, [])
  1931. %% @end
  1932. %%
  1933. table(Context) ->
  1934. table(Context, []).
  1935. %% @spec (Context::context(), Opts) -> any()
  1936. %%
  1937. %% @doc QLC table generator for the gproc registry.
  1938. %% Context specifies which subset of the registry should be queried.
  1939. %% See [http://www.erlang.org/doc/man/qlc.html].
  1940. %% @end
  1941. table(Context, Opts) ->
  1942. Ctxt = {_, Type} = get_s_t(Context),
  1943. [Traverse, NObjs] = [proplists:get_value(K,Opts,Def) ||
  1944. {K,Def} <- [{traverse,select}, {n_objects,100}]],
  1945. TF = case Traverse of
  1946. first_next ->
  1947. fun() -> qlc_next(Ctxt, first(Ctxt)) end;
  1948. last_prev -> fun() -> qlc_prev(Ctxt, last(Ctxt)) end;
  1949. select ->
  1950. fun(MS) -> qlc_select(select(Ctxt, MS, NObjs)) end;
  1951. {select,MS} ->
  1952. fun() -> qlc_select(select(Ctxt, MS, NObjs)) end;
  1953. _ ->
  1954. erlang:error(badarg, [Ctxt,Opts])
  1955. end,
  1956. InfoFun = fun(indices) -> [2];
  1957. (is_unique_objects) -> is_unique(Type);
  1958. (keypos) -> 1;
  1959. (is_sorted_key) -> true;
  1960. (num_of_objects) ->
  1961. %% this is just a guesstimate.
  1962. trunc(ets:info(?TAB,size) / 2.5)
  1963. end,
  1964. LookupFun =
  1965. case Traverse of
  1966. {select, _MS} -> undefined;
  1967. _ -> fun(Pos, Ks) -> qlc_lookup(Ctxt, Pos, Ks) end
  1968. end,
  1969. qlc:table(TF, [{info_fun, InfoFun},
  1970. {lookup_fun, LookupFun}] ++ [{K,V} || {K,V} <- Opts,
  1971. K =/= traverse,
  1972. K =/= n_objects]).
  1973. qlc_lookup(_Scope, 1, Keys) ->
  1974. lists:flatmap(
  1975. fun(Key) ->
  1976. ets:select(?TAB, [{ {{Key,'_'},'_','_'}, [],
  1977. [{{ {element,1,{element,1,'$_'}},
  1978. {element,2,'$_'},
  1979. {element,3,'$_'} }}] }])
  1980. end, Keys);
  1981. qlc_lookup(Scope, 2, Pids) ->
  1982. lists:flatmap(fun(Pid) ->
  1983. Found =
  1984. ets:select(?TAB, [{{{Pid, rev_keypat(Scope)}, '_'},
  1985. [], ['$_']}]),
  1986. lists:flatmap(
  1987. fun({{_,{T,_,_}=K}, _}) ->
  1988. K2 = if T==n orelse T==a -> T;
  1989. true -> Pid
  1990. end,
  1991. case ets:lookup(?TAB, {K,K2}) of
  1992. [{{Key,_},_,Value}] ->
  1993. [{Key, Pid, Value}];
  1994. [] ->
  1995. []
  1996. end
  1997. end, Found)
  1998. end, Pids).
  1999. qlc_next(_, '$end_of_table') -> [];
  2000. qlc_next(Scope, K) ->
  2001. case ets:lookup(?TAB, K) of
  2002. [{{Key,_}, Pid, V}] ->
  2003. [{Key,Pid,V}] ++ fun() -> qlc_next(Scope, next(Scope, K)) end;
  2004. [] ->
  2005. qlc_next(Scope, next(Scope, K))
  2006. end.
  2007. qlc_prev(_, '$end_of_table') -> [];
  2008. qlc_prev(Scope, K) ->
  2009. case ets:lookup(?TAB, K) of
  2010. [{{Key,_},Pid,V}] ->
  2011. [{Key,Pid,V}] ++ fun() -> qlc_prev(Scope, prev(Scope, K)) end;
  2012. [] ->
  2013. qlc_prev(Scope, prev(Scope, K))
  2014. end.
  2015. qlc_select('$end_of_table') ->
  2016. [];
  2017. qlc_select({Objects, Cont}) ->
  2018. Objects ++ fun() -> qlc_select(ets:select(Cont)) end.
  2019. is_unique(n) -> true;
  2020. is_unique(a) -> true;
  2021. is_unique(_) -> false.