Browse Source

Detect transaction owned by another process and return error (#190)

If query/N or execute/N is called on a connection which is
currently in a transaction owned by another process, `{error,
busy}` is returned.

If transaction/N is called is called on a connection which is
currently in a transaction owned by another process, `{aborted,
busy}` is returned.

The other process and its transaction are unaffected.
chenduo 3 years ago
parent
commit
2ea95505a7
2 changed files with 39 additions and 3 deletions
  1. 18 3
      src/mysql.erl
  2. 21 0
      src/mysql_conn.erl

+ 18 - 3
src/mysql.erl

@@ -68,7 +68,7 @@
 -type query_result() :: ok
                       | {ok, [column_name()], [row()]}
                       | {ok, [{[column_name()], [row()]}, ...]}
-                      | {error, server_reason()}.
+                      | {error, server_reason() | busy}.
 
 -type transaction_result(Result) :: {atomic, Result} | {aborted, Reason :: term()}.
 
@@ -366,6 +366,9 @@ query(Conn, Query, Params, FilterMap) when (Params == no_params orelse
 %% For queries that don't return any rows (INSERT, UPDATE, etc.) only the atom
 %% `ok' is returned.
 %%
+%% If this function is called on a connection which is already in transaction 
+%% owned by another process, `{error, busy}` will be returned.
+%%
 %% === FilterMap details ===
 %%
 %% If the `FilterMap' argument is used, it must be a function of arity 1 or 2
@@ -498,6 +501,9 @@ execute(Conn, StatementRef, Params, FilterMap) when FilterMap == no_filtermap_fu
 %%
 %% See `query/5' for an explanation of the `FilterMap' argument.
 %%
+%% Note that if this function is called on a connection which is already in transaction 
+%% owned by another process, `{error, busy}` will be returned.
+%%
 %% @see prepare/2
 %% @see prepare/3
 %% @see prepare/4
@@ -618,6 +624,11 @@ transaction(Conn, Fun, Retries) ->
 %% can be nested and are restarted automatically when deadlocks are detected.
 %% MySQL's savepoints are used to implement nested transactions.
 %%
+%% If this function is called on a connection which is already in a transaction
+%% owned by another process, `{aborted, busy}` is returned. The idea of nested
+%% transactions is that this function can be called in the Fun, but all within
+%% the same process.
+%%
 %% Fun must be a function and Args must be a list of the same length as the
 %% arity of Fun.
 %%
@@ -665,8 +676,12 @@ transaction(Conn, Fun, Args, Retries) when is_list(Args),
                                            is_function(Fun, length(Args)) ->
     %% The guard makes sure that we can apply Fun to Args. Any error we catch
     %% in the try-catch are actual errors that occurred in Fun.
-    ok = gen_server:call(Conn, start_transaction, infinity),
-    execute_transaction(Conn, Fun, Args, Retries).
+    case gen_server:call(Conn, start_transaction, infinity) of
+        ok -> 
+            execute_transaction(Conn, Fun, Args, Retries);
+        {error, busy} ->
+            {aborted, busy}
+    end.        
 
 %% @private
 %% @doc This is a helper for transaction/2,3,4. It performs everything except

+ 21 - 0
src/mysql_conn.erl

@@ -302,6 +302,11 @@ handle_call(Msg, From, #state{socket = undefined} = State) ->
         {error, _} = E ->
             {stop, E, State}
     end;
+handle_call({query, _Query, _FilterMap, _Timeout}, {FromPid, _},
+            State = #state{transaction_levels = [{OtherFromPid, _} | _]})
+  when FromPid =/= OtherFromPid ->
+    %% this conn is currently in transaction owned by another process
+    {reply, {error, busy}, State};
 handle_call({query, Query, FilterMap, Timeout}, _From, State) ->
     {Reply, State1} = query(Query, FilterMap, Timeout, State),
     {reply, Reply, State1};
@@ -309,6 +314,11 @@ handle_call({param_query, Query, Params, FilterMap, default_timeout}, From,
             State) ->
     handle_call({param_query, Query, Params, FilterMap,
                 State#state.query_timeout}, From, State);
+handle_call({param_query, _Query, _Params, _FilterMap, _Timeout}, {FromPid, _},
+            State = #state{transaction_levels = [{OtherFromPid, _} | _]})
+  when FromPid =/= OtherFromPid ->
+    %% this conn is currently in transaction owned by another process
+    {reply, {error, busy}, State};
 handle_call({param_query, Query, Params, FilterMap, Timeout}, _From,
             #state{socket = Socket, sockmod = SockMod} = State) ->
     %% Parametrized query: Prepared statement cached with the query as the key
@@ -346,6 +356,11 @@ handle_call({param_query, Query, Params, FilterMap, Timeout}, _From,
 handle_call({execute, Stmt, Args, FilterMap, default_timeout}, From, State) ->
     handle_call({execute, Stmt, Args, FilterMap, State#state.query_timeout},
         From, State);
+handle_call({execute, _Stmt, _Args, _FilterMap, _Timeout}, {FromPid, _},
+            State = #state{transaction_levels = [{OtherFromPid, _} | _]})
+  when FromPid =/= OtherFromPid ->
+    %% this conn is currently in transaction owned by another process
+    {reply, {error, busy}, State};
 handle_call({execute, Stmt, Args, FilterMap, Timeout}, _From, State) ->
     case dict:find(Stmt, State#state.stmts) of
         {ok, StmtRec} ->
@@ -445,6 +460,12 @@ handle_call(backslash_escapes_enabled, _From, State = #state{status = S}) ->
     {reply, S band ?SERVER_STATUS_NO_BACKSLASH_ESCAPES == 0, State};
 handle_call(in_transaction, _From, State) ->
     {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State};
+handle_call(start_transaction, {FromPid, _}, 
+            State = #state{transaction_levels = [{OtherFromPid, _}|_]})
+  when FromPid =/= OtherFromPid ->
+    %% the idea of "nested transaction" is that mysql:transaction can be called 
+    %% in the transaction function, but all within the same process
+    {reply, {error, busy}, State};
 handle_call(start_transaction, {FromPid, _},
             State = #state{socket = Socket, sockmod = SockMod,
                            transaction_levels = L, status = Status})