%% gproc_dist.erl (18 KB)


  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@erlang-solutions.com>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_dist).
  23. -behaviour(gen_leader).
  24. -export([start_link/0, start_link/1,
  25. reg/1, reg/2, unreg/1,
  26. mreg/2,
  27. munreg/2,
  28. set_value/2,
  29. give_away/2,
  30. update_counter/2]).
  31. -export([leader_call/1,
  32. leader_cast/1,
  33. sync/0,
  34. get_leader/0]).
  35. %%% internal exports
  36. -export([init/1,
  37. handle_cast/3,
  38. handle_call/4,
  39. handle_info/2,
  40. handle_leader_call/4,
  41. handle_leader_cast/3,
  42. handle_DOWN/3,
  43. elected/2, % original version
  44. elected/3,
  45. surrendered/3,
  46. from_leader/3,
  47. code_change/4,
  48. terminate/2]).
  49. -include("gproc.hrl").
  50. -define(SERVER, ?MODULE).
  %% Callback state kept by the gproc_dist gen_leader instance.
  -record(state, {
            %% When false, elected/3 answers a newly joined node with a
            %% `reply' tuple instead of broadcasting the globals to everyone.
            always_broadcast = false,
            %% Set to true in elected/2,3 and to false in surrendered/3.
            is_leader,
            %% Pending sync/0 requests: a list of {From, Nodes} where Nodes
            %% are the peers that have not yet answered (or gone down).
            sync_requests = []}).
  55. %% ==========================================================
  56. %% Start functions
  %% @doc Starts the server with the local node plus every node currently
  %% listed by nodes(), and an empty option list.
  start_link() ->
      NodeList = [node() | nodes()],
      start_link({NodeList, []}).
  %% @doc Starts the server through gen_leader:start_link/6.
  %% The argument is one of:
  %%   `all'           - the local node plus every node listed by nodes()
  %%   a list          - used as the node list, with no options
  %%   `{Nodes, Opts}' - node list and options, passed on unchanged
  %% The callback module's init/1 always receives `[]'.
  start_link(all) ->
      NodeList = [node() | nodes()],
      start_link({NodeList, []});
  start_link(NodeList) when is_list(NodeList) ->
      start_link({NodeList, []});
  start_link({NodeList, LeaderOpts}) ->
      gen_leader:start_link(?SERVER, NodeList, LeaderOpts, ?MODULE, [], []).
  66. %% ==========================================================
  67. %% API
  %% @doc Registers Key with the default value returned by gproc:default/1.
  %% See also gproc:reg/1.
  reg(Key) ->
      DefaultValue = gproc:default(Key),
      reg(Key, DefaultValue).
  %% @doc Registers a global key for the calling process via the leader.
  %%
  %% Key has the form {Class, Scope, Name}:
  %%   Class = n - unique name
  %%         | p - non-unique property
  %%         | c - counter
  %%         | a - aggregated counter
  %%   Scope = l | g (local or global)
  %%
  %% Only keys whose scope is `g' are accepted here; any other argument
  %% raises error:badarg.
  %% @end
  reg(Key, Value) ->
      case Key of
          {_, g, _} ->
              %% anything global goes through the leader
              leader_call({reg, Key, Value, self()});
          _ ->
              erlang:error(badarg)
      end.
  %% @doc Registers several global keys of class T in one leader call.
  %% KVL must be a list; anything else raises error:badarg.
  mreg(T, KVL) when is_list(KVL) ->
      leader_call({mreg, T, g, KVL, self()});
  mreg(_, _) ->
      erlang:error(badarg).
  %% @doc Unregisters several global keys of class T in one leader call.
  %% Keys must be a list; anything else raises error:badarg.
  munreg(T, Keys) when is_list(Keys) ->
      leader_call({munreg, T, g, Keys, self()});
  munreg(_, _) ->
      erlang:error(badarg).
  %% @doc Unregisters a global key on behalf of the calling process.
  %% Keys that do not have scope `g' raise error:badarg.
  unreg(Key) ->
      case Key of
          {_, g, _} -> leader_call({unreg, Key, self()});
          _ -> erlang:error(badarg)
      end.
  %% @doc Sets the value of a global key owned by the calling process.
  %%
  %% For counters (`c') and aggregated counters (`a') the value must be an
  %% integer. Non-global keys and non-integer counter values raise
  %% error:badarg.
  %%
  %% The request always carries the caller's pid: the leader only matches
  %% the 4-tuple {set, Key, Value, Pid}, so a request without the pid would
  %% fall through to the catch-all clause there and always fail with badarg.
  set_value({T, g, _} = Key, Value) when T == a; T == c ->
      if is_integer(Value) ->
              leader_call({set, Key, Value, self()});
         true ->
              erlang:error(badarg)
      end;
  set_value({_, g, _} = Key, Value) ->
      leader_call({set, Key, Value, self()});
  set_value(_, _) ->
      erlang:error(badarg).
  %% @doc Asks the leader to transfer the global key Key from the calling
  %% process to To. To is handed to the leader as-is; the leader resolves it
  %% with pid_to_give_away_to/1 (a pid, or an `n'/`a' key). The leader only
  %% handles keys of class `n' and `a' and answers badarg for the rest.
  give_away({_, g, _} = Key, To) ->
      Request = {give_away, Key, To, self()},
      leader_call(Request).
  %% @doc Adds Incr to the global counter Key owned by the calling process.
  %% The leader replies with the new counter value. Anything other than a
  %% {c, g, Name} key with an integer increment raises error:badarg.
  update_counter(Key, Incr) ->
      case Key of
          {c, g, _} when is_integer(Incr) ->
              leader_call({update_counter, Key, Incr, self()});
          _ ->
              erlang:error(badarg)
      end.
  %% @spec sync() -> true
  %% @doc Synchronize with the gproc leader.
  %%
  %% Use this to make sure that data has been replicated from the leader to
  %% the current node. The leader pings every live participating node and the
  %% call returns `true' once each of them has either answered or died.
  %%
  %% If the leader itself dies while a sync is in progress, the call fails
  %% with a timeout exception. (It really ought to be a `leader_died'
  %% exception; why gen_leader times out here instead of reporting that the
  %% leader died still needs investigating.)
  %% @end
  %%
  sync() ->
      Request = sync,
      leader_call(Request).
  %% @spec get_leader() -> node()
  %% @doc Returns the node on which the current gproc leader runs.
  %% The question is put to the gen_leader instance registered as this module.
  %% @end
  get_leader() ->
      Server = ?MODULE,
      gen_leader:call(Server, get_leader).
  133. %% ==========================================================
  134. %% Server-side
  %% @doc gen_leader callback. No local casts are supported: any cast stops
  %% the server with reason `unknown_cast'.
  handle_cast(_Request, State, _Election) ->
      {stop, unknown_cast, State}.
  %% @doc gen_leader callback for local (non-leader) calls.
  %% `get_leader' is answered with gen_leader:leader_node/1 applied to the
  %% election argument; every other request is answered with `badarg'.
  handle_call(Request, _From, State, Election) ->
      case Request of
          get_leader ->
              {reply, gen_leader:leader_node(Election), State};
          _ ->
              {reply, badarg, State}
      end.
  %% @doc gen_leader callback for plain messages.
  %% A process 'DOWN' message is forwarded to the leader as
  %% {pid_is_DOWN, Pid}; all other messages are ignored.
  handle_info(Info, State) ->
      case Info of
          {'DOWN', _MRef, process, Pid, _Reason} ->
              leader_cast({pid_is_DOWN, Pid});
          _ ->
              ignored
      end,
      {ok, State}.
  %% @doc gen_leader callback (original two-argument version).
  %% Marks this instance as leader and returns every global object in the
  %% table as the synchronisation payload.
  elected(S, _E) ->
      Synch = {globals, globs()},
      {ok, Synch, S#state{is_leader = true}}.
  %% @doc gen_leader callback (three-argument version).
  %% With `undefined' as the third argument this instance has just become
  %% leader: flag it and return the full set of globals for a full synch.
  %% Otherwise another node has recognised us as leader; the globals are
  %% returned in a `reply' tuple unless always_broadcast is set, in which
  %% case an `ok' tuple is returned instead.
  elected(S, _E, undefined) ->
      %% I have become leader; full synch
      Synch = {globals, globs()},
      {ok, Synch, S#state{is_leader = true}};
  elected(S, _E, _Node) ->
      Synch = {globals, globs()},
      if S#state.always_broadcast =:= false ->
              %% Another node recognized us as the leader.
              %% Don't broadcast all data to everyone else
              {reply, Synch, S};
         true ->
              %% Main reason for doing this is if we are using a gen_leader
              %% that doesn't support the 'reply' return value
              {ok, Synch, S}
      end.
  %% @doc Returns every three-element object in the table whose key is
  %% {{_, g, _}, _}, i.e. all globally scoped registrations.
  globs() ->
      GlobalObject = {{{'_', g, '_'}, '_'}, '_', '_'},
      MatchSpec = [{GlobalObject, [], ['$_']}],
      ets:select(?TAB, MatchSpec).
  %% @doc gen_leader callback invoked with the leader's synchronisation
  %% payload. Reconciles the local table with the leader's globals through
  %% surrendered_1/1 and clears the is_leader flag.
  surrendered(S, {globals, LeaderGlobs}, _E) ->
      %% globals from this node should be more correct in our table than
      %% in the leader's
      surrendered_1(LeaderGlobs),
      NewState = S#state{is_leader = false},
      {ok, NewState}.
  %% @doc gen_leader callback invoked when Node goes down.
  %% Drops Node from all pending sync requests, then removes every global
  %% object whose owning pid lives on Node. If anything was removed, the
  %% resulting insert/delete operations are returned for broadcast.
  handle_DOWN(Node, S, _E) ->
      S1 = check_sync_requests(Node, S),
      MatchHead = {{{'_', g, '_'}, '_'}, '$1', '_'},
      OnDeadNode = {'==', {node, '$1'}, Node},
      %% Result is {Key, Pid}: the gproc key out of the ets key, plus the owner
      KeyAndPid = {{{element, 1, {element, 1, '$_'}}, {element, 2, '$_'}}},
      Orphans = ets:select(?TAB, [{MatchHead, [OnDeadNode], [KeyAndPid]}]),
      case process_globals(Orphans) of
          [] -> {ok, S1};
          Broadcast -> {ok, Broadcast, S1}
      end.
  %% @doc Removes Node from the outstanding node list of every pending sync
  %% request. A request whose list becomes empty is answered with `true'
  %% and dropped; the others are kept, in their original order.
  check_sync_requests(Node, #state{sync_requests = SReqs} = S) ->
      Step = fun({From, Ns}, Acc) ->
                     case Ns -- [Node] of
                         [] ->
                             gen_leader:reply(From, {leader, reply, true}),
                             Acc;
                         StillWaiting ->
                             [{From, StillWaiting} | Acc]
                     end
             end,
      Pending = lists:reverse(lists:foldl(Step, [], SReqs)),
      S#state{sync_requests = Pending}.
  %% @doc gen_leader callback: handles a request that was routed to the leader.
  %%
  %% Each clause applies the change to the leader's own table and, where the
  %% other nodes have to follow, returns a list of `{insert, Objects}' and
  %% `{delete, Keys}' operations (the shapes that from_leader/3 applies).
  %% A `badarg' reply is turned into an exception by leader_call/1.
  %%
  %% Requests:
  %%   sync                              reply `true' once all live peers acked
  %%   {reg, Key, Value, Pid}            register Key for Pid
  %%   {update_counter, Key, Incr, Pid}  reply with the new counter value
  %%   {unreg, Key, Pid}                 unregister Key
  %%   {give_away, Key, To, Pid}         classes n and a only
  %%   {mreg, T, g, KVL, Pid}            classes p and n only
  %%   {munreg, T, g, Keys, Pid}
  %%   {set, Key, Value, Pid}
  %%   {await, Key, Pid}
  %% Anything else is answered with `badarg'.
  handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
      case gen_leader:alive(E) -- [node()] of
          [] ->
              {reply, true, S};
          Alive ->
              %% The reply is deferred until every node in Alive has sent a
              %% sync_reply (handle_leader_cast/3) or gone down (handle_DOWN/3).
              gen_leader:broadcast({from_leader, {sync, From}}, Alive, E),
              {noreply, S#state{sync_requests = [{From, Alive} | SReqs]}}
      end;
  handle_leader_call({reg, {C, g, Name} = K, Value, Pid}, _From, S, _E) ->
      case gproc_lib:insert_reg(K, Value, Pid, g) of
          false ->
              {reply, badarg, S};
          true ->
              _ = gproc_lib:ensure_monitor(Pid, g),
              Vals =
                  if C == a ->
                          ets:lookup(?TAB, {K, a});
                     C == c ->
                          %% also ship the aggregated counter, if there is one
                          [{{K, Pid}, Pid, Value}
                           | ets:lookup(?TAB, {{a, g, Name}, a})];
                     C == n ->
                          [{{K, n}, Pid, Value}];
                     true ->
                          [{{K, Pid}, Pid, Value}]
                  end,
              {reply, true, [{insert, Vals}], S}
      end;
  handle_leader_call({update_counter, {c, g, _Ctr} = Key, Incr, Pid}, _From, S, _E)
    when is_integer(Incr) ->
      try New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
           Vals = [{{Key, Pid}, Pid, New} | update_aggr_counter(Key, Incr)],
           {reply, New, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  handle_leader_call({unreg, {T, g, Name} = K, Pid}, _From, S, _E) ->
      Key = if T == n; T == a -> {K, T};
               true -> {K, Pid}
            end,
      case ets:member(?TAB, Key) of
          true ->
              _ = gproc_lib:remove_reg(K, Pid),
              if T == c ->
                      case ets:lookup(?TAB, {{a, g, Name}, a}) of
                          [Aggr] ->
                              %% updated by remove_reg/2
                              {reply, true, [{delete, [Key, {Pid, K}]},
                                             {insert, [Aggr]}], S};
                          [] ->
                              {reply, true, [{delete, [Key, {Pid, K}]}], S}
                      end;
                 true ->
                      %% Broadcast {K, Pid} rather than the ets key: for that
                      %% shape delete_globals/1 removes both the object and
                      %% the {Pid, K} reverse entry, so followers do not keep
                      %% a stale reverse mapping for names and aggregates.
                      {reply, true, [{delete, [{K, Pid}]}], S}
              end;
          false ->
              {reply, badarg, S}
      end;
  handle_leader_call({give_away, {T, g, _} = K, To, Pid}, _From, S, _E)
    when T == a; T == n ->
      Key = {K, T},
      case ets:lookup(?TAB, Key) of
          [{_, Pid, Value}] ->
              case pid_to_give_away_to(To) of
                  Pid ->
                      %% giving to oneself: nothing to do
                      {reply, Pid, S};
                  ToPid when is_pid(ToPid) ->
                      ets:insert(?TAB, [{Key, ToPid, Value},
                                        {{ToPid, K}, r}]),
                      %% Drop the previous owner's reverse entry here as well.
                      %% Otherwise, when the old owner dies, pid_is_DOWN would
                      %% find it and delete a key that now belongs to ToPid.
                      ets:delete(?TAB, {Pid, K}),
                      _ = gproc_lib:ensure_monitor(ToPid, g),
                      {reply, ToPid, [{delete, [Key, {Pid, K}]},
                                      {insert, [{Key, ToPid, Value}]}], S};
                  undefined ->
                      ets:delete(?TAB, Key),
                      ets:delete(?TAB, {Pid, K}),
                      {reply, undefined, [{delete, [Key, {Pid, K}]}], S}
              end;
          _ ->
              %% Key is not registered, or not owned by the caller
              {reply, badarg, S}
      end;
  handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
      if T == p; T == n ->
              try gproc_lib:insert_many(T, g, L, Pid) of
                  {true, Objs} -> {reply, true, [{insert, Objs}], S};
                  false -> {reply, badarg, S}
              catch
                  error:_ -> {reply, badarg, S}
              end;
         true -> {reply, badarg, S}
      end;
  handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
      try gproc_lib:remove_many(T, g, L, Pid) of
          [] ->
              {reply, true, S};
          Objs ->
              {reply, true, [{delete, Objs}], S}
      catch
          error:_ -> {reply, badarg, S}
      end;
  handle_leader_call({set, {T, g, N} = K, V, Pid}, _From, S, _E) ->
      if T == a, is_integer(V) ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true -> {reply, true, [{insert, [{{K, T}, Pid, V}]}], S};
                  false -> {reply, badarg, S}
              end;
         T == a ->
              %% A non-integer value for an aggregated counter is a client
              %% error; answer badarg instead of crashing the leader.
              {reply, badarg, S};
         T == c ->
              try gproc_lib:do_set_counter_value(K, V, Pid),
                   AKey = {{a, g, N}, a},
                   Aggr = ets:lookup(?TAB, AKey),  % may be []
                   {reply, true, [{insert, [{{K, Pid}, Pid, V} | Aggr]}], S}
              catch
                  error:_ ->
                      {reply, badarg, S}
              end;
         true ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true ->
                      Obj = if T == n -> {{K, T}, Pid, V};
                               true -> {{K, Pid}, Pid, V}
                            end,
                      {reply, true, [{insert, [Obj]}], S};
                  false ->
                      {reply, badarg, S}
              end
      end;
  handle_leader_call({await, Key, Pid}, {_, Ref} = From, S, _E) ->
      %% The pid in From is of the gen_leader instance that forwarded the
      %% call - not of the client. This is why the Pid is explicitly passed.
      case gproc_lib:await(Key, Pid, From) of
          {reply, {Ref, {K, P, V}}} ->
              {reply, {Ref, {K, P, V}}, S};
          {reply, Reply, Insert} ->
              {reply, Reply, [{insert, Insert}], S}
      end;
  handle_leader_call(_, _, S, _E) ->
      {reply, badarg, S}.
  %% @doc gen_leader callback: handles casts that were routed to the leader.
  %%
  %%   {sync_reply, Node, Ref}   Node has acknowledged the sync request Ref
  %%   {add_globals, Objects}    a peer knows of globals that we are missing
  %%   {remove_globals, Keys}    a peer reports globals that we should drop
  %%   {pid_is_DOWN, Pid}        a monitored process has died
  handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
      #state{sync_requests = Pending} = S,
      Outstanding =
          case lists:keyfind(Ref, 1, Pending) of
              false -> unknown_request;
              {_, Ns} -> lists:delete(Node, Ns)
          end,
      case Outstanding of
          unknown_request ->
              %% This should never happen, except perhaps if the leader who
              %% received the sync request died, and the new leader gets the
              %% sync reply. In that case, we trust that the client has been
              %% notified anyway, and ignore the message.
              {ok, S};
          [] ->
              %% the last outstanding node has answered
              gen_leader:reply(Ref, {leader, reply, true}),
              {ok, S#state{sync_requests = lists:keydelete(Ref, 1, Pending)}};
          StillWaiting ->
              Pending1 = lists:keyreplace(Ref, 1, Pending, {Ref, StillWaiting}),
              {ok, S#state{sync_requests = Pending1}}
      end;
  handle_leader_cast({add_globals, Missing}, S, _E) ->
      %% This is an audit message: a peer (non-leader) had info about granted
      %% global resources that we didn't know of when we became leader.
      %% This could happen due to a race condition when the old leader died.
      ets:insert(?TAB, Missing),
      Broadcast = [{insert, Missing}],
      {ok, Broadcast, S};
  handle_leader_cast({remove_globals, Globals}, S, _E) ->
      delete_globals(Globals),
      {ok, S};
  handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
      %% Find the global keys of Pid through its reverse entries {{Pid,Key},r}
      ReverseEntry = {{Pid, '$1'}, r},
      IsGlobal = {'==', {element, 2, '$1'}, g},
      KeyAndPid = {{'$1', Pid}},
      Globals = ets:select(?TAB, [{ReverseEntry, [IsGlobal], [KeyAndPid]}]),
      ets:delete(?TAB, {Pid, g}),
      case process_globals(Globals) of
          [] -> {ok, S};
          Broadcast -> {ok, Broadcast, S}
      end.
  %% @doc Removes the given {Key, Pid} registrations from the local table.
  %% For a counter, its current value is first subtracted from the matching
  %% aggregated counter (if one exists). Returns the operations to
  %% broadcast: `{insert, UpdatedAggregates}' and `{delete, Globals}', each
  %% one left out when its list is empty.
  process_globals(Globals) ->
      Remove =
          fun({{Class, _, _} = Key, Pid}, Acc) ->
                  Acc1 =
                      if Class =:= c ->
                              Value = ets:lookup_element(?TAB, {Key, Pid}, 3),
                              update_aggr_counter(Key, -Value) ++ Acc;
                         true ->
                              Acc
                      end,
                  ets:delete(?TAB, ets_key(Key, Pid)),
                  ets:delete(?TAB, {Pid, Key}),
                  Acc1
          end,
      Modified = lists:foldl(Remove, [], Globals),
      Candidates = [{insert, Modified}, {delete, Globals}],
      [Operation || {_, Objs} = Operation <- Candidates, Objs =/= []].
  %% @doc gen_leader callback for code upgrades. The state is kept unchanged.
  code_change(_OldVsn, State, _Extra, _Election) ->
      {ok, State}.
  %% @doc gen_leader callback invoked on shutdown. There is nothing to clean up.
  terminate(_Why, _State) ->
      ok.
  %% @doc gen_leader callback: applies a message sent by the leader.
  %% `{sync, Ref}' is acknowledged with a sync_reply cast to the leader.
  %% Anything else is taken to be a list of `{delete, Keys}' and
  %% `{insert, Objects}' operations, applied in order. For every inserted
  %% global object the reverse entry {{Pid, Key}, r} is written and a
  %% monitor on Pid is ensured.
  from_leader({sync, Ref}, S, _E) ->
      gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
      {ok, S};
  from_leader(Ops, S, _E) ->
      Track =
          fun({{{_, g, _} = Key, _}, P, _}) ->
                  ets:insert(?TAB, {{P, Key}, r}),
                  gproc_lib:ensure_monitor(P, g);
             ({{P, _K}, r}) ->
                  gproc_lib:ensure_monitor(P, g);
             (_) ->
                  skip
          end,
      Apply =
          fun({delete, Globals}) ->
                  delete_globals(Globals);
             ({insert, Globals}) ->
                  ets:insert(?TAB, Globals),
                  lists:foreach(Track, Globals)
          end,
      lists:foreach(Apply, Ops),
      {ok, S}.
  %% @doc Deletes a list of global entries from the local table. Each
  %% element is one of:
  %%   {{Class, g, Name}, Atom}  an ets key, deleted as it stands
  %%   {Key, Pid}                deletes the object for Key and the reverse
  %%                             entry {Pid, Key}
  %%   {Pid, Key}                deletes only the reverse entry
  delete_globals(Globals) ->
      Delete =
          fun({{_, g, _}, Tag} = EtsKey) when is_atom(Tag) ->
                  ets:delete(?TAB, EtsKey);
             ({Key, Pid}) when is_pid(Pid) ->
                  ets:delete(?TAB, ets_key(Key, Pid)),
                  ets:delete(?TAB, {Pid, Key});
             ({Pid, Key}) when is_pid(Pid) ->
                  ets:delete(?TAB, {Pid, Key})
          end,
      lists:foreach(Delete, Globals).
  %% @doc Builds the ets key under which a registration is stored.
  %% Names (`n') and aggregated counters (`a') are keyed on {Key, Class};
  %% everything else is keyed on {Key, Pid}.
  ets_key({Class, _, _} = Key, Pid) ->
      case Class of
          n -> {Key, Class};
          a -> {Key, Class};
          _ -> {Key, Pid}
      end;
  ets_key(Key, Pid) ->
      {Key, Pid}.
  %% @doc Sends Req to the leader and returns its reply.
  %% A `badarg' reply is raised as error:badarg in the calling process.
  %% The second argument of erlang:error/2 has to be the argument list of
  %% the current function, so the request is wrapped in a list.
  leader_call(Req) ->
      case gen_leader:leader_call(?MODULE, Req) of
          badarg -> erlang:error(badarg, [Req]);
          Reply -> Reply
      end.
  %% @doc Sends Msg to the leader as a cast, through the gen_leader instance
  %% registered as this module.
  leader_cast(Msg) ->
      Server = ?MODULE,
      gen_leader:leader_cast(Server, Msg).
  %% @doc gen_leader callback: builds the initial state.
  %% The only option read is `always_broadcast'; when it is absent the
  %% default declared in the state record is used.
  init(Opts) ->
      #state{always_broadcast = Default} = #state{},
      AlwaysBcast = proplists:get_value(always_broadcast, Opts, Default),
      {ok, #state{always_broadcast = AlwaysBcast}}.
  %% @doc Reconciles the local table with the leader's list of globals.
  %%
  %% Steps:
  %%   1. collect the globals owned by processes on this node;
  %%   2. delete every global owned by a process on another node;
  %%   3. insert the leader's view of the remote globals, and collect what
  %%      the leader believes are this node's globals;
  %%   4. tell the leader about local globals it is missing (add_globals)
  %%      and about globals it has for this node that no longer exist here
  %%      (remove_globals).
  %%
  %% The reverse entry is written as {{Pid, Key}, r}, the same shape that
  %% from_leader/3 writes and that the pid_is_DOWN handler selects on.
  surrendered_1(Globs) ->
      My_local_globs =
          ets:select(?TAB, [{{{{'_', g, '_'}, '_'}, '$1', '_'},
                             [{'==', {node, '$1'}, node()}],
                             ['$_']}]),
      %% remove all remote globals - we don't have monitors on them.
      ets:select_delete(?TAB, [{{{{'_', g, '_'}, '_'}, '$1', '_'},
                                [{'=/=', {node, '$1'}, node()}],
                                [true]}]),
      %% insert new non-local globals, collect the leader's version of
      %% what my globals are
      Ldr_local_globs =
          lists:foldl(
            fun({{Key, _} = K, Pid, V}, Acc) when node(Pid) =/= node() ->
                    ets:insert(?TAB, [{K, Pid, V}, {{Pid, Key}, r}]),
                    Acc;
               ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
                    [Obj | Acc]
            end, [], Globs),
      case [{K, P, V} || {K, P, V} <- My_local_globs,
                         not lists:keymember(K, 1, Ldr_local_globs)] of
          [] ->
              %% phew! We have the same picture
              ok;
          [_ | _] = Missing ->
              %% This is very unlikely, I think
              leader_cast({add_globals, Missing})
      end,
      case [{K, P} || {K, P, _} <- Ldr_local_globs,
                      not lists:keymember(K, 1, My_local_globs)] of
          [] ->
              ok;
          [_ | _] = Remove ->
              leader_cast({remove_globals, Remove})
      end.
  %% @doc Adds Incr to the aggregated counter that has the same name as the
  %% given global counter. Returns a one-element list holding the updated
  %% object, or [] when no such aggregated counter is registered.
  update_aggr_counter({c, g, Ctr}, Incr) ->
      AggrKey = {{a, g, Ctr}, a},
      case ets:lookup(?TAB, AggrKey) of
          [{K, Owner, Prev}] ->
              Updated = {K, Owner, Prev + Incr},
              ets:insert(?TAB, Updated),
              [Updated];
          [] ->
              []
      end.
  %% @doc Resolves the target of a give_away request.
  %% A pid is returned unchanged. A global `n' or `a' key is looked up in
  %% the table, giving the pid registered under it or `undefined' when the
  %% key is not registered.
  pid_to_give_away_to(Target) when is_pid(Target) ->
      Target;
  pid_to_give_away_to({Class, g, _} = Key) when Class == n; Class == a ->
      Found = ets:lookup(?TAB, {Key, Class}),
      case Found of
          [{_, Owner, _}] -> Owner;
          _ -> undefined
      end.