gproc_dist.erl 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@erlang-solutions.com>
  17. %%
  18. %% @doc Extended process registry
  19. %% <p>This module implements an extended process registry</p>
  20. %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
  21. %% @end
  22. -module(gproc_dist).
  23. -behaviour(gen_leader).
  24. -export([start_link/0, start_link/1,
  25. reg/1, reg/2, unreg/1,
  26. mreg/2,
  27. set_value/2,
  28. give_away/2,
  29. update_counter/2]).
  30. -export([leader_call/1,
  31. leader_cast/1,
  32. sync/0]).
  33. %%% internal exports
  34. -export([init/1,
  35. handle_cast/3,
  36. handle_call/4,
  37. handle_info/2,
  38. handle_leader_call/4,
  39. handle_leader_cast/3,
  40. handle_DOWN/3,
  41. elected/2, % original version
  42. elected/3,
  43. surrendered/3,
  44. from_leader/3,
  45. code_change/4,
  46. terminate/2]).
  47. -include("gproc.hrl").
  48. -define(SERVER, ?MODULE).
  49. -record(state, {
  50. always_broadcast = false,
  51. is_leader}).
  %% @doc Start the gproc_dist server with the default candidate set:
  %% the local node plus every node currently connected to it, and an
  %% empty option list.
  start_link() ->
      Candidates = [node() | nodes()],
      start_link({Candidates, []}).
  %% @doc Start the gproc_dist server under gen_leader.
  %%
  %% Accepted arguments:
  %%   all            - use the local node plus all currently connected nodes
  %%   Nodes (list)   - use the given node list, with an empty option list
  %%   {Nodes, Opts}  - node list and option list, both handed to
  %%                    gen_leader:start_link/6 unchanged
  %%
  %% The callback module is this module and its init/1 argument is always [].
  start_link(all) ->
      ConnectedNodes = [node() | nodes()],
      start_link({ConnectedNodes, []});
  start_link(NodeList) when is_list(NodeList) ->
      start_link({NodeList, []});
  start_link({CandidateNodes, LeaderOpts}) ->
      gen_leader:start_link(?SERVER, CandidateNodes, LeaderOpts, ?MODULE, [], []).
  61. %% ?SERVER, Nodes, [],?MODULE, [], [{debug,[trace]}]).
  %% @doc Register a global key using the default value for its class,
  %% as returned by gproc:default/1. See gproc:reg/1.
  reg(Key) ->
      DefaultValue = gproc:default(Key),
      reg(Key, DefaultValue).
  %%% @spec reg({Class, Scope, Key}, Value) -> true
  %%% @doc Register a global key on behalf of the calling process.
  %%%
  %%%    Class = n  - unique name
  %%%          | p  - non-unique property
  %%%          | c  - counter
  %%%          | a  - aggregated counter
  %%%    Scope = l | g (local or global)
  %%%
  %%% Only keys with Scope = g are handled here; the request is forwarded
  %%% to the leader together with the caller's pid. Any other argument
  %%% raises error:badarg.
  %%% @end
  reg({_Class, g, _Name} = Key, Value) ->
      Request = {reg, Key, Value, self()},
      leader_call(Request);
  reg(_Key, _Value) ->
      erlang:error(badarg).
  %% @doc Register several global keys of class T in one leader call.
  %% KVL must be a list (the leader passes it to gproc_lib:insert_many/4);
  %% anything else raises error:badarg.
  mreg(T, KVL) when is_list(KVL) ->
      Request = {mreg, T, g, KVL, self()},
      leader_call(Request);
  mreg(_T, _KVL) ->
      erlang:error(badarg).
  %% @doc Unregister a global key on behalf of the calling process.
  %% Keys that are not of the form {_, g, _} raise error:badarg.
  unreg({_Class, g, _Name} = Key) ->
      Request = {unreg, Key, self()},
      leader_call(Request);
  unreg(_Key) ->
      erlang:error(badarg).
  %% @doc Set the value of a global key owned by the calling process.
  %%
  %% For counters (c) and aggregated counters (a) the value must be an
  %% integer; otherwise error:badarg is raised locally without contacting
  %% the leader. Keys that are not global also raise error:badarg.
  %%
  %% The request is always sent as {set, Key, Value, Pid}. That 4-tuple is
  %% the only `set' shape handle_leader_call/4 matches; a 3-tuple without
  %% the pid falls through to the leader's catch-all clause and is answered
  %% with badarg.
  set_value({T,g,_}, Value) when (T =:= a orelse T =:= c),
                                 not is_integer(Value) ->
      erlang:error(badarg);
  set_value({_,g,_} = Key, Value) ->
      leader_call({set, Key, Value, self()});
  set_value(_, _) ->
      erlang:error(badarg).
  %% @doc Ask the leader to transfer the global key Key from the calling
  %% process to To. The leader accepts either a pid or a global name or
  %% aggregated-counter key for To (see pid_to_give_away_to/1).
  give_away({_Class, g, _Name} = Key, To) ->
      Request = {give_away, Key, To, self()},
      leader_call(Request).
  %% @doc Add the integer Incr to a global counter owned by the calling
  %% process. On success the leader replies with the new counter value.
  %% A non-counter key, a non-global key or a non-integer increment
  %% raises error:badarg.
  update_counter({c, g, _Name} = Key, Incr) when is_integer(Incr) ->
      Request = {update_counter, Key, Incr, self()},
      leader_call(Request);
  update_counter(_Key, _Incr) ->
      erlang:error(badarg).
  %% @spec sync() -> true
  %% @doc Synchronize with the gproc leader.
  %%
  %% Performs a round trip to the leader. It can be used to make sure that
  %% data has been replicated from the leader to the current node.
  %% @end
  %%
  sync() ->
      Request = sync,
      leader_call(Request).
  112. %%% ==========================================================
  %% Local casts are not part of this module's protocol: any cast stops
  %% the server with reason unknown_cast, leaving the state untouched.
  handle_cast(_UnexpectedCast, State, _Election) ->
      {stop, unknown_cast, State}.
  %% Local (non-leader) calls are not supported: every request is answered
  %% with badarg and the state is returned unchanged.
  handle_call(_Request, _From, State, _Election) ->
      {reply, badarg, State}.
  %% Handle raw messages.
  %%
  %% A process 'DOWN' notification is not acted on locally; it is forwarded
  %% to the leader as {pid_is_DOWN, Pid} (see handle_leader_cast/3).
  %% Every other message is ignored.
  handle_info({'DOWN', _MonitorRef, process, DeadPid, _Reason}, State) ->
      leader_cast({pid_is_DOWN, DeadPid}),
      {ok, State};
  handle_info(_Other, State) ->
      {ok, State}.
  %% elected/2 (original gen_leader callback version): this node became
  %% leader. Return every global object in the table as the sync payload
  %% and mark the state as leader.
  elected(State, _Election) ->
      SyncPayload = {globals, globs()},
      {ok, SyncPayload, State#state{is_leader = true}}.
  %% elected/3 callback.
  %%
  %% Third argument `undefined': this node has become leader, so do a full
  %% sync by returning all globals with an `ok' result and marking the
  %% state as leader.
  %%
  %% Otherwise another node has recognized us as the leader. When
  %% always_broadcast is false, answer with `reply' so the data is not
  %% broadcast to everyone else. In every other case fall back to `ok';
  %% the main reason for this option is a gen_leader that does not support
  %% the `reply' return value.
  elected(S, _E, undefined) ->
      {ok, {globals, globs()}, S#state{is_leader = true}};
  elected(#state{always_broadcast = false} = S, _E, _Node) ->
      {reply, {globals, globs()}, S};
  elected(S, _E, _Node) ->
      {ok, {globals, globs()}, S}.
  %% Return every object in ?TAB whose key has the form {{_, g, _}, _},
  %% i.e. all three-element objects registered with global scope.
  globs() ->
      AnyGlobalObject = {{{'_', g, '_'}, '_'}, '_', '_'},
      MatchSpec = [{AnyGlobalObject, [], ['$_']}],
      ets:select(?TAB, MatchSpec).
  %% surrendered/3 callback: another node is leader and has sent us its
  %% view of the globals. Reconcile the local table with it (globals owned
  %% by processes on this node are treated as more correct locally than in
  %% the leader's copy - see surrendered_1/1) and record that we are not
  %% the leader.
  surrendered(State, {globals, LeaderGlobals}, _Election) ->
      surrendered_1(LeaderGlobals),
      {ok, State#state{is_leader = false}}.
  %% handle_DOWN/3 callback: a node went down.
  %%
  %% Select every global object whose owner pid lives on Node, as
  %% {Key, Pid} pairs, and remove them through process_globals/1. If that
  %% yields any operations they are returned as a broadcast; otherwise the
  %% state is returned alone.
  handle_DOWN(Node, S, _E) ->
      OwnedGlobal = {{{'_', g, '_'}, '_'}, '$1', '_'},
      OwnerOnNode = [{'==', {node, '$1'}, Node}],
      %% Result is {Key, Pid}: the first element of the object key and the
      %% owner pid.
      KeyAndOwner = [{{{element, 1, {element, 1, '$_'}}, {element, 2, '$_'}}}],
      Orphans = ets:select(?TAB, [{OwnedGlobal, OwnerOnNode, KeyAndOwner}]),
      case process_globals(Orphans) of
          [] ->
              {ok, S};
          Broadcast ->
              {ok, Broadcast, S}
      end.
  159. %% ets:select_delete(?TAB, [{Head, Gs, [true]}]),
  160. %% {ok, [{delete, Globs}], S}.
  %% handle_leader_call/4: leader-side handler for requests sent with
  %% leader_call/1. Each clause updates the leader's ?TAB and, where
  %% something changed, returns a list of {insert, Objs} / {delete, Keys}
  %% operations as a broadcast; from_leader/3 applies those operations on
  %% the receiving side. Requests that cannot be served are answered with
  %% the atom badarg, which leader_call/1 turns into error:badarg in the
  %% client.
  %%
  %% Requests:
  %%   sync                            - round trip, broadcasts `sync'
  %%   {reg, Key, Value, Pid}          - register Key for Pid
  %%   {update_counter, Key, Incr, Pid}- add Incr to a counter
  %%   {unreg, Key, Pid}               - unregister Key
  %%   {give_away, Key, To, Pid}       - transfer a name/aggregated counter
  %%   {mreg, T, g, KVL, Pid}          - register many names/properties
  %%   {set, Key, Value, Pid}          - set the value of Key
  %%   {await, Key, Pid}               - wait for a name to be registered
  handle_leader_call(sync, _From, S, _E) ->
      {reply, true, sync, S};
  handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
      case gproc_lib:insert_reg(K, Value, Pid, g) of
          false ->
              {reply, badarg, S};
          true ->
              gproc_lib:ensure_monitor(Pid, g),
              %% Objects to replicate. A new counter also replicates the
              %% aggregated counter of the same name, if one exists.
              Vals =
                  if C == a ->
                          ets:lookup(?TAB, {K,a});
                     C == c ->
                          [{{K,Pid},Pid,Value} | ets:lookup(?TAB, {{a,g,Name},a})];
                     C == n ->
                          [{{K,n},Pid,Value}];
                     true ->
                          [{{K,Pid},Pid,Value}]
                  end,
              {reply, true, [{insert, Vals}], S}
      end;
  handle_leader_call({update_counter, {c,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
    when is_integer(Incr) ->
      %% Any error (e.g. the counter does not exist for Pid) is reported
      %% to the client as badarg.
      try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
          Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
          {reply, New, [{insert, Vals}], S}
      catch
          error:_ ->
              {reply, badarg, S}
      end;
  handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
      %% Names and aggregated counters are keyed by class, everything else
      %% by owner pid.
      Key = if T == n; T == a -> {K,T};
               true -> {K, Pid}
            end,
      case ets:member(?TAB, Key) of
          true ->
              gproc_lib:remove_reg(K, Pid),
              if T == c ->
                      case ets:lookup(?TAB, {{a,g,Name},a}) of
                          [Aggr] ->
                              %% presumably already adjusted by
                              %% gproc_lib:remove_reg/2; replicate it
                              {reply, true, [{delete, [Key, {Pid,K}]},
                                             {insert, [Aggr]}], S};
                          [] ->
                              {reply, true, [{delete, [Key, {Pid,K}]}], S}
                      end;
                 true ->
                      {reply, true, [{delete, [Key]}], S}
              end;
          false ->
              {reply, badarg, S}
      end;
  handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
    when T == a; T == n ->
      Key = {K, T},
      case ets:lookup(?TAB, Key) of
          [{_, Pid, Value}] ->
              %% Only the current owner may give the key away.
              case pid_to_give_away_to(To) of
                  Pid ->
                      %% Giving it to ourselves: nothing to do.
                      {reply, Pid, S};
                  ToPid when is_pid(ToPid) ->
                      ets:insert(?TAB, [{Key, ToPid, Value},
                                        {{ToPid,K}, r}]),
                      %% Drop the previous owner's reverse mapping on the
                      %% leader too. If it were left behind, the old
                      %% owner's death would make pid_is_DOWN find it and
                      %% delete the entry that now belongs to ToPid.
                      ets:delete(?TAB, {Pid, K}),
                      gproc_lib:ensure_monitor(ToPid, g),
                      {reply, ToPid, [{delete, [Key, {Pid,K}]},
                                      {insert, [{Key, ToPid, Value}]}], S};
                  undefined ->
                      %% No receiver: the key is simply removed.
                      ets:delete(?TAB, Key),
                      ets:delete(?TAB, {Pid, K}),
                      {reply, undefined, [{delete, [Key, {Pid,K}]}], S}
              end;
          _ ->
              {reply, badarg, S}
      end;
  handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
      if T == p; T == n ->
              try gproc_lib:insert_many(T, g, L, Pid) of
                  {true, Objs} -> {reply, true, [{insert, Objs}], S};
                  false -> {reply, badarg, S}
              catch
                  error:_ -> {reply, badarg, S}
              end;
         true ->
              {reply, badarg, S}
      end;
  handle_leader_call({set, {T,g,N} = K, V, Pid}, _From, S, _E) ->
      if T == a, not is_integer(V) ->
              %% Reply badarg instead of falling out of the `if' with no
              %% matching branch, which would crash the leader with
              %% if_clause.
              {reply, badarg, S};
         T == a ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true -> {reply, true, [{insert, [{{K,T},Pid,V}]}], S};
                  false -> {reply, badarg, S}
              end;
         T == c ->
              try gproc_lib:do_set_counter_value(K, V, Pid),
                  AKey = {{a,g,N},a},
                  Aggr = ets:lookup(?TAB, AKey),  % may be []
                  {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
              catch
                  error:_ ->
                      {reply, badarg, S}
              end;
         true ->
              case gproc_lib:do_set_value(K, V, Pid) of
                  true ->
                      Obj = if T == n -> {{K, T}, Pid, V};
                               true -> {{K, Pid}, Pid, V}
                            end,
                      {reply, true, [{insert, [Obj]}], S};
                  false ->
                      {reply, badarg, S}
              end
      end;
  handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
      %% The pid in From is that of the gen_leader instance that forwarded
      %% the call - not of the client. This is why Pid is passed explicitly.
      case gproc_lib:await(Key, Pid, From) of
          {reply, {Ref, {K, P, V}}} ->
              {reply, {Ref, {K, P, V}}, S};
          {reply, Reply, Insert} ->
              {reply, Reply, [{insert, Insert}], S}
      end;
  handle_leader_call(_, _, S, _E) ->
      {reply, badarg, S}.
  %% handle_leader_cast/3: leader-side handler for messages sent with
  %% leader_cast/1.
  %%
  %% {add_globals, Missing}
  %%     Audit message: a peer (non-leader) had info about granted global
  %%     resources that we did not know of when we became leader. This
  %%     could happen due to a race condition when the old leader died.
  %%     The objects are inserted locally and broadcast.
  %% {remove_globals, Globals}
  %%     Delete the listed globals locally; nothing is broadcast.
  %% {pid_is_DOWN, Pid}
  %%     A monitored process died. Collect the global keys recorded in its
  %%     reverse mappings, delete the {Pid, g} entry, remove the
  %%     registrations and broadcast the resulting operations, if any.
  handle_leader_cast({add_globals, Missing}, S, _E) ->
      ets:insert(?TAB, Missing),
      Broadcast = [{insert, Missing}],
      {ok, Broadcast, S};
  handle_leader_cast({remove_globals, Globals}, S, _E) ->
      delete_globals(Globals),
      {ok, S};
  handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
      ReverseEntry = {{Pid, '$1'}, r},
      ScopeIsGlobal = [{'==', {element, 2, '$1'}, g}],
      KeyAndOwner = [{{'$1', Pid}}],
      Owned = ets:select(?TAB, [{ReverseEntry, ScopeIsGlobal, KeyAndOwner}]),
      ets:delete(?TAB, {Pid, g}),
      case process_globals(Owned) of
          [] ->
              {ok, S};
          Broadcast ->
              {ok, Broadcast, S}
      end.
  %% Remove the given global registrations from ?TAB and return the
  %% operations needed to replicate the removal.
  %%
  %% Globals is a list of {Key, Pid} pairs. For each pair the object stored
  %% under ets_key(Key, Pid) and the reverse mapping {Pid, Key} are deleted.
  %% When Key is a counter, its current value is first subtracted from the
  %% aggregated counter of the same name (if any), and the updated
  %% aggregated object is collected.
  %%
  %% Returns a list containing {insert, ModifiedAggregates} and/or
  %% {delete, Globals}; operations with an empty object list are left out,
  %% so the result is [] when Globals is [].
  %%
  %% Several counters feeding the same aggregated counter may be removed in
  %% one pass. Only the most recent aggregated object per key is kept: ETS
  %% does not define which object wins when a list insert holds several
  %% objects with the same key (this assumes ?TAB is a set or ordered_set;
  %% its type is defined outside this module), so shipping the intermediate
  %% values could leave a stale aggregate on the receiving side.
  process_globals(Globals) ->
      Modified =
          lists:foldl(
            fun({{T,_,_} = Key, Pid}, A) ->
                    A1 = case T of
                             c ->
                                 %% raises badarg if the counter object is
                                 %% missing, exactly as before
                                 Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
                                 case update_aggr_counter(Key, -Incr) of
                                     [] ->
                                         A;
                                     [{AggrKey, _, _} = Aggr] ->
                                         %% replace any earlier value for
                                         %% this aggregate
                                         lists:keystore(AggrKey, 1, A, Aggr)
                                 end;
                             _ ->
                                 A
                         end,
                    K = ets_key(Key, Pid),
                    ets:delete(?TAB, K),
                    ets:delete(?TAB, {Pid,Key}),
                    A1
            end, [], Globals),
      [{Op,Objs} || {Op,Objs} <- [{insert,Modified},
                                  {delete,Globals}], Objs =/= []].
  %% code_change/4 callback: no state conversion is performed; the state
  %% is returned as it is.
  code_change(_OldVsn, State, _Extra, _Election) ->
      {ok, State}.
  %% terminate/2 callback: nothing is cleaned up here; always returns ok.
  terminate(_Why, _State) ->
      ok.
  %% from_leader/3 callback: apply what the leader broadcast.
  %%
  %% `sync' carries no data and is acknowledged as is. Otherwise the
  %% message is a list of operations, applied in order:
  %%   {delete, Keys}  - remove the entries via delete_globals/1
  %%   {insert, Objs}  - insert the objects, then for each of them:
  %%       * a global registration {{Key, _}, Pid, _} also gets a reverse
  %%         mapping {{Pid, Key}, r} and a monitor on Pid;
  %%       * a reverse mapping {{Pid, _}, r} only gets the monitor;
  %%       * anything else is skipped.
  %% Any other operation makes the function fail with function_clause.
  from_leader(sync, S, _E) ->
      {ok, S};
  from_leader(Ops, S, _E) ->
      Track =
          fun({{{_, g, _} = Key, _}, Owner, _}) ->
                  ets:insert(?TAB, {{Owner, Key}, r}),
                  gproc_lib:ensure_monitor(Owner, g);
             ({{Owner, _Key}, r}) ->
                  gproc_lib:ensure_monitor(Owner, g);
             (_) ->
                  skip
          end,
      Apply =
          fun({delete, Keys}) ->
                  delete_globals(Keys);
             ({insert, Objs}) ->
                  ets:insert(?TAB, Objs),
                  lists:foreach(Track, Objs)
          end,
      lists:foreach(Apply, Ops),
      {ok, S}.
  %% Delete a list of global entries from ?TAB. Three shapes are accepted,
  %% tried in this order:
  %%   {{_, g, _}, Atom}   - an object key ending in an atom tag; deleted
  %%                         as is
  %%   {Key, Pid}          - a registration; both the object stored under
  %%                         ets_key(Key, Pid) and the reverse mapping
  %%                         {Pid, Key} are deleted
  %%   {Pid, Key}          - a reverse mapping; deleted as is
  %% Any other element fails with function_clause.
  delete_globals(Globals) ->
      DeleteOne =
          fun({{_, g, _}, Tag} = ObjKey) when is_atom(Tag) ->
                  ets:delete(?TAB, ObjKey);
             ({RegKey, Owner}) when is_pid(Owner) ->
                  ObjKey = ets_key(RegKey, Owner),
                  ets:delete(?TAB, ObjKey),
                  ets:delete(?TAB, {Owner, RegKey});
             ({Owner, _RegKey} = ReverseKey) when is_pid(Owner) ->
                  ets:delete(?TAB, ReverseKey)
          end,
      lists:foreach(DeleteOne, Globals).
  %% Build the table key under which a registration is stored.
  %% Names (n) and aggregated counters (a) are keyed by their class atom,
  %% {Key, n} or {Key, a}; every other key is keyed by the owner pid,
  %% {Key, Pid}.
  ets_key({n, _, _} = Key, _Pid) ->
      {Key, n};
  ets_key({a, _, _} = Key, _Pid) ->
      {Key, a};
  ets_key(Key, Pid) ->
      {Key, Pid}.
  %% @doc Send Req to the current leader and wait for the reply.
  %%
  %% A reply of the atom badarg is the leader's way of rejecting the
  %% request; it is re-raised in the caller as error:badarg. Any other
  %% reply is returned as is.
  %%
  %% The second argument of erlang:error/2 must be an argument list, so the
  %% rejected request is wrapped in a list rather than passed bare (it is
  %% usually a tuple, or the atom sync).
  leader_call(Req) ->
      case gen_leader:leader_call(?MODULE, Req) of
          badarg -> erlang:error(badarg, [Req]);
          Reply -> Reply
      end.
  %% @doc Send Msg to the current leader through gen_leader:leader_cast/2.
  %% The message is handled by handle_leader_cast/3.
  leader_cast(Message) ->
      gen_leader:leader_cast(?MODULE, Message).
  %% init/1 callback: build the initial state.
  %% Opts is treated as a property list; always_broadcast is read from it,
  %% falling back to the default declared in the #state record.
  init(Opts) ->
      #state{always_broadcast = Default} = #state{},
      AlwaysBcast = proplists:get_value(always_broadcast, Opts, Default),
      {ok, #state{always_broadcast = AlwaysBcast}}.
  %% Reconcile the local table with the leader's view of the globals
  %% (Globs) after surrendering to a leader.
  %%
  %% Steps:
  %%   1. Remember the globals owned by processes on this node.
  %%   2. Delete all globals owned by processes on other nodes.
  %%   3. Insert the leader's non-local globals, each with its reverse
  %%      mapping, and collect the leader's version of this node's globals.
  %%   4. Tell the leader about local globals it does not know
  %%      (add_globals) and about globals it attributes to this node that
  %%      are not in the local table (remove_globals).
  %%
  %% The reverse mapping is stored as {{Pid, Key}, r}, the same shape that
  %% give_away and from_leader/3 write and that the pid_is_DOWN handler
  %% selects on. A bare {{Pid, Key}} would never be matched by that select.
  surrendered_1(Globs) ->
      My_local_globs =
          ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1','_'},
                             [{'==', {node,'$1'}, node()}],
                             ['$_']}]),
      %% remove all remote globals - we don't have monitors on them.
      ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'},'$1','_'},
                                [{'=/=', {node,'$1'}, node()}],
                                [true]}]),
      %% insert new non-local globals, collect the leader's version of
      %% what my globals are
      Ldr_local_globs =
          lists:foldl(
            fun({{Key,_} = K, Pid, V}, Acc) when node(Pid) =/= node() ->
                    ets:insert(?TAB, [{K, Pid, V}, {{Pid,Key}, r}]),
                    Acc;
               ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
                    [Obj|Acc]
            end, [], Globs),
      case [{K,P,V} || {K,P,V} <- My_local_globs,
                       not(lists:keymember(K, 1, Ldr_local_globs))] of
          [] ->
              %% phew! We have the same picture
              ok;
          [_|_] = Missing ->
              %% This is very unlikely, I think
              leader_cast({add_globals, Missing})
      end,
      case [{K,P} || {K,P,_} <- Ldr_local_globs,
                     not(lists:keymember(K, 1, My_local_globs))] of
          [] ->
              ok;
          [_|_] = Remove ->
              leader_cast({remove_globals, Remove})
      end.
  %% Add Incr to the global aggregated counter that shares its name with
  %% the counter {c, g, Ctr}.
  %% Returns [UpdatedObject] when such an aggregated counter exists (the
  %% object is also written back to ?TAB), or [] when there is none.
  update_aggr_counter({c, g, Ctr}, Incr) ->
      case ets:lookup(?TAB, {{a, g, Ctr}, a}) of
          [{AggrKey, Owner, Current}] ->
              Updated = {AggrKey, Owner, Current + Incr},
              ets:insert(?TAB, Updated),
              [Updated];
          [] ->
              []
      end.
  %% Resolve the receiver of a give_away request.
  %% A pid is returned unchanged. A global name or aggregated-counter key
  %% is looked up in ?TAB and resolves to the pid stored with it, or to
  %% `undefined' when the lookup does not return exactly one object.
  pid_to_give_away_to(Receiver) when is_pid(Receiver) ->
      Receiver;
  pid_to_give_away_to({T, g, _} = Key) when T == n; T == a ->
      Found = ets:lookup(?TAB, {Key, T}),
      case Found of
          [{_, Owner, _}] ->
              Owner;
          _ ->
              undefined
      end.
  430. %% -ifdef(TEST).
  431. %% dist_test_() ->
  432. %% {timeout, 60,
  433. %% [{foreach,
  434. %% fun() ->
  435. %% Ns = start_slaves([n1, n2]),
  436. %% %% dbg:tracer(),
  437. %% %% [dbg:n(N) || N <- Ns],
  438. %% %% dbg:tpl(gproc_dist, x),
  439. %% %% dbg:p(all,[c]),
  440. %% Ns
  441. %% end,
  442. %% fun(Ns) ->
  443. %% [rpc:call(N, init, stop, []) || N <- Ns]
  444. %% end,
  445. %% [
  446. %% {with, [fun(Ns) -> {in_parallel, [fun(X) -> t_simple_reg(X) end,
  447. %% fun(X) -> t_await_reg(X) end,
  448. %% fun(X) -> t_give_away(X) end]
  449. %% }
  450. %% end]}
  451. %% ]}
  452. %% ]}.
  453. %% -define(T_NAME, {n, g, {?MODULE, ?LINE}}).
  454. %% t_simple_reg([H|_] = Ns) ->
  455. %% ?debugMsg(t_simple_reg),
  456. %% Name = ?T_NAME,
  457. %% P = t_spawn_reg(H, Name),
  458. %% ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P)),
  459. %% ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})),
  460. %% ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
  461. %% ?assertMatch(ok, t_call(P, die)).
  462. %% t_await_reg([A,B|_]) ->
  463. %% ?debugMsg(t_await_reg),
  464. %% Name = ?T_NAME,
  465. %% P = t_spawn(A),
  466. %% P ! {self(), {apply, gproc, await, [Name]}},
  467. %% P1 = t_spawn_reg(B, Name),
  468. %% ?assert(P1 == receive
  469. %% {P, Res} ->
  470. %% element(1, Res)
  471. %% end),
  472. %% ?assertMatch(ok, t_call(P, die)),
  473. %% ?assertMatch(ok, t_call(P1, die)).
  474. %% t_give_away([A,B|_] = Ns) ->
  475. %% ?debugMsg(t_give_away),
  476. %% Na = ?T_NAME,
  477. %% Nb = ?T_NAME,
  478. %% Pa = t_spawn_reg(A, Na),
  479. %% Pb = t_spawn_reg(B, Nb),
  480. %% ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
  481. %% ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
  482. %% %% ?debugHere,
  483. %% ?assertMatch(Pb, t_call(Pa, {apply, {gproc, give_away, [Na, Nb]}})),
  484. %% ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
  485. %% %% ?debugHere,
  486. %% ?assertMatch(Pa, t_call(Pa, {apply, {gproc, give_away, [Na, Pa]}})),
  487. %% ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
  488. %% %% ?debugHere,
  489. %% ?assertMatch(ok, t_call(Pa, die)),
  490. %% ?assertMatch(ok, t_call(Pb, die)).
  491. %% t_sleep() ->
  492. %% timer:sleep(1000).
  493. %% t_lookup_everywhere(Key, Nodes, Exp) ->
  494. %% t_lookup_everywhere(Key, Nodes, Exp, 3).
  495. %% t_lookup_everywhere(Key, _, Exp, 0) ->
  496. %% {lookup_failed, Key, Exp};
  497. %% t_lookup_everywhere(Key, Nodes, Exp, I) ->
  498. %% Expected = [{N, Exp} || N <- Nodes],
  499. %% Found = [{N,rpc:call(N, gproc, where, [Key])} || N <- Nodes],
  500. %% if Expected =/= Found ->
  501. %% ?debugFmt("lookup ~p failed (~p), retrying...~n", [Key, Found]),
  502. %% t_sleep(),
  503. %% t_lookup_everywhere(Key, Nodes, Exp, I-1);
  504. %% true ->
  505. %% ok
  506. %% end.
  507. %% t_spawn(Node) ->
  508. %% Me = self(),
  509. %% P = spawn(Node, fun() ->
  510. %% Me ! {self(), ok},
  511. %% t_loop()
  512. %% end),
  513. %% receive
  514. %% {P, ok} -> P
  515. %% end.
  516. %% t_spawn_reg(Node, Name) ->
  517. %% Me = self(),
  518. %% spawn(Node, fun() ->
  519. %% ?assertMatch(true, gproc:reg(Name)),
  520. %% Me ! {self(), ok},
  521. %% t_loop()
  522. %% end),
  523. %% receive
  524. %% {P, ok} -> P
  525. %% end.
  526. %% t_call(P, Req) ->
  527. %% P ! {self(), Req},
  528. %% receive
  529. %% {P, Res} ->
  530. %% Res
  531. %% end.
  532. %% t_loop() ->
  533. %% receive
  534. %% {From, die} ->
  535. %% From ! {self(), ok};
  536. %% {From, {apply, M, F, A}} ->
  537. %% From ! {self(), apply(M, F, A)},
  538. %% t_loop()
  539. %% end.
  540. %% start_slaves(Ns) ->
  541. %% [H|T] = Nodes = [start_slave(N) || N <- Ns],
  542. %% %% ?debugVal([pong = rpc:call(H, net, ping, [N]) || N <- T]),
  543. %% %% ?debugVal(rpc:multicall(Nodes, application, start, [gproc])),
  544. %% Nodes.
  545. %% start_slave(Name) ->
  546. %% case node() of
  547. %% nonode@nohost ->
  548. %% os:cmd("epmd -daemon"),
  549. %% {ok, _} = net_kernel:start([gproc_master, shortnames]);
  550. %% _ ->
  551. %% ok
  552. %% end,
  553. %% {ok, Node} = slave:start(
  554. %% host(), Name,
  555. %% "-pa . -pz ../ebin -pa ../deps/gen_leader/ebin "
  556. %% "-gproc gproc_dist all"),
  557. %% %% io:fwrite(user, "Slave node: ~p~n", [Node]),
  558. %% Node.
  559. %% host() ->
  560. %% [Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]),
  561. %% list_to_atom(Host).
  562. %% -endif.