
Refactor tracking of in-use and free pids

We now track all members across pools in the #state.all_members dict.
This simplifies the handling of returned members and dealing with
crashes of members that are not currently in use.

The all_members dict maps member pid to a tuple of {PoolName,
ConsumerPid | free, Time} where Time is a timestamp that gets updated
each time the member is returned to the pool.  This sets things up to
cull unused members after a traffic spike.
Seth Falcon · 14 years ago · commit 2675804fac
3 changed files with 145 additions and 67 deletions:
  1. doc/dev-notes.org (+45 −6)
  2. src/pooler.erl (+95 −56)
  3. test/pooler_test.erl (+5 −5)
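
The commit message sketches the new bookkeeping. As a rough model of it (the module and function names here are invented for illustration; in the commit itself this state lives inside the pooler gen_server, as the diff below shows):

    -module(all_members_sketch).
    -export([new/0, add_member/3, take/3, return/2]).

    %% all_members maps a member pid to {PoolName, free | ConsumerPid, Time},
    %% where Time records when the member was last returned to the pool.
    new() -> dict:new().

    %% Register a freshly started member as free.
    add_member(Pid, PoolName, AllMembers) ->
        dict:store(Pid, {PoolName, free, os:timestamp()}, AllMembers).

    %% Hand a free member to a consumer; Time is left alone so it still
    %% records the last return.
    take(Pid, Consumer, AllMembers) ->
        dict:update(Pid,
                    fun({PoolName, free, Time}) -> {PoolName, Consumer, Time} end,
                    AllMembers).

    %% Return a member: mark it free and refresh the timestamp so a later
    %% cull pass can tell how long it has sat unused.
    return(Pid, AllMembers) ->
        dict:update(Pid,
                    fun({PoolName, _Consumer, _Old}) ->
                            {PoolName, free, os:timestamp()}
                    end,
                    AllMembers).

Refreshing the timestamp only on return is what sets up the cull of unused members mentioned in the message.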

doc/dev-notes.org (+45 −6)

@@ -1,13 +1,24 @@
 * pooler dev notes
-** Details
+** Implementation Details
 
-pooler is implemented as a =gen_server=.  Server state consists of:
+pooler is implemented as an OTP application.  Consumers interact with
+the pooler gen_server.  Pools and pool members are managed in a
+supervision tree described below (see [[Supervision]]).
 
+Server state consists of:
+
+- The number of pools.
 - A dict of pools keyed by pool name.
 - A dict of pool supervisors keyed by pool name.
-- A dict mapping in-use members to their pool name and the pid of the
-  consumer that is using the member.
+- A dict mapping member pids (across all pools) to ={PoolName, Status,
+  Time}=, where Status is either 'free' or the pid of the consumer
+  process using the member, and Time is a timestamp from
+  =os:timestamp/0= that records the time this member was last returned
+  to the pool (this is used to cull inactive members).
 - A dict mapping consumer process pids to the member they are using.
+  A consumer can use more than one member at a time.
+- An array mapping integers to pool names, providing a convenient
+  way to select a random pool.
 
 Each pool keeps track of its parameters, such as max members to allow,
 initial members to start, number of members in use, and a list of free
@@ -28,7 +39,6 @@ consumer, we will destroy the member and add a fresh one to the pool.
 The member starter MFA should use start_link so that pooler will be
 linked to the members.  This way, when members crash, pooler will be
 notified and can refill the pool with new pids.
-
 *** Supervision
 
 The top-level pooler supervisor, pooler_sup, supervises the pooler
@@ -36,9 +46,38 @@ gen_server and the pooler_pool_sup supervisor.  pooler_pool_sup
 supervises individual pool supervisors (pooler_pooled_worker_sup).
 Each pooler_pooled_worker_sup supervises the members of a pool.
 
-[[doc/pooler-appmon.jpg]]
+[[../doc/pooler-appmon.jpg]]
+
+*** Decommissioning pool members when not used
+
+pooler will allocate up to =max_count= members in a pool before
+returning =error_no_members= when take_member is called.  If you set
+max_count to a large value, you may end up with a very large pool
+after a traffic spike.  These members will be culled if unused for
+=cull_after= minutes.
+
+Whenever a member is returned to the pool, either explicitly or
+implicitly via member or consumer exit, a check for inactive members
+will be made if such a check has not been made in =cull_after=
+minutes.  To cull inactive members, all pool members are examined
+using the all_members dict and those that are marked 'free' with a
+timestamp older than =cull_after= minutes will be removed from the
+pool, killed, and not replaced.
 
 ** Notes
+*** TODO Add ability to specify a health function for members.
+For riak pb, you should probably call the ping function at start and
+possibly on a schedule to cull bad members.
+*** TODO take_member should try other pools if one is empty
+Or perhaps provide a load-balancing scheme based on choosing the pool
+with the smallest in_use_count (assumes same sized pools).  Could also
+track free count (to avoid length(free_pids)) and pick pool with
+largest free count.
+*** TODO Should we add event logging and stat gathering?
+*** TODO Enhance random load balancing
+Would it be better to make a bulk call for randomness and cache it in
+server state rather than calling into the crypto module for each
+take_member call?
 *** Pool management
 
 It is an error to add a pool with a name that already exists.
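
The culling rule described in the notes could be checked along these lines (a sketch only; =cull_after= is the documented setting, while the module name, function name, and the idea of passing Now explicitly are assumptions for illustration):

    -module(cull_sketch).
    -export([expired_members/3]).

    %% Pids of members that have sat free for more than CullAfterMinutes,
    %% judged against Now (an os:timestamp() value).  In-use members fail
    %% the {_, free, _} pattern and are skipped by the generator.
    expired_members(AllMembers, Now, CullAfterMinutes) ->
        MaxAgeMicros = CullAfterMinutes * 60 * 1000000,
        [Pid || {Pid, {_Pool, free, LastReturn}} <- dict:to_list(AllMembers),
                timer:now_diff(Now, LastReturn) > MaxAgeMicros].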

src/pooler.erl (+95 −56)

@@ -17,7 +17,7 @@
           npools                       :: non_neg_integer(),
           pools = dict:new()           :: dict(),
           pool_sups = dict:new()       :: dict(),
-          in_use_pids = dict:new()     :: dict(),
+          all_members = dict:new()     :: dict(),
           consumer_to_pid = dict:new() :: dict(),
           pool_selector                :: array()
          }).
@@ -36,7 +36,7 @@
          return_member/2,
          % remove_pool/2,
          % add_pool/1,
-         pool_stats/1]).
+         pool_stats/0]).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Exports
@@ -87,9 +87,9 @@ return_member(Pid, Status) when Status == ok; Status == fail ->
 %     gen_server:call(?SERVER, {add_pool, Pool}).
 
 %% @doc Obtain runtime state info for a given pool.
--spec pool_stats(string()) -> [tuple()].
-pool_stats(Pool) ->
-    gen_server:call(?SERVER, {pool_stats, Pool}).
+-spec pool_stats() -> [tuple()].
+pool_stats() ->
+    gen_server:call(?SERVER, pool_stats).
 
 
 %% ------------------------------------------------------------------
@@ -124,38 +124,31 @@ handle_call(take_member, {CPid, _Tag},
     {reply, NewPid, NewState};
 handle_call(stop, _From, State) ->
     {stop, normal, stop_ok, State};
-handle_call({pool_stats, PoolName}, _From, State) ->
-    Pool = dict:fetch(PoolName, State#state.pools),
-    Stats = [{in_use, dict:fetch_keys(State#state.in_use_pids)},
-             {free, Pool#pool.free_pids}],
-    {reply, Stats, State};
+handle_call(pool_stats, _From, State) ->
+    {reply, dict:to_list(State#state.all_members), State};
 handle_call(_Request, _From, State) ->
     {noreply, ok, State}.
 
-
-handle_cast({return_member, Pid, Status, CPid}, State) ->
-    {noreply, do_return_member({Pid, Status}, CPid, State)};
+handle_cast({return_member, Pid, Status, _CPid}, State) ->
+    {noreply, do_return_member2(Pid, Status, State)};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info({'EXIT', Pid, Reason}, State) ->
-    % error_logger:info_report({got_exit, Pid, Reason}),
     State1 =
-        case dict:find(Pid, State#state.in_use_pids) of
-            {ok, {_PName, CPid}} ->
-                do_return_member({Pid, fail}, CPid, State);
+        case dict:find(Pid, State#state.all_members) of
+            {ok, {_PoolName, _ConsumerPid, _Time}} ->
+                do_return_member2(Pid, fail, State);
             error ->
-                CPMap = State#state.consumer_to_pid,
-                case dict:find(Pid, CPMap) of
+                case dict:find(Pid, State#state.consumer_to_pid) of
                     {ok, Pids} ->
                         IsOk = case Reason of
                                    normal -> ok;
                                    _Crash -> fail
                                end,
                         lists:foldl(
-                          fun(P, S) ->
-                                  do_return_member({P, IsOk}, Pid, S)
-                          end, State, Pids);
+                          fun(P, S) -> do_return_member2(P, IsOk, S) end,
+                          State, Pids);
                     error ->
                         State
                 end
@@ -181,11 +174,12 @@ props_to_pool(P) ->
            start_mfa = ?gv(start_mfa, P)}.
 
 % FIXME: creation of new pids should probably happen
-% in a spawned process to avoid typing up the loop.
+% in a spawned process to avoid tying up the loop.
 add_pids(error, _N, State) ->
     {bad_pool_name, State};
 add_pids(PoolName, N, State) ->
-    #state{pools = Pools, pool_sups = PoolSups} = State,
+    #state{pools = Pools, pool_sups = PoolSups,
+           all_members = AllMembers} = State,
     Pool = dict:fetch(PoolName, Pools),
     #pool{max_count = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
     Total = length(Free) + NumInUse,
@@ -198,64 +192,79 @@ add_pids(PoolName, N, State) ->
                                   erlang:link(Pid),
                                   Pid
                           end, lists:seq(1, N)),
+            AllMembers1 = lists:foldl(
+                            fun(M, Dict) ->
+                                    Time = os:timestamp(),
+                                    dict:store(M, {PoolName, free, Time}, Dict)
+                            end, AllMembers, NewPids),
             Pool1 = Pool#pool{free_pids = Free ++ NewPids},
-            {ok, State#state{pools = dict:store(PoolName, Pool1, Pools)}};
+            {ok, State#state{pools = dict:store(PoolName, Pool1, Pools),
+                             all_members = AllMembers1}};
         false ->
             {max_count_reached, State}
     end.
 
 take_member(PoolName, From, State) ->
-    #state{pools = Pools, in_use_pids = InUse, consumer_to_pid = CPMap} = State,
+    #state{pools = Pools, consumer_to_pid = CPMap} = State,
     Pool = dict:fetch(PoolName, Pools),
     #pool{max_count = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
     case Free of
         [] when NumInUse == Max ->
-            {error_no_pids, State};
+            {error_no_members, State};
         [] when NumInUse < Max ->
             case add_pids(PoolName, 1, State) of
                 {ok, State1} ->
                     take_member(PoolName, From, State1);
                 {max_count_reached, _} ->
-                    {error_no_pids, State}
+                    {error_no_members, State}
             end;
         [Pid|Rest] ->
             erlang:link(From),
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1},
             CPMap1 = dict:update(From, fun(O) -> [Pid|O] end, [Pid], CPMap),
+            AllMembers =
+                dict:update(Pid,
+                            fun({PName, free, Time}) -> {PName, From, Time} end,
+                            State#state.all_members),
             {Pid, State#state{pools = dict:store(PoolName, Pool1, Pools),
-                              in_use_pids = dict:store(Pid, {PoolName, From}, InUse),
-                              consumer_to_pid = CPMap1}}
+                              consumer_to_pid = CPMap1,
+                              all_members = AllMembers}}
     end.
 
-do_return_member({Pid, Status}, CPid, State) ->
-    #state{in_use_pids = InUse, pools = Pools,
-           consumer_to_pid = CPMap} = State,
-    case dict:find(Pid, InUse) of
-        {ok, {PoolName, _CPid2}} -> % FIXME, assert that CPid2 == CPid?
-            Pool = dict:fetch(PoolName, Pools),
-            {Pool1, State1} =
-                case Status of
-                    ok -> {add_pid_to_free(Pid, Pool), State};
-                    fail -> handle_failed_pid(Pid, PoolName, Pool, State)
-                    end,
-            State1#state{in_use_pids = dict:erase(Pid, InUse),
-                         pools = dict:store(PoolName, Pool1, Pools),
-                         consumer_to_pid = cpmap_remove(Pid, CPid, CPMap)};
+-spec do_return_member2(pid(), ok | fail, #state{}) -> #state{}.
+do_return_member2(Pid, ok, State = #state{all_members = AllMembers}) ->
+    {PoolName, _CPid, _} = dict:fetch(Pid, AllMembers),
+    Pool = dict:fetch(PoolName, State#state.pools),
+    #pool{free_pids = Free, in_use_count = NumInUse} = Pool,
+    Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1},
+    State#state{pools = dict:store(PoolName, Pool1, State#state.pools),
+                all_members = dict:store(Pid, {PoolName, free, os:timestamp()},
+                                         AllMembers)};
+do_return_member2(Pid, fail, State = #state{all_members = AllMembers}) ->
+    % for the fail case, perhaps the member crashed and was already
+    % removed, so use find instead of fetch and ignore missing.
+    case dict:find(Pid, AllMembers) of
+        {ok, {PoolName, _, _}} ->
+            State1 = remove_pid(Pid, State),
+            {Status, State2} = add_pids(PoolName, 1, State1),
+            case Status =:= ok orelse Status =:= max_count_reached of
+                true ->
+                    State2;
+                false ->
+                    erlang:error({error, "unexpected return from add_pid",
+                                  Status, erlang:get_stacktrace()})
+            end;
         error ->
-            error_logger:warning_report({return_member_not_found, Pid, dict:to_list(InUse)}),
             State
     end.
-
-add_pid_to_free(Pid, Pool) ->
-    #pool{free_pids = Free, in_use_count = NumInUse} = Pool,
-    Pool#pool{free_pids = [Pid|Free], in_use_count = NumInUse - 1}.
-
-handle_failed_pid(Pid, PoolName, Pool, State) ->
-    exit(Pid, kill),
-    {_, NewState} = add_pids(PoolName, 1, State),
-    NumInUse = Pool#pool.in_use_count,
-    {Pool#pool{in_use_count = NumInUse - 1}, NewState}.
-
+
+% @doc Remove `Pid' from the pid list associated with `CPid' in the
+% consumer to member map given by `CPMap'.
+%
+% If `Pid' is the last element in `CPid's pid list, then the `CPid'
+% entry is removed entirely.
+%
+-spec cpmap_remove(pid(), pid(), dict()) -> dict().
 cpmap_remove(Pid, CPid, CPMap) ->
     case dict:find(CPid, CPMap) of
         {ok, Pids0} ->
@@ -271,3 +280,33 @@ cpmap_remove(Pid, CPid, CPMap) ->
             % FIXME: this shouldn't happen, should we log or error?
             CPMap
     end.
+
+% @doc Remove and kill a pool member.
+%
+% Handles in-use and free members.  Logs an error if the pid is not
+% tracked in state.all_members.
+%
+-spec remove_pid(pid(), #state{}) -> #state{}.
+remove_pid(Pid, State) ->
+    #state{all_members = AllMembers, pools = Pools,
+           consumer_to_pid = CPMap} = State,
+    case dict:find(Pid, AllMembers) of
+        {ok, {PoolName, free, _Time}} ->
+            % remove an unused member
+            Pool = dict:fetch(PoolName, Pools),
+            Pool1 = Pool#pool{free_pids = lists:delete(Pid, Pool#pool.free_pids)},
+            exit(Pid, kill),
+            State#state{pools = dict:store(PoolName, Pool1, Pools),
+                        all_members = dict:erase(Pid, AllMembers)};
+        {ok, {PoolName, CPid, _Time}} ->
+            Pool = dict:fetch(PoolName, Pools),
+            Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
+            exit(Pid, kill),
+            State#state{pools = dict:store(PoolName, Pool1, Pools),
+                        consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
+                        all_members = dict:erase(Pid, AllMembers)};
+        error ->
+            error_logger:error_report({unknown_pid, Pid,
+                                       erlang:get_stacktrace()}),
+            State
+    end.
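
For a condensed view of the two return paths that do_return_member2/3 implements, here is a model with the pool record reduced to a bare free list (a sketch, not the pooler code; the real fail path also starts a replacement member via add_pids, which is elided here):

    -module(return_sketch).
    -export([return/3]).

    %% ok: put the member back on the free list with a fresh timestamp.
    return(Pid, ok, {Free, AllMembers}) ->
        {PoolName, _Consumer, _Old} = dict:fetch(Pid, AllMembers),
        {[Pid | Free],
         dict:store(Pid, {PoolName, free, os:timestamp()}, AllMembers)};
    %% fail: kill and forget the member.  A missing entry is ignored
    %% because the member may have already crashed and been removed.
    return(Pid, fail, {Free, AllMembers}) ->
        case dict:find(Pid, AllMembers) of
            {ok, {_PoolName, _Consumer, _Old}} ->
                exit(Pid, kill),
                %% lists:delete/2 is a no-op if the member was in use,
                %% so this clause covers free and in-use members alike.
                {lists:delete(Pid, Free), dict:erase(Pid, AllMembers)};
            error ->
                {Free, AllMembers}
        end.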

test/pooler_test.erl (+5 −5)

@@ -27,7 +27,7 @@ user_stop(Pid) ->
 user_crash(Pid) ->
     Pid ! crash.
 
-user_loop(Atom) when Atom =:= error_no_pids orelse Atom =:= start ->
+user_loop(Atom) when Atom =:= error_no_members orelse Atom =:= start ->
     user_loop(pooler:take_member());
 user_loop(MyTC) ->
     receive
@@ -127,8 +127,8 @@ pooler_basics_test_() ->
       {"pids are created on demand until max",
       {"pids are created on demand until max",
        fun() ->
        fun() ->
                Pids = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
                Pids = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
-               ?assertMatch(error_no_pids, pooler:take_member()),
-               ?assertMatch(error_no_pids, pooler:take_member()),
+               ?assertMatch(error_no_members, pooler:take_member()),
+               ?assertMatch(error_no_members, pooler:take_member()),
                PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
                % no duplicates
                ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
@@ -243,12 +243,12 @@ pooler_integration_test_() ->
 
 % testing crash recovery means race conditions when either pids
 % haven't yet crashed or pooler hasn't recovered.  So this helper loops
-% forver until N pids are obtained, ignoring error_no_pids.
+% forever until N pids are obtained, ignoring error_no_members.
 get_n_pids(0, Acc) ->
     Acc;
 get_n_pids(N, Acc) ->
     case pooler:take_member() of
-        error_no_pids ->
+        error_no_members ->
             get_n_pids(N, Acc);
         Pid ->
             get_n_pids(N - 1, [Pid|Acc])
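
Given the new pool_stats/0 shape, a list of {MemberPid, {PoolName, Status, Time}}, a test along the following lines could verify it. This is a hypothetical addition, not part of the commit, assuming the usual eunit include and a freshly configured pool with no other members checked out:

    -module(pool_stats_tests).
    -include_lib("eunit/include/eunit.hrl").

    %% After taking one member, exactly one all_members entry should
    %% show a consumer pid; all others should be the atom 'free'.
    pool_stats_shape_test() ->
        Pid = pooler:take_member(),
        Stats = pooler:pool_stats(),
        InUse = [P || {P, {_Pool, Consumer, _Time}} <- Stats, is_pid(Consumer)],
        ?assertEqual([Pid], InUse),
        pooler:return_member(Pid, ok).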