Browse Source

Working basic "copy .. from stdin .." for text/csv protocols. GH-137

* 2 commands
* separate subprotocol packet handler
* shares some structures with replication
* uses Erlang IO-protocol to send replication data to epgsql connection process
Sergey Prokhorov 4 years ago
parent
commit
c173985acd

+ 8 - 2
include/protocol.hrl

@@ -41,10 +41,16 @@
 -define(PARAMETER_DESCRIPTION, $t).
 -define(PARAMETER_DESCRIPTION, $t).
 -define(ROW_DESCRIPTION, $T).
 -define(ROW_DESCRIPTION, $T).
 -define(READY_FOR_QUERY, $Z).
 -define(READY_FOR_QUERY, $Z).
--define(COPY_BOTH_RESPONSE, $W).
--define(COPY_DATA, $d).
 -define(TERMINATE, $X).
 -define(TERMINATE, $X).
 
 
+% Copy protocol
+-define(COPY_DATA, $d).
+-define(COPY_DONE, $c).
+-define(COPY_FAIL, $f).
+-define(COPY_IN_RESPONSE, $G).
+-define(COPY_OUT_RESPONSE, $H).
+-define(COPY_BOTH_RESPONSE, $W).
+
 % CopyData replication messages
 % CopyData replication messages
 -define(X_LOG_DATA, $w).
 -define(X_LOG_DATA, $w).
 -define(PRIMARY_KEEPALIVE_MESSAGE, $k).
 -define(PRIMARY_KEEPALIVE_MESSAGE, $k).

+ 42 - 0
src/commands/epgsql_cmd_copy_done.erl

@@ -0,0 +1,42 @@
+%%% @doc Tells server that the transfer of COPY data is done.
+%%%
+%%% It makes server to "commit" the data to the table and switch to the normal command-processing
+%%% mode.
+%%%
+%%% @see epgsql_cmd_copy_from_stdin
+
+-module(epgsql_cmd_copy_done).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: {ok, Count :: non_neg_integer()}
+                  | {error, epgsql:query_error()}.
+
+%% -include("epgsql.hrl").
+-include("protocol.hrl").
+
+init(_) ->
+    [].
+
+execute(Sock0, St) ->
+    {PktType, PktData} = epgsql_wire:encode_copy_done(),
+    Sock1 = epgsql_sock:set_packet_handler(on_message, Sock0),
+    Sock = epgsql_sock:set_attr(subproto_state, undefined, Sock1),
+    {send, PktType, PktData, Sock, St}.
+
+handle_message(?COMMAND_COMPLETE, Bin, Sock, St) ->
+    Complete = epgsql_wire:decode_complete(Bin),
+    Res = case Complete of
+        {copy, Count} -> {ok, Count};
+        copy -> ok
+    end,
+    {add_result, Res, {complete, Complete}, Sock, St};
+handle_message(?ERROR, Error, Sock, St) ->
+    Result = {error, Error},
+    {add_result, Result, Result, Sock, St};
+handle_message(?READY_FOR_QUERY, _Status, Sock, _State) ->
+    [Result] = epgsql_sock:get_results(Sock),
+    {finish, Result, done, Sock};
+handle_message(_, _, _, _) ->
+    unknown.

+ 70 - 0
src/commands/epgsql_cmd_copy_from_stdin.erl

@@ -0,0 +1,70 @@
+%%% @doc Tells server to switch to "COPY-in" mode
+%%%
+%%% See [https://www.postgresql.org/docs/current/sql-copy.html].
+%%% See [https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY].
+%%%
+%%% The copy data can then be delivered using Erlang
+%%% <a href="https://erlang.org/doc/apps/stdlib/io_protocol.html">io protocol</a>.
+%%% See {@link file:write/2}, {@link io:put_chars/2}.
+%%%
+%%% "End-of-data" marker `\.' at the end of TEXT or CSV data stream is not needed,
+%%% {@link epgsql_cmd_copy_done} should be called in the end.
+%%%
+%%% This command should not be used with command pipelining!
+%%%
+%%% ```
+%%% > SQuery COPY ... FROM STDIN ...
+%%% < CopyInResponse
+%%% > CopyData*            -- implemented in io protocol, not here
+%%% > CopyDone | CopyFail  -- implemented in epgsql_cmd_copy_done
+%%% < CommandComplete      -- implemented in epgsql_cmd_copy_done
+%%% '''
+-module(epgsql_cmd_copy_from_stdin).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: ok | {error, epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+-include("../epgsql_copy.hrl").
+
+-record(copy_stdin,
+        {query :: iodata()}).
+
+init(SQL) ->
+    #copy_stdin{query = SQL}.
+
+execute(Sock, #copy_stdin{query = SQL} = St) ->
+    undefined = epgsql_sock:get_subproto_state(Sock), % assert we are not in copy-mode already
+    {PktType, PktData} = epgsql_wire:encode_query(SQL),
+    {send, PktType, PktData, Sock, St}.
+
+%% CopyBothResponseщ
+handle_message(?COPY_IN_RESPONSE, <<BinOrText, NumColumns:?int16, Formats/binary>>, Sock, _State) ->
+    ColumnFormats =
+        [case Format of
+             0 -> text;
+             1 -> binary
+         end || <<Format:?int16>> <= Formats],
+    length(ColumnFormats) =:= NumColumns orelse error(invalid_copy_in_response),
+    case BinOrText of
+        0 ->
+            %% When BinOrText is 0, all "columns" should be 0 format as well.
+            %% See https://www.postgresql.org/docs/current/protocol-message-formats.html
+            %% CopyInResponse
+            (lists:member(binary, ColumnFormats) == false)
+                orelse error(invalid_copy_in_response);
+        _ ->
+            ok
+    end,
+    CopyState = #copy{},
+    Sock1 = epgsql_sock:set_attr(subproto_state, CopyState, Sock),
+    Res = {ok, ColumnFormats},
+    {finish, Res, Res, epgsql_sock:set_packet_handler(on_copy_from_stdin, Sock1)};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    Result = {error, Error},
+    {sync_required, Result};
+handle_message(_, _, _, _) ->
+    unknown.

+ 2 - 2
src/commands/epgsql_cmd_start_replication.erl

@@ -37,7 +37,7 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
                           plugin_opts = PluginOpts, opts = Opts} = St) ->
                           plugin_opts = PluginOpts, opts = Opts} = St) ->
     %% Connection should be started with 'replication' option. Then
     %% Connection should be started with 'replication' option. Then
     %% 'replication_state' will be initialized
     %% 'replication_state' will be initialized
-    Repl = #repl{} = epgsql_sock:get_replication_state(Sock),
+    Repl = #repl{} = epgsql_sock:get_subproto_state(Sock),
     Sql1 = ["START_REPLICATION SLOT ", ReplicationSlot, " LOGICAL ", WALPosition],
     Sql1 = ["START_REPLICATION SLOT ", ReplicationSlot, " LOGICAL ", WALPosition],
     Sql2 =
     Sql2 =
         case PluginOpts of
         case PluginOpts of
@@ -57,7 +57,7 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
     Repl3 = Repl2#repl{last_flushed_lsn = LSN,
     Repl3 = Repl2#repl{last_flushed_lsn = LSN,
                        last_applied_lsn = LSN,
                        last_applied_lsn = LSN,
                        align_lsn = AlignLsn},
                        align_lsn = AlignLsn},
-    Sock2 = epgsql_sock:set_attr(replication_state, Repl3, Sock),
+    Sock2 = epgsql_sock:set_attr(subproto_state, Repl3, Sock),
                          %% handler = on_replication},
                          %% handler = on_replication},
     {PktType, PktData} = epgsql_wire:encode_query(Sql2),
     {PktType, PktData} = epgsql_wire:encode_query(Sql2),
     {send, PktType, PktData, Sock2, St}.
     {send, PktType, PktData, Sock2, St}.

+ 19 - 0
src/epgsql.erl

@@ -28,6 +28,8 @@
          with_transaction/2,
          with_transaction/2,
          with_transaction/3,
          with_transaction/3,
          sync_on_error/2,
          sync_on_error/2,
+         copy_from_stdin/2,
+         copy_done/1,
          standby_status_update/3,
          standby_status_update/3,
          start_replication/5,
          start_replication/5,
          start_replication/6,
          start_replication/6,
@@ -448,6 +450,23 @@ sync_on_error(C, Error = {error, _}) ->
 sync_on_error(_C, R) ->
 sync_on_error(_C, R) ->
     R.
     R.
 
 
+%% @doc Switches epgsql into COPY-mode
+%%
+%% Erlang IO-protocol can be used to transfer "raw" COPY data to the server (see, eg,
+%% `io:put_chars/2' and `file:write/2' etc).
+%% @param SQL have to be `COPY ... FROM STDIN ...' statement
+-spec copy_from_stdin(connection(), sql_query()) ->
+          epgsql_cmd_copy_from_stdin:response().
+copy_from_stdin(C, SQL) ->
+    epgsql_sock:sync_command(C, epgsql_cmd_copy_from_stdin, SQL).
+
+%% @doc Tells server that the transfer of COPY data is done
+%%
+%% Stops copy-mode and returns number of inserted rows
+-spec copy_done(connection()) -> epgsql_cmd_copy_done:response().
+copy_done(C) ->
+    epgsql_sock:sync_command(C, epgsql_cmd_copy_done, []).
+
 -spec standby_status_update(connection(), lsn(), lsn()) -> ok.
 -spec standby_status_update(connection(), lsn(), lsn()) -> ok.
 %% @doc sends last flushed and applied WAL positions to the server in a standby status update message via
 %% @doc sends last flushed and applied WAL positions to the server in a standby status update message via
 %% given `Connection'
 %% given `Connection'

+ 1 - 0
src/epgsql_copy.hrl

@@ -0,0 +1 @@
+-record(copy, {}).

+ 89 - 22
src/epgsql_sock.erl

@@ -52,19 +52,20 @@
 -export([init/1, code_change/3, terminate/2]).
 -export([init/1, code_change/3, terminate/2]).
 
 
 %% loop callback
 %% loop callback
--export([on_message/3, on_replication/3]).
+-export([on_message/3, on_replication/3, on_copy_from_stdin/3]).
 
 
 %% Comand's APIs
 %% Comand's APIs
 -export([set_net_socket/3, init_replication_state/1, set_attr/3, get_codec/1,
 -export([set_net_socket/3, init_replication_state/1, set_attr/3, get_codec/1,
          get_rows/1, get_results/1, notify/2, send/2, send/3, send_multi/2,
          get_rows/1, get_results/1, notify/2, send/2, send/3, send_multi/2,
          get_parameter_internal/2,
          get_parameter_internal/2,
-         get_replication_state/1, set_packet_handler/2]).
+         get_subproto_state/1, set_packet_handler/2]).
 
 
 -export_type([transport/0, pg_sock/0, error/0]).
 -export_type([transport/0, pg_sock/0, error/0]).
 
 
 -include("epgsql.hrl").
 -include("epgsql.hrl").
 -include("protocol.hrl").
 -include("protocol.hrl").
 -include("epgsql_replication.hrl").
 -include("epgsql_replication.hrl").
+-include("epgsql_copy.hrl").
 
 
 -type transport() :: {call, any()}
 -type transport() :: {call, any()}
                    | {cast, pid(), reference()}
                    | {cast, pid(), reference()}
@@ -72,6 +73,7 @@
 
 
 -type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
 -type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
 -type repl_state() :: #repl{}.
 -type repl_state() :: #repl{}.
+-type copy_state() :: #copy{}.
 
 
 -type error() :: {error, sync_required | closed | sock_closed | sock_error}.
 -type error() :: {error, sync_required | closed | sock_closed | sock_error}.
 
 
@@ -79,7 +81,7 @@
                 sock :: tcp_socket() | ssl:sslsocket() | undefined,
                 sock :: tcp_socket() | ssl:sslsocket() | undefined,
                 data = <<>>,
                 data = <<>>,
                 backend :: {Pid :: integer(), Key :: integer()} | undefined,
                 backend :: {Pid :: integer(), Key :: integer()} | undefined,
-                handler = on_message :: on_message | on_replication | undefined,
+                handler = on_message :: on_message | on_replication | on_copy_from_stdin | undefined,
                 codec :: epgsql_binary:codec() | undefined,
                 codec :: epgsql_binary:codec() | undefined,
                 queue = queue:new() :: queue:queue({epgsql_command:command(), any(), transport()}),
                 queue = queue:new() :: queue:queue({epgsql_command:command(), any(), transport()}),
                 current_cmd :: epgsql_command:command() | undefined,
                 current_cmd :: epgsql_command:command() | undefined,
@@ -92,7 +94,7 @@
                 sync_required :: boolean() | undefined,
                 sync_required :: boolean() | undefined,
                 txstatus :: byte() | undefined,  % $I | $T | $E,
                 txstatus :: byte() | undefined,  % $I | $T | $E,
                 complete_status :: atom() | {atom(), integer()} | undefined,
                 complete_status :: atom() | {atom(), integer()} | undefined,
-                repl :: repl_state() | undefined,
+                subproto_state :: repl_state() | copy_state() | undefined,
                 connect_opts :: epgsql:connect_opts() | undefined}).
                 connect_opts :: epgsql:connect_opts() | undefined}).
 
 
 -opaque pg_sock() :: #state{}.
 -opaque pg_sock() :: #state{}.
@@ -145,7 +147,7 @@ set_net_socket(Mod, Socket, State) ->
 
 
 -spec init_replication_state(pg_sock()) -> pg_sock().
 -spec init_replication_state(pg_sock()) -> pg_sock().
 init_replication_state(State) ->
 init_replication_state(State) ->
-    State#state{repl = #repl{}}.
+    State#state{subproto_state = #repl{}}.
 
 
 -spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
 -spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
 set_attr(backend, {_Pid, _Key} = Backend, State) ->
 set_attr(backend, {_Pid, _Key} = Backend, State) ->
@@ -158,8 +160,8 @@ set_attr(codec, Codec, State) ->
     State#state{codec = Codec};
     State#state{codec = Codec};
 set_attr(sync_required, Value, State) ->
 set_attr(sync_required, Value, State) ->
     State#state{sync_required = Value};
     State#state{sync_required = Value};
-set_attr(replication_state, Value, State) ->
-    State#state{repl = Value};
+set_attr(subproto_state, Value, State) ->
+    State#state{subproto_state = Value};
 set_attr(connect_opts, ConnectOpts, State) ->
 set_attr(connect_opts, ConnectOpts, State) ->
     State#state{connect_opts = ConnectOpts}.
     State#state{connect_opts = ConnectOpts}.
 
 
@@ -172,9 +174,9 @@ set_packet_handler(Handler, State) ->
 get_codec(#state{codec = Codec}) ->
 get_codec(#state{codec = Codec}) ->
     Codec.
     Codec.
 
 
--spec get_replication_state(pg_sock()) -> repl_state().
-get_replication_state(#state{repl = Repl}) ->
-    Repl.
+-spec get_subproto_state(pg_sock()) -> repl_state() | copy_state() | undefined.
+get_subproto_state(#state{subproto_state = SubState}) ->
+    SubState.
 
 
 -spec get_rows(pg_sock()) -> [tuple()].
 -spec get_rows(pg_sock()) -> [tuple()].
 get_rows(#state{rows = Rows}) ->
 get_rows(#state{rows = Rows}) ->
@@ -212,11 +214,11 @@ handle_call(get_cmd_status, _From, #state{complete_status = Status} = State) ->
 
 
 handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
 handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
             #state{handler = on_replication,
             #state{handler = on_replication,
-                   repl = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
+                   subproto_state = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
     Repl1 = Repl#repl{last_flushed_lsn = FlushedLSN,
     Repl1 = Repl#repl{last_flushed_lsn = FlushedLSN,
                       last_applied_lsn = AppliedLSN},
                       last_applied_lsn = AppliedLSN},
-    {reply, ok, State#state{repl = Repl1}}.
+    {reply, ok, State#state{subproto_state = Repl1}}.
 
 
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
   when ((Method == cast) or (Method == incremental)),
   when ((Method == cast) or (Method == incremental)),
@@ -259,7 +261,12 @@ handle_info({inet_reply, _, ok}, State) ->
     {noreply, State};
     {noreply, State};
 
 
 handle_info({inet_reply, _, Status}, State) ->
 handle_info({inet_reply, _, Status}, State) ->
-    {stop, Status, flush_queue(State, {error, Status})}.
+    {stop, Status, flush_queue(State, {error, Status})};
+
+handle_info({io_request, From, ReplyAs, Request}, #state{handler = on_copy_from_stdin} = State) ->
+    Response = handle_io_request(Request, State),
+    io_reply(Response, From, ReplyAs),
+    {noreply, State}.
 
 
 terminate(_Reason, #state{sock = undefined}) -> ok;
 terminate(_Reason, #state{sock = undefined}) -> ok;
 terminate(_Reason, #state{mod = gen_tcp, sock = Sock}) -> gen_tcp:close(Sock);
 terminate(_Reason, #state{mod = gen_tcp, sock = Sock}) -> gen_tcp:close(Sock);
@@ -400,7 +407,7 @@ do_send(gen_tcp, Sock, Bin) ->
 do_send(ssl, Sock, Bin) ->
 do_send(ssl, Sock, Bin) ->
     ssl:send(Sock, Bin).
     ssl:send(Sock, Bin).
 
 
-loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
+loop(#state{data = Data, handler = Handler, subproto_state = Repl} = State) ->
     case epgsql_wire:decode_message(Data) of
     case epgsql_wire:decode_message(Data) of
         {Type, Payload, Tail} ->
         {Type, Payload, Tail} ->
             case ?MODULE:Handler(Type, Payload, State#state{data = Tail}) of
             case ?MODULE:Handler(Type, Payload, State#state{data = Tail}) of
@@ -411,14 +418,16 @@ loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
             end;
             end;
         _ ->
         _ ->
             %% in replication mode send feedback after each batch of messages
             %% in replication mode send feedback after each batch of messages
-            case (Repl =/= undefined) andalso (Repl#repl.feedback_required) of
+            case Handler == on_replication
+                  andalso (Repl =/= undefined)
+                  andalso (Repl#repl.feedback_required) of
                 true ->
                 true ->
                     #repl{last_received_lsn = LastReceivedLSN,
                     #repl{last_received_lsn = LastReceivedLSN,
                           last_flushed_lsn = LastFlushedLSN,
                           last_flushed_lsn = LastFlushedLSN,
                           last_applied_lsn = LastAppliedLSN} = Repl,
                           last_applied_lsn = LastAppliedLSN} = Repl,
                     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(
                     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(
                         LastReceivedLSN, LastFlushedLSN, LastAppliedLSN)),
                         LastReceivedLSN, LastFlushedLSN, LastAppliedLSN)),
-                    {noreply, State#state{repl = Repl#repl{feedback_required = false}}};
+                    {noreply, State#state{subproto_state = Repl#repl{feedback_required = false}}};
                 _ ->
                 _ ->
                     {noreply, State}
                     {noreply, State}
             end
             end
@@ -488,6 +497,50 @@ flush_queue(#state{current_cmd = undefined} = State, _) ->
 flush_queue(State, Error) ->
 flush_queue(State, Error) ->
     flush_queue(finish(State, Error), Error).
     flush_queue(finish(State, Error), Error).
 
 
+%% @doc Handler for IO protocol version of COPY FROM STDIN
+%%
+%% COPY FROM STDIN is implemented as Erlang
+%% <a href="https://erlang.org/doc/apps/stdlib/io_protocol.html">io protocol</a>.
+handle_io_request({put_chars, Encoding, Chars}, State) ->
+    send(State, ?COPY_DATA, encode_chars(Encoding, Chars));
+handle_io_request({put_chars, Encoding, Mod, Fun, Args}, State) ->
+    try apply(Mod, Fun, Args) of
+        Chars when is_binary(Chars);
+                   is_list(Chars) ->
+            handle_io_request({put_chars, Encoding, Chars}, State);
+        Other ->
+            {error, {fun_return_not_characters, Other}}
+    catch T:R:S ->
+            {error, {fun_exception, {T, R, S}}}
+    end;
+handle_io_request({setopts, _}, _State) ->
+    {error, request};
+handle_io_request(getopts, _State) ->
+    {error, request};
+handle_io_request({requests, Requests}, State) ->
+    try_requests(Requests, State, ok).
+
+try_requests([Req | Requests], State, _) ->
+    case handle_io_request(Req, State) of
+        {error, _} = Err ->
+            Err;
+        Other ->
+            try_requests(Requests, State, Other)
+    end;
+try_requests([], _, LastRes) ->
+    LastRes.
+
+io_reply(Result, From, ReplyAs) ->
+    From ! {io_reply, ReplyAs, Result}.
+
+encode_chars(_, Bin) when is_binary(Bin) ->
+    Bin;
+encode_chars(unicode, Chars) when is_list(Chars) ->
+    unicode:characters_to_binary(Chars);
+encode_chars(latin1, Chars) when is_list(Chars) ->
+    unicode:characters_to_binary(Chars, latin1).
+
+
 to_binary(B) when is_binary(B) -> B;
 to_binary(B) when is_binary(B) -> B;
 to_binary(L) when is_list(L)   -> list_to_binary(L).
 to_binary(L) when is_list(L)   -> list_to_binary(L).
 
 
@@ -549,12 +602,26 @@ on_message(?NOTIFICATION, <<Pid:?int32, Strings/binary>>, State) ->
 on_message(Msg, Payload, State) ->
 on_message(Msg, Payload, State) ->
     command_handle_message(Msg, Payload, State).
     command_handle_message(Msg, Payload, State).
 
 
+%% @doc Handle "copy subprotocol" for COPY .. FROM STDIN
+%%
+%% Activated by `epgsql_cmd_copy_from_stdin' and deactivated by `epgsql_cmd_copy_done'
+on_copy_from_stdin(?COMMAND_COMPLETE, Bin, State) ->
+    _Complete = epgsql_wire:decode_complete(Bin),
+    {noreply, State#state{subproto_state = undefined, handler = on_message}};
+on_copy_from_stdin(?ERROR, Err, State) ->
+    Reason = epgsql_wire:decode_error(Err),
+    {stop, {error, Reason}, State};
+on_copy_from_stdin(M, Data, Sock) when M == ?NOTICE;
+                                       M == ?NOTIFICATION;
+                                       M == ?PARAMETER_STATUS ->
+    on_message(M, Data, Sock).
+
 
 
 %% CopyData for Replication mode
 %% CopyData for Replication mode
 on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
 on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
-               #state{repl = #repl{last_flushed_lsn = LastFlushedLSN,
-                                   last_applied_lsn = LastAppliedLSN,
-                                   align_lsn = AlignLsn} = Repl} = State) ->
+               #state{subproto_state = #repl{last_flushed_lsn = LastFlushedLSN,
+                                             last_applied_lsn = LastAppliedLSN,
+                                             align_lsn = AlignLsn} = Repl} = State) ->
     Repl1 =
     Repl1 =
         case ReplyRequired of
         case ReplyRequired of
             1 when AlignLsn ->
             1 when AlignLsn ->
@@ -571,14 +638,14 @@ on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestam
                 Repl#repl{feedback_required = true,
                 Repl#repl{feedback_required = true,
                           last_received_lsn = LSN}
                           last_received_lsn = LSN}
         end,
         end,
-    {noreply, State#state{repl = Repl1}};
+    {noreply, State#state{subproto_state = Repl1}};
 
 
 %% CopyData for Replication mode
 %% CopyData for Replication mode
 on_replication(?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
 on_replication(?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
                              _Timestamp:?int64, WALRecord/binary>>,
                              _Timestamp:?int64, WALRecord/binary>>,
-               #state{repl = Repl} = State) ->
+               #state{subproto_state = Repl} = State) ->
     Repl1 = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
     Repl1 = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
-    {noreply, State#state{repl = Repl1}};
+    {noreply, State#state{subproto_state = Repl1}};
 on_replication(?ERROR, Err, State) ->
 on_replication(?ERROR, Err, State) ->
     Reason = epgsql_wire:decode_error(Err),
     Reason = epgsql_wire:decode_error(Err),
     {stop, {error, Reason}, State};
     {stop, {error, Reason}, State};

+ 7 - 0
src/epgsql_wire.erl

@@ -29,6 +29,7 @@
          encode_parse/3,
          encode_parse/3,
          encode_describe/2,
          encode_describe/2,
          encode_bind/4,
          encode_bind/4,
+         encode_copy_done/0,
          encode_execute/2,
          encode_execute/2,
          encode_close/2,
          encode_close/2,
          encode_flush/0,
          encode_flush/0,
@@ -213,6 +214,7 @@ decode_complete(Bin) ->
         ["DELETE", Rows]       -> {delete, list_to_integer(Rows)};
         ["DELETE", Rows]       -> {delete, list_to_integer(Rows)};
         ["MOVE", Rows]         -> {move, list_to_integer(Rows)};
         ["MOVE", Rows]         -> {move, list_to_integer(Rows)};
         ["FETCH", Rows]        -> {fetch, list_to_integer(Rows)};
         ["FETCH", Rows]        -> {fetch, list_to_integer(Rows)};
+        ["COPY", Rows]         -> {copy, list_to_integer(Rows)};
         [Type | _Rest]         -> lower_atom(Type)
         [Type | _Rest]         -> lower_atom(Type)
     end.
     end.
 
 
@@ -390,5 +392,10 @@ encode_flush() ->
 encode_sync() ->
 encode_sync() ->
     {?SYNC, []}.
     {?SYNC, []}.
 
 
+%% @doc encodes `CopyDone' packet.
+-spec encode_copy_done() -> {packet_type(), iodata()}.
+encode_copy_done() ->
+    {?COPY_DONE, []}.
+
 obj_atom_to_byte(statement) -> ?PREPARED_STATEMENT;
 obj_atom_to_byte(statement) -> ?PREPARED_STATEMENT;
 obj_atom_to_byte(portal) -> ?PORTAL.
 obj_atom_to_byte(portal) -> ?PORTAL.

+ 59 - 0
test/epgsql_copy_SUITE.erl

@@ -0,0 +1,59 @@
+-module(epgsql_copy_SUITE).
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include("epgsql.hrl").
+
+-export([
+    init_per_suite/1,
+    all/0,
+    end_per_suite/1,
+
+    from_stdin_text/1
+]).
+
+init_per_suite(Config) ->
+    [{module, epgsql}|Config].
+
+end_per_suite(_Config) ->
+    ok.
+
+all() ->
+    [
+     from_stdin_text%% ,
+     %% from_stdin_csv,
+     %% from_stdin_io_apis,
+     %% from_stdin_fragmented,
+     %% from_stdin_with_terminator,
+     %% from_stdin_corrupt_data
+    ].
+
+from_stdin_text(Config) ->
+    Module = ?config(module, Config),
+    epgsql_ct:with_connection(
+        Config,
+        fun(C) ->
+                ?assertEqual(
+                   {ok, [text, text]},
+                   Module:copy_from_stdin(
+                     C, "COPY test_table1 (id, value) FROM STDIN WITH (FORMAT text)")),
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C,
+                                "10\thello world\n"
+                                "11\t\\N\n"
+                                "12\tline 12\n")),
+                ?assertEqual(
+                   ok,
+                   io:put_chars(C, "13\tline 13\n")),
+                ?assertEqual(
+                   {ok, 4},
+                   Module:copy_done(C)),
+                ?assertMatch(
+                   {ok, _, [{10, <<"hello world">>},
+                            {11, null},
+                            {12, <<"line 12">>},
+                            {13, <<"line 13">>}]},
+                   Module:equery(C,
+                                 "SELECT id, value FROM test_table1"
+                                 " WHERE id IN (10, 11, 12, 13) ORDER BY id"))
+        end).