epgsql_replication_SUITE.erl 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([all/0,
  6. init_per_suite/1,
  7. end_per_suite/1,
  8. connect_in_repl_mode/1,
  9. create_drop_replication_slot/1,
  10. no_replication_slot/1,
  11. replication_sync/1,
  12. replication_async/1,
  13. replication_async_active_n_socket/1,
  14. replication_sync_active_n_socket/1,
  15. replication_async_active_n_ssl/1,
  16. two_replications_on_same_slot/1,
  17. %% Callbacks
  18. handle_x_log_data/4
  19. ]).
  %% @doc Common Test suite setup: records the client module under test in
  %% the suite configuration so test cases can look it up via `module'.
  init_per_suite(Config) ->
      SuiteProps = [{module, epgsql}],
      SuiteProps ++ Config.
  %% @doc Common Test suite teardown. Nothing is allocated per suite, so
  %% there is nothing to release.
  end_per_suite(_) ->
      ok.
  %% @doc Lists the test cases of this suite in the order they are run.
  all() ->
      SlotCases = [connect_in_repl_mode,
                   create_drop_replication_slot,
                   no_replication_slot],
      StreamingCases = [replication_async,
                        replication_sync,
                        replication_async_active_n_socket,
                        replication_sync_active_n_socket,
                        replication_async_active_n_ssl],
      SlotCases ++ StreamingCases ++ [two_replications_on_same_slot].
  %% @doc Checks that a connection can be opened with the `replication'
  %% option set to "database".
  connect_in_repl_mode(Config) ->
      %% NOTE(review): the two leading strings are presumably the user name
      %% and the password expected by epgsql_ct:connect_only/2 -- confirm.
      User = "epgsql_test_replication",
      Secret = "epgsql_test_replication",
      ConnectOpts = [{database, "epgsql_test_db1"}, {replication, "database"}],
      epgsql_ct:connect_only(Config, [User, Secret, ConnectOpts]).
  %% @doc Creates the test replication slot and drops it again on the same
  %% replication-mode connection.
  create_drop_replication_slot(Config) ->
      CreateThenDrop =
          fun(Conn) ->
                  create_replication_slot(Config, Conn),
                  drop_replication_slot(Config, Conn)
          end,
      ReplicationOpts = [{replication, "database"}],
      epgsql_ct:with_connection(Config,
                                CreateThenDrop,
                                "epgsql_test_replication",
                                ReplicationOpts).
  %% @doc Runs the replication scenario with the test process pid as the
  %% replication callback.
  replication_async(Config) ->
      Receiver = self(),
      replication_test_run(Config, Receiver).
  %% @doc Runs the replication scenario with this module as the replication
  %% callback (see handle_x_log_data/4).
  replication_sync(Config) ->
      CallbackModule = ?MODULE,
      replication_test_run(Config, CallbackModule).
  %% @doc Same as replication_async/1, but the connection is opened with
  %% `{socket_active, 1}'.
  replication_async_active_n_socket(Config) ->
      Receiver = self(),
      SocketOpts = [{socket_active, 1}],
      replication_test_run(Config, Receiver, SocketOpts).
  %% @doc Same as replication_sync/1, but the connection is opened with
  %% `{socket_active, 1}'.
  replication_sync_active_n_socket(Config) ->
      CallbackModule = ?MODULE,
      SocketOpts = [{socket_active, 1}],
      replication_test_run(Config, CallbackModule, SocketOpts).
  %% @doc Runs the asynchronous replication scenario over SSL with
  %% `{socket_active, 1}'. On releases that do not define OTP_RELEASE the
  %% case is reported as skipped instead of silently passing.
  -ifdef(OTP_RELEASE).
  replication_async_active_n_ssl(Config) ->
      replication_test_run(Config, self(), [{socket_active, 1},
                                            {ssl, require},
                                            {ssl_opts, [{verify, verify_none}]}]).
  -else.
  %% {active, N} for SSL is only supported on OTP-21+
  replication_async_active_n_ssl(_Config) ->
      {skip, "{active, N} for SSL is only supported on OTP-21+"}.
  -endif.
  %% @doc Starts replication on the test slot from one connection, then
  %% checks that a second connection trying to use the same slot gets an
  %% `object_in_use' error within one second. The slot is dropped afterwards.
  two_replications_on_same_slot(Config) ->
      Module = ?config(module, Config),
      User = "epgsql_test_replication",
      SlotName = "epgsql_test",
      Parent = self(),
      epgsql_ct:with_connection(
        Config,
        fun(C) ->
                create_replication_slot(Config, C),
                Res1 = Module:start_replication(C, SlotName, Parent, {C, Parent}, "0/0"),
                ?assertEqual(ok, Res1),
                ErrorReceivedMsg = error_received,
                %% Linked, so that a failed assertion in the helper process
                %% fails this test case with the real reason instead of only
                %% surfacing as the generic timeout below.
                spawn_link(
                  fun() ->
                          %% Test that the second connection receives the ReadyForQuery message from PG
                          %% synchronously after getting an error that the slot is occupied:
                          %% ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
                          %% https://www.postgresql.org/docs/current/protocol-message-formats.html
                          epgsql_ct:with_connection(
                            Config,
                            fun(C2) ->
                                    Res2 = Module:start_replication(C2, SlotName, self(), {C2, self()}, "0/0"),
                                    ?assertMatch({error, #error{codename = object_in_use}}, Res2),
                                    Parent ! ErrorReceivedMsg
                            end,
                            User,
                            [{replication, "database"}])
                  end),
                %% Selective receive: this process is also the replication
                %% callback pid for C, so unrelated messages may be in the
                %% mailbox and must not be mistaken for the helper's answer.
                receive
                    ErrorReceivedMsg -> ok
                after
                    1000 -> ?assert(false, "Expected answer hasn't been received in 1000ms when "
                                           "establishing a second connection to the same replication slot")
                end
        end,
        User,
        [{replication, "database"}]),
      drop_replication_slot(Config).
  %% @doc Checks that starting replication on a slot that has not been
  %% created yields an `undefined_object' error.
  no_replication_slot(Config) ->
      Module = ?config(module, Config),
      StartOnMissingSlot =
          fun(Conn) ->
                  Outcome = Module:start_replication(Conn, "epgsql_test", self(), {Conn, self()}, "0/0"),
                  ?assertMatch({error, #error{codename = undefined_object}}, Outcome)
          end,
      epgsql_ct:with_connection(Config,
                                StartOnMissingSlot,
                                "epgsql_test_replication",
                                [{replication, "database"}]).
  %% @doc Runs the replication scenario with no extra connection options.
  replication_test_run(Config, Callback) ->
      NoExtraOpts = [],
      replication_test_run(Config, Callback, NoExtraOpts).
  %% @doc Shared replication scenario.
  %%
  %% Creates the test slot on a replication-mode connection, runs a batch of
  %% INSERT/DELETE statements on a second, regular connection, starts
  %% replication with `Callback' and waits for the expected replication
  %% messages. `ExtOpts' are appended to the replication connection options.
  %% The slot is dropped at the end.
  replication_test_run(Config, Callback, ExtOpts) ->
      Module = ?config(module, Config),
      {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
      epgsql_ct:with_connection(
        Config,
        fun(C) ->
                create_replication_slot(Config, C),
                %% new connection because main is in the replication mode
                epgsql_ct:with_connection(
                  Config,
                  fun(C2) ->
                          ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
                          Res = Module:squery(C2, Queries),
                          ?assertEqual(ExpectedResult, Res)
                  end),
                %% Fail right here if replication could not be started,
                %% rather than waiting for the receive timeout below.
                StartRes = Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
                ?assertEqual(ok, StartRes),
                ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
        end,
        "epgsql_test_replication",
        [{replication, "database"} | ExtOpts]),
      %% cleanup
      drop_replication_slot(Config).
  %% @doc Creates the logical replication slot `epgsql_test' with the
  %% `test_decoding' output plugin on `Connection' and asserts the shape of
  %% the reply (column names and the single returned row).
  create_replication_slot(Config, Connection) ->
      Module = ?config(module, Config),
      %% Written as one literal: the previous adjacent literals without
      %% whitespace concatenated to exactly this text, and a `"""' sequence
      %% clashes with triple-quoted strings on newer OTP releases.
      {ok, Cols, Rows} =
          Module:squery(Connection,
                        "CREATE_REPLICATION_SLOT epgsql_test LOGICAL test_decoding"),
      ?assertMatch([#column{name = <<"slot_name">>},
                    #column{name = <<"consistent_point">>},
                    #column{name = <<"snapshot_name">>},
                    #column{name = <<"output_plugin">>}
                   ],
                   Cols),
      ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
  %% @doc Opens a dedicated replication-mode connection and drops the test
  %% replication slot on it.
  drop_replication_slot(Config) ->
      DropOnConn = fun(Conn) -> drop_replication_slot(Config, Conn) end,
      ReplicationOpts = [{replication, "database"}],
      epgsql_ct:with_connection(Config,
                                DropOnConn,
                                "epgsql_test_replication",
                                ReplicationOpts).
  %% @doc Drops the `epgsql_test' replication slot on `Connection'.
  %%
  %% The expected reply depends on the server version found under
  %% `pg_config' in the suite configuration: a single `{ok, _, _}' for
  %% versions >= [13, 0], a list of two such results otherwise.
  drop_replication_slot(Config, Connection) ->
      Module = ?config(module, Config),
      %% Written as one literal: the previous adjacent literals without
      %% whitespace concatenated to exactly this text, and a `"""' sequence
      %% clashes with triple-quoted strings on newer OTP releases.
      Result = Module:squery(Connection, "DROP_REPLICATION_SLOT epgsql_test"),
      case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
          true -> ?assertMatch({ok, _, _}, Result);
          false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
      end.
  %% @doc Builds test fixtures for the given row ids.
  %%
  %% Returns `{Queries, ReplicationMsgs}'. For every id, in input order,
  %% `Queries' holds an INSERT followed by a DELETE statement (flat strings)
  %% and `ReplicationMsgs' holds the two matching test_decoding texts
  %% (binaries). All inserted rows share one random base64 value.
  gen_query_and_replication_msgs(Ids) ->
      QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
      QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
      RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
      RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
      LongBin = base64:encode(crypto:strong_rand_bytes(254)),
      %% One {QueryPair, MsgPair} per id; the pairs are appended once at the
      %% end instead of growing two accumulators with `++' on every step.
      PerId =
          [{[lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
             lists:flatten(io_lib:format(QDelFmt, [Id]))],
            [iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
             iolist_to_binary(io_lib:format(RmDelFmt, [Id]))]}
           || Id <- Ids],
      {QueryPairs, MsgPairs} = lists:unzip(PerId),
      {lists:append(QueryPairs), lists:append(MsgPairs)}.
  %% @doc Consumes `{epgsql, Pid, ...}' messages until a COMMIT arrives.
  %%
  %% BEGIN messages and row messages are recorded in `ReceivedMsgs'; every
  %% row message must equal the head of `Pattern' (a mismatch crashes with
  %% badmatch). On `socket_passive' the connection is re-activated through
  %% `Module:activate/1'. Returns `ok' when the recorded sequence starts
  %% with a BEGIN followed by a row, `error_replication_messages_not_received'
  %% otherwise, and `error_timeout' if nothing arrives for 60 seconds.
  receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
      receive
          {epgsql, Pid, socket_passive} ->
              Module:activate(Pid),
              receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs);
          {epgsql, Pid, {x_log_data, _, _, <<"BEGIN", _/binary>>}} ->
              Seen = [begin_msg | ReceivedMsgs],
              receive_replication_msgs(Module, Pattern, Pid, Seen);
          {epgsql, Pid, {x_log_data, _, _, <<"COMMIT", _/binary>>}} ->
              ensure_no_socket_passive_msgs(Module, Pid),
              replication_commit_verdict(lists:reverse(ReceivedMsgs));
          {epgsql, Pid, {x_log_data, _, _, Msg}} ->
              %% Msg is already bound here, so this asserts it is the next
              %% expected row message.
              [Msg | Remaining] = Pattern,
              receive_replication_msgs(Module, Remaining, Pid, [row_msg | ReceivedMsgs])
      after 60000 ->
              error_timeout
      end.

  %% Judges the chronological list of recorded message kinds at COMMIT time.
  replication_commit_verdict([begin_msg, row_msg | _]) ->
      ok;
  replication_commit_verdict(_) ->
      error_replication_messages_not_received.
  %% @doc Drains pending `socket_passive' notifications from `Pid',
  %% re-activating the connection through `Module:activate/1' for each one.
  %% Returns `ok' once none has arrived for 100 ms.
  ensure_no_socket_passive_msgs(Module, Pid) ->
      receive
          {epgsql, Pid, socket_passive} ->
              Module:activate(Pid),
              ensure_no_socket_passive_msgs(Module, Pid)
      after 100 ->
              ok
      end.
  %% @doc Replication callback used when this module is passed as the
  %% callback. Forwards the data to the pid stored in the callback state as
  %% an `{epgsql, Conn, {x_log_data, StartLSN, EndLSN, Data}}' message and
  %% returns `EndLSN' for both LSN positions of the reply, with the state
  %% unchanged.
  handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
      {Conn, Receiver} = CbState,
      Forwarded = {epgsql, Conn, {x_log_data, StartLSN, EndLSN, Data}},
      Receiver ! Forwarded,
      {ok, EndLSN, EndLSN, CbState}.