%% pooler_test.erl -- EUnit tests for the pooler application.
  1. -module(pooler_test).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -compile([export_all]).
  4. % The `user' processes represent users of the pooler library. A user
  5. % process will take a pid, report details on the pid it has, release
  6. % and take a new pid, stop cleanly, and crash.
  %% Start a simulated pool consumer and return its pid.
  %%
  %% The process is spawned unlinked and enters user_loop/1 in the `start'
  %% state, so its first action is to take a member from pooler.
  start_user() ->
      InitialState = start,
      spawn(fun() -> user_loop(InitialState) end).
  %% Ask the user process `Pid' for the identity of the member it holds.
  %%
  %% Sends `{get_tc_id, self()}' and returns the first two-element tuple
  %% found in the caller's mailbox. There is no timeout: the call blocks
  %% until such a tuple arrives.
  user_id(Pid) ->
      Pid ! {get_tc_id, self()},
      receive
          {_Type, _Id} = Reply ->
              Reply
      end.
  %% Tell the user process `Pid' to return its current member and take a
  %% new one. Asynchronous; returns the message that was sent (`new_tc').
  user_new_tc(Pid) ->
      erlang:send(Pid, new_tc).
  %% Tell the user process `Pid' to return its member and exit normally.
  %% Asynchronous; returns the message that was sent (`stop').
  user_stop(Pid) ->
      erlang:send(Pid, stop).
  %% Tell the user process `Pid' to crash while still holding its member.
  %% Asynchronous; returns the message that was sent (`crash').
  user_crash(Pid) ->
      erlang:send(Pid, crash).
  %% Main loop of a simulated pool consumer.
  %%
  %% Called with `start' or `error_no_members', the process has no member
  %% yet and (re)tries pooler:take_member/0 until it gets one. Otherwise
  %% the argument is the member currently held and the loop serves:
  %%   {get_tc_id, From}  - reply with pooled_gs:get_id/1 of the member
  %%   {ping_tc, From}    - reply with pooled_gs:ping/1 of the member
  %%   {ping_count, From} - reply with pooled_gs:ping_count/1 of the member
  %%   new_tc             - return the member with status ok, take another
  %%   stop               - return the member with status ok, return `stopped'
  %%   crash              - raise error {user_loop, kaboom}
  user_loop(start) ->
      user_loop(pooler:take_member());
  user_loop(error_no_members) ->
      user_loop(pooler:take_member());
  user_loop(MyTC) ->
      receive
          {get_tc_id, Caller} ->
              Id = pooled_gs:get_id(MyTC),
              Caller ! Id,
              user_loop(MyTC);
          {ping_tc, Caller} ->
              Pong = pooled_gs:ping(MyTC),
              Caller ! Pong,
              user_loop(MyTC);
          {ping_count, Caller} ->
              Count = pooled_gs:ping_count(MyTC),
              Caller ! Count,
              user_loop(MyTC);
          new_tc ->
              pooler:return_member(MyTC, ok),
              user_loop(pooler:take_member());
          stop ->
              pooler:return_member(MyTC, ok),
              stopped;
          crash ->
              erlang:error({user_loop, kaboom})
      end.
  44. % The `tc' processes represent the pids tracked by pooler for testing.
  45. % They have a type and an ID and can report their type and ID and
  46. % stop.
  %% Receive loop of a simple test member identified by `{Type, Id}'.
  %%
  %%   {get_id, From} - reply `{ok, Type, Id}' to From and keep looping
  %%   stop           - leave the loop, returning `stopped'
  %%   crash          - raise error {tc_loop, kaboom}
  tc_loop({Type, Id} = State) ->
      receive
          {get_id, Caller} ->
              Caller ! {ok, Type, Id},
              tc_loop(State);
          stop ->
              stopped;
          crash ->
              erlang:error({tc_loop, kaboom})
      end.
  %% Query a tc process for its identity.
  %%
  %% Sends `{get_id, self()}' to `Pid' and returns `{Type, Id}' taken from
  %% an `{ok, Type, Id}' reply, or the atom `timeout' if no such reply
  %% arrives within 200 ms.
  get_tc_id(Pid) ->
      Pid ! {get_id, self()},
      receive
          {ok, Type, Id} -> {Type, Id}
      after 200 ->
          timeout
      end.
  %% Ask the tc process `Pid' to stop. Asynchronous; returns the message
  %% that was sent (`stop').
  stop_tc(Pid) ->
      erlang:send(Pid, stop).
  %% Start a linked tc process of the given `Type' and return its pid.
  %% The process id part of its identity is a fresh reference created in
  %% the caller before spawning.
  tc_starter(Type) ->
      Identity = {Type, make_ref()},
      spawn_link(fun() -> tc_loop(Identity) end).
  %% Assert that `Pid' answers an identity query with a two-element tuple.
  %% A `timeout' result from get_tc_id/1 fails the assertion. Returns `ok'
  %% when the assertion holds.
  assert_tc_valid(Pid) ->
      ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
      ok.
  72. % tc_sanity_test() ->
  73. % Pid1 = tc_starter("1"),
  74. % {"1", Id1} = get_tc_id(Pid1),
  75. % Pid2 = tc_starter("1"),
  76. % {"1", Id2} = get_tc_id(Pid2),
  77. % ?assertNot(Id1 == Id2),
  78. % stop_tc(Pid1),
  79. % stop_tc(Pid2).
  80. % user_sanity_test() ->
  81. % Pid1 = tc_starter("1"),
  82. % User = spawn(fun() -> user_loop(Pid1) end),
  83. % ?assertMatch({"1", _Ref}, user_id(User)),
  84. % user_crash(User),
  85. % stop_tc(Pid1).
  %% EUnit generator covering the basic pooler API against a single pool.
  %%
  %% The outer `setup' fixture installs `fake_metrics' as the metrics
  %% module and starts it once for the whole group; it is stopped when the
  %% group finishes. The inner `foreach' fixture configures pool "p1"
  %% (init_count 2, max_count 3), starts the pooler application before
  %% every test and stops it afterwards.
  pooler_basics_test_() ->
      {setup,
       fun() ->
               application:set_env(pooler, metrics_module, fake_metrics),
               fake_metrics:start_link()
       end,
       fun(_X) ->
               fake_metrics:stop()
       end,
       {foreach,
        % setup
        fun() ->
                Pools = [[{name, "p1"},
                          {max_count, 3},
                          {init_count, 2},
                          {start_mfa,
                           {pooled_gs, start_link, [{"type-0"}]}}]],
                application:set_env(pooler, pools, Pools),
                % silence crash reports on the tty during the tests
                error_logger:delete_report_handler(error_logger_tty_h),
                application:start(pooler)
        end,
        % cleanup
        fun(_X) ->
                application:stop(pooler)
        end,
        [
         {"there are init_count members at start",
          fun() ->
                  Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
                  ?assertEqual(2, length(Stats))
          end},

         {"take and return one",
          fun() ->
                  P = pooler:take_member(),
                  ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
                  ok = pooler:return_member(P, ok)
          end},

         {"take and return one, named pool",
          fun() ->
                  P = pooler:take_member("p1"),
                  ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
                  % was `ok, pooler:return_member(P)', which evaluated the
                  % atom and discarded it instead of checking the result
                  ok = pooler:return_member(P)
          end},

         {"attempt to take form unknown pool",
          fun() ->
                  ?assertEqual(error_no_pool, pooler:take_member("bad_pool_name"))
          end},

         {"pids are created on demand until max",
          fun() ->
                  Pids = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
                  % max_count is 3, so further takes must be refused
                  ?assertEqual(error_no_members, pooler:take_member()),
                  ?assertEqual(error_no_members, pooler:take_member()),
                  PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
                  % no duplicates
                  ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
          end
         },

         {"pids are reused most recent return first",
          fun() ->
                  P1 = pooler:take_member(),
                  P2 = pooler:take_member(),
                  ?assertNot(P1 == P2),
                  ok = pooler:return_member(P1, ok),
                  ok = pooler:return_member(P2, ok),
                  % pids are reused most recent first
                  ?assertEqual(P2, pooler:take_member()),
                  ?assertEqual(P1, pooler:take_member())
          end},

         {"if an in-use pid crashes it is replaced",
          fun() ->
                  Pids0 = [pooler:take_member(), pooler:take_member(),
                           pooler:take_member()],
                  Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                  % crash them all
                  [ pooled_gs:crash(P) || P <- Pids0 ],
                  % get_n_pids/2 retries until replacements are available
                  Pids1 = get_n_pids(3, []),
                  Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                  [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
          end
         },

         {"if a free pid crashes it is replaced",
          fun() ->
                  FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats() ],
                  [ exit(P, kill) || P <- FreePids ],
                  Pids1 = get_n_pids(3, []),
                  ?assertEqual(3, length(Pids1))
          end},

         {"if a pid is returned with bad status it is replaced",
          fun() ->
                  Pids0 = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
                  Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                  % return them all marking as bad
                  [ pooler:return_member(P, fail) || P <- Pids0 ],
                  Pids1 = get_n_pids(3, []),
                  Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                  [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
          end
         },

         {"if a consumer crashes, pid is replaced",
          fun() ->
                  Consumer = start_user(),
                  StartId = user_id(Consumer),
                  user_crash(Consumer),
                  NewPid = hd(get_n_pids(1, [])),
                  NewId = pooled_gs:get_id(NewPid),
                  ?assertNot(NewId == StartId)
          end
         },

         {"it is ok to return an unknown pid",
          fun() ->
                  Bogus1 = spawn(fun() -> ok end),
                  Bogus2 = spawn(fun() -> ok end),
                  ?assertEqual(ok, pooler:return_member(Bogus1, ok)),
                  ?assertEqual(ok, pooler:return_member(Bogus2, fail))
          end
         },

         {"calling return_member on error_no_members is ignored",
          fun() ->
                  ?assertEqual(ok, pooler:return_member(error_no_members)),
                  ?assertEqual(ok, pooler:return_member(error_no_members, ok)),
                  ?assertEqual(ok, pooler:return_member(error_no_members, fail))
          end
         },

         {"metrics have been called",
          fun() ->
                  %% exercise the API to ensure we have certain keys reported as metrics
                  fake_metrics:reset_metrics(),
                  %% ten takes against max_count 3: the surplus takes are
                  %% refused, which is what feeds the error_no_members key
                  Pids = [ pooler:take_member() || _I <- lists:seq(1, 10) ],
                  [ pooler:return_member(P) || P <- Pids ],
                  pooler:take_member("bad_pool_name"),
                  %% kill an unused member
                  exit(hd(Pids), kill),
                  %% kill a used member
                  KillMe = pooler:take_member("p1"),
                  exit(KillMe, kill),
                  %% FIXME: We need to wait for pooler to process the
                  %% exit message. This is ugly, will fix later.
                  timer:sleep(200),                   % :(
                  %% kept in sorted order: compared against lists:usort/1 output
                  ExpectKeys = [<<"pooler.error_no_members_count">>,
                                <<"pooler.events">>,
                                <<"pooler.killed_free_count">>,
                                <<"pooler.killed_in_use_count">>,
                                <<"pooler.p1.free_count">>,
                                <<"pooler.p1.in_use_count">>,
                                <<"pooler.p1.take_rate">>],
                  Metrics = fake_metrics:get_metrics(),
                  GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
                  ?assertEqual(ExpectKeys, GotKeys)
          end}
        ]}}.
  %% EUnit fixture for a pool whose start MFA is
  %% `pooled_gs:start_link(crash)' (presumably making every member start
  %% fail -- confirm against pooled_gs).
  %%
  %% The setup only stores the pool configuration; the test itself starts
  %% the pooler application and then checks that taking a member, with and
  %% without naming the pool, yields `error_no_members'.
  %%
  %% NOTE(review): the previous comment here said this verifies that
  %% pooler "crashes completely" after too many failed adds, but the
  %% assertions only check for `error_no_members' -- confirm the intent.
  pooler_limit_failed_adds_test_() ->
      Configure =
          fun() ->
                  CrashingPool = [{name, "p1"},
                                  {max_count, 10},
                                  {init_count, 10},
                                  {start_mfa,
                                   {pooled_gs, start_link, [crash]}}],
                  application:set_env(pooler, pools, [CrashingPool])
          end,
      Cleanup =
          fun(_) ->
                  application:stop(pooler)
          end,
      Test =
          fun() ->
                  application:start(pooler),
                  ?assertEqual(error_no_members, pooler:take_member()),
                  ?assertEqual(error_no_members, pooler:take_member("p1"))
          end,
      {setup, Configure, Cleanup, Test}.
  %% EUnit fixture checking the scheduled culling of pool members.
  %%
  %% Pool "p1" is configured with init_count 2, max_count 10 and a
  %% cull_interval of 200 ms. Each test sleeps 250 ms (longer than the
  %% interval) and then inspects the size of pooler:pool_stats/0. The
  %% fixture is a plain `setup', so all three tests share one running
  %% pooler application.
  pooler_scheduled_cull_test_() ->
      StartPool =
          fun() ->
                  application:set_env(pooler, metrics_module, fake_metrics),
                  fake_metrics:start_link(),
                  Pool = [{name, "p1"},
                          {max_count, 10},
                          {init_count, 2},
                          {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
                          {cull_interval, {200, ms}}],
                  application:set_env(pooler, pools, [Pool]),
                  error_logger:delete_report_handler(error_logger_tty_h),
                  application:start(pooler)
          end,
      StopPool =
          fun(_X) ->
                  fake_metrics:stop(),
                  application:stop(pooler)
          end,
      {setup, StartPool, StopPool,
       [{"excess members are culled repeatedly",
         fun() ->
                 %% grow the pool to its maximum, then hand everything back
                 Pids1 = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
                 lists:foreach(fun pooler:return_member/1, Pids1),
                 ?assertEqual(10, length(pooler:pool_stats())),
                 %% sleep past the cull interval
                 timer:sleep(250),
                 ?assertEqual(2, length(pooler:pool_stats())),

                 %% second round: shows that culling is rescheduled
                 Pids2 = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
                 lists:foreach(fun pooler:return_member/1, Pids2),
                 ?assertEqual(10, length(pooler:pool_stats())),
                 %% sleep past the cull interval
                 timer:sleep(250),
                 ?assertEqual(2, length(pooler:pool_stats()))
         end
        },

        {"non-excess members are not culled",
         fun() ->
                 [P1, P2] = [pooler:take_member("p1") || _X <- [1, 2] ],
                 lists:foreach(fun pooler:return_member/1, [P1, P2]),
                 ?assertEqual(2, length(pooler:pool_stats())),
                 timer:sleep(250),
                 ?assertEqual(2, length(pooler:pool_stats()))
         end
        },

        {"in-use members are not culled",
         fun() ->
                 %% take every member and keep holding them
                 Pids = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
                 ?assertEqual(10, length(pooler:pool_stats())),
                 %% sleep past the cull interval
                 timer:sleep(250),
                 ?assertEqual(10, length(pooler:pool_stats())),
                 lists:foreach(fun pooler:return_member/1, Pids)
         end}
       ]}.
  %% EUnit fixture checking that pooler still serves members after it has
  %% been sent messages it does not expect.
  %%
  %% The setup starts pooler with pool "p1" and then sends one bogus call,
  %% one bogus cast and one bogus plain message to the registered `pooler'
  %% process. The single test takes a member from "p1" and checks its type.
  random_message_test_() ->
      StartAndPoke =
          fun() ->
                  Pool = [{name, "p1"},
                          {max_count, 2},
                          {init_count, 1},
                          {start_mfa,
                           {pooled_gs, start_link, [{"type-0"}]}}],
                  application:set_env(pooler, pools, [Pool]),
                  error_logger:delete_report_handler(error_logger_tty_h),
                  application:start(pooler),
                  %% bogus call: issued from a throw-away process so that a
                  %% call timeout cannot affect the test process
                  spawn(fun() -> catch gen_server:call(pooler, {unexpected_garbage_msg, 5}) end),
                  %% bogus cast
                  gen_server:cast(pooler, {unexpected_garbage_msg, 6}),
                  %% bogus raw message
                  whereis(pooler) ! {unexpected_garbage_msg, 7},
                  ok
          end,
      Stop =
          fun(_) ->
                  application:stop(pooler)
          end,
      StillServes =
          fun() ->
                  Pid = pooler:take_member("p1"),
                  {Type, _} = pooled_gs:get_id(Pid),
                  ?assertEqual("type-0", Type)
          end,
      {setup, StartAndPoke, Stop, [StillServes]}.
  %% EUnit `foreach' fixture driving pooler through simulated consumers.
  %%
  %% Before each test, pool "p1" is started with init_count and max_count
  %% of 10 and ten user processes are spawned; the list of user pids is
  %% handed to each test instantiator. Afterwards every user is told to
  %% stop and the pooler application is stopped.
  pooler_integration_test_() ->
      StartPoolAndUsers =
          fun() ->
                  Pool = [{name, "p1"},
                          {max_count, 10},
                          {init_count, 10},
                          {start_mfa,
                           {pooled_gs, start_link, [{"type-0"}]}}],
                  application:set_env(pooler, pools, [Pool]),
                  error_logger:delete_report_handler(error_logger_tty_h),
                  application:start(pooler),
                  [ start_user() || _X <- lists:seq(1, 10) ]
          end,
      StopUsersAndPool =
          fun(Users) ->
                  lists:foreach(fun user_stop/1, Users),
                  application:stop(pooler)
          end,
      {foreach, StartPoolAndUsers, StopUsersAndPool,
       [
        %% every user holds a member with a distinct id
        fun(Users) ->
                fun() ->
                        TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
                        ?assertEqual(lists:usort(TcIds), TcIds)
                end
        end,

        %% ids stay distinct after each user has swapped its member
        fun(Users) ->
                fun() ->
                        lists:foreach(fun user_new_tc/1, Users),
                        TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
                        ?assertEqual(lists:usort(TcIds), TcIds)
                end
        end,

        %% after all users crash, new users get members with ids that
        %% were not seen before
        fun(Users) ->
                fun() ->
                        TcIds1 = lists:sort([ user_id(UPid) || UPid <- Users ]),
                        lists:foreach(fun user_crash/1, Users),
                        Users2 = [ start_user() || _X <- lists:seq(1, 5) ],
                        TcIds2 = lists:sort([ user_id(UPid) || UPid <- Users2 ]),
                        Before = sets:from_list(TcIds1),
                        After = sets:from_list(TcIds2),
                        Both = sets:to_list(sets:intersection([Before, After])),
                        ?assertEqual([], Both)
                end
        end
       ]
      }.
  %% EUnit generator for pooler:time_as_millis/1.
  %%
  %% Builds a table of `{Input, Expected}' pairs, where Input is a
  %% `{Amount, Unit}' tuple with Unit one of min | sec | ms | mu, and emits
  %% one equality test per pair. Sub-millisecond inputs are expected to
  %% come back as 0.
  time_as_millis_test_() ->
      Units = [min, sec, ms, mu],
      ZeroCases = [ {{0, Unit}, 0} || Unit <- Units ],
      UnitCases = [{{1, min}, 60000},
                   {{1, sec}, 1000},
                   {{1, ms}, 1},
                   {{1, mu}, 0}],
      OtherCases = [{{3000, mu}, 3}],
      Tests = lists:append([ZeroCases, UnitCases, OtherCases]),
      [ ?_assertEqual(E, pooler:time_as_millis(I)) || {I, E} <- Tests ].
  %% EUnit generator for pooler:time_as_micros/1.
  %%
  %% Builds a table of `{Input, Expected}' pairs, where Input is a
  %% `{Amount, Unit}' tuple with Unit one of min | sec | ms | mu, and emits
  %% one equality test per pair.
  time_as_micros_test_() ->
      Units = [min, sec, ms, mu],
      ZeroCases = [ {{0, Unit}, 0} || Unit <- Units ],
      UnitCases = [{{1, min}, 60000000},
                   {{1, sec}, 1000000},
                   {{1, ms}, 1000},
                   {{1, mu}, 1}],
      OtherCases = [{{3000, mu}, 3000}],
      Tests = lists:append([ZeroCases, UnitCases, OtherCases]),
      [ ?_assertEqual(E, pooler:time_as_micros(I)) || {I, E} <- Tests ].
  % Testing crash recovery involves race conditions: either the pids
  % haven't crashed yet or pooler hasn't recovered yet. So this helper
  % loops forever until N pids are obtained, ignoring error_no_members.
  %% Take `N' members from the default pool, prepending each to `Acc'.
  %%
  %% An `error_no_members' answer is not counted and the take is simply
  %% retried, so the function does not return until N members have been
  %% obtained. The result lists the most recently taken pid first.
  get_n_pids(0, Acc) ->
      Acc;
  get_n_pids(N, Acc) ->
      {Remaining, Collected} =
          case pooler:take_member() of
              error_no_members -> {N, Acc};
              Pid -> {N - 1, [Pid | Acc]}
          end,
      get_n_pids(Remaining, Collected).