Browse Source

Use monitors instead of links to track members and consumers

pooler now tracks both members and consumers using monitors. This
means that named pools (pooler gen_server) no longer trap exits. A
crash of pooler will no longer crash consumers with checked out
members.
Seth Falcon 12 years ago
parent
commit
08211e5324
2 changed files with 59 additions and 39 deletions
  1. 45 38
      src/pooler.erl
  2. 14 1
      src/pooler.hrl

+ 45 - 38
src/pooler.erl

@@ -168,9 +168,6 @@ pool_stats(PoolName) ->
 
 -spec init(#pool{}) -> {'ok', #pool{}, 0}.
 init(#pool{}=Pool) ->
-    %% FIXME: change to a monitor only model so that this doesn't have
-    %% to be a system process.
-    process_flag(trap_exit, true),
     #pool{init_count = N} = Pool,
     MemberSup = pooler_pool_sup:member_sup_name(Pool),
     Pool1 = set_member_sup(Pool, MemberSup),
@@ -210,14 +207,14 @@ handle_info(timeout, #pool{group = Group} = Pool) ->
     ok = pg2:create(Group),
     ok = pg2:join(Group, self()),
     {noreply, Pool};
-handle_info({'EXIT', Pid, Reason}, State) ->
+handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
     State1 =
         case dict:find(Pid, State#pool.all_members) of
             {ok, {_PoolName, _ConsumerPid, _Time}} ->
                 do_return_member(Pid, fail, State);
             error ->
                 case dict:find(Pid, State#pool.consumer_to_pid) of
-                    {ok, Pids} ->
+                    {ok, {MRef, Pids}} ->
                         IsOk = case Reason of
                                    normal -> ok;
                                    _Crash -> fail
@@ -260,8 +257,7 @@ add_pids(N, Pool) ->
     PoolName = Pool#pool.name,
     case Total + N =< Max of
         true ->
-            {AllMembers1, NewPids} = start_n_pids(N, PoolName, PoolSup,
-                                                  AllMembers),
+            {AllMembers1, NewPids} = start_n_pids(N, PoolSup, AllMembers),
             %% start_n_pids may return fewer than N if errors were
             %% encountered.
             NewPidCount = length(NewPids),
@@ -313,7 +309,6 @@ take_member_from_pool(#pool{max_count = Max,
             send_metric(Pool, events, error_no_members, history),
             {error_no_members, Pool};
         [Pid|Rest] ->
-            erlang:link(From),
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
                               free_count = NumFree - 1},
             send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
@@ -329,12 +324,12 @@ take_member_from_pool(#pool{max_count = Max,
 do_return_member(Pid, ok, #pool{all_members = AllMembers} = Pool) ->
     clean_group_table(Pid, Pool),
     case dict:find(Pid, AllMembers) of
-        {ok, {PoolName, CPid, _}} ->
+        {ok, {MRef, CPid, _}} ->
             #pool{free_pids = Free, in_use_count = NumInUse,
                   free_count = NumFree} = Pool,
             Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1,
                               free_count = NumFree + 1},
-            Entry = {PoolName, free, os:timestamp()},
+            Entry = {MRef, free, os:timestamp()},
             Pool1#pool{all_members = store_all_members(Pid, Entry, AllMembers),
                        consumer_to_pid = cpmap_remove(Pid, CPid,
                                                       Pool1#pool.consumer_to_pid)};
@@ -346,7 +341,7 @@ do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
     % removed, so use find instead of fetch and ignore missing.
     clean_group_table(Pid, Pool),
     case dict:find(Pid, AllMembers) of
-        {ok, {_PoolName, _, _}} ->
+        {ok, {_MRef, _, _}} ->
             Pool1 = remove_pid(Pid, Pool),
             case add_pids(1, Pool1) of
                 {Status, Pool2} when Status =:= ok;
@@ -378,13 +373,14 @@ cpmap_remove(_Pid, free, CPMap) ->
     CPMap;
 cpmap_remove(Pid, CPid, CPMap) ->
     case dict:find(CPid, CPMap) of
-        {ok, Pids0} ->
-            unlink(CPid), % FIXME: flush msg queue here?
+        {ok, {MRef, Pids0}} ->
             Pids1 = lists:delete(Pid, Pids0),
             case Pids1 of
                 [_H|_T] ->
-                    dict:store(CPid, Pids1, CPMap);
+                    dict:store(CPid, {MRef, Pids1}, CPMap);
                 [] ->
+                    %% no more members for this consumer
+                    erlang:demonitor(MRef),
                     dict:erase(CPid, CPMap)
             end;
         error ->
@@ -403,15 +399,19 @@ remove_pid(Pid, Pool) ->
           all_members = AllMembers,
           consumer_to_pid = CPMap} = Pool,
     case dict:find(Pid, AllMembers) of
-        {ok, {PoolName, free, _Time}} ->
+        {ok, {MRef, free, _Time}} ->
             % remove an unused member
+            erlang:demonitor(MRef),
             FreePids = lists:delete(Pid, Pool#pool.free_pids),
             NumFree = Pool#pool.free_count - 1,
             Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
             exit(Pid, kill),
             send_metric(Pool1, killed_free_count, {inc, 1}, counter),
             Pool1#pool{all_members = dict:erase(Pid, AllMembers)};
-        {ok, {PoolName, CPid, _Time}} ->
+        {ok, {MRef, CPid, _Time}} ->
+            %% remove a member being consumed. No notice is sent to
+            %% the consumer.
+            erlang:demonitor(MRef),
             Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
             exit(Pid, kill),
             send_metric(Pool1, killed_in_use_count, {inc, 1}, counter),
@@ -424,24 +424,23 @@ remove_pid(Pid, Pool) ->
             Pool
     end.
 
--spec start_n_pids(non_neg_integer(), atom() | pid(), pid(), dict()) ->
-    {dict(), [pid()]}.
-start_n_pids(N, PoolName, PoolSup, AllMembers) ->
-    NewPids = do_n(N, fun(Acc) ->
-                              case supervisor:start_child(PoolSup, []) of
-                                  {ok, Pid} ->
-                                      %% FIXME: we should monitor instead
-                                      erlang:link(Pid),
-                                      [Pid | Acc];
-                                  _Else ->
-                                      Acc
-                              end
-                      end, []),
+-spec start_n_pids(non_neg_integer(), pid(), dict()) -> {dict(), [pid()]}.
+start_n_pids(N, PoolSup, AllMembers) ->
+    NewPidsWith = do_n(N, fun(Acc) ->
+                                  case supervisor:start_child(PoolSup, []) of
+                                      {ok, Pid} ->
+                                          MRef = erlang:monitor(process, Pid),
+                                          [{MRef, Pid} | Acc];
+                                      _Else ->
+                                          Acc
+                                  end
+                          end, []),
     AllMembers1 = lists:foldl(
-                    fun(M, Dict) ->
-                            Entry = {PoolName, free, os:timestamp()},
-                            store_all_members(M, Entry, Dict)
-                    end, AllMembers, NewPids),
+                    fun({MRef, Pid}, Dict) ->
+                            Entry = {MRef, free, os:timestamp()},
+                            store_all_members(Pid, Entry, Dict)
+                    end, AllMembers, NewPidsWith),
+    NewPids = [ Pid || {_MRef, Pid} <- NewPidsWith ],
     {AllMembers1, NewPids}.
 
 do_n(0, _Fun, Acc) ->
@@ -453,20 +452,28 @@ pool_add_retries(#pool{add_member_retry = Retries}) ->
     Retries.
 
 -spec store_all_members(pid(),
-                        {string(), free | pid(), {_, _, _}}, dict()) -> dict().
-store_all_members(Pid, Val = {_PoolName, _CPid, _Time}, AllMembers) ->
+                        {reference(), free | pid(), {_, _, _}}, dict()) -> dict().
+store_all_members(Pid, Val = {_MRef, _CPid, _Time}, AllMembers) ->
     dict:store(Pid, Val, AllMembers).
 
 -spec set_cpid_for_member(pid(), pid(), dict()) -> dict().
 set_cpid_for_member(MemberPid, CPid, AllMembers) ->
     dict:update(MemberPid,
-                fun({PoolName, free, Time = {_, _, _}}) ->
-                        {PoolName, CPid, Time}
+                fun({MRef, free, Time = {_, _, _}}) ->
+                        {MRef, CPid, Time}
                 end, AllMembers).
 
 -spec add_member_to_consumer(pid(), pid(), dict()) -> dict().
 add_member_to_consumer(MemberPid, CPid, CPMap) ->
-    dict:update(CPid, fun(O) -> [MemberPid|O] end, [MemberPid], CPMap).
+    %% we can't use dict:update here because we need to create the
+    %% monitor if we aren't already tracking this consumer.
+    case dict:find(CPid, CPMap) of
+        {ok, {MRef, MList}} ->
+            dict:store(CPid, {MRef, [MemberPid | MList]}, CPMap);
+        error ->
+            MRef = erlang:monitor(process, CPid),
+            dict:store(CPid, {MRef, [MemberPid]}, CPMap)
+    end.
 
 -spec cull_members_from_pool(#pool{}) -> #pool{}.
 cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->

+ 14 - 1
src/pooler.hrl

@@ -33,8 +33,21 @@
           %% The maximum age for members.
           max_age = ?DEFAULT_MAX_AGE             :: time_spec(),
 
-          member_sup,
+          %% The supervisor used to start new members
+          member_sup :: atom() | pid(),
+
+          %% Maps member pid to a tuple of the form:
+          %% {MonitorRef, Status, Time},
+          %% where MonitorRef is a monitor reference for the member,
+          %% Status is either 'free' or the consumer pid, and Time is
+          %% an Erlang timestamp that records when the member became
+          %% free.
           all_members = dict:new()     :: dict(),
+
+          %% Maps consumer pid to a tuple of the form:
+          %% {MonitorRef, MemberList} where MonitorRef is a monitor
+          %% reference for the consumer and MemberList is a list of
+          %% members being consumed.
           consumer_to_pid = dict:new() :: dict(),
 
           %% The module to use for collecting metrics. If set to