Browse Source

Introduce epgsql{a,i}:execute_batch/3

It is useful when there is a need to execute the same query
with a different sets of parameters.
The most obvious example is batch insert.
Sergey Prokhorov 5 years ago
parent
commit
44d900b434
8 changed files with 169 additions and 28 deletions
  1. 40 9
      README.md
  2. 52 14
      src/commands/epgsql_cmd_batch.erl
  3. 14 1
      src/epgsql.erl
  4. 5 1
      src/epgsqla.erl
  5. 5 1
      src/epgsqli.erl
  6. 29 0
      test/epgsql_SUITE.erl
  7. 12 1
      test/epgsql_cast.erl
  8. 12 1
      test/epgsql_incremental.erl

+ 40 - 9
README.md

@@ -289,8 +289,8 @@ squery including final `{C, Ref, done}`.
 {error, Error}             = epgsql:prepared_query(C, "non_existent_query", [Parameters]).
 ```
 
-`Parameters` - optional list of values to be bound to `$1`, `$2`, `$3`, etc.
-`Statement` - name of query given with ```erlang epgsql:parse(C, StatementName, "select ...", []).```
+- `Parameters` - optional list of values to be bound to `$1`, `$2`, `$3`, etc.
+- `Statement` - name of query given with ```erlang epgsql:parse(C, StatementName, "select ...", []).```
                (can be empty string) or `#statement{}` record returned by `epgsql:parse`.
 
 With prepared query one can parse a query giving it a name with `epgsql:parse` on start and reuse the name
@@ -385,23 +385,54 @@ Batch execution is `bind` + `execute` for several prepared statements.
 It uses unnamed portals and `MaxRows = 0`.
 
 ```erlang
-Results = epgsql:execute_batch(C, Batch).
+Results = epgsql:execute_batch(C, BatchStmt :: [{statement(), [bind_param()]}]).
+{Columns, Results} = epgsql:execute_batch(C, statement() | sql_query(), Batch :: [ [bind_param()] ]).
 ```
 
-- `Batch`   - list of {Statement, ParameterValues}
-- `Results` - list of {ok, Count} or {ok, Count, Rows}
+- `BatchStmt` - list of `{Statement, ParameterValues}`, each item has it's own `#statement{}`
+- `Batch` - list of `ParameterValues`, each item executes the same common `#statement{}` or SQL query
+- `Columns` - list of `#column{}` descriptions of `Results` columns
+- `Results` - list of `{ok, Count}` or `{ok, Count, Rows}`
+
+There are 2 versions:
+
+`execute_batch/2` - each item in a batch has it's own named statement (but it's allowed to have duplicates)
 
 example:
 
 ```erlang
-{ok, S1} = epgsql:parse(C, "one", "select $1", [int4]),
-{ok, S2} = epgsql:parse(C, "two", "select $1 + $2", [int4, int4]),
+{ok, S1} = epgsql:parse(C, "one", "select $1::integer", []),
+{ok, S2} = epgsql:parse(C, "two", "select $1::integer + $2::integer", []),
 [{ok, [{1}]}, {ok, [{3}]}] = epgsql:execute_batch(C, [{S1, [1]}, {S2, [1, 2]}]).
+ok = epgsql:close(C, "one").
+ok = epgsql:close(C, "two").
 ```
 
-`epgsqla:execute_batch/3` sends `{C, Ref, Results}`
+`execute_batch/3` - each item in a batch executed with the same common SQL query or `#statement{}`.
+It's allowed to use unnamed statement.
+
+example (the most efficient way to make batch inserts with epgsql):
+
+```erlang
+{ok, Stmt} = epgsql:parse(C, "my_insert", "INSERT INTO account (name, age) VALUES ($1, $2) RETURNING id", []).
+{[#column{name = <<"id">>}], [{ok, [{1}]}, {ok, [{2}]}, {ok, [{3}]}]} =
+    epgsql:execute_batch(C, Stmt, [ ["Joe", 35], ["Paul", 26], ["Mary", 24] ]).
+ok = epgsql:close(C, "my_insert").
+```
+
+equivalent:
+
+```erlang
+epgsql:execute_batch(C, "INSERT INTO account (name, age) VALUES ($1, $2) RETURNING id",
+                     [ ["Joe", 35], ["Paul", 26], ["Mary", 24] ]).
+```
+
+In case one of the batch items will cause an error, result returned for this particular
+item will be `{error, #error{}}` and no more results will be produced.
+
+`epgsqla:execute_batch/{2,3}` sends `{C, Ref, Results}`
 
-`epgsqli:execute_batch/3` sends
+`epgsqli:execute_batch/{2,3}` sends
 
 - `{C, Ref, {data, Row}}`
 - `{C, Ref, {error, Reason}}`

+ 52 - 14
src/commands/epgsql_cmd_batch.erl

@@ -1,15 +1,18 @@
-%% > Bind
-%% < BindComplete
-%% > Execute
-%% < DataRow*
-%% < CommandComplete
-%% -- Repeated many times --
+%% > {Bind
+%% <  BindComplete
+%% >  Execute
+%% <  DataRow*
+%% <  CommandComplete}*
 %% > Sync
 %% < ReadyForQuery
 -module(epgsql_cmd_batch).
 -behaviour(epgsql_command).
 -export([init/1, execute/2, handle_message/4]).
--export_type([response/0]).
+-export_type([arguments/0, response/0]).
+
+-type arguments() ::
+        {epgsql:statement(), [ [epgsql:bind_param()] ]} |
+        [{epgsql:statement(), [epgsql:bind_param()]}].
 
 -type response() :: [{ok, Count :: non_neg_integer(), Rows :: [tuple()]}
                      | {ok, Count :: non_neg_integer()}
@@ -20,13 +23,18 @@
 -include("protocol.hrl").
 
 -record(batch,
-        {batch :: [{#statement{}, list()}],
-         decoder}).
+        {batch :: [ [epgsql:bind_param()] ] | [{#statement{}, [epgsql:bind_param()]}],
+         statement :: #statement{} | undefined,
+         decoder :: epgsql_wire:row_decoder() | undefined}).
 
-init(Batch) ->
+-spec init(arguments()) -> #batch{}.
+init({#statement{} = Statement, Batch}) ->
+    #batch{statement = Statement,
+           batch = Batch};
+init(Batch) when is_list(Batch) ->
     #batch{batch = Batch}.
 
-execute(Sock, #batch{batch = Batch} = State) ->
+execute(Sock, #batch{batch = Batch, statement = undefined} = State) ->
     Codec = epgsql_sock:get_codec(Sock),
     Commands =
         lists:foldr(
@@ -43,10 +51,28 @@ execute(Sock, #batch{batch = Batch} = State) ->
           [{?SYNC, []}],
           Batch),
     epgsql_sock:send_multi(Sock, Commands),
+    {ok, Sock, State};
+execute(Sock, #batch{batch = Batch,
+                     statement = #statement{name = StatementName,
+                                            columns = Columns,
+                                            types = Types}} = State) ->
+    Codec = epgsql_sock:get_codec(Sock),
+    BinFormats = epgsql_wire:encode_formats(Columns),
+    Commands =
+        lists:foldr(
+          fun(Parameters, Acc) ->
+                  TypedParameters = lists:zip(Types, Parameters),
+                  BinParams = epgsql_wire:encode_parameters(TypedParameters, Codec),
+                  [{?BIND, [0, StatementName, 0, BinParams, BinFormats]},
+                   {?EXECUTE, [0, <<0:?int32>>]} | Acc]
+          end,
+          [{?SYNC, []}],
+          Batch),
+    epgsql_sock:send_multi(Sock, Commands),
     {ok, Sock, State}.
 
-handle_message(?BIND_COMPLETE, <<>>, Sock, #batch{batch = [{Stmt, _} | _]} = State) ->
-    #statement{columns = Columns} = Stmt,
+handle_message(?BIND_COMPLETE, <<>>, Sock, State) ->
+    Columns = current_cols(State),
     Codec = epgsql_sock:get_codec(Sock),
     Decoder = epgsql_wire:build_decoder(Columns, Codec),
     {noaction, Sock, State#batch{decoder = Decoder}};
@@ -58,7 +84,8 @@ handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>, Sock,
 %%     Sock1 = epgsql_sock:add_result(Sock, {complete, empty}, {ok, [], []}),
 %%     {noaction, Sock1};
 handle_message(?COMMAND_COMPLETE, Bin, Sock,
-               #batch{batch = [{#statement{columns = Columns}, _} | Batch]} = State) ->
+               #batch{batch = [_ | Batch]} = State) ->
+    Columns = current_cols(State),
     Complete = epgsql_wire:decode_complete(Bin),
     Rows = epgsql_sock:get_rows(Sock),
     Result = case Complete of
@@ -79,3 +106,14 @@ handle_message(?ERROR, Error, Sock, #batch{batch = [_ | Batch]} = State) ->
     {add_result, Result, Result, Sock, State#batch{batch = Batch}};
 handle_message(_, _, _, _) ->
     unknown.
+
+%% Helpers
+
+current_cols(Batch) ->
+    #statement{columns = Columns} = current_stmt(Batch),
+    Columns.
+
+current_stmt(#batch{batch = [{Stmt, _} | _], statement = undefined}) ->
+    Stmt;
+current_stmt(#batch{statement = #statement{} = Stmt}) ->
+    Stmt.

+ 14 - 1
src/epgsql.erl

@@ -15,7 +15,7 @@
          describe/2, describe/3,
          bind/3, bind/4,
          execute/2, execute/3, execute/4,
-         execute_batch/2,
+         execute_batch/2, execute_batch/3,
          close/2, close/3,
          sync/1,
          cancel/1,
@@ -315,6 +315,19 @@ execute(C, S, PortalName, N) ->
 execute_batch(C, Batch) ->
     epgsql_sock:sync_command(C, epgsql_cmd_batch, Batch).
 
+-spec execute_batch(connection(), statement() | sql_query(), [ [bind_param()] ]) ->
+                           {[column()], epgsql_cmd_batch:response()}.
+execute_batch(C, #statement{columns = Cols} = Statement, Batch) ->
+    {Cols, epgsql_sock:sync_command(C, epgsql_cmd_batch, {Statement, Batch})};
+execute_batch(C, Sql, Batch) ->
+    case parse(C, Sql) of
+        {ok, #statement{} = S} ->
+            execute_batch(C, S, Batch);
+        Error ->
+            Error
+    end.
+
+
 %% statement/portal functions
 -spec describe(connection(), statement()) -> epgsql_cmd_describe_statement:response().
 describe(C, #statement{name = Name}) ->

+ 5 - 1
src/epgsqla.erl

@@ -15,7 +15,7 @@
          describe/2, describe/3,
          bind/3, bind/4,
          execute/2, execute/3, execute/4,
-         execute_batch/2,
+         execute_batch/2, execute_batch/3,
          close/2, close/3,
          sync/1,
          cancel/1,
@@ -123,6 +123,10 @@ execute(C, Statement, PortalName, MaxRows) ->
 execute_batch(C, Batch) ->
     cast(C, epgsql_cmd_batch, Batch).
 
+-spec execute_batch(epgsql:connection(), epgsql:statement(), [ [epgsql:bind_param()] ]) -> reference().
+execute_batch(C, #statement{} = Statement, Batch) ->
+    cast(C, epgsql_cmd_batch, {Statement, Batch}).
+
 describe(C, #statement{name = Name}) ->
     describe(C, statement, Name).
 

+ 5 - 1
src/epgsqli.erl

@@ -15,7 +15,7 @@
          describe/2, describe/3,
          bind/3, bind/4,
          execute/2, execute/3, execute/4,
-         execute_batch/2,
+         execute_batch/2, execute_batch/3,
          close/2, close/3,
          sync/1,
          cancel/1]).
@@ -123,6 +123,10 @@ execute(C, Statement, PortalName, MaxRows) ->
 execute_batch(C, Batch) ->
     incremental(C, epgsql_cmd_batch, Batch).
 
+-spec execute_batch(epgsql:connection(), epgsql:statement(), [ [epgsql:bind_param()] ]) -> reference().
+execute_batch(C, #statement{} = Statement, Batch) ->
+    incremental(C, epgsql_cmd_batch, {Statement, Batch}).
+
 describe(C, #statement{name = Name}) ->
     describe(C, statement, Name).
 

+ 29 - 0
test/epgsql_SUITE.erl

@@ -87,6 +87,9 @@ groups() ->
         cursor,
         multiple_result,
         execute_batch,
+        execute_batch_3_named_stmt,
+        execute_batch_3_unnamed_stmt,
+        execute_batch_3_sql,
         batch_error,
         single_batch,
         extended_select,
@@ -431,6 +434,32 @@ execute_batch(Config) ->
             Module:execute_batch(C, [{S1, [1]}, {S2, [1, 2]}])
     end).
 
+execute_batch_3_named_stmt(Config) ->
+    Module = ?config(module, Config),
+    epgsql_ct:with_connection(Config, fun(C) ->
+        {ok, Stmt} = Module:parse(C, "my_stmt", "select $1 + $2", [int4, int4]),
+        ?assertMatch(
+           {[#column{type = int4, _ = _}], [{ok, [{3}]}, {ok, [{7}]}]},
+           Module:execute_batch(C, Stmt, [[1, 2], [3, 4]]))
+    end).
+
+execute_batch_3_unnamed_stmt(Config) ->
+    Module = ?config(module, Config),
+    epgsql_ct:with_connection(Config, fun(C) ->
+        {ok, Stmt} = Module:parse(C, "select $1::integer + $2::integer"),
+        ?assertMatch(
+           {[#column{type = int4, _ = _}], [{ok, [{3}]}, {ok, [{7}]}]},
+           Module:execute_batch(C, Stmt, [[2, 1], [4, 3]]))
+    end).
+
+execute_batch_3_sql(Config) ->
+    Module = ?config(module, Config),
+    epgsql_ct:with_connection(Config, fun(C) ->
+        ?assertMatch(
+           {[#column{type = int4, _ = _}], [{ok, [{3}]}, {ok, [{7}]}]},
+           Module:execute_batch(C, "select $1::integer + $2::integer", [[1, 2], [3, 4]]))
+    end).
+
 batch_error(Config) ->
     Module = ?config(module, Config),
     epgsql_ct:with_rollback(Config, fun(C) ->

+ 12 - 1
test/epgsql_cast.erl

@@ -9,7 +9,7 @@
 -export([get_parameter/2, set_notice_receiver/2, get_cmd_status/1, squery/2, equery/2, equery/3]).
 -export([prepared_query/3]).
 -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
--export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2]).
+-export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2, execute_batch/3]).
 -export([close/2, close/3, sync/1]).
 -export([receive_result/2, sync_on_error/2]).
 
@@ -123,6 +123,17 @@ execute_batch(C, Batch) ->
     Ref = epgsqla:execute_batch(C, Batch),
     receive_result(C, Ref).
 
+execute_batch(C, #statement{columns = Cols} = Stmt, Batch) ->
+    Ref = epgsqla:execute_batch(C, Stmt, Batch),
+    {Cols, receive_result(C, Ref)};
+execute_batch(C, Sql, Batch) ->
+    case parse(C, Sql) of
+        {ok, #statement{} = S} ->
+            execute_batch(C, S, Batch);
+        Error ->
+            Error
+    end.
+
 %% statement/portal functions
 
 describe(C, #statement{name = Name}) ->

+ 12 - 1
test/epgsql_incremental.erl

@@ -9,7 +9,7 @@
 -export([get_parameter/2, set_notice_receiver/2, get_cmd_status/1, squery/2, equery/2, equery/3]).
 -export([prepared_query/3]).
 -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
--export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2]).
+-export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2, execute_batch/3]).
 -export([close/2, close/3, sync/1]).
 
 -include("epgsql.hrl").
@@ -124,6 +124,17 @@ execute_batch(C, Batch) ->
     Ref = epgsqli:execute_batch(C, Batch),
     receive_extended_results(C, Ref, []).
 
+execute_batch(C, #statement{columns = Cols} = Stmt, Batch) ->
+    Ref = epgsqli:execute_batch(C, Stmt, Batch),
+    {Cols, receive_extended_results(C, Ref, [])};
+execute_batch(C, Sql, Batch) ->
+    case parse(C, Sql) of
+        {ok, #statement{} = S} ->
+            execute_batch(C, S, Batch);
+        Error ->
+            Error
+    end.
+
 %% statement/portal functions
 
 describe(C, #statement{name = Name}) ->