Browse Source

Wait for ReadyForQuery in case of any error in replication command

Yury Yantsevich 2 years ago
parent
commit
790ec73cd3

+ 3 - 7
src/commands/epgsql_cmd_start_replication.erl

@@ -12,7 +12,6 @@
 
 
 -type response() :: ok | {error, epgsql:query_error()}.
 -type response() :: ok | {error, epgsql:query_error()}.
 
 
--include("epgsql.hrl").
 -include("protocol.hrl").
 -include("protocol.hrl").
 -include("../epgsql_replication.hrl").
 -include("../epgsql_replication.hrl").
 
 
@@ -65,16 +64,13 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
 %% CopyBothResponse
 %% CopyBothResponse
 handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->
 handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->
     {finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)};
     {finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)};
-handle_message(?ERROR, #error{codename = object_in_use} = Error, Sock, State) ->
-    %% A replication slot is in use already. In this case, protocol sends a ReadyForQuery message.
+handle_message(?ERROR, Error, Sock, State) ->
+    %% In the case of error, Postgresql replication protocol sends a ReadyForQuery message.
     %% Adds an error to results to handle it later in the ?READY_FOR_QUERY branch.
     %% Adds an error to results to handle it later in the ?READY_FOR_QUERY branch.
     Result = {error, Error},
     Result = {error, Error},
     {add_result, Result, Result, Sock, State};
     {add_result, Result, Result, Sock, State};
-handle_message(?ERROR, Error, _Sock, _State) ->
-    Result = {error, Error},
-    {sync_required, Result};
 handle_message(?READY_FOR_QUERY, _Data, Sock, _State) ->
 handle_message(?READY_FOR_QUERY, _Data, Sock, _State) ->
     [Error = {error, _}] = epgsql_sock:get_results(Sock), % assert a single error response
     [Error = {error, _}] = epgsql_sock:get_results(Sock), % assert a single error response
-    {sync_required, Error};
+    {finish, Error, done, Sock};
 handle_message(_, _, _, _) ->
 handle_message(_, _, _, _) ->
     unknown.
     unknown.

+ 0 - 1
test/epgsql_ct.erl

@@ -5,7 +5,6 @@
 -export([
 -export([
     connection_data/1,
     connection_data/1,
     connect/1,
     connect/1,
-    connect/3,
     connect_only/2,
     connect_only/2,
     with_connection/2,
     with_connection/2,
     with_connection/3,
     with_connection/3,

+ 17 - 10
test/epgsql_replication_SUITE.erl

@@ -80,29 +80,36 @@ replication_async_active_n_ssl(Config) ->
 two_replications_on_same_slot(Config) ->
 two_replications_on_same_slot(Config) ->
   Module = ?config(module, Config),
   Module = ?config(module, Config),
   User = "epgsql_test_replication",
   User = "epgsql_test_replication",
+  SlotName = "epgsql_test",
   Parent = self(),
   Parent = self(),
   epgsql_ct:with_connection(
   epgsql_ct:with_connection(
     Config,
     Config,
     fun(C) ->
     fun(C) ->
         create_replication_slot(Config, C),
         create_replication_slot(Config, C),
-        Res1 = Module:start_replication(C, "epgsql_test", Parent, {C, Parent}, "0/0"),
+        Res1 = Module:start_replication(C, SlotName, Parent, {C, Parent}, "0/0"),
         ?assertEqual(ok, Res1),
         ?assertEqual(ok, Res1),
+        ErrorReceivedMsg = error_received,
         spawn(
         spawn(
           fun() ->
           fun() ->
-              %% This connection will be terminated due to an unexpected message:
+              %% Test that the second connection receives the ReadyForQuery message from PG
+              %% synchronously after getting an error that the slot is occupied:
               %%   ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
               %%   ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
               %% https://www.postgresql.org/docs/current/protocol-message-formats.html
               %% https://www.postgresql.org/docs/current/protocol-message-formats.html
-              process_flag(trap_exit, true),
-              C2 = epgsql_ct:connect(Config, User, [{replication, "database"}]),
-              Res2 = Module:start_replication(C2, "epgsql_test", self(), {C2, self()}, "0/0"),
-              ?assertMatch({error, _}, Res2),
-              Parent ! error_received
+              epgsql_ct:with_connection(
+                Config,
+                fun(C2) ->
+                    Res2 = Module:start_replication(C2, SlotName, self(), {C2, self()}, "0/0"),
+                    ?assertMatch({error, #error{codename = object_in_use}}, Res2),
+                    Parent ! ErrorReceivedMsg
+                end,
+                User,
+                [{replication, "database"}])
           end),
           end),
         receive
         receive
-          Result -> ?assertEqual(error_received, Result)
+          Result -> ?assertEqual(ErrorReceivedMsg, Result)
         after
         after
-          1000 -> ?assert(false, "Error hasn't been received when establishing "
-                                 "a second connection to the same replication slot")
+          1000 -> ?assert(false, "Expected answer hasn't been received in 1000ms when "
+                                 "establishing a second connection to the same replication slot")
         end
         end
     end,
     end,
     User,
     User,