epgsql_incremental.erl 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. %%%
  4. %%% Emulates original epgsql API over epgsqli for original tests
  5. -module(epgsql_incremental).
  6. -export([connect/2, connect/3, connect/4, close/1]).
  7. -export([get_parameter/2, squery/2, equery/2, equery/3]).
  8. -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
  9. -export([bind/3, bind/4, execute/2, execute/3, execute/4, execute_batch/2]).
  10. -export([close/2, close/3, sync/1]).
  11. -export([with_transaction/2]).
  12. -include("epgsql.hrl").
  13. %% -- client interface --
  %% Connect to Host using the value of the USER environment variable as the
  %% username and an empty password.
  %% NOTE(review): os:getenv/1 yields `false' when USER is unset; that value is
  %% forwarded unchanged as the username.
  connect(Host, Opts) ->
      EnvUser = os:getenv("USER"),
      connect(Host, EnvUser, "", Opts).

  %% Connect to Host as Username with an empty password.
  connect(Host, Username, Opts) ->
      connect(Host, Username, "", Opts).

  %% Start a socket process with epgsql_sock:start_link/0, issue an
  %% epgsqli:connect/5 request on it and block until a reply for that request
  %% (or an 'EXIT' message from the socket process) is received.
  %%
  %% Returns {ok, Sock} on `connected', the {error, _} tuple delivered for the
  %% request, or {error, closed} when an 'EXIT' message from Sock arrives.
  connect(Host, Username, Password, Opts) ->
      {ok, Sock} = epgsql_sock:start_link(),
      ConnectRef = epgsqli:connect(Sock, Host, Username, Password, Opts),
      receive
          {'EXIT', Sock, _Reason} ->
              {error, closed};
          {Sock, ConnectRef, {error, _} = Failure} ->
              Failure;
          {Sock, ConnectRef, connected} ->
              {ok, Sock}
      end.
  %% Close connection Conn; delegates to epgsqli:close/1 and returns whatever
  %% that call returns.
  close(Conn) ->
      epgsqli:close(Conn).
  %% Look up the parameter Name on connection Conn; delegates to
  %% epgsqli:get_parameter/2 and returns its result unchanged.
  get_parameter(Conn, Name) ->
      epgsqli:get_parameter(Conn, Name).
  %% Run Sql through epgsqli:squery/2 and gather every result reported for the
  %% request. When exactly one result was collected it is returned bare;
  %% otherwise whatever receive_results/3 produced is returned as is.
  squery(C, Sql) ->
      Outcome = receive_results(C, epgsqli:squery(C, Sql), []),
      case Outcome of
          [Single] -> Single;
          _ -> Outcome
      end.
  %% Extended query without parameters.
  equery(C, Sql) ->
      equery(C, Sql, []).

  %% Extended query: parse Sql as an unnamed statement, pair each parameter
  %% with the type reported for it and submit the request via epgsqli:equery/3.
  %% Returns the last result received before `done' (`undefined' if there was
  %% none), or whatever parse/2 returned when it was not {ok, #statement{}}.
  %% lists:zip/2 requires Parameters to have as many elements as the
  %% statement has types.
  equery(C, Sql, Parameters) ->
      case parse(C, Sql) of
          {ok, Stmt = #statement{types = ParamTypes}} ->
              Ref = epgsqli:equery(C, Stmt, lists:zip(ParamTypes, Parameters)),
              receive_result(C, Ref, undefined);
          ParseFailure ->
              ParseFailure
      end.
  50. %% parse
  %% Parse Sql as the unnamed statement with no explicit parameter types.
  parse(C, Sql) ->
      parse(C, "", Sql, []).

  %% Parse Sql as the unnamed statement with the given parameter Types.
  parse(C, Sql, Types) ->
      parse(C, "", Sql, Types).

  %% Parse Sql under Name via epgsqli:parse/4 and wait for its description.
  %% Returns {ok, #statement{}} or an {error, _} tuple; on error a sync is
  %% issued (see sync_on_error/2) before the error is returned.
  parse(C, Name, Sql, Types) ->
      Ref = epgsqli:parse(C, Name, Sql, Types),
      Described = receive_describe(C, Ref, #statement{name = Name}),
      sync_on_error(C, Described).
  58. %% bind
  %% Bind Parameters to Statement using the unnamed portal.
  bind(C, Statement, Parameters) ->
      bind(C, Statement, "", Parameters).

  %% Bind Parameters to Statement under PortalName via epgsqli:bind/4.
  %% Returns ok, or an {error, _} tuple after issuing a sync
  %% (see sync_on_error/2).
  bind(C, Statement, PortalName, Parameters) ->
      Ref = epgsqli:bind(C, Statement, PortalName, Parameters),
      Reply = receive_atom(C, Ref, ok, ok),
      sync_on_error(C, Reply).
  64. %% execute
  %% Execute S on the unnamed portal, passing 0 as the row limit argument.
  execute(C, S) ->
      execute(C, S, "", 0).

  %% Execute S on the unnamed portal, passing N as the row limit argument.
  execute(C, S, N) ->
      execute(C, S, "", N).

  %% Execute S on PortalName through epgsqli:execute/4 and return the single
  %% extended-protocol result gathered by receive_extended_result/3.
  execute(C, S, PortalName, N) ->
      receive_extended_result(C, epgsqli:execute(C, S, PortalName, N), []).
  %% Submit Batch through epgsqli:execute_batch/2 and collect the results
  %% reported for the request, in arrival order, until `done' is received.
  execute_batch(C, Batch) ->
      BatchRef = epgsqli:execute_batch(C, Batch),
      receive_extended_results(C, BatchRef, []).
  75. %% statement/portal functions
  %% Describe a statement given its #statement{} record (only the name is used).
  describe(C, #statement{name = StmtName}) ->
      describe(C, statement, StmtName).

  %% Describe the object Name of the given Type.
  %% For `statement' the reply is awaited and returned as {ok, #statement{}}
  %% or {error, _} (with a sync issued on error). For any other Type the
  %% request is only sent and the result of epgsqli:describe/3 is returned
  %% without waiting for a reply.
  describe(C, statement, Name) ->
      Ref = epgsqli:describe(C, statement, Name),
      Described = receive_describe(C, Ref, #statement{name = Name}),
      sync_on_error(C, Described);
  describe(C, Type, Name) ->
      %% TODO unknown result format of Describe portal
      epgsqli:describe(C, Type, Name).
  %% Close the statement identified by the name field of the record.
  close(C, #statement{name = StmtName}) ->
      close(C, statement, StmtName).

  %% Close the object Name of the given Type via epgsqli:close/3 and wait for
  %% the reply. Returns ok, an {error, _} tuple, or {error, closed}.
  close(C, Type, Name) ->
      receive_atom(C, epgsqli:close(C, Type, Name), ok, ok).
  %% Issue epgsqli:sync/1 and wait for the reply.
  %% Returns ok, an {error, _} tuple, or {error, closed}.
  sync(C) ->
      receive_atom(C, epgsqli:sync(C), ok, ok).
  92. %% misc helper functions
  %% Run F(C) between "BEGIN" and "COMMIT" and return what F returned.
  %% Any exception of any class raised inside the block - including a failed
  %% match on the BEGIN or COMMIT result - causes "ROLLBACK" to be sent and
  %% {rollback, Reason} to be returned instead.
  with_transaction(C, F) ->
      try
          {ok, [], []} = squery(C, "BEGIN"),
          Outcome = F(C),
          {ok, [], []} = squery(C, "COMMIT"),
          Outcome
      catch
          _Class:Reason ->
              squery(C, "ROLLBACK"),
              %% TODO hides error stacktrace
              {rollback, Reason}
      end.
  104. %% -- internal functions --
  %% Keep reading results for Ref until `done' arrives and return the most
  %% recent one (Last is returned when `done' is the first thing received).
  %% A value thrown by receive_result/4 is returned as the result.
  %% The recursive call lives in the `of' section, so it is a tail call that
  %% is not covered by the catch.
  receive_result(C, Ref, Last) ->
      try receive_result(C, Ref, [], []) of
          done -> Last;
          Next -> receive_result(C, Ref, Next)
      catch
          throw:Thrown -> Thrown
      end.
  %% Accumulate every result for Ref until `done' arrives, then return them
  %% in arrival order. A value thrown by receive_result/4 replaces the whole
  %% list and is returned on its own.
  receive_results(C, Ref, Acc) ->
      try receive_result(C, Ref, [], []) of
          done -> lists:reverse(Acc);
          Next -> receive_results(C, Ref, [Next | Acc])
      catch
          throw:Thrown -> Thrown
      end.
  %% Collect the messages that make up one simple-query result for Ref.
  %%
  %% Cols holds the most recently received column list and RowsRev the rows
  %% received so far, newest first. Returns:
  %%   {ok, Count}             - counted completion with no rows collected
  %%   {ok, Count, Cols, Rows} - counted completion with rows
  %%   {ok, Cols, Rows}        - completion without a count
  %%   {error, _}              - error reported for the request
  %%   done                    - no further results for this request
  %% Throws {error, closed} when an 'EXIT' message from C is received.
  receive_result(C, Ref, Cols, RowsRev) ->
      receive
          {'EXIT', C, _Reason} ->
              throw({error, closed});
          {C, Ref, done} ->
              done;
          {C, Ref, {error, _} = Failure} ->
              Failure;
          {C, Ref, {data, Row}} ->
              receive_result(C, Ref, Cols, [Row | RowsRev]);
          {C, Ref, {columns, NewCols}} ->
              receive_result(C, Ref, NewCols, RowsRev);
          %% The counted form must be tried before the generic `complete'
          %% clause below, which would otherwise match it as well.
          {C, Ref, {complete, {_Type, Count}}} ->
              if
                  RowsRev =:= [] -> {ok, Count};
                  true -> {ok, Count, Cols, lists:reverse(RowsRev)}
              end;
          {C, Ref, {complete, _Type}} ->
              {ok, Cols, lists:reverse(RowsRev)}
      end.
  %% Accumulate every extended-protocol result for Ref until `done' arrives,
  %% then return them in arrival order.
  %%
  %% receive_extended_result/3 reports a dead connection by *returning*
  %% {error, closed} rather than throwing. That value must end the loop:
  %% treating it as an ordinary result would call receive again for a
  %% connection that will never send another message and block the caller
  %% forever. As in receive_results/3, the error replaces the partial list.
  %% Errors other than {error, closed} are still collected as per-statement
  %% results.
  receive_extended_results(C, Ref, Results) ->
      try receive_extended_result(C, Ref, []) of
          done ->
              lists:reverse(Results);
          {error, closed} = Closed ->
              Closed;
          R ->
              receive_extended_results(C, Ref, [R | Results])
      catch
          %% Kept for parity with receive_results/3: a thrown value is
          %% returned as the overall result.
          throw:E -> E
      end.
  %% Collect the messages that make up one extended-protocol result for Ref.
  %%
  %% RowsRev holds the rows received so far, newest first. Returns:
  %%   {ok, Count}       - counted completion with no rows collected
  %%   {ok, Count, Rows} - counted completion with rows
  %%   {ok, Rows}        - completion without a count
  %%   {partial, Rows}   - `suspended' was received
  %%   {error, _}        - error reported for the request
  %%   done              - no further results for this request
  %%   {error, closed}   - an 'EXIT' message from C was received
  receive_extended_result(C, Ref, RowsRev) ->
      receive
          {'EXIT', C, _Reason} ->
              {error, closed};
          {C, Ref, done} ->
              done;
          {C, Ref, suspended} ->
              {partial, lists:reverse(RowsRev)};
          {C, Ref, {error, _} = Failure} ->
              Failure;
          {C, Ref, {data, Row}} ->
              receive_extended_result(C, Ref, [Row | RowsRev]);
          %% The counted form must be tried before the generic `complete'
          %% clause below, which would otherwise match it as well.
          {C, Ref, {complete, {_Type, Count}}} ->
              if
                  RowsRev =:= [] -> {ok, Count};
                  true -> {ok, Count, lists:reverse(RowsRev)}
              end;
          {C, Ref, {complete, _Type}} ->
              {ok, lists:reverse(RowsRev)}
      end.
  %% Build up a #statement{} from the describe messages received for Ref.
  %%
  %% A `types' message fills in the types field and keeps waiting; a `columns'
  %% message (each column's format set from epgsql_wire:format/1 of its type)
  %% or `no_data' (empty column list) finishes with {ok, Statement}.
  %% Returns the {error, _} tuple reported for the request, or
  %% {error, closed} when an 'EXIT' message from C is received.
  receive_describe(C, Ref, Stmt = #statement{}) ->
      receive
          {'EXIT', C, _Reason} ->
              {error, closed};
          {C, Ref, {error, _} = Failure} ->
              Failure;
          {C, Ref, no_data} ->
              {ok, Stmt#statement{columns = []}};
          {C, Ref, {columns, RawColumns}} ->
              WithFormat = fun(Col) ->
                  Col#column{format = epgsql_wire:format(Col#column.type)}
              end,
              {ok, Stmt#statement{columns = lists:map(WithFormat, RawColumns)}};
          {C, Ref, {types, Types}} ->
              receive_describe(C, Ref, Stmt#statement{types = Types})
      end.
  %% Wait for a single reply to Ref.
  %% Returns Return when the reply equals Expected, the {error, _} tuple when
  %% an error is reported for the request, or {error, closed} when an 'EXIT'
  %% message from C is received.
  receive_atom(C, Ref, Expected, Return) ->
      receive
          {'EXIT', C, _Reason} ->
              {error, closed};
          %% The expected-reply clause stays ahead of the error clause so it
          %% keeps priority should Expected itself be an {error, _} tuple.
          {C, Ref, Expected} ->
              Return;
          {C, Ref, {error, _} = Failure} ->
              Failure
      end.
  %% Pass a result through unchanged. When it is an {error, _} tuple, first
  %% issue epgsqli:sync/1 and wait for its reply; the outcome of that sync is
  %% discarded and the original error is returned.
  sync_on_error(C, {error, _} = Failure) ->
      SyncRef = epgsqli:sync(C),
      receive_atom(C, SyncRef, ok, ok),
      Failure;
  sync_on_error(_C, Other) ->
      Other.