epgsql_replication_SUITE.erl 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([all/0,
  6. init_per_suite/1,
  7. end_per_suite/1,
  8. connect_in_repl_mode/1,
  9. create_drop_replication_slot/1,
  10. replication_sync/1,
  11. replication_async/1,
  12. replication_async_active_n_socket/1,
  13. replication_sync_active_n_socket/1,
  14. replication_async_active_n_ssl/1,
  15. two_replications_on_same_slot/1,
  16. %% Callbacks
  17. handle_x_log_data/4
  18. ]).
  19. init_per_suite(Config) ->
  20. [{module, epgsql} | Config].
  21. end_per_suite(_Config) ->
  22. ok.
  23. all() ->
  24. [connect_in_repl_mode,
  25. create_drop_replication_slot,
  26. replication_async,
  27. replication_sync,
  28. replication_async_active_n_socket,
  29. replication_sync_active_n_socket,
  30. replication_async_active_n_ssl,
  31. two_replications_on_same_slot
  32. ].
  33. connect_in_repl_mode(Config) ->
  34. epgsql_ct:connect_only(
  35. Config,
  36. ["epgsql_test_replication",
  37. "epgsql_test_replication",
  38. [{database, "epgsql_test_db1"}, {replication, "database"}]
  39. ]).
  40. create_drop_replication_slot(Config) ->
  41. epgsql_ct:with_connection(
  42. Config,
  43. fun(C) ->
  44. create_replication_slot(Config, C),
  45. drop_replication_slot(Config, C)
  46. end,
  47. "epgsql_test_replication",
  48. [{replication, "database"}]).
  49. replication_async(Config) ->
  50. replication_test_run(Config, self()).
  51. replication_sync(Config) ->
  52. replication_test_run(Config, ?MODULE).
  53. replication_async_active_n_socket(Config) ->
  54. replication_test_run(Config, self(), [{socket_active, 1}]).
  55. replication_sync_active_n_socket(Config) ->
  56. replication_test_run(Config, ?MODULE, [{socket_active, 1}]).
  57. -ifdef(OTP_RELEASE).
  58. replication_async_active_n_ssl(Config) ->
  59. replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
  60. -else.
  61. %% {active, N} for SSL is only supported on OTP-21+
  62. replication_async_active_n_ssl(Config) ->
  63. noop.
  64. -endif.
  65. two_replications_on_same_slot(Config) ->
  66. Module = ?config(module, Config),
  67. User = "epgsql_test_replication",
  68. Parent = self(),
  69. epgsql_ct:with_connection(
  70. Config,
  71. fun(C) ->
  72. create_replication_slot(Config, C),
  73. Res1 = Module:start_replication(C, "epgsql_test", Parent, {C, Parent}, "0/0"),
  74. ?assertEqual(ok, Res1),
  75. spawn(
  76. fun() ->
  77. %% This connection will be terminated due to an unexpected message:
  78. %% EmptyQueryResponse (B), Byte1('I'), Int32(4)
  79. %% https://www.postgresql.org/docs/current/protocol-message-formats.html
  80. process_flag(trap_exit, true),
  81. C2 = epgsql_ct:connect(Config, User, [{replication, "database"}]),
  82. Res2 = Module:start_replication(C2, "epgsql_test", self(), {C2, self()}, "0/0"),
  83. ?assertMatch({error, _}, Res2),
  84. cleanup_and_notify(Parent)
  85. end),
  86. receive
  87. Result -> ?assertEqual(terminated, Result)
  88. after
  89. 1000 -> ?assert(false, "Second connection hasn't been closed")
  90. end
  91. end,
  92. User,
  93. [{replication, "database"}]),
  94. drop_replication_slot(Config).
  95. replication_test_run(Config, Callback) ->
  96. replication_test_run(Config, Callback, []).
  97. replication_test_run(Config, Callback, ExtOpts) ->
  98. Module = ?config(module, Config),
  99. {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
  100. epgsql_ct:with_connection(
  101. Config,
  102. fun(C) ->
  103. create_replication_slot(Config, C),
  104. %% new connection because main is in the replication mode
  105. epgsql_ct:with_connection(
  106. Config,
  107. fun(C2) ->
  108. ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
  109. Res = Module:squery(C2, Queries),
  110. ?assertEqual(ExpectedResult, Res)
  111. end),
  112. Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
  113. ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
  114. end,
  115. "epgsql_test_replication",
  116. [{replication, "database"} | ExtOpts]),
  117. %% cleanup
  118. drop_replication_slot(Config).
  119. create_replication_slot(Config, Connection) ->
  120. Module = ?config(module, Config),
  121. {ok, Cols, Rows} =
  122. Module:squery(Connection,
  123. "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
  124. ?assertMatch([#column{name = <<"slot_name">>},
  125. #column{name = <<"consistent_point">>},
  126. #column{name = <<"snapshot_name">>},
  127. #column{name = <<"output_plugin">>}
  128. ],
  129. Cols),
  130. ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
  131. drop_replication_slot(Config) ->
  132. epgsql_ct:with_connection(
  133. Config,
  134. fun(C) -> drop_replication_slot(Config, C) end,
  135. "epgsql_test_replication",
  136. [{replication, "database"}]).
  137. drop_replication_slot(Config, Connection) ->
  138. Module = ?config(module, Config),
  139. Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
  140. case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
  141. true -> ?assertMatch({ok, _, _}, Result);
  142. false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
  143. end.
  144. gen_query_and_replication_msgs(Ids) ->
  145. QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
  146. QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
  147. RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
  148. RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
  149. LongBin = base64:encode(crypto:strong_rand_bytes(254)),
  150. lists:foldl(
  151. fun(Id, {Qs, RMs}) ->
  152. QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
  153. QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
  154. RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
  155. RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
  156. {Qs ++ [QIns, QDel], RMs ++ [RmIns, RmDel]}
  157. end,
  158. {[], []},
  159. Ids).
  160. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
  161. receive
  162. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
  163. receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
  164. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
  165. ensure_no_socket_passive_msgs(Module, Pid),
  166. case lists:reverse(ReceivedMsgs) of
  167. [begin_msg, row_msg | _] -> ok;
  168. _ -> error_replication_messages_not_received
  169. end;
  170. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
  171. [Msg | T] = Pattern,
  172. receive_replication_msgs(Module, T, Pid, [row_msg | ReceivedMsgs]);
  173. {epgsql, Pid, socket_passive} ->
  174. Module:activate(Pid),
  175. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
  176. after
  177. 60000 ->
  178. error_timeout
  179. end.
  180. ensure_no_socket_passive_msgs(Module, Pid) ->
  181. receive
  182. {epgsql, Pid, socket_passive} ->
  183. Module:activate(Pid),
  184. ensure_no_socket_passive_msgs(Module, Pid)
  185. after
  186. 100 ->
  187. ok
  188. end.
  189. cleanup_and_notify(Parent) ->
  190. receive
  191. {'EXIT', _Pid, {error, _}} ->
  192. Parent ! terminated;
  193. Msg ->
  194. Parent ! Msg
  195. after
  196. 100 ->
  197. Parent ! no_messages_received
  198. end.
  199. handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
  200. {C, Pid} = CbState,
  201. Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
  202. {ok, EndLSN, EndLSN, CbState}.