epgsql_cmd_prepared_query.erl 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. %% Almost the same as equery, but don't execute 'CLOSE'
  2. %% > Bind
  3. %% < BindComplete
  4. %% > Execute
  5. %% < DataRow*
  6. %% < CommandComplete
  7. %% > Sync
  8. %% < ReadyForQuery
  9. -module(epgsql_cmd_prepared_query).
  10. -behaviour(epgsql_command).
  11. -export([init/1, execute/2, handle_message/4]).
  12. -export_type([response/0]).
  13. -type response() :: {ok, Count :: non_neg_integer(), Cols :: [epgsql:column()], Rows :: [tuple()]}
  14. | {ok, Count :: non_neg_integer()}
  15. | {ok, Cols :: [epgsql:column()], Rows :: [tuple()]}
  16. | {error, epgsql:query_error()}.
  17. -include("epgsql.hrl").
  18. -include("protocol.hrl").
  19. -record(pquery,
  20. {stmt :: #statement{},
  21. params :: list(),
  22. decoder}).
  23. init({Stmt, TypedParams}) ->
  24. #pquery{stmt = Stmt,
  25. params = TypedParams}.
  26. execute(Sock, #pquery{stmt = Stmt, params = TypedParams} = St) ->
  27. #statement{name = StatementName, columns = Columns} = Stmt,
  28. Codec = epgsql_sock:get_codec(Sock),
  29. Bin1 = epgsql_wire:encode_parameters(TypedParams, Codec),
  30. Bin2 = epgsql_wire:encode_formats(Columns),
  31. epgsql_sock:send_multi(
  32. Sock,
  33. [
  34. {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
  35. {?EXECUTE, ["", 0, <<0:?int32>>]},
  36. {?SYNC, []}
  37. ]),
  38. {ok, Sock, St}.
  39. handle_message(?BIND_COMPLETE, <<>>, Sock, #pquery{stmt = Stmt} = State) ->
  40. #statement{columns = Columns} = Stmt,
  41. epgsql_sock:notify(Sock, {columns, Columns}), % Why do we need this?
  42. Codec = epgsql_sock:get_codec(Sock),
  43. Decoder = epgsql_wire:build_decoder(Columns, Codec),
  44. {noaction, Sock, State#pquery{decoder = Decoder}};
  45. handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>,
  46. Sock, #pquery{decoder = Decoder} = St) ->
  47. Row = epgsql_wire:decode_data(Bin, Decoder),
  48. {add_row, Row, Sock, St};
  49. handle_message(?EMPTY_QUERY, _, Sock, St) ->
  50. {add_result, {ok, [], []}, {complete, empty}, Sock, St};
  51. handle_message(?COMMAND_COMPLETE, Bin, Sock, #pquery{stmt = Stmt} = St) ->
  52. Complete = epgsql_wire:decode_complete(Bin),
  53. #statement{columns = Cols} = Stmt,
  54. Rows = epgsql_sock:get_rows(Sock),
  55. Result = case Complete of
  56. {_, Count} when Cols == [] ->
  57. {ok, Count};
  58. {_, Count} ->
  59. {ok, Count, Cols, Rows};
  60. _ ->
  61. {ok, Cols, Rows}
  62. end,
  63. {add_result, Result, {complete, Complete}, Sock, St};
  64. handle_message(?READY_FOR_QUERY, _Status, Sock, _State) ->
  65. case epgsql_sock:get_results(Sock) of
  66. [Result] ->
  67. {finish, Result, done, Sock};
  68. [] ->
  69. {finish, done, done, Sock}
  70. end;
  71. handle_message(?ERROR, Error, Sock, St) ->
  72. Result = {error, Error},
  73. {add_result, Result, Result, Sock, St};
  74. handle_message(_, _, _, _) ->
  75. unknown.