Browse Source

Merge pull request #275 from featalion/implement-socket-active-n-for-replication-streaming

A flow control in the replication streaming mode
Sergey Prokhorov 2 years ago
parent
commit
8f3751460b
6 changed files with 256 additions and 103 deletions
  1. 5 1
      README.md
  2. 37 1
      doc/streaming.md
  3. 23 5
      src/epgsql.erl
  4. 40 8
      src/epgsql_sock.erl
  5. 7 3
      test/epgsql_cth.erl
  6. 144 85
      test/epgsql_replication_SUITE.erl

+ 5 - 1
README.md

@@ -108,7 +108,7 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `ssl` if set to `true`, perform an attempt to connect in ssl mode, but continue unencrypted
 - `ssl` if set to `true`, perform an attempt to connect in ssl mode, but continue unencrypted
   if encryption isn't supported by server. if set to `required` connection will fail if encryption
   if encryption isn't supported by server. if set to `required` connection will fail if encryption
   is not available.
   is not available.
-- `ssl_opts` will be passed as is to `ssl:connect/3`
+- `ssl_opts` will be passed as is to `ssl:connect/3`.
 - `tcp_opts` will be passed as is to `gen_tcp:connect/3`. Some options are forbidden, such as
 - `tcp_opts` will be passed as is to `gen_tcp:connect/3`. Some options are forbidden, such as
   `mode`, `packet`, `header`, `active`. When `tcp_opts` is not provided, epgsql does some tuning
   `mode`, `packet`, `header`, `active`. When `tcp_opts` is not provided, epgsql does some tuning
   (eg, sets TCP `keepalive` and auto-tunes `buffer`), but when `tcp_opts` is provided, no
   (eg, sets TCP `keepalive` and auto-tunes `buffer`), but when `tcp_opts` is provided, no
@@ -125,6 +125,10 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `application_name` is an optional string parameter. It is usually set by an application upon
 - `application_name` is an optional string parameter. It is usually set by an application upon
    connection to the server. The name will be displayed in the `pg_stat_activity`
    connection to the server. The name will be displayed in the `pg_stat_activity`
    view and included in CSV log entries.
    view and included in CSV log entries.
+- `socket_active` is an optional parameter, which can be true or integer in the range -32768
+  to 32767 (inclusive). This option only works in the replication mode and is used to control
+  the flow of incoming messages. See [Streaming replication protocol](#streaming-replication-protocol)
+  for more details.
 
 
 Options may be passed as proplist or as map with the same key names.
 Options may be passed as proplist or as map with the same key names.
 
 

+ 37 - 1
doc/streaming.md

@@ -138,4 +138,40 @@ handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
     then you do not have any risk to lose data. 
     then you do not have any risk to lose data. 
     
     
     Otherwise (if you do not load all data from tables during erlang app startup) 
     Otherwise (if you do not load all data from tables during erlang app startup) 
-    it is not recommended to set align_lsn to true. In this case to stop PG server stop epgsql replication first.
+    it is not recommended to set align_lsn to true. In this case to stop PG server stop epgsql replication first.
+    
+## Flow control
+
+It is possible to set `{socket_active, N}` on a [TCP](https://www.erlang.org/doc/man/inet.html#setopts-2)
+or [SSL](https://www.erlang.org/doc/man/ssl.html#setopts-2) (since OTP 21.3) socket. E.g. for SSL:
+```erlang
+Opts = #{host => "localhost",
+         username => "me",
+         password => "pwd",
+         database => "test",
+         ssl => require,
+         socket_active => 10
+        },
+{ok, Conn} = epgsql:connect(Opts).
+```
+
+It is currently allowed only in the replication mode. Its main purpose is to control the flow of
+replication messages from Postgresql database. If a database is under a high load and a process, which
+handles the message stream, cannot keep up with it then setting this option gives the handling process
+ability to get messages on-demand.
+
+When the connection is in the asynchronous mode, a process which owns a connection will receive
+```erlang
+{epgsql, Connection, socket_passive}
+```
+as soon as underlying socket received N messages from network.
+
+The process decides when to activate connection's socket again. To do that it should call:
+```erlang
+epgsql:activate(Connection).
+```
+The `active` parameter of the socket will be set to the same value as it was configured in
+the connection's options.
+
+In the case of synchronous handler for replication messages `epgsql` will handle `socket_passive`
+messages internally.

+ 23 - 5
src/epgsql.erl

@@ -37,13 +37,15 @@
          start_replication/5,
          start_replication/5,
          start_replication/6,
          start_replication/6,
          start_replication/7,
          start_replication/7,
-         to_map/1]).
--export([handle_x_log_data/5]).                 % private
+         to_map/1,
+         activate/1]).
+%% private
+-export([handle_x_log_data/5]).
 
 
 -export_type([connection/0, connect_option/0, connect_opts/0,
 -export_type([connection/0, connect_option/0, connect_opts/0,
               connect_error/0, query_error/0, sql_query/0, column/0,
               connect_error/0, query_error/0, sql_query/0, column/0,
               type_name/0, epgsql_type/0, statement/0,
               type_name/0, epgsql_type/0, statement/0,
-              transaction_option/0, transaction_opts/0]).
+              transaction_option/0, transaction_opts/0, socket_active/0]).
 
 
 %% Deprecated types
 %% Deprecated types
 -export_type([bind_param/0, typed_param/0,
 -export_type([bind_param/0, typed_param/0,
@@ -62,6 +64,7 @@
 -type host() :: inet:ip_address() | inet:hostname().
 -type host() :: inet:ip_address() | inet:hostname().
 -type password() :: string() | iodata() | fun( () -> iodata() ).
 -type password() :: string() | iodata() | fun( () -> iodata() ).
 -type connection() :: pid().
 -type connection() :: pid().
+-type socket_active() :: true | -32768..32767.
 -type connect_option() ::
 -type connect_option() ::
     {host, host()}                                 |
     {host, host()}                                 |
     {username, string()}                           |
     {username, string()}                           |
@@ -76,7 +79,8 @@
     {codecs,   Codecs     :: [{epgsql_codec:codec_mod(), any()}]} |
     {codecs,   Codecs     :: [{epgsql_codec:codec_mod(), any()}]} |
     {nulls,    Nulls      :: [any(), ...]} |    % terms to be used as NULL
     {nulls,    Nulls      :: [any(), ...]} |    % terms to be used as NULL
     {replication, Replication :: string()} | % Pass "database" to connect in replication mode
     {replication, Replication :: string()} | % Pass "database" to connect in replication mode
-    {application_name, ApplicationName :: string()}.
+    {application_name, ApplicationName :: string()} |
+    {socket_active, Active :: socket_active()}.
 
 
 -type connect_opts() ::
 -type connect_opts() ::
         [connect_option()]
         [connect_option()]
@@ -93,7 +97,8 @@
           codecs => [{epgsql_codec:codec_mod(), any()}],
           codecs => [{epgsql_codec:codec_mod(), any()}],
           nulls => [any(), ...],
           nulls => [any(), ...],
           replication => string(),
           replication => string(),
-          application_name => string()
+          application_name => string(),
+          socket_active => socket_active()
           }.
           }.
 
 
 -type transaction_option() ::
 -type transaction_option() ::
@@ -571,3 +576,16 @@ to_map(Map) when is_map(Map) ->
     Map;
     Map;
 to_map(List) when is_list(List) ->
 to_map(List) when is_list(List) ->
     maps:from_list(List).
     maps:from_list(List).
+
+%% @doc Activates TCP or SSL socket of a connection.
+%%
+%% If the `socket_active` connection option is supplied the function sets
+%% `{active, X}' the connection's SSL or TCP socket. It sets `{active, true}' otherwise.
+%%
+%% @param Connection connection
+%% @returns `ok' or `{error, Reason}'
+%%
+%% Note: The ssl:reason() type is not exported so that we use `any()' on the spec.
+-spec activate(connection()) -> ok | {error, inet:posix() | any()}.
+activate(Connection) ->
+    epgsql_sock:activate(Connection).

+ 40 - 8
src/epgsql_sock.erl

@@ -53,7 +53,8 @@
          cancel/1,
          cancel/1,
          copy_send_rows/3,
          copy_send_rows/3,
          standby_status_update/3,
          standby_status_update/3,
-         get_backend_pid/1]).
+         get_backend_pid/1,
+         activate/1]).
 
 
 -export([handle_call/3, handle_cast/2, handle_info/2, format_status/2]).
 -export([handle_call/3, handle_cast/2, handle_info/2, format_status/2]).
 -export([init/1, code_change/3, terminate/2]).
 -export([init/1, code_change/3, terminate/2]).
@@ -69,7 +70,6 @@
 
 
 -export_type([transport/0, pg_sock/0, error/0]).
 -export_type([transport/0, pg_sock/0, error/0]).
 
 
--include("epgsql.hrl").
 -include("protocol.hrl").
 -include("protocol.hrl").
 -include("epgsql_replication.hrl").
 -include("epgsql_replication.hrl").
 -include("epgsql_copy.hrl").
 -include("epgsql_copy.hrl").
@@ -156,6 +156,11 @@ standby_status_update(C, FlushedLSN, AppliedLSN) ->
 get_backend_pid(C) ->
 get_backend_pid(C) ->
     gen_server:call(C, get_backend_pid).
     gen_server:call(C, get_backend_pid).
 
 
+%% The ssl:reason() type is not exported
+-spec activate(epgsql:connection()) -> ok | {error, inet:posix() | any()}.
+activate(C) ->
+    gen_server:call(C, activate).
+
 %% -- command APIs --
 %% -- command APIs --
 
 
 %% send()
 %% send()
@@ -164,7 +169,7 @@ get_backend_pid(C) ->
 -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
 -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
 set_net_socket(Mod, Socket, State) ->
 set_net_socket(Mod, Socket, State) ->
     State1 = State#state{mod = Mod, sock = Socket},
     State1 = State#state{mod = Mod, sock = Socket},
-    setopts(State1, [{active, true}]),
+    ok = activate_socket(State1),
     State1.
     State1.
 
 
 -spec init_replication_state(pg_sock()) -> pg_sock().
 -spec init_replication_state(pg_sock()) -> pg_sock().
@@ -189,8 +194,10 @@ set_attr(connect_opts, ConnectOpts, State) ->
 
 
 %% XXX: be careful!
 %% XXX: be careful!
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
-set_packet_handler(Handler, State) ->
-    State#state{handler = Handler}.
+set_packet_handler(Handler, State0) ->
+    State = State0#state{handler = Handler},
+    ok = activate_socket(State),
+    State.
 
 
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
 get_codec(#state{codec = Codec}) ->
 get_codec(#state{codec = Codec}) ->
@@ -215,7 +222,6 @@ get_parameter_internal(Name, #state{parameters = Parameters}) ->
         false                  -> undefined
         false                  -> undefined
     end.
     end.
 
 
-
 %% -- gen_server implementation --
 %% -- gen_server implementation --
 
 
 init([]) ->
 init([]) ->
@@ -248,7 +254,11 @@ handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
 handle_call({copy_send_rows, Rows}, _From,
 handle_call({copy_send_rows, Rows}, _From,
            #state{handler = Handler, subproto_state = CopyState} = State) ->
            #state{handler = Handler, subproto_state = CopyState} = State) ->
     Response = handle_copy_send_rows(Rows, Handler, CopyState, State),
     Response = handle_copy_send_rows(Rows, Handler, CopyState, State),
-    {reply, Response, State}.
+    {reply, Response, State};
+
+handle_call(activate, _From, State) ->
+    Res = activate_socket(State),
+    {reply, Res, State}.
 
 
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
   when ((Method == cast) or (Method == incremental)),
   when ((Method == cast) or (Method == incremental)),
@@ -278,6 +288,11 @@ handle_info({DataTag, Sock, Data2}, #state{data = Data, sock = Sock} = State)
   when DataTag == tcp; DataTag == ssl ->
   when DataTag == tcp; DataTag == ssl ->
     loop(State#state{data = <<Data/binary, Data2/binary>>});
     loop(State#state{data = <<Data/binary, Data2/binary>>});
 
 
+handle_info({Passive, Sock}, #state{sock = Sock} = State)
+  when Passive == ssl_passive; Passive == tcp_passive ->
+    NewState = send_socket_pasive(State),
+    {noreply, NewState};
+
 handle_info({Closed, Sock}, #state{sock = Sock} = State)
 handle_info({Closed, Sock}, #state{sock = Sock} = State)
   when Closed == tcp_closed; Closed == ssl_closed ->
   when Closed == tcp_closed; Closed == ssl_closed ->
     {stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
     {stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
@@ -313,6 +328,13 @@ format_status(terminate, [_PDict, State]) ->
   State#state{rows = information_redacted}.
   State#state{rows = information_redacted}.
 
 
 %% -- internal functions --
 %% -- internal functions --
+-spec send_socket_pasive(pg_sock()) -> pg_sock().
+send_socket_pasive(#state{subproto_state = #repl{receiver = Rec}} = State) when Rec =/= undefined ->
+    Rec ! {epgsql, self(), socket_passive},
+    State;
+send_socket_pasive(State) ->
+    ok = activate_socket(State),
+    State.
 
 
 -spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
 -spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
                          Result when
                          Result when
@@ -407,13 +429,23 @@ command_next(#state{current_cmd = PrevCmd,
                         results = []}
                         results = []}
     end.
     end.
 
 
-
 setopts(#state{mod = Mod, sock = Sock}, Opts) ->
 setopts(#state{mod = Mod, sock = Sock}, Opts) ->
     case Mod of
     case Mod of
         gen_tcp -> inet:setopts(Sock, Opts);
         gen_tcp -> inet:setopts(Sock, Opts);
         ssl     -> ssl:setopts(Sock, Opts)
         ssl     -> ssl:setopts(Sock, Opts)
     end.
     end.
 
 
+-spec get_socket_active(pg_sock()) -> epgsql:socket_active().
+get_socket_active(#state{handler = on_replication, connect_opts = #{socket_active := Active}}) ->
+    Active;
+get_socket_active(_State) ->
+    true.
+
+-spec activate_socket(pg_sock()) -> ok | {error, inet:posix() | any()}.
+activate_socket(State) ->
+  Active = get_socket_active(State),
+  setopts(State, [{active, Active}]).
+
 %% This one only used in connection initiation to send client's
 %% This one only used in connection initiation to send client's
 %% `StartupMessage' and `SSLRequest' packets
 %% `StartupMessage' and `SSLRequest' packets
 -spec send(pg_sock(), iodata()) -> ok | {error, any()}.
 -spec send(pg_sock(), iodata()) -> ok | {error, any()}.

+ 7 - 3
test/epgsql_cth.erl

@@ -150,7 +150,7 @@ init_database(Config) ->
 
 
     {ok, Cwd} = file:get_cwd(),
     {ok, Cwd} = file:get_cwd(),
     PgDataDir = filename:append(Cwd, "datadir"),
     PgDataDir = filename:append(Cwd, "datadir"),
-    {ok, _} = exec:run(Initdb ++ " --locale en_US.UTF8 " ++ PgDataDir, [sync,stdout,stderr]),
+    {ok, _} = exec:run(Initdb ++ " --locale en_US.UTF-8 " ++ PgDataDir, [sync, stdout, stderr]),
     [{datadir, PgDataDir}|Config].
     [{datadir, PgDataDir}|Config].
 
 
 get_version(Config) ->
 get_version(Config) ->
@@ -200,6 +200,10 @@ write_pg_hba_config(Config) ->
     Version = ?config(version, Config),
     Version = ?config(version, Config),
 
 
     User = os:getenv("USER"),
     User = os:getenv("USER"),
+    ClientCert = case Version >= [12] of
+                   true -> "cert";
+                   false -> "cert clientcert=1"
+                 end,
     PGConfig = [
     PGConfig = [
         "local   all             ", User, "                              trust\n",
         "local   all             ", User, "                              trust\n",
         "host    template1       ", User, "              127.0.0.1/32    trust\n",
         "host    template1       ", User, "              127.0.0.1/32    trust\n",
@@ -209,7 +213,7 @@ write_pg_hba_config(Config) ->
         "host    epgsql_test_db1 epgsql_test             127.0.0.1/32    trust\n",
         "host    epgsql_test_db1 epgsql_test             127.0.0.1/32    trust\n",
         "host    epgsql_test_db1 epgsql_test_md5         127.0.0.1/32    md5\n",
         "host    epgsql_test_db1 epgsql_test_md5         127.0.0.1/32    md5\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   127.0.0.1/32    password\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   127.0.0.1/32    password\n",
-        "hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    cert clientcert=1\n",
+        "hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    ", ClientCert, "\n",
         "host    template1       ", User, "              ::1/128    trust\n",
         "host    template1       ", User, "              ::1/128    trust\n",
         "host    ", User, "      ", User, "              ::1/128    trust\n",
         "host    ", User, "      ", User, "              ::1/128    trust\n",
         "hostssl postgres        ", User, "              ::1/128    trust\n",
         "hostssl postgres        ", User, "              ::1/128    trust\n",
@@ -217,7 +221,7 @@ write_pg_hba_config(Config) ->
         "host    epgsql_test_db1 epgsql_test             ::1/128    trust\n",
         "host    epgsql_test_db1 epgsql_test             ::1/128    trust\n",
         "host    epgsql_test_db1 epgsql_test_md5         ::1/128    md5\n",
         "host    epgsql_test_db1 epgsql_test_md5         ::1/128    md5\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   ::1/128    password\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   ::1/128    password\n",
-        "hostssl epgsql_test_db1 epgsql_test_cert        ::1/128    cert clientcert=1\n" |
+        "hostssl epgsql_test_db1 epgsql_test_cert        ::1/128    ", ClientCert, "\n" |
         case Version >= [10] of
         case Version >= [10] of
             true ->
             true ->
                 %% See
                 %% See

+ 144 - 85
test/epgsql_replication_SUITE.erl

@@ -1,110 +1,169 @@
 -module(epgsql_replication_SUITE).
 -module(epgsql_replication_SUITE).
+
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include("epgsql.hrl").
 -include("epgsql.hrl").
 
 
--export([
-    init_per_suite/1,
-    all/0,
-    end_per_suite/1,
+-export([all/0,
+         init_per_suite/1,
+         end_per_suite/1,
 
 
-    connect_in_repl_mode/1,
-    create_drop_replication_slot/1,
-    replication_sync/1,
-    replication_async/1,
+         connect_in_repl_mode/1,
+         create_drop_replication_slot/1,
+         replication_sync/1,
+         replication_async/1,
+         replication_async_active_n_socket/1,
+         replication_sync_active_n_socket/1,
 
 
-    %% Callbacks
-    handle_x_log_data/4
-]).
+         %% Callbacks
+         handle_x_log_data/4
+        ]).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    [{module, epgsql}|Config].
+  [{module, epgsql} | Config].
 
 
 end_per_suite(_Config) ->
 end_per_suite(_Config) ->
-    ok.
+  ok.
 
 
 all() ->
 all() ->
-    [
-     connect_in_repl_mode,
-     create_drop_replication_slot,
-     replication_async,
-     replication_sync
-    ].
+  [connect_in_repl_mode,
+   create_drop_replication_slot,
+   replication_async,
+   replication_sync,
+   replication_async_active_n_socket,
+   replication_sync_active_n_socket
+  ].
 
 
 connect_in_repl_mode(Config) ->
 connect_in_repl_mode(Config) ->
-    epgsql_ct:connect_only(Config, ["epgsql_test_replication",
-        "epgsql_test_replication",
-        [{database, "epgsql_test_db1"}, {replication, "database"}]]).
+  epgsql_ct:connect_only(
+    Config,
+    ["epgsql_test_replication",
+     "epgsql_test_replication",
+     [{database, "epgsql_test_db1"}, {replication, "database"}]
+    ]).
 
 
 create_drop_replication_slot(Config) ->
 create_drop_replication_slot(Config) ->
-    Module = ?config(module, Config),
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            {ok, Cols, Rows} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
-            [#column{name = <<"slot_name">>}, #column{name = <<"consistent_point">>},
-                #column{name = <<"snapshot_name">>}, #column{name = <<"output_plugin">>}] = Cols,
-            [{<<"epgsql_test">>, _, _, <<"test_decoding">>}] = Rows,
-            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]).
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        create_replication_slot(Config, C),
+        drop_replication_slot(Config, C)
+    end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
 
 
 replication_async(Config) ->
 replication_async(Config) ->
-    replication_test_run(Config, self()).
+  replication_test_run(Config, self()).
 
 
 replication_sync(Config) ->
 replication_sync(Config) ->
-    replication_test_run(Config, ?MODULE).
+  replication_test_run(Config, ?MODULE).
+
+replication_async_active_n_socket(Config) ->
+  replication_test_run(Config, self(), [{socket_active, 1}]).
+
+replication_sync_active_n_socket(Config) ->
+  replication_test_run(Config, ?MODULE, [{socket_active, 1}]).
 
 
 replication_test_run(Config, Callback) ->
 replication_test_run(Config, Callback) ->
-    Module = ?config(module, Config),
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            {ok, _, _} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
-
-            %% new connection because main id in a replication mode
-            epgsql_ct:with_connection(
-                Config,
-                fun(C2) ->
-                    [{ok, 1},{ok, 1}] = Module:squery(C2,
-                        "insert into test_table1 (id, value) values (5, 'five');delete from test_table1 where id = 5;")
-                end),
-
-            Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
-            ok = receive_replication_msgs(
-                [<<"table public.test_table1: INSERT: id[integer]:5 value[text]:'five'">>,
-                    <<"table public.test_table1: DELETE: id[integer]:5">>], C, [])
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]),
-    %% cleanup
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]).
-
-receive_replication_msgs(Pattern, Pid, ReceivedMsgs) ->
-    receive
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
-            receive_replication_msgs(Pattern, Pid, [begin_msg | ReceivedMsgs]);
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
-            case lists:reverse(ReceivedMsgs) of
-                [begin_msg, row_msg | _] -> ok;
-                _ -> error_replication_messages_not_received
-            end;
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
-            [Msg | T] = Pattern,
-            receive_replication_msgs(T, Pid, [row_msg | ReceivedMsgs])
-    after
-        60000 ->
-            error_timeout
-    end.
+  replication_test_run(Config, Callback, []).
+
+replication_test_run(Config, Callback, ExtOpts) ->
+  Module = ?config(module, Config),
+  {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        create_replication_slot(Config, C),
+        %% new connection because main is in the replication mode
+        epgsql_ct:with_connection(
+          Config,
+          fun(C2) ->
+              ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
+              Res = Module:squery(C2, Queries),
+              ?assertEqual(ExpectedResult, Res)
+          end),
+        Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
+        ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
+    end,
+    "epgsql_test_replication",
+    [{replication, "database"} | ExtOpts]),
+  %% cleanup
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) -> drop_replication_slot(Config, C) end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
+
+create_replication_slot(Config, Connection) ->
+  Module = ?config(module, Config),
+  {ok, Cols, Rows} =
+    Module:squery(Connection,
+                  "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
+  ?assertMatch([#column{name = <<"slot_name">>},
+                #column{name = <<"consistent_point">>},
+                #column{name = <<"snapshot_name">>},
+                #column{name = <<"output_plugin">>}
+               ],
+               Cols),
+  ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
+
+drop_replication_slot(Config, Connection) ->
+  Module = ?config(module, Config),
+  Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
+  case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
+    true -> ?assertMatch({ok, _, _}, Result);
+    false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
+  end.
+
+gen_query_and_replication_msgs(Ids) ->
+  QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
+  QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
+  RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
+  RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
+  LongBin = base64:encode(crypto:strong_rand_bytes(254)),
+  lists:foldl(
+    fun(Id, {Qs, RMs}) ->
+        QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
+        QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
+        RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
+        RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
+        {Qs ++ [QIns, QDel], RMs ++ [RmIns, RmDel]}
+    end,
+    {[], []},
+    Ids).
+
+receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
+  receive
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
+      receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
+      ensure_no_socket_passive_msgs(Module, Pid),
+      case lists:reverse(ReceivedMsgs) of
+        [begin_msg, row_msg | _] -> ok;
+        _ -> error_replication_messages_not_received
+      end;
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
+      [Msg | T] = Pattern,
+      receive_replication_msgs(Module, T, Pid, [row_msg | ReceivedMsgs]);
+    {epgsql, Pid, socket_passive} ->
+      Module:activate(Pid),
+      receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
+  after
+    60000 ->
+      error_timeout
+  end.
+
+ensure_no_socket_passive_msgs(Module, Pid) ->
+  receive
+    {epgsql, Pid, socket_passive} ->
+      Module:activate(Pid),
+      ensure_no_socket_passive_msgs(Module, Pid)
+  after
+    100 ->
+      ok
+  end.
 
 
 handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
 handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
-    {C, Pid} = CbState,
-    Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
-    {ok, EndLSN, EndLSN, CbState}.
+  {C, Pid} = CbState,
+  Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
+  {ok, EndLSN, EndLSN, CbState}.