epgsql_replication_SUITE.erl 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([all/0,
  6. init_per_suite/1,
  7. end_per_suite/1,
  8. connect_in_repl_mode/1,
  9. create_drop_replication_slot/1,
  10. replication_sync/1,
  11. replication_async/1,
  12. replication_async_active_n_socket/1,
  13. replication_sync_active_n_socket/1,
  14. %% Callbacks
  15. handle_x_log_data/4
  16. ]).
  %% Suite setup: prepends a `module' entry (naming `epgsql') to the Common
  %% Test configuration so the test cases can look up which module to call.
  init_per_suite(Config) ->
      ModuleEntry = {module, epgsql},
      [ModuleEntry | Config].
  %% Suite teardown: nothing was allocated in init_per_suite/1, so the
  %% configuration is ignored and `ok' is returned.
  end_per_suite(_) ->
      ok.
  %% Returns the names of the test cases in this suite, in the order listed.
  all() ->
      SlotCases = [connect_in_repl_mode, create_drop_replication_slot],
      StreamingCases = [replication_async, replication_sync],
      ActiveNCases = [replication_async_active_n_socket,
                      replication_sync_active_n_socket],
      SlotCases ++ StreamingCases ++ ActiveNCases.
  %% Test case: delegates to epgsql_ct:connect_only/2 with connection
  %% arguments that select database "epgsql_test_db1" and set the
  %% `{replication, "database"}' option.
  connect_in_repl_mode(Config) ->
      %% The same string is passed twice; presumably user name and password --
      %% confirm against epgsql_ct:connect_only/2.
      Name = "epgsql_test_replication",
      ConnOpts = [{database, "epgsql_test_db1"}, {replication, "database"}],
      epgsql_ct:connect_only(Config, [Name, Name, ConnOpts]).
  %% Test case: on a single connection opened with `{replication, "database"}',
  %% creates the test replication slot and then drops it again.
  create_drop_replication_slot(Config) ->
      CreateThenDrop =
          fun(Conn) ->
              create_replication_slot(Config, Conn),
              drop_replication_slot(Config, Conn)
          end,
      ConnOpts = [{replication, "database"}],
      epgsql_ct:with_connection(Config, CreateThenDrop, "epgsql_test_replication", ConnOpts).
  %% Test case: runs the replication scenario with the calling process's pid
  %% as the replication callback.
  replication_async(Config) ->
      Receiver = self(),
      replication_test_run(Config, Receiver).
  %% Test case: runs the replication scenario with this module as the
  %% replication callback (see handle_x_log_data/4).
  replication_sync(Config) ->
      CallbackModule = ?MODULE,
      replication_test_run(Config, CallbackModule).
  %% Test case: same as the pid-callback scenario, but the replication
  %% connection is additionally opened with `{socket_active, 1}'.
  replication_async_active_n_socket(Config) ->
      Receiver = self(),
      ExtraOpts = [{socket_active, 1}],
      replication_test_run(Config, Receiver, ExtraOpts).
  %% Test case: same as the module-callback scenario, but the replication
  %% connection is additionally opened with `{socket_active, 1}'.
  replication_sync_active_n_socket(Config) ->
      CallbackModule = ?MODULE,
      ExtraOpts = [{socket_active, 1}],
      replication_test_run(Config, CallbackModule, ExtraOpts).
  %% Runs the replication scenario with no extra connection options.
  replication_test_run(Config, Callback) ->
      NoExtraOpts = [],
      replication_test_run(Config, Callback, NoExtraOpts).
  %% Runs one end-to-end replication scenario:
  %%   1. creates the "epgsql_test" replication slot on a replication connection,
  %%   2. executes generated INSERT/DELETE statements on a second connection,
  %%   3. starts replication on the first connection and waits for the
  %%      corresponding messages,
  %%   4. drops the slot on a fresh connection.
  %%
  %% Config   - Common Test configuration; must contain the `module' entry.
  %% Callback - passed unchanged to Module:start_replication/5; the test cases
  %%            in this suite pass either self() or ?MODULE.
  %% ExtOpts  - extra connection options appended after `{replication, "database"}'
  %%            for the streaming connection only.
  replication_test_run(Config, Callback, ExtOpts) ->
      Module = ?config(module, Config),
      {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
      epgsql_ct:with_connection(
          Config,
          fun(C) ->
              create_replication_slot(Config, C),
              %% new connection because main is in the replication mode
              epgsql_ct:with_connection(
                  Config,
                  fun(C2) ->
                      ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
                      Res = Module:squery(C2, Queries),
                      ?assertEqual(ExpectedResult, Res)
                  end),
              %% Fail immediately if streaming could not be started, instead of
              %% discarding the result and only noticing through the receive
              %% timeout below.
              ?assertEqual(ok,
                           Module:start_replication(C, "epgsql_test", Callback,
                                                    {C, self()}, "0/0")),
              ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
          end,
          "epgsql_test_replication",
          [{replication, "database"} | ExtOpts]),
      %% cleanup
      epgsql_ct:with_connection(
          Config,
          fun(C) -> drop_replication_slot(Config, C) end,
          "epgsql_test_replication",
          [{replication, "database"}]).
  %% Creates the logical replication slot "epgsql_test" with the
  %% "test_decoding" output plugin on Connection and asserts the shape of the
  %% reply: four columns (slot_name, consistent_point, snapshot_name,
  %% output_plugin) and a single row naming the slot and the plugin.
  %%
  %% Config     - Common Test configuration; must contain the `module' entry.
  %% Connection - connection handle passed as the first argument to Module:squery/2.
  create_replication_slot(Config, Connection) ->
      Module = ?config(module, Config),
      %% One plain literal. The previous form relied on adjacent string
      %% literals without separating whitespace (""epgsql_test""), which
      %% concatenate to exactly this text (no quotes ever reached the server)
      %% and which newer OTP compilers flag or reject because of
      %% triple-quoted strings.
      Command = "CREATE_REPLICATION_SLOT epgsql_test LOGICAL test_decoding",
      {ok, Cols, Rows} = Module:squery(Connection, Command),
      ?assertMatch([#column{name = <<"slot_name">>},
                    #column{name = <<"consistent_point">>},
                    #column{name = <<"snapshot_name">>},
                    #column{name = <<"output_plugin">>}
                   ],
                   Cols),
      ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
  %% Drops the "epgsql_test" replication slot on Connection and asserts the
  %% shape of the reply, which depends on the server version found under
  %% `pg_config' in Config: a single `{ok, _, _}' for versions >= [13, 0],
  %% a list of two such tuples otherwise.
  %%
  %% Config     - Common Test configuration; must contain `module' and `pg_config'.
  %% Connection - connection handle passed as the first argument to Module:squery/2.
  drop_replication_slot(Config, Connection) ->
      Module = ?config(module, Config),
      %% One plain literal. The previous adjacent-literal form
      %% ("..." "epgsql_test" "") concatenated to exactly this text and is
      %% flagged or rejected by newer OTP compilers.
      Result = Module:squery(Connection, "DROP_REPLICATION_SLOT epgsql_test"),
      case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
          true -> ?assertMatch({ok, _, _}, Result);
          false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
      end.
  %% Builds the SQL statements to execute and the replication messages they
  %% are expected to produce.
  %%
  %% For every Id in Ids (order preserved) an INSERT followed by a DELETE on
  %% test_table1 is generated, together with the matching pair of expected
  %% message texts. All inserts share one random value: the base64 encoding of
  %% 254 random bytes.
  %%
  %% Returns {Queries, ReplicationMsgs} where Queries is a list of flat
  %% strings and ReplicationMsgs a list of binaries, both of length
  %% 2 * length(Ids).
  gen_query_and_replication_msgs(Ids) ->
      QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
      QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
      RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
      RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
      LongBin = base64:encode(crypto:strong_rand_bytes(254)),
      %% One [Insert, Delete] pair per id, concatenated once at the end; this
      %% avoids re-copying the accumulator with `++' on every iteration.
      Queries = lists:append(
                  [[lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
                    lists:flatten(io_lib:format(QDelFmt, [Id]))]
                   || Id <- Ids]),
      ReplicationMsgs = lists:append(
                          [[iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
                            iolist_to_binary(io_lib:format(RmDelFmt, [Id]))]
                           || Id <- Ids]),
      {Queries, ReplicationMsgs}.
  %% Consumes `{epgsql, Pid, ...}' messages from the mailbox until a COMMIT
  %% record arrives or 60 seconds pass without a matching message.
  %%
  %% Module       - module whose activate/1 is called on `socket_passive'.
  %% Pattern      - expected row payloads, in order; each non-BEGIN/COMMIT
  %%                payload must equal the head of this list (badmatch otherwise).
  %% Pid          - second element of the messages to accept.
  %% ReceivedMsgs - accumulator of `begin_msg' / `row_msg' markers, newest first.
  %%
  %% Returns `ok' when, at COMMIT, the first two markers received were
  %% `begin_msg' then `row_msg'; `error_replication_messages_not_received'
  %% otherwise; `error_timeout' on timeout.
  receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
      receive
          {epgsql, Pid, {x_log_data, _, _, <<"BEGIN", _/binary>>}} ->
              Seen = [begin_msg | ReceivedMsgs],
              receive_replication_msgs(Module, Pattern, Pid, Seen);
          {epgsql, Pid, {x_log_data, _, _, <<"COMMIT", _/binary>>}} ->
              %% Drain pending socket_passive notifications before returning.
              ensure_no_socket_passive_msgs(Module, Pid),
              received_order_verdict(lists:reverse(ReceivedMsgs));
          {epgsql, Pid, {x_log_data, _, _, Payload}} ->
              %% Payload is already bound, so this asserts it equals the next
              %% expected message.
              [Payload | Remaining] = Pattern,
              receive_replication_msgs(Module, Remaining, Pid, [row_msg | ReceivedMsgs]);
          {epgsql, Pid, socket_passive} ->
              Module:activate(Pid),
              receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
      after 60000 ->
          error_timeout
      end.

  %% Judges the markers (oldest first) collected up to COMMIT.
  received_order_verdict([begin_msg, row_msg | _]) ->
      ok;
  received_order_verdict(_) ->
      error_replication_messages_not_received.
  %% Handles `{epgsql, Pid, socket_passive}' messages by calling
  %% Module:activate(Pid) for each one, until none arrives within 100 ms;
  %% then returns `ok'.
  ensure_no_socket_passive_msgs(Module, Pid) ->
      GotPassive =
          receive
              {epgsql, Pid, socket_passive} -> true
          after 100 ->
              false
          end,
      case GotPassive of
          false ->
              ok;
          true ->
              Module:activate(Pid),
              ensure_no_socket_passive_msgs(Module, Pid)
      end.
  %% Replication callback used when this module is passed as the callback.
  %% CbState is a `{Connection, Pid}' pair: the record is forwarded to Pid as
  %% `{epgsql, Connection, {x_log_data, StartLSN, EndLSN, Data}}', the same
  %% message shape receive_replication_msgs/4 matches on.
  %%
  %% Returns `{ok, EndLSN, EndLSN, CbState}' with CbState unchanged.
  handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
      {Conn, Receiver} = CbState,
      Record = {x_log_data, StartLSN, EndLSN, Data},
      Receiver ! {epgsql, Conn, Record},
      {ok, EndLSN, EndLSN, CbState}.