Просмотр исходного кода

Add helpers to encode Client->Server packets to epgsql_wire

Sergey Prokhorov 4 лет назад
Родитель
Сommit
b493ec70c9

+ 4 - 4
src/commands/epgsql_cmd_batch.erl

@@ -58,7 +58,7 @@ execute(Sock, #batch{batch = Batch, statement = undefined} = State) ->
                   BinFormats = epgsql_wire:encode_formats(Columns),
                   add_command(StatementName, Types, Parameters, BinFormats, Codec, Acc)
           end,
-          [{?SYNC, []}],
+          [epgsql_wire:encode_sync()],
           Batch),
     {send_multi, Commands, Sock, State};
 execute(Sock, #batch{batch = Batch,
@@ -73,15 +73,15 @@ execute(Sock, #batch{batch = Batch,
           fun(Parameters, Acc) ->
                   add_command(StatementName, Types, Parameters, BinFormats, Codec, Acc)
           end,
-          [{?SYNC, []}],
+          [epgsql_wire:encode_sync()],
           Batch),
     {send_multi, Commands, Sock, State}.
 
 add_command(StmtName, Types, Params, BinFormats, Codec, Acc) ->
     TypedParameters = lists:zip(Types, Params),
     BinParams = epgsql_wire:encode_parameters(TypedParameters, Codec),
-    [{?BIND, [0, StmtName, 0, BinParams, BinFormats]},
-     {?EXECUTE, [0, <<0:?int32>>]} | Acc].
+    [epgsql_wire:encode_bind("", StmtName, BinParams, BinFormats),
+     epgsql_wire:encode_execute("", 0) | Acc].
 
 handle_message(?BIND_COMPLETE, <<>>, Sock, State) ->
     Columns = current_cols(State),

+ 2 - 2
src/commands/epgsql_cmd_bind.erl

@@ -31,8 +31,8 @@ execute(Sock, #bind{stmt = Stmt, portal = PortalName, params = Params} = St) ->
     Bin1 = epgsql_wire:encode_parameters(TypedParams, Codec),
     Bin2 = epgsql_wire:encode_formats(Columns),
     Commands = [
-       {?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]},
-       {?FLUSH, []}
+       epgsql_wire:encode_bind(PortalName, StatementName, Bin1, Bin2),
+       epgsql_wire:encode_flush()
       ],
     {send_multi, Commands, Sock, St}.
 

+ 2 - 6
src/commands/epgsql_cmd_close.erl

@@ -22,13 +22,9 @@ init({Type, Name}) ->
     #close{type = Type, name = Name}.
 
 execute(Sock, #close{type = Type, name = Name} = St) ->
-    Type2 = case Type of
-        statement -> ?PREPARED_STATEMENT;
-        portal    -> ?PORTAL
-    end,
     Packets = [
-       {?CLOSE, [Type2, Name, 0]},
-       {?FLUSH, []}
+       epgsql_wire:encode_close(Type, Name),
+       epgsql_wire:encode_flush()
       ],
     {send_multi, Packets, Sock, St}.
 

+ 2 - 2
src/commands/epgsql_cmd_describe_portal.erl

@@ -24,8 +24,8 @@ init(Name) ->
 execute(Sock, #desc_portal{name = Name} = St) ->
     Commands =
       [
-       {?DESCRIBE, [?PORTAL, Name, 0]},
-       {?FLUSH, []}
+       epgsql_wire:encode_describe(portal, Name),
+       epgsql_wire:encode_flush()
       ],
     {send_multi, Commands, Sock, St}.
 

+ 2 - 2
src/commands/epgsql_cmd_describe_statement.erl

@@ -28,8 +28,8 @@ init(Name) ->
 execute(Sock, #desc_stmt{name = Name} = St) ->
     Commands =
       [
-       {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
-       {?FLUSH, []}
+       epgsql_wire:encode_describe(statement, Name),
+       epgsql_wire:encode_flush()
       ],
     {send_multi, Commands, Sock, St}.
 

+ 4 - 4
src/commands/epgsql_cmd_equery.erl

@@ -45,10 +45,10 @@ execute(Sock, #equery{stmt = Stmt, params = TypedParams} = St) ->
     Bin2 = epgsql_wire:encode_formats(Columns),
     Commands =
       [
-       {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
-       {?EXECUTE, ["", 0, <<0:?int32>>]},
-       {?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]},
-       {?SYNC, []}
+       epgsql_wire:encode_bind("", StatementName, Bin1, Bin2),
+       epgsql_wire:encode_execute("", 0),
+       epgsql_wire:encode_close(statement, StatementName),
+       epgsql_wire:encode_sync()
       ],
     {send_multi, Commands, Sock, St}.
 

+ 2 - 2
src/commands/epgsql_cmd_execute.erl

@@ -37,8 +37,8 @@ execute(Sock, #execute{stmt = Stmt, portal_name = PortalName, max_rows = MaxRows
     Decoder = epgsql_wire:build_decoder(Columns, Codec),
     Commands =
       [
-       {?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]},
-       {?FLUSH, []}
+       epgsql_wire:encode_execute(PortalName, MaxRows),
+       epgsql_wire:encode_flush()
       ],
     {send_multi, Commands, Sock, State#execute{decoder = Decoder}}.
 

+ 3 - 3
src/commands/epgsql_cmd_parse.erl

@@ -38,9 +38,9 @@ execute(Sock, #parse{name = Name, sql = Sql, types = Types} = St) ->
     Bin = epgsql_wire:encode_types(Types, Codec),
     Commands =
       [
-       {?PARSE, [Name, 0, Sql, 0, Bin]},
-       {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
-       {?FLUSH, []}
+       epgsql_wire:encode_parse(Name, Sql, Bin),
+       epgsql_wire:encode_describe(statement, Name),
+       epgsql_wire:encode_flush()
       ],
     {send_multi, Commands, Sock, St}.
 

+ 3 - 3
src/commands/epgsql_cmd_prepared_query.erl

@@ -39,9 +39,9 @@ execute(Sock, #pquery{stmt = Stmt, params = TypedParams} = St) ->
     Bin2 = epgsql_wire:encode_formats(Columns),
     Commands =
       [
-       {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
-       {?EXECUTE, ["", 0, <<0:?int32>>]},
-       {?SYNC, []}
+       epgsql_wire:encode_bind("", StatementName, Bin1, Bin2),
+       epgsql_wire:encode_execute("", 0),
+       epgsql_wire:encode_sync()
       ],
     {send_multi, Commands, Sock, St}.
 

+ 2 - 1
src/commands/epgsql_cmd_squery.erl

@@ -38,7 +38,8 @@ init(Sql) ->
     #squery{query = Sql}.
 
 execute(Sock, #squery{query = Q} = State) ->
-    {send, ?SIMPLEQUERY, [Q, 0], Sock, State}.
+    {Type, Data} = epgsql_wire:encode_query(Q),
+    {send, Type, Data, Sock, State}.
 
 handle_message(?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>, Sock, State) ->
     Codec = epgsql_sock:get_codec(Sock),

+ 2 - 1
src/commands/epgsql_cmd_start_replication.erl

@@ -59,7 +59,8 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
                        align_lsn = AlignLsn},
     Sock2 = epgsql_sock:set_attr(replication_state, Repl3, Sock),
                          %% handler = on_replication},
-    {send, ?SIMPLEQUERY, [Sql2, 0], Sock2, St}.
+    {PktType, PktData} = epgsql_wire:encode_query(Sql2),
+    {send, PktType, PktData, Sock2, St}.
 
 %% CopyBothResponse
 handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->

+ 3 - 1
src/commands/epgsql_cmd_sync.erl

@@ -1,6 +1,7 @@
 %% @doc Synchronize client and server states for multi-command combinations
 %%
 %% Should be executed if APIs start to return `{error, sync_required}'.
+%% See [https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY]
 %% ```
 %% > Sync
 %% < ReadyForQuery
@@ -21,7 +22,8 @@ init(_) ->
 
 execute(Sock, St) ->
     Sock1 = epgsql_sock:set_attr(sync_required, false, Sock),
-    {send, ?SYNC, [], Sock1, St}.
+    {Type, Data} = epgsql_wire:encode_sync(),
+    {send, Type, Data, Sock1, St}.
 
 handle_message(?READY_FOR_QUERY, _, Sock, _State) ->
     {finish, ok, ok, Sock};

+ 2 - 1
src/commands/epgsql_cmd_update_type_cache.erl

@@ -22,7 +22,8 @@ execute(Sock, #upd{codecs = Codecs} = State) ->
     CodecEntries = epgsql_codec:init_mods(Codecs, Sock),
     TypeNames = [element(1, Entry) || Entry <- CodecEntries],
     Query = epgsql_oid_db:build_query(TypeNames),
-    {send, ?SIMPLEQUERY, [Query, 0], Sock, State#upd{codec_entries = CodecEntries}}.
+    {PktType, PktData} = epgsql_wire:encode_query(Query),
+    {send, PktType, PktData, Sock, State#upd{codec_entries = CodecEntries}}.
 
 handle_message(?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>, Sock, State) ->
     Codec = epgsql_sock:get_codec(Sock),

+ 96 - 1
src/epgsql_wire.erl

@@ -24,13 +24,24 @@
          format/2,
          encode_parameters/2,
          encode_standby_status_update/3]).
--export_type([row_decoder/0, packet_type/0]).
+%% Encoders for Client -> Server packets
+-export([encode_query/1,
+         encode_parse/3,
+         encode_describe/2,
+         encode_bind/4,
+         encode_execute/2,
+         encode_close/2,
+         encode_flush/0,
+         encode_sync/0]).
+
+-export_type([row_decoder/0, packet_type/0, packet_type/1]).
 
 -include("epgsql.hrl").
 -include("protocol.hrl").
 
 -opaque row_decoder() :: {[epgsql_binary:decoder()], [epgsql:column()], epgsql_binary:codec()}.
 -type packet_type() :: byte().                 % see protocol.hrl
+-type packet_type(Exact) :: Exact.
 
 %% @doc tries to extract single postgresql packet from TCP stream
 -spec decode_message(binary()) -> {packet_type(), binary(), binary()} | binary().
@@ -61,6 +72,10 @@ decode_strings(Bin) ->
     <<Subj:Sz/binary, 0>> = Bin,
     binary:split(Subj, <<0>>, [global]).
 
+-spec encode_string(iodata()) -> iodata().
+encode_string(Val) ->
+    [Val, 0].
+
 %% @doc decode error's field
 -spec decode_fields(binary()) -> [{byte(), binary()}].
 decode_fields(Bin) ->
@@ -294,3 +309,83 @@ encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN) ->
     %% microseconds since midnight on 2000-01-01
     Timestamp = ((MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs) - 946684800*1000000,
     <<$r:8, ReceivedLSN:?int64, FlushedLSN:?int64, AppliedLSN:?int64, Timestamp:?int64, 0:8>>.
+
+%%
+%% Encoders for various PostgreSQL protocol client-side packets
+%% See https://www.postgresql.org/docs/current/protocol-message-formats.html
+%%
+
+%% @doc encodes simple 'Query' packet.
+encode_query(SQL) ->
+    {?SIMPLEQUERY, encode_string(SQL)}.
+
+%% @doc encodes 'Parse' packet.
+%%
+%% Results in `ParseComplete' response.
+%%
+%% @param ColumnEncoding see {@link encode_types/2}
+-spec encode_parse(iodata(), iodata(), iodata()) -> {packet_type(?PARSE), iodata()}.
+encode_parse(Name, SQL, ColumnEncoding) ->
+    {?PARSE, [encode_string(Name), encode_string(SQL), ColumnEncoding]}.
+
+%% @doc encodes `Describe' packet.
+%%
+%% @param What might be `?PORTAL' (results in `RowDescription' response) or `?PREPARED_STATEMENT'
+%%   (results in `ParameterDescription' followed by `RowDescription' or `NoData' response)
+-spec encode_describe(?PREPARED_STATEMENT | ?PORTAL | statement | portal, iodata()) ->
+          {packet_type(?DESCRIBE), iodata()}.
+encode_describe(What, Name) when What =:= ?PREPARED_STATEMENT;
+                                 What =:= ?PORTAL ->
+    {?DESCRIBE, [What, encode_string(Name)]};
+encode_describe(What, Name) when is_atom(What) ->
+    encode_describe(obj_atom_to_byte(What), Name).
+
+%% @doc encodes `Bind' packet.
+%%
+%% @param BinParams see {@link encode_parameters/2}.
+%% @param BinFormats  see {@link encode_formats/1}
+-spec encode_bind(iodata(), iodata(), iodata(), iodata()) -> {packet_type(?BIND), iodata()}.
+encode_bind(PortalName, StmtName, BinParams, BinFormats) ->
+    {?BIND, [encode_string(PortalName), encode_string(StmtName), BinParams, BinFormats]}.
+
+%% @doc encodes `Execute' packet.
+%%
+%% Results in 0 or up to `MaxRows' packets of `DataRow' type followed by `CommandComplete' (when no
+%% more rows are available) or `PortalSuspend' (repeated `Execute' will return more rows)
+%%
+%% @param PortalName  might be an empty string (anonymous portal) or name of the named portal
+%% @param MaxRows  how many rows server should send (0 means all of them)
+-spec encode_execute(iodata(), non_neg_integer()) -> {packet_type(?EXECUTE), iodata()}.
+encode_execute(PortalName, MaxRows) ->
+    {?EXECUTE, [encode_string(PortalName), <<MaxRows:?int32>>]}.
+
+%% @doc encodes `Close' packet.
+%%
+%% Results in `CloseComplete' response
+%%
+%% @param What see {@link encode_describe/2}
+-spec encode_close(?PREPARED_STATEMENT | ?PORTAL | statement | portal, iodata()) ->
+          {packet_type(?CLOSE), iodata()}.
+encode_close(What, Name) when What =:= ?PREPARED_STATEMENT;
+                              What =:= ?PORTAL ->
+    {?CLOSE, [What, encode_string(Name)]};
+encode_close(What, Name) when is_atom(What) ->
+    encode_close(obj_atom_to_byte(What), Name).
+
+%% @doc encodes `Flush' packet.
+%%
+%% It doesn't cause any specific response packet, but tells PostgreSQL server to flush it's send
+%% network buffers
+-spec encode_flush() -> {packet_type(?FLUSH), iodata()}.
+encode_flush() ->
+    {?FLUSH, []}.
+
+%% @doc encodes `Sync' packet.
+%%
+%% Results in `ReadyForQuery' response
+-spec encode_sync() -> {packet_type(?SYNC), iodata()}.
+encode_sync() ->
+    {?SYNC, []}.
+
+obj_atom_to_byte(statement) -> ?PREPARED_STATEMENT;
+obj_atom_to_byte(portal) -> ?PORTAL.