epgsql_incremental.erl 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. %%%
  4. %%% Emulates original epgsql API over epgsqli for original tests
  5. -module(epgsql_incremental).
  6. -export([connect/1, connect/2, connect/3, connect/4, close/1]).
  7. -export([get_parameter/2, set_notice_receiver/2, get_cmd_status/1, squery/2, equery/2, equery/3]).
  8. -export([prepared_query/3]).
  9. -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
  10. -export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2, execute_batch/3]).
  11. -export([close/2, close/3, sync/1]).
  12. -include("epgsql.hrl").
  13. %% -- client interface --
  %% @doc Starts a connection from a single options term.
  %% Delegates to `epgsqli:connect/1' and then blocks in `await_connect/2'
  %% until the connection attempt reports its outcome.
  connect(Opts) ->
      await_connect(epgsqli:connect(Opts), Opts).

  %% @doc Like `connect/1', additionally handing `Host' to `epgsqli:connect/2'.
  connect(Host, Opts) ->
      await_connect(epgsqli:connect(Host, Opts), Opts).

  %% @doc Like `connect/2', additionally handing `Username' to
  %% `epgsqli:connect/3'.
  connect(Host, Username, Opts) ->
      await_connect(epgsqli:connect(Host, Username, Opts), Opts).

  %% @doc Like `connect/3', additionally handing `Password' to
  %% `epgsqli:connect/4'.
  connect(Host, Username, Password, Opts) ->
      await_connect(epgsqli:connect(Host, Username, Password, Opts), Opts).
  %% Waits for the outcome of an asynchronous connect request tagged `Ref'.
  %%
  %% The options are normalised with `epgsql:to_map/1'; the `timeout' entry
  %% (default 5000, used as the `after' value of the receive, i.e. milliseconds)
  %% bounds the wait.
  %%
  %% Returns `{ok, Connection}' when a `connected' message arrives, or the
  %% `{error, _}' term carried by a failure message. Raises `error:timeout'
  %% when neither arrives in time.
  await_connect(Ref, Opts0) ->
      WaitMs = maps:get(timeout, epgsql:to_map(Opts0), 5000),
      receive
          {Conn, Ref, connected} ->
              {ok, Conn};
          {_Conn, Ref, {error, _} = Failure} ->
              Failure
      after WaitMs ->
          erlang:error(timeout)
      end.
  %% @doc Closes the connection by delegating to `epgsqli:close/1'.
  close(Conn) ->
      epgsqli:close(Conn).
  %% @doc Looks up parameter `Name' on the connection by delegating to
  %% `epgsqli:get_parameter/2'.
  get_parameter(Conn, Name) ->
      epgsqli:get_parameter(Conn, Name).
  %% @doc Registers `PidOrName' as notice receiver by delegating to
  %% `epgsqli:set_notice_receiver/2'.
  set_notice_receiver(Conn, PidOrName) ->
      epgsqli:set_notice_receiver(Conn, PidOrName).
  %% @doc Returns whatever `epgsqli:get_cmd_status/1' reports for the
  %% connection.
  get_cmd_status(Conn) ->
      epgsqli:get_cmd_status(Conn).
  %% @doc Runs `Sql' through `epgsqli:squery/2' and gathers every result with
  %% `receive_results/3'.
  %%
  %% A list holding exactly one result is unwrapped to that bare result;
  %% anything else (several results, an empty list, or an error term returned
  %% by the collector) is handed back unchanged.
  squery(Conn, Sql) ->
      ReqRef = epgsqli:squery(Conn, Sql),
      case receive_results(Conn, ReqRef, []) of
          [Only] ->
              Only;
          Other ->
              Other
      end.
  %% @doc Runs `Sql' as an extended query without parameters.
  equery(Conn, Sql) ->
      equery(Conn, Sql, []).

  %% @doc Parses `Sql' with `parse/2', pairs each parameter type of the
  %% resulting statement with the corresponding entry of `Parameters'
  %% (`lists:zip/2', so both lists must have equal length) and runs the
  %% statement through `epgsqli:equery/3'.
  %%
  %% Returns the last result delivered before `done', or the non-`ok' term
  %% from `parse/2' as is.
  equery(Conn, Sql, Parameters) ->
      equery_parsed(Conn, parse(Conn, Sql), Parameters).

  %% Continues equery/3 once the parse outcome is known.
  equery_parsed(Conn, {ok, #statement{types = ParamTypes} = Stmt}, Parameters) ->
      TypedParams = lists:zip(ParamTypes, Parameters),
      ReqRef = epgsqli:equery(Conn, Stmt, TypedParams),
      receive_result(Conn, ReqRef, undefined);
  equery_parsed(_Conn, NotOk, _Parameters) ->
      NotOk.
  %% @doc Executes the already prepared statement called `Name'.
  %%
  %% The statement is first described via `describe/3'; its parameter types
  %% are zipped with `Parameters' (`lists:zip/2', equal lengths required) and
  %% the request is issued through `epgsqli:prepared_query/3'.
  %%
  %% Returns the last result delivered before `done', or the non-`ok' term
  %% from `describe/3' unchanged.
  prepared_query(Conn, Name, Parameters) ->
      case describe(Conn, statement, Name) of
          {ok, Stmt = #statement{types = ParamTypes}} ->
              ReqRef = epgsqli:prepared_query(
                  Conn, Stmt, lists:zip(ParamTypes, Parameters)
              ),
              receive_result(Conn, ReqRef, undefined);
          NotOk ->
              NotOk
      end.
  71. %% parse
  %% @doc Parses `Sql' under the empty statement name with no explicit types.
  parse(Conn, Sql) ->
      parse(Conn, "", Sql, []).

  %% @doc Parses `Sql' under the empty statement name using `Types'.
  parse(Conn, Sql, Types) ->
      parse(Conn, "", Sql, Types).

  %% @doc Issues `epgsqli:parse/4' and collects the statement description
  %% with `receive_describe/3', starting from a `#statement{}' that carries
  %% only `Name'. The outcome is passed through `sync_on_error/2' before being
  %% returned.
  parse(Conn, Name, Sql, Types) ->
      ReqRef = epgsqli:parse(Conn, Name, Sql, Types),
      Described = receive_describe(Conn, ReqRef, #statement{name = Name}),
      sync_on_error(Conn, Described).
  79. %% bind
  %% @doc Binds `Parameters' to `Statement' using the empty portal name.
  bind(Conn, Statement, Parameters) ->
      bind(Conn, Statement, "", Parameters).

  %% @doc Issues `epgsqli:bind/4' and waits for its `ok' acknowledgement (or
  %% an error) via `receive_atom/4'. The outcome is passed through
  %% `sync_on_error/2' before being returned.
  bind(Conn, Statement, PortalName, Parameters) ->
      ReqRef = epgsqli:bind(Conn, Statement, PortalName, Parameters),
      Outcome = receive_atom(Conn, ReqRef, ok, ok),
      sync_on_error(Conn, Outcome).
  85. %% execute
  %% @doc Executes statement `S' on the empty portal name, passing 0 as the
  %% row-count argument.
  execute(Conn, S) ->
      execute(Conn, S, "", 0).

  %% @doc Executes statement `S' on the empty portal name, passing `N' as the
  %% row-count argument.
  execute(Conn, S, N) ->
      execute(Conn, S, "", N).

  %% @doc Issues `epgsqli:execute/4' and returns the single result gathered by
  %% `receive_extended_result/3'.
  execute(Conn, S, PortalName, N) ->
      ReqRef = epgsqli:execute(Conn, S, PortalName, N),
      receive_extended_result(Conn, ReqRef, []).
  %% @doc Runs `Batch' through `epgsqli:execute_batch/2' and returns whatever
  %% `receive_extended_results/3' collects for it.
  execute_batch(Conn, Batch) ->
      ReqRef = epgsqli:execute_batch(Conn, Batch),
      receive_extended_results(Conn, ReqRef, []).

  %% @doc Runs `Batch' against a single statement.
  %%
  %% Given a `#statement{}', the batch is issued through
  %% `epgsqli:execute_batch/3' and the result is `{Columns, Results}', where
  %% `Columns' is the statement's `columns' field. Given anything else, the
  %% second argument is parsed with `parse/2' first; a non-`ok' parse outcome
  %% is returned unchanged.
  execute_batch(Conn, #statement{columns = Columns} = Statement, Batch) ->
      ReqRef = epgsqli:execute_batch(Conn, Statement, Batch),
      Collected = receive_extended_results(Conn, ReqRef, []),
      {Columns, Collected};
  execute_batch(Conn, Sql, Batch) ->
      case parse(Conn, Sql) of
          {ok, Parsed = #statement{}} ->
              execute_batch(Conn, Parsed, Batch);
          NotOk ->
              NotOk
      end.
  106. %% statement/portal functions
  %% @doc Describes the statement named by the `name' field of the given
  %% `#statement{}'.
  describe(Conn, #statement{name = StmtName}) ->
      describe(Conn, statement, StmtName).

  %% @doc Describes a statement or a portal called `Name'.
  %%
  %% For `statement' the description is gathered by `receive_describe/3',
  %% for `portal' by `receive_describe_portal/2'. In both cases the outcome
  %% is passed through `sync_on_error/2' before being returned.
  describe(Conn, statement, Name) ->
      ReqRef = epgsqli:describe(Conn, statement, Name),
      Described = receive_describe(Conn, ReqRef, #statement{name = Name}),
      sync_on_error(Conn, Described);
  describe(Conn, portal, Name) ->
      ReqRef = epgsqli:describe(Conn, portal, Name),
      Described = receive_describe_portal(Conn, ReqRef),
      sync_on_error(Conn, Described).
  %% @doc Closes the statement named by the `name' field of the given
  %% `#statement{}'.
  close(Conn, #statement{name = StmtName}) ->
      close(Conn, statement, StmtName).

  %% @doc Issues `epgsqli:close/3' for the object of kind `Type' called `Name'
  %% and waits for its `ok' acknowledgement (or an error) via
  %% `receive_atom/4'.
  close(Conn, Type, Name) ->
      ReqRef = epgsqli:close(Conn, Type, Name),
      receive_atom(Conn, ReqRef, ok, ok).
  %% @doc Issues `epgsqli:sync/1' and waits for its `ok' acknowledgement (or
  %% an error) via `receive_atom/4'.
  sync(Conn) ->
      ReqRef = epgsqli:sync(Conn),
      receive_atom(Conn, ReqRef, ok, ok).
  123. %% -- internal functions --
  %% Reads results one at a time with receive_result/4 until `done' shows up
  %% and returns the most recent one (`Latest' is the value returned when
  %% `done' is the very next outcome). A term thrown by receive_result/4 is
  %% returned as an ordinary value.
  receive_result(Conn, ReqRef, Latest) ->
      try receive_result(Conn, ReqRef, [], []) of
          done ->
              Latest;
          Next ->
              %% Only the newest result is kept; earlier ones are dropped.
              receive_result(Conn, ReqRef, Next)
      catch
          throw:Thrown ->
              Thrown
      end.
  %% Reads results one at a time with receive_result/4 until `done' shows up
  %% and returns all of them in arrival order. `Acc' holds the results seen
  %% so far, newest first. A term thrown by receive_result/4 replaces the
  %% whole list and is returned as an ordinary value.
  receive_results(Conn, ReqRef, Acc) ->
      try receive_result(Conn, ReqRef, [], []) of
          done ->
              lists:reverse(Acc);
          Next ->
              receive_results(Conn, ReqRef, [Next | Acc])
      catch
          throw:Thrown ->
              Thrown
      end.
  %% Assembles one query result from the messages tagged with `C' and `Ref'.
  %%
  %% `Cols' is the column description seen so far ([] when none), `Rows' the
  %% data rows received so far, newest first.
  %%
  %% Returns
  %%   {ok, Count}              - completion with a count and no columns
  %%   {ok, Count, Cols, Rows}  - completion with a count and columns
  %%   {ok, Cols, Rows}         - completion without a count
  %%   {error, _}               - error reported for this request
  %%   done                     - no further results for this request
  %% and throws {error, closed} when an 'EXIT' message from `C' is received.
  receive_result(C, Ref, Cols, Rows) ->
      receive
          {C, Ref, {columns, NewCols}} ->
              receive_result(C, Ref, NewCols, Rows);
          {C, Ref, {data, Row}} ->
              receive_result(C, Ref, Cols, [Row | Rows]);
          {C, Ref, {error, _} = Failure} ->
              Failure;
          {C, Ref, {complete, {_Kind, Count}}} when Cols =:= [] ->
              {ok, Count};
          {C, Ref, {complete, {_Kind, Count}}} ->
              {ok, Count, Cols, lists:reverse(Rows)};
          {C, Ref, {complete, _Kind}} ->
              {ok, Cols, lists:reverse(Rows)};
          {C, Ref, done} ->
              done;
          {'EXIT', C, _Why} ->
              throw({error, closed})
      end.
  %% Collects the results of a batch until `done' shows up and returns them in
  %% arrival order. `Results' holds the results seen so far, newest first.
  %%
  %% Returns `{error, closed}' on its own, discarding anything collected so
  %% far, as soon as receive_extended_result/3 reports a closed connection.
  %% receive_extended_result/3 returns that term rather than throwing it, so
  %% without the explicit clause it would be accumulated like a normal result
  %% and the loop would wait for a `done' message that the closed connection
  %% never sends, with no timeout.
  receive_extended_results(C, Ref, Results) ->
      try receive_extended_result(C, Ref, []) of
          done ->
              lists:reverse(Results);
          {error, closed} = Closed ->
              Closed;
          R ->
              receive_extended_results(C, Ref, [R | Results])
      catch
          throw:E -> E
      end.

  %% Assembles one extended-protocol result from the messages tagged with `C'
  %% and `Ref'. `Rows' holds the data rows received so far, newest first.
  %%
  %% Returns
  %%   {partial, Rows}     - `suspended' was received
  %%   {ok, Count}         - completion with a count and no rows
  %%   {ok, Count, Rows}   - completion with a count and at least one row
  %%   {ok, Rows}          - completion without a count
  %%   {error, _}          - error reported for this request
  %%   done                - no further results for this request
  %%   {error, closed}     - an 'EXIT' message from `C' was received
  receive_extended_result(C, Ref, Rows) ->
      receive
          {C, Ref, {data, Row}} ->
              receive_extended_result(C, Ref, [Row | Rows]);
          {C, Ref, {error, _E} = Error} ->
              Error;
          {C, Ref, suspended} ->
              {partial, lists:reverse(Rows)};
          {C, Ref, {complete, {_Type, Count}}} ->
              case Rows of
                  [] -> {ok, Count};
                  _L -> {ok, Count, lists:reverse(Rows)}
              end;
          {C, Ref, {complete, _Type}} ->
              {ok, lists:reverse(Rows)};
          {C, Ref, done} ->
              done;
          {'EXIT', C, _Reason} ->
              {error, closed}
      end.
  %% Builds up a statement description from the messages tagged with `C' and
  %% `Ref'.
  %%
  %% A `types' message fills the `types' field and keeps waiting; a `columns'
  %% or `no_data' message fills the `columns' field (with [] for `no_data')
  %% and finishes with `{ok, Statement}'. An error message yields its
  %% `{error, _}' term, an 'EXIT' message from `C' yields `{error, closed}'.
  receive_describe(C, Ref, #statement{} = Stmt) ->
      receive
          {C, Ref, {types, ParamTypes}} ->
              WithTypes = Stmt#statement{types = ParamTypes},
              receive_describe(C, Ref, WithTypes);
          {C, Ref, {columns, ResultCols}} ->
              {ok, Stmt#statement{columns = ResultCols}};
          {C, Ref, no_data} ->
              {ok, Stmt#statement{columns = []}};
          {C, Ref, {error, _} = Failure} ->
              Failure;
          {'EXIT', C, _Why} ->
              {error, closed}
      end.
  %% Waits for the description of a portal on the messages tagged with `C'
  %% and `Ref'.
  %%
  %% Returns `{ok, Columns}' for a `columns' message, `{ok, []}' for
  %% `no_data', the `{error, _}' term of an error message, or
  %% `{error, closed}' when an 'EXIT' message from `C' is received.
  receive_describe_portal(C, Ref) ->
      receive
          {C, Ref, {columns, ResultCols}} ->
              {ok, ResultCols};
          {C, Ref, no_data} ->
              {ok, []};
          {C, Ref, {error, _} = Failure} ->
              Failure;
          {'EXIT', C, _Why} ->
              {error, closed}
      end.
  %% Waits for a specific acknowledgement on the messages tagged with `C' and
  %% `Ref'.
  %%
  %% Returns `Reply' once the payload equal to `Expected' arrives, the
  %% `{error, _}' term of an error message, or `{error, closed}' when an
  %% 'EXIT' message from `C' is received.
  receive_atom(C, Ref, Expected, Reply) ->
      receive
          {C, Ref, Expected} ->
              Reply;
          {C, Ref, {error, _} = Failure} ->
              Failure;
          {'EXIT', C, _Why} ->
              {error, closed}
      end.
  %% Passes a result through, issuing a sync on the connection first when the
  %% result is an error.
  %%
  %% `{error, closed}' is returned without syncing: the receive helpers in
  %% this module produce it after consuming the connection's 'EXIT' message,
  %% so a sync request would never be acknowledged and the wait in
  %% receive_atom/4, which has no timeout, would block forever.
  %%
  %% Any other `{error, _}' triggers `epgsqli:sync/1' and waits for its
  %% acknowledgement before the original error is returned. Non-error results
  %% are returned untouched.
  sync_on_error(_C, {error, closed} = Error) ->
      Error;
  sync_on_error(C, Error = {error, _}) ->
      Ref = epgsqli:sync(C),
      %% The outcome of the sync itself is deliberately ignored; the caller
      %% is interested in the original error.
      _ = receive_atom(C, Ref, ok, ok),
      Error;
  sync_on_error(_C, R) ->
      R.