Browse Source

Merge branch 'devel'

Sergey Prokhorov 2 years ago
parent
commit
bd6544711f

+ 31 - 14
.github/workflows/ci.yml

@@ -16,32 +16,49 @@ jobs:
       fail-fast: false
       matrix:
         os:
-          - "ubuntu-20.04"
+          - "ubuntu-22.04"
         pg:
-          - 12
+          - 14
         postgis:
-          - 3
+          - "3"
         otp:
-          - "24.0"
-          - "23.3"
-          - "22.3"
-          - "21.3"
-          - "20.3"
+          - "25.2"
+          - "24.3"
+        rebar3:
+          - "3.20.0"
         include:
-          - otp: "19.3"
-            os: "ubuntu-18.04"
-            pg: 10
-            postgis: "2.4"
+          - os: "ubuntu-20.04"
+            otp: "23.3"
+            rebar3: "3.17.0"
+            pg: "12"
+            postgis: "3"
+          - os: "ubuntu-20.04"
+            otp: "22.3"
+            rebar3: "3.17.0"
+            pg: "12"
+            postgis: "3"
+          - os: "ubuntu-20.04"
+            otp: "21.3"
+            rebar3: "3.15.2"
+            pg: "12"
+            postgis: "3"
+          - os: "ubuntu-20.04"
+            otp: "20.3"
+            rebar3: "3.15.2"
+            pg: "12"
+            postgis: "3"
+
     # env:
     #   PATH: ".:/usr/lib/postgresql/12/bin:$PATH"
     env:
       SHELL: /bin/sh            # needed for erlexec
     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v3
 
       - uses: erlef/setup-beam@v1
         with:
           otp-version: ${{matrix.otp}}
+          rebar3-version: ${{ matrix.rebar3 }}
 
       - name: Setup postgresql server with postgis
         run: sudo apt install postgresql-${{matrix.pg}} postgresql-contrib-${{matrix.pg}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}}-scripts
@@ -53,7 +70,7 @@ jobs:
         run: PATH=$PATH:/usr/lib/postgresql/${{matrix.pg}}/bin/ make test
 
       - name: Upload CT logs artifact
-        uses: actions/upload-artifact@v2
+        uses: actions/upload-artifact@v3
         if: failure()
         with:
           name: ct_report_pg-${{matrix.pg}}_otp-${{matrix.otp}}

+ 7 - 0
CHANGES

@@ -1,3 +1,10 @@
+In 4.7.0
+
+* Flow control `{socket_active, N}` option in streaming replication #271
+* `{socket_active, N}` option generalized to all `epgsql` modes #278
+* CI upgraded to support OTP-25, OTP-19 dropped, run some tests on PostgreSQL 14 #276
+* `format_status/1` OTP-25 `gen_server` callback implemented #277
+
 In 4.6.1
 
 * Introduce `get_backend_pid` function #270

+ 5 - 9
Makefile

@@ -1,16 +1,12 @@
-REBAR = ./rebar3
+REBAR = rebar3
 MINIMAL_COVERAGE = 55
 
 all: compile
 
-$(REBAR):
-	wget https://github.com/erlang/rebar3/releases/download/3.15.2/rebar3
-	chmod +x rebar3
-
-compile: src/epgsql_errcodes.erl $(REBAR)
+compile: src/epgsql_errcodes.erl
 	@$(REBAR) compile
 
-clean: $(REBAR)
+clean:
 	@$(REBAR) clean
 	@rm -f doc/*.html
 	@rm -f doc/erlang.png
@@ -35,10 +31,10 @@ test: compile eunit common-test cover
 dialyzer: compile
 	@$(REBAR) dialyzer
 
-elvis: $(REBAR)
+elvis:
 	@$(REBAR) as lint lint
 
-edoc: $(REBAR)
+edoc:
 	@$(REBAR) edoc
 
 .PHONY: all compile clean common-test eunit cover test dialyzer elvis

+ 53 - 4
README.md

@@ -73,6 +73,7 @@ connect(Opts) -> {ok, Connection :: epgsql:connection()} | {error, Reason :: epg
       port =>     inet:port_number(),
       ssl =>      boolean() | required,
       ssl_opts => [ssl:tls_client_option()], % @see OTP ssl documentation
+      socket_active => true | integer(), % @see "Active socket" section below
       tcp_opts => [gen_tcp:option()],    % @see OTP gen_tcp module documentation
       timeout =>  timeout(),             % socket connect timeout, default: 5000 ms
       async =>    pid() | atom(),        % process to receive LISTEN/NOTIFY msgs
@@ -108,7 +109,7 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `ssl` if set to `true`, perform an attempt to connect in ssl mode, but continue unencrypted
   if encryption isn't supported by server. if set to `required` connection will fail if encryption
   is not available.
-- `ssl_opts` will be passed as is to `ssl:connect/3`
+- `ssl_opts` will be passed as is to `ssl:connect/3`.
 - `tcp_opts` will be passed as is to `gen_tcp:connect/3`. Some options are forbidden, such as
   `mode`, `packet`, `header`, `active`. When `tcp_opts` is not provided, epgsql does some tuning
   (eg, sets TCP `keepalive` and auto-tunes `buffer`), but when `tcp_opts` is provided, no
@@ -125,6 +126,12 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `application_name` is an optional string parameter. It is usually set by an application upon
    connection to the server. The name will be displayed in the `pg_stat_activity`
    view and included in CSV log entries.
+- `socket_active` is an optional parameter, which can be `true` or an integer in the range -32768
+   to 32767 (inclusive, however only positive values make sense right now).
+   This option is used to control the flow of incoming messages from the network socket to make
+   sure huge query results won't result in `epgsql` process mailbox overflow. It affects the
+   behaviour of some of the commands and interfaces (`epgsqli` and replication), so, use with
+   caution! See [Active socket](#active-socket) for more details.
 
 Options may be passed as proplist or as map with the same key names.
 
@@ -673,6 +680,48 @@ Retrieve actual value of server-side parameters, such as character endoding,
 date/time format and timezone, server version and so on. See [libpq PQparameterStatus](https://www.postgresql.org/docs/current/static/libpq-status.html#LIBPQ-PQPARAMETERSTATUS).
 Parameter's value may change during connection's lifetime.
 
+## Active socket
+
+By default `epgsql` sets its underlying `gen_tcp` or `ssl` socket into `{active, true}` mode
+(make sure you understand the [OTP inet:setopts/2 documentation](https://www.erlang.org/doc/man/inet.html#setopts-2)
+about `active` option).
+That means if PostgreSQL decides to quickly send a huge amount of data to the client (for example,
+the client made a SELECT that returns a large number of rows, or we are connected in streaming
+replication mode and receiving a lot of updates), the underlying network socket might quickly send
+a large number of messages to the `epgsql` connection process, leading to a growing mailbox and high
+RAM consumption (or even OOM situation in case of really large query result or massive replication
+update).
+
+To avoid such scenarios, `epgsql` can rely on "TCP backpressure" to prevent the socket from sending
+unlimited number of messages - implement a "flow control". To do so, `socket_active => 1..32767`
+could be added at connection time. This option would set `{active, N}` option on the underlying
+socket and would tell the network to send no more than `N` messages to `epgsql` connection and then
+pause to let `epgsql` and the client process the already received network data and then decide how
+to proceed.
+
+The way this pause is signalled to the client and how the socket can be activated again depends on
+the interface client is using:
+
+- when `epgsqli` interface is used, `epgsql` would send all the normal low level messages and then
+  at any point it may send `{epgsql, C, socket_passive}` message to signal that the socket has been
+  paused. `epgsql:activate(C)` must be called to re-activate the socket.
+- when `epgsql` is connected in [Streaming replication](doc/streaming.md) mode and `pid()` is used
+  as the receiver of the X-Log Data messages, it would behave in the same way:
+  `{epgsql, C, socket_passive}` might be sent along with
+  `{epgsql, self(), {x_log_data, _, _, _}}` messages and `epgsql:activate/1` can be used to
+  re-activate.
+- in all the other cases (`epgsql` / `epgsqla` command, while `COPY FROM STDIN` mode is active,
+  when Streaming replication with Erlang module callback as receiver of X-Log Data or
+  while connection is idle) `epgsql` would transparently re-activate the socket automatically: it
+  won't prevent high RAM usage from large SELECT result, but it would make sure `epgsql` process
+  has no more than `N` messages from the network in its mailbox.
+
+It is a good idea to combine `socket_active => N` with some specific value of
+`tcp_opts => [{buffer, X}]` since each of the `N` messages sent from the network to `epgsql`
+process would contain no more than `X` bytes. So the MAXIMUM amount of data sitting in the `epgsql`
+mailbox could be roughly estimated as `N * X`. So if `N = 256` and `X = 512*1024` (512kb) then
+there will be no more than `N * X = 256 * 524288 = 134_217_728` or 128MB of data in the mailbox
+at the same time.
 
 ## Streaming replication protocol
 
@@ -698,7 +747,7 @@ Here's how to create a patch that's easy to integrate:
 - Create a new branch for the proposed fix.
 - Make sure it includes a test and documentation, if appropriate.
 - Open a pull request against the `devel` branch of epgsql.
-- Passing build in travis
+- Passing CI build
 
 ## Test Setup
 
@@ -708,11 +757,11 @@ Postgres database.
 NOTE: you will need the postgis and hstore extensions to run these
 tests! On Ubuntu, you can install them with a command like this:
 
-1. `apt-get install postgresql-9.3-postgis-2.1 postgresql-contrib`
+1. `apt-get install postgresql-12-postgis-3 postgresql-contrib`
 1. `make test` # Runs the tests
 
 NOTE 2: It's possible to run tests on exact postgres version by changing $PATH like
 
-   `PATH=$PATH:/usr/lib/postgresql/9.5/bin/ make test`
+   `PATH=$PATH:/usr/lib/postgresql/12/bin/ make test`
 
 [![CI](https://github.com/epgsql/epgsql/actions/workflows/ci.yml/badge.svg)](https://github.com/epgsql/epgsql/actions/workflows/ci.yml)

+ 39 - 1
doc/streaming.md

@@ -138,4 +138,42 @@ handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
     then you do not have any risk to lose data. 
     
     Otherwise (if you do not load all data from tables during erlang app startup) 
-    it is not recommended to set align_lsn to true. In this case to stop PG server stop epgsql replication first.
+    it is not recommended to set align_lsn to true. In this case to stop PG server stop epgsql replication first.
+    
+## Flow control
+
+It is possible to set `{socket_active, N}` on a [TCP](https://www.erlang.org/doc/man/inet.html#setopts-2)
+or [SSL](https://www.erlang.org/doc/man/ssl.html#setopts-2) (since OTP 21.3) socket. E.g. for SSL:
+```erlang
+Opts = #{host => "localhost",
+         username => "me",
+         password => "pwd",
+         database => "test",
+         ssl => required,
+         socket_active => 10
+        },
+{ok, Conn} = epgsql:connect(Opts).
+```
+
+Its main purpose is to control the flow of replication messages from Postgresql database.
+If a database is under a high load and a process, which
+handles the message stream, cannot keep up with it then setting this option gives the handling process
+ability to get messages on-demand.
+
+When the connection is in the asynchronous mode, a process which owns a connection will receive
+```erlang
+{epgsql, Connection, socket_passive}
+```
+as soon as the underlying socket has received N messages from the network.
+
+The process decides when to activate connection's socket again. To do that it should call:
+```erlang
+epgsql:activate(Connection).
+```
+The `active` parameter of the socket will be set to the same value as it was configured in
+the connection's options.
+
+In the case of synchronous handler for replication messages `epgsql` will handle `socket_passive`
+messages internally.
+
+See [Active socket README section](../#active-socket) for more details.

+ 1 - 1
src/commands/epgsql_cmd_connect.erl

@@ -85,7 +85,7 @@ execute(PgSock, #connect{opts = #{username := Username} = Opts, stage = connect}
 execute(PgSock, #connect{stage = auth, auth_send = {PacketType, Data}} = St) ->
     {send, PacketType, Data, PgSock, St#connect{auth_send = undefined}}.
 
--spec open_socket([{atom(), any()}], epgsql:connect_opts()) ->
+-spec open_socket([{atom(), any()}], epgsql:connect_opts_map()) ->
     {ok , gen_tcp | ssl, gen_tcp:socket() | ssl:sslsocket()} | {error, any()}.
 open_socket(SockOpts, #{host := Host} = ConnectOpts) ->
     Timeout = maps:get(timeout, ConnectOpts, 5000),

+ 1 - 1
src/epgsql.app.src

@@ -1,6 +1,6 @@
 {application, epgsql,
  [{description, "PostgreSQL Client"},
-  {vsn, "4.6.1"},
+  {vsn, "4.7.0"},
   {modules, []},
   {registered, []},
   {applications, [kernel,

+ 28 - 10
src/epgsql.erl

@@ -37,13 +37,15 @@
          start_replication/5,
          start_replication/6,
          start_replication/7,
-         to_map/1]).
--export([handle_x_log_data/5]).                 % private
+         to_map/1,
+         activate/1]).
+%% private
+-export([handle_x_log_data/5]).
 
--export_type([connection/0, connect_option/0, connect_opts/0,
+-export_type([connection/0, connect_option/0, connect_opts/0, connect_opts_map/0,
               connect_error/0, query_error/0, sql_query/0, column/0,
               type_name/0, epgsql_type/0, statement/0,
-              transaction_option/0, transaction_opts/0]).
+              transaction_option/0, transaction_opts/0, socket_active/0]).
 
 %% Deprecated types
 -export_type([bind_param/0, typed_param/0,
@@ -62,6 +64,7 @@
 -type host() :: inet:ip_address() | inet:hostname().
 -type password() :: string() | iodata() | fun( () -> iodata() ).
 -type connection() :: pid().
+-type socket_active() :: true | -32768..32767.
 -type connect_option() ::
     {host, host()}                                 |
     {username, string()}                           |
@@ -76,11 +79,10 @@
     {codecs,   Codecs     :: [{epgsql_codec:codec_mod(), any()}]} |
     {nulls,    Nulls      :: [any(), ...]} |    % terms to be used as NULL
     {replication, Replication :: string()} | % Pass "database" to connect in replication mode
-    {application_name, ApplicationName :: string()}.
-
--type connect_opts() ::
-        [connect_option()]
-      | #{host => host(),
+    {application_name, ApplicationName :: string()} |
+    {socket_active, Active :: socket_active()}.
+-type connect_opts_map() ::
+        #{host => host(),
           username => string(),
           password => password(),
           database => string(),
@@ -93,9 +95,12 @@
           codecs => [{epgsql_codec:codec_mod(), any()}],
           nulls => [any(), ...],
           replication => string(),
-          application_name => string()
+          application_name => string(),
+          socket_active => socket_active()
           }.
 
+-type connect_opts() :: connect_opts_map() | [connect_option()].
+
 -type transaction_option() ::
     {reraise, boolean()}          |
     {ensure_committed, boolean()} |
@@ -571,3 +576,16 @@ to_map(Map) when is_map(Map) ->
     Map;
 to_map(List) when is_list(List) ->
     maps:from_list(List).
+
+%% @doc Activates TCP or SSL socket of a connection.
+%%
+%% If the `socket_active' connection option is supplied the function sets
+%% `{active, X}' on the connection's SSL or TCP socket. It sets `{active, true}' otherwise.
+%%
+%% @param Connection connection
+%% @returns `ok' or `{error, Reason}'
+%%
+%% Note: The ssl:reason() type is not exported so that we use `any()' on the spec.
+-spec activate(connection()) -> ok | {error, inet:posix() | any()}.
+activate(Connection) ->
+    epgsql_sock:activate(Connection).

+ 86 - 13
src/epgsql_sock.erl

@@ -53,9 +53,10 @@
          cancel/1,
          copy_send_rows/3,
          standby_status_update/3,
-         get_backend_pid/1]).
+         get_backend_pid/1,
+         activate/1]).
 
--export([handle_call/3, handle_cast/2, handle_info/2, format_status/2]).
+-export([handle_call/3, handle_cast/2, handle_info/2, format_status/1, format_status/2]).
 -export([init/1, code_change/3, terminate/2]).
 
 %% loop callback
@@ -67,9 +68,12 @@
          get_parameter_internal/2,
          get_subproto_state/1, set_packet_handler/2]).
 
+-ifdef(TEST).
+-export([state_to_map/1]).
+-endif.
+
 -export_type([transport/0, pg_sock/0, error/0]).
 
--include("epgsql.hrl").
 -include("protocol.hrl").
 -include("epgsql_replication.hrl").
 -include("epgsql_copy.hrl").
@@ -78,7 +82,7 @@
                    | {cast, pid(), reference()}
                    | {incremental, pid(), reference()}.
 
--type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
+-type tcp_socket() :: gen_tcp:socket().
 -type repl_state() :: #repl{}.
 -type copy_state() :: #copy{}.
 
@@ -102,7 +106,7 @@
                 txstatus :: byte() | undefined,  % $I | $T | $E,
                 complete_status :: atom() | {atom(), integer()} | undefined,
                 subproto_state :: repl_state() | copy_state() | undefined,
-                connect_opts :: epgsql:connect_opts() | undefined}).
+                connect_opts :: epgsql:connect_opts_map() | undefined}).
 
 -opaque pg_sock() :: #state{}.
 
@@ -156,6 +160,11 @@ standby_status_update(C, FlushedLSN, AppliedLSN) ->
 get_backend_pid(C) ->
     gen_server:call(C, get_backend_pid).
 
+%% The ssl:reason() type is not exported
+-spec activate(epgsql:connection()) -> ok | {error, inet:posix() | any()}.
+activate(C) ->
+    gen_server:call(C, activate).
+
 %% -- command APIs --
 
 %% send()
@@ -164,7 +173,7 @@ get_backend_pid(C) ->
 -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
 set_net_socket(Mod, Socket, State) ->
     State1 = State#state{mod = Mod, sock = Socket},
-    setopts(State1, [{active, true}]),
+    ok = activate_socket(State1),
     State1.
 
 -spec init_replication_state(pg_sock()) -> pg_sock().
@@ -189,8 +198,8 @@ set_attr(connect_opts, ConnectOpts, State) ->
 
 %% XXX: be careful!
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
-set_packet_handler(Handler, State) ->
-    State#state{handler = Handler}.
+set_packet_handler(Handler, State0) ->
+    State0#state{handler = Handler}.
 
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
 get_codec(#state{codec = Codec}) ->
@@ -215,7 +224,6 @@ get_parameter_internal(Name, #state{parameters = Parameters}) ->
         false                  -> undefined
     end.
 
-
 %% -- gen_server implementation --
 
 init([]) ->
@@ -248,7 +256,11 @@ handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
 handle_call({copy_send_rows, Rows}, _From,
            #state{handler = Handler, subproto_state = CopyState} = State) ->
     Response = handle_copy_send_rows(Rows, Handler, CopyState, State),
-    {reply, Response, State}.
+    {reply, Response, State};
+
+handle_call(activate, _From, State) ->
+    Res = activate_socket(State),
+    {reply, Res, State}.
 
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
   when ((Method == cast) or (Method == incremental)),
@@ -278,6 +290,11 @@ handle_info({DataTag, Sock, Data2}, #state{data = Data, sock = Sock} = State)
   when DataTag == tcp; DataTag == ssl ->
     loop(State#state{data = <<Data/binary, Data2/binary>>});
 
+handle_info({Passive, Sock}, #state{sock = Sock} = State)
+  when Passive == ssl_passive; Passive == tcp_passive ->
+    NewState = handle_socket_pasive(State),
+    {noreply, NewState};
+
 handle_info({Closed, Sock}, #state{sock = Sock} = State)
   when Closed == tcp_closed; Closed == ssl_closed ->
     {stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
@@ -305,14 +322,44 @@ terminate(_Reason, #state{mod = ssl, sock = Sock}) -> ssl:close(Sock).
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+format_status(Status = #{reason := _Reason, state := State}) ->
+  %% Do not format the rows attribute when process terminates abnormally
+  %% but allow it when it is a sys:get_status/1,2 call
+  Status#{state => redact_state(State)};
+format_status(Status) ->
+    Status.
+
+%% TODO
+%% This is deprecated since OTP-25 in favor of `format_status/1`. Remove once
+%% OTP-25 becomes minimum supported OTP version.
 format_status(normal, [_PDict, State=#state{}]) ->
   [{data, [{"State", State}]}];
 format_status(terminate, [_PDict, State]) ->
   %% Do not format the rows attribute when process terminates abnormally
   %% but allow it when it is a sys:get_status/1,2 call
-  State#state{rows = information_redacted}.
+  redact_state(State).
 
 %% -- internal functions --
+-spec handle_socket_pasive(pg_sock()) -> pg_sock().
+handle_socket_pasive(#state{handler = on_replication,
+                            subproto_state = #repl{receiver = Rec}} = State) when is_pid(Rec) ->
+    %% Replication with pid() as X-Log data receiver
+    Rec ! {epgsql, self(), socket_passive},
+    State;
+handle_socket_pasive(#state{current_cmd_transport = {incremental, From, _}} = State) ->
+    %% `epgsqli' interface command
+    From ! {epgsql, self(), socket_passive},
+    State;
+handle_socket_pasive(State) ->
+    %% - current_cmd_transport is `call' or `cast': client expects whole result set anyway
+    %% - handler = on_copy_from_stdin: we don't expect much data from the server
+    %% - handler = on_replication with callback module as X-Log data receiver: pace controlled by
+    %%   callback execution time
+    %% - idle (eg, receiving asynchronous error or NOTIFICATION/WARNING): client might not expect
+    %%   to receive the `socket_passive' messages or there might be no client at all. Also, async
+    %%   notifications are usually small.
+    ok = activate_socket(State),
+    State.
 
 -spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
                          Result when
@@ -407,13 +454,23 @@ command_next(#state{current_cmd = PrevCmd,
                         results = []}
     end.
 
-
 setopts(#state{mod = Mod, sock = Sock}, Opts) ->
     case Mod of
         gen_tcp -> inet:setopts(Sock, Opts);
         ssl     -> ssl:setopts(Sock, Opts)
     end.
 
+-spec get_socket_active(pg_sock()) -> epgsql:socket_active().
+get_socket_active(#state{connect_opts = #{socket_active := Active}}) ->
+    Active;
+get_socket_active(_State) ->
+    true.
+
+-spec activate_socket(pg_sock()) -> ok | {error, inet:posix() | any()}.
+activate_socket(State) ->
+  Active = get_socket_active(State),
+  setopts(State, [{active, Active}]).
+
 %% This one only used in connection initiation to send client's
 %% `StartupMessage' and `SSLRequest' packets
 -spec send(pg_sock(), iodata()) -> ok | {error, any()}.
@@ -433,7 +490,12 @@ send_multi(#state{mod = Mod, sock = Sock}, List) ->
 do_send(gen_tcp, Sock, Bin) ->
     %% Why not gen_tcp:send/2?
     %% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
-    %% Because of that we also have `handle_info({inet_reply, ...`
+    %% Since `epgsql' uses `{active, true}' socket option by-default, it may potentially quickly
+    %% receive huge amount of data from the network.
+    %% With introduction of `{socket_active, N}' option it becomes less of a problem, but
+    %% `{active, true}' is still the default.
+    %%
+    %% Because we use `inet' driver directly, we also have `handle_info({inet_reply, ...`
     try erlang:port_command(Sock, Bin) of
         true ->
             ok
@@ -737,3 +799,14 @@ handle_xlog_data(StartLSN, EndLSN, WALRecord,
               last_flushed_lsn = LastFlushedLSN,
               last_applied_lsn = LastAppliedLSN,
               cbstate = NewCbState}.
+
+redact_state(State) ->
+    State#state{rows = information_redacted}.
+
+-ifdef(TEST).
+
+state_to_map(State) ->
+    [state | Fields] = tuple_to_list(State),
+    maps:from_list(lists:zip(record_info(fields, state), Fields)).
+
+-endif.

+ 110 - 2
test/epgsql_SUITE.erl

@@ -15,6 +15,9 @@
     end_per_suite/1
 ]).
 
+%% logger handler
+-export([log/2]).
+
 -compile([export_all, nowarn_export_all]).
 
 modules() ->
@@ -77,9 +80,14 @@ groups() ->
             pipelined_prepared_query,
             pipelined_parse_batch_execute
         ]},
+        {incremental_sock_active, [parallel], [
+            incremental_sock_active_n,
+            incremental_sock_active_n_ssl
+        ]},
         {generic, [parallel], [
             with_transaction,
-            mixed_api
+            mixed_api,
+            redacted_state
         ]}
     ],
 
@@ -145,7 +153,7 @@ groups() ->
     SubGroups ++
         [{epgsql, [], [{group, generic} | Tests]},
          {epgsql_cast, [], [{group, pipelining} | Tests]},
-         {epgsql_incremental, [], Tests}].
+         {epgsql_incremental, [], [{group, incremental_sock_active} | Tests]}].
 
 end_per_suite(_Config) ->
     ok.
@@ -1609,6 +1617,106 @@ pipelined_parse_batch_execute(Config) ->
                end || Ref <- CloseRefs],
               erlang:cancel_timer(Timer)
       end).
+
+incremental_sock_active_n(Config) ->
+    epgsql_incremental = ?config(module, Config),
+    Q = "SELECT *, 'Hello world' FROM generate_series(0, 10240)",
+    epgsql_ct:with_connection(Config,
+         fun(C) ->
+             Ref = epgsqli:squery(C, Q),
+             {done, NumPassive, Others, Rows} = recv_incremental_active_n(C, Ref),
+             ?assertMatch([{columns, _}, {complete, _}], Others),
+             ?assert(NumPassive > 0),
+             ?assertMatch([{<<"0">>, <<"Hello world">>},
+                           {<<"1">>, <<"Hello world">>} | _], Rows),
+             ?assertEqual(10241, length(Rows))
+         end,
+         "epgsql_test",
+         [{socket_active, 2}]).
+
+-ifdef(OTP_RELEASE).
+incremental_sock_active_n_ssl(Config) ->
+    epgsql_incremental = ?config(module, Config),
+    Q = "SELECT *, 'Hello world' FROM generate_series(0, 10240)",
+    epgsql_ct:with_connection(Config,
+         fun(C) ->
+             Ref = epgsqli:squery(C, Q),
+             {done, NumPassive, Others, Rows} = recv_incremental_active_n(C, Ref),
+             ?assertMatch([{columns, _}, {complete, _}], Others),
+             ?assert(NumPassive > 0),
+             ?assertMatch([{<<"0">>, <<"Hello world">>},
+                           {<<"1">>, <<"Hello world">>} | _], Rows),
+             ?assertEqual(10241, length(Rows))
+         end,
+         "epgsql_test",
+         [{ssl, true}, {socket_active, 2}]).
+-else.
+%% {active, N} for SSL is only supported on OTP-21+
+incremental_sock_active_n_ssl(_Config) ->
+    noop.
+-endif.
+
+recv_incremental_active_n(C, Ref) ->
+    recv_incremental_active_n(C, Ref, 0, [], []).
+
+recv_incremental_active_n(C, Ref, NumPassive, Rows, Others) ->
+    receive
+        {C, Ref, {data, Row}} ->
+            recv_incremental_active_n(C, Ref, NumPassive, [Row | Rows], Others);
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            recv_incremental_active_n(C, Ref, NumPassive + 1, Rows, Others);
+        {C, Ref, {error, _} = E} ->
+            E;
+        {C, Ref, done} ->
+            {done, NumPassive, lists:reverse(Others), lists:reverse(Rows)};
+        {C, Ref, Other} ->
+            recv_incremental_active_n(C, Ref, NumPassive, Rows, [Other | Others]);
+        Other ->
+            recv_incremental_active_n(C, Ref, NumPassive, Rows, [Other | Others])
+    after 5000 ->
+            error({timeout, NumPassive, Others, Rows})
+    end.
+
+redacted_state(Config) ->
+    case erlang:system_info(otp_release) of
+      V = [_, _] when V > "20" ->
+        redacted_state_(Config);
+      V ->
+        {skip, {"Logger facility is available starting OTP 21, running on OTP " ++ V}}
+    end.
+
+redacted_state_(Config) ->
+    _Handle = ct:timetrap({seconds, 3}),
+    try
+      logger:add_handler(?MODULE, ?MODULE, #{relay_to => self()}),
+      C = epgsql_ct:connect(Config),
+      true = unlink(C),
+      Reason = {please, ?MODULE, ?FUNCTION_NAME},
+      ok = proc_lib:stop(C, Reason, 1000),
+      receive
+        {log, Message} ->
+            ?assertMatch({report, #{label := {gen_server, terminate},
+                                    reason := Reason,
+                                    state := _}},
+                         Message),
+            {report, #{state := State}} = Message,
+            ?assertMatch(#{rows := information_redacted},
+                         epgsql_sock:state_to_map(State))
+      end
+    after
+      logger:remove_handler(?MODULE)
+    end.
+
+%% =============================================================================
+%% Logger handler
+%% ============================================================================
+
+log(#{msg := Msg}, #{relay_to := Pid}) ->
+    Pid ! {log, Msg};
+log(_, _) ->
+    ok.
+
 %% =============================================================================
 %% Internal functions
 %% ============================================================================

+ 12 - 3
test/epgsql_ct.erl

@@ -4,6 +4,7 @@
 
 -export([
     connection_data/1,
+    connect/1,
     connect_only/2,
     with_connection/2,
     with_connection/3,
@@ -19,6 +20,16 @@ connection_data(Config) ->
     Port = ?config(port, PgConfig),
     {Host, Port}.
 
+connect(Config) ->
+    connect(Config, "epgsql_test", []).
+
+connect(Config, Username, Args) ->
+    {Host, Port} = connection_data(Config),
+    Module = ?config(module, Config),
+    Args2 = [{port, Port}, {database, "epgsql_test_db1"} | Args],
+    {ok, C} = Module:connect(Host, Username, Args2),
+    C.
+
 connect_only(Config, Args) ->
     {Host, Port} = connection_data(Config),
     Module = ?config(module, Config),
@@ -39,10 +50,8 @@ with_connection(Config, F, Args) ->
     with_connection(Config, F, "epgsql_test", Args).
 
 with_connection(Config, F, Username, Args) ->
-    {Host, Port} = connection_data(Config),
     Module = ?config(module, Config),
-    Args2 = [{port, Port}, {database, "epgsql_test_db1"} | Args],
-    {ok, C} = Module:connect(Host, Username, Args2),
+    C = connect(Config, Username, Args),
     try
         F(C)
     after

+ 7 - 3
test/epgsql_cth.erl

@@ -150,7 +150,7 @@ init_database(Config) ->
 
     {ok, Cwd} = file:get_cwd(),
     PgDataDir = filename:append(Cwd, "datadir"),
-    {ok, _} = exec:run(Initdb ++ " --locale en_US.UTF8 " ++ PgDataDir, [sync,stdout,stderr]),
+    {ok, _} = exec:run(Initdb ++ " --locale en_US.UTF-8 " ++ PgDataDir, [sync, stdout, stderr]),
     [{datadir, PgDataDir}|Config].
 
 get_version(Config) ->
@@ -200,6 +200,10 @@ write_pg_hba_config(Config) ->
     Version = ?config(version, Config),
 
     User = os:getenv("USER"),
+    ClientCert = case Version >= [12] of
+                   true -> "cert";
+                   false -> "cert clientcert=1"
+                 end,
     PGConfig = [
         "local   all             ", User, "                              trust\n",
         "host    template1       ", User, "              127.0.0.1/32    trust\n",
@@ -209,7 +213,7 @@ write_pg_hba_config(Config) ->
         "host    epgsql_test_db1 epgsql_test             127.0.0.1/32    trust\n",
         "host    epgsql_test_db1 epgsql_test_md5         127.0.0.1/32    md5\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   127.0.0.1/32    password\n",
-        "hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    cert clientcert=1\n",
+        "hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    ", ClientCert, "\n",
         "host    template1       ", User, "              ::1/128    trust\n",
         "host    ", User, "      ", User, "              ::1/128    trust\n",
         "hostssl postgres        ", User, "              ::1/128    trust\n",
@@ -217,7 +221,7 @@ write_pg_hba_config(Config) ->
         "host    epgsql_test_db1 epgsql_test             ::1/128    trust\n",
         "host    epgsql_test_db1 epgsql_test_md5         ::1/128    md5\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   ::1/128    password\n",
-        "hostssl epgsql_test_db1 epgsql_test_cert        ::1/128    cert clientcert=1\n" |
+        "hostssl epgsql_test_db1 epgsql_test_cert        ::1/128    ", ClientCert, "\n" |
         case Version >= [10] of
             true ->
                 %% See

+ 19 - 1
test/epgsql_incremental.erl

@@ -40,7 +40,10 @@ await_connect(Ref, Opts0) ->
         {C, Ref, connected} ->
             {ok, C};
         {_C, Ref, Error = {error, _}} ->
-            Error
+            Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            await_connect(Ref, Opts0)
     after Timeout ->
             error(timeout)
     end.
@@ -200,6 +203,9 @@ receive_result(C, Ref, Cols, Rows) ->
             {ok, Cols, lists:reverse(Rows)};
         {C, Ref, done} ->
             done;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_result(C, Ref, Cols, Rows);
         {'EXIT', C, _Reason} ->
             throw({error, closed})
     end.
@@ -229,6 +235,9 @@ receive_extended_result(C, Ref, Rows) ->
             {ok, lists:reverse(Rows)};
         {C, Ref, done} ->
             done;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_extended_result(C, Ref, Rows);
         {'EXIT', C, _Reason} ->
             {error, closed}
     end.
@@ -243,6 +252,9 @@ receive_describe(C, Ref, Statement = #statement{}) ->
             {ok, Statement#statement{columns = []}};
         {C, Ref, Error = {error, _}} ->
             Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_describe(C, Ref, Statement);
         {'EXIT', C, _Reason} ->
             {error, closed}
     end.
@@ -255,6 +267,9 @@ receive_describe_portal(C, Ref) ->
             {ok, []};
         {C, Ref, Error = {error, _}} ->
             Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_describe_portal(C, Ref);
         {'EXIT', C, _Reason} ->
             {error, closed}
     end.
@@ -265,6 +280,9 @@ receive_atom(C, Ref, Receive, Return) ->
             Return;
         {C, Ref, Error = {error, _}} ->
             Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_atom(C, Ref, Receive, Return);
         {'EXIT', C, _Reason} ->
             {error, closed}
     end.

+ 155 - 85
test/epgsql_replication_SUITE.erl

@@ -1,110 +1,180 @@
 -module(epgsql_replication_SUITE).
+
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include("epgsql.hrl").
 
--export([
-    init_per_suite/1,
-    all/0,
-    end_per_suite/1,
+-export([all/0,
+         init_per_suite/1,
+         end_per_suite/1,
 
-    connect_in_repl_mode/1,
-    create_drop_replication_slot/1,
-    replication_sync/1,
-    replication_async/1,
+         connect_in_repl_mode/1,
+         create_drop_replication_slot/1,
+         replication_sync/1,
+         replication_async/1,
+         replication_async_active_n_socket/1,
+         replication_sync_active_n_socket/1,
+         replication_async_active_n_ssl/1,
 
-    %% Callbacks
-    handle_x_log_data/4
-]).
+         %% Callbacks
+         handle_x_log_data/4
+        ]).
 
 init_per_suite(Config) ->
-    [{module, epgsql}|Config].
+  [{module, epgsql} | Config].
 
 end_per_suite(_Config) ->
-    ok.
+  ok.
 
 all() ->
-    [
-     connect_in_repl_mode,
-     create_drop_replication_slot,
-     replication_async,
-     replication_sync
-    ].
+  [connect_in_repl_mode,
+   create_drop_replication_slot,
+   replication_async,
+   replication_sync,
+   replication_async_active_n_socket,
+   replication_sync_active_n_socket,
+   replication_async_active_n_ssl
+  ].
 
 connect_in_repl_mode(Config) ->
-    epgsql_ct:connect_only(Config, ["epgsql_test_replication",
-        "epgsql_test_replication",
-        [{database, "epgsql_test_db1"}, {replication, "database"}]]).
+  epgsql_ct:connect_only(
+    Config,
+    ["epgsql_test_replication",
+     "epgsql_test_replication",
+     [{database, "epgsql_test_db1"}, {replication, "database"}]
+    ]).
 
 create_drop_replication_slot(Config) ->
-    Module = ?config(module, Config),
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            {ok, Cols, Rows} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
-            [#column{name = <<"slot_name">>}, #column{name = <<"consistent_point">>},
-                #column{name = <<"snapshot_name">>}, #column{name = <<"output_plugin">>}] = Cols,
-            [{<<"epgsql_test">>, _, _, <<"test_decoding">>}] = Rows,
-            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]).
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        create_replication_slot(Config, C),
+        drop_replication_slot(Config, C)
+    end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
 
 replication_async(Config) ->
-    replication_test_run(Config, self()).
+  replication_test_run(Config, self()).
 
 replication_sync(Config) ->
-    replication_test_run(Config, ?MODULE).
+  replication_test_run(Config, ?MODULE).
+
+replication_async_active_n_socket(Config) ->
+  replication_test_run(Config, self(), [{socket_active, 1}]).
+
+replication_sync_active_n_socket(Config) ->
+  replication_test_run(Config, ?MODULE, [{socket_active, 1}]).
+
+-ifdef(OTP_RELEASE).
+replication_async_active_n_ssl(Config) ->
+  replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
+-else.
+%% {active, N} for SSL is only supported on OTP-21+
+replication_async_active_n_ssl(Config) ->
+    noop.
+-endif.
 
 replication_test_run(Config, Callback) ->
-    Module = ?config(module, Config),
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            {ok, _, _} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
-
-            %% new connection because main id in a replication mode
-            epgsql_ct:with_connection(
-                Config,
-                fun(C2) ->
-                    [{ok, 1},{ok, 1}] = Module:squery(C2,
-                        "insert into test_table1 (id, value) values (5, 'five');delete from test_table1 where id = 5;")
-                end),
-
-            Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
-            ok = receive_replication_msgs(
-                [<<"table public.test_table1: INSERT: id[integer]:5 value[text]:'five'">>,
-                    <<"table public.test_table1: DELETE: id[integer]:5">>], C, [])
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]),
-    %% cleanup
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]).
-
-receive_replication_msgs(Pattern, Pid, ReceivedMsgs) ->
-    receive
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
-            receive_replication_msgs(Pattern, Pid, [begin_msg | ReceivedMsgs]);
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
-            case lists:reverse(ReceivedMsgs) of
-                [begin_msg, row_msg | _] -> ok;
-                _ -> error_replication_messages_not_received
-            end;
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
-            [Msg | T] = Pattern,
-            receive_replication_msgs(T, Pid, [row_msg | ReceivedMsgs])
-    after
-        60000 ->
-            error_timeout
-    end.
+  replication_test_run(Config, Callback, []).
+
+replication_test_run(Config, Callback, ExtOpts) ->
+  Module = ?config(module, Config),
+  {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        create_replication_slot(Config, C),
+        %% new connection because main is in the replication mode
+        epgsql_ct:with_connection(
+          Config,
+          fun(C2) ->
+              ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
+              Res = Module:squery(C2, Queries),
+              ?assertEqual(ExpectedResult, Res)
+          end),
+        Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
+        ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
+    end,
+    "epgsql_test_replication",
+    [{replication, "database"} | ExtOpts]),
+  %% cleanup
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) -> drop_replication_slot(Config, C) end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
+
+create_replication_slot(Config, Connection) ->
+  Module = ?config(module, Config),
+  {ok, Cols, Rows} =
+    Module:squery(Connection,
+                  "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
+  ?assertMatch([#column{name = <<"slot_name">>},
+                #column{name = <<"consistent_point">>},
+                #column{name = <<"snapshot_name">>},
+                #column{name = <<"output_plugin">>}
+               ],
+               Cols),
+  ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
+
+drop_replication_slot(Config, Connection) ->
+  Module = ?config(module, Config),
+  Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
+  case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
+    true -> ?assertMatch({ok, _, _}, Result);
+    false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
+  end.
+
+gen_query_and_replication_msgs(Ids) ->
+  QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
+  QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
+  RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
+  RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
+  LongBin = base64:encode(crypto:strong_rand_bytes(254)),
+  lists:foldl(
+    fun(Id, {Qs, RMs}) ->
+        QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
+        QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
+        RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
+        RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
+        {Qs ++ [QIns, QDel], RMs ++ [RmIns, RmDel]}
+    end,
+    {[], []},
+    Ids).
+
+receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
+  receive
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
+      receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
+      ensure_no_socket_passive_msgs(Module, Pid),
+      case lists:reverse(ReceivedMsgs) of
+        [begin_msg, row_msg | _] -> ok;
+        _ -> error_replication_messages_not_received
+      end;
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
+      [Msg | T] = Pattern,
+      receive_replication_msgs(Module, T, Pid, [row_msg | ReceivedMsgs]);
+    {epgsql, Pid, socket_passive} ->
+      Module:activate(Pid),
+      receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
+  after
+    60000 ->
+      error_timeout
+  end.
+
+ensure_no_socket_passive_msgs(Module, Pid) ->
+  receive
+    {epgsql, Pid, socket_passive} ->
+      Module:activate(Pid),
+      ensure_no_socket_passive_msgs(Module, Pid)
+  after
+    100 ->
+      ok
+  end.
 
 handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
-    {C, Pid} = CbState,
-    Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
-    {ok, EndLSN, EndLSN, CbState}.
+  {C, Pid} = CbState,
+  Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
+  {ok, EndLSN, EndLSN, CbState}.