|
@@ -35,7 +35,9 @@
|
|
|
queue = queue:new(),
|
|
|
async,
|
|
|
parameters = [],
|
|
|
+ statement,
|
|
|
columns,
|
|
|
+ rows = [],
|
|
|
sync_required,
|
|
|
txstatus}).
|
|
|
|
|
@@ -69,8 +71,11 @@ bind(C, Statement, PortalName, Parameters) ->
|
|
|
execute(C, Statement, PortalName, MaxRows) ->
|
|
|
cast(C, {execute, Statement, PortalName, MaxRows}).
|
|
|
|
|
|
-describe(C, Type, Name) ->
|
|
|
- cast(C, {describe, Type, Name}).
|
|
|
+describe(C, statement, Name) ->
|
|
|
+ cast(C, {describe_statement, Name});
|
|
|
+
|
|
|
+describe(C, portal, Name) ->
|
|
|
+ cast(C, {describe_portal, Name}).
|
|
|
|
|
|
close(C, Type, Name) ->
|
|
|
cast(C, {close, Type, Name}).
|
|
@@ -174,13 +179,15 @@ handle_cast(Req = {_, {execute, _, PortalName, MaxRows}}, State) ->
|
|
|
send(State, $H, []),
|
|
|
{noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
|
|
|
-handle_cast(Req = {_, {describe, Type, Name}}, State) ->
|
|
|
+handle_cast(Req = {_, {describe_statement, Name}}, State) ->
|
|
|
#state{queue = Q} = State,
|
|
|
- case Type of
|
|
|
- statement -> Type2 = $S;
|
|
|
- portal -> Type2 = $P
|
|
|
- end,
|
|
|
- send(State, $D, [Type2, Name, 0]),
|
|
|
+ send(State, $D, [$S, Name, 0]),
|
|
|
+ send(State, $H, []),
|
|
|
+ {noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
+
|
|
|
+handle_cast(Req = {_, {describe_portal, Name}}, State) ->
|
|
|
+ #state{queue = Q} = State,
|
|
|
+ send(State, $D, [$P, Name, 0]),
|
|
|
send(State, $H, []),
|
|
|
{noreply, State#state{queue = queue:in(Req, Q)}};
|
|
|
|
|
@@ -367,20 +374,28 @@ on_message({$1, <<>>}, State) ->
|
|
|
|
|
|
%% ParameterDescription
|
|
|
on_message({$t, <<_Count:?int16, Bin/binary>>}, State) ->
|
|
|
+ #state{queue = Q} = State,
|
|
|
Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
|
|
|
- notify(State, {types, Types}),
|
|
|
- {noreply, State};
|
|
|
+ Name = case queue:get(Q) of
|
|
|
+ {_, {parse, N, _, _}} -> N;
|
|
|
+ {_, {describe_statement, N}} -> N
|
|
|
+ end,
|
|
|
+ {noreply, State#state{statement = #statement{name = Name,
|
|
|
+ types = Types}}};
|
|
|
|
|
|
%% RowDescription
|
|
|
on_message({$T, <<Count:?int16, Bin/binary>>}, State) ->
|
|
|
#state{queue = Q} = State,
|
|
|
Columns = pgsql_wire:decode_columns(Count, Bin),
|
|
|
- notify(State, {columns, Columns}),
|
|
|
State2 = case request_tag(State) of
|
|
|
C when C == squery ->
|
|
|
- %% TODO drop them on completion
|
|
|
State#state{columns = Columns};
|
|
|
- C when C == parse; C == describe ->
|
|
|
+ C when C == parse; C == describe_statement ->
|
|
|
+ notify(State,
|
|
|
+ State#state.statement#statement{columns= Columns}),
|
|
|
+ State#state{statement = undefined, queue = queue:drop(Q)};
|
|
|
+ C when C == describe_portal ->
|
|
|
+ notify(State, Columns),
|
|
|
State#state{queue = queue:drop(Q)}
|
|
|
end,
|
|
|
{noreply, State2};
|
|
@@ -421,7 +436,6 @@ on_message({$3, <<>>}, State) ->
|
|
|
on_message({$D, <<_Count:?int16, Bin/binary>>}, State) ->
|
|
|
#state{queue = Q} = State,
|
|
|
Columns = case queue:get(Q) of
|
|
|
- %% TODO use colums from State
|
|
|
{_, {equery, #statement{columns = C}, _}} ->
|
|
|
C;
|
|
|
{_, {execute, #statement{columns = C}, _, _}} ->
|
|
@@ -430,8 +444,7 @@ on_message({$D, <<_Count:?int16, Bin/binary>>}, State) ->
|
|
|
State#state.columns
|
|
|
end,
|
|
|
Data = pgsql_wire:decode_data(Columns, Bin),
|
|
|
- notify(State, {data, Data}),
|
|
|
- {noreply, State};
|
|
|
+ {noreply, State#state{rows = [Data, State#state.rows]}};
|
|
|
|
|
|
%% PortalSuspended
|
|
|
on_message({$s, <<>>}, State) ->
|