epgsql.erl 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. %%% @doc Synchronous interface.
  2. %%%
  3. %%% All functions block (with infinite timeout) until full result is available.
  4. %%% @end
  5. %%% Copyright (C) 2008 - Will Glozer. All rights reserved.
  6. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  7. -module(epgsql).
  8. -export([connect/1, connect/2, connect/3, connect/4, connect/5,
  9. close/1,
  10. get_parameter/2,
  11. set_notice_receiver/2,
  12. get_cmd_status/1,
  13. get_backend_pid/1,
  14. squery/2,
  15. equery/2, equery/3, equery/4,
  16. prepared_query/3,
  17. parse/2, parse/3, parse/4,
  18. describe/2, describe/3,
  19. bind/3, bind/4,
  20. execute/2, execute/3, execute/4,
  21. execute_batch/2, execute_batch/3,
  22. close/2, close/3,
  23. sync/1,
  24. cancel/1,
  25. update_type_cache/1,
  26. update_type_cache/2,
  27. with_transaction/2,
  28. with_transaction/3,
  29. sync_on_error/2,
  30. copy_from_stdin/2,
  31. copy_from_stdin/3,
  32. copy_send_rows/3,
  33. copy_done/1,
  34. standby_status_update/3,
  35. start_replication/5,
  36. start_replication/6,
  37. start_replication/7,
  38. to_map/1,
  39. activate/1]).
  40. %% private
  41. -export([handle_x_log_data/5]).
  42. -export_type([connection/0, connect_option/0, connect_opts/0, connect_opts_map/0,
  43. connect_error/0, query_error/0, sql_query/0, column/0,
  44. type_name/0, epgsql_type/0, statement/0,
  45. transaction_option/0, transaction_opts/0, socket_active/0]).
  46. %% Deprecated types
  47. -export_type([bind_param/0, typed_param/0,
  48. squery_row/0, equery_row/0, reply/1,
  49. pg_time/0, pg_date/0, pg_datetime/0, pg_interval/0]).
  50. -include("epgsql.hrl").
  51. -ifdef(OTP_RELEASE).
  52. -type ssl_options() :: [ssl:tls_client_option()].
  53. -else.
  54. -type ssl_options() :: list().
  55. -endif.
  56. -type sql_query() :: iodata(). % SQL query text
  57. -type host() :: inet:ip_address() | inet:hostname().
  58. -type password() :: string() | iodata() | fun( () -> iodata() ).
  59. -type connection() :: pid().
  60. -type socket_active() :: true | -32768..32767.
  61. -type connect_option() ::
  62. {host, host()} |
  63. {username, string()} |
  64. {password, password()} |
  65. {database, DBName :: string()} |
  66. {port, PortNum :: inet:port_number()} |
  67. {ssl, IsEnabled :: boolean() | required} |
  68. {ssl_opts, SslOptions :: ssl_options()} | % see OTP ssl app documentation
  69. {tcp_opts, TcpOptions :: [gen_tcp:option()]} | % see OTP gen_tcp module documentation
  70. {timeout, TimeoutMs :: timeout()} | % connect timeout, default: 5000 ms
  71. {async, Receiver :: pid() | atom()} | % process to receive LISTEN/NOTIFY msgs
  72. {codecs, Codecs :: [{epgsql_codec:codec_mod(), any()}]} |
  73. {nulls, Nulls :: [any(), ...]} | % terms to be used as NULL
  74. {replication, Replication :: string()} | % Pass "database" to connect in replication mode
  75. {application_name, ApplicationName :: string()} |
  76. {socket_active, Active :: socket_active()}.
  77. -type connect_opts_map() ::
  78. #{host => host(),
  79. username => string(),
  80. password => password(),
  81. database => string(),
  82. port => inet:port_number(),
  83. ssl => boolean() | required,
  84. ssl_opts => ssl_options(),
  85. tcp_opts => [gen_tcp:option()],
  86. timeout => timeout(),
  87. async => pid() | atom(),
  88. codecs => [{epgsql_codec:codec_mod(), any()}],
  89. nulls => [any(), ...],
  90. replication => string(),
  91. application_name => string(),
  92. socket_active => socket_active()
  93. }.
  94. -type connect_opts() :: connect_opts_map() | [connect_option()].
  95. -type transaction_option() ::
  96. {reraise, boolean()} |
  97. {ensure_committed, boolean()} |
  98. {begin_opts, iodata()}.
  99. -type transaction_opts() ::
  100. [transaction_option()]
  101. | #{reraise => boolean(),
  102. ensure_committed => boolean(),
  103. begin_opts => iodata()
  104. }.
  105. -type connect_error() :: epgsql_cmd_connect:connect_error().
  106. -type query_error() :: #error{}. % Error report generated by server
  107. -type type_name() :: atom().
  108. -type epgsql_type() :: type_name()
  109. | {array, type_name()}
  110. | {unknown_oid, integer()}.
  111. %% Deprecated
  112. -type pg_date() :: epgsql_codec_datetime:pg_date().
  113. -type pg_time() :: epgsql_codec_datetime:pg_time().
  114. -type pg_datetime() :: epgsql_codec_datetime:pg_datetime().
  115. -type pg_interval() :: epgsql_codec_datetime:pg_interval().
  116. -type bind_param() :: any().
  117. %% Value to be bound to placeholder (`$1', `$2' etc)
  118. -type typed_param() :: {epgsql_type(), bind_param()}.
  119. -type column() :: #column{}.
  120. -type statement() :: #statement{}.
  121. -type squery_row() :: tuple(). % tuple of binary().
  122. -type equery_row() :: tuple(). % tuple of any().
  123. -type ok_reply(RowType) ::
  124. %% select
  125. {ok, ColumnsDescription :: [column()], RowsValues :: [RowType]} |
  126. %% update/insert/delete
  127. {ok, Count :: non_neg_integer()} |
  128. %% update/insert/delete + returning
  129. {ok, Count :: non_neg_integer(), ColumnsDescription :: [column()], RowsValues :: [RowType]}.
  130. -type error_reply() :: {error, query_error()}.
  131. -type reply(RowType) :: ok_reply(RowType) | error_reply().
  132. -type lsn() :: integer().
  133. -type cb_state() :: term().
  134. %% See https://github.com/erlang/rebar3/pull/1773
  135. -ifndef(OTP_RELEASE). % pre-OTP21
  136. -define(WITH_STACKTRACE(T, R, S), T:R -> S = erlang:get_stacktrace(), ).
  137. -else.
  138. -define(WITH_STACKTRACE(T, R, S), T:R:S ->).
  139. -endif.
  140. %% -- behaviour callbacks --
  141. %% Handles a XLogData Message (StartLSN, EndLSN, WALRecord, CbState).
  142. %% Return: {ok, LastFlushedLSN, LastAppliedLSN, NewCbState}
  143. -callback handle_x_log_data(lsn(), lsn(), binary(), cb_state()) -> {ok, lsn(), lsn(), cb_state()}.
  144. %% -------------
  145. %% -- client interface --
  %% @doc Connects to the server and performs all the necessary handshakes.
  %% Every connection setting is taken from `Opts' (proplist or map).
  -spec connect(connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  connect(Opts) ->
      {ok, Sock} = epgsql_sock:start_link(),
      call_connect(Sock, Opts).
  %% Legacy shortcut: connects to `Host' with an empty password, taking the
  %% username from the `USER' environment variable.
  %% NOTE(review): os:getenv/1 returns `false' when the variable is unset and
  %% that value is forwarded as the username unchanged - confirm this is intended.
  connect(Host, Opts) ->
      EnvUser = os:getenv("USER"),
      connect(Host, EnvUser, "", Opts).

  %% Legacy shortcut: connects to `Host' as `Username' with an empty password.
  connect(Host, Username, Opts) ->
      NoPassword = "",
      connect(Host, Username, NoPassword, Opts).
  %% @doc connects to the server and performs all the necessary handshakes (legacy interface)
  %% @param Host     host to connect to
  %% @param Username username to connect as, defaults to `$USER'
  %% @param Password optional password to authenticate with
  %% @param Opts     proplist or map of extra options
  %% @returns `{ok, Connection}' otherwise `{error, Reason}'
  %% @see connect/1
  -spec connect(host(), string(), password(), connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  connect(Host, Username, Password, Opts) ->
      {ok, Sock} = epgsql_sock:start_link(),
      connect(Sock, Host, Username, Password, Opts).
  %% Connects using the existing connection `C'.
  %% `Host', `Username' and `Password' are stored under the `host', `username'
  %% and `password' keys and override same-named entries of `Opts'.
  -spec connect(connection(), host(), string(), password(), connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  connect(C, Host, Username, Password, Opts) ->
      OptsMap = to_map(Opts),
      call_connect(C, OptsMap#{host => Host,
                               username => Username,
                               password => Password}).
  %% Runs the `epgsql_cmd_connect' command on `C' and, once connected, refreshes
  %% the type cache according to the `replication' / `codecs' options.
  %% The options are passed through `epgsql_cmd_connect:opts_hide_password/1'
  %% before they are handed to the connection.
  -spec call_connect(connection(), connect_opts())
          -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
  call_connect(C, Opts) ->
      SafeOpts = epgsql_cmd_connect:opts_hide_password(to_map(Opts)),
      Outcome = epgsql_sock:sync_command(C, epgsql_cmd_connect, SafeOpts),
      case Outcome of
          {error, _} = Failure ->
              Failure;
          connected ->
              %% If following call fails for you, try to add {codecs, []} connect option
              {ok, _} = maybe_update_typecache(C, SafeOpts),
              {ok, C}
      end.
  %% Decides from the `replication' and `codecs' connect options whether the
  %% type cache has to be loaded right after connecting.
  maybe_update_typecache(C, Opts) ->
      Replication = maps:get(replication, Opts, undefined),
      Codecs = maps:get(codecs, Opts, undefined),
      maybe_update_typecache(C, Replication, Codecs).

  %% Not in replication mode, explicit non-empty codec list: load exactly those.
  maybe_update_typecache(C, undefined, [_ | _] = Codecs) ->
      update_type_cache(C, Codecs);
  %% Not in replication mode, no `codecs' option: load the default codecs.
  maybe_update_typecache(C, undefined, undefined) ->
      %% TODO: don't execute 'update_type_cache' when `codecs` is undefined.
      %% This will break backward compatibility
      update_type_cache(C);
  %% Anything else (replication mode, empty codec list, ...): nothing to load.
  maybe_update_typecache(_C, _Replication, _Codecs) ->
      {ok, []}.
  %% Updates the type cache of `C' for the default codec set
  %% (`epgsql_codec_hstore' and `epgsql_codec_postgis', both without options).
  update_type_cache(C) ->
      DefaultCodecs = [{epgsql_codec_hstore, []},
                       {epgsql_codec_postgis, []}],
      update_type_cache(C, DefaultCodecs).
  %% Runs the `epgsql_cmd_update_type_cache' command for `Codecs';
  %% an empty codec list is answered with `{error, empty}' without running it.
  -spec update_type_cache(connection(), [{epgsql_codec:codec_mod(), Opts :: any()}]) ->
            epgsql_cmd_update_type_cache:response() | {error, empty}.
  update_type_cache(C, Codecs) ->
      case Codecs of
          [] ->
              {error, empty};
          _ ->
              %% The command may answer with an error such as
              %% {error, #error{severity = error,
              %%                message = <<"column \"typarray\" does not exist in pg_type">>}}
              %% when the pg_type table is not in the expected format.
              %% Known to happen for Redshift which is based on PG v8.0.2.
              %% Do not fail connect in that case.
              epgsql_sock:sync_command(C, epgsql_cmd_update_type_cache, Codecs)
      end.
  %% @doc Closes the connection `Conn'.
  -spec close(connection()) -> ok.
  close(Conn) ->
      epgsql_sock:close(Conn).
  %% Looks up the parameter `ParamName' on connection `Conn';
  %% per the spec the value is a binary, or `undefined'.
  -spec get_parameter(connection(), list() | binary()) -> {ok, binary() | undefined}.
  get_parameter(Conn, ParamName) ->
      epgsql_sock:get_parameter(Conn, ParamName).
  %% Replaces the notice receiver of `Conn' with `Receiver' (pid, registered
  %% name or `undefined') and returns the previous one.
  -spec set_notice_receiver(connection(), undefined | pid() | atom()) ->
            {ok, Previous :: pid() | atom()}.
  set_notice_receiver(Conn, Receiver) ->
      epgsql_sock:set_notice_receiver(Conn, Receiver).
  %% @doc Returns the status message of the last command.
  %% When several semicolon-separated queries were executed through
  %% {@link squery/2}, only the status of the last query is available.
  %% See [https://www.postgresql.org/docs/current/static/libpq-exec.html#LIBPQ-PQCMDSTATUS]
  -spec get_cmd_status(connection()) -> {ok, Status}
          when
        Status :: undefined | atom() | {atom(), integer()}.
  get_cmd_status(Conn) ->
      epgsql_sock:get_cmd_status(Conn).
  %% @doc Returns the OS pid of PostgreSQL backend OS process that serves this connection.
  %%
  %% Similar to `SELECT pg_backend_pid()', but does not need network roundtrips.
  -spec get_backend_pid(connection()) -> integer().
  get_backend_pid(C) ->
      epgsql_sock:get_backend_pid(C).
  %% @doc Runs the simple query `Sql' via connection `Conn'.
  %% @see epgsql_cmd_squery
  -spec squery(connection(), sql_query()) -> epgsql_cmd_squery:response() | epgsql_sock:error().
  squery(Conn, Sql) ->
      epgsql_sock:sync_command(Conn, epgsql_cmd_squery, Sql).
  %% Extended query with an unnamed statement and no bind parameters.
  equery(C, Sql) ->
      equery(C, "", Sql, []).

  %% Extended query with an unnamed statement and the given bind parameters.
  -spec equery(connection(), sql_query(), [bind_param()]) ->
            epgsql_cmd_equery:response() | epgsql_sock:error().
  equery(C, Sql, Parameters) ->
      UnnamedStatement = "",
      equery(C, UnnamedStatement, Sql, Parameters).
  %% @doc Executes extended query: parses `Sql' as statement `Name', pairs each
  %% parameter with the type reported for its placeholder and runs the statement.
  %% A failed parse is returned unchanged.
  %% @end
  %% @see epgsql_cmd_equery
  %% @end
  %% TODO add fast_equery command that doesn't need parsed statement
  -spec equery(connection(), string(), sql_query(), [bind_param()]) ->
            epgsql_cmd_equery:response() | epgsql_sock:error().
  equery(C, Name, Sql, Parameters) ->
      ParseResult = parse(C, Name, Sql, []),
      case ParseResult of
          {ok, #statement{types = ParamTypes} = Statement} ->
              %% lists:zip/2 requires as many parameters as placeholder types
              Typed = lists:zip(ParamTypes, Parameters),
              epgsql_sock:sync_command(C, epgsql_cmd_equery, {Statement, Typed});
          NotParsed ->
              NotParsed
      end.
  %% @doc Similar to {@link equery/3}, but uses prepared statement that can be reused multiple times.
  %% The statement is given either as a `#statement{}' record or by name; a name
  %% is first resolved through {@link describe/3}, whose failure is returned unchanged.
  %% @see epgsql_cmd_prepared_query
  -spec prepared_query(C::connection(), string() | statement(), Parameters::[bind_param()]) ->
            epgsql_cmd_prepared_query:response().
  prepared_query(C, #statement{types = ParamTypes} = Statement, Parameters) ->
      %% lists:zip/2 requires as many parameters as placeholder types
      Typed = lists:zip(ParamTypes, Parameters),
      epgsql_sock:sync_command(C, epgsql_cmd_prepared_query, {Statement, Typed});
  prepared_query(C, Name, Parameters) when is_list(Name) ->
      Described = describe(C, statement, Name),
      case Described of
          {ok, #statement{} = Statement} ->
              prepared_query(C, Statement, Parameters);
          NotDescribed ->
              NotDescribed
      end.
  %% parse

  %% Parses `Sql' as the unnamed statement without explicit parameter types.
  parse(C, Sql) ->
      parse(C, "", Sql, []).

  %% Parses `Sql' as the unnamed statement with the given parameter types.
  parse(C, Sql, Types) ->
      parse(C, "", Sql, Types).

  %% Parses `Sql' as statement `Name'; an error reply is followed by
  %% a sync (see sync_on_error/2).
  -spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
            epgsql_cmd_parse:response().
  parse(C, Name, Sql, Types) ->
      Reply = epgsql_sock:sync_command(C, epgsql_cmd_parse, {Name, Sql, Types}),
      sync_on_error(C, Reply).
  %% bind

  %% Binds `Parameters' to `Statement' using the unnamed portal.
  bind(C, Statement, Parameters) ->
      UnnamedPortal = "",
      bind(C, Statement, UnnamedPortal, Parameters).

  %% Binds `Parameters' to `Statement' under portal `PortalName'; an error
  %% reply is followed by a sync (see sync_on_error/2).
  -spec bind(connection(), statement(), string(), [bind_param()]) ->
            epgsql_cmd_bind:response().
  bind(C, Statement, PortalName, Parameters) ->
      Reply = epgsql_sock:sync_command(
                C, epgsql_cmd_bind, {Statement, PortalName, Parameters}),
      sync_on_error(C, Reply).
  %% execute

  %% Executes statement `S' on the unnamed portal with row limit `0'.
  execute(C, S) ->
      execute(C, S, "", 0).

  %% Executes statement `S' on the unnamed portal with row limit `N'.
  execute(C, S, N) ->
      UnnamedPortal = "",
      execute(C, S, UnnamedPortal, N).

  %% Executes statement `S' on portal `PortalName' with row limit `N'.
  -spec execute(connection(), statement(), string(), non_neg_integer()) -> Reply when
        Reply :: epgsql_cmd_execute:response().
  execute(C, S, PortalName, N) ->
      Command = {S, PortalName, N},
      epgsql_sock:sync_command(C, epgsql_cmd_execute, Command).
  %% @doc Executes a batch of extended queries, each element of `Queries'
  %% being a `{statement(), [bind_param()]}' pair.
  %% @see epgsql_cmd_batch
  -spec execute_batch(connection(), [{statement(), [bind_param()]}]) ->
            epgsql_cmd_batch:response().
  execute_batch(Conn, Queries) ->
      epgsql_sock:sync_command(Conn, epgsql_cmd_batch, Queries).
  %% @doc Executes same statement() extended query with each parameter list of a `Batch'
  %%
  %% The query is given either as a parsed `#statement{}' or as SQL text. SQL text
  %% is parsed first (unnamed statement, see {@link parse/2}); when parsing fails
  %% the parse error is returned as is instead of the `{Columns, Response}' pair.
  %% @returns `{Columns, Response}' where `Columns' are the statement's columns, or
  %% the error produced while parsing SQL text
  %% @see epgsql_cmd_batch
  -spec execute_batch(connection(), statement() | sql_query(), [ [bind_param()] ]) ->
            {[column()], epgsql_cmd_batch:response()}
          | {error, query_error()}
          | epgsql_sock:error().
  execute_batch(C, #statement{columns = Cols} = Statement, Batch) ->
      {Cols, epgsql_sock:sync_command(C, epgsql_cmd_batch, {Statement, Batch})};
  execute_batch(C, Sql, Batch) ->
      case parse(C, Sql) of
          {ok, #statement{} = S} ->
              execute_batch(C, S, Batch);
          Error ->
              %% parse failure: hand the error reply back unchanged
              Error
      end.
  %% statement/portal functions

  %% Describes the statement referenced by the `name' field of the record.
  -spec describe(connection(), statement()) -> epgsql_cmd_describe_statement:response().
  describe(C, #statement{} = Statement) ->
      describe(C, statement, Statement#statement.name).
  %% Describes the portal or statement called `Name'; an error reply is
  %% followed by a sync (see sync_on_error/2).
  -spec describe(connection(), portal, iodata()) -> epgsql_cmd_describe_portal:response();
                (connection(), statement, iodata()) -> epgsql_cmd_describe_statement:response().
  describe(C, portal, Name) ->
      Reply = epgsql_sock:sync_command(C, epgsql_cmd_describe_portal, Name),
      sync_on_error(C, Reply);
  describe(C, statement, Name) ->
      Reply = epgsql_sock:sync_command(C, epgsql_cmd_describe_statement, Name),
      sync_on_error(C, Reply).
  %% @doc Closes the statement referenced by the `name' field of the record.
  -spec close(connection(), statement()) -> epgsql_cmd_close:response().
  close(C, #statement{} = Statement) ->
      close(C, statement, Statement#statement.name).
  %% @doc Closes the statement or portal (selected by `Kind') called `Name'.
  -spec close(connection(), statement | portal, iodata()) -> epgsql_cmd_close:response().
  close(Conn, Kind, Name) ->
      Target = {Kind, Name},
      epgsql_sock:sync_command(Conn, epgsql_cmd_close, Target).
  %% Runs the `epgsql_cmd_sync' command on `Conn' (used by sync_on_error/2
  %% after a failed extended-protocol step).
  -spec sync(connection()) -> epgsql_cmd_sync:response().
  sync(Conn) ->
      epgsql_sock:sync_command(Conn, epgsql_cmd_sync, []).
  %% @doc Cancels the command currently executing on `Conn'.
  -spec cancel(connection()) -> ok.
  cancel(Conn) ->
      epgsql_sock:cancel(Conn).
  %% misc helper functions

  %% Runs `Fun' inside a transaction with `reraise' switched off: an exception
  %% is turned into `{rollback, Reason}' instead of being re-thrown
  %% (see with_transaction/3).
  -spec with_transaction(connection(), fun((connection()) -> Reply)) ->
            Reply | {rollback, any()}
          when
        Reply :: any().
  with_transaction(Conn, Fun) ->
      with_transaction(Conn, Fun, #{reraise => false}).
  %% @doc Execute callback function with connection in a transaction.
  %% Transaction will be rolled back in case of exception.
  %% Options (proplist or map):
  %% <dl>
  %%  <dt>reraise</dt>
  %%  <dd>when set to true, exception will be re-thrown, otherwise
  %%  `{rollback, ErrorReason}' will be returned. Default: `true'</dd>
  %%  <dt>ensure_committed</dt>
  %%  <dd>even when callback returns without exception,
  %%  check that transaction was committed by checking CommandComplete status
  %%  of "COMMIT" command. In case when transaction was rolled back, status will be
  %%  "rollback" instead of "commit". Default: `false'</dd>
  %%  <dt>begin_opts</dt>
  %%  <dd>append extra options to "BEGIN" command (see
  %%  https://www.postgresql.org/docs/current/static/sql-begin.html)
  %%  Beware of SQL injections! No escaping is made on begin_opts! Default: `""'</dd>
  %% </dl>
  -spec with_transaction(
          connection(), fun((connection()) -> Reply), transaction_opts()) ->
            Reply | {rollback, any()} | no_return() when
        Reply :: any().
  with_transaction(C, F, Opts0) ->
      Opts = to_map(Opts0),
      Begin = case Opts of
                  #{begin_opts := BeginOpts} ->
                      [<<"BEGIN ">> | BeginOpts];
                  _ ->
                      <<"BEGIN">>
              end,
      try
          {ok, [], []} = squery(C, Begin),
          R = F(C),
          {ok, [], []} = squery(C, <<"COMMIT">>),
          case Opts of
              #{ensure_committed := true} ->
                  %% A failed check raises inside the `try', so it is handled by
                  %% the same rollback / reraise logic as a callback exception.
                  {ok, CmdStatus} = get_cmd_status(C),
                  (commit =:= CmdStatus) orelse error({ensure_committed_failed, CmdStatus});
              _ ->
                  ok
          end,
          R
      catch
          ?WITH_STACKTRACE(Type, Reason, Stack)
              %% Best-effort rollback: its reply is intentionally not inspected.
              squery(C, "ROLLBACK"),
              case maps:get(reraise, Opts, true) of
                  true ->
                      erlang:raise(Type, Reason, Stack);
                  false ->
                      {rollback, Reason}
              end
      end.
  %% Passes `Result' through unchanged; when it is an `{error, _}' tuple a
  %% sync is issued on `C' first (and must answer `ok').
  sync_on_error(C, Result) ->
      case Result of
          {error, _} ->
              ok = sync(C),
              Result;
          _ ->
              Result
      end.
  %% @equiv copy_from_stdin(C, SQL, text)
  copy_from_stdin(C, SQL) ->
      Format = text,
      copy_from_stdin(C, SQL, Format).
  %% @doc Switches epgsql into COPY-mode
  %%
  %% When `Format' is `text', Erlang IO-protocol should be used to transfer "raw" COPY data to the
  %% server (see, eg, `io:put_chars/2' and `file:write/2' etc).
  %%
  %% When `Format' is `{binary, Types}', {@link copy_send_rows/3} should be used instead.
  %%
  %% In case COPY-payload is invalid, asynchronous message of the form
  %% `{epgsql, connection(), {error, epgsql:query_error()}}' (similar to asynchronous notification,
  %% see {@link set_notice_receiver/2}) will be sent to the process that called `copy_from_stdin'
  %% and all the subsequent IO-protocol requests will return error.
  %% It's important to not call `copy_done' if such error is detected!
  %%
  %% @param SQL have to be `COPY ... FROM STDIN ...' statement
  %% @param Format data transfer format specification: `text' or `{binary, [epgsql_type()]}'. Have to
  %%    match `WITH (FORMAT ???)' from SQL (`text' for `text'/`csv' OR `{binary, ..}' for `binary').
  %% @returns in case of success, `{ok, [text | binary]}' tuple is returned. List describes the expected
  %%    payload format for each column of input. In current implementation all the atoms in a list
  %%    will be the same and will match the atom in `Format' parameter. It may change in the future
  %%    if PostgreSQL will introduce alternative payload formats.
  -spec copy_from_stdin(connection(), sql_query(), text | {binary, [epgsql_type()]}) ->
            epgsql_cmd_copy_from_stdin:response().
  copy_from_stdin(C, SQL, Format) ->
      %% self() is passed so that the calling process is the one registered
      %% to receive the asynchronous COPY error message described above.
      epgsql_sock:sync_command(C, epgsql_cmd_copy_from_stdin, {SQL, self(), Format}).
  %% @doc Sends a batch of rows, given as Erlang terms, to
  %% `COPY .. FROM STDIN WITH (FORMAT binary)'
  %%
  %% Values are converted to postgres types the same way as parameters of, eg,
  %% {@link equery/3}, using the type specification from the 3rd argument of
  %% {@link copy_from_stdin/3}; the number of columns in each element of `Rows'
  %% should match the number of elements in `{binary, Types}'.
  %% @param Rows might be a list of tuples or list of lists. List of lists is slightly more efficient.
  -spec copy_send_rows(connection(), [tuple() | [bind_param()]], timeout()) -> ok | {error, ErrReason} when
        ErrReason :: not_in_copy_mode | not_binary_format | query_error().
  copy_send_rows(Conn, Rows, Timeout) ->
      epgsql_sock:copy_send_rows(Conn, Rows, Timeout).
  %% @doc Tells the server that the transfer of COPY data is finished
  %%
  %% Leaves copy-mode and returns the number of inserted rows.
  -spec copy_done(connection()) -> epgsql_cmd_copy_done:response().
  copy_done(Conn) ->
      epgsql_sock:sync_command(Conn, epgsql_cmd_copy_done, []).
  %% @doc Sends the last flushed and applied WAL positions to the server in a
  %% standby status update message via connection `Conn'.
  -spec standby_status_update(connection(), lsn(), lsn()) -> ok.
  standby_status_update(Conn, Flushed, Applied) ->
      epgsql_sock:standby_status_update(Conn, Flushed, Applied).
  %% Private export: forwards an XLogData message to the `handle_x_log_data/4'
  %% callback of `CbModule' and returns whatever the callback returns.
  handle_x_log_data(CbModule, StartLSN, EndLSN, WALRecord, CbState) ->
      CbModule:handle_x_log_data(StartLSN, EndLSN, WALRecord, CbState).
  466. -type replication_option() ::
  467. {align_lsn, boolean()}. %% Align last applied and flushed LSN with last received LSN
  468. %% after Primary keepalive message with ReplyRequired flag
  469. -type replication_opts() ::
  470. [replication_option()]
  471. | #{align_lsn => boolean()}.
  %% @doc instructs Postgres server to start streaming WAL for logical replication
  %% @param Connection      connection in replication mode
  %% @param ReplicationSlot the name of the replication slot to stream changes from
  %% @param Callback        Callback module which should have the callback functions implemented
  %%                        for message processing, or a process which should be able to receive
  %%                        replication messages.
  %% @param CbInitState     Callback Module's initial state
  %% @param WALPosition     the WAL position XXX/XXX to begin streaming at.
  %%                        "0/0" to let the server determine the start point.
  %% @param PluginOpts      optional options passed to the slot's logical decoding plugin.
  %%                        For example: "option_name1 'value1', option_name2 'value2'"
  %% @param Opts            options of logical replication
  %% @returns `ok' otherwise `{error, Reason}'
  -spec start_replication(connection(), string(), Callback, cb_state(), string(), string(), replication_opts()) ->
            Response when
        Response :: epgsql_cmd_start_replication:response(),
        Callback :: module() | pid().
  start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, Opts) ->
      OptsMap = to_map(Opts),
      epgsql_sock:sync_command(
        Connection,
        epgsql_cmd_start_replication,
        {ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, OptsMap}).
  %% Same as start_replication/7 with an empty list of replication options.
  start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
      NoOpts = [],
      start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, NoOpts).

  %% Same as start_replication/6 with empty plugin options.
  start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
      start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, []).
  %% @private
  %% Normalises options to a map: a list of pairs is converted with
  %% maps:from_list/1, a map is returned untouched.
  -spec to_map([{any(), any()}] | map()) -> map().
  to_map(Pairs) when is_list(Pairs) ->
      maps:from_list(Pairs);
  to_map(Already) when is_map(Already) ->
      Already.
  %% @doc Activates the TCP or SSL socket of a connection.
  %%
  %% If the `socket_active' connection option was supplied, the function sets
  %% `{active, X}' on the connection's SSL or TCP socket; otherwise it sets
  %% `{active, true}'.
  %%
  %% @param Conn connection
  %% @returns `ok' or `{error, Reason}'
  %%
  %% Note: the ssl:reason() type is not exported, hence `any()' in the spec.
  -spec activate(connection()) -> ok | {error, inet:posix() | any()}.
  activate(Conn) ->
      epgsql_sock:activate(Conn).