mysql_conn.erl 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. %% MySQL/OTP – MySQL client library for Erlang/OTP
  2. %% Copyright (C) 2014-2018 Viktor Söderqvist
  3. %%
  4. %% This file is part of MySQL/OTP.
  5. %%
  6. %% MySQL/OTP is free software: you can redistribute it and/or modify it under
  7. %% the terms of the GNU Lesser General Public License as published by the Free
  8. %% Software Foundation, either version 3 of the License, or (at your option)
  9. %% any later version.
  10. %%
  11. %% This program is distributed in the hope that it will be useful, but WITHOUT
  12. %% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13. %% FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  14. %% more details.
  15. %%
  16. %% You should have received a copy of the GNU Lesser General Public License
  17. %% along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. %% @doc This module implements parts of the MySQL client/server protocol.
  19. %%
  20. %% The protocol is described in the document "MySQL Internals" which can be
  21. %% found under "MySQL Documentation: Expert Guides" on http://dev.mysql.com/.
  22. %%
  23. %% TCP communication is not handled in this module. Most of the public functions
  24. %% take funs for data communitaction as parameters.
  25. %% @private
  26. -module(mysql_conn).
  27. -behaviour(gen_server).
  28. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
  29. code_change/3]).
  30. -define(default_host, "localhost").
  31. -define(default_port, 3306).
  32. -define(default_user, <<>>).
  33. -define(default_password, <<>>).
  34. -define(default_query_timeout, infinity).
  35. -define(default_query_cache_time, 60000). %% for query/3.
  36. -define(default_ping_timeout, 60000).
  37. -define(cmd_timeout, 3000). %% Timeout used for various commands to the server
  38. %% Errors that cause "implicit rollback"
  39. -define(ERROR_DEADLOCK, 1213).
  40. %% --- Gen_server callbacks ---
  41. -include("records.hrl").
  42. -include("server_status.hrl").
  43. %% Gen_server state
  44. -record(state, {server_version, connection_id, socket, sockmod, ssl_opts,
  45. host, port, user, password, auth_plugin_data, log_warnings,
  46. ping_timeout,
  47. query_timeout, query_cache_time,
  48. affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
  49. transaction_levels = [], ping_ref = undefined,
  50. stmts = dict:new(), query_cache = empty, cap_found_rows = false}).
  51. %% @private
  52. init(Opts) ->
  53. %% Connect
  54. Host = proplists:get_value(host, Opts, ?default_host),
  55. DefaultPort = case Host of
  56. {local, _LocalAddr} -> 0;
  57. _NonLocalAddr -> ?default_port
  58. end,
  59. Port = proplists:get_value(port, Opts, DefaultPort),
  60. User = proplists:get_value(user, Opts, ?default_user),
  61. Password = proplists:get_value(password, Opts, ?default_password),
  62. Database = proplists:get_value(database, Opts, undefined),
  63. LogWarn = proplists:get_value(log_warnings, Opts, true),
  64. KeepAlive = proplists:get_value(keep_alive, Opts, false),
  65. Timeout = proplists:get_value(query_timeout, Opts,
  66. ?default_query_timeout),
  67. QueryCacheTime = proplists:get_value(query_cache_time, Opts,
  68. ?default_query_cache_time),
  69. TcpOpts = proplists:get_value(tcp_options, Opts, []),
  70. SetFoundRows = proplists:get_value(found_rows, Opts, false),
  71. SSLOpts = proplists:get_value(ssl, Opts, undefined),
  72. SockMod0 = gen_tcp,
  73. PingTimeout = case KeepAlive of
  74. true -> ?default_ping_timeout;
  75. false -> infinity;
  76. N when N > 0 -> N
  77. end,
  78. %% Connect socket
  79. SockOpts = [binary, {packet, raw}, {active, false}, {nodelay, true}
  80. | TcpOpts],
  81. {ok, Socket0} = SockMod0:connect(Host, Port, SockOpts),
  82. %% If buffer wasn't specifically defined make it at least as
  83. %% large as recbuf, as suggested by the inet:setopts() docs.
  84. case proplists:is_defined(buffer, TcpOpts) of
  85. true ->
  86. ok;
  87. false ->
  88. {ok, [{buffer, Buffer}]} = inet:getopts(Socket0, [buffer]),
  89. {ok, [{recbuf, Recbuf}]} = inet:getopts(Socket0, [recbuf]),
  90. ok = inet:setopts(Socket0,[{buffer, max(Buffer, Recbuf)}])
  91. end,
  92. %% Exchange handshake communication.
  93. Result = mysql_protocol:handshake(User, Password, Database, SockMod0, SSLOpts,
  94. Socket0, SetFoundRows),
  95. case Result of
  96. {ok, Handshake, SockMod, Socket} ->
  97. setopts(SockMod, Socket, [{active, once}]),
  98. #handshake{server_version = Version, connection_id = ConnId,
  99. status = Status,
  100. auth_plugin_data = AuthPluginData} = Handshake,
  101. State = #state{server_version = Version, connection_id = ConnId,
  102. sockmod = SockMod,
  103. socket = Socket,
  104. ssl_opts = SSLOpts,
  105. host = Host, port = Port,
  106. user = User, password = Password,
  107. auth_plugin_data = AuthPluginData,
  108. status = Status,
  109. log_warnings = LogWarn,
  110. ping_timeout = PingTimeout,
  111. query_timeout = Timeout,
  112. query_cache_time = QueryCacheTime,
  113. cap_found_rows = (SetFoundRows =:= true)},
  114. %% Trap exit so that we can properly disconnect when we die.
  115. process_flag(trap_exit, true),
  116. State1 = schedule_ping(State),
  117. {ok, State1};
  118. #error{} = E ->
  119. {stop, error_to_reason(E)}
  120. end.
  121. %% @private
  122. %% @doc
  123. %%
  124. %% Query and execute calls:
  125. %%
  126. %% <ul>
  127. %% <li>{query, Query, FilterMap, Timeout}</li>
  128. %% <li>{param_query, Query, Params, FilterMap, Timeout}</li>
  129. %% <li>{execute, Stmt, Args, FilterMap, Timeout}</li>
  130. %% </ul>
  131. %%
  132. %% For the calls listed above, we return these values:
  133. %%
  134. %% <dl>
  135. %% <dt>`ok'</dt>
  136. %% <dd>Success without returning any table data (UPDATE, etc.)</dd>
  137. %% <dt>`{ok, ColumnNames, Rows}'</dt>
  138. %% <dd>Queries returning one result set of table data</dd>
  139. %% <dt>`{ok, [{ColumnNames, Rows}, ...]}'</dt>
  140. %% <dd>Queries returning more than one result set of table data</dd>
  141. %% <dt>`{error, ServerReason}'</dt>
  142. %% <dd>MySQL server error</dd>
  143. %% <dt>`{implicit_commit, NestingLevel, Query}'</dt>
  144. %% <dd>A DDL statement (e.g. CREATE TABLE, ALTER TABLE, etc.) results in
  145. %% an implicit commit.
  146. %%
  147. %% If the caller is in a (nested) transaction, it must be aborted. To be
  148. %% able to handle this in the caller's process, we also return the
  149. %% nesting level.</dd>
  150. %% <dt>`{implicit_rollback, NestingLevel, ServerReason}'</dt>
  151. %% <dd>This errors results in an implicit rollback: `{1213, <<"40001">>,
  152. %% <<"Deadlock found when trying to get lock; try restarting "
  153. %% "transaction">>}'.
  154. %%
  155. %% If the caller is in a (nested) transaction, it must be aborted. To be
  156. %% able to handle this in the caller's process, we also return the
  157. %% nesting level.</dd>
  158. %% </dl>
  159. handle_call({query, Query, FilterMap, default_timeout}, From, State) ->
  160. handle_call({query, Query, FilterMap, State#state.query_timeout}, From,
  161. State);
  162. handle_call({query, Query, FilterMap, Timeout}, _From,
  163. #state{sockmod = SockMod, socket = Socket} = State) ->
  164. setopts(SockMod, Socket, [{active, false}]),
  165. Result = mysql_protocol:query(Query, SockMod, Socket, FilterMap, Timeout),
  166. {ok, Recs} = case Result of
  167. {error, timeout} when State#state.server_version >= [5, 0, 0] ->
  168. kill_query(State),
  169. mysql_protocol:fetch_query_response(SockMod, Socket, FilterMap,
  170. ?cmd_timeout);
  171. {error, timeout} ->
  172. %% For MySQL 4.x.x there is no way to recover from timeout except
  173. %% killing the connection itself.
  174. exit(timeout);
  175. QueryResult ->
  176. QueryResult
  177. end,
  178. setopts(SockMod, Socket, [{active, once}]),
  179. State1 = lists:foldl(fun update_state/2, State, Recs),
  180. State1#state.warning_count > 0 andalso State1#state.log_warnings
  181. andalso log_warnings(State1, Query),
  182. handle_query_call_reply(Recs, Query, State1, []);
  183. handle_call({param_query, Query, Params, FilterMap, default_timeout}, From,
  184. State) ->
  185. handle_call({param_query, Query, Params, FilterMap,
  186. State#state.query_timeout}, From, State);
  187. handle_call({param_query, Query, Params, FilterMap, Timeout}, _From,
  188. #state{socket = Socket, sockmod = SockMod} = State) ->
  189. %% Parametrized query: Prepared statement cached with the query as the key
  190. QueryBin = iolist_to_binary(Query),
  191. Cache = State#state.query_cache,
  192. {StmtResult, Cache1} = case mysql_cache:lookup(QueryBin, Cache) of
  193. {found, FoundStmt, NewCache} ->
  194. %% Found
  195. {{ok, FoundStmt}, NewCache};
  196. not_found ->
  197. %% Prepare
  198. setopts(SockMod, Socket, [{active, false}]),
  199. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  200. setopts(SockMod, Socket, [{active, once}]),
  201. case Rec of
  202. #error{} = E ->
  203. {{error, error_to_reason(E)}, Cache};
  204. #prepared{} = Stmt ->
  205. %% If the first entry in the cache, start the timer.
  206. Cache == empty andalso begin
  207. When = State#state.query_cache_time * 2,
  208. erlang:send_after(When, self(), query_cache)
  209. end,
  210. {{ok, Stmt}, mysql_cache:store(QueryBin, Stmt, Cache)}
  211. end
  212. end,
  213. case StmtResult of
  214. {ok, StmtRec} ->
  215. State1 = State#state{query_cache = Cache1},
  216. execute_stmt(StmtRec, Params, FilterMap, Timeout, State1);
  217. PrepareError ->
  218. {reply, PrepareError, State}
  219. end;
  220. handle_call({execute, Stmt, Args, FilterMap, default_timeout}, From, State) ->
  221. handle_call({execute, Stmt, Args, FilterMap, State#state.query_timeout},
  222. From, State);
  223. handle_call({execute, Stmt, Args, FilterMap, Timeout}, _From, State) ->
  224. case dict:find(Stmt, State#state.stmts) of
  225. {ok, StmtRec} ->
  226. execute_stmt(StmtRec, Args, FilterMap, Timeout, State);
  227. error ->
  228. {reply, {error, not_prepared}, State}
  229. end;
  230. handle_call({prepare, Query}, _From, State) ->
  231. #state{socket = Socket, sockmod = SockMod} = State,
  232. setopts(SockMod, Socket, [{active, false}]),
  233. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  234. setopts(SockMod, Socket, [{active, once}]),
  235. State1 = update_state(Rec, State),
  236. case Rec of
  237. #error{} = E ->
  238. {reply, {error, error_to_reason(E)}, State1};
  239. #prepared{statement_id = Id} = Stmt ->
  240. Stmts1 = dict:store(Id, Stmt, State1#state.stmts),
  241. State2 = State#state{stmts = Stmts1},
  242. {reply, {ok, Id}, State2}
  243. end;
  244. handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
  245. #state{socket = Socket, sockmod = SockMod} = State,
  246. %% First unprepare if there is an old statement with this name.
  247. setopts(SockMod, Socket, [{active, false}]),
  248. State1 = case dict:find(Name, State#state.stmts) of
  249. {ok, OldStmt} ->
  250. mysql_protocol:unprepare(OldStmt, SockMod, Socket),
  251. State#state{stmts = dict:erase(Name, State#state.stmts)};
  252. error ->
  253. State
  254. end,
  255. Rec = mysql_protocol:prepare(Query, SockMod, Socket),
  256. setopts(SockMod, Socket, [{active, once}]),
  257. State2 = update_state(Rec, State1),
  258. case Rec of
  259. #error{} = E ->
  260. {reply, {error, error_to_reason(E)}, State2};
  261. #prepared{} = Stmt ->
  262. Stmts1 = dict:store(Name, Stmt, State2#state.stmts),
  263. State3 = State2#state{stmts = Stmts1},
  264. {reply, {ok, Name}, State3}
  265. end;
  266. handle_call({unprepare, Stmt}, _From, State) when is_atom(Stmt);
  267. is_integer(Stmt) ->
  268. case dict:find(Stmt, State#state.stmts) of
  269. {ok, StmtRec} ->
  270. #state{socket = Socket, sockmod = SockMod} = State,
  271. setopts(SockMod, Socket, [{active, false}]),
  272. mysql_protocol:unprepare(StmtRec, SockMod, Socket),
  273. setopts(SockMod, Socket, [{active, once}]),
  274. State1 = State#state{stmts = dict:erase(Stmt, State#state.stmts)},
  275. State2 = schedule_ping(State1),
  276. {reply, ok, State2};
  277. error ->
  278. {reply, {error, not_prepared}, State}
  279. end;
  280. handle_call({change_user, Username, Password, Database}, From,
  281. State = #state{transaction_levels = []}) ->
  282. #state{socket = Socket, sockmod = SockMod,
  283. auth_plugin_data = AuthPluginData,
  284. server_version = ServerVersion} = State,
  285. setopts(SockMod, Socket, [{active, false}]),
  286. Result = mysql_protocol:change_user(SockMod, Socket, Username, Password,
  287. AuthPluginData, Database,
  288. ServerVersion),
  289. setopts(SockMod, Socket, [{active, once}]),
  290. State1 = update_state(Result, State),
  291. State1#state.warning_count > 0 andalso State1#state.log_warnings
  292. andalso log_warnings(State1, "CHANGE USER"),
  293. State2 = State1#state{query_cache = empty, stmts = dict:new()},
  294. case Result of
  295. #ok{} ->
  296. {reply, ok, State2#state{user = Username, password = Password}};
  297. #error{} = E ->
  298. gen_server:reply(From, {error, error_to_reason(E)}),
  299. stop_server(change_user_failed, State2)
  300. end;
  301. handle_call(warning_count, _From, State) ->
  302. {reply, State#state.warning_count, State};
  303. handle_call(insert_id, _From, State) ->
  304. {reply, State#state.insert_id, State};
  305. handle_call(affected_rows, _From, State) ->
  306. {reply, State#state.affected_rows, State};
  307. handle_call(autocommit, _From, State) ->
  308. {reply, State#state.status band ?SERVER_STATUS_AUTOCOMMIT /= 0, State};
  309. handle_call(backslash_escapes_enabled, _From, State = #state{status = S}) ->
  310. {reply, S band ?SERVER_STATUS_NO_BACKSLASH_ESCAPES == 0, State};
  311. handle_call(in_transaction, _From, State) ->
  312. {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State};
  313. handle_call(start_transaction, {FromPid, _},
  314. State = #state{socket = Socket, sockmod = SockMod,
  315. transaction_levels = L, status = Status})
  316. when Status band ?SERVER_STATUS_IN_TRANS == 0, L == [];
  317. Status band ?SERVER_STATUS_IN_TRANS /= 0, L /= [] ->
  318. MRef = erlang:monitor(process, FromPid),
  319. Query = case L of
  320. [] -> <<"BEGIN">>;
  321. _ -> <<"SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
  322. end,
  323. setopts(SockMod, Socket, [{active, false}]),
  324. {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
  325. ?cmd_timeout),
  326. setopts(SockMod, Socket, [{active, once}]),
  327. State1 = update_state(Res, State),
  328. {reply, ok, State1#state{transaction_levels = [{FromPid, MRef} | L]}};
  329. handle_call(rollback, {FromPid, _},
  330. State = #state{socket = Socket, sockmod = SockMod, status = Status,
  331. transaction_levels = [{FromPid, MRef} | L]})
  332. when Status band ?SERVER_STATUS_IN_TRANS /= 0 ->
  333. erlang:demonitor(MRef),
  334. Query = case L of
  335. [] -> <<"ROLLBACK">>;
  336. _ -> <<"ROLLBACK TO s", (integer_to_binary(length(L)))/binary>>
  337. end,
  338. setopts(SockMod, Socket, [{active, false}]),
  339. {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
  340. ?cmd_timeout),
  341. setopts(SockMod, Socket, [{active, once}]),
  342. State1 = update_state(Res, State),
  343. {reply, ok, State1#state{transaction_levels = L}};
  344. handle_call(commit, {FromPid, _},
  345. State = #state{socket = Socket, sockmod = SockMod, status = Status,
  346. transaction_levels = [{FromPid, MRef} | L]})
  347. when Status band ?SERVER_STATUS_IN_TRANS /= 0 ->
  348. erlang:demonitor(MRef),
  349. Query = case L of
  350. [] -> <<"COMMIT">>;
  351. _ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
  352. end,
  353. setopts(SockMod, Socket, [{active, false}]),
  354. {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
  355. ?cmd_timeout),
  356. setopts(SockMod, Socket, [{active, once}]),
  357. State1 = update_state(Res, State),
  358. {reply, ok, State1#state{transaction_levels = L}}.
  359. %% @private
  360. handle_cast(_Msg, State) ->
  361. {noreply, State}.
  362. %% @private
  363. handle_info(query_cache, #state{query_cache = Cache,
  364. query_cache_time = CacheTime} = State) ->
  365. %% Evict expired queries/statements in the cache used by query/3.
  366. {Evicted, Cache1} = mysql_cache:evict_older_than(Cache, CacheTime),
  367. %% Unprepare the evicted statements
  368. #state{socket = Socket, sockmod = SockMod} = State,
  369. setopts(SockMod, Socket, [{active, false}]),
  370. lists:foreach(fun ({_Query, Stmt}) ->
  371. mysql_protocol:unprepare(Stmt, SockMod, Socket)
  372. end,
  373. Evicted),
  374. setopts(SockMod, Socket, [{active, once}]),
  375. %% If nonempty, schedule eviction again.
  376. mysql_cache:size(Cache1) > 0 andalso
  377. erlang:send_after(CacheTime, self(), query_cache),
  378. {noreply, State#state{query_cache = Cache1}};
  379. handle_info({'DOWN', _MRef, _, Pid, _Info}, State) ->
  380. stop_server({application_process_died, Pid}, State);
  381. handle_info(ping, #state{socket = Socket, sockmod = SockMod} = State) ->
  382. setopts(SockMod, Socket, [{active, false}]),
  383. Ok = mysql_protocol:ping(SockMod, Socket),
  384. setopts(SockMod, Socket, [{active, once}]),
  385. {noreply, update_state(Ok, State)};
  386. handle_info({tcp_closed, _Socket}, State) ->
  387. stop_server(tcp_closed, State);
  388. handle_info({tcp_error, _Socket, Reason}, State) ->
  389. stop_server({tcp_error, Reason}, State);
  390. handle_info(_Info, State) ->
  391. {noreply, State}.
  392. %% @private
  393. terminate(Reason, #state{socket = Socket, sockmod = SockMod})
  394. when Reason == normal; Reason == shutdown ->
  395. %% Send the goodbye message for politeness.
  396. setopts(SockMod, Socket, [{active, false}]),
  397. mysql_protocol:quit(SockMod, Socket);
  398. terminate(_Reason, _State) ->
  399. ok.
  400. %% @private
  401. code_change(_OldVsn, State = #state{}, _Extra) ->
  402. {ok, State};
  403. code_change(_OldVsn, _State, _Extra) ->
  404. {error, incompatible_state}.
  405. %% --- Helpers ---
  406. %% @doc Executes a prepared statement and returns {Reply, NextState}.
  407. execute_stmt(Stmt, Args, FilterMap, Timeout,
  408. State = #state{socket = Socket, sockmod = SockMod}) ->
  409. setopts(SockMod, Socket, [{active, false}]),
  410. {ok, Recs} = case mysql_protocol:execute(Stmt, Args, SockMod, Socket,
  411. FilterMap, Timeout) of
  412. {error, timeout} when State#state.server_version >= [5, 0, 0] ->
  413. kill_query(State),
  414. mysql_protocol:fetch_execute_response(SockMod, Socket,
  415. FilterMap, ?cmd_timeout);
  416. {error, timeout} ->
  417. %% For MySQL 4.x.x there is no way to recover from timeout except
  418. %% killing the connection itself.
  419. exit(timeout);
  420. QueryResult ->
  421. QueryResult
  422. end,
  423. setopts(SockMod, Socket, [{active, once}]),
  424. State1 = lists:foldl(fun update_state/2, State, Recs),
  425. State1#state.warning_count > 0 andalso State1#state.log_warnings
  426. andalso log_warnings(State1, Stmt#prepared.orig_query),
  427. handle_query_call_reply(Recs, Stmt#prepared.orig_query, State1, []).
  428. %% @doc Produces a tuple to return as an error reason.
  429. -spec error_to_reason(#error{}) -> mysql:server_reason().
  430. error_to_reason(#error{code = Code, state = State, msg = Msg}) ->
  431. {Code, State, Msg}.
  432. %% @doc Updates a state with information from a response. Also re-schedules
  433. %% ping.
  434. -spec update_state(#ok{} | #eof{} | any(), #state{}) -> #state{}.
  435. update_state(Rec, State) ->
  436. State1 = case Rec of
  437. #ok{status = S, affected_rows = R, insert_id = Id, warning_count = W} ->
  438. State#state{status = S, affected_rows = R, insert_id = Id,
  439. warning_count = W};
  440. #resultset{status = S, warning_count = W} ->
  441. State#state{status = S, warning_count = W};
  442. #prepared{warning_count = W} ->
  443. State#state{warning_count = W};
  444. _Other ->
  445. %% This includes errors.
  446. %% Reset some things. (Note: We don't reset status and insert_id.)
  447. State#state{warning_count = 0, affected_rows = 0}
  448. end,
  449. schedule_ping(State1).
  450. %% @doc Produces a reply for handle_call/3 for queries and prepared statements.
  451. handle_query_call_reply([], _Query, State, ResultSetsAcc) ->
  452. Reply = case ResultSetsAcc of
  453. [] -> ok;
  454. [{ColumnNames, Rows}] -> {ok, ColumnNames, Rows};
  455. [_|_] -> {ok, lists:reverse(ResultSetsAcc)}
  456. end,
  457. {reply, Reply, State};
  458. handle_query_call_reply([Rec|Recs], Query,
  459. #state{transaction_levels = L} = State,
  460. ResultSetsAcc) ->
  461. case Rec of
  462. #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
  463. L /= [] ->
  464. %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
  465. %% an implicit commit.
  466. Length = length(L),
  467. Reply = {implicit_commit, Length, Query},
  468. [] = demonitor_processes(L, Length),
  469. {reply, Reply, State#state{transaction_levels = []}};
  470. #ok{} ->
  471. handle_query_call_reply(Recs, Query, State, ResultSetsAcc);
  472. #resultset{cols = ColDefs, rows = Rows} ->
  473. Names = [Def#col.name || Def <- ColDefs],
  474. ResultSetsAcc1 = [{Names, Rows} | ResultSetsAcc],
  475. handle_query_call_reply(Recs, Query, State, ResultSetsAcc1);
  476. #error{code = ?ERROR_DEADLOCK} when L /= [] ->
  477. %% These errors result in an implicit rollback.
  478. Reply = {implicit_rollback, length(L), error_to_reason(Rec)},
  479. %% Everything in the transaction is rolled back, except the BEGIN
  480. %% statement itself. Thus, we are in transaction level 1.
  481. NewMonitors = demonitor_processes(L, length(L) - 1),
  482. {reply, Reply, State#state{transaction_levels = NewMonitors}};
  483. #error{} ->
  484. {reply, {error, error_to_reason(Rec)}, State}
  485. end.
  486. %% @doc Schedules (or re-schedules) ping.
  487. schedule_ping(State = #state{ping_timeout = infinity}) ->
  488. State;
  489. schedule_ping(State = #state{ping_timeout = Timeout, ping_ref = Ref}) ->
  490. is_reference(Ref) andalso erlang:cancel_timer(Ref),
  491. State#state{ping_ref = erlang:send_after(Timeout, self(), ping)}.
  492. %% @doc Fetches and logs warnings. Query is the query that gave the warnings.
  493. log_warnings(#state{socket = Socket, sockmod = SockMod}, Query) ->
  494. setopts(SockMod, Socket, [{active, false}]),
  495. {ok, [#resultset{rows = Rows}]} = mysql_protocol:query(<<"SHOW WARNINGS">>,
  496. SockMod, Socket,
  497. ?cmd_timeout),
  498. setopts(SockMod, Socket, [{active, once}]),
  499. Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"]
  500. || [Level, Code, Message] <- Rows],
  501. error_logger:warning_msg("~s in ~s~n", [Lines, Query]).
  502. %% @doc Makes a separate connection and execute KILL QUERY. We do this to get
  503. %% our main connection back to normal. KILL QUERY appeared in MySQL 5.0.0.
  504. kill_query(#state{connection_id = ConnId, host = Host, port = Port,
  505. user = User, password = Password, ssl_opts = SSLOpts,
  506. cap_found_rows = SetFoundRows}) ->
  507. %% Connect socket
  508. SockOpts = [{active, false}, binary, {packet, raw}],
  509. {ok, Socket0} = gen_tcp:connect(Host, Port, SockOpts),
  510. %% Exchange handshake communication.
  511. Result = mysql_protocol:handshake(User, Password, undefined, gen_tcp,
  512. SSLOpts, Socket0, SetFoundRows),
  513. case Result of
  514. {ok, #handshake{}, SockMod, Socket} ->
  515. %% Kill and disconnect
  516. IdBin = integer_to_binary(ConnId),
  517. {ok, [#ok{}]} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>,
  518. SockMod, Socket, ?cmd_timeout),
  519. mysql_protocol:quit(SockMod, Socket);
  520. #error{} = E ->
  521. error_logger:error_msg("Failed to connect to kill query: ~p",
  522. [error_to_reason(E)])
  523. end.
  524. stop_server(Reason,
  525. #state{socket = Socket, connection_id = ConnId} = State) ->
  526. error_logger:error_msg("Connection Id ~p closing with reason: ~p~n",
  527. [ConnId, Reason]),
  528. ok = gen_tcp:close(Socket),
  529. {stop, Reason, State#state{socket = undefined, connection_id = undefined}}.
  530. setopts(gen_tcp, Socket, Opts) ->
  531. inet:setopts(Socket, Opts);
  532. setopts(SockMod, Socket, Opts) ->
  533. SockMod:setopts(Socket, Opts).
  534. demonitor_processes(List, 0) ->
  535. List;
  536. demonitor_processes([{_FromPid, MRef}|T], Count) ->
  537. erlang:demonitor(MRef),
  538. demonitor_processes(T, Count - 1).