epgsql_incremental.erl 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. %%%
  4. %%% Emulates original epgsql API over epgsqli for original tests
  5. -module(epgsql_incremental).
  6. -export([connect/1, connect/2, connect/3, connect/4, close/1]).
  7. -export([get_parameter/2, set_notice_receiver/2, get_cmd_status/1, get_backend_pid/1,
  8. squery/2, equery/2, equery/3]).
  9. -export([prepared_query/3]).
  10. -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
  11. -export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2, execute_batch/3]).
  12. -export([close/2, close/3, sync/1]).
  13. -include("epgsql.hrl").
  14. %% -- client interface --
  %% @doc Open a connection through `epgsqli' and wait for it to be established.
  %%
  %% Each arity forwards its arguments unchanged to the `epgsqli:connect/N' of
  %% the same arity, then hands the returned reference together with `Opts' to
  %% `await_connect/2', whose result is returned as-is.
  connect(Opts) ->
      await_connect(epgsqli:connect(Opts), Opts).

  connect(Host, Opts) ->
      await_connect(epgsqli:connect(Host, Opts), Opts).

  connect(Host, Username, Opts) ->
      await_connect(epgsqli:connect(Host, Username, Opts), Opts).

  connect(Host, Username, Password, Opts) ->
      await_connect(epgsqli:connect(Host, Username, Password, Opts), Opts).
  %% @doc Block until the asynchronous connect identified by `Ref' resolves.
  %%
  %% Returns `{ok, C}' once `{C, Ref, connected}' arrives, or the `{error, _}'
  %% tuple delivered for `Ref'. A `{epgsql, C, socket_passive}' notification is
  %% answered with `epgsql:activate/1' and the wait continues.
  %%
  %% The `timeout' option (milliseconds or `infinity', default 5000) is read once
  %% from `epgsql:to_map(Opts0)' and bounds the whole wait: `socket_passive'
  %% notifications no longer restart the clock. Raises `error(timeout)' when the
  %% budget is used up.
  await_connect(Ref, Opts0) ->
      Opts = epgsql:to_map(Opts0),
      Timeout = maps:get(timeout, Opts, 5000),
      await_connect_loop(Ref, Timeout, erlang:monotonic_time(millisecond)).

  %% Receive loop for await_connect/2; `StartedAt' is the monotonic start time in ms.
  await_connect_loop(Ref, Timeout, StartedAt) ->
      receive
          {C, Ref, connected} ->
              {ok, C};
          {_C, Ref, {error, _} = Error} ->
              Error;
          {epgsql, C, socket_passive} ->
              ok = epgsql:activate(C),
              await_connect_loop(Ref, Timeout, StartedAt)
      after connect_time_left(Timeout, StartedAt) ->
          error(timeout)
      end.

  %% Milliseconds left of `Timeout' since `StartedAt', never negative.
  connect_time_left(infinity, _StartedAt) ->
      infinity;
  connect_time_left(Timeout, StartedAt) ->
      Elapsed = erlang:monotonic_time(millisecond) - StartedAt,
      max(0, Timeout - Elapsed).
  %% @doc Close connection `Conn'; delegates to `epgsqli:close/1' and returns its result.
  close(Conn) ->
      epgsqli:close(Conn).
  %% @doc Look up parameter `Key' on connection `Conn'.
  %% Delegates to `epgsqli:get_parameter/2' and returns its result unchanged.
  get_parameter(Conn, Key) ->
      epgsqli:get_parameter(Conn, Key).
  %% @doc Set the notice receiver of connection `Conn' to `Receiver'.
  %% Delegates to `epgsqli:set_notice_receiver/2' and returns its result unchanged.
  set_notice_receiver(Conn, Receiver) ->
      epgsqli:set_notice_receiver(Conn, Receiver).
  %% @doc Fetch the command status of connection `Conn'.
  %% Delegates to `epgsqli:get_cmd_status/1' and returns its result unchanged.
  get_cmd_status(Conn) ->
      epgsqli:get_cmd_status(Conn).
  %% @doc Fetch the backend pid of connection `Conn'.
  %% Delegates to `epgsqli:get_backend_pid/1' and returns its result unchanged.
  get_backend_pid(Conn) ->
      epgsqli:get_backend_pid(Conn).
  %% @doc Run a simple query and collect every result it produces.
  %%
  %% Starts the query with `epgsqli:squery/2' and gathers the replies with
  %% `receive_results/3'. When exactly one result was collected it is returned
  %% bare; anything else (including a non-list value) is returned as-is.
  squery(Conn, Query) ->
      Ref = epgsqli:squery(Conn, Query),
      Collected = receive_results(Conn, Ref, []),
      case Collected of
          [Single] -> Single;
          _ -> Collected
      end.
  %% @doc Run an extended query without parameters; same as `equery(Conn, Query, [])'.
  equery(Conn, Query) ->
      equery(Conn, Query, []).

  %% @doc Parse `Query', pair each parameter with its type and execute it.
  %%
  %% The statement comes from `parse/2'; anything other than `{ok, #statement{}}'
  %% is returned untouched. Parameters are paired with the statement's types via
  %% `lists:zip/2', which requires both lists to have the same length. The final
  %% value is whatever `receive_result/3' yields for the request.
  equery(Conn, Query, Params) ->
      case parse(Conn, Query) of
          {ok, Stmt = #statement{types = ParamTypes}} ->
              TypedParams = lists:zip(ParamTypes, Params),
              Ref = epgsqli:equery(Conn, Stmt, TypedParams),
              receive_result(Conn, Ref, undefined);
          ParseFailure ->
              ParseFailure
      end.
  %% @doc Execute an already prepared statement with `Params'.
  %%
  %% Given a `#statement{}', the parameters are zipped with the statement's types
  %% (`lists:zip/2', so both lists must be equally long), sent through
  %% `epgsqli:prepared_query/3' and the outcome is read by `receive_result/3'.
  %% Given anything else, the value is treated as a statement name: it is
  %% described first, and a non-`{ok, _}' describe result is returned unchanged.
  prepared_query(Conn, Stmt = #statement{types = ParamTypes}, Params) ->
      Typed = lists:zip(ParamTypes, Params),
      Ref = epgsqli:prepared_query(Conn, Stmt, Typed),
      receive_result(Conn, Ref, undefined);
  prepared_query(Conn, StmtName, Params) ->
      case describe(Conn, statement, StmtName) of
          {ok, Described} -> prepared_query(Conn, Described, Params);
          DescribeFailure -> DescribeFailure
      end.
  %% parse

  %% @doc Parse `Sql' under the empty statement name with an empty type list.
  parse(Conn, Sql) ->
      parse(Conn, "", Sql, []).

  %% @doc Parse `Sql' under the empty statement name with the given `Types'.
  parse(Conn, Sql, Types) ->
      parse(Conn, "", Sql, Types).

  %% @doc Parse `Sql' as statement `Name'.
  %%
  %% Sends the request with `epgsqli:parse/4', reads the description into a
  %% `#statement{}' carrying `Name', and passes the outcome through
  %% `sync_on_error/2' before returning it.
  parse(Conn, Name, Sql, Types) ->
      Ref = epgsqli:parse(Conn, Name, Sql, Types),
      Described = receive_describe(Conn, Ref, #statement{name = Name}),
      sync_on_error(Conn, Described).
  %% bind

  %% @doc Bind `Parameters' to `Statement' using the empty portal name.
  bind(Conn, Statement, Parameters) ->
      bind(Conn, Statement, "", Parameters).

  %% @doc Bind `Parameters' to `Statement' on portal `PortalName'.
  %%
  %% Sends the request with `epgsqli:bind/4', waits for `ok' or an `{error, _}'
  %% via `receive_atom/4', and passes the outcome through `sync_on_error/2'.
  bind(Conn, Statement, PortalName, Parameters) ->
      Ref = epgsqli:bind(Conn, Statement, PortalName, Parameters),
      Outcome = receive_atom(Conn, Ref, ok, ok),
      sync_on_error(Conn, Outcome).
  %% execute

  %% @doc Execute `Stmt' on the empty portal name with `N' set to 0.
  execute(Conn, Stmt) ->
      execute(Conn, Stmt, "", 0).

  %% @doc Execute `Stmt' on the empty portal name, passing `N' through.
  execute(Conn, Stmt, N) ->
      execute(Conn, Stmt, "", N).

  %% @doc Execute `Stmt' on portal `PortalName'.
  %%
  %% All arguments are forwarded to `epgsqli:execute/4'; the reply is read by
  %% `receive_extended_result/3' starting from an empty row accumulator.
  execute(Conn, Stmt, PortalName, N) ->
      receive_extended_result(Conn, epgsqli:execute(Conn, Stmt, PortalName, N), []).
  %% @doc Execute `Batch' via `epgsqli:execute_batch/2' and collect the results
  %% with `receive_extended_results/3'.
  execute_batch(Conn, Batch) ->
      receive_extended_results(Conn, epgsqli:execute_batch(Conn, Batch), []).

  %% @doc Execute `Batch' against a single statement.
  %%
  %% With a `#statement{}' the batch is sent through `epgsqli:execute_batch/3'
  %% and the result is `{Columns, Results}', where `Columns' is the statement's
  %% `columns' field. With anything else the second argument is parsed as SQL
  %% first; a parse result other than `{ok, #statement{}}' is returned unchanged.
  execute_batch(Conn, Stmt = #statement{columns = Columns}, Batch) ->
      Ref = epgsqli:execute_batch(Conn, Stmt, Batch),
      Results = receive_extended_results(Conn, Ref, []),
      {Columns, Results};
  execute_batch(Conn, Sql, Batch) ->
      case parse(Conn, Sql) of
          {ok, Parsed = #statement{}} -> execute_batch(Conn, Parsed, Batch);
          ParseFailure -> ParseFailure
      end.
  %% statement/portal functions

  %% @doc Describe the statement named in the given `#statement{}' record.
  describe(Conn, #statement{name = StmtName}) ->
      describe(Conn, statement, StmtName).

  %% @doc Describe a statement or a portal by name.
  %%
  %% For `statement' the reply is read into a `#statement{}' carrying the name;
  %% for `portal' the reply is read by `receive_describe_portal/2'. In both
  %% cases the outcome passes through `sync_on_error/2' before being returned.
  describe(Conn, statement, StmtName) ->
      Ref = epgsqli:describe(Conn, statement, StmtName),
      Described = receive_describe(Conn, Ref, #statement{name = StmtName}),
      sync_on_error(Conn, Described);
  describe(Conn, portal, PortalName) ->
      Ref = epgsqli:describe(Conn, portal, PortalName),
      Described = receive_describe_portal(Conn, Ref),
      sync_on_error(Conn, Described).
  %% @doc Close the statement named in the given `#statement{}' record.
  close(Conn, #statement{name = StmtName}) ->
      close(Conn, statement, StmtName).

  %% @doc Close the object of kind `Type' called `Name'.
  %%
  %% Sends the request with `epgsqli:close/3' and waits through `receive_atom/4'
  %% for `ok' or an `{error, _}' reply.
  close(Conn, Type, Name) ->
      receive_atom(Conn, epgsqli:close(Conn, Type, Name), ok, ok).
  %% @doc Send a sync with `epgsqli:sync/1' and wait through `receive_atom/4'
  %% for `ok' or an `{error, _}' reply.
  sync(Conn) ->
      receive_atom(Conn, epgsqli:sync(Conn), ok, ok).
  131. %% -- internal functions --
  %% @doc Drain replies for `Ref' until `done', returning the most recent result.
  %%
  %% Each pass reads one result with `receive_result/4'. `done' ends the loop and
  %% yields `LastResult' (the value from the previous pass, or the caller's
  %% initial value). A term thrown while reading is returned in place of a result.
  receive_result(Conn, Ref, LastResult) ->
      try receive_result(Conn, Ref, [], []) of
          done ->
              LastResult;
          Next ->
              receive_result(Conn, Ref, Next)
      catch
          throw:Thrown ->
              Thrown
      end.
  %% @doc Drain replies for `Ref' until `done', returning all results in arrival order.
  %%
  %% Each pass reads one result with `receive_result/4' and pushes it onto `Acc';
  %% `done' ends the loop and the accumulator is reversed. A term thrown while
  %% reading is returned on its own, discarding what was accumulated.
  receive_results(Conn, Ref, Acc) ->
      try receive_result(Conn, Ref, [], []) of
          done ->
              lists:reverse(Acc);
          Next ->
              receive_results(Conn, Ref, [Next | Acc])
      catch
          throw:Thrown ->
              Thrown
      end.
  %% @doc Read messages for `Ref' from connection `Conn' until one result is complete.
  %%
  %% `Columns' holds the most recent `{columns, _}' payload and `RowsRev' the
  %% `{data, _}' rows seen so far, newest first. Returns:
  %%   `{ok, Count}'                 - completion with a count and no columns;
  %%   `{ok, Count, Columns, Rows}'  - completion with a count and columns;
  %%   `{ok, Columns, Rows}'         - completion without a count;
  %%   `{error, _}'                  - an error reported for `Ref';
  %%   `done'                        - the end-of-request marker.
  %% A `socket_passive' notification re-activates the socket and reading goes on.
  %% An `EXIT' message from `Conn' is turned into `throw({error, closed})'.
  receive_result(Conn, Ref, Columns, RowsRev) ->
      receive
          {Conn, Ref, {columns, NewColumns}} ->
              receive_result(Conn, Ref, NewColumns, RowsRev);
          {Conn, Ref, {data, Row}} ->
              receive_result(Conn, Ref, Columns, [Row | RowsRev]);
          {Conn, Ref, {error, _} = Failure} ->
              Failure;
          %% The counted form must be tried before the bare `{complete, _}' form.
          {Conn, Ref, {complete, {_Kind, Count}}} when Columns =:= [] ->
              {ok, Count};
          {Conn, Ref, {complete, {_Kind, Count}}} ->
              {ok, Count, Columns, lists:reverse(RowsRev)};
          {Conn, Ref, {complete, _Kind}} ->
              {ok, Columns, lists:reverse(RowsRev)};
          {Conn, Ref, done} ->
              done;
          {epgsql, Conn, socket_passive} ->
              ok = epgsql:activate(Conn),
              receive_result(Conn, Ref, Columns, RowsRev);
          {'EXIT', Conn, _Why} ->
              throw({error, closed})
      end.
  %% @doc Drain extended-protocol replies for `Ref' until `done', returning all
  %% results in arrival order.
  %%
  %% Each pass reads one result with `receive_extended_result/3' and pushes it
  %% onto `Results'. `{error, closed}' is terminal: `receive_extended_result/3'
  %% produces it when the connection process has exited, after which no `done'
  %% can arrive and another pass would block forever in `receive'. It is
  %% returned on its own, the same way a thrown term is.
  receive_extended_results(C, Ref, Results) ->
      try receive_extended_result(C, Ref, []) of
          done ->
              lists:reverse(Results);
          {error, closed} = Closed ->
              Closed;
          R ->
              receive_extended_results(C, Ref, [R | Results])
      catch
          throw:E -> E
      end.

  %% @doc Read messages for `Ref' from connection `C' until one result is complete.
  %%
  %% `Rows' holds the `{data, _}' rows seen so far, newest first. Returns:
  %%   `{partial, Rows}'    - on `suspended';
  %%   `{ok, Count}'        - completion with a count and no rows;
  %%   `{ok, Count, Rows}'  - completion with a count and rows;
  %%   `{ok, Rows}'         - completion without a count;
  %%   `{error, _}'         - an error reported for `Ref';
  %%   `{error, closed}'    - an `EXIT' message from `C';
  %%   `done'               - the end-of-request marker.
  %% A `socket_passive' notification re-activates the socket and reading goes on.
  receive_extended_result(C, Ref, Rows) ->
      receive
          {C, Ref, {data, Row}} ->
              receive_extended_result(C, Ref, [Row | Rows]);
          {C, Ref, {error, _E} = Error} ->
              Error;
          {C, Ref, suspended} ->
              {partial, lists:reverse(Rows)};
          %% The counted form must be tried before the bare `{complete, _}' form.
          {C, Ref, {complete, {_Type, Count}}} ->
              case Rows of
                  [] -> {ok, Count};
                  _L -> {ok, Count, lists:reverse(Rows)}
              end;
          {C, Ref, {complete, _Type}} ->
              {ok, lists:reverse(Rows)};
          {C, Ref, done} ->
              done;
          {epgsql, C, socket_passive} ->
              ok = epgsql:activate(C),
              receive_extended_result(C, Ref, Rows);
          {'EXIT', C, _Reason} ->
              {error, closed}
      end.
  %% @doc Read a statement description for `Ref' into the `#statement{}' `Stmt'.
  %%
  %% A `{types, _}' message fills the `types' field and reading continues.
  %% `{columns, _}' or `no_data' finishes with `{ok, Stmt}', the `columns' field
  %% set to the received list or to `[]'. An error for `Ref' is returned as-is,
  %% and an `EXIT' message from `Conn' yields `{error, closed}'. A
  %% `socket_passive' notification re-activates the socket and reading goes on.
  receive_describe(Conn, Ref, #statement{} = Stmt) ->
      receive
          {Conn, Ref, {types, ParamTypes}} ->
              WithTypes = Stmt#statement{types = ParamTypes},
              receive_describe(Conn, Ref, WithTypes);
          {Conn, Ref, {columns, Cols}} ->
              {ok, Stmt#statement{columns = Cols}};
          {Conn, Ref, no_data} ->
              {ok, Stmt#statement{columns = []}};
          {Conn, Ref, {error, _} = Failure} ->
              Failure;
          {epgsql, Conn, socket_passive} ->
              ok = epgsql:activate(Conn),
              receive_describe(Conn, Ref, Stmt);
          {'EXIT', Conn, _Why} ->
              {error, closed}
      end.
  %% @doc Read a portal description for `Ref' from connection `Conn'.
  %%
  %% Returns `{ok, Columns}' for a `{columns, _}' message, `{ok, []}' for
  %% `no_data', the `{error, _}' tuple reported for `Ref', or `{error, closed}'
  %% on an `EXIT' message from `Conn'. A `socket_passive' notification
  %% re-activates the socket and reading goes on.
  receive_describe_portal(Conn, Ref) ->
      receive
          {Conn, Ref, {columns, Cols}} ->
              {ok, Cols};
          {Conn, Ref, no_data} ->
              {ok, []};
          {Conn, Ref, {error, _} = Failure} ->
              Failure;
          {epgsql, Conn, socket_passive} ->
              ok = epgsql:activate(Conn),
              receive_describe_portal(Conn, Ref);
          {'EXIT', Conn, _Why} ->
              {error, closed}
      end.
  %% @doc Wait for the reply `Expected' to request `Ref' and answer with `Reply'.
  %%
  %% Returns `Reply' when `{Conn, Ref, Expected}' arrives, the `{error, _}' tuple
  %% reported for `Ref', or `{error, closed}' on an `EXIT' message from `Conn'.
  %% The `Expected' pattern is tried first, so it wins even if it is itself an
  %% error tuple. A `socket_passive' notification re-activates the socket and
  %% waiting goes on.
  receive_atom(Conn, Ref, Expected, Reply) ->
      receive
          {Conn, Ref, Expected} ->
              Reply;
          {Conn, Ref, {error, _} = Failure} ->
              Failure;
          {epgsql, Conn, socket_passive} ->
              ok = epgsql:activate(Conn),
              receive_atom(Conn, Ref, Expected, Reply);
          {'EXIT', Conn, _Why} ->
              {error, closed}
      end.
  %% @doc Pass `Outcome' through, issuing a sync first when it is an error.
  %%
  %% For `{error, _}' a sync is sent with `epgsqli:sync/1' and its reply is
  %% awaited through `receive_atom/4'; that reply is discarded and the original
  %% error is returned. Any other value is returned without touching the
  %% connection.
  sync_on_error(Conn, {error, _} = Failure) ->
      SyncRef = epgsqli:sync(Conn),
      _ = receive_atom(Conn, SyncRef, ok, ok),
      Failure;
  sync_on_error(_Conn, Outcome) ->
      Outcome.