epgsql_cmd_execute.erl 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. %% > Execute
  2. %% < DataRow*
  3. %% < CommandComplete | PortalSuspended
  4. -module(epgsql_cmd_execute).
  5. -behaviour(epgsql_command).
  6. -export([init/1, execute/2, handle_message/4]).
  7. -export_type([response/0]).
  %% Result shapes for this command. They mirror the results that
  %% handle_message/4 finishes with:
  %%   {ok, Count, Rows} - completion carried a count and the statement has columns
  %%   {ok, Count}       - completion carried a count and the statement has no columns
  %%   {ok, Rows}        - completion carried no count
  %%   {partial, Rows}   - the portal was suspended
  %%   {error, Error}    - the server reported an error
  %% NOTE(review): the ?EMPTY_QUERY clause of handle_message/4 finishes with
  %% {ok, [], []}, which none of the alternatives below admits as written -
  %% confirm whether this type should be widened.
  -type response() ::
      {ok, Count :: non_neg_integer(), Rows :: [tuple()]}
      | {ok, Count :: non_neg_integer()}
      | {ok, Rows :: [tuple()]}
      | {partial, Rows :: [tuple()]}
      | {error, epgsql:query_error()}.
  12. -include("epgsql.hrl").
  13. -include("protocol.hrl").
  %% Command state created by init/1, completed by execute/2 and read by
  %% handle_message/4.
  -record(execute, {
      %% Statement being executed; its column list is used to build the row
      %% decoder and to choose the shape of the final result.
      stmt :: #statement{},
      %% Portal name written into the Execute message.
      portal_name :: iodata(),
      %% Value written into the Execute message as an int32.
      max_rows :: non_neg_integer(),
      %% Row decoder; left unset by init/1 and filled in by execute/2.
      decoder
  }).
  %% Builds the initial command state from a {Statement, PortalName, MaxRows}
  %% tuple. The decoder field is not set here; execute/2 fills it in.
  init({Statement, Portal, RowLimit}) ->
      #execute{
          stmt = Statement,
          portal_name = Portal,
          max_rows = RowLimit
      }.
  %% Sends an Execute message for the portal followed by a Flush, then builds
  %% the row decoder from the statement's columns and the socket's codec.
  %% Returns {ok, Sock, State} with the decoder stored in the state.
  execute(Sock, #execute{} = State) ->
      #execute{stmt = Statement, portal_name = Portal, max_rows = RowLimit} = State,
      %% Execute payload: the portal name, a single 0 byte, then the int32 limit.
      ExecuteMsg = {?EXECUTE, [Portal, 0, <<RowLimit:?int32>>]},
      FlushMsg = {?FLUSH, []},
      epgsql_sock:send_multi(Sock, [ExecuteMsg, FlushMsg]),
      %% The decoder is built only after the messages were handed to the socket.
      #statement{columns = Columns} = Statement,
      Codec = epgsql_sock:get_codec(Sock),
      RowDecoder = epgsql_wire:build_decoder(Columns, Codec),
      {ok, Sock, State#execute{decoder = RowDecoder}}.
  %% Handles one backend message for this command, dispatching on its type:
  %%   ?DATA_ROW         - skips the leading int16, decodes the rest with the
  %%                       stored decoder and returns {add_row, Row, Sock, State}
  %%   ?EMPTY_QUERY      - finishes with {ok, [], []}
  %%   ?COMMAND_COMPLETE - finishes with the result chosen by completion_result/3
  %%   ?PORTAL_SUSPENDED - finishes with {partial, Rows}
  %%   ?ERROR            - returns {sync_required, {error, Error}}
  %%   anything else     - returns unknown
  handle_message(?DATA_ROW, <<_Count:?int16, RowData/binary>>, Sock,
                 #execute{decoder = RowDecoder} = State) ->
      {add_row, epgsql_wire:decode_data(RowData, RowDecoder), Sock, State};
  handle_message(?EMPTY_QUERY, _Payload, Sock, _State) ->
      {finish, {ok, [], []}, {complete, empty}, Sock};
  handle_message(?COMMAND_COMPLETE, Payload, Sock,
                 #execute{stmt = #statement{columns = Columns}}) ->
      Complete = epgsql_wire:decode_complete(Payload),
      Rows = epgsql_sock:get_rows(Sock),
      {finish, completion_result(Complete, Columns, Rows), {complete, Complete}, Sock};
  handle_message(?PORTAL_SUSPENDED, <<>>, Sock, _State) ->
      {finish, {partial, epgsql_sock:get_rows(Sock)}, suspended, Sock};
  handle_message(?ERROR, Error, _Sock, _State) ->
      {sync_required, {error, Error}};
  handle_message(_Type, _Payload, _Sock, _State) ->
      unknown.

  %% Chooses the result for a completed command. A two-element completion
  %% yields its second element as the count, together with the rows unless the
  %% statement has no columns; any other completion yields the rows alone.
  completion_result({_Tag, Count}, [], _Rows) ->
      {ok, Count};
  completion_result({_Tag, Count}, _Columns, Rows) ->
      {ok, Count, Rows};
  completion_result(_Complete, _Columns, Rows) ->
      {ok, Rows}.