epgsql_replication_SUITE.erl 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([all/0,
  6. init_per_suite/1,
  7. end_per_suite/1,
  8. connect_in_repl_mode/1,
  9. create_drop_replication_slot/1,
  10. replication_sync/1,
  11. replication_async/1,
  12. replication_async_active_n_socket/1,
  13. replication_sync_active_n_socket/1,
  14. replication_async_active_n_ssl/1,
  15. two_replications_on_same_slot/1,
  16. %% Callbacks
  17. handle_x_log_data/4
  18. ]).
  %% Common Test suite setup: prepends `{module, epgsql}' to the incoming
  %% config so that test cases can fetch the module under test with
  %% `?config(module, Config)'.
  init_per_suite(Config) ->
      ModuleEntry = {module, epgsql},
      [ModuleEntry | Config].
  %% Common Test suite teardown: nothing to release, the config is ignored.
  end_per_suite(_) ->
      ok.
  %% Test cases run by Common Test, in execution order.
  all() ->
      [
          connect_in_repl_mode,
          create_drop_replication_slot,
          replication_async,
          replication_sync,
          replication_async_active_n_socket,
          replication_sync_active_n_socket,
          replication_async_active_n_ssl,
          two_replications_on_same_slot
      ].
  %% Checks that a connection can be opened with the `{replication, "database"}'
  %% option against database "epgsql_test_db1".
  connect_in_repl_mode(Config) ->
      %% NOTE(review): the first two list elements are presumably the user name
      %% and password expected by epgsql_ct:connect_only/2 - confirm there.
      Username = "epgsql_test_replication",
      Password = "epgsql_test_replication",
      ConnOpts = [{database, "epgsql_test_db1"}, {replication, "database"}],
      epgsql_ct:connect_only(Config, [Username, Password, ConnOpts]).
  %% Creates the "epgsql_test" replication slot and drops it again, both on the
  %% same replication-mode connection.
  create_drop_replication_slot(Config) ->
      CreateThenDrop =
          fun(Conn) ->
              create_replication_slot(Config, Conn),
              drop_replication_slot(Config, Conn)
          end,
      ConnOpts = [{replication, "database"}],
      epgsql_ct:with_connection(Config, CreateThenDrop, "epgsql_test_replication", ConnOpts).
  %% Replication round-trip with the calling process (a pid) as the callback,
  %% so x_log_data arrives as messages in this process's mailbox.
  replication_async(Config) ->
      Receiver = self(),
      replication_test_run(Config, Receiver).
  %% Replication round-trip with this module as the callback.
  %% NOTE(review): presumably epgsql then invokes handle_x_log_data/4 exported
  %% by this suite - confirm against the epgsql replication docs.
  replication_sync(Config) ->
      CallbackModule = ?MODULE,
      replication_test_run(Config, CallbackModule).
  %% Same as replication_async/1, but the connection is additionally opened
  %% with `{socket_active, 1}'.
  replication_async_active_n_socket(Config) ->
      ExtraOpts = [{socket_active, 1}],
      replication_test_run(Config, self(), ExtraOpts).
  %% Same as replication_sync/1, but the connection is additionally opened
  %% with `{socket_active, 1}'.
  replication_sync_active_n_socket(Config) ->
      ExtraOpts = [{socket_active, 1}],
      replication_test_run(Config, ?MODULE, ExtraOpts).
  -ifdef(OTP_RELEASE).
  %% Asynchronous replication round-trip over SSL with `{socket_active, 1}'.
  %% Compiled only when the OTP_RELEASE macro is defined.
  replication_async_active_n_ssl(Config) ->
      replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
  -else.
  %% {active, N} for SSL is only supported on OTP-21+
  %% On older releases the case is a no-op; the config is deliberately ignored
  %% (hence `_Config', which avoids an unused-variable warning).
  replication_async_active_n_ssl(_Config) ->
      noop.
  -endif.
  %% Verifies that a second connection trying to start replication on a slot
  %% that is already in use gets an `object_in_use' error back.
  %%
  %% The first connection creates the slot and starts replication on it. A
  %% monitored helper process then opens a second connection, attempts to start
  %% replication on the same slot, asserts on the error and notifies the test
  %% process. The slot is dropped on a fresh connection afterwards.
  two_replications_on_same_slot(Config) ->
      Module = ?config(module, Config),
      User = "epgsql_test_replication",
      SlotName = "epgsql_test",
      Parent = self(),
      epgsql_ct:with_connection(
        Config,
        fun(C) ->
            create_replication_slot(Config, C),
            Res1 = Module:start_replication(C, SlotName, Parent, {C, Parent}, "0/0"),
            ?assertEqual(ok, Res1),
            ErrorReceivedMsg = error_received,
            %% Monitored (not bare) spawn: if the helper crashes, e.g. on a
            %% failed assertion, its exit reason is reported below instead of
            %% being hidden behind a generic timeout.
            {_HelperPid, MonRef} =
                spawn_monitor(
                  fun() ->
                      %% Test that the second connection receives the ReadyForQuery message from PG
                      %% synchronously after getting an error that the slot is occupied:
                      %% ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
                      %% https://www.postgresql.org/docs/current/protocol-message-formats.html
                      epgsql_ct:with_connection(
                        Config,
                        fun(C2) ->
                            Res2 = Module:start_replication(C2, SlotName, self(), {C2, self()}, "0/0"),
                            ?assertMatch({error, #error{codename = object_in_use}}, Res2),
                            Parent ! ErrorReceivedMsg
                        end,
                        User,
                        [{replication, "database"}])
                  end),
            %% Selective receive: ErrorReceivedMsg is already bound, so only the
            %% helper's notification or its 'DOWN' message is consumed here.
            %% Unrelated messages (such as replication data sent to Parent by
            %% the first connection) no longer fail the test.
            receive
                ErrorReceivedMsg ->
                    erlang:demonitor(MonRef, [flush]),
                    ok;
                {'DOWN', MonRef, process, _Pid, Reason} ->
                    erlang:error({second_connection_failed, Reason})
            after
                1000 -> ?assert(false, "Expected answer hasn't been received in 1000ms when "
                                       "establishing a second connection to the same replication slot")
            end
        end,
        User,
        [{replication, "database"}]),
      drop_replication_slot(Config).
  %% Runs the replication round-trip with no extra connection options.
  replication_test_run(Config, Callback) ->
      NoExtraOpts = [],
      replication_test_run(Config, Callback, NoExtraOpts).
  %% Full replication round-trip.
  %%
  %% Config   - Common Test config; must contain `module'.
  %% Callback - passed through to Module:start_replication/5 (the test cases
  %%            in this suite pass either self() or ?MODULE).
  %% ExtOpts  - extra options appended to the replication connection options.
  %%
  %% Steps: create the "epgsql_test" slot on a replication connection, run
  %% INSERT/DELETE statements for ids 100..110 on a second, regular connection,
  %% start replication and wait for the expected messages, then drop the slot
  %% on a fresh connection.
  replication_test_run(Config, Callback, ExtOpts) ->
      Module = ?config(module, Config),
      {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
      epgsql_ct:with_connection(
        Config,
        fun(C) ->
            create_replication_slot(Config, C),
            %% new connection because main is in the replication mode
            epgsql_ct:with_connection(
              Config,
              fun(C2) ->
                  ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
                  Res = Module:squery(C2, Queries),
                  ?assertEqual(ExpectedResult, Res)
              end),
            %% Fail right away if replication could not be started, rather than
            %% sitting in the 60 s receive timeout below.
            StartRes = Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
            ?assertEqual(ok, StartRes),
            ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
        end,
        "epgsql_test_replication",
        [{replication, "database"} | ExtOpts]),
      %% cleanup
      drop_replication_slot(Config).
  %% Creates the logical replication slot "epgsql_test" with the
  %% "test_decoding" output plugin on Connection, then asserts on the returned
  %% column names and on the single returned row.
  create_replication_slot(Config, Connection) ->
      Module = ?config(module, Config),
      %% A single literal: the previous adjacent-literal spelling concatenated
      %% to exactly this text, but adjacent string literals without separating
      %% whitespace are not accepted by newer compilers.
      Command = "CREATE_REPLICATION_SLOT epgsql_test LOGICAL test_decoding",
      {ok, Cols, Rows} = Module:squery(Connection, Command),
      ?assertMatch([#column{name = <<"slot_name">>},
                    #column{name = <<"consistent_point">>},
                    #column{name = <<"snapshot_name">>},
                    #column{name = <<"output_plugin">>}
                   ],
                   Cols),
      ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
  %% Opens a dedicated replication-mode connection and drops the "epgsql_test"
  %% slot on it via drop_replication_slot/2.
  drop_replication_slot(Config) ->
      DropOnConn = fun(Conn) -> drop_replication_slot(Config, Conn) end,
      ConnOpts = [{replication, "database"}],
      epgsql_ct:with_connection(Config, DropOnConn, "epgsql_test_replication", ConnOpts).
  %% Drops the "epgsql_test" replication slot on Connection and asserts on the
  %% shape of the reply. Config must contain `module' and a `pg_config' entry
  %% with a `version' value: for versions >= [13, 0] a single `{ok, _, _}'
  %% result is expected, otherwise a list of two such results.
  drop_replication_slot(Config, Connection) ->
      Module = ?config(module, Config),
      %% A single literal: the previous adjacent-literal spelling concatenated
      %% to exactly this text, but adjacent string literals without separating
      %% whitespace are not accepted by newer compilers.
      Result = Module:squery(Connection, "DROP_REPLICATION_SLOT epgsql_test"),
      case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
          true -> ?assertMatch({ok, _, _}, Result);
          false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
      end.
  %% Builds the SQL statements and the matching expected replication messages
  %% for a list of integer ids.
  %%
  %% For every id, in input order, two statements are produced (an INSERT into
  %% test_table1 followed by a DELETE of the same id) together with the two
  %% corresponding message texts. All inserts share one value: the base64
  %% encoding of 254 random bytes.
  %%
  %% Returns `{Queries, ReplicationMsgs}' where Queries is a list of flat
  %% strings and ReplicationMsgs a list of binaries; both are empty for `[]'.
  gen_query_and_replication_msgs(Ids) ->
      QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
      QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
      RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
      RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
      LongBin = base64:encode(crypto:strong_rand_bytes(254)),
      PerId =
          fun(Id) ->
              QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
              QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
              RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
              RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
              {[QIns, QDel], [RmIns, RmDel]}
          end,
      %% One pass per id plus a single append, instead of growing the
      %% accumulators with `++' on every step (which is quadratic).
      {QueryGroups, MsgGroups} = lists:unzip([PerId(Id) || Id <- Ids]),
      {lists:append(QueryGroups), lists:append(MsgGroups)}.
  %% Consumes `{epgsql, Pid, ...}' messages from the mailbox until a COMMIT
  %% record arrives or nothing is received for 60 seconds.
  %%
  %%  * BEGIN records are noted as `begin_msg'.
  %%  * Any other data record must equal the head of Pattern (a mismatch or an
  %%    empty Pattern crashes with badmatch); it is noted as `row_msg' and the
  %%    rest of Pattern is expected next.
  %%  * `socket_passive' notifications re-activate the connection through
  %%    Module:activate/1.
  %%  * On COMMIT, pending `socket_passive' notifications are drained and the
  %%    result is `ok' when the noted sequence starts with a begin followed by
  %%    a row, `error_replication_messages_not_received' otherwise.
  %%
  %% Returns `error_timeout' when the 60 second wait expires.
  receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
      receive
          {epgsql, Pid, {x_log_data, _, _, <<"BEGIN", _/binary>>}} ->
              Seen = [begin_msg | ReceivedMsgs],
              receive_replication_msgs(Module, Pattern, Pid, Seen);
          {epgsql, Pid, {x_log_data, _, _, <<"COMMIT", _/binary>>}} ->
              ensure_no_socket_passive_msgs(Module, Pid),
              InArrivalOrder = lists:reverse(ReceivedMsgs),
              case lists:prefix([begin_msg, row_msg], InArrivalOrder) of
                  true -> ok;
                  false -> error_replication_messages_not_received
              end;
          {epgsql, Pid, {x_log_data, _, _, Msg}} ->
              %% Msg is already bound, so this match also asserts that the
              %% record is the next expected one.
              [Msg | Remaining] = Pattern,
              receive_replication_msgs(Module, Remaining, Pid, [row_msg | ReceivedMsgs]);
          {epgsql, Pid, socket_passive} ->
              Module:activate(Pid),
              receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
      after 60000 ->
          error_timeout
      end.
  %% Drains `{epgsql, Pid, socket_passive}' notifications from the mailbox,
  %% calling Module:activate(Pid) for each one, and returns `ok' once none
  %% has arrived for 100 ms.
  ensure_no_socket_passive_msgs(Module, Pid) ->
      QuietPeriodMs = 100,
      receive
          {epgsql, Pid, socket_passive} ->
              Module:activate(Pid),
              ensure_no_socket_passive_msgs(Module, Pid)
      after QuietPeriodMs ->
          ok
      end.
  %% Replication callback: forwards the received data to the process stored in
  %% the callback state as an `{epgsql, Conn, {x_log_data, ...}}' message (the
  %% same shape receive_replication_msgs/4 matches on) and returns
  %% `{ok, EndLSN, EndLSN, CbState}' with the state unchanged.
  %% CbState must be a `{Conn, Pid}' pair.
  handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
      {Conn, Receiver} = CbState,
      Notification = {epgsql, Conn, {x_log_data, StartLSN, EndLSN, Data}},
      erlang:send(Receiver, Notification),
      {ok, EndLSN, EndLSN, CbState}.