epgsql_pool_SUITE.erl 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. -module(epgsql_pool_SUITE).
  2. %% test needs connection to database
  3. %% and database should be inited with ./testdb_schema.sql
  4. -include("epgsql_pool.hrl").
  5. -include_lib("common_test/include/ct.hrl").
  6. -include_lib("eunit/include/eunit.hrl").
  7. -export([all/0,
  8. init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2,
  9. start_stop_test/1, equery_test/1, transaction_test/1, reconnect_test/1
  10. ]).
  11. -define(SELECT_ITEMS_QUERY, "SELECT id, category_id, title, num FROM item ORDER by id ASC").
  12. all() ->
  13. [start_stop_test,
  14. equery_test,
  15. transaction_test,
  16. reconnect_test
  17. ].
  18. init_per_suite(Config) ->
  19. application:ensure_all_started(epgsql_pool),
  20. Config.
  21. end_per_suite(Config) ->
  22. application:stop(epgsql_pool),
  23. Config.
  24. init_per_testcase(_, Config) ->
  25. Params = #epgsql_connection_params{host = "localhost", port = 5432, username = "test", password = "test", database = "testdb"},
  26. {ok, Connection} = epgsql_pool_utils:open_connection(Params),
  27. #epgsql_connection{sock = Sock} = Connection,
  28. epgsql:equery(Sock, "TRUNCATE TABLE item"),
  29. epgsql:equery(Sock, "TRUNCATE TABLE category CASCADE"),
  30. [{connection, Connection} | proplists:delete(connection, Config)].
  31. end_per_testcase(_, Config) ->
  32. Connection = proplists:get_value(connection, Config),
  33. Connection2 = epgsql_pool_utils:close_connection(Connection),
  34. #epgsql_connection{sock = undefined} = Connection2,
  35. [{connection, Connection2} | proplists:delete(connection, Config)].
  36. start_stop_test(Config) ->
  37. Params = #epgsql_connection_params{host = "localhost", port = 5432,
  38. username="test", password="test",
  39. database="testdb"},
  40. epgsql_pool_settings:set_connection_params(my_pool, Params),
  41. {ok, _} = epgsql_pool:start(my_pool, 5, 10),
  42. ok = epgsql_pool:stop(my_pool),
  43. ok.
  44. equery_test(Config) ->
  45. Connection = proplists:get_value(connection, Config),
  46. epgsql_pool_settings:set_connection_params(my_pool, Connection#epgsql_connection.params),
  47. {ok, _} = epgsql_pool:start(my_pool, 5, 10),
  48. {ok, 3, _, Ids} = epgsql_pool:equery(my_pool,
  49. "INSERT INTO category (title) "
  50. "VALUES ('cat 1'), ('cat 2'), ('cat 3') "
  51. "RETURNING id"),
  52. WaitForRows = lists:map(fun({{Id}, Title}) -> {Id, Title} end,
  53. lists:zip(Ids, [<<"cat 1">>, <<"cat 2">>, <<"cat 3">>])),
  54. {ok, _, Rows} = epgsql_pool:equery(my_pool, "SELECT id, title FROM category ORDER by id ASC"),
  55. ct:pal("Rows ~p", [Rows]),
  56. ?assertEqual(WaitForRows, Rows),
  57. ok = epgsql_pool:stop(my_pool),
  58. ok.
  59. transaction_test(Config) ->
  60. Connection = proplists:get_value(connection, Config),
  61. epgsql_pool_settings:set_connection_params(my_pool, Connection#epgsql_connection.params),
  62. {ok, _} = epgsql_pool:start(my_pool, 5, 10),
  63. {FirstCatId, CatIds2, ItemIds2} =
  64. epgsql_pool:transaction(my_pool,
  65. fun(Worker) ->
  66. ct:pal("worker:~p", [Worker]),
  67. {ok, 3, _, CatIds0} =
  68. epgsql_pool:equery(Worker,
  69. "INSERT INTO category (title) "
  70. "VALUES ('cat 4'), ('cat 5'), ('cat 6') "
  71. "RETURNING id"),
  72. CatIds1 = lists:map(fun({Cid}) -> Cid end, CatIds0),
  73. CatId = hd(CatIds1),
  74. {ok, 2, _, ItemIds0} =
  75. epgsql_pool:equery(Worker,
  76. "INSERT INTO item (category_id, title, num) "
  77. "VALUES ($1, 'item 1', 5), ($1, 'item 2', 7) "
  78. "RETURNING id", [CatId]),
  79. ItemIds1 = lists:map(fun({Iid}) -> Iid end, ItemIds0),
  80. {CatId, CatIds1, ItemIds1}
  81. end),
  82. WaitForCats = lists:zip(CatIds2, [<<"cat 4">>, <<"cat 5">>, <<"cat 6">>]),
  83. {ok, _, CatRows} = epgsql_pool:equery(my_pool, "SELECT id, title FROM category ORDER by id ASC"),
  84. ct:pal("CatRows ~p", [CatRows]),
  85. ?assertEqual(WaitForCats, CatRows),
  86. WaitForItems = lists:map(fun({ItemId, {Title, Num}}) -> {ItemId, FirstCatId, Title, Num} end,
  87. lists:zip(ItemIds2, [{<<"item 1">>, 5}, {<<"item 2">>, 7}])),
  88. {ok, _, ItemRows} = epgsql_pool:equery(my_pool, ?SELECT_ITEMS_QUERY),
  89. ct:pal("ItemRows ~p", [ItemRows]),
  90. ?assertEqual(WaitForItems, ItemRows),
  91. try
  92. epgsql_pool:transaction(my_pool,
  93. fun(Worker) ->
  94. ct:pal("worker:~p", [Worker]),
  95. {ok, 2} =
  96. epgsql_pool:equery(Worker,
  97. "INSERT INTO item (category_id, title, num) "
  98. "VALUES ($1, 'item 3', 55), ($1, 'item 4', 77) ",
  99. [FirstCatId]),
  100. {ok, _, ItemRows2} = epgsql_pool:equery(Worker, ?SELECT_ITEMS_QUERY),
  101. ct:pal("ItemRows2 ~p", [ItemRows2]),
  102. ?assertMatch([{_, FirstCatId, <<"item 1">>, 5},
  103. {_, FirstCatId, <<"item 2">>, 7},
  104. {_, FirstCatId, <<"item 3">>, 55},
  105. {_, FirstCatId, <<"item 4">>, 77}],
  106. ItemRows2),
  107. throw(cancel_transaction)
  108. end)
  109. catch
  110. throw:cancel_transaction -> ok
  111. end,
  112. %% items not changes after calcelled transaction
  113. {ok, _, ItemRows} = epgsql_pool:equery(my_pool, ?SELECT_ITEMS_QUERY),
  114. ok = epgsql_pool:stop(my_pool),
  115. ok.
  116. reconnect_test(Config) ->
  117. Connection = proplists:get_value(connection, Config),
  118. epgsql_pool_settings:set_connection_params(my_pool, Connection#epgsql_connection.params),
  119. {ok, _} = epgsql_pool:start(my_pool, 5, 10),
  120. Worker = pooler:take_member(my_pool, 1000) ,
  121. {state, my_pool, #epgsql_connection{sock = Sock1}} = sys:get_state(Worker),
  122. ct:pal("Worker: ~p, sock: ~p", [Worker, Sock1]),
  123. R1 = epgsql_pool:equery(Worker, ?SELECT_ITEMS_QUERY),
  124. ct:pal("first query ~p", [R1]),
  125. {ok, _, []} = R1,
  126. ct:pal("~p close_connection", [Sock1]),
  127. exit(Sock1, close_connection),
  128. R2 = epgsql_pool:equery(Worker, "select * from item"),
  129. ct:pal("second query goes immediatelly ~p", [R2]),
  130. {error, reconnecting} = R2,
  131. timer:sleep(50),
  132. R3 = epgsql_pool:equery(Worker, "select * from item"),
  133. ct:pal("third query goes after 50 ms ~p", [R3]),
  134. {error, reconnecting} = R3,
  135. timer:sleep(150),
  136. R4 = epgsql_pool:equery(Worker, "select * from item"),
  137. ct:pal("fouth query goes after 200 ms ~p", [R4]),
  138. {ok, _, []} = R4,
  139. {state, my_pool, #epgsql_connection{sock = Sock2}} = sys:get_state(Worker),
  140. ct:pal("Worker: ~p, sock: ~p", [Worker, Sock2]),
  141. ?assertNotEqual(Sock1, Sock2),
  142. ok = epgsql_pool:stop(my_pool),
  143. ok.