pooler_test.erl 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. -module(pooler_test).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -compile([export_all]).
  4. % The `user' processes represent users of the pooler library. A user
  5. % process will take a pid, report details on the pid it has, release
  6. % and take a new pid, stop cleanly, and crash.
  7. start_user() ->
  8. spawn(fun() -> user_loop(start) end).
  9. user_id(Pid) ->
  10. Pid ! {get_tc_id, self()},
  11. receive
  12. {Type, Id} ->
  13. {Type, Id}
  14. end.
  15. user_new_tc(Pid) ->
  16. Pid ! new_tc.
  17. user_stop(Pid) ->
  18. Pid ! stop.
  19. user_crash(Pid) ->
  20. Pid ! crash.
  21. user_loop(Atom) when Atom =:= error_no_pids orelse Atom =:= start ->
  22. user_loop(pooler:take_member());
  23. user_loop(MyTC) ->
  24. receive
  25. {get_tc_id, From} ->
  26. From ! pooled_gs:get_id(MyTC),
  27. user_loop(MyTC);
  28. {ping_tc, From} ->
  29. From ! pooled_gs:ping(MyTC),
  30. user_loop(MyTC);
  31. {ping_count, From} ->
  32. From ! pooled_gs:ping_count(MyTC),
  33. user_loop(MyTC);
  34. new_tc ->
  35. pooler:return_member(MyTC, ok),
  36. MyNewTC = pooler:take_member(),
  37. user_loop(MyNewTC);
  38. stop ->
  39. pooler:return_member(MyTC, ok),
  40. stopped;
  41. crash ->
  42. erlang:error({user_loop, kaboom})
  43. end.
  44. % The `tc' processes represent the pids tracked by pooler for testing.
  45. % They have a type and an ID and can report their type and ID and
  46. % stop.
  47. tc_loop({Type, Id}) ->
  48. receive
  49. {get_id, From} ->
  50. From ! {ok, Type, Id},
  51. tc_loop({Type, Id});
  52. stop -> stopped;
  53. crash ->
  54. erlang:error({tc_loop, kaboom})
  55. end.
  56. get_tc_id(Pid) ->
  57. Pid ! {get_id, self()},
  58. receive
  59. {ok, Type, Id} ->
  60. {Type, Id}
  61. after 200 ->
  62. timeout
  63. end.
  64. stop_tc(Pid) ->
  65. Pid ! stop.
  66. tc_starter(Type) ->
  67. Ref = make_ref(),
  68. spawn_link(fun() -> tc_loop({Type, Ref}) end).
  69. assert_tc_valid(Pid) ->
  70. ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
  71. ok.
  72. % tc_sanity_test() ->
  73. % Pid1 = tc_starter("1"),
  74. % {"1", Id1} = get_tc_id(Pid1),
  75. % Pid2 = tc_starter("1"),
  76. % {"1", Id2} = get_tc_id(Pid2),
  77. % ?assertNot(Id1 == Id2),
  78. % stop_tc(Pid1),
  79. % stop_tc(Pid2).
  80. % user_sanity_test() ->
  81. % Pid1 = tc_starter("1"),
  82. % User = spawn(fun() -> user_loop(Pid1) end),
  83. % ?assertMatch({"1", _Ref}, user_id(User)),
  84. % user_crash(User),
  85. % stop_tc(Pid1).
  86. pooler_basics_test_() ->
  87. {foreach,
  88. % setup
  89. fun() ->
  90. Pools = [[{name, "p1"},
  91. {max_count, 3},
  92. {init_count, 2},
  93. {start_mfa,
  94. {pooled_gs, start_link, [{"type-0"}]}}]],
  95. application:set_env(pooler, pools, Pools),
  96. application:start(pooler)
  97. end,
  98. fun(_X) ->
  99. application:stop(pooler)
  100. end,
  101. [
  102. {"take and return one",
  103. fun() ->
  104. P = pooler:take_member(),
  105. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  106. ok = pooler:return_member(P, ok)
  107. end},
  108. {"pids are created on demand until max",
  109. fun() ->
  110. Pids = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
  111. ?assertMatch(error_no_pids, pooler:take_member()),
  112. ?assertMatch(error_no_pids, pooler:take_member()),
  113. PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
  114. % no duplicates
  115. ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
  116. end
  117. },
  118. {"pids are reused most recent return first",
  119. fun() ->
  120. P1 = pooler:take_member(),
  121. P2 = pooler:take_member(),
  122. ?assertNot(P1 == P2),
  123. ok = pooler:return_member(P1, ok),
  124. ok = pooler:return_member(P2, ok),
  125. % pids are reused most recent first
  126. ?assertEqual(P2, pooler:take_member()),
  127. ?assertEqual(P1, pooler:take_member())
  128. end},
  129. {"if a pid crashes it is replaced",
  130. fun() ->
  131. Pids0 = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
  132. Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
  133. % crash them all
  134. [ pooled_gs:crash(P) || P <- Pids0 ],
  135. Pids1 = get_n_pids(3, []),
  136. Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
  137. [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
  138. end
  139. },
  140. {"if a pid is returned with bad status it is replaced",
  141. fun() ->
  142. Pids0 = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
  143. Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
  144. % return them all marking as bad
  145. [ pooler:return_member(P, fail) || P <- Pids0 ],
  146. Pids1 = get_n_pids(3, []),
  147. Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
  148. [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
  149. end
  150. },
  151. {"if a consumer crashes, pid is replaced",
  152. fun() ->
  153. Consumer = start_user(),
  154. StartId = user_id(Consumer),
  155. user_crash(Consumer),
  156. NewPid = hd(get_n_pids(1, [])),
  157. NewId = pooled_gs:get_id(NewPid),
  158. ?assertNot(NewId == StartId)
  159. end
  160. }
  161. ]}.
  162. pooler_integration_test_() ->
  163. {foreach,
  164. % setup
  165. fun() ->
  166. Pools = [[{name, "p1"},
  167. {max_count, 10},
  168. {init_count, 10},
  169. {start_mfa,
  170. {pooled_gs, start_link, [{"type-0"}]}}]],
  171. application:set_env(pooler, pools, Pools),
  172. application:start(pooler),
  173. Users = [ start_user() || _X <- lists:seq(1, 10) ],
  174. Users
  175. end,
  176. % cleanup
  177. fun(Users) ->
  178. [ user_stop(U) || U <- Users ],
  179. application:stop(pooler)
  180. end,
  181. %
  182. [
  183. fun(Users) ->
  184. fun() ->
  185. % each user has a different tc ID
  186. TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
  187. ?assertEqual(lists:usort(TcIds), TcIds)
  188. end
  189. end
  190. ,
  191. fun(Users) ->
  192. fun() ->
  193. % users still unique after a renew cycle
  194. [ user_new_tc(UPid) || UPid <- Users ],
  195. TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
  196. ?assertEqual(lists:usort(TcIds), TcIds)
  197. end
  198. end
  199. ,
  200. fun(Users) ->
  201. fun() ->
  202. % all users crash, pids are replaced
  203. TcIds1 = lists:sort([ user_id(UPid) || UPid <- Users ]),
  204. [ user_crash(UPid) || UPid <- Users ],
  205. Seq = lists:seq(1, 5),
  206. Users2 = [ start_user() || _X <- Seq ],
  207. TcIds2 = lists:sort([ user_id(UPid) || UPid <- Users2 ]),
  208. Both =
  209. sets:to_list(sets:intersection([sets:from_list(TcIds1),
  210. sets:from_list(TcIds2)])),
  211. ?assertEqual([], Both)
  212. end
  213. end
  214. ]
  215. }.
  216. % testing crash recovery means race conditions when either pids
  217. % haven't yet crashed or pooler hasn't recovered. So this helper loops
  218. % forver until N pids are obtained, ignoring error_no_pids.
  219. get_n_pids(0, Acc) ->
  220. Acc;
  221. get_n_pids(N, Acc) ->
  222. case pooler:take_member() of
  223. error_no_pids ->
  224. get_n_pids(N, Acc);
  225. Pid ->
  226. get_n_pids(N - 1, [Pid|Acc])
  227. end.