Ery Lee 12 years ago
parent
commit
dd89861443
8 changed files with 220 additions and 31 deletions
  1. 20 0
      etc/epgsql.config
  2. 2 1
      src/epgsql.app.src
  3. 65 29
      src/epgsql.erl
  4. 16 0
      src/epgsql_app.erl
  5. 70 0
      src/epgsql_pool.erl
  6. 25 0
      src/epgsql_pool_sup.erl
  7. 21 0
      src/epgsql_sup.erl
  8. 1 1
      src/pgsql_conn.erl

+ 20 - 0
etc/epgsql.config

@@ -0,0 +1,20 @@
+%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ft=erlang ts=4 sw=4 et
+[{kernel, 
+	[{start_timer, true}, 
+	 {start_pg2, true}
+ ]},
+ {sasl, [
+	{sasl_error_logger, {file, "log/mit_sasl.log"}}
+ ]},
+ {epgsql, [{main, [{pool_size, 2},
+                   {host, "localhost"},
+                   {username, "ipon"},
+                   {password, "public"},
+                   {database, "ipon"},
+                   {port, 5432},
+                   {timeout, 6000}
+          ]}
+ ]}
+].
+

+ 2 - 1
src/epgsql.app.src

@@ -5,4 +5,5 @@
              pgsql_idatetime, pgsql_sock, pgsql_types]},
              pgsql_idatetime, pgsql_sock, pgsql_types]},
   {registered, []},
   {registered, []},
   {applications, [kernel, stdlib, crypto, ssl]},
   {applications, [kernel, stdlib, crypto, ssl]},
-  {included_applications, []}]}.
+  {included_applications, []},
+  {mod, {epqsql_app, []}}]}.

+ 65 - 29
src/pgsql.erl → src/epgsql.erl

@@ -1,8 +1,13 @@
 %%% Copyright (C) 2008 - Will Glozer.  All rights reserved.
 %%% Copyright (C) 2008 - Will Glozer.  All rights reserved.
 
 
--module(pgsql).
+-module(epgsql).
 
 
--export([connect/2, connect/3, connect/4, close/1]).
+-export([select/2,
+         insert/3,
+         update/4,
+         delete/3]).
+
+-export([connect/2]).
 -export([get_parameter/2, squery/2, equery/2, equery/3]).
 -export([get_parameter/2, squery/2, equery/2, equery/3]).
 -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
 -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
 -export([bind/3, bind/4, execute/2, execute/3, execute/4]).
 -export([bind/3, bind/4, execute/2, execute/3, execute/4]).
@@ -11,27 +16,46 @@
 
 
 -include("pgsql.hrl").
 -include("pgsql.hrl").
 
 
-%% -- client interface --
-
-connect(Host, Opts) ->
-    connect(Host, os:getenv("USER"), "", Opts).
-
-connect(Host, Username, Opts) ->
-    connect(Host, Username, "", Opts).
-
-connect(Host, Username, Password, Opts) ->
-    {ok, C} = pgsql_connection:start_link(),
-    pgsql_connection:connect(C, Host, Username, Password, Opts).
-
-close(C) when is_pid(C) ->
-    catch pgsql_connection:stop(C),
-    ok.
+%========================
+% connect and add to pool
+%========================
+connect(Pool, Opts) ->
+    {ok, C} = pgsql_conn:start_link(),
+    Host = proplists:get_value(host, Opts, "localhost"),
+    Username = proplists:get_value(username, Opts),
+    Password = proplists:get_value(password, Opts),
+    DB = proplists:get_value(database, Opts),
+    Timeout = proplists:get_value(timeout, Opts, 10000),
+    Port = proplists:get_value(port, Opts, 5432),
+    pgsql_conn:connect(C, Host, Username, Password,
+        [{database, DB}, {port, Port}, {timeout, Timeout}]),
+    epqsql_pool:add(Pool, C),
+    {ok, C}.
+
+%========================
+% api 
+%========================
+select(Pool, Table) ->
+    SQL = encode({select, Table}),
+    decode(squery(epgsql_pool:get_conn(Pool), SQL)).
+
+insert(Pool, Table, Record) ->
+    SQL = encode({insert, Table, Record}),
+    decode(squery(epgsql_pool:get_conn(Pool), SQL)).
+
+update(Pool, Table, Record, Where) ->
+    SQL = encode({update, Table, Record, Where}),
+    decode(squery(epgsql_pool:get_conn(Pool), SQL)).
+
+delete(Pool, Table, Where) ->
+    SQL = encode({delete, Table, Where}),
+    decode(squery(epgsql_pool:get_conn(Pool), SQL)).
 
 
 get_parameter(C, Name) ->
 get_parameter(C, Name) ->
-    pgsql_connection:get_parameter(C, Name).
+    pgsql_conn:get_parameter(C, Name).
 
 
 squery(C, Sql) ->
 squery(C, Sql) ->
-    ok = pgsql_connection:squery(C, Sql),
+    ok = pgsql_conn:squery(C, Sql),
     case receive_results(C, []) of
     case receive_results(C, []) of
         [Result] -> Result;
         [Result] -> Result;
         Results  -> Results
         Results  -> Results
@@ -41,10 +65,10 @@ equery(C, Sql) ->
     equery(C, Sql, []).
     equery(C, Sql, []).
 
 
 equery(C, Sql, Parameters) ->
 equery(C, Sql, Parameters) ->
-    case pgsql_connection:parse(C, "", Sql, []) of
+    case pgsql_conn:parse(C, "", Sql, []) of
         {ok, #statement{types = Types} = S} ->
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
             Typed_Parameters = lists:zip(Types, Parameters),
-            ok = pgsql_connection:equery(C, S, Typed_Parameters),
+            ok = pgsql_conn:equery(C, S, Typed_Parameters),
             receive_result(C, undefined);
             receive_result(C, undefined);
         Error ->
         Error ->
             Error
             Error
@@ -59,7 +83,7 @@ parse(C, Sql, Types) ->
     parse(C, "", Sql, Types).
     parse(C, "", Sql, Types).
 
 
 parse(C, Name, Sql, Types) ->
 parse(C, Name, Sql, Types) ->
-    pgsql_connection:parse(C, Name, Sql, Types).
+    pgsql_conn:parse(C, Name, Sql, Types).
 
 
 %% bind
 %% bind
 
 
@@ -67,7 +91,7 @@ bind(C, Statement, Parameters) ->
     bind(C, Statement, "", Parameters).
     bind(C, Statement, "", Parameters).
 
 
 bind(C, Statement, PortalName, Parameters) ->
 bind(C, Statement, PortalName, Parameters) ->
-    pgsql_connection:bind(C, Statement, PortalName, Parameters).
+    pgsql_conn:bind(C, Statement, PortalName, Parameters).
 
 
 %% execute
 %% execute
 
 
@@ -78,25 +102,25 @@ execute(C, S, N) ->
     execute(C, S, "", N).
     execute(C, S, "", N).
 
 
 execute(C, S, PortalName, N) ->
 execute(C, S, PortalName, N) ->
-    pgsql_connection:execute(C, S, PortalName, N),
+    pgsql_conn:execute(C, S, PortalName, N),
     receive_extended_result(C).
     receive_extended_result(C).
 
 
 %% statement/portal functions
 %% statement/portal functions
 
 
 describe(C, #statement{name = Name}) ->
 describe(C, #statement{name = Name}) ->
-    pgsql_connection:describe(C, statement, Name).
+    pgsql_conn:describe(C, statement, Name).
 
 
 describe(C, Type, Name) ->
 describe(C, Type, Name) ->
-    pgsql_connection:describe(C, Type, Name).
+    pgsql_conn:describe(C, Type, Name).
 
 
 close(C, #statement{name = Name}) ->
 close(C, #statement{name = Name}) ->
-    pgsql_connection:close(C, statement, Name).
+    pgsql_conn:close(C, statement, Name).
 
 
 close(C, Type, Name) ->
 close(C, Type, Name) ->
-    pgsql_connection:close(C, Type, Name).
+    pgsql_conn:close(C, Type, Name).
 
 
 sync(C) ->
 sync(C) ->
-    pgsql_connection:sync(C).
+    pgsql_conn:sync(C).
 
 
 %% misc helper functions
 %% misc helper functions
 with_transaction(C, F) ->
 with_transaction(C, F) ->
@@ -112,6 +136,18 @@ with_transaction(C, F) ->
 
 
 %% -- internal functions --
 %% -- internal functions --
 
 
+decode(_) ->
+    ok.
+
+encode({select, _Table}) ->
+    "";
+encode({insert, _Table, _Record}) ->
+    "";
+encode({update, _Table, _Record, _Where}) ->
+    "";
+encode({delete, _Table, _Where}) ->
+    "".
+
 receive_result(C, Result) ->
 receive_result(C, Result) ->
     try receive_result(C, [], []) of
     try receive_result(C, [], []) of
         done    -> Result;
         done    -> Result;

+ 16 - 0
src/epgsql_app.erl

@@ -0,0 +1,16 @@
+-module(epgsql_app).
+
+-export([start/0]).
+
+-behavior(application).
+
+-export([start/2, stop/1]).
+
+start() ->
+    [application:start(A) || A <- [crypto, public_key, ssl, epgsql]].
+
+start(normal, _) ->
+    epgsql_sup:start_link().
+
+stop(_) ->
+    ok.

+ 70 - 0
src/epgsql_pool.erl

@@ -0,0 +1,70 @@
+-module(epgsql_pool).
+
+-behaviour(gen_server).
+
+-export([start_link/1,
+         id/1,
+         add/2,
+         get_conn/1]).
+
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+-record(state, { queue }).
+
+start_link(Pool) ->
+    gen_server:start_link({local, id(Pool)}, ?MODULE, [], [{timeout, infinity}]).
+
+id(Pool) when is_atom(Pool) ->
+    list_to_atom(list:concat([epgsql_pool,":", Pool])).
+
+get_conn(Pool) ->
+    case get(pgsql_conn) of
+    undefined -> 
+        gen_server:call(id(Pool), get_conn, infinity);
+    Pid -> 
+        Pid
+    end.
+
+add(Pool, Pid) ->
+    gen_server:cast(id(Pool), {add, Pid}).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+    {ok, #state { queue = queue:new() }}.
+
+handle_call(get_conn, _From, State = #state{queue = Q}) ->
+    case queue:out(Q) of
+    {empty, _} ->
+        {reply, {error, empty_pool}, State};
+    {{value, Pid}, Q1} ->
+        {reply, Pid, State#state{queue = queue:in_r(Pid, Q1)}}
+    end;
+
+handle_call(Msg, _From, State) ->
+    {stop, {unexpected_call, Msg}, State}.
+
+handle_cast({add, Pid}, State = #state{queue = Q}) ->
+    %TODO: monitor??
+    {noreply, State#state{queue = queue:in(Pid, Q)}};
+
+handle_cast(Msg, State) ->
+    {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+    {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+terminate(_Reason, State) ->
+    State.
+

+ 25 - 0
src/epgsql_pool_sup.erl

@@ -0,0 +1,25 @@
+-module(epgsql_pool_sup).
+
+-export([id/1, start_link/3]).
+
+-behaviour(supervisor).
+
+-export([init/1]).
+
+id(Pool) ->
+    list_to_atom(lists:concat(["epgsql_pool_sup:", Pool])).
+
+start_link(Id, Pool, Opts) ->
+    supervisor:start_link({local, Id}, ?MODULE, [Pool, Opts]). 
+
+init([Pool, Opts]) ->
+    PoolSize = proplists:get_value(pool_size, Opts, 2),
+    {ok, {{one_for_one, 10, 10},
+            [{epgsql_pool:id(Pool), {epqsql_pool, start_link, []}, transient,
+                16#ffffffff, worker, [epqsql_pool]} |
+             [{connid(Pool, I), {epqsql, connect, [Pool, Opts]}, transient, 
+                16#ffffffff, worker, [pqsql_conn]} || I <- lists:seq(1, PoolSize)]]}}.
+    
+connid(Pool, I) ->
+    list_to_atom(list:concat([pgsql_conn, ":", Pool, ":", integer_to_list(I)])). 
+

+ 21 - 0
src/epgsql_sup.erl

@@ -0,0 +1,21 @@
+-module(epgsql_sup).
+
+-export([start_link/0]).
+
+-behaviour(supervisor).
+
+-export([init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+   {ok, Env} = application:get_env(epgsql), 
+   {ok, { {one_for_one, 10, 10}, [
+            poolsup(Pool, Opts) || {Pool, Opts} <- Env]}}.
+
+poolsup(Pool, Opts) ->
+    PoolId = epgsql_pool_sup:id(Pool),
+    {PoolId, {epgsql_pool_sup, start_link, [PoolId, Pool, Opts]},
+        transient, 16#ffffffff, supervisor, [epgsql_pool_sup]}.
+

+ 1 - 1
src/pgsql_connection.erl → src/pgsql_conn.erl

@@ -1,6 +1,6 @@
 %%% Copyright (C) 2008 - Will Glozer.  All rights reserved.
 %%% Copyright (C) 2008 - Will Glozer.  All rights reserved.
 
 
--module(pgsql_connection).
+-module(pgsql_conn).
 
 
 -behavior(gen_fsm).
 -behavior(gen_fsm).