pgsql_connection.erl 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. -module(pgsql_connection).
  3. -behavior(gen_fsm).
  4. -export([start_link/0, stop/1, connect/5, get_parameter/2]).
  5. -export([squery/2, equery/3]).
  6. -export([parse/4, bind/4, execute/4, describe/3]).
  7. -export([close/3, sync/1]).
  8. -export([init/1, handle_event/3, handle_sync_event/4]).
  9. -export([handle_info/3, terminate/3, code_change/4]).
  10. -export([read/3]).
  11. -export([startup/3, auth/2, initializing/2, ready/2, ready/3]).
  12. -export([querying/2, parsing/2, binding/2, describing/2]).
  13. -export([executing/2, closing/2, synchronizing/2]).
  14. -include("pgsql.hrl").
  15. -record(state, {
  16. reader,
  17. sock,
  18. parameters = [],
  19. reply,
  20. reply_to,
  21. backend,
  22. statement,
  23. txstatus}).
  24. -define(int16, 1/big-signed-unit:16).
  25. -define(int32, 1/big-signed-unit:32).
  26. %% -- client interface --
  27. start_link() ->
  28. gen_fsm:start_link(?MODULE, [], []).
  29. stop(C) ->
  30. gen_fsm:send_all_state_event(C, stop).
  31. connect(C, Host, Username, Password, Opts) ->
  32. gen_fsm:sync_send_event(C, {connect, Host, Username, Password, Opts}).
  33. get_parameter(C, Name) ->
  34. gen_fsm:sync_send_event(C, {get_parameter, to_binary(Name)}).
  35. squery(C, Sql) ->
  36. gen_fsm:sync_send_event(C, {squery, Sql}).
  37. equery(C, Statement, Parameters) ->
  38. gen_fsm:sync_send_event(C, {equery, Statement, Parameters}).
  39. parse(C, Name, Sql, Types) ->
  40. gen_fsm:sync_send_event(C, {parse, Name, Sql, Types}).
  41. bind(C, Statement, PortalName, Parameters) ->
  42. gen_fsm:sync_send_event(C, {bind, Statement, PortalName, Parameters}).
  43. execute(C, Statement, PortalName, MaxRows) ->
  44. gen_fsm:sync_send_event(C, {execute, Statement, PortalName, MaxRows}).
  45. describe(C, Type, Name) ->
  46. gen_fsm:sync_send_event(C, {describe, Type, Name}).
  47. close(C, Type, Name) ->
  48. gen_fsm:sync_send_event(C, {close, Type, Name}).
  49. sync(C) ->
  50. gen_fsm:sync_send_event(C, sync).
  51. %% -- gen_fsm implementation --
  52. init([]) ->
  53. process_flag(trap_exit, true),
  54. {ok, startup, #state{}}.
  55. handle_event({notice, Notice}, State_Name, State) ->
  56. notify(State, {notice, Notice}),
  57. {next_state, State_Name, State};
  58. handle_event({parameter_status, Name, Value}, State_Name, State) ->
  59. Parameters2 = lists:keystore(Name, 1, State#state.parameters, {Name, Value}),
  60. {next_state, State_Name, State#state{parameters = Parameters2}};
  61. handle_event(stop, _State_Name, State) ->
  62. {stop, normal, State};
  63. handle_event(Event, _State_Name, State) ->
  64. {stop, {unsupported_event, Event}, State}.
  65. handle_sync_event(Event, _From, _State_Name, State) ->
  66. {stop, {unsupported_sync_event, Event}, State}.
  67. handle_info({'EXIT', Pid, Reason}, _State_Name, State = #state{reader = Pid}) ->
  68. {stop, Reason, State};
  69. handle_info(Info, _State_Name, State) ->
  70. {stop, {unsupported_info, Info}, State}.
  71. terminate(_Reason, _State_Name, State = #state{sock = Sock})
  72. when Sock =/= undefined ->
  73. send(State, $X, []),
  74. gen_tcp:close(Sock);
  75. terminate(_Reason, _State_Name, _State) ->
  76. ok.
  77. code_change(_Old_Vsn, State_Name, State, _Extra) ->
  78. {ok, State_Name, State}.
  79. %% -- states --
  80. startup({connect, Host, Username, Password, Opts}, From, State) ->
  81. Port = proplists:get_value(port, Opts, 5432),
  82. Sock_Opts = [{active, false}, {packet, raw}, binary],
  83. case gen_tcp:connect(Host, Port, Sock_Opts) of
  84. {ok, Sock} ->
  85. Reader = spawn_link(?MODULE, read, [self(), Sock, <<>>]),
  86. Opts2 = ["user", 0, Username, 0],
  87. case proplists:get_value(database, Opts, undefined) of
  88. undefined -> Opts3 = Opts2;
  89. Database -> Opts3 = [Opts2 | ["database", 0, Database, 0]]
  90. end,
  91. put(username, Username),
  92. put(password, Password),
  93. State2 = State#state{reader = Reader,
  94. sock = Sock,
  95. reply_to = From},
  96. send(State2, [<<196608:32>>, Opts3, 0]),
  97. {next_state, auth, State2};
  98. Error ->
  99. {stop, normal, Error, State}
  100. end.
  101. %% AuthenticationOk
  102. auth({$R, <<0:?int32>>}, State) ->
  103. {next_state, initializing, State};
  104. %% AuthenticationCleartextPassword
  105. auth({$R, <<3:?int32>>}, State) ->
  106. send(State, $p, [get(password), 0]),
  107. {next_state, auth, State};
  108. %% AuthenticationMD5Password
  109. auth({$R, <<5:?int32, Salt:4/binary>>}, State) ->
  110. Digest1 = hex(crypto:md5([get(password), get(username)])),
  111. Str = ["md5", hex(crypto:md5([Digest1, Salt])), 0],
  112. send(State, $p, Str),
  113. {next_state, auth, State};
  114. auth({$R, <<M:?int32, _/binary>>}, State) ->
  115. case M of
  116. 2 -> Method = kerberosV5;
  117. 4 -> Method = crypt;
  118. 6 -> Method = scm;
  119. 7 -> Method = gss;
  120. 8 -> Method = sspi;
  121. _ -> Method = unknown
  122. end,
  123. Error = {error, {unsupported_auth_method, Method}},
  124. gen_fsm:reply(State#state.reply_to, Error),
  125. {stop, normal, State};
  126. %% ErrorResponse
  127. auth({$E, Bin}, State) ->
  128. Error = decode_error(Bin),
  129. case Error#error.code of
  130. <<"28000">> -> Why = invalid_authorization_specification;
  131. Any -> Why = Any
  132. end,
  133. gen_fsm:reply(State#state.reply_to, {error, Why}),
  134. {stop, normal, State}.
  135. %% BackendKeyData
  136. initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
  137. State2 = State#state{backend = {Pid, Key}},
  138. {next_state, initializing, State2};
  139. %% ErrorResponse
  140. initializing({$E, Bin}, State) ->
  141. Error = decode_error(Bin),
  142. case Error#error.code of
  143. <<"28000">> -> Why = invalid_authorization_specification;
  144. Any -> Why = Any
  145. end,
  146. gen_fsm:reply(State#state.reply_to, {error, Why}),
  147. {stop, normal, State};
  148. %% ReadyForQuery
  149. initializing({$Z, <<Status:8>>}, State) ->
  150. erase(username),
  151. erase(password),
  152. gen_fsm:reply(State#state.reply_to, {ok, self()}),
  153. {next_state, ready, State#state{txstatus = Status}}.
  154. ready(Data, State) ->
  155. error_logger:info_msg("unexpected msg when ready: ~p~n", Data),
  156. {next_state, ready, State}.
  157. %% execute simple query
  158. ready({squery, Sql}, From, State) ->
  159. send(State, $Q, [Sql, 0]),
  160. State2 = State#state{statement = #statement{}, reply_to = From},
  161. {reply, ok, querying, State2};
  162. %% execute extended query
  163. ready({equery, Statement, Parameters}, From, State) ->
  164. #statement{name = StatementName, columns = Columns} = Statement,
  165. Bin1 = encode_parameters(Parameters),
  166. Bin2 = encode_formats(Columns),
  167. send(State, $B, ["", 0, StatementName, 0, Bin1, Bin2]),
  168. send(State, $E, ["", 0, <<0:?int32>>]),
  169. send(State, $C, [$S, "", 0]),
  170. send(State, $S, []),
  171. State2 = State#state{statement = Statement, reply_to = From},
  172. {reply, ok, querying, State2};
  173. ready({get_parameter, Name}, _From, State) ->
  174. case lists:keysearch(Name, 1, State#state.parameters) of
  175. {value, {Name, Value}} -> Value;
  176. false -> Value = undefined
  177. end,
  178. {reply, {ok, Value}, ready, State};
  179. ready({parse, Name, Sql, Types}, From, State) ->
  180. Bin = encode_types(Types),
  181. send(State, $P, [Name, 0, Sql, 0, Bin]),
  182. send(State, $D, [$S, Name, 0]),
  183. send(State, $H, []),
  184. S = #statement{name = Name},
  185. {next_state, parsing, State#state{statement = S, reply_to = From}};
  186. ready({bind, Statement, PortalName, Parameters}, From, State) ->
  187. #statement{name = StatementName, columns = Columns, types = Types} = Statement,
  188. Typed_Parameters = lists:zip(Types, Parameters),
  189. Bin1 = encode_parameters(Typed_Parameters),
  190. Bin2 = encode_formats(Columns),
  191. send(State, $B, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
  192. send(State, $H, []),
  193. {next_state, binding, State#state{statement = Statement, reply_to = From}};
  194. ready({execute, Statement, PortalName, MaxRows}, From, State) ->
  195. send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
  196. send(State, $H, []),
  197. {reply, ok, executing, State#state{statement = Statement, reply_to = From}};
  198. ready({describe, Type, Name}, From, State) ->
  199. case Type of
  200. statement -> Type2 = $S;
  201. portal -> Type2 = $P
  202. end,
  203. send(State, $D, [Type2, Name, 0]),
  204. send(State, $H, []),
  205. {next_state, describing, State#state{reply_to = From}};
  206. ready({close, Type, Name}, From, State) ->
  207. case Type of
  208. statement -> Type2 = $S;
  209. portal -> Type2 = $P
  210. end,
  211. send(State, $C, [Type2, Name, 0]),
  212. send(State, $H, []),
  213. {next_state, closing, State#state{reply_to = From}};
  214. ready(sync, From, State) ->
  215. send(State, $S, []),
  216. {next_state, synchronizing, State#state{reply = ok, reply_to = From}}.
  217. %% BindComplete
  218. querying({$2, <<>>}, State) ->
  219. #state{statement = #statement{columns = Columns}} = State,
  220. notify(State, {columns, Columns}),
  221. {next_state, querying, State};
  222. %% CloseComplete
  223. querying({$3, <<>>}, State) ->
  224. {next_state, querying, State};
  225. %% RowDescription
  226. querying({$T, <<Count:?int16, Bin/binary>>}, State) ->
  227. Columns = decode_columns(Count, Bin),
  228. S2 = (State#state.statement)#statement{columns = Columns},
  229. notify(State, {columns, Columns}),
  230. {next_state, querying, State#state{statement = S2}};
  231. %% DataRow
  232. querying({$D, <<_Count:?int16, Bin/binary>>}, State) ->
  233. #state{statement = #statement{columns = Columns}} = State,
  234. Data = decode_data(Columns, Bin),
  235. notify(State, {data, Data}),
  236. {next_state, querying, State};
  237. %% CommandComplete
  238. querying({$C, Bin}, State) ->
  239. Complete = decode_complete(Bin),
  240. notify(State, {complete, Complete}),
  241. {next_state, querying, State};
  242. %% EmptyQueryResponse
  243. querying({$I, _Bin}, State) ->
  244. notify(State, {complete, empty}),
  245. {next_state, querying, State};
  246. %% ErrorResponse
  247. querying({$E, Bin}, State) ->
  248. Error = decode_error(Bin),
  249. notify(State, {error, Error}),
  250. {next_state, querying, State};
  251. %% ReadyForQuery
  252. querying({$Z, <<_Status:8>>}, State) ->
  253. notify(State, done),
  254. {next_state, ready, State#state{reply_to = undefined}}.
  255. %% ParseComplete
  256. parsing({$1, <<>>}, State) ->
  257. {next_state, describing, State};
  258. %% ErrorResponse
  259. parsing({$E, Bin}, State) ->
  260. Reply = {error, decode_error(Bin)},
  261. send(State, $S, []),
  262. {next_state, parsing, State#state{reply = Reply}};
  263. %% ReadyForQuery
  264. parsing({$Z, <<Status:8>>}, State) ->
  265. #state{reply = Reply, reply_to = Reply_To} = State,
  266. gen_fsm:reply(Reply_To, Reply),
  267. {next_state, ready, State#state{reply = undefined, txstatus = Status}}.
  268. %% BindComplete
  269. binding({$2, <<>>}, State) ->
  270. gen_fsm:reply(State#state.reply_to, ok),
  271. {next_state, ready, State};
  272. %% ErrorResponse
  273. binding({$E, Bin}, State) ->
  274. Reply = {error, decode_error(Bin)},
  275. send(State, $S, []),
  276. {next_state, binding, State#state{reply = Reply}};
  277. %% ReadyForQuery
  278. binding({$Z, <<Status:8>>}, State) ->
  279. #state{reply = Reply, reply_to = Reply_To} = State,
  280. gen_fsm:reply(Reply_To, Reply),
  281. {next_state, ready, State#state{reply = undefined, txstatus = Status}}.
  282. %% ParameterDescription
  283. describing({$t, <<_Count:?int16, Bin/binary>>}, State) ->
  284. Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
  285. S2 = (State#state.statement)#statement{types = Types},
  286. {next_state, describing, State#state{statement = S2}};
  287. %% RowDescription
  288. describing({$T, <<Count:?int16, Bin/binary>>}, State) ->
  289. Columns = decode_columns(Count, Bin),
  290. Columns2 = [C#column{format = format(C#column.type)} || C <- Columns],
  291. S2 = (State#state.statement)#statement{columns = Columns2},
  292. gen_fsm:reply(State#state.reply_to, {ok, S2}),
  293. {next_state, ready, State};
  294. %% NoData
  295. describing({$n, <<>>}, State) ->
  296. S2 = (State#state.statement)#statement{columns = []},
  297. gen_fsm:reply(State#state.reply_to, {ok, S2}),
  298. {next_state, ready, State};
  299. %% ErrorResponse
  300. describing({$E, Bin}, State) ->
  301. Reply = {error, decode_error(Bin)},
  302. send(State, $S, []),
  303. {next_state, describing, State#state{reply = Reply}};
  304. %% ReadyForQuery
  305. describing({$Z, <<Status:8>>}, State) ->
  306. #state{reply = Reply, reply_to = Reply_To} = State,
  307. gen_fsm:reply(Reply_To, Reply),
  308. {next_state, ready, State#state{reply = undefined, txstatus = Status}}.
  309. %% DataRow
  310. executing({$D, <<_Count:?int16, Bin/binary>>}, State) ->
  311. #state{statement = #statement{columns = Columns}} = State,
  312. Data = decode_data(Columns, Bin),
  313. notify(State, {data, Data}),
  314. {next_state, executing, State};
  315. %% PortalSuspended
  316. executing({$s, <<>>}, State) ->
  317. notify(State, suspended),
  318. {next_state, ready, State};
  319. %% CommandComplete
  320. executing({$C, Bin}, State) ->
  321. notify(State, {complete, decode_complete(Bin)}),
  322. {next_state, ready, State};
  323. %% EmptyQueryResponse
  324. executing({$I, _Bin}, State) ->
  325. notify(State, {complete, empty}),
  326. {next_state, ready, State};
  327. %% ErrorResponse
  328. executing({$E, Bin}, State) ->
  329. notify(State, {error, decode_error(Bin)}),
  330. {next_state, executing, State}.
  331. %% CloseComplete
  332. closing({$3, <<>>}, State) ->
  333. gen_fsm:reply(State#state.reply_to, ok),
  334. {next_state, ready, State};
  335. %% ErrorResponse
  336. closing({$E, Bin}, State) ->
  337. Error = {error, decode_error(Bin)},
  338. gen_fsm:reply(State#state.reply_to, Error),
  339. {next_state, ready, State}.
  340. %% ErrorResponse
  341. synchronizing({$E, Bin}, State) ->
  342. Reply = {error, decode_error(Bin)},
  343. {next_state, synchronizing, State#state{reply = Reply}};
  344. %% ReadyForQuery
  345. synchronizing({$Z, <<Status:8>>}, State) ->
  346. #state{reply = Reply, reply_to = Reply_To} = State,
  347. gen_fsm:reply(Reply_To, Reply),
  348. {next_state, ready, State#state{reply = undefined, txstatus = Status}}.
  349. %% -- internal functions --
  350. %% decode a single null-terminated string
  351. decode_string(Bin) ->
  352. decode_string(Bin, <<>>).
  353. decode_string(<<0, Rest/binary>>, Str) ->
  354. {Str, Rest};
  355. decode_string(<<C, Rest/binary>>, Str) ->
  356. decode_string(Rest, <<Str/binary, C>>).
  357. %% decode multiple null-terminated string
  358. decode_strings(Bin) ->
  359. decode_strings(Bin, []).
  360. decode_strings(<<>>, Acc) ->
  361. lists:reverse(Acc);
  362. decode_strings(Bin, Acc) ->
  363. {Str, Rest} = decode_string(Bin),
  364. decode_strings(Rest, [Str | Acc]).
  365. %% decode field
  366. decode_fields(Bin) ->
  367. decode_fields(Bin, []).
  368. decode_fields(<<0>>, Acc) ->
  369. Acc;
  370. decode_fields(<<Type:8, Rest/binary>>, Acc) ->
  371. {Str, Rest2} = decode_string(Rest),
  372. decode_fields(Rest2, [{Type, Str} | Acc]).
  373. %% decode data
  374. decode_data(Columns, Bin) ->
  375. decode_data(Columns, Bin, []).
  376. decode_data([], _Bin, Acc) ->
  377. list_to_tuple(lists:reverse(Acc));
  378. decode_data([_C | T], <<-1:?int32, Rest/binary>>, Acc) ->
  379. decode_data(T, Rest, [null | Acc]);
  380. decode_data([C | T], <<Len:?int32, Value:Len/binary, Rest/binary>>, Acc) ->
  381. case C of
  382. #column{type = Type, format = 1} -> Value2 = pgsql_binary:decode(Type, Value);
  383. #column{} -> Value2 = Value
  384. end,
  385. decode_data(T, Rest, [Value2 | Acc]).
  386. %% decode column information
  387. decode_columns(Count, Bin) ->
  388. decode_columns(Count, Bin, []).
  389. decode_columns(0, _Bin, Acc) ->
  390. lists:reverse(Acc);
  391. decode_columns(N, Bin, Acc) ->
  392. {Name, Rest} = decode_string(Bin),
  393. <<_Table_Oid:?int32, _Attrib_Num:?int16, Type_Oid:?int32,
  394. Size:?int16, Modifier:?int32, Format:?int16, Rest2/binary>> = Rest,
  395. Desc = #column{
  396. name = Name,
  397. type = pgsql_types:oid2type(Type_Oid),
  398. size = Size,
  399. modifier = Modifier,
  400. format = Format},
  401. decode_columns(N - 1, Rest2, [Desc | Acc]).
  402. %% decode command complete msg
  403. decode_complete(<<"SELECT", 0>>) -> select;
  404. decode_complete(<<"BEGIN", 0>>) -> 'begin';
  405. decode_complete(<<"ROLLBACK", 0>>) -> rollback;
  406. decode_complete(Bin) ->
  407. {Str, _} = decode_string(Bin),
  408. case string:tokens(binary_to_list(Str), " ") of
  409. ["INSERT", _Oid, Rows] -> {insert, list_to_integer(Rows)};
  410. ["UPDATE", Rows] -> {update, list_to_integer(Rows)};
  411. ["DELETE", Rows] -> {delete, list_to_integer(Rows)};
  412. ["MOVE", Rows] -> {move, list_to_integer(Rows)};
  413. ["FETCH", _Rows] -> fetch;
  414. [Type | _Rest] -> lower_atom(Type)
  415. end.
  416. %% decode ErrorResponse
  417. decode_error(Bin) ->
  418. Fields = decode_fields(Bin),
  419. Error = #error{
  420. severity = lower_atom(proplists:get_value($S, Fields)),
  421. code = proplists:get_value($C, Fields),
  422. message = proplists:get_value($M, Fields),
  423. extra = decode_error_extra(Fields)},
  424. Error.
  425. decode_error_extra(Fields) ->
  426. Types = [{$D, detail}, {$H, hint}, {$P, position}],
  427. decode_error_extra(Types, Fields, []).
  428. decode_error_extra([], _Fields, Extra) ->
  429. Extra;
  430. decode_error_extra([{Type, Name} | T], Fields, Extra) ->
  431. case proplists:get_value(Type, Fields) of
  432. undefined -> decode_error_extra(T, Fields, Extra);
  433. Value -> decode_error_extra(T, Fields, [{Name, Value} | Extra])
  434. end.
  435. %% encode types
  436. encode_types(Types) ->
  437. encode_types(Types, 0, <<>>).
  438. encode_types([], Count, Acc) ->
  439. <<Count:?int16, Acc/binary>>;
  440. encode_types([Type | T], Count, Acc) ->
  441. case Type of
  442. undefined -> Oid = 0;
  443. _Any -> Oid = pgsql_types:type2oid(Type)
  444. end,
  445. encode_types(T, Count + 1, <<Acc/binary, Oid:?int32>>).
  446. %% encode column formats
  447. encode_formats(Columns) ->
  448. encode_formats(Columns, 0, <<>>).
  449. encode_formats([], Count, Acc) ->
  450. <<Count:?int16, Acc/binary>>;
  451. encode_formats([#column{format = Format} | T], Count, Acc) ->
  452. encode_formats(T, Count + 1, <<Acc/binary, Format:?int16>>).
  453. format(Type) ->
  454. case pgsql_binary:supports(Type) of
  455. true -> 1;
  456. false -> 0
  457. end.
  458. %% encode parameters
  459. encode_parameters(Parameters) ->
  460. encode_parameters(Parameters, 0, <<>>, <<>>).
  461. encode_parameters([], Count, Formats, Values) ->
  462. <<Count:?int16, Formats/binary, Count:?int16, Values/binary>>;
  463. encode_parameters([P | T], Count, Formats, Values) ->
  464. {Format, Value} = encode_parameter(P),
  465. Formats2 = <<Formats/binary, Format:?int16>>,
  466. Values2 = <<Values/binary, Value/binary>>,
  467. encode_parameters(T, Count + 1, Formats2, Values2).
  468. %% encode parameter
  469. encode_parameter({Type, Value}) ->
  470. case pgsql_binary:encode(Type, Value) of
  471. Bin when is_binary(Bin) -> {1, Bin};
  472. {error, unsupported} -> {0, Value}
  473. end;
  474. encode_parameter(null) -> {1, pgsql_binary:encode(null, null)};
  475. encode_parameter(A) when is_atom(A) -> {0, encode_list(atom_to_list(A))};
  476. encode_parameter(B) when is_binary(B) -> {0, <<(byte_size(B)):?int32, B/binary>>};
  477. encode_parameter(I) when is_integer(I) -> {0, encode_list(integer_to_list(I))};
  478. encode_parameter(F) when is_float(F) -> {0, encode_list(float_to_list(F))};
  479. encode_parameter(L) when is_list(L) -> {0, encode_list(L)}.
  480. encode_list(L) ->
  481. Bin = list_to_binary(L),
  482. <<(byte_size(Bin)):?int32, Bin/binary>>.
  483. notify(#state{reply_to = {Pid, _Tag}}, Msg) ->
  484. Pid ! {pgsql, self(), Msg}.
  485. lower_atom(Str) when is_binary(Str) ->
  486. lower_atom(binary_to_list(Str));
  487. lower_atom(Str) when is_list(Str) ->
  488. list_to_atom(string:to_lower(Str)).
  489. to_binary(B) when is_binary(B) -> B;
  490. to_binary(L) when is_list(L) -> list_to_binary(L).
  491. hex(Bin) ->
  492. HChar = fun(N) when N < 10 -> $0 + N;
  493. (N) when N < 16 -> $W + N
  494. end,
  495. <<<<(HChar(H)), (HChar(L))>> || <<H:4, L:4>> <= Bin>>.
  496. %% send data to server
  497. send(#state{sock = Sock}, Type, Data) ->
  498. Bin = iolist_to_binary(Data),
  499. gen_tcp:send(Sock, <<Type:8, (byte_size(Bin) + 4):?int32, Bin/binary>>).
  500. send(#state{sock = Sock}, Data) ->
  501. Bin = iolist_to_binary(Data),
  502. gen_tcp:send(Sock, <<(byte_size(Bin) + 4):?int32, Bin/binary>>).
  503. %% -- socket read loop --
  504. read(Fsm, Sock, Tail) ->
  505. case gen_tcp:recv(Sock, 0) of
  506. {ok, Bin} -> decode(Fsm, Sock, <<Tail/binary, Bin/binary>>);
  507. Error -> exit(Error)
  508. end.
  509. decode(Fsm, Sock, <<Type:8, Len:?int32, Rest/binary>> = Bin) ->
  510. Len2 = Len - 4,
  511. case Rest of
  512. <<Data:Len2/binary, Tail/binary>> when Type == $N ->
  513. gen_fsm:send_all_state_event(Fsm, {notice, decode_error(Data)}),
  514. decode(Fsm, Sock, Tail);
  515. <<Data:Len2/binary, Tail/binary>> when Type == $S ->
  516. [Name, Value] = decode_strings(Data),
  517. gen_fsm:send_all_state_event(Fsm, {parameter_status, Name, Value}),
  518. decode(Fsm, Sock, Tail);
  519. <<Data:Len2/binary, Tail/binary>> ->
  520. gen_fsm:send_event(Fsm, {Type, Data}),
  521. decode(Fsm, Sock, Tail);
  522. _Other ->
  523. ?MODULE:read(Fsm, Sock, Bin)
  524. end;
  525. decode(Fsm, Sock, Bin) ->
  526. ?MODULE:read(Fsm, Sock, Bin).