epgsql.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. -module(epgsql).
  4. -export([connect/1, connect/2, connect/3, connect/4, connect/5,
  5. close/1,
  6. get_parameter/2,
  7. set_notice_receiver/2,
  8. get_cmd_status/1,
  9. squery/2,
  10. equery/2, equery/3, equery/4,
  11. prepared_query/3,
  12. parse/2, parse/3, parse/4,
  13. describe/2, describe/3,
  14. bind/3, bind/4,
  15. execute/2, execute/3, execute/4,
  16. execute_batch/2,
  17. close/2, close/3,
  18. sync/1,
  19. cancel/1,
  20. update_type_cache/1,
  21. update_type_cache/2,
  22. with_transaction/2,
  23. sync_on_error/2,
  24. standby_status_update/3,
  25. start_replication/5,
  26. start_replication/6,
  27. to_proplist/1]).
  28. -export_type([connection/0, connect_option/0, connect_opts/0,
  29. connect_error/0, query_error/0,
  30. sql_query/0, bind_param/0, typed_param/0,
  31. squery_row/0, equery_row/0, reply/1]).
  32. -include("epgsql.hrl").
  %% SQL text accepted by the query functions.
  -type sql_query() :: string() | iodata().

  %% Address of the database server.
  -type host() :: inet:ip_address() | inet:hostname().

  %% A connection is the pid returned by epgsql_sock:start_link/0.
  -type connection() :: pid().

  %% A single entry of the connection option proplist.
  -type connect_option() ::
        {host, host()}
      | {username, string()}
      | {password, string()}
      | {database, DBName :: string()}
      | {port, PortNum :: inet:port_number()}
      | {ssl, IsEnabled :: boolean() | required}
      | {ssl_opts, SslOptions :: [ssl:ssl_option()]} % @see OTP ssl app, ssl_api.hrl
      | {timeout, TimeoutMs :: timeout()}             % default: 5000 ms
      | {async, Receiver :: pid() | atom()}           % process to receive LISTEN/NOTIFY msgs
      | {replication, Replication :: string()}.       % Pass "database" to connect in replication mode
  %% Connection options: always accepted as a proplist of connect_option(),
  %% and additionally as a map when the build defines `have_maps'.
  %% The map keys mirror the proplist keys one-to-one; `async' accepts a pid
  %% or a registered name, exactly like the `{async, Receiver}' proplist entry.
  -ifdef(have_maps).
  -type connect_opts() ::
        [connect_option()]
      | #{host => host(),
          username => string(),
          password => string(),
          database => string(),
          port => inet:port_number(),
          ssl => boolean() | required,
          ssl_opts => [ssl:ssl_option()],
          timeout => timeout(),
          async => pid() | atom(),
          replication => string()}.
  -else.
  -type connect_opts() :: [connect_option()].
  -endif.
  %% Reasons reported as `{error, Reason}' by the connect functions.
  -type connect_error() ::
        #error{}
      | {unsupported_auth_method, atom()}
      | invalid_authorization_specification
      | invalid_password.

  %% Reason reported as `{error, Reason}' by the query functions.
  -type query_error() :: #error{}.
  %% Erlang terms accepted as values for query parameters.
  -type bind_param() ::
        null
      | boolean()
      | string()
      | binary()
      | integer()
      | float()
      | calendar:date()
      | calendar:time()                         % actually, `Seconds' may be float()
      | calendar:datetime()
      | {calendar:time(), Days::non_neg_integer(), Months::non_neg_integer()}
      | {list({binary(), binary() | null})}     % hstore
      | [bind_param()].                         % array (maybe nested)

  %% A parameter value paired with its type.
  -type typed_param() ::
        {epgsql_type(), bind_param()}.

  -type squery_row() :: tuple(). % tuple of binary().
  -type equery_row() :: tuple(). % tuple of bind_param().
  %% Successful query results, parameterised by the row representation.
  -type ok_reply(RowType) ::
        %% select
        {ok, ColumnsDescription :: [#column{}], RowsValues :: [RowType]}
        %% update/insert/delete
      | {ok, Count :: non_neg_integer()}
        %% update/insert/delete + returning
      | {ok, Count :: non_neg_integer(), ColumnsDescription :: [#column{}], RowsValues :: [RowType]}.

  -type error_reply() :: {error, query_error()}.

  %% Any query result: success or error.
  -type reply(RowType) :: ok_reply(RowType) | error_reply().

  %% WAL position used by the replication functions.
  -type lsn() :: integer().

  %% Opaque state threaded through the replication callback.
  -type cb_state() :: term().
  %% -- behaviour callbacks --

  %% Invoked for each XLogData message.
  %% Arguments: StartLSN, EndLSN, WALRecord, CbState.
  %% Must return {ok, LastFlushedLSN, LastAppliedLSN, NewCbState}.
  -callback handle_x_log_data(lsn(), lsn(), binary(), cb_state()) ->
      {ok, lsn(), lsn(), cb_state()}.

  %% -------------
  99. %% -- client interface --
  %% @doc Connects using a single options collection (proplist or map).
  %% `host' defaults to "localhost", `username' to the value of the `USER'
  %% environment variable and `password' to the empty string. The options,
  %% converted to a proplist, are also forwarded to `connect/4'.
  -spec connect(connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  connect(Settings0) ->
      Settings = to_proplist(Settings0),
      Lookup = fun(Key, Default) -> proplists:get_value(Key, Settings, Default) end,
      connect(Lookup(host, "localhost"),
              Lookup(username, os:getenv("USER")),
              Lookup(password, ""),
              Settings).
  %% @doc Connects to `Host' as the user named by the `USER' environment
  %% variable, with an empty password.
  connect(Host, Opts) ->
      connect(Host, os:getenv("USER"), Opts).

  %% @doc Connects to `Host' as `Username' with an empty password.
  connect(Host, Username, Opts) ->
      NoPassword = "",
      connect(Host, Username, NoPassword, Opts).
  %% @doc connects to Postgres
  %% where
  %% `Host' - host to connect to
  %% `Username' - username to connect as, defaults to `$USER'
  %% `Password' - optional password to authenticate with
  %% `Opts' - proplist of extra options
  %% returns `{ok, Connection}' otherwise `{error, Reason}'
  %%
  %% A new socket process is started with `epgsql_sock:start_link/0' (a failure
  %% to start it crashes the caller) and the handshake is done by `connect/5'.
  -spec connect(host(), string(), string(), connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  connect(Host, Username, Password, Opts) ->
      {ok, Sock} = epgsql_sock:start_link(),
      connect(Sock, Host, Username, Password, Opts).
  %% @doc Performs the connect request on the already started connection
  %% process `C'. On success the type cache is refreshed, unless the
  %% `replication' option is set. Returns `{ok, C}' or the `{error, Reason}'
  %% reported by the connection process.
  -spec connect(connection(), host(), string(), string(), connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  connect(C, Host, Username, Password, Opts0) ->
      Opts = to_proplist(Opts0),
      Request = {connect, Host, Username, Password, Opts},
      %% TODO connect timeout
      case gen_server:call(C, Request, infinity) of
          connected ->
              %% do not update the type cache if connection is in replication mode
              case proplists:get_value(replication, Opts, undefined) =:= undefined of
                  true -> update_type_cache(C);
                  false -> ok
              end,
              {ok, C};
          {error, _} = Error ->
              Error
      end.
  %% @doc Refreshes the type cache of `C' for the default dynamic types,
  %% `hstore' and `geometry'.
  -spec update_type_cache(connection()) -> ok.
  update_type_cache(C) ->
      DefaultTypes = [<<"hstore">>, <<"geometry">>],
      update_type_cache(C, DefaultTypes).
  %% @doc Looks up oid and array oid of the named types in `pg_type' and hands
  %% the resulting rows to the connection process.
  %% Any query error other than the missing `typarray' column crashes the
  %% caller with `case_clause'.
  -spec update_type_cache(connection(), [binary()]) -> ok.
  update_type_cache(C, DynamicTypes) ->
      Sql = "SELECT typname, oid::int4, typarray::int4"
            " FROM pg_type"
            " WHERE typname = ANY($1::varchar[])",
      Reply = equery(C, Sql, [DynamicTypes]),
      case Reply of
          {ok, _Columns, TypeInfos} ->
              ok = gen_server:call(C, {update_type_cache, TypeInfos});
          {error, {error, error, _, _,
                   <<"column \"typarray\" does not exist in pg_type">>, _}} ->
              %% Do not fail connect if the pg_type table is not in the expected
              %% format. Known to happen for Redshift which is based on PG v8.0.2
              ok
      end.
  %% @doc Closes connection `C' by delegating to `epgsql_sock:close/1'.
  -spec close(connection()) -> ok.
  close(C) ->
      epgsql_sock:close(C).
  %% @doc Asks the connection process for the value of parameter `Name'
  %% (delegates to `epgsql_sock:get_parameter/2').
  -spec get_parameter(connection(), binary()) -> binary() | undefined.
  get_parameter(C, Name) ->
      epgsql_sock:get_parameter(C, Name).
  %% @doc Sets the notice receiver of `C' to `PidOrName' (or `undefined') and
  %% returns the previous one; delegates to `epgsql_sock:set_notice_receiver/2'.
  -spec set_notice_receiver(connection(), undefined | pid() | atom()) ->
          {ok, Previous :: pid() | atom()}.
  set_notice_receiver(C, PidOrName) ->
      epgsql_sock:set_notice_receiver(C, PidOrName).
  %% @doc Returns the status message of the last command.
  %% When several semicolon-separated queries were run through `squery/2',
  %% only the status of the last one is available.
  %% See https://www.postgresql.org/docs/current/static/libpq-exec.html#LIBPQ-PQCMDSTATUS
  -spec get_cmd_status(connection()) ->
          {ok, undefined | atom() | {atom(), integer()}}.
  get_cmd_status(C) ->
      epgsql_sock:get_cmd_status(C).
  %% @doc runs simple `SqlQuery' via given `Connection'
  %% The call waits for the reply without a timeout.
  -spec squery(connection(), sql_query()) -> reply(squery_row()) | [reply(squery_row())].
  squery(Connection, SqlQuery) ->
      Request = {squery, SqlQuery},
      gen_server:call(Connection, Request, infinity).
  %% @doc Runs `Sql' as an extended query without parameters.
  -spec equery(connection(), sql_query()) -> reply(equery_row()).
  equery(C, Sql) ->
      equery(C, Sql, []).

  %% @doc Runs `Sql' as an extended query with `Parameters', using the unnamed
  %% statement. Delegates to `equery/4' with the empty statement name so the
  %% parse/zip/call sequence lives in one place only.
  %% TODO add fast_equery command that doesn't need parsed statement
  -spec equery(connection(), sql_query(), [bind_param()]) -> reply(equery_row()).
  equery(C, Sql, Parameters) ->
      equery(C, "", Sql, Parameters).
  %% @doc Parses `Sql' under statement name `Name', pairs each parameter with
  %% the type reported for it by the parsed statement and runs the query.
  %% A parse failure is returned unchanged. A `Parameters' list whose length
  %% differs from the statement's type list makes `lists:zip/2' crash.
  -spec equery(connection(), string(), sql_query(), [bind_param()]) -> reply(equery_row()).
  equery(C, Name, Sql, Parameters) ->
      case parse(C, Name, Sql, []) of
          {ok, Statement = #statement{types = ParamTypes}} ->
              Request = {equery, Statement, lists:zip(ParamTypes, Parameters)},
              gen_server:call(C, Request, infinity);
          NotParsed ->
              NotParsed
      end.
  %% @doc Runs the already prepared statement `Name' with `Parameters'.
  %% The statement is described first to obtain its parameter types; a
  %% describe failure is returned unchanged.
  -spec prepared_query(C::connection(), Name::string(), Parameters::[bind_param()]) -> reply(equery_row()).
  prepared_query(C, Name, Parameters) ->
      case describe(C, statement, Name) of
          {ok, Statement = #statement{types = ParamTypes}} ->
              Request = {prepared_query, Statement, lists:zip(ParamTypes, Parameters)},
              gen_server:call(C, Request, infinity);
          NotDescribed ->
              NotDescribed
      end.
  %% parse

  %% @doc Parses `Sql' as the unnamed statement with no declared parameter types.
  parse(C, Sql) ->
      parse(C, "", Sql, []).

  %% @doc Parses `Sql' as the unnamed statement with the given parameter `Types'.
  parse(C, Sql, Types) ->
      UnnamedStatement = "",
      parse(C, UnnamedStatement, Sql, Types).
  %% @doc Asks the connection process to parse `Sql' as statement `Name'.
  %% An `{error, _}' reply triggers a `sync/1' before it is returned.
  -spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
          {ok, #statement{}} | {error, query_error()}.
  parse(C, Name, Sql, Types) ->
      Reply = gen_server:call(C, {parse, Name, Sql, Types}, infinity),
      sync_on_error(C, Reply).
  %% bind

  %% @doc Binds `Parameters' to `Statement' using the unnamed portal.
  bind(C, Statement, Parameters) ->
      UnnamedPortal = "",
      bind(C, Statement, UnnamedPortal, Parameters).
  %% @doc Binds `Parameters' to `Statement' under portal `PortalName'.
  %% An `{error, _}' reply triggers a `sync/1' before it is returned.
  -spec bind(connection(), #statement{}, string(), [bind_param()]) ->
          ok | {error, query_error()}.
  bind(C, Statement, PortalName, Parameters) ->
      Request = {bind, Statement, PortalName, Parameters},
      Reply = gen_server:call(C, Request, infinity),
      sync_on_error(C, Reply).
  %% execute

  %% @doc Executes statement `S' on the unnamed portal, passing 0 as the row limit.
  execute(C, S) ->
      execute(C, S, 0).

  %% @doc Executes statement `S' on the unnamed portal, passing `N' as the row limit.
  execute(C, S, N) ->
      UnnamedPortal = "",
      execute(C, S, UnnamedPortal, N).
  %% @doc Sends an execute request for statement `S' on portal `PortalName'
  %% with `N' as the row limit, waiting for the reply without a timeout.
  -spec execute(connection(), #statement{}, string(), non_neg_integer()) -> Reply
      when
        Reply :: {ok | partial, [equery_row()]}
               | {ok, non_neg_integer()}
               | {ok, non_neg_integer(), [equery_row()]}
               | {error, query_error()}.
  execute(C, S, PortalName, N) ->
      Request = {execute, S, PortalName, N},
      gen_server:call(C, Request, infinity).
  %% @doc Sends a whole `Batch' of `{Statement, Parameters}' pairs to the
  %% connection process and returns one reply per pair.
  -spec execute_batch(connection(), [{#statement{}, [bind_param()]}]) -> [reply(equery_row())].
  execute_batch(C, Batch) ->
      Request = {execute_batch, Batch},
      gen_server:call(C, Request, infinity).
  %% statement/portal functions

  %% @doc Describes the statement identified by the name stored in the
  %% given `#statement{}' record.
  describe(C, #statement{name = StatementName}) ->
      describe(C, statement, StatementName).
  %% @doc Describes the statement or portal called `Name'.
  %% An `{error, _}' reply triggers a `sync/1' before it is returned.
  describe(C, statement, Name) ->
      Reply = gen_server:call(C, {describe_statement, Name}, infinity),
      sync_on_error(C, Reply);
  %% TODO unknown result format of Describe portal
  describe(C, portal, Name) ->
      Reply = gen_server:call(C, {describe_portal, Name}, infinity),
      sync_on_error(C, Reply).
  %% @doc Closes the statement identified by the name stored in the given
  %% `#statement{}' record.
  close(C, #statement{name = StatementName}) ->
      close(C, statement, StatementName).

  %% @doc Sends a close request for the object `Name' of kind `Type'.
  %% `Type' is presumably `statement' or `portal' - verify against epgsql_sock.
  %% Uses the default `gen_server:call/2' timeout.
  close(C, Type, Name) ->
      Request = {close, Type, Name},
      gen_server:call(C, Request).
  %% @doc Sends a `sync' request to the connection process, using the
  %% default `gen_server:call/2' timeout.
  sync(C) ->
      Request = sync,
      gen_server:call(C, Request).
  %% @doc Requests cancellation on connection `C' by delegating to
  %% `epgsql_sock:cancel/1'.
  -spec cancel(connection()) -> ok.
  cancel(C) ->
      epgsql_sock:cancel(C).
  %% misc helper functions

  %% @doc Runs `F(C)' between `BEGIN' and `COMMIT' and returns its result.
  %% Any exception raised while beginning, running `F' or committing
  %% (including a `badmatch' on an unexpected BEGIN/COMMIT reply) causes a
  %% `ROLLBACK' to be issued and `{rollback, Reason}' to be returned instead.
  -spec with_transaction(connection(), fun((connection()) -> Reply)) ->
          Reply | {rollback, any()}
      when
        Reply :: any().
  with_transaction(C, F) ->
      try
          {ok, [], []} = squery(C, "BEGIN"),
          Result = F(C),
          {ok, [], []} = squery(C, "COMMIT"),
          Result
      catch
          _Class:Reason ->
              squery(C, "ROLLBACK"),
              %% TODO hides error stacktrace
              {rollback, Reason}
      end.
  %% @doc Passes `Reply' through unchanged; when it is an `{error, _}' tuple a
  %% `sync/1' is issued on `C' first, and that sync must answer `ok'.
  sync_on_error(C, Reply) ->
      case Reply of
          {error, _} ->
              ok = sync(C),
              Reply;
          _ ->
              Reply
      end.
  %% @doc sends last flushed and applied WAL positions to the server in a
  %% standby status update message via given `Connection'
  %% Uses the default `gen_server:call/2' timeout.
  -spec standby_status_update(connection(), lsn(), lsn()) -> ok | error_reply().
  standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
      Request = {standby_status_update, FlushedLSN, AppliedLSN},
      gen_server:call(Connection, Request).
  %% @doc instructs Postgres server to start streaming WAL for logical replication
  %% where
  %% `Connection' - connection in replication mode
  %% `ReplicationSlot' - the name of the replication slot to stream changes from
  %% `Callback' - Callback module which should have the callback functions
  %%              implemented for message processing,
  %%              or a process which should be able to receive replication messages.
  %% `CbInitState' - Callback Module's initial state
  %% `WALPosition' - the WAL position XXX/XXX to begin streaming at.
  %%                 "0/0" to let the server determine the start point.
  %% `PluginOpts' - optional options passed to the slot's logical decoding plugin.
  %%                For example: "option_name1 'value1', option_name2 'value2'"
  %% returns `ok' otherwise `{error, Reason}'
  -spec start_replication(connection(), string(), Callback, cb_state(), string(), string()) ->
          ok | error_reply()
      when
        Callback :: module() | pid().
  start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
      Request = {start_replication,
                 ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts},
      gen_server:call(Connection, Request).
  %% @doc Same as `start_replication/6' with empty plugin options.
  start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
      NoPluginOpts = "",
      start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition,
                        NoPluginOpts).
  %% @private
  %% Normalises connection options: a list is returned as is, anything else
  %% is handed to `maps:to_list/1' (which raises for a non-map).
  to_proplist(Opts) ->
      case is_list(Opts) of
          true -> Opts;
          false -> maps:to_list(Opts)
      end.