epgsql_replication_SUITE.erl 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([all/0,
  6. init_per_suite/1,
  7. end_per_suite/1,
  8. connect_in_repl_mode/1,
  9. create_drop_replication_slot/1,
  10. replication_sync/1,
  11. replication_async/1,
  12. replication_async_active_n_socket/1,
  13. replication_sync_active_n_socket/1,
  14. replication_async_active_n_ssl/1,
  15. %% Callbacks
  16. handle_x_log_data/4
  17. ]).
  18. init_per_suite(Config) ->
  19. [{module, epgsql} | Config].
  20. end_per_suite(_Config) ->
  21. ok.
  22. all() ->
  23. [connect_in_repl_mode,
  24. create_drop_replication_slot,
  25. replication_async,
  26. replication_sync,
  27. replication_async_active_n_socket,
  28. replication_sync_active_n_socket,
  29. replication_async_active_n_ssl
  30. ].
  31. connect_in_repl_mode(Config) ->
  32. epgsql_ct:connect_only(
  33. Config,
  34. ["epgsql_test_replication",
  35. "epgsql_test_replication",
  36. [{database, "epgsql_test_db1"}, {replication, "database"}]
  37. ]).
  38. create_drop_replication_slot(Config) ->
  39. epgsql_ct:with_connection(
  40. Config,
  41. fun(C) ->
  42. create_replication_slot(Config, C),
  43. drop_replication_slot(Config, C)
  44. end,
  45. "epgsql_test_replication",
  46. [{replication, "database"}]).
  47. replication_async(Config) ->
  48. replication_test_run(Config, self()).
  49. replication_sync(Config) ->
  50. replication_test_run(Config, ?MODULE).
  51. replication_async_active_n_socket(Config) ->
  52. replication_test_run(Config, self(), [{socket_active, 1}]).
  53. replication_sync_active_n_socket(Config) ->
  54. replication_test_run(Config, ?MODULE, [{socket_active, 1}]).
  55. -ifdef(OTP_RELEASE).
  56. replication_async_active_n_ssl(Config) ->
  57. replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
  58. -else.
  59. %% {active, N} for SSL is only supported on OTP-21+
  60. replication_async_active_n_ssl(Config) ->
  61. noop.
  62. -endif.
  63. replication_test_run(Config, Callback) ->
  64. replication_test_run(Config, Callback, []).
  65. replication_test_run(Config, Callback, ExtOpts) ->
  66. Module = ?config(module, Config),
  67. {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
  68. epgsql_ct:with_connection(
  69. Config,
  70. fun(C) ->
  71. create_replication_slot(Config, C),
  72. %% new connection because main is in the replication mode
  73. epgsql_ct:with_connection(
  74. Config,
  75. fun(C2) ->
  76. ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
  77. Res = Module:squery(C2, Queries),
  78. ?assertEqual(ExpectedResult, Res)
  79. end),
  80. Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
  81. ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
  82. end,
  83. "epgsql_test_replication",
  84. [{replication, "database"} | ExtOpts]),
  85. %% cleanup
  86. epgsql_ct:with_connection(
  87. Config,
  88. fun(C) -> drop_replication_slot(Config, C) end,
  89. "epgsql_test_replication",
  90. [{replication, "database"}]).
  91. create_replication_slot(Config, Connection) ->
  92. Module = ?config(module, Config),
  93. {ok, Cols, Rows} =
  94. Module:squery(Connection,
  95. "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
  96. ?assertMatch([#column{name = <<"slot_name">>},
  97. #column{name = <<"consistent_point">>},
  98. #column{name = <<"snapshot_name">>},
  99. #column{name = <<"output_plugin">>}
  100. ],
  101. Cols),
  102. ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
  103. drop_replication_slot(Config, Connection) ->
  104. Module = ?config(module, Config),
  105. Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
  106. case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
  107. true -> ?assertMatch({ok, _, _}, Result);
  108. false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
  109. end.
  110. gen_query_and_replication_msgs(Ids) ->
  111. QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
  112. QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
  113. RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
  114. RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
  115. LongBin = base64:encode(crypto:strong_rand_bytes(254)),
  116. lists:foldl(
  117. fun(Id, {Qs, RMs}) ->
  118. QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
  119. QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
  120. RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
  121. RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
  122. {Qs ++ [QIns, QDel], RMs ++ [RmIns, RmDel]}
  123. end,
  124. {[], []},
  125. Ids).
  126. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
  127. receive
  128. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
  129. receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
  130. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
  131. ensure_no_socket_passive_msgs(Module, Pid),
  132. case lists:reverse(ReceivedMsgs) of
  133. [begin_msg, row_msg | _] -> ok;
  134. _ -> error_replication_messages_not_received
  135. end;
  136. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
  137. [Msg | T] = Pattern,
  138. receive_replication_msgs(Module, T, Pid, [row_msg | ReceivedMsgs]);
  139. {epgsql, Pid, socket_passive} ->
  140. Module:activate(Pid),
  141. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
  142. after
  143. 60000 ->
  144. error_timeout
  145. end.
  146. ensure_no_socket_passive_msgs(Module, Pid) ->
  147. receive
  148. {epgsql, Pid, socket_passive} ->
  149. Module:activate(Pid),
  150. ensure_no_socket_passive_msgs(Module, Pid)
  151. after
  152. 100 ->
  153. ok
  154. end.
  155. handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
  156. {C, Pid} = CbState,
  157. Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
  158. {ok, EndLSN, EndLSN, CbState}.