mysql_conn.erl 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. %% MySQL/OTP – MySQL client library for Erlang/OTP
  2. %% Copyright (C) 2014-2018 Viktor Söderqvist
  3. %%
  4. %% This file is part of MySQL/OTP.
  5. %%
  6. %% MySQL/OTP is free software: you can redistribute it and/or modify it under
  7. %% the terms of the GNU Lesser General Public License as published by the Free
  8. %% Software Foundation, either version 3 of the License, or (at your option)
  9. %% any later version.
  10. %%
  11. %% This program is distributed in the hope that it will be useful, but WITHOUT
  12. %% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13. %% FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  14. %% more details.
  15. %%
  16. %% You should have received a copy of the GNU Lesser General Public License
  17. %% along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. %% @doc This module implements parts of the MySQL client/server protocol.
  19. %%
  20. %% The protocol is described in the document "MySQL Internals" which can be
  21. %% found under "MySQL Documentation: Expert Guides" on http://dev.mysql.com/.
  22. %%
  23. %% TCP communication is not handled in this module. Most of the public functions
  24. %% take funs for data communitaction as parameters.
  25. %% @private
  26. -module(mysql_conn).
  27. -behaviour(gen_server).
  28. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
  29. code_change/3]).
  30. -define(default_host, "localhost").
  31. -define(default_port, 3306).
  32. -define(default_user, <<>>).
  33. -define(default_password, <<>>).
  34. -define(default_query_timeout, infinity).
  35. -define(default_query_cache_time, 60000). %% for query/3.
  36. -define(default_ping_timeout, 60000).
  37. -define(cmd_timeout, 3000). %% Timeout used for various commands to the server
  38. %% Errors that cause "implicit rollback"
  39. -define(ERROR_DEADLOCK, 1213).
  40. %% --- Gen_server callbacks ---
  41. -include("records.hrl").
  42. -include("server_status.hrl").
  43. %% Gen_server state
  44. -record(state, {server_version, connection_id, socket, sockmod, ssl_opts,
  45. host, port, user, password, log_warnings,
  46. ping_timeout,
  47. query_timeout, query_cache_time,
  48. affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
  49. transaction_levels = [], ping_ref = undefined,
  50. stmts = dict:new(), query_cache = empty, cap_found_rows = false}).
  51. %% @private
  52. init(Opts) ->
  53. %% Connect
  54. Host = proplists:get_value(host, Opts, ?default_host),
  55. Port = proplists:get_value(port, Opts, ?default_port),
  56. User = proplists:get_value(user, Opts, ?default_user),
  57. Password = proplists:get_value(password, Opts, ?default_password),
  58. Database = proplists:get_value(database, Opts, undefined),
  59. LogWarn = proplists:get_value(log_warnings, Opts, true),
  60. KeepAlive = proplists:get_value(keep_alive, Opts, false),
  61. Timeout = proplists:get_value(query_timeout, Opts,
  62. ?default_query_timeout),
  63. QueryCacheTime = proplists:get_value(query_cache_time, Opts,
  64. ?default_query_cache_time),
  65. TcpOpts = proplists:get_value(tcp_options, Opts, []),
  66. SetFoundRows = proplists:get_value(found_rows, Opts, false),
  67. SSLOpts = proplists:get_value(ssl, Opts, undefined),
  68. SockMod0 = mysql_sock_tcp,
  69. PingTimeout = case KeepAlive of
  70. true -> ?default_ping_timeout;
  71. false -> infinity;
  72. N when N > 0 -> N
  73. end,
  74. %% Connect socket
  75. SockOpts = [binary, {packet, raw}, {active, false} | TcpOpts],
  76. {ok, Socket0} = SockMod0:connect(Host, Port, SockOpts),
  77. %% Exchange handshake communication.
  78. Result = mysql_protocol:handshake(User, Password, Database, SockMod0, SSLOpts,
  79. Socket0, SetFoundRows),
  80. case Result of
  81. {ok, Handshake, SockMod, Socket} ->
  82. SockMod:setopts(Socket, [{active, once}]),
  83. #handshake{server_version = Version, connection_id = ConnId,
  84. status = Status} = Handshake,
  85. State = #state{server_version = Version, connection_id = ConnId,
  86. sockmod = SockMod,
  87. socket = Socket,
  88. ssl_opts = SSLOpts,
  89. host = Host, port = Port, user = User,
  90. password = Password, status = Status,
  91. log_warnings = LogWarn,
  92. ping_timeout = PingTimeout,
  93. query_timeout = Timeout,
  94. query_cache_time = QueryCacheTime,
  95. cap_found_rows = (SetFoundRows =:= true)},
  96. %% Trap exit so that we can properly disconnect when we die.
  97. process_flag(trap_exit, true),
  98. State1 = schedule_ping(State),
  99. {ok, State1};
  100. #error{} = E ->
  101. {stop, error_to_reason(E)}
  102. end.
  103. %% @private
  104. %% @doc
  105. %%
  106. %% Query and execute calls:
  107. %%
  108. %% <ul>
  109. %% <li>{query, Query}</li>
  110. %% <li>{query, Query, Timeout}</li>
  111. %% <li>{param_query, Query, Params}</li>
  112. %% <li>{param_query, Query, Params, Timeout}</li>
  113. %% <li>{execute, Stmt, Args}</li>
  114. %% <li>{execute, Stmt, Args, Timeout}</li>
  115. %% </ul>
  116. %%
  117. %% For the calls listed above, we return these values:
  118. %%
  119. %% <dl>
  120. %% <dt>`ok'</dt>
  121. %% <dd>Success without returning any table data (UPDATE, etc.)</dd>
  122. %% <dt>`{ok, ColumnNames, Rows}'</dt>
  123. %% <dd>Queries returning one result set of table data</dd>
  124. %% <dt>`{ok, [{ColumnNames, Rows}, ...]}'</dt>
  125. %% <dd>Queries returning more than one result set of table data</dd>
  126. %% <dt>`{error, ServerReason}'</dt>
  127. %% <dd>MySQL server error</dd>
  128. %% <dt>`{implicit_commit, NestingLevel, Query}'</dt>
  129. %% <dd>A DDL statement (e.g. CREATE TABLE, ALTER TABLE, etc.) results in
  130. %% an implicit commit.
  131. %%
  132. %% If the caller is in a (nested) transaction, it must be aborted. To be
  133. %% able to handle this in the caller's process, we also return the
  134. %% nesting level.</dd>
  135. %% <dt>`{implicit_rollback, NestingLevel, ServerReason}'</dt>
  136. %% <dd>This errors results in an implicit rollback: `{1213, <<"40001">>,
  137. %% <<"Deadlock found when trying to get lock; try restarting "
  138. %% "transaction">>}'.
  139. %%
  140. %% If the caller is in a (nested) transaction, it must be aborted. To be
  141. %% able to handle this in the caller's process, we also return the
  142. %% nesting level.</dd>
  143. %% </dl>
  144. handle_call({query, Query}, From, State) ->
  145. handle_call({query, Query, State#state.query_timeout}, From, State);
  146. handle_call({query, Query, Timeout}, _From, State) ->
  147. SockMod = State#state.sockmod,
  148. Socket = State#state.socket,
  149. SockMod:setopts(Socket, [{active, false}]),
  150. {ok, Recs} = case mysql_protocol:query(Query, SockMod, Socket, Timeout) of
  151. {error, timeout} when State#state.server_version >= [5, 0, 0] ->
  152. kill_query(State),
  153. mysql_protocol:fetch_query_response(SockMod, Socket, ?cmd_timeout);
  154. {error, timeout} ->
  155. %% For MySQL 4.x.x there is no way to recover from timeout except
  156. %% killing the connection itself.
  157. exit(timeout);
  158. QueryResult ->
  159. QueryResult
  160. end,
  161. SockMod:setopts(Socket, [{active, once}]),
  162. State1 = lists:foldl(fun update_state/2, State, Recs),
  163. State1#state.warning_count > 0 andalso State1#state.log_warnings
  164. andalso log_warnings(State1, Query),
  165. handle_query_call_reply(Recs, Query, State1, []);
  166. handle_call({param_query, Query, Params}, From, State) ->
  167. handle_call({param_query, Query, Params, State#state.query_timeout}, From,
  168. State);
  169. handle_call({param_query, Query, Params, Timeout}, _From, State) ->
  170. %% Parametrized query: Prepared statement cached with the query as the key
  171. QueryBin = iolist_to_binary(Query),
  172. #state{socket = Socket, sockmod = SockMod} = State,
  173. Cache = State#state.query_cache,
  174. {StmtResult, Cache1} = case mysql_cache:lookup(QueryBin, Cache) of
  175. {found, FoundStmt, NewCache} ->
  176. %% Found
  177. {{ok, FoundStmt}, NewCache};
  178. not_found ->
  179. %% Prepare
  180. SockMod:setopts(Socket, [{active, false}]),
  181. SockMod = State#state.sockmod,
  182. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  183. SockMod:setopts(Socket, [{active, once}]),
  184. case Rec of
  185. #error{} = E ->
  186. {{error, error_to_reason(E)}, Cache};
  187. #prepared{} = Stmt ->
  188. %% If the first entry in the cache, start the timer.
  189. Cache == empty andalso begin
  190. When = State#state.query_cache_time * 2,
  191. erlang:send_after(When, self(), query_cache)
  192. end,
  193. {{ok, Stmt}, mysql_cache:store(QueryBin, Stmt, Cache)}
  194. end
  195. end,
  196. case StmtResult of
  197. {ok, StmtRec} ->
  198. State1 = State#state{query_cache = Cache1},
  199. execute_stmt(StmtRec, Params, Timeout, State1);
  200. PrepareError ->
  201. {reply, PrepareError, State}
  202. end;
  203. handle_call({execute, Stmt, Args}, From, State) ->
  204. handle_call({execute, Stmt, Args, State#state.query_timeout}, From, State);
  205. handle_call({execute, Stmt, Args, Timeout}, _From, State) ->
  206. case dict:find(Stmt, State#state.stmts) of
  207. {ok, StmtRec} ->
  208. execute_stmt(StmtRec, Args, Timeout, State);
  209. error ->
  210. {reply, {error, not_prepared}, State}
  211. end;
  212. handle_call({prepare, Query}, _From, State) ->
  213. #state{socket = Socket, sockmod = SockMod} = State,
  214. SockMod:setopts(Socket, [{active, false}]),
  215. SockMod = State#state.sockmod,
  216. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  217. SockMod:setopts(Socket, [{active, once}]),
  218. State1 = update_state(Rec, State),
  219. case Rec of
  220. #error{} = E ->
  221. {reply, {error, error_to_reason(E)}, State1};
  222. #prepared{statement_id = Id} = Stmt ->
  223. Stmts1 = dict:store(Id, Stmt, State1#state.stmts),
  224. State2 = State#state{stmts = Stmts1},
  225. {reply, {ok, Id}, State2}
  226. end;
  227. handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
  228. #state{socket = Socket, sockmod = SockMod} = State,
  229. %% First unprepare if there is an old statement with this name.
  230. SockMod:setopts(Socket, [{active, false}]),
  231. SockMod = State#state.sockmod,
  232. State1 = case dict:find(Name, State#state.stmts) of
  233. {ok, OldStmt} ->
  234. mysql_protocol:unprepare(OldStmt, SockMod, Socket),
  235. State#state{stmts = dict:erase(Name, State#state.stmts)};
  236. error ->
  237. State
  238. end,
  239. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  240. SockMod:setopts(Socket, [{active, once}]),
  241. State2 = update_state(Rec, State1),
  242. case Rec of
  243. #error{} = E ->
  244. {reply, {error, error_to_reason(E)}, State2};
  245. #prepared{} = Stmt ->
  246. Stmts1 = dict:store(Name, Stmt, State2#state.stmts),
  247. State3 = State2#state{stmts = Stmts1},
  248. {reply, {ok, Name}, State3}
  249. end;
  250. handle_call({unprepare, Stmt}, _From, State) when is_atom(Stmt);
  251. is_integer(Stmt) ->
  252. case dict:find(Stmt, State#state.stmts) of
  253. {ok, StmtRec} ->
  254. #state{socket = Socket, sockmod = SockMod} = State,
  255. SockMod:setopts(Socket, [{active, false}]),
  256. SockMod = State#state.sockmod,
  257. mysql_protocol:unprepare(StmtRec, SockMod, Socket),
  258. SockMod:setopts(Socket, [{active, once}]),
  259. State1 = State#state{stmts = dict:erase(Stmt, State#state.stmts)},
  260. State2 = schedule_ping(State1),
  261. {reply, ok, State2};
  262. error ->
  263. {reply, {error, not_prepared}, State}
  264. end;
  265. handle_call(warning_count, _From, State) ->
  266. {reply, State#state.warning_count, State};
  267. handle_call(insert_id, _From, State) ->
  268. {reply, State#state.insert_id, State};
  269. handle_call(affected_rows, _From, State) ->
  270. {reply, State#state.affected_rows, State};
  271. handle_call(autocommit, _From, State) ->
  272. {reply, State#state.status band ?SERVER_STATUS_AUTOCOMMIT /= 0, State};
  273. handle_call(backslash_escapes_enabled, _From, State = #state{status = S}) ->
  274. {reply, S band ?SERVER_STATUS_NO_BACKSLASH_ESCAPES == 0, State};
  275. handle_call(in_transaction, _From, State) ->
  276. {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State};
  277. handle_call(start_transaction, {FromPid, _},
  278. State = #state{socket = Socket, sockmod = SockMod,
  279. transaction_levels = L, status = Status})
  280. when Status band ?SERVER_STATUS_IN_TRANS == 0, L == [];
  281. Status band ?SERVER_STATUS_IN_TRANS /= 0, L /= [] ->
  282. MRef = erlang:monitor(process, FromPid),
  283. Query = case L of
  284. [] -> <<"BEGIN">>;
  285. _ -> <<"SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
  286. end,
  287. SockMod:setopts(Socket, [{active, false}]),
  288. SockMod = State#state.sockmod,
  289. {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
  290. ?cmd_timeout),
  291. SockMod:setopts(Socket, [{active, once}]),
  292. State1 = update_state(Res, State),
  293. {reply, ok, State1#state{transaction_levels = [{FromPid, MRef} | L]}};
  294. handle_call(rollback, {FromPid, _},
  295. State = #state{socket = Socket, sockmod = SockMod, status = Status,
  296. transaction_levels = [{FromPid, MRef} | L]})
  297. when Status band ?SERVER_STATUS_IN_TRANS /= 0 ->
  298. erlang:demonitor(MRef),
  299. Query = case L of
  300. [] -> <<"ROLLBACK">>;
  301. _ -> <<"ROLLBACK TO s", (integer_to_binary(length(L)))/binary>>
  302. end,
  303. SockMod:setopts(Socket, [{active, false}]),
  304. SockMod = State#state.sockmod,
  305. {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
  306. ?cmd_timeout),
  307. SockMod:setopts(Socket, [{active, once}]),
  308. State1 = update_state(Res, State),
  309. {reply, ok, State1#state{transaction_levels = L}};
  310. handle_call(commit, {FromPid, _},
  311. State = #state{socket = Socket, sockmod = SockMod, status = Status,
  312. transaction_levels = [{FromPid, MRef} | L]})
  313. when Status band ?SERVER_STATUS_IN_TRANS /= 0 ->
  314. erlang:demonitor(MRef),
  315. Query = case L of
  316. [] -> <<"COMMIT">>;
  317. _ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
  318. end,
  319. SockMod:setopts(Socket, [{active, false}]),
  320. SockMod = State#state.sockmod,
  321. {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
  322. ?cmd_timeout),
  323. SockMod:setopts(Socket, [{active, once}]),
  324. State1 = update_state(Res, State),
  325. {reply, ok, State1#state{transaction_levels = L}}.
  326. %% @private
  327. handle_cast(_Msg, State) ->
  328. {noreply, State}.
  329. %% @private
  330. handle_info(query_cache, #state{query_cache = Cache,
  331. query_cache_time = CacheTime} = State) ->
  332. %% Evict expired queries/statements in the cache used by query/3.
  333. {Evicted, Cache1} = mysql_cache:evict_older_than(Cache, CacheTime),
  334. %% Unprepare the evicted statements
  335. #state{socket = Socket, sockmod = SockMod} = State,
  336. SockMod:setopts(Socket, [{active, false}]),
  337. SockMod = State#state.sockmod,
  338. lists:foreach(fun ({_Query, Stmt}) ->
  339. mysql_protocol:unprepare(Stmt, SockMod, Socket)
  340. end,
  341. Evicted),
  342. SockMod:setopts(Socket, [{active, once}]),
  343. %% If nonempty, schedule eviction again.
  344. mysql_cache:size(Cache1) > 0 andalso
  345. erlang:send_after(CacheTime, self(), query_cache),
  346. {noreply, State#state{query_cache = Cache1}};
  347. handle_info({'DOWN', _MRef, _, Pid, _Info}, State) ->
  348. stop_server({application_process_died, Pid}, State);
  349. handle_info(ping, #state{socket = Socket, sockmod = SockMod} = State) ->
  350. SockMod:setopts(Socket, [{active, false}]),
  351. SockMod = State#state.sockmod,
  352. Ok = mysql_protocol:ping(SockMod, Socket),
  353. SockMod:setopts(Socket, [{active, once}]),
  354. {noreply, update_state(Ok, State)};
  355. handle_info({tcp_closed, _Socket}, State) ->
  356. stop_server(tcp_closed, State);
  357. handle_info({tcp_error, _Socket, Reason}, State) ->
  358. stop_server({tcp_error, Reason}, State);
  359. handle_info(_Info, State) ->
  360. {noreply, State}.
  361. %% @private
  362. terminate(Reason, #state{socket = Socket, sockmod = SockMod})
  363. when Reason == normal; Reason == shutdown ->
  364. %% Send the goodbye message for politeness.
  365. SockMod:setopts(Socket, [{active, false}]),
  366. mysql_protocol:quit(SockMod, Socket);
  367. terminate(_Reason, _State) ->
  368. ok.
  369. %% @private
  370. code_change(_OldVsn, State = #state{}, _Extra) ->
  371. {ok, State};
  372. code_change(_OldVsn, _State, _Extra) ->
  373. {error, incompatible_state}.
  374. %% --- Helpers ---
  375. %% @doc Executes a prepared statement and returns {Reply, NextState}.
  376. execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket, sockmod = SockMod}) ->
  377. SockMod:setopts(Socket, [{active, false}]),
  378. SockMod = State#state.sockmod,
  379. {ok, Recs} = case mysql_protocol:execute(Stmt, Args, SockMod, Socket,
  380. Timeout) of
  381. {error, timeout} when State#state.server_version >= [5, 0, 0] ->
  382. kill_query(State),
  383. mysql_protocol:fetch_execute_response(SockMod, Socket,
  384. ?cmd_timeout);
  385. {error, timeout} ->
  386. %% For MySQL 4.x.x there is no way to recover from timeout except
  387. %% killing the connection itself.
  388. exit(timeout);
  389. QueryResult ->
  390. QueryResult
  391. end,
  392. SockMod:setopts(Socket, [{active, once}]),
  393. State1 = lists:foldl(fun update_state/2, State, Recs),
  394. State1#state.warning_count > 0 andalso State1#state.log_warnings
  395. andalso log_warnings(State1, Stmt#prepared.orig_query),
  396. handle_query_call_reply(Recs, Stmt#prepared.orig_query, State1, []).
  397. %% @doc Produces a tuple to return as an error reason.
  398. -spec error_to_reason(#error{}) -> mysql:server_reason().
  399. error_to_reason(#error{code = Code, state = State, msg = Msg}) ->
  400. {Code, State, Msg}.
  401. %% @doc Updates a state with information from a response. Also re-schedules
  402. %% ping.
  403. -spec update_state(#ok{} | #eof{} | any(), #state{}) -> #state{}.
  404. update_state(Rec, State) ->
  405. State1 = case Rec of
  406. #ok{status = S, affected_rows = R, insert_id = Id, warning_count = W} ->
  407. State#state{status = S, affected_rows = R, insert_id = Id,
  408. warning_count = W};
  409. #resultset{status = S, warning_count = W} ->
  410. State#state{status = S, warning_count = W};
  411. #prepared{warning_count = W} ->
  412. State#state{warning_count = W};
  413. _Other ->
  414. %% This includes errors.
  415. %% Reset some things. (Note: We don't reset status and insert_id.)
  416. State#state{warning_count = 0, affected_rows = 0}
  417. end,
  418. schedule_ping(State1).
  419. %% @doc Produces a reply for handle_call/3 for queries and prepared statements.
  420. handle_query_call_reply([], _Query, State, ResultSetsAcc) ->
  421. Reply = case ResultSetsAcc of
  422. [] -> ok;
  423. [{ColumnNames, Rows}] -> {ok, ColumnNames, Rows};
  424. [_|_] -> {ok, lists:reverse(ResultSetsAcc)}
  425. end,
  426. {reply, Reply, State};
  427. handle_query_call_reply([Rec|Recs], Query,
  428. #state{transaction_levels = L} = State,
  429. ResultSetsAcc) ->
  430. case Rec of
  431. #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
  432. L /= [] ->
  433. %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
  434. %% an implicit commit.
  435. Length = length(L),
  436. Reply = {implicit_commit, Length, Query},
  437. [] = demonitor_processes(L, Length),
  438. {reply, Reply, State#state{transaction_levels = []}};
  439. #ok{} ->
  440. handle_query_call_reply(Recs, Query, State, ResultSetsAcc);
  441. #resultset{cols = ColDefs, rows = Rows} ->
  442. Names = [Def#col.name || Def <- ColDefs],
  443. ResultSetsAcc1 = [{Names, Rows} | ResultSetsAcc],
  444. handle_query_call_reply(Recs, Query, State, ResultSetsAcc1);
  445. #error{code = ?ERROR_DEADLOCK} when L /= [] ->
  446. %% These errors result in an implicit rollback.
  447. Reply = {implicit_rollback, length(L), error_to_reason(Rec)},
  448. %% Everything in the transaction is rolled back, except the BEGIN
  449. %% statement itself. Thus, we are in transaction level 1.
  450. NewMonitors = demonitor_processes(L, length(L) - 1),
  451. {reply, Reply, State#state{transaction_levels = NewMonitors}};
  452. #error{} ->
  453. {reply, {error, error_to_reason(Rec)}, State}
  454. end.
  455. %% @doc Schedules (or re-schedules) ping.
  456. schedule_ping(State = #state{ping_timeout = infinity}) ->
  457. State;
  458. schedule_ping(State = #state{ping_timeout = Timeout, ping_ref = Ref}) ->
  459. is_reference(Ref) andalso erlang:cancel_timer(Ref),
  460. State#state{ping_ref = erlang:send_after(Timeout, self(), ping)}.
  461. %% @doc Fetches and logs warnings. Query is the query that gave the warnings.
  462. log_warnings(#state{socket = Socket, sockmod = SockMod} = State, Query) ->
  463. SockMod:setopts(Socket, [{active, false}]),
  464. SockMod = State#state.sockmod,
  465. {ok, [#resultset{rows = Rows}]} = mysql_protocol:query(<<"SHOW WARNINGS">>,
  466. SockMod, Socket,
  467. ?cmd_timeout),
  468. SockMod:setopts(Socket, [{active, once}]),
  469. Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"]
  470. || [Level, Code, Message] <- Rows],
  471. error_logger:warning_msg("~s in ~s~n", [Lines, Query]).
  472. %% @doc Makes a separate connection and execute KILL QUERY. We do this to get
  473. %% our main connection back to normal. KILL QUERY appeared in MySQL 5.0.0.
  474. kill_query(#state{connection_id = ConnId, host = Host, port = Port,
  475. user = User, password = Password, ssl_opts = SSLOpts,
  476. cap_found_rows = SetFoundRows}) ->
  477. %% Connect socket
  478. SockOpts = [{active, false}, binary, {packet, raw}],
  479. {ok, Socket0} = mysql_sock_tcp:connect(Host, Port, SockOpts),
  480. %% Exchange handshake communication.
  481. Result = mysql_protocol:handshake(User, Password, undefined, mysql_sock_tcp,
  482. SSLOpts, Socket0, SetFoundRows),
  483. case Result of
  484. {ok, #handshake{}, SockMod, Socket} ->
  485. %% Kill and disconnect
  486. IdBin = integer_to_binary(ConnId),
  487. {ok, [#ok{}]} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>,
  488. SockMod, Socket, ?cmd_timeout),
  489. mysql_protocol:quit(SockMod, Socket);
  490. #error{} = E ->
  491. error_logger:error_msg("Failed to connect to kill query: ~p",
  492. [error_to_reason(E)])
  493. end.
  494. stop_server(Reason,
  495. #state{socket = Socket, connection_id = ConnId} = State) ->
  496. error_logger:error_msg("Connection Id ~p closing with reason: ~p~n",
  497. [ConnId, Reason]),
  498. ok = gen_tcp:close(Socket),
  499. {stop, Reason, State#state{socket = undefined, connection_id = undefined}}.
  500. demonitor_processes(List, 0) ->
  501. List;
  502. demonitor_processes([{_FromPid, MRef}|T], Count) ->
  503. erlang:demonitor(MRef),
  504. demonitor_processes(T, Count - 1).