pgsql_connection.erl 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. -module(pgsql_connection).
  3. -behavior(gen_fsm).
  4. -export([start_link/0, stop/1, connect/5, get_parameter/2]).
  5. -export([squery/2, equery/3]).
  6. -export([parse/4, bind/4, execute/4, describe/3]).
  7. -export([close/3, sync/1]).
  8. -export([init/1, handle_event/3, handle_sync_event/4]).
  9. -export([handle_info/3, terminate/3, code_change/4]).
  10. -export([startup/3, auth/2, initializing/2, ready/2, ready/3]).
  11. -export([querying/2, parsing/2, binding/2, describing/2]).
  12. -export([executing/2, closing/2, synchronizing/2, timeout/2]).
  13. -export([aborted/3]).
  14. -include("pgsql.hrl").
  15. -include("pgsql_binary.hrl").
  16. -record(state, {
  17. reader,
  18. sock,
  19. timeout,
  20. parameters = [],
  21. reply,
  22. reply_to,
  23. async,
  24. backend,
  25. statement,
  26. txstatus}).
  27. %% -- client interface --
  28. start_link() ->
  29. gen_fsm:start_link(?MODULE, [], []).
  30. stop(C) ->
  31. gen_fsm:send_all_state_event(C, stop).
  32. connect(C, Host, Username, Password, Opts) ->
  33. gen_fsm:sync_send_event(C, {connect, Host, Username, Password, Opts}, infinity).
  34. get_parameter(C, Name) ->
  35. gen_fsm:sync_send_event(C, {get_parameter, to_binary(Name)}).
  36. squery(C, Sql) ->
  37. gen_fsm:sync_send_event(C, {squery, Sql}, infinity).
  38. equery(C, Statement, Parameters) ->
  39. gen_fsm:sync_send_event(C, {equery, Statement, Parameters}, infinity).
  40. parse(C, Name, Sql, Types) ->
  41. gen_fsm:sync_send_event(C, {parse, Name, Sql, Types}, infinity).
  42. bind(C, Statement, PortalName, Parameters) ->
  43. gen_fsm:sync_send_event(C, {bind, Statement, PortalName, Parameters}, infinity).
  44. execute(C, Statement, PortalName, MaxRows) ->
  45. gen_fsm:sync_send_event(C, {execute, Statement, PortalName, MaxRows}, infinity).
  46. describe(C, Type, Name) ->
  47. gen_fsm:sync_send_event(C, {describe, Type, Name}, infinity).
  48. close(C, Type, Name) ->
  49. gen_fsm:sync_send_event(C, {close, Type, Name}, infinity).
  50. sync(C) ->
  51. gen_fsm:sync_send_event(C, sync, infinity).
  52. %% -- gen_fsm implementation --
  53. init([]) ->
  54. process_flag(trap_exit, true),
  55. {ok, startup, #state{}}.
  56. handle_event({notice, _Notice} = Msg, State_Name, State) ->
  57. notify_async(State, Msg),
  58. {next_state, State_Name, State};
  59. handle_event({notification, _Channel, _Pid, _Payload} = Msg, State_Name, State) ->
  60. notify_async(State, Msg),
  61. {next_state, State_Name, State};
  62. handle_event({parameter_status, Name, Value}, State_Name, State) ->
  63. Parameters2 = lists:keystore(Name, 1, State#state.parameters, {Name, Value}),
  64. {next_state, State_Name, State#state{parameters = Parameters2}};
  65. handle_event(stop, _State_Name, State) ->
  66. {stop, normal, State};
  67. handle_event(Event, _State_Name, State) ->
  68. {stop, {unsupported_event, Event}, State}.
  69. handle_sync_event(Event, _From, _State_Name, State) ->
  70. {stop, {unsupported_sync_event, Event}, State}.
  71. handle_info({'EXIT', Pid, Reason}, _State_Name, State = #state{sock = Pid}) ->
  72. {stop, Reason, State};
  73. handle_info(Info, _State_Name, State) ->
  74. {stop, {unsupported_info, Info}, State}.
  75. terminate(_Reason, _State_Name, State = #state{sock = Sock})
  76. when Sock =/= undefined ->
  77. send(State, $X, []);
  78. terminate(_Reason, _State_Name, _State) ->
  79. ok.
  80. code_change(_Old_Vsn, State_Name, State, _Extra) ->
  81. {ok, State_Name, State}.
  82. %% -- states --
  83. startup({connect, Host, Username, Password, Opts}, From, State) ->
  84. Timeout = proplists:get_value(timeout, Opts, 5000),
  85. Async = proplists:get_value(async, Opts, undefined),
  86. case pgsql_sock:start_link(self(), Host, Username, Opts) of
  87. {ok, Sock} ->
  88. put(username, Username),
  89. put(password, Password),
  90. State2 = State#state{
  91. sock = Sock,
  92. timeout = Timeout,
  93. reply_to = From,
  94. async = Async},
  95. {next_state, auth, State2, Timeout};
  96. Error ->
  97. {stop, normal, Error, State}
  98. end.
  99. %% AuthenticationOk
  100. auth({$R, <<0:?int32>>}, State) ->
  101. #state{timeout = Timeout} = State,
  102. {next_state, initializing, State, Timeout};
  103. %% AuthenticationCleartextPassword
  104. auth({$R, <<3:?int32>>}, State) ->
  105. #state{timeout = Timeout} = State,
  106. send(State, $p, [get(password), 0]),
  107. {next_state, auth, State, Timeout};
  108. %% AuthenticationMD5Password
  109. auth({$R, <<5:?int32, Salt:4/binary>>}, State) ->
  110. #state{timeout = Timeout} = State,
  111. Digest1 = hex(erlang:md5([get(password), get(username)])),
  112. Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
  113. send(State, $p, Str),
  114. {next_state, auth, State, Timeout};
  115. auth({$R, <<M:?int32, _/binary>>}, State) ->
  116. case M of
  117. 2 -> Method = kerberosV5;
  118. 4 -> Method = crypt;
  119. 6 -> Method = scm;
  120. 7 -> Method = gss;
  121. 8 -> Method = sspi;
  122. _ -> Method = unknown
  123. end,
  124. Error = {error, {unsupported_auth_method, Method}},
  125. gen_fsm:reply(State#state.reply_to, Error),
  126. {stop, normal, State};
  127. %% ErrorResponse
  128. auth({error, E}, State) ->
  129. case E#error.code of
  130. <<"28000">> -> Why = invalid_authorization_specification;
  131. <<"28P01">> -> Why = invalid_password;
  132. Any -> Why = Any
  133. end,
  134. gen_fsm:reply(State#state.reply_to, {error, Why}),
  135. {stop, normal, State};
  136. auth(timeout, State) ->
  137. gen_fsm:reply(State#state.reply_to, {error, timeout}),
  138. {stop, normal, State}.
  139. %% BackendKeyData
  140. initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
  141. #state{timeout = Timeout} = State,
  142. State2 = State#state{backend = {Pid, Key}},
  143. {next_state, initializing, State2, Timeout};
  144. %% ErrorResponse
  145. initializing({error, E}, State) ->
  146. case E#error.code of
  147. <<"28000">> -> Why = invalid_authorization_specification;
  148. Any -> Why = Any
  149. end,
  150. gen_fsm:reply(State#state.reply_to, {error, Why}),
  151. {stop, normal, State};
  152. initializing(timeout, State) ->
  153. gen_fsm:reply(State#state.reply_to, {error, timeout}),
  154. {stop, normal, State};
  155. %% ReadyForQuery
  156. initializing({$Z, <<Status:8>>}, State) ->
  157. #state{parameters = Parameters, reply_to = Reply_To} = State,
  158. erase(username),
  159. erase(password),
  160. case lists:keysearch(<<"integer_datetimes">>, 1, Parameters) of
  161. {value, {_, <<"on">>}} -> put(datetime_mod, pgsql_idatetime);
  162. {value, {_, <<"off">>}} -> put(datetime_mod, pgsql_fdatetime)
  163. end,
  164. gen_fsm:reply(Reply_To, {ok, self()}),
  165. {next_state, ready, State#state{txstatus = Status}}.
  166. ready(_Msg, State) ->
  167. {next_state, ready, State}.
  168. %% execute simple query
  169. ready({squery, Sql}, From, State) ->
  170. #state{timeout = Timeout} = State,
  171. send(State, $Q, [Sql, 0]),
  172. State2 = State#state{statement = #statement{}, reply_to = From},
  173. {reply, ok, querying, State2, Timeout};
  174. %% execute extended query
  175. ready({equery, Statement, Parameters}, From, State) ->
  176. #state{timeout = Timeout} = State,
  177. #statement{name = StatementName, columns = Columns} = Statement,
  178. Bin1 = encode_parameters(Parameters),
  179. Bin2 = encode_formats(Columns),
  180. send(State, $B, ["", 0, StatementName, 0, Bin1, Bin2]),
  181. send(State, $E, ["", 0, <<0:?int32>>]),
  182. send(State, $C, [$S, "", 0]),
  183. send(State, $S, []),
  184. State2 = State#state{statement = Statement, reply_to = From},
  185. {reply, ok, querying, State2, Timeout};
  186. ready({get_parameter, Name}, _From, State) ->
  187. case lists:keysearch(Name, 1, State#state.parameters) of
  188. {value, {Name, Value}} -> Value;
  189. false -> Value = undefined
  190. end,
  191. {reply, {ok, Value}, ready, State};
  192. ready({parse, Name, Sql, Types}, From, State) ->
  193. #state{timeout = Timeout} = State,
  194. Bin = encode_types(Types),
  195. send(State, $P, [Name, 0, Sql, 0, Bin]),
  196. send(State, $D, [$S, Name, 0]),
  197. send(State, $H, []),
  198. S = #statement{name = Name},
  199. State2 = State#state{statement = S, reply_to = From},
  200. {next_state, parsing, State2, Timeout};
  201. ready({bind, Statement, PortalName, Parameters}, From, State) ->
  202. #state{timeout = Timeout} = State,
  203. #statement{name = StatementName, columns = Columns, types = Types} = Statement,
  204. Typed_Parameters = lists:zip(Types, Parameters),
  205. Bin1 = encode_parameters(Typed_Parameters),
  206. Bin2 = encode_formats(Columns),
  207. send(State, $B, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
  208. send(State, $H, []),
  209. State2 = State#state{statement = Statement, reply_to = From},
  210. {next_state, binding, State2, Timeout};
  211. ready({execute, Statement, PortalName, MaxRows}, From, State) ->
  212. #state{timeout = Timeout} = State,
  213. send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
  214. send(State, $H, []),
  215. State2 = State#state{statement = Statement, reply_to = From},
  216. {reply, ok, executing, State2, Timeout};
  217. ready({describe, Type, Name}, From, State) ->
  218. #state{timeout = Timeout} = State,
  219. case Type of
  220. statement -> Type2 = $S;
  221. portal -> Type2 = $P
  222. end,
  223. send(State, $D, [Type2, Name, 0]),
  224. send(State, $H, []),
  225. {next_state, describing, State#state{reply_to = From}, Timeout};
  226. ready({close, Type, Name}, From, State) ->
  227. #state{timeout = Timeout} = State,
  228. case Type of
  229. statement -> Type2 = $S;
  230. portal -> Type2 = $P
  231. end,
  232. send(State, $C, [Type2, Name, 0]),
  233. send(State, $H, []),
  234. {next_state, closing, State#state{reply_to = From}, Timeout};
  235. ready(sync, From, State) ->
  236. #state{timeout = Timeout} = State,
  237. send(State, $S, []),
  238. State2 = State#state{reply = ok, reply_to = From},
  239. {next_state, synchronizing, State2, Timeout}.
  240. %% BindComplete
  241. querying({$2, <<>>}, State) ->
  242. #state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
  243. notify(State, {columns, Columns}),
  244. {next_state, querying, State, Timeout};
  245. %% CloseComplete
  246. querying({$3, <<>>}, State) ->
  247. #state{timeout = Timeout} = State,
  248. {next_state, querying, State, Timeout};
  249. %% RowDescription
  250. querying({$T, <<Count:?int16, Bin/binary>>}, State) ->
  251. #state{timeout = Timeout} = State,
  252. Columns = decode_columns(Count, Bin),
  253. S2 = (State#state.statement)#statement{columns = Columns},
  254. notify(State, {columns, Columns}),
  255. {next_state, querying, State#state{statement = S2}, Timeout};
  256. %% DataRow
  257. querying({$D, <<_Count:?int16, Bin/binary>>}, State) ->
  258. #state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
  259. Data = decode_data(Columns, Bin),
  260. notify(State, {data, Data}),
  261. {next_state, querying, State, Timeout};
  262. %% CommandComplete
  263. querying({$C, Bin}, State) ->
  264. #state{timeout = Timeout} = State,
  265. Complete = decode_complete(Bin),
  266. notify(State, {complete, Complete}),
  267. {next_state, querying, State, Timeout};
  268. %% EmptyQueryResponse
  269. querying({$I, _Bin}, State) ->
  270. #state{timeout = Timeout} = State,
  271. notify(State, {complete, empty}),
  272. {next_state, querying, State, Timeout};
  273. querying(timeout, State) ->
  274. #state{sock = Sock, timeout = Timeout, backend = {Pid, Key}} = State,
  275. pgsql_sock:cancel(Sock, Pid, Key),
  276. {next_state, timeout, State, Timeout};
  277. %% ErrorResponse
  278. querying({error, E}, State) ->
  279. #state{timeout = Timeout} = State,
  280. notify(State, {error, E}),
  281. {next_state, querying, State, Timeout};
  282. %% ReadyForQuery
  283. querying({$Z, <<_Status:8>>}, State) ->
  284. notify(State, done),
  285. {next_state, ready, State#state{reply_to = undefined}}.
  286. %% ParseComplete
  287. parsing({$1, <<>>}, State) ->
  288. #state{timeout = Timeout} = State,
  289. {next_state, describing, State, Timeout};
  290. parsing(timeout, State) ->
  291. #state{timeout = Timeout} = State,
  292. Reply = {error, timeout},
  293. send(State, $S, []),
  294. {next_state, parsing, State#state{reply = Reply}, Timeout};
  295. %% ErrorResponse
  296. parsing({error, E}, State) ->
  297. #state{timeout = Timeout} = State,
  298. Reply = {error, E},
  299. send(State, $S, []),
  300. {next_state, parsing, State#state{reply = Reply}, Timeout};
  301. %% ReadyForQuery
  302. parsing({$Z, <<Status:8>>}, State) ->
  303. #state{reply = Reply, reply_to = Reply_To} = State,
  304. gen_fsm:reply(Reply_To, Reply),
  305. {next_state, ready, State#state{reply = undefined, txstatus = Status}}.
  306. %% BindComplete
  307. binding({$2, <<>>}, State) ->
  308. gen_fsm:reply(State#state.reply_to, ok),
  309. {next_state, ready, State};
  310. binding(timeout, State) ->
  311. #state{timeout = Timeout} = State,
  312. Reply = {error, timeout},
  313. send(State, $S, []),
  314. {next_state, binding, State#state{reply = Reply}, Timeout};
  315. %% ErrorResponse
  316. binding({error, E}, State) ->
  317. #state{timeout = Timeout} = State,
  318. Reply = {error, E},
  319. send(State, $S, []),
  320. {next_state, binding, State#state{reply = Reply}, Timeout};
  321. %% ReadyForQuery
  322. binding({$Z, <<Status:8>>}, State) ->
  323. #state{reply = Reply, reply_to = Reply_To} = State,
  324. gen_fsm:reply(Reply_To, Reply),
  325. {next_state, ready, State#state{reply = undefined, txstatus = Status}}.
  326. %% ParameterDescription
  327. describing({$t, <<_Count:?int16, Bin/binary>>}, State) ->
  328. #state{timeout = Timeout} = State,
  329. Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
  330. S2 = (State#state.statement)#statement{types = Types},
  331. {next_state, describing, State#state{statement = S2}, Timeout};
  332. %% RowDescription
  333. describing({$T, <<Count:?int16, Bin/binary>>}, State) ->
  334. Columns = decode_columns(Count, Bin),
  335. Columns2 = [C#column{format = format(C#column.type)} || C <- Columns],
  336. S2 = (State#state.statement)#statement{columns = Columns2},
  337. gen_fsm:reply(State#state.reply_to, {ok, S2}),
  338. {next_state, ready, State};
  339. %% NoData
  340. describing({$n, <<>>}, State) ->
  341. S2 = (State#state.statement)#statement{columns = []},
  342. gen_fsm:reply(State#state.reply_to, {ok, S2}),
  343. {next_state, ready, State};
  344. describing(timeout, State) ->
  345. #state{timeout = Timeout} = State,
  346. Reply = {error, timeout},
  347. send(State, $S, []),
  348. {next_state, describing, State#state{reply = Reply}, Timeout};
  349. %% ErrorResponse
  350. describing({error, E}, State) ->
  351. #state{timeout = Timeout} = State,
  352. Reply = {error, E},
  353. send(State, $S, []),
  354. {next_state, describing, State#state{reply = Reply}, Timeout};
  355. %% ReadyForQuery
  356. describing({$Z, <<Status:8>>}, State) ->
  357. #state{reply = Reply, reply_to = Reply_To} = State,
  358. gen_fsm:reply(Reply_To, Reply),
  359. {next_state, ready, State#state{reply = undefined, txstatus = Status}}.
  360. %% DataRow
  361. executing({$D, <<_Count:?int16, Bin/binary>>}, State) ->
  362. #state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
  363. Data = decode_data(Columns, Bin),
  364. notify(State, {data, Data}),
  365. {next_state, executing, State, Timeout};
  366. %% PortalSuspended
  367. executing({$s, <<>>}, State) ->
  368. notify(State, suspended),
  369. {next_state, ready, State};
  370. %% CommandComplete
  371. executing({$C, Bin}, State) ->
  372. notify(State, {complete, decode_complete(Bin)}),
  373. {next_state, ready, State};
  374. %% EmptyQueryResponse
  375. executing({$I, _Bin}, State) ->
  376. notify(State, {complete, empty}),
  377. {next_state, ready, State};
  378. executing(timeout, State) ->
  379. #state{sock = Sock, timeout = Timeout, backend = {Pid, Key}} = State,
  380. pgsql_sock:cancel(Sock, Pid, Key),
  381. send(State, $S, []),
  382. {next_state, timeout, State, Timeout};
  383. %% ErrorResponse
  384. executing({error, E}, State) ->
  385. #state{timeout = Timeout} = State,
  386. notify(State, {error, E}),
  387. {next_state, aborted, State, Timeout}.
  388. %% CloseComplete
  389. closing({$3, <<>>}, State) ->
  390. gen_fsm:reply(State#state.reply_to, ok),
  391. {next_state, ready, State};
  392. closing(timeout, State) ->
  393. gen_fsm:reply(State#state.reply_to, {error, timeout}),
  394. {next_state, ready, State};
  395. %% ErrorResponse
  396. closing({error, E}, State) ->
  397. Error = {error, E},
  398. gen_fsm:reply(State#state.reply_to, Error),
  399. {next_state, ready, State}.
  400. %% ErrorResponse
  401. synchronizing({error, E}, State) ->
  402. #state{timeout = Timeout} = State,
  403. Reply = {error, E},
  404. {next_state, synchronizing, State#state{reply = Reply}, Timeout};
  405. synchronizing(timeout, State) ->
  406. #state{timeout = Timeout} = State,
  407. Reply = {error, timeout},
  408. {next_state, synchronizing, State#state{reply = Reply}, Timeout};
  409. %% ReadyForQuery
  410. synchronizing({$Z, <<Status:8>>}, State) ->
  411. #state{reply = Reply, reply_to = Reply_To} = State,
  412. gen_fsm:reply(Reply_To, Reply),
  413. {next_state, ready, State#state{reply = undefined, txstatus = Status}}.
  414. timeout({$Z, <<Status:8>>}, State) ->
  415. notify(State, timeout),
  416. {next_state, ready, State#state{txstatus = Status}};
  417. timeout(timeout, State) ->
  418. {stop, timeout, State};
  419. %% ignore events that occur after timeout
  420. timeout(_Event, State) ->
  421. #state{timeout = Timeout} = State,
  422. {next_state, timeout, State, Timeout}.
  423. aborted(sync, From, State) ->
  424. #state{timeout = Timeout} = State,
  425. send(State, $S, []),
  426. State2 = State#state{reply = ok, reply_to = From},
  427. {next_state, synchronizing, State2, Timeout};
  428. aborted(_Msg, _From, State) ->
  429. #state{timeout = Timeout} = State,
  430. {reply, {error, sync_required}, aborted, State, Timeout}.
  431. %% -- internal functions --
  432. %% decode data
  433. decode_data(Columns, Bin) ->
  434. decode_data(Columns, Bin, []).
  435. decode_data([], _Bin, Acc) ->
  436. list_to_tuple(lists:reverse(Acc));
  437. decode_data([_C | T], <<-1:?int32, Rest/binary>>, Acc) ->
  438. decode_data(T, Rest, [null | Acc]);
  439. decode_data([C | T], <<Len:?int32, Value:Len/binary, Rest/binary>>, Acc) ->
  440. case C of
  441. #column{type = Type, format = 1} -> Value2 = pgsql_binary:decode(Type, Value);
  442. #column{} -> Value2 = Value
  443. end,
  444. decode_data(T, Rest, [Value2 | Acc]).
  445. %% decode column information
  446. decode_columns(Count, Bin) ->
  447. decode_columns(Count, Bin, []).
  448. decode_columns(0, _Bin, Acc) ->
  449. lists:reverse(Acc);
  450. decode_columns(N, Bin, Acc) ->
  451. {Name, Rest} = pgsql_sock:decode_string(Bin),
  452. <<_Table_Oid:?int32, _Attrib_Num:?int16, Type_Oid:?int32,
  453. Size:?int16, Modifier:?int32, Format:?int16, Rest2/binary>> = Rest,
  454. Desc = #column{
  455. name = Name,
  456. type = pgsql_types:oid2type(Type_Oid),
  457. size = Size,
  458. modifier = Modifier,
  459. format = Format},
  460. decode_columns(N - 1, Rest2, [Desc | Acc]).
  461. %% decode command complete msg
  462. decode_complete(<<"SELECT", 0>>) -> select;
  463. decode_complete(<<"SELECT", _/binary>>) -> select;
  464. decode_complete(<<"BEGIN", 0>>) -> 'begin';
  465. decode_complete(<<"ROLLBACK", 0>>) -> rollback;
  466. decode_complete(Bin) ->
  467. {Str, _} = pgsql_sock:decode_string(Bin),
  468. case string:tokens(binary_to_list(Str), " ") of
  469. ["INSERT", _Oid, Rows] -> {insert, list_to_integer(Rows)};
  470. ["UPDATE", Rows] -> {update, list_to_integer(Rows)};
  471. ["DELETE", Rows] -> {delete, list_to_integer(Rows)};
  472. ["MOVE", Rows] -> {move, list_to_integer(Rows)};
  473. ["FETCH", Rows] -> {fetch, list_to_integer(Rows)};
  474. [Type | _Rest] -> pgsql_sock:lower_atom(Type)
  475. end.
  476. %% encode types
  477. encode_types(Types) ->
  478. encode_types(Types, 0, <<>>).
  479. encode_types([], Count, Acc) ->
  480. <<Count:?int16, Acc/binary>>;
  481. encode_types([Type | T], Count, Acc) ->
  482. case Type of
  483. undefined -> Oid = 0;
  484. _Any -> Oid = pgsql_types:type2oid(Type)
  485. end,
  486. encode_types(T, Count + 1, <<Acc/binary, Oid:?int32>>).
  487. %% encode column formats
  488. encode_formats(Columns) ->
  489. encode_formats(Columns, 0, <<>>).
  490. encode_formats([], Count, Acc) ->
  491. <<Count:?int16, Acc/binary>>;
  492. encode_formats([#column{format = Format} | T], Count, Acc) ->
  493. encode_formats(T, Count + 1, <<Acc/binary, Format:?int16>>).
  494. format(Type) ->
  495. case pgsql_binary:supports(Type) of
  496. true -> 1;
  497. false -> 0
  498. end.
  499. %% encode parameters
  500. encode_parameters(Parameters) ->
  501. encode_parameters(Parameters, 0, <<>>, <<>>).
  502. encode_parameters([], Count, Formats, Values) ->
  503. <<Count:?int16, Formats/binary, Count:?int16, Values/binary>>;
  504. encode_parameters([P | T], Count, Formats, Values) ->
  505. {Format, Value} = encode_parameter(P),
  506. Formats2 = <<Formats/binary, Format:?int16>>,
  507. Values2 = <<Values/binary, Value/binary>>,
  508. encode_parameters(T, Count + 1, Formats2, Values2).
  509. %% encode parameter
  510. encode_parameter({Type, Value}) ->
  511. case pgsql_binary:encode(Type, Value) of
  512. Bin when is_binary(Bin) -> {1, Bin};
  513. {error, unsupported} -> encode_parameter(Value)
  514. end;
  515. encode_parameter(A) when is_atom(A) -> {0, encode_list(atom_to_list(A))};
  516. encode_parameter(B) when is_binary(B) -> {0, <<(byte_size(B)):?int32, B/binary>>};
  517. encode_parameter(I) when is_integer(I) -> {0, encode_list(integer_to_list(I))};
  518. encode_parameter(F) when is_float(F) -> {0, encode_list(float_to_list(F))};
  519. encode_parameter(L) when is_list(L) -> {0, encode_list(L)}.
  520. encode_list(L) ->
  521. Bin = list_to_binary(L),
  522. <<(byte_size(Bin)):?int32, Bin/binary>>.
  523. notify(#state{reply_to = {Pid, _Tag}}, Msg) ->
  524. Pid ! {pgsql, self(), Msg}.
  525. notify_async(#state{async = Pid}, Msg) ->
  526. case is_pid(Pid) of
  527. true -> Pid ! {pgsql, self(), Msg};
  528. false -> false
  529. end.
  530. to_binary(B) when is_binary(B) -> B;
  531. to_binary(L) when is_list(L) -> list_to_binary(L).
  532. hex(Bin) ->
  533. HChar = fun(N) when N < 10 -> $0 + N;
  534. (N) when N < 16 -> $W + N
  535. end,
  536. <<<<(HChar(H)), (HChar(L))>> || <<H:4, L:4>> <= Bin>>.
  537. %% send data to server
  538. send(#state{sock = Sock}, Type, Data) ->
  539. pgsql_sock:send(Sock, Type, Data).