Browse Source

Merge pull request #139 from seriyps/pluggable_commands

Pluggable commands
Sergey Prokhorov 7 years ago
parent
commit
45db8064d9

+ 0 - 3
include/epgsql_binary.hrl

@@ -1,3 +0,0 @@
--define(int16, 1/big-signed-unit:16).
--define(int32, 1/big-signed-unit:32).
--define(int64, 1/big-signed-unit:64).

+ 49 - 0
include/protocol.hrl

@@ -0,0 +1,49 @@
+-define(int16, 1/big-signed-unit:16).
+-define(int32, 1/big-signed-unit:32).
+-define(int64, 1/big-signed-unit:64).
+
+%% Commands defined as per this page:
+%% https://www.postgresql.org/docs/current/static/protocol-message-formats.html
+
+%% Commands
+-define(BIND, $B).
+-define(CLOSE, $C).
+-define(DESCRIBE, $D).
+-define(EXECUTE, $E).
+-define(FLUSH, $H).
+-define(PASSWORD, $p).
+-define(PARSE, $P).
+-define(SIMPLEQUERY, $Q).
+-define(AUTHENTICATION_REQUEST, $R).
+-define(SYNC, $S).
+
+%% Parameters
+
+-define(PREPARED_STATEMENT, $S).
+-define(PORTAL, $P).
+
+%% Responses
+
+-define(PARSE_COMPLETE, $1).
+-define(BIND_COMPLETE, $2).
+-define(CLOSE_COMPLETE, $3).
+-define(NOTIFICATION, $A).
+-define(COMMAND_COMPLETE, $C).
+-define(DATA_ROW, $D).
+-define(ERROR, $E).
+-define(EMPTY_QUERY, $I).
+-define(CANCELLATION_KEY, $K).
+-define(NO_DATA, $n).
+-define(NOTICE, $N).
+-define(PORTAL_SUSPENDED, $s).
+-define(PARAMETER_STATUS, $S).
+-define(PARAMETER_DESCRIPTION, $t).
+-define(ROW_DESCRIPTION, $T).
+-define(READY_FOR_QUERY, $Z).
+-define(COPY_BOTH_RESPONSE, $W).
+-define(COPY_DATA, $d).
+
+% CopyData replication messages
+-define(X_LOG_DATA, $w).
+-define(PRIMARY_KEEPALIVE_MESSAGE, $k).
+-define(STANDBY_STATUS_UPDATE, $r).

+ 81 - 0
src/commands/epgsql_cmd_batch.erl

@@ -0,0 +1,81 @@
+%% > Bind
+%% < BindComplete
+%% > Execute
+%% < DataRow*
+%% < CommandComplete
+%% -- Repeated many times --
+%% > Sync
+%% < ReadyForQuery
+-module(epgsql_cmd_batch).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: [{ok, Count :: non_neg_integer(), Rows :: [tuple()]}
+                     | {ok, Count :: non_neg_integer()}
+                     | {ok, Rows :: [tuple()]}
+                     | {error, epgsql:query_error()}].
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-record(batch,
+        {batch :: [{#statement{}, list()}],
+         decoder}).
+
+init(Batch) ->
+    #batch{batch = Batch}.
+
+execute(Sock, #batch{batch = Batch} = State) ->
+    Codec = epgsql_sock:get_codec(Sock),
+    Commands =
+        lists:foldr(
+          fun({Statement, Parameters}, Acc) ->
+                  #statement{name = StatementName,
+                             columns = Columns,
+                             types = Types} = Statement,
+                  TypedParameters = lists:zip(Types, Parameters),
+                  Bin1 = epgsql_wire:encode_parameters(TypedParameters, Codec),
+                  Bin2 = epgsql_wire:encode_formats(Columns),
+                  [{?BIND, [0, StatementName, 0, Bin1, Bin2]},
+                   {?EXECUTE, [0, <<0:?int32>>]} | Acc]
+          end,
+          [{?SYNC, []}],
+          Batch),
+    epgsql_sock:send_multi(Sock, Commands),
+    {ok, Sock, State}.
+
+handle_message(?BIND_COMPLETE, <<>>, Sock, #batch{batch = [{Stmt, _} | _]} = State) ->
+    #statement{columns = Columns} = Stmt,
+    Codec = epgsql_sock:get_codec(Sock),
+    Decoder = epgsql_wire:build_decoder(Columns, Codec),
+    {noaction, Sock, State#batch{decoder = Decoder}};
+handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>, Sock,
+               #batch{decoder = Decoder} = State) ->
+    Row = epgsql_wire:decode_data(Bin, Decoder),
+    {add_row, Row, Sock, State};
+%% handle_message(?EMPTY_QUERY, _, Sock, _State) ->
+%%     Sock1 = epgsql_sock:add_result(Sock, {complete, empty}, {ok, [], []}),
+%%     {noaction, Sock1};
+handle_message(?COMMAND_COMPLETE, Bin, Sock,
+               #batch{batch = [{#statement{columns = Columns}, _} | Batch]} = State) ->
+    Complete = epgsql_wire:decode_complete(Bin),
+    Rows = epgsql_sock:get_rows(Sock),
+    Result = case Complete of
+                 {_, Count} when Columns == [] ->
+                     {ok, Count};
+                 {_, Count} ->
+                     {ok, Count, Rows};
+                 _ ->
+                     {ok, Rows}
+             end,
+    {add_result, Result, {complete, Complete}, Sock, State#batch{batch = Batch}};
+handle_message(?READY_FOR_QUERY, _Status, Sock, #batch{batch = B} = _State) when
+      length(B) =< 1 ->
+    Results = epgsql_sock:get_results(Sock),
+    {finish, Results, done, Sock};
+handle_message(?ERROR, Error, Sock, #batch{batch = [_ | Batch]} = State) ->
+    Result = {error, Error},
+    {add_result, Result, Result, Sock, State#batch{batch = Batch}};
+handle_message(_, _, _, _) ->
+    unknown.

+ 40 - 0
src/commands/epgsql_cmd_bind.erl

@@ -0,0 +1,40 @@
+%% > Bind
+%% < BindComplete
+-module(epgsql_cmd_bind).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: ok | {error, epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-record(bind,
+        {stmt :: #statement{},
+         portal :: iodata(),
+         params :: list()}).
+
+init({Stmt, PortalName, Params}) ->
+    #bind{stmt = Stmt, portal = PortalName, params = Params}.
+
+execute(Sock, #bind{stmt = Stmt, portal = PortalName, params = Params} = St) ->
+    #statement{name = StatementName, columns = Columns, types = Types} = Stmt,
+    Codec = epgsql_sock:get_codec(Sock),
+    TypedParams = lists:zip(Types, Params),
+    Bin1 = epgsql_wire:encode_parameters(TypedParams, Codec),
+    Bin2 = epgsql_wire:encode_formats(Columns),
+    epgsql_sock:send_multi(
+      Sock,
+      [
+       {?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]},
+       {?FLUSH, []}
+      ]),
+    {ok, Sock, St}.
+
+handle_message(?BIND_COMPLETE, <<>>, Sock, _State) ->
+    {finish, ok, ok, Sock};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    {sync_required, {error, Error}};
+handle_message(_, _, _, _) ->
+    unknown.

+ 38 - 0
src/commands/epgsql_cmd_close.erl

@@ -0,0 +1,38 @@
+%% > Close
+%% < CloseComplete
+-module(epgsql_cmd_close).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: ok | {error, epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-record(close,
+        {type :: statement | portal,
+         name :: iodata()}).
+
+init({Type, Name}) ->
+    #close{type = Type, name = Name}.
+
+execute(Sock, #close{type = Type, name = Name} = St) ->
+    Type2 = case Type of
+        statement -> ?PREPARED_STATEMENT;
+        portal    -> ?PORTAL
+    end,
+    epgsql_sock:send_multi(
+      Sock,
+      [
+       {?CLOSE, [Type2, Name, 0]},
+       {?FLUSH, []}
+      ]),
+    {ok, Sock, St}.
+
+handle_message(?CLOSE_COMPLETE, <<>>, Sock, _St) ->
+    {finish, ok, ok, Sock};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    {sync_required, {error, Error}};
+handle_message(_, _, _, _) ->
+    unknown.

+ 178 - 0
src/commands/epgsql_cmd_connect.erl

@@ -0,0 +1,178 @@
+%%% Special kind of command - it's exclusive: no other commands can run until
+%%% this one finishes.
+%%% It also uses some 'private' epgsql_sock's APIs
+%%%
+-module(epgsql_cmd_connect).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: connected
+                  | {error,
+                     invalid_authorization_specification
+                     | invalid_password
+                     | epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-record(connect,
+        {opts :: list(),
+         auth_method,
+         stage = connect :: connect | auth | initialization}).
+
+init({Host, Username, Password, Opts}) ->
+    Opts1 = [{host, Host},
+             {username, Username},
+             {password, Password}
+             | Opts],
+    #connect{opts = Opts1}.
+
+execute(PgSock, #connect{opts = Opts, stage = connect} = State) ->
+    Host = get_val(host, Opts),
+    Username = get_val(username, Opts),
+    %% _ = get_val(password, Opts),
+    Timeout = proplists:get_value(timeout, Opts, 5000),
+    Port = proplists:get_value(port, Opts, 5432),
+    SockOpts = [{active, false}, {packet, raw}, binary, {nodelay, true}, {keepalive, true}],
+    case gen_tcp:connect(Host, Port, SockOpts, Timeout) of
+        {ok, Sock} ->
+
+            %% Increase the buffer size.  Following the recommendation in the inet man page:
+            %%
+            %%    It is recommended to have val(buffer) >=
+            %%    max(val(sndbuf),val(recbuf)).
+
+            {ok, [{recbuf, RecBufSize}, {sndbuf, SndBufSize}]} =
+                inet:getopts(Sock, [recbuf, sndbuf]),
+            inet:setopts(Sock, [{buffer, max(RecBufSize, SndBufSize)}]),
+
+            PgSock1 = maybe_ssl(Sock, proplists:get_value(ssl, Opts, false), Opts, PgSock),
+
+            Opts2 = ["user", 0, Username, 0],
+            Opts3 = case proplists:get_value(database, Opts, undefined) of
+                undefined -> Opts2;
+                Database  -> [Opts2 | ["database", 0, Database, 0]]
+            end,
+
+            Replication = proplists:get_value(replication, Opts, undefined),
+            Opts4 = case Replication of
+                        undefined -> Opts3;
+                        Replication  ->
+                            [Opts3 | ["replication", 0, Replication, 0]]
+                    end,
+            PgSock2 = case Replication of
+                          undefined -> PgSock1;
+                          _ -> epgsql_sock:init_replication_state(PgSock1)
+                      end,
+
+            epgsql_sock:send(PgSock2, [<<196608:?int32>>, Opts4, 0]),
+            PgSock3 = case proplists:get_value(async, Opts, undefined) of
+                          undefined -> PgSock2;
+                          Async -> epgsql_sock:set_attr(async, Async, PgSock2)
+                      end,
+            {ok, PgSock3, State#connect{stage = auth}};
+        {error, Reason} = Error ->
+            {stop, Reason, Error, PgSock}
+    end;
+execute(PgSock, #connect{stage = auth, auth_method = cleartext, opts = Opts} = St) ->
+    Password = get_val(password, Opts),
+    epgsql_sock:send(PgSock, ?PASSWORD, [Password, 0]),
+    {ok, PgSock, St};
+execute(PgSock, #connect{stage = auth, auth_method = {md5, Salt}, opts = Opts} = St) ->
+    User = get_val(username, Opts),
+    Password = get_val(password, Opts),
+    Digest1 = hex(erlang:md5([Password, User])),
+    Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
+    epgsql_sock:send(PgSock, ?PASSWORD, Str),
+    {ok, PgSock, St}.
+
+
+maybe_ssl(S, false, _, PgSock) ->
+    epgsql_sock:set_net_socket(gen_tcp, S, PgSock);
+maybe_ssl(S, Flag, Opts, PgSock) ->
+    ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
+    Timeout = proplists:get_value(timeout, Opts, 5000),
+    {ok, <<Code>>} = gen_tcp:recv(S, 1, Timeout),
+    case Code of
+        $S  ->
+            SslOpts = proplists:get_value(ssl_opts, Opts, []),
+            case ssl:connect(S, SslOpts, Timeout) of
+                {ok, S2}        ->
+                    epgsql_sock:set_net_socket(ssl, S2, PgSock);
+                {error, Reason} ->
+                    exit({ssl_negotiation_failed, Reason})
+            end;
+        $N ->
+            case Flag of
+                true ->
+                    epgsql_sock:set_net_socket(gen_tcp, S, PgSock);
+                required ->
+                    exit(ssl_not_available)
+            end
+    end.
+
+%% --- Auth ---
+
+%% AuthenticationOk
+handle_message(?AUTHENTICATION_REQUEST, <<0:?int32>>, Sock, State) ->
+    {noaction, Sock, State#connect{stage = initialization}};
+
+%% AuthenticationCleartextPassword
+handle_message(?AUTHENTICATION_REQUEST, <<3:?int32>>, Sock, St) ->
+    {requeue, Sock, St#connect{stage = auth, auth_method = cleartext}};
+
+%% AuthenticationMD5Password
+handle_message(?AUTHENTICATION_REQUEST, <<5:?int32, Salt:4/binary>>, Sock, St) ->
+    {requeue, Sock, St#connect{stage = auth, auth_method = {md5, Salt}}};
+
+handle_message(?AUTHENTICATION_REQUEST, <<M:?int32, _/binary>>, Sock, _State) ->
+    Method = case M of
+        2 -> kerberosV5;
+        4 -> crypt;
+        6 -> scm;
+        7 -> gss;
+        8 -> sspi;
+        _ -> unknown
+    end,
+    {stop, normal, {error, {unsupported_auth_method, Method}}, Sock};
+
+%% --- Initialization ---
+
+%% BackendKeyData
+handle_message(?CANCELLATION_KEY, <<Pid:?int32, Key:?int32>>, Sock, _State) ->
+    {noaction, epgsql_sock:set_attr(backend, {Pid, Key}, Sock)};
+
+%% ReadyForQuery
+handle_message(?READY_FOR_QUERY, _, Sock, _State) ->
+    %% TODO decode dates to now() format
+    case epgsql_sock:get_parameter_internal(<<"integer_datetimes">>, Sock) of
+        <<"on">>  -> put(datetime_mod, epgsql_idatetime);
+        <<"off">> -> put(datetime_mod, epgsql_fdatetime)
+    end,
+    Sock1 = epgsql_sock:set_attr(codec, epgsql_binary:new_codec([]), Sock),
+    {finish, connected, connected, Sock1};
+
+
+%% ErrorResponse
+handle_message(?ERROR, Err, Sock, #connect{stage = auth} = _State) ->
+    Why = case Err#error.code of
+        <<"28000">> -> invalid_authorization_specification;
+        <<"28P01">> -> invalid_password;
+        Any         -> Any
+    end,
+    {stop, normal, {error, Why}, Sock};
+handle_message(_, _, _, _) ->
+    unknown.
+
+
+get_val(Key, Proplist) ->
+    Val = proplists:get_value(Key, Proplist),
+    (Val =/= undefined) orelse error({required_option, Key}),
+    Val.
+
+hex(Bin) ->
+    HChar = fun(N) when N < 10 -> $0 + N;
+               (N) when N < 16 -> $W + N
+            end,
+    <<<<(HChar(H)), (HChar(L))>> || <<H:4, L:4>> <= Bin>>.

+ 39 - 0
src/commands/epgsql_cmd_describe_portal.erl

@@ -0,0 +1,39 @@
+%% > Describe
+%% < RowDescription | NoData
+-module(epgsql_cmd_describe_portal).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-type response() :: {ok, [epgsql:column()]} | {error, epgsql:query_error()}.
+
+-record(desc_portal,
+        {name :: iodata(),
+         parameter_descr}).
+
+init(Name) ->
+    #desc_portal{name = Name}.
+
+execute(Sock, #desc_portal{name = Name} = St) ->
+    epgsql_sock:send_multi(
+      Sock,
+      [
+       {?DESCRIBE, [?PORTAL, Name, 0]},
+       {?FLUSH, []}
+      ]),
+    {ok, Sock, St}.
+
+handle_message(?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>, Sock, St) ->
+    Codec = epgsql_sock:get_codec(Sock),
+    Columns = epgsql_wire:decode_columns(Count, Bin, Codec),
+    {finish, {ok, Columns}, {columns, Columns}, St};
+handle_message(?NO_DATA, <<>>, _Sock, _State) ->
+    {finish, {ok, []}, no_data};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    Result = {error, Error},
+    {sync_required, Result};
+handle_message(_, _, _, _) ->
+    unknown.

+ 51 - 0
src/commands/epgsql_cmd_describe_statement.erl

@@ -0,0 +1,51 @@
+%% Almost the same as "parse"
+%% > Describe
+%% < ParameterDescription
+%% < RowDescription | NoData
+-module(epgsql_cmd_describe_statement).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-type response() :: {ok, #statement{}} | {error, epgsql:query_error()}.
+
+-record(desc_stmt,
+        {name :: iodata(),
+         parameter_descr}).
+
+init(Name) ->
+    #desc_stmt{name = Name}.
+
+execute(Sock, #desc_stmt{name = Name} = St) ->
+    epgsql_sock:send_multi(
+      Sock,
+      [
+       {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
+       {?FLUSH, []}
+      ]),
+    {ok, Sock, St}.
+
+handle_message(?PARAMETER_DESCRIPTION, Bin, Sock, State) ->
+    Codec = epgsql_sock:get_codec(Sock),
+    Types = epgsql_wire:decode_parameters(Bin, Codec),
+    Sock2 = epgsql_sock:notify(Sock, {types, Types}),
+    {noaction, Sock2, State#desc_stmt{parameter_descr = Types}};
+handle_message(?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>, Sock,
+               #desc_stmt{name = Name, parameter_descr = Params}) ->
+    Codec = epgsql_sock:get_codec(Sock),
+    Columns = epgsql_wire:decode_columns(Count, Bin, Codec),
+    Columns2 = [Col#column{format = epgsql_wire:format(Col#column.type, Codec)}
+                || Col <- Columns],
+    Result = {ok, #statement{name = Name, types = Params, columns = Columns2}},
+    {finish, Result, {columns, Columns2}, Sock};
+handle_message(?NO_DATA, <<>>, Sock, #desc_stmt{name = Name, parameter_descr = Params}) ->
+    Result = {ok, #statement{name = Name, types = Params, columns = []}},
+    {finish, Result, no_data, Sock};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    Result = {error, Error},
+    {sync_required, Result};
+handle_message(_, _, _, _) ->
+    unknown.

+ 85 - 0
src/commands/epgsql_cmd_equery.erl

@@ -0,0 +1,85 @@
+%% > Bind
+%% < BindComplete
+%% > Execute
+%% < DataRow*
+%% < CommandComplete
+%% > Close
+%% < CloseComplete
+%% > Sync
+%% < ReadyForQuery
+-module(epgsql_cmd_equery).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: {ok, Count :: non_neg_integer(), Cols :: [epgsql:column()], Rows :: [tuple()]}
+                  | {ok, Count :: non_neg_integer()}
+                  | {ok, Cols :: [epgsql:column()], Rows :: [tuple()]}
+                  | {error, epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-record(equery,
+        {stmt :: #statement{},
+         params :: list(),
+         decoder}).
+
+init({Stmt, TypedParams}) ->
+    #equery{stmt = Stmt,
+            params = TypedParams}.
+
+execute(Sock, #equery{stmt = Stmt, params = TypedParams} = St) ->
+    #statement{name = StatementName, columns = Columns} = Stmt,
+    Codec = epgsql_sock:get_codec(Sock),
+    Bin1 = epgsql_wire:encode_parameters(TypedParams, Codec),
+    Bin2 = epgsql_wire:encode_formats(Columns),
+    epgsql_sock:send_multi(
+      Sock,
+      [
+       {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
+       {?EXECUTE, ["", 0, <<0:?int32>>]},
+       {?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]},
+       {?SYNC, []}
+      ]),
+    {ok, Sock, St}.
+
+handle_message(?BIND_COMPLETE, <<>>, Sock, #equery{stmt = Stmt} = State) ->
+    #statement{columns = Columns} = Stmt,
+    epgsql_sock:notify(Sock, {columns, Columns}), % Why do we need this?
+    Codec = epgsql_sock:get_codec(Sock),
+    Decoder = epgsql_wire:build_decoder(Columns, Codec),
+    {noaction, Sock, State#equery{decoder = Decoder}};
+handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>,
+               Sock, #equery{decoder = Decoder} = St) ->
+    Row = epgsql_wire:decode_data(Bin, Decoder),
+    {add_row, Row, Sock, St};
+handle_message(?EMPTY_QUERY, <<>>, Sock, St) ->
+    {add_result, {ok, [], []}, {complete, empty}, Sock, St};
+handle_message(?COMMAND_COMPLETE, Bin, Sock, #equery{stmt = Stmt} = St) ->
+    Complete = epgsql_wire:decode_complete(Bin),
+    #statement{columns = Cols} = Stmt,
+    Rows = epgsql_sock:get_rows(Sock),
+    Result = case Complete of
+                 {_, Count} when Cols == [] ->
+                     {ok, Count};
+                 {_, Count} ->
+                     {ok, Count, Cols, Rows};
+                 _ ->
+                     {ok, Cols, Rows}
+             end,
+    {add_result, Result, {complete, Complete}, Sock, St};
+handle_message(?CLOSE_COMPLETE, _, Sock, _State) ->
+    {noaction, Sock};
+handle_message(?READY_FOR_QUERY, _Status, Sock, _State) ->
+    case epgsql_sock:get_results(Sock) of
+        [Result] ->
+            {finish, Result, done, Sock};
+        [] ->
+            {finish, done, done, Sock}
+    end;
+handle_message(?ERROR, Error, Sock, St) ->
+    Result = {error, Error},
+    {add_result, Result, Result, Sock, St};
+handle_message(_, _, _, _) ->
+    unknown.

+ 63 - 0
src/commands/epgsql_cmd_execute.erl

@@ -0,0 +1,63 @@
+%% > Execute
+%% < DataRow*
+%% < CommandComplete | PortalSuspended
+-module(epgsql_cmd_execute).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: {ok, Count :: non_neg_integer(), Rows :: [tuple()]}
+                  | {ok, Count :: non_neg_integer()}
+                  | {ok, Rows :: [tuple()]}
+                  | {error, epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-record(execute,
+        {stmt :: #statement{},
+         portal_name :: iodata(),
+         max_rows :: non_neg_integer(),
+         decoder}).
+
+init({Stmt, PortalName, MaxRows}) ->
+    #execute{stmt = Stmt, portal_name = PortalName, max_rows = MaxRows}.
+
+execute(Sock, #execute{stmt = Stmt, portal_name = PortalName, max_rows = MaxRows} = State) ->
+    epgsql_sock:send_multi(
+      Sock,
+      [
+       {?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]},
+       {?FLUSH, []}
+      ]),
+    #statement{columns = Columns} = Stmt,
+    Codec = epgsql_sock:get_codec(Sock),
+    Decoder = epgsql_wire:build_decoder(Columns, Codec),
+    {ok, Sock, State#execute{decoder = Decoder}}.
+
+handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>, Sock,
+               #execute{decoder = Decoder} = St) ->
+    Row = epgsql_wire:decode_data(Bin, Decoder),
+    {add_row, Row, Sock, St};
+handle_message(?EMPTY_QUERY, _, Sock, _State) ->
+    {finish, {ok, [], []}, {complete, empty}, Sock};
+handle_message(?COMMAND_COMPLETE, Bin, Sock,
+               #execute{stmt = #statement{columns = Cols}}) ->
+    Complete = epgsql_wire:decode_complete(Bin),
+    Rows = epgsql_sock:get_rows(Sock),
+    Result = case Complete of
+                 {_, Count} when Cols == [] ->
+                     {ok, Count};
+                 {_, Count} ->
+                     {ok, Count, Rows};
+                 _ ->
+                     {ok, Rows}
+             end,
+    {finish, Result, {complete, Complete}, Sock};
+handle_message(?PORTAL_SUSPENDED, <<>>, Sock, _State) ->
+    Rows = epgsql_sock:get_rows(Sock),
+    {finish, {partial, Rows}, suspended, Sock};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    {sync_required, {error, Error}};
+handle_message(_, _, _, _) ->
+    unknown.

+ 59 - 0
src/commands/epgsql_cmd_parse.erl

@@ -0,0 +1,59 @@
+%% > Parse
+%% < ParseComplete
+%% > Describe
+%% < ParameterDescription
+%% < RowDescription | NoData
+-module(epgsql_cmd_parse).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-type response() :: {ok, #statement{}} | {error, epgsql:query_error()}.
+
+-record(parse,
+        {name :: iodata(),
+         sql :: iodata(),
+         types :: [atom()],
+         parameter_descr = []}).
+
+init({Name, Sql, Types}) ->
+    #parse{name = Name, sql = Sql, types = Types}.
+
+execute(Sock, #parse{name = Name, sql = Sql, types = Types} = St) ->
+    Codec = epgsql_sock:get_codec(Sock),
+    Bin = epgsql_wire:encode_types(Types, Codec),
+    epgsql_sock:send_multi(
+      Sock,
+      [
+       {?PARSE, [Name, 0, Sql, 0, Bin]},
+       {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
+       {?FLUSH, []}
+      ]),
+    {ok, Sock, St}.
+
+handle_message(?PARSE_COMPLETE, <<>>, Sock, _State) ->
+    {noaction, Sock};
+handle_message(?PARAMETER_DESCRIPTION, Bin, Sock, State) ->
+    Codec = epgsql_sock:get_codec(Sock),
+    Types = epgsql_wire:decode_parameters(Bin, Codec),
+    Sock2 = epgsql_sock:notify(Sock, {types, Types}),
+    {noaction, Sock2, State#parse{parameter_descr = Types}};
+handle_message(?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>, Sock,
+               #parse{name = Name, parameter_descr = Params}) ->
+    Codec = epgsql_sock:get_codec(Sock),
+    Columns = epgsql_wire:decode_columns(Count, Bin, Codec),
+    Columns2 = [Col#column{format = epgsql_wire:format(Col#column.type, Codec)}
+                || Col <- Columns],
+    Result = {ok, #statement{name = Name, types = Params, columns = Columns2}},
+    {finish, Result, {columns, Columns2}, Sock};
+handle_message(?NO_DATA, <<>>, Sock, #parse{name = Name, parameter_descr = Params}) ->
+    Result = {ok, #statement{name = Name, types = Params, columns = []}},
+    {finish, Result, no_data, Sock};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    Result = {error, Error},
+    {sync_required, Result};
+handle_message(_, _, _, _) ->
+    unknown.

+ 81 - 0
src/commands/epgsql_cmd_prepared_query.erl

@@ -0,0 +1,81 @@
+%% Almost the same as equery, but don't execute 'CLOSE'
+%% > Bind
+%% < BindComplete
+%% > Execute
+%% < DataRow*
+%% < CommandComplete
+%% > Sync
+%% < ReadyForQuery
+-module(epgsql_cmd_prepared_query).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: {ok, Count :: non_neg_integer(), Cols :: [epgsql:column()], Rows :: [tuple()]}
+                  | {ok, Count :: non_neg_integer()}
+                  | {ok, Cols :: [epgsql:column()], Rows :: [tuple()]}
+                  | {error, epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+-record(pquery,
+        {stmt :: #statement{},
+         params :: list(),
+         decoder}).
+
+init({Stmt, TypedParams}) ->
+    #pquery{stmt = Stmt,
+            params = TypedParams}.
+
+execute(Sock, #pquery{stmt = Stmt, params = TypedParams} = St) ->
+    #statement{name = StatementName, columns = Columns} = Stmt,
+    Codec = epgsql_sock:get_codec(Sock),
+    Bin1 = epgsql_wire:encode_parameters(TypedParams, Codec),
+    Bin2 = epgsql_wire:encode_formats(Columns),
+    epgsql_sock:send_multi(
+      Sock,
+      [
+       {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
+       {?EXECUTE, ["", 0, <<0:?int32>>]},
+       {?SYNC, []}
+      ]),
+    {ok, Sock, St}.
+
+handle_message(?BIND_COMPLETE, <<>>, Sock, #pquery{stmt = Stmt} = State) ->
+    #statement{columns = Columns} = Stmt,
+    epgsql_sock:notify(Sock, {columns, Columns}), % Why do we need this?
+    Codec = epgsql_sock:get_codec(Sock),
+    Decoder = epgsql_wire:build_decoder(Columns, Codec),
+    {noaction, Sock, State#pquery{decoder = Decoder}};
+handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>,
+               Sock, #pquery{decoder = Decoder} = St) ->
+    Row = epgsql_wire:decode_data(Bin, Decoder),
+    {add_row, Row, Sock, St};
+handle_message(?EMPTY_QUERY, _, Sock, St) ->
+    {add_result, {ok, [], []}, {complete, empty}, Sock, St};
+handle_message(?COMMAND_COMPLETE, Bin, Sock, #pquery{stmt = Stmt} = St) ->
+    Complete = epgsql_wire:decode_complete(Bin),
+    #statement{columns = Cols} = Stmt,
+    Rows = epgsql_sock:get_rows(Sock),
+    Result = case Complete of
+                 {_, Count} when Cols == [] ->
+                     {ok, Count};
+                 {_, Count} ->
+                     {ok, Count, Cols, Rows};
+                 _ ->
+                     {ok, Cols, Rows}
+             end,
+    {add_result, Result, {complete, Complete}, Sock, St};
+handle_message(?READY_FOR_QUERY, _Status, Sock, _State) ->
+    case epgsql_sock:get_results(Sock) of
+        [Result] ->
+            {finish, Result, done, Sock};
+        [] ->
+            {finish, done, done, Sock}
+    end;
+handle_message(?ERROR, Error, Sock, St) ->
+    Result = {error, Error},
+    {add_result, Result, Result, Sock, St};
+handle_message(_, _, _, _) ->
+    unknown.

+ 74 - 0
src/commands/epgsql_cmd_squery.erl

@@ -0,0 +1,74 @@
+%% Squery may contain many semicolon-separated queries
+%% > Query
+%% < (RowDescription?
+%% <  DataRow*
+%% <  CommandComplete)+
+%% < ReadyForQuery
+%% ---
+%% > Query when len(strip(Query)) == 0
+%% < EmptyQueryResponse
+%% < ReadyForQuery
+-module(epgsql_cmd_squery).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response_single() ::
+        {ok, Count :: non_neg_integer(), Cols :: [epgsql:column()], Rows :: [tuple()]}
+      | {ok, Count :: non_neg_integer()}
+      | {ok, Cols :: [epgsql:column()], Rows :: [tuple()]}
+      | {error, epgsql:query_error()}.
+-type response() :: response_single() | [response_single()].
+
+-include("protocol.hrl").
+
+-record(squery,
+        {query :: iodata(),
+         columns = [],
+         decoder}).
+
+init(Sql) ->
+    #squery{query = Sql}.
+
+execute(Sock, #squery{query = Q} = State) ->
+    epgsql_sock:send(Sock, ?SIMPLEQUERY, [Q, 0]),
+    {ok, Sock, State}.
+
+handle_message(?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>, Sock, State) ->
+    Codec = epgsql_sock:get_codec(Sock),
+    Columns = epgsql_wire:decode_columns(Count, Bin, Codec),
+    Decoder = epgsql_wire:build_decoder(Columns, Codec),
+    epgsql_sock:notify(Sock, {columns, Columns}),
+    {noaction, Sock, State#squery{columns = Columns,
+                                  decoder = Decoder}};
+handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>,
+               Sock, #squery{decoder = Decoder} = St) ->
+    Row = epgsql_wire:decode_data(Bin, Decoder),
+    {add_row, Row, Sock, St};
+handle_message(?COMMAND_COMPLETE, Bin, Sock, #squery{columns = Cols} = St) ->
+    Complete = epgsql_wire:decode_complete(Bin),
+    Rows = epgsql_sock:get_rows(Sock),
+    Result = case Complete of
+                 {_, Count} when Cols == [] ->
+                     {ok, Count};
+                 {_, Count} ->
+                     {ok, Count, Cols, Rows};
+                 _ ->
+                     {ok, Cols, Rows}
+             end,
+    {add_result, Result, {complete, Complete}, Sock, St};
+handle_message(?EMPTY_QUERY, _, Sock, St) ->
+    {add_result, {ok, [], []}, {complete, empty}, Sock, St};
+handle_message(?READY_FOR_QUERY, _Status, Sock, _State) ->
+    %% We return single result if there is only one or list of results if
+    %% there are more than one
+    Result = case epgsql_sock:get_results(Sock) of
+                 [Res] -> Res;
+                 Res -> Res
+             end,
+    {finish, Result, done, Sock};
+handle_message(?ERROR, Error, Sock, St) ->
+    Result = {error, Error},
+    {add_result, Result, Result, Sock, St};
+handle_message(_, _, _, _) ->
+    unknown.

+ 65 - 0
src/commands/epgsql_cmd_start_replication.erl

@@ -0,0 +1,65 @@
+%% > SimpleQuery "START_REPLICATION ..."
+%% < CopyBothResponse | Error
+-module(epgsql_cmd_start_replication).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: ok | {error, epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+-include("../epgsql_replication.hrl").
+
+-record(start_repl,
+        {slot,
+         callback,
+         cb_state,
+         wal_pos,
+         plugin_opts}).
+
+init({ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}) ->
+    #start_repl{slot = ReplicationSlot,
+                callback = Callback,
+                cb_state = CbInitState,
+                wal_pos = WALPosition,
+                plugin_opts = PluginOpts}.
+
+execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
+                          cb_state = CbInitState, wal_pos = WALPosition,
+                          plugin_opts = PluginOpts} = St) ->
+    %% Connection should be started with 'replication' option. Then
+    %% 'replication_state' will be initialized
+    Repl = #repl{} = epgsql_sock:get_replication_state(Sock),
+    Sql1 = ["START_REPLICATION SLOT ", ReplicationSlot, " LOGICAL ", WALPosition],
+    Sql2 =
+        case PluginOpts of
+            [] -> Sql1;
+            PluginOpts -> [Sql1 , " (", PluginOpts, ")"]
+        end,
+
+    Repl2 =
+        case Callback of
+            Pid when is_pid(Pid) -> Repl#repl{receiver = Pid};
+            Module -> Repl#repl{cbmodule = Module, cbstate = CbInitState}
+        end,
+
+    Hex = [H || H <- WALPosition, H =/= $/],
+    {ok, [LSN], _} = io_lib:fread("~16u", Hex),
+
+    Repl3 = Repl2#repl{last_flushed_lsn = LSN,
+                       last_applied_lsn = LSN},
+    Sock2 = epgsql_sock:set_attr(replication_state, Repl3, Sock),
+                         %% handler = on_replication},
+
+    epgsql_sock:send(Sock2, ?SIMPLEQUERY, [Sql2, 0]),
+    {ok, Sock2, St}.
+
+%% CopyBothResponse
+handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->
+    {finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    Result = {error, Error},
+    {sync_required, Result};
+handle_message(_, _, _, _) ->
+    unknown.

+ 27 - 0
src/commands/epgsql_cmd_sync.erl

@@ -0,0 +1,27 @@
+%% > Sync
+%% < ReadyForQuery
+-module(epgsql_cmd_sync).
+-behaviour(epgsql_command).
+-export([init/1, execute/2, handle_message/4]).
+-export_type([response/0]).
+
+-type response() :: ok | {error, epgsql:query_error()}.
+
+-include("epgsql.hrl").
+-include("protocol.hrl").
+
+
+init(_) ->
+    undefined.
+
+execute(Sock, St) ->
+    epgsql_sock:send(Sock, ?SYNC, []),
+    Sock1 = epgsql_sock:set_attr(sync_required, false, Sock),
+    {ok, Sock1, St}.
+
+handle_message(?READY_FOR_QUERY, _, Sock, _State) ->
+    {finish, ok, ok, Sock};
+handle_message(?ERROR, Error, _Sock, _State) ->
+    {sync_required, {error, Error}};
+handle_message(_, _, _, _) ->
+    unknown.

+ 27 - 18
src/epgsql.erl

@@ -31,7 +31,7 @@
 
 -export_type([connection/0, connect_option/0, connect_opts/0,
               connect_error/0, query_error/0,
-              sql_query/0, bind_param/0, typed_param/0,
+              sql_query/0, column/0, bind_param/0, typed_param/0,
               squery_row/0, equery_row/0, reply/1,
               pg_time/0, pg_date/0, pg_datetime/0, pg_interval/0]).
 
@@ -105,10 +105,11 @@
 -type typed_param() ::
     {epgsql_type(), bind_param()}.
 
+-type column() :: #column{}.
 -type squery_row() :: tuple(). % tuple of binary().
 -type equery_row() :: tuple(). % tuple of bind_param().
 -type ok_reply(RowType) ::
-    {ok, ColumnsDescription :: [#column{}], RowsValues :: [RowType]} |                            % select
+    {ok, ColumnsDescription :: [column()], RowsValues :: [RowType]} |                            % select
     {ok, Count :: non_neg_integer()} |                                                            % update/insert/delete
     {ok, Count :: non_neg_integer(), ColumnsDescription :: [#column{}], RowsValues :: [RowType]}. % update/insert/delete + returning
 -type error_reply() :: {error, query_error()}.
@@ -157,9 +158,8 @@ connect(Host, Username, Password, Opts) ->
 connect(C, Host, Username, Password, Opts0) ->
     Opts = to_proplist(Opts0),
     %% TODO connect timeout
-    case gen_server:call(C,
-                         {connect, Host, Username, Password, Opts},
-                         infinity) of
+    case epgsql_sock:sync_command(
+           C, epgsql_cmd_connect, {Host, Username, Password, Opts}) of
         connected ->
             case proplists:get_value(replication, Opts, undefined) of
                 undefined ->
@@ -216,7 +216,7 @@ get_cmd_status(C) ->
 -spec squery(connection(), sql_query()) -> reply(squery_row()) | [reply(squery_row())].
 %% @doc runs simple `SqlQuery' via given `Connection'
 squery(Connection, SqlQuery) ->
-    gen_server:call(Connection, {squery, SqlQuery}, infinity).
+    epgsql_sock:sync_command(Connection, epgsql_cmd_squery, SqlQuery).
 
 equery(C, Sql) ->
     equery(C, Sql, []).
@@ -226,7 +226,7 @@ equery(C, Sql, Parameters) ->
     case parse(C, "", Sql, []) of
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
-            gen_server:call(C, {equery, S, Typed_Parameters}, infinity);
+            epgsql_sock:sync_command(C, epgsql_cmd_equery, {S, Typed_Parameters});
         Error ->
             Error
     end.
@@ -236,7 +236,7 @@ equery(C, Name, Sql, Parameters) ->
     case parse(C, Name, Sql, []) of
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
-            gen_server:call(C, {equery, S, Typed_Parameters}, infinity);
+            epgsql_sock:sync_command(C, epgsql_cmd_equery, {S, Typed_Parameters});
         Error ->
             Error
     end.
@@ -246,7 +246,7 @@ prepared_query(C, Name, Parameters) ->
     case describe(C, statement, Name) of
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
-            gen_server:call(C, {prepared_query, S, Typed_Parameters}, infinity);
+            epgsql_sock:sync_command(C, epgsql_cmd_prepared_query, {S, Typed_Parameters});
         Error ->
             Error
     end.
@@ -263,7 +263,9 @@ parse(C, Sql, Types) ->
 -spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
                    {ok, #statement{}} | {error, query_error()}.
 parse(C, Name, Sql, Types) ->
-    sync_on_error(C, gen_server:call(C, {parse, Name, Sql, Types}, infinity)).
+    sync_on_error(
+      C, epgsql_sock:sync_command(
+           C, epgsql_cmd_parse, {Name, Sql, Types})).
 
 %% bind
 
@@ -275,7 +277,8 @@ bind(C, Statement, Parameters) ->
 bind(C, Statement, PortalName, Parameters) ->
     sync_on_error(
       C,
-      gen_server:call(C, {bind, Statement, PortalName, Parameters}, infinity)).
+      epgsql_sock:sync_command(
+        C, epgsql_cmd_bind, {Statement, PortalName, Parameters})).
 
 %% execute
 
@@ -292,11 +295,11 @@ execute(C, S, N) ->
              | {ok, non_neg_integer(), [equery_row()]}
              | {error, query_error()}.
 execute(C, S, PortalName, N) ->
-    gen_server:call(C, {execute, S, PortalName, N}, infinity).
+    epgsql_sock:sync_command(C, epgsql_cmd_execute, {S, PortalName, N}).
 
 -spec execute_batch(connection(), [{#statement{}, [bind_param()]}]) -> [reply(equery_row())].
 execute_batch(C, Batch) ->
-    gen_server:call(C, {execute_batch, Batch}, infinity).
+    epgsql_sock:sync_command(C, epgsql_cmd_batch, Batch).
 
 %% statement/portal functions
 
@@ -304,20 +307,24 @@ describe(C, #statement{name = Name}) ->
     describe(C, statement, Name).
 
 describe(C, statement, Name) ->
-    sync_on_error(C, gen_server:call(C, {describe_statement, Name}, infinity));
+    sync_on_error(
+      C, epgsql_sock:sync_command(
+           C, epgsql_cmd_describe_statement, Name));
 
 %% TODO unknown result format of Describe portal
 describe(C, portal, Name) ->
-    sync_on_error(C, gen_server:call(C, {describe_portal, Name}, infinity)).
+    sync_on_error(
+      C, epgsql_sock:sync_command(
+           C, epgsql_cmd_describe_portal, Name)).
 
 close(C, #statement{name = Name}) ->
     close(C, statement, Name).
 
 close(C, Type, Name) ->
-    gen_server:call(C, {close, Type, Name}).
+    epgsql_sock:sync_command(C, epgsql_cmd_close, {Type, Name}).
 
 sync(C) ->
-    gen_server:call(C, sync).
+    epgsql_sock:sync_command(C, epgsql_cmd_sync, []).
 
 -spec cancel(connection()) -> ok.
 cancel(C) ->
@@ -405,7 +412,9 @@ standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
 %%                      For example: "option_name1 'value1', option_name2 'value2'"
 %% returns `ok' otherwise `{error, Reason}'
 start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
-    gen_server:call(Connection, {start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}).
+    Command = {ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts},
+    epgsql_sock:sync_command(Connection, epgsql_cmd_start_replication, Command).
+
 start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
     start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, []).
 

+ 11 - 1
src/epgsql_binary.erl

@@ -7,12 +7,16 @@
          type2oid/2, oid2type/2,
          encode/3, decode/3, supports/1]).
 
+-export_type([codec/0]).
+
 -record(codec, {
     type2oid = [],
     oid2type = []
 }).
 
--include("epgsql_binary.hrl").
+-include("protocol.hrl").
+
+-opaque codec() :: #codec{}.
 
 -define(datetime, (get(datetime_mod))).
 
@@ -24,8 +28,10 @@
 -define(MAX_IP6_MASK, 128).
 -define(JSONB_VERSION_1, 1).
 
+-spec new_codec(list()) -> codec().
 new_codec([]) -> #codec{}.
 
+-spec update_type_cache(list(), codec()) -> codec().
 update_type_cache(TypeInfos, Codec) ->
     Type2Oid = lists:flatmap(
         fun({NameBin, ElementOid, ArrayOid}) ->
@@ -36,6 +42,8 @@ update_type_cache(TypeInfos, Codec) ->
     Oid2Type = [{Oid, Type} || {Type, Oid} <- Type2Oid],
     Codec#codec{type2oid = Type2Oid, oid2type = Oid2Type}.
 
+-spec oid2type(integer(), codec()) -> Type | {unknown_oid, integer()} when
+      Type :: atom() | {array, atom()}.
 oid2type(Oid, #codec{oid2type = Oid2Type}) ->
     case epgsql_types:oid2type(Oid) of
         {unknown_oid, _} ->
@@ -43,6 +51,8 @@ oid2type(Oid, #codec{oid2type = Oid2Type}) ->
         Type -> Type
     end.
 
+-spec type2oid(Type, codec()) -> integer() | {unknown_type, Type} when
+      Type :: atom() | {array, atom()}.
 type2oid(Type, #codec{type2oid = Type2Oid}) ->
     case epgsql_types:type2oid(Type) of
         {unknown_type, _} ->

+ 65 - 0
src/epgsql_command.erl

@@ -0,0 +1,65 @@
+%%% Behaviour module for epgsql_sock commands.
+%%%
+%%% Copyright (C) 2017 - Sergey Prokhorov.  All rights reserved.
+
+-module(epgsql_command).
+-export([init/2, execute/3, handle_message/5]).
+
+-export_type([command/0]).
+
+-type command() :: module().
+-type state() :: any().
+
+%% Initialize command's state. Called when command is received by epgsql_sock process.
+-callback init(any()) -> state().
+
+-type execute_return() ::
+        {ok, epgsql_sock:pg_sock(), state()}
+      | {stop, Reason :: any(), Response :: any(), epgsql_sock:pg_sock()}.
+%% Execute command. It should send commands to socket.
+%% May be called many times if 'handle_message' will return 'requeue'.
+-callback execute(epgsql_sock:pg_sock(), state()) -> execute_return().
+
+-type handle_message_return() ::
+        {noaction, epgsql_sock:pg_sock()}
+        %% Do nothing; remember changed state
+      | {noaction, epgsql_sock:pg_sock(), state()}
+        %% Add result to resultset (eg, `{ok, Count}' `{ok, Cols, Rows}', `{error, #error{}}'
+        %% It may be returned many times for eg, `squery' with multiple
+        %% queries separated by ';'
+        %% See epgsql_sock:get_results/1
+      | {add_result, Data :: any(), Notification :: any(), epgsql_sock:pg_sock(), state()}
+        %% Add new row to current resultset;
+        %% See epgsql_sock:get_rows/1
+      | {add_row, tuple(), epgsql_sock:pg_sock(), state()}
+        %% Finish command execution, reply to the client and go to next command
+      | {finish, Result :: any(), Notification :: any(), epgsql_sock:pg_sock()}
+        %% Stop `epgsql_sock' process
+      | {stop, Reason :: any(), Response :: any(), epgsql_sock:pg_sock()}
+        %% Call 'execute' and reschedule command.
+        %% It's forbidden to call epgsql_sock:send from `handle_message'.
+        %% If you need to do so, you should set some flag in state and
+        %% reschedule command.
+        %% See `epgsql_cmd_connect' for reference.
+      | {requeue, epgsql_sock:pg_sock(), state()}
+        %% Protocol synchronization error (eg, unexpected packet)
+        %% Drop command queue and don't accept any command except 'sync'
+      | {sync_required, Why :: any()}
+        %% Unknown packet. Terminate `epgsql_sock' process
+      | unknown.
+%% Handle incoming packet
+-callback handle_message(Type :: byte(), Payload :: binary() | epgsql:query_error(),
+                         epgsql_sock:pg_sock(), state()) -> handle_message_return().
+
+-spec init(command(), any()) -> state().
+init(Command, Args) ->
+    Command:init(Args).
+
+-spec execute(command(), epgsql_sock:pg_sock(), state()) -> execute_return().
+execute(Command, PgSock, CmdState) ->
+    Command:execute(PgSock, CmdState).
+
+-spec handle_message(command(), Type :: byte(), Payload :: binary() | epgsql:query_error(),
+                     epgsql_sock:pg_sock(), state()) -> handle_message_return().
+handle_message(Command, Type, Payload, PgSock, State) ->
+    Command:handle_message(Type, Payload, PgSock, State).

+ 1 - 1
src/epgsql_fdatetime.erl

@@ -4,7 +4,7 @@
 
 -export([decode/2, encode/2]).
 
--include("epgsql_binary.hrl").
+-include("protocol.hrl").
 
 -define(postgres_epoc_jdate, 2451545).
 -define(postgres_epoc_secs, 946684800).

+ 1 - 1
src/epgsql_idatetime.erl

@@ -4,7 +4,7 @@
 
 -export([decode/2, encode/2]).
 
--include("epgsql_binary.hrl").
+-include("protocol.hrl").
 
 -define(postgres_epoc_jdate, 2451545).
 -define(postgres_epoc_usecs, 946684800000000).

+ 10 - 0
src/epgsql_replication.hrl

@@ -0,0 +1,10 @@
+-record(repl,
+        {
+          last_received_lsn :: integer() | undefined,
+          last_flushed_lsn :: integer() | undefined,
+          last_applied_lsn :: integer() | undefined,
+          feedback_required :: boolean() | undefined,
+          cbmodule :: module() | undefined,
+          cbstate :: any() | undefined,
+          receiver :: pid() | undefined
+        }).

+ 340 - 631
src/epgsql_sock.erl

@@ -1,12 +1,34 @@
 %%% Copyright (C) 2009 - Will Glozer.  All rights reserved.
 %%% Copyright (C) 2011 - Anton Lebedevich.  All rights reserved.
 
+%%% @doc GenServer holding all connection state (including socket).
+%%%
+%%% See https://www.postgresql.org/docs/current/static/protocol-flow.html
+%%% Commands in PostgreSQL are pipelined: you don't need to wait for reply to
+%%% be able to send next command.
+%%% Commands are processed (and responses to them are generated) in FIFO order.
+%%% eg, if you execute 2 SimpleQuery: #1 and #2, first you get all response
+%%% packets for #1 and then all for #2:
+%%% > SQuery #1
+%%% > SQuery #2
+%%% < RowDescription #1
+%%% < DataRow #1
+%%% < CommandComplete #1
+%%% < RowDescription #2
+%%% < DataRow #2
+%%% < CommandComplete #2
+%%%
+%%% See epgsql_cmd_connect for network connection and authentication setup
+
+
 -module(epgsql_sock).
 
 -behavior(gen_server).
 
 -export([start_link/0,
          close/1,
+         sync_command/3,
+         async_command/4,
          get_parameter/2,
          set_notice_receiver/2,
          get_cmd_status/1,
@@ -15,81 +37,46 @@
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 -export([init/1, code_change/3, terminate/2]).
 
-%% state callbacks
--export([auth/2, initializing/2, on_message/2]).
+%% loop callback
+-export([on_message/3, on_replication/3]).
+
+%% Comand's APIs
+-export([set_net_socket/3, init_replication_state/1, set_attr/3, get_codec/1,
+         get_rows/1, get_results/1, notify/2, send/2, send/3, send_multi/2,
+         get_parameter_internal/2,
+         get_replication_state/1, set_packet_handler/2]).
+
+-export_type([transport/0, pg_sock/0]).
 
 -include("epgsql.hrl").
--include("epgsql_binary.hrl").
-
-%% Commands defined as per this page:
-%% http://www.postgresql.org/docs/9.2/static/protocol-message-formats.html
-
-%% Commands
--define(BIND, $B).
--define(CLOSE, $C).
--define(DESCRIBE, $D).
--define(EXECUTE, $E).
--define(FLUSH, $H).
--define(PASSWORD, $p).
--define(PARSE, $P).
--define(SIMPLEQUERY, $Q).
--define(AUTHENTICATION_REQUEST, $R).
--define(SYNC, $S).
-
-%% Parameters
-
--define(PREPARED_STATEMENT, $S).
--define(PORTAL, $P).
-
-%% Responses
-
--define(PARSE_COMPLETE, $1).
--define(BIND_COMPLETE, $2).
--define(CLOSE_COMPLETE, $3).
--define(NOTIFICATION, $A).
--define(COMMAND_COMPLETE, $C).
--define(DATA_ROW, $D).
--define(EMPTY_QUERY, $I).
--define(CANCELLATION_KEY, $K).
--define(NO_DATA, $n).
--define(NOTICE, $N).
--define(PORTAL_SUSPENDED, $s).
--define(PARAMETER_STATUS, $S).
--define(PARAMETER_DESCRIPTION, $t).
--define(ROW_DESCRIPTION, $T).
--define(READY_FOR_QUERY, $Z).
--define(COPY_BOTH_RESPONSE, $W).
--define(COPY_DATA, $d).
-
-% CopyData replication messages
--define(X_LOG_DATA, $w).
--define(PRIMARY_KEEPALIVE_MESSAGE, $k).
--define(STANDBY_STATUS_UPDATE, $r).
-
--record(state, {mod,
-                sock,
+-include("protocol.hrl").
+-include("epgsql_replication.hrl").
+
+-type transport() :: {call, any()}
+                   | {cast, pid(), reference()}
+                   | {incremental, pid(), reference()}.
+
+-record(state, {mod :: gen_tcp | ssl | undefined,
+                sock :: gen_tcp:socket() | ssl:sslsocket() | undefined,
                 data = <<>>,
-                backend,
-                handler,
-                codec,
-                queue = queue:new(),
-                async,
-                parameters = [],
-                types = [],
-                columns = [],
-                rows = [],
+                backend :: {Pid :: integer(), Key :: integer()} | undefined,
+                handler = on_message :: on_message | on_replication | undefined,
+                codec :: epgsql_binary:codec() | undefined,
+                queue = queue:new() :: queue:queue({epgsql_command:command(), any(), transport()}),
+                current_cmd :: epgsql_command:command() | undefined,
+                current_cmd_state :: any() | undefined,
+                current_cmd_transport :: transport() | undefined,
+                async :: undefined | atom() | pid(),
+                parameters = [] :: [{Key :: binary(), Value :: binary()}],
+                rows = [] :: [tuple()],
                 results = [],
-                batch = [],
-                sync_required,
-                txstatus,
-                complete_status :: undefined | atom() | {atom(), integer()},
-                repl_last_received_lsn,
-                repl_last_flushed_lsn,
-                repl_last_applied_lsn,
-                repl_feedback_required,
-                repl_cbmodule,
-                repl_cbstate,
-                repl_receiver}).
+                sync_required :: boolean() | undefined,
+                txstatus :: byte() | undefined,  % $I | $T | $E,
+                complete_status :: atom() | {atom(), integer()} | undefined,
+                repl :: #repl{} | undefined}).
+
+-opaque pg_sock() :: #state{}.
+
 
 %% -- client interface --
 
@@ -100,6 +87,18 @@ close(C) when is_pid(C) ->
     catch gen_server:cast(C, stop),
     ok.
 
+-spec sync_command(epgsql:conection(), epgsql_command:command(), any()) -> any().
+sync_command(C, Command, Args) ->
+    gen_server:call(C, {command, Command, Args}, infinity).
+
+-spec async_command(epgsql:conection(), cast | incremental,
+                    epgsql_command:command(), any()) -> reference().
+async_command(C, Transport, Command, Args) ->
+    Ref = make_ref(),
+    Pid = self(),
+    ok = gen_server:cast(C, {{Transport, Pid, Ref}, Command, Args}),
+    Ref.
+
 get_parameter(C, Name) ->
     gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
 
@@ -113,6 +112,65 @@ get_cmd_status(C) ->
 cancel(S) ->
     gen_server:cast(S, cancel).
 
+
+%% -- command APIs --
+
+%% send()
+%% send_many()
+
+-spec set_net_socket(gen_tcp | ssl, gen_tcp:socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
+set_net_socket(Mod, Socket, State) ->
+    State1 = State#state{mod = Mod, sock = Socket},
+    setopts(State1, [{active, true}]),
+    State1.
+
+-spec init_replication_state(pg_sock()) -> pg_sock().
+init_replication_state(State) ->
+    State#state{repl = #repl{}}.
+
+-spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
+set_attr(backend, {_Pid, _Key} = Backend, State) ->
+    State#state{backend = Backend};
+set_attr(async, Async, State) ->
+    State#state{async = Async};
+set_attr(txstatus, Status, State) ->
+    State#state{txstatus = Status};
+set_attr(codec, Codec, State) ->
+    State#state{codec = Codec};
+set_attr(sync_required, Value, State) ->
+    State#state{sync_required = Value};
+set_attr(replication_state, Value, State) ->
+    State#state{repl = Value}.
+
+%% XXX: be careful!
+-spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
+set_packet_handler(Handler, State) ->
+    State#state{handler = Handler}.
+
+-spec get_codec(pg_sock()) -> epgsql_binary:codec().
+get_codec(#state{codec = Codec}) ->
+    Codec.
+
+-spec get_replication_state(pg_sock()) -> #repl{}.
+get_replication_state(#state{repl = Repl}) ->
+    Repl.
+
+-spec get_rows(pg_sock()) -> [tuple()].
+get_rows(#state{rows = Rows}) ->
+    lists:reverse(Rows).
+
+-spec get_results(pg_sock()) -> [any()].
+get_results(#state{results = Results}) ->
+    lists:reverse(Results).
+
+-spec get_parameter_internal(binary(), pg_sock()) -> binary() | undefined.
+get_parameter_internal(Name, #state{parameters = Parameters}) ->
+    case lists:keysearch(Name, 1, Parameters) of
+        {value, {Name, Value}} -> Value;
+        false                  -> undefined
+    end.
+
+
 %% -- gen_server implementation --
 
 init([]) ->
@@ -123,11 +181,7 @@ handle_call({update_type_cache, TypeInfos}, _From, #state{codec = Codec} = State
     {reply, ok, State#state{codec = Codec2}};
 
 handle_call({get_parameter, Name}, _From, State) ->
-    Value1 = case lists:keysearch(Name, 1, State#state.parameters) of
-        {value, {Name, Value}} -> Value;
-        false                  -> undefined
-    end,
-    {reply, {ok, Value1}, State};
+    {reply, {ok, get_parameter_internal(Name, State)}, State};
 
 handle_call({set_async_receiver, PidOrName}, _From, #state{async = Previous} = State) ->
     {reply, {ok, Previous}, State#state{async = PidOrName}};
@@ -136,23 +190,21 @@ handle_call(get_cmd_status, _From, #state{complete_status = Status} = State) ->
     {reply, {ok, Status}, State};
 
 handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
-    #state{repl_last_received_lsn = ReceivedLSN} = State) ->
+            #state{handler = on_replication,
+                   repl = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
-    {reply, ok, State#state{repl_last_flushed_lsn = FlushedLSN, repl_last_applied_lsn = AppliedLSN}};
-
-handle_call(Command, From, State) ->
-    #state{queue = Q} = State,
-    Req = {{call, From}, Command},
-    command(Command, State#state{queue = queue:in(Req, Q),
-                                 complete_status = undefined}).
-
-handle_cast({{Method, From, Ref}, Command} = Req, State)
+    Repl1 = Repl#repl{last_flushed_lsn = FlushedLSN,
+                      last_applied_lsn = AppliedLSN},
+    {reply, ok, State#state{repl = Repl1}};
+handle_call({command, Command, Args}, From, State) ->
+    Transport = {call, From},
+    command_new(Transport, Command, Args, State).
+
+handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
   when ((Method == cast) or (Method == incremental)),
        is_pid(From),
        is_reference(Ref)  ->
-    #state{queue = Q} = State,
-    command(Command, State#state{queue = queue:in(Req, Q),
-                                 complete_status = undefined});
+    command_new(Transport, Command, Args, State);
 
 handle_cast(stop, State) ->
     {stop, normal, flush_queue(State, {error, closed})};
@@ -198,213 +250,109 @@ code_change(_OldVsn, State, _Extra) ->
 
 %% -- internal functions --
 
-command(Command, State = #state{sync_required = true})
-  when Command /= sync ->
-    {noreply, finish(State, {error, sync_required})};
-
-command({connect, Host, Username, Password, Opts}, State) ->
-    Timeout = proplists:get_value(timeout, Opts, 5000),
-    Port = proplists:get_value(port, Opts, 5432),
-    SockOpts = [{active, false}, {packet, raw}, binary, {nodelay, true}, {keepalive, true}],
-    case gen_tcp:connect(Host, Port, SockOpts, Timeout) of
-        {ok, Sock} ->
-
-            %% Increase the buffer size.  Following the recommendation in the inet man page:
-            %%
-            %%    It is recommended to have val(buffer) >=
-            %%    max(val(sndbuf),val(recbuf)).
-
-            {ok, [{recbuf, RecBufSize}, {sndbuf, SndBufSize}]} =
-                inet:getopts(Sock, [recbuf, sndbuf]),
-            inet:setopts(Sock, [{buffer, max(RecBufSize, SndBufSize)}]),
-
-            State2 = case proplists:get_value(ssl, Opts) of
-                         T when T == true; T == required ->
-                             start_ssl(Sock, T, Opts, State);
-                         _ ->
-                             State#state{mod  = gen_tcp, sock = Sock}
-                     end,
-
-            Opts2 = ["user", 0, Username, 0],
-            Opts3 = case proplists:get_value(database, Opts, undefined) of
-                undefined -> Opts2;
-                Database  -> [Opts2 | ["database", 0, Database, 0]]
-            end,
-
-            Opts4 = case proplists:get_value(replication, Opts, undefined) of
-                        undefined -> Opts3;
-                        Replication  -> [Opts3 | ["replication", 0, Replication, 0]]
-                    end,
+-spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
+                         Result when
+      Result :: {noreply, pg_sock()}
+              | {stop, Reason :: any(), pg_sock()}.
+command_new(Transport, Command, Args, State) ->
+    CmdState = epgsql_command:init(Command, Args),
+    command_exec(Transport, Command, CmdState, State).
+
+-spec command_exec(transport(), epgsql_command:command(), any(), pg_sock()) ->
+                          Result when
+      Result :: {noreply, pg_sock()}
+              | {stop, Reason :: any(), pg_sock()}.
+command_exec(Transport, Command, _, State = #state{sync_required = true})
+  when Command /= epgsql_cmd_sync ->
+    {noreply,
+     finish(State#state{current_cmd = Command,
+                        current_cmd_transport = Transport},
+            {error, sync_required})};
+command_exec(Transport, Command, CmdState, State) ->
+    case epgsql_command:execute(Command, State, CmdState) of
+        {ok, State1, CmdState1} ->
+            {noreply, command_enqueue(Transport, Command, CmdState1, State1)};
+        {stop, StopReason, Response, State1} ->
+            reply(Transport, Response, Response),
+            {stop, StopReason, State1}
+    end.
 
-            send(State2, [<<196608:?int32>>, Opts4, 0]),
-            Async   = proplists:get_value(async, Opts, undefined),
-            setopts(State2, [{active, true}]),
-            put(username, Username),
-            put(password, Password),
+-spec command_enqueue(transport(), epgsql_command:command(), epgsql_command:state(), pg_sock()) -> pg_sock().
+command_enqueue(Transport, Command, CmdState, #state{current_cmd = undefined} = State) ->
+    State#state{current_cmd = Command,
+                current_cmd_state = CmdState,
+                current_cmd_transport = Transport,
+                complete_status = undefined};
+command_enqueue(Transport, Command, CmdState, #state{queue = Q} = State) ->
+    State#state{queue = queue:in({Command, CmdState, Transport}, Q),
+                complete_status = undefined}.
+
+-spec command_handle_message(byte(), binary() | epgsql:query_error(), pg_sock()) ->
+                                    {noreply, pg_sock()}
+                                  | {stop, any(), pg_sock()}.
+command_handle_message(Msg, Payload,
+                       #state{current_cmd = Command,
+                              current_cmd_state = CmdState} = State) ->
+    case epgsql_command:handle_message(Command, Msg, Payload, State, CmdState) of
+        {add_row, Row, State1, CmdState1} ->
+            {noreply, add_row(State1#state{current_cmd_state = CmdState1}, Row)};
+        {add_result, Result, Notice, State1, CmdState1} ->
             {noreply,
-             State2#state{handler = auth,
-                          async = Async}};
-
-        {error, Reason} = Error ->
-            {stop, Reason, finish(State, Error)}
-    end;
-
-command({squery, Sql}, State) ->
-    send(State, ?SIMPLEQUERY, [Sql, 0]),
-    {noreply, State};
-
-%% TODO add fast_equery command that doesn't need parsed statement,
-%% uses default (text) column format,
-%% sends Describe after Bind to get RowDescription
-command({equery, Statement, Parameters}, #state{codec = Codec} = State) ->
-    #statement{name = StatementName, columns = Columns} = Statement,
-    Bin1 = epgsql_wire:encode_parameters(Parameters, Codec),
-    Bin2 = epgsql_wire:encode_formats(Columns),
-    send_multi(State, [
-        {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
-        {?EXECUTE, ["", 0, <<0:?int32>>]},
-        {?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]},
-        {?SYNC, []}
-    ]),
-    {noreply, State};
-
-command({prepared_query, Statement, Parameters}, #state{codec = Codec} = State) ->
-    #statement{name = StatementName, columns = Columns} = Statement,
-    Bin1 = epgsql_wire:encode_parameters(Parameters, Codec),
-    Bin2 = epgsql_wire:encode_formats(Columns),
-    send_multi(State, [
-        {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
-        {?EXECUTE, ["", 0, <<0:?int32>>]},
-        {?SYNC, []}
-    ]),
-    {noreply, State};
-
-command({parse, Name, Sql, Types}, State) ->
-    Bin = epgsql_wire:encode_types(Types, State#state.codec),
-    send_multi(State, [
-        {?PARSE, [Name, 0, Sql, 0, Bin]},
-        {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
-        {?FLUSH, []}
-    ]),
-    {noreply, State};
-
-command({bind, Statement, PortalName, Parameters}, #state{codec = Codec} = State) ->
-    #statement{name = StatementName, columns = Columns, types = Types} = Statement,
-    Typed_Parameters = lists:zip(Types, Parameters),
-    Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
-    Bin2 = epgsql_wire:encode_formats(Columns),
-    send_multi(State, [
-        {?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]},
-        {?FLUSH, []}
-    ]),
-    {noreply, State};
-
-command({execute, _Statement, PortalName, MaxRows}, State) ->
-    send_multi(State, [
-        {?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]},
-        {?FLUSH, []}
-    ]),
-    {noreply, State};
-
-command({execute_batch, Batch}, #state{codec = Codec} = State) ->
-    Commands =
-        lists:foldr(
-          fun({Statement, Parameters}, Acc) ->
-                  #statement{name = StatementName,
-                             columns = Columns,
-                             types = Types} = Statement,
-                  Typed_Parameters = lists:zip(Types, Parameters),
-                  Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
-                  Bin2 = epgsql_wire:encode_formats(Columns),
-                  [{?BIND, [0, StatementName, 0, Bin1, Bin2]},
-                   {?EXECUTE, [0, <<0:?int32>>]} | Acc]
-          end,
-          [{?SYNC, []}],
-          Batch),
-    send_multi(State, Commands),
-    {noreply, State};
-
-command({describe_statement, Name}, State) ->
-    send_multi(State, [
-        {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
-        {?FLUSH, []}
-    ]),
-    {noreply, State};
-
-command({describe_portal, Name}, State) ->
-    send_multi(State, [
-        {?DESCRIBE, [?PORTAL, Name, 0]},
-        {?FLUSH, []}
-    ]),
-    {noreply, State};
-
-command({close, Type, Name}, State) ->
-    Type2 = case Type of
-        statement -> ?PREPARED_STATEMENT;
-        portal    -> ?PORTAL
-    end,
-    send_multi(State, [
-        {?CLOSE, [Type2, Name, 0]},
-        {?FLUSH, []}
-    ]),
-    {noreply, State};
-
-command(sync, State) ->
-    send(State, ?SYNC, []),
-    {noreply, State#state{sync_required = false}};
-
-command({start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}, State) ->
-    Sql1 = ["START_REPLICATION SLOT """, ReplicationSlot, """ LOGICAL ", WALPosition],
-    Sql2 =
-        case PluginOpts of
-            [] -> Sql1;
-            PluginOpts -> [Sql1 , " (", PluginOpts, ")"]
-        end,
-
-    State2 =
-        case Callback of
-            Pid when is_pid(Pid) -> State#state{repl_receiver = Pid};
-            Module -> State#state{repl_cbmodule = Module, repl_cbstate = CbInitState}
-        end,
-
-    Hex = [H || H <- WALPosition, H =/= $/],
-    {ok, [LSN], _} = io_lib:fread("~16u", Hex),
-
-    State3 = State2#state{repl_last_flushed_lsn = LSN, repl_last_applied_lsn = LSN},
-
-    send(State3, ?SIMPLEQUERY, [Sql2, 0]),
-    {noreply, State3}.
+             add_result(State1#state{current_cmd_state = CmdState1},
+                        Notice, Result)};
+        {finish, Result, Notice, State1} ->
+            {noreply, finish(State1, Notice, Result)};
+        {noaction, State1} ->
+            {noreply, State1};
+        {noaction, State1, CmdState1} ->
+            {noreply, State1#state{current_cmd_state = CmdState1}};
+        {requeue, State1, CmdState1} ->
+            Transport = State1#state.current_cmd_transport,
+            command_exec(Transport, Command, CmdState1,
+                         State1#state{current_cmd = undefined});
+        {stop, Reason, Response, State1} ->
+            {stop, Reason, finish(State1, Response)};
+        {sync_required, Why} ->
+            %% Protocol error. Finish and flush all pending commands.
+            {noreply, sync_required(finish(State#state{sync_required = true}, Why))};
+        unknown ->
+            {stop, {error, {unexpected_message, Msg, Command, CmdState}}, State}
+    end.
 
-start_ssl(S, Flag, Opts, State) ->
-    ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
-    Timeout = proplists:get_value(timeout, Opts, 5000),
-    {ok, <<Code>>} = gen_tcp:recv(S, 1, Timeout),
-    case Code of
-        $S  ->
-            SslOpts = proplists:get_value(ssl_opts, Opts, []),
-            case ssl:connect(S, SslOpts, Timeout) of
-                {ok, S2}        -> State#state{mod = ssl, sock = S2};
-                {error, Reason} -> exit({ssl_negotiation_failed, Reason})
-            end;
-        $N ->
-            case Flag of
-                true     -> State;
-                required -> exit(ssl_not_available)
-            end
+command_next(#state{current_cmd = PrevCmd,
+                    queue = Q} = State) when PrevCmd =/= undefined ->
+    case queue:out(Q) of
+        {empty, _} ->
+            State#state{current_cmd = undefined,
+                        current_cmd_state = undefined,
+                        current_cmd_transport = undefined,
+                        rows = [],
+                        results = []};
+        {{value, {Command, CmdState, Transport}}, Q1} ->
+            State#state{current_cmd = Command,
+                        current_cmd_state = CmdState,
+                        current_cmd_transport = Transport,
+                        queue = Q1,
+                        rows = [],
+                        results = []}
     end.
 
+
 setopts(#state{mod = Mod, sock = Sock}, Opts) ->
     case Mod of
         gen_tcp -> inet:setopts(Sock, Opts);
         ssl     -> ssl:setopts(Sock, Opts)
     end.
 
+-spec send(pg_sock(), iodata()) -> ok | {error, any()}.
 send(#state{mod = Mod, sock = Sock}, Data) ->
     do_send(Mod, Sock, epgsql_wire:encode(Data)).
 
+-spec send(pg_sock(), byte(), iodata()) -> ok | {error, any()}.
 send(#state{mod = Mod, sock = Sock}, Type, Data) ->
     do_send(Mod, Sock, epgsql_wire:encode(Type, Data)).
 
+-spec send_multi(pg_sock(), [{byte(), iodata()}]) -> ok | {error, any()}.
 send_multi(#state{mod = Mod, sock = Sock}, List) ->
     do_send(Mod, Sock, lists:map(fun({Type, Data}) ->
         epgsql_wire:encode(Type, Data)
@@ -422,12 +370,10 @@ do_send(gen_tcp, Sock, Bin) ->
 do_send(Mod, Sock, Bin) ->
     Mod:send(Sock, Bin).
 
-loop(#state{data = Data, handler = Handler, repl_last_received_lsn = LastReceivedLSN,
-    repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
-    repl_feedback_required = ReplFeedbackRequired} = State) ->
+loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
     case epgsql_wire:decode_message(Data) of
-        {Message, Tail} ->
-            case ?MODULE:Handler(Message, State#state{data = Tail}) of
+        {Type, Payload, Tail} ->
+            case ?MODULE:Handler(Type, Payload, State#state{data = Tail}) of
                 {noreply, State2} ->
                     loop(State2);
                 R = {stop, _Reason2, _State2} ->
@@ -435,11 +381,14 @@ loop(#state{data = Data, handler = Handler, repl_last_received_lsn = LastReceive
             end;
         _ ->
             %% in replication mode send feedback after each batch of messages
-            case ReplFeedbackRequired of
+            case (Repl =/= undefined) andalso (Repl#repl.feedback_required) of
                 true ->
+                    #repl{last_received_lsn = LastReceivedLSN,
+                          last_flushed_lsn = LastFlushedLSN,
+                          last_applied_lsn = LastAppliedLSN} = Repl,
                     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(
                         LastReceivedLSN, LastFlushedLSN, LastAppliedLSN)),
-                    {noreply, State#state{repl_feedback_required = false}};
+                    {noreply, State#state{repl = Repl#repl{feedback_required = false}}};
                 _ ->
                     {noreply, State}
             end
@@ -448,44 +397,31 @@ loop(#state{data = Data, handler = Handler, repl_last_received_lsn = LastReceive
 finish(State, Result) ->
     finish(State, Result, Result).
 
-finish(State = #state{queue = Q}, Notice, Result) ->
-    case queue:get(Q) of
-        {{cast, From, Ref}, _} ->
-            From ! {self(), Ref, Result};
-        {{incremental, From, Ref}, _} ->
-            From ! {self(), Ref, Notice};
-        {{call, From}, _} ->
-            gen_server:reply(From, Result)
-    end,
-    State#state{queue = queue:drop(Q),
-                types = [],
-                columns = [],
-                rows = [],
-                results = [],
-                batch = []}.
+finish(State = #state{current_cmd_transport = Transport}, Notice, Result) ->
+    reply(Transport, Notice, Result),
+    command_next(State).
 
-add_result(State, Notice, Result) ->
-    #state{queue = Q, results = Results, batch = Batch} = State,
-    Results2 = case queue:get(Q) of
-                   {{incremental, From, Ref}, _} ->
+reply({cast, From, Ref}, _, Result) ->
+    From ! {self(), Ref, Result};
+reply({incremental, From, Ref}, Notice, _) ->
+    From ! {self(), Ref, Notice};
+reply({call, From}, _, Result) ->
+    gen_server:reply(From, Result).
+
+add_result(#state{results = Results, current_cmd_transport = Transport} = State, Notice, Result) ->
+    Results2 = case Transport of
+                   {incremental, From, Ref} ->
                        From ! {self(), Ref, Notice},
                        Results;
                    _ ->
                        [Result | Results]
                end,
-    Batch2 = case Batch of
-                 [] -> [];
-                 _ -> tl(Batch)
-             end,
-    State#state{types = [],
-                columns = [],
-                rows = [],
-                results = Results2,
-                batch = Batch2}.
-
-add_row(State = #state{queue = Q, rows = Rows}, Data) ->
-    Rows2 = case queue:get(Q) of
-                {{incremental, From, Ref}, _} ->
+    State#state{rows = [],
+                results = Results2}.
+
+add_row(#state{rows = Rows, current_cmd_transport = Transport} = State, Data) ->
+    Rows2 = case Transport of
+                {incremental, From, Ref} ->
                     From ! {self(), Ref, {data, Data}},
                     Rows;
                 _ ->
@@ -493,15 +429,13 @@ add_row(State = #state{queue = Q, rows = Rows}, Data) ->
             end,
     State#state{rows = Rows2}.
 
-notify(State = #state{queue = Q}, Notice) ->
-    case queue:get(Q) of
-        {{incremental, From, Ref}, _} ->
-            From ! {self(), Ref, Notice};
-        _ ->
-            ignore
-    end,
+notify(#state{current_cmd_transport = {incremental, From, Ref}} = State, Notice) ->
+    From ! {self(), Ref, Notice},
+    State;
+notify(State, _) ->
     State.
 
+%% Send asynchronous messages (notice / notification)
 notify_async(#state{async = undefined}, _) ->
     false;
 notify_async(#state{async = PidOrName}, Msg) ->
@@ -512,311 +446,58 @@ notify_async(#state{async = PidOrName}, Msg) ->
             false
     end.
 
-command_tag(#state{queue = Q}) ->
-    {_, Req} = queue:get(Q),
-    if is_tuple(Req) ->
-            element(1, Req);
-       is_atom(Req) ->
-            Req
-    end.
-
-get_columns(State) ->
-    #state{queue = Q, columns = Columns, batch = Batch} = State,
-    case queue:get(Q) of
-        {_, {Command, #statement{columns = C}, _}}  when Command == equery; Command == prepared_query ->
-            C;
-        {_, {execute, #statement{columns = C}, _, _}} ->
-            C;
-        {_, {squery, _}} ->
-            Columns;
-        {_, {execute_batch, _}} ->
-            [{#statement{columns = C}, _} | _] = Batch,
-            C
-    end.
-
-make_statement(State) ->
-    #state{queue = Q, types = Types, columns = Columns} = State,
-    Name = case queue:get(Q) of
-               {_, {parse, N, _, _}} -> N;
-               {_, {describe_statement, N}} -> N
-           end,
-    #statement{name = Name, types = Types, columns = Columns}.
-
-sync_required(#state{queue = Q} = State) ->
-    case queue:is_empty(Q) of
-        false ->
-            case command_tag(State) of
-                sync ->
-                    State;
-                _ ->
-                    sync_required(finish(State, {error, sync_required}))
-            end;
-        true ->
-            State#state{sync_required = true}
-    end.
+sync_required(#state{current_cmd = epgsql_cmd_sync} = State) ->
+    State;
+sync_required(#state{current_cmd = undefined} = State) ->
+    State#state{sync_required = true};
+sync_required(State) ->
+    sync_required(finish(State, {error, sync_required})).
 
-flush_queue(#state{queue = Q} = State, Error) ->
-    case queue:is_empty(Q) of
-        false ->
-            flush_queue(finish(State, Error), Error);
-        true -> State
-    end.
+flush_queue(#state{current_cmd = undefined} = State, _) ->
+    State;
+flush_queue(State, Error) ->
+    flush_queue(finish(State, Error), Error).
 
 to_binary(B) when is_binary(B) -> B;
 to_binary(L) when is_list(L)   -> list_to_binary(L).
 
-hex(Bin) ->
-    HChar = fun(N) when N < 10 -> $0 + N;
-               (N) when N < 16 -> $W + N
-            end,
-    <<<<(HChar(H)), (HChar(L))>> || <<H:4, L:4>> <= Bin>>.
 
 %% -- backend message handling --
 
-%% AuthenticationOk
-auth({?AUTHENTICATION_REQUEST, <<0:?int32>>}, State) ->
-    {noreply, State#state{handler = initializing}};
-
-%% AuthenticationCleartextPassword
-auth({?AUTHENTICATION_REQUEST, <<3:?int32>>}, State) ->
-    send(State, ?PASSWORD, [get(password), 0]),
-    {noreply, State};
-
-%% AuthenticationMD5Password
-auth({?AUTHENTICATION_REQUEST, <<5:?int32, Salt:4/binary>>}, State) ->
-    Digest1 = hex(erlang:md5([get(password), get(username)])),
-    Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
-    send(State, ?PASSWORD, Str),
-    {noreply, State};
-
-auth({?AUTHENTICATION_REQUEST, <<M:?int32, _/binary>>}, State) ->
-    Method = case M of
-        2 -> kerberosV5;
-        4 -> crypt;
-        6 -> scm;
-        7 -> gss;
-        8 -> sspi;
-        _ -> unknown
-    end,
-    State2 = finish(State, {error, {unsupported_auth_method, Method}}),
-    {stop, normal, State2};
-
-%% ErrorResponse
-auth({error, E}, State) ->
-    Why = case E#error.code of
-        <<"28000">> -> invalid_authorization_specification;
-        <<"28P01">> -> invalid_password;
-        Any         -> Any
-    end,
-    {stop, normal, finish(State, {error, Why})};
-
-auth(Other, State) ->
-    on_message(Other, State).
-
-%% BackendKeyData
-initializing({?CANCELLATION_KEY, <<Pid:?int32, Key:?int32>>}, State) ->
-    {noreply, State#state{backend = {Pid, Key}}};
-
-%% ReadyForQuery
-initializing({?READY_FOR_QUERY, <<Status:8>>}, State) ->
-    #state{parameters = Parameters} = State,
-    erase(username),
-    erase(password),
-    %% TODO decode dates to now() format
-    case lists:keysearch(<<"integer_datetimes">>, 1, Parameters) of
-        {value, {_, <<"on">>}}  -> put(datetime_mod, epgsql_idatetime);
-        {value, {_, <<"off">>}} -> put(datetime_mod, epgsql_fdatetime)
-    end,
-    State2 = finish(State#state{handler = on_message,
-                               txstatus = Status,
-                               codec = epgsql_binary:new_codec([])},
-                   connected),
-    {noreply, State2};
-
-initializing({error, _} = Error, State) ->
-    {stop, normal, finish(State, Error)};
-
-initializing(Other, State) ->
-    on_message(Other, State).
-
-%% ParseComplete
-on_message({?PARSE_COMPLETE, <<>>}, State) ->
-    {noreply, State};
-
-%% ParameterDescription
-on_message({?PARAMETER_DESCRIPTION, <<_Count:?int16, Bin/binary>>}, State) ->
-    Types = [epgsql_binary:oid2type(Oid, State#state.codec) || <<Oid:?int32>> <= Bin],
-    State2 = notify(State#state{types = Types}, {types, Types}),
-    {noreply, State2};
-
-%% RowDescription
-on_message({?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>}, State) ->
-    Columns = epgsql_wire:decode_columns(Count, Bin, State#state.codec),
-    Columns2 =
-        case command_tag(State) of
-            C when C == describe_portal; C == squery ->
-                Columns;
-            C when C == parse; C == describe_statement ->
-                [Col#column{format = epgsql_wire:format(Col#column.type)}
-                 || Col <- Columns]
-        end,
-    State2 = State#state{columns = Columns2},
-    Message = {columns, Columns2},
-    State3 = case command_tag(State2) of
-                 squery ->
-                     notify(State2, Message);
-                 T when T == parse; T == describe_statement ->
-                     finish(State2, Message, {ok, make_statement(State2)});
-                 describe_portal ->
-                     finish(State2, Message, {ok, Columns})
-             end,
-    {noreply, State3};
-
-%% NoData
-on_message({?NO_DATA, <<>>}, State) ->
-    State2 = case command_tag(State) of
-                 C when C == parse; C == describe_statement ->
-                     finish(State, no_data, {ok, make_statement(State)});
-                 describe_portal ->
-                     finish(State, no_data, {ok, []})
-             end,
-    {noreply, State2};
-
-%% BindComplete
-on_message({?BIND_COMPLETE, <<>>}, State) ->
-    State2 = case command_tag(State) of
-                 Command when Command == equery; Command == prepared_query ->
-                     %% TODO send Describe as a part of equery, needs text format support
-                     notify(State, {columns, get_columns(State)});
-                 bind ->
-                     finish(State, ok);
-                 execute_batch ->
-                     Batch =
-                         case State#state.batch of
-                             [] ->
-                                 {_, {_, B}} = queue:get(State#state.queue),
-                                 B;
-                             B -> B
-                         end,
-                     State#state{batch = Batch}
-             end,
-    {noreply, State2};
-
-%% CloseComplete
-on_message({?CLOSE_COMPLETE, <<>>}, State) ->
-    State2 = case command_tag(State) of
-                 Command when Command == equery; Command == prepared_query ->
-                     State;
-                 close ->
-                     finish(State, ok)
-             end,
-    {noreply, State2};
-
-%% DataRow
-on_message({?DATA_ROW, <<_Count:?int16, Bin/binary>>}, State) ->
-    Data = epgsql_wire:decode_data(get_columns(State), Bin, State#state.codec),
-    {noreply, add_row(State, Data)};
-
-%% PortalSuspended
-on_message({?PORTAL_SUSPENDED, <<>>}, State) ->
-    State2 = finish(State,
-                   suspended,
-                   {partial, lists:reverse(State#state.rows)}),
-    {noreply, State2};
-
 %% CommandComplete
-on_message({?COMMAND_COMPLETE, Bin}, State0) ->
+on_message(?COMMAND_COMPLETE = Msg, Bin, State) ->
     Complete = epgsql_wire:decode_complete(Bin),
-    State = State0#state{complete_status = Complete},
-    Command = command_tag(State),
-    Notice = {complete, Complete},
-    Rows = lists:reverse(State#state.rows),
-    Columns = get_columns(State),
-    State2 = case {Command, Complete, Columns} of
-                 {execute, {_, Count}, []} ->
-                     finish(State, Notice, {ok, Count});
-                 {execute, {_, Count}, _} ->
-                     finish(State, Notice, {ok, Count, Rows});
-                 {execute, _, _} ->
-                     finish(State, Notice, {ok, Rows});
-                 {execute_batch, {_, Count}, []} ->
-                     add_result(State, Notice, {ok, Count});
-                 {execute_batch, {_, Count}, _} ->
-                     add_result(State, Notice, {ok, Count, Rows});
-                 {execute_batch, _, _} ->
-                     add_result(State, Notice, {ok, Rows});
-                 {C, {_, Count}, []} when C == squery; C == equery; C == prepared_query ->
-                     add_result(State, Notice, {ok, Count});
-                 {C, {_, Count}, _} when C == squery; C == equery; C == prepared_query ->
-                     add_result(State, Notice, {ok, Count, Columns, Rows});
-                 {C, _, _} when C == squery; C == equery; C == prepared_query ->
-                     add_result(State, Notice, {ok, Columns, Rows})
-             end,
-    {noreply, State2};
-
-%% EmptyQueryResponse
-on_message({?EMPTY_QUERY, _Bin}, State) ->
-    Notice = {complete, empty},
-    State2 = case command_tag(State) of
-                 execute ->
-                     finish(State, Notice, {ok, [], []});
-                 C when C == squery; C == equery; C == prepared_query ->
-                     add_result(State, Notice, {ok, [], []})
-             end,
-    {noreply, State2};
+    command_handle_message(Msg, Bin, State#state{complete_status = Complete});
 
 %% ReadyForQuery
-on_message({?READY_FOR_QUERY, <<Status:8>>}, State) ->
-    State2 = case command_tag(State) of
-                 squery ->
-                     case State#state.results of
-                         [Result] ->
-                             finish(State, done, Result);
-                         Results ->
-                             finish(State, done, lists:reverse(Results))
-                     end;
-                 execute_batch ->
-                     finish(State, done, lists:reverse(State#state.results));
-                 Command when Command == equery; Command == prepared_query ->
-                     case State#state.results of
-                         [Result] ->
-                             finish(State, done, Result);
-                         [] ->
-                             finish(State, done)
-                     end;
-                 sync ->
-                     finish(State, ok)
-             end,
-    {noreply, State2#state{txstatus = Status}};
-
-on_message(Error = {error, Reason}, State) ->
-    case queue:is_empty(State#state.queue) of
-        true ->
+on_message(?READY_FOR_QUERY = Msg, <<Status:8>> = Bin, State) ->
+    command_handle_message(Msg, Bin, State#state{txstatus = Status});
+
+%% Error
+on_message(?ERROR = Msg, Err, #state{current_cmd = CurrentCmd} = State) ->
+    Reason = epgsql_wire:decode_error(Err),
+    case CurrentCmd of
+        undefined ->
+            %% Message generated by server asynchronously
             {stop, {shutdown, Reason}, State};
-        false ->
-            State2 = case command_tag(State) of
-                C when C == squery; C == equery; C == execute_batch; C == prepared_query ->
-                    add_result(State, Error, Error);
-                _ ->
-                    sync_required(finish(State, Error))
-            end,
-            {noreply, State2}
+        _ ->
+            command_handle_message(Msg, Reason, State)
     end;
 
 %% NoticeResponse
-on_message({?NOTICE, Data}, State) ->
+on_message(?NOTICE, Data, State) ->
     notify_async(State, {notice, epgsql_wire:decode_error(Data)}),
     {noreply, State};
 
 %% ParameterStatus
-on_message({?PARAMETER_STATUS, Data}, State) ->
+on_message(?PARAMETER_STATUS, Data, State) ->
     [Name, Value] = epgsql_wire:decode_strings(Data),
     Parameters2 = lists:keystore(Name, 1, State#state.parameters,
                                  {Name, Value}),
     {noreply, State#state{parameters = Parameters2}};
 
 %% NotificationResponse
-on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
+on_message(?NOTIFICATION, <<Pid:?int32, Strings/binary>>, State) ->
     {Channel1, Payload1} = case epgsql_wire:decode_strings(Strings) of
         [Channel, Payload] -> {Channel, Payload};
         [Channel]          -> {Channel, <<>>}
@@ -824,38 +505,66 @@ on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
     notify_async(State, {notification, Channel1, Pid, Payload1}),
     {noreply, State};
 
+%% ParseComplete
+%% ParameterDescription
+%% RowDescription
+%% NoData
+%% BindComplete
+%% CloseComplete
+%% DataRow
+%% PortalSuspended
+%% EmptyQueryResponse
+%% CopyData
 %% CopyBothResponse
-on_message({?COPY_BOTH_RESPONSE, _Data}, State) ->
-    State2 = finish(State, ok),
-    {noreply, State2};
+on_message(Msg, Payload, State) ->
+    command_handle_message(Msg, Payload, State).
 
-%% CopyData for COPY command. COPY command not supported yet.
-on_message({?COPY_DATA, _Data}, #state{repl_cbmodule = undefined, repl_receiver = undefined} = State) ->
-    {stop, {error, copy_command_not_supported}, State};
 
 %% CopyData for Replication mode
-on_message({?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>},
-    #state{repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN} = State) ->
-    case ReplyRequired of
-        1 ->
-            send(State, ?COPY_DATA,
-                epgsql_wire:encode_standby_status_update(LSN, LastFlushedLSN, LastAppliedLSN)),
-            {noreply, State#state{repl_feedback_required = false, repl_last_received_lsn = LSN}};
-        _ ->
-            {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = LSN}}
-    end;
+on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
+               #state{repl = #repl{last_flushed_lsn = LastFlushedLSN,
+                                   last_applied_lsn = LastAppliedLSN} = Repl} = State) ->
+    Repl1 =
+        case ReplyRequired of
+            1 ->
+                send(State, ?COPY_DATA,
+                     epgsql_wire:encode_standby_status_update(LSN, LastFlushedLSN, LastAppliedLSN)),
+                Repl#repl{feedback_required = false,
+                          last_received_lsn = LSN};
+            _ ->
+                Repl#repl{feedback_required = true,
+                          last_received_lsn = LSN}
+        end,
+    {noreply, State#state{repl = Repl1}};
 
-%% CopyData for Replication mode. with async messages
-on_message({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64, _Timestamp:?int64, WALRecord/binary>>},
-    #state{repl_cbmodule = undefined, repl_receiver = Receiver} = State) ->
+%% CopyData for Replication mode
+on_replication(?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
+                             _Timestamp:?int64, WALRecord/binary>>,
+               #state{repl = Repl} = State) ->
+    Repl1 = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
+    {noreply, State#state{repl = Repl1}};
+on_replication(?ERROR, Err, State) ->
+    Reason = epgsql_wire:decode_error(Err),
+    {stop, {error, Reason}, State};
+on_replication(M, Data, Sock) when M == ?NOTICE;
+                                   M == ?NOTIFICATION;
+                                   M == ?PARAMETER_STATUS ->
+    on_message(M, Data, Sock).
+
+
+handle_xlog_data(StartLSN, EndLSN, WALRecord, #repl{cbmodule = undefined,
+                                                    receiver = Receiver} = Repl) ->
+    %% with async messages
     Receiver ! {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}},
-    {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = EndLSN}};
-
-%% CopyData for Replication mode. with callback method
-on_message({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64, _Timestamp:?int64, WALRecord/binary>>},
-    #state{repl_cbmodule = CbModule, repl_cbstate = CbState, repl_receiver = undefined} = State) ->
+    Repl#repl{feedback_required = true,
+              last_received_lsn = EndLSN};
+handle_xlog_data(StartLSN, EndLSN, WALRecord,
+                 #repl{cbmodule = CbModule, cbstate = CbState, receiver = undefined} = Repl) ->
+    %% with callback method
     {ok, LastFlushedLSN, LastAppliedLSN, NewCbState} =
         CbModule:handle_x_log_data(StartLSN, EndLSN, WALRecord, CbState),
-    {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = EndLSN,
-        repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
-        repl_cbstate = NewCbState}}.
+    Repl#repl{feedback_required = true,
+              last_received_lsn = EndLSN,
+              last_flushed_lsn = LastFlushedLSN,
+              last_applied_lsn = LastAppliedLSN,
+              cbstate = NewCbState}.

+ 21 - 12
src/epgsql_wire.erl

@@ -7,29 +7,27 @@
          decode_error/1,
          decode_strings/1,
          decode_columns/3,
+         decode_parameters/2,
          encode/1,
          encode/2,
-         decode_data/3,
+         build_decoder/2,
+         decode_data/2,
          decode_complete/1,
          encode_types/2,
          encode_formats/1,
-         format/1,
+         format/2,
          encode_parameters/2,
          encode_standby_status_update/3]).
 
 -include("epgsql.hrl").
--include("epgsql_binary.hrl").
+-include("protocol.hrl").
+
 
 decode_message(<<Type:8, Len:?int32, Rest/binary>> = Bin) ->
     Len2 = Len - 4,
     case Rest of
         <<Data:Len2/binary, Tail/binary>> ->
-            case Type of
-                $E ->
-                    {{error, decode_error(Data)}, Tail};
-                _ ->
-                    {{Type, Data}, Tail}
-            end;
+            {Type, Data, Tail};
         _Other ->
             Bin
     end;
@@ -113,6 +111,7 @@ lower_atom(Str) when is_binary(Str) ->
 lower_atom(Str) when is_list(Str) ->
     list_to_atom(string:to_lower(Str)).
 
+%% FIXME: return iolist
 encode(Data) ->
     Bin = iolist_to_binary(Data),
     <<(byte_size(Bin) + 4):?int32, Bin/binary>>.
@@ -121,8 +120,13 @@ encode(Type, Data) ->
     Bin = iolist_to_binary(Data),
     <<Type:8, (byte_size(Bin) + 4):?int32, Bin/binary>>.
 
-%% decode data
-decode_data(Columns, Bin, Codec) ->
+%% Build decoder for DataRow
+build_decoder(Columns, Codec) ->
+    {Columns, Codec}.
+
+%% decode row data
+%% FIXME: use body recursion
+decode_data(Bin, {Columns, Codec}) ->
     decode_data(Columns, Bin, [], Codec).
 
 decode_data([], _Bin, Acc, _Codec) ->
@@ -139,6 +143,7 @@ decode_data([C | T], <<Len:?int32, Value:Len/binary, Rest/binary>>, Acc, Codec)
     decode_data(T, Rest, [Value2 | Acc], Codec).
 
 %% decode column information
+%% TODO: use body-recursion
 decode_columns(Count, Bin, Codec) ->
     decode_columns(Count, Bin, [], Codec).
 
@@ -156,6 +161,10 @@ decode_columns(N, Bin, Acc, Codec) ->
       format   = Format},
     decode_columns(N - 1, Rest2, [Desc | Acc], Codec).
 
+%% decode ParameterDescription
+decode_parameters(<<_Count:?int16, Bin/binary>>, Codec) ->
+    [epgsql_binary:oid2type(Oid, Codec) || <<Oid:?int32>> <= Bin].
+
 %% decode command complete msg
 decode_complete(<<"SELECT", 0>>)        -> select;
 decode_complete(<<"SELECT", _/binary>>) -> select;
@@ -196,7 +205,7 @@ encode_formats([], Count, Acc) ->
 encode_formats([#column{format = Format} | T], Count, Acc) ->
     encode_formats(T, Count + 1, <<Acc/binary, Format:?int16>>).
 
-format(Type) ->
+format(Type, _Codec) ->
     case epgsql_binary:supports(Type) of
         true  -> 1;
         false -> 0

+ 16 - 16
src/epgsqla.erl

@@ -48,7 +48,9 @@ connect(Host, Username, Password, Opts) ->
 -spec connect(epgsql:connection(), inet:ip_address() | inet:hostname(),
               string(), string(), [epgsql:connect_option()]) -> reference().
 connect(C, Host, Username, Password, Opts) ->
-    complete_connect(C, cast(C, {connect, Host, Username, Password, epgsql:to_proplist(Opts)})).
+    complete_connect(
+      C, cast(
+           C, epgsql_cmd_connect, {Host, Username, Password, epgsql:to_proplist(Opts)})).
 
 -spec close(epgsql:connection()) -> ok.
 close(C) ->
@@ -71,18 +73,18 @@ get_cmd_status(C) ->
 
 -spec squery(epgsql:connection(), string()) -> reference().
 squery(C, Sql) ->
-    cast(C, {squery, Sql}).
+    cast(C, epgsql_cmd_squery, Sql).
 
 equery(C, Sql) ->
     equery(C, Sql, []).
 
 -spec equery(epgsql:connection(), #statement{}, [epgsql:typed_param()]) -> reference().
 equery(C, Statement, TypedParameters) ->
-    cast(C, {equery, Statement, TypedParameters}).
+    cast(C, epgsql_cmd_equery, {Statement, TypedParameters}).
 
 -spec prepared_query(epgsql:connection(), #statement{}, [epgsql:typed_param()]) -> reference().
 prepared_query(C, Statement, TypedParameters) ->
-    cast(C, {prepared_query, Statement, TypedParameters}).
+    cast(C, epgsql_cmd_prepared_query, {Statement, TypedParameters}).
 
 parse(C, Sql) ->
     parse(C, "", Sql, []).
@@ -92,14 +94,14 @@ parse(C, Sql, Types) ->
 
 -spec parse(epgsql:connection(), iolist(), string(), [epgsql_type()]) -> reference().
 parse(C, Name, Sql, Types) ->
-    cast(C, {parse, Name, Sql, Types}).
+    cast(C, epgsql_cmd_parse, {Name, Sql, Types}).
 
 bind(C, Statement, Parameters) ->
     bind(C, Statement, "", Parameters).
 
 -spec bind(epgsql:connection(), #statement{}, string(), [epgsql:bind_param()]) -> reference().
 bind(C, Statement, PortalName, Parameters) ->
-    cast(C, {bind, Statement, PortalName, Parameters}).
+    cast(C, epgsql_cmd_bind, {Statement, PortalName, Parameters}).
 
 execute(C, S) ->
     execute(C, S, "", 0).
@@ -109,29 +111,29 @@ execute(C, S, N) ->
 
 -spec execute(epgsql:connection(), #statement{}, string(), non_neg_integer()) -> reference().
 execute(C, Statement, PortalName, MaxRows) ->
-    cast(C, {execute, Statement, PortalName, MaxRows}).
+    cast(C, epgsql_cmd_execute, {Statement, PortalName, MaxRows}).
 
 -spec execute_batch(epgsql:connection(), [{#statement{}, [epgsql:bind_param()]}]) -> reference().
 execute_batch(C, Batch) ->
-    cast(C, {execute_batch, Batch}).
+    cast(C, epgsql_cmd_batch, Batch).
 
 describe(C, #statement{name = Name}) ->
     describe(C, statement, Name).
 
 describe(C, statement, Name) ->
-    cast(C, {describe_statement, Name});
+    cast(C, epgsql_cmd_describe_statement, Name);
 
 describe(C, portal, Name) ->
-    cast(C, {describe_portal, Name}).
+    cast(C, epgsql_cmd_describe_portal, Name).
 
 close(C, #statement{name = Name}) ->
     close(C, statement, Name).
 
 close(C, Type, Name) ->
-    cast(C, {close, Type, Name}).
+    cast(C, epgsql_cmd_close, {Type, Name}).
 
 sync(C) ->
-    cast(C, sync).
+    cast(C, epgsql_cmd_sync, []).
 
 -spec cancel(epgsql:connection()) -> ok.
 cancel(C) ->
@@ -139,10 +141,8 @@ cancel(C) ->
 
 %% -- internal functions --
 
-cast(C, Command) ->
-    Ref = make_ref(),
-    gen_server:cast(C, {{cast, self(), Ref}, Command}),
-    Ref.
+cast(C, Command, Args) ->
+    epgsql_sock:async_command(C, cast, Command, Args).
 
 complete_connect(C, Ref) ->
     receive

+ 16 - 16
src/epgsqli.erl

@@ -47,7 +47,9 @@ connect(Host, Username, Password, Opts) ->
 -spec connect(epgsql:connection(), inet:ip_address() | inet:hostname(),
               string(), string(), [epgsql:connect_option()]) -> reference().
 connect(C, Host, Username, Password, Opts) ->
-    epgsqla:complete_connect(C, incremental(C, {connect, Host, Username, Password, epgsql:to_proplist(Opts)})).
+    epgsqla:complete_connect(
+      C, incremental(
+           C, epgsql_cmd_connect, {Host, Username, Password, epgsql:to_proplist(Opts)})).
 
 -spec close(epgsql:connection()) -> ok.
 close(C) ->
@@ -70,18 +72,18 @@ get_cmd_status(C) ->
 
 -spec squery(epgsql:connection(), string()) -> reference().
 squery(C, Sql) ->
-    incremental(C, {squery, Sql}).
+    incremental(C, epgsql_cmd_squery, Sql).
 
 equery(C, Sql) ->
     equery(C, Sql, []).
 
 -spec equery(epgsql:connection(), #statement{}, [epgsql:typed_param()]) -> reference().
 equery(C, Statement, TypedParameters) ->
-    incremental(C, {equery, Statement, TypedParameters}).
+    incremental(C, epgsql_cmd_equery, {Statement, TypedParameters}).
 
 -spec prepared_query(epgsql:connection(), #statement{}, [epgsql:typed_param()]) -> reference().
 prepared_query(C, Statement, TypedParameters) ->
-    incremental(C, {prepared_query, Statement, TypedParameters}).
+    incremental(C, epgsql_cmd_prepared_query, {Statement, TypedParameters}).
 
 parse(C, Sql) ->
     parse(C, "", Sql, []).
@@ -91,14 +93,14 @@ parse(C, Sql, Types) ->
 
 -spec parse(epgsql:connection(), iolist(), string(), [epgsql_type()]) -> reference().
 parse(C, Name, Sql, Types) ->
-    incremental(C, {parse, Name, Sql, Types}).
+    incremental(C, epgsql_cmd_parse, {Name, Sql, Types}).
 
 bind(C, Statement, Parameters) ->
     bind(C, Statement, "", Parameters).
 
 -spec bind(epgsql:connection(), #statement{}, string(), [epgsql:bind_param()]) -> reference().
 bind(C, Statement, PortalName, Parameters) ->
-    incremental(C, {bind, Statement, PortalName, Parameters}).
+    incremental(C, epgsql_cmd_bind, {Statement, PortalName, Parameters}).
 
 execute(C, S) ->
     execute(C, S, "", 0).
@@ -108,29 +110,29 @@ execute(C, S, N) ->
 
 -spec execute(epgsql:connection(), #statement{}, string(), non_neg_integer()) -> reference().
 execute(C, Statement, PortalName, MaxRows) ->
-    incremental(C, {execute, Statement, PortalName, MaxRows}).
+    incremental(C, epgsql_cmd_execute, {Statement, PortalName, MaxRows}).
 
 -spec execute_batch(epgsql:connection(), [{#statement{}, [epgsql:bind_param()]}]) -> reference().
 execute_batch(C, Batch) ->
-    incremental(C, {execute_batch, Batch}).
+    incremental(C, epgsql_cmd_batch, Batch).
 
 describe(C, #statement{name = Name}) ->
     describe(C, statement, Name).
 
 describe(C, statement, Name) ->
-    incremental(C, {describe_statement, Name});
+    incremental(C, epgsql_cmd_describe_statement, Name);
 
 describe(C, portal, Name) ->
-    incremental(C, {describe_portal, Name}).
+    incremental(C, epgsql_cmd_describe_portal, Name).
 
 close(C, #statement{name = Name}) ->
     close(C, statement, Name).
 
 close(C, Type, Name) ->
-    incremental(C, {close, Type, Name}).
+    incremental(C, epgsql_cmd_close, {Type, Name}).
 
 sync(C) ->
-    incremental(C, sync).
+    incremental(C, epgsql_cmd_sync, []).
 
 -spec cancel(epgsql:connection()) -> ok.
 cancel(C) ->
@@ -139,7 +141,5 @@ cancel(C) ->
 
 %% -- internal functions --
 
-incremental(C, Command) ->
-    Ref = make_ref(),
-    gen_server:cast(C, {{incremental, self(), Ref}, Command}),
-    Ref.
+incremental(C, Command, Args) ->
+    epgsql_sock:async_command(C, incremental, Command, Args).

+ 1 - 1
test/epgsql_SUITE.erl

@@ -1038,7 +1038,7 @@ get_cmd_status(Config) ->
         {ok, 1} = Module:squery(C, "UPDATE cmd_status_t SET col=3 WHERE col=1"),
         ?assertEqual({ok, {'update', 1}}, Module:get_cmd_status(C)),
         %% Failed queries have no status
-        {error, _} = Module:squery(C, "UPDATE cmd_status_t SET col='text' WHERE col=2"),
+        {error, _} = Module:squery(C, "DELETE FROM cmd_status_t WHERE not_col=2"),
         ?assertEqual({ok, undefined}, Module:get_cmd_status(C)),
         %% if COMMIT failed, status will be 'rollback'
         {ok, [], []} = Module:squery(C, "COMMIT"),

+ 1 - 2
test/epgsql_incremental.erl

@@ -221,8 +221,7 @@ receive_describe(C, Ref, Statement = #statement{}) ->
         {C, Ref, {types, Types}} ->
             receive_describe(C, Ref, Statement#statement{types = Types});
         {C, Ref, {columns, Columns}} ->
-            Columns2 = [Col#column{format = epgsql_wire:format(Col#column.type)} || Col <- Columns],
-            {ok, Statement#statement{columns = Columns2}};
+            {ok, Statement#statement{columns = Columns}};
         {C, Ref, no_data} ->
             {ok, Statement#statement{columns = []}};
         {C, Ref, Error = {error, _}} ->