pgsql.erl 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. -module(pgsql).
  3. -export([connect/2, connect/3, connect/4, close/1]).
  4. -export([get_parameter/2, squery/2, equery/2, equery/3]).
  5. -export([parse/2, parse/3, parse/4, describe/2, describe/3]).
  6. -export([bind/3, bind/4, execute/2, execute/3, execute/4]).
  7. -export([close/2, close/3, sync/1]).
  8. -export([with_transaction/2]).
  9. -include("pgsql.hrl").
  10. %% -- client interface --
  11. connect(Host, Opts) ->
  12. connect(Host, os:getenv("USER"), "", Opts).
  13. connect(Host, Username, Opts) ->
  14. connect(Host, Username, "", Opts).
  15. connect(Host, Username, Password, Opts) ->
  16. {ok, C} = pgsql_connection:start_link(),
  17. pgsql_connection:connect(C, Host, Username, Password, Opts).
  18. close(C) when is_pid(C) ->
  19. catch pgsql_connection:stop(C),
  20. ok.
  21. get_parameter(C, Name) ->
  22. pgsql_connection:get_parameter(C, Name).
  23. squery(C, Sql) ->
  24. ok = pgsql_connection:squery(C, Sql),
  25. case receive_results(C, []) of
  26. [Result] -> Result;
  27. Results -> Results
  28. end.
  29. equery(C, Sql) ->
  30. equery(C, Sql, []).
  31. equery(C, Sql, Parameters) ->
  32. case pgsql_connection:parse(C, "", Sql, []) of
  33. {ok, #statement{types = Types} = S} ->
  34. Typed_Parameters = lists:zip(Types, Parameters),
  35. ok = pgsql_connection:equery(C, S, Typed_Parameters),
  36. receive_result(C, undefined);
  37. Error ->
  38. Error
  39. end.
  40. %% parse
  41. parse(C, Sql) ->
  42. parse(C, "", Sql, []).
  43. parse(C, Sql, Types) ->
  44. parse(C, "", Sql, Types).
  45. parse(C, Name, Sql, Types) ->
  46. pgsql_connection:parse(C, Name, Sql, Types).
  47. %% bind
  48. bind(C, Statement, Parameters) ->
  49. bind(C, Statement, "", Parameters).
  50. bind(C, Statement, PortalName, Parameters) ->
  51. pgsql_connection:bind(C, Statement, PortalName, Parameters).
  52. %% execute
  53. execute(C, S) ->
  54. execute(C, S, "", 0).
  55. execute(C, S, N) ->
  56. execute(C, S, "", N).
  57. execute(C, S, PortalName, N) ->
  58. pgsql_connection:execute(C, S, PortalName, N),
  59. receive_extended_result(C).
  60. %% statement/portal functions
  61. describe(C, #statement{name = Name}) ->
  62. pgsql_connection:describe(C, statement, Name).
  63. describe(C, Type, Name) ->
  64. pgsql_connection:describe(C, Type, Name).
  65. close(C, #statement{name = Name}) ->
  66. pgsql_connection:close(C, statement, Name).
  67. close(C, Type, Name) ->
  68. pgsql_connection:close(C, Type, Name).
  69. sync(C) ->
  70. pgsql_connection:sync(C).
  71. %% misc helper functions
  72. with_transaction(C, F) ->
  73. try {ok, [], []} = squery(C, "BEGIN"),
  74. R = F(C),
  75. {ok, [], []} = squery(C, "COMMIT"),
  76. R
  77. catch
  78. _:Why ->
  79. squery(C, "ROLLBACK"),
  80. {rollback, Why}
  81. end.
  82. %% -- internal functions --
  83. receive_result(C, Result) ->
  84. try receive_result(C, [], []) of
  85. done -> Result;
  86. R -> receive_result(C, R)
  87. catch
  88. throw:E -> E
  89. end.
  90. receive_results(C, Results) ->
  91. try receive_result(C, [], []) of
  92. done -> lists:reverse(Results);
  93. R -> receive_results(C, [R | Results])
  94. catch
  95. throw:E -> E
  96. end.
  97. receive_result(C, Cols, Rows) ->
  98. receive
  99. {pgsql, C, {columns, Cols2}} ->
  100. receive_result(C, Cols2, Rows);
  101. {pgsql, C, {data, Row}} ->
  102. receive_result(C, Cols, [Row | Rows]);
  103. {pgsql, C, {error, _E} = Error} ->
  104. Error;
  105. {pgsql, C, {complete, {_Type, Count}}} ->
  106. case Rows of
  107. [] -> {ok, Count};
  108. _L -> {ok, Count, Cols, lists:reverse(Rows)}
  109. end;
  110. {pgsql, C, {complete, _Type}} ->
  111. {ok, Cols, lists:reverse(Rows)};
  112. {pgsql, C, done} ->
  113. done;
  114. {pgsql, C, timeout} ->
  115. throw({error, timeout});
  116. {'EXIT', C, _Reason} ->
  117. throw({error, closed})
  118. end.
  119. receive_extended_result(C)->
  120. receive_extended_result(C, []).
  121. receive_extended_result(C, Rows) ->
  122. receive
  123. {pgsql, C, {data, Row}} ->
  124. receive_extended_result(C, [Row | Rows]);
  125. {pgsql, C, {error, _E} = Error} ->
  126. Error;
  127. {pgsql, C, suspended} ->
  128. {partial, lists:reverse(Rows)};
  129. {pgsql, C, {complete, {_Type, Count}}} ->
  130. case Rows of
  131. [] -> {ok, Count};
  132. _L -> {ok, Count, lists:reverse(Rows)}
  133. end;
  134. {pgsql, C, {complete, _Type}} ->
  135. {ok, lists:reverse(Rows)};
  136. {pgsql, C, timeout} ->
  137. {error, timeout};
  138. {'EXIT', C, _Reason} ->
  139. {error, closed}
  140. end.