Просмотр исходного кода

Support return_member satisfying a queued requestors

Oliver Ferrigni 10 лет назад
Родитель
Сommit
e1ba4a75dc
2 измененных файлов с 83 добавлено и 12 удалено
  1. 21 10
      src/pooler.erl
  2. 62 2
      test/pooler_tests.erl

+ 21 - 10
src/pooler.erl

@@ -434,15 +434,19 @@ maybe_reply_with_pid(Pid,
             Pool#pool{free_pids = [Pid | Free],
                       free_count = NumFree + 1};
         {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
-            timer:cancel(TRef),
-            Pool1 = take_member_bookkeeping(Pid, From, NewQueuedRequestors, Pool),
-            send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
-            send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
-            send_metric(Pool, events, error_no_members, history),
-            gen_server:reply(From, Pid),
-            Pool1
+            reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool)
     end.
 
+reply_to_queued_requestor(TRef, Pid, From = {APid, _}, NewQueuedRequestors, Pool) when is_pid(APid) ->
+    timer:cancel(TRef),
+    Pool1 = take_member_bookkeeping(Pid, From, NewQueuedRequestors, Pool),
+    send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
+    send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
+    send_metric(Pool, events, error_no_members, history),
+    gen_server:reply(From, Pid),
+    Pool1.
+    
+
 -spec take_member_bookkeeping(pid(),
                               {pid(), _},
                               [pid()] | p_requestor_queue(),
@@ -597,7 +601,8 @@ add_members_async(Count, #pool{starting_members = StartingMembers} = Pool) ->
 
 -spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
 do_return_member(Pid, ok, #pool{name = PoolName,
-                                all_members = AllMembers} = Pool) ->
+                                all_members = AllMembers,
+                                queued_requestors = QueuedRequestors} = Pool) ->
     clean_group_table(Pid, Pool),
     case dict:find(Pid, AllMembers) of
         {ok, {_, free, _}} ->
@@ -610,9 +615,15 @@ do_return_member(Pid, ok, #pool{name = PoolName,
             Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1,
                               free_count = NumFree + 1},
             Entry = {MRef, free, os:timestamp()},
-            Pool1#pool{all_members = store_all_members(Pid, Entry, AllMembers),
+            Pool2 = Pool1#pool{all_members = store_all_members(Pid, Entry, AllMembers),
                        consumer_to_pid = cpmap_remove(Pid, CPid,
-                                                      Pool1#pool.consumer_to_pid)};
+                                                      Pool1#pool.consumer_to_pid)},
+            case queue:out(QueuedRequestors) of
+                {empty, _ } ->
+                    Pool2;
+                {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
+                    reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool2)
+            end;
         error ->
             Pool
     end;

+ 62 - 2
test/pooler_tests.erl

@@ -809,8 +809,8 @@ pooler_integration_queueing_test_() ->
                       application:set_env(pooler, sleep_time, 100),
                       [ proc_lib:spawn(fun() ->
                                                Val = pooler:take_member(test_pool_1, 200),
-                                                ?assert(is_pid(Val)),
-                                                pooler:return_member(Val)
+                                               ?assert(is_pid(Val)),
+                                               pooler:return_member(Val)
                                        end)
                         || _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)
                       ],
@@ -834,6 +834,66 @@ pooler_integration_queueing_test_() ->
       end
      ]
     }.
+pooler_integration_queueing_return_member_test_() ->
+    {foreach,
+     % setup
+     fun() ->
+             Pool = [{name, test_pool_1},
+                     {max_count, 10},
+                     {queue_max, 10},
+                     {init_count, 10},
+                     {metrics, fake_metrics},
+                     {member_start_timeout, {5, sec}},
+                     {start_mfa,
+                      {pooled_gs, start_link, [
+                                               {"type-0",
+                                                fun pooler_tests:sleep_for_configured_timeout/0 }
+                                              ]
+                      }
+                     }
+                    ],
+
+             application:set_env(pooler, pools, [Pool]),
+             fake_metrics:start_link(),
+             application:start(pooler)
+     end,
+     % cleanup
+     fun(_) ->
+             fake_metrics:stop(),
+             application:stop(pooler)
+     end,
+     [
+      fun(_) ->
+              fun() ->
+                      application:set_env(pooler, sleep_time, 0),
+                      Pids = [ proc_lib:spawn_link(fun() ->
+                                               Val = pooler:take_member(test_pool_1, 200),
+                                               ?assert(is_pid(Val)),
+                                               receive
+                                                   _ ->
+                                                       pooler:return_member(test_pool_1, Val)
+                                                   after
+                                                       5000 ->
+                                                           pooler:return_member(test_pool_1, Val)
+                                               end
+                                       end)
+                        || _ <- lists:seq(1, (dump_pool(test_pool_1))#pool.max_count)
+                      ],
+                      timer:sleep(50),
+                      Parent = self(),
+                      proc_lib:spawn_link(fun() ->
+                                             Val = pooler:take_member(test_pool_1, 200),
+                                             Parent ! Val
+                                     end),
+                      [Pid ! return || Pid <- Pids],
+                      receive
+                          Result ->
+                              ?assert(is_pid(Result))
+                      end
+              end
+      end
+      ]
+     }.
 
 
 pooler_integration_test_() ->