
Fix behavior when a consumer crashes.

Also started adding some stats, which might provide a cleaner way to
do some of the testing: take a pid and inspect the stats, induce a
crash, and then inspect the stats again.
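
A rough sketch of what such a stats-based test might look like (an
illustration only, using the start_user/user_id/user_crash helpers
from pidq_test.erl and assuming pool_stats/1 returns the
{in_use, Pids} / {free, Pids} proplist added in this commit, with no
other consumers holding pids):

    consumer_crash_stats_test() ->
        Consumer = start_user(),
        _Id = user_id(Consumer),            % sync: consumer has taken its pid
        Stats0 = pidq:pool_stats("p1"),
        %% one pid should be checked out by the consumer
        ?assertMatch([_|_], proplists:get_value(in_use, Stats0)),
        user_crash(Consumer),               % induce the crash
        timer:sleep(100),                   % give pidq time to see the 'EXIT'
        Stats1 = pidq:pool_stats("p1"),
        %% the crashed consumer's pid should have been cleaned up
        ?assertEqual([], proplists:get_value(in_use, Stats1)).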

Hi Sean,

Good chatting with you on IRC yesterday.  The upcoming secondary index
feature you mentioned sounds like it would eliminate one of the
remaining points of friction we encounter when trying to model our
data in a Riak-efficient fashion.  Per your request, here are some
notes on how we would like to use Riak.

We are evaluating Riak for use as a backing store for Chef server as
well as the store for the authorization service that is part of the
Opscode Platform.  In both cases, the majority of operations can be
modeled as simple key/value lookup.  However, both uses also require
modeling collections that support add, delete, and list operations.

Rather than describing the complete schema, I'll describe the use case
around nodes in Chef as I think that captures the sort of thing we
need in a few places...

== Nodes in Chef ==

Nodes are stored by unique name and have a typical value size of
30 KB.  Suppose an organization has ~10K nodes and there are 50K orgs.
The operations we need are as follows:

- Concurrently add new nodes to an org.

- Concurrently delete nodes.

- List all nodes in an org.  For large node counts, we will want to
  paginate in some fashion to display the list in a web UI.  Nodes do
  not change orgs.

- List all nodes in an org with a specified environment attribute.
  The environment attribute is part of the node data, but can be
  changed; a node edit can move a node from env1 to env2.

The current assumption is that we would store nodes in an
OrgName-Nodes bucket, keyed by node id.
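
In code, the scheme we have in mind would look roughly like the
following (a sketch only, using the standard riakc protocol buffer
client; the helper names and the JSON content type are illustrative):

    %% Hypothetical helpers illustrating the OrgName-Nodes bucket layout.
    store_node(Riak, OrgName, NodeId, NodeJson) ->
        Bucket = iolist_to_binary([OrgName, <<"-Nodes">>]),
        Obj = riakc_obj:new(Bucket, NodeId, NodeJson, "application/json"),
        riakc_pb_socket:put(Riak, Obj).

    fetch_node(Riak, OrgName, NodeId) ->
        Bucket = iolist_to_binary([OrgName, <<"-Nodes">>]),
        riakc_pb_socket:get(Riak, Bucket, NodeId).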

It is also worth mentioning that we provide users with the ability to
search for nodes by arbitrary node attribute (nodes are loosely
structured JSON documents).  We currently use Solr to index node data.
For search, we need wild-carding ('?', '*') and Boolean queries at a
minimum.  A while back I did a bit of experimenting with Riak Search,
but quickly got stuck on some rough edges in query parsing (the wrong
analyzer, and difficulty with certain special characters that happen
to appear in almost all of our queries).

Does that give enough context?  If not, let me know how I can
elaborate and we'll go from there.

Best Wishes,

+ seth

--
Seth Falcon | Senior Software Design Engineer | Opscode | @sfalcon
Seth Falcon committed this 14 years ago (commit c36587b734).

3 changed files with 128 additions and 65 deletions:
  1. README.org (+13, -12)
  2. src/pidq.erl (+57, -12)
  3. test/pidq_test.erl (+58, -41)
README.org (+13, -12)

@@ -33,14 +33,14 @@ clock.  For some further explanation, see [1] and [2].
 
 I wanted to avoid spinning up a new client for each request in the
 application.  Riak's protocol buffer client is a =gen_server= process
-and my intuition is that one doesn't want to pay for the startup time
-for every request you send to an app.  This suggested a pool of
-clients with some management to avoid concurrent use of a given
-client.  On top of that, it seemed convenient to add the ability to
-load balance between clients connected to different nodes in the Riak
-cluster.  The load-balancing is a secondary feature; even if you end
-up setting up [[http://haproxy.1wt.eu/][HAProxy]] for that aspect, you might still want the client
-pooling.
+that initiates a connection to a Riak node and my intuition is that
+one doesn't want to pay for the startup time for every request you
+send to an app.  This suggested a pool of clients with some management
+to avoid concurrent use of a given client.  On top of that, it seemed
+convenient to add the ability to load balance between clients
+connected to different nodes in the Riak cluster.  The load-balancing
+is a secondary feature; even if you end up setting up [[http://haproxy.1wt.eu/][HAProxy]] for that
+aspect, you might still want the client pooling.
 
 [1] http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001900.html
 [2] http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001904.html
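
As a usage sketch of the pooling described above (not part of this
commit; riak_fetch/2 is a hypothetical helper, and only the
pidq:take_pid/0 and pidq:return_pid/2 calls come from this module):

    riak_fetch(Bucket, Key) ->
        case pidq:take_pid() of
            error_no_pids ->
                {error, no_pids};
            Riak ->
                try riakc_pb_socket:get(Riak, Bucket, Key) of
                    Result ->
                        %% request succeeded, hand the client back as healthy
                        pidq:return_pid(Riak, ok),
                        Result
                catch
                    _Class:Reason ->
                        %% mark the client as failed so pidq replaces it
                        pidq:return_pid(Riak, fail),
                        {error, Reason}
                end
        end.
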
@@ -115,7 +115,8 @@ You can also add or remove new pools while pidq is running using
 pidq is implemented as a =gen_server=.  Server state consists of:
 
 - A dict of pools keyed by pool name.
-- A dict mapping in use pids to their pool name.
+- A dict mapping in-use pids to their pool name and the pid of the
+  consumer that is using the pid.
 - A dict mapping consumer process pids to the pid they are using.
 - A module and function to use for starting new pids.
 
@@ -130,9 +131,9 @@ to the head of the free list.
 pidq is a system process and traps exits.  Before giving out a pid, it
 links to the requesting consumer process.  This way, if the consumer
 process crashes, pidq can recover the pid.  When the pid is returned,
-the requesting process will be unlinked.  Since the state of the pid
-is unknown in the case of a crashing consumer, we will destroy the pid
-and add a fresh one to the pool.
+the link to the consumer process will be severed.  Since the state of
+the pid is unknown in the case of a crashing consumer, we will destroy
+the pid and add a fresh one to the pool.
 
 The pid starter MFA should use spawn_link so that pidq will be linked
 to the pids (is it confusing that we've taken the term "pid" and
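
To illustrate the spawn_link requirement, a pid starter for the Riak
protocol buffer client could simply delegate to
riakc_pb_socket:start_link/2, which links the new connection process
to its caller (a sketch only; the function below and how it would be
configured are hypothetical, not part of this repository):

    start_riak_pid(Host, Port) ->
        %% start_link/2 links the riakc connection gen_server to the
        %% calling process (pidq), which is what the starter contract
        %% described above asks for.
        {ok, Riak} = riakc_pb_socket:start_link(Host, Port),
        Riak.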

src/pidq.erl (+57, -12)

@@ -33,6 +33,7 @@
          return_pid/2,
          remove_pool/2,
          add_pool/1,
+         pool_stats/1,
          status/0]).
 
 %% ------------------------------------------------------------------
@@ -55,7 +56,8 @@ take_pid() ->
     gen_server:call(?SERVER, take_pid).
 
 return_pid(Pid, Status) when Status == ok; Status == fail ->
-    gen_server:cast(?SERVER, {return_pid, Pid, Status}),
+    CPid = self(),
+    gen_server:cast(?SERVER, {return_pid, Pid, Status, CPid}),
     ok.
 
 remove_pool(Name, How) when How == graceful; How == immediate ->
@@ -64,6 +66,9 @@ remove_pool(Name, How) when How == graceful; How == immediate ->
 add_pool(Pool) ->
     gen_server:call(?SERVER, {add_pool, Pool}).
 
+pool_stats(Pool) ->
+    gen_server:call(?SERVER, {pool_stats, Pool}).
+
 status() ->
     gen_server:call(?SERVER, status).
 
@@ -92,19 +97,40 @@ handle_call(stop, _From, State) ->
     % loop over in use and free pids and stop them?
     % {M, F} = State#state.pid_stopper,
     {stop, normal, stop_ok, State};
+handle_call({pool_stats, PoolName}, _From, State) ->
+    Pool = dict:fetch(PoolName, State#state.pools),
+    Stats = [{in_use, dict:fetch_keys(State#state.in_use_pids)},
+             {free, Pool#pool.free_pids}],
+    {reply, Stats, State};
 handle_call(_Request, _From, State) ->
     {noreply, ok, State}.
 
 
-handle_cast({return_pid, Pid, Status}, State) ->
-    {noreply, do_return_pid(Pid, Status, State)};
+handle_cast({return_pid, Pid, Status, CPid}, State) ->
+    {noreply, do_return_pid({Pid, Status}, CPid, State)};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({'EXIT', Pid, _Reason}, State) ->
+handle_info({'EXIT', Pid, Reason}, State) ->
+    % error_logger:info_report({got_exit, Pid, Reason}),
     State1 = case dict:find(Pid, State#state.in_use_pids) of
-                 {ok, _PName} -> do_return_pid(Pid, fail, State);
-                 error -> State
+                 {ok, {_PName, CPid}} -> do_return_pid({Pid, fail}, CPid, State);
+                 error ->
+                     CPMap = State#state.consumer_to_pid,
+                     case dict:find(Pid, CPMap) of
+
+                         {ok, Pids} ->
+                             error_logger:info_report({{consumer, Pid, Reason}, Pids}),
+                             IsOk = case Reason of
+                                        normal -> ok;
+                                        _Crash -> fail
+                                    end,
+                             lists:foldl(fun(P, S) ->
+                                                 do_return_pid({P, IsOk}, Pid, S)
+                                         end, State, Pids);
+                         error ->
+                             State
+                     end
              end,
     {noreply, State1};
 handle_info(_Info, State) ->
@@ -167,17 +193,19 @@ take_pid(PoolName, From, State) ->
         [Pid|Rest] ->
             % FIXME: handle min_free here -- should adding pids
             % to satisfy min_free be done in a spawned worker?
+            erlang:link(From),
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1},
             CPMap1 = dict:update(From, fun(O) -> [Pid|O] end, [Pid], CPMap),
             {Pid, State#state{pools = dict:store(PoolName, Pool1, Pools),
-                              in_use_pids = dict:store(Pid, PoolName, InUse),
+                              in_use_pids = dict:store(Pid, {PoolName, From}, InUse),
                               consumer_to_pid = CPMap1}}
     end.
 
-do_return_pid(Pid, Status, State) ->
-    #state{in_use_pids = InUse, pools = Pools} = State,
+do_return_pid({Pid, Status}, CPid, State) ->
+    #state{in_use_pids = InUse, pools = Pools,
+           consumer_to_pid = CPMap} = State,
     case dict:find(Pid, InUse) of
-        {ok, PoolName} ->
+        {ok, {PoolName, _CPid2}} -> % FIXME, assert that CPid2 == CPid?
             Pool = dict:fetch(PoolName, Pools),
             {Pool1, State1} =
                 case Status of
@@ -185,9 +213,10 @@ do_return_pid(Pid, Status, State) ->
                     fail -> handle_failed_pid(Pid, PoolName, Pool, State)
                     end,
             State1#state{in_use_pids = dict:erase(Pid, InUse),
-                         pools = dict:store(PoolName, Pool1, Pools)};
+                         pools = dict:store(PoolName, Pool1, Pools),
+                         consumer_to_pid = cpmap_remove(Pid, CPid, CPMap)};
         error ->
-            error_logger:warning_report({return_pid_not_found, Pid}),
+            error_logger:warning_report({return_pid_not_found, Pid, dict:to_list(InUse)}),
             State
     end.
 
@@ -201,3 +230,19 @@ handle_failed_pid(Pid, PoolName, Pool, State) ->
     {_, NewState} = add_pids(PoolName, 1, State),
     NumInUse = Pool#pool.in_use_count,
     {Pool#pool{in_use_count = NumInUse - 1}, NewState}.
+
+cpmap_remove(Pid, CPid, CPMap) ->
+    case dict:find(CPid, CPMap) of
+        {ok, Pids0} ->
+            unlink(CPid), % FIXME: flush msg queue here?
+            Pids1 = lists:delete(Pid, Pids0),
+            case Pids1 of
+                [_H|_T] ->
+                    dict:store(CPid, Pids1, CPMap);
+                [] ->
+                    dict:erase(CPid, CPMap)
+            end;
+        error ->
+            % FIXME: this shouldn't happen, should we log or error?
+            CPMap
+    end.

test/pidq_test.erl (+58, -41)

@@ -9,8 +9,10 @@
 % and take a new pid, stop cleanly, and crash.
 
 start_user() ->
-    TC = pidq:take_pid(),
-    spawn(fun() -> user_loop(TC) end).
+    spawn(fun() ->
+                  TC = pidq:take_pid(),
+                  user_loop(TC)
+          end).
 
 user_id(Pid) ->
     Pid ! {get_tc_id, self()},
@@ -124,7 +126,7 @@ pidq_basics_test_() ->
                ?assertMatch(error_no_pids, pidq:take_pid()),
                ?assertMatch(error_no_pids, pidq:take_pid()),
                PRefs = [ R || {_T, R} <- [ get_tc_id(P) || P <- Pids ] ],
-               ?assertEqual(3, length(lists:usort(PRefs)))
+               ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
        end
       },
 
@@ -133,7 +135,7 @@ pidq_basics_test_() ->
                P1 = pidq:take_pid(),
                P2 = pidq:take_pid(),
                ?assertNot(P1 == P2),
-               ok =  pidq:return_pid(P1, ok),
+               ok = pidq:return_pid(P1, ok),
                ok = pidq:return_pid(P2, ok),
                % pids are reused most recent first
                ?assertEqual(P2, pidq:take_pid()),
@@ -150,20 +152,33 @@ pidq_basics_test_() ->
                Ids1 = [ get_tc_id(P) || P <- Pids1 ],
                [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
        end
-       }
-
-      % {"if a pid is returned with bad status it is replaced",
-      %  fun() ->
-      %          P1 = pidq:take_pid(),
-      %          P2 = pidq:take_pid(),
-      %          pidq:return_pid(P2, ok),
-      %          pidq:return_pid(P1, fail),
-      %          PN = pidq:take_pid(),
-      %          ?assertEqual(P2, pidq:take_pid()),
-      %          ?assertNot(PN == P1)
-      %  end
-      %  }
-      ]}.
+       },
+
+      {"if a pid is returned with bad status it is replaced",
+       fun() ->
+               Pids0 = [pidq:take_pid(), pidq:take_pid(), pidq:take_pid()],
+               Ids0 = [ get_tc_id(P) || P <- Pids0 ],
+               % return them all marking as bad
+               [ pidq:return_pid(P, fail) || P <- Pids0 ],
+               Pids1 = get_n_pids(3, []),
+               Ids1 = [ get_tc_id(P) || P <- Pids1 ],
+               [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
+       end
+      },
+
+      {"if a consumer crashes, pid is replaced",
+       fun() ->
+               Consumer = start_user(),
+               StartId = user_id(Consumer),
+               ?debugVal(pidq:pool_stats("p1")),
+               user_crash(Consumer),
+               NewPid = hd(get_n_pids(1, [])),
+               NewId = get_tc_id(NewPid),
+               ?debugVal(pidq:pool_stats("p1")),
+               ?assertNot(NewId == StartId)
+       end
+      }
+     ]}.
 
 
 pidq_integration_test_() ->
@@ -171,7 +186,7 @@ pidq_integration_test_() ->
      % setup
      fun() ->
              Pools = [[{name, "p1"},
-                      {max_pids, 20},
+                      {max_pids, 10},
                       {min_free, 3},
                       {init_size, 10},
                       {pid_starter_args, ["type-0"]}]],
@@ -196,36 +211,38 @@ pidq_integration_test_() ->
                      TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
                      ?assertEqual(lists:usort(TcIds), TcIds)
              end
-     end
-     ]
-    }.
-
-      % fun(Users) ->
-      % ]}
-
-      % {"users still unique after a renew cycle",
-      %  fun() ->
-      %          Users = [ start_user() || _X <- lists:seq(1, 10) ],
-      %          % return and take new tc pids, expect unique
-      %          [ user_new_tc(UPid) || UPid <- Users ],
-      %          TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
-      %          % each user has a different tc ID
-      %          ?assertEqual(lists:usort(TcIds), TcIds)
-
-
-      % ]}.
+      end,
 
+      fun(Users) ->
+              fun() ->
+                      % users still unique after a renew cycle
+                      [ user_new_tc(UPid) || UPid <- Users ],
+                      TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
+                      ?assertEqual(lists:usort(TcIds), TcIds)
+              end
+      end
+      % ,
 
+      % fun(Users) ->
+      %         fun() ->
+      %                 % all users crash, pids reused
+      %                 TcIds1 = lists:sort([ user_id(UPid) || UPid <- Users ]),
+      %                 [ user_crash(UPid) || UPid <- Users ],
+      %                 % Seq = lists:seq(1, length(Users)),
+      %                 Seq = lists:seq(1, 5),
+      %                 Users2 = [ start_user() || _X <- Seq ],
+      %                 TcIds2 = lists:sort([ user_id(UPid) || UPid <- Users2 ]),
+      %                 ?assertEqual(TcIds1, TcIds2)
+      %         end
+      % end
+     ]
+    }.
 
       %          % return and take new tc pids, still unique
       %          [ user_new_tc(UPid) || UPid <- Users ],
       %          TcIds2 = lists:sort([ user_id(UPid) || UPid <- Users ]),
       %          ?assertEqual(lists:usort(TcIds2), TcIds2),
       %          % if the users all crash...
-      %          [ user_crash(UPid) || UPid <- Users ],
-      %          Users2 = [ start_user() || _X <- lists:seq(1, 10) ],
-      %          TcIds3 = lists:sort([ user_id(UPid) || UPid <- Users ]),
-      %          ?assertEqual(lists:usort(TcIds3), TcIds3)
 
 
 % testing crash recovery means race conditions when either pids