Seth Falcon 14 лет назад
Родитель
Сommit
99c8a08804
4 измененных файлов с 435 добавлено и 17 удалено
  1. 67 10
      README.html
  2. 37 7
      README.org
  3. 176 0
      src/pidq.erl
  4. 155 0
      test/pidq_test.erl

+ 67 - 10
README.html

@@ -7,7 +7,7 @@ lang="en" xml:lang="en">
 <title>README</title>
 <meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
 <meta name="generator" content="Org-mode"/>
-<meta name="generated" content="2010-09-05 17:17:09 PDT"/>
+<meta name="generated" content="2010-09-08 23:24:52 PDT"/>
 <meta name="author" content="Seth Falcon"/>
 <meta name="description" content=""/>
 <meta name="keywords" content=""/>
@@ -90,7 +90,11 @@ lang="en" xml:lang="en">
 <li><a href="#sec-1_3_3">1.3.3 Other things you can do </a></li>
 </ul>
 </li>
-<li><a href="#sec-1_4">1.4 Details </a></li>
+<li><a href="#sec-1_4">1.4 Details </a>
+<ul>
+<li><a href="#sec-1_4_1">1.4.1 Pool management </a></li>
+</ul>
+</li>
 </ul>
 </li>
 </ul>
@@ -102,6 +106,11 @@ lang="en" xml:lang="en">
 <div class="outline-text-2" id="text-1">
 
 
+<p>
+<b>Note:</b> this is all very much a work in progress.  If you are
+interested, drop me a note.  Right now, it is really just a readme
+and no working code.
+</p>
 
 </div>
 
@@ -278,13 +287,61 @@ You can also add or remove new pools while pidq is running using
 
 
 <p>
-pidq is implemented as a <code>gen_server</code>.  Server state consists of a dict
-of pools keyed by pool name.  Each pool keeps track of its parameters,
-a free list, and an in-use list.  Since our motivating use-case is
-Riak's pb client, we opt to reuse a given client as much as possible
-to avoid unnecessary vector clock growth.  So pids are taken from the
-head of the free list and returned to the head of the free list.
+pidq is implemented as a <code>gen_server</code>.  Server state consists of:
 </p>
+<ul>
+<li>
+A dict of pools keyed by pool name.
+</li>
+<li>
+A dict mapping in use pids to their pool name.
+</li>
+<li>
+A dict mapping consumer process pids to the pid they are using.
+</li>
+<li>
+A module and function to use for starting new pids.
+
+</li>
+</ul>
+
+<p>Each pool keeps track of its parameters, such as max pids to allow,
+initial pids to start, number of pids in use, and a list of free pids.
+</p>
+<p>
+Since our motivating use-case is Riak's pb client, we opt to reuse a
+given client as much as possible to avoid unnecessary vector clock
+growth; pids are taken from the head of the free list and returned
+to the head of the free list.
+</p>
+<p>
+pidq is a system process and traps exits.  Before giving out a pid, it
+links to the requesting consumer process.  This way, if the consumer
+process crashes, pidq can recover the pid.  When the pid is returned,
+the requesting process will be unlinked.  Since the state of the pid
+is unknown in the case of a crashing consumer, we will destroy the pid
+and add a fresh one to the pool.
+</p>
+<p>
+The pid starter MFA should use <code>spawn_link</code> so that pidq will be linked
+to the pids (is it confusing that we've taken the term "pid" and
+turned it into a noun of this system?).  This way, when pids crash,
+pidq will be notified and can refill the pool with new pids.
+</p>
+<p>
+Also note that an alternative to a consumer explicitly returning a pid
+is for the consumer to exit normally.  pidq will receive the normal
+exit and can reclaim the pid.  In fact, we might want to implement pid
+return as "fake death" by sending pidq exit(PidqPid, normal).
+</p>
+
+</div>
+
+<div id="outline-container-1_4_1" class="outline-4">
+<h4 id="sec-1_4_1"><span class="section-number-4">1.4.1</span> Pool management </h4>
+<div class="outline-text-4" id="text-1_4_1">
+
+
 <p>
 It is an error to add a pool with a name that already exists.
 </p>
@@ -305,7 +362,6 @@ removed.
 pool is removed.
 
 
-
 </li>
 </ul>
 
@@ -336,6 +392,7 @@ pool is removed.
 </div>
 </div>
 </div>
+</div>
 <div id="footnotes">
 <h2 class="footnotes">Footnotes: </h2>
 <div id="text-footnotes">
@@ -348,7 +405,7 @@ pool is removed.
 <div id="postamble">
 <p class="author"> Author: Seth Falcon
 </p>
-<p class="date"> Date: 2010-09-05 17:17:09 PDT</p>
+<p class="date"> Date: 2010-09-08 23:24:52 PDT</p>
 <p class="creator">HTML generated by org-mode 7.01trans in emacs 23</p>
 </div>
 </div>

+ 37 - 7
README.org

@@ -1,5 +1,9 @@
 * pidq - A Process Pool Library for Erlang
 
+*Note:* this is all very much a work in progress.  If you are
+ interested, drop me a note.  Right now, it is really just a readme
+ and no working code.
+
 ** Use pidq to manage pools of processes (pids).
 
 - Protect the pids from being used concurrently.  The main pidq
@@ -108,12 +112,39 @@ You can also add or remove new pools while pidq is running using
 
 ** Details
 
-pidq is implemented as a =gen_server=.  Server state consists of a dict
-of pools keyed by pool name.  Each pool keeps track of its parameters,
-a free list, and an in-use list.  Since our motivating use-case is
-Riak's pb client, we opt to reuse a given client as much as possible
-to avoid unnecessary vector clock growth.  So pids are taken from the
-head of the free list and returned to the head of the free list.
+pidq is implemented as a =gen_server=.  Server state consists of:
+
+- A dict of pools keyed by pool name.
+- A dict mapping in use pids to their pool name.
+- A dict mapping consumer process pids to the pid they are using.
+- A module and function to use for starting new pids.
+
+Each pool keeps track of its parameters, such as max pids to allow,
+initial pids to start, number of pids in use, and a list of free pids.
+
+Since our motivating use-case is Riak's pb client, we opt to reuse a
+given client as much as possible to avoid unnecessary vector clock
+growth; pids are taken from the head of the free list and returned
+to the head of the free list.
+
+pidq is a system process and traps exits.  Before giving out a pid, it
+links to the requesting consumer process.  This way, if the consumer
+process crashes, pidq can recover the pid.  When the pid is returned,
+the requesting process will be unlinked.  Since the state of the pid
+is unknown in the case of a crashing consumer, we will destroy the pid
+and add a fresh one to the pool.
+
+The pid starter MFA should use =spawn_link= so that pidq will be linked
+to the pids (is it confusing that we've taken the term "pid" and
+turned it into a noun of this system?).  This way, when pids crash,
+pidq will be notified and can refill the pool with new pids.
+
+Also note that an alternative to a consumer explicitly returning a pid
+is for the consumer to exit normally.  pidq will receive the normal
+exit and can reclaim the pid.  In fact, we might want to implement pid
+return as "fake death" by sending pidq exit(PidqPid, normal).
+
+*** Pool management
 
 It is an error to add a pool with a name that already exists.
 
@@ -128,7 +159,6 @@ Pool removal has two forms:
 - *immediate* all pids in free and in-use lists are shut down; the
   pool is removed.
 
-
 #+BEGIN_SRC erlang
   -spec(take_pid() -> pid()).
   

+ 176 - 0
src/pidq.erl

@@ -0,0 +1,176 @@
+-module(pidq).
+-behaviour(gen_server).
+-define(SERVER, ?MODULE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%% Per-pool configuration and bookkeeping.
+-record(pool, {name,                  % key under which the pool is stored in #state.pools
+               max_pids = 100,        % upper bound checked before new pids are started
+               min_free = 3,          % not yet consulted by the code in this module
+               init_size = 10,        % not yet consulted by the code in this module
+               pid_starter_args = [], % argument list applied to the pid_starter {M, F}
+               free_pids,             % list of pids available to hand out
+               in_use_count}).        % number of pids currently handed out
+
+%% gen_server state.
+-record(state, {
+          npools,                       % number of pools supplied to init/1
+          pools = dict:new(),           % pool name -> #pool{}
+          in_use_pids = dict:new(),     % handed-out pid -> name of the pool it came from
+          consumer_to_pid = dict:new(), % consumer pid -> list of pids it has taken
+          pid_starter,                  % {Module, Function} applied to start a new pid
+          pid_stopper}).                % taken from the pid_stopper config entry; not used yet
+
+-define(gv(X, Y), proplists:get_value(X, Y)).
+-define(gv(X, Y, D), proplists:get_value(X, Y, D)).
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([start/1,
+         stop/0,
+         stop/1,
+         take_pid/0,
+         return_pid/2,
+         remove_pool/2,
+         add_pool/1,
+         status/0]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+%% Start the pidq gen_server, linked to the caller and registered
+%% locally under ?SERVER.  Config is handed unchanged to init/1.
+start(Config) ->
+    ServerName = {local, ?SERVER},
+    gen_server:start_link(ServerName, ?MODULE, Config, []).
+
+%% Synchronously ask the registered server to stop.
+stop() ->
+    Request = stop,
+    gen_server:call(?SERVER, Request).
+
+%% Same as stop/0; the How argument is accepted but currently ignored.
+stop(_How) ->
+    gen_server:call(?SERVER, stop).
+
+%% Ask the server for a pid on behalf of the calling process and
+%% return the server's reply.
+take_pid() ->
+    Request = take_pid,
+    gen_server:call(?SERVER, Request).
+
+%% Hand Pid back to the pool it was taken from.  Status must be `ok'
+%% or `fail'.  Always returns `ok'.
+%%
+%% The request is sent as a cast because {return_pid, Pid, Status} is
+%% handled in handle_cast/2; sending it with gen_server:call/2 would
+%% never reach that clause.
+return_pid(Pid, Status) when Status == ok; Status == fail ->
+    gen_server:cast(?SERVER, {return_pid, Pid, Status}).
+
+%% Send a {remove_pool, Name, How} request to the server.  How must be
+%% `graceful' or `immediate'.
+%% NOTE(review): no handle_call/3 clause for this request is visible in
+%% this module yet.
+remove_pool(Name, How) when How == graceful; How == immediate ->
+    Request = {remove_pool, Name, How},
+    gen_server:call(?SERVER, Request).
+
+%% Send an {add_pool, Pool} request to the server.
+%% NOTE(review): no handle_call/3 clause for this request is visible in
+%% this module yet.
+add_pool(Pool) ->
+    Request = {add_pool, Pool},
+    gen_server:call(?SERVER, Request).
+
+%% Send a `status' request to the server.
+%% NOTE(review): no handle_call/3 clause for this request is visible in
+%% this module yet.
+status() ->
+    Request = status,
+    gen_server:call(?SERVER, Request).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+
+%% Build the initial server state from the Config proplist.
+%%
+%% Keys read: `pools' (a list of pool proplists, each turned into a
+%% #pool{} and stored under its name), `pid_starter', and the optional
+%% `pid_stopper' (defaulting to {?MODULE, default_stopper}).
+init(Config) ->
+    NamedPools = [ {Rec#pool.name, Rec}
+                   || Rec <- [ props_to_pool(Props)
+                               || Props <- ?gv(pools, Config) ] ],
+    Starter = ?gv(pid_starter, Config),
+    Stopper = ?gv(pid_stopper, Config, {?MODULE, default_stopper}),
+    {ok, #state{npools = length(NamedPools),
+                pools = dict:from_list(NamedPools),
+                pid_starter = Starter,
+                pid_stopper = Stopper}}.
+
+%% Synchronous requests:
+%%   take_pid                  - reply with whatever take_pid/3 yields for
+%%                               the first pool (a pid or `error_no_pids')
+%%   {return_pid, Pid, Status} - put Pid back via do_return_pid/2, reply `ok'
+%%   stop                      - reply `stop_ok' and stop with reason normal
+%%   anything else             - reply {error, not_implemented}
+%%
+%% The fallback must be a `reply' tuple: {noreply, ok, State} would be
+%% read by gen_server as {noreply, NewState, Timeout}, replacing the
+%% state with `ok' and leaving the caller without an answer.
+handle_call(take_pid, {CPid, _Tag}, State) ->
+    % FIXME: load-balance?
+    PoolName = hd(dict:fetch_keys(State#state.pools)),
+    {NewPid, NewState} = take_pid(PoolName, CPid, State),
+    {reply, NewPid, NewState};
+handle_call({return_pid, Pid, _Status}, _From, State) ->
+    {reply, ok, do_return_pid(Pid, State)};
+handle_call(stop, _From, State) ->
+    {stop, normal, stop_ok, State};
+handle_call(_Request, _From, State) ->
+    {reply, {error, not_implemented}, State}.
+
+
+%% Asynchronous requests.  {return_pid, Pid, Status} puts Pid back into
+%% its pool (Status is ignored); every other message is dropped.
+handle_cast({return_pid, Pid, _Status}, State) ->
+    Updated = do_return_pid(Pid, State),
+    {noreply, Updated};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% Every non-call/cast message is ignored and the state is kept as is.
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%% Nothing is cleaned up on shutdown; both the reason and the state are
+%% ignored.
+terminate(_Reason, _State) ->
+    ok.
+
+%% Code upgrade hook: the state is carried over unchanged.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ------------------------------------------------------------------
+%% Internal Function Definitions
+%% ------------------------------------------------------------------
+
+% default_stopper(Pid) ->
+%     exit(Pid, kill).
+
+%% Build a #pool{} record from a proplist of pool settings.
+%%
+%% Any field missing from the proplist takes the default declared in
+%% the record definition (e.g. max_pids = 100) instead of `undefined'.
+%% free_pids and in_use_count are bookkeeping fields: they always start
+%% as [] and 0, whatever the proplist contains.
+props_to_pool(P) ->
+    Base = #pool{free_pids = [], in_use_count = 0},
+    Internal = [free_pids, in_use_count],
+    % a record is just a tagged tuple: drop the tag so the default
+    % values line up with record_info(fields, pool)
+    [pool | Defaults] = tuple_to_list(Base),
+    Values = [ case lists:member(Field, Internal) of
+                   true -> Default;
+                   false -> ?gv(Field, P, Default)
+               end
+               || {Field, Default} <- lists:zip(record_info(fields, pool),
+                                                Defaults) ],
+    list_to_tuple([pool|Values]).
+
+%% Start N new pids for the pool named PoolName and append them to its
+%% free list.
+%%
+%% Returns {ok, NewState} on success, or {max_pids_reached, State} with
+%% the state untouched when the pool would grow beyond max_pids.
+add_pids(PoolName, N, State) ->
+    #state{pools = Pools, pid_starter = {M, F}} = State,
+    Pool = dict:fetch(PoolName, Pools),
+    #pool{max_pids = Max, free_pids = Free, in_use_count = NumInUse,
+          pid_starter_args = Args} = Pool,
+    Total = length(Free) + NumInUse,
+    % =< rather than <: the pool is allowed to grow to exactly max_pids.
+    % With a strict comparison the last slot could never be filled, even
+    % though take_pid/3 only reports the pool as exhausted once
+    % in_use_count reaches max_pids.
+    case Total + N =< Max of
+        true ->
+            % FIXME: we'll want to link to these pids so we'll know if
+            % they crash. Or should the starter function be expected
+            % to do spawn_link?
+            NewPids = [ apply(M, F, Args) || _X <- lists:seq(1, N) ],
+            Pool1 = Pool#pool{free_pids = Free ++ NewPids},
+            {ok, State#state{pools = dict:store(PoolName, Pool1, Pools)}};
+        false ->
+            {max_pids_reached, State}
+    end.
+
+%% Take a pid from pool PoolName on behalf of the consumer process From.
+%%
+%% Returns {Pid, NewState}, or {error_no_pids, State} when the free list
+%% is empty and no further pid can be started.  On success the pid is
+%% recorded in in_use_pids and added to From's entry in consumer_to_pid.
+take_pid(PoolName, From, State) ->
+    #state{pools = Pools, in_use_pids = InUse, consumer_to_pid = CPMap} = State,
+    Pool = dict:fetch(PoolName, Pools),
+    #pool{max_pids = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
+    case Free of
+        [] when NumInUse >= Max ->
+            {error_no_pids, State};
+        [] ->
+            % Retry only when a pid was really added; recursing after
+            % max_pids_reached would loop forever on an unchanged state.
+            case add_pids(PoolName, 1, State) of
+                {ok, State1} ->
+                    take_pid(PoolName, From, State1);
+                {max_pids_reached, State1} ->
+                    {error_no_pids, State1}
+            end;
+        [Pid|Rest] ->
+            % FIXME: handle min_free here -- should adding pids
+            % to satisfy min_free be done in a spawned worker?
+            Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1},
+            CPMap1 = dict:update(From, fun(O) -> [Pid|O] end, [Pid], CPMap),
+            {Pid, State#state{pools = dict:store(PoolName, Pool1, Pools),
+                              in_use_pids = dict:store(Pid, PoolName, InUse),
+                              consumer_to_pid = CPMap1}}
+    end.
+
+%% Move Pid from the in-use bookkeeping back to the head of its pool's
+%% free list and return the updated state.  A pid that is not recorded
+%% as in use is reported with a warning and the state is returned as is.
+%% NOTE(review): consumer_to_pid is not updated here.
+do_return_pid(Pid, #state{in_use_pids = InUse, pools = Pools} = State) ->
+    case dict:is_key(Pid, InUse) of
+        false ->
+            error_logger:warning_report({return_pid_not_found, Pid}),
+            State;
+        true ->
+            PoolName = dict:fetch(Pid, InUse),
+            Pool = dict:fetch(PoolName, Pools),
+            Released = Pool#pool{free_pids = [Pid | Pool#pool.free_pids],
+                                 in_use_count = Pool#pool.in_use_count - 1},
+            State#state{in_use_pids = dict:erase(Pid, InUse),
+                        pools = dict:store(PoolName, Released, Pools)}
+    end.

+ 155 - 0
test/pidq_test.erl

@@ -0,0 +1,155 @@
+-module(pidq_test).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile([export_all]).
+
+% The `user' processes represent users of the pidq library.  A user
+% process will take a pid, report details on the pid it has, release
+% and take a new pid, stop cleanly, and crash.
+
+% Take a pid from pidq in the calling process, then spawn a user
+% process that loops holding it.  Returns the user's pid.
+start_user() ->
+    TC = pidq:take_pid(),
+    Loop = fun() -> user_loop(TC) end,
+    spawn(Loop).
+
+% Ask the user process Pid which tc it holds.  Returns {Type, Id}, or
+% `timeout' when the user could not reach its tc or did not answer
+% within one second.
+user_id(Pid) ->
+    Pid ! {get_tc_id, self()},
+    receive
+        {Type, Id} ->
+            {Type, Id};
+        timeout ->
+            % user_loop/1 forwards the `timeout' atom from get_tc_id/1
+            % when its tc does not answer; without this clause the
+            % caller would block forever.
+            timeout
+    after 1000 ->
+            timeout
+    end.
+
+% Tell a user process to return its tc and take a new one.
+user_new_tc(Pid) ->
+    erlang:send(Pid, new_tc).
+
+% Tell a user process to return its tc and stop cleanly.
+user_stop(Pid) ->
+    erlang:send(Pid, stop).
+
+% Tell a user process to crash.
+user_crash(Pid) ->
+    erlang:send(Pid, crash).
+
+% Message loop of a user process holding the tc pid MyTC.
+%   {get_tc_id, From} - send From the result of get_tc_id(MyTC)
+%   new_tc            - return MyTC to pidq and continue with a new pid
+%   stop              - return MyTC to pidq and finish with `stopped'
+%   crash             - raise {user_loop, kaboom}
+user_loop(MyTC) ->
+    receive
+        {get_tc_id, Requester} ->
+            Reply = get_tc_id(MyTC),
+            Requester ! Reply,
+            user_loop(MyTC);
+        new_tc ->
+            pidq:return_pid(MyTC, ok),
+            user_loop(pidq:take_pid());
+        stop ->
+            pidq:return_pid(MyTC, ok),
+            stopped;
+        crash ->
+            erlang:error({user_loop, kaboom})
+    end.
+
+% The `tc' processes represent the pids tracked by pidq for testing.
+% They have a type and an ID and can report their type and ID and
+% stop.
+
+% Message loop of a tc process identified by {Type, Id}.
+%   {get_id, From} - send From {ok, Type, Id} and keep looping
+%   stop           - finish with `stopped'
+%   crash          - raise {tc_loop, kaboom}
+tc_loop({Type, Id} = Identity) ->
+    receive
+        {get_id, Requester} ->
+            Requester ! {ok, Type, Id},
+            tc_loop(Identity);
+        stop ->
+            stopped;
+        crash ->
+            erlang:error({tc_loop, kaboom})
+    end.
+
+% Ask the tc process Pid for its identity.  Returns {Type, Id}, or
+% `timeout' if no {ok, Type, Id} reply arrives within 200 ms.
+get_tc_id(Pid) ->
+    erlang:send(Pid, {get_id, self()}),
+    receive
+        {ok, TcType, TcId} ->
+            {TcType, TcId}
+    after 200 ->
+            timeout
+    end.
+
+% Tell a tc process to stop.
+stop_tc(Pid) ->
+    erlang:send(Pid, stop).
+
+% Spawn a tc process of the given Type with a fresh reference as its
+% ID.  Returns the new pid.
+tc_starter(Type) ->
+    Identity = {Type, make_ref()},
+    spawn(fun() -> tc_loop(Identity) end).
+
+
+% Two tc processes of the same type report that type and distinct IDs.
+tc_sanity_test() ->
+    First = tc_starter("1"),
+    {"1", FirstId} = get_tc_id(First),
+    Second = tc_starter("1"),
+    {"1", SecondId} = get_tc_id(Second),
+    ?assertNot(FirstId == SecondId),
+    stop_tc(First),
+    stop_tc(Second).
+
+% A user process reports the identity of the tc it was started with,
+% and can then be told to crash.
+user_sanity_test() ->
+    Tc = tc_starter("1"),
+    UserPid = spawn(fun() -> user_loop(Tc) end),
+    Reported = user_id(UserPid),
+    ?assertMatch({"1", _Ref}, Reported),
+    user_crash(UserPid),
+    stop_tc(Tc).
+
+% Integration fixture.  For each test, pidq is started with one pool
+% ("p1") and ten user processes are created; afterwards the users are
+% told to stop and pidq is stopped.
+pidq_integration_test_() ->
+    Setup =
+        fun() ->
+                PoolP1 = [{name, "p1"},
+                          {max_pids, 20},
+                          {min_free, 3},
+                          {init_size, 10},
+                          {pid_starter_args, ["type-0"]}],
+                pidq:start([{pid_starter, {?MODULE, tc_starter}},
+                            {pid_stopper, {?MODULE, stop_tc}},
+                            {pools, [PoolP1]}]),
+                [ start_user() || _N <- lists:seq(1, 10) ]
+        end,
+    Cleanup =
+        fun(Users) ->
+                lists:foreach(fun user_stop/1, Users),
+                pidq:stop()
+        end,
+    UniqueTcIds =
+        fun(Users) ->
+                fun() ->
+                        % each user has a different tc ID
+                        Sorted = lists:sort([ user_id(U) || U <- Users ]),
+                        ?assertEqual(lists:usort(Sorted), Sorted)
+                end
+        end,
+    {foreach, Setup, Cleanup, [UniqueTcIds]}.
+
+      % fun(Users) ->
+      % ]}
+
+      % {"users still unique after a renew cycle",
+      %  fun() ->
+      %          Users = [ start_user() || _X <- lists:seq(1, 10) ],
+      %          % return and take new tc pids, expect unique
+      %          [ user_new_tc(UPid) || UPid <- Users ],
+      %          TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
+      %          % each user has a different tc ID
+      %          ?assertEqual(lists:usort(TcIds), TcIds)
+
+
+      % ]}.
+
+
+
+      %          % return and take new tc pids, still unique
+      %          [ user_new_tc(UPid) || UPid <- Users ],
+      %          TcIds2 = lists:sort([ user_id(UPid) || UPid <- Users ]),
+      %          ?assertEqual(lists:usort(TcIds2), TcIds2),
+      %          % if the users all crash...
+      %          [ user_crash(UPid) || UPid <- Users ],
+      %          Users2 = [ start_user() || _X <- lists:seq(1, 10) ],
+      %          TcIds3 = lists:sort([ user_id(UPid) || UPid <- Users ]),
+      %          ?assertEqual(lists:usort(TcIds3), TcIds3)
+
+