Browse Source

common test for transaction

Yuriy Zhloba 10 years ago
parent
commit
91453488db
2 changed files with 56 additions and 24 deletions
  1. 19 23
      src/epgsql_pool.erl
  2. 37 1
      test/epgsql_pool_SUITE.erl

+ 19 - 23
src/epgsql_pool.erl

@@ -30,18 +30,29 @@ stop(PoolName) ->
     pooler:rm_pool(epgsql_pool_utils:pool_name_to_atom(PoolName)).
     pooler:rm_pool(epgsql_pool_utils:pool_name_to_atom(PoolName)).
 
 
 
 
--spec equery(pool_name(), epgsql:sql_query()) -> epgsql:reply().
-equery(PoolName, Stmt) ->
-    equery(PoolName, Stmt, []).
+-spec equery(pool_name() | pid(), epgsql:sql_query()) -> epgsql:reply().
+equery(PoolNameOrWorker, Stmt) ->
+    equery(PoolNameOrWorker, Stmt, []).
 
 
 
 
 %% Example
 %% Example
 %% epgsql_pool:equery("my_db_pool", "SELECT NOW() as now", []).
 %% epgsql_pool:equery("my_db_pool", "SELECT NOW() as now", []).
--spec equery(pool_name(), epgsql:sql_query(), [epgsql:bind_param()]) -> epgsql:reply().
+-spec equery(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()]) -> epgsql:reply().
+equery(Worker, Stmt, Params) when is_pid(Worker) ->
+    Timeout = epgsql_pool_settings:get(query_timeout),
+    % TODO process timeout,
+    % try-catch
+    % send cancel
+    % log error
+    % reply to client with error
+    % reconnect
+    % return to pool
+    gen_server:call(Worker, {equery, Stmt, Params}, Timeout);
+
 equery(PoolName, Stmt, Params) ->
 equery(PoolName, Stmt, Params) ->
     transaction(PoolName,
     transaction(PoolName,
                 fun(Worker) ->
                 fun(Worker) ->
-                        equery_with_worker(Worker, Stmt, Params)
+                        equery(Worker, Stmt, Params)
                 end).
                 end).
 
 
 
 
@@ -52,13 +63,13 @@ transaction(PoolName0, Fun) ->
     case pooler:take_member(PoolName, Timeout) of
     case pooler:take_member(PoolName, Timeout) of
         Worker when is_pid(Worker) ->
         Worker when is_pid(Worker) ->
             try
             try
-                equery_with_worker(Worker, "BEGIN", []),
+                equery(Worker, "BEGIN", []),
                 Result = Fun(Worker),
                 Result = Fun(Worker),
-                equery_with_worker(Worker, "COMMIT", []),
+                equery(Worker, "COMMIT", []),
                 Result
                 Result
             catch
             catch
                 Err:Reason ->
                 Err:Reason ->
-                    equery_with_worker(Worker, "ROLLBACK", []),
+                    equery(Worker, "ROLLBACK", []),
                     erlang:raise(Err, Reason, erlang:get_stacktrace())
                     erlang:raise(Err, Reason, erlang:get_stacktrace())
             after
             after
                 pooler:return_member(PoolName, Worker, ok)
                 pooler:return_member(PoolName, Worker, ok)
@@ -68,18 +79,3 @@ transaction(PoolName0, Fun) ->
             error_logger:error_msg("Pool ~p overload: ~p", [PoolName, PoolStats]),
             error_logger:error_msg("Pool ~p overload: ~p", [PoolName, PoolStats]),
             {error, pool_overload}
             {error, pool_overload}
     end.
     end.
-
-
-%% Inner functions
-
--spec equery_with_worker(pid(), epgsql:sql_query(), [epgsql:bind_param()]) -> epgsql:reply().
-equery_with_worker(Worker, Stmt, Params) ->
-    Timeout = epgsql_pool_settings:get(query_timeout),
-    % TODO process timeout,
-    % try-catch
-    % send cancel
-    % log error
-    % reply to client with error
-    % reconnect
-    % return to pool
-    gen_server:call(Worker, {equery, Stmt, Params}, Timeout).

+ 37 - 1
test/epgsql_pool_SUITE.erl

@@ -77,7 +77,43 @@ equery_test(Config) ->
 
 
 
 
 transaction_test(Config) ->
 transaction_test(Config) ->
-    throw(not_implemented),
+    Connection = proplists:get_value(connection, Config),
+    epgsql_pool_settings:set_connection_params(my_pool, Connection#epgsql_connection.params),
+    {ok, _} = epgsql_pool:start(my_pool, 5, 10),
+
+    {FirstCatId, CatIds2, ItemIds2} =
+        epgsql_pool:transaction(my_pool,
+                                fun(Worker) ->
+                                        ct:pal("worker:~p", [Worker]),
+                                        {ok, 3, _, CatIds0} =
+                                            epgsql_pool:equery(Worker,
+                                                               "INSERT INTO category (title) "
+                                                               "VALUES ('cat 4'), ('cat 5'), ('cat 6') "
+                                                               "RETURNING id"),
+                                        CatIds1 = lists:map(fun({Cid}) -> Cid end, CatIds0),
+                                        CatId = hd(CatIds1),
+                                        {ok, 2, _, ItemIds0} =
+                                            epgsql_pool:equery(Worker,
+                                                               "INSERT INTO item (category_id, title, num) "
+                                                               "VALUES ($1, 'item 1', 5), ($1, 'item 2', 7) "
+                                                               "RETURNING id", [CatId]),
+                                        ItemIds1 = lists:map(fun({Iid}) -> Iid end, ItemIds0),
+                                        {CatId, CatIds1, ItemIds1}
+                                end),
+    WaitForCats = lists:zip(CatIds2, [<<"cat 4">>, <<"cat 5">>, <<"cat 6">>]),
+    {ok, _, CatRows} = epgsql_pool:equery(my_pool, "SELECT id, title FROM category ORDER by id ASC"),
+    ct:pal("CatRows ~p", [CatRows]),
+    ?assertEqual(WaitForCats, CatRows),
+
+    WaitForItems = lists:map(fun({ItemId, {Title, Num}}) -> {ItemId, FirstCatId, Title, Num} end,
+                             lists:zip(ItemIds2, [{<<"item 1">>, 5}, {<<"item 2">>, 7}])),
+    {ok, _, ItemRows} = epgsql_pool:equery(my_pool, "SELECT id, category_id, title, num FROM item ORDER by id ASC"),
+    ct:pal("ItemRows ~p", [ItemRows]),
+    ?assertEqual(WaitForItems, ItemRows),
+
+    %% TODO invalid transation (with exception)
+
+    ok = epgsql_pool:stop(my_pool),
     ok.
     ok.