epgsql_cmd_equery.erl 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. %% > Bind
  2. %% < BindComplete
  3. %% > Execute
  4. %% < DataRow*
  5. %% < CommandComplete
  6. %% > Close
  7. %% < CloseComplete
  8. %% > Sync
  9. %% < ReadyForQuery
  10. -module(epgsql_cmd_equery).
  11. -behaviour(epgsql_command).
  12. -export([init/1, execute/2, handle_message/4]).
  13. -export_type([response/0]).
  14. -type response() :: {ok, Count :: non_neg_integer(), Cols :: [epgsql:column()], Rows :: [tuple()]}
  15. | {ok, Count :: non_neg_integer()}
  16. | {ok, Cols :: [epgsql:column()], Rows :: [tuple()]}
  17. | {error, epgsql:query_error()}.
  18. -include("epgsql.hrl").
  19. -include("protocol.hrl").
  20. -record(equery,
  21. {stmt :: #statement{},
  22. params :: list(),
  23. decoder}).
  24. init({Stmt, TypedParams}) ->
  25. #equery{stmt = Stmt,
  26. params = TypedParams}.
  27. execute(Sock, #equery{stmt = Stmt, params = TypedParams} = St) ->
  28. #statement{name = StatementName, columns = Columns} = Stmt,
  29. Codec = epgsql_sock:get_codec(Sock),
  30. Bin1 = epgsql_wire:encode_parameters(TypedParams, Codec),
  31. Bin2 = epgsql_wire:encode_formats(Columns),
  32. epgsql_sock:send_multi(
  33. Sock,
  34. [
  35. {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
  36. {?EXECUTE, ["", 0, <<0:?int32>>]},
  37. {?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]},
  38. {?SYNC, []}
  39. ]),
  40. {ok, Sock, St}.
  41. handle_message(?BIND_COMPLETE, <<>>, Sock, #equery{stmt = Stmt} = State) ->
  42. #statement{columns = Columns} = Stmt,
  43. epgsql_sock:notify(Sock, {columns, Columns}), % Why do we need this?
  44. Codec = epgsql_sock:get_codec(Sock),
  45. Decoder = epgsql_wire:build_decoder(Columns, Codec),
  46. {noaction, Sock, State#equery{decoder = Decoder}};
  47. handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>,
  48. Sock, #equery{decoder = Decoder} = St) ->
  49. Row = epgsql_wire:decode_data(Bin, Decoder),
  50. {add_row, Row, Sock, St};
  51. handle_message(?EMPTY_QUERY, <<>>, Sock, St) ->
  52. {add_result, {ok, [], []}, {complete, empty}, Sock, St};
  53. handle_message(?COMMAND_COMPLETE, Bin, Sock, #equery{stmt = Stmt} = St) ->
  54. Complete = epgsql_wire:decode_complete(Bin),
  55. #statement{columns = Cols} = Stmt,
  56. Rows = epgsql_sock:get_rows(Sock),
  57. Result = case Complete of
  58. {_, Count} when Cols == [] ->
  59. {ok, Count};
  60. {_, Count} ->
  61. {ok, Count, Cols, Rows};
  62. _ ->
  63. {ok, Cols, Rows}
  64. end,
  65. {add_result, Result, {complete, Complete}, Sock, St};
  66. handle_message(?CLOSE_COMPLETE, _, Sock, _State) ->
  67. {noaction, Sock};
  68. handle_message(?READY_FOR_QUERY, _Status, Sock, _State) ->
  69. case epgsql_sock:get_results(Sock) of
  70. [Result] ->
  71. {finish, Result, done, Sock};
  72. [] ->
  73. {finish, done, done, Sock}
  74. end;
  75. handle_message(?ERROR, Error, Sock, St) ->
  76. Result = {error, Error},
  77. {add_result, Result, Result, Sock, St};
  78. handle_message(_, _, _, _) ->
  79. unknown.