Seth Falcon 14 лет назад
Родитель
Commit
f6d5dc1658
14 измененных файлов с 516 добавлено и 723 удалено
  1. 0 413
      README.html
  2. 171 112
      README.org
  3. 25 0
      pidq.config.example
  4. BIN
      rebar
  5. 1 0
      rebar.config
  6. 0 28
      src/pidq_sup.erl
  7. 3 3
      src/pooler.app.src
  8. 106 91
      src/pooler.erl
  9. 5 2
      src/pooler_app.erl
  10. 15 0
      src/pooler_pool_sup.erl
  11. 14 0
      src/pooler_pooled_worker_sup.erl
  12. 17 0
      src/pooler_sup.erl
  13. 85 0
      test/pooled_gs.erl
  14. 74 74
      test/pooler_test.erl

+ 0 - 413
README.html

@@ -1,413 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
-               "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml"
-lang="en" xml:lang="en">
-<head>
-<title>README</title>
-<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
-<meta name="generator" content="Org-mode"/>
-<meta name="generated" content="2010-09-08 23:24:52 PDT"/>
-<meta name="author" content="Seth Falcon"/>
-<meta name="description" content=""/>
-<meta name="keywords" content=""/>
-<style type="text/css">
- <!--/*--><![CDATA[/*><!--*/
-  html { font-family: Times, serif; font-size: 12pt; }
-  .title  { text-align: center; }
-  .todo   { color: red; }
-  .done   { color: green; }
-  .tag    { background-color: #add8e6; font-weight:normal }
-  .target { }
-  .timestamp { color: #bebebe; }
-  .timestamp-kwd { color: #5f9ea0; }
-  p.verse { margin-left: 3% }
-  pre {
-	border: 1pt solid #AEBDCC;
-	background-color: #F3F5F7;
-	padding: 5pt;
-	font-family: courier, monospace;
-        font-size: 90%;
-        overflow:auto;
-  }
-  table { border-collapse: collapse; }
-  td, th { vertical-align: top; }
-  dt { font-weight: bold; }
-  div.figure { padding: 0.5em; }
-  div.figure p { text-align: center; }
-  textarea { overflow-x: auto; }
-  .linenr { font-size:smaller }
-  .code-highlighted {background-color:#ffff00;}
-  .org-info-js_info-navigation { border-style:none; }
-  #org-info-js_console-label { font-size:10px; font-weight:bold;
-                               white-space:nowrap; }
-  .org-info-js_search-highlight {background-color:#ffff00; color:#000000;
-                                 font-weight:bold; }
-  /*]]>*/-->
-</style>
-<script type="text/javascript">
-<!--/*--><![CDATA[/*><!--*/
- function CodeHighlightOn(elem, id)
- {
-   var target = document.getElementById(id);
-   if(null != target) {
-     elem.cacheClassElem = elem.className;
-     elem.cacheClassTarget = target.className;
-     target.className = "code-highlighted";
-     elem.className   = "code-highlighted";
-   }
- }
- function CodeHighlightOff(elem, id)
- {
-   var target = document.getElementById(id);
-   if(elem.cacheClassElem)
-     elem.className = elem.cacheClassElem;
-   if(elem.cacheClassTarget)
-     target.className = elem.cacheClassTarget;
- }
-/*]]>*///-->
-</script>
-
-</head>
-<body>
-<div id="content">
-
-<h1 class="title">README</h1>
-
-
-<div id="table-of-contents">
-<h2>Table of Contents</h2>
-<div id="text-table-of-contents">
-<ul>
-<li><a href="#sec-1">1 pidq - A Process Pool Library for Erlang </a>
-<ul>
-<li><a href="#sec-1_1">1.1 Use pidq to manage pools of processes (pids). </a></li>
-<li><a href="#sec-1_2">1.2 Motivation </a></li>
-<li><a href="#sec-1_3">1.3 Usage and API </a>
-<ul>
-<li><a href="#sec-1_3_1">1.3.1 Startup configuration </a></li>
-<li><a href="#sec-1_3_2">1.3.2 Getting and returning pids </a></li>
-<li><a href="#sec-1_3_3">1.3.3 Other things you can do </a></li>
-</ul>
-</li>
-<li><a href="#sec-1_4">1.4 Details </a>
-<ul>
-<li><a href="#sec-1_4_1">1.4.1 Pool management </a></li>
-</ul>
-</li>
-</ul>
-</li>
-</ul>
-</div>
-</div>
-
-<div id="outline-container-1" class="outline-2">
-<h2 id="sec-1"><span class="section-number-2">1</span> pidq - A Process Pool Library for Erlang </h2>
-<div class="outline-text-2" id="text-1">
-
-
-<p>
-<b>Note:</b> this is all work very much in progress.  If you are
-interested, drop me a note.  Right now, it is really just a readme
-and no working code.
-</p>
-
-</div>
-
-<div id="outline-container-1_1" class="outline-3">
-<h3 id="sec-1_1"><span class="section-number-3">1.1</span> Use pidq to manage pools of processes (pids). </h3>
-<div class="outline-text-3" id="text-1_1">
-
-
-<ul>
-<li>
-Protect the pids from being used concurrently.  The main pidq
-interface is <code>pidq:take_pid/0</code> and <code>pidq:return_pid/2</code>.  The pidq
-server will keep track of which pids are <b>in use</b> and which are
-<b>free</b>.
-
-</li>
-<li>
-Maintain the size of the pid pool.  Specify a maximum number of pids
-in the pool.  Trigger pid creation when the free count drops below a
-minimum level or when a pid is marked as failing.
-
-</li>
-<li>
-Organize pids by type and randomly load-balance pids by type.  This
-is useful when the pids represent client processes connected to a
-particular node in a cluster (think database read slaves).  Separate
-pools are maintained for each type and a request for a pid will
-randomly select a type.
-
-</li>
-</ul>
-</div>
-
-</div>
-
-<div id="outline-container-1_2" class="outline-3">
-<h3 id="sec-1_2"><span class="section-number-3">1.2</span> Motivation </h3>
-<div class="outline-text-3" id="text-1_2">
-
-
-<p>
-The need for the pidq kit arose while writing an Erlang-based
-application that uses <a href="https://wiki.basho.com/display/RIAK/">Riak</a> for data storage.  When using the Erlang
-protocol buffer client for Riak, one should avoid accessing a given
-client concurrently.  This is because each client is associated with a
-unique client ID that corresponds to an element in an object's vector
-clock.  Concurrent action from the same client ID defeats the vector
-clock.  For some further explaination, see <sup><a class="footref" name="fnr.1" href="#fn.1">1</a></sup> and <sup><a class="footref" name="fnr.2" href="#fn.2">2</a></sup>.
-</p>
-<p>
-I wanted to avoid spinning up a new client for each request in the
-application.  Riak's protocol buffer client is a <code>gen_server</code> process
-and my intuition is that one doesn't want to pay for the startup time
-for every request you send to an app.  This suggested a pool of
-clients with some management to avoid concurrent use of a given
-client.  On top of that, it seemed convenient to add the ability to
-load balance between clients connected to different nodes in the Riak
-cluster.  The load-balancing is a secondary feature; even if you end
-up setting up <a href="http://haproxy.1wt.eu/">HAProxy</a> for that aspect, you might still want the client
-pooling.
-</p>
-</div>
-
-</div>
-
-<div id="outline-container-1_3" class="outline-3">
-<h3 id="sec-1_3"><span class="section-number-3">1.3</span> Usage and API </h3>
-<div class="outline-text-3" id="text-1_3">
-
-
-
-</div>
-
-<div id="outline-container-1_3_1" class="outline-4">
-<h4 id="sec-1_3_1"><span class="section-number-4">1.3.1</span> Startup configuration </h4>
-<div class="outline-text-4" id="text-1_3_1">
-
-
-<p>
-The idea is that you would wire up pidq to be a supervised process in
-your application.  When you start pidq, you specify a module and
-function to use for creating new pids.  You also specify the
-properties for each pool that you want pidq to manage, including the
-arguments to pass to the pid starter function.
-</p>
-<p>
-An example configuration looks like this:
-</p>
-
-
-
-<pre class="src src-erlang"><span style="color: #FF6400;">Pool1</span> = [{<span style="color: #D8FA3C;">name</span>, <span style="color: #61CE3C;">"node1"</span>},
-         {<span style="color: #D8FA3C;">max_pids</span>, 10},
-         {<span style="color: #D8FA3C;">min_free</span>, 2},
-         {<span style="color: #D8FA3C;">init_size</span>, 5}
-         {<span style="color: #D8FA3C;">pid_starter_args</span>, <span style="color: #FF6400;">Args1</span>}],
-
-<span style="color: #FF6400;">Pool2</span> = [{<span style="color: #D8FA3C;">name</span>, <span style="color: #61CE3C;">"node2"</span>},
-         {<span style="color: #D8FA3C;">max_pids</span>, 100},
-         {<span style="color: #D8FA3C;">min_free</span>, 2},
-         {<span style="color: #D8FA3C;">init_size</span>, 50}
-         {<span style="color: #D8FA3C;">pid_starter_args</span>, <span style="color: #FF6400;">Args2</span>}],
-
-<span style="color: #FF6400;">Config</span> = [{<span style="color: #D8FA3C;">pid_starter</span>, {<span style="color: #FF6400;">M</span>, <span style="color: #FF6400;">F</span>}},
-          {<span style="color: #D8FA3C;">pid_stopper</span>, {<span style="color: #FF6400;">M</span>, <span style="color: #FF6400;">F</span>}},
-          {<span style="color: #D8FA3C;">pools</span>, [<span style="color: #FF6400;">Pool1</span>, <span style="color: #FF6400;">Pool2</span>]}]
-
-<span style="color: #AEAEAE; font-style: italic;">% </span><span style="color: #AEAEAE; font-style: italic;">either call this directly, or wire this
-</span><span style="color: #AEAEAE; font-style: italic;">% </span><span style="color: #AEAEAE; font-style: italic;">call into your application's supervisor  
-</span><span style="color: #8DA6CE;">pidq</span>:<span style="color: #8DA6CE;">start</span>(<span style="color: #FF6400;">Config</span>)
-
-</pre>
-
-
-
-<p>
-Each pool has a unique name, a maximum number of pids, an initial
-number of pids, and a minimum free pids count.  When pidq starts, it
-will create pids to match the <code>init_size</code> value.  If there are <code>min_free</code>
-pids or fewer, pidq will add a pid as long as that doesn't bring the
-total used + free count over <code>max_pids</code>.
-</p>
-<p>
-Specifying a <code>pid_stopper</code> function is optional.  If not specified,
-<code>exit(pid, kill)</code> will be used to shutdown pids in the case of error,
-pidq shutdown, or pool removal.  The function specified will be passed
-a pid as returned by the <code>pid_starter</code> function.
-</p>
-</div>
-
-</div>
-
-<div id="outline-container-1_3_2" class="outline-4">
-<h4 id="sec-1_3_2"><span class="section-number-4">1.3.2</span> Getting and returning pids </h4>
-<div class="outline-text-4" id="text-1_3_2">
-
-
-<p>
-Once started, the main interaction you will have with pidq is through
-two functions, <code>take_pid/0</code> and <code>return_pid/2</code>.
-</p>
-<p>
-Call <code>pidq:take_pid()</code> to obtain a pid from the pool.  When you are done
-with it, return it to the pool using <code>pidq:return_pid(Pid, ok)</code>.  If
-you encountered an error using the pid, you can pass <code>fail</code> as the
-second argument.  In this case, pidq will permently remove that pid
-from the pool and start a new pid to replace it.
-</p>
-</div>
-
-</div>
-
-<div id="outline-container-1_3_3" class="outline-4">
-<h4 id="sec-1_3_3"><span class="section-number-4">1.3.3</span> Other things you can do </h4>
-<div class="outline-text-4" id="text-1_3_3">
-
-
-<p>
-You can get the status for the system via <code>pidq:status()</code>.  This will
-return some informational details about the pools being managed.
-</p>
-<p>
-You can also add or remove new pools while pidq is running using
-<code>pidq:add_pool/1</code> and <code>pidq:remove_pool/1</code>.  Each pid 
-</p>
-</div>
-</div>
-
-</div>
-
-<div id="outline-container-1_4" class="outline-3">
-<h3 id="sec-1_4"><span class="section-number-3">1.4</span> Details </h3>
-<div class="outline-text-3" id="text-1_4">
-
-
-<p>
-pidq is implemented as a <code>gen_server</code>.  Server state consists of:
-</p>
-<ul>
-<li>
-A dict of pools keyed by pool name.
-</li>
-<li>
-A dict mapping in use pids to their pool name.
-</li>
-<li>
-A dict mapping consumer process pids to the pid they are using.
-</li>
-<li>
-A module and function to use for starting new pids.
-
-</li>
-</ul>
-
-<p>Each pool keeps track of its parameters, such as max pids to allow,
-initial pids to start, number of pids in use, and a list of free pids.
-</p>
-<p>
-Since our motivating use-case is Riak's pb client, we opt to reuse a
-given client as much as possible to avoid unnecessary vector clock
-growth; pids are taken from the head of the free list and returned
-to the head of the free list.
-</p>
-<p>
-pidq is a system process and traps exits.  Before giving out a pid, it
-links to the requesting consumer process.  This way, if the consumer
-process crashes, pidq can recover the pid.  When the pid is returned,
-the requesting process will be unlinked.  Since the state of the pid
-is unknown in the case of a crashing consumer, we will destroy the pid
-and add a fresh one to the pool.
-</p>
-<p>
-The pid starter MFA should use spawn<sub>link</sub> so that pidq will be linked
-to the pids (is it confusing that we've taken the term "pid" and
-turned it into a noun of this system?).  This way, when pids crash,
-pidq will be notified and can refill the pool with new pids.
-</p>
-<p>
-Also note that an alternative to a consumer explicitly returning a pid
-is for the consumer to exit normally.  pidq will receive the normal
-exit and can reclaim the pid.  In fact, we might want to implement pid
-return as "fake death" by sending pidq exit(PidqPid, normal).
-</p>
-
-</div>
-
-<div id="outline-container-1_4_1" class="outline-4">
-<h4 id="sec-1_4_1"><span class="section-number-4">1.4.1</span> Pool management </h4>
-<div class="outline-text-4" id="text-1_4_1">
-
-
-<p>
-It is an error to add a pool with a name that already exists.
-</p>
-<p>
-Pool removal has two forms:
-</p>
-<ul>
-<li>
-<b>graceful</b> pids in the free list are killed (using exit(pid, kill)
-unless a <code>pid_stopper</code> is specified in the pool parameters.  No pids
-will be handed out from this pool's free list.  As pids are
-returned, they are shut down.  When the pool is empty, it is
-removed.
-
-</li>
-<li>
-<b>immediate</b> all pids in free and in-use lists are shut down; the
-pool is removed.
-
-
-</li>
-</ul>
-
-
-<pre class="src src-erlang"><span style="color: #7fffd4;">-spec</span>(<span style="color: #8DA6CE;">take_pid</span>() -&gt; <span style="color: #94bff3;">pid</span>()).
-
-<span style="color: #7fffd4;">-spec</span>(<span style="color: #8DA6CE;">return_pid</span>(<span style="color: #94bff3;">pid</span>(), <span style="color: #D8FA3C;">ok</span> | <span style="color: #D8FA3C;">fail</span>) -&gt; <span style="color: #D8FA3C;">ignore</span>).
-
-<span style="color: #7fffd4;">-spec</span>(<span style="color: #8DA6CE;">status</span>() -&gt; [<span style="color: #8DA6CE;">term</span>()]).
-
-<span style="color: #7fffd4;">-type</span>(<span style="color: #8DA6CE;">pid_type_opt</span>() ::
-      {<span style="color: #D8FA3C;">name</span>, <span style="color: #8DA6CE;">string</span>()} |
-      {<span style="color: #D8FA3C;">max_pids</span>, <span style="color: #8DA6CE;">int</span>()} |
-      {<span style="color: #D8FA3C;">min_free</span>, <span style="color: #8DA6CE;">int</span>()} |
-      {<span style="color: #D8FA3C;">init_size</span>, <span style="color: #8DA6CE;">int</span>()} |
-      {<span style="color: #D8FA3C;">pid_starter_args</span>, [<span style="color: #8DA6CE;">term</span>()]}).
-
-<span style="color: #7fffd4;">-type</span>(<span style="color: #8DA6CE;">pid_type_spec</span>() :: [<span style="color: #8DA6CE;">pid_type_opt</span>()]).
-<span style="color: #7fffd4;">-spec</span>(<span style="color: #8DA6CE;">add_type</span>(<span style="color: #8DA6CE;">pid_type_spec</span>()) -&gt; <span style="color: #D8FA3C;">ok</span> | {<span style="color: #D8FA3C;">error</span>, <span style="color: #FF6400;">Why</span>}).
-<span style="color: #7fffd4;">-spec</span>(<span style="color: #8DA6CE;">remove_type</span>(<span style="color: #8DA6CE;">string</span>()) -&gt; <span style="color: #D8FA3C;">ok</span> | {<span style="color: #D8FA3C;">error</span>, <span style="color: #FF6400;">Why</span>}).
-</pre>
-
-
-
-
-
-
-</div>
-</div>
-</div>
-</div>
-<div id="footnotes">
-<h2 class="footnotes">Footnotes: </h2>
-<div id="text-footnotes">
-<p class="footnote"><sup><a class="footnum" name="fn.1" href="#fnr.1">1</a></sup> <a href="http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001900.html">http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001900.html</a>
-</p>
-<p class="footnote"><sup><a class="footnum" name="fn.2" href="#fnr.2">2</a></sup> <a href="http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001904.html">http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001904.html</a>
-</p>
-</div>
-</div>
-<div id="postamble">
-<p class="author"> Author: Seth Falcon
-</p>
-<p class="date"> Date: 2010-09-08 23:24:52 PDT</p>
-<p class="creator">HTML generated by org-mode 7.01trans in emacs 23</p>
-</div>
-</div>
-</body>
-</html>

+ 171 - 112
README.org

@@ -1,149 +1,174 @@
-* pidq - A Process Pool Library for Erlang
-
-*Note:* this is all work very much in progress.  If you are
- interested, drop me a note.  Right now, it is really just a readme
- and no working code.
-
-** Use pidq to manage pools of processes (pids).
-
-- Protect the pids from being used concurrently.  The main pidq
-  interface is =pidq:take_pid/0= and =pidq:return_pid/2=.  The pidq
-  server will keep track of which pids are *in use* and which are
-  *free*.
-
-- Maintain the size of the pid pool.  Specify a maximum number of pids
-  in the pool.  Trigger pid creation when the free count drops below a
-  minimum level or when a pid is marked as failing.
-
-- Organize pids by type and randomly load-balance pids by type.  This
-  is useful when the pids represent client processes connected to a
-  particular node in a cluster (think database read slaves).  Separate
-  pools are maintained for each type and a request for a pid will
-  randomly select a type.
+* pooler - An OTP Process Pool Application
+
+The pooler application allows you to manage pools of OTP behaviors
+such as gen_servers, gen_fsms, or supervisors, and provide consumers
+with exclusive access to pool members using pooler:take_member.
+
+** What pooler does
+
+- Protects the members of a pool from being used concurrently.  The
+  main pooler interface is =pooler:take_member/0= and
+  =pooler:return_member/2=.  The pooler server will keep track of
+  which members are *in use* and which are *free*.  There is no need
+  to call =pooler:return_member= if the consumer is a short-lived
+  process; in this case, pooler will detect the consumer's normal exit
+  and reclaim the member.
+
+- Maintains the size of the pool.  You specify an initial and a
+  maximum number of members in the pool.  Pooler will trigger member
+  creation when the free count drops to zero (as long as the in use
+  count is less than the maximum).  New pool members are added to
+  replace members that crash.  If a consumer crashes, the member it was
+  using will be destroyed and replaced.
+
+- Manage multiple pools.  A common configuration is to have each pool
+  contain client processes connected to a particular node in a cluster
+  (think database read slaves).  By default, pooler will randomly
+  select a pool to fetch a member from.
 
 ** Motivation
 
-The need for the pidq kit arose while writing an Erlang-based
-application that uses [[https://wiki.basho.com/display/RIAK/][Riak]] for data storage.  When using the Erlang
-protocol buffer client for Riak, one should avoid accessing a given
-client concurrently.  This is because each client is associated with a
-unique client ID that corresponds to an element in an object's vector
-clock.  Concurrent action from the same client ID defeats the vector
-clock.  For some further explaination, see [1] and [2].
-
-I wanted to avoid spinning up a new client for each request in the
-application.  Riak's protocol buffer client is a =gen_server= process
-that initiates a connection to a Riak node and my intuition is that
-one doesn't want to pay for the startup time for every request you
-send to an app.  This suggested a pool of clients with some management
-to avoid concurrent use of a given client.  On top of that, it seemed
-convenient to add the ability to load balance between clients
-connected to different nodes in the Riak cluster.  The load-balancing
-is a secondary feature; even if you end up setting up [[http://haproxy.1wt.eu/][HAProxy]] for that
-aspect, you might still want the client pooling.
+The need for pooler arose while writing an Erlang-based application
+that uses [[https://wiki.basho.com/display/RIAK/][Riak]] for data storage.  Riak's protocol buffer client is a
+=gen_server= process that initiates a connection to a Riak node.  A
+pool is needed to avoid spinning up a new client for each request in
+the application.  Reusing clients also has the benefit of keeping the
+vector clocks smaller since each client ID corresponds to an entry in
+the vector clock.
+
+When using the Erlang protocol buffer client for Riak, one should
+avoid accessing a given client concurrently.  This is because each
+client is associated with a unique client ID that corresponds to an
+element in an object's vector clock.  Concurrent action from the same
+client ID defeats the vector clock.  For some further explanation,
+see [1] and [2].  Note that concurrent access to Riak's pb client is
+actually ok as long as you avoid updating the same key at the same
+time.  So the pool needs to have checkout/checkin semantics that give
+consumers exclusive access to a client.
+
+On top of that, in order to evenly load a Riak cluster and be able to
+continue in the face of Riak node failures, consumers should spread
+their requests across clients connected to each node.  The client pool
+provides an easy way to load balance.
 
 [1] http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001900.html
 [2] http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001904.html
 
 ** Usage and API
 
-*** Startup configuration
-
-The idea is that you would wire up pidq to be a supervised process in
-your application.  When you start pidq, you specify a module and
-function to use for creating new pids.  You also specify the
-properties for each pool that you want pidq to manage, including the
-arguments to pass to the pid starter function.
+*** Pool Configuration
 
-An example configuration looks like this:
+Pool configuration is specified in the pooler application's
+environment.  This can be provided in a config file using =-config= or
+set at startup using =application:set_env(pooler, pools,
+Pools)=. Here's an example config file that creates three pools of
+Riak pb clients each talking to a different node in a local cluster:
 
 #+BEGIN_SRC erlang
-  Pool1 = [{name, "node1"},
-           {max_pids, 10},
-           {min_free, 2},
-           {init_size, 5}
-           {pid_starter_args, Args1}],
-  
-  Pool2 = [{name, "node2"},
-           {max_pids, 100},
-           {min_free, 2},
-           {init_size, 50}
-           {pid_starter_args, Args2}],
-  
-  Config = [{pid_starter, {M, F}},
-            {pid_stopper, {M, F}},
-            {pools, [Pool1, Pool2]}]
-
-  % either call this directly, or wire this
-  % call into your application's supervisor  
-  pidq:start(Config)
-
+% pooler.config
+% Start Erlang as: erl -config pooler
+% -*- mode: erlang -*-
+% pooler app config
+[
+ {pooler, [
+         {pools, [
+                  [{name, "rc8081"},
+                   {max_count, 5},
+                   {init_count, 2},
+                   {start_mfa,
+                    {riakc_pb_socket, start_link, ["localhost", 8081]}}],
+
+                  [{name, "rc8082"},
+                   {max_count, 5},
+                   {init_count, 2},
+                   {start_mfa,
+                    {riakc_pb_socket, start_link, ["localhost", 8082]}}],
+
+                  [{name, "rc8083"},
+                   {max_count, 5},
+                   {init_count, 2},
+                   {start_mfa,
+                    {riakc_pb_socket, start_link, ["localhost", 8083]}}]
+                 ]}
+        ]}
+].
 #+END_SRC
 
-Each pool has a unique name, a maximum number of pids, an initial
-number of pids, and a minimum free pids count.  When pidq starts, it
-will create pids to match the =init_size= value.  If there are =min_free=
-pids or fewer, pidq will add a pid as long as that doesn't bring the
-total used + free count over =max_pids=.
+Each pool has a unique name, an initial and maximum number of members,
+and an ={M, F, A}= describing how to start members of the pool.  When
+pooler starts, it will create members in each pool according to
+=init_count=.
+
+*** Using pooler
 
-Specifying a =pid_stopper= function is optional.  If not specified,
-=exit(pid, kill)= will be used to shutdown pids in the case of error,
-pidq shutdown, or pool removal.  The function specified will be passed
-a pid as returned by the =pid_starter= function.
+Here's an example session:
 
-*** Getting and returning pids
+#+BEGIN_SRC erlang
+application:start(pooler).
+P = pooler:take_member(),
+% use P
+pooler:return_member(P, ok).
+#+END_SRC
 
-Once started, the main interaction you will have with pidq is through
-two functions, =take_pid/0= and =return_pid/2=.
+Once started, the main interaction you will have with pooler is through
+two functions, =take_member/0= and =return_member/2=.
 
-Call =pidq:take_pid()= to obtain a pid from the pool.  When you are done
-with it, return it to the pool using =pidq:return_pid(Pid, ok)=.  If
-you encountered an error using the pid, you can pass =fail= as the
-second argument.  In this case, pidq will permently remove that pid
-from the pool and start a new pid to replace it.
+Call =pooler:take_member()= to obtain a member from a randomly
+selected pool.  When you are done with it, return it to the pool using
+=pooler:return_member(Pid, ok)=.  If you encountered an error using
+the member, you can pass =fail= as the second argument.  In this case,
+pooler will permanently remove that member from the pool and start a
+new member to replace it.  If your process is short lived, you can
+omit the call to =return_member=.  In this case, pooler will detect
+the normal exit of the consumer and reclaim the member.
 
 *** Other things you can do
 
-You can get the status for the system via =pidq:status()=.  This will
+You can get the status for the system via =pooler:status()=.  This will
 return some informational details about the pools being managed.
 
-You can also add or remove new pools while pidq is running using
-=pidq:add_pool/1= and =pidq:remove_pool/1=.  Each pid 
+You can also add or remove new pools while pooler is running using
+=pooler:add_pool/1= and =pooler:remove_pool/1= (not yet implemented). 
 
 ** Details
 
-pidq is implemented as a =gen_server=.  Server state consists of:
+pooler is implemented as a =gen_server=.  Server state consists of:
 
 - A dict of pools keyed by pool name.
-- A dict mapping in-use pids to their pool name and the pid of the
-  consumer that is using the pid.
-- A dict mapping consumer process pids to the pid they are using.
-- A module and function to use for starting new pids.
+- A dict of pool supervisors keyed by pool name.
+- A dict mapping in-use members to their pool name and the pid of the
+  consumer that is using the member.
+- A dict mapping consumer process pids to the member they are using.
 
-Each pool keeps track of its parameters, such as max pids to allow,
-initial pids to start, number of pids in use, and a list of free pids.
+Each pool keeps track of its parameters, such as max members to allow,
+initial members to start, number of members in use, and a list of free
+members.
 
 Since our motivating use-case is Riak's pb client, we opt to reuse a
 given client as much as possible to avoid unnecessary vector clock
-growth; pids are taken from the head of the free list and returned
+growth; members are taken from the head of the free list and returned
 to the head of the free list.
 
-pidq is a system process and traps exits.  Before giving out a pid, it
-links to the requesting consumer process.  This way, if the consumer
-process crashes, pidq can recover the pid.  When the pid is returned,
-the link to the consumer process will be severed.  Since the state of
-the pid is unknown in the case of a crashing consumer, we will destroy
-the pid and add a fresh one to the pool.
+pooler is a system process and traps exits.  Before giving out a
+member, it links to the requesting consumer process.  This way, if the
+consumer process crashes, pooler can recover the member.  When the
+member is returned, the link to the consumer process will be severed.
+Since the state of the member is unknown in the case of a crashing
+consumer, we will destroy the member and add a fresh one to the pool.
+
+The member starter MFA should use start_link so that pooler will be
+linked to the members.  This way, when members crash, pooler will be
+notified and can refill the pool with new pids.
 
-The pid starter MFA should use spawn_link so that pidq will be linked
-to the pids (is it confusing that we've taken the term "pid" and
-turned it into a noun of this system?).  This way, when pids crash,
-pidq will be notified and can refill the pool with new pids.
+*** Supervision
+
+The top-level pooler supervisor, pooler_sup, supervises the pooler
+gen_server and the pooler_pool_sup supervisor.  pooler_pool_sup
+supervises individual pool supervisors (pooler_pooled_worker_sup).
+Each pooler_pooled_worker_sup supervises the members of a pool.
+
+[[./pidq_appmon.jpg]]
 
-Also note that an alternative to a consumer explicitly returning a pid
-is for the consumer to exit normally.  pidq will receive the normal
-exit and can reclaim the pid.  In fact, we might want to implement pid
-return as "fake death" by sending pidq exit(PidqPid, normal).
 
 *** Pool management
 
@@ -161,9 +186,9 @@ Pool removal has two forms:
   pool is removed.
 
 #+BEGIN_SRC erlang
-  -spec(take_pid() -> pid()).
+  -spec(take_member() -> pid()).
   
-  -spec(return_pid(pid(), ok | fail) -> ignore).
+  -spec(return_member(pid(), ok | fail) -> ignore).
   
   -spec(status() -> [term()]).
   
@@ -187,3 +212,37 @@ by entirely different services.  Could be useful for testing a new
 client implementation.
 *** Rename something other than "pid"
 *** Consider ets for state storage rather than dict
+
+#+BEGIN_SRC erlang
+pman:start().
+A1 = {riakc_pb_socket, start_link, ["127.0.0.1", 8081]}.
+{ok, S1} = pidq_pool_sup:start_link(A1).
+supervisor:start_child(S1, []).
+
+{ok, S2} = pidq_sup:start_link([]).
+supervisor:start_child(pidq_pool_sup, [A1]).
+
+application:load(pidq).
+C = application:get_all_env(pidq).
+pidq:start(C).
+#+END_SRC
+
+*** supervision strategy
+
+**** pidq_sup
+top-level supervisor watches pidq gen_server and the pidq_pool_sup
+supervisor.
+**** pidq_pool_sup
+A simple_one_for_one supervisor that is used to create/watch
+pidq_pooled_worker_sup supervisor.  You use this to create a new pool
+and specify the M,F,A of the pooled worker at start.
+**** pidq_pooled_worker_sup
+Another simple_one_for_one that is used to create actual workers.
+
+* Rename plan
+genpool, gs_pool, pooler
+pid => worker, member, gs, gspid
+pooler:take_member/0
+pooler:return_member/2
+
+#+OPTIONS: ^:{}

+ 25 - 0
pidq.config.example

@@ -0,0 +1,25 @@
+% -*- mode: erlang -*-
+% pidq app config
+[
+ {pidq, [
+         {pools, [
+                  [{name, "rc8081"},
+                   {max_count, 5},
+                   {init_count, 2},
+                   {start_mfa,
+                    {riakc_pb_socket, start_link, ["localhost", 8081]}}],
+
+                  [{name, "rc8082"},
+                   {max_count, 5},
+                   {init_count, 2},
+                   {start_mfa,
+                    {riakc_pb_socket, start_link, ["localhost", 8082]}}],
+
+                  [{name, "rc8083"},
+                   {max_count, 5},
+                   {init_count, 2},
+                   {start_mfa,
+                    {riakc_pb_socket, start_link, ["localhost", 8083]}}]
+                 ]}
+        ]}
+].


+ 1 - 0
rebar.config

@@ -0,0 +1 @@
+{erl_opts, [debug_info]}.

+ 0 - 28
src/pidq_sup.erl

@@ -1,28 +0,0 @@
-
--module(pidq_sup).
-
--behaviour(supervisor).
-
-%% API
--export([start_link/0]).
-
-%% Supervisor callbacks
--export([init/1]).
-
-%% Helper macro for declaring children of supervisor
--define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-
-%% ===================================================================
-%% API functions
-%% ===================================================================
-
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-%% ===================================================================
-%% Supervisor callbacks
-%% ===================================================================
-
-init([]) ->
-    {ok, { {one_for_one, 5, 10}, []} }.
-

+ 3 - 3
src/pidq.app.src → src/pooler.app.src

@@ -1,12 +1,12 @@
-{application, pidq,
+{application, pooler,
  [
   {description, ""},
-  {vsn, "1"},
+  {vsn, git},
   {registered, []},
   {applications, [
                   kernel,
                   stdlib
                  ]},
-  {mod, { pidq_app, []}},
+  {mod, { pooler_app, []}},
   {env, []}
  ]}.

+ 106 - 91
src/pidq.erl → src/pooler.erl

@@ -1,24 +1,26 @@
--module(pidq).
+-module(pooler).
 -behaviour(gen_server).
 -define(SERVER, ?MODULE).
 
 -include_lib("eunit/include/eunit.hrl").
 
--record(pool, {name,
-               max_pids = 100,
-               min_free = 3,
-               init_size = 10,
-               pid_starter_args = [],
-               free_pids,
-               in_use_count}).
+-record(pool, {
+          name             :: string(),
+          max_count = 100  :: non_neg_integer(),
+          init_count = 10  :: non_neg_integer(),
+          start_mfa        :: {atom(), atom(), [term()]},
+          free_pids = []   :: [pid()],
+          in_use_count = 0 :: non_neg_integer()
+         }).
 
 -record(state, {
-          npools,
-          pools = dict:new(),
-          in_use_pids = dict:new(),
-          consumer_to_pid = dict:new(),
-          pid_starter,
-          pid_stopper}).
+          npools                       :: non_neg_integer(),
+          pools = dict:new()           :: dict:dictionary(),
+          pool_sups = dict:new()       :: dict:dictionary(),
+          in_use_pids = dict:new()     :: dict:dictionary(),
+          consumer_to_pid = dict:new() :: dict:dictionary(),
+          pool_selector                :: array()
+         }).
 
 -define(gv(X, Y), proplists:get_value(X, Y)).
 -define(gv(X, Y, D), proplists:get_value(X, Y, D)).
@@ -28,49 +30,57 @@
 %% ------------------------------------------------------------------
 
 -export([start/1,
+         start_link/1,
          stop/0,
-         take_pid/0,
-         return_pid/2,
-         remove_pool/2,
-         add_pool/1,
-         pool_stats/1,
-         status/0]).
+         take_member/0,
+         return_member/2,
+         % remove_pool/2,
+         % add_pool/1,
+         pool_stats/1]).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Exports
 %% ------------------------------------------------------------------
 
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
 
 %% ------------------------------------------------------------------
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
+start_link(Config) ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, Config, []).
+
 start(Config) ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, Config, []).
 
 stop() ->
     gen_server:call(?SERVER, stop).
 
-take_pid() ->
-    gen_server:call(?SERVER, take_pid).
+take_member() ->
+    gen_server:call(?SERVER, take_member).
 
-return_pid(Pid, Status) when Status == ok; Status == fail ->
+return_member(Pid, Status) when Status == ok; Status == fail ->
     CPid = self(),
-    gen_server:cast(?SERVER, {return_pid, Pid, Status, CPid}),
+    gen_server:cast(?SERVER, {return_member, Pid, Status, CPid}),
     ok.
 
-remove_pool(Name, How) when How == graceful; How == immediate ->
-    gen_server:call(?SERVER, {remove_pool, Name, How}).
+% TODO:
+% remove_pool(Name, How) when How == graceful; How == immediate ->
+%     gen_server:call(?SERVER, {remove_pool, Name, How}).
 
-add_pool(Pool) ->
-    gen_server:call(?SERVER, {add_pool, Pool}).
+% TODO:
+% add_pool(Pool) ->
+%     gen_server:call(?SERVER, {add_pool, Pool}).
 
 pool_stats(Pool) ->
     gen_server:call(?SERVER, {pool_stats, Pool}).
 
-status() ->
-    gen_server:call(?SERVER, status).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
@@ -79,23 +89,30 @@ status() ->
 init(Config) ->
     PoolRecs = [ props_to_pool(P) || P <- ?gv(pools, Config) ],
     Pools = [ {Pool#pool.name, Pool} || Pool <-  PoolRecs ],
-    State = #state{pid_starter = ?gv(pid_starter, Config),
-                   pid_stopper = ?gv(pid_stopper, Config,
-                                     {?MODULE, default_stopper}),
-                   npools = length(Pools),
-                   pools = dict:from_list(Pools)},
+    PoolSups =
+        lists:map(
+          fun(#pool{name = Name, start_mfa = MFA}) ->
+                  {ok, SupPid} = supervisor:start_child(pooler_pool_sup, [MFA]),
+                  {Name, SupPid}
+          end, PoolRecs),
+    State0 = #state{npools = length(Pools),
+                    pools = dict:from_list(Pools),
+                    pool_sups = dict:from_list(PoolSups),
+                    pool_selector = array:from_list([PN || {PN, _} <- Pools])
+                  },
+    {ok, State} = lists:foldl(
+                    fun(#pool{name = PName, init_count = N}, {ok, AccState}) ->
+                            add_pids(PName, N, AccState)
+                    end, {ok, State0}, PoolRecs),
     process_flag(trap_exit, true),
     {ok, State}.
 
-handle_call(take_pid, {CPid, _Tag}, State) ->
-    % FIXME: load-balance?
-    PoolName = hd(dict:fetch_keys(State#state.pools)),
-    {NewPid, NewState} = take_pid(PoolName, CPid, State),
+handle_call(take_member, {CPid, _Tag},
+            State = #state{pool_selector = PS, npools = NP}) ->
+    PoolName = array:get(crypto:rand_uniform(0, NP), PS),
+    {NewPid, NewState} = take_member(PoolName, CPid, State),
     {reply, NewPid, NewState};
 handle_call(stop, _From, State) ->
-    % FIXME:
-    % loop over in use and free pids and stop them?
-    % {M, F} = State#state.pid_stopper,
     {stop, normal, stop_ok, State};
 handle_call({pool_stats, PoolName}, _From, State) ->
     Pool = dict:fetch(PoolName, State#state.pools),
@@ -106,32 +123,33 @@ handle_call(_Request, _From, State) ->
     {noreply, ok, State}.
 
 
-handle_cast({return_pid, Pid, Status, CPid}, State) ->
-    {noreply, do_return_pid({Pid, Status}, CPid, State)};
+handle_cast({return_member, Pid, Status, CPid}, State) ->
+    {noreply, do_return_member({Pid, Status}, CPid, State)};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info({'EXIT', Pid, Reason}, State) ->
     % error_logger:info_report({got_exit, Pid, Reason}),
-    State1 = case dict:find(Pid, State#state.in_use_pids) of
-                 {ok, {_PName, CPid}} -> do_return_pid({Pid, fail}, CPid, State);
-                 error ->
-                     CPMap = State#state.consumer_to_pid,
-                     case dict:find(Pid, CPMap) of
-
-                         {ok, Pids} ->
-                             error_logger:info_report({{consumer, Pid, Reason}, Pids}),
-                             IsOk = case Reason of
-                                        normal -> ok;
-                                        _Crash -> fail
-                                    end,
-                             lists:foldl(fun(P, S) ->
-                                                 do_return_pid({P, IsOk}, Pid, S)
-                                         end, State, Pids);
-                         error ->
-                             State
-                     end
-             end,
+    State1 =
+        case dict:find(Pid, State#state.in_use_pids) of
+            {ok, {_PName, CPid}} ->
+                do_return_member({Pid, fail}, CPid, State);
+            error ->
+                CPMap = State#state.consumer_to_pid,
+                case dict:find(Pid, CPMap) of
+                    {ok, Pids} ->
+                        IsOk = case Reason of
+                                   normal -> ok;
+                                   _Crash -> fail
+                               end,
+                        lists:foldl(
+                          fun(P, S) ->
+                                  do_return_member({P, IsOk}, Pid, S)
+                          end, State, Pids);
+                    error ->
+                        State
+                end
+        end,
     {noreply, State1};
 handle_info(_Info, State) ->
     {noreply, State}.
@@ -146,53 +164,51 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
-% default_stopper(Pid) ->
-%     exit(Pid, kill).
-
 props_to_pool(P) ->
-    Defs = [{free_pids, []}, {in_use_count, 0}],
-    % a record is just a tagged tuple
-    P2 = lists:append(Defs, P),
-    Values = [ ?gv(Field, P2) || Field <- record_info(fields, pool) ],
-    list_to_tuple([pool|Values]).
+    #pool{      name = ?gv(name, P),
+           max_count = ?gv(max_count, P),
+          init_count = ?gv(init_count, P),
+           start_mfa = ?gv(start_mfa, P)}.
 
+% FIXME: creation of new pids should probably happen
+% in a spawned process to avoid tying up the loop.
 add_pids(error, _N, State) ->
     {bad_pool_name, State};
 add_pids(PoolName, N, State) ->
-    #state{pools = Pools, pid_starter = {M, F}} = State,
+    #state{pools = Pools, pool_sups = PoolSups} = State,
     Pool = dict:fetch(PoolName, Pools),
-    #pool{max_pids = Max, free_pids = Free, in_use_count = NumInUse,
-          pid_starter_args = Args} = Pool,
+    #pool{max_count = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
     Total = length(Free) + NumInUse,
     case Total + N =< Max of
         true ->
-            % FIXME: we'll want to link to these pids so we'll know if
-            % they crash. Or should the starter function be expected
-            % to do spawn_link?
-            NewPids = [ apply(M, F, Args) || _X <- lists:seq(1, N) ],
-            Pool1 = Pool#pool{free_pids = lists:append(Free, NewPids)},
+            Sup = dict:fetch(PoolName, PoolSups),
+            NewPids =
+                lists:map(fun(_I) ->
+                                  {ok, Pid} = supervisor:start_child(Sup, []),
+                                  erlang:link(Pid),
+                                  Pid
+                          end, lists:seq(1, N)),
+            Pool1 = Pool#pool{free_pids = Free ++ NewPids},
             {ok, State#state{pools = dict:store(PoolName, Pool1, Pools)}};
         false ->
-            {max_pids_reached, State}
+            {max_count_reached, State}
     end.
 
-take_pid(PoolName, From, State) ->
+take_member(PoolName, From, State) ->
     #state{pools = Pools, in_use_pids = InUse, consumer_to_pid = CPMap} = State,
     Pool = dict:fetch(PoolName, Pools),
-    #pool{max_pids = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
+    #pool{max_count = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
     case Free of
         [] when NumInUse == Max ->
             {error_no_pids, State};
         [] when NumInUse < Max ->
             case add_pids(PoolName, 1, State) of
                 {ok, State1} ->
-                    take_pid(PoolName, From, State1);
-                {max_pids_reached, _} ->
+                    take_member(PoolName, From, State1);
+                {max_count_reached, _} ->
                     {error_no_pids, State}
             end;
         [Pid|Rest] ->
-            % FIXME: handle min_free here -- should adding pids
-            % to satisfy min_free be done in a spawned worker?
             erlang:link(From),
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1},
             CPMap1 = dict:update(From, fun(O) -> [Pid|O] end, [Pid], CPMap),
@@ -201,7 +217,7 @@ take_pid(PoolName, From, State) ->
                               consumer_to_pid = CPMap1}}
     end.
 
-do_return_pid({Pid, Status}, CPid, State) ->
+do_return_member({Pid, Status}, CPid, State) ->
     #state{in_use_pids = InUse, pools = Pools,
            consumer_to_pid = CPMap} = State,
     case dict:find(Pid, InUse) of
@@ -216,7 +232,7 @@ do_return_pid({Pid, Status}, CPid, State) ->
                          pools = dict:store(PoolName, Pool1, Pools),
                          consumer_to_pid = cpmap_remove(Pid, CPid, CPMap)};
         error ->
-            error_logger:warning_report({return_pid_not_found, Pid, dict:to_list(InUse)}),
+            error_logger:warning_report({return_member_not_found, Pid, dict:to_list(InUse)}),
             State
     end.
 
@@ -225,8 +241,7 @@ add_pid_to_free(Pid, Pool) ->
     Pool#pool{free_pids = [Pid|Free], in_use_count = NumInUse - 1}.
 
 handle_failed_pid(Pid, PoolName, Pool, State) ->
-    {M, F} = State#state.pid_stopper,
-    M:F(Pid),
+    exit(Pid, kill),
     {_, NewState} = add_pids(PoolName, 1, State),
     NumInUse = Pool#pool.in_use_count,
     {Pool#pool{in_use_count = NumInUse - 1}, NewState}.

+ 5 - 2
src/pidq_app.erl → src/pooler_app.erl

@@ -1,4 +1,4 @@
--module(pidq_app).
+-module(pooler_app).
 
 -behaviour(application).
 
@@ -10,7 +10,10 @@
 %% ===================================================================
 
 start(_StartType, _StartArgs) ->
-    pidq_sup:start_link().
+    case pooler_sup:start_link() of
+        {ok, Pid} -> {ok, Pid};
+        Other -> {error, Other}
+    end.
 
 stop(_State) ->
     ok.

+ 15 - 0
src/pooler_pool_sup.erl

@@ -0,0 +1,15 @@
+-module(pooler_pool_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0, init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    Worker = {pooler_pooled_worker_sup,
+              {pooler_pooled_worker_sup, start_link, []},
+              temporary, 5000, supervisor, [pooler_pooled_worker_sup]},
+    Restart = {simple_one_for_one, 1, 1},
+    {ok, {Restart, [Worker]}}.

+ 14 - 0
src/pooler_pooled_worker_sup.erl

@@ -0,0 +1,14 @@
+-module(pooler_pooled_worker_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link(Config) ->
+    supervisor:start_link(?MODULE, Config).
+
+init({Mod, Fun, Args}) ->
+    Worker = {Mod, {Mod, Fun, Args}, temporary, brutal_kill, worker, [Mod]},
+    Specs = [Worker],
+    Restart = {simple_one_for_one, 1, 1},
+    {ok, {Restart, Specs}}.

+ 17 - 0
src/pooler_sup.erl

@@ -0,0 +1,17 @@
+-module(pooler_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0, init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    Config = application:get_all_env(),
+    Pooler = {pooler, {pooler, start_link, [Config]},
+            permanent, 5000, worker, [pooler]},
+    PoolerPool = {pooler_pool_sup, {pooler_pool_sup, start_link, []},
+                permanent, 5000, supervisor, [pooler_pool_sup]},
+    {ok, {{one_for_one, 5, 10}, [PoolerPool, Pooler]}}.
+

+ 85 - 0
test/pooled_gs.erl

@@ -0,0 +1,85 @@
+-module(pooled_gs).
+-behaviour(gen_server).
+-define(SERVER, ?MODULE).
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([start_link/1,
+         get_id/1,
+         ping/1,
+         ping_count/1,
+         crash/1,
+         stop/1
+        ]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+start_link(Args ={_Type}) ->
+    % not registered
+    gen_server:start_link(?MODULE, Args, []).
+
+get_id(S) ->
+    gen_server:call(S, get_id).
+
+ping(S) ->
+    gen_server:call(S, ping).
+
+ping_count(S) ->
+    gen_server:call(S, ping_count).
+
+crash(S) ->
+    gen_server:cast(S, crash),
+    sent_crash_request.
+
+stop(S) ->
+    gen_server:call(S, stop).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+-record(state, {
+          type = "",
+          id,
+          ping_count = 0
+         }).
+
+init({Type}) ->
+    {ok, #state{type = Type, id = make_ref()}}.
+
+handle_call(get_id, _From, State) ->
+    {reply, {State#state.type, State#state.id}, State};
+handle_call(ping, _From, #state{ping_count = C } = State) ->
+    State1 = State#state{ping_count = C + 1},
+    {reply, pong, State1};
+handle_call(ping_count, _From, #state{ping_count = C } = State) ->
+    {reply, C, State};
+handle_call(stop, _From, State) ->
+    {stop, normal, stop_ok, State};
+handle_call(_Request, _From, State) ->
+    {noreply, ok, State}.
+
+handle_cast(crash, State) ->
+    erlang:error({pooled_gs, requested_crash});
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+

+ 74 - 74
test/pidq_test.erl → test/pooler_test.erl

@@ -1,18 +1,15 @@
--module(pidq_test).
+-module(pooler_test).
 
 -include_lib("eunit/include/eunit.hrl").
 
 -compile([export_all]).
 
-% The `user' processes represent users of the pidq library.  A user
+% The `user' processes represent users of the pooler library.  A user
 % process will take a pid, report details on the pid it has, release
 % and take a new pid, stop cleanly, and crash.
 
 start_user() ->
-    spawn(fun() ->
-                  TC = pidq:take_pid(),
-                  user_loop(TC)
-          end).
+    spawn(fun() -> user_loop(start) end).
 
 user_id(Pid) ->
     Pid ! {get_tc_id, self()},
@@ -30,23 +27,31 @@ user_stop(Pid) ->
 user_crash(Pid) ->
     Pid ! crash.
 
+user_loop(Atom) when Atom =:= error_no_pids orelse Atom =:= start ->
+    user_loop(pooler:take_member());
 user_loop(MyTC) ->
     receive
         {get_tc_id, From} ->
-            From ! get_tc_id(MyTC),
+            From ! pooled_gs:get_id(MyTC),
+            user_loop(MyTC);
+        {ping_tc, From} ->
+            From ! pooled_gs:ping(MyTC),
+            user_loop(MyTC);
+        {ping_count, From} ->
+            From ! pooled_gs:ping_count(MyTC),
             user_loop(MyTC);
         new_tc ->
-            pidq:return_pid(MyTC, ok),
-            MyNewTC = pidq:take_pid(),
+            pooler:return_member(MyTC, ok),
+            MyNewTC = pooler:take_member(),
             user_loop(MyNewTC);
         stop ->
-            pidq:return_pid(MyTC, ok),
+            pooler:return_member(MyTC, ok),
             stopped;
         crash ->
             erlang:error({user_loop, kaboom})
     end.
 
-% The `tc' processes represent the pids tracked by pidq for testing.
+% The `tc' processes represent the pids tracked by pooler for testing.
 % They have a type and an ID and can report their type and ID and
 % stop.
 
@@ -80,52 +85,51 @@ assert_tc_valid(Pid) ->
     ?assertMatch({_Type, _Ref}, get_tc_id(Pid)),
     ok.
 
-tc_sanity_test() ->
-    Pid1 = tc_starter("1"),
-    {"1", Id1} = get_tc_id(Pid1),
-    Pid2 = tc_starter("1"),
-    {"1", Id2} = get_tc_id(Pid2),
-    ?assertNot(Id1 == Id2),
-    stop_tc(Pid1),
-    stop_tc(Pid2).
-
-user_sanity_test() ->
-    Pid1 = tc_starter("1"),
-    User = spawn(fun() -> user_loop(Pid1) end),
-    ?assertMatch({"1", _Ref}, user_id(User)),
-    user_crash(User),
-    stop_tc(Pid1).
-
-pidq_basics_test_() ->
+% tc_sanity_test() ->
+%     Pid1 = tc_starter("1"),
+%     {"1", Id1} = get_tc_id(Pid1),
+%     Pid2 = tc_starter("1"),
+%     {"1", Id2} = get_tc_id(Pid2),
+%     ?assertNot(Id1 == Id2),
+%     stop_tc(Pid1),
+%     stop_tc(Pid2).
+
+% user_sanity_test() ->
+%     Pid1 = tc_starter("1"),
+%     User = spawn(fun() -> user_loop(Pid1) end),
+%     ?assertMatch({"1", _Ref}, user_id(User)),
+%     user_crash(User),
+%     stop_tc(Pid1).
+
+pooler_basics_test_() ->
     {foreach,
      % setup
      fun() ->
              Pools = [[{name, "p1"},
-                       {max_pids, 3}, {min_free, 1},
-                       {init_size, 2}, {pid_starter_args, ["type-0"]}]],
-
-             Config = [{pid_starter, {?MODULE, tc_starter}},
-                       {pid_stopper, {?MODULE, stop_tc}},
-                       {pools, Pools}],
-             pidq:start(Config)
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-0"}]}}]],
+             application:set_env(pooler, pools, Pools),
+             application:start(pooler)
      end,
      fun(_X) ->
-             pidq:stop()
+             application:stop(pooler)
      end,
      [
       {"take and return one",
        fun() ->
-               P = pidq:take_pid(),
-               ?assertMatch({"type-0", _Id}, get_tc_id(P)),
-               ok = pidq:return_pid(P, ok)
+               P = pooler:take_member(),
+               ?assertMatch({"type-0", _Id}, pooled_gs:get_id(P)),
+               ok = pooler:return_member(P, ok)
        end},
 
       {"pids are created on demand until max",
        fun() ->
-               Pids = [pidq:take_pid(), pidq:take_pid(), pidq:take_pid()],
-               ?assertMatch(error_no_pids, pidq:take_pid()),
-               ?assertMatch(error_no_pids, pidq:take_pid()),
-               PRefs = [ R || {_T, R} <- [ get_tc_id(P) || P <- Pids ] ],
+               Pids = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
+               ?assertMatch(error_no_pids, pooler:take_member()),
+               ?assertMatch(error_no_pids, pooler:take_member()),
+               PRefs = [ R || {_T, R} <- [ pooled_gs:get_id(P) || P <- Pids ] ],
                % no duplicates
                ?assertEqual(length(PRefs), length(lists:usort(PRefs)))
        end
@@ -133,36 +137,36 @@ pidq_basics_test_() ->
 
       {"pids are reused most recent return first",
        fun() ->
-               P1 = pidq:take_pid(),
-               P2 = pidq:take_pid(),
+               P1 = pooler:take_member(),
+               P2 = pooler:take_member(),
                ?assertNot(P1 == P2),
-               ok = pidq:return_pid(P1, ok),
-               ok = pidq:return_pid(P2, ok),
+               ok = pooler:return_member(P1, ok),
+               ok = pooler:return_member(P2, ok),
                % pids are reused most recent first
-               ?assertEqual(P2, pidq:take_pid()),
-               ?assertEqual(P1, pidq:take_pid())
+               ?assertEqual(P2, pooler:take_member()),
+               ?assertEqual(P1, pooler:take_member())
        end},
 
       {"if a pid crashes it is replaced",
        fun() ->
-               Pids0 = [pidq:take_pid(), pidq:take_pid(), pidq:take_pid()],
-               Ids0 = [ get_tc_id(P) || P <- Pids0 ],
+               Pids0 = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
+               Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % crash them all
-               [ P ! crash || P <- Pids0 ],
+               [ pooled_gs:crash(P) || P <- Pids0 ],
                Pids1 = get_n_pids(3, []),
-               Ids1 = [ get_tc_id(P) || P <- Pids1 ],
+               Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
        end
        },
 
       {"if a pid is returned with bad status it is replaced",
        fun() ->
-               Pids0 = [pidq:take_pid(), pidq:take_pid(), pidq:take_pid()],
-               Ids0 = [ get_tc_id(P) || P <- Pids0 ],
+               Pids0 = [pooler:take_member(), pooler:take_member(), pooler:take_member()],
+               Ids0 = [ pooled_gs:get_id(P) || P <- Pids0 ],
                % return them all marking as bad
-               [ pidq:return_pid(P, fail) || P <- Pids0 ],
+               [ pooler:return_member(P, fail) || P <- Pids0 ],
                Pids1 = get_n_pids(3, []),
-               Ids1 = [ get_tc_id(P) || P <- Pids1 ],
+               Ids1 = [ pooled_gs:get_id(P) || P <- Pids1 ],
                [ ?assertNot(lists:member(I, Ids0)) || I <- Ids1 ]
        end
       },
@@ -171,38 +175,33 @@ pidq_basics_test_() ->
        fun() ->
                Consumer = start_user(),
                StartId = user_id(Consumer),
-               ?debugVal(pidq:pool_stats("p1")),
                user_crash(Consumer),
                NewPid = hd(get_n_pids(1, [])),
-               NewId = get_tc_id(NewPid),
-               ?debugVal(pidq:pool_stats("p1")),
+               NewId = pooled_gs:get_id(NewPid),
                ?assertNot(NewId == StartId)
        end
       }
      ]}.
 
 
-pidq_integration_test_() ->
+pooler_integration_test_() ->
     {foreach,
      % setup
      fun() ->
              Pools = [[{name, "p1"},
-                      {max_pids, 10},
-                      {min_free, 3},
-                      {init_size, 10},
-                      {pid_starter_args, ["type-0"]}]],
-
-             Config = [{pid_starter, {?MODULE, tc_starter}},
-                       {pid_stopper, {?MODULE, stop_tc}},
-                       {pools, Pools}],
-             pidq:start(Config),
+                       {max_count, 10},
+                       {init_count, 10},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-0"}]}}]],
+             application:set_env(pooler, pools, Pools),
+             application:start(pooler),
              Users = [ start_user() || _X <- lists:seq(1, 10) ],
              Users
      end,
      % cleanup
      fun(Users) ->
              [ user_stop(U) || U <- Users ],
-             pidq:stop()
+             application:stop(pooler)
      end,
      %
      [
@@ -212,7 +211,8 @@ pidq_integration_test_() ->
                      TcIds = lists:sort([ user_id(UPid) || UPid <- Users ]),
                      ?assertEqual(lists:usort(TcIds), TcIds)
              end
-      end,
+      end
+      ,
 
       fun(Users) ->
               fun() ->
@@ -242,12 +242,12 @@ pidq_integration_test_() ->
     }.
 
 % testing crash recovery means race conditions when either pids
-% haven't yet crashed or pidq hasn't recovered.  So this helper loops
+% haven't yet crashed or pooler hasn't recovered.  So this helper loops
% forever until N pids are obtained, ignoring error_no_pids.
 get_n_pids(0, Acc) ->
     Acc;
 get_n_pids(N, Acc) ->
-    case pidq:take_pid() of
+    case pooler:take_member() of
         error_no_pids ->
             get_n_pids(N, Acc);
         Pid ->