gproc_dist.erl 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@erlang-solutions.com>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_dist).
  23. -behaviour(gen_leader).
  24. -export([start_link/0, start_link/1,
  25. reg/1, reg/2, unreg/1,
  26. reg_shared/2, unreg_shared/1,
  27. mreg/2,
  28. munreg/2,
  29. set_value/2,
  30. give_away/2,
  31. update_counter/2,
  32. update_counters/1,
  33. update_shared_counter/2,
  34. reset_counter/1]).
  35. -export([leader_call/1,
  36. leader_cast/1,
  37. sync/0,
  38. get_leader/0]).
  39. %%% internal exports
  40. -export([init/1,
  41. handle_cast/3,
  42. handle_call/4,
  43. handle_info/2,
  44. handle_leader_call/4,
  45. handle_leader_cast/3,
  46. handle_DOWN/3,
  47. elected/2, % original version
  48. elected/3,
  49. surrendered/3,
  50. from_leader/3,
  51. code_change/4,
  52. terminate/2]).
  53. -include("gproc_int.hrl").
  54. -include("gproc.hrl").
  55. -define(SERVER, ?MODULE).
  %% Callback state of the gproc_dist gen_leader instance.
  -record(state, {
            %% read by elected/3: when not false, the {ok, Synch, S} return is
            %% used instead of {reply, Synch, S}
            always_broadcast = false,
            %% set to true by elected/2 and the `undefined' clause of elected/3,
            %% and to false by surrendered/3
            is_leader,
            %% outstanding sync requests, stored as {From, NodesNotYetAnswered}
            sync_requests = []}).
  60. %% ==========================================================
  61. %% Start functions
  %% @doc Starts the server using the local node plus all currently
  %% connected nodes as the node list, with no extra options.
  start_link() ->
      NodeList = [node() | nodes()],
      start_link({NodeList, []}).
  %% @doc Starts the gen_leader process registered under ?SERVER.
  %% Accepts `all' (local + connected nodes, with the `{bcast_type, all}'
  %% option), a plain node list, or a `{Nodes, Opts}' pair.
  start_link(all) ->
      NodeList = [node() | nodes()],
      start_link({NodeList, [{bcast_type, all}]});
  start_link(Nodes) when is_list(Nodes) ->
      start_link({Nodes, []});
  start_link({Nodes, Opts}) ->
      %% spawn options are validated by gproc_lib before being handed over
      GenLeaderOpts = [{spawn_opt, gproc_lib:valid_opts(server_options, [])}],
      gen_leader:start_link(?SERVER, Nodes, Opts, ?MODULE, [], GenLeaderOpts).
  72. %% ==========================================================
  73. %% API
  %% @doc Registers `Key' for the calling process, using the value returned
  %% by gproc:default/1 for that key. See gproc:reg/1.
  reg(Key) ->
      DefaultValue = gproc:default(Key),
      reg(Key, DefaultValue).
  %%% @spec reg({Class, Scope, Key}, Value) -> true
  %%% @doc Registers a global key for the calling process through the leader.
  %%%    Class = n  - unique name
  %%%          | p  - non-unique property
  %%%          | c  - counter
  %%%          | a  - aggregated counter
  %%%    Scope = g  (only global keys are accepted; any other argument
  %%%                raises a gproc badarg error)
  %%% @end
  reg({_Class, g, _Name} = Key, Value) ->
      %% the caller's pid travels with the request
      Request = {reg, Key, Value, self()},
      leader_call(Request);
  reg(_Key, _Value) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Registers a global key with the atom `shared' in place of an
  %% owner pid. Anything but a `{_, g, _}' key raises a gproc badarg error.
  reg_shared({_Class, g, _Name} = Key, Value) ->
      Request = {reg, Key, Value, shared},
      leader_call(Request);
  reg_shared(_Key, _Value) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Registers several global keys of type `T' at once for the calling
  %% process. `KVL' must be a list; otherwise a gproc badarg error is raised.
  mreg(T, KVL) when is_list(KVL) ->
      leader_call({mreg, T, g, KVL, self()});
  mreg(_T, _KVL) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Unregisters several global keys of type `T' at once for the calling
  %% process. `Keys' must be a list; otherwise a gproc badarg error is raised.
  munreg(T, Keys) when is_list(Keys) ->
      leader_call({munreg, T, g, Keys, self()});
  munreg(_T, _Keys) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Unregisters a global key held by the calling process.
  %% Anything but a `{_, g, _}' key raises a gproc badarg error.
  unreg({_Class, g, _Name} = Key) ->
      Request = {unreg, Key, self()},
      leader_call(Request);
  unreg(_Key) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Unregisters a shared global counter (`c') or aggregated counter
  %% (`a'). Any other key raises a gproc badarg error.
  unreg_shared({c, g, _Name} = Key) ->
      leader_call({unreg, Key, shared});
  unreg_shared({a, g, _Name} = Key) ->
      leader_call({unreg, Key, shared});
  unreg_shared(_Key) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Sets the value of a global key owned by the calling process.
  %%
  %% For counters (`c') and aggregated counters (`a') the value must be an
  %% integer. A non-integer counter value, or a key that is not global,
  %% raises a gproc badarg error.
  %%
  %% The request always carries the caller's pid: the only `set' clause of
  %% handle_leader_call/4 matches the 4-tuple `{set, Key, Value, Pid}', so a
  %% 3-tuple request would fall through to its catch-all and always fail.
  set_value({T,g,_} = Key, Value) when T =:= a, is_integer(Value);
                                       T =:= c, is_integer(Value) ->
      leader_call({set, Key, Value, self()});
  set_value({T,g,_}, _Value) when T =:= a; T =:= c ->
      %% counter values must be integers
      ?THROW_GPROC_ERROR(badarg);
  set_value({_,g,_} = Key, Value) ->
      leader_call({set, Key, Value, self()});
  set_value(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Asks the leader to transfer the global key `Key', owned by the
  %% calling process, to `To'. Returns whatever the leader replies.
  %%
  %% A key that is not global raises a gproc badarg error, in line with the
  %% other API functions of this module.
  give_away({_,g,_} = Key, To) ->
      leader_call({give_away, Key, To, self()});
  give_away(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Adds the integer `Incr' to the calling process's global counter.
  %% Any other argument combination raises a gproc badarg error.
  update_counter({c, g, _Name} = Key, Incr) when is_integer(Incr) ->
      Request = {update_counter, Key, Incr, self()},
      leader_call(Request);
  update_counter(_Key, _Incr) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Sends a batch of counter updates to the leader in one request.
  %% A non-list argument raises a gproc badarg error.
  update_counters(List) when is_list(List) ->
      Request = {update_counters, List},
      leader_call(Request);
  update_counters(_NotAList) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Adds the integer `Incr' to a shared global counter (the request
  %% carries the atom `shared' in place of a pid).
  %% Any other argument combination raises a gproc badarg error.
  update_shared_counter({c, g, _Name} = Key, Incr) when is_integer(Incr) ->
      Request = {update_counter, Key, Incr, shared},
      leader_call(Request);
  update_shared_counter(_Key, _Incr) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @doc Asks the leader to reset the calling process's global counter.
  %% Any key other than `{c, g, _}' raises a gproc badarg error.
  reset_counter({c, g, _Name} = Key) ->
      Request = {reset_counter, Key, self()},
      leader_call(Request);
  reset_counter(_Key) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @spec sync() -> true
  %% @doc Synchronize with the gproc leader.
  %%
  %% Use this to make sure that data has been replicated from the leader to
  %% the current node. The leader is asked to ping every live participating
  %% node, and the call returns `true' once each of them has either answered
  %% or died. If the leader itself dies while a sync is in progress, the call
  %% fails with a timeout exception. (It ought to be a `leader_died'
  %% exception; why gen_leader times out here instead of reporting the dead
  %% leader still needs investigation.)
  %% @end
  %%
  sync() ->
      Request = sync,
      leader_call(Request).
  %% @spec get_leader() -> node()
  %% @doc Returns the node on which the current gproc leader runs.
  %% The question is answered by the local server's handle_call/4.
  %% @end
  get_leader() ->
      Request = get_leader,
      gen_leader:call(?MODULE, Request).
  159. %% ==========================================================
  160. %% Server-side
  %% Local casts are not part of the protocol: any cast stops the server
  %% with reason `unknown_cast'.
  handle_cast(_Unexpected, State, _Election) ->
      {stop, unknown_cast, State}.
  %% Local calls: `get_leader' is answered with the leader's node taken from
  %% the election data; every other request is answered with `badarg'.
  handle_call(get_leader, _From, State, Election) ->
      LeaderNode = gen_leader:leader_node(Election),
      {reply, LeaderNode, State};
  handle_call(_Other, _From, State, _Election) ->
      {reply, badarg, State}.
  %% A 'DOWN' message for a monitored process is forwarded to the leader as
  %% `{pid_is_DOWN, Pid}'; every other message is ignored.
  handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
      leader_cast({pid_is_DOWN, Pid}),
      {ok, State};
  handle_info(_Other, State) ->
      {ok, State}.
  %% Original (two-argument) election callback: mark this instance as
  %% leader and hand out all global entries as synchronisation data.
  elected(S, _E) ->
      Synch = {globals, globs()},
      LeaderState = S#state{is_leader = true},
      {ok, Synch, LeaderState}.
  %% Election callback with a third argument.
  %% With `undefined' this instance has just become leader: it marks itself
  %% as leader and returns the globals with an `ok' result (full synch).
  %% Otherwise another node has recognised us as leader: unless
  %% always_broadcast is set, the globals are returned with `reply' so that
  %% the data is not broadcast to everyone else. The `ok' form is kept mainly
  %% for gen_leader versions that do not support the `reply' return value.
  elected(S, _E, undefined) ->
      Synch = {globals, globs()},
      {ok, Synch, S#state{is_leader = true}};
  elected(S, _E, _Node) ->
      Synch = {globals, globs()},
      case S of
          #state{always_broadcast = false} ->
              {reply, Synch, S};
          _ ->
              {ok, Synch, S}
      end.
  %% Returns every 3-tuple object in ?TAB whose key has the form
  %% `{{_, g, _}, _}', i.e. the global registrations.
  globs() ->
      GlobalObject = {{{'_', g, '_'}, '_'}, '_', '_'},
      MatchSpec = [{GlobalObject, [], ['$_']}],
      ets:select(?TAB, MatchSpec).
  %% Called when this instance surrenders to a leader and receives the
  %% leader's globals. Globals owned by this node should be more accurate in
  %% the local table than in the leader's, so the two views are reconciled by
  %% surrendered_1/1 before the instance is marked as non-leader.
  surrendered(S, {globals, LeaderGlobs}, _E) ->
      surrendered_1(LeaderGlobs),
      Follower = S#state{is_leader = false},
      {ok, Follower}.
  %% Called when `Node' goes down. Pending sync requests stop waiting for
  %% that node, and every global entry whose pid lives on it is removed.
  %% The resulting operations, if any, are returned for broadcast.
  handle_DOWN(Node, S, _E) ->
      S1 = check_sync_requests(Node, S),
      GlobalObject = {{{'_', g, '_'}, '_'}, '$1', '_'},
      OnDeadNode = {'==', {node, '$1'}, Node},
      %% yields {Key, Pid} for each matching object
      KeyAndPid = {{{element, 1, {element, 1, '$_'}}, {element, 2, '$_'}}},
      Orphans = ets:select(?TAB, [{GlobalObject, [OnDeadNode], [KeyAndPid]}]),
      case process_globals(Orphans) of
          [] ->
              {ok, S1};
          Broadcast ->
              {ok, Broadcast, S1}
      end.
  %% Removes `Node' from the wait list of every pending sync request.
  %% Requests left with nobody to wait for are answered with `true' and
  %% dropped; the others stay in the state with the shortened list.
  check_sync_requests(Node, #state{sync_requests = Pending} = S) ->
      S#state{sync_requests = drop_node_from_requests(Node, Pending)}.

  %% Walks the pending requests front to back, replying as it goes.
  drop_node_from_requests(_Node, []) ->
      [];
  drop_node_from_requests(Node, [{From, Ns} | Rest]) ->
      case Ns -- [Node] of
          [] ->
              gen_leader:reply(From, {leader, reply, true}),
              drop_node_from_requests(Node, Rest);
          StillWaiting ->
              [{From, StillWaiting} | drop_node_from_requests(Node, Rest)]
      end.
  %% Handles requests that have been routed to the leader.
  %%
  %% Return shapes used below:
  %%   {reply, Reply, S}             - answer the caller, nothing to broadcast
  %%   {reply, Reply, Broadcast, S}  - answer the caller and hand `Broadcast'
  %%                                   (a list of {insert, Objs} and
  %%                                   {delete, Keys, Event} operations, see
  %%                                   from_leader/3) back for distribution
  %%   {noreply, S}                  - the reply is sent later
  %% A reply of `badarg' is turned into a gproc error by leader_call/1.
  %% The leader applies every change to its own table here; the broadcast
  %% operations are what from_leader/3 replays on the receiving side.
  handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
      case gen_leader:alive(E) -- [node()] of
          [] ->
              %% nobody else is alive: trivially in sync
              {reply, true, S};
          Alive ->
              gen_leader:broadcast({from_leader, {sync, From}}, Alive, E),
              {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
      end;
  handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
      case gproc_lib:insert_reg(K, Value, Pid, g) of
          false ->
              {reply, badarg, S};
          true ->
              _ = gproc_lib:ensure_monitor(Pid,g),
              Vals =
                  if C == a ->
                          ets:lookup(?TAB, {K,a});
                     C == c ->
                          %% also ship the aggregated counter, if there is one
                          [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
                     C == n ->
                          [{{K,n},Pid,Value}];
                     true ->
                          [{{K,Pid},Pid,Value}]
                  end,
              {reply, true, [{insert, Vals}], S}
      end;
  handle_leader_call({monitor, {T,g,_} = Key, Pid}, _From, S, _E)
    when T==n; T==a ->
      Ref = make_ref(),
      case gproc:where(Key) of
          undefined ->
              %% nothing registered: tell the would-be monitor right away
              Pid ! {gproc, unreg, Ref, Key},
              {reply, Ref, [], S};
          RegPid ->
              NewRev =
                  case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
                      r ->
                          {K, [{monitor, [{Pid,Ref}]}]};
                      Opts ->
                          {K, gproc_lib:add_monitor(Opts, Pid, Ref)}
                  end,
              ets:insert(?TAB, NewRev),
              {reply, Ref, [{insert, [NewRev]}], S}
      end;
  handle_leader_call({demonitor, {T,g,_} = Key, Ref, Pid}, _From, S, _E)
    when T==n; T==a ->
      case gproc:where(Key) of
          undefined ->
              {reply, ok, [], S};
          RegPid ->
              case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
                  r ->
                      {reply, ok, [], S};
                  Opts ->
                      NewRev = {K, gproc_lib:remove_monitor(Opts, Pid, Ref)},
                      ets:insert(?TAB, NewRev),
                      {reply, Ref, [{insert, [NewRev]}], S}
              end
      end;
  handle_leader_call({update_counter, {c,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
    when is_integer(Incr) ->
      try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
           Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
           {reply, New, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  handle_leader_call({update_counters, Cs}, _From, S, _E) ->
      try {Replies, Vals} = batch_update_counters(Cs),
           {reply, Replies, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
      try Current = ets:lookup_element(?TAB, {Key, Pid}, 3),
           %% the initial value is read from the `initial' option of the
           %% reverse entry, defaulting to 0
           Initial = case ets:lookup_element(?TAB, {Pid, Key}, 2) of
                         r -> 0;
                         Opts when is_list(Opts) ->
                             proplists:get_value(initial, Opts, 0)
                     end,
           Incr = Initial - Current,
           New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
           Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
           {reply, {Current, New}, [{insert, Vals}], S}
      catch
          error:_R ->
              %% NOTE(review): erlang:get_stacktrace/0 is not available in
              %% recent OTP releases; left as is to keep the file buildable
              %% on the OTP generation it was written for.
              io:fwrite("reset_counter failed: ~p~n~p~n", [_R, erlang:get_stacktrace()]),
              {reply, badarg, S}
      end;
  handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
      Key = if T == n; T == a -> {K,T};
               true -> {K, Pid}
            end,
      case ets:member(?TAB, Key) of
          true ->
              _ = gproc_lib:remove_reg(K, Pid, unreg),
              if T == c ->
                      case ets:lookup(?TAB, {{a,g,Name},a}) of
                          [Aggr] ->
                              %% updated by remove_reg/3
                              {reply, true, [{delete,[Key, {Pid,K}], unreg},
                                             {insert, [Aggr]}], S};
                          [] ->
                              {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
                      end;
                 true ->
                      {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
              end;
          false ->
              {reply, badarg, S}
      end;
  handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
    when T == a; T == n ->
      Key = {K, T},
      case ets:lookup(?TAB, Key) of
          [{_, Pid, Value}] ->
              case pid_to_give_away_to(To) of
                  Pid ->
                      %% giving to oneself is a no-op
                      {reply, Pid, S};
                  ToPid when is_pid(ToPid) ->
                      ets:insert(?TAB, [{Key, ToPid, Value},
                                        {{ToPid,K}, []}]),
                      %% Remove the previous owner's reverse entry from the
                      %% leader's own table as well, the same way
                      %% from_leader/3 does on the receiving side. If it were
                      %% left behind, a later pid_is_DOWN for the old owner
                      %% would find it and delete the key now held by ToPid.
                      _ = remove_rev_entry(Pid, K, {migrated, ToPid}),
                      _ = gproc_lib:ensure_monitor(ToPid, g),
                      {reply, ToPid, [{delete, [Key, {Pid,K}], {migrated,ToPid}},
                                      {insert, [{Key, ToPid, Value}]}], S};
                  undefined ->
                      ets:delete(?TAB, Key),
                      Rev = gproc_lib:remove_reverse_mapping(unreg, Pid, K),
                      {reply, undefined, [{delete, [Key, Rev], unreg}], S}
              end;
          _ ->
              %% not registered, or not owned by the requesting pid
              {reply, badarg, S}
      end;
  handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
      if T==p; T==n ->
              try gproc_lib:insert_many(T, g, L, Pid) of
                  {true,Objs} -> {reply, true, [{insert,Objs}], S};
                  false       -> {reply, badarg, S}
              catch
                  error:_     -> {reply, badarg, S}
              end;
         true -> {reply, badarg, S}
      end;
  handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
      try gproc_lib:remove_many(T, g, L, Pid) of
          [] ->
              {reply, true, S};
          Objs ->
              {reply, true, [{delete, Objs, unreg}], S}
      catch
          error:_ -> {reply, badarg, S}
      end;
  handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
      if T == a, is_integer(V) ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true  -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
                  false -> {reply, badarg, S}
              end;
         T == a ->
              %% a non-integer value for an aggregated counter is rejected
              %% rather than crashing the leader with if_clause
              {reply, badarg, S};
         T == c ->
              try gproc_lib:do_set_counter_value(K, V, Pid),
                   AKey = {{a,g,N},a},
                   Aggr = ets:lookup(?TAB, AKey),  % may be []
                   {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
              catch
                  error:_ ->
                      {reply, badarg, S}
              end;
         true ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true ->
                      Obj = if T==n -> {{K, T}, Pid, V};
                               true -> {{K, Pid}, Pid, V}
                            end,
                      {reply, true, [{insert,[Obj]}], S};
                  false ->
                      {reply, badarg, S}
              end
      end;
  handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
      %% The pid in From is that of the gen_leader instance that forwarded
      %% the call - not of the client. This is why Pid is passed explicitly.
      case gproc_lib:await(Key, Pid, From) of
          {reply, {Ref, {K, P, V}}} ->
              {reply, {Ref, {K, P, V}}, S};
          {reply, Reply, Insert} ->
              {reply, Reply, [{insert, Insert}], S}
      end;
  handle_leader_call(_, _, S, _E) ->
      {reply, badarg, S}.
  %% Handles casts that have been routed to the leader.
  %% Returns {ok, S}, or {ok, Broadcast, S} when operations have to be
  %% handed back for distribution (see from_leader/3 for their format).
  handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
      #state{sync_requests = SReqs} = S,
      case lists:keyfind(Ref, 1, SReqs) of
          false ->
              %% This should never happen, except perhaps if the leader who
              %% received the sync request died, and the new leader gets the
              %% sync reply. In that case, we trust that the client has been
              %% notified anyway, and ignore the message.
              {ok, S};
          {_, Ns} ->
              case lists:delete(Node, Ns) of
                  [] ->
                      %% last outstanding node: answer the sync caller
                      gen_leader:reply(Ref, {leader, reply, true}),
                      {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
                  Ns1 ->
                      SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
                      {ok, S#state{sync_requests = SReqs1}}
              end
      end;
  handle_leader_cast({add_globals, Missing}, S, _E) ->
      %% This is an audit message: a peer (non-leader) had info about granted
      %% global resources that we didn't know of when we became leader.
      %% This could happen due to a race condition when the old leader died.
      ets:insert(?TAB, Missing),
      {ok, [{insert, Missing}], S};
  handle_leader_cast({remove_globals, Globals}, S, _E) ->
      delete_globals(Globals, []),
      {ok, S};
  handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, Ref, Waiters),
              {ok, Ops, S};
          _ ->
              {ok, [], S}
      end;
  handle_leader_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, all, Waiters),
              {ok, Ops, S};
          [{_, OtherPid, _}] ->
              Ops = gproc_lib:remove_monitors(Key, OtherPid, Pid),
              {ok, Ops, S};
          _ ->
              %% The key is already gone (e.g. it was unregistered before
              %% this cast arrived): nothing to cancel. Same outcome as the
              %% cancel_wait clause, instead of a case_clause crash.
              {ok, [], S}
      end;
  handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
      %% collect {Key, Pid} for every global key in Pid's reverse entries
      Globals = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
                                   [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
      ets:delete(?TAB, {Pid,g}),
      case process_globals(Globals) of
          [] ->
              {ok, S};
          Broadcast ->
              {ok, Broadcast, S}
      end.
  %% Removes the given `{Key, Pid}' registrations from the local table.
  %% For counters, the counter's value is first subtracted from the matching
  %% aggregated counter, if one exists. Returns the operations to broadcast:
  %% an `insert' with the modified aggregated counters (if any) followed by a
  %% `delete' of the removed entries (if any).
  process_globals(Globals) ->
      Modified = lists:foldl(fun release_global/2, [], Globals),
      InsertOps = case Modified of
                      [] -> [];
                      _ -> [{insert, Modified}]
                  end,
      DeleteOps = case Globals of
                      [] -> [];
                      _ -> [{delete, Globals, unreg}]
                  end,
      InsertOps ++ DeleteOps.

  %% Handles one entry; accumulates updated aggregated-counter objects.
  release_global({{c, _, _} = Key, Pid}, Acc) ->
      Value = ets:lookup_element(?TAB, {Key, Pid}, 3),
      Acc1 = update_aggr_counter(Key, -Value) ++ Acc,
      drop_global(Key, Pid),
      Acc1;
  release_global({{_, _, _} = Key, Pid}, Acc) ->
      drop_global(Key, Pid),
      Acc.

  %% Deletes the registration object and its reverse entry.
  drop_global(Key, Pid) ->
      ets:delete(?TAB, ets_key(Key, Pid)),
      remove_rev_entry(Pid, Key, unreg).
  %% Deletes the reverse entry `{Pid, Key}'. For global names and aggregated
  %% counters, an option list stored in that entry is first passed to
  %% gproc_lib:notify/3 together with `Event'.
  remove_rev_entry(Pid, {T,g,_} = K, Event) when T==n; T==a ->
      RevKey = {Pid, K},
      Found = ets:lookup(?TAB, RevKey),
      _ = case Found of
              [{_, Opts}] when is_list(Opts) ->
                  gproc_lib:notify(Event, K, Opts);
              [{_, r}] ->
                  ok;
              [] ->
                  ok
          end,
      ets:delete(?TAB, RevKey);
  remove_rev_entry(Pid, K, _Event) ->
      RevKey = {Pid, K},
      ets:delete(?TAB, RevKey).
  %% Code upgrade callback: the state is carried over unchanged.
  code_change(_OldVsn, State, _Extra, _Election) ->
      {ok, State}.
  %% Termination callback: nothing to clean up.
  terminate(_Why, _State) ->
      ok.
  %% Handles messages broadcast by the leader.
  %% `{sync, Ref}' is answered with a `sync_reply' cast to the leader.
  %% Anything else is a list of operations, applied in order:
  %%   {delete, Globals, Event} - remove the listed entries
  %%   {insert, Globals}        - store the objects, then make sure reverse
  %%                              entries and monitors exist for their pids
  from_leader({sync, Ref}, S, _E) ->
      gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
      {ok, S};
  from_leader(Ops, S, _E) ->
      lists:foreach(fun apply_leader_op/1, Ops),
      {ok, S}.

  %% Applies a single operation received from the leader.
  apply_leader_op({delete, Globals, Event}) ->
      delete_globals(Globals, Event);
  apply_leader_op({insert, Globals}) ->
      ets:insert(?TAB, Globals),
      lists:foreach(fun track_inserted_object/1, Globals).

  %% Follow-up bookkeeping for one inserted object.
  track_inserted_object({{{_,g,_} = Key, _}, P, _}) ->
      %% registration object: add a reverse entry unless one exists
      ets:insert_new(?TAB, {{P, Key}, []}),
      gproc_lib:ensure_monitor(P, g);
  track_inserted_object({{P, _K}, _Opts} = Obj) when is_pid(P) ->
      %% reverse entry shipped by the leader
      ets:insert(?TAB, Obj),
      gproc_lib:ensure_monitor(P, g);
  track_inserted_object(_) ->
      skip.
  %% Deletes each listed entry from the local table. An element is either
  %% an ETS key `{{_, g, _}, Atom}', deleted as is, or a `{Key, Pid}' /
  %% `{Pid, Key}' pair (Pid being a pid or `shared'), for which both the
  %% registration object and the reverse entry are removed.
  delete_globals(Globals, Event) ->
      lists:foreach(
        fun(Entry) -> delete_global_entry(Entry, Event) end,
        Globals).

  %% Clause order matters: the plain ETS-key form is tried first.
  delete_global_entry({{_,g,_}, T} = K, _Event) when is_atom(T) ->
      ets:delete(?TAB, K);
  delete_global_entry({Key, Pid}, Event) when is_pid(Pid); Pid==shared ->
      delete_with_rev_entry(Key, Pid, Event);
  delete_global_entry({Pid, Key}, Event) when is_pid(Pid); Pid==shared ->
      delete_with_rev_entry(Key, Pid, Event).

  delete_with_rev_entry(Key, Pid, Event) ->
      ets:delete(?TAB, ets_key(Key, Pid)),
      remove_rev_entry(Pid, Key, Event).
  %% Builds the ETS key of a registration object: names and aggregated
  %% counters are keyed by `{Key, Type}', everything else by `{Key, Pid}'.
  ets_key({n, _, _} = Key, _Pid) ->
      {Key, n};
  ets_key({a, _, _} = Key, _Pid) ->
      {Key, a};
  ets_key(Key, Pid) ->
      {Key, Pid}.
  %% Sends `Req' to the leader and returns its reply. A `badarg' reply is
  %% converted into a gproc badarg error in the calling process.
  leader_call(Req) ->
      Reply = gen_leader:leader_call(?MODULE, Req),
      case Reply of
          badarg -> ?THROW_GPROC_ERROR(badarg);
          _ -> Reply
      end.
  %% Sends `Msg' to the leader without waiting for an answer.
  leader_cast(Msg) ->
      Server = ?MODULE,
      gen_leader:leader_cast(Server, Msg).
  %% Builds the initial state. The `always_broadcast' entry of `Opts'
  %% overrides the record default when present.
  init(Opts) ->
      Default = (#state{})#state.always_broadcast,
      AlwaysBcast = proplists:get_value(always_broadcast, Opts, Default),
      {ok, #state{always_broadcast = AlwaysBcast}}.
  %% Reconciles the local table with the globals received from the leader.
  %%   1. Remember the global objects whose pid lives on this node.
  %%   2. Drop all global objects with remote pids - we don't have monitors
  %%      on them.
  %%   3. Store the leader's remote objects, and collect what the leader
  %%      believes this node's globals are.
  %%   4. Tell the leader about local globals it is missing (add_globals)
  %%      and about globals it has for this node that do not exist here
  %%      (remove_globals).
  surrendered_1(Globs) ->
      GlobalObject = {{{'_',g,'_'},'_'}, '$1', '_'},
      MyLocalGlobs =
          ets:select(?TAB, [{GlobalObject,
                             [{'==', {node,'$1'}, node()}],
                             ['$_']}]),
      ets:select_delete(?TAB, [{GlobalObject,
                                [{'=/=', {node,'$1'}, node()}],
                                [true]}]),
      LdrLocalGlobs = lists:foldl(fun adopt_leader_object/2, [], Globs),
      Missing = [{K,P,V} || {K,P,V} <- MyLocalGlobs,
                            is_pid(P) andalso
                                not(lists:keymember(K, 1, LdrLocalGlobs))],
      case Missing of
          [] ->
              %% same picture on both sides
              ok;
          _ ->
              %% expected to be very unlikely
              leader_cast({add_globals, Missing})
      end,
      Remove = [{K,P} || {K,P,_} <- LdrLocalGlobs,
                         is_pid(P) andalso
                             not(lists:keymember(K, 1, MyLocalGlobs))],
      case Remove of
          [] ->
              ok;
          _ ->
              leader_cast({remove_globals, Remove})
      end.

  %% Stores one object from the leader if it belongs to a remote pid;
  %% otherwise adds it to the list of the leader's view of local globals.
  adopt_leader_object({{Key,_} = K, Pid, V}, Acc) when node(Pid) =/= node() ->
      ets:insert(?TAB, {K, Pid, V}),
      ets:insert_new(?TAB, {{Pid,Key}, r}),
      Acc;
  adopt_leader_object({{Pid,_} = K, Opts}, Acc) when node(Pid) =/= node() ->
      ets:insert(?TAB, {K, Opts}),
      Acc;
  adopt_leader_object({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
      [Obj | Acc].
  %% Applies a list of `{Key, Pid, Incr}' counter updates. Returns
  %% `{Replies, Updates}': the per-item results in request order, and the
  %% objects to be distributed.
  batch_update_counters(Cs) ->
      NoReturns = [],
      NoUpdates = [],
      batch_update_counters(Cs, NoReturns, NoUpdates).
  %% Worker for batch_update_counters/1. `Returns' is built in reverse;
  %% `Updates' holds at most one object per {Key, Pid} (see add_object/2).
  batch_update_counters([{{c,g,_} = Key, Pid, Incr} | Rest], Returns, Updates) ->
      {Value, Updates1} =
          case update_counter_g(Key, Incr, Pid) of
              [{_,_,_} = Aggr, {_, _, V} = Ctr] ->
                  %% an aggregated counter was updated as well
                  {V, add_object(Aggr, add_object(Ctr, Updates))};
              [{_, _, V} = Ctr] ->
                  {V, add_object(Ctr, Updates)}
          end,
      batch_update_counters(Rest, [Value | Returns], Updates1);
  batch_update_counters([], Returns, Updates) ->
      {lists:reverse(Returns), Updates}.
  %% Puts `Obj' into the list: the first 3-tuple sharing its first two
  %% elements is replaced in place; if there is none, `Obj' goes at the end.
  add_object(New, []) ->
      [New];
  add_object({K, P, _} = New, [{K, P, _} | Rest]) ->
      [New | Rest];
  add_object(New, [Other | Rest]) ->
      [Other | add_object(New, Rest)].
  %% Applies one update to the global counter `{Key, Pid}' and adjusts the
  %% matching aggregated counter, if any. The update is one of:
  %%   Incr                         - plain integer increment
  %%   {Incr, Threshold, SetValue}  - increment with threshold/reset, as in
  %%                                  ets:update_counter/3
  %%   [Op]                         - a list of the two forms above
  %% Returns the list produced by update_aggr_counter/3: the updated
  %% aggregated counter object (when one exists) followed by the object
  %% describing this counter. Anything else raises a gproc badarg error.
  update_counter_g({c,g,_} = Key, Incr, Pid) when is_integer(Incr) ->
      Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
      update_aggr_counter(Key, Incr, [{{Key,Pid},Pid,Res}]);
  update_counter_g({c,g,_} = Key, {Incr, Threshold, SetValue}, Pid)
    when is_integer(Incr), is_integer(Threshold), is_integer(SetValue) ->
      %% the leading {3, 0} reads the previous value in the same operation
      [Prev, New] = ets:update_counter(?TAB, {Key, Pid},
                                       [{3, 0}, {3, Incr, Threshold, SetValue}]),
      update_aggr_counter(Key, New - Prev, [{{Key,Pid},Pid,New}]);
  %% This clause used to match {c,l,_}, which no caller in this module
  %% passes and which update_aggr_counter/3 does not accept, so list
  %% updates always ended in badarg.
  update_counter_g({c,g,_} = Key, Ops, Pid) when is_list(Ops) ->
      case ets:update_counter(?TAB, {Key, Pid},
                              [{3, 0} | expand_ops(Ops)]) of
          [_] ->
              %% empty op list: nothing changed
              [];
          [Prev | Rest] ->
              [New | _] = lists:reverse(Rest),
              %% NOTE(review): unlike the clauses above, the object returned
              %% here is {Key, Pid, Results} rather than
              %% {{Key,Pid}, Pid, New} - confirm this is what the receivers
              %% of the resulting insert expect.
              update_aggr_counter(Key, New - Prev, [{Key, Pid, Rest}])
      end;
  update_counter_g(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Translates a list of counter updates into ets:update_counter/3
  %% operations on element 3: `Incr' becomes `{3, Incr}' and
  %% `{Incr, Thr, SetV}' becomes `{3, Incr, Thr, SetV}'. Any other element,
  %% or a non-list, raises a gproc badarg error.
  expand_ops(Ops) ->
      expand_ops(Ops, []).

  expand_ops([{Incr, Thr, SetV} | Rest], Acc)
    when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
      expand_ops(Rest, [{3, Incr, Thr, SetV} | Acc]);
  expand_ops([Incr | Rest], Acc) when is_integer(Incr) ->
      expand_ops(Rest, [{3, Incr} | Acc]);
  expand_ops([], Acc) ->
      lists:reverse(Acc);
  expand_ops(_Invalid, _Acc) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Adds `Incr' to the aggregated counter that belongs to counter `Key'.
  %% Returns a list holding the updated object, or [] when there is none.
  update_aggr_counter(Key, Incr) ->
      Nothing = [],
      update_aggr_counter(Key, Incr, Nothing).
  %% Adds `Incr' to the global aggregated counter named like the counter,
  %% if such an entry exists, and prepends the updated object to `Acc'.
  %% Without an aggregated counter, `Acc' is returned unchanged.
  update_aggr_counter({c, g, Ctr}, Incr, Acc) ->
      case ets:lookup(?TAB, {{a, g, Ctr}, a}) of
          [{AggrKey, Owner, Sum}] ->
              Updated = {AggrKey, Owner, Sum + Incr},
              ets:insert(?TAB, Updated),
              [Updated | Acc];
          [] ->
              Acc
      end.
  %% Resolves the target of a give_away: a pid is returned as is; a global
  %% name or aggregated-counter key is looked up in ?TAB and resolves to
  %% the pid registered under it, or `undefined' if there is none.
  pid_to_give_away_to(Target) when is_pid(Target) ->
      Target;
  pid_to_give_away_to({T, g, _} = Key) when T==n; T==a ->
      Found = ets:lookup(?TAB, {Key, T}),
      case Found of
          [{_, Owner, _}] -> Owner;
          _ -> undefined
      end.