pooler_test.erl 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. -module(pooler_test).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -compile([export_all]).
  4. % The `user' processes represent users of the pooler library. A user
  5. % process will take a pid, report details on the pid it has, release
  6. % and take a new pid, stop cleanly, and crash.
  % Spawn an unlinked user process that enters user_loop/1 in the `start'
  % state. Returns the pid of the new process.
  start_user() ->
      Loop = fun() -> user_loop(start) end,
      spawn(Loop).
  % Ask the user process Pid which member it currently holds.
  % Sends {get_tc_id, self()} and then blocks, without a timeout, until any
  % two-element tuple arrives in the caller's mailbox; that tuple is returned
  % unchanged.
  user_id(Pid) ->
      Request = {get_tc_id, self()},
      Pid ! Request,
      receive
          {_Type, _Id} = Reply ->
              Reply
      end.
  % Send `new_tc' to the user process Pid, which makes user_loop/1 return its
  % current member and take another one. Returns the message that was sent.
  user_new_tc(Pid) ->
      erlang:send(Pid, new_tc).
  % Send `stop' to the user process Pid, asking it to return its member and
  % finish. Returns the message that was sent.
  user_stop(Pid) ->
      erlang:send(Pid, stop).
  % Send `crash' to the user process Pid, which makes user_loop/1 raise an
  % error. Returns the message that was sent.
  user_crash(Pid) ->
      erlang:send(Pid, crash).
  % Main loop of a user process.
  %
  % Called with `start' or `error_no_members' it calls pooler:take_member()
  % and loops on the result, so it keeps retrying for as long as the pool
  % answers error_no_members. Any other argument is treated as the member the
  % user currently holds, and the process then serves these messages:
  %   {get_tc_id, From}  - reply with pooled_gs:get_id(Member)
  %   {ping_tc, From}    - reply with pooled_gs:ping(Member)
  %   {ping_count, From} - reply with pooled_gs:ping_count(Member)
  %   new_tc             - return the member with status ok and take a new one
  %   stop               - return the member with status ok and finish
  %   crash              - raise {user_loop, kaboom} without returning the member
  user_loop(start) ->
      user_loop(pooler:take_member());
  user_loop(error_no_members) ->
      user_loop(pooler:take_member());
  user_loop(Member) ->
      receive
          {get_tc_id, Caller} ->
              Caller ! pooled_gs:get_id(Member),
              user_loop(Member);
          {ping_tc, Caller} ->
              Caller ! pooled_gs:ping(Member),
              user_loop(Member);
          {ping_count, Caller} ->
              Caller ! pooled_gs:ping_count(Member),
              user_loop(Member);
          new_tc ->
              pooler:return_member(Member, ok),
              Replacement = pooler:take_member(),
              user_loop(Replacement);
          stop ->
              pooler:return_member(Member, ok),
              stopped;
          crash ->
              erlang:error({user_loop, kaboom})
      end.
  44. % The `tc' processes represent the pids tracked by pooler for testing.
  45. % They have a type and an ID and can report their type and ID and
  46. % stop.
  % Receive loop of a test `tc' process identified by {Type, Id}.
  %   {get_id, From} - reply {ok, Type, Id} to From and keep looping
  %   stop           - leave the loop, returning `stopped'
  %   crash          - raise {tc_loop, kaboom}
  tc_loop({Type, Id} = Identity) ->
      receive
          {get_id, Caller} ->
              Caller ! {ok, Type, Id},
              tc_loop(Identity);
          stop ->
              stopped;
          crash ->
              erlang:error({tc_loop, kaboom})
      end.
  % Ask the tc process Pid for its identity.
  % Returns {Type, Id} taken from an {ok, Type, Id} reply, or the atom
  % `timeout' when no such reply arrives within 200 ms.
  get_tc_id(Pid) ->
      Request = {get_id, self()},
      erlang:send(Pid, Request),
      WaitMs = 200,
      receive
          {ok, Type, Id} ->
              {Type, Id}
      after WaitMs ->
          timeout
      end.
  % Send `stop' to the tc process Pid so that tc_loop/1 finishes.
  % Returns the message that was sent.
  stop_tc(Pid) ->
      erlang:send(Pid, stop).
  % Start a tc process of the given Type, linked to the caller, whose id is a
  % reference created by the caller before spawning. Returns the new pid.
  tc_starter(Type) ->
      Id = make_ref(),
      Loop = fun() -> tc_loop({Type, Id}) end,
      spawn_link(Loop).
  % Assert that the tc process Pid answers get_tc_id/1 with a two-element
  % tuple; a `timeout' answer therefore fails the assertion.
  % Returns ok when the assertion holds.
  assert_tc_valid(Pid) ->
      ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
      ok.
  72. % tc_sanity_test() ->
  73. % Pid1 = tc_starter("1"),
  74. % {"1", Id1} = get_tc_id(Pid1),
  75. % Pid2 = tc_starter("1"),
  76. % {"1", Id2} = get_tc_id(Pid2),
  77. % ?assertNot(Id1 == Id2),
  78. % stop_tc(Pid1),
  79. % stop_tc(Pid2).
  80. % user_sanity_test() ->
  81. % Pid1 = tc_starter("1"),
  82. % User = spawn(fun() -> user_loop(Pid1) end),
  83. % ?assertMatch({"1", _Ref}, user_id(User)),
  84. % user_crash(User),
  85. % stop_tc(Pid1).
  % EUnit generator covering basic behaviour of a single pool.
  %
  % Every test runs inside a `foreach' fixture: setup configures one pool
  % named "p1" (init_count 2, max_count 3, members started through
  % pooled_gs:start_link/1) and starts the pooler application; cleanup stops
  % the application again.
  pooler_basics_test_() ->
      Setup =
          fun() ->
              PoolConfig = [{name, "p1"},
                            {max_count, 3},
                            {init_count, 2},
                            {start_mfa,
                             {pooled_gs, start_link, [{"type-0"}]}}],
              application:set_env(pooler, pools, [PoolConfig]),
              % NOTE(review): presumably removed so that the crash reports
              % these tests provoke do not flood the console -- confirm.
              error_logger:delete_report_handler(error_logger_tty_h),
              application:start(pooler)
          end,
      Cleanup =
          fun(_StartResult) ->
              application:stop(pooler)
          end,
      % Takes max_count (3) members, one after the other.
      TakeThree =
          fun() ->
              [ pooler:take_member() || _ <- lists:seq(1, 3) ]
          end,
      Tests =
          [
           {"there are init_count members at start",
            fun() ->
                Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
                ?assertEqual(2, length(Stats))
            end},

           {"take and return one",
            fun() ->
                P = pooler:take_member(),
                ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
                ok = pooler:return_member(P, ok)
            end},

           {"pids are created on demand until max",
            fun() ->
                Pids = TakeThree(),
                ?assertMatch(error_no_members, pooler:take_member()),
                ?assertMatch(error_no_members, pooler:take_member()),
                AllIds = [ pooled_gs:get_id(P) || P <- Pids ],
                PRefs = [ R || {_T, R} <- AllIds ],
                % no duplicates
                ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
            end},

           {"pids are reused most recent return first",
            fun() ->
                P1 = pooler:take_member(),
                P2 = pooler:take_member(),
                ?assertNot(P1 == P2),
                ok = pooler:return_member(P1, ok),
                ok = pooler:return_member(P2, ok),
                % P2 went back last, so it must come out first
                ?assertEqual(P2, pooler:take_member()),
                ?assertEqual(P1, pooler:take_member())
            end},

           {"if an in-use pid crashes it is replaced",
            fun() ->
                Pids0 = TakeThree(),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % crash them all
                lists:foreach(fun(P) -> pooled_gs:crash(P) end, Pids0),
                Pids1 = get_n_pids(3, []),
                Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                lists:foreach(fun(I) ->
                                      ?assertNot(lists:member(I, Ids0))
                              end, Ids1)
            end},

           {"if a free pid crashes it is replaced",
            fun() ->
                FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
                lists:foreach(fun(P) -> exit(P, kill) end, FreePids),
                Pids1 = get_n_pids(3, []),
                ?assertEqual(3, length(Pids1))
            end},

           {"if a pid is returned with bad status it is replaced",
            fun() ->
                Pids0 = TakeThree(),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % return them all marking as bad
                lists:foreach(fun(P) -> pooler:return_member(P, fail) end,
                              Pids0),
                Pids1 = get_n_pids(3, []),
                Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                lists:foreach(fun(I) ->
                                      ?assertNot(lists:member(I, Ids0))
                              end, Ids1)
            end},

           {"if a consumer crashes, pid is replaced",
            fun() ->
                Consumer = start_user(),
                StartId = user_id(Consumer),
                user_crash(Consumer),
                [NewPid] = get_n_pids(1, []),
                NewId = pooled_gs:get_id(NewPid),
                ?assertNot(NewId == StartId)
            end}
          ],
      {foreach, Setup, Cleanup, Tests}.
  % EUnit generator exercising the pool through concurrent user processes.
  %
  % Setup configures pool "p1" with init_count and max_count both 10, starts
  % the pooler application and spawns 10 user processes, whose pids are handed
  % to each test. Cleanup sends `stop' to those users and stops the
  % application.
  pooler_integration_test_() ->
      Setup =
          fun() ->
              PoolConfig = [{name, "p1"},
                            {max_count, 10},
                            {init_count, 10},
                            {start_mfa,
                             {pooled_gs, start_link, [{"type-0"}]}}],
              application:set_env(pooler, pools, [PoolConfig]),
              error_logger:delete_report_handler(error_logger_tty_h),
              application:start(pooler),
              [ start_user() || _X <- lists:seq(1, 10) ]
          end,
      Cleanup =
          fun(Users) ->
              lists:foreach(fun(U) -> user_stop(U) end, Users),
              application:stop(pooler)
          end,
      % Sorted list of the member ids currently reported by the given users.
      SortedIds =
          fun(UserPids) ->
              lists:sort([ user_id(UPid) || UPid <- UserPids ])
          end,
      Instantiators =
          [
           fun(Users) ->
                   fun() ->
                           % each user has a different tc ID
                           TcIds = SortedIds(Users),
                           ?assertEqual(lists:usort(TcIds), TcIds)
                   end
           end,

           fun(Users) ->
                   fun() ->
                           % users still unique after a renew cycle
                           lists:foreach(fun(UPid) -> user_new_tc(UPid) end,
                                         Users),
                           TcIds = SortedIds(Users),
                           ?assertEqual(lists:usort(TcIds), TcIds)
                   end
           end,

           fun(Users) ->
                   fun() ->
                           % all users crash, pids are replaced
                           TcIds1 = SortedIds(Users),
                           lists:foreach(fun(UPid) -> user_crash(UPid) end,
                                         Users),
                           Users2 = [ start_user() || _X <- lists:seq(1, 5) ],
                           TcIds2 = SortedIds(Users2),
                           Before = sets:from_list(TcIds1),
                           After = sets:from_list(TcIds2),
                           Both =
                               sets:to_list(sets:intersection([Before, After])),
                           ?assertEqual([], Both)
                   end
           end
          ],
      {foreach, Setup, Cleanup, Instantiators}.
  231. % testing crash recovery means race conditions when either pids
  232. % haven't yet crashed or pooler hasn't recovered. So this helper loops
  233. % forever until N pids are obtained, ignoring error_no_members.
  % Take N members from the pool, prepending each one to Acc.
  % An error_no_members answer is ignored and the take is retried right away,
  % with no limit on the number of retries. Returns the accumulated list, most
  % recently taken member first.
  get_n_pids(0, Pids) ->
      Pids;
  get_n_pids(Remaining, Pids) ->
      get_n_pids_step(pooler:take_member(), Remaining, Pids).

  % Fold one take_member/0 result into the accumulator and continue.
  get_n_pids_step(error_no_members, Remaining, Pids) ->
      get_n_pids(Remaining, Pids);
  get_n_pids_step(Member, Remaining, Pids) ->
      get_n_pids(Remaining - 1, [Member | Pids]).