gproc_dist.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@ericsson.com>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_dist).
  23. -behaviour(gen_leader).
  24. -export([start_link/0, start_link/1,
  25. reg/1, reg/2, unreg/1,
  26. mreg/2,
  27. set_value/2,
  28. update_counter/2]).
  29. -export([leader_call/1, leader_cast/1]).
  30. %%% internal exports
  31. -export([init/1,
  32. handle_cast/3,
  33. handle_call/4,
  34. handle_info/2,
  35. handle_leader_call/4,
  36. handle_leader_cast/3,
  37. handle_DOWN/3,
  38. elected/2, % original version
  39. elected/3,
  40. surrendered/3,
  41. from_leader/3,
  42. code_change/4,
  43. terminate/2]).
  44. -include("gproc.hrl").
  45. -define(SERVER, ?MODULE).
  %% Callback state carried through the gen_leader callbacks of this module.
  %%   always_broadcast - consulted in elected/3 when another node joins: when
  %%                      false the synch data is returned with `reply',
  %%                      otherwise with `ok'. Initialised in init/1.
  %%   is_leader        - set to true in elected/2,3 (full synch) and to false
  %%                      in surrendered/3.
  -record(state, {always_broadcast = false,
                  is_leader = undefined}).
  %% @doc Starts the server using this node plus all currently connected
  %% nodes as the node list, and an empty option list.
  start_link() ->
      Candidates = [node() | nodes()],
      start_link({Candidates, []}).
  %% @doc Starts the gen_leader instance registered as ?SERVER with this
  %% module as callback module.
  %%
  %% Accepts either `{Nodes, Opts}' or a bare list of nodes, which is
  %% treated as `{Nodes, []}'. `Nodes' and `Opts' are handed to
  %% gen_leader:start_link/6 unchanged; the callback argument and the
  %% final options argument are both `[]'.
  %% (For tracing, `[{debug,[trace]}]' can be passed as the last argument.)
  start_link({Nodes, Opts}) ->
      gen_leader:start_link(?SERVER, Nodes, Opts, ?MODULE, [], []);
  start_link(Nodes) when is_list(Nodes) ->
      gen_leader:start_link(?SERVER, Nodes, [], ?MODULE, [], []).
  %% @doc Registers `Key' with the default value that gproc:default/1
  %% returns for that key. See reg/2.
  reg(Key) ->
      Default = gproc:default(Key),
      reg(Key, Default).
  %%% @spec reg({Class, Scope, Key}, Value) -> true
  %%% @doc Registers a global key for the calling process via the leader.
  %%%
  %%%    Class = n  - unique name
  %%%          | p  - non-unique property
  %%%          | c  - counter
  %%%          | a  - aggregated counter
  %%%    Scope = l | g (local or global)
  %%%
  %%% Only keys with scope `g' are accepted here; anything else raises
  %%% `badarg'. The request sent to the leader carries self() so that the
  %%% leader knows which process the registration belongs to.
  reg(Key, Value) ->
      case Key of
          {_, g, _} ->
              %% anything global
              leader_call({reg, Key, Value, self()});
          _ ->
              erlang:error(badarg)
      end.
  %% @doc Registers several global keys of type `T' in one leader call.
  %% `KVL' must be a list (otherwise `badarg' is raised); it is forwarded
  %% to the leader together with the scope `g' and the calling pid.
  mreg(T, KVL) when is_list(KVL) ->
      leader_call({mreg, T, g, KVL, self()});
  mreg(_T, _KVL) ->
      erlang:error(badarg).
  %% @doc Unregisters a global key on behalf of the calling process.
  %% Keys whose scope is not `g' raise `badarg'.
  unreg(Key) ->
      case Key of
          {_, g, _} ->
              leader_call({unreg, Key, self()});
          _ ->
              erlang:error(badarg)
      end.
  %% @doc Sets the value of an already registered global key owned by the
  %% calling process.
  %%
  %% For counters (`c') and aggregated counters (`a') the value must be an
  %% integer; any other value raises `badarg'. Keys that are not global
  %% (scope `g') raise `badarg' as well.
  %%
  %% The request always carries self(): handle_leader_call/4 only has a
  %% clause for the 4-tuple `{set, Key, Value, Pid}'. The previous 3-tuple
  %% sent for `a'/`c' keys fell through to the catch-all clause and
  %% therefore always failed with `badarg'.
  set_value({T, g, _}, Value)
    when (T =:= a orelse T =:= c) andalso not is_integer(Value) ->
      erlang:error(badarg);
  set_value({_, g, _} = Key, Value) ->
      leader_call({set, Key, Value, self()});
  set_value(_, _) ->
      erlang:error(badarg).
  %% @doc Adds `Incr' to a global counter owned by the calling process.
  %% Requires a `{c, g, _}' key and an integer increment; every other
  %% combination raises `badarg'.
  update_counter(Key, Incr) ->
      case Key of
          {c, g, _} when is_integer(Incr) ->
              leader_call({update_counter, Key, Incr, self()});
          _ ->
              erlang:error(badarg)
      end.
  94. %%% ==========================================================
  %% @doc gen_leader callback. No local casts are expected: any cast makes
  %% the callback return a `stop' tuple with reason `unknown_cast'.
  handle_cast(_Request, State, _Election) ->
      {stop, unknown_cast, State}.
  %% @doc gen_leader callback. No local calls are supported: every request
  %% is answered with `badarg' and the state is left unchanged.
  handle_call(_Request, _From, State, _Election) ->
      {reply, badarg, State}.
  %% @doc gen_leader callback for plain messages.
  %%
  %% A process `'DOWN'' monitor message is turned into a
  %% `{pid_is_DOWN, Pid}' cast to the leader, which performs the actual
  %% cleanup in handle_leader_cast/3. All other messages are ignored.
  %% The state is never changed.
  handle_info(Info, S) ->
      case Info of
          {'DOWN', _MRef, process, Pid, _Reason} ->
              leader_cast({pid_is_DOWN, Pid});
          _ ->
              ok
      end,
      {ok, S}.
  %% @doc gen_leader callback (original two-argument version).
  %% Prints the election term, then returns all global entries of the
  %% local table as synch data and marks this instance as leader.
  elected(S, E) ->
      io:fwrite("elected(_, E = ~p)~n", [E]),
      Synch = {globals, globs()},
      {ok, Synch, S#state{is_leader = true}}.
  %% @doc gen_leader callback (three-argument version).
  %%
  %% The synch data is always `{globals, globs()}'.
  %%  - Third argument `undefined': this instance has become leader; do a
  %%    full synch and mark the state as leader.
  %%  - Otherwise another node recognized us as the leader. With
  %%    `always_broadcast' set to false the data is returned with `reply',
  %%    so that it is not broadcast to everyone else.
  %%  - In all remaining cases the data is returned with `ok'. The main
  %%    reason for this mode is a gen_leader that does not support the
  %%    `reply' return value.
  elected(S, _E, undefined) ->
      {ok, {globals, globs()}, S#state{is_leader = true}};
  elected(#state{always_broadcast = false} = S, _E, _Node) ->
      {reply, {globals, globs()}, S};
  elected(S, _E, _Node) ->
      {ok, {globals, globs()}, S}.
  %% Returns every object in ?TAB that is a 3-tuple whose first element
  %% has the form `{{_, g, _}, _}', i.e. all entries with global scope.
  globs() ->
      GlobalObject = {{{'_', g, '_'}, '_'}, '_', '_'},
      ets:select(?TAB, [{GlobalObject, [], ['$_']}]).
  %% @doc gen_leader callback, called with the leader's synch data.
  %%
  %% Reconciles the local table with the leader's globals (see
  %% surrendered_1/1) and marks this instance as non-leader. Globals
  %% belonging to this node should be more correct in our own table than
  %% in the leader's.
  surrendered(S, {globals, Globs}, _E) ->
      surrendered_1(Globs),
      Follower = S#state{is_leader = false},
      {ok, Follower}.
  %% @doc gen_leader callback, called with the name of a node that is down.
  %%
  %% Selects `{Key, Pid}' for every global entry whose pid lives on `Node',
  %% prints them, and lets process_globals/1 remove them from the local
  %% table. If that yields any operations they are returned as the second
  %% element of the `ok' tuple (the broadcast); otherwise only the state
  %% is returned.
  handle_DOWN(Node, S, _E) ->
      Entry = {{{'_', g, '_'}, '_'}, '$1', '_'},
      OnDownNode = {'==', {node, '$1'}, Node},
      %% Result term: {Key, Pid}, taken from the matched object.
      KeyAndPid = {{{element, 1, {element, 1, '$_'}}, {element, 2, '$_'}}},
      Globs = ets:select(?TAB, [{Entry, [OnDownNode], [KeyAndPid]}]),
      io:fwrite("handle_DOWN(~p); Globs = ~p~n", [Node, Globs]),
      Broadcast = process_globals(Globs),
      if Broadcast =:= [] ->
              {ok, S};
         true ->
              {ok, Broadcast, S}
      end.
  %% @doc gen_leader callback: handles requests forwarded to the leader.
  %%
  %% Every clause either replies `badarg' (which leader_call/1 on the
  %% client side turns into an exception) or replies with a result and a
  %% list of `{insert, Objects}' / `{delete, [{Key, Pid}]}' operations as
  %% an extra tuple element; from_leader/3 applies lists of that shape.
  %%
  %% Requests handled:
  %%   {reg, Key, Value, Pid}            - register a global key
  %%   {update_counter, Key, Incr, Pid}  - increment a global counter
  %%   {unreg, Key, Pid}                 - remove a registration
  %%   {mreg, Type, g, KVList, Pid}      - register many (types p and n only)
  %%   {set, Key, Value, Pid}            - change a registered value
  %%   {await, Key, Pid}                 - delegate to gproc_lib:await/2
  %% Anything else is answered with `badarg'.
  handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
      case gproc_lib:insert_reg(K, Value, Pid, g) of
          false ->
              {reply, badarg, S};
          true ->
              gproc_lib:ensure_monitor(Pid,g),
              %% Objects to send out: the new entry and, for counters,
              %% the aggregated counter with the same name (if any).
              Vals =
                  if C == a ->
                          ets:lookup(?TAB, {K,a});
                     C == c ->
                          [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
                     C == n ->
                          [{{K,n},Pid,Value}];
                     true ->
                          [{{K,Pid},Pid,Value}]
                  end,
              {reply, true, [{insert, Vals}], S}
      end;
  handle_leader_call({update_counter, {c,g,Ctr} = Key, Incr, Pid}, _From, S, _E)
    when is_integer(Incr) ->
      %% ets:update_counter/3 raises an error if the counter object does
      %% not exist; that is reported to the client as badarg.
      try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
          Vals = [{{Key,Pid},Pid,New} | update_aggr_counter({c,g,Ctr}, Incr)],
          {reply, New, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
      %% Names and aggregated counters are stored under {Key, Type},
      %% everything else under {Key, Pid}.
      Key = if T == n; T == a -> {K,T};
               true -> {K, Pid}
            end,
      case ets:member(?TAB, Key) of
          true ->
              gproc_lib:remove_reg(K, Pid),
              if T == c ->
                      case ets:lookup(?TAB, {{a,g,Name},a}) of
                          [Aggr] ->
                              %% updated by remove_reg/2
                              {reply, true, [{delete,[{K,Pid}]},
                                             {insert, [Aggr]}], S};
                          [] ->
                              {reply, true, [{delete, [{K, Pid}]}], S}
                      end;
                 true ->
                      {reply, true, [{delete, [{K,Pid}]}], S}
              end;
          false ->
              {reply, badarg, S}
      end;
  handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
      if T==p; T==n ->
              try gproc_lib:insert_many(T, g, Pid, L) of
                  {true,Objs} -> {reply, true, [{insert,Objs}], S};
                  false       -> {reply, badarg, S}
              catch
                  error:_     -> {reply, badarg, S}
              end;
         true -> {reply, badarg, S}
      end;
  handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
      if T == a, is_integer(V) ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true  -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
                  false -> {reply, badarg, S}
              end;
         T == a ->
              %% Non-integer value for an aggregated counter. Reply badarg
              %% rather than crashing the leader with if_clause.
              {reply, badarg, S};
         T == c ->
              try gproc_lib:do_set_counter_value(K, V, Pid),
                  AKey = {{a,g,N},a},
                  Aggr = ets:lookup(?TAB, AKey),  % may be []
                  {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
              catch
                  error:_ ->
                      {reply, badarg, S}
              end;
         true ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true ->
                      Obj = if T==n -> {{K, T}, Pid, V};
                               true -> {{K, Pid}, Pid, V}
                            end,
                      {reply, true, [{insert,[Obj]}], S};
                  false ->
                      {reply, badarg, S}
              end
      end;
  handle_leader_call({await, Key, Pid}, {_,Ref} = _From, S, _E) ->
      %% The pid in _From is of the gen_leader instance that forwarded the
      %% call - not of the client. This is why the Pid is explicitly passed.
      case gproc_lib:await(Key, {Pid,Ref}) of
          noreply ->
              {noreply, S};
          {reply, Reply, Insert} ->
              {reply, Reply, [{insert, Insert}], S}
      end;
  handle_leader_call(_, _, S, _E) ->
      {reply, badarg, S}.
  %% @doc gen_leader callback: handles casts sent to the leader.
  %%
  %%   {add_globals, Missing}    - audit message: a peer (non-leader) had
  %%       info about granted global resources that we did not know of
  %%       when we became leader (possible due to a race when the old
  %%       leader died). The objects are inserted locally and returned as
  %%       an `insert' operation.
  %%   {remove_globals, Globals} - the listed `{Key, Pid}' entries are
  %%       deleted from the local table; nothing is returned besides the
  %%       state.
  %%   {pid_is_DOWN, Pid}        - collects the global keys recorded for
  %%       `Pid' in the reverse-mapping entries `{{Pid, Key}, r}', deletes
  %%       the object stored under key `Pid', and removes the
  %%       registrations through process_globals/1. Resulting operations,
  %%       if any, are returned next to the state.
  handle_leader_cast({add_globals, Missing}, S, _E) ->
      ets:insert(?TAB, Missing),
      Ops = [{insert, Missing}],
      {ok, Ops, S};
  handle_leader_cast({remove_globals, Globals}, S, _E) ->
      delete_globals(Globals),
      {ok, S};
  handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
      ReverseEntry = {{Pid, '$1'}, r},
      HasGlobalScope = {'==', {element, 2, '$1'}, g},
      Globals = ets:select(?TAB, [{ReverseEntry,
                                   [HasGlobalScope],
                                   [{{'$1', Pid}}]}]),
      io:fwrite("pid_is_DOWN(~p); Globals = ~p~n", [Pid,Globals]),
      ets:delete(?TAB, Pid),
      Broadcast = process_globals(Globals),
      if Broadcast =:= [] ->
              {ok, S};
         true ->
              {ok, Broadcast, S}
      end.
  %% Removes the given `{Key, Pid}' registrations from the local table and
  %% returns the list of operations describing the change.
  %%
  %% For each entry, in order:
  %%   - if it is a counter, its current value is read and subtracted from
  %%     the matching aggregated counter (update_aggr_counter/2); the
  %%     updated aggregate object, if any, is collected;
  %%   - the registration object and the reverse mapping `{Pid, Key}' are
  %%     deleted.
  %% The result contains `{insert, UpdatedAggregates}' and
  %% `{delete, Globals}', leaving out whichever of the two has an empty
  %% list.
  process_globals(Globals) ->
      AggrUpdates =
          fun(c, Key, Pid) ->
                  Value = ets:lookup_element(?TAB, {Key, Pid}, 3),
                  update_aggr_counter(Key, -Value);
             (_, _, _) ->
                  []
          end,
      RemoveOne =
          fun({{Type, _, _} = Key, Pid}, Acc) ->
                  Updated = AggrUpdates(Type, Key, Pid),
                  ets:delete(?TAB, ets_key(Key, Pid)),
                  ets:delete(?TAB, {Pid, Key}),
                  Updated ++ Acc
          end,
      Modified = lists:foldl(RemoveOne, [], Globals),
      lists:filter(fun({_Op, Objs}) -> Objs =/= [] end,
                   [{insert, Modified}, {delete, Globals}]).
  %% @doc gen_leader callback for code upgrades. The state needs no
  %% conversion and is returned unchanged.
  code_change(_OldVsn, State, _Extra, _Election) ->
      {ok, State}.
  %% @doc gen_leader callback on shutdown. Nothing to clean up.
  terminate(_Why, _State) ->
      ok.
  %% @doc gen_leader callback: applies a list of operations received from
  %% the leader to the local table.
  %%
  %%   {delete, Globals} - `{Key, Pid}' entries, removed via
  %%                       delete_globals/1.
  %%   {insert, Globals} - objects inserted as they are. In addition, for
  %%       every global registration object a reverse mapping
  %%       `{{Pid, Key}, r}' is inserted and gproc_lib:ensure_monitor/2 is
  %%       called for its pid; for a reverse-mapping object only
  %%       ensure_monitor/2 is called. Other objects need no extra work.
  %% The state is returned unchanged.
  from_leader(Ops, S, _E) ->
      Track =
          fun({{{_, g, _} = Key, _}, P, _}) ->
                  ets:insert(?TAB, {{P, Key}, r}),
                  gproc_lib:ensure_monitor(P, g);
             ({{P, _K}, r}) ->
                  gproc_lib:ensure_monitor(P, g);
             (_) ->
                  skip
          end,
      Apply =
          fun({delete, Globals}) ->
                  delete_globals(Globals);
             ({insert, Globals}) ->
                  ets:insert(?TAB, Globals),
                  lists:foreach(Track, Globals)
          end,
      lists:foreach(Apply, Ops),
      {ok, S}.
  %% Deletes each `{Key, Pid}' registration from the local table: the
  %% registration object itself (stored under ets_key(Key, Pid)) and the
  %% reverse mapping `{Pid, Key}'. When the pid lives on this node, the
  %% object stored under key `Pid' is deleted as well.
  delete_globals(Globals) ->
      ThisNode = node(),
      DeleteOne =
          fun({Key, Pid}) ->
                  ets:delete(?TAB, ets_key(Key, Pid)),
                  ets:delete(?TAB, {Pid, Key}),
                  case node(Pid) of
                      ThisNode -> ets:delete(?TAB, Pid);
                      _ -> ok
                  end
          end,
      lists:foreach(DeleteOne, Globals).
  %% Builds the table key under which a registration is stored.
  %% Unique names (`n') and aggregated counters (`a') are stored under
  %% `{Key, Type}'; every other key is stored under `{Key, Pid}'.
  ets_key(K, Pid) ->
      case K of
          {n, _, _} -> {K, n};
          {a, _, _} -> {K, a};
          _ -> {K, Pid}
      end.
  %% @doc Sends `Req' to the leader through gen_leader:leader_call/2 and
  %% returns the reply.
  %%
  %% A `badarg' reply is converted into a `badarg' error exception in the
  %% calling process. The second argument of erlang:error/2 is the
  %% argument list of the current function, hence `[Req]' rather than the
  %% bare request term.
  leader_call(Req) ->
      case gen_leader:leader_call(?MODULE, Req) of
          badarg -> erlang:error(badarg, [Req]);
          Reply  -> Reply
      end.
  %% @doc Sends `Msg' to the leader through gen_leader:leader_cast/2,
  %% addressed by this module's name, and returns its result.
  leader_cast(Msg) ->
      Server = ?MODULE,
      gen_leader:leader_cast(Server, Msg).
  %% @doc gen_leader callback: builds the initial state.
  %% `Opts' is read as a property list; `always_broadcast' overrides the
  %% record's default for that field when present.
  init(Opts) ->
      Default = (#state{})#state.always_broadcast,
      AlwaysBcast = proplists:get_value(always_broadcast, Opts, Default),
      {ok, #state{always_broadcast = AlwaysBcast}}.
  %% Reconciles the local table with the leader's list of global objects.
  %%
  %% Steps:
  %%   1. Remember the global objects whose pid lives on this node.
  %%   2. Delete all global objects whose pid lives on another node.
  %%   3. Insert the leader's objects for remote pids, each with its
  %%      reverse mapping, and collect the leader's view of this node's
  %%      globals.
  %%   4. Local globals the leader does not know about are reported with
  %%      an `add_globals' cast; globals the leader attributes to this
  %%      node that are not in the local table are reported with a
  %%      `remove_globals' cast.
  %%
  %% The reverse mapping is written as `{{Pid, Key}, r}', the shape used by
  %% from_leader/3 and matched by the `pid_is_DOWN' cleanup in
  %% handle_leader_cast/3. The earlier 1-tuple `{{Pid, Key}}' was never
  %% found by that cleanup.
  surrendered_1(Globs) ->
      My_local_globs =
          ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '_'},
                             [{'==', {node,'$1'}, node()}],
                             ['$_']}]),
      %% remove all remote globals - we don't have monitors on them.
      ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'}, '$1', '_'},
                                [{'=/=', {node,'$1'}, node()}],
                                [true]}]),
      %% insert new non-local globals, collect the leader's version of
      %% what my globals are
      Ldr_local_globs =
          lists:foldl(
            fun({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
                    ets:insert(?TAB, [{K, Pid, V}, {{Pid,Key},r}]),
                    Acc;
               ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
                    [Obj|Acc]
            end, [], Globs),
      case [{K,P,V} || {K,P,V} <- My_local_globs,
                       not(lists:keymember(K, 1, Ldr_local_globs))] of
          [] ->
              %% phew! We have the same picture
              ok;
          [_|_] = Missing ->
              %% This is very unlikely, I think
              leader_cast({add_globals, Missing})
      end,
      case [{K,P} || {K,P,_} <- Ldr_local_globs,
                     not(lists:keymember(K, 1, My_local_globs))] of
          [] ->
              ok;
          [_|_] = Remove ->
              leader_cast({remove_globals, Remove})
      end.
  %% Adds `Incr' to the aggregated counter that shares its name with the
  %% global counter `{c, g, Ctr}'.
  %% Returns a one-element list holding the updated aggregate object, or
  %% `[]' when no aggregated counter with that name is registered.
  update_aggr_counter({c, g, Ctr}, Incr) ->
      AggrKey = {{a, g, Ctr}, a},
      case ets:lookup(?TAB, AggrKey) of
          [{K, Pid, Prev}] ->
              Updated = {K, Pid, Prev + Incr},
              ets:insert(?TAB, Updated),
              [Updated];
          [] ->
              []
      end.
  375. end.