pooler_test.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. -module(pooler_test).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -compile([export_all]).
  4. % The `user' processes represent users of the pooler library. A user
  5. % process will take a pid, report details on the pid it has, release
  6. % and take a new pid, stop cleanly, and crash.
  7. start_user() ->
  8. spawn(fun() -> user_loop(start) end).
  9. user_id(Pid) ->
  10. Pid ! {get_tc_id, self()},
  11. receive
  12. {Type, Id} ->
  13. {Type, Id}
  14. end.
  15. user_new_tc(Pid) ->
  16. Pid ! new_tc.
  17. user_stop(Pid) ->
  18. Pid ! stop.
  19. user_crash(Pid) ->
  20. Pid ! crash.
  21. user_loop(Atom) when Atom =:= error_no_members orelse Atom =:= start ->
  22. user_loop(pooler:take_member());
  23. user_loop(MyTC) ->
  24. receive
  25. {get_tc_id, From} ->
  26. From ! pooled_gs:get_id(MyTC),
  27. user_loop(MyTC);
  28. {ping_tc, From} ->
  29. From ! pooled_gs:ping(MyTC),
  30. user_loop(MyTC);
  31. {ping_count, From} ->
  32. From ! pooled_gs:ping_count(MyTC),
  33. user_loop(MyTC);
  34. new_tc ->
  35. pooler:return_member(MyTC, ok),
  36. MyNewTC = pooler:take_member(),
  37. user_loop(MyNewTC);
  38. stop ->
  39. pooler:return_member(MyTC, ok),
  40. stopped;
  41. crash ->
  42. erlang:error({user_loop, kaboom})
  43. end.
  44. % The `tc' processes represent the pids tracked by pooler for testing.
  45. % They have a type and an ID and can report their type and ID and
  46. % stop.
  47. tc_loop({Type, Id}) ->
  48. receive
  49. {get_id, From} ->
  50. From ! {ok, Type, Id},
  51. tc_loop({Type, Id});
  52. stop -> stopped;
  53. crash ->
  54. erlang:error({tc_loop, kaboom})
  55. end.
  56. get_tc_id(Pid) ->
  57. Pid ! {get_id, self()},
  58. receive
  59. {ok, Type, Id} ->
  60. {Type, Id}
  61. after 200 ->
  62. timeout
  63. end.
  64. stop_tc(Pid) ->
  65. Pid ! stop.
  66. tc_starter(Type) ->
  67. Ref = make_ref(),
  68. spawn_link(fun() -> tc_loop({Type, Ref}) end).
  69. assert_tc_valid(Pid) ->
  70. ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
  71. ok.
  72. % tc_sanity_test() ->
  73. % Pid1 = tc_starter("1"),
  74. % {"1", Id1} = get_tc_id(Pid1),
  75. % Pid2 = tc_starter("1"),
  76. % {"1", Id2} = get_tc_id(Pid2),
  77. % ?assertNot(Id1 == Id2),
  78. % stop_tc(Pid1),
  79. % stop_tc(Pid2).
  80. % user_sanity_test() ->
  81. % Pid1 = tc_starter("1"),
  82. % User = spawn(fun() -> user_loop(Pid1) end),
  83. % ?assertMatch({"1", _Ref}, user_id(User)),
  84. % user_crash(User),
  85. % stop_tc(Pid1).
  86. pooler_basics_test_() ->
  87. {setup,
  88. fun() ->
  89. application:set_env(pooler, metrics_module, fake_metrics),
  90. fake_metrics:start_link()
  91. end,
  92. fun(_X) ->
  93. fake_metrics:stop()
  94. end,
  95. {foreach,
  96. % setup
  97. fun() ->
  98. Pools = [[{name, "p1"},
  99. {max_count, 3},
  100. {init_count, 2},
  101. {start_mfa,
  102. {pooled_gs, start_link, [{"type-0"}]}}]],
  103. application:set_env(pooler, pools, Pools),
  104. error_logger:delete_report_handler(error_logger_tty_h),
  105. application:start(pooler)
  106. end,
  107. fun(_X) ->
  108. application:stop(pooler)
  109. end,
  110. [
  111. {"there are init_count members at start",
  112. fun() ->
  113. Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
  114. ?assertEqual(2, length(Stats))
  115. end},
  116. {"take and return one",
  117. fun() ->
  118. P = pooler:take_member(),
  119. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  120. ok = pooler:return_member(P, ok)
  121. end},
  122. {"take and return one, named pool",
  123. fun() ->
  124. P = pooler:take_member("p1"),
  125. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  126. ok, pooler:return_member(P)
  127. end},
  128. {"attempt to take form unknown pool",
  129. fun() ->
  130. ?assertEqual(error_no_pool, pooler:take_member("bad_pool_name"))
  131. end},
  132. {"pids are created on demand until max",
  133. fun() ->
  134. Pids = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
  135. ?assertMatch(error_no_members, pooler:take_member()),
  136. ?assertMatch(error_no_members, pooler:take_member()),
  137. PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
  138. % no duplicates
  139. ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
  140. end
  141. },
  142. {"pids are reused most recent return first",
  143. fun() ->
  144. P1 = pooler:take_member(),
  145. P2 = pooler:take_member(),
  146. ?assertNot(P1 == P2),
  147. ok = pooler:return_member(P1, ok),
  148. ok = pooler:return_member(P2, ok),
  149. % pids are reused most recent first
  150. ?assertEqual(P2, pooler:take_member()),
  151. ?assertEqual(P1, pooler:take_member())
  152. end},
  153. {"if an in-use pid crashes it is replaced",
  154. fun() ->
  155. Pids0 = [pooler:take_member(), pooler:take_member(),
  156. pooler:take_member()],
  157. Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
  158. % crash them all
  159. [ pooled_gs:crash(P) || P <- Pids0 ],
  160. Pids1 = get_n_pids(3, []),
  161. Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
  162. [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
  163. end
  164. },
  165. {"if a free pid crashes it is replaced",
  166. fun() ->
  167. FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
  168. [ exit(P, kill) || P <- FreePids ],
  169. Pids1 = get_n_pids(3, []),
  170. ?assertEqual(3, length(Pids1))
  171. end},
  172. {"if a pid is returned with bad status it is replaced",
  173. fun() ->
  174. Pids0 = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
  175. Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
  176. % return them all marking as bad
  177. [ pooler:return_member(P, fail) || P <- Pids0 ],
  178. Pids1 = get_n_pids(3, []),
  179. Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
  180. [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
  181. end
  182. },
  183. {"if a consumer crashes, pid is replaced",
  184. fun() ->
  185. Consumer = start_user(),
  186. StartId = user_id(Consumer),
  187. user_crash(Consumer),
  188. NewPid = hd(get_n_pids(1, [])),
  189. NewId = pooled_gs:get_id(NewPid),
  190. ?assertNot(NewId == StartId)
  191. end
  192. },
  193. {"it is ok to return an unknown pid",
  194. fun() ->
  195. Bogus1 = spawn(fun() -> ok end),
  196. Bogus2 = spawn(fun() -> ok end),
  197. ?assertEqual(ok, pooler:return_member(Bogus1, ok)),
  198. ?assertEqual(ok, pooler:return_member(Bogus2, fail))
  199. end
  200. },
  201. {"calling return_member on error_no_members is ignored",
  202. fun() ->
  203. ?assertEqual(ok, pooler:return_member(error_no_members)),
  204. ?assertEqual(ok, pooler:return_member(error_no_members, ok)),
  205. ?assertEqual(ok, pooler:return_member(error_no_members, fail))
  206. end
  207. },
  208. {"cull_pool can be called and do nothing",
  209. %% FIXME: this exercises the code path, but doesn't test anything
  210. fun() ->
  211. ?assertEqual(ok, pooler:cull_pool("p1", 10))
  212. end
  213. },
  214. {"cull_pool culls unused members",
  215. fun() ->
  216. %% take all
  217. [P1, P2, _P3] = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
  218. %% return one
  219. pooler:return_member(P1),
  220. pooler:return_member(P2),
  221. %% call a sync action since return_member is async
  222. _Ignore = pooler:pool_stats(),
  223. ?assertEqual(ok, pooler:cull_pool("p1", 0)),
  224. ?assertEqual(2, length(pooler:pool_stats()))
  225. end
  226. },
  227. {"metrics have been called",
  228. fun() ->
  229. %% exercise the API to ensure we have certain keys reported as metrics
  230. fake_metrics:reset_metrics(),
  231. Pids = [ pooler:take_member() || _I <- lists:seq(1, 10) ],
  232. [ pooler:return_member(P) || P <- Pids ],
  233. pooler:take_member("bad_pool_name"),
  234. %% kill and unused member
  235. exit(hd(Pids), kill),
  236. %% kill a used member
  237. KillMe = pooler:take_member("p1"),
  238. exit(KillMe, kill),
  239. %% FIXME: We need to wait for pooler to process the
  240. %% exit message. This is ugly, will fix later.
  241. timer:sleep(200), % :(
  242. ExpectKeys = [<<"pooler.error_no_members_count">>,
  243. <<"pooler.events">>,
  244. <<"pooler.killed_free_count">>,
  245. <<"pooler.killed_in_use_count">>,
  246. <<"pooler.p1.free_count">>,
  247. <<"pooler.p1.in_use_count">>,
  248. <<"pooler.p1.take_rate">>],
  249. Metrics = fake_metrics:get_metrics(),
  250. GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
  251. ?assertEqual(ExpectKeys, GotKeys)
  252. end}
  253. ]}}.
  254. pooler_integration_test_() ->
  255. {foreach,
  256. % setup
  257. fun() ->
  258. Pools = [[{name, "p1"},
  259. {max_count, 10},
  260. {init_count, 10},
  261. {start_mfa,
  262. {pooled_gs, start_link, [{"type-0"}]}}]],
  263. application:set_env(pooler, pools, Pools),
  264. error_logger:delete_report_handler(error_logger_tty_h),
  265. application:start(pooler),
  266. Users = [ start_user() || _X <- lists:seq(1, 10) ],
  267. Users
  268. end,
  269. % cleanup
  270. fun(Users) ->
  271. [ user_stop(U) || U <- Users ],
  272. application:stop(pooler)
  273. end,
  274. %
  275. [
  276. fun(Users) ->
  277. fun() ->
  278. % each user has a different tc ID
  279. TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
  280. ?assertEqual(lists:usort(TcIds), TcIds)
  281. end
  282. end
  283. ,
  284. fun(Users) ->
  285. fun() ->
  286. % users still unique after a renew cycle
  287. [ user_new_tc(UPid) || UPid <- Users ],
  288. TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
  289. ?assertEqual(lists:usort(TcIds), TcIds)
  290. end
  291. end
  292. ,
  293. fun(Users) ->
  294. fun() ->
  295. % all users crash, pids are replaced
  296. TcIds1 = lists:sort([ user_id(UPid) || UPid <- Users ]),
  297. [ user_crash(UPid) || UPid <- Users ],
  298. Seq = lists:seq(1, 5),
  299. Users2 = [ start_user() || _X <- Seq ],
  300. TcIds2 = lists:sort([ user_id(UPid) || UPid <- Users2 ]),
  301. Both =
  302. sets:to_list(sets:intersection([sets:from_list(TcIds1),
  303. sets:from_list(TcIds2)])),
  304. ?assertEqual([], Both)
  305. end
  306. end
  307. ]
  308. }.
  309. % testing crash recovery means race conditions when either pids
  310. % haven't yet crashed or pooler hasn't recovered. So this helper loops
  311. % forver until N pids are obtained, ignoring error_no_members.
  312. get_n_pids(0, Acc) ->
  313. Acc;
  314. get_n_pids(N, Acc) ->
  315. case pooler:take_member() of
  316. error_no_members ->
  317. get_n_pids(N, Acc);
  318. Pid ->
  319. get_n_pids(N - 1, [Pid|Acc])
  320. end.