pgsql_sock.erl 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682
  1. %%% Copyright (C) 2009 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. -module(pgsql_sock).
  4. -behavior(gen_server).
  5. -export([start_link/0,
  6. close/1,
  7. get_parameter/2,
  8. cancel/1]).
  9. -export([handle_call/3, handle_cast/2, handle_info/2]).
  10. -export([init/1, code_change/3, terminate/2]).
  11. %% state callbacks
  12. -export([auth/2, initializing/2, on_message/2]).
  13. -include("pgsql.hrl").
  14. -include("pgsql_binary.hrl").
%% Message type bytes of the PostgreSQL frontend/backend protocol, as listed on:
%% http://www.postgresql.org/docs/9.2/static/protocol-message-formats.html
%%
%% Some macros share a byte value (?CLOSE and ?COMMAND_COMPLETE are both $C,
%% ?SYNC and ?PREPARED_STATEMENT are both $S, ?PARSE and ?PORTAL are both $P).
%% In this module they are used in different positions: as the type of a
%% message being sent, as a target selector inside a Describe/Close body, or
%% as the type of a message received from the server.

%% Commands (message types sent to the server)
-define(BIND, $B).
-define(CLOSE, $C).
-define(DESCRIBE, $D).
-define(EXECUTE, $E).
-define(FLUSH, $H).
-define(PARSE, $P).
-define(SIMPLEQUERY, $Q).
-define(AUTHENTICATION_REQUEST, $R).
-define(SYNC, $S).
%% Parameters (target selector byte inside Describe and Close messages)
-define(PREPARED_STATEMENT, $S).
-define(PORTAL, $P).
%% Responses (message types matched in on_message/2 and initializing/2)
-define(PARSE_COMPLETE, $1).
-define(BIND_COMPLETE, $2).
-define(CLOSE_COMPLETE, $3).
-define(COMMAND_COMPLETE, $C).
-define(DATA_ROW, $D).
-define(CANCELLATION_KEY, $K).
-define(NO_DATA, $n).
-define(PORTAL_SUSPENDED, $s).
-define(PARAMETER_DESCRIPTION, $t).
-define(ROW_DESCRIPTION, $T).
-define(READY_FOR_QUERY, $Z).
%% gen_server state of one connection process.
-record(state, {mod,                 % transport module: gen_tcp or ssl (set on connect)
                sock,                % socket belonging to `mod'
                data = <<>>,         % received bytes not yet decoded into a message
                backend,             % {Pid, Key} from BackendKeyData, used by cancel
                handler,             % name of the function in this module that handles
                                     % incoming messages: auth | initializing | on_message
                queue = queue:new(), % pending requests as {Requester, Command}, oldest first
                async,               % pid that receives notices/notifications, if any
                parameters = [],     % [{Name, Value}] collected from ParameterStatus
                types = [],          % parameter types from the last ParameterDescription
                columns = [],        % columns from the last RowDescription
                rows = [],           % rows of the current result, newest first
                results = [],        % finished results of the current request, newest first
                batch = [],          % statements of an execute_batch still awaiting results
                sync_required,       % true after an error until a sync command is issued
                txstatus}).          % status byte of the last ReadyForQuery
  57. %% -- client interface --
  58. start_link() ->
  59. gen_server:start_link(?MODULE, [], []).
  60. close(C) when is_pid(C) ->
  61. catch gen_server:cast(C, stop),
  62. ok.
  63. get_parameter(C, Name) ->
  64. gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
  65. cancel(S) ->
  66. gen_server:cast(S, cancel).
  67. %% -- gen_server implementation --
  68. init([]) ->
  69. {ok, #state{}}.
  70. handle_call({get_parameter, Name}, _From, State) ->
  71. case lists:keysearch(Name, 1, State#state.parameters) of
  72. {value, {Name, Value}} -> Value;
  73. false -> Value = undefined
  74. end,
  75. {reply, {ok, Value}, State};
  76. handle_call(Command, From, State) ->
  77. #state{queue = Q} = State,
  78. Req = {{call, From}, Command},
  79. command(Command, State#state{queue = queue:in(Req, Q)}).
  80. handle_cast({{Method, From, Ref}, Command} = Req, State)
  81. when ((Method == cast) or (Method == incremental)),
  82. is_pid(From),
  83. is_reference(Ref) ->
  84. #state{queue = Q} = State,
  85. command(Command, State#state{queue = queue:in(Req, Q)});
  86. handle_cast(stop, State) ->
  87. {stop, normal, flush_queue(State, {error, closed})};
  88. handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
  89. {ok, {Addr, Port}} = inet:peername(State#state.sock),
  90. SockOpts = [{active, false}, {packet, raw}, binary],
  91. %% TODO timeout
  92. {ok, Sock} = gen_tcp:connect(Addr, Port, SockOpts),
  93. Msg = <<16:?int32, 80877102:?int32, Pid:?int32, Key:?int32>>,
  94. ok = gen_tcp:send(Sock, Msg),
  95. gen_tcp:close(Sock),
  96. {noreply, State}.
  97. handle_info({Closed, Sock}, #state{sock = Sock} = State)
  98. when Closed == tcp_closed; Closed == ssl_closed ->
  99. {stop, sock_closed, flush_queue(State, {error, sock_closed})};
  100. handle_info({Error, Sock, Reason}, #state{sock = Sock} = State)
  101. when Error == tcp_error; Error == ssl_error ->
  102. Why = {sock_error, Reason},
  103. {stop, Why, flush_queue(State, {error, Why})};
  104. handle_info({inet_reply, _, ok}, State) ->
  105. {noreply, State};
  106. handle_info({inet_reply, _, Status}, State) ->
  107. {stop, Status, flush_queue(State, {error, Status})};
  108. handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
  109. loop(State#state{data = <<Data/binary, Data2/binary>>}).
  110. terminate(_Reason, _State) ->
  111. %% TODO send termination msg, close socket ??
  112. ok.
  113. code_change(_OldVsn, State, _Extra) ->
  114. {ok, State}.
  115. %% -- internal functions --
  116. command(Command, State = #state{sync_required = true})
  117. when Command /= sync ->
  118. {noreply, finish(State, {error, sync_required})};
  119. command({connect, Host, Username, Password, Opts}, State) ->
  120. Timeout = proplists:get_value(timeout, Opts, 5000),
  121. Port = proplists:get_value(port, Opts, 5432),
  122. SockOpts = [{active, false}, {packet, raw}, binary, {nodelay, true}],
  123. {ok, Sock} = gen_tcp:connect(Host, Port, SockOpts, Timeout),
  124. State2 = case proplists:get_value(ssl, Opts) of
  125. T when T == true; T == required ->
  126. start_ssl(Sock, T, Opts, State);
  127. _ ->
  128. State#state{mod = gen_tcp, sock = Sock}
  129. end,
  130. Opts2 = ["user", 0, Username, 0],
  131. case proplists:get_value(database, Opts, undefined) of
  132. undefined -> Opts3 = Opts2;
  133. Database -> Opts3 = [Opts2 | ["database", 0, Database, 0]]
  134. end,
  135. send(State2, [<<196608:?int32>>, Opts3, 0]),
  136. Async = proplists:get_value(async, Opts, undefined),
  137. setopts(State2, [{active, true}]),
  138. put(username, Username),
  139. put(password, Password),
  140. {noreply,
  141. State2#state{handler = auth,
  142. async = Async}};
  143. command({squery, Sql}, State) ->
  144. send(State, ?SIMPLEQUERY, [Sql, 0]),
  145. {noreply, State};
  146. %% TODO add fast_equery command that doesn't need parsed statement,
  147. %% uses default (text) column format,
  148. %% sends Describe after Bind to get RowDescription
  149. command({equery, Statement, Parameters}, State) ->
  150. #statement{name = StatementName, columns = Columns} = Statement,
  151. Bin1 = pgsql_wire:encode_parameters(Parameters),
  152. Bin2 = pgsql_wire:encode_formats(Columns),
  153. send(State, ?BIND, ["", 0, StatementName, 0, Bin1, Bin2]),
  154. send(State, ?EXECUTE, ["", 0, <<0:?int32>>]),
  155. send(State, ?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]),
  156. send(State, ?SYNC, []),
  157. {noreply, State};
  158. command({parse, Name, Sql, Types}, State) ->
  159. Bin = pgsql_wire:encode_types(Types),
  160. send(State, ?PARSE, [Name, 0, Sql, 0, Bin]),
  161. send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
  162. send(State, ?FLUSH, []),
  163. {noreply, State};
  164. command({bind, Statement, PortalName, Parameters}, State) ->
  165. #statement{name = StatementName, columns = Columns, types = Types} = Statement,
  166. Typed_Parameters = lists:zip(Types, Parameters),
  167. Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
  168. Bin2 = pgsql_wire:encode_formats(Columns),
  169. send(State, ?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
  170. send(State, ?FLUSH, []),
  171. {noreply, State};
  172. command({execute, _Statement, PortalName, MaxRows}, State) ->
  173. send(State, ?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]),
  174. send(State, ?FLUSH, []),
  175. {noreply, State};
  176. command({execute_batch, Batch}, State) ->
  177. #state{mod = Mod, sock = Sock} = State,
  178. BindExecute =
  179. lists:map(
  180. fun({Statement, Parameters}) ->
  181. #statement{name = StatementName,
  182. columns = Columns,
  183. types = Types} = Statement,
  184. Typed_Parameters = lists:zip(Types, Parameters),
  185. Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
  186. Bin2 = pgsql_wire:encode_formats(Columns),
  187. [pgsql_wire:encode(?BIND, [0, StatementName, 0,
  188. Bin1, Bin2]),
  189. pgsql_wire:encode(?EXECUTE, [0, <<0:?int32>>])]
  190. end,
  191. Batch),
  192. Sync = pgsql_wire:encode(?SYNC, []),
  193. do_send(Mod, Sock, [BindExecute, Sync]),
  194. {noreply, State};
  195. command({describe_statement, Name}, State) ->
  196. send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
  197. send(State, ?FLUSH, []),
  198. {noreply, State};
  199. command({describe_portal, Name}, State) ->
  200. send(State, ?DESCRIBE, [?PORTAL, Name, 0]),
  201. send(State, ?FLUSH, []),
  202. {noreply, State};
  203. command({close, Type, Name}, State) ->
  204. case Type of
  205. statement -> Type2 = ?PREPARED_STATEMENT;
  206. portal -> Type2 = ?PORTAL
  207. end,
  208. send(State, ?CLOSE, [Type2, Name, 0]),
  209. send(State, ?FLUSH, []),
  210. {noreply, State};
  211. command(sync, State) ->
  212. send(State, ?SYNC, []),
  213. {noreply, State#state{sync_required = false}}.
  214. start_ssl(S, Flag, Opts, State) ->
  215. ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
  216. Timeout = proplists:get_value(timeout, Opts, 5000),
  217. {ok, <<Code>>} = gen_tcp:recv(S, 1, Timeout),
  218. case Code of
  219. $S ->
  220. case ssl:connect(S, Opts, Timeout) of
  221. {ok, S2} -> State#state{mod = ssl, sock = S2};
  222. {error, Reason} -> exit({ssl_negotiation_failed, Reason})
  223. end;
  224. $N ->
  225. case Flag of
  226. true -> State;
  227. required -> exit(ssl_not_available)
  228. end
  229. end.
  230. setopts(#state{mod = Mod, sock = Sock}, Opts) ->
  231. case Mod of
  232. gen_tcp -> inet:setopts(Sock, Opts);
  233. ssl -> ssl:setopts(Sock, Opts)
  234. end.
  235. send(#state{mod = Mod, sock = Sock}, Data) ->
  236. do_send(Mod, Sock, pgsql_wire:encode(Data)).
  237. send(#state{mod = Mod, sock = Sock}, Type, Data) ->
  238. do_send(Mod, Sock, pgsql_wire:encode(Type, Data)).
  239. do_send(gen_tcp, Sock, Bin) ->
  240. try erlang:port_command(Sock, Bin) of
  241. true ->
  242. ok
  243. catch
  244. error:_Error ->
  245. {error,einval}
  246. end;
  247. do_send(Mod, Sock, Bin) ->
  248. Mod:send(Sock, Bin).
  249. loop(#state{data = Data, handler = Handler} = State) ->
  250. case pgsql_wire:decode_message(Data) of
  251. {Message, Tail} ->
  252. case ?MODULE:Handler(Message, State#state{data = Tail}) of
  253. {noreply, State2} ->
  254. loop(State2);
  255. R = {stop, _Reason2, _State2} ->
  256. R
  257. end;
  258. _ ->
  259. {noreply, State}
  260. end.
  261. finish(State, Result) ->
  262. finish(State, Result, Result).
  263. finish(State = #state{queue = Q}, Notice, Result) ->
  264. case queue:get(Q) of
  265. {{cast, From, Ref}, _} ->
  266. From ! {self(), Ref, Result};
  267. {{incremental, From, Ref}, _} ->
  268. From ! {self(), Ref, Notice};
  269. {{call, From}, _} ->
  270. gen_server:reply(From, Result)
  271. end,
  272. State#state{queue = queue:drop(Q),
  273. types = [],
  274. columns = [],
  275. rows = [],
  276. results = [],
  277. batch = []}.
  278. add_result(State, Notice, Result) ->
  279. #state{queue = Q, results = Results, batch = Batch} = State,
  280. Results2 = case queue:get(Q) of
  281. {{incremental, From, Ref}, _} ->
  282. From ! {self(), Ref, Notice},
  283. Results;
  284. _ ->
  285. [Result | Results]
  286. end,
  287. Batch2 = case Batch of
  288. [] -> [];
  289. _ -> tl(Batch)
  290. end,
  291. State#state{types = [],
  292. columns = [],
  293. rows = [],
  294. results = Results2,
  295. batch = Batch2}.
  296. add_row(State = #state{queue = Q, rows = Rows}, Data) ->
  297. Rows2 = case queue:get(Q) of
  298. {{incremental, From, Ref}, _} ->
  299. From ! {self(), Ref, {data, Data}},
  300. Rows;
  301. _ ->
  302. [Data | Rows]
  303. end,
  304. State#state{rows = Rows2}.
  305. notify(State = #state{queue = Q}, Notice) ->
  306. case queue:get(Q) of
  307. {{incremental, From, Ref}, _} ->
  308. From ! {self(), Ref, Notice};
  309. _ ->
  310. ignore
  311. end,
  312. State.
  313. notify_async(State = #state{async = Pid}, Msg) ->
  314. case is_pid(Pid) of
  315. true -> Pid ! {pgsql, self(), Msg};
  316. false -> false
  317. end,
  318. State.
  319. command_tag(#state{queue = Q}) ->
  320. {_, Req} = queue:get(Q),
  321. if is_tuple(Req) ->
  322. element(1, Req);
  323. is_atom(Req) ->
  324. Req
  325. end.
  326. get_columns(State) ->
  327. #state{queue = Q, columns = Columns, batch = Batch} = State,
  328. case queue:get(Q) of
  329. {_, {equery, #statement{columns = C}, _}} ->
  330. C;
  331. {_, {execute, #statement{columns = C}, _, _}} ->
  332. C;
  333. {_, {squery, _}} ->
  334. Columns;
  335. {_, {execute_batch, _}} ->
  336. [{#statement{columns = C}, _} | _] = Batch,
  337. C
  338. end.
  339. make_statement(State) ->
  340. #state{queue = Q, types = Types, columns = Columns} = State,
  341. Name = case queue:get(Q) of
  342. {_, {parse, N, _, _}} -> N;
  343. {_, {describe_statement, N}} -> N
  344. end,
  345. #statement{name = Name, types = Types, columns = Columns}.
  346. sync_required(#state{queue = Q} = State) ->
  347. case queue:is_empty(Q) of
  348. false ->
  349. case command_tag(State) of
  350. sync ->
  351. State;
  352. _ ->
  353. sync_required(finish(State, {error, sync_required}))
  354. end;
  355. true ->
  356. State#state{sync_required = true}
  357. end.
  358. flush_queue(#state{queue = Q} = State, Error) ->
  359. case queue:is_empty(Q) of
  360. false ->
  361. flush_queue(finish(State, Error), Error);
  362. true -> State
  363. end.
  364. to_binary(B) when is_binary(B) -> B;
  365. to_binary(L) when is_list(L) -> list_to_binary(L).
  366. hex(Bin) ->
  367. HChar = fun(N) when N < 10 -> $0 + N;
  368. (N) when N < 16 -> $W + N
  369. end,
  370. <<<<(HChar(H)), (HChar(L))>> || <<H:4, L:4>> <= Bin>>.
  371. %% -- backend message handling --
  372. %% AuthenticationOk
  373. auth({?AUTHENTICATION_REQUEST, <<0:?int32>>}, State) ->
  374. {noreply, State#state{handler = initializing}};
  375. %% AuthenticationCleartextPassword
  376. auth({?AUTHENTICATION_REQUEST, <<3:?int32>>}, State) ->
  377. send(State, $p, [get(password), 0]),
  378. {noreply, State};
  379. %% AuthenticationMD5Password
  380. auth({?AUTHENTICATION_REQUEST, <<5:?int32, Salt:4/binary>>}, State) ->
  381. Digest1 = hex(erlang:md5([get(password), get(username)])),
  382. Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
  383. send(State, $p, Str),
  384. {noreply, State};
  385. auth({?AUTHENTICATION_REQUEST, <<M:?int32, _/binary>>}, State) ->
  386. case M of
  387. 2 -> Method = kerberosV5;
  388. 4 -> Method = crypt;
  389. 6 -> Method = scm;
  390. 7 -> Method = gss;
  391. 8 -> Method = sspi;
  392. _ -> Method = unknown
  393. end,
  394. State2 = finish(State, {error, {unsupported_auth_method, Method}}),
  395. {stop, normal, State2};
  396. %% ErrorResponse
  397. auth({error, E}, State) ->
  398. case E#error.code of
  399. <<"28000">> -> Why = invalid_authorization_specification;
  400. <<"28P01">> -> Why = invalid_password;
  401. Any -> Why = Any
  402. end,
  403. {stop, normal, finish(State, {error, Why})};
  404. auth(Other, State) ->
  405. on_message(Other, State).
  406. %% BackendKeyData
  407. initializing({?CANCELLATION_KEY, <<Pid:?int32, Key:?int32>>}, State) ->
  408. {noreply, State#state{backend = {Pid, Key}}};
  409. %% ReadyForQuery
  410. initializing({?READY_FOR_QUERY, <<Status:8>>}, State) ->
  411. #state{parameters = Parameters} = State,
  412. erase(username),
  413. erase(password),
  414. %% TODO decode dates to now() format
  415. case lists:keysearch(<<"integer_datetimes">>, 1, Parameters) of
  416. {value, {_, <<"on">>}} -> put(datetime_mod, pgsql_idatetime);
  417. {value, {_, <<"off">>}} -> put(datetime_mod, pgsql_fdatetime)
  418. end,
  419. State2 = finish(State#state{handler = on_message,
  420. txstatus = Status},
  421. connected),
  422. {noreply, State2};
  423. initializing({error, _} = Error, State) ->
  424. {stop, normal, finish(State, Error)};
  425. initializing(Other, State) ->
  426. on_message(Other, State).
  427. %% ParseComplete
  428. on_message({?PARSE_COMPLETE, <<>>}, State) ->
  429. {noreply, State};
  430. %% ParameterDescription
  431. on_message({?PARAMETER_DESCRIPTION, <<_Count:?int16, Bin/binary>>}, State) ->
  432. Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
  433. State2 = notify(State#state{types = Types}, {types, Types}),
  434. {noreply, State2};
  435. %% RowDescription
  436. on_message({?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>}, State) ->
  437. Columns = pgsql_wire:decode_columns(Count, Bin),
  438. Columns2 =
  439. case command_tag(State) of
  440. C when C == describe_portal; C == squery ->
  441. Columns;
  442. C when C == parse; C == describe_statement ->
  443. [Col#column{format = pgsql_wire:format(Col#column.type)}
  444. || Col <- Columns]
  445. end,
  446. State2 = State#state{columns = Columns2},
  447. Message = {columns, Columns2},
  448. State3 = case command_tag(State2) of
  449. squery ->
  450. notify(State2, Message);
  451. T when T == parse; T == describe_statement ->
  452. finish(State2, Message, {ok, make_statement(State2)});
  453. describe_portal ->
  454. finish(State2, Message, {ok, Columns})
  455. end,
  456. {noreply, State3};
  457. %% NoData
  458. on_message({?NO_DATA, <<>>}, State) ->
  459. State2 = case command_tag(State) of
  460. C when C == parse; C == describe_statement ->
  461. finish(State, no_data, {ok, make_statement(State)});
  462. describe_portal ->
  463. finish(State, no_data, {ok, []})
  464. end,
  465. {noreply, State2};
  466. %% BindComplete
  467. on_message({?BIND_COMPLETE, <<>>}, State) ->
  468. State2 = case command_tag(State) of
  469. equery ->
  470. %% TODO send Describe as a part of equery, needs text format support
  471. notify(State, {columns, get_columns(State)});
  472. bind ->
  473. finish(State, ok);
  474. execute_batch ->
  475. Batch =
  476. case State#state.batch of
  477. [] ->
  478. {_, {_, B}} = queue:get(State#state.queue),
  479. B;
  480. B -> B
  481. end,
  482. State#state{batch = Batch}
  483. end,
  484. {noreply, State2};
  485. %% CloseComplete
  486. on_message({?CLOSE_COMPLETE, <<>>}, State) ->
  487. State2 = case command_tag(State) of
  488. equery ->
  489. State;
  490. close ->
  491. finish(State, ok)
  492. end,
  493. {noreply, State2};
  494. %% DataRow
  495. on_message({?DATA_ROW, <<_Count:?int16, Bin/binary>>}, State) ->
  496. Data = pgsql_wire:decode_data(get_columns(State), Bin),
  497. {noreply, add_row(State, Data)};
  498. %% PortalSuspended
  499. on_message({?PORTAL_SUSPENDED, <<>>}, State) ->
  500. State2 = finish(State,
  501. suspended,
  502. {partial, lists:reverse(State#state.rows)}),
  503. {noreply, State2};
  504. %% CommandComplete
  505. on_message({?COMMAND_COMPLETE, Bin}, State) ->
  506. Complete = pgsql_wire:decode_complete(Bin),
  507. Command = command_tag(State),
  508. Notice = {complete, Complete},
  509. Rows = lists:reverse(State#state.rows),
  510. State2 = case {Command, Complete, Rows} of
  511. {execute, {_, Count}, []} ->
  512. finish(State, Notice, {ok, Count});
  513. {execute, {_, Count}, _} ->
  514. finish(State, Notice, {ok, Count, Rows});
  515. {execute, _, _} ->
  516. finish(State, Notice, {ok, Rows});
  517. {execute_batch, {_, Count}, []} ->
  518. add_result(State, Notice, {ok, Count});
  519. {execute_batch, {_, Count}, _} ->
  520. add_result(State, Notice, {ok, Count, Rows});
  521. {execute_batch, _, _} ->
  522. add_result(State, Notice, {ok, Rows});
  523. {C, {_, Count}, []} when C == squery; C == equery ->
  524. add_result(State, Notice, {ok, Count});
  525. {C, {_, Count}, _} when C == squery; C == equery ->
  526. add_result(State, Notice, {ok, Count, get_columns(State), Rows});
  527. {C, _, _} when C == squery; C == equery ->
  528. add_result(State, Notice, {ok, get_columns(State), Rows})
  529. end,
  530. {noreply, State2};
  531. %% EmptyQueryResponse
  532. on_message({$I, _Bin}, State) ->
  533. Notice = {complete, empty},
  534. State2 = case command_tag(State) of
  535. execute ->
  536. finish(State, Notice, {ok, [], []});
  537. C when C == squery; C == equery ->
  538. add_result(State, Notice, {ok, [], []})
  539. end,
  540. {noreply, State2};
  541. %% ReadyForQuery
  542. on_message({$Z, <<Status:8>>}, State) ->
  543. State2 = case command_tag(State) of
  544. squery ->
  545. case State#state.results of
  546. [Result] ->
  547. finish(State, done, Result);
  548. Results ->
  549. finish(State, done, lists:reverse(Results))
  550. end;
  551. execute_batch ->
  552. finish(State, done, lists:reverse(State#state.results));
  553. equery ->
  554. case State#state.results of
  555. [Result] ->
  556. finish(State, done, Result);
  557. [] ->
  558. finish(State, done)
  559. end;
  560. sync ->
  561. finish(State, ok)
  562. end,
  563. {noreply, State2#state{txstatus = Status}};
  564. on_message(Error = {error, _}, State) ->
  565. State2 = case command_tag(State) of
  566. C when C == squery; C == equery; C == execute_batch ->
  567. add_result(State, Error, Error);
  568. _ ->
  569. sync_required(finish(State, Error))
  570. end,
  571. {noreply, State2};
  572. %% NoticeResponse
  573. on_message({$N, Data}, State) ->
  574. State2 = notify_async(State, {notice, pgsql_wire:decode_error(Data)}),
  575. {noreply, State2};
  576. %% ParameterStatus
  577. on_message({$S, Data}, State) ->
  578. [Name, Value] = pgsql_wire:decode_strings(Data),
  579. Parameters2 = lists:keystore(Name, 1, State#state.parameters,
  580. {Name, Value}),
  581. {noreply, State#state{parameters = Parameters2}};
  582. %% NotificationResponse
  583. on_message({$A, <<Pid:?int32, Strings/binary>>}, State) ->
  584. case pgsql_wire:decode_strings(Strings) of
  585. [Channel, Payload] -> ok;
  586. [Channel] -> Payload = <<>>
  587. end,
  588. State2 = notify_async(State, {notification, Channel, Pid, Payload}),
  589. {noreply, State2}.