Browse Source

support asynchronous notices and notifications

Will 15 years ago
parent
commit
122028f9e1
5 changed files with 113 additions and 14 deletions
  1. 33 7
      README
  2. 0 4
      src/pgsql.erl
  3. 19 3
      src/pgsql_connection.erl
  4. 8 0
      src/pgsql_sock.erl
  5. 53 0
      test_src/pgsql_tests.erl

+ 33 - 7
README

@@ -9,17 +9,18 @@ Erlang PostgreSQL Database Client
   Password  - optional password to authenticate with.
   Opts      - property list of extra options. Supported properties:
 
-    + database
-    + port
-    + ssl (true | false | required)
-    + ssl_opts (see ssl docs in OTP)
-    + timeout (milliseconds, defaults to 5000)
+    + {database, String}
+    + {port,     Integer}
+    + {ssl,      Atom}       true | false | required
+    + {ssl_opts  List}       see ssl application docs in OTP
+    + {timeout,  Integer}    milliseconds, defaults to 5000
+    + {async,    Pid}        see Asynchronous Messages section
 
   {ok, C} = pgsql:connect("localhost", "username", [{database, "test_db"}]).
   ok = pgsql:close(C).
 
-  The timeout parameter is applied to all operations. In the case of equery
-  this means that total execution time may exceed the timeout value.
+  The timeout parameter will trigger an {error, timeout} result when the
+  server fails to respond within Timeout milliseconds.
 
 * Simple Query
 
@@ -109,3 +110,28 @@ Erlang PostgreSQL Database Client
   Errors originating from the PostgreSQL backend are returned as {error, #error{}},
   see pgsql.hrl for the record definition. epgsql may also return {error, Atom}
   where Atom is 'timeout' or 'closed'.
+
+* Asynchronous Messages
+
+  PostgreSQL may deliver two types of asynchronous message: "notices" in response
+  to notice and warning messages generated by the server, and "notifications" which
+  are generated by the LISTEN/NOTIFY mechanism.
+
+  Passing the {async, Pid} option to pgsql:connect will result in these async
+  messages being sent to the specified process, otherwise they will be dropped.
+
+  Message formats:
+
+    {pgsql, Connection, {notification, Channel, Pid, Payload}}
+
+      Connection  - connection the notification occured on
+
+      Channel     - channel the notification occured on
+      Pid         - database session pid that sent notification
+      Payload     - optional payload, only available from PostgreSQL >= 9.0
+
+    {pgsql, Connection, {notice, Error}}
+
+      Connection  - connection the notice occured on
+      Error       - an #error{} record, see pgsql.hrl
+

+ 0 - 4
src/pgsql.erl

@@ -143,8 +143,6 @@ receive_result(C, Cols, Rows) ->
             end;
         {pgsql, C, {complete, _Type}} ->
             {ok, Cols, lists:reverse(Rows)};
-        {pgsql, C, {notice, _N}} ->
-            receive_result(C, Cols, Rows);
         {pgsql, C, done} ->
             done;
         {pgsql, C, timeout} ->
@@ -171,8 +169,6 @@ receive_extended_result(C, Rows) ->
             end;
         {pgsql, C, {complete, _Type}} ->
             {ok, lists:reverse(Rows)};
-        {pgsql, C, {notice, _N}} ->
-            receive_extended_result(C, Rows);
         {pgsql, C, timeout} ->
             {error, timeout};
         {'EXIT', C, _Reason} ->

+ 19 - 3
src/pgsql_connection.erl

@@ -25,6 +25,7 @@
           parameters = [],
           reply,
           reply_to,
+          async,
           backend,
           statement,
           txstatus}).
@@ -76,8 +77,12 @@ init([]) ->
     process_flag(trap_exit, true),
     {ok, startup, #state{}}.
 
-handle_event({notice, Notice}, State_Name, State) ->
-    notify(State, {notice, Notice}),
+handle_event({notice, _Notice} = Msg, State_Name, State) ->
+    notify_async(State, Msg),
+    {next_state, State_Name, State};
+
+handle_event({notification, _Channel, _Pid, _Payload} = Msg, State_Name, State) ->
+    notify_async(State, Msg),
     {next_state, State_Name, State};
 
 handle_event({parameter_status, Name, Value}, State_Name, State) ->
@@ -113,11 +118,16 @@ code_change(_Old_Vsn, State_Name, State, _Extra) ->
 
 startup({connect, Host, Username, Password, Opts}, From, State) ->
     Timeout = proplists:get_value(timeout, Opts, 5000),
+    Async   = proplists:get_value(async, Opts, undefined),
     case pgsql_sock:start_link(self(), Host, Username, Opts) of
         {ok, Sock} ->
             put(username, Username),
             put(password, Password),
-            State2 = State#state{sock = Sock, timeout = Timeout, reply_to = From},
+            State2 = State#state{
+                       sock     = Sock,
+                       timeout  = Timeout,
+                       reply_to = From,
+                       async    = Async},
             {next_state, auth, State2, Timeout};
         Error ->
             {stop, normal, Error, State}
@@ -619,6 +629,12 @@ encode_list(L) ->
 notify(#state{reply_to = {Pid, _Tag}}, Msg) ->
     Pid ! {pgsql, self(), Msg}.
 
+notify_async(#state{async = Pid}, Msg) ->
+    case is_pid(Pid) of
+        true  -> Pid ! {pgsql, self(), Msg};
+        false -> false
+    end.
+
 to_binary(B) when is_binary(B) -> B;
 to_binary(L) when is_list(L)   -> list_to_binary(L).
 

+ 8 - 0
src/pgsql_sock.erl

@@ -147,6 +147,14 @@ decode(<<Type:8, Len:?int32, Rest/binary>> = Bin, #state{c = C} = State) ->
         <<Data:Len2/binary, Tail/binary>> when Type == $E ->
             gen_fsm:send_event(C, {error, decode_error(Data)}),
             decode(Tail, State);
+        <<Data:Len2/binary, Tail/binary>> when Type == $A ->
+            <<Pid:?int32, Strings/binary>> = Data,
+            case decode_strings(Strings) of
+                [Channel, Payload] -> ok;
+                [Channel]          -> Payload = <<>>
+            end,
+            gen_fsm:send_all_state_event(C, {notification, Channel, Pid, Payload}),
+            decode(Tail, State);
         <<Data:Len2/binary, Tail/binary>> ->
             gen_fsm:send_event(C, {Type, Data}),
             decode(Tail, State);

+ 53 - 0
test_src/pgsql_tests.erl

@@ -484,6 +484,47 @@ active_connection_closed_test() ->
     end,
     flush().
 
+warning_notice_test() ->
+    with_connection(
+      fun(C) ->
+          {ok, _, _} = pgsql:squery(C, "select 'test\\n'"),
+          receive
+              {pgsql, C, {notice, #error{code = <<"22P06">>}}} -> ok
+          after
+              100 -> erlang:error(didnt_receive_notice)
+          end
+      end,
+      [{async, self()}]).
+
+listen_notify_test() ->
+    with_connection(
+      fun(C) ->
+          {ok, [], []}     = pgsql:squery(C, "listen epgsql_test"),
+          {ok, _, [{Pid}]} = pgsql:equery(C, "select pg_backend_pid()"),
+          {ok, [], []}     = pgsql:squery(C, "notify epgsql_test"),
+          receive
+              {pgsql, C, {notification, <<"epgsql_test">>, Pid, <<>>}} -> ok
+          after
+              100 -> erlang:error(didnt_receive_notification)
+          end
+      end,
+      [{async, self()}]).
+
+listen_notify_payload_test() ->
+    with_min_version(
+      9.0,
+      fun(C) ->
+          {ok, [], []}     = pgsql:squery(C, "listen epgsql_test"),
+          {ok, _, [{Pid}]} = pgsql:equery(C, "select pg_backend_pid()"),
+          {ok, [], []}     = pgsql:squery(C, "notify epgsql_test, 'test!'"),
+          receive
+              {pgsql, C, {notification, <<"epgsql_test">>, Pid, <<"test!">>}} -> ok
+          after
+              100 -> erlang:error(didnt_receive_notification)
+          end
+      end,
+      [{async, self()}]).
+
 %% -- run all tests --
 
 run_tests() ->
@@ -531,6 +572,18 @@ with_rollback(F) ->
                   end
       end).
 
+with_min_version(Min, F, Args) ->
+    with_connection(
+      fun(C) ->
+          {ok, Bin} = pgsql:get_parameter(C, <<"server_version">>),
+          {ok, [{float, 1, Ver} | _], _} = erl_scan:string(binary_to_list(Bin)),
+          case Ver >= Min of
+              true  -> F(C);
+              false -> ?debugFmt("skipping test requiring PostgreSQL >= ~.2f~n", [Min])
+          end
+      end,
+      Args).
+
 check_type(Type, In, Out, Values) ->
     Column = "c_" ++ atom_to_list(Type),
     check_type(Type, In, Out, Values, Column).