gproc_dist_tests.erl 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@erlang-solutions.com>
  17. %%
  18. -module(gproc_dist_tests).
  19. -ifdef(TEST).
  20. -include_lib("eunit/include/eunit.hrl").
  21. -export([t_spawn/1, t_spawn_reg/2]).
  22. dist_test_() ->
  23. {timeout, 120,
  24. [{setup,
  25. fun() ->
  26. Ns = start_slaves([dist_test_n1, dist_test_n2]),
  27. ?assertMatch({[ok,ok],[]},
  28. rpc:multicall(Ns, application, set_env,
  29. [gproc, gproc_dist, Ns])),
  30. ?assertMatch({[ok,ok],[]},
  31. rpc:multicall(Ns, application, start, [gproc])),
  32. Ns
  33. end,
  34. fun(Ns) ->
  35. [rpc:call(N, init, stop, []) || N <- Ns]
  36. end,
  37. fun(Ns) ->
  38. {inorder,
  39. [
  40. {inparallel, [
  41. fun() ->
  42. ?debugVal(t_simple_reg(Ns))
  43. end,
  44. fun() ->
  45. ?debugVal(t_mreg(Ns))
  46. end,
  47. fun() ->
  48. ?debugVal(t_await_reg(Ns))
  49. end,
  50. fun() ->
  51. ?debugVal(t_await_self(Ns))
  52. end,
  53. fun() ->
  54. ?debugVal(t_await_reg_exists(Ns))
  55. end,
  56. fun() ->
  57. ?debugVal(t_give_away(Ns))
  58. end,
  59. fun() ->
  60. ?debugVal(t_sync(Ns))
  61. end
  62. ]
  63. },
  64. fun() ->
  65. ?debugVal(t_sync_cand_dies(Ns))
  66. end,
  67. {timeout, 90, [fun() ->
  68. ?debugVal(t_fail_node(Ns))
  69. end]}
  70. ]}
  71. end
  72. }]}.
  73. -define(T_NAME, {n, g, {?MODULE, ?LINE}}).
  74. -define(T_KVL, [{foo, "foo"}, {bar, "bar"}]).
  75. t_simple_reg([H|_] = Ns) ->
  76. Name = ?T_NAME,
  77. P = t_spawn_reg(H, Name),
  78. ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P)),
  79. ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})),
  80. ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
  81. ?assertMatch(ok, t_call(P, die)).
  82. t_mreg([H|_] = Ns) ->
  83. Kvl = ?T_KVL,
  84. Keys = [K || {K,_} <- Kvl],
  85. P = t_spawn_mreg(H, Kvl),
  86. [?assertMatch(ok, t_lookup_everywhere({n,g,K}, Ns, P)) || K <- Keys],
  87. ?assertMatch(true, t_call(P, {apply, gproc, munreg, [n, g, Keys]})),
  88. [?assertMatch(ok, t_lookup_everywhere({n,g,K},Ns,undefined)) || K <- Keys],
  89. ?assertMatch(ok, t_call(P, die)).
  90. t_await_reg([A,B|_]) ->
  91. Name = ?T_NAME,
  92. P = t_spawn(A),
  93. Ref = erlang:monitor(process, P),
  94. P ! {self(), Ref, {apply, gproc, await, [Name]}},
  95. t_sleep(),
  96. P1 = t_spawn_reg(B, Name),
  97. ?assert(P1 == receive
  98. {P, Ref, Res} ->
  99. element(1, Res);
  100. {'DOWN', Ref, _, _, Reason} ->
  101. erlang:error(Reason);
  102. Other ->
  103. erlang:error({received,Other})
  104. end),
  105. ?assertMatch(ok, t_call(P, die)),
  106. ?assertMatch(ok, t_call(P1, die)).
  107. t_await_self([A|_]) ->
  108. Name = ?T_NAME,
  109. P = t_spawn(A, false), % buffer unknowns
  110. Ref = t_call(P, {apply, gproc, nb_wait, [Name]}),
  111. ?assertMatch(ok, t_call(P, {selective, true})),
  112. ?assertMatch(true, t_call(P, {apply, gproc, reg, [Name, some_value]})),
  113. ?assertMatch({registered, {Name, P, some_value}},
  114. t_call(P, {apply_fun, fun() ->
  115. receive
  116. {gproc, Ref, R, Wh} ->
  117. {R, Wh}
  118. after 10000 ->
  119. timeout
  120. end
  121. end})),
  122. ?assertMatch(ok, t_call(P, {selective, false})),
  123. ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})).
  124. t_await_reg_exists([A,B|_]) ->
  125. Name = ?T_NAME,
  126. P = t_spawn(A),
  127. Ref = erlang:monitor(process, P),
  128. P1 = t_spawn_reg(B, Name),
  129. P ! {self(), Ref, {apply, gproc, await, [Name]}},
  130. ?assert(P1 == receive
  131. {P, Ref, Res} ->
  132. element(1, Res);
  133. {'DOWN', Ref, _, _, Reason} ->
  134. erlang:error(Reason);
  135. Other ->
  136. erlang:error({received,Other})
  137. end),
  138. ?assertMatch(ok, t_call(P, die)),
  139. ?assertMatch(ok, t_call(P1, die)).
  140. t_give_away([A,B|_] = Ns) ->
  141. Na = ?T_NAME,
  142. Nb = ?T_NAME,
  143. Pa = t_spawn_reg(A, Na),
  144. Pb = t_spawn_reg(B, Nb),
  145. ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
  146. ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
  147. ?assertMatch(Pb, t_call(Pa, {apply, gproc, give_away, [Na, Nb]})),
  148. ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
  149. ?assertMatch(Pa, t_call(Pb, {apply, gproc, give_away, [Na, Pa]})),
  150. ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
  151. ?assertMatch(ok, t_call(Pa, die)),
  152. ?assertMatch(ok, t_call(Pb, die)).
  153. t_sync(Ns) ->
  154. %% Don't really know how to test this...
  155. [?assertMatch(true, rpc:call(N, gproc_dist, sync, []))
  156. || N <- Ns].
  157. %% Verify that the gproc_dist:sync() call returns true even if a candidate dies
  158. %% while the sync is underway. This test makes use of sys:suspend() to ensure that
  159. %% the other candidate doesn't respond too quickly.
  160. t_sync_cand_dies([A,B|_]) ->
  161. Leader = rpc:call(A, gproc_dist, get_leader, []),
  162. Other = case Leader of
  163. A -> B;
  164. B -> A
  165. end,
  166. ?assertMatch(ok, rpc:call(Other, sys, suspend, [gproc_dist])),
  167. P = rpc:call(Other, erlang, whereis, [gproc_dist]),
  168. Key = rpc:async_call(Leader, gproc_dist, sync, []),
  169. %% The overall timeout for gproc_dist:sync() is 5 seconds. Here, we should
  170. %% still be waiting.
  171. ?assertMatch(timeout, rpc:nb_yield(Key, 1000)),
  172. exit(P, kill),
  173. %% The leader should detect that the other candidate died and respond
  174. %% immediately. Therefore, we should have our answer well within 1 sec.
  175. ?assertMatch({value, true}, rpc:nb_yield(Key, 1000)).
  176. t_fail_node([A,B|_] = Ns) ->
  177. Na = ?T_NAME,
  178. Nb = ?T_NAME,
  179. Pa = t_spawn_reg(A, Na),
  180. Pb = t_spawn_reg(B, Nb),
  181. ?assertMatch(ok, rpc:call(A, application, stop, [gproc])),
  182. ?assertMatch(ok, t_lookup_everywhere(Na, Ns -- [A], undefined)),
  183. ?assertMatch(ok, t_lookup_everywhere(Nb, Ns -- [A], Pb)),
  184. ?assertMatch(ok, rpc:call(A, application, start, [gproc])),
  185. ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)),
  186. ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
  187. ?assertMatch(ok, t_call(Pa, die)),
  188. ?assertMatch(ok, t_call(Pb, die)).
  189. t_sleep() ->
  190. timer:sleep(500).
  191. t_lookup_everywhere(Key, Nodes, Exp) ->
  192. t_lookup_everywhere(Key, Nodes, Exp, 3).
  193. t_lookup_everywhere(Key, _, Exp, 0) ->
  194. {lookup_failed, Key, Exp};
  195. t_lookup_everywhere(Key, Nodes, Exp, I) ->
  196. Expected = [{N, Exp} || N <- Nodes],
  197. Found = [{N,rpc:call(N, gproc, where, [Key])} || N <- Nodes],
  198. if Expected =/= Found ->
  199. ?debugFmt("lookup ~p failed (~p), retrying...~n", [Key, Found]),
  200. t_sleep(),
  201. t_lookup_everywhere(Key, Nodes, Exp, I-1);
  202. true ->
  203. ok
  204. end.
  205. t_spawn(Node) ->
  206. t_spawn(Node, false).
  207. t_spawn(Node, Selective) when is_boolean(Selective) ->
  208. Me = self(),
  209. P = spawn(Node, fun() ->
  210. Me ! {self(), ok},
  211. t_loop(Selective)
  212. end),
  213. receive
  214. {P, ok} -> P
  215. end.
  216. t_spawn_reg(Node, Name) ->
  217. Me = self(),
  218. spawn(Node, fun() ->
  219. ?assertMatch(true, gproc:reg(Name)),
  220. Me ! {self(), ok},
  221. t_loop()
  222. end),
  223. receive
  224. {P, ok} -> P
  225. end.
  226. t_spawn_mreg(Node, KVL) ->
  227. Me = self(),
  228. spawn(Node, fun() ->
  229. ?assertMatch(true, gproc:mreg(n, g, KVL)),
  230. Me ! {self(), ok},
  231. t_loop()
  232. end),
  233. receive
  234. {P, ok} -> P
  235. end.
  236. t_call(P, Req) ->
  237. Ref = erlang:monitor(process, P),
  238. P ! {self(), Ref, Req},
  239. receive
  240. {P, Ref, Res} ->
  241. erlang:demonitor(Ref),
  242. Res;
  243. {'DOWN', Ref, _, _, Error} ->
  244. erlang:error({'DOWN', P, Error})
  245. end.
  246. t_loop() ->
  247. t_loop(false).
  248. t_loop(Selective) when is_boolean(Selective) ->
  249. receive
  250. {From, Ref, die} ->
  251. From ! {self(), Ref, ok};
  252. {From, Ref, {selective, Bool}} when is_boolean(Bool) ->
  253. From ! {self(), Ref, ok},
  254. t_loop(Bool);
  255. {From, Ref, {apply, M, F, A}} ->
  256. From ! {self(), Ref, apply(M, F, A)},
  257. t_loop(Selective);
  258. {From, Ref, {apply_fun, F}} ->
  259. From ! {self(), Ref, F()},
  260. t_loop(Selective);
  261. Other when not Selective ->
  262. ?debugFmt("got unknown msg: ~p~n", [Other]),
  263. exit({unknown_msg, Other})
  264. end.
  265. start_slaves(Ns) ->
  266. [H|T] = Nodes = [start_slave(N) || N <- Ns],
  267. _ = [rpc:call(H, net, ping, [N]) || N <- T],
  268. Nodes.
  269. start_slave(Name) ->
  270. case node() of
  271. nonode@nohost ->
  272. os:cmd("epmd -daemon"),
  273. {ok, _} = net_kernel:start([gproc_master, shortnames]);
  274. _ ->
  275. ok
  276. end,
  277. {ok, Node} = slave:start(
  278. host(), Name,
  279. "-pa . -pz ../ebin -pa ../deps/gen_leader/ebin "),
  280. %% io:fwrite(user, "Slave node: ~p~n", [Node]),
  281. Node.
  282. host() ->
  283. [_Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]),
  284. list_to_atom(Host).
  285. -endif.