pooler_test.erl 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. -module(pooler_test).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -compile([export_all]).
  4. % The `user' processes represent users of the pooler library. A user
  5. % process will take a pid, report details on the pid it has, release
  6. % and take a new pid, stop cleanly, and crash.
  % Start a `user' process and return its pid.
  %
  % The process is spawned without a link (a user is expected to be
  % crashed on purpose by some tests), takes one pid from pooler and then
  % serves requests in user_loop/1 with whatever take_pid/0 returned.
  start_user() ->
      spawn(fun() -> user_loop(pooler:take_pid()) end).
  % Ask the user process Pid for the identity of the pooled member it
  % currently holds and return the `{Type, Id}' reply.
  %
  % The user is monitored while we wait, so a user that is already dead
  % (or dies before answering) fails the caller immediately instead of
  % leaving it blocked in `receive' forever. A user that is alive but
  % never answers is given up on after one second.
  %
  % Raises error `{user_id, {user_down, Reason}}' when the user is not
  % alive and error `{user_id, timeout}' when no reply arrives in time.
  user_id(Pid) ->
      MRef = erlang:monitor(process, Pid),
      Pid ! {get_tc_id, self()},
      receive
          {'DOWN', MRef, process, _Object, Reason} ->
              erlang:error({user_id, {user_down, Reason}});
          {Type, Id} ->
              % Drop the monitor (and any 'DOWN' already queued) so no
              % stray message is left in the caller's mailbox.
              erlang:demonitor(MRef, [flush]),
              {Type, Id}
      after 1000 ->
          erlang:demonitor(MRef, [flush]),
          erlang:error({user_id, timeout})
      end.
  % Tell the user process Pid to return its current pid to pooler and
  % take a new one. Asynchronous; returns the atom that was sent.
  user_new_tc(Pid) ->
      erlang:send(Pid, new_tc).
  % Tell the user process Pid to return its pid to pooler and stop
  % cleanly. Asynchronous; returns the atom that was sent.
  user_stop(Pid) ->
      erlang:send(Pid, stop).
  % Tell the user process Pid to crash without returning its pid.
  % Asynchronous; returns the atom that was sent.
  user_crash(Pid) ->
      erlang:send(Pid, crash).
  % Message loop of a `user' process that currently holds MyTC.
  %
  % Handled messages:
  %   {get_tc_id, From}  - reply to From with pooled_gs:get_id(MyTC)
  %   {ping_tc, From}    - reply to From with pooled_gs:ping(MyTC)
  %   {ping_count, From} - reply to From with pooled_gs:ping_count(MyTC)
  %   new_tc             - return MyTC to pooler, take another, carry on
  %   stop               - return MyTC to pooler and finish with `stopped'
  %   crash              - raise error {user_loop, kaboom}; MyTC is NOT
  %                        returned
  % Any other message is left in the mailbox.
  user_loop(MyTC) ->
      receive
          {Query, From} when Query =:= get_tc_id;
                             Query =:= ping_tc;
                             Query =:= ping_count ->
              From ! user_query_tc(Query, MyTC),
              user_loop(MyTC);
          new_tc ->
              pooler:return_pid(MyTC, ok),
              user_loop(pooler:take_pid());
          stop ->
              pooler:return_pid(MyTC, ok),
              stopped;
          crash ->
              erlang:error({user_loop, kaboom})
      end.

  % Map a user query tag onto the matching pooled_gs call for TC.
  user_query_tc(get_tc_id, TC) -> pooled_gs:get_id(TC);
  user_query_tc(ping_tc, TC) -> pooled_gs:ping(TC);
  user_query_tc(ping_count, TC) -> pooled_gs:ping_count(TC).
  45. % The `tc' processes represent the pids tracked by pooler for testing.
  46. % They have a type and an ID and can report their type and ID and
  47. % stop.
  % Message loop of a `tc' process identified by {Type, Id}.
  %
  %   {get_id, From} - reply to From with {ok, Type, Id} and keep looping
  %   stop           - finish, returning `stopped'
  %   crash          - raise error {tc_loop, kaboom}
  % Any other message is left in the mailbox.
  tc_loop(Identity = {Type, Id}) ->
      receive
          {get_id, From} ->
              From ! {ok, Type, Id},
              tc_loop(Identity);
          stop ->
              stopped;
          crash ->
              erlang:error({tc_loop, kaboom})
      end.
  % Ask the `tc' process Pid for its identity.
  %
  % Returns {Type, Id} taken from the {ok, Type, Id} reply, or the atom
  % `timeout' if no such reply arrives within 200 ms.
  get_tc_id(Pid) ->
      Caller = self(),
      erlang:send(Pid, {get_id, Caller}),
      receive
          {ok, Type, Id} -> {Type, Id}
      after 200 ->
          timeout
      end.
  % Ask the `tc' process Pid to stop. Asynchronous; returns the atom
  % that was sent.
  stop_tc(Pid) ->
      erlang:send(Pid, stop).
  % Start a `tc' process of the given Type, linked to the caller, and
  % return its pid. Each process gets a fresh reference as its Id.
  tc_starter(Type) ->
      Identity = {Type, make_ref()},
      spawn_link(fun() -> tc_loop(Identity) end).
  % Assert that the `tc' process Pid answers get_tc_id/1 with a
  % two-element tuple. A `timeout' answer fails the assertion.
  % Returns ok when the assertion holds.
  assert_tc_valid(Pid) ->
      ?assertMatch(
          {_Type, _Ref}, get_tc_id(Pid)
      ),
      ok.
  73. % tc_sanity_test() ->
  74. % Pid1 = tc_starter("1"),
  75. % {"1", Id1} = get_tc_id(Pid1),
  76. % Pid2 = tc_starter("1"),
  77. % {"1", Id2} = get_tc_id(Pid2),
  78. % ?assertNot(Id1 == Id2),
  79. % stop_tc(Pid1),
  80. % stop_tc(Pid2).
  81. % user_sanity_test() ->
  82. % Pid1 = tc_starter("1"),
  83. % User = spawn(fun() -> user_loop(Pid1) end),
  84. % ?assertMatch({"1", _Ref}, user_id(User)),
  85. % user_crash(User),
  86. % stop_tc(Pid1).
  % EUnit generator for the basic pooler behaviour.
  %
  % Every case runs against a freshly started pooler application with a
  % single pool "p1" (2 initial members, at most 3); the application is
  % stopped again after each case.
  pooler_basics_test_() ->
      {foreach, fun basics_setup/0, fun basics_cleanup/1, [
          {"take and return one", fun() ->
              P = pooler:take_pid(),
              ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
              ok = pooler:return_pid(P, ok)
          end},

          {"pids are created on demand until max", fun() ->
              Pids = [pooler:take_pid() || _ <- lists:seq(1, 3)],
              % the pool is exhausted now, and stays exhausted
              ?assertMatch(error_no_pids, pooler:take_pid()),
              ?assertMatch(error_no_pids, pooler:take_pid()),
              Ids = lists:map(fun pooled_gs:get_id/1, Pids),
              PRefs = [R || {_T, R} <- Ids],
              % no duplicates
              ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
          end},

          {"pids are reused most recent return first", fun() ->
              P1 = pooler:take_pid(),
              P2 = pooler:take_pid(),
              ?assertNot(P1 == P2),
              ok = pooler:return_pid(P1, ok),
              ok = pooler:return_pid(P2, ok),
              % P2 was returned last, so it must come back first
              ?assertEqual(P2, pooler:take_pid()),
              ?assertEqual(P1, pooler:take_pid())
          end},

          {"if a pid crashes it is replaced", fun() ->
              Pids0 = [pooler:take_pid() || _ <- lists:seq(1, 3)],
              Ids0 = lists:map(fun pooled_gs:get_id/1, Pids0),
              % crash them all
              lists:foreach(fun pooled_gs:crash/1, Pids0),
              Pids1 = get_n_pids(3, []),
              Ids1 = lists:map(fun pooled_gs:get_id/1, Pids1),
              lists:foreach(fun(I) -> ?assertNot(lists:member(I, Ids0)) end,
                            Ids1)
          end},

          {"if a pid is returned with bad status it is replaced", fun() ->
              Pids0 = [pooler:take_pid() || _ <- lists:seq(1, 3)],
              Ids0 = lists:map(fun pooled_gs:get_id/1, Pids0),
              % return them all marking as bad
              lists:foreach(fun(P) -> pooler:return_pid(P, fail) end, Pids0),
              Pids1 = get_n_pids(3, []),
              Ids1 = lists:map(fun pooled_gs:get_id/1, Pids1),
              lists:foreach(fun(I) -> ?assertNot(lists:member(I, Ids0)) end,
                            Ids1)
          end},

          {"if a consumer crashes, pid is replaced", fun() ->
              Consumer = start_user(),
              StartId = user_id(Consumer),
              user_crash(Consumer),
              [NewPid] = get_n_pids(1, []),
              NewId = pooled_gs:get_id(NewPid),
              ?assertNot(NewId == StartId)
          end}
      ]}.

  % Fixture setup for pooler_basics_test_/0: configure pool "p1" and
  % start the pooler application. The start result is returned as is.
  basics_setup() ->
      PoolConfig = [{name, "p1"},
                    {max_count, 3},
                    {init_count, 2},
                    {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}],
      application:set_env(pooler, pools, [PoolConfig]),
      application:start(pooler).

  % Fixture cleanup for pooler_basics_test_/0: stop the pooler
  % application. The value produced by the setup is not needed.
  basics_cleanup(_StartResult) ->
      application:stop(pooler).
  % EUnit generator for the user/pooler integration cases.
  %
  % Every case gets a fresh pooler application with one pool "p1" of
  % exactly 10 members and a list of 10 `user' processes created with
  % start_user/0. Afterwards the users are told to stop and the
  % application is stopped.
  pooler_integration_test_() ->
      {foreach, fun integration_setup/0, fun integration_cleanup/1, [
          fun(Users) ->
              fun() ->
                  % each user has a different tc ID
                  TcIds = lists:sort(lists:map(fun user_id/1, Users)),
                  ?assertEqual(lists:usort(TcIds), TcIds)
              end
          end,

          fun(Users) ->
              fun() ->
                  % users still unique after a renew cycle
                  lists:foreach(fun user_new_tc/1, Users),
                  TcIds = lists:sort(lists:map(fun user_id/1, Users)),
                  ?assertEqual(lists:usort(TcIds), TcIds)
              end
          end,

          fun(Users) ->
              fun() ->
                  % all users crash, pids are replaced
                  TcIds1 = lists:sort(lists:map(fun user_id/1, Users)),
                  lists:foreach(fun user_crash/1, Users),
                  Users2 = [start_user() || _X <- lists:seq(1, 5)],
                  TcIds2 = lists:sort(lists:map(fun user_id/1, Users2)),
                  % no id seen before the crash may show up afterwards
                  Common = sets:intersection(sets:from_list(TcIds1),
                                             sets:from_list(TcIds2)),
                  Both = sets:to_list(Common),
                  ?assertEqual([], Both)
              end
          end
      ]}.

  % Fixture setup for pooler_integration_test_/0: configure pool "p1",
  % start pooler and return the pids of 10 newly started users.
  integration_setup() ->
      PoolConfig = [{name, "p1"},
                    {max_count, 10},
                    {init_count, 10},
                    {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}],
      application:set_env(pooler, pools, [PoolConfig]),
      application:start(pooler),
      [start_user() || _X <- lists:seq(1, 10)].

  % Fixture cleanup for pooler_integration_test_/0: send `stop' to every
  % user created by the setup, then stop the pooler application.
  integration_cleanup(Users) ->
      lists:foreach(fun user_stop/1, Users),
      application:stop(pooler).
  % Testing crash recovery means racing against pids that have not yet
  % crashed and against a pooler that has not yet recovered. This helper
  % therefore keeps calling pooler:take_pid/0 forever, retrying at once
  % whenever it answers error_no_pids, until N pids have been obtained.
  %
  % The pids are prepended to Acc, so the most recently taken pid comes
  % first in the result. With N = 0, Acc is returned untouched.
  get_n_pids(0, Acc) ->
      Acc;
  get_n_pids(N, Acc) ->
      keep_taken_pid(pooler:take_pid(), N, Acc).

  % Continue get_n_pids/2 with the outcome of one take_pid/0 attempt:
  % an empty pool leaves the count unchanged, a pid is collected.
  keep_taken_pid(error_no_pids, N, Acc) ->
      get_n_pids(N, Acc);
  keep_taken_pid(Pid, N, Acc) ->
      get_n_pids(N - 1, [Pid | Acc]).
  228. end.