|
@@ -99,16 +99,62 @@ handle_call({get_parameter, Name}, _From, State) ->
|
|
|
{value, {Name, Value}} -> Value;
|
|
|
false -> Value = undefined
|
|
|
end,
|
|
|
- {reply, {ok, Value}, State}.
|
|
|
+ {reply, {ok, Value}, State};
|
|
|
|
|
|
-%% TODO request formats
|
|
|
-handle_cast({{cast, From, Ref}, Command}, State = #state{sync_required = true})
|
|
|
- when Command /= sync ->
|
|
|
- From ! {Ref, {error, sync_required}},
|
|
|
- {noreply, State};
|
|
|
+handle_call(Command, From, State) ->
|
|
|
+ #state{queue = Q} = State,
|
|
|
+ Req = {{call, From}, Command},
|
|
|
+ command(Command, State#state{queue = queue:in(Req, Q)}).
|
|
|
|
|
|
-handle_cast(Req = {_, {connect, Host, Username, Password, Opts}}, State) ->
|
|
|
+handle_cast(Req = {{From, Ref}, Command}, State) ->
|
|
|
#state{queue = Q} = State,
|
|
|
+ Req = {{cast, From, Ref}, Command},
|
|
|
+ command(Command, State#state{queue = queue:in(Req, Q)});
|
|
|
+
|
|
|
+handle_cast(stop, State) ->
|
|
|
+ {stop, normal, flush_queue(State, {error, closed})};
|
|
|
+
|
|
|
+handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
|
|
|
+ {ok, {Addr, Port}} = inet:peername(State#state.sock),
|
|
|
+ SockOpts = [{active, false}, {packet, raw}, binary],
|
|
|
+ %% TODO timeout
|
|
|
+ {ok, Sock} = gen_tcp:connect(Addr, Port, SockOpts),
|
|
|
+ Msg = <<16:?int32, 80877102:?int32, Pid:?int32, Key:?int32>>,
|
|
|
+ ok = gen_tcp:send(Sock, Msg),
|
|
|
+ gen_tcp:close(Sock),
|
|
|
+ {noreply, State}.
|
|
|
+
|
|
|
+handle_info({Closed, Sock}, #state{sock = Sock} = State)
|
|
|
+ when Closed == tcp_closed; Closed == ssl_closed ->
|
|
|
+ {stop, sock_closed, flush_queue(State, {error, sock_closed})};
|
|
|
+
|
|
|
+handle_info({Error, Sock, Reason}, #state{sock = Sock} = State)
|
|
|
+ when Error == tcp_error; Error == ssl_error ->
|
|
|
+ Why = {sock_error, Reason},
|
|
|
+ {stop, Why, flush_queue(State, {error, Why})};
|
|
|
+
|
|
|
+handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
|
|
|
+ loop(State#state{data = <<Data/binary, Data2/binary>>}).
|
|
|
+
|
|
|
+terminate(_Reason, _State) ->
|
|
|
+ %% TODO send termination msg, close socket ??
|
|
|
+ ok.
|
|
|
+
|
|
|
+code_change(_OldVsn, State, _Extra) ->
|
|
|
+ {ok, State}.
|
|
|
+
|
|
|
+%% -- internal functions --
|
|
|
+
|
|
|
+cast(C, Command) ->
|
|
|
+ Ref = make_ref(),
|
|
|
+ gen_server:cast(C, {{self(), Ref}, Command}),
|
|
|
+ Ref.
|
|
|
+
|
|
|
+command(Command, State = #state{sync_required = true})
|
|
|
+ when Command /= sync ->
|
|
|
+ {noreply, finish(State, {error, sync_required})};
|
|
|
+
|
|
|
+command({connect, Host, Username, Password, Opts}, State) ->
|
|
|
Timeout = proplists:get_value(timeout, Opts, 5000),
|
|
|
Port = proplists:get_value(port, Opts, 5432),
|
|
|
SockOpts = [{active, false}, {packet, raw}, binary, {nodelay, true}],
|
|
@@ -133,22 +179,16 @@ handle_cast(Req = {_, {connect, Host, Username, Password, Opts}}, State) ->
|
|
|
put(password, Password),
|
|
|
{noreply,
|
|
|
State2#state{handler = auth,
|
|
|
- queue = queue:in(Req, Q),
|
|
|
async = Async}};
|
|
|
|
|
|
-handle_cast(stop, State) ->
|
|
|
- {stop, normal, flush_queue(State, {error, closed})};
|
|
|
-
|
|
|
-handle_cast(Req = {_, {squery, Sql}}, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
+command({squery, Sql}, State) ->
|
|
|
send(State, $Q, [Sql, 0]),
|
|
|
- {noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
+ {noreply, State};
|
|
|
|
|
|
%% TODO add fast_equery command that doesn't need parsed statement,
|
|
|
%% uses default (text) column format,
|
|
|
%% sends Describe after Bind to get RowDescription
|
|
|
-handle_cast(Req = {_, {equery, Statement, Parameters}}, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
+command({equery, Statement, Parameters}, State) ->
|
|
|
#statement{name = StatementName, columns = Columns} = Statement,
|
|
|
Bin1 = pgsql_wire:encode_parameters(Parameters),
|
|
|
Bin2 = pgsql_wire:encode_formats(Columns),
|
|
@@ -156,94 +196,52 @@ handle_cast(Req = {_, {equery, Statement, Parameters}}, State) ->
|
|
|
send(State, $E, ["", 0, <<0:?int32>>]),
|
|
|
send(State, $C, [$S, "", 0]),
|
|
|
send(State, $S, []),
|
|
|
- {noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
+ {noreply, State};
|
|
|
|
|
|
-handle_cast(Req = {_, {parse, Name, Sql, Types}}, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
+command({parse, Name, Sql, Types}, State) ->
|
|
|
Bin = pgsql_wire:encode_types(Types),
|
|
|
send(State, $P, [Name, 0, Sql, 0, Bin]),
|
|
|
send(State, $D, [$S, Name, 0]),
|
|
|
send(State, $H, []),
|
|
|
- {noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
+ {noreply, State};
|
|
|
|
|
|
-handle_cast(Req = {_, {bind, Statement, PortalName, Parameters}}, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
+command({bind, Statement, PortalName, Parameters}, State) ->
|
|
|
#statement{name = StatementName, columns = Columns, types = Types} = Statement,
|
|
|
Typed_Parameters = lists:zip(Types, Parameters),
|
|
|
Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
|
|
|
Bin2 = pgsql_wire:encode_formats(Columns),
|
|
|
send(State, $B, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
|
|
|
send(State, $H, []),
|
|
|
- {noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
+ {noreply, State};
|
|
|
|
|
|
-handle_cast(Req = {_, {execute, _, PortalName, MaxRows}}, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
+%% TODO unused parameter?
|
|
|
+command({execute, _, PortalName, MaxRows}, State) ->
|
|
|
send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
|
|
|
send(State, $H, []),
|
|
|
- {noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
+ {noreply, State};
|
|
|
|
|
|
-handle_cast(Req = {_, {describe_statement, Name}}, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
+command({describe_statement, Name}, State) ->
|
|
|
send(State, $D, [$S, Name, 0]),
|
|
|
send(State, $H, []),
|
|
|
- {noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
+ {noreply, State};
|
|
|
|
|
|
-handle_cast(Req = {_, {describe_portal, Name}}, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
+command({describe_portal, Name}, State) ->
|
|
|
send(State, $D, [$P, Name, 0]),
|
|
|
send(State, $H, []),
|
|
|
- {noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
+ {noreply, State};
|
|
|
|
|
|
-handle_cast(Req = {_, {close, Type, Name}}, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
+command({close, Type, Name}, State) ->
|
|
|
case Type of
|
|
|
statement -> Type2 = $S;
|
|
|
portal -> Type2 = $P
|
|
|
end,
|
|
|
send(State, $C, [Type2, Name, 0]),
|
|
|
send(State, $H, []),
|
|
|
- {noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
+ {noreply, State};
|
|
|
|
|
|
-handle_cast(Req = {_, sync}, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
+command(sync, State) ->
|
|
|
send(State, $S, []),
|
|
|
- {noreply, State#state{queue = queue:in(Req, Q), sync_required = false}};
|
|
|
-
|
|
|
-handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
|
|
|
- {ok, {Addr, Port}} = inet:peername(State#state.sock),
|
|
|
- SockOpts = [{active, false}, {packet, raw}, binary],
|
|
|
- %% TODO timeout
|
|
|
- {ok, Sock} = gen_tcp:connect(Addr, Port, SockOpts),
|
|
|
- Msg = <<16:?int32, 80877102:?int32, Pid:?int32, Key:?int32>>,
|
|
|
- ok = gen_tcp:send(Sock, Msg),
|
|
|
- gen_tcp:close(Sock),
|
|
|
- {noreply, State}.
|
|
|
-
|
|
|
-handle_info({Closed, Sock}, #state{sock = Sock} = State)
|
|
|
- when Closed == tcp_closed; Closed == ssl_closed ->
|
|
|
- {stop, sock_closed, flush_queue(State, {error, sock_closed})};
|
|
|
-
|
|
|
-handle_info({Error, Sock, Reason}, #state{sock = Sock} = State)
|
|
|
- when Error == tcp_error; Error == ssl_error ->
|
|
|
- Why = {sock_error, Reason},
|
|
|
- {stop, Why, flush_queue(State, {error, Why})};
|
|
|
-
|
|
|
-handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
|
|
|
- loop(State#state{data = <<Data/binary, Data2/binary>>}).
|
|
|
-
|
|
|
-terminate(_Reason, _State) ->
|
|
|
- %% TODO send termination msg, close socket ??
|
|
|
- ok.
|
|
|
-
|
|
|
-code_change(_OldVsn, State, _Extra) ->
|
|
|
- {ok, State}.
|
|
|
-
|
|
|
-%% -- internal functions --
|
|
|
-
|
|
|
-cast(C, Command) ->
|
|
|
- Ref = make_ref(),
|
|
|
- gen_server:cast(C, {{cast, self(), Ref}, Command}),
|
|
|
- Ref.
|
|
|
+ {noreply, State#state{sync_required = false}}.
|
|
|
|
|
|
start_ssl(S, Flag, Opts, State) ->
|
|
|
ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
|