pooler_tests.erl 63 KB


  1. -module(pooler_tests).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -export([sleep_for_configured_timeout/0]).
  4. % The `user' processes represent users of the pooler library. A user
  5. % process will take a pid, report details on the pid it has, release
  6. % and take a new pid, stop cleanly, and crash.
  7. start_user() ->
  8. spawn(fun() -> user_loop(start) end).
  9. user_id(Pid) ->
  10. Pid ! {get_tc_id, self()},
  11. receive
  12. {Type, Id} ->
  13. {Type, Id}
  14. end.
  15. user_new_tc(Pid) ->
  16. Pid ! new_tc.
  17. user_stop(Pid) ->
  18. Pid ! stop.
  19. user_crash(Pid) ->
  20. Pid ! crash.
  21. user_loop(Atom) when Atom =:= error_no_members orelse Atom =:= start ->
  22. user_loop(pooler:take_member(test_pool_1));
  23. user_loop(MyTC) ->
  24. receive
  25. {get_tc_id, From} ->
  26. From ! pooled_gs:get_id(MyTC),
  27. user_loop(MyTC);
  28. {ping_tc, From} ->
  29. From ! pooled_gs:ping(MyTC),
  30. user_loop(MyTC);
  31. {ping_count, From} ->
  32. From ! pooled_gs:ping_count(MyTC),
  33. user_loop(MyTC);
  34. new_tc ->
  35. pooler:return_member(test_pool_1, MyTC, ok),
  36. MyNewTC = pooler:take_member(test_pool_1),
  37. user_loop(MyNewTC);
  38. stop ->
  39. pooler:return_member(test_pool_1, MyTC, ok),
  40. stopped;
  41. crash ->
  42. erlang:error({user_loop, kaboom})
  43. end.
  44. % The `tc' processes represent the pids tracked by pooler for testing.
  45. % They have a type and an ID and can report their type and ID and
  46. % stop.
  47. %% tc_loop({Type, Id}) ->
  48. %% receive
  49. %% {get_id, From} ->
  50. %% From ! {ok, Type, Id},
  51. %% tc_loop({Type, Id});
  52. %% stop -> stopped;
  53. %% crash ->
  54. %% erlang:error({tc_loop, kaboom})
  55. %% end.
  56. %% get_tc_id(Pid) ->
  57. %% Pid ! {get_id, self()},
  58. %% receive
  59. %% {ok, Type, Id} ->
  60. %% {Type, Id}
  61. %% after 200 ->
  62. %% timeout
  63. %% end.
  64. %% stop_tc(Pid) ->
  65. %% Pid ! stop.
  66. %% tc_starter(Type) ->
  67. %% Ref = make_ref(),
  68. %% spawn_link(fun() -> tc_loop({Type, Ref}) end).
  69. %% assert_tc_valid(Pid) ->
  70. %% ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
  71. %% ok.
  72. % tc_sanity_test() ->
  73. % Pid1 = tc_starter("1"),
  74. % {"1", Id1} = get_tc_id(Pid1),
  75. % Pid2 = tc_starter("1"),
  76. % {"1", Id2} = get_tc_id(Pid2),
  77. % ?assertNot(Id1 == Id2),
  78. % stop_tc(Pid1),
  79. % stop_tc(Pid2).
  80. % user_sanity_test() ->
  81. % Pid1 = tc_starter("1"),
  82. % User = spawn(fun() -> user_loop(Pid1) end),
  83. % ?assertMatch({"1", _Ref}, user_id(User)),
  84. % user_crash(User),
  85. % stop_tc(Pid1).
  86. pooler_basics_via_config_test_() ->
  87. {setup,
  88. fun() ->
  89. {ok, _} = error_logger_mon:start_link(),
  90. error_logger_mon:install_handler(pooler),
  91. logger:set_handler_config(default, filters, []),
  92. application:set_env(pooler, metrics_module, fake_metrics),
  93. fake_metrics:start_link()
  94. end,
  95. fun(_X) ->
  96. error_logger_mon:uninstall_handler(),
  97. ok = error_logger_mon:stop(),
  98. fake_metrics:stop()
  99. end,
  100. {foreach,
  101. % setup
  102. fun() ->
  103. error_logger_mon:reset(),
  104. Pools = [
  105. #{
  106. name => test_pool_1,
  107. max_count => 3,
  108. init_count => 2,
  109. cull_interval => {0, min},
  110. start_mfa => {pooled_gs, start_link, [{"type-0"}]}
  111. }
  112. ],
  113. application:set_env(pooler, pools, Pools),
  114. application:start(pooler)
  115. end,
  116. fun(_X) ->
  117. application:stop(pooler)
  118. end,
  119. basic_tests()}}.
  120. pooler_basics_dynamic_test_() ->
  121. {setup,
  122. fun() ->
  123. {ok, _} = error_logger_mon:start_link(),
  124. error_logger_mon:install_handler(pooler),
  125. logger:set_handler_config(default, filters, []),
  126. application:set_env(pooler, metrics_module, fake_metrics),
  127. fake_metrics:start_link()
  128. end,
  129. fun(_X) ->
  130. error_logger_mon:uninstall_handler(),
  131. ok = error_logger_mon:stop(),
  132. fake_metrics:stop()
  133. end,
  134. {foreach,
  135. % setup
  136. fun() ->
  137. error_logger_mon:reset(),
  138. Pool = #{
  139. name => test_pool_1,
  140. max_count => 3,
  141. init_count => 2,
  142. start_mfa => {pooled_gs, start_link, [{"type-0"}]}
  143. },
  144. application:unset_env(pooler, pools),
  145. application:start(pooler),
  146. pooler:new_pool(Pool)
  147. end,
  148. fun(_X) ->
  149. application:stop(pooler)
  150. end,
  151. basic_tests()}}.
  152. pooler_basics_integration_to_other_supervisor_test_() ->
  153. {setup,
  154. fun() ->
  155. {ok, _} = error_logger_mon:start_link(),
  156. error_logger_mon:install_handler(pooler),
  157. logger:set_handler_config(default, filters, []),
  158. application:set_env(pooler, metrics_module, fake_metrics),
  159. fake_metrics:start_link()
  160. end,
  161. fun(_X) ->
  162. error_logger_mon:uninstall_handler(),
  163. ok = error_logger_mon:stop(),
  164. fake_metrics:stop()
  165. end,
  166. {foreach,
  167. % setup
  168. fun() ->
  169. error_logger_mon:reset(),
  170. Pool = #{
  171. name => test_pool_1,
  172. max_count => 3,
  173. init_count => 2,
  174. start_mfa => {pooled_gs, start_link, [{"type-0"}]}
  175. },
  176. application:unset_env(pooler, pools),
  177. application:start(pooler),
  178. supervisor:start_link(fake_external_supervisor, Pool)
  179. end,
  180. fun({ok, SupPid}) ->
  181. exit(SupPid, normal),
  182. application:stop(pooler)
  183. end,
  184. basic_tests()}}.
  185. basic_tests() ->
  186. [
  187. {"there are init_count members at start", fun() ->
  188. Stats = [P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1)],
  189. ?assertEqual(2, length(Stats))
  190. end},
  191. {"take and return one", fun() ->
  192. P = pooler:take_member(test_pool_1),
  193. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  194. ok = pooler:return_member(test_pool_1, P, ok)
  195. end},
  196. {"take and return one, named pool", fun() ->
  197. P = pooler:take_member(test_pool_1),
  198. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  199. ok,
  200. pooler:return_member(test_pool_1, P)
  201. end},
  202. {"attempt to take form unknown pool", fun() ->
  203. %% since pools are now servers, an unknown pool will timeout
  204. ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
  205. end},
  206. {"members creation is triggered after pool exhaustion until max", fun() ->
  207. %% init count is 2
  208. Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
  209. %% since new member creation is async, can only assert
  210. %% that we will get a pid, but may not be first try.
  211. Pids = get_n_pids(1, Pids0),
  212. %% pool is at max now, requests should give error
  213. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  214. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  215. PRefs = [R || {_T, R} <- [pooled_gs:get_id(P) || P <- Pids]],
  216. % no duplicates
  217. ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
  218. end},
  219. {"pids are reused most recent return first", fun() ->
  220. P1 = pooler:take_member(test_pool_1),
  221. P2 = pooler:take_member(test_pool_1),
  222. ?assertNot(P1 == P2),
  223. ok = pooler:return_member(test_pool_1, P1, ok),
  224. ok = pooler:return_member(test_pool_1, P2, ok),
  225. % pids are reused most recent first
  226. ?assertEqual(P2, pooler:take_member(test_pool_1)),
  227. ?assertEqual(P1, pooler:take_member(test_pool_1))
  228. end},
  229. {"if an in-use pid crashes it is replaced", fun() ->
  230. Pids0 = get_n_pids(3, []),
  231. Ids0 = [pooled_gs:get_id(P) || P <- Pids0],
  232. % crash them all
  233. [pooled_gs:crash(P) || P <- Pids0],
  234. Pids1 = get_n_pids(3, []),
  235. Ids1 = [pooled_gs:get_id(P) || P <- Pids1],
  236. [?assertNot(lists:member(I, Ids0)) || I <- Ids1]
  237. end},
  238. {"if a free pid crashes it is replaced", fun() ->
  239. FreePids = [P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1)],
  240. [exit(P, kill) || P <- FreePids],
  241. Pids1 = get_n_pids(3, []),
  242. ?assertEqual(3, length(Pids1))
  243. end},
  244. {"if a pid is returned with bad status it is replaced", fun() ->
  245. Pids0 = get_n_pids(3, []),
  246. Ids0 = [pooled_gs:get_id(P) || P <- Pids0],
  247. % return them all marking as bad
  248. [pooler:return_member(test_pool_1, P, fail) || P <- Pids0],
  249. Pids1 = get_n_pids(3, []),
  250. Ids1 = [pooled_gs:get_id(P) || P <- Pids1],
  251. [?assertNot(lists:member(I, Ids0)) || I <- Ids1]
  252. end},
  253. {"if a consumer crashes, pid is replaced", fun() ->
  254. Consumer = start_user(),
  255. StartId = user_id(Consumer),
  256. user_crash(Consumer),
  257. NewPid = hd(get_n_pids(1, [])),
  258. NewId = pooled_gs:get_id(NewPid),
  259. ?assertNot(NewId == StartId)
  260. end},
  261. {"it is ok to return an unknown pid", fun() ->
  262. Bogus1 = spawn(fun() -> ok end),
  263. Bogus2 = spawn(fun() -> ok end),
  264. ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus1, ok)),
  265. ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus2, fail))
  266. end},
  267. {"it is ok to return a pid more than once", fun() ->
  268. M = pooler:take_member(test_pool_1),
  269. [
  270. pooler:return_member(test_pool_1, M)
  271. || _I <- lists:seq(1, 37)
  272. ],
  273. ?assertEqual(
  274. 36,
  275. length(
  276. lists:filter(
  277. fun
  278. (
  279. #{
  280. msg :=
  281. {report, #{
  282. label := "ignored return of free member",
  283. pid := Pid
  284. }}
  285. }
  286. ) ->
  287. Pid =:= M;
  288. (_) ->
  289. false
  290. end,
  291. error_logger_mon:get_msgs()
  292. )
  293. )
  294. ),
  295. M1 = pooler:take_member(test_pool_1),
  296. M2 = pooler:take_member(test_pool_1),
  297. ?assert(M1 =/= M2),
  298. Pool1 = gen_server:call(test_pool_1, dump_pool),
  299. ?assertEqual(2, maps:get(in_use_count, Pool1)),
  300. ?assertEqual(0, maps:get(free_count, Pool1)),
  301. pooler:return_member(test_pool_1, M1),
  302. pooler:return_member(test_pool_1, M2),
  303. Pool2 = gen_server:call(test_pool_1, dump_pool),
  304. ?assertEqual(0, maps:get(in_use_count, Pool2)),
  305. ?assertEqual(2, maps:get(free_count, Pool2)),
  306. ok
  307. end},
  308. {"calling return_member on error_no_members is ignored", fun() ->
  309. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members)),
  310. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, ok)),
  311. ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, fail))
  312. end},
  313. {"dynamic pool creation", fun() ->
  314. PoolSpec = [
  315. {name, dyn_pool_1},
  316. {max_count, 3},
  317. {init_count, 2},
  318. {start_mfa, {pooled_gs, start_link, [{"dyn-0"}]}}
  319. ],
  320. {ok, SupPid1} = pooler:new_pool(PoolSpec),
  321. ?assert(is_pid(SupPid1)),
  322. M = pooler:take_member(dyn_pool_1),
  323. ?assertMatch({"dyn-0", _Id}, pooled_gs:get_id(M)),
  324. ?assertEqual(ok, pooler:rm_pool(dyn_pool_1)),
  325. ?assertExit({noproc, _}, pooler:take_member(dyn_pool_1)),
  326. %% verify pool of same name can be created after removal
  327. {ok, SupPid2} = pooler:new_pool(PoolSpec),
  328. ?assert(is_pid(SupPid2)),
  329. %% remove non-existing pool
  330. ?assertEqual(ok, pooler:rm_pool(dyn_pool_X)),
  331. ?assertEqual(ok, pooler:rm_pool(dyn_pool_1))
  332. end},
  333. {"metrics have been called (no timeout/queue)", fun() ->
  334. %% exercise the API to ensure we have certain keys reported as metrics
  335. fake_metrics:reset_metrics(),
  336. Pids = [pooler:take_member(test_pool_1) || _I <- lists:seq(1, 10)],
  337. [pooler:return_member(test_pool_1, P) || P <- Pids],
  338. catch pooler:take_member(bad_pool_name),
  339. %% kill and unused member
  340. exit(hd(Pids), kill),
  341. %% kill a used member
  342. KillMe = pooler:take_member(test_pool_1),
  343. exit(KillMe, kill),
  344. %% FIXME: We need to wait for pooler to process the
  345. %% exit message. This is ugly, will fix later.
  346. % :(
  347. timer:sleep(200),
  348. ExpectKeys = lists:sort([
  349. <<"pooler.test_pool_1.error_no_members_count">>,
  350. <<"pooler.test_pool_1.events">>,
  351. <<"pooler.test_pool_1.free_count">>,
  352. <<"pooler.test_pool_1.in_use_count">>,
  353. <<"pooler.test_pool_1.killed_free_count">>,
  354. <<"pooler.test_pool_1.killed_in_use_count">>,
  355. <<"pooler.test_pool_1.take_rate">>
  356. ]),
  357. Metrics = fake_metrics:get_metrics(),
  358. GotKeys = lists:usort([Name || {Name, _, _} <- Metrics]),
  359. ?assertEqual(ExpectKeys, GotKeys)
  360. end},
  361. {"metrics have been called (with timeout/queue)", fun() ->
  362. %% exercise the API to ensure we have certain keys reported as metrics
  363. fake_metrics:reset_metrics(),
  364. %% pass a non-zero timeout here to exercise queueing
  365. Pids = [pooler:take_member(test_pool_1, 1) || _I <- lists:seq(1, 10)],
  366. [pooler:return_member(test_pool_1, P) || P <- Pids],
  367. catch pooler:take_member(bad_pool_name),
  368. %% kill and unused member
  369. exit(hd(Pids), kill),
  370. %% kill a used member
  371. KillMe = pooler:take_member(test_pool_1),
  372. exit(KillMe, kill),
  373. %% FIXME: We need to wait for pooler to process the
  374. %% exit message. This is ugly, will fix later.
  375. % :(
  376. timer:sleep(200),
  377. ExpectKeys = lists:sort([
  378. <<"pooler.test_pool_1.error_no_members_count">>,
  379. <<"pooler.test_pool_1.events">>,
  380. <<"pooler.test_pool_1.free_count">>,
  381. <<"pooler.test_pool_1.in_use_count">>,
  382. <<"pooler.test_pool_1.killed_free_count">>,
  383. <<"pooler.test_pool_1.killed_in_use_count">>,
  384. <<"pooler.test_pool_1.take_rate">>,
  385. <<"pooler.test_pool_1.queue_count">>
  386. ]),
  387. Metrics = fake_metrics:get_metrics(),
  388. GotKeys = lists:usort([Name || {Name, _, _} <- Metrics]),
  389. ?assertEqual(ExpectKeys, GotKeys)
  390. end},
  391. {"accept bad member is handled", fun() ->
  392. Bad = spawn(fun() -> ok end),
  393. FakeStarter = spawn(fun() -> starter end),
  394. ?assertEqual(ok, pooler:accept_member(test_pool_1, {FakeStarter, Bad}))
  395. end},
  396. {"utilization returns sane results", fun() ->
  397. #{max_count := MaxCount, queue_max := QueueMax} = dump_pool(test_pool_1),
  398. ?assertEqual(MaxCount, proplists:get_value(max_count, pooler:pool_utilization(test_pool_1))),
  399. ?assertEqual(0, proplists:get_value(in_use_count, pooler:pool_utilization(test_pool_1))),
  400. ?assertEqual(2, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1))),
  401. ?assertEqual(0, proplists:get_value(queued_count, pooler:pool_utilization(test_pool_1))),
  402. ?assertEqual(QueueMax, proplists:get_value(queue_max, pooler:pool_utilization(test_pool_1)))
  403. end}
  404. ].
  405. pooler_groups_test_() ->
  406. {setup,
  407. fun() ->
  408. logger:set_handler_config(default, filters, []),
  409. application:set_env(pooler, metrics_module, fake_metrics),
  410. fake_metrics:start_link()
  411. end,
  412. fun(_X) ->
  413. fake_metrics:stop()
  414. end,
  415. {foreach,
  416. % setup
  417. fun() ->
  418. Pools = [
  419. [
  420. {name, test_pool_1},
  421. {group, group_1},
  422. {max_count, 3},
  423. {init_count, 2},
  424. {start_mfa, {pooled_gs, start_link, [{"type-1-1"}]}}
  425. ],
  426. [
  427. {name, test_pool_2},
  428. {group, group_1},
  429. {max_count, 3},
  430. {init_count, 2},
  431. {start_mfa, {pooled_gs, start_link, [{"type-1-2"}]}}
  432. ],
  433. %% test_pool_3 not part of the group
  434. [
  435. {name, test_pool_3},
  436. {group, undefined},
  437. {max_count, 3},
  438. {init_count, 2},
  439. {start_mfa, {pooled_gs, start_link, [{"type-3"}]}}
  440. ]
  441. ],
  442. application:set_env(pooler, pools, Pools),
  443. pg_start(),
  444. application:start(pooler)
  445. end,
  446. fun(_X) ->
  447. application:stop(pooler),
  448. pg_stop()
  449. end,
  450. [
  451. {"take and return one group member (repeated)", fun() ->
  452. Types = [
  453. begin
  454. Pid = pooler:take_group_member(group_1),
  455. ?assert(is_pid(Pid), [{result, Pid}, {i, I}]),
  456. {Type, _} = pooled_gs:get_id(Pid),
  457. ?assertMatch("type-1" ++ _, Type),
  458. ok = pooler:return_group_member(group_1, Pid, ok),
  459. timer:sleep(10),
  460. Type
  461. end
  462. || I <- lists:seq(1, 50)
  463. ],
  464. Type_1_1 = [X || "type-1-1" = X <- Types],
  465. Type_1_2 = [X || "type-1-2" = X <- Types],
  466. ?assert(length(Type_1_1) > 0, [{types, Types}]),
  467. ?assert(length(Type_1_2) > 0, [{types, Types}])
  468. end},
  469. {"take member from unknown group", fun() ->
  470. ?assertEqual(
  471. error_no_members,
  472. pooler:take_group_member(not_a_group)
  473. )
  474. end},
  475. {"return member to unknown group", fun() ->
  476. Pid = pooler:take_group_member(group_1),
  477. ?assertEqual(ok, pooler:return_group_member(no_such_group, Pid))
  478. end},
  479. {"return member to wrong group", fun() ->
  480. Pid = pooler:take_member(test_pool_3),
  481. ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
  482. end},
  483. {"return member with something which is not a pid", fun() ->
  484. ?assertException(error, _, pooler:return_group_member(group_1, not_pid))
  485. end},
  486. {"take member from empty group", fun() ->
  487. %% artificially empty group member list
  488. [pg_leave(group_1, M) || M <- pooler:group_pools(group_1)],
  489. ?assertEqual(error_no_members, pooler:take_group_member(group_1))
  490. end},
  491. {"return member to group, implied ok", fun() ->
  492. Pid = pooler:take_group_member(group_1),
  493. ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
  494. end},
  495. {"return error_no_member to group", fun() ->
  496. ?assertEqual(ok, pooler:return_group_member(group_1, error_no_members))
  497. end},
  498. {"exhaust pools in group", fun() ->
  499. Pids = get_n_pids_group(group_1, 6, []),
  500. %% they should all be pids
  501. [
  502. begin
  503. {Type, _} = pooled_gs:get_id(P),
  504. ?assertMatch("type-1" ++ _, Type),
  505. ok
  506. end
  507. || P <- Pids
  508. ],
  509. %% further attempts should be error
  510. [
  511. error_no_members,
  512. error_no_members,
  513. error_no_members
  514. ] = [
  515. pooler:take_group_member(group_1)
  516. || _I <- lists:seq(1, 3)
  517. ]
  518. end},
  519. {"rm_group with nonexisting group", fun() ->
  520. ?assertEqual(ok, pooler:rm_group(i_dont_exist))
  521. end},
  522. {"rm_group with existing empty group", fun() ->
  523. ?assertEqual(ok, pooler:rm_pool(test_pool_1)),
  524. ?assertEqual(ok, pooler:rm_pool(test_pool_2)),
  525. % process group de-registration is asynchronous
  526. timer:sleep(100),
  527. ?assertEqual(error_no_members, pooler:take_group_member(group_1)),
  528. ?assertEqual(ok, pooler:rm_group(group_1)),
  529. ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
  530. ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
  531. ?assertEqual(
  532. error_no_members,
  533. pooler:take_group_member(group_1)
  534. )
  535. end},
  536. {"rm_group with existing non-empty group", fun() ->
  537. %% Verify that group members exist
  538. MemberPid = pooler:take_group_member(group_1),
  539. ?assert(is_pid(MemberPid)),
  540. pooler:return_group_member(group_1, MemberPid),
  541. Pool1Pid = pooler:take_member(test_pool_1),
  542. ?assert(is_pid(Pool1Pid)),
  543. pooler:return_member(test_pool_1, Pool1Pid),
  544. Pool2Pid = pooler:take_member(test_pool_2),
  545. ?assert(is_pid(Pool2Pid)),
  546. pooler:return_member(test_pool_2, Pool2Pid),
  547. %% Delete and verify that group and pools are destroyed
  548. ?assertEqual(ok, pooler:rm_group(group_1)),
  549. ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
  550. ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
  551. ?assertEqual(
  552. error_no_members,
  553. pooler:take_group_member(group_1)
  554. )
  555. end}
  556. ]}}.
  557. pooler_limit_failed_adds_test_() ->
  558. %% verify that pooler crashes completely if too many failures are
  559. %% encountered while trying to add pids.
  560. {setup,
  561. fun() ->
  562. logger:set_handler_config(default, filters, []),
  563. Pools = [
  564. [
  565. {name, test_pool_1},
  566. {max_count, 10},
  567. {init_count, 10},
  568. {start_mfa, {pooled_gs, start_link, [crash]}}
  569. ]
  570. ],
  571. application:set_env(pooler, pools, Pools)
  572. end,
  573. fun(_) ->
  574. application:stop(pooler)
  575. end,
  576. fun() ->
  577. application:start(pooler),
  578. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  579. ?assertEqual(error_no_members, pooler:take_member(test_pool_1))
  580. end}.
  581. pooler_scheduled_cull_test_() ->
  582. {setup,
  583. fun() ->
  584. logger:set_handler_config(default, filters, []),
  585. application:set_env(pooler, metrics_module, fake_metrics),
  586. fake_metrics:start_link(),
  587. Pools = [
  588. [
  589. {name, test_pool_1},
  590. {max_count, 10},
  591. {init_count, 2},
  592. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
  593. {cull_interval, {200, ms}},
  594. {max_age, {0, min}}
  595. ]
  596. ],
  597. application:set_env(pooler, pools, Pools),
  598. application:start(pooler)
  599. end,
  600. fun(_X) ->
  601. fake_metrics:stop(),
  602. application:stop(pooler)
  603. end,
  604. [
  605. {foreach,
  606. fun() ->
  607. Pids = get_n_pids(test_pool_1, 10, []),
  608. ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
  609. ?assertEqual(10, length(Pids)),
  610. Pids
  611. end,
  612. fun(Pids) ->
  613. [pooler:return_member(test_pool_1, P) || P <- Pids]
  614. end,
  615. [
  616. fun(Pids) ->
  617. {"excess members are culled run 1", fun() ->
  618. [pooler:return_member(test_pool_1, P) || P <- Pids],
  619. %% wait for longer than cull delay
  620. timer:sleep(250),
  621. ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
  622. ?assertEqual(2, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1)))
  623. end}
  624. end,
  625. fun(Pids) ->
  626. {"excess members are culled run 2", fun() ->
  627. [pooler:return_member(test_pool_1, P) || P <- Pids],
  628. %% wait for longer than cull delay
  629. timer:sleep(250),
  630. ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
  631. ?assertEqual(2, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1)))
  632. end}
  633. end,
  634. fun(Pids) -> in_use_members_not_culled(Pids, 1) end,
  635. fun(Pids) -> in_use_members_not_culled(Pids, 2) end,
  636. fun(Pids) -> in_use_members_not_culled(Pids, 3) end,
  637. fun(Pids) -> in_use_members_not_culled(Pids, 4) end,
  638. fun(Pids) -> in_use_members_not_culled(Pids, 5) end,
  639. fun(Pids) -> in_use_members_not_culled(Pids, 6) end
  640. ]},
  641. {"no cull when init_count matches max_count",
  642. %% not sure how to verify this. But this test at least
  643. %% exercises the code path.
  644. fun() ->
  645. Config = [
  646. {name, test_static_pool_1},
  647. {max_count, 2},
  648. {init_count, 2},
  649. {start_mfa, {pooled_gs, start_link, [{"static-0"}]}},
  650. % ignored
  651. {cull_interval, {200, ms}}
  652. ],
  653. pooler:new_pool(Config),
  654. P = pooler:take_member(test_static_pool_1),
  655. ?assertMatch({"static-0", _}, pooled_gs:get_id(P)),
  656. pooler:return_member(test_static_pool_1, P),
  657. ok
  658. end}
  659. ]}.
  660. in_use_members_not_culled(Pids, N) ->
  661. {"in-use members are not culled " ++ erlang:integer_to_list(N), fun() ->
  662. %% wait for longer than cull delay
  663. timer:sleep(250),
  664. PidCount = length(Pids),
  665. ?assertEqual(
  666. PidCount,
  667. length(pooler:pool_stats(test_pool_1))
  668. ),
  669. ?assertEqual(0, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1))),
  670. ?assertEqual(PidCount, proplists:get_value(in_use_count, pooler:pool_utilization(test_pool_1))),
  671. Returns = lists:sublist(Pids, N),
  672. [
  673. pooler:return_member(test_pool_1, P)
  674. || P <- Returns
  675. ],
  676. timer:sleep(250),
  677. ?assertEqual(
  678. PidCount - N,
  679. length(pooler:pool_stats(test_pool_1))
  680. )
  681. end}.
  682. random_message_test_() ->
  683. {setup,
  684. fun() ->
  685. logger:set_handler_config(default, filters, []),
  686. Pools = [
  687. [
  688. {name, test_pool_1},
  689. {max_count, 2},
  690. {init_count, 1},
  691. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
  692. ]
  693. ],
  694. application:set_env(pooler, pools, Pools),
  695. application:start(pooler),
  696. %% now send some bogus messages
  697. %% do the call in a throw-away process to avoid timeout error
  698. spawn(fun() -> catch gen_server:call(test_pool_1, {unexpected_garbage_msg, 5}) end),
  699. gen_server:cast(test_pool_1, {unexpected_garbage_msg, 6}),
  700. whereis(test_pool_1) ! {unexpected_garbage_msg, 7},
  701. ok
  702. end,
  703. fun(_) ->
  704. application:stop(pooler)
  705. end,
  706. [
  707. fun() ->
  708. Pid = spawn(fun() -> ok end),
  709. MonMsg = {'DOWN', erlang:make_ref(), process, Pid, because},
  710. test_pool_1 ! MonMsg
  711. end,
  712. fun() ->
  713. Pid = pooler:take_member(test_pool_1),
  714. {Type, _} = pooled_gs:get_id(Pid),
  715. ?assertEqual("type-0", Type)
  716. end,
  717. fun() ->
  718. RawPool = gen_server:call(test_pool_1, dump_pool),
  719. ?assertEqual(pool, maps:get('$record_name', RawPool))
  720. end
  721. ]}.
  722. pooler_integration_long_init_test_() ->
  723. {foreach,
  724. % setup
  725. fun() ->
  726. logger:set_handler_config(default, filters, []),
  727. {ok, _} = error_logger_mon:start_link(),
  728. error_logger_mon:install_handler(pooler),
  729. Pool = [
  730. {name, test_pool_1},
  731. {max_count, 10},
  732. {init_count, 0},
  733. {member_start_timeout, {10, ms}},
  734. {start_mfa, {pooled_gs, start_link, [{"type-0", fun() -> timer:sleep(15) end}]}}
  735. ],
  736. application:set_env(pooler, pools, [Pool]),
  737. application:start(pooler)
  738. end,
  739. % cleanup
  740. fun(_) ->
  741. error_logger_mon:uninstall_handler(),
  742. ok = error_logger_mon:stop(),
  743. application:stop(pooler)
  744. end,
  745. %
  746. [
  747. fun(_) ->
  748. % Test what happens when pool members take too long to start.
  749. % The pooler_starter should kill off stale members, there by
  750. % reducing the number of children of the member_sup. This
  751. % activity occurs both during take member and accept member.
  752. % Accordingly, the count should go to zero once all starters
  753. % check in.
  754. fun() ->
  755. ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
  756. [
  757. begin
  758. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  759. ?assertEqual(1, starting_members(test_pool_1))
  760. end
  761. || _ <- lists:seq(1, 10)
  762. ],
  763. ?assertEqual(10, children_count(pooler_test_pool_1_member_sup)),
  764. timer:sleep(150),
  765. ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
  766. ?assertEqual(0, starting_members(test_pool_1)),
  767. %% there is a log when worker start times out
  768. ?assert(
  769. lists:any(
  770. fun
  771. (
  772. #{
  773. level := error,
  774. msg :=
  775. {report, #{
  776. label := "starting member timeout",
  777. pool := test_pool_1
  778. }}
  779. }
  780. ) ->
  781. true;
  782. (_) ->
  783. false
  784. end,
  785. error_logger_mon:get_msgs()
  786. )
  787. )
  788. end
  789. end
  790. ]}.
  791. sleep_for_configured_timeout() ->
  792. SleepTime =
  793. case application:get_env(pooler, sleep_time) of
  794. {ok, Val} ->
  795. Val;
  796. _ ->
  797. 0
  798. end,
  799. timer:sleep(SleepTime).
  800. pooler_integration_queueing_test_() ->
  801. {foreach,
  802. % setup
  803. fun() ->
  804. logger:set_handler_config(default, filters, []),
  805. Pool = [
  806. {name, test_pool_1},
  807. {max_count, 10},
  808. {queue_max, 10},
  809. {init_count, 0},
  810. {metrics, fake_metrics},
  811. {member_start_timeout, {5, sec}},
  812. {start_mfa,
  813. {pooled_gs, start_link, [
  814. {"type-0", fun pooler_tests:sleep_for_configured_timeout/0}
  815. ]}}
  816. ],
  817. application:set_env(pooler, pools, [Pool]),
  818. fake_metrics:start_link(),
  819. application:start(pooler)
  820. end,
  821. % cleanup
  822. fun(_) ->
  823. fake_metrics:stop(),
  824. application:stop(pooler)
  825. end,
  826. [
  827. fun(_) ->
  828. fun() ->
  829. ?assertEqual(0, maps:get(free_count, dump_pool(test_pool_1))),
  830. Val = pooler:take_member(test_pool_1, 10),
  831. ?assert(is_pid(Val)),
  832. pooler:return_member(test_pool_1, Val)
  833. end
  834. end,
  835. fun(_) ->
  836. fun() ->
  837. application:set_env(pooler, sleep_time, 1),
  838. ?assertEqual(0, maps:get(free_count, dump_pool(test_pool_1))),
  839. Val = pooler:take_member(test_pool_1, 0),
  840. ?assertEqual(error_no_members, Val),
  841. ?assertEqual(0, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1))),
  842. timer:sleep(50),
  843. %Next request should be available
  844. Pid = pooler:take_member(test_pool_1, 0),
  845. ?assert(is_pid(Pid)),
  846. pooler:return_member(test_pool_1, Pid)
  847. end
  848. end,
  849. fun(_) ->
  850. fun() ->
  851. application:set_env(pooler, sleep_time, 10),
  852. ?assertEqual(0, maps:get(free_count, dump_pool(test_pool_1))),
  853. [
  854. ?assertEqual(pooler:take_member(test_pool_1, 0), error_no_members)
  855. || _ <- lists:seq(1, maps:get(max_count, dump_pool(test_pool_1)))
  856. ],
  857. timer:sleep(50),
  858. %Next request should be available
  859. Pid = pooler:take_member(test_pool_1, 0),
  860. ?assert(is_pid(Pid)),
  861. pooler:return_member(test_pool_1, Pid)
  862. end
  863. end,
  864. fun(_) ->
  865. fun() ->
  866. % fill to queue_max, next request should return immediately with no_members
  867. % Will return a if queue max is not enforced.
  868. application:set_env(pooler, sleep_time, 100),
  869. [
  870. proc_lib:spawn(fun() ->
  871. Val = pooler:take_member(test_pool_1, 200),
  872. ?assert(is_pid(Val)),
  873. pooler:return_member(Val)
  874. end)
  875. || _ <- lists:seq(1, maps:get(max_count, dump_pool(test_pool_1)))
  876. ],
  877. ?assertEqual(0, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1))),
  878. ?assert(proplists:get_value(queued_count, pooler:pool_utilization(test_pool_1)) >= 1),
  879. ?assertEqual(10, proplists:get_value(queue_max, pooler:pool_utilization(test_pool_1))),
  880. timer:sleep(50),
  881. ?assertEqual(10, queue:len(maps:get(queued_requestors, dump_pool(test_pool_1)))),
  882. ?assertEqual(10, proplists:get_value(queue_max, pooler:pool_utilization(test_pool_1))),
  883. ?assertEqual(pooler:take_member(test_pool_1, 500), error_no_members),
  884. ExpectKeys = lists:sort([
  885. <<"pooler.test_pool_1.error_no_members_count">>,
  886. <<"pooler.test_pool_1.events">>,
  887. <<"pooler.test_pool_1.take_rate">>,
  888. <<"pooler.test_pool_1.queue_count">>,
  889. <<"pooler.test_pool_1.queue_max_reached">>
  890. ]),
  891. Metrics = fake_metrics:get_metrics(),
  892. GotKeys = lists:usort([Name || {Name, _, _} <- Metrics]),
  893. ?assertEqual(ExpectKeys, GotKeys),
  894. timer:sleep(100),
  895. Val = pooler:take_member(test_pool_1, 500),
  896. ?assert(is_pid(Val)),
  897. pooler:return_member(test_pool_1, Val)
  898. end
  899. end
  900. ]}.
  901. pooler_integration_queueing_return_member_test_() ->
  902. {foreach,
  903. % setup
  904. fun() ->
  905. logger:set_handler_config(default, filters, []),
  906. Pool = [
  907. {name, test_pool_1},
  908. {max_count, 10},
  909. {queue_max, 10},
  910. {init_count, 10},
  911. {metrics, fake_metrics},
  912. {member_start_timeout, {5, sec}},
  913. {start_mfa,
  914. {pooled_gs, start_link, [
  915. {"type-0", fun pooler_tests:sleep_for_configured_timeout/0}
  916. ]}}
  917. ],
  918. application:set_env(pooler, pools, [Pool]),
  919. fake_metrics:start_link(),
  920. application:start(pooler)
  921. end,
  922. % cleanup
  923. fun(_) ->
  924. fake_metrics:stop(),
  925. application:stop(pooler)
  926. end,
  927. [
  928. fun(_) ->
  929. fun() ->
  930. application:set_env(pooler, sleep_time, 0),
  931. Parent = self(),
  932. Pids = [
  933. proc_lib:spawn_link(fun() ->
  934. Val = pooler:take_member(test_pool_1, 200),
  935. ?assert(is_pid(Val)),
  936. Parent ! {taken, self()},
  937. receive
  938. return ->
  939. pooler:return_member(test_pool_1, Val)
  940. after 5000 ->
  941. pooler:return_member(test_pool_1, Val)
  942. end,
  943. Parent ! {returned, self()}
  944. end)
  945. || _ <- lists:seq(1, maps:get(max_count, dump_pool(test_pool_1)))
  946. ],
  947. [
  948. receive
  949. {taken, Pid} -> ok
  950. end
  951. || Pid <- Pids
  952. ],
  953. ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
  954. proc_lib:spawn_link(fun() ->
  955. Val = pooler:take_member(test_pool_1, 200),
  956. Parent ! {extra_result, Val}
  957. end),
  958. [Pid ! return || Pid <- Pids],
  959. [
  960. receive
  961. {returned, Pid} -> ok
  962. end
  963. || Pid <- Pids
  964. ],
  965. receive
  966. {extra_result, Result} ->
  967. ?assert(is_pid(Result)),
  968. pooler:return_member(test_pool_1, Result)
  969. end,
  970. ?assertEqual(
  971. maps:get(max_count, dump_pool(test_pool_1)),
  972. length(maps:get(free_pids, dump_pool(test_pool_1)))
  973. ),
  974. ?assertEqual(
  975. maps:get(max_count, dump_pool(test_pool_1)),
  976. maps:get(free_count, dump_pool(test_pool_1))
  977. )
  978. end
  979. end
  980. ]}.
  981. pooler_integration_test_() ->
  982. {foreach,
  983. % setup
  984. fun() ->
  985. logger:set_handler_config(default, filters, []),
  986. Pools = [
  987. [
  988. {name, test_pool_1},
  989. {max_count, 10},
  990. {init_count, 10},
  991. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
  992. ]
  993. ],
  994. application:set_env(pooler, pools, Pools),
  995. application:start(pooler),
  996. Users = [start_user() || _X <- lists:seq(1, 10)],
  997. Users
  998. end,
  999. % cleanup
  1000. fun(Users) ->
  1001. [user_stop(U) || U <- Users],
  1002. application:stop(pooler)
  1003. end,
  1004. %
  1005. [
  1006. fun(Users) ->
  1007. fun() ->
  1008. % each user has a different tc ID
  1009. TcIds = lists:sort([user_id(UPid) || UPid <- Users]),
  1010. ?assertEqual(lists:usort(TcIds), TcIds)
  1011. end
  1012. end,
  1013. fun(Users) ->
  1014. fun() ->
  1015. % users still unique after a renew cycle
  1016. [user_new_tc(UPid) || UPid <- Users],
  1017. TcIds = lists:sort([user_id(UPid) || UPid <- Users]),
  1018. ?assertEqual(lists:usort(TcIds), TcIds)
  1019. end
  1020. end,
  1021. fun(Users) ->
  1022. fun() ->
  1023. % all users crash, pids are replaced
  1024. TcIds1 = lists:sort([user_id(UPid) || UPid <- Users]),
  1025. [user_crash(UPid) || UPid <- Users],
  1026. Seq = lists:seq(1, 5),
  1027. Users2 = [start_user() || _X <- Seq],
  1028. TcIds2 = lists:sort([user_id(UPid) || UPid <- Users2]),
  1029. Both =
  1030. sets:to_list(
  1031. sets:intersection([
  1032. sets:from_list(TcIds1),
  1033. sets:from_list(TcIds2)
  1034. ])
  1035. ),
  1036. ?assertEqual([], Both)
  1037. end
  1038. end
  1039. ]}.
  1040. pooler_auto_grow_disabled_by_default_test_() ->
  1041. {setup,
  1042. fun() ->
  1043. logger:set_handler_config(default, filters, []),
  1044. application:set_env(pooler, metrics_module, fake_metrics),
  1045. fake_metrics:start_link()
  1046. end,
  1047. fun(_X) ->
  1048. fake_metrics:stop()
  1049. end,
  1050. {foreach,
  1051. % setup
  1052. fun() ->
  1053. Pool = [
  1054. {name, test_pool_1},
  1055. {max_count, 5},
  1056. {init_count, 2},
  1057. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
  1058. ],
  1059. application:unset_env(pooler, pools),
  1060. application:start(pooler),
  1061. pooler:new_pool(Pool)
  1062. end,
  1063. fun(_X) ->
  1064. application:stop(pooler)
  1065. end,
  1066. [
  1067. {"take one, and it should not auto-grow", fun() ->
  1068. ?assertEqual(2, maps:get(free_count, dump_pool(test_pool_1))),
  1069. P = pooler:take_member(test_pool_1),
  1070. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  1071. timer:sleep(100),
  1072. ?assertEqual(1, maps:get(free_count, dump_pool(test_pool_1))),
  1073. ok,
  1074. pooler:return_member(test_pool_1, P)
  1075. end}
  1076. ]}}.
  1077. pooler_auto_grow_enabled_test_() ->
  1078. {setup,
  1079. fun() ->
  1080. logger:set_handler_config(default, filters, []),
  1081. application:set_env(pooler, metrics_module, fake_metrics),
  1082. fake_metrics:start_link()
  1083. end,
  1084. fun(_X) ->
  1085. fake_metrics:stop()
  1086. end,
  1087. {foreach,
  1088. % setup
  1089. fun() ->
  1090. Pool = [
  1091. {name, test_pool_1},
  1092. {max_count, 5},
  1093. {init_count, 2},
  1094. {auto_grow_threshold, 1},
  1095. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
  1096. ],
  1097. application:unset_env(pooler, pools),
  1098. application:start(pooler),
  1099. pooler:new_pool(Pool)
  1100. end,
  1101. fun(_X) ->
  1102. application:stop(pooler)
  1103. end,
  1104. [
  1105. {"take one, and it should grow by 2", fun() ->
  1106. ?assertEqual(2, maps:get(free_count, dump_pool(test_pool_1))),
  1107. P = pooler:take_member(test_pool_1),
  1108. ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
  1109. timer:sleep(100),
  1110. ?assertEqual(3, maps:get(free_count, dump_pool(test_pool_1))),
  1111. ok,
  1112. pooler:return_member(test_pool_1, P)
  1113. end}
  1114. ]}}.
  1115. pooler_custom_stop_mfa_test_() ->
  1116. {foreach,
  1117. fun() ->
  1118. logger:set_handler_config(default, filters, []),
  1119. Pool = [
  1120. {name, test_pool_1},
  1121. {max_count, 3},
  1122. {init_count, 2},
  1123. {cull_interval, {200, ms}},
  1124. {max_age, {0, min}},
  1125. {start_mfa, {pooled_gs, start_link, [{foo_type}]}}
  1126. ],
  1127. application:set_env(pooler, pools, [Pool])
  1128. end,
  1129. fun(_) ->
  1130. application:unset_env(pooler, pools)
  1131. end,
  1132. [
  1133. {"default behavior kills the pool member", fun() ->
  1134. ok = application:start(pooler),
  1135. Reason = monitor_members_trigger_culling_and_return_reason(),
  1136. ok = application:stop(pooler),
  1137. ?assertEqual(killed, Reason)
  1138. end},
  1139. {"custom callback terminates the pool member normally", fun() ->
  1140. {ok, [Pool]} = application:get_env(pooler, pools),
  1141. Stop = {stop_mfa, {pooled_gs, stop, ['$pooler_pid']}},
  1142. application:set_env(pooler, pools, [[Stop | Pool]]),
  1143. ok = application:start(pooler),
  1144. Reason = monitor_members_trigger_culling_and_return_reason(),
  1145. ok = application:stop(pooler),
  1146. ?assertEqual(normal, Reason)
  1147. end}
  1148. ]}.
  1149. no_error_logger_reports_after_culling_test_() ->
  1150. %% damn those wraiths! This is the cure
  1151. {foreach,
  1152. fun() ->
  1153. logger:set_handler_config(default, filters, []),
  1154. {ok, _Pid} = error_logger_mon:start_link(),
  1155. Pool = [
  1156. {name, test_pool_1},
  1157. {max_count, 3},
  1158. {init_count, 2},
  1159. {cull_interval, {200, ms}},
  1160. {max_age, {0, min}},
  1161. {start_mfa, {pooled_gs, start_link, [{type}]}}
  1162. ],
  1163. application:set_env(pooler, pools, [Pool])
  1164. end,
  1165. fun(_) ->
  1166. ok = error_logger_mon:stop(),
  1167. error_logger_mon:uninstall_handler(),
  1168. application:unset_env(pooler, pools)
  1169. end,
  1170. [
  1171. {"Force supervisor to report by using exit/2 instead of terminate_child", fun() ->
  1172. {ok, [Pool]} = application:get_env(pooler, pools),
  1173. Stop = {stop_mfa, {erlang, exit, ['$pooler_pid', kill]}},
  1174. application:set_env(pooler, pools, [[Stop | Pool]]),
  1175. ok = application:start(pooler),
  1176. error_logger_mon:install_handler(),
  1177. error_logger_mon:reset(),
  1178. Reason = monitor_members_trigger_culling_and_return_reason(),
  1179. %% we need to wait for the message to arrive before deleting handler
  1180. timer:sleep(250),
  1181. error_logger_mon:uninstall_handler(),
  1182. ok = application:stop(pooler),
  1183. ?assertEqual(killed, Reason),
  1184. ?assertEqual(
  1185. 1,
  1186. error_logger_mon:get_msg_count(),
  1187. [
  1188. {msgs, error_logger_mon:get_msgs()},
  1189. {m, [R || #{msg := {report, R}} <- error_logger_mon:get_msgs()]}
  1190. ]
  1191. )
  1192. end},
  1193. {"Default MFA shouldn't generate any reports during culling", fun() ->
  1194. ok = application:start(pooler),
  1195. error_logger_mon:install_handler(),
  1196. Reason = monitor_members_trigger_culling_and_return_reason(),
  1197. error_logger_mon:uninstall_handler(),
  1198. ok = application:stop(pooler),
  1199. ?assertEqual(killed, Reason),
  1200. ?assertEqual(0, error_logger_mon:get_msg_count())
  1201. end}
  1202. ]}.
  1203. reconfigure_test_() ->
  1204. Name = test_pool_1,
  1205. InitCount = 2,
  1206. MaxCount = 4,
  1207. StartConfig = #{
  1208. name => Name,
  1209. max_count => MaxCount,
  1210. init_count => InitCount,
  1211. start_mfa => {pooled_gs, start_link, [{reconfigure_test}]}
  1212. },
  1213. {foreach,
  1214. fun() ->
  1215. logger:set_handler_config(default, filters, []),
  1216. application:set_env(pooler, pools, [StartConfig]),
  1217. application:set_env(pooler, metrics_module, pooler_no_metrics),
  1218. application:start(pooler)
  1219. end,
  1220. fun(_) ->
  1221. application:unset_env(pooler, pools),
  1222. application:stop(pooler)
  1223. end,
  1224. [
  1225. {"Raise init_count", fun() ->
  1226. Config1 = StartConfig#{init_count => 3},
  1227. ?assertEqual(
  1228. {ok, [
  1229. {set_parameter, {init_count, 3}},
  1230. {start_workers, 1}
  1231. ]},
  1232. pooler:pool_reconfigure(Name, Config1)
  1233. ),
  1234. ?assertMatch(
  1235. #{
  1236. init_count := 3,
  1237. free_count := 3,
  1238. free_pids := [_, _, _]
  1239. },
  1240. wait_for_dump(Name, 5000, fun(#{free_count := C}) -> C =:= 3 end)
  1241. )
  1242. end},
  1243. {"Lower max_count", fun() ->
  1244. Config1 = StartConfig#{max_count => 1, init_count => 1},
  1245. ?assertEqual(
  1246. {ok, [
  1247. {set_parameter, {init_count, 1}},
  1248. {set_parameter, {max_count, 1}},
  1249. {stop_free_workers, 1}
  1250. ]},
  1251. pooler:pool_reconfigure(Name, Config1)
  1252. ),
  1253. ?assertMatch(
  1254. #{
  1255. init_count := 1,
  1256. max_count := 1,
  1257. free_count := 1,
  1258. free_pids := [_]
  1259. },
  1260. wait_for_dump(Name, 5000, fun(#{free_count := C}) -> C =:= 1 end)
  1261. )
  1262. end},
  1263. {"Lower queue_max", fun() ->
  1264. NewQMax = 4,
  1265. NewConfig = StartConfig#{queue_max => NewQMax},
  1266. Parent = self(),
  1267. NumClients = 10,
  1268. _Clients = lists:map(
  1269. fun(_) ->
  1270. spawn_link(
  1271. fun() ->
  1272. Parent ! pooler:take_member(Name, 60000),
  1273. timer:sleep(60000)
  1274. end
  1275. )
  1276. end,
  1277. lists:seq(1, NumClients)
  1278. ),
  1279. timer:sleep(100),
  1280. % 6
  1281. QueueSize = NumClients - MaxCount,
  1282. %2
  1283. Shrink = QueueSize - NewQMax,
  1284. ?assertEqual(
  1285. {ok, [
  1286. {set_parameter, {queue_max, NewQMax}},
  1287. {shrink_queue, Shrink}
  1288. ]},
  1289. pooler:pool_reconfigure(Name, NewConfig)
  1290. ),
  1291. ?assertMatch(
  1292. [
  1293. W1,
  1294. W2,
  1295. W3,
  1296. W4,
  1297. error_no_members,
  1298. error_no_members
  1299. ] when
  1300. is_pid(W1) andalso
  1301. is_pid(W2) andalso
  1302. is_pid(W3) andalso
  1303. is_pid(W4),
  1304. [
  1305. receive
  1306. M -> M
  1307. after 5000 -> error(timeout)
  1308. end
  1309. || _ <- lists:seq(1, MaxCount + Shrink)
  1310. ]
  1311. ),
  1312. #{
  1313. queue_max := QMax,
  1314. queued_requestors := Q
  1315. } = gen_server:call(Name, dump_pool),
  1316. % queue_max in the state is updated
  1317. ?assertEqual(QMax, NewQMax),
  1318. % queue is full
  1319. ?assertEqual(NewQMax, queue:len(Q))
  1320. end},
  1321. {"Lower cull_interval", fun() ->
  1322. NewConfig = StartConfig#{cull_interval => {10, sec}},
  1323. ?assertEqual(
  1324. {ok, [
  1325. {set_parameter, {cull_interval, {10, sec}}},
  1326. {reset_cull_timer, {10, sec}}
  1327. ]},
  1328. pooler:pool_reconfigure(Name, NewConfig)
  1329. )
  1330. end},
  1331. {"Lower max_age", fun() ->
  1332. NewConfig = StartConfig#{max_age => {100, ms}},
  1333. Workers = [pooler:take_member(Name, 5000) || _ <- lists:seq(1, MaxCount)],
  1334. [pooler:return_member(Name, Pid) || Pid <- Workers],
  1335. ?assertEqual(
  1336. {ok, [
  1337. {set_parameter, {max_age, {100, ms}}},
  1338. {cull, []}
  1339. ]},
  1340. pooler:pool_reconfigure(Name, NewConfig)
  1341. ),
  1342. %% make sure workers are culled
  1343. wait_for_dump(
  1344. Name,
  1345. 1000,
  1346. fun(#{free_count := Free}) ->
  1347. Name ! cull_pool,
  1348. Free =:= InitCount
  1349. end
  1350. )
  1351. end},
  1352. {"Update group", fun() ->
  1353. NewConfig1 = StartConfig#{group => my_group1},
  1354. ?assertEqual(
  1355. {ok, [
  1356. {set_parameter, {group, my_group1}},
  1357. {join_group, my_group1}
  1358. ]},
  1359. pooler:pool_reconfigure(Name, NewConfig1)
  1360. ),
  1361. PoolPid = whereis(Name),
  1362. ?assertMatch([PoolPid], pooler:group_pools(my_group1)),
  1363. NewConfig2 = StartConfig#{group => my_group2},
  1364. ?assertEqual(
  1365. {ok, [
  1366. {set_parameter, {group, my_group2}},
  1367. {leave_group, my_group1},
  1368. {join_group, my_group2}
  1369. ]},
  1370. pooler:pool_reconfigure(Name, NewConfig2)
  1371. ),
  1372. ?assertMatch([], pooler:group_pools(my_group1)),
  1373. ?assertMatch([PoolPid], pooler:group_pools(my_group2)),
  1374. ?assertEqual(
  1375. {ok, [
  1376. {set_parameter, {group, undefined}},
  1377. {leave_group, my_group2}
  1378. ]},
  1379. pooler:pool_reconfigure(Name, StartConfig)
  1380. ),
  1381. ?assertMatch([], pooler:group_pools(my_group1)),
  1382. ?assertMatch([], pooler:group_pools(my_group2))
  1383. end},
  1384. {"Change basic configs", fun() ->
  1385. NewMaxCount = MaxCount + 5,
  1386. NewConfig = StartConfig#{
  1387. max_count => NewMaxCount,
  1388. member_start_timeout => {10, sec},
  1389. queue_max => 100,
  1390. metrics_mod => fake_metrics,
  1391. stop_mfa => {erlang, exit, ['$pooler_pid', shutdown]},
  1392. auto_grow_threshold => 1
  1393. },
  1394. ?assertEqual(
  1395. {ok, [
  1396. {set_parameter, {max_count, NewMaxCount}},
  1397. {set_parameter, {member_start_timeout, {10, sec}}},
  1398. {set_parameter, {queue_max, 100}},
  1399. {set_parameter, {metrics_mod, fake_metrics}},
  1400. {set_parameter, {stop_mfa, {erlang, exit, ['$pooler_pid', shutdown]}}},
  1401. {set_parameter, {auto_grow_threshold, 1}}
  1402. ]},
  1403. pooler:pool_reconfigure(Name, NewConfig)
  1404. ),
  1405. ?assertMatch(
  1406. #{
  1407. max_count := NewMaxCount,
  1408. member_start_timeout := {10, sec},
  1409. queue_max := 100,
  1410. metrics_mod := fake_metrics,
  1411. stop_mfa := {erlang, exit, ['$pooler_pid', shutdown]},
  1412. auto_grow_threshold := 1
  1413. },
  1414. gen_server:call(Name, dump_pool)
  1415. )
  1416. end},
  1417. {"Update failed", fun() ->
  1418. ?assertEqual(
  1419. {error, changed_unsupported_parameter},
  1420. pooler:pool_reconfigure(
  1421. Name, StartConfig#{start_mfa := {erlang, spawn, [a, b, []]}}
  1422. )
  1423. ),
  1424. ?assertEqual(
  1425. {error, changed_unsupported_parameter},
  1426. pooler:pool_reconfigure(
  1427. Name, StartConfig#{name := not_a_pool_name}
  1428. )
  1429. )
  1430. end}
  1431. ]}.
  1432. wait_for_dump(Pool, Timeout, Fun) when Timeout > 0 ->
  1433. Dump = gen_server:call(Pool, dump_pool),
  1434. case Fun(Dump) of
  1435. true ->
  1436. Dump;
  1437. false ->
  1438. timer:sleep(50),
  1439. wait_for_dump(Pool, Timeout - 50, Fun)
  1440. end;
  1441. wait_for_dump(_, _, _) ->
  1442. error(timeout).
  1443. monitor_members_trigger_culling_and_return_reason() ->
  1444. Pids = get_n_pids(test_pool_1, 3, []),
  1445. [erlang:monitor(process, P) || P <- Pids],
  1446. [pooler:return_member(test_pool_1, P) || P <- Pids],
  1447. receive
  1448. {'DOWN', _Ref, process, _Pid, Reason} ->
  1449. Reason
  1450. after 250 -> timeout
  1451. end.
  1452. time_as_millis_test_() ->
  1453. Zeros = [{{0, U}, 0} || U <- [min, sec, ms, mu]],
  1454. Ones = [
  1455. {{1, min}, 60000},
  1456. {{1, sec}, 1000},
  1457. {{1, ms}, 1},
  1458. {{1, mu}, 0}
  1459. ],
  1460. Misc = [{{3000, mu}, 3}],
  1461. Tests = Zeros ++ Ones ++ Misc,
  1462. [?_assertEqual(E, pooler:time_as_millis(I)) || {I, E} <- Tests].
  1463. time_as_micros_test_() ->
  1464. Zeros = [{{0, U}, 0} || U <- [min, sec, ms, mu]],
  1465. Ones = [
  1466. {{1, min}, 60000000},
  1467. {{1, sec}, 1000000},
  1468. {{1, ms}, 1000},
  1469. {{1, mu}, 1}
  1470. ],
  1471. Misc = [{{3000, mu}, 3000}],
  1472. Tests = Zeros ++ Ones ++ Misc,
  1473. [?_assertEqual(E, pooler:time_as_micros(I)) || {I, E} <- Tests].
  1474. call_free_members_test_() ->
  1475. NumWorkers = 10,
  1476. PoolName = test_pool_1,
  1477. {setup,
  1478. fun() ->
  1479. application:set_env(pooler, metrics_module, fake_metrics),
  1480. fake_metrics:start_link()
  1481. end,
  1482. fun(_X) ->
  1483. fake_metrics:stop()
  1484. end,
  1485. {foreach,
  1486. fun() ->
  1487. Pool = [
  1488. {name, PoolName},
  1489. {max_count, NumWorkers},
  1490. {init_count, NumWorkers},
  1491. {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
  1492. ],
  1493. application:unset_env(pooler, pools),
  1494. application:start(pooler),
  1495. pooler:new_pool(Pool)
  1496. end,
  1497. fun(_X) ->
  1498. application:stop(pooler)
  1499. end,
  1500. [
  1501. {"perform a ping across the pool when all workers are free", fun() ->
  1502. ?assertEqual(NumWorkers, maps:get(free_count, dump_pool(PoolName))),
  1503. Res = pooler:call_free_members(PoolName, fun pooled_gs:ping/1),
  1504. ?assertEqual(NumWorkers, count_pongs(Res))
  1505. end},
  1506. {"perform a ping across the pool when two workers are taken", fun() ->
  1507. ?assertEqual(NumWorkers, maps:get(free_count, dump_pool(PoolName))),
  1508. Pids = [pooler:take_member(PoolName) || _X <- lists:seq(0, 1)],
  1509. Res = pooler:call_free_members(PoolName, fun pooled_gs:ping/1),
  1510. ?assertEqual(NumWorkers - 2, count_pongs(Res)),
  1511. [pooler:return_member(PoolName, P) || P <- Pids]
  1512. end},
  1513. {"perform an op where the op crashes all free members", fun() ->
  1514. ?assertEqual(NumWorkers, maps:get(free_count, dump_pool(PoolName))),
  1515. Res = pooler:call_free_members(
  1516. PoolName,
  1517. fun pooled_gs:error_on_call/1
  1518. ),
  1519. ?assertEqual(NumWorkers, count_errors(Res))
  1520. end}
  1521. ]}}.
  1522. count_pongs(Result) ->
  1523. lists:foldl(
  1524. fun
  1525. ({ok, pong}, Acc) -> Acc + 1;
  1526. ({error, _}, Acc) -> Acc
  1527. end,
  1528. 0,
  1529. Result
  1530. ).
  1531. count_errors(Result) ->
  1532. lists:foldl(
  1533. fun
  1534. ({error, _}, Acc) -> Acc + 1;
  1535. ({ok, _}, Acc) -> Acc
  1536. end,
  1537. 0,
  1538. Result
  1539. ).
  1540. % testing crash recovery means race conditions when either pids
  1541. % haven't yet crashed or pooler hasn't recovered. So this helper loops
  1542. % forver until N pids are obtained, ignoring error_no_members.
  1543. get_n_pids(N, Acc) ->
  1544. get_n_pids(test_pool_1, N, Acc).
  1545. get_n_pids(_Pool, 0, Acc) ->
  1546. Acc;
  1547. get_n_pids(Pool, N, Acc) ->
  1548. case pooler:take_member(Pool) of
  1549. error_no_members ->
  1550. get_n_pids(Pool, N, Acc);
  1551. Pid ->
  1552. get_n_pids(Pool, N - 1, [Pid | Acc])
  1553. end.
  1554. get_n_pids_group(_Group, 0, Acc) ->
  1555. Acc;
  1556. get_n_pids_group(Group, N, Acc) ->
  1557. case pooler:take_group_member(Group) of
  1558. error_no_members ->
  1559. get_n_pids_group(Group, N, Acc);
  1560. Pid ->
  1561. get_n_pids_group(Group, N - 1, [Pid | Acc])
  1562. end.
  1563. children_count(SupId) ->
  1564. length(supervisor:which_children(SupId)).
  1565. starting_members(PoolName) ->
  1566. length(maps:get(starting_members, dump_pool(PoolName))).
  1567. dump_pool(PoolName) ->
  1568. gen_server:call(PoolName, dump_pool).
  1569. % >= OTP-21
  1570. -ifdef(OTP_RELEASE).
  1571. -if(?OTP_RELEASE >= 23).
  1572. -define(USE_PG_NOT_PG2, true).
  1573. -else.
  1574. -undef(USE_PG_NOT_PG2).
  1575. -endif.
  1576. % < OTP-21
  1577. -else.
  1578. -undef(USE_PG_NOT_PG2).
  1579. -endif.
  1580. -ifdef(USE_PG_NOT_PG2).
  1581. pg_start() ->
  1582. pg:start(_Scope = 'pg').
  1583. pg_stop() ->
  1584. lists:foreach(
  1585. fun(Group) -> pg:leave(Group, pg:get_members(Group)) end,
  1586. pg:which_groups()
  1587. ).
  1588. pg_leave(Group, Pid) ->
  1589. pg:leave(Group, Pid).
  1590. -else.
  1591. pg_start() ->
  1592. pg2:start().
  1593. pg_stop() ->
  1594. application:stop(pg2).
  1595. pg_leave(Group, Pid) ->
  1596. pg2:leave(Group, Pid).
  1597. -endif.