Browse Source

Remove redundant transaction level info in state

Also silence the error log noise from expected errors in test case for
the scenario where the calling process is killed while the connection
is in a transaction.
Viktor Söderqvist 6 years ago
parent
commit
43a85c61e5
2 changed files with 61 additions and 44 deletions
  1. 34 37
      src/mysql_conn.erl
  2. 27 7
      test/transaction_tests.erl

+ 34 - 37
src/mysql_conn.erl

@@ -54,8 +54,7 @@
                 ping_timeout,
                 query_timeout, query_cache_time,
                 affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
-                transaction_level = 0, ping_ref = undefined,
-                monitors = [],
+                transaction_levels = [], ping_ref = undefined,
                 stmts = dict:new(), query_cache = empty, cap_found_rows = false}).
 
 %% @private
@@ -290,15 +289,13 @@ handle_call(in_transaction, _From, State) ->
     {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State};
 handle_call(start_transaction, {FromPid, _},
             State = #state{socket = Socket, sockmod = SockMod,
-                           transaction_level = L, status = Status, monitors = Monitors})
-  when Status band ?SERVER_STATUS_IN_TRANS == 0, L == 0;
-       Status band ?SERVER_STATUS_IN_TRANS /= 0, L > 0 ->
-
+                           transaction_levels = L, status = Status})
+  when Status band ?SERVER_STATUS_IN_TRANS == 0, L == [];
+       Status band ?SERVER_STATUS_IN_TRANS /= 0, L /= [] ->
     MRef = erlang:monitor(process, FromPid),
-
     Query = case L of
-        0 -> <<"BEGIN">>;
-        _ -> <<"SAVEPOINT s", (integer_to_binary(L))/binary>>
+        [] -> <<"BEGIN">>;
+        _  -> <<"SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
     end,
     SockMod:setopts(Socket, [{active, false}]),
     SockMod = State#state.sockmod,
@@ -306,16 +303,15 @@ handle_call(start_transaction, {FromPid, _},
                                                ?cmd_timeout),
     SockMod:setopts(Socket, [{active, once}]),
     State1 = update_state(Res, State),
-    {reply, ok, State1#state{transaction_level = L + 1, monitors = [{FromPid, MRef} | Monitors]}};
-handle_call(rollback, {FromPid, _}, State = #state{socket = Socket, sockmod = SockMod,
-                                                   status = Status, transaction_level = L,
-                                                   monitors = [{FromPid, MRef}|NewMonitors]})
-  when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
+    {reply, ok, State1#state{transaction_levels = [{FromPid, MRef} | L]}};
+handle_call(rollback, {FromPid, _},
+            State = #state{socket = Socket, sockmod = SockMod, status = Status,
+                           transaction_levels = [{FromPid, MRef} | L]})
+  when Status band ?SERVER_STATUS_IN_TRANS /= 0 ->
     erlang:demonitor(MRef),
-
     Query = case L of
-        1 -> <<"ROLLBACK">>;
-        _ -> <<"ROLLBACK TO s", (integer_to_binary(L - 1))/binary>>
+        [] -> <<"ROLLBACK">>;
+        _  -> <<"ROLLBACK TO s", (integer_to_binary(length(L)))/binary>>
     end,
     SockMod:setopts(Socket, [{active, false}]),
     SockMod = State#state.sockmod,
@@ -323,16 +319,15 @@ handle_call(rollback, {FromPid, _}, State = #state{socket = Socket, sockmod = So
                                                ?cmd_timeout),
     SockMod:setopts(Socket, [{active, once}]),
     State1 = update_state(Res, State),
-    {reply, ok, State1#state{transaction_level = L - 1, monitors = NewMonitors}};
-handle_call(commit, {FromPid, _}, State = #state{socket = Socket, sockmod = SockMod,
-                                                 status = Status, transaction_level = L,
-                                                 monitors = [{FromPid, MRef}|NewMonitors]})
-  when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
+    {reply, ok, State1#state{transaction_levels = L}};
+handle_call(commit, {FromPid, _},
+            State = #state{socket = Socket, sockmod = SockMod, status = Status,
+                           transaction_levels = [{FromPid, MRef} | L]})
+  when Status band ?SERVER_STATUS_IN_TRANS /= 0 ->
     erlang:demonitor(MRef),
-
     Query = case L of
-        1 -> <<"COMMIT">>;
-        _ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(L - 1))/binary>>
+        [] -> <<"COMMIT">>;
+        _  -> <<"RELEASE SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
     end,
     SockMod:setopts(Socket, [{active, false}]),
     SockMod = State#state.sockmod,
@@ -340,7 +335,7 @@ handle_call(commit, {FromPid, _}, State = #state{socket = Socket, sockmod = Sock
                                                ?cmd_timeout),
     SockMod:setopts(Socket, [{active, once}]),
     State1 = update_state(Res, State),
-    {reply, ok, State1#state{transaction_level = L - 1, monitors = NewMonitors}}.
+    {reply, ok, State1#state{transaction_levels = L}}.
 
 %% @private
 handle_cast(_Msg, State) ->
@@ -451,29 +446,31 @@ handle_query_call_reply([], _Query, State, ResultSetsAcc) ->
         [_|_]                 -> {ok, lists:reverse(ResultSetsAcc)}
     end,
     {reply, Reply, State};
-handle_query_call_reply([Rec|Recs], Query, #state{monitors = Monitors} = State, ResultSetsAcc) ->
+handle_query_call_reply([Rec|Recs], Query,
+                        #state{transaction_levels = L} = State,
+                        ResultSetsAcc) ->
     case Rec of
         #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
-                                  State#state.transaction_level > 0 ->
+                                  L /= [] ->
             %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
             %% an implicit commit.
-            Reply = {implicit_commit, State#state.transaction_level, Query},
-            NewMonitors = demonitor_processes(Monitors, length(Monitors)),
-            {reply, Reply, State#state{transaction_level = 0, monitors = NewMonitors}};
+            Length = length(L),
+            Reply = {implicit_commit, Length, Query},
+            [] = demonitor_processes(L, Length),
+            {reply, Reply, State#state{transaction_levels = []}};
         #ok{} ->
             handle_query_call_reply(Recs, Query, State, ResultSetsAcc);
         #resultset{cols = ColDefs, rows = Rows} ->
             Names = [Def#col.name || Def <- ColDefs],
             ResultSetsAcc1 = [{Names, Rows} | ResultSetsAcc],
             handle_query_call_reply(Recs, Query, State, ResultSetsAcc1);
-        #error{code = ?ERROR_DEADLOCK} when State#state.transaction_level > 0 ->
+        #error{code = ?ERROR_DEADLOCK} when L /= [] ->
             %% These errors result in an implicit rollback.
-            Reply = {implicit_rollback, State#state.transaction_level,
-                     error_to_reason(Rec)},
+            Reply = {implicit_rollback, length(L), error_to_reason(Rec)},
             %% Everything in the transaction is rolled back, except the BEGIN
             %% statement itself. Thus, we are in transaction level 1.
-            NewMonitors = demonitor_processes(Monitors, length(Monitors) -1),
-            {reply, Reply, State#state{transaction_level = 1, monitors = NewMonitors}};
+            NewMonitors = demonitor_processes(L, length(L) - 1),
+            {reply, Reply, State#state{transaction_levels = NewMonitors}};
         #error{} ->
             {reply, {error, error_to_reason(Rec)}, State}
     end.
@@ -532,4 +529,4 @@ demonitor_processes(List, 0) ->
     List;
 demonitor_processes([{_FromPid, MRef}|T], Count) ->
     erlang:demonitor(MRef),
-    demonitor_processes(T, Count -1).
+    demonitor_processes(T, Count - 1).

+ 27 - 7
test/transaction_tests.erl

@@ -48,7 +48,16 @@ single_connection_test_() ->
           {"Implicit commit",      fun () -> implicit_commit(Pid) end}]
      end}.
 
-application_process_kill_test() ->
+application_process_kill_test_() ->
+    {timeout, 30, fun application_process_kill/0}.
+
+application_process_kill() ->
+    %% This test case simulates a setup where the connection is owned by
+    %% another process, e.g. a connection pool. An application process (e.g.
+    %% a cowboy worker) is killed when it using a connection in a transaction.
+    %% In this case, the connection should not go back in the pool as it would
+    %% be in a bad state. Therefore, the connection is monitoring the process
+    %% starting a transaction and kills itself if the caller dies.
     {ok, Pid} = mysql:start_link([
         {user, ?user},
         {password, ?password},
@@ -78,17 +87,27 @@ application_process_kill_test() ->
         end)
     end),
 
-    receive killme -> exit(AppPid, kill) end,
+    %% Wait for the AppPid to be ready to be killed when in a transaction
+    receive killme -> ok end,
 
-    receive
-        {'DOWN', Mref, process, Pid, {application_process_died, AppPid}}->
-            ok
+    %% Kill AppPid, the process using the connection, capturing the noise
+    {ok, ok, LoggedErrors} = error_logger_acc:capture(fun () ->
+        exit(AppPid, kill),
+        receive
+            {'DOWN', Mref, process, Pid, {application_process_died, AppPid}} ->
+                ok
         after 10000 ->
             throw(too_long)
-    end,
+        end
+    end),
+    %% Check that we got the expected error log noise
+    ?assertMatch([{error, "Connection Id" ++ _},     %% from mysql_conn
+                  {error, "** Generic server" ++ _}, %% from gen_server
+                  {error_report, _}], LoggedErrors),
 
     ?assertNot(is_process_alive(Pid)),
 
+    %% Check that the transaction was not commited
     {ok, Pid2} = mysql:start_link([
         {user, ?user},
         {password, ?password},
@@ -96,7 +115,8 @@ application_process_kill_test() ->
         {log_warnings, false}
     ]),
     ok = mysql:query(Pid2, <<"USE otptest">>),
-    ?assertMatch({ok, _, []}, mysql:query(Pid2, <<"SELECT * from foo where bar = 42">>)),
+    ?assertMatch({ok, _, []},
+                 mysql:query(Pid2, <<"SELECT * from foo where bar = 42">>)),
     ok = mysql:query(Pid2, <<"DROP DATABASE otptest">>),
     exit(Pid2, normal).