gproc_dist.erl 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188
  1. %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
  2. %% --------------------------------------------------
  3. %% This file is provided to you under the Apache License,
  4. %% Version 2.0 (the "License"); you may not use this file
  5. %% except in compliance with the License. You may obtain
  6. %% a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing,
  11. %% software distributed under the License is distributed on an
  12. %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  13. %% KIND, either express or implied. See the License for the
  14. %% specific language governing permissions and limitations
  15. %% under the License.
  16. %% --------------------------------------------------
  17. %%
  18. %% @author Ulf Wiger <ulf@wiger.net>
  19. %%
  20. %% @doc Extended process registry
  21. %% <p>This module implements an extended process registry</p>
  22. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  23. %% @end
  24. -module(gproc_dist).
  25. -behaviour(gen_leader).
  26. -export([start_link/0, start_link/1,
  27. reg/1, reg/4, unreg/1,
  28. reg_other/5, unreg_other/2,
  29. reg_or_locate/3,
  30. reg_shared/3, unreg_shared/1,
  31. monitor/2,
  32. demonitor/2,
  33. set_attributes/2,
  34. set_attributes_shared/2,
  35. mreg/2,
  36. munreg/2,
  37. set_value/2,
  38. set_value_shared/2,
  39. give_away/2,
  40. update_counter/3,
  41. update_counters/1,
  42. update_shared_counter/2,
  43. reset_counter/1]).
  44. -export([leader_call/1,
  45. leader_cast/1,
  46. sync/0,
  47. get_leader/0]).
  48. %%% internal exports
  49. -export([init/1,
  50. handle_cast/3,
  51. handle_call/4,
  52. handle_info/2, handle_info/3,
  53. handle_leader_call/4,
  54. handle_leader_cast/3,
  55. handle_DOWN/3,
  56. elected/2, % original version
  57. elected/3,
  58. surrendered/3,
  59. from_leader/3,
  60. code_change/4,
  61. terminate/2]).
  62. -include("gproc_int.hrl").
  63. -include("gproc.hrl").
  64. -define(SERVER, ?MODULE).
  65. -record(state, {
  66. always_broadcast = false,
  67. is_leader,
  68. sync_clients = [],
  69. sync_requests = []}).
  70. -include("gproc_trace.hrl").
  71. %% ==========================================================
  72. %% Start functions
  73. start_link() ->
  74. start_link({[node()|nodes()], []}).
  75. start_link(all) ->
  76. Workers = case application:get_env(gproc_dist_workers) of
  77. {ok, [_|_] = WorkersList} -> WorkersList;
  78. _ -> []
  79. end,
  80. start_link({[node()|nodes()], [{bcast_type, all}, {workers, Workers}]});
  81. start_link(Nodes) when is_list(Nodes) ->
  82. start_link({Nodes, []});
  83. start_link({Nodes, Opts}) ->
  84. SpawnOpts = gproc_lib:valid_opts(server_options, []),
  85. gen_leader:start_link(
  86. ?SERVER, Nodes, Opts, ?MODULE, [], [{spawn_opt, SpawnOpts}]).
  87. %% ==========================================================
  88. %% API
  89. %% {@see gproc:reg/1}
  90. %%
  91. reg(Key) ->
  92. reg(Key, gproc:default(Key), [], reg).
  93. %% {@see gproc:reg_or_locate/2}
  94. %%
  95. reg_or_locate({n,g,_} = Key, Value, Pid) when is_pid(Pid) ->
  96. leader_call({reg_or_locate, Key, Value, Pid});
  97. reg_or_locate({n,g,_} = Key, Value, F) when is_function(F, 0) ->
  98. MyGroupLeader = group_leader(),
  99. leader_call({reg_or_locate, Key, Value,
  100. fun() ->
  101. %% leader will spawn on caller's node
  102. group_leader(MyGroupLeader, self()),
  103. F()
  104. end});
  105. reg_or_locate(_, _, _) ->
  106. ?THROW_GPROC_ERROR(badarg).
  107. %%% @spec({Class,g, Key}, Value) -> true
  108. %%% @doc
  109. %%% Class = n - unique name
  110. %%% | p - non-unique property
  111. %%% | c - counter
  112. %%% | a - aggregated counter
  113. %%% | r - resource property
  114. %%% | rc - resource counter
  115. %%% @end
  116. reg({_,g,_} = Key, Value, Attrs, Op) ->
  117. %% anything global
  118. leader_call({reg, Key, Value, self(), Attrs, Op});
  119. reg(_, _, _, _) ->
  120. ?THROW_GPROC_ERROR(badarg).
  121. %% @spec ({Class,g,Key}, pid(), Value, Attrs, Op::reg | unreg) -> true
  122. %% @doc
  123. %% Class = n - unique name
  124. %% | a - aggregated counter
  125. %% | r - resource property
  126. %% | rc - resource counter
  127. %% Value = term()
  128. %% Attrs = [{Key, Value}]
  129. %% @end
  130. reg_other({T,g,_} = Key, Pid, Value, Attrs, Op) when is_pid(Pid) ->
  131. if T==n; T==a; T==r; T==rc ->
  132. leader_call({reg_other, Key, Value, Pid, Attrs, Op});
  133. true ->
  134. ?THROW_GPROC_ERROR(badarg)
  135. end;
  136. reg_other(_, _, _, _, _) ->
  137. ?THROW_GPROC_ERROR(badarg).
  138. unreg_other({T,g,_} = Key, Pid) when is_pid(Pid) ->
  139. if T==n; T==a; T==r; T==rc ->
  140. leader_call({unreg_other, Key, Pid});
  141. true ->
  142. ?THROW_GPROC_ERROR(badarg)
  143. end;
  144. unreg_other(_, _) ->
  145. ?THROW_GPROC_ERROR(badarg).
  146. reg_shared({_,g,_} = Key, Value, Attrs) ->
  147. leader_call({reg, Key, Value, shared, Attrs, reg});
  148. reg_shared(_, _, _) ->
  149. ?THROW_GPROC_ERROR(badarg).
  150. monitor({_,g,_} = Key, Type) when Type==info;
  151. Type==follow;
  152. Type==standby ->
  153. leader_call({monitor, Key, self(), Type});
  154. monitor(_, _) ->
  155. ?THROW_GPROC_ERROR(badarg).
  156. demonitor({_,g,_} = Key, Ref) ->
  157. leader_call({demonitor, Key, self(), Ref});
  158. demonitor(_, _) ->
  159. ?THROW_GPROC_ERROR(badarg).
  160. set_attributes({_,g,_} = Key, Attrs) ->
  161. leader_call({set_attributes, Key, Attrs, self()});
  162. set_attributes(_, _) ->
  163. ?THROW_GPROC_ERROR(badarg).
  164. set_attributes_shared({_,g,_} = Key, Attrs) ->
  165. leader_call({set_attributes, Key, Attrs, shared});
  166. set_attributes_shared(_, _) ->
  167. ?THROW_GPROC_ERROR(badarg).
  168. mreg(T, KVL) ->
  169. if is_list(KVL) -> leader_call({mreg, T, g, KVL, self()});
  170. true -> ?THROW_GPROC_ERROR(badarg)
  171. end.
  172. munreg(T, Keys) ->
  173. if is_list(Keys) -> leader_call({munreg, T, g, Keys, self()});
  174. true -> ?THROW_GPROC_ERROR(badarg)
  175. end.
  176. unreg({_,g,_} = Key) ->
  177. leader_call({unreg, Key, self()});
  178. unreg(_) ->
  179. ?THROW_GPROC_ERROR(badarg).
  180. unreg_shared({T,g,_} = Key) when T==c; T==a ->
  181. leader_call({unreg, Key, shared});
  182. unreg_shared(_) ->
  183. ?THROW_GPROC_ERROR(badarg).
  184. set_value({T,g,_} = Key, Value) when T==a; T==c ->
  185. if is_integer(Value) ->
  186. leader_call({set, Key, Value, self()});
  187. true ->
  188. ?THROW_GPROC_ERROR(badarg)
  189. end;
  190. set_value({_,g,_} = Key, Value) ->
  191. leader_call({set, Key, Value, self()});
  192. set_value(_, _) ->
  193. ?THROW_GPROC_ERROR(badarg).
  194. set_value_shared({T,g,_} = Key, Value) when T==a; T==c; T==p ->
  195. leader_call({set, Key, Value, shared});
  196. set_value_shared(_, _) ->
  197. ?THROW_GPROC_ERROR(badarg).
  198. give_away({_,g,_} = Key, To) ->
  199. leader_call({give_away, Key, To, self()}).
  200. update_counter({T,g,_} = Key, Pid, Incr) when is_integer(Incr), T==c;
  201. is_integer(Incr), T==n ->
  202. leader_call({update_counter, Key, Incr, Pid});
  203. update_counter(_, _, _) ->
  204. ?THROW_GPROC_ERROR(badarg).
  205. update_counters(List) when is_list(List) ->
  206. leader_call({update_counters, List});
  207. update_counters(_) ->
  208. ?THROW_GPROC_ERROR(badarg).
  209. update_shared_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
  210. leader_call({update_counter, Key, Incr, shared});
  211. update_shared_counter(_, _) ->
  212. ?THROW_GPROC_ERROR(badarg).
  213. reset_counter({c,g,_} = Key) ->
  214. leader_call({reset_counter, Key, self()});
  215. reset_counter(_) ->
  216. ?THROW_GPROC_ERROR(badarg).
  217. %% @spec sync() -> true
  218. %% @doc Synchronize with the gproc leader
  219. %%
  220. %% This function can be used to ensure that data has been replicated from the
  221. %% leader to the current node. It does so by asking the leader to ping all
  222. %% live participating nodes. The call will return `true' when all these nodes
  223. %% have either responded or died. In the special case where the leader dies
  224. %% during an ongoing sync, the call will fail with a timeout exception.
  225. %% (Actually, it should be a `leader_died' exception; more study needed to find
  226. %% out why gen_leader times out in this situation, rather than reporting that
  227. %% the leader died.)
  228. %% @end
  229. %%
  230. sync() ->
  231. %% Increase timeout since gen_leader can take some time ...
  232. gen_server:call(?MODULE, sync, 5000).
  233. %% @spec get_leader() -> node()
  234. %% @doc Returns the node of the current gproc leader.
  235. %% @end
  236. get_leader() ->
  237. GenLeader = gen_leader,
  238. GenLeader:call(?MODULE, get_leader).
  239. %% ==========================================================
  240. %% Server-side
  241. handle_cast(_Msg, S, _) ->
  242. {stop, unknown_cast, S}.
  243. handle_call(get_leader, _, S, E) ->
  244. {reply, gen_leader:leader_node(E), S};
  245. handle_call(sync, From, S, E) ->
  246. {noreply, initiate_sync(From, S, E)};
  247. handle_call(_, _, S, _) ->
  248. {reply, badarg, S}.
  249. handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
  250. ets:delete(?TAB, {Pid, g}),
  251. leader_cast({pid_is_DOWN, Pid}),
  252. {ok, S};
  253. handle_info({gproc_unreg, Objs}, S) ->
  254. {ok, [{delete, Objs}], S};
  255. handle_info(_, S) ->
  256. {ok, S}.
  257. handle_info(Msg, S, _E) ->
  258. handle_info(Msg, S).
  259. elected(S, _E) ->
  260. {ok, {globals,globs()}, S#state{is_leader = true}}.
  261. elected(S, _E, undefined) ->
  262. %% I have become leader; full synch
  263. {ok, {globals, globs()}, S#state{is_leader = true}};
  264. elected(S, _E, _Node) ->
  265. Synch = {globals, globs()},
  266. if not S#state.always_broadcast ->
  267. %% Another node recognized us as the leader.
  268. %% Don't broadcast all data to everyone else
  269. {reply, Synch, S};
  270. true ->
  271. %% Main reason for doing this is if we are using a gen_leader
  272. %% that doesn't support the 'reply' return value
  273. {ok, Synch, S}
  274. end.
  275. globs() ->
  276. Gs = ets:select(?TAB, [{{{{'_',g,'_'},'_'},'_','_'},[],['$_']}]),
  277. As = ets:select(?TAB, [{{{'$1',{'_',g,'_'}}, '$2'},[],['$_']}]),
  278. _ = [gproc_lib:ensure_monitor(Pid, g) || {_, Pid, _} <- Gs],
  279. Gs ++ As.
  280. surrendered(#state{is_leader = true} = S, {globals, Globs}, _E) ->
  281. %% Leader conflict!
  282. surrendered_1(Globs),
  283. {ok, maybe_reinitiate_sync(S#state{is_leader = false})};
  284. surrendered(S, {globals, Globs}, _E) ->
  285. %% globals from this node should be more correct in our table than
  286. %% in the leader's
  287. surrendered_1(Globs),
  288. {ok, maybe_reinitiate_sync(S#state{is_leader = false})}.
  289. handle_DOWN(Node, S, E) ->
  290. S1 = check_sync_requests(Node, S, E),
  291. Head = {{{'_',g,'_'},'_'},'$1','_'},
  292. Gs = [{'==', {node,'$1'},Node}],
  293. Globs = ets:select(?TAB, [{Head, Gs, [{{{element,1,{element,1,'$_'}},
  294. {element,2,'$_'}}}]}]),
  295. case process_globals(Globs) of
  296. [] ->
  297. {ok, S1};
  298. Broadcast ->
  299. {ok, Broadcast, S1}
  300. end.
  301. check_sync_requests(Node, #state{sync_requests = SReqs} = S, E) ->
  302. check_sync_requests(SReqs, Node, S, E).
  303. check_sync_requests([], _, S, _) ->
  304. S;
  305. check_sync_requests([{From, Ns}|Reqs], Node, S, E) ->
  306. case lists:member(Node, Ns) of
  307. true ->
  308. remove_node_from_sync_request(Node, Ns, From, S, E);
  309. false ->
  310. check_sync_requests(Reqs, Node, S, E)
  311. end.
  312. remove_node_from_sync_request(Node, Ns, From, S, E) ->
  313. case Ns -- [Node] of
  314. [] ->
  315. check_sync_requests(Node, send_sync_complete(From, S, E), E);
  316. Ns1 ->
  317. Rs1 = lists:keyreplace(
  318. From, 1, S#state.sync_requests, {From, Ns1}),
  319. %% Yes, we start over and run through the list from the top,
  320. %% with updated state; simpler code that way.
  321. check_sync_requests(Node, S#state{sync_requests = Rs1}, E)
  322. end.
  323. handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E)
  324. when Reg==reg; Reg==reg_other ->
  325. case gproc_lib:insert_reg(K, Value, Pid, g) of
  326. false when Op == reg ->
  327. {reply, badarg, S};
  328. false when Op == ensure ->
  329. case ets:lookup(?TAB, ets_key(K, Pid)) of
  330. [{_, Pid, _}] ->
  331. gproc_lib:do_set_value(K, Value, Pid),
  332. gproc_lib:insert_attr(K, As, Pid, g),
  333. Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
  334. {reply, updated, [{insert, Vals}], S};
  335. _ ->
  336. {reply, badarg, [], S}
  337. end;
  338. true ->
  339. _ = gproc_lib:ensure_monitor(Pid,g),
  340. _ = if As =/= [] ->
  341. gproc_lib:insert_attr(K, As, Pid, g);
  342. true -> []
  343. end,
  344. Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
  345. {reply, regged_new(Op), [{insert, Vals}], S}
  346. end;
  347. handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
  348. T==a ->
  349. case ets:lookup(?TAB, {K, T}) of
  350. [{_, Pid, _}] ->
  351. Opts = get_opts(Pid, K),
  352. Ref = make_ref(),
  353. Opts1 = gproc_lib:add_monitor(Opts, MPid, Ref, Type),
  354. _ = gproc_lib:ensure_monitor(MPid, g),
  355. Obj = {{Pid,K}, Opts1},
  356. ets:insert(?TAB, Obj),
  357. {reply, Ref, [{insert, [Obj]}], S};
  358. LookupRes ->
  359. Ref = make_ref(),
  360. case Type of
  361. standby ->
  362. Event = {failover, MPid},
  363. Msgs = insert_reg(LookupRes, K, undefined, MPid, Event),
  364. Obj = {{K,T}, MPid, undefined},
  365. Rev = {{MPid,K}, []},
  366. ets:insert(?TAB, [Obj, Rev]),
  367. MPid ! {gproc, {failover,MPid}, Ref, K},
  368. {reply, Ref, [{insert, [Obj, Rev]},
  369. {notify, Msgs}], S};
  370. follow ->
  371. case LookupRes of
  372. [{_, Waiters}] ->
  373. add_follow_to_waiters(Waiters, K, MPid, Ref, S);
  374. [] ->
  375. add_follow_to_waiters([], K, MPid, Ref, S);
  376. [{_, Pid, _}] ->
  377. case ets:lookup(?TAB, {Pid,K}) of
  378. [{_, Opts}] when is_list(Opts) ->
  379. Opts1 = gproc_lib:add_monitor(
  380. Opts, MPid, Ref, follow),
  381. ets:insert(?TAB, {{Pid,K}, Opts1}),
  382. {reply, Ref,
  383. [{insert, [{{Pid,K}, Opts1}]}], S}
  384. end
  385. end;
  386. _ ->
  387. MPid ! {gproc, unreg, Ref, K},
  388. {reply, Ref, S}
  389. end
  390. end;
  391. handle_leader_call({demonitor, {T,g,_} = K, MPid, Ref}, _From, S, _E) ->
  392. case ets:lookup(?TAB, {K,T}) of
  393. [{_, Pid, _}] ->
  394. Opts = get_opts(Pid, K),
  395. Opts1 = gproc_lib:remove_monitors(Opts, MPid, Ref),
  396. Obj = {{Pid,K}, Opts1},
  397. ets:insert(?TAB, Obj),
  398. Del = case gproc_lib:does_pid_monitor(MPid, Opts1) of
  399. true -> [];
  400. false ->
  401. ets:delete(?TAB, {MPid, K}),
  402. [{delete, [{MPid, K}]}]
  403. end,
  404. {reply, ok, Del ++ [{insert, [Obj]}], S};
  405. [{Key, Waiters}] ->
  406. case lists:filter(fun({P, R, _}) ->
  407. P =/= MPid orelse R =/= Ref
  408. end, Waiters) of
  409. [] ->
  410. ets:delete(?TAB, {MPid, K}),
  411. ets:delete(?TAB, Key),
  412. {reply, ok, [{delete, [{MPid, K}, Key]}], S};
  413. NewWaiters ->
  414. ets:insert(?TAB, {Key, NewWaiters}),
  415. Del = case lists:keymember(MPid, 1, NewWaiters) of
  416. false ->
  417. ets:delete(?TAB, {MPid, K}),
  418. [{delete, [{MPid, K}]}];
  419. true ->
  420. []
  421. end,
  422. {reply, ok, Del ++ [{insert, [{Key, NewWaiters}]}], S}
  423. end;
  424. _ ->
  425. {reply, ok, S}
  426. end;
  427. handle_leader_call({set_attributes, {_,g,_} = K, Attrs, Pid}, _From, S, _E) ->
  428. case gproc_lib:insert_attr(K, Attrs, Pid, g) of
  429. false ->
  430. {reply, badarg, S};
  431. NewAttrs when is_list(NewAttrs) ->
  432. {reply, true, [{insert, [{{Pid,K}, NewAttrs}]}], S}
  433. end;
  434. handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P},
  435. {FromPid, _}, S, _E) ->
  436. FromNode = node(FromPid),
  437. Reg = fun() ->
  438. Pid = if is_function(P, 0) ->
  439. spawn(FromNode, P);
  440. is_pid(P) ->
  441. P
  442. end,
  443. case gproc_lib:insert_reg(K, Value, Pid, g) of
  444. true ->
  445. _ = gproc_lib:ensure_monitor(Pid,g),
  446. Vals = [{{K,n},Pid,Value}],
  447. {reply, {Pid, Value}, [{insert, Vals}], S};
  448. false ->
  449. {reply, badarg, S}
  450. end
  451. end,
  452. case ets:lookup(?TAB, {K, n}) of
  453. [] ->
  454. Reg();
  455. [{_, _Waiters}] ->
  456. Reg();
  457. [{_, OtherPid, OtherVal}] ->
  458. {reply, {OtherPid, OtherVal}, S}
  459. end;
  460. handle_leader_call({update_counter, {T,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
  461. when is_integer(Incr), T==c;
  462. is_integer(Incr), T==n ->
  463. try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
  464. RealPid = case Pid of
  465. n -> ets:lookup_element(?TAB, {Key,Pid}, 2);
  466. shared -> shared;
  467. P when is_pid(P) -> P
  468. end,
  469. Vals = [{{Key,Pid},RealPid,New} | update_aggr_counter(Key, Incr)],
  470. {reply, New, [{insert, Vals}], S}
  471. catch
  472. error:_ ->
  473. {reply, badarg, S}
  474. end;
  475. handle_leader_call({update_counters, Cs}, _From, S, _E) ->
  476. try {Replies, Vals} = batch_update_counters(Cs),
  477. {reply, Replies, [{insert, Vals}], S}
  478. catch
  479. error:_ ->
  480. {reply, badarg, S}
  481. end;
  482. handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
  483. try Current = ets:lookup_element(?TAB, {Key, Pid}, 3),
  484. Initial = case ets:lookup_element(?TAB, {Pid, Key}, 2) of
  485. r -> 0;
  486. Opts when is_list(Opts) ->
  487. proplists:get_value(initial, Opts, 0)
  488. end,
  489. Incr = Initial - Current,
  490. New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
  491. Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
  492. {reply, {Current, New}, [{insert, Vals}], S}
  493. catch
  494. error:_R ->
  495. io:fwrite("reset_counter failed: ~p~n~p~n", [_R, erlang:get_stacktrace()]),
  496. {reply, badarg, S}
  497. end;
  498. handle_leader_call({Unreg, {T,g,Name} = K, Pid}, _From, S, _E)
  499. when Unreg==unreg;
  500. Unreg==unreg_other->
  501. Key = if T == n; T == a; T == rc -> {K,T};
  502. true -> {K, Pid}
  503. end,
  504. case ets:member(?TAB, Key) of
  505. true ->
  506. _ = gproc_lib:remove_reg(K, Pid, unreg),
  507. if T == c ->
  508. case ets:lookup(?TAB, {{a,g,Name},a}) of
  509. [Aggr] ->
  510. %% updated by remove_reg/3
  511. {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
  512. {insert, [Aggr]}], S};
  513. [] ->
  514. {reply, true, [{delete, [{K,Pid}, {Pid,K}]}], S}
  515. end;
  516. T == r ->
  517. case ets:lookup(?TAB, {{rc,g,Name},rc}) of
  518. [RC] ->
  519. {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
  520. {insert, [RC]}], S};
  521. [] ->
  522. {reply, true, [{delete, [{K,Pid}, {Pid, K}]}], S}
  523. end;
  524. true ->
  525. {reply, true, [{notify, [{K, Pid, unreg}]},
  526. {delete, [{K, Pid}, {Pid,K}]}], S}
  527. end;
  528. false ->
  529. {reply, badarg, S}
  530. end;
  531. handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
  532. when T == a; T == n; T == rc ->
  533. Key = {K, T},
  534. case ets:lookup(?TAB, Key) of
  535. [{_, Pid, Value}] ->
  536. Opts = get_opts(Pid, K),
  537. case pid_to_give_away_to(To) of
  538. Pid ->
  539. {reply, Pid, S};
  540. ToPid when is_pid(ToPid) ->
  541. ets:insert(?TAB, [{Key, ToPid, Value},
  542. {{ToPid,K}, Opts}]),
  543. _ = gproc_lib:ensure_monitor(ToPid, g),
  544. Rev = {Pid, K},
  545. ets:delete(?TAB, Rev),
  546. gproc_lib:notify({migrated, ToPid}, K, Opts),
  547. {reply, ToPid, [{insert, [{Key, ToPid, Value}]},
  548. {notify, [{K, Pid, {migrated, ToPid}}]},
  549. {delete, [{K, Pid}, Rev]}], S};
  550. undefined ->
  551. ets:delete(?TAB, Key),
  552. Rev = {Pid, K},
  553. ets:delete(?TAB, Rev),
  554. gproc_lib:notify(unreg, K, Opts),
  555. {reply, undefined, [{notify, [{K, Pid, unreg}]},
  556. {delete, [{K, Pid}, Rev]}], S}
  557. end;
  558. _ ->
  559. {reply, badarg, S}
  560. end;
  561. handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
  562. if T==p; T==n; T==r ->
  563. try gproc_lib:insert_many(T, g, L, Pid) of
  564. {true,Objs} -> {reply, true, [{insert,Objs}], S};
  565. false -> {reply, badarg, S}
  566. catch
  567. error:_ -> {reply, badarg, S}
  568. end;
  569. true -> {reply, badarg, S}
  570. end;
  571. handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
  572. try gproc_lib:remove_many(T, g, L, Pid) of
  573. [] ->
  574. {reply, true, S};
  575. Objs ->
  576. {reply, true, [{delete, Objs}], S}
  577. catch
  578. error:_ -> {reply, badarg, S}
  579. end;
  580. handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
  581. if T == a ->
  582. if is_integer(V) ->
  583. case gproc_lib:do_set_value(K, V, Pid) of
  584. true -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
  585. false -> {reply, badarg, S}
  586. end
  587. end;
  588. T == c ->
  589. try gproc_lib:do_set_counter_value(K, V, Pid),
  590. AKey = {{a,g,N},a},
  591. Aggr = ets:lookup(?TAB, AKey), % may be []
  592. {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
  593. catch
  594. error:_ ->
  595. {reply, badarg, S}
  596. end;
  597. true ->
  598. case gproc_lib:do_set_value(K, V, Pid) of
  599. true ->
  600. Obj = if T==n -> {{K, T}, Pid, V};
  601. true -> {{K, Pid}, Pid, V}
  602. end,
  603. {reply, true, [{insert,[Obj]}], S};
  604. false ->
  605. {reply, badarg, S}
  606. end
  607. end;
  608. handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
  609. %% The pid in _From is of the gen_leader instance that forwarded the
  610. %% call - not of the client. This is why the Pid is explicitly passed.
  611. %% case gproc_lib:await(Key, {Pid,Ref}) of
  612. case gproc_lib:await(Key, Pid, From) of
  613. {reply, {Ref, {K, P, V}}} ->
  614. {reply, {Ref, {K, P, V}}, S};
  615. {reply, Reply, Insert} ->
  616. {reply, Reply, [{insert, Insert}], S}
  617. end;
  618. handle_leader_call(_, _, S, _E) ->
  619. {reply, badarg, S}.
  620. handle_leader_cast({initiate_sync, Ref}, S, E) ->
  621. case gen_leader:alive(E) -- [node()] of
  622. [] ->
  623. %% ???
  624. {noreply, send_sync_complete(Ref, S, E)};
  625. Alive ->
  626. gen_leader:broadcast({from_leader, {sync, Ref}}, Alive, E),
  627. {noreply, S#state{sync_requests =
  628. [{Ref, Alive}|S#state.sync_requests]}}
  629. end;
  630. handle_leader_cast({sync_reply, Node, Ref}, S, E) ->
  631. #state{sync_requests = SReqs} = S,
  632. case lists:keyfind(Ref, 1, SReqs) of
  633. false ->
  634. %% This should never happen, except perhaps if the leader who
  635. %% received the sync request died, and the new leader gets the
  636. %% sync reply. In that case, we trust that the client has been
  637. %% notified anyway, and ignore the message.
  638. {ok, S};
  639. {_, Ns} ->
  640. case lists:delete(Node, Ns) of
  641. [] ->
  642. {ok, send_sync_complete(Ref, S, E)};
  643. Ns1 ->
  644. SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
  645. {ok, S#state{sync_requests = SReqs1}}
  646. end
  647. end;
  648. handle_leader_cast({add_globals, Missing}, S, _E) ->
  649. %% This is an audit message: a peer (non-leader) had info about granted
  650. %% global resources that we didn't know of when we became leader.
  651. %% This could happen due to a race condition when the old leader died.
  652. Update = insert_globals(Missing),
  653. {ok, [{insert, Update}], S};
  654. handle_leader_cast({remove_globals, Globals}, S, _E) ->
  655. delete_globals(Globals),
  656. {ok, S};
  657. handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
  658. case ets:lookup(?TAB, {Key, T}) of
  659. [{_, Waiters}] ->
  660. Ops = gproc_lib:remove_wait(Key, Pid, Ref, Waiters),
  661. {ok, Ops, S};
  662. _ ->
  663. {ok, [], S}
  664. end;
  665. handle_leader_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S, _E) ->
  666. case ets:lookup(?TAB, {Key, T}) of
  667. [{_, Waiters}] ->
  668. Ops = gproc_lib:remove_wait(Key, Pid, all, Waiters),
  669. {ok, Ops, S};
  670. [{_, OtherPid, _}] ->
  671. Ops = gproc_lib:remove_monitors(Key, OtherPid, Pid),
  672. {ok, Ops, S}
  673. end;
  674. handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
  675. Globals = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
  676. [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
  677. ets:delete(?TAB, {Pid,g}),
  678. case process_globals(Globals) of
  679. [] ->
  680. {ok, S};
  681. Broadcast ->
  682. {ok, Broadcast, S}
  683. end.
  684. mk_broadcast_insert_vals(Objs) ->
  685. lists:flatmap(
  686. fun({{C, g, Name} = K, Pid, Value}) ->
  687. if C == a; C == rc ->
  688. ets:lookup(?TAB, {K,C}) ++ ets:lookup(?TAB, {Pid,K});
  689. C == c ->
  690. [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})]
  691. ++ ets:lookup(?TAB, {Pid,K});
  692. C == r ->
  693. [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{rc,g,Name},rc})]
  694. ++ ets:lookup(?TAB, {Pid, K});
  695. C == n ->
  696. [{{K,n},Pid,Value}| ets:lookup(?TAB, {Pid,K})];
  697. true ->
  698. [{{K,Pid},Pid,Value} | ets:lookup(?TAB, {Pid,K})]
  699. end
  700. end, Objs).
  701. process_globals(Globals) ->
  702. {Modified, Notifications} =
  703. lists:foldl(
  704. fun({{T,_,_} = Key, Pid}, A) when T==n; T==a; T==rc ->
  705. case ets:lookup(?TAB, {Pid,Key}) of
  706. [{_, Opts}] when is_list(Opts) ->
  707. maybe_failover(Key, Pid, Opts, A);
  708. _ ->
  709. A
  710. end;
  711. ({{T,_,_} = Key, Pid}, {MA,NA}) ->
  712. MA1 = case T of
  713. c ->
  714. Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
  715. update_aggr_counter(Key, -Incr) ++ MA;
  716. r ->
  717. decrement_resource_count(Key, []) ++ MA;
  718. _ ->
  719. MA
  720. end,
  721. N = remove_entry(Key, Pid, unreg),
  722. {MA1, N ++ NA}
  723. end, {[],[]}, Globals),
  724. [{insert, Modified} || Modified =/= []] ++
  725. [{notify, Notifications} || Notifications =/= []] ++
  726. [{delete, Globals} || Globals =/= []].
  727. maybe_failover({T,_,_} = Key, Pid, Opts, {MAcc, NAcc}) ->
  728. Opts = get_opts(Pid, Key),
  729. case filter_standbys(gproc_lib:standbys(Opts)) of
  730. [] ->
  731. Notify = remove_entry(Key, Pid, unreg),
  732. {MAcc, Notify ++ NAcc};
  733. [{ToPid,Ref,_}|_] ->
  734. Value = case ets:lookup(?TAB, {Key,T}) of
  735. [{_, _, V}] -> V;
  736. _ -> undefined
  737. end,
  738. Notify = remove_rev_entry(Opts, Pid, Key, {failover, ToPid}),
  739. Opts1 = gproc_lib:remove_monitor(Opts, ToPid, Ref),
  740. _ = gproc_lib:ensure_monitor(ToPid, g),
  741. NewReg = {{Key,T}, ToPid, Value},
  742. NewRev = {{ToPid, Key}, Opts1},
  743. ets:insert(?TAB, [NewReg, NewRev]),
  744. {[NewReg, NewRev | MAcc], Notify ++ NAcc}
  745. end.
  746. filter_standbys(SBs) ->
  747. filter_standbys(SBs, [node()|nodes()]).
  748. filter_standbys([{Pid,_,_} = H|T], Nodes) ->
  749. case lists:member(node(Pid), Nodes) of
  750. true ->
  751. [H|T];
  752. false ->
  753. filter_standbys(T, Nodes)
  754. end;
  755. filter_standbys([], _) ->
  756. [].
  757. remove_entry(Key, Pid, Event) ->
  758. K = ets_key(Key, Pid),
  759. case ets:lookup(?TAB, K) of
  760. [{_, P, _}] when is_pid(P), P =:= Pid; is_atom(Pid) ->
  761. ets:delete(?TAB, K),
  762. remove_rev_entry(get_opts(Pid, Key), Pid, Key, Event);
  763. [{_, _OtherPid, _}] ->
  764. ets:delete(?TAB, {Pid, Key}),
  765. [];
  766. [{_, _Waiters}] ->
  767. ets:delete(?TAB, K),
  768. [];
  769. [] -> []
  770. end.
  771. remove_rev_entry(Opts, Pid, {T,g,_} = K, Event) when T==n; T==a ->
  772. Key = {Pid, K},
  773. gproc_lib:notify(Event, K, Opts),
  774. ets:delete(?TAB, Key),
  775. [{K, Pid, Event}];
  776. remove_rev_entry(_, Pid, K, _Event) ->
  777. ets:delete(?TAB, {Pid, K}),
  778. [].
  779. get_opts(Pid, K) ->
  780. case ets:lookup(?TAB, {Pid, K}) of
  781. [] -> [];
  782. [{_, r}] -> [];
  783. [{_, Opts}] -> Opts
  784. end.
  785. code_change(_FromVsn, S, _Extra, _E) ->
  786. {ok, S}.
  787. terminate(_Reason, _S) ->
  788. ok.
  %% Handles a message delivered from the leader. Always returns {ok, State}.
  %%
  %%   {sync, Ref}          - answer the leader with {sync_reply, node(), Ref}.
  %%   {sync_complete, Ref} - if Ref is a {From, _} pair whose From lives on
  %%                          this node, reply to that sync client;
  %%                          otherwise just drop Ref from sync_requests.
  %%   Ops                  - a list of {delete, Globals}, {insert, Globals}
  %%                          and {notify, Events} operations, applied in
  %%                          list order.
  from_leader({sync, Ref}, S, _E) ->
      gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
      {ok, S};
  from_leader({sync_complete, {From, _} = Ref}, S, _E)
    when node(From) == node() ->
      {ok, reply_to_sync_client(Ref, S)};
  from_leader({sync_complete, Ref}, S, _E) ->
      %% we shouldn't have to, but ensure that we don't have
      %% the sync request in our state.
      Pending = lists:keydelete(Ref, 1, S#state.sync_requests),
      {ok, S#state{sync_requests = Pending}};
  from_leader(Ops, S, _E) ->
      Apply = fun({delete, Globals}) ->
                      delete_globals(Globals);
                 ({insert, Globals}) ->
                      _ = insert_globals(Globals);
                 ({notify, Events}) ->
                      do_notify(Events)
              end,
      lists:foreach(Apply, Ops),
      {ok, S}.
  %% Inserts a list of global objects into the table.
  %%
  %% Three-element registration objects are inserted, a reverse entry
  %% {{Pid, Key}, []} is created if none exists, and the owning pid is
  %% monitored. Two-element objects keyed on {{_,_,_}, _} are inserted
  %% as-is. Reverse entries {{Pid, _}, Opts} (Pid a pid, Opts a list) are
  %% inserted and their pid monitored. Anything else is ignored.
  %%
  %% The return value is Globals with every inserted reverse entry
  %% prepended to it.
  insert_globals(Globals) ->
      Insert =
          fun({{{_, _, _} = Key, _}, Pid, _} = Obj, Acc) ->
                  ets:insert(?TAB, Obj),
                  ets:insert_new(?TAB, {{Pid, Key}, []}),
                  gproc_lib:ensure_monitor(Pid, g),
                  Acc;
             ({{{_, _, _}, _}, _} = Obj, Acc) ->
                  ets:insert(?TAB, Obj),
                  Acc;
             ({{P, _K}, Opts} = Obj, Acc) when is_pid(P), is_list(Opts) ->
                  ets:insert(?TAB, Obj),
                  gproc_lib:ensure_monitor(P, g),
                  [Obj | Acc];
             (_Other, Acc) ->
                  Acc
          end,
      lists:foldl(Insert, Globals, Globals).
  %% Removes a list of global entries from the table.
  %%
  %% Accepted element shapes:
  %%   {{_,g,_} = Key, T}       with T an atom or a pid -> remove_entry(Key, T, [])
  %%   {{{_,g,_} = Key, T}, P}  with P a pid and T an atom or a pid
  %%                                                    -> remove_entry(Key, P, [])
  %%   {Pid, Key}               with Pid a pid or 'shared'
  %%                                                    -> the entry is deleted directly
  %% Any other element raises function_clause.
  delete_globals(Globals) ->
      Delete =
          fun({{_, g, _} = K, T}) when is_atom(T); is_pid(T) ->
                  remove_entry(K, T, []);
             ({{{_, g, _} = K, T}, P})
                when is_pid(P), is_atom(T) orelse is_pid(T) ->
                  remove_entry(K, P, []);
             ({Pid, Key}) when is_pid(Pid); Pid == shared ->
                  ets:delete(?TAB, {Pid, Key})
          end,
      lists:foreach(Delete, Globals).
  %% Delivers a list of notification events and returns ok.
  %%
  %% Supported events:
  %%   {Pid, Msg}        - Msg is sent to Pid when Pid lives on this node;
  %%                       events for pids on other nodes are skipped.
  %%   {Key, Pid, Event} - if the reverse entry {Pid, Key} holds a list of
  %%                       options, gproc_lib:notify(Event, Key, Opts) is
  %%                       called; otherwise the event is skipped.
  %%
  %% Every element of the list is processed. (Previously a successful
  %% {Key, Pid, Event} notification ended the traversal, silently dropping
  %% all events that followed it.)
  do_notify([{P, Msg}|T]) when is_pid(P), node(P) =:= node() ->
      P ! Msg,
      do_notify(T);
  do_notify([{P, _Msg}|T]) when is_pid(P) ->
      %% Not a local process: nothing to do on this node.
      do_notify(T);
  do_notify([{K, P, E}|T]) ->
      case ets:lookup(?TAB, {P,K}) of
          [{_, Opts}] when is_list(Opts) ->
              gproc_lib:notify(E, K, Opts);
          _ ->
              ok
      end,
      %% Always continue with the remaining events.
      do_notify(T);
  do_notify([]) ->
      ok.
  %% Builds the table key for a registration.
  %% Keys of type n, a and rc are stored under {Key, Type}; every other
  %% key is stored under {Key, Pid}.
  ets_key(K, Pid) ->
      case K of
          {T, _, _} when T =:= n; T =:= a; T =:= rc ->
              {K, T};
          _ ->
              {K, Pid}
      end.
  %% Sends Req to the leader via gen_leader:leader_call/2 and returns the
  %% reply. A reply of badarg is turned into a gproc badarg error.
  leader_call(Req) ->
      Reply = gen_leader:leader_call(?MODULE, Req),
      if
          Reply =:= badarg ->
              ?THROW_GPROC_ERROR(badarg);
          true ->
              Reply
      end.
  862. %% leader_call(Req, Timeout) ->
  863. %% case gen_leader:leader_call(?MODULE, Req, Timeout) of
  864. %% badarg -> ?THROW_GPROC_ERROR(badarg);
  865. %% Reply -> Reply
  866. %% end.
  %% Casts Request to the leader of this module's gen_leader group.
  leader_cast(Request) ->
      gen_leader:leader_cast(?MODULE, Request).
  %% Builds the initial state. The always_broadcast field is taken from the
  %% always_broadcast option in Opts, falling back to the record default
  %% when the option is absent; all other fields keep their defaults.
  init(Opts) ->
      Defaults = #state{},
      Fallback = Defaults#state.always_broadcast,
      Flag = proplists:get_value(always_broadcast, Opts, Fallback),
      {ok, Defaults#state{always_broadcast = Flag}}.
  %% Reconciles the local table with Globs, the set of global objects
  %% handed over (per the inline comments, the leader's version).
  %%
  %% Steps, in order:
  %%   1. collect the global registrations owned by processes on this node
  %%      and make sure those processes are monitored;
  %%   2. delete every global registration and reverse entry belonging to
  %%      a process on another node;
  %%   3. insert the remote objects found in Globs, collecting the objects
  %%      in Globs that refer to processes on this node;
  %%   4. tell the leader about local registrations it does not list
  %%      (add_globals) and about registrations it lists for this node
  %%      that are not present locally (remove_globals).
  surrendered_1(Globs) ->
      Here = node(),
      OwnSpec = [{{{{'_',g,'_'},'_'}, '$1', '$2'},
                  [{'==', {node,'$1'}, Here}],
                  [{{ {element,1,'$_'}, '$1', '$2' }}]}],
      My_local_globs = ets:select(?TAB, OwnSpec),
      lists:foreach(fun({_, Pid, _}) ->
                            gproc_lib:ensure_monitor(Pid, g)
                    end, My_local_globs),
      ?event({'My_local_globs', My_local_globs}),
      %% remove all remote globals (registration objects and reverse entries).
      RemoteSpec = [{{{{'_',g,'_'},'_'}, '$1', '_'},
                     [{'=/=', {node,'$1'}, Here}],
                     [true]},
                    {{{'$1',{'_',g,'_'}}, '_'},
                     [{'=/=', {node,'$1'}, Here}],
                     [true]}],
      ets:select_delete(?TAB, RemoteSpec),
      %% insert new non-local globals, collect the leader's version of
      %% what my globals are
      Merge =
          fun({{Key,_} = K, Pid, V}, Acc) when node(Pid) =/= Here ->
                  ets:insert(?TAB, {K, Pid, V}),
                  _ = gproc_lib:ensure_monitor(Pid, g),
                  ets:insert_new(?TAB, {{Pid,Key}, []}),
                  Acc;
             ({{_Pid,_} = K, Opts}, Acc) -> % when node(Pid) =/= node() ->
                  ets:insert(?TAB, {K, Opts}),
                  Acc;
             ({_, Pid, _} = Obj, Acc) when node(Pid) == Here ->
                  [Obj | Acc]
          end,
      Ldr_local_globs = lists:foldl(Merge, [], Globs),
      ?event({'Ldr_local_globs', Ldr_local_globs}),
      %% Local registrations the leader does not know about.
      Missing = [{K,P,V} || {K,P,V} <- My_local_globs,
                            is_pid(P),
                            not lists:keymember(K, 1, Ldr_local_globs)],
      case Missing of
          [] ->
              %% phew! We have the same picture
              ok;
          [_|_] ->
              %% This is very unlikely, I think
              ?event({'Missing', Missing}),
              leader_cast({add_globals, mk_broadcast_insert_vals(Missing)})
      end,
      %% Registrations the leader lists for this node that we do not have.
      Remove = [{K,P} || {{K,_} = R, P, _} <- Ldr_local_globs,
                         is_pid(P),
                         not lists:keymember(R, 1, My_local_globs)],
      case Remove of
          [] ->
              ok;
          [_|_] ->
              ?event({'Remove', Remove}),
              leader_cast({remove_globals, Remove})
      end.
  %% Applies a list of {CounterKey, Pid, Incr} updates to global counters.
  %% Returns {Returns, Updates}: Returns holds one {Key, Pid, Value} per
  %% input element in input order; Updates holds the changed objects, with
  %% at most one object per {key, pid} pair (see add_object/2).
  batch_update_counters(Cs) ->
      batch_update_counters(Cs, [], []).

  %% Worker for batch_update_counters/1. Returns is accumulated in reverse
  %% and flipped at the end.
  batch_update_counters([{{c,g,_} = Key, Pid, Incr} | Rest], Returns, Updates) ->
      {Value, Updates1} =
          case update_counter_g(Key, Incr, Pid) of
              [{_,_,_} = Aggr, {_, _, V} = Ctr] ->
                  %% Both an aggregated counter and the counter itself changed.
                  {V, add_object(Aggr, add_object(Ctr, Updates))};
              [{_, _, V} = Ctr] ->
                  {V, add_object(Ctr, Updates)}
          end,
      batch_update_counters(Rest, [{Key, Pid, Value} | Returns], Updates1);
  batch_update_counters([], Returns, Updates) ->
      {lists:reverse(Returns), Updates}.
  %% Stores Obj in a list of {Key, Pid, Value} objects: the first element
  %% with the same Key and Pid is replaced in place; if there is none, Obj
  %% is appended at the end.
  add_object(Obj, Objs) ->
      add_object_acc(Obj, Objs, []).

  %% Walks the list while keeping the already visited elements (reversed)
  %% in Seen.
  add_object_acc({K,P,_} = Obj, [{K,P,_} | Rest], Seen) ->
      lists:reverse(Seen, [Obj | Rest]);
  add_object_acc(Obj, [Other | Rest], Seen) ->
      add_object_acc(Obj, Rest, [Other | Seen]);
  add_object_acc(Obj, [], Seen) ->
      lists:reverse(Seen, [Obj]).
  %% Updates the global counter Key owned by Pid and, via
  %% update_aggr_counter/3, the matching aggregated counter if one exists.
  %% Returns the list of changed objects.
  %%
  %% The second argument may be:
  %%   - an integer increment;
  %%   - {Incr, Threshold, SetValue}, all integers;
  %%   - a list of such operations (see expand_ops/1).
  %% Anything else raises a gproc badarg error.
  update_counter_g({c,g,_} = Key, Incr, Pid) when is_integer(Incr) ->
      ObjKey = {Key, Pid},
      Value = ets:update_counter(?TAB, ObjKey, {3, Incr}),
      update_aggr_counter(Key, Incr, [{ObjKey, Pid, Value}]);
  update_counter_g({c,g,_} = Key, {Incr, Threshold, SetValue}, Pid)
    when is_integer(Incr), is_integer(Threshold), is_integer(SetValue) ->
      ObjKey = {Key, Pid},
      %% The leading {3, 0} op reads the previous value so that the
      %% effective delta can be computed.
      [Before, After] = ets:update_counter(
                          ?TAB, ObjKey,
                          [{3, 0}, {3, Incr, Threshold, SetValue}]),
      update_aggr_counter(Key, After - Before, [{ObjKey, Pid, After}]);
  update_counter_g({c,g,_} = Key, Ops, Pid) when is_list(Ops) ->
      AllOps = [{3, 0} | expand_ops(Ops)],
      case ets:update_counter(?TAB, {Key, Pid}, AllOps) of
          [_] ->
              %% Empty operation list: nothing changed.
              [];
          [Before | Values] ->
              After = lists:last(Values),
              %% NOTE(review): this object is {Key, Pid, Values}, unlike the
              %% {{Key,Pid}, Pid, Value} shape built by the other clauses --
              %% confirm against the callers that this is intended.
              update_aggr_counter(Key, After - Before, [{Key, Pid, Values}])
      end;
  update_counter_g(_, _, _) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Translates a list of counter operations into ets:update_counter/3
  %% operations on element 3: an integer Incr becomes {3, Incr} and an
  %% all-integer {Incr, Threshold, SetValue} becomes
  %% {3, Incr, Threshold, SetValue}. Any other element, or a non-list
  %% argument, raises a gproc badarg error.
  expand_ops([Op | Ops]) ->
      [expand_op(Op) | expand_ops(Ops)];
  expand_ops([]) ->
      [];
  expand_ops(_) ->
      ?THROW_GPROC_ERROR(badarg).

  %% Translates a single counter operation.
  expand_op({Incr, Thr, SetV})
    when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
      {3, Incr, Thr, SetV};
  expand_op(Incr) when is_integer(Incr) ->
      {3, Incr};
  expand_op(_) ->
      ?THROW_GPROC_ERROR(badarg).
  %% Convenience form of update_aggr_counter/3 starting from an empty
  %% accumulator. Keys of type n never have an aggregated counter, so []
  %% is returned for them right away.
  update_aggr_counter({n,_,_}, _) ->
      [];
  update_aggr_counter(Key, Incr) ->
      update_aggr_counter(Key, Incr, []).

  %% Adds Incr to the global aggregated counter that shares its name with
  %% the counter {c,g,Ctr}, if such an aggregated counter exists. The
  %% updated object is written to the table and prepended to Acc; when no
  %% aggregated counter exists, Acc is returned unchanged.
  update_aggr_counter({c,g,Ctr}, Incr, Acc) ->
      case ets:lookup(?TAB, {{a,g,Ctr},a}) of
          [{AggrKey, Owner, Sum}] ->
              Updated = {AggrKey, Owner, Sum + Incr},
              ets:insert(?TAB, Updated),
              [Updated | Acc];
          [] ->
              Acc
      end.
  %% Decrements the global resource counter belonging to resource Rsrc,
  %% if one is registered. The object found under the counter key after
  %% the decrement is prepended to Acc; without a resource counter, Acc is
  %% returned unchanged.
  decrement_resource_count({r,g,Rsrc}, Acc) ->
      CounterKey = {{rc,g,Rsrc},rc},
      HasCounter = ets:member(?TAB, CounterKey),
      case HasCounter of
          true ->
              %% Call the lib function, which might trigger events
              gproc_lib:decrement_resource_count(g, Rsrc),
              ets:lookup(?TAB, CounterKey) ++ Acc;
          false ->
              Acc
      end.
  %% Resolves the target of a give_away operation.
  %% A pid is returned as-is. A global key of type n or a is resolved to
  %% the pid stored in its registration object, or to undefined when no
  %% such object is found.
  pid_to_give_away_to(P) when is_pid(P) ->
      P;
  pid_to_give_away_to({T,g,_} = Key) when T =:= n; T =:= a ->
      Found = ets:lookup(?TAB, {Key, T}),
      case Found of
          [{_, Holder, _}] ->
              Holder;
          _ ->
              undefined
      end.
  %% Registers K with value Val for Pid through gproc_lib:insert_reg/4 and
  %% then informs the waiters found in the previous lookup result for K.
  %%
  %% The first argument is that lookup result: either [] (no waiters) or a
  %% single {_, Waiters} object. Returns the result of tell_waiters/5.
  insert_reg([], K, Val, Pid, Event) ->
      %% No waiter object: same as a waiter object with an empty list.
      insert_reg([{K, []}], K, Val, Pid, Event);
  insert_reg([{_, Waiters}], K, Val, Pid, Event) ->
      gproc_lib:insert_reg(K, Val, Pid, g),
      tell_waiters(Waiters, K, Pid, Val, Event).
  %% Informs every process waiting on key K.
  %%
  %% The first argument is a list of {WaiterPid, Ref} and
  %% {WaiterPid, Ref, follow} entries:
  %%   - a plain waiter is sent {gproc, Ref, registered, {K, Pid, V}};
  %%   - a follower is sent {gproc, Event, Ref, K}.
  %%
  %% Waiters on the local node are messaged directly. For every other
  %% waiter a {WaiterPid, Msg} pair is returned instead (presumably so that
  %% it can be forwarded as a notify operation and delivered on the
  %% waiter's own node -- confirm against the callers).
  %%
  %% The whole list is always processed and the result is always a list.
  %% (Previously a local waiter ended the traversal and made the function
  %% return the sent message instead of a list.)
  tell_waiters([{P,R}|T], K, Pid, V, Event) ->
      Msg = {gproc, R, registered, {K, Pid, V}},
      case node(P) =:= node() of
          true ->
              P ! Msg,
              tell_waiters(T, K, Pid, V, Event);
          false ->
              [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
      end;
  tell_waiters([{P,R,follow}|T], K, Pid, V, Event) ->
      Msg = {gproc, Event, R, K},
      case node(P) =:= node() of
          true ->
              P ! Msg,
              tell_waiters(T, K, Pid, V, Event);
          false ->
              [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
      end;
  tell_waiters([], _, _, _, _) ->
      [].
  %% Adds Pid as a follower of the currently unregistered key K.
  %%
  %% The waiter object for K is rewritten with {Pid, Ref, follow} in front
  %% of the existing Waiters, a reverse entry for {Pid, K} is ensured, and
  %% Pid is told {gproc, unreg, Ref, K}: directly when it is a local
  %% process, otherwise through an additional notify operation.
  %%
  %% Returns {reply, Ref, Ops, S}, where Ops always starts with the insert
  %% operation for the two objects.
  add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) ->
      Obj = {{K,T}, [{Pid, Ref, follow} | Waiters]},
      ets:insert(?TAB, Obj),
      Rev = ensure_rev({Pid, K}),
      Msg = {gproc, unreg, Ref, K},
      Insert = {insert, [Obj, Rev]},
      Ops = case node(Pid) =:= node() of
                true ->
                    Pid ! Msg,
                    [Insert];
                false ->
                    [Insert, {notify, [{Pid, Msg}]}]
            end,
      {reply, Ref, Ops, S}.
  %% Returns the object stored under K, creating and storing {K, []} first
  %% when the table holds nothing for K.
  ensure_rev(K) ->
      case ets:lookup(?TAB, K) of
          [] ->
              Fresh = {K, []},
              ets:insert(?TAB, Fresh),
              Fresh;
          [Existing] ->
              Existing
      end.
  %% Reply value for a successful new registration, depending on the
  %% operation: 'ensure' answers new, 'reg' answers true.
  regged_new(ensure) -> new;
  regged_new(reg)    -> true.
  %% Starts a sync round on behalf of the client From and returns the new
  %% state.
  %%
  %% On the leader: when no other node is alive the client is answered
  %% with true at once; otherwise {sync, From} is broadcast to the other
  %% alive nodes and {From, Nodes} is recorded in sync_requests.
  %% On any other node the request is passed on to the leader.
  initiate_sync(From, #state{is_leader = true, sync_requests = Pending} = S, E) ->
      Others = lists:delete(node(), gen_leader:alive(E)),
      case Others of
          [] ->
              %% I'm alone - sync is trivial
              gen_server:reply(From, true),
              S;
          [_|_] ->
              gen_leader:broadcast({from_leader, {sync, From}}, Others, E),
              S#state{sync_requests = [{From, Others} | Pending]}
      end;
  initiate_sync(From, S, _E) ->
      leader_cast({initiate_sync, From}),
      S.
  %% Re-sends an initiate_sync request to the leader for every client in
  %% sync_clients (nothing is sent when the list is empty). The state is
  %% returned unchanged.
  maybe_reinitiate_sync(#state{sync_clients = Clients} = S) ->
      lists:foreach(fun(From) ->
                            leader_cast({initiate_sync, From})
                    end, Clients),
      S.
  %% Completes the sync round identified by Ref = {From, _} and returns
  %% the new state.
  %%
  %% If the initiating client lives on this node it is answered directly;
  %% otherwise a sync_complete message is sent to the client's node and
  %% Ref is dropped from sync_requests.
  send_sync_complete({From, _} = Ref, S, E) ->
      Origin = node(From),
      if
          Origin == node() ->
              reply_to_sync_client(Ref, S);
          true ->
              %% Notify the node that initiated the sync
              %% 'broadcasting' to exactly one node.
              gen_leader:broadcast(
                {from_leader, {sync_complete, Ref}}, [Origin], E),
              Pending = lists:keydelete(Ref, 1, S#state.sync_requests),
              S#state{sync_requests = Pending}
      end.
  %% Answers the waiting sync client Ref with true and removes it from
  %% both sync_clients (first occurrence) and sync_requests.
  reply_to_sync_client(Ref, S) ->
      gen_server:reply(Ref, true),
      Clients = lists:delete(Ref, S#state.sync_clients),
      Pending = lists:keydelete(Ref, 1, S#state.sync_requests),
      S#state{sync_clients = Clients,
              sync_requests = Pending}.