Просмотр исходного кода

Merge pull request #279 from featalion/handle-unexpected-messages

Handle ReadyForQuery synchronously in the replication mode
Sergey Prokhorov 2 лет назад
Родитель
Сommit
041b70e4cc
2 измененных файлов с 70 добавлено и 9 удалено
  1. 7 3
      src/commands/epgsql_cmd_start_replication.erl
  2. 63 6
      test/epgsql_replication_SUITE.erl

+ 7 - 3
src/commands/epgsql_cmd_start_replication.erl

@@ -12,7 +12,6 @@
 
 -type response() :: ok | {error, epgsql:query_error()}.
 
--include("epgsql.hrl").
 -include("protocol.hrl").
 -include("../epgsql_replication.hrl").
 
@@ -65,8 +64,13 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
 %% CopyBothResponse
 handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->
     {finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)};
-handle_message(?ERROR, Error, _Sock, _State) ->
+handle_message(?ERROR, Error, Sock, State) ->
+    %% In the case of error, Postgresql replication protocol sends a ReadyForQuery message.
+    %% Adds an error to results to handle it later in the ?READY_FOR_QUERY branch.
     Result = {error, Error},
-    {sync_required, Result};
+    {add_result, Result, Result, Sock, State};
+handle_message(?READY_FOR_QUERY, _Data, Sock, _State) ->
+    [Error = {error, _}] = epgsql_sock:get_results(Sock), % assert a single error response
+    {finish, Error, done, Sock};
 handle_message(_, _, _, _) ->
     unknown.

+ 63 - 6
test/epgsql_replication_SUITE.erl

@@ -10,11 +10,13 @@
 
          connect_in_repl_mode/1,
          create_drop_replication_slot/1,
+         no_replication_slot/1,
          replication_sync/1,
          replication_async/1,
          replication_async_active_n_socket/1,
          replication_sync_active_n_socket/1,
          replication_async_active_n_ssl/1,
+         two_replications_on_same_slot/1,
 
          %% Callbacks
          handle_x_log_data/4
@@ -29,11 +31,13 @@ end_per_suite(_Config) ->
 all() ->
   [connect_in_repl_mode,
    create_drop_replication_slot,
+   no_replication_slot,
    replication_async,
    replication_sync,
    replication_async_active_n_socket,
    replication_sync_active_n_socket,
-   replication_async_active_n_ssl
+   replication_async_active_n_ssl,
+   two_replications_on_same_slot
   ].
 
 connect_in_repl_mode(Config) ->
@@ -75,6 +79,56 @@ replication_async_active_n_ssl(Config) ->
     noop.
 -endif.
 
+two_replications_on_same_slot(Config) ->
+  Module = ?config(module, Config),
+  User = "epgsql_test_replication",
+  SlotName = "epgsql_test",
+  Parent = self(),
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        create_replication_slot(Config, C),
+        Res1 = Module:start_replication(C, SlotName, Parent, {C, Parent}, "0/0"),
+        ?assertEqual(ok, Res1),
+        ErrorReceivedMsg = error_received,
+        spawn(
+          fun() ->
+              %% Test that the second connection receives the ReadyForQuery message from PG
+              %% synchronously after getting an error that the slot is occupied:
+              %%   ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
+              %% https://www.postgresql.org/docs/current/protocol-message-formats.html
+              epgsql_ct:with_connection(
+                Config,
+                fun(C2) ->
+                    Res2 = Module:start_replication(C2, SlotName, self(), {C2, self()}, "0/0"),
+                    ?assertMatch({error, #error{codename = object_in_use}}, Res2),
+                    Parent ! ErrorReceivedMsg
+                end,
+                User,
+                [{replication, "database"}])
+          end),
+        receive
+          Result -> ?assertEqual(ErrorReceivedMsg, Result)
+        after
+          1000 -> ?assert(false, "Expected answer hasn't been received in 1000ms when "
+                                 "establishing a second connection to the same replication slot")
+        end
+    end,
+    User,
+    [{replication, "database"}]),
+  drop_replication_slot(Config).
+
+no_replication_slot(Config) ->
+  Module = ?config(module, Config),
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        Res = Module:start_replication(C, "epgsql_test", self(), {C, self()}, "0/0"),
+        ?assertMatch({error, #error{codename = undefined_object}}, Res)
+    end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
+
 replication_test_run(Config, Callback) ->
   replication_test_run(Config, Callback, []).
 
@@ -99,11 +153,7 @@ replication_test_run(Config, Callback, ExtOpts) ->
     "epgsql_test_replication",
     [{replication, "database"} | ExtOpts]),
   %% cleanup
-  epgsql_ct:with_connection(
-    Config,
-    fun(C) -> drop_replication_slot(Config, C) end,
-    "epgsql_test_replication",
-    [{replication, "database"}]).
+  drop_replication_slot(Config).
 
 create_replication_slot(Config, Connection) ->
   Module = ?config(module, Config),
@@ -118,6 +168,13 @@ create_replication_slot(Config, Connection) ->
                Cols),
   ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
 
+drop_replication_slot(Config) ->
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) -> drop_replication_slot(Config, C) end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
+
 drop_replication_slot(Config, Connection) ->
   Module = ?config(module, Config),
   Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),