pooler_tests.erl 22 KB


  1. -module(pooler_tests).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -compile([export_all]).
  4. % The `user' processes represent users of the pooler library. A user
  5. % process will take a pid, report details on the pid it has, release
  6. % and take a new pid, stop cleanly, and crash.
  7. start_user() ->
  8. spawn(fun() -> user_loop(start) end).
  9. user_id(Pid) ->
  10. Pid ! {get_tc_id, self()},
  11. receive
  12. {Type, Id} ->
  13. {Type, Id}
  14. end.
  15. user_new_tc(Pid) ->
  16. Pid ! new_tc.
  17. user_stop(Pid) ->
  18. Pid ! stop.
  19. user_crash(Pid) ->
  20. Pid ! crash.
  21. user_loop(Atom) when Atom =:= error_no_members orelse Atom =:= start ->
  22. user_loop(pooler:take_member(test_pool_1));
  23. user_loop(MyTC) ->
  24. receive
  25. {get_tc_id, From} ->
  26. From ! pooled_gs:get_id(MyTC),
  27. user_loop(MyTC);
  28. {ping_tc, From} ->
  29. From ! pooled_gs:ping(MyTC),
  30. user_loop(MyTC);
  31. {ping_count, From} ->
  32. From ! pooled_gs:ping_count(MyTC),
  33. user_loop(MyTC);
  34. new_tc ->
  35. pooler:return_member(test_pool_1, MyTC, ok),
  36. MyNewTC = pooler:take_member(test_pool_1),
  37. user_loop(MyNewTC);
  38. stop ->
  39. pooler:return_member(test_pool_1, MyTC, ok),
  40. stopped;
  41. crash ->
  42. erlang:error({user_loop, kaboom})
  43. end.
  44. % The `tc' processes represent the pids tracked by pooler for testing.
  45. % They have a type and an ID and can report their type and ID and
  46. % stop.
  47. tc_loop({Type, Id}) ->
  48. receive
  49. {get_id, From} ->
  50. From ! {ok, Type, Id},
  51. tc_loop({Type, Id});
  52. stop -> stopped;
  53. crash ->
  54. erlang:error({tc_loop, kaboom})
  55. end.
  56. get_tc_id(Pid) ->
  57. Pid ! {get_id, self()},
  58. receive
  59. {ok, Type, Id} ->
  60. {Type, Id}
  61. after 200 ->
  62. timeout
  63. end.
  64. stop_tc(Pid) ->
  65. Pid ! stop.
  66. tc_starter(Type) ->
  67. Ref = make_ref(),
  68. spawn_link(fun() -> tc_loop({Type, Ref}) end).
  69. assert_tc_valid(Pid) ->
  70. ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
  71. ok.
  72. % tc_sanity_test() ->
  73. % Pid1 = tc_starter("1"),
  74. % {"1", Id1} = get_tc_id(Pid1),
  75. % Pid2 = tc_starter("1"),
  76. % {"1", Id2} = get_tc_id(Pid2),
  77. % ?assertNot(Id1 == Id2),
  78. % stop_tc(Pid1),
  79. % stop_tc(Pid2).
  80. % user_sanity_test() ->
  81. % Pid1 = tc_starter("1"),
  82. % User = spawn(fun() -> user_loop(Pid1) end),
  83. % ?assertMatch({"1", _Ref}, user_id(User)),
  84. % user_crash(User),
  85. % stop_tc(Pid1).
  86. pooler_basics_test_() ->
  87. {setup,
  88. fun() ->
  89. application:set_env(pooler, metrics_module, fake_metrics),
  90. fake_metrics:start_link()
  91. end,
  92. fun(_X) ->
  93. fake_metrics:stop()
  94. end,
  95. {foreach,
  96. % setup
  97. fun() ->
  98. Pools = [[{name, test_pool_1},
  99. {max_count, 3},
  100. {init_count, 2},
  101. {start_mfa,
  102. {pooled_gs, start_link, [{"type-0"}]}}]],
  103. application:set_env(pooler, pools, Pools),
  104. error_logger:delete_report_handler(error_logger_tty_h),
  105. application:start(pooler)
  106. end,
  107. fun(_X) ->
  108. application:stop(pooler)
  109. end,
  110. [
  111. {"there are init_count members at start",
  112. fun() ->
  113. Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
  114. ?assertEqual(2, length(Stats))
  115. end},
  116. {"take and return one",
  117. fun() ->
  118. P = pooler:take_member(test_pool_1),
  119. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  120. ok = pooler:return_member(test_pool_1, P, ok)
  121. end},
  122. {"take and return one, named pool",
  123. fun() ->
  124. P = pooler:take_member(test_pool_1),
  125. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  126. ok, pooler:return_member(test_pool_1, P)
  127. end},
  128. {"attempt to take form unknown pool",
  129. fun() ->
  130. %% since pools are now servers, an unknown pool will timeout
  131. ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
  132. end},
  133. {"members creation is triggered after pool exhaustion until max",
  134. fun() ->
  135. %% init count is 2
  136. Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
  137. %% since new member creation is async, can only assert
  138. %% that we will get a pid, but may not be first try.
  139. Pids = get_n_pids(1, Pids0),
  140. %% pool is at max now, requests should give error
  141. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  142. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  143. PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
  144. % no duplicates
  145. ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
  146. end
  147. },
  148. {"pids are reused most recent return first",
  149. fun() ->
  150. P1 = pooler:take_member(test_pool_1),
  151. P2 = pooler:take_member(test_pool_1),
  152. ?assertNot(P1 == P2),
  153. ok = pooler:return_member(test_pool_1, P1, ok),
  154. ok = pooler:return_member(test_pool_1, P2, ok),
  155. % pids are reused most recent first
  156. ?assertEqual(P2, pooler:take_member(test_pool_1)),
  157. ?assertEqual(P1, pooler:take_member(test_pool_1))
  158. end},
  159. {"if an in-use pid crashes it is replaced",
  160. fun() ->
  161. Pids0 = get_n_pids(3, []),
  162. Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
  163. % crash them all
  164. [ pooled_gs:crash(P) || P <- Pids0 ],
  165. Pids1 = get_n_pids(3, []),
  166. Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
  167. [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
  168. end
  169. },
  170. {"if a free pid crashes it is replaced",
  171. fun() ->
  172. FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
  173. [ exit(P, kill) || P <- FreePids ],
  174. Pids1 = get_n_pids(3, []),
  175. ?assertEqual(3, length(Pids1))
  176. end},
  177. {"if a pid is returned with bad status it is replaced",
  178. fun() ->
  179. Pids0 = get_n_pids(3, []),
  180. Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
  181. % return them all marking as bad
  182. [ pooler:return_member(test_pool_1, P, fail) || P <- Pids0 ],
  183. Pids1 = get_n_pids(3, []),
  184. Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
  185. [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
  186. end
  187. },
  188. {"if a consumer crashes, pid is replaced",
  189. fun() ->
  190. Consumer = start_user(),
  191. StartId = user_id(Consumer),
  192. user_crash(Consumer),
  193. NewPid = hd(get_n_pids(1, [])),
  194. NewId = pooled_gs:get_id(NewPid),
  195. ?assertNot(NewId == StartId)
  196. end
  197. },
  198. {"it is ok to return an unknown pid",
  199. fun() ->
  200. Bogus1 = spawn(fun() -> ok end),
  201. Bogus2 = spawn(fun() -> ok end),
  202. ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus1, ok)),
  203. ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus2, fail))
  204. end
  205. },
  206. {"calling return_member on error_no_members is ignored",
  207. fun() ->
  208. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members)),
  209. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, ok)),
  210. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, fail))
  211. end
  212. },
  213. {"dynamic pool creation",
  214. fun() ->
  215. {ok, SupPid} = pooler:new_pool([{name, dyn_pool_1},
  216. {max_count, 3},
  217. {init_count, 2},
  218. {start_mfa,
  219. {pooled_gs, start_link, [{"dyn-0"}]}}]),
  220. ?assert(is_pid(SupPid)),
  221. M = pooler:take_member(dyn_pool_1),
  222. ?assertMatch({"dyn-0", _Id}, pooled_gs:get_id(M)),
  223. ?assertEqual(ok, pooler:rm_pool(dyn_pool_1)),
  224. ?assertExit({noproc, _}, pooler:take_member(dyn_pool_1)),
  225. %% remove non-existing pool
  226. ?assertEqual(ok, pooler:rm_pool(dyn_pool_X)),
  227. ?assertEqual(ok, pooler:rm_pool(dyn_pool_1))
  228. end},
  229. {"metrics have been called",
  230. fun() ->
  231. %% exercise the API to ensure we have certain keys reported as metrics
  232. fake_metrics:reset_metrics(),
  233. Pids = [ pooler:take_member(test_pool_1) || _I <- lists:seq(1, 10) ],
  234. [ pooler:return_member(test_pool_1, P) || P <- Pids ],
  235. catch pooler:take_member(bad_pool_name),
  236. %% kill and unused member
  237. exit(hd(Pids), kill),
  238. %% kill a used member
  239. KillMe = pooler:take_member(test_pool_1),
  240. exit(KillMe, kill),
  241. %% FIXME: We need to wait for pooler to process the
  242. %% exit message. This is ugly, will fix later.
  243. timer:sleep(200), % :(
  244. ExpectKeys = lists:sort([<<"pooler.test_pool_1.error_no_members_count">>,
  245. <<"pooler.test_pool_1.events">>,
  246. <<"pooler.test_pool_1.free_count">>,
  247. <<"pooler.test_pool_1.in_use_count">>,
  248. <<"pooler.test_pool_1.killed_free_count">>,
  249. <<"pooler.test_pool_1.killed_in_use_count">>,
  250. <<"pooler.test_pool_1.take_rate">>]),
  251. Metrics = fake_metrics:get_metrics(),
  252. GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
  253. ?assertEqual(ExpectKeys, GotKeys)
  254. end},
  255. {"accept bad member is handled",
  256. fun() ->
  257. Bad = spawn(fun() -> ok end),
  258. Ref = erlang:make_ref(),
  259. ?assertEqual(ok, pooler:accept_member(test_pool_1, {Ref, Bad}))
  260. end}
  261. ]}}.
  262. pooler_groups_test_() ->
  263. {setup,
  264. fun() ->
  265. application:set_env(pooler, metrics_module, fake_metrics),
  266. fake_metrics:start_link()
  267. end,
  268. fun(_X) ->
  269. fake_metrics:stop()
  270. end,
  271. {foreach,
  272. % setup
  273. fun() ->
  274. Pools = [[{name, test_pool_1},
  275. {group, group_1},
  276. {max_count, 3},
  277. {init_count, 2},
  278. {start_mfa,
  279. {pooled_gs, start_link, [{"type-1-1"}]}}],
  280. [{name, test_pool_2},
  281. {group, group_1},
  282. {max_count, 3},
  283. {init_count, 2},
  284. {start_mfa,
  285. {pooled_gs, start_link, [{"type-1-2"}]}}],
  286. %% test_pool_3 not part of the group
  287. [{name, test_pool_3},
  288. {group, undefined},
  289. {max_count, 3},
  290. {init_count, 2},
  291. {start_mfa,
  292. {pooled_gs, start_link, [{"type-3"}]}}]
  293. ],
  294. application:set_env(pooler, pools, Pools),
  295. %% error_logger:delete_report_handler(error_logger_tty_h),
  296. pg2:start(),
  297. application:start(pooler)
  298. end,
  299. fun(_X) ->
  300. application:stop(pooler),
  301. application:stop(pg2)
  302. end,
  303. [
  304. {"take and return one group member (repeated)",
  305. fun() ->
  306. Types = [ begin
  307. Pid = pooler:take_group_member(group_1),
  308. {Type, _} = pooled_gs:get_id(Pid),
  309. ?assertMatch("type-1" ++ _, Type),
  310. ok = pooler:return_group_member(group_1, Pid, ok),
  311. Type
  312. end
  313. || _I <- lists:seq(1, 50) ],
  314. Type_1_1 = [ X || "type-1-1" = X <- Types ],
  315. Type_1_2 = [ X || "type-1-2" = X <- Types ],
  316. ?assert(length(Type_1_1) > 0),
  317. ?assert(length(Type_1_2) > 0)
  318. end},
  319. {"take member from unknown group",
  320. fun() ->
  321. ?assertEqual({error_no_group, not_a_group},
  322. pooler:take_group_member(not_a_group))
  323. end},
  324. {"return member to unknown group",
  325. fun() ->
  326. Pid = pooler:take_group_member(group_1),
  327. ?assertEqual(ok, pooler:return_group_member(no_such_group, Pid))
  328. end},
  329. {"return member to wrong group",
  330. fun() ->
  331. Pid = pooler:take_member(test_pool_3),
  332. ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
  333. end},
  334. {"take member from empty group",
  335. fun() ->
  336. %% artificially empty group member list
  337. [ pg2:leave(group_1, M) || M <- pg2:get_members(group_1) ],
  338. ?assertEqual(error_no_members, pooler:take_group_member(group_1))
  339. end},
  340. {"return member to group, implied ok",
  341. fun() ->
  342. Pid = pooler:take_group_member(group_1),
  343. ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
  344. end},
  345. {"return error_no_member to group",
  346. fun() ->
  347. ?assertEqual(ok, pooler:return_group_member(group_1, error_no_members))
  348. end},
  349. {"exhaust pools in group",
  350. fun() ->
  351. Pids = get_n_pids_group(group_1, 6, []),
  352. %% they should all be pids
  353. [ begin
  354. {Type, _} = pooled_gs:get_id(P),
  355. ?assertMatch("type-1" ++ _, Type),
  356. ok
  357. end || P <- Pids ],
  358. %% further attempts should be error
  359. [error_no_members,
  360. error_no_members,
  361. error_no_members] = [ pooler:take_group_member(group_1)
  362. || _I <- lists:seq(1, 3) ]
  363. end}
  364. ]}}.
  365. pooler_limit_failed_adds_test_() ->
  366. %% verify that pooler crashes completely if too many failures are
  367. %% encountered while trying to add pids.
  368. {setup,
  369. fun() ->
  370. Pools = [[{name, test_pool_1},
  371. {max_count, 10},
  372. {init_count, 10},
  373. {start_mfa,
  374. {pooled_gs, start_link, [crash]}}]],
  375. application:set_env(pooler, pools, Pools)
  376. end,
  377. fun(_) ->
  378. application:stop(pooler)
  379. end,
  380. fun() ->
  381. application:start(pooler),
  382. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  383. ?assertEqual(error_no_members, pooler:take_member(test_pool_1))
  384. end}.
  385. pooler_scheduled_cull_test_() ->
  386. {setup,
  387. fun() ->
  388. application:set_env(pooler, metrics_module, fake_metrics),
  389. fake_metrics:start_link(),
  390. Pools = [[{name, test_pool_1},
  391. {max_count, 10},
  392. {init_count, 2},
  393. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
  394. {cull_interval, {200, ms}}]],
  395. application:set_env(pooler, pools, Pools),
  396. %% error_logger:delete_report_handler(error_logger_tty_h),
  397. application:start(pooler)
  398. end,
  399. fun(_X) ->
  400. fake_metrics:stop(),
  401. application:stop(pooler)
  402. end,
  403. [{"excess members are culled repeatedly",
  404. fun() ->
  405. %% take all members
  406. Pids1 = get_n_pids(test_pool_1, 10, []),
  407. %% return all
  408. [ pooler:return_member(test_pool_1, P) || P <- Pids1 ],
  409. ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
  410. %% wait for longer than cull delay
  411. timer:sleep(250),
  412. ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
  413. %% repeat the test to verify that culling gets rescheduled.
  414. Pids2 = get_n_pids(test_pool_1, 10, []),
  415. %% return all
  416. [ pooler:return_member(test_pool_1, P) || P <- Pids2 ],
  417. ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
  418. %% wait for longer than cull delay
  419. timer:sleep(250),
  420. ?assertEqual(2, length(pooler:pool_stats(test_pool_1)))
  421. end
  422. },
  423. {"non-excess members are not culled",
  424. fun() ->
  425. [P1, P2] = [pooler:take_member(test_pool_1) || _X <- [1, 2] ],
  426. [pooler:return_member(test_pool_1, P) || P <- [P1, P2] ],
  427. ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
  428. timer:sleep(250),
  429. ?assertEqual(2, length(pooler:pool_stats(test_pool_1)))
  430. end
  431. },
  432. {"in-use members are not culled",
  433. fun() ->
  434. %% take all members
  435. Pids = get_n_pids(test_pool_1, 10, []),
  436. %% don't return any
  437. ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
  438. %% wait for longer than cull delay
  439. timer:sleep(250),
  440. ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
  441. [ pooler:return_member(test_pool_1, P) || P <- Pids ]
  442. end}
  443. ]}.
  444. random_message_test_() ->
  445. {setup,
  446. fun() ->
  447. Pools = [[{name, test_pool_1},
  448. {max_count, 2},
  449. {init_count, 1},
  450. {start_mfa,
  451. {pooled_gs, start_link, [{"type-0"}]}}]],
  452. application:set_env(pooler, pools, Pools),
  453. error_logger:delete_report_handler(error_logger_tty_h),
  454. application:start(pooler),
  455. %% now send some bogus messages
  456. %% do the call in a throw-away process to avoid timeout error
  457. spawn(fun() -> catch gen_server:call(test_pool_1, {unexpected_garbage_msg, 5}) end),
  458. gen_server:cast(test_pool_1, {unexpected_garbage_msg, 6}),
  459. whereis(test_pool_1) ! {unexpected_garbage_msg, 7},
  460. ok
  461. end,
  462. fun(_) ->
  463. application:stop(pooler)
  464. end,
  465. [
  466. fun() ->
  467. Pid = spawn(fun() -> ok end),
  468. MonMsg = {'DOWN', erlang:make_ref(), process, Pid, because},
  469. test_pool_1 ! MonMsg
  470. end,
  471. fun() ->
  472. Pid = pooler:take_member(test_pool_1),
  473. {Type, _} = pooled_gs:get_id(Pid),
  474. ?assertEqual("type-0", Type)
  475. end,
  476. fun() ->
  477. RawPool = gen_server:call(test_pool_1, dump_pool),
  478. ?assertEqual(pool, element(1, RawPool))
  479. end
  480. ]}.
  481. pooler_integration_test_() ->
  482. {foreach,
  483. % setup
  484. fun() ->
  485. Pools = [[{name, test_pool_1},
  486. {max_count, 10},
  487. {init_count, 10},
  488. {start_mfa,
  489. {pooled_gs, start_link, [{"type-0"}]}}]],
  490. application:set_env(pooler, pools, Pools),
  491. error_logger:delete_report_handler(error_logger_tty_h),
  492. application:start(pooler),
  493. Users = [ start_user() || _X <- lists:seq(1, 10) ],
  494. Users
  495. end,
  496. % cleanup
  497. fun(Users) ->
  498. [ user_stop(U) || U <- Users ],
  499. application:stop(pooler)
  500. end,
  501. %
  502. [
  503. fun(Users) ->
  504. fun() ->
  505. % each user has a different tc ID
  506. TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
  507. ?assertEqual(lists:usort(TcIds), TcIds)
  508. end
  509. end
  510. ,
  511. fun(Users) ->
  512. fun() ->
  513. % users still unique after a renew cycle
  514. [ user_new_tc(UPid) || UPid <- Users ],
  515. TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
  516. ?assertEqual(lists:usort(TcIds), TcIds)
  517. end
  518. end
  519. ,
  520. fun(Users) ->
  521. fun() ->
  522. % all users crash, pids are replaced
  523. TcIds1 = lists:sort([ user_id(UPid) || UPid <- Users ]),
  524. [ user_crash(UPid) || UPid <- Users ],
  525. Seq = lists:seq(1, 5),
  526. Users2 = [ start_user() || _X <- Seq ],
  527. TcIds2 = lists:sort([ user_id(UPid) || UPid <- Users2 ]),
  528. Both =
  529. sets:to_list(sets:intersection([sets:from_list(TcIds1),
  530. sets:from_list(TcIds2)])),
  531. ?assertEqual([], Both)
  532. end
  533. end
  534. ]
  535. }.
  536. time_as_millis_test_() ->
  537. Zeros = [ {{0, U}, 0} || U <- [min, sec, ms, mu] ],
  538. Ones = [{{1, min}, 60000},
  539. {{1, sec}, 1000},
  540. {{1, ms}, 1},
  541. {{1, mu}, 0}],
  542. Misc = [{{3000, mu}, 3}],
  543. Tests = Zeros ++ Ones ++ Misc,
  544. [ ?_assertEqual(E, pooler:time_as_millis(I)) || {I, E} <- Tests ].
  545. time_as_micros_test_() ->
  546. Zeros = [ {{0, U}, 0} || U <- [min, sec, ms, mu] ],
  547. Ones = [{{1, min}, 60000000},
  548. {{1, sec}, 1000000},
  549. {{1, ms}, 1000},
  550. {{1, mu}, 1}],
  551. Misc = [{{3000, mu}, 3000}],
  552. Tests = Zeros ++ Ones ++ Misc,
  553. [ ?_assertEqual(E, pooler:time_as_micros(I)) || {I, E} <- Tests ].
  554. % testing crash recovery means race conditions when either pids
  555. % haven't yet crashed or pooler hasn't recovered. So this helper loops
  556. % forver until N pids are obtained, ignoring error_no_members.
  557. get_n_pids(N, Acc) ->
  558. get_n_pids(test_pool_1, N, Acc).
  559. get_n_pids(_Pool, 0, Acc) ->
  560. Acc;
  561. get_n_pids(Pool, N, Acc) ->
  562. case pooler:take_member(Pool) of
  563. error_no_members ->
  564. get_n_pids(Pool, N, Acc);
  565. Pid ->
  566. get_n_pids(Pool, N - 1, [Pid|Acc])
  567. end.
  568. get_n_pids_group(_Group, 0, Acc) ->
  569. Acc;
  570. get_n_pids_group(Group, N, Acc) ->
  571. case pooler:take_group_member(Group) of
  572. error_no_members ->
  573. get_n_pids_group(Group, N, Acc);
  574. Pid ->
  575. get_n_pids_group(Group, N - 1, [Pid|Acc])
  576. end.