Просмотр исходного кода

Merge pull request #52 from sata/call-free-members

Perform task across free members
Seth Falcon 9 лет назад
Родитель
Сommit
02c8ac9559
3 измененных файлов с 107 добавлено и 1 удалено
  1. 32 1
      src/pooler.erl
  2. 6 0
      test/pooled_gs.erl
  3. 69 0
      test/pooler_tests.erl

+ 32 - 1
src/pooler.erl

@@ -41,7 +41,9 @@
          new_pool/1,
          pool_child_spec/1,
          rm_pool/1,
-         rm_group/1
+         rm_group/1,
+         call_free_members/2,
+         call_free_members/3
         ]).
 
 %% ------------------------------------------------------------------
@@ -290,6 +292,23 @@ return_member(_, error_no_members) ->
 pool_stats(PoolName) ->
     gen_server:call(PoolName, pool_stats).
 
+%% @doc Invokes `Fun' with arity 1 over all free members in pool with `PoolName'.
+%%
+-spec call_free_members(atom() | pid(), fun((pid()) -> term())) -> Res when
+      Res :: [{ok, term()} | {error, term()}].
+call_free_members(PoolName, Fun)
+  when (is_atom(PoolName) orelse is_pid(PoolName)) andalso is_function(Fun, 1) ->
+    call_free_members(PoolName, Fun, infinity).
+
+%% @doc Invokes `Fun' with arity 1 over all free members in pool with `PoolName'.
+%% `Timeout' sets the timeout of gen_server call.
+-spec call_free_members(atom() | pid(), Fun, timeout()) -> Res when
+      Fun :: fun((pid()) -> term()),
+      Res :: [{ok, term()} | {error, term()}].
+call_free_members(PoolName, Fun, Timeout)
+  when (is_atom(PoolName) orelse is_pid(PoolName)) andalso is_function(Fun, 1) ->
+    gen_server:call(PoolName, {call_free_members, Fun}, Timeout).
+
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
@@ -325,6 +344,8 @@ handle_call(pool_stats, _From, Pool) ->
     {reply, dict:to_list(Pool#pool.all_members), Pool};
 handle_call(dump_pool, _From, Pool) ->
     {reply, Pool, Pool};
+handle_call({call_free_members, Fun}, _From, #pool{free_pids = Pids} = Pool) ->
+  {reply, do_call_free_members(Fun, Pids), Pool};
 handle_call(_Request, _From, Pool) ->
     {noreply, Pool}.
 
@@ -900,3 +921,13 @@ terminate_pid(Pid, {Mod, Fun, Args}) when is_list(Args) ->
     end;
 terminate_pid(Pid, _) ->
     terminate_pid(Pid, ?DEFAULT_STOP_MFA).
+
+do_call_free_members(Fun, Pids) ->
+    [do_call_free_member(Fun, P) || P <- Pids].
+
+do_call_free_member(Fun, Pid) ->
+    try {ok, Fun(Pid)}
+    catch
+        _Class:Reason ->
+            {error, Reason}
+    end.

+ 6 - 0
test/pooled_gs.erl

@@ -19,6 +19,7 @@
          ping/1,
          ping_count/1,
          crash/1,
+         error_on_call/1,
          do_work/2,
          stop/1
         ]).
@@ -64,6 +65,9 @@ crash(S) ->
     gen_server:cast(S, crash),
     sent_crash_request.
 
+error_on_call(S) ->
+    gen_server:call(S, error_on_call).
+
 stop(S) ->
     gen_server:call(S, stop).
 
@@ -95,6 +99,8 @@ handle_call(ping, _From, #state{ping_count = C } = State) ->
     {reply, pong, State1};
 handle_call(ping_count, _From, #state{ping_count = C } = State) ->
     {reply, C, State};
+handle_call(error_on_call, _From, _State) ->
+    erlang:error({pooled_gs, requested_error});
 handle_call(stop, _From, State) ->
     {stop, normal, stop_ok, State};
 handle_call(_Request, _From, State) ->

+ 69 - 0
test/pooler_tests.erl

@@ -1129,6 +1129,75 @@ time_as_micros_test_() ->
     Tests = Zeros ++ Ones ++ Misc,
     [ ?_assertEqual(E, pooler:time_as_micros(I)) || {I, E} <- Tests ].
 
+
+call_free_members_test_() ->
+    NumWorkers = 10,
+    PoolName = test_pool_1,
+
+    {setup,
+     fun() ->
+             application:set_env(pooler, metrics_module, fake_metrics),
+             fake_metrics:start_link()
+     end,
+     fun(_X) ->
+             fake_metrics:stop()
+     end,
+     {foreach,
+      fun() ->
+              Pool = [{name, PoolName},
+                      {max_count, NumWorkers},
+                      {init_count, NumWorkers},
+                      {start_mfa,
+                       {pooled_gs, start_link, [{"type-0"}]}}],
+              application:unset_env(pooler, pools),
+              application:start(pooler),
+              pooler:new_pool(Pool)
+      end,
+      fun(_X) ->
+              application:stop(pooler)
+      end,
+      [
+       {"perform a ping across the pool when all workers are free",
+        fun() ->
+                ?assertEqual(NumWorkers, (dump_pool(PoolName))#pool.free_count),
+                Res = pooler:call_free_members(PoolName, fun pooled_gs:ping/1),
+
+                ?assertEqual(NumWorkers, count_pongs(Res))
+        end},
+       {"perform a ping across the pool when two workers are taken",
+        fun() ->
+                ?assertEqual(NumWorkers, (dump_pool(PoolName))#pool.free_count),
+                Pids = [pooler:take_member(PoolName) || _X <- lists:seq(0, 1)],
+                Res = pooler:call_free_members(PoolName, fun pooled_gs:ping/1),
+
+                ?assertEqual(NumWorkers -2, count_pongs(Res)),
+
+                [pooler:return_member(PoolName, P) || P <- Pids]
+        end},
+       {"perform an op where the op crashes all free members",
+        fun() ->
+                ?assertEqual(NumWorkers, (dump_pool(PoolName))#pool.free_count),
+                Res = pooler:call_free_members(PoolName,
+                                               fun pooled_gs:error_on_call/1),
+
+                ?assertEqual(NumWorkers, count_errors(Res))
+        end}
+      ]}}.
+
+count_pongs(Result) ->
+    lists:foldl(fun({ok, pong}, Acc) -> Acc + 1;
+                   ({error, _}, Acc) -> Acc
+                end,
+                0,
+                Result).
+
+count_errors(Result) ->
+    lists:foldl(fun({error, _}, Acc) -> Acc + 1;
+                   ({ok, _}, Acc) -> Acc
+                end,
+                0,
+                Result).
+
 % testing crash recovery means race conditions when either pids
 % haven't yet crashed or pooler hasn't recovered.  So this helper loops
 % forver until N pids are obtained, ignoring error_no_members.