Browse Source

Nested transactions

Viktor Söderqvist 10 years ago
parent
commit
512d7e7806
4 changed files with 102 additions and 26 deletions
  1. 14 9
      README.md
  2. 3 3
      doc/overview.edoc
  3. 54 13
      src/mysql.erl
  4. 31 1
      test/mysql_tests.erl

+ 14 - 9
README.md

@@ -3,15 +3,16 @@ MySQL/OTP
 
 
 [![Build Status](https://travis-ci.org/mysql-otp/mysql-otp.svg)](https://travis-ci.org/mysql-otp/mysql-otp)
 [![Build Status](https://travis-ci.org/mysql-otp/mysql-otp.svg)](https://travis-ci.org/mysql-otp/mysql-otp)
 
 
-MySQL/OTP is a client library for connecting to MySQL databases from Erlang/OTP
-applications. It is a native implementation of the MySQL protocol in Erlang.
+MySQL/OTP is a driver for connecting Erlang/OTP applications to MySQL
+databases. It is a native implementation of the MySQL protocol in Erlang.
 
 
 Features:
 Features:
 
 
-* Mnesia style transactions. (Currenly transactions cannot be nested and are
-  not retried automatically when deadlocks are detected. These are features of
-  Mnesia style transactions and there are plans to implement them. See
-  [#7](https://github.com/mysql-otp/mysql-otp/issues/7).)
+* Nestable Mnesia style transactions.
+  * Nested transactions are implemented using savepoints (since 0.6.0).
+  * Currenly transactions are not automatically retried when deadlocks are
+    detected but there are plans to implement that too.
+    See [#7](https://github.com/mysql-otp/mysql-otp/issues/7).)
 * Uses the binary protocol for prepared statements.
 * Uses the binary protocol for prepared statements.
 * Each connection is a gen_server, which makes it compatible with Poolboy (for
 * Each connection is a gen_server, which makes it compatible with Poolboy (for
   connection pooling) and ordinary OTP supervisors.
   connection pooling) and ordinary OTP supervisors.
@@ -24,8 +25,12 @@ See also:
 * [Test coverage](//mysql-otp.github.io/mysql-otp/eunit.html) (EUnit)
 * [Test coverage](//mysql-otp.github.io/mysql-otp/eunit.html) (EUnit)
 * [Why another MySQL driver?](https://github.com/mysql-otp/mysql-otp/wiki#why-another-mysql-driver) in the wiki
 * [Why another MySQL driver?](https://github.com/mysql-otp/mysql-otp/wiki#why-another-mysql-driver) in the wiki
 
 
-This is a work in progress. The API and the value representation may still
-change. Use a tagged version to make sure nothing breaks.
+This is a work in progress. Use a tagged version to make sure nothing breaks.
+
+Todo:
+
+* Ping regularily when inactive
+* Retry transactions when deadlocks are detected
 
 
 Synopsis
 Synopsis
 --------
 --------
@@ -49,7 +54,7 @@ LastInsertId = mysql:insert_id(Pid),
 AffectedRows = mysql:affected_rows(Pid),
 AffectedRows = mysql:affected_rows(Pid),
 WarningCount = mysql:warning_count(Pid),
 WarningCount = mysql:warning_count(Pid),
 
 
-%% Mnesia style transaction
+%% Mnesia style transaction (nestable)
 Result = mysql:transaction(Pid, fun () ->
 Result = mysql:transaction(Pid, fun () ->
     ok = mysql:query(Pid, "INSERT INTO mytable (foo) VALUES (1)"),
     ok = mysql:query(Pid, "INSERT INTO mytable (foo) VALUES (1)"),
     throw(foo),
     throw(foo),

+ 3 - 3
doc/overview.edoc

@@ -19,11 +19,11 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 
 @title MySQL/OTP User's Guide
 @title MySQL/OTP User's Guide
 @doc
 @doc
-MySQL/OTP is a client library for connecting to MySQL databases from Erlang/OTP
-applications.
+MySQL/OTP is a driver for connecting Erlang/OTP applications to MySQL
+databases. It is a native implementation of the MySQL protocol in Erlang.
 
 
 This is the documentation generated from the Erlang source code using EDoc.
 This is the documentation generated from the Erlang source code using EDoc.
-The project page is here:
+The project page is on Gitbub:
 <a href="https://github.com/mysql-otp/mysql-otp/"
 <a href="https://github.com/mysql-otp/mysql-otp/"
    target="_top">https://github.com/mysql-otp/mysql-otp/</a>.
    target="_top">https://github.com/mysql-otp/mysql-otp/</a>.
 
 

+ 54 - 13
src/mysql.erl

@@ -43,6 +43,8 @@
 -define(default_query_timeout, infinity).
 -define(default_query_timeout, infinity).
 -define(default_query_cache_time, 60000). %% for query/3.
 -define(default_query_cache_time, 60000). %% for query/3.
 
 
+-define(cmd_timeout, 3000). %% Timeout used for various commands to the server
+
 %% A connection is a ServerRef as in gen_server:call/2,3.
 %% A connection is a ServerRef as in gen_server:call/2,3.
 -type connection() :: Name :: atom() |
 -type connection() :: Name :: atom() |
                       {Name :: atom(), Node :: atom()} |
                       {Name :: atom(), Node :: atom()} |
@@ -260,22 +262,24 @@ transaction(Conn, Fun) ->
 %% @doc This function executes the functional object Fun with arguments Args as
 %% @doc This function executes the functional object Fun with arguments Args as
 %% a transaction. 
 %% a transaction. 
 %%
 %%
-%% The semantics are the same as for mnesia's transactions.
+%% The semantics are the same as for mnesia's transactions except they are not
+%% automatically retried when deadlocks are detected. Transactions can be
+%% nested. (MySQL savepoints are used to implement nested transactions.)
 %%
 %%
 %% The Fun must be a function and Args must be a list with the same length
 %% The Fun must be a function and Args must be a list with the same length
 %% as the arity of Fun. 
 %% as the arity of Fun. 
 %%
 %%
-%% Current limitations:
-%%
-%% <ul>
-%%   <li>Transactions cannot be nested</li>
-%%   <li>They are not automatically restarted when deadlocks are detected.</li>
-%% </ul>
+%% Current limitation: Transactions is not automatically restarted when
+%% deadlocks are detected.
 %%
 %%
 %% If an exception occurs within Fun, the exception is caught and `{aborted,
 %% If an exception occurs within Fun, the exception is caught and `{aborted,
 %% Reason}' is returned. The value of `Reason' depends on the class of the
 %% Reason}' is returned. The value of `Reason' depends on the class of the
 %% exception.
 %% exception.
 %%
 %%
+%% Note that an error response from a query does not cause a transaction to be
+%% rollbacked. To force a rollback on MySQL errors, you can trigger a `badmatch'
+%% using e.g. `ok = mysql:query(Pid, "SELECT some_non_existent_value")'.
+%%
 %% <table>
 %% <table>
 %%   <thead>
 %%   <thead>
 %%     <tr><th>Class of exception</th><th>Return value</th></tr>
 %%     <tr><th>Class of exception</th><th>Return value</th></tr>
@@ -290,7 +294,6 @@ transaction(Conn, Fun) ->
 %%   </tbody>
 %%   </tbody>
 %% </table>
 %% </table>
 %%
 %%
-%% TODO: Implement nested transactions
 %% TODO: Automatic restart on deadlocks
 %% TODO: Automatic restart on deadlocks
 %% @see in_transaction/1
 %% @see in_transaction/1
 -spec transaction(connection(), fun(), list()) -> {atomic, term()} |
 -spec transaction(connection(), fun(), list()) -> {atomic, term()} |
@@ -299,16 +302,16 @@ transaction(Conn, Fun, Args) when is_list(Args),
                                   is_function(Fun, length(Args)) ->
                                   is_function(Fun, length(Args)) ->
     %% The guard makes sure that we can apply Fun to Args. Any error we catch
     %% The guard makes sure that we can apply Fun to Args. Any error we catch
     %% in the try-catch are actual errors that occurred in Fun.
     %% in the try-catch are actual errors that occurred in Fun.
-    ok = query(Conn, <<"BEGIN">>),
+    ok = gen_server:call(Conn, start_transaction),
     try apply(Fun, Args) of
     try apply(Fun, Args) of
         ResultOfFun ->
         ResultOfFun ->
             %% We must be able to rollback. Otherwise let's crash.
             %% We must be able to rollback. Otherwise let's crash.
-            ok = query(Conn, <<"COMMIT">>),
+            ok = gen_server:call(Conn, commit),
             {atomic, ResultOfFun}
             {atomic, ResultOfFun}
     catch
     catch
         Class:Reason ->
         Class:Reason ->
             %% We must be able to rollback. Otherwise let's crash.
             %% We must be able to rollback. Otherwise let's crash.
-            ok = query(Conn, <<"ROLLBACK">>),
+            ok = gen_server:call(Conn, rollback),
             %% These forms for throw, error and exit mirror Mnesia's behaviour.
             %% These forms for throw, error and exit mirror Mnesia's behaviour.
             Aborted = case Class of
             Aborted = case Class of
                 throw -> {throw, Reason};
                 throw -> {throw, Reason};
@@ -328,6 +331,7 @@ transaction(Conn, Fun, Args) when is_list(Args),
                 host, port, user, password,
                 host, port, user, password,
                 query_timeout, query_cache_time,
                 query_timeout, query_cache_time,
                 affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
                 affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
+                transaction_level = 0,
                 stmts = dict:new(), query_cache = empty}).
                 stmts = dict:new(), query_cache = empty}).
 
 
 %% @private
 %% @private
@@ -487,7 +491,44 @@ handle_call(affected_rows, _From, State) ->
 handle_call(autocommit, _From, State) ->
 handle_call(autocommit, _From, State) ->
     {reply, State#state.status band ?SERVER_STATUS_AUTOCOMMIT /= 0, State};
     {reply, State#state.status band ?SERVER_STATUS_AUTOCOMMIT /= 0, State};
 handle_call(in_transaction, _From, State) ->
 handle_call(in_transaction, _From, State) ->
-    {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State}.
+    {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State};
+handle_call(start_transaction, _From,
+            State = #state{socket = Socket, transaction_level = L,
+                           status = Status})
+  when Status band ?SERVER_STATUS_IN_TRANS == 0, L == 0;
+       Status band ?SERVER_STATUS_IN_TRANS /= 0, L > 0 ->
+    Query = case L of
+        0 -> <<"BEGIN">>;
+        _ -> <<"SAVEPOINT s", (integer_to_binary(L))/binary>>
+    end,
+    Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
+    State1 = update_state(State, Res),
+    {reply, ok, State1#state{transaction_level = L + 1}};
+handle_call(rollback, _From, State = #state{socket = Socket, status = Status,
+                                            transaction_level = L})
+  when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
+    Query = case L of
+        1 -> <<"ROLLBACK">>;
+        _ -> <<"ROLLBACK TO s", (integer_to_binary(L - 1))/binary>>
+    end,
+    Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
+    State1 = update_state(State, Res),
+    {reply, ok, State1#state{transaction_level = L - 1}};
+handle_call(commit, _From, State = #state{socket = Socket, status = Status,
+                                          transaction_level = L})
+  when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
+    Query = case L of
+        1 -> <<"COMMIT">>;
+        _ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(L - 1))/binary>>
+    end,
+    Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
+    State1 = update_state(State, Res),
+    {reply, ok, State1#state{transaction_level = L - 1}};
+handle_call(Trans, _From, State) when Trans == start_transaction;
+                                      Trans == rollback;
+                                      Trans == commit ->
+    %% The 'in transaction' flag doesn't match the level we have in the state.
+    {reply, {error, incorrectly_nested}, State}.
 
 
 %% @private
 %% @private
 handle_cast(_Msg, State) ->
 handle_cast(_Msg, State) ->
@@ -587,7 +628,7 @@ kill_query(#state{connection_id = ConnId, host = Host, port = Port,
             %% Kill and disconnect
             %% Kill and disconnect
             IdBin = integer_to_binary(ConnId),
             IdBin = integer_to_binary(ConnId),
             #ok{} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>,
             #ok{} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>,
-                                         gen_tcp, Socket, 3000),
+                                         gen_tcp, Socket, ?cmd_timeout),
             mysql_protocol:quit(gen_tcp, Socket);
             mysql_protocol:quit(gen_tcp, Socket);
         #error{} = E ->
         #error{} = E ->
             error_logger:error_msg("Failed to connect to kill query: ~p",
             error_logger:error_msg("Failed to connect to kill query: ~p",

+ 31 - 1
test/mysql_tests.erl

@@ -434,7 +434,9 @@ with_table_foo_test_() ->
      {with, [fun prepared_statements/1,
      {with, [fun prepared_statements/1,
              fun parameterized_query/1,
              fun parameterized_query/1,
              fun transaction_simple_success/1,
              fun transaction_simple_success/1,
-             fun transaction_simple_aborted/1]}}.
+             fun transaction_simple_aborted/1,
+             fun transaction_nested_success/1,
+             fun transaction_inner_rollback/1]}}.
 
 
 prepared_statements(Pid) ->
 prepared_statements(Pid) ->
     %% Unnamed
     %% Unnamed
@@ -501,6 +503,34 @@ transaction_simple_aborted(Pid) ->
     ?assertEqual({aborted, foo},
     ?assertEqual({aborted, foo},
                  mysql:transaction(Pid, fun () -> exit(foo) end)).
                  mysql:transaction(Pid, fun () -> exit(foo) end)).
 
 
+transaction_nested_success(Pid) ->
+    OuterResult = mysql:transaction(Pid, fun () ->
+        ok = mysql:query(Pid, "INSERT INTO foo VALUES (9)"),
+        InnerResult = mysql:transaction(Pid, fun () ->
+            ok = mysql:query(Pid, "INSERT INTO foo VALUES (42)"),
+            inner
+        end),
+        ?assertEqual({atomic, inner}, InnerResult),
+        outer
+    end),
+    ?assertMatch({ok, _, [[2]]}, mysql:query(Pid, "SELECT COUNT(*) FROM foo")),
+    ok = mysql:query(Pid, "DELETE FROM foo"),
+    ?assertEqual({atomic, outer}, OuterResult).
+
+transaction_inner_rollback(Pid) ->
+    OuterResult = mysql:transaction(Pid, fun () ->
+        ok = mysql:query(Pid, "INSERT INTO foo VALUES (9)"),
+        InnerResult = mysql:transaction(Pid, fun () ->
+            ok = mysql:query(Pid, "INSERT INTO foo VALUES (42)"),
+            throw(inner)
+        end),
+        ?assertEqual({aborted, {throw, inner}}, InnerResult),
+        outer
+    end),
+    ?assertMatch({ok, _, [[9]]}, mysql:query(Pid, "SELECT bar FROM foo")),
+    ok = mysql:query(Pid, "DELETE FROM foo"),
+    ?assertEqual({atomic, outer}, OuterResult).
+
 %% --- simple gen_server callbacks ---
 %% --- simple gen_server callbacks ---
 
 
 gen_server_coverage_test() ->
 gen_server_coverage_test() ->