gproc_dist_tests.erl 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. %% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*-
  2. %% ``The contents of this file are subject to the Erlang Public License,
  3. %% Version 1.1, (the "License"); you may not use this file except in
  4. %% compliance with the License. You should have received a copy of the
  5. %% Erlang Public License along with this software. If not, it can be
  6. %% retrieved via the world wide web at http://www.erlang.org/.
  7. %%
  8. %% Software distributed under the License is distributed on an "AS IS"
  9. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  10. %% the License for the specific language governing rights and limitations
  11. %% under the License.
  12. %%
  13. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  14. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  15. %% AB. All Rights Reserved.''
  16. %%
  17. %% @author Ulf Wiger <ulf.wiger@erlang-solutions.com>
  18. %%
  19. -module(gproc_dist_tests).
  20. -ifdef(TEST).
  21. -include_lib("eunit/include/eunit.hrl").
  22. -export([t_spawn/1, t_spawn_reg/2]).
  %% EUnit test generator for the distributed gproc tests.
  %%
  %% Setup: when run_dist_tests() returns true, two slave nodes are started,
  %% the gproc application variable gproc_dist is set to the node list on
  %% each of them, and the gproc application is started there. The node list
  %% is handed to the cleanup and instantiator funs. When run_dist_tests()
  %% returns false, setup returns the atom skip and no tests are generated.
  %%
  %% Cleanup: stops the slave nodes that setup started. Without this the
  %% nodes outlive the test run, and a second run from the same VM fails in
  %% start_slave/1, whose {ok, Node} match rejects the
  %% {error, {already_running, Node}} reply of slave:start/3.
  %%
  %% The tests run in order. t_sync_cand_dies and t_fail_node come last
  %% because they kill gproc_dist or stop gproc on one of the nodes.
  dist_test_() ->
      {timeout, 120,
       [{setup,
         fun() ->
                 case run_dist_tests() of
                     true ->
                         Ns = start_slaves([dist_test_n1, dist_test_n2]),
                         ?assertMatch({[ok,ok],[]},
                                      rpc:multicall(Ns, application, set_env,
                                                    [gproc, gproc_dist, Ns])),
                         ?assertMatch({[ok,ok],[]},
                                      rpc:multicall(
                                        Ns, application, start, [gproc])),
                         Ns;
                     false ->
                         skip
                 end
         end,
         fun(skip) ->
                 %% Nothing was started, so there is nothing to tear down.
                 ok;
            (Ns) when is_list(Ns) ->
                 _ = [slave:stop(N) || N <- Ns],
                 ok
         end,
         fun(skip) -> [];
            (Ns) when is_list(Ns) ->
                 {inorder,
                  [
                   {inorder, [
                              fun() ->
                                      ?debugVal(t_simple_reg(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_simple_reg_or_locate(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_simple_counter(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_aggr_counter(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_update_counters(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_shared_counter(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_mreg(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_await_reg(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_await_self(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_await_reg_exists(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_give_away(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_sync(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_monitor(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_standby_monitor(Ns))
                              end,
                              fun() ->
                                      ?debugVal(t_subscribe(Ns))
                              end
                             ]
                   },
                   fun() ->
                           ?debugVal(t_sync_cand_dies(Ns))
                   end,
                   {timeout, 90, [fun() ->
                                          ?debugVal(t_fail_node(Ns))
                                  end]}
                  ]}
         end
        }]}.
  %% Decides whether the distributed tests should run.
  %%
  %% The OS environment variable GPROC_DIST wins when it is set to "true" or
  %% "false". When it is unset, the tests run unless loading the gen_leader
  %% module fails with {error, nofile}. Any other value of GPROC_DIST is not
  %% handled and raises a case_clause error.
  run_dist_tests() ->
      EnvSetting = os:getenv("GPROC_DIST"),
      case EnvSetting of
          "true" ->
              true;
          "false" ->
              false;
          false ->
              code:ensure_loaded(gen_leader) =/= {error, nofile}
      end.
  %% Key of the form {n, g, Id}. Id embeds the module, the line where the
  %% macro is expanded and an erlang:now() timestamp, so every evaluation
  %% yields a different term.
  %% NOTE(review): erlang:now/0 is deprecated in newer OTP releases; confirm
  %% which releases this module must build on before replacing it.
  -define(T_NAME, {n, g, {?MODULE, ?LINE, erlang:now()}}).
  %% Key/value list used by t_mreg/1.
  -define(T_KVL, [{foo, "foo"}, {bar, "bar"}]).
  %% Key of the form {c, g, Id}. Id embeds only the module and the line of
  %% expansion, so two expansions on the same source line give the same key.
  -define(T_COUNTER, {c, g, {?MODULE, ?LINE}}).
  %% Spawns a process on the first node that registers a fresh name, checks
  %% that gproc:where/1 resolves the name to that process on every node,
  %% then has the process unregister the name and checks that every node
  %% reports undefined. The helper process is told to die at the end.
  t_simple_reg([First|_] = Nodes) ->
      Key = ?T_NAME,
      Pid = t_spawn_reg(First, Key),
      ?assertMatch(ok, t_lookup_everywhere(Key, Nodes, Pid)),
      Unreg = {apply, gproc, unreg, [Key]},
      ?assertMatch(true, t_call(Pid, Unreg)),
      ?assertMatch(ok, t_lookup_everywhere(Key, Nodes, undefined)),
      ?assertMatch(ok, t_call(Pid, die)).
  %% Calls gproc:reg_or_locate/2 for the same name from one process on each
  %% of the first two nodes. Both calls are expected to return the first
  %% process together with the value it supplied. Both helpers are then
  %% told to die and their 'DOWN' messages are consumed.
  t_simple_reg_or_locate([NodeA,NodeB|_] = _Ns) ->
      Key = ?T_NAME,
      First = t_spawn(NodeA),
      FirstMon = erlang:monitor(process, First),
      FirstReq = {apply, gproc, reg_or_locate, [Key, the_value]},
      ?assertMatch({First, the_value}, t_call(First, FirstReq)),
      Second = t_spawn(NodeB),
      SecondMon = erlang:monitor(process, Second),
      SecondReq = {apply, gproc, reg_or_locate, [Key, other_value]},
      ?assertMatch({First, the_value}, t_call(Second, SecondReq)),
      ?assertMatch(ok, t_call(First, die)),
      ?assertMatch(ok, t_call(Second, die)),
      lists:foreach(fun flush_down/1, [FirstMon, SecondMon]).
  %% Consumes the 'DOWN' message belonging to monitor reference Ref.
  %% Returns ok, or raises {timeout, [flush_down, Ref]} if no such message
  %% arrives within one second.
  flush_down(Ref) ->
      WaitMs = 1000,
      receive
          {'DOWN', Ref, _Type, _Object, _Info} -> ok
      after WaitMs ->
              erlang:error({timeout, [flush_down, Ref]})
      end.
  %% Registers a counter with initial value 3 from a process on the first
  %% node, checks that every node reads 3, increments it by 2 through
  %% gproc:update_counter/2 (expected result 5) and checks that every node
  %% then reads 5.
  t_simple_counter([First|_] = Nodes) ->
      Counter = ?T_COUNTER,
      Owner = t_spawn_reg(First, Counter, 3),
      ReadAll = fun(Exp) -> t_read_everywhere(Counter, Owner, Nodes, Exp) end,
      ?assertMatch(ok, ReadAll(3)),
      Incr = {apply, gproc, update_counter, [Counter, 2]},
      ?assertMatch(5, t_call(Owner, Incr)),
      ?assertMatch(ok, ReadAll(5)),
      ?assertMatch(ok, t_call(Owner, die)).
  %% Registers a shared counter with value 3 from a process on the first
  %% node and increments it by 2. The value 5 is expected to remain readable
  %% on every node after the registering process has died (checked twice).
  %% A second process then calls gproc:unreg_shared/1, after which reads are
  %% expected to yield badarg on every node.
  t_shared_counter([First|_] = Nodes) ->
      Counter = ?T_COUNTER,
      Creator = t_spawn_reg_shared(First, Counter, 3),
      ReadShared =
          fun(Exp) -> t_read_everywhere(Counter, shared, Nodes, Exp) end,
      ?assertMatch(ok, ReadShared(3)),
      Incr = {apply, gproc, update_shared_counter, [Counter, 2]},
      ?assertMatch(5, t_call(Creator, Incr)),
      ?assertMatch(ok, ReadShared(5)),
      ?assertMatch(ok, t_call(Creator, die)),
      ?assertMatch(ok, ReadShared(5)),
      ?assertMatch(ok, ReadShared(5)), % twice
      Remover = t_spawn(First),
      ?assertMatch(true, t_call(Remover, {apply, gproc, unreg_shared, [Counter]})),
      ?assertMatch(ok, ReadShared(badarg)).
  %% Registers the same counter key from one process on each of the first
  %% two nodes, plus a key {a, g, Id} with the same Id on the second node,
  %% and checks the value read for the {a, g, Id} key on every node as
  %% counters are added, updated and removed: 3, then 6, then 8, then 3.
  t_aggr_counter([NodeA,NodeB|_] = Nodes) ->
      {c,g,Id} = Counter = ?T_COUNTER,
      Aggr = {a,g,Id},
      CtrA = t_spawn_reg(NodeA, Counter, 3),
      AggrPid = t_spawn_reg(NodeB, Aggr),
      ReadAggr = fun(Exp) -> t_read_everywhere(Aggr, AggrPid, Nodes, Exp) end,
      ?assertMatch(ok, t_read_everywhere(Counter, CtrA, Nodes, 3)),
      ?assertMatch(ok, ReadAggr(3)),
      CtrB = t_spawn_reg(NodeB, Counter, 3),
      ?assertMatch(ok, t_read_everywhere(Counter, CtrB, Nodes, 3)),
      ?assertMatch(ok, ReadAggr(6)),
      ?assertMatch(5, t_call(CtrA, {apply, gproc, update_counter, [Counter, 2]})),
      ?assertMatch(ok, t_read_everywhere(Counter, CtrA, Nodes, 5)),
      ?assertMatch(ok, ReadAggr(8)),
      ?assertMatch(ok, t_call(CtrA, die)),
      ?assertMatch(ok, ReadAggr(3)),
      ?assertMatch(ok, t_call(CtrB, die)),
      ?assertMatch(ok, t_call(AggrPid, die)).
  %% Exercises gproc:update_counters/2 in scope g: three counters owned by
  %% three processes are updated in a single call, and the new values, as
  %% well as the value of the {a, g, Id} key that shares its Id with the
  %% first counter key, are then read back on every node.
  t_update_counters([NodeA,NodeB|_] = Nodes) ->
      %% The two ?T_COUNTER expansions must stay on different lines: the
      %% macro embeds ?LINE, which is all that tells the two keys apart.
      {c,g,Id} = Ctr1 = ?T_COUNTER,
      Aggr = {a,g,Id},
      Ctr2 = ?T_COUNTER,
      Own1 = t_spawn_reg(NodeA, Ctr1, 2),
      Own1b = t_spawn_reg(NodeB, Ctr1, 2),
      Own2 = t_spawn_reg(NodeB, Ctr2, 1),
      AggrPid = t_spawn_reg(NodeB, Aggr),
      Read = fun(Key, Pid, Exp) -> t_read_everywhere(Key, Pid, Nodes, Exp) end,
      ?assertMatch(ok, Read(Ctr1, Own1, 2)),
      ?assertMatch(ok, Read(Ctr1, Own1b, 2)),
      ?assertMatch(ok, Read(Ctr2, Own2, 1)),
      ?assertMatch(ok, Read(Aggr, AggrPid, 4)),
      ?debugFmt("code:which(gproc_dist) = ~p~n", [code:which(gproc_dist)]),
      Updates = [{Ctr1,Own1,1},{Ctr1,Own1b,2},{Ctr2,Own2,{-2,0,0}}],
      ?assertMatch([{Ctr1,Own1, 3},
                    {Ctr1,Own1b,4},
                    {Ctr2,Own2, 0}],
                   t_call(Own1, {apply, gproc, update_counters, [g, Updates]})),
      ?assertMatch(ok, Read(Ctr1, Own1, 3)),
      ?assertMatch(ok, Read(Ctr1, Own1b, 4)),
      ?assertMatch(ok, Read(Ctr2, Own2, 0)),
      ?assertMatch(ok, Read(Aggr, AggrPid, 7)),
      ?assertMatch(ok, t_call(Own1, die)),
      ?assertMatch(ok, t_call(Own1b, die)),
      ?assertMatch(ok, t_call(Own2, die)).
  %% Registers all keys of ?T_KVL at once through gproc:mreg/3 from a
  %% process on the first node, checks that each {n, g, Key} resolves to
  %% that process on every node, unregisters them with gproc:munreg/3 and
  %% checks that each key then resolves to undefined.
  t_mreg([First|_] = Nodes) ->
      Pairs = ?T_KVL,
      {Names, _Values} = lists:unzip(Pairs),
      Pid = t_spawn_mreg(First, Pairs),
      lists:foreach(
        fun(Nm) ->
                ?assertMatch(ok, t_lookup_everywhere({n,g,Nm}, Nodes, Pid))
        end, Names),
      ?assertMatch(true, t_call(Pid, {apply, gproc, munreg, [n, g, Names]})),
      lists:foreach(
        fun(Nm) ->
                ?assertMatch(ok, t_lookup_everywhere({n,g,Nm}, Nodes, undefined))
        end, Names),
      ?assertMatch(ok, t_call(Pid, die)).
  %% Has a process on the first node call gproc:await/1 for a name that is
  %% not registered yet, waits t_sleep(), then registers the name from a
  %% process on the second node. The first element of the await result is
  %% expected to equal the registering pid.
  t_await_reg([NodeA,NodeB|_]) ->
      Key = ?T_NAME,
      Waiter = t_spawn(NodeA),
      Mon = erlang:monitor(process, Waiter),
      %% Sent directly rather than through t_call/2: the reply only arrives
      %% once the name has been registered further down.
      Waiter ! {self(), Mon, {apply, gproc, await, [Key]}},
      t_sleep(),
      Owner = t_spawn_reg(NodeB, Key),
      Awaited = receive
                    {Waiter, Mon, Res} ->
                        element(1, Res);
                    {'DOWN', Mon, _, _, Reason} ->
                        erlang:error(Reason);
                    Other ->
                        erlang:error({received,Other})
                end,
      ?assert(Owner == Awaited),
      ?assertMatch(ok, t_call(Waiter, die)),
      flush_down(Mon),
      ?assertMatch(ok, t_call(Owner, die)).
  %% A single process calls gproc:nb_wait/1 on a name and then registers
  %% that name itself. It is expected to find a {gproc, Ref, registered,
  %% {Name, Pid, some_value}} message in its own mailbox, where Ref is the
  %% value returned by nb_wait.
  t_await_self([NodeA|_]) ->
      Key = ?T_NAME,
      Pid = t_spawn(NodeA, false),  % don't buffer unknowns
      WaitRef = t_call(Pid, {apply, gproc, nb_wait, [Key]}),
      %% Switch to selective mode so the notification stays in the mailbox
      %% until the fun below picks it up.
      ?assertMatch(ok, t_call(Pid, {selective, true})),
      ?assertMatch(true, t_call(Pid, {apply, gproc, reg, [Key, some_value]})),
      Fetch = fun() ->
                      receive
                          {gproc, WaitRef, Event, What} ->
                              {Event, What}
                      after 10000 ->
                              timeout
                      end
              end,
      ?assertMatch({registered, {Key, Pid, some_value}},
                   t_call(Pid, {apply_fun, Fetch})),
      ?assertMatch(ok, t_call(Pid, {selective, false})),
      ?assertMatch(true, t_call(Pid, {apply, gproc, unreg, [Key]})).
  %% Like t_await_reg/1, but the name is registered (from a process on the
  %% second node) before the process on the first node calls gproc:await/1.
  %% The first element of the await result is expected to equal the
  %% registering pid.
  t_await_reg_exists([A,B|_]) ->
      Name = ?T_NAME,
      P = t_spawn(A),
      Ref = erlang:monitor(process, P),
      P1 = t_spawn_reg(B, Name),
      P ! {self(), Ref, {apply, gproc, await, [Name]}},
      ?assert(P1 == receive
                        {P, Ref, Res} ->
                            element(1, Res);
                        {'DOWN', Ref, _, _, Reason} ->
                            erlang:error(Reason);
                        Other ->
                            erlang:error({received,Other})
                    end),
      ?assertMatch(ok, t_call(P, die)),
      %% Consume the 'DOWN' message of the monitor set up above, as
      %% t_await_reg/1 does. Left in the mailbox it would be picked up by
      %% the catch-all clause of a later receive in the same process.
      flush_down(Ref),
      ?assertMatch(ok, t_call(P1, die)).
  %% Registers one name from a process on each of the first two nodes. The
  %% first name is given away twice with gproc:give_away/2: first with the
  %% second name as target, then back with the first pid as target. After
  %% each step every node is checked for the expected holder of the first
  %% name.
  t_give_away([NodeA,NodeB|_] = Nodes) ->
      KeyA = ?T_NAME,
      KeyB = ?T_NAME,
      PidA = t_spawn_reg(NodeA, KeyA),
      PidB = t_spawn_reg(NodeB, KeyB),
      HolderOfA = fun(Pid) -> t_lookup_everywhere(KeyA, Nodes, Pid) end,
      ?assertMatch(ok, HolderOfA(PidA)),
      ?assertMatch(ok, t_lookup_everywhere(KeyB, Nodes, PidB)),
      ?assertMatch(PidB, t_call(PidA, {apply, gproc, give_away, [KeyA, KeyB]})),
      ?assertMatch(ok, HolderOfA(PidB)),
      ?assertMatch(PidA, t_call(PidB, {apply, gproc, give_away, [KeyA, PidA]})),
      ?assertMatch(ok, HolderOfA(PidA)),
      ?assertMatch(ok, t_call(PidA, die)),
      ?assertMatch(ok, t_call(PidB, die)).
  %% Calls gproc_dist:sync/0 on every node and expects true from each.
  %% Returns the list of assertion results, one per node.
  t_sync(Ns) ->
      %% Don't really know how to test this...
      SyncOn = fun(Node) ->
                       ?assertMatch(true, rpc:call(Node, gproc_dist, sync, []))
               end,
      lists:map(SyncOn, Ns).
  %% A selective process on the second node calls gproc:monitor/1 on a name
  %% registered on the first node. It is expected to receive
  %% {gproc, unreg, Ref, Name} both when the registered process dies and,
  %% after a new registration and a new monitor, when the name is
  %% unregistered explicitly.
  t_monitor([NodeA,NodeB|_]) ->
      Key = ?T_NAME,
      Owner = t_spawn_reg(NodeA, Key),
      Watcher = t_spawn(NodeB, true),
      Watch = {apply, gproc, monitor, [Key]},
      Mon1 = t_call(Watcher, Watch),
      ?assert(is_reference(Mon1)),
      ?assertMatch(ok, t_call(Owner, die)),
      ?assertMatch({gproc,unreg,Mon1,Key}, got_msg(Watcher, gproc)),
      NewOwner = t_spawn_reg(NodeA, Key),
      Mon2 = t_call(Watcher, Watch),
      ?assertMatch(true, t_call(NewOwner, {apply, gproc, unreg, [Key]})),
      ?assertMatch({gproc,unreg,Mon2,Key}, got_msg(Watcher, gproc)).
  %% A selective process on the second node calls gproc:monitor/2 with the
  %% standby option on a name registered on the first node. When the
  %% registered process dies, the monitoring process is expected to receive
  %% {gproc, {failover, Self}, Ref, Name} and to be looked up as the holder
  %% of the name on every node. A further standby monitor is then expected
  %% to see a plain unreg message once the name is unregistered.
  t_standby_monitor([NodeA,NodeB|_] = Nodes) ->
      Key = ?T_NAME,
      Owner = t_spawn_reg(NodeA, Key),
      Standby = t_spawn(NodeB, true),
      Watch = {apply, gproc, monitor, [Key, standby]},
      Mon1 = t_call(Standby, Watch),
      ?assert(is_reference(Mon1)),
      ?assertMatch(ok, t_call(Owner, die)),
      ?assertMatch({gproc,{failover,Standby},Mon1,Key}, got_msg(Standby, gproc)),
      ?assertMatch(ok, t_lookup_everywhere(Key, Nodes, Standby)),
      Observer = t_spawn(NodeA, true),
      Mon2 = t_call(Observer, Watch),
      ?assertMatch(true, t_call(Standby, {apply, gproc, unreg, [Key]})),
      ?assertMatch({gproc,unreg,Mon2,Key}, got_msg(Observer, gproc)),
      ?assertMatch(ok, t_lookup_everywhere(Key, Nodes, undefined)).
  %% A selective process on the second node subscribes to a name through
  %% gproc_monitor:subscribe/1. It is expected to receive, in this order,
  %% {gproc_monitor, Name, X} messages with X = undefined (nothing
  %% registered yet), the registering pid, {migrated, NewPid} after
  %% gproc:give_away/2, and undefined again after the new holder has died.
  t_subscribe([NodeA,NodeB|_] = Nodes) ->
      Key = ?T_NAME,
      Watcher = t_spawn(NodeB, true),
      NextEvent = fun() -> got_msg(Watcher, gproc_monitor) end,
      ?assertEqual(ok, t_call(Watcher, {apply, gproc_monitor, subscribe, [Key]})),
      ?assertMatch({gproc_monitor, Key, undefined}, NextEvent()),
      Owner = t_spawn_reg(NodeA, Key),
      ?assertMatch({gproc_monitor, Key, Owner}, NextEvent()),
      Heir = t_spawn(NodeA),
      t_call(Owner, {apply, gproc, give_away, [Key, Heir]}),
      ?assertMatch(ok, t_lookup_everywhere(Key, Nodes, Heir)),
      ?assertEqual({gproc_monitor,Key,{migrated,Heir}}, NextEvent()),
      ?assertEqual(ok, t_call(Heir, die)),
      ?assertEqual({gproc_monitor,Key,undefined}, NextEvent()),
      ok.
  %% Asks helper process Pb to take from its own mailbox the first message
  %% whose first element equals Tag, and returns that message. Inside Pb the
  %% wait is limited to one second, after which
  %% {timeout, got_msg, [Pb, Tag]} is raised there.
  got_msg(Pb, Tag) ->
      Pick = fun() ->
                     receive
                         Msg when element(1, Msg) == Tag ->
                             Msg
                     after 1000 ->
                             erlang:error({timeout, got_msg, [Pb, Tag]})
                     end
             end,
      t_call(Pb, {apply_fun, Pick}).
  %% Checks that gproc_dist:sync() still returns true when a candidate dies
  %% while the sync is in progress. The non-leader's gproc_dist process is
  %% suspended with sys:suspend() so that it cannot answer the leader before
  %% it is killed.
  t_sync_cand_dies([A,B|_]) ->
      Leader = rpc:call(A, gproc_dist, get_leader, []),
      Follower = case Leader of
                     A -> B;
                     B -> A
                 end,
      ?assertMatch(ok, rpc:call(Follower, sys, suspend, [gproc_dist])),
      FollowerPid = rpc:call(Follower, erlang, whereis, [gproc_dist]),
      SyncKey = rpc:async_call(Leader, gproc_dist, sync, []),
      %% gproc_dist:sync() has an overall timeout of 5 seconds, so after one
      %% second the call should still be pending.
      ?assertMatch(timeout, rpc:nb_yield(SyncKey, 1000)),
      exit(FollowerPid, kill),
      %% The leader should notice that the other candidate died and answer
      %% at once, so the result should arrive well within one second.
      ?assertMatch({value, true}, rpc:nb_yield(SyncKey, 1000)).
  %% Registers one name on each of the first two nodes and stops the gproc
  %% application on the first node. The remaining nodes are expected to drop
  %% the first node's name and keep the other one. After gproc has been
  %% started again on the first node, the same is expected on all nodes.
  t_fail_node([NodeA,NodeB|_] = Nodes) ->
      KeyA = ?T_NAME,
      KeyB = ?T_NAME,
      PidA = t_spawn_reg(NodeA, KeyA),
      PidB = t_spawn_reg(NodeB, KeyB),
      Survivors = Nodes -- [NodeA],
      ?assertMatch(ok, rpc:call(NodeA, application, stop, [gproc])),
      ?assertMatch(ok, t_lookup_everywhere(KeyA, Survivors, undefined)),
      ?assertMatch(ok, t_lookup_everywhere(KeyB, Survivors, PidB)),
      ?assertMatch(ok, rpc:call(NodeA, application, start, [gproc])),
      ?assertMatch(ok, t_lookup_everywhere(KeyA, Nodes, undefined)),
      ?assertMatch(ok, t_lookup_everywhere(KeyB, Nodes, PidB)),
      ?assertMatch(ok, t_call(PidA, die)),
      ?assertMatch(ok, t_call(PidB, die)).
  %% Suspends the calling process for 500 ms and returns ok. Used as the
  %% pause between retries in the lookup and read helpers.
  t_sleep() ->
      receive
      after 500 ->
              ok
      end.
  %% Checks that gproc:where(Key) returns Exp on every node in Nodes.
  %%
  %% Returns ok as soon as all nodes agree. On a mismatch the check is
  %% repeated after t_sleep(); the /3 variant allows three attempts in
  %% total. When the attempts are used up, {lookup_failed, Key, Exp} is
  %% returned instead of raising.
  t_lookup_everywhere(Key, Nodes, Exp) ->
      t_lookup_everywhere(Key, Nodes, Exp, 3).

  t_lookup_everywhere(Key, _, Exp, 0) ->
      {lookup_failed, Key, Exp};
  t_lookup_everywhere(Key, Nodes, Exp, I) ->
      Wanted = lists:map(fun(N) -> {N, Exp} end, Nodes),
      Seen = lists:map(fun(N) ->
                               {N, rpc:call(N, gproc, where, [Key])}
                       end, Nodes),
      case Seen =:= Wanted of
          true ->
              ok;
          false ->
              ?debugFmt("lookup ~p failed~n"
                        "(Expected: ~p;~n"
                        " Found : ~p), retrying...~n",
                        [Key, Wanted, Seen]),
              t_sleep(),
              t_lookup_everywhere(Key, Nodes, Exp, I-1)
      end.
  %% Checks that gproc:get_value(Key, Pid), after normalisation through
  %% read_result/1, yields Exp on every node in Nodes.
  %%
  %% Returns ok as soon as all nodes agree. On a mismatch the check is
  %% repeated after t_sleep(); the /4 variant allows three attempts in
  %% total. When the attempts are used up, {read_failed, Key, Exp} is
  %% returned instead of raising.
  t_read_everywhere(Key, Pid, Nodes, Exp) ->
      t_read_everywhere(Key, Pid, Nodes, Exp, 3).

  t_read_everywhere(Key, _, _, Exp, 0) ->
      {read_failed, Key, Exp};
  t_read_everywhere(Key, Pid, Nodes, Exp, I) ->
      Wanted = lists:map(fun(N) -> {N, Exp} end, Nodes),
      ReadOn = fun(N) ->
                       Raw = rpc:call(N, gproc, get_value, [Key, Pid]),
                       {N, read_result(Raw)}
               end,
      Seen = lists:map(ReadOn, Nodes),
      case Seen =:= Wanted of
          true ->
              ok;
          false ->
              ?debugFmt("read ~p failed~n"
                        "(Expected: ~p;~n"
                        " Found : ~p), retrying...~n",
                        [{Key, Pid}, Wanted, Seen]),
              t_sleep(),
              t_read_everywhere(Key, Pid, Nodes, Exp, I-1)
      end.
  %% Normalises the result of an rpc:call: a badrpc that wraps a badarg
  %% exit is reduced to the atom badarg; every other term is returned
  %% unchanged.
  read_result(Result) ->
      case Result of
          {badrpc, {'EXIT', {badarg, _Stack}}} ->
              badarg;
          _ ->
              Result
      end.
  %% Spawns a helper process on Node that runs t_loop/1 and returns its pid
  %% once the process has reported in. Selective is passed on to t_loop/1
  %% and defaults to false. Raises {timeout, t_spawn, [Node, Selective]} if
  %% the process does not report within one second.
  t_spawn(Node) ->
      t_spawn(Node, false).

  t_spawn(Node, Selective) when is_boolean(Selective) ->
      Parent = self(),
      Body = fun() ->
                     Parent ! {self(), ok},
                     t_loop(Selective)
             end,
      Pid = spawn(Node, Body),
      receive
          {Pid, ok} ->
              Pid
      after 1000 ->
              erlang:error({timeout, t_spawn, [Node, Selective]})
      end.
  %% Spawns a helper process on Node that registers Name with Value through
  %% gproc:reg/2, reports back and then runs t_loop/0. Returns the pid.
  %% The /2 variant takes the value from default_value/1. Raises
  %% {timeout, t_spawn_reg, [Node, Name, Value]} if the process does not
  %% report within one second.
  t_spawn_reg(Node, Name) ->
      t_spawn_reg(Node, Name, default_value(Name)).

  t_spawn_reg(Node, Name, Value) ->
      Parent = self(),
      Body = fun() ->
                     ?assertMatch(true, gproc:reg(Name, Value)),
                     Parent ! {self(), ok},
                     t_loop()
             end,
      Pid = spawn(Node, Body),
      receive
          {Pid, ok} ->
              Pid
      after 1000 ->
              erlang:error({timeout, t_spawn_reg, [Node, Name, Value]})
      end.
  %% Spawns a helper process on Node that registers Name with Value through
  %% gproc:reg_shared/2, reports back and then runs t_loop/0. Returns the
  %% pid. Raises {timeout, t_spawn_reg_shared, [Node, Name, Value]} if the
  %% process does not report within one second.
  t_spawn_reg_shared(Node, Name, Value) ->
      Parent = self(),
      Body = fun() ->
                     ?assertMatch(true, gproc:reg_shared(Name, Value)),
                     Parent ! {self(), ok},
                     t_loop()
             end,
      Pid = spawn(Node, Body),
      receive
          {Pid, ok} ->
              Pid
      after 1000 ->
              erlang:error({timeout, t_spawn_reg_shared, [Node,Name,Value]})
      end.
  %% Value used by t_spawn_reg/2 when none is given: 0 for keys of the form
  %% {c, _, _}, undefined for anything else.
  default_value(Key) ->
      case Key of
          {c, _Scope, _Id} -> 0;
          _Other -> undefined
      end.
  %% Spawns a helper process on Node that registers all entries of KVL
  %% through gproc:mreg(n, g, KVL), reports back and then runs t_loop/0.
  %% Returns the pid.
  %%
  %% Raises {timeout, t_spawn_mreg, [Node, KVL]} if the process does not
  %% report within one second. The other t_spawn* helpers already have this
  %% limit; without it a failed registration in the spawned process would
  %% leave the caller blocked in the receive.
  t_spawn_mreg(Node, KVL) ->
      Me = self(),
      P = spawn(Node, fun() ->
                              ?assertMatch(true, gproc:mreg(n, g, KVL)),
                              Me ! {self(), ok},
                              t_loop()
                      end),
      receive
          {P, ok} -> P
      after 1000 ->
              erlang:error({timeout, t_spawn_mreg, [Node, KVL]})
      end.
  %% Synchronous request to a helper process running t_loop/1.
  %%
  %% Sends {self(), Ref, Req} to P, where Ref is a fresh monitor reference
  %% on P, and returns the Res of the matching {P, Ref, Res} reply.
  %%
  %% Raises {'DOWN', P, Reason} if P terminates before replying, and
  %% {timeout, t_call, [P, Req]} if nothing arrives within one second.
  t_call(P, Req) ->
      Ref = erlang:monitor(process, P),
      P ! {self(), Ref, Req},
      receive
          {P, Ref, Res} ->
              erlang:demonitor(Ref, [flush]),
              Res;
          {'DOWN', Ref, _, _, Error} ->
              erlang:error({'DOWN', P, Error})
      after 1000 ->
              %% Drop the monitor before giving up, so that a 'DOWN' for
              %% this call cannot show up later in the caller's mailbox and
              %% be mistaken for something else.
              erlang:demonitor(Ref, [flush]),
              erlang:error({timeout,t_call,[P,Req]})
      end.
  %% Command loop run by the helper processes that the tests spawn.
  %%
  %% Requests have the form {From, Ref, Cmd} and are answered with
  %% {self(), Ref, Result}:
  %%   die               - reply ok and leave the loop
  %%   {selective, Bool} - reply ok and continue with the new mode
  %%   {apply, M, F, A}  - reply with apply(M, F, A)
  %%   {apply_fun, F}    - reply with F()
  %% With Selective = false, any other message is logged and the process
  %% exits with {unknown_msg, Msg}. With Selective = true no clause matches
  %% such messages, so they stay in the mailbox.
  t_loop() ->
      t_loop(false).

  t_loop(Selective) when is_boolean(Selective) ->
      receive
          {Client, Tag, die} ->
              t_reply(Client, Tag, ok);
          {Client, Tag, {selective, Flag}} when is_boolean(Flag) ->
              t_reply(Client, Tag, ok),
              t_loop(Flag);
          {Client, Tag, {apply, Mod, Fun, Args}} ->
              t_reply(Client, Tag, apply(Mod, Fun, Args)),
              t_loop(Selective);
          {Client, Tag, {apply_fun, Thunk}} ->
              t_reply(Client, Tag, Thunk()),
              t_loop(Selective);
          Unknown when Selective =:= false ->
              ?debugFmt("got unknown msg: ~p~n", [Unknown]),
              exit({unknown_msg, Unknown})
      end.

  %% Sends the reply for request Tag back to Client.
  t_reply(Client, Tag, Result) ->
      Client ! {self(), Tag, Result}.
  %% Starts one slave node per name in Ns and has the first started node
  %% ping each of the others through net_adm:ping/1. Returns the list of
  %% started node names. Ns must not be empty.
  start_slaves(Ns) ->
      Nodes = lists:map(fun start_slave/1, Ns),
      [First|Others] = Nodes,
      lists:foreach(fun(N) -> rpc:call(First, net_adm, ping, [N]) end, Others),
      Nodes.
  %% Starts a slave node called Name on the local host and returns its node
  %% name. If the current node is not distributed yet, epmd is launched and
  %% the node is made distributed as gproc_master (short names) first. The
  %% slave gets "-pa ./ -pz ../ebin" plus the -pa/-pz directories computed
  %% by paths/0 on its command line.
  start_slave(Name) ->
      ensure_distributed(node()),
      {Pa, Pz} = paths(),
      Args = lists:flatten(["-pa ./ -pz ../ebin",
                            [" -pa " ++ Dir || Dir <- Pa],
                            [" -pz " ++ Dir || Dir <- Pz]]),
      {ok, Node} = slave:start(host(), Name, Args),
      Node.

  %% Turns the current node into a distributed node when it is still
  %% running as nonode@nohost; does nothing otherwise.
  ensure_distributed(nonode@nohost) ->
      os:cmd("epmd -daemon"),
      {ok, _} = net_kernel:start([gproc_master, shortnames]),
      ok;
  ensure_distributed(_Node) ->
      ok.
  %% Splits the current code path around the directories that lie under the
  %% Erlang root directory (the -root init argument). Returns {Pas, Pzs}:
  %% Pas are the entries before the first directory under the root, Pzs the
  %% entries that follow the run of root directories.
  paths() ->
      {ok, [[Root]]} = init:get_argument(root),
      UnderRoot = fun(Dir) -> lists:prefix(Root, Dir) end,
      {Pas, FromRoot} =
          lists:splitwith(fun(Dir) -> not UnderRoot(Dir) end, code:get_path()),
      Pzs = lists:dropwhile(UnderRoot, FromRoot),
      {Pas, Pzs}.
  %% Returns the host part of the current node name (the text after "@")
  %% as an atom.
  host() ->
      NodeStr = atom_to_list(node()),
      [_Name, HostStr] = re:split(NodeStr, "@", [{return, list}]),
      list_to_atom(HostStr).
  519. -endif.