pgsql.erl 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. -module(pgsql).
  3. -export([connect/2, connect/3, connect/4, close/1]).
  4. -export([get_parameter/2, squery/2, equery/2, equery/3]).
  5. -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
  6. -export([bind/3, bind/4, execute/2, execute/3, execute/4]).
  7. -export([close/2, close/3, sync/1]).
  8. -export([with_transaction/2]).
  9. -include("pgsql.hrl").
  10. %% -- client interface --
  %% connect/2: connect to Host with an empty password, taking the user name
  %% from the USER environment variable (os:getenv/1 yields 'false' when the
  %% variable is unset, and that value is passed on unchanged).
  connect(Host, Opts) ->
      Username = os:getenv("USER"),
      connect(Host, Username, "", Opts).

  %% connect/3: connect to Host as Username with an empty password.
  connect(Host, Username, Opts) ->
      connect(Host, Username, "", Opts).

  %% connect/4: start a pgsql_sock process, ask it to connect, and block until
  %% it answers with a message tagged by the reference it handed back.
  %%
  %% Returns {ok, Sock} once {Ref, connected} arrives, the {error, _} tuple the
  %% socket process reports, or {error, closed} if an 'EXIT' message from the
  %% socket process is received first.
  %% NOTE(review): the 'EXIT' clause can only fire if the caller traps exits
  %% and is linked to the socket process (presumably via start_link) - confirm.
  connect(Host, Username, Password, Opts) ->
      {ok, Sock} = pgsql_sock:start_link(),
      Ref = pgsql_sock:connect(Sock, Host, Username, Password, Opts),
      %% TODO connect timeout
      receive
          {Ref, connected}          -> {ok, Sock};
          {Ref, {error, _} = Error} -> Error;
          {'EXIT', Sock, _Reason}   -> {error, closed}
      end.
  %% close/1: close the connection by delegating to pgsql_sock:close/1;
  %% whatever that call returns is returned unchanged.
  close(Conn) ->
      pgsql_sock:close(Conn).
  %% get_parameter/2: look up the parameter Name on the connection by
  %% delegating to pgsql_sock:get_parameter/2.
  get_parameter(Conn, Name) ->
      pgsql_sock:get_parameter(Conn, Name).
  %% squery/2: submit Sql through pgsql_sock:squery/2 and collect every result
  %% that comes back for it.
  %%
  %% When exactly one result was collected it is returned on its own;
  %% otherwise the collected value is returned as is (a list of results, or
  %% the {error, _} term produced when result collection is aborted).
  squery(Conn, Sql) ->
      Ref = pgsql_sock:squery(Conn, Sql),
      Outcome = receive_results(Conn, Ref, []),
      case Outcome of
          [Single] -> Single;
          _        -> Outcome
      end.
  %% equery/2: run Sql through the parse/execute path with no parameters.
  equery(Conn, Sql) ->
      equery(Conn, Sql, []).

  %% equery/3: parse Sql as the unnamed statement, pair each parameter with
  %% the type reported for it, and run the statement.
  %%
  %% Returns the final result gathered by receive_result/3, or whatever
  %% parse/2 returned when that is not an {ok, #statement{}} tuple.
  %% lists:zip/2 requires Parameters to have exactly as many elements as the
  %% statement has types; a mismatch crashes the caller.
  equery(Conn, Sql, Parameters) ->
      %% TODO don't require reply from 'describe'
      case parse(Conn, Sql) of
          {ok, Stmt = #statement{types = Types}} ->
              TypedParams = lists:zip(Types, Parameters),
              Ref = pgsql_sock:equery(Conn, Stmt, TypedParams),
              receive_result(Conn, Ref, undefined);
          Other ->
              Other
      end.
  49. %% parse
  %% parse/2: parse Sql as the unnamed statement with no explicit types.
  parse(Conn, Sql) ->
      parse(Conn, Sql, []).

  %% parse/3: parse Sql as the unnamed statement with the given Types.
  parse(Conn, Sql, Types) ->
      parse(Conn, "", Sql, Types).

  %% parse/4: send a parse request for the statement called Name and wait for
  %% its description; the reply is accumulated into a fresh #statement{}
  %% carrying that name (see receive_describe/3 for the return shapes).
  parse(Conn, Name, Sql, Types) ->
      Ref = pgsql_sock:parse(Conn, Name, Sql, Types),
      Initial = #statement{name = Name},
      receive_describe(Conn, Ref, Initial).
  57. %% bind
  %% bind/3: bind Parameters to Statement using the unnamed portal ("").
  bind(Conn, Statement, Parameters) ->
      bind(Conn, Statement, "", Parameters).

  %% bind/4: bind Parameters to Statement under PortalName by delegating to
  %% pgsql_sock:bind/4; its return value is passed through untouched.
  bind(Conn, Statement, PortalName, Parameters) ->
      pgsql_sock:bind(Conn, Statement, PortalName, Parameters).
  62. %% execute
  %% execute/2: execute statement S on the unnamed portal with a row count
  %% argument of 0.
  execute(Conn, S) ->
      execute(Conn, S, 0).

  %% execute/3: execute statement S on the unnamed portal, passing N as the
  %% row count argument.
  execute(Conn, S, N) ->
      execute(Conn, S, "", N).

  %% execute/4: ask pgsql_sock to execute S on PortalName with row count
  %% argument N, then gather the reply with receive_extended_result/2.
  execute(Conn, S, PortalName, N) ->
      Ref = pgsql_sock:execute(Conn, S, PortalName, N),
      receive_extended_result(Conn, Ref).
  70. %% statement/portal functions
  %% describe/2: re-describe an existing statement record. The description is
  %% folded into the record that was passed in, so fields it already carries
  %% are kept unless the reply overwrites them.
  describe(Conn, #statement{name = Name} = Statement) ->
      Ref = pgsql_sock:describe(Conn, statement, Name),
      receive_describe(Conn, Ref, Statement).

  %% describe/3: describe the object called Name.
  %% For Type =:= statement the reply is awaited and folded into a fresh
  %% #statement{} carrying that name. For any other Type the request is only
  %% forwarded, and the value returned by pgsql_sock:describe/3 is handed
  %% back without waiting for a reply.
  describe(Conn, Type, Name) ->
      case Type of
          statement ->
              Ref = pgsql_sock:describe(Conn, statement, Name),
              receive_describe(Conn, Ref, #statement{name = Name});
          _ ->
              pgsql_sock:describe(Conn, Type, Name)
      end.
  %% close/2: close the server-side statement named in the given record.
  close(Conn, #statement{name = Name}) ->
      close(Conn, statement, Name).

  %% close/3: close the object of kind Type called Name by delegating to
  %% pgsql_sock:close/3.
  close(Conn, Type, Name) ->
      pgsql_sock:close(Conn, Type, Name).
  %% sync/1: forward a sync request to the connection via pgsql_sock:sync/1.
  sync(Conn) ->
      pgsql_sock:sync(Conn).
  85. %% misc helper functions
  %% with_transaction/2: run Fun(Conn) between "BEGIN" and "COMMIT".
  %%
  %% Returns the value of Fun(Conn) when BEGIN and COMMIT both answer
  %% {ok, [], []}. If anything inside raises - a throw, error or exit from
  %% Fun, or a badmatch on the BEGIN/COMMIT replies - a "ROLLBACK" is issued
  %% (its result is ignored) and {rollback, Reason} is returned instead.
  with_transaction(Conn, Fun) ->
      try
          {ok, [], []} = squery(Conn, "BEGIN"),
          Result = Fun(Conn),
          {ok, [], []} = squery(Conn, "COMMIT"),
          Result
      catch
          _Class:Reason ->
              squery(Conn, "ROLLBACK"),
              {rollback, Reason}
      end.
  96. %% -- internal functions --
  %% receive_result/3: keep reading results for Ref until the 'done' marker
  %% arrives and return only the most recent one (Latest is returned when
  %% 'done' arrives before any result). A term thrown by receive_result/4
  %% ({error, timeout} or {error, closed}) is returned as a plain value.
  receive_result(Conn, Ref, Latest) ->
      try receive_result(Conn, Ref, [], []) of
          done ->
              Latest;
          Next ->
              receive_result(Conn, Ref, Next)
      catch
          throw:Thrown ->
              Thrown
      end.
  %% receive_results/3: read results for Ref until the 'done' marker arrives,
  %% returning all of them in arrival order. Acc holds the results seen so
  %% far, newest first. A term thrown by receive_result/4 replaces the whole
  %% list and is returned as a plain value.
  receive_results(Conn, Ref, Acc) ->
      try receive_result(Conn, Ref, [], []) of
          done ->
              lists:reverse(Acc);
          Next ->
              receive_results(Conn, Ref, [Next | Acc])
      catch
          throw:Thrown ->
              Thrown
      end.
  %% receive_result/4: assemble one result from the messages tagged with Ref.
  %% Cols is the latest column description seen, RevRows the rows seen so far
  %% (newest first).
  %%
  %% Returns:
  %%   {ok, Count}             - completion carrying a count, no rows seen
  %%   {ok, Count, Cols, Rows} - completion carrying a count, with rows
  %%   {ok, Cols, Rows}        - completion without a count
  %%   {error, _}              - error reported for this request
  %%   done                    - no further results for this request
  %% Throws {error, timeout} on a timeout message and {error, closed} when an
  %% 'EXIT' message from Conn is received.
  receive_result(Conn, Ref, Cols, RevRows) ->
      receive
          {Ref, {columns, NewCols}} ->
              receive_result(Conn, Ref, NewCols, RevRows);
          {Ref, {data, Row}} ->
              receive_result(Conn, Ref, Cols, [Row | RevRows]);
          {Ref, {error, _Reason} = Error} ->
              Error;
          %% The counted form has to be tried before the bare form below,
          %% which would otherwise match it as well.
          {Ref, {complete, {_Type, Count}}} when RevRows =:= [] ->
              {ok, Count};
          {Ref, {complete, {_Type, Count}}} ->
              {ok, Count, Cols, lists:reverse(RevRows)};
          {Ref, {complete, _Type}} ->
              {ok, Cols, lists:reverse(RevRows)};
          {Ref, done} ->
              done;
          {Ref, timeout} ->
              throw({error, timeout});
          {'EXIT', Conn, _Reason} ->
              throw({error, closed})
      end.
  %% receive_extended_result/2: collect the reply to an execute request,
  %% starting with no rows.
  receive_extended_result(Conn, Ref) ->
      receive_extended_result(Conn, Ref, []).

  %% receive_extended_result/3: collect data rows tagged with Ref until a
  %% terminating message arrives. RevRows holds the rows seen so far, newest
  %% first.
  %%
  %% Returns:
  %%   {partial, Rows}   - a 'suspended' message arrived
  %%   {ok, Count}       - completion carrying a count, no rows seen
  %%   {ok, Count, Rows} - completion carrying a count, with rows
  %%   {ok, Rows}        - completion without a count
  %%   {error, _}        - error reported for this request
  %%   {error, closed}   - an 'EXIT' message from Conn was received
  receive_extended_result(Conn, Ref, RevRows) ->
      receive
          {Ref, {data, Row}} ->
              receive_extended_result(Conn, Ref, [Row | RevRows]);
          {Ref, {error, _Reason} = Error} ->
              Error;
          {Ref, suspended} ->
              {partial, lists:reverse(RevRows)};
          %% The counted form has to be tried before the bare form below,
          %% which would otherwise match it as well.
          {Ref, {complete, {_Type, Count}}} when RevRows =:= [] ->
              {ok, Count};
          {Ref, {complete, {_Type, Count}}} ->
              {ok, Count, lists:reverse(RevRows)};
          {Ref, {complete, _Type}} ->
              {ok, lists:reverse(RevRows)};
          {'EXIT', Conn, _Reason} ->
              {error, closed}
      end.
  %% receive_describe/3: fold the reply to a parse/describe request into
  %% Statement.
  %%
  %% A {types, Types} message is stored in the record and reading continues.
  %% The reply ends with either {columns, Columns} or no_data (no column
  %% description was sent; the record's columns are set to []).
  %%
  %% Returns {ok, #statement{}} on success, the {error, _} tuple reported for
  %% this request, or {error, closed} when an 'EXIT' message from C is
  %% received.
  receive_describe(C, Ref, Statement = #statement{}) ->
      receive
          {Ref, {types, Types}} ->
              receive_describe(C, Ref, Statement#statement{types = Types});
          {Ref, {columns, Columns}} ->
              {ok, Statement#statement{columns = Columns}};
          {Ref, no_data} ->
              %% Must be wrapped in {ok, _} like the 'columns' case: equery/3
              %% matches on {ok, #statement{}}, so a bare record would make
              %% it skip execution of every statement that returns no rows.
              {ok, Statement#statement{columns = []}};
          {Ref, Error = {error, _}} ->
              Error;
          {'EXIT', C, _Reason} ->
              {error, closed}
      end.