Browse Source

Merge pull request #66 from sata/fix_default_terminate

Terminating pooled children through supervisor
Seth Falcon 8 years ago
parent
commit
521f568bf9
6 changed files with 161 additions and 13 deletions
  1. 17 10
      src/pooler.erl
  2. 4 1
      src/pooler.hrl
  3. 5 2
      src/pooler_pool_sup.erl
  4. 65 0
      test/error_logger_mon.erl
  5. 25 0
      test/error_logger_pooler_h.erl
  6. 45 0
      test/pooler_tests.erl

+ 17 - 10
src/pooler.erl

@@ -729,7 +729,7 @@ remove_pid(Pid, Pool) ->
             FreePids = lists:delete(Pid, Pool#pool.free_pids),
             NumFree = Pool#pool.free_count - 1,
             Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
-            terminate_pid(Pid, StopMFA),
+            terminate_pid(PoolName, Pid, StopMFA),
             send_metric(Pool1, killed_free_count, {inc, 1}, counter),
             Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
         {ok, {MRef, CPid, _Time}} ->
@@ -737,7 +737,7 @@ remove_pid(Pid, Pool) ->
             %% the consumer.
             erlang:demonitor(MRef, [flush]),
             Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
-            terminate_pid(Pid, StopMFA),
+            terminate_pid(PoolName, Pid, StopMFA),
             send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
             Pool1#pool{consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
                        all_members = dict:erase(Pid, AllMembers)};
@@ -911,20 +911,27 @@ maybe_reply({Member, NewPool}) ->
 %% Terminates the pid's pool member given a MFA that gets applied. The list
 %% of arguments must contain the fixed atom ?POOLER_PID, which is replaced
 %% by the target pid. Failure to provide a valid MFA will lead to use the
-%% default callback, i.e `erlang:exit(Pid, kill)`.
--spec terminate_pid(pid(), {atom(), atom(), [term()]}) -> ok.
-terminate_pid(Pid, {Mod, Fun, Args}) when is_list(Args) ->
-    NewArgs = [case Arg of
-                   ?POOLER_PID -> Pid;
-                   _ -> Arg
-               end || Arg <- Args],
+%% default callback.
+-spec terminate_pid(atom(), pid(), {atom(), atom(), [term()]}) -> ok.
+terminate_pid(PoolName, Pid, {Mod, Fun, Args}) when is_list(Args) ->
+    NewArgs = replace_placeholders(PoolName, Pid, Args),
     case catch erlang:apply(Mod, Fun, NewArgs) of
         {'EXIT', _} ->
-            terminate_pid(Pid, ?DEFAULT_STOP_MFA);
+            terminate_pid(PoolName, Pid, ?DEFAULT_STOP_MFA);
         _Result ->
             ok
     end.
 
+replace_placeholders(Name, Pid, Args) ->
+  [case Arg of
+     ?POOLER_POOL_NAME ->
+       pooler_pool_sup:build_member_sup_name(Name);
+     ?POOLER_PID ->
+       Pid;
+     _ ->
+       Arg
+   end || Arg <- Args].
+
 do_call_free_members(Fun, Pids) ->
     [do_call_free_member(Fun, P) || P <- Pids].
 

+ 4 - 1
src/pooler.hrl

@@ -5,8 +5,11 @@
 -define(DEFAULT_AUTO_GROW_THRESHOLD, undefined).
 -define(POOLER_GROUP_TABLE, pooler_group_table).
 -define(DEFAULT_POOLER_QUEUE_MAX, 50).
+-define(POOLER_POOL_NAME, '$pooler_pool_name').
 -define(POOLER_PID, '$pooler_pid').
--define(DEFAULT_STOP_MFA, {erlang, exit, [?POOLER_PID, kill]}).
+-define(DEFAULT_STOP_MFA, {supervisor,
+                           terminate_child,
+                           [?POOLER_POOL_NAME, ?POOLER_PID]}).
 
 -type member_info() :: {string(), free | pid(), {_, _, _}}.
 -type free_member_info() :: {string(), free, {_, _, _}}.

+ 5 - 2
src/pooler_pool_sup.erl

@@ -4,7 +4,8 @@
 
 -export([start_link/1, init/1,
          pool_sup_name/1,
-         member_sup_name/1]).
+         member_sup_name/1,
+         build_member_sup_name/1]).
 
 -include("pooler.hrl").
 
@@ -25,8 +26,10 @@ init(#pool{} = Pool) ->
     Restart = {one_for_all, 5, 60},
     {ok, {Restart, [MemberSupSpec, PoolerSpec]}}.
 
-
 member_sup_name(#pool{name = PoolName}) ->
+    build_member_sup_name(PoolName).
+
+build_member_sup_name(PoolName) ->
     list_to_atom("pooler_" ++ atom_to_list(PoolName) ++ "_member_sup").
 
 pool_sup_name(#pool{name = PoolName}) ->

+ 65 - 0
test/error_logger_mon.erl

@@ -0,0 +1,65 @@
+%%% A gen_server to check if we get any error_logger messages during test to see if
+%%% any messages gets generated when they shouldn't
+
+-module(error_logger_mon).
+
+-behaviour(gen_server).
+-define(SERVER, ?MODULE).
+
+-record(state, {count = 0 :: integer()}).
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+%% gen_server
+-export([start_link/0,
+         report/0,
+         get_msg_count/0,
+         stop/0
+        ]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+report() ->
+    gen_server:call(?SERVER, report).
+
+get_msg_count() ->
+    gen_server:call(?SERVER, get_count).
+
+stop() ->
+    gen_server:call(?SERVER, stop).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+init([]) ->
+    {ok, #state{}}.
+
+handle_call(get_count, _From, #state{count = C} = State) ->
+    {reply, C, State};
+handle_call(report, _From, #state{count = C} = State) ->
+    {reply, ok, State#state{count = C+1}};
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State};
+handle_call(_Request, _From, State) ->
+    {reply, error, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.

+ 25 - 0
test/error_logger_pooler_h.erl

@@ -0,0 +1,25 @@
+%%% report handler to add to error_logger for calling error_logger_mon
+%%% during test
+-module(error_logger_pooler_h).
+
+-export([init/1,
+	 handle_event/2,
+         handle_call/2,
+         handle_info/2,
+	 terminate/2]).
+
+init(T) ->
+    {ok, T}.
+
+handle_event(_Event, Type) ->
+    error_logger_mon:report(),
+    {ok, Type}.
+
+handle_info(_, Type) ->
+    {ok, Type}.
+
+handle_call(_Query, _Type) ->
+    {error, bad_query}.
+
+terminate(_Reason, _Type) ->
+    [].

+ 45 - 0
test/pooler_tests.erl

@@ -1098,6 +1098,51 @@ pooler_custom_stop_mfa_test_() ->
                ?assertEqual(normal, Reason)
        end}]}.
 
+no_error_logger_reports_after_culling_test_() ->
+    %% damn those wraiths! This is the cure
+    {foreach,
+     fun() ->
+             {ok, _Pid} = error_logger_mon:start_link(),
+             Pool = [{name, test_pool_1},
+                     {max_count, 3},
+                     {init_count, 2},
+                     {cull_interval, {200, ms}},
+                     {max_age, {0, min}},
+                     {start_mfa, {pooled_gs, start_link, [{type}]}}],
+             application:set_env(pooler, pools, [Pool])
+     end,
+     fun(_) ->
+             ok = error_logger_mon:stop(),
+             error_logger:delete_report_handler(error_logger_pooler_h),
+             application:unset_env(pooler, pools)
+     end,
+     [
+      {"Force supervisor to report by using exit/2 instead of terminate_child",
+       fun() ->
+               {ok, [Pool]} = application:get_env(pooler, pools),
+               Stop = {stop_mfa, {erlang, exit, ['$pooler_pid', kill]}},
+               application:set_env(pooler, pools, [[Stop | Pool]]),
+               ok = application:start(pooler),
+               error_logger:add_report_handler(error_logger_pooler_h),
+               Reason = monitor_members_trigger_culling_and_return_reason(),
+               error_logger:delete_report_handler(error_logger_pooler_h),
+               ok = application:stop(pooler),
+               ?assertEqual(killed, Reason),
+               ?assertEqual(1, error_logger_mon:get_msg_count())
+       end},
+      {"Default MFA shouldn't generate any reports during culling",
+       fun() ->
+               ok = application:start(pooler),
+               error_logger:add_report_handler(error_logger_pooler_h),
+               Reason = monitor_members_trigger_culling_and_return_reason(),
+               error_logger:delete_report_handler(error_logger_pooler_h),
+               ok = application:stop(pooler),
+               ?assertEqual(killed, Reason),
+               ?assertEqual(0, error_logger_mon:get_msg_count())
+       end}
+     ]}.
+
+
 monitor_members_trigger_culling_and_return_reason() ->
     Pids = get_n_pids(test_pool_1, 3, []),
     [ erlang:monitor(process, P) || P <- Pids ],