epgsql_pool_SUITE.erl 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. -module(epgsql_pool_SUITE).
  2. %% test needs connection to database
  3. %% and database should be inited with ./testdb_schema.sql
  4. -include("epgsql_pool.hrl").
  5. -include_lib("common_test/include/ct.hrl").
  6. -include_lib("eunit/include/eunit.hrl").
  7. -export([all/0,
  8. init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2,
  9. equery_test/1, transaction_test/1, reconnect_test/1
  10. ]).
  11. -define(SELECT_ITEMS_QUERY, "SELECT id, category_id, title, num FROM item ORDER by id ASC").
  12. all() ->
  13. [equery_test,
  14. transaction_test,
  15. reconnect_test
  16. ].
  17. init_per_suite(Config) ->
  18. application:ensure_all_started(epgsql_pool),
  19. Config.
  20. end_per_suite(Config) ->
  21. application:stop(epgsql_pool),
  22. Config.
  23. init_per_testcase(_, Config) ->
  24. Params = #epgsql_connection_params{host = "localhost", port = 5432, username = "test", password = "test", database = "testdb"},
  25. epgsql_pool_settings:set_connection_params(my_pool, Params),
  26. {ok, _} = epgsql_pool:start(my_pool, 5, 10),
  27. epgsql_pool:equery(my_pool, "TRUNCATE TABLE item"),
  28. epgsql_pool:equery(my_pool, "TRUNCATE TABLE category CASCADE"),
  29. Config.
  30. end_per_testcase(_, Config) ->
  31. ok = epgsql_pool:stop(my_pool),
  32. Config.
  33. equery_test(Config) ->
  34. {ok, 3, _, Ids} = epgsql_pool:equery(my_pool,
  35. "INSERT INTO category (title) "
  36. "VALUES ('cat 1'), ('cat 2'), ('cat 3') "
  37. "RETURNING id"),
  38. WaitForRows = lists:map(fun({{Id}, Title}) -> {Id, Title} end,
  39. lists:zip(Ids, [<<"cat 1">>, <<"cat 2">>, <<"cat 3">>])),
  40. {ok, _, Rows} = epgsql_pool:equery(my_pool, "SELECT id, title FROM category ORDER by id ASC"),
  41. ct:pal("Rows ~p", [Rows]),
  42. ?assertEqual(WaitForRows, Rows),
  43. ok.
  44. transaction_test(Config) ->
  45. {FirstCatId, CatIds2, ItemIds2} =
  46. epgsql_pool:transaction(my_pool,
  47. fun(Worker) ->
  48. ct:pal("worker:~p", [Worker]),
  49. {ok, 3, _, CatIds0} =
  50. epgsql_pool:equery(Worker,
  51. "INSERT INTO category (title) "
  52. "VALUES ('cat 4'), ('cat 5'), ('cat 6') "
  53. "RETURNING id"),
  54. CatIds1 = lists:map(fun({Cid}) -> Cid end, CatIds0),
  55. CatId = hd(CatIds1),
  56. {ok, 2, _, ItemIds0} =
  57. epgsql_pool:equery(Worker,
  58. "INSERT INTO item (category_id, title, num) "
  59. "VALUES ($1, 'item 1', 5), ($1, 'item 2', 7) "
  60. "RETURNING id", [CatId]),
  61. ItemIds1 = lists:map(fun({Iid}) -> Iid end, ItemIds0),
  62. {CatId, CatIds1, ItemIds1}
  63. end),
  64. WaitForCats = lists:zip(CatIds2, [<<"cat 4">>, <<"cat 5">>, <<"cat 6">>]),
  65. {ok, _, CatRows} = epgsql_pool:equery(my_pool, "SELECT id, title FROM category ORDER by id ASC"),
  66. ct:pal("CatRows ~p", [CatRows]),
  67. ?assertEqual(WaitForCats, CatRows),
  68. WaitForItems = lists:map(fun({ItemId, {Title, Num}}) -> {ItemId, FirstCatId, Title, Num} end,
  69. lists:zip(ItemIds2, [{<<"item 1">>, 5}, {<<"item 2">>, 7}])),
  70. {ok, _, ItemRows} = epgsql_pool:equery(my_pool, ?SELECT_ITEMS_QUERY),
  71. ct:pal("ItemRows ~p", [ItemRows]),
  72. ?assertEqual(WaitForItems, ItemRows),
  73. try
  74. epgsql_pool:transaction(my_pool,
  75. fun(Worker) ->
  76. ct:pal("worker:~p", [Worker]),
  77. {ok, 2} =
  78. epgsql_pool:equery(Worker,
  79. "INSERT INTO item (category_id, title, num) "
  80. "VALUES ($1, 'item 3', 55), ($1, 'item 4', 77) ",
  81. [FirstCatId]),
  82. {ok, _, ItemRows2} = epgsql_pool:equery(Worker, ?SELECT_ITEMS_QUERY),
  83. ct:pal("ItemRows2 ~p", [ItemRows2]),
  84. ?assertMatch([{_, FirstCatId, <<"item 1">>, 5},
  85. {_, FirstCatId, <<"item 2">>, 7},
  86. {_, FirstCatId, <<"item 3">>, 55},
  87. {_, FirstCatId, <<"item 4">>, 77}],
  88. ItemRows2),
  89. throw(cancel_transaction)
  90. end)
  91. catch
  92. throw:cancel_transaction -> ok
  93. end,
  94. %% items not changes after calcelled transaction
  95. {ok, _, ItemRows} = epgsql_pool:equery(my_pool, ?SELECT_ITEMS_QUERY),
  96. ok.
  97. reconnect_test(Config) ->
  98. Worker = pooler:take_member(my_pool, 1000) ,
  99. {state, my_pool, #epgsql_connection{sock = Sock1}} = sys:get_state(Worker),
  100. ct:pal("Worker: ~p, sock: ~p", [Worker, Sock1]),
  101. R1 = epgsql_pool:equery(Worker, ?SELECT_ITEMS_QUERY),
  102. ct:pal("first query ~p", [R1]),
  103. {ok, _, []} = R1,
  104. ct:pal("~p close_connection", [Sock1]),
  105. exit(Sock1, close_connection),
  106. R2 = epgsql_pool:equery(Worker, "select * from item"),
  107. ct:pal("second query goes immediatelly ~p", [R2]),
  108. {error, reconnecting} = R2,
  109. timer:sleep(50),
  110. R3 = epgsql_pool:equery(Worker, "select * from item"),
  111. ct:pal("third query goes after 50 ms ~p", [R3]),
  112. {error, reconnecting} = R3,
  113. timer:sleep(150),
  114. R4 = epgsql_pool:equery(Worker, "select * from item"),
  115. ct:pal("fouth query goes after 200 ms ~p", [R4]),
  116. {ok, _, []} = R4,
  117. {state, my_pool, #epgsql_connection{sock = Sock2}} = sys:get_state(Worker),
  118. ct:pal("Worker: ~p, sock: ~p", [Worker, Sock2]),
  119. ?assertNotEqual(Sock1, Sock2),
  120. ok.