|
@@ -15,6 +15,7 @@
|
|
|
replication_async_active_n_socket/1,
|
|
|
replication_sync_active_n_socket/1,
|
|
|
replication_async_active_n_ssl/1,
|
|
|
+ two_replications_on_same_slot/1,
|
|
|
|
|
|
%% Callbacks
|
|
|
handle_x_log_data/4
|
|
@@ -33,7 +34,8 @@ all() ->
|
|
|
replication_sync,
|
|
|
replication_async_active_n_socket,
|
|
|
replication_sync_active_n_socket,
|
|
|
- replication_async_active_n_ssl
|
|
|
+ replication_async_active_n_ssl,
|
|
|
+ two_replications_on_same_slot
|
|
|
].
|
|
|
|
|
|
connect_in_repl_mode(Config) ->
|
|
@@ -75,6 +77,37 @@ replication_async_active_n_ssl(Config) ->
|
|
|
noop.
|
|
|
-endif.
|
|
|
|
|
|
+two_replications_on_same_slot(Config) ->
|
|
|
+ Module = ?config(module, Config),
|
|
|
+ User = "epgsql_test_replication",
|
|
|
+ Parent = self(),
|
|
|
+ epgsql_ct:with_connection(
|
|
|
+ Config,
|
|
|
+ fun(C) ->
|
|
|
+ create_replication_slot(Config, C),
|
|
|
+ Res1 = Module:start_replication(C, "epgsql_test", Parent, {C, Parent}, "0/0"),
|
|
|
+ ?assertEqual(ok, Res1),
|
|
|
+ spawn(
|
|
|
+ fun() ->
|
|
|
+ %% This connection will be terminated due to an unexpected message:
|
|
|
+ %% EmptyQueryResponse (B), Byte1('I'), Int32(4)
|
|
|
+ %% https://www.postgresql.org/docs/current/protocol-message-formats.html
|
|
|
+ process_flag(trap_exit, true),
|
|
|
+ C2 = epgsql_ct:connect(Config, User, [{replication, "database"}]),
|
|
|
+ Res2 = Module:start_replication(C2, "epgsql_test", self(), {C2, self()}, "0/0"),
|
|
|
+ ?assertMatch({error, _}, Res2),
|
|
|
+ cleanup_and_notify(Parent)
|
|
|
+ end),
|
|
|
+ receive
|
|
|
+ Result -> ?assertEqual(terminated, Result)
|
|
|
+ after
|
|
|
+ 1000 -> ?assert(false, "Second connection hasn't been closed")
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ User,
|
|
|
+ [{replication, "database"}]),
|
|
|
+ drop_replication_slot(Config).
|
|
|
+
|
|
|
replication_test_run(Config, Callback) ->
|
|
|
replication_test_run(Config, Callback, []).
|
|
|
|
|
@@ -99,11 +132,7 @@ replication_test_run(Config, Callback, ExtOpts) ->
|
|
|
"epgsql_test_replication",
|
|
|
[{replication, "database"} | ExtOpts]),
|
|
|
%% cleanup
|
|
|
- epgsql_ct:with_connection(
|
|
|
- Config,
|
|
|
- fun(C) -> drop_replication_slot(Config, C) end,
|
|
|
- "epgsql_test_replication",
|
|
|
- [{replication, "database"}]).
|
|
|
+ drop_replication_slot(Config).
|
|
|
|
|
|
create_replication_slot(Config, Connection) ->
|
|
|
Module = ?config(module, Config),
|
|
@@ -118,6 +147,13 @@ create_replication_slot(Config, Connection) ->
|
|
|
Cols),
|
|
|
?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
|
|
|
|
|
|
+drop_replication_slot(Config) ->
|
|
|
+ epgsql_ct:with_connection(
|
|
|
+ Config,
|
|
|
+ fun(C) -> drop_replication_slot(Config, C) end,
|
|
|
+ "epgsql_test_replication",
|
|
|
+ [{replication, "database"}]).
|
|
|
+
|
|
|
drop_replication_slot(Config, Connection) ->
|
|
|
Module = ?config(module, Config),
|
|
|
Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
|
|
@@ -174,6 +210,17 @@ ensure_no_socket_passive_msgs(Module, Pid) ->
|
|
|
ok
|
|
|
end.
|
|
|
|
|
|
+cleanup_and_notify(Parent) ->
|
|
|
+ receive
|
|
|
+ {'EXIT', _Pid, {error, _}} ->
|
|
|
+ Parent ! terminated;
|
|
|
+ Msg ->
|
|
|
+ Parent ! Msg
|
|
|
+ after
|
|
|
+ 100 ->
|
|
|
+ Parent ! no_messages_received
|
|
|
+ end.
|
|
|
+
|
|
|
handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
|
|
|
{C, Pid} = CbState,
|
|
|
Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
|