Browse Source

Restartable transactions and implicit commits and rollbacks, fixes #7

Viktor Söderqvist 10 years ago
parent
commit
639d78c784
3 changed files with 452 additions and 96 deletions
  1. 166 30
      src/mysql.erl
  2. 2 66
      test/mysql_tests.erl
  3. 284 0
      test/transaction_tests.erl

+ 166 - 30
src/mysql.erl

@@ -27,7 +27,7 @@
          prepare/2, prepare/3, unprepare/2,
          warning_count/1, affected_rows/1, autocommit/1, insert_id/1,
          in_transaction/1,
-         transaction/2, transaction/3]).
+         transaction/2, transaction/3, transaction/4]).
 
 -export_type([connection/0, server_reason/0]).
 
@@ -45,6 +45,10 @@
 
 -define(cmd_timeout, 3000). %% Timeout used for various commands to the server
 
+%% Errors that cause "implicit rollback"
+-define(ERROR_LOCK_WAIT_TIMEOUT, 1205).
+-define(ERROR_DEADLOCK, 1213).
+
 %% A connection is a ServerRef as in gen_server:call/2,3.
 -type connection() :: Name :: atom() |
                       {Name :: atom(), Node :: atom()} |
@@ -120,7 +124,7 @@ start_link(Options) ->
          Rows :: [[term()]],
          Reason :: server_reason().
 query(Conn, Query) ->
-    gen_server:call(Conn, {query, Query}, infinity).
+    query_call(Conn, {query, Query}).
 
 %% @doc Depending on the 3rd argument this function does different things.
 %%
@@ -141,9 +145,9 @@ query(Conn, Query) ->
          Rows :: [[term()]],
          Reason :: server_reason().
 query(Conn, Query, Params) when is_list(Params) ->
-    gen_server:call(Conn, {param_query, Query, Params}, infinity);
+    query_call(Conn, {param_query, Query, Params});
 query(Conn, Query, Timeout) when is_integer(Timeout); Timeout == infinity ->
-    gen_server:call(Conn, {query, Query, Timeout}, infinity).
+    query_call(Conn, {query, Query, Timeout}).
 
 %% @doc Executes a parameterized query with a timeout.
 %%
@@ -163,7 +167,7 @@ query(Conn, Query, Timeout) when is_integer(Timeout); Timeout == infinity ->
          Rows :: [[term()]],
          Reason :: server_reason().
 query(Conn, Query, Params, Timeout) ->
-    gen_server:call(Conn, {param_query, Query, Params, Timeout}, infinity).
+    query_call(Conn, {param_query, Query, Params, Timeout}).
 
 %% @doc Executes a prepared statement with the default query timeout as given
 %% to start_link/1.
@@ -178,7 +182,7 @@ query(Conn, Query, Params, Timeout) ->
        Rows :: [[term()]],
        Reason :: server_reason() | not_prepared.
 execute(Conn, StatementRef, Params) ->
-    gen_server:call(Conn, {execute, StatementRef, Params}, infinity).
+    query_call(Conn, {execute, StatementRef, Params}).
 
 %% @doc Executes a prepared statement.
 %% @see prepare/2
@@ -193,7 +197,7 @@ execute(Conn, StatementRef, Params) ->
        Rows :: [[term()]],
        Reason :: server_reason() | not_prepared.
 execute(Conn, StatementRef, Params, Timeout) ->
-    gen_server:call(Conn, {execute, StatementRef, Params, Timeout}, infinity).
+    query_call(Conn, {execute, StatementRef, Params, Timeout}).
 
 %% @doc Creates a prepared statement from the passed query.
 %% @see prepare/3
@@ -251,38 +255,51 @@ insert_id(Conn) ->
 %% transaction/2,3 or using a plain `mysql:query(Connection, "START
 %% TRANSACTION")'.
 %% @see transaction/2
-%% @see transaction/3
+%% @see transaction/4
 -spec in_transaction(connection()) -> boolean().
 in_transaction(Conn) ->
     gen_server:call(Conn, in_transaction).
 
 %% @doc This function executes the functional object Fun as a transaction.
-%% @see transaction/3
-%% @see in_transaction/1
+%% @see transaction/4
 -spec transaction(connection(), fun()) -> {atomic, term()} | {aborted, term()}.
 transaction(Conn, Fun) ->
-    transaction(Conn, Fun, []).
+    transaction(Conn, Fun, [], infinity).
+
+%% @doc This function executes the functional object Fun as a transaction.
+%% @see transaction/4
+-spec transaction(connection(), fun(), Retries) -> {atomic, term()} |
+                                                   {aborted, term()}
+    when Retries :: non_neg_integer() | infinity.
+transaction(Conn, Fun, Retries) ->
+    transaction(Conn, Fun, [], Retries).
 
 %% @doc This function executes the functional object Fun with arguments Args as
 %% a transaction. 
 %%
-%% The semantics are the same as for mnesia's transactions except they are not
-%% automatically retried when deadlocks are detected. Transactions can be
-%% nested. (MySQL savepoints are used to implement nested transactions.)
+%% The semantics are as close as possible to mnesia's transactions. Transactions
+%% can be nested and are restarted automatically when deadlocks are detected.
+%% MySQL's savepoints are used to implement nested transactions.
 %%
-%% The Fun must be a function and Args must be a list with the same length
-%% as the arity of Fun. 
-%%
-%% Current limitation: Transactions is not automatically restarted when
-%% deadlocks are detected.
+%% Fun must be a function and Args must be a list of the same length as the
+%% arity of Fun.
 %%
 %% If an exception occurs within Fun, the exception is caught and `{aborted,
 %% Reason}' is returned. The value of `Reason' depends on the class of the
 %% exception.
 %%
 %% Note that an error response from a query does not cause a transaction to be
-%% rollbacked. To force a rollback on MySQL errors, you can trigger a `badmatch'
+%% rollbacked. To force a rollback on a MySQL error you can trigger a `badmatch'
 %% using e.g. `ok = mysql:query(Pid, "SELECT some_non_existent_value")'.
+%% Exceptions to this are error 1213 "Deadlock" (after the specified number
+%% retries all have failed) and error 1205 "Lock wait timeout" which causes an
+%% *implicit rollback*.
+%%
+%% Some queries such as ALTER TABLE cause an *implicit commit* on the server.
+%% If such a query is executed within a transaction, an error on the form
+%% `{implicit_commit, Query}' is raised. This means that the transaction has
+%% been committed prematurely. This also happens if an explicit COMMIT is
+%% executed as a plain query within a managed transaction. (Don't do that!)
 %%
 %% <table>
 %%   <thead>
@@ -297,13 +314,11 @@ transaction(Conn, Fun) ->
 %%     <tr><td>`throw(Term)'</td><td>`{aborted, {throw, Term}}'</td></tr>
 %%   </tbody>
 %% </table>
-%%
-%% TODO: Automatic restart on deadlocks
-%% @see in_transaction/1
--spec transaction(connection(), fun(), list()) -> {atomic, term()} |
-                                                  {aborted, term()}.
-transaction(Conn, Fun, Args) when is_list(Args),
-                                  is_function(Fun, length(Args)) ->
+-spec transaction(connection(), fun(), list(), Retries) -> {atomic, term()} |
+                                                           {aborted, term()}
+    when Retries :: non_neg_integer() | infinity.
+transaction(Conn, Fun, Args, Retries) when is_list(Args),
+                                           is_function(Fun, length(Args)) ->
     %% The guard makes sure that we can apply Fun to Args. Any error we catch
     %% in the try-catch are actual errors that occurred in Fun.
     ok = gen_server:call(Conn, start_transaction),
@@ -313,6 +328,43 @@ transaction(Conn, Fun, Args) when is_list(Args),
             ok = gen_server:call(Conn, commit),
             {atomic, ResultOfFun}
     catch
+        throw:{implicit_rollback, N, Reason} when N >= 1 ->
+            %% Jump out of N nested transactions to restart the outer-most one.
+            %% The server has already rollbacked so we shouldn't do that here.
+            case N of
+                1 ->
+                    case Reason of
+                        {?ERROR_DEADLOCK, _, _} when Retries == infinity ->
+                            transaction(Conn, Fun, Args, infinity);
+                        {?ERROR_DEADLOCK, _, _} when Retries > 0 ->
+                            transaction(Conn, Fun, Args, Retries - 1);
+                        _OtherImplicitRollbackError ->
+                            %% This includes the case ?ERROR_LOCK_WAIT_TIMEOUT
+                            %% which we don't restart automatically.
+                            %% We issue a rollback here since MySQL doesn't
+                            %% seem to have fully rollbacked and an extra
+                            %% rollback doesn't hurt.
+                            ok = query(Conn, <<"ROLLBACK">>),
+                            {aborted, {Reason, erlang:get_stacktrace()}}
+                    end;
+                _ ->
+                    %% Re-throw with the same trace. We'll use that in the
+                    %% final {aborted, {Reason, Trace}} in the outer level.
+                    erlang:raise(throw, {implicit_rollback, N - 1, Reason},
+                                 erlang:get_stacktrace())
+            end;
+        throw:{implicit_commit, N, Query} when N >= 1 ->
+            %% The called did something like ALTER TABLE which resulted in an
+            %% implicit commit. The server has already committed. We need to
+            %% jump out of N levels of transactions.
+            %%
+            %% Returning 'atomic' or 'aborted' would both be wrong. Raise an
+            %% exception is the best we can do.
+            case N of
+                1 -> error({implicit_commit, Query});
+                _ -> erlang:raise(throw, {implicit_commit, N - 1, Query},
+                                  erlang:get_stacktrace())
+            end;
         Class:Reason ->
             %% We must be able to rollback. Otherwise let's crash.
             ok = gen_server:call(Conn, rollback),
@@ -376,6 +428,48 @@ init(Opts) ->
     end.
 
 %% @private
+%% @doc
+%%
+%% Query and execute calls:
+%%
+%% <ul>
+%%   <li>{query, Query}</li>
+%%   <li>{query, Query, Timeout}</li>
+%%   <li>{param_query, Query, Params}</li>
+%%   <li>{param_query, Query, Params, Timeout}</li>
+%%   <li>{execute, Stmt, Args}</li>
+%%   <li>{execute, Stmt, Args, Timeout}</li>
+%% </ul>
+%%
+%% For the calls listed above, we return these values:
+%%
+%% <dl>
+%%   <dt>`ok'</dt>
+%%   <dd>Success without returning any table data (UPDATE, etc.)</dd>
+%%   <dt>`{ok, ColumnNames, Rows}'</dt>
+%%   <dd>Queries returning table data</dd>
+%%   <dt>`{error, ServerReason}'</dt>
+%%   <dd>MySQL server error</dd>
+%%   <dt>`{implicit_commit, NestingLevel, Query}'</dt>
+%%   <dd>A DDL statement (e.g. CREATE TABLE, ALTER TABLE, etc.) results in
+%%       an implicit commit.
+%%
+%%       If the caller is in a (nested) transaction, it must be aborted. To be
+%%       able to handle this in the caller's process, we also return the
+%%       nesting level.</dd>
+%%   <dt>`{implicit_rollback, NestingLevel, ServerReason}'</dt>
+%%   <dd>These errors result in an implicit rollback:
+%%       <ul>
+%%         <li>`{1205, <<"HY000">>, <<"Lock wait timeout exceeded;
+%%                                     try restarting transaction">>}'</li>
+%%         <li>`{1213, <<"40001">>, <<"Deadlock found when trying to get lock;
+%%                                     try restarting transaction">>}'</li>
+%%       </ul>
+%%
+%%       If the caller is in a (nested) transaction, it must be aborted. To be
+%%       able to handle this in the caller's process, we also return the
+%%       nesting level.</dd>
+%% </dl>
 handle_call({query, Query}, From, State) ->
     handle_call({query, Query, State#state.query_timeout}, From, State);
 handle_call({query, Query, Timeout}, _From, State) ->
@@ -395,13 +489,27 @@ handle_call({query, Query, Timeout}, _From, State) ->
     State1#state.warning_count > 0 andalso State1#state.log_warnings
         andalso log_warnings(State1, Query),
     case Rec of
+        #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
+                                  State1#state.transaction_level > 0 ->
+            %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
+            %% an implicit commit.
+            Reply = {implicit_commit, State1#state.transaction_level, Query},
+            {reply, Reply, State1#state{transaction_level = 0}};
         #ok{} ->
             {reply, ok, State1};
-        #error{} = E ->
-            {reply, {error, error_to_reason(E)}, State1};
         #resultset{cols = ColDefs, rows = Rows} ->
             Names = [Def#col.name || Def <- ColDefs],
-            {reply, {ok, Names, Rows}, State1}
+            {reply, {ok, Names, Rows}, State1};
+        #error{code = Code} when State1#state.transaction_level > 0,
+                                 (Code == ?ERROR_DEADLOCK orelse
+                                  Code == ?ERROR_LOCK_WAIT_TIMEOUT) ->
+            %% These errors result in an implicit rollback.
+            Reply = {implicit_rollback, State1#state.transaction_level,
+                     error_to_reason(Rec)},
+            State2 = clear_transaction_status(State1),
+            {reply, Reply, State2};
+        #error{} ->
+            {reply, {error, error_to_reason(Rec)}, State1}
     end;
 handle_call({param_query, Query, Params}, From, State) ->
     handle_call({param_query, Query, Params, State#state.query_timeout}, From,
@@ -584,6 +692,19 @@ code_change(_OldVsn, _State, _Extra) ->
 
 %% --- Helpers ---
 
+%% @doc Makes a gen_server call for a query (plain, parametrized or prepared),
+%% checks the reply and sometimes throws an exception when we need to jump out
+%% of a transaction.
+query_call(Conn, CallReq) ->
+    case gen_server:call(Conn, CallReq, infinity) of
+        {implicit_commit, _NestingLevel, _Query} = ImplicitCommit ->
+            throw(ImplicitCommit);
+        {implicit_rollback, _NestingLevel, _ServerReason} = ImplicitRollback ->
+            throw(ImplicitRollback);
+        Result ->
+            Result
+    end.
+
 %% @doc Executes a prepared statement and returns {Reply, NextState}.
 execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket}) ->
     Rec = case mysql_protocol:execute(Stmt, Args, gen_tcp, Socket, Timeout) of
@@ -601,6 +722,14 @@ execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket}) ->
     case Rec of
         #ok{} ->
             {ok, State1};
+        #error{code = Code} when State1#state.transaction_level > 0,
+                                 (Code == ?ERROR_DEADLOCK orelse
+                                  Code == ?ERROR_LOCK_WAIT_TIMEOUT) ->
+            %% Implicit rollback.
+            Reply = {implicit_rollback, State1#state.transaction_level,
+                     error_to_reason(Rec)},
+            State2 = clear_transaction_status(State1),
+            {Reply, State2};
         #error{} = E ->
             {{error, error_to_reason(E)}, State1};
         #resultset{cols = ColDefs, rows = Rows} ->
@@ -628,6 +757,13 @@ update_state(State, _Other) ->
     %% Reset warnings, etc. (Note: We don't reset status and insert_id.)
     State#state{warning_count = 0, affected_rows = 0}.
 
+%% @doc Since errors don't return a status but some errors cause an implicit
+%% rollback, we use this function to clear fix the transaction bit in the
+%% status.
+clear_transaction_status(State = #state{status = Status}) ->
+    State#state{status = Status band bnot ?SERVER_STATUS_IN_TRANS,
+                transaction_level = 0}.
+
 %% @doc Fetches and logs warnings. Query is the query that gave the warnings.
 log_warnings(#state{socket = Socket}, Query) ->
     #resultset{rows = Rows} =

+ 2 - 66
test/mysql_tests.erl

@@ -418,7 +418,7 @@ timeout_test_() ->
 
 %% --------------------------------------------------------------------------
 
-%% Prepared statements and transactions
+%% Prepared statements
 
 with_table_foo_test_() ->
     {setup,
@@ -437,11 +437,7 @@ with_table_foo_test_() ->
          exit(Pid, normal)
      end,
      {with, [fun prepared_statements/1,
-             fun parameterized_query/1,
-             fun transaction_simple_success/1,
-             fun transaction_simple_aborted/1,
-             fun transaction_nested_success/1,
-             fun transaction_inner_rollback/1]}}.
+             fun parameterized_query/1]}}.
 
 prepared_statements(Pid) ->
     %% Unnamed
@@ -476,66 +472,6 @@ parameterized_query(Conn) ->
     receive after 150 -> ok end, %% Now the query cache should emptied
     {ok, _, []} = mysql:query(Conn, "SELECT * FROM foo WHERE bar = ?", [3]).
 
-transaction_simple_success(Pid) ->
-    ?assertNot(mysql:in_transaction(Pid)),
-    Result = mysql:transaction(Pid, fun () ->
-                 ok = mysql:query(Pid, "INSERT INTO foo VALUES (42)"),
-                 ?assert(mysql:in_transaction(Pid)),
-                 hello
-             end),
-    ?assertEqual({atomic, hello}, Result),
-    ?assertNot(mysql:in_transaction(Pid)),
-    ok = mysql:query(Pid, "DELETE FROM foo").
-
-transaction_simple_aborted(Pid) ->
-    ok = mysql:query(Pid, "INSERT INTO foo VALUES (9)"),
-    ?assertEqual({ok, [<<"bar">>], [[9]]},
-                 mysql:query(Pid, "SELECT bar FROM foo")),
-    Result = mysql:transaction(Pid, fun () ->
-                 ok = mysql:query(Pid, "INSERT INTO foo VALUES (42)"),
-                 ?assertMatch({ok, _, [[2]]},
-                              mysql:query(Pid, "SELECT COUNT(*) FROM foo")),
-                 error(hello)
-             end),
-    ?assertMatch({aborted, {hello, Stacktrace}} when is_list(Stacktrace),
-                 Result),
-    ?assertEqual({ok, [<<"bar">>], [[9]]},
-                 mysql:query(Pid, "SELECT bar FROM foo")),
-    ok = mysql:query(Pid, "DELETE FROM foo"),
-    %% Also check the abort Reason for throw and exit.
-    ?assertEqual({aborted, {throw, foo}},
-                 mysql:transaction(Pid, fun () -> throw(foo) end)),
-    ?assertEqual({aborted, foo},
-                 mysql:transaction(Pid, fun () -> exit(foo) end)).
-
-transaction_nested_success(Pid) ->
-    OuterResult = mysql:transaction(Pid, fun () ->
-        ok = mysql:query(Pid, "INSERT INTO foo VALUES (9)"),
-        InnerResult = mysql:transaction(Pid, fun () ->
-            ok = mysql:query(Pid, "INSERT INTO foo VALUES (42)"),
-            inner
-        end),
-        ?assertEqual({atomic, inner}, InnerResult),
-        outer
-    end),
-    ?assertMatch({ok, _, [[2]]}, mysql:query(Pid, "SELECT COUNT(*) FROM foo")),
-    ok = mysql:query(Pid, "DELETE FROM foo"),
-    ?assertEqual({atomic, outer}, OuterResult).
-
-transaction_inner_rollback(Pid) ->
-    OuterResult = mysql:transaction(Pid, fun () ->
-        ok = mysql:query(Pid, "INSERT INTO foo VALUES (9)"),
-        InnerResult = mysql:transaction(Pid, fun () ->
-            ok = mysql:query(Pid, "INSERT INTO foo VALUES (42)"),
-            throw(inner)
-        end),
-        ?assertEqual({aborted, {throw, inner}}, InnerResult),
-        outer
-    end),
-    ?assertMatch({ok, _, [[9]]}, mysql:query(Pid, "SELECT bar FROM foo")),
-    ok = mysql:query(Pid, "DELETE FROM foo"),
-    ?assertEqual({atomic, outer}, OuterResult).
-
 %% --- simple gen_server callbacks ---
 
 gen_server_coverage_test() ->

+ 284 - 0
test/transaction_tests.erl

@@ -0,0 +1,284 @@
+%% MySQL/OTP – MySQL client library for Erlang/OTP
+%% Copyright (C) 2014 Viktor Söderqvist
+%%
+%% This file is part of MySQL/OTP.
+%%
+%% MySQL/OTP is free software: you can redistribute it and/or modify it under
+%% the terms of the GNU Lesser General Public License as published by the Free
+%% Software Foundation, either version 3 of the License, or (at your option)
+%% any later version.
+%%
+%% This program is distributed in the hope that it will be useful, but WITHOUT
+%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+%% FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+%% more details.
+%%
+%% You should have received a copy of the GNU Lesser General Public License
+%% along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+%% @doc This module performs test to an actual database.
+-module(transaction_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(user,     "otptest").
+-define(password, "otptest").
+
+single_connection_test_() ->
+    {setup,
+     fun () ->
+         {ok, Pid} = mysql:start_link([{user, ?user}, {password, ?password},
+                                       {query_cache_time, 50},
+                                       {log_warnings, false}]),
+         ok = mysql:query(Pid, <<"DROP DATABASE IF EXISTS otptest">>),
+         ok = mysql:query(Pid, <<"CREATE DATABASE otptest">>),
+         ok = mysql:query(Pid, <<"USE otptest">>),
+         ok = mysql:query(Pid, <<"CREATE TABLE foo (bar INT) engine=InnoDB">>),
+         Pid
+     end,
+     fun (Pid) ->
+         ok = mysql:query(Pid, <<"DROP DATABASE otptest">>),
+         exit(Pid, normal)
+     end,
+     {with, [fun simple_atomic/1,
+             fun simple_aborted/1,
+             fun nested_atomic/1,
+             fun nested_inner_aborted/1,
+             fun implicit_commit/1]}}.
+
+simple_atomic(Pid) ->
+    ?assertNot(mysql:in_transaction(Pid)),
+    Result = mysql:transaction(Pid, fun () ->
+                 ok = mysql:query(Pid, "INSERT INTO foo (bar) VALUES (42)"),
+                 ?assert(mysql:in_transaction(Pid)),
+                 hello
+             end),
+    ?assertEqual({atomic, hello}, Result),
+    ?assertNot(mysql:in_transaction(Pid)),
+    ok = mysql:query(Pid, "DELETE FROM foo").
+
+simple_aborted(Pid) ->
+    ok = mysql:query(Pid, "INSERT INTO foo VALUES (9)"),
+    ?assertEqual({ok, [<<"bar">>], [[9]]},
+                 mysql:query(Pid, "SELECT bar FROM foo")),
+    Result = mysql:transaction(Pid, fun () ->
+                 ok = mysql:query(Pid, "INSERT INTO foo VALUES (42)"),
+                 ?assertMatch({ok, _, [[2]]},
+                              mysql:query(Pid, "SELECT COUNT(*) FROM foo")),
+                 error(hello)
+             end),
+    ?assertMatch({aborted, {hello, Stacktrace}} when is_list(Stacktrace),
+                 Result),
+    ?assertEqual({ok, [<<"bar">>], [[9]]},
+                 mysql:query(Pid, "SELECT bar FROM foo")),
+    ok = mysql:query(Pid, "DELETE FROM foo"),
+    %% Also check the abort Reason for throw and exit.
+    ?assertEqual({aborted, {throw, foo}},
+                 mysql:transaction(Pid, fun () -> throw(foo) end)),
+    ?assertEqual({aborted, foo},
+                 mysql:transaction(Pid, fun () -> exit(foo) end)).
+
+nested_atomic(Pid) ->
+    OuterResult = mysql:transaction(Pid, fun () ->
+        ok = mysql:query(Pid, "INSERT INTO foo VALUES (9)"),
+        InnerResult = mysql:transaction(Pid, fun () ->
+            ok = mysql:query(Pid, "INSERT INTO foo VALUES (42)"),
+            inner
+        end),
+        ?assertEqual({atomic, inner}, InnerResult),
+        outer
+    end),
+    ?assertMatch({ok, _, [[2]]}, mysql:query(Pid, "SELECT COUNT(*) FROM foo")),
+    ok = mysql:query(Pid, "DELETE FROM foo"),
+    ?assertEqual({atomic, outer}, OuterResult).
+
+nested_inner_aborted(Pid) ->
+    OuterResult = mysql:transaction(Pid, fun () ->
+        ok = mysql:query(Pid, "INSERT INTO foo VALUES (9)"),
+        InnerResult = mysql:transaction(Pid, fun () ->
+            ok = mysql:query(Pid, "INSERT INTO foo VALUES (42)"),
+            throw(inner)
+        end),
+        ?assertEqual({aborted, {throw, inner}}, InnerResult),
+        outer
+    end),
+    ?assertMatch({ok, _, [[9]]}, mysql:query(Pid, "SELECT bar FROM foo")),
+    ok = mysql:query(Pid, "DELETE FROM foo"),
+    ?assertEqual({atomic, outer}, OuterResult).
+
+implicit_commit(Conn) ->
+    %% This causes an implicit commit in a nested transaction.
+    Query = "ALTER TABLE foo ADD baz INT",
+    ?assertError({implicit_commit, Query}, mysql:transaction(Conn, fun () ->
+        mysql:transaction(Conn, fun () ->
+            mysql:query(Conn, Query)
+        end)
+    end)),
+    ?assertNot(mysql:in_transaction(Conn)).
+
+%% -----------------------------------------------------------------------------
+
+deadlock_test_() ->
+    {setup,
+     fun () ->
+         {ok, Conn1} = mysql:start_link([{user, ?user}, {password, ?password}]),
+         ok = mysql:query(Conn1, <<"CREATE DATABASE IF NOT EXISTS otptest">>),
+         ok = mysql:query(Conn1, <<"USE otptest">>),
+         ok = mysql:query(Conn1, <<"CREATE TABLE foo (k INT PRIMARY KEY, v INT)"
+                                   " engine=InnoDB">>),
+         ok = mysql:query(Conn1, "INSERT INTO foo (k,v) VALUES (1,0), (2,0)"),
+         {ok, Conn2} = mysql:start_link([{user, ?user}, {password, ?password}]),
+         ok = mysql:query(Conn2, <<"USE otptest">>),
+         {Conn1, Conn2}
+     end,
+     fun ({Conn1, Conn2}) ->
+         ok = mysql:query(Conn1, <<"DROP DATABASE otptest">>, 1000),
+         exit(Conn1, normal),
+         exit(Conn2, normal)
+     end,
+     fun (Conns) ->
+         [{"Plain queries", fun () -> deadlock_plain_queries(Conns) end},
+          {"Prep stmts", fun () -> deadlock_prepared_statements(Conns) end},
+          {"Lock wait timeout", fun () -> lock_wait_timeout(Conns) end}]
+     end}.
+
+flush_inbox() ->
+    receive _ -> flush_inbox() after 0 -> ok end.
+
+deadlock_plain_queries({Conn1, Conn2}) ->
+    {ok, _, [[2]]} = mysql:query(Conn1, "SELECT COUNT(*) FROM foo"),
+    MainPid = self(),
+    %?debugMsg("\nExtra output from the deadlock test:"),
+
+    %% Spawn worker 2 to lock rows; first in table foo, then in bar.
+    Worker2 = spawn_link(fun () ->
+        {atomic, ok} = mysql:transaction(Conn2, fun () ->
+            MainPid ! start,
+            %?debugMsg("Worker 2: Starting. First get a lock on row 2."),
+            ok = mysql:query(Conn2, "UPDATE foo SET v = 2 WHERE k = 2"),
+            %?debugMsg("Worker 2: Got lock on foo. Now wait for signal from 1."),
+            %% Sync. Send 'go' to worker 1 multiple times in case it restarts.
+            MainPid ! go, MainPid ! go, MainPid ! go,
+            receive go -> ok after 10000 -> throw(too_long) end,
+            %?debugMsg("Worker 2: Got signal from 1. Now get a lock on row 1."),
+            {atomic, ok} = mysql:transaction(Conn2, fun () ->
+                %% Nested transaction, just to make sure we can handle nested.
+                ok = mysql:query(Conn2, "UPDATE foo SET v = 2 WHERE k = 1")
+            end),
+            %?debugMsg("Worker 2: Got both locks and is done."),
+            ok
+        end),
+        MainPid ! done
+    end),
+
+    %% Do worker 1's job and lock the rows in the opposite order.
+    {atomic, ok} = mysql:transaction(Conn1, fun () ->
+        MainPid ! start,
+        %?debugMsg("Worker 1: Starting. First get a lock on row 1."),
+        ok = mysql:query(Conn1, "UPDATE foo SET v = 1 WHERE k = 1"),
+        %?debugMsg("Worker 1: Got lock on bar. Now wait for signal from 2."),
+        %% Sync. Send 'go' to worker 2 multiple times in case it restarts.
+        Worker2 ! go, Worker2 ! go, Worker2 ! go,
+        receive go -> ok after 10000 -> throw(too_long) end,
+        %?debugMsg("Worker 1: Got signal from 2. Now get lock on row 2."),
+        {atomic, ok} = mysql:transaction(Conn1, fun () ->
+            %% Nested transaction, just to make sure we can handle nested.
+            ok = mysql:query(Conn1, "UPDATE foo SET v = 1 WHERE k = 2")
+        end),
+        %?debugMsg("Worker 1: Got both locks and is done."),
+        ok
+    end),
+
+    %% Wait for a reply from worker 2 to make sure it is done.
+    receive done -> ok end,
+
+    %% None of the connections should be in a transaction at this point
+    ?assertNot(mysql:in_transaction(Conn1)),
+    ?assertNot(mysql:in_transaction(Conn2)),
+
+    %% Make sure we got at least 3 start messages, i.e. at least 1 restart.
+    ?assertEqual(ok, receive start -> ok after 0 -> no_worker_ever_started end),
+    ?assertEqual(ok, receive start -> ok after 0 -> only_one_worker_started end),
+    ?assertEqual(ok, receive start -> ok after 0 -> there_was_no_deadlock end),
+    flush_inbox().
+
+%% This case is very similar to the above test. We use prepared statements
+%% instead of plain queries. (Some lines of code in the implementation differ.)
+deadlock_prepared_statements({Conn1, Conn2}) ->
+    {ok, _, [[2]]} = mysql:query(Conn1, "SELECT COUNT(*) FROM foo"),
+    {ok, upd} = mysql:prepare(Conn1, upd, "UPDATE foo SET v = ? WHERE k = ?"),
+    {ok, upd} = mysql:prepare(Conn2, upd, "UPDATE foo SET v = ? WHERE k = ?"),
+    MainPid = self(),
+
+    %% Spawn worker 2 to lock rows; first in table foo, then in bar.
+    Worker2 = spawn_link(fun () ->
+        {atomic, ok} = mysql:transaction(Conn2, fun () ->
+            MainPid ! start,
+            ok = mysql:execute(Conn2, upd, [2, 2]),
+            %% Sync. Send 'go' to worker 1 multiple times in case it restarts.
+            MainPid ! go, MainPid ! go, MainPid ! go,
+            receive go -> ok end,
+            {atomic, ok} = mysql:transaction(Conn2, fun () ->
+                %% Nested transaction, just to make sure we can handle nested.
+                ok = mysql:execute(Conn2, upd, [2, 1])
+            end),
+            ok
+        end),
+        MainPid ! done
+    end),
+
+    %% Do worker 1's job and lock the rows in the opposite order.
+    {atomic, ok} = mysql:transaction(Conn1, fun () ->
+        MainPid ! start,
+        ok = mysql:execute(Conn1, upd, [1, 1]),
+        %% Sync. Send 'go' to worker 2 multiple times in case it restarts.
+        Worker2 ! go, Worker2 ! go, Worker2 ! go,
+        receive go -> ok end,
+        {atomic, ok} = mysql:transaction(Conn1, fun () ->
+            %% Nested transaction, just to make sure we can handle nested.
+            ok = mysql:execute(Conn1, upd, [1, 2])
+        end),
+        ok
+    end),
+
+    %% Wait for a reply from worker 2.
+    receive done -> ok end,
+
+    %% None of the connections should be in a transaction at this point
+    ?assertNot(mysql:in_transaction(Conn1)),
+    ?assertNot(mysql:in_transaction(Conn2)),
+
+    %% Make sure we got at least 3 start messages, i.e. at least 1 restart.
+    ?assertEqual(ok, receive start -> ok after 0 -> no_worker_ever_started end),
+    ?assertEqual(ok, receive start -> ok after 0 -> only_one_worker_started end),
+    ?assertEqual(ok, receive start -> ok after 0 -> there_was_no_deadlock end),
+    flush_inbox().
+
+lock_wait_timeout({Conn1, Conn2}) ->
+    %% Set the lowest timeout possible to speed up the test.
+    ok = mysql:query(Conn2, "SET innodb_lock_wait_timeout = 1"),
+    {ok, _, [[1]]} = mysql:query(Conn2, "SELECT COUNT(*) FROM foo WHERE k = 1"),
+    MainPid = self(),
+
+    %% Create a worker that takes the lock and sleeps on it.
+    LockingWorker = spawn_link(fun () ->
+        {atomic, ok} = mysql:transaction(Conn1, fun () ->
+            ok = mysql:query(Conn1, "UPDATE foo SET v = 0 WHERE k = 1"),
+            MainPid ! go,
+            receive release -> ok end
+        end),
+        MainPid ! done
+    end),
+
+    %% Wait for the locking worker to take the lock.
+    receive go -> ok end,
+    {aborted, Reason} = mysql:transaction(Conn2, fun () ->
+        ok = mysql:query(Conn2, "UPDATE foo SET v = 42 WHERE k = 1")
+    end),
+    ?assertMatch({{1205, _, <<"Lock wait timeout", _/binary>>}, _Trace},
+                 Reason),
+
+    %% Wake the sleeping worker.
+    LockingWorker ! release,
+    receive done -> ok end,
+    flush_inbox().