epgsql_replication_tests.erl 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. -module(epgsql_replication_tests).
  2. -export([run_tests/0]).
  3. -compile([export_all]).
  4. -include_lib("eunit/include/eunit.hrl").
  5. -include("epgsql.hrl").
  6. connect_in_repl_mode_test(Module) ->
  7. epgsql_tests:connect_only(Module, ["epgsql_test_replication",
  8. "epgsql_test_replication",
  9. [{database, "epgsql_test_db1"}, {replication, "database"}]]).
  10. create_drop_replication_slot_test(Module) ->
  11. epgsql_tests:with_connection(
  12. Module,
  13. fun(C) ->
  14. {ok, Cols, Rows} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
  15. [#column{name = <<"slot_name">>}, #column{name = <<"consistent_point">>},
  16. #column{name = <<"snapshot_name">>}, #column{name = <<"output_plugin">>}] = Cols,
  17. [{<<"epgsql_test">>, _, _, <<"test_decoding">>}] = Rows,
  18. [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
  19. end,
  20. "epgsql_test_replication",
  21. [{replication, "database"}]).
  22. replication_async_test(Module) ->
  23. replication_test_run(Module, self()).
  24. replication_sync_test(Module) ->
  25. replication_test_run(Module, ?MODULE).
  26. %% -- run all tests --
  27. run_tests() ->
  28. Files = filelib:wildcard(filename:dirname(code:which(epgsql_replication_tests))
  29. ++ "/*tests.beam"),
  30. Mods = [list_to_atom(filename:basename(F, ".beam")) || F <- Files],
  31. eunit:test(Mods, []).
  32. all_test_() ->
  33. Tests =
  34. lists:map(
  35. fun({Name, _}) ->
  36. {Name, fun(X) -> ?MODULE:Name(X) end}
  37. end,
  38. lists:filter(
  39. fun({Name, Arity}) ->
  40. case {lists:suffix("_test", atom_to_list(Name)), Arity} of
  41. {true, 1} -> true;
  42. _ -> false
  43. end
  44. end,
  45. ?MODULE:module_info(functions))),
  46. WithModule =
  47. fun(Module) ->
  48. lists:map(
  49. fun({Name, Test}) ->
  50. {lists:flatten(
  51. io_lib:format("~s(~s)", [Name, Module])),
  52. fun() -> Test(Module) end}
  53. end,
  54. Tests)
  55. end,
  56. [WithModule(epgsql)
  57. ].
  58. %% -- internal functions --
  59. replication_test_run(Module, Callback) ->
  60. epgsql_tests:with_connection(
  61. Module,
  62. fun(C) ->
  63. {ok, _, _} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
  64. %% new connection because main id in a replication mode
  65. epgsql_tests:with_connection(
  66. Module,
  67. fun(C2) ->
  68. [{ok, 1},{ok, 1}] = Module:squery(C2,
  69. "insert into test_table1 (id, value) values (5, 'five');delete from test_table1 where id = 5;")
  70. end,
  71. "epgsql_test_db1"),
  72. Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
  73. ok = receive_replication_msgs(
  74. [<<"table public.test_table1: INSERT: id[integer]:5 value[text]:'five'">>,
  75. <<"table public.test_table1: DELETE: id[integer]:5">>], C, [])
  76. end,
  77. "epgsql_test_replication",
  78. [{replication, "database"}]),
  79. %% cleanup
  80. epgsql_tests:with_connection(
  81. Module,
  82. fun(C) ->
  83. [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
  84. end,
  85. "epgsql_test_replication",
  86. [{replication, "database"}]).
  87. receive_replication_msgs(Pattern, Pid, ReceivedMsgs) ->
  88. receive
  89. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
  90. receive_replication_msgs(Pattern, Pid, [begin_msg | ReceivedMsgs]);
  91. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
  92. case lists:reverse(ReceivedMsgs) of
  93. [begin_msg, row_msg | _] -> ok;
  94. _ -> error_replication_messages_not_received
  95. end;
  96. {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
  97. [Msg | T] = Pattern,
  98. receive_replication_msgs(T, Pid, [row_msg | ReceivedMsgs])
  99. after
  100. 60000 ->
  101. error_timeout
  102. end.
  103. handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
  104. {C, Pid} = CbState,
  105. Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
  106. {ok, EndLSN, EndLSN, CbState}.