Browse Source

Merge branch 'cull_schedule'

Seth Falcon 13 years ago
parent
commit
d66fafd3f2
2 changed files with 185 additions and 69 deletions
  1. 102 48
      src/pooler.erl
  2. 83 21
      test/pooler_test.erl

+ 102 - 48
src/pooler.erl

@@ -1,5 +1,5 @@
 %% @author Seth Falcon <seth@userprimary.net>
 %% @author Seth Falcon <seth@userprimary.net>
-%% @copyright 2011 Seth Falcon
+%% @copyright 2011-2012 Seth Falcon
 %% @doc This is the main interface to the pooler application
 %% @doc This is the main interface to the pooler application
 %%
 %%
 %% To integrate with your application, you probably want to call
 %% To integrate with your application, you probably want to call
@@ -11,12 +11,26 @@
 -module(pooler).
 -module(pooler).
 -behaviour(gen_server).
 -behaviour(gen_server).
 -define(SERVER, ?MODULE).
 -define(SERVER, ?MODULE).
+
 -define(DEFAULT_ADD_RETRY, 1).
 -define(DEFAULT_ADD_RETRY, 1).
+-define(DEFAULT_CULL_INTERVAL, {0, min}).
+-define(DEFAULT_MAX_AGE, {0, min}).
 
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 
 -type member_info() :: {string(), free | pid(), {_, _, _}}.
 -type member_info() :: {string(), free | pid(), {_, _, _}}.
 -type free_member_info() :: {string(), free, {_, _, _}}.
 -type free_member_info() :: {string(), free, {_, _, _}}.
+-type time_unit() :: min | sec | ms | mu.
+-type time_spec() :: {non_neg_integer(), time_unit()}.
+
+%% type specs for pool metrics
+-type metric_label() :: binary().
+-type metric_value() :: 'unknown_pid' |
+                        non_neg_integer() |
+                        {'add_pids_failed', non_neg_integer(), non_neg_integer()} |
+                        {'inc',1} |
+                        'error_no_members'.
+-type metric_type() :: 'counter' | 'histogram' | 'history' | 'meter'.
 
 
 -record(pool, {
 -record(pool, {
           name             :: string(),
           name             :: string(),
@@ -32,7 +46,14 @@
           %% returned by a call to take_member. NOTE: this value
           %% returned by a call to take_member. NOTE: this value
           %% should be >= 2 or else the pool will not grow on demand
           %% should be >= 2 or else the pool will not grow on demand
           %% when max_count is larger than init_count.
           %% when max_count is larger than init_count.
-          add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer()
+          add_member_retry = ?DEFAULT_ADD_RETRY :: non_neg_integer(),
+
+          %% The interval to schedule a cull message. Both
+          %% 'cull_interval' and 'max_age' are specified using a
+          %% `time_spec()' type.
+          cull_interval = ?DEFAULT_CULL_INTERVAL :: time_spec(),
+          %% The maximum age for members.
+          max_age = ?DEFAULT_MAX_AGE             :: time_spec()
          }).
          }).
 
 
 -record(state, {
 -record(state, {
@@ -60,8 +81,7 @@
          return_member/2,
          return_member/2,
          % remove_pool/2,
          % remove_pool/2,
          % add_pool/1,
          % add_pool/1,
-         pool_stats/0,
-         cull_pool/2]).
+         pool_stats/0]).
 
 
 %% ------------------------------------------------------------------
 %% ------------------------------------------------------------------
 %% gen_server Function Exports
 %% gen_server Function Exports
@@ -74,6 +94,11 @@
          terminate/2,
          terminate/2,
          code_change/3]).
          code_change/3]).
 
 
+%% To help with testing internal functions
+-ifdef(TEST).
+-compile([export_all]).
+-endif.
+
 %% ------------------------------------------------------------------
 %% ------------------------------------------------------------------
 %% API Function Definitions
 %% API Function Definitions
 %% ------------------------------------------------------------------
 %% ------------------------------------------------------------------
@@ -144,15 +169,6 @@ return_member(error_no_members) ->
 pool_stats() ->
 pool_stats() ->
     gen_server:call(?SERVER, pool_stats).
     gen_server:call(?SERVER, pool_stats).
 
 
-%% @doc Remove members whose last return timestamp is older than
-%% `MaxAgeMin' minutes.
-%%
-%% EXPERIMENTAL
-%%
--spec cull_pool(string(), non_neg_integer()) -> ok.
-cull_pool(PoolName, MaxAgeMin) when MaxAgeMin >= 0 ->
-    gen_server:call(?SERVER, {cull_pool, PoolName, MaxAgeMin}).
-
 %% ------------------------------------------------------------------
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
 %% ------------------------------------------------------------------
@@ -176,8 +192,10 @@ init(Config) ->
                     pool_sups = dict:from_list(PoolSups),
                     pool_sups = dict:from_list(PoolSups),
                     pool_selector = array:from_list([PN || {PN, _} <- Pools])
                     pool_selector = array:from_list([PN || {PN, _} <- Pools])
                   },
                   },
+
     lists:foldl(fun(#pool{name = PName, init_count = N}, {ok, AccState}) ->
     lists:foldl(fun(#pool{name = PName, init_count = N}, {ok, AccState}) ->
-                        add_pids(PName, N, AccState)
+                        AccState1 = cull_members(PName, AccState),
+                        add_pids(PName, N, AccState1)
                 end, {ok, State0}, PoolRecs).
                 end, {ok, State0}, PoolRecs).
 
 
 handle_call(take_member, {CPid, _Tag},
 handle_call(take_member, {CPid, _Tag},
@@ -206,8 +224,6 @@ handle_call(stop, _From, State) ->
     {stop, normal, stop_ok, State};
     {stop, normal, stop_ok, State};
 handle_call(pool_stats, _From, State) ->
 handle_call(pool_stats, _From, State) ->
     {reply, dict:to_list(State#state.all_members), State};
     {reply, dict:to_list(State#state.all_members), State};
-handle_call({cull_pool, PoolName, MaxAgeMin}, _From, State) ->
-    {reply, ok, cull_members(PoolName, MaxAgeMin, State)};
 handle_call(_Request, _From, State) ->
 handle_call(_Request, _From, State) ->
     {noreply, State}.
     {noreply, State}.
 
 
@@ -238,6 +254,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
                 end
                 end
         end,
         end,
     {noreply, State1};
     {noreply, State1};
+handle_info({cull_pool, PoolName}, State) ->
+    {noreply, cull_members(PoolName, State)};
 handle_info(_Info, State) ->
 handle_info(_Info, State) ->
     {noreply, State}.
     {noreply, State}.
 
 
@@ -259,7 +277,9 @@ props_to_pool(P) ->
            max_count = ?gv(max_count, P),
            max_count = ?gv(max_count, P),
           init_count = ?gv(init_count, P),
           init_count = ?gv(init_count, P),
            start_mfa = ?gv(start_mfa, P),
            start_mfa = ?gv(start_mfa, P),
- add_member_retry = ?gv(add_member_retry, P, ?DEFAULT_ADD_RETRY)}.
+    add_member_retry = ?gv(add_member_retry, P, ?DEFAULT_ADD_RETRY),
+       cull_interval = ?gv(cull_interval, P, ?DEFAULT_CULL_INTERVAL),
+             max_age = ?gv(max_age, P, ?DEFAULT_MAX_AGE)}.
 
 
 % FIXME: creation of new pids should probably happen
 % FIXME: creation of new pids should probably happen
 % in a spawned process to avoid tying up the loop.
 % in a spawned process to avoid tying up the loop.
@@ -518,46 +538,64 @@ set_cpid_for_member(MemberPid, CPid, AllMembers) ->
 add_member_to_consumer(MemberPid, CPid, CPMap) ->
 add_member_to_consumer(MemberPid, CPid, CPMap) ->
     dict:update(CPid, fun(O) -> [MemberPid|O] end, [MemberPid], CPMap).
     dict:update(CPid, fun(O) -> [MemberPid|O] end, [MemberPid], CPMap).
 
 
--spec cull_members(string(), non_neg_integer(), #state{}) -> #state{}.
-cull_members(PoolName, MaxAgeMin, #state{pools = Pools} = State) ->
-    cull_members_from_pool(fetch_pool(PoolName, Pools), MaxAgeMin, State).
-
--spec cull_members_from_pool(#pool{}, non_neg_integer(), #state{}) -> #state{}.
-cull_members_from_pool(#pool{free_count = FreeCount,
+-spec cull_members(string(), #state{}) -> #state{}.
+cull_members(PoolName, #state{pools = Pools} = State) ->
+    cull_members_from_pool(fetch_pool(PoolName, Pools), State).
+
+-spec cull_members_from_pool(#pool{}, #state{}) -> #state{}.
+cull_members_from_pool(error_no_pool, State) ->
+    State;
+cull_members_from_pool(#pool{cull_interval = {0, _}}, State) ->
+    %% 0 cull_interval means do not cull
+    State;
+cull_members_from_pool(#pool{name = PoolName,
+                             free_count = FreeCount,
                              init_count = InitCount,
                              init_count = InitCount,
-                             in_use_count = InUseCount} = Pool, MaxAgeMin,
+                             in_use_count = InUseCount,
+                             cull_interval = Delay,
+                             max_age = MaxAge} = Pool,
                        #state{all_members = AllMembers} = State) ->
                        #state{all_members = AllMembers} = State) ->
     MaxCull = FreeCount - (InitCount - InUseCount),
     MaxCull = FreeCount - (InitCount - InUseCount),
-    case MaxCull > 0 of
-        true ->
-            MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
-            ExpiredMembers =
-                expired_free_members(MemberInfo, os:timestamp(), MaxAgeMin),
-            CullList = lists:sublist(ExpiredMembers, MaxCull),
-            lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
-                        State, CullList);
-        false ->
-            State
-    end.
+    State1 = case MaxCull > 0 of
+                 true ->
+                     MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
+                     ExpiredMembers =
+                         expired_free_members(MemberInfo, os:timestamp(), MaxAge),
+                     CullList = lists:sublist(ExpiredMembers, MaxCull),
+                     lists:foldl(fun({CullMe, _}, S) -> remove_pid(CullMe, S) end,
+                                 State, CullList);
+                 false ->
+                     State
+             end,
+    schedule_cull(PoolName, Delay),
+    State1.
+
+-spec schedule_cull(PoolName :: string(), Delay :: time_spec()) -> reference().
+%% @doc Schedule a pool cleaning or "cull" for `PoolName' in which
+%% members older than `max_age' will be removed until the pool has
+%% `init_count' members. Uses `erlang:send_after/3' for light-weight
+%% timer that will be auto-cancelled upon pooler shutdown.
+schedule_cull(PoolName, Delay) ->
+    DelayMillis = time_as_millis(Delay),
+    %% use pid instead of server name atom to take advantage of
+    %% automatic cancelling
+    erlang:send_after(DelayMillis, self(), {cull_pool, PoolName}).
 
 
 -spec member_info([pid()], dict()) -> [{pid(), member_info()}].
 -spec member_info([pid()], dict()) -> [{pid(), member_info()}].
 member_info(Pids, AllMembers) ->
 member_info(Pids, AllMembers) ->
     [ {P, dict:fetch(P, AllMembers)} || P <- Pids ].
     [ {P, dict:fetch(P, AllMembers)} || P <- Pids ].
 
 
--spec expired_free_members([{pid(), member_info()}], {_, _, _},
-                           non_neg_integer()) -> [{pid(), free_member_info()}].
-expired_free_members(Members, Now, MaxAgeMin) ->
-    Micros = 60 * 1000 * 1000,
+-spec expired_free_members(Members :: [{pid(), member_info()}],
+                           Now :: {_, _, _},
+                           MaxAge :: time_spec()) -> [{pid(), free_member_info()}].
+expired_free_members(Members, Now, MaxAge) ->
+    MaxMicros = time_as_micros(MaxAge),
     [ MI || MI = {_, {_, free, LastReturn}} <- Members,
     [ MI || MI = {_, {_, free, LastReturn}} <- Members,
-            timer:now_diff(Now, LastReturn) >= (MaxAgeMin * Micros) ].
-
--spec send_metric(binary(),
-                  'error_no_members' |
-                  'unknown_pid' |
-                  non_neg_integer() |
-                  {'inc',1} |
-                  {'add_pids_failed', non_neg_integer(), non_neg_integer()},
-                  'counter' | 'histogram' | 'history' | 'meter') -> ok.
+            timer:now_diff(Now, LastReturn) >= MaxMicros ].
+
+-spec send_metric(Name :: metric_label(),
+                  Value :: metric_value(),
+                  Type :: metric_type()) -> ok.
 %% Send a metric using the metrics module from application config or
 %% Send a metric using the metrics module from application config or
 %% do nothing.
 %% do nothing.
 send_metric(Name, Value, Type) ->
 send_metric(Name, Value, Type) ->
@@ -571,3 +609,19 @@ send_metric(Name, Value, Type) ->
 pool_metric(PoolName, Metric) ->
 pool_metric(PoolName, Metric) ->
     iolist_to_binary([<<"pooler.">>, PoolName, ".",
     iolist_to_binary([<<"pooler.">>, PoolName, ".",
                       atom_to_binary(Metric, utf8)]).
                       atom_to_binary(Metric, utf8)]).
+
+-spec time_as_millis(time_spec()) -> non_neg_integer().
+%% @doc Convert time unit into milliseconds.
+time_as_millis({Time, Unit}) ->
+    time_as_micros({Time, Unit}) div 1000.
+
+-spec time_as_micros(time_spec()) -> non_neg_integer().
+%% @doc Convert time unit into microseconds
+time_as_micros({Time, min}) ->
+    60 * 1000 * 1000 * Time;
+time_as_micros({Time, sec}) ->
+    1000 * 1000 * Time;
+time_as_micros({Time, ms}) ->
+    1000 * Time;
+time_as_micros({Time, mu}) ->
+    Time.

+ 83 - 21
test/pooler_test.erl

@@ -235,27 +235,6 @@ pooler_basics_test_() ->
        end
        end
       },
       },
 
 
-      {"cull_pool can be called and do nothing",
-       %% FIXME: this exercises the code path, but doesn't test anything
-       fun() ->
-               ?assertEqual(ok, pooler:cull_pool("p1", 10))
-       end
-      },
-
-      {"cull_pool culls unused members",
-       fun() ->
-               %% take all
-               [P1, P2, _P3] = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
-               %% return one
-               pooler:return_member(P1),
-               pooler:return_member(P2),
-               %% call a sync action since return_member is async
-               _Ignore = pooler:pool_stats(),
-               ?assertEqual(ok, pooler:cull_pool("p1", 0)),
-               ?assertEqual(2, length(pooler:pool_stats()))
-       end
-      },
-
       {"metrics have been called",
       {"metrics have been called",
        fun() ->
        fun() ->
                %% exercise the API to ensure we have certain keys reported as metrics
                %% exercise the API to ensure we have certain keys reported as metrics
@@ -305,6 +284,69 @@ pooler_limit_failed_adds_test_() ->
              ?assertEqual(error_no_members, pooler:take_member("p1"))
              ?assertEqual(error_no_members, pooler:take_member("p1"))
      end}.
      end}.
 
 
+pooler_scheduled_cull_test_() ->
+    {setup,
+     fun() ->
+             application:set_env(pooler, metrics_module, fake_metrics),
+             fake_metrics:start_link(),
+             Pools = [[{name, "p1"},
+                       {max_count, 10},
+                       {init_count, 2},
+                       {start_mfa, {pooled_gs, start_link, [{"type-0"}]}},
+                       {cull_interval, {200, ms}}]],
+             application:set_env(pooler, pools, Pools),
+             error_logger:delete_report_handler(error_logger_tty_h),
+             application:start(pooler)
+     end,
+     fun(_X) ->
+             fake_metrics:stop(),
+             application:stop(pooler)
+     end,
+     [{"excess members are culled repeatedly",
+       fun() ->
+               %% take all members
+               Pids1 = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
+               %% return all
+               [ pooler:return_member(P) || P <- Pids1 ],
+               ?assertEqual(10, length(pooler:pool_stats())),
+               %% wait for longer than cull delay
+               timer:sleep(250),
+               ?assertEqual(2, length(pooler:pool_stats())),
+
+               %% repeat the test to verify that culling gets rescheduled.
+               Pids2 = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
+               %% return all
+               [ pooler:return_member(P) || P <- Pids2 ],
+               ?assertEqual(10, length(pooler:pool_stats())),
+               %% wait for longer than cull delay
+               timer:sleep(250),
+               ?assertEqual(2, length(pooler:pool_stats()))
+       end
+      },
+
+      {"non-excess members are not culled",
+       fun() ->
+               [P1, P2] = [pooler:take_member("p1") || _X <- [1, 2] ],
+               [pooler:return_member(P) || P <- [P1, P2] ],
+               ?assertEqual(2, length(pooler:pool_stats())),
+               timer:sleep(250),
+               ?assertEqual(2, length(pooler:pool_stats()))
+       end
+      },
+
+      {"in-use members are not culled",
+       fun() ->
+               %% take all members
+               Pids = [ pooler:take_member("p1") || _X <- lists:seq(1, 10) ],
+               %% don't return any
+               ?assertEqual(10, length(pooler:pool_stats())),
+               %% wait for longer than cull delay
+               timer:sleep(250),
+               ?assertEqual(10, length(pooler:pool_stats())),
+               [ pooler:return_member(P) || P <- Pids ]
+       end}
+     ]}.
+
 random_message_test_() ->
 random_message_test_() ->
     {setup,
     {setup,
      fun() ->
      fun() ->
@@ -392,6 +434,26 @@ pooler_integration_test_() ->
      ]
      ]
     }.
     }.
 
 
+time_as_millis_test_() ->
+    Zeros = [ {{0, U}, 0} || U <- [min, sec, ms, mu] ],
+    Ones = [{{1, min}, 60000},
+            {{1, sec}, 1000},
+            {{1, ms}, 1},
+            {{1, mu}, 0}],
+    Misc = [{{3000, mu}, 3}],
+    Tests = Zeros ++ Ones ++ Misc,
+    [ ?_assertEqual(E, pooler:time_as_millis(I)) || {I, E} <- Tests ].
+
+time_as_micros_test_() ->
+    Zeros = [ {{0, U}, 0} || U <- [min, sec, ms, mu] ],
+    Ones = [{{1, min}, 60000000},
+            {{1, sec}, 1000000},
+            {{1, ms}, 1000},
+            {{1, mu}, 1}],
+    Misc = [{{3000, mu}, 3000}],
+    Tests = Zeros ++ Ones ++ Misc,
+    [ ?_assertEqual(E, pooler:time_as_micros(I)) || {I, E} <- Tests ].
+
 % testing crash recovery means race conditions when either pids
 % testing crash recovery means race conditions when either pids
 % haven't yet crashed or pooler hasn't recovered.  So this helper loops
 % haven't yet crashed or pooler hasn't recovered.  So this helper loops
 % forver until N pids are obtained, ignoring error_no_members.
 % forver until N pids are obtained, ignoring error_no_members.