Browse Source

Handle ReadyForQuery when a replication slot is in use by another process

Yury Yantsevich 2 years ago
parent
commit
e7e2487316

+ 8 - 0
src/commands/epgsql_cmd_start_replication.erl

@@ -65,8 +65,16 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
 %% CopyBothResponse
 handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->
     {finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)};
+handle_message(?ERROR, #error{codename = object_in_use} = Error, Sock, State) ->
+    %% A replication slot is in use already. In this case, protocol sends a ReadyForQuery message.
+    %% Adds an error to results to handle it later in the ?READY_FOR_QUERY branch.
+    Result = {error, Error},
+    {add_result, Result, Result, Sock, State};
 handle_message(?ERROR, Error, _Sock, _State) ->
     Result = {error, Error},
     {sync_required, Result};
+handle_message(?READY_FOR_QUERY, _Data, Sock, _State) ->
+    [Error = {error, _}] = epgsql_sock:get_results(Sock), % assert a single error response
+    {sync_required, Error};
 handle_message(_, _, _, _) ->
     unknown.

+ 0 - 2
src/epgsql_command.erl

@@ -67,7 +67,5 @@ execute(Command, PgSock, CmdState) ->
 
 -spec handle_message(command(), Type :: byte(), Payload :: binary() | epgsql:query_error(),
                      epgsql_sock:pg_sock(), state()) -> handle_message_return().
-handle_message(undefined = _Command, _Type, _Payload, _PgSock, _State) ->
-    unknown;
 handle_message(Command, Type, Payload, PgSock, State) ->
     Command:handle_message(Type, Payload, PgSock, State).

+ 5 - 15
test/epgsql_replication_SUITE.erl

@@ -90,18 +90,19 @@ two_replications_on_same_slot(Config) ->
         spawn(
           fun() ->
               %% This connection will be terminated due to an unexpected message:
-              %%   EmptyQueryResponse (B), Byte1('I'), Int32(4)
+              %%   ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
               %% https://www.postgresql.org/docs/current/protocol-message-formats.html
               process_flag(trap_exit, true),
               C2 = epgsql_ct:connect(Config, User, [{replication, "database"}]),
               Res2 = Module:start_replication(C2, "epgsql_test", self(), {C2, self()}, "0/0"),
               ?assertMatch({error, _}, Res2),
-              cleanup_and_notify(Parent)
+              Parent ! error_received
           end),
         receive
-          Result -> ?assertEqual(terminated, Result)
+          Result -> ?assertEqual(error_received, Result)
         after
-          1000 -> ?assert(false, "Second connection hasn't been closed")
+          1000 -> ?assert(false, "Error hasn't been received when establishing "
+                                 "a second connection to the same replication slot")
         end
     end,
     User,
@@ -210,17 +211,6 @@ ensure_no_socket_passive_msgs(Module, Pid) ->
       ok
   end.
 
-cleanup_and_notify(Parent) ->
-  receive
-    {'EXIT', _Pid, {error, _}} ->
-      Parent ! terminated;
-    Msg ->
-      Parent ! Msg
-  after
-    100 ->
-      Parent ! no_messages_received
-  end.
-
 handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
   {C, Pid} = CbState,
   Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},