|
@@ -9,6 +9,7 @@
|
|
|
close/1,
|
|
|
get_parameter/2,
|
|
|
set_notice_receiver/2,
|
|
|
+ get_cmd_status/1,
|
|
|
cancel/1]).
|
|
|
|
|
|
-export([handle_call/3, handle_cast/2, handle_info/2]).
|
|
@@ -81,6 +82,7 @@
|
|
|
batch = [],
|
|
|
sync_required,
|
|
|
txstatus,
|
|
|
+ complete_status :: undefined | atom() | {atom(), integer()},
|
|
|
repl_last_received_lsn,
|
|
|
repl_last_flushed_lsn,
|
|
|
repl_last_applied_lsn,
|
|
@@ -105,6 +107,9 @@ set_notice_receiver(C, PidOrName) when is_pid(PidOrName);
|
|
|
is_atom(PidOrName) ->
|
|
|
gen_server:call(C, {set_async_receiver, PidOrName}, infinity).
|
|
|
|
|
|
+get_cmd_status(C) ->
|
|
|
+ gen_server:call(C, get_cmd_status, infinity).
|
|
|
+
|
|
|
cancel(S) ->
|
|
|
gen_server:cast(S, cancel).
|
|
|
|
|
@@ -127,6 +132,9 @@ handle_call({get_parameter, Name}, _From, State) ->
|
|
|
handle_call({set_async_receiver, PidOrName}, _From, #state{async = Previous} = State) ->
|
|
|
{reply, {ok, Previous}, State#state{async = PidOrName}};
|
|
|
|
|
|
+handle_call(get_cmd_status, _From, #state{complete_status = Status} = State) ->
|
|
|
+ {reply, {ok, Status}, State};
|
|
|
+
|
|
|
handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
|
|
|
#state{repl_last_received_lsn = ReceivedLSN} = State) ->
|
|
|
send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
|
|
@@ -135,14 +143,16 @@ handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
|
|
|
handle_call(Command, From, State) ->
|
|
|
#state{queue = Q} = State,
|
|
|
Req = {{call, From}, Command},
|
|
|
- command(Command, State#state{queue = queue:in(Req, Q)}).
|
|
|
+ command(Command, State#state{queue = queue:in(Req, Q),
|
|
|
+ complete_status = undefined}).
|
|
|
|
|
|
handle_cast({{Method, From, Ref}, Command} = Req, State)
|
|
|
when ((Method == cast) or (Method == incremental)),
|
|
|
is_pid(From),
|
|
|
is_reference(Ref) ->
|
|
|
#state{queue = Q} = State,
|
|
|
- command(Command, State#state{queue = queue:in(Req, Q)});
|
|
|
+ command(Command, State#state{queue = queue:in(Req, Q),
|
|
|
+ complete_status = undefined});
|
|
|
|
|
|
handle_cast(stop, State) ->
|
|
|
{stop, normal, flush_queue(State, {error, closed})};
|
|
@@ -714,8 +724,9 @@ on_message({?PORTAL_SUSPENDED, <<>>}, State) ->
|
|
|
{noreply, State2};
|
|
|
|
|
|
%% CommandComplete
|
|
|
-on_message({?COMMAND_COMPLETE, Bin}, State) ->
|
|
|
+on_message({?COMMAND_COMPLETE, Bin}, State0) ->
|
|
|
Complete = epgsql_wire:decode_complete(Bin),
|
|
|
+ State = State0#state{complete_status = Complete},
|
|
|
Command = command_tag(State),
|
|
|
Notice = {complete, Complete},
|
|
|
Rows = lists:reverse(State#state.rows),
|
|
@@ -845,4 +856,4 @@ on_message({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64, _Timestam
|
|
|
CbModule:handle_x_log_data(StartLSN, EndLSN, WALRecord, CbState),
|
|
|
{noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = EndLSN,
|
|
|
repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
|
|
|
- repl_cbstate = NewCbState}}.
|
|
|
+ repl_cbstate = NewCbState}}.
|