
pooler - An OTP Process Pool Application

The pooler application lets you manage pools of OTP behaviors such as gen_servers, gen_fsms, or supervisors, and provides consumers with exclusive access to pool members via pooler:take_member.

What pooler does

  • Protects the members of a pool from being used concurrently. The main pooler interface is pooler:take_member/0 and pooler:return_member/2. The pooler server keeps track of which members are in use and which are free. There is no need to call pooler:return_member if the consumer is a short-lived process; in that case, pooler will detect the consumer's normal exit and reclaim the member (see the sketch after this list).
  • Maintains the size of the pool. You specify an initial and a maximum number of members in the pool. Pooler will trigger member creation when the free count drops to zero (as long as the in-use count is less than the maximum). New pool members are added to replace members that crash. If a consumer crashes, the member it was using will be destroyed and replaced.
  • Manages multiple pools. A common configuration is to have each pool contain client processes connected to a particular node in a cluster (think database read slaves). By default, pooler will randomly select a pool to fetch a member from.
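
A minimal sketch of the short-lived consumer case mentioned above (do_work/1 is a hypothetical placeholder for whatever the consumer does with the member):

spawn(fun() ->
          P = pooler:take_member(),
          %% do_work/1 stands in for real work
          do_work(P)
          %% no return_member call: pooler reclaims P when this process exits normally
      end).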

Motivation

The need for pooler arose while writing an Erlang-based application that uses Riak for data storage. Riak's protocol buffer client is a gen_server process that initiates a connection to a Riak node. A pool is needed to avoid spinning up a new client for each request in the application. Reusing clients also has the benefit of keeping the vector clocks smaller since each client ID corresponds to an entry in the vector clock.

When using the Erlang protocol buffer client for Riak, one should avoid accessing a given client concurrently. This is because each client is associated with a unique client ID that corresponds to an element in an object's vector clock. Concurrent action from the same client ID defeats the vector clock. For some further explanation, see [1] and [2]. Note that concurrent access to Riak's pb client is actually OK as long as you avoid updating the same key at the same time. So the pool needs to have checkout/checkin semantics that give consumers exclusive access to a client.

On top of that, in order to evenly load a Riak cluster and be able to continue in the face of Riak node failures, consumers should spread their requests across clients connected to each node. The client pool provides an easy way to load balance.

[1] http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001900.html
[2] http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-September/001904.html

Usage and API

Pool Configuration

Pool configuration is specified in the pooler application's environment. This can be provided in a config file using -config or set at startup using application:set_env(pooler, pools, Pools). Here's an example config file that creates three pools of Riak pb clients each talking to a different node in a local cluster:

% pooler.config
% Start Erlang as: erl -config pooler
% -*- mode: erlang -*-
% pooler app config
[
 {pooler, [
         {pools, [
                  [{name, "rc8081"},
                   {max_count, 5},
                   {init_count, 2},
                   {start_mfa,
                    {riakc_pb_socket, start_link, ["localhost", 8081]}}],

                  [{name, "rc8082"},
                   {max_count, 5},
                   {init_count, 2},
                   {start_mfa,
                    {riakc_pb_socket, start_link, ["localhost", 8082]}}],

                  [{name, "rc8083"},
                   {max_count, 5},
                   {init_count, 2},
                   {start_mfa,
                    {riakc_pb_socket, start_link, ["localhost", 8083]}}]
                 ]}
        ]}
].

Each pool has a unique name, an initial and maximum number of members, and an {M, F, A} describing how to start members of the pool. When pooler starts, it will create members in each pool according to init_count.
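
The same pools can also be configured at startup without a config file; here's a sketch of the application:set_env route mentioned above, using one of the pools from the example:

Pools = [[{name, "rc8081"},
          {max_count, 5},
          {init_count, 2},
          {start_mfa, {riakc_pb_socket, start_link, ["localhost", 8081]}}]].
application:set_env(pooler, pools, Pools).
application:start(pooler).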

Using pooler

Here's an example session:

application:start(pooler).
P = pooler:take_member().
% use P
pooler:return_member(P, ok).

Once started, the main interaction you will have with pooler is through two functions, take_member/0 and return_member/2.

Call pooler:take_member() to obtain a member from a randomly selected pool. When you are done with it, return it to the pool using pooler:return_member(Pid, ok). If you encountered an error using the member, you can pass fail as the second argument. In this case, pooler will permanently remove that member from the pool and start a new member to replace it. If your process is short lived, you can omit the call to return_member. In this case, pooler will detect the normal exit of the consumer and reclaim the member.
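
As a sketch of that checkout/checkin pattern with error handling (do_query/1 is a hypothetical consumer function, not part of pooler):

P = pooler:take_member(),
try
    Result = do_query(P),
    pooler:return_member(P, ok),
    Result
catch
    Class:Reason ->
        %% the member's state is now suspect; ask pooler to destroy and replace it
        pooler:return_member(P, fail),
        erlang:raise(Class, Reason, erlang:get_stacktrace())
end.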

Other things you can do

You can get the status for the system via pooler:status(). This will return some informational details about the pools being managed.

You can also add new pools or remove existing ones while pooler is running using pooler:add_pool/1 and pooler:remove_pool/1 (not yet implemented).

Details

pooler is implemented as a gen_server. Server state consists of:

  • A dict of pools keyed by pool name.
  • A dict of pool supervisors keyed by pool name.
  • A dict mapping in-use members to their pool name and the pid of the consumer that is using the member.
  • A dict mapping consumer process pids to the member they are using.

Each pool keeps track of its parameters, such as the maximum number of members to allow, the initial number of members to start, the number of members in use, and a list of free members.

Since our motivating use-case is Riak's pb client, we opt to reuse a given client as much as possible to avoid unnecessary vector clock growth; members are taken from the head of the free list and returned to the head of the free list.

pooler is a system process and traps exits. Before giving out a member, it links to the requesting consumer process. This way, if the consumer process crashes, pooler can recover the member. When the member is returned, the link to the consumer process will be severed. Since the state of the member is unknown in the case of a crashing consumer, we will destroy the member and add a fresh one to the pool.

The member starter MFA should use start_link so that pooler will be linked to the members. This way, when members crash, pooler will be notified and can refill the pool with new pids.
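
A rough illustration of the exit handling this implies (not the actual pooler source; is_consumer/2, reclaim_member/2, member_for_consumer/2, and destroy_and_replace/2 are hypothetical helpers):

handle_info({'EXIT', Pid, Reason}, State) ->
    NewState =
        case is_consumer(Pid, State) of
            true when Reason =:= normal ->
                %% a consumer exited normally without calling return_member:
                %% reclaim the member it held
                reclaim_member(Pid, State);
            true ->
                %% a consumer crashed: the member's state is unknown, so
                %% destroy it and start a replacement
                destroy_and_replace(member_for_consumer(Pid, State), State);
            false ->
                %% a member itself exited: start a fresh one
                destroy_and_replace(Pid, State)
        end,
    {noreply, NewState}.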

Supervision

The top-level pooler supervisor, pooler_sup, supervises the pooler gen_server and the pooler_pool_sup supervisor. pooler_pool_sup supervises individual pool supervisors (pooler_pooled_worker_sup). Each pooler_pooled_worker_sup supervises the members of a pool.

./pidq_appmon.jpg
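
Schematically, the supervision tree looks like this:

  pooler_sup
   +- pooler (gen_server)
   +- pooler_pool_sup
       +- pooler_pooled_worker_sup   (one per pool)
       |   +- member
       |   +- member
       +- pooler_pooled_worker_sup
           +- member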

Pool management

It is an error to add a pool with a name that already exists.

Pool removal has two forms:

  • graceful: pids in the free list are killed (using exit(Pid, kill)) unless a pid_stopper is specified in the pool parameters. No pids will be handed out from this pool's free list. As pids are returned, they are shut down. When the pool is empty, it is removed.
  • immediate: all pids in the free and in-use lists are shut down; the pool is removed.

A draft of the planned pool-management API:

  -spec(take_member() -> pid()).

  -spec(return_member(pid(), ok | fail) -> ignore).

  -spec(status() -> [term()]).

  -type(pid_type_opt() ::
        {name, string()} |
        {max_pids, integer()} |
        {min_free, integer()} |
        {init_size, integer()} |
        {pid_starter_args, [term()]}).

  -type(pid_type_spec() :: [pid_type_opt()]).
  -spec(add_type(pid_type_spec()) -> ok | {error, term()}).
  -spec(remove_type(string()) -> ok | {error, term()}).

Notes

Move pid_starter and pid_stopper to pool config

This way, you can have pools backed not only by different config, but by entirely different services. Could be useful for testing a new client implementation.
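
A purely hypothetical sketch of what a per-pool entry might look like with these keys (neither pid_starter nor pid_stopper exists yet):

[{name, "rc8081"},
 {max_count, 5},
 {init_count, 2},
 {pid_starter, {riakc_pb_socket, start_link, ["localhost", 8081]}},
 {pid_stopper, {riakc_pb_socket, stop, []}}]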

Rename to something other than "pid"

Consider ets for state storage rather than dict

% Manual exploration of the (pre-rename) pidq supervisors from the shell:
pman:start().
A1 = {riakc_pb_socket, start_link, ["127.0.0.1", 8081]}.
{ok, S1} = pidq_pool_sup:start_link(A1).
supervisor:start_child(S1, []).

{ok, S2} = pidq_sup:start_link([]).
supervisor:start_child(pidq_pool_sup, [A1]).

% Start pidq using the config from its application environment:
application:load(pidq).
C = application:get_all_env(pidq).
pidq:start(C).

supervision strategy

pidq_sup

The top-level supervisor; it watches the pidq gen_server and the pidq_pool_sup supervisor.

pidq_pool_sup

A simple_one_for_one supervisor used to create and watch pidq_pooled_worker_sup supervisors. You use this to create a new pool, specifying the {M, F, A} of the pooled worker at start.

pidq_pooled_worker_sup

Another simple_one_for_one that is used to create actual workers.

Rename plan

  • Candidate names: genpool, gs_pool, pooler
  • pid => worker, member, gs, gspid
  • pooler:take_member/0, pooler:return_member/2