|
@@ -197,6 +197,10 @@ get_parameter_internal(Name, #state{parameters = Parameters}) ->
|
|
|
init([]) ->
|
|
|
{ok, #state{}}.
|
|
|
|
|
|
+handle_call({command, Command, Args}, From, State) ->
|
|
|
+ Transport = {call, From},
|
|
|
+ command_new(Transport, Command, Args, State);
|
|
|
+
|
|
|
handle_call({get_parameter, Name}, _From, State) ->
|
|
|
{reply, {ok, get_parameter_internal(Name, State)}, State};
|
|
|
|
|
@@ -212,10 +216,7 @@ handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
|
|
|
send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
|
|
|
Repl1 = Repl#repl{last_flushed_lsn = FlushedLSN,
|
|
|
last_applied_lsn = AppliedLSN},
|
|
|
- {reply, ok, State#state{repl = Repl1}};
|
|
|
-handle_call({command, Command, Args}, From, State) ->
|
|
|
- Transport = {call, From},
|
|
|
- command_new(Transport, Command, Args, State).
|
|
|
+ {reply, ok, State#state{repl = Repl1}}.
|
|
|
|
|
|
handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
|
|
|
when ((Method == cast) or (Method == incremental)),
|
|
@@ -241,6 +242,10 @@ handle_cast(cancel, State = #state{backend = {Pid, Key},
|
|
|
end,
|
|
|
{noreply, State}.
|
|
|
|
|
|
+handle_info({DataTag, Sock, Data2}, #state{data = Data, sock = Sock} = State)
|
|
|
+ when DataTag == tcp; DataTag == ssl ->
|
|
|
+ loop(State#state{data = <<Data/binary, Data2/binary>>});
|
|
|
+
|
|
|
handle_info({Closed, Sock}, #state{sock = Sock} = State)
|
|
|
when Closed == tcp_closed; Closed == ssl_closed ->
|
|
|
{stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
|
|
@@ -254,10 +259,7 @@ handle_info({inet_reply, _, ok}, State) ->
|
|
|
{noreply, State};
|
|
|
|
|
|
handle_info({inet_reply, _, Status}, State) ->
|
|
|
- {stop, Status, flush_queue(State, {error, Status})};
|
|
|
-
|
|
|
-handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
|
|
|
- loop(State#state{data = <<Data/binary, Data2/binary>>}).
|
|
|
+ {stop, Status, flush_queue(State, {error, Status})}.
|
|
|
|
|
|
terminate(_Reason, #state{sock = undefined}) -> ok;
|
|
|
terminate(_Reason, #state{mod = gen_tcp, sock = Sock}) -> gen_tcp:close(Sock);
|