epgsql_replication_SUITE.erl 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. -module(epgsql_replication_SUITE).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -include_lib("common_test/include/ct.hrl").
  4. -include("epgsql.hrl").
  5. -export([
  6. init_per_suite/1,
  7. all/0,
  8. end_per_suite/1,
  9. connect_in_repl_mode/1,
  10. create_drop_replication_slot/1,
  11. replication_sync/1,
  12. replication_async/1,
  13. %% Callbacks
  14. handle_x_log_data/4
  15. ]).
  16. init_per_suite(Config) ->
  17. [{module, epgsql}|Config].
  18. end_per_suite(_Config) ->
  19. ok.
  20. all() ->
  21. [
  22. connect_in_repl_mode,
  23. create_drop_replication_slot,
  24. replication_async,
  25. replication_sync
  26. ].
  27. connect_in_repl_mode(Config) ->
  28. epgsql_ct:connect_only(Config, ["epgsql_test_replication",
  29. "epgsql_test_replication",
  30. [{database, "epgsql_test_db1"}, {replication, "database"}]]).
  31. create_drop_replication_slot(Config) ->
  32. Module = ?config(module, Config),
  33. epgsql_ct:with_connection(
  34. Config,
  35. fun(C) ->
  36. {ok, Cols, Rows} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
  37. [#column{name = <<"slot_name">>}, #column{name = <<"consistent_point">>},
  38. #column{name = <<"snapshot_name">>}, #column{name = <<"output_plugin">>}] = Cols,
  39. [{<<"epgsql_test">>, _, _, <<"test_decoding">>}] = Rows,
  40. [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
  41. end,
  42. "epgsql_test_replication",
  43. [{replication, "database"}]).
  44. replication_async(Config) ->
  45. replication_test_run(Config, self()).
  46. replication_sync(Config) ->
  47. replication_test_run(Config, ?MODULE).
  48. replication_test_run(Config, Callback) ->
  49. Module = ?config(module, Config),
  50. epgsql_ct:with_connection(
  51. Config,
  52. fun(C) ->
  53. {ok, _, _} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
  54. %% new connection because main id in a replication mode
  55. epgsql_ct:with_connection(
  56. Config,
  57. fun(C2) ->
  58. [{ok, 1},{ok, 1}] = Module:squery(C2,
  59. "insert into test_table1 (id, value) values (5, 'five');delete from test_table1 where id = 5;")
  60. end,
  61. "epgsql_test_db1"),
  62. Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
  63. ok = receive_replication_msgs(
  64. [<<"table public.test_table1: INSERT: id[integer]:5 value[text]:'five'">>,
  65. <<"table public.test_table1: DELETE: id[integer]:5">>], C, [])
  66. end,
  67. "epgsql_test_replication",
  68. [{replication, "database"}]),
  69. %% cleanup
  70. epgsql_ct:with_connection(
  71. Config,
  72. fun(C) ->
  73. [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
  74. end,
  75. "epgsql_test_replication",
  76. [{replication, "database"}]).
  77. receive_replication_msgs(Pattern, Pid, ReceivedMsgs) ->
  78. receive
  79. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
  80. receive_replication_msgs(Pattern, Pid, [begin_msg | ReceivedMsgs]);
  81. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
  82. case lists:reverse(ReceivedMsgs) of
  83. [begin_msg, row_msg | _] -> ok;
  84. _ -> error_replication_messages_not_received
  85. end;
  86. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
  87. [Msg | T] = Pattern,
  88. receive_replication_msgs(T, Pid, [row_msg | ReceivedMsgs])
  89. after
  90. 60000 ->
  91. error_timeout
  92. end.
  93. handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
  94. {C, Pid} = CbState,
  95. Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
  96. {ok, EndLSN, EndLSN, CbState}.