pooler_tests.erl 54 KB


  1. -module(pooler_tests).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include("../src/pooler.hrl").
  4. -export([sleep_for_configured_timeout/0]).
  5. % The `user' processes represent users of the pooler library. A user
  6. % process will take a pid, report details on the pid it has, release
  7. % and take a new pid, stop cleanly, and crash.
  8. start_user() ->
  9. spawn(fun() -> user_loop(start) end).
  10. user_id(Pid) ->
  11. Pid ! {get_tc_id, self()},
  12. receive
  13. {Type, Id} ->
  14. {Type, Id}
  15. end.
  16. user_new_tc(Pid) ->
  17. Pid ! new_tc.
  18. user_stop(Pid) ->
  19. Pid ! stop.
  20. user_crash(Pid) ->
  21. Pid ! crash.
  22. user_loop(Atom) when Atom =:= error_no_members orelse Atom =:= start ->
  23. user_loop(pooler:take_member(test_pool_1));
  24. user_loop(MyTC) ->
  25. receive
  26. {get_tc_id, From} ->
  27. From ! pooled_gs:get_id(MyTC),
  28. user_loop(MyTC);
  29. {ping_tc, From} ->
  30. From ! pooled_gs:ping(MyTC),
  31. user_loop(MyTC);
  32. {ping_count, From} ->
  33. From ! pooled_gs:ping_count(MyTC),
  34. user_loop(MyTC);
  35. new_tc ->
  36. pooler:return_member(test_pool_1, MyTC, ok),
  37. MyNewTC = pooler:take_member(test_pool_1),
  38. user_loop(MyNewTC);
  39. stop ->
  40. pooler:return_member(test_pool_1, MyTC, ok),
  41. stopped;
  42. crash ->
  43. erlang:error({user_loop, kaboom})
  44. end.
  45. % The `tc' processes represent the pids tracked by pooler for testing.
  46. % They have a type and an ID and can report their type and ID and
  47. % stop.
  48. %% tc_loop({Type, Id}) ->
  49. %% receive
  50. %% {get_id, From} ->
  51. %% From ! {ok, Type, Id},
  52. %% tc_loop({Type, Id});
  53. %% stop -> stopped;
  54. %% crash ->
  55. %% erlang:error({tc_loop, kaboom})
  56. %% end.
  57. %% get_tc_id(Pid) ->
  58. %% Pid ! {get_id, self()},
  59. %% receive
  60. %% {ok, Type, Id} ->
  61. %% {Type, Id}
  62. %% after 200 ->
  63. %% timeout
  64. %% end.
  65. %% stop_tc(Pid) ->
  66. %% Pid ! stop.
  67. %% tc_starter(Type) ->
  68. %% Ref = make_ref(),
  69. %% spawn_link(fun() -> tc_loop({Type, Ref}) end).
  70. %% assert_tc_valid(Pid) ->
  71. %% ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
  72. %% ok.
  73. % tc_sanity_test() ->
  74. % Pid1 = tc_starter("1"),
  75. % {"1", Id1} = get_tc_id(Pid1),
  76. % Pid2 = tc_starter("1"),
  77. % {"1", Id2} = get_tc_id(Pid2),
  78. % ?assertNot(Id1 == Id2),
  79. % stop_tc(Pid1),
  80. % stop_tc(Pid2).
  81. % user_sanity_test() ->
  82. % Pid1 = tc_starter("1"),
  83. % User = spawn(fun() -> user_loop(Pid1) end),
  84. % ?assertMatch({"1", _Ref}, user_id(User)),
  85. % user_crash(User),
  86. % stop_tc(Pid1).
  87. pooler_basics_via_config_test_() ->
  88. {setup,
  89. fun() ->
  90. {ok, _} = error_logger_mon:start_link(),
  91. error_logger_mon:install_handler(pooler),
  92. logger:set_handler_config(default, filters, []),
  93. application:set_env(pooler, metrics_module, fake_metrics),
  94. fake_metrics:start_link()
  95. end,
  96. fun(_X) ->
  97. error_logger_mon:uninstall_handler(),
  98. ok = error_logger_mon:stop(),
  99. fake_metrics:stop()
  100. end,
  101. {foreach,
  102. % setup
  103. fun() ->
  104. error_logger_mon:reset(),
  105. Pools = [
  106. [
  107. {name, test_pool_1},
  108. {max_count, 3},
  109. {init_count, 2},
  110. {cull_interval, {0, min}},
  111. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
  112. ]
  113. ],
  114. application:set_env(pooler, pools, Pools),
  115. application:start(pooler)
  116. end,
  117. fun(_X) ->
  118. application:stop(pooler)
  119. end,
  120. basic_tests()}}.
  121. pooler_basics_dynamic_test_() ->
  122. {setup,
  123. fun() ->
  124. {ok, _} = error_logger_mon:start_link(),
  125. error_logger_mon:install_handler(pooler),
  126. logger:set_handler_config(default, filters, []),
  127. application:set_env(pooler, metrics_module, fake_metrics),
  128. fake_metrics:start_link()
  129. end,
  130. fun(_X) ->
  131. error_logger_mon:uninstall_handler(),
  132. ok = error_logger_mon:stop(),
  133. fake_metrics:stop()
  134. end,
  135. {foreach,
  136. % setup
  137. fun() ->
  138. error_logger_mon:reset(),
  139. Pool = [
  140. {name, test_pool_1},
  141. {max_count, 3},
  142. {init_count, 2},
  143. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
  144. ],
  145. application:unset_env(pooler, pools),
  146. application:start(pooler),
  147. pooler:new_pool(Pool)
  148. end,
  149. fun(_X) ->
  150. application:stop(pooler)
  151. end,
  152. basic_tests()}}.
  153. pooler_basics_integration_to_other_supervisor_test_() ->
  154. {setup,
  155. fun() ->
  156. {ok, _} = error_logger_mon:start_link(),
  157. error_logger_mon:install_handler(pooler),
  158. logger:set_handler_config(default, filters, []),
  159. application:set_env(pooler, metrics_module, fake_metrics),
  160. fake_metrics:start_link()
  161. end,
  162. fun(_X) ->
  163. error_logger_mon:uninstall_handler(),
  164. ok = error_logger_mon:stop(),
  165. fake_metrics:stop()
  166. end,
  167. {foreach,
  168. % setup
  169. fun() ->
  170. error_logger_mon:reset(),
  171. Pool = [
  172. {name, test_pool_1},
  173. {max_count, 3},
  174. {init_count, 2},
  175. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
  176. ],
  177. application:unset_env(pooler, pools),
  178. application:start(pooler),
  179. supervisor:start_link(fake_external_supervisor, Pool)
  180. end,
  181. fun({ok, SupPid}) ->
  182. exit(SupPid, normal),
  183. application:stop(pooler)
  184. end,
  185. basic_tests()}}.
  186. basic_tests() ->
  187. [
  188. {"there are init_count members at start", fun() ->
  189. Stats = [P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1)],
  190. ?assertEqual(2, length(Stats))
  191. end},
  192. {"take and return one", fun() ->
  193. P = pooler:take_member(test_pool_1),
  194. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  195. ok = pooler:return_member(test_pool_1, P, ok)
  196. end},
  197. {"take and return one, named pool", fun() ->
  198. P = pooler:take_member(test_pool_1),
  199. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  200. ok,
  201. pooler:return_member(test_pool_1, P)
  202. end},
  203. {"attempt to take form unknown pool", fun() ->
  204. %% since pools are now servers, an unknown pool will timeout
  205. ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
  206. end},
  207. {"members creation is triggered after pool exhaustion until max", fun() ->
  208. %% init count is 2
  209. Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
  210. %% since new member creation is async, can only assert
  211. %% that we will get a pid, but may not be first try.
  212. Pids = get_n_pids(1, Pids0),
  213. %% pool is at max now, requests should give error
  214. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  215. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  216. PRefs = [R || {_T, R} <- [pooled_gs:get_id(P) || P <- Pids]],
  217. % no duplicates
  218. ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
  219. end},
  220. {"pids are reused most recent return first", fun() ->
  221. P1 = pooler:take_member(test_pool_1),
  222. P2 = pooler:take_member(test_pool_1),
  223. ?assertNot(P1 == P2),
  224. ok = pooler:return_member(test_pool_1, P1, ok),
  225. ok = pooler:return_member(test_pool_1, P2, ok),
  226. % pids are reused most recent first
  227. ?assertEqual(P2, pooler:take_member(test_pool_1)),
  228. ?assertEqual(P1, pooler:take_member(test_pool_1))
  229. end},
  230. {"if an in-use pid crashes it is replaced", fun() ->
  231. Pids0 = get_n_pids(3, []),
  232. Ids0 = [pooled_gs:get_id(P) || P <- Pids0],
  233. % crash them all
  234. [pooled_gs:crash(P) || P <- Pids0],
  235. Pids1 = get_n_pids(3, []),
  236. Ids1 = [pooled_gs:get_id(P) || P <- Pids1],
  237. [?assertNot(lists:member(I, Ids0)) || I <- Ids1]
  238. end},
  239. {"if a free pid crashes it is replaced", fun() ->
  240. FreePids = [P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1)],
  241. [exit(P, kill) || P <- FreePids],
  242. Pids1 = get_n_pids(3, []),
  243. ?assertEqual(3, length(Pids1))
  244. end},
  245. {"if a pid is returned with bad status it is replaced", fun() ->
  246. Pids0 = get_n_pids(3, []),
  247. Ids0 = [pooled_gs:get_id(P) || P <- Pids0],
  248. % return them all marking as bad
  249. [pooler:return_member(test_pool_1, P, fail) || P <- Pids0],
  250. Pids1 = get_n_pids(3, []),
  251. Ids1 = [pooled_gs:get_id(P) || P <- Pids1],
  252. [?assertNot(lists:member(I, Ids0)) || I <- Ids1]
  253. end},
  254. {"if a consumer crashes, pid is replaced", fun() ->
  255. Consumer = start_user(),
  256. StartId = user_id(Consumer),
  257. user_crash(Consumer),
  258. NewPid = hd(get_n_pids(1, [])),
  259. NewId = pooled_gs:get_id(NewPid),
  260. ?assertNot(NewId == StartId)
  261. end},
  262. {"it is ok to return an unknown pid", fun() ->
  263. Bogus1 = spawn(fun() -> ok end),
  264. Bogus2 = spawn(fun() -> ok end),
  265. ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus1, ok)),
  266. ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus2, fail))
  267. end},
  268. {"it is ok to return a pid more than once", fun() ->
  269. M = pooler:take_member(test_pool_1),
  270. [
  271. pooler:return_member(test_pool_1, M)
  272. || _I <- lists:seq(1, 37)
  273. ],
  274. ?assertEqual(
  275. 36,
  276. length(
  277. lists:filter(
  278. fun
  279. (
  280. #{
  281. msg :=
  282. {report, #{
  283. label := "ignored return of free member",
  284. pid := Pid
  285. }}
  286. }
  287. ) ->
  288. Pid =:= M;
  289. (_) ->
  290. false
  291. end,
  292. error_logger_mon:get_msgs()
  293. )
  294. )
  295. ),
  296. M1 = pooler:take_member(test_pool_1),
  297. M2 = pooler:take_member(test_pool_1),
  298. ?assert(M1 =/= M2),
  299. Pool1 = gen_server:call(test_pool_1, dump_pool),
  300. ?assertEqual(2, Pool1#pool.in_use_count),
  301. ?assertEqual(0, Pool1#pool.free_count),
  302. pooler:return_member(test_pool_1, M1),
  303. pooler:return_member(test_pool_1, M2),
  304. Pool2 = gen_server:call(test_pool_1, dump_pool),
  305. ?assertEqual(0, Pool2#pool.in_use_count),
  306. ?assertEqual(2, Pool2#pool.free_count),
  307. ok
  308. end},
  309. {"calling return_member on error_no_members is ignored", fun() ->
  310. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members)),
  311. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, ok)),
  312. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, fail))
  313. end},
  314. {"dynamic pool creation", fun() ->
  315. PoolSpec = [
  316. {name, dyn_pool_1},
  317. {max_count, 3},
  318. {init_count, 2},
  319. {start_mfa, {pooled_gs, start_link, [{"dyn-0"}]}}
  320. ],
  321. {ok, SupPid1} = pooler:new_pool(PoolSpec),
  322. ?assert(is_pid(SupPid1)),
  323. M = pooler:take_member(dyn_pool_1),
  324. ?assertMatch({"dyn-0", _Id}, pooled_gs:get_id(M)),
  325. ?assertEqual(ok, pooler:rm_pool(dyn_pool_1)),
  326. ?assertExit({noproc, _}, pooler:take_member(dyn_pool_1)),
  327. %% verify pool of same name can be created after removal
  328. {ok, SupPid2} = pooler:new_pool(PoolSpec),
  329. ?assert(is_pid(SupPid2)),
  330. %% remove non-existing pool
  331. ?assertEqual(ok, pooler:rm_pool(dyn_pool_X)),
  332. ?assertEqual(ok, pooler:rm_pool(dyn_pool_1))
  333. end},
  334. {"metrics have been called (no timeout/queue)", fun() ->
  335. %% exercise the API to ensure we have certain keys reported as metrics
  336. fake_metrics:reset_metrics(),
  337. Pids = [pooler:take_member(test_pool_1) || _I <- lists:seq(1, 10)],
  338. [pooler:return_member(test_pool_1, P) || P <- Pids],
  339. catch pooler:take_member(bad_pool_name),
  340. %% kill and unused member
  341. exit(hd(Pids), kill),
  342. %% kill a used member
  343. KillMe = pooler:take_member(test_pool_1),
  344. exit(KillMe, kill),
  345. %% FIXME: We need to wait for pooler to process the
  346. %% exit message. This is ugly, will fix later.
  347. % :(
  348. timer:sleep(200),
  349. ExpectKeys = lists:sort([
  350. <<"pooler.test_pool_1.error_no_members_count">>,
  351. <<"pooler.test_pool_1.events">>,
  352. <<"pooler.test_pool_1.free_count">>,
  353. <<"pooler.test_pool_1.in_use_count">>,
  354. <<"pooler.test_pool_1.killed_free_count">>,
  355. <<"pooler.test_pool_1.killed_in_use_count">>,
  356. <<"pooler.test_pool_1.take_rate">>
  357. ]),
  358. Metrics = fake_metrics:get_metrics(),
  359. GotKeys = lists:usort([Name || {Name, _, _} <- Metrics]),
  360. ?assertEqual(ExpectKeys, GotKeys)
  361. end},
  362. {"metrics have been called (with timeout/queue)", fun() ->
  363. %% exercise the API to ensure we have certain keys reported as metrics
  364. fake_metrics:reset_metrics(),
  365. %% pass a non-zero timeout here to exercise queueing
  366. Pids = [pooler:take_member(test_pool_1, 1) || _I <- lists:seq(1, 10)],
  367. [pooler:return_member(test_pool_1, P) || P <- Pids],
  368. catch pooler:take_member(bad_pool_name),
  369. %% kill and unused member
  370. exit(hd(Pids), kill),
  371. %% kill a used member
  372. KillMe = pooler:take_member(test_pool_1),
  373. exit(KillMe, kill),
  374. %% FIXME: We need to wait for pooler to process the
  375. %% exit message. This is ugly, will fix later.
  376. % :(
  377. timer:sleep(200),
  378. ExpectKeys = lists:sort([
  379. <<"pooler.test_pool_1.error_no_members_count">>,
  380. <<"pooler.test_pool_1.events">>,
  381. <<"pooler.test_pool_1.free_count">>,
  382. <<"pooler.test_pool_1.in_use_count">>,
  383. <<"pooler.test_pool_1.killed_free_count">>,
  384. <<"pooler.test_pool_1.killed_in_use_count">>,
  385. <<"pooler.test_pool_1.take_rate">>,
  386. <<"pooler.test_pool_1.queue_count">>
  387. ]),
  388. Metrics = fake_metrics:get_metrics(),
  389. GotKeys = lists:usort([Name || {Name, _, _} <- Metrics]),
  390. ?assertEqual(ExpectKeys, GotKeys)
  391. end},
  392. {"accept bad member is handled", fun() ->
  393. Bad = spawn(fun() -> ok end),
  394. FakeStarter = spawn(fun() -> starter end),
  395. ?assertEqual(ok, pooler:accept_member(test_pool_1, {FakeStarter, Bad}))
  396. end},
  397. {"utilization returns sane results", fun() ->
  398. #pool{max_count = MaxCount, queue_max = QueueMax} = sys:get_state(test_pool_1),
  399. ?assertEqual(MaxCount, ?gv(max_count, pooler:pool_utilization(test_pool_1))),
  400. ?assertEqual(0, ?gv(in_use_count, pooler:pool_utilization(test_pool_1))),
  401. ?assertEqual(2, ?gv(free_count, pooler:pool_utilization(test_pool_1))),
  402. ?assertEqual(0, ?gv(queued_count, pooler:pool_utilization(test_pool_1))),
  403. ?assertEqual(QueueMax, ?gv(queue_max, pooler:pool_utilization(test_pool_1)))
  404. end}
  405. ].
  406. pooler_groups_test_() ->
  407. {setup,
  408. fun() ->
  409. logger:set_handler_config(default, filters, []),
  410. application:set_env(pooler, metrics_module, fake_metrics),
  411. fake_metrics:start_link()
  412. end,
  413. fun(_X) ->
  414. fake_metrics:stop()
  415. end,
  416. {foreach,
  417. % setup
  418. fun() ->
  419. Pools = [
  420. [
  421. {name, test_pool_1},
  422. {group, group_1},
  423. {max_count, 3},
  424. {init_count, 2},
  425. {start_mfa, {pooled_gs, start_link, [{"type-1-1"}]}}
  426. ],
  427. [
  428. {name, test_pool_2},
  429. {group, group_1},
  430. {max_count, 3},
  431. {init_count, 2},
  432. {start_mfa, {pooled_gs, start_link, [{"type-1-2"}]}}
  433. ],
  434. %% test_pool_3 not part of the group
  435. [
  436. {name, test_pool_3},
  437. {group, undefined},
  438. {max_count, 3},
  439. {init_count, 2},
  440. {start_mfa, {pooled_gs, start_link, [{"type-3"}]}}
  441. ]
  442. ],
  443. application:set_env(pooler, pools, Pools),
  444. pg_start(),
  445. application:start(pooler)
  446. end,
  447. fun(_X) ->
  448. application:stop(pooler),
  449. pg_stop()
  450. end,
  451. [
  452. {"take and return one group member (repeated)", fun() ->
  453. Types = [
  454. begin
  455. Pid = pooler:take_group_member(group_1),
  456. ?assert(is_pid(Pid), [{result, Pid}, {i, I}]),
  457. {Type, _} = pooled_gs:get_id(Pid),
  458. ?assertMatch("type-1" ++ _, Type),
  459. ok = pooler:return_group_member(group_1, Pid, ok),
  460. timer:sleep(10),
  461. Type
  462. end
  463. || I <- lists:seq(1, 50)
  464. ],
  465. Type_1_1 = [X || "type-1-1" = X <- Types],
  466. Type_1_2 = [X || "type-1-2" = X <- Types],
  467. ?assert(length(Type_1_1) > 0, [{types, Types}]),
  468. ?assert(length(Type_1_2) > 0, [{types, Types}])
  469. end},
  470. {"take member from unknown group", fun() ->
  471. ?assertEqual(
  472. error_no_members,
  473. pooler:take_group_member(not_a_group)
  474. )
  475. end},
  476. {"return member to unknown group", fun() ->
  477. Pid = pooler:take_group_member(group_1),
  478. ?assertEqual(ok, pooler:return_group_member(no_such_group, Pid))
  479. end},
  480. {"return member to wrong group", fun() ->
  481. Pid = pooler:take_member(test_pool_3),
  482. ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
  483. end},
  484. {"return member with something which is not a pid", fun() ->
  485. ?assertException(error, _, pooler:return_group_member(group_1, not_pid))
  486. end},
  487. {"take member from empty group", fun() ->
  488. %% artificially empty group member list
  489. [pg_leave(group_1, M) || M <- pooler:group_pools(group_1)],
  490. ?assertEqual(error_no_members, pooler:take_group_member(group_1))
  491. end},
  492. {"return member to group, implied ok", fun() ->
  493. Pid = pooler:take_group_member(group_1),
  494. ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
  495. end},
  496. {"return error_no_member to group", fun() ->
  497. ?assertEqual(ok, pooler:return_group_member(group_1, error_no_members))
  498. end},
  499. {"exhaust pools in group", fun() ->
  500. Pids = get_n_pids_group(group_1, 6, []),
  501. %% they should all be pids
  502. [
  503. begin
  504. {Type, _} = pooled_gs:get_id(P),
  505. ?assertMatch("type-1" ++ _, Type),
  506. ok
  507. end
  508. || P <- Pids
  509. ],
  510. %% further attempts should be error
  511. [
  512. error_no_members,
  513. error_no_members,
  514. error_no_members
  515. ] = [
  516. pooler:take_group_member(group_1)
  517. || _I <- lists:seq(1, 3)
  518. ]
  519. end},
  520. {"rm_group with nonexisting group", fun() ->
  521. ?assertEqual(ok, pooler:rm_group(i_dont_exist))
  522. end},
  523. {"rm_group with existing empty group", fun() ->
  524. ?assertEqual(ok, pooler:rm_pool(test_pool_1)),
  525. ?assertEqual(ok, pooler:rm_pool(test_pool_2)),
  526. % process group de-registration is asynchronous
  527. timer:sleep(100),
  528. ?assertEqual(error_no_members, pooler:take_group_member(group_1)),
  529. ?assertEqual(ok, pooler:rm_group(group_1)),
  530. ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
  531. ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
  532. ?assertEqual(
  533. error_no_members,
  534. pooler:take_group_member(group_1)
  535. )
  536. end},
  537. {"rm_group with existing non-empty group", fun() ->
  538. %% Verify that group members exist
  539. MemberPid = pooler:take_group_member(group_1),
  540. ?assert(is_pid(MemberPid)),
  541. pooler:return_group_member(group_1, MemberPid),
  542. Pool1Pid = pooler:take_member(test_pool_1),
  543. ?assert(is_pid(Pool1Pid)),
  544. pooler:return_member(test_pool_1, Pool1Pid),
  545. Pool2Pid = pooler:take_member(test_pool_2),
  546. ?assert(is_pid(Pool2Pid)),
  547. pooler:return_member(test_pool_2, Pool2Pid),
  548. %% Delete and verify that group and pools are destroyed
  549. ?assertEqual(ok, pooler:rm_group(group_1)),
  550. ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
  551. ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
  552. ?assertEqual(
  553. error_no_members,
  554. pooler:take_group_member(group_1)
  555. )
  556. end}
  557. ]}}.
  558. pooler_limit_failed_adds_test_() ->
  559. %% verify that pooler crashes completely if too many failures are
  560. %% encountered while trying to add pids.
  561. {setup,
  562. fun() ->
  563. logger:set_handler_config(default, filters, []),
  564. Pools = [
  565. [
  566. {name, test_pool_1},
  567. {max_count, 10},
  568. {init_count, 10},
  569. {start_mfa, {pooled_gs, start_link, [crash]}}
  570. ]
  571. ],
  572. application:set_env(pooler, pools, Pools)
  573. end,
  574. fun(_) ->
  575. application:stop(pooler)
  576. end,
  577. fun() ->
  578. application:start(pooler),
  579. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  580. ?assertEqual(error_no_members, pooler:take_member(test_pool_1))
  581. end}.
  582. pooler_scheduled_cull_test_() ->
  583. {setup,
  584. fun() ->
  585. logger:set_handler_config(default, filters, []),
  586. application:set_env(pooler, metrics_module, fake_metrics),
  587. fake_metrics:start_link(),
  588. Pools = [
  589. [
  590. {name, test_pool_1},
  591. {max_count, 10},
  592. {init_count, 2},
  593. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
  594. {cull_interval, {200, ms}},
  595. {max_age, {0, min}}
  596. ]
  597. ],
  598. application:set_env(pooler, pools, Pools),
  599. application:start(pooler)
  600. end,
  601. fun(_X) ->
  602. fake_metrics:stop(),
  603. application:stop(pooler)
  604. end,
  605. [
  606. {foreach,
  607. fun() ->
  608. Pids = get_n_pids(test_pool_1, 10, []),
  609. ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
  610. ?assertEqual(10, length(Pids)),
  611. Pids
  612. end,
  613. fun(Pids) ->
  614. [pooler:return_member(test_pool_1, P) || P <- Pids]
  615. end,
  616. [
  617. fun(Pids) ->
  618. {"excess members are culled run 1", fun() ->
  619. [pooler:return_member(test_pool_1, P) || P <- Pids],
  620. %% wait for longer than cull delay
  621. timer:sleep(250),
  622. ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
  623. ?assertEqual(2, ?gv(free_count, pooler:pool_utilization(test_pool_1)))
  624. end}
  625. end,
  626. fun(Pids) ->
  627. {"excess members are culled run 2", fun() ->
  628. [pooler:return_member(test_pool_1, P) || P <- Pids],
  629. %% wait for longer than cull delay
  630. timer:sleep(250),
  631. ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
  632. ?assertEqual(2, ?gv(free_count, pooler:pool_utilization(test_pool_1)))
  633. end}
  634. end,
  635. fun(Pids) -> in_use_members_not_culled(Pids, 1) end,
  636. fun(Pids) -> in_use_members_not_culled(Pids, 2) end,
  637. fun(Pids) -> in_use_members_not_culled(Pids, 3) end,
  638. fun(Pids) -> in_use_members_not_culled(Pids, 4) end,
  639. fun(Pids) -> in_use_members_not_culled(Pids, 5) end,
  640. fun(Pids) -> in_use_members_not_culled(Pids, 6) end
  641. ]},
  642. {"no cull when init_count matches max_count",
  643. %% not sure how to verify this. But this test at least
  644. %% exercises the code path.
  645. fun() ->
  646. Config = [
  647. {name, test_static_pool_1},
  648. {max_count, 2},
  649. {init_count, 2},
  650. {start_mfa, {pooled_gs, start_link, [{"static-0"}]}},
  651. % ignored
  652. {cull_interval, {200, ms}}
  653. ],
  654. pooler:new_pool(Config),
  655. P = pooler:take_member(test_static_pool_1),
  656. ?assertMatch({"static-0", _}, pooled_gs:get_id(P)),
  657. pooler:return_member(test_static_pool_1, P),
  658. ok
  659. end}
  660. ]}.
  661. in_use_members_not_culled(Pids, N) ->
  662. {"in-use members are not culled " ++ erlang:integer_to_list(N), fun() ->
  663. %% wait for longer than cull delay
  664. timer:sleep(250),
  665. PidCount = length(Pids),
  666. ?assertEqual(
  667. PidCount,
  668. length(pooler:pool_stats(test_pool_1))
  669. ),
  670. ?assertEqual(0, ?gv(free_count, pooler:pool_utilization(test_pool_1))),
  671. ?assertEqual(PidCount, ?gv(in_use_count, pooler:pool_utilization(test_pool_1))),
  672. Returns = lists:sublist(Pids, N),
  673. [
  674. pooler:return_member(test_pool_1, P)
  675. || P <- Returns
  676. ],
  677. timer:sleep(250),
  678. ?assertEqual(
  679. PidCount - N,
  680. length(pooler:pool_stats(test_pool_1))
  681. )
  682. end}.
  683. random_message_test_() ->
  684. {setup,
  685. fun() ->
  686. logger:set_handler_config(default, filters, []),
  687. Pools = [
  688. [
  689. {name, test_pool_1},
  690. {max_count, 2},
  691. {init_count, 1},
  692. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
  693. ]
  694. ],
  695. application:set_env(pooler, pools, Pools),
  696. application:start(pooler),
  697. %% now send some bogus messages
  698. %% do the call in a throw-away process to avoid timeout error
  699. spawn(fun() -> catch gen_server:call(test_pool_1, {unexpected_garbage_msg, 5}) end),
  700. gen_server:cast(test_pool_1, {unexpected_garbage_msg, 6}),
  701. whereis(test_pool_1) ! {unexpected_garbage_msg, 7},
  702. ok
  703. end,
  704. fun(_) ->
  705. application:stop(pooler)
  706. end,
  707. [
  708. fun() ->
  709. Pid = spawn(fun() -> ok end),
  710. MonMsg = {'DOWN', erlang:make_ref(), process, Pid, because},
  711. test_pool_1 ! MonMsg
  712. end,
  713. fun() ->
  714. Pid = pooler:take_member(test_pool_1),
  715. {Type, _} = pooled_gs:get_id(Pid),
  716. ?assertEqual("type-0", Type)
  717. end,
  718. fun() ->
  719. RawPool = gen_server:call(test_pool_1, dump_pool),
  720. ?assertEqual(pool, element(1, RawPool))
  721. end
  722. ]}.
  723. pooler_integration_long_init_test_() ->
  724. {foreach,
  725. % setup
  726. fun() ->
  727. logger:set_handler_config(default, filters, []),
  728. {ok, _} = error_logger_mon:start_link(),
  729. error_logger_mon:install_handler(pooler),
  730. Pool = [
  731. {name, test_pool_1},
  732. {max_count, 10},
  733. {init_count, 0},
  734. {member_start_timeout, {10, ms}},
  735. {start_mfa, {pooled_gs, start_link, [{"type-0", fun() -> timer:sleep(15) end}]}}
  736. ],
  737. application:set_env(pooler, pools, [Pool]),
  738. application:start(pooler)
  739. end,
  740. % cleanup
  741. fun(_) ->
  742. error_logger_mon:uninstall_handler(),
  743. ok = error_logger_mon:stop(),
  744. application:stop(pooler)
  745. end,
  746. %
  747. [
  748. fun(_) ->
  749. % Test what happens when pool members take too long to start.
  750. % The pooler_starter should kill off stale members, there by
  751. % reducing the number of children of the member_sup. This
  752. % activity occurs both during take member and accept member.
  753. % Accordingly, the count should go to zero once all starters
  754. % check in.
  755. fun() ->
  756. ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
  757. [
  758. begin
  759. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  760. ?assertEqual(1, starting_members(test_pool_1))
  761. end
  762. || _ <- lists:seq(1, 10)
  763. ],
  764. ?assertEqual(10, children_count(pooler_test_pool_1_member_sup)),
  765. timer:sleep(150),
  766. ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
  767. ?assertEqual(0, starting_members(test_pool_1)),
  768. %% there is a log when worker start times out
  769. ?assert(
  770. lists:any(
  771. fun
  772. (
  773. #{
  774. level := error,
  775. msg :=
  776. {report, #{
  777. label := "starting member timeout",
  778. pool := test_pool_1
  779. }}
  780. }
  781. ) ->
  782. true;
  783. (_) ->
  784. false
  785. end,
  786. error_logger_mon:get_msgs()
  787. )
  788. )
  789. end
  790. end
  791. ]}.
  792. sleep_for_configured_timeout() ->
  793. SleepTime =
  794. case application:get_env(pooler, sleep_time) of
  795. {ok, Val} ->
  796. Val;
  797. _ ->
  798. 0
  799. end,
  800. timer:sleep(SleepTime).
  801. pooler_integration_queueing_test_() ->
  802. {foreach,
  803. % setup
  804. fun() ->
  805. logger:set_handler_config(default, filters, []),
  806. Pool = [
  807. {name, test_pool_1},
  808. {max_count, 10},
  809. {queue_max, 10},
  810. {init_count, 0},
  811. {metrics, fake_metrics},
  812. {member_start_timeout, {5, sec}},
  813. {start_mfa,
  814. {pooled_gs, start_link, [
  815. {"type-0", fun pooler_tests:sleep_for_configured_timeout/0}
  816. ]}}
  817. ],
  818. application:set_env(pooler, pools, [Pool]),
  819. fake_metrics:start_link(),
  820. application:start(pooler)
  821. end,
  822. % cleanup
  823. fun(_) ->
  824. fake_metrics:stop(),
  825. application:stop(pooler)
  826. end,
  827. [
  828. fun(_) ->
  829. fun() ->
  830. ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
  831. Val = pooler:take_member(test_pool_1, 10),
  832. ?assert(is_pid(Val)),
  833. pooler:return_member(test_pool_1, Val)
  834. end
  835. end,
  836. fun(_) ->
  837. fun() ->
  838. application:set_env(pooler, sleep_time, 1),
  839. ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
  840. Val = pooler:take_member(test_pool_1, 0),
  841. ?assertEqual(error_no_members, Val),
  842. ?assertEqual(0, ?gv(free_count, pooler:pool_utilization(test_pool_1))),
  843. timer:sleep(50),
  844. %Next request should be available
  845. Pid = pooler:take_member(test_pool_1, 0),
  846. ?assert(is_pid(Pid)),
  847. pooler:return_member(test_pool_1, Pid)
  848. end
  849. end,
  850. fun(_) ->
  851. fun() ->
  852. application:set_env(pooler, sleep_time, 10),
  853. ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
  854. [
  855. ?assertEqual(pooler:take_member(test_pool_1, 0), error_no_members)
  856. || _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)
  857. ],
  858. timer:sleep(50),
  859. %Next request should be available
  860. Pid = pooler:take_member(test_pool_1, 0),
  861. ?assert(is_pid(Pid)),
  862. pooler:return_member(test_pool_1, Pid)
  863. end
  864. end,
  865. fun(_) ->
  866. fun() ->
  867. % fill to queue_max, next request should return immediately with no_members
  868. % Will return a if queue max is not enforced.
  869. application:set_env(pooler, sleep_time, 100),
  870. [
  871. proc_lib:spawn(fun() ->
  872. Val = pooler:take_member(test_pool_1, 200),
  873. ?assert(is_pid(Val)),
  874. pooler:return_member(Val)
  875. end)
  876. || _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)
  877. ],
  878. ?assertEqual(0, ?gv(free_count, pooler:pool_utilization(test_pool_1))),
  879. ?assert(?gv(queued_count, pooler:pool_utilization(test_pool_1)) >= 1),
  880. ?assertEqual(10, ?gv(queue_max, pooler:pool_utilization(test_pool_1))),
  881. timer:sleep(50),
  882. ?assertEqual(10, queue:len((dump_pool(test_pool_1))#pool.queued_requestors)),
  883. ?assertEqual(10, ?gv(queue_max, pooler:pool_utilization(test_pool_1))),
  884. ?assertEqual(pooler:take_member(test_pool_1, 500), error_no_members),
  885. ExpectKeys = lists:sort([
  886. <<"pooler.test_pool_1.error_no_members_count">>,
  887. <<"pooler.test_pool_1.events">>,
  888. <<"pooler.test_pool_1.take_rate">>,
  889. <<"pooler.test_pool_1.queue_count">>,
  890. <<"pooler.test_pool_1.queue_max_reached">>
  891. ]),
  892. Metrics = fake_metrics:get_metrics(),
  893. GotKeys = lists:usort([Name || {Name, _, _} <- Metrics]),
  894. ?assertEqual(ExpectKeys, GotKeys),
  895. timer:sleep(100),
  896. Val = pooler:take_member(test_pool_1, 500),
  897. ?assert(is_pid(Val)),
  898. pooler:return_member(test_pool_1, Val)
  899. end
  900. end
  901. ]}.
  %% Exercises take/return when every member of test_pool_1 is in use.
  %% One worker process is spawned per pool slot (max_count); each takes a
  %% member and holds it until told to return it. While the workers hold their
  %% members a plain take must yield error_no_members. An extra taker that
  %% passes a timeout of 200 must end up with a pid once the workers hand
  %% their members back. At the end every member must be free again.
  pooler_integration_queueing_return_member_test_() ->
      Setup = fun() ->
          logger:set_handler_config(default, filters, []),
          StartMfa =
              {pooled_gs, start_link, [
                  {"type-0", fun pooler_tests:sleep_for_configured_timeout/0}
              ]},
          PoolConfig = [
              {name, test_pool_1},
              {max_count, 10},
              {queue_max, 10},
              {init_count, 10},
              {metrics, fake_metrics},
              {member_start_timeout, {5, sec}},
              {start_mfa, StartMfa}
          ],
          application:set_env(pooler, pools, [PoolConfig]),
          fake_metrics:start_link(),
          application:start(pooler)
      end,
      Cleanup = fun(_) ->
          fake_metrics:stop(),
          application:stop(pooler)
      end,
      Case = fun() ->
          application:set_env(pooler, sleep_time, 0),
          TestProc = self(),
          HoldMember = fun() ->
              Member = pooler:take_member(test_pool_1, 200),
              ?assert(is_pid(Member)),
              TestProc ! {taken, self()},
              % keep the member until asked to return it, or give up after 5000 ms
              receive
                  return -> ok
              after 5000 -> ok
              end,
              pooler:return_member(test_pool_1, Member),
              TestProc ! {returned, self()}
          end,
          SlotCount = (dump_pool(test_pool_1))#pool.max_count,
          Workers = [proc_lib:spawn_link(HoldMember) || _ <- lists:seq(1, SlotCount)],
          % wait until every worker has reported that it holds a member
          lists:foreach(
              fun(Worker) ->
                  receive
                      {taken, Worker} -> ok
                  end
              end,
              Workers
          ),
          ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
          % this taker passes a timeout, so it may be served by a later return
          proc_lib:spawn_link(fun() ->
              Extra = pooler:take_member(test_pool_1, 200),
              TestProc ! {extra_result, Extra}
          end),
          lists:foreach(fun(Worker) -> Worker ! return end, Workers),
          lists:foreach(
              fun(Worker) ->
                  receive
                      {returned, Worker} -> ok
                  end
              end,
              Workers
          ),
          receive
              {extra_result, ExtraMember} ->
                  ?assert(is_pid(ExtraMember)),
                  pooler:return_member(test_pool_1, ExtraMember)
          end,
          ?assertEqual(
              (dump_pool(test_pool_1))#pool.max_count,
              length((dump_pool(test_pool_1))#pool.free_pids)
          ),
          ?assertEqual(
              (dump_pool(test_pool_1))#pool.max_count,
              (dump_pool(test_pool_1))#pool.free_count
          )
      end,
      {foreach, Setup, Cleanup, [fun(_) -> Case end]}.
  %% Integration tests with user processes sharing test_pool_1.
  %% Setup configures the pool with init_count = max_count = 10, starts pooler
  %% and spawns ten users with start_user/0; the user list is handed to each
  %% test and to cleanup. Cleanup sends every user a stop request and then
  %% stops the pooler application.
  pooler_integration_test_() ->
      {foreach,
          % setup
          fun() ->
              logger:set_handler_config(default, filters, []),
              Pools = [
                  [
                      {name, test_pool_1},
                      {max_count, 10},
                      {init_count, 10},
                      {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
                  ]
              ],
              application:set_env(pooler, pools, Pools),
              application:start(pooler),
              [start_user() || _X <- lists:seq(1, 10)]
          end,
          % cleanup
          fun(Users) ->
              [user_stop(U) || U <- Users],
              application:stop(pooler)
          end,
          %
          [
              fun(Users) ->
                  fun() ->
                      % each user has a different tc ID
                      TcIds = lists:sort([user_id(UPid) || UPid <- Users]),
                      ?assertEqual(lists:usort(TcIds), TcIds)
                  end
              end,
              fun(Users) ->
                  fun() ->
                      % users still unique after a renew cycle
                      [user_new_tc(UPid) || UPid <- Users],
                      TcIds = lists:sort([user_id(UPid) || UPid <- Users]),
                      ?assertEqual(lists:usort(TcIds), TcIds)
                  end
              end,
              fun(Users) ->
                  fun() ->
                      % all users crash, pids are replaced
                      TcIds1 = lists:sort([user_id(UPid) || UPid <- Users]),
                      [user_crash(UPid) || UPid <- Users],
                      Seq = lists:seq(1, 5),
                      Users2 = [start_user() || _X <- Seq],
                      TcIds2 = lists:sort([user_id(UPid) || UPid <- Users2]),
                      % The replacement users are unknown to the fixture cleanup
                      % (it only sees the original, now crashed, users). Ask
                      % them to stop here, before asserting, so they are not
                      % leaked even when the assertion below fails.
                      [user_stop(UPid) || UPid <- Users2],
                      Both =
                          sets:to_list(
                              sets:intersection([
                                  sets:from_list(TcIds1),
                                  sets:from_list(TcIds2)
                              ])
                          ),
                      ?assertEqual([], Both)
                  end
              end
          ]}.
  %% Pool configured without auto_grow_threshold (max_count 5, init_count 2):
  %% after taking one member the free count must be 1 and must still be 1
  %% after a 100 ms pause.
  pooler_auto_grow_disabled_by_default_test_() ->
      MetricsSetup = fun() ->
          logger:set_handler_config(default, filters, []),
          application:set_env(pooler, metrics_module, fake_metrics),
          fake_metrics:start_link()
      end,
      MetricsCleanup = fun(_X) -> fake_metrics:stop() end,
      % per-test fixture: start pooler with no configured pools, then add one
      PoolSetup = fun() ->
          PoolConfig = [
              {name, test_pool_1},
              {max_count, 5},
              {init_count, 2},
              {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
          ],
          application:unset_env(pooler, pools),
          application:start(pooler),
          pooler:new_pool(PoolConfig)
      end,
      PoolCleanup = fun(_X) -> application:stop(pooler) end,
      FreeCount = fun() -> (dump_pool(test_pool_1))#pool.free_count end,
      Tests = [
          {"take one, and it should not auto-grow", fun() ->
              ?assertEqual(2, FreeCount()),
              Member = pooler:take_member(test_pool_1),
              ?assertMatch({"type-0", _Id}, pooled_gs:get_id(Member)),
              timer:sleep(100),
              ?assertEqual(1, FreeCount()),
              pooler:return_member(test_pool_1, Member)
          end}
      ],
      {setup, MetricsSetup, MetricsCleanup, {foreach, PoolSetup, PoolCleanup, Tests}}.
  %% Pool configured with {auto_grow_threshold, 1} (max_count 5, init_count 2):
  %% 100 ms after taking one member the free count is expected to be 3.
  pooler_auto_grow_enabled_test_() ->
      MetricsSetup = fun() ->
          logger:set_handler_config(default, filters, []),
          application:set_env(pooler, metrics_module, fake_metrics),
          fake_metrics:start_link()
      end,
      MetricsCleanup = fun(_X) -> fake_metrics:stop() end,
      % per-test fixture: start pooler with no configured pools, then add one
      PoolSetup = fun() ->
          PoolConfig = [
              {name, test_pool_1},
              {max_count, 5},
              {init_count, 2},
              {auto_grow_threshold, 1},
              {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
          ],
          application:unset_env(pooler, pools),
          application:start(pooler),
          pooler:new_pool(PoolConfig)
      end,
      PoolCleanup = fun(_X) -> application:stop(pooler) end,
      FreeCount = fun() -> (dump_pool(test_pool_1))#pool.free_count end,
      Tests = [
          {"take one, and it should grow by 2", fun() ->
              ?assertEqual(2, FreeCount()),
              Member = pooler:take_member(test_pool_1),
              ?assertMatch({"type-0", _Id}, pooled_gs:get_id(Member)),
              timer:sleep(100),
              ?assertEqual(3, FreeCount()),
              pooler:return_member(test_pool_1, Member)
          end}
      ],
      {setup, MetricsSetup, MetricsCleanup, {foreach, PoolSetup, PoolCleanup, Tests}}.
  %% Checks the exit reason seen for a culled pool member, with and without a
  %% stop_mfa in the pool configuration. The fixture only stores the pool
  %% config (cull_interval 200 ms, max_age 0 min) in the application env; each
  %% test starts and stops pooler itself.
  pooler_custom_stop_mfa_test_() ->
      Setup = fun() ->
          logger:set_handler_config(default, filters, []),
          PoolConfig = [
              {name, test_pool_1},
              {max_count, 3},
              {init_count, 2},
              {cull_interval, {200, ms}},
              {max_age, {0, min}},
              {start_mfa, {pooled_gs, start_link, [{foo_type}]}}
          ],
          application:set_env(pooler, pools, [PoolConfig])
      end,
      Cleanup = fun(_) -> application:unset_env(pooler, pools) end,
      DefaultStop =
          {"default behavior kills the pool member", fun() ->
              ok = application:start(pooler),
              ExitReason = monitor_members_trigger_culling_and_return_reason(),
              ok = application:stop(pooler),
              ?assertEqual(killed, ExitReason)
          end},
      CustomStop =
          {"custom callback terminates the pool member normally", fun() ->
              % prepend a stop_mfa to the pool config stored by the fixture
              {ok, [Configured]} = application:get_env(pooler, pools),
              StopOpt = {stop_mfa, {pooled_gs, stop, ['$pooler_pid']}},
              application:set_env(pooler, pools, [[StopOpt | Configured]]),
              ok = application:start(pooler),
              ExitReason = monitor_members_trigger_culling_and_return_reason(),
              ok = application:stop(pooler),
              ?assertEqual(normal, ExitReason)
          end},
      {foreach, Setup, Cleanup, [DefaultStop, CustomStop]}.
  %% Counts the log messages captured by error_logger_mon while pool members
  %% are culled. With a stop_mfa that calls erlang:exit(Pid, kill) exactly one
  %% message is expected; with the default stop behaviour none is expected.
  no_error_logger_reports_after_culling_test_() ->
      Setup = fun() ->
          logger:set_handler_config(default, filters, []),
          {ok, _Pid} = error_logger_mon:start_link(),
          PoolConfig = [
              {name, test_pool_1},
              {max_count, 3},
              {init_count, 2},
              {cull_interval, {200, ms}},
              {max_age, {0, min}},
              {start_mfa, {pooled_gs, start_link, [{type}]}}
          ],
          application:set_env(pooler, pools, [PoolConfig])
      end,
      Cleanup = fun(_) ->
          ok = error_logger_mon:stop(),
          error_logger_mon:uninstall_handler(),
          application:unset_env(pooler, pools)
      end,
      ExitStop =
          {"Force supervisor to report by using exit/2 instead of terminate_child", fun() ->
              {ok, [Configured]} = application:get_env(pooler, pools),
              StopOpt = {stop_mfa, {erlang, exit, ['$pooler_pid', kill]}},
              application:set_env(pooler, pools, [[StopOpt | Configured]]),
              ok = application:start(pooler),
              error_logger_mon:install_handler(),
              error_logger_mon:reset(),
              ExitReason = monitor_members_trigger_culling_and_return_reason(),
              %% give the message time to arrive before the handler is removed
              timer:sleep(250),
              error_logger_mon:uninstall_handler(),
              ok = application:stop(pooler),
              ?assertEqual(killed, ExitReason),
              ?assertEqual(
                  1,
                  error_logger_mon:get_msg_count(),
                  [
                      {msgs, error_logger_mon:get_msgs()},
                      {m, [R || #{msg := {report, R}} <- error_logger_mon:get_msgs()]}
                  ]
              )
          end},
      DefaultStop =
          {"Default MFA shouldn't generate any reports during culling", fun() ->
              ok = application:start(pooler),
              error_logger_mon:install_handler(),
              ExitReason = monitor_members_trigger_culling_and_return_reason(),
              error_logger_mon:uninstall_handler(),
              ok = application:stop(pooler),
              ?assertEqual(killed, ExitReason),
              ?assertEqual(0, error_logger_mon:get_msg_count())
          end},
      {foreach, Setup, Cleanup, [ExitStop, DefaultStop]}.
  %% Takes three members from test_pool_1, monitors all of them, returns them
  %% to the pool and reports the exit reason carried by the first 'DOWN'
  %% message that arrives. Returns the atom `timeout' when no 'DOWN' message
  %% shows up within 250 ms.
  monitor_members_trigger_culling_and_return_reason() ->
      Members = get_n_pids(test_pool_1, 3, []),
      % set up every monitor before handing any member back
      lists:foreach(fun(Member) -> erlang:monitor(process, Member) end, Members),
      lists:foreach(fun(Member) -> pooler:return_member(test_pool_1, Member) end, Members),
      receive
          {'DOWN', _MonitorRef, process, _Member, ExitReason} -> ExitReason
      after 250 ->
          timeout
      end.
  %% Table-driven check of pooler:time_as_millis/1: each entry pairs a
  %% {Amount, Unit} input with the expected number of milliseconds.
  time_as_millis_test_() ->
      Cases = [
          % zero of any unit is zero
          {{0, min}, 0},
          {{0, sec}, 0},
          {{0, ms}, 0},
          {{0, mu}, 0},
          % one of each unit
          {{1, min}, 60000},
          {{1, sec}, 1000},
          {{1, ms}, 1},
          {{1, mu}, 0},
          % a whole number of milliseconds given as microseconds
          {{3000, mu}, 3}
      ],
      [
          ?_assertEqual(Expected, pooler:time_as_millis(Input))
       || {Input, Expected} <- Cases
      ].
  %% Table-driven check of pooler:time_as_micros/1: each entry pairs a
  %% {Amount, Unit} input with the expected number of microseconds.
  time_as_micros_test_() ->
      Cases = [
          % zero of any unit is zero
          {{0, min}, 0},
          {{0, sec}, 0},
          {{0, ms}, 0},
          {{0, mu}, 0},
          % one of each unit
          {{1, min}, 60000000},
          {{1, sec}, 1000000},
          {{1, ms}, 1000},
          {{1, mu}, 1},
          % microseconds pass through unchanged
          {{3000, mu}, 3000}
      ],
      [
          ?_assertEqual(Expected, pooler:time_as_micros(Input))
       || {Input, Expected} <- Cases
      ].
  %% Tests for pooler:call_free_members/2 on a pool of ten members that are
  %% all started up front (init_count = max_count). The results are tallied
  %% with count_pongs/1 and count_errors/1.
  call_free_members_test_() ->
      PoolName = test_pool_1,
      NumWorkers = 10,
      MetricsSetup = fun() ->
          application:set_env(pooler, metrics_module, fake_metrics),
          fake_metrics:start_link()
      end,
      MetricsCleanup = fun(_X) -> fake_metrics:stop() end,
      PoolSetup = fun() ->
          PoolConfig = [
              {name, PoolName},
              {max_count, NumWorkers},
              {init_count, NumWorkers},
              {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
          ],
          application:unset_env(pooler, pools),
          application:start(pooler),
          pooler:new_pool(PoolConfig)
      end,
      PoolCleanup = fun(_X) -> application:stop(pooler) end,
      FreeCount = fun() -> (dump_pool(PoolName))#pool.free_count end,
      Tests = [
          {"perform a ping across the pool when all workers are free", fun() ->
              ?assertEqual(NumWorkers, FreeCount()),
              Replies = pooler:call_free_members(PoolName, fun pooled_gs:ping/1),
              ?assertEqual(NumWorkers, count_pongs(Replies))
          end},
          {"perform a ping across the pool when two workers are taken", fun() ->
              ?assertEqual(NumWorkers, FreeCount()),
              Taken = [pooler:take_member(PoolName) || _X <- lists:seq(1, 2)],
              Replies = pooler:call_free_members(PoolName, fun pooled_gs:ping/1),
              ?assertEqual(NumWorkers - 2, count_pongs(Replies)),
              [pooler:return_member(PoolName, Member) || Member <- Taken]
          end},
          {"perform an op where the op crashes all free members", fun() ->
              ?assertEqual(NumWorkers, FreeCount()),
              Replies = pooler:call_free_members(PoolName, fun pooled_gs:error_on_call/1),
              ?assertEqual(NumWorkers, count_errors(Replies))
          end}
      ],
      {setup, MetricsSetup, MetricsCleanup, {foreach, PoolSetup, PoolCleanup, Tests}}.
  %% Number of `{ok, pong}' entries in Result. `{error, _}' entries count as
  %% zero; an entry of any other shape raises function_clause.
  count_pongs(Result) ->
      Score = fun
          ({ok, pong}) -> 1;
          ({error, _}) -> 0
      end,
      lists:sum(lists:map(Score, Result)).
  %% Number of `{error, _}' entries in Result. `{ok, _}' entries count as
  %% zero; an entry of any other shape raises function_clause.
  count_errors(Result) ->
      Score = fun
          ({error, _}) -> 1;
          ({ok, _}) -> 0
      end,
      lists:sum(lists:map(Score, Result)).
  % Testing crash recovery involves races: pids may not have crashed yet, or
  % pooler may not have recovered yet. This helper therefore loops forever
  % until N pids have been obtained, ignoring error_no_members. This arity
  % always takes from test_pool_1.
  get_n_pids(Count, Taken) ->
      get_n_pids(test_pool_1, Count, Taken).
  % Takes N members from Pool and prepends each one to Acc, so the most
  % recently taken pid comes first. An error_no_members reply is retried
  % immediately, without a limit.
  get_n_pids(_Pool, 0, Taken) ->
      Taken;
  get_n_pids(Pool, Remaining, Taken) ->
      {StillNeeded, TakenSoFar} =
          case pooler:take_member(Pool) of
              error_no_members -> {Remaining, Taken};
              Member -> {Remaining - 1, [Member | Taken]}
          end,
      get_n_pids(Pool, StillNeeded, TakenSoFar).
  % Group counterpart of get_n_pids/3: takes N members through
  % pooler:take_group_member/1, retrying on error_no_members without a limit,
  % and prepends each pid to Acc.
  get_n_pids_group(_Group, 0, Taken) ->
      Taken;
  get_n_pids_group(Group, Remaining, Taken) ->
      {StillNeeded, TakenSoFar} =
          case pooler:take_group_member(Group) of
              error_no_members -> {Remaining, Taken};
              Member -> {Remaining - 1, [Member | Taken]}
          end,
      get_n_pids_group(Group, StillNeeded, TakenSoFar).
  % Number of entries that supervisor:which_children/1 reports for SupId.
  children_count(SupId) ->
      Children = supervisor:which_children(SupId),
      length(Children).
  % Length of the starting_members field in the #pool{} record that
  % dump_pool/1 returns for PoolName.
  starting_members(PoolName) ->
      PoolState = dump_pool(PoolName),
      length(PoolState#pool.starting_members).
  % Sends the `dump_pool' request to the gen_server registered as PoolName and
  % returns its reply; callers in this file read it as a #pool{} record.
  dump_pool(PoolName) ->
      Request = dump_pool,
      gen_server:call(PoolName, Request).
  % Chooses the process-group backend used by the pg_* helpers below:
  % USE_PG_NOT_PG2 is defined only when the OTP_RELEASE macro exists and is
  % at least 23; in every other case it is explicitly left undefined.
  -ifndef(OTP_RELEASE).
  % < OTP-21
  -undef(USE_PG_NOT_PG2).
  -else.
  % >= OTP-21
  -if(?OTP_RELEASE >= 23).
  -define(USE_PG_NOT_PG2, true).
  -else.
  -undef(USE_PG_NOT_PG2).
  -endif.
  -endif.
  % Thin wrappers so the tests can use the same calls with either the `pg'
  % or the `pg2' module, depending on USE_PG_NOT_PG2.
  -ifdef(USE_PG_NOT_PG2).
  % Starts the `pg' scope.
  pg_start() ->
      pg:start(pg).
  % Makes every current member of every known group leave its group.
  pg_stop() ->
      LeaveAll = fun(Group) -> pg:leave(Group, pg:get_members(Group)) end,
      lists:foreach(LeaveAll, pg:which_groups()).
  % Removes Member from GroupName.
  pg_leave(GroupName, Member) ->
      pg:leave(GroupName, Member).
  -else.
  % Starts the pg2 server.
  pg_start() ->
      pg2:start().
  % Asks the application controller to stop `pg2'.
  pg_stop() ->
      application:stop(pg2).
  % Removes Member from GroupName.
  pg_leave(GroupName, Member) ->
      pg2:leave(GroupName, Member).
  -endif.