pooler_tests.erl 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399
  1. -module(pooler_tests).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include("../src/pooler.hrl").
  4. -export([sleep_for_configured_timeout/0]).
  5. % The `user' processes represent users of the pooler library. A user
  6. % process will take a pid, report details on the pid it has, release
  7. % and take a new pid, stop cleanly, and crash.
  %% Spawn an unlinked `user' process. It enters user_loop/1 in the
  %% `start' state and returns the pid of the new process.
  start_user() ->
      InitialState = start,
      spawn(fun() -> user_loop(InitialState) end).
  %% Ask the user process `Pid' for the id of the member it currently
  %% holds and return the first two-element tuple that arrives in the
  %% caller's mailbox. There is no timeout: the call blocks until such a
  %% message is received.
  user_id(Pid) ->
      Pid ! {get_tc_id, self()},
      receive
          {_Type, _Id} = Reply ->
              Reply
      end.
  %% Send `new_tc' to the user process; user_loop/1 reacts by returning
  %% its member and taking a new one. Returns the message that was sent.
  user_new_tc(Pid) ->
      erlang:send(Pid, new_tc).
  %% Send `stop' to the user process; user_loop/1 reacts by returning its
  %% member and terminating. Returns the message that was sent.
  user_stop(Pid) ->
      erlang:send(Pid, stop).
  %% Send `crash' to the user process; user_loop/1 reacts by raising an
  %% error. Returns the message that was sent.
  user_crash(Pid) ->
      erlang:send(Pid, crash).
  %% Main loop of a `user' process.
  %%
  %% In the `start' state, or after a failed take (`error_no_members'),
  %% the process tries to take a member from test_pool_1 and loops with
  %% whatever pooler returned. Once it holds a member it serves these
  %% messages:
  %%   {get_tc_id, From}  - reply with pooled_gs:get_id/1 of the member
  %%   {ping_tc, From}    - reply with pooled_gs:ping/1 of the member
  %%   {ping_count, From} - reply with pooled_gs:ping_count/1 of the member
  %%   new_tc             - return the member (status ok) and take another
  %%   stop               - return the member (status ok) and finish
  %%   crash              - raise error {user_loop, kaboom}
  user_loop(start) ->
      user_loop(pooler:take_member(test_pool_1));
  user_loop(error_no_members) ->
      user_loop(pooler:take_member(test_pool_1));
  user_loop(Member) ->
      receive
          {get_tc_id, ReplyTo} ->
              ReplyTo ! pooled_gs:get_id(Member),
              user_loop(Member);
          {ping_tc, ReplyTo} ->
              ReplyTo ! pooled_gs:ping(Member),
              user_loop(Member);
          {ping_count, ReplyTo} ->
              ReplyTo ! pooled_gs:ping_count(Member),
              user_loop(Member);
          new_tc ->
              pooler:return_member(test_pool_1, Member, ok),
              user_loop(pooler:take_member(test_pool_1));
          stop ->
              pooler:return_member(test_pool_1, Member, ok),
              stopped;
          crash ->
              erlang:error({user_loop, kaboom})
      end.
  45. % The `tc' processes represent the pids tracked by pooler for testing.
  46. % They have a type and an ID and can report their type and ID and
  47. % stop.
  48. %% tc_loop({Type, Id}) ->
  49. %% receive
  50. %% {get_id, From} ->
  51. %% From ! {ok, Type, Id},
  52. %% tc_loop({Type, Id});
  53. %% stop -> stopped;
  54. %% crash ->
  55. %% erlang:error({tc_loop, kaboom})
  56. %% end.
  57. %% get_tc_id(Pid) ->
  58. %% Pid ! {get_id, self()},
  59. %% receive
  60. %% {ok, Type, Id} ->
  61. %% {Type, Id}
  62. %% after 200 ->
  63. %% timeout
  64. %% end.
  65. %% stop_tc(Pid) ->
  66. %% Pid ! stop.
  67. %% tc_starter(Type) ->
  68. %% Ref = make_ref(),
  69. %% spawn_link(fun() -> tc_loop({Type, Ref}) end).
  70. %% assert_tc_valid(Pid) ->
  71. %% ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
  72. %% ok.
  73. % tc_sanity_test() ->
  74. % Pid1 = tc_starter("1"),
  75. % {"1", Id1} = get_tc_id(Pid1),
  76. % Pid2 = tc_starter("1"),
  77. % {"1", Id2} = get_tc_id(Pid2),
  78. % ?assertNot(Id1 == Id2),
  79. % stop_tc(Pid1),
  80. % stop_tc(Pid2).
  81. % user_sanity_test() ->
  82. % Pid1 = tc_starter("1"),
  83. % User = spawn(fun() -> user_loop(Pid1) end),
  84. % ?assertMatch({"1", _Ref}, user_id(User)),
  85. % user_crash(User),
  86. % stop_tc(Pid1).
  %% Runs basic_tests/0 against a pool that is declared through the
  %% application environment (`pools' key) before pooler is started.
  %% The outer `setup' fixture manages the log monitor and the fake
  %% metrics server; the inner `foreach' fixture starts and stops pooler
  %% around every test.
  pooler_basics_via_config_test_() ->
      SuiteSetup =
          fun() ->
              {ok, _} = error_logger_mon:start_link(),
              error_logger_mon:install_handler(pooler),
              logger:set_handler_config(default, filters, []),
              application:set_env(pooler, metrics_module, fake_metrics),
              fake_metrics:start_link()
          end,
      SuiteCleanup =
          fun(_SetupResult) ->
              error_logger_mon:uninstall_handler(),
              ok = error_logger_mon:stop(),
              fake_metrics:stop()
          end,
      CaseSetup =
          fun() ->
              error_logger_mon:reset(),
              PoolConfig = [{name, test_pool_1},
                            {max_count, 3},
                            {init_count, 2},
                            {cull_interval, {0, min}},
                            {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}],
              application:set_env(pooler, pools, [PoolConfig]),
              application:start(pooler)
          end,
      CaseCleanup =
          fun(_SetupResult) ->
              application:stop(pooler)
          end,
      {setup, SuiteSetup, SuiteCleanup,
       {foreach, CaseSetup, CaseCleanup, basic_tests()}}.
  %% Runs basic_tests/0 against a pool that is created at runtime with
  %% pooler:new_pool/1 after pooler has been started without any
  %% configured pools.
  pooler_basics_dynamic_test_() ->
      SuiteSetup =
          fun() ->
              {ok, _} = error_logger_mon:start_link(),
              error_logger_mon:install_handler(pooler),
              logger:set_handler_config(default, filters, []),
              application:set_env(pooler, metrics_module, fake_metrics),
              fake_metrics:start_link()
          end,
      SuiteCleanup =
          fun(_SetupResult) ->
              error_logger_mon:uninstall_handler(),
              ok = error_logger_mon:stop(),
              fake_metrics:stop()
          end,
      CaseSetup =
          fun() ->
              error_logger_mon:reset(),
              PoolConfig = [{name, test_pool_1},
                            {max_count, 3},
                            {init_count, 2},
                            {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}],
              %% make sure no statically configured pools are started
              application:unset_env(pooler, pools),
              application:start(pooler),
              pooler:new_pool(PoolConfig)
          end,
      CaseCleanup =
          fun(_SetupResult) ->
              application:stop(pooler)
          end,
      {setup, SuiteSetup, SuiteCleanup,
       {foreach, CaseSetup, CaseCleanup, basic_tests()}}.
  %% Runs basic_tests/0 against a pool that is started through
  %% fake_external_supervisor rather than by pooler itself. The per-test
  %% setup returns the result of supervisor:start_link/2, and the
  %% per-test cleanup expects it to be {ok, SupPid}.
  pooler_basics_integration_to_other_supervisor_test_() ->
      SuiteSetup =
          fun() ->
              {ok, _} = error_logger_mon:start_link(),
              error_logger_mon:install_handler(pooler),
              logger:set_handler_config(default, filters, []),
              application:set_env(pooler, metrics_module, fake_metrics),
              fake_metrics:start_link()
          end,
      SuiteCleanup =
          fun(_SetupResult) ->
              error_logger_mon:uninstall_handler(),
              ok = error_logger_mon:stop(),
              fake_metrics:stop()
          end,
      CaseSetup =
          fun() ->
              error_logger_mon:reset(),
              PoolConfig = [{name, test_pool_1},
                            {max_count, 3},
                            {init_count, 2},
                            {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}],
              application:unset_env(pooler, pools),
              application:start(pooler),
              supervisor:start_link(fake_external_supervisor, PoolConfig)
          end,
      CaseCleanup =
          fun({ok, SupPid}) ->
              exit(SupPid, normal),
              application:stop(pooler)
          end,
      {setup, SuiteSetup, SuiteCleanup,
       {foreach, CaseSetup, CaseCleanup, basic_tests()}}.
  %% Shared list of {Title, Fun} EUnit tests that exercise the basic
  %% pooler API against a pool named test_pool_1. Every caller in this
  %% module sets that pool up with max_count 3 and init_count 2, and the
  %% assertions below rely on those numbers.
  basic_tests() ->
      [
       {"there are init_count members at start",
        fun() ->
                Stats = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
                ?assertEqual(2, length(Stats))
        end},

       {"take and return one",
        fun() ->
                P = pooler:take_member(test_pool_1),
                ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
                ok = pooler:return_member(test_pool_1, P, ok)
        end},

       {"take and return one, named pool",
        fun() ->
                P = pooler:take_member(test_pool_1),
                ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
                %% assert the result of return_member/2; this used to read
                %% `ok, pooler:return_member(...)', which discarded the check
                ok = pooler:return_member(test_pool_1, P)
        end},

       {"attempt to take form unknown pool",
        fun() ->
                %% since pools are now servers, taking from an unknown pool
                %% exits with noproc
                ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
        end},

       {"members creation is triggered after pool exhaustion until max",
        fun() ->
                %% init count is 2
                Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
                %% since new member creation is async, can only assert
                %% that we will get a pid, but may not be first try.
                Pids = get_n_pids(1, Pids0),
                %% pool is at max now, requests should give error
                ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
                % no duplicates
                ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
        end
       },

       {"pids are reused most recent return first",
        fun() ->
                P1 = pooler:take_member(test_pool_1),
                P2 = pooler:take_member(test_pool_1),
                ?assertNot(P1 == P2),
                ok = pooler:return_member(test_pool_1, P1, ok),
                ok = pooler:return_member(test_pool_1, P2, ok),
                % pids are reused most recent first
                ?assertEqual(P2, pooler:take_member(test_pool_1)),
                ?assertEqual(P1, pooler:take_member(test_pool_1))
        end},

       {"if an in-use pid crashes it is replaced",
        fun() ->
                Pids0 = get_n_pids(3, []),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % crash them all
                [ pooled_gs:crash(P) || P <- Pids0 ],
                Pids1 = get_n_pids(3, []),
                Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
        end
       },

       {"if a free pid crashes it is replaced",
        fun() ->
                FreePids = [ P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1) ],
                [ exit(P, kill) || P <- FreePids ],
                Pids1 = get_n_pids(3, []),
                ?assertEqual(3, length(Pids1))
        end},

       {"if a pid is returned with bad status it is replaced",
        fun() ->
                Pids0 = get_n_pids(3, []),
                Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % return them all marking as bad
                [ pooler:return_member(test_pool_1, P, fail) || P <- Pids0 ],
                Pids1 = get_n_pids(3, []),
                Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
        end
       },

       {"if a consumer crashes, pid is replaced",
        fun() ->
                Consumer = start_user(),
                StartId = user_id(Consumer),
                user_crash(Consumer),
                NewPid = hd(get_n_pids(1, [])),
                NewId = pooled_gs:get_id(NewPid),
                ?assertNot(NewId == StartId)
        end
       },

       {"it is ok to return an unknown pid",
        fun() ->
                Bogus1 = spawn(fun() -> ok end),
                Bogus2 = spawn(fun() -> ok end),
                ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus1, ok)),
                ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus2, fail))
        end
       },

       {"it is ok to return a pid more than once",
        fun() ->
                M = pooler:take_member(test_pool_1),
                [ pooler:return_member(test_pool_1, M)
                  || _I <- lists:seq(1, 37) ],
                %% the first of the 37 returns is genuine; the remaining 36
                %% are expected to be logged as ignored
                ?assertEqual(
                   36,
                   length(
                     lists:filter(
                       fun(#{msg := {report, #{label := "ignored return of free member",
                                               pid := Pid}}}) ->
                               Pid =:= M;
                          (_) ->
                               false
                       end,
                       error_logger_mon:get_msgs()))),
                M1 = pooler:take_member(test_pool_1),
                M2 = pooler:take_member(test_pool_1),
                ?assert(M1 =/= M2),
                Pool1 = gen_server:call(test_pool_1, dump_pool),
                ?assertEqual(2, Pool1#pool.in_use_count),
                ?assertEqual(0, Pool1#pool.free_count),
                pooler:return_member(test_pool_1, M1),
                pooler:return_member(test_pool_1, M2),
                Pool2 = gen_server:call(test_pool_1, dump_pool),
                ?assertEqual(0, Pool2#pool.in_use_count),
                ?assertEqual(2, Pool2#pool.free_count),
                ok
        end},

       {"calling return_member on error_no_members is ignored",
        fun() ->
                ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members)),
                ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, ok)),
                ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, fail))
        end
       },

       {"dynamic pool creation",
        fun() ->
                PoolSpec = [{name, dyn_pool_1},
                            {max_count, 3},
                            {init_count, 2},
                            {start_mfa,
                             {pooled_gs, start_link, [{"dyn-0"}]}}],
                {ok, SupPid1} = pooler:new_pool(PoolSpec),
                ?assert(is_pid(SupPid1)),
                M = pooler:take_member(dyn_pool_1),
                ?assertMatch({"dyn-0", _Id}, pooled_gs:get_id(M)),
                ?assertEqual(ok, pooler:rm_pool(dyn_pool_1)),
                ?assertExit({noproc, _}, pooler:take_member(dyn_pool_1)),
                %% verify pool of same name can be created after removal
                {ok, SupPid2} = pooler:new_pool(PoolSpec),
                ?assert(is_pid(SupPid2)),
                %% remove non-existing pool
                ?assertEqual(ok, pooler:rm_pool(dyn_pool_X)),
                ?assertEqual(ok, pooler:rm_pool(dyn_pool_1))
        end},

       {"metrics have been called (no timeout/queue)",
        fun() ->
                %% exercise the API to ensure we have certain keys reported as metrics
                fake_metrics:reset_metrics(),
                Pids = [ pooler:take_member(test_pool_1) || _I <- lists:seq(1, 10) ],
                [ pooler:return_member(test_pool_1, P) || P <- Pids ],
                catch pooler:take_member(bad_pool_name),
                %% kill an unused member
                exit(hd(Pids), kill),
                %% kill a used member
                KillMe = pooler:take_member(test_pool_1),
                exit(KillMe, kill),
                %% FIXME: We need to wait for pooler to process the
                %% exit message. This is ugly, will fix later.
                timer:sleep(200),                   % :(
                ExpectKeys = lists:sort([<<"pooler.test_pool_1.error_no_members_count">>,
                                         <<"pooler.test_pool_1.events">>,
                                         <<"pooler.test_pool_1.free_count">>,
                                         <<"pooler.test_pool_1.in_use_count">>,
                                         <<"pooler.test_pool_1.killed_free_count">>,
                                         <<"pooler.test_pool_1.killed_in_use_count">>,
                                         <<"pooler.test_pool_1.take_rate">>]),
                Metrics = fake_metrics:get_metrics(),
                GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
                ?assertEqual(ExpectKeys, GotKeys)
        end},

       {"metrics have been called (with timeout/queue)",
        fun() ->
                %% exercise the API to ensure we have certain keys reported as metrics
                fake_metrics:reset_metrics(),
                %% pass a non-zero timeout here to exercise queueing
                Pids = [ pooler:take_member(test_pool_1, 1) || _I <- lists:seq(1, 10) ],
                [ pooler:return_member(test_pool_1, P) || P <- Pids ],
                catch pooler:take_member(bad_pool_name),
                %% kill an unused member
                exit(hd(Pids), kill),
                %% kill a used member
                KillMe = pooler:take_member(test_pool_1),
                exit(KillMe, kill),
                %% FIXME: We need to wait for pooler to process the
                %% exit message. This is ugly, will fix later.
                timer:sleep(200),                   % :(
                ExpectKeys = lists:sort([<<"pooler.test_pool_1.error_no_members_count">>,
                                         <<"pooler.test_pool_1.events">>,
                                         <<"pooler.test_pool_1.free_count">>,
                                         <<"pooler.test_pool_1.in_use_count">>,
                                         <<"pooler.test_pool_1.killed_free_count">>,
                                         <<"pooler.test_pool_1.killed_in_use_count">>,
                                         <<"pooler.test_pool_1.take_rate">>,
                                         <<"pooler.test_pool_1.queue_count">>]),
                Metrics = fake_metrics:get_metrics(),
                GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
                ?assertEqual(ExpectKeys, GotKeys)
        end},

       {"accept bad member is handled",
        fun() ->
                Bad = spawn(fun() -> ok end),
                FakeStarter = spawn(fun() -> starter end),
                ?assertEqual(ok, pooler:accept_member(test_pool_1, {FakeStarter, Bad}))
        end},

       {"utilization returns sane results",
        fun() ->
                #pool{max_count = MaxCount, queue_max = QueueMax} = sys:get_state(test_pool_1),

                ?assertEqual(MaxCount, ?gv(max_count, pooler:pool_utilization(test_pool_1))),
                ?assertEqual(0, ?gv(in_use_count, pooler:pool_utilization(test_pool_1))),
                ?assertEqual(2, ?gv(free_count, pooler:pool_utilization(test_pool_1))),
                ?assertEqual(0, ?gv(queued_count, pooler:pool_utilization(test_pool_1))),
                ?assertEqual(QueueMax, ?gv(queue_max, pooler:pool_utilization(test_pool_1)))
        end}
      ].
  %% Tests for pool groups. Each test runs against three pools:
  %% test_pool_1 and test_pool_2 belong to group_1, test_pool_3 has its
  %% group set to `undefined'. All three use max_count 3 and init_count 2.
  pooler_groups_test_() ->
      SuiteSetup =
          fun() ->
              logger:set_handler_config(default, filters, []),
              application:set_env(pooler, metrics_module, fake_metrics),
              fake_metrics:start_link()
          end,
      SuiteCleanup =
          fun(_SetupResult) ->
              fake_metrics:stop()
          end,
      CaseSetup =
          fun() ->
              %% all three pools differ only in name, group and type label
              PoolSpec =
                  fun(Name, Group, TypeLabel) ->
                      [{name, Name},
                       {group, Group},
                       {max_count, 3},
                       {init_count, 2},
                       {start_mfa, {pooled_gs, start_link, [{TypeLabel}]}}]
                  end,
              Pools = [PoolSpec(test_pool_1, group_1, "type-1-1"),
                       PoolSpec(test_pool_2, group_1, "type-1-2"),
                       %% test_pool_3 not part of the group
                       PoolSpec(test_pool_3, undefined, "type-3")],
              application:set_env(pooler, pools, Pools),
              pg_start(),
              application:start(pooler)
          end,
      CaseCleanup =
          fun(_SetupResult) ->
              application:stop(pooler),
              pg_stop()
          end,
      Cases =
          [
           {"take and return one group member (repeated)",
            fun() ->
                TakeAndReturn =
                    fun(I) ->
                        Pid = pooler:take_group_member(group_1),
                        ?assert(is_pid(Pid), [{result, Pid}, {i, I}]),
                        {Type, _} = pooled_gs:get_id(Pid),
                        ?assertMatch("type-1" ++ _, Type),
                        ok = pooler:return_group_member(group_1, Pid, ok),
                        timer:sleep(10),
                        Type
                    end,
                Types = lists:map(TakeAndReturn, lists:seq(1, 50)),
                %% both pools of the group must have served members
                FromPool1 = lists:filter(fun(T) -> T =:= "type-1-1" end, Types),
                FromPool2 = lists:filter(fun(T) -> T =:= "type-1-2" end, Types),
                ?assert(length(FromPool1) > 0, [{types, Types}]),
                ?assert(length(FromPool2) > 0, [{types, Types}])
            end},

           {"take member from unknown group",
            fun() ->
                Taken = pooler:take_group_member(not_a_group),
                ?assertEqual(error_no_members, Taken)
            end},

           {"return member to unknown group",
            fun() ->
                Member = pooler:take_group_member(group_1),
                ?assertEqual(ok, pooler:return_group_member(no_such_group, Member))
            end},

           {"return member to wrong group",
            fun() ->
                Member = pooler:take_member(test_pool_3),
                ?assertEqual(ok, pooler:return_group_member(group_1, Member))
            end},

           {"return member with something which is not a pid",
            fun() ->
                ?assertException(error, _, pooler:return_group_member(group_1, not_pid))
            end},

           {"take member from empty group",
            fun() ->
                %% artificially empty group member list
                lists:foreach(fun(M) -> pg_leave(group_1, M) end,
                              pg_get_members(group_1)),
                ?assertEqual(error_no_members, pooler:take_group_member(group_1))
            end},

           {"return member to group, implied ok",
            fun() ->
                Member = pooler:take_group_member(group_1),
                ?assertEqual(ok, pooler:return_group_member(group_1, Member))
            end},

           {"return error_no_member to group",
            fun() ->
                ?assertEqual(ok, pooler:return_group_member(group_1, error_no_members))
            end},

           {"exhaust pools in group",
            fun() ->
                Members = get_n_pids_group(group_1, 6, []),
                %% they should all be pids
                lists:foreach(
                  fun(P) ->
                      {Type, _} = pooled_gs:get_id(P),
                      ?assertMatch("type-1" ++ _, Type)
                  end,
                  Members),
                %% further attempts should be error
                AllErrors = lists:duplicate(3, error_no_members),
                AllErrors = [ pooler:take_group_member(group_1)
                              || _ <- lists:seq(1, 3) ]
            end},

           {"rm_group with nonexisting group",
            fun() ->
                ?assertEqual(ok, pooler:rm_group(i_dont_exist))
            end},

           {"rm_group with existing empty group",
            fun() ->
                ?assertEqual(ok, pooler:rm_pool(test_pool_1)),
                ?assertEqual(ok, pooler:rm_pool(test_pool_2)),
                timer:sleep(100), % process group de-registration is asynchronous
                ?assertEqual(error_no_members, pooler:take_group_member(group_1)),
                ?assertEqual(ok, pooler:rm_group(group_1)),

                ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
                ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
                ?assertEqual(error_no_members, pooler:take_group_member(group_1))
            end},

           {"rm_group with existing non-empty group",
            fun() ->
                %% Verify that group members exist
                GroupMember = pooler:take_group_member(group_1),
                ?assert(is_pid(GroupMember)),
                pooler:return_group_member(group_1, GroupMember),

                FromPool1 = pooler:take_member(test_pool_1),
                ?assert(is_pid(FromPool1)),
                pooler:return_member(test_pool_1, FromPool1),

                FromPool2 = pooler:take_member(test_pool_2),
                ?assert(is_pid(FromPool2)),
                pooler:return_member(test_pool_2, FromPool2),

                %% Delete and verify that group and pools are destroyed
                ?assertEqual(ok, pooler:rm_group(group_1)),

                ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
                ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
                ?assertEqual(error_no_members, pooler:take_group_member(group_1))
            end}
          ],
      {setup, SuiteSetup, SuiteCleanup,
       {foreach, CaseSetup, CaseCleanup, Cases}}.
  %% verify that pooler crashes completely if too many failures are
  %% encountered while trying to add pids.
  %%
  %% The pool is configured with `crash' as the start argument of
  %% pooled_gs; the test starts pooler itself and expects every take to
  %% yield error_no_members.
  pooler_limit_failed_adds_test_() ->
      Setup =
          fun() ->
              logger:set_handler_config(default, filters, []),
              CrashingPool = [{name, test_pool_1},
                              {max_count, 10},
                              {init_count, 10},
                              {start_mfa, {pooled_gs, start_link, [crash]}}],
              application:set_env(pooler, pools, [CrashingPool])
          end,
      Cleanup =
          fun(_SetupResult) ->
              application:stop(pooler)
          end,
      Test =
          fun() ->
              application:start(pooler),
              ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
              ?assertEqual(error_no_members, pooler:take_member(test_pool_1))
          end,
      {setup, Setup, Cleanup, Test}.
  %% Tests for periodic culling. test_pool_1 is configured with
  %% init_count 2, max_count 10, a cull interval of 200 ms and a max age
  %% of zero minutes. The `foreach' fixture takes ten members before each
  %% test and returns them all afterwards.
  pooler_scheduled_cull_test_() ->
      Setup =
          fun() ->
              logger:set_handler_config(default, filters, []),
              application:set_env(pooler, metrics_module, fake_metrics),
              fake_metrics:start_link(),
              CulledPool = [{name, test_pool_1},
                            {max_count, 10},
                            {init_count, 2},
                            {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
                            {cull_interval, {200, ms}},
                            {max_age, {0, min}}],
              application:set_env(pooler, pools, [CulledPool]),
              application:start(pooler)
          end,
      Cleanup =
          fun(_SetupResult) ->
              fake_metrics:stop(),
              application:stop(pooler)
          end,
      TakeAll =
          fun() ->
              Taken = get_n_pids(test_pool_1, 10, []),
              ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
              ?assertEqual(10, length(Taken)),
              Taken
          end,
      ReturnAll =
          fun(Members) ->
              [ pooler:return_member(test_pool_1, M) || M <- Members ]
          end,
      %% instantiator shared by the two "excess members are culled" runs
      ExcessCulled =
          fun(Title) ->
              fun(Members) ->
                  {Title,
                   fun() ->
                       ReturnAll(Members),
                       %% wait for longer than cull delay
                       timer:sleep(250),
                       ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
                       ?assertEqual(2, ?gv(free_count, pooler:pool_utilization(test_pool_1)))
                   end}
              end
          end,
      InUseNotCulled =
          [ fun(Members) -> in_use_members_not_culled(Members, N) end
            || N <- lists:seq(1, 6) ],
      Instantiators =
          [ExcessCulled("excess members are culled run 1"),
           ExcessCulled("excess members are culled run 2")
           | InUseNotCulled],
      StaticPoolTest =
          {"no cull when init_count matches max_count",
           %% not sure how to verify this. But this test at least
           %% exercises the code path.
           fun() ->
               StaticPool = [{name, test_static_pool_1},
                             {max_count, 2},
                             {init_count, 2},
                             {start_mfa, {pooled_gs, start_link, [{"static-0"}]}},
                             {cull_interval, {200, ms}}], % ignored
               pooler:new_pool(StaticPool),
               Member = pooler:take_member(test_static_pool_1),
               ?assertMatch({"static-0", _}, pooled_gs:get_id(Member)),
               pooler:return_member(test_static_pool_1, Member),
               ok
           end},
      {setup, Setup, Cleanup,
       [{foreach, TakeAll, ReturnAll, Instantiators},
        StaticPoolTest]}.
  %% Build a {Title, Fun} test which checks that members in `Pids', all
  %% still checked out of test_pool_1, survive a cull cycle, and that
  %% after returning the first `N' of them the pool shrinks by `N'.
  in_use_members_not_culled(Pids, N) ->
      Title = "in-use members are not culled " ++ erlang:integer_to_list(N),
      Body =
          fun() ->
              %% wait for longer than cull delay
              timer:sleep(250),
              Total = length(Pids),
              ?assertEqual(Total, length(pooler:pool_stats(test_pool_1))),
              ?assertEqual(0, ?gv(free_count, pooler:pool_utilization(test_pool_1))),
              ?assertEqual(Total, ?gv(in_use_count, pooler:pool_utilization(test_pool_1))),
              %% hand back the first N members, then wait for another cull
              lists:foreach(
                fun(P) -> pooler:return_member(test_pool_1, P) end,
                lists:sublist(Pids, N)),
              timer:sleep(250),
              ?assertEqual(Total - N, length(pooler:pool_stats(test_pool_1)))
          end,
      {Title, Body}.
  %% Sends unexpected call, cast and info messages to the pool server
  %% during setup, then checks that the pool still hands out members and
  %% can still dump its state.
  random_message_test_() ->
      Setup =
          fun() ->
              logger:set_handler_config(default, filters, []),
              Pool = [{name, test_pool_1},
                      {max_count, 2},
                      {init_count, 1},
                      {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}],
              application:set_env(pooler, pools, [Pool]),
              application:start(pooler),
              %% now send some bogus messages
              %% do the call in a throw-away process to avoid timeout error
              BogusCall =
                  fun() ->
                      catch gen_server:call(test_pool_1, {unexpected_garbage_msg, 5})
                  end,
              spawn(BogusCall),
              gen_server:cast(test_pool_1, {unexpected_garbage_msg, 6}),
              whereis(test_pool_1) ! {unexpected_garbage_msg, 7},
              ok
          end,
      Cleanup =
          fun(_SetupResult) ->
              application:stop(pooler)
          end,
      %% a 'DOWN' message for a process the pool does not know about
      StrayDown =
          fun() ->
              Stranger = spawn(fun() -> ok end),
              test_pool_1 ! {'DOWN', erlang:make_ref(), process, Stranger, because}
          end,
      StillServes =
          fun() ->
              Member = pooler:take_member(test_pool_1),
              {Type, _} = pooled_gs:get_id(Member),
              ?assertEqual("type-0", Type)
          end,
      StillDumps =
          fun() ->
              Dump = gen_server:call(test_pool_1, dump_pool),
              ?assertEqual(pool, element(1, Dump))
          end,
      {setup, Setup, Cleanup, [StrayDown, StillServes, StillDumps]}.
  %% Integration test for members whose start-up takes longer than the
  %% pool's member_start_timeout: the start fun sleeps 15 ms while the
  %% timeout is configured as 10 ms.
  pooler_integration_long_init_test_() ->
      Setup =
          fun() ->
              logger:set_handler_config(default, filters, []),
              {ok, _} = error_logger_mon:start_link(),
              error_logger_mon:install_handler(pooler),
              SlowStart = fun() -> timer:sleep(15) end,
              Pool = [{name, test_pool_1},
                      {max_count, 10},
                      {init_count, 0},
                      {member_start_timeout, {10, ms}},
                      {start_mfa, {pooled_gs, start_link, [{"type-0", SlowStart}]}}],
              application:set_env(pooler, pools, [Pool]),
              application:start(pooler)
          end,
      Cleanup =
          fun(_SetupResult) ->
              error_logger_mon:uninstall_handler(),
              ok = error_logger_mon:stop(),
              application:stop(pooler)
          end,
      % Test what happens when pool members take too long to start.
      % The pooler_starter should kill off stale members, there by
      % reducing the number of children of the member_sup. This
      % activity occurs both during take member and accept member.
      % Accordingly, the count should go to zero once all starters
      % check in.
      StaleMembersTest =
          fun() ->
              ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
              lists:foreach(
                fun(_Attempt) ->
                    ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                    ?assertEqual(1, starting_members(test_pool_1))
                end,
                lists:seq(1, 10)),
              ?assertEqual(10, children_count(pooler_test_pool_1_member_sup)),

              timer:sleep(150),
              ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
              ?assertEqual(0, starting_members(test_pool_1)),
              %% there is a log when worker start times out
              IsStartTimeoutLog =
                  fun(#{level := error,
                        msg := {report, #{label := "starting member timeout",
                                          pool := test_pool_1}}}) ->
                          true;
                     (_) ->
                          false
                  end,
              ?assert(lists:any(IsStartTimeoutLog, error_logger_mon:get_msgs()))
          end,
      {foreach, Setup, Cleanup,
       [fun(_SetupResult) -> StaleMembersTest end]}.
  %% Sleep for the number of milliseconds stored under the `sleep_time'
  %% key of the pooler application environment; when the key is not set
  %% the sleep time is 0. Returns the result of timer:sleep/1.
  sleep_for_configured_timeout() ->
      timer:sleep(application:get_env(pooler, sleep_time, 0)).
  %% EUnit fixture covering request queueing on a pool that starts empty
  %% (init_count 0, max_count 10, queue_max 10).
  %%
  %% Every member is started through pooled_gs with
  %% sleep_for_configured_timeout/0 as its second tuple element, so each
  %% test sets the `sleep_time' env key before taking members.
  %% NOTE(review): presumably pooled_gs runs that fun while starting, which
  %% is what makes member start-up slow -- confirm against pooled_gs.
  pooler_integration_queueing_test_() ->
      {foreach,
       % setup
       fun() ->
               logger:set_handler_config(default, filters, []),
               Pool = [{name, test_pool_1},
                       {max_count, 10},
                       {queue_max, 10},
                       {init_count, 0},
                       {metrics, fake_metrics},
                       {member_start_timeout, {5, sec}},
                       {start_mfa,
                        {pooled_gs, start_link,
                         [{"type-0",
                           fun pooler_tests:sleep_for_configured_timeout/0}]}}],
               application:set_env(pooler, pools, [Pool]),
               fake_metrics:start_link(),
               application:start(pooler)
       end,
       % cleanup
       fun(_) ->
               fake_metrics:stop(),
               application:stop(pooler)
       end,
       [
        fun(_) ->
                fun() ->
                        % a take with a 10 ms timeout on an empty pool yields a pid
                        ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
                        Val = pooler:take_member(test_pool_1, 10),
                        ?assert(is_pid(Val)),
                        pooler:return_member(test_pool_1, Val)
                end
        end,
        fun(_) ->
                fun() ->
                        application:set_env(pooler, sleep_time, 1),
                        ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
                        % zero timeout: nothing is free yet, so no member
                        Val = pooler:take_member(test_pool_1, 0),
                        ?assertEqual(error_no_members, Val),
                        ?assertEqual(0, ?gv(free_count, pooler:pool_utilization(test_pool_1))),
                        timer:sleep(50),
                        % Next request should be available
                        Pid = pooler:take_member(test_pool_1, 0),
                        ?assert(is_pid(Pid)),
                        pooler:return_member(test_pool_1, Pid)
                end
        end,
        fun(_) ->
                fun() ->
                        application:set_env(pooler, sleep_time, 10),
                        ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
                        % max_count zero-timeout takes in a row all come back empty
                        [
                         ?assertEqual(error_no_members, pooler:take_member(test_pool_1, 0)) ||
                            _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)],
                        timer:sleep(50),
                        % Next request should be available
                        Pid = pooler:take_member(test_pool_1, 0),
                        ?assert(is_pid(Pid)),
                        pooler:return_member(test_pool_1, Pid)
                end
        end,
        fun(_) ->
                fun() ->
                        % fill to queue_max, next request should return immediately with no_members
                        % Will return a pid if queue_max is not enforced.
                        application:set_env(pooler, sleep_time, 100),
                        [ proc_lib:spawn(fun() ->
                                                 Val = pooler:take_member(test_pool_1, 200),
                                                 ?assert(is_pid(Val)),
                                                 % return to the named pool, as every other
                                                 % return_member call in this module does
                                                 pooler:return_member(test_pool_1, Val)
                                         end)
                          || _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)
                        ],
                        ?assertEqual(0, ?gv(free_count, pooler:pool_utilization(test_pool_1))),
                        ?assert(?gv(queued_count, pooler:pool_utilization(test_pool_1)) >= 1),
                        ?assertEqual(10, ?gv(queue_max, pooler:pool_utilization(test_pool_1))),
                        timer:sleep(50),
                        ?assertEqual(10, queue:len((dump_pool(test_pool_1))#pool.queued_requestors)),
                        ?assertEqual(10, ?gv(queue_max, pooler:pool_utilization(test_pool_1))),
                        % queue is full: even a 500 ms take is refused
                        ?assertEqual(error_no_members, pooler:take_member(test_pool_1, 500)),
                        ExpectKeys = lists:sort([<<"pooler.test_pool_1.error_no_members_count">>,
                                                 <<"pooler.test_pool_1.events">>,
                                                 <<"pooler.test_pool_1.take_rate">>,
                                                 <<"pooler.test_pool_1.queue_count">>,
                                                 <<"pooler.test_pool_1.queue_max_reached">>]),
                        Metrics = fake_metrics:get_metrics(),
                        GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
                        ?assertEqual(ExpectKeys, GotKeys),
                        timer:sleep(100),
                        Val = pooler:take_member(test_pool_1, 500),
                        ?assert(is_pid(Val)),
                        pooler:return_member(test_pool_1, Val)
                end
        end
       ]
      }.
  %% EUnit fixture: with every member of a full pool (init_count =
  %% max_count = 10) checked out, one extra requestor waits with a
  %% 200 ms timeout and must be handed a member once the holders return
  %% theirs. Afterwards all members must be free again.
  pooler_integration_queueing_return_member_test_() ->
      StartMfa = {pooled_gs, start_link,
                  [{"type-0", fun pooler_tests:sleep_for_configured_timeout/0}]},
      PoolConfig = [{name, test_pool_1},
                    {max_count, 10},
                    {queue_max, 10},
                    {init_count, 10},
                    {metrics, fake_metrics},
                    {member_start_timeout, {5, sec}},
                    {start_mfa, StartMfa}],
      Setup =
          fun() ->
                  logger:set_handler_config(default, filters, []),
                  application:set_env(pooler, pools, [PoolConfig]),
                  fake_metrics:start_link(),
                  application:start(pooler)
          end,
      Cleanup =
          fun(_) ->
                  fake_metrics:stop(),
                  application:stop(pooler)
          end,
      QueuedRequestorServed =
          fun() ->
                  application:set_env(pooler, sleep_time, 0),
                  TestProc = self(),
                  %% A holder takes one member, reports it, keeps it until
                  %% told to `return' (or 5 s pass), then gives it back.
                  Holder =
                      fun() ->
                              Member = pooler:take_member(test_pool_1, 200),
                              ?assert(is_pid(Member)),
                              TestProc ! {taken, self()},
                              receive
                                  return -> ok
                              after 5000 -> ok
                              end,
                              pooler:return_member(test_pool_1, Member),
                              TestProc ! {returned, self()}
                      end,
                  MaxCount = (dump_pool(test_pool_1))#pool.max_count,
                  Holders = [proc_lib:spawn_link(Holder) || _ <- lists:seq(1, MaxCount)],
                  %% wait until every holder owns a member
                  lists:foreach(fun(H) -> receive {taken, H} -> ok end end, Holders),
                  ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
                  %% the extra requestor
                  proc_lib:spawn_link(
                    fun() ->
                            Got = pooler:take_member(test_pool_1, 200),
                            TestProc ! {extra_result, Got}
                    end),
                  lists:foreach(fun(H) -> H ! return end, Holders),
                  lists:foreach(fun(H) -> receive {returned, H} -> ok end end, Holders),
                  receive
                      {extra_result, Extra} ->
                          ?assert(is_pid(Extra)),
                          pooler:return_member(test_pool_1, Extra)
                  end,
                  ?assertEqual((dump_pool(test_pool_1))#pool.max_count,
                               length((dump_pool(test_pool_1))#pool.free_pids)),
                  ?assertEqual((dump_pool(test_pool_1))#pool.max_count,
                               (dump_pool(test_pool_1))#pool.free_count)
          end,
      {foreach, Setup, Cleanup, [fun(_) -> QueuedRequestorServed end]}.
  %% EUnit fixture: ten users (see start_user/0) run against a pool of ten
  %% "type-0" members. The ids returned by user_id/1 must stay distinct
  %% across users, and must be replaced after the users crash.
  pooler_integration_test_() ->
      Setup =
          fun() ->
                  logger:set_handler_config(default, filters, []),
                  PoolConfig = [{name, test_pool_1},
                                {max_count, 10},
                                {init_count, 10},
                                {start_mfa,
                                 {pooled_gs, start_link, [{"type-0"}]}}],
                  application:set_env(pooler, pools, [PoolConfig]),
                  application:start(pooler),
                  [start_user() || _ <- lists:seq(1, 10)]
          end,
      Cleanup =
          fun(Users) ->
                  lists:foreach(fun(User) -> user_stop(User) end, Users),
                  application:stop(pooler)
          end,
      SortedIds =
          fun(Users) ->
                  lists:sort(lists:map(fun(User) -> user_id(User) end, Users))
          end,
      {foreach, Setup, Cleanup,
       [
        fun(Users) ->
                fun() ->
                        % each user has a different tc ID
                        Ids = SortedIds(Users),
                        ?assertEqual(lists:usort(Ids), Ids)
                end
        end,
        fun(Users) ->
                fun() ->
                        % users still unique after a renew cycle
                        lists:foreach(fun(User) -> user_new_tc(User) end, Users),
                        Ids = SortedIds(Users),
                        ?assertEqual(lists:usort(Ids), Ids)
                end
        end,
        fun(Users) ->
                fun() ->
                        % all users crash, pids are replaced
                        IdsBefore = SortedIds(Users),
                        lists:foreach(fun(User) -> user_crash(User) end, Users),
                        FreshUsers = [start_user() || _ <- lists:seq(1, 5)],
                        IdsAfter = SortedIds(FreshUsers),
                        Shared = sets:intersection(sets:from_list(IdsBefore),
                                                   sets:from_list(IdsAfter)),
                        ?assertEqual([], sets:to_list(Shared))
                end
        end
       ]
      }.
  %% EUnit fixture: a pool configured without `auto_grow_threshold'
  %% (init_count 2, max_count 5) must not start extra members when one
  %% member is taken.
  %%
  %% The outer `setup' installs fake_metrics as the metrics module; the
  %% inner `foreach' starts pooler with no preconfigured pools and adds
  %% the pool through pooler:new_pool/1.
  pooler_auto_grow_disabled_by_default_test_() ->
      {setup,
       fun() ->
               logger:set_handler_config(default, filters, []),
               application:set_env(pooler, metrics_module, fake_metrics),
               fake_metrics:start_link()
       end,
       fun(_X) ->
               fake_metrics:stop()
       end,
       {foreach,
        % setup
        fun() ->
                Pool = [{name, test_pool_1},
                        {max_count, 5},
                        {init_count, 2},
                        {start_mfa,
                         {pooled_gs, start_link, [{"type-0"}]}}],
                application:unset_env(pooler, pools),
                application:start(pooler),
                pooler:new_pool(Pool)
        end,
        fun(_X) ->
                application:stop(pooler)
        end,
        [
         {"take one, and it should not auto-grow",
          fun() ->
                  ?assertEqual(2, (dump_pool(test_pool_1))#pool.free_count),
                  P = pooler:take_member(test_pool_1),
                  ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
                  % give the pool time to (wrongly) add members
                  timer:sleep(100),
                  ?assertEqual(1, (dump_pool(test_pool_1))#pool.free_count),
                  % assert the return succeeded instead of discarding a bare `ok'
                  ok = pooler:return_member(test_pool_1, P)
          end}
        ]}}.
  %% EUnit fixture: a pool configured with `{auto_grow_threshold, 1}'
  %% (init_count 2, max_count 5) is expected to have 3 free members
  %% shortly after one of the initial two has been taken.
  %%
  %% The outer `setup' installs fake_metrics as the metrics module; the
  %% inner `foreach' starts pooler with no preconfigured pools and adds
  %% the pool through pooler:new_pool/1.
  pooler_auto_grow_enabled_test_() ->
      {setup,
       fun() ->
               logger:set_handler_config(default, filters, []),
               application:set_env(pooler, metrics_module, fake_metrics),
               fake_metrics:start_link()
       end,
       fun(_X) ->
               fake_metrics:stop()
       end,
       {foreach,
        % setup
        fun() ->
                Pool = [{name, test_pool_1},
                        {max_count, 5},
                        {init_count, 2},
                        {auto_grow_threshold, 1},
                        {start_mfa,
                         {pooled_gs, start_link, [{"type-0"}]}}],
                application:unset_env(pooler, pools),
                application:start(pooler),
                pooler:new_pool(Pool)
        end,
        fun(_X) ->
                application:stop(pooler)
        end,
        [
         {"take one, and it should grow by 2",
          fun() ->
                  ?assertEqual(2, (dump_pool(test_pool_1))#pool.free_count),
                  P = pooler:take_member(test_pool_1),
                  ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
                  % give the pool time to start the additional members
                  timer:sleep(100),
                  ?assertEqual(3, (dump_pool(test_pool_1))#pool.free_count),
                  % assert the return succeeded instead of discarding a bare `ok'
                  ok = pooler:return_member(test_pool_1, P)
          end}
        ]}}.
  %% EUnit fixture comparing the exit reason of a culled member with and
  %% without a `stop_mfa' entry in the pool configuration. The pool uses
  %% a 200 ms cull interval and a max_age of zero minutes.
  pooler_custom_stop_mfa_test_() ->
      BasePool = [{name, test_pool_1},
                  {max_count, 3},
                  {init_count, 2},
                  {cull_interval, {200, ms}},
                  {max_age, {0, min}},
                  {start_mfa, {pooled_gs, start_link, [{foo_type}]}}],
      Setup =
          fun() ->
                  logger:set_handler_config(default, filters, []),
                  application:set_env(pooler, pools, [BasePool])
          end,
      Cleanup =
          fun(_) ->
                  application:unset_env(pooler, pools)
          end,
      KilledByDefault =
          fun() ->
                  ok = application:start(pooler),
                  ExitReason = monitor_members_trigger_culling_and_return_reason(),
                  ok = application:stop(pooler),
                  ?assertEqual(killed, ExitReason)
          end,
      StoppedNormally =
          fun() ->
                  %% prepend a stop_mfa to the pool stored by the setup
                  {ok, [Configured]} = application:get_env(pooler, pools),
                  StopMfa = {stop_mfa, {pooled_gs, stop, ['$pooler_pid']}},
                  application:set_env(pooler, pools, [[StopMfa | Configured]]),
                  ok = application:start(pooler),
                  ExitReason = monitor_members_trigger_culling_and_return_reason(),
                  ok = application:stop(pooler),
                  ?assertEqual(normal, ExitReason)
          end,
      {foreach, Setup, Cleanup,
       [{"default behavior kills the pool member", KilledByDefault},
        {"custom callback terminates the pool member normally", StoppedNormally}]}.
  %% EUnit fixture counting the log messages captured by error_logger_mon
  %% while pool members are culled: exactly one when the stop_mfa is
  %% erlang:exit(Pid, kill), none with the default stop behaviour.
  no_error_logger_reports_after_culling_test_() ->
      %% damn those wraiths! This is the cure
      Setup =
          fun() ->
                  logger:set_handler_config(default, filters, []),
                  {ok, _MonPid} = error_logger_mon:start_link(),
                  PoolConfig = [{name, test_pool_1},
                                {max_count, 3},
                                {init_count, 2},
                                {cull_interval, {200, ms}},
                                {max_age, {0, min}},
                                {start_mfa, {pooled_gs, start_link, [{type}]}}],
                  application:set_env(pooler, pools, [PoolConfig])
          end,
      Cleanup =
          fun(_) ->
                  ok = error_logger_mon:stop(),
                  error_logger_mon:uninstall_handler(),
                  application:unset_env(pooler, pools)
          end,
      ExitKillReports =
          fun() ->
                  {ok, [Configured]} = application:get_env(pooler, pools),
                  StopMfa = {stop_mfa, {erlang, exit, ['$pooler_pid', kill]}},
                  application:set_env(pooler, pools, [[StopMfa | Configured]]),
                  ok = application:start(pooler),
                  error_logger_mon:install_handler(),
                  error_logger_mon:reset(),
                  ExitReason = monitor_members_trigger_culling_and_return_reason(),
                  %% we need to wait for the message to arrive before deleting handler
                  timer:sleep(250),
                  error_logger_mon:uninstall_handler(),
                  ok = application:stop(pooler),
                  ?assertEqual(killed, ExitReason),
                  ?assertEqual(1, error_logger_mon:get_msg_count(),
                               [{msgs, error_logger_mon:get_msgs()},
                                {m, [Report || #{msg := {report, Report}}
                                                   <- error_logger_mon:get_msgs()]}])
          end,
      DefaultIsSilent =
          fun() ->
                  ok = application:start(pooler),
                  error_logger_mon:install_handler(),
                  ExitReason = monitor_members_trigger_culling_and_return_reason(),
                  error_logger_mon:uninstall_handler(),
                  ok = application:stop(pooler),
                  ?assertEqual(killed, ExitReason),
                  ?assertEqual(0, error_logger_mon:get_msg_count())
          end,
      {foreach, Setup, Cleanup,
       [{"Force supervisor to report by using exit/2 instead of terminate_child",
         ExitKillReports},
        {"Default MFA shouldn't generate any reports during culling",
         DefaultIsSilent}]}.
  %% Take three members from test_pool_1, monitor each of them, and give
  %% them all back. Returns the exit reason carried by the first 'DOWN'
  %% message received, or the atom `timeout' when none arrives within
  %% 250 ms.
  monitor_members_trigger_culling_and_return_reason() ->
      Members = get_n_pids(test_pool_1, 3, []),
      lists:foreach(fun(Member) -> erlang:monitor(process, Member) end, Members),
      lists:foreach(fun(Member) -> pooler:return_member(test_pool_1, Member) end,
                    Members),
      receive
          {'DOWN', _MonitorRef, process, _Member, ExitReason} ->
              ExitReason
      after 250 ->
              timeout
      end.
  %% Test generator for pooler:time_as_millis/1: one ?_assertEqual per
  %% {Input, Expected} pair, covering zero of every unit, one of every
  %% unit, and 3000 microseconds.
  time_as_millis_test_() ->
      ZeroCases = [{{0, Unit}, 0} || Unit <- [min, sec, ms, mu]],
      Cases = ZeroCases ++
          [{{1, min}, 60000},
           {{1, sec}, 1000},
           {{1, ms}, 1},
           {{1, mu}, 0},
           {{3000, mu}, 3}],
      lists:map(fun({Input, Expected}) ->
                        ?_assertEqual(Expected, pooler:time_as_millis(Input))
                end,
                Cases).
  %% Test generator for pooler:time_as_micros/1: one ?_assertEqual per
  %% {Input, Expected} pair, covering zero of every unit, one of every
  %% unit, and 3000 microseconds.
  time_as_micros_test_() ->
      ZeroCases = [{{0, Unit}, 0} || Unit <- [min, sec, ms, mu]],
      Cases = ZeroCases ++
          [{{1, min}, 60000000},
           {{1, sec}, 1000000},
           {{1, ms}, 1000},
           {{1, mu}, 1},
           {{3000, mu}, 3000}],
      lists:map(fun({Input, Expected}) ->
                        ?_assertEqual(Expected, pooler:time_as_micros(Input))
                end,
                Cases).
  %% EUnit fixture for pooler:call_free_members/2 on a pool of ten
  %% members that are all started up front: results are counted with
  %% count_pongs/1 and count_errors/1.
  call_free_members_test_() ->
      WorkerCount = 10,
      Name = test_pool_1,
      PoolConfig = [{name, Name},
                    {max_count, WorkerCount},
                    {init_count, WorkerCount},
                    {start_mfa,
                     {pooled_gs, start_link, [{"type-0"}]}}],
      StartMetrics =
          fun() ->
                  application:set_env(pooler, metrics_module, fake_metrics),
                  fake_metrics:start_link()
          end,
      StopMetrics =
          fun(_X) ->
                  fake_metrics:stop()
          end,
      StartPool =
          fun() ->
                  application:unset_env(pooler, pools),
                  application:start(pooler),
                  pooler:new_pool(PoolConfig)
          end,
      StopPool =
          fun(_X) ->
                  application:stop(pooler)
          end,
      {setup, StartMetrics, StopMetrics,
       {foreach, StartPool, StopPool,
        [
         {"perform a ping across the pool when all workers are free",
          fun() ->
                  ?assertEqual(WorkerCount, (dump_pool(Name))#pool.free_count),
                  Replies = pooler:call_free_members(Name, fun pooled_gs:ping/1),
                  ?assertEqual(WorkerCount, count_pongs(Replies))
          end},
         {"perform a ping across the pool when two workers are taken",
          fun() ->
                  ?assertEqual(WorkerCount, (dump_pool(Name))#pool.free_count),
                  Taken = [pooler:take_member(Name) || _ <- lists:seq(1, 2)],
                  Replies = pooler:call_free_members(Name, fun pooled_gs:ping/1),
                  ?assertEqual(WorkerCount - 2, count_pongs(Replies)),
                  lists:foreach(fun(Member) ->
                                        pooler:return_member(Name, Member)
                                end,
                                Taken)
          end},
         {"perform an op where the op crashes all free members",
          fun() ->
                  ?assertEqual(WorkerCount, (dump_pool(Name))#pool.free_count),
                  Replies = pooler:call_free_members(Name,
                                                     fun pooled_gs:error_on_call/1),
                  ?assertEqual(WorkerCount, count_errors(Replies))
          end}
        ]}}.
  %% Count the `{ok, pong}' entries in Result. `{error, _}' entries count
  %% as zero; any other element raises function_clause.
  count_pongs(Result) ->
      Score = fun({ok, pong}) -> 1;
                 ({error, _}) -> 0
              end,
      lists:sum(lists:map(Score, Result)).
  %% Count the `{error, _}' entries in Result. `{ok, _}' entries count as
  %% zero; any other element raises function_clause.
  count_errors(Result) ->
      Score = fun({error, _}) -> 1;
                 ({ok, _}) -> 0
              end,
      lists:sum(lists:map(Score, Result)).
  % Testing crash recovery means race conditions when either pids
  % haven't yet crashed or pooler hasn't recovered. So this helper loops
  % forever until N pids are obtained, ignoring error_no_members.
  %% Shorthand for get_n_pids/3 against the pool `test_pool_1': take
  %% Count more members and prepend them to Collected.
  get_n_pids(Count, Collected) ->
      get_n_pids(test_pool_1, Count, Collected).
  %% Take N members from Pool, prepending each one to Acc, and return the
  %% resulting list. An `error_no_members' answer is retried immediately
  %% without consuming a slot, so this does not return until N members
  %% have been obtained.
  get_n_pids(_Pool, 0, Acc) ->
      Acc;
  get_n_pids(Pool, N, Acc) ->
      Taken = pooler:take_member(Pool),
      if
          Taken =:= error_no_members ->
              get_n_pids(Pool, N, Acc);
          true ->
              get_n_pids(Pool, N - 1, [Taken | Acc])
      end.
  %% Take N members from the pool group Group, prepending each one to Acc,
  %% and return the resulting list. An `error_no_members' answer is
  %% retried immediately without consuming a slot, so this does not
  %% return until N members have been obtained.
  get_n_pids_group(_Group, 0, Acc) ->
      Acc;
  get_n_pids_group(Group, N, Acc) ->
      Taken = pooler:take_group_member(Group),
      if
          Taken =:= error_no_members ->
              get_n_pids_group(Group, N, Acc);
          true ->
              get_n_pids_group(Group, N - 1, [Taken | Acc])
      end.
  %% Number of entries supervisor:which_children/1 reports for SupId.
  children_count(SupId) ->
      Children = supervisor:which_children(SupId),
      length(Children).
  %% Length of the `starting_members' field of the #pool record that
  %% dump_pool/1 returns for PoolName.
  starting_members(PoolName) ->
      #pool{starting_members = Starting} = dump_pool(PoolName),
      length(Starting).
  %% Send the `dump_pool' request to the gen_server PoolName and return
  %% its reply. Callers in this module read the reply as a #pool record.
  dump_pool(PoolName) ->
      Request = dump_pool,
      gen_server:call(PoolName, Request).
  %% Choose which process-group wrappers get compiled below.
  %% USE_PG_NOT_PG2 is defined only when the OTP_RELEASE macro exists and
  %% is at least 23; in every other case it is explicitly undefined, so a
  %% definition coming from elsewhere cannot leak in.
  -ifdef(OTP_RELEASE). % >= OTP-21
  -if(?OTP_RELEASE >= 23).
  -define(USE_PG_NOT_PG2, true).
  -else.
  -undef(USE_PG_NOT_PG2).
  -endif.
  -else. % < OTP-21
  -undef(USE_PG_NOT_PG2).
  -endif.
  %% Thin wrappers over the process-group module in use: `pg' when
  %% USE_PG_NOT_PG2 is defined, `pg2' otherwise.
  -ifdef(USE_PG_NOT_PG2).

  %% Start the pg scope named `pg'.
  pg_start() ->
      pg:start(pg).

  %% For every group pg knows about, make all of its members leave.
  pg_stop() ->
      LeaveAll = fun(Group) ->
                         Members = pg:get_members(Group),
                         pg:leave(Group, Members)
                 end,
      lists:foreach(LeaveAll, pg:which_groups()).

  %% Remove Pid from Group.
  pg_leave(Group, Pid) ->
      pg:leave(Group, Pid).

  %% Members of Group as reported by pg.
  pg_get_members(Group) ->
      pg:get_members(Group).

  -else.

  %% Start pg2.
  pg_start() ->
      pg2:start().

  %% Ask the application controller to stop `pg2'.
  pg_stop() ->
      application:stop(pg2).

  %% Remove Pid from Group.
  pg_leave(Group, Pid) ->
      pg2:leave(Group, Pid).

  %% Members of Group as reported by pg2.
  pg_get_members(Group) ->
      pg2:get_members(Group).

  -endif.