gproc_lib.erl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_lib).
  23. -export([await/3,
  24. do_set_counter_value/3,
  25. do_set_value/3,
  26. ensure_monitor/2,
  27. insert_many/4,
  28. insert_reg/4,
  29. insert_attr/4,
  30. remove_many/4,
  31. remove_reg/3, remove_reg/4,
  32. monitors/1,
  33. standbys/1,
  34. remove_monitor_pid/2,
  35. add_monitor/4,
  36. remove_monitor/3,
  37. remove_monitors/3,
  38. remove_reverse_mapping/3, remove_reverse_mapping/4,
  39. notify/2, notify/3,
  40. remove_wait/4,
  41. update_aggr_counter/3,
  42. update_counter/3,
  43. valid_opts/2]).
  44. -include("gproc_int.hrl").
  45. -include("gproc.hrl").
  46. %% We want to store names and aggregated counters with the same
  47. %% structure as properties, but at the same time, we must ensure
  48. %% that the key is unique. We replace the Pid in the key part
  49. %% with an atom. To know which Pid owns the object, we lug the
  50. %% Pid around as payload as well. This is a bit redundant, but
  51. %% symmetric.
  52. %%
  53. -spec insert_reg(key(), any(), pid() | shared, scope()) -> boolean().
  54. insert_reg({T,_,Name} = K, Value, Pid, Scope) when T==a; T==n ->
  55. MaybeScan = fun() ->
  56. if T==a ->
  57. Initial = scan_existing_counters(Scope, Name),
  58. ets:insert(?TAB, {{K,a}, Pid, Initial});
  59. true ->
  60. true
  61. end
  62. end,
  63. Info = [{{K, T}, Pid, Value}, {{Pid,K}, []}],
  64. case ets:insert_new(?TAB, Info) of
  65. true ->
  66. MaybeScan();
  67. false ->
  68. if T==n ->
  69. maybe_waiters(K, Pid, Value, T, Info);
  70. true ->
  71. false
  72. end
  73. end;
  74. insert_reg({p,Scope,_} = K, Value, shared, Scope)
  75. when Scope == g; Scope == l ->
  76. %% shared properties are unique
  77. Info = [{{K, shared}, shared, Value}, {{shared,K}, []}],
  78. ets:insert_new(?TAB, Info);
  79. insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope) when Scope==l; Scope==g ->
  80. %% Non-unique keys; store Pid in the key part
  81. K = {Key, Pid},
  82. Kr = {Pid, Key},
  83. Res = ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, [{initial, Value}]}]),
  84. case Res of
  85. true ->
  86. update_aggr_counter(Scope, Ctr, Value);
  87. false ->
  88. ignore
  89. end,
  90. Res;
  91. insert_reg({_,_,_} = Key, Value, Pid, _Scope) when is_pid(Pid) ->
  92. %% Non-unique keys; store Pid in the key part
  93. K = {Key, Pid},
  94. Kr = {Pid, Key},
  95. ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, []}]).
  96. insert_attr({_,Scope,_} = Key, Attrs, Pid, Scope) when Scope==l;
  97. Scope==g ->
  98. case ets:lookup(?TAB, K = {Pid, Key}) of
  99. [{_, Attrs0}] when is_list(Attrs) ->
  100. As = proplists:get_value(attrs, Attrs0, []),
  101. As1 = lists:foldl(fun({K1,_} = Attr, Acc) ->
  102. lists:keystore(K1, 1, Acc, Attr)
  103. end, As, Attrs),
  104. Attrs1 = lists:keystore(attrs, 1, Attrs0, {attrs, As1}),
  105. ets:insert(?TAB, {K, Attrs1}),
  106. Attrs1;
  107. _ ->
  108. false
  109. end.
  110. -spec insert_many(type(), scope(), [{key(),any()}], pid()) ->
  111. {true,list()} | false.
  112. insert_many(T, Scope, KVL, Pid) ->
  113. Objs = mk_reg_objs(T, Scope, Pid, KVL),
  114. case ets:insert_new(?TAB, Objs) of
  115. true ->
  116. RevObjs = mk_reg_rev_objs(T, Scope, Pid, KVL),
  117. ets:insert(?TAB, RevObjs),
  118. _ = gproc_lib:ensure_monitor(Pid, Scope),
  119. {true, Objs};
  120. false ->
  121. Existing = [{Obj, ets:lookup(?TAB, K)} || {K,_,_} = Obj <- Objs],
  122. case lists:any(fun({_, [{_, _, _}]}) ->
  123. true;
  124. (_) ->
  125. %% (not found), or waiters registered
  126. false
  127. end, Existing) of
  128. true ->
  129. %% conflict; return 'false', indicating failure
  130. false;
  131. false ->
  132. %% possibly waiters, but they are handled in next step
  133. insert_objects(Existing),
  134. _ = gproc_lib:ensure_monitor(Pid, Scope),
  135. {true, Objs}
  136. end
  137. end.
  138. -spec insert_objects([{key(), pid(), any()}]) -> ok.
  139. insert_objects(Objs) ->
  140. lists:foreach(
  141. fun({{{Id,_} = _K, Pid, V} = Obj, Existing}) ->
  142. ets:insert(?TAB, [Obj, {{Pid, Id}, []}]),
  143. case Existing of
  144. [] -> ok;
  145. [{_, Waiters}] ->
  146. notify_waiters(Waiters, Id, Pid, V)
  147. end
  148. end, Objs).
  149. await({T,C,_} = Key, WPid, {_Pid, Ref} = From) ->
  150. Rev = {{WPid,Key}, []},
  151. case ets:lookup(?TAB, {Key,T}) of
  152. [{_, P, Value}] ->
  153. %% for symmetry, we always reply with Ref and then send a message
  154. if C == g ->
  155. %% in the global case, we bundle the reply, since otherwise
  156. %% the messages can pass each other
  157. {reply, {Ref, {Key, P, Value}}};
  158. true ->
  159. gen_server:reply(From, Ref),
  160. WPid ! {gproc, Ref, registered, {Key, P, Value}},
  161. noreply
  162. end;
  163. [{K, Waiters}] ->
  164. NewWaiters = [{WPid,Ref} | Waiters],
  165. W = {K, NewWaiters},
  166. ets:insert(?TAB, [W, Rev]),
  167. _ = gproc_lib:ensure_monitor(WPid,C),
  168. {reply, Ref, [W,Rev]};
  169. [] ->
  170. W = {{Key,T}, [{WPid,Ref}]},
  171. ets:insert(?TAB, [W, Rev]),
  172. _ = gproc_lib:ensure_monitor(WPid,C),
  173. {reply, Ref, [W,Rev]}
  174. end.
  175. maybe_waiters(K, Pid, Value, T, Info) ->
  176. case ets:lookup(?TAB, {K,T}) of
  177. [{_, Waiters}] when is_list(Waiters) ->
  178. ets:insert(?TAB, Info),
  179. notify_waiters(Waiters, K, Pid, Value),
  180. true;
  181. [_] ->
  182. false
  183. end.
  184. -spec notify_waiters([{pid(), reference()}], key(), pid(), any()) -> ok.
  185. notify_waiters(Waiters, K, Pid, V) ->
  186. _ = [begin
  187. P ! {gproc, Ref, registered, {K, Pid, V}},
  188. case P of
  189. Pid -> ignore;
  190. _ ->
  191. ets:delete(?TAB, {P, K})
  192. end
  193. end || {P, Ref} <- Waiters],
  194. ok.
  195. remove_wait({T,_,_} = Key, Pid, Ref, Waiters) ->
  196. Rev = {Pid,Key},
  197. case remove_from_waiters(Waiters, Pid, Ref) of
  198. [] ->
  199. ets:delete(?TAB, {Key,T}),
  200. ets:delete(?TAB, Rev),
  201. [{delete, [{Key,T}, Rev], []}];
  202. NewWaiters ->
  203. ets:insert(?TAB, {Key, NewWaiters}),
  204. case lists:keymember(Pid, 1, NewWaiters) of
  205. true ->
  206. %% should be extremely unlikely
  207. [{insert, [{Key, NewWaiters}]}];
  208. false ->
  209. %% delete the reverse entry
  210. ets:delete(?TAB, Rev),
  211. [{insert, [{Key, NewWaiters}]},
  212. {delete, [Rev], []}]
  213. end
  214. end.
  215. remove_from_waiters(Waiters, Pid, all) ->
  216. [{P,R} || {P,R} <- Waiters,
  217. P =/= Pid];
  218. remove_from_waiters(Waiters, Pid, Ref) ->
  219. Waiters -- [{Pid, Ref}].
  220. remove_monitors(Key, Pid, MPid) ->
  221. case ets:lookup(?TAB, {Pid, Key}) of
  222. [{_, r}] ->
  223. [];
  224. [{K, Opts}] when is_list(Opts) ->
  225. case lists:keyfind(monitors, 1, Opts) of
  226. false ->
  227. [];
  228. {_, Ms} ->
  229. Ms1 = [{P,R} || {P,R} <- Ms,
  230. P =/= MPid],
  231. NewMs = lists:keyreplace(monitors, 1, Opts, {monitors,Ms1}),
  232. ets:insert(?TAB, {K, NewMs}),
  233. [{insert, [{{Pid,Key}, NewMs}]}]
  234. end;
  235. _ ->
  236. []
  237. end.
  238. mk_reg_objs(T, Scope, Pid, L) when T==n; T==a ->
  239. lists:map(fun({K,V}) ->
  240. {{{T,Scope,K},T}, Pid, V};
  241. (_) ->
  242. erlang:error(badarg)
  243. end, L);
  244. mk_reg_objs(p = T, Scope, Pid, L) ->
  245. lists:map(fun({K,V}) ->
  246. {{{T,Scope,K},Pid}, Pid, V};
  247. (_) ->
  248. erlang:error(badarg)
  249. end, L).
  250. mk_reg_rev_objs(T, Scope, Pid, L) ->
  251. [{{Pid,{T,Scope,K}}, []} || {K,_} <- L].
  252. ensure_monitor(shared, _) ->
  253. ok;
  254. ensure_monitor(Pid, _) when Pid == self() ->
  255. %% monitoring is ensured through a 'monitor_me' message
  256. ok;
  257. ensure_monitor(Pid, Scope) when Scope==g; Scope==l ->
  258. case ets:insert_new(?TAB, {{Pid, Scope}}) of
  259. false -> ok;
  260. true -> erlang:monitor(process, Pid)
  261. end.
  262. remove_reg(Key, Pid, Event) ->
  263. Reg = remove_reg_1(Key, Pid),
  264. Rev = remove_reverse_mapping(Event, Pid, Key),
  265. [Reg, Rev].
  266. remove_reg(Key, Pid, Event, Opts) ->
  267. Reg = remove_reg_1(Key, Pid),
  268. Rev = remove_reverse_mapping(Event, Pid, Key, Opts),
  269. [Reg, Rev].
  270. remove_reverse_mapping(Event, Pid, Key) ->
  271. Opts = case ets:lookup(?TAB, {Pid, Key}) of
  272. [] -> [];
  273. [{_, r}] -> [];
  274. [{_, L}] when is_list(L) ->
  275. L
  276. end,
  277. remove_reverse_mapping(Event, Pid, Key, Opts).
  278. remove_reverse_mapping(Event, Pid, Key, Opts) when Event==unreg;
  279. element(1,Event)==migrated;
  280. element(1,Event)==failover ->
  281. Rev = {Pid, Key},
  282. _ = notify(Event, Key, Opts),
  283. ets:delete(?TAB, Rev),
  284. Rev.
  285. notify(Key, Opts) ->
  286. notify(unreg, Key, Opts).
  287. monitors(Opts) ->
  288. case lists:keyfind(monitor, 1, Opts) of
  289. false ->
  290. [];
  291. {_, Mons} ->
  292. Mons
  293. end.
  294. standbys(Opts) ->
  295. standbys(monitors(Opts), []).
  296. standbys([{_,_,standby}=H|T], Acc) ->
  297. standbys(T, [H|Acc]);
  298. standbys([_|T], Acc) ->
  299. standbys(T, Acc);
  300. standbys([], Acc) ->
  301. Acc.
  302. remove_monitor_pid([{monitor, Mons}|T], Pid) ->
  303. [{monitors, [M || M <- Mons,
  304. element(1, M) =/= Pid]}|T];
  305. remove_monitor_pid([H|T], Pid) ->
  306. [H | remove_monitor_pid(T, Pid)];
  307. remove_monitor_pid([], _) ->
  308. [].
  309. notify([], _, _) ->
  310. ok;
  311. notify(Event, Key, Opts) ->
  312. notify_(monitors(Opts), Event, Key).
  313. %% Also handle old-style monitors
  314. notify_([{Pid,Ref}|T], Event, Key) ->
  315. Pid ! {gproc, Event, Ref, Key},
  316. notify_(T, Event, Key);
  317. notify_([{Pid,Ref,_}|T], Event, Key) ->
  318. Pid ! {gproc, Event, Ref, Key},
  319. notify_(T, Event, Key);
  320. notify_([_|T], Event, Key) ->
  321. notify_(T, Event, Key);
  322. notify_([], _, _) ->
  323. ok.
  324. add_monitor([{monitor, Mons}|T], Pid, Ref, Type) ->
  325. [{monitor, [{Pid,Ref,Type}|Mons]}|T];
  326. add_monitor([H|T], Pid, Ref, Type) ->
  327. [H|add_monitor(T, Pid, Ref, Type)];
  328. add_monitor([], Pid, Ref, Type) ->
  329. [{monitor, [{Pid, Ref, Type}]}].
  330. remove_monitor([{monitor, Mons}|T], Pid, Ref) ->
  331. [{monitor, [Mon || Mon <- Mons, not is_mon(Mon,Pid,Ref)]} | T];
  332. remove_monitor([H|T], Pid, Ref) ->
  333. [H|remove_monitor(T, Pid, Ref)];
  334. remove_monitor([], _Pid, _Ref) ->
  335. [].
  336. is_mon({Pid,Ref,_}, Pid, Ref) -> true;
  337. is_mon({Pid,Ref}, Pid, Ref) -> true;
  338. is_mon(_, _, _) ->
  339. false.
  340. remove_many(T, Scope, L, Pid) ->
  341. lists:flatmap(fun(K) ->
  342. Key = {T, Scope, K},
  343. remove_reg(Key, Pid, unreg, unreg_opts(Key, Pid))
  344. end, L).
  345. unreg_opts(Key, Pid) ->
  346. case ets:lookup(?TAB, {Pid, Key}) of
  347. [] ->
  348. [];
  349. [{_,r}] ->
  350. [];
  351. [{_,Opts}] ->
  352. Opts
  353. end.
  354. remove_reg_1({c,_,_} = Key, Pid) ->
  355. remove_counter_1(Key, ets:lookup_element(?TAB, Reg = {Key,Pid}, 3), Pid),
  356. Reg;
  357. remove_reg_1({a,_,_} = Key, _Pid) ->
  358. ets:delete(?TAB, Reg = {Key,a}),
  359. Reg;
  360. remove_reg_1({n,_,_} = Key, _Pid) ->
  361. ets:delete(?TAB, Reg = {Key,n}),
  362. Reg;
  363. remove_reg_1({_,_,_} = Key, Pid) ->
  364. ets:delete(?TAB, Reg = {Key, Pid}),
  365. Reg.
  366. remove_counter_1({c,C,N} = Key, Val, Pid) ->
  367. Res = ets:delete(?TAB, {Key, Pid}),
  368. update_aggr_counter(C, N, -Val),
  369. Res.
  370. do_set_value({T,_,_} = Key, Value, Pid) ->
  371. K2 = if Pid == shared -> shared;
  372. T==n orelse T==a -> T;
  373. true -> Pid
  374. end,
  375. case (catch ets:lookup_element(?TAB, {Key,K2}, 2)) of
  376. {'EXIT', {badarg, _}} ->
  377. false;
  378. Pid ->
  379. ets:insert(?TAB, {{Key, K2}, Pid, Value});
  380. _ ->
  381. false
  382. end.
  383. do_set_counter_value({_,C,N} = Key, Value, Pid) ->
  384. OldVal = ets:lookup_element(?TAB, {Key, Pid}, 3), % may fail with badarg
  385. Res = ets:insert(?TAB, {{Key, Pid}, Pid, Value}),
  386. update_aggr_counter(C, N, Value - OldVal),
  387. Res.
  388. update_counter({T,l,Ctr} = Key, Incr, Pid) when is_integer(Incr), T==c;
  389. is_integer(Incr), T==n ->
  390. Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
  391. update_aggr_counter(l, Ctr, Incr),
  392. Res;
  393. update_counter({T,l,Ctr} = Key, {Incr, Threshold, SetValue}, Pid)
  394. when is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==c;
  395. is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==n ->
  396. [Prev, New] = ets:update_counter(?TAB, {Key, Pid},
  397. [{3, 0}, {3, Incr, Threshold, SetValue}]),
  398. update_aggr_counter(l, Ctr, New - Prev),
  399. New;
  400. update_counter({T,l,Ctr} = Key, Ops, Pid) when is_list(Ops), T==c;
  401. is_list(Ops), T==n ->
  402. case ets:update_counter(?TAB, {Key, Pid},
  403. [{3, 0} | expand_ops(Ops)]) of
  404. [_] ->
  405. [];
  406. [Prev | Rest] ->
  407. [New | _] = lists:reverse(Rest),
  408. update_aggr_counter(l, Ctr, New - Prev),
  409. Rest
  410. end;
  411. update_counter(_, _, _) ->
  412. ?THROW_GPROC_ERROR(badarg).
  413. expand_ops([{Incr,Thr,SetV}|T])
  414. when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
  415. [{3, Incr, Thr, SetV}|expand_ops(T)];
  416. expand_ops([Incr|T]) when is_integer(Incr) ->
  417. [{3, Incr}|expand_ops(T)];
  418. expand_ops([]) ->
  419. [];
  420. expand_ops(_) ->
  421. ?THROW_GPROC_ERROR(badarg).
  422. update_aggr_counter(C, N, Val) ->
  423. catch ets:update_counter(?TAB, {{a,C,N},a}, {3, Val}).
  424. scan_existing_counters(Ctxt, Name) ->
  425. Head = {{{c,Ctxt,Name},'_'},'_','$1'},
  426. Cs = ets:select(?TAB, [{Head, [], ['$1']}]),
  427. lists:sum(Cs).
  428. valid_opts(Type, Default) ->
  429. Opts = get_app_env(Type, Default),
  430. check_opts(Type, Opts).
  431. check_opts(Type, Opts) when is_list(Opts) ->
  432. Check = check_option_f(Type),
  433. lists:map(fun(X) ->
  434. case Check(X) of
  435. true -> X;
  436. false ->
  437. erlang:error({illegal_option, X}, [Type, Opts])
  438. end
  439. end, Opts);
  440. check_opts(Type, Other) ->
  441. erlang:error(invalid_options, [Type, Other]).
  442. check_option_f(ets_options) -> fun check_ets_option/1;
  443. check_option_f(server_options) -> fun check_server_option/1.
  444. check_ets_option({read_concurrency , B}) -> is_boolean(B);
  445. check_ets_option({write_concurrency, B}) -> is_boolean(B);
  446. check_ets_option(_) -> false.
  447. check_server_option({priority, P}) ->
  448. %% Forbid setting priority to 'low' since that would
  449. %% surely cause problems. Unsure about 'max'...
  450. lists:member(P, [normal, high, max]);
  451. check_server_option(_) ->
  452. %% assume it's a valid spawn option
  453. true.
  454. get_app_env(Key, Default) ->
  455. case application:get_env(Key) of
  456. undefined -> Default;
  457. {ok, undefined} -> Default;
  458. {ok, Value} -> Value
  459. end.