gproc_lib.erl 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701
  1. %% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*-
  2. %% --------------------------------------------------
  3. %% This file is provided to you under the Apache License,
  4. %% Version 2.0 (the "License"); you may not use this file
  5. %% except in compliance with the License. You may obtain
  6. %% a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing,
  11. %% software distributed under the License is distributed on an
  12. %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  13. %% KIND, either express or implied. See the License for the
  14. %% specific language governing permissions and limitations
  15. %% under the License.
  16. %% --------------------------------------------------
  17. %%
  18. %% @author Ulf Wiger <ulf@wiger.net>
  19. %%
  20. %% @doc Extended process registry
  21. %% <p>This module implements an extended process registry</p>
  22. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  23. %% @end
  24. -module(gproc_lib).
  25. -export([await/3,
  26. do_set_counter_value/3,
  27. do_set_value/3,
  28. ensure_monitor/2,
  29. insert_many/4,
  30. insert_reg/4, insert_reg/5,
  31. insert_attr/4,
  32. remove_many/4,
  33. remove_reg/3, remove_reg/4,
  34. monitors/1,
  35. standbys/1,
  36. followers/1,
  37. remove_monitor_pid/2,
  38. does_pid_monitor/2,
  39. add_monitor/4,
  40. remove_monitor/3,
  41. remove_monitors/3,
  42. remove_reverse_mapping/3, remove_reverse_mapping/4,
  43. notify/2, notify/3,
  44. remove_wait/4,
  45. update_aggr_counter/3,
  46. update_counter/3,
  47. decrement_resource_count/2,
  48. valid_opts/2,
  49. valid_key/1]).
  50. -export([dbg/1]).
  51. -include("gproc_int.hrl").
  52. -include("gproc.hrl").
  53. dbg(Mods) ->
  54. dbg:tracer(),
  55. [dbg:tpl(M,x) || M <- Mods],
  56. dbg:tp(ets,'_',[{[gproc,'_'], [], [{message,{exception_trace}}]}]),
  57. dbg:p(all,[c]).
  58. %% We want to store names and aggregated counters with the same
  59. %% structure as properties, but at the same time, we must ensure
  60. %% that the key is unique. We replace the Pid in the key part
  61. %% with an atom. To know which Pid owns the object, we lug the
  62. %% Pid around as payload as well. This is a bit redundant, but
  63. %% symmetric.
  64. %%
  65. -spec insert_reg(gproc:key(), any(), pid() | shared, gproc:scope()) -> boolean().
  66. insert_reg(K, Value, Pid, Scope) ->
  67. insert_reg(K, Value, Pid, Scope, registered).
  68. insert_reg({T,_,Name} = K, Value, Pid, Scope, Event) when T==a; T==n; T==rc ->
  69. Res = case ets:insert_new(?TAB, {{K,T}, Pid, Value}) of
  70. true ->
  71. %% Use insert_new to avoid overwriting existing entry
  72. _ = ets:insert_new(?TAB, {{Pid,K}, []}),
  73. true;
  74. false ->
  75. maybe_waiters(K, Pid, Value, T, Event)
  76. end,
  77. case Res of
  78. true ->
  79. maybe_scan(T, Pid, Scope, Name, K);
  80. false ->
  81. false
  82. end,
  83. Res;
  84. insert_reg({p,Scope,_} = K, Value, shared, Scope, _E)
  85. when Scope == g; Scope == l ->
  86. %% shared properties are unique
  87. Info = [{{K, shared}, shared, Value}, {{shared,K}, []}],
  88. ets:insert_new(?TAB, Info);
  89. insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope, _E) when Scope==l; Scope==g ->
  90. %% Non-unique keys; store Pid in the key part
  91. K = {Key, Pid},
  92. Kr = {Pid, Key},
  93. Res = ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, [{initial, Value}]}]),
  94. case Res of
  95. true ->
  96. update_aggr_counter(Scope, Ctr, Value);
  97. false ->
  98. ignore
  99. end,
  100. Res;
  101. insert_reg({r,Scope,R} = Key, Value, Pid, Scope, _E) when Scope==l; Scope==g ->
  102. K = {Key, Pid},
  103. Kr = {Pid, Key},
  104. Res = ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, [{initial, Value}]}]),
  105. case Res of
  106. true ->
  107. update_resource_count(Scope, R, 1);
  108. false ->
  109. ignore
  110. end,
  111. Res;
  112. insert_reg({_,_,_} = Key, Value, Pid, _Scope, _E) when is_pid(Pid) ->
  113. %% Non-unique keys; store Pid in the key part
  114. K = {Key, Pid},
  115. Kr = {Pid, Key},
  116. ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, []}]).
  117. maybe_scan(a, Pid, Scope, Name, K) ->
  118. Initial = scan_existing_counters(Scope, Name),
  119. ets:insert(?TAB, {{K,a}, Pid, Initial});
  120. maybe_scan(rc, Pid, Scope, Name, K) ->
  121. Initial = scan_existing_resources(Scope, Name),
  122. ets:insert(?TAB, {{K,rc}, Pid, Initial});
  123. maybe_scan(_, _, _, _, _) ->
  124. true.
  125. insert_attr({_,Scope,_} = Key, Attrs, Pid, Scope) when Scope==l;
  126. Scope==g ->
  127. case ets:lookup(?TAB, K = {Pid, Key}) of
  128. [{_, Attrs0}] when is_list(Attrs) ->
  129. As = proplists:get_value(attrs, Attrs0, []),
  130. As1 = lists:foldl(fun({K1,_} = Attr, Acc) ->
  131. lists:keystore(K1, 1, Acc, Attr)
  132. end, As, Attrs),
  133. Attrs1 = lists:keystore(attrs, 1, Attrs0, {attrs, As1}),
  134. ets:insert(?TAB, {K, Attrs1}),
  135. Attrs1;
  136. _ ->
  137. false
  138. end.
  139. get_attr(Attr, Pid, {_,_,_} = Key, Default) ->
  140. case ets:lookup(?TAB, {Pid, Key}) of
  141. [{_, Opts}] when is_list(Opts) ->
  142. case lists:keyfind(attrs, 1, Opts) of
  143. {_, Attrs} ->
  144. case lists:keyfind(Attr, 1, Attrs) of
  145. {_, Val} ->
  146. Val;
  147. _ ->
  148. Default
  149. end;
  150. _ ->
  151. Default
  152. end;
  153. _ ->
  154. Default
  155. end.
  156. -spec insert_many(gproc:type(), gproc:scope(), [{gproc:key(),any()}], pid()) ->
  157. {true,list()} | false.
  158. insert_many(T, Scope, KVL, Pid) ->
  159. try insert_many_(T, Scope, KVL, Pid)
  160. catch
  161. throw:?GPROC_THROW(_) ->
  162. false
  163. end.
  164. insert_many_(T, Scope, KVL, Pid) ->
  165. Objs = mk_reg_objs(T, Scope, Pid, KVL),
  166. case ets:insert_new(?TAB, Objs) of
  167. true ->
  168. RevObjs = mk_reg_rev_objs(T, Scope, Pid, KVL),
  169. ets:insert(?TAB, RevObjs),
  170. _ = gproc_lib:ensure_monitor(Pid, Scope),
  171. {true, Objs};
  172. false ->
  173. Existing = [{Obj, ets:lookup(?TAB, K)} || {K,_,_} = Obj <- Objs],
  174. case lists:any(fun({_, [{_, _, _}]}) ->
  175. true;
  176. (_) ->
  177. %% (not found), or waiters registered
  178. false
  179. end, Existing) of
  180. true ->
  181. %% conflict; return 'false', indicating failure
  182. false;
  183. false ->
  184. %% possibly waiters, but they are handled in next step
  185. insert_objects(Existing),
  186. _ = gproc_lib:ensure_monitor(Pid, Scope),
  187. {true, Objs}
  188. end
  189. end.
  190. -spec insert_objects([{gproc:key(), pid(), any()}]) -> ok.
  191. insert_objects(Objs) ->
  192. lists:foreach(
  193. fun({{{Id,_} = _K, Pid, V} = Obj, Existing}) ->
  194. ets:insert(?TAB, [Obj, {{Pid, Id}, []}]),
  195. case Existing of
  196. [] -> ok;
  197. [{_, Waiters}] ->
  198. notify_waiters(Waiters, Id, Pid, V, registered)
  199. end
  200. end, Objs).
  201. await({T,C,_} = Key, WPid, {_Pid, Ref} = From) ->
  202. Rev = {{WPid,Key}, []},
  203. case ets:lookup(?TAB, {Key,T}) of
  204. [{_, P, Value}] ->
  205. %% for symmetry, we always reply with Ref and then send a message
  206. if C == g ->
  207. %% in the global case, we bundle the reply, since otherwise
  208. %% the messages can pass each other
  209. {reply, {Ref, {Key, P, Value}}};
  210. true ->
  211. gen_server:reply(From, Ref),
  212. WPid ! {gproc, Ref, registered, {Key, P, Value}},
  213. noreply
  214. end;
  215. [{K, Waiters}] ->
  216. NewWaiters = [{WPid,Ref} | Waiters],
  217. W = {K, NewWaiters},
  218. ets:insert(?TAB, [W, Rev]),
  219. _ = gproc_lib:ensure_monitor(WPid,C),
  220. {reply, Ref, [W,Rev]};
  221. [] ->
  222. W = {{Key,T}, [{WPid,Ref}]},
  223. ets:insert(?TAB, [W, Rev]),
  224. _ = gproc_lib:ensure_monitor(WPid,C),
  225. {reply, Ref, [W,Rev]}
  226. end.
  227. maybe_waiters(_, _, _, _, []) ->
  228. false;
  229. maybe_waiters(K, Pid, Value, T, Event) ->
  230. case ets:lookup(?TAB, {K,T}) of
  231. [{_, Waiters}] when is_list(Waiters) ->
  232. Followers = [F || {_,_,follow} = F <- Waiters],
  233. ets:insert(?TAB, [{{K,T}, Pid, Value},
  234. {{Pid,K}, [{monitor, Followers}
  235. || Followers =/= []]}]),
  236. notify_waiters(Waiters, K, Pid, Value, Event),
  237. true;
  238. _ ->
  239. false
  240. end.
  241. -spec notify_waiters([{pid(), reference()}], gproc:key(), pid(), any(), any()) -> ok.
  242. notify_waiters([{P, Ref}|T], K, Pid, V, E) ->
  243. P ! {gproc, Ref, registered, {K, Pid, V}},
  244. notify_waiters(T, K, Pid, V, E);
  245. notify_waiters([{P, Ref, follow}|T], K, Pid, V, E) ->
  246. %% This is really a monitor, lurking in the Waiters list
  247. P ! {gproc, E, Ref, K},
  248. notify_waiters(T, K, Pid, V, E);
  249. notify_waiters([], _, _, _, _) ->
  250. ok.
  251. remove_wait({T,_,_} = Key, Pid, Ref, Waiters) ->
  252. Rev = {Pid,Key},
  253. case remove_from_waiters(Waiters, Pid, Ref) of
  254. [] ->
  255. ets:delete(?TAB, {Key,T}),
  256. ets:delete(?TAB, Rev),
  257. [{delete, [{Key,T}, Rev], []}];
  258. NewWaiters ->
  259. ets:insert(?TAB, {Key, NewWaiters}),
  260. case lists:keymember(Pid, 1, NewWaiters) of
  261. true ->
  262. %% should be extremely unlikely
  263. [{insert, [{Key, NewWaiters}]}];
  264. false ->
  265. %% delete the reverse entry
  266. ets:delete(?TAB, Rev),
  267. [{insert, [{Key, NewWaiters}]},
  268. {delete, [Rev], []}]
  269. end
  270. end.
  271. remove_from_waiters(Waiters, Pid, all) ->
  272. [W || W <- Waiters,
  273. element(1,W) =/= Pid];
  274. remove_from_waiters(Waiters, Pid, Ref) ->
  275. [W || W <- Waiters, not is_waiter(W, Pid, Ref)].
  276. is_waiter({Pid, Ref} , Pid, Ref) -> true;
  277. is_waiter({Pid, Ref, _}, Pid, Ref) -> true;
  278. is_waiter(_, _, _) ->
  279. false.
  280. remove_monitors(Key, Pid, MPid) ->
  281. case ets:lookup(?TAB, {Pid, Key}) of
  282. [{_, r}] ->
  283. [];
  284. [{K, Opts}] when is_list(Opts) ->
  285. case lists:keyfind(monitor, 1, Opts) of
  286. false ->
  287. [];
  288. {_, Ms} ->
  289. Ms1 = [{P,R,T} || {P,R,T} <- Ms,
  290. P =/= MPid],
  291. NewMs = lists:keyreplace(monitor, 1, Opts, {monitor,Ms1}),
  292. ets:insert(?TAB, {K, NewMs}),
  293. [{insert, [{{Pid,Key}, NewMs}]}]
  294. end;
  295. _ ->
  296. []
  297. end.
  298. does_pid_monitor(Pid, Opts) ->
  299. case lists:keyfind(monitor, 1, Opts) of
  300. false ->
  301. false;
  302. {_, Ms} ->
  303. lists:keymember(Pid, 1, Ms)
  304. end.
  305. mk_reg_objs(T, Scope, Pid, L) when T==n; T==a; T==rc ->
  306. lists:map(fun({K,V}) ->
  307. Key = {T, Scope, K},
  308. _ = valid_key(Key),
  309. {{Key,T}, Pid, V};
  310. (_) ->
  311. ?THROW_GPROC_ERROR(badarg)
  312. end, L);
  313. mk_reg_objs(T, Scope, Pid, L) when T==p; T==r ->
  314. lists:map(fun({K,V}) ->
  315. Key = {T, Scope, K},
  316. _ = valid_key(Key),
  317. {{Key,Pid}, Pid, V};
  318. (_) ->
  319. ?THROW_GPROC_ERROR(badarg)
  320. end, L).
  321. mk_reg_rev_objs(T, Scope, Pid, L) ->
  322. [{{Pid,{T,Scope,K}}, []} || {K,_} <- L].
  323. valid_key({r,_,R} = Key) when is_tuple(R) ->
  324. case element(tuple_size(R), R) of
  325. '\\_' ->
  326. %% Cannot allow this, since '\\_' is a wildcard for resources
  327. ?THROW_GPROC_ERROR(badarg);
  328. _ ->
  329. Key
  330. end;
  331. valid_key(Key) ->
  332. Key.
  333. ensure_monitor(shared, _) ->
  334. ok;
  335. ensure_monitor(Pid, _) when Pid == self() ->
  336. %% monitoring is ensured through a 'monitor_me' message
  337. ok;
  338. ensure_monitor(Pid, Scope) when Scope==g; Scope==l ->
  339. case ets:insert_new(?TAB, {{Pid, Scope}}) of
  340. false -> ok;
  341. true -> erlang:monitor(process, Pid)
  342. end.
  343. remove_reg(Key, Pid, Event) ->
  344. Reg = remove_reg_1(Key, Pid),
  345. Rev = remove_reverse_mapping(Event, Pid, Key),
  346. [Reg, Rev].
  347. remove_reg(Key, Pid, Event, Opts) ->
  348. Reg = remove_reg_1(Key, Pid),
  349. Rev = remove_reverse_mapping(Event, Pid, Key, Opts),
  350. [Reg, Rev].
  351. remove_reverse_mapping(Event, Pid, Key) ->
  352. Opts = case ets:lookup(?TAB, {Pid, Key}) of
  353. [] -> [];
  354. [{_, r}] -> [];
  355. [{_, L}] when is_list(L) ->
  356. L
  357. end,
  358. remove_reverse_mapping(Event, Pid, Key, Opts).
  359. remove_reverse_mapping(Event, Pid, Key, Opts) when Event==unreg;
  360. element(1,Event)==migrated;
  361. element(1,Event)==failover ->
  362. Rev = {Pid, Key},
  363. _ = notify(Event, Key, Opts),
  364. ets:delete(?TAB, Rev),
  365. Rev.
  366. notify(Key, Opts) ->
  367. notify(unreg, Key, Opts).
  368. monitors(Opts) ->
  369. case lists:keyfind(monitor, 1, Opts) of
  370. false ->
  371. [];
  372. {_, Mons} ->
  373. Mons
  374. end.
  375. standbys(Opts) ->
  376. select_monitors(monitors(Opts), standby, []).
  377. followers(Opts) ->
  378. select_monitors(monitors(Opts), follow, []).
  379. select_monitors([{_,_,Type}=H|T], Type, Acc) ->
  380. select_monitors(T, Type, [H|Acc]);
  381. select_monitors([_|T], Type, Acc) ->
  382. select_monitors(T, Type, Acc);
  383. select_monitors([], _, Acc) ->
  384. Acc.
  385. remove_monitor_pid([{monitor, Mons}|T], Pid) ->
  386. [{monitor, [M || M <- Mons,
  387. element(1, M) =/= Pid]}|T];
  388. remove_monitor_pid([H|T], Pid) ->
  389. [H | remove_monitor_pid(T, Pid)];
  390. remove_monitor_pid([], _) ->
  391. [].
  392. notify([], _, _) ->
  393. ok;
  394. notify(Event, Key, Opts) ->
  395. notify_(monitors(Opts), Event, Key).
  396. %% Also handle old-style monitors
  397. notify_([{Pid,Ref}|T], Event, Key) ->
  398. Pid ! {gproc, Event, Ref, Key},
  399. notify_(T, Event, Key);
  400. notify_([{Pid,Ref,_}|T], Event, {_,l,_} = Key) ->
  401. Pid ! {gproc, Event, Ref, Key},
  402. notify_(T, Event, Key);
  403. notify_([{Pid,Ref,_}|T], Event, {_,g,_} = Key) when node(Pid) == node() ->
  404. Pid ! {gproc, Event, Ref, Key},
  405. notify_(T, Event, Key);
  406. notify_([_|T], Event, Key) ->
  407. notify_(T, Event, Key);
  408. notify_([], _, _) ->
  409. ok.
  410. add_monitor([{monitor, Mons}|T], Pid, Ref, Type) ->
  411. [{monitor, [{Pid,Ref,Type}|Mons]}|T];
  412. add_monitor([H|T], Pid, Ref, Type) ->
  413. [H|add_monitor(T, Pid, Ref, Type)];
  414. add_monitor([], Pid, Ref, Type) ->
  415. [{monitor, [{Pid, Ref, Type}]}].
  416. remove_monitor([{monitor, Mons}|T], Pid, Ref) ->
  417. [{monitor, [Mon || Mon <- Mons, not is_mon(Mon,Pid,Ref)]} | T];
  418. remove_monitor([H|T], Pid, Ref) ->
  419. [H|remove_monitor(T, Pid, Ref)];
  420. remove_monitor([], _Pid, _Ref) ->
  421. [].
  422. is_mon({Pid,Ref,_}, Pid, Ref) -> true;
  423. is_mon({Pid,Ref}, Pid, Ref) -> true;
  424. is_mon(_, _, _) ->
  425. false.
  426. remove_many(T, Scope, L, Pid) ->
  427. lists:flatmap(fun(K) ->
  428. Key = {T, Scope, K},
  429. remove_reg(Key, Pid, unreg, unreg_opts(Key, Pid))
  430. end, L).
  431. unreg_opts(Key, Pid) ->
  432. case ets:lookup(?TAB, {Pid, Key}) of
  433. [] ->
  434. [];
  435. [{_,r}] ->
  436. [];
  437. [{_,Opts}] ->
  438. Opts
  439. end.
  440. remove_reg_1({c,_,_} = Key, Pid) ->
  441. remove_counter_1(Key, ets:lookup_element(?TAB, Reg = {Key,Pid}, 3), Pid),
  442. Reg;
  443. remove_reg_1({r,_,_} = Key, Pid) ->
  444. remove_resource_1(Key, ets:lookup_element(?TAB, Reg = {Key,Pid}, 3), Pid),
  445. Reg;
  446. remove_reg_1({T,_,_} = Key, _Pid) when T==a; T==n; T==rc ->
  447. ets:delete(?TAB, Reg = {Key,T}),
  448. Reg;
  449. remove_reg_1({_,_,_} = Key, Pid) ->
  450. ets:delete(?TAB, Reg = {Key, Pid}),
  451. Reg.
  452. remove_counter_1({c,C,N} = Key, Val, Pid) ->
  453. Res = ets:delete(?TAB, {Key, Pid}),
  454. update_aggr_counter(C, N, -Val),
  455. Res.
  456. remove_resource_1({r,C,N} = Key, _, Pid) ->
  457. Res = ets:delete(?TAB, {Key, Pid}),
  458. update_resource_count(C, N, -1),
  459. Res.
  460. do_set_value({T,_,_} = Key, Value, Pid) ->
  461. K2 = if Pid == shared -> shared;
  462. T==n orelse T==a orelse T==rc -> T;
  463. true -> Pid
  464. end,
  465. try ets:lookup_element(?TAB, {Key,K2}, 2) of
  466. Pid ->
  467. ets:insert(?TAB, {{Key, K2}, Pid, Value});
  468. _ ->
  469. false
  470. catch
  471. error:_ -> false
  472. end.
  473. do_set_counter_value({_,C,N} = Key, Value, Pid) ->
  474. OldVal = ets:lookup_element(?TAB, {Key, Pid}, 3), % may fail with badarg
  475. Res = ets:insert(?TAB, {{Key, Pid}, Pid, Value}),
  476. update_aggr_counter(C, N, Value - OldVal),
  477. Res.
  478. update_counter({T,l,Ctr} = Key, Incr, Pid) when is_integer(Incr), T==c;
  479. is_integer(Incr), T==n ->
  480. Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
  481. if T==c ->
  482. update_aggr_counter(l, Ctr, Incr);
  483. true ->
  484. ok
  485. end,
  486. Res;
  487. update_counter({T,l,Ctr} = Key, {Incr, Threshold, SetValue}, Pid)
  488. when is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==c;
  489. is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==n ->
  490. [Prev, New] = ets:update_counter(?TAB, {Key, Pid},
  491. [{3, 0}, {3, Incr, Threshold, SetValue}]),
  492. if T==c ->
  493. update_aggr_counter(l, Ctr, New - Prev);
  494. true ->
  495. ok
  496. end,
  497. New;
  498. update_counter({T,l,Ctr} = Key, Ops, Pid) when is_list(Ops), T==c;
  499. is_list(Ops), T==r;
  500. is_list(Ops), T==n ->
  501. case ets:update_counter(?TAB, {Key, Pid},
  502. [{3, 0} | expand_ops(Ops)]) of
  503. [_] ->
  504. [];
  505. [Prev | Rest] ->
  506. [New | _] = lists:reverse(Rest),
  507. if T==c ->
  508. update_aggr_counter(l, Ctr, New - Prev);
  509. true ->
  510. ok
  511. end,
  512. Rest
  513. end;
  514. update_counter(_, _, _) ->
  515. ?THROW_GPROC_ERROR(badarg).
  516. expand_ops([{Incr,Thr,SetV}|T])
  517. when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
  518. [{3, Incr, Thr, SetV}|expand_ops(T)];
  519. expand_ops([Incr|T]) when is_integer(Incr) ->
  520. [{3, Incr}|expand_ops(T)];
  521. expand_ops([]) ->
  522. [];
  523. expand_ops(_) ->
  524. ?THROW_GPROC_ERROR(badarg).
  525. update_aggr_counter(C, N, Val) ->
  526. ?MAY_FAIL(ets:update_counter(?TAB, {{a,C,N},a}, {3, Val})).
  527. decrement_resource_count(C, N) ->
  528. update_resource_count(C, N, -1).
  529. update_resource_count(C, N, Val) ->
  530. update_resource_count_(C, N, Val),
  531. case is_tuple(N) of
  532. true -> update_resource_count_(
  533. C, setelement(size(N), N, '\\_'), Val);
  534. false -> ok
  535. end.
  536. update_resource_count_(C, N, Val) ->
  537. try ets:update_counter(?TAB, {{rc,C,N},rc}, {3, Val}) of
  538. 0 ->
  539. resource_count_zero(C, N);
  540. _ ->
  541. ok
  542. catch
  543. _:_ -> ok
  544. end.
  545. resource_count_zero(C, N) ->
  546. case ets:lookup(?TAB, {K = {rc,C,N},rc}) of
  547. [{_, Pid, _}] ->
  548. case get_attr(on_zero, Pid, K, undefined) of
  549. undefined -> ok;
  550. Actions ->
  551. perform_on_zero(Actions, C, N, Pid)
  552. end;
  553. _ -> ok
  554. end.
  555. perform_on_zero(Actions, C, N, Pid) ->
  556. lists:foreach(
  557. fun(A) ->
  558. try perform_on_zero_(A, C, N, Pid)
  559. catch error:_ -> ignore
  560. end
  561. end, Actions).
  562. perform_on_zero_({send, ToProc}, C, N, Pid) ->
  563. gproc:send(ToProc, {gproc, resource_on_zero, C, N, Pid}),
  564. ok;
  565. perform_on_zero_({bcast, ToProc}, C, N, Pid) ->
  566. gproc:bcast(ToProc, {gproc, resource_on_zero, C, N, Pid}),
  567. ok;
  568. perform_on_zero_(publish, C, N, Pid) ->
  569. gproc_ps:publish(C, gproc_resource_on_zero, {C, N, Pid}),
  570. ok;
  571. perform_on_zero_({unreg_shared, T,N}, C, _, _) ->
  572. K = {T, C, N},
  573. case ets:member(?TAB, {K, shared}) of
  574. true ->
  575. Objs = remove_reg(K, shared, unreg),
  576. _ = if C == g -> self() ! {gproc_unreg, Objs};
  577. true -> ok
  578. end,
  579. ok;
  580. false ->
  581. ok
  582. end;
  583. perform_on_zero_(_, _, _, _) ->
  584. ok.
  585. scan_existing_counters(Ctxt, Name) ->
  586. Head = {{{c,Ctxt,Name},'_'},'_','$1'},
  587. Cs = ets:select(?TAB, [{Head, [], ['$1']}]),
  588. lists:sum(Cs).
  589. scan_existing_resources(Ctxt, Name) ->
  590. Head = {{{r,Ctxt,adjust_wild(Name)},'_'},'_','_'},
  591. ets:select_count(?TAB, [{Head, [], [true]}]).
  592. adjust_wild(N) when is_tuple(N) ->
  593. Sz = size(N),
  594. case element(Sz, N) of
  595. '\\_' -> setelement(Sz, N, '_');
  596. _ -> N
  597. end;
  598. adjust_wild(N) ->
  599. N.
  600. valid_opts(Type, Default) ->
  601. Opts = get_app_env(Type, Default),
  602. check_opts(Type, Opts).
  603. check_opts(Type, Opts) when is_list(Opts) ->
  604. Check = check_option_f(Type),
  605. lists:map(fun(X) ->
  606. case Check(X) of
  607. true -> X;
  608. false ->
  609. erlang:error({illegal_option, X}, [Type, Opts])
  610. end
  611. end, Opts);
  612. check_opts(Type, Other) ->
  613. erlang:error(invalid_options, [Type, Other]).
  614. check_option_f(ets_options) -> fun check_ets_option/1;
  615. check_option_f(server_options) -> fun check_server_option/1.
  616. check_ets_option({read_concurrency , B}) -> is_boolean(B);
  617. check_ets_option({write_concurrency, B}) -> is_boolean(B);
  618. check_ets_option(_) -> false.
  619. check_server_option({priority, P}) ->
  620. %% Forbid setting priority to 'low' since that would
  621. %% surely cause problems. Unsure about 'max'...
  622. lists:member(P, [normal, high, max]);
  623. check_server_option(_) ->
  624. %% assume it's a valid spawn option
  625. true.
  626. get_app_env(Key, Default) ->
  627. case application:get_env(Key) of
  628. undefined -> Default;
  629. {ok, undefined} -> Default;
  630. {ok, Value} -> Value
  631. end.