|
@@ -189,13 +189,13 @@ handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
|
|
|
{reply, ok, State#state{repl = Repl1}};
|
|
|
handle_call({command, Command, Args}, From, State) ->
|
|
|
Transport = {call, From},
|
|
|
- {noreply, command_new(Transport, Command, Args, State)}.
|
|
|
+ command_new(Transport, Command, Args, State).
|
|
|
|
|
|
handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
|
|
|
when ((Method == cast) or (Method == incremental)),
|
|
|
is_pid(From),
|
|
|
is_reference(Ref) ->
|
|
|
- {noreply, command_new(Transport, Command, Args, State)};
|
|
|
+ command_new(Transport, Command, Args, State);
|
|
|
|
|
|
handle_cast(stop, State) ->
|
|
|
{stop, normal, flush_queue(State, {error, closed})};
|
|
@@ -241,18 +241,32 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
|
|
|
%% -- internal functions --
|
|
|
|
|
|
+-spec command_new(transport(), epgsql_command:command(), any(), #state{}) ->
|
|
|
+ Result when
|
|
|
+ Result :: {noreply, #state{}}
|
|
|
+ | {stop, Reason :: any(), #state{}}.
|
|
|
command_new(Transport, Command, Args, State) ->
|
|
|
CmdState = Command:init(Args),
|
|
|
command_exec(Transport, Command, CmdState, State).
|
|
|
|
|
|
+-spec command_exec(transport(), epgsql_command:command(), any(), #state{}) ->
|
|
|
+ Result when
|
|
|
+ Result :: {noreply, #state{}}
|
|
|
+ | {stop, Reason :: any(), #state{}}.
|
|
|
command_exec(Transport, Command, _, State = #state{sync_required = true})
|
|
|
when Command /= epgsql_cmd_sync ->
|
|
|
- finish(State#state{current_cmd = Command,
|
|
|
- current_cmd_transport = Transport},
|
|
|
- {error, sync_required});
|
|
|
+ {noreply,
|
|
|
+ finish(State#state{current_cmd = Command,
|
|
|
+ current_cmd_transport = Transport},
|
|
|
+ {error, sync_required})};
|
|
|
command_exec(Transport, Command, CmdState, State) ->
|
|
|
- {ok, State1, CmdState1} = Command:execute(State, CmdState),
|
|
|
- command_enqueue(Transport, Command, CmdState1, State1).
|
|
|
+ case Command:execute(State, CmdState) of
|
|
|
+ {ok, State1, CmdState1} ->
|
|
|
+ {noreply, command_enqueue(Transport, Command, CmdState1, State1)};
|
|
|
+ {stop, StopReason, Response, State1} ->
|
|
|
+ reply(Transport, Response, Response),
|
|
|
+ {stop, StopReason, State1}
|
|
|
+ end.
|
|
|
|
|
|
command_enqueue(Transport, Command, CmdState, #state{current_cmd = undefined} = State) ->
|
|
|
State#state{current_cmd = Command,
|
|
@@ -281,8 +295,8 @@ command_handle_message(Msg, Payload,
|
|
|
{noreply, State1#state{current_cmd_state = CmdState1}};
|
|
|
{requeue, State1, CmdState1} ->
|
|
|
Transport = State1#state.current_cmd_transport,
|
|
|
- {noreply, command_exec(Transport, Command, CmdState1,
|
|
|
- State1#state{current_cmd = undefined})};
|
|
|
+ command_exec(Transport, Command, CmdState1,
|
|
|
+ State1#state{current_cmd = undefined});
|
|
|
{stop, Reason, Response, State1} ->
|
|
|
{stop, Reason, finish(State1, Response)};
|
|
|
{sync_required, Why} ->
|
|
@@ -368,16 +382,16 @@ finish(State, Result) ->
|
|
|
finish(State, Result, Result).
|
|
|
|
|
|
finish(State = #state{current_cmd_transport = Transport}, Notice, Result) ->
|
|
|
- case Transport of
|
|
|
- {cast, From, Ref} ->
|
|
|
- From ! {self(), Ref, Result};
|
|
|
- {incremental, From, Ref} ->
|
|
|
- From ! {self(), Ref, Notice};
|
|
|
- {call, From} ->
|
|
|
- gen_server:reply(From, Result)
|
|
|
- end,
|
|
|
+ reply(Transport, Notice, Result),
|
|
|
command_next(State).
|
|
|
|
|
|
+reply({cast, From, Ref}, _, Result) ->
|
|
|
+ From ! {self(), Ref, Result};
|
|
|
+reply({incremental, From, Ref}, Notice, _) ->
|
|
|
+ From ! {self(), Ref, Notice};
|
|
|
+reply({call, From}, _, Result) ->
|
|
|
+ gen_server:reply(From, Result).
|
|
|
+
|
|
|
add_result(#state{results = Results, current_cmd_transport = Transport} = State, Notice, Result) ->
|
|
|
Results2 = case Transport of
|
|
|
{incremental, From, Ref} ->
|