Browse Source

Merge branch 'devel'

Sergey Prokhorov 2 years ago
parent
commit
ae284d824f

+ 31 - 14
.github/workflows/ci.yml

@@ -16,32 +16,49 @@ jobs:
       fail-fast: false
       fail-fast: false
       matrix:
       matrix:
         os:
         os:
-          - "ubuntu-20.04"
+          - "ubuntu-22.04"
         pg:
         pg:
-          - 12
+          - 14
         postgis:
         postgis:
-          - 3
+          - "3"
         otp:
         otp:
-          - "24.0"
-          - "23.3"
-          - "22.3"
-          - "21.3"
-          - "20.3"
+          - "25.2"
+          - "24.3"
+        rebar3:
+          - "3.20.0"
         include:
         include:
-          - otp: "19.3"
-            os: "ubuntu-18.04"
-            pg: 10
-            postgis: "2.4"
+          - os: "ubuntu-20.04"
+            otp: "23.3"
+            rebar3: "3.17.0"
+            pg: "12"
+            postgis: "3"
+          - os: "ubuntu-20.04"
+            otp: "22.3"
+            rebar3: "3.17.0"
+            pg: "12"
+            postgis: "3"
+          - os: "ubuntu-20.04"
+            otp: "21.3"
+            rebar3: "3.15.2"
+            pg: "12"
+            postgis: "3"
+          - os: "ubuntu-20.04"
+            otp: "20.3"
+            rebar3: "3.15.2"
+            pg: "12"
+            postgis: "3"
+
     # env:
     # env:
     #   PATH: ".:/usr/lib/postgresql/12/bin:$PATH"
     #   PATH: ".:/usr/lib/postgresql/12/bin:$PATH"
     env:
     env:
       SHELL: /bin/sh            # needed for erlexec
       SHELL: /bin/sh            # needed for erlexec
     steps:
     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v3
 
 
       - uses: erlef/setup-beam@v1
       - uses: erlef/setup-beam@v1
         with:
         with:
           otp-version: ${{matrix.otp}}
           otp-version: ${{matrix.otp}}
+          rebar3-version: ${{ matrix.rebar3 }}
 
 
       - name: Setup postgresql server with postgis
       - name: Setup postgresql server with postgis
         run: sudo apt install postgresql-${{matrix.pg}} postgresql-contrib-${{matrix.pg}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}}-scripts
         run: sudo apt install postgresql-${{matrix.pg}} postgresql-contrib-${{matrix.pg}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}}-scripts
@@ -53,7 +70,7 @@ jobs:
         run: PATH=$PATH:/usr/lib/postgresql/${{matrix.pg}}/bin/ make test
         run: PATH=$PATH:/usr/lib/postgresql/${{matrix.pg}}/bin/ make test
 
 
       - name: Upload CT logs artifact
       - name: Upload CT logs artifact
-        uses: actions/upload-artifact@v2
+        uses: actions/upload-artifact@v3
         if: failure()
         if: failure()
         with:
         with:
           name: ct_report_pg-${{matrix.pg}}_otp-${{matrix.otp}}
           name: ct_report_pg-${{matrix.pg}}_otp-${{matrix.otp}}

+ 7 - 0
CHANGES

@@ -1,3 +1,10 @@
+In 4.7.0
+
+* Flow control `{socket_active, N}` option in streaming replication #271
+* `{socket_active, N}` option generalized to all `epgsql` modes #278
+* CI upgraded to support OTP-25, OTP-19 dropped, run some tests on PostgreSQL 14 #276
+* `format_status/1` OTP-25 `gen_server` callback implemented #277
+
 In 4.6.1
 In 4.6.1
 
 
 * Introduce `get_backend_pid` function #270
 * Introduce `get_backend_pid` function #270

+ 5 - 9
Makefile

@@ -1,16 +1,12 @@
-REBAR = ./rebar3
+REBAR = rebar3
 MINIMAL_COVERAGE = 55
 MINIMAL_COVERAGE = 55
 
 
 all: compile
 all: compile
 
 
-$(REBAR):
-	wget https://github.com/erlang/rebar3/releases/download/3.15.2/rebar3
-	chmod +x rebar3
-
-compile: src/epgsql_errcodes.erl $(REBAR)
+compile: src/epgsql_errcodes.erl
 	@$(REBAR) compile
 	@$(REBAR) compile
 
 
-clean: $(REBAR)
+clean:
 	@$(REBAR) clean
 	@$(REBAR) clean
 	@rm -f doc/*.html
 	@rm -f doc/*.html
 	@rm -f doc/erlang.png
 	@rm -f doc/erlang.png
@@ -35,10 +31,10 @@ test: compile eunit common-test cover
 dialyzer: compile
 dialyzer: compile
 	@$(REBAR) dialyzer
 	@$(REBAR) dialyzer
 
 
-elvis: $(REBAR)
+elvis:
 	@$(REBAR) as lint lint
 	@$(REBAR) as lint lint
 
 
-edoc: $(REBAR)
+edoc:
 	@$(REBAR) edoc
 	@$(REBAR) edoc
 
 
 .PHONY: all compile clean common-test eunit cover test dialyzer elvis
 .PHONY: all compile clean common-test eunit cover test dialyzer elvis

+ 53 - 4
README.md

@@ -73,6 +73,7 @@ connect(Opts) -> {ok, Connection :: epgsql:connection()} | {error, Reason :: epg
       port =>     inet:port_number(),
       port =>     inet:port_number(),
       ssl =>      boolean() | required,
       ssl =>      boolean() | required,
       ssl_opts => [ssl:tls_client_option()], % @see OTP ssl documentation
       ssl_opts => [ssl:tls_client_option()], % @see OTP ssl documentation
+      socket_active => true | integer(), % @see "Active socket" section below
       tcp_opts => [gen_tcp:option()],    % @see OTP gen_tcp module documentation
       tcp_opts => [gen_tcp:option()],    % @see OTP gen_tcp module documentation
       timeout =>  timeout(),             % socket connect timeout, default: 5000 ms
       timeout =>  timeout(),             % socket connect timeout, default: 5000 ms
       async =>    pid() | atom(),        % process to receive LISTEN/NOTIFY msgs
       async =>    pid() | atom(),        % process to receive LISTEN/NOTIFY msgs
@@ -108,7 +109,7 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `ssl` if set to `true`, perform an attempt to connect in ssl mode, but continue unencrypted
 - `ssl` if set to `true`, perform an attempt to connect in ssl mode, but continue unencrypted
   if encryption isn't supported by server. if set to `required` connection will fail if encryption
   if encryption isn't supported by server. if set to `required` connection will fail if encryption
   is not available.
   is not available.
-- `ssl_opts` will be passed as is to `ssl:connect/3`
+- `ssl_opts` will be passed as is to `ssl:connect/3`.
 - `tcp_opts` will be passed as is to `gen_tcp:connect/3`. Some options are forbidden, such as
 - `tcp_opts` will be passed as is to `gen_tcp:connect/3`. Some options are forbidden, such as
   `mode`, `packet`, `header`, `active`. When `tcp_opts` is not provided, epgsql does some tuning
   `mode`, `packet`, `header`, `active`. When `tcp_opts` is not provided, epgsql does some tuning
   (eg, sets TCP `keepalive` and auto-tunes `buffer`), but when `tcp_opts` is provided, no
   (eg, sets TCP `keepalive` and auto-tunes `buffer`), but when `tcp_opts` is provided, no
@@ -125,6 +126,12 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `application_name` is an optional string parameter. It is usually set by an application upon
 - `application_name` is an optional string parameter. It is usually set by an application upon
    connection to the server. The name will be displayed in the `pg_stat_activity`
    connection to the server. The name will be displayed in the `pg_stat_activity`
    view and included in CSV log entries.
    view and included in CSV log entries.
+- `socket_active` is an optional parameter, which can be `true` or an integer in the range -32768
+   to 32767 (inclusive, however only positive value make sense right now).
+   This option is used to control the flow of incoming messages from the network socket to make
+   sure huge query results won't result in `epgsql` process mailbox overflow. It affects the
+   behaviour of some of the commands and interfaces (`epgsqli` and replication), so, use with
+   caution! See [Active socket](#active-socket) for more details.
 
 
 Options may be passed as proplist or as map with the same key names.
 Options may be passed as proplist or as map with the same key names.
 
 
@@ -673,6 +680,48 @@ Retrieve actual value of server-side parameters, such as character endoding,
 date/time format and timezone, server version and so on. See [libpq PQparameterStatus](https://www.postgresql.org/docs/current/static/libpq-status.html#LIBPQ-PQPARAMETERSTATUS).
 date/time format and timezone, server version and so on. See [libpq PQparameterStatus](https://www.postgresql.org/docs/current/static/libpq-status.html#LIBPQ-PQPARAMETERSTATUS).
 Parameter's value may change during connection's lifetime.
 Parameter's value may change during connection's lifetime.
 
 
+## Active socket
+
+By default `epgsql` sets its underlying `gen_tcp` or `ssl` socket into `{active, true}` mode
+(make sure you understand the [OTP inet:setopts/2 documentation](https://www.erlang.org/doc/man/inet.html#setopts-2)
+about `active` option).
+That means if PostgreSQL decides to quickly send a huge amount of data to the client (for example,
+client made a SELECT that returns large amount of results or when we are connected in streaming
+replication mode and receiving a lot of updates), underlying network socket might quickly send
+large number of messages to the `epgsql` connection process leading to the growing mailbox and high
+RAM consumption (or even OOM situation in case of really large query result or massive replication
+update).
+
+To avoid such scenarios, `epgsql` can may rely on "TCP backpressure" to prevent socket from sending
+unlimited number of messages - implement a "flow control". To do so, `socket_active => 1..32767`
+could be added at connection time. This option would set `{active, N}` option on the underlying
+socket and would tell the network to send no more than `N` messages to `epgsql` connection and then
+pause to let `epgsql` and the client process the already received network data and then decide how
+to proceed.
+
+The way this pause is signalled to the client and how the socket can be activated again depends on
+the interface client is using:
+
+- when `epgsqli` interface is used, `epgsql` would send all the normal low level messages and then
+  at any point it may send `{epgsql, C, socket_passive}` message to signal that socket have been
+  paused. `epgsql:activate(C)` must be called to re-activate the socket.
+- when `epgsql` is connected in [Streaming replication](doc/streaming.md) mode and `pid()` is used
+  as the receiver of the X-Log Data messages, it would behave in the same way:
+  `{epgsql, C, socket_passive}` might be sent along with
+  `{epgsql, self(), {x_log_data, _, _, _}}` messages and `epgsql:activate/1` can be used to
+  re-activate.
+- in all the other cases (`epgsql` / `epgsqla` command, while `COPY FROM STDIN` mode is active,
+  when Streaming replication with Erlang module callback as receiver of X-Log Data or
+  while connection is idle) `epgsql` would transparently re-activate the socket automatically: it
+  won't prevent high RAM usage from large SELECT result, but it would make sure `epgsql` process
+  has no more than `N` messages from the network in its mailbox.
+
+It is a good idea to combine `socket_active => N` with some specific value of
+`tcp_opts => [{buffer, X}]` since each of the `N` messages sent from the network to `epgsql`
+process would contain no more than `X` bytes. So the MAXIMUM amount of data seating at the `epgsql`
+mailbox could be roughly estimated as `N * X`. So if `N = 256` and `X = 512*1024` (512kb) then
+there will be no more than `N * X = 256 * 524288 = 134_217_728` or 128MB of data in the mailbox
+at the same time.
 
 
 ## Streaming replication protocol
 ## Streaming replication protocol
 
 
@@ -698,7 +747,7 @@ Here's how to create a patch that's easy to integrate:
 - Create a new branch for the proposed fix.
 - Create a new branch for the proposed fix.
 - Make sure it includes a test and documentation, if appropriate.
 - Make sure it includes a test and documentation, if appropriate.
 - Open a pull request against the `devel` branch of epgsql.
 - Open a pull request against the `devel` branch of epgsql.
-- Passing build in travis
+- Passing CI build
 
 
 ## Test Setup
 ## Test Setup
 
 
@@ -708,11 +757,11 @@ Postgres database.
 NOTE: you will need the postgis and hstore extensions to run these
 NOTE: you will need the postgis and hstore extensions to run these
 tests! On Ubuntu, you can install them with a command like this:
 tests! On Ubuntu, you can install them with a command like this:
 
 
-1. `apt-get install postgresql-9.3-postgis-2.1 postgresql-contrib`
+1. `apt-get install postgresql-12-postgis-3 postgresql-contrib`
 1. `make test` # Runs the tests
 1. `make test` # Runs the tests
 
 
 NOTE 2: It's possible to run tests on exact postgres version by changing $PATH like
 NOTE 2: It's possible to run tests on exact postgres version by changing $PATH like
 
 
-   `PATH=$PATH:/usr/lib/postgresql/9.5/bin/ make test`
+   `PATH=$PATH:/usr/lib/postgresql/12/bin/ make test`
 
 
 [![CI](https://github.com/epgsql/epgsql/actions/workflows/ci.yml/badge.svg)](https://github.com/epgsql/epgsql/actions/workflows/ci.yml)
 [![CI](https://github.com/epgsql/epgsql/actions/workflows/ci.yml/badge.svg)](https://github.com/epgsql/epgsql/actions/workflows/ci.yml)

+ 39 - 1
doc/streaming.md

@@ -138,4 +138,42 @@ handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
     then you do not have any risk to lose data. 
     then you do not have any risk to lose data. 
     
     
     Otherwise (if you do not load all data from tables during erlang app startup) 
     Otherwise (if you do not load all data from tables during erlang app startup) 
-    it is not recommended to set align_lsn to true. In this case to stop PG server stop epgsql replication first.
+    it is not recommended to set align_lsn to true. In this case to stop PG server stop epgsql replication first.
+    
+## Flow control
+
+It is possible to set `{socket_active, N}` on a [TCP](https://www.erlang.org/doc/man/inet.html#setopts-2)
+or [SSL](https://www.erlang.org/doc/man/ssl.html#setopts-2) (since OTP 21.3) socket. E.g. for SSL:
+```erlang
+Opts = #{host => "localhost",
+         username => "me",
+         password => "pwd",
+         database => "test",
+         ssl => require,
+         socket_active => 10
+        },
+{ok, Conn} = epgsql:connect(Opts).
+```
+
+Its main purpose is to control the flow of replication messages from Postgresql database.
+If a database is under a high load and a process, which
+handles the message stream, cannot keep up with it then setting this option gives the handling process
+ability to get messages on-demand.
+
+When the connection is in the asynchronous mode, a process which owns a connection will receive
+```erlang
+{epgsql, Connection, socket_passive}
+```
+as soon as underlying socket received N messages from network.
+
+The process decides when to activate connection's socket again. To do that it should call:
+```erlang
+epgsql:activate(Connection).
+```
+The `active` parameter of the socket will be set to the same value as it was configured in
+the connection's options.
+
+In the case of synchronous handler for replication messages `epgsql` will handle `socket_passive`
+messages internally.
+
+See [Active socket README section](../#active-socket) for more details.

+ 1 - 1
src/commands/epgsql_cmd_connect.erl

@@ -85,7 +85,7 @@ execute(PgSock, #connect{opts = #{username := Username} = Opts, stage = connect}
 execute(PgSock, #connect{stage = auth, auth_send = {PacketType, Data}} = St) ->
 execute(PgSock, #connect{stage = auth, auth_send = {PacketType, Data}} = St) ->
     {send, PacketType, Data, PgSock, St#connect{auth_send = undefined}}.
     {send, PacketType, Data, PgSock, St#connect{auth_send = undefined}}.
 
 
--spec open_socket([{atom(), any()}], epgsql:connect_opts()) ->
+-spec open_socket([{atom(), any()}], epgsql:connect_opts_map()) ->
     {ok , gen_tcp | ssl, gen_tcp:socket() | ssl:sslsocket()} | {error, any()}.
     {ok , gen_tcp | ssl, gen_tcp:socket() | ssl:sslsocket()} | {error, any()}.
 open_socket(SockOpts, #{host := Host} = ConnectOpts) ->
 open_socket(SockOpts, #{host := Host} = ConnectOpts) ->
     Timeout = maps:get(timeout, ConnectOpts, 5000),
     Timeout = maps:get(timeout, ConnectOpts, 5000),

+ 1 - 1
src/epgsql.app.src

@@ -1,6 +1,6 @@
 {application, epgsql,
 {application, epgsql,
  [{description, "PostgreSQL Client"},
  [{description, "PostgreSQL Client"},
-  {vsn, "4.6.1"},
+  {vsn, "4.7.0"},
   {modules, []},
   {modules, []},
   {registered, []},
   {registered, []},
   {applications, [kernel,
   {applications, [kernel,

+ 28 - 10
src/epgsql.erl

@@ -37,13 +37,15 @@
          start_replication/5,
          start_replication/5,
          start_replication/6,
          start_replication/6,
          start_replication/7,
          start_replication/7,
-         to_map/1]).
--export([handle_x_log_data/5]).                 % private
+         to_map/1,
+         activate/1]).
+%% private
+-export([handle_x_log_data/5]).
 
 
--export_type([connection/0, connect_option/0, connect_opts/0,
+-export_type([connection/0, connect_option/0, connect_opts/0, connect_opts_map/0,
               connect_error/0, query_error/0, sql_query/0, column/0,
               connect_error/0, query_error/0, sql_query/0, column/0,
               type_name/0, epgsql_type/0, statement/0,
               type_name/0, epgsql_type/0, statement/0,
-              transaction_option/0, transaction_opts/0]).
+              transaction_option/0, transaction_opts/0, socket_active/0]).
 
 
 %% Deprecated types
 %% Deprecated types
 -export_type([bind_param/0, typed_param/0,
 -export_type([bind_param/0, typed_param/0,
@@ -62,6 +64,7 @@
 -type host() :: inet:ip_address() | inet:hostname().
 -type host() :: inet:ip_address() | inet:hostname().
 -type password() :: string() | iodata() | fun( () -> iodata() ).
 -type password() :: string() | iodata() | fun( () -> iodata() ).
 -type connection() :: pid().
 -type connection() :: pid().
+-type socket_active() :: true | -32768..32767.
 -type connect_option() ::
 -type connect_option() ::
     {host, host()}                                 |
     {host, host()}                                 |
     {username, string()}                           |
     {username, string()}                           |
@@ -76,11 +79,10 @@
     {codecs,   Codecs     :: [{epgsql_codec:codec_mod(), any()}]} |
     {codecs,   Codecs     :: [{epgsql_codec:codec_mod(), any()}]} |
     {nulls,    Nulls      :: [any(), ...]} |    % terms to be used as NULL
     {nulls,    Nulls      :: [any(), ...]} |    % terms to be used as NULL
     {replication, Replication :: string()} | % Pass "database" to connect in replication mode
     {replication, Replication :: string()} | % Pass "database" to connect in replication mode
-    {application_name, ApplicationName :: string()}.
-
--type connect_opts() ::
-        [connect_option()]
-      | #{host => host(),
+    {application_name, ApplicationName :: string()} |
+    {socket_active, Active :: socket_active()}.
+-type connect_opts_map() ::
+        #{host => host(),
           username => string(),
           username => string(),
           password => password(),
           password => password(),
           database => string(),
           database => string(),
@@ -93,9 +95,12 @@
           codecs => [{epgsql_codec:codec_mod(), any()}],
           codecs => [{epgsql_codec:codec_mod(), any()}],
           nulls => [any(), ...],
           nulls => [any(), ...],
           replication => string(),
           replication => string(),
-          application_name => string()
+          application_name => string(),
+          socket_active => socket_active()
           }.
           }.
 
 
+-type connect_opts() :: connect_opts_map() | [connect_option()].
+
 -type transaction_option() ::
 -type transaction_option() ::
     {reraise, boolean()}          |
     {reraise, boolean()}          |
     {ensure_committed, boolean()} |
     {ensure_committed, boolean()} |
@@ -571,3 +576,16 @@ to_map(Map) when is_map(Map) ->
     Map;
     Map;
 to_map(List) when is_list(List) ->
 to_map(List) when is_list(List) ->
     maps:from_list(List).
     maps:from_list(List).
+
+%% @doc Activates TCP or SSL socket of a connection.
+%%
+%% If the `socket_active` connection option is supplied the function sets
+%% `{active, X}' the connection's SSL or TCP socket. It sets `{active, true}' otherwise.
+%%
+%% @param Connection connection
+%% @returns `ok' or `{error, Reason}'
+%%
+%% Note: The ssl:reason() type is not exported so that we use `any()' on the spec.
+-spec activate(connection()) -> ok | {error, inet:posix() | any()}.
+activate(Connection) ->
+    epgsql_sock:activate(Connection).

+ 86 - 13
src/epgsql_sock.erl

@@ -53,9 +53,10 @@
          cancel/1,
          cancel/1,
          copy_send_rows/3,
          copy_send_rows/3,
          standby_status_update/3,
          standby_status_update/3,
-         get_backend_pid/1]).
+         get_backend_pid/1,
+         activate/1]).
 
 
--export([handle_call/3, handle_cast/2, handle_info/2, format_status/2]).
+-export([handle_call/3, handle_cast/2, handle_info/2, format_status/1, format_status/2]).
 -export([init/1, code_change/3, terminate/2]).
 -export([init/1, code_change/3, terminate/2]).
 
 
 %% loop callback
 %% loop callback
@@ -67,9 +68,12 @@
          get_parameter_internal/2,
          get_parameter_internal/2,
          get_subproto_state/1, set_packet_handler/2]).
          get_subproto_state/1, set_packet_handler/2]).
 
 
+-ifdef(TEST).
+-export([state_to_map/1]).
+-endif.
+
 -export_type([transport/0, pg_sock/0, error/0]).
 -export_type([transport/0, pg_sock/0, error/0]).
 
 
--include("epgsql.hrl").
 -include("protocol.hrl").
 -include("protocol.hrl").
 -include("epgsql_replication.hrl").
 -include("epgsql_replication.hrl").
 -include("epgsql_copy.hrl").
 -include("epgsql_copy.hrl").
@@ -78,7 +82,7 @@
                    | {cast, pid(), reference()}
                    | {cast, pid(), reference()}
                    | {incremental, pid(), reference()}.
                    | {incremental, pid(), reference()}.
 
 
--type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
+-type tcp_socket() :: gen_tcp:socket().
 -type repl_state() :: #repl{}.
 -type repl_state() :: #repl{}.
 -type copy_state() :: #copy{}.
 -type copy_state() :: #copy{}.
 
 
@@ -102,7 +106,7 @@
                 txstatus :: byte() | undefined,  % $I | $T | $E,
                 txstatus :: byte() | undefined,  % $I | $T | $E,
                 complete_status :: atom() | {atom(), integer()} | undefined,
                 complete_status :: atom() | {atom(), integer()} | undefined,
                 subproto_state :: repl_state() | copy_state() | undefined,
                 subproto_state :: repl_state() | copy_state() | undefined,
-                connect_opts :: epgsql:connect_opts() | undefined}).
+                connect_opts :: epgsql:connect_opts_map() | undefined}).
 
 
 -opaque pg_sock() :: #state{}.
 -opaque pg_sock() :: #state{}.
 
 
@@ -156,6 +160,11 @@ standby_status_update(C, FlushedLSN, AppliedLSN) ->
 get_backend_pid(C) ->
 get_backend_pid(C) ->
     gen_server:call(C, get_backend_pid).
     gen_server:call(C, get_backend_pid).
 
 
+%% The ssl:reason() type is not exported
+-spec activate(epgsql:connection()) -> ok | {error, inet:posix() | any()}.
+activate(C) ->
+    gen_server:call(C, activate).
+
 %% -- command APIs --
 %% -- command APIs --
 
 
 %% send()
 %% send()
@@ -164,7 +173,7 @@ get_backend_pid(C) ->
 -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
 -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
 set_net_socket(Mod, Socket, State) ->
 set_net_socket(Mod, Socket, State) ->
     State1 = State#state{mod = Mod, sock = Socket},
     State1 = State#state{mod = Mod, sock = Socket},
-    setopts(State1, [{active, true}]),
+    ok = activate_socket(State1),
     State1.
     State1.
 
 
 -spec init_replication_state(pg_sock()) -> pg_sock().
 -spec init_replication_state(pg_sock()) -> pg_sock().
@@ -189,8 +198,8 @@ set_attr(connect_opts, ConnectOpts, State) ->
 
 
 %% XXX: be careful!
 %% XXX: be careful!
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
-set_packet_handler(Handler, State) ->
-    State#state{handler = Handler}.
+set_packet_handler(Handler, State0) ->
+    State0#state{handler = Handler}.
 
 
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
 get_codec(#state{codec = Codec}) ->
 get_codec(#state{codec = Codec}) ->
@@ -215,7 +224,6 @@ get_parameter_internal(Name, #state{parameters = Parameters}) ->
         false                  -> undefined
         false                  -> undefined
     end.
     end.
 
 
-
 %% -- gen_server implementation --
 %% -- gen_server implementation --
 
 
 init([]) ->
 init([]) ->
@@ -248,7 +256,11 @@ handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
 handle_call({copy_send_rows, Rows}, _From,
 handle_call({copy_send_rows, Rows}, _From,
            #state{handler = Handler, subproto_state = CopyState} = State) ->
            #state{handler = Handler, subproto_state = CopyState} = State) ->
     Response = handle_copy_send_rows(Rows, Handler, CopyState, State),
     Response = handle_copy_send_rows(Rows, Handler, CopyState, State),
-    {reply, Response, State}.
+    {reply, Response, State};
+
+handle_call(activate, _From, State) ->
+    Res = activate_socket(State),
+    {reply, Res, State}.
 
 
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
   when ((Method == cast) or (Method == incremental)),
   when ((Method == cast) or (Method == incremental)),
@@ -278,6 +290,11 @@ handle_info({DataTag, Sock, Data2}, #state{data = Data, sock = Sock} = State)
   when DataTag == tcp; DataTag == ssl ->
   when DataTag == tcp; DataTag == ssl ->
     loop(State#state{data = <<Data/binary, Data2/binary>>});
     loop(State#state{data = <<Data/binary, Data2/binary>>});
 
 
+handle_info({Passive, Sock}, #state{sock = Sock} = State)
+  when Passive == ssl_passive; Passive == tcp_passive ->
+    NewState = handle_socket_pasive(State),
+    {noreply, NewState};
+
 handle_info({Closed, Sock}, #state{sock = Sock} = State)
 handle_info({Closed, Sock}, #state{sock = Sock} = State)
   when Closed == tcp_closed; Closed == ssl_closed ->
   when Closed == tcp_closed; Closed == ssl_closed ->
     {stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
     {stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
@@ -305,14 +322,44 @@ terminate(_Reason, #state{mod = ssl, sock = Sock}) -> ssl:close(Sock).
 code_change(_OldVsn, State, _Extra) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.
 
 
+format_status(Status = #{reason := _Reason, state := State}) ->
+  %% Do not format the rows attribute when process terminates abnormally
+  %% but allow it when is a sys:get_status/1.2
+  Status#{state => redact_state(State)};
+format_status(Status) ->
+    Status.
+
+%% TODO
+%% This is deprecated since OTP-25 in favor of `format_status/1`. Remove once
+%% OTP-25 becomes minimum supported OTP version.
 format_status(normal, [_PDict, State=#state{}]) ->
 format_status(normal, [_PDict, State=#state{}]) ->
   [{data, [{"State", State}]}];
   [{data, [{"State", State}]}];
 format_status(terminate, [_PDict, State]) ->
 format_status(terminate, [_PDict, State]) ->
   %% Do not format the rows attribute when process terminates abnormally
   %% Do not format the rows attribute when process terminates abnormally
   %% but allow it when is a sys:get_status/1.2
   %% but allow it when is a sys:get_status/1.2
-  State#state{rows = information_redacted}.
+  redact_state(State).
 
 
 %% -- internal functions --
 %% -- internal functions --
+-spec handle_socket_pasive(pg_sock()) -> pg_sock().
+handle_socket_pasive(#state{handler = on_replication,
+                            subproto_state = #repl{receiver = Rec}} = State) when is_pid(Rec) ->
+    %% Replication with pid() as X-Log data receiver
+    Rec ! {epgsql, self(), socket_passive},
+    State;
+handle_socket_pasive(#state{current_cmd_transport = {incremental, From, _}} = State) ->
+    %% `epgsqli' interface command
+    From ! {epgsql, self(), socket_passive},
+    State;
+handle_socket_pasive(State) ->
+    %% - current_cmd_transport is `call' or `cast': client expects whole result set anyway
+    %% - handler = on_copy_from_stdin: we don't expect much data from the server
+    %% - handler = on_replication with callback module as X-Log data receiver: pace controlled by
+    %%   callback execution time
+    %% - idle (eg, receiving asynchronous error or NOTIFICATION/WARNING): client might not expect
+    %%   to receive the `socket_passive' messages or there might be no client at all. Also, async
+    %%   notifications are usually small.
+    ok = activate_socket(State),
+    State.
 
 
 -spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
 -spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
                          Result when
                          Result when
@@ -407,13 +454,23 @@ command_next(#state{current_cmd = PrevCmd,
                         results = []}
                         results = []}
     end.
     end.
 
 
-
 setopts(#state{mod = Mod, sock = Sock}, Opts) ->
 setopts(#state{mod = Mod, sock = Sock}, Opts) ->
     case Mod of
     case Mod of
         gen_tcp -> inet:setopts(Sock, Opts);
         gen_tcp -> inet:setopts(Sock, Opts);
         ssl     -> ssl:setopts(Sock, Opts)
         ssl     -> ssl:setopts(Sock, Opts)
     end.
     end.
 
 
+-spec get_socket_active(pg_sock()) -> epgsql:socket_active().
+get_socket_active(#state{connect_opts = #{socket_active := Active}}) ->
+    Active;
+get_socket_active(_State) ->
+    true.
+
+-spec activate_socket(pg_sock()) -> ok | {error, inet:posix() | any()}.
+activate_socket(State) ->
+  Active = get_socket_active(State),
+  setopts(State, [{active, Active}]).
+
 %% This one only used in connection initiation to send client's
 %% This one only used in connection initiation to send client's
 %% `StartupMessage' and `SSLRequest' packets
 %% `StartupMessage' and `SSLRequest' packets
 -spec send(pg_sock(), iodata()) -> ok | {error, any()}.
 -spec send(pg_sock(), iodata()) -> ok | {error, any()}.
@@ -433,7 +490,12 @@ send_multi(#state{mod = Mod, sock = Sock}, List) ->
 do_send(gen_tcp, Sock, Bin) ->
 do_send(gen_tcp, Sock, Bin) ->
     %% Why not gen_tcp:send/2?
     %% Why not gen_tcp:send/2?
     %% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
     %% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
-    %% Because of that we also have `handle_info({inet_reply, ...`
+    %% Since `epgsql' uses `{active, true}' socket option by-default, it may potentially quickly
+    %% receive huge amount of data from the network.
+    %% With introduction of `{socket_active, N}' option it becomes less of a problem, but
+    %% `{active, true}' is still the default.
+    %%
+    %% Because we use `inet' driver directly, we also have `handle_info({inet_reply, ...`
     try erlang:port_command(Sock, Bin) of
     try erlang:port_command(Sock, Bin) of
         true ->
         true ->
             ok
             ok
@@ -737,3 +799,14 @@ handle_xlog_data(StartLSN, EndLSN, WALRecord,
               last_flushed_lsn = LastFlushedLSN,
               last_flushed_lsn = LastFlushedLSN,
               last_applied_lsn = LastAppliedLSN,
               last_applied_lsn = LastAppliedLSN,
               cbstate = NewCbState}.
               cbstate = NewCbState}.
+
+redact_state(State) ->
+    State#state{rows = information_redacted}.
+
+-ifdef(TEST).
+
+state_to_map(State) ->
+    [state | Fields] = tuple_to_list(State),
+    maps:from_list(lists:zip(record_info(fields, state), Fields)).
+
+-endif.

+ 110 - 2
test/epgsql_SUITE.erl

@@ -15,6 +15,9 @@
     end_per_suite/1
     end_per_suite/1
 ]).
 ]).
 
 
+%% logger handler
+-export([log/2]).
+
 -compile([export_all, nowarn_export_all]).
 -compile([export_all, nowarn_export_all]).
 
 
 modules() ->
 modules() ->
@@ -77,9 +80,14 @@ groups() ->
             pipelined_prepared_query,
             pipelined_prepared_query,
             pipelined_parse_batch_execute
             pipelined_parse_batch_execute
         ]},
         ]},
+        {incremental_sock_active, [parallel], [
+            incremental_sock_active_n,
+            incremental_sock_active_n_ssl
+        ]},
         {generic, [parallel], [
         {generic, [parallel], [
             with_transaction,
             with_transaction,
-            mixed_api
+            mixed_api,
+            redacted_state
         ]}
         ]}
     ],
     ],
 
 
@@ -145,7 +153,7 @@ groups() ->
     SubGroups ++
     SubGroups ++
         [{epgsql, [], [{group, generic} | Tests]},
         [{epgsql, [], [{group, generic} | Tests]},
          {epgsql_cast, [], [{group, pipelining} | Tests]},
          {epgsql_cast, [], [{group, pipelining} | Tests]},
-         {epgsql_incremental, [], Tests}].
+         {epgsql_incremental, [], [{group, incremental_sock_active} | Tests]}].
 
 
 end_per_suite(_Config) ->
 end_per_suite(_Config) ->
     ok.
     ok.
@@ -1609,6 +1617,106 @@ pipelined_parse_batch_execute(Config) ->
                end || Ref <- CloseRefs],
                end || Ref <- CloseRefs],
               erlang:cancel_timer(Timer)
               erlang:cancel_timer(Timer)
       end).
       end).
+
+incremental_sock_active_n(Config) ->
+    epgsql_incremental = ?config(module, Config),
+    Q = "SELECT *, 'Hello world' FROM generate_series(0, 10240)",
+    epgsql_ct:with_connection(Config,
+         fun(C) ->
+             Ref = epgsqli:squery(C, Q),
+             {done, NumPassive, Others, Rows} = recv_incremental_active_n(C, Ref),
+             ?assertMatch([{columns, _}, {complete, _}], Others),
+             ?assert(NumPassive > 0),
+             ?assertMatch([{<<"0">>, <<"Hello world">>},
+                           {<<"1">>, <<"Hello world">>} | _], Rows),
+             ?assertEqual(10241, length(Rows))
+         end,
+         "epgsql_test",
+         [{socket_active, 2}]).
+
+-ifdef(OTP_RELEASE).
+incremental_sock_active_n_ssl(Config) ->
+    epgsql_incremental = ?config(module, Config),
+    Q = "SELECT *, 'Hello world' FROM generate_series(0, 10240)",
+    epgsql_ct:with_connection(Config,
+         fun(C) ->
+             Ref = epgsqli:squery(C, Q),
+             {done, NumPassive, Others, Rows} = recv_incremental_active_n(C, Ref),
+             ?assertMatch([{columns, _}, {complete, _}], Others),
+             ?assert(NumPassive > 0),
+             ?assertMatch([{<<"0">>, <<"Hello world">>},
+                           {<<"1">>, <<"Hello world">>} | _], Rows),
+             ?assertEqual(10241, length(Rows))
+         end,
+         "epgsql_test",
+         [{ssl, true}, {socket_active, 2}]).
+-else.
+%% {active, N} for SSL is only supported on OTP-21+
+incremental_sock_active_n_ssl(_Config) ->
+    noop.
+-endif.
+
+recv_incremental_active_n(C, Ref) ->
+    recv_incremental_active_n(C, Ref, 0, [], []).
+
+recv_incremental_active_n(C, Ref, NumPassive, Rows, Others) ->
+    receive
+        {C, Ref, {data, Row}} ->
+            recv_incremental_active_n(C, Ref, NumPassive, [Row | Rows], Others);
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            recv_incremental_active_n(C, Ref, NumPassive + 1, Rows, Others);
+        {C, Ref, {error, _} = E} ->
+            E;
+        {C, Ref, done} ->
+            {done, NumPassive, lists:reverse(Others), lists:reverse(Rows)};
+        {C, Ref, Other} ->
+            recv_incremental_active_n(C, Ref, NumPassive, Rows, [Other | Others]);
+        Other ->
+            recv_incremental_active_n(C, Ref, NumPassive, Rows, [Other | Others])
+    after 5000 ->
+            error({timeout, NumPassive, Others, Rows})
+    end.
+
+redacted_state(Config) ->
+    case erlang:system_info(otp_release) of
+      V = [_, _] when V > "20" ->
+        redacted_state_(Config);
+      V ->
+        {skip, {"Logger facility is available starting OTP 21, running on OTP " ++ V}}
+    end.
+
+redacted_state_(Config) ->
+    _Handle = ct:timetrap({seconds, 3}),
+    try
+      logger:add_handler(?MODULE, ?MODULE, #{relay_to => self()}),
+      C = epgsql_ct:connect(Config),
+      true = unlink(C),
+      Reason = {please, ?MODULE, ?FUNCTION_NAME},
+      ok = proc_lib:stop(C, Reason, 1000),
+      receive
+        {log, Message} ->
+            ?assertMatch({report, #{label := {gen_server, terminate},
+                                    reason := Reason,
+                                    state := _}},
+                         Message),
+            {report, #{state := State}} = Message,
+            ?assertMatch(#{rows := information_redacted},
+                         epgsql_sock:state_to_map(State))
+      end
+    after
+      logger:remove_handler(?MODULE)
+    end.
+
+%% =============================================================================
+%% Logger handler
+%% ============================================================================
+
+log(#{msg := Msg}, #{relay_to := Pid}) ->
+    Pid ! {log, Msg};
+log(_, _) ->
+    ok.
+
 %% =============================================================================
 %% =============================================================================
 %% Internal functions
 %% Internal functions
 %% ============================================================================
 %% ============================================================================

+ 12 - 3
test/epgsql_ct.erl

@@ -4,6 +4,7 @@
 
 
 -export([
 -export([
     connection_data/1,
     connection_data/1,
+    connect/1,
     connect_only/2,
     connect_only/2,
     with_connection/2,
     with_connection/2,
     with_connection/3,
     with_connection/3,
@@ -19,6 +20,16 @@ connection_data(Config) ->
     Port = ?config(port, PgConfig),
     Port = ?config(port, PgConfig),
     {Host, Port}.
     {Host, Port}.
 
 
+connect(Config) ->
+    connect(Config, "epgsql_test", []).
+
+connect(Config, Username, Args) ->
+    {Host, Port} = connection_data(Config),
+    Module = ?config(module, Config),
+    Args2 = [{port, Port}, {database, "epgsql_test_db1"} | Args],
+    {ok, C} = Module:connect(Host, Username, Args2),
+    C.
+
 connect_only(Config, Args) ->
 connect_only(Config, Args) ->
     {Host, Port} = connection_data(Config),
     {Host, Port} = connection_data(Config),
     Module = ?config(module, Config),
     Module = ?config(module, Config),
@@ -39,10 +50,8 @@ with_connection(Config, F, Args) ->
     with_connection(Config, F, "epgsql_test", Args).
     with_connection(Config, F, "epgsql_test", Args).
 
 
 with_connection(Config, F, Username, Args) ->
 with_connection(Config, F, Username, Args) ->
-    {Host, Port} = connection_data(Config),
     Module = ?config(module, Config),
     Module = ?config(module, Config),
-    Args2 = [{port, Port}, {database, "epgsql_test_db1"} | Args],
-    {ok, C} = Module:connect(Host, Username, Args2),
+    C = connect(Config, Username, Args),
     try
     try
         F(C)
         F(C)
     after
     after

+ 7 - 3
test/epgsql_cth.erl

@@ -150,7 +150,7 @@ init_database(Config) ->
 
 
     {ok, Cwd} = file:get_cwd(),
     {ok, Cwd} = file:get_cwd(),
     PgDataDir = filename:append(Cwd, "datadir"),
     PgDataDir = filename:append(Cwd, "datadir"),
-    {ok, _} = exec:run(Initdb ++ " --locale en_US.UTF8 " ++ PgDataDir, [sync,stdout,stderr]),
+    {ok, _} = exec:run(Initdb ++ " --locale en_US.UTF-8 " ++ PgDataDir, [sync, stdout, stderr]),
     [{datadir, PgDataDir}|Config].
     [{datadir, PgDataDir}|Config].
 
 
 get_version(Config) ->
 get_version(Config) ->
@@ -200,6 +200,10 @@ write_pg_hba_config(Config) ->
     Version = ?config(version, Config),
     Version = ?config(version, Config),
 
 
     User = os:getenv("USER"),
     User = os:getenv("USER"),
+    ClientCert = case Version >= [12] of
+                   true -> "cert";
+                   false -> "cert clientcert=1"
+                 end,
     PGConfig = [
     PGConfig = [
         "local   all             ", User, "                              trust\n",
         "local   all             ", User, "                              trust\n",
         "host    template1       ", User, "              127.0.0.1/32    trust\n",
         "host    template1       ", User, "              127.0.0.1/32    trust\n",
@@ -209,7 +213,7 @@ write_pg_hba_config(Config) ->
         "host    epgsql_test_db1 epgsql_test             127.0.0.1/32    trust\n",
         "host    epgsql_test_db1 epgsql_test             127.0.0.1/32    trust\n",
         "host    epgsql_test_db1 epgsql_test_md5         127.0.0.1/32    md5\n",
         "host    epgsql_test_db1 epgsql_test_md5         127.0.0.1/32    md5\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   127.0.0.1/32    password\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   127.0.0.1/32    password\n",
-        "hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    cert clientcert=1\n",
+        "hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    ", ClientCert, "\n",
         "host    template1       ", User, "              ::1/128    trust\n",
         "host    template1       ", User, "              ::1/128    trust\n",
         "host    ", User, "      ", User, "              ::1/128    trust\n",
         "host    ", User, "      ", User, "              ::1/128    trust\n",
         "hostssl postgres        ", User, "              ::1/128    trust\n",
         "hostssl postgres        ", User, "              ::1/128    trust\n",
@@ -217,7 +221,7 @@ write_pg_hba_config(Config) ->
         "host    epgsql_test_db1 epgsql_test             ::1/128    trust\n",
         "host    epgsql_test_db1 epgsql_test             ::1/128    trust\n",
         "host    epgsql_test_db1 epgsql_test_md5         ::1/128    md5\n",
         "host    epgsql_test_db1 epgsql_test_md5         ::1/128    md5\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   ::1/128    password\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   ::1/128    password\n",
-        "hostssl epgsql_test_db1 epgsql_test_cert        ::1/128    cert clientcert=1\n" |
+        "hostssl epgsql_test_db1 epgsql_test_cert        ::1/128    ", ClientCert, "\n" |
         case Version >= [10] of
         case Version >= [10] of
             true ->
             true ->
                 %% See
                 %% See

+ 19 - 1
test/epgsql_incremental.erl

@@ -40,7 +40,10 @@ await_connect(Ref, Opts0) ->
         {C, Ref, connected} ->
         {C, Ref, connected} ->
             {ok, C};
             {ok, C};
         {_C, Ref, Error = {error, _}} ->
         {_C, Ref, Error = {error, _}} ->
-            Error
+            Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            await_connect(Ref, Opts0)
     after Timeout ->
     after Timeout ->
             error(timeout)
             error(timeout)
     end.
     end.
@@ -200,6 +203,9 @@ receive_result(C, Ref, Cols, Rows) ->
             {ok, Cols, lists:reverse(Rows)};
             {ok, Cols, lists:reverse(Rows)};
         {C, Ref, done} ->
         {C, Ref, done} ->
             done;
             done;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_result(C, Ref, Cols, Rows);
         {'EXIT', C, _Reason} ->
         {'EXIT', C, _Reason} ->
             throw({error, closed})
             throw({error, closed})
     end.
     end.
@@ -229,6 +235,9 @@ receive_extended_result(C, Ref, Rows) ->
             {ok, lists:reverse(Rows)};
             {ok, lists:reverse(Rows)};
         {C, Ref, done} ->
         {C, Ref, done} ->
             done;
             done;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_extended_result(C, Ref, Rows);
         {'EXIT', C, _Reason} ->
         {'EXIT', C, _Reason} ->
             {error, closed}
             {error, closed}
     end.
     end.
@@ -243,6 +252,9 @@ receive_describe(C, Ref, Statement = #statement{}) ->
             {ok, Statement#statement{columns = []}};
             {ok, Statement#statement{columns = []}};
         {C, Ref, Error = {error, _}} ->
         {C, Ref, Error = {error, _}} ->
             Error;
             Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_describe(C, Ref, Statement);
         {'EXIT', C, _Reason} ->
         {'EXIT', C, _Reason} ->
             {error, closed}
             {error, closed}
     end.
     end.
@@ -255,6 +267,9 @@ receive_describe_portal(C, Ref) ->
             {ok, []};
             {ok, []};
         {C, Ref, Error = {error, _}} ->
         {C, Ref, Error = {error, _}} ->
             Error;
             Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_describe_portal(C, Ref);
         {'EXIT', C, _Reason} ->
         {'EXIT', C, _Reason} ->
             {error, closed}
             {error, closed}
     end.
     end.
@@ -265,6 +280,9 @@ receive_atom(C, Ref, Receive, Return) ->
             Return;
             Return;
         {C, Ref, Error = {error, _}} ->
         {C, Ref, Error = {error, _}} ->
             Error;
             Error;
+        {epgsql, C, socket_passive} ->
+            ok = epgsql:activate(C),
+            receive_atom(C, Ref, Receive, Return);
         {'EXIT', C, _Reason} ->
         {'EXIT', C, _Reason} ->
             {error, closed}
             {error, closed}
     end.
     end.

+ 155 - 85
test/epgsql_replication_SUITE.erl

@@ -1,110 +1,180 @@
 -module(epgsql_replication_SUITE).
 -module(epgsql_replication_SUITE).
+
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include("epgsql.hrl").
 -include("epgsql.hrl").
 
 
--export([
-    init_per_suite/1,
-    all/0,
-    end_per_suite/1,
+-export([all/0,
+         init_per_suite/1,
+         end_per_suite/1,
 
 
-    connect_in_repl_mode/1,
-    create_drop_replication_slot/1,
-    replication_sync/1,
-    replication_async/1,
+         connect_in_repl_mode/1,
+         create_drop_replication_slot/1,
+         replication_sync/1,
+         replication_async/1,
+         replication_async_active_n_socket/1,
+         replication_sync_active_n_socket/1,
+         replication_async_active_n_ssl/1,
 
 
-    %% Callbacks
-    handle_x_log_data/4
-]).
+         %% Callbacks
+         handle_x_log_data/4
+        ]).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    [{module, epgsql}|Config].
+  [{module, epgsql} | Config].
 
 
 end_per_suite(_Config) ->
 end_per_suite(_Config) ->
-    ok.
+  ok.
 
 
 all() ->
 all() ->
-    [
-     connect_in_repl_mode,
-     create_drop_replication_slot,
-     replication_async,
-     replication_sync
-    ].
+  [connect_in_repl_mode,
+   create_drop_replication_slot,
+   replication_async,
+   replication_sync,
+   replication_async_active_n_socket,
+   replication_sync_active_n_socket,
+   replication_async_active_n_ssl
+  ].
 
 
 connect_in_repl_mode(Config) ->
 connect_in_repl_mode(Config) ->
-    epgsql_ct:connect_only(Config, ["epgsql_test_replication",
-        "epgsql_test_replication",
-        [{database, "epgsql_test_db1"}, {replication, "database"}]]).
+  epgsql_ct:connect_only(
+    Config,
+    ["epgsql_test_replication",
+     "epgsql_test_replication",
+     [{database, "epgsql_test_db1"}, {replication, "database"}]
+    ]).
 
 
 create_drop_replication_slot(Config) ->
 create_drop_replication_slot(Config) ->
-    Module = ?config(module, Config),
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            {ok, Cols, Rows} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
-            [#column{name = <<"slot_name">>}, #column{name = <<"consistent_point">>},
-                #column{name = <<"snapshot_name">>}, #column{name = <<"output_plugin">>}] = Cols,
-            [{<<"epgsql_test">>, _, _, <<"test_decoding">>}] = Rows,
-            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]).
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        create_replication_slot(Config, C),
+        drop_replication_slot(Config, C)
+    end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
 
 
 replication_async(Config) ->
 replication_async(Config) ->
-    replication_test_run(Config, self()).
+  replication_test_run(Config, self()).
 
 
 replication_sync(Config) ->
 replication_sync(Config) ->
-    replication_test_run(Config, ?MODULE).
+  replication_test_run(Config, ?MODULE).
+
+replication_async_active_n_socket(Config) ->
+  replication_test_run(Config, self(), [{socket_active, 1}]).
+
+replication_sync_active_n_socket(Config) ->
+  replication_test_run(Config, ?MODULE, [{socket_active, 1}]).
+
+-ifdef(OTP_RELEASE).
+replication_async_active_n_ssl(Config) ->
+  replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
+-else.
+%% {active, N} for SSL is only supported on OTP-21+
+replication_async_active_n_ssl(Config) ->
+    noop.
+-endif.
 
 
 replication_test_run(Config, Callback) ->
 replication_test_run(Config, Callback) ->
-    Module = ?config(module, Config),
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            {ok, _, _} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
-
-            %% new connection because main id in a replication mode
-            epgsql_ct:with_connection(
-                Config,
-                fun(C2) ->
-                    [{ok, 1},{ok, 1}] = Module:squery(C2,
-                        "insert into test_table1 (id, value) values (5, 'five');delete from test_table1 where id = 5;")
-                end),
-
-            Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
-            ok = receive_replication_msgs(
-                [<<"table public.test_table1: INSERT: id[integer]:5 value[text]:'five'">>,
-                    <<"table public.test_table1: DELETE: id[integer]:5">>], C, [])
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]),
-    %% cleanup
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]).
-
-receive_replication_msgs(Pattern, Pid, ReceivedMsgs) ->
-    receive
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
-            receive_replication_msgs(Pattern, Pid, [begin_msg | ReceivedMsgs]);
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
-            case lists:reverse(ReceivedMsgs) of
-                [begin_msg, row_msg | _] -> ok;
-                _ -> error_replication_messages_not_received
-            end;
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
-            [Msg | T] = Pattern,
-            receive_replication_msgs(T, Pid, [row_msg | ReceivedMsgs])
-    after
-        60000 ->
-            error_timeout
-    end.
+  replication_test_run(Config, Callback, []).
+
+replication_test_run(Config, Callback, ExtOpts) ->
+  Module = ?config(module, Config),
+  {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        create_replication_slot(Config, C),
+        %% new connection because main is in the replication mode
+        epgsql_ct:with_connection(
+          Config,
+          fun(C2) ->
+              ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
+              Res = Module:squery(C2, Queries),
+              ?assertEqual(ExpectedResult, Res)
+          end),
+        Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
+        ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
+    end,
+    "epgsql_test_replication",
+    [{replication, "database"} | ExtOpts]),
+  %% cleanup
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) -> drop_replication_slot(Config, C) end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
+
+create_replication_slot(Config, Connection) ->
+  Module = ?config(module, Config),
+  {ok, Cols, Rows} =
+    Module:squery(Connection,
+                  "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
+  ?assertMatch([#column{name = <<"slot_name">>},
+                #column{name = <<"consistent_point">>},
+                #column{name = <<"snapshot_name">>},
+                #column{name = <<"output_plugin">>}
+               ],
+               Cols),
+  ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
+
+drop_replication_slot(Config, Connection) ->
+  Module = ?config(module, Config),
+  Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
+  case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
+    true -> ?assertMatch({ok, _, _}, Result);
+    false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
+  end.
+
+gen_query_and_replication_msgs(Ids) ->
+  QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
+  QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
+  RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
+  RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
+  LongBin = base64:encode(crypto:strong_rand_bytes(254)),
+  lists:foldl(
+    fun(Id, {Qs, RMs}) ->
+        QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
+        QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
+        RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
+        RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
+        {Qs ++ [QIns, QDel], RMs ++ [RmIns, RmDel]}
+    end,
+    {[], []},
+    Ids).
+
+receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
+  receive
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
+      receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
+      ensure_no_socket_passive_msgs(Module, Pid),
+      case lists:reverse(ReceivedMsgs) of
+        [begin_msg, row_msg | _] -> ok;
+        _ -> error_replication_messages_not_received
+      end;
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
+      [Msg | T] = Pattern,
+      receive_replication_msgs(Module, T, Pid, [row_msg | ReceivedMsgs]);
+    {epgsql, Pid, socket_passive} ->
+      Module:activate(Pid),
+      receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
+  after
+    60000 ->
+      error_timeout
+  end.
+
+ensure_no_socket_passive_msgs(Module, Pid) ->
+  receive
+    {epgsql, Pid, socket_passive} ->
+      Module:activate(Pid),
+      ensure_no_socket_passive_msgs(Module, Pid)
+  after
+    100 ->
+      ok
+  end.
 
 
 handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
 handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
-    {C, Pid} = CbState,
-    Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
-    {ok, EndLSN, EndLSN, CbState}.
+  {C, Pid} = CbState,
+  Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
+  {ok, EndLSN, EndLSN, CbState}.