epgsql_incremental.erl 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. %%%
  4. %%% Emulates original epgsql API over epgsqli for original tests
  5. -module(epgsql_incremental).
  6. -export([connect/2, connect/3, connect/4, close/1]).
  7. -export([get_parameter/2, set_notice_receiver/2, squery/2, equery/2, equery/3]).
  8. -export([prepared_query/3]).
  9. -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
  10. -export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2]).
  11. -export([close/2, close/3, sync/1]).
  12. -export([with_transaction/2]).
  13. -include("epgsql.hrl").
  14. %% -- client interface --
  15. connect(Host, Opts) ->
  16. connect(Host, os:getenv("USER"), "", Opts).
  17. connect(Host, Username, Opts) ->
  18. connect(Host, Username, "", Opts).
  19. connect(Host, Username, Password, Opts) ->
  20. {ok, C} = epgsql_sock:start_link(),
  21. Ref = epgsqli:connect(C, Host, Username, Password, Opts),
  22. receive
  23. {C, Ref, connected} ->
  24. {ok, C};
  25. {C, Ref, Error = {error, _}} ->
  26. Error;
  27. {'EXIT', C, _Reason} ->
  28. {error, closed}
  29. end.
  30. close(C) ->
  31. epgsqli:close(C).
  32. get_parameter(C, Name) ->
  33. epgsqli:get_parameter(C, Name).
  34. set_notice_receiver(C, PidOrName) ->
  35. epgsqli:set_notice_receiver(C, PidOrName).
  36. squery(C, Sql) ->
  37. Ref = epgsqli:squery(C, Sql),
  38. case receive_results(C, Ref, []) of
  39. [Result] -> Result;
  40. Results -> Results
  41. end.
  42. equery(C, Sql) ->
  43. equery(C, Sql, []).
  44. equery(C, Sql, Parameters) ->
  45. case parse(C, Sql) of
  46. {ok, #statement{types = Types} = S} ->
  47. Typed_Parameters = lists:zip(Types, Parameters),
  48. Ref = epgsqli:equery(C, S, Typed_Parameters),
  49. receive_result(C, Ref, undefined);
  50. Error ->
  51. Error
  52. end.
  53. prepared_query(C, Name, Parameters) ->
  54. case describe(C, statement, Name) of
  55. {ok, #statement{types = Types} = S} ->
  56. Typed_Parameters = lists:zip(Types, Parameters),
  57. Ref = epgsqli:prepared_query(C, S, Typed_Parameters),
  58. receive_result(C, Ref, undefined);
  59. Error ->
  60. Error
  61. end.
  62. %% parse
  63. parse(C, Sql) ->
  64. parse(C, "", Sql, []).
  65. parse(C, Sql, Types) ->
  66. parse(C, "", Sql, Types).
  67. parse(C, Name, Sql, Types) ->
  68. Ref = epgsqli:parse(C, Name, Sql, Types),
  69. sync_on_error(C, receive_describe(C, Ref, #statement{name = Name})).
  70. %% bind
  71. bind(C, Statement, Parameters) ->
  72. bind(C, Statement, "", Parameters).
  73. bind(C, Statement, PortalName, Parameters) ->
  74. Ref = epgsqli:bind(C, Statement, PortalName, Parameters),
  75. sync_on_error(C, receive_atom(C, Ref, ok, ok)).
  76. %% execute
  77. execute(C, S) ->
  78. execute(C, S, "", 0).
  79. execute(C, S, N) ->
  80. execute(C, S, "", N).
  81. execute(C, S, PortalName, N) ->
  82. Ref = epgsqli:execute(C, S, PortalName, N),
  83. receive_extended_result(C, Ref, []).
  84. execute_batch(C, Batch) ->
  85. Ref = epgsqli:execute_batch(C, Batch),
  86. receive_extended_results(C, Ref, []).
  87. %% statement/portal functions
  88. describe(C, #statement{name = Name}) ->
  89. describe(C, statement, Name).
  90. describe(C, statement, Name) ->
  91. Ref = epgsqli:describe(C, statement, Name),
  92. sync_on_error(C, receive_describe(C, Ref, #statement{name = Name}));
  93. describe(C, Type, Name) ->
  94. %% TODO unknown result format of Describe portal
  95. epgsqli:describe(C, Type, Name).
  96. close(C, #statement{name = Name}) ->
  97. close(C, statement, Name).
  98. close(C, Type, Name) ->
  99. Ref = epgsqli:close(C, Type, Name),
  100. receive_atom(C, Ref, ok, ok).
  101. sync(C) ->
  102. Ref = epgsqli:sync(C),
  103. receive_atom(C, Ref, ok, ok).
  104. %% misc helper functions
  105. with_transaction(C, F) ->
  106. try {ok, [], []} = squery(C, "BEGIN"),
  107. R = F(C),
  108. {ok, [], []} = squery(C, "COMMIT"),
  109. R
  110. catch
  111. _:Why ->
  112. squery(C, "ROLLBACK"),
  113. %% TODO hides error stacktrace
  114. {rollback, Why}
  115. end.
  116. %% -- internal functions --
  117. receive_result(C, Ref, Result) ->
  118. try receive_result(C, Ref, [], []) of
  119. done -> Result;
  120. R -> receive_result(C, Ref, R)
  121. catch
  122. throw:E -> E
  123. end.
  124. receive_results(C, Ref, Results) ->
  125. try receive_result(C, Ref, [], []) of
  126. done -> lists:reverse(Results);
  127. R -> receive_results(C, Ref, [R | Results])
  128. catch
  129. throw:E -> E
  130. end.
  131. receive_result(C, Ref, Cols, Rows) ->
  132. receive
  133. {C, Ref, {columns, Cols2}} ->
  134. receive_result(C, Ref, Cols2, Rows);
  135. {C, Ref, {data, Row}} ->
  136. receive_result(C, Ref, Cols, [Row | Rows]);
  137. {C, Ref, {error, _E} = Error} ->
  138. Error;
  139. {C, Ref, {complete, {_Type, Count}}} ->
  140. case Rows of
  141. [] -> {ok, Count};
  142. _L -> {ok, Count, Cols, lists:reverse(Rows)}
  143. end;
  144. {C, Ref, {complete, _Type}} ->
  145. {ok, Cols, lists:reverse(Rows)};
  146. {C, Ref, done} ->
  147. done;
  148. {'EXIT', C, _Reason} ->
  149. throw({error, closed})
  150. end.
  151. receive_extended_results(C, Ref, Results) ->
  152. try receive_extended_result(C, Ref, []) of
  153. done -> lists:reverse(Results);
  154. R -> receive_extended_results(C, Ref, [R | Results])
  155. catch
  156. throw:E -> E
  157. end.
  158. receive_extended_result(C, Ref, Rows) ->
  159. receive
  160. {C, Ref, {data, Row}} ->
  161. receive_extended_result(C, Ref, [Row | Rows]);
  162. {C, Ref, {error, _E} = Error} ->
  163. Error;
  164. {C, Ref, suspended} ->
  165. {partial, lists:reverse(Rows)};
  166. {C, Ref, {complete, {_Type, Count}}} ->
  167. case Rows of
  168. [] -> {ok, Count};
  169. _L -> {ok, Count, lists:reverse(Rows)}
  170. end;
  171. {C, Ref, {complete, _Type}} ->
  172. {ok, lists:reverse(Rows)};
  173. {C, Ref, done} ->
  174. done;
  175. {'EXIT', C, _Reason} ->
  176. {error, closed}
  177. end.
  178. receive_describe(C, Ref, Statement = #statement{}) ->
  179. receive
  180. {C, Ref, {types, Types}} ->
  181. receive_describe(C, Ref, Statement#statement{types = Types});
  182. {C, Ref, {columns, Columns}} ->
  183. Columns2 = [Col#column{format = epgsql_wire:format(Col#column.type)} || Col <- Columns],
  184. {ok, Statement#statement{columns = Columns2}};
  185. {C, Ref, no_data} ->
  186. {ok, Statement#statement{columns = []}};
  187. {C, Ref, Error = {error, _}} ->
  188. Error;
  189. {'EXIT', C, _Reason} ->
  190. {error, closed}
  191. end.
  192. receive_atom(C, Ref, Receive, Return) ->
  193. receive
  194. {C, Ref, Receive} ->
  195. Return;
  196. {C, Ref, Error = {error, _}} ->
  197. Error;
  198. {'EXIT', C, _Reason} ->
  199. {error, closed}
  200. end.
  201. sync_on_error(C, Error = {error, _}) ->
  202. Ref = epgsqli:sync(C),
  203. receive_atom(C, Ref, ok, ok),
  204. Error;
  205. sync_on_error(_C, R) ->
  206. R.