gproc_lib.erl 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@ericsson.com>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_lib).
  23. -compile(export_all).
  24. -include("gproc.hrl").
  25. %% We want to store names and aggregated counters with the same
  26. %% structure as properties, but at the same time, we must ensure
  27. %% that the key is unique. We replace the Pid in the key part
  28. %% with an atom. To know which Pid owns the object, we lug the
  29. %% Pid around as payload as well. This is a bit redundant, but
  30. %% symmetric.
  31. %%
  32. -spec insert_reg(key(), any(), pid(), scope()) -> boolean().
  33. insert_reg({T,_,Name} = K, Value, Pid, Scope) when T==a; T==n ->
  34. MaybeScan = fun() ->
  35. if T==a ->
  36. Initial = scan_existing_counters(Scope, Name),
  37. ets:insert(?TAB, {{K,a}, Pid, Initial});
  38. true ->
  39. true
  40. end
  41. end,
  42. Info = [{{K, T}, Pid, Value}, {{Pid,K},r}],
  43. case ets:insert_new(?TAB, Info) of
  44. true ->
  45. MaybeScan();
  46. false ->
  47. if T==n ->
  48. maybe_waiters(K, Pid, Value, T, Info);
  49. true ->
  50. false
  51. end
  52. end;
  53. insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope) when Scope==l; Scope==g ->
  54. %% Non-unique keys; store Pid in the key part
  55. K = {Key, Pid},
  56. Kr = {Pid, Key},
  57. Res = ets:insert_new(?TAB, [{K, Pid, Value}, {Kr,r}]),
  58. case Res of
  59. true ->
  60. update_aggr_counter(Scope, Ctr, Value);
  61. false ->
  62. ignore
  63. end,
  64. Res;
  65. insert_reg(Key, Value, Pid, _Scope) ->
  66. %% Non-unique keys; store Pid in the key part
  67. K = {Key, Pid},
  68. Kr = {Pid, Key},
  69. ets:insert_new(?TAB, [{K, Pid, Value}, {Kr,r}]).
  70. -spec insert_many(type(), scope(), [{key(),any()}], pid()) ->
  71. {true,list()} | false.
  72. insert_many(T, Scope, KVL, Pid) ->
  73. Objs = mk_reg_objs(T, Scope, Pid, KVL),
  74. case ets:insert_new(?TAB, Objs) of
  75. true ->
  76. RevObjs = mk_reg_rev_objs(T, Scope, Pid, KVL),
  77. ets:insert(?TAB, RevObjs),
  78. gproc_lib:ensure_monitor(Pid, Scope),
  79. {true, Objs};
  80. false ->
  81. Existing = [{Obj, ets:lookup(?TAB, K)} || {K,_,_} = Obj <- Objs],
  82. case lists:any(fun({_, [{_, _, _}]}) ->
  83. true;
  84. (_) ->
  85. %% (not found), or waiters registered
  86. false
  87. end, Existing) of
  88. true ->
  89. %% conflict; return 'false', indicating failure
  90. false;
  91. false ->
  92. %% possibly waiters, but they are handled in next step
  93. insert_objects(Existing),
  94. gproc_lib:ensure_monitor(Pid, Scope),
  95. {true, Objs}
  96. end
  97. end.
  98. -spec insert_objects([{key(), pid(), any()}]) -> ok.
  99. insert_objects(Objs) ->
  100. lists:foreach(
  101. fun({{{Id,_} = _K, Pid, V} = Obj, Existing}) ->
  102. ets:insert(?TAB, [Obj, {{Pid, Id}, r}]),
  103. case Existing of
  104. [] -> ok;
  105. [{_, Waiters}] ->
  106. notify_waiters(Waiters, Id, Pid, V)
  107. end
  108. end, Objs).
  109. await({T,C,_} = Key, WPid, {_Pid, Ref} = From) ->
  110. Rev = {{WPid,Key}, r},
  111. case ets:lookup(?TAB, {Key,T}) of
  112. [{_, P, Value}] ->
  113. %% for symmetry, we always reply with Ref and then send a message
  114. if C == g ->
  115. %% in the global case, we bundle the reply, since otherwise
  116. %% the messages can pass each other
  117. {reply, {Ref, {Key, P, Value}}};
  118. true ->
  119. gen_server:reply(From, Ref),
  120. WPid ! {gproc, Ref, registered, {Key, P, Value}},
  121. noreply
  122. end;
  123. [{K, Waiters}] ->
  124. NewWaiters = [{WPid,Ref} | Waiters],
  125. W = {K, NewWaiters},
  126. ets:insert(?TAB, [W, Rev]),
  127. gproc_lib:ensure_monitor(WPid,C),
  128. {reply, Ref, [W,Rev]};
  129. [] ->
  130. W = {{Key,T}, [{WPid,Ref}]},
  131. ets:insert(?TAB, [W, Rev]),
  132. gproc_lib:ensure_monitor(WPid,C),
  133. {reply, Ref, [W,Rev]}
  134. end.
  135. maybe_waiters(K, Pid, Value, T, Info) ->
  136. case ets:lookup(?TAB, {K,T}) of
  137. [{_, Waiters}] when is_list(Waiters) ->
  138. ets:insert(?TAB, Info),
  139. notify_waiters(Waiters, K, Pid, Value),
  140. true;
  141. [_] ->
  142. false
  143. end.
  144. -spec notify_waiters([{pid(), reference()}], key(), pid(), any()) -> ok.
  145. notify_waiters(Waiters, K, Pid, V) ->
  146. [begin
  147. P ! {gproc, Ref, registered, {K, Pid, V}},
  148. ets:delete(?TAB, {P, K})
  149. end || {P, Ref} <- Waiters],
  150. ok.
  151. mk_reg_objs(T, Scope, Pid, L) when T==n; T==a ->
  152. lists:map(fun({K,V}) ->
  153. {{{T,Scope,K},T}, Pid, V};
  154. (_) ->
  155. erlang:error(badarg)
  156. end, L);
  157. mk_reg_objs(p = T, Scope, Pid, L) ->
  158. lists:map(fun({K,V}) ->
  159. {{{T,Scope,K},Pid}, Pid, V};
  160. (_) ->
  161. erlang:error(badarg)
  162. end, L).
  163. mk_reg_rev_objs(T, Scope, Pid, L) ->
  164. [{{Pid,{T,Scope,K}},r} || {K,_} <- L].
  165. ensure_monitor(Pid, Scope) when Scope==g; Scope==l ->
  166. case node(Pid) == node() andalso ets:insert_new(?TAB, {{Pid, Scope}}) of
  167. false -> ok;
  168. true -> erlang:monitor(process, Pid)
  169. end.
  170. remove_reg(Key, Pid) ->
  171. remove_reg_1(Key, Pid),
  172. ets:delete(?TAB, {Pid,Key}).
  173. remove_reg_1({c,_,_} = Key, Pid) ->
  174. remove_counter_1(Key, ets:lookup_element(?TAB, {Key,Pid}, 3), Pid);
  175. remove_reg_1({a,_,_} = Key, _Pid) ->
  176. ets:delete(?TAB, {Key,a});
  177. remove_reg_1({n,_,_} = Key, _Pid) ->
  178. ets:delete(?TAB, {Key,n});
  179. remove_reg_1({_,_,_} = Key, Pid) ->
  180. ets:delete(?TAB, {Key, Pid}).
  181. remove_counter_1({c,C,N} = Key, Val, Pid) ->
  182. Res = ets:delete(?TAB, {Key, Pid}),
  183. update_aggr_counter(C, N, -Val),
  184. Res.
  185. do_set_value({T,_,_} = Key, Value, Pid) ->
  186. K2 = if T==n orelse T==a -> T;
  187. true -> Pid
  188. end,
  189. case (catch ets:lookup_element(?TAB, {Key,K2}, 2)) of
  190. {'EXIT', {badarg, _}} ->
  191. false;
  192. Pid ->
  193. ets:insert(?TAB, {{Key, K2}, Pid, Value});
  194. _ ->
  195. false
  196. end.
  197. do_set_counter_value({_,C,N} = Key, Value, Pid) ->
  198. OldVal = ets:lookup_element(?TAB, {Key, Pid}, 3), % may fail with badarg
  199. Res = ets:insert(?TAB, {{Key, Pid}, Pid, Value}),
  200. update_aggr_counter(C, N, Value - OldVal),
  201. Res.
  202. update_counter({c,l,Ctr} = Key, Incr, Pid) ->
  203. Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
  204. update_aggr_counter(l, Ctr, Incr),
  205. Res.
  206. update_aggr_counter(C, N, Val) ->
  207. catch ets:update_counter(?TAB, {{a,C,N},a}, {3, Val}).
  208. %% cleanup_counter({c,g,N}=K, Pid, Acc) ->
  209. %% remove_reg(K,Pid),
  210. %% case ets:lookup(?TAB, {{a,g,N},a}) of
  211. %% [Aggr] ->
  212. %% [Aggr|Acc];
  213. %% [] ->
  214. %% Acc
  215. %% end;
  216. %% cleanup_counter(K, Pid, Acc) ->
  217. %% remove_reg(K,Pid),
  218. %% Acc.
  219. scan_existing_counters(Ctxt, Name) ->
  220. Head = {{{c,Ctxt,Name},'_'},'_','$1'},
  221. Cs = ets:select(?TAB, [{Head, [], ['$1']}]),
  222. lists:sum(Cs).