gproc_dist.erl 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_dist).
  23. -behaviour(gen_leader).
  24. -export([start_link/0, start_link/1,
  25. reg/1, reg/2, unreg/1,
  26. reg_or_locate/3,
  27. reg_shared/2, unreg_shared/1,
  28. mreg/2,
  29. munreg/2,
  30. set_value/2,
  31. give_away/2,
  32. update_counter/2,
  33. update_counters/1,
  34. update_shared_counter/2,
  35. reset_counter/1]).
  36. -export([leader_call/1,
  37. leader_cast/1,
  38. sync/0,
  39. get_leader/0]).
  40. %%% internal exports
  41. -export([init/1,
  42. handle_cast/3,
  43. handle_call/4,
  44. handle_info/2,
  45. handle_leader_call/4,
  46. handle_leader_cast/3,
  47. handle_DOWN/3,
  48. elected/2, % original version
  49. elected/3,
  50. surrendered/3,
  51. from_leader/3,
  52. code_change/4,
  53. terminate/2]).
  54. -include("gproc_int.hrl").
  55. -include("gproc.hrl").
  56. -define(SERVER, ?MODULE).
  %% Callback state threaded through the gen_leader callbacks in this module.
  %%   always_broadcast - read by elected/3 to choose between an `ok' and a
  %%                      `reply' return; taken from the init/1 options
  %%   is_leader        - set to true in elected/2,3 and to false in surrendered/3
  %%   sync_requests    - list of {From, Nodes} entries for sync calls that are
  %%                      still waiting for replies from Nodes
  -record(state, {always_broadcast = false,
                  is_leader = undefined,
                  sync_requests = []}).
  61. %% ==========================================================
  62. %% Start functions
  %% Starts the server using the local node plus all currently connected
  %% nodes as the node list, with an empty option list.
  start_link() ->
      AllNodes = [node() | nodes()],
      start_link({AllNodes, []}).
  %% Starts the gen_leader instance registered as ?SERVER.
  %%   all            - local node plus connected nodes, with {bcast_type, all}
  %%   Nodes (list)   - the given node list, no options
  %%   {Nodes, Opts}  - node list and options passed on to gen_leader:start_link/6
  %% The callback module argument passed to init/1 is always [].
  start_link(all) ->
      AllNodes = [node() | nodes()],
      start_link({AllNodes, [{bcast_type, all}]});
  start_link(NodeList) when is_list(NodeList) ->
      start_link({NodeList, []});
  start_link({NodeList, LeaderOpts}) ->
      ServerOpts = [{spawn_opt, gproc_lib:valid_opts(server_options, [])}],
      gen_leader:start_link(?SERVER, NodeList, LeaderOpts, ?MODULE, [], ServerOpts).
  73. %% ==========================================================
  74. %% API
  %% {@see gproc:reg/1}
  %%
  %% Registers Key using the value returned by gproc:default/1 for that key.
  reg(Key) ->
      DefaultValue = gproc:default(Key),
      reg(Key, DefaultValue).
  %% {@see gproc:reg_or_locate/2}
  %%
  %% Accepts only global unique names ({n,g,_}) together with a pid; the
  %% request is forwarded to the leader. Any other argument combination
  %% raises a gproc badarg error.
  reg_or_locate({n,g,_} = Key, Value, Pid) when is_pid(Pid) ->
      Request = {reg_or_locate, Key, Value, Pid},
      leader_call(Request);
  reg_or_locate(_Key, _Value, _Pid) ->
      ?THROW_GPROC_ERROR(badarg).
  %%% @spec({Class,Scope, Key}, Value) -> true
  %%% @doc Registers a global key on behalf of the calling process.
  %%%    Class = n  - unique name
  %%%          | p  - non-unique property
  %%%          | c  - counter
  %%%          | a  - aggregated counter
  %%%    Scope = l | g (global or local)
  %%% Only keys with scope `g' are accepted here; anything else raises a
  %%% gproc badarg error.
  %%% @end
  reg({_,g,_} = Key, Value) ->
      %% anything global goes through the leader
      Request = {reg, Key, Value, self()},
      leader_call(Request);
  reg(_Key, _Value) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Registers a global key whose owner is the atom `shared' rather than
  %% the calling process. Non-global keys raise a gproc badarg error.
  reg_shared({_,g,_} = Key, Value) ->
      Request = {reg, Key, Value, shared},
      leader_call(Request);
  reg_shared(_Key, _Value) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to register a list of key/value pairs of type T in the
  %% global scope for the calling process. A non-list KVL raises a gproc
  %% badarg error.
  mreg(T, KVL) when is_list(KVL) ->
      leader_call({mreg, T, g, KVL, self()});
  mreg(_T, _KVL) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to unregister a list of keys of type T in the global
  %% scope for the calling process. A non-list Keys raises a gproc badarg
  %% error.
  munreg(T, Keys) when is_list(Keys) ->
      leader_call({munreg, T, g, Keys, self()});
  munreg(_T, _Keys) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregisters a global key owned by the calling process via the leader.
  %% Non-global keys raise a gproc badarg error.
  unreg({_,g,_} = Key) ->
      Request = {unreg, Key, self()},
      leader_call(Request);
  unreg(_Key) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregisters a shared global counter (c) or aggregated counter (a).
  %% Any other key raises a gproc badarg error.
  unreg_shared({T,g,_} = Key) when T==c; T==a ->
      Request = {unreg, Key, shared},
      leader_call(Request);
  unreg_shared(_Key) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sets the value of a global key owned by the calling process.
  %%
  %% For counters (c) and aggregated counters (a) the value must be an
  %% integer; otherwise a gproc badarg error is raised. Non-global keys
  %% also raise badarg.
  %%
  %% The request always carries the caller's pid: the leader-side handler
  %% only matches the 4-tuple {set, Key, Value, Pid}, so a 3-tuple request
  %% would fall through to the catch-all clause and be answered with badarg.
  set_value({T,g,_} = Key, Value) when T==a; T==c ->
      if is_integer(Value) ->
              leader_call({set, Key, Value, self()});
         true ->
              ?THROW_GPROC_ERROR(badarg)
      end;
  set_value({_,g,_} = Key, Value) ->
      leader_call({set, Key, Value, self()});
  set_value(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to transfer the global key Key from the calling
  %% process to To. Only global keys match; there is no fallback clause.
  give_away({_,g,_} = Key, To) ->
      Request = {give_away, Key, To, self()},
      leader_call(Request).
  %% Increments the calling process's global counter Key by the integer
  %% Incr via the leader. Other arguments raise a gproc badarg error.
  update_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
      Request = {update_counter, Key, Incr, self()},
      leader_call(Request);
  update_counter(_Key, _Incr) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sends a batch of counter updates to the leader. The argument must be
  %% a list; anything else raises a gproc badarg error.
  update_counters(List) when is_list(List) ->
      Request = {update_counters, List},
      leader_call(Request);
  update_counters(_NotAList) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Increments a shared global counter (owner `shared') by the integer
  %% Incr via the leader. Other arguments raise a gproc badarg error.
  update_shared_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
      Request = {update_counter, Key, Incr, shared},
      leader_call(Request);
  update_shared_counter(_Key, _Incr) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to reset the calling process's global counter Key.
  %% Keys that are not global counters raise a gproc badarg error.
  reset_counter({c,g,_} = Key) ->
      Request = {reset_counter, Key, self()},
      leader_call(Request);
  reset_counter(_Key) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @spec sync() -> true
  %% @doc Synchronize with the gproc leader
  %%
  %% Use this to make sure that data has been replicated from the leader to
  %% the current node. The leader is asked to ping every live participating
  %% node, and the call returns `true' once each of those nodes has either
  %% answered or died. If the leader itself dies while a sync is in progress,
  %% the call fails with a timeout exception.
  %% (It ought to be a `leader_died' exception; it remains to be investigated
  %% why gen_leader times out in that situation instead of reporting that
  %% the leader died.)
  %% @end
  %%
  sync() ->
      Request = sync,
      leader_call(Request).
  %% @spec get_leader() -> node()
  %% @doc Returns the node of the current gproc leader, as reported by the
  %% local gproc_dist server.
  %% @end
  get_leader() ->
      Request = get_leader,
      gen_leader:call(?MODULE, Request).
  166. %% ==========================================================
  167. %% Server-side
  %% No local casts are supported: any cast stops the server with reason
  %% `unknown_cast'.
  handle_cast(_Unexpected, State, _Election) ->
      {stop, unknown_cast, State}.
  %% Local (non-leader) calls. `get_leader' is answered with the leader node
  %% taken from the election record; every other request gets `badarg'.
  handle_call(get_leader, _From, State, Election) ->
      LeaderNode = gen_leader:leader_node(Election),
      {reply, LeaderNode, State};
  handle_call(_Request, _From, State, _Election) ->
      {reply, badarg, State}.
  %% A process 'DOWN' message is forwarded to the leader as
  %% {pid_is_DOWN, Pid}; all other messages are ignored.
  handle_info({'DOWN', _MonitorRef, process, DeadPid, _Reason}, State) ->
      leader_cast({pid_is_DOWN, DeadPid}),
      {ok, State};
  handle_info(_Other, State) ->
      {ok, State}.
  %% Original (two-argument) election callback: returns all global objects
  %% as the synch term and marks this instance as leader.
  elected(State, _Election) ->
      Synch = {globals, globs()},
      {ok, Synch, State#state{is_leader = true}}.
  %% Three-argument election callback.
  %% With `undefined' as third argument this node has become leader: do a
  %% full synch and mark the state as leader. Otherwise another node has
  %% recognized us as leader, and the return tag depends on always_broadcast.
  elected(State, _Election, undefined) ->
      %% I have become leader; full synch
      Synch = {globals, globs()},
      {ok, Synch, State#state{is_leader = true}};
  elected(State, _Election, _Node) ->
      Synch = {globals, globs()},
      case State#state.always_broadcast of
          false ->
              %% Another node recognized us as the leader.
              %% Don't broadcast all data to everyone else
              {reply, Synch, State};
          _ ->
              %% Main reason for doing this is if we are using a gen_leader
              %% that doesn't support the 'reply' return value
              {ok, Synch, State}
      end.
  %% Returns every three-element object in ?TAB whose key has the form
  %% {{_, g, _}, _}, i.e. the objects keyed on a global gproc key.
  globs() ->
      GlobalObject = {{{'_', g, '_'}, '_'}, '_', '_'},
      MatchSpec = [{GlobalObject, [], ['$_']}],
      ets:select(?TAB, MatchSpec).
  %% Called when this node surrenders to a leader and receives the leader's
  %% {globals, Globs} synch term. The local table is reconciled against it
  %% and the state is marked as non-leader.
  surrendered(State, {globals, LeaderGlobs}, _Election) ->
      %% globals from this node should be more correct in our table than
      %% in the leader's
      surrendered_1(LeaderGlobs),
      {ok, State#state{is_leader = false}}.
  %% Called when Node goes down. Pending sync requests stop waiting for that
  %% node, and every global object whose owner pid lives on Node is removed
  %% through process_globals/1. The resulting operations, if any, are
  %% returned for broadcast.
  handle_DOWN(Node, S, _E) ->
      S1 = check_sync_requests(Node, S),
      %% Select {GprocKey, Pid} for each global object owned on Node.
      MatchSpec = [{ {{{'_',g,'_'},'_'},'$1','_'},
                     [{'==', {node,'$1'}, Node}],
                     [{{ {element,1,{element,1,'$_'}}, {element,2,'$_'} }}] }],
      DeadGlobals = ets:select(?TAB, MatchSpec),
      case process_globals(DeadGlobals) of
          [] ->
              {ok, S1};
          Broadcast ->
              {ok, Broadcast, S1}
      end.
  %% Removes Node from the waiting list of every pending sync request.
  %% A request with no nodes left is answered with `true' and dropped;
  %% the others are kept with their shortened node list.
  check_sync_requests(Node, #state{sync_requests = Pending} = S) ->
      StillPending =
          lists:filtermap(
            fun({From, Nodes}) ->
                    case Nodes -- [Node] of
                        [] ->
                            gen_leader:reply(From, {leader, reply, true}),
                            false;
                        Remaining ->
                            {true, {From, Remaining}}
                    end
            end, Pending),
      S#state{sync_requests = StillPending}.
  %% Leader-side handler for requests sent with leader_call/1.
  %%
  %% Each clause updates the leader's table and returns either
  %%   {reply, Reply, S}            - answer only, or
  %%   {reply, Reply, Ops, S}       - answer plus a list of {insert, Objs} /
  %%                                  {delete, Keys, Event} operations that
  %%                                  from_leader/3 applies on the other nodes.
  %% A `badarg' reply is turned into a gproc badarg error by leader_call/1.
  %% Requests that match no clause are answered with `badarg'.

  %% sync: reply at once if no other node is alive; otherwise ping the
  %% others and park the request until all of them have answered or died.
  handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
      case gen_leader:alive(E) -- [node()] of
          [] ->
              {reply, true, S};
          Alive ->
              gen_leader:broadcast({from_leader, {sync, From}}, Alive, E),
              {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
      end;
  %% reg: register K for Pid (a pid or `shared') and replicate the object.
  handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
      case gproc_lib:insert_reg(K, Value, Pid, g) of
          false ->
              {reply, badarg, S};
          true ->
              _ = gproc_lib:ensure_monitor(Pid,g),
              Vals =
                  if C == a ->
                          ets:lookup(?TAB, {K,a});
                     C == c ->
                          %% also replicate the matching aggregated counter, if any
                          [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
                     C == n ->
                          [{{K,n},Pid,Value}];
                     true ->
                          [{{K,Pid},Pid,Value}]
                  end,
              {reply, true, [{insert, Vals}], S}
      end;
  %% reg_or_locate: register the name, or return the current holder.
  handle_leader_call({reg_or_locate, {n,g,_} = K, Value, Pid}, _From, S, _E) ->
      case gproc_lib:insert_reg(K, Value, Pid, g) of
          false ->
              case ets:lookup(?TAB, {K,n}) of
                  [{_, OtherPid, OtherVal}] ->
                      {reply, {OtherPid, OtherVal}, S};
                  [] ->
                      {reply, badarg, S}
              end;
          true ->
              _ = gproc_lib:ensure_monitor(Pid,g),
              Vals = [{{K,n},Pid,Value}],
              {reply, {Pid, Value}, [{insert, Vals}], S}
      end;
  %% monitor: record {Pid, Ref} in the reverse entry of the key's holder,
  %% or notify Pid immediately if the key is not registered.
  handle_leader_call({monitor, {T,g,_} = Key, Pid}, _From, S, _E)
    when T==n; T==a ->
      Ref = make_ref(),
      case gproc:where(Key) of
          undefined ->
              Pid ! {gproc, unreg, Ref, Key},
              {reply, Ref, [], S};
          RegPid ->
              NewRev =
                  case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
                      r ->
                          {K, [{monitor, [{Pid,Ref}]}]};
                      Opts ->
                          {K, gproc_lib:add_monitor(Opts, Pid, Ref)}
                  end,
              ets:insert(?TAB, NewRev),
              {reply, Ref, [{insert, [NewRev]}], S}
      end;
  %% demonitor: remove {Pid, Ref} from the holder's reverse entry.
  handle_leader_call({demonitor, {T,g,_} = Key, Ref, Pid}, _From, S, _E)
    when T==n; T==a ->
      case gproc:where(Key) of
          undefined ->
              {reply, ok, [], S};
          RegPid ->
              case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
                  r ->
                      {reply, ok, [], S};
                  Opts ->
                      NewRev = {K, gproc_lib:remove_monitor(Opts, Pid, Ref)},
                      ets:insert(?TAB, NewRev),
                      {reply, Ref, [{insert, [NewRev]}], S}
              end
      end;
  %% update_counter: bump one counter and its aggregated counter, if any.
  handle_leader_call({update_counter, {c,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
    when is_integer(Incr) ->
      try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
          Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
          {reply, New, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  %% update_counters: batch version; any error yields badarg.
  handle_leader_call({update_counters, Cs}, _From, S, _E) ->
      try {Replies, Vals} = batch_update_counters(Cs),
          {reply, Replies, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  %% reset_counter: set the counter back to the `initial' option found in
  %% the reverse entry (0 if the entry is `r' or has no such option).
  handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
      try  Current = ets:lookup_element(?TAB, {Key, Pid}, 3),
           Initial = case ets:lookup_element(?TAB, {Pid, Key}, 2) of
                         r -> 0;
                         Opts when is_list(Opts) ->
                             proplists:get_value(initial, Opts, 0)
                     end,
           Incr = Initial - Current,
           New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
           Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
           {reply, {Current, New}, [{insert, Vals}], S}
      catch
          error:_R ->
              %% NOTE(review): erlang:get_stacktrace/0 is deprecated in newer
              %% OTP releases; kept so the file compiles on older ones.
              io:fwrite("reset_counter failed: ~p~n~p~n", [_R, erlang:get_stacktrace()]),
              {reply, badarg, S}
      end;
  %% unreg: remove the registration and replicate the deletion; for a
  %% counter, also replicate the aggregated counter updated by remove_reg/3.
  handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
      Key = if T == n; T == a -> {K,T};
               true -> {K, Pid}
            end,
      case ets:member(?TAB, Key) of
          true ->
              _ = gproc_lib:remove_reg(K, Pid, unreg),
              if T == c ->
                      case ets:lookup(?TAB, {{a,g,Name},a}) of
                          [Aggr] ->
                              %% updated by remove_reg/3
                              {reply, true, [{delete,[Key, {Pid,K}], unreg},
                                             {insert, [Aggr]}], S};
                          [] ->
                              {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
                      end;
                 true ->
                      {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
              end;
          false ->
              {reply, badarg, S}
      end;
  %% give_away: hand a name or aggregated counter owned by Pid over to To
  %% (a pid, or a key resolved through pid_to_give_away_to/1).
  handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
    when T == a; T == n ->
      Key = {K, T},
      case ets:lookup(?TAB, Key) of
          [{_, Pid, Value}] ->
              case pid_to_give_away_to(To) of
                  Pid ->
                      %% giving away to oneself is a no-op
                      {reply, Pid, S};
                  ToPid when is_pid(ToPid) ->
                      ets:insert(?TAB, [{Key, ToPid, Value},
                                        {{ToPid,K}, []}]),
                      _ = gproc_lib:ensure_monitor(ToPid, g),
                      {reply, ToPid, [{delete, [Key, {Pid,K}], {migrated,ToPid}},
                                      {insert, [{Key, ToPid, Value}]}], S};
                  undefined ->
                      %% no receiver: the registration is simply dropped
                      ets:delete(?TAB, Key),
                      Rev = gproc_lib:remove_reverse_mapping(unreg, Pid, K),
                      {reply, undefined, [{delete, [Key, Rev], unreg}], S}
              end;
          _ ->
              {reply, badarg, S}
      end;
  %% mreg: bulk registration, allowed for properties and names only.
  handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
      if T==p; T==n ->
              try gproc_lib:insert_many(T, g, L, Pid) of
                  {true,Objs} -> {reply, true, [{insert,Objs}], S};
                  false       -> {reply, badarg, S}
              catch
                  error:_     -> {reply, badarg, S}
              end;
         true -> {reply, badarg, S}
      end;
  %% munreg: bulk unregistration.
  handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
      try gproc_lib:remove_many(T, g, L, Pid) of
          [] ->
              {reply, true, S};
          Objs ->
              {reply, true, [{delete, Objs, unreg}], S}
      catch
          error:_ -> {reply, badarg, S}
      end;
  %% set: change the value stored for K.
  handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
      case T of
          a ->
              if is_integer(V) ->
                      case gproc_lib:do_set_value(K, V, Pid) of
                          true  -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
                          false -> {reply, badarg, S}
                      end;
                 true ->
                      %% A non-integer value for an aggregated counter is a
                      %% caller error; answer badarg rather than failing
                      %% with if_clause inside the leader.
                      {reply, badarg, S}
              end;
          c ->
              try gproc_lib:do_set_counter_value(K, V, Pid),
                  AKey = {{a,g,N},a},
                  Aggr = ets:lookup(?TAB, AKey),  % may be []
                  {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
              catch
                  error:_ ->
                      {reply, badarg, S}
              end;
          _ ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true ->
                      Obj = if T==n -> {{K, T}, Pid, V};
                               true -> {{K, Pid}, Pid, V}
                            end,
                      {reply, true, [{insert,[Obj]}], S};
                  false ->
                      {reply, badarg, S}
              end
      end;
  handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
      %% The pid in _From is of the gen_leader instance that forwarded the
      %% call - not of the client. This is why the Pid is explicitly passed.
      %% case gproc_lib:await(Key, {Pid,Ref}) of
      case gproc_lib:await(Key, Pid, From) of
          {reply, {Ref, {K, P, V}}} ->
              {reply, {Ref, {K, P, V}}, S};
          {reply, Reply, Insert} ->
              {reply, Reply, [{insert, Insert}], S}
      end;
  %% anything else is a bad request
  handle_leader_call(_, _, S, _E) ->
      {reply, badarg, S}.
  %% Leader-side handler for messages sent with leader_cast/1.
  %% Returns {ok, S}, or {ok, Ops, S} when operations should be broadcast
  %% to the other nodes (applied there by from_leader/3).

  %% sync_reply: Node has answered the sync ping identified by Ref.
  handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
      #state{sync_requests = SReqs} = S,
      case lists:keyfind(Ref, 1, SReqs) of
          false ->
              %% This should never happen, except perhaps if the leader who
              %% received the sync request died, and the new leader gets the
              %% sync reply. In that case, we trust that the client has been
              %% notified anyway, and ignore the message.
              {ok, S};
          {_, Ns} ->
              case lists:delete(Node, Ns) of
                  [] ->
                      %% last outstanding node: answer the waiting client
                      gen_leader:reply(Ref, {leader, reply, true}),
                      {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
                  Ns1 ->
                      SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
                      {ok, S#state{sync_requests = SReqs1}}
              end
      end;
  handle_leader_cast({add_globals, Missing}, S, _E) ->
      %% This is an audit message: a peer (non-leader) had info about granted
      %% global resources that we didn't know of when we became leader.
      %% This could happen due to a race condition when the old leader died.
      ets:insert(?TAB, Missing),
      {ok, [{insert, Missing}], S};
  %% remove_globals: a peer reports objects we hold that it does not have.
  handle_leader_cast({remove_globals, Globals}, S, _E) ->
      delete_globals(Globals, []),
      {ok, S};
  %% cancel_wait: remove one waiter (Pid, Ref) from the key's waiter list.
  handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, Ref, Waiters),
              {ok, Ops, S};
          _ ->
              {ok, [], S}
      end;
  %% cancel_wait_or_monitor: remove all of Pid's waits on the key, or its
  %% monitors if the key is already registered.
  handle_leader_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, all, Waiters),
              {ok, Ops, S};
          [{_, OtherPid, _}] ->
              Ops = gproc_lib:remove_monitors(Key, OtherPid, Pid),
              {ok, Ops, S};
          _ ->
              %% The cast is asynchronous, so the entry may already be gone
              %% when it arrives; treat that as nothing to do (same as
              %% cancel_wait) instead of failing with case_clause.
              {ok, [], S}
      end;
  %% pid_is_DOWN: a monitored process died; drop all its global objects.
  handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
      Globals = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
                                   [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
      ets:delete(?TAB, {Pid,g}),
      case process_globals(Globals) of
          [] ->
              {ok, S};
          Broadcast ->
              {ok, Broadcast, S}
      end.
  %% Deletes the given {Key, Pid} globals from the local table.
  %% For counters the aggregated counter (if one exists) is decremented by
  %% the counter's value first. Returns the operations to broadcast: an
  %% {insert, ...} with the modified aggregated counters and a
  %% {delete, Globals, unreg}, each only when non-empty.
  process_globals(Globals) ->
      DropOne =
          fun({{Type,_,_} = Key, Pid}, Acc) ->
                  Acc1 =
                      if Type =:= c ->
                              Value = ets:lookup_element(?TAB, {Key,Pid}, 3),
                              update_aggr_counter(Key, -Value) ++ Acc;
                         true ->
                              Acc
                      end,
                  ets:delete(?TAB, ets_key(Key, Pid)),
                  remove_rev_entry(Pid, Key, unreg),
                  Acc1
          end,
      Modified = lists:foldl(DropOne, [], Globals),
      InsertOps = case Modified of
                      [] -> [];
                      _  -> [{insert, Modified}]
                  end,
      DeleteOps = case Globals of
                      [] -> [];
                      _  -> [{delete, Globals, unreg}]
                  end,
      InsertOps ++ DeleteOps.
  %% Deletes the reverse entry {Pid, Key}. For global names and aggregated
  %% counters whose reverse entry holds an option list, gproc_lib:notify/3
  %% is called with Event before the entry is removed.
  remove_rev_entry(Pid, {T,g,_} = K, Event) when T==n; T==a ->
      RevKey = {Pid, K},
      Found = ets:lookup(?TAB, RevKey),
      _ = case Found of
              [] ->
                  ok;
              [{_, r}] ->
                  ok;
              [{_, Opts}] when is_list(Opts) ->
                  gproc_lib:notify(Event, K, Opts)
          end,
      ets:delete(?TAB, RevKey);
  remove_rev_entry(Pid, K, _Event) ->
      RevKey = {Pid, K},
      ets:delete(?TAB, RevKey).
  %% Code upgrade callback: the state is carried over unchanged.
  code_change(_OldVsn, State, _Extra, _Election) ->
      {ok, State}.
  %% Termination callback: nothing to clean up.
  terminate(_Why, _State) ->
      ok.
  %% Handles messages broadcast by the leader.
  %% {sync, Ref} is answered with a sync_reply cast to the leader. Anything
  %% else is a list of operations: {delete, Globals, Event} removes objects
  %% through delete_globals/2, {insert, Globals} stores them and makes sure
  %% reverse entries and monitors exist for their owners.
  from_leader({sync, Ref}, S, _E) ->
      gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
      {ok, S};
  from_leader(Ops, S, _E) ->
      Track =
          fun({{{_,g,_} = Key, _}, Owner, _}) ->
                  %% registration object: add a reverse entry unless present
                  ets:insert_new(?TAB, {{Owner,Key}, []}),
                  gproc_lib:ensure_monitor(Owner, g);
             ({{Owner,_Key}, _Opts} = RevObj) when is_pid(Owner) ->
                  %% reverse entry sent by the leader
                  ets:insert(?TAB, RevObj),
                  gproc_lib:ensure_monitor(Owner, g);
             (_) ->
                  skip
          end,
      Apply =
          fun({delete, Globals, Event}) ->
                  delete_globals(Globals, Event);
             ({insert, Globals}) ->
                  ets:insert(?TAB, Globals),
                  lists:foreach(Track, Globals)
          end,
      lists:foreach(Apply, Ops),
      {ok, S}.
  %% Deletes each entry in Globals from the local table. An entry is either
  %% a complete table key {{_,g,_}, Atom}, a {Key, Owner} pair or an
  %% {Owner, Key} pair, where Owner is a pid or `shared'. For the two pair
  %% forms the reverse entry is removed as well, passing Event along.
  delete_globals(Globals, Event) ->
      DeleteOne =
          fun({{_,g,_}, Tag} = TabKey) when is_atom(Tag) ->
                  ets:delete(?TAB, TabKey);
             ({Key, Owner}) when is_pid(Owner); Owner==shared ->
                  ets:delete(?TAB, ets_key(Key, Owner)),
                  remove_rev_entry(Owner, Key, Event);
             ({Owner, Key}) when is_pid(Owner); Owner==shared ->
                  ets:delete(?TAB, ets_key(Key, Owner)),
                  remove_rev_entry(Owner, Key, Event)
          end,
      lists:foreach(DeleteOne, Globals).
  %% Builds the table key for a gproc key: names and aggregated counters
  %% are keyed on {Key, Type}, everything else on {Key, Pid}.
  ets_key({n,_,_} = Key, _Pid) ->
      {Key, n};
  ets_key({a,_,_} = Key, _Pid) ->
      {Key, a};
  ets_key(Key, Pid) ->
      {Key, Pid}.
  %% Sends Req to the leader and returns its reply. A `badarg' reply is
  %% converted into a gproc badarg error in the calling process.
  leader_call(Req) ->
      Answer = gen_leader:leader_call(?MODULE, Req),
      case Answer of
          badarg ->
              ?THROW_GPROC_ERROR(badarg);
          _ ->
              Answer
      end.
  %% Sends Msg asynchronously to the leader instance of this module.
  leader_cast(Message) ->
      gen_leader:leader_cast(?MODULE, Message).
  %% Builds the initial state. The only option read is always_broadcast,
  %% which defaults to the record's default value.
  init(Opts) ->
      Default = (#state{})#state.always_broadcast,
      AlwaysBcast = proplists:get_value(always_broadcast, Opts, Default),
      {ok, #state{always_broadcast = AlwaysBcast}}.
  %% Reconciles the local table with the leader's snapshot Globs after this
  %% node has surrendered.
  %%
  %% Objects owned by local pids are kept as they are; objects owned by
  %% remote pids are deleted and replaced by the leader's version. The two
  %% views of the locally owned objects are then compared: objects the
  %% leader lacks are reported with an add_globals cast, objects only the
  %% leader has are reported with a remove_globals cast.
  surrendered_1(Globs) ->
      My_local_globs =
          ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '_'},
                             [{'==', {node,'$1'}, node()}],
                             ['$_']}]),
      %% remove all remote globals - we don't have monitors on them.
      ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'}, '$1', '_'},
                                [{'=/=', {node,'$1'}, node()}],
                                [true]}]),
      %% insert new non-local globals, collect the leader's version of
      %% what my globals are
      Ldr_local_globs =
          lists:foldl(
            fun({{Key,_}=K, shared, V}, Acc) ->
                    %% Objects registered through reg_shared/2 have the atom
                    %% `shared' as owner. node/1 fails on an atom, so none of
                    %% the pid clauses below can match such an object; take
                    %% the leader's version, as for remote objects.
                    ets:insert(?TAB, {K, shared, V}),
                    ets:insert_new(?TAB, {{shared,Key}, r}),
                    Acc;
               ({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
                    ets:insert(?TAB, {K, Pid, V}),
                    ets:insert_new(?TAB, {{Pid,Key}, r}),
                    Acc;
               ({{Pid,_}=K, Opts}, Acc) when node(Pid) =/= node() ->
                    ets:insert(?TAB, {K, Opts}),
                    Acc;
               ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
                    [Obj|Acc]
            end, [], Globs),
      case [{K,P,V} || {K,P,V} <- My_local_globs,
                       is_pid(P) andalso
                           not(lists:keymember(K, 1, Ldr_local_globs))] of
          [] ->
              %% phew! We have the same picture
              ok;
          [_|_] = Missing ->
              %% This is very unlikely, I think
              leader_cast({add_globals, Missing})
      end,
      case [{K,P} || {K,P,_} <- Ldr_local_globs,
                     is_pid(P) andalso
                         not(lists:keymember(K, 1, My_local_globs))] of
          [] ->
              ok;
          [_|_] = Remove ->
              leader_cast({remove_globals, Remove})
      end.
  %% Applies a list of {Key, Pid, Incr} counter updates and returns
  %% {Replies, Updates}; see batch_update_counters/3.
  batch_update_counters(Cs) ->
      NoReturns = [],
      NoUpdates = [],
      batch_update_counters(Cs, NoReturns, NoUpdates).
  %% Worker for batch_update_counters/1. Each {Key, Pid, Incr} entry is
  %% applied with update_counter_g/3. Returns collects one {Key, Pid, Value}
  %% per entry (in input order); Updates collects the changed objects, with
  %% later changes to the same object replacing earlier ones (add_object/2).
  batch_update_counters([], Returns, Updates) ->
      {lists:reverse(Returns), Updates};
  batch_update_counters([{{c,g,_} = Key, Pid, Incr} | Rest], Returns, Updates) ->
      case update_counter_g(Key, Incr, Pid) of
          [{_,_,_} = Aggr, {_, _, V} = Ctr] ->
              %% counter plus its aggregated counter changed
              Updates1 = add_object(Aggr, add_object(Ctr, Updates)),
              batch_update_counters(Rest, [{Key,Pid,V} | Returns], Updates1);
          [{_, _, V} = Ctr] ->
              Updates1 = add_object(Ctr, Updates),
              batch_update_counters(Rest, [{Key,Pid,V} | Returns], Updates1)
      end.
  %% Stores Obj in the list: the first element with the same first and
  %% second tuple element as Obj is replaced in place; if there is none,
  %% Obj is appended at the end.
  add_object(Obj, Objects) ->
      add_object(Obj, Objects, []).

  %% Walks the list, keeping the already visited elements (reversed) in Seen.
  add_object({K,P,_} = Obj, [{K,P,_} | Rest], Seen) ->
      lists:reverse(Seen, [Obj | Rest]);
  add_object(Obj, [Other | Rest], Seen) ->
      add_object(Obj, Rest, [Other | Seen]);
  add_object(Obj, [], Seen) ->
      lists:reverse(Seen, [Obj]).
  %% Updates the global counter {Key, Pid} in the local table and, through
  %% update_aggr_counter/3, the matching aggregated counter. The update is
  %% either an integer increment, an {Incr, Threshold, SetValue} triple, or
  %% a list of such operations. Returns the list of changed objects; any
  %% other update form raises a gproc badarg error.
  update_counter_g({c,g,_} = Key, Incr, Pid) when is_integer(Incr) ->
      NewValue = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
      update_aggr_counter(Key, Incr, [{{Key,Pid},Pid,NewValue}]);
  update_counter_g({c,g,_} = Key, {Incr, Threshold, SetValue}, Pid)
    when is_integer(Incr), is_integer(Threshold), is_integer(SetValue) ->
      %% {3, 0} reads the value before the real operation is applied
      ReadThenUpdate = [{3, 0}, {3, Incr, Threshold, SetValue}],
      [Before, After] = ets:update_counter(?TAB, {Key, Pid}, ReadThenUpdate),
      update_aggr_counter(Key, After - Before, [{{Key,Pid},Pid,After}]);
  update_counter_g({c,g,_} = Key, Ops, Pid) when is_list(Ops) ->
      ReadThenUpdate = [{3, 0} | expand_ops(Ops)],
      case ets:update_counter(?TAB, {Key, Pid}, ReadThenUpdate) of
          [_OnlyRead] ->
              [];
          [Before | Results] ->
              [After | _] = lists:reverse(Results),
              %% NOTE(review): unlike the clauses above, the element built
              %% here is {Key, Pid, Results} - confirm that this shape is
              %% what the callers expect.
              update_aggr_counter(Key, After - Before, [{Key, Pid, Results}])
      end;
  update_counter_g(_Key, _Update, _Pid) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Translates a list of counter operations into ets:update_counter/3
  %% operations on element 3: an integer Incr becomes {3, Incr} and an
  %% integer triple {Incr, Thr, SetV} becomes {3, Incr, Thr, SetV}.
  %% Anything else raises a gproc badarg error.
  expand_ops([]) ->
      [];
  expand_ops([Incr | Rest]) when is_integer(Incr) ->
      [{3, Incr} | expand_ops(Rest)];
  expand_ops([{Incr, Thr, SetV} | Rest])
    when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
      [{3, Incr, Thr, SetV} | expand_ops(Rest)];
  expand_ops(_Invalid) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Same as update_aggr_counter/3 with an empty accumulator.
  update_aggr_counter(Key, Incr) ->
      NothingYet = [],
      update_aggr_counter(Key, Incr, NothingYet).
  %% Adds Incr to the global aggregated counter that has the same name as
  %% the counter key, if one is registered. The updated object is stored
  %% and prepended to Acc; without an aggregated counter Acc is returned
  %% unchanged.
  update_aggr_counter({c,g,Ctr}, Incr, Acc) ->
      case ets:lookup(?TAB, {{a,g,Ctr},a}) of
          [{AggrKey, Owner, Current}] ->
              Updated = {AggrKey, Owner, Current + Incr},
              ets:insert(?TAB, Updated),
              [Updated | Acc];
          [] ->
              Acc
      end.
  %% Resolves the receiver of a give_away: a pid is returned as is; a global
  %% name or aggregated counter key is looked up in the table and its owner
  %% returned, or `undefined' if there is no single matching object.
  pid_to_give_away_to(Receiver) when is_pid(Receiver) ->
      Receiver;
  pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
      TabKey = {Key, T},
      case ets:lookup(?TAB, TabKey) of
          [{_, Owner, _}] ->
              Owner;
          _ ->
              undefined
      end.