epgsql_incremental.erl 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. %%%
  4. %%% Emulates original epgsql API over epgsqli for original tests
  5. -module(epgsql_incremental).
  6. -export([connect/1, connect/2, connect/3, connect/4, close/1]).
  7. -export([get_parameter/2, set_notice_receiver/2, squery/2, equery/2, equery/3]).
  8. -export([prepared_query/3]).
  9. -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
  10. -export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2]).
  11. -export([close/2, close/3, sync/1]).
  12. -export([with_transaction/2]).
  13. -include("epgsql.hrl").
  14. %% -- client interface --
  15. connect(Opts) ->
  16. Ref = epgsqli:connect(Opts),
  17. await_connect(Ref).
  18. connect(Host, Opts) ->
  19. Ref = epgsqli:connect(Host, Opts),
  20. await_connect(Ref).
  21. connect(Host, Username, Opts) ->
  22. Ref = epgsqli:connect(Host, Username, Opts),
  23. await_connect(Ref).
  24. connect(Host, Username, Password, Opts) ->
  25. Ref = epgsqli:connect(Host, Username, Password, Opts),
  26. await_connect(Ref).
  27. await_connect(Ref) ->
  28. receive
  29. {C, Ref, connected} ->
  30. {ok, C};
  31. {_C, Ref, Error = {error, _}} ->
  32. Error;
  33. {'EXIT', _C, _Reason} ->
  34. {error, closed}
  35. end.
  36. close(C) ->
  37. epgsqli:close(C).
  38. get_parameter(C, Name) ->
  39. epgsqli:get_parameter(C, Name).
  40. set_notice_receiver(C, PidOrName) ->
  41. epgsqli:set_notice_receiver(C, PidOrName).
  42. squery(C, Sql) ->
  43. Ref = epgsqli:squery(C, Sql),
  44. case receive_results(C, Ref, []) of
  45. [Result] -> Result;
  46. Results -> Results
  47. end.
  48. equery(C, Sql) ->
  49. equery(C, Sql, []).
  50. equery(C, Sql, Parameters) ->
  51. case parse(C, Sql) of
  52. {ok, #statement{types = Types} = S} ->
  53. Typed_Parameters = lists:zip(Types, Parameters),
  54. Ref = epgsqli:equery(C, S, Typed_Parameters),
  55. receive_result(C, Ref, undefined);
  56. Error ->
  57. Error
  58. end.
  59. prepared_query(C, Name, Parameters) ->
  60. case describe(C, statement, Name) of
  61. {ok, #statement{types = Types} = S} ->
  62. Typed_Parameters = lists:zip(Types, Parameters),
  63. Ref = epgsqli:prepared_query(C, S, Typed_Parameters),
  64. receive_result(C, Ref, undefined);
  65. Error ->
  66. Error
  67. end.
  68. %% parse
  69. parse(C, Sql) ->
  70. parse(C, "", Sql, []).
  71. parse(C, Sql, Types) ->
  72. parse(C, "", Sql, Types).
  73. parse(C, Name, Sql, Types) ->
  74. Ref = epgsqli:parse(C, Name, Sql, Types),
  75. sync_on_error(C, receive_describe(C, Ref, #statement{name = Name})).
  76. %% bind
  77. bind(C, Statement, Parameters) ->
  78. bind(C, Statement, "", Parameters).
  79. bind(C, Statement, PortalName, Parameters) ->
  80. Ref = epgsqli:bind(C, Statement, PortalName, Parameters),
  81. sync_on_error(C, receive_atom(C, Ref, ok, ok)).
  82. %% execute
  83. execute(C, S) ->
  84. execute(C, S, "", 0).
  85. execute(C, S, N) ->
  86. execute(C, S, "", N).
  87. execute(C, S, PortalName, N) ->
  88. Ref = epgsqli:execute(C, S, PortalName, N),
  89. receive_extended_result(C, Ref, []).
  90. execute_batch(C, Batch) ->
  91. Ref = epgsqli:execute_batch(C, Batch),
  92. receive_extended_results(C, Ref, []).
  93. %% statement/portal functions
  94. describe(C, #statement{name = Name}) ->
  95. describe(C, statement, Name).
  96. describe(C, statement, Name) ->
  97. Ref = epgsqli:describe(C, statement, Name),
  98. sync_on_error(C, receive_describe(C, Ref, #statement{name = Name}));
  99. describe(C, Type, Name) ->
  100. %% TODO unknown result format of Describe portal
  101. epgsqli:describe(C, Type, Name).
  102. close(C, #statement{name = Name}) ->
  103. close(C, statement, Name).
  104. close(C, Type, Name) ->
  105. Ref = epgsqli:close(C, Type, Name),
  106. receive_atom(C, Ref, ok, ok).
  107. sync(C) ->
  108. Ref = epgsqli:sync(C),
  109. receive_atom(C, Ref, ok, ok).
  110. %% misc helper functions
  111. with_transaction(C, F) ->
  112. try {ok, [], []} = squery(C, "BEGIN"),
  113. R = F(C),
  114. {ok, [], []} = squery(C, "COMMIT"),
  115. R
  116. catch
  117. _:Why ->
  118. squery(C, "ROLLBACK"),
  119. %% TODO hides error stacktrace
  120. {rollback, Why}
  121. end.
  122. %% -- internal functions --
  123. receive_result(C, Ref, Result) ->
  124. try receive_result(C, Ref, [], []) of
  125. done -> Result;
  126. R -> receive_result(C, Ref, R)
  127. catch
  128. throw:E -> E
  129. end.
  130. receive_results(C, Ref, Results) ->
  131. try receive_result(C, Ref, [], []) of
  132. done -> lists:reverse(Results);
  133. R -> receive_results(C, Ref, [R | Results])
  134. catch
  135. throw:E -> E
  136. end.
  137. receive_result(C, Ref, Cols, Rows) ->
  138. receive
  139. {C, Ref, {columns, Cols2}} ->
  140. receive_result(C, Ref, Cols2, Rows);
  141. {C, Ref, {data, Row}} ->
  142. receive_result(C, Ref, Cols, [Row | Rows]);
  143. {C, Ref, {error, _E} = Error} ->
  144. Error;
  145. {C, Ref, {complete, {_Type, Count}}} ->
  146. case Rows of
  147. [] -> {ok, Count};
  148. _L -> {ok, Count, Cols, lists:reverse(Rows)}
  149. end;
  150. {C, Ref, {complete, _Type}} ->
  151. {ok, Cols, lists:reverse(Rows)};
  152. {C, Ref, done} ->
  153. done;
  154. {'EXIT', C, _Reason} ->
  155. throw({error, closed})
  156. end.
  157. receive_extended_results(C, Ref, Results) ->
  158. try receive_extended_result(C, Ref, []) of
  159. done -> lists:reverse(Results);
  160. R -> receive_extended_results(C, Ref, [R | Results])
  161. catch
  162. throw:E -> E
  163. end.
  164. receive_extended_result(C, Ref, Rows) ->
  165. receive
  166. {C, Ref, {data, Row}} ->
  167. receive_extended_result(C, Ref, [Row | Rows]);
  168. {C, Ref, {error, _E} = Error} ->
  169. Error;
  170. {C, Ref, suspended} ->
  171. {partial, lists:reverse(Rows)};
  172. {C, Ref, {complete, {_Type, Count}}} ->
  173. case Rows of
  174. [] -> {ok, Count};
  175. _L -> {ok, Count, lists:reverse(Rows)}
  176. end;
  177. {C, Ref, {complete, _Type}} ->
  178. {ok, lists:reverse(Rows)};
  179. {C, Ref, done} ->
  180. done;
  181. {'EXIT', C, _Reason} ->
  182. {error, closed}
  183. end.
  184. receive_describe(C, Ref, Statement = #statement{}) ->
  185. receive
  186. {C, Ref, {types, Types}} ->
  187. receive_describe(C, Ref, Statement#statement{types = Types});
  188. {C, Ref, {columns, Columns}} ->
  189. Columns2 = [Col#column{format = epgsql_wire:format(Col#column.type)} || Col <- Columns],
  190. {ok, Statement#statement{columns = Columns2}};
  191. {C, Ref, no_data} ->
  192. {ok, Statement#statement{columns = []}};
  193. {C, Ref, Error = {error, _}} ->
  194. Error;
  195. {'EXIT', C, _Reason} ->
  196. {error, closed}
  197. end.
  198. receive_atom(C, Ref, Receive, Return) ->
  199. receive
  200. {C, Ref, Receive} ->
  201. Return;
  202. {C, Ref, Error = {error, _}} ->
  203. Error;
  204. {'EXIT', C, _Reason} ->
  205. {error, closed}
  206. end.
  207. sync_on_error(C, Error = {error, _}) ->
  208. Ref = epgsqli:sync(C),
  209. receive_atom(C, Ref, ok, ok),
  210. Error;
  211. sync_on_error(_C, R) ->
  212. R.