mysql.erl 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955
  1. %% MySQL/OTP – MySQL client library for Erlang/OTP
  2. %% Copyright (C) 2014-2015, 2018 Viktor Söderqvist,
  3. %% 2016 Johan Lövdahl
  4. %% 2017 Piotr Nosek, Michal Slaski
  5. %%
  6. %% This file is part of MySQL/OTP.
  7. %%
  8. %% MySQL/OTP is free software: you can redistribute it and/or modify it under
  9. %% the terms of the GNU Lesser General Public License as published by the Free
  10. %% Software Foundation, either version 3 of the License, or (at your option)
  11. %% any later version.
  12. %%
  13. %% This program is distributed in the hope that it will be useful, but WITHOUT
  14. %% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  15. %% FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  16. %% more details.
  17. %%
  18. %% You should have received a copy of the GNU Lesser General Public License
  19. %% along with this program. If not, see <https://www.gnu.org/licenses/>.
  20. %% @doc MySQL client.
  21. %%
  22. %% The `connection()' type is a gen_server reference as described in the
  23. %% documentation for `gen_server:call/2,3', e.g. the pid or the name if the
  24. %% gen_server is locally registered.
  25. -module(mysql).
  26. -export([start_link/1, query/2, query/3, query/4, execute/3, execute/4,
  27. prepare/2, prepare/3, unprepare/2,
  28. warning_count/1, affected_rows/1, autocommit/1, insert_id/1,
  29. encode/2, in_transaction/1,
  30. transaction/2, transaction/3, transaction/4]).
  31. -export_type([connection/0, server_reason/0, query_result/0]).
  32. -behaviour(gen_server).
  33. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
  34. code_change/3]).
  35. -define(default_host, "localhost").
  36. -define(default_port, 3306).
  37. -define(default_user, <<>>).
  38. -define(default_password, <<>>).
  39. -define(default_connect_timeout, 5000).
  40. -define(default_query_timeout, infinity).
  41. -define(default_query_cache_time, 60000). %% for query/3.
  42. -define(default_ping_timeout, 60000).
  43. -define(cmd_timeout, 3000). %% Timeout used for various commands to the server
  44. %% Errors that cause "implicit rollback"
  45. -define(ERROR_DEADLOCK, 1213).
  46. %% A connection is a ServerRef as in gen_server:call/2,3.
  47. -type connection() :: Name :: atom() |
  48. {Name :: atom(), Node :: atom()} |
  49. {global, GlobalName :: term()} |
  50. {via, Module :: atom(), ViaName :: term()} |
  51. pid().
  52. %% MySQL error with the codes and message returned from the server.
  53. -type server_reason() :: {Code :: integer(), SQLState :: binary(),
  54. Message :: binary()}.
  55. -type column_names() :: [binary()].
  56. -type rows() :: [[term()]].
  57. -type query_result() :: ok
  58. | {ok, column_names(), rows()}
  59. | {ok, [{column_names(), rows()}, ...]}
  60. | {error, server_reason()}.
  61. -include("exception.hrl").
  62. %% @doc Starts a connection gen_server process and connects to a database. To
  63. %% disconnect just do `exit(Pid, normal)'.
  64. %%
  65. %% Options:
  66. %%
  67. %% <dl>
  68. %% <dt>`{name, ServerName}'</dt>
  69. %% <dd>If a name is provided, the gen_server will be registered with this
  70. %% name. For details see the documentation for the first argument of
  71. %% gen_server:start_link/4.</dd>
  72. %% <dt>`{host, Host}'</dt>
  73. %% <dd>Hostname of the MySQL database; default `"localhost"'.</dd>
  74. %% <dt>`{port, Port}'</dt>
  75. %% <dd>Port; default 3306 if omitted.</dd>
  76. %% <dt>`{user, User}'</dt>
  77. %% <dd>Username.</dd>
  78. %% <dt>`{password, Password}'</dt>
  79. %% <dd>Password.</dd>
  80. %% <dt>`{database, Database}'</dt>
  81. %% <dd>The name of the database AKA schema to use. This can be changed later
  82. %% using the query `USE <database>'.</dd>
  83. %% <dt>`{connect_timeout, Timeout}'</dt>
  84. %% <dd>The maximum time to spend for start_link/1.</dd>
  85. %% <dt>`{log_warnings, boolean()}'</dt>
  86. %% <dd>Whether to fetch warnings and log them using error_logger; default
  87. %% true.</dd>
  88. %% <dt>`{keep_alive, boolean() | timeout()}'</dt>
  89. %% <dd>Send ping when unused for a certain time. Possible values are `true',
  90. %% `false' and `integer() > 0' for an explicit interval in milliseconds.
  91. %% The default is `false'. For `true' a default ping timeout is used.
  92. %% </dd>
  93. %% <dt>`{prepare, NamedStatements}'</dt>
  94. %% <dd>Named prepared statements to be created as soon as the connection is
  95. %% ready.</dd>
  96. %% <dt>`{queries, Queries}'</dt>
  97. %% <dd>Queries to be executed as soon as the connection is ready. Any results
  98. %% are discarded. Typically, this is used for setting time zone and other
  99. %% session variables.</dd>
  100. %% <dt>`{query_timeout, Timeout}'</dt>
  101. %% <dd>The default time to wait for a response when executing a query or a
  102. %% prepared statement. This can be given per query using `query/3,4' and
  103. %% `execute/4'. The default is `infinity'.</dd>
  104. %% <dt>`{found_rows, boolean()}'</dt>
  105. %% <dd>If set to true, the connection will be established with
  106. %% CLIENT_FOUND_ROWS capability. affected_rows/1 will now return the
  107. %% number of found rows, not the number of rows changed by the
  108. %% query.</dd>
  109. %% <dt>`{query_cache_time, Timeout}'</dt>
  110. %% <dd>The minimum number of milliseconds to cache prepared statements used
  111. %% for parametrized queries with query/3.</dd>
  112. %% <dt>`{tcp_options, Options}'</dt>
  113. %% <dd>Additional options for `gen_tcp:connect/3'. You may want to set
  114. %% `{recbuf, Size}' and `{sndbuf, Size}' if you send or receive more than
  115. %% the default (typically 8K) per query.</dd>
  116. %% </dl>
  -spec start_link(Options) -> {ok, pid()} | ignore | {error, term()}
      when Options :: [Option],
           Option :: {name, ServerName} | {host, iodata()} | {port, integer()} |
                     {user, iodata()} | {password, iodata()} |
                     {database, iodata()} |
                     {connect_timeout, timeout()} |
                     {log_warnings, boolean()} |
                     {keep_alive, boolean() | timeout()} |
                     {prepare, NamedStatements} |
                     {queries, [iodata()]} |
                     {query_timeout, timeout()} |
                     {found_rows, boolean()} |
                     {query_cache_time, non_neg_integer()} |
                     {tcp_options, [gen_tcp:connect_option()]} |
                     {ssl, term()},
           ServerName :: {local, Name :: atom()} |
                         {global, GlobalName :: term()} |
                         {via, Module :: atom(), ViaName :: term()},
           NamedStatements :: [{StatementName :: atom(), Statement :: iodata()}].
  start_link(Options) ->
      %% The connect timeout bounds the time gen_server:start_link/3,4 waits for
      %% init/1 (socket connect + handshake) to complete.
      ConnectTimeout = proplists:get_value(connect_timeout, Options,
                                           ?default_connect_timeout),
      GenSrvOpts = [{timeout, ConnectTimeout}],
      %% Register the process only when a name was supplied.
      Ret = case proplists:get_value(name, Options) of
                undefined ->
                    gen_server:start_link(?MODULE, Options, GenSrvOpts);
                ServerName ->
                    gen_server:start_link(ServerName, ?MODULE, Options,
                                          GenSrvOpts)
            end,
      case Ret of
          {ok, Pid} ->
              %% Initial queries. Results are discarded; any error reply makes
              %% the case expression fail, crashing the caller on purpose.
              Queries = proplists:get_value(queries, Options, []),
              lists:foreach(fun (Query) ->
                                    case mysql:query(Pid, Query) of
                                        ok -> ok;
                                        {ok, _, _} -> ok;
                                        {ok, _} -> ok
                                    end
                            end,
                            Queries),
              %% Named prepared statements. A failed prepare is a badmatch.
              Prepare = proplists:get_value(prepare, Options, []),
              lists:foreach(fun ({Name, Stmt}) ->
                                    {ok, Name} = mysql:prepare(Pid, Name, Stmt)
                            end,
                            Prepare);
          _ ->
              %% ignore | {error, _}: nothing to set up, hand it to the caller.
              ok
      end,
      Ret.
  %% @doc Runs a plain query, using the query timeout that was configured with
  %% start_link/1.
  %%
  %% Several queries may be sent at once when separated by semicolons.
  %%
  %% A single result set is returned as `{ok, ColumnNames, Rows}'. When there
  %% is more than one result set, the form is
  %% `{ok, [{ColumnNames, Rows}, ...]}'.
  %%
  %% Queries that return no rows (INSERT, UPDATE, etc.) yield just the atom
  %% `ok'.
  -spec query(Conn, Query) -> Result
      when Conn :: connection(),
           Query :: iodata(),
           Result :: query_result().
  query(Conn, Query) ->
      Request = {query, Query},
      query_call(Conn, Request).
  180. %% @doc Depending on the 3rd argument this function does different things.
  181. %%
  182. %% If the 3rd argument is a list, it executes a parameterized query. This is
  183. %% equivalent to query/4 with the query timeout as given to start_link/1.
  184. %%
  185. %% If the 3rd argument is a timeout, it executes a plain query with this
  186. %% timeout.
  187. %%
  188. %% The return value is the same as for query/2.
  189. %%
  190. %% @see query/2.
  191. %% @see query/4.
  -spec query(Conn, Query, Params | Timeout) -> Result
      when Conn :: connection(),
           Query :: iodata(),
           Timeout :: timeout(),
           Params :: [term()],
           Result :: query_result().
  %% The three accepted shapes of the third argument (the atom `infinity', an
  %% integer, a list) are mutually exclusive, so clause order is irrelevant.
  %% Anything else is a function_clause error.
  query(Conn, Query, infinity) ->
      %% Plain query without a time limit.
      query_call(Conn, {query, Query, infinity});
  query(Conn, Query, Timeout) when is_integer(Timeout) ->
      %% Plain query with an explicit timeout.
      query_call(Conn, {query, Query, Timeout});
  query(Conn, Query, Params) when is_list(Params) ->
      %% Parameterized query; the connection's default timeout applies.
      query_call(Conn, {param_query, Query, Params}).
  202. %% @doc Executes a parameterized query with a timeout.
  203. %%
  204. %% A prepared statement is created, executed and then cached for a certain
  205. %% time. If the same query is executed again when it is already cached, it does
  206. %% not need to be prepared again.
  207. %%
  208. %% The minimum time the prepared statement is cached can be specified using the
  209. %% option `{query_cache_time, Milliseconds}' to start_link/1.
  210. %%
  211. %% The return value is the same as for query/2.
  -spec query(Conn, Query, Params, Timeout) -> Result
      when Conn :: connection(),
           Query :: iodata(),
           Timeout :: timeout(),
           Params :: [term()],
           Result :: query_result().
  query(Conn, Query, Params, Timeout) ->
      %% The connection process prepares the statement (or reuses a cached one)
      %% and executes it with Params.
      Request = {param_query, Query, Params, Timeout},
      query_call(Conn, Request).
  %% @doc Runs a previously prepared statement, identified by its id or name,
  %% with the default query timeout configured through start_link/1.
  %%
  %% Returns `{error, not_prepared}' when the connection knows no statement
  %% under `StatementRef'.
  %% @see prepare/2
  %% @see prepare/3
  -spec execute(Conn, StatementRef, Params) -> Result | {error, not_prepared}
      when Conn :: connection(),
           StatementRef :: atom() | integer(),
           Params :: [term()],
           Result :: query_result().
  execute(Conn, StatementRef, Params) ->
      Request = {execute, StatementRef, Params},
      query_call(Conn, Request).
  %% @doc Runs a previously prepared statement with an explicit timeout.
  %%
  %% Returns `{error, not_prepared}' when the connection knows no statement
  %% under `StatementRef'.
  %% @see prepare/2
  %% @see prepare/3
  -spec execute(Conn, StatementRef, Params, Timeout) ->
      Result | {error, not_prepared}
      when Conn :: connection(),
           StatementRef :: atom() | integer(),
           Params :: [term()],
           Timeout :: timeout(),
           Result :: query_result().
  execute(Conn, StatementRef, Params, Timeout) ->
      Request = {execute, StatementRef, Params, Timeout},
      query_call(Conn, Request).
  %% @doc Prepares `Query' on the server and returns the statement id, which
  %% can later be passed to execute/3,4 and unprepare/2.
  %% @see prepare/3
  -spec prepare(Conn, Query) -> {ok, StatementId} | {error, Reason}
      when Conn :: connection(),
           Query :: iodata(),
           StatementId :: integer(),
           Reason :: server_reason().
  prepare(Conn, Query) ->
      Request = {prepare, Query},
      gen_server:call(Conn, Request).
  %% @doc Prepares `Query' on the server and stores it under the atom `Name'.
  %% An existing statement registered under the same name is unprepared first.
  %% @see prepare/2
  -spec prepare(Conn, Name, Query) -> {ok, Name} | {error, Reason}
      when Conn :: connection(),
           Name :: atom(),
           Query :: iodata(),
           Reason :: server_reason().
  prepare(Conn, Name, Query) ->
      Request = {prepare, Name, Query},
      gen_server:call(Conn, Request).
  %% @doc Releases a prepared statement, referenced by id or by name. Returns
  %% `{error, not_prepared}' if no such statement is registered.
  -spec unprepare(Conn, StatementRef) -> ok | {error, Reason}
      when Conn :: connection(),
           StatementRef :: atom() | integer(),
           Reason :: server_reason() | not_prepared.
  unprepare(Conn, StatementRef) ->
      Request = {unprepare, StatementRef},
      gen_server:call(Conn, Request).
  %% @doc Reports how many warnings the most recent query/2 or execute/3 call
  %% produced, as recorded in the connection state.
  -spec warning_count(connection()) -> integer().
  warning_count(Conn) ->
      Request = warning_count,
      gen_server:call(Conn, Request).
  %% @doc Reports the number of rows inserted, updated or deleted by the most
  %% recently executed query or prepared statement. On a connection started
  %% with found_rows, an update reports the number of rows matched by the query
  %% instead.
  -spec affected_rows(connection()) -> integer().
  affected_rows(Conn) ->
      Request = affected_rows,
      gen_server:call(Conn, Request).
  %% @doc Tells whether auto-commit is currently enabled, according to the
  %% server status flags held by the connection process.
  -spec autocommit(connection()) -> boolean().
  autocommit(Conn) ->
      Request = autocommit,
      gen_server:call(Conn, Request).
  %% @doc Fetches the last insert-id recorded by the connection process.
  -spec insert_id(connection()) -> integer().
  insert_id(Conn) ->
      Request = insert_id,
      gen_server:call(Conn, Request).
  %% @doc Tells whether the connection is currently inside a transaction. It
  %% makes no difference whether the transaction was opened with
  %% transaction/2,3 or with a plain `mysql:query(Connection, "BEGIN")'.
  %% @see transaction/2
  %% @see transaction/4
  -spec in_transaction(connection()) -> boolean().
  in_transaction(Conn) ->
      Request = in_transaction,
      gen_server:call(Conn, Request).
  %% @doc Runs the zero-arity fun `Fun' as a transaction, retrying without
  %% limit (`infinity' retries).
  %% @see transaction/4
  -spec transaction(connection(), fun()) -> {atomic, term()} | {aborted, term()}.
  transaction(Conn, Fun) ->
      NoArgs = [],
      transaction(Conn, Fun, NoArgs, infinity).
  %% @doc Runs the zero-arity fun `Fun' as a transaction with at most `Retries'
  %% restarts.
  %% @see transaction/4
  -spec transaction(connection(), fun(), Retries) -> {atomic, term()} |
                                                     {aborted, term()}
      when Retries :: non_neg_integer() | infinity.
  transaction(Conn, Fun, Retries) ->
      NoArgs = [],
      transaction(Conn, Fun, NoArgs, Retries).
  309. %% @doc This function executes the functional object Fun with arguments Args as
  310. %% a transaction.
  311. %%
  312. %% The semantics are as close as possible to mnesia's transactions. Transactions
  313. %% can be nested and are restarted automatically when deadlocks are detected.
  314. %% MySQL's savepoints are used to implement nested transactions.
  315. %%
  316. %% Fun must be a function and Args must be a list of the same length as the
  317. %% arity of Fun.
  318. %%
  319. %% If an exception occurs within Fun, the exception is caught and `{aborted,
  320. %% Reason}' is returned. The value of `Reason' depends on the class of the
  321. %% exception.
  322. %%
  323. %% Note that an error response from a query does not cause a transaction to be
  324. %% rollbacked. To force a rollback on a MySQL error you can trigger a `badmatch'
  325. %% using e.g. `ok = mysql:query(Pid, "SELECT some_non_existent_value")'. An
  326. %% exception to this is the error 1213 "Deadlock", after the specified number
  327. %% of retries, all failed. In this case, the transaction is aborted and the
  328. %% error is returned as the reason for the aborted transaction, along with a
  329. %% stacktrace pointing to where the last deadlock was detected. (In earlier
  330. %% versions, up to and including 1.3.2, transactions were automatically
  331. %% restarted also for the error 1205 "Lock wait timeout". This is no longer the
  332. %% case.)
  333. %%
  334. %% Some queries such as ALTER TABLE cause an *implicit commit* on the server.
  335. %% If such a query is executed within a transaction, an error on the form
  336. %% `{implicit_commit, Query}' is raised. This means that the transaction has
  337. %% been committed prematurely. This also happens if an explicit COMMIT is
  338. %% executed as a plain query within a managed transaction. (Don't do that!)
  339. %%
  340. %% <table>
  341. %% <thead>
  342. %% <tr><th>Class of exception</th><th>Return value</th></tr>
  343. %% </thead>
  344. %% <tbody>
  345. %% <tr>
  346. %% <td>`error' with reason `ErrorReason'</td>
  347. %% <td>`{aborted, {ErrorReason, Stack}}'</td>
  348. %% </tr>
  349. %% <tr><td>`exit(Term)'</td><td>`{aborted, Term}'</td></tr>
  350. %% <tr><td>`throw(Term)'</td><td>`{aborted, {throw, Term}}'</td></tr>
  351. %% </tbody>
  352. %% </table>
  -spec transaction(connection(), fun(), list(), Retries) -> {atomic, term()} |
                                                             {aborted, term()}
      when Retries :: non_neg_integer() | infinity.
  transaction(Conn, Fun, Args, Retries)
    when is_list(Args), is_function(Fun, length(Args)) ->
      %% Thanks to the guard, Fun is known to accept exactly Args. Whatever
      %% execute_transaction/4 catches is therefore raised from inside Fun and
      %% is not an arity mismatch.
      BeginReply = gen_server:call(Conn, start_transaction, infinity),
      ok = BeginReply,
      execute_transaction(Conn, Fun, Args, Retries).
  362. %% @private
  363. %% @doc This is a helper for transaction/2,3,4. It performs everything except
  364. %% executing the BEGIN statement. It is called recursively when a transaction
  365. %% is retried.
  366. %%
  367. %% "When a transaction rollback occurs due to a deadlock or lock wait timeout,
  368. %% it cancels the effect of the statements within the transaction. But if the
  369. %% start-transaction statement was START TRANSACTION or BEGIN statement,
  370. %% rollback does not cancel that statement."
  371. %% (https://dev.mysql.com/doc/refman/5.6/en/innodb-error-handling.html)
  372. %%
  373. %% Lock Wait Timeout:
  374. %% "InnoDB rolls back only the last statement on a transaction timeout by
  375. %% default. If --innodb_rollback_on_timeout is specified, a transaction timeout
  376. %% causes InnoDB to abort and roll back the entire transaction (the same
  377. %% behavior as in MySQL 4.1)."
  378. %% (https://dev.mysql.com/doc/refman/5.6/en/innodb-parameters.html)
  execute_transaction(Conn, Fun, Args, Retries) ->
      %% Only apply(Fun, Args) is protected by the catch clauses; the commit in
      %% the `of' section is outside the protected part of the try expression.
      try apply(Fun, Args) of
          ResultOfFun ->
              ok = gen_server:call(Conn, commit, infinity),
              {atomic, ResultOfFun}
      catch
          %% We are at the top level, try to restart the transaction if there are
          %% retries left.
          %%
          %% The `infinity' clause must come before the `Retries > 0' clause:
          %% in Erlang's term order an atom compares greater than any number,
          %% so `infinity > 0' is true and `Retries - 1' would then fail.
          ?EXCEPTION(throw, {implicit_rollback, 1, _}, _Stacktrace)
            when Retries == infinity ->
              execute_transaction(Conn, Fun, Args, infinity);
          ?EXCEPTION(throw, {implicit_rollback, 1, _}, _Stacktrace)
            when Retries > 0 ->
              execute_transaction(Conn, Fun, Args, Retries - 1);
          ?EXCEPTION(throw, {implicit_rollback, 1, Reason}, Stacktrace)
            when Retries == 0 ->
              %% No more retries. Return 'aborted' along with the deadlock error
              %% and the trace to the line where the deadlock occurred.
              Trace = ?GET_STACK(Stacktrace),
              ok = gen_server:call(Conn, rollback, infinity),
              {aborted, {Reason, Trace}};
          ?EXCEPTION(throw, {implicit_rollback, N, Reason}, Stacktrace)
            when N > 1 ->
              %% Nested transaction. Bubble out to the outermost level; each
              %% level decrements N, so the outermost one sees N == 1.
              erlang:raise(throw, {implicit_rollback, N - 1, Reason},
                           ?GET_STACK(Stacktrace));
          ?EXCEPTION(error, {implicit_commit, _Query} = E, Stacktrace) ->
              %% The caller did something like ALTER TABLE which resulted in an
              %% implicit commit. The server has already committed. We need to
              %% jump out of N levels of transactions.
              %%
              %% Returning 'atomic' or 'aborted' would both be wrong. Raising
              %% an exception is the best we can do.
              erlang:raise(error, E, ?GET_STACK(Stacktrace));
          ?EXCEPTION(Class, Reason, Stacktrace) ->
              %% We must be able to rollback. Otherwise let's crash.
              ok = gen_server:call(Conn, rollback, infinity),
              %% These forms for throw, error and exit mirror Mnesia's behaviour.
              Aborted = case Class of
                            throw -> {throw, Reason};
                            error -> {Reason, ?GET_STACK(Stacktrace)};
                            exit -> Reason
                        end,
              {aborted, Aborted}
      end.
  424. %% @doc Encodes a term as a MySQL literal so that it can be used inside a
  425. %% query. If backslash escapes are enabled, backslashes and single quotes in
  426. %% strings and binaries are escaped. Otherwise only single quotes are escaped.
  427. %%
  428. %% Note that the preferred way of sending values is by prepared statements or
  429. %% parametrized queries with placeholders.
  430. %%
  431. %% @see query/3
  432. %% @see execute/3
  -spec encode(connection(), term()) -> iodata().
  encode(Conn, Term) when is_list(Term); is_binary(Term) ->
      %% Strings and binaries: ask the connection whether the server currently
      %% has backslash escapes enabled and escape accordingly before encoding.
      case gen_server:call(Conn, backslash_escapes_enabled) of
          true ->
              mysql_encode:encode(mysql_encode:backslash_escape(Term));
          false ->
              mysql_encode:encode(Term)
      end;
  encode(_Conn, Term) ->
      %% Any other term is encoded as is; the connection is not consulted.
      mysql_encode:encode(Term).
  441. %% --- Gen_server callbacks ---
  442. -include("records.hrl").
  443. -include("server_status.hrl").
  %% Gen_server state
  %%
  %% Fields, as used by the callbacks in this module:
  %%   server_version, connection_id - taken from the handshake in init/1.
  %%   socket, sockmod     - the socket and the module used to operate on it,
  %%                         as returned by mysql_protocol:handshake/7.
  %%   ssl_opts, host, port, user, password
  %%                       - connection options as read in init/1.
  %%   log_warnings        - when true, warnings are logged after a query that
  %%                         produced any.
  %%   ping_timeout        - derived from the keep_alive option; `infinity'
  %%                         when keep_alive is false.
  %%   query_timeout       - default timeout for query/execute calls that do
  %%                         not carry their own.
  %%   query_cache_time    - configured cache time for statements prepared by
  %%                         parameterized queries; the `query_cache' timer is
  %%                         started with twice this value.
  %%   affected_rows, warning_count, insert_id
  %%                       - values handed out by the corresponding API calls;
  %%                         presumably maintained by update_state/2 (defined
  %%                         elsewhere in this file; verify there).
  %%   status              - server status flags; tested against the
  %%                         ?SERVER_STATUS_* bit masks.
  %%   transaction_level, ping_ref
  %%                       - NOTE(review): not used in the visible part of the
  %%                         file; presumably the transaction nesting depth and
  %%                         the keep-alive timer reference. Confirm.
  %%   stmts               - dict of prepared statements keyed by statement id
  %%                         (prepare/2) or by name (prepare/3).
  %%   query_cache         - mysql_cache of statements prepared for
  %%                         parameterized queries, keyed by the query binary;
  %%                         `empty' until the first entry is stored.
  %%   cap_found_rows      - true when the found_rows option was set to true.
  -record(state, {server_version, connection_id, socket, sockmod, ssl_opts,
                  host, port, user, password, log_warnings,
                  ping_timeout,
                  query_timeout, query_cache_time,
                  affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
                  transaction_level = 0, ping_ref = undefined,
                  stmts = dict:new(), query_cache = empty, cap_found_rows = false}).
  %% @private
  %% @doc gen_server callback. Reads the options, opens the TCP socket, performs
  %% the MySQL handshake and builds the initial state. Returns `{stop, Reason}'
  %% when the handshake yields an error record. A failed socket connect is a
  %% badmatch and crashes the process.
  init(Opts) ->
      %% Small accessor so each option lookup reads as Opt(Key, Default).
      Opt = fun (Key, Default) -> proplists:get_value(Key, Opts, Default) end,
      Host = Opt(host, ?default_host),
      Port = Opt(port, ?default_port),
      User = Opt(user, ?default_user),
      Password = Opt(password, ?default_password),
      Database = Opt(database, undefined),
      LogWarn = Opt(log_warnings, true),
      QueryTimeout = Opt(query_timeout, ?default_query_timeout),
      QueryCacheTime = Opt(query_cache_time, ?default_query_cache_time),
      TcpOpts = Opt(tcp_options, []),
      SetFoundRows = Opt(found_rows, false),
      SSLOpts = Opt(ssl, undefined),
      %% keep_alive: true -> default interval, false -> never ping, otherwise
      %% the given value is used as the interval.
      PingTimeout = case Opt(keep_alive, false) of
                        true -> ?default_ping_timeout;
                        false -> infinity;
                        Interval when Interval > 0 -> Interval
                    end,
      %% Connect socket. The connection always starts out over plain TCP; the
      %% handshake returns the socket module and socket to use from then on.
      SockOpts = [binary, {packet, raw}, {active, false} | TcpOpts],
      {ok, Socket0} = mysql_sock_tcp:connect(Host, Port, SockOpts),
      %% Exchange handshake communication.
      case mysql_protocol:handshake(User, Password, Database, mysql_sock_tcp,
                                    SSLOpts, Socket0, SetFoundRows) of
          {ok, Handshake, SockMod, Socket} ->
              SockMod:setopts(Socket, [{active, once}]),
              #handshake{server_version = Version,
                         connection_id = ConnId,
                         status = Status} = Handshake,
              State0 = #state{server_version = Version,
                              connection_id = ConnId,
                              status = Status,
                              sockmod = SockMod,
                              socket = Socket,
                              ssl_opts = SSLOpts,
                              host = Host,
                              port = Port,
                              user = User,
                              password = Password,
                              log_warnings = LogWarn,
                              ping_timeout = PingTimeout,
                              query_timeout = QueryTimeout,
                              query_cache_time = QueryCacheTime,
                              cap_found_rows = (SetFoundRows =:= true)},
              %% Trap exit so that we can properly disconnect when we die.
              process_flag(trap_exit, true),
              {ok, schedule_ping(State0)};
          #error{} = E ->
              {stop, error_to_reason(E)}
      end.
  504. %% @private
  505. %% @doc
  506. %%
  507. %% Query and execute calls:
  508. %%
  509. %% <ul>
  510. %% <li>{query, Query}</li>
  511. %% <li>{query, Query, Timeout}</li>
  512. %% <li>{param_query, Query, Params}</li>
  513. %% <li>{param_query, Query, Params, Timeout}</li>
  514. %% <li>{execute, Stmt, Args}</li>
  515. %% <li>{execute, Stmt, Args, Timeout}</li>
  516. %% </ul>
  517. %%
  518. %% For the calls listed above, we return these values:
  519. %%
  520. %% <dl>
  521. %% <dt>`ok'</dt>
  522. %% <dd>Success without returning any table data (UPDATE, etc.)</dd>
  523. %% <dt>`{ok, ColumnNames, Rows}'</dt>
  524. %% <dd>Queries returning one result set of table data</dd>
  525. %% <dt>`{ok, [{ColumnNames, Rows}, ...]}'</dt>
  526. %% <dd>Queries returning more than one result set of table data</dd>
  527. %% <dt>`{error, ServerReason}'</dt>
  528. %% <dd>MySQL server error</dd>
  529. %% <dt>`{implicit_commit, NestingLevel, Query}'</dt>
  530. %% <dd>A DDL statement (e.g. CREATE TABLE, ALTER TABLE, etc.) results in
  531. %% an implicit commit.
  532. %%
  533. %% If the caller is in a (nested) transaction, it must be aborted. To be
  534. %% able to handle this in the caller's process, we also return the
  535. %% nesting level.</dd>
  536. %% <dt>`{implicit_rollback, NestingLevel, ServerReason}'</dt>
  537. %% <dd>This error results in an implicit rollback: `{1213, <<"40001">>,
  538. %% <<"Deadlock found when trying to get lock; try restarting "
  539. %% "transaction">>}'.
  540. %%
  541. %% If the caller is in a (nested) transaction, it must be aborted. To be
  542. %% able to handle this in the caller's process, we also return the
  543. %% nesting level.</dd>
  544. %% </dl>
  545. handle_call({query, Query}, From, State) ->
  546. handle_call({query, Query, State#state.query_timeout}, From, State);
  547. handle_call({query, Query, Timeout}, _From, State) ->
  548. SockMod = State#state.sockmod,
  549. Socket = State#state.socket,
  550. SockMod:setopts(Socket, [{active, false}]),
  551. {ok, Recs} = case mysql_protocol:query(Query, SockMod, Socket, Timeout) of
  552. {error, timeout} when State#state.server_version >= [5, 0, 0] ->
  553. kill_query(State),
  554. mysql_protocol:fetch_query_response(SockMod, Socket, ?cmd_timeout);
  555. {error, timeout} ->
  556. %% For MySQL 4.x.x there is no way to recover from timeout except
  557. %% killing the connection itself.
  558. exit(timeout);
  559. QueryResult ->
  560. QueryResult
  561. end,
  562. SockMod:setopts(Socket, [{active, once}]),
  563. State1 = lists:foldl(fun update_state/2, State, Recs),
  564. State1#state.warning_count > 0 andalso State1#state.log_warnings
  565. andalso log_warnings(State1, Query),
  566. handle_query_call_reply(Recs, Query, State1, []);
  567. handle_call({param_query, Query, Params}, From, State) ->
  568. handle_call({param_query, Query, Params, State#state.query_timeout}, From,
  569. State);
  570. handle_call({param_query, Query, Params, Timeout}, _From, State) ->
  571. %% Parametrized query: Prepared statement cached with the query as the key
  572. QueryBin = iolist_to_binary(Query),
  573. #state{socket = Socket, sockmod = SockMod} = State,
  574. Cache = State#state.query_cache,
  575. {StmtResult, Cache1} = case mysql_cache:lookup(QueryBin, Cache) of
  576. {found, FoundStmt, NewCache} ->
  577. %% Found
  578. {{ok, FoundStmt}, NewCache};
  579. not_found ->
  580. %% Prepare
  581. SockMod:setopts(Socket, [{active, false}]),
  582. SockMod = State#state.sockmod,
  583. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  584. SockMod:setopts(Socket, [{active, once}]),
  585. case Rec of
  586. #error{} = E ->
  587. {{error, error_to_reason(E)}, Cache};
  588. #prepared{} = Stmt ->
  589. %% If the first entry in the cache, start the timer.
  590. Cache == empty andalso begin
  591. When = State#state.query_cache_time * 2,
  592. erlang:send_after(When, self(), query_cache)
  593. end,
  594. {{ok, Stmt}, mysql_cache:store(QueryBin, Stmt, Cache)}
  595. end
  596. end,
  597. case StmtResult of
  598. {ok, StmtRec} ->
  599. State1 = State#state{query_cache = Cache1},
  600. execute_stmt(StmtRec, Params, Timeout, State1);
  601. PrepareError ->
  602. {reply, PrepareError, State}
  603. end;
  604. handle_call({execute, Stmt, Args}, From, State) ->
  605. handle_call({execute, Stmt, Args, State#state.query_timeout}, From, State);
  606. handle_call({execute, Stmt, Args, Timeout}, _From, State) ->
  607. case dict:find(Stmt, State#state.stmts) of
  608. {ok, StmtRec} ->
  609. execute_stmt(StmtRec, Args, Timeout, State);
  610. error ->
  611. {reply, {error, not_prepared}, State}
  612. end;
  613. handle_call({prepare, Query}, _From, State) ->
  614. #state{socket = Socket, sockmod = SockMod} = State,
  615. SockMod:setopts(Socket, [{active, false}]),
  616. SockMod = State#state.sockmod,
  617. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  618. SockMod:setopts(Socket, [{active, once}]),
  619. State1 = update_state(Rec, State),
  620. case Rec of
  621. #error{} = E ->
  622. {reply, {error, error_to_reason(E)}, State1};
  623. #prepared{statement_id = Id} = Stmt ->
  624. Stmts1 = dict:store(Id, Stmt, State1#state.stmts),
  625. State2 = State#state{stmts = Stmts1},
  626. {reply, {ok, Id}, State2}
  627. end;
  handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
      %% Prepares Query under the atom Name, replacing any statement that was
      %% previously registered with the same name.
      #state{socket = Socket, sockmod = SockMod} = State,
      SockMod:setopts(Socket, [{active, false}]),
      %% Release the server-side statement that currently owns the name, if
      %% there is one, before preparing its replacement.
      State1 =
          case dict:find(Name, State#state.stmts) of
              error ->
                  State;
              {ok, Previous} ->
                  mysql_protocol:unprepare(Previous, SockMod, Socket),
                  State#state{stmts = dict:erase(Name, State#state.stmts)}
          end,
      Rec = mysql_protocol:prepare(Query, SockMod, Socket),
      SockMod:setopts(Socket, [{active, once}]),
      State2 = update_state(Rec, State1),
      case Rec of
          #prepared{} ->
              WithStmt = dict:store(Name, Rec, State2#state.stmts),
              {reply, {ok, Name}, State2#state{stmts = WithStmt}};
          #error{} ->
              {reply, {error, error_to_reason(Rec)}, State2}
      end;
  handle_call({unprepare, Stmt}, _From, State)
    when is_atom(Stmt); is_integer(Stmt) ->
      %% Stmt is the dict key of the statement: an atom or an integer.
      Stmts = State#state.stmts,
      case dict:find(Stmt, Stmts) of
          error ->
              {reply, {error, not_prepared}, State};
          {ok, StmtRec} ->
              #state{socket = Socket, sockmod = SockMod} = State,
              SockMod:setopts(Socket, [{active, false}]),
              mysql_protocol:unprepare(StmtRec, SockMod, Socket),
              SockMod:setopts(Socket, [{active, once}]),
              Remaining = dict:erase(Stmt, Stmts),
              %% No response record is passed to update_state/2 here, so the
              %% ping is re-scheduled directly.
              {reply, ok, schedule_ping(State#state{stmts = Remaining})}
      end;
  %% Read-only requests answered straight from the connection state.
  handle_call(warning_count, _From, State) ->
      Warnings = State#state.warning_count,
      {reply, Warnings, State};
  handle_call(insert_id, _From, State) ->
      LastInsertId = State#state.insert_id,
      {reply, LastInsertId, State};
  handle_call(affected_rows, _From, State) ->
      Affected = State#state.affected_rows,
      {reply, Affected, State};
  handle_call(autocommit, _From, State) ->
      %% True when the autocommit bit is set in the stored status flags.
      Flags = State#state.status,
      {reply, (Flags band ?SERVER_STATUS_AUTOCOMMIT) /= 0, State};
  handle_call(backslash_escapes_enabled, _From, #state{status = Flags} = State) ->
      %% Enabled means the NO_BACKSLASH_ESCAPES bit is *not* set.
      {reply, (Flags band ?SERVER_STATUS_NO_BACKSLASH_ESCAPES) == 0, State};
  handle_call(in_transaction, _From, State) ->
      Flags = State#state.status,
      {reply, (Flags band ?SERVER_STATUS_IN_TRANS) /= 0, State};
  handle_call(start_transaction, _From,
              #state{socket = Socket, sockmod = SockMod,
                     transaction_level = Level, status = Status} = State)
    when Status band ?SERVER_STATUS_IN_TRANS == 0, Level == 0;
         Status band ?SERVER_STATUS_IN_TRANS /= 0, Level > 0 ->
      %% Level 0 issues BEGIN; any deeper level issues a savepoint named
      %% after the current nesting level.
      Query = if
                  Level =:= 0 -> <<"BEGIN">>;
                  true -> <<"SAVEPOINT s", (integer_to_binary(Level))/binary>>
              end,
      SockMod:setopts(Socket, [{active, false}]),
      %% Anything but a single OK response crashes the connection process.
      {ok, [#ok{} = Res]} = mysql_protocol:query(Query, SockMod, Socket,
                                                 ?cmd_timeout),
      SockMod:setopts(Socket, [{active, once}]),
      Updated = update_state(Res, State),
      {reply, ok, Updated#state{transaction_level = Level + 1}};
  handle_call(rollback, _From,
              #state{socket = Socket, sockmod = SockMod,
                     status = Status, transaction_level = Level} = State)
    when Status band ?SERVER_STATUS_IN_TRANS /= 0, Level >= 1 ->
      %% Level 1 rolls back the whole transaction; deeper levels roll back to
      %% the savepoint numbered one below the current level.
      Query = if
                  Level =:= 1 ->
                      <<"ROLLBACK">>;
                  true ->
                      <<"ROLLBACK TO s", (integer_to_binary(Level - 1))/binary>>
              end,
      SockMod:setopts(Socket, [{active, false}]),
      %% Anything but a single OK response crashes the connection process.
      {ok, [#ok{} = Res]} = mysql_protocol:query(Query, SockMod, Socket,
                                                 ?cmd_timeout),
      SockMod:setopts(Socket, [{active, once}]),
      Updated = update_state(Res, State),
      {reply, ok, Updated#state{transaction_level = Level - 1}};
  handle_call(commit, _From,
              #state{socket = Socket, sockmod = SockMod,
                     status = Status, transaction_level = Level} = State)
    when Status band ?SERVER_STATUS_IN_TRANS /= 0, Level >= 1 ->
      %% Level 1 commits the transaction; deeper levels release the savepoint
      %% numbered one below the current level.
      Query = if
                  Level =:= 1 ->
                      <<"COMMIT">>;
                  true ->
                      <<"RELEASE SAVEPOINT s",
                        (integer_to_binary(Level - 1))/binary>>
              end,
      SockMod:setopts(Socket, [{active, false}]),
      %% Anything but a single OK response crashes the connection process.
      {ok, [#ok{} = Res]} = mysql_protocol:query(Query, SockMod, Socket,
                                                 ?cmd_timeout),
      SockMod:setopts(Socket, [{active, once}]),
      Updated = update_state(Res, State),
      {reply, ok, Updated#state{transaction_level = Level - 1}}.
  %% @private
  handle_cast(_Request, State) ->
      %% Every cast is ignored and the state is returned untouched.
      {noreply, State}.
  %% @private
  handle_info(query_cache, #state{socket = Socket, sockmod = SockMod,
                                  query_cache = Cache,
                                  query_cache_time = CacheTime} = State) ->
      %% Periodic sweep of the statement cache used by query/3: entries older
      %% than CacheTime are evicted and then unprepared.
      {Evicted, Cache1} = mysql_cache:evict_older_than(Cache, CacheTime),
      SockMod:setopts(Socket, [{active, false}]),
      Unprepare = fun({_Query, Stmt}) ->
                          mysql_protocol:unprepare(Stmt, SockMod, Socket)
                  end,
      lists:foreach(Unprepare, Evicted),
      SockMod:setopts(Socket, [{active, once}]),
      %% Keep sweeping only while something is left in the cache.
      case mysql_cache:size(Cache1) > 0 of
          true -> erlang:send_after(CacheTime, self(), query_cache);
          false -> ok
      end,
      {noreply, State#state{query_cache = Cache1}};
  handle_info(ping, #state{socket = Socket, sockmod = SockMod} = State) ->
      %% Timer message armed by schedule_ping/1.
      SockMod:setopts(Socket, [{active, false}]),
      PingReply = mysql_protocol:ping(SockMod, Socket),
      SockMod:setopts(Socket, [{active, once}]),
      {noreply, update_state(PingReply, State)};
  handle_info({tcp_closed, _Socket}, State) ->
      stop_server(tcp_closed, State);
  handle_info({tcp_error, _Socket, Reason}, State) ->
      stop_server({tcp_error, Reason}, State);
  handle_info(_Info, State) ->
      %% Any other message is dropped.
      {noreply, State}.
  %% @private
  terminate(Reason, #state{socket = Socket, sockmod = SockMod})
    when Reason =:= normal orelse Reason =:= shutdown ->
      %% Orderly stop: go passive and send the quit command to the server.
      SockMod:setopts(Socket, [{active, false}]),
      mysql_protocol:quit(SockMod, Socket);
  terminate(_Reason, _State) ->
      %% Any other reason: nothing is sent.
      ok.
  %% @private
  code_change(_OldVsn, State, _Extra) when is_record(State, state) ->
      %% The state already has the current record layout; keep it unchanged.
      {ok, State};
  code_change(_OldVsn, _State, _Extra) ->
      {error, incompatible_state}.
  768. %% --- Helpers ---
  %% @doc Sends a query request (plain, parametrized or prepared) to the
  %% connection process with no call timeout and inspects the reply. An
  %% implicit commit is raised as an error and an implicit rollback is thrown,
  %% so that the caller can jump out of a transaction; every other reply is
  %% returned as is.
  query_call(Conn, CallReq) ->
      Reply = gen_server:call(Conn, CallReq, infinity),
      case Reply of
          {implicit_rollback, _Level, _ServerReason} ->
              throw(Reply);
          {implicit_commit, _Level, Query} ->
              error({implicit_commit, Query});
          _ ->
              Reply
      end.
  %% @doc Executes a prepared statement on the connection's socket and returns
  %% the handle_call/3 result built by handle_query_call_reply/4.
  execute_stmt(Stmt, Args, Timeout,
               #state{socket = Socket, sockmod = SockMod} = State) ->
      SockMod:setopts(Socket, [{active, false}]),
      Response =
          case mysql_protocol:execute(Stmt, Args, SockMod, Socket, Timeout) of
              {error, timeout} ->
                  case State#state.server_version >= [5, 0, 0] of
                      true ->
                          %% Kill the running query from a second connection,
                          %% then read the response that is still pending on
                          %% this one.
                          kill_query(State),
                          mysql_protocol:fetch_execute_response(SockMod, Socket,
                                                                ?cmd_timeout);
                      false ->
                          %% For MySQL 4.x.x there is no way to recover from
                          %% a timeout except killing the connection itself.
                          exit(timeout)
                  end;
              Other ->
                  Other
          end,
      {ok, Recs} = Response,
      SockMod:setopts(Socket, [{active, once}]),
      State1 = lists:foldl(fun update_state/2, State, Recs),
      %% Warnings are fetched and logged only when there are some and logging
      %% of warnings is switched on in the state.
      ShouldLog = State1#state.warning_count > 0
          andalso State1#state.log_warnings,
      ShouldLog andalso log_warnings(State1, Stmt#prepared.orig_query),
      handle_query_call_reply(Recs, Stmt#prepared.orig_query, State1, []).
  %% @doc Turns an error record into the {Code, State, Msg} tuple that is
  %% handed out as an error reason.
  -spec error_to_reason(#error{}) -> server_reason().
  error_to_reason(#error{} = Err) ->
      {Err#error.code, Err#error.state, Err#error.msg}.
  %% @doc Copies the bookkeeping fields carried by a response record into the
  %% state and then re-schedules the ping.
  -spec update_state(#ok{} | #eof{} | any(), #state{}) -> #state{}.
  update_state(Rec, State) ->
      schedule_ping(
        case Rec of
            #ok{status = Status, affected_rows = Affected,
                insert_id = InsertId, warning_count = Warnings} ->
                State#state{status = Status, affected_rows = Affected,
                            insert_id = InsertId, warning_count = Warnings};
            #resultset{status = Status, warning_count = Warnings} ->
                State#state{status = Status, warning_count = Warnings};
            #prepared{warning_count = Warnings} ->
                State#state{warning_count = Warnings};
            _Other ->
                %% Everything else, errors included: reset the counters but
                %% leave status and insert_id as they were.
                State#state{warning_count = 0, affected_rows = 0}
        end).
  %% @doc Builds the handle_call/3 return value for queries and prepared
  %% statements by walking the list of response records and collecting result
  %% sets in ResultSetsAcc (most recent first).
  handle_query_call_reply([], _Query, State, ResultSetsAcc) ->
      %% All records consumed: no result set gives ok, exactly one gives
      %% {ok, Columns, Rows}, several give {ok, ListInOriginalOrder}.
      {reply,
       case ResultSetsAcc of
           [] -> ok;
           [{Columns, Rows}] -> {ok, Columns, Rows};
           [_ | _] -> {ok, lists:reverse(ResultSetsAcc)}
       end,
       State};
  handle_query_call_reply([Rec | Rest], Query, State, ResultSetsAcc) ->
      case Rec of
          #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
                                    State#state.transaction_level > 0 ->
              %% We believed we were inside a transaction but the server says
              %% otherwise: the statement (e.g. DDL such as CREATE TABLE or
              %% ALTER TABLE) caused an implicit commit.
              Level = State#state.transaction_level,
              {reply, {implicit_commit, Level, Query},
               State#state{transaction_level = 0}};
          #ok{} ->
              handle_query_call_reply(Rest, Query, State, ResultSetsAcc);
          #resultset{cols = ColDefs, rows = Rows} ->
              ResultSet = {[Def#col.name || Def <- ColDefs], Rows},
              handle_query_call_reply(Rest, Query, State,
                                      [ResultSet | ResultSetsAcc]);
          #error{code = ?ERROR_DEADLOCK} when State#state.transaction_level > 0 ->
              %% A deadlock inside a transaction results in an implicit
              %% rollback. Everything in the transaction is rolled back
              %% except the BEGIN statement itself, hence level 1.
              Level = State#state.transaction_level,
              {reply, {implicit_rollback, Level, error_to_reason(Rec)},
               State#state{transaction_level = 1}};
          #error{} ->
              {reply, {error, error_to_reason(Rec)}, State}
      end.
  %% @doc Schedules (or re-schedules) the ping message. With a ping timeout of
  %% infinity the state is returned unchanged; otherwise any previous timer is
  %% cancelled and the reference of the new timer is stored in the state.
  schedule_ping(#state{ping_timeout = Timeout, ping_ref = OldRef} = State) ->
      case Timeout of
          infinity ->
              State;
          _ ->
              case is_reference(OldRef) of
                  true -> erlang:cancel_timer(OldRef);
                  false -> ok
              end,
              NewRef = erlang:send_after(Timeout, self(), ping),
              State#state{ping_ref = NewRef}
      end.
  %% @doc Runs SHOW WARNINGS on the connection and writes the result to the
  %% error logger. Query is the query that gave the warnings.
  log_warnings(#state{socket = Socket, sockmod = SockMod}, Query) ->
      SockMod:setopts(Socket, [{active, false}]),
      Reply = mysql_protocol:query(<<"SHOW WARNINGS">>, SockMod, Socket,
                                   ?cmd_timeout),
      %% Exactly one result set is expected; anything else is a crash.
      {ok, [#resultset{rows = Rows}]} = Reply,
      SockMod:setopts(Socket, [{active, once}]),
      FormatRow = fun(L, C, M) ->
                          [L, " ", integer_to_binary(C), ": ", M, "\n"]
                  end,
      %% Only three-column rows are formatted; other rows are skipped.
      Lines = [FormatRow(Level, Code, Message)
               || [Level, Code, Message] <- Rows],
      error_logger:warning_msg("~s in ~s~n", [Lines, Query]).
  %% @doc Opens a separate connection and runs KILL QUERY for this connection's
  %% id, so that the main connection gets back to normal. KILL QUERY appeared
  %% in MySQL 5.0.0. A failed handshake is only logged.
  kill_query(#state{connection_id = ConnId, host = Host, port = Port,
                    user = User, password = Password, ssl_opts = SSLOpts,
                    cap_found_rows = SetFoundRows}) ->
      %% Connect a fresh socket; a failed connect crashes the caller.
      ConnectOpts = [{active, false}, binary, {packet, raw}],
      {ok, RawSocket} = mysql_sock_tcp:connect(Host, Port, ConnectOpts),
      %% Exchange handshake communication (no database is selected).
      case mysql_protocol:handshake(User, Password, undefined, mysql_sock_tcp,
                                    SSLOpts, RawSocket, SetFoundRows) of
          #error{} = HandshakeError ->
              error_logger:error_msg("Failed to connect to kill query: ~p",
                                     [error_to_reason(HandshakeError)]);
          {ok, #handshake{}, SockMod, Socket} ->
              %% Kill and disconnect.
              KillQuery = <<"KILL QUERY ", (integer_to_binary(ConnId))/binary>>,
              {ok, [#ok{}]} = mysql_protocol:query(KillQuery, SockMod, Socket,
                                                   ?cmd_timeout),
              mysql_protocol:quit(SockMod, Socket)
      end.
  %% @doc Logs why the connection is going down, closes the socket and returns
  %% a stop tuple whose state has the socket and connection id cleared.
  stop_server(Reason,
              State = #state{socket = Socket, connection_id = ConnId}) ->
      LogArgs = [ConnId, Reason],
      error_logger:error_msg("Connection Id ~p closing with reason: ~p~n",
                             LogArgs),
      ok = gen_tcp:close(Socket),
      Cleared = State#state{socket = undefined, connection_id = undefined},
      {stop, Reason, Cleared}.