|
@@ -30,6 +30,10 @@
|
|
|
%%% some conflicting low-level commands (such as `parse', `bind', `execute') are
|
|
|
%%% executed in a wrong order. In this case server and epgsql states become out of
|
|
|
%%% sync and {@link epgsql_cmd_sync} have to be executed in order to recover.
|
|
|
+%%%
|
|
|
+%%% {@link epgsql_cmd_copy_from_stdin} and {@link epgsql_cmd_start_replication} switches the
|
|
|
+%%% "state machine" of connection process to a special "COPY mode" subprotocol.
|
|
|
+%%% See [https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY].
|
|
|
%%% @see epgsql_cmd_connect. epgsql_cmd_connect for network connection and authentication setup
|
|
|
%%% @end
|
|
|
%%% Copyright (C) 2009 - Will Glozer. All rights reserved.
|
|
@@ -46,25 +50,28 @@
|
|
|
get_parameter/2,
|
|
|
set_notice_receiver/2,
|
|
|
get_cmd_status/1,
|
|
|
- cancel/1]).
|
|
|
+ cancel/1,
|
|
|
+ copy_send_rows/3,
|
|
|
+ standby_status_update/3]).
|
|
|
|
|
|
--export([handle_call/3, handle_cast/2, handle_info/2]).
|
|
|
+-export([handle_call/3, handle_cast/2, handle_info/2, format_status/2]).
|
|
|
-export([init/1, code_change/3, terminate/2]).
|
|
|
|
|
|
%% loop callback
|
|
|
--export([on_message/3, on_replication/3]).
|
|
|
+-export([on_message/3, on_replication/3, on_copy_from_stdin/3]).
|
|
|
|
|
|
%% Comand's APIs
|
|
|
-export([set_net_socket/3, init_replication_state/1, set_attr/3, get_codec/1,
|
|
|
get_rows/1, get_results/1, notify/2, send/2, send/3, send_multi/2,
|
|
|
get_parameter_internal/2,
|
|
|
- get_replication_state/1, set_packet_handler/2]).
|
|
|
+ get_subproto_state/1, set_packet_handler/2]).
|
|
|
|
|
|
-export_type([transport/0, pg_sock/0, error/0]).
|
|
|
|
|
|
-include("epgsql.hrl").
|
|
|
-include("protocol.hrl").
|
|
|
-include("epgsql_replication.hrl").
|
|
|
+-include("epgsql_copy.hrl").
|
|
|
|
|
|
-type transport() :: {call, any()}
|
|
|
| {cast, pid(), reference()}
|
|
@@ -72,6 +79,7 @@
|
|
|
|
|
|
-type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
|
|
|
-type repl_state() :: #repl{}.
|
|
|
+-type copy_state() :: #copy{}.
|
|
|
|
|
|
-type error() :: {error, sync_required | closed | sock_closed | sock_error}.
|
|
|
|
|
@@ -79,7 +87,7 @@
|
|
|
sock :: tcp_socket() | ssl:sslsocket() | undefined,
|
|
|
data = <<>>,
|
|
|
backend :: {Pid :: integer(), Key :: integer()} | undefined,
|
|
|
- handler = on_message :: on_message | on_replication | undefined,
|
|
|
+ handler = on_message :: on_message | on_replication | on_copy_from_stdin | undefined,
|
|
|
codec :: epgsql_binary:codec() | undefined,
|
|
|
queue = queue:new() :: queue:queue({epgsql_command:command(), any(), transport()}),
|
|
|
current_cmd :: epgsql_command:command() | undefined,
|
|
@@ -87,16 +95,22 @@
|
|
|
current_cmd_transport :: transport() | undefined,
|
|
|
async :: undefined | atom() | pid(),
|
|
|
parameters = [] :: [{Key :: binary(), Value :: binary()}],
|
|
|
- rows = [] :: [tuple()],
|
|
|
+ rows = [] :: [tuple()] | information_redacted,
|
|
|
results = [],
|
|
|
sync_required :: boolean() | undefined,
|
|
|
txstatus :: byte() | undefined, % $I | $T | $E,
|
|
|
complete_status :: atom() | {atom(), integer()} | undefined,
|
|
|
- repl :: repl_state() | undefined,
|
|
|
+ subproto_state :: repl_state() | copy_state() | undefined,
|
|
|
connect_opts :: epgsql:connect_opts() | undefined}).
|
|
|
|
|
|
-opaque pg_sock() :: #state{}.
|
|
|
|
|
|
+-ifndef(OTP_RELEASE). % pre-OTP21
|
|
|
+-define(WITH_STACKTRACE(T, R, S), T:R -> S = erlang:get_stacktrace(), ).
|
|
|
+-else.
|
|
|
+-define(WITH_STACKTRACE(T, R, S), T:R:S ->).
|
|
|
+-endif.
|
|
|
+
|
|
|
%% -- client interface --
|
|
|
|
|
|
start_link() ->
|
|
@@ -131,6 +145,12 @@ get_cmd_status(C) ->
|
|
|
cancel(S) ->
|
|
|
gen_server:cast(S, cancel).
|
|
|
|
|
|
+copy_send_rows(C, Rows, Timeout) ->
|
|
|
+ gen_server:call(C, {copy_send_rows, Rows}, Timeout).
|
|
|
+
|
|
|
+standby_status_update(C, FlushedLSN, AppliedLSN) ->
|
|
|
+ gen_server:call(C, {standby_status_update, FlushedLSN, AppliedLSN}).
|
|
|
+
|
|
|
|
|
|
%% -- command APIs --
|
|
|
|
|
@@ -145,7 +165,7 @@ set_net_socket(Mod, Socket, State) ->
|
|
|
|
|
|
-spec init_replication_state(pg_sock()) -> pg_sock().
|
|
|
init_replication_state(State) ->
|
|
|
- State#state{repl = #repl{}}.
|
|
|
+ State#state{subproto_state = #repl{}}.
|
|
|
|
|
|
-spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
|
|
|
set_attr(backend, {_Pid, _Key} = Backend, State) ->
|
|
@@ -158,8 +178,8 @@ set_attr(codec, Codec, State) ->
|
|
|
State#state{codec = Codec};
|
|
|
set_attr(sync_required, Value, State) ->
|
|
|
State#state{sync_required = Value};
|
|
|
-set_attr(replication_state, Value, State) ->
|
|
|
- State#state{repl = Value};
|
|
|
+set_attr(subproto_state, Value, State) ->
|
|
|
+ State#state{subproto_state = Value};
|
|
|
set_attr(connect_opts, ConnectOpts, State) ->
|
|
|
State#state{connect_opts = ConnectOpts}.
|
|
|
|
|
@@ -172,9 +192,9 @@ set_packet_handler(Handler, State) ->
|
|
|
get_codec(#state{codec = Codec}) ->
|
|
|
Codec.
|
|
|
|
|
|
--spec get_replication_state(pg_sock()) -> repl_state().
|
|
|
-get_replication_state(#state{repl = Repl}) ->
|
|
|
- Repl.
|
|
|
+-spec get_subproto_state(pg_sock()) -> repl_state() | copy_state() | undefined.
|
|
|
+get_subproto_state(#state{subproto_state = SubState}) ->
|
|
|
+ SubState.
|
|
|
|
|
|
-spec get_rows(pg_sock()) -> [tuple()].
|
|
|
get_rows(#state{rows = Rows}) ->
|
|
@@ -197,6 +217,10 @@ get_parameter_internal(Name, #state{parameters = Parameters}) ->
|
|
|
init([]) ->
|
|
|
{ok, #state{}}.
|
|
|
|
|
|
+handle_call({command, Command, Args}, From, State) ->
|
|
|
+ Transport = {call, From},
|
|
|
+ command_new(Transport, Command, Args, State);
|
|
|
+
|
|
|
handle_call({get_parameter, Name}, _From, State) ->
|
|
|
{reply, {ok, get_parameter_internal(Name, State)}, State};
|
|
|
|
|
@@ -208,14 +232,16 @@ handle_call(get_cmd_status, _From, #state{complete_status = Status} = State) ->
|
|
|
|
|
|
handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
|
|
|
#state{handler = on_replication,
|
|
|
- repl = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
|
|
|
+ subproto_state = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
|
|
|
send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
|
|
|
Repl1 = Repl#repl{last_flushed_lsn = FlushedLSN,
|
|
|
last_applied_lsn = AppliedLSN},
|
|
|
- {reply, ok, State#state{repl = Repl1}};
|
|
|
-handle_call({command, Command, Args}, From, State) ->
|
|
|
- Transport = {call, From},
|
|
|
- command_new(Transport, Command, Args, State).
|
|
|
+ {reply, ok, State#state{subproto_state = Repl1}};
|
|
|
+
|
|
|
+handle_call({copy_send_rows, Rows}, _From,
|
|
|
+ #state{handler = Handler, subproto_state = CopyState} = State) ->
|
|
|
+ Response = handle_copy_send_rows(Rows, Handler, CopyState, State),
|
|
|
+ {reply, Response, State}.
|
|
|
|
|
|
handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
|
|
|
when ((Method == cast) or (Method == incremental)),
|
|
@@ -241,6 +267,10 @@ handle_cast(cancel, State = #state{backend = {Pid, Key},
|
|
|
end,
|
|
|
{noreply, State}.
|
|
|
|
|
|
+handle_info({DataTag, Sock, Data2}, #state{data = Data, sock = Sock} = State)
|
|
|
+ when DataTag == tcp; DataTag == ssl ->
|
|
|
+ loop(State#state{data = <<Data/binary, Data2/binary>>});
|
|
|
+
|
|
|
handle_info({Closed, Sock}, #state{sock = Sock} = State)
|
|
|
when Closed == tcp_closed; Closed == ssl_closed ->
|
|
|
{stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
|
|
@@ -256,8 +286,10 @@ handle_info({inet_reply, _, ok}, State) ->
|
|
|
handle_info({inet_reply, _, Status}, State) ->
|
|
|
{stop, Status, flush_queue(State, {error, Status})};
|
|
|
|
|
|
-handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
|
|
|
- loop(State#state{data = <<Data/binary, Data2/binary>>}).
|
|
|
+handle_info({io_request, From, ReplyAs, Request}, State) ->
|
|
|
+ Response = handle_io_request(Request, State),
|
|
|
+ io_reply(Response, From, ReplyAs),
|
|
|
+ {noreply, State}.
|
|
|
|
|
|
terminate(_Reason, #state{sock = undefined}) -> ok;
|
|
|
terminate(_Reason, #state{mod = gen_tcp, sock = Sock}) -> gen_tcp:close(Sock);
|
|
@@ -266,6 +298,13 @@ terminate(_Reason, #state{mod = ssl, sock = Sock}) -> ssl:close(Sock).
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
+format_status(normal, [_PDict, State=#state{}]) ->
|
|
|
+ [{data, [{"State", State}]}];
|
|
|
+format_status(terminate, [_PDict, State]) ->
|
|
|
+ %% Do not format the rows attribute when process terminates abnormally
|
|
|
+ %% but allow it when is a sys:get_status/1.2
|
|
|
+ State#state{rows = information_redacted}.
|
|
|
+
|
|
|
%% -- internal functions --
|
|
|
|
|
|
-spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
|
|
@@ -398,7 +437,7 @@ do_send(gen_tcp, Sock, Bin) ->
|
|
|
do_send(ssl, Sock, Bin) ->
|
|
|
ssl:send(Sock, Bin).
|
|
|
|
|
|
-loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
|
|
|
+loop(#state{data = Data, handler = Handler, subproto_state = Repl} = State) ->
|
|
|
case epgsql_wire:decode_message(Data) of
|
|
|
{Type, Payload, Tail} ->
|
|
|
case ?MODULE:Handler(Type, Payload, State#state{data = Tail}) of
|
|
@@ -409,14 +448,16 @@ loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
|
|
|
end;
|
|
|
_ ->
|
|
|
%% in replication mode send feedback after each batch of messages
|
|
|
- case (Repl =/= undefined) andalso (Repl#repl.feedback_required) of
|
|
|
+ case Handler == on_replication
|
|
|
+ andalso (Repl =/= undefined)
|
|
|
+ andalso (Repl#repl.feedback_required) of
|
|
|
true ->
|
|
|
#repl{last_received_lsn = LastReceivedLSN,
|
|
|
last_flushed_lsn = LastFlushedLSN,
|
|
|
last_applied_lsn = LastAppliedLSN} = Repl,
|
|
|
send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(
|
|
|
LastReceivedLSN, LastFlushedLSN, LastAppliedLSN)),
|
|
|
- {noreply, State#state{repl = Repl#repl{feedback_required = false}}};
|
|
|
+ {noreply, State#state{subproto_state = Repl#repl{feedback_required = false}}};
|
|
|
_ ->
|
|
|
{noreply, State}
|
|
|
end
|
|
@@ -486,6 +527,74 @@ flush_queue(#state{current_cmd = undefined} = State, _) ->
|
|
|
flush_queue(State, Error) ->
|
|
|
flush_queue(finish(State, Error), Error).
|
|
|
|
|
|
+%% @doc Handler for IO protocol version of COPY FROM STDIN
|
|
|
+%%
|
|
|
+%% COPY FROM STDIN is implemented as Erlang
|
|
|
+%% <a href="https://erlang.org/doc/apps/stdlib/io_protocol.html">io protocol</a>.
|
|
|
+handle_io_request(_, #state{handler = Handler}) when Handler =/= on_copy_from_stdin ->
|
|
|
+ %% Received IO request when `epgsql_cmd_copy_from_stdin' haven't yet been called or it was
|
|
|
+ %% terminated with error and already sent `ReadyForQuery'
|
|
|
+ {error, not_in_copy_mode};
|
|
|
+handle_io_request(_, #state{subproto_state = #copy{last_error = Err}}) when Err =/= undefined ->
|
|
|
+ {error, Err};
|
|
|
+handle_io_request({put_chars, Encoding, Chars}, State) ->
|
|
|
+ send(State, ?COPY_DATA, encode_chars(Encoding, Chars));
|
|
|
+handle_io_request({put_chars, Encoding, Mod, Fun, Args}, State) ->
|
|
|
+ try apply(Mod, Fun, Args) of
|
|
|
+ Chars when is_binary(Chars);
|
|
|
+ is_list(Chars) ->
|
|
|
+ handle_io_request({put_chars, Encoding, Chars}, State);
|
|
|
+ Other ->
|
|
|
+ {error, {fun_return_not_characters, Other}}
|
|
|
+ catch ?WITH_STACKTRACE(T, R, S)
|
|
|
+ {error, {fun_exception, {T, R, S}}}
|
|
|
+ end;
|
|
|
+handle_io_request({setopts, _}, _State) ->
|
|
|
+ {error, request};
|
|
|
+handle_io_request(getopts, _State) ->
|
|
|
+ {error, request};
|
|
|
+handle_io_request({requests, Requests}, State) ->
|
|
|
+ try_requests(Requests, State, ok).
|
|
|
+
|
|
|
+try_requests([Req | Requests], State, _) ->
|
|
|
+ case handle_io_request(Req, State) of
|
|
|
+ {error, _} = Err ->
|
|
|
+ Err;
|
|
|
+ Other ->
|
|
|
+ try_requests(Requests, State, Other)
|
|
|
+ end;
|
|
|
+try_requests([], _, LastRes) ->
|
|
|
+ LastRes.
|
|
|
+
|
|
|
+io_reply(Result, From, ReplyAs) ->
|
|
|
+ From ! {io_reply, ReplyAs, Result}.
|
|
|
+
|
|
|
+%% @doc Handler for `copy_send_rows' API
|
|
|
+%%
|
|
|
+%% Only supports binary protocol right now.
|
|
|
+%% But, in theory, can be used for text / csv formats as well, but we would need to add
|
|
|
+%% some more callbacks to `epgsql_type' behaviour (eg, `encode_text')
|
|
|
+handle_copy_send_rows(_Rows, Handler, _CopyState, _State) when Handler =/= on_copy_from_stdin ->
|
|
|
+ {error, not_in_copy_mode};
|
|
|
+handle_copy_send_rows(_, _, #copy{format = Format}, _) when Format =/= binary ->
|
|
|
+ %% copy_send_rows only supports "binary" format
|
|
|
+ {error, not_binary_format};
|
|
|
+handle_copy_send_rows(_, _, #copy{last_error = LastError}, _) when LastError =/= undefined ->
|
|
|
+ %% server already reported error in data stream asynchronously
|
|
|
+ {error, LastError};
|
|
|
+handle_copy_send_rows(Rows, _, #copy{binary_types = Types}, State) ->
|
|
|
+ Data = [epgsql_wire:encode_copy_row(Values, Types, get_codec(State))
|
|
|
+ || Values <- Rows],
|
|
|
+ ok = send(State, ?COPY_DATA, Data).
|
|
|
+
|
|
|
+encode_chars(_, Bin) when is_binary(Bin) ->
|
|
|
+ Bin;
|
|
|
+encode_chars(unicode, Chars) when is_list(Chars) ->
|
|
|
+ unicode:characters_to_binary(Chars);
|
|
|
+encode_chars(latin1, Chars) when is_list(Chars) ->
|
|
|
+ unicode:characters_to_binary(Chars, latin1).
|
|
|
+
|
|
|
+
|
|
|
to_binary(B) when is_binary(B) -> B;
|
|
|
to_binary(L) when is_list(L) -> list_to_binary(L).
|
|
|
|
|
@@ -547,12 +656,31 @@ on_message(?NOTIFICATION, <<Pid:?int32, Strings/binary>>, State) ->
|
|
|
on_message(Msg, Payload, State) ->
|
|
|
command_handle_message(Msg, Payload, State).
|
|
|
|
|
|
+%% @doc Handle "copy subprotocol" for COPY .. FROM STDIN
|
|
|
+%%
|
|
|
+%% Activated by `epgsql_cmd_copy_from_stdin', deactivated by `epgsql_cmd_copy_done' or error
|
|
|
+on_copy_from_stdin(?READY_FOR_QUERY, <<Status:8>>,
|
|
|
+ #state{subproto_state = #copy{last_error = Err,
|
|
|
+ initiator = Pid}} = State) when Err =/= undefined ->
|
|
|
+ %% Reporting error from here and not from ?ERROR so it's easier to be in sync state
|
|
|
+ Pid ! {epgsql, self(), {error, Err}},
|
|
|
+ {noreply, State#state{subproto_state = undefined,
|
|
|
+ handler = on_message,
|
|
|
+ txstatus = Status}};
|
|
|
+on_copy_from_stdin(?ERROR, Err, #state{subproto_state = SubState} = State) ->
|
|
|
+ Reason = epgsql_wire:decode_error(Err),
|
|
|
+ {noreply, State#state{subproto_state = SubState#copy{last_error = Reason}}};
|
|
|
+on_copy_from_stdin(M, Data, Sock) when M == ?NOTICE;
|
|
|
+ M == ?NOTIFICATION;
|
|
|
+ M == ?PARAMETER_STATUS ->
|
|
|
+ on_message(M, Data, Sock).
|
|
|
+
|
|
|
|
|
|
%% CopyData for Replication mode
|
|
|
on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
|
|
|
- #state{repl = #repl{last_flushed_lsn = LastFlushedLSN,
|
|
|
- last_applied_lsn = LastAppliedLSN,
|
|
|
- align_lsn = AlignLsn} = Repl} = State) ->
|
|
|
+ #state{subproto_state = #repl{last_flushed_lsn = LastFlushedLSN,
|
|
|
+ last_applied_lsn = LastAppliedLSN,
|
|
|
+ align_lsn = AlignLsn} = Repl} = State) ->
|
|
|
Repl1 =
|
|
|
case ReplyRequired of
|
|
|
1 when AlignLsn ->
|
|
@@ -569,14 +697,14 @@ on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestam
|
|
|
Repl#repl{feedback_required = true,
|
|
|
last_received_lsn = LSN}
|
|
|
end,
|
|
|
- {noreply, State#state{repl = Repl1}};
|
|
|
+ {noreply, State#state{subproto_state = Repl1}};
|
|
|
|
|
|
%% CopyData for Replication mode
|
|
|
on_replication(?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
|
|
|
_Timestamp:?int64, WALRecord/binary>>,
|
|
|
- #state{repl = Repl} = State) ->
|
|
|
+ #state{subproto_state = Repl} = State) ->
|
|
|
Repl1 = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
|
|
|
- {noreply, State#state{repl = Repl1}};
|
|
|
+ {noreply, State#state{subproto_state = Repl1}};
|
|
|
on_replication(?ERROR, Err, State) ->
|
|
|
Reason = epgsql_wire:decode_error(Err),
|
|
|
{stop, {error, Reason}, State};
|