epgsql.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. -module(epgsql).
  4. -export([connect/1, connect/2, connect/3, connect/4, connect/5,
  5. close/1,
  6. get_parameter/2,
  7. set_notice_receiver/2,
  8. get_cmd_status/1,
  9. squery/2,
  10. equery/2, equery/3, equery/4,
  11. prepared_query/3,
  12. parse/2, parse/3, parse/4,
  13. describe/2, describe/3,
  14. bind/3, bind/4,
  15. execute/2, execute/3, execute/4,
  16. execute_batch/2,
  17. close/2, close/3,
  18. sync/1,
  19. cancel/1,
  20. update_type_cache/1,
  21. update_type_cache/2,
  22. with_transaction/2,
  23. with_transaction/3,
  24. sync_on_error/2,
  25. standby_status_update/3,
  26. start_replication/5,
  27. start_replication/6,
  28. to_proplist/1]).
  29. -export([handle_x_log_data/5]). % private
  30. -export_type([connection/0, connect_option/0, connect_opts/0,
  31. connect_error/0, query_error/0, sql_query/0, column/0,
  32. type_name/0, epgsql_type/0, statement/0]).
  33. %% Deprecated types
  34. -export_type([bind_param/0, typed_param/0,
  35. squery_row/0, equery_row/0, reply/1,
  36. pg_time/0, pg_date/0, pg_datetime/0, pg_interval/0]).
  37. -include("epgsql.hrl").
  38. -type sql_query() :: string() | iodata().
  39. -type host() :: inet:ip_address() | inet:hostname().
  40. -type connection() :: pid().
  41. -type connect_option() ::
  42. {host, host()} |
  43. {username, string()} |
  44. {password, string()} |
  45. {database, DBName :: string()} |
  46. {port, PortNum :: inet:port_number()} |
  47. {ssl, IsEnabled :: boolean() | required} |
  48. {ssl_opts, SslOptions :: [ssl:ssl_option()]} | % see OTP ssl app, ssl_api.hrl
  49. {timeout, TimeoutMs :: timeout()} | % default: 5000 ms
  50. {async, Receiver :: pid() | atom()} | % process to receive LISTEN/NOTIFY msgs
  51. {codecs, Codecs :: [{epgsql_codec:codec_mod(), any()}]} |
  52. {replication, Replication :: string()}. % Pass "database" to connect in replication mode
  53. -ifdef(have_maps).
  54. -type connect_opts() ::
  55. [connect_option()]
  56. | #{host => host(),
  57. username => string(),
  58. password => string(),
  59. database => string(),
  60. port => inet:port_number(),
  61. ssl => boolean() | required,
  62. ssl_opts => [ssl:ssl_option()],
  63. timeout => timeout(),
  64. async => pid(),
  65. codecs => [{epgsql_codec:codec_mod(), any()}],
  66. replication => string()}.
  67. -else.
  68. -type connect_opts() :: [connect_option()].
  69. -endif.
  70. -type connect_error() :: epgsql_cmd_connect:connect_error().
  71. -type query_error() :: #error{}.
  72. -type type_name() :: atom().
  73. -type epgsql_type() :: type_name()
  74. | {array, type_name()}
  75. | {unknown_oid, integer()}.
  76. %% Deprecated
  77. -type pg_date() :: epgsql_codec_datetime:pg_date().
  78. -type pg_time() :: epgsql_codec_datetime:pg_time().
  79. -type pg_datetime() :: epgsql_codec_datetime:pg_datetime().
  80. -type pg_interval() :: epgsql_codec_datetime:pg_interval().
  81. %% Deprecated
  82. -type bind_param() :: any().
  83. -type typed_param() :: {epgsql_type(), bind_param()}.
  84. -type column() :: #column{}.
  85. -type statement() :: #statement{}.
  86. -type squery_row() :: tuple(). % tuple of binary().
  87. -type equery_row() :: tuple(). % tuple of bind_param().
  88. -type ok_reply(RowType) ::
  89. %% select
  90. {ok, ColumnsDescription :: [column()], RowsValues :: [RowType]} |
  91. %% update/insert/delete
  92. {ok, Count :: non_neg_integer()} |
  93. %% update/insert/delete + returning
  94. {ok, Count :: non_neg_integer(), ColumnsDescription :: [column()], RowsValues :: [RowType]}.
  95. -type error_reply() :: {error, query_error()}.
  96. -type reply(RowType) :: ok_reply(RowType) | error_reply().
  97. -type lsn() :: integer().
  98. -type cb_state() :: term().
  99. %% -- behaviour callbacks --
  100. %% Handles a XLogData Message (StartLSN, EndLSN, WALRecord, CbState).
  101. %% Return: {ok, LastFlushedLSN, LastAppliedLSN, NewCbState}
  102. -callback handle_x_log_data(lsn(), lsn(), binary(), cb_state()) -> {ok, lsn(), lsn(), cb_state()}.
  103. %% -------------
  104. %% -- client interface --
  105. -spec connect(connect_opts())
  106. -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  107. connect(Settings0) ->
  108. Settings = to_proplist(Settings0),
  109. Host = proplists:get_value(host, Settings, "localhost"),
  110. Username = proplists:get_value(username, Settings, os:getenv("USER")),
  111. Password = proplists:get_value(password, Settings, ""),
  112. connect(Host, Username, Password, Settings).
  113. connect(Host, Opts) ->
  114. connect(Host, os:getenv("USER"), "", Opts).
  115. connect(Host, Username, Opts) ->
  116. connect(Host, Username, "", Opts).
  117. -spec connect(host(), string(), string(), connect_opts())
  118. -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  119. %% @doc connects to Postgres
  120. %% where
  121. %% `Host' - host to connect to
  122. %% `Username' - username to connect as, defaults to `$USER'
  123. %% `Password' - optional password to authenticate with
  124. %% `Opts' - proplist of extra options
  125. %% returns `{ok, Connection}' otherwise `{error, Reason}'
  126. connect(Host, Username, Password, Opts) ->
  127. {ok, C} = epgsql_sock:start_link(),
  128. connect(C, Host, Username, Password, Opts).
  129. -spec connect(connection(), host(), string(), string(), connect_opts())
  130. -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  131. connect(C, Host, Username, Password, Opts0) ->
  132. Opts = to_proplist(Opts0),
  133. %% TODO connect timeout
  134. case epgsql_sock:sync_command(
  135. C, epgsql_cmd_connect, {Host, Username, Password, Opts}) of
  136. connected ->
  137. %% If following call fails for you, try to add {codecs, []} connect option
  138. {ok, _} = maybe_update_typecache(C, Opts),
  139. {ok, C};
  140. Error = {error, _} ->
  141. Error
  142. end.
  143. maybe_update_typecache(C, Opts) ->
  144. maybe_update_typecache(C, proplists:get_value(replication, Opts), proplists:get_value(codecs, Opts)).
  145. maybe_update_typecache(C, undefined, undefined) ->
  146. %% TODO: don't execute 'update_type_cache' when `codecs` is undefined.
  147. %% This will break backward compatibility
  148. update_type_cache(C);
  149. maybe_update_typecache(C, undefined, [_ | _] = Codecs) ->
  150. update_type_cache(C, Codecs);
  151. maybe_update_typecache(_, _, _) ->
  152. {ok, []}.
  153. update_type_cache(C) ->
  154. update_type_cache(C, [{epgsql_codec_hstore, []},
  155. {epgsql_codec_postgis, []}]).
  156. -spec update_type_cache(connection(), [{epgsql_codec:codec_mod(), Opts :: any()}]) ->
  157. epgsql_cmd_update_type_cache:response() | {error, empty}.
  158. update_type_cache(_C, []) ->
  159. {error, empty};
  160. update_type_cache(C, Codecs) ->
  161. %% {error, #error{severity = error,
  162. %% message = <<"column \"typarray\" does not exist in pg_type">>}}
  163. %% Do not fail connect if pg_type table in not in the expected
  164. %% format. Known to happen for Redshift which is based on PG v8.0.2
  165. epgsql_sock:sync_command(C, epgsql_cmd_update_type_cache, Codecs).
  166. %% @doc close connection
  167. -spec close(connection()) -> ok.
  168. close(C) ->
  169. epgsql_sock:close(C).
  170. -spec get_parameter(connection(), binary()) -> binary() | undefined.
  171. get_parameter(C, Name) ->
  172. epgsql_sock:get_parameter(C, Name).
  173. -spec set_notice_receiver(connection(), undefined | pid() | atom()) ->
  174. {ok, Previous :: pid() | atom()}.
  175. set_notice_receiver(C, PidOrName) ->
  176. epgsql_sock:set_notice_receiver(C, PidOrName).
  177. %% @doc Returns last command status message
  178. %% If multiple queries were executed using `squery/2', separated by semicolon,
  179. %% only the last query's status will be available.
  180. %% See https://www.postgresql.org/docs/current/static/libpq-exec.html#LIBPQ-PQCMDSTATUS
  181. -spec get_cmd_status(connection()) -> {ok, Status}
  182. when
  183. Status :: undefined | atom() | {atom(), integer()}.
  184. get_cmd_status(C) ->
  185. epgsql_sock:get_cmd_status(C).
  186. -spec squery(connection(), sql_query()) -> epgsql_cmd_squery:response().
  187. %% @doc runs simple `SqlQuery' via given `Connection'
  188. squery(Connection, SqlQuery) ->
  189. epgsql_sock:sync_command(Connection, epgsql_cmd_squery, SqlQuery).
  190. equery(C, Sql) ->
  191. equery(C, Sql, []).
  192. %% TODO add fast_equery command that doesn't need parsed statement
  193. equery(C, Sql, Parameters) ->
  194. case parse(C, "", Sql, []) of
  195. {ok, #statement{types = Types} = S} ->
  196. TypedParameters = lists:zip(Types, Parameters),
  197. epgsql_sock:sync_command(C, epgsql_cmd_equery, {S, TypedParameters});
  198. Error ->
  199. Error
  200. end.
  201. -spec equery(connection(), string(), sql_query(), [bind_param()]) ->
  202. epgsql_cmd_equery:response().
  203. equery(C, Name, Sql, Parameters) ->
  204. case parse(C, Name, Sql, []) of
  205. {ok, #statement{types = Types} = S} ->
  206. TypedParameters = lists:zip(Types, Parameters),
  207. epgsql_sock:sync_command(C, epgsql_cmd_equery, {S, TypedParameters});
  208. Error ->
  209. Error
  210. end.
  211. -spec prepared_query(C::connection(), Name::string(), Parameters::[bind_param()]) ->
  212. epgsql_cmd_prepared_query:response().
  213. prepared_query(C, Name, Parameters) ->
  214. case describe(C, statement, Name) of
  215. {ok, #statement{types = Types} = S} ->
  216. TypedParameters = lists:zip(Types, Parameters),
  217. epgsql_sock:sync_command(C, epgsql_cmd_prepared_query, {S, TypedParameters});
  218. Error ->
  219. Error
  220. end.
  221. %% parse
  222. parse(C, Sql) ->
  223. parse(C, Sql, []).
  224. parse(C, Sql, Types) ->
  225. parse(C, "", Sql, Types).
  226. -spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
  227. epgsql_cmd_parse:response().
  228. parse(C, Name, Sql, Types) ->
  229. sync_on_error(
  230. C, epgsql_sock:sync_command(
  231. C, epgsql_cmd_parse, {Name, Sql, Types})).
  232. %% bind
  233. bind(C, Statement, Parameters) ->
  234. bind(C, Statement, "", Parameters).
  235. -spec bind(connection(), statement(), string(), [bind_param()]) ->
  236. epgsql_cmd_bind:response().
  237. bind(C, Statement, PortalName, Parameters) ->
  238. sync_on_error(
  239. C,
  240. epgsql_sock:sync_command(
  241. C, epgsql_cmd_bind, {Statement, PortalName, Parameters})).
  242. %% execute
  243. execute(C, S) ->
  244. execute(C, S, "", 0).
  245. execute(C, S, N) ->
  246. execute(C, S, "", N).
  247. -spec execute(connection(), statement(), string(), non_neg_integer()) -> Reply when
  248. Reply :: epgsql_cmd_execute:response().
  249. execute(C, S, PortalName, N) ->
  250. epgsql_sock:sync_command(C, epgsql_cmd_execute, {S, PortalName, N}).
  251. -spec execute_batch(connection(), [{statement(), [bind_param()]}]) ->
  252. epgsql_cmd_batch:response().
  253. execute_batch(C, Batch) ->
  254. epgsql_sock:sync_command(C, epgsql_cmd_batch, Batch).
  255. %% statement/portal functions
  256. -spec describe(connection(), statement()) -> epgsql_cmd_describe_statement:response().
  257. describe(C, #statement{name = Name}) ->
  258. describe(C, statement, Name).
  259. -spec describe(connection(), portal, iodata()) -> epgsql_cmd_describe_portal:response();
  260. (connection(), statement, iodata()) -> epgsql_cmd_describe_statement:response().
  261. describe(C, statement, Name) ->
  262. sync_on_error(
  263. C, epgsql_sock:sync_command(
  264. C, epgsql_cmd_describe_statement, Name));
  265. describe(C, portal, Name) ->
  266. sync_on_error(
  267. C, epgsql_sock:sync_command(
  268. C, epgsql_cmd_describe_portal, Name)).
  269. %% @doc close statement
  270. -spec close(connection(), statement()) -> epgsql_cmd_close:response().
  271. close(C, #statement{name = Name}) ->
  272. close(C, statement, Name).
  273. -spec close(connection(), statement | portal, iodata()) -> epgsql_cmd_close:response().
  274. close(C, Type, Name) ->
  275. epgsql_sock:sync_command(C, epgsql_cmd_close, {Type, Name}).
  276. -spec sync(connection()) -> epgsql_cmd_sync:response().
  277. sync(C) ->
  278. epgsql_sock:sync_command(C, epgsql_cmd_sync, []).
  279. -spec cancel(connection()) -> ok.
  280. cancel(C) ->
  281. epgsql_sock:cancel(C).
  282. %% misc helper functions
  283. -spec with_transaction(connection(), fun((connection()) -> Reply)) ->
  284. Reply | {rollback, any()}
  285. when
  286. Reply :: any().
  287. with_transaction(C, F) ->
  288. with_transaction(C, F, [{reraise, false}]).
  289. %% @doc Execute callback function with connection in a transaction.
  290. %% Transaction will be rolled back in case of exception.
  291. %% Options (proplist or map):
  292. %% - reraise (true): when set to true, exception will be re-thrown, otherwise
  293. %% {rollback, ErrorReason} will be returned
  294. %% - ensure_comitted (false): even when callback returns without exception,
  295. %% check that transaction was comitted by checking CommandComplete status
  296. %% of "COMMIT" command. In case when transaction was rolled back, status will be
  297. %% "rollback" instead of "commit".
  298. %% - begin_opts (""): append extra options to "BEGIN" command (see
  299. %% https://www.postgresql.org/docs/current/static/sql-begin.html)
  300. %% Beware of SQL injections! No escaping is made on begin_opts!
  301. -spec with_transaction(
  302. connection(), fun((connection()) -> Reply), Opts) -> Reply | {rollback, any()} | no_return() when
  303. Reply :: any(),
  304. Opts :: [{reraise, boolean()} |
  305. {ensure_committed, boolean()} |
  306. {begin_opts, iodata()}].
  307. with_transaction(C, F, Opts0) ->
  308. Opts = to_proplist(Opts0),
  309. Begin = case proplists:get_value(begin_opts, Opts) of
  310. undefined -> <<"BEGIN">>;
  311. BeginOpts ->
  312. [<<"BEGIN ">> | BeginOpts]
  313. end,
  314. try
  315. {ok, [], []} = squery(C, Begin),
  316. R = F(C),
  317. {ok, [], []} = squery(C, <<"COMMIT">>),
  318. case proplists:get_value(ensure_committed, Opts, false) of
  319. true ->
  320. {ok, CmdStatus} = get_cmd_status(C),
  321. (commit == CmdStatus) orelse error({ensure_committed_failed, CmdStatus});
  322. false -> ok
  323. end,
  324. R
  325. catch
  326. Type:Reason ->
  327. squery(C, "ROLLBACK"),
  328. handle_error(Type, Reason, proplists:get_value(reraise, Opts, true))
  329. end.
  330. handle_error(_, Reason, false) ->
  331. {rollback, Reason};
  332. handle_error(Type, Reason, true) ->
  333. erlang:raise(Type, Reason, erlang:get_stacktrace()).
  334. sync_on_error(C, Error = {error, _}) ->
  335. ok = sync(C),
  336. Error;
  337. sync_on_error(_C, R) ->
  338. R.
  339. -spec standby_status_update(connection(), lsn(), lsn()) -> ok.
  340. %% @doc sends last flushed and applied WAL positions to the server in a standby status update message via
  341. %% given `Connection'
  342. standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
  343. gen_server:call(Connection, {standby_status_update, FlushedLSN, AppliedLSN}).
  344. handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, Repl) ->
  345. Mod:handle_x_log_data(StartLSN, EndLSN, WALRecord, Repl).
  346. -spec start_replication(connection(), string(), Callback, cb_state(), string(), string()) -> Response when
  347. Response :: epgsql_cmd_start_replication:response(),
  348. Callback :: module() | pid().
  349. %% @doc instructs Postgres server to start streaming WAL for logical replication
  350. %% where
  351. %% `Connection' - connection in replication mode
  352. %% `ReplicationSlot' - the name of the replication slot to stream changes from
  353. %% `Callback' - Callback module which should have the callback functions implemented for message processing.
  354. %% or a process which should be able to receive replication messages.
  355. %% `CbInitState' - Callback Module's initial state
  356. %% `WALPosition' - the WAL position XXX/XXX to begin streaming at.
  357. %% "0/0" to let the server determine the start point.
  358. %% `PluginOpts' - optional options passed to the slot's logical decoding plugin.
  359. %% For example: "option_name1 'value1', option_name2 'value2'"
  360. %% returns `ok' otherwise `{error, Reason}'
  361. start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
  362. Command = {ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts},
  363. epgsql_sock:sync_command(Connection, epgsql_cmd_start_replication, Command).
  364. start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
  365. start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, []).
  366. %% @private
  367. to_proplist(List) when is_list(List) ->
  368. List;
  369. to_proplist(Map) ->
  370. maps:to_list(Map).