|
@@ -246,6 +246,8 @@ finish(State = #state{queue = Q}, Notice, Result) ->
|
|
|
case queue:get(Q) of
|
|
|
{{cast, From, Ref}, _} ->
|
|
|
From ! {Ref, Result};
|
|
|
+ {{incremental, From, Ref}, _} ->
|
|
|
+ From ! {Ref, Notice};
|
|
|
{{call, From}, _} ->
|
|
|
gen_server:reply(From, Result)
|
|
|
end,
|
|
@@ -255,24 +257,44 @@ finish(State = #state{queue = Q}, Notice, Result) ->
|
|
|
rows = [],
|
|
|
results = []}.
|
|
|
|
|
|
-add_result(State = #state{results = Results}, Notice, Result) ->
|
|
|
+add_result(State = #state{queue = Q, results = Results}, Notice, Result) ->
|
|
|
+ Results2 = case queue:get(Q) of
|
|
|
+ {{incremental, From, Ref}, _} ->
|
|
|
+ From ! {Ref, Notice},
|
|
|
+ Results;
|
|
|
+ _ ->
|
|
|
+ [Result | Results]
|
|
|
+ end,
|
|
|
State#state{types = [],
|
|
|
columns = [],
|
|
|
rows = [],
|
|
|
- results = [Result | Results]}.
|
|
|
+ results = Results2}.
|
|
|
|
|
|
-add_row(State = #state{rows = Rows}, Data) ->
|
|
|
- State#state{rows = [Data | Rows]}.
|
|
|
+add_row(State = #state{queue = Q, rows = Rows}, Data) ->
|
|
|
+ Rows2 = case queue:get(Q) of
|
|
|
+ {{incremental, From, Ref}, _} ->
|
|
|
+ From ! {Ref, {data, Data}},
|
|
|
+ Rows;
|
|
|
+ _ ->
|
|
|
+ [Data | Rows]
|
|
|
+ end,
|
|
|
+ State#state{rows = Rows2}.
|
|
|
|
|
|
notify(State = #state{queue = Q}, Notice) ->
|
|
|
- %% TODO handle 'incremental' request
|
|
|
+ case queue:get(Q) of
|
|
|
+ {{incremental, From, Ref}, _} ->
|
|
|
+ From ! {Ref, Notice};
|
|
|
+ _ ->
|
|
|
+ ignore
|
|
|
+ end,
|
|
|
State.
|
|
|
|
|
|
-notify_async(#state{async = Pid}, Msg) ->
|
|
|
+notify_async(State = #state{async = Pid}, Msg) ->
|
|
|
case is_pid(Pid) of
|
|
|
true -> Pid ! {pgsql, self(), Msg};
|
|
|
false -> false
|
|
|
- end.
|
|
|
+ end,
|
|
|
+ State.
|
|
|
|
|
|
command_tag(#state{queue = Q}) ->
|
|
|
{_, Req} = queue:get(Q),
|
|
@@ -534,8 +556,8 @@ on_message(Error = {error, _}, State) ->
|
|
|
|
|
|
%% NoticeResponse
|
|
|
on_message({$N, Data}, State) ->
|
|
|
- notify_async(State, {notice, pgsql_wire:decode_error(Data)}),
|
|
|
- {noreply, State};
|
|
|
+ State2 = notify_async(State, {notice, pgsql_wire:decode_error(Data)}),
|
|
|
+ {noreply, State2};
|
|
|
|
|
|
%% ParameterStatus
|
|
|
on_message({$S, Data}, State) ->
|
|
@@ -550,5 +572,5 @@ on_message({$A, <<Pid:?int32, Strings/binary>>}, State) ->
|
|
|
[Channel, Payload] -> ok;
|
|
|
[Channel] -> Payload = <<>>
|
|
|
end,
|
|
|
- notify_async(State, {notification, Channel, Pid, Payload}),
|
|
|
- {noreply, State}.
|
|
|
+ State2 = notify_async(State, {notification, Channel, Pid, Payload}),
|
|
|
+ {noreply, State2}.
|