gproc_dist_tests.erl 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@erlang-solutions.com>
  17. %%
  18. -module(gproc_dist_tests).
  19. -ifdef(TEST).
  20. -include_lib("eunit/include/eunit.hrl").
  21. -export([t_spawn/1, t_spawn_reg/2]).
  22. dist_test_() ->
  23. {timeout, 120,
  24. [{setup,
  25. fun() ->
  26. Ns = start_slaves([dist_test_n1, dist_test_n2]),
  27. ?assertMatch({[ok,ok],[]},
  28. rpc:multicall(Ns, application, set_env,
  29. [gproc, gproc_dist, Ns])),
  30. ?assertMatch({[ok,ok],[]},
  31. rpc:multicall(Ns, application, start, [gproc])),
  32. Ns
  33. end,
  34. fun(Ns) ->
  35. [rpc:call(N, init, stop, []) || N <- Ns]
  36. end,
  37. fun(Ns) ->
  38. {inorder,
  39. [
  40. {inparallel, [
  41. fun() ->
  42. ?debugVal(t_simple_reg(Ns))
  43. end,
  44. fun() ->
  45. ?debugVal(t_simple_counter(Ns))
  46. end,
  47. fun() ->
  48. ?debugVal(t_aggr_counter(Ns))
  49. end,
  50. fun() ->
  51. ?debugVal(t_shared_counter(Ns))
  52. end,
  53. fun() ->
  54. ?debugVal(t_mreg(Ns))
  55. end,
  56. fun() ->
  57. ?debugVal(t_await_reg(Ns))
  58. end,
  59. fun() ->
  60. ?debugVal(t_await_self(Ns))
  61. end,
  62. fun() ->
  63. ?debugVal(t_await_reg_exists(Ns))
  64. end,
  65. fun() ->
  66. ?debugVal(t_give_away(Ns))
  67. end,
  68. fun() ->
  69. ?debugVal(t_sync(Ns))
  70. end,
  71. fun() ->
  72. ?debugVal(t_monitor(Ns))
  73. end,
  74. fun() ->
  75. ?debugVal(t_subscribe(Ns))
  76. end
  77. ]
  78. },
  79. fun() ->
  80. ?debugVal(t_sync_cand_dies(Ns))
  81. end,
  82. {timeout, 90, [fun() ->
  83. ?debugVal(t_fail_node(Ns))
  84. end]}
  85. ]}
  86. end
  87. }]}.
  88. -define(T_NAME, {n, g, {?MODULE, ?LINE}}).
  89. -define(T_KVL, [{foo, "foo"}, {bar, "bar"}]).
  90. -define(T_COUNTER, {c, g, {?MODULE, ?LINE}}).
  91. t_simple_reg([H|_] = Ns) ->
  92. Name = ?T_NAME,
  93. P = t_spawn_reg(H, Name),
  94. ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P)),
  95. ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})),
  96. ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
  97. ?assertMatch(ok, t_call(P, die)).
  98. t_simple_counter([H|_] = Ns) ->
  99. Ctr = ?T_COUNTER,
  100. P = t_spawn_reg(H, Ctr, 3),
  101. ?assertMatch(ok, t_read_everywhere(Ctr, P, Ns, 3)),
  102. ?assertMatch(5, t_call(P, {apply, gproc, update_counter, [Ctr, 2]})),
  103. ?assertMatch(ok, t_read_everywhere(Ctr, P, Ns, 5)),
  104. ?assertMatch(ok, t_call(P, die)).
  105. t_shared_counter([H|_] = Ns) ->
  106. Ctr = ?T_COUNTER,
  107. P = t_spawn_reg_shared(H, Ctr, 3),
  108. ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, 3)),
  109. ?assertMatch(5, t_call(P, {apply, gproc, update_shared_counter, [Ctr, 2]})),
  110. ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, 5)),
  111. ?assertMatch(ok, t_call(P, die)),
  112. ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, 5)),
  113. ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, 5)), % twice
  114. P1 = t_spawn(H),
  115. ?assertMatch(true, t_call(P1, {apply, gproc, unreg_shared, [Ctr]})),
  116. ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, badarg)).
  117. t_aggr_counter([H1,H2|_] = Ns) ->
  118. {c,g,Nm} = Ctr = ?T_COUNTER,
  119. Aggr = {a,g,Nm},
  120. Pc1 = t_spawn_reg(H1, Ctr, 3),
  121. Pa = t_spawn_reg(H2, Aggr),
  122. ?assertMatch(ok, t_read_everywhere(Ctr, Pc1, Ns, 3)),
  123. ?assertMatch(ok, t_read_everywhere(Aggr, Pa, Ns, 3)),
  124. Pc2 = t_spawn_reg(H2, Ctr, 3),
  125. ?assertMatch(ok, t_read_everywhere(Ctr, Pc2, Ns, 3)),
  126. ?assertMatch(ok, t_read_everywhere(Aggr, Pa, Ns, 6)),
  127. ?assertMatch(5, t_call(Pc1, {apply, gproc, update_counter, [Ctr, 2]})),
  128. ?assertMatch(ok, t_read_everywhere(Ctr, Pc1, Ns, 5)),
  129. ?assertMatch(ok, t_read_everywhere(Aggr, Pa, Ns, 8)),
  130. ?assertMatch(ok, t_call(Pc1, die)),
  131. ?assertMatch(ok, t_read_everywhere(Aggr, Pa, Ns, 3)),
  132. ?assertMatch(ok, t_call(Pc2, die)),
  133. ?assertMatch(ok, t_call(Pa, die)).
  134. t_mreg([H|_] = Ns) ->
  135. Kvl = ?T_KVL,
  136. Keys = [K || {K,_} <- Kvl],
  137. P = t_spawn_mreg(H, Kvl),
  138. [?assertMatch(ok, t_lookup_everywhere({n,g,K}, Ns, P)) || K <- Keys],
  139. ?assertMatch(true, t_call(P, {apply, gproc, munreg, [n, g, Keys]})),
  140. [?assertMatch(ok, t_lookup_everywhere({n,g,K},Ns,undefined)) || K <- Keys],
  141. ?assertMatch(ok, t_call(P, die)).
  142. t_await_reg([A,B|_]) ->
  143. Name = ?T_NAME,
  144. P = t_spawn(A),
  145. Ref = erlang:monitor(process, P),
  146. P ! {self(), Ref, {apply, gproc, await, [Name]}},
  147. t_sleep(),
  148. P1 = t_spawn_reg(B, Name),
  149. ?assert(P1 == receive
  150. {P, Ref, Res} ->
  151. element(1, Res);
  152. {'DOWN', Ref, _, _, Reason} ->
  153. erlang:error(Reason);
  154. Other ->
  155. erlang:error({received,Other})
  156. end),
  157. ?assertMatch(ok, t_call(P, die)),
  158. ?assertMatch(ok, t_call(P1, die)).
  159. t_await_self([A|_]) ->
  160. Name = ?T_NAME,
  161. P = t_spawn(A, false), % don't buffer unknowns
  162. Ref = t_call(P, {apply, gproc, nb_wait, [Name]}),
  163. ?assertMatch(ok, t_call(P, {selective, true})),
  164. ?assertMatch(true, t_call(P, {apply, gproc, reg, [Name, some_value]})),
  165. ?assertMatch({registered, {Name, P, some_value}},
  166. t_call(P, {apply_fun, fun() ->
  167. receive
  168. {gproc, Ref, R, Wh} ->
  169. {R, Wh}
  170. after 10000 ->
  171. timeout
  172. end
  173. end})),
  174. ?assertMatch(ok, t_call(P, {selective, false})),
  175. ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})).
  176. t_await_reg_exists([A,B|_]) ->
  177. Name = ?T_NAME,
  178. P = t_spawn(A),
  179. Ref = erlang:monitor(process, P),
  180. P1 = t_spawn_reg(B, Name),
  181. P ! {self(), Ref, {apply, gproc, await, [Name]}},
  182. ?assert(P1 == receive
  183. {P, Ref, Res} ->
  184. element(1, Res);
  185. {'DOWN', Ref, _, _, Reason} ->
  186. erlang:error(Reason);
  187. Other ->
  188. erlang:error({received,Other})
  189. end),
  190. ?assertMatch(ok, t_call(P, die)),
  191. ?assertMatch(ok, t_call(P1, die)).
  192. t_give_away([A,B|_] = Ns) ->
  193. Na = ?T_NAME,
  194. Nb = ?T_NAME,
  195. Pa = t_spawn_reg(A, Na),
  196. Pb = t_spawn_reg(B, Nb),
  197. ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
  198. ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
  199. ?assertMatch(Pb, t_call(Pa, {apply, gproc, give_away, [Na, Nb]})),
  200. ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
  201. ?assertMatch(Pa, t_call(Pb, {apply, gproc, give_away, [Na, Pa]})),
  202. ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
  203. ?assertMatch(ok, t_call(Pa, die)),
  204. ?assertMatch(ok, t_call(Pb, die)).
  205. t_sync(Ns) ->
  206. %% Don't really know how to test this...
  207. [?assertMatch(true, rpc:call(N, gproc_dist, sync, []))
  208. || N <- Ns].
  209. t_monitor([A,B|_]) ->
  210. Na = ?T_NAME,
  211. Pa = t_spawn_reg(A, Na),
  212. Pb = t_spawn(B, _Selective = true),
  213. Ref = t_call(Pb, {apply, gproc, monitor, [Na]}),
  214. ?assert(is_reference(Ref)),
  215. ?assertMatch(ok, t_call(Pa, die)),
  216. ?assertMatch({gproc,unreg,Ref,Na}, got_msg(Pb, gproc)),
  217. Pc = t_spawn_reg(A, Na),
  218. Ref1 = t_call(Pb, {apply, gproc, monitor, [Na]}),
  219. ?assertMatch(true, t_call(Pc, {apply, gproc, unreg, [Na]})),
  220. ?assertMatch({gproc,unreg,Ref1,Na}, got_msg(Pb, gproc)).
  221. t_subscribe([A,B|_] = Ns) ->
  222. Na = ?T_NAME,
  223. Pb = t_spawn(B, _Selective = true),
  224. ?assertEqual(ok, t_call(Pb, {apply, gproc_monitor, subscribe, [Na]})),
  225. ?assertMatch({gproc_monitor, Na, undefined}, got_msg(Pb, gproc_monitor)),
  226. Pa = t_spawn_reg(A, Na),
  227. ?assertMatch({gproc_monitor, Na, Pa}, got_msg(Pb, gproc_monitor)),
  228. Pc = t_spawn(A),
  229. t_call(Pa, {apply, gproc, give_away, [Na, Pc]}),
  230. ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pc)),
  231. ?assertEqual({gproc_monitor,Na,{migrated,Pc}}, got_msg(Pb, gproc_monitor)),
  232. ?assertEqual(ok, t_call(Pc, die)),
  233. ?assertEqual({gproc_monitor,Na,undefined}, got_msg(Pb, gproc_monitor)).
  234. got_msg(Pb, Tag) ->
  235. t_call(Pb,
  236. {apply_fun,
  237. fun() ->
  238. receive
  239. M when element(1, M) == Tag ->
  240. M
  241. after 1000 ->
  242. timeout
  243. end
  244. end}).
  245. %% Verify that the gproc_dist:sync() call returns true even if a candidate dies
  246. %% while the sync is underway. This test makes use of sys:suspend() to ensure that
  247. %% the other candidate doesn't respond too quickly.
  248. t_sync_cand_dies([A,B|_]) ->
  249. Leader = rpc:call(A, gproc_dist, get_leader, []),
  250. Other = case Leader of
  251. A -> B;
  252. B -> A
  253. end,
  254. ?assertMatch(ok, rpc:call(Other, sys, suspend, [gproc_dist])),
  255. P = rpc:call(Other, erlang, whereis, [gproc_dist]),
  256. Key = rpc:async_call(Leader, gproc_dist, sync, []),
  257. %% The overall timeout for gproc_dist:sync() is 5 seconds. Here, we should
  258. %% still be waiting.
  259. ?assertMatch(timeout, rpc:nb_yield(Key, 1000)),
  260. exit(P, kill),
  261. %% The leader should detect that the other candidate died and respond
  262. %% immediately. Therefore, we should have our answer well within 1 sec.
  263. ?assertMatch({value, true}, rpc:nb_yield(Key, 1000)).
  264. t_fail_node([A,B|_] = Ns) ->
  265. Na = ?T_NAME,
  266. Nb = ?T_NAME,
  267. Pa = t_spawn_reg(A, Na),
  268. Pb = t_spawn_reg(B, Nb),
  269. ?assertMatch(ok, rpc:call(A, application, stop, [gproc])),
  270. ?assertMatch(ok, t_lookup_everywhere(Na, Ns -- [A], undefined)),
  271. ?assertMatch(ok, t_lookup_everywhere(Nb, Ns -- [A], Pb)),
  272. ?assertMatch(ok, rpc:call(A, application, start, [gproc])),
  273. ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)),
  274. ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
  275. ?assertMatch(ok, t_call(Pa, die)),
  276. ?assertMatch(ok, t_call(Pb, die)).
  277. t_sleep() ->
  278. timer:sleep(500).
  279. t_lookup_everywhere(Key, Nodes, Exp) ->
  280. t_lookup_everywhere(Key, Nodes, Exp, 3).
  281. t_lookup_everywhere(Key, _, Exp, 0) ->
  282. {lookup_failed, Key, Exp};
  283. t_lookup_everywhere(Key, Nodes, Exp, I) ->
  284. Expected = [{N, Exp} || N <- Nodes],
  285. Found = [{N,rpc:call(N, gproc, where, [Key])} || N <- Nodes],
  286. if Expected =/= Found ->
  287. ?debugFmt("lookup ~p failed~n"
  288. "(Expected: ~p;~n"
  289. " Found : ~p), retrying...~n",
  290. [Key, Expected, Found]),
  291. t_sleep(),
  292. t_lookup_everywhere(Key, Nodes, Exp, I-1);
  293. true ->
  294. ok
  295. end.
  296. t_read_everywhere(Key, Pid, Nodes, Exp) ->
  297. t_read_everywhere(Key, Pid, Nodes, Exp, 3).
  298. t_read_everywhere(Key, _, _, Exp, 0) ->
  299. {read_failed, Key, Exp};
  300. t_read_everywhere(Key, Pid, Nodes, Exp, I) ->
  301. Expected = [{N, Exp} || N <- Nodes],
  302. Found = [{N, read_result(rpc:call(N, gproc, get_value, [Key, Pid]))}
  303. || N <- Nodes],
  304. if Expected =/= Found ->
  305. ?debugFmt("read ~p failed~n"
  306. "(Expected: ~p;~n"
  307. " Found : ~p), retrying...~n",
  308. [{Key, Pid}, Expected, Found]),
  309. t_sleep(),
  310. t_read_everywhere(Key, Pid, Nodes, Exp, I-1);
  311. true ->
  312. ok
  313. end.
  314. read_result({badrpc, {'EXIT', {badarg, _}}}) -> badarg;
  315. read_result(R) -> R.
  316. t_spawn(Node) ->
  317. t_spawn(Node, false).
  318. t_spawn(Node, Selective) when is_boolean(Selective) ->
  319. Me = self(),
  320. P = spawn(Node, fun() ->
  321. Me ! {self(), ok},
  322. t_loop(Selective)
  323. end),
  324. receive
  325. {P, ok} -> P
  326. end.
  327. t_spawn_reg(Node, Name) ->
  328. t_spawn_reg(Node, Name, default_value(Name)).
  329. t_spawn_reg(Node, Name, Value) ->
  330. Me = self(),
  331. spawn(Node, fun() ->
  332. ?assertMatch(true, gproc:reg(Name, Value)),
  333. Me ! {self(), ok},
  334. t_loop()
  335. end),
  336. receive
  337. {P, ok} -> P
  338. end.
  339. t_spawn_reg_shared(Node, Name, Value) ->
  340. Me = self(),
  341. spawn(Node, fun() ->
  342. ?assertMatch(true, gproc:reg_shared(Name, Value)),
  343. Me ! {self(), ok},
  344. t_loop()
  345. end),
  346. receive
  347. {P, ok} -> P
  348. end.
  349. default_value({c,_,_}) -> 0;
  350. default_value(_) -> undefined.
  351. t_spawn_mreg(Node, KVL) ->
  352. Me = self(),
  353. spawn(Node, fun() ->
  354. ?assertMatch(true, gproc:mreg(n, g, KVL)),
  355. Me ! {self(), ok},
  356. t_loop()
  357. end),
  358. receive
  359. {P, ok} -> P
  360. end.
  361. t_call(P, Req) ->
  362. Ref = erlang:monitor(process, P),
  363. P ! {self(), Ref, Req},
  364. receive
  365. {P, Ref, Res} ->
  366. erlang:demonitor(Ref),
  367. Res;
  368. {'DOWN', Ref, _, _, Error} ->
  369. erlang:error({'DOWN', P, Error})
  370. end.
  371. t_loop() ->
  372. t_loop(false).
  373. t_loop(Selective) when is_boolean(Selective) ->
  374. receive
  375. {From, Ref, die} ->
  376. From ! {self(), Ref, ok};
  377. {From, Ref, {selective, Bool}} when is_boolean(Bool) ->
  378. From ! {self(), Ref, ok},
  379. t_loop(Bool);
  380. {From, Ref, {apply, M, F, A}} ->
  381. From ! {self(), Ref, apply(M, F, A)},
  382. t_loop(Selective);
  383. {From, Ref, {apply_fun, F}} ->
  384. From ! {self(), Ref, F()},
  385. t_loop(Selective);
  386. Other when not Selective ->
  387. ?debugFmt("got unknown msg: ~p~n", [Other]),
  388. exit({unknown_msg, Other})
  389. end.
  390. start_slaves(Ns) ->
  391. [H|T] = Nodes = [start_slave(N) || N <- Ns],
  392. _ = [rpc:call(H, net_adm, ping, [N]) || N <- T],
  393. Nodes.
  394. start_slave(Name) ->
  395. case node() of
  396. nonode@nohost ->
  397. os:cmd("epmd -daemon"),
  398. {ok, _} = net_kernel:start([gproc_master, shortnames]);
  399. _ ->
  400. ok
  401. end,
  402. {Pa, Pz} = paths(),
  403. Paths = "-pa ./ -pz ../ebin" ++
  404. lists:flatten([[" -pa " ++ Path || Path <- Pa],
  405. [" -pz " ++ Path || Path <- Pz]]),
  406. {ok, Node} = slave:start(host(), Name, Paths),
  407. %% io:fwrite(user, "Slave node: ~p~n", [Node]),
  408. Node.
  409. paths() ->
  410. Path = code:get_path(),
  411. {ok, [[Root]]} = init:get_argument(root),
  412. {Pas, Rest} = lists:splitwith(fun(P) ->
  413. not lists:prefix(Root, P)
  414. end, Path),
  415. {_, Pzs} = lists:splitwith(fun(P) ->
  416. lists:prefix(Root, P)
  417. end, Rest),
  418. {Pas, Pzs}.
  419. host() ->
  420. [_Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]),
  421. list_to_atom(Host).
  422. -endif.