123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422 |
- -module(prop_pooler).
- -export([
- prop_fixed_start/1,
- prop_fixed_checkout_all/1,
- prop_dynamic_checkout/1,
- prop_fixed_take_return/1,
- prop_fixed_take_return_broken/1,
- prop_fixed_client_died/1,
- prop_group_take_return/1
- ]).
- -include_lib("proper/include/proper.hrl").
- -include_lib("stdlib/include/assert.hrl").
%% Human-readable description of the property; selected by passing the atom `doc`.
prop_fixed_start(doc) ->
    "Check that the pool of any fixed size can be started, internal statistics is correct".

%% Property: for every generated positive Size, a pool configured with
%% init_count = max_count = Size is created and must pass the pool_is_free/2
%% checks right after start-up.
prop_fixed_start() ->
    %% The pool is named after this property function.
    Pool = ?FUNCTION_NAME,
    BaseConf = #{
        name => Pool,
        start_mfa => {pooled_gs, start_link, [{Pool}]}
    },
    ?FORALL(
        Size,
        pos_integer(),
        begin
            PoolConf = BaseConf#{init_count => Size, max_count => Size},
            with_pool(
                PoolConf,
                fun() ->
                    %% Nothing has been checked out yet
                    pool_is_free(Pool, Size),
                    true
                end
            )
        end
    ).
%% Human-readable description of the property; selected by passing the atom `doc`.
prop_fixed_checkout_all(doc) ->
    "Can take all members from fixed-size pool. Following attempts will return error. Stats is correct.".

%% Property: from a pool with init_count = max_count = Size exactly Size members
%% can be taken; one more attempt yields `error_no_members`, and the pool then
%% passes the pool_is_utilized/3 checks for the calling process.
prop_fixed_checkout_all() ->
    Pool = ?FUNCTION_NAME,
    BaseConf = #{name => Pool, start_mfa => {pooled_gs, start_link, [{Pool}]}},
    Scenario = fun(Size) ->
        %% Every one of the Size checkouts must hand back a pid
        ?assert(lists:all(fun erlang:is_pid/1, take_n(Pool, 0, Size))),
        %% Fixed pool - nothing is left to take once Size members are out
        ?assertEqual(error_no_members, pooler:take_member(Pool, 10)),
        %% All members are held by the current process
        pool_is_utilized(Pool, self(), Size),
        true
    end,
    ?FORALL(
        Size,
        pos_integer(),
        with_pool(
            BaseConf#{init_count => Size, max_count => Size},
            fun() -> Scenario(Size) end
        )
    ).
%% Human-readable description of the property; selected by passing the atom `doc`.
prop_dynamic_checkout(doc) ->
    "It's possible to take all fixed and then all dynamic members, but no more than max_count; stats is correct".

%% Property: with init_count = Size and max_count = Size + Extra, first Size
%% members and then Extra further members can be taken; any additional
%% checkout yields `error_no_members`.
prop_dynamic_checkout() ->
    Pool = ?FUNCTION_NAME,
    BaseConf = #{
        name => Pool,
        max_age => {1, min},
        start_mfa => {pooled_gs, start_link, [{Pool}]}
    },
    AllPids = fun(Results) -> lists:all(fun erlang:is_pid/1, Results) end,
    ?FORALL(
        {Size, Extra},
        {pos_integer(), pos_integer()},
        begin
            MaxCount = Size + Extra,
            with_pool(
                BaseConf#{init_count => Size, max_count => MaxCount},
                fun() ->
                    %% Take the initial members without waiting
                    ?assert(AllPids(take_n(Pool, 0, Size))),
                    %% Utilization now equals init_count
                    pool_is_utilized(Pool, self(), Size),
                    %% Take the remaining (dynamic) members, waiting up to 1000 per checkout
                    ?assert(AllPids(take_n(Pool, 1000, Extra))),
                    %% Nothing more can be taken
                    ?assertEqual(error_no_members, pooler:take_member(Pool, 10)),
                    %% Utilization now equals max_count
                    pool_is_utilized(Pool, self(), MaxCount),
                    true
                end
            )
        end
    ).
%% Human-readable description of the property; selected by passing the atom `doc`.
prop_fixed_take_return(doc) ->
    "The state of the pool is same before all members are taken and after they are returned".

%% Property: taking every member of a fixed-size pool and returning them all
%% leaves both the utilization map and the sorted {Pid, State} member list
%% exactly as they were before.
prop_fixed_take_return() ->
    Pool = ?FUNCTION_NAME,
    BaseConf = #{name => Pool, start_mfa => {pooled_gs, start_link, [{Pool}]}},
    %% Captures utilization first, then the sorted member list
    Snapshot = fun() ->
        Util = utilization(Pool),
        Members = lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(Pool)]),
        {Util, Members}
    end,
    RoundTrip = fun(Size) ->
        {UtilBefore, MembersBefore} = Snapshot(),
        Taken = take_n(Pool, 0, Size),
        ?assert(lists:all(fun erlang:is_pid/1, Taken)),
        pool_is_utilized(Pool, self(), Size),
        %% Hand every member back
        lists:foreach(fun(Member) -> pooler:return_member(Pool, Member) end, Taken),
        pool_is_free(Pool, Size),
        {UtilAfter, MembersAfter} = Snapshot(),
        ?assertEqual(UtilBefore, UtilAfter),
        ?assertEqual(MembersBefore, MembersAfter),
        true
    end,
    ?FORALL(
        Size,
        pos_integer(),
        with_pool(
            BaseConf#{init_count => Size, max_count => Size},
            fun() -> RoundTrip(Size) end
        )
    ).
%% Human-readable description of the property; selected by passing the atom `doc`.
prop_fixed_take_return_broken(doc) ->
    "Pool recovers to initial state when all members are returned with 'fail' flag, but workers are replaced".

%% Property: when every member is returned with the `fail` flag, the pool
%% eventually reports the same utilization and member states as before, while
%% none of the original member pids is present any more.
prop_fixed_take_return_broken() ->
    Pool = ?FUNCTION_NAME,
    BaseConf = #{name => Pool, start_mfa => {pooled_gs, start_link, [{Pool}]}},
    SortedMembers = fun() ->
        lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(Pool)])
    end,
    ?FORALL(
        Size,
        pos_integer(),
        begin
            %% Recovery is complete once Size members are free and none is still starting
            Recovered = fun(#{free_count := Free, starting_count := Starting}) ->
                {Free, Starting} =:= {Size, 0}
            end,
            with_pool(
                BaseConf#{init_count => Size, max_count => Size},
                fun() ->
                    UtilBefore = utilization(Pool),
                    MembersBefore = SortedMembers(),
                    Taken = take_n(Pool, 0, Size),
                    ?assert(lists:all(fun erlang:is_pid/1, Taken)),
                    pool_is_utilized(Pool, self(), Size),
                    lists:foreach(
                        fun(Member) -> pooler:return_member(Pool, Member, fail) end,
                        Taken
                    ),
                    %% Failed workers are replaced asynchronously, so poll until the pool recovers
                    UtilAfter = wait_for_utilization(Pool, 5000, Recovered),
                    pool_is_free(Pool, Size),
                    MembersAfter = SortedMembers(),
                    ?assertEqual(UtilBefore, UtilAfter),
                    {PidsBefore, StatusBefore} = lists:unzip(MembersBefore),
                    {PidsAfter, StatusAfter} = lists:unzip(MembersAfter),
                    %% Member states are the same before and after
                    ?assertEqual(StatusBefore, StatusAfter),
                    %% ...but every worker is a new process, none is reused
                    Reused = ordsets:intersection(
                        ordsets:from_list(PidsBefore),
                        ordsets:from_list(PidsAfter)
                    ),
                    ?assertEqual([], Reused),
                    true
                end
            )
        end
    ).
%% Human-readable description of the property; selected by passing the atom `doc`.
prop_fixed_client_died(doc) ->
    "Pool recovers to initial state when client that have taken processes have died with reason 'normal'".

%% Property: a separate client process takes every member of a fixed-size pool
%% and then exits with reason `normal`; afterwards the pool must report the same
%% utilization and the same sorted {Pid, State} member list as before.
%%
%% The client is monitored, so if it dies before reporting `{taken, Pid}` or
%% exits with an unexpected reason, the property fails immediately with that
%% reason instead of a generic `timeout` after 5 seconds.
prop_fixed_client_died() ->
    Conf0 = #{
        name => ?FUNCTION_NAME,
        start_mfa => {pooled_gs, start_link, [{?FUNCTION_NAME}]}
    },
    Stats = fun() ->
        lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(?FUNCTION_NAME)])
    end,
    ?FORALL(
        Size,
        pos_integer(),
        with_pool(
            Conf0#{
                init_count => Size,
                max_count => Size
            },
            fun() ->
                Main = self(),
                UtilizationBefore = utilization(?FUNCTION_NAME),
                StatsBefore = Stats(),
                {Pid, MRef} =
                    erlang:spawn_monitor(
                        fun() ->
                            Taken = take_n(?FUNCTION_NAME, 0, Size),
                            ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
                            Main ! {taken, self()},
                            %% Hold the members until the parent asks us to finish
                            receive
                                {finish, Main} -> ok
                            after 5000 ->
                                exit(timeout)
                            end,
                            exit(normal)
                        end
                    ),
                %% Wait for spawned client to take all workers; surface an early
                %% client crash right away rather than waiting for the timeout
                receive
                    {taken, Pid} ->
                        ok;
                    {'DOWN', MRef, process, Pid, EarlyReason} ->
                        error({client_died_before_checkout, EarlyReason})
                after 5000 ->
                    error(timeout)
                end,
                pool_is_utilized(?FUNCTION_NAME, Pid, Size),
                %% Ask the client to exit and wait for it to die; any exit reason
                %% other than `normal` fails the assertion with the actual reason
                Pid ! {finish, self()},
                receive
                    {'DOWN', MRef, process, Pid, ExitReason} ->
                        ?assertEqual(normal, ExitReason)
                after 5000 ->
                    error(timeout)
                end,
                %% Since worker monitors are asynchronous, we need to wait for pool to recover
                UtilizationAfter =
                    wait_for_utilization(
                        ?FUNCTION_NAME,
                        5000,
                        fun(#{free_count := Free, in_use_count := InUse}) ->
                            Free =:= Size andalso InUse =:= 0
                        end
                    ),
                pool_is_free(?FUNCTION_NAME, Size),
                StatsAfter = Stats(),
                ?assertEqual(UtilizationBefore, UtilizationAfter),
                ?assertEqual(StatsBefore, StatsAfter),
                true
            end
        )
    ).
%% Human-readable description of the property; selected by passing the atom `doc`.
prop_group_take_return(doc) ->
    "Take all workers from all group members - no more workers can be taken. Return them - pools are free.".

%% Property: NumPools fixed-size pools of NumWorkers members each are placed in
%% one group; all NumWorkers * NumPools members can be taken through the group,
%% a further checkout yields `error_no_members`, and after returning every
%% member all pools pass the pool_is_free/2 checks again.
prop_group_take_return() ->
    Group = ?FUNCTION_NAME,
    BaseConf = #{start_mfa => {pooled_gs, start_link, [{Group}]}},
    %% Pool names are the group name with the pool index appended
    PoolConf = fun(Index, Count) ->
        BaseConf#{
            name => list_to_atom(atom_to_list(Group) ++ integer_to_list(Index)),
            init_count => Count,
            max_count => Count,
            group => Group
        }
    end,
    ?FORALL(
        {NumWorkers, NumPools},
        {pos_integer(), pos_integer()},
        with_pools(
            lists:map(fun(I) -> PoolConf(I, NumWorkers) end, lists:seq(1, NumPools)),
            fun() ->
                Client = self(),
                %% Group registration is asynchronous, so wait until all pools have joined
                GroupPoolPids = wait_for_group_size(Group, NumPools, 5000),
                %% Every pool is a member of the group
                ?assertEqual(NumPools, length(GroupPoolPids)),
                %% All workers of all pools can be taken through the group
                Taken = group_take_n(Group, NumWorkers * NumPools),
                ?assert(lists:all(fun erlang:is_pid/1, Taken)),
                %% The group is saturated
                ?assertEqual(error_no_members, pooler:take_group_member(Group)),
                %% Each pool is fully utilized by this process
                AssertUtilized = fun(Pool) -> pool_is_utilized(Pool, Client, NumWorkers) end,
                lists:foreach(AssertUtilized, GroupPoolPids),
                %% Return every worker
                lists:foreach(
                    fun(Member) -> ok = pooler:return_group_member(Group, Member) end,
                    Taken
                ),
                %% Each pool is free again
                AssertFree = fun(Pool) -> pool_is_free(Pool, NumWorkers) end,
                lists:foreach(AssertFree, GroupPoolPids),
                true
            end
        )
    ).
- %% Helpers
%% Calls pooler:take_member(Pool, Timeout) N times, in order, and returns the
%% list of results (one element per call). Returns [] when N is 0.
take_n(Pool, Timeout, N) ->
    [pooler:take_member(Pool, Timeout) || _ <- lists:seq(1, N)].
%% Calls pooler:take_group_member(Group) N times, in order, and returns the
%% list of results (one element per call). Returns [] when N is 0.
group_take_n(Group, N) ->
    [pooler:take_group_member(Group) || _ <- lists:seq(1, N)].
%% Single-pool convenience wrapper: runs Fun via with_pools/2 with Conf as the
%% only pool configuration and returns whatever with_pools/2 returns.
with_pool(PoolConf, Body) ->
    Confs = [PoolConf],
    with_pools(Confs, Body).
%% Runs Fun() with one pooler pool created per configuration map in Confs.
%%
%% Starts pg/pg2 via pg_start/0, clears the filter list of the `default` logger
%% handler, ensures the `pooler` application is started, creates the pools,
%% calls Fun(), removes the pools again (each config must carry a `name` key)
%% and returns Fun's result. The `pooler` application is stopped afterwards
%% whether or not any step raised.
with_pools(Confs, Fun) ->
    pg_start(),
    %% Disable SASL logs
    logger:set_handler_config(default, filters, []),
    try
        {ok, _} = application:ensure_all_started(pooler),
        lists:foreach(fun(Conf) -> {ok, _} = pooler:new_pool(Conf) end, Confs),
        Result = Fun(),
        lists:foreach(
            fun(Conf) -> ok = pooler:rm_pool(maps:get(name, Conf)) end,
            Confs
        ),
        Result
    after
        application:stop(pooler)
    end.
%% Polls utilization(Pool) until the predicate Fun returns `true` for it, then
%% returns that utilization map. Sleeps 50 ms between polls and subtracts 50
%% from Timeout each round; raises error(timeout) once Timeout is no longer
%% positive.
wait_for_utilization(_Pool, Timeout, _Fun) when Timeout =< 0 ->
    error(timeout);
wait_for_utilization(Pool, Timeout, Fun) ->
    Current = utilization(Pool),
    case Fun(Current) of
        false ->
            %% Not there yet: back off and retry with a reduced budget
            timer:sleep(50),
            wait_for_utilization(Pool, Timeout - 50, Fun);
        true ->
            Current
    end.
%% Polls pooler:group_pools(GroupName) until it contains exactly Size entries
%% and returns that list. Raises error(group_size_exceeded) if more than Size
%% entries are seen. Sleeps 50 ms between polls and subtracts 50 from Timeout
%% each round; raises error(timeout) once Timeout is no longer positive.
wait_for_group_size(_GroupName, _Size, Timeout) when Timeout =< 0 ->
    error(timeout);
wait_for_group_size(GroupName, Size, Timeout) ->
    Pools = pooler:group_pools(GroupName),
    Count = length(Pools),
    if
        Count =:= Size ->
            Pools;
        Count > Size ->
            error(group_size_exceeded);
        Count < Size ->
            %% Some pools have not joined yet: back off and retry
            timer:sleep(50),
            wait_for_group_size(GroupName, Size, Timeout - 50)
    end.
%% Returns the result of pooler:pool_utilization(Pool) converted from a
%% key/value list into a map.
utilization(Pool) ->
    Pairs = pooler:pool_utilization(Pool),
    maps:from_list(Pairs).
%% Asserts that Pool is fully checked out by Client: utilization must report
%% NumWorkers members in use, none free and none queued, and every entry of
%% pooler:pool_stats(Pool) must carry Client in its state position.
%% Returns `true`; raises an assertion error otherwise.
pool_is_utilized(Pool, Client, NumWorkers) ->
    Observed = utilization(Pool),
    ?assertMatch(
        #{in_use_count := NumWorkers, free_count := 0, queued_count := 0},
        Observed
    ),
    %% Every member is held by Client
    HeldByClient = fun({_, {_, Owner, _}}) -> Owner =:= Client end,
    ?assert(lists:all(HeldByClient, pooler:pool_stats(Pool))),
    true.
%% Asserts that Pool is completely idle: utilization must report no members in
%% use, NumWorkers free and none queued, and every entry of
%% pooler:pool_stats(Pool) must carry the atom `free` in its state position.
%% Returns `true`; raises an assertion error otherwise.
pool_is_free(Pool, NumWorkers) ->
    Observed = utilization(Pool),
    ?assertMatch(
        #{in_use_count := 0, free_count := NumWorkers, queued_count := 0},
        Observed
    ),
    %% Every member is free
    IsFree = fun({_, {_, Status, _}}) -> Status =:= free end,
    ?assert(lists:all(IsFree, pooler:pool_stats(Pool))),
    true.
%% Starts the process-group service selected at compile time:
%% pg2 on OTP releases before 23, the `pg` scope of pg on OTP 23 and later.
-if(?OTP_RELEASE < 23).
pg_start() ->
    pg2:start().
-else.
pg_start() ->
    pg:start(pg).
-endif.
|