pooler_tests.erl 37 KB


  1. -module(pooler_tests).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include("../src/pooler.hrl").
  4. -compile([export_all]).
  5. % The `user' processes represent users of the pooler library. A user
  6. % process will take a pid, report details on the pid it has, release
  7. % and take a new pid, stop cleanly, and crash.
  8. start_user() ->
  9. spawn(fun() -> user_loop(start) end).
  10. user_id(Pid) ->
  11. Pid ! {get_tc_id, self()},
  12. receive
  13. {Type, Id} ->
  14. {Type, Id}
  15. end.
  16. user_new_tc(Pid) ->
  17. Pid ! new_tc.
  18. user_stop(Pid) ->
  19. Pid ! stop.
  20. user_crash(Pid) ->
  21. Pid ! crash.
  22. user_loop(Atom) when Atom =:= error_no_members orelse Atom =:= start ->
  23. user_loop(pooler:take_member(test_pool_1));
  24. user_loop(MyTC) ->
  25. receive
  26. {get_tc_id, From} ->
  27. From ! pooled_gs:get_id(MyTC),
  28. user_loop(MyTC);
  29. {ping_tc, From} ->
  30. From ! pooled_gs:ping(MyTC),
  31. user_loop(MyTC);
  32. {ping_count, From} ->
  33. From ! pooled_gs:ping_count(MyTC),
  34. user_loop(MyTC);
  35. new_tc ->
  36. pooler:return_member(test_pool_1, MyTC, ok),
  37. MyNewTC = pooler:take_member(test_pool_1),
  38. user_loop(MyNewTC);
  39. stop ->
  40. pooler:return_member(test_pool_1, MyTC, ok),
  41. stopped;
  42. crash ->
  43. erlang:error({user_loop, kaboom})
  44. end.
  45. % The `tc' processes represent the pids tracked by pooler for testing.
  46. % They have a type and an ID and can report their type and ID and
  47. % stop.
  48. tc_loop({Type, Id}) ->
  49. receive
  50. {get_id, From} ->
  51. From ! {ok, Type, Id},
  52. tc_loop({Type, Id});
  53. stop -> stopped;
  54. crash ->
  55. erlang:error({tc_loop, kaboom})
  56. end.
  57. get_tc_id(Pid) ->
  58. Pid ! {get_id, self()},
  59. receive
  60. {ok, Type, Id} ->
  61. {Type, Id}
  62. after 200 ->
  63. timeout
  64. end.
  65. stop_tc(Pid) ->
  66. Pid ! stop.
  67. tc_starter(Type) ->
  68. Ref = make_ref(),
  69. spawn_link(fun() -> tc_loop({Type, Ref}) end).
  70. assert_tc_valid(Pid) ->
  71. ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
  72. ok.
  73. % tc_sanity_test() ->
  74. % Pid1 = tc_starter("1"),
  75. % {"1", Id1} = get_tc_id(Pid1),
  76. % Pid2 = tc_starter("1"),
  77. % {"1", Id2} = get_tc_id(Pid2),
  78. % ?assertNot(Id1 == Id2),
  79. % stop_tc(Pid1),
  80. % stop_tc(Pid2).
  81. % user_sanity_test() ->
  82. % Pid1 = tc_starter("1"),
  83. % User = spawn(fun() -> user_loop(Pid1) end),
  84. % ?assertMatch({"1", _Ref}, user_id(User)),
  85. % user_crash(User),
  86. % stop_tc(Pid1).
  87. pooler_basics_via_config_test_() ->
  88. {setup,
  89. fun() ->
  90. application:set_env(pooler, metrics_module, fake_metrics),
  91. fake_metrics:start_link()
  92. end,
  93. fun(_X) ->
  94. fake_metrics:stop()
  95. end,
  96. {foreach,
  97. % setup
  98. fun() ->
  99. Pools = [[{name, test_pool_1},
  100. {max_count, 3},
  101. {init_count, 2},
  102. {cull_interval, {0, min}},
  103. {start_mfa,
  104. {pooled_gs, start_link, [{"type-0"}]}}]],
  105. application:set_env(pooler, pools, Pools),
  106. error_logger:delete_report_handler(error_logger_tty_h),
  107. application:start(pooler)
  108. end,
  109. fun(_X) ->
  110. application:stop(pooler)
  111. end,
  112. basic_tests()}}.
  113. pooler_basics_dynamic_test_() ->
  114. {setup,
  115. fun() ->
  116. application:set_env(pooler, metrics_module, fake_metrics),
  117. fake_metrics:start_link()
  118. end,
  119. fun(_X) ->
  120. fake_metrics:stop()
  121. end,
  122. {foreach,
  123. % setup
  124. fun() ->
  125. Pool = [{name, test_pool_1},
  126. {max_count, 3},
  127. {init_count, 2},
  128. {start_mfa,
  129. {pooled_gs, start_link, [{"type-0"}]}}],
  130. application:unset_env(pooler, pools),
  131. error_logger:delete_report_handler(error_logger_tty_h),
  132. application:start(pooler),
  133. pooler:new_pool(Pool)
  134. end,
  135. fun(_X) ->
  136. application:stop(pooler)
  137. end,
  138. basic_tests()}}.
  139. pooler_basics_integration_to_other_supervisor_test_() ->
  140. {setup,
  141. fun() ->
  142. application:set_env(pooler, metrics_module, fake_metrics),
  143. fake_metrics:start_link()
  144. end,
  145. fun(_X) ->
  146. fake_metrics:stop()
  147. end,
  148. {foreach,
  149. % setup
  150. fun() ->
  151. Pool = [{name, test_pool_1},
  152. {max_count, 3},
  153. {init_count, 2},
  154. {start_mfa,
  155. {pooled_gs, start_link, [{"type-0"}]}}],
  156. application:unset_env(pooler, pools),
  157. error_logger:delete_report_handler(error_logger_tty_h),
  158. application:start(pooler),
  159. supervisor:start_link(fake_external_supervisor, Pool)
  160. end,
  161. fun({ok, SupPid}) ->
  162. exit(SupPid, normal),
  163. application:stop(pooler)
  164. end,
  165. basic_tests()}}.
  166. basic_tests() ->
  167. [
  168. {"there are init_count members at start",
  169. fun() ->
  170. Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
  171. ?assertEqual(2, length(Stats))
  172. end},
  173. {"take and return one",
  174. fun() ->
  175. P = pooler:take_member(test_pool_1),
  176. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  177. ok = pooler:return_member(test_pool_1, P, ok)
  178. end},
  179. {"take and return one, named pool",
  180. fun() ->
  181. P = pooler:take_member(test_pool_1),
  182. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  183. ok, pooler:return_member(test_pool_1, P)
  184. end},
  185. {"attempt to take form unknown pool",
  186. fun() ->
  187. %% since pools are now servers, an unknown pool will timeout
  188. ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
  189. end},
  190. {"members creation is triggered after pool exhaustion until max",
  191. fun() ->
  192. %% init count is 2
  193. Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
  194. %% since new member creation is async, can only assert
  195. %% that we will get a pid, but may not be first try.
  196. Pids = get_n_pids(1, Pids0),
  197. %% pool is at max now, requests should give error
  198. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  199. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  200. PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
  201. % no duplicates
  202. ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
  203. end
  204. },
  205. {"pids are reused most recent return first",
  206. fun() ->
  207. P1 = pooler:take_member(test_pool_1),
  208. P2 = pooler:take_member(test_pool_1),
  209. ?assertNot(P1 == P2),
  210. ok = pooler:return_member(test_pool_1, P1, ok),
  211. ok = pooler:return_member(test_pool_1, P2, ok),
  212. % pids are reused most recent first
  213. ?assertEqual(P2, pooler:take_member(test_pool_1)),
  214. ?assertEqual(P1, pooler:take_member(test_pool_1))
  215. end},
  216. {"if an in-use pid crashes it is replaced",
  217. fun() ->
  218. Pids0 = get_n_pids(3, []),
  219. Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
  220. % crash them all
  221. [ pooled_gs:crash(P) || P <- Pids0 ],
  222. Pids1 = get_n_pids(3, []),
  223. Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
  224. [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
  225. end
  226. },
  227. {"if a free pid crashes it is replaced",
  228. fun() ->
  229. FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
  230. [ exit(P, kill) || P <- FreePids ],
  231. Pids1 = get_n_pids(3, []),
  232. ?assertEqual(3, length(Pids1))
  233. end},
  234. {"if a pid is returned with bad status it is replaced",
  235. fun() ->
  236. Pids0 = get_n_pids(3, []),
  237. Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
  238. % return them all marking as bad
  239. [ pooler:return_member(test_pool_1, P, fail) || P <- Pids0 ],
  240. Pids1 = get_n_pids(3, []),
  241. Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
  242. [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
  243. end
  244. },
  245. {"if a consumer crashes, pid is replaced",
  246. fun() ->
  247. Consumer = start_user(),
  248. StartId = user_id(Consumer),
  249. user_crash(Consumer),
  250. NewPid = hd(get_n_pids(1, [])),
  251. NewId = pooled_gs:get_id(NewPid),
  252. ?assertNot(NewId == StartId)
  253. end
  254. },
  255. {"it is ok to return an unknown pid",
  256. fun() ->
  257. Bogus1 = spawn(fun() -> ok end),
  258. Bogus2 = spawn(fun() -> ok end),
  259. ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus1, ok)),
  260. ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus2, fail))
  261. end
  262. },
  263. {"it is ok to return a pid more than once",
  264. fun() ->
  265. M = pooler:take_member(test_pool_1),
  266. [ pooler:return_member(test_pool_1, M)
  267. || _I <- lists:seq(1, 37) ],
  268. M1 = pooler:take_member(test_pool_1),
  269. M2 = pooler:take_member(test_pool_1),
  270. ?assert(M1 =/= M2),
  271. Pool1 = gen_server:call(test_pool_1, dump_pool),
  272. ?assertEqual(2, Pool1#pool.in_use_count),
  273. ?assertEqual(0, Pool1#pool.free_count),
  274. pooler:return_member(test_pool_1, M1),
  275. pooler:return_member(test_pool_1, M2),
  276. Pool2 = gen_server:call(test_pool_1, dump_pool),
  277. ?assertEqual(0, Pool2#pool.in_use_count),
  278. ?assertEqual(2, Pool2#pool.free_count),
  279. ok
  280. end},
  281. {"calling return_member on error_no_members is ignored",
  282. fun() ->
  283. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members)),
  284. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, ok)),
  285. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, fail))
  286. end
  287. },
  288. {"dynamic pool creation",
  289. fun() ->
  290. PoolSpec = [{name, dyn_pool_1},
  291. {max_count, 3},
  292. {init_count, 2},
  293. {start_mfa,
  294. {pooled_gs, start_link, [{"dyn-0"}]}}],
  295. {ok, SupPid1} = pooler:new_pool(PoolSpec),
  296. ?assert(is_pid(SupPid1)),
  297. M = pooler:take_member(dyn_pool_1),
  298. ?assertMatch({"dyn-0", _Id}, pooled_gs:get_id(M)),
  299. ?assertEqual(ok, pooler:rm_pool(dyn_pool_1)),
  300. ?assertExit({noproc, _}, pooler:take_member(dyn_pool_1)),
  301. %% verify pool of same name can be created after removal
  302. {ok, SupPid2} = pooler:new_pool(PoolSpec),
  303. ?assert(is_pid(SupPid2)),
  304. %% remove non-existing pool
  305. ?assertEqual(ok, pooler:rm_pool(dyn_pool_X)),
  306. ?assertEqual(ok, pooler:rm_pool(dyn_pool_1))
  307. end},
  308. {"metrics have been called",
  309. fun() ->
  310. %% exercise the API to ensure we have certain keys reported as metrics
  311. fake_metrics:reset_metrics(),
  312. Pids = [ pooler:take_member(test_pool_1) || _I <- lists:seq(1, 10) ],
  313. [ pooler:return_member(test_pool_1, P) || P <- Pids ],
  314. catch pooler:take_member(bad_pool_name),
  315. %% kill and unused member
  316. exit(hd(Pids), kill),
  317. %% kill a used member
  318. KillMe = pooler:take_member(test_pool_1),
  319. exit(KillMe, kill),
  320. %% FIXME: We need to wait for pooler to process the
  321. %% exit message. This is ugly, will fix later.
  322. timer:sleep(200), % :(
  323. ExpectKeys = lists:sort([<<"pooler.test_pool_1.error_no_members_count">>,
  324. <<"pooler.test_pool_1.events">>,
  325. <<"pooler.test_pool_1.free_count">>,
  326. <<"pooler.test_pool_1.in_use_count">>,
  327. <<"pooler.test_pool_1.killed_free_count">>,
  328. <<"pooler.test_pool_1.killed_in_use_count">>,
  329. <<"pooler.test_pool_1.take_rate">>,
  330. <<"pooler.test_pool_1.queue_count">>]),
  331. Metrics = fake_metrics:get_metrics(),
  332. GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
  333. ?assertEqual(ExpectKeys, GotKeys)
  334. end},
  335. {"accept bad member is handled",
  336. fun() ->
  337. Bad = spawn(fun() -> ok end),
  338. FakeStarter = spawn(fun() -> starter end),
  339. ?assertEqual(ok, pooler:accept_member(test_pool_1, {FakeStarter, Bad}))
  340. end}
  341. ].
  342. pooler_groups_test_() ->
  343. {setup,
  344. fun() ->
  345. application:set_env(pooler, metrics_module, fake_metrics),
  346. fake_metrics:start_link()
  347. end,
  348. fun(_X) ->
  349. fake_metrics:stop()
  350. end,
  351. {foreach,
  352. % setup
  353. fun() ->
  354. Pools = [[{name, test_pool_1},
  355. {group, group_1},
  356. {max_count, 3},
  357. {init_count, 2},
  358. {start_mfa,
  359. {pooled_gs, start_link, [{"type-1-1"}]}}],
  360. [{name, test_pool_2},
  361. {group, group_1},
  362. {max_count, 3},
  363. {init_count, 2},
  364. {start_mfa,
  365. {pooled_gs, start_link, [{"type-1-2"}]}}],
  366. %% test_pool_3 not part of the group
  367. [{name, test_pool_3},
  368. {group, undefined},
  369. {max_count, 3},
  370. {init_count, 2},
  371. {start_mfa,
  372. {pooled_gs, start_link, [{"type-3"}]}}]
  373. ],
  374. application:set_env(pooler, pools, Pools),
  375. %% error_logger:delete_report_handler(error_logger_tty_h),
  376. pg2:start(),
  377. application:start(pooler)
  378. end,
  379. fun(_X) ->
  380. application:stop(pooler),
  381. application:stop(pg2)
  382. end,
  383. [
  384. {"take and return one group member (repeated)",
  385. fun() ->
  386. Types = [ begin
  387. Pid = pooler:take_group_member(group_1),
  388. {Type, _} = pooled_gs:get_id(Pid),
  389. ?assertMatch("type-1" ++ _, Type),
  390. ok = pooler:return_group_member(group_1, Pid, ok),
  391. Type
  392. end
  393. || _I <- lists:seq(1, 50) ],
  394. Type_1_1 = [ X || "type-1-1" = X <- Types ],
  395. Type_1_2 = [ X || "type-1-2" = X <- Types ],
  396. ?assert(length(Type_1_1) > 0),
  397. ?assert(length(Type_1_2) > 0)
  398. end},
  399. {"take member from unknown group",
  400. fun() ->
  401. ?assertEqual({error_no_group, not_a_group},
  402. pooler:take_group_member(not_a_group))
  403. end},
  404. {"return member to unknown group",
  405. fun() ->
  406. Pid = pooler:take_group_member(group_1),
  407. ?assertEqual(ok, pooler:return_group_member(no_such_group, Pid))
  408. end},
  409. {"return member to wrong group",
  410. fun() ->
  411. Pid = pooler:take_member(test_pool_3),
  412. ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
  413. end},
  414. {"take member from empty group",
  415. fun() ->
  416. %% artificially empty group member list
  417. [ pg2:leave(group_1, M) || M <- pg2:get_members(group_1) ],
  418. ?assertEqual(error_no_members, pooler:take_group_member(group_1))
  419. end},
  420. {"return member to group, implied ok",
  421. fun() ->
  422. Pid = pooler:take_group_member(group_1),
  423. ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
  424. end},
  425. {"return error_no_member to group",
  426. fun() ->
  427. ?assertEqual(ok, pooler:return_group_member(group_1, error_no_members))
  428. end},
  429. {"exhaust pools in group",
  430. fun() ->
  431. Pids = get_n_pids_group(group_1, 6, []),
  432. %% they should all be pids
  433. [ begin
  434. {Type, _} = pooled_gs:get_id(P),
  435. ?assertMatch("type-1" ++ _, Type),
  436. ok
  437. end || P <- Pids ],
  438. %% further attempts should be error
  439. [error_no_members,
  440. error_no_members,
  441. error_no_members] = [ pooler:take_group_member(group_1)
  442. || _I <- lists:seq(1, 3) ]
  443. end},
  444. {"rm_group with nonexisting group",
  445. fun() ->
  446. ?assertEqual(ok, pooler:rm_group(i_dont_exist))
  447. end},
  448. {"rm_group with existing empty group",
  449. fun() ->
  450. ?assertEqual(ok, pooler:rm_pool(test_pool_1)),
  451. ?assertEqual(ok, pooler:rm_pool(test_pool_2)),
  452. ?assertEqual(error_no_members, pooler:take_group_member(group_1)),
  453. ?assertEqual(ok, pooler:rm_group(group_1)),
  454. ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
  455. ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
  456. ?assertEqual({error_no_group, group_1},
  457. pooler:take_group_member(group_1))
  458. end},
  459. {"rm_group with existing non-empty group",
  460. fun() ->
  461. %% Verify that group members exist
  462. MemberPid = pooler:take_group_member(group_1),
  463. ?assert(is_pid(MemberPid)),
  464. pooler:return_group_member(group_1, MemberPid),
  465. Pool1Pid = pooler:take_member(test_pool_1),
  466. ?assert(is_pid(Pool1Pid)),
  467. pooler:return_member(test_pool_1, Pool1Pid),
  468. Pool2Pid = pooler:take_member(test_pool_2),
  469. ?assert(is_pid(Pool2Pid)),
  470. pooler:return_member(test_pool_2, Pool2Pid),
  471. %% Delete and verify that group and pools are destroyed
  472. ?assertEqual(ok, pooler:rm_group(group_1)),
  473. ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
  474. ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
  475. ?assertEqual({error_no_group, group_1},
  476. pooler:take_group_member(group_1))
  477. end}
  478. ]}}.
  479. pooler_limit_failed_adds_test_() ->
  480. %% verify that pooler crashes completely if too many failures are
  481. %% encountered while trying to add pids.
  482. {setup,
  483. fun() ->
  484. Pools = [[{name, test_pool_1},
  485. {max_count, 10},
  486. {init_count, 10},
  487. {start_mfa,
  488. {pooled_gs, start_link, [crash]}}]],
  489. application:set_env(pooler, pools, Pools)
  490. end,
  491. fun(_) ->
  492. application:stop(pooler)
  493. end,
  494. fun() ->
  495. application:start(pooler),
  496. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  497. ?assertEqual(error_no_members, pooler:take_member(test_pool_1))
  498. end}.
  499. pooler_scheduled_cull_test_() ->
  500. {setup,
  501. fun() ->
  502. application:set_env(pooler, metrics_module, fake_metrics),
  503. fake_metrics:start_link(),
  504. Pools = [[{name, test_pool_1},
  505. {max_count, 10},
  506. {init_count, 2},
  507. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
  508. {cull_interval, {200, ms}},
  509. {max_age, {0, min}}]],
  510. application:set_env(pooler, pools, Pools),
  511. %% error_logger:delete_report_handler(error_logger_tty_h),
  512. application:start(pooler)
  513. end,
  514. fun(_X) ->
  515. fake_metrics:stop(),
  516. application:stop(pooler)
  517. end,
  518. [
  519. {foreach,
  520. fun() ->
  521. Pids = get_n_pids(test_pool_1, 10, []),
  522. ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
  523. ?assertEqual(10, length(Pids)),
  524. Pids
  525. end,
  526. fun(Pids) ->
  527. [ pooler:return_member(test_pool_1, P) || P <- Pids ]
  528. end,
  529. [
  530. fun(Pids) ->
  531. {"excess members are culled run 1",
  532. fun() ->
  533. [ pooler:return_member(test_pool_1, P) || P <- Pids ],
  534. %% wait for longer than cull delay
  535. timer:sleep(250),
  536. ?assertEqual(2, length(pooler:pool_stats(test_pool_1)))
  537. end}
  538. end,
  539. fun(Pids) ->
  540. {"excess members are culled run 2",
  541. fun() ->
  542. [ pooler:return_member(test_pool_1, P) || P <- Pids ],
  543. %% wait for longer than cull delay
  544. timer:sleep(250),
  545. ?assertEqual(2, length(pooler:pool_stats(test_pool_1)))
  546. end}
  547. end,
  548. fun(Pids) -> in_use_members_not_culled(Pids, 1) end,
  549. fun(Pids) -> in_use_members_not_culled(Pids, 2) end,
  550. fun(Pids) -> in_use_members_not_culled(Pids, 3) end,
  551. fun(Pids) -> in_use_members_not_culled(Pids, 4) end,
  552. fun(Pids) -> in_use_members_not_culled(Pids, 5) end,
  553. fun(Pids) -> in_use_members_not_culled(Pids, 6) end
  554. ]},
  555. {"no cull when init_count matches max_count",
  556. %% not sure how to verify this. But this test at least
  557. %% exercises the code path.
  558. fun() ->
  559. Config = [{name, test_static_pool_1},
  560. {max_count, 2},
  561. {init_count, 2},
  562. {start_mfa, {pooled_gs, start_link, [{"static-0"}]}},
  563. {cull_interval, {200, ms}}], % ignored
  564. pooler:new_pool(Config),
  565. P = pooler:take_member(test_static_pool_1),
  566. ?assertMatch({"static-0", _}, pooled_gs:get_id(P)),
  567. pooler:return_member(test_static_pool_1, P),
  568. ok
  569. end}
  570. ]}.
  571. in_use_members_not_culled(Pids, N) ->
  572. {"in-use members are not culled " ++ erlang:integer_to_list(N),
  573. fun() ->
  574. %% wait for longer than cull delay
  575. timer:sleep(250),
  576. PidCount = length(Pids),
  577. ?assertEqual(PidCount,
  578. length(pooler:pool_stats(test_pool_1))),
  579. Returns = lists:sublist(Pids, N),
  580. [ pooler:return_member(test_pool_1, P)
  581. || P <- Returns ],
  582. timer:sleep(250),
  583. ?assertEqual(PidCount - N,
  584. length(pooler:pool_stats(test_pool_1)))
  585. end}.
  586. random_message_test_() ->
  587. {setup,
  588. fun() ->
  589. Pools = [[{name, test_pool_1},
  590. {max_count, 2},
  591. {init_count, 1},
  592. {start_mfa,
  593. {pooled_gs, start_link, [{"type-0"}]}}]],
  594. application:set_env(pooler, pools, Pools),
  595. error_logger:delete_report_handler(error_logger_tty_h),
  596. application:start(pooler),
  597. %% now send some bogus messages
  598. %% do the call in a throw-away process to avoid timeout error
  599. spawn(fun() -> catch gen_server:call(test_pool_1, {unexpected_garbage_msg, 5}) end),
  600. gen_server:cast(test_pool_1, {unexpected_garbage_msg, 6}),
  601. whereis(test_pool_1) ! {unexpected_garbage_msg, 7},
  602. ok
  603. end,
  604. fun(_) ->
  605. application:stop(pooler)
  606. end,
  607. [
  608. fun() ->
  609. Pid = spawn(fun() -> ok end),
  610. MonMsg = {'DOWN', erlang:make_ref(), process, Pid, because},
  611. test_pool_1 ! MonMsg
  612. end,
  613. fun() ->
  614. Pid = pooler:take_member(test_pool_1),
  615. {Type, _} = pooled_gs:get_id(Pid),
  616. ?assertEqual("type-0", Type)
  617. end,
  618. fun() ->
  619. RawPool = gen_server:call(test_pool_1, dump_pool),
  620. ?assertEqual(pool, element(1, RawPool))
  621. end
  622. ]}.
  623. pooler_integration_long_init_test_() ->
  624. {foreach,
  625. % setup
  626. fun() ->
  627. Pool = [{name, test_pool_1},
  628. {max_count, 10},
  629. {init_count, 0},
  630. {member_start_timeout, {10, ms}},
  631. {start_mfa,
  632. {pooled_gs, start_link, [{"type-0", fun() -> timer:sleep(15) end}]}}],
  633. application:set_env(pooler, pools, [Pool]),
  634. application:start(pooler)
  635. end,
  636. % cleanup
  637. fun(_) ->
  638. application:stop(pooler)
  639. end,
  640. %
  641. [
  642. fun(_) ->
  643. % Test what happens when pool members take too long to start.
  644. % The pooler_starter should kill off stale members, there by
  645. % reducing the number of children of the member_sup. This
  646. % activity occurs both during take member and accept member.
  647. % Accordingly, the count should go to zero once all starters
  648. % check in.
  649. fun() ->
  650. ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
  651. [begin
  652. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  653. ?assertEqual(1, starting_members(test_pool_1))
  654. end
  655. || _ <- lists:seq(1,10)],
  656. ?assertEqual(10, children_count(pooler_test_pool_1_member_sup)),
  657. timer:sleep(150),
  658. ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
  659. ?assertEqual(0, starting_members(test_pool_1))
  660. end
  661. end
  662. ]
  663. }.
  664. sleep_for_configured_timeout() ->
  665. SleepTime = case application:get_env(pooler, sleep_time) of
  666. {ok, Val} ->
  667. Val;
  668. _ ->
  669. 0
  670. end,
  671. timer:sleep(SleepTime).
  672. pooler_integration_queueing_test_() ->
  673. {foreach,
  674. % setup
  675. fun() ->
  676. Pool = [{name, test_pool_1},
  677. {max_count, 10},
  678. {queue_max, 10},
  679. {init_count, 0},
  680. {metrics, fake_metrics},
  681. {member_start_timeout, {5, sec}},
  682. {start_mfa,
  683. {pooled_gs, start_link, [
  684. {"type-0",
  685. fun pooler_tests:sleep_for_configured_timeout/0 }
  686. ]
  687. }
  688. }
  689. ],
  690. application:set_env(pooler, pools, [Pool]),
  691. fake_metrics:start_link(),
  692. application:start(pooler)
  693. end,
  694. % cleanup
  695. fun(_) ->
  696. fake_metrics:stop(),
  697. application:stop(pooler)
  698. end,
  699. [
  700. fun(_) ->
  701. fun() ->
  702. ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
  703. Val = pooler:take_member(test_pool_1, 10),
  704. ?assert(is_pid(Val)),
  705. pooler:return_member(test_pool_1, Val)
  706. end
  707. end,
  708. fun(_) ->
  709. fun() ->
  710. application:set_env(pooler, sleep_time, 1),
  711. ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
  712. Val = pooler:take_member(test_pool_1, 0),
  713. ?assertEqual(error_no_members, Val),
  714. timer:sleep(50),
  715. %Next request should be available
  716. Pid = pooler:take_member(test_pool_1, 0),
  717. ?assert(is_pid(Pid)),
  718. pooler:return_member(test_pool_1, Pid)
  719. end
  720. end,
  721. fun(_) ->
  722. fun() ->
  723. application:set_env(pooler, sleep_time, 10),
  724. ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
  725. [
  726. ?assertEqual(pooler:take_member(test_pool_1, 0), error_no_members) ||
  727. _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)],
  728. timer:sleep(50),
  729. %Next request should be available
  730. Pid = pooler:take_member(test_pool_1, 0),
  731. ?assert(is_pid(Pid)),
  732. pooler:return_member(test_pool_1, Pid)
  733. end
  734. end,
  735. fun(_) ->
  736. fun() ->
  737. % fill to queue_max, next request should return immediately with no_members
  738. % Will return a if queue max is not enforced.
  739. application:set_env(pooler, sleep_time, 100),
  740. [ proc_lib:spawn(fun() ->
  741. Val = pooler:take_member(test_pool_1, 200),
  742. ?assert(is_pid(Val)),
  743. pooler:return_member(Val)
  744. end)
  745. || _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)
  746. ],
  747. timer:sleep(50),
  748. ?assertEqual(10, queue:len((dump_pool(test_pool_1))#pool.queued_requestors)),
  749. ?assertEqual(pooler:take_member(test_pool_1, 500), error_no_members),
  750. ExpectKeys = lists:sort([<<"pooler.test_pool_1.error_no_members_count">>,
  751. <<"pooler.test_pool_1.events">>,
  752. <<"pooler.test_pool_1.take_rate">>,
  753. <<"pooler.test_pool_1.queue_count">>,
  754. <<"pooler.test_pool_1.queue_max_reached">>]),
  755. Metrics = fake_metrics:get_metrics(),
  756. GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
  757. ?assertEqual(ExpectKeys, GotKeys),
  758. timer:sleep(100),
  759. Val = pooler:take_member(test_pool_1, 500),
  760. ?assert(is_pid(Val)),
  761. pooler:return_member(test_pool_1, Val)
  762. end
  763. end
  764. ]
  765. }.
  766. pooler_integration_queueing_return_member_test_() ->
  767. {foreach,
  768. % setup
  769. fun() ->
  770. Pool = [{name, test_pool_1},
  771. {max_count, 10},
  772. {queue_max, 10},
  773. {init_count, 10},
  774. {metrics, fake_metrics},
  775. {member_start_timeout, {5, sec}},
  776. {start_mfa,
  777. {pooled_gs, start_link, [
  778. {"type-0",
  779. fun pooler_tests:sleep_for_configured_timeout/0 }
  780. ]
  781. }
  782. }
  783. ],
  784. application:set_env(pooler, pools, [Pool]),
  785. fake_metrics:start_link(),
  786. application:start(pooler)
  787. end,
  788. % cleanup
  789. fun(_) ->
  790. fake_metrics:stop(),
  791. application:stop(pooler)
  792. end,
  793. [
  794. fun(_) ->
  795. fun() ->
  796. application:set_env(pooler, sleep_time, 0),
  797. Pids = [ proc_lib:spawn_link(fun() ->
  798. Val = pooler:take_member(test_pool_1, 200),
  799. ?assert(is_pid(Val)),
  800. receive
  801. _ ->
  802. pooler:return_member(test_pool_1, Val)
  803. after
  804. 5000 ->
  805. pooler:return_member(test_pool_1, Val)
  806. end
  807. end)
  808. || _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)
  809. ],
  810. timer:sleep(50),
  811. Parent = self(),
  812. proc_lib:spawn_link(fun() ->
  813. Val = pooler:take_member(test_pool_1, 200),
  814. Parent ! Val
  815. end),
  816. [Pid ! return || Pid <- Pids],
  817. receive
  818. Result ->
  819. ?assert(is_pid(Result))
  820. end
  821. end
  822. end
  823. ]
  824. }.
  825. pooler_integration_test_() ->
  826. {foreach,
  827. % setup
  828. fun() ->
  829. Pools = [[{name, test_pool_1},
  830. {max_count, 10},
  831. {init_count, 10},
  832. {start_mfa,
  833. {pooled_gs, start_link, [{"type-0"}]}}]],
  834. application:set_env(pooler, pools, Pools),
  835. error_logger:delete_report_handler(error_logger_tty_h),
  836. application:start(pooler),
  837. Users = [ start_user() || _X <- lists:seq(1, 10) ],
  838. Users
  839. end,
  840. % cleanup
  841. fun(Users) ->
  842. [ user_stop(U) || U <- Users ],
  843. application:stop(pooler)
  844. end,
  845. %
  846. [
  847. fun(Users) ->
  848. fun() ->
  849. % each user has a different tc ID
  850. TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
  851. ?assertEqual(lists:usort(TcIds), TcIds)
  852. end
  853. end
  854. ,
  855. fun(Users) ->
  856. fun() ->
  857. % users still unique after a renew cycle
  858. [ user_new_tc(UPid) || UPid <- Users ],
  859. TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
  860. ?assertEqual(lists:usort(TcIds), TcIds)
  861. end
  862. end
  863. ,
  864. fun(Users) ->
  865. fun() ->
  866. % all users crash, pids are replaced
  867. TcIds1 = lists:sort([ user_id(UPid) || UPid <- Users ]),
  868. [ user_crash(UPid) || UPid <- Users ],
  869. Seq = lists:seq(1, 5),
  870. Users2 = [ start_user() || _X <- Seq ],
  871. TcIds2 = lists:sort([ user_id(UPid) || UPid <- Users2 ]),
  872. Both =
  873. sets:to_list(sets:intersection([sets:from_list(TcIds1),
  874. sets:from_list(TcIds2)])),
  875. ?assertEqual([], Both)
  876. end
  877. end
  878. ]
  879. }.
  880. time_as_millis_test_() ->
  881. Zeros = [ {{0, U}, 0} || U <- [min, sec, ms, mu] ],
  882. Ones = [{{1, min}, 60000},
  883. {{1, sec}, 1000},
  884. {{1, ms}, 1},
  885. {{1, mu}, 0}],
  886. Misc = [{{3000, mu}, 3}],
  887. Tests = Zeros ++ Ones ++ Misc,
  888. [ ?_assertEqual(E, pooler:time_as_millis(I)) || {I, E} <- Tests ].
  889. time_as_micros_test_() ->
  890. Zeros = [ {{0, U}, 0} || U <- [min, sec, ms, mu] ],
  891. Ones = [{{1, min}, 60000000},
  892. {{1, sec}, 1000000},
  893. {{1, ms}, 1000},
  894. {{1, mu}, 1}],
  895. Misc = [{{3000, mu}, 3000}],
  896. Tests = Zeros ++ Ones ++ Misc,
  897. [ ?_assertEqual(E, pooler:time_as_micros(I)) || {I, E} <- Tests ].
  898. % testing crash recovery means race conditions when either pids
  899. % haven't yet crashed or pooler hasn't recovered. So this helper loops
  900. % forver until N pids are obtained, ignoring error_no_members.
  901. get_n_pids(N, Acc) ->
  902. get_n_pids(test_pool_1, N, Acc).
  903. get_n_pids(_Pool, 0, Acc) ->
  904. Acc;
  905. get_n_pids(Pool, N, Acc) ->
  906. case pooler:take_member(Pool) of
  907. error_no_members ->
  908. get_n_pids(Pool, N, Acc);
  909. Pid ->
  910. get_n_pids(Pool, N - 1, [Pid|Acc])
  911. end.
  912. get_n_pids_group(_Group, 0, Acc) ->
  913. Acc;
  914. get_n_pids_group(Group, N, Acc) ->
  915. case pooler:take_group_member(Group) of
  916. error_no_members ->
  917. get_n_pids_group(Group, N, Acc);
  918. Pid ->
  919. get_n_pids_group(Group, N - 1, [Pid|Acc])
  920. end.
  921. children_count(SupId) ->
  922. length(supervisor:which_children(SupId)).
  923. starting_members(PoolName) ->
  924. length((dump_pool(PoolName))#pool.starting_members).
  925. dump_pool(PoolName) ->
  926. gen_server:call(PoolName, dump_pool).