epgsql_incremental.erl 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. %%%
  4. %%% Emulates original epgsql API over epgsqli for original tests
  5. -module(epgsql_incremental).
  6. -export([connect/1, connect/2, connect/3, connect/4, close/1]).
  7. -export([get_parameter/2, set_notice_receiver/2, get_cmd_status/1, squery/2, equery/2, equery/3]).
  8. -export([prepared_query/3]).
  9. -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
  10. -export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2]).
  11. -export([close/2, close/3, sync/1]).
  12. -export([with_transaction/2]).
  13. -include("epgsql.hrl").
  14. %% -- client interface --
  %% Blocking connect. Each arity forwards its arguments to the
  %% epgsqli:connect function of the same arity, then waits in
  %% await_connect/1 for the outcome tagged with the returned reference.
  %% Returns {ok, Connection} or {error, Reason}.
  connect(Opts) ->
      await_connect(epgsqli:connect(Opts)).

  connect(Host, Opts) ->
      await_connect(epgsqli:connect(Host, Opts)).

  connect(Host, Username, Opts) ->
      await_connect(epgsqli:connect(Host, Username, Opts)).

  connect(Host, Username, Password, Opts) ->
      await_connect(epgsqli:connect(Host, Username, Password, Opts)).
  %% Wait for the outcome of a connection attempt identified by Ref.
  %%   {Conn, Ref, connected}       -> {ok, Conn}
  %%   {_, Ref, {error, _}}         -> that error tuple
  %%   {'EXIT', _, _}               -> {error, closed}
  %% The connection process is not known yet at this point, so the 'EXIT'
  %% clause accepts an exit message from any process.
  await_connect(Ref) ->
      receive
          {Conn, Ref, connected} ->
              {ok, Conn};
          {_Conn, Ref, {error, _} = Failure} ->
              Failure;
          {'EXIT', _Pid, _Why} ->
              {error, closed}
      end.
  %% Close the connection; delegates to epgsqli:close/1 and returns its result.
  close(Conn) ->
      epgsqli:close(Conn).
  %% Look up the parameter Name on the connection; delegates to
  %% epgsqli:get_parameter/2 and returns its result unchanged.
  get_parameter(Conn, Name) ->
      epgsqli:get_parameter(Conn, Name).
  %% Register PidOrName as the notice receiver for the connection; delegates
  %% to epgsqli:set_notice_receiver/2 and returns its result unchanged.
  set_notice_receiver(Conn, PidOrName) ->
      epgsqli:set_notice_receiver(Conn, PidOrName).
  %% Fetch the command status of the connection; delegates to
  %% epgsqli:get_cmd_status/1 and returns its result unchanged.
  get_cmd_status(Conn) ->
      epgsqli:get_cmd_status(Conn).
  %% Run Sql through epgsqli:squery/2 and collect everything delivered for
  %% the request with receive_results/3. A list holding exactly one result is
  %% unwrapped to that result; any other value (a list of several results, an
  %% empty list, or {error, closed}) is returned as is.
  squery(Conn, Sql) ->
      squery_unwrap(receive_results(Conn, epgsqli:squery(Conn, Sql), [])).

  %% Unwrap a single-element result list, pass everything else through.
  squery_unwrap([Only]) -> Only;
  squery_unwrap(Other) -> Other.
  %% Extended query without parameters.
  equery(Conn, Sql) ->
      equery(Conn, Sql, []).

  %% Extended query: parse Sql with parse/2, pair every parameter with the
  %% type reported for it by the parse step, and run the statement through
  %% epgsqli:equery/3. Returns the result gathered by receive_result/3, or
  %% whatever parse/2 returned when that was not {ok, #statement{}}.
  %% lists:zip/2 requires Parameters to have as many elements as the
  %% statement has parameter types.
  equery(Conn, Sql, Parameters) ->
      equery_parsed(Conn, parse(Conn, Sql), Parameters).

  %% Continue equery/3 once the outcome of the parse step is known.
  equery_parsed(Conn, {ok, #statement{types = Types} = Stmt}, Parameters) ->
      Ref = epgsqli:equery(Conn, Stmt, lists:zip(Types, Parameters)),
      receive_result(Conn, Ref, undefined);
  equery_parsed(_Conn, NotParsed, _Parameters) ->
      NotParsed.
  %% Run the already prepared statement called Name. The statement is
  %% described first to learn its parameter types, each parameter is paired
  %% with its type, and the query is sent through epgsqli:prepared_query/3.
  %% Returns the result gathered by receive_result/3, or the describe outcome
  %% when that was not {ok, #statement{}}.
  prepared_query(Conn, Name, Parameters) ->
      case describe(Conn, statement, Name) of
          {ok, Stmt = #statement{types = ParamTypes}} ->
              TypedParams = lists:zip(ParamTypes, Parameters),
              ReqRef = epgsqli:prepared_query(Conn, Stmt, TypedParams),
              receive_result(Conn, ReqRef, undefined);
          Failure ->
              Failure
      end.
  70. %% parse
  %% Parse Sql as the unnamed statement ("") with no explicit parameter types.
  parse(Conn, Sql) ->
      parse(Conn, Sql, []).

  %% Parse Sql as the unnamed statement ("") with the given parameter types.
  parse(Conn, Sql, Types) ->
      parse(Conn, "", Sql, Types).

  %% Parse Sql under Name via epgsqli:parse/4 and wait for its description.
  %% Returns {ok, #statement{}} or {error, Reason}; on an error a sync is
  %% issued (see sync_on_error/2) before the error is returned.
  parse(Conn, Name, Sql, Types) ->
      ReqRef = epgsqli:parse(Conn, Name, Sql, Types),
      Described = receive_describe(Conn, ReqRef, #statement{name = Name}),
      sync_on_error(Conn, Described).
  78. %% bind
  %% Bind Parameters to Statement using the unnamed portal ("").
  bind(Conn, Statement, Parameters) ->
      bind(Conn, Statement, "", Parameters).

  %% Bind Parameters to Statement in the portal PortalName via
  %% epgsqli:bind/4. Returns ok or {error, Reason}; on an error a sync is
  %% issued (see sync_on_error/2) before the error is returned.
  bind(Conn, Statement, PortalName, Parameters) ->
      ReqRef = epgsqli:bind(Conn, Statement, PortalName, Parameters),
      Outcome = receive_atom(Conn, ReqRef, ok, ok),
      sync_on_error(Conn, Outcome).
  84. %% execute
  %% Execute statement S on the unnamed portal ("") with a row limit of 0.
  execute(Conn, S) ->
      execute(Conn, S, 0).

  %% Execute statement S on the unnamed portal ("") with row limit N.
  execute(Conn, S, N) ->
      execute(Conn, S, "", N).

  %% Execute statement S on portal PortalName with row limit N via
  %% epgsqli:execute/4 and return the single result collected by
  %% receive_extended_result/3.
  execute(Conn, S, PortalName, N) ->
      ReqRef = epgsqli:execute(Conn, S, PortalName, N),
      receive_extended_result(Conn, ReqRef, []).
  %% Send Batch through epgsqli:execute_batch/2 and return the results
  %% gathered by receive_extended_results/3.
  execute_batch(Conn, Batch) ->
      receive_extended_results(Conn, epgsqli:execute_batch(Conn, Batch), []).
  95. %% statement/portal functions
  %% Describe the statement whose name is stored in the given #statement{}.
  describe(Conn, #statement{name = StmtName}) ->
      describe(Conn, statement, StmtName).

  %% Describe the object Name of the given kind.
  %% For `statement` the description is awaited and returned as
  %% {ok, #statement{}} or {error, Reason}; on an error a sync is issued
  %% (see sync_on_error/2) first.
  %% For any other kind the return value of epgsqli:describe/3 is handed back
  %% directly, without waiting for any reply.
  describe(Conn, statement, Name) ->
      ReqRef = epgsqli:describe(Conn, statement, Name),
      Described = receive_describe(Conn, ReqRef, #statement{name = Name}),
      sync_on_error(Conn, Described);
  describe(Conn, Kind, Name) ->
      %% TODO unknown result format of Describe portal
      epgsqli:describe(Conn, Kind, Name).
  %% Close the statement whose name is stored in the given #statement{}.
  close(Conn, #statement{name = StmtName}) ->
      close(Conn, statement, StmtName).

  %% Close the object Name of kind Type via epgsqli:close/3 and wait for the
  %% reply. Returns ok, {error, Reason}, or {error, closed} when the
  %% connection process exits first.
  close(Conn, Type, Name) ->
      receive_atom(Conn, epgsqli:close(Conn, Type, Name), ok, ok).
  %% Issue a sync via epgsqli:sync/1 and wait for the reply. Returns ok,
  %% {error, Reason}, or {error, closed} when the connection process exits
  %% first.
  sync(Conn) ->
      receive_atom(Conn, epgsqli:sync(Conn), ok, ok).
  112. %% misc helper functions
  %% Run F(C) between "BEGIN" and "COMMIT" and return F's result.
  %% If BEGIN or COMMIT does not answer {ok, [], []}, or F raises an exception
  %% of any class, "ROLLBACK" is sent and {rollback, Reason} is returned.
  with_transaction(Conn, Fun) ->
      try
          transaction_body(Conn, Fun)
      catch
          _Class:Reason ->
              squery(Conn, "ROLLBACK"),
              %% TODO hides error stacktrace
              {rollback, Reason}
      end.

  %% BEGIN, run the callback, COMMIT; any step that fails raises.
  transaction_body(Conn, Fun) ->
      {ok, [], []} = squery(Conn, "BEGIN"),
      Outcome = Fun(Conn),
      {ok, [], []} = squery(Conn, "COMMIT"),
      Outcome.
  124. %% -- internal functions --
  %% Keep reading results for Ref until `done` arrives and return the most
  %% recent one (or the initial value when `done` is the first thing read).
  %% The {error, closed} thrown by receive_result/4 on connection exit is
  %% caught and returned as a plain value.
  receive_result(Conn, Ref, Latest) ->
      try receive_result(Conn, Ref, [], []) of
          done ->
              Latest;
          Next ->
              receive_result(Conn, Ref, Next)
      catch
          throw:Thrown ->
              Thrown
      end.
  %% Accumulate every result read for Ref until `done` arrives and return
  %% them in arrival order. The {error, closed} thrown by receive_result/4 on
  %% connection exit is caught and returned instead of the list.
  receive_results(Conn, Ref, Acc) ->
      try receive_result(Conn, Ref, [], []) of
          done ->
              lists:reverse(Acc);
          Next ->
              receive_results(Conn, Ref, [Next | Acc])
      catch
          throw:Thrown ->
              Thrown
      end.
  %% Read messages for one result of request Ref on connection Conn.
  %% Columns holds the latest column description and RevRows the rows seen so
  %% far, newest first. Returns
  %%   {ok, Count}                 for {complete, {_, Count}} with no columns
  %%   {ok, Count, Columns, Rows}  for {complete, {_, Count}} with columns
  %%   {ok, Columns, Rows}         for any other {complete, _}
  %%   {error, Reason}             for an error message
  %%   done                        for the `done` message
  %% and throws {error, closed} when Conn's 'EXIT' message is received.
  receive_result(Conn, Ref, Columns, RevRows) ->
      receive
          {Conn, Ref, {columns, NewColumns}} ->
              receive_result(Conn, Ref, NewColumns, RevRows);
          {Conn, Ref, {data, Row}} ->
              receive_result(Conn, Ref, Columns, [Row | RevRows]);
          {Conn, Ref, {error, _} = Failure} ->
              Failure;
          %% Must precede the generic {complete, _} clause below.
          {Conn, Ref, {complete, {_Command, Count}}} ->
              counted_result(Count, Columns, RevRows);
          {Conn, Ref, {complete, _Command}} ->
              {ok, Columns, lists:reverse(RevRows)};
          {Conn, Ref, done} ->
              done;
          {'EXIT', Conn, _Why} ->
              throw({error, closed})
      end.

  %% Shape the result of a completed command that reported a row count.
  counted_result(Count, [], _RevRows) ->
      {ok, Count};
  counted_result(Count, Columns, RevRows) ->
      {ok, Count, Columns, lists:reverse(RevRows)}.
  %% Accumulate every extended-protocol result read for Ref until `done`
  %% arrives and return them in arrival order.
  %%
  %% receive_extended_result/3 reports a dead connection by RETURNING
  %% {error, closed} (it does not throw). Once that happens no `done` message
  %% will ever follow, so the loop must stop right there; otherwise the caller
  %% would block in `receive` forever. In that case {error, closed} is
  %% returned on its own, mirroring what receive_results/3 does.
  receive_extended_results(C, Ref, Results) ->
      case receive_extended_result(C, Ref, []) of
          done ->
              lists:reverse(Results);
          {error, closed} = Closed ->
              Closed;
          R ->
              receive_extended_results(C, Ref, [R | Results])
      end.

  %% Read messages for one extended-protocol result of request Ref on
  %% connection C. Rows holds the rows seen so far, newest first. Returns
  %%   {partial, Rows}     for `suspended`
  %%   {ok, Count}         for {complete, {_, Count}} with no rows
  %%   {ok, Count, Rows}   for {complete, {_, Count}} with rows
  %%   {ok, Rows}          for any other {complete, _}
  %%   {error, Reason}     for an error message
  %%   done                for the `done` message
  %%   {error, closed}     when C's 'EXIT' message is received
  receive_extended_result(C, Ref, Rows) ->
      receive
          {C, Ref, {data, Row}} ->
              receive_extended_result(C, Ref, [Row | Rows]);
          {C, Ref, {error, _E} = Error} ->
              Error;
          {C, Ref, suspended} ->
              {partial, lists:reverse(Rows)};
          %% Must precede the generic {complete, _} clause below.
          {C, Ref, {complete, {_Type, Count}}} ->
              case Rows of
                  [] -> {ok, Count};
                  _L -> {ok, Count, lists:reverse(Rows)}
              end;
          {C, Ref, {complete, _Type}} ->
              {ok, lists:reverse(Rows)};
          {C, Ref, done} ->
              done;
          {'EXIT', C, _Reason} ->
              {error, closed}
      end.
  %% Build up a #statement{} from the description messages of request Ref.
  %% A {types, _} message fills in the parameter types and reading continues;
  %% a {columns, _} message stores the columns (each with its `format` field
  %% set from epgsql_wire:format/1 applied to the column type) and finishes;
  %% `no_data` finishes with an empty column list. Returns
  %% {ok, #statement{}}, {error, Reason}, or {error, closed} when the
  %% connection's 'EXIT' message is received.
  receive_describe(Conn, Ref, Stmt = #statement{}) ->
      receive
          {Conn, Ref, {types, ParamTypes}} ->
              receive_describe(Conn, Ref, Stmt#statement{types = ParamTypes});
          {Conn, Ref, {columns, RawColumns}} ->
              Formatted = [column_with_format(Column) || Column <- RawColumns],
              {ok, Stmt#statement{columns = Formatted}};
          {Conn, Ref, no_data} ->
              {ok, Stmt#statement{columns = []}};
          {Conn, Ref, {error, _} = Failure} ->
              Failure;
          {'EXIT', Conn, _Why} ->
              {error, closed}
      end.

  %% Set a column's `format` from its type.
  column_with_format(#column{type = Type} = Column) ->
      Column#column{format = epgsql_wire:format(Type)}.
  %% Wait for a single reply to request Ref on connection Conn.
  %% Returns Return when the reply equals Expected, the error tuple when the
  %% reply is {error, _}, and {error, closed} when Conn's 'EXIT' message is
  %% received.
  receive_atom(Conn, Ref, Expected, Return) ->
      receive
          {Conn, Ref, Expected} ->
              Return;
          {Conn, Ref, {error, _} = Failure} ->
              Failure;
          {'EXIT', Conn, _Why} ->
              {error, closed}
      end.
  %% Pass a step's outcome through unchanged. When the outcome is
  %% {error, _}, first issue a sync via epgsqli:sync/1 and wait for its
  %% reply; the sync's own outcome is ignored.
  sync_on_error(Conn, {error, _} = Failure) ->
      receive_atom(Conn, epgsqli:sync(Conn), ok, ok),
      Failure;
  sync_on_error(_Conn, Outcome) ->
      Outcome.