Browse Source

Add option `stop_mfa`

Juan Puig 10 years ago
parent
commit
8dc7020e15
4 changed files with 75 additions and 3 deletions
  1. 24 3
      src/pooler.erl
  2. 6 0
      src/pooler.hrl
  3. 1 0
      src/pooler_config.erl
  4. 44 0
      test/pooler_tests.erl

+ 24 - 3
src/pooler.erl

@@ -689,7 +689,8 @@ cpmap_remove(Pid, CPid, CPMap) ->
 remove_pid(Pid, Pool) ->
     #pool{name = PoolName,
           all_members = AllMembers,
-          consumer_to_pid = CPMap} = Pool,
+          consumer_to_pid = CPMap,
+          stop_mfa = StopMFA} = Pool,
     case dict:find(Pid, AllMembers) of
         {ok, {MRef, free, _Time}} ->
             % remove an unused member
@@ -697,7 +698,7 @@ remove_pid(Pid, Pool) ->
             FreePids = lists:delete(Pid, Pool#pool.free_pids),
             NumFree = Pool#pool.free_count - 1,
             Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
-            exit(Pid, kill),
+            terminate_pid(Pid, StopMFA),
             send_metric(Pool1, killed_free_count, {inc, 1}, counter),
             Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
         {ok, {MRef, CPid, _Time}} ->
@@ -705,7 +706,7 @@ remove_pid(Pid, Pool) ->
             %% the consumer.
             erlang:demonitor(MRef, [flush]),
             Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
-            exit(Pid, kill),
+            terminate_pid(Pid, StopMFA),
             send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
             Pool1#pool{consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
                        all_members = dict:erase(Pid, AllMembers)};
@@ -879,3 +880,23 @@ maybe_reply({Member, NewPool}) ->
         Member when is_pid(Member) ->
             {reply, Member, NewPool}
     end.
+
+%% Implementation of a best-effort termination for a pool member:
+%% Terminates the pid's pool member given a MFA that gets applied. The list
+%% of arguments must contain the fixed atom ?POOLER_PID, which is replaced
+%% by the target pid. Failure to provide a valid MFA will lead to use the
+%% default callback, i.e `erlang:exit(Pid, kill)`.
+-spec terminate_pid(pid(), {atom(), atom(), [term()]}) -> ok.
+terminate_pid(Pid, {Mod, Fun, Args}) when is_list(Args) ->
+    NewArgs = [case Arg of
+                   ?POOLER_PID -> Pid;
+                   _ -> Arg
+               end || Arg <- Args],
+    case catch erlang:apply(Mod, Fun, NewArgs) of
+        {'EXIT', _} ->
+            terminate_pid(Pid, ?DEFAULT_STOP_MFA);
+        _Result ->
+            ok
+    end;
+terminate_pid(Pid, _) ->
+    terminate_pid(Pid, ?DEFAULT_STOP_MFA).

+ 6 - 0
src/pooler.hrl

@@ -5,6 +5,8 @@
 -define(DEFAULT_AUTO_GROW_THRESHOLD, undefined).
 -define(POOLER_GROUP_TABLE, pooler_group_table).
 -define(DEFAULT_POOLER_QUEUE_MAX, 50).
+-define(POOLER_PID, '$pooler_pid').
+-define(DEFAULT_STOP_MFA, {erlang, exit, [?POOLER_PID, kill]}).
 
 -type member_info() :: {string(), free | pid(), {_, _, _}}.
 -type free_member_info() :: {string(), free, {_, _, _}}.
@@ -80,6 +82,10 @@
           %% behavior (start members before they're actually needed).
           auto_grow_threshold = ?DEFAULT_AUTO_GROW_THRESHOLD :: undefined | non_neg_integer(),
 
+          %% Stop callback to gracefully attempt to terminate pool members.
+          %% The list of arguments must contain the fixed atom '$pooler_pid'.
+          stop_mfa = ?DEFAULT_STOP_MFA :: {atom(), atom(), [term()]},
+
           %% The module to use for collecting metrics. If set to
           %% 'pooler_no_metrics', then metric sending calls do
           %% nothing. A typical value to actually capture metrics is

+ 1 - 0
src/pooler_config.erl

@@ -21,6 +21,7 @@ list_to_pool(P) ->
        max_age           = ?gv(max_age, P, ?DEFAULT_MAX_AGE),
        member_start_timeout = ?gv(member_start_timeout, P, ?DEFAULT_MEMBER_START_TIMEOUT),
        auto_grow_threshold = ?gv(auto_grow_threshold, P, ?DEFAULT_AUTO_GROW_THRESHOLD),
+       stop_mfa          = ?gv(stop_mfa, P, ?DEFAULT_STOP_MFA),
        metrics_mod       = ?gv(metrics_mod, P, pooler_no_metrics),
        metrics_api       = ?gv(metrics_api, P, folsom),
        queue_max         = ?gv(queue_max, P, ?DEFAULT_POOLER_QUEUE_MAX)}.

+ 44 - 0
test/pooler_tests.erl

@@ -1060,6 +1060,50 @@ pooler_auto_grow_enabled_test_() ->
        end}
      ]}}.
 
+pooler_custom_stop_mfa_test_() ->
+    {foreach,
+     fun() ->
+             Pool = [{name, test_pool_1},
+                     {max_count, 3},
+                     {init_count, 2},
+                     {cull_interval, {200, ms}},
+                     {max_age, {0, min}},
+                     {start_mfa, {pooled_gs, start_link, [{foo_type}]}}],
+             application:set_env(pooler, pools, [Pool])
+     end,
+     fun(_) ->
+             application:unset_env(pooler, pools)
+     end,
+     [
+      {"default behavior kills the pool member",
+       fun() ->
+               ok = application:start(pooler),
+               Reason = monitor_members_trigger_culling_and_return_reason(),
+               ok = application:stop(pooler),
+               ?assertEqual(killed, Reason)
+       end},
+      {"custom callback terminates the pool member normally",
+       fun() ->
+               {ok, [Pool]} = application:get_env(pooler, pools),
+               Stop = {stop_mfa, {pooled_gs, stop, ['$pooler_pid']}},
+               application:set_env(pooler, pools, [[Stop | Pool]]),
+               ok = application:start(pooler),
+               Reason = monitor_members_trigger_culling_and_return_reason(),
+               ok = application:stop(pooler),
+               ?assertEqual(normal, Reason)
+       end}]}.
+
+monitor_members_trigger_culling_and_return_reason() ->
+    Pids = get_n_pids(test_pool_1, 3, []),
+    [ erlang:monitor(process, P) || P <- Pids ],
+    [ pooler:return_member(test_pool_1, P) || P <- Pids ],
+    receive
+        {'DOWN', _Ref, process, _Pid, Reason} ->
+            Reason
+    after
+        250 -> timeout
+    end.
+
 time_as_millis_test_() ->
     Zeros = [ {{0, U}, 0} || U <- [min, sec, ms, mu] ],
     Ones = [{{1, min}, 60000},