epgsql_sock.erl 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711
  1. %%% Copyright (C) 2009 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. -module(epgsql_sock).
  4. -behavior(gen_server).
  5. -export([start_link/0,
  6. close/1,
  7. get_parameter/2,
  8. cancel/1]).
  9. -export([handle_call/3, handle_cast/2, handle_info/2]).
  10. -export([init/1, code_change/3, terminate/2]).
  11. %% state callbacks
  12. -export([auth/2, initializing/2, on_message/2]).
  13. -include("epgsql.hrl").
  14. -include("epgsql_binary.hrl").
  15. %% Commands defined as per this page:
  16. %% http://www.postgresql.org/docs/9.2/static/protocol-message-formats.html
  17. %% Commands
  18. -define(BIND, $B).
  19. -define(CLOSE, $C).
  20. -define(DESCRIBE, $D).
  21. -define(EXECUTE, $E).
  22. -define(FLUSH, $H).
  23. -define(PASSWORD, $p).
  24. -define(PARSE, $P).
  25. -define(SIMPLEQUERY, $Q).
  26. -define(AUTHENTICATION_REQUEST, $R).
  27. -define(SYNC, $S).
  28. %% Parameters
  29. -define(PREPARED_STATEMENT, $S).
  30. -define(PORTAL, $P).
  31. %% Responses
  32. -define(PARSE_COMPLETE, $1).
  33. -define(BIND_COMPLETE, $2).
  34. -define(CLOSE_COMPLETE, $3).
  35. -define(NOTIFICATION, $A).
  36. -define(COMMAND_COMPLETE, $C).
  37. -define(DATA_ROW, $D).
  38. -define(EMPTY_QUERY, $I).
  39. -define(CANCELLATION_KEY, $K).
  40. -define(NO_DATA, $n).
  41. -define(NOTICE, $N).
  42. -define(PORTAL_SUSPENDED, $s).
  43. -define(PARAMETER_STATUS, $S).
  44. -define(PARAMETER_DESCRIPTION, $t).
  45. -define(ROW_DESCRIPTION, $T).
  46. -define(READY_FOR_QUERY, $Z).
  47. -record(state, {mod,
  48. sock,
  49. data = <<>>,
  50. backend,
  51. handler,
  52. codec,
  53. queue = queue:new(),
  54. async,
  55. parameters = [],
  56. types = [],
  57. columns = [],
  58. rows = [],
  59. results = [],
  60. batch = [],
  61. sync_required,
  62. txstatus}).
  63. %% -- client interface --
  64. start_link() ->
  65. gen_server:start_link(?MODULE, [], []).
  66. close(C) when is_pid(C) ->
  67. catch gen_server:cast(C, stop),
  68. ok.
  69. get_parameter(C, Name) ->
  70. gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
  71. cancel(S) ->
  72. gen_server:cast(S, cancel).
  73. %% -- gen_server implementation --
  74. init([]) ->
  75. {ok, #state{}}.
  76. handle_call({update_type_cache, TypeInfos}, _From, #state{codec = Codec} = State) ->
  77. Codec2 = epgsql_binary:update_type_cache(TypeInfos, Codec),
  78. {reply, ok, State#state{codec = Codec2}};
  79. handle_call({get_parameter, Name}, _From, State) ->
  80. Value1 = case lists:keysearch(Name, 1, State#state.parameters) of
  81. {value, {Name, Value}} -> Value;
  82. false -> undefined
  83. end,
  84. {reply, {ok, Value1}, State};
  85. handle_call(Command, From, State) ->
  86. #state{queue = Q} = State,
  87. Req = {{call, From}, Command},
  88. command(Command, State#state{queue = queue:in(Req, Q)}).
  89. handle_cast({{Method, From, Ref}, Command} = Req, State)
  90. when ((Method == cast) or (Method == incremental)),
  91. is_pid(From),
  92. is_reference(Ref) ->
  93. #state{queue = Q} = State,
  94. command(Command, State#state{queue = queue:in(Req, Q)});
  95. handle_cast(stop, State) ->
  96. {stop, normal, flush_queue(State, {error, closed})};
  97. handle_cast(cancel, State = #state{backend = {Pid, Key},
  98. sock = TimedOutSock}) ->
  99. {ok, {Addr, Port}} = case State#state.mod of
  100. gen_tcp -> inet:peername(TimedOutSock);
  101. ssl -> ssl:peername(TimedOutSock)
  102. end,
  103. SockOpts = [{active, false}, {packet, raw}, binary],
  104. %% TODO timeout
  105. {ok, Sock} = gen_tcp:connect(Addr, Port, SockOpts),
  106. Msg = <<16:?int32, 80877102:?int32, Pid:?int32, Key:?int32>>,
  107. ok = gen_tcp:send(Sock, Msg),
  108. gen_tcp:close(Sock),
  109. {noreply, State}.
  110. handle_info({Closed, Sock}, #state{sock = Sock} = State)
  111. when Closed == tcp_closed; Closed == ssl_closed ->
  112. {stop, sock_closed, flush_queue(State, {error, sock_closed})};
  113. handle_info({Error, Sock, Reason}, #state{sock = Sock} = State)
  114. when Error == tcp_error; Error == ssl_error ->
  115. Why = {sock_error, Reason},
  116. {stop, Why, flush_queue(State, {error, Why})};
  117. handle_info({inet_reply, _, ok}, State) ->
  118. {noreply, State};
  119. handle_info({inet_reply, _, Status}, State) ->
  120. {stop, Status, flush_queue(State, {error, Status})};
  121. handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
  122. loop(State#state{data = <<Data/binary, Data2/binary>>}).
  123. terminate(_Reason, _State) ->
  124. %% TODO send termination msg, close socket ??
  125. ok.
  126. code_change(_OldVsn, State, _Extra) ->
  127. {ok, State}.
  128. %% -- internal functions --
  129. command(Command, State = #state{sync_required = true})
  130. when Command /= sync ->
  131. {noreply, finish(State, {error, sync_required})};
  132. command({connect, Host, Username, Password, Opts}, State) ->
  133. Timeout = proplists:get_value(timeout, Opts, 5000),
  134. Port = proplists:get_value(port, Opts, 5432),
  135. SockOpts = [{active, false}, {packet, raw}, binary, {nodelay, true}, {keepalive, true}],
  136. case gen_tcp:connect(Host, Port, SockOpts, Timeout) of
  137. {ok, Sock} ->
  138. %% Increase the buffer size. Following the recommendation in the inet man page:
  139. %%
  140. %% It is recommended to have val(buffer) >=
  141. %% max(val(sndbuf),val(recbuf)).
  142. {ok, [{recbuf, RecBufSize}, {sndbuf, SndBufSize}]} =
  143. inet:getopts(Sock, [recbuf, sndbuf]),
  144. inet:setopts(Sock, [{buffer, max(RecBufSize, SndBufSize)}]),
  145. State2 = case proplists:get_value(ssl, Opts) of
  146. T when T == true; T == required ->
  147. start_ssl(Sock, T, Opts, State);
  148. _ ->
  149. State#state{mod = gen_tcp, sock = Sock}
  150. end,
  151. Opts2 = ["user", 0, Username, 0],
  152. Opts3 = case proplists:get_value(database, Opts, undefined) of
  153. undefined -> Opts2;
  154. Database -> [Opts2 | ["database", 0, Database, 0]]
  155. end,
  156. send(State2, [<<196608:?int32>>, Opts3, 0]),
  157. Async = proplists:get_value(async, Opts, undefined),
  158. setopts(State2, [{active, true}]),
  159. put(username, Username),
  160. put(password, Password),
  161. {noreply,
  162. State2#state{handler = auth,
  163. async = Async}};
  164. {error, Reason} = Error ->
  165. {stop, Reason, finish(State, Error)}
  166. end;
  167. command({squery, Sql}, State) ->
  168. send(State, ?SIMPLEQUERY, [Sql, 0]),
  169. {noreply, State};
  170. %% TODO add fast_equery command that doesn't need parsed statement,
  171. %% uses default (text) column format,
  172. %% sends Describe after Bind to get RowDescription
  173. command({equery, Statement, Parameters}, #state{codec = Codec} = State) ->
  174. #statement{name = StatementName, columns = Columns} = Statement,
  175. Bin1 = epgsql_wire:encode_parameters(Parameters, Codec),
  176. Bin2 = epgsql_wire:encode_formats(Columns),
  177. send(State, ?BIND, ["", 0, StatementName, 0, Bin1, Bin2]),
  178. send(State, ?EXECUTE, ["", 0, <<0:?int32>>]),
  179. send(State, ?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]),
  180. send(State, ?SYNC, []),
  181. {noreply, State};
  182. command({parse, Name, Sql, Types}, State) ->
  183. Bin = epgsql_wire:encode_types(Types, State#state.codec),
  184. send(State, ?PARSE, [Name, 0, Sql, 0, Bin]),
  185. send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
  186. send(State, ?FLUSH, []),
  187. {noreply, State};
  188. command({bind, Statement, PortalName, Parameters}, #state{codec = Codec} = State) ->
  189. #statement{name = StatementName, columns = Columns, types = Types} = Statement,
  190. Typed_Parameters = lists:zip(Types, Parameters),
  191. Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
  192. Bin2 = epgsql_wire:encode_formats(Columns),
  193. send(State, ?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
  194. send(State, ?FLUSH, []),
  195. {noreply, State};
  196. command({execute, _Statement, PortalName, MaxRows}, State) ->
  197. send(State, ?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]),
  198. send(State, ?FLUSH, []),
  199. {noreply, State};
  200. command({execute_batch, Batch}, State) ->
  201. #state{mod = Mod, sock = Sock, codec = Codec} = State,
  202. BindExecute =
  203. lists:map(
  204. fun({Statement, Parameters}) ->
  205. #statement{name = StatementName,
  206. columns = Columns,
  207. types = Types} = Statement,
  208. Typed_Parameters = lists:zip(Types, Parameters),
  209. Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
  210. Bin2 = epgsql_wire:encode_formats(Columns),
  211. [epgsql_wire:encode(?BIND, [0, StatementName, 0,
  212. Bin1, Bin2]),
  213. epgsql_wire:encode(?EXECUTE, [0, <<0:?int32>>])]
  214. end,
  215. Batch),
  216. Sync = epgsql_wire:encode(?SYNC, []),
  217. do_send(Mod, Sock, [BindExecute, Sync]),
  218. {noreply, State};
  219. command({describe_statement, Name}, State) ->
  220. send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
  221. send(State, ?FLUSH, []),
  222. {noreply, State};
  223. command({describe_portal, Name}, State) ->
  224. send(State, ?DESCRIBE, [?PORTAL, Name, 0]),
  225. send(State, ?FLUSH, []),
  226. {noreply, State};
  227. command({close, Type, Name}, State) ->
  228. Type2 = case Type of
  229. statement -> ?PREPARED_STATEMENT;
  230. portal -> ?PORTAL
  231. end,
  232. send(State, ?CLOSE, [Type2, Name, 0]),
  233. send(State, ?FLUSH, []),
  234. {noreply, State};
  235. command(sync, State) ->
  236. send(State, ?SYNC, []),
  237. {noreply, State#state{sync_required = false}}.
  238. start_ssl(S, Flag, Opts, State) ->
  239. ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
  240. Timeout = proplists:get_value(timeout, Opts, 5000),
  241. {ok, <<Code>>} = gen_tcp:recv(S, 1, Timeout),
  242. case Code of
  243. $S ->
  244. case ssl:connect(S, Opts, Timeout) of
  245. {ok, S2} -> State#state{mod = ssl, sock = S2};
  246. {error, Reason} -> exit({ssl_negotiation_failed, Reason})
  247. end;
  248. $N ->
  249. case Flag of
  250. true -> State;
  251. required -> exit(ssl_not_available)
  252. end
  253. end.
  254. setopts(#state{mod = Mod, sock = Sock}, Opts) ->
  255. case Mod of
  256. gen_tcp -> inet:setopts(Sock, Opts);
  257. ssl -> ssl:setopts(Sock, Opts)
  258. end.
  259. send(#state{mod = Mod, sock = Sock}, Data) ->
  260. do_send(Mod, Sock, epgsql_wire:encode(Data)).
  261. send(#state{mod = Mod, sock = Sock}, Type, Data) ->
  262. do_send(Mod, Sock, epgsql_wire:encode(Type, Data)).
  263. do_send(gen_tcp, Sock, Bin) ->
  264. try erlang:port_command(Sock, Bin) of
  265. true ->
  266. ok
  267. catch
  268. error:_Error ->
  269. {error,einval}
  270. end;
  271. do_send(Mod, Sock, Bin) ->
  272. Mod:send(Sock, Bin).
  273. loop(#state{data = Data, handler = Handler} = State) ->
  274. case epgsql_wire:decode_message(Data) of
  275. {Message, Tail} ->
  276. case ?MODULE:Handler(Message, State#state{data = Tail}) of
  277. {noreply, State2} ->
  278. loop(State2);
  279. R = {stop, _Reason2, _State2} ->
  280. R
  281. end;
  282. _ ->
  283. {noreply, State}
  284. end.
  285. finish(State, Result) ->
  286. finish(State, Result, Result).
  287. finish(State = #state{queue = Q}, Notice, Result) ->
  288. case queue:get(Q) of
  289. {{cast, From, Ref}, _} ->
  290. From ! {self(), Ref, Result};
  291. {{incremental, From, Ref}, _} ->
  292. From ! {self(), Ref, Notice};
  293. {{call, From}, _} ->
  294. gen_server:reply(From, Result)
  295. end,
  296. State#state{queue = queue:drop(Q),
  297. types = [],
  298. columns = [],
  299. rows = [],
  300. results = [],
  301. batch = []}.
  302. add_result(State, Notice, Result) ->
  303. #state{queue = Q, results = Results, batch = Batch} = State,
  304. Results2 = case queue:get(Q) of
  305. {{incremental, From, Ref}, _} ->
  306. From ! {self(), Ref, Notice},
  307. Results;
  308. _ ->
  309. [Result | Results]
  310. end,
  311. Batch2 = case Batch of
  312. [] -> [];
  313. _ -> tl(Batch)
  314. end,
  315. State#state{types = [],
  316. columns = [],
  317. rows = [],
  318. results = Results2,
  319. batch = Batch2}.
  320. add_row(State = #state{queue = Q, rows = Rows}, Data) ->
  321. Rows2 = case queue:get(Q) of
  322. {{incremental, From, Ref}, _} ->
  323. From ! {self(), Ref, {data, Data}},
  324. Rows;
  325. _ ->
  326. [Data | Rows]
  327. end,
  328. State#state{rows = Rows2}.
  329. notify(State = #state{queue = Q}, Notice) ->
  330. case queue:get(Q) of
  331. {{incremental, From, Ref}, _} ->
  332. From ! {self(), Ref, Notice};
  333. _ ->
  334. ignore
  335. end,
  336. State.
  337. notify_async(State = #state{async = Pid}, Msg) ->
  338. case is_pid(Pid) of
  339. true -> Pid ! {epgsql, self(), Msg};
  340. false -> false
  341. end,
  342. State.
  343. command_tag(#state{queue = Q}) ->
  344. {_, Req} = queue:get(Q),
  345. if is_tuple(Req) ->
  346. element(1, Req);
  347. is_atom(Req) ->
  348. Req
  349. end.
  350. get_columns(State) ->
  351. #state{queue = Q, columns = Columns, batch = Batch} = State,
  352. case queue:get(Q) of
  353. {_, {equery, #statement{columns = C}, _}} ->
  354. C;
  355. {_, {execute, #statement{columns = C}, _, _}} ->
  356. C;
  357. {_, {squery, _}} ->
  358. Columns;
  359. {_, {execute_batch, _}} ->
  360. [{#statement{columns = C}, _} | _] = Batch,
  361. C
  362. end.
  363. make_statement(State) ->
  364. #state{queue = Q, types = Types, columns = Columns} = State,
  365. Name = case queue:get(Q) of
  366. {_, {parse, N, _, _}} -> N;
  367. {_, {describe_statement, N}} -> N
  368. end,
  369. #statement{name = Name, types = Types, columns = Columns}.
  370. sync_required(#state{queue = Q} = State) ->
  371. case queue:is_empty(Q) of
  372. false ->
  373. case command_tag(State) of
  374. sync ->
  375. State;
  376. _ ->
  377. sync_required(finish(State, {error, sync_required}))
  378. end;
  379. true ->
  380. State#state{sync_required = true}
  381. end.
  382. flush_queue(#state{queue = Q} = State, Error) ->
  383. case queue:is_empty(Q) of
  384. false ->
  385. flush_queue(finish(State, Error), Error);
  386. true -> State
  387. end.
  388. to_binary(B) when is_binary(B) -> B;
  389. to_binary(L) when is_list(L) -> list_to_binary(L).
  390. hex(Bin) ->
  391. HChar = fun(N) when N < 10 -> $0 + N;
  392. (N) when N < 16 -> $W + N
  393. end,
  394. <<<<(HChar(H)), (HChar(L))>> || <<H:4, L:4>> <= Bin>>.
  395. %% -- backend message handling --
  396. %% AuthenticationOk
  397. auth({?AUTHENTICATION_REQUEST, <<0:?int32>>}, State) ->
  398. {noreply, State#state{handler = initializing}};
  399. %% AuthenticationCleartextPassword
  400. auth({?AUTHENTICATION_REQUEST, <<3:?int32>>}, State) ->
  401. send(State, ?PASSWORD, [get(password), 0]),
  402. {noreply, State};
  403. %% AuthenticationMD5Password
  404. auth({?AUTHENTICATION_REQUEST, <<5:?int32, Salt:4/binary>>}, State) ->
  405. Digest1 = hex(erlang:md5([get(password), get(username)])),
  406. Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
  407. send(State, ?PASSWORD, Str),
  408. {noreply, State};
  409. auth({?AUTHENTICATION_REQUEST, <<M:?int32, _/binary>>}, State) ->
  410. Method = case M of
  411. 2 -> kerberosV5;
  412. 4 -> crypt;
  413. 6 -> scm;
  414. 7 -> gss;
  415. 8 -> sspi;
  416. _ -> unknown
  417. end,
  418. State2 = finish(State, {error, {unsupported_auth_method, Method}}),
  419. {stop, normal, State2};
  420. %% ErrorResponse
  421. auth({error, E}, State) ->
  422. Why = case E#error.code of
  423. <<"28000">> -> invalid_authorization_specification;
  424. <<"28P01">> -> invalid_password;
  425. Any -> Any
  426. end,
  427. {stop, normal, finish(State, {error, Why})};
  428. auth(Other, State) ->
  429. on_message(Other, State).
  430. %% BackendKeyData
  431. initializing({?CANCELLATION_KEY, <<Pid:?int32, Key:?int32>>}, State) ->
  432. {noreply, State#state{backend = {Pid, Key}}};
  433. %% ReadyForQuery
  434. initializing({?READY_FOR_QUERY, <<Status:8>>}, State) ->
  435. #state{parameters = Parameters} = State,
  436. erase(username),
  437. erase(password),
  438. %% TODO decode dates to now() format
  439. case lists:keysearch(<<"integer_datetimes">>, 1, Parameters) of
  440. {value, {_, <<"on">>}} -> put(datetime_mod, epgsql_idatetime);
  441. {value, {_, <<"off">>}} -> put(datetime_mod, epgsql_fdatetime)
  442. end,
  443. State2 = finish(State#state{handler = on_message,
  444. txstatus = Status,
  445. codec = epgsql_binary:new_codec([])},
  446. connected),
  447. {noreply, State2};
  448. initializing({error, _} = Error, State) ->
  449. {stop, normal, finish(State, Error)};
  450. initializing(Other, State) ->
  451. on_message(Other, State).
  452. %% ParseComplete
  453. on_message({?PARSE_COMPLETE, <<>>}, State) ->
  454. {noreply, State};
  455. %% ParameterDescription
  456. on_message({?PARAMETER_DESCRIPTION, <<_Count:?int16, Bin/binary>>}, State) ->
  457. Types = [epgsql_binary:oid2type(Oid, State#state.codec) || <<Oid:?int32>> <= Bin],
  458. State2 = notify(State#state{types = Types}, {types, Types}),
  459. {noreply, State2};
  460. %% RowDescription
  461. on_message({?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>}, State) ->
  462. Columns = epgsql_wire:decode_columns(Count, Bin, State#state.codec),
  463. Columns2 =
  464. case command_tag(State) of
  465. C when C == describe_portal; C == squery ->
  466. Columns;
  467. C when C == parse; C == describe_statement ->
  468. [Col#column{format = epgsql_wire:format(Col#column.type)}
  469. || Col <- Columns]
  470. end,
  471. State2 = State#state{columns = Columns2},
  472. Message = {columns, Columns2},
  473. State3 = case command_tag(State2) of
  474. squery ->
  475. notify(State2, Message);
  476. T when T == parse; T == describe_statement ->
  477. finish(State2, Message, {ok, make_statement(State2)});
  478. describe_portal ->
  479. finish(State2, Message, {ok, Columns})
  480. end,
  481. {noreply, State3};
  482. %% NoData
  483. on_message({?NO_DATA, <<>>}, State) ->
  484. State2 = case command_tag(State) of
  485. C when C == parse; C == describe_statement ->
  486. finish(State, no_data, {ok, make_statement(State)});
  487. describe_portal ->
  488. finish(State, no_data, {ok, []})
  489. end,
  490. {noreply, State2};
  491. %% BindComplete
  492. on_message({?BIND_COMPLETE, <<>>}, State) ->
  493. State2 = case command_tag(State) of
  494. equery ->
  495. %% TODO send Describe as a part of equery, needs text format support
  496. notify(State, {columns, get_columns(State)});
  497. bind ->
  498. finish(State, ok);
  499. execute_batch ->
  500. Batch =
  501. case State#state.batch of
  502. [] ->
  503. {_, {_, B}} = queue:get(State#state.queue),
  504. B;
  505. B -> B
  506. end,
  507. State#state{batch = Batch}
  508. end,
  509. {noreply, State2};
  510. %% CloseComplete
  511. on_message({?CLOSE_COMPLETE, <<>>}, State) ->
  512. State2 = case command_tag(State) of
  513. equery ->
  514. State;
  515. close ->
  516. finish(State, ok)
  517. end,
  518. {noreply, State2};
  519. %% DataRow
  520. on_message({?DATA_ROW, <<_Count:?int16, Bin/binary>>}, State) ->
  521. Data = epgsql_wire:decode_data(get_columns(State), Bin, State#state.codec),
  522. {noreply, add_row(State, Data)};
  523. %% PortalSuspended
  524. on_message({?PORTAL_SUSPENDED, <<>>}, State) ->
  525. State2 = finish(State,
  526. suspended,
  527. {partial, lists:reverse(State#state.rows)}),
  528. {noreply, State2};
  529. %% CommandComplete
  530. on_message({?COMMAND_COMPLETE, Bin}, State) ->
  531. Complete = epgsql_wire:decode_complete(Bin),
  532. Command = command_tag(State),
  533. Notice = {complete, Complete},
  534. Rows = lists:reverse(State#state.rows),
  535. State2 = case {Command, Complete, Rows} of
  536. {execute, {_, Count}, []} ->
  537. finish(State, Notice, {ok, Count});
  538. {execute, {_, Count}, _} ->
  539. finish(State, Notice, {ok, Count, Rows});
  540. {execute, _, _} ->
  541. finish(State, Notice, {ok, Rows});
  542. {execute_batch, {_, Count}, []} ->
  543. add_result(State, Notice, {ok, Count});
  544. {execute_batch, {_, Count}, _} ->
  545. add_result(State, Notice, {ok, Count, Rows});
  546. {execute_batch, _, _} ->
  547. add_result(State, Notice, {ok, Rows});
  548. {C, {_, Count}, []} when C == squery; C == equery ->
  549. add_result(State, Notice, {ok, Count});
  550. {C, {_, Count}, _} when C == squery; C == equery ->
  551. add_result(State, Notice, {ok, Count, get_columns(State), Rows});
  552. {C, _, _} when C == squery; C == equery ->
  553. add_result(State, Notice, {ok, get_columns(State), Rows})
  554. end,
  555. {noreply, State2};
  556. %% EmptyQueryResponse
  557. on_message({?EMPTY_QUERY, _Bin}, State) ->
  558. Notice = {complete, empty},
  559. State2 = case command_tag(State) of
  560. execute ->
  561. finish(State, Notice, {ok, [], []});
  562. C when C == squery; C == equery ->
  563. add_result(State, Notice, {ok, [], []})
  564. end,
  565. {noreply, State2};
  566. %% ReadyForQuery
  567. on_message({?READY_FOR_QUERY, <<Status:8>>}, State) ->
  568. State2 = case command_tag(State) of
  569. squery ->
  570. case State#state.results of
  571. [Result] ->
  572. finish(State, done, Result);
  573. Results ->
  574. finish(State, done, lists:reverse(Results))
  575. end;
  576. execute_batch ->
  577. finish(State, done, lists:reverse(State#state.results));
  578. equery ->
  579. case State#state.results of
  580. [Result] ->
  581. finish(State, done, Result);
  582. [] ->
  583. finish(State, done)
  584. end;
  585. sync ->
  586. finish(State, ok)
  587. end,
  588. {noreply, State2#state{txstatus = Status}};
  589. on_message(Error = {error, _}, State) ->
  590. State2 = case command_tag(State) of
  591. C when C == squery; C == equery; C == execute_batch ->
  592. add_result(State, Error, Error);
  593. _ ->
  594. sync_required(finish(State, Error))
  595. end,
  596. {noreply, State2};
  597. %% NoticeResponse
  598. on_message({?NOTICE, Data}, State) ->
  599. State2 = notify_async(State, {notice, epgsql_wire:decode_error(Data)}),
  600. {noreply, State2};
  601. %% ParameterStatus
  602. on_message({?PARAMETER_STATUS, Data}, State) ->
  603. [Name, Value] = epgsql_wire:decode_strings(Data),
  604. Parameters2 = lists:keystore(Name, 1, State#state.parameters,
  605. {Name, Value}),
  606. {noreply, State#state{parameters = Parameters2}};
  607. %% NotificationResponse
  608. on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
  609. {Channel1, Payload1} = case epgsql_wire:decode_strings(Strings) of
  610. [Channel, Payload] -> {Channel, Payload};
  611. [Channel] -> {Channel, <<>>}
  612. end,
  613. State2 = notify_async(State, {notification, Channel1, Pid, Payload1}),
  614. {noreply, State2}.