Browse Source

Merge pull request #97 from seriyps/live-reconfigure

Add support for reconfiguration without restart
Sergey Prokhorov 1 year ago
parent
commit
0ead353843
4 changed files with 497 additions and 2 deletions
  1. 15 1
      README.org
  2. 231 0
      src/pooler.erl
  3. 9 1
      src/pooler_app.erl
  4. 242 0
      test/pooler_tests.erl

+ 15 - 1
README.org

@@ -112,7 +112,7 @@ and one pool talking to a Postgresql database:
                       max_count => 10,
                       init_count => 2,
                       start_mfa =>
-                       {my_pg_sql_driver, start_link, ["db_host"]}}
+                       {epgsql, connect, [#{host => "localhost", username => "user", database => "base"}]}}
                    ]}
              %% if you want to enable metrics, set this to a module with
              %% an API conformant to the folsom_metrics module.
@@ -170,6 +170,20 @@ PoolConfig = #{
 },
 pooler:new_pool(PoolConfig).
 #+END_SRC
+*** Dynamic pool reconfiguration
+Pool configuration can be changed at runtime:
+
+#+BEGIN_SRC erlang
+pooler:pool_reconfigure(rc8081, PoolConfig#{max_count => 10, init_count => 4}).
+#+END_SRC
+
+It will update the pool's state and will start/stop workers if necessary, join/leave group,
+reschedule the cull timer etc.
+The only parameters that can't be updated are ~name~ and ~start_mfa~.
+
+However, the updated configuration will not survive a pool crash: the supervisor will restart the
+pool with the old configuration. This should not normally happen.
+
 *** Using pooler
 
 Here's an example session:

+ 231 - 0
src/pooler.erl

@@ -38,6 +38,7 @@
     manual_start/0,
     new_pool/1,
     pool_child_spec/1,
+    pool_reconfigure/2,
     rm_pool/1,
     rm_group/1,
     call_free_members/2,
@@ -191,6 +192,27 @@
 -type pool_config_legacy() :: [{atom(), any()}].
 %% Can be provided as a proplist, but is not recommended
 
+-type reconfigure_action() ::
+    {start_workers, pos_integer()}
+    | {stop_free_workers, pos_integer()}
+    | {shrink_queue, pos_integer()}
+    | {reset_cull_timer, time_spec()}
+    | {cull, _}
+    | {leave_group, group_name()}
+    | {join_group, group_name()}
+    | {set_parameter,
+        {group, group_name() | undefined}
+        | {init_count, non_neg_integer()}
+        | {max_count, non_neg_integer()}
+        | {cull_interval, time_spec()}
+        | {max_age, time_spec()}
+        | {member_start_timeout, time_spec()}
+        | {queue_max, non_neg_integer()}
+        | {metrics_api, folsom | exometer}
+        | {metrics_mod, module()}
+        | {stop_mfa, {module(), atom(), ['$pooler_pid' | any(), ...]}}
+        | {auto_grow_threshold, non_neg_integer()}}.
+%% One step of a live reconfiguration. A list of these is computed from the new
+%% configuration and the current pool state, applied in order inside the pool
+%% process, and returned to the caller of {@link pool_reconfigure/2}.
+%% `set_parameter' only overwrites a field of the pool state; the other actions
+%% carry the side effects (starting/stopping workers, dropping queued requestors,
+%% timer and group changes). The payload of `cull' is not used.
+
 -type free_member_info() :: {reference(), free, erlang:timestamp()}.
 -type member_info() :: {reference(), free | pid(), erlang:timestamp()}.
 %% See {@link pool_stats/1}
@@ -359,6 +381,11 @@ rm_group_members(MemberPids) ->
 pool_child_spec(PoolConfig) ->
     pooler_sup:pool_child_spec(config_as_map(PoolConfig)).
 
+%% @doc Reconfigures a running pool in place, without restarting it.
+%%
+%% The first argument is the registered name or the pid of the pool process, the
+%% second is the complete desired configuration. Optional keys that are left out are
+%% treated as being set to their default values, not as "keep the current value".
+%%
+%% On success returns `{ok, Actions}' where `Actions' is the ordered list of steps that
+%% were applied (empty when nothing differs). Returns
+%% `{error, changed_unsupported_parameter}' when `name' or `start_mfa' do not match the
+%% running pool, and `{error, {max_count, not_enough_free_workers_to_shutdown}}' when
+%% `max_count' is lowered below what the free workers alone allow to stop. On error the
+%% pool state is left unchanged.
+-spec pool_reconfigure(pool_name() | pid(), pool_config()) -> {ok, [reconfigure_action()]} | {error, any()}.
+pool_reconfigure(PoolRef, Config) ->
+    gen_server:call(PoolRef, {reconfigure, Config}).
+
 %% @doc For INTERNAL use. Adds `MemberPid' to the pool.
 -spec accept_member(pool_name(), pooler_starter:start_result()) -> ok.
 accept_member(PoolName, StartResult) ->
@@ -579,6 +606,14 @@ handle_call(dump_pool, _From, Pool) ->
     {reply, to_map(Pool), Pool};
 handle_call({call_free_members, Fun}, _From, #pool{free_pids = Pids} = Pool) ->
     {reply, do_call_free_members(Fun, Pids), Pool};
+%% Live reconfiguration. The complete action list is computed against the current
+%% state first; only when that succeeds are the actions applied, in list order.
+%% The reply is the planned action list, or the error with the state untouched.
+handle_call({reconfigure, NewConfig}, _From, Pool) ->
+    case calculate_reconfigure_actions(NewConfig, Pool) of
+        {ok, Actions} = Res ->
+            NewPool = lists:foldl(fun apply_reconfigure_action/2, Pool, Actions),
+            {reply, Res, NewPool};
+        {error, _} = Res ->
+            {reply, Res, Pool}
+    end;
 handle_call(_Request, _From, Pool) ->
     {noreply, Pool}.
 
@@ -1198,6 +1233,196 @@ expired_free_members(Members, Now, MaxAge) ->
         timer:now_diff(Now, LastReturn) >= MaxMicros
     ].
 
+%% Computes the ordered list of actions that make the running `Pool' match
+%% `NewConfig'. Nothing is modified here; the actions are executed separately by
+%% apply_reconfigure_action/2.
+%%
+%% Optional keys missing from `NewConfig' are taken at their default values, so
+%% leaving a key out resets it instead of keeping the current value.
+%% `init_count' and `max_count' have no default: when either is missing,
+%% `{error, {missing_parameters, Keys}}' is returned rather than letting
+%% `maps:get/2' raise `badkey' inside the pool process and crash it.
+%%
+%% Returns `{error, changed_unsupported_parameter}' when `name' or `start_mfa' are
+%% absent or differ from the running pool, and passes through any `{error, _}'
+%% thrown by mk_rec_action/4.
+-spec calculate_reconfigure_actions(pool_config(), #pool{}) -> {ok, [reconfigure_action()]} | {error, any()}.
+calculate_reconfigure_actions(
+    #{name := Name, start_mfa := MFA} = NewConfig,
+    #pool{name = PName, start_mfa = PMFA} = Pool
+) when
+    Name =:= PName,
+    MFA =:= PMFA
+->
+    Defaults = #{
+        group => undefined,
+        cull_interval => ?DEFAULT_CULL_INTERVAL,
+        max_age => ?DEFAULT_MAX_AGE,
+        member_start_timeout => ?DEFAULT_MEMBER_START_TIMEOUT,
+        auto_grow_threshold => ?DEFAULT_AUTO_GROW_THRESHOLD,
+        stop_mfa => ?DEFAULT_STOP_MFA,
+        metrics_mod => pooler_no_metrics,
+        metrics_api => folsom,
+        queue_max => ?DEFAULT_POOLER_QUEUE_MAX
+    },
+    NewWithDefaults = maps:merge(Defaults, NewConfig),
+    %% The order of this list is the order of the resulting actions.
+    Params = [
+        group,
+        init_count,
+        max_count,
+        cull_interval,
+        max_age,
+        member_start_timeout,
+        queue_max,
+        metrics_api,
+        metrics_mod,
+        stop_mfa,
+        auto_grow_threshold
+    ],
+    %% Validate the caller-supplied map before touching it with maps:get/2.
+    case [Param || Param <- Params, not maps:is_key(Param, NewWithDefaults)] of
+        [] ->
+            try
+                lists:flatmap(
+                    fun(Param) ->
+                        mk_rec_action(Param, maps:get(Param, NewWithDefaults), NewConfig, Pool)
+                    end,
+                    Params
+                )
+            of
+                Actions ->
+                    {ok, Actions}
+            catch
+                throw:{error, _} = Err ->
+                    Err
+            end;
+        Missing ->
+            {error, {missing_parameters, Missing}}
+    end;
+calculate_reconfigure_actions(_, _) ->
+    {error, changed_unsupported_parameter}.
+
+%% Builds the actions needed to move one parameter from its current value in the
+%% pool state to `New'. Arguments: parameter name, new value, the new configuration
+%% map (ignored by every clause) and the current pool state. Returns a possibly
+%% empty list of actions; the final catch-all clause returns `[]' for an unchanged
+%% parameter. The only failure raised here is the `throw' in the `max_count' clause.
+%%
+%% Group changed: record the new group, leave the old one (if any), join the new
+%% one (if any).
+mk_rec_action(group, New, _, #pool{group = Old}) when New =/= Old ->
+    [{set_parameter, {group, New}}] ++
+        case Old of
+            undefined -> [];
+            _ -> [{leave_group, Old}]
+        end ++
+        case New of
+            undefined -> [];
+            _ -> [{join_group, New}]
+        end;
+%% init_count raised: start as many workers as are missing to reach the new
+%% init_count, counting in-use plus free workers as alive.
+mk_rec_action(init_count, NewInitCount, _, #pool{init_count = OldInitCount, in_use_count = InUse, free_count = Free}) when
+    NewInitCount > OldInitCount
+->
+    AliveCount = InUse + Free,
+    [
+        {set_parameter, {init_count, NewInitCount}}
+        | case AliveCount < NewInitCount of
+            true ->
+                [{start_workers, NewInitCount - AliveCount}];
+            false ->
+                []
+        end
+    ];
+%% init_count lowered: only the stored value changes, no worker is stopped here.
+mk_rec_action(init_count, NewInitCount, _, #pool{init_count = OldInitCount}) when NewInitCount < OldInitCount ->
+    [{set_parameter, {init_count, NewInitCount}}];
+%% max_count lowered: the excess over the new limit must be covered entirely by
+%% free workers; in-use workers are never stopped, the reconfiguration is refused
+%% instead.
+mk_rec_action(max_count, NewMaxCount, _, #pool{max_count = OldMaxCount, in_use_count = InUse, free_count = Free}) when
+    NewMaxCount < OldMaxCount
+->
+    AliveCount = InUse + Free,
+    [
+        {set_parameter, {max_count, NewMaxCount}}
+        | case AliveCount > NewMaxCount of
+            true when Free >= (AliveCount - NewMaxCount) ->
+                %% We have enough free workers to shut down
+                [{stop_free_workers, AliveCount - NewMaxCount}];
+            true ->
+                %% We don't have enough free workers to shutdown
+                throw({error, {max_count, not_enough_free_workers_to_shutdown}});
+            false ->
+                []
+        end
+    ];
+%% max_count raised: only the stored value changes.
+mk_rec_action(max_count, NewMaxCount, _, #pool{max_count = OldMaxCount}) when NewMaxCount > OldMaxCount ->
+    [{set_parameter, {max_count, NewMaxCount}}];
+%% cull_interval changed: the timer is reset only when the interval gets shorter;
+%% for a longer interval no timer action is emitted.
+%% NOTE(review): if the old interval meant "culling disabled" and no timer is
+%% running, switching to a longer, non-zero interval emits no reset_cull_timer --
+%% confirm that culling still gets scheduled in that case.
+mk_rec_action(cull_interval, New, _, #pool{cull_interval = Old, cull_timer = _Timer}) when New =/= Old ->
+    [
+        {set_parameter, {cull_interval, New}}
+        | case time_as_millis(New) < time_as_millis(Old) of
+            true ->
+                [{reset_cull_timer, New}];
+            false ->
+                []
+        end
+    ];
+%% max_age changed: a shorter max_age additionally triggers an immediate cull.
+mk_rec_action(max_age, New, _, #pool{max_age = Old}) when New =/= Old ->
+    [
+        {set_parameter, {max_age, New}}
+        | case time_as_millis(New) < time_as_millis(Old) of
+            true ->
+                [{cull, []}];
+            false ->
+                []
+        end
+    ];
+mk_rec_action(member_start_timeout = P, New, _, #pool{member_start_timeout = Old}) when New =/= Old ->
+    [{set_parameter, {P, New}}];
+%% queue_max lowered: requestors queued beyond the new limit have to be dropped.
+mk_rec_action(queue_max = P, New, _, #pool{queue_max = Old, queued_requestors = Queue}) when New < Old ->
+    QLen = queue:len(Queue),
+    [
+        {set_parameter, {P, New}}
+        | case QLen > New of
+            true ->
+                [{shrink_queue, QLen - New}];
+            false ->
+                []
+        end
+    ];
+%% queue_max raised: only the stored value changes.
+mk_rec_action(queue_max = P, New, _, #pool{queue_max = Old}) when New > Old ->
+    [{set_parameter, {P, New}}];
+%% The remaining parameters have no side effects beyond storing the new value.
+mk_rec_action(metrics_api = P, New, _, #pool{metrics_api = Old}) when New =/= Old ->
+    [{set_parameter, {P, New}}];
+mk_rec_action(metrics_mod = P, New, _, #pool{metrics_mod = Old}) when New =/= Old ->
+    [{set_parameter, {P, New}}];
+mk_rec_action(stop_mfa = P, New, _, #pool{stop_mfa = Old}) when New =/= Old ->
+    [{set_parameter, {P, New}}];
+mk_rec_action(auto_grow_threshold = P, New, _, #pool{auto_grow_threshold = Old}) when New =/= Old ->
+    [{set_parameter, {P, New}}];
+mk_rec_action(_Param, _NewVal, _, _Pool) ->
+    %% not changed
+    [].
+
+%% Executes a single reconfiguration action inside the pool process and returns
+%% the updated pool state. Argument order (action first, state second) matches
+%% its use as the fun of `lists:foldl/3'.
+-spec apply_reconfigure_action(reconfigure_action(), #pool{}) -> #pool{}.
+apply_reconfigure_action({set_parameter, {Key, NewValue}}, State) ->
+    set_parameter(Key, NewValue, State);
+apply_reconfigure_action({start_workers, N}, State) ->
+    add_members_async(N, State);
+apply_reconfigure_action({stop_free_workers, N}, #pool{free_pids = FreePids} = State) ->
+    %% Remove the first N free workers, threading the state through each removal.
+    Victims = lists:sublist(FreePids, N),
+    lists:foldl(fun remove_pid/2, State, Victims);
+apply_reconfigure_action({shrink_queue, N}, #pool{queued_requestors = Waiting} = State) ->
+    %% The first N entries of the queue are rejected, the remainder stays queued.
+    %% NOTE(review): the timer reference stored with each rejected entry is not
+    %% cancelled here -- confirm that this is harmless.
+    {Rejected, Kept} = lists:split(N, queue:to_list(Waiting)),
+    _ = [gen_server:reply(Requestor, error_no_members) || {Requestor, _TRef} <- Rejected],
+    State#pool{queued_requestors = queue:from_list(Kept)};
+apply_reconfigure_action({reset_cull_timer, Interval}, #pool{cull_timer = OldTimer} = State) ->
+    %% The stored timer may be something other than a reference; cancel only a real one.
+    _ = is_reference(OldTimer) andalso erlang:cancel_timer(OldTimer),
+    NewTimer = schedule_cull(self(), Interval),
+    State#pool{cull_timer = NewTimer};
+apply_reconfigure_action({cull, _}, State) ->
+    cull_members_from_pool(State);
+apply_reconfigure_action({join_group, GroupName}, State) ->
+    Self = self(),
+    ok = pg_create(GroupName),
+    ok = pg_join(GroupName, Self),
+    State;
+apply_reconfigure_action({leave_group, GroupName}, State) ->
+    Self = self(),
+    ok = pg_leave(GroupName, Self),
+    State.
+
+%% Overwrites the pool-state field named by the first argument with `NewValue'
+%% and returns the updated state. There is one clause per reconfigurable field;
+%% any other field name fails with `function_clause'.
+set_parameter(group, NewValue, State) ->
+    State#pool{group = NewValue};
+set_parameter(init_count, NewValue, State) ->
+    State#pool{init_count = NewValue};
+set_parameter(max_count, NewValue, State) ->
+    State#pool{max_count = NewValue};
+set_parameter(cull_interval, NewValue, State) ->
+    State#pool{cull_interval = NewValue};
+set_parameter(max_age, NewValue, State) ->
+    State#pool{max_age = NewValue};
+set_parameter(member_start_timeout, NewValue, State) ->
+    State#pool{member_start_timeout = NewValue};
+set_parameter(queue_max, NewValue, State) ->
+    State#pool{queue_max = NewValue};
+set_parameter(metrics_api, NewValue, State) ->
+    State#pool{metrics_api = NewValue};
+set_parameter(metrics_mod, NewValue, State) ->
+    State#pool{metrics_mod = NewValue};
+set_parameter(stop_mfa, NewValue, State) ->
+    State#pool{stop_mfa = NewValue};
+set_parameter(auto_grow_threshold, NewValue, State) ->
+    State#pool{auto_grow_threshold = NewValue}.
+
 %% Send a metric using the metrics module from application config or
 %% do nothing.
 -spec send_metric(
@@ -1410,6 +1635,9 @@ pg_create(_Group) ->
 pg_join(Group, Pid) ->
     pg:join(Group, Pid).
 
+%% Removes `Pid' from process group `Group' using the `pg' module.
+%% NOTE(review): the caller matches the result against `ok'; `pg:leave/2' is
+%% documented to return `not_joined' when the pid is not a member -- confirm that
+%% cannot happen, otherwise the match crashes the pool process.
+pg_leave(Group, Pid) ->
+    pg:leave(Group, Pid).
+
 -else.
 
 pg_get_local_members(GroupName) ->
@@ -1427,4 +1655,7 @@ pg_create(Group) ->
 pg_join(Group, Pid) ->
     pg2:join(Group, Pid).
 
+%% Removes `Pid' from process group `Group' using the legacy `pg2' module.
+%% NOTE(review): the caller matches the result against `ok'; `pg2:leave/2' is
+%% documented to return `{error, {no_such_group, Group}}' for an unknown group --
+%% confirm that cannot happen, otherwise the match crashes the pool process.
+pg_leave(Group, Pid) ->
+    pg2:leave(Group, Pid).
+
 -endif.

+ 9 - 1
src/pooler_app.erl

@@ -3,7 +3,7 @@
 -behaviour(application).
 
 %% Application callbacks
--export([start/2, stop/1]).
+-export([start/2, stop/1, config_change/3]).
 
 %% ===================================================================
 %% Application callbacks
@@ -14,3 +14,11 @@ start(_StartType, _StartArgs) ->
 
 stop(_State) ->
     ok.
+
+%% `application' behaviour callback for changes of the application environment.
+%% Currently a no-op: all three arguments are ignored and `ok' is returned, so
+%% running pools are not updated from here.
+config_change(_Changed, _New, _Removed) ->
+    %% TODO: implement.
+    %% Only 3 keys are in use right now:
+    %% * pools (would require a custom diff function)
+    %% * metrics_module
+    %% * metrics_api
+    ok.

+ 242 - 0
test/pooler_tests.erl

@@ -1286,6 +1286,248 @@ no_error_logger_reports_after_culling_test_() ->
             end}
         ]}.
 
+%% Exercises pooler:pool_reconfigure/2 against a pool configured through the
+%% application environment with init_count = 2 and max_count = 4. The `foreach'
+%% fixture starts the pooler application before every case and stops it afterwards.
+%% Each case asserts the exact action list returned and, where relevant, the
+%% resulting pool state as reported by the `dump_pool' call.
+reconfigure_test_() ->
+    Name = test_pool_1,
+    InitCount = 2,
+    MaxCount = 4,
+    StartConfig = #{
+        name => Name,
+        max_count => MaxCount,
+        init_count => InitCount,
+        start_mfa => {pooled_gs, start_link, [{reconfigure_test}]}
+    },
+    {foreach,
+        fun() ->
+            logger:set_handler_config(default, filters, []),
+            application:set_env(pooler, pools, [StartConfig]),
+            application:set_env(pooler, metrics_module, pooler_no_metrics),
+            application:start(pooler)
+        end,
+        fun(_) ->
+            application:unset_env(pooler, pools),
+            application:stop(pooler)
+        end,
+        [
+            {"Raise init_count", fun() ->
+                Config1 = StartConfig#{init_count => 3},
+                ?assertEqual(
+                    {ok, [
+                        {set_parameter, {init_count, 3}},
+                        {start_workers, 1}
+                    ]},
+                    pooler:pool_reconfigure(Name, Config1)
+                ),
+                %% The extra worker is started asynchronously, so poll for it.
+                ?assertMatch(
+                    #{
+                        init_count := 3,
+                        free_count := 3,
+                        free_pids := [_, _, _]
+                    },
+                    wait_for_dump(Name, 5000, fun(#{free_count := C}) -> C =:= 3 end)
+                )
+            end},
+            {"Lower max_count", fun() ->
+                Config1 = StartConfig#{max_count => 1, init_count => 1},
+                ?assertEqual(
+                    {ok, [
+                        {set_parameter, {init_count, 1}},
+                        {set_parameter, {max_count, 1}},
+                        {stop_free_workers, 1}
+                    ]},
+                    pooler:pool_reconfigure(Name, Config1)
+                ),
+                ?assertMatch(
+                    #{
+                        init_count := 1,
+                        max_count := 1,
+                        free_count := 1,
+                        free_pids := [_]
+                    },
+                    wait_for_dump(Name, 5000, fun(#{free_count := C}) -> C =:= 1 end)
+                )
+            end},
+            {"Lower queue_max", fun() ->
+                NewQMax = 4,
+                NewConfig = StartConfig#{queue_max => NewQMax},
+                Parent = self(),
+                NumClients = 10,
+                %% Each client reports the result of take_member to the test process
+                %% and then stays alive so that a member it got is not given back.
+                _Clients = lists:map(
+                    fun(_) ->
+                        spawn_link(
+                            fun() ->
+                                Parent ! pooler:take_member(Name, 60000),
+                                timer:sleep(60000)
+                            end
+                        )
+                    end,
+                    lists:seq(1, NumClients)
+                ),
+                %% Give the clients time to reach the pool before reconfiguring.
+                timer:sleep(100),
+                % 10 clients - 4 members = 6 queued requestors
+                QueueSize = NumClients - MaxCount,
+                % 6 queued - new limit of 4 = 2 requestors to drop
+                Shrink = QueueSize - NewQMax,
+                ?assertEqual(
+                    {ok, [
+                        {set_parameter, {queue_max, NewQMax}},
+                        {shrink_queue, Shrink}
+                    ]},
+                    pooler:pool_reconfigure(Name, NewConfig)
+                ),
+                %% Expected messages, in order: four served clients, then the two
+                %% dropped ones.
+                ?assertMatch(
+                    [
+                        W1,
+                        W2,
+                        W3,
+                        W4,
+                        error_no_members,
+                        error_no_members
+                    ] when
+                        is_pid(W1) andalso
+                            is_pid(W2) andalso
+                            is_pid(W3) andalso
+                            is_pid(W4),
+                    [
+                        receive
+                            M -> M
+                        after 5000 -> error(timeout)
+                        end
+                     || _ <- lists:seq(1, MaxCount + Shrink)
+                    ]
+                ),
+                #{
+                    queue_max := QMax,
+                    queued_requestors := Q
+                } = gen_server:call(Name, dump_pool),
+                % queue_max in the state is updated
+                ?assertEqual(QMax, NewQMax),
+                % queue is full
+                ?assertEqual(NewQMax, queue:len(Q))
+            end},
+            {"Lower cull_interval", fun() ->
+                NewConfig = StartConfig#{cull_interval => {10, sec}},
+                ?assertEqual(
+                    {ok, [
+                        {set_parameter, {cull_interval, {10, sec}}},
+                        {reset_cull_timer, {10, sec}}
+                    ]},
+                    pooler:pool_reconfigure(Name, NewConfig)
+                )
+            end},
+            {"Lower max_age", fun() ->
+                NewConfig = StartConfig#{max_age => {100, ms}},
+                %% Take and return max_count members so the pool holds more than
+                %% init_count free workers before max_age is lowered.
+                Workers = [pooler:take_member(Name, 5000) || _ <- lists:seq(1, MaxCount)],
+                [pooler:return_member(Name, Pid) || Pid <- Workers],
+                ?assertEqual(
+                    {ok, [
+                        {set_parameter, {max_age, {100, ms}}},
+                        {cull, []}
+                    ]},
+                    pooler:pool_reconfigure(Name, NewConfig)
+                ),
+                %% make sure workers are culled
+                %% (every poll also sends cull_pool to the pool to force another cull)
+                wait_for_dump(
+                    Name,
+                    1000,
+                    fun(#{free_count := Free}) ->
+                        Name ! cull_pool,
+                        Free =:= InitCount
+                    end
+                )
+            end},
+            {"Update group", fun() ->
+                %% no group -> my_group1
+                NewConfig1 = StartConfig#{group => my_group1},
+                ?assertEqual(
+                    {ok, [
+                        {set_parameter, {group, my_group1}},
+                        {join_group, my_group1}
+                    ]},
+                    pooler:pool_reconfigure(Name, NewConfig1)
+                ),
+                PoolPid = whereis(Name),
+                ?assertMatch([PoolPid], pooler:group_pools(my_group1)),
+                %% my_group1 -> my_group2
+                NewConfig2 = StartConfig#{group => my_group2},
+                ?assertEqual(
+                    {ok, [
+                        {set_parameter, {group, my_group2}},
+                        {leave_group, my_group1},
+                        {join_group, my_group2}
+                    ]},
+                    pooler:pool_reconfigure(Name, NewConfig2)
+                ),
+                ?assertMatch([], pooler:group_pools(my_group1)),
+                ?assertMatch([PoolPid], pooler:group_pools(my_group2)),
+                %% my_group2 -> no group: StartConfig has no `group' key, which
+                %% reconfiguration treats as `undefined'.
+                ?assertEqual(
+                    {ok, [
+                        {set_parameter, {group, undefined}},
+                        {leave_group, my_group2}
+                    ]},
+                    pooler:pool_reconfigure(Name, StartConfig)
+                ),
+                ?assertMatch([], pooler:group_pools(my_group1)),
+                ?assertMatch([], pooler:group_pools(my_group2))
+            end},
+            {"Change basic configs", fun() ->
+                %% Parameters whose change only produces set_parameter actions.
+                NewMaxCount = MaxCount + 5,
+                NewConfig = StartConfig#{
+                    max_count => NewMaxCount,
+                    member_start_timeout => {10, sec},
+                    queue_max => 100,
+                    metrics_mod => fake_metrics,
+                    stop_mfa => {erlang, exit, ['$pooler_pid', shutdown]},
+                    auto_grow_threshold => 1
+                },
+                ?assertEqual(
+                    {ok, [
+                        {set_parameter, {max_count, NewMaxCount}},
+                        {set_parameter, {member_start_timeout, {10, sec}}},
+                        {set_parameter, {queue_max, 100}},
+                        {set_parameter, {metrics_mod, fake_metrics}},
+                        {set_parameter, {stop_mfa, {erlang, exit, ['$pooler_pid', shutdown]}}},
+                        {set_parameter, {auto_grow_threshold, 1}}
+                    ]},
+                    pooler:pool_reconfigure(Name, NewConfig)
+                ),
+                ?assertMatch(
+                    #{
+                        max_count := NewMaxCount,
+                        member_start_timeout := {10, sec},
+                        queue_max := 100,
+                        metrics_mod := fake_metrics,
+                        stop_mfa := {erlang, exit, ['$pooler_pid', shutdown]},
+                        auto_grow_threshold := 1
+                    },
+                    gen_server:call(Name, dump_pool)
+                )
+            end},
+            {"Update failed", fun() ->
+                %% Neither start_mfa nor name may differ from the running pool.
+                ?assertEqual(
+                    {error, changed_unsupported_parameter},
+                    pooler:pool_reconfigure(
+                        Name, StartConfig#{start_mfa := {erlang, spawn, [a, b, []]}}
+                    )
+                ),
+                ?assertEqual(
+                    {error, changed_unsupported_parameter},
+                    pooler:pool_reconfigure(
+                        Name, StartConfig#{name := not_a_pool_name}
+                    )
+                )
+            end}
+        ]}.
+
+%% Polls the pool's `dump_pool' state until `Pred' accepts it and returns that
+%% state. `Remaining' is a budget in milliseconds that is reduced by 50 after each
+%% failed attempt (one 50 ms sleep per attempt); once it is used up the call fails
+%% with `error(timeout)'. With a non-positive budget the pool is never queried.
+wait_for_dump(_Pool, Remaining, _Pred) when Remaining =< 0 ->
+    error(timeout);
+wait_for_dump(Pool, Remaining, Pred) ->
+    Snapshot = gen_server:call(Pool, dump_pool),
+    case Pred(Snapshot) of
+        false ->
+            timer:sleep(50),
+            wait_for_dump(Pool, Remaining - 50, Pred);
+        true ->
+            Snapshot
+    end.
+
 monitor_members_trigger_culling_and_return_reason() ->
     Pids = get_n_pids(test_pool_1, 3, []),
     [erlang:monitor(process, P) || P <- Pids],