Browse Source

Add property-based tests; add `pooler:group_pools/1` API

Sergey Prokhorov 2 years ago
parent
commit
4154ed990b
9 changed files with 514 additions and 27 deletions
  1. 7 1
      .github/workflows/ci.yml
  2. 1 0
      .gitignore
  3. 8 1
      Makefile
  4. 4 1
      README.org
  5. 4 2
      rebar.config
  6. 57 14
      src/pooler.erl
  7. 1 1
      src/pooler_config.erl
  8. 1 7
      test/pooler_tests.erl
  9. 431 0
      test/prop_pooler.erl

+ 7 - 1
.github/workflows/ci.yml

@@ -47,7 +47,13 @@ jobs:
         run: make format_check
 
       - name: Eunit test
-        run: make test
+        run: make eunit
+
+      - name: Proper test
+        run: make proper
+
+      - name: Check coverage
+        run: make cover
 
       - name: Generate docs
         run: make doc

+ 1 - 0
.gitignore

@@ -24,5 +24,6 @@ doc/*.html
 doc/*.md
 bench/tests
 bench/deps
+erlang_ls.config
 
 /rebar3

+ 8 - 1
Makefile

@@ -11,10 +11,17 @@ compile: $(REBAR)
 run: $(REBAR)
 	@$(REBAR) as test shell --apps pooler --config config/demo.config
 
-test: $(REBAR)
+eunit: $(REBAR)
 	$(REBAR) eunit --verbose --cover
+
+proper: $(REBAR)
+	$(REBAR) proper --cover
+
+cover: $(REBAR)
 	$(REBAR) cover --verbose --min_coverage $(MINIMAL_COVERAGE)
 
+test: eunit proper cover
+
 xref: $(REBAR)
 	$(REBAR) xref
 

+ 4 - 1
README.org

@@ -309,10 +309,13 @@ pooler_NAME_pool_sup:      all_for_one
 pooler_NAME_member_sup:    simple_one_for_one
 pooler_starter_sup:        simple_one_for_one
 
-Groups of pools are managed using the pg2 application. This imposes a
+Groups of pools are managed using the ~pg~ (OTP-23+) or ~pg2~ (OTP below 23) application. This imposes a
 requirement to set a configuration parameter on the kernel application
 in an OTP release. Like this in sys.config:
 #+begin_src erlang
+% OTP_RELEASE >= 23
+{kernel, [{start_pg, true}]}
+% OTP_RELEASE < 23
 {kernel, [{start_pg2, true}]}
 #+end_src
 

+ 4 - 2
rebar.config

@@ -28,7 +28,8 @@
 {deps, []}.
 
 {project_plugins, [
-    erlfmt
+    erlfmt,
+    rebar3_proper
 ]}.
 
 {erlfmt, [
@@ -51,7 +52,8 @@
         {erl_opts, [nowarn_export_all]}
     ]},
     {test, [
-        {erl_opts, [nowarn_export_all]}
+        {erl_opts, [nowarn_export_all]},
+        {deps, [proper]}
     ]}
 ]}.
 

+ 57 - 14
src/pooler.erl

@@ -40,6 +40,7 @@
     take_group_member/1,
     return_group_member/2,
     return_group_member/3,
+    group_pools/1,
     return_member/2,
     return_member/3,
     pool_stats/1,
@@ -68,6 +69,31 @@
 ]).
 
 %% ------------------------------------------------------------------
+%% Types
+%% ------------------------------------------------------------------
+-export_type([pool_config/0, pool_name/0, group_name/0]).
+
+-type pool_name() :: atom().
+-type group_name() :: atom().
+
+-type pool_config() :: [
+    {name, pool_name()}
+    | {init_count, non_neg_integer()}
+    | {max_count, non_neg_integer()}
+    | {start_mfa, {module(), atom(), [any()]}}
+    | {group, group_name()}
+    | {cull_interval, time_spec()}
+    | {max_age, time_spec()}
+    | {member_start_timeout, time_spec()}
+    | {queue_max, non_neg_integer()}
+    | {metrics_api, folsom | exometer}
+    | {metrics_mod, module()}
+    | {stop_mfa, {module(), atom(), ['$pooler_pid' | any(), ...]}}
+    | {auto_grow_threshold, non_neg_integer()}
+    | {add_member_retry, non_neg_integer()}
+].
+
+%% ------------------------------------------------------------------
 %% Application API
 %% ------------------------------------------------------------------
 
@@ -136,10 +162,12 @@ manual_start() ->
 %% <dd>Time limit for member starts. Specified as `{Time,
 %% Unit}'. Defaults to `{1, min}'.</dd>
 %% </dl>
+-spec new_pool(pool_config()) -> {ok, pid()} | {error, {already_started, pid()}}.
 new_pool(PoolConfig) ->
     pooler_sup:new_pool(PoolConfig).
 
 %% @doc Terminate the named pool.
+-spec rm_pool(pool_name()) -> ok | {error, not_found | running | restarting}.
 rm_pool(PoolName) ->
     pooler_sup:rm_pool(PoolName).
 
@@ -152,7 +180,7 @@ rm_pool(PoolName) ->
 %% The group is NOT terminated if any member pool did not
 %% successfully terminate.
 %%
--spec rm_group(atom()) -> ok | {error, {failed_rm_pools, [atom()]}}.
+-spec rm_group(group_name()) -> ok | {error, {failed_rm_pools, [atom()]}}.
 rm_group(GroupName) ->
     Pools = pg_get_local_members(GroupName),
     case rm_group_members(Pools) of
@@ -180,7 +208,7 @@ rm_group_members(MemberPids) ->
 %% @doc Get child spec described by the proplist `PoolConfig'.
 %%
 %% See {@link pooler:new_pool/1} for info about `PoolConfig'.
--spec pool_child_spec([{atom(), term()}]) -> supervisor:child_spec().
+-spec pool_child_spec(pool_config()) -> supervisor:child_spec().
 pool_child_spec(PoolConfig) ->
     pooler_sup:pool_child_spec(PoolConfig).
 
@@ -193,7 +221,7 @@ accept_member(PoolName, MemberPid) ->
 %%
 %% If no free members are available, 'error_no_members' is returned.
 %%
--spec take_member(atom() | pid()) -> pid() | error_no_members.
+-spec take_member(pool_name() | pid()) -> pid() | error_no_members.
 take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
     gen_server:call(PoolName, {take_member, 0}, infinity).
 
@@ -204,7 +232,7 @@ take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
 %% is available within the specified timeout, error_no_members is returned.
 %% `Timeout' can be either milliseconds as integer or `{duration, time_unit}'
 %%
--spec take_member(atom() | pid(), non_neg_integer() | time_spec()) -> pid() | error_no_members.
+-spec take_member(pool_name() | pid(), non_neg_integer() | time_spec()) -> pid() | error_no_members.
 take_member(PoolName, Timeout) when is_atom(PoolName) orelse is_pid(PoolName) ->
     gen_server:call(PoolName, {take_member, time_as_millis(Timeout)}, infinity).
 
@@ -212,7 +240,7 @@ take_member(PoolName, Timeout) when is_atom(PoolName) orelse is_pid(PoolName) ->
 %% `GroupName'. Returns `MemberPid' or `error_no_members'.  If no
 %% members are available in the randomly chosen pool, all other pools
 %% in the group are tried in order.
--spec take_group_member(atom()) -> pid() | error_no_members.
+-spec take_group_member(group_name()) -> pid() | error_no_members.
 take_group_member(GroupName) ->
     case pg_get_local_members(GroupName) of
         [] ->
@@ -252,7 +280,7 @@ extract_nth(_, [], _) ->
 %% @doc Return a member that was taken from the group
 %% `GroupName'. This is a convenience function for
 %% `return_group_member/3' with `Status' of `ok'.
--spec return_group_member(atom(), pid() | error_no_members) -> ok.
+-spec return_group_member(group_name(), pid() | error_no_members) -> ok.
 return_group_member(GroupName, MemberPid) ->
     return_group_member(GroupName, MemberPid, ok).
 
@@ -260,7 +288,7 @@ return_group_member(GroupName, MemberPid) ->
 %% `Status' is `ok' the member is returned to the pool from which it
 %% came. If `Status' is `fail' the member will be terminated and a new
 %% member added to the appropriate pool.
--spec return_group_member(atom(), pid() | error_no_members, ok | fail) -> ok.
+-spec return_group_member(group_name(), pid() | error_no_members, ok | fail) -> ok.
 return_group_member(_, error_no_members, _) ->
     ok;
 return_group_member(_GroupName, MemberPid, Status) when is_pid(MemberPid) ->
@@ -276,7 +304,7 @@ return_group_member(_GroupName, MemberPid, Status) when is_pid(MemberPid) ->
 %% If `Status' is 'ok', the member is returned to the pool.  If
 %% `Status' is 'fail', the member is destroyed and a new member is
 %% added to the pool in its place.
--spec return_member(atom() | pid(), pid() | error_no_members, ok | fail) -> ok.
+-spec return_member(pool_name() | pid(), pid() | error_no_members, ok | fail) -> ok.
 return_member(PoolName, Pid, Status) when
     is_pid(Pid) andalso
         (is_atom(PoolName) orelse
@@ -291,7 +319,7 @@ return_member(_, error_no_members, _) ->
 
 %% @doc Return a member to the pool so it can be reused.
 %%
--spec return_member(atom() | pid(), pid() | error_no_members) -> ok.
+-spec return_member(pool_name() | pid(), pid() | error_no_members) -> ok.
 return_member(PoolName, Pid) when
     is_pid(Pid) andalso
         (is_atom(PoolName) orelse is_pid(PoolName))
@@ -301,24 +329,37 @@ return_member(PoolName, Pid) when
 return_member(_, error_no_members) ->
     ok.
 
-%% @doc Obtain runtime state info for all pools.
+%% @doc Obtain runtime state info for all workers.
 %%
 %% Format of the return value is subject to change.
--spec pool_stats(atom() | pid()) -> [tuple()].
+-spec pool_stats(pool_name() | pid()) -> [{pid(), {reference(), free | pid(), erlang:timestamp()}}].
 pool_stats(PoolName) ->
     gen_server:call(PoolName, pool_stats).
 
+%% @doc Obtain the pids of all pools which are members of the group.
+-spec group_pools(group_name()) -> [pid()].
+group_pools(GroupName) ->
+    pg_get_local_members(GroupName).
+
 %% @doc Obtain utilization info for a pool.
 %%
 %% Format of the return value is subject to change, but for now it
 %% will be a proplist to maintain backcompat with R16.
--spec pool_utilization(atom() | pid()) -> [{atom(), integer()}].
+-spec pool_utilization(pool_name() | pid()) ->
+    [
+        {max_count, pos_integer()}
+        | {in_use_count, non_neg_integer()}
+        | {free_count, non_neg_integer()}
+        | {starting_count, non_neg_integer()}
+        | {queued_count, non_neg_integer()}
+        | {queue_max, non_neg_integer()}
+    ].
 pool_utilization(PoolName) ->
     gen_server:call(PoolName, pool_utilization).
 
 %% @doc Invokes `Fun' with arity 1 over all free members in pool with `PoolName'.
 %%
--spec call_free_members(atom() | pid(), fun((pid()) -> term())) -> Res when
+-spec call_free_members(pool_name() | pid(), fun((pid()) -> term())) -> Res when
     Res :: [{ok, term()} | {error, term()}].
 call_free_members(PoolName, Fun) when
     (is_atom(PoolName) orelse is_pid(PoolName)) andalso is_function(Fun, 1)
@@ -327,7 +368,7 @@ call_free_members(PoolName, Fun) when
 
 %% @doc Invokes `Fun' with arity 1 over all free members in pool with `PoolName'.
 %% `Timeout' sets the timeout of gen_server call.
--spec call_free_members(atom() | pid(), Fun, timeout()) -> Res when
+-spec call_free_members(pool_name() | pid(), Fun, timeout()) -> Res when
     Fun :: fun((pid()) -> term()),
     Res :: [{ok, term()} | {error, term()}].
 call_free_members(PoolName, Fun, Timeout) when
@@ -1109,6 +1150,7 @@ compute_utilization(#pool{
     max_count = MaxCount,
     in_use_count = InUseCount,
     free_count = FreeCount,
+    starting_members = Starting,
     queued_requestors = Queue,
     queue_max = QueueMax
 }) ->
@@ -1116,6 +1158,7 @@ compute_utilization(#pool{
         {max_count, MaxCount},
         {in_use_count, InUseCount},
         {free_count, FreeCount},
+        {starting_count, length(Starting)},
         %% Note not O(n), so in pathological cases this might be expensive
         {queued_count, queue:len(Queue)},
         {queue_max, QueueMax}

+ 1 - 1
src/pooler_config.erl

@@ -8,7 +8,7 @@
 
 -include("pooler.hrl").
 
--spec list_to_pool([{atom(), term()}]) -> #pool{}.
+-spec list_to_pool(pooler:pool_config()) -> #pool{}.
 list_to_pool(P) ->
     #pool{
         name = req(name, P),

+ 1 - 7
test/pooler_tests.erl

@@ -533,7 +533,7 @@ pooler_groups_test_() ->
 
                 {"take member from empty group", fun() ->
                     %% artificially empty group member list
-                    [pg_leave(group_1, M) || M <- pg_get_members(group_1)],
+                    [pg_leave(group_1, M) || M <- pooler:group_pools(group_1)],
                     ?assertEqual(error_no_members, pooler:take_group_member(group_1))
                 end},
 
@@ -1452,9 +1452,6 @@ pg_stop() ->
 pg_leave(Group, Pid) ->
     pg:leave(Group, Pid).
 
-pg_get_members(Group) ->
-    pg:get_members(Group).
-
 -else.
 
 pg_start() ->
@@ -1466,7 +1463,4 @@ pg_stop() ->
 pg_leave(Group, Pid) ->
     pg2:leave(Group, Pid).
 
-pg_get_members(Group) ->
-    pg2:get_members(Group).
-
 -endif.

+ 431 - 0
test/prop_pooler.erl

@@ -0,0 +1,431 @@
+-module(prop_pooler).
+
+-export([
+    prop_fixed_start/1,
+    prop_fixed_checkout_all/1,
+    prop_dynamic_checkout/1,
+    prop_fixed_take_return/1,
+    prop_fixed_take_return_broken/1,
+    prop_fixed_client_died/1,
+    prop_group_take_return/1
+]).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include("pooler.hrl").
+
+prop_fixed_start(doc) ->
+    "Check that the pool of any fixed size can be started, internal statistics is correct".
+
+prop_fixed_start() ->
+    Conf0 = [
+        {name, ?FUNCTION_NAME},
+        {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
+    ],
+    ?FORALL(
+        Size,
+        pos_integer(),
+        with_pool(
+            [
+                {init_count, Size},
+                {max_count, Size}
+                | Conf0
+            ],
+            fun() ->
+                %% Pool is not utilized
+                pool_is_free(?FUNCTION_NAME, Size),
+                true
+            end
+        )
+    ).
+
+prop_fixed_checkout_all(doc) ->
+    "Can take all members from fixed-size pool. Following attempts will return error. Stats is correct.".
+
+prop_fixed_checkout_all() ->
+    Conf0 = [
+        {name, ?FUNCTION_NAME},
+        {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
+    ],
+    ?FORALL(
+        Size,
+        pos_integer(),
+        with_pool(
+            [
+                {init_count, Size},
+                {max_count, Size}
+                | Conf0
+            ],
+            fun() ->
+                ?assert(
+                    lists:all(
+                        fun(Res) -> is_pid(Res) end,
+                        take_n(?FUNCTION_NAME, 0, Size)
+                    )
+                ),
+                %% Fixed pool - can't take more members than pool size
+                ?assertEqual(error_no_members, pooler:take_member(?FUNCTION_NAME, 10)),
+                %% Pool is fully utilized
+                pool_is_utilized(?FUNCTION_NAME, self(), Size),
+                true
+            end
+        )
+    ).
+
+prop_dynamic_checkout(doc) ->
+    "It's possible to take all fixed and then all dynamic members, but no more than max_count; stats is correct".
+
+prop_dynamic_checkout() ->
+    Conf0 = [
+        {name, ?FUNCTION_NAME},
+        {max_age, {1, min}},
+        {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
+    ],
+    ?FORALL(
+        {Size, Extra},
+        {pos_integer(), pos_integer()},
+        with_pool(
+            [
+                {init_count, Size},
+                {max_count, Size + Extra}
+                | Conf0
+            ],
+            fun() ->
+                MaxCount = Size + Extra,
+                ?assert(
+                    lists:all(
+                        fun(Res) -> is_pid(Res) end,
+                        take_n(?FUNCTION_NAME, 0, Size)
+                    )
+                ),
+                %% Fixed pool is fully utilized up to init_count
+                pool_is_utilized(?FUNCTION_NAME, self(), Size),
+                %% Take all dynamic workers
+                ?assert(
+                    lists:all(
+                        fun(Res) -> is_pid(Res) end,
+                        take_n(?FUNCTION_NAME, 1000, Extra)
+                    )
+                ),
+                %% Pool is fully utilized now
+                ?assertEqual(error_no_members, pooler:take_member(?FUNCTION_NAME, 10)),
+                %% Dynamic pool is fully utilized up to max_count
+                pool_is_utilized(?FUNCTION_NAME, self(), MaxCount),
+                true
+            end
+        )
+    ).
+
+prop_fixed_take_return(doc) ->
+    "The state of the pool is same before all members are taken and after they are returned".
+
+prop_fixed_take_return() ->
+    Conf0 = [
+        {name, ?FUNCTION_NAME},
+        {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
+    ],
+    Stats = fun() ->
+        lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(?FUNCTION_NAME)])
+    end,
+    ?FORALL(
+        Size,
+        pos_integer(),
+        with_pool(
+            [
+                {init_count, Size},
+                {max_count, Size}
+                | Conf0
+            ],
+            fun() ->
+                UtilizationBefore = utilization(?FUNCTION_NAME),
+                StatsBefore = Stats(),
+                Taken = take_n(?FUNCTION_NAME, 0, Size),
+                ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
+                pool_is_utilized(?FUNCTION_NAME, self(), Size),
+                [pooler:return_member(?FUNCTION_NAME, Pid) || Pid <- Taken],
+                pool_is_free(?FUNCTION_NAME, Size),
+                UtilizationAfter = utilization(?FUNCTION_NAME),
+                StatsAfter = Stats(),
+                ?assertEqual(UtilizationBefore, UtilizationAfter),
+                ?assertEqual(StatsBefore, StatsAfter),
+                true
+            end
+        )
+    ).
+
+prop_fixed_take_return_broken(doc) ->
+    "Pool recovers to initial state when all members are returned with 'fail' flag, but workers are replaced".
+
+prop_fixed_take_return_broken() ->
+    Conf0 = [
+        {name, ?FUNCTION_NAME},
+        {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
+    ],
+    Stats = fun() ->
+        lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(?FUNCTION_NAME)])
+    end,
+    ?FORALL(
+        Size,
+        pos_integer(),
+        with_pool(
+            [
+                {init_count, Size},
+                {max_count, Size}
+                | Conf0
+            ],
+            fun() ->
+                UtilizationBefore = utilization(?FUNCTION_NAME),
+                StatsBefore = Stats(),
+                Taken = take_n(?FUNCTION_NAME, 0, Size),
+                ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
+                pool_is_utilized(?FUNCTION_NAME, self(), Size),
+                [pooler:return_member(?FUNCTION_NAME, Pid, fail) || Pid <- Taken],
+                %% Since failed workers are replaced asynchronously, we need to wait for the pool to recover
+                UtilizationAfter =
+                    wait_for_utilization(
+                        ?FUNCTION_NAME,
+                        5000,
+                        fun(#{free_count := Free, starting_count := Starting}) ->
+                            Free =:= Size andalso Starting =:= 0
+                        end
+                    ),
+                pool_is_free(?FUNCTION_NAME, Size),
+                StatsAfter = Stats(),
+                ?assertEqual(UtilizationBefore, UtilizationAfter),
+                {PidsBefore, StatusBefore} = lists:unzip(StatsBefore),
+                {PidsAfter, StatusAfter} = lists:unzip(StatsAfter),
+                %% all workers have status `free` before and after
+                ?assertEqual(StatusBefore, StatusAfter),
+                %% however, all workers are new processes, none reused
+                ?assertEqual([], ordsets:intersection(ordsets:from_list(PidsBefore), ordsets:from_list(PidsAfter))),
+                true
+            end
+        )
+    ).
+
+prop_fixed_client_died(doc) ->
+    "Pool recovers to initial state when client that have taken processes have died with reason 'normal'".
+
+prop_fixed_client_died() ->
+    Conf0 = [
+        {name, ?FUNCTION_NAME},
+        {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
+    ],
+    Stats = fun() ->
+        lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(?FUNCTION_NAME)])
+    end,
+    ?FORALL(
+        Size,
+        pos_integer(),
+        with_pool(
+            [
+                {init_count, Size},
+                {max_count, Size}
+                | Conf0
+            ],
+            fun() ->
+                Main = self(),
+                UtilizationBefore = utilization(?FUNCTION_NAME),
+                StatsBefore = Stats(),
+                {Pid, MRef} =
+                    erlang:spawn_monitor(
+                        fun() ->
+                            Taken = take_n(?FUNCTION_NAME, 0, Size),
+                            ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
+                            Main ! {taken, self()},
+                            receive
+                                {finish, Main} -> ok
+                            after 5000 ->
+                                exit(timeout)
+                            end,
+                            exit(normal)
+                        end
+                    ),
+                %% Wait for spawned client to take all workers
+                receive
+                    {taken, Pid} -> ok
+                after 5000 ->
+                    error(timeout)
+                end,
+                pool_is_utilized(?FUNCTION_NAME, Pid, Size),
+                %% Wait for the client to die
+                Pid ! {finish, self()},
+                receive
+                    {'DOWN', MRef, process, Pid, normal} ->
+                        ok
+                after 5000 ->
+                    error(timeout)
+                end,
+                %% Since worker monitors are asynchronous, we need to wait for the pool to recover
+                UtilizationAfter =
+                    wait_for_utilization(
+                        ?FUNCTION_NAME,
+                        5000,
+                        fun(#{free_count := Free, in_use_count := InUse}) ->
+                            Free =:= Size andalso InUse =:= 0
+                        end
+                    ),
+                pool_is_free(?FUNCTION_NAME, Size),
+                StatsAfter = Stats(),
+                ?assertEqual(UtilizationBefore, UtilizationAfter),
+                ?assertEqual(StatsBefore, StatsAfter),
+                true
+            end
+        )
+    ).
+
+prop_group_take_return(doc) ->
+    "Take all workers from all group members - no more workers can be taken. Return them - pools are free.".
+
+prop_group_take_return() ->
+    Conf0 = [
+        {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
+    ],
+    PoolName = fun(I) -> list_to_atom(atom_to_list(?FUNCTION_NAME) ++ integer_to_list(I)) end,
+    ?FORALL(
+        {NumWorkers, NumPools},
+        {pos_integer(), pos_integer()},
+        begin
+            with_pools(
+                [
+                    [
+                        {name, PoolName(I)},
+                        {init_count, NumWorkers},
+                        {max_count, NumWorkers},
+                        {group, ?FUNCTION_NAME}
+                        | Conf0
+                    ]
+                 || I <- lists:seq(1, NumPools)
+                ],
+                fun() ->
+                    Client = self(),
+                    %% Group registration is asynchronous, so we need to wait for it to happen
+                    GroupPoolPids = wait_for_group_size(?FUNCTION_NAME, NumPools, 5000),
+                    %% All pools are members of the group
+                    ?assertEqual(NumPools, length(GroupPoolPids)),
+                    %% It's possible to take all workers from all members of a group
+                    Taken = group_take_n(?FUNCTION_NAME, NumWorkers * NumPools),
+                    ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
+                    %% All pools are saturated
+                    ?assertEqual(error_no_members, pooler:take_group_member(?FUNCTION_NAME)),
+                    %% All pools are utilized
+                    lists:foreach(
+                        fun(Pool) -> pool_is_utilized(Pool, Client, NumWorkers) end,
+                        GroupPoolPids
+                    ),
+                    %% Now return all the workers
+                    [ok = pooler:return_group_member(?FUNCTION_NAME, Pid) || Pid <- Taken],
+                    %% All pools are free
+                    lists:foreach(
+                        fun(Pool) -> pool_is_free(Pool, NumWorkers) end,
+                        GroupPoolPids
+                    ),
+                    true
+                end
+            )
+        end
+    ).
+
+%% Helpers
+
+take_n(Pool, Timeout, N) when N > 0 ->
+    [pooler:take_member(Pool, Timeout) | take_n(Pool, Timeout, N - 1)];
+take_n(_Pool, _Timeout, 0) ->
+    [].
+
+group_take_n(Group, N) when N > 0 ->
+    [pooler:take_group_member(Group) | group_take_n(Group, N - 1)];
+group_take_n(_Group, 0) ->
+    [].
+
+with_pool(Conf, Fun) ->
+    with_pools([Conf], Fun).
+
+with_pools(Confs, Fun) ->
+    pg_start(),
+    %% Disable SASL logs
+    logger:set_handler_config(default, filters, []),
+    try
+        {ok, _} = application:ensure_all_started(pooler),
+        [{ok, _} = pooler:new_pool(Conf) || Conf <- Confs],
+        Res = Fun(),
+        [ok = pooler:rm_pool(proplists:get_value(name, Conf)) || Conf <- Confs],
+        Res
+    after
+        application:stop(pooler)
+    end.
+
+wait_for_utilization(Pool, Timeout, Fun) when Timeout > 0 ->
+    Utilization = utilization(Pool),
+    case Fun(Utilization) of
+        true ->
+            Utilization;
+        false ->
+            timer:sleep(50),
+            wait_for_utilization(Pool, Timeout - 50, Fun)
+    end;
+wait_for_utilization(_, _, _) ->
+    error(timeout).
+
+wait_for_group_size(GroupName, Size, Timeout) when Timeout > 0 ->
+    Pools = pooler:group_pools(GroupName),
+    case length(Pools) of
+        Size ->
+            Pools;
+        Larger when Larger > Size ->
+            error(group_size_exceeded);
+        Smaller when Smaller < Size ->
+            timer:sleep(50),
+            wait_for_group_size(GroupName, Size, Timeout - 50)
+    end;
+wait_for_group_size(_, _, _) ->
+    error(timeout).
+
+utilization(Pool) ->
+    maps:from_list(pooler:pool_utilization(Pool)).
+
+pool_is_utilized(Pool, Client, NumWorkers) ->
+    Utilization = utilization(Pool),
+    ?assertMatch(
+        #{
+            in_use_count := NumWorkers,
+            free_count := 0,
+            queued_count := 0
+        },
+        Utilization
+    ),
+    %% All members are taken by Client
+    ?assert(
+        lists:all(
+            fun({_, {_, State, _}}) -> State =:= Client end,
+            pooler:pool_stats(Pool)
+        )
+    ),
+    true.
+
+pool_is_free(Pool, NumWorkers) ->
+    Utilization = utilization(Pool),
+    ?assertMatch(
+        #{
+            in_use_count := 0,
+            free_count := NumWorkers,
+            queued_count := 0
+        },
+        Utilization
+    ),
+    %% All members are free
+    ?assert(
+        lists:all(
+            fun({_, {_, State, _}}) -> State =:= free end,
+            pooler:pool_stats(Pool)
+        )
+    ),
+    true.
+
+-if(?OTP_RELEASE >= 23).
+pg_start() ->
+    pg:start(pg).
+-else.
+pg_start() ->
+    pg2:start().
+-endif.