epgsql.erl 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. -module(epgsql).
  4. -export([connect/1, connect/2, connect/3, connect/4, connect/5,
  5. close/1,
  6. get_parameter/2,
  7. squery/2,
  8. equery/2, equery/3, equery/4,
  9. parse/2, parse/3, parse/4,
  10. describe/2, describe/3,
  11. bind/3, bind/4,
  12. execute/2, execute/3, execute/4,
  13. execute_batch/2,
  14. close/2, close/3,
  15. sync/1,
  16. cancel/1,
  17. update_type_cache/1,
  18. with_transaction/2,
  19. sync_on_error/2]).
  20. -export_type([connection/0, connect_option/0,
  21. connect_error/0, query_error/0,
  22. bind_param/0,
  23. squery_row/0, equery_row/0, reply/1]).
  24. -include("epgsql.hrl").
  %% Text of an SQL statement.
  -type sql_query() :: string() | iodata().
  %% Address of the database server.
  -type host() :: inet:ip_address() | inet:hostname().
  %% A connection is identified by the pid of its process.
  -type connection() :: pid().
  -type connect_option() ::
      {database, DBName     :: string()}             |
      {port,     PortNum    :: inet:port_number()}   |
      {ssl,      IsEnabled  :: boolean() | required} |
      {ssl_opts, SslOptions :: [ssl:ssl_option()]}   | % @see OTP ssl app, ssl_api.hrl
      {timeout,  TimeoutMs  :: timeout()}            | % default: 5000 ms
      {async,    Receiver   :: pid()}.                 % process to receive LISTEN/NOTIFY msgs
  -type connect_error() :: #error{}.
  -type query_error() :: #error{}.
  %% Erlang terms accepted as query parameters.
  -type bind_param() ::
      null
      | boolean()
      | string()
      | binary()
      | integer()
      | float()
      | calendar:date()
      | calendar:time()                       % actually, `Seconds' may be float()
      | calendar:datetime()
      | {calendar:time(), Days::non_neg_integer(), Months::non_neg_integer()}
      | [bind_param()].                       % array (maybe nested)
  %% A row is a tuple with one element per result column, so its arity varies
  %% from query to query; `{binary()}' would only describe one-column rows.
  -type squery_row() :: tuple().              % tuple of binary()
  -type equery_row() :: tuple().              % tuple of bind_param()
  -type ok_reply(RowType) ::
      {ok, Count :: non_neg_integer()} |                                      % update/insert
      {ok, ColumnsDescription :: [#column{}], RowsValues :: [RowType]} |      % select
      {ok, Count :: non_neg_integer(),
           ColumnsDescription :: [#column{}], RowsValues :: [RowType]}.       % update/insert + returning
  -type error_reply() :: {error, query_error()}.
  -type reply(RowType) :: ok_reply(RowType) | error_reply().
  57. %% -- client interface --
  %% @doc Connects using a single proplist of settings.
  %% `host' defaults to `"localhost"', `username' to the value of the `USER'
  %% environment variable and `password' to `""'. The complete proplist is
  %% also passed on as the connection options.
  connect(Settings) ->
      DefaultUser = os:getenv("USER"),
      Host = proplists:get_value(host, Settings, "localhost"),
      Username = proplists:get_value(username, Settings, DefaultUser),
      Password = proplists:get_value(password, Settings, ""),
      connect(Host, Username, Password, Settings).
  %% @doc Connects to `Host' as the user named by the `USER' environment
  %% variable, using an empty password.
  connect(Host, Opts) ->
      Username = os:getenv("USER"),
      connect(Host, Username, "", Opts).
  %% @doc Connects to `Host' as `Username', using an empty password.
  connect(Host, Username, Opts) ->
      NoPassword = "",
      connect(Host, Username, NoPassword, Opts).
  -spec connect(host(), string(), string(), [connect_option()])
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  %% @doc Starts a fresh `epgsql_sock' process and connects it to Postgres.
  %% Arguments:
  %%   `Host'     - host to connect to
  %%   `Username' - username to connect as
  %%   `Password' - password to authenticate with
  %%   `Opts'     - proplist of extra options
  %% Returns `{ok, Connection}' on success, otherwise `{error, Reason}'.
  %% Fails with a badmatch if the socket process cannot be started.
  connect(Host, Username, Password, Opts) ->
      {ok, Sock} = epgsql_sock:start_link(),
      connect(Sock, Host, Username, Password, Opts).
  -spec connect(connection(), host(), string(), string(), [connect_option()])
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  %% @doc Connects the already started connection process `C' to Postgres.
  %% A `connected' reply triggers a type cache refresh and yields `{ok, C}';
  %% an `{error, _}' reply is returned unchanged.
  connect(C, Host, Username, Password, Opts) ->
      %% TODO connect timeout
      Request = {connect, Host, Username, Password, Opts},
      case gen_server:call(C, Request, infinity) of
          {error, _} = Failure ->
              Failure;
          connected ->
              update_type_cache(C),
              {ok, C}
      end.
  -spec update_type_cache(connection()) -> ok.
  %% @doc Looks up name, oid and array oid of the `hstore' and `geometry'
  %% types in `pg_type' and hands the resulting rows to the connection
  %% process as an `update_type_cache' request.
  %% Fails with a badmatch if the query or the cache update does not succeed.
  update_type_cache(C) ->
      TypeNames = [<<"hstore">>, <<"geometry">>],
      Sql = "SELECT typname, oid::int4, typarray::int4"
            " FROM pg_type"
            " WHERE typname = ANY($1::varchar[])",
      {ok, _Columns, Rows} = equery(C, Sql, [TypeNames]),
      ok = gen_server:call(C, {update_type_cache, Rows}).
  -spec close(connection()) -> ok.
  %% @doc Closes `Connection' by delegating to `epgsql_sock:close/1'.
  close(Connection) ->
      epgsql_sock:close(Connection).
  -spec get_parameter(connection(), binary()) -> binary() | undefined.
  %% @doc Returns the value of parameter `Name' as reported by
  %% `epgsql_sock:get_parameter/2'.
  get_parameter(Connection, Name) ->
      epgsql_sock:get_parameter(Connection, Name).
  -spec squery(connection(), sql_query()) -> reply(squery_row()) | [reply(squery_row())].
  %% @doc Sends `SqlQuery' to `Connection' as an `squery' request and waits,
  %% without a timeout, for the reply.
  squery(Connection, SqlQuery) ->
      Request = {squery, SqlQuery},
      gen_server:call(Connection, Request, infinity).
  %% @doc Runs `Sql' through `equery/3' with an empty parameter list.
  equery(C, Sql) ->
      NoParameters = [],
      equery(C, Sql, NoParameters).
  %% @doc Runs `Sql' with `Parameters' using the statement name `""'.
  %% Delegates to `equery/4' so the parse/zip/equery sequence lives in one
  %% place, like the other short-arity wrappers in this module.
  equery(C, Sql, Parameters) ->
      %% TODO add fast_equery command that doesn't need parsed statement
      equery(C, "", Sql, Parameters).
  -spec equery(connection(), string(), sql_query(), [bind_param()]) -> reply(equery_row()).
  %% @doc Parses `Sql' as the statement `Name', pairs each type of the parsed
  %% statement with the parameter at the same position and sends the result
  %% as an `equery' request. A reply from `parse/4' that is not
  %% `{ok, Statement}' is returned unchanged.
  %% `Parameters' must have one entry per statement type; `lists:zip/2'
  %% fails on lists of different length.
  equery(C, Name, Sql, Parameters) ->
      case parse(C, Name, Sql, []) of
          {ok, #statement{types = Types} = Statement} ->
              TypedParameters = lists:zip(Types, Parameters),
              Request = {equery, Statement, TypedParameters},
              gen_server:call(C, Request, infinity);
          ParseFailure ->
              ParseFailure
      end.
  130. %% parse
  %% @doc Parses `Sql' via `parse/3' with an empty type list.
  parse(C, Sql) ->
      NoTypes = [],
      parse(C, Sql, NoTypes).
  %% @doc Parses `Sql' with the given `Types' under the statement name `""'.
  parse(C, Sql, Types) ->
      StatementName = "",
      parse(C, StatementName, Sql, Types).
  -spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
          {ok, #statement{}} | {error, query_error()}.
  %% @doc Sends a `parse' request for `Sql' under the statement name `Name'
  %% and waits, without a timeout, for the reply. An `{error, _}' reply is
  %% followed by a `sync' before it is returned (see `sync_on_error/2').
  parse(C, Name, Sql, Types) ->
      Reply = gen_server:call(C, {parse, Name, Sql, Types}, infinity),
      sync_on_error(C, Reply).
  139. %% bind
  %% @doc Binds `Parameters' to `Statement' using the portal name `""'.
  bind(C, Statement, Parameters) ->
      PortalName = "",
      bind(C, Statement, PortalName, Parameters).
  -spec bind(connection(), #statement{}, string(), [bind_param()]) ->
          ok | {error, query_error()}.
  %% @doc Sends a `bind' request for `Statement', `PortalName' and
  %% `Parameters' and waits, without a timeout, for the reply. An
  %% `{error, _}' reply is followed by a `sync' before it is returned.
  bind(C, Statement, PortalName, Parameters) ->
      Request = {bind, Statement, PortalName, Parameters},
      Reply = gen_server:call(C, Request, infinity),
      sync_on_error(C, Reply).
  148. %% execute
  %% @doc Executes statement `S' with the portal name `""' and `N' set to 0.
  execute(C, S) ->
      PortalName = "",
      execute(C, S, PortalName, 0).
  %% @doc Executes statement `S' with the portal name `""' and the given `N'.
  execute(C, S, N) ->
      PortalName = "",
      execute(C, S, PortalName, N).
  -spec execute(connection(), #statement{}, string(), non_neg_integer()) -> Reply
      when
        Reply :: {ok | partial, [equery_row()]}
               | {ok, non_neg_integer()}
               | {ok, non_neg_integer(), [equery_row()]}
               | {error, query_error()}.
  %% @doc Sends an `execute' request for statement `S' and portal
  %% `PortalName' and waits, without a timeout, for the reply.
  %% `N' is passed through unchanged; presumably the maximum number of rows
  %% to fetch - confirm against epgsql_sock.
  execute(C, S, PortalName, N) ->
      Request = {execute, S, PortalName, N},
      gen_server:call(C, Request, infinity).
  -spec execute_batch(connection(), [{#statement{}, [bind_param()]}]) -> [reply(equery_row())].
  %% @doc Sends the whole `Batch' of `{Statement, Parameters}' pairs as one
  %% `execute_batch' request and waits, without a timeout, for the reply.
  execute_batch(C, Batch) ->
      Request = {execute_batch, Batch},
      gen_server:call(C, Request, infinity).
  164. %% statement/portal functions
  %% @doc Describes the statement whose name is stored in the given
  %% `#statement{}' record.
  describe(C, #statement{name = StatementName}) ->
      describe(C, statement, StatementName).
  %% @doc Sends a `describe_statement' or `describe_portal' request for
  %% `Name', depending on the second argument, and waits without a timeout.
  %% An `{error, _}' reply is followed by a `sync' before it is returned.
  describe(C, statement, Name) ->
      Reply = gen_server:call(C, {describe_statement, Name}, infinity),
      sync_on_error(C, Reply);
  %% TODO unknown result format of Describe portal
  describe(C, portal, Name) ->
      Reply = gen_server:call(C, {describe_portal, Name}, infinity),
      sync_on_error(C, Reply).
  %% @doc Closes the statement whose name is stored in the given
  %% `#statement{}' record.
  close(C, #statement{name = StatementName}) ->
      close(C, statement, StatementName).
  %% @doc Sends a `close' request for the object of kind `Type' named `Name'
  %% to the connection process, using the default `gen_server:call/2' timeout.
  close(C, Type, Name) ->
      Request = {close, Type, Name},
      gen_server:call(C, Request).
  %% @doc Sends a `sync' request to `Connection', using the default
  %% `gen_server:call/2' timeout, and returns the reply.
  sync(Connection) ->
      gen_server:call(Connection, sync).
  -spec cancel(connection()) -> ok.
  %% @doc Delegates to `epgsql_sock:cancel/1' for `Connection'.
  cancel(Connection) ->
      epgsql_sock:cancel(Connection).
  181. %% misc helper functions
  -spec with_transaction(connection(), fun((connection()) -> Reply)) ->
          Reply | {rollback, any()}
      when
        Reply :: any().
  %% @doc Runs `F(C)' between `BEGIN' and `COMMIT' and returns its result.
  %% Any exception raised inside - by `F' itself, or because `BEGIN' or
  %% `COMMIT' did not reply `{ok, [], []}' - is caught; `ROLLBACK' is then
  %% issued and `{rollback, Reason}' returned, where `Reason' is the
  %% exception reason. The exception class and stacktrace are discarded.
  with_transaction(C, F) ->
      try
          {ok, [], []} = squery(C, "BEGIN"),
          Result = F(C),
          {ok, [], []} = squery(C, "COMMIT"),
          Result
      catch
          _Class:Reason ->
              squery(C, "ROLLBACK"),
              %% TODO hides error stacktrace
              {rollback, Reason}
      end.
  %% @doc Passes a reply through, issuing `sync(C)' first when the reply is
  %% an `{error, _}' tuple. The sync must return `ok', otherwise this fails
  %% with a badmatch. Any other reply is returned without touching `C'.
  sync_on_error(C, {error, _} = Failure) ->
      ok = sync(C),
      Failure;
  sync_on_error(_C, Reply) ->
      Reply.