Browse Source

refactor, optimize query and transaction

Yuriy Zhloba 9 years ago
parent
commit
c8a0e5927b
4 changed files with 109 additions and 100 deletions
  1. 53 47
      src/epgsql_pool.erl
  2. 4 1
      src/epgsql_pool_app.erl
  3. 11 4
      src/epgsql_pool_worker.erl
  4. 41 48
      test/epgsql_pool_SUITE.erl

+ 53 - 47
src/epgsql_pool.erl

@@ -1,7 +1,7 @@
 -module(epgsql_pool).
 -module(epgsql_pool).
 
 
 -export([start/3, stop/1,
 -export([start/3, stop/1,
-         equery/2, equery/3, equery/4,
+         query/2, query/3, query/4,
          transaction/2
          transaction/2
         ]).
         ]).
 
 
@@ -17,11 +17,11 @@
 start(PoolName0, InitCount, MaxCount) ->
 start(PoolName0, InitCount, MaxCount) ->
     PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
     PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
     PoolConfig = [
     PoolConfig = [
-        {name, PoolName},
-        {init_count, InitCount},
-        {max_count, MaxCount},
-        {start_mfa, {epgsql_pool_worker, start_link, [PoolName]}}
-    ],
+                  {name, PoolName},
+                  {init_count, InitCount},
+                  {max_count, MaxCount},
+                  {start_mfa, {epgsql_pool_worker, start_link, [PoolName]}}
+                 ],
     pooler:new_pool(PoolConfig).
     pooler:new_pool(PoolConfig).
 
 
 
 
@@ -30,59 +30,65 @@ stop(PoolName) ->
     pooler:rm_pool(epgsql_pool_utils:pool_name_to_atom(PoolName)).
     pooler:rm_pool(epgsql_pool_utils:pool_name_to_atom(PoolName)).
 
 
 
 
--spec equery(pool_name() | pid(), epgsql:sql_query()) -> epgsql:reply().
-equery(PoolNameOrWorker, Stmt) ->
-    equery(PoolNameOrWorker, Stmt, [], []).
-
-
--spec equery(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()]) -> epgsql:reply().
-equery(PoolNameOrWorker, Stmt, Params) ->
-    equery(PoolNameOrWorker, Stmt, Params, []).
-
-
--spec equery(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()], [proplists:option()]) -> epgsql:reply().
-equery(Worker, Stmt, Params, Options) when is_pid(Worker) ->
-    Timeout = epgsql_pool_settings:get(query_timeout),
-    % TODO process timeout,
-    % try-catch
-    % send cancel
-    % log error
-    % reply to client with error
-    % reconnect
-    % return to pool
-    Res = gen_server:call(Worker, {equery, Stmt, Params}, Timeout),
-    ErrorHandler = proplists:get_value(error_handler, Options),
-    case {ErrorHandler, Res} of
-        {undefined, _} -> Res;
-        {Fun, {error, Error}} -> Fun(Worker, Stmt, Params, Error);
-        _ -> Res
-    end;
-
-equery(PoolName, Stmt, Params, Options) ->
-    transaction(PoolName,
-                fun(Worker) ->
-                        equery(Worker, Stmt, Params, Options)
-                end).
+-spec query(pool_name() | pid(), epgsql:sql_query()) -> epgsql:reply().
+query(PoolNameOrWorker, Stmt) ->
+    query(PoolNameOrWorker, Stmt, [], []).
+
+
+-spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()]) -> epgsql:reply().
+query(PoolNameOrWorker, Stmt, Params) ->
+    query(PoolNameOrWorker, Stmt, Params, []).
+
+
+-spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()], [proplists:option()]) -> epgsql:reply().
+query(Worker, Stmt, Params, Options) when is_pid(Worker) ->
+    Timeout = case proplists:get_value(timeout, Options) of
+                  undefined -> epgsql_pool_settings:get(query_timeout);
+                  V -> V
+              end,
+    error_logger:info_msg("Worker:~p Stmt:~p Params:~p", [Worker, Stmt, Params]), %% TEMP
+    %% TODO process timeout,
+    %% try-catch
+    %% send cancel
+    %% log error
+    %% reply to client with error
+    %% reconnect
+    %% return to pool
+    Res = gen_server:call(Worker, {query, Stmt, Params}, Timeout),
+    Res;
+
+query(PoolName, Stmt, Params, Options) ->
+    case get_worker(PoolName) of
+        {ok, Worker} -> query(Worker, Stmt, Params, Options);
+        {error, Reason} -> {error, Reason}
+    end.
 
 
 
 
 -spec transaction(pool_name(), fun()) -> epgsql:reply() | {error, term()}.
 -spec transaction(pool_name(), fun()) -> epgsql:reply() | {error, term()}.
-transaction(PoolName0, Fun) ->
-    PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
-    Timeout = epgsql_pool_settings:get(pooler_get_worker_timeout),
-    case pooler:take_member(PoolName, Timeout) of
-        Worker when is_pid(Worker) ->
+transaction(PoolName, Fun) ->
+    case get_worker(PoolName) of
+        {ok, Worker} ->
             try
             try
-                equery(Worker, "BEGIN"),
+                gen_server:call(Worker, {query, "BEGIN", []}),
                 Result = Fun(Worker),
                 Result = Fun(Worker),
-                equery(Worker, "COMMIT"),
+                gen_server:call(Worker, {query, "COMMIT", []}),
                 Result
                 Result
             catch
             catch
                 Err:Reason ->
                 Err:Reason ->
-                    equery(Worker, "ROLLBACK"),
+                    gen_server:call(Worker, {query, "ROLLBACK", []}),
                     erlang:raise(Err, Reason, erlang:get_stacktrace())
                     erlang:raise(Err, Reason, erlang:get_stacktrace())
             after
             after
                 pooler:return_member(PoolName, Worker, ok)
                 pooler:return_member(PoolName, Worker, ok)
             end;
             end;
+        {error, Reason} -> {error, Reason}
+    end.
+
+
+get_worker(PoolName0) ->
+    PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
+    Timeout = epgsql_pool_settings:get(pooler_get_worker_timeout),
+    case pooler:take_member(PoolName, Timeout) of
+        Worker when is_pid(Worker) -> {ok, Worker};
         error_no_members ->
         error_no_members ->
             PoolStats = pooler:pool_stats(PoolName),
             PoolStats = pooler:pool_stats(PoolName),
             error_logger:error_msg("Pool ~p overload: ~p", [PoolName, PoolStats]),
             error_logger:error_msg("Pool ~p overload: ~p", [PoolName, PoolStats]),

+ 4 - 1
src/epgsql_pool_app.erl

@@ -27,5 +27,8 @@ test_run() ->
                                        database = "testdb"},
                                        database = "testdb"},
     epgsql_pool_settings:set_connection_params(my_pool, Params),
     epgsql_pool_settings:set_connection_params(my_pool, Params),
     {ok, _} = epgsql_pool:start(my_pool, 1, 2),
     {ok, _} = epgsql_pool:start(my_pool, 1, 2),
-
+    Res1 = epgsql_pool:query(my_pool, "select * from category"),
+    error_logger:info_msg("~p", [Res1]),
+    Res2 = epgsql_pool:query(my_pool, "select * from category where id = $1", [1], [{timeout, 200}]),
+    error_logger:info_msg("~p", [Res2]),
     ok.
     ok.

+ 11 - 4
src/epgsql_pool_worker.erl

@@ -37,19 +37,26 @@ init(PoolName) ->
 
 
 
 
 -spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
 -spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
-handle_call({equery, _, _}, _From, #state{connection = undefined} = State) ->
+handle_call({query, _, _}, _From, #state{connection = undefined} = State) ->
     {reply, {error, no_connection}, State};
     {reply, {error, no_connection}, State};
 
 
-handle_call({equery, _, _}, _From,
+handle_call({query, _, _}, _From,
             #state{connection = #epgsql_connection{sock = undefined}} = State) ->
             #state{connection = #epgsql_connection{sock = undefined}} = State) ->
     {reply, {error, reconnecting}, State};
     {reply, {error, reconnecting}, State};
 
 
-handle_call({equery, Stmt, Params}, _From,
+handle_call({query, Stmt, Params}, _From,
             #state{connection = #epgsql_connection{sock = Sock}} = State) ->
             #state{connection = #epgsql_connection{sock = Sock}} = State) ->
     %% TStart = os:timestamp(),
     %% TStart = os:timestamp(),
+    %% timer:sleep(1000), %% TEMP
     Reply = case process_info(Sock, status) of
     Reply = case process_info(Sock, status) of
                 undefined -> {error, reconnecting};
                 undefined -> {error, reconnecting};
-                {status, _} -> epgsql:equery(Sock, Stmt, Params)
+                {status, _} ->
+                    case Stmt of
+                        "BEGIN" -> epgsql:squery(Sock, Stmt); % skip Stmt parsing
+                        "COMMIT" -> epgsql:squery(Sock, Stmt);
+                        "ROLLBACK" -> epgsql:squery(Sock, Stmt);
+                        _ -> epgsql:equery(Sock, Stmt, Params)
+                    end
             end,
             end,
     %% Time = timer:now_diff(os:timestamp(), TStart),
     %% Time = timer:now_diff(os:timestamp(), TStart),
     {reply, Reply, State};
     {reply, Reply, State};

+ 41 - 48
test/epgsql_pool_SUITE.erl

@@ -11,17 +11,17 @@
 
 
 -export([all/0,
 -export([all/0,
          init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2,
          init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2,
-         equery_test/1, transaction_test/1, reconnect_test/1, error_handler_test/1
+         query_test/1, transaction_test/1, reconnect_test/1, timeout_test/1
         ]).
         ]).
 
 
 -define(SELECT_ITEMS_QUERY, "SELECT id, category_id, title, num FROM item ORDER by id ASC").
 -define(SELECT_ITEMS_QUERY, "SELECT id, category_id, title, num FROM item ORDER by id ASC").
 
 
 
 
 all() ->
 all() ->
-    [equery_test,
+    [query_test,
      transaction_test,
      transaction_test,
      reconnect_test,
      reconnect_test,
-     error_handler_test
+     timeout_test
     ].
     ].
 
 
 
 
@@ -39,8 +39,8 @@ init_per_testcase(_, Config) ->
     Params = #epgsql_connection_params{host = "localhost", port = 5432, username = "test", password = "test", database = "testdb"},
     Params = #epgsql_connection_params{host = "localhost", port = 5432, username = "test", password = "test", database = "testdb"},
     epgsql_pool_settings:set_connection_params(my_pool, Params),
     epgsql_pool_settings:set_connection_params(my_pool, Params),
     {ok, _} = epgsql_pool:start(my_pool, 5, 10),
     {ok, _} = epgsql_pool:start(my_pool, 5, 10),
-    epgsql_pool:equery(my_pool, "TRUNCATE TABLE item"),
-    epgsql_pool:equery(my_pool, "TRUNCATE TABLE category CASCADE"),
+    epgsql_pool:query(my_pool, "TRUNCATE TABLE item"),
+    epgsql_pool:query(my_pool, "TRUNCATE TABLE category CASCADE"),
     Config.
     Config.
 
 
 
 
@@ -49,16 +49,25 @@ end_per_testcase(_, Config) ->
     Config.
     Config.
 
 
 
 
-equery_test(Config) ->
-    {ok, 3, _, Ids} = epgsql_pool:equery(my_pool,
-                                         "INSERT INTO category (title) "
-                                         "VALUES ('cat 1'), ('cat 2'), ('cat 3') "
-                                         "RETURNING id"),
+query_test(Config) ->
+    {ok, 3, _, Ids} = epgsql_pool:query(my_pool,
+                                        "INSERT INTO category (title) "
+                                        "VALUES ('cat 1'), ('cat 2'), ('cat 3') "
+                                        "RETURNING id"),
     WaitForRows = lists:map(fun({{Id}, Title}) -> {Id, Title} end,
     WaitForRows = lists:map(fun({{Id}, Title}) -> {Id, Title} end,
                             lists:zip(Ids, [<<"cat 1">>, <<"cat 2">>, <<"cat 3">>])),
                             lists:zip(Ids, [<<"cat 1">>, <<"cat 2">>, <<"cat 3">>])),
-    {ok, _, Rows} = epgsql_pool:equery(my_pool, "SELECT id, title FROM category ORDER by id ASC"),
+    {ok, _, Rows} = epgsql_pool:query(my_pool, "SELECT id, title FROM category ORDER by id ASC"),
     ct:pal("Rows ~p", [Rows]),
     ct:pal("Rows ~p", [Rows]),
     ?assertEqual(WaitForRows, Rows),
     ?assertEqual(WaitForRows, Rows),
+
+    [{Id1}, {Id2} | _] = Ids,
+    {ok, _, Rows2} = epgsql_pool:query(my_pool, "SELECT id, title FROM category WHERE id = $1 OR id = $2 ORDER BY id ASC", [Id1, Id2]),
+    ct:pal("Rows2 ~p", [Rows2]),
+    ?assertEqual([{Id1, <<"cat 1">>}, {Id2, <<"cat 2">>}], Rows2),
+
+    {error, Error} = epgsql_pool:query(my_pool, "SELECT id, title FROM some_table"),
+    ct:pal("Error:~p", [Error]),
+    ?assertMatch(#error{severity = error, message = <<"relation \"some_table\" does not exist">>}, Error),
     ok.
     ok.
 
 
 
 
@@ -68,28 +77,28 @@ transaction_test(Config) ->
                                 fun(Worker) ->
                                 fun(Worker) ->
                                         ct:pal("worker:~p", [Worker]),
                                         ct:pal("worker:~p", [Worker]),
                                         {ok, 3, _, CatIds0} =
                                         {ok, 3, _, CatIds0} =
-                                            epgsql_pool:equery(Worker,
-                                                               "INSERT INTO category (title) "
-                                                               "VALUES ('cat 4'), ('cat 5'), ('cat 6') "
-                                                               "RETURNING id"),
+                                            epgsql_pool:query(Worker,
+                                                              "INSERT INTO category (title) "
+                                                              "VALUES ('cat 4'), ('cat 5'), ('cat 6') "
+                                                              "RETURNING id"),
                                         CatIds1 = lists:map(fun({Cid}) -> Cid end, CatIds0),
                                         CatIds1 = lists:map(fun({Cid}) -> Cid end, CatIds0),
                                         CatId = hd(CatIds1),
                                         CatId = hd(CatIds1),
                                         {ok, 2, _, ItemIds0} =
                                         {ok, 2, _, ItemIds0} =
-                                            epgsql_pool:equery(Worker,
-                                                               "INSERT INTO item (category_id, title, num) "
-                                                               "VALUES ($1, 'item 1', 5), ($1, 'item 2', 7) "
-                                                               "RETURNING id", [CatId]),
+                                            epgsql_pool:query(Worker,
+                                                              "INSERT INTO item (category_id, title, num) "
+                                                              "VALUES ($1, 'item 1', 5), ($1, 'item 2', 7) "
+                                                              "RETURNING id", [CatId]),
                                         ItemIds1 = lists:map(fun({Iid}) -> Iid end, ItemIds0),
                                         ItemIds1 = lists:map(fun({Iid}) -> Iid end, ItemIds0),
                                         {CatId, CatIds1, ItemIds1}
                                         {CatId, CatIds1, ItemIds1}
                                 end),
                                 end),
     WaitForCats = lists:zip(CatIds2, [<<"cat 4">>, <<"cat 5">>, <<"cat 6">>]),
     WaitForCats = lists:zip(CatIds2, [<<"cat 4">>, <<"cat 5">>, <<"cat 6">>]),
-    {ok, _, CatRows} = epgsql_pool:equery(my_pool, "SELECT id, title FROM category ORDER by id ASC"),
+    {ok, _, CatRows} = epgsql_pool:query(my_pool, "SELECT id, title FROM category ORDER by id ASC"),
     ct:pal("CatRows ~p", [CatRows]),
     ct:pal("CatRows ~p", [CatRows]),
     ?assertEqual(WaitForCats, CatRows),
     ?assertEqual(WaitForCats, CatRows),
 
 
     WaitForItems = lists:map(fun({ItemId, {Title, Num}}) -> {ItemId, FirstCatId, Title, Num} end,
     WaitForItems = lists:map(fun({ItemId, {Title, Num}}) -> {ItemId, FirstCatId, Title, Num} end,
                              lists:zip(ItemIds2, [{<<"item 1">>, 5}, {<<"item 2">>, 7}])),
                              lists:zip(ItemIds2, [{<<"item 1">>, 5}, {<<"item 2">>, 7}])),
-    {ok, _, ItemRows} = epgsql_pool:equery(my_pool, ?SELECT_ITEMS_QUERY),
+    {ok, _, ItemRows} = epgsql_pool:query(my_pool, ?SELECT_ITEMS_QUERY),
     ct:pal("ItemRows ~p", [ItemRows]),
     ct:pal("ItemRows ~p", [ItemRows]),
     ?assertEqual(WaitForItems, ItemRows),
     ?assertEqual(WaitForItems, ItemRows),
 
 
@@ -98,11 +107,11 @@ transaction_test(Config) ->
                                 fun(Worker) ->
                                 fun(Worker) ->
                                         ct:pal("worker:~p", [Worker]),
                                         ct:pal("worker:~p", [Worker]),
                                         {ok, 2} =
                                         {ok, 2} =
-                                            epgsql_pool:equery(Worker,
-                                                               "INSERT INTO item (category_id, title, num) "
-                                                               "VALUES ($1, 'item 3', 55), ($1, 'item 4', 77) ",
-                                                               [FirstCatId]),
-                                        {ok, _, ItemRows2} = epgsql_pool:equery(Worker, ?SELECT_ITEMS_QUERY),
+                                            epgsql_pool:query(Worker,
+                                                              "INSERT INTO item (category_id, title, num) "
+                                                              "VALUES ($1, 'item 3', 55), ($1, 'item 4', 77) ",
+                                                              [FirstCatId]),
+                                        {ok, _, ItemRows2} = epgsql_pool:query(Worker, ?SELECT_ITEMS_QUERY),
                                         ct:pal("ItemRows2 ~p", [ItemRows2]),
                                         ct:pal("ItemRows2 ~p", [ItemRows2]),
                                         ?assertMatch([{_, FirstCatId, <<"item 1">>, 5},
                                         ?assertMatch([{_, FirstCatId, <<"item 1">>, 5},
                                                       {_, FirstCatId, <<"item 2">>, 7},
                                                       {_, FirstCatId, <<"item 2">>, 7},
@@ -116,7 +125,7 @@ transaction_test(Config) ->
     end,
     end,
 
 
     %% items not changes after calcelled transaction
     %% items not changes after calcelled transaction
-    {ok, _, ItemRows} = epgsql_pool:equery(my_pool, ?SELECT_ITEMS_QUERY),
+    {ok, _, ItemRows} = epgsql_pool:query(my_pool, ?SELECT_ITEMS_QUERY),
     ok.
     ok.
 
 
 
 
@@ -125,26 +134,26 @@ reconnect_test(Config) ->
     [state, my_pool, #epgsql_connection{sock = Sock1} | _]= tuple_to_list(sys:get_state(Worker)),
     [state, my_pool, #epgsql_connection{sock = Sock1} | _]= tuple_to_list(sys:get_state(Worker)),
     ct:pal("Worker: ~p, sock: ~p", [Worker, Sock1]),
     ct:pal("Worker: ~p, sock: ~p", [Worker, Sock1]),
 
 
-    R1 = epgsql_pool:equery(Worker, ?SELECT_ITEMS_QUERY),
+    R1 = epgsql_pool:query(Worker, ?SELECT_ITEMS_QUERY),
     ct:pal("first query ~p", [R1]),
     ct:pal("first query ~p", [R1]),
     {ok, _, []} = R1,
     {ok, _, []} = R1,
 
 
     ct:pal("~p close_connection", [Sock1]),
     ct:pal("~p close_connection", [Sock1]),
     exit(Sock1, close_connection),
     exit(Sock1, close_connection),
 
 
-    R2 = epgsql_pool:equery(Worker, "select * from item"),
+    R2 = epgsql_pool:query(Worker, "select * from item"),
     ct:pal("second query goes immediatelly ~p", [R2]),
     ct:pal("second query goes immediatelly ~p", [R2]),
     {error, reconnecting} = R2,
     {error, reconnecting} = R2,
 
 
     timer:sleep(50),
     timer:sleep(50),
 
 
-    R3 = epgsql_pool:equery(Worker, "select * from item"),
+    R3 = epgsql_pool:query(Worker, "select * from item"),
     ct:pal("third query goes after 50 ms ~p", [R3]),
     ct:pal("third query goes after 50 ms ~p", [R3]),
     {error, reconnecting} = R3,
     {error, reconnecting} = R3,
 
 
     timer:sleep(150),
     timer:sleep(150),
 
 
-    R4 = epgsql_pool:equery(Worker, "select * from item"),
+    R4 = epgsql_pool:query(Worker, "select * from item"),
     ct:pal("fouth query goes after 200 ms ~p", [R4]),
     ct:pal("fouth query goes after 200 ms ~p", [R4]),
     {ok, _, []} = R4,
     {ok, _, []} = R4,
 
 
@@ -155,22 +164,6 @@ reconnect_test(Config) ->
     ok.
     ok.
 
 
 
 
-error_handler_test(Config) ->
-    {error, Error} = epgsql_pool:equery(my_pool, "SELECT id, title FROM some_table"),
-    ct:pal("Error:~p", [Error]),
-    ?assertMatch(#error{severity = error, message = <<"relation \"some_table\" does not exist">>}, Error),
-
-    Query2 = "SELECT some_field FROM item WHERE id = $1",
-    ErrorMessage2 = <<"column \"some_field\" does not exist">>,
-    ErrorHandler = fun(_Worker, Stmt, Params, Error2) ->
-                           ct:pal("ErrorHandler: ~p", [Error2]),
-                           ?assertEqual(Query2, Stmt),
-                           ?assertEqual([1], Params),
-                           ?assertMatch(#error{severity = error, message = ErrorMessage2}, Error2),
-                           {db_error, Error2#error.message}
-                   end,
-    Res = epgsql_pool:equery(my_pool, Query2, [1], [{error_handler, ErrorHandler}]),
-    ct:pal("Res:~p", [Res]),
-    ?assertEqual({db_error, ErrorMessage2}, Res),
+timeout_test(_Config) ->
 
 
     ok.
     ok.