gproc_dist.erl 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_dist).
  23. -behaviour(gen_leader).
  24. -export([start_link/0, start_link/1,
  25. reg/1, reg/4, unreg/1,
  26. reg_other/5, unreg_other/2,
  27. reg_or_locate/3,
  28. reg_shared/3, unreg_shared/1,
  29. monitor/2,
  30. demonitor/2,
  31. set_attributes/2,
  32. set_attributes_shared/2,
  33. mreg/2,
  34. munreg/2,
  35. set_value/2,
  36. set_value_shared/2,
  37. give_away/2,
  38. update_counter/3,
  39. update_counters/1,
  40. update_shared_counter/2,
  41. reset_counter/1]).
  42. -export([leader_call/1,
  43. leader_cast/1,
  44. sync/0,
  45. get_leader/0]).
  46. %%% internal exports
  47. -export([init/1,
  48. handle_cast/3,
  49. handle_call/4,
  50. handle_info/2, handle_info/3,
  51. handle_leader_call/4,
  52. handle_leader_cast/3,
  53. handle_DOWN/3,
  54. elected/2, % original version
  55. elected/3,
  56. surrendered/3,
  57. from_leader/3,
  58. code_change/4,
  59. terminate/2]).
  60. -include("gproc_int.hrl").
  61. -include("gproc.hrl").
  62. -define(SERVER, ?MODULE).
  63. -record(state, {
  64. always_broadcast = false,
  65. is_leader,
  66. sync_requests = []}).
  67. -include("gproc_trace.hrl").
  68. %% ==========================================================
  69. %% Start functions
  %% Starts the server with the local node plus every currently connected
  %% node as the node list, and no extra options.
  start_link() ->
      Candidates = [node() | nodes()],
      start_link({Candidates, []}).
  %% Starts the gen_leader instance registered as ?SERVER.
  %%   all            - local + connected nodes, with option {bcast_type, all}
  %%   Nodes (a list) - the given nodes, no options
  %%   {Nodes, Opts}  - explicit node list and option list
  %% The callback module is ?MODULE, its init argument is [], and the spawn
  %% options are whatever gproc_lib:valid_opts(server_options, []) returns.
  start_link(all) ->
      AllNodes = [node() | nodes()],
      start_link({AllNodes, [{bcast_type, all}]});
  start_link(Nodes) when is_list(Nodes) ->
      start_link({Nodes, []});
  start_link({Nodes, Opts}) ->
      LeaderOpts = [{spawn_opt, gproc_lib:valid_opts(server_options, [])}],
      gen_leader:start_link(?SERVER, Nodes, Opts, ?MODULE, [], LeaderOpts).
  80. %% ==========================================================
  81. %% API
  %% {@see gproc:reg/1}
  %%
  %% Registers Key using the value returned by gproc:default(Key), an empty
  %% attribute list and the operation tag `reg'.
  reg(Key) ->
      DefaultValue = gproc:default(Key),
      reg(Key, DefaultValue, [], reg).
  %% {@see gproc:reg_or_locate/2}
  %%
  %% Only global unique names ({n,g,_}) are accepted. The third argument is
  %% either a pid, or a zero-arity fun. A fun is wrapped so that, when it is
  %% eventually run, it first adopts the caller's group leader and then
  %% evaluates the original fun. Anything else throws a gproc badarg error.
  reg_or_locate({n,g,_} = Key, Value, Pid) when is_pid(Pid) ->
      leader_call({reg_or_locate, Key, Value, Pid});
  reg_or_locate({n,g,_} = Key, Value, F) when is_function(F, 0) ->
      CallerGL = group_leader(),
      %% leader will spawn on caller's node
      Wrapped = fun() ->
                        group_leader(CallerGL, self()),
                        F()
                end,
      leader_call({reg_or_locate, Key, Value, Wrapped});
  reg_or_locate(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %%% @doc Registers a global key on behalf of the calling process.
  %%%
  %%% The key must have the shape {Class, g, Name}, where
  %%%   Class = n  - unique name
  %%%         | p  - non-unique property
  %%%         | c  - counter
  %%%         | a  - aggregated counter
  %%%         | r  - resource property
  %%%         | rc - resource counter
  %%% The request {reg, Key, Value, self(), Attrs, Op} is sent to the leader.
  %%% Any key that is not a global 3-tuple throws a gproc badarg error.
  %%% @end
  reg(Key, Value, Attrs, Op) ->
      case Key of
          {_, g, _} ->
              %% anything global
              leader_call({reg, Key, Value, self(), Attrs, Op});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% @spec ({Class,g,Key}, pid(), Value, Attrs, Op::reg | unreg) -> true
  %% @doc Registers a global key on behalf of another process.
  %% Class = n  - unique name
  %%       | a  - aggregated counter
  %%       | r  - resource property
  %%       | rc - resource counter
  %% Value = term()
  %% Attrs = [{Key, Value}]
  %% Any other class, a non-pid Pid or a non-global key throws badarg.
  %% @end
  reg_other({T,g,_} = Key, Pid, Value, Attrs, Op) when is_pid(Pid) ->
      case lists:member(T, [n, a, r, rc]) of
          true ->
              leader_call({reg_other, Key, Value, Pid, Attrs, Op});
          false ->
              ?THROW_GPROC_ERROR(badarg)
      end;
  reg_other(_, _, _, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregisters a global key held by another process. Accepts the same key
  %% classes as reg_other/5 (n, a, r, rc); everything else throws badarg.
  unreg_other({T,g,_} = Key, Pid) when is_pid(Pid) ->
      case lists:member(T, [n, a, r, rc]) of
          true ->
              leader_call({unreg_other, Key, Pid});
          false ->
              ?THROW_GPROC_ERROR(badarg)
      end;
  unreg_other(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Registers a global key with the atom `shared' in the owner position
  %% instead of a pid. Non-global keys throw badarg.
  reg_shared(Key, Value, Attrs) ->
      case Key of
          {_, g, _} ->
              leader_call({reg, Key, Value, shared, Attrs, reg});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Asks the leader to set up a monitor of type info | follow | standby on a
  %% global key, with the calling process as the monitoring party.
  %% Any other type, or a non-global key, throws badarg.
  monitor(Key, Type) ->
      KnownType = lists:member(Type, [info, follow, standby]),
      case {Key, KnownType} of
          {{_, g, _}, true} ->
              leader_call({monitor, Key, self(), Type});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Asks the leader to remove the monitor identified by Ref that the calling
  %% process holds on a global key. Non-global keys throw badarg.
  demonitor(Key, Ref) ->
      case Key of
          {_, g, _} ->
              leader_call({demonitor, Key, self(), Ref});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Sets attributes on a global key owned by the calling process.
  %% Non-global keys throw badarg.
  set_attributes(Key, Attrs) ->
      case Key of
          {_, g, _} ->
              leader_call({set_attributes, Key, Attrs, self()});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Sets attributes on a shared global key (owner position is the atom
  %% `shared'). Non-global keys throw badarg.
  set_attributes_shared(Key, Attrs) ->
      case Key of
          {_, g, _} ->
              leader_call({set_attributes, Key, Attrs, shared});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Registers several global keys of class T at once for the calling process.
  %% KVL must be a list; otherwise a badarg error is thrown.
  mreg(T, KVL) when is_list(KVL) ->
      leader_call({mreg, T, g, KVL, self()});
  mreg(_T, _KVL) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregisters several global keys of class T at once for the calling
  %% process. Keys must be a list; otherwise a badarg error is thrown.
  munreg(T, Keys) when is_list(Keys) ->
      leader_call({munreg, T, g, Keys, self()});
  munreg(_T, _Keys) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregisters a global key held by the calling process.
  %% Non-global keys throw badarg.
  unreg(Key) ->
      case Key of
          {_, g, _} ->
              leader_call({unreg, Key, self()});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Unregisters a shared global counter (c) or aggregated counter (a).
  %% Any other key throws badarg.
  unreg_shared({T,g,_} = Key) when T == c orelse T == a ->
      Owner = shared,
      leader_call({unreg, Key, Owner});
  unreg_shared(_) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sets the value of a global key owned by the calling process.
  %% For counters (c) and aggregated counters (a) the value must be an
  %% integer; for every other global class any term is accepted.
  %% A non-global key, or a non-integer counter value, throws badarg.
  set_value({T,g,_} = Key, Value) ->
      IsCounter = (T == a) orelse (T == c),
      case IsCounter andalso not is_integer(Value) of
          true ->
              ?THROW_GPROC_ERROR(badarg);
          false ->
              leader_call({set, Key, Value, self()})
      end;
  set_value(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sets the value of a shared global key of class a, c or p.
  %% Any other key throws badarg.
  set_value_shared({T,g,_} = Key, Value)
    when T == a orelse T == c orelse T == p ->
      Owner = shared,
      leader_call({set, Key, Value, Owner});
  set_value_shared(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to transfer a global key from the calling process to To.
  %% There is deliberately a single clause: a non-global key fails with
  %% function_clause rather than a gproc badarg error.
  give_away({_,g,_} = Key, To) ->
      CurrentOwner = self(),
      leader_call({give_away, Key, To, CurrentOwner}).
  %% Adds the integer Incr to a global counter. Accepted key classes are c
  %% and n; Pid identifies the owner part of the entry being updated.
  %% Anything else throws badarg.
  update_counter({T,g,_} = Key, Pid, Incr)
    when is_integer(Incr) andalso (T == c orelse T == n) ->
      leader_call({update_counter, Key, Incr, Pid});
  update_counter(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sends a batch of counter updates to the leader in a single request.
  %% The argument must be a list; otherwise a badarg error is thrown.
  update_counters(Updates) ->
      case is_list(Updates) of
          true ->
              leader_call({update_counters, Updates});
          false ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Adds the integer Incr to a shared global counter ({c,g,_}).
  %% Anything else throws badarg.
  update_shared_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
      Owner = shared,
      leader_call({update_counter, Key, Incr, Owner});
  update_shared_counter(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to reset a global counter ({c,g,_}) owned by the calling
  %% process. Anything else throws badarg.
  reset_counter(Key) ->
      case Key of
          {c, g, _} ->
              leader_call({reset_counter, Key, self()});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% @spec sync() -> true
  %% @doc Synchronize with the gproc leader
  %%
  %% Use this to make sure that data has been replicated from the leader to
  %% the current node. The leader is asked to ping all live participating
  %% nodes, and the call returns `true' once each of those nodes has either
  %% responded or died. If the leader itself dies while a sync is in
  %% progress, the call fails with a timeout exception. (It ought to be a
  %% `leader_died' exception; more study is needed to find out why gen_leader
  %% times out in this situation instead of reporting that the leader died.)
  %% @end
  %%
  sync() ->
      Request = sync,
      leader_call(Request).
  %% @spec get_leader() -> node()
  %% @doc Returns the node of the current gproc leader.
  %%
  %% The call goes to the local server process (not the leader); see the
  %% `get_leader' clause of handle_call/4.
  %% @end
  get_leader() ->
      %% The module is held in a variable, so this is a dynamic call.
      %% NOTE(review): presumably done to keep static analysis quiet about
      %% gen_leader:call/2 - confirm before turning it into a static call.
      LeaderMod = gen_leader,
      LeaderMod:call(?MODULE, get_leader).
  231. %% ==========================================================
  232. %% Server-side
  %% No local casts are supported: any cast stops the server with reason
  %% unknown_cast.
  handle_cast(_Unexpected, State, _Election) ->
      {stop, unknown_cast, State}.
  %% Local (non-leader) calls. Only `get_leader' is understood; it replies
  %% with gen_leader:leader_node(E). Every other request is answered with
  %% the atom badarg.
  handle_call(Request, _From, S, E) ->
      Reply = case Request of
                  get_leader -> gen_leader:leader_node(E);
                  _ -> badarg
              end,
      {reply, Reply, S}.
  %% Handles plain messages sent to the server process.
  %%   'DOWN' for a process  - drop the local {Pid, g} entry and tell the
  %%                           leader that the pid is down
  %%   {gproc_unreg, Objs}   - return a {delete, Objs} operation list
  %%   anything else         - ignored
  handle_info(Msg, S) ->
      case Msg of
          {'DOWN', _MRef, process, Pid, _} ->
              ets:delete(?TAB, {Pid, g}),
              leader_cast({pid_is_DOWN, Pid}),
              {ok, S};
          {gproc_unreg, Objs} ->
              {ok, [{delete, Objs}], S};
          _ ->
              {ok, S}
      end.
  %% Three-argument variant of the callback; the election argument is
  %% ignored and the work is delegated to handle_info/2.
  handle_info(Msg, State, _Election) ->
      handle_info(Msg, State).
  %% Original two-argument elected callback: mark ourselves as leader and
  %% hand back the full set of globals as the sync payload.
  elected(S, _E) ->
      Synch = {globals, globs()},
      {ok, Synch, S#state{is_leader = true}}.
  %% Three-argument elected callback.
  %% With `undefined' as the third argument we have just become leader and
  %% return a full synch with {ok, ...}. Otherwise another node recognized us
  %% as leader: unless always_broadcast is set, answer with {reply, ...} so
  %% that the data is not broadcast to everyone else.
  elected(S, _E, undefined) ->
      %% I have become leader; full synch
      Synch = {globals, globs()},
      {ok, Synch, S#state{is_leader = true}};
  elected(S, _E, _Node) ->
      Synch = {globals, globs()},
      case S#state.always_broadcast of
          false ->
              %% Another node recognized us as the leader.
              %% Don't broadcast all data to everyone else
              {reply, Synch, S};
          _ ->
              %% Main reason for doing this is if we are using a gen_leader
              %% that doesn't support the 'reply' return value
              {ok, Synch, S}
      end.
  %% Collects every global object from ?TAB: first the entries whose key is
  %% {{_,g,_}, _} (3-tuples), then the reverse entries {{_, {_,g,_}}, _}.
  %% ensure_monitor/2 is called for the pid field of each 3-tuple entry.
  globs() ->
      RegSpec = [{{{{'_',g,'_'},'_'},'_','_'},[],['$_']}],
      RevSpec = [{{{'$1',{'_',g,'_'}}, '$2'},[],['$_']}],
      Regs = ets:select(?TAB, RegSpec),
      Revs = ets:select(?TAB, RevSpec),
      lists:foreach(fun({_, Pid, _}) ->
                            gproc_lib:ensure_monitor(Pid, g)
                    end, Regs),
      Regs ++ Revs.
  %% Called when this node surrenders to a leader and receives its globals.
  %% Whether or not we previously believed ourselves to be leader (a leader
  %% conflict), the handling is the same: reconcile our table with the
  %% leader's view via surrendered_1/1 and clear the is_leader flag.
  %% Globals from this node should be more correct in our table than in the
  %% leader's.
  surrendered(S, {globals, Globs}, _E) ->
      surrendered_1(Globs),
      {ok, S#state{is_leader = false}}.
  %% Called when Node goes down. Pending sync requests no longer wait for
  %% that node, and every global entry whose pid lives on Node is run through
  %% process_globals/1. A non-empty result is returned as a broadcast.
  handle_DOWN(Node, S, _E) ->
      S1 = check_sync_requests(Node, S),
      %% Select {Key, Pid} for each global entry whose pid is on Node.
      MatchSpec = [{{{{'_',g,'_'},'_'},'$1','_'},
                    [{'==', {node,'$1'},Node}],
                    [{{{element,1,{element,1,'$_'}},
                       {element,2,'$_'}}}]}],
      Affected = ets:select(?TAB, MatchSpec),
      case process_globals(Affected) of
          [] ->
              {ok, S1};
          Broadcast ->
              {ok, Broadcast, S1}
      end.
  %% Removes Node from the list of nodes each pending sync request is still
  %% waiting for. A request with nobody left to wait for is answered with
  %% `true' and dropped; the others are kept with the shortened node list.
  check_sync_requests(Node, #state{sync_requests = Pending} = S) ->
      StillPending =
          lists:filtermap(
            fun({From, Ns}) ->
                    case lists:delete(Node, Ns) of
                        [] ->
                            gen_leader:reply(From, {leader, reply, true}),
                            false;
                        Remaining ->
                            {true, {From, Remaining}}
                    end
            end, Pending),
      S#state{sync_requests = StillPending}.
  %% Leader-side handling of the requests that the API functions send with
  %% leader_call/1. Each clause updates ?TAB on the leader and, where needed,
  %% returns a list of {insert, _} | {delete, _} | {notify, _} operations as
  %% the third element of the reply tuple. Requests that cannot be served are
  %% answered with the atom badarg, which leader_call/1 turns into a thrown
  %% gproc error on the caller side.
  %%
  %% Clause order matters: the reset_counter clause must stay ahead of the
  %% generic 3-tuple unreg/unreg_other clause.

  %% sync: ping every other live node; reply at once if there are none.
  handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
      GenLeader = gen_leader,
      case GenLeader:alive(E) -- [node()] of
          [] ->
              {reply, true, S};
          Alive ->
              GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
              {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
      end;
  %% reg / reg_other: insert a new registration, or (Op == ensure) update the
  %% value and attributes of one the same pid already holds.
  handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E)
    when Reg==reg; Reg==reg_other ->
      case gproc_lib:insert_reg(K, Value, Pid, g) of
          false when Op == reg ->
              {reply, badarg, S};
          false when Op == ensure ->
              case ets:lookup(?TAB, ets_key(K, Pid)) of
                  [{_, Pid, _}] ->
                      gproc_lib:do_set_value(K, Value, Pid),
                      gproc_lib:insert_attr(K, As, Pid, g),
                      Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
                      {reply, updated, [{insert, Vals}], S};
                  _ ->
                      {reply, badarg, [], S}
              end;
          true ->
              _ = gproc_lib:ensure_monitor(Pid,g),
              _ = if As =/= [] ->
                          gproc_lib:insert_attr(K, As, Pid, g);
                     true -> []
                  end,
              Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
              {reply, regged_new(Op), [{insert, Vals}], S}
      end;
  %% monitor: only names (n) and aggregated counters (a) can be monitored.
  handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
                                                                            T==a ->
      case ets:lookup(?TAB, {K, T}) of
          [{_, Pid, _}] ->
              %% The key is registered: store the monitor in the owner's
              %% reverse entry.
              Opts = get_opts(Pid, K),
              Ref = make_ref(),
              Opts1 = gproc_lib:add_monitor(Opts, MPid, Ref, Type),
              _ = gproc_lib:ensure_monitor(MPid, g),
              Obj = {{Pid,K}, Opts1},
              ets:insert(?TAB, Obj),
              {reply, Ref, [{insert, [Obj]}], S};
          LookupRes ->
              %% Not registered (empty, or a waiters entry).
              Ref = make_ref(),
              case Type of
                  standby ->
                      %% The standby takes over the key immediately.
                      Event = {failover, MPid},
                      Msgs = insert_reg(LookupRes, K, undefined, MPid, Event),
                      Obj = {{K,T}, MPid, undefined},
                      Rev = {{MPid,K}, []},
                      ets:insert(?TAB, [Obj, Rev]),
                      MPid ! {gproc, {failover,MPid}, Ref, K},
                      {reply, Ref, [{insert, [Obj, Rev]},
                                    {notify, Msgs}], S};
                  follow ->
                      case LookupRes of
                          [{_, Waiters}] ->
                              add_follow_to_waiters(Waiters, K, MPid, Ref, S);
                          [] ->
                              add_follow_to_waiters([], K, MPid, Ref, S);
                          [{_, Pid, _}] ->
                              %% Cannot be reached: this shape is already
                              %% taken by the first clause of the outer case.
                              case ets:lookup(?TAB, {Pid,K}) of
                                  [{_, Opts}] when is_list(Opts) ->
                                      Opts1 = gproc_lib:add_monitor(
                                                Opts, MPid, Ref, follow),
                                      ets:insert(?TAB, {{Pid,K}, Opts1}),
                                      {reply, Ref,
                                       [{insert, [{{Pid,K}, Opts1}]}], S}
                              end
                      end;
                  _ ->
                      %% Plain monitor on an unregistered key: report unreg.
                      MPid ! {gproc, unreg, Ref, K},
                      {reply, Ref, S}
              end
      end;
  %% demonitor: remove MPid's monitor from the owner's options, or from the
  %% waiters list if the key is not registered.
  handle_leader_call({demonitor, {T,g,_} = K, MPid, Ref}, _From, S, _E) ->
      case ets:lookup(?TAB, {K,T}) of
          [{_, Pid, _}] ->
              Opts = get_opts(Pid, K),
              Opts1 = gproc_lib:remove_monitors(Opts, MPid, Ref),
              Obj = {{Pid,K}, Opts1},
              ets:insert(?TAB, Obj),
              ets:delete(?TAB, {MPid, K}),
              {reply, ok, [{delete, [{MPid,K}]},
                           {insert, [Obj]}], S};
          [{Key, Waiters}] ->
              NewWaiters = [W || W <- Waiters,
                                 W =/= {MPid, Ref, follow}],
              {reply, ok, [{insert, [{Key, NewWaiters}]}], S};
          _ ->
              {reply, ok, S}
      end;
  handle_leader_call({set_attributes, {_,g,_} = K, Attrs, Pid}, _From, S, _E) ->
      case gproc_lib:insert_attr(K, Attrs, Pid, g) of
          false ->
              {reply, badarg, S};
          NewAttrs when is_list(NewAttrs) ->
              {reply, true, [{insert, [{{Pid,K}, NewAttrs}]}], S}
      end;
  %% reg_or_locate: return the existing holder, or register P (spawning it on
  %% the caller's node first when P is a fun).
  handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P},
                     {FromPid, _}, S, _E) ->
      FromNode = node(FromPid),
      Reg = fun() ->
                    Pid = if is_function(P, 0) ->
                                  spawn(FromNode, P);
                             is_pid(P) ->
                                  P
                          end,
                    case gproc_lib:insert_reg(K, Value, Pid, g) of
                        true ->
                            _ = gproc_lib:ensure_monitor(Pid,g),
                            Vals = [{{K,n},Pid,Value}],
                            {reply, {Pid, Value}, [{insert, Vals}], S};
                        false ->
                            {reply, badarg, S}
                    end
            end,
      case ets:lookup(?TAB, {K, n}) of
          [] ->
              Reg();
          [{_, _Waiters}] ->
              Reg();
          [{_, OtherPid, OtherVal}] ->
              {reply, {OtherPid, OtherVal}, S}
      end;
  handle_leader_call({update_counter, {T,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
    when is_integer(Incr), T==c;
         is_integer(Incr), T==n ->
      try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
           RealPid = case Pid of
                         n -> ets:lookup_element(?TAB, {Key,Pid}, 2);
                         shared -> shared;
                         P when is_pid(P) -> P
                     end,
           Vals = [{{Key,Pid},RealPid,New} | update_aggr_counter(Key, Incr)],
           {reply, New, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  handle_leader_call({update_counters, Cs}, _From, S, _E) ->
      try {Replies, Vals} = batch_update_counters(Cs),
           {reply, Replies, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  %% reset_counter: move the counter back to its `initial' option (0 when no
  %% such option is stored) and reply with {OldValue, NewValue}.
  handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
      try Current = ets:lookup_element(?TAB, {Key, Pid}, 3),
           Initial = case ets:lookup_element(?TAB, {Pid, Key}, 2) of
                         r -> 0;
                         Opts when is_list(Opts) ->
                             proplists:get_value(initial, Opts, 0)
                     end,
           Incr = Initial - Current,
           New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
           Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
           {reply, {Current, New}, [{insert, Vals}], S}
      catch
          error:Reason ->
              %% erlang:get_stacktrace/0 is no longer available in current
              %% OTP releases; calling it here would make the error handler
              %% itself fail and take the leader down. Report the reason only.
              io:fwrite("reset_counter failed: ~p~n", [Reason]),
              {reply, badarg, S}
      end;
  %% unreg / unreg_other: remove a registration and broadcast the deletion,
  %% together with the updated aggregate or resource counter if one exists.
  handle_leader_call({Unreg, {T,g,Name} = K, Pid}, _From, S, _E)
    when Unreg==unreg;
         Unreg==unreg_other->
      Key = if T == n; T == a; T == rc -> {K,T};
               true -> {K, Pid}
            end,
      case ets:member(?TAB, Key) of
          true ->
              _ = gproc_lib:remove_reg(K, Pid, unreg),
              if T == c ->
                      case ets:lookup(?TAB, {{a,g,Name},a}) of
                          [Aggr] ->
                              %% updated by remove_reg/3
                              {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
                                             {insert, [Aggr]}], S};
                          [] ->
                              {reply, true, [{delete, [{K,Pid}, {Pid,K}]}], S}
                      end;
                 T == r ->
                      case ets:lookup(?TAB, {{rc,g,Name},rc}) of
                          [RC] ->
                              {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
                                             {insert, [RC]}], S};
                          [] ->
                              {reply, true, [{delete, [{K,Pid}, {Pid, K}]}], S}
                      end;
                 true ->
                      {reply, true, [{notify, [{K, Pid, unreg}]},
                                     {delete, [{K, Pid}, {Pid,K}]}], S}
              end;
          false ->
              {reply, badarg, S}
      end;
  %% give_away: hand a unique entry over to another process, or drop it when
  %% the target resolves to undefined.
  handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
    when T == a; T == n; T == rc ->
      Key = {K, T},
      case ets:lookup(?TAB, Key) of
          [{_, Pid, Value}] ->
              Opts = get_opts(Pid, K),
              case pid_to_give_away_to(To) of
                  Pid ->
                      {reply, Pid, S};
                  ToPid when is_pid(ToPid) ->
                      ets:insert(?TAB, [{Key, ToPid, Value},
                                        {{ToPid,K}, Opts}]),
                      _ = gproc_lib:ensure_monitor(ToPid, g),
                      Rev = {Pid, K},
                      ets:delete(?TAB, Rev),
                      gproc_lib:notify({migrated, ToPid}, K, Opts),
                      {reply, ToPid, [{insert, [{Key, ToPid, Value}]},
                                      {notify, [{K, Pid, {migrated, ToPid}}]},
                                      {delete, [{K, Pid}, Rev]}], S};
                  undefined ->
                      ets:delete(?TAB, Key),
                      Rev = {Pid, K},
                      ets:delete(?TAB, Rev),
                      gproc_lib:notify(unreg, K, Opts),
                      {reply, undefined, [{notify, [{K, Pid, unreg}]},
                                          {delete, [{K, Pid}, Rev]}], S}
              end;
          _ ->
              {reply, badarg, S}
      end;
  handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
      if T==p; T==n; T==r ->
              try gproc_lib:insert_many(T, g, L, Pid) of
                  {true,Objs} -> {reply, true, [{insert,Objs}], S};
                  false -> {reply, badarg, S}
              catch
                  error:_ -> {reply, badarg, S}
              end;
         true -> {reply, badarg, S}
      end;
  handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
      try gproc_lib:remove_many(T, g, L, Pid) of
          [] ->
              {reply, true, S};
          Objs ->
              {reply, true, [{delete, Objs}], S}
      catch
          error:_ -> {reply, badarg, S}
      end;
  %% set: change the value of an existing entry.
  handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
      if T == a, is_integer(V) ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
                  false -> {reply, badarg, S}
              end;
         T == a ->
              %% Non-integer value for an aggregated counter. This can be
              %% reached through set_value_shared/2, which does not check the
              %% value type. Reply badarg instead of crashing the leader with
              %% if_clause.
              {reply, badarg, S};
         T == c ->
              try gproc_lib:do_set_counter_value(K, V, Pid),
                   AKey = {{a,g,N},a},
                   Aggr = ets:lookup(?TAB, AKey), % may be []
                   {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
              catch
                  error:_ ->
                      {reply, badarg, S}
              end;
         true ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true ->
                      Obj = if T==n -> {{K, T}, Pid, V};
                               true -> {{K, Pid}, Pid, V}
                            end,
                      {reply, true, [{insert,[Obj]}], S};
                  false ->
                      {reply, badarg, S}
              end
      end;
  handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
      %% The pid in _From is of the gen_leader instance that forwarded the
      %% call - not of the client. This is why the Pid is explicitly passed.
      case gproc_lib:await(Key, Pid, From) of
          {reply, {Ref, {K, P, V}}} ->
              {reply, {Ref, {K, P, V}}, S};
          {reply, Reply, Insert} ->
              {reply, Reply, [{insert, Insert}], S}
      end;
  %% Anything unrecognized is a badarg.
  handle_leader_call(_, _, S, _E) ->
      {reply, badarg, S}.
  %% Leader-side handling of asynchronous requests sent with leader_cast/1.
  %% Returns {ok, S} or {ok, Ops, S}, where Ops is a list of
  %% {insert, _} | {delete, _} | {notify, _} operations.

  %% sync_reply: one node has answered a sync ping. When the last node a
  %% request was waiting for has answered, the client gets `true'.
  handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
      #state{sync_requests = SReqs} = S,
      case lists:keyfind(Ref, 1, SReqs) of
          false ->
              %% This should never happen, except perhaps if the leader who
              %% received the sync request died, and the new leader gets the
              %% sync reply. In that case, we trust that the client has been
              %% notified anyway, and ignore the message.
              {ok, S};
          {_, Ns} ->
              case lists:delete(Node, Ns) of
                  [] ->
                      gen_leader:reply(Ref, {leader, reply, true}),
                      {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
                  Ns1 ->
                      SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
                      {ok, S#state{sync_requests = SReqs1}}
              end
      end;
  handle_leader_cast({add_globals, Missing}, S, _E) ->
      %% This is an audit message: a peer (non-leader) had info about granted
      %% global resources that we didn't know of when we became leader.
      %% This could happen due to a race condition when the old leader died.
      Update = insert_globals(Missing),
      {ok, [{insert, Update}], S};
  handle_leader_cast({remove_globals, Globals}, S, _E) ->
      delete_globals(Globals),
      {ok, S};
  handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, Ref, Waiters),
              {ok, Ops, S};
          _ ->
              {ok, [], S}
      end;
  handle_leader_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, all, Waiters),
              {ok, Ops, S};
          [{_, OtherPid, _}] ->
              Ops = gproc_lib:remove_monitors(Key, OtherPid, Pid),
              {ok, Ops, S};
          [] ->
              %% Nothing is registered or waiting under this key (any more).
              %% A cast must not crash the leader over that; treat it like
              %% the cancel_wait clause does.
              {ok, [], S}
      end;
  %% pid_is_DOWN: clean out every global entry the dead pid had.
  handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
      Globals = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
                                   [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
      ets:delete(?TAB, {Pid,g}),
      case process_globals(Globals) of
          [] ->
              {ok, S};
          Broadcast ->
              {ok, Broadcast, S}
      end.
  %% Expands a list of {Key, Pid, Value} registrations into the ets objects
  %% that should be broadcast for them. Each registration contributes its
  %% main entry, any aggregate (a) or resource counter (rc) entry that exists
  %% for the same name, and the {Pid, Key} reverse entry if present.
  %% For classes a and rc the main entry is read back from ?TAB instead of
  %% being built from Value.
  mk_broadcast_insert_vals(Objs) ->
      Expand =
          fun({{C, g, Name} = K, Pid, Value}) ->
                  Rev = fun() -> ets:lookup(?TAB, {Pid, K}) end,
                  Own = {{K, Pid}, Pid, Value},
                  case C of
                      _ when C == a; C == rc ->
                          ets:lookup(?TAB, {K, C}) ++ Rev();
                      c ->
                          [Own | ets:lookup(?TAB, {{a,g,Name}, a})] ++ Rev();
                      r ->
                          [Own | ets:lookup(?TAB, {{rc,g,Name}, rc})] ++ Rev();
                      n ->
                          [{{K, n}, Pid, Value} | Rev()];
                      _ ->
                          [Own | Rev()]
                  end
          end,
      lists:flatmap(Expand, Objs).
  %% Cleans up a list of {Key, Pid} global registrations whose owner is gone
  %% and returns the operations to broadcast: {insert, _} for entries that
  %% were modified, {notify, _} for pending notifications and {delete, _}
  %% for the registrations themselves. Empty parts are left out, so the
  %% result is [] when there was nothing to do.
  process_globals(Globals) ->
      Step =
          fun({{T,_,_} = Key, Pid}, Acc) when T==n; T==a; T==rc ->
                  %% Unique entries may fail over to a standby.
                  case ets:lookup(?TAB, {Pid,Key}) of
                      [{_, Opts}] when is_list(Opts) ->
                          maybe_failover(Key, Pid, Opts, Acc);
                      _ ->
                          Acc
                  end;
             ({{T,_,_} = Key, Pid}, {ModAcc, NoteAcc}) ->
                  %% Counters and resources adjust their aggregate first.
                  ModAcc1 =
                      case T of
                          c ->
                              Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
                              update_aggr_counter(Key, -Incr) ++ ModAcc;
                          r ->
                              decrement_resource_count(Key, []) ++ ModAcc;
                          _ ->
                              ModAcc
                      end,
                  Notes = remove_entry(Key, Pid, unreg),
                  {ModAcc1, Notes ++ NoteAcc}
          end,
      {Modified, Notifications} = lists:foldl(Step, {[],[]}, Globals),
      Candidates = [{insert, Modified},
                    {notify, Notifications},
                    {delete, Globals}],
      [Op || {_, Items} = Op <- Candidates, Items =/= []].
  %% Handles the loss of the owner of a unique entry. With no usable standby
  %% the entry is removed and `unreg' notifications are collected. Otherwise
  %% the first usable standby becomes the new owner: it keeps the old value
  %% and the old options minus its own standby monitor, and a
  %% {failover, ToPid} event is issued.
  %% The accumulator is {ModifiedObjects, Notifications}.
  maybe_failover({T,_,_} = Key, Pid, Opts, {MAcc, NAcc}) ->
      %% Opts is already bound, so this match asserts that the stored
      %% options are still the ones the caller looked up.
      Opts = get_opts(Pid, Key),
      Standbys = filter_standbys(gproc_lib:standbys(Opts)),
      case Standbys of
          [] ->
              Gone = remove_entry(Key, Pid, unreg),
              {MAcc, Gone ++ NAcc};
          [{ToPid, Ref, _} | _] ->
              Value = case ets:lookup(?TAB, {Key,T}) of
                          [{_, _, Current}] -> Current;
                          _ -> undefined
                      end,
              Moved = remove_rev_entry(Opts, Pid, Key, {failover, ToPid}),
              NewOpts = gproc_lib:remove_monitor(Opts, ToPid, Ref),
              _ = gproc_lib:ensure_monitor(ToPid, g),
              NewReg = {{Key,T}, ToPid, Value},
              NewRev = {{ToPid, Key}, NewOpts},
              ets:insert(?TAB, [NewReg, NewRev]),
              {[NewReg, NewRev | MAcc], Moved ++ NAcc}
      end.
  %% Drops leading standby entries whose pid lives on a node that is not in
  %% the node list (the local node plus connected nodes for the one-argument
  %% form). Everything from the first reachable standby onwards is returned
  %% untouched - later entries are NOT filtered.
  filter_standbys(SBs) ->
      Reachable = [node() | nodes()],
      filter_standbys(SBs, Reachable).

  filter_standbys(SBs, Nodes) ->
      Unreachable = fun({Pid, _, _}) ->
                            not lists:member(node(Pid), Nodes)
                    end,
      lists:dropwhile(Unreachable, SBs).
  %% Removes the registration of Key for Pid and returns the notifications
  %% produced by remove_rev_entry/4.
  %% The main entry is deleted only when it belongs to Pid, or when Pid is an
  %% atom. If another pid holds the key, only our reverse entry is dropped.
  %% A waiters entry or a missing entry is left alone.
  remove_entry(Key, Pid, Event) ->
      K = ets_key(Key, Pid),
      case ets:lookup(?TAB, K) of
          [{_, Owner, _}] ->
              Ours = is_atom(Pid) orelse (is_pid(Owner) andalso Owner =:= Pid),
              case Ours of
                  true ->
                      ets:delete(?TAB, K),
                      remove_rev_entry(get_opts(Pid, Key), Pid, Key, Event);
                  false ->
                      ets:delete(?TAB, {Pid, Key}),
                      []
              end;
          [{_, _Waiters}] ->
              %% Skip
              [];
          [] ->
              []
      end.
  %% Deletes the {Pid, Key} reverse entry. For global names (n) and
  %% aggregated counters (a) the event is first delivered through
  %% gproc_lib:notify/3 and returned as [{Key, Pid, Event}]; for everything
  %% else nothing is notified and [] is returned.
  remove_rev_entry(Opts, Pid, K, Event) ->
      Notifications =
          case K of
              {T, g, _} when T==n; T==a ->
                  gproc_lib:notify(Event, K, Opts),
                  [{K, Pid, Event}];
              _ ->
                  []
          end,
      ets:delete(?TAB, {Pid, K}),
      Notifications.
  %% Returns the options stored in the {Pid, K} reverse entry. A missing
  %% entry, or one whose payload is the atom `r', yields [].
  get_opts(Pid, K) ->
      case ets:lookup(?TAB, {Pid, K}) of
          [{_, Opts}] when Opts =/= r -> Opts;
          [{_, r}] -> [];
          [] -> []
      end.
  %% Code upgrade callback: the state is carried over unchanged.
  code_change(_OldVsn, State, _Extra, _Election) ->
      {ok, State}.
  %% Termination callback: nothing to clean up.
  terminate(_Why, _State) ->
      ok.
  %% Handles messages broadcast by the leader.
  %%   {sync, Ref} - answer the leader's ping with a sync_reply cast
  %%   Ops         - a list of {delete, _} | {insert, _} | {notify, _}
  %%                 operations, applied to the local table in order
  from_leader({sync, Ref}, S, _E) ->
      gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
      {ok, S};
  from_leader(Ops, S, _E) ->
      Apply = fun({delete, Globals}) ->
                      delete_globals(Globals);
                 ({insert, Globals}) ->
                      _ = insert_globals(Globals);
                 ({notify, Events}) ->
                      do_notify(Events)
              end,
      lists:foreach(Apply, Ops),
      {ok, S}.
  %% Writes a list of global objects into ?TAB.
  %% Registration objects ({{Key,_}, Pid, _}) are inserted together with an
  %% empty reverse entry if none exists yet. Reverse entries with a non-empty
  %% option list are inserted as they are. Both cases make sure the pid is
  %% monitored. Other elements are ignored.
  %% The fold starts from the input list, so the result is the input with
  %% every inserted reverse entry prepended once more.
  insert_globals(Globals) ->
      Insert =
          fun({{{_,_,_} = Key, _}, Pid, _} = Reg, Acc) ->
                  ets:insert(?TAB, Reg),
                  ets:insert_new(?TAB, {{Pid,Key}, []}),
                  gproc_lib:ensure_monitor(Pid, g),
                  Acc;
             ({{P, _K}, Opts} = Rev, Acc)
                when is_pid(P), is_list(Opts), Opts =/= [] ->
                  ets:insert(?TAB, Rev),
                  gproc_lib:ensure_monitor(P, g),
                  [Rev | Acc];
             (_Other, Acc) ->
                  Acc
          end,
      lists:foldl(Insert, Globals, Globals).
  %% Removes a list of global objects from ?TAB. Three shapes are accepted:
  %%   {Key, T}        - T is an atom or a pid: remove the entry for T
  %%   {{Key, T}, P}   - P is a pid, T an atom or a pid: remove P's entry
  %%   {PidOrShared, Key} - delete that reverse entry directly
  %% Any other element fails with function_clause.
  delete_globals(Globals) ->
      Delete =
          fun({{_,g,_} = K, T}) when is_atom(T) orelse is_pid(T) ->
                  remove_entry(K, T, []);
             ({{{_,g,_} = K, T}, P})
                when is_pid(P) andalso (is_atom(T) orelse is_pid(T)) ->
                  remove_entry(K, P, []);
             ({Pid, Key}) when is_pid(Pid) orelse Pid == shared ->
                  ets:delete(?TAB, {Pid, Key})
          end,
      lists:foreach(Delete, Globals).
  %% Delivers a list of notifications received from the leader.
  %%   {Pid, Msg}        - Msg is sent straight to Pid
  %%   {Key, Pid, Event} - Event is passed to gproc_lib:notify/3 with the
  %%                       options stored in the {Pid, Key} reverse entry;
  %%                       skipped when no such option list exists
  %% Always returns ok after the whole list has been processed.
  do_notify([{P, Msg}|T]) when is_pid(P) ->
      P ! Msg,
      do_notify(T);
  do_notify([{K, P, E}|T]) ->
      case ets:lookup(?TAB, {P,K}) of
          [{_, Opts}] when is_list(Opts) ->
              gproc_lib:notify(E, K, Opts);
          _ ->
              ok
      end,
      %% Always carry on with the tail. Previously the recursion only
      %% happened in the "no options" branch, so every event after the first
      %% successfully delivered one was silently dropped.
      do_notify(T);
  do_notify([]) ->
      ok.
  %% Builds the ets key for a registration. Unique classes (n, a, rc) are
  %% stored under {Key, Class}; everything else under {Key, Pid}.
  ets_key(K, Pid) ->
      case K of
          {T, _, _} when T==n; T==a; T==rc ->
              {K, T};
          _ ->
              {K, Pid}
      end.
  %% Sends a synchronous request to the leader. A reply of the atom badarg is
  %% turned into a thrown gproc badarg error; any other reply is returned.
  leader_call(Req) ->
      Reply = gen_leader:leader_call(?MODULE, Req),
      if Reply =:= badarg ->
              ?THROW_GPROC_ERROR(badarg);
         true ->
              Reply
      end.
  %% Sends an asynchronous message to the leader.
  leader_cast(Message) ->
      gen_leader:leader_cast(?MODULE, Message).
  %% Builds the initial server state. The only option read from Opts is
  %% always_broadcast, which falls back to the record default.
  init(Opts) ->
      Defaults = #state{},
      AlwaysBcast = proplists:get_value(always_broadcast, Opts,
                                        Defaults#state.always_broadcast),
      {ok, Defaults#state{always_broadcast = AlwaysBcast}}.
  %% Reconcile the local table with the set of globals (Globs) received from
  %% the leader.
  %%
  %% Steps, in order:
  %%   1. Collect the global entries in ?TAB whose pid lives on this node and
  %%      call gproc_lib:ensure_monitor/2 for each of them.
  %%   2. Delete every global object in ?TAB that belongs to a pid on another
  %%      node.
  %%   3. Walk Globs: store the objects owned by remote pids, store all
  %%      {{_, _}, Opts} objects, and collect the entries the leader holds for
  %%      pids on this node.
  %%   4. Entries we have locally that the leader lacks are cast to the leader
  %%      as {add_globals, ...}; entries the leader holds for us that we do not
  %%      have are cast as {remove_globals, ...}.
  surrendered_1(Globs) ->
      LocalSpec = [{{{{'_', g, '_'}, '_'}, '$1', '$2'},
                    [{'==', {node, '$1'}, node()}],
                    [{{{element, 1, '$_'}, '$1', '$2'}}]}],
      My_local_globs = ets:select(?TAB, LocalSpec),
      lists:foreach(fun({_, LocalPid, _}) ->
                            gproc_lib:ensure_monitor(LocalPid, g)
                    end, My_local_globs),
      ?event({'My_local_globs', My_local_globs}),
      %% Drop every global object that belongs to a process on another node.
      RemoteSpec = [{{{{'_', g, '_'}, '_'}, '$1', '_'},
                     [{'=/=', {node, '$1'}, node()}],
                     [true]},
                    {{{'$1', {'_', g, '_'}}, '_'},
                     [{'=/=', {node, '$1'}, node()}],
                     [true]}],
      ets:select_delete(?TAB, RemoteSpec),
      %% Store the leader's non-local globals, and gather the leader's view of
      %% the globals owned by this node.
      Absorb =
          fun({{Key, _} = ObjKey, Pid, Value}, Acc) when node(Pid) =/= node() ->
                  ets:insert(?TAB, {ObjKey, Pid, Value}),
                  _ = gproc_lib:ensure_monitor(Pid, g),
                  ets:insert_new(?TAB, {{Pid, Key}, []}),
                  Acc;
             ({{_Pid, _}, _Opts} = Rev, Acc) ->
                  ets:insert(?TAB, Rev),
                  Acc;
             ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
                  [Obj | Acc]
          end,
      Ldr_local_globs = lists:foldl(Absorb, [], Globs),
      ?event({'Ldr_local_globs', Ldr_local_globs}),
      Missing = [{K, P, V} || {K, P, V} <- My_local_globs,
                              is_pid(P) andalso
                                  not lists:keymember(K, 1, Ldr_local_globs)],
      case Missing of
          [] ->
              %% Leader and follower agree on our entries.
              ok;
          [_ | _] ->
              %% Expected to be rare.
              ?event({'Missing', Missing}),
              leader_cast({add_globals, mk_broadcast_insert_vals(Missing)})
      end,
      Remove = [{K, P} || {{K, _} = ObjKey, P, _} <- Ldr_local_globs,
                          is_pid(P) andalso
                              not lists:keymember(ObjKey, 1, My_local_globs)],
      case Remove of
          [] ->
              ok;
          [_ | _] ->
              ?event({'Remove', Remove}),
              leader_cast({remove_globals, Remove})
      end.
  %% Apply a list of {Key, Pid, Incr} counter updates.
  %% Starts the three-argument worker with empty Returns and Updates
  %% accumulators and returns its {Returns, Updates} result.
  batch_update_counters(Cs) ->
      batch_update_counters(Cs, [], []).
  %% Worker for batch_update_counters/1.
  %%
  %% Each {Key, Pid, Incr} element is applied with update_counter_g/3, which
  %% must yield either [Counter] or [Aggregate, Counter] (anything else raises
  %% case_clause). The counter's new value is recorded in Returns, and the
  %% changed objects are merged into Updates via add_object/2 so that each
  %% {Key, Pid} pair appears once.
  %%
  %% Returns {Returns, Updates} with Returns in input order.
  batch_update_counters([{{c, g, _} = Key, Pid, Incr} | Rest], Returns, Updates) ->
      {Value, Updates1} =
          case update_counter_g(Key, Incr, Pid) of
              [{_, _, _} = Aggr, {_, _, V} = Ctr] ->
                  {V, add_object(Aggr, add_object(Ctr, Updates))};
              [{_, _, V} = Ctr] ->
                  {V, add_object(Ctr, Updates)}
          end,
      batch_update_counters(Rest, [{Key, Pid, Value} | Returns], Updates1);
  batch_update_counters([], Returns, Updates) ->
      {lists:reverse(Returns), Updates}.
  %% Insert Obj into a list of objects, keyed on the first two tuple elements.
  %%
  %% If Obj is a 3-tuple and the list holds a 3-tuple with the same first and
  %% second element, the first such element is replaced in place. Otherwise Obj
  %% is appended at the end.
  add_object(Obj, []) ->
      [Obj];
  add_object({Key, Owner, _} = Obj, [{Key, Owner, _} | Rest]) ->
      [Obj | Rest];
  add_object(Obj, [Other | Rest]) ->
      [Other | add_object(Obj, Rest)].
  %% Update the global counter stored under {Key, Pid} in ?TAB and propagate
  %% the change through update_aggr_counter/3.
  %%
  %% The update spec may be:
  %%   Incr                         - a plain integer increment;
  %%   {Incr, Threshold, SetValue}  - an increment with threshold/reset, as
  %%                                  accepted by ets:update_counter/3;
  %%   [Op]                         - a list of either of the above.
  %% A leading {3, 0} operation is used to read the value before the update so
  %% that the net difference can be handed on.
  %%
  %% Returns the list produced by update_aggr_counter/3, or [] for an empty
  %% operation list. Any other argument shape raises via
  %% ?THROW_GPROC_ERROR(badarg).
  update_counter_g({c, g, _} = Key, Incr, Pid) when is_integer(Incr) ->
      NewValue = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
      update_aggr_counter(Key, Incr, [{{Key, Pid}, Pid, NewValue}]);
  update_counter_g({c, g, _} = Key, {Incr, Threshold, SetValue}, Pid)
    when is_integer(Incr), is_integer(Threshold), is_integer(SetValue) ->
      UpdateOps = [{3, 0}, {3, Incr, Threshold, SetValue}],
      [Before, After] = ets:update_counter(?TAB, {Key, Pid}, UpdateOps),
      update_aggr_counter(Key, After - Before, [{{Key, Pid}, Pid, After}]);
  update_counter_g({c, g, _} = Key, Ops, Pid) when is_list(Ops) ->
      UpdateOps = [{3, 0} | expand_ops(Ops)],
      case ets:update_counter(?TAB, {Key, Pid}, UpdateOps) of
          [_] ->
              %% Only the read-back result: no operations were supplied.
              [];
          [Before | Results] ->
              After = lists:last(Results),
              update_aggr_counter(Key, After - Before, [{Key, Pid, Results}])
      end;
  update_counter_g(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Translate a list of counter operations into ets:update_counter/3 update
  %% operations on tuple position 3.
  %%
  %%   Incr                 -> {3, Incr}
  %%   {Incr, Thr, SetV}    -> {3, Incr, Thr, SetV}
  %%
  %% All values must be integers; anything else raises via
  %% ?THROW_GPROC_ERROR(badarg).
  expand_ops([]) ->
      [];
  expand_ops([{Incr, Thr, SetV} | Rest])
    when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
      [{3, Incr, Thr, SetV} | expand_ops(Rest)];
  expand_ops([Incr | Rest]) when is_integer(Incr) ->
      [{3, Incr} | expand_ops(Rest)];
  expand_ops(_) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Convenience wrapper around update_aggr_counter/3 with an empty
  %% accumulator. Keys of type n yield [] without touching the table.
  update_aggr_counter({n, _, _}, _Incr) ->
      [];
  update_aggr_counter(Key, Incr) ->
      update_aggr_counter(Key, Incr, []).
  %% Add Incr to the object stored under {{a, g, Ctr}, a} in ?TAB, if there
  %% is one.
  %%
  %% Returns Acc with the updated object prepended, or Acc unchanged when no
  %% such object exists.
  update_aggr_counter({c, g, Ctr}, Incr, Acc) ->
      case ets:lookup(?TAB, {{a, g, Ctr}, a}) of
          [{AggrKey, Owner, Total}] ->
              Updated = {AggrKey, Owner, Total + Incr},
              ets:insert(?TAB, Updated),
              [Updated | Acc];
          [] ->
              Acc
      end.
  %% If an object exists under {{rc, g, Rsrc}, rc} in ?TAB, call
  %% gproc_lib:decrement_resource_count(g, Rsrc) and prepend the object as it
  %% reads afterwards to Acc. Without such an object Acc is returned unchanged.
  decrement_resource_count({r, g, Rsrc}, Acc) ->
      RcKey = {{rc, g, Rsrc}, rc},
      case ets:member(?TAB, RcKey) of
          true ->
              %% Go through the lib function, since it might trigger events.
              gproc_lib:decrement_resource_count(g, Rsrc),
              ets:lookup(?TAB, RcKey) ++ Acc;
          false ->
              Acc
      end.
  %% Resolve the target of a give-away to a pid.
  %%
  %% A pid is returned as-is. A global key of type n or a is looked up in
  %% ?TAB and the pid of the single entry found is returned; if the lookup does
  %% not yield exactly one 3-tuple the result is undefined.
  pid_to_give_away_to(Pid) when is_pid(Pid) ->
      Pid;
  pid_to_give_away_to({Type, g, _} = Key) when Type =:= n orelse Type =:= a ->
      case ets:lookup(?TAB, {Key, Type}) of
          [{_, Owner, _}] -> Owner;
          _ -> undefined
      end.
  %% Register K with value Val for Pid (global scope) via gproc_lib:insert_reg/4
  %% and then inform the processes waiting for K.
  %%
  %% The first argument is the single-object lookup result holding the waiter
  %% list. Returns the result of tell_waiters/5.
  insert_reg([{_, Waiters}], K, Val, Pid, Event) ->
      gproc_lib:insert_reg(K, Val, Pid, g),
      tell_waiters(Waiters, K, Pid, Val, Event).
  %% Inform every waiter that key K has been registered to Pid with value V.
  %%
  %% Waiter shapes and the message each one gets:
  %%   {P, R}          -> {gproc, R, registered, {K, Pid, V}}
  %%   {P, R, follow}  -> {gproc, Event, R, K}
  %%
  %% Waiters on the local node are sent their message directly. For all other
  %% waiters a {P, Msg} pair is returned instead, so the caller can have the
  %% message delivered elsewhere.
  %%
  %% Returns the list of {P, Msg} pairs for the non-local waiters, in waiter
  %% order ([] when every waiter was local).
  tell_waiters([{P, R} | T], K, Pid, V, Event) ->
      Msg = {gproc, R, registered, {K, Pid, V}},
      if node(P) == node() ->
              P ! Msg,
              %% Keep going: stopping here would leave the remaining waiters
              %% uninformed and return the message instead of a list.
              tell_waiters(T, K, Pid, V, Event);
         true ->
              [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
      end;
  tell_waiters([{P, R, follow} | T], K, Pid, V, Event) ->
      Msg = {gproc, Event, R, K},
      if node(P) == node() ->
              P ! Msg,
              tell_waiters(T, K, Pid, V, Event);
         true ->
              [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
      end;
  tell_waiters([], _, _, _, _) ->
      [].
  %% Add {Pid, Ref, follow} to the front of the waiter list for key K.
  %%
  %% The updated waiter object and a {{Pid, K}, []} object are written to ?TAB,
  %% and Pid is told {gproc, unreg, Ref, K}: directly when Pid is on this node,
  %% otherwise through a notify operation in the reply.
  %%
  %% Returns {reply, Ref, Ops, S} where Ops always holds the insert operation
  %% for both objects.
  add_follow_to_waiters(Waiters, {T, _, _} = K, Pid, Ref, S) ->
      WaitObj = {{K, T}, [{Pid, Ref, follow} | Waiters]},
      RevObj = {{Pid, K}, []},
      Inserted = [WaitObj, RevObj],
      ets:insert(?TAB, Inserted),
      Msg = {gproc, unreg, Ref, K},
      InsertOp = {insert, Inserted},
      if node(Pid) == node() ->
              Pid ! Msg,
              {reply, Ref, [InsertOp], S};
         true ->
              {reply, Ref, [InsertOp, {notify, [{Pid, Msg}]}], S}
      end.
  %% Map the registration operation to the value reported for a newly created
  %% registration: reg gives true, ensure gives new.
  regged_new(reg) ->
      true;
  regged_new(ensure) ->
      new.