gproc_dist.erl 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_dist).
  23. -behaviour(gen_leader).
  24. -export([start_link/0, start_link/1,
  25. reg/1, reg/2, unreg/1,
  26. reg_or_locate/3,
  27. reg_shared/2, unreg_shared/1,
  28. monitor/2,
  29. demonitor/2,
  30. set_attributes/2,
  31. set_attributes_shared/2,
  32. mreg/2,
  33. munreg/2,
  34. set_value/2,
  35. set_value_shared/2,
  36. give_away/2,
  37. update_counter/3,
  38. update_counters/1,
  39. update_shared_counter/2,
  40. reset_counter/1]).
  41. -export([leader_call/1,
  42. leader_cast/1,
  43. sync/0,
  44. get_leader/0]).
  45. %%% internal exports
  46. -export([init/1,
  47. handle_cast/3,
  48. handle_call/4,
  49. handle_info/2, handle_info/3,
  50. handle_leader_call/4,
  51. handle_leader_cast/3,
  52. handle_DOWN/3,
  53. elected/2, % original version
  54. elected/3,
  55. surrendered/3,
  56. from_leader/3,
  57. code_change/4,
  58. terminate/2]).
  59. -include("gproc_int.hrl").
  60. -include("gproc.hrl").
  61. -define(SERVER, ?MODULE).
  62. -record(state, {
  63. always_broadcast = false,
  64. is_leader,
  65. sync_requests = []}).
  66. %% ==========================================================
  67. %% Start functions
  %% Start the gproc leader process with the local node plus all currently
  %% connected nodes as the node list, and no extra options.
  start_link() ->
      Candidates = [node() | nodes()],
      start_link({Candidates, []}).
  %% Start the gproc leader process.
  %%   all            - local node plus connected nodes, option {bcast_type, all}
  %%   Nodes (list)   - the given node list, no options
  %%   {Nodes, Opts}  - explicit node list and gen_leader options
  start_link(all) ->
      start_leader([node() | nodes()], [{bcast_type, all}]);
  start_link(Nodes) when is_list(Nodes) ->
      start_leader(Nodes, []);
  start_link({Nodes, Opts}) ->
      start_leader(Nodes, Opts).

  %% Hand over to gen_leader, using the spawn options that gproc_lib
  %% derives for `server_options'.
  start_leader(Nodes, Opts) ->
      SpawnOpts = gproc_lib:valid_opts(server_options, []),
      GenLeaderOpts = [{spawn_opt, SpawnOpts}],
      gen_leader:start_link(?SERVER, Nodes, Opts, ?MODULE, [], GenLeaderOpts).
  78. %% ==========================================================
  79. %% API
  %% {@see gproc:reg/1}
  %%
  %% Registers Key with the default value that gproc:default/1 yields for it.
  reg(Key) ->
      Default = gproc:default(Key),
      reg(Key, Default).
  %% {@see gproc:reg_or_locate/2}
  %%
  %% Only global unique names ({n,g,_}) are accepted. The third argument is
  %% either the pid to register or a zero-arity fun; anything else raises a
  %% gproc badarg error.
  reg_or_locate({n, g, _} = Key, Value, PidOrFun) ->
      if
          is_pid(PidOrFun) ->
              leader_call({reg_or_locate, Key, Value, PidOrFun});
          is_function(PidOrFun, 0) ->
              CallerGL = group_leader(),
              Wrapped = fun() ->
                                %% leader will spawn on caller's node
                                group_leader(CallerGL, self()),
                                PidOrFun()
                        end,
              leader_call({reg_or_locate, Key, Value, Wrapped});
          true ->
              ?THROW_GPROC_ERROR(badarg)
      end;
  reg_or_locate(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %%% reg({Class, Scope, Key}, Value) -> true
  %%%
  %%%    Class = n  - unique name
  %%%          | p  - non-unique property
  %%%          | c  - counter
  %%%          | a  - aggregated counter
  %%%    Scope = l | g (local or global)
  %%%
  %%% Only global keys (Scope = g) are handled here; the request is sent to
  %%% the leader on behalf of the calling process. Any other key raises a
  %%% gproc badarg error.
  reg(Key, Value) ->
      case Key of
          {_, g, _} ->
              %% anything global
              leader_call({reg, Key, Value, self()});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Register a global resource owned by the atom `shared' rather than by a
  %% process. Non-global keys raise a gproc badarg error.
  reg_shared(Key, Value) ->
      case Key of
          {_, g, _} -> leader_call({reg, Key, Value, shared});
          _ -> ?THROW_GPROC_ERROR(badarg)
      end.
  %% Ask the leader to set up a monitor of type info | follow | standby on a
  %% global key, on behalf of the calling process. Any other key or type
  %% raises a gproc badarg error.
  monitor({_, g, _} = Key, Type)
    when Type =:= info orelse Type =:= follow orelse Type =:= standby ->
      Request = {monitor, Key, self(), Type},
      leader_call(Request);
  monitor(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Ask the leader to remove the monitor identified by Ref that the calling
  %% process holds on a global key. Non-global keys raise a gproc badarg error.
  demonitor(Key, Ref) ->
      case Key of
          {_, g, _} -> leader_call({demonitor, Key, self(), Ref});
          _ -> ?THROW_GPROC_ERROR(badarg)
      end.
  %% Set attributes on a global key registered by the calling process.
  %% Non-global keys raise a gproc badarg error.
  set_attributes(Key, Attrs) ->
      case Key of
          {_, g, _} -> leader_call({set_attributes, Key, Attrs, self()});
          _ -> ?THROW_GPROC_ERROR(badarg)
      end.
  %% Set attributes on a global key whose owner is `shared'.
  %% Non-global keys raise a gproc badarg error.
  set_attributes_shared(Key, Attrs) ->
      case Key of
          {_, g, _} -> leader_call({set_attributes, Key, Attrs, shared});
          _ -> ?THROW_GPROC_ERROR(badarg)
      end.
  %% Register several global entries of type T in one leader request, on
  %% behalf of the calling process. KVL must be a list, otherwise a gproc
  %% badarg error is raised.
  mreg(T, KVL) when is_list(KVL) ->
      leader_call({mreg, T, g, KVL, self()});
  mreg(_T, _KVL) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregister several global entries of type T in one leader request, on
  %% behalf of the calling process. Keys must be a list, otherwise a gproc
  %% badarg error is raised.
  munreg(T, Keys) when is_list(Keys) ->
      leader_call({munreg, T, g, Keys, self()});
  munreg(_T, _Keys) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregister a global key held by the calling process.
  %% Non-global keys raise a gproc badarg error.
  unreg(Key) ->
      case Key of
          {_, g, _} -> leader_call({unreg, Key, self()});
          _ -> ?THROW_GPROC_ERROR(badarg)
      end.
  %% Unregister a shared global counter (c) or aggregated counter (a).
  %% Every other key raises a gproc badarg error.
  unreg_shared({c, g, _} = Key) ->
      leader_call({unreg, Key, shared});
  unreg_shared({a, g, _} = Key) ->
      leader_call({unreg, Key, shared});
  unreg_shared(_) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Set the value of a global key held by the calling process.
  %% Counters (c) and aggregated counters (a) only accept integer values;
  %% a non-integer value or a non-global key raises a gproc badarg error.
  set_value({T, g, _}, Value)
    when (T =:= a orelse T =:= c) andalso not is_integer(Value) ->
      ?THROW_GPROC_ERROR(badarg);
  set_value({_, g, _} = Key, Value) ->
      leader_call({set, Key, Value, self()});
  set_value(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Set the value of a shared global entry of type a, c or p.
  %% Every other key raises a gproc badarg error.
  set_value_shared(Key, Value) ->
      case Key of
          {T, g, _} when T =:= a; T =:= c; T =:= p ->
              leader_call({set, Key, Value, shared});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Ask the leader to transfer a global key from the calling process to To.
  %% There is no clause for non-global keys.
  give_away({_, g, _} = Key, To) ->
      Owner = self(),
      leader_call({give_away, Key, To, Owner}).
  %% Add the integer Incr to a global counter (c) or to the value of a global
  %% name (n) on behalf of Pid. Anything else raises a gproc badarg error.
  update_counter({T, g, _} = Key, Pid, Incr)
    when is_integer(Incr) andalso (T =:= c orelse T =:= n) ->
      Request = {update_counter, Key, Incr, Pid},
      leader_call(Request);
  update_counter(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Send a batch of counter updates to the leader in one request.
  %% The argument must be a list, otherwise a gproc badarg error is raised.
  update_counters(List) ->
      if
          is_list(List) -> leader_call({update_counters, List});
          true -> ?THROW_GPROC_ERROR(badarg)
      end.
  %% Add the integer Incr to a shared global counter.
  %% Anything else raises a gproc badarg error.
  update_shared_counter(Key, Incr) ->
      case Key of
          {c, g, _} when is_integer(Incr) ->
              leader_call({update_counter, Key, Incr, shared});
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Ask the leader to reset a global counter held by the calling process.
  %% Every other key raises a gproc badarg error.
  reset_counter(Key) ->
      case Key of
          {c, g, _} -> leader_call({reset_counter, Key, self()});
          _ -> ?THROW_GPROC_ERROR(badarg)
      end.
  %% @spec sync() -> true
  %% @doc Synchronize with the gproc leader
  %%
  %% This function can be used to ensure that data has been replicated from the
  %% leader to the current node. It does so by asking the leader to ping all
  %% live participating nodes. The call returns `true' once every one of these
  %% nodes has either responded or died. In the special case where the leader
  %% dies during an ongoing sync, the call fails with a timeout exception.
  %% (Ideally it would be a `leader_died' exception; more study is needed to
  %% find out why gen_leader times out in this situation rather than reporting
  %% that the leader died.)
  %% @end
  %%
  sync() ->
      Request = sync,
      leader_call(Request).
  %% @spec get_leader() -> node()
  %% @doc Returns the node of the current gproc leader.
  %% @end
  get_leader() ->
      %% The module name is kept in a variable, so this stays a dynamic call
      %% exactly as before.
      LeaderMod = gen_leader,
      LeaderMod:call(?MODULE, get_leader).
  203. %% ==========================================================
  204. %% Server-side
  %% No local casts are expected; any cast stops the server with reason
  %% `unknown_cast'.
  handle_cast(_Unexpected, State, _Election) ->
      {stop, unknown_cast, State}.
  %% Local (non-leader) calls: `get_leader' replies with the leader node taken
  %% from the election record; every other request is answered with `badarg'.
  handle_call(Request, _From, S, E) ->
      case Request of
          get_leader ->
              {reply, gen_leader:leader_node(E), S};
          _ ->
              {reply, badarg, S}
      end.
  %% A monitored process went down: drop its local {Pid, g} marker and tell
  %% the leader. All other messages are ignored.
  handle_info(Msg, S) ->
      case Msg of
          {'DOWN', _MRef, process, Pid, _Reason} ->
              ets:delete(?TAB, {Pid, g}),
              leader_cast({pid_is_DOWN, Pid}),
              {ok, S};
          _ ->
              {ok, S}
      end.
  %% Three-argument variant of the info callback; the election argument is
  %% not needed, so it delegates to handle_info/2.
  handle_info(Info, State, _Election) ->
      handle_info(Info, State).
  %% Original (two-argument) election callback: mark this instance as leader
  %% and return the full set of globals as the sync term.
  elected(S, _E) ->
      Synch = {globals, globs()},
      {ok, Synch, S#state{is_leader = true}}.
  %% Election callback with the node that triggered it.
  %%   undefined - I have become leader; full synch
  %%   Node      - another node recognized us as the leader
  elected(S, _E, undefined) ->
      Synch = {globals, globs()},
      {ok, Synch, S#state{is_leader = true}};
  elected(S, _E, _Node) ->
      Synch = {globals, globs()},
      if
          S#state.always_broadcast =:= false ->
              %% Reply to the joining node only; don't broadcast all data
              %% to everyone else.
              {reply, Synch, S};
          true ->
              %% Main reason for doing this is if we are using a gen_leader
              %% that doesn't support the 'reply' return value
              {ok, Synch, S}
      end.
  %% Collect every global object in the table: the three-element entries keyed
  %% on a {_, g, _} key, followed by the two-element entries keyed on
  %% {X, {_, g, _}}. ensure_monitor/2 is called for the owner of each
  %% three-element entry.
  globs() ->
      RegSpec = [{{{{'_',g,'_'},'_'},'_','_'}, [], ['$_']}],
      RevSpec = [{{{'$1',{'_',g,'_'}}, '$2'}, [], ['$_']}],
      Regs = ets:select(?TAB, RegSpec),
      Revs = ets:select(?TAB, RevSpec),
      lists:foreach(fun({_, Owner, _}) ->
                            gproc_lib:ensure_monitor(Owner, g)
                    end, Regs),
      Regs ++ Revs.
  %% We have surrendered to another leader and received its globals.
  %% If we believed we were leader ourselves this is a leader conflict, but
  %% the handling is the same either way: globals from this node should be
  %% more correct in our table than in the leader's, so surrendered_1/1
  %% reconciles the two views and we stop being leader.
  surrendered(S, {globals, LeaderGlobs}, _E) ->
      surrendered_1(LeaderGlobs),
      {ok, S#state{is_leader = false}}.
  %% A node went down: settle any sync requests that were waiting for it, then
  %% process every global entry whose owner pid lives on that node.
  handle_DOWN(Node, S, _E) ->
      S1 = check_sync_requests(Node, S),
      Pattern = {{{'_',g,'_'},'_'},'$1','_'},
      OnDeadNode = [{'==', {node,'$1'}, Node}],
      %% Result items have the shape {Key, Pid}
      KeyAndPid = {{{element,1,{element,1,'$_'}}, {element,2,'$_'}}},
      Affected = ets:select(?TAB, [{Pattern, OnDeadNode, [KeyAndPid]}]),
      case process_globals(Affected) of
          [] ->
              {ok, S1};
          Broadcast ->
              {ok, Broadcast, S1}
      end.
  %% Remove Node from the list of nodes each pending sync request is waiting
  %% for. A request with nobody left to wait for is answered with `true' and
  %% dropped; the others are kept with the shortened node list.
  check_sync_requests(Node, #state{sync_requests = Pending} = S) ->
      StillPending =
          lists:filtermap(
            fun({From, Nodes}) ->
                    case Nodes -- [Node] of
                        [] ->
                            gen_leader:reply(From, {leader, reply, true}),
                            false;
                        Remaining ->
                            {true, {From, Remaining}}
                    end
            end, Pending),
      S#state{sync_requests = StillPending}.
  %% Leader-side handling of the requests sent through leader_call/1.
  %%
  %% Return values used here:
  %%   {reply, Reply, S}       - answer the caller, nothing to replicate
  %%   {reply, Reply, Ops, S}  - answer the caller and broadcast Ops, a list of
  %%                             {insert, Objs} | {delete, Keys} | {notify, Evs}
  %%                             items applied on other nodes by from_leader/3
  %%   {noreply, S}            - the reply is sent later (sync)
  %% A `badarg' reply is converted to a gproc error by leader_call/1.
  handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
      GenLeader = gen_leader,
      case GenLeader:alive(E) -- [node()] of
          [] ->
              {reply, true, S};
          Alive ->
              %% Reply is deferred until every live node has answered
              %% (see handle_leader_cast/3, sync_reply) or died.
              GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
              {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
      end;
  handle_leader_call({reg, {_C,g,_Name} = K, Value, Pid}, _From, S, _E) ->
      case gproc_lib:insert_reg(K, Value, Pid, g) of
          false ->
              {reply, badarg, S};
          true ->
              _ = gproc_lib:ensure_monitor(Pid,g),
              Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
              {reply, true, [{insert, Vals}], S}
      end;
  handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
                                                                             T==a ->
      case ets:lookup(?TAB, {K, T}) of
          [{_, Pid, _}] ->
              %% The key is registered: record the monitor in the owner's
              %% reverse entry.
              Opts = get_opts(Pid, K),
              Ref = make_ref(),
              Opts1 = gproc_lib:add_monitor(Opts, MPid, Ref, Type),
              _ = gproc_lib:ensure_monitor(MPid, g),
              Obj = {{Pid,K}, Opts1},
              ets:insert(?TAB, Obj),
              {reply, Ref, [{insert, [Obj]}], S};
          LookupRes ->
              %% Not registered: LookupRes is [] or a waiters entry.
              Ref = make_ref(),
              case Type of
                  standby ->
                      %% The would-be standby takes over the key at once.
                      Event = {failover, MPid},
                      Msgs = insert_reg(LookupRes, K, undefined, MPid, Event),
                      Obj = {{K,T}, MPid, undefined},
                      Rev = {{MPid,K}, []},
                      ets:insert(?TAB, [Obj, Rev]),
                      MPid ! {gproc, {failover,MPid}, Ref, K},
                      {reply, Ref, [{insert, [Obj, Rev]},
                                    {notify, Msgs}], S};
                  follow ->
                      %% A registered owner was already matched by the outer
                      %% case, so only these two shapes can occur here.
                      case LookupRes of
                          [{_, Waiters}] ->
                              add_follow_to_waiters(Waiters, K, MPid, Ref, S);
                          [] ->
                              add_follow_to_waiters([], K, MPid, Ref, S)
                      end;
                  _ ->
                      MPid ! {gproc, unreg, Ref, K},
                      {reply, Ref, S}
              end
      end;
  handle_leader_call({demonitor, {T,g,_} = K, MPid, Ref}, _From, S, _E) ->
      case ets:lookup(?TAB, {K,T}) of
          [{_, Pid, _}] ->
              Opts = get_opts(Pid, K),
              Opts1 = gproc_lib:remove_monitors(Opts, MPid, Ref),
              Obj = {{Pid,K}, Opts1},
              ets:insert(?TAB, Obj),
              ets:delete(?TAB, {MPid, K}),
              {reply, ok, [{delete, [{MPid,K}]},
                           {insert, [Obj]}], S};
          [{Key, Waiters}] ->
              NewWaiters = [W || W <- Waiters,
                                 W =/= {MPid, Ref, follow}],
              {reply, ok, [{insert, [{Key, NewWaiters}]}], S};
          _ ->
              {reply, ok, S}
      end;
  handle_leader_call({set_attributes, {_,g,_} = K, Attrs, Pid}, _From, S, _E) ->
      case gproc_lib:insert_attr(K, Attrs, Pid, g) of
          false ->
              {reply, badarg, S};
          NewAttrs when is_list(NewAttrs) ->
              {reply, true, [{insert, [{{Pid,K}, NewAttrs}]}], S}
      end;
  handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P},
                     {FromPid, _}, S, _E) ->
      FromNode = node(FromPid),
      Reg = fun() ->
                    %% A fun is spawned on the caller's node; a pid is
                    %% registered as is.
                    Pid = if is_function(P, 0) ->
                                  spawn(FromNode, P);
                             is_pid(P) ->
                                  P
                          end,
                    case gproc_lib:insert_reg(K, Value, Pid, g) of
                        true ->
                            _ = gproc_lib:ensure_monitor(Pid,g),
                            Vals = [{{K,n},Pid,Value}],
                            {reply, {Pid, Value}, [{insert, Vals}], S};
                        false ->
                            {reply, badarg, S}
                    end
            end,
      case ets:lookup(?TAB, {K, n}) of
          [] ->
              Reg();
          [{_, _Waiters}] ->
              Reg();
          [{_, OtherPid, OtherVal}] ->
              {reply, {OtherPid, OtherVal}, S}
      end;
  handle_leader_call({update_counter, {T,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
    when is_integer(Incr), T==c;
         is_integer(Incr), T==n ->
      try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
           RealPid = case Pid of
                         n -> ets:lookup_element(?TAB, {Key,Pid}, 2);
                         shared -> shared;
                         P when is_pid(P) -> P
                     end,
           Vals = [{{Key,Pid},RealPid,New} | update_aggr_counter(Key, Incr)],
           {reply, New, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  handle_leader_call({update_counters, Cs}, _From, S, _E) ->
      try {Replies, Vals} = batch_update_counters(Cs),
           {reply, Replies, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
      try Current = ets:lookup_element(?TAB, {Key, Pid}, 3),
           Initial = case ets:lookup_element(?TAB, {Pid, Key}, 2) of
                         r -> 0;
                         Opts when is_list(Opts) ->
                             proplists:get_value(initial, Opts, 0)
                     end,
           Incr = Initial - Current,
           New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
           Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
           {reply, {Current, New}, [{insert, Vals}], S}
      catch
          %% The stacktrace is bound in the catch head, because
          %% erlang:get_stacktrace/0 was removed in OTP 24.
          error:Reason:Stacktrace ->
              io:fwrite("reset_counter failed: ~p~n~p~n", [Reason, Stacktrace]),
              {reply, badarg, S}
      end;
  handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
      Key = if T == n; T == a -> {K,T};
               true -> {K, Pid}
            end,
      case ets:member(?TAB, Key) of
          true ->
              _ = gproc_lib:remove_reg(K, Pid, unreg),
              if T == c ->
                      case ets:lookup(?TAB, {{a,g,Name},a}) of
                          [Aggr] ->
                              %% updated by remove_reg/3
                              {reply, true, [{delete,[Key, {Pid,K}]},
                                             {insert, [Aggr]}], S};
                          [] ->
                              {reply, true, [{delete, [Key, {Pid,K}]}], S}
                      end;
                 true ->
                      {reply, true, [{notify, [{K, Pid, unreg}]},
                                     {delete, [Key, {Pid,K}]}], S}
              end;
          false ->
              {reply, badarg, S}
      end;
  handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
    when T == a; T == n ->
      Key = {K, T},
      case ets:lookup(?TAB, Key) of
          [{_, Pid, Value}] ->
              %% Pid is already bound, so this only matches when the caller
              %% owns the key.
              Opts = get_opts(Pid, K),
              case pid_to_give_away_to(To) of
                  Pid ->
                      {reply, Pid, S};
                  ToPid when is_pid(ToPid) ->
                      ets:insert(?TAB, [{Key, ToPid, Value},
                                        {{ToPid,K}, Opts}]),
                      _ = gproc_lib:ensure_monitor(ToPid, g),
                      Rev = {Pid, K},
                      ets:delete(?TAB, Rev),
                      gproc_lib:notify({migrated, ToPid}, K, Opts),
                      {reply, ToPid, [{insert, [{Key, ToPid, Value}]},
                                      {notify, [{K, Pid, {migrated, ToPid}}]},
                                      {delete, [Rev]}], S};
                  undefined ->
                      ets:delete(?TAB, Key),
                      Rev = {Pid, K},
                      ets:delete(?TAB, Rev),
                      gproc_lib:notify(unreg, K, Opts),
                      {reply, undefined, [{notify, [{K, Pid, unreg}]},
                                          {delete, [Key, Rev]}], S}
              end;
          _ ->
              {reply, badarg, S}
      end;
  handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
      if T==p; T==n ->
              try gproc_lib:insert_many(T, g, L, Pid) of
                  {true,Objs} -> {reply, true, [{insert,Objs}], S};
                  false       -> {reply, badarg, S}
              catch
                  error:_     -> {reply, badarg, S}
              end;
         true -> {reply, badarg, S}
      end;
  handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
      try gproc_lib:remove_many(T, g, L, Pid) of
          [] ->
              {reply, true, S};
          Objs ->
              {reply, true, [{delete, Objs}], S}
      catch
          error:_ -> {reply, badarg, S}
      end;
  handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
      if T == a, is_integer(V) ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true  -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
                  false -> {reply, badarg, S}
              end;
         T == a ->
              %% A non-integer value for an aggregated counter is a client
              %% error; reply badarg instead of raising if_clause here.
              {reply, badarg, S};
         T == c ->
              try gproc_lib:do_set_counter_value(K, V, Pid),
                   AKey = {{a,g,N},a},
                   Aggr = ets:lookup(?TAB, AKey),  % may be []
                   {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
              catch
                  error:_ ->
                      {reply, badarg, S}
              end;
         true ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true ->
                      Obj = if T==n -> {{K, T}, Pid, V};
                               true -> {{K, Pid}, Pid, V}
                            end,
                      {reply, true, [{insert,[Obj]}], S};
                  false ->
                      {reply, badarg, S}
              end
      end;
  handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
      %% The pid in _From is of the gen_leader instance that forwarded the
      %% call - not of the client. This is why the Pid is explicitly passed.
      case gproc_lib:await(Key, Pid, From) of
          {reply, {Ref, {K, P, V}}} ->
              {reply, {Ref, {K, P, V}}, S};
          {reply, Reply, Insert} ->
              {reply, Reply, [{insert, Insert}], S}
      end;
  handle_leader_call(_, _, S, _E) ->
      {reply, badarg, S}.
  %% Leader-side handling of the casts sent through leader_cast/1.
  %% Returns {ok, S} when nothing needs replicating, or {ok, Ops, S} to
  %% broadcast Ops to the other nodes.
  handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
      #state{sync_requests = SReqs} = S,
      case lists:keyfind(Ref, 1, SReqs) of
          false ->
              %% This should never happen, except perhaps if the leader who
              %% received the sync request died, and the new leader gets the
              %% sync reply. In that case, we trust that the client has been
              %% notified anyway, and ignore the message.
              {ok, S};
          {_, Ns} ->
              case lists:delete(Node, Ns) of
                  [] ->
                      %% Last outstanding node answered: release the caller.
                      gen_leader:reply(Ref, {leader, reply, true}),
                      {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
                  Ns1 ->
                      SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
                      {ok, S#state{sync_requests = SReqs1}}
              end
      end;
  handle_leader_cast({add_globals, Missing}, S, _E) ->
      %% This is an audit message: a peer (non-leader) had info about granted
      %% global resources that we didn't know of when we became leader.
      %% This could happen due to a race condition when the old leader died.
      Update = insert_globals(Missing),
      {ok, [{insert, Update}], S};
  handle_leader_cast({remove_globals, Globals}, S, _E) ->
      delete_globals(Globals),
      {ok, S};
  handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, Ref, Waiters),
              {ok, Ops, S};
          _ ->
              {ok, [], S}
      end;
  handle_leader_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, all, Waiters),
              {ok, Ops, S};
          [{_, OtherPid, _}] ->
              Ops = gproc_lib:remove_monitors(Key, OtherPid, Pid),
              {ok, Ops, S};
          [] ->
              %% The cast is asynchronous, so the key may be gone by the time
              %% it is handled; then there is nothing left to cancel.
              {ok, S}
      end;
  handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
      %% Every reverse entry {Pid, Key} with a global Key, as {Key, Pid}
      Globals = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
                                   [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
      ets:delete(?TAB, {Pid,g}),
      case process_globals(Globals) of
          [] ->
              {ok, S};
          Broadcast ->
              {ok, Broadcast, S}
      end.
  %% Build the list of table objects to broadcast for a list of freshly
  %% registered {Key, Pid, Value} items.
  mk_broadcast_insert_vals(Objs) ->
      lists:flatmap(fun broadcast_vals/1, Objs).

  %% Objects to broadcast for one registration, by key class:
  %%   a     - the stored aggregated-counter object plus the reverse entry
  %%   c     - the counter object, the matching aggregated counter (if any)
  %%           and the reverse entry
  %%   n     - the name object plus the reverse entry
  %%   other - the {Key, Pid}-keyed object plus the reverse entry
  broadcast_vals({{a, g, _} = K, Pid, _Value}) ->
      ets:lookup(?TAB, {K,a}) ++ ets:lookup(?TAB, {Pid,K});
  broadcast_vals({{c, g, Name} = K, Pid, Value}) ->
      [{{K,Pid},Pid,Value} | ets:lookup(?TAB, {{a,g,Name},a})]
          ++ ets:lookup(?TAB, {Pid,K});
  broadcast_vals({{n, g, _} = K, Pid, Value}) ->
      [{{K,n},Pid,Value} | ets:lookup(?TAB, {Pid,K})];
  broadcast_vals({{_, g, _} = K, Pid, Value}) ->
      [{{K,Pid},Pid,Value} | ets:lookup(?TAB, {Pid,K})].
  %% Clean up after the owners of the given {Key, Pid} items have gone away.
  %% Returns the operations to broadcast; each of the insert/notify/delete
  %% items is left out when its list is empty.
  process_globals(Globals) ->
      {Modified, Notifications} =
          lists:foldl(fun process_global/2, {[], []}, Globals),
      tag_unless_empty(insert, Modified)
          ++ tag_unless_empty(notify, Notifications)
          ++ tag_unless_empty(delete, Globals).

  %% Handle one {Key, Pid} item, threading {ModifiedObjs, Notifications}.
  process_global({{T,_,_} = Key, Pid}, Acc) when T =:= n; T =:= a ->
      %% Names and aggregated counters may fail over to a standby, but only
      %% when the reverse entry carries an option list.
      case ets:lookup(?TAB, {Pid,Key}) of
          [{_, Opts}] when is_list(Opts) ->
              maybe_failover(Key, Pid, Opts, Acc);
          _ ->
              Acc
      end;
  process_global({{c,_,_} = Key, Pid}, {Mods, Notes}) ->
      %% A counter disappears: take its value out of the aggregated counter
      %% before removing the entry.
      Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
      Mods1 = update_aggr_counter(Key, -Incr) ++ Mods,
      Removed = remove_entry(Key, Pid, unreg),
      {Mods1, Removed ++ Notes};
  process_global({{_,_,_} = Key, Pid}, {Mods, Notes}) ->
      Removed = remove_entry(Key, Pid, unreg),
      {Mods, Removed ++ Notes}.

  %% [{Tag, List}] for a non-empty List, [] otherwise.
  tag_unless_empty(_Tag, []) ->
      [];
  tag_unless_empty(Tag, List) ->
      [{Tag, List}].
  %% The owner Pid of Key is gone. If a standby on a live node exists, move
  %% the registration to the first such standby; otherwise remove the entry.
  %% Returns the updated {ModifiedObjs, Notifications} accumulator.
  maybe_failover({T,_,_} = Key, Pid, Opts, {MAcc, NAcc}) ->
      %% Opts is already bound: this match asserts that the stored options
      %% equal the ones passed in.
      Opts = get_opts(Pid, Key),
      case filter_standbys(gproc_lib:standbys(Opts)) of
          [{ToPid, Ref, _} | _] ->
              Value = case ets:lookup(?TAB, {Key,T}) of
                          [{_, _, V}] -> V;
                          _ -> undefined
                      end,
              Notes = remove_rev_entry(Opts, Pid, Key, {failover, ToPid}),
              %% The new owner no longer monitors the key it now holds
              Opts1 = gproc_lib:remove_monitor(Opts, ToPid, Ref),
              _ = gproc_lib:ensure_monitor(ToPid, g),
              Reg = {{Key,T}, ToPid, Value},
              Rev = {{ToPid, Key}, Opts1},
              ets:insert(?TAB, [Reg, Rev]),
              {[Reg, Rev | MAcc], Notes ++ NAcc};
          [] ->
              Notes = remove_entry(Key, Pid, unreg),
              {MAcc, Notes ++ NAcc}
      end.
  %% Drop leading standbys whose pid lives on a node that is not the local
  %% node or a connected node. The first live standby and everything after it
  %% is returned unchanged.
  filter_standbys(SBs) ->
      filter_standbys(SBs, [node() | nodes()]).

  %% Same as filter_standbys/1, against an explicit list of live nodes.
  filter_standbys(SBs, Nodes) ->
      OnDeadNode = fun({Pid, _, _}) ->
                           not lists:member(node(Pid), Nodes)
                   end,
      lists:dropwhile(OnDeadNode, SBs).
  %% Remove Pid's registration of Key and return the notifications to
  %% broadcast (possibly []).
  %%
  %% The table key comes from ets_key/2. What happens depends on what is
  %% stored there:
  %%   owned by Pid        - delete it and the reverse entry; notify Event
  %%   owned by another    - only Pid's reverse entry is deleted
  %%   a waiters entry     - only Pid's reverse entry is deleted
  %%   nothing             - nothing to do
  remove_entry(Key, Pid, Event) ->
      K = ets_key(Key, Pid),
      case ets:lookup(?TAB, K) of
          [{_, Pid, _}] ->
              ets:delete(?TAB, K),
              remove_rev_entry(get_opts(Pid, Key), Pid, Key, Event);
          [{_, _OtherPid, _}] ->
              ets:delete(?TAB, {Pid, Key}),
              [];
          [{_, _Waiters}] ->
              %% Pid was only waiting on or following an unregistered name
              %% (add_follow_to_waiters/5 stores such a reverse entry). It
              %% never owned the key, so drop its reverse entry and do not
              %% raise case_clause.
              %% NOTE(review): Pid is left in the waiters list itself -
              %% confirm whether it should be pruned here as well.
              ets:delete(?TAB, {Pid, Key}),
              [];
          [] ->
              []
      end.
  %% Delete the reverse entry {Pid, Key}. For global names (n) and aggregated
  %% counters (a), Event is also delivered via gproc_lib:notify/3 and returned
  %% as a notification to broadcast; for every other key the result is [].
  remove_rev_entry(Opts, Pid, {n, g, _} = K, Event) ->
      notify_and_remove_rev(Opts, Pid, K, Event);
  remove_rev_entry(Opts, Pid, {a, g, _} = K, Event) ->
      notify_and_remove_rev(Opts, Pid, K, Event);
  remove_rev_entry(_Opts, Pid, K, _Event) ->
      ets:delete(?TAB, {Pid, K}),
      [].

  %% Notify first, then delete the reverse entry.
  notify_and_remove_rev(Opts, Pid, K, Event) ->
      gproc_lib:notify(Event, K, Opts),
      ets:delete(?TAB, {Pid, K}),
      [{K, Pid, Event}].
  %% Options stored in the reverse entry {Pid, K}. A missing entry and the
  %% placeholder value `r' both count as "no options".
  get_opts(Pid, K) ->
      case ets:lookup(?TAB, {Pid, K}) of
          [{_, Opts}] when Opts =/= r -> Opts;
          [{_, r}] -> [];
          [] -> []
      end.
  %% Code upgrade callback: the state is carried over unchanged.
  code_change(_OldVsn, State, _Extra, _Election) ->
      {ok, State}.
  %% Termination callback: nothing to clean up.
  terminate(_Why, _State) ->
      ok.
  %% Messages broadcast by the leader.
  %%   {sync, Ref} - answer the leader's ping with a sync_reply cast
  %%   Ops         - a list of {delete, _} | {insert, _} | {notify, _}
  %%                 operations, applied to the local table in order
  from_leader({sync, Ref}, S, _E) ->
      gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
      {ok, S};
  from_leader(Ops, S, _E) ->
      lists:foreach(fun apply_leader_op/1, Ops),
      {ok, S}.

  %% Apply one replicated operation locally.
  apply_leader_op({delete, Globals}) ->
      delete_globals(Globals);
  apply_leader_op({insert, Globals}) ->
      _ = insert_globals(Globals);
  apply_leader_op({notify, Events}) ->
      do_notify(Events).
  %% Insert replicated global objects into the local table.
  %%
  %% The fold starts from Globals itself and prepends every reverse entry with
  %% a non-empty option list, so the result is Globals preceded by those
  %% entries.
  %% NOTE(review): this makes such entries appear twice in the result -
  %% confirm whether that is intended.
  insert_globals(Globals) ->
      lists:foldl(fun insert_global/2, Globals, Globals).

  %% Insert one object.
  insert_global({{{_,_,_} = Key, _}, Pid, _} = Obj, Acc) ->
      %% Registration object: make sure a reverse entry exists and that the
      %% owner is monitored.
      ets:insert(?TAB, Obj),
      ets:insert_new(?TAB, {{Pid,Key}, []}),
      gproc_lib:ensure_monitor(Pid, g),
      Acc;
  insert_global({{P, _K}, Opts} = Obj, Acc)
    when is_pid(P), is_list(Opts), Opts =/= [] ->
      %% Reverse entry carrying options
      ets:insert(?TAB, Obj),
      gproc_lib:ensure_monitor(P, g),
      [Obj | Acc];
  insert_global(_Other, Acc) ->
      Acc.
  %% Delete replicated global entries from the local table.
  delete_globals(Globals) ->
      lists:foreach(fun delete_global/1, Globals).

  %% Delete one entry. Accepted shapes:
  %%   {{_, g, _}, Atom} - deleted under exactly that key
  %%   {Key, Owner}      - the reverse entry {Owner, Key} is deleted
  %%   {Owner, Key}      - deleted under exactly that key
  %% where Owner is a pid or the atom `shared'.
  delete_global({{_, g, _}, T} = K) when is_atom(T) ->
      ets:delete(?TAB, K);
  delete_global({Key, Owner}) when is_pid(Owner); Owner =:= shared ->
      ets:delete(?TAB, {Owner, Key});
  delete_global({Owner, _Key} = Rev) when is_pid(Owner); Owner =:= shared ->
      ets:delete(?TAB, Rev).
  %% Deliver the notifications the leader asked us to send. Returns ok.
  %%   {Pid, Msg}        - Msg is sent to Pid
  %%   {Key, Pid, Event} - Event is passed to gproc_lib:notify/3 with the
  %%                       options in the reverse entry {Pid, Key}, provided
  %%                       that entry exists and holds an option list
  %% Every item in the list is processed. Previously the walk stopped right
  %% after the first {Key, Pid, Event} item that was delivered, dropping the
  %% rest of the list.
  do_notify([{P, Msg}|T]) when is_pid(P) ->
      P ! Msg,
      do_notify(T);
  do_notify([{K, P, E}|T]) ->
      case ets:lookup(?TAB, {P,K}) of
          [{_, Opts}] when is_list(Opts) ->
              gproc_lib:notify(E, K, Opts);
          _ ->
              ok
      end,
      do_notify(T);
  do_notify([]) ->
      ok.
  %% Table key of a registration: names (n) and aggregated counters (a) are
  %% stored under {Key, Type}, everything else under {Key, Pid}.
  ets_key(Key, Pid) ->
      case Key of
          {n, _, _} -> {Key, n};
          {a, _, _} -> {Key, a};
          _ -> {Key, Pid}
      end.
  %% Send a synchronous request to the gproc leader. A `badarg' reply is
  %% turned into a gproc badarg error; every other reply is returned as is.
  leader_call(Req) ->
      Reply = gen_leader:leader_call(?MODULE, Req),
      if
          Reply =:= badarg -> ?THROW_GPROC_ERROR(badarg);
          true -> Reply
      end.
  %% Send an asynchronous message to the gproc leader.
  leader_cast(Message) ->
      gen_leader:leader_cast(?MODULE, Message).
  %% gen_leader init callback. The only option read is `always_broadcast',
  %% which falls back to the default declared in the #state{} record.
  init(Opts) ->
      Defaults = #state{},
      Default = Defaults#state.always_broadcast,
      {ok, #state{always_broadcast =
                      proplists:get_value(always_broadcast, Opts, Default)}}.
  %% Reconcile the local table with the globals received from the leader.
  %%
  %%  1. Collect the globals owned by processes on this node and make sure
  %%     their owners are monitored.
  %%  2. Delete every global (and reverse entry) owned by a remote process.
  %%  3. Insert the leader's non-local globals, collecting the leader's
  %%     version of this node's globals.
  %%  4. Tell the leader which local globals it is missing (add_globals) and
  %%     which of its entries for this node to drop (remove_globals).
  surrendered_1(Globs) ->
      %% Items have the shape {Key, Pid, Value}
      My_local_globs =
          ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '$2'},
                             [{'==', {node,'$1'}, node()}],
                             [{{ {element,1,{element,1,'$_'}}, '$1', '$2' }}]}]),
      _ = [gproc_lib:ensure_monitor(Pid, g) || {_, Pid, _} <- My_local_globs],
      %% remove all remote globals.
      ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'}, '$1', '_'},
                                [{'=/=', {node,'$1'}, node()}],
                                [true]},
                               {{{'$1',{'_',g,'_'}}, '_'},
                                [{'=/=', {node,'$1'}, node()}],
                                [true]}]),
      %% insert new non-local globals, collect the leader's version of
      %% what my globals are
      Ldr_local_globs =
          lists:foldl(
            fun({_, shared, _} = Obj, Acc) ->
                    %% Objects owned by `shared' belong to no node; node/1
                    %% fails in both guards below, which used to end in a
                    %% function_clause error. Take the leader's copy.
                    ets:insert(?TAB, Obj),
                    Acc;
               ({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
                    ets:insert(?TAB, {K, Pid, V}),
                    _ = gproc_lib:ensure_monitor(Pid, g),
                    ets:insert_new(?TAB, {{Pid,Key}, []}),
                    Acc;
               ({{_Pid,_}=K, Opts}, Acc) -> % when node(Pid) =/= node() ->
                    ets:insert(?TAB, {K, Opts}),
                    Acc;
               ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
                    [Obj|Acc]
            end, [], Globs),
      %% NOTE(review): My_local_globs is keyed on the bare Key while
      %% Ldr_local_globs holds whole objects keyed on {Key, _}, so the two
      %% keymember/3 checks below compare differently shaped terms - confirm
      %% that this is intended.
      case [{K,P,V} || {K,P,V} <- My_local_globs,
                       is_pid(P) andalso
                           not(lists:keymember(K, 1, Ldr_local_globs))] of
          [] ->
              %% phew! We have the same picture
              ok;
          [_|_] = Missing ->
              %% This is very unlikely, I think
              leader_cast({add_globals, mk_broadcast_insert_vals(Missing)})
      end,
      case [{K,P} || {K,P,_} <- Ldr_local_globs,
                     is_pid(P) andalso
                         not(lists:keymember(K, 1, My_local_globs))] of
          [] ->
              ok;
          [_|_] = Remove ->
              leader_cast({remove_globals, Remove})
      end.
  %% Apply a list of {Key, Pid, Incr} updates to global counters.
  %% Returns {Replies, Objects}: one {Key, Pid, Value} reply per update, in
  %% request order, and the table objects to broadcast.
  batch_update_counters(Cs) ->
      batch_update_counters(Cs, [], []).

  %% Worker with accumulators for the replies (reversed) and the objects.
  batch_update_counters([], Returns, Updates) ->
      {lists:reverse(Returns), Updates};
  batch_update_counters([{{c,g,_} = Key, Pid, Incr} | Rest], Returns, Updates) ->
      Changed = update_counter_g(Key, Incr, Pid),
      {V, Updates1} = merge_counter_objs(Changed, Updates),
      batch_update_counters(Rest, [{Key, Pid, V} | Returns], Updates1).

  %% Fold the objects returned by update_counter_g/3 into Updates and pick
  %% out the value to report: either [Aggregated, Counter] or just [Counter].
  merge_counter_objs([{_,_,_} = Aggr, {_, _, V} = Ctr], Updates) ->
      {V, add_object(Aggr, add_object(Ctr, Updates))};
  merge_counter_objs([{_, _, V} = Ctr], Updates) ->
      {V, add_object(Ctr, Updates)}.
  %% Put Obj into a list of objects: the first element with the same first
  %% and second tuple elements is replaced in place; if there is none, Obj is
  %% appended at the end.
  add_object(Obj, Objs) ->
      add_object(Obj, Objs, []).

  %% Worker; Seen holds the elements already passed, in reverse order.
  add_object({K, P, _} = Obj, [{K, P, _} | Rest], Seen) ->
      lists:reverse(Seen, [Obj | Rest]);
  add_object(Obj, [Other | Rest], Seen) ->
      add_object(Obj, Rest, [Other | Seen]);
  add_object(Obj, [], Seen) ->
      lists:reverse(Seen, [Obj]).
  %% Update the global counter {Key, Pid} and, if one exists, the matching
  %% aggregated counter. Returns the changed objects with the aggregated
  %% counter (if any) first. The update is one of:
  %%   Incr                         - plain integer increment
  %%   {Incr, Threshold, SetValue}  - increment with threshold and reset value
  %%   [Op]                         - a list of the above, applied in order
  %% Anything else raises a gproc badarg error.
  update_counter_g({c,g,_} = Key, Incr, Pid) when is_integer(Incr) ->
      New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
      update_aggr_counter(Key, Incr, [{{Key,Pid}, Pid, New}]);
  update_counter_g({c,g,_} = Key, {Incr, Threshold, SetValue}, Pid)
    when is_integer(Incr), is_integer(Threshold), is_integer(SetValue) ->
      %% The leading {3, 0} reads the value before the update
      ReadThenUpdate = [{3, 0}, {3, Incr, Threshold, SetValue}],
      [Prev, New] = ets:update_counter(?TAB, {Key, Pid}, ReadThenUpdate),
      update_aggr_counter(Key, New - Prev, [{{Key,Pid}, Pid, New}]);
  update_counter_g({c,g,_} = Key, Ops, Pid) when is_list(Ops) ->
      ReadThenUpdate = [{3, 0} | expand_ops(Ops)],
      case ets:update_counter(?TAB, {Key, Pid}, ReadThenUpdate) of
          [_] ->
              %% Empty operation list: nothing changed
              [];
          [Prev | Results] ->
              New = lists:last(Results),
              %% NOTE(review): unlike the clauses above, this element is
              %% {Key, Pid, Results} rather than {{Key,Pid}, Pid, New} -
              %% confirm that callers expect this shape.
              update_aggr_counter(Key, New - Prev, [{Key, Pid, Results}])
      end;
  update_counter_g(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Translate a list of counter operations into ets:update_counter/3
  %% operations on element 3. Each item is an integer increment or an
  %% {Incr, Threshold, SetValue} triple of integers; anything else raises a
  %% gproc badarg error.
  expand_ops(Ops) ->
      expand_ops(Ops, []).

  %% Worker; Acc holds the translated operations in reverse order.
  expand_ops([{Incr, Thr, SetV} | Rest], Acc)
    when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
      expand_ops(Rest, [{3, Incr, Thr, SetV} | Acc]);
  expand_ops([Incr | Rest], Acc) when is_integer(Incr) ->
      expand_ops(Rest, [{3, Incr} | Acc]);
  expand_ops([], Acc) ->
      lists:reverse(Acc);
  expand_ops(_, _Acc) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Objects changed by adding Incr to the aggregated counter that belongs to
  %% Key. Names (n) have no aggregated counter, so the result is [] for them.
  update_aggr_counter(Key, Incr) ->
      case Key of
          {n, _, _} -> [];
          _ -> update_aggr_counter(Key, Incr, [])
      end.

  %% Add Incr to the aggregated counter {a,g,Ctr} if it exists and prepend
  %% the updated object to Acc; Acc is returned unchanged when there is no
  %% such counter.
  update_aggr_counter({c, g, Ctr}, Incr, Acc) ->
      case ets:lookup(?TAB, {{a,g,Ctr}, a}) of
          [{AggrKey, Owner, Prev}] ->
              Updated = {AggrKey, Owner, Prev + Incr},
              ets:insert(?TAB, Updated),
              [Updated | Acc];
          [] ->
              Acc
      end.
  %% Resolve the target of a give_away: a pid is returned as is; a global
  %% name or aggregated-counter key is resolved to the process currently
  %% holding it, or `undefined' when nobody does.
  pid_to_give_away_to(P) when is_pid(P) ->
      P;
  pid_to_give_away_to({n, g, _} = Key) ->
      current_holder(Key, n);
  pid_to_give_away_to({a, g, _} = Key) ->
      current_holder(Key, a).

  %% Pid registered under {Key, Type}, or `undefined'.
  current_holder(Key, Type) ->
      case ets:lookup(?TAB, {Key, Type}) of
          [{_, Holder, _}] -> Holder;
          _ -> undefined
      end.
  %% Register K for Pid and return the messages tell_waiters/5 produces for
  %% the given waiters.
  %%
  %% The first argument is the result of looking the key up: a waiters entry,
  %% or [] when the key is completely unknown. The [] case is reachable from
  %% a standby monitor on a name nobody has registered or waited for, and
  %% used to end in a function_clause error.
  insert_reg([{_, Waiters}], K, Val, Pid, Event) ->
      gproc_lib:insert_reg(K, Val, Pid, g, []),
      tell_waiters(Waiters, K, Pid, Val, Event);
  insert_reg([], K, Val, Pid, Event) ->
      gproc_lib:insert_reg(K, Val, Pid, g, []),
      tell_waiters([], K, Pid, Val, Event).
  %% Tell everybody waiting for K that it is now held by Pid with value V.
  %%   {P, R}         - gets {gproc, R, registered, {K, Pid, V}}
  %%   {P, R, follow} - gets {gproc, Event, R, K}
  %% Waiters on the local node are messaged directly. For every other waiter
  %% a {P, Msg} pair is returned so the message can be delivered elsewhere.
  %%
  %% The whole list is always processed and the result is always a list.
  %% Previously a local waiter ended the walk and the sent message itself was
  %% returned, so later waiters were never told.
  tell_waiters([{P,R}|T], K, Pid, V, Event) ->
      Msg = {gproc, R, registered, {K, Pid, V}},
      if node(P) == node() ->
              P ! Msg,
              tell_waiters(T, K, Pid, V, Event);
         true ->
              [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
      end;
  tell_waiters([{P,R,follow}|T], K, Pid, V, Event) ->
      Msg = {gproc, Event, R, K},
      if node(P) == node() ->
              P ! Msg,
              tell_waiters(T, K, Pid, V, Event);
         true ->
              [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
      end;
  tell_waiters([], _, _, _, _) ->
      [].
  %% Record Pid as a follower of the (currently unregistered) key K and tell
  %% it right away that the key is unregistered. Returns the leader-call
  %% reply, including the operations to broadcast.
  add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) ->
      WaitObj = {{K,T}, [{Pid, Ref, follow} | Waiters]},
      RevObj = {{Pid,K}, []},
      ets:insert(?TAB, [WaitObj, RevObj]),
      Unreg = {gproc, unreg, Ref, K},
      Insert = {insert, [WaitObj, RevObj]},
      Ops = if
                node(Pid) == node() ->
                    %% Local follower: deliver directly
                    Pid ! Unreg,
                    [Insert];
                true ->
                    %% Remote follower: include the message as a notify op
                    [Insert, {notify, [{Pid, Unreg}]}]
            end,
      {reply, Ref, Ops, S}.