%% MySQL/OTP – MySQL client library for Erlang/OTP
%% Copyright (C) 2014 Viktor Söderqvist
%%
%% This file is part of MySQL/OTP.
%%
%% MySQL/OTP is free software: you can redistribute it and/or modify it under
%% the terms of the GNU Lesser General Public License as published by the Free
%% Software Foundation, either version 3 of the License, or (at your option)
%% any later version.
%%
%% This program is distributed in the hope that it will be useful, but WITHOUT
%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
%% FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
%% more details.
%%
%% You should have received a copy of the GNU Lesser General Public License
%% along with this program. If not, see <https://www.gnu.org/licenses/>.

%% @doc MySQL client.
%%
%% The `connection()' type is a gen_server reference as described in the
%% documentation for `gen_server:call/2,3', e.g. the pid or the name if the
%% gen_server is locally registered.
-module(mysql).

%% Public API.
-export([start_link/1, query/2, query/3, query/4, execute/3, execute/4,
         prepare/2, prepare/3, unprepare/2,
         warning_count/1, affected_rows/1, autocommit/1, insert_id/1,
         in_transaction/1,
         transaction/2, transaction/3, transaction/4]).

-export_type([connection/0, server_reason/0]).

-behaviour(gen_server).
%% gen_server callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

%% Defaults applied when the corresponding option is not passed to
%% start_link/1.
-define(default_host, "localhost").
-define(default_port, 3306).
-define(default_user, <<>>).
-define(default_password, <<>>).
-define(default_connect_timeout, 5000).
-define(default_query_timeout, infinity).
-define(default_query_cache_time, 60000). %% for query/3.
-define(cmd_timeout, 3000). %% Timeout used for various commands to the server

%% Errors that cause "implicit rollback"
-define(ERROR_LOCK_WAIT_TIMEOUT, 1205).
-define(ERROR_DEADLOCK, 1213).

%% A connection is a ServerRef as in gen_server:call/2,3.
-type connection() :: Name :: atom() |
                      {Name :: atom(), Node :: atom()} |
                      {global, GlobalName :: term()} |
                      {via, Module :: atom(), ViaName :: term()} |
                      pid().

%% MySQL error with the codes and message returned from the server.
-type server_reason() :: {Code :: integer(), SQLState :: binary(),
                          Message :: binary()}.
  53. %% @doc Starts a connection gen_server process and connects to a database. To
  54. %% disconnect just do `exit(Pid, normal)'.
  55. %%
  56. %% Options:
  57. %%
  58. %% <dl>
  59. %% <dt>`{name, ServerName}'</dt>
  60. %% <dd>If a name is provided, the gen_server will be registered with this
  61. %% name. For details see the documentation for the first argument of
  62. %% gen_server:start_link/4.</dd>
  63. %% <dt>`{host, Host}'</dt>
  64. %% <dd>Hostname of the MySQL database; default `"localhost"'.</dd>
  65. %% <dt>`{port, Port}'</dt>
  66. %% <dd>Port; default 3306 if omitted.</dd>
  67. %% <dt>`{user, User}'</dt>
  68. %% <dd>Username.</dd>
  69. %% <dt>`{password, Password}'</dt>
  70. %% <dd>Password.</dd>
  71. %% <dt>`{database, Database}'</dt>
  72. %% <dd>The name of the database AKA schema to use. This can be changed later
  73. %% using the query `USE <database>'.</dd>
  74. %% <dt>`{connect_timeout, Timeout}'</dt>
  75. %% <dd>The maximum time to spend for start_link/1.</dd>
  76. %% <dt>`{log_warnings, boolean()}'</dt>
  77. %% <dd>Whether to fetch warnings and log them using error_logger; default
  78. %% true.</dd>
  79. %% <dt>`{query_timeout, Timeout}'</dt>
  80. %% <dd>The default time to wait for a response when executing a query or a
  81. %% prepared statement. This can be given per query using `query/3,4' and
  82. %% `execute/4'. The default is `infinity'.</dd>
  83. %% <dt>`{query_cache_time, Timeout}'</dt>
  84. %% <dd>The minimum number of milliseconds to cache prepared statements used
  85. %% for parametrized queries with query/3.</dd>
  86. %% </dl>
  87. -spec start_link(Options) -> {ok, pid()} | ignore | {error, term()}
  88. when Options :: [Option],
  89. Option :: {name, ServerName} | {host, iodata()} | {port, integer()} |
  90. {user, iodata()} | {password, iodata()} |
  91. {database, iodata()} |
  92. {connect_timeout, timeout()} |
  93. {log_warnings, boolean()} |
  94. {query_timeout, timeout()} |
  95. {query_cache_time, non_neg_integer()},
  96. ServerName :: {local, Name :: atom()} |
  97. {global, GlobalName :: term()} |
  98. {via, Module :: atom(), ViaName :: term()}.
  99. start_link(Options) ->
  100. GenSrvOpts = [{timeout, proplists:get_value(connect_timeout, Options,
  101. ?default_connect_timeout)}],
  102. case proplists:get_value(name, Options) of
  103. undefined ->
  104. gen_server:start_link(?MODULE, Options, GenSrvOpts);
  105. ServerName ->
  106. gen_server:start_link(ServerName, ?MODULE, Options, GenSrvOpts)
  107. end.

%% @doc Executes a plain (non-parameterized) query with the query timeout as
%% given to start_link/1.
%%
%% Returns `ok' for statements that produce no result set, `{ok, ColumnNames,
%% Rows}' for statements that do, and `{error, Reason}' on a server error.
%% The call goes through `query_call/2', which throws when the connection
%% reports an implicit commit or implicit rollback.
-spec query(Conn, Query) -> ok | {ok, ColumnNames, Rows} | {error, Reason}
    when Conn :: connection(),
         Query :: iodata(),
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason().
query(Conn, Query) ->
    query_call(Conn, {query, Query}).

%% @doc Depending on the 3rd argument this function does different things.
%%
%% If the 3rd argument is a list, it executes a parameterized query. This is
%% equivalent to query/4 with the query timeout as given to start_link/1.
%%
%% If the 3rd argument is a timeout (an integer or the atom `infinity'), it
%% executes a plain query with this timeout.
%%
%% Any other 3rd argument matches no clause and raises `function_clause'.
%% @see query/2.
%% @see query/4.
-spec query(Conn, Query, Params | Timeout) -> ok | {ok, ColumnNames, Rows} |
                                              {error, Reason}
    when Conn :: connection(),
         Query :: iodata(),
         Timeout :: timeout(),
         Params :: [term()],
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason().
query(Conn, Query, Params) when is_list(Params) ->
    query_call(Conn, {param_query, Query, Params});
query(Conn, Query, Timeout) when is_integer(Timeout); Timeout == infinity ->
    query_call(Conn, {query, Query, Timeout}).

%% @doc Executes a parameterized query with a timeout.
%%
%% A prepared statement is created, executed and then cached for a certain
%% time. If the same query is executed again when it is already cached, it does
%% not need to be prepared again.
%%
%% The minimum time the prepared statement is cached can be specified using the
%% option `{query_cache_time, Milliseconds}' to start_link/1.
%%
%% The request is sent through `query_call/2', which throws when the
%% connection reports an implicit commit or implicit rollback.
-spec query(Conn, Query, Params, Timeout) -> ok | {ok, ColumnNames, Rows} |
                                             {error, Reason}
    when Conn :: connection(),
         Query :: iodata(),
         Timeout :: timeout(),
         Params :: [term()],
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason().
query(Conn, Query, Params, Timeout) ->
    query_call(Conn, {param_query, Query, Params, Timeout}).

%% @doc Executes a prepared statement with the default query timeout as given
%% to start_link/1.
%%
%% `StatementRef' is the id returned by prepare/2 or the name given to
%% prepare/3. If no such statement is registered on the connection,
%% `{error, not_prepared}' is returned.
%% @see prepare/2
%% @see prepare/3
-spec execute(Conn, StatementRef, Params) ->
    ok | {ok, ColumnNames, Rows} | {error, Reason}
    when Conn :: connection(),
         StatementRef :: atom() | integer(),
         Params :: [term()],
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason() | not_prepared.
execute(Conn, StatementRef, Params) ->
    query_call(Conn, {execute, StatementRef, Params}).

%% @doc Executes a prepared statement, waiting at most `Timeout' for the
%% server's response.
%%
%% `StatementRef' is the id returned by prepare/2 or the name given to
%% prepare/3. If no such statement is registered on the connection,
%% `{error, not_prepared}' is returned.
%% @see prepare/2
%% @see prepare/3
-spec execute(Conn, StatementRef, Params, Timeout) ->
    ok | {ok, ColumnNames, Rows} | {error, Reason}
    when Conn :: connection(),
         StatementRef :: atom() | integer(),
         Params :: [term()],
         Timeout :: timeout(),
         ColumnNames :: [binary()],
         Rows :: [[term()]],
         Reason :: server_reason() | not_prepared.
execute(Conn, StatementRef, Params, Timeout) ->
    query_call(Conn, {execute, StatementRef, Params, Timeout}).

%% @doc Creates a prepared statement from the passed query.
%%
%% On success the statement is stored in the connection under the returned
%% `StatementId', which can then be passed to execute/3,4 and unprepare/2.
%% This is a plain `gen_server:call/2', so the default gen_server call timeout
%% applies.
%% @see prepare/3
-spec prepare(Conn, Query) -> {ok, StatementId} | {error, Reason}
    when Conn :: connection(),
         Query :: iodata(),
         StatementId :: integer(),
         Reason :: server_reason().
prepare(Conn, Query) ->
    gen_server:call(Conn, {prepare, Query}).

%% @doc Creates a prepared statement from the passed query and associates it
%% with the given name.
%%
%% If a statement is already registered under `Name', the connection
%% unprepares it before preparing the new one. This is a plain
%% `gen_server:call/2', so the default gen_server call timeout applies.
%% @see prepare/2
-spec prepare(Conn, Name, Query) -> {ok, Name} | {error, Reason}
    when Conn :: connection(),
         Name :: atom(),
         Query :: iodata(),
         Reason :: server_reason().
prepare(Conn, Name, Query) ->
    gen_server:call(Conn, {prepare, Name, Query}).

%% @doc Deallocates a prepared statement.
%%
%% `StatementRef' is the id returned by prepare/2 or the name given to
%% prepare/3. Returns `{error, not_prepared}' if the connection has no
%% statement registered under that reference.
-spec unprepare(Conn, StatementRef) -> ok | {error, Reason}
    when Conn :: connection(),
         StatementRef :: atom() | integer(),
         Reason :: server_reason() | not_prepared.
unprepare(Conn, StatementRef) ->
    gen_server:call(Conn, {unprepare, StatementRef}).

%% @doc Returns the number of warnings generated by the last query/2 or
%% execute/3 calls.
%%
%% The value is read from the connection process' state; no request is sent to
%% the server.
-spec warning_count(connection()) -> integer().
warning_count(Conn) ->
    gen_server:call(Conn, warning_count).

%% @doc Returns the number of inserted, updated and deleted rows of the last
%% executed query or prepared statement.
%%
%% The value is read from the connection process' state; no request is sent to
%% the server.
-spec affected_rows(connection()) -> integer().
affected_rows(Conn) ->
    gen_server:call(Conn, affected_rows).

%% @doc Returns true if auto-commit is enabled and false otherwise.
%%
%% The answer is derived from the server status flags last recorded in the
%% connection process' state.
-spec autocommit(connection()) -> boolean().
autocommit(Conn) ->
    gen_server:call(Conn, autocommit).

%% @doc Returns the last insert-id.
%%
%% The value is read from the connection process' state; no request is sent to
%% the server.
-spec insert_id(connection()) -> integer().
insert_id(Conn) ->
    gen_server:call(Conn, insert_id).

%% @doc Returns true if the connection is in a transaction and false otherwise.
%% This works regardless of whether the transaction has been started using
%% transaction/2,3 or using a plain `mysql:query(Connection, "START
%% TRANSACTION")'.
%%
%% The answer is derived from the server status flags last recorded in the
%% connection process' state.
%% @see transaction/2
%% @see transaction/4
-spec in_transaction(connection()) -> boolean().
in_transaction(Conn) ->
    gen_server:call(Conn, in_transaction).

%% @doc This function executes the functional object Fun as a transaction.
%%
%% Equivalent to `transaction(Conn, Fun, [], infinity)': Fun takes no
%% arguments and is restarted without limit when a deadlock is detected.
%% @see transaction/4
-spec transaction(connection(), fun()) -> {atomic, term()} | {aborted, term()}.
transaction(Conn, Fun) ->
    transaction(Conn, Fun, [], infinity).

%% @doc This function executes the functional object Fun as a transaction.
%%
%% Equivalent to `transaction(Conn, Fun, [], Retries)': Fun takes no arguments
%% and is restarted at most `Retries' times when a deadlock is detected.
%% @see transaction/4
-spec transaction(connection(), fun(), Retries) -> {atomic, term()} |
                                                   {aborted, term()}
    when Retries :: non_neg_integer() | infinity.
transaction(Conn, Fun, Retries) ->
    transaction(Conn, Fun, [], Retries).
  251. %% @doc This function executes the functional object Fun with arguments Args as
  252. %% a transaction.
  253. %%
  254. %% The semantics are as close as possible to mnesia's transactions. Transactions
  255. %% can be nested and are restarted automatically when deadlocks are detected.
  256. %% MySQL's savepoints are used to implement nested transactions.
  257. %%
  258. %% Fun must be a function and Args must be a list of the same length as the
  259. %% arity of Fun.
  260. %%
  261. %% If an exception occurs within Fun, the exception is caught and `{aborted,
  262. %% Reason}' is returned. The value of `Reason' depends on the class of the
  263. %% exception.
  264. %%
  265. %% Note that an error response from a query does not cause a transaction to be
  266. %% rollbacked. To force a rollback on a MySQL error you can trigger a `badmatch'
  267. %% using e.g. `ok = mysql:query(Pid, "SELECT some_non_existent_value")'.
  268. %% Exceptions to this are error 1213 "Deadlock" (after the specified number
  269. %% retries all have failed) and error 1205 "Lock wait timeout" which causes an
  270. %% *implicit rollback*.
  271. %%
  272. %% Some queries such as ALTER TABLE cause an *implicit commit* on the server.
  273. %% If such a query is executed within a transaction, an error on the form
  274. %% `{implicit_commit, Query}' is raised. This means that the transaction has
  275. %% been committed prematurely. This also happens if an explicit COMMIT is
  276. %% executed as a plain query within a managed transaction. (Don't do that!)
  277. %%
  278. %% <table>
  279. %% <thead>
  280. %% <tr><th>Class of exception</th><th>Return value</th></tr>
  281. %% </thead>
  282. %% <tbody>
  283. %% <tr>
  284. %% <td>`error' with reason `ErrorReason'</td>
  285. %% <td>`{aborted, {ErrorReason, Stack}}'</td>
  286. %% </tr>
  287. %% <tr><td>`exit(Term)'</td><td>`{aborted, Term}'</td></tr>
  288. %% <tr><td>`throw(Term)'</td><td>`{aborted, {throw, Term}}'</td></tr>
  289. %% </tbody>
  290. %% </table>
  291. -spec transaction(connection(), fun(), list(), Retries) -> {atomic, term()} |
  292. {aborted, term()}
  293. when Retries :: non_neg_integer() | infinity.
  294. transaction(Conn, Fun, Args, Retries) when is_list(Args),
  295. is_function(Fun, length(Args)) ->
  296. %% The guard makes sure that we can apply Fun to Args. Any error we catch
  297. %% in the try-catch are actual errors that occurred in Fun.
  298. ok = gen_server:call(Conn, start_transaction),
  299. try apply(Fun, Args) of
  300. ResultOfFun ->
  301. %% We must be able to rollback. Otherwise let's crash.
  302. ok = gen_server:call(Conn, commit),
  303. {atomic, ResultOfFun}
  304. catch
  305. throw:{implicit_rollback, N, Reason} when N >= 1 ->
  306. %% Jump out of N nested transactions to restart the outer-most one.
  307. %% The server has already rollbacked so we shouldn't do that here.
  308. case N of
  309. 1 ->
  310. case Reason of
  311. {?ERROR_DEADLOCK, _, _} when Retries == infinity ->
  312. transaction(Conn, Fun, Args, infinity);
  313. {?ERROR_DEADLOCK, _, _} when Retries > 0 ->
  314. transaction(Conn, Fun, Args, Retries - 1);
  315. _OtherImplicitRollbackError ->
  316. %% This includes the case ?ERROR_LOCK_WAIT_TIMEOUT
  317. %% which we don't restart automatically.
  318. %% We issue a rollback here since MySQL doesn't
  319. %% seem to have fully rollbacked and an extra
  320. %% rollback doesn't hurt.
  321. ok = query(Conn, <<"ROLLBACK">>),
  322. {aborted, {Reason, erlang:get_stacktrace()}}
  323. end;
  324. _ ->
  325. %% Re-throw with the same trace. We'll use that in the
  326. %% final {aborted, {Reason, Trace}} in the outer level.
  327. erlang:raise(throw, {implicit_rollback, N - 1, Reason},
  328. erlang:get_stacktrace())
  329. end;
  330. throw:{implicit_commit, N, Query} when N >= 1 ->
  331. %% The called did something like ALTER TABLE which resulted in an
  332. %% implicit commit. The server has already committed. We need to
  333. %% jump out of N levels of transactions.
  334. %%
  335. %% Returning 'atomic' or 'aborted' would both be wrong. Raise an
  336. %% exception is the best we can do.
  337. case N of
  338. 1 -> error({implicit_commit, Query});
  339. _ -> erlang:raise(throw, {implicit_commit, N - 1, Query},
  340. erlang:get_stacktrace())
  341. end;
  342. Class:Reason ->
  343. %% We must be able to rollback. Otherwise let's crash.
  344. ok = gen_server:call(Conn, rollback),
  345. %% These forms for throw, error and exit mirror Mnesia's behaviour.
  346. Aborted = case Class of
  347. throw -> {throw, Reason};
  348. error -> {Reason, erlang:get_stacktrace()};
  349. exit -> Reason
  350. end,
  351. {aborted, Aborted}
  352. end.

%% --- Gen_server callbacks ---

-include("records.hrl").
-include("server_status.hrl").

%% Gen_server state
%%
%% server_version, connection_id and status are taken from the server's
%% handshake in init/1. host, port, user, password, log_warnings,
%% query_timeout and query_cache_time come from the start_link/1 options (or
%% their defaults).
%% transaction_level is the current nesting depth of transactions started
%% through the start_transaction call (0 = not in a managed transaction).
%% stmts maps a statement id or name to its prepared statement record.
%% query_cache holds the prepared statements used for parametrized queries;
%% it starts out as the atom `empty'.
-record(state, {server_version, connection_id, socket,
                host, port, user, password, log_warnings,
                query_timeout, query_cache_time,
                affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
                transaction_level = 0,
                stmts = dict:new(), query_cache = empty}).
  363. %% @private
  364. init(Opts) ->
  365. %% Connect
  366. Host = proplists:get_value(host, Opts, ?default_host),
  367. Port = proplists:get_value(port, Opts, ?default_port),
  368. User = proplists:get_value(user, Opts, ?default_user),
  369. Password = proplists:get_value(password, Opts, ?default_password),
  370. Database = proplists:get_value(database, Opts, undefined),
  371. LogWarn = proplists:get_value(log_warnings, Opts, true),
  372. Timeout = proplists:get_value(query_timeout, Opts, ?default_query_timeout),
  373. QueryCacheTime = proplists:get_value(query_cache_time, Opts,
  374. ?default_query_cache_time),
  375. %% Connect socket
  376. SockOpts = [{active, false}, binary, {packet, raw}],
  377. {ok, Socket} = gen_tcp:connect(Host, Port, SockOpts),
  378. %% Exchange handshake communication.
  379. Result = mysql_protocol:handshake(User, Password, Database, gen_tcp,
  380. Socket),
  381. case Result of
  382. #handshake{server_version = Version, connection_id = ConnId,
  383. status = Status} ->
  384. State = #state{server_version = Version, connection_id = ConnId,
  385. socket = Socket,
  386. host = Host, port = Port, user = User,
  387. password = Password, status = Status,
  388. log_warnings = LogWarn,
  389. query_timeout = Timeout,
  390. query_cache_time = QueryCacheTime},
  391. %% Trap exit so that we can properly disconnect when we die.
  392. process_flag(trap_exit, true),
  393. {ok, State};
  394. #error{} = E ->
  395. {stop, error_to_reason(E)}
  396. end.
  397. %% @private
  398. %% @doc
  399. %%
  400. %% Query and execute calls:
  401. %%
  402. %% <ul>
  403. %% <li>{query, Query}</li>
  404. %% <li>{query, Query, Timeout}</li>
  405. %% <li>{param_query, Query, Params}</li>
  406. %% <li>{param_query, Query, Params, Timeout}</li>
  407. %% <li>{execute, Stmt, Args}</li>
  408. %% <li>{execute, Stmt, Args, Timeout}</li>
  409. %% </ul>
  410. %%
  411. %% For the calls listed above, we return these values:
  412. %%
  413. %% <dl>
  414. %% <dt>`ok'</dt>
  415. %% <dd>Success without returning any table data (UPDATE, etc.)</dd>
  416. %% <dt>`{ok, ColumnNames, Rows}'</dt>
  417. %% <dd>Queries returning table data</dd>
  418. %% <dt>`{error, ServerReason}'</dt>
  419. %% <dd>MySQL server error</dd>
  420. %% <dt>`{implicit_commit, NestingLevel, Query}'</dt>
  421. %% <dd>A DDL statement (e.g. CREATE TABLE, ALTER TABLE, etc.) results in
  422. %% an implicit commit.
  423. %%
  424. %% If the caller is in a (nested) transaction, it must be aborted. To be
  425. %% able to handle this in the caller's process, we also return the
  426. %% nesting level.</dd>
  427. %% <dt>`{implicit_rollback, NestingLevel, ServerReason}'</dt>
  428. %% <dd>These errors result in an implicit rollback:
  429. %% <ul>
  430. %% <li>`{1205, <<"HY000">>, <<"Lock wait timeout exceeded;
  431. %% try restarting transaction">>}'</li>
  432. %% <li>`{1213, <<"40001">>, <<"Deadlock found when trying to get lock;
  433. %% try restarting transaction">>}'</li>
  434. %% </ul>
  435. %%
  436. %% If the caller is in a (nested) transaction, it must be aborted. To be
  437. %% able to handle this in the caller's process, we also return the
  438. %% nesting level.</dd>
  439. %% </dl>
  440. handle_call({query, Query}, From, State) ->
  441. handle_call({query, Query, State#state.query_timeout}, From, State);
  442. handle_call({query, Query, Timeout}, _From, State) ->
  443. Socket = State#state.socket,
  444. Rec = case mysql_protocol:query(Query, gen_tcp, Socket, Timeout) of
  445. {error, timeout} when State#state.server_version >= [5, 0, 0] ->
  446. kill_query(State),
  447. mysql_protocol:fetch_query_response(gen_tcp, Socket, infinity);
  448. {error, timeout} ->
  449. %% For MySQL 4.x.x there is no way to recover from timeout except
  450. %% killing the connection itself.
  451. exit(timeout);
  452. QueryResult ->
  453. QueryResult
  454. end,
  455. State1 = update_state(State, Rec),
  456. State1#state.warning_count > 0 andalso State1#state.log_warnings
  457. andalso log_warnings(State1, Query),
  458. case Rec of
  459. #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
  460. State1#state.transaction_level > 0 ->
  461. %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
  462. %% an implicit commit.
  463. Reply = {implicit_commit, State1#state.transaction_level, Query},
  464. {reply, Reply, State1#state{transaction_level = 0}};
  465. #ok{} ->
  466. {reply, ok, State1};
  467. #resultset{cols = ColDefs, rows = Rows} ->
  468. Names = [Def#col.name || Def <- ColDefs],
  469. {reply, {ok, Names, Rows}, State1};
  470. #error{code = Code} when State1#state.transaction_level > 0,
  471. (Code == ?ERROR_DEADLOCK orelse
  472. Code == ?ERROR_LOCK_WAIT_TIMEOUT) ->
  473. %% These errors result in an implicit rollback.
  474. Reply = {implicit_rollback, State1#state.transaction_level,
  475. error_to_reason(Rec)},
  476. State2 = clear_transaction_status(State1),
  477. {reply, Reply, State2};
  478. #error{} ->
  479. {reply, {error, error_to_reason(Rec)}, State1}
  480. end;
  481. handle_call({param_query, Query, Params}, From, State) ->
  482. handle_call({param_query, Query, Params, State#state.query_timeout}, From,
  483. State);
  484. handle_call({param_query, Query, Params, Timeout}, _From, State) ->
  485. %% Parametrized query: Prepared statement cached with the query as the key
  486. QueryBin = iolist_to_binary(Query),
  487. #state{socket = Socket} = State,
  488. Cache = State#state.query_cache,
  489. {StmtResult, Cache1} = case mysql_cache:lookup(QueryBin, Cache) of
  490. {found, FoundStmt, NewCache} ->
  491. %% Found
  492. {{ok, FoundStmt}, NewCache};
  493. not_found ->
  494. %% Prepare
  495. Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
  496. %State1 = update_state(State, Rec),
  497. case Rec of
  498. #error{} = E ->
  499. {{error, error_to_reason(E)}, Cache};
  500. #prepared{} = Stmt ->
  501. %% If the first entry in the cache, start the timer.
  502. Cache == empty andalso begin
  503. When = State#state.query_cache_time * 2,
  504. erlang:send_after(When, self(), query_cache)
  505. end,
  506. {{ok, Stmt}, mysql_cache:store(QueryBin, Stmt, Cache)}
  507. end
  508. end,
  509. case StmtResult of
  510. {ok, StmtRec} ->
  511. State1 = State#state{query_cache = Cache1},
  512. {Reply, State2} = execute_stmt(StmtRec, Params, Timeout, State1),
  513. State2#state.warning_count > 0 andalso State2#state.log_warnings
  514. andalso log_warnings(State2, Query),
  515. {reply, Reply, State2};
  516. PrepareError ->
  517. {reply, PrepareError, State}
  518. end;
  519. handle_call({execute, Stmt, Args}, From, State) ->
  520. handle_call({execute, Stmt, Args, State#state.query_timeout}, From, State);
  521. handle_call({execute, Stmt, Args, Timeout}, _From, State) ->
  522. case dict:find(Stmt, State#state.stmts) of
  523. {ok, StmtRec} ->
  524. {Reply, State1} = execute_stmt(StmtRec, Args, Timeout, State),
  525. State1#state.warning_count > 0 andalso State1#state.log_warnings
  526. andalso log_warnings(State1,
  527. io_lib:format("prepared statement ~p",
  528. [Stmt])),
  529. {reply, Reply, State1};
  530. error ->
  531. {reply, {error, not_prepared}, State}
  532. end;
  533. handle_call({prepare, Query}, _From, State) ->
  534. #state{socket = Socket} = State,
  535. Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
  536. State1 = update_state(State, Rec),
  537. case Rec of
  538. #error{} = E ->
  539. {reply, {error, error_to_reason(E)}, State1};
  540. #prepared{statement_id = Id} = Stmt ->
  541. Stmts1 = dict:store(Id, Stmt, State1#state.stmts),
  542. State2 = State#state{stmts = Stmts1},
  543. {reply, {ok, Id}, State2}
  544. end;
  545. handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
  546. #state{socket = Socket} = State,
  547. %% First unprepare if there is an old statement with this name.
  548. State1 = case dict:find(Name, State#state.stmts) of
  549. {ok, OldStmt} ->
  550. mysql_protocol:unprepare(OldStmt, gen_tcp, Socket),
  551. State#state{stmts = dict:erase(Name, State#state.stmts)};
  552. error ->
  553. State
  554. end,
  555. Rec = mysql_protocol:prepare(Query, gen_tcp, Socket),
  556. State2 = update_state(State1, Rec),
  557. case Rec of
  558. #error{} = E ->
  559. {reply, {error, error_to_reason(E)}, State2};
  560. #prepared{} = Stmt ->
  561. Stmts1 = dict:store(Name, Stmt, State2#state.stmts),
  562. State3 = State2#state{stmts = Stmts1},
  563. {reply, {ok, Name}, State3}
  564. end;
  565. handle_call({unprepare, Stmt}, _From, State) when is_atom(Stmt);
  566. is_integer(Stmt) ->
  567. case dict:find(Stmt, State#state.stmts) of
  568. {ok, StmtRec} ->
  569. #state{socket = Socket} = State,
  570. mysql_protocol:unprepare(StmtRec, gen_tcp, Socket),
  571. Stmts1 = dict:erase(Stmt, State#state.stmts),
  572. {reply, ok, State#state{stmts = Stmts1}};
  573. error ->
  574. {reply, {error, not_prepared}, State}
  575. end;
%% The following calls only read values already recorded in the state; they
%% do not talk to the server and leave the state unchanged.
handle_call(warning_count, _From, State) ->
    {reply, State#state.warning_count, State};
handle_call(insert_id, _From, State) ->
    {reply, State#state.insert_id, State};
handle_call(affected_rows, _From, State) ->
    {reply, State#state.affected_rows, State};
handle_call(autocommit, _From, State) ->
    %% true when the autocommit bit is set in the recorded server status.
    {reply, State#state.status band ?SERVER_STATUS_AUTOCOMMIT /= 0, State};
handle_call(in_transaction, _From, State) ->
    %% true when the in-transaction bit is set in the recorded server status.
    {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State};
  586. handle_call(start_transaction, _From,
  587. State = #state{socket = Socket, transaction_level = L,
  588. status = Status})
  589. when Status band ?SERVER_STATUS_IN_TRANS == 0, L == 0;
  590. Status band ?SERVER_STATUS_IN_TRANS /= 0, L > 0 ->
  591. Query = case L of
  592. 0 -> <<"BEGIN">>;
  593. _ -> <<"SAVEPOINT s", (integer_to_binary(L))/binary>>
  594. end,
  595. Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
  596. State1 = update_state(State, Res),
  597. {reply, ok, State1#state{transaction_level = L + 1}};
  598. handle_call(rollback, _From, State = #state{socket = Socket, status = Status,
  599. transaction_level = L})
  600. when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
  601. Query = case L of
  602. 1 -> <<"ROLLBACK">>;
  603. _ -> <<"ROLLBACK TO s", (integer_to_binary(L - 1))/binary>>
  604. end,
  605. Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
  606. State1 = update_state(State, Res),
  607. {reply, ok, State1#state{transaction_level = L - 1}};
  608. handle_call(commit, _From, State = #state{socket = Socket, status = Status,
  609. transaction_level = L})
  610. when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
  611. Query = case L of
  612. 1 -> <<"COMMIT">>;
  613. _ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(L - 1))/binary>>
  614. end,
  615. Res = #ok{} = mysql_protocol:query(Query, gen_tcp, Socket, ?cmd_timeout),
  616. State1 = update_state(State, Res),
  617. {reply, ok, State1#state{transaction_level = L - 1}};
  618. handle_call(Trans, _From, State) when Trans == start_transaction;
  619. Trans == rollback;
  620. Trans == commit ->
  621. %% The 'in transaction' flag doesn't match the level we have in the state.
  622. {reply, {error, incorrectly_nested}, State}.

%% @private
%% No casts are part of this module's protocol; any cast is ignored and the
%% state is left unchanged.
handle_cast(_Msg, State) ->
    {noreply, State}.
  626. %% @private
  627. handle_info(query_cache, State = #state{query_cache = Cache,
  628. query_cache_time = CacheTime}) ->
  629. %% Evict expired queries/statements in the cache used by query/3.
  630. {Evicted, Cache1} = mysql_cache:evict_older_than(Cache, CacheTime),
  631. %% Unprepare the evicted statements
  632. #state{socket = Socket} = State,
  633. lists:foreach(fun ({_Query, Stmt}) ->
  634. mysql_protocol:unprepare(Stmt, gen_tcp, Socket)
  635. end,
  636. Evicted),
  637. %% If nonempty, schedule eviction again.
  638. mysql_cache:size(Cache1) > 0 andalso
  639. erlang:send_after(CacheTime, self(), query_cache),
  640. {noreply, State#state{query_cache = Cache1}};
  641. handle_info(_Info, State) ->
  642. {noreply, State}.

%% @private
%% Called by gen_server when the connection process is about to stop.
%% Only the exact reasons `normal' and `shutdown' trigger the quit message;
%% for any other reason nothing is sent.
terminate(Reason, State) when Reason == normal; Reason == shutdown ->
    %% Send the goodbye message for politeness.
    #state{socket = Socket} = State,
    mysql_protocol:quit(gen_tcp, Socket);
terminate(_Reason, _State) ->
    ok.

%% @private
%% Hot code upgrade hook. The state is carried over unchanged when it matches
%% the current `#state{}' record definition; any other state is rejected with
%% `{error, incompatible_state}'.
code_change(_OldVsn, State = #state{}, _Extra) ->
    {ok, State};
code_change(_OldVsn, _State, _Extra) ->
    {error, incompatible_state}.
  655. %% --- Helpers ---
  656. %% @doc Makes a gen_server call for a query (plain, parametrized or prepared),
  657. %% checks the reply and sometimes throws an exception when we need to jump out
  658. %% of a transaction.
  659. query_call(Conn, CallReq) ->
  660. case gen_server:call(Conn, CallReq, infinity) of
  661. {implicit_commit, _NestingLevel, _Query} = ImplicitCommit ->
  662. throw(ImplicitCommit);
  663. {implicit_rollback, _NestingLevel, _ServerReason} = ImplicitRollback ->
  664. throw(ImplicitRollback);
  665. Result ->
  666. Result
  667. end.
  668. %% @doc Executes a prepared statement and returns {Reply, NextState}.
  669. execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket}) ->
  670. Rec = case mysql_protocol:execute(Stmt, Args, gen_tcp, Socket, Timeout) of
  671. {error, timeout} when State#state.server_version >= [5, 0, 0] ->
  672. kill_query(State),
  673. mysql_protocol:fetch_execute_response(gen_tcp, Socket, infinity);
  674. {error, timeout} ->
  675. %% For MySQL 4.x.x there is no way to recover from timeout except
  676. %% killing the connection itself.
  677. exit(timeout);
  678. QueryResult ->
  679. QueryResult
  680. end,
  681. State1 = update_state(State, Rec),
  682. case Rec of
  683. #ok{} ->
  684. {ok, State1};
  685. #error{code = Code} when State1#state.transaction_level > 0,
  686. (Code == ?ERROR_DEADLOCK orelse
  687. Code == ?ERROR_LOCK_WAIT_TIMEOUT) ->
  688. %% Implicit rollback.
  689. Reply = {implicit_rollback, State1#state.transaction_level,
  690. error_to_reason(Rec)},
  691. State2 = clear_transaction_status(State1),
  692. {Reply, State2};
  693. #error{} = E ->
  694. {{error, error_to_reason(E)}, State1};
  695. #resultset{cols = ColDefs, rows = Rows} ->
  696. Names = [Def#col.name || Def <- ColDefs],
  697. {{ok, Names, Rows}, State1}
  698. end.
  699. %% @doc Produces a tuple to return as an error reason.
  700. -spec error_to_reason(#error{}) -> server_reason().
  701. error_to_reason(#error{code = Code, state = State, msg = Msg}) ->
  702. {Code, State, Msg}.
  703. %% @doc Updates a state with information from a response.
  704. -spec update_state(#state{}, #ok{} | #eof{} | any()) -> #state{}.
  705. update_state(State, #ok{status = S, affected_rows = R,
  706. insert_id = Id, warning_count = W}) ->
  707. State#state{status = S, affected_rows = R, insert_id = Id,
  708. warning_count = W};
  709. %update_state(State, #eof{status = S, warning_count = W}) ->
  710. % State#state{status = S, warning_count = W, affected_rows = 0};
  711. update_state(State, #prepared{warning_count = W}) ->
  712. State#state{warning_count = W};
  713. update_state(State, _Other) ->
  714. %% This includes errors, resultsets, etc.
  715. %% Reset warnings, etc. (Note: We don't reset status and insert_id.)
  716. State#state{warning_count = 0, affected_rows = 0}.
  717. %% @doc Since errors don't return a status but some errors cause an implicit
  718. %% rollback, we use this function to clear fix the transaction bit in the
  719. %% status.
  720. clear_transaction_status(State = #state{status = Status}) ->
  721. State#state{status = Status band bnot ?SERVER_STATUS_IN_TRANS,
  722. transaction_level = 0}.
  723. %% @doc Fetches and logs warnings. Query is the query that gave the warnings.
  724. log_warnings(#state{socket = Socket}, Query) ->
  725. #resultset{rows = Rows} =
  726. mysql_protocol:query(<<"SHOW WARNINGS">>, gen_tcp, Socket, infinity),
  727. Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"]
  728. || [Level, Code, Message] <- Rows],
  729. error_logger:warning_msg("~s in ~s~n", [Lines, Query]).
  730. %% @doc Makes a separate connection and execute KILL QUERY. We do this to get
  731. %% our main connection back to normal. KILL QUERY appeared in MySQL 5.0.0.
  732. kill_query(#state{connection_id = ConnId, host = Host, port = Port,
  733. user = User, password = Password}) ->
  734. %% Connect socket
  735. SockOpts = [{active, false}, binary, {packet, raw}],
  736. {ok, Socket} = gen_tcp:connect(Host, Port, SockOpts),
  737. %% Exchange handshake communication.
  738. Result = mysql_protocol:handshake(User, Password, undefined, gen_tcp,
  739. Socket),
  740. case Result of
  741. #handshake{} ->
  742. %% Kill and disconnect
  743. IdBin = integer_to_binary(ConnId),
  744. #ok{} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>,
  745. gen_tcp, Socket, ?cmd_timeout),
  746. mysql_protocol:quit(gen_tcp, Socket);
  747. #error{} = E ->
  748. error_logger:error_msg("Failed to connect to kill query: ~p",
  749. [error_to_reason(E)])
  750. end.