%% MySQL/OTP – MySQL client library for Erlang/OTP
%% Copyright (C) 2014 Viktor Söderqvist
%%
%% This file is part of MySQL/OTP.
%%
%% MySQL/OTP is free software: you can redistribute it and/or modify it under
%% the terms of the GNU Lesser General Public License as published by the Free
%% Software Foundation, either version 3 of the License, or (at your option)
%% any later version.
%%
%% This program is distributed in the hope that it will be useful, but WITHOUT
%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
%% FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
%% more details.
%%
%% You should have received a copy of the GNU Lesser General Public License
%% along with this program. If not, see <http://www.gnu.org/licenses/>.

%% @doc MySQL client.
%%
%% The `connection()' type is a gen_server reference as described in the
%% documentation for `gen_server:call/2,3', e.g. the pid or the name if the
%% gen_server is locally registered.
-module(mysql).

%% Public API.
-export([start_link/1, query/2, query/3, query/4, execute/3, execute/4,
         prepare/2, prepare/3, unprepare/2,
         warning_count/1, affected_rows/1, autocommit/1, insert_id/1,
         in_transaction/1,
         transaction/2, transaction/3, transaction/4]).

-export_type([connection/0, server_reason/0]).

%% gen_server callbacks.
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

%% Defaults used by init/1 and start_link/1 when an option is omitted.
-define(default_host, "localhost").
-define(default_port, 3306).
-define(default_user, <<>>).
-define(default_password, <<>>).
-define(default_connect_timeout, 5000).
-define(default_query_timeout, infinity).
-define(default_query_cache_time, 60000). %% for query/3.

-define(cmd_timeout, 3000). %% Timeout used for various commands to the server

%% Errors that cause "implicit rollback"
-define(ERROR_LOCK_WAIT_TIMEOUT, 1205).
-define(ERROR_DEADLOCK, 1213).

%% A connection is a ServerRef as in gen_server:call/2,3.
-type connection() :: Name :: atom() |
                      {Name :: atom(), Node :: atom()} |
                      {global, GlobalName :: term()} |
                      {via, Module :: atom(), ViaName :: term()} |
                      pid().

%% MySQL error with the codes and message returned from the server.
-type server_reason() :: {Code :: integer(), SQLState :: binary(),
                          Message :: binary()}.

%% @doc Starts a connection gen_server process and connects to a database. To
%% disconnect just do `exit(Pid, normal)'.
%%
%% Options:
%%
%% <dl>
%%   <dt>`{name, ServerName}'</dt>
%%   <dd>If a name is provided, the gen_server will be registered with this
%%       name. For details see the documentation for the first argument of
%%       gen_server:start_link/4.</dd>
%%   <dt>`{host, Host}'</dt>
%%   <dd>Hostname of the MySQL database; default `"localhost"'.</dd>
%%   <dt>`{port, Port}'</dt>
%%   <dd>Port; default 3306 if omitted.</dd>
%%   <dt>`{user, User}'</dt>
%%   <dd>Username.</dd>
%%   <dt>`{password, Password}'</dt>
%%   <dd>Password.</dd>
%%   <dt>`{database, Database}'</dt>
%%   <dd>The name of the database AKA schema to use. This can be changed later
%%       using the query `USE <database>'.</dd>
%%   <dt>`{connect_timeout, Timeout}'</dt>
%%   <dd>The maximum time to spend for start_link/1.</dd>
%%   <dt>`{log_warnings, boolean()}'</dt>
%%   <dd>Whether to fetch warnings and log them using error_logger; default
%%       true.</dd>
%%   <dt>`{query_timeout, Timeout}'</dt>
%%   <dd>The default time to wait for a response when executing a query or a
%%       prepared statement. This can be given per query using `query/3,4' and
%%       `execute/4'. The default is `infinity'.</dd>
%%   <dt>`{query_cache_time, Timeout}'</dt>
%%   <dd>The minimum number of milliseconds to cache prepared statements used
%%       for parametrized queries with query/3.</dd>
%% </dl>
-spec start_link(Options) -> {ok, pid()} | ignore | {error, term()}
    when Options :: [Option],
         Option :: {name, ServerName} | {host, iodata()} | {port, integer()} |
                   {user, iodata()} | {password, iodata()} |
                   {database, iodata()} |
                   {connect_timeout, timeout()} |
                   {log_warnings, boolean()} |
                   {query_timeout, timeout()} |
                   {query_cache_time, non_neg_integer()},
         ServerName :: {local, Name :: atom()} |
                       {global, GlobalName :: term()} |
                       {via, Module :: atom(), ViaName :: term()}.
start_link(Options) ->
    %% The connect timeout bounds the whole gen_server start-up (init/1).
    ConnectTimeout = proplists:get_value(connect_timeout, Options,
                                         ?default_connect_timeout),
    StartOpts = [{timeout, ConnectTimeout}],
    Name = proplists:get_value(name, Options),
    if
        Name =:= undefined ->
            %% Anonymous server: not registered anywhere.
            gen_server:start_link(?MODULE, Options, StartOpts);
        true ->
            %% Registered server: Name is passed on as the ServerName.
            gen_server:start_link(Name, ?MODULE, Options, StartOpts)
    end.

%% @doc Executes a plain query, waiting for the reply for as long as the query
%% timeout given to start_link/1 allows.
-spec query(Conn, Query) -> ok | {ok, ColumnNames, Rows} | {error, Reason}
    when Conn :: connection(),
         Query :: iodata(),
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason().
query(Conn, Query) ->
    Request = {query, Query},
    query_call(Conn, Request).

%% @doc The behaviour of this function depends on the type of the 3rd argument.
%%
%% When the 3rd argument is a list, a parameterized query is executed. This is
%% equivalent to query/4 with the query timeout as given to start_link/1.
%%
%% When the 3rd argument is a timeout (an integer or `infinity'), a plain
%% query is executed with that timeout.
%% @see query/2.
%% @see query/4.
-spec query(Conn, Query, Params | Timeout) -> ok | {ok, ColumnNames, Rows} |
                                              {error, Reason}
    when Conn :: connection(),
         Query :: iodata(),
         Timeout :: timeout(),
         Params :: [term()],
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason().
query(Conn, Query, Params) when is_list(Params) ->
    Request = {param_query, Query, Params},
    query_call(Conn, Request);
query(Conn, Query, Timeout) when Timeout == infinity; is_integer(Timeout) ->
    Request = {query, Query, Timeout},
    query_call(Conn, Request).

%% @doc Executes a parameterized query with a timeout.
%%
%% The query is turned into a prepared statement, which is executed and then
%% kept in a cache for a certain time. Running the same query again while it
%% is still cached skips the prepare step.
%%
%% The minimum time the prepared statement is cached can be specified using the
%% option `{query_cache_time, Milliseconds}' to start_link/1.
-spec query(Conn, Query, Params, Timeout) -> ok | {ok, ColumnNames, Rows} |
                                             {error, Reason}
    when Conn :: connection(),
         Query :: iodata(),
         Timeout :: timeout(),
         Params :: [term()],
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason().
query(Conn, Query, Params, Timeout) ->
    Request = {param_query, Query, Params, Timeout},
    query_call(Conn, Request).

%% @doc Executes a previously prepared statement using the default query
%% timeout as given to start_link/1.
%% @see prepare/2
%% @see prepare/3
-spec execute(Conn, StatementRef, Params) ->
        ok | {ok, ColumnNames, Rows} | {error, Reason}
    when Conn :: connection(),
         StatementRef :: atom() | integer(),
         Params :: [term()],
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason() | not_prepared.
execute(Conn, StatementRef, Params) ->
    Request = {execute, StatementRef, Params},
    query_call(Conn, Request).

%% @doc Executes a previously prepared statement with an explicit timeout.
%% @see prepare/2
%% @see prepare/3
-spec execute(Conn, StatementRef, Params, Timeout) ->
        ok | {ok, ColumnNames, Rows} | {error, Reason}
    when Conn :: connection(),
         StatementRef :: atom() | integer(),
         Params :: [term()],
         Timeout :: timeout(),
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason() | not_prepared.
execute(Conn, StatementRef, Params, Timeout) ->
    Request = {execute, StatementRef, Params, Timeout},
    query_call(Conn, Request).

%% @doc Prepares the passed query as a statement and returns its id.
%% @see prepare/3
-spec prepare(Conn, Query) -> {ok, StatementId} | {error, Reason}
    when Conn :: connection(),
         Query :: iodata(),
         StatementId :: integer(),
         Reason :: server_reason().
prepare(Conn, Query) ->
    Request = {prepare, Query},
    gen_server:call(Conn, Request).
%% @doc Prepares the passed query as a statement and registers it under the
%% given name.
%% @see prepare/2
-spec prepare(Conn, Name, Query) -> {ok, Name} | {error, Reason}
    when Conn :: connection(),
         Name :: atom(),
         Query :: iodata(),
         Reason :: server_reason().
prepare(Conn, Name, Query) ->
    Request = {prepare, Name, Query},
    gen_server:call(Conn, Request).

%% @doc Deallocates a prepared statement, given its id or its name.
-spec unprepare(Conn, StatementRef) -> ok | {error, Reason}
    when Conn :: connection(),
         StatementRef :: atom() | integer(),
         Reason :: server_reason() | not_prepared.
unprepare(Conn, StatementRef) ->
    Request = {unprepare, StatementRef},
    gen_server:call(Conn, Request).

%% @doc Returns the number of warnings generated by the last query/2 or
%% execute/3 calls.
-spec warning_count(connection()) -> integer().
warning_count(Conn) ->
    Request = warning_count,
    gen_server:call(Conn, Request).

%% @doc Returns the number of inserted, updated and deleted rows of the last
%% executed query or prepared statement.
-spec affected_rows(connection()) -> integer().
affected_rows(Conn) ->
    Request = affected_rows,
    gen_server:call(Conn, Request).

%% @doc Returns true if auto-commit is enabled and false otherwise.
-spec autocommit(connection()) -> boolean().
autocommit(Conn) ->
    Request = autocommit,
    gen_server:call(Conn, Request).

%% @doc Returns the last insert-id.
-spec insert_id(connection()) -> integer().
insert_id(Conn) ->
    Request = insert_id,
    gen_server:call(Conn, Request).

%% @doc Returns true if the connection is in a transaction and false otherwise.
%% This works regardless of whether the transaction has been started using
%% transaction/2,3 or using a plain `mysql:query(Connection, "START
%% TRANSACTION")'.
%% @see transaction/2
%% @see transaction/4
-spec in_transaction(connection()) -> boolean().
in_transaction(Conn) ->
    Request = in_transaction,
    gen_server:call(Conn, Request).

%% @doc This function executes the functional object Fun as a transaction.
%% @see transaction/4
-spec transaction(connection(), fun()) -> {atomic, term()} | {aborted, term()}.
transaction(Conn, Fun) ->
    %% No arguments for Fun; retry on deadlock without limit.
    NoArgs = [],
    transaction(Conn, Fun, NoArgs, infinity).

%% @doc This function executes the functional object Fun as a transaction.
%% @see transaction/4
-spec transaction(connection(), fun(), Retries) ->
        {atomic, term()} | {aborted, term()}
    when Retries :: non_neg_integer() | infinity.
transaction(Conn, Fun, Retries) ->
    NoArgs = [],
    transaction(Conn, Fun, NoArgs, Retries).

%% @doc This function executes the functional object Fun with arguments Args as
%% a transaction.
%%
%% The semantics are as close as possible to mnesia's transactions. Transactions
%% can be nested and are restarted automatically when deadlocks are detected.
%% MySQL's savepoints are used to implement nested transactions.
%%
%% Fun must be a function and Args must be a list of the same length as the
%% arity of Fun.
%%
%% If an exception occurs within Fun, the exception is caught and `{aborted,
%% Reason}' is returned. The value of `Reason' depends on the class of the
%% exception.
%%
%% Note that an error response from a query does not cause a transaction to be
%% rolled back. To force a rollback on a MySQL error you can trigger a
%% `badmatch' using e.g. `ok = mysql:query(Pid, "SELECT some_non_existent_value")'.
%% Exceptions to this are error 1213 "Deadlock" (after the specified number of
%% retries all have failed) and error 1205 "Lock wait timeout" which cause an
%% *implicit rollback*.
%%
%% Some queries such as ALTER TABLE cause an *implicit commit* on the server.
%% If such a query is executed within a transaction, an error of the form
%% `{implicit_commit, Query}' is raised. This means that the transaction has
%% been committed prematurely. This also happens if an explicit COMMIT is
%% executed as a plain query within a managed transaction. (Don't do that!)
%%
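%% <table>
%%   <thead>
%%     <tr><th>Class of exception</th><th>Return value</th></tr>
%%   </thead>
%%   <tbody>
%%     <tr>
%%       <td>`error' with reason `ErrorReason'</td>
%%       <td>`{aborted, {ErrorReason, Stack}}'</td>
%%     </tr>
%%     <tr><td>`exit(Term)'</td><td>`{aborted, Term}'</td></tr>
%%     <tr><td>`throw(Term)'</td><td>`{aborted, {throw, Term}}'</td></tr>
%%   </tbody>
%% </table>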
-spec transaction(connection(), fun(), list(), Retries) ->
        {atomic, term()} | {aborted, term()}
    when Retries :: non_neg_integer() | infinity.
transaction(Conn, Fun, Args, Retries) when is_list(Args),
                                           is_function(Fun, length(Args)) ->
    %% The guard makes sure that we can apply Fun to Args. Any error we catch
    %% in the try-catch are actual errors that occurred in Fun.
    ok = gen_server:call(Conn, start_transaction),
    try apply(Fun, Args) of
        ResultOfFun ->
            %% We must be able to commit. Otherwise let's crash.
            ok = gen_server:call(Conn, commit),
            {atomic, ResultOfFun}
    catch
        throw:{implicit_rollback, N, Reason} when N >= 1 ->
            %% Capture the trace before doing anything else in this clause, so
            %% that no later call can replace it.
            %% NOTE(review): erlang:get_stacktrace/0 is deprecated since
            %% OTP 21; once older releases need not be supported, switch to
            %% the `Class:Reason:Stacktrace' catch pattern.
            Stacktrace = erlang:get_stacktrace(),
            %% Jump out of N nested transactions to restart the outer-most one.
            %% The server has already rollbacked so we shouldn't do that here.
            case N of
                1 ->
                    case Reason of
                        {?ERROR_DEADLOCK, _, _} when Retries == infinity ->
                            transaction(Conn, Fun, Args, infinity);
                        {?ERROR_DEADLOCK, _, _} when Retries > 0 ->
                            transaction(Conn, Fun, Args, Retries - 1);
                        _OtherImplicitRollbackError ->
                            %% This includes the case ?ERROR_LOCK_WAIT_TIMEOUT
                            %% which we don't restart automatically.
                            %% We issue a rollback here since MySQL doesn't
                            %% seem to have fully rollbacked and an extra
                            %% rollback doesn't hurt.
                            ok = query(Conn, <<"ROLLBACK">>),
                            {aborted, {Reason, Stacktrace}}
                    end;
                _ ->
                    %% Re-throw with the same trace. We'll use that in the
                    %% final {aborted, {Reason, Trace}} in the outer level.
                    erlang:raise(throw, {implicit_rollback, N - 1, Reason},
                                 Stacktrace)
            end;
        throw:{implicit_commit, N, Query} when N >= 1 ->
            Stacktrace = erlang:get_stacktrace(),
            %% The caller did something like ALTER TABLE which resulted in an
            %% implicit commit. The server has already committed. We need to
            %% jump out of N levels of transactions.
            %%
            %% Returning 'atomic' or 'aborted' would both be wrong. Raising an
            %% exception is the best we can do.
            case N of
                1 ->
                    error({implicit_commit, Query});
                _ ->
                    erlang:raise(throw, {implicit_commit, N - 1, Query},
                                 Stacktrace)
            end;
        Class:Reason ->
            %% Capture the trace before the rollback call below.
            Stacktrace = erlang:get_stacktrace(),
            %% We must be able to rollback. Otherwise let's crash.
            ok = gen_server:call(Conn, rollback),
            %% These forms for throw, error and exit mirror Mnesia's behaviour.
            Aborted = case Class of
                throw -> {throw, Reason};
                error -> {Reason, Stacktrace};
                exit  -> Reason
            end,
            {aborted, Aborted}
    end.

%% --- Gen_server callbacks ---

-include("records.hrl").
-include("server_status.hrl").

%% Gen_server state
-record(state, {server_version, connection_id, socket,
                host, port, user, password, log_warnings,
                query_timeout, query_cache_time,
                affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
                transaction_level = 0,
                stmts = dict:new(), query_cache = empty}).

%% @private
%% Connects to the server, performs the handshake and builds the initial
%% state. Returns `{stop, Reason}' if the server answers the handshake with an
%% error; a failed TCP connect crashes on the `{ok, Socket}' match.
init(Opts) ->
    %% Connect
    Host           = proplists:get_value(host,          Opts, ?default_host),
    Port           = proplists:get_value(port,          Opts, ?default_port),
    User           = proplists:get_value(user,          Opts, ?default_user),
    Password       = proplists:get_value(password,      Opts, ?default_password),
    Database       = proplists:get_value(database,      Opts, undefined),
    LogWarn        = proplists:get_value(log_warnings,  Opts, true),
    Timeout        = proplists:get_value(query_timeout, Opts,
                                         ?default_query_timeout),
    QueryCacheTime = proplists:get_value(query_cache_time, Opts,
                                         ?default_query_cache_time),

    %% Connect socket
    SockOpts = [{active, false}, binary, {packet, raw}],
    {ok, Socket} = gen_tcp:connect(Host, Port, SockOpts),

    %% Exchange handshake communication.
    Result = mysql_protocol:handshake(User, Password, Database, gen_tcp,
                                      Socket),
    case Result of
        #handshake{server_version = Version, connection_id = ConnId,
                   status = Status} ->
            State = #state{server_version = Version, connection_id = ConnId,
                           socket = Socket,
                           host = Host, port = Port, user = User,
                           password = Password, status = Status,
                           log_warnings = LogWarn,
                           query_timeout = Timeout,
                           query_cache_time = QueryCacheTime},
            %% Trap exit so that we can properly disconnect when we die.
            process_flag(trap_exit, true),
            {ok, State};
        #error{} = E ->
            {stop, error_to_reason(E)}
    end.

%% @private
%% @doc
%%
%% Query and execute calls:
%%
%% <ul>
%%   <li>`{query, Query}'</li>
%%   <li>`{query, Query, Timeout}'</li>
%%   <li>`{param_query, Query, Params}'</li>
%%   <li>`{param_query, Query, Params, Timeout}'</li>
%%   <li>`{execute, Stmt, Args}'</li>
%%   <li>`{execute, Stmt, Args, Timeout}'</li>
%% </ul>
%%
%% For the calls listed above, we return these values:
%%
%% <dl>
%%   <dt>`ok'</dt>
%%   <dd>Success without returning any table data (UPDATE, etc.)</dd>
%%   <dt>`{ok, ColumnNames, Rows}'</dt>
%%   <dd>Queries returning table data</dd>
%%   <dt>`{error, ServerReason}'</dt>
%%   <dd>MySQL server error</dd>
%%   <dt>`{implicit_commit, NestingLevel, Query}'</dt>
%%   <dd>A DDL statement (e.g. CREATE TABLE, ALTER TABLE, etc.) results in
%%       an implicit commit.
%%
%%       If the caller is in a (nested) transaction, it must be aborted. To be
%%       able to handle this in the caller's process, we also return the
%%       nesting level.</dd>
%%   <dt>`{implicit_rollback, NestingLevel, ServerReason}'</dt>
%%   <dd>These errors result in an implicit rollback:
%%       <ul>
%%         <li>`{1205, <<"HY000">>, <<"Lock wait timeout exceeded;
%%                 try restarting transaction">>}'</li>
%%         <li>`{1213, <<"40001">>, <<"Deadlock found when trying to get lock;
%%                 try restarting transaction">>}'</li>
%%       </ul>
%%
%%       If the caller is in a (nested) transaction, it must be aborted. To be
%%       able to handle this in the caller's process, we also return the
%%       nesting level.</dd>
%% </dl>
handle_call({query, Query}, From, State) ->
    handle_call({query, Query, State#state.query_timeout}, From, State);
handle_call({query, Query, Timeout}, _From, State) ->
    Socket = State#state.socket,
    Rec = case mysql_protocol:query(Query, gen_tcp, Socket, Timeout) of
        {error, timeout} when State#state.server_version >= [5, 0, 0] ->
            %% Kill the running query from a second connection, then read the
            %% response that the killed query leaves on our own socket.
            kill_query(State),
            mysql_protocol:fetch_query_response(gen_tcp, Socket, infinity);
        {error, timeout} ->
            %% For MySQL 4.x.x there is no way to recover from timeout except
            %% killing the connection itself.
            exit(timeout);
        QueryResult ->
            QueryResult
    end,
    State1 = update_state(State, Rec),
    State1#state.warning_count > 0 andalso State1#state.log_warnings
        andalso log_warnings(State1, Query),
    case Rec of
        #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
                                  State1#state.transaction_level > 0 ->
            %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
            %% an implicit commit.
            Reply = {implicit_commit, State1#state.transaction_level, Query},
            {reply, Reply, State1#state{transaction_level = 0}};
        #ok{} ->
            {reply, ok, State1};
        #resultset{cols = ColDefs, rows = Rows} ->
            Names = [Def#col.name || Def <- ColDefs],
            {reply, {ok, Names, Rows}, State1};
        #error{code = Code} when State1#state.transaction_level > 0,
                                 (Code == ?ERROR_DEADLOCK orelse
                                  Code == ?ERROR_LOCK_WAIT_TIMEOUT) ->
            %% These errors result in an implicit rollback.
            Reply = {implicit_rollback, State1#state.transaction_level,
                     error_to_reason(Rec)},
            State2 = clear_transaction_status(State1),
            {reply, Reply, State2};
        #error{} ->
            {reply, {error, error_to_reason(Rec)}, State1}
    end;
handle_call({param_query, Query, Params}, From, State) ->
    handle_call({param_query, Query, Params, State#state.query_timeout}, From,
                State);
handle_call({param_query, Query, Params, Timeout}, _From, State) ->
    %% Parametrized query: Prepared statement cached with the query as the key
    QueryBin = iolist_to_binary(Query),
    #state{socket = Socket} = State,
    Cache = State#state.query_cache,
    {StmtResult, Cache1} = case mysql_cache:lookup(QueryBin, Cache) of
        {found, FoundStmt, NewCache} ->
            %% Found
            {{ok, FoundStmt}, NewCache};
        not_found ->
            %% Prepare
            Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
            %State1 = update_state(State, Rec),
            case Rec of
                #error{} = E ->
                    {{error, error_to_reason(E)}, Cache};
                #prepared{} = Stmt ->
                    %% If the first entry in the cache, start the timer.
                    Cache == empty andalso begin
                        When = State#state.query_cache_time * 2,
                        erlang:send_after(When, self(), query_cache)
                    end,
                    {{ok, Stmt}, mysql_cache:store(QueryBin, Stmt, Cache)}
            end
    end,
    case StmtResult of
        {ok, StmtRec} ->
            State1 = State#state{query_cache = Cache1},
            {Reply, State2} = execute_stmt(StmtRec, Params, Timeout, State1),
            State2#state.warning_count > 0 andalso State2#state.log_warnings
                andalso log_warnings(State2, Query),
            {reply, Reply, State2};
        PrepareError ->
            {reply, PrepareError, State}
    end;
handle_call({execute, Stmt, Args}, From, State) ->
    handle_call({execute, Stmt, Args, State#state.query_timeout}, From, State);
handle_call({execute, Stmt, Args, Timeout}, _From, State) ->
    case dict:find(Stmt, State#state.stmts) of
        {ok, StmtRec} ->
            {Reply, State1} = execute_stmt(StmtRec, Args, Timeout, State),
            State1#state.warning_count > 0 andalso State1#state.log_warnings
                andalso log_warnings(State1,
                                     io_lib:format("prepared statement ~p",
                                                   [Stmt])),
            {reply, Reply, State1};
        error ->
            {reply, {error, not_prepared}, State}
    end;
handle_call({prepare, Query}, _From, State) ->
    #state{socket = Socket} = State,
    Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
    State1 = update_state(State, Rec),
    case Rec of
        #error{} = E ->
            {reply, {error, error_to_reason(E)}, State1};
        #prepared{statement_id = Id} = Stmt ->
            Stmts1 = dict:store(Id, Stmt, State1#state.stmts),
            %% Build on State1 (not State) so that the warning count recorded
            %% by update_state/2 above is kept, as in the named prepare clause.
            State2 = State1#state{stmts = Stmts1},
            {reply, {ok, Id}, State2}
    end;
handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
    #state{socket = Socket} = State,
    %% First unprepare if there is an old statement with this name.
    State1 = case dict:find(Name, State#state.stmts) of
        {ok, OldStmt} ->
            mysql_protocol:unprepare(OldStmt, gen_tcp, Socket),
            State#state{stmts = dict:erase(Name, State#state.stmts)};
        error ->
            State
    end,
    Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
    State2 = update_state(State1, Rec),
    case Rec of
        #error{} = E ->
            {reply, {error, error_to_reason(E)}, State2};
        #prepared{} = Stmt ->
            Stmts1 = dict:store(Name, Stmt, State2#state.stmts),
            State3 = State2#state{stmts = Stmts1},
            {reply, {ok, Name}, State3}
    end;
handle_call({unprepare, Stmt}, _From, State) when is_atom(Stmt);
                                                  is_integer(Stmt) ->
    case dict:find(Stmt, State#state.stmts) of
        {ok, StmtRec} ->
            #state{socket = Socket} = State,
            mysql_protocol:unprepare(StmtRec, gen_tcp, Socket),
            Stmts1 = dict:erase(Stmt, State#state.stmts),
            {reply, ok, State#state{stmts = Stmts1}};
        error ->
            {reply, {error, not_prepared}, State}
    end;
handle_call(warning_count, _From, State) ->
    {reply, State#state.warning_count, State};
handle_call(insert_id, _From, State) ->
    {reply, State#state.insert_id, State};
handle_call(affected_rows, _From, State) ->
    {reply, State#state.affected_rows, State};
handle_call(autocommit, _From, State) ->
    {reply, State#state.status band ?SERVER_STATUS_AUTOCOMMIT /= 0, State};
handle_call(in_transaction, _From, State) ->
    {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State};
handle_call(start_transaction, _From,
            State = #state{socket = Socket, transaction_level = L,
                           status = Status})
  when Status band ?SERVER_STATUS_IN_TRANS == 0, L == 0;
       Status band ?SERVER_STATUS_IN_TRANS /= 0, L > 0 ->
    %% Level 0 starts a real transaction; deeper levels use savepoints.
    Query = case L of
        0 -> <<"BEGIN">>;
        _ -> <<"SAVEPOINT s", (integer_to_binary(L))/binary>>
    end,
    Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
    State1 = update_state(State, Res),
    {reply, ok, State1#state{transaction_level = L + 1}};
handle_call(rollback, _From, State = #state{socket = Socket, status = Status,
                                            transaction_level = L})
  when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
    Query = case L of
        1 -> <<"ROLLBACK">>;
        _ -> <<"ROLLBACK TO s", (integer_to_binary(L - 1))/binary>>
    end,
    Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
    State1 = update_state(State, Res),
    {reply, ok, State1#state{transaction_level = L - 1}};
handle_call(commit, _From, State = #state{socket = Socket, status = Status,
                                          transaction_level = L})
  when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
    Query = case L of
        1 -> <<"COMMIT">>;
        _ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(L - 1))/binary>>
    end,
    Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
    State1 = update_state(State, Res),
    {reply, ok, State1#state{transaction_level = L - 1}};
handle_call(Trans, _From, State) when Trans == start_transaction;
                                      Trans == rollback;
                                      Trans == commit ->
    %% The 'in transaction' flag doesn't match the level we have in the state.
    {reply, {error, incorrectly_nested}, State}.

%% @private
handle_cast(_Msg, State) ->
    {noreply, State}.

%% @private
handle_info(query_cache, State = #state{query_cache = Cache,
                                        query_cache_time = CacheTime}) ->
    %% Evict expired queries/statements in the cache used by query/3.
    {Evicted, Cache1} = mysql_cache:evict_older_than(Cache, CacheTime),
    %% Unprepare the evicted statements
    #state{socket = Socket} = State,
    lists:foreach(fun ({_Query, Stmt}) ->
                      mysql_protocol:unprepare(Stmt, gen_tcp, Socket)
                  end,
                  Evicted),
    %% If nonempty, schedule eviction again.
    mysql_cache:size(Cache1) > 0 andalso
        erlang:send_after(CacheTime, self(), query_cache),
    {noreply, State#state{query_cache = Cache1}};
handle_info(_Info, State) ->
    {noreply, State}.

%% @private
terminate(Reason, State) when Reason == normal; Reason == shutdown ->
    %% Send the goodbye message for politeness.
    #state{socket = Socket} = State,
    mysql_protocol:quit(gen_tcp, Socket);
terminate(_Reason, _State) ->
    ok.

%% @private
code_change(_OldVsn, State = #state{}, _Extra) ->
    {ok, State};
code_change(_OldVsn, _State, _Extra) ->
    {error, incompatible_state}.

%% --- Helpers ---

%% @doc Makes a gen_server call for a query (plain, parametrized or prepared),
%% checks the reply and sometimes throws an exception when we need to jump out
%% of a transaction.
query_call(Conn, CallReq) ->
    %% The call itself never times out; query timeouts are handled by the
    %% connection process.
    case gen_server:call(Conn, CallReq, infinity) of
        {implicit_commit, _NestingLevel, _Query} = ImplicitCommit ->
            throw(ImplicitCommit);
        {implicit_rollback, _NestingLevel, _ServerReason} = ImplicitRollback ->
            throw(ImplicitRollback);
        Result ->
            Result
    end.

%% @doc Executes a prepared statement and returns {Reply, NextState}.
execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket}) ->
    Rec = case mysql_protocol:execute(Stmt, Args, gen_tcp, Socket, Timeout) of
        {error, timeout} when State#state.server_version >= [5, 0, 0] ->
            kill_query(State),
            mysql_protocol:fetch_execute_response(gen_tcp, Socket, infinity);
        {error, timeout} ->
            %% For MySQL 4.x.x there is no way to recover from timeout except
            %% killing the connection itself.
            exit(timeout);
        QueryResult ->
            QueryResult
    end,
    State1 = update_state(State, Rec),
    case Rec of
        #ok{} ->
            {ok, State1};
        #error{code = Code} when State1#state.transaction_level > 0,
                                 (Code == ?ERROR_DEADLOCK orelse
                                  Code == ?ERROR_LOCK_WAIT_TIMEOUT) ->
            %% Implicit rollback.
            Reply = {implicit_rollback, State1#state.transaction_level,
                     error_to_reason(Rec)},
            State2 = clear_transaction_status(State1),
            {Reply, State2};
        #error{} = E ->
            {{error, error_to_reason(E)}, State1};
        #resultset{cols = ColDefs, rows = Rows} ->
            Names = [Def#col.name || Def <- ColDefs],
            {{ok, Names, Rows}, State1}
    end.

%% @doc Produces a tuple to return as an error reason.
-spec error_to_reason(#error{}) -> server_reason().
error_to_reason(#error{code = Code, state = State, msg = Msg}) ->
    {Code, State, Msg}.

%% @doc Updates a state with information from a response.
-spec update_state(#state{}, #ok{} | #eof{} | any()) -> #state{}.
update_state(State, #ok{status = S, affected_rows = R,
                        insert_id = Id, warning_count = W}) ->
    State#state{status = S, affected_rows = R, insert_id = Id,
                warning_count = W};
%update_state(State, #eof{status = S, warning_count = W}) ->
%    State#state{status = S, warning_count = W, affected_rows = 0};
update_state(State, #prepared{warning_count = W}) ->
    State#state{warning_count = W};
update_state(State, _Other) ->
    %% This includes errors, resultsets, etc.
    %% Reset warnings, etc. (Note: We don't reset status and insert_id.)
    State#state{warning_count = 0, affected_rows = 0}.

%% @doc Since errors don't return a status but some errors cause an implicit
%% rollback, we use this function to clear the transaction bit in the status
%% and to reset the transaction nesting level to zero.
clear_transaction_status(State = #state{status = Status}) ->
    State#state{status = Status band bnot ?SERVER_STATUS_IN_TRANS,
                transaction_level = 0}.

%% @doc Fetches and logs warnings. Query is the query that gave the warnings.
log_warnings(#state{socket = Socket}, Query) ->
    %% Each row of SHOW WARNINGS is matched as [Level, Code, Message].
    #resultset{rows = Rows} = mysql_protocol:query(<<"SHOW WARNINGS">>,
                                                    gen_tcp, Socket, infinity),
    Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"]
             || [Level, Code, Message] <- Rows],
    error_logger:warning_msg("~s in ~s~n", [Lines, Query]).

%% @doc Makes a separate connection and execute KILL QUERY. We do this to get
%% our main connection back to normal. KILL QUERY appeared in MySQL 5.0.0.
%%
%% The auxiliary socket is always closed before returning, also when the
%% handshake fails or one of the assertive matches crashes, so that repeated
%% query timeouts do not accumulate open ports in the connection process.
kill_query(#state{connection_id = ConnId, host = Host, port = Port,
                  user = User, password = Password}) ->
    %% Connect socket
    SockOpts = [{active, false}, binary, {packet, raw}],
    {ok, Socket} = gen_tcp:connect(Host, Port, SockOpts),
    try
        %% Exchange handshake communication.
        Result = mysql_protocol:handshake(User, Password, undefined, gen_tcp,
                                          Socket),
        case Result of
            #handshake{} ->
                %% Kill and disconnect
                IdBin = integer_to_binary(ConnId),
                #ok{} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>,
                                             gen_tcp, Socket, ?cmd_timeout),
                mysql_protocol:quit(gen_tcp, Socket);
            #error{} = E ->
                error_logger:error_msg("Failed to connect to kill query: ~p",
                                       [error_to_reason(E)])
        end
    after
        %% Closing an already closed socket is harmless.
        gen_tcp:close(Socket)
    end.