|
@@ -34,7 +34,6 @@
|
|
queue = queue:new(),
|
|
queue = queue:new(),
|
|
async,
|
|
async,
|
|
ready,
|
|
ready,
|
|
- timeout = 5000,
|
|
|
|
parameters = [],
|
|
parameters = [],
|
|
txstatus}).
|
|
txstatus}).
|
|
|
|
|
|
@@ -44,35 +43,38 @@ start_link() ->
|
|
gen_server:start_link(?MODULE, [], []).
|
|
gen_server:start_link(?MODULE, [], []).
|
|
|
|
|
|
connect(C, Host, Username, Password, Opts) ->
|
|
connect(C, Host, Username, Password, Opts) ->
|
|
- gen_server:call(C, {connect, Host, Username, Password, Opts}, infinity).
|
|
|
|
|
|
+ gen_server:cast(C, {connect, Host, Username, Password, Opts}).
|
|
|
|
|
|
close(C) when is_pid(C) ->
|
|
close(C) when is_pid(C) ->
|
|
- catch gen_server:call(C, stop, infinity),
|
|
|
|
|
|
+ catch gen_server:cast(C, stop),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
get_parameter(C, Name) ->
|
|
get_parameter(C, Name) ->
|
|
gen_server:call(C, {get_parameter, to_binary(Name)}).
|
|
gen_server:call(C, {get_parameter, to_binary(Name)}).
|
|
|
|
|
|
squery(C, Sql) ->
|
|
squery(C, Sql) ->
|
|
- gen_server:call(C, {squery, Sql}, infinity).
|
|
|
|
|
|
+ gen_server:cast(C, {squery, Sql}).
|
|
|
|
+
|
|
|
|
+equery(C, Statement, Parameters) ->
|
|
|
|
+ gen_server:cast(C, {equery, Statement, Parameters}).
|
|
|
|
|
|
parse(C, Name, Sql, Types) ->
|
|
parse(C, Name, Sql, Types) ->
|
|
- gen_server:call(C, {parse, Name, Sql, Types}, infinity).
|
|
|
|
|
|
+ gen_server:cast(C, {parse, Name, Sql, Types}).
|
|
|
|
|
|
bind(C, Statement, PortalName, Parameters) ->
|
|
bind(C, Statement, PortalName, Parameters) ->
|
|
- gen_server:call(C, {bind, Statement, PortalName, Parameters}, infinity).
|
|
|
|
|
|
+ gen_server:cast(C, {bind, Statement, PortalName, Parameters}).
|
|
|
|
|
|
execute(C, Statement, PortalName, MaxRows) ->
|
|
execute(C, Statement, PortalName, MaxRows) ->
|
|
- gen_server:call(C, {execute, Statement, PortalName, MaxRows}, infinity).
|
|
|
|
|
|
+ gen_server:cast(C, {execute, Statement, PortalName, MaxRows}).
|
|
|
|
|
|
describe(C, Type, Name) ->
|
|
describe(C, Type, Name) ->
|
|
- gen_server:call(C, {describe, Type, Name}, infinity).
|
|
|
|
|
|
+ gen_server:cast(C, {describe, Type, Name}).
|
|
|
|
|
|
close(C, Type, Name) ->
|
|
close(C, Type, Name) ->
|
|
- gen_server:call(C, {close, Type, Name}, infinity).
|
|
|
|
|
|
+ gen_server:cast(C, {close, Type, Name}).
|
|
|
|
|
|
sync(C) ->
|
|
sync(C) ->
|
|
- gen_server:call(C, sync, infinity).
|
|
|
|
|
|
+ gen_server:cast(C, sync).
|
|
|
|
|
|
cancel(S) ->
|
|
cancel(S) ->
|
|
gen_server:cast(S, cancel).
|
|
gen_server:cast(S, cancel).
|
|
@@ -85,7 +87,6 @@ init([]) ->
|
|
handle_call({connect, Host, Username, Password, Opts},
|
|
handle_call({connect, Host, Username, Password, Opts},
|
|
From,
|
|
From,
|
|
#state{queue = Queue} = State) ->
|
|
#state{queue = Queue} = State) ->
|
|
- %% TODO split connect/query timeout?
|
|
|
|
Timeout = proplists:get_value(timeout, Opts, 5000),
|
|
Timeout = proplists:get_value(timeout, Opts, 5000),
|
|
Port = proplists:get_value(port, Opts, 5432),
|
|
Port = proplists:get_value(port, Opts, 5432),
|
|
SockOpts = [{active, false}, {packet, raw}, binary, {nodelay, true}],
|
|
SockOpts = [{active, false}, {packet, raw}, binary, {nodelay, true}],
|
|
@@ -111,22 +112,21 @@ handle_call({connect, Host, Username, Password, Opts},
|
|
{noreply,
|
|
{noreply,
|
|
State2#state{handler = auth,
|
|
State2#state{handler = auth,
|
|
queue = queue:in(From, Queue),
|
|
queue = queue:in(From, Queue),
|
|
- async = Async},
|
|
|
|
- Timeout};
|
|
|
|
|
|
+ async = Async}};
|
|
|
|
|
|
handle_call(stop, From, #state{queue = Queue} = State) ->
|
|
handle_call(stop, From, #state{queue = Queue} = State) ->
|
|
%% TODO flush queue
|
|
%% TODO flush queue
|
|
{stop, normal, ok, State};
|
|
{stop, normal, ok, State};
|
|
|
|
|
|
handle_call({parse, Name, Sql, Types}, From, State) ->
|
|
handle_call({parse, Name, Sql, Types}, From, State) ->
|
|
- #state{timeout = Timeout, queue = Queue} = State,
|
|
|
|
|
|
+ #state{queue = Queue} = State,
|
|
Bin = pgsql_wire:encode_types(Types),
|
|
Bin = pgsql_wire:encode_types(Types),
|
|
send(State, $P, [Name, 0, Sql, 0, Bin]),
|
|
send(State, $P, [Name, 0, Sql, 0, Bin]),
|
|
send(State, $D, [$S, Name, 0]),
|
|
send(State, $D, [$S, Name, 0]),
|
|
send(State, $H, []),
|
|
send(State, $H, []),
|
|
S = #statement{name = Name},
|
|
S = #statement{name = Name},
|
|
State2 = State#state{queue = queue:in(From, Queue)},
|
|
State2 = State#state{queue = queue:in(From, Queue)},
|
|
- {noreply, State2, Timeout}.
|
|
|
|
|
|
+ {noreply, State2}.
|
|
|
|
|
|
handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
|
|
handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
|
|
{ok, {Addr, Port}} = inet:peername(State#state.sock),
|
|
{ok, {Addr, Port}} = inet:peername(State#state.sock),
|
|
@@ -145,25 +145,20 @@ handle_info({Error, Sock, Reason}, #state{sock = Sock} = State)
|
|
when Error == tcp_error; Error == ssl_error ->
|
|
when Error == tcp_error; Error == ssl_error ->
|
|
{stop, {sock_error, Reason}, State};
|
|
{stop, {sock_error, Reason}, State};
|
|
|
|
|
|
-handle_info(timeout, #state{handler = Handler} = State) ->
|
|
|
|
- ?MODULE:Handler(timeout, State);
|
|
|
|
-
|
|
|
|
handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
|
|
handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
|
|
- loop(State#state{data = <<Data/binary, Data2/binary>>}, infinity).
|
|
|
|
|
|
+ loop(State#state{data = <<Data/binary, Data2/binary>>}).
|
|
|
|
|
|
-loop(#state{data = Data, handler = Handler} = State, Timeout) ->
|
|
|
|
|
|
+loop(#state{data = Data, handler = Handler} = State) ->
|
|
case pgsql_wire:decode_message(Data) of
|
|
case pgsql_wire:decode_message(Data) of
|
|
{Message, Tail} ->
|
|
{Message, Tail} ->
|
|
case ?MODULE:Handler(Message, State#state{data = Tail}) of
|
|
case ?MODULE:Handler(Message, State#state{data = Tail}) of
|
|
{noreply, State2} ->
|
|
{noreply, State2} ->
|
|
- loop(State2, infinity);
|
|
|
|
- {noreply, State2, Timeout2} ->
|
|
|
|
- loop(State2, Timeout2);
|
|
|
|
|
|
+ loop(State2);
|
|
R = {stop, _Reason2, _State2} ->
|
|
R = {stop, _Reason2, _State2} ->
|
|
R
|
|
R
|
|
end;
|
|
end;
|
|
_ ->
|
|
_ ->
|
|
- {noreply, State, Timeout}
|
|
|
|
|
|
+ {noreply, State}
|
|
end.
|
|
end.
|
|
|
|
|
|
terminate(_Reason, _State) ->
|
|
terminate(_Reason, _State) ->
|
|
@@ -175,8 +170,9 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
|
|
|
%% -- internal functions --
|
|
%% -- internal functions --
|
|
|
|
|
|
-start_ssl(S, Flag, Opts, #state{timeout = Timeout} = State) ->
|
|
|
|
|
|
+start_ssl(S, Flag, Opts, State) ->
|
|
ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
|
|
ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
|
|
|
|
+ Timeout = proplists:get_value(timeout, Opts, 5000),
|
|
{ok, <<Code>>} = gen_tcp:recv(S, 1, Timeout),
|
|
{ok, <<Code>>} = gen_tcp:recv(S, 1, Timeout),
|
|
case Code of
|
|
case Code of
|
|
$S ->
|
|
$S ->
|
|
@@ -216,22 +212,19 @@ notify_async(#state{async = Pid}, Msg) ->
|
|
|
|
|
|
%% AuthenticationOk
|
|
%% AuthenticationOk
|
|
auth({$R, <<0:?int32>>}, State) ->
|
|
auth({$R, <<0:?int32>>}, State) ->
|
|
- #state{timeout = Timeout} = State,
|
|
|
|
- {noreply, State#state{handler = initializing}, Timeout};
|
|
|
|
|
|
+ {noreply, State#state{handler = initializing}};
|
|
|
|
|
|
%% AuthenticationCleartextPassword
|
|
%% AuthenticationCleartextPassword
|
|
auth({$R, <<3:?int32>>}, State) ->
|
|
auth({$R, <<3:?int32>>}, State) ->
|
|
- #state{timeout = Timeout} = State,
|
|
|
|
send(State, $p, [get(password), 0]),
|
|
send(State, $p, [get(password), 0]),
|
|
- {noreply, State, Timeout};
|
|
|
|
|
|
+ {noreply, State};
|
|
|
|
|
|
%% AuthenticationMD5Password
|
|
%% AuthenticationMD5Password
|
|
auth({$R, <<5:?int32, Salt:4/binary>>}, State) ->
|
|
auth({$R, <<5:?int32, Salt:4/binary>>}, State) ->
|
|
- #state{timeout = Timeout} = State,
|
|
|
|
Digest1 = hex(erlang:md5([get(password), get(username)])),
|
|
Digest1 = hex(erlang:md5([get(password), get(username)])),
|
|
Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
|
|
Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
|
|
send(State, $p, Str),
|
|
send(State, $p, Str),
|
|
- {noreply, State, Timeout};
|
|
|
|
|
|
+ {noreply, State};
|
|
|
|
|
|
auth({$R, <<M:?int32, _/binary>>}, State) ->
|
|
auth({$R, <<M:?int32, _/binary>>}, State) ->
|
|
case M of
|
|
case M of
|
|
@@ -255,22 +248,13 @@ auth({error, E}, State) ->
|
|
end,
|
|
end,
|
|
{stop, normal, gen_reply(State, {error, Why})};
|
|
{stop, normal, gen_reply(State, {error, Why})};
|
|
|
|
|
|
-auth(timeout, State) ->
|
|
|
|
- {stop, normal, gen_reply(State, {error, timeout})};
|
|
|
|
-
|
|
|
|
auth(Other, State) ->
|
|
auth(Other, State) ->
|
|
- #state{timeout = Timeout} = State,
|
|
|
|
- {noreply, State2} = on_message(Other, State),
|
|
|
|
- {noreply, State2, Timeout}.
|
|
|
|
|
|
+ on_message(Other, State).
|
|
|
|
|
|
%% BackendKeyData
|
|
%% BackendKeyData
|
|
initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
|
|
initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
|
|
- #state{timeout = Timeout} = State,
|
|
|
|
State2 = State#state{backend = {Pid, Key}},
|
|
State2 = State#state{backend = {Pid, Key}},
|
|
- {noreply, State2, Timeout};
|
|
|
|
-
|
|
|
|
-initializing(timeout, State) ->
|
|
|
|
- {stop, normal, gen_reply(State, {error, timeout})};
|
|
|
|
|
|
+ {noreply, State2};
|
|
|
|
|
|
%% ReadyForQuery
|
|
%% ReadyForQuery
|
|
initializing({$Z, <<Status:8>>}, State) ->
|
|
initializing({$Z, <<Status:8>>}, State) ->
|
|
@@ -291,9 +275,7 @@ initializing({error, _} = Error, State) ->
|
|
{stop, normal, gen_reply(State, Error)};
|
|
{stop, normal, gen_reply(State, Error)};
|
|
|
|
|
|
initializing(Other, State) ->
|
|
initializing(Other, State) ->
|
|
- #state{timeout = Timeout} = State,
|
|
|
|
- {noreply, State2} = on_message(Other, State),
|
|
|
|
- {noreply, State2, Timeout}.
|
|
|
|
|
|
+ on_message(Other, State).
|
|
|
|
|
|
%% NoticeResponse
|
|
%% NoticeResponse
|
|
on_message({$N, Data}, State) ->
|
|
on_message({$N, Data}, State) ->
|