|
@@ -28,10 +28,11 @@
|
|
|
start_replication/5,
|
|
|
start_replication/6,
|
|
|
to_proplist/1]).
|
|
|
+-export([handle_x_log_data/5]). % private
|
|
|
|
|
|
-export_type([connection/0, connect_option/0, connect_opts/0,
|
|
|
connect_error/0, query_error/0, sql_query/0, column/0,
|
|
|
- type_name/0, epgsql_type/0]).
|
|
|
+ type_name/0, epgsql_type/0, statement/0]).
|
|
|
|
|
|
%% Deprecated types
|
|
|
-export_type([bind_param/0, typed_param/0,
|
|
@@ -95,12 +96,16 @@
|
|
|
-type typed_param() :: {epgsql_type(), bind_param()}.
|
|
|
|
|
|
-type column() :: #column{}.
|
|
|
+-type statement() :: #statement{}.
|
|
|
-type squery_row() :: tuple(). % tuple of binary().
|
|
|
-type equery_row() :: tuple(). % tuple of bind_param().
|
|
|
-type ok_reply(RowType) ::
|
|
|
- {ok, ColumnsDescription :: [column()], RowsValues :: [RowType]} | % select
|
|
|
- {ok, Count :: non_neg_integer()} | % update/insert/delete
|
|
|
- {ok, Count :: non_neg_integer(), ColumnsDescription :: [column()], RowsValues :: [RowType]}. % update/insert/delete + returning
|
|
|
+ %% select
|
|
|
+ {ok, ColumnsDescription :: [column()], RowsValues :: [RowType]} |
|
|
|
+ %% update/insert/delete
|
|
|
+ {ok, Count :: non_neg_integer()} |
|
|
|
+ %% update/insert/delete + returning
|
|
|
+ {ok, Count :: non_neg_integer(), ColumnsDescription :: [column()], RowsValues :: [RowType]}.
|
|
|
-type error_reply() :: {error, query_error()}.
|
|
|
-type reply(RowType) :: ok_reply(RowType) | error_reply().
|
|
|
-type lsn() :: integer().
|
|
@@ -118,10 +123,10 @@
|
|
|
-> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
|
|
|
connect(Settings0) ->
|
|
|
Settings = to_proplist(Settings0),
|
|
|
- Host = proplists:get_value(host, Settings, "localhost"),
|
|
|
- Username = proplists:get_value(username, Settings, os:getenv("USER")),
|
|
|
- Password = proplists:get_value(password, Settings, ""),
|
|
|
- connect(Host, Username, Password, Settings).
|
|
|
+ Host = proplists:get_value(host, Settings, "localhost"),
|
|
|
+ Username = proplists:get_value(username, Settings, os:getenv("USER")),
|
|
|
+ Password = proplists:get_value(password, Settings, ""),
|
|
|
+ connect(Host, Username, Password, Settings).
|
|
|
|
|
|
connect(Host, Opts) ->
|
|
|
connect(Host, os:getenv("USER"), "", Opts).
|
|
@@ -220,8 +225,8 @@ equery(C, Sql) ->
|
|
|
equery(C, Sql, Parameters) ->
|
|
|
case parse(C, "", Sql, []) of
|
|
|
{ok, #statement{types = Types} = S} ->
|
|
|
- Typed_Parameters = lists:zip(Types, Parameters),
|
|
|
- epgsql_sock:sync_command(C, epgsql_cmd_equery, {S, Typed_Parameters});
|
|
|
+ TypedParameters = lists:zip(Types, Parameters),
|
|
|
+ epgsql_sock:sync_command(C, epgsql_cmd_equery, {S, TypedParameters});
|
|
|
Error ->
|
|
|
Error
|
|
|
end.
|
|
@@ -242,8 +247,8 @@ equery(C, Name, Sql, Parameters) ->
|
|
|
prepared_query(C, Name, Parameters) ->
|
|
|
case describe(C, statement, Name) of
|
|
|
{ok, #statement{types = Types} = S} ->
|
|
|
- Typed_Parameters = lists:zip(Types, Parameters),
|
|
|
- epgsql_sock:sync_command(C, epgsql_cmd_prepared_query, {S, Typed_Parameters});
|
|
|
+ TypedParameters = lists:zip(Types, Parameters),
|
|
|
+ epgsql_sock:sync_command(C, epgsql_cmd_prepared_query, {S, TypedParameters});
|
|
|
Error ->
|
|
|
Error
|
|
|
end.
|
|
@@ -269,7 +274,7 @@ parse(C, Name, Sql, Types) ->
|
|
|
bind(C, Statement, Parameters) ->
|
|
|
bind(C, Statement, "", Parameters).
|
|
|
|
|
|
--spec bind(connection(), #statement{}, string(), [bind_param()]) ->
|
|
|
+-spec bind(connection(), statement(), string(), [bind_param()]) ->
|
|
|
epgsql_cmd_bind:response().
|
|
|
bind(C, Statement, PortalName, Parameters) ->
|
|
|
sync_on_error(
|
|
@@ -285,18 +290,18 @@ execute(C, S) ->
|
|
|
execute(C, S, N) ->
|
|
|
execute(C, S, "", N).
|
|
|
|
|
|
--spec execute(connection(), #statement{}, string(), non_neg_integer()) -> Reply when
|
|
|
+-spec execute(connection(), statement(), string(), non_neg_integer()) -> Reply when
|
|
|
Reply :: epgsql_cmd_execute:response().
|
|
|
execute(C, S, PortalName, N) ->
|
|
|
epgsql_sock:sync_command(C, epgsql_cmd_execute, {S, PortalName, N}).
|
|
|
|
|
|
--spec execute_batch(connection(), [{#statement{}, [bind_param()]}]) ->
|
|
|
+-spec execute_batch(connection(), [{statement(), [bind_param()]}]) ->
|
|
|
epgsql_cmd_batch:response().
|
|
|
execute_batch(C, Batch) ->
|
|
|
epgsql_sock:sync_command(C, epgsql_cmd_batch, Batch).
|
|
|
|
|
|
%% statement/portal functions
|
|
|
--spec describe(connection(), #statement{}) -> epgsql_cmd_describe_statement:response().
|
|
|
+-spec describe(connection(), statement()) -> epgsql_cmd_describe_statement:response().
|
|
|
describe(C, #statement{name = Name}) ->
|
|
|
describe(C, statement, Name).
|
|
|
|
|
@@ -313,7 +318,7 @@ describe(C, portal, Name) ->
|
|
|
C, epgsql_cmd_describe_portal, Name)).
|
|
|
|
|
|
%% @doc close statement
|
|
|
--spec close(connection(), #statement{}) -> epgsql_cmd_close:response().
|
|
|
+-spec close(connection(), statement()) -> epgsql_cmd_close:response().
|
|
|
close(C, #statement{name = Name}) ->
|
|
|
close(C, statement, Name).
|
|
|
|
|
@@ -392,10 +397,14 @@ sync_on_error(_C, R) ->
|
|
|
R.
|
|
|
|
|
|
-spec standby_status_update(connection(), lsn(), lsn()) -> ok.
|
|
|
-%% @doc sends last flushed and applied WAL positions to the server in a standby status update message via given `Connection'
|
|
|
+%% @doc sends last flushed and applied WAL positions to the server in a standby status update message via
|
|
|
+%% given `Connection'
|
|
|
standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
|
|
|
gen_server:call(Connection, {standby_status_update, FlushedLSN, AppliedLSN}).
|
|
|
|
|
|
+handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, Repl) ->
|
|
|
+ Mod:handle_x_log_data(StartLSN, EndLSN, WALRecord, Repl).
|
|
|
+
|
|
|
-spec start_replication(connection(), string(), Callback, cb_state(), string(), string()) -> Response when
|
|
|
Response :: epgsql_cmd_start_replication:response(),
|
|
|
Callback :: module() | pid().
|