epgsql_cmd_batch.erl 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  %% Message flow of a batch (`>' = sent to the server, `<' = received from it):
  %% > Bind
  %% < BindComplete
  %% > Execute
  %% < DataRow*
  %% < CommandComplete
  %% -- the Bind/Execute exchange above is repeated once per statement --
  %% > Sync
  %% < ReadyForQuery
  9. -module(epgsql_cmd_batch).
  10. -behaviour(epgsql_command).
  11. -export([init/1, execute/2, handle_message/4]).
  12. -export_type([response/0]).
  %% Outcome recorded for a single statement of the batch: a row count
  %% and/or the collected rows on success, or the error reported for it.
  -type batch_result() ::
          {ok, Count :: non_neg_integer(), Rows :: [tuple()]}
        | {ok, Count :: non_neg_integer()}
        | {ok, Rows :: [tuple()]}
        | {error, epgsql:query_error()}.
  %% Reply of the whole batch command: the list of per-statement results.
  -type response() :: [batch_result()].
  17. -include("epgsql.hrl").
  18. -include("protocol.hrl").
  %% Command state.
  %%   batch   - `{Statement, Parameters}' pairs that have not been answered
  %%             yet; the head is the statement currently being processed.
  %%   decoder - row decoder for the current statement, built when its
  %%             BindComplete arrives (`undefined' until then).
  -record(batch, {
      batch :: [{#statement{}, list()}],
      decoder = undefined
  }).
  %% Build the initial command state from the list of
  %% `{Statement, Parameters}' pairs to run; no decoder exists yet.
  init(Statements) ->
      #batch{batch = Statements, decoder = undefined}.
  %% Send the whole batch to the server in a single write: one Bind/Execute
  %% pair per statement, followed by one terminating Sync.
  %% Returns `{ok, Sock, State}' with the state unchanged.
  execute(Sock, #batch{batch = Batch} = State) ->
      Codec = epgsql_sock:get_codec(Sock),
      %% Prepends the Bind/Execute pair of one statement to the commands that
      %% follow it; folding from the right keeps batch order with Sync last.
      Prepend =
          fun({Stmt, Params}, Tail) ->
                  #statement{name = Name, columns = Columns, types = Types} = Stmt,
                  BinParams = epgsql_wire:encode_parameters(lists:zip(Types, Params), Codec),
                  BinFormats = epgsql_wire:encode_formats(Columns),
                  Bind = {?BIND, [0, Name, 0, BinParams, BinFormats]},
                  Execute = {?EXECUTE, [0, <<0:?int32>>]},
                  [Bind, Execute | Tail]
          end,
      Commands = lists:foldr(Prepend, [{?SYNC, []}], Batch),
      epgsql_sock:send_multi(Sock, Commands),
      {ok, Sock, State}.
  %% Process one backend message received in reply to the batch.
  %% Returns an action tuple for the socket process, or `unknown' for any
  %% message this command does not handle.

  %% BindComplete: build the row decoder for the columns of the statement at
  %% the head of the batch, so its DataRow messages can be decoded.
  handle_message(?BIND_COMPLETE, <<>>, Sock, #batch{batch = [{Stmt, _} | _]} = State) ->
      #statement{columns = Columns} = Stmt,
      Codec = epgsql_sock:get_codec(Sock),
      Decoder = epgsql_wire:build_decoder(Columns, Codec),
      {noaction, Sock, State#batch{decoder = Decoder}};
  %% DataRow: decode the row with the current decoder and hand it to the socket
  %% to accumulate. The leading 16-bit column count is not needed.
  handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>, Sock,
                 #batch{decoder = Decoder} = State) ->
      Row = epgsql_wire:decode_data(Bin, Decoder),
      {add_row, Row, Sock, State};
  %% NOTE(review): EmptyQueryResponse is not handled; the disabled clause below
  %% is kept for reference and appears to target a different socket API.
  %% handle_message(?EMPTY_QUERY, _, Sock, _State) ->
  %%     Sock1 = epgsql_sock:add_result(Sock, {complete, empty}, {ok, [], []}),
  %%     {noaction, Sock1};
  %% CommandComplete: the head statement is done. Emit its result and drop it
  %% from the pending batch.
  handle_message(?COMMAND_COMPLETE, Bin, Sock,
                 #batch{batch = [{#statement{columns = Columns}, _} | Batch]} = State) ->
      Complete = epgsql_wire:decode_complete(Bin),
      Rows = epgsql_sock:get_rows(Sock),
      Result = case Complete of
                   %% Counted command on a statement without result columns.
                   {_, Count} when Columns =:= [] ->
                       {ok, Count};
                   %% Counted command that also returned rows.
                   {_, Count} ->
                       {ok, Count, Rows};
                   %% No count available: rows only.
                   _ ->
                       {ok, Rows}
               end,
      {add_result, Result, {complete, Complete}, Sock, State#batch{batch = Batch}};
  %% ReadyForQuery is the reply to the single Sync that terminates the batch, so
  %% it always ends the command. After an error the backend discards the
  %% remaining Bind/Execute messages until Sync, hence statements may still be
  %% pending here. This clause used to require at most one pending statement and
  %% otherwise fell through to `unknown', leaving the batch unfinished.
  handle_message(?READY_FOR_QUERY, _Status, Sock, #batch{}) ->
      Results = epgsql_sock:get_results(Sock),
      {finish, Results, done, Sock};
  %% ErrorResponse: record the error as the result of the head statement and
  %% drop that statement from the pending batch.
  handle_message(?ERROR, Error, Sock, #batch{batch = [_ | Batch]} = State) ->
      Result = {error, Error},
      {add_result, Result, Result, Sock, State#batch{batch = Batch}};
  %% Anything else is not handled by this command.
  handle_message(_, _, _, _) ->
      unknown.