Browse Source

add support for timing out long-running operations

Will 16 years ago
parent
commit
4960cec600
5 changed files with 182 additions and 51 deletions
  1. 5 1
      README
  2. 15 16
      src/pgsql.erl
  3. 121 33
      src/pgsql_connection.erl
  4. 13 1
      src/pgsql_sock.erl
  5. 28 0
      test_src/pgsql_tests.erl

+ 5 - 1
README

@@ -2,7 +2,7 @@ Erlang PostgreSQL Database Client
 
 * Connect
 
-  {ok, C} = pgsql:connect(Host, [Username], [Password], [Opts]).
+  {ok, C} = pgsql:connect(Host, [Username], [Password], Opts).
 
   Host      - host to connect to.
   Username  - username to connect as, defaults to $USER.
@@ -13,10 +13,14 @@ Erlang PostgreSQL Database Client
     + port
     + ssl (true | false | required)
     + ssl_opts (see ssl docs in OTP)
+    + timeout (milliseconds, defaults to 5000)
 
   {ok, C} = pgsql:connect("localhost", "username", [{database, "test_db"}]).
   ok = pgsql:close(C).
 
+  The timeout parameter is applied to all operations. In the case of equery
+  this means that total execution time may exceed the timeout value.
+
 * Simple Query
 
   {ok, Columns, Rows} = pgsql:squery(C, Sql).

+ 15 - 16
src/pgsql.erl

@@ -1,5 +1,4 @@
 %%% Copyright (C) 2008 - Will Glozer.  All rights reserved.
-
 -module(pgsql).
 
 -export([connect/2, connect/3, connect/4, close/1]).
@@ -11,8 +10,6 @@
 
 -include("pgsql.hrl").
 
--define(timeout, 5000).
-
 %% -- client interface --
 
 connect(Host, Opts) ->
@@ -47,7 +44,7 @@ equery(C, Sql, Parameters) ->
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
             ok = pgsql_connection:equery(C, S, Typed_Parameters),
-            receive_result(C);
+            receive_result(C, undefined);
         Error ->
             Error
     end.
@@ -114,16 +111,18 @@ with_transaction(C, F) ->
 
 %% -- internal functions --
 
-receive_result(C) ->
-    R = receive_result(C, [], []),
-    receive
-        {pgsql, C, done} -> R
+receive_result(C, Result) ->
+    case receive_result(C, [], []) of
+        done    -> Result;
+        timeout -> {error, timeout};
+        R       -> receive_result(C, R)
     end.
 
 receive_results(C, Results) ->
     case receive_result(C, [], []) of
-        done -> lists:reverse(Results);
-        R    -> receive_results(C, [R | Results])
+        done    -> lists:reverse(Results);
+        timeout -> lists:reverse([{error, timeout} | Results]);
+        R       -> receive_results(C, [R | Results])
     end.
 
 receive_result(C, Cols, Rows) ->
@@ -144,9 +143,9 @@ receive_result(C, Cols, Rows) ->
         {pgsql, C, {notice, _N}} ->
             receive_result(C, Cols, Rows);
         {pgsql, C, done} ->
-            done
-    after
-        ?timeout -> {error, timeout}
+            done;
+        {pgsql, C, timeout} ->
+            timeout
     end.
 
 receive_extended_result(C)->
@@ -168,7 +167,7 @@ receive_extended_result(C, Rows) ->
         {pgsql, C, {complete, _Type}} ->
             {ok, lists:reverse(Rows)};
         {pgsql, C, {notice, _N}} ->
-            receive_extended_result(C, Rows)
-    after
-        ?timeout -> {error, timeout}
+            receive_extended_result(C, Rows);
+        {pgsql, C, timeout} ->
+            {error, timeout}
     end.

+ 121 - 33
src/pgsql_connection.erl

@@ -14,13 +14,14 @@
 
 -export([startup/3, auth/2, initializing/2, ready/2, ready/3]).
 -export([querying/2, parsing/2, binding/2, describing/2]).
--export([executing/2, closing/2, synchronizing/2]).
+-export([executing/2, closing/2, synchronizing/2, timeout/2]).
 
 -include("pgsql.hrl").
 
 -record(state, {
           reader,
           sock,
+          timeout,
           parameters = [],
           reply,
           reply_to,
@@ -111,31 +112,35 @@ code_change(_Old_Vsn, State_Name, State, _Extra) ->
 %% -- states --
 
 startup({connect, Host, Username, Password, Opts}, From, State) ->
+    Timeout = proplists:get_value(timeout, Opts, 5000),
     case pgsql_sock:start_link(self(), Host, Username, Opts) of
         {ok, Sock} ->
             put(username, Username),
             put(password, Password),
-            State2 = State#state{sock = Sock, reply_to = From},
-            {next_state, auth, State2};
+            State2 = State#state{sock = Sock, timeout = Timeout, reply_to = From},
+            {next_state, auth, State2, Timeout};
         Error ->
             {stop, normal, Error, State}
     end.
 
 %% AuthenticationOk
 auth({$R, <<0:?int32>>}, State) ->
-    {next_state, initializing, State};
+    #state{timeout = Timeout} = State,
+    {next_state, initializing, State, Timeout};
 
 %% AuthenticationCleartextPassword
 auth({$R, <<3:?int32>>}, State) ->
+    #state{timeout = Timeout} = State,
     send(State, $p, [get(password), 0]),
-    {next_state, auth, State};
+    {next_state, auth, State, Timeout};
 
 %% AuthenticationMD5Password
 auth({$R, <<5:?int32, Salt:4/binary>>}, State) ->
+    #state{timeout = Timeout} = State,
     Digest1 = hex(erlang:md5([get(password), get(username)])),
     Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
     send(State, $p, Str),
-    {next_state, auth, State};
+    {next_state, auth, State, Timeout};
 
 auth({$R, <<M:?int32, _/binary>>}, State) ->
     case M of
@@ -157,12 +162,17 @@ auth({error, E}, State) ->
         Any         -> Why = Any
     end,
     gen_fsm:reply(State#state.reply_to, {error, Why}),
+    {stop, normal, State};
+
+auth(timeout, State) ->
+    gen_fsm:reply(State#state.reply_to, {error, timeout}),
     {stop, normal, State}.
 
 %% BackendKeyData
 initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
+    #state{timeout = Timeout} = State,
     State2 = State#state{backend = {Pid, Key}},
-    {next_state, initializing, State2};
+    {next_state, initializing, State2, Timeout};
 
 %% ErrorResponse
 initializing({error, E}, State) ->
@@ -173,6 +183,10 @@ initializing({error, E}, State) ->
     gen_fsm:reply(State#state.reply_to, {error, Why}),
     {stop, normal, State};
 
+initializing(timeout, State) ->
+    gen_fsm:reply(State#state.reply_to, {error, timeout}),
+    {stop, normal, State};
+
 %% ReadyForQuery
 initializing({$Z, <<Status:8>>}, State) ->
     #state{parameters = Parameters, reply_to = Reply_To} = State,
@@ -190,12 +204,14 @@ ready(_Msg, State) ->
 
 %% execute simple query
 ready({squery, Sql}, From, State) ->
+    #state{timeout = Timeout} = State,
     send(State, $Q, [Sql, 0]),
     State2 = State#state{statement = #statement{}, reply_to = From},
-    {reply, ok, querying, State2};
+    {reply, ok, querying, State2, Timeout};
 
 %% execute extended query
 ready({equery, Statement, Parameters}, From, State) ->
+    #state{timeout = Timeout} = State,
     #statement{name = StatementName, columns = Columns} = Statement,
     Bin1 = encode_parameters(Parameters),
     Bin2 = encode_formats(Columns),
@@ -204,7 +220,7 @@ ready({equery, Statement, Parameters}, From, State) ->
     send(State, $C, [$S, "", 0]),
     send(State, $S, []),
     State2 = State#state{statement = Statement, reply_to = From},
-    {reply, ok, querying, State2};
+    {reply, ok, querying, State2, Timeout};
 
 ready({get_parameter, Name}, _From, State) ->
     case lists:keysearch(Name, 1, State#state.parameters) of
@@ -214,88 +230,108 @@ ready({get_parameter, Name}, _From, State) ->
     {reply, {ok, Value}, ready, State};
 
 ready({parse, Name, Sql, Types}, From, State) ->
+    #state{timeout = Timeout} = State,
     Bin = encode_types(Types),
     send(State, $P, [Name, 0, Sql, 0, Bin]),
     send(State, $D, [$S, Name, 0]),
     send(State, $H, []),
     S = #statement{name = Name},
-    {next_state, parsing, State#state{statement = S, reply_to = From}};
+    State2 = State#state{statement = S, reply_to = From},
+    {next_state, parsing, State2, Timeout};
 
 ready({bind, Statement, PortalName, Parameters}, From, State) ->
+    #state{timeout = Timeout} = State,
     #statement{name = StatementName, columns = Columns, types = Types} = Statement,
     Typed_Parameters = lists:zip(Types, Parameters),
     Bin1 = encode_parameters(Typed_Parameters),
     Bin2 = encode_formats(Columns),
     send(State, $B, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
     send(State, $H, []),
-    {next_state, binding, State#state{statement = Statement, reply_to = From}};
+    State2 = State#state{statement = Statement, reply_to = From},
+    {next_state, binding, State2, Timeout};
 
 ready({execute, Statement, PortalName, MaxRows}, From, State) ->
+    #state{timeout = Timeout} = State,
     send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
     send(State, $H, []),
-    {reply, ok, executing, State#state{statement = Statement, reply_to = From}};
+    State2 = State#state{statement = Statement, reply_to = From},
+    {reply, ok, executing, State2, Timeout};
 
 ready({describe, Type, Name}, From, State) ->
+    #state{timeout = Timeout} = State,
     case Type of
         statement -> Type2 = $S;
         portal    -> Type2 = $P
     end,
     send(State, $D, [Type2, Name, 0]),
     send(State, $H, []),
-    {next_state, describing, State#state{reply_to = From}};
+    {next_state, describing, State#state{reply_to = From}, Timeout};
 
 ready({close, Type, Name}, From, State) ->
+    #state{timeout = Timeout} = State,
     case Type of
         statement -> Type2 = $S;
         portal    -> Type2 = $P
     end,
     send(State, $C, [Type2, Name, 0]),
     send(State, $H, []),
-    {next_state, closing, State#state{reply_to = From}};
+    {next_state, closing, State#state{reply_to = From}, Timeout};
 
 ready(sync, From, State) ->
+    #state{timeout = Timeout} = State,
     send(State, $S, []),
-    {next_state, synchronizing, State#state{reply = ok, reply_to = From}}.
+    State2 = State#state{reply = ok, reply_to = From},
+    {next_state, synchronizing, State2, Timeout}.
 
 %% BindComplete
 querying({$2, <<>>}, State) ->
-    #state{statement = #statement{columns = Columns}} = State,
+    #state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
     notify(State, {columns, Columns}),
-    {next_state, querying, State};
+    {next_state, querying, State, Timeout};
 
 %% CloseComplete
 querying({$3, <<>>}, State) ->
-    {next_state, querying, State};
+    #state{timeout = Timeout} = State,
+    {next_state, querying, State, Timeout};
 
 %% RowDescription
 querying({$T, <<Count:?int16, Bin/binary>>}, State) ->
+    #state{timeout = Timeout} = State,
     Columns = decode_columns(Count, Bin),
     S2 = (State#state.statement)#statement{columns = Columns},
     notify(State, {columns, Columns}),
-    {next_state, querying, State#state{statement = S2}};
+    {next_state, querying, State#state{statement = S2}, Timeout};
 
 %% DataRow
 querying({$D, <<_Count:?int16, Bin/binary>>}, State) ->
-    #state{statement = #statement{columns = Columns}} = State,
+    #state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
     Data = decode_data(Columns, Bin),
     notify(State, {data, Data}),
-    {next_state, querying, State};
+    {next_state, querying, State, Timeout};
 
 %% CommandComplete
 querying({$C, Bin}, State) ->
+    #state{timeout = Timeout} = State,
     Complete = decode_complete(Bin),
     notify(State, {complete, Complete}),
-    {next_state, querying, State};
+    {next_state, querying, State, Timeout};
 
 %% EmptyQueryResponse
 querying({$I, _Bin}, State) ->
+    #state{timeout = Timeout} = State,
     notify(State, {complete, empty}),
-    {next_state, querying, State};
+    {next_state, querying, State, Timeout};
+
+querying(timeout, State) ->
+    #state{sock = Sock, timeout = Timeout, backend = {Pid, Key}} = State,
+    pgsql_sock:cancel(Sock, Pid, Key),
+    {next_state, timeout, State, Timeout};
 
 %% ErrorResponse
 querying({error, E}, State) ->
+    #state{timeout = Timeout} = State,
     notify(State, {error, E}),
-    {next_state, querying, State};
+    {next_state, querying, State, Timeout};
 
 %% ReadyForQuery
 querying({$Z, <<_Status:8>>}, State) ->
@@ -304,13 +340,21 @@ querying({$Z, <<_Status:8>>}, State) ->
 
 %% ParseComplete
 parsing({$1, <<>>}, State) ->
-    {next_state, describing, State};
+    #state{timeout = Timeout} = State,
+    {next_state, describing, State, Timeout};
+
+parsing(timeout, State) ->
+    #state{timeout = Timeout} = State,
+    Reply = {error, timeout},
+    send(State, $S, []),
+    {next_state, parsing, State#state{reply = Reply}, Timeout};
 
 %% ErrorResponse
 parsing({error, E}, State) ->
+    #state{timeout = Timeout} = State,
     Reply = {error, E},
     send(State, $S, []),
-    {next_state, parsing, State#state{reply = Reply}};
+    {next_state, parsing, State#state{reply = Reply}, Timeout};
 
 %% ReadyForQuery
 parsing({$Z, <<Status:8>>}, State) ->
@@ -323,11 +367,18 @@ binding({$2, <<>>}, State) ->
     gen_fsm:reply(State#state.reply_to, ok),
     {next_state, ready, State};
 
+binding(timeout, State) ->
+    #state{timeout = Timeout} = State,
+    Reply = {error, timeout},
+    send(State, $S, []),
+    {next_state, binding, State#state{reply = Reply}, Timeout};
+
 %% ErrorResponse
 binding({error, E}, State) ->
+    #state{timeout = Timeout} = State,
     Reply = {error, E},
     send(State, $S, []),
-    {next_state, binding, State#state{reply = Reply}};
+    {next_state, binding, State#state{reply = Reply}, Timeout};
 
 %% ReadyForQuery
 binding({$Z, <<Status:8>>}, State) ->
@@ -337,9 +388,10 @@ binding({$Z, <<Status:8>>}, State) ->
 
 %% ParameterDescription
 describing({$t, <<_Count:?int16, Bin/binary>>}, State) ->
+    #state{timeout = Timeout} = State,
     Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
     S2 = (State#state.statement)#statement{types = Types},
-    {next_state, describing, State#state{statement = S2}};
+    {next_state, describing, State#state{statement = S2}, Timeout};
 
 %% RowDescription
 describing({$T, <<Count:?int16, Bin/binary>>}, State) ->
@@ -355,11 +407,18 @@ describing({$n, <<>>}, State) ->
     gen_fsm:reply(State#state.reply_to, {ok, S2}),
     {next_state, ready, State};
 
+describing(timeout, State) ->
+    #state{timeout = Timeout} = State,
+    Reply = {error, timeout},
+    send(State, $S, []),
+    {next_state, describing, State#state{reply = Reply}, Timeout};
+
 %% ErrorResponse
 describing({error, E}, State) ->
+    #state{timeout = Timeout} = State,
     Reply = {error, E},
     send(State, $S, []),
-    {next_state, describing, State#state{reply = Reply}};
+    {next_state, describing, State#state{reply = Reply}, Timeout};
 
 %% ReadyForQuery
 describing({$Z, <<Status:8>>}, State) ->
@@ -369,10 +428,10 @@ describing({$Z, <<Status:8>>}, State) ->
 
 %% DataRow
 executing({$D, <<_Count:?int16, Bin/binary>>}, State) ->
-    #state{statement = #statement{columns = Columns}} = State,
+    #state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
     Data = decode_data(Columns, Bin),
     notify(State, {data, Data}),
-    {next_state, executing, State};
+    {next_state, executing, State, Timeout};
 
 %% PortalSuspended
 executing({$s, <<>>}, State) ->
@@ -389,16 +448,27 @@ executing({$I, _Bin}, State) ->
     notify(State, {complete, empty}),
     {next_state, ready, State};
 
+executing(timeout, State) ->
+    #state{sock = Sock, timeout = Timeout, backend = {Pid, Key}} = State,
+    pgsql_sock:cancel(Sock, Pid, Key),
+    send(State, $S, []),
+    {next_state, timeout, State, Timeout};
+
 %% ErrorResponse
 executing({error, E}, State) ->
+    #state{timeout = Timeout} = State,
     notify(State, {error, E}),
-    {next_state, executing, State}.
+    {next_state, executing, State, Timeout}.
 
 %% CloseComplete
 closing({$3, <<>>}, State) ->
     gen_fsm:reply(State#state.reply_to, ok),
     {next_state, ready, State};
 
+closing(timeout, State) ->
+    gen_fsm:reply(State#state.reply_to, {error, timeout}),
+    {next_state, ready, State};
+
 %% ErrorResponse
 closing({error, E}, State) ->
     Error = {error, E},
@@ -407,8 +477,14 @@ closing({error, E}, State) ->
 
 %% ErrorResponse
 synchronizing({error, E}, State) ->
+    #state{timeout = Timeout} = State,
     Reply = {error, E},
-    {next_state, synchronizing, State#state{reply = Reply}};
+    {next_state, synchronizing, State#state{reply = Reply}, Timeout};
+
+synchronizing(timeout, State) ->
+    #state{timeout = Timeout} = State,
+    Reply = {error, timeout},
+    {next_state, synchronizing, State#state{reply = Reply}, Timeout};
 
 %% ReadyForQuery
 synchronizing({$Z, <<Status:8>>}, State) ->
@@ -416,6 +492,18 @@ synchronizing({$Z, <<Status:8>>}, State) ->
     gen_fsm:reply(Reply_To, Reply),
     {next_state, ready, State#state{reply = undefined, txstatus = Status}}.
 
+timeout({$Z, <<Status:8>>}, State) ->
+    notify(State, timeout),
+    {next_state, ready, State#state{txstatus = Status}};
+
+timeout(timeout, State) ->
+    {stop, timeout, State};
+
+%% ignore events that occur after timeout
+timeout(_Event, State) ->
+    #state{timeout = Timeout} = State,
+    {next_state, timeout, State, Timeout}.
+
 %% -- internal functions --
 
 %% decode data

+ 13 - 1
src/pgsql_sock.erl

@@ -4,7 +4,7 @@
 
 -behavior(gen_server).
 
--export([start_link/4, send/2, send/3]).
+-export([start_link/4, send/2, send/3, cancel/3]).
 -export([decode_string/1, lower_atom/1]).
 
 -export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -32,6 +32,9 @@ send(S, Data) ->
     Msg = <<(byte_size(Bin) + 4):?int32, Bin/binary>>,
     gen_server:cast(S, {send, Msg}).
 
+cancel(S, Pid, Key) ->
+    gen_server:cast(S, {cancel, Pid, Key}).
+
 %% -- gen_server implementation --
 
 init([C, Host, Username, Opts]) ->
@@ -74,6 +77,15 @@ handle_cast({send, Data}, State) ->
     ok = Mod:send(Sock, Data),
     {noreply, State};
 
+handle_cast({cancel, Pid, Key}, State) ->
+    {ok, {Addr, Port}} = inet:peername(State#state.sock),
+    SockOpts = [{active, false}, {packet, raw}, binary],
+    {ok, Sock} = gen_tcp:connect(Addr, Port, SockOpts),
+    Msg = <<16:?int32, 80877102:?int32, Pid:?int32, Key:?int32>>,
+    ok = gen_tcp:send(Sock, Msg),
+    gen_tcp:close(Sock),
+    {noreply, State};
+
 handle_cast(Cast, State) ->
     {stop, {unsupported_cast, Cast}, State}.
 

+ 28 - 0
test_src/pgsql_tests.erl

@@ -406,6 +406,31 @@ text_format_test() ->
               Select("numeric", "123456")
       end).
 
+connect_timeout_test() ->
+    {error, timeout} = pgsql:connect(?host, [{port, ?port}, {timeout, 0}]).
+
+query_timeout_test() ->
+    with_connection(
+      fun(C) ->
+              {error, timeout} = pgsql:squery(C, "select pg_sleep(1)"),
+              {error, timeout} = pgsql:equery(C, "select pg_sleep(2)"),
+              {ok, _Cols, [{1}]} = pgsql:equery(C, "select 1")
+      end,
+      [{timeout, 10}]).
+
+execute_timeout_test() ->
+    with_connection(
+      fun(C) ->
+              {ok, S} = pgsql:parse(C, "select pg_sleep($1)"),
+              ok = pgsql:bind(C, S, [2]),
+              {error, timeout} = pgsql:execute(C, S, 0),
+              ok = pgsql:bind(C, S, [0]),
+              {ok, [{<<>>}]} = pgsql:execute(C, S, 0),
+              ok = pgsql:close(C, S),
+              ok = pgsql:sync(C)
+      end,
+      [{timeout, 10}]).
+
 %% -- run all tests --
 
 run_tests() ->
@@ -423,6 +448,9 @@ connect_only(Args) ->
 with_connection(F) ->
     with_connection(F, "epgsql_test", []).
 
+with_connection(F, Args) ->
+    with_connection(F, "epgsql_test", Args).
+
 with_connection(F, Username, Args) ->
     Args2 = [{port, ?port}, {database, "epgsql_test_db1"} | Args],
     {ok, C} = pgsql:connect(?host, Username, Args2),