gproc_lib.erl 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@ericsson.com>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_lib).
  23. -compile(export_all).
  24. -include("gproc.hrl").
  25. %% We want to store names and aggregated counters with the same
  26. %% structure as properties, but at the same time, we must ensure
  27. %% that the key is unique. We replace the Pid in the key part
  28. %% with an atom. To know which Pid owns the object, we lug the
  29. %% Pid around as payload as well. This is a bit redundant, but
  30. %% symmetric.
  31. %%
  32. -spec insert_reg(key(), any(), pid(), scope()) -> boolean().
  33. insert_reg({T,_,Name} = K, Value, Pid, Scope) when T==a; T==n ->
  34. MaybeScan = fun() ->
  35. if T==a ->
  36. Initial = scan_existing_counters(Scope, Name),
  37. ets:insert(?TAB, {{K,a}, Pid, Initial});
  38. true ->
  39. true
  40. end
  41. end,
  42. Info = [{{K, T}, Pid, Value}, {{Pid,K},r}],
  43. case ets:insert_new(?TAB, Info) of
  44. true ->
  45. MaybeScan();
  46. false ->
  47. if T==n ->
  48. maybe_waiters(K, Pid, Value, T, Info);
  49. true ->
  50. false
  51. end
  52. end;
  53. insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope) when Scope==l; Scope==g ->
  54. %% Non-unique keys; store Pid in the key part
  55. K = {Key, Pid},
  56. Kr = {Pid, Key},
  57. Res = ets:insert_new(?TAB, [{K, Pid, Value}, {Kr,r}]),
  58. case Res of
  59. true ->
  60. update_aggr_counter(Scope, Ctr, Value);
  61. false ->
  62. ignore
  63. end,
  64. Res;
  65. insert_reg(Key, Value, Pid, _Scope) ->
  66. %% Non-unique keys; store Pid in the key part
  67. K = {Key, Pid},
  68. Kr = {Pid, Key},
  69. ets:insert_new(?TAB, [{K, Pid, Value}, {Kr,r}]).
  70. -spec insert_many(type(), scope(), [{key(),any()}], pid()) ->
  71. {true,list()} | false.
  72. insert_many(T, Scope, KVL, Pid) ->
  73. Objs = mk_reg_objs(T, Scope, Pid, KVL),
  74. case ets:insert_new(?TAB, Objs) of
  75. true ->
  76. RevObjs = mk_reg_rev_objs(T, Scope, Pid, KVL),
  77. ets:insert(?TAB, RevObjs),
  78. gproc_lib:ensure_monitor(Pid, Scope),
  79. {true, Objs};
  80. false ->
  81. Existing = [{Obj, ets:lookup(?TAB, K)} || {K,_,_} = Obj <- Objs],
  82. case lists:any(fun({_, [{_, _, _}]}) ->
  83. true;
  84. (_) ->
  85. %% (not found), or waiters registered
  86. false
  87. end, Existing) of
  88. true ->
  89. %% conflict; return 'false', indicating failure
  90. false;
  91. false ->
  92. %% possibly waiters, but they are handled in next step
  93. insert_objects(Existing),
  94. gproc_lib:ensure_monitor(Pid, Scope),
  95. {true, Objs}
  96. end
  97. end.
  98. -spec insert_objects([{key(), pid(), any()}]) -> ok.
  99. insert_objects(Objs) ->
  100. lists:foreach(
  101. fun({{{Id,_} = K, Pid, V} = Obj, Existing}) ->
  102. ets:insert(?TAB, [Obj, {{Pid, Id}, r}]),
  103. case Existing of
  104. [] -> ok;
  105. [{_, Waiters}] ->
  106. notify_waiters(Waiters, Id, Pid, V)
  107. end
  108. end, Objs).
  109. await({T,C,_} = Key, {Pid, Ref} = From) ->
  110. Rev = {{Pid,Key}, r},
  111. case ets:lookup(?TAB, {Key,T}) of
  112. [{_, P, Value}] ->
  113. %% for symmetry, we always reply with Ref and then send a message
  114. gen_server:reply(From, Ref),
  115. Pid ! {gproc, Ref, registered, {Key, P, Value}},
  116. noreply;
  117. [{K, Waiters}] ->
  118. NewWaiters = [{Pid,Ref} | Waiters],
  119. W = {K, NewWaiters},
  120. ets:insert(?TAB, [W, Rev]),
  121. gproc_lib:ensure_monitor(Pid,C),
  122. {reply, Ref, [W,Rev]};
  123. [] ->
  124. W = {{Key,T}, [{Pid,Ref}]},
  125. ets:insert(?TAB, [W, Rev]),
  126. gproc_lib:ensure_monitor(Pid,C),
  127. {reply, Ref, [W,Rev]}
  128. end.
  129. maybe_waiters(K, Pid, Value, T, Info) ->
  130. case ets:lookup(?TAB, {K,T}) of
  131. [{_, Waiters}] when is_list(Waiters) ->
  132. ets:insert(?TAB, Info),
  133. notify_waiters(Waiters, K, Pid, Value),
  134. true;
  135. [_] ->
  136. false
  137. end.
  138. -spec notify_waiters([{pid(), reference()}], key(), pid(), any()) -> ok.
  139. notify_waiters(Waiters, K, Pid, V) ->
  140. [begin
  141. P ! {gproc, Ref, registered, {K, Pid, V}},
  142. ets:delete(?TAB, {P, K})
  143. end || {P, Ref} <- Waiters],
  144. ok.
  145. mk_reg_objs(T, Scope, Pid, L) when T==n; T==a ->
  146. lists:map(fun({K,V}) ->
  147. {{{T,Scope,K},T}, Pid, V};
  148. (_) ->
  149. erlang:error(badarg)
  150. end, L);
  151. mk_reg_objs(p = T, Scope, Pid, L) ->
  152. lists:map(fun({K,V}) ->
  153. {{{T,Scope,K},Pid}, Pid, V};
  154. (_) ->
  155. erlang:error(badarg)
  156. end, L).
  157. mk_reg_rev_objs(T, Scope, Pid, L) ->
  158. [{{Pid,{T,Scope,K}},r} || {K,_} <- L].
  159. ensure_monitor(Pid, Scope) when Scope==g; Scope==l ->
  160. case node(Pid) == node() andalso ets:insert_new(?TAB, {{Pid, Scope}}) of
  161. false -> ok;
  162. true -> erlang:monitor(process, Pid)
  163. end.
  164. remove_reg(Key, Pid) ->
  165. remove_reg_1(Key, Pid),
  166. ets:delete(?TAB, {Pid,Key}).
  167. remove_reg_1({c,_,_} = Key, Pid) ->
  168. remove_counter_1(Key, ets:lookup_element(?TAB, {Key,Pid}, 3), Pid);
  169. remove_reg_1({a,_,_} = Key, _Pid) ->
  170. ets:delete(?TAB, {Key,a});
  171. remove_reg_1({n,_,_} = Key, _Pid) ->
  172. ets:delete(?TAB, {Key,n});
  173. remove_reg_1({_,_,_} = Key, Pid) ->
  174. ets:delete(?TAB, {Key, Pid}).
  175. remove_counter_1({c,C,N} = Key, Val, Pid) ->
  176. Res = ets:delete(?TAB, {Key, Pid}),
  177. update_aggr_counter(C, N, -Val),
  178. Res.
  179. do_set_value({T,_,_} = Key, Value, Pid) ->
  180. K2 = if T==n orelse T==a -> T;
  181. true -> Pid
  182. end,
  183. case (catch ets:lookup_element(?TAB, {Key,K2}, 2)) of
  184. {'EXIT', {badarg, _}} ->
  185. false;
  186. Pid ->
  187. ets:insert(?TAB, {{Key, K2}, Pid, Value});
  188. _ ->
  189. false
  190. end.
  191. do_set_counter_value({_,C,N} = Key, Value, Pid) ->
  192. OldVal = ets:lookup_element(?TAB, {Key, Pid}, 3), % may fail with badarg
  193. Res = ets:insert(?TAB, {{Key, Pid}, Pid, Value}),
  194. update_aggr_counter(C, N, Value - OldVal),
  195. Res.
  196. update_counter({c,l,Ctr} = Key, Incr, Pid) ->
  197. Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
  198. update_aggr_counter(l, Ctr, Incr),
  199. Res.
  200. update_aggr_counter(C, N, Val) ->
  201. catch ets:update_counter(?TAB, {{a,C,N},a}, {3, Val}).
  202. %% cleanup_counter({c,g,N}=K, Pid, Acc) ->
  203. %% remove_reg(K,Pid),
  204. %% case ets:lookup(?TAB, {{a,g,N},a}) of
  205. %% [Aggr] ->
  206. %% [Aggr|Acc];
  207. %% [] ->
  208. %% Acc
  209. %% end;
  210. %% cleanup_counter(K, Pid, Acc) ->
  211. %% remove_reg(K,Pid),
  212. %% Acc.
  213. scan_existing_counters(Ctxt, Name) ->
  214. Head = {{{c,Ctxt,Name},'_'},'_','$1'},
  215. Cs = ets:select(?TAB, [{Head, [], ['$1']}]),
  216. lists:sum(Cs).