epgsql_replication_SUITE.erl 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([all/0,
  6. init_per_suite/1,
  7. end_per_suite/1,
  8. connect_in_repl_mode/1,
  9. create_drop_replication_slot/1,
  10. no_replication_slot/1,
  11. replication_sync/1,
  12. replication_async/1,
  13. replication_async_active_n_socket/1,
  14. replication_sync_active_n_socket/1,
  15. replication_async_active_n_ssl/1,
  16. two_replications_on_same_slot/1,
  17. %% Callbacks
  18. handle_x_log_data/4
  19. ]).
  20. init_per_suite(Config) ->
  21. [{module, epgsql} | Config].
  22. end_per_suite(_Config) ->
  23. ok.
  24. all() ->
  25. [connect_in_repl_mode,
  26. create_drop_replication_slot,
  27. no_replication_slot,
  28. replication_async,
  29. replication_sync,
  30. replication_async_active_n_socket,
  31. replication_sync_active_n_socket,
  32. replication_async_active_n_ssl,
  33. two_replications_on_same_slot
  34. ].
  35. connect_in_repl_mode(Config) ->
  36. epgsql_ct:connect_only(
  37. Config,
  38. ["epgsql_test_replication",
  39. "epgsql_test_replication",
  40. [{database, "epgsql_test_db1"}, {replication, "database"}]
  41. ]).
  42. create_drop_replication_slot(Config) ->
  43. epgsql_ct:with_connection(
  44. Config,
  45. fun(C) ->
  46. create_replication_slot(Config, C),
  47. drop_replication_slot(Config, C)
  48. end,
  49. "epgsql_test_replication",
  50. [{replication, "database"}]).
  51. replication_async(Config) ->
  52. replication_test_run(Config, self()).
  53. replication_sync(Config) ->
  54. replication_test_run(Config, ?MODULE).
  55. replication_async_active_n_socket(Config) ->
  56. replication_test_run(Config, self(), [{socket_active, 1}]).
  57. replication_sync_active_n_socket(Config) ->
  58. replication_test_run(Config, ?MODULE, [{socket_active, 1}]).
  59. -ifdef(OTP_RELEASE).
  60. replication_async_active_n_ssl(Config) ->
  61. replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
  62. -else.
  63. %% {active, N} for SSL is only supported on OTP-21+
  64. replication_async_active_n_ssl(Config) ->
  65. noop.
  66. -endif.
  67. two_replications_on_same_slot(Config) ->
  68. Module = ?config(module, Config),
  69. User = "epgsql_test_replication",
  70. SlotName = "epgsql_test",
  71. Parent = self(),
  72. epgsql_ct:with_connection(
  73. Config,
  74. fun(C) ->
  75. create_replication_slot(Config, C),
  76. Res1 = Module:start_replication(C, SlotName, Parent, {C, Parent}, "0/0"),
  77. ?assertEqual(ok, Res1),
  78. ErrorReceivedMsg = error_received,
  79. spawn(
  80. fun() ->
  81. %% Test that the second connection receives the ReadyForQuery message from PG
  82. %% synchronously after getting an error that the slot is occupied:
  83. %% ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
  84. %% https://www.postgresql.org/docs/current/protocol-message-formats.html
  85. epgsql_ct:with_connection(
  86. Config,
  87. fun(C2) ->
  88. Res2 = Module:start_replication(C2, SlotName, self(), {C2, self()}, "0/0"),
  89. ?assertMatch({error, #error{codename = object_in_use}}, Res2),
  90. Parent ! ErrorReceivedMsg
  91. end,
  92. User,
  93. [{replication, "database"}])
  94. end),
  95. receive
  96. Result -> ?assertEqual(ErrorReceivedMsg, Result)
  97. after
  98. 1000 -> ?assert(false, "Expected answer hasn't been received in 1000ms when "
  99. "establishing a second connection to the same replication slot")
  100. end
  101. end,
  102. User,
  103. [{replication, "database"}]),
  104. drop_replication_slot(Config).
  105. no_replication_slot(Config) ->
  106. Module = ?config(module, Config),
  107. epgsql_ct:with_connection(
  108. Config,
  109. fun(C) ->
  110. Res = Module:start_replication(C, "epgsql_test", self(), {C, self()}, "0/0"),
  111. ?assertMatch({error, #error{codename = undefined_object}}, Res)
  112. end,
  113. "epgsql_test_replication",
  114. [{replication, "database"}]).
  115. replication_test_run(Config, Callback) ->
  116. replication_test_run(Config, Callback, []).
  117. replication_test_run(Config, Callback, ExtOpts) ->
  118. Module = ?config(module, Config),
  119. {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
  120. epgsql_ct:with_connection(
  121. Config,
  122. fun(C) ->
  123. create_replication_slot(Config, C),
  124. %% new connection because main is in the replication mode
  125. epgsql_ct:with_connection(
  126. Config,
  127. fun(C2) ->
  128. ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
  129. Res = Module:squery(C2, Queries),
  130. ?assertEqual(ExpectedResult, Res)
  131. end),
  132. Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
  133. ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
  134. end,
  135. "epgsql_test_replication",
  136. [{replication, "database"} | ExtOpts]),
  137. %% cleanup
  138. drop_replication_slot(Config).
  139. create_replication_slot(Config, Connection) ->
  140. Module = ?config(module, Config),
  141. {ok, Cols, Rows} =
  142. Module:squery(Connection,
  143. "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
  144. ?assertMatch([#column{name = <<"slot_name">>},
  145. #column{name = <<"consistent_point">>},
  146. #column{name = <<"snapshot_name">>},
  147. #column{name = <<"output_plugin">>}
  148. ],
  149. Cols),
  150. ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
  151. drop_replication_slot(Config) ->
  152. epgsql_ct:with_connection(
  153. Config,
  154. fun(C) -> drop_replication_slot(Config, C) end,
  155. "epgsql_test_replication",
  156. [{replication, "database"}]).
  157. drop_replication_slot(Config, Connection) ->
  158. Module = ?config(module, Config),
  159. Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
  160. case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
  161. true -> ?assertMatch({ok, _, _}, Result);
  162. false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
  163. end.
  164. gen_query_and_replication_msgs(Ids) ->
  165. QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
  166. QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
  167. RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
  168. RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
  169. LongBin = base64:encode(crypto:strong_rand_bytes(254)),
  170. lists:foldl(
  171. fun(Id, {Qs, RMs}) ->
  172. QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
  173. QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
  174. RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
  175. RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
  176. {Qs ++ [QIns, QDel], RMs ++ [RmIns, RmDel]}
  177. end,
  178. {[], []},
  179. Ids).
  180. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
  181. receive
  182. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
  183. receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
  184. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
  185. ensure_no_socket_passive_msgs(Module, Pid),
  186. case lists:reverse(ReceivedMsgs) of
  187. [begin_msg, row_msg | _] -> ok;
  188. _ -> error_replication_messages_not_received
  189. end;
  190. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
  191. [Msg | T] = Pattern,
  192. receive_replication_msgs(Module, T, Pid, [row_msg | ReceivedMsgs]);
  193. {epgsql, Pid, socket_passive} ->
  194. Module:activate(Pid),
  195. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
  196. after
  197. 60000 ->
  198. error_timeout
  199. end.
  200. ensure_no_socket_passive_msgs(Module, Pid) ->
  201. receive
  202. {epgsql, Pid, socket_passive} ->
  203. Module:activate(Pid),
  204. ensure_no_socket_passive_msgs(Module, Pid)
  205. after
  206. 100 ->
  207. ok
  208. end.
  209. handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
  210. {C, Pid} = CbState,
  211. Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
  212. {ok, EndLSN, EndLSN, CbState}.