Browse Source

Merge pull request #34 from seth/of/queueing

Queueing requestors with an external api
Oliver Ferrigni 10 years ago
parent
commit
d0b0e6bdca
4 changed files with 255 additions and 30 deletions
  1. 131 25
      src/pooler.erl
  2. 9 1
      src/pooler.hrl
  3. 2 1
      src/pooler_config.erl
  4. 113 3
      test/pooler_tests.erl

+ 131 - 25
src/pooler.erl

@@ -30,6 +30,7 @@
 -export([accept_member/2,
          start_link/1,
          take_member/1,
+         take_member/2,
          take_group_member/1,
          return_group_member/2,
          return_group_member/3,
@@ -40,7 +41,8 @@
          new_pool/1,
          pool_child_spec/1,
          rm_pool/1,
-         rm_group/1]).
+         rm_group/1
+        ]).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Exports
@@ -176,7 +178,19 @@ accept_member(PoolName, MemberPid) ->
 %%
 -spec take_member(atom() | pid()) -> pid() | error_no_members.
 take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
-    gen_server:call(PoolName, take_member, infinity).
+    gen_server:call(PoolName, {take_member, 0}, infinity).
+
+%% @doc Obtain exclusive access to a member of 'PoolName'.
+%%
+%% If no members are available, wait for up to Timeout milliseconds for a member
+%% to become available. Waiting requests are served in FIFO order. If no member
+%% is available within the specified timeout, error_no_members is returned.
+%% `Timeout' can be either milliseconds as integer or `{duration, time_unit}'
+%%
+-spec take_member(atom() | pid(), non_neg_integer() | time_spec()) -> pid() | error_no_members.
+take_member(PoolName, Timeout) when is_atom(PoolName) orelse is_pid(PoolName) ->
+    gen_server:call(PoolName, {take_member, time_as_millis(Timeout)}, infinity).
+
 
 %% @doc Take a member from a randomly selected member of the group
 %% `GroupName'. Returns `MemberPid' or `error_no_members'.  If no
@@ -297,9 +311,10 @@ init(#pool{}=Pool) ->
 
 set_member_sup(#pool{} = Pool, MemberSup) ->
     Pool#pool{member_sup = MemberSup}.
-handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
-    {Member, NewPool} = take_member_from_pool(Pool, CPid),
-    {reply, Member, NewPool};
+
+handle_call({take_member, Timeout}, From = {APid, _}, #pool{} = Pool) when is_pid(APid) ->
+    maybe_reply(take_member_from_pool_queued(Pool, From, Timeout));
+
 handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
     {reply, ok, do_return_member(Pid, Status, Pool)};
 handle_call({accept_member, Pid}, _From, Pool) ->
@@ -318,6 +333,14 @@ handle_cast(_Msg, Pool) ->
     {noreply, Pool}.
 
 -spec handle_info(_, _) -> {'noreply', _}.
+handle_info({requestor_timeout, From}, Pool = #pool{ queued_requestors = RequestorQueue }) ->
+    NewQueue = queue:filter(fun({RequestorFrom, _TRef}) when RequestorFrom =:= From ->
+                                    gen_server:reply(RequestorFrom, error_no_members),
+                                    false;
+                               ({_, _}) ->
+                                    true
+                            end, RequestorQueue),
+    {noreply, Pool#pool{ queued_requestors = NewQueue} };
 handle_info(timeout, #pool{group = undefined} = Pool) ->
     %% ignore
     {noreply, Pool};
@@ -365,13 +388,12 @@ code_change(_OldVsn, State, _Extra) ->
 do_accept_member({StarterPid, Pid},
                  #pool{
                     all_members = AllMembers,
-                    free_pids = Free,
-                    free_count = NumFree,
                     starting_members = StartingMembers0,
                     member_start_timeout = StartTimeout
                    } = Pool) when is_pid(Pid) ->
     %% make sure we don't accept a timedout member
-    Pool1 = #pool{starting_members = StartingMembers}= remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
+    Pool1 = #pool{starting_members = StartingMembers} =
+        remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
     case lists:keymember(StarterPid, 1, StartingMembers) of
         false ->
             %% A starter completed even though we invalidated the pid
@@ -388,21 +410,74 @@ do_accept_member({StarterPid, Pid},
             Entry = {MRef, free, os:timestamp()},
             AllMembers1 = store_all_members(Pid, Entry, AllMembers),
             pooler_starter:stop(StarterPid),
-            Pool#pool{free_pids = Free ++ [Pid],
-                      free_count = NumFree + 1,
-                      all_members = AllMembers1,
-                      starting_members = StartingMembers1}
+            maybe_reply_with_pid(Pid, Pool1#pool{all_members = AllMembers1,
+                                                 starting_members = StartingMembers1})
     end;
-do_accept_member({StarterPid, _Reason}, #pool{starting_members = StartingMembers0,
-                                       member_start_timeout = StartTimeout} = Pool) ->
+do_accept_member({StarterPid, _Reason},
+                 #pool{starting_members = StartingMembers0,
+                       member_start_timeout = StartTimeout} = Pool) ->
     %% member start failed, remove in-flight ref and carry on.
     pooler_starter:stop(StarterPid),
-    Pool1 = #pool{starting_members = StartingMembers} = remove_stale_starting_members(Pool, StartingMembers0,
-                                                    StartTimeout),
+    Pool1 = #pool{starting_members = StartingMembers} =
+        remove_stale_starting_members(Pool, StartingMembers0,
+                                      StartTimeout),
     StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
     Pool1#pool{starting_members = StartingMembers1}.
 
 
+maybe_reply_with_pid(Pid,
+                     Pool = #pool{queued_requestors = QueuedRequestors,
+                                  free_pids = Free,
+                                  free_count = NumFree}) when is_pid(Pid) ->
+    case queue:out(QueuedRequestors) of
+        {empty, _} ->
+            Pool#pool{free_pids = [Pid | Free],
+                      free_count = NumFree + 1};
+        {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
+            timer:cancel(TRef),
+            Pool1 = take_member_bookkeeping(Pid, From, NewQueuedRequestors, Pool),
+            send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
+            send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
+            send_metric(Pool, events, error_no_members, history),
+            gen_server:reply(From, Pid),
+            Pool1
+    end.
+
+-spec take_member_bookkeeping(pid(),
+                              {pid(), _},
+                              [pid()] | p_requestor_queue(),
+                              #pool{}) -> #pool{}.
+take_member_bookkeeping(MemberPid,
+                        {CPid, _},
+                        Rest,
+                        Pool = #pool{in_use_count = NumInUse,
+                                     free_count = NumFree,
+                                     consumer_to_pid = CPMap,
+                                     all_members = AllMembers})
+  when is_pid(MemberPid),
+       is_pid(CPid),
+       is_list(Rest) ->
+    Pool#pool{free_pids = Rest,
+              in_use_count = NumInUse + 1,
+              free_count = NumFree - 1,
+              consumer_to_pid = add_member_to_consumer(MemberPid, CPid, CPMap),
+              all_members = set_cpid_for_member(MemberPid, CPid, AllMembers)
+             };
+take_member_bookkeeping(MemberPid,
+                        {ReplyPid, _Tag},
+                        NewQueuedRequestors,
+                        Pool = #pool{
+                                  in_use_count = NumInUse,
+                                  all_members = AllMembers,
+                                  consumer_to_pid = CPMap
+                                 }) ->
+    Pool#pool{
+      in_use_count = NumInUse + 1,
+      all_members = set_cpid_for_member(MemberPid, ReplyPid, AllMembers),
+      consumer_to_pid = add_member_to_consumer(MemberPid, ReplyPid, CPMap),
+      queued_requestors = NewQueuedRequestors
+     }.
+
 -spec remove_stale_starting_members(#pool{}, [{reference(), erlang:timestamp()}],
                                     time_spec()) -> #pool{}.
 remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
@@ -458,7 +533,6 @@ take_member_from_pool(#pool{init_count = InitCount,
                             free_pids = Free,
                             in_use_count = NumInUse,
                             free_count = NumFree,
-                            consumer_to_pid = CPMap,
                             starting_members = StartingMembers,
                             member_start_timeout = StartTimeout} = Pool,
                       From) ->
@@ -485,15 +559,31 @@ take_member_from_pool(#pool{init_count = InitCount,
             send_metric(Pool, events, error_no_members, history),
             {error_no_members, Pool2};
         [Pid|Rest] ->
-            Pool2 = Pool1#pool{free_pids = Rest, in_use_count = NumInUse + 1,
-                              free_count = NumFree - 1},
+            Pool2 = take_member_bookkeeping(Pid, From, Rest, Pool1),
             send_metric(Pool, in_use_count, Pool2#pool.in_use_count, histogram),
             send_metric(Pool, free_count, Pool2#pool.free_count, histogram),
-            {Pid, Pool2#pool{
-                    consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
-                    all_members = set_cpid_for_member(Pid, From,
-                                                      Pool2#pool.all_members)
-                   }}
+            {Pid, Pool2}
+    end.
+
+-spec take_member_from_pool_queued(#pool{},
+                                   {pid(), _},
+                                   non_neg_integer()) ->
+                                   {error_no_members | queued | pid(), #pool{}}.
+take_member_from_pool_queued(Pool0 = #pool{queue_max = QMax,
+                                           queued_requestors = Requestors},
+                             From = {CPid, _},
+                             Timeout) when is_pid(CPid) ->
+    case {take_member_from_pool(Pool0, From), queue:len(Requestors)} of
+        {{error_no_members, Pool1}, QLen} when QLen >= QMax ->
+            send_metric(Pool1, events, error_no_members, history),
+            send_metric(Pool1, queue_max_reached, {inc, 1}, counter),
+            {error_no_members, Pool1};
+        {{error_no_members, Pool1 = #pool{queued_requestors = QueuedRequestors}}, QueueCount} ->
+            {ok, TRef} = timer:send_after(Timeout, {requestor_timeout, From}),
+            send_metric(Pool1, queue_count, QueueCount, histogram),
+            {queued, Pool1#pool{queued_requestors = queue:in({From, TRef}, QueuedRequestors)}};
+        {{Member, NewPool}, _} when is_pid(Member) ->
+            {Member, NewPool}
     end.
 
 %% @doc Add `Count' members to `Pool' asynchronously. Returns updated
@@ -737,7 +827,11 @@ time_as_secs({Time, Unit}) ->
 -spec time_as_millis(time_spec()) -> non_neg_integer().
 %% @doc Convert time unit into milliseconds.
 time_as_millis({Time, Unit}) ->
-    time_as_micros({Time, Unit}) div 1000.
+    time_as_micros({Time, Unit}) div 1000;
+%% Allows blind convert
+time_as_millis(Time) when is_integer(Time) ->
+    Time.
+
 
 -spec time_as_micros(time_spec()) -> non_neg_integer().
 %% @doc Convert time unit into microseconds
@@ -752,3 +846,15 @@ time_as_micros({Time, mu}) ->
 
 secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) ->
     (Mega2 - Mega1) * 1000000 + (Secs2 - Secs1).
+
+-spec maybe_reply({'queued' | 'error_no_members' | pid(), #pool{}}) ->
+                         {noreply, #pool{}} | {reply, 'error_no_members' | pid(), #pool{}}.
+maybe_reply({Member, NewPool}) ->
+    case Member of
+        queued ->
+            {noreply, NewPool};
+        error_no_members ->
+            {reply, error_no_members, NewPool};
+        Member when is_pid(Member) ->
+            {reply, Member, NewPool}
+    end.

+ 9 - 1
src/pooler.hrl

@@ -3,6 +3,7 @@
 -define(DEFAULT_MAX_AGE, {30, sec}).
 -define(DEFAULT_MEMBER_START_TIMEOUT, {1, min}).
 -define(POOLER_GROUP_TABLE, pooler_group_table).
+-define(DEFAULT_POOLER_QUEUE_MAX, 50).
 
 -type member_info() :: {string(), free | pid(), {_, _, _}}.
 -type free_member_info() :: {string(), free, {_, _, _}}.
@@ -11,8 +12,10 @@
 
 -ifdef(namespaced_types).
 -type p_dict() :: dict:dict().
+-type p_requestor_queue() :: queue:queue({{pid(), _}, timer:tref()}).
 -else.
 -type p_dict() :: dict().
+-type p_requestor_queue() :: queue().
 -endif.
 
 -record(pool, {
@@ -78,7 +81,12 @@
 
           %% The API used to call the metrics system. It supports both Folsom
           %% and Exometer format.
-          metrics_api = folsom :: 'folsom' | 'exometer'
+          metrics_api = folsom :: 'folsom' | 'exometer',
+
+          %% A queue of requestors for blocking take member requests
+          queued_requestors = queue:new() :: p_requestor_queue(),
+          %% The max depth of the queue
+          queue_max = 50
          }).
 
 -define(gv(X, Y), proplists:get_value(X, Y)).

+ 2 - 1
src/pooler_config.erl

@@ -21,7 +21,8 @@ list_to_pool(P) ->
        max_age           = ?gv(max_age, P, ?DEFAULT_MAX_AGE),
        member_start_timeout = ?gv(member_start_timeout, P, ?DEFAULT_MEMBER_START_TIMEOUT),
        metrics_mod       = ?gv(metrics_mod, P, pooler_no_metrics),
-       metrics_api       = ?gv(metrics_api, P, folsom)}.
+       metrics_api       = ?gv(metrics_api, P, folsom),
+       queue_max         = ?gv(queue_max, P, ?DEFAULT_POOLER_QUEUE_MAX)}.
 
 %% Return `Value' for `Key' in proplist `P' or crashes with an
 %% informative message if no value is found.

+ 113 - 3
test/pooler_tests.erl

@@ -362,7 +362,8 @@ basic_tests() ->
                                         <<"pooler.test_pool_1.in_use_count">>,
                                         <<"pooler.test_pool_1.killed_free_count">>,
                                         <<"pooler.test_pool_1.killed_in_use_count">>,
-                                        <<"pooler.test_pool_1.take_rate">>]),
+                                        <<"pooler.test_pool_1.take_rate">>,
+                                        <<"pooler.test_pool_1.queue_count">>]),
                Metrics = fake_metrics:get_metrics(),
                GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
                ?assertEqual(ExpectKeys, GotKeys)
@@ -727,7 +728,113 @@ pooler_integration_long_init_test_() ->
       end
      ]
      }.
-    
+
+sleep_for_configured_timeout() ->
+    SleepTime = case application:get_env(pooler, sleep_time) of
+                    {ok, Val} ->
+                        Val;
+                    _  ->
+                        0
+                end,
+    timer:sleep(SleepTime).    
+
+pooler_integration_queueing_test_() ->
+    {foreach,
+     % setup
+     fun() ->
+             Pool = [{name, test_pool_1},
+                     {max_count, 10},
+                     {queue_max, 10},
+                     {init_count, 0},
+                     {metrics, fake_metrics},
+                     {member_start_timeout, {5, sec}},
+                     {start_mfa,
+                      {pooled_gs, start_link, [
+                                               {"type-0",
+                                                fun pooler_tests:sleep_for_configured_timeout/0 }
+                                              ]
+                      }
+                     }
+                    ],
+
+             application:set_env(pooler, pools, [Pool]),
+             fake_metrics:start_link(),
+             application:start(pooler)
+     end,
+     % cleanup
+     fun(_) ->
+             fake_metrics:stop(),
+             application:stop(pooler)
+     end,
+     [
+      fun(_) ->
+              fun() ->
+                      ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
+                      Val = pooler:take_member(test_pool_1, 10),
+                      ?assert(is_pid(Val)),
+                      pooler:return_member(test_pool_1, Val)
+              end
+      end,
+      fun(_) ->
+              fun() ->
+                      application:set_env(pooler, sleep_time, 1),
+                      ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
+                      Val = pooler:take_member(test_pool_1, 0),
+                      ?assertEqual(error_no_members, Val),
+                      timer:sleep(50),
+                      %Next request should be available
+                      Pid = pooler:take_member(test_pool_1, 0),
+                      ?assert(is_pid(Pid)),
+                      pooler:return_member(test_pool_1, Pid)
+              end
+      end,
+      fun(_) ->
+              fun() ->
+                      application:set_env(pooler, sleep_time, 10),
+                      ?assertEqual(0, (dump_pool(test_pool_1))#pool.free_count),
+                      [
+                       ?assertEqual(pooler:take_member(test_pool_1, 0), error_no_members) ||
+                          _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)],
+                      timer:sleep(50),
+                      %Next request should be available
+                      Pid = pooler:take_member(test_pool_1, 0),
+                      ?assert(is_pid(Pid)),
+                      pooler:return_member(test_pool_1, Pid)
+              end
+      end,
+      fun(_) ->
+              fun() ->
+                      % fill to queue_max, next request should return immediately with no_members
+                      % Will return a if queue max is not enforced.
+                      application:set_env(pooler, sleep_time, 100),
+                      [ proc_lib:spawn(fun() ->
+                                               Val = pooler:take_member(test_pool_1, 200),
+                                                ?assert(is_pid(Val)),
+                                                pooler:return_member(Val)
+                                       end)
+                        || _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)
+                      ],
+                      timer:sleep(50),
+                      ?assertEqual(10, queue:len((dump_pool(test_pool_1))#pool.queued_requestors)),
+                      ?assertEqual(pooler:take_member(test_pool_1, 500), error_no_members),
+                      ExpectKeys = lists:sort([<<"pooler.test_pool_1.error_no_members_count">>,
+                                               <<"pooler.test_pool_1.events">>,
+                                               <<"pooler.test_pool_1.take_rate">>,
+                                               <<"pooler.test_pool_1.queue_count">>,
+                                               <<"pooler.test_pool_1.queue_max_reached">>]),
+                      Metrics = fake_metrics:get_metrics(),
+                      GotKeys = lists:usort([ Name || {Name, _, _} <- Metrics ]),
+                      ?assertEqual(ExpectKeys, GotKeys),
+
+                      timer:sleep(100),
+                      Val = pooler:take_member(test_pool_1, 500),
+                      ?assert(is_pid(Val)),
+                      pooler:return_member(test_pool_1, Val)
+              end
+      end
+     ]
+    }.
+
 
 pooler_integration_test_() ->
     {foreach,
@@ -837,4 +944,7 @@ children_count(SupId) ->
     length(supervisor:which_children(SupId)).
 
 starting_members(PoolName) ->
-    length((gen_server:call(PoolName, dump_pool))#pool.starting_members).
+    length((dump_pool(PoolName))#pool.starting_members).
+
+dump_pool(PoolName) ->
+    gen_server:call(PoolName, dump_pool).