epgsql_replication_SUITE.erl 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([all/0,
  6. init_per_suite/1,
  7. end_per_suite/1,
  8. connect_in_repl_mode/1,
  9. create_drop_replication_slot/1,
  10. replication_sync/1,
  11. replication_async/1,
  12. replication_async_active_n_socket/1,
  13. replication_sync_active_n_socket/1,
  14. replication_async_active_n_ssl/1,
  15. %% Callbacks
  16. handle_x_log_data/4
  17. ]).
  18. init_per_suite(Config) ->
  19. [{module, epgsql} | Config].
  20. end_per_suite(_Config) ->
  21. ok.
  22. all() ->
  23. [connect_in_repl_mode,
  24. create_drop_replication_slot,
  25. replication_async,
  26. replication_sync,
  27. replication_async_active_n_socket,
  28. replication_sync_active_n_socket,
  29. replication_async_active_n_ssl
  30. ].
  31. connect_in_repl_mode(Config) ->
  32. epgsql_ct:connect_only(
  33. Config,
  34. ["epgsql_test_replication",
  35. "epgsql_test_replication",
  36. [{database, "epgsql_test_db1"}, {replication, "database"}]
  37. ]).
  38. create_drop_replication_slot(Config) ->
  39. epgsql_ct:with_connection(
  40. Config,
  41. fun(C) ->
  42. create_replication_slot(Config, C),
  43. drop_replication_slot(Config, C)
  44. end,
  45. "epgsql_test_replication",
  46. [{replication, "database"}]).
  47. replication_async(Config) ->
  48. replication_test_run(Config, self()).
  49. replication_sync(Config) ->
  50. replication_test_run(Config, ?MODULE).
  51. replication_async_active_n_socket(Config) ->
  52. replication_test_run(Config, self(), [{socket_active, 1}]).
  53. replication_sync_active_n_socket(Config) ->
  54. replication_test_run(Config, ?MODULE, [{socket_active, 1}]).
  55. replication_async_active_n_ssl(Config) ->
  56. replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
  57. replication_test_run(Config, Callback) ->
  58. replication_test_run(Config, Callback, []).
  59. replication_test_run(Config, Callback, ExtOpts) ->
  60. Module = ?config(module, Config),
  61. {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
  62. epgsql_ct:with_connection(
  63. Config,
  64. fun(C) ->
  65. create_replication_slot(Config, C),
  66. %% new connection because main is in the replication mode
  67. epgsql_ct:with_connection(
  68. Config,
  69. fun(C2) ->
  70. ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
  71. Res = Module:squery(C2, Queries),
  72. ?assertEqual(ExpectedResult, Res)
  73. end),
  74. Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
  75. ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
  76. end,
  77. "epgsql_test_replication",
  78. [{replication, "database"} | ExtOpts]),
  79. %% cleanup
  80. epgsql_ct:with_connection(
  81. Config,
  82. fun(C) -> drop_replication_slot(Config, C) end,
  83. "epgsql_test_replication",
  84. [{replication, "database"}]).
  85. create_replication_slot(Config, Connection) ->
  86. Module = ?config(module, Config),
  87. {ok, Cols, Rows} =
  88. Module:squery(Connection,
  89. "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
  90. ?assertMatch([#column{name = <<"slot_name">>},
  91. #column{name = <<"consistent_point">>},
  92. #column{name = <<"snapshot_name">>},
  93. #column{name = <<"output_plugin">>}
  94. ],
  95. Cols),
  96. ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
  97. drop_replication_slot(Config, Connection) ->
  98. Module = ?config(module, Config),
  99. Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
  100. case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
  101. true -> ?assertMatch({ok, _, _}, Result);
  102. false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
  103. end.
  104. gen_query_and_replication_msgs(Ids) ->
  105. QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
  106. QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
  107. RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
  108. RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
  109. LongBin = base64:encode(crypto:strong_rand_bytes(254)),
  110. lists:foldl(
  111. fun(Id, {Qs, RMs}) ->
  112. QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
  113. QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
  114. RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
  115. RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
  116. {Qs ++ [QIns, QDel], RMs ++ [RmIns, RmDel]}
  117. end,
  118. {[], []},
  119. Ids).
  120. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
  121. receive
  122. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
  123. receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
  124. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
  125. ensure_no_socket_passive_msgs(Module, Pid),
  126. case lists:reverse(ReceivedMsgs) of
  127. [begin_msg, row_msg | _] -> ok;
  128. _ -> error_replication_messages_not_received
  129. end;
  130. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
  131. [Msg | T] = Pattern,
  132. receive_replication_msgs(Module, T, Pid, [row_msg | ReceivedMsgs]);
  133. {epgsql, Pid, socket_passive} ->
  134. Module:activate(Pid),
  135. receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
  136. after
  137. 60000 ->
  138. error_timeout
  139. end.
  140. ensure_no_socket_passive_msgs(Module, Pid) ->
  141. receive
  142. {epgsql, Pid, socket_passive} ->
  143. Module:activate(Pid),
  144. ensure_no_socket_passive_msgs(Module, Pid)
  145. after
  146. 100 ->
  147. ok
  148. end.
  149. handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
  150. {C, Pid} = CbState,
  151. Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
  152. {ok, EndLSN, EndLSN, CbState}.