|
@@ -74,7 +74,7 @@ init(Opts) ->
|
|
|
TcpOpts = proplists:get_value(tcp_options, Opts, []),
|
|
|
SetFoundRows = proplists:get_value(found_rows, Opts, false),
|
|
|
SSLOpts = proplists:get_value(ssl, Opts, undefined),
|
|
|
- SockMod0 = mysql_sock_tcp,
|
|
|
+ SockMod0 = gen_tcp,
|
|
|
|
|
|
PingTimeout = case KeepAlive of
|
|
|
true -> ?default_ping_timeout;
|
|
@@ -91,7 +91,7 @@ init(Opts) ->
|
|
|
Socket0, SetFoundRows),
|
|
|
case Result of
|
|
|
{ok, Handshake, SockMod, Socket} ->
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
#handshake{server_version = Version, connection_id = ConnId,
|
|
|
status = Status} = Handshake,
|
|
|
State = #state{server_version = Version, connection_id = ConnId,
|
|
@@ -156,10 +156,9 @@ init(Opts) ->
|
|
|
%% </dl>
|
|
|
handle_call({query, Query}, From, State) ->
|
|
|
handle_call({query, Query, State#state.query_timeout}, From, State);
|
|
|
-handle_call({query, Query, Timeout}, _From, State) ->
|
|
|
- SockMod = State#state.sockmod,
|
|
|
- Socket = State#state.socket,
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
+handle_call({query, Query, Timeout}, _From,
|
|
|
+ #state{sockmod = SockMod, socket = Socket} = State) ->
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
{ok, Recs} = case mysql_protocol:query(Query, SockMod, Socket, Timeout) of
|
|
|
{error, timeout} when State#state.server_version >= [5, 0, 0] ->
|
|
|
kill_query(State),
|
|
@@ -171,7 +170,7 @@ handle_call({query, Query, Timeout}, _From, State) ->
|
|
|
QueryResult ->
|
|
|
QueryResult
|
|
|
end,
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
State1 = lists:foldl(fun update_state/2, State, Recs),
|
|
|
State1#state.warning_count > 0 andalso State1#state.log_warnings
|
|
|
andalso log_warnings(State1, Query),
|
|
@@ -179,10 +178,10 @@ handle_call({query, Query, Timeout}, _From, State) ->
|
|
|
handle_call({param_query, Query, Params}, From, State) ->
|
|
|
handle_call({param_query, Query, Params, State#state.query_timeout}, From,
|
|
|
State);
|
|
|
-handle_call({param_query, Query, Params, Timeout}, _From, State) ->
|
|
|
+handle_call({param_query, Query, Params, Timeout}, _From,
|
|
|
+ #state{socket = Socket, sockmod = SockMod} = State) ->
|
|
|
%% Parametrized query: Prepared statement cached with the query as the key
|
|
|
QueryBin = iolist_to_binary(Query),
|
|
|
- #state{socket = Socket, sockmod = SockMod} = State,
|
|
|
Cache = State#state.query_cache,
|
|
|
{StmtResult, Cache1} = case mysql_cache:lookup(QueryBin, Cache) of
|
|
|
{found, FoundStmt, NewCache} ->
|
|
@@ -190,10 +189,9 @@ handle_call({param_query, Query, Params, Timeout}, _From, State) ->
|
|
|
{{ok, FoundStmt}, NewCache};
|
|
|
not_found ->
|
|
|
%% Prepare
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
Rec = mysql_protocol:prepare(Query, SockMod, Socket),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
case Rec of
|
|
|
#error{} = E ->
|
|
|
{{error, error_to_reason(E)}, Cache};
|
|
@@ -224,10 +222,9 @@ handle_call({execute, Stmt, Args, Timeout}, _From, State) ->
|
|
|
end;
|
|
|
handle_call({prepare, Query}, _From, State) ->
|
|
|
#state{socket = Socket, sockmod = SockMod} = State,
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
Rec = mysql_protocol:prepare(Query, SockMod, Socket),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
State1 = update_state(Rec, State),
|
|
|
case Rec of
|
|
|
#error{} = E ->
|
|
@@ -240,8 +237,7 @@ handle_call({prepare, Query}, _From, State) ->
|
|
|
handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
|
|
|
#state{socket = Socket, sockmod = SockMod} = State,
|
|
|
%% First unprepare if there is an old statement with this name.
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
State1 = case dict:find(Name, State#state.stmts) of
|
|
|
{ok, OldStmt} ->
|
|
|
mysql_protocol:unprepare(OldStmt, SockMod, Socket),
|
|
@@ -250,7 +246,7 @@ handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
|
|
|
State
|
|
|
end,
|
|
|
Rec = mysql_protocol:prepare(Query, SockMod, Socket),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
State2 = update_state(Rec, State1),
|
|
|
case Rec of
|
|
|
#error{} = E ->
|
|
@@ -265,10 +261,9 @@ handle_call({unprepare, Stmt}, _From, State) when is_atom(Stmt);
|
|
|
case dict:find(Stmt, State#state.stmts) of
|
|
|
{ok, StmtRec} ->
|
|
|
#state{socket = Socket, sockmod = SockMod} = State,
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
mysql_protocol:unprepare(StmtRec, SockMod, Socket),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
State1 = State#state{stmts = dict:erase(Stmt, State#state.stmts)},
|
|
|
State2 = schedule_ping(State1),
|
|
|
{reply, ok, State2};
|
|
@@ -297,11 +292,10 @@ handle_call(start_transaction, {FromPid, _},
|
|
|
[] -> <<"BEGIN">>;
|
|
|
_ -> <<"SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
|
|
|
end,
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
{ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
|
|
|
?cmd_timeout),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
State1 = update_state(Res, State),
|
|
|
{reply, ok, State1#state{transaction_levels = [{FromPid, MRef} | L]}};
|
|
|
handle_call(rollback, {FromPid, _},
|
|
@@ -313,11 +307,10 @@ handle_call(rollback, {FromPid, _},
|
|
|
[] -> <<"ROLLBACK">>;
|
|
|
_ -> <<"ROLLBACK TO s", (integer_to_binary(length(L)))/binary>>
|
|
|
end,
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
{ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
|
|
|
?cmd_timeout),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
State1 = update_state(Res, State),
|
|
|
{reply, ok, State1#state{transaction_levels = L}};
|
|
|
handle_call(commit, {FromPid, _},
|
|
@@ -329,11 +322,10 @@ handle_call(commit, {FromPid, _},
|
|
|
[] -> <<"COMMIT">>;
|
|
|
_ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
|
|
|
end,
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
{ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
|
|
|
?cmd_timeout),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
State1 = update_state(Res, State),
|
|
|
{reply, ok, State1#state{transaction_levels = L}}.
|
|
|
|
|
@@ -348,13 +340,12 @@ handle_info(query_cache, #state{query_cache = Cache,
|
|
|
{Evicted, Cache1} = mysql_cache:evict_older_than(Cache, CacheTime),
|
|
|
%% Unprepare the evicted statements
|
|
|
#state{socket = Socket, sockmod = SockMod} = State,
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
lists:foreach(fun ({_Query, Stmt}) ->
|
|
|
mysql_protocol:unprepare(Stmt, SockMod, Socket)
|
|
|
end,
|
|
|
Evicted),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
%% If nonempty, schedule eviction again.
|
|
|
mysql_cache:size(Cache1) > 0 andalso
|
|
|
erlang:send_after(CacheTime, self(), query_cache),
|
|
@@ -362,10 +353,9 @@ handle_info(query_cache, #state{query_cache = Cache,
|
|
|
handle_info({'DOWN', _MRef, _, Pid, _Info}, State) ->
|
|
|
stop_server({application_process_died, Pid}, State);
|
|
|
handle_info(ping, #state{socket = Socket, sockmod = SockMod} = State) ->
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
Ok = mysql_protocol:ping(SockMod, Socket),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
{noreply, update_state(Ok, State)};
|
|
|
handle_info({tcp_closed, _Socket}, State) ->
|
|
|
stop_server(tcp_closed, State);
|
|
@@ -378,7 +368,7 @@ handle_info(_Info, State) ->
|
|
|
terminate(Reason, #state{socket = Socket, sockmod = SockMod})
|
|
|
when Reason == normal; Reason == shutdown ->
|
|
|
%% Send the goodbye message for politeness.
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
mysql_protocol:quit(SockMod, Socket);
|
|
|
terminate(_Reason, _State) ->
|
|
|
ok.
|
|
@@ -393,8 +383,7 @@ code_change(_OldVsn, _State, _Extra) ->
|
|
|
|
|
|
%% @doc Executes a prepared statement and returns {Reply, NextState}.
|
|
|
execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket, sockmod = SockMod}) ->
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
{ok, Recs} = case mysql_protocol:execute(Stmt, Args, SockMod, Socket,
|
|
|
Timeout) of
|
|
|
{error, timeout} when State#state.server_version >= [5, 0, 0] ->
|
|
@@ -408,7 +397,7 @@ execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket, sockmod = Sock
|
|
|
QueryResult ->
|
|
|
QueryResult
|
|
|
end,
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
State1 = lists:foldl(fun update_state/2, State, Recs),
|
|
|
State1#state.warning_count > 0 andalso State1#state.log_warnings
|
|
|
andalso log_warnings(State1, Stmt#prepared.orig_query),
|
|
@@ -483,13 +472,12 @@ schedule_ping(State = #state{ping_timeout = Timeout, ping_ref = Ref}) ->
|
|
|
State#state{ping_ref = erlang:send_after(Timeout, self(), ping)}.
|
|
|
|
|
|
%% @doc Fetches and logs warnings. Query is the query that gave the warnings.
|
|
|
-log_warnings(#state{socket = Socket, sockmod = SockMod} = State, Query) ->
|
|
|
- SockMod:setopts(Socket, [{active, false}]),
|
|
|
- SockMod = State#state.sockmod,
|
|
|
+log_warnings(#state{socket = Socket, sockmod = SockMod}, Query) ->
|
|
|
+ setopts(SockMod, Socket, [{active, false}]),
|
|
|
{ok, [#resultset{rows = Rows}]} = mysql_protocol:query(<<"SHOW WARNINGS">>,
|
|
|
SockMod, Socket,
|
|
|
?cmd_timeout),
|
|
|
- SockMod:setopts(Socket, [{active, once}]),
|
|
|
+ setopts(SockMod, Socket, [{active, once}]),
|
|
|
Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"]
|
|
|
|| [Level, Code, Message] <- Rows],
|
|
|
error_logger:warning_msg("~s in ~s~n", [Lines, Query]).
|
|
@@ -501,10 +489,10 @@ kill_query(#state{connection_id = ConnId, host = Host, port = Port,
|
|
|
cap_found_rows = SetFoundRows}) ->
|
|
|
%% Connect socket
|
|
|
SockOpts = [{active, false}, binary, {packet, raw}],
|
|
|
- {ok, Socket0} = mysql_sock_tcp:connect(Host, Port, SockOpts),
|
|
|
+ {ok, Socket0} = gen_tcp:connect(Host, Port, SockOpts),
|
|
|
|
|
|
%% Exchange handshake communication.
|
|
|
- Result = mysql_protocol:handshake(User, Password, undefined, mysql_sock_tcp,
|
|
|
+ Result = mysql_protocol:handshake(User, Password, undefined, gen_tcp,
|
|
|
SSLOpts, Socket0, SetFoundRows),
|
|
|
case Result of
|
|
|
{ok, #handshake{}, SockMod, Socket} ->
|
|
@@ -525,6 +513,11 @@ stop_server(Reason,
|
|
|
ok = gen_tcp:close(Socket),
|
|
|
{stop, Reason, State#state{socket = undefined, connection_id = undefined}}.
|
|
|
|
|
|
+setopts(gen_tcp, Socket, Opts) ->
|
|
|
+ inet:setopts(Socket, Opts);
|
|
|
+setopts(SockMod, Socket, Opts) ->
|
|
|
+ SockMod:setopts(Socket, Opts).
|
|
|
+
|
|
|
demonitor_processes(List, 0) ->
|
|
|
List;
|
|
|
demonitor_processes([{_FromPid, MRef}|T], Count) ->
|