mysql.erl 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977
  1. %% MySQL/OTP – MySQL client library for Erlang/OTP
  2. %% Copyright (C) 2014-2015, 2018 Viktor Söderqvist,
  3. %% 2016 Johan Lövdahl
  4. %% 2017 Piotr Nosek, Michal Slaski
  5. %%
  6. %% This file is part of MySQL/OTP.
  7. %%
  8. %% MySQL/OTP is free software: you can redistribute it and/or modify it under
  9. %% the terms of the GNU Lesser General Public License as published by the Free
  10. %% Software Foundation, either version 3 of the License, or (at your option)
  11. %% any later version.
  12. %%
  13. %% This program is distributed in the hope that it will be useful, but WITHOUT
  14. %% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  15. %% FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  16. %% more details.
  17. %%
  18. %% You should have received a copy of the GNU Lesser General Public License
  19. %% along with this program. If not, see <https://www.gnu.org/licenses/>.
  20. %% @doc MySQL client.
  21. %%
  22. %% The `connection()' type is a gen_server reference as described in the
  23. %% documentation for `gen_server:call/2,3', e.g. the pid or the name if the
  24. %% gen_server is locally registered.
  25. -module(mysql).
  26. -export([start_link/1, query/2, query/3, query/4, execute/3, execute/4,
  27. prepare/2, prepare/3, unprepare/2,
  28. warning_count/1, affected_rows/1, autocommit/1, insert_id/1,
  29. encode/2, in_transaction/1,
  30. transaction/2, transaction/3, transaction/4]).
  31. -export_type([connection/0, server_reason/0, query_result/0]).
  32. -behaviour(gen_server).
  33. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
  34. code_change/3]).
  35. -define(default_host, "localhost").
  36. -define(default_port, 3306).
  37. -define(default_user, <<>>).
  38. -define(default_password, <<>>).
  39. -define(default_connect_timeout, 5000).
  40. -define(default_query_timeout, infinity).
  41. -define(default_query_cache_time, 60000). %% for query/3.
  42. -define(default_ping_timeout, 60000).
  43. -define(cmd_timeout, 3000). %% Timeout used for various commands to the server
  44. %% Errors that cause "implicit rollback"
  45. -define(ERROR_DEADLOCK, 1213).
  46. %% A connection is a ServerRef as in gen_server:call/2,3.
  47. -type connection() :: Name :: atom() |
  48. {Name :: atom(), Node :: atom()} |
  49. {global, GlobalName :: term()} |
  50. {via, Module :: atom(), ViaName :: term()} |
  51. pid().
  52. %% MySQL error with the codes and message returned from the server.
  53. -type server_reason() :: {Code :: integer(), SQLState :: binary(),
  54. Message :: binary()}.
  55. -type column_names() :: [binary()].
  56. -type rows() :: [[term()]].
  57. -type query_result() :: ok
  58. | {ok, column_names(), rows()}
  59. | {ok, [{column_names(), rows()}, ...]}
  60. | {error, server_reason()}.
  61. -include("exception.hrl").
  62. %% @doc Starts a connection gen_server process and connects to a database. To
  63. %% disconnect just do `exit(Pid, normal)'.
  64. %%
  65. %% Options:
  66. %%
  67. %% <dl>
  68. %% <dt>`{name, ServerName}'</dt>
  69. %% <dd>If a name is provided, the gen_server will be registered with this
  70. %% name. For details see the documentation for the first argument of
  71. %% gen_server:start_link/4.</dd>
  72. %% <dt>`{host, Host}'</dt>
  73. %% <dd>Hostname of the MySQL database; default `"localhost"'.</dd>
  74. %% <dt>`{port, Port}'</dt>
  75. %% <dd>Port; default 3306 if omitted.</dd>
  76. %% <dt>`{user, User}'</dt>
  77. %% <dd>Username.</dd>
  78. %% <dt>`{password, Password}'</dt>
  79. %% <dd>Password.</dd>
  80. %% <dt>`{database, Database}'</dt>
  81. %% <dd>The name of the database AKA schema to use. This can be changed later
  82. %% using the query `USE <database>'.</dd>
  83. %% <dt>`{connect_timeout, Timeout}'</dt>
  84. %% <dd>The maximum time to spend for start_link/1.</dd>
  85. %% <dt>`{log_warnings, boolean()}'</dt>
  86. %% <dd>Whether to fetch warnings and log them using error_logger; default
  87. %% true.</dd>
  88. %% <dt>`{keep_alive, boolean() | timeout()}'</dt>
  89. %% <dd>Send ping when unused for a certain time. Possible values are `true',
  90. %% `false' and `integer() > 0' for an explicit interval in milliseconds.
  91. %% The default is `false'. For `true' a default ping timeout is used.
  92. %% </dd>
  93. %% <dt>`{prepare, NamedStatements}'</dt>
  94. %% <dd>Named prepared statements to be created as soon as the connection is
  95. %% ready.</dd>
  96. %% <dt>`{queries, Queries}'</dt>
  97. %% <dd>Queries to be executed as soon as the connection is ready. Any results
  98. %% are discarded. Typically, this is used for setting time zone and other
  99. %% session variables.</dd>
  100. %% <dt>`{query_timeout, Timeout}'</dt>
  101. %% <dd>The default time to wait for a response when executing a query or a
  102. %% prepared statement. This can be given per query using `query/3,4' and
  103. %% `execute/4'. The default is `infinity'.</dd>
  104. %% <dt>`{found_rows, boolean()}'</dt>
  105. %% <dd>If set to true, the connection will be established with
  106. %% CLIENT_FOUND_ROWS capability. affected_rows/1 will now return the
  107. %% number of found rows, not the number of rows changed by the
  108. %% query.</dd>
  109. %% <dt>`{query_cache_time, Timeout}'</dt>
  110. %% <dd>The minimum number of milliseconds to cache prepared statements used
  111. %% for parametrized queries with query/3.</dd>
  112. %% <dt>`{tcp_options, Options}'</dt>
  113. %% <dd>Additional options for `gen_tcp:connect/3'. You may want to set
  114. %% `{recbuf, Size}' and `{sndbuf, Size}' if you send or receive more than
  115. %% the default (typically 8K) per query.</dd>
  116. %% </dl>
  117. -spec start_link(Options) -> {ok, pid()} | ignore | {error, term()}
  118. when Options :: [Option],
  119. Option :: {name, ServerName} |
  120. {host, inet:socket_address() | inet:hostname()} | {port, integer()} |
  121. {user, iodata()} | {password, iodata()} |
  122. {database, iodata()} |
  123. {connect_timeout, timeout()} |
  124. {log_warnings, boolean()} |
  125. {keep_alive, boolean() | timeout()} |
  126. {prepare, NamedStatements} |
  127. {queries, [iodata()]} |
  128. {query_timeout, timeout()} |
  129. {found_rows, boolean()} |
  130. {query_cache_time, non_neg_integer()},
  131. ServerName :: {local, Name :: atom()} |
  132. {global, GlobalName :: term()} |
  133. {via, Module :: atom(), ViaName :: term()},
  134. NamedStatements :: [{StatementName :: atom(), Statement :: iodata()}].
  135. start_link(Options) ->
  136. GenSrvOpts = [{timeout, proplists:get_value(connect_timeout, Options,
  137. ?default_connect_timeout)}],
  138. Ret = case proplists:get_value(name, Options) of
  139. undefined ->
  140. gen_server:start_link(?MODULE, Options, GenSrvOpts);
  141. ServerName ->
  142. gen_server:start_link(ServerName, ?MODULE, Options, GenSrvOpts)
  143. end,
  144. case Ret of
  145. {ok, Pid} ->
  146. %% Initial queries
  147. Queries = proplists:get_value(queries, Options, []),
  148. lists:foreach(fun (Query) ->
  149. case mysql:query(Pid, Query) of
  150. ok -> ok;
  151. {ok, _, _} -> ok;
  152. {ok, _} -> ok
  153. end
  154. end,
  155. Queries),
  156. %% Prepare
  157. Prepare = proplists:get_value(prepare, Options, []),
  158. lists:foreach(fun ({Name, Stmt}) ->
  159. {ok, Name} = mysql:prepare(Pid, Name, Stmt)
  160. end,
  161. Prepare);
  162. _ -> ok
  163. end,
  164. Ret.
  165. %% @doc Executes a query with the query timeout as given to start_link/1.
  166. %%
  167. %% It is possible to execute multiple semicolon-separated queries.
  168. %%
  169. %% Results are returned in the form `{ok, ColumnNames, Rows}' if there is one
  170. %% result set. If there are more than one result sets, they are returned in the
  171. %% form `{ok, [{ColumnNames, Rows}, ...]}'.
  172. %%
  173. %% For queries that don't return any rows (INSERT, UPDATE, etc.) only the atom
  174. %% `ok' is returned.
  175. -spec query(Conn, Query) -> Result
  176. when Conn :: connection(),
  177. Query :: iodata(),
  178. Result :: query_result().
  179. query(Conn, Query) ->
  180. query_call(Conn, {query, Query}).
  181. %% @doc Depending on the 3rd argument this function does different things.
  182. %%
  183. %% If the 3rd argument is a list, it executes a parameterized query. This is
  184. %% equivallent to query/4 with the query timeout as given to start_link/1.
  185. %%
  186. %% If the 3rd argument is a timeout, it executes a plain query with this
  187. %% timeout.
  188. %%
  189. %% The return value is the same as for query/2.
  190. %%
  191. %% @see query/2.
  192. %% @see query/4.
  193. -spec query(Conn, Query, Params | Timeout) -> Result
  194. when Conn :: connection(),
  195. Query :: iodata(),
  196. Timeout :: timeout(),
  197. Params :: [term()],
  198. Result :: query_result().
  199. query(Conn, Query, Params) when is_list(Params) ->
  200. query_call(Conn, {param_query, Query, Params});
  201. query(Conn, Query, Timeout) when is_integer(Timeout); Timeout == infinity ->
  202. query_call(Conn, {query, Query, Timeout}).
  203. %% @doc Executes a parameterized query with a timeout.
  204. %%
  205. %% A prepared statement is created, executed and then cached for a certain
  206. %% time. If the same query is executed again when it is already cached, it does
  207. %% not need to be prepared again.
  208. %%
  209. %% The minimum time the prepared statement is cached can be specified using the
  210. %% option `{query_cache_time, Milliseconds}' to start_link/1.
  211. %%
  212. %% The return value is the same as for query/2.
  213. -spec query(Conn, Query, Params, Timeout) -> Result
  214. when Conn :: connection(),
  215. Query :: iodata(),
  216. Timeout :: timeout(),
  217. Params :: [term()],
  218. Result :: query_result().
  219. query(Conn, Query, Params, Timeout) ->
  220. query_call(Conn, {param_query, Query, Params, Timeout}).
  221. %% @doc Executes a prepared statement with the default query timeout as given
  222. %% to start_link/1.
  223. %% @see prepare/2
  224. %% @see prepare/3
  225. -spec execute(Conn, StatementRef, Params) -> Result | {error, not_prepared}
  226. when Conn :: connection(),
  227. StatementRef :: atom() | integer(),
  228. Params :: [term()],
  229. Result :: query_result().
  230. execute(Conn, StatementRef, Params) ->
  231. query_call(Conn, {execute, StatementRef, Params}).
  232. %% @doc Executes a prepared statement.
  233. %% @see prepare/2
  234. %% @see prepare/3
  235. -spec execute(Conn, StatementRef, Params, Timeout) ->
  236. Result | {error, not_prepared}
  237. when Conn :: connection(),
  238. StatementRef :: atom() | integer(),
  239. Params :: [term()],
  240. Timeout :: timeout(),
  241. Result :: query_result().
  242. execute(Conn, StatementRef, Params, Timeout) ->
  243. query_call(Conn, {execute, StatementRef, Params, Timeout}).
  244. %% @doc Creates a prepared statement from the passed query.
  245. %% @see prepare/3
  246. -spec prepare(Conn, Query) -> {ok, StatementId} | {error, Reason}
  247. when Conn :: connection(),
  248. Query :: iodata(),
  249. StatementId :: integer(),
  250. Reason :: server_reason().
  251. prepare(Conn, Query) ->
  252. gen_server:call(Conn, {prepare, Query}).
  253. %% @doc Creates a prepared statement from the passed query and associates it
  254. %% with the given name.
  255. %% @see prepare/2
  256. -spec prepare(Conn, Name, Query) -> {ok, Name} | {error, Reason}
  257. when Conn :: connection(),
  258. Name :: atom(),
  259. Query :: iodata(),
  260. Reason :: server_reason().
  261. prepare(Conn, Name, Query) ->
  262. gen_server:call(Conn, {prepare, Name, Query}).
  263. %% @doc Deallocates a prepared statement.
  264. -spec unprepare(Conn, StatementRef) -> ok | {error, Reason}
  265. when Conn :: connection(),
  266. StatementRef :: atom() | integer(),
  267. Reason :: server_reason() | not_prepared.
  268. unprepare(Conn, StatementRef) ->
  269. gen_server:call(Conn, {unprepare, StatementRef}).
  270. %% @doc Returns the number of warnings generated by the last query/2 or
  271. %% execute/3 calls.
  272. -spec warning_count(connection()) -> integer().
  273. warning_count(Conn) ->
  274. gen_server:call(Conn, warning_count).
  275. %% @doc Returns the number of inserted, updated and deleted rows of the last
  276. %% executed query or prepared statement. If found_rows is set on the
  277. %% connection, for update operation the return value will equal to the number
  278. %% of rows matched by the query.
  279. -spec affected_rows(connection()) -> integer().
  280. affected_rows(Conn) ->
  281. gen_server:call(Conn, affected_rows).
  282. %% @doc Returns true if auto-commit is enabled and false otherwise.
  283. -spec autocommit(connection()) -> boolean().
  284. autocommit(Conn) ->
  285. gen_server:call(Conn, autocommit).
  286. %% @doc Returns the last insert-id.
  287. -spec insert_id(connection()) -> integer().
  288. insert_id(Conn) ->
  289. gen_server:call(Conn, insert_id).
  290. %% @doc Returns true if the connection is in a transaction and false otherwise.
  291. %% This works regardless of whether the transaction has been started using
  292. %% transaction/2,3 or using a plain `mysql:query(Connection, "BEGIN")'.
  293. %% @see transaction/2
  294. %% @see transaction/4
  295. -spec in_transaction(connection()) -> boolean().
  296. in_transaction(Conn) ->
  297. gen_server:call(Conn, in_transaction).
  298. %% @doc This function executes the functional object Fun as a transaction.
  299. %% @see transaction/4
  300. -spec transaction(connection(), fun()) -> {atomic, term()} | {aborted, term()}.
  301. transaction(Conn, Fun) ->
  302. transaction(Conn, Fun, [], infinity).
  303. %% @doc This function executes the functional object Fun as a transaction.
  304. %% @see transaction/4
  305. -spec transaction(connection(), fun(), Retries) -> {atomic, term()} |
  306. {aborted, term()}
  307. when Retries :: non_neg_integer() | infinity.
  308. transaction(Conn, Fun, Retries) ->
  309. transaction(Conn, Fun, [], Retries).
  310. %% @doc This function executes the functional object Fun with arguments Args as
  311. %% a transaction.
  312. %%
  313. %% The semantics are as close as possible to mnesia's transactions. Transactions
  314. %% can be nested and are restarted automatically when deadlocks are detected.
  315. %% MySQL's savepoints are used to implement nested transactions.
  316. %%
  317. %% Fun must be a function and Args must be a list of the same length as the
  318. %% arity of Fun.
  319. %%
  320. %% If an exception occurs within Fun, the exception is caught and `{aborted,
  321. %% Reason}' is returned. The value of `Reason' depends on the class of the
  322. %% exception.
  323. %%
  324. %% Note that an error response from a query does not cause a transaction to be
  325. %% rollbacked. To force a rollback on a MySQL error you can trigger a `badmatch'
  326. %% using e.g. `ok = mysql:query(Pid, "SELECT some_non_existent_value")'. An
  327. %% exception to this is the error 1213 "Deadlock", after the specified number
  328. %% of retries, all failed. In this case, the transaction is aborted and the
  329. %% error is retured as the reason for the aborted transaction, along with a
  330. %% stacktrace pointing to where the last deadlock was detected. (In earlier
  331. %% versions, up to and including 1.3.2, transactions where automatically
  332. %% restarted also for the error 1205 "Lock wait timeout". This is no longer the
  333. %% case.)
  334. %%
  335. %% Some queries such as ALTER TABLE cause an *implicit commit* on the server.
  336. %% If such a query is executed within a transaction, an error on the form
  337. %% `{implicit_commit, Query}' is raised. This means that the transaction has
  338. %% been committed prematurely. This also happens if an explicit COMMIT is
  339. %% executed as a plain query within a managed transaction. (Don't do that!)
  340. %%
  341. %% <table>
  342. %% <thead>
  343. %% <tr><th>Class of exception</th><th>Return value</th></tr>
  344. %% </thead>
  345. %% <tbody>
  346. %% <tr>
  347. %% <td>`error' with reason `ErrorReason'</td>
  348. %% <td>`{aborted, {ErrorReason, Stack}}'</td>
  349. %% </tr>
  350. %% <tr><td>`exit(Term)'</td><td>`{aborted, Term}'</td></tr>
  351. %% <tr><td>`throw(Term)'</td><td>`{aborted, {throw, Term}}'</td></tr>
  352. %% </tbody>
  353. %% </table>
  354. -spec transaction(connection(), fun(), list(), Retries) -> {atomic, term()} |
  355. {aborted, term()}
  356. when Retries :: non_neg_integer() | infinity.
  357. transaction(Conn, Fun, Args, Retries) when is_list(Args),
  358. is_function(Fun, length(Args)) ->
  359. %% The guard makes sure that we can apply Fun to Args. Any error we catch
  360. %% in the try-catch are actual errors that occurred in Fun.
  361. ok = gen_server:call(Conn, start_transaction, infinity),
  362. execute_transaction(Conn, Fun, Args, Retries).
  363. %% @private
  364. %% @doc This is a helper for transaction/2,3,4. It performs everything except
  365. %% executing the BEGIN statement. It is called recursively when a transaction
  366. %% is retried.
  367. %%
  368. %% "When a transaction rollback occurs due to a deadlock or lock wait timeout,
  369. %% it cancels the effect of the statements within the transaction. But if the
  370. %% start-transaction statement was START TRANSACTION or BEGIN statement,
  371. %% rollback does not cancel that statement."
  372. %% (https://dev.mysql.com/doc/refman/5.6/en/innodb-error-handling.html)
  373. %%
  374. %% Lock Wait Timeout:
  375. %% "InnoDB rolls back only the last statement on a transaction timeout by
  376. %% default. If --innodb_rollback_on_timeout is specified, a transaction timeout
  377. %% causes InnoDB to abort and roll back the entire transaction (the same
  378. %% behavior as in MySQL 4.1)."
  379. %% (https://dev.mysql.com/doc/refman/5.6/en/innodb-parameters.html)
  380. execute_transaction(Conn, Fun, Args, Retries) ->
  381. try apply(Fun, Args) of
  382. ResultOfFun ->
  383. ok = gen_server:call(Conn, commit, infinity),
  384. {atomic, ResultOfFun}
  385. catch
  386. %% We are at the top level, try to restart the transaction if there are
  387. %% retries left
  388. ?EXCEPTION(throw, {implicit_rollback, 1, _}, _Stacktrace)
  389. when Retries == infinity ->
  390. execute_transaction(Conn, Fun, Args, infinity);
  391. ?EXCEPTION(throw, {implicit_rollback, 1, _}, _Stacktrace)
  392. when Retries > 0 ->
  393. execute_transaction(Conn, Fun, Args, Retries - 1);
  394. ?EXCEPTION(throw, {implicit_rollback, 1, Reason}, Stacktrace)
  395. when Retries == 0 ->
  396. %% No more retries. Return 'aborted' along with the deadlock error
  397. %% and a the trace to the line where the deadlock occured.
  398. Trace = ?GET_STACK(Stacktrace),
  399. ok = gen_server:call(Conn, rollback, infinity),
  400. {aborted, {Reason, Trace}};
  401. ?EXCEPTION(throw, {implicit_rollback, N, Reason}, Stacktrace)
  402. when N > 1 ->
  403. %% Nested transaction. Bubble out to the outermost level.
  404. erlang:raise(throw, {implicit_rollback, N - 1, Reason},
  405. ?GET_STACK(Stacktrace));
  406. ?EXCEPTION(error, {implicit_commit, _Query} = E, Stacktrace) ->
  407. %% The called did something like ALTER TABLE which resulted in an
  408. %% implicit commit. The server has already committed. We need to
  409. %% jump out of N levels of transactions.
  410. %%
  411. %% Returning 'atomic' or 'aborted' would both be wrong. Raise an
  412. %% exception is the best we can do.
  413. erlang:raise(error, E, ?GET_STACK(Stacktrace));
  414. ?EXCEPTION(Class, Reason, Stacktrace) ->
  415. %% We must be able to rollback. Otherwise let's crash.
  416. ok = gen_server:call(Conn, rollback, infinity),
  417. %% These forms for throw, error and exit mirror Mnesia's behaviour.
  418. Aborted = case Class of
  419. throw -> {throw, Reason};
  420. error -> {Reason, ?GET_STACK(Stacktrace)};
  421. exit -> Reason
  422. end,
  423. {aborted, Aborted}
  424. end.
  425. %% @doc Encodes a term as a MySQL literal so that it can be used to inside a
  426. %% query. If backslash escapes are enabled, backslashes and single quotes in
  427. %% strings and binaries are escaped. Otherwise only single quotes are escaped.
  428. %%
  429. %% Note that the preferred way of sending values is by prepared statements or
  430. %% parametrized queries with placeholders.
  431. %%
  432. %% @see query/3
  433. %% @see execute/3
  434. -spec encode(connection(), term()) -> iodata().
  435. encode(Conn, Term) ->
  436. Term1 = case (is_list(Term) orelse is_binary(Term)) andalso
  437. gen_server:call(Conn, backslash_escapes_enabled) of
  438. true -> mysql_encode:backslash_escape(Term);
  439. false -> Term
  440. end,
  441. mysql_encode:encode(Term1).
  442. %% --- Gen_server callbacks ---
  443. -include("records.hrl").
  444. -include("server_status.hrl").
  445. %% Gen_server state
  446. -record(state, {server_version, connection_id, socket, sockmod, ssl_opts,
  447. host, port, user, password, log_warnings,
  448. ping_timeout,
  449. query_timeout, query_cache_time,
  450. affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
  451. transaction_level = 0, ping_ref = undefined,
  452. monitors = [],
  453. stmts = dict:new(), query_cache = empty, cap_found_rows = false}).
  454. %% @private
  455. init(Opts) ->
  456. %% Connect
  457. Host = proplists:get_value(host, Opts, ?default_host),
  458. Port = proplists:get_value(port, Opts, ?default_port),
  459. User = proplists:get_value(user, Opts, ?default_user),
  460. Password = proplists:get_value(password, Opts, ?default_password),
  461. Database = proplists:get_value(database, Opts, undefined),
  462. LogWarn = proplists:get_value(log_warnings, Opts, true),
  463. KeepAlive = proplists:get_value(keep_alive, Opts, false),
  464. Timeout = proplists:get_value(query_timeout, Opts,
  465. ?default_query_timeout),
  466. QueryCacheTime = proplists:get_value(query_cache_time, Opts,
  467. ?default_query_cache_time),
  468. TcpOpts = proplists:get_value(tcp_options, Opts, []),
  469. SetFoundRows = proplists:get_value(found_rows, Opts, false),
  470. SSLOpts = proplists:get_value(ssl, Opts, undefined),
  471. SockMod0 = mysql_sock_tcp,
  472. PingTimeout = case KeepAlive of
  473. true -> ?default_ping_timeout;
  474. false -> infinity;
  475. N when N > 0 -> N
  476. end,
  477. %% Connect socket
  478. SockOpts = [binary, {packet, raw}, {active, false} | TcpOpts],
  479. {ok, Socket0} = SockMod0:connect(Host, Port, SockOpts),
  480. %% Exchange handshake communication.
  481. Result = mysql_protocol:handshake(User, Password, Database, SockMod0, SSLOpts,
  482. Socket0, SetFoundRows),
  483. case Result of
  484. {ok, Handshake, SockMod, Socket} ->
  485. SockMod:setopts(Socket, [{active, once}]),
  486. #handshake{server_version = Version, connection_id = ConnId,
  487. status = Status} = Handshake,
  488. State = #state{server_version = Version, connection_id = ConnId,
  489. sockmod = SockMod,
  490. socket = Socket,
  491. ssl_opts = SSLOpts,
  492. host = Host, port = Port, user = User,
  493. password = Password, status = Status,
  494. log_warnings = LogWarn,
  495. ping_timeout = PingTimeout,
  496. query_timeout = Timeout,
  497. query_cache_time = QueryCacheTime,
  498. cap_found_rows = (SetFoundRows =:= true)},
  499. %% Trap exit so that we can properly disconnect when we die.
  500. process_flag(trap_exit, true),
  501. State1 = schedule_ping(State),
  502. {ok, State1};
  503. #error{} = E ->
  504. {stop, error_to_reason(E)}
  505. end.
  506. %% @private
  507. %% @doc
  508. %%
  509. %% Query and execute calls:
  510. %%
  511. %% <ul>
  512. %% <li>{query, Query}</li>
  513. %% <li>{query, Query, Timeout}</li>
  514. %% <li>{param_query, Query, Params}</li>
  515. %% <li>{param_query, Query, Params, Timeout}</li>
  516. %% <li>{execute, Stmt, Args}</li>
  517. %% <li>{execute, Stmt, Args, Timeout}</li>
  518. %% </ul>
  519. %%
  520. %% For the calls listed above, we return these values:
  521. %%
  522. %% <dl>
  523. %% <dt>`ok'</dt>
  524. %% <dd>Success without returning any table data (UPDATE, etc.)</dd>
  525. %% <dt>`{ok, ColumnNames, Rows}'</dt>
  526. %% <dd>Queries returning one result set of table data</dd>
  527. %% <dt>`{ok, [{ColumnNames, Rows}, ...]}'</dt>
  528. %% <dd>Queries returning more than one result set of table data</dd>
  529. %% <dt>`{error, ServerReason}'</dt>
  530. %% <dd>MySQL server error</dd>
  531. %% <dt>`{implicit_commit, NestingLevel, Query}'</dt>
  532. %% <dd>A DDL statement (e.g. CREATE TABLE, ALTER TABLE, etc.) results in
  533. %% an implicit commit.
  534. %%
  535. %% If the caller is in a (nested) transaction, it must be aborted. To be
  536. %% able to handle this in the caller's process, we also return the
  537. %% nesting level.</dd>
  538. %% <dt>`{implicit_rollback, NestingLevel, ServerReason}'</dt>
  539. %% <dd>This errors results in an implicit rollback: `{1213, <<"40001">>,
  540. %% <<"Deadlock found when trying to get lock; try restarting "
  541. %% "transaction">>}'.
  542. %%
  543. %% If the caller is in a (nested) transaction, it must be aborted. To be
  544. %% able to handle this in the caller's process, we also return the
  545. %% nesting level.</dd>
  546. %% </dl>
  547. handle_call({query, Query}, From, State) ->
  548. handle_call({query, Query, State#state.query_timeout}, From, State);
  549. handle_call({query, Query, Timeout}, _From, State) ->
  550. SockMod = State#state.sockmod,
  551. Socket = State#state.socket,
  552. SockMod:setopts(Socket, [{active, false}]),
  553. {ok, Recs} = case mysql_protocol:query(Query, SockMod, Socket, Timeout) of
  554. {error, timeout} when State#state.server_version >= [5, 0, 0] ->
  555. kill_query(State),
  556. mysql_protocol:fetch_query_response(SockMod, Socket, ?cmd_timeout);
  557. {error, timeout} ->
  558. %% For MySQL 4.x.x there is no way to recover from timeout except
  559. %% killing the connection itself.
  560. exit(timeout);
  561. QueryResult ->
  562. QueryResult
  563. end,
  564. SockMod:setopts(Socket, [{active, once}]),
  565. State1 = lists:foldl(fun update_state/2, State, Recs),
  566. State1#state.warning_count > 0 andalso State1#state.log_warnings
  567. andalso log_warnings(State1, Query),
  568. handle_query_call_reply(Recs, Query, State1, []);
  569. handle_call({param_query, Query, Params}, From, State) ->
  570. handle_call({param_query, Query, Params, State#state.query_timeout}, From,
  571. State);
  572. handle_call({param_query, Query, Params, Timeout}, _From, State) ->
  573. %% Parametrized query: Prepared statement cached with the query as the key
  574. QueryBin = iolist_to_binary(Query),
  575. #state{socket = Socket, sockmod = SockMod} = State,
  576. Cache = State#state.query_cache,
  577. {StmtResult, Cache1} = case mysql_cache:lookup(QueryBin, Cache) of
  578. {found, FoundStmt, NewCache} ->
  579. %% Found
  580. {{ok, FoundStmt}, NewCache};
  581. not_found ->
  582. %% Prepare
  583. SockMod:setopts(Socket, [{active, false}]),
  584. SockMod = State#state.sockmod,
  585. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  586. SockMod:setopts(Socket, [{active, once}]),
  587. case Rec of
  588. #error{} = E ->
  589. {{error, error_to_reason(E)}, Cache};
  590. #prepared{} = Stmt ->
  591. %% If the first entry in the cache, start the timer.
  592. Cache == empty andalso begin
  593. When = State#state.query_cache_time * 2,
  594. erlang:send_after(When, self(), query_cache)
  595. end,
  596. {{ok, Stmt}, mysql_cache:store(QueryBin, Stmt, Cache)}
  597. end
  598. end,
  599. case StmtResult of
  600. {ok, StmtRec} ->
  601. State1 = State#state{query_cache = Cache1},
  602. execute_stmt(StmtRec, Params, Timeout, State1);
  603. PrepareError ->
  604. {reply, PrepareError, State}
  605. end;
  606. handle_call({execute, Stmt, Args}, From, State) ->
  607. handle_call({execute, Stmt, Args, State#state.query_timeout}, From, State);
  608. handle_call({execute, Stmt, Args, Timeout}, _From, State) ->
  609. case dict:find(Stmt, State#state.stmts) of
  610. {ok, StmtRec} ->
  611. execute_stmt(StmtRec, Args, Timeout, State);
  612. error ->
  613. {reply, {error, not_prepared}, State}
  614. end;
  615. handle_call({prepare, Query}, _From, State) ->
  616. #state{socket = Socket, sockmod = SockMod} = State,
  617. SockMod:setopts(Socket, [{active, false}]),
  618. SockMod = State#state.sockmod,
  619. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  620. SockMod:setopts(Socket, [{active, once}]),
  621. State1 = update_state(Rec, State),
  622. case Rec of
  623. #error{} = E ->
  624. {reply, {error, error_to_reason(E)}, State1};
  625. #prepared{statement_id = Id} = Stmt ->
  626. Stmts1 = dict:store(Id, Stmt, State1#state.stmts),
  627. State2 = State#state{stmts = Stmts1},
  628. {reply, {ok, Id}, State2}
  629. end;
  630. handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
  631. #state{socket = Socket, sockmod = SockMod} = State,
  632. %% First unprepare if there is an old statement with this name.
  633. SockMod:setopts(Socket, [{active, false}]),
  634. SockMod = State#state.sockmod,
  635. State1 = case dict:find(Name, State#state.stmts) of
  636. {ok, OldStmt} ->
  637. mysql_protocol:unprepare(OldStmt, SockMod, Socket),
  638. State#state{stmts = dict:erase(Name, State#state.stmts)};
  639. error ->
  640. State
  641. end,
  642. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  643. SockMod:setopts(Socket, [{active, once}]),
  644. State2 = update_state(Rec, State1),
  645. case Rec of
  646. #error{} = E ->
  647. {reply, {error, error_to_reason(E)}, State2};
  648. #prepared{} = Stmt ->
  649. Stmts1 = dict:store(Name, Stmt, State2#state.stmts),
  650. State3 = State2#state{stmts = Stmts1},
  651. {reply, {ok, Name}, State3}
  652. end;
  653. handle_call({unprepare, Stmt}, _From, State) when is_atom(Stmt);
  654. is_integer(Stmt) ->
  655. case dict:find(Stmt, State#state.stmts) of
  656. {ok, StmtRec} ->
  657. #state{socket = Socket, sockmod = SockMod} = State,
  658. SockMod:setopts(Socket, [{active, false}]),
  659. SockMod = State#state.sockmod,
  660. mysql_protocol:unprepare(StmtRec, SockMod, Socket),
  661. SockMod:setopts(Socket, [{active, once}]),
  662. State1 = State#state{stmts = dict:erase(Stmt, State#state.stmts)},
  663. State2 = schedule_ping(State1),
  664. {reply, ok, State2};
  665. error ->
  666. {reply, {error, not_prepared}, State}
  667. end;
  668. handle_call(warning_count, _From, State) ->
  669. {reply, State#state.warning_count, State};
  670. handle_call(insert_id, _From, State) ->
  671. {reply, State#state.insert_id, State};
  672. handle_call(affected_rows, _From, State) ->
  673. {reply, State#state.affected_rows, State};
  674. handle_call(autocommit, _From, State) ->
  675. {reply, State#state.status band ?SERVER_STATUS_AUTOCOMMIT /= 0, State};
  676. handle_call(backslash_escapes_enabled, _From, State = #state{status = S}) ->
  677. {reply, S band ?SERVER_STATUS_NO_BACKSLASH_ESCAPES == 0, State};
  678. handle_call(in_transaction, _From, State) ->
  679. {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State};
  680. handle_call(start_transaction, {FromPid, _},
  681. State = #state{socket = Socket, sockmod = SockMod,
  682. transaction_level = L, status = Status, monitors = Monitors})
  683. when Status band ?SERVER_STATUS_IN_TRANS == 0, L == 0;
  684. Status band ?SERVER_STATUS_IN_TRANS /= 0, L > 0 ->
  685. MRef = erlang:monitor(process, FromPid),
  686. Query = case L of
  687. 0 -> <<"BEGIN">>;
  688. _ -> <<"SAVEPOINT s", (integer_to_binary(L))/binary>>
  689. end,
  690. SockMod:setopts(Socket, [{active, false}]),
  691. SockMod = State#state.sockmod,
  692. {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
  693. ?cmd_timeout),
  694. SockMod:setopts(Socket, [{active, once}]),
  695. State1 = update_state(Res, State),
  696. {reply, ok, State1#state{transaction_level = L + 1, monitors = [{FromPid, MRef} | Monitors]}};
  697. handle_call(rollback, {FromPid, _}, State = #state{socket = Socket, sockmod = SockMod,
  698. status = Status, transaction_level = L,
  699. monitors = [{FromPid, MRef}|NewMonitors]})
  700. when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
  701. erlang:demonitor(MRef),
  702. Query = case L of
  703. 1 -> <<"ROLLBACK">>;
  704. _ -> <<"ROLLBACK TO s", (integer_to_binary(L - 1))/binary>>
  705. end,
  706. SockMod:setopts(Socket, [{active, false}]),
  707. SockMod = State#state.sockmod,
  708. {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
  709. ?cmd_timeout),
  710. SockMod:setopts(Socket, [{active, once}]),
  711. State1 = update_state(Res, State),
  712. {reply, ok, State1#state{transaction_level = L - 1, monitors = NewMonitors}};
  713. handle_call(commit, {FromPid, _}, State = #state{socket = Socket, sockmod = SockMod,
  714. status = Status, transaction_level = L,
  715. monitors = [{FromPid, MRef}|NewMonitors]})
  716. when Status band ?SERVER_STATUS_IN_TRANS /= 0, L >= 1 ->
  717. erlang:demonitor(MRef),
  718. Query = case L of
  719. 1 -> <<"COMMIT">>;
  720. _ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(L - 1))/binary>>
  721. end,
  722. SockMod:setopts(Socket, [{active, false}]),
  723. SockMod = State#state.sockmod,
  724. {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
  725. ?cmd_timeout),
  726. SockMod:setopts(Socket, [{active, once}]),
  727. State1 = update_state(Res, State),
  728. {reply, ok, State1#state{transaction_level = L - 1, monitors = NewMonitors}}.
  729. %% @private
  730. handle_cast(_Msg, State) ->
  731. {noreply, State}.
  732. %% @private
  733. handle_info(query_cache, #state{query_cache = Cache,
  734. query_cache_time = CacheTime} = State) ->
  735. %% Evict expired queries/statements in the cache used by query/3.
  736. {Evicted, Cache1} = mysql_cache:evict_older_than(Cache, CacheTime),
  737. %% Unprepare the evicted statements
  738. #state{socket = Socket, sockmod = SockMod} = State,
  739. SockMod:setopts(Socket, [{active, false}]),
  740. SockMod = State#state.sockmod,
  741. lists:foreach(fun ({_Query, Stmt}) ->
  742. mysql_protocol:unprepare(Stmt, SockMod, Socket)
  743. end,
  744. Evicted),
  745. SockMod:setopts(Socket, [{active, once}]),
  746. %% If nonempty, schedule eviction again.
  747. mysql_cache:size(Cache1) > 0 andalso
  748. erlang:send_after(CacheTime, self(), query_cache),
  749. {noreply, State#state{query_cache = Cache1}};
  750. handle_info({'DOWN', _MRef, _, Pid, _Info}, State) ->
  751. stop_server({application_process_died, Pid}, State);
  752. handle_info(ping, #state{socket = Socket, sockmod = SockMod} = State) ->
  753. SockMod:setopts(Socket, [{active, false}]),
  754. SockMod = State#state.sockmod,
  755. Ok = mysql_protocol:ping(SockMod, Socket),
  756. SockMod:setopts(Socket, [{active, once}]),
  757. {noreply, update_state(Ok, State)};
  758. handle_info({tcp_closed, _Socket}, State) ->
  759. stop_server(tcp_closed, State);
  760. handle_info({tcp_error, _Socket, Reason}, State) ->
  761. stop_server({tcp_error, Reason}, State);
  762. handle_info(_Info, State) ->
  763. {noreply, State}.
  764. %% @private
  765. terminate(Reason, #state{socket = Socket, sockmod = SockMod})
  766. when Reason == normal; Reason == shutdown ->
  767. %% Send the goodbye message for politeness.
  768. SockMod:setopts(Socket, [{active, false}]),
  769. mysql_protocol:quit(SockMod, Socket);
  770. terminate(_Reason, _State) ->
  771. ok.
  772. %% @private
  773. code_change(_OldVsn, State = #state{}, _Extra) ->
  774. {ok, State};
  775. code_change(_OldVsn, _State, _Extra) ->
  776. {error, incompatible_state}.
  777. %% --- Helpers ---
  778. %% @doc Makes a gen_server call for a query (plain, parametrized or prepared),
  779. %% checks the reply and sometimes throws an exception when we need to jump out
  780. %% of a transaction.
  781. query_call(Conn, CallReq) ->
  782. case gen_server:call(Conn, CallReq, infinity) of
  783. {implicit_commit, _NestingLevel, Query} ->
  784. error({implicit_commit, Query});
  785. {implicit_rollback, _NestingLevel, _ServerReason} = ImplicitRollback ->
  786. throw(ImplicitRollback);
  787. Result ->
  788. Result
  789. end.
  790. %% @doc Executes a prepared statement and returns {Reply, NextState}.
  791. execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket, sockmod = SockMod}) ->
  792. SockMod:setopts(Socket, [{active, false}]),
  793. SockMod = State#state.sockmod,
  794. {ok, Recs} = case mysql_protocol:execute(Stmt, Args, SockMod, Socket,
  795. Timeout) of
  796. {error, timeout} when State#state.server_version >= [5, 0, 0] ->
  797. kill_query(State),
  798. mysql_protocol:fetch_execute_response(SockMod, Socket,
  799. ?cmd_timeout);
  800. {error, timeout} ->
  801. %% For MySQL 4.x.x there is no way to recover from timeout except
  802. %% killing the connection itself.
  803. exit(timeout);
  804. QueryResult ->
  805. QueryResult
  806. end,
  807. SockMod:setopts(Socket, [{active, once}]),
  808. State1 = lists:foldl(fun update_state/2, State, Recs),
  809. State1#state.warning_count > 0 andalso State1#state.log_warnings
  810. andalso log_warnings(State1, Stmt#prepared.orig_query),
  811. handle_query_call_reply(Recs, Stmt#prepared.orig_query, State1, []).
  812. %% @doc Produces a tuple to return as an error reason.
  813. -spec error_to_reason(#error{}) -> server_reason().
  814. error_to_reason(#error{code = Code, state = State, msg = Msg}) ->
  815. {Code, State, Msg}.
  816. %% @doc Updates a state with information from a response. Also re-schedules
  817. %% ping.
  818. -spec update_state(#ok{} | #eof{} | any(), #state{}) -> #state{}.
  819. update_state(Rec, State) ->
  820. State1 = case Rec of
  821. #ok{status = S, affected_rows = R, insert_id = Id, warning_count = W} ->
  822. State#state{status = S, affected_rows = R, insert_id = Id,
  823. warning_count = W};
  824. #resultset{status = S, warning_count = W} ->
  825. State#state{status = S, warning_count = W};
  826. #prepared{warning_count = W} ->
  827. State#state{warning_count = W};
  828. _Other ->
  829. %% This includes errors.
  830. %% Reset some things. (Note: We don't reset status and insert_id.)
  831. State#state{warning_count = 0, affected_rows = 0}
  832. end,
  833. schedule_ping(State1).
  834. %% @doc Produces a reply for handle_call/3 for queries and prepared statements.
  835. handle_query_call_reply([], _Query, State, ResultSetsAcc) ->
  836. Reply = case ResultSetsAcc of
  837. [] -> ok;
  838. [{ColumnNames, Rows}] -> {ok, ColumnNames, Rows};
  839. [_|_] -> {ok, lists:reverse(ResultSetsAcc)}
  840. end,
  841. {reply, Reply, State};
  842. handle_query_call_reply([Rec|Recs], Query, #state{monitors = Monitors} = State, ResultSetsAcc) ->
  843. case Rec of
  844. #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
  845. State#state.transaction_level > 0 ->
  846. %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
  847. %% an implicit commit.
  848. Reply = {implicit_commit, State#state.transaction_level, Query},
  849. NewMonitors = demonitor_processes(Monitors, length(Monitors)),
  850. {reply, Reply, State#state{transaction_level = 0, monitors = NewMonitors}};
  851. #ok{} ->
  852. handle_query_call_reply(Recs, Query, State, ResultSetsAcc);
  853. #resultset{cols = ColDefs, rows = Rows} ->
  854. Names = [Def#col.name || Def <- ColDefs],
  855. ResultSetsAcc1 = [{Names, Rows} | ResultSetsAcc],
  856. handle_query_call_reply(Recs, Query, State, ResultSetsAcc1);
  857. #error{code = ?ERROR_DEADLOCK} when State#state.transaction_level > 0 ->
  858. %% These errors result in an implicit rollback.
  859. Reply = {implicit_rollback, State#state.transaction_level,
  860. error_to_reason(Rec)},
  861. %% Everything in the transaction is rolled back, except the BEGIN
  862. %% statement itself. Thus, we are in transaction level 1.
  863. NewMonitors = demonitor_processes(Monitors, length(Monitors) -1),
  864. {reply, Reply, State#state{transaction_level = 1, monitors = NewMonitors}};
  865. #error{} ->
  866. {reply, {error, error_to_reason(Rec)}, State}
  867. end.
  868. %% @doc Schedules (or re-schedules) ping.
  869. schedule_ping(State = #state{ping_timeout = infinity}) ->
  870. State;
  871. schedule_ping(State = #state{ping_timeout = Timeout, ping_ref = Ref}) ->
  872. is_reference(Ref) andalso erlang:cancel_timer(Ref),
  873. State#state{ping_ref = erlang:send_after(Timeout, self(), ping)}.
  874. %% @doc Fetches and logs warnings. Query is the query that gave the warnings.
  875. log_warnings(#state{socket = Socket, sockmod = SockMod} = State, Query) ->
  876. SockMod:setopts(Socket, [{active, false}]),
  877. SockMod = State#state.sockmod,
  878. {ok, [#resultset{rows = Rows}]} = mysql_protocol:query(<<"SHOW WARNINGS">>,
  879. SockMod, Socket,
  880. ?cmd_timeout),
  881. SockMod:setopts(Socket, [{active, once}]),
  882. Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"]
  883. || [Level, Code, Message] <- Rows],
  884. error_logger:warning_msg("~s in ~s~n", [Lines, Query]).
  885. %% @doc Makes a separate connection and execute KILL QUERY. We do this to get
  886. %% our main connection back to normal. KILL QUERY appeared in MySQL 5.0.0.
  887. kill_query(#state{connection_id = ConnId, host = Host, port = Port,
  888. user = User, password = Password, ssl_opts = SSLOpts,
  889. cap_found_rows = SetFoundRows}) ->
  890. %% Connect socket
  891. SockOpts = [{active, false}, binary, {packet, raw}],
  892. {ok, Socket0} = mysql_sock_tcp:connect(Host, Port, SockOpts),
  893. %% Exchange handshake communication.
  894. Result = mysql_protocol:handshake(User, Password, undefined, mysql_sock_tcp,
  895. SSLOpts, Socket0, SetFoundRows),
  896. case Result of
  897. {ok, #handshake{}, SockMod, Socket} ->
  898. %% Kill and disconnect
  899. IdBin = integer_to_binary(ConnId),
  900. {ok, [#ok{}]} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>,
  901. SockMod, Socket, ?cmd_timeout),
  902. mysql_protocol:quit(SockMod, Socket);
  903. #error{} = E ->
  904. error_logger:error_msg("Failed to connect to kill query: ~p",
  905. [error_to_reason(E)])
  906. end.
  907. stop_server(Reason,
  908. #state{socket = Socket, connection_id = ConnId} = State) ->
  909. error_logger:error_msg("Connection Id ~p closing with reason: ~p~n",
  910. [ConnId, Reason]),
  911. ok = gen_tcp:close(Socket),
  912. {stop, Reason, State#state{socket = undefined, connection_id = undefined}}.
  913. demonitor_processes(List, 0) ->
  914. List;
  915. demonitor_processes([{_FromPid, MRef}|T], Count) ->
  916. erlang:demonitor(MRef),
  917. demonitor_processes(T, Count -1).