gproc_dist.erl 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_dist).
  23. -behaviour(gen_leader).
  24. -export([start_link/0, start_link/1,
  25. reg/1, reg/2, unreg/1,
  26. reg_or_locate/3,
  27. reg_shared/2, unreg_shared/1,
  28. set_attributes/2,
  29. set_attributes_shared/2,
  30. mreg/2,
  31. munreg/2,
  32. set_value/2,
  33. set_value_shared/2,
  34. give_away/2,
  35. update_counter/3,
  36. update_counters/1,
  37. update_shared_counter/2,
  38. reset_counter/1]).
  39. -export([leader_call/1,
  40. leader_cast/1,
  41. sync/0,
  42. get_leader/0]).
  43. %%% internal exports
  44. -export([init/1,
  45. handle_cast/3,
  46. handle_call/4,
  47. handle_info/2, handle_info/3,
  48. handle_leader_call/4,
  49. handle_leader_cast/3,
  50. handle_DOWN/3,
  51. elected/2, % original version
  52. elected/3,
  53. surrendered/3,
  54. from_leader/3,
  55. code_change/4,
  56. terminate/2]).
  57. -include("gproc_int.hrl").
  58. -include("gproc.hrl").
  59. -define(SERVER, ?MODULE).
  60. -record(state, {
  61. always_broadcast = false,
  62. is_leader,
  63. sync_requests = []}).
  64. %% ==========================================================
  65. %% Start functions
  66. start_link() ->
  67. start_link({[node()|nodes()], []}).
  68. start_link(all) ->
  69. start_link({[node()|nodes()], [{bcast_type, all}]});
  70. start_link(Nodes) when is_list(Nodes) ->
  71. start_link({Nodes, []});
  72. start_link({Nodes, Opts}) ->
  73. SpawnOpts = gproc_lib:valid_opts(server_options, []),
  74. gen_leader:start_link(
  75. ?SERVER, Nodes, Opts, ?MODULE, [], [{spawn_opt, SpawnOpts}]).
  76. %% ==========================================================
  77. %% API
  78. %% {@see gproc:reg/1}
  79. %%
  80. reg(Key) ->
  81. reg(Key, gproc:default(Key)).
  82. %% {@see gproc:reg_or_locate/2}
  83. %%
  84. reg_or_locate({n,g,_} = Key, Value, Pid) when is_pid(Pid) ->
  85. leader_call({reg_or_locate, Key, Value, Pid});
  86. reg_or_locate({n,g,_} = Key, Value, F) when is_function(F, 0) ->
  87. MyGroupLeader = group_leader(),
  88. leader_call({reg_or_locate, Key, Value,
  89. fun() ->
  90. %% leader will spawn on caller's node
  91. group_leader(MyGroupLeader, self()),
  92. F()
  93. end});
  94. reg_or_locate(_, _, _) ->
  95. ?THROW_GPROC_ERROR(badarg).
  96. %%% @spec({Class,Scope, Key}, Value) -> true
  97. %%% @doc
  98. %%% Class = n - unique name
  99. %%% | p - non-unique property
  100. %%% | c - counter
  101. %%% | a - aggregated counter
  102. %%% Scope = l | g (global or local)
  103. %%% @end
  104. reg({_,g,_} = Key, Value) ->
  105. %% anything global
  106. leader_call({reg, Key, Value, self()});
  107. reg(_, _) ->
  108. ?THROW_GPROC_ERROR(badarg).
  109. reg_shared({_,g,_} = Key, Value) ->
  110. leader_call({reg, Key, Value, shared});
  111. reg_shared(_, _) ->
  112. ?THROW_GPROC_ERROR(badarg).
  113. set_attributes({_,g,_} = Key, Attrs) ->
  114. leader_call({set_attributes, Key, Attrs, self()});
  115. set_attributes(_, _) ->
  116. ?THROW_GPROC_ERROR(badarg).
  117. set_attributes_shared({_,g,_} = Key, Attrs) ->
  118. leader_call({set_attributes, Key, Attrs, shared});
  119. set_attributes_shared(_, _) ->
  120. ?THROW_GPROC_ERROR(badarg).
  121. mreg(T, KVL) ->
  122. if is_list(KVL) -> leader_call({mreg, T, g, KVL, self()});
  123. true -> ?THROW_GPROC_ERROR(badarg)
  124. end.
  125. munreg(T, Keys) ->
  126. if is_list(Keys) -> leader_call({munreg, T, g, Keys, self()});
  127. true -> ?THROW_GPROC_ERROR(badarg)
  128. end.
  129. unreg({_,g,_} = Key) ->
  130. leader_call({unreg, Key, self()});
  131. unreg(_) ->
  132. ?THROW_GPROC_ERROR(badarg).
  133. unreg_shared({T,g,_} = Key) when T==c; T==a ->
  134. leader_call({unreg, Key, shared});
  135. unreg_shared(_) ->
  136. ?THROW_GPROC_ERROR(badarg).
  137. set_value({T,g,_} = Key, Value) when T==a; T==c ->
  138. if is_integer(Value) ->
  139. leader_call({set, Key, Value, self()});
  140. true ->
  141. ?THROW_GPROC_ERROR(badarg)
  142. end;
  143. set_value({_,g,_} = Key, Value) ->
  144. leader_call({set, Key, Value, self()});
  145. set_value(_, _) ->
  146. ?THROW_GPROC_ERROR(badarg).
  147. set_value_shared({T,g,_} = Key, Value) when T==a; T==c; T==p ->
  148. leader_call({set, Key, Value, shared});
  149. set_value_shared(_, _) ->
  150. ?THROW_GPROC_ERROR(badarg).
  151. give_away({_,g,_} = Key, To) ->
  152. leader_call({give_away, Key, To, self()}).
  153. update_counter({T,g,_} = Key, Pid, Incr) when is_integer(Incr), T==c;
  154. is_integer(Incr), T==n ->
  155. leader_call({update_counter, Key, Incr, Pid});
  156. update_counter(_, _, _) ->
  157. ?THROW_GPROC_ERROR(badarg).
  158. update_counters(List) when is_list(List) ->
  159. leader_call({update_counters, List});
  160. update_counters(_) ->
  161. ?THROW_GPROC_ERROR(badarg).
  162. update_shared_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
  163. leader_call({update_counter, Key, Incr, shared});
  164. update_shared_counter(_, _) ->
  165. ?THROW_GPROC_ERROR(badarg).
  166. reset_counter({c,g,_} = Key) ->
  167. leader_call({reset_counter, Key, self()});
  168. reset_counter(_) ->
  169. ?THROW_GPROC_ERROR(badarg).
  170. %% @spec sync() -> true
  171. %% @doc Synchronize with the gproc leader
  172. %%
  173. %% This function can be used to ensure that data has been replicated from the
  174. %% leader to the current node. It does so by asking the leader to ping all
  175. %% live participating nodes. The call will return `true' when all these nodes
  176. %% have either responded or died. In the special case where the leader dies
  177. %% during an ongoing sync, the call will fail with a timeout exception.
  178. %% (Actually, it should be a `leader_died' exception; more study needed to find
  179. %% out why gen_leader times out in this situation, rather than reporting that
  180. %% the leader died.)
  181. %% @end
  182. %%
  183. sync() ->
  184. leader_call(sync).
  185. %% @spec get_leader() -> node()
  186. %% @doc Returns the node of the current gproc leader.
  187. %% @end
  188. get_leader() ->
  189. GenLeader = gen_leader,
  190. GenLeader:call(?MODULE, get_leader).
  191. %% ==========================================================
  192. %% Server-side
  193. handle_cast(_Msg, S, _) ->
  194. {stop, unknown_cast, S}.
  195. handle_call(get_leader, _, S, E) ->
  196. {reply, gen_leader:leader_node(E), S};
  197. handle_call(_, _, S, _) ->
  198. {reply, badarg, S}.
  199. handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
  200. ets:delete(?TAB, {Pid, g}),
  201. leader_cast({pid_is_DOWN, Pid}),
  202. {ok, S};
  203. handle_info(_, S) ->
  204. {ok, S}.
  205. handle_info(Msg, S, _E) ->
  206. handle_info(Msg, S).
  207. elected(S, _E) ->
  208. {ok, {globals,globs()}, S#state{is_leader = true}}.
  209. elected(S, _E, undefined) ->
  210. %% I have become leader; full synch
  211. {ok, {globals, globs()}, S#state{is_leader = true}};
  212. elected(S, _E, _Node) ->
  213. Synch = {globals, globs()},
  214. if not S#state.always_broadcast ->
  215. %% Another node recognized us as the leader.
  216. %% Don't broadcast all data to everyone else
  217. {reply, Synch, S};
  218. true ->
  219. %% Main reason for doing this is if we are using a gen_leader
  220. %% that doesn't support the 'reply' return value
  221. {ok, Synch, S}
  222. end.
  223. globs() ->
  224. Gs = ets:select(?TAB, [{{{{'_',g,'_'},'_'},'_','_'},[],['$_']}]),
  225. As = ets:select(?TAB, [{{{'$1',{'_',g,'_'}}, '$2'},[],['$_']}]),
  226. _ = [gproc_lib:ensure_monitor(Pid, g) || {_, Pid, _} <- Gs],
  227. Gs ++ As.
  228. surrendered(#state{is_leader = true} = S, {globals, Globs}, _E) ->
  229. %% Leader conflict!
  230. surrendered_1(Globs),
  231. {ok, S#state{is_leader = false}};
  232. surrendered(S, {globals, Globs}, _E) ->
  233. %% globals from this node should be more correct in our table than
  234. %% in the leader's
  235. surrendered_1(Globs),
  236. {ok, S#state{is_leader = false}}.
  237. handle_DOWN(Node, S, _E) ->
  238. S1 = check_sync_requests(Node, S),
  239. Head = {{{'_',g,'_'},'_'},'$1','_'},
  240. Gs = [{'==', {node,'$1'},Node}],
  241. Globs = ets:select(?TAB, [{Head, Gs, [{{{element,1,{element,1,'$_'}},
  242. {element,2,'$_'}}}]}]),
  243. case process_globals(Globs) of
  244. [] ->
  245. {ok, S1};
  246. Broadcast ->
  247. {ok, Broadcast, S1}
  248. end.
  249. check_sync_requests(Node, #state{sync_requests = SReqs} = S) ->
  250. SReqs1 = lists:flatmap(
  251. fun({From, Ns}) ->
  252. case Ns -- [Node] of
  253. [] ->
  254. gen_leader:reply(From, {leader, reply, true}),
  255. [];
  256. Ns1 ->
  257. [{From, Ns1}]
  258. end
  259. end, SReqs),
  260. S#state{sync_requests = SReqs1}.
  261. handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
  262. GenLeader = gen_leader,
  263. case GenLeader:alive(E) -- [node()] of
  264. [] ->
  265. {reply, true, S};
  266. Alive ->
  267. GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
  268. {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
  269. end;
  270. handle_leader_call({reg, {_C,g,_Name} = K, Value, Pid}, _From, S, _E) ->
  271. case gproc_lib:insert_reg(K, Value, Pid, g) of
  272. false ->
  273. {reply, badarg, S};
  274. true ->
  275. _ = gproc_lib:ensure_monitor(Pid,g),
  276. Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
  277. %% Vals =
  278. %% if C == a ->
  279. %% ets:lookup(?TAB, {K,a});
  280. %% C == c ->
  281. %% [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
  282. %% C == n ->
  283. %% [{{K,n},Pid,Value}];
  284. %% true ->
  285. %% [{{K,Pid},Pid,Value}]
  286. %% end,
  287. {reply, true, [{insert, Vals}], S}
  288. end;
  289. handle_leader_call({set_attributes, {_,g,_} = K, Attrs, Pid}, _From, S, _E) ->
  290. case gproc_lib:insert_attr(K, Attrs, Pid, g) of
  291. false ->
  292. {reply, badarg, S};
  293. NewAttrs when is_list(NewAttrs) ->
  294. {reply, true, [{insert, [{{Pid,K}, NewAttrs}]}], S}
  295. end;
  296. handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P},
  297. {FromPid, _}, S, _E) ->
  298. FromNode = node(FromPid),
  299. Reg = fun() ->
  300. Pid = if is_function(P, 0) ->
  301. spawn(FromNode, P);
  302. is_pid(P) ->
  303. P
  304. end,
  305. case gproc_lib:insert_reg(K, Value, Pid, g) of
  306. true ->
  307. _ = gproc_lib:ensure_monitor(Pid,g),
  308. Vals = [{{K,n},Pid,Value}],
  309. {reply, {Pid, Value}, [{insert, Vals}], S};
  310. false ->
  311. {reply, badarg, S}
  312. end
  313. end,
  314. case ets:lookup(?TAB, {K, n}) of
  315. [] ->
  316. Reg();
  317. [{_, _Waiters}] ->
  318. Reg();
  319. [{_, OtherPid, OtherVal}] ->
  320. {reply, {OtherPid, OtherVal}, S}
  321. end;
  322. handle_leader_call({monitor, {T,g,_} = Key, Pid}, _From, S, _E)
  323. when T==n; T==a ->
  324. Ref = make_ref(),
  325. case gproc:where(Key) of
  326. undefined ->
  327. Pid ! {gproc, unreg, Ref, Key},
  328. {reply, Ref, [], S};
  329. RegPid ->
  330. NewRev =
  331. case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
  332. r ->
  333. {K, [{monitor, [{Pid,Ref}]}]};
  334. Opts ->
  335. {K, gproc_lib:add_monitor(Opts, Pid, Ref)}
  336. end,
  337. ets:insert(?TAB, NewRev),
  338. {reply, Ref, [{insert, [NewRev]}], S}
  339. end;
  340. handle_leader_call({demonitor, {T,g,_} = Key, Ref, Pid}, _From, S, _E)
  341. when T==n; T==a ->
  342. case gproc:where(Key) of
  343. undefined ->
  344. {reply, ok, [], S};
  345. RegPid ->
  346. case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
  347. r ->
  348. {reply, ok, [], S};
  349. Opts ->
  350. NewRev = {K, gproc_lib:remove_monitor(Opts, Pid, Ref)},
  351. ets:insert(?TAB, NewRev),
  352. {reply, Ref, [{insert, [NewRev]}], S}
  353. end
  354. end;
  355. handle_leader_call({update_counter, {T,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
  356. when is_integer(Incr), T==c;
  357. is_integer(Incr), T==n ->
  358. try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
  359. RealPid = case Pid of
  360. n -> ets:lookup_element(?TAB, {Key,Pid}, 2);
  361. shared -> shared;
  362. P when is_pid(P) -> P
  363. end,
  364. Vals = [{{Key,Pid},RealPid,New} | update_aggr_counter(Key, Incr)],
  365. {reply, New, [{insert, Vals}], S}
  366. catch
  367. error:_ ->
  368. {reply, badarg, S}
  369. end;
  370. handle_leader_call({update_counters, Cs}, _From, S, _E) ->
  371. try {Replies, Vals} = batch_update_counters(Cs),
  372. {reply, Replies, [{insert, Vals}], S}
  373. catch
  374. error:_ ->
  375. {reply, badarg, S}
  376. end;
  377. handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
  378. try Current = ets:lookup_element(?TAB, {Key, Pid}, 3),
  379. Initial = case ets:lookup_element(?TAB, {Pid, Key}, 2) of
  380. r -> 0;
  381. Opts when is_list(Opts) ->
  382. proplists:get_value(initial, Opts, 0)
  383. end,
  384. Incr = Initial - Current,
  385. New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
  386. Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
  387. {reply, {Current, New}, [{insert, Vals}], S}
  388. catch
  389. error:_R ->
  390. io:fwrite("reset_counter failed: ~p~n~p~n", [_R, erlang:get_stacktrace()]),
  391. {reply, badarg, S}
  392. end;
  393. handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
  394. Key = if T == n; T == a -> {K,T};
  395. true -> {K, Pid}
  396. end,
  397. case ets:member(?TAB, Key) of
  398. true ->
  399. _ = gproc_lib:remove_reg(K, Pid, unreg),
  400. if T == c ->
  401. case ets:lookup(?TAB, {{a,g,Name},a}) of
  402. [Aggr] ->
  403. %% updated by remove_reg/3
  404. {reply, true, [{delete,[Key, {Pid,K}], unreg},
  405. {insert, [Aggr]}], S};
  406. [] ->
  407. {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
  408. end;
  409. true ->
  410. {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
  411. end;
  412. false ->
  413. {reply, badarg, S}
  414. end;
  415. handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
  416. when T == a; T == n ->
  417. Key = {K, T},
  418. case ets:lookup(?TAB, Key) of
  419. [{_, Pid, Value}] ->
  420. case pid_to_give_away_to(To) of
  421. Pid ->
  422. {reply, Pid, S};
  423. ToPid when is_pid(ToPid) ->
  424. ets:insert(?TAB, [{Key, ToPid, Value},
  425. {{ToPid,K}, []}]),
  426. _ = gproc_lib:ensure_monitor(ToPid, g),
  427. {reply, ToPid, [{delete, [Key, {Pid,K}], {migrated,ToPid}},
  428. {insert, [{Key, ToPid, Value}]}], S};
  429. undefined ->
  430. ets:delete(?TAB, Key),
  431. Rev = gproc_lib:remove_reverse_mapping(unreg, Pid, K),
  432. {reply, undefined, [{delete, [Key, Rev], unreg}], S}
  433. end;
  434. _ ->
  435. {reply, badarg, S}
  436. end;
  437. handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
  438. if T==p; T==n ->
  439. try gproc_lib:insert_many(T, g, L, Pid) of
  440. {true,Objs} -> {reply, true, [{insert,Objs}], S};
  441. false -> {reply, badarg, S}
  442. catch
  443. error:_ -> {reply, badarg, S}
  444. end;
  445. true -> {reply, badarg, S}
  446. end;
  447. handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
  448. try gproc_lib:remove_many(T, g, L, Pid) of
  449. [] ->
  450. {reply, true, S};
  451. Objs ->
  452. {reply, true, [{delete, Objs, unreg}], S}
  453. catch
  454. error:_ -> {reply, badarg, S}
  455. end;
  456. handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
  457. if T == a ->
  458. if is_integer(V) ->
  459. case gproc_lib:do_set_value(K, V, Pid) of
  460. true -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
  461. false -> {reply, badarg, S}
  462. end
  463. end;
  464. T == c ->
  465. try gproc_lib:do_set_counter_value(K, V, Pid),
  466. AKey = {{a,g,N},a},
  467. Aggr = ets:lookup(?TAB, AKey), % may be []
  468. {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
  469. catch
  470. error:_ ->
  471. {reply, badarg, S}
  472. end;
  473. true ->
  474. case gproc_lib:do_set_value(K, V, Pid) of
  475. true ->
  476. Obj = if T==n -> {{K, T}, Pid, V};
  477. true -> {{K, Pid}, Pid, V}
  478. end,
  479. {reply, true, [{insert,[Obj]}], S};
  480. false ->
  481. {reply, badarg, S}
  482. end
  483. end;
  484. handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
  485. %% The pid in _From is of the gen_leader instance that forwarded the
  486. %% call - not of the client. This is why the Pid is explicitly passed.
  487. %% case gproc_lib:await(Key, {Pid,Ref}) of
  488. case gproc_lib:await(Key, Pid, From) of
  489. {reply, {Ref, {K, P, V}}} ->
  490. {reply, {Ref, {K, P, V}}, S};
  491. {reply, Reply, Insert} ->
  492. {reply, Reply, [{insert, Insert}], S}
  493. end;
  494. handle_leader_call(_, _, S, _E) ->
  495. {reply, badarg, S}.
  496. handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
  497. #state{sync_requests = SReqs} = S,
  498. case lists:keyfind(Ref, 1, SReqs) of
  499. false ->
  500. %% This should never happen, except perhaps if the leader who
  501. %% received the sync request died, and the new leader gets the
  502. %% sync reply. In that case, we trust that the client has been
  503. %% notified anyway, and ignore the message.
  504. {ok, S};
  505. {_, Ns} ->
  506. case lists:delete(Node, Ns) of
  507. [] ->
  508. gen_leader:reply(Ref, {leader, reply, true}),
  509. {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
  510. Ns1 ->
  511. SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
  512. {ok, S#state{sync_requests = SReqs1}}
  513. end
  514. end;
  515. handle_leader_cast({add_globals, Missing}, S, _E) ->
  516. %% This is an audit message: a peer (non-leader) had info about granted
  517. %% global resources that we didn't know of when we became leader.
  518. %% This could happen due to a race condition when the old leader died.
  519. Update = insert_globals(Missing),
  520. {ok, [{insert, Update}], S};
  521. handle_leader_cast({remove_globals, Globals}, S, _E) ->
  522. delete_globals(Globals, []),
  523. {ok, S};
  524. handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
  525. case ets:lookup(?TAB, {Key, T}) of
  526. [{_, Waiters}] ->
  527. Ops = gproc_lib:remove_wait(Key, Pid, Ref, Waiters),
  528. {ok, Ops, S};
  529. _ ->
  530. {ok, [], S}
  531. end;
  532. handle_leader_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S, _E) ->
  533. case ets:lookup(?TAB, {Key, T}) of
  534. [{_, Waiters}] ->
  535. Ops = gproc_lib:remove_wait(Key, Pid, all, Waiters),
  536. {ok, Ops, S};
  537. [{_, OtherPid, _}] ->
  538. Ops = gproc_lib:remove_monitors(Key, OtherPid, Pid),
  539. {ok, Ops, S}
  540. end;
  541. handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
  542. Globals = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
  543. [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
  544. ets:delete(?TAB, {Pid,g}),
  545. case process_globals(Globals) of
  546. [] ->
  547. {ok, S};
  548. Broadcast ->
  549. {ok, Broadcast, S}
  550. end.
  551. mk_broadcast_insert_vals(Objs) ->
  552. lists:flatmap(
  553. fun({{C, g, Name} = K, Pid, Value}) ->
  554. if C == a ->
  555. ets:lookup(?TAB, {K,a}) ++ ets:lookup(?TAB, {Pid,K});
  556. C == c ->
  557. [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})]
  558. ++ ets:lookup(?TAB, {Pid,K});
  559. C == n ->
  560. [{{K,n},Pid,Value}| ets:lookup(?TAB, {Pid,K})];
  561. true ->
  562. [{{K,Pid},Pid,Value} | ets:lookup(?TAB, {Pid,K})]
  563. end
  564. end, Objs).
  565. process_globals(Globals) ->
  566. Modified =
  567. lists:foldl(
  568. fun({{T,_,_} = Key, Pid}, A) ->
  569. A1 = case T of
  570. c ->
  571. Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
  572. update_aggr_counter(Key, -Incr) ++ A;
  573. _ ->
  574. A
  575. end,
  576. K = ets_key(Key, Pid),
  577. ets:delete(?TAB, K),
  578. remove_rev_entry(Pid, Key, unreg),
  579. A1
  580. end, [], Globals),
  581. [{insert, Modified} || Modified =/= []] ++
  582. [{delete, Globals, unreg} || Globals =/= []].
  583. remove_rev_entry(Pid, {T,g,_} = K, Event) when T==n; T==a ->
  584. Key = {Pid, K},
  585. _ = case ets:lookup(?TAB, Key) of
  586. [] -> ok;
  587. [{_, r}] -> ok;
  588. [{_, Opts}] when is_list(Opts) ->
  589. gproc_lib:notify(Event, K, Opts)
  590. end,
  591. ets:delete(?TAB, Key);
  592. remove_rev_entry(Pid, K, _Event) ->
  593. ets:delete(?TAB, {Pid, K}).
  594. code_change(_FromVsn, S, _Extra, _E) ->
  595. {ok, S}.
  596. terminate(_Reason, _S) ->
  597. ok.
  598. from_leader({sync, Ref}, S, _E) ->
  599. gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
  600. {ok, S};
  601. from_leader(Ops, S, _E) ->
  602. lists:foreach(
  603. fun({delete, Globals, Event}) ->
  604. delete_globals(Globals, Event);
  605. ({insert, Globals}) ->
  606. _ = insert_globals(Globals)
  607. end, Ops),
  608. {ok, S}.
  609. insert_globals(Globals) ->
  610. ets:insert(?TAB, Globals),
  611. lists:foldl(
  612. fun({{{_,_,_} = Key,Pid}, Pid, _}, A) ->
  613. ets:insert_new(?TAB, {{Pid,Key}, []}),
  614. gproc_lib:ensure_monitor(Pid,g),
  615. A;
  616. ({{{_,_,_}, n}, Pid, _}, A) ->
  617. gproc_lib:ensure_monitor(Pid,g),
  618. A;
  619. ({{P,_K}, Opts} = Obj, A) when is_pid(P), is_list(Opts),Opts =/= [] ->
  620. ets:insert(?TAB, Obj),
  621. gproc_lib:ensure_monitor(P,g),
  622. [Obj] ++ A;
  623. (_Other, A) ->
  624. A
  625. end, Globals, Globals).
  626. delete_globals(Globals, Event) ->
  627. lists:foreach(
  628. fun({{_,g,_},T} = K) when is_atom(T) ->
  629. ets:delete(?TAB, K);
  630. ({Key, Pid}) when is_pid(Pid); Pid==shared ->
  631. K = ets_key(Key,Pid),
  632. ets:delete(?TAB, K),
  633. remove_rev_entry(Pid, Key, Event);
  634. ({Pid, Key}) when is_pid(Pid); Pid==shared ->
  635. K = ets_key(Key, Pid),
  636. ets:delete(?TAB, K),
  637. remove_rev_entry(Pid, Key, Event)
  638. end, Globals).
  639. ets_key({T,_,_} = K, _) when T==n; T==a ->
  640. {K, T};
  641. ets_key(K, Pid) ->
  642. {K, Pid}.
  643. leader_call(Req) ->
  644. case gen_leader:leader_call(?MODULE, Req) of
  645. badarg -> ?THROW_GPROC_ERROR(badarg);
  646. Reply -> Reply
  647. end.
  648. leader_cast(Msg) ->
  649. gen_leader:leader_cast(?MODULE, Msg).
  650. init(Opts) ->
  651. S0 = #state{},
  652. AlwaysBcast = proplists:get_value(always_broadcast, Opts,
  653. S0#state.always_broadcast),
  654. {ok, #state{always_broadcast = AlwaysBcast}}.
  655. surrendered_1(Globs) ->
  656. My_local_globs =
  657. ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '$2'},
  658. [{'==', {node,'$1'}, node()}],
  659. [{{ {element,1,{element,1,'$_'}}, '$1', '$2' }}]}]),
  660. _ = [gproc_lib:ensure_monitor(Pid, g) || {_, Pid, _} <- My_local_globs],
  661. %% remove all remote globals.
  662. ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'}, '$1', '_'},
  663. [{'=/=', {node,'$1'}, node()}],
  664. [true]},
  665. {{{'$1',{'_',g,'_'}}, '_'},
  666. [{'=/=', {node,'$1'}, node()}],
  667. [true]}]),
  668. %% insert new non-local globals, collect the leader's version of
  669. %% what my globals are
  670. Ldr_local_globs =
  671. lists:foldl(
  672. fun({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
  673. ets:insert(?TAB, {K, Pid, V}),
  674. _ = gproc_lib:ensure_monitor(Pid, g),
  675. ets:insert_new(?TAB, {{Pid,Key}, []}),
  676. Acc;
  677. ({{_Pid,_}=K, Opts}, Acc) -> % when node(Pid) =/= node() ->
  678. ets:insert(?TAB, {K, Opts}),
  679. Acc;
  680. ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
  681. [Obj|Acc]
  682. end, [], Globs),
  683. case [{K,P,V} || {K,P,V} <- My_local_globs,
  684. is_pid(P) andalso
  685. not(lists:keymember(K, 1, Ldr_local_globs))] of
  686. [] ->
  687. %% phew! We have the same picture
  688. ok;
  689. [_|_] = Missing ->
  690. %% This is very unlikely, I think
  691. leader_cast({add_globals, mk_broadcast_insert_vals(Missing)})
  692. end,
  693. case [{K,P} || {K,P,_} <- Ldr_local_globs,
  694. is_pid(P) andalso
  695. not(lists:keymember(K, 1, My_local_globs))] of
  696. [] ->
  697. ok;
  698. [_|_] = Remove ->
  699. leader_cast({remove_globals, Remove})
  700. end.
  701. batch_update_counters(Cs) ->
  702. batch_update_counters(Cs, [], []).
  703. batch_update_counters([{{c,g,_} = Key, Pid, Incr}|T], Returns, Updates) ->
  704. case update_counter_g(Key, Incr, Pid) of
  705. [{_,_,_} = A, {_, _, V} = C] ->
  706. batch_update_counters(T, [{Key,Pid,V}|Returns], add_object(
  707. A, add_object(C, Updates)));
  708. [{_, _, V} = C] ->
  709. batch_update_counters(T, [{Key,Pid,V}|Returns], add_object(C, Updates))
  710. end;
  711. batch_update_counters([], Returns, Updates) ->
  712. {lists:reverse(Returns), Updates}.
  713. add_object({K,P,_} = Obj, [{K,P,_} | T]) ->
  714. [Obj | T];
  715. add_object(Obj, [H|T]) ->
  716. [H | add_object(Obj, T)];
  717. add_object(Obj, []) ->
  718. [Obj].
  719. update_counter_g({c,g,_} = Key, Incr, Pid) when is_integer(Incr) ->
  720. Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
  721. update_aggr_counter(Key, Incr, [{{Key,Pid},Pid,Res}]);
  722. update_counter_g({c,g,_} = Key, {Incr, Threshold, SetValue}, Pid)
  723. when is_integer(Incr), is_integer(Threshold), is_integer(SetValue) ->
  724. [Prev, New] = ets:update_counter(?TAB, {Key, Pid},
  725. [{3, 0}, {3, Incr, Threshold, SetValue}]),
  726. update_aggr_counter(Key, New - Prev, [{{Key,Pid},Pid,New}]);
  727. update_counter_g({c,g,_} = Key, Ops, Pid) when is_list(Ops) ->
  728. case ets:update_counter(?TAB, {Key, Pid},
  729. [{3, 0} | expand_ops(Ops)]) of
  730. [_] ->
  731. [];
  732. [Prev | Rest] ->
  733. [New | _] = lists:reverse(Rest),
  734. update_aggr_counter(Key, New - Prev, [{Key, Pid, Rest}])
  735. end;
  736. update_counter_g(_, _, _) ->
  737. ?THROW_GPROC_ERROR(badarg).
  738. expand_ops([{Incr,Thr,SetV}|T])
  739. when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
  740. [{3, Incr, Thr, SetV}|expand_ops(T)];
  741. expand_ops([Incr|T]) when is_integer(Incr) ->
  742. [{3, Incr}|expand_ops(T)];
  743. expand_ops([]) ->
  744. [];
  745. expand_ops(_) ->
  746. ?THROW_GPROC_ERROR(badarg).
  747. update_aggr_counter({n,_,_}, _) ->
  748. [];
  749. update_aggr_counter(Key, Incr) ->
  750. update_aggr_counter(Key, Incr, []).
  751. update_aggr_counter({c,g,Ctr}, Incr, Acc) ->
  752. Key = {{a,g,Ctr},a},
  753. case ets:lookup(?TAB, Key) of
  754. [] ->
  755. Acc;
  756. [{K, Pid, Prev}] ->
  757. New = {K, Pid, Prev+Incr},
  758. ets:insert(?TAB, New),
  759. [New|Acc]
  760. end.
  761. pid_to_give_away_to(P) when is_pid(P) ->
  762. P;
  763. pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
  764. case ets:lookup(?TAB, {Key, T}) of
  765. [{_, Pid, _}] ->
  766. Pid;
  767. _ ->
  768. undefined
  769. end.