|
@@ -29,7 +29,7 @@
|
|
encode/2, in_transaction/1,
|
|
encode/2, in_transaction/1,
|
|
transaction/2, transaction/3, transaction/4]).
|
|
transaction/2, transaction/3, transaction/4]).
|
|
|
|
|
|
--export_type([connection/0, server_reason/0]).
|
|
|
|
|
|
+-export_type([connection/0, server_reason/0, query_result/0]).
|
|
|
|
|
|
-behaviour(gen_server).
|
|
-behaviour(gen_server).
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
@@ -61,6 +61,14 @@
|
|
-type server_reason() :: {Code :: integer(), SQLState :: binary(),
|
|
-type server_reason() :: {Code :: integer(), SQLState :: binary(),
|
|
Message :: binary()}.
|
|
Message :: binary()}.
|
|
|
|
|
|
|
|
+-type column_names() :: [binary()].
|
|
|
|
+-type rows() :: [[term()]].
|
|
|
|
+
|
|
|
|
+-type query_result() :: ok
|
|
|
|
+ | {ok, column_names(), rows()}
|
|
|
|
+ | {ok, [{column_names(), rows()}, ...]}
|
|
|
|
+ | {error, server_reason()}.
|
|
|
|
+
|
|
%% @doc Starts a connection gen_server process and connects to a database. To
|
|
%% @doc Starts a connection gen_server process and connects to a database. To
|
|
%% disconnect just do `exit(Pid, normal)'.
|
|
%% disconnect just do `exit(Pid, normal)'.
|
|
%%
|
|
%%
|
|
@@ -143,7 +151,8 @@ start_link(Options) ->
|
|
lists:foreach(fun (Query) ->
|
|
lists:foreach(fun (Query) ->
|
|
case mysql:query(Pid, Query) of
|
|
case mysql:query(Pid, Query) of
|
|
ok -> ok;
|
|
ok -> ok;
|
|
- {ok, _, _} -> ok
|
|
|
|
|
|
+ {ok, _, _} -> ok;
|
|
|
|
+ {ok, _} -> ok
|
|
end
|
|
end
|
|
end,
|
|
end,
|
|
Queries),
|
|
Queries),
|
|
@@ -158,12 +167,19 @@ start_link(Options) ->
|
|
Ret.
|
|
Ret.
|
|
|
|
|
|
%% @doc Executes a query with the query timeout as given to start_link/1.
|
|
%% @doc Executes a query with the query timeout as given to start_link/1.
|
|
--spec query(Conn, Query) -> ok | {ok, ColumnNames, Rows} | {error, Reason}
|
|
|
|
|
|
+%%
|
|
|
|
+%% It is possible to execute multiple semicolon-separated queries.
|
|
|
|
+%%
|
|
|
|
+%% Results are returned in the form `{ok, ColumnNames, Rows}' if there is one
|
|
|
|
+%% result set. If there are more than one result sets, they are returned in the
|
|
|
|
+%% form `{ok, [{ColumnNames, Rows}, ...]}'.
|
|
|
|
+%%
|
|
|
|
+%% For queries that don't return any rows (INSERT, UPDATE, etc.) only the atom
|
|
|
|
+%% `ok' is returned.
|
|
|
|
+-spec query(Conn, Query) -> Result
|
|
when Conn :: connection(),
|
|
when Conn :: connection(),
|
|
Query :: iodata(),
|
|
Query :: iodata(),
|
|
- ColumnNames :: [binary()],
|
|
|
|
- Rows :: [[term()]],
|
|
|
|
- Reason :: server_reason().
|
|
|
|
|
|
+ Result :: query_result().
|
|
query(Conn, Query) ->
|
|
query(Conn, Query) ->
|
|
query_call(Conn, {query, Query}).
|
|
query_call(Conn, {query, Query}).
|
|
|
|
|
|
@@ -174,17 +190,17 @@ query(Conn, Query) ->
|
|
%%
|
|
%%
|
|
%% If the 3rd argument is a timeout, it executes a plain query with this
|
|
%% If the 3rd argument is a timeout, it executes a plain query with this
|
|
%% timeout.
|
|
%% timeout.
|
|
|
|
+%%
|
|
|
|
+%% The return value is the same as for query/2.
|
|
|
|
+%%
|
|
%% @see query/2.
|
|
%% @see query/2.
|
|
%% @see query/4.
|
|
%% @see query/4.
|
|
--spec query(Conn, Query, Params | Timeout) -> ok | {ok, ColumnNames, Rows} |
|
|
|
|
- {error, Reason}
|
|
|
|
|
|
+-spec query(Conn, Query, Params | Timeout) -> Result
|
|
when Conn :: connection(),
|
|
when Conn :: connection(),
|
|
Query :: iodata(),
|
|
Query :: iodata(),
|
|
Timeout :: timeout(),
|
|
Timeout :: timeout(),
|
|
Params :: [term()],
|
|
Params :: [term()],
|
|
- ColumnNames :: [binary()],
|
|
|
|
- Rows :: [[term()]],
|
|
|
|
- Reason :: server_reason().
|
|
|
|
|
|
+ Result :: query_result().
|
|
query(Conn, Query, Params) when is_list(Params) ->
|
|
query(Conn, Query, Params) when is_list(Params) ->
|
|
query_call(Conn, {param_query, Query, Params});
|
|
query_call(Conn, {param_query, Query, Params});
|
|
query(Conn, Query, Timeout) when is_integer(Timeout); Timeout == infinity ->
|
|
query(Conn, Query, Timeout) when is_integer(Timeout); Timeout == infinity ->
|
|
@@ -198,15 +214,14 @@ query(Conn, Query, Timeout) when is_integer(Timeout); Timeout == infinity ->
|
|
%%
|
|
%%
|
|
%% The minimum time the prepared statement is cached can be specified using the
|
|
%% The minimum time the prepared statement is cached can be specified using the
|
|
%% option `{query_cache_time, Milliseconds}' to start_link/1.
|
|
%% option `{query_cache_time, Milliseconds}' to start_link/1.
|
|
--spec query(Conn, Query, Params, Timeout) -> ok | {ok, ColumnNames, Rows} |
|
|
|
|
- {error, Reason}
|
|
|
|
|
|
+%%
|
|
|
|
+%% The return value is the same as for query/2.
|
|
|
|
+-spec query(Conn, Query, Params, Timeout) -> Result
|
|
when Conn :: connection(),
|
|
when Conn :: connection(),
|
|
Query :: iodata(),
|
|
Query :: iodata(),
|
|
Timeout :: timeout(),
|
|
Timeout :: timeout(),
|
|
Params :: [term()],
|
|
Params :: [term()],
|
|
- ColumnNames :: [binary()],
|
|
|
|
- Rows :: [[term()]],
|
|
|
|
- Reason :: server_reason().
|
|
|
|
|
|
+ Result :: query_result().
|
|
query(Conn, Query, Params, Timeout) ->
|
|
query(Conn, Query, Params, Timeout) ->
|
|
query_call(Conn, {param_query, Query, Params, Timeout}).
|
|
query_call(Conn, {param_query, Query, Params, Timeout}).
|
|
|
|
|
|
@@ -214,14 +229,11 @@ query(Conn, Query, Params, Timeout) ->
|
|
%% to start_link/1.
|
|
%% to start_link/1.
|
|
%% @see prepare/2
|
|
%% @see prepare/2
|
|
%% @see prepare/3
|
|
%% @see prepare/3
|
|
--spec execute(Conn, StatementRef, Params) ->
|
|
|
|
- ok | {ok, ColumnNames, Rows} | {error, Reason}
|
|
|
|
|
|
+-spec execute(Conn, StatementRef, Params) -> Result | {error, not_prepared}
|
|
when Conn :: connection(),
|
|
when Conn :: connection(),
|
|
StatementRef :: atom() | integer(),
|
|
StatementRef :: atom() | integer(),
|
|
Params :: [term()],
|
|
Params :: [term()],
|
|
- ColumnNames :: [binary()],
|
|
|
|
- Rows :: [[term()]],
|
|
|
|
- Reason :: server_reason() | not_prepared.
|
|
|
|
|
|
+ Result :: query_result().
|
|
execute(Conn, StatementRef, Params) ->
|
|
execute(Conn, StatementRef, Params) ->
|
|
query_call(Conn, {execute, StatementRef, Params}).
|
|
query_call(Conn, {execute, StatementRef, Params}).
|
|
|
|
|
|
@@ -229,14 +241,12 @@ execute(Conn, StatementRef, Params) ->
|
|
%% @see prepare/2
|
|
%% @see prepare/2
|
|
%% @see prepare/3
|
|
%% @see prepare/3
|
|
-spec execute(Conn, StatementRef, Params, Timeout) ->
|
|
-spec execute(Conn, StatementRef, Params, Timeout) ->
|
|
- ok | {ok, ColumnNames, Rows} | {error, Reason}
|
|
|
|
|
|
+ Result | {error, not_prepared}
|
|
when Conn :: connection(),
|
|
when Conn :: connection(),
|
|
StatementRef :: atom() | integer(),
|
|
StatementRef :: atom() | integer(),
|
|
Params :: [term()],
|
|
Params :: [term()],
|
|
Timeout :: timeout(),
|
|
Timeout :: timeout(),
|
|
- ColumnNames :: [binary()],
|
|
|
|
- Rows :: [[term()]],
|
|
|
|
- Reason :: server_reason() | not_prepared.
|
|
|
|
|
|
+ Result :: query_result().
|
|
execute(Conn, StatementRef, Params, Timeout) ->
|
|
execute(Conn, StatementRef, Params, Timeout) ->
|
|
query_call(Conn, {execute, StatementRef, Params, Timeout}).
|
|
query_call(Conn, {execute, StatementRef, Params, Timeout}).
|
|
|
|
|
|
@@ -422,7 +432,7 @@ transaction(Conn, Fun, Args, Retries) when is_list(Args),
|
|
%% parametrized queries with placeholders.
|
|
%% parametrized queries with placeholders.
|
|
%%
|
|
%%
|
|
%% @see query/3
|
|
%% @see query/3
|
|
-%% @see execute/30
|
|
|
|
|
|
+%% @see execute/3
|
|
-spec encode(connection(), term()) -> iodata().
|
|
-spec encode(connection(), term()) -> iodata().
|
|
encode(Conn, Term) ->
|
|
encode(Conn, Term) ->
|
|
Term1 = case (is_list(Term) orelse is_binary(Term)) andalso
|
|
Term1 = case (is_list(Term) orelse is_binary(Term)) andalso
|
|
@@ -514,7 +524,9 @@ init(Opts) ->
|
|
%% <dt>`ok'</dt>
|
|
%% <dt>`ok'</dt>
|
|
%% <dd>Success without returning any table data (UPDATE, etc.)</dd>
|
|
%% <dd>Success without returning any table data (UPDATE, etc.)</dd>
|
|
%% <dt>`{ok, ColumnNames, Rows}'</dt>
|
|
%% <dt>`{ok, ColumnNames, Rows}'</dt>
|
|
-%% <dd>Queries returning table data</dd>
|
|
|
|
|
|
+%% <dd>Queries returning one result set of table data</dd>
|
|
|
|
+%% <dt>`{ok, [{ColumnNames, Rows}, ...]}'</dt>
|
|
|
|
+%% <dd>Queries returning more than one result set of table data</dd>
|
|
%% <dt>`{error, ServerReason}'</dt>
|
|
%% <dt>`{error, ServerReason}'</dt>
|
|
%% <dd>MySQL server error</dd>
|
|
%% <dd>MySQL server error</dd>
|
|
%% <dt>`{implicit_commit, NestingLevel, Query}'</dt>
|
|
%% <dt>`{implicit_commit, NestingLevel, Query}'</dt>
|
|
@@ -541,7 +553,7 @@ handle_call({query, Query}, From, State) ->
|
|
handle_call({query, Query, State#state.query_timeout}, From, State);
|
|
handle_call({query, Query, State#state.query_timeout}, From, State);
|
|
handle_call({query, Query, Timeout}, _From, State) ->
|
|
handle_call({query, Query, Timeout}, _From, State) ->
|
|
Socket = State#state.socket,
|
|
Socket = State#state.socket,
|
|
- Rec = case mysql_protocol:query(Query, gen_tcp, Socket, Timeout) of
|
|
|
|
|
|
+ {ok, Recs} = case mysql_protocol:query(Query, gen_tcp, Socket, Timeout) of
|
|
{error, timeout} when State#state.server_version >= [5, 0, 0] ->
|
|
{error, timeout} when State#state.server_version >= [5, 0, 0] ->
|
|
kill_query(State),
|
|
kill_query(State),
|
|
mysql_protocol:fetch_query_response(gen_tcp, Socket, ?cmd_timeout);
|
|
mysql_protocol:fetch_query_response(gen_tcp, Socket, ?cmd_timeout);
|
|
@@ -552,32 +564,10 @@ handle_call({query, Query, Timeout}, _From, State) ->
|
|
QueryResult ->
|
|
QueryResult ->
|
|
QueryResult
|
|
QueryResult
|
|
end,
|
|
end,
|
|
- State1 = update_state(State, Rec),
|
|
|
|
|
|
+ State1 = lists:foldl(fun update_state/2, State, Recs),
|
|
State1#state.warning_count > 0 andalso State1#state.log_warnings
|
|
State1#state.warning_count > 0 andalso State1#state.log_warnings
|
|
andalso log_warnings(State1, Query),
|
|
andalso log_warnings(State1, Query),
|
|
- case Rec of
|
|
|
|
- #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
|
|
|
|
- State1#state.transaction_level > 0 ->
|
|
|
|
- %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
|
|
|
|
- %% an implicit commit.
|
|
|
|
- Reply = {implicit_commit, State1#state.transaction_level, Query},
|
|
|
|
- {reply, Reply, State1#state{transaction_level = 0}};
|
|
|
|
- #ok{} ->
|
|
|
|
- {reply, ok, State1};
|
|
|
|
- #resultset{cols = ColDefs, rows = Rows} ->
|
|
|
|
- Names = [Def#col.name || Def <- ColDefs],
|
|
|
|
- {reply, {ok, Names, Rows}, State1};
|
|
|
|
- #error{code = Code} when State1#state.transaction_level > 0,
|
|
|
|
- (Code == ?ERROR_DEADLOCK orelse
|
|
|
|
- Code == ?ERROR_LOCK_WAIT_TIMEOUT) ->
|
|
|
|
- %% These errors result in an implicit rollback.
|
|
|
|
- Reply = {implicit_rollback, State1#state.transaction_level,
|
|
|
|
- error_to_reason(Rec)},
|
|
|
|
- State2 = clear_transaction_status(State1),
|
|
|
|
- {reply, Reply, State2};
|
|
|
|
- #error{} ->
|
|
|
|
- {reply, {error, error_to_reason(Rec)}, State1}
|
|
|
|
- end;
|
|
|
|
|
|
+ handle_query_call_reply(Recs, Query, State1, []);
|
|
handle_call({param_query, Query, Params}, From, State) ->
|
|
handle_call({param_query, Query, Params}, From, State) ->
|
|
handle_call({param_query, Query, Params, State#state.query_timeout}, From,
|
|
handle_call({param_query, Query, Params, State#state.query_timeout}, From,
|
|
State);
|
|
State);
|
|
@@ -593,7 +583,7 @@ handle_call({param_query, Query, Params, Timeout}, _From, State) ->
|
|
not_found ->
|
|
not_found ->
|
|
%% Prepare
|
|
%% Prepare
|
|
Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
|
|
Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
|
|
- %State1 = update_state(State, Rec),
|
|
|
|
|
|
+ %State1 = update_state(Rec, State),
|
|
case Rec of
|
|
case Rec of
|
|
#error{} = E ->
|
|
#error{} = E ->
|
|
{{error, error_to_reason(E)}, Cache};
|
|
{{error, error_to_reason(E)}, Cache};
|
|
@@ -609,10 +599,7 @@ handle_call({param_query, Query, Params, Timeout}, _From, State) ->
|
|
case StmtResult of
|
|
case StmtResult of
|
|
{ok, StmtRec} ->
|
|
{ok, StmtRec} ->
|
|
State1 = State#state{query_cache = Cache1},
|
|
State1 = State#state{query_cache = Cache1},
|
|
- {Reply, State2} = execute_stmt(StmtRec, Params, Timeout, State1),
|
|
|
|
- State2#state.warning_count > 0 andalso State2#state.log_warnings
|
|
|
|
- andalso log_warnings(State2, Query),
|
|
|
|
- {reply, Reply, State2};
|
|
|
|
|
|
+ execute_stmt(StmtRec, Params, Timeout, State1);
|
|
PrepareError ->
|
|
PrepareError ->
|
|
{reply, PrepareError, State}
|
|
{reply, PrepareError, State}
|
|
end;
|
|
end;
|
|
@@ -621,19 +608,14 @@ handle_call({execute, Stmt, Args}, From, State) ->
|
|
handle_call({execute, Stmt, Args, Timeout}, _From, State) ->
|
|
handle_call({execute, Stmt, Args, Timeout}, _From, State) ->
|
|
case dict:find(Stmt, State#state.stmts) of
|
|
case dict:find(Stmt, State#state.stmts) of
|
|
{ok, StmtRec} ->
|
|
{ok, StmtRec} ->
|
|
- {Reply, State1} = execute_stmt(StmtRec, Args, Timeout, State),
|
|
|
|
- State1#state.warning_count > 0 andalso State1#state.log_warnings
|
|
|
|
- andalso log_warnings(State1,
|
|
|
|
- io_lib:format("prepared statement ~p",
|
|
|
|
- [Stmt])),
|
|
|
|
- {reply, Reply, State1};
|
|
|
|
|
|
+ execute_stmt(StmtRec, Args, Timeout, State);
|
|
error ->
|
|
error ->
|
|
{reply, {error, not_prepared}, State}
|
|
{reply, {error, not_prepared}, State}
|
|
end;
|
|
end;
|
|
handle_call({prepare, Query}, _From, State) ->
|
|
handle_call({prepare, Query}, _From, State) ->
|
|
#state{socket = Socket} = State,
|
|
#state{socket = Socket} = State,
|
|
Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
|
|
Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
|
|
- State1 = update_state(State, Rec),
|
|
|
|
|
|
+ State1 = update_state(Rec, State),
|
|
case Rec of
|
|
case Rec of
|
|
#error{} = E ->
|
|
#error{} = E ->
|
|
{reply, {error, error_to_reason(E)}, State1};
|
|
{reply, {error, error_to_reason(E)}, State1};
|
|
@@ -653,7 +635,7 @@ handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
|
|
State
|
|
State
|
|
end,
|
|
end,
|
|
Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
|
|
Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
|
|
- State2 = update_state(State1, Rec),
|
|
|
|
|
|
+ State2 = update_state(Rec, State1),
|
|
case Rec of
|
|
case Rec of
|
|
#error{} = E ->
|
|
#error{} = E ->
|
|
{reply, {error, error_to_reason(E)}, State2};
|
|
{reply, {error, error_to_reason(E)}, State2};
|
|
@@ -695,8 +677,9 @@ handle_call(start_transaction, _From,
|
|
0 -> <<"BEGIN">>;
|
|
0 -> <<"BEGIN">>;
|
|
_ -> <<"SAVEPOINT s", (integer_to_binary(L))/binary>>
|
|
_ -> <<"SAVEPOINT s", (integer_to_binary(L))/binary>>
|
|
end,
|
|
end,
|
|
- Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
|
|
|
|
- State1 = update_state(State, Res),
|
|
|
|
|
|
+ {ok, [Res = #ok{}]} = mysql_protocol:query(Query, gen_tcp, Socket,
|
|
|
|
+ ?cmd_timeout),
|
|
|
|
+ State1 = update_state(Res, State),
|
|
{reply, ok, State1#state{transaction_level = L + 1}};
|
|
{reply, ok, State1#state{transaction_level = L + 1}};
|
|
handle_call(rollback, _From, State = #state{socket = Socket, status = Status,
|
|
handle_call(rollback, _From, State = #state{socket = Socket, status = Status,
|
|
transaction_level = L})
|
|
transaction_level = L})
|
|
@@ -705,8 +688,9 @@ handle_call(rollback, _From, State = #state{socket = Socket, status = Status,
|
|
1 -> <<"ROLLBACK">>;
|
|
1 -> <<"ROLLBACK">>;
|
|
_ -> <<"ROLLBACK TO s", (integer_to_binary(L - 1))/binary>>
|
|
_ -> <<"ROLLBACK TO s", (integer_to_binary(L - 1))/binary>>
|
|
end,
|
|
end,
|
|
- Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
|
|
|
|
- State1 = update_state(State, Res),
|
|
|
|
|
|
+ {ok, [Res = #ok{}]} = mysql_protocol:query(Query, gen_tcp, Socket,
|
|
|
|
+ ?cmd_timeout),
|
|
|
|
+ State1 = update_state(Res, State),
|
|
{reply, ok, State1#state{transaction_level = L - 1}};
|
|
{reply, ok, State1#state{transaction_level = L - 1}};
|
|
handle_call(commit, _From, State = #state{socket = Socket, status = Status,
|
|
handle_call(commit, _From, State = #state{socket = Socket, status = Status,
|
|
transaction_level = L})
|
|
transaction_level = L})
|
|
@@ -715,8 +699,9 @@ handle_call(commit, _From, State = #state{socket = Socket, status = Status,
|
|
1 -> <<"COMMIT">>;
|
|
1 -> <<"COMMIT">>;
|
|
_ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(L - 1))/binary>>
|
|
_ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(L - 1))/binary>>
|
|
end,
|
|
end,
|
|
- Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
|
|
|
|
- State1 = update_state(State, Res),
|
|
|
|
|
|
+ {ok, [Res = #ok{}]} = mysql_protocol:query(Query, gen_tcp, Socket,
|
|
|
|
+ ?cmd_timeout),
|
|
|
|
+ State1 = update_state(Res, State),
|
|
{reply, ok, State1#state{transaction_level = L - 1}}.
|
|
{reply, ok, State1#state{transaction_level = L - 1}}.
|
|
|
|
|
|
%% @private
|
|
%% @private
|
|
@@ -740,7 +725,7 @@ handle_info(query_cache, State = #state{query_cache = Cache,
|
|
{noreply, State#state{query_cache = Cache1}};
|
|
{noreply, State#state{query_cache = Cache1}};
|
|
handle_info(ping, State) ->
|
|
handle_info(ping, State) ->
|
|
Ok = mysql_protocol:ping(gen_tcp, State#state.socket),
|
|
Ok = mysql_protocol:ping(gen_tcp, State#state.socket),
|
|
- {noreply, update_state(State, Ok)};
|
|
|
|
|
|
+ {noreply, update_state(Ok, State)};
|
|
handle_info(_Info, State) ->
|
|
handle_info(_Info, State) ->
|
|
{noreply, State}.
|
|
{noreply, State}.
|
|
|
|
|
|
@@ -775,7 +760,8 @@ query_call(Conn, CallReq) ->
|
|
|
|
|
|
%% @doc Executes a prepared statement and returns {Reply, NextState}.
|
|
%% @doc Executes a prepared statement and returns {Reply, NextState}.
|
|
execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket}) ->
|
|
execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket}) ->
|
|
- Rec = case mysql_protocol:execute(Stmt, Args, gen_tcp, Socket, Timeout) of
|
|
|
|
|
|
+ {ok, Recs} = case mysql_protocol:execute(Stmt, Args, gen_tcp, Socket,
|
|
|
|
+ Timeout) of
|
|
{error, timeout} when State#state.server_version >= [5, 0, 0] ->
|
|
{error, timeout} when State#state.server_version >= [5, 0, 0] ->
|
|
kill_query(State),
|
|
kill_query(State),
|
|
mysql_protocol:fetch_execute_response(gen_tcp, Socket,
|
|
mysql_protocol:fetch_execute_response(gen_tcp, Socket,
|
|
@@ -787,24 +773,10 @@ execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket}) ->
|
|
QueryResult ->
|
|
QueryResult ->
|
|
QueryResult
|
|
QueryResult
|
|
end,
|
|
end,
|
|
- State1 = update_state(State, Rec),
|
|
|
|
- case Rec of
|
|
|
|
- #ok{} ->
|
|
|
|
- {ok, State1};
|
|
|
|
- #error{code = Code} when State1#state.transaction_level > 0,
|
|
|
|
- (Code == ?ERROR_DEADLOCK orelse
|
|
|
|
- Code == ?ERROR_LOCK_WAIT_TIMEOUT) ->
|
|
|
|
- %% Implicit rollback.
|
|
|
|
- Reply = {implicit_rollback, State1#state.transaction_level,
|
|
|
|
- error_to_reason(Rec)},
|
|
|
|
- State2 = clear_transaction_status(State1),
|
|
|
|
- {Reply, State2};
|
|
|
|
- #error{} = E ->
|
|
|
|
- {{error, error_to_reason(E)}, State1};
|
|
|
|
- #resultset{cols = ColDefs, rows = Rows} ->
|
|
|
|
- Names = [Def#col.name || Def <- ColDefs],
|
|
|
|
- {{ok, Names, Rows}, State1}
|
|
|
|
- end.
|
|
|
|
|
|
+ State1 = lists:foldl(fun update_state/2, State, Recs),
|
|
|
|
+ State1#state.warning_count > 0 andalso State1#state.log_warnings
|
|
|
|
+ andalso log_warnings(State1, Stmt#prepared.orig_query),
|
|
|
|
+ handle_query_call_reply(Recs, Stmt#prepared.orig_query, State1, []).
|
|
|
|
|
|
%% @doc Produces a tuple to return as an error reason.
|
|
%% @doc Produces a tuple to return as an error reason.
|
|
-spec error_to_reason(#error{}) -> server_reason().
|
|
-spec error_to_reason(#error{}) -> server_reason().
|
|
@@ -813,8 +785,8 @@ error_to_reason(#error{code = Code, state = State, msg = Msg}) ->
|
|
|
|
|
|
%% @doc Updates a state with information from a response. Also re-schedules
|
|
%% @doc Updates a state with information from a response. Also re-schedules
|
|
%% ping.
|
|
%% ping.
|
|
--spec update_state(#state{}, #ok{} | #eof{} | any()) -> #state{}.
|
|
|
|
-update_state(State, Rec) ->
|
|
|
|
|
|
+-spec update_state(#ok{} | #eof{} | any(), #state{}) -> #state{}.
|
|
|
|
+update_state(Rec, State) ->
|
|
State1 = case Rec of
|
|
State1 = case Rec of
|
|
#ok{status = S, affected_rows = R, insert_id = Id, warning_count = W} ->
|
|
#ok{status = S, affected_rows = R, insert_id = Id, warning_count = W} ->
|
|
State#state{status = S, affected_rows = R, insert_id = Id,
|
|
State#state{status = S, affected_rows = R, insert_id = Id,
|
|
@@ -830,6 +802,40 @@ update_state(State, Rec) ->
|
|
end,
|
|
end,
|
|
schedule_ping(State1).
|
|
schedule_ping(State1).
|
|
|
|
|
|
|
|
+%% @doc Produces a reply for handle_call/3 for queries and prepared statements.
|
|
|
|
+handle_query_call_reply([], _Query, State, ResultSetsAcc) ->
|
|
|
|
+ Reply = case ResultSetsAcc of
|
|
|
|
+ [] -> ok;
|
|
|
|
+ [{ColumnNames, Rows}] -> {ok, ColumnNames, Rows};
|
|
|
|
+ [_|_] -> {ok, lists:reverse(ResultSetsAcc)}
|
|
|
|
+ end,
|
|
|
|
+ {reply, Reply, State};
|
|
|
|
+handle_query_call_reply([Rec|Recs], Query, State, ResultSetsAcc) ->
|
|
|
|
+ case Rec of
|
|
|
|
+ #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
|
|
|
|
+ State#state.transaction_level > 0 ->
|
|
|
|
+ %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
|
|
|
|
+ %% an implicit commit.
|
|
|
|
+ Reply = {implicit_commit, State#state.transaction_level, Query},
|
|
|
|
+ {reply, Reply, State#state{transaction_level = 0}};
|
|
|
|
+ #ok{} ->
|
|
|
|
+ handle_query_call_reply(Recs, Query, State, ResultSetsAcc);
|
|
|
|
+ #resultset{cols = ColDefs, rows = Rows} ->
|
|
|
|
+ Names = [Def#col.name || Def <- ColDefs],
|
|
|
|
+ ResultSetsAcc1 = [{Names, Rows} | ResultSetsAcc],
|
|
|
|
+ handle_query_call_reply(Recs, Query, State, ResultSetsAcc1);
|
|
|
|
+ #error{code = Code} when State#state.transaction_level > 0,
|
|
|
|
+ (Code == ?ERROR_DEADLOCK orelse
|
|
|
|
+ Code == ?ERROR_LOCK_WAIT_TIMEOUT) ->
|
|
|
|
+ %% These errors result in an implicit rollback.
|
|
|
|
+ Reply = {implicit_rollback, State#state.transaction_level,
|
|
|
|
+ error_to_reason(Rec)},
|
|
|
|
+ State2 = clear_transaction_status(State),
|
|
|
|
+ {reply, Reply, State2};
|
|
|
|
+ #error{} ->
|
|
|
|
+ {reply, {error, error_to_reason(Rec)}, State}
|
|
|
|
+ end.
|
|
|
|
+
|
|
%% @doc Schedules (or re-schedules) ping.
|
|
%% @doc Schedules (or re-schedules) ping.
|
|
schedule_ping(State = #state{ping_timeout = infinity}) ->
|
|
schedule_ping(State = #state{ping_timeout = infinity}) ->
|
|
State;
|
|
State;
|
|
@@ -846,8 +852,9 @@ clear_transaction_status(State = #state{status = Status}) ->
|
|
|
|
|
|
%% @doc Fetches and logs warnings. Query is the query that gave the warnings.
|
|
%% @doc Fetches and logs warnings. Query is the query that gave the warnings.
|
|
log_warnings(#state{socket = Socket}, Query) ->
|
|
log_warnings(#state{socket = Socket}, Query) ->
|
|
- #resultset{rows = Rows} = mysql_protocol:query(<<"SHOW WARNINGS">>, gen_tcp,
|
|
|
|
- Socket, ?cmd_timeout),
|
|
|
|
|
|
+ {ok, [#resultset{rows = Rows}]} = mysql_protocol:query(<<"SHOW WARNINGS">>,
|
|
|
|
+ gen_tcp, Socket,
|
|
|
|
+ ?cmd_timeout),
|
|
Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"]
|
|
Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"]
|
|
|| [Level, Code, Message] <- Rows],
|
|
|| [Level, Code, Message] <- Rows],
|
|
error_logger:warning_msg("~s in ~s~n", [Lines, Query]).
|
|
error_logger:warning_msg("~s in ~s~n", [Lines, Query]).
|
|
@@ -867,8 +874,9 @@ kill_query(#state{connection_id = ConnId, host = Host, port = Port,
|
|
#handshake{} ->
|
|
#handshake{} ->
|
|
%% Kill and disconnect
|
|
%% Kill and disconnect
|
|
IdBin = integer_to_binary(ConnId),
|
|
IdBin = integer_to_binary(ConnId),
|
|
- #ok{} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>,
|
|
|
|
- gen_tcp, Socket, ?cmd_timeout),
|
|
|
|
|
|
+ {ok, [#ok{}]} = mysql_protocol:query(<<"KILL QUERY ",
|
|
|
|
+ IdBin/binary>>, gen_tcp,
|
|
|
|
+ Socket, ?cmd_timeout),
|
|
mysql_protocol:quit(gen_tcp, Socket);
|
|
mysql_protocol:quit(gen_tcp, Socket);
|
|
#error{} = E ->
|
|
#error{} = E ->
|
|
error_logger:error_msg("Failed to connect to kill query: ~p",
|
|
error_logger:error_msg("Failed to connect to kill query: ~p",
|