Browse Source

Merge pull request #248 from seriyps/copy-from-stdin

COPY .. FROM STDIN
Sergey Prokhorov 4 years ago
parent
commit
e013266047

+ 8 - 2
include/protocol.hrl

@@ -41,10 +41,16 @@
 -define(PARAMETER_DESCRIPTION, $t).
 -define(PARAMETER_DESCRIPTION, $t).
 -define(ROW_DESCRIPTION, $T).
 -define(ROW_DESCRIPTION, $T).
 -define(READY_FOR_QUERY, $Z).
 -define(READY_FOR_QUERY, $Z).
--define(COPY_BOTH_RESPONSE, $W).
--define(COPY_DATA, $d).
 -define(TERMINATE, $X).
 -define(TERMINATE, $X).
 
 
+% Copy protocol
+-define(COPY_DATA, $d).
+-define(COPY_DONE, $c).
+-define(COPY_FAIL, $f).
+-define(COPY_IN_RESPONSE, $G).
+-define(COPY_OUT_RESPONSE, $H).
+-define(COPY_BOTH_RESPONSE, $W).
+
 % CopyData replication messages
 % CopyData replication messages
 -define(X_LOG_DATA, $w).
 -define(X_LOG_DATA, $w).
 -define(PRIMARY_KEEPALIVE_MESSAGE, $k).
 -define(PRIMARY_KEEPALIVE_MESSAGE, $k).

+ 47 - 0
src/commands/epgsql_cmd_copy_done.erl

@@ -0,0 +1,47 @@
+%%% @doc Tells server that the transfer of COPY data is done.
+%%%
+%%% It makes the server "commit" the data to the table and switch back to the normal
+%%% command-processing mode.
+%%%
+%%% @see epgsql_cmd_copy_from_stdin
+
+-module(epgsql_cmd_copy_done).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: {ok, Count :: non_neg_integer()}
+                  | {error, epgsql:query_error()}.
+
+%% -include("epgsql.hrl").
+-include("protocol.hrl").
+-include("../epgsql_copy.hrl").
+
+init(_) ->
+    [].
+
+execute(Sock0, St) ->
+    #copy{format = Format} = epgsql_sock:get_subproto_state(Sock0), % assert we are in copy-mode
+    Sock1 = epgsql_sock:set_packet_handler(on_message, Sock0),
+    Sock = epgsql_sock:set_attr(subproto_state, undefined, Sock1),
+    {PktType, PktData} = epgsql_wire:encode_copy_done(),
+    case Format of
+        text ->
+            {send, PktType, PktData, Sock, St};
+        binary ->
+            Pkts = [{?COPY_DATA, epgsql_wire:encode_copy_trailer()},
+                    {PktType, PktData}],
+            {send_multi, Pkts, Sock, St}
+    end.
+
+handle_message(?COMMAND_COMPLETE, Bin, Sock, St) ->
+    Complete = {copy, Count} = epgsql_wire:decode_complete(Bin),
+    {add_result, {ok, Count}, {complete, Complete}, Sock, St};
+handle_message(?ERROR, Error, Sock, St) ->
+    Result = {error, Error},
+    {add_result, Result, Result, Sock, St};
+handle_message(?READY_FOR_QUERY, _Status, Sock, _State) ->
+    [Result] = epgsql_sock:get_results(Sock),
+    {finish, Result, done, Sock};
+handle_message(_, _, _, _) ->
+    unknown.

+ 102 - 0
src/commands/epgsql_cmd_copy_from_stdin.erl

@@ -0,0 +1,102 @@
+%%% @doc Tells server to switch to "COPY-in" mode
+%%%
+%%% See [https://www.postgresql.org/docs/current/sql-copy.html].
+%%% See [https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY].
+%%%
+%%% When `Format' is `text', copy data should then be delivered using Erlang
+%%% <a href="https://erlang.org/doc/apps/stdlib/io_protocol.html">io protocol</a>.
+%%% See {@link file:write/2}, {@link io:put_chars/2}.
+%%% "End-of-data" marker `\.' at the end of TEXT or CSV data stream is not needed.
+%%%
+%%% When `Format' is `{binary, [epgsql_type()]}', recommended way to deliver data is
+%%% {@link epgsql:copy_send_rows/3}. IO-protocol can be used as well, as long as you can
+%%% do proper binary encoding of data tuples (header and trailer are sent automatically),
+%%% see [https://www.postgresql.org/docs/current/sql-copy.html#id-1.9.3.55.9.4.6].
+%%% When you don't know what are the correct type names for your columns, you could try to
+%%% construct equivalent `INSERT' or `SELECT' statement and call {@link epgsql:parse/2} command.
+%%% It will return `#statement{columns = [#column{type = TypeName}]}' with correct type names.
+%%%
+%%% {@link epgsql_cmd_copy_done} should be called in the end.
+%%%
+%%% This command should not be used with command pipelining!
+%%%
+%%% ```
+%%% > SQuery COPY ... FROM STDIN ...
+%%% < CopyInResponse
+%%% > CopyData*            -- implemented in io protocol, not here
+%%% > CopyDone | CopyFail  -- implemented in epgsql_cmd_copy_done
+%%% < CommandComplete      -- implemented in epgsql_cmd_copy_done
+%%% '''
+-module(epgsql_cmd_copy_from_stdin).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: {ok, [text | binary]} | {error, epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+-include("../epgsql_copy.hrl").
+
+-record(copy_stdin,
+        {query :: iodata(),
+         initiator :: pid(),
+         format :: {binary, [epgsql:epgsql_type()]} | text}).
+
+init({SQL, Initiator, Format}) ->
+    #copy_stdin{query = SQL, initiator = Initiator, format = Format}.
+
+execute(Sock, #copy_stdin{query = SQL, format = Format} = St) ->
+    undefined = epgsql_sock:get_subproto_state(Sock), % assert we are not in copy-mode already
+    {PktType, PktData} = epgsql_wire:encode_query(SQL),
+    case Format of
+        text ->
+            {send, PktType, PktData, Sock, St};
+        {binary, _} ->
+            Header = epgsql_wire:encode_copy_header(),
+            {send_multi, [{PktType, PktData},
+                          {?COPY_DATA, Header}], Sock, St}
+    end.
+
+%% CopyInResponse
+handle_message(?COPY_IN_RESPONSE, <<BinOrText, NumColumns:?int16, Formats/binary>>, Sock,
+               #copy_stdin{initiator = Initiator, format = RequestedFormat}) ->
+    ColumnFormats = [format_to_atom(Format) || <<Format:?int16>> <= Formats],
+    length(ColumnFormats) =:= NumColumns orelse error(invalid_copy_in_response),
+    CopyState = init_copy_state(format_to_atom(BinOrText), RequestedFormat, ColumnFormats, Initiator),
+    Sock1 = epgsql_sock:set_attr(subproto_state, CopyState, Sock),
+    Res = {ok, ColumnFormats},
+    {finish, Res, Res, epgsql_sock:set_packet_handler(on_copy_from_stdin, Sock1)};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    Result = {error, Error},
+    {sync_required, Result};
+handle_message(_, _, _, _) ->
+    unknown.
+
+init_copy_state(text, text, ColumnFormats, Initiator) ->
+    %% When BinOrText is `text', all "columns" should be `text' format as well.
+    %% See https://www.postgresql.org/docs/current/protocol-message-formats.html
+    %% CopyInResponse
+    (lists:member(binary, ColumnFormats) == false)
+        orelse error(invalid_copy_in_response),
+    #copy{initiator = Initiator, format = text};
+init_copy_state(binary, {binary, ColumnTypes}, ColumnFormats, Initiator) ->
+    %% https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
+    %% "As of the present implementation, all columns in a given COPY operation will use the same
+    %% format, but the message design does not assume this."
+    (lists:member(text, ColumnFormats) == false)
+        orelse error(invalid_copy_in_response),
+    NumColumns = length(ColumnFormats),
+    %% Eg, `epgsql:copy_from_stdin(C, "COPY tab (a, b, c) FROM STDIN WITH (FORMAT binary)", {binary, [int2, int4]})'
+    %% so the number of columns in SQL is not the same as the number of types in `binary'
+    (NumColumns == length(ColumnTypes))
+        orelse error({column_count_mismatch, ColumnTypes, NumColumns}),
+    #copy{initiator = Initiator, format = binary, binary_types = ColumnTypes};
+init_copy_state(ServerExpectedFormat, RequestedFormat, _, _Initiator) ->
+    %% Eg, `epgsql:copy_from_stdin(C, "COPY ... WITH (FORMAT text)", {binary, ...})' or
+    %% `epgsql:copy_from_stdin(C, "COPY ... WITH (FORMAT binary)", text)' or maybe PostgreSQL
+    %% got some new format epgsql is not aware of
+    error({format_mismatch, RequestedFormat, ServerExpectedFormat}).
+
+format_to_atom(0) -> text;
+format_to_atom(1) -> binary.

+ 2 - 2
src/commands/epgsql_cmd_start_replication.erl

@@ -37,7 +37,7 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
                           plugin_opts = PluginOpts, opts = Opts} = St) ->
                           plugin_opts = PluginOpts, opts = Opts} = St) ->
     %% Connection should be started with 'replication' option. Then
     %% Connection should be started with 'replication' option. Then
     %% 'replication_state' will be initialized
     %% 'replication_state' will be initialized
-    Repl = #repl{} = epgsql_sock:get_replication_state(Sock),
+    Repl = #repl{} = epgsql_sock:get_subproto_state(Sock),
     Sql1 = ["START_REPLICATION SLOT ", ReplicationSlot, " LOGICAL ", WALPosition],
     Sql1 = ["START_REPLICATION SLOT ", ReplicationSlot, " LOGICAL ", WALPosition],
     Sql2 =
     Sql2 =
         case PluginOpts of
         case PluginOpts of
@@ -57,7 +57,7 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
     Repl3 = Repl2#repl{last_flushed_lsn = LSN,
     Repl3 = Repl2#repl{last_flushed_lsn = LSN,
                        last_applied_lsn = LSN,
                        last_applied_lsn = LSN,
                        align_lsn = AlignLsn},
                        align_lsn = AlignLsn},
-    Sock2 = epgsql_sock:set_attr(replication_state, Repl3, Sock),
+    Sock2 = epgsql_sock:set_attr(subproto_state, Repl3, Sock),
                          %% handler = on_replication},
                          %% handler = on_replication},
     {PktType, PktData} = epgsql_wire:encode_query(Sql2),
     {PktType, PktData} = epgsql_wire:encode_query(Sql2),
     {send, PktType, PktData, Sock2, St}.
     {send, PktType, PktData, Sock2, St}.

+ 52 - 1
src/epgsql.erl

@@ -28,6 +28,10 @@
          with_transaction/2,
          with_transaction/2,
          with_transaction/3,
          with_transaction/3,
          sync_on_error/2,
          sync_on_error/2,
+         copy_from_stdin/2,
+         copy_from_stdin/3,
+         copy_send_rows/3,
+         copy_done/1,
          standby_status_update/3,
          standby_status_update/3,
          start_replication/5,
          start_replication/5,
          start_replication/6,
          start_replication/6,
@@ -448,11 +452,58 @@ sync_on_error(C, Error = {error, _}) ->
 sync_on_error(_C, R) ->
 sync_on_error(_C, R) ->
     R.
     R.
 
 
+%% @equiv copy_from_stdin(C, SQL, text)
+copy_from_stdin(C, SQL) ->
+    copy_from_stdin(C, SQL, text).
+
+%% @doc Switches epgsql into COPY-mode
+%%
+%% When `Format' is `text', Erlang IO-protocol should be used to transfer "raw" COPY data to the
+%% server (see, eg, `io:put_chars/2' and `file:write/2' etc).
+%%
+%% When `Format' is `{binary, Types}', {@link copy_send_rows/3} should be used instead.
+%%
+%% In case COPY-payload is invalid, asynchronous message of the form
+%% `{epgsql, connection(), {error, epgsql:query_error()}}' (similar to asynchronous notification,
+%% see {@link set_notice_receiver/2}) will be sent to the process that called `copy_from_stdin'
+%% and all the subsequent IO-protocol requests will return error.
+%% It's important to not call `copy_done' if such error is detected!
+%%
+%% @param SQL must be a `COPY ... FROM STDIN ...' statement
+%% @param Format data transfer format specification: `text' or `{binary, [epgsql_type()]}'. Must
+%%        match `WITH (FORMAT ???)' from SQL (`text' for `text'/`csv' OR `{binary, ..}' for `binary').
+%% @returns in case of success, `{ok, [text | binary]}' tuple is returned. List describes the expected
+%%        payload format for each column of input. In current implementation all the atoms in a list
+%%        will be the same and will match the atom in `Format' parameter. It may change in the future
+%%        if PostgreSQL will introduce alternative payload formats.
+-spec copy_from_stdin(connection(), sql_query(), text | {binary, [epgsql_type()]}) ->
+          epgsql_cmd_copy_from_stdin:response().
+copy_from_stdin(C, SQL, Format) ->
+    epgsql_sock:sync_command(C, epgsql_cmd_copy_from_stdin, {SQL, self(), Format}).
+
+%% @doc Send a batch of rows to `COPY .. FROM STDIN WITH (FORMAT binary)' in Erlang format
+%%
+%% Erlang values will be converted to postgres types same way as parameters of, eg, {@link equery/3}
+%% using data type specification from 3rd argument of {@link copy_from_stdin/3} (number of columns in
+%% each element of `Rows' should match the number of elements in `{binary, Types}').
+%% @param Rows might be a list of tuples or list of lists. List of lists is slightly more efficient.
+-spec copy_send_rows(connection(), [tuple() | [bind_param()]], timeout()) -> ok | {error, ErrReason} when
+      ErrReason :: not_in_copy_mode | not_binary_format | query_error().
+copy_send_rows(C, Rows, Timeout) ->
+    epgsql_sock:copy_send_rows(C, Rows, Timeout).
+
+%% @doc Tells server that the transfer of COPY data is done
+%%
+%% Stops copy-mode and returns the number of inserted rows.
+-spec copy_done(connection()) -> epgsql_cmd_copy_done:response().
+copy_done(C) ->
+    epgsql_sock:sync_command(C, epgsql_cmd_copy_done, []).
+
 -spec standby_status_update(connection(), lsn(), lsn()) -> ok.
 -spec standby_status_update(connection(), lsn(), lsn()) -> ok.
 %% @doc sends last flushed and applied WAL positions to the server in a standby status update message via
 %% @doc sends last flushed and applied WAL positions to the server in a standby status update message via
 %% given `Connection'
 %% given `Connection'
 standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
 standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
-    gen_server:call(Connection, {standby_status_update, FlushedLSN, AppliedLSN}).
+    epgsql_sock:standby_status_update(Connection, FlushedLSN, AppliedLSN).
 
 
 handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, Repl) ->
 handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, Repl) ->
     Mod:handle_x_log_data(StartLSN, EndLSN, WALRecord, Repl).
     Mod:handle_x_log_data(StartLSN, EndLSN, WALRecord, Repl).

+ 9 - 0
src/epgsql_copy.hrl

@@ -0,0 +1,9 @@
+-record(copy,
+        {
+         %% pid of the process that started the COPY. It is used to receive asynchronous error
+         %% messages when some error in data stream was detected
+         initiator :: pid(),
+         last_error :: undefined | epgsql:query_error(),
+         format :: binary | text,
+         binary_types :: [epgsql:epgsql_type()] | undefined
+        }).

+ 148 - 27
src/epgsql_sock.erl

@@ -30,6 +30,10 @@
 %%% some conflicting low-level commands (such as `parse', `bind', `execute') are
 %%% some conflicting low-level commands (such as `parse', `bind', `execute') are
 %%% executed in a wrong order. In this case server and epgsql states become out of
 %%% executed in a wrong order. In this case server and epgsql states become out of
 %%% sync and {@link epgsql_cmd_sync} have to be executed in order to recover.
 %%% sync and {@link epgsql_cmd_sync} have to be executed in order to recover.
+%%%
+%%% {@link epgsql_cmd_copy_from_stdin} and {@link epgsql_cmd_start_replication} switch the
+%%% "state machine" of the connection process to a special "COPY mode" subprotocol.
+%%% See [https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY].
 %%% @see epgsql_cmd_connect. epgsql_cmd_connect for network connection and authentication setup
 %%% @see epgsql_cmd_connect. epgsql_cmd_connect for network connection and authentication setup
 %%% @end
 %%% @end
 %%% Copyright (C) 2009 - Will Glozer.  All rights reserved.
 %%% Copyright (C) 2009 - Will Glozer.  All rights reserved.
@@ -46,25 +50,28 @@
          get_parameter/2,
          get_parameter/2,
          set_notice_receiver/2,
          set_notice_receiver/2,
          get_cmd_status/1,
          get_cmd_status/1,
-         cancel/1]).
+         cancel/1,
+         copy_send_rows/3,
+         standby_status_update/3]).
 
 
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 -export([init/1, code_change/3, terminate/2]).
 -export([init/1, code_change/3, terminate/2]).
 
 
 %% loop callback
 %% loop callback
--export([on_message/3, on_replication/3]).
+-export([on_message/3, on_replication/3, on_copy_from_stdin/3]).
 
 
 %% Command's APIs
 %% Command's APIs
 -export([set_net_socket/3, init_replication_state/1, set_attr/3, get_codec/1,
 -export([set_net_socket/3, init_replication_state/1, set_attr/3, get_codec/1,
          get_rows/1, get_results/1, notify/2, send/2, send/3, send_multi/2,
          get_rows/1, get_results/1, notify/2, send/2, send/3, send_multi/2,
          get_parameter_internal/2,
          get_parameter_internal/2,
-         get_replication_state/1, set_packet_handler/2]).
+         get_subproto_state/1, set_packet_handler/2]).
 
 
 -export_type([transport/0, pg_sock/0, error/0]).
 -export_type([transport/0, pg_sock/0, error/0]).
 
 
 -include("epgsql.hrl").
 -include("epgsql.hrl").
 -include("protocol.hrl").
 -include("protocol.hrl").
 -include("epgsql_replication.hrl").
 -include("epgsql_replication.hrl").
+-include("epgsql_copy.hrl").
 
 
 -type transport() :: {call, any()}
 -type transport() :: {call, any()}
                    | {cast, pid(), reference()}
                    | {cast, pid(), reference()}
@@ -72,6 +79,7 @@
 
 
 -type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
 -type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
 -type repl_state() :: #repl{}.
 -type repl_state() :: #repl{}.
+-type copy_state() :: #copy{}.
 
 
 -type error() :: {error, sync_required | closed | sock_closed | sock_error}.
 -type error() :: {error, sync_required | closed | sock_closed | sock_error}.
 
 
@@ -79,7 +87,7 @@
                 sock :: tcp_socket() | ssl:sslsocket() | undefined,
                 sock :: tcp_socket() | ssl:sslsocket() | undefined,
                 data = <<>>,
                 data = <<>>,
                 backend :: {Pid :: integer(), Key :: integer()} | undefined,
                 backend :: {Pid :: integer(), Key :: integer()} | undefined,
-                handler = on_message :: on_message | on_replication | undefined,
+                handler = on_message :: on_message | on_replication | on_copy_from_stdin | undefined,
                 codec :: epgsql_binary:codec() | undefined,
                 codec :: epgsql_binary:codec() | undefined,
                 queue = queue:new() :: queue:queue({epgsql_command:command(), any(), transport()}),
                 queue = queue:new() :: queue:queue({epgsql_command:command(), any(), transport()}),
                 current_cmd :: epgsql_command:command() | undefined,
                 current_cmd :: epgsql_command:command() | undefined,
@@ -92,11 +100,17 @@
                 sync_required :: boolean() | undefined,
                 sync_required :: boolean() | undefined,
                 txstatus :: byte() | undefined,  % $I | $T | $E,
                 txstatus :: byte() | undefined,  % $I | $T | $E,
                 complete_status :: atom() | {atom(), integer()} | undefined,
                 complete_status :: atom() | {atom(), integer()} | undefined,
-                repl :: repl_state() | undefined,
+                subproto_state :: repl_state() | copy_state() | undefined,
                 connect_opts :: epgsql:connect_opts() | undefined}).
                 connect_opts :: epgsql:connect_opts() | undefined}).
 
 
 -opaque pg_sock() :: #state{}.
 -opaque pg_sock() :: #state{}.
 
 
+-ifndef(OTP_RELEASE).                           % pre-OTP21
+-define(WITH_STACKTRACE(T, R, S), T:R -> S = erlang:get_stacktrace(), ).
+-else.
+-define(WITH_STACKTRACE(T, R, S), T:R:S ->).
+-endif.
+
 %% -- client interface --
 %% -- client interface --
 
 
 start_link() ->
 start_link() ->
@@ -131,6 +145,12 @@ get_cmd_status(C) ->
 cancel(S) ->
 cancel(S) ->
     gen_server:cast(S, cancel).
     gen_server:cast(S, cancel).
 
 
+copy_send_rows(C, Rows, Timeout) ->
+    gen_server:call(C, {copy_send_rows, Rows}, Timeout).
+
+standby_status_update(C, FlushedLSN, AppliedLSN) ->
+    gen_server:call(C, {standby_status_update, FlushedLSN, AppliedLSN}).
+
 
 
 %% -- command APIs --
 %% -- command APIs --
 
 
@@ -145,7 +165,7 @@ set_net_socket(Mod, Socket, State) ->
 
 
 -spec init_replication_state(pg_sock()) -> pg_sock().
 -spec init_replication_state(pg_sock()) -> pg_sock().
 init_replication_state(State) ->
 init_replication_state(State) ->
-    State#state{repl = #repl{}}.
+    State#state{subproto_state = #repl{}}.
 
 
 -spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
 -spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
 set_attr(backend, {_Pid, _Key} = Backend, State) ->
 set_attr(backend, {_Pid, _Key} = Backend, State) ->
@@ -158,8 +178,8 @@ set_attr(codec, Codec, State) ->
     State#state{codec = Codec};
     State#state{codec = Codec};
 set_attr(sync_required, Value, State) ->
 set_attr(sync_required, Value, State) ->
     State#state{sync_required = Value};
     State#state{sync_required = Value};
-set_attr(replication_state, Value, State) ->
-    State#state{repl = Value};
+set_attr(subproto_state, Value, State) ->
+    State#state{subproto_state = Value};
 set_attr(connect_opts, ConnectOpts, State) ->
 set_attr(connect_opts, ConnectOpts, State) ->
     State#state{connect_opts = ConnectOpts}.
     State#state{connect_opts = ConnectOpts}.
 
 
@@ -172,9 +192,9 @@ set_packet_handler(Handler, State) ->
 get_codec(#state{codec = Codec}) ->
 get_codec(#state{codec = Codec}) ->
     Codec.
     Codec.
 
 
--spec get_replication_state(pg_sock()) -> repl_state().
-get_replication_state(#state{repl = Repl}) ->
-    Repl.
+-spec get_subproto_state(pg_sock()) -> repl_state() | copy_state() | undefined.
+get_subproto_state(#state{subproto_state = SubState}) ->
+    SubState.
 
 
 -spec get_rows(pg_sock()) -> [tuple()].
 -spec get_rows(pg_sock()) -> [tuple()].
 get_rows(#state{rows = Rows}) ->
 get_rows(#state{rows = Rows}) ->
@@ -197,6 +217,10 @@ get_parameter_internal(Name, #state{parameters = Parameters}) ->
 init([]) ->
 init([]) ->
     {ok, #state{}}.
     {ok, #state{}}.
 
 
+handle_call({command, Command, Args}, From, State) ->
+    Transport = {call, From},
+    command_new(Transport, Command, Args, State);
+
 handle_call({get_parameter, Name}, _From, State) ->
 handle_call({get_parameter, Name}, _From, State) ->
     {reply, {ok, get_parameter_internal(Name, State)}, State};
     {reply, {ok, get_parameter_internal(Name, State)}, State};
 
 
@@ -208,14 +232,16 @@ handle_call(get_cmd_status, _From, #state{complete_status = Status} = State) ->
 
 
 handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
 handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
             #state{handler = on_replication,
             #state{handler = on_replication,
-                   repl = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
+                   subproto_state = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
     Repl1 = Repl#repl{last_flushed_lsn = FlushedLSN,
     Repl1 = Repl#repl{last_flushed_lsn = FlushedLSN,
                       last_applied_lsn = AppliedLSN},
                       last_applied_lsn = AppliedLSN},
-    {reply, ok, State#state{repl = Repl1}};
-handle_call({command, Command, Args}, From, State) ->
-    Transport = {call, From},
-    command_new(Transport, Command, Args, State).
+    {reply, ok, State#state{subproto_state = Repl1}};
+
+handle_call({copy_send_rows, Rows}, _From,
+           #state{handler = Handler, subproto_state = CopyState} = State) ->
+    Response = handle_copy_send_rows(Rows, Handler, CopyState, State),
+    {reply, Response, State}.
 
 
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
   when ((Method == cast) or (Method == incremental)),
   when ((Method == cast) or (Method == incremental)),
@@ -241,6 +267,10 @@ handle_cast(cancel, State = #state{backend = {Pid, Key},
     end,
     end,
     {noreply, State}.
     {noreply, State}.
 
 
+handle_info({DataTag, Sock, Data2}, #state{data = Data, sock = Sock} = State)
+  when DataTag == tcp; DataTag == ssl ->
+    loop(State#state{data = <<Data/binary, Data2/binary>>});
+
 handle_info({Closed, Sock}, #state{sock = Sock} = State)
 handle_info({Closed, Sock}, #state{sock = Sock} = State)
   when Closed == tcp_closed; Closed == ssl_closed ->
   when Closed == tcp_closed; Closed == ssl_closed ->
     {stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
     {stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
@@ -256,8 +286,10 @@ handle_info({inet_reply, _, ok}, State) ->
 handle_info({inet_reply, _, Status}, State) ->
 handle_info({inet_reply, _, Status}, State) ->
     {stop, Status, flush_queue(State, {error, Status})};
     {stop, Status, flush_queue(State, {error, Status})};
 
 
-handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
-    loop(State#state{data = <<Data/binary, Data2/binary>>}).
+handle_info({io_request, From, ReplyAs, Request}, State) ->
+    Response = handle_io_request(Request, State),
+    io_reply(Response, From, ReplyAs),
+    {noreply, State}.
 
 
 terminate(_Reason, #state{sock = undefined}) -> ok;
 terminate(_Reason, #state{sock = undefined}) -> ok;
 terminate(_Reason, #state{mod = gen_tcp, sock = Sock}) -> gen_tcp:close(Sock);
 terminate(_Reason, #state{mod = gen_tcp, sock = Sock}) -> gen_tcp:close(Sock);
@@ -398,7 +430,7 @@ do_send(gen_tcp, Sock, Bin) ->
 do_send(ssl, Sock, Bin) ->
 do_send(ssl, Sock, Bin) ->
     ssl:send(Sock, Bin).
     ssl:send(Sock, Bin).
 
 
-loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
+loop(#state{data = Data, handler = Handler, subproto_state = Repl} = State) ->
     case epgsql_wire:decode_message(Data) of
     case epgsql_wire:decode_message(Data) of
         {Type, Payload, Tail} ->
         {Type, Payload, Tail} ->
             case ?MODULE:Handler(Type, Payload, State#state{data = Tail}) of
             case ?MODULE:Handler(Type, Payload, State#state{data = Tail}) of
@@ -409,14 +441,16 @@ loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
             end;
             end;
         _ ->
         _ ->
             %% in replication mode send feedback after each batch of messages
             %% in replication mode send feedback after each batch of messages
-            case (Repl =/= undefined) andalso (Repl#repl.feedback_required) of
+            case Handler == on_replication
+                  andalso (Repl =/= undefined)
+                  andalso (Repl#repl.feedback_required) of
                 true ->
                 true ->
                     #repl{last_received_lsn = LastReceivedLSN,
                     #repl{last_received_lsn = LastReceivedLSN,
                           last_flushed_lsn = LastFlushedLSN,
                           last_flushed_lsn = LastFlushedLSN,
                           last_applied_lsn = LastAppliedLSN} = Repl,
                           last_applied_lsn = LastAppliedLSN} = Repl,
                     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(
                     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(
                         LastReceivedLSN, LastFlushedLSN, LastAppliedLSN)),
                         LastReceivedLSN, LastFlushedLSN, LastAppliedLSN)),
-                    {noreply, State#state{repl = Repl#repl{feedback_required = false}}};
+                    {noreply, State#state{subproto_state = Repl#repl{feedback_required = false}}};
                 _ ->
                 _ ->
                     {noreply, State}
                     {noreply, State}
             end
             end
@@ -486,6 +520,74 @@ flush_queue(#state{current_cmd = undefined} = State, _) ->
 flush_queue(State, Error) ->
 flush_queue(State, Error) ->
     flush_queue(finish(State, Error), Error).
     flush_queue(finish(State, Error), Error).
 
 
+%% @doc Handler for IO protocol version of COPY FROM STDIN
+%%
+%% COPY FROM STDIN is implemented as Erlang
+%% <a href="https://erlang.org/doc/apps/stdlib/io_protocol.html">io protocol</a>.
+handle_io_request(_, #state{handler = Handler}) when Handler =/= on_copy_from_stdin ->
+    %% Received IO request when `epgsql_cmd_copy_from_stdin' hasn't been called yet, or when it
+    %% was terminated with an error and `ReadyForQuery' has already been received
+    {error, not_in_copy_mode};
+handle_io_request(_, #state{subproto_state = #copy{last_error = Err}}) when Err =/= undefined ->
+    {error, Err};
+handle_io_request({put_chars, Encoding, Chars}, State) ->
+    send(State, ?COPY_DATA, encode_chars(Encoding, Chars));
+handle_io_request({put_chars, Encoding, Mod, Fun, Args}, State) ->
+    try apply(Mod, Fun, Args) of
+        Chars when is_binary(Chars);
+                   is_list(Chars) ->
+            handle_io_request({put_chars, Encoding, Chars}, State);
+        Other ->
+            {error, {fun_return_not_characters, Other}}
+    catch ?WITH_STACKTRACE(T, R, S)
+            {error, {fun_exception, {T, R, S}}}
+    end;
+handle_io_request({setopts, _}, _State) ->
+    {error, request};
+handle_io_request(getopts, _State) ->
+    {error, request};
+handle_io_request({requests, Requests}, State) ->
+    try_requests(Requests, State, ok).
+
+try_requests([Req | Requests], State, _) ->
+    case handle_io_request(Req, State) of
+        {error, _} = Err ->
+            Err;
+        Other ->
+            try_requests(Requests, State, Other)
+    end;
+try_requests([], _, LastRes) ->
+    LastRes.
+
+io_reply(Result, From, ReplyAs) ->
+    From ! {io_reply, ReplyAs, Result}.
+
+%% @doc Handler for `copy_send_rows' API
+%%
+%% Only supports the binary format right now.
+%% In theory it could be used for text / csv formats as well, but we would need to add
+%% some more callbacks to the `epgsql_type' behaviour (eg, `encode_text')
+handle_copy_send_rows(_Rows, Handler, _CopyState, _State) when Handler =/= on_copy_from_stdin ->
+    {error, not_in_copy_mode};
+handle_copy_send_rows(_, _, #copy{format = Format}, _) when Format =/= binary ->
+    %% copy_send_rows only supports "binary" format
+    {error, not_binary_format};
+handle_copy_send_rows(_, _, #copy{last_error = LastError}, _) when LastError =/= undefined ->
+    %% server already reported error in data stream asynchronously
+    {error, LastError};
+handle_copy_send_rows(Rows, _, #copy{binary_types = Types}, State) ->
+    Data = [epgsql_wire:encode_copy_row(Values, Types, get_codec(State))
+            || Values <- Rows],
+    ok = send(State, ?COPY_DATA, Data).
+
+encode_chars(_, Bin) when is_binary(Bin) ->
+    Bin;
+encode_chars(unicode, Chars) when is_list(Chars) ->
+    unicode:characters_to_binary(Chars);
+encode_chars(latin1, Chars) when is_list(Chars) ->
+    unicode:characters_to_binary(Chars, latin1).
+
+
 to_binary(B) when is_binary(B) -> B;
 to_binary(B) when is_binary(B) -> B;
 to_binary(L) when is_list(L)   -> list_to_binary(L).
 to_binary(L) when is_list(L)   -> list_to_binary(L).
 
 
@@ -547,12 +649,31 @@ on_message(?NOTIFICATION, <<Pid:?int32, Strings/binary>>, State) ->
 on_message(Msg, Payload, State) ->
 on_message(Msg, Payload, State) ->
     command_handle_message(Msg, Payload, State).
     command_handle_message(Msg, Payload, State).
 
 
+%% @doc Packet handler implementing the "copy subprotocol" of COPY .. FROM STDIN
+%%
+%% Installed by `epgsql_cmd_copy_from_stdin'; removed by `epgsql_cmd_copy_done' or on error
+on_copy_from_stdin(?READY_FOR_QUERY, <<TxStatus:8>>,
+                   #state{subproto_state = #copy{initiator = Initiator,
+                                                 last_error = Reason}} = State)
+  when Reason =/= undefined ->
+    %% The error is delivered from here and not from the ?ERROR clause, so it's easier
+    %% to be in sync state
+    Initiator ! {epgsql, self(), {error, Reason}},
+    {noreply, State#state{handler = on_message,
+                          subproto_state = undefined,
+                          txstatus = TxStatus}};
+on_copy_from_stdin(?ERROR, Payload, #state{subproto_state = CopyState} = State) ->
+    %% Only remember the decoded error here; it is sent to the initiator by the
+    %% ?READY_FOR_QUERY clause
+    DecodedError = epgsql_wire:decode_error(Payload),
+    CopyState1 = CopyState#copy{last_error = DecodedError},
+    {noreply, State#state{subproto_state = CopyState1}};
+on_copy_from_stdin(Type, Payload, State) when Type == ?NOTICE;
+                                              Type == ?NOTIFICATION;
+                                              Type == ?PARAMETER_STATUS ->
+    %% These packet types are delegated to the regular handler
+    on_message(Type, Payload, State).
+
 
 
 %% CopyData for Replication mode
 %% CopyData for Replication mode
 on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
 on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
-               #state{repl = #repl{last_flushed_lsn = LastFlushedLSN,
-                                   last_applied_lsn = LastAppliedLSN,
-                                   align_lsn = AlignLsn} = Repl} = State) ->
+               #state{subproto_state = #repl{last_flushed_lsn = LastFlushedLSN,
+                                             last_applied_lsn = LastAppliedLSN,
+                                             align_lsn = AlignLsn} = Repl} = State) ->
     Repl1 =
     Repl1 =
         case ReplyRequired of
         case ReplyRequired of
             1 when AlignLsn ->
             1 when AlignLsn ->
@@ -569,14 +690,14 @@ on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestam
                 Repl#repl{feedback_required = true,
                 Repl#repl{feedback_required = true,
                           last_received_lsn = LSN}
                           last_received_lsn = LSN}
         end,
         end,
-    {noreply, State#state{repl = Repl1}};
+    {noreply, State#state{subproto_state = Repl1}};
 
 
 %% CopyData for Replication mode
 %% CopyData for Replication mode
 on_replication(?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
 on_replication(?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
                              _Timestamp:?int64, WALRecord/binary>>,
                              _Timestamp:?int64, WALRecord/binary>>,
-               #state{repl = Repl} = State) ->
+               #state{subproto_state = Repl} = State) ->
     Repl1 = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
     Repl1 = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
-    {noreply, State#state{repl = Repl1}};
+    {noreply, State#state{subproto_state = Repl1}};
 on_replication(?ERROR, Err, State) ->
 on_replication(?ERROR, Err, State) ->
     Reason = epgsql_wire:decode_error(Err),
     Reason = epgsql_wire:decode_error(Err),
     {stop, {error, Reason}, State};
     {stop, {error, Reason}, State};

+ 48 - 2
src/epgsql_wire.erl

@@ -23,12 +23,16 @@
          encode_formats/1,
          encode_formats/1,
          format/2,
          format/2,
          encode_parameters/2,
          encode_parameters/2,
-         encode_standby_status_update/3]).
+         encode_standby_status_update/3,
+         encode_copy_header/0,
+         encode_copy_row/3,
+         encode_copy_trailer/0]).
 %% Encoders for Client -> Server packets
 %% Encoders for Client -> Server packets
 -export([encode_query/1,
 -export([encode_query/1,
          encode_parse/3,
          encode_parse/3,
          encode_describe/2,
          encode_describe/2,
          encode_bind/4,
          encode_bind/4,
+         encode_copy_done/0,
          encode_execute/2,
          encode_execute/2,
          encode_close/2,
          encode_close/2,
          encode_flush/0,
          encode_flush/0,
@@ -213,6 +217,7 @@ decode_complete(Bin) ->
         ["DELETE", Rows]       -> {delete, list_to_integer(Rows)};
         ["DELETE", Rows]       -> {delete, list_to_integer(Rows)};
         ["MOVE", Rows]         -> {move, list_to_integer(Rows)};
         ["MOVE", Rows]         -> {move, list_to_integer(Rows)};
         ["FETCH", Rows]        -> {fetch, list_to_integer(Rows)};
         ["FETCH", Rows]        -> {fetch, list_to_integer(Rows)};
+        ["COPY", Rows]         -> {copy, list_to_integer(Rows)};
         [Type | _Rest]         -> lower_atom(Type)
         [Type | _Rest]         -> lower_atom(Type)
     end.
     end.
 
 
@@ -251,7 +256,8 @@ format(#column{oid = Oid}, Codec) ->
     end.
     end.
 
 
 %% @doc encode parameters for 'Bind'
 %% @doc encode parameters for 'Bind'
--spec encode_parameters([], epgsql_binary:codec()) -> iolist().
+-spec encode_parameters([{epgsql:epgsql_type(), epgsql:bind_param()}],
+                        epgsql_binary:codec()) -> iolist().
 encode_parameters(Parameters, Codec) ->
 encode_parameters(Parameters, Codec) ->
     encode_parameters(Parameters, 0, <<>>, [], Codec).
     encode_parameters(Parameters, 0, <<>>, [], Codec).
 
 
@@ -310,6 +316,41 @@ encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN) ->
     Timestamp = ((MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs) - 946684800*1000000,
     Timestamp = ((MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs) - 946684800*1000000,
     <<$r:8, ReceivedLSN:?int64, FlushedLSN:?int64, AppliedLSN:?int64, Timestamp:?int64, 0:8>>.
     <<$r:8, ReceivedLSN:?int64, FlushedLSN:?int64, AppliedLSN:?int64, Timestamp:?int64, 0:8>>.
 
 
+%% @doc builds the header of a binary-format COPY data file
+%%
+%% The header is the fixed signature followed by a 32-bit flags field and the 32-bit
+%% length of the extensions area, both zero.
+%% See [https://www.postgresql.org/docs/current/sql-copy.html#id-1.9.3.55.9.4.5]
+encode_copy_header() ->
+    Signature = <<"PGCOPY\n", 255, "\r\n", 0>>,
+    Flags = 0,
+    ExtensionAreaLen = 0,
+    <<Signature/binary, Flags:?int32, ExtensionAreaLen:?int32>>.
+
+%% @doc builds one row / tuple of a binary-format COPY data file
+%%
+%% The row may be given as a tuple or as a list of values. The result is an iolist: a
+%% 16-bit column count (taken from `Types') followed by the encoded values. Values that
+%% `epgsql_binary:is_null/2' reports as null are written as the 32-bit value -1.
+%% See [https://www.postgresql.org/docs/current/sql-copy.html#id-1.9.3.55.9.4.6]
+encode_copy_row(ValuesTuple, Types, Codec) when is_tuple(ValuesTuple) ->
+    encode_copy_row(tuple_to_list(ValuesTuple), Types, Codec);
+encode_copy_row(Values, Types, Codec) ->
+    FieldCount = length(Types),
+    EncodeField =
+        fun(Type, Field) ->
+                case epgsql_binary:is_null(Field, Codec) of
+                    false -> epgsql_binary:encode(Type, Field, Codec);
+                    true -> <<-1:?int32>>
+                end
+        end,
+    Fields = lists:zipwith(EncodeField, Types, Values),
+    [<<FieldCount:?int16>> | Fields].
+
+%% @doc builds the trailer of a binary-format COPY data file: the 16-bit value -1
+%%
+%% See [https://www.postgresql.org/docs/current/sql-copy.html#id-1.9.3.55.9.4.7]
+encode_copy_trailer() ->
+    EndMarker = -1,
+    <<EndMarker:?int16>>.
+
 %%
 %%
 %% Encoders for various PostgreSQL protocol client-side packets
 %% Encoders for various PostgreSQL protocol client-side packets
 %% See https://www.postgresql.org/docs/current/protocol-message-formats.html
 %% See https://www.postgresql.org/docs/current/protocol-message-formats.html
@@ -390,5 +431,10 @@ encode_flush() ->
 encode_sync() ->
 encode_sync() ->
     {?SYNC, []}.
     {?SYNC, []}.
 
 
+%% @doc builds the `CopyDone' packet; it carries no payload.
+-spec encode_copy_done() -> {packet_type(), iodata()}.
+encode_copy_done() ->
+    EmptyPayload = [],
+    {?COPY_DONE, EmptyPayload}.
+
 obj_atom_to_byte(statement) -> ?PREPARED_STATEMENT;
 obj_atom_to_byte(statement) -> ?PREPARED_STATEMENT;
 obj_atom_to_byte(portal) -> ?PORTAL.
 obj_atom_to_byte(portal) -> ?PORTAL.

+ 347 - 0
test/epgsql_copy_SUITE.erl

@@ -0,0 +1,347 @@
+-module(epgsql_copy_SUITE).
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include("epgsql.hrl").
+
+-export([
+    init_per_suite/1,
+    all/0,
+    end_per_suite/1,
+
+    from_stdin_text/1,
+    from_stdin_csv/1,
+    from_stdin_binary/1,
+    from_stdin_io_apis/1,
+    from_stdin_with_terminator/1,
+    from_stdin_corrupt_data/1
+]).
+
+%% @doc Suite setup: prepends `{module, epgsql}' to the Common Test config, so test
+%% cases can look up the module under test with `?config(module, Config)'
+init_per_suite(Config) ->
+    ModuleOpt = {module, epgsql},
+    [ModuleOpt | Config].
+
+%% @doc Suite teardown: nothing to clean up
+end_per_suite(_) ->
+    ok.
+
+%% @doc Test cases of this suite, in execution order
+all() ->
+    [from_stdin_text, from_stdin_csv, from_stdin_binary,
+     from_stdin_io_apis, from_stdin_with_terminator, from_stdin_corrupt_data].
+
+%% @doc Test that COPY in text format works when rows are written with io:put_chars/2
+from_stdin_text(Config) ->
+    Module = ?config(module, Config),  % module under test, set by init_per_suite/1
+    epgsql_ct:with_connection(
+        Config,
+        fun(C) ->
+                ?assertEqual(
+                   {ok, [text, text]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT text)")),
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C,  % three complete rows in one call; `\N' is read back as null
+                                "10\thello world\n"
+                                "11\t\\N\n"
+                                "12\tline 12\n")),
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C, "13\tline 13\n")),  % a single complete row
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C, "14\tli")),  % first half of a row split across two calls
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C, "ne 14\n")),  % second half of the split row
+                ?assertEqual(
+                   {ok, 5},  % copy_done reports the number of rows copied
+                   Module:copy_done(C)),
+                ?assertMatch(
+                   {ok, _, [{10, <<"hello world">>},
+                            {11, null},
+                            {12, <<"line 12">>},
+                            {13, <<"line 13">>},
+                            {14, <<"line 14">>}]},
+                   Module:equery(C,  % read the rows back to verify what was stored
+                                 "SELECT id, value FROM test_table1"
+                                 " WHERE id IN (10, 11, 12, 13, 14) ORDER BY id"))
+        end).
+
+%% @doc Test that COPY in CSV format works (single quote is configured as the QUOTE character)
+from_stdin_csv(Config) ->
+    Module = ?config(module, Config),  % module under test, set by init_per_suite/1
+    epgsql_ct:with_connection(
+        Config,
+        fun(C) ->
+                ?assertEqual(
+                   {ok, [text, text]},  % CSV is reported with `text' column formats
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT csv, QUOTE '''')")),
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C,  % quoted value, empty value (read back as null), unquoted value
+                                "20,'hello world'\n"
+                                "21,\n"
+                                "22,line 22\n")),
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C, "23,'line 23'\n")),  % a single complete row
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C, "24,'li")),  % row split across two calls, inside the quotes
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C, "ne 24'\n")),  % remainder of the split row
+                ?assertEqual(
+                   {ok, 5},  % copy_done reports the number of rows copied
+                   Module:copy_done(C)),
+                ?assertMatch(
+                   {ok, _, [{20, <<"hello world">>},
+                            {21, null},
+                            {22, <<"line 22">>},
+                            {23, <<"line 23">>},
+                            {24, <<"line 24">>}]},
+                   Module:equery(C,  % read the rows back to verify what was stored
+                                 "SELECT id, value FROM test_table1"
+                                 " WHERE id IN (20, 21, 22, 23, 24) ORDER BY id"))
+        end).
+
+%% @doc Test that COPY in binary format works when rows are sent with copy_send_rows/3
+from_stdin_binary(Config) ->
+    Module = ?config(module, Config),  % module under test, set by init_per_suite/1
+    epgsql_ct:with_connection(
+        Config,
+        fun(C) ->
+                ?assertEqual(
+                   {ok, [binary, binary]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT binary)",
+                     {binary, [int4, text]})),  % column types used to encode the rows
+                %% Batch of rows (tuples); includes a null and a value given as a string
+                ?assertEqual(
+                   ok,
+                   Module:copy_send_rows(
+                     C,
+                     [{60, <<"hello world">>},
+                      {61, null},
+                      {62, "line 62"}],
+                     5000)),  % NOTE(review): last argument looks like a timeout - confirm
+                %% Single row
+                ?assertEqual(
+                   ok,
+                   Module:copy_send_rows(
+                     C,
+                     [{63, <<"line 63">>}],
+                     1000)),
+                %% Rows as lists
+                ?assertEqual(
+                   ok,
+                   Module:copy_send_rows(
+                     C,
+                     [
+                      [64, <<"line 64">>],
+                      [65, <<"line 65">>]
+                     ],
+                     infinity)),
+                ?assertEqual({ok, 6}, Module:copy_done(C)),  % 3 + 1 + 2 rows sent above
+                ?assertMatch(
+                   {ok, _, [{60, <<"hello world">>},
+                            {61, null},
+                            {62, <<"line 62">>},
+                            {63, <<"line 63">>},
+                            {64, <<"line 64">>},
+                            {65, <<"line 65">>}]},
+                   Module:equery(C,  % read the rows back to verify what was stored
+                                 "SELECT id, value FROM test_table1"
+                                 " WHERE id IN (60, 61, 62, 63, 64, 65) ORDER BY id"))
+        end).
+
+%% @doc Tests that different IO-protocol APIs (`io', `file', raw messages) can feed COPY data
+from_stdin_io_apis(Config) ->
+    Module = ?config(module, Config),  % module under test, set by init_per_suite/1
+    epgsql_ct:with_connection(
+        Config,
+        fun(C) ->
+                ?assertEqual(
+                   {ok, [text, text]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT text)")),
+                ?assertEqual(ok, io:format(C, "30\thello world\n", [])),  % format without arguments
+                ?assertEqual(ok, io:format(C, "~b\t~s\n", [31, "line 31"])),  % format with arguments
+                %% Output "32\thello\n" in multiple calls
+                ?assertEqual(ok, io:write(C, 32)),
+                ?assertEqual(ok, io:put_chars(C, "\t")),
+                ?assertEqual(ok, io:write(C, hello)),
+                ?assertEqual(ok, io:nl(C)),
+                %% Using `file` API
+                ?assertEqual(ok, file:write(C, "33\tline 33\n34\tline 34\n")),  % two rows at once
+                %% Binary
+                ?assertEqual(ok, io:put_chars(C, <<"35\tline 35\n">>)),
+                ?assertEqual(ok, file:write(C, <<"36\tline 36\n">>)),
+                %% IoData
+                ?assertEqual(ok, io:put_chars(C, [<<"37">>, $\t, <<"line 37">>, <<$\n>>])),
+                ?assertEqual(ok, file:write(C, [["38", <<$\t>>], [<<"line 38">>, $\n]])),
+                %% Raw IO-protocol message-passing
+                Ref = erlang:make_ref(),
+                C ! {io_request, self(), Ref, {put_chars, unicode, "39\tline 39\n"}},
+                ?assertEqual(ok, receive {io_reply, Ref, Resp} -> Resp
+                                 after 5000 ->
+                                         timeout
+                                 end),
+                %% Batch of requests via `io:requests/2', which is not documented!
+                ?assertEqual(ok, io:requests(
+                                   C,
+                                   [{put_chars, unicode, "40\tline 40\n"},
+                                    {put_chars, latin1, "41\tline 41\n"},
+                                    {format, "~w\t~s", [42, "line 42"]},
+                                    nl])),
+                ?assertEqual(
+                   {ok, 13},  % rows 30..42
+                   Module:copy_done(C)),
+                ?assertMatch(
+                   {ok, _, [{30, <<"hello world">>},
+                            {31, <<"line 31">>},
+                            {32, <<"hello">>},
+                            {33, <<"line 33">>},
+                            {34, <<"line 34">>},
+                            {35, <<"line 35">>},
+                            {36, <<"line 36">>},
+                            {37, <<"line 37">>},
+                            {38, <<"line 38">>},
+                            {39, <<"line 39">>},
+                            {40, <<"line 40">>},
+                            {41, <<"line 41">>},
+                            {42, <<"line 42">>}
+                            ]},
+                   Module:equery(  % read the rows back to verify what was stored
+                     C,
+                     "SELECT id, value FROM test_table1"
+                     " WHERE id IN (30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42)"
+                     " ORDER BY id"))
+        end).
+
+%% @doc Tests that the "end-of-data" terminator line (`\.') is ignored in text and CSV formats
+from_stdin_with_terminator(Config) ->
+    Module = ?config(module, Config),  % module under test, set by init_per_suite/1
+    epgsql_ct:with_connection(
+        Config,
+        fun(C) ->
+                %% TEXT
+                ?assertEqual(
+                   {ok, [text, text]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT text)")),
+                ?assertEqual(ok, io:put_chars(  % two rows followed by the terminator line
+                                   C,
+                                   "50\tline 50\n"
+                                   "51\tline 51\n"
+                                   "\\.\n")),
+                ?assertEqual({ok, 2}, Module:copy_done(C)),  % terminator is not counted as a row
+                %% CSV
+                ?assertEqual(
+                   {ok, [text, text]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT csv)")),
+                ?assertEqual(ok, io:put_chars(  % two rows followed by the terminator line
+                                   C,
+                                   "52,line 52\n"
+                                   "53,line 53\n"
+                                   "\\.\n")),
+                ?assertEqual({ok, 2}, Module:copy_done(C)),  % terminator is not counted as a row
+                ?assertMatch(
+                   {ok, _, [{50, <<"line 50">>},
+                            {51, <<"line 51">>},
+                            {52, <<"line 52">>},
+                            {53, <<"line 53">>}
+                            ]},
+                   Module:equery(C,  % only the four data rows are expected in the table
+                                 "SELECT id, value FROM test_table1"
+                                 " WHERE id IN (50, 51, 52, 53) ORDER BY id"))
+        end).
+
+from_stdin_corrupt_data(Config) ->  % bad IO requests and corrupt text / CSV / binary COPY data
+    Module = ?config(module, Config),  % module under test, set by init_per_suite/1
+    epgsql_ct:with_connection(
+        Config,
+        fun(C) ->
+                ?assertEqual(
+                   {ok, [text, text]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT text)")),
+                %% Wrong number of arguments to io:format
+                Fmt = "~w\t~s\n",
+                ?assertMatch({error, {fun_exception, {error, badarg, _Stack}}},
+                             io:request(C, {format, Fmt, []})),
+                ?assertError(badarg, io:format(C, Fmt, [])),  % io:format/3 raises instead of returning
+                %% Wrong return value from IO function
+                ?assertEqual({error, {fun_return_not_characters, node()}},
+                             io:request(C, {put_chars, unicode, erlang, node, []})),
+                ?assertEqual({ok, 0}, Module:copy_done(C)),  % failed requests added no rows
+                %%
+                %% Corrupt text format
+                ?assertEqual(
+                   {ok, [text, text]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT text)")),
+                ?assertEqual(ok, io:put_chars(  % the write itself succeeds
+                                   C,
+                                   "42\n43\nwasd\n")),
+                ?assertMatch(
+                   #error{codename = bad_copy_file_format,
+                          severity = error},
+                   receive  % the error arrives later as an asynchronous message
+                       {epgsql, C, {error, Err}} ->
+                           Err
+                   after 5000 ->
+                           timeout
+                   end),
+                ?assertEqual({error, not_in_copy_mode},  % copy mode is over after the error
+                             io:request(C, {put_chars, unicode, "queque\n"})),
+                ?assertError(badarg, io:format(C, "~w\n~s\n", [60, "wasd"])),
+                %%
+                %% Corrupt CSV format
+                ?assertEqual(
+                   {ok, [text, text]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT csv)")),
+                ?assertEqual(ok, io:put_chars(  % the write itself succeeds
+                                   C,
+                                   "42\n43\nwasd\n")),
+                ?assertMatch(
+                   #error{codename = bad_copy_file_format,
+                          severity = error},
+                   receive  % the error arrives later as an asynchronous message
+                       {epgsql, C, {error, Err}} ->
+                           Err
+                   after 5000 ->
+                           timeout
+                   end),
+                %%
+                %% Corrupt binary format
+                ?assertEqual(
+                   {ok, [binary, binary]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT binary)",
+                     {binary, [int4, text]})),
+                ?assertEqual(
+                   ok,
+                   Module:copy_send_rows(C, [{44, <<"line 44">>}], 1000)),  % one valid row first
+                ?assertEqual(ok, io:put_chars(C, "45\tThis is not ok!\n")),  % text row in a binary stream
+                ?assertMatch(
+                   #error{codename = bad_copy_file_format,
+                          severity = error},
+                   receive  % the error arrives later as an asynchronous message
+                       {epgsql, C, {error, Err}} ->
+                           Err
+                   after 5000 ->
+                           timeout
+                   end),
+                %% Connection is still usable
+                ?assertMatch(
+                   {ok, _, [{1}]},
+                   Module:equery(C, "SELECT 1", []))
+        end).