Browse Source

epgsql_sock: aggregate small packets into one

Aggregate small packets into large one to reduce number of
port_command/2 calls, and eventually reduce number of packets.
Jihyun Yu 9 years ago
parent
commit
9eb40af6a8
1 changed files with 44 additions and 27 deletions
  1. 44 27
      src/epgsql_sock.erl

+ 44 - 27
src/epgsql_sock.erl

@@ -217,17 +217,21 @@ command({equery, Statement, Parameters}, #state{codec = Codec} = State) ->
     #statement{name = StatementName, columns = Columns} = Statement,
     Bin1 = epgsql_wire:encode_parameters(Parameters, Codec),
     Bin2 = epgsql_wire:encode_formats(Columns),
-    send(State, ?BIND, ["", 0, StatementName, 0, Bin1, Bin2]),
-    send(State, ?EXECUTE, ["", 0, <<0:?int32>>]),
-    send(State, ?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]),
-    send(State, ?SYNC, []),
+    send_multi(State, [
+        {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
+        {?EXECUTE, ["", 0, <<0:?int32>>]},
+        {?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]},
+        {?SYNC, []}
+    ]),
     {noreply, State};
 
 command({parse, Name, Sql, Types}, State) ->
     Bin = epgsql_wire:encode_types(Types, State#state.codec),
-    send(State, ?PARSE, [Name, 0, Sql, 0, Bin]),
-    send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?PARSE, [Name, 0, Sql, 0, Bin]},
+        {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command({bind, Statement, PortalName, Parameters}, #state{codec = Codec} = State) ->
@@ -235,43 +239,49 @@ command({bind, Statement, PortalName, Parameters}, #state{codec = Codec} = State
     Typed_Parameters = lists:zip(Types, Parameters),
     Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
     Bin2 = epgsql_wire:encode_formats(Columns),
-    send(State, ?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command({execute, _Statement, PortalName, MaxRows}, State) ->
-    send(State, ?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
-command({execute_batch, Batch}, State) ->
-    #state{mod = Mod, sock = Sock, codec = Codec} = State,
-    BindExecute =
-        lists:map(
-          fun({Statement, Parameters}) ->
+command({execute_batch, Batch}, #state{codec = Codec} = State) ->
+    Commands =
+        lists:foldr(
+          fun({Statement, Parameters}, Acc) ->
                   #statement{name = StatementName,
                              columns = Columns,
                              types = Types} = Statement,
                   Typed_Parameters = lists:zip(Types, Parameters),
                   Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
                   Bin2 = epgsql_wire:encode_formats(Columns),
-                  [epgsql_wire:encode(?BIND, [0, StatementName, 0,
-                                             Bin1, Bin2]),
-                   epgsql_wire:encode(?EXECUTE, [0, <<0:?int32>>])]
+                  [{?BIND, [0, StatementName, 0, Bin1, Bin2]},
+                   {?EXECUTE, [0, <<0:?int32>>]} | Acc]
           end,
+          [{?SYNC, []}],
           Batch),
-    Sync = epgsql_wire:encode(?SYNC, []),
-    do_send(Mod, Sock, [BindExecute, Sync]),
+    send_multi(State, Commands),
     {noreply, State};
 
 command({describe_statement, Name}, State) ->
-    send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command({describe_portal, Name}, State) ->
-    send(State, ?DESCRIBE, [?PORTAL, Name, 0]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?DESCRIBE, [?PORTAL, Name, 0]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command({close, Type, Name}, State) ->
@@ -279,8 +289,10 @@ command({close, Type, Name}, State) ->
         statement -> ?PREPARED_STATEMENT;
         portal    -> ?PORTAL
     end,
-    send(State, ?CLOSE, [Type2, Name, 0]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?CLOSE, [Type2, Name, 0]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command(sync, State) ->
@@ -316,6 +328,11 @@ send(#state{mod = Mod, sock = Sock}, Data) ->
 send(#state{mod = Mod, sock = Sock}, Type, Data) ->
     do_send(Mod, Sock, epgsql_wire:encode(Type, Data)).
 
+send_multi(#state{mod = Mod, sock = Sock}, List) ->
+    do_send(Mod, Sock, lists:map(fun({Type, Data}) ->
+        epgsql_wire:encode(Type, Data)
+    end, List)).
+
 do_send(gen_tcp, Sock, Bin) ->
     try erlang:port_command(Sock, Bin) of
         true ->