|
@@ -5,17 +5,17 @@
|
|
-behavior(gen_server).
|
|
-behavior(gen_server).
|
|
|
|
|
|
-export([start_link/0,
|
|
-export([start_link/0,
|
|
- connect/7,
|
|
|
|
|
|
+ connect/5,
|
|
close/1,
|
|
close/1,
|
|
get_parameter/2,
|
|
get_parameter/2,
|
|
- squery/4,
|
|
|
|
- equery/5,
|
|
|
|
- parse/6,
|
|
|
|
- bind/6,
|
|
|
|
- execute/6,
|
|
|
|
- describe/5,
|
|
|
|
- close/5,
|
|
|
|
- sync/3,
|
|
|
|
|
|
+ squery/2,
|
|
|
|
+ equery/3,
|
|
|
|
+ parse/4,
|
|
|
|
+ bind/4,
|
|
|
|
+ execute/4,
|
|
|
|
+ describe/3,
|
|
|
|
+ close/3,
|
|
|
|
+ sync/1,
|
|
cancel/1]).
|
|
cancel/1]).
|
|
|
|
|
|
-export([handle_call/3, handle_cast/2, handle_info/2]).
|
|
-export([handle_call/3, handle_cast/2, handle_info/2]).
|
|
@@ -42,8 +42,8 @@
|
|
start_link() ->
|
|
start_link() ->
|
|
gen_server:start_link(?MODULE, [], []).
|
|
gen_server:start_link(?MODULE, [], []).
|
|
|
|
|
|
-connect(C, From, Ref, Host, Username, Password, Opts) ->
|
|
|
|
- gen_server:cast(C, {connect, From, Ref, Host, Username, Password, Opts}).
|
|
|
|
|
|
+connect(C, Host, Username, Password, Opts) ->
|
|
|
|
+ cast(C, {connect, Host, Username, Password, Opts}).
|
|
|
|
|
|
close(C) when is_pid(C) ->
|
|
close(C) when is_pid(C) ->
|
|
catch gen_server:cast(C, stop),
|
|
catch gen_server:cast(C, stop),
|
|
@@ -52,29 +52,29 @@ close(C) when is_pid(C) ->
|
|
get_parameter(C, Name) ->
|
|
get_parameter(C, Name) ->
|
|
gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
|
|
gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
|
|
|
|
|
|
-squery(C, From, Ref, Sql) ->
|
|
|
|
- gen_server:cast(C, {squery, From, Ref, Sql}).
|
|
|
|
|
|
+squery(C, Sql) ->
|
|
|
|
+ cast(C, {squery, Sql}).
|
|
|
|
|
|
-equery(C, From, Ref, Statement, Parameters) ->
|
|
|
|
- gen_server:cast(C, {equery, From, Ref, Statement, Parameters}).
|
|
|
|
|
|
+equery(C, Statement, Parameters) ->
|
|
|
|
+ cast(C, {equery, Statement, Parameters}).
|
|
|
|
|
|
-parse(C, From, Ref, Name, Sql, Types) ->
|
|
|
|
- gen_server:cast(C, {parse, From, Ref, Name, Sql, Types}).
|
|
|
|
|
|
+parse(C, Name, Sql, Types) ->
|
|
|
|
+ cast(C, {parse, Name, Sql, Types}).
|
|
|
|
|
|
-bind(C, From, Ref, Statement, PortalName, Parameters) ->
|
|
|
|
- gen_server:cast(C, {bind, From, Ref, Statement, PortalName, Parameters}).
|
|
|
|
|
|
+bind(C, Statement, PortalName, Parameters) ->
|
|
|
|
+ cast(C, {bind, Statement, PortalName, Parameters}).
|
|
|
|
|
|
-execute(C, From, Ref, Statement, PortalName, MaxRows) ->
|
|
|
|
- gen_server:cast(C, {execute, From, Ref, Statement, PortalName, MaxRows}).
|
|
|
|
|
|
+execute(C, Statement, PortalName, MaxRows) ->
|
|
|
|
+ cast(C, {execute, Statement, PortalName, MaxRows}).
|
|
|
|
|
|
-describe(C, From, Ref, Type, Name) ->
|
|
|
|
- gen_server:cast(C, {describe, From, Ref, Type, Name}).
|
|
|
|
|
|
+describe(C, Type, Name) ->
|
|
|
|
+ cast(C, {describe, Type, Name}).
|
|
|
|
|
|
-close(C, From, Ref, Type, Name) ->
|
|
|
|
- gen_server:cast(C, {close, From, Ref, Type, Name}).
|
|
|
|
|
|
+close(C, Type, Name) ->
|
|
|
|
+ cast(C, {close, Type, Name}).
|
|
|
|
|
|
-sync(C, From, Ref) ->
|
|
|
|
- gen_server:cast(C, {sync, From, Ref}).
|
|
|
|
|
|
+sync(C) ->
|
|
|
|
+ cast(C, sync).
|
|
|
|
|
|
cancel(S) ->
|
|
cancel(S) ->
|
|
gen_server:cast(S, cancel).
|
|
gen_server:cast(S, cancel).
|
|
@@ -91,7 +91,7 @@ handle_call({get_parameter, Name}, From, State) ->
|
|
end,
|
|
end,
|
|
{reply, {ok, Value}, State}.
|
|
{reply, {ok, Value}, State}.
|
|
|
|
|
|
-handle_cast({connect, From, Ref, Host, Username, Password, Opts},
|
|
|
|
|
|
+handle_cast(Req = {{_, _}, {connect, Host, Username, Password, Opts}},
|
|
#state{queue = Queue} = State) ->
|
|
#state{queue = Queue} = State) ->
|
|
Timeout = proplists:get_value(timeout, Opts, 5000),
|
|
Timeout = proplists:get_value(timeout, Opts, 5000),
|
|
Port = proplists:get_value(port, Opts, 5432),
|
|
Port = proplists:get_value(port, Opts, 5432),
|
|
@@ -117,21 +117,21 @@ handle_cast({connect, From, Ref, Host, Username, Password, Opts},
|
|
put(password, Password),
|
|
put(password, Password),
|
|
{noreply,
|
|
{noreply,
|
|
State2#state{handler = auth,
|
|
State2#state{handler = auth,
|
|
- queue = queue:in({connect, From, Ref}, Queue),
|
|
|
|
|
|
+ queue = queue:in(Req, Queue),
|
|
async = Async}};
|
|
async = Async}};
|
|
|
|
|
|
handle_cast(stop, #state{queue = Queue} = State) ->
|
|
handle_cast(stop, #state{queue = Queue} = State) ->
|
|
%% TODO flush queue
|
|
%% TODO flush queue
|
|
{stop, normal, State};
|
|
{stop, normal, State};
|
|
|
|
|
|
-handle_cast({parse, From, Ref, Name, Sql, Types}, State) ->
|
|
|
|
|
|
+handle_cast(Req = {{_, _}, {parse, Name, Sql, Types}}, State) ->
|
|
#state{queue = Queue} = State,
|
|
#state{queue = Queue} = State,
|
|
Bin = pgsql_wire:encode_types(Types),
|
|
Bin = pgsql_wire:encode_types(Types),
|
|
send(State, $P, [Name, 0, Sql, 0, Bin]),
|
|
send(State, $P, [Name, 0, Sql, 0, Bin]),
|
|
send(State, $D, [$S, Name, 0]),
|
|
send(State, $D, [$S, Name, 0]),
|
|
send(State, $H, []),
|
|
send(State, $H, []),
|
|
S = #statement{name = Name},
|
|
S = #statement{name = Name},
|
|
- {noreply, State#state{queue = queue:in({parse, From, Ref}, Queue)}};
|
|
|
|
|
|
+ {noreply, State#state{queue = queue:in(Req, Queue)}};
|
|
|
|
|
|
handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
|
|
handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
|
|
{ok, {Addr, Port}} = inet:peername(State#state.sock),
|
|
{ok, {Addr, Port}} = inet:peername(State#state.sock),
|
|
@@ -144,10 +144,12 @@ handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
|
|
|
|
|
|
handle_info({Closed, Sock}, #state{sock = Sock} = State)
|
|
handle_info({Closed, Sock}, #state{sock = Sock} = State)
|
|
when Closed == tcp_closed; Closed == ssl_closed ->
|
|
when Closed == tcp_closed; Closed == ssl_closed ->
|
|
|
|
+ %% TODO flush queue
|
|
{stop, sock_closed, State};
|
|
{stop, sock_closed, State};
|
|
|
|
|
|
handle_info({Error, Sock, Reason}, #state{sock = Sock} = State)
|
|
handle_info({Error, Sock, Reason}, #state{sock = Sock} = State)
|
|
when Error == tcp_error; Error == ssl_error ->
|
|
when Error == tcp_error; Error == ssl_error ->
|
|
|
|
+ %% TODO flush queue
|
|
{stop, {sock_error, Reason}, State};
|
|
{stop, {sock_error, Reason}, State};
|
|
|
|
|
|
handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
|
|
handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
|
|
@@ -167,7 +169,7 @@ loop(#state{data = Data, handler = Handler} = State) ->
|
|
end.
|
|
end.
|
|
|
|
|
|
terminate(_Reason, _State) ->
|
|
terminate(_Reason, _State) ->
|
|
- %% TODO send termination msg, close socket
|
|
|
|
|
|
+ %% TODO send termination msg, close socket ??
|
|
ok.
|
|
ok.
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
@@ -175,6 +177,11 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
|
|
|
%% -- internal functions --
|
|
%% -- internal functions --
|
|
|
|
|
|
|
|
+cast(C, Command) ->
|
|
|
|
+ Ref = make_ref(),
|
|
|
|
+ gen_server:cast(C, {{self(), Ref}, Command}),
|
|
|
|
+ Ref.
|
|
|
|
+
|
|
start_ssl(S, Flag, Opts, State) ->
|
|
start_ssl(S, Flag, Opts, State) ->
|
|
ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
|
|
ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
|
|
Timeout = proplists:get_value(timeout, Opts, 5000),
|
|
Timeout = proplists:get_value(timeout, Opts, 5000),
|