Make sure `socket_active` option can be used with any interface and mode

* `epgsql` / `epgsqla` / `epgsqli` / copy-mode can be used with `socket_active` option
* `epgsqli` can do fine-grained flow control because it produces `{epgsql, C, socket_passive}` messages
* other interfaces re-activate the socket under the hood as soon as `epgsql_sock` receives
  `tcp_passive` / `ssl_passive`
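
The `epgsqli` flow control described above can be sketched as a receive loop. This is a minimal illustration, not code from this commit: the module name `active_n_sketch` and the functions `collect/3` and `max_mailbox_bytes/2` are hypothetical, and the message shapes are the ones used by the tests in this change. The `Activate` fun is passed in so the loop can be exercised without a database; with a real connection it would presumably be `fun epgsql:activate/1`.

```erlang
-module(active_n_sketch).
-export([collect/3, max_mailbox_bytes/2]).

%% Collect the rows of one `epgsqli' command identified by Ref on connection C.
%% Activate is called every time the socket reports that it went passive
%% (hypothetical parameter; with a real connection: fun epgsql:activate/1).
collect(C, Ref, Activate) ->
    collect(C, Ref, Activate, 0, []).

collect(C, Ref, Activate, NumPassive, Acc) ->
    receive
        {C, Ref, {data, Row}} ->
            collect(C, Ref, Activate, NumPassive, [Row | Acc]);
        {epgsql, C, socket_passive} ->
            %% The socket delivered its N messages and paused; this is the
            %% point where the client decides when to resume reading.
            ok = Activate(C),
            collect(C, Ref, Activate, NumPassive + 1, Acc);
        {C, Ref, {error, _} = Error} ->
            Error;
        {C, Ref, done} ->
            {ok, NumPassive, lists:reverse(Acc)};
        {C, Ref, _Other} ->
            %% Other per-command messages, e.g. {columns, _} or {complete, _}
            collect(C, Ref, Activate, NumPassive, Acc)
    after 5000 ->
        {error, timeout}
    end.

%% Rough upper bound on the bytes waiting in the connection mailbox when
%% `socket_active => N' is combined with `tcp_opts => [{buffer, BufferSize}]'.
max_mailbox_bytes(N, BufferSize) ->
    N * BufferSize.
```

With a real server this might be used as `Ref = epgsqli:squery(C, Query)` followed by `active_n_sketch:collect(C, Ref, fun epgsql:activate/1)` on a connection opened with `socket_active => 2`. Delaying the `Activate` call is what gives the client its fine-grained control over the pace of incoming data.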
Sergey Prokhorov 2 years ago
Parent
Commit
f4a949a139
7 changed files with 168 additions and 28 deletions
  1. README.md (+52 -7)
  2. doc/streaming.md (+4 -2)
  3. src/commands/epgsql_cmd_connect.erl (+1 -1)
  4. src/epgsql.erl (+5 -5)
  5. src/epgsql_sock.erl (+27 -11)
  6. test/epgsql_SUITE.erl (+60 -1)
  7. test/epgsql_incremental.erl (+19 -1)

+ 52 - 7
README.md

@@ -73,6 +73,7 @@ connect(Opts) -> {ok, Connection :: epgsql:connection()} | {error, Reason :: epg
       port =>     inet:port_number(),
       ssl =>      boolean() | required,
       ssl_opts => [ssl:tls_client_option()], % @see OTP ssl documentation
+      socket_active => true | integer(), % @see "Active socket" section below
       tcp_opts => [gen_tcp:option()],    % @see OTP gen_tcp module documentation
       timeout =>  timeout(),             % socket connect timeout, default: 5000 ms
       async =>    pid() | atom(),        % process to receive LISTEN/NOTIFY msgs
@@ -125,10 +126,12 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `application_name` is an optional string parameter. It is usually set by an application upon
    connection to the server. The name will be displayed in the `pg_stat_activity`
    view and included in CSV log entries.
-- `socket_active` is an optional parameter, which can be true or integer in the range -32768
-  to 32767 (inclusive). This option only works in the replication mode and is used to control
-  the flow of incoming messages. See [Streaming replication protocol](#streaming-replication-protocol)
-  for more details.
+- `socket_active` is an optional parameter, which can be `true` or an integer in the range -32768
+   to 32767 (inclusive; however, only positive values make sense right now).
+   This option is used to control the flow of incoming messages from the network socket to make
+   sure huge query results won't result in `epgsql` process mailbox overflow. It affects the
+   behaviour of some of the commands and interfaces (`epgsqli` and replication), so use it with
+   caution! See [Active socket](#active-socket) for more details.
 
 Options may be passed as proplist or as map with the same key names.
 
@@ -677,6 +680,48 @@ Retrieve actual value of server-side parameters, such as character endoding,
 date/time format and timezone, server version and so on. See [libpq PQparameterStatus](https://www.postgresql.org/docs/current/static/libpq-status.html#LIBPQ-PQPARAMETERSTATUS).
 Parameter's value may change during connection's lifetime.
 
+## Active socket
+
+By default `epgsql` sets its underlying `gen_tcp` or `ssl` socket into `{active, true}` mode
+(make sure you understand the [OTP inet:setopts/2 documentation](https://www.erlang.org/doc/man/inet.html#setopts-2)
+about the `active` option).
+That means if PostgreSQL decides to quickly send a huge amount of data to the client (for example,
+the client made a SELECT that returns a large number of rows, or we are connected in streaming
+replication mode and receiving a lot of updates), the underlying network socket might quickly send
+a large number of messages to the `epgsql` connection process, leading to a growing mailbox and high
+RAM consumption (or even an OOM situation in case of a really large query result or massive replication
+update).
+
+To avoid such scenarios, `epgsql` can rely on "TCP backpressure" to prevent the socket from sending
+an unlimited number of messages, that is, to implement "flow control". To do so, `socket_active => 1..32767`
+can be added at connection time. This option sets the `{active, N}` option on the underlying
+socket and tells the network to send no more than `N` messages to the `epgsql` connection and then
+pause to let `epgsql` and the client process the already received network data and then decide how
+to proceed.
+
+The way this pause is signalled to the client and how the socket can be activated again depends on
+the interface the client is using:
+
+- when the `epgsqli` interface is used, `epgsql` sends all the normal low-level messages and
+  at any point it may send a `{epgsql, C, socket_passive}` message to signal that the socket has been
+  paused. `epgsql:activate(C)` must be called to re-activate the socket.
+- when `epgsql` is connected in [Streaming replication](doc/streaming.md) mode and a `pid()` is used
+  as the receiver of the X-Log Data messages, it behaves in the same way:
+  `{epgsql, C, socket_passive}` might be sent along with
+  `{epgsql, self(), {x_log_data, _, _, _}}` messages and `epgsql:activate/1` can be used to
+  re-activate.
+- in all the other cases (`epgsql` / `epgsqla` command, while `COPY FROM STDIN` mode is active,
+  when Streaming replication uses an Erlang module callback as the receiver of X-Log Data, or
+  while the connection is idle) `epgsql` transparently re-activates the socket automatically: it
+  won't prevent high RAM usage from a large SELECT result, but it makes sure the `epgsql` process
+  has no more than `N` messages from the network in its mailbox.
+
+It is a good idea to combine `socket_active => N` with some specific value of
+`tcp_opts => [{buffer, X}]` since each of the `N` messages sent from the network to the `epgsql`
+process would contain no more than `X` bytes. So the MAXIMUM amount of data sitting in the `epgsql`
+mailbox could be roughly estimated as `N * X`. So if `N = 256` and `X = 512*1024` (512 KiB) then
+there will be no more than `N * X = 256 * 524288 = 134_217_728` bytes or 128 MiB of data in the mailbox
+at the same time.
 
 ## Streaming replication protocol
 
@@ -702,7 +747,7 @@ Here's how to create a patch that's easy to integrate:
 - Create a new branch for the proposed fix.
 - Make sure it includes a test and documentation, if appropriate.
 - Open a pull request against the `devel` branch of epgsql.
-- Passing build in travis
+- Passing CI build
 
 ## Test Setup
 
@@ -712,11 +757,11 @@ Postgres database.
 NOTE: you will need the postgis and hstore extensions to run these
 tests! On Ubuntu, you can install them with a command like this:
 
-1. `apt-get install postgresql-9.3-postgis-2.1 postgresql-contrib`
+1. `apt-get install postgresql-12-postgis-3 postgresql-contrib`
 1. `make test` # Runs the tests
 
 NOTE 2: It's possible to run tests on exact postgres version by changing $PATH like
 
-   `PATH=$PATH:/usr/lib/postgresql/9.5/bin/ make test`
+   `PATH=$PATH:/usr/lib/postgresql/12/bin/ make test`
 
 [![CI](https://github.com/epgsql/epgsql/actions/workflows/ci.yml/badge.svg)](https://github.com/epgsql/epgsql/actions/workflows/ci.yml)

+ 4 - 2
doc/streaming.md

@@ -155,8 +155,8 @@ Opts = #{host => "localhost",
 {ok, Conn} = epgsql:connect(Opts).
 ```
 
-It is currently allowed only in the replication mode. Its main purpose is to control the flow of
-replication messages from Postgresql database. If a database is under a high load and a process, which
+Its main purpose is to control the flow of replication messages from the PostgreSQL database.
+If a database is under a high load and a process, which
 handles the message stream, cannot keep up with it then setting this option gives the handling process
 ability to get messages on-demand.
 
@@ -175,3 +175,5 @@ the connection's options.
 
 In the case of synchronous handler for replication messages `epgsql` will handle `socket_passive`
 messages internally.
+
+See [Active socket README section](../#active-socket) for more details.

+ 1 - 1
src/commands/epgsql_cmd_connect.erl

@@ -85,7 +85,7 @@ execute(PgSock, #connect{opts = #{username := Username} = Opts, stage = connect}
 execute(PgSock, #connect{stage = auth, auth_send = {PacketType, Data}} = St) ->
     {send, PacketType, Data, PgSock, St#connect{auth_send = undefined}}.
 
--spec open_socket([{atom(), any()}], epgsql:connect_opts()) ->
+-spec open_socket([{atom(), any()}], epgsql:connect_opts_map()) ->
     {ok , gen_tcp | ssl, gen_tcp:socket() | ssl:sslsocket()} | {error, any()}.
 open_socket(SockOpts, #{host := Host} = ConnectOpts) ->
     Timeout = maps:get(timeout, ConnectOpts, 5000),

+ 5 - 5
src/epgsql.erl

@@ -42,7 +42,7 @@
 %% private
 -export([handle_x_log_data/5]).
 
--export_type([connection/0, connect_option/0, connect_opts/0,
+-export_type([connection/0, connect_option/0, connect_opts/0, connect_opts_map/0,
               connect_error/0, query_error/0, sql_query/0, column/0,
               type_name/0, epgsql_type/0, statement/0,
               transaction_option/0, transaction_opts/0, socket_active/0]).
@@ -81,10 +81,8 @@
     {replication, Replication :: string()} | % Pass "database" to connect in replication mode
     {application_name, ApplicationName :: string()} |
     {socket_active, Active :: socket_active()}.
-
--type connect_opts() ::
-        [connect_option()]
-      | #{host => host(),
+-type connect_opts_map() ::
+        #{host => host(),
           username => string(),
           password => password(),
           database => string(),
@@ -101,6 +99,8 @@
           socket_active => socket_active()
           }.
 
+-type connect_opts() :: connect_opts_map() | [connect_option()].
+
 -type transaction_option() ::
     {reraise, boolean()}          |
     {ensure_committed, boolean()} |

+ 27 - 11
src/epgsql_sock.erl

@@ -78,7 +78,7 @@
                    | {cast, pid(), reference()}
                    | {incremental, pid(), reference()}.
 
--type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
+-type tcp_socket() :: gen_tcp:socket().
 -type repl_state() :: #repl{}.
 -type copy_state() :: #copy{}.
 
@@ -102,7 +102,7 @@
                 txstatus :: byte() | undefined,  % $I | $T | $E,
                 complete_status :: atom() | {atom(), integer()} | undefined,
                 subproto_state :: repl_state() | copy_state() | undefined,
-                connect_opts :: epgsql:connect_opts() | undefined}).
+                connect_opts :: epgsql:connect_opts_map() | undefined}).
 
 -opaque pg_sock() :: #state{}.
 
@@ -195,9 +195,7 @@ set_attr(connect_opts, ConnectOpts, State) ->
 %% XXX: be careful!
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
 set_packet_handler(Handler, State0) ->
-    State = State0#state{handler = Handler},
-    ok = activate_socket(State),
-    State.
+    State0#state{handler = Handler}.
 
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
 get_codec(#state{codec = Codec}) ->
@@ -290,7 +288,7 @@ handle_info({DataTag, Sock, Data2}, #state{data = Data, sock = Sock} = State)
 
 handle_info({Passive, Sock}, #state{sock = Sock} = State)
   when Passive == ssl_passive; Passive == tcp_passive ->
-    NewState = send_socket_pasive(State),
+    NewState = handle_socket_pasive(State),
     {noreply, NewState};
 
 handle_info({Closed, Sock}, #state{sock = Sock} = State)
@@ -328,11 +326,24 @@ format_status(terminate, [_PDict, State]) ->
   State#state{rows = information_redacted}.
 
 %% -- internal functions --
--spec send_socket_pasive(pg_sock()) -> pg_sock().
-send_socket_pasive(#state{subproto_state = #repl{receiver = Rec}} = State) when Rec =/= undefined ->
+-spec handle_socket_pasive(pg_sock()) -> pg_sock().
+handle_socket_pasive(#state{handler = on_replication,
+                            subproto_state = #repl{receiver = Rec}} = State) when is_pid(Rec) ->
+    %% Replication with pid() as X-Log data receiver
     Rec ! {epgsql, self(), socket_passive},
     State;
-send_socket_pasive(State) ->
+handle_socket_pasive(#state{current_cmd_transport = {incremental, From, _}} = State) ->
+    %% `epgsqli' interface command
+    From ! {epgsql, self(), socket_passive},
+    State;
+handle_socket_pasive(State) ->
+    %% - current_cmd_transport is `call' or `cast': client expects whole result set anyway
+    %% - handler = on_copy_from_stdin: we don't expect much data from the server
+    %% - handler = on_replication with callback module as X-Log data receiver: pace controlled by
+    %%   callback execution time
+    %% - idle (eg, receiving asynchronous error or NOTIFICATION/WARNING): client might not expect
+    %%   to receive the `socket_passive' messages or there might be no client at all. Also, async
+    %%   notifications are usually small.
     ok = activate_socket(State),
     State.
 
@@ -436,7 +447,7 @@ setopts(#state{mod = Mod, sock = Sock}, Opts) ->
     end.
 
 -spec get_socket_active(pg_sock()) -> epgsql:socket_active().
-get_socket_active(#state{handler = on_replication, connect_opts = #{socket_active := Active}}) ->
+get_socket_active(#state{connect_opts = #{socket_active := Active}}) ->
     Active;
 get_socket_active(_State) ->
     true.
@@ -465,7 +476,12 @@ send_multi(#state{mod = Mod, sock = Sock}, List) ->
 do_send(gen_tcp, Sock, Bin) ->
     %% Why not gen_tcp:send/2?
     %% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
-    %% Because of that we also have `handle_info({inet_reply, ...`
+    %% Since `epgsql' uses the `{active, true}' socket option by default, it may potentially quickly
+    %% receive a huge amount of data from the network.
+    %% With the introduction of the `{socket_active, N}' option it becomes less of a problem, but
+    %% `{active, true}' is still the default.
+    %%
+    %% Because we use `inet' driver directly, we also have `handle_info({inet_reply, ...`
     try erlang:port_command(Sock, Bin) of
         true ->
             ok

+ 60 - 1
test/epgsql_SUITE.erl

@@ -77,6 +77,10 @@ groups() ->
             pipelined_prepared_query,
             pipelined_parse_batch_execute
         ]},
+        {incremental_sock_active, [parallel], [
+            incremental_sock_active_n,
+            incremental_sock_active_n_ssl
+        ]},
         {generic, [parallel], [
             with_transaction,
             mixed_api
@@ -145,7 +149,7 @@ groups() ->
     SubGroups ++
         [{epgsql, [], [{group, generic} | Tests]},
          {epgsql_cast, [], [{group, pipelining} | Tests]},
-         {epgsql_incremental, [], Tests}].
+         {epgsql_incremental, [], [{group, incremental_sock_active} | Tests]}].
 
 end_per_suite(_Config) ->
     ok.
@@ -1609,6 +1613,61 @@ pipelined_parse_batch_execute(Config) ->
                end || Ref <- CloseRefs],
               erlang:cancel_timer(Timer)
       end).
+
+incremental_sock_active_n(Config) ->
+    epgsql_incremental = ?config(module, Config),
+    Q = "SELECT *, 'Hello world' FROM generate_series(0, 10240)",
+    epgsql_ct:with_connection(Config,
+         fun(C) ->
+             Ref = epgsqli:squery(C, Q),
+             {done, NumPassive, Others, Rows} = recv_incremental_active_n(C, Ref),
+             ?assertMatch([{columns, _}, {complete, _}], Others),
+             ?assert(NumPassive > 0),
+             ?assertMatch([{<<"0">>, <<"Hello world">>},
+                           {<<"1">>, <<"Hello world">>} | _], Rows),
+             ?assertEqual(10241, length(Rows))
+         end,
+         "epgsql_test",
+         [{socket_active, 2}]).
+
+incremental_sock_active_n_ssl(Config) ->
+    epgsql_incremental = ?config(module, Config),
+    Q = "SELECT *, 'Hello world' FROM generate_series(0, 10240)",
+    epgsql_ct:with_connection(Config,
+         fun(C) ->
+             Ref = epgsqli:squery(C, Q),
+             {done, NumPassive, Others, Rows} = recv_incremental_active_n(C, Ref),
+             ?assertMatch([{columns, _}, {complete, _}], Others),
+             ?assert(NumPassive > 0),
+             ?assertMatch([{<<"0">>, <<"Hello world">>},
+                           {<<"1">>, <<"Hello world">>} | _], Rows),
+             ?assertEqual(10241, length(Rows))
+         end,
+         "epgsql_test",
+         [{ssl, true}, {socket_active, 2}]).
+
+recv_incremental_active_n(C, Ref) ->
+    recv_incremental_active_n(C, Ref, 0, [], []).
+
+recv_incremental_active_n(C, Ref, NumPassive, Rows, Others) ->
+    receive
+        {C, Ref, {data, Row}} ->
+            recv_incremental_active_n(C, Ref, NumPassive, [Row | Rows], Others);
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            recv_incremental_active_n(C, Ref, NumPassive + 1, Rows, Others);
+        {C, Ref, {error, _} = E} ->
+            E;
+        {C, Ref, done} ->
+            {done, NumPassive, lists:reverse(Others), lists:reverse(Rows)};
+        {C, Ref, Other} ->
+            recv_incremental_active_n(C, Ref, NumPassive, Rows, [Other | Others]);
+        Other ->
+            recv_incremental_active_n(C, Ref, NumPassive, Rows, [Other | Others])
+    after 5000 ->
+            error({timeout, NumPassive, Others, Rows})
+    end.
+
 %% =============================================================================
 %% Internal functions
 %% ============================================================================

+ 19 - 1
test/epgsql_incremental.erl

@@ -40,7 +40,10 @@ await_connect(Ref, Opts0) ->
         {C, Ref, connected} ->
             {ok, C};
         {_C, Ref, Error = {error, _}} ->
-            Error
+            Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            await_connect(Ref, Opts0)
     after Timeout ->
             error(timeout)
     end.
@@ -200,6 +203,9 @@ receive_result(C, Ref, Cols, Rows) ->
             {ok, Cols, lists:reverse(Rows)};
         {C, Ref, done} ->
             done;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_result(C, Ref, Cols, Rows);
         {'EXIT', C, _Reason} ->
             throw({error, closed})
     end.
@@ -229,6 +235,9 @@ receive_extended_result(C, Ref, Rows) ->
             {ok, lists:reverse(Rows)};
         {C, Ref, done} ->
             done;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_extended_result(C, Ref, Rows);
         {'EXIT', C, _Reason} ->
             {error, closed}
     end.
@@ -243,6 +252,9 @@ receive_describe(C, Ref, Statement = #statement{}) ->
             {ok, Statement#statement{columns = []}};
         {C, Ref, Error = {error, _}} ->
             Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_describe(C, Ref, Statement);
         {'EXIT', C, _Reason} ->
             {error, closed}
     end.
@@ -255,6 +267,9 @@ receive_describe_portal(C, Ref) ->
             {ok, []};
         {C, Ref, Error = {error, _}} ->
             Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_describe_portal(C, Ref);
         {'EXIT', C, _Reason} ->
             {error, closed}
     end.
@@ -265,6 +280,9 @@ receive_atom(C, Ref, Receive, Return) ->
             Return;
         {C, Ref, Error = {error, _}} ->
             Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_atom(C, Ref, Receive, Return);
         {'EXIT', C, _Reason} ->
             {error, closed}
     end.