Просмотр исходного кода

Merge pull request #99 from seriyps/set_notice_receiver

Add `set_notice_receiver/2` function
Sergey Prokhorov 8 лет назад
Родитель
Сommit
c4d675a89d
9 измененных файлов с 130 добавлено и 19 удалено
  1. 22 6
      README.md
  2. 2 1
      include/epgsql.hrl
  3. 6 0
      src/epgsql.erl
  4. 21 10
      src/epgsql_sock.erl
  5. 6 0
      src/epgsqla.erl
  6. 6 0
      src/epgsqli.erl
  7. 4 1
      test/epgsql_cast.erl
  8. 4 1
      test/epgsql_incremental.erl
  9. 59 0
      test/epgsql_tests.erl

+ 22 - 6
README.md

@@ -68,8 +68,8 @@ see `CHANGES` for full list.
     {ssl,      IsEnabled  :: boolean() | required} |
     {ssl_opts, SslOptions :: [ssl:ssl_option()]}   | % @see OTP ssl app, ssl_api.hrl
     {timeout,  TimeoutMs  :: timeout()}            | % default: 5000 ms
-    {async,    Receiver   :: pid()}. % process to receive LISTEN/NOTIFY msgs
-    
+    {async,    Receiver   :: pid() | atom()}. % process to receive LISTEN/NOTIFY msgs
+
 -spec connect(host(), string(), string(), [connect_option()])
         -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.    
 %% @doc connects to Postgres
@@ -442,11 +442,27 @@ see `epgsql.hrl` for the record definition. `epgsql` functions may also return
 ## Server Notifications
 
 PostgreSQL may deliver two types of asynchronous message: "notices" in response
-to notice and warning messages generated by the server, and "notifications" which
-are generated by the `LISTEN/NOTIFY` mechanism.
+to [notice and warning](https://www.postgresql.org/docs/current/static/plpgsql-errors-and-messages.html)
+messages generated by the server, and [notifications](https://www.postgresql.org/docs/current/static/sql-notify.html)
+which are generated by the `LISTEN/NOTIFY` mechanism.
+
+Passing the `{async, PidOrName}` option to `epgsql:connect/3` will result in these async
+messages being sent to the specified pid or registered process, otherwise they will be dropped.
 
-Passing the `{async, Pid}` option to `epgsql:connect/3` will result in these async
-messages being sent to the specified process, otherwise they will be dropped.
+Another way to set notification receiver is to use `set_notice_receiver/2` function.
+It returns previous `async` value. Use `undefined` to disable notifications.
+
+```erlang
+% receiver is pid()
+{ok, Previous} = epgsql:set_notice_receiver(C, self()).
+
+% receiver is registered process
+register(notify_receiver, self()).
+{ok, Previous1} = epgsqla:set_notice_receiver(C, notify_receiver).
+
+% disable notifications
+{ok, Previous2} = epgsqli:set_notice_receiver(C, undefined).
+```
 
 Message formats:
 

+ 2 - 1
include/epgsql.hrl

@@ -15,7 +15,8 @@
 }).
 
 -record(error, {
-    severity :: fatal | error | atom(), %TODO: concretize
+    % see client_min_messages config option
+    severity :: debug | log | info | notice | warning | error | fatal | panic,
     code :: binary(),
     codename :: atom(),
     message :: binary(),

+ 6 - 0
src/epgsql.erl

@@ -6,6 +6,7 @@
 -export([connect/1, connect/2, connect/3, connect/4, connect/5,
          close/1,
          get_parameter/2,
+         set_notice_receiver/2,
          squery/2,
          equery/2, equery/3, equery/4,
          prepared_query/3,
@@ -135,6 +136,11 @@ close(C) ->
 get_parameter(C, Name) ->
     epgsql_sock:get_parameter(C, Name).
 
+-spec set_notice_receiver(connection(), undefined | pid() | atom()) ->
+                                 {ok, Previous :: pid() | atom()}.
+set_notice_receiver(C, PidOrName) ->
+    epgsql_sock:set_notice_receiver(C, PidOrName).
+
 -spec squery(connection(), sql_query()) -> reply(squery_row()) | [reply(squery_row())].
 %% @doc runs simple `SqlQuery' via given `Connection'
 squery(Connection, SqlQuery) ->

+ 21 - 10
src/epgsql_sock.erl

@@ -8,6 +8,7 @@
 -export([start_link/0,
          close/1,
          get_parameter/2,
+         set_notice_receiver/2,
          cancel/1]).
 
 -export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -86,6 +87,10 @@ close(C) when is_pid(C) ->
 get_parameter(C, Name) ->
     gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
 
+set_notice_receiver(C, PidOrName) when is_pid(PidOrName);
+                                       is_atom(PidOrName) ->
+    gen_server:call(C, {set_async_receiver, PidOrName}, infinity).
+
 cancel(S) ->
     gen_server:cast(S, cancel).
 
@@ -105,6 +110,9 @@ handle_call({get_parameter, Name}, _From, State) ->
     end,
     {reply, {ok, Value1}, State};
 
+handle_call({set_async_receiver, PidOrName}, _From, #state{async = Previous} = State) ->
+    {reply, {ok, Previous}, State#state{async = PidOrName}};
+
 handle_call(Command, From, State) ->
     #state{queue = Q} = State,
     Req = {{call, From}, Command},
@@ -426,12 +434,15 @@ notify(State = #state{queue = Q}, Notice) ->
     end,
     State.
 
-notify_async(State = #state{async = Pid}, Msg) ->
-    case is_pid(Pid) of
-        true  -> Pid ! {epgsql, self(), Msg};
-        false -> false
-    end,
-    State.
+notify_async(#state{async = undefined}, _) ->
+    false;
+notify_async(#state{async = PidOrName}, Msg) ->
+    try PidOrName ! {epgsql, self(), Msg} of
+        _ -> true
+    catch error:badarg ->
+            %% no process registered under this name
+            false
+    end.
 
 command_tag(#state{queue = Q}) ->
     {_, Req} = queue:get(Q),
@@ -724,8 +735,8 @@ on_message(Error = {error, Reason}, State) ->
 
 %% NoticeResponse
 on_message({?NOTICE, Data}, State) ->
-    State2 = notify_async(State, {notice, epgsql_wire:decode_error(Data)}),
-    {noreply, State2};
+    notify_async(State, {notice, epgsql_wire:decode_error(Data)}),
+    {noreply, State};
 
 %% ParameterStatus
 on_message({?PARAMETER_STATUS, Data}, State) ->
@@ -740,5 +751,5 @@ on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
         [Channel, Payload] -> {Channel, Payload};
         [Channel]          -> {Channel, <<>>}
     end,
-    State2 = notify_async(State, {notification, Channel1, Pid, Payload1}),
-    {noreply, State2}.
+    notify_async(State, {notification, Channel1, Pid, Payload1}),
+    {noreply, State}.

+ 6 - 0
src/epgsqla.erl

@@ -6,6 +6,7 @@
          connect/2, connect/3, connect/4, connect/5,
          close/1,
          get_parameter/2,
+         set_notice_receiver/2,
          squery/2,
          equery/2, equery/3,
          prepared_query/3,
@@ -49,6 +50,11 @@ close(C) ->
 get_parameter(C, Name) ->
     epgsql_sock:get_parameter(C, Name).
 
+-spec set_notice_receiver(epgsql:connection(), undefined | pid() | atom()) ->
+                                 {ok, Previous :: pid() | atom()}.
+set_notice_receiver(C, PidOrName) ->
+    epgsql_sock:set_notice_receiver(C, PidOrName).
+
 -spec squery(epgsql:connection(), string()) -> reference().
 squery(C, Sql) ->
     cast(C, {squery, Sql}).

+ 6 - 0
src/epgsqli.erl

@@ -6,6 +6,7 @@
          connect/2, connect/3, connect/4, connect/5,
          close/1,
          get_parameter/2,
+         set_notice_receiver/2,
          squery/2,
          equery/2, equery/3,
          prepared_query/3,
@@ -48,6 +49,11 @@ close(C) ->
 get_parameter(C, Name) ->
     epgsql_sock:get_parameter(C, Name).
 
+-spec set_notice_receiver(epgsql:connection(), undefined | pid() | atom()) ->
+                                 {ok, Previous :: pid() | atom()}.
+set_notice_receiver(C, PidOrName) ->
+    epgsql_sock:set_notice_receiver(C, PidOrName).
+
 -spec squery(epgsql:connection(), string()) -> reference().
 squery(C, Sql) ->
     incremental(C, {squery, Sql}).

+ 4 - 1
test/epgsql_cast.erl

@@ -6,7 +6,7 @@
 -module(epgsql_cast).
 
 -export([connect/2, connect/3, connect/4, close/1]).
--export([get_parameter/2, squery/2, equery/2, equery/3]).
+-export([get_parameter/2, set_notice_receiver/2, squery/2, equery/2, equery/3]).
 -export([prepared_query/3]).
 -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
 -export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2]).
@@ -43,6 +43,9 @@ close(C) ->
 get_parameter(C, Name) ->
     epgsqla:get_parameter(C, Name).
 
+set_notice_receiver(C, PidOrName) ->
+    epgsqla:set_notice_receiver(C, PidOrName).
+
 squery(C, Sql) ->
     Ref = epgsqla:squery(C, Sql),
     receive_result(C, Ref).

+ 4 - 1
test/epgsql_incremental.erl

@@ -6,7 +6,7 @@
 -module(epgsql_incremental).
 
 -export([connect/2, connect/3, connect/4, close/1]).
--export([get_parameter/2, squery/2, equery/2, equery/3]).
+-export([get_parameter/2, set_notice_receiver/2, squery/2, equery/2, equery/3]).
 -export([prepared_query/3]).
 -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
 -export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2]).
@@ -41,6 +41,9 @@ close(C) ->
 get_parameter(C, Name) ->
     epgsqli:get_parameter(C, Name).
 
+set_notice_receiver(C, PidOrName) ->
+    epgsqli:set_notice_receiver(C, PidOrName).
+
 squery(C, Sql) ->
     Ref = epgsqli:squery(C, Sql),
     case receive_results(C, Ref, []) of

+ 59 - 0
test/epgsql_tests.erl

@@ -826,6 +826,65 @@ listen_notify_payload_test(Module) ->
       end,
       [{async, self()}]).
 
+set_notice_receiver_test(Module) ->
+    with_min_version(
+      Module,
+      9.0,
+      fun(C) ->
+          {ok, [], []}     = Module:squery(C, "listen epgsql_test"),
+          {ok, _, [{Pid}]} = Module:equery(C, "select pg_backend_pid()"),
+
+          EnsureNoNotification = fun(Payload) ->
+              {ok, [], []}     = Module:squery(C, ["notify epgsql_test, '", Payload, "'"]),
+              receive
+                  {epgsql, _, _} -> erlang:error(got_unexpected_notification)
+              after
+                  10 -> ok
+              end
+          end,
+          EnsureNotification = fun(Payload) ->
+              {ok, [], []}     = Module:squery(C, ["notify epgsql_test, '", Payload, "'"]),
+              receive
+                  {epgsql, C, {notification, <<"epgsql_test">>, Pid, Payload}} -> ok
+              after
+                  100 -> erlang:error(didnt_receive_notification)
+              end
+          end,
+          Self = self(),
+
+          EnsureNoNotification(<<"test1">>),
+
+          % Set pid()
+          {ok, undefined} = Module:set_notice_receiver(C, Self),
+          EnsureNotification(<<"test2">>),
+
+          %% test PL/PgSQL NOTICE
+          {ok, [], []} = Module:squery(
+                           C, ["DO $$ BEGIN RAISE WARNING 'test notice'; END $$;"]),
+          receive
+              {epgsql, C, {notice, #error{severity = warning,
+                                          code = <<"01000">>,
+                                          message = <<"test notice">>}}} -> ok
+          after
+              100 -> erlang:error(didnt_receive_notice)
+          end,
+
+          % set registered pid
+          Receiver = pg_notification_receiver,
+          register(Receiver, Self),
+          {ok, Self} = Module:set_notice_receiver(C, Receiver),
+          EnsureNotification(<<"test3">>),
+
+          % make registered name invalid
+          unregister(Receiver),
+          EnsureNoNotification(<<"test4">>),
+
+          % disable
+          {ok, Receiver} = Module:set_notice_receiver(C, undefined),
+          EnsureNoNotification(<<"test5">>)
+      end,
+      []).
+
 application_test(_Module) ->
     lists:foreach(fun application:start/1, ?ssl_apps),
     ok = application:start(epgsql),