epgsql_replication_SUITE.erl 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([all/0,
  6. init_per_suite/1,
  7. end_per_suite/1,
  8. connect_in_repl_mode/1,
  9. create_drop_replication_slot/1,
  10. replication_sync/1,
  11. replication_async/1,
  12. replication_async_active_n_socket/1,
  13. replication_sync_active_n_socket/1,
  14. replication_async_active_n_ssl/1,
  15. two_replications_on_same_slot/1,
  16. %% Callbacks
  17. handle_x_log_data/4
  18. ]).
  19. init_per_suite(Config) ->
  20. [{module, epgsql} | Config].
  21. end_per_suite(_Config) ->
  22. ok.
  23. all() ->
  24. [connect_in_repl_mode,
  25. create_drop_replication_slot,
  26. replication_async,
  27. replication_sync,
  28. replication_async_active_n_socket,
  29. replication_sync_active_n_socket,
  30. replication_async_active_n_ssl,
  31. two_replications_on_same_slot
  32. ].
  33. connect_in_repl_mode(Config) ->
  34. epgsql_ct:connect_only(
  35. Config,
  36. ["epgsql_test_replication",
  37. "epgsql_test_replication",
  38. [{database, "epgsql_test_db1"}, {replication, "database"}]
  39. ]).
  40. create_drop_replication_slot(Config) ->
  41. epgsql_ct:with_connection(
  42. Config,
  43. fun(C) ->
  44. create_replication_slot(Config, C),
  45. drop_replication_slot(Config, C)
  46. end,
  47. "epgsql_test_replication",
  48. [{replication, "database"}]).
  49. replication_async(Config) ->
  50. replication_test_run(Config, self()).
  51. replication_sync(Config) ->
  52. replication_test_run(Config, ?MODULE).
  53. replication_async_active_n_socket(Config) ->
  54. replication_test_run(Config, self(), [{socket_active, 1}]).
  55. replication_sync_active_n_socket(Config) ->
  56. replication_test_run(Config, ?MODULE, [{socket_active, 1}]).
  57. -ifdef(OTP_RELEASE).
  58. replication_async_active_n_ssl(Config) ->
  59. replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
  60. -else.
  61. %% {active, N} for SSL is only supported on OTP-21+
  62. replication_async_active_n_ssl(Config) ->
  63. noop.
  64. -endif.
  65. two_replications_on_same_slot(Config) ->
  66. Module = ?config(module, Config),
  67. User = "epgsql_test_replication",
  68. Parent = self(),
  69. epgsql_ct:with_connection(
  70. Config,
  71. fun(C) ->
  72. create_replication_slot(Config, C),
  73. Res1 = Module:start_replication(C, "epgsql_test", Parent, {C, Parent}, "0/0"),
  74. ?assertEqual(ok, Res1),
  75. spawn(
  76. fun() ->
  77. %% This connection will be terminated due to an unexpected message:
  78. %% ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
  79. %% https://www.postgresql.org/docs/current/protocol-message-formats.html
  80. process_flag(trap_exit, true),
  81. C2 = epgsql_ct:connect(Config, User, [{replication, "database"}]),
  82. Res2 = Module:start_replication(C2, "epgsql_test", self(), {C2, self()}, "0/0"),
  83. ?assertMatch({error, _}, Res2),
  84. Parent ! error_received
  85. end),
  86. receive
  87. Result -> ?assertEqual(error_received, Result)
  88. after
  89. 1000 -> ?assert(false, "Error hasn't been received when establishing "
  90. "a second connection to the same replication slot")
  91. end
  92. end,
  93. User,
  94. [{replication, "database"}]),
  95. drop_replication_slot(Config).
  96. replication_test_run(Config, Callback) ->
  97. replication_test_run(Config, Callback, []).
  98. replication_test_run(Config, Callback, ExtOpts) ->
  99. Module = ?config(module, Config),
  100. {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
  101. epgsql_ct:with_connection(
  102. Config,
  103. fun(C) ->
  104. create_replication_slot(Config, C),
  105. %% new connection because main is in the replication mode
  106. epgsql_ct:with_connection(
  107. Config,
  108. fun(C2) ->
  109. ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
  110. Res = Module:squery(C2, Queries),
  111. ?assertEqual(ExpectedResult, Res)
  112. end),
  113. Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
  114. ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
  115. end,
  116. "epgsql_test_replication",
  117. [{replication, "database"} | ExtOpts]),
  118. %% cleanup
  119. drop_replication_slot(Config).
  120. create_replication_slot(Config, Connection) ->
  121. Module = ?config(module, Config),
  122. {ok, Cols, Rows} =
  123. Module:squery(Connection,
  124. "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
  125. ?assertMatch([#column{name = <<"slot_name">>},
  126. #column{name = <<"consistent_point">>},
  127. #column{name = <<"snapshot_name">>},
  128. #column{name = <<"output_plugin">>}
  129. ],
  130. Cols),
  131. ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
  132. drop_replication_slot(Config) ->
  133. epgsql_ct:with_connection(
  134. Config,
  135. fun(C) -> drop_replication_slot(Config, C) end,
  136. "epgsql_test_replication",
  137. [{replication, "database"}]).
  138. drop_replication_slot(Config, Connection) ->
  139. Module = ?config(module, Config),
  140. Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
  141. case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
  142. true -> ?assertMatch({ok, _, _}, Result);
  143. false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
  144. end.
  145. gen_query_and_replication_msgs(Ids) ->
  146. QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
  147. QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
  148. RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
  149. RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
  150. LongBin = base64:encode(crypto:strong_rand_bytes(254)),
  151. lists:foldl(
  152. fun(Id, {Qs, RMs}) ->
  153. QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
  154. QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
  155. RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
  156. RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
  157. {Qs ++ [QIns, QDel], RMs ++ [RmIns, RmDel]}
  158. end,
  159. {[], []},
  160. Ids).
  161. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
  162. receive
  163. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
  164. receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
  165. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
  166. ensure_no_socket_passive_msgs(Module, Pid),
  167. case lists:reverse(ReceivedMsgs) of
  168. [begin_msg, row_msg | _] -> ok;
  169. _ -> error_replication_messages_not_received
  170. end;
  171. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
  172. [Msg | T] = Pattern,
  173. receive_replication_msgs(Module, T, Pid, [row_msg | ReceivedMsgs]);
  174. {epgsql, Pid, socket_passive} ->
  175. Module:activate(Pid),
  176. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
  177. after
  178. 60000 ->
  179. error_timeout
  180. end.
  181. ensure_no_socket_passive_msgs(Module, Pid) ->
  182. receive
  183. {epgsql, Pid, socket_passive} ->
  184. Module:activate(Pid),
  185. ensure_no_socket_passive_msgs(Module, Pid)
  186. after
  187. 100 ->
  188. ok
  189. end.
  190. handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
  191. {C, Pid} = CbState,
  192. Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
  193. {ok, EndLSN, EndLSN, CbState}.