1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711 |
- -module(pooler_tests).
- -include_lib("eunit/include/eunit.hrl").
- -export([sleep_for_configured_timeout/0]).
- % The `user' processes represent users of the pooler library. A user
- % process will take a pid, report details on the pid it has, release
- % and take a new pid, stop cleanly, and crash.
- start_user() ->
- spawn(fun() -> user_loop(start) end).
- user_id(Pid) ->
- Pid ! {get_tc_id, self()},
- receive
- {Type, Id} ->
- {Type, Id}
- end.
- user_new_tc(Pid) ->
- Pid ! new_tc.
- user_stop(Pid) ->
- Pid ! stop.
- user_crash(Pid) ->
- Pid ! crash.
- user_loop(Atom) when Atom =:= error_no_members orelse Atom =:= start ->
- user_loop(pooler:take_member(test_pool_1));
- user_loop(MyTC) ->
- receive
- {get_tc_id, From} ->
- From ! pooled_gs:get_id(MyTC),
- user_loop(MyTC);
- {ping_tc, From} ->
- From ! pooled_gs:ping(MyTC),
- user_loop(MyTC);
- {ping_count, From} ->
- From ! pooled_gs:ping_count(MyTC),
- user_loop(MyTC);
- new_tc ->
- pooler:return_member(test_pool_1, MyTC, ok),
- MyNewTC = pooler:take_member(test_pool_1),
- user_loop(MyNewTC);
- stop ->
- pooler:return_member(test_pool_1, MyTC, ok),
- stopped;
- crash ->
- erlang:error({user_loop, kaboom})
- end.
- % The `tc' processes represent the pids tracked by pooler for testing.
- % They have a type and an ID and can report their type and ID and
- % stop.
- %% tc_loop({Type, Id}) ->
- %% receive
- %% {get_id, From} ->
- %% From ! {ok, Type, Id},
- %% tc_loop({Type, Id});
- %% stop -> stopped;
- %% crash ->
- %% erlang:error({tc_loop, kaboom})
- %% end.
- %% get_tc_id(Pid) ->
- %% Pid ! {get_id, self()},
- %% receive
- %% {ok, Type, Id} ->
- %% {Type, Id}
- %% after 200 ->
- %% timeout
- %% end.
- %% stop_tc(Pid) ->
- %% Pid ! stop.
- %% tc_starter(Type) ->
- %% Ref = make_ref(),
- %% spawn_link(fun() -> tc_loop({Type, Ref}) end).
- %% assert_tc_valid(Pid) ->
- %% ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
- %% ok.
- % tc_sanity_test() ->
- % Pid1 = tc_starter("1"),
- % {"1", Id1} = get_tc_id(Pid1),
- % Pid2 = tc_starter("1"),
- % {"1", Id2} = get_tc_id(Pid2),
- % ?assertNot(Id1 == Id2),
- % stop_tc(Pid1),
- % stop_tc(Pid2).
- % user_sanity_test() ->
- % Pid1 = tc_starter("1"),
- % User = spawn(fun() -> user_loop(Pid1) end),
- % ?assertMatch({"1", _Ref}, user_id(User)),
- % user_crash(User),
- % stop_tc(Pid1).
- pooler_basics_via_config_test_() ->
- {setup,
- fun() ->
- {ok, _} = error_logger_mon:start_link(),
- error_logger_mon:install_handler(pooler),
- logger:set_handler_config(default, filters, []),
- application:set_env(pooler, metrics_module, fake_metrics),
- fake_metrics:start_link()
- end,
- fun(_X) ->
- error_logger_mon:uninstall_handler(),
- ok = error_logger_mon:stop(),
- fake_metrics:stop()
- end,
- {foreach,
- % setup
- fun() ->
- error_logger_mon:reset(),
- Pools = [
- #{
- name => test_pool_1,
- max_count => 3,
- init_count => 2,
- cull_interval => {0, min},
- start_mfa => {pooled_gs, start_link, [{"type-0"}]}
- }
- ],
- application:set_env(pooler, pools, Pools),
- application:start(pooler)
- end,
- fun(_X) ->
- application:stop(pooler)
- end,
- basic_tests()}}.
- pooler_basics_dynamic_test_() ->
- {setup,
- fun() ->
- {ok, _} = error_logger_mon:start_link(),
- error_logger_mon:install_handler(pooler),
- logger:set_handler_config(default, filters, []),
- application:set_env(pooler, metrics_module, fake_metrics),
- fake_metrics:start_link()
- end,
- fun(_X) ->
- error_logger_mon:uninstall_handler(),
- ok = error_logger_mon:stop(),
- fake_metrics:stop()
- end,
- {foreach,
- % setup
- fun() ->
- error_logger_mon:reset(),
- Pool = #{
- name => test_pool_1,
- max_count => 3,
- init_count => 2,
- start_mfa => {pooled_gs, start_link, [{"type-0"}]}
- },
- application:unset_env(pooler, pools),
- application:start(pooler),
- pooler:new_pool(Pool)
- end,
- fun(_X) ->
- application:stop(pooler)
- end,
- basic_tests()}}.
- pooler_basics_integration_to_other_supervisor_test_() ->
- {setup,
- fun() ->
- {ok, _} = error_logger_mon:start_link(),
- error_logger_mon:install_handler(pooler),
- logger:set_handler_config(default, filters, []),
- application:set_env(pooler, metrics_module, fake_metrics),
- fake_metrics:start_link()
- end,
- fun(_X) ->
- error_logger_mon:uninstall_handler(),
- ok = error_logger_mon:stop(),
- fake_metrics:stop()
- end,
- {foreach,
- % setup
- fun() ->
- error_logger_mon:reset(),
- Pool = #{
- name => test_pool_1,
- max_count => 3,
- init_count => 2,
- start_mfa => {pooled_gs, start_link, [{"type-0"}]}
- },
- application:unset_env(pooler, pools),
- application:start(pooler),
- supervisor:start_link(fake_external_supervisor, Pool)
- end,
- fun({ok, SupPid}) ->
- exit(SupPid, normal),
- application:stop(pooler)
- end,
- basic_tests()}}.
- basic_tests() ->
- [
- {"there are init_count members at start", fun() ->
- Stats = [P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1)],
- ?assertEqual(2, length(Stats))
- end},
- {"take and return one", fun() ->
- P = pooler:take_member(test_pool_1),
- ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
- ok = pooler:return_member(test_pool_1, P, ok)
- end},
- {"take and return one, named pool", fun() ->
- P = pooler:take_member(test_pool_1),
- ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
- ok,
- pooler:return_member(test_pool_1, P)
- end},
- {"attempt to take form unknown pool", fun() ->
- %% since pools are now servers, an unknown pool will timeout
- ?assertExit({noproc, _}, pooler:take_member(bad_pool_name))
- end},
- {"members creation is triggered after pool exhaustion until max", fun() ->
- %% init count is 2
- Pids0 = [pooler:take_member(test_pool_1), pooler:take_member(test_pool_1)],
- %% since new member creation is async, can only assert
- %% that we will get a pid, but may not be first try.
- Pids = get_n_pids(1, Pids0),
- %% pool is at max now, requests should give error
- ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
- ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
- PRefs = [R || {_T, R} <- [pooled_gs:get_id(P) || P <- Pids]],
- % no duplicates
- ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
- end},
- {"pids are reused most recent return first", fun() ->
- P1 = pooler:take_member(test_pool_1),
- P2 = pooler:take_member(test_pool_1),
- ?assertNot(P1 == P2),
- ok = pooler:return_member(test_pool_1, P1, ok),
- ok = pooler:return_member(test_pool_1, P2, ok),
- % pids are reused most recent first
- ?assertEqual(P2, pooler:take_member(test_pool_1)),
- ?assertEqual(P1, pooler:take_member(test_pool_1))
- end},
- {"if an in-use pid crashes it is replaced", fun() ->
- Pids0 = get_n_pids(3, []),
- Ids0 = [pooled_gs:get_id(P) || P <- Pids0],
- % crash them all
- [pooled_gs:crash(P) || P <- Pids0],
- Pids1 = get_n_pids(3, []),
- Ids1 = [pooled_gs:get_id(P) || P <- Pids1],
- [?assertNot(lists:member(I, Ids0)) || I <- Ids1]
- end},
- {"if a free pid crashes it is replaced", fun() ->
- FreePids = [P || {P, {_, free, _}} <- pooler:pool_stats(test_pool_1)],
- [exit(P, kill) || P <- FreePids],
- Pids1 = get_n_pids(3, []),
- ?assertEqual(3, length(Pids1))
- end},
- {"if a pid is returned with bad status it is replaced", fun() ->
- Pids0 = get_n_pids(3, []),
- Ids0 = [pooled_gs:get_id(P) || P <- Pids0],
- % return them all marking as bad
- [pooler:return_member(test_pool_1, P, fail) || P <- Pids0],
- Pids1 = get_n_pids(3, []),
- Ids1 = [pooled_gs:get_id(P) || P <- Pids1],
- [?assertNot(lists:member(I, Ids0)) || I <- Ids1]
- end},
- {"if a consumer crashes, pid is replaced", fun() ->
- Consumer = start_user(),
- StartId = user_id(Consumer),
- user_crash(Consumer),
- NewPid = hd(get_n_pids(1, [])),
- NewId = pooled_gs:get_id(NewPid),
- ?assertNot(NewId == StartId)
- end},
- {"it is ok to return an unknown pid", fun() ->
- Bogus1 = spawn(fun() -> ok end),
- Bogus2 = spawn(fun() -> ok end),
- ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus1, ok)),
- ?assertEqual(ok, pooler:return_member(test_pool_1, Bogus2, fail))
- end},
- {"it is ok to return a pid more than once", fun() ->
- M = pooler:take_member(test_pool_1),
- [
- pooler:return_member(test_pool_1, M)
- || _I <- lists:seq(1, 37)
- ],
- ?assertEqual(
- 36,
- length(
- lists:filter(
- fun
- (
- #{
- msg :=
- {report, #{
- label := "ignored return of free member",
- pid := Pid
- }}
- }
- ) ->
- Pid =:= M;
- (_) ->
- false
- end,
- error_logger_mon:get_msgs()
- )
- )
- ),
- M1 = pooler:take_member(test_pool_1),
- M2 = pooler:take_member(test_pool_1),
- ?assert(M1 =/= M2),
- Pool1 = gen_server:call(test_pool_1, dump_pool),
- ?assertEqual(2, maps:get(in_use_count, Pool1)),
- ?assertEqual(0, maps:get(free_count, Pool1)),
- pooler:return_member(test_pool_1, M1),
- pooler:return_member(test_pool_1, M2),
- Pool2 = gen_server:call(test_pool_1, dump_pool),
- ?assertEqual(0, maps:get(in_use_count, Pool2)),
- ?assertEqual(2, maps:get(free_count, Pool2)),
- ok
- end},
- {"calling return_member on error_no_members is ignored", fun() ->
- ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members)),
- ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, ok)),
- ?assertEqual(ok, pooler:return_member(test_pool_1, error_no_members, fail))
- end},
- {"dynamic pool creation", fun() ->
- PoolSpec = [
- {name, dyn_pool_1},
- {max_count, 3},
- {init_count, 2},
- {start_mfa, {pooled_gs, start_link, [{"dyn-0"}]}}
- ],
- {ok, SupPid1} = pooler:new_pool(PoolSpec),
- ?assert(is_pid(SupPid1)),
- M = pooler:take_member(dyn_pool_1),
- ?assertMatch({"dyn-0", _Id}, pooled_gs:get_id(M)),
- ?assertEqual(ok, pooler:rm_pool(dyn_pool_1)),
- ?assertExit({noproc, _}, pooler:take_member(dyn_pool_1)),
- %% verify pool of same name can be created after removal
- {ok, SupPid2} = pooler:new_pool(PoolSpec),
- ?assert(is_pid(SupPid2)),
- %% remove non-existing pool
- ?assertEqual(ok, pooler:rm_pool(dyn_pool_X)),
- ?assertEqual(ok, pooler:rm_pool(dyn_pool_1))
- end},
- {"metrics have been called (no timeout/queue)", fun() ->
- %% exercise the API to ensure we have certain keys reported as metrics
- fake_metrics:reset_metrics(),
- Pids = [pooler:take_member(test_pool_1) || _I <- lists:seq(1, 10)],
- [pooler:return_member(test_pool_1, P) || P <- Pids],
- catch pooler:take_member(bad_pool_name),
- %% kill and unused member
- exit(hd(Pids), kill),
- %% kill a used member
- KillMe = pooler:take_member(test_pool_1),
- exit(KillMe, kill),
- %% FIXME: We need to wait for pooler to process the
- %% exit message. This is ugly, will fix later.
- % :(
- timer:sleep(200),
- ExpectKeys = lists:sort([
- <<"pooler.test_pool_1.error_no_members_count">>,
- <<"pooler.test_pool_1.events">>,
- <<"pooler.test_pool_1.free_count">>,
- <<"pooler.test_pool_1.in_use_count">>,
- <<"pooler.test_pool_1.killed_free_count">>,
- <<"pooler.test_pool_1.killed_in_use_count">>,
- <<"pooler.test_pool_1.take_rate">>
- ]),
- Metrics = fake_metrics:get_metrics(),
- GotKeys = lists:usort([Name || {Name, _, _} <- Metrics]),
- ?assertEqual(ExpectKeys, GotKeys)
- end},
- {"metrics have been called (with timeout/queue)", fun() ->
- %% exercise the API to ensure we have certain keys reported as metrics
- fake_metrics:reset_metrics(),
- %% pass a non-zero timeout here to exercise queueing
- Pids = [pooler:take_member(test_pool_1, 1) || _I <- lists:seq(1, 10)],
- [pooler:return_member(test_pool_1, P) || P <- Pids],
- catch pooler:take_member(bad_pool_name),
- %% kill and unused member
- exit(hd(Pids), kill),
- %% kill a used member
- KillMe = pooler:take_member(test_pool_1),
- exit(KillMe, kill),
- %% FIXME: We need to wait for pooler to process the
- %% exit message. This is ugly, will fix later.
- % :(
- timer:sleep(200),
- ExpectKeys = lists:sort([
- <<"pooler.test_pool_1.error_no_members_count">>,
- <<"pooler.test_pool_1.events">>,
- <<"pooler.test_pool_1.free_count">>,
- <<"pooler.test_pool_1.in_use_count">>,
- <<"pooler.test_pool_1.killed_free_count">>,
- <<"pooler.test_pool_1.killed_in_use_count">>,
- <<"pooler.test_pool_1.take_rate">>,
- <<"pooler.test_pool_1.queue_count">>
- ]),
- Metrics = fake_metrics:get_metrics(),
- GotKeys = lists:usort([Name || {Name, _, _} <- Metrics]),
- ?assertEqual(ExpectKeys, GotKeys)
- end},
- {"accept bad member is handled", fun() ->
- Bad = spawn(fun() -> ok end),
- FakeStarter = spawn(fun() -> starter end),
- ?assertEqual(ok, pooler:accept_member(test_pool_1, {FakeStarter, Bad}))
- end},
- {"utilization returns sane results", fun() ->
- #{max_count := MaxCount, queue_max := QueueMax} = dump_pool(test_pool_1),
- ?assertEqual(MaxCount, proplists:get_value(max_count, pooler:pool_utilization(test_pool_1))),
- ?assertEqual(0, proplists:get_value(in_use_count, pooler:pool_utilization(test_pool_1))),
- ?assertEqual(2, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1))),
- ?assertEqual(0, proplists:get_value(queued_count, pooler:pool_utilization(test_pool_1))),
- ?assertEqual(QueueMax, proplists:get_value(queue_max, pooler:pool_utilization(test_pool_1)))
- end}
- ].
- pooler_groups_test_() ->
- {setup,
- fun() ->
- logger:set_handler_config(default, filters, []),
- application:set_env(pooler, metrics_module, fake_metrics),
- fake_metrics:start_link()
- end,
- fun(_X) ->
- fake_metrics:stop()
- end,
- {foreach,
- % setup
- fun() ->
- Pools = [
- [
- {name, test_pool_1},
- {group, group_1},
- {max_count, 3},
- {init_count, 2},
- {start_mfa, {pooled_gs, start_link, [{"type-1-1"}]}}
- ],
- [
- {name, test_pool_2},
- {group, group_1},
- {max_count, 3},
- {init_count, 2},
- {start_mfa, {pooled_gs, start_link, [{"type-1-2"}]}}
- ],
- %% test_pool_3 not part of the group
- [
- {name, test_pool_3},
- {group, undefined},
- {max_count, 3},
- {init_count, 2},
- {start_mfa, {pooled_gs, start_link, [{"type-3"}]}}
- ]
- ],
- application:set_env(pooler, pools, Pools),
- pg_start(),
- application:start(pooler)
- end,
- fun(_X) ->
- application:stop(pooler),
- pg_stop()
- end,
- [
- {"take and return one group member (repeated)", fun() ->
- Types = [
- begin
- Pid = pooler:take_group_member(group_1),
- ?assert(is_pid(Pid), [{result, Pid}, {i, I}]),
- {Type, _} = pooled_gs:get_id(Pid),
- ?assertMatch("type-1" ++ _, Type),
- ok = pooler:return_group_member(group_1, Pid, ok),
- timer:sleep(10),
- Type
- end
- || I <- lists:seq(1, 50)
- ],
- Type_1_1 = [X || "type-1-1" = X <- Types],
- Type_1_2 = [X || "type-1-2" = X <- Types],
- ?assert(length(Type_1_1) > 0, [{types, Types}]),
- ?assert(length(Type_1_2) > 0, [{types, Types}])
- end},
- {"take member from unknown group", fun() ->
- ?assertEqual(
- error_no_members,
- pooler:take_group_member(not_a_group)
- )
- end},
- {"return member to unknown group", fun() ->
- Pid = pooler:take_group_member(group_1),
- ?assertEqual(ok, pooler:return_group_member(no_such_group, Pid))
- end},
- {"return member to wrong group", fun() ->
- Pid = pooler:take_member(test_pool_3),
- ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
- end},
- {"return member with something which is not a pid", fun() ->
- ?assertException(error, _, pooler:return_group_member(group_1, not_pid))
- end},
- {"take member from empty group", fun() ->
- %% artificially empty group member list
- [pg_leave(group_1, M) || M <- pooler:group_pools(group_1)],
- ?assertEqual(error_no_members, pooler:take_group_member(group_1))
- end},
- {"return member to group, implied ok", fun() ->
- Pid = pooler:take_group_member(group_1),
- ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
- end},
- {"return error_no_member to group", fun() ->
- ?assertEqual(ok, pooler:return_group_member(group_1, error_no_members))
- end},
- {"exhaust pools in group", fun() ->
- Pids = get_n_pids_group(group_1, 6, []),
- %% they should all be pids
- [
- begin
- {Type, _} = pooled_gs:get_id(P),
- ?assertMatch("type-1" ++ _, Type),
- ok
- end
- || P <- Pids
- ],
- %% further attempts should be error
- [
- error_no_members,
- error_no_members,
- error_no_members
- ] = [
- pooler:take_group_member(group_1)
- || _I <- lists:seq(1, 3)
- ]
- end},
- {"rm_group with nonexisting group", fun() ->
- ?assertEqual(ok, pooler:rm_group(i_dont_exist))
- end},
- {"rm_group with existing empty group", fun() ->
- ?assertEqual(ok, pooler:rm_pool(test_pool_1)),
- ?assertEqual(ok, pooler:rm_pool(test_pool_2)),
- % process group de-registration is asynchronous
- timer:sleep(100),
- ?assertEqual(error_no_members, pooler:take_group_member(group_1)),
- ?assertEqual(ok, pooler:rm_group(group_1)),
- ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
- ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
- ?assertEqual(
- error_no_members,
- pooler:take_group_member(group_1)
- )
- end},
- {"rm_group with existing non-empty group", fun() ->
- %% Verify that group members exist
- MemberPid = pooler:take_group_member(group_1),
- ?assert(is_pid(MemberPid)),
- pooler:return_group_member(group_1, MemberPid),
- Pool1Pid = pooler:take_member(test_pool_1),
- ?assert(is_pid(Pool1Pid)),
- pooler:return_member(test_pool_1, Pool1Pid),
- Pool2Pid = pooler:take_member(test_pool_2),
- ?assert(is_pid(Pool2Pid)),
- pooler:return_member(test_pool_2, Pool2Pid),
- %% Delete and verify that group and pools are destroyed
- ?assertEqual(ok, pooler:rm_group(group_1)),
- ?assertExit({noproc, _}, pooler:take_member(test_pool_1)),
- ?assertExit({noproc, _}, pooler:take_member(test_pool_2)),
- ?assertEqual(
- error_no_members,
- pooler:take_group_member(group_1)
- )
- end}
- ]}}.
- pooler_limit_failed_adds_test_() ->
- %% verify that pooler crashes completely if too many failures are
- %% encountered while trying to add pids.
- {setup,
- fun() ->
- logger:set_handler_config(default, filters, []),
- Pools = [
- [
- {name, test_pool_1},
- {max_count, 10},
- {init_count, 10},
- {start_mfa, {pooled_gs, start_link, [crash]}}
- ]
- ],
- application:set_env(pooler, pools, Pools)
- end,
- fun(_) ->
- application:stop(pooler)
- end,
- fun() ->
- application:start(pooler),
- ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
- ?assertEqual(error_no_members, pooler:take_member(test_pool_1))
- end}.
- pooler_scheduled_cull_test_() ->
- {setup,
- fun() ->
- logger:set_handler_config(default, filters, []),
- application:set_env(pooler, metrics_module, fake_metrics),
- fake_metrics:start_link(),
- Pools = [
- [
- {name, test_pool_1},
- {max_count, 10},
- {init_count, 2},
- {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
- {cull_interval, {200, ms}},
- {max_age, {0, min}}
- ]
- ],
- application:set_env(pooler, pools, Pools),
- application:start(pooler)
- end,
- fun(_X) ->
- fake_metrics:stop(),
- application:stop(pooler)
- end,
- [
- {foreach,
- fun() ->
- Pids = get_n_pids(test_pool_1, 10, []),
- ?assertEqual(10, length(pooler:pool_stats(test_pool_1))),
- ?assertEqual(10, length(Pids)),
- Pids
- end,
- fun(Pids) ->
- [pooler:return_member(test_pool_1, P) || P <- Pids]
- end,
- [
- fun(Pids) ->
- {"excess members are culled run 1", fun() ->
- [pooler:return_member(test_pool_1, P) || P <- Pids],
- %% wait for longer than cull delay
- timer:sleep(250),
- ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
- ?assertEqual(2, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1)))
- end}
- end,
- fun(Pids) ->
- {"excess members are culled run 2", fun() ->
- [pooler:return_member(test_pool_1, P) || P <- Pids],
- %% wait for longer than cull delay
- timer:sleep(250),
- ?assertEqual(2, length(pooler:pool_stats(test_pool_1))),
- ?assertEqual(2, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1)))
- end}
- end,
- fun(Pids) -> in_use_members_not_culled(Pids, 1) end,
- fun(Pids) -> in_use_members_not_culled(Pids, 2) end,
- fun(Pids) -> in_use_members_not_culled(Pids, 3) end,
- fun(Pids) -> in_use_members_not_culled(Pids, 4) end,
- fun(Pids) -> in_use_members_not_culled(Pids, 5) end,
- fun(Pids) -> in_use_members_not_culled(Pids, 6) end
- ]},
- {"no cull when init_count matches max_count",
- %% not sure how to verify this. But this test at least
- %% exercises the code path.
- fun() ->
- Config = [
- {name, test_static_pool_1},
- {max_count, 2},
- {init_count, 2},
- {start_mfa, {pooled_gs, start_link, [{"static-0"}]}},
- % ignored
- {cull_interval, {200, ms}}
- ],
- pooler:new_pool(Config),
- P = pooler:take_member(test_static_pool_1),
- ?assertMatch({"static-0", _}, pooled_gs:get_id(P)),
- pooler:return_member(test_static_pool_1, P),
- ok
- end}
- ]}.
- in_use_members_not_culled(Pids, N) ->
- {"in-use members are not culled " ++ erlang:integer_to_list(N), fun() ->
- %% wait for longer than cull delay
- timer:sleep(250),
- PidCount = length(Pids),
- ?assertEqual(
- PidCount,
- length(pooler:pool_stats(test_pool_1))
- ),
- ?assertEqual(0, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1))),
- ?assertEqual(PidCount, proplists:get_value(in_use_count, pooler:pool_utilization(test_pool_1))),
- Returns = lists:sublist(Pids, N),
- [
- pooler:return_member(test_pool_1, P)
- || P <- Returns
- ],
- timer:sleep(250),
- ?assertEqual(
- PidCount - N,
- length(pooler:pool_stats(test_pool_1))
- )
- end}.
- random_message_test_() ->
- {setup,
- fun() ->
- logger:set_handler_config(default, filters, []),
- Pools = [
- [
- {name, test_pool_1},
- {max_count, 2},
- {init_count, 1},
- {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
- ]
- ],
- application:set_env(pooler, pools, Pools),
- application:start(pooler),
- %% now send some bogus messages
- %% do the call in a throw-away process to avoid timeout error
- spawn(fun() -> catch gen_server:call(test_pool_1, {unexpected_garbage_msg, 5}) end),
- gen_server:cast(test_pool_1, {unexpected_garbage_msg, 6}),
- whereis(test_pool_1) ! {unexpected_garbage_msg, 7},
- ok
- end,
- fun(_) ->
- application:stop(pooler)
- end,
- [
- fun() ->
- Pid = spawn(fun() -> ok end),
- MonMsg = {'DOWN', erlang:make_ref(), process, Pid, because},
- test_pool_1 ! MonMsg
- end,
- fun() ->
- Pid = pooler:take_member(test_pool_1),
- {Type, _} = pooled_gs:get_id(Pid),
- ?assertEqual("type-0", Type)
- end,
- fun() ->
- RawPool = gen_server:call(test_pool_1, dump_pool),
- ?assertEqual(pool, maps:get('$record_name', RawPool))
- end
- ]}.
- pooler_integration_long_init_test_() ->
- {foreach,
- % setup
- fun() ->
- logger:set_handler_config(default, filters, []),
- {ok, _} = error_logger_mon:start_link(),
- error_logger_mon:install_handler(pooler),
- Pool = [
- {name, test_pool_1},
- {max_count, 10},
- {init_count, 0},
- {member_start_timeout, {10, ms}},
- {start_mfa, {pooled_gs, start_link, [{"type-0", fun() -> timer:sleep(15) end}]}}
- ],
- application:set_env(pooler, pools, [Pool]),
- application:start(pooler)
- end,
- % cleanup
- fun(_) ->
- error_logger_mon:uninstall_handler(),
- ok = error_logger_mon:stop(),
- application:stop(pooler)
- end,
- %
- [
- fun(_) ->
- % Test what happens when pool members take too long to start.
- % The pooler_starter should kill off stale members, there by
- % reducing the number of children of the member_sup. This
- % activity occurs both during take member and accept member.
- % Accordingly, the count should go to zero once all starters
- % check in.
- fun() ->
- ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
- [
- begin
- ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
- ?assertEqual(1, starting_members(test_pool_1))
- end
- || _ <- lists:seq(1, 10)
- ],
- ?assertEqual(10, children_count(pooler_test_pool_1_member_sup)),
- timer:sleep(150),
- ?assertEqual(0, children_count(pooler_test_pool_1_member_sup)),
- ?assertEqual(0, starting_members(test_pool_1)),
- %% there is a log when worker start times out
- ?assert(
- lists:any(
- fun
- (
- #{
- level := error,
- msg :=
- {report, #{
- label := "starting member timeout",
- pool := test_pool_1
- }}
- }
- ) ->
- true;
- (_) ->
- false
- end,
- error_logger_mon:get_msgs()
- )
- )
- end
- end
- ]}.
- sleep_for_configured_timeout() ->
- SleepTime =
- case application:get_env(pooler, sleep_time) of
- {ok, Val} ->
- Val;
- _ ->
- 0
- end,
- timer:sleep(SleepTime).
- pooler_integration_queueing_test_() ->
- {foreach,
- % setup
- fun() ->
- logger:set_handler_config(default, filters, []),
- Pool = [
- {name, test_pool_1},
- {max_count, 10},
- {queue_max, 10},
- {init_count, 0},
- {metrics, fake_metrics},
- {member_start_timeout, {5, sec}},
- {start_mfa,
- {pooled_gs, start_link, [
- {"type-0", fun pooler_tests:sleep_for_configured_timeout/0}
- ]}}
- ],
- application:set_env(pooler, pools, [Pool]),
- fake_metrics:start_link(),
- application:start(pooler)
- end,
- % cleanup
- fun(_) ->
- fake_metrics:stop(),
- application:stop(pooler)
- end,
- [
- fun(_) ->
- fun() ->
- ?assertEqual(0, maps:get(free_count, dump_pool(test_pool_1))),
- Val = pooler:take_member(test_pool_1, 10),
- ?assert(is_pid(Val)),
- pooler:return_member(test_pool_1, Val)
- end
- end,
- fun(_) ->
- fun() ->
- application:set_env(pooler, sleep_time, 1),
- ?assertEqual(0, maps:get(free_count, dump_pool(test_pool_1))),
- Val = pooler:take_member(test_pool_1, 0),
- ?assertEqual(error_no_members, Val),
- ?assertEqual(0, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1))),
- timer:sleep(50),
- %Next request should be available
- Pid = pooler:take_member(test_pool_1, 0),
- ?assert(is_pid(Pid)),
- pooler:return_member(test_pool_1, Pid)
- end
- end,
- fun(_) ->
- fun() ->
- application:set_env(pooler, sleep_time, 10),
- ?assertEqual(0, maps:get(free_count, dump_pool(test_pool_1))),
- [
- ?assertEqual(pooler:take_member(test_pool_1, 0), error_no_members)
- || _ <- lists:seq(1, maps:get(max_count, dump_pool(test_pool_1)))
- ],
- timer:sleep(50),
- %Next request should be available
- Pid = pooler:take_member(test_pool_1, 0),
- ?assert(is_pid(Pid)),
- pooler:return_member(test_pool_1, Pid)
- end
- end,
- fun(_) ->
- fun() ->
- % fill to queue_max, next request should return immediately with no_members
- % Will return a if queue max is not enforced.
- application:set_env(pooler, sleep_time, 100),
- [
- proc_lib:spawn(fun() ->
- Val = pooler:take_member(test_pool_1, 200),
- ?assert(is_pid(Val)),
- pooler:return_member(Val)
- end)
- || _ <- lists:seq(1, maps:get(max_count, dump_pool(test_pool_1)))
- ],
- ?assertEqual(0, proplists:get_value(free_count, pooler:pool_utilization(test_pool_1))),
- ?assert(proplists:get_value(queued_count, pooler:pool_utilization(test_pool_1)) >= 1),
- ?assertEqual(10, proplists:get_value(queue_max, pooler:pool_utilization(test_pool_1))),
- timer:sleep(50),
- ?assertEqual(10, queue:len(maps:get(queued_requestors, dump_pool(test_pool_1)))),
- ?assertEqual(10, proplists:get_value(queue_max, pooler:pool_utilization(test_pool_1))),
- ?assertEqual(pooler:take_member(test_pool_1, 500), error_no_members),
- ExpectKeys = lists:sort([
- <<"pooler.test_pool_1.error_no_members_count">>,
- <<"pooler.test_pool_1.events">>,
- <<"pooler.test_pool_1.take_rate">>,
- <<"pooler.test_pool_1.queue_count">>,
- <<"pooler.test_pool_1.queue_max_reached">>
- ]),
- Metrics = fake_metrics:get_metrics(),
- GotKeys = lists:usort([Name || {Name, _, _} <- Metrics]),
- ?assertEqual(ExpectKeys, GotKeys),
- timer:sleep(100),
- Val = pooler:take_member(test_pool_1, 500),
- ?assert(is_pid(Val)),
- pooler:return_member(test_pool_1, Val)
- end
- end
- ]}.
- pooler_integration_queueing_return_member_test_() ->
- {foreach,
- % setup
- fun() ->
- logger:set_handler_config(default, filters, []),
- Pool = [
- {name, test_pool_1},
- {max_count, 10},
- {queue_max, 10},
- {init_count, 10},
- {metrics, fake_metrics},
- {member_start_timeout, {5, sec}},
- {start_mfa,
- {pooled_gs, start_link, [
- {"type-0", fun pooler_tests:sleep_for_configured_timeout/0}
- ]}}
- ],
- application:set_env(pooler, pools, [Pool]),
- fake_metrics:start_link(),
- application:start(pooler)
- end,
- % cleanup
- fun(_) ->
- fake_metrics:stop(),
- application:stop(pooler)
- end,
- [
- fun(_) ->
- fun() ->
- application:set_env(pooler, sleep_time, 0),
- Parent = self(),
- Pids = [
- proc_lib:spawn_link(fun() ->
- Val = pooler:take_member(test_pool_1, 200),
- ?assert(is_pid(Val)),
- Parent ! {taken, self()},
- receive
- return ->
- pooler:return_member(test_pool_1, Val)
- after 5000 ->
- pooler:return_member(test_pool_1, Val)
- end,
- Parent ! {returned, self()}
- end)
- || _ <- lists:seq(1, maps:get(max_count, dump_pool(test_pool_1)))
- ],
- [
- receive
- {taken, Pid} -> ok
- end
- || Pid <- Pids
- ],
- ?assertEqual(error_no_members, pooler:take_member(test_pool_1)),
- proc_lib:spawn_link(fun() ->
- Val = pooler:take_member(test_pool_1, 200),
- Parent ! {extra_result, Val}
- end),
- [Pid ! return || Pid <- Pids],
- [
- receive
- {returned, Pid} -> ok
- end
- || Pid <- Pids
- ],
- receive
- {extra_result, Result} ->
- ?assert(is_pid(Result)),
- pooler:return_member(test_pool_1, Result)
- end,
- ?assertEqual(
- maps:get(max_count, dump_pool(test_pool_1)),
- length(maps:get(free_pids, dump_pool(test_pool_1)))
- ),
- ?assertEqual(
- maps:get(max_count, dump_pool(test_pool_1)),
- maps:get(free_count, dump_pool(test_pool_1))
- )
- end
- end
- ]}.
- pooler_integration_test_() ->
- {foreach,
- % setup
- fun() ->
- logger:set_handler_config(default, filters, []),
- Pools = [
- [
- {name, test_pool_1},
- {max_count, 10},
- {init_count, 10},
- {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
- ]
- ],
- application:set_env(pooler, pools, Pools),
- application:start(pooler),
- Users = [start_user() || _X <- lists:seq(1, 10)],
- Users
- end,
- % cleanup
- fun(Users) ->
- [user_stop(U) || U <- Users],
- application:stop(pooler)
- end,
- %
- [
- fun(Users) ->
- fun() ->
- % each user has a different tc ID
- TcIds = lists:sort([user_id(UPid) || UPid <- Users]),
- ?assertEqual(lists:usort(TcIds), TcIds)
- end
- end,
- fun(Users) ->
- fun() ->
- % users still unique after a renew cycle
- [user_new_tc(UPid) || UPid <- Users],
- TcIds = lists:sort([user_id(UPid) || UPid <- Users]),
- ?assertEqual(lists:usort(TcIds), TcIds)
- end
- end,
- fun(Users) ->
- fun() ->
- % all users crash, pids are replaced
- TcIds1 = lists:sort([user_id(UPid) || UPid <- Users]),
- [user_crash(UPid) || UPid <- Users],
- Seq = lists:seq(1, 5),
- Users2 = [start_user() || _X <- Seq],
- TcIds2 = lists:sort([user_id(UPid) || UPid <- Users2]),
- Both =
- sets:to_list(
- sets:intersection([
- sets:from_list(TcIds1),
- sets:from_list(TcIds2)
- ])
- ),
- ?assertEqual([], Both)
- end
- end
- ]}.
- pooler_auto_grow_disabled_by_default_test_() ->
- {setup,
- fun() ->
- logger:set_handler_config(default, filters, []),
- application:set_env(pooler, metrics_module, fake_metrics),
- fake_metrics:start_link()
- end,
- fun(_X) ->
- fake_metrics:stop()
- end,
- {foreach,
- % setup
- fun() ->
- Pool = [
- {name, test_pool_1},
- {max_count, 5},
- {init_count, 2},
- {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
- ],
- application:unset_env(pooler, pools),
- application:start(pooler),
- pooler:new_pool(Pool)
- end,
- fun(_X) ->
- application:stop(pooler)
- end,
- [
- {"take one, and it should not auto-grow", fun() ->
- ?assertEqual(2, maps:get(free_count, dump_pool(test_pool_1))),
- P = pooler:take_member(test_pool_1),
- ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
- timer:sleep(100),
- ?assertEqual(1, maps:get(free_count, dump_pool(test_pool_1))),
- ok,
- pooler:return_member(test_pool_1, P)
- end}
- ]}}.
- pooler_auto_grow_enabled_test_() ->
- {setup,
- fun() ->
- logger:set_handler_config(default, filters, []),
- application:set_env(pooler, metrics_module, fake_metrics),
- fake_metrics:start_link()
- end,
- fun(_X) ->
- fake_metrics:stop()
- end,
- {foreach,
- % setup
- fun() ->
- Pool = [
- {name, test_pool_1},
- {max_count, 5},
- {init_count, 2},
- {auto_grow_threshold, 1},
- {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
- ],
- application:unset_env(pooler, pools),
- application:start(pooler),
- pooler:new_pool(Pool)
- end,
- fun(_X) ->
- application:stop(pooler)
- end,
- [
- {"take one, and it should grow by 2", fun() ->
- ?assertEqual(2, maps:get(free_count, dump_pool(test_pool_1))),
- P = pooler:take_member(test_pool_1),
- ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
- timer:sleep(100),
- ?assertEqual(3, maps:get(free_count, dump_pool(test_pool_1))),
- ok,
- pooler:return_member(test_pool_1, P)
- end}
- ]}}.
- pooler_custom_stop_mfa_test_() ->
- {foreach,
- fun() ->
- logger:set_handler_config(default, filters, []),
- Pool = [
- {name, test_pool_1},
- {max_count, 3},
- {init_count, 2},
- {cull_interval, {200, ms}},
- {max_age, {0, min}},
- {start_mfa, {pooled_gs, start_link, [{foo_type}]}}
- ],
- application:set_env(pooler, pools, [Pool])
- end,
- fun(_) ->
- application:unset_env(pooler, pools)
- end,
- [
- {"default behavior kills the pool member", fun() ->
- ok = application:start(pooler),
- Reason = monitor_members_trigger_culling_and_return_reason(),
- ok = application:stop(pooler),
- ?assertEqual(killed, Reason)
- end},
- {"custom callback terminates the pool member normally", fun() ->
- {ok, [Pool]} = application:get_env(pooler, pools),
- Stop = {stop_mfa, {pooled_gs, stop, ['$pooler_pid']}},
- application:set_env(pooler, pools, [[Stop | Pool]]),
- ok = application:start(pooler),
- Reason = monitor_members_trigger_culling_and_return_reason(),
- ok = application:stop(pooler),
- ?assertEqual(normal, Reason)
- end}
- ]}.
- no_error_logger_reports_after_culling_test_() ->
- %% damn those wraiths! This is the cure
- {foreach,
- fun() ->
- logger:set_handler_config(default, filters, []),
- {ok, _Pid} = error_logger_mon:start_link(),
- Pool = [
- {name, test_pool_1},
- {max_count, 3},
- {init_count, 2},
- {cull_interval, {200, ms}},
- {max_age, {0, min}},
- {start_mfa, {pooled_gs, start_link, [{type}]}}
- ],
- application:set_env(pooler, pools, [Pool])
- end,
- fun(_) ->
- ok = error_logger_mon:stop(),
- error_logger_mon:uninstall_handler(),
- application:unset_env(pooler, pools)
- end,
- [
- {"Force supervisor to report by using exit/2 instead of terminate_child", fun() ->
- {ok, [Pool]} = application:get_env(pooler, pools),
- Stop = {stop_mfa, {erlang, exit, ['$pooler_pid', kill]}},
- application:set_env(pooler, pools, [[Stop | Pool]]),
- ok = application:start(pooler),
- error_logger_mon:install_handler(),
- error_logger_mon:reset(),
- Reason = monitor_members_trigger_culling_and_return_reason(),
- %% we need to wait for the message to arrive before deleting handler
- timer:sleep(250),
- error_logger_mon:uninstall_handler(),
- ok = application:stop(pooler),
- ?assertEqual(killed, Reason),
- ?assertEqual(
- 1,
- error_logger_mon:get_msg_count(),
- [
- {msgs, error_logger_mon:get_msgs()},
- {m, [R || #{msg := {report, R}} <- error_logger_mon:get_msgs()]}
- ]
- )
- end},
- {"Default MFA shouldn't generate any reports during culling", fun() ->
- ok = application:start(pooler),
- error_logger_mon:install_handler(),
- Reason = monitor_members_trigger_culling_and_return_reason(),
- error_logger_mon:uninstall_handler(),
- ok = application:stop(pooler),
- ?assertEqual(killed, Reason),
- ?assertEqual(0, error_logger_mon:get_msg_count())
- end}
- ]}.
- reconfigure_test_() ->
- Name = test_pool_1,
- InitCount = 2,
- MaxCount = 4,
- StartConfig = #{
- name => Name,
- max_count => MaxCount,
- init_count => InitCount,
- start_mfa => {pooled_gs, start_link, [{reconfigure_test}]}
- },
- {foreach,
- fun() ->
- logger:set_handler_config(default, filters, []),
- application:set_env(pooler, pools, [StartConfig]),
- application:set_env(pooler, metrics_module, pooler_no_metrics),
- application:start(pooler)
- end,
- fun(_) ->
- application:unset_env(pooler, pools),
- application:stop(pooler)
- end,
- [
- {"Raise init_count", fun() ->
- Config1 = StartConfig#{init_count => 3},
- ?assertEqual(
- {ok, [
- {set_parameter, {init_count, 3}},
- {start_workers, 1}
- ]},
- pooler:pool_reconfigure(Name, Config1)
- ),
- ?assertMatch(
- #{
- init_count := 3,
- free_count := 3,
- free_pids := [_, _, _]
- },
- wait_for_dump(Name, 5000, fun(#{free_count := C}) -> C =:= 3 end)
- )
- end},
- {"Lower max_count", fun() ->
- Config1 = StartConfig#{max_count => 1, init_count => 1},
- ?assertEqual(
- {ok, [
- {set_parameter, {init_count, 1}},
- {set_parameter, {max_count, 1}},
- {stop_free_workers, 1}
- ]},
- pooler:pool_reconfigure(Name, Config1)
- ),
- ?assertMatch(
- #{
- init_count := 1,
- max_count := 1,
- free_count := 1,
- free_pids := [_]
- },
- wait_for_dump(Name, 5000, fun(#{free_count := C}) -> C =:= 1 end)
- )
- end},
- {"Lower queue_max", fun() ->
- NewQMax = 4,
- NewConfig = StartConfig#{queue_max => NewQMax},
- Parent = self(),
- NumClients = 10,
- _Clients = lists:map(
- fun(_) ->
- spawn_link(
- fun() ->
- Parent ! pooler:take_member(Name, 60000),
- timer:sleep(60000)
- end
- )
- end,
- lists:seq(1, NumClients)
- ),
- timer:sleep(100),
- % 6
- QueueSize = NumClients - MaxCount,
- %2
- Shrink = QueueSize - NewQMax,
- ?assertEqual(
- {ok, [
- {set_parameter, {queue_max, NewQMax}},
- {shrink_queue, Shrink}
- ]},
- pooler:pool_reconfigure(Name, NewConfig)
- ),
- ?assertMatch(
- [
- W1,
- W2,
- W3,
- W4,
- error_no_members,
- error_no_members
- ] when
- is_pid(W1) andalso
- is_pid(W2) andalso
- is_pid(W3) andalso
- is_pid(W4),
- [
- receive
- M -> M
- after 5000 -> error(timeout)
- end
- || _ <- lists:seq(1, MaxCount + Shrink)
- ]
- ),
- #{
- queue_max := QMax,
- queued_requestors := Q
- } = gen_server:call(Name, dump_pool),
- % queue_max in the state is updated
- ?assertEqual(QMax, NewQMax),
- % queue is full
- ?assertEqual(NewQMax, queue:len(Q))
- end},
- {"Lower cull_interval", fun() ->
- NewConfig = StartConfig#{cull_interval => {10, sec}},
- ?assertEqual(
- {ok, [
- {set_parameter, {cull_interval, {10, sec}}},
- {reset_cull_timer, {10, sec}}
- ]},
- pooler:pool_reconfigure(Name, NewConfig)
- )
- end},
- {"Lower max_age", fun() ->
- NewConfig = StartConfig#{max_age => {100, ms}},
- Workers = [pooler:take_member(Name, 5000) || _ <- lists:seq(1, MaxCount)],
- [pooler:return_member(Name, Pid) || Pid <- Workers],
- ?assertEqual(
- {ok, [
- {set_parameter, {max_age, {100, ms}}},
- {cull, []}
- ]},
- pooler:pool_reconfigure(Name, NewConfig)
- ),
- %% make sure workers are culled
- wait_for_dump(
- Name,
- 1000,
- fun(#{free_count := Free}) ->
- Name ! cull_pool,
- Free =:= InitCount
- end
- )
- end},
- {"Update group", fun() ->
- NewConfig1 = StartConfig#{group => my_group1},
- ?assertEqual(
- {ok, [
- {set_parameter, {group, my_group1}},
- {join_group, my_group1}
- ]},
- pooler:pool_reconfigure(Name, NewConfig1)
- ),
- PoolPid = whereis(Name),
- ?assertMatch([PoolPid], pooler:group_pools(my_group1)),
- NewConfig2 = StartConfig#{group => my_group2},
- ?assertEqual(
- {ok, [
- {set_parameter, {group, my_group2}},
- {leave_group, my_group1},
- {join_group, my_group2}
- ]},
- pooler:pool_reconfigure(Name, NewConfig2)
- ),
- ?assertMatch([], pooler:group_pools(my_group1)),
- ?assertMatch([PoolPid], pooler:group_pools(my_group2)),
- ?assertEqual(
- {ok, [
- {set_parameter, {group, undefined}},
- {leave_group, my_group2}
- ]},
- pooler:pool_reconfigure(Name, StartConfig)
- ),
- ?assertMatch([], pooler:group_pools(my_group1)),
- ?assertMatch([], pooler:group_pools(my_group2))
- end},
- {"Change basic configs", fun() ->
- NewMaxCount = MaxCount + 5,
- NewConfig = StartConfig#{
- max_count => NewMaxCount,
- member_start_timeout => {10, sec},
- queue_max => 100,
- metrics_mod => fake_metrics,
- stop_mfa => {erlang, exit, ['$pooler_pid', shutdown]},
- auto_grow_threshold => 1
- },
- ?assertEqual(
- {ok, [
- {set_parameter, {max_count, NewMaxCount}},
- {set_parameter, {member_start_timeout, {10, sec}}},
- {set_parameter, {queue_max, 100}},
- {set_parameter, {metrics_mod, fake_metrics}},
- {set_parameter, {stop_mfa, {erlang, exit, ['$pooler_pid', shutdown]}}},
- {set_parameter, {auto_grow_threshold, 1}}
- ]},
- pooler:pool_reconfigure(Name, NewConfig)
- ),
- ?assertMatch(
- #{
- max_count := NewMaxCount,
- member_start_timeout := {10, sec},
- queue_max := 100,
- metrics_mod := fake_metrics,
- stop_mfa := {erlang, exit, ['$pooler_pid', shutdown]},
- auto_grow_threshold := 1
- },
- gen_server:call(Name, dump_pool)
- )
- end},
- {"Update failed", fun() ->
- ?assertEqual(
- {error, changed_unsupported_parameter},
- pooler:pool_reconfigure(
- Name, StartConfig#{start_mfa := {erlang, spawn, [a, b, []]}}
- )
- ),
- ?assertEqual(
- {error, changed_unsupported_parameter},
- pooler:pool_reconfigure(
- Name, StartConfig#{name := not_a_pool_name}
- )
- )
- end}
- ]}.
- wait_for_dump(Pool, Timeout, Fun) when Timeout > 0 ->
- Dump = gen_server:call(Pool, dump_pool),
- case Fun(Dump) of
- true ->
- Dump;
- false ->
- timer:sleep(50),
- wait_for_dump(Pool, Timeout - 50, Fun)
- end;
- wait_for_dump(_, _, _) ->
- error(timeout).
- monitor_members_trigger_culling_and_return_reason() ->
- Pids = get_n_pids(test_pool_1, 3, []),
- [erlang:monitor(process, P) || P <- Pids],
- [pooler:return_member(test_pool_1, P) || P <- Pids],
- receive
- {'DOWN', _Ref, process, _Pid, Reason} ->
- Reason
- after 250 -> timeout
- end.
- time_as_millis_test_() ->
- Zeros = [{{0, U}, 0} || U <- [min, sec, ms, mu]],
- Ones = [
- {{1, min}, 60000},
- {{1, sec}, 1000},
- {{1, ms}, 1},
- {{1, mu}, 0}
- ],
- Misc = [{{3000, mu}, 3}],
- Tests = Zeros ++ Ones ++ Misc,
- [?_assertEqual(E, pooler:time_as_millis(I)) || {I, E} <- Tests].
- time_as_micros_test_() ->
- Zeros = [{{0, U}, 0} || U <- [min, sec, ms, mu]],
- Ones = [
- {{1, min}, 60000000},
- {{1, sec}, 1000000},
- {{1, ms}, 1000},
- {{1, mu}, 1}
- ],
- Misc = [{{3000, mu}, 3000}],
- Tests = Zeros ++ Ones ++ Misc,
- [?_assertEqual(E, pooler:time_as_micros(I)) || {I, E} <- Tests].
- call_free_members_test_() ->
- NumWorkers = 10,
- PoolName = test_pool_1,
- {setup,
- fun() ->
- application:set_env(pooler, metrics_module, fake_metrics),
- fake_metrics:start_link()
- end,
- fun(_X) ->
- fake_metrics:stop()
- end,
- {foreach,
- fun() ->
- Pool = [
- {name, PoolName},
- {max_count, NumWorkers},
- {init_count, NumWorkers},
- {start_mfa, {pooled_gs, start_link, [{"type-0"}]}}
- ],
- application:unset_env(pooler, pools),
- application:start(pooler),
- pooler:new_pool(Pool)
- end,
- fun(_X) ->
- application:stop(pooler)
- end,
- [
- {"perform a ping across the pool when all workers are free", fun() ->
- ?assertEqual(NumWorkers, maps:get(free_count, dump_pool(PoolName))),
- Res = pooler:call_free_members(PoolName, fun pooled_gs:ping/1),
- ?assertEqual(NumWorkers, count_pongs(Res))
- end},
- {"perform a ping across the pool when two workers are taken", fun() ->
- ?assertEqual(NumWorkers, maps:get(free_count, dump_pool(PoolName))),
- Pids = [pooler:take_member(PoolName) || _X <- lists:seq(0, 1)],
- Res = pooler:call_free_members(PoolName, fun pooled_gs:ping/1),
- ?assertEqual(NumWorkers - 2, count_pongs(Res)),
- [pooler:return_member(PoolName, P) || P <- Pids]
- end},
- {"perform an op where the op crashes all free members", fun() ->
- ?assertEqual(NumWorkers, maps:get(free_count, dump_pool(PoolName))),
- Res = pooler:call_free_members(
- PoolName,
- fun pooled_gs:error_on_call/1
- ),
- ?assertEqual(NumWorkers, count_errors(Res))
- end}
- ]}}.
- count_pongs(Result) ->
- lists:foldl(
- fun
- ({ok, pong}, Acc) -> Acc + 1;
- ({error, _}, Acc) -> Acc
- end,
- 0,
- Result
- ).
- count_errors(Result) ->
- lists:foldl(
- fun
- ({error, _}, Acc) -> Acc + 1;
- ({ok, _}, Acc) -> Acc
- end,
- 0,
- Result
- ).
- % testing crash recovery means race conditions when either pids
- % haven't yet crashed or pooler hasn't recovered. So this helper loops
- % forver until N pids are obtained, ignoring error_no_members.
- get_n_pids(N, Acc) ->
- get_n_pids(test_pool_1, N, Acc).
- get_n_pids(_Pool, 0, Acc) ->
- Acc;
- get_n_pids(Pool, N, Acc) ->
- case pooler:take_member(Pool) of
- error_no_members ->
- get_n_pids(Pool, N, Acc);
- Pid ->
- get_n_pids(Pool, N - 1, [Pid | Acc])
- end.
- get_n_pids_group(_Group, 0, Acc) ->
- Acc;
- get_n_pids_group(Group, N, Acc) ->
- case pooler:take_group_member(Group) of
- error_no_members ->
- get_n_pids_group(Group, N, Acc);
- Pid ->
- get_n_pids_group(Group, N - 1, [Pid | Acc])
- end.
- children_count(SupId) ->
- length(supervisor:which_children(SupId)).
- starting_members(PoolName) ->
- length(maps:get(starting_members, dump_pool(PoolName))).
- dump_pool(PoolName) ->
- gen_server:call(PoolName, dump_pool).
- % >= OTP-21
- -ifdef(OTP_RELEASE).
- -if(?OTP_RELEASE >= 23).
- -define(USE_PG_NOT_PG2, true).
- -else.
- -undef(USE_PG_NOT_PG2).
- -endif.
- % < OTP-21
- -else.
- -undef(USE_PG_NOT_PG2).
- -endif.
- -ifdef(USE_PG_NOT_PG2).
- pg_start() ->
- pg:start(_Scope = 'pg').
- pg_stop() ->
- lists:foreach(
- fun(Group) -> pg:leave(Group, pg:get_members(Group)) end,
- pg:which_groups()
- ).
- pg_leave(Group, Pid) ->
- pg:leave(Group, Pid).
- -else.
- pg_start() ->
- pg2:start().
- pg_stop() ->
- application:stop(pg2).
- pg_leave(Group, Pid) ->
- pg2:leave(Group, Pid).
- -endif.
|