epgsql.erl 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. %%% @doc Synchronous interface.
  2. %%%
  3. %%% All functions block (with infinite timeout) until full result is available.
  4. %%% @end
  5. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  6. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  -module(epgsql).

  %% Connection lifecycle and connection-level settings
  -export([connect/1, connect/2, connect/3, connect/4, connect/5,
           close/1,
           cancel/1,
           get_parameter/2,
           set_notice_receiver/2,
           get_cmd_status/1,
           update_type_cache/1, update_type_cache/2]).

  %% Simple and extended query interface
  -export([squery/2,
           equery/2, equery/3, equery/4,
           prepared_query/3,
           parse/2, parse/3, parse/4,
           describe/2, describe/3,
           bind/3, bind/4,
           execute/2, execute/3, execute/4,
           execute_batch/2, execute_batch/3,
           close/2, close/3,
           sync/1,
           sync_on_error/2]).

  %% Transactions, replication and helpers
  -export([with_transaction/2, with_transaction/3,
           standby_status_update/3,
           start_replication/5, start_replication/6, start_replication/7,
           to_map/1]).

  %% Private: exported only so it can be called from outside this module
  -export([handle_x_log_data/5]).

  -export_type([connection/0,
                connect_option/0,
                connect_opts/0,
                connect_error/0,
                query_error/0,
                sql_query/0,
                column/0,
                type_name/0,
                epgsql_type/0,
                statement/0]).

  %% Deprecated types
  -export_type([bind_param/0,
                typed_param/0,
                squery_row/0,
                equery_row/0,
                reply/1,
                pg_time/0,
                pg_date/0,
                pg_datetime/0,
                pg_interval/0]).

  -include("epgsql.hrl").
  -type sql_query() :: iodata(). % SQL query text

  -type host() :: inet:ip_address() | inet:hostname().

  -type password() :: string() | iodata() | fun( () -> iodata() ).

  -type connection() :: pid().

  %% Single connect option in proplist form.
  -type connect_option() ::
        {host, host()}
      | {username, string()}
      | {password, password()}
      | {database, DBName :: string()}
      | {port, PortNum :: inet:port_number()}
      | {ssl, IsEnabled :: boolean() | required}
      | {ssl_opts, SslOptions :: [ssl:ssl_option()]} % see OTP ssl app, ssl_api.hrl
      | {timeout, TimeoutMs :: timeout()} % default: 5000 ms
      | {async, Receiver :: pid() | atom()} % process to receive LISTEN/NOTIFY msgs
      | {codecs, Codecs :: [{epgsql_codec:codec_mod(), any()}]}
      | {nulls, Nulls :: [any(), ...]} % terms to be used as NULL
      | {replication, Replication :: string()} % Pass "database" to connect in replication mode
      | {application_name, ApplicationName :: string()}.

  %% Connect options, either as a proplist or as a map with the same keys.
  -type connect_opts() ::
        [connect_option()]
      | #{host => host(),
          username => string(),
          password => password(),
          database => string(),
          port => inet:port_number(),
          ssl => boolean() | required,
          ssl_opts => [ssl:ssl_option()],
          timeout => timeout(),
          async => pid() | atom(),
          codecs => [{epgsql_codec:codec_mod(), any()}],
          nulls => [any(), ...],
          replication => string(),
          application_name => string()
         }.
  -type connect_error() :: epgsql_cmd_connect:connect_error().

  -type query_error() :: #error{}. % Error report generated by server

  -type type_name() :: atom().

  -type epgsql_type() ::
        type_name()
      | {array, type_name()}
      | {unknown_oid, integer()}.

  %% Deprecated
  -type pg_date() :: epgsql_codec_datetime:pg_date().
  -type pg_time() :: epgsql_codec_datetime:pg_time().
  -type pg_datetime() :: epgsql_codec_datetime:pg_datetime().
  -type pg_interval() :: epgsql_codec_datetime:pg_interval().

  -type bind_param() :: any().
  %% Value to be bound to placeholder (`$1', `$2' etc)

  -type typed_param() :: {epgsql_type(), bind_param()}.

  -type column() :: #column{}.

  -type statement() :: #statement{}.

  -type squery_row() :: tuple(). % tuple of binary().

  -type equery_row() :: tuple(). % tuple of any().

  -type ok_reply(RowType) ::
        %% select
        {ok, ColumnsDescription :: [column()], RowsValues :: [RowType]}
        %% update/insert/delete
      | {ok, Count :: non_neg_integer()}
        %% update/insert/delete + returning
      | {ok, Count :: non_neg_integer(), ColumnsDescription :: [column()], RowsValues :: [RowType]}.

  -type error_reply() :: {error, query_error()}.

  -type reply(RowType) :: ok_reply(RowType) | error_reply().

  -type lsn() :: integer().

  -type cb_state() :: term().
  %% Portable "catch with stacktrace" clause head.
  %% `?WITH_STACKTRACE(Type, Reason, Stack)' expands to the head of a catch
  %% clause that binds the exception class, reason and stacktrace.
  %% See https://github.com/erlang/rebar3/pull/1773
  -ifdef(OTP_RELEASE). % OTP21 and later: stacktrace is bound in the clause head
  -define(WITH_STACKTRACE(T, R, S), T:R:S ->).
  -else. % pre-OTP21: stacktrace has to be fetched explicitly
  -define(WITH_STACKTRACE(T, R, S), T:R -> S = erlang:get_stacktrace(), ).
  -endif.
  %% -- behaviour callbacks --

  %% Handles a XLogData message.
  %% Receives the start LSN, end LSN, the WAL record and the callback state;
  %% returns the last flushed LSN, last applied LSN and the new callback state.
  -callback handle_x_log_data(StartLSN :: lsn(),
                              EndLSN :: lsn(),
                              WALRecord :: binary(),
                              CbState :: cb_state()) ->
      {ok, LastFlushedLSN :: lsn(), LastAppliedLSN :: lsn(), NewCbState :: cb_state()}.
  116. %% -------------
  117. %% -- client interface --
  %% @doc Connects to the server and performs all the necessary handshakes.
  %% Starts an `epgsql_sock' process with `epgsql_sock:start_link/0' and runs
  %% the connect command on it with the given options.
  -spec connect(connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  connect(Opts) ->
      {ok, Sock} = epgsql_sock:start_link(),
      call_connect(Sock, Opts).
  %% Legacy shortcut: connect to `Host' as the user named by the `USER'
  %% environment variable, with an empty password.
  %% NOTE: `os:getenv/1' returns `false' when the variable is not set; that
  %% value is passed on as the username unchanged.
  connect(Host, Opts) ->
      EnvUser = os:getenv("USER"),
      connect(Host, EnvUser, "", Opts).

  %% Legacy shortcut: connect to `Host' as `Username' with an empty password.
  connect(Host, Username, Opts) ->
      EmptyPassword = "",
      connect(Host, Username, EmptyPassword, Opts).
  %% @doc Connects to the server and performs all the necessary handshakes (legacy interface).
  %% A fresh `epgsql_sock' process is started for the connection.
  %% @param Host     host to connect to
  %% @param Username username to connect as, defaults to `$USER'
  %% @param Password optional password to authenticate with
  %% @param Opts     proplist or map of extra options
  %% @returns `{ok, Connection}' otherwise `{error, Reason}'
  %% @see connect/1
  -spec connect(host(), string(), password(), connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  connect(Host, Username, Password, Opts) ->
      {ok, Sock} = epgsql_sock:start_link(),
      connect(Sock, Host, Username, Password, Opts).
  %% Connects using an already started socket process `C'.
  %% The explicit `Host', `Username' and `Password' arguments take precedence
  %% over same-named keys in `Opts' (they are the second argument of
  %% `maps:merge/2').
  -spec connect(connection(), host(), string(), password(), connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  connect(C, Host, Username, Password, Opts) ->
      Explicit = #{host => Host,
                   username => Username,
                   password => Password},
      Merged = maps:merge(to_map(Opts), Explicit),
      call_connect(C, Merged).
  %% Runs the connect command on socket process `C'.
  %% Options are normalised to a map and passed through
  %% `epgsql_cmd_connect:opts_hide_password/1' before being sent.
  %% On success the type cache is refreshed (see maybe_update_typecache/2).
  -spec call_connect(connection(), connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  call_connect(C, Opts) ->
      SafeOpts = epgsql_cmd_connect:opts_hide_password(to_map(Opts)),
      case epgsql_sock:sync_command(C, epgsql_cmd_connect, SafeOpts) of
          {error, _} = Failure ->
              Failure;
          connected ->
              %% If following call fails for you, try to add {codecs, []} connect option
              {ok, _} = maybe_update_typecache(C, SafeOpts),
              {ok, C}
      end.
  %% Decides from the `replication' and `codecs' connect options whether the
  %% type cache has to be refreshed right after connecting.
  maybe_update_typecache(C, Opts) ->
      Replication = maps:get(replication, Opts, undefined),
      Codecs = maps:get(codecs, Opts, undefined),
      maybe_update_typecache(C, Replication, Codecs).

  %% No replication mode, non-empty codec list: load exactly those codecs.
  maybe_update_typecache(C, undefined, [_ | _] = Codecs) ->
      update_type_cache(C, Codecs);
  %% No replication mode, no `codecs' option: load the default codecs.
  maybe_update_typecache(C, undefined, undefined) ->
      %% TODO: don't execute 'update_type_cache' when `codecs` is undefined.
      %% This will break backward compatibility
      update_type_cache(C);
  %% Anything else (replication set, empty codec list, ...): nothing to do.
  maybe_update_typecache(_C, _Replication, _Codecs) ->
      {ok, []}.
  %% Refreshes the type cache with the default codec set
  %% (`epgsql_codec_hstore' and `epgsql_codec_postgis', both without options).
  update_type_cache(C) ->
      DefaultCodecs = [{epgsql_codec_hstore, []},
                       {epgsql_codec_postgis, []}],
      update_type_cache(C, DefaultCodecs).
  %% Refreshes the type cache for the given codecs.
  %% Returns `{error, empty}' without contacting the server when `Codecs' is
  %% the empty list; otherwise the command result is returned as is.
  -spec update_type_cache(connection(), [{epgsql_codec:codec_mod(), Opts :: any()}]) ->
          epgsql_cmd_update_type_cache:response() | {error, empty}.
  update_type_cache(C, Codecs) ->
      case Codecs of
          [] ->
              {error, empty};
          _ ->
              %% The command may answer with
              %% {error, #error{severity = error,
              %%                message = <<"column \"typarray\" does not exist in pg_type">>}}
              %% when the pg_type table is not in the expected format. Known to
              %% happen for Redshift which is based on PG v8.0.2.
              epgsql_sock:sync_command(C, epgsql_cmd_update_type_cache, Codecs)
      end.
  %% @doc Closes the connection by delegating to `epgsql_sock:close/1'.
  -spec close(connection()) -> ok.
  close(Connection) ->
      epgsql_sock:close(Connection).
  %% Looks up parameter `Name' on the connection via `epgsql_sock:get_parameter/2'.
  -spec get_parameter(connection(), binary()) -> binary() | undefined.
  get_parameter(Connection, Name) ->
      epgsql_sock:get_parameter(Connection, Name).
  %% Sets the notice receiver of the connection to `PidOrName' (or `undefined')
  %% and returns the previous one.
  -spec set_notice_receiver(connection(), undefined | pid() | atom()) ->
          {ok, Previous :: pid() | atom()}.
  set_notice_receiver(Connection, PidOrName) ->
      epgsql_sock:set_notice_receiver(Connection, PidOrName).
  %% @doc Returns last command status message.
  %% When several semicolon-separated queries were run with {@link squery/2},
  %% only the status of the last one is available.
  %% See [https://www.postgresql.org/docs/current/static/libpq-exec.html#LIBPQ-PQCMDSTATUS]
  -spec get_cmd_status(connection()) ->
          {ok, Status :: undefined | atom() | {atom(), integer()}}.
  get_cmd_status(Connection) ->
      epgsql_sock:get_cmd_status(Connection).
  %% @doc Runs simple `SqlQuery' via given `Connection'.
  %% @see epgsql_cmd_squery
  -spec squery(connection(), sql_query()) -> epgsql_cmd_squery:response() | epgsql_sock:error().
  squery(Connection, SqlQuery) ->
      Command = epgsql_cmd_squery,
      epgsql_sock:sync_command(Connection, Command, SqlQuery).
  %% Extended query without parameters.
  equery(C, Sql) ->
      NoParameters = [],
      equery(C, Sql, NoParameters).

  %% Extended query using the unnamed (`""') statement.
  -spec equery(connection(), sql_query(), [bind_param()]) ->
          epgsql_cmd_equery:response() | epgsql_sock:error().
  equery(C, Sql, Parameters) ->
      UnnamedStatement = "",
      equery(C, UnnamedStatement, Sql, Parameters).
  %% @doc Executes extended query.
  %% The statement is parsed under `Name' first; the parameter types reported
  %% by the parse step are paired with `Parameters' and the pair list is sent
  %% with the `epgsql_cmd_equery' command. Any non-`{ok, Statement}' result of
  %% the parse step is returned unchanged.
  %% `lists:zip/2' requires both lists to have the same length, so the number
  %% of `Parameters' has to match the number of statement parameter types.
  %% @see epgsql_cmd_equery
  %% @end
  %% TODO add fast_equery command that doesn't need parsed statement
  -spec equery(connection(), string(), sql_query(), [bind_param()]) ->
          epgsql_cmd_equery:response() | epgsql_sock:error().
  equery(C, Name, Sql, Parameters) ->
      case parse(C, Name, Sql, []) of
          {ok, #statement{} = Stmt} ->
              Typed = lists:zip(Stmt#statement.types, Parameters),
              epgsql_sock:sync_command(C, epgsql_cmd_equery, {Stmt, Typed});
          ParseError ->
              ParseError
      end.
  %% @doc Similar to {@link equery/3}, but uses prepared statement that can be reused multiple times.
  %% Accepts either a `#statement{}' record or the statement name as a list;
  %% in the latter case the statement is described first and a failed describe
  %% result is returned unchanged.
  %% @see epgsql_cmd_prepared_query
  -spec prepared_query(C::connection(), string() | statement(), Parameters::[bind_param()]) ->
          epgsql_cmd_prepared_query:response().
  prepared_query(C, #statement{} = Stmt, Parameters) ->
      %% Pair every parameter with the type the statement expects for it.
      Typed = lists:zip(Stmt#statement.types, Parameters),
      epgsql_sock:sync_command(C, epgsql_cmd_prepared_query, {Stmt, Typed});
  prepared_query(C, Name, Parameters) when is_list(Name) ->
      case describe(C, statement, Name) of
          {ok, #statement{} = Stmt} ->
              prepared_query(C, Stmt, Parameters);
          DescribeError ->
              DescribeError
      end.
  %% parse

  %% Parses `Sql' as the unnamed statement without explicit parameter types.
  parse(C, Sql) ->
      NoTypes = [],
      parse(C, Sql, NoTypes).

  %% Parses `Sql' as the unnamed (`""') statement with the given parameter types.
  parse(C, Sql, Types) ->
      UnnamedStatement = "",
      parse(C, UnnamedStatement, Sql, Types).

  %% Runs the `epgsql_cmd_parse' command; when it returns `{error, _}' a sync
  %% is issued (see sync_on_error/2) before the error is returned.
  -spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
          epgsql_cmd_parse:response().
  parse(C, Name, Sql, Types) ->
      Reply = epgsql_sock:sync_command(C, epgsql_cmd_parse, {Name, Sql, Types}),
      sync_on_error(C, Reply).
  %% bind

  %% Binds `Parameters' to `Statement' using the unnamed (`""') portal.
  bind(C, Statement, Parameters) ->
      UnnamedPortal = "",
      bind(C, Statement, UnnamedPortal, Parameters).

  %% Runs the `epgsql_cmd_bind' command; when it returns `{error, _}' a sync
  %% is issued (see sync_on_error/2) before the error is returned.
  -spec bind(connection(), statement(), string(), [bind_param()]) ->
          epgsql_cmd_bind:response().
  bind(C, Statement, PortalName, Parameters) ->
      Reply = epgsql_sock:sync_command(C, epgsql_cmd_bind, {Statement, PortalName, Parameters}),
      sync_on_error(C, Reply).
  %% execute

  %% Executes statement `S' on the unnamed portal, passing `0' as `N'.
  execute(C, S) ->
      execute(C, S, "", 0).

  %% Executes statement `S' on the unnamed portal, passing the given `N'.
  execute(C, S, N) ->
      UnnamedPortal = "",
      execute(C, S, UnnamedPortal, N).

  %% Runs the `epgsql_cmd_execute' command with `{S, PortalName, N}'.
  -spec execute(connection(), statement(), string(), non_neg_integer()) ->
          epgsql_cmd_execute:response().
  execute(C, S, PortalName, N) ->
      Request = {S, PortalName, N},
      epgsql_sock:sync_command(C, epgsql_cmd_execute, Request).
  %% @doc Executes batch of `{statement(), [bind_param()]}' extended queries.
  %% The whole `Batch' is handed to the `epgsql_cmd_batch' command.
  %% @see epgsql_cmd_batch
  -spec execute_batch(connection(), [{statement(), [bind_param()]}]) ->
          epgsql_cmd_batch:response().
  execute_batch(Connection, Batch) ->
      epgsql_sock:sync_command(Connection, epgsql_cmd_batch, Batch).
  %% @doc Executes same statement() extended query with each parameter list of a `Batch'.
  %% When SQL text is given instead of a `#statement{}', it is parsed first;
  %% a failed parse result is returned unchanged (not wrapped in a tuple with
  %% columns).
  %% @see epgsql_cmd_batch
  -spec execute_batch(connection(), statement() | sql_query(), [ [bind_param()] ]) ->
          {[column()], epgsql_cmd_batch:response()}.
  execute_batch(C, #statement{} = Statement, Batch) ->
      Reply = epgsql_sock:sync_command(C, epgsql_cmd_batch, {Statement, Batch}),
      {Statement#statement.columns, Reply};
  execute_batch(C, Sql, Batch) ->
      case parse(C, Sql) of
          {ok, #statement{} = Parsed} ->
              execute_batch(C, Parsed, Batch);
          ParseError ->
              ParseError
      end.
  %% statement/portal functions

  %% Describes the statement identified by the `name' field of the record.
  -spec describe(connection(), statement()) -> epgsql_cmd_describe_statement:response().
  describe(C, #statement{} = Statement) ->
      describe(C, statement, Statement#statement.name).
  %% Describes a named statement or portal. When the command returns
  %% `{error, _}' a sync is issued (see sync_on_error/2) before the error is
  %% returned.
  -spec describe(connection(), portal, iodata()) -> epgsql_cmd_describe_portal:response();
                (connection(), statement, iodata()) -> epgsql_cmd_describe_statement:response().
  describe(C, portal, Name) ->
      Reply = epgsql_sock:sync_command(C, epgsql_cmd_describe_portal, Name),
      sync_on_error(C, Reply);
  describe(C, statement, Name) ->
      Reply = epgsql_sock:sync_command(C, epgsql_cmd_describe_statement, Name),
      sync_on_error(C, Reply).
  %% @doc Closes the statement identified by the `name' field of the record.
  -spec close(connection(), statement()) -> epgsql_cmd_close:response().
  close(C, #statement{} = Statement) ->
      close(C, statement, Statement#statement.name).
  %% @doc Closes the statement or portal named `Name' by running the
  %% `epgsql_cmd_close' command with `{Type, Name}'.
  -spec close(connection(), statement | portal, iodata()) -> epgsql_cmd_close:response().
  close(C, Type, Name) ->
      Target = {Type, Name},
      epgsql_sock:sync_command(C, epgsql_cmd_close, Target).
  %% Runs the `epgsql_cmd_sync' command (with no arguments) on the connection.
  -spec sync(connection()) -> epgsql_cmd_sync:response().
  sync(Connection) ->
      NoArgs = [],
      epgsql_sock:sync_command(Connection, epgsql_cmd_sync, NoArgs).
  %% @doc Cancels the currently executing command by delegating to
  %% `epgsql_sock:cancel/1'.
  -spec cancel(connection()) -> ok.
  cancel(Connection) ->
      epgsql_sock:cancel(Connection).
  %% misc helper functions

  %% Runs `F' inside a transaction with `reraise' switched off, so an
  %% exception in `F' yields `{rollback, Reason}' instead of being re-thrown.
  -spec with_transaction(connection(), fun((connection()) -> Reply)) ->
          Reply | {rollback, any()}
        when
          Reply :: any().
  with_transaction(C, F) ->
      LegacyOpts = [{reraise, false}],
      with_transaction(C, F, LegacyOpts).
  %% @doc Execute callback function with connection in a transaction.
  %% Transaction will be rolled back in case of exception.
  %% Options (proplist or map):
  %% <dl>
  %%  <dt>reraise</dt>
  %%  <dd>when set to true, exception will be re-thrown, otherwise
  %%  `{rollback, ErrorReason}' will be returned. Default: `true'</dd>
  %%  <dt>ensure_committed</dt>
  %%  <dd>even when callback returns without exception,
  %%  check that transaction was committed by checking CommandComplete status
  %%  of "COMMIT" command. In case when transaction was rolled back, status will be
  %%  "rollback" instead of "commit". Default: `false'</dd>
  %%  <dt>begin_opts</dt>
  %%  <dd>append extra options to "BEGIN" command (see
  %%  https://www.postgresql.org/docs/current/static/sql-begin.html)
  %%  Beware of SQL injections! No escaping is made on begin_opts! Default: `""'</dd>
  %% </dl>
  %% @param C    connection to run the transaction on
  %% @param F    callback invoked with the connection after "BEGIN"
  %% @param Opts proplist or map of the options listed above
  %% @returns the callback's return value, or `{rollback, Reason}' when an
  %% exception was caught and `reraise' is `false'
  -spec with_transaction(
          connection(), fun((connection()) -> Reply), Opts) -> Reply | {rollback, any()} | no_return() when
        Reply :: any(),
        Opts :: [{reraise, boolean()} |
                 {ensure_committed, boolean()} |
                 {begin_opts, iodata()}]
              | #{reraise => boolean(),
                  ensure_committed => boolean(),
                  begin_opts => iodata()}.
  with_transaction(C, F, Opts0) ->
      Opts = to_map(Opts0),
      Begin = case Opts of
                  #{begin_opts := BeginOpts} ->
                      [<<"BEGIN ">> | BeginOpts];
                  _ -> <<"BEGIN">>
              end,
      try
          {ok, [], []} = squery(C, Begin),
          R = F(C),
          {ok, [], []} = squery(C, <<"COMMIT">>),
          case Opts of
              #{ensure_committed := true} ->
                  %% COMMIT itself succeeded; the command status tells whether
                  %% the transaction was really committed.
                  {ok, CmdStatus} = get_cmd_status(C),
                  (commit == CmdStatus) orelse error({ensure_committed_failed, CmdStatus});
              _ -> ok
          end,
          R
      catch
          ?WITH_STACKTRACE(Type, Reason, Stack)
              %% The result of the rollback is not inspected; the original
              %% exception is what gets reported.
              squery(C, "ROLLBACK"),
              case maps:get(reraise, Opts, true) of
                  true ->
                      erlang:raise(Type, Reason, Stack);
                  false ->
                      {rollback, Reason}
              end
      end.
  %% Passes `Result' through unchanged. When it is an `{error, _}' tuple, a
  %% sync is run on the connection first and is required to return `ok'.
  sync_on_error(C, Result) ->
      case Result of
          {error, _} ->
              ok = sync(C),
              Result;
          _ ->
              Result
      end.
  %% @doc Sends last flushed and applied WAL positions to the server in a
  %% standby status update message via given `Connection'.
  -spec standby_status_update(connection(), lsn(), lsn()) -> ok.
  standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
      Request = {standby_status_update, FlushedLSN, AppliedLSN},
      gen_server:call(Connection, Request).
  %% @private
  %% Forwards an XLogData message to the `handle_x_log_data/4' callback of
  %% module `Mod' and returns whatever the callback returns.
  handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, CbState) ->
      Mod:handle_x_log_data(StartLSN, EndLSN, WALRecord, CbState).
  %% Single logical replication option in proplist form.
  -type replication_option() ::
        {align_lsn, boolean()}. %% Align last applied and flushed LSN with last received LSN
                                %% after Primary keepalive message with ReplyRequired flag

  %% Replication options, either as a proplist or as a map with the same keys.
  -type replication_opts() ::
        [replication_option()]
      | #{align_lsn => boolean()}.
  %% @doc Instructs Postgres server to start streaming WAL for logical replication.
  %% @param Connection      connection in replication mode
  %% @param ReplicationSlot the name of the replication slot to stream changes from
  %% @param Callback        Callback module which should have the callback functions implemented
  %%                        for message processing, or a process which should be able to receive
  %%                        replication messages.
  %% @param CbInitState     Callback Module's initial state
  %% @param WALPosition     the WAL position XXX/XXX to begin streaming at.
  %%                        "0/0" to let the server determine the start point.
  %% @param PluginOpts      optional options passed to the slot's logical decoding plugin.
  %%                        For example: "option_name1 'value1', option_name2 'value2'"
  %% @param Opts            options of logical replication (proplist or map)
  %% @returns `ok' otherwise `{error, Reason}'
  -spec start_replication(connection(), string(), Callback, cb_state(), string(), string(), replication_opts()) ->
          Response when
        Response :: epgsql_cmd_start_replication:response(),
        Callback :: module() | pid().
  start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, Opts) ->
      OptsMap = to_map(Opts),
      Command = {ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, OptsMap},
      epgsql_sock:sync_command(Connection, epgsql_cmd_start_replication, Command).

  %% Same as start_replication/7 with an empty replication option list.
  start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
      NoOpts = [],
      start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, NoOpts).

  %% Same as start_replication/7 with empty plugin options and an empty
  %% replication option list.
  start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
      start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, [], []).
  %% @private
  %% Normalises options to a map: a list of pairs is converted with
  %% `maps:from_list/1', a map is returned untouched.
  -spec to_map([{any(), any()}] | map()) -> map().
  to_map(Pairs) when is_list(Pairs) ->
      maps:from_list(Pairs);
  to_map(Already) when is_map(Already) ->
      Already.