gproc_dist.erl 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191
  1. %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
  2. %% --------------------------------------------------
  3. %% This file is provided to you under the Apache License,
  4. %% Version 2.0 (the "License"); you may not use this file
  5. %% except in compliance with the License. You may obtain
  6. %% a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing,
  11. %% software distributed under the License is distributed on an
  12. %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  13. %% KIND, either express or implied. See the License for the
  14. %% specific language governing permissions and limitations
  15. %% under the License.
  16. %% --------------------------------------------------
  17. %%
  18. %% @author Ulf Wiger <ulf@wiger.net>
  19. %%
  20. %% @doc Extended process registry
  21. %% <p>This module implements an extended process registry</p>
  22. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  23. %% @end
  24. -module(gproc_dist).
  25. -behaviour(gen_leader).
  26. -export([start_link/0, start_link/1,
  27. reg/1, reg/4, unreg/1,
  28. reg_other/5, unreg_other/2,
  29. reg_or_locate/3,
  30. reg_shared/3, unreg_shared/1,
  31. monitor/2,
  32. demonitor/2,
  33. set_attributes/2,
  34. set_attributes_shared/2,
  35. mreg/2,
  36. munreg/2,
  37. set_value/2,
  38. set_value_shared/2,
  39. give_away/2,
  40. update_counter/3,
  41. update_counters/1,
  42. update_shared_counter/2,
  43. reset_counter/1]).
  44. -export([leader_call/1,
  45. leader_cast/1,
  46. sync/0,
  47. get_leader/0]).
  48. %%% internal exports
  49. -export([init/1,
  50. handle_cast/3,
  51. handle_call/4,
  52. handle_info/2, handle_info/3,
  53. handle_leader_call/4,
  54. handle_leader_cast/3,
  55. handle_DOWN/3,
  56. elected/2, % original version
  57. elected/3,
  58. surrendered/3,
  59. from_leader/3,
  60. code_change/4,
  61. terminate/2]).
  62. -include("gproc_int.hrl").
  63. -include("gproc.hrl").
  64. -define(SERVER, ?MODULE).
  %% Callback state carried through every gen_leader callback in this module.
  -record(state, {
            %% When false, elected/3 answers a joining node with a `reply'
            %% tuple instead of broadcasting the full table to everyone.
            always_broadcast = false,
            %% Set to true by elected/2,3 and to false by surrendered/3.
            is_leader,
            %% NOTE(review): not touched by the code visible here; presumably
            %% the local callers waiting in sync/0 - confirm.
            sync_clients = [],
            %% Leader side: [{Ref, Nodes}] - nodes that still owe a sync_reply.
            sync_requests = []
           }).
  70. -include("gproc_trace.hrl").
  71. %% ==========================================================
  72. %% Start functions
  %% Starts the server with this node plus all currently connected nodes as
  %% candidates, and no worker nodes.
  start_link() ->
      Candidates = [node() | nodes()],
      start_link({Candidates, []}).
  %% Starts the gen_leader instance registered as ?SERVER.
  %%
  %% Accepted arguments:
  %%   all            - candidates are [node()|nodes()]; broadcast type `all';
  %%                    workers are read from the `gproc_dist_workers'
  %%                    application environment variable (non-empty list),
  %%                    defaulting to [].
  %%   Nodes (list)   - explicit candidate list, no extra options.
  %%   {Nodes, Opts}  - explicit candidate list and gen_leader options.
  start_link(all) ->
      Workers =
          case application:get_env(gproc_dist_workers) of
              {ok, [_|_] = Configured} ->
                  Configured;
              _ ->
                  []
          end,
      Candidates = [node() | nodes()],
      LeaderOpts = [{bcast_type, all}, {workers, Workers}],
      start_link({Candidates, LeaderOpts});
  start_link(Nodes) when is_list(Nodes) ->
      start_link({Nodes, []});
  start_link({Nodes, Opts}) ->
      SpawnOpts = gproc_lib:valid_opts(server_options, []),
      StartOpts = [{spawn_opt, SpawnOpts}],
      gen_leader:start_link(?SERVER, Nodes, Opts, ?MODULE, [], StartOpts).
  87. %% ==========================================================
  88. %% API
  %% Registers `Key' for the calling process with the default value for that
  %% key (as given by gproc:default/1), no attributes, and operation `reg'.
  %% See gproc:reg/1.
  reg(Key) ->
      DefaultValue = gproc:default(Key),
      reg(Key, DefaultValue, [], reg).
  %% Asks the leader to register the unique global name `Key' for a process,
  %% or to locate the process already holding it. See gproc:reg_or_locate/2.
  %%
  %% The third argument is either the pid to register, or a zero-arity fun.
  %% A fun is wrapped so that the process running it first adopts the
  %% caller's group leader; the leader spawns it on the caller's node.
  %% Any other argument combination throws a gproc `badarg' error.
  reg_or_locate({n,g,_} = Key, Value, Pid) when is_pid(Pid) ->
      leader_call({reg_or_locate, Key, Value, Pid});
  reg_or_locate({n,g,_} = Key, Value, F) when is_function(F, 0) ->
      CallerGroupLeader = group_leader(),
      Wrapped = fun() ->
                        %% runs in the newly spawned process
                        group_leader(CallerGroupLeader, self()),
                        F()
                end,
      leader_call({reg_or_locate, Key, Value, Wrapped});
  reg_or_locate(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Registers the global key `{Class, g, Name}' for the calling process by
  %% forwarding `{reg, Key, Value, self(), Attrs, Op}' to the leader.
  %%
  %% Class = n  - unique name
  %%       | p  - non-unique property
  %%       | c  - counter
  %%       | a  - aggregated counter
  %%       | r  - resource property
  %%       | rc - resource counter
  %%
  %% Keys that are not global (scope other than `g') throw a gproc `badarg'.
  reg({_,g,_} = GlobalKey, Value, Attrs, Op) ->
      Request = {reg, GlobalKey, Value, self(), Attrs, Op},
      leader_call(Request);
  reg(_, _, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Registers the global key `{Class, g, Name}' on behalf of another process.
  %%
  %% Class = n  - unique name
  %%       | a  - aggregated counter
  %%       | r  - resource property
  %%       | rc - resource counter
  %% Pid   = pid() - the process the key is registered for
  %% Value = term()
  %% Attrs = [{Key, Value}]
  %% Op    = passed through unchanged to the leader
  %%
  %% Any other class, a non-global key or a non-pid `Pid' throws a gproc
  %% `badarg' error.
  reg_other({T,g,_} = Key, Pid, Value, Attrs, Op)
    when is_pid(Pid), T =:= n;
         is_pid(Pid), T =:= a;
         is_pid(Pid), T =:= r;
         is_pid(Pid), T =:= rc ->
      leader_call({reg_other, Key, Value, Pid, Attrs, Op});
  reg_other(_, _, _, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregisters a global key of class n, a, r or rc on behalf of process
  %% `Pid'. Everything else throws a gproc `badarg' error.
  unreg_other({T,g,_} = Key, Pid)
    when is_pid(Pid), T =:= n;
         is_pid(Pid), T =:= a;
         is_pid(Pid), T =:= r;
         is_pid(Pid), T =:= rc ->
      leader_call({unreg_other, Key, Pid});
  unreg_other(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Registers a shared (ownerless) global key: the request is the same as for
  %% reg/4, with the atom `shared' in place of the owner pid and `reg' as the
  %% operation. Non-global keys throw a gproc `badarg' error.
  reg_shared({_,g,_} = GlobalKey, Value, Attrs) ->
      Request = {reg, GlobalKey, Value, shared, Attrs, reg},
      leader_call(Request);
  reg_shared(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to set up a monitor of type `info', `follow' or `standby'
  %% on a global key, on behalf of the calling process. Any other type, or a
  %% non-global key, throws a gproc `badarg' error.
  monitor({_,g,_} = Key, Type)
    when Type =:= info; Type =:= follow; Type =:= standby ->
      Monitoring = self(),
      leader_call({monitor, Key, Monitoring, Type});
  monitor(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to remove the monitor identified by `Ref' that the
  %% calling process holds on a global key. Non-global keys throw a gproc
  %% `badarg' error.
  demonitor({_,g,_} = Key, Ref) ->
      Monitoring = self(),
      leader_call({demonitor, Key, Monitoring, Ref});
  demonitor(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sets attributes on a global key owned by the calling process.
  %% Non-global keys throw a gproc `badarg' error.
  set_attributes({_,g,_} = Key, Attrs) ->
      Owner = self(),
      leader_call({set_attributes, Key, Attrs, Owner});
  set_attributes(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sets attributes on a shared global key (owner is the atom `shared').
  %% Non-global keys throw a gproc `badarg' error.
  set_attributes_shared({_,g,_} = Key, Attrs) ->
      Request = {set_attributes, Key, Attrs, shared},
      leader_call(Request);
  set_attributes_shared(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Registers several global keys of type `T' at once for the calling
  %% process. `KVL' must be a list; anything else throws a gproc `badarg'.
  mreg(T, KVL) when is_list(KVL) ->
      leader_call({mreg, T, g, KVL, self()});
  mreg(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregisters several global keys of type `T' at once for the calling
  %% process. `Keys' must be a list; anything else throws a gproc `badarg'.
  munreg(T, Keys) when is_list(Keys) ->
      leader_call({munreg, T, g, Keys, self()});
  munreg(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregisters a global key held by the calling process.
  %% Non-global keys throw a gproc `badarg' error.
  unreg({_,g,_} = GlobalKey) ->
      Owner = self(),
      leader_call({unreg, GlobalKey, Owner});
  unreg(_) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Unregisters a shared global counter (`c') or aggregated counter (`a').
  %% Any other key throws a gproc `badarg' error.
  unreg_shared({T,g,_} = Key) when T =:= c; T =:= a ->
      Request = {unreg, Key, shared},
      leader_call(Request);
  unreg_shared(_) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sets the value of a global key owned by the calling process.
  %%
  %% Counters (`c') and aggregated counters (`a') only accept integer values;
  %% a non-integer value for those classes throws a gproc `badarg' error, as
  %% does any non-global key. All other global classes accept any term.
  set_value({T,g,_} = Key, Value)
    when is_integer(Value), T =:= a;
         is_integer(Value), T =:= c ->
      leader_call({set, Key, Value, self()});
  set_value({T,g,_}, _NonInteger) when T =:= a; T =:= c ->
      ?THROW_GPROC_ERROR(badarg);
  set_value({_,g,_} = Key, Value) ->
      leader_call({set, Key, Value, self()});
  set_value(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sets the value of a shared global key of class a, c or p (owner is the
  %% atom `shared'). Any other key throws a gproc `badarg' error.
  set_value_shared({T,g,_} = Key, Value) when T =:= a; T =:= c; T =:= p ->
      Request = {set, Key, Value, shared},
      leader_call(Request);
  set_value_shared(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to transfer the global key `Key', currently held by the
  %% calling process, to `To'.
  %%
  %% Like every other API function in this module, a key that is not global
  %% is rejected with a gproc `badarg' error rather than an opaque
  %% function_clause failure.
  give_away({_,g,_} = Key, To) ->
      leader_call({give_away, Key, To, self()});
  give_away(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Adds the integer `Incr' to a global key of class `c' or `n'.
  %% `Pid' is passed to the leader as given; it identifies the entry to
  %% update. Any other argument combination throws a gproc `badarg' error.
  update_counter({T,g,_} = Key, Pid, Incr)
    when is_integer(Incr), (T =:= c orelse T =:= n) ->
      Request = {update_counter, Key, Incr, Pid},
      leader_call(Request);
  update_counter(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Sends a batch of counter updates to the leader in a single request.
  %% The argument must be a list; anything else throws a gproc `badarg'.
  update_counters(Updates) when is_list(Updates) ->
      leader_call({update_counters, Updates});
  update_counters(_) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Adds the integer `Incr' to a shared global counter (owner is the atom
  %% `shared'). Any other argument combination throws a gproc `badarg'.
  update_shared_counter({c,g,_} = Counter, Incr) when is_integer(Incr) ->
      Request = {update_counter, Counter, Incr, shared},
      leader_call(Request);
  update_shared_counter(_, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Asks the leader to reset a global counter owned by the calling process.
  %% Only `{c, g, Name}' keys are accepted; others throw a gproc `badarg'.
  reset_counter({c,g,_} = Counter) ->
      Owner = self(),
      leader_call({reset_counter, Counter, Owner});
  reset_counter(_) ->
      ?THROW_GPROC_ERROR(badarg).
  %% @spec sync() -> true
  %% @doc Synchronize with the gproc leader.
  %%
  %% Use this to make sure that data has been replicated from the leader to
  %% the current node. The leader is asked to ping all live participating
  %% nodes, and the call returns `true' once every one of them has either
  %% answered or died.
  %%
  %% If the leader itself dies while a sync is in progress, the call fails
  %% with a timeout exception. (It ought to be a `leader_died' exception; it
  %% is not yet understood why gen_leader times out here instead of reporting
  %% that the leader died.)
  %% @end
  %%
  sync() ->
      %% gen_leader can take a while, hence a timeout above the 5 s default.
      Timeout = 10000,
      gen_server:call(?SERVER, sync, Timeout).
  %% @spec get_leader() -> node()
  %% @doc Returns the node of the current gproc leader.
  %% @end
  get_leader() ->
      %% The module name goes through a variable, so this is a dynamic call.
      %% NOTE(review): presumably done to keep static analysis quiet about
      %% gen_leader:call/2 - confirm before turning it into a static call.
      LeaderMod = gen_leader,
      LeaderMod:call(?SERVER, get_leader).
  239. %% ==========================================================
  240. %% Server-side
  %% No local casts are supported: any cast stops the server with reason
  %% `unknown_cast'.
  handle_cast(_Unexpected, State, _Election) ->
      {stop, unknown_cast, State}.
  %% Local (non-leader) calls.
  %%   get_leader - replies with the leader's node, taken from the election.
  %%   sync       - starts a sync round; the reply is sent later, so the
  %%                callback returns `noreply'.
  %%   other      - replies `badarg'.
  handle_call(get_leader, _From, State, Election) ->
      LeaderNode = gen_leader:leader_node(Election),
      {reply, LeaderNode, State};
  handle_call(sync, From, State, Election) ->
      NewState = initiate_sync(From, State, Election),
      {noreply, NewState};
  handle_call(_Other, _From, State, _Election) ->
      {reply, badarg, State}.
  %% Out-of-band messages.
  %%   'DOWN' for a monitored process - drop the local `{Pid, g}' marker and
  %%       tell the leader that the process is gone.
  %%   {gproc_unreg, Objs} - ask for `Objs' to be deleted via the broadcast
  %%       element of the return value.
  %%   anything else - ignored.
  handle_info({'DOWN', _MonitorRef, process, DeadPid, _Reason}, State) ->
      ets:delete(?TAB, {DeadPid, g}),
      leader_cast({pid_is_DOWN, DeadPid}),
      {ok, State};
  handle_info({gproc_unreg, Objs}, State) ->
      Broadcast = [{delete, Objs}],
      {ok, Broadcast, State};
  handle_info(_Ignored, State) ->
      {ok, State}.
  %% Three-argument variant of the callback: the election argument is not
  %% needed, so it simply delegates to handle_info/2.
  handle_info(Info, State, _Election) ->
      handle_info(Info, State).
  %% Original two-argument `elected' callback: mark ourselves as leader and
  %% hand back the complete set of global entries as the sync payload.
  elected(State, _Election) ->
      Synch = {globals, globs()},
      {ok, Synch, State#state{is_leader = true}}.
  %% Three-argument `elected' callback.
  %%
  %% With `undefined' as third argument we have just become leader: do a full
  %% synch and record the leadership in the state.
  %%
  %% Otherwise another node has recognized us as the leader. Unless
  %% `always_broadcast' is set, the data goes only to that node (`reply')
  %% rather than to everyone (`ok'). The `ok' form mainly exists for
  %% gen_leader versions that do not support the `reply' return value.
  elected(State, Election, undefined) ->
      Synch = {globals, globs()},
      Leader = State#state{is_leader = true},
      {ok, Synch, maybe_reinitiate_sync(Leader, Election)};
  elected(State, Election, _Node) ->
      Synch = {globals, globs()},
      Tag = case State#state.always_broadcast of
                false -> reply;
                _ -> ok
            end,
      {Tag, Synch, maybe_reinitiate_sync(State, Election)}.
  %% Collects every global object in ?TAB: first the three-element
  %% registration objects keyed on `{{_, g, _}, _}', then the two-element
  %% reverse entries keyed on `{_, {_, g, _}}'. A monitor is ensured for the
  %% pid of every registration object before the combined list is returned.
  globs() ->
      RegSpec = [{{{{'_',g,'_'},'_'},'_','_'}, [], ['$_']}],
      RevSpec = [{{{'$1',{'_',g,'_'}}, '$2'}, [], ['$_']}],
      Regs = ets:select(?TAB, RegSpec),
      Revs = ets:select(?TAB, RevSpec),
      _ = [gproc_lib:ensure_monitor(Owner, g) || {_, Owner, _} <- Regs],
      Regs ++ Revs.
  %% Called when this node yields to a leader, which supplies its view of the
  %% global entries.
  %%
  %% The handling is the same whether or not we believed ourselves to be the
  %% leader (a leader conflict): merge the leader's globals via
  %% surrendered_1/1, clear `is_leader', and possibly re-initiate syncs.
  %% Globals originating from this node should be more correct in our own
  %% table than in the leader's.
  surrendered(State, {globals, LeaderGlobals}, Election) ->
      surrendered_1(LeaderGlobals),
      Follower = State#state{is_leader = false},
      {ok, maybe_reinitiate_sync(Follower, Election)}.
  %% A node has gone down.
  %%
  %% Pending sync requests are updated first, since they no longer need to
  %% wait for that node. Then every global registration whose pid lived on
  %% the dead node is selected as `{Key, Pid}' and run through
  %% process_globals/1; whatever that returns is broadcast, if non-empty.
  handle_DOWN(Node, State, Election) ->
      State1 = check_sync_requests(Node, State, Election),
      MatchHead = {{{'_',g,'_'},'_'},'$1','_'},
      OnDeadNode = [{'==', {node,'$1'}, Node}],
      KeyAndPid = [{{{element,1,{element,1,'$_'}}, {element,2,'$_'}}}],
      Orphans = ets:select(?TAB, [{MatchHead, OnDeadNode, KeyAndPid}]),
      case process_globals(Orphans) of
          [] ->
              {ok, State1};
          Broadcast ->
              {ok, Broadcast, State1}
      end.
  %% Entry point: walk all pending sync requests held in the state and strike
  %% `Node' from each of them.
  check_sync_requests(Node, #state{sync_requests = Pending} = State, Election) ->
      check_sync_requests(Pending, Node, State, Election).
  %% Scans the pending requests for the first one still waiting on `Node'.
  %% When one is found, remove_node_from_sync_request/5 takes over (and
  %% restarts the scan with the updated state); when none is left, the state
  %% is returned.
  check_sync_requests([{From, Waiting} | Rest], Node, State, Election) ->
      case lists:member(Node, Waiting) of
          false ->
              check_sync_requests(Rest, Node, State, Election);
          true ->
              remove_node_from_sync_request(Node, Waiting, From, State, Election)
      end;
  check_sync_requests([], _Node, State, _Election) ->
      State.
  %% Strikes `Node' from the request identified by `From'.
  %%
  %% If no node remains, the sync is complete and send_sync_complete/3
  %% produces the next state; otherwise the shortened node list is stored.
  %% Either way the scan is restarted from the top with the updated state -
  %% simpler than patching the list being traversed.
  remove_node_from_sync_request(Node, Ns, From, State, Election) ->
      NextState =
          case lists:delete(Node, Ns) of
              [] ->
                  send_sync_complete(From, State, Election);
              Remaining ->
                  Updated = lists:keyreplace(
                              From, 1, State#state.sync_requests,
                              {From, Remaining}),
                  State#state{sync_requests = Updated}
          end,
      check_sync_requests(Node, NextState, Election).
  %% Leader-side handling of the requests sent by the API functions.
  %%
  %% Each clause updates ?TAB on the leader and returns either
  %%   {reply, Reply, State}            - nothing to replicate, or
  %%   {reply, Reply, Broadcast, State} - where Broadcast is a list of
  %%       {insert, Objs} | {delete, Keys} | {notify, Events} operations
  %%       that from_leader/3 applies on the other nodes.
  %% Requests that cannot be honoured are answered with `badarg'.
  %%
  %% Changes compared to the previous version:
  %%   * `set' on an aggregated counter with a non-integer value (reachable
  %%     through set_value_shared/2) now replies `badarg' instead of crashing
  %%     the leader with `if_clause'.
  %%   * the `reset_counter' error handler takes the stacktrace from the
  %%     catch pattern; erlang:get_stacktrace/0 no longer exists in current
  %%     OTP releases, so the handler itself used to fail.

  %% reg / reg_other: insert a new registration. With Op == ensure an
  %% existing registration owned by the same pid is updated instead.
  handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E)
    when Reg==reg; Reg==reg_other ->
      case gproc_lib:insert_reg(K, Value, Pid, g) of
          false when Op == reg ->
              {reply, badarg, S};
          false when Op == ensure ->
              case ets:lookup(?TAB, ets_key(K, Pid)) of
                  [{_, Pid, _}] ->
                      gproc_lib:do_set_value(K, Value, Pid),
                      gproc_lib:insert_attr(K, As, Pid, g),
                      Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
                      {reply, updated, [{insert, Vals}], S};
                  _ ->
                      {reply, badarg, [], S}
              end;
          true ->
              _ = gproc_lib:ensure_monitor(Pid,g),
              _ = if As =/= [] ->
                          gproc_lib:insert_attr(K, As, Pid, g);
                     true -> []
                  end,
              Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
              {reply, regged_new(Op), [{insert, Vals}], S}
      end;
  %% monitor: attach a monitor to a registered name/aggregated counter, or,
  %% when the key is not registered, act according to the monitor type.
  handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E)
    when T==n; T==a ->
      case ets:lookup(?TAB, {K, T}) of
          [{_, Pid, _}] ->
              Opts = get_opts(Pid, K),
              Ref = make_ref(),
              Opts1 = gproc_lib:add_monitor(Opts, MPid, Ref, Type),
              _ = gproc_lib:ensure_monitor(MPid, g),
              Obj = {{Pid,K}, Opts1},
              ets:insert(?TAB, Obj),
              {reply, Ref, [{insert, [Obj]}], S};
          LookupRes ->
              Ref = make_ref(),
              case Type of
                  standby ->
                      %% Nobody holds the key: the standby takes it over
                      %% right away and is told so.
                      Event = {failover, MPid},
                      Msgs = insert_reg(LookupRes, K, undefined, MPid, Event),
                      Obj = {{K,T}, MPid, undefined},
                      Rev = {{MPid,K}, []},
                      ets:insert(?TAB, [Obj, Rev]),
                      MPid ! {gproc, {failover,MPid}, Ref, K},
                      {reply, Ref, [{insert, [Obj, Rev]},
                                    {notify, Msgs}], S};
                  follow ->
                      case LookupRes of
                          [{_, Waiters}] ->
                              add_follow_to_waiters(Waiters, K, MPid, Ref, S);
                          [] ->
                              add_follow_to_waiters([], K, MPid, Ref, S);
                          [{_, Pid, _}] ->
                              case ets:lookup(?TAB, {Pid,K}) of
                                  [{_, Opts}] when is_list(Opts) ->
                                      Opts1 = gproc_lib:add_monitor(
                                                Opts, MPid, Ref, follow),
                                      ets:insert(?TAB, {{Pid,K}, Opts1}),
                                      {reply, Ref,
                                       [{insert, [{{Pid,K}, Opts1}]}], S}
                              end
                      end;
                  _ ->
                      %% Plain monitor on an unregistered key: report
                      %% `unreg' immediately.
                      MPid ! {gproc, unreg, Ref, K},
                      {reply, Ref, S}
              end
      end;
  %% demonitor: drop the monitor {MPid, Ref} from a registration or from a
  %% waiters list, cleaning up the reverse entry when MPid has nothing left.
  handle_leader_call({demonitor, {T,g,_} = K, MPid, Ref}, _From, S, _E) ->
      case ets:lookup(?TAB, {K,T}) of
          [{_, Pid, _}] ->
              Opts = get_opts(Pid, K),
              Opts1 = gproc_lib:remove_monitors(Opts, MPid, Ref),
              Obj = {{Pid,K}, Opts1},
              ets:insert(?TAB, Obj),
              Del = case gproc_lib:does_pid_monitor(MPid, Opts1) of
                        true -> [];
                        false ->
                            ets:delete(?TAB, {MPid, K}),
                            [{delete, [{MPid, K}]}]
                    end,
              {reply, ok, Del ++ [{insert, [Obj]}], S};
          [{Key, Waiters}] ->
              case lists:filter(fun({P, R, _}) ->
                                        P =/= MPid orelse R =/= Ref
                                end, Waiters) of
                  [] ->
                      ets:delete(?TAB, {MPid, K}),
                      ets:delete(?TAB, Key),
                      {reply, ok, [{delete, [{MPid, K}, Key]}], S};
                  NewWaiters ->
                      ets:insert(?TAB, {Key, NewWaiters}),
                      Del = case lists:keymember(MPid, 1, NewWaiters) of
                                false ->
                                    ets:delete(?TAB, {MPid, K}),
                                    [{delete, [{MPid, K}]}];
                                true ->
                                    []
                            end,
                      {reply, ok, Del ++ [{insert, [{Key, NewWaiters}]}], S}
              end;
          _ ->
              {reply, ok, S}
      end;
  %% set_attributes
  handle_leader_call({set_attributes, {_,g,_} = K, Attrs, Pid}, _From, S, _E) ->
      case gproc_lib:insert_attr(K, Attrs, Pid, g) of
          false ->
              {reply, badarg, S};
          NewAttrs when is_list(NewAttrs) ->
              {reply, true, [{insert, [{{Pid,K}, NewAttrs}]}], S}
      end;
  %% reg_or_locate: reply with the current holder, or register P (spawning
  %% it on the caller's node first when P is a fun).
  handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P},
                     {FromPid, _}, S, _E) ->
      FromNode = node(FromPid),
      Reg = fun() ->
                    Pid = if is_function(P, 0) ->
                                  spawn(FromNode, P);
                             is_pid(P) ->
                                  P
                          end,
                    case gproc_lib:insert_reg(K, Value, Pid, g) of
                        true ->
                            _ = gproc_lib:ensure_monitor(Pid,g),
                            Vals = [{{K,n},Pid,Value}],
                            {reply, {Pid, Value}, [{insert, Vals}], S};
                        false ->
                            {reply, badarg, S}
                    end
            end,
      case ets:lookup(?TAB, {K, n}) of
          [] ->
              Reg();
          [{_, _Waiters}] ->
              Reg();
          [{_, OtherPid, OtherVal}] ->
              {reply, {OtherPid, OtherVal}, S}
      end;
  %% update_counter: any ets failure (e.g. missing entry) becomes `badarg'.
  handle_leader_call({update_counter, {T,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
    when is_integer(Incr), T==c;
         is_integer(Incr), T==n ->
      try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
           RealPid = case Pid of
                         n -> ets:lookup_element(?TAB, {Key,Pid}, 2);
                         shared -> shared;
                         P when is_pid(P) -> P
                     end,
           Vals = [{{Key,Pid},RealPid,New} | update_aggr_counter(Key, Incr)],
           {reply, New, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  %% update_counters (batch)
  handle_leader_call({update_counters, Cs}, _From, S, _E) ->
      try {Replies, Vals} = batch_update_counters(Cs),
           {reply, Replies, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  %% reset_counter: move the counter back to its `initial' option (0 when
  %% there is none) and reply {OldValue, NewValue}.
  handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
      try Current = ets:lookup_element(?TAB, {Key, Pid}, 3),
           Initial = case ets:lookup_element(?TAB, {Pid, Key}, 2) of
                         r -> 0;
                         Opts when is_list(Opts) ->
                             proplists:get_value(initial, Opts, 0)
                     end,
           Incr = Initial - Current,
           New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
           Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
           {reply, {Current, New}, [{insert, Vals}], S}
      catch
          error:Reason:Stacktrace ->
              io:fwrite("reset_counter failed: ~p~n~p~n", [Reason, Stacktrace]),
              {reply, badarg, S}
      end;
  %% unreg / unreg_other
  handle_leader_call({Unreg, {T,g,Name} = K, Pid}, _From, S, _E)
    when Unreg==unreg;
         Unreg==unreg_other ->
      Key = if T == n; T == a; T == rc -> {K,T};
               true -> {K, Pid}
            end,
      case ets:member(?TAB, Key) of
          true ->
              _ = gproc_lib:remove_reg(K, Pid, unreg),
              if T == c ->
                      case ets:lookup(?TAB, {{a,g,Name},a}) of
                          [Aggr] ->
                              %% updated by remove_reg/3
                              {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
                                             {insert, [Aggr]}], S};
                          [] ->
                              {reply, true, [{delete, [{K,Pid}, {Pid,K}]}], S}
                      end;
                 T == r ->
                      case ets:lookup(?TAB, {{rc,g,Name},rc}) of
                          [RC] ->
                              {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
                                             {insert, [RC]}], S};
                          [] ->
                              {reply, true, [{delete, [{K,Pid}, {Pid, K}]}], S}
                      end;
                 true ->
                      {reply, true, [{notify, [{K, Pid, unreg}]},
                                     {delete, [{K, Pid}, {Pid,K}]}], S}
              end;
          false ->
              {reply, badarg, S}
      end;
  %% give_away: only the current holder (Pid) may hand the key over.
  handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
    when T == a; T == n; T == rc ->
      Key = {K, T},
      case ets:lookup(?TAB, Key) of
          [{_, Pid, Value}] ->
              Opts = get_opts(Pid, K),
              case pid_to_give_away_to(To) of
                  Pid ->
                      {reply, Pid, S};
                  ToPid when is_pid(ToPid) ->
                      ets:insert(?TAB, [{Key, ToPid, Value},
                                        {{ToPid,K}, Opts}]),
                      _ = gproc_lib:ensure_monitor(ToPid, g),
                      Rev = {Pid, K},
                      ets:delete(?TAB, Rev),
                      gproc_lib:notify({migrated, ToPid}, K, Opts),
                      {reply, ToPid, [{insert, [{Key, ToPid, Value}]},
                                      {notify, [{K, Pid, {migrated, ToPid}}]},
                                      {delete, [{K, Pid}, Rev]}], S};
                  undefined ->
                      ets:delete(?TAB, Key),
                      Rev = {Pid, K},
                      ets:delete(?TAB, Rev),
                      gproc_lib:notify(unreg, K, Opts),
                      {reply, undefined, [{notify, [{K, Pid, unreg}]},
                                          {delete, [{K, Pid}, Rev]}], S}
              end;
          _ ->
              {reply, badarg, S}
      end;
  %% mreg: only classes p, n and r can be registered in bulk.
  handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
      if T==p; T==n; T==r ->
              try gproc_lib:insert_many(T, g, L, Pid) of
                  {true,Objs} -> {reply, true, [{insert,Objs}], S};
                  false       -> {reply, badarg, S}
              catch
                  error:_     -> {reply, badarg, S}
              end;
         true -> {reply, badarg, S}
      end;
  %% munreg
  handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
      try gproc_lib:remove_many(T, g, L, Pid) of
          [] ->
              {reply, true, S};
          Objs ->
              {reply, true, [{delete, Objs}], S}
      catch
          error:_ -> {reply, badarg, S}
      end;
  %% set: aggregated counters and counters get special treatment; every
  %% other class goes through gproc_lib:do_set_value/3.
  handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
      if T == a ->
              if is_integer(V) ->
                      case gproc_lib:do_set_value(K, V, Pid) of
                          true  -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
                          false -> {reply, badarg, S}
                      end;
                 true ->
                      %% Non-integer value for an aggregated counter.
                      {reply, badarg, S}
              end;
         T == c ->
              try gproc_lib:do_set_counter_value(K, V, Pid),
                   AKey = {{a,g,N},a},
                   Aggr = ets:lookup(?TAB, AKey),  % may be []
                   {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
              catch
                  error:_ ->
                      {reply, badarg, S}
              end;
         true ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true ->
                      Obj = if T==n -> {{K, T}, Pid, V};
                               true -> {{K, Pid}, Pid, V}
                            end,
                      {reply, true, [{insert,[Obj]}], S};
                  false ->
                      {reply, badarg, S}
              end
      end;
  %% await
  handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
      %% The pid in From is that of the gen_leader instance that forwarded
      %% the call - not of the client. This is why Pid is passed explicitly.
      case gproc_lib:await(Key, Pid, From) of
          {reply, {Ref, {K, P, V}}} ->
              {reply, {Ref, {K, P, V}}, S};
          {reply, Reply, Insert} ->
              {reply, Reply, [{insert, Insert}], S}
      end;
  handle_leader_call(_, _, S, _E) ->
      {reply, badarg, S}.
  %% Leader-side handling of asynchronous requests.
  %%
  %%   {initiate_sync, Ref}      - start a sync round for request Ref.
  %%   {sync_reply, Node, Ref}   - Node has answered sync round Ref.
  %%   {add_globals, Missing}    - audit: a peer knew of entries we lacked.
  %%   {remove_globals, Globals} - delete the given entries locally.
  %%   {cancel_wait, Pid, Key, Ref}
  %%   {cancel_wait_or_monitor, Pid, Key}
  %%   {pid_is_DOWN, Pid}        - clean up after a dead process.
  %%
  %% Change compared to the previous version: `cancel_wait_or_monitor' for a
  %% key that has neither waiters nor a registration is now a no-op, matching
  %% `cancel_wait', instead of crashing the leader with `case_clause'.
  handle_leader_cast({initiate_sync, Ref}, S, E) ->
      case gen_leader:alive(E) -- [node()] of
          [] ->
              %% No other live node to wait for: the sync is complete already.
              {noreply, send_sync_complete(Ref, S, E)};
          Alive ->
              gen_leader:broadcast({from_leader, {sync, Ref}}, Alive, E),
              {noreply, S#state{sync_requests =
                                    [{Ref, Alive}|S#state.sync_requests]}}
      end;
  handle_leader_cast({sync_reply, Node, Ref}, S, E) ->
      #state{sync_requests = SReqs} = S,
      case lists:keyfind(Ref, 1, SReqs) of
          false ->
              %% This should never happen, except perhaps if the leader who
              %% received the sync request died, and the new leader gets the
              %% sync reply. In that case, we trust that the client has been
              %% notified anyway, and ignore the message.
              {ok, S};
          {_, Ns} ->
              case lists:delete(Node, Ns) of
                  [] ->
                      {ok, send_sync_complete(Ref, S, E)};
                  Ns1 ->
                      SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
                      {ok, S#state{sync_requests = SReqs1}}
              end
      end;
  handle_leader_cast({add_globals, Missing}, S, _E) ->
      %% This is an audit message: a peer (non-leader) had info about granted
      %% global resources that we didn't know of when we became leader.
      %% This could happen due to a race condition when the old leader died.
      Update = insert_globals(Missing),
      {ok, [{insert, Update}], S};
  handle_leader_cast({remove_globals, Globals}, S, _E) ->
      delete_globals(Globals),
      {ok, S};
  handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, Ref, Waiters),
              {ok, Ops, S};
          _ ->
              {ok, [], S}
      end;
  handle_leader_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S, _E) ->
      case ets:lookup(?TAB, {Key, T}) of
          [{_, Waiters}] ->
              Ops = gproc_lib:remove_wait(Key, Pid, all, Waiters),
              {ok, Ops, S};
          [{_, OtherPid, _}] ->
              Ops = gproc_lib:remove_monitors(Key, OtherPid, Pid),
              {ok, Ops, S};
          [] ->
              %% Nothing registered and nobody waiting: nothing to cancel.
              {ok, [], S}
      end;
  handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
      %% All global keys that have a reverse entry for Pid, as {Key, Pid}.
      Globals = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
                                   [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
      ets:delete(?TAB, {Pid,g}),
      case process_globals(Globals) of
          [] ->
              {ok, S};
          Broadcast ->
              {ok, Broadcast, S}
      end.
  %% Builds the list of ets objects to broadcast after `{Key, Pid, Value}'
  %% registrations have been inserted on the leader.
  %%
  %% Per key class:
  %%   a, rc - the stored object for the key plus the reverse entry;
  %%   c     - the counter object, the aggregated counter of the same name
  %%           (if any) and the reverse entry;
  %%   r     - the resource object, the resource counter of the same name
  %%           (if any) and the reverse entry;
  %%   n     - the name object and the reverse entry;
  %%   other - the `{Key, Pid}'-keyed object and the reverse entry.
  mk_broadcast_insert_vals(Objs) ->
      Expand =
          fun({{Class, g, Name} = Key, Pid, Value}) ->
                  Rev = {Pid, Key},
                  case Class of
                      a ->
                          ets:lookup(?TAB, {Key, Class}) ++ ets:lookup(?TAB, Rev);
                      rc ->
                          ets:lookup(?TAB, {Key, Class}) ++ ets:lookup(?TAB, Rev);
                      c ->
                          [{{Key, Pid}, Pid, Value}
                           | ets:lookup(?TAB, {{a, g, Name}, a})]
                              ++ ets:lookup(?TAB, Rev);
                      r ->
                          [{{Key, Pid}, Pid, Value}
                           | ets:lookup(?TAB, {{rc, g, Name}, rc})]
                              ++ ets:lookup(?TAB, Rev);
                      n ->
                          [{{Key, n}, Pid, Value} | ets:lookup(?TAB, Rev)];
                      _ ->
                          [{{Key, Pid}, Pid, Value} | ets:lookup(?TAB, Rev)]
                  end
          end,
      lists:flatmap(Expand, Objs).
  %% Cleans up the global entries `[{Key, Pid}]' of processes that are gone
  %% and returns the broadcast operations describing what changed.
  %%
  %% Keys of class n, a and rc go through maybe_failover/4 when their reverse
  %% entry carries an option list. For the other classes the dependent
  %% aggregate is adjusted first (aggregated counter for `c', resource count
  %% for `r') and the entry is then removed.
  %%
  %% The result holds, in this order, an `insert', a `notify' and a `delete'
  %% operation - each only if its payload is non-empty.
  process_globals(Globals) ->
      Step =
          fun({{Class,_,_} = Key, Pid}, Acc)
                when Class==n; Class==a; Class==rc ->
                  case ets:lookup(?TAB, {Pid,Key}) of
                      [{_, Opts}] when is_list(Opts) ->
                          maybe_failover(Key, Pid, Opts, Acc);
                      _ ->
                          Acc
                  end;
             ({{Class,_,_} = Key, Pid}, {ModAcc, NotifyAcc}) ->
                  ModAcc1 =
                      case Class of
                          c ->
                              Count = ets:lookup_element(?TAB, {Key,Pid}, 3),
                              update_aggr_counter(Key, -Count) ++ ModAcc;
                          r ->
                              decrement_resource_count(Key, []) ++ ModAcc;
                          _ ->
                              ModAcc
                      end,
                  Events = remove_entry(Key, Pid, unreg),
                  {ModAcc1, Events ++ NotifyAcc}
          end,
      {Modified, Notifications} = lists:foldl(Step, {[], []}, Globals),
      Candidates = [{insert, Modified},
                    {notify, Notifications},
                    {delete, Globals}],
      %% keep only the operations that actually carry something
      [Op || {_, [_|_]} = Op <- Candidates].
  %% Handles the loss of the process holding a unique key.
  %%
  %% If a standby on a live node is registered in the options, the key moves
  %% to that standby (keeping its value), and both new objects are added to
  %% the `modified' accumulator. Otherwise the entry is removed. Notification
  %% events produced along the way are prepended to the second accumulator.
  maybe_failover({Class,_,_} = Key, Pid, Opts, {ModAcc, NotifyAcc}) ->
      %% Opts is already bound: this asserts it equals the stored options.
      Opts = get_opts(Pid, Key),
      Standbys = gproc_lib:standbys(Opts),
      case filter_standbys(Standbys) of
          [{Successor, Ref, _} | _] ->
              Value = case ets:lookup(?TAB, {Key,Class}) of
                          [{_, _, Current}] -> Current;
                          _ -> undefined
                      end,
              Events = remove_rev_entry(Opts, Pid, Key, {failover, Successor}),
              Opts1 = gproc_lib:remove_monitor(Opts, Successor, Ref),
              _ = gproc_lib:ensure_monitor(Successor, g),
              Reg = {{Key,Class}, Successor, Value},
              Rev = {{Successor, Key}, Opts1},
              ets:insert(?TAB, [Reg, Rev]),
              {[Reg, Rev | ModAcc], Events ++ NotifyAcc};
          [] ->
              Events = remove_entry(Key, Pid, unreg),
              {ModAcc, Events ++ NotifyAcc}
      end.
  %% Drops leading standbys whose node is not among the known live nodes
  %% (this node plus all connected ones). Note that only the head of the
  %% list is cleaned: everything after the first live standby is returned
  %% untouched.
  filter_standbys(SBs) ->
      LiveNodes = [node() | nodes()],
      filter_standbys(SBs, LiveNodes).

  %% Same as filter_standbys/1, against an explicit list of live nodes.
  filter_standbys(SBs, Nodes) ->
      OnDeadNode = fun({Pid, _, _}) ->
                           not lists:member(node(Pid), Nodes)
                   end,
      lists:dropwhile(OnDeadNode, SBs).
  %% Removes the registration of `Key' for `Pid' from ?TAB and returns the
  %% notification events that result (possibly none).
  %%
  %%   - stored object owned by Pid, or Pid is an atom such as `shared':
  %%     delete it and clean up the reverse entry, notifying with `Event';
  %%   - stored object owned by some other process: only Pid's reverse entry
  %%     is deleted;
  %%   - a waiters entry: deleted;
  %%   - nothing stored: nothing to do.
  remove_entry(Key, Pid, Event) ->
      TabKey = ets_key(Key, Pid),
      case ets:lookup(?TAB, TabKey) of
          [] ->
              [];
          [{_, Owner, _}] when is_pid(Owner), Owner =:= Pid; is_atom(Pid) ->
              ets:delete(?TAB, TabKey),
              remove_rev_entry(get_opts(Pid, Key), Pid, Key, Event);
          [{_, _SomeoneElse, _}] ->
              ets:delete(?TAB, {Pid, Key}),
              [];
          [{_, _Waiters}] ->
              ets:delete(?TAB, TabKey),
              []
      end.
  %% Deletes the reverse entry `{Pid, Key}'.
  %%
  %% For names and aggregated counters the monitors found in `Opts' are
  %% notified of `Event' first, and `[{Key, Pid, Event}]' is returned.
  %% For every other class nothing is notified and [] is returned.
  remove_rev_entry(Opts, Pid, {Class,g,_} = Key, Event)
    when Class =:= n; Class =:= a ->
      gproc_lib:notify(Event, Key, Opts),
      ets:delete(?TAB, {Pid, Key}),
      [{Key, Pid, Event}];
  remove_rev_entry(_Opts, Pid, Key, _Event) ->
      ets:delete(?TAB, {Pid, Key}),
      [].
  %% Returns what is stored in the reverse entry `{Pid, Key}'.
  %% A missing entry, or one holding just the marker `r', yields [].
  get_opts(Pid, Key) ->
      case ets:lookup(?TAB, {Pid, Key}) of
          [{_, Stored}] when Stored =/= r ->
              Stored;
          [{_, r}] ->
              [];
          [] ->
              []
      end.
  %% Code upgrade callback: the state is kept as it is.
  code_change(_OldVsn, State, _Extra, _Election) ->
      {ok, State}.
  %% Termination callback: nothing to clean up.
  terminate(_Why, _State) ->
      ok.
  %% Messages broadcast by the leader.
  %%
  %%   {sync, Ref}          - acknowledge with a sync_reply cast to the leader.
  %%   {sync_complete, Ref} - when Ref is `{Pid, _}' with Pid on this node,
  %%                          answer the local sync client; otherwise just
  %%                          make sure the request is not left in our state.
  %%   Ops                  - a list of {delete, _} | {insert, _} |
  %%                          {notify, _} operations, applied in order.
  from_leader({sync, Ref}, State, _Election) ->
      gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
      {ok, State};
  from_leader({sync_complete, {From, _} = Ref}, State, _Election)
    when node(From) == node() ->
      {ok, reply_to_sync_client(Ref, State)};
  from_leader({sync_complete, Ref}, State, _Election) ->
      %% we shouldn't have to, but ensure that we don't have
      %% the sync request in our state.
      Pending = lists:keydelete(Ref, 1, State#state.sync_requests),
      {ok, State#state{sync_requests = Pending}};
  from_leader(Ops, State, _Election) ->
      Apply = fun({delete, Globals}) ->
                      delete_globals(Globals);
                 ({insert, Globals}) ->
                      _ = insert_globals(Globals);
                 ({notify, Events}) ->
                      do_notify(Events)
              end,
      lists:foreach(Apply, Ops),
      {ok, State}.
  %% Writes the objects in Globals into the table.
  %%
  %%  * {{Key, _}, Pid, _} with a 3-tuple Key: the object is stored, an
  %%    empty reverse entry {{Pid, Key}, []} is created unless one already
  %%    exists, and gproc_lib:ensure_monitor/2 is called for Pid.
  %%  * {{Key, _}, _} with a 3-tuple Key: stored as-is.
  %%  * {{Pid, _}, Opts} with a pid and an option list: stored,
  %%    gproc_lib:ensure_monitor/2 is called for Pid, and the object is
  %%    prepended to the result.
  %%  * anything else is ignored.
  %%
  %% The fold is seeded with Globals itself, so the result is Globals with
  %% the reverse entries of the third kind prepended once more.
  insert_globals(Globals) ->
      Store =
          fun({{{_, _, _} = Key, _}, Pid, _} = Reg, Acc) ->
                  ets:insert(?TAB, Reg),
                  ets:insert_new(?TAB, {{Pid, Key}, []}),
                  gproc_lib:ensure_monitor(Pid, g),
                  Acc;
             ({{{_, _, _}, _}, _} = Entry, Acc) ->
                  ets:insert(?TAB, Entry),
                  Acc;
             ({{Pid, _Key}, Opts} = Rev, Acc) when is_pid(Pid), is_list(Opts) ->
                  ets:insert(?TAB, Rev),
                  gproc_lib:ensure_monitor(Pid, g),
                  [Rev | Acc];
             (_Ignored, Acc) ->
                  Acc
          end,
      lists:foldl(Store, Globals, Globals).
  %% Removes every item of Globals from the table. Accepted item shapes:
  %%
  %%  * {Key, Owner}: global key, Owner an atom or a pid; removed via
  %%    remove_entry/3.
  %%  * {{Key, Tag}, Pid}: global key, Pid a pid, Tag an atom or a pid;
  %%    removed via remove_entry/3.
  %%  * {Pid, Key}: Pid a pid or the atom shared; the object with exactly
  %%    that table key is deleted.
  %%
  %% Any other shape raises function_clause.
  delete_globals(Globals) ->
      Delete =
          fun({{_, g, _} = Key, Owner})
                when is_atom(Owner) orelse is_pid(Owner) ->
                  remove_entry(Key, Owner, []);
             ({{{_, g, _} = Key, Tag}, Pid})
                when is_pid(Pid) andalso (is_atom(Tag) orelse is_pid(Tag)) ->
                  remove_entry(Key, Pid, []);
             ({Pid, Key}) when is_pid(Pid) orelse Pid =:= shared ->
                  ets:delete(?TAB, {Pid, Key})
          end,
      lists:foreach(Delete, Globals).
  %% Delivers the notifications in the given list, in order.
  %%
  %%  * {Pid, Msg}: Msg is sent to Pid if Pid is on this node; entries
  %%    for pids on other nodes are skipped.
  %%  * {Key, Pid, Event}: if the reverse entry {Pid, Key} holds an option
  %%    list, gproc_lib:notify/3 is called with it; otherwise the entry is
  %%    skipped.
  %%
  %% Returns ok once the whole list has been processed. Processing always
  %% continues with the tail: previously a successful gproc_lib:notify/3
  %% call ended the traversal and the remaining events were dropped.
  do_notify([{P, Msg}|T]) when is_pid(P), node(P) =:= node() ->
      P ! Msg,
      do_notify(T);
  do_notify([{P, _Msg}|T]) when is_pid(P) ->
      do_notify(T);
  do_notify([{K, P, E}|T]) ->
      case ets:lookup(?TAB, {P,K}) of
          [{_, Opts}] when is_list(Opts) ->
              gproc_lib:notify(E, K, Opts);
          _ ->
              ok
      end,
      do_notify(T);
  do_notify([]) ->
      ok.
  %% Builds the table key for a registration.
  %% Keys of type n, a and rc are keyed on {Key, Type}; every other key
  %% is keyed on {Key, Pid}.
  ets_key({n, _, _} = Key, _Pid) ->
      {Key, n};
  ets_key({a, _, _} = Key, _Pid) ->
      {Key, a};
  ets_key({rc, _, _} = Key, _Pid) ->
      {Key, rc};
  ets_key(Key, Pid) ->
      {Key, Pid}.
  %% Sends Req to the leader through gen_leader:leader_call/2 and returns
  %% the reply. A reply of badarg is turned into a gproc badarg error via
  %% ?THROW_GPROC_ERROR.
  leader_call(Req) ->
      Reply = gen_leader:leader_call(?MODULE, Req),
      if
          Reply =:= badarg ->
              ?THROW_GPROC_ERROR(badarg);
          true ->
              Reply
      end.
  863. %% leader_call(Req, Timeout) ->
  864. %% case gen_leader:leader_call(?MODULE, Req, Timeout) of
  865. %% badarg -> ?THROW_GPROC_ERROR(badarg);
  866. %% Reply -> Reply
  867. %% end.
  %% Forwards Msg to the leader via gen_leader:leader_cast/2, using this
  %% module's name as the server reference.
  leader_cast(Message) ->
      gen_leader:leader_cast(?MODULE, Message).
  %% Initialises the server state. The only option read here is
  %% always_broadcast; when absent, the record's default value is used.
  init(Opts) ->
      Default = (#state{})#state.always_broadcast,
      Bcast = proplists:get_value(always_broadcast, Opts, Default),
      {ok, #state{always_broadcast = Bcast}}.
  %% Reconciles the local table with Globs, the leader's list of global
  %% objects (presumably called after surrendering to a new leader;
  %% confirm against the caller).
  %%
  %% Steps, in order:
  %%  1. Collect the global registrations owned by pids on this node and
  %%     make sure each owner is monitored.
  %%  2. Delete all global registrations and reverse entries whose pid is
  %%     on another node.
  %%  3. Walk Globs: store remote registrations (with monitor and an empty
  %%     reverse entry if none exists), store two-element entries as-is,
  %%     and collect the leader's registrations for pids on this node.
  %%  4. Registrations we hold but the leader lacks are cast to the leader
  %%     as {add_globals, _}; registrations the leader holds for our pids
  %%     that we lack are cast as {remove_globals, _}.
  surrendered_1(Globs) ->
      ThisNode = node(),
      LocalRegs = [{{{{'_',g,'_'},'_'}, '$1', '$2'},
                   [{'==', {node,'$1'}, ThisNode}],
                   [{{ {element,1,'$_'}, '$1', '$2' }}]}],
      MyGlobs = ets:select(?TAB, LocalRegs),
      _ = [gproc_lib:ensure_monitor(Owner, g) || {_, Owner, _} <- MyGlobs],
      ?event({'My_local_globs', MyGlobs}),
      %% remove all remote globals.
      RemoteRegs = {{{{'_',g,'_'},'_'}, '$1', '_'},
                    [{'=/=', {node,'$1'}, ThisNode}],
                    [true]},
      RemoteRevs = {{{'$1',{'_',g,'_'}}, '_'},
                    [{'=/=', {node,'$1'}, ThisNode}],
                    [true]},
      ets:select_delete(?TAB, [RemoteRegs, RemoteRevs]),
      %% insert new non-local globals, collect the leader's version of
      %% what my globals are
      Adopt =
          fun({{Key, _} = ObjKey, Pid, Value}, Acc) when node(Pid) =/= ThisNode ->
                  ets:insert(?TAB, {ObjKey, Pid, Value}),
                  _ = gproc_lib:ensure_monitor(Pid, g),
                  ets:insert_new(?TAB, {{Pid, Key}, []}),
                  Acc;
             ({{_Pid, _} = EntryKey, Opts}, Acc) ->
                  ets:insert(?TAB, {EntryKey, Opts}),
                  Acc;
             ({_, Pid, _} = Obj, Acc) when node(Pid) == ThisNode ->
                  [Obj | Acc]
          end,
      LeaderView = lists:foldl(Adopt, [], Globs),
      ?event({'Ldr_local_globs', LeaderView}),
      Missing = [{K, P, V} || {{K, _} = ObjKey, P, V} <- MyGlobs,
                              is_pid(P) andalso
                                  not lists:keymember(ObjKey, 1, LeaderView)],
      case Missing of
          [_|_] ->
              %% This is very unlikely, I think
              ?event({'Missing', Missing}),
              leader_cast({add_globals, mk_broadcast_insert_vals(Missing)});
          [] ->
              %% phew! We have the same picture
              ok
      end,
      Stale = [{K, P} || {{K, _} = ObjKey, P, _} <- LeaderView,
                         is_pid(P) andalso
                             not lists:keymember(ObjKey, 1, MyGlobs)],
      case Stale of
          [_|_] ->
              ?event({'Remove', Stale}),
              leader_cast({remove_globals, Stale});
          [] ->
              ok
      end.
  %% Applies a list of {Key, Pid, Incr} updates to global counters.
  %% Returns {Returns, Updates}: Returns holds one {Key, Pid, NewValue}
  %% per input item, in input order; Updates holds the changed table
  %% objects, with at most one object per {key, pid} pair (see
  %% add_object/2).
  batch_update_counters(Cs) ->
      batch_update_counters(Cs, [], []).

  %% Worker for batch_update_counters/1; Returns is accumulated in
  %% reverse and flipped at the end.
  batch_update_counters([], Returns, Updates) ->
      {lists:reverse(Returns), Updates};
  batch_update_counters([{{c,g,_} = Key, Pid, Incr} | Rest], Returns, Updates) ->
      {Value, NewUpdates} =
          case update_counter_g(Key, Incr, Pid) of
              [{_, _, V} = Ctr] ->
                  {V, add_object(Ctr, Updates)};
              [{_, _, _} = Aggr, {_, _, V} = Ctr] ->
                  {V, add_object(Aggr, add_object(Ctr, Updates))}
          end,
      batch_update_counters(Rest, [{Key, Pid, Value} | Returns], NewUpdates).
  %% Inserts Obj into a list of objects, keeping at most one object per
  %% {first element, second element} pair: the first existing 3-tuple
  %% with the same first two elements is replaced in place; otherwise
  %% Obj is appended at the end.
  add_object(Obj, Objs) ->
      Differs = fun(Other) ->
                        case {Obj, Other} of
                            {{K, P, _}, {K, P, _}} -> false;
                            _ -> true
                        end
                end,
      case lists:splitwith(Differs, Objs) of
          {Before, [_Replaced | After]} ->
              Before ++ [Obj | After];
          {Before, []} ->
              Before ++ [Obj]
      end.
  %% Updates the global counter {Key, Pid} and, through
  %% update_aggr_counter/3, the matching aggregated counter if there is
  %% one. Returns the list of changed objects.
  %%
  %% The update is one of:
  %%  * an integer increment;
  %%  * {Incr, Threshold, SetValue}, passed to ets:update_counter/3 as
  %%    {3, Incr, Threshold, SetValue};
  %%  * a list of such operations (see expand_ops/1).
  %%
  %% Anything else raises a gproc badarg error.
  %%
  %% In the tuple and list forms a leading {3, 0} operation reads the
  %% counter before the update so the net change can be computed.
  update_counter_g({c,g,_} = Key, Incr, Pid) when is_integer(Incr) ->
      NewValue = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
      Changed = {{Key, Pid}, Pid, NewValue},
      update_aggr_counter(Key, Incr, [Changed]);
  update_counter_g({c,g,_} = Key, {Incr, Threshold, SetValue}, Pid)
    when is_integer(Incr), is_integer(Threshold), is_integer(SetValue) ->
      EtsOps = [{3, 0}, {3, Incr, Threshold, SetValue}],
      [Before, After] = ets:update_counter(?TAB, {Key, Pid}, EtsOps),
      Changed = {{Key, Pid}, Pid, After},
      update_aggr_counter(Key, After - Before, [Changed]);
  update_counter_g({c,g,_} = Key, Ops, Pid) when is_list(Ops) ->
      EtsOps = [{3, 0} | expand_ops(Ops)],
      case ets:update_counter(?TAB, {Key, Pid}, EtsOps) of
          [_] ->
              %% Ops was empty: nothing changed.
              [];
          [Before | Values] ->
              After = lists:last(Values),
              %% NOTE(review): unlike the clauses above, this records
              %% {Key, Pid, Values} (bare key, list of intermediate
              %% values) rather than {{Key, Pid}, Pid, Value}; confirm
              %% that callers expect this shape.
              update_aggr_counter(Key, After - Before, [{Key, Pid, Values}])
      end;
  update_counter_g(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Translates a list of counter operations into the form expected by
  %% ets:update_counter/3, targeting element 3 of the object:
  %%   Incr                 -> {3, Incr}
  %%   {Incr, Thr, SetV}    -> {3, Incr, Thr, SetV}
  %% A non-list argument or an element of any other shape raises a gproc
  %% badarg error.
  expand_ops(Ops) ->
      case Ops of
          [] ->
              [];
          [{Incr, Thr, SetV} | Rest]
            when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
              [{3, Incr, Thr, SetV} | expand_ops(Rest)];
          [Incr | Rest] when is_integer(Incr) ->
              [{3, Incr} | expand_ops(Rest)];
          _ ->
              ?THROW_GPROC_ERROR(badarg)
      end.
  %% Two-argument form of update_aggr_counter/3 starting from an empty
  %% accumulator. Keys of type n have no aggregated counter and yield []
  %% immediately.
  update_aggr_counter(Key, Incr) ->
      case Key of
          {n, _, _} ->
              [];
          _ ->
              update_aggr_counter(Key, Incr, [])
      end.
  %% Adds Incr to the aggregated counter {a, g, Ctr} that corresponds to
  %% the global counter {c, g, Ctr}, if such an aggregated counter is
  %% registered. The updated object is written back and prepended to Acc;
  %% when there is no aggregated counter, Acc is returned unchanged.
  update_aggr_counter({c,g,Ctr}, Incr, Acc) ->
      AggrKey = {{a,g,Ctr},a},
      case ets:lookup(?TAB, AggrKey) of
          [{TabKey, Owner, Current}] ->
              Updated = {TabKey, Owner, Current + Incr},
              ets:insert(?TAB, Updated),
              [Updated | Acc];
          [] ->
              Acc
      end.
  %% If a resource counter {rc, g, Rsrc} exists for the global resource
  %% {r, g, Rsrc}, decrements it through gproc_lib and prepends the
  %% counter object, as read back from the table afterwards, to Acc.
  %% Without such a counter, Acc is returned unchanged.
  decrement_resource_count({r,g,Rsrc}, Acc) ->
      CounterKey = {{rc,g,Rsrc},rc},
      case ets:member(?TAB, CounterKey) of
          true ->
              %% Call the lib function, which might trigger events
              gproc_lib:decrement_resource_count(g, Rsrc),
              Current = ets:lookup(?TAB, CounterKey),
              Current ++ Acc;
          false ->
              Acc
      end.
  %% Resolves the target of a give-away to a pid.
  %% A pid is returned as-is. A global key of type n or a is looked up in
  %% the table and the pid registered under it is returned, or undefined
  %% when there is no such registration.
  pid_to_give_away_to(Target) when is_pid(Target) ->
      Target;
  pid_to_give_away_to({Type,g,_} = Key) when Type==n; Type==a ->
      Found = ets:lookup(?TAB, {Key, Type}),
      case Found of
          [{_, Owner, _}] ->
              Owner;
          _ ->
              undefined
      end.
  %% Registers K with value Val for Pid as a global entry (through
  %% gproc_lib:insert_reg/4) and then informs any waiters.
  %%
  %% The first argument is the result of a table lookup for the key:
  %% either [] or a single {_, Waiters} object. Returns whatever
  %% tell_waiters/5 returns for those waiters.
  %%
  %% The empty-lookup clause used to pass Val and Pid to tell_waiters/5
  %% in swapped order; that was harmless only because the waiter list is
  %% empty there. The order now matches the other clause.
  insert_reg([{_, Waiters}], K, Val, Pid, Event) ->
      gproc_lib:insert_reg(K, Val, Pid, g),
      tell_waiters(Waiters, K, Pid, Val, Event);
  insert_reg([], K, Val, Pid, Event) ->
      gproc_lib:insert_reg(K, Val, Pid, g),
      tell_waiters([], K, Pid, Val, Event).
  %% Informs the processes waiting for key K that it is now registered
  %% to Pid with value V.
  %%
  %% Waiters come in two shapes:
  %%  * {P, R}          receives {gproc, R, registered, {K, Pid, V}}
  %%  * {P, R, follow}  receives {gproc, Event, R, K}
  %%
  %% Waiters on this node are sent their message directly. For waiters on
  %% other nodes a {P, Msg} pair is collected instead, and the list of
  %% those pairs is returned, in waiter order.
  %%
  %% The whole list is always traversed: previously a local waiter ended
  %% the traversal, so later waiters were never told and the return value
  %% was the sent message instead of a list.
  tell_waiters([{P,R}|T], K, Pid, V, Event) ->
      Msg = {gproc, R, registered, {K, Pid, V}},
      if node(P) == node() ->
              P ! Msg,
              tell_waiters(T, K, Pid, V, Event);
         true ->
              [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
      end;
  tell_waiters([{P,R,follow}|T], K, Pid, V, Event) ->
      Msg = {gproc, Event, R, K},
      if node(P) == node() ->
              P ! Msg,
              tell_waiters(T, K, Pid, V, Event);
         true ->
              [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
      end;
  tell_waiters([], _, _, _, _) ->
      [].
  %% Adds {Pid, Ref, follow} at the head of the waiter list for key K,
  %% stores the new waiters object, and makes sure Pid has a reverse
  %% entry for K.
  %%
  %% Pid is also told {gproc, unreg, Ref, K}: directly when it is on this
  %% node, otherwise through a {notify, _} operation added to the reply.
  %% Returns {reply, Ref, Ops, S}, where Ops always starts with the
  %% {insert, _} operation for the two objects written here.
  add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) ->
      WaitObj = {{K,T}, [{Pid, Ref, follow} | Waiters]},
      ets:insert(?TAB, WaitObj),
      RevObj = ensure_rev({Pid, K}),
      Insert = {insert, [WaitObj, RevObj]},
      Msg = {gproc, unreg, Ref, K},
      Ops = if node(Pid) =:= node() ->
                    Pid ! Msg,
                    [Insert];
               true ->
                    [Insert, {notify, [{Pid, Msg}]}]
            end,
      {reply, Ref, Ops, S}.
  %% Returns the object stored under table key K, first creating it as
  %% {K, []} if it does not exist yet.
  ensure_rev(K) ->
      case ets:lookup(?TAB, K) of
          [] ->
              Fresh = {K, []},
              ets:insert(?TAB, Fresh),
              Fresh;
          [Existing] ->
              Existing
      end.
  %% Maps the operation atom to the value returned for a freshly created
  %% registration: new for ensure, true for reg.
  regged_new(ensure) -> new;
  regged_new(reg)    -> true.
  %% Starts a sync round on behalf of the client From and returns the
  %% (possibly updated) state.
  %%
  %% On the leader: when no other node is alive, From is answered true
  %% right away; otherwise {sync, From} is broadcast to the other live
  %% nodes and {From, Nodes} is recorded in sync_requests.
  %% On any other node the request is passed on to the leader.
  initiate_sync(From, #state{is_leader = true} = S, E) ->
      Others = lists:delete(node(), gen_leader:alive(E)),
      case Others of
          [] ->
              %% I'm alone - sync is trivial
              gen_server:reply(From, true),
              S;
          _ ->
              gen_leader:broadcast({from_leader, {sync, From}}, Others, E),
              Pending = [{From, Others} | S#state.sync_requests],
              S#state{sync_requests = Pending}
      end;
  initiate_sync(From, S, _E) ->
      leader_cast({initiate_sync, From}),
      S.
  %% Calls initiate_sync/3 once for every client listed in sync_clients,
  %% threading the state through the calls. With no sync clients the
  %% state is returned unchanged.
  maybe_reinitiate_sync(#state{sync_clients = Clients} = S, E) ->
      Restart = fun(Client, Acc) ->
                        initiate_sync(Client, Acc, E)
                end,
      lists:foldl(Restart, S, Clients).
  %% Reports completion of the sync identified by Ref (a {Client, _}
  %% tuple) and returns the updated state.
  %% A client on this node is answered directly. Otherwise the client's
  %% node is sent {sync_complete, Ref} and Ref is dropped from
  %% sync_requests.
  send_sync_complete({Client, _} = Ref, S, _E) when node(Client) == node() ->
      reply_to_sync_client(Ref, S);
  send_sync_complete({Client, _} = Ref, S, E) ->
      %% Notify the node that initiated the sync
      %% 'broadcasting' to exactly one node.
      Target = [node(Client)],
      gen_leader:broadcast({from_leader, {sync_complete, Ref}}, Target, E),
      Pending = lists:keydelete(Ref, 1, S#state.sync_requests),
      S#state{sync_requests = Pending}.
  %% Answers the waiting sync client Ref with true and removes Ref from
  %% both sync_clients and sync_requests. Returns the updated state.
  reply_to_sync_client(Ref, S) ->
      gen_server:reply(Ref, true),
      #state{sync_clients = Clients, sync_requests = Requests} = S,
      S#state{sync_clients = Clients -- [Ref],
              sync_requests = lists:keydelete(Ref, 1, Requests)}.