%% epgsql_replication_SUITE.erl (5.7 KB)

  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([ all/0
  6. , init_per_suite/1
  7. , end_per_suite/1
  8. , connect_in_repl_mode/1
  9. , create_drop_replication_slot/1
  10. , replication_sync/1
  11. , replication_async/1
  12. , replication_async_active_n_socket/1
  13. , replication_sync_active_n_socket/1
  14. %% Callbacks
  15. , handle_x_log_data/4
  16. , handle_socket_passive/1
  17. ]).
  %% Suite setup: prepends `{module, epgsql}' to the suite configuration.
  %% The helpers in this suite read it back with ?config(module, Config) and
  %% call the driver through that module name.
  init_per_suite(Config) ->
      ModuleEntry = {module, epgsql},
      [ModuleEntry | Config].
  %% Suite teardown: the configuration is ignored and `ok' is returned.
  end_per_suite(_) ->
      ok.
  %% Returns the names of the test cases in this suite.
  all() ->
      [
          connect_in_repl_mode,
          create_drop_replication_slot,
          replication_async,
          replication_sync,
          replication_async_active_n_socket,
          replication_sync_active_n_socket
      ].
  %% Test case: opens a connection with the `{replication, "database"}' option
  %% against database "epgsql_test_db1" via epgsql_ct:connect_only/2.
  connect_in_repl_mode(Config) ->
      %% The same string is passed twice - presumably as user name and
      %% password; confirm against epgsql_ct:connect_only/2.
      ReplName = "epgsql_test_replication",
      ConnOpts = [{database, "epgsql_test_db1"}, {replication, "database"}],
      epgsql_ct:connect_only(Config, [ReplName, ReplName, ConnOpts]).
  %% Test case: on a single replication-mode connection, creates the test
  %% replication slot and then drops it again.
  create_drop_replication_slot(Config) ->
      CreateThenDrop =
          fun(Conn) ->
              create_replication_slot(Config, Conn),
              drop_replication_slot(Config, Conn)
          end,
      epgsql_ct:with_connection(
          Config,
          CreateThenDrop,
          "epgsql_test_replication",
          [{replication, "database"}]).
  %% Test case: runs the replication scenario with the test process pid as the
  %% callback argument, so replication data is expected as messages to self().
  replication_async(Config) ->
      Subscriber = self(),
      replication_test_run(Config, Subscriber).
  %% Test case: runs the replication scenario with this module as the callback
  %% argument (see handle_x_log_data/4 and handle_socket_passive/1).
  replication_sync(Config) ->
      CallbackModule = ?MODULE,
      replication_test_run(Config, CallbackModule).
  %% Test case: same as replication_async/1, but the connection is opened with
  %% `{active, 1}' in both tcp_opts and ssl_opts.
  replication_async_active_n_socket(Config) ->
      ActiveN = [{active, 1}],
      ExtOpts = [{tcp_opts, ActiveN}, {ssl_opts, ActiveN}],
      replication_test_run(Config, self(), ExtOpts).
  %% Test case: same as replication_sync/1, but the connection is opened with
  %% `{active, 1}' in both tcp_opts and ssl_opts.
  replication_sync_active_n_socket(Config) ->
      ActiveN = [{active, 1}],
      ExtOpts = [{tcp_opts, ActiveN}, {ssl_opts, ActiveN}],
      replication_test_run(Config, ?MODULE, ExtOpts).
  %% Runs the replication scenario with no extra connection options.
  replication_test_run(Config, Callback) ->
      NoExtraOpts = [],
      replication_test_run(Config, Callback, NoExtraOpts).
  %% Runs the full replication scenario:
  %%   1. opens a replication-mode connection (with ExtOpts appended to the
  %%      connection options) and creates the "epgsql_test" slot;
  %%   2. on a second, regular connection executes the generated INSERT/DELETE
  %%      statements and checks that each one reports `{ok, 1}';
  %%   3. starts replication on the first connection with Callback and
  %%      `{C, self()}' as callback state, then waits for the expected
  %%      replication messages;
  %%   4. drops the slot on a fresh replication-mode connection.
  %%
  %% Config   - Common Test config; must contain `module'.
  %% Callback - pid or callback module handed to start_replication/5.
  %% ExtOpts  - extra connection options for the replication connection.
  replication_test_run(Config, Callback, ExtOpts) ->
      Module = ?config(module, Config),
      {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
      epgsql_ct:with_connection(
          Config,
          fun(C) ->
              create_replication_slot(Config, C),
              %% new connection because main is in the replication mode
              epgsql_ct:with_connection(
                  Config,
                  fun(C2) ->
                      ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
                      Res = Module:squery(C2, lists:flatten(Queries)),
                      ?assertEqual(ExpectedResult, Res)
                  end),
              %% Fail right away if replication could not be started, instead
              %% of discarding the error and only noticing it through the
              %% receive timeout below.
              ?assertEqual(
                  ok,
                  Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0")),
              ?assertEqual(
                  ok,
                  receive_replication_msgs(Module, ReplicationMsgs, C, []))
          end,
          "epgsql_test_replication",
          [{replication, "database"} | ExtOpts]),
      %% cleanup
      epgsql_ct:with_connection(
          Config,
          fun(C) -> drop_replication_slot(Config, C) end,
          "epgsql_test_replication",
          [{replication, "database"}]).
  %% Creates the logical replication slot "epgsql_test" with the
  %% "test_decoding" output plugin on Connection and asserts the shape of the
  %% reply (column names and the single result row).
  %%
  %% Config     - Common Test config; must contain `module'.
  %% Connection - connection opened in replication mode.
  create_replication_slot(Config, Connection) ->
      Module = ?config(module, Config),
      %% The command is written as one literal. The previous spelling relied on
      %% adjacent string literals without whitespace ("..." "..." glued
      %% together), which evaluates to exactly this text but is rejected by
      %% the compiler from OTP 27 on.
      {ok, Cols, Rows} =
          Module:squery(Connection,
                        "CREATE_REPLICATION_SLOT epgsql_test LOGICAL test_decoding"),
      ?assertMatch([ #column{name = <<"slot_name">>}
                   , #column{name = <<"consistent_point">>}
                   , #column{name = <<"snapshot_name">>}
                   , #column{name = <<"output_plugin">>}
                   ],
                   Cols),
      ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
  %% Drops the "epgsql_test" replication slot on Connection and asserts the
  %% reply shape, which depends on the server version stored under
  %% `pg_config' / `version' in Config: a single `{ok, _, _}' for versions
  %% >= [13, 0], a list of two such results otherwise.
  %%
  %% Config     - Common Test config; must contain `module' and `pg_config'.
  %% Connection - connection opened in replication mode.
  drop_replication_slot(Config, Connection) ->
      Module = ?config(module, Config),
      %% Written as one literal: the previous adjacent-literal spelling
      %% evaluates to exactly this text but no longer compiles on OTP 27+.
      Result = Module:squery(Connection, "DROP_REPLICATION_SLOT epgsql_test"),
      case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
          true -> ?assertMatch({ok, _, _}, Result);
          false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
      end.
  %% Builds the test workload for the given row ids.
  %%
  %% Returns `{Queries, ReplicationMsgs}' where, for every id in Ids (in
  %% order), Queries holds an INSERT followed by a DELETE statement (flat
  %% strings) and ReplicationMsgs holds the matching textual change
  %% descriptions (binaries) the test compares replication data against.
  %% All inserted rows share one random base64-encoded value.
  gen_query_and_replication_msgs(Ids) ->
      QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
      QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
      RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
      RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
      LongBin = base64:encode(crypto:strong_rand_bytes(254)),
      %% Comprehensions emit the insert/delete pair per id in order, avoiding
      %% the repeated `Acc ++ [...]' copying of a fold accumulator.
      Queries =
          [lists:flatten(io_lib:format(Fmt, Args))
           || Id <- Ids,
              {Fmt, Args} <- [{QInsFmt, [Id, LongBin]}, {QDelFmt, [Id]}]],
      ReplicationMsgs =
          [iolist_to_binary(io_lib:format(Fmt, Args))
           || Id <- Ids,
              {Fmt, Args} <- [{RmInsFmt, [Id, LongBin]}, {RmDelFmt, [Id]}]],
      {Queries, ReplicationMsgs}.
  %% Consumes `{epgsql, Pid, ...}' messages from the calling process' mailbox
  %% and checks them against the expected replication messages.
  %%
  %%   * data starting with "BEGIN" records `begin_msg';
  %%   * any other data must equal the head of Pattern (badmatch otherwise)
  %%     and records `row_msg';
  %%   * `socket_passive' re-activates the socket through Module:activate/1;
  %%   * data starting with "COMMIT" ends the loop: pending `socket_passive'
  %%     notifications are drained, then the result is `ok' if the recorded
  %%     sequence starts with begin_msg, row_msg, and
  %%     `error_replication_messages_not_received' otherwise.
  %%
  %% Returns `error_timeout' if no matching message arrives within 60 seconds.
  %% ReceivedMsgs is the accumulator (newest first); callers start with [].
  receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
      receive
          {epgsql, Pid, socket_passive} ->
              Module:activate(Pid),
              receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs);
          {epgsql, Pid, {x_log_data, _, _, <<"BEGIN", _/binary>>}} ->
              receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
          {epgsql, Pid, {x_log_data, _, _, <<"COMMIT", _/binary>>}} ->
              ensure_no_socket_passive_msgs(Module, Pid),
              InArrivalOrder = lists:reverse(ReceivedMsgs),
              case lists:prefix([begin_msg, row_msg], InArrivalOrder) of
                  true -> ok;
                  false -> error_replication_messages_not_received
              end;
          {epgsql, Pid, {x_log_data, _, _, Data}} ->
              %% Data is already bound, so this match asserts that it is the
              %% next expected message.
              [Data | StillExpected] = Pattern,
              receive_replication_msgs(Module, StillExpected, Pid, [row_msg | ReceivedMsgs])
      after 60000 ->
          error_timeout
      end.
  %% Drains `{epgsql, Pid, socket_passive}' messages from the mailbox, calling
  %% Module:activate(Pid) for each one. Returns `ok' once no such message has
  %% arrived for 100 ms.
  ensure_no_socket_passive_msgs(Module, Pid) ->
      receive
          {epgsql, Pid, socket_passive} ->
              Module:activate(Pid),
              ensure_no_socket_passive_msgs(Module, Pid)
      after 100 ->
          ok
      end.
  %% Callback used when this module is passed as the replication callback.
  %% CbState is `{C, Pid}': the data is forwarded to Pid in the same
  %% `{epgsql, C, {x_log_data, StartLSN, EndLSN, Data}}' shape that
  %% receive_replication_msgs/4 matches on.
  %% Returns `{ok, EndLSN, EndLSN, CbState}'.
  %% NOTE(review): the two EndLSN values are presumably reported back as the
  %% flushed/applied positions - confirm against the epgsql callback contract.
  handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
      {Conn, Subscriber} = CbState,
      Notification = {epgsql, Conn, {x_log_data, StartLSN, EndLSN, Data}},
      Subscriber ! Notification,
      {ok, EndLSN, EndLSN, CbState}.
  %% Callback used when this module is passed as the replication callback:
  %% re-activates the connection C from a separate, unlinked process and
  %% returns `{ok, CbState}' unchanged.
  %% NOTE(review): activation is presumably done out of process because this
  %% callback runs inside the connection process - confirm.
  handle_socket_passive({C, _Pid} = CbState) ->
      Reactivate = fun() -> epgsql:activate(C) end,
      spawn(Reactivate),
      {ok, CbState}.