|
@@ -1,12 +1,34 @@
|
|
|
%%% Copyright (C) 2009 - Will Glozer. All rights reserved.
|
|
|
%%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
|
|
|
|
|
|
+%%% @doc GenServer holding all connection state (including socket).
|
|
|
+%%%
|
|
|
+%%% See https://www.postgresql.org/docs/current/static/protocol-flow.html
|
|
|
+%%% Commands in PostgreSQL are pipelined: you don't need to wait for reply to
|
|
|
+%%% be able to send next command.
|
|
|
+%%% Commands are processed (and responses to them are generated) in FIFO order.
|
|
|
+%%% eg, if you execute 2 SimpleQuery: #1 and #2, first you get all response
|
|
|
+%%% packets for #1 and then all for #2:
|
|
|
+%%% > SQuery #1
|
|
|
+%%% > SQuery #2
|
|
|
+%%% < RowDescription #1
|
|
|
+%%% < DataRow #1
|
|
|
+%%% < CommandComplete #1
|
|
|
+%%% < RowDescription #2
|
|
|
+%%% < DataRow #2
|
|
|
+%%% < CommandComplete #2
|
|
|
+%%%
|
|
|
+%%% See epgsql_cmd_connect for network connection and authentication setup
|
|
|
+
|
|
|
+
|
|
|
-module(epgsql_sock).
|
|
|
|
|
|
-behavior(gen_server).
|
|
|
|
|
|
-export([start_link/0,
|
|
|
close/1,
|
|
|
+ sync_command/3,
|
|
|
+ async_command/4,
|
|
|
get_parameter/2,
|
|
|
set_notice_receiver/2,
|
|
|
get_cmd_status/1,
|
|
@@ -15,82 +37,48 @@
|
|
|
-export([handle_call/3, handle_cast/2, handle_info/2]).
|
|
|
-export([init/1, code_change/3, terminate/2]).
|
|
|
|
|
|
-%% state callbacks
|
|
|
--export([auth/2, initializing/2, on_message/2]).
|
|
|
+%% loop callback
|
|
|
+-export([on_message/3, on_replication/3]).
|
|
|
+
|
|
|
+%% Comand's APIs
|
|
|
+-export([set_net_socket/3, init_replication_state/1, set_attr/3, get_codec/1,
|
|
|
+ get_rows/1, get_results/1, notify/2, send/2, send/3, send_multi/2,
|
|
|
+ get_parameter_internal/2,
|
|
|
+ get_replication_state/1, set_packet_handler/2]).
|
|
|
+
|
|
|
+-export_type([transport/0, pg_sock/0]).
|
|
|
|
|
|
-include("epgsql.hrl").
|
|
|
--include("epgsql_binary.hrl").
|
|
|
-
|
|
|
-%% Commands defined as per this page:
|
|
|
-%% http://www.postgresql.org/docs/9.2/static/protocol-message-formats.html
|
|
|
-
|
|
|
-%% Commands
|
|
|
--define(BIND, $B).
|
|
|
--define(CLOSE, $C).
|
|
|
--define(DESCRIBE, $D).
|
|
|
--define(EXECUTE, $E).
|
|
|
--define(FLUSH, $H).
|
|
|
--define(PASSWORD, $p).
|
|
|
--define(PARSE, $P).
|
|
|
--define(SIMPLEQUERY, $Q).
|
|
|
--define(AUTHENTICATION_REQUEST, $R).
|
|
|
--define(SYNC, $S).
|
|
|
-
|
|
|
-%% Parameters
|
|
|
-
|
|
|
--define(PREPARED_STATEMENT, $S).
|
|
|
--define(PORTAL, $P).
|
|
|
-
|
|
|
-%% Responses
|
|
|
-
|
|
|
--define(PARSE_COMPLETE, $1).
|
|
|
--define(BIND_COMPLETE, $2).
|
|
|
--define(CLOSE_COMPLETE, $3).
|
|
|
--define(NOTIFICATION, $A).
|
|
|
--define(COMMAND_COMPLETE, $C).
|
|
|
--define(DATA_ROW, $D).
|
|
|
--define(EMPTY_QUERY, $I).
|
|
|
--define(CANCELLATION_KEY, $K).
|
|
|
--define(NO_DATA, $n).
|
|
|
--define(NOTICE, $N).
|
|
|
--define(PORTAL_SUSPENDED, $s).
|
|
|
--define(PARAMETER_STATUS, $S).
|
|
|
--define(PARAMETER_DESCRIPTION, $t).
|
|
|
--define(ROW_DESCRIPTION, $T).
|
|
|
--define(READY_FOR_QUERY, $Z).
|
|
|
--define(COPY_BOTH_RESPONSE, $W).
|
|
|
--define(COPY_DATA, $d).
|
|
|
-
|
|
|
-% CopyData replication messages
|
|
|
--define(X_LOG_DATA, $w).
|
|
|
--define(PRIMARY_KEEPALIVE_MESSAGE, $k).
|
|
|
--define(STANDBY_STATUS_UPDATE, $r).
|
|
|
-
|
|
|
--record(state, {mod,
|
|
|
- sock,
|
|
|
+-include("protocol.hrl").
|
|
|
+-include("epgsql_replication.hrl").
|
|
|
+
|
|
|
+-type transport() :: {call, any()}
|
|
|
+ | {cast, pid(), reference()}
|
|
|
+ | {incremental, pid(), reference()}.
|
|
|
+
|
|
|
+-type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
|
|
|
+-type repl_state() :: #repl{}.
|
|
|
+
|
|
|
+-record(state, {mod :: gen_tcp | ssl | undefined,
|
|
|
+ sock :: tcp_socket() | ssl:sslsocket() | undefined,
|
|
|
data = <<>>,
|
|
|
- backend,
|
|
|
- handler,
|
|
|
- codec,
|
|
|
- queue = queue:new(),
|
|
|
- async,
|
|
|
- parameters = [],
|
|
|
- types = [],
|
|
|
- columns = [],
|
|
|
- rows = [],
|
|
|
+ backend :: {Pid :: integer(), Key :: integer()} | undefined,
|
|
|
+ handler = on_message :: on_message | on_replication | undefined,
|
|
|
+ codec :: epgsql_binary:codec() | undefined,
|
|
|
+ queue = queue:new() :: queue:queue({epgsql_command:command(), any(), transport()}),
|
|
|
+ current_cmd :: epgsql_command:command() | undefined,
|
|
|
+ current_cmd_state :: any() | undefined,
|
|
|
+ current_cmd_transport :: transport() | undefined,
|
|
|
+ async :: undefined | atom() | pid(),
|
|
|
+ parameters = [] :: [{Key :: binary(), Value :: binary()}],
|
|
|
+ rows = [] :: [tuple()],
|
|
|
results = [],
|
|
|
- batch = [],
|
|
|
- sync_required,
|
|
|
- txstatus,
|
|
|
- complete_status :: undefined | atom() | {atom(), integer()},
|
|
|
- repl_last_received_lsn,
|
|
|
- repl_last_flushed_lsn,
|
|
|
- repl_last_applied_lsn,
|
|
|
- repl_feedback_required,
|
|
|
- repl_cbmodule,
|
|
|
- repl_cbstate,
|
|
|
- repl_receiver,
|
|
|
- repl_align_lsn}).
|
|
|
+ sync_required :: boolean() | undefined,
|
|
|
+ txstatus :: byte() | undefined, % $I | $T | $E,
|
|
|
+ complete_status :: atom() | {atom(), integer()} | undefined,
|
|
|
+ repl :: repl_state() | undefined}).
|
|
|
+
|
|
|
+-opaque pg_sock() :: #state{}.
|
|
|
|
|
|
%% -- client interface --
|
|
|
|
|
@@ -101,6 +89,18 @@ close(C) when is_pid(C) ->
|
|
|
catch gen_server:cast(C, stop),
|
|
|
ok.
|
|
|
|
|
|
+-spec sync_command(epgsql:connection(), epgsql_command:command(), any()) -> any().
|
|
|
+sync_command(C, Command, Args) ->
|
|
|
+ gen_server:call(C, {command, Command, Args}, infinity).
|
|
|
+
|
|
|
+-spec async_command(epgsql:connection(), cast | incremental,
|
|
|
+ epgsql_command:command(), any()) -> reference().
|
|
|
+async_command(C, Transport, Command, Args) ->
|
|
|
+ Ref = make_ref(),
|
|
|
+ Pid = self(),
|
|
|
+ ok = gen_server:cast(C, {{Transport, Pid, Ref}, Command, Args}),
|
|
|
+ Ref.
|
|
|
+
|
|
|
get_parameter(C, Name) ->
|
|
|
gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
|
|
|
|
|
@@ -114,21 +114,72 @@ get_cmd_status(C) ->
|
|
|
cancel(S) ->
|
|
|
gen_server:cast(S, cancel).
|
|
|
|
|
|
+
|
|
|
+%% -- command APIs --
|
|
|
+
|
|
|
+%% send()
|
|
|
+%% send_many()
|
|
|
+
|
|
|
+-spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
|
|
|
+set_net_socket(Mod, Socket, State) ->
|
|
|
+ State1 = State#state{mod = Mod, sock = Socket},
|
|
|
+ setopts(State1, [{active, true}]),
|
|
|
+ State1.
|
|
|
+
|
|
|
+-spec init_replication_state(pg_sock()) -> pg_sock().
|
|
|
+init_replication_state(State) ->
|
|
|
+ State#state{repl = #repl{}}.
|
|
|
+
|
|
|
+-spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
|
|
|
+set_attr(backend, {_Pid, _Key} = Backend, State) ->
|
|
|
+ State#state{backend = Backend};
|
|
|
+set_attr(async, Async, State) ->
|
|
|
+ State#state{async = Async};
|
|
|
+set_attr(txstatus, Status, State) ->
|
|
|
+ State#state{txstatus = Status};
|
|
|
+set_attr(codec, Codec, State) ->
|
|
|
+ State#state{codec = Codec};
|
|
|
+set_attr(sync_required, Value, State) ->
|
|
|
+ State#state{sync_required = Value};
|
|
|
+set_attr(replication_state, Value, State) ->
|
|
|
+ State#state{repl = Value}.
|
|
|
+
|
|
|
+%% XXX: be careful!
|
|
|
+-spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
|
|
|
+set_packet_handler(Handler, State) ->
|
|
|
+ State#state{handler = Handler}.
|
|
|
+
|
|
|
+-spec get_codec(pg_sock()) -> epgsql_binary:codec().
|
|
|
+get_codec(#state{codec = Codec}) ->
|
|
|
+ Codec.
|
|
|
+
|
|
|
+-spec get_replication_state(pg_sock()) -> repl_state().
|
|
|
+get_replication_state(#state{repl = Repl}) ->
|
|
|
+ Repl.
|
|
|
+
|
|
|
+-spec get_rows(pg_sock()) -> [tuple()].
|
|
|
+get_rows(#state{rows = Rows}) ->
|
|
|
+ lists:reverse(Rows).
|
|
|
+
|
|
|
+-spec get_results(pg_sock()) -> [any()].
|
|
|
+get_results(#state{results = Results}) ->
|
|
|
+ lists:reverse(Results).
|
|
|
+
|
|
|
+-spec get_parameter_internal(binary(), pg_sock()) -> binary() | undefined.
|
|
|
+get_parameter_internal(Name, #state{parameters = Parameters}) ->
|
|
|
+ case lists:keysearch(Name, 1, Parameters) of
|
|
|
+ {value, {Name, Value}} -> Value;
|
|
|
+ false -> undefined
|
|
|
+ end.
|
|
|
+
|
|
|
+
|
|
|
%% -- gen_server implementation --
|
|
|
|
|
|
init([]) ->
|
|
|
{ok, #state{}}.
|
|
|
|
|
|
-handle_call({update_type_cache, TypeInfos}, _From, #state{codec = Codec} = State) ->
|
|
|
- Codec2 = epgsql_binary:update_type_cache(TypeInfos, Codec),
|
|
|
- {reply, ok, State#state{codec = Codec2}};
|
|
|
-
|
|
|
handle_call({get_parameter, Name}, _From, State) ->
|
|
|
- Value1 = case lists:keysearch(Name, 1, State#state.parameters) of
|
|
|
- {value, {Name, Value}} -> Value;
|
|
|
- false -> undefined
|
|
|
- end,
|
|
|
- {reply, {ok, Value1}, State};
|
|
|
+ {reply, {ok, get_parameter_internal(Name, State)}, State};
|
|
|
|
|
|
handle_call({set_async_receiver, PidOrName}, _From, #state{async = Previous} = State) ->
|
|
|
{reply, {ok, Previous}, State#state{async = PidOrName}};
|
|
@@ -137,23 +188,21 @@ handle_call(get_cmd_status, _From, #state{complete_status = Status} = State) ->
|
|
|
{reply, {ok, Status}, State};
|
|
|
|
|
|
handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
|
|
|
- #state{repl_last_received_lsn = ReceivedLSN} = State) ->
|
|
|
+ #state{handler = on_replication,
|
|
|
+ repl = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
|
|
|
send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
|
|
|
- {reply, ok, State#state{repl_last_flushed_lsn = FlushedLSN, repl_last_applied_lsn = AppliedLSN}};
|
|
|
-
|
|
|
-handle_call(Command, From, State) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
- Req = {{call, From}, Command},
|
|
|
- command(Command, State#state{queue = queue:in(Req, Q),
|
|
|
- complete_status = undefined}).
|
|
|
-
|
|
|
-handle_cast({{Method, From, Ref}, Command} = Req, State)
|
|
|
+ Repl1 = Repl#repl{last_flushed_lsn = FlushedLSN,
|
|
|
+ last_applied_lsn = AppliedLSN},
|
|
|
+ {reply, ok, State#state{repl = Repl1}};
|
|
|
+handle_call({command, Command, Args}, From, State) ->
|
|
|
+ Transport = {call, From},
|
|
|
+ command_new(Transport, Command, Args, State).
|
|
|
+
|
|
|
+handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
|
|
|
when ((Method == cast) or (Method == incremental)),
|
|
|
is_pid(From),
|
|
|
is_reference(Ref) ->
|
|
|
- #state{queue = Q} = State,
|
|
|
- command(Command, State#state{queue = queue:in(Req, Q),
|
|
|
- complete_status = undefined});
|
|
|
+ command_new(Transport, Command, Args, State);
|
|
|
|
|
|
handle_cast(stop, State) ->
|
|
|
{stop, normal, flush_queue(State, {error, closed})};
|
|
@@ -199,236 +248,134 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
|
|
|
%% -- internal functions --
|
|
|
|
|
|
-command(Command, State = #state{sync_required = true})
|
|
|
- when Command /= sync ->
|
|
|
- {noreply, finish(State, {error, sync_required})};
|
|
|
-
|
|
|
-command({connect, Host, Username, Password, Opts}, State) ->
|
|
|
- Timeout = proplists:get_value(timeout, Opts, 5000),
|
|
|
- Port = proplists:get_value(port, Opts, 5432),
|
|
|
- SockOpts = [{active, false}, {packet, raw}, binary, {nodelay, true}, {keepalive, true}],
|
|
|
- case gen_tcp:connect(Host, Port, SockOpts, Timeout) of
|
|
|
- {ok, Sock} ->
|
|
|
-
|
|
|
- %% Increase the buffer size. Following the recommendation in the inet man page:
|
|
|
- %%
|
|
|
- %% It is recommended to have val(buffer) >=
|
|
|
- %% max(val(sndbuf),val(recbuf)).
|
|
|
-
|
|
|
- {ok, [{recbuf, RecBufSize}, {sndbuf, SndBufSize}]} =
|
|
|
- inet:getopts(Sock, [recbuf, sndbuf]),
|
|
|
- inet:setopts(Sock, [{buffer, max(RecBufSize, SndBufSize)}]),
|
|
|
-
|
|
|
- State2 = case proplists:get_value(ssl, Opts) of
|
|
|
- T when T == true; T == required ->
|
|
|
- start_ssl(Sock, T, Opts, State);
|
|
|
- _ ->
|
|
|
- State#state{mod = gen_tcp, sock = Sock}
|
|
|
- end,
|
|
|
-
|
|
|
- Opts2 = ["user", 0, Username, 0],
|
|
|
- Opts3 = case proplists:get_value(database, Opts, undefined) of
|
|
|
- undefined -> Opts2;
|
|
|
- Database -> [Opts2 | ["database", 0, Database, 0]]
|
|
|
- end,
|
|
|
-
|
|
|
- Opts4 = case proplists:get_value(replication, Opts, undefined) of
|
|
|
- undefined -> Opts3;
|
|
|
- Replication -> [Opts3 | ["replication", 0, Replication, 0]]
|
|
|
- end,
|
|
|
+-spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
|
|
|
+ Result when
|
|
|
+ Result :: {noreply, pg_sock()}
|
|
|
+ | {stop, Reason :: any(), pg_sock()}.
|
|
|
+command_new(Transport, Command, Args, State) ->
|
|
|
+ CmdState = epgsql_command:init(Command, Args),
|
|
|
+ command_exec(Transport, Command, CmdState, State).
|
|
|
+
|
|
|
+-spec command_exec(transport(), epgsql_command:command(), any(), pg_sock()) ->
|
|
|
+ Result when
|
|
|
+ Result :: {noreply, pg_sock()}
|
|
|
+ | {stop, Reason :: any(), pg_sock()}.
|
|
|
+command_exec(Transport, Command, _, State = #state{sync_required = true})
|
|
|
+ when Command /= epgsql_cmd_sync ->
|
|
|
+ {noreply,
|
|
|
+ finish(State#state{current_cmd = Command,
|
|
|
+ current_cmd_transport = Transport},
|
|
|
+ {error, sync_required})};
|
|
|
+command_exec(Transport, Command, CmdState, State) ->
|
|
|
+ case epgsql_command:execute(Command, State, CmdState) of
|
|
|
+ {ok, State1, CmdState1} ->
|
|
|
+ {noreply, command_enqueue(Transport, Command, CmdState1, State1)};
|
|
|
+ {stop, StopReason, Response, State1} ->
|
|
|
+ reply(Transport, Response, Response),
|
|
|
+ {stop, StopReason, State1}
|
|
|
+ end.
|
|
|
|
|
|
- send(State2, [<<196608:?int32>>, Opts4, 0]),
|
|
|
- Async = proplists:get_value(async, Opts, undefined),
|
|
|
- setopts(State2, [{active, true}]),
|
|
|
- put(username, Username),
|
|
|
- put(password, Password),
|
|
|
+-spec command_enqueue(transport(), epgsql_command:command(), epgsql_command:state(), pg_sock()) -> pg_sock().
|
|
|
+command_enqueue(Transport, Command, CmdState, #state{current_cmd = undefined} = State) ->
|
|
|
+ State#state{current_cmd = Command,
|
|
|
+ current_cmd_state = CmdState,
|
|
|
+ current_cmd_transport = Transport,
|
|
|
+ complete_status = undefined};
|
|
|
+command_enqueue(Transport, Command, CmdState, #state{queue = Q} = State) ->
|
|
|
+ State#state{queue = queue:in({Command, CmdState, Transport}, Q),
|
|
|
+ complete_status = undefined}.
|
|
|
+
|
|
|
+-spec command_handle_message(byte(), binary() | epgsql:query_error(), pg_sock()) ->
|
|
|
+ {noreply, pg_sock()}
|
|
|
+ | {stop, any(), pg_sock()}.
|
|
|
+command_handle_message(Msg, Payload,
|
|
|
+ #state{current_cmd = Command,
|
|
|
+ current_cmd_state = CmdState} = State) ->
|
|
|
+ case epgsql_command:handle_message(Command, Msg, Payload, State, CmdState) of
|
|
|
+ {add_row, Row, State1, CmdState1} ->
|
|
|
+ {noreply, add_row(State1#state{current_cmd_state = CmdState1}, Row)};
|
|
|
+ {add_result, Result, Notice, State1, CmdState1} ->
|
|
|
{noreply,
|
|
|
- State2#state{handler = auth,
|
|
|
- async = Async}};
|
|
|
-
|
|
|
- {error, Reason} = Error ->
|
|
|
- {stop, Reason, finish(State, Error)}
|
|
|
- end;
|
|
|
-
|
|
|
-command({squery, Sql}, State) ->
|
|
|
- send(State, ?SIMPLEQUERY, [Sql, 0]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-%% TODO add fast_equery command that doesn't need parsed statement,
|
|
|
-%% uses default (text) column format,
|
|
|
-%% sends Describe after Bind to get RowDescription
|
|
|
-command({equery, Statement, Parameters}, #state{codec = Codec} = State) ->
|
|
|
- #statement{name = StatementName, columns = Columns} = Statement,
|
|
|
- Bin1 = epgsql_wire:encode_parameters(Parameters, Codec),
|
|
|
- Bin2 = epgsql_wire:encode_formats(Columns),
|
|
|
- send_multi(State, [
|
|
|
- {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
|
|
|
- {?EXECUTE, ["", 0, <<0:?int32>>]},
|
|
|
- {?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]},
|
|
|
- {?SYNC, []}
|
|
|
- ]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-command({prepared_query, Statement, Parameters}, #state{codec = Codec} = State) ->
|
|
|
- #statement{name = StatementName, columns = Columns} = Statement,
|
|
|
- Bin1 = epgsql_wire:encode_parameters(Parameters, Codec),
|
|
|
- Bin2 = epgsql_wire:encode_formats(Columns),
|
|
|
- send_multi(State, [
|
|
|
- {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
|
|
|
- {?EXECUTE, ["", 0, <<0:?int32>>]},
|
|
|
- {?SYNC, []}
|
|
|
- ]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-command({parse, Name, Sql, Types}, State) ->
|
|
|
- Bin = epgsql_wire:encode_types(Types, State#state.codec),
|
|
|
- send_multi(State, [
|
|
|
- {?PARSE, [Name, 0, Sql, 0, Bin]},
|
|
|
- {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
|
|
|
- {?FLUSH, []}
|
|
|
- ]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-command({bind, Statement, PortalName, Parameters}, #state{codec = Codec} = State) ->
|
|
|
- #statement{name = StatementName, columns = Columns, types = Types} = Statement,
|
|
|
- Typed_Parameters = lists:zip(Types, Parameters),
|
|
|
- Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
|
|
|
- Bin2 = epgsql_wire:encode_formats(Columns),
|
|
|
- send_multi(State, [
|
|
|
- {?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]},
|
|
|
- {?FLUSH, []}
|
|
|
- ]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-command({execute, _Statement, PortalName, MaxRows}, State) ->
|
|
|
- send_multi(State, [
|
|
|
- {?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]},
|
|
|
- {?FLUSH, []}
|
|
|
- ]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-command({execute_batch, Batch}, #state{codec = Codec} = State) ->
|
|
|
- Commands =
|
|
|
- lists:foldr(
|
|
|
- fun({Statement, Parameters}, Acc) ->
|
|
|
- #statement{name = StatementName,
|
|
|
- columns = Columns,
|
|
|
- types = Types} = Statement,
|
|
|
- Typed_Parameters = lists:zip(Types, Parameters),
|
|
|
- Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
|
|
|
- Bin2 = epgsql_wire:encode_formats(Columns),
|
|
|
- [{?BIND, [0, StatementName, 0, Bin1, Bin2]},
|
|
|
- {?EXECUTE, [0, <<0:?int32>>]} | Acc]
|
|
|
- end,
|
|
|
- [{?SYNC, []}],
|
|
|
- Batch),
|
|
|
- send_multi(State, Commands),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-command({describe_statement, Name}, State) ->
|
|
|
- send_multi(State, [
|
|
|
- {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
|
|
|
- {?FLUSH, []}
|
|
|
- ]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-command({describe_portal, Name}, State) ->
|
|
|
- send_multi(State, [
|
|
|
- {?DESCRIBE, [?PORTAL, Name, 0]},
|
|
|
- {?FLUSH, []}
|
|
|
- ]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-command({close, Type, Name}, State) ->
|
|
|
- Type2 = case Type of
|
|
|
- statement -> ?PREPARED_STATEMENT;
|
|
|
- portal -> ?PORTAL
|
|
|
- end,
|
|
|
- send_multi(State, [
|
|
|
- {?CLOSE, [Type2, Name, 0]},
|
|
|
- {?FLUSH, []}
|
|
|
- ]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-command(sync, State) ->
|
|
|
- send(State, ?SYNC, []),
|
|
|
- {noreply, State#state{sync_required = false}};
|
|
|
-
|
|
|
-command({start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, Opts}, State) ->
|
|
|
- Sql1 = ["START_REPLICATION SLOT """, ReplicationSlot, """ LOGICAL ", WALPosition],
|
|
|
- Sql2 =
|
|
|
- case PluginOpts of
|
|
|
- [] -> Sql1;
|
|
|
- PluginOpts -> [Sql1 , " (", PluginOpts, ")"]
|
|
|
- end,
|
|
|
-
|
|
|
- State2 =
|
|
|
- case Callback of
|
|
|
- Pid when is_pid(Pid) -> State#state{repl_receiver = Pid};
|
|
|
- Module -> State#state{repl_cbmodule = Module, repl_cbstate = CbInitState}
|
|
|
- end,
|
|
|
+ add_result(State1#state{current_cmd_state = CmdState1},
|
|
|
+ Notice, Result)};
|
|
|
+ {finish, Result, Notice, State1} ->
|
|
|
+ {noreply, finish(State1, Notice, Result)};
|
|
|
+ {noaction, State1} ->
|
|
|
+ {noreply, State1};
|
|
|
+ {noaction, State1, CmdState1} ->
|
|
|
+ {noreply, State1#state{current_cmd_state = CmdState1}};
|
|
|
+ {requeue, State1, CmdState1} ->
|
|
|
+ Transport = State1#state.current_cmd_transport,
|
|
|
+ command_exec(Transport, Command, CmdState1,
|
|
|
+ State1#state{current_cmd = undefined});
|
|
|
+ {stop, Reason, Response, State1} ->
|
|
|
+ {stop, Reason, finish(State1, Response)};
|
|
|
+ {sync_required, Why} ->
|
|
|
+ %% Protocol error. Finish and flush all pending commands.
|
|
|
+ {noreply, sync_required(finish(State#state{sync_required = true}, Why))};
|
|
|
+ unknown ->
|
|
|
+ {stop, {error, {unexpected_message, Msg, Command, CmdState}}, State}
|
|
|
+ end.
|
|
|
|
|
|
- Hex = [H || H <- WALPosition, H =/= $/],
|
|
|
- {ok, [LSN], _} = io_lib:fread("~16u", Hex),
|
|
|
- AlignLsn = proplists:get_value(align_lsn, Opts, false),
|
|
|
- State3 = State2#state{repl_last_flushed_lsn = LSN, repl_last_applied_lsn = LSN, repl_align_lsn = AlignLsn},
|
|
|
-
|
|
|
- send(State3, ?SIMPLEQUERY, [Sql2, 0]),
|
|
|
- {noreply, State3}.
|
|
|
-
|
|
|
-start_ssl(S, Flag, Opts, State) ->
|
|
|
- ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
|
|
|
- Timeout = proplists:get_value(timeout, Opts, 5000),
|
|
|
- {ok, <<Code>>} = gen_tcp:recv(S, 1, Timeout),
|
|
|
- case Code of
|
|
|
- $S ->
|
|
|
- SslOpts = proplists:get_value(ssl_opts, Opts, []),
|
|
|
- case ssl:connect(S, SslOpts, Timeout) of
|
|
|
- {ok, S2} -> State#state{mod = ssl, sock = S2};
|
|
|
- {error, Reason} -> exit({ssl_negotiation_failed, Reason})
|
|
|
- end;
|
|
|
- $N ->
|
|
|
- case Flag of
|
|
|
- true -> State;
|
|
|
- required -> exit(ssl_not_available)
|
|
|
- end
|
|
|
+command_next(#state{current_cmd = PrevCmd,
|
|
|
+ queue = Q} = State) when PrevCmd =/= undefined ->
|
|
|
+ case queue:out(Q) of
|
|
|
+ {empty, _} ->
|
|
|
+ State#state{current_cmd = undefined,
|
|
|
+ current_cmd_state = undefined,
|
|
|
+ current_cmd_transport = undefined,
|
|
|
+ rows = [],
|
|
|
+ results = []};
|
|
|
+ {{value, {Command, CmdState, Transport}}, Q1} ->
|
|
|
+ State#state{current_cmd = Command,
|
|
|
+ current_cmd_state = CmdState,
|
|
|
+ current_cmd_transport = Transport,
|
|
|
+ queue = Q1,
|
|
|
+ rows = [],
|
|
|
+ results = []}
|
|
|
end.
|
|
|
|
|
|
+
|
|
|
setopts(#state{mod = Mod, sock = Sock}, Opts) ->
|
|
|
case Mod of
|
|
|
gen_tcp -> inet:setopts(Sock, Opts);
|
|
|
ssl -> ssl:setopts(Sock, Opts)
|
|
|
end.
|
|
|
|
|
|
+%% This one only used in connection initiation to send client's
|
|
|
+%% `StartupMessage' and `SSLRequest' packets
|
|
|
+-spec send(pg_sock(), iodata()) -> ok | {error, any()}.
|
|
|
send(#state{mod = Mod, sock = Sock}, Data) ->
|
|
|
- do_send(Mod, Sock, epgsql_wire:encode(Data)).
|
|
|
+ do_send(Mod, Sock, epgsql_wire:encode_command(Data)).
|
|
|
|
|
|
+-spec send(pg_sock(), byte(), iodata()) -> ok | {error, any()}.
|
|
|
send(#state{mod = Mod, sock = Sock}, Type, Data) ->
|
|
|
- do_send(Mod, Sock, epgsql_wire:encode(Type, Data)).
|
|
|
+ do_send(Mod, Sock, epgsql_wire:encode_command(Type, Data)).
|
|
|
|
|
|
+-spec send_multi(pg_sock(), [{byte(), iodata()}]) -> ok | {error, any()}.
|
|
|
send_multi(#state{mod = Mod, sock = Sock}, List) ->
|
|
|
do_send(Mod, Sock, lists:map(fun({Type, Data}) ->
|
|
|
- epgsql_wire:encode(Type, Data)
|
|
|
+ epgsql_wire:encode_command(Type, Data)
|
|
|
end, List)).
|
|
|
|
|
|
do_send(gen_tcp, Sock, Bin) ->
|
|
|
+ %% Why not gen_tcp:send/2?
|
|
|
+ %% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
|
|
|
+ %% Because of that we also have `handle_info({inet_reply, ...`
|
|
|
try erlang:port_command(Sock, Bin) of
|
|
|
true ->
|
|
|
ok
|
|
|
catch
|
|
|
error:_Error ->
|
|
|
- {error,einval}
|
|
|
+ {error, einval}
|
|
|
end;
|
|
|
+do_send(ssl, Sock, Bin) ->
|
|
|
+ ssl:send(Sock, Bin).
|
|
|
|
|
|
-do_send(Mod, Sock, Bin) ->
|
|
|
- Mod:send(Sock, Bin).
|
|
|
-
|
|
|
-loop(#state{data = Data, handler = Handler, repl_last_received_lsn = LastReceivedLSN,
|
|
|
- repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
|
|
|
- repl_feedback_required = ReplFeedbackRequired} = State) ->
|
|
|
+loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
|
|
|
case epgsql_wire:decode_message(Data) of
|
|
|
- {Message, Tail} ->
|
|
|
- case ?MODULE:Handler(Message, State#state{data = Tail}) of
|
|
|
+ {Type, Payload, Tail} ->
|
|
|
+ case ?MODULE:Handler(Type, Payload, State#state{data = Tail}) of
|
|
|
{noreply, State2} ->
|
|
|
loop(State2);
|
|
|
R = {stop, _Reason2, _State2} ->
|
|
@@ -436,11 +383,14 @@ loop(#state{data = Data, handler = Handler, repl_last_received_lsn = LastReceive
|
|
|
end;
|
|
|
_ ->
|
|
|
%% in replication mode send feedback after each batch of messages
|
|
|
- case ReplFeedbackRequired of
|
|
|
+ case (Repl =/= undefined) andalso (Repl#repl.feedback_required) of
|
|
|
true ->
|
|
|
+ #repl{last_received_lsn = LastReceivedLSN,
|
|
|
+ last_flushed_lsn = LastFlushedLSN,
|
|
|
+ last_applied_lsn = LastAppliedLSN} = Repl,
|
|
|
send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(
|
|
|
LastReceivedLSN, LastFlushedLSN, LastAppliedLSN)),
|
|
|
- {noreply, State#state{repl_feedback_required = false}};
|
|
|
+ {noreply, State#state{repl = Repl#repl{feedback_required = false}}};
|
|
|
_ ->
|
|
|
{noreply, State}
|
|
|
end
|
|
@@ -449,44 +399,31 @@ loop(#state{data = Data, handler = Handler, repl_last_received_lsn = LastReceive
|
|
|
finish(State, Result) ->
|
|
|
finish(State, Result, Result).
|
|
|
|
|
|
-finish(State = #state{queue = Q}, Notice, Result) ->
|
|
|
- case queue:get(Q) of
|
|
|
- {{cast, From, Ref}, _} ->
|
|
|
- From ! {self(), Ref, Result};
|
|
|
- {{incremental, From, Ref}, _} ->
|
|
|
- From ! {self(), Ref, Notice};
|
|
|
- {{call, From}, _} ->
|
|
|
- gen_server:reply(From, Result)
|
|
|
- end,
|
|
|
- State#state{queue = queue:drop(Q),
|
|
|
- types = [],
|
|
|
- columns = [],
|
|
|
- rows = [],
|
|
|
- results = [],
|
|
|
- batch = []}.
|
|
|
+finish(State = #state{current_cmd_transport = Transport}, Notice, Result) ->
|
|
|
+ reply(Transport, Notice, Result),
|
|
|
+ command_next(State).
|
|
|
|
|
|
-add_result(State, Notice, Result) ->
|
|
|
- #state{queue = Q, results = Results, batch = Batch} = State,
|
|
|
- Results2 = case queue:get(Q) of
|
|
|
- {{incremental, From, Ref}, _} ->
|
|
|
+reply({cast, From, Ref}, _, Result) ->
|
|
|
+ From ! {self(), Ref, Result};
|
|
|
+reply({incremental, From, Ref}, Notice, _) ->
|
|
|
+ From ! {self(), Ref, Notice};
|
|
|
+reply({call, From}, _, Result) ->
|
|
|
+ gen_server:reply(From, Result).
|
|
|
+
|
|
|
+add_result(#state{results = Results, current_cmd_transport = Transport} = State, Notice, Result) ->
|
|
|
+ Results2 = case Transport of
|
|
|
+ {incremental, From, Ref} ->
|
|
|
From ! {self(), Ref, Notice},
|
|
|
Results;
|
|
|
_ ->
|
|
|
[Result | Results]
|
|
|
end,
|
|
|
- Batch2 = case Batch of
|
|
|
- [] -> [];
|
|
|
- _ -> tl(Batch)
|
|
|
- end,
|
|
|
- State#state{types = [],
|
|
|
- columns = [],
|
|
|
- rows = [],
|
|
|
- results = Results2,
|
|
|
- batch = Batch2}.
|
|
|
-
|
|
|
-add_row(State = #state{queue = Q, rows = Rows}, Data) ->
|
|
|
- Rows2 = case queue:get(Q) of
|
|
|
- {{incremental, From, Ref}, _} ->
|
|
|
+ State#state{rows = [],
|
|
|
+ results = Results2}.
|
|
|
+
|
|
|
+add_row(#state{rows = Rows, current_cmd_transport = Transport} = State, Data) ->
|
|
|
+ Rows2 = case Transport of
|
|
|
+ {incremental, From, Ref} ->
|
|
|
From ! {self(), Ref, {data, Data}},
|
|
|
Rows;
|
|
|
_ ->
|
|
@@ -494,15 +431,13 @@ add_row(State = #state{queue = Q, rows = Rows}, Data) ->
|
|
|
end,
|
|
|
State#state{rows = Rows2}.
|
|
|
|
|
|
-notify(State = #state{queue = Q}, Notice) ->
|
|
|
- case queue:get(Q) of
|
|
|
- {{incremental, From, Ref}, _} ->
|
|
|
- From ! {self(), Ref, Notice};
|
|
|
- _ ->
|
|
|
- ignore
|
|
|
- end,
|
|
|
+notify(#state{current_cmd_transport = {incremental, From, Ref}} = State, Notice) ->
|
|
|
+ From ! {self(), Ref, Notice},
|
|
|
+ State;
|
|
|
+notify(State, _) ->
|
|
|
State.
|
|
|
|
|
|
+%% Send asynchronous messages (notice / notification)
|
|
|
notify_async(#state{async = undefined}, _) ->
|
|
|
false;
|
|
|
notify_async(#state{async = PidOrName}, Msg) ->
|
|
@@ -513,311 +448,58 @@ notify_async(#state{async = PidOrName}, Msg) ->
|
|
|
false
|
|
|
end.
|
|
|
|
|
|
-command_tag(#state{queue = Q}) ->
|
|
|
- {_, Req} = queue:get(Q),
|
|
|
- if is_tuple(Req) ->
|
|
|
- element(1, Req);
|
|
|
- is_atom(Req) ->
|
|
|
- Req
|
|
|
- end.
|
|
|
-
|
|
|
-get_columns(State) ->
|
|
|
- #state{queue = Q, columns = Columns, batch = Batch} = State,
|
|
|
- case queue:get(Q) of
|
|
|
- {_, {Command, #statement{columns = C}, _}} when Command == equery; Command == prepared_query ->
|
|
|
- C;
|
|
|
- {_, {execute, #statement{columns = C}, _, _}} ->
|
|
|
- C;
|
|
|
- {_, {squery, _}} ->
|
|
|
- Columns;
|
|
|
- {_, {execute_batch, _}} ->
|
|
|
- [{#statement{columns = C}, _} | _] = Batch,
|
|
|
- C
|
|
|
- end.
|
|
|
-
|
|
|
-make_statement(State) ->
|
|
|
- #state{queue = Q, types = Types, columns = Columns} = State,
|
|
|
- Name = case queue:get(Q) of
|
|
|
- {_, {parse, N, _, _}} -> N;
|
|
|
- {_, {describe_statement, N}} -> N
|
|
|
- end,
|
|
|
- #statement{name = Name, types = Types, columns = Columns}.
|
|
|
-
|
|
|
-sync_required(#state{queue = Q} = State) ->
|
|
|
- case queue:is_empty(Q) of
|
|
|
- false ->
|
|
|
- case command_tag(State) of
|
|
|
- sync ->
|
|
|
- State;
|
|
|
- _ ->
|
|
|
- sync_required(finish(State, {error, sync_required}))
|
|
|
- end;
|
|
|
- true ->
|
|
|
- State#state{sync_required = true}
|
|
|
- end.
|
|
|
+sync_required(#state{current_cmd = epgsql_cmd_sync} = State) ->
|
|
|
+ State;
|
|
|
+sync_required(#state{current_cmd = undefined} = State) ->
|
|
|
+ State#state{sync_required = true};
|
|
|
+sync_required(State) ->
|
|
|
+ sync_required(finish(State, {error, sync_required})).
|
|
|
|
|
|
-flush_queue(#state{queue = Q} = State, Error) ->
|
|
|
- case queue:is_empty(Q) of
|
|
|
- false ->
|
|
|
- flush_queue(finish(State, Error), Error);
|
|
|
- true -> State
|
|
|
- end.
|
|
|
+flush_queue(#state{current_cmd = undefined} = State, _) ->
|
|
|
+ State;
|
|
|
+flush_queue(State, Error) ->
|
|
|
+ flush_queue(finish(State, Error), Error).
|
|
|
|
|
|
to_binary(B) when is_binary(B) -> B;
|
|
|
to_binary(L) when is_list(L) -> list_to_binary(L).
|
|
|
|
|
|
-hex(Bin) ->
|
|
|
- HChar = fun(N) when N < 10 -> $0 + N;
|
|
|
- (N) when N < 16 -> $W + N
|
|
|
- end,
|
|
|
- <<<<(HChar(H)), (HChar(L))>> || <<H:4, L:4>> <= Bin>>.
|
|
|
|
|
|
%% -- backend message handling --
|
|
|
|
|
|
-%% AuthenticationOk
|
|
|
-auth({?AUTHENTICATION_REQUEST, <<0:?int32>>}, State) ->
|
|
|
- {noreply, State#state{handler = initializing}};
|
|
|
-
|
|
|
-%% AuthenticationCleartextPassword
|
|
|
-auth({?AUTHENTICATION_REQUEST, <<3:?int32>>}, State) ->
|
|
|
- send(State, ?PASSWORD, [get(password), 0]),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-%% AuthenticationMD5Password
|
|
|
-auth({?AUTHENTICATION_REQUEST, <<5:?int32, Salt:4/binary>>}, State) ->
|
|
|
- Digest1 = hex(erlang:md5([get(password), get(username)])),
|
|
|
- Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
|
|
|
- send(State, ?PASSWORD, Str),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-auth({?AUTHENTICATION_REQUEST, <<M:?int32, _/binary>>}, State) ->
|
|
|
- Method = case M of
|
|
|
- 2 -> kerberosV5;
|
|
|
- 4 -> crypt;
|
|
|
- 6 -> scm;
|
|
|
- 7 -> gss;
|
|
|
- 8 -> sspi;
|
|
|
- _ -> unknown
|
|
|
- end,
|
|
|
- State2 = finish(State, {error, {unsupported_auth_method, Method}}),
|
|
|
- {stop, normal, State2};
|
|
|
-
|
|
|
-%% ErrorResponse
|
|
|
-auth({error, E}, State) ->
|
|
|
- Why = case E#error.code of
|
|
|
- <<"28000">> -> invalid_authorization_specification;
|
|
|
- <<"28P01">> -> invalid_password;
|
|
|
- Any -> Any
|
|
|
- end,
|
|
|
- {stop, normal, finish(State, {error, Why})};
|
|
|
-
|
|
|
-auth(Other, State) ->
|
|
|
- on_message(Other, State).
|
|
|
-
|
|
|
-%% BackendKeyData
|
|
|
-initializing({?CANCELLATION_KEY, <<Pid:?int32, Key:?int32>>}, State) ->
|
|
|
- {noreply, State#state{backend = {Pid, Key}}};
|
|
|
-
|
|
|
-%% ReadyForQuery
|
|
|
-initializing({?READY_FOR_QUERY, <<Status:8>>}, State) ->
|
|
|
- #state{parameters = Parameters} = State,
|
|
|
- erase(username),
|
|
|
- erase(password),
|
|
|
- %% TODO decode dates to now() format
|
|
|
- case lists:keysearch(<<"integer_datetimes">>, 1, Parameters) of
|
|
|
- {value, {_, <<"on">>}} -> put(datetime_mod, epgsql_idatetime);
|
|
|
- {value, {_, <<"off">>}} -> put(datetime_mod, epgsql_fdatetime)
|
|
|
- end,
|
|
|
- State2 = finish(State#state{handler = on_message,
|
|
|
- txstatus = Status,
|
|
|
- codec = epgsql_binary:new_codec([])},
|
|
|
- connected),
|
|
|
- {noreply, State2};
|
|
|
-
|
|
|
-initializing({error, _} = Error, State) ->
|
|
|
- {stop, normal, finish(State, Error)};
|
|
|
-
|
|
|
-initializing(Other, State) ->
|
|
|
- on_message(Other, State).
|
|
|
-
|
|
|
-%% ParseComplete
|
|
|
-on_message({?PARSE_COMPLETE, <<>>}, State) ->
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-%% ParameterDescription
|
|
|
-on_message({?PARAMETER_DESCRIPTION, <<_Count:?int16, Bin/binary>>}, State) ->
|
|
|
- Types = [epgsql_binary:oid2type(Oid, State#state.codec) || <<Oid:?int32>> <= Bin],
|
|
|
- State2 = notify(State#state{types = Types}, {types, Types}),
|
|
|
- {noreply, State2};
|
|
|
-
|
|
|
-%% RowDescription
|
|
|
-on_message({?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>}, State) ->
|
|
|
- Columns = epgsql_wire:decode_columns(Count, Bin, State#state.codec),
|
|
|
- Columns2 =
|
|
|
- case command_tag(State) of
|
|
|
- C when C == describe_portal; C == squery ->
|
|
|
- Columns;
|
|
|
- C when C == parse; C == describe_statement ->
|
|
|
- [Col#column{format = epgsql_wire:format(Col#column.type)}
|
|
|
- || Col <- Columns]
|
|
|
- end,
|
|
|
- State2 = State#state{columns = Columns2},
|
|
|
- Message = {columns, Columns2},
|
|
|
- State3 = case command_tag(State2) of
|
|
|
- squery ->
|
|
|
- notify(State2, Message);
|
|
|
- T when T == parse; T == describe_statement ->
|
|
|
- finish(State2, Message, {ok, make_statement(State2)});
|
|
|
- describe_portal ->
|
|
|
- finish(State2, Message, {ok, Columns})
|
|
|
- end,
|
|
|
- {noreply, State3};
|
|
|
-
|
|
|
-%% NoData
|
|
|
-on_message({?NO_DATA, <<>>}, State) ->
|
|
|
- State2 = case command_tag(State) of
|
|
|
- C when C == parse; C == describe_statement ->
|
|
|
- finish(State, no_data, {ok, make_statement(State)});
|
|
|
- describe_portal ->
|
|
|
- finish(State, no_data, {ok, []})
|
|
|
- end,
|
|
|
- {noreply, State2};
|
|
|
-
|
|
|
-%% BindComplete
|
|
|
-on_message({?BIND_COMPLETE, <<>>}, State) ->
|
|
|
- State2 = case command_tag(State) of
|
|
|
- Command when Command == equery; Command == prepared_query ->
|
|
|
- %% TODO send Describe as a part of equery, needs text format support
|
|
|
- notify(State, {columns, get_columns(State)});
|
|
|
- bind ->
|
|
|
- finish(State, ok);
|
|
|
- execute_batch ->
|
|
|
- Batch =
|
|
|
- case State#state.batch of
|
|
|
- [] ->
|
|
|
- {_, {_, B}} = queue:get(State#state.queue),
|
|
|
- B;
|
|
|
- B -> B
|
|
|
- end,
|
|
|
- State#state{batch = Batch}
|
|
|
- end,
|
|
|
- {noreply, State2};
|
|
|
-
|
|
|
-%% CloseComplete
|
|
|
-on_message({?CLOSE_COMPLETE, <<>>}, State) ->
|
|
|
- State2 = case command_tag(State) of
|
|
|
- Command when Command == equery; Command == prepared_query ->
|
|
|
- State;
|
|
|
- close ->
|
|
|
- finish(State, ok)
|
|
|
- end,
|
|
|
- {noreply, State2};
|
|
|
-
|
|
|
-%% DataRow
|
|
|
-on_message({?DATA_ROW, <<_Count:?int16, Bin/binary>>}, State) ->
|
|
|
- Data = epgsql_wire:decode_data(get_columns(State), Bin, State#state.codec),
|
|
|
- {noreply, add_row(State, Data)};
|
|
|
-
|
|
|
-%% PortalSuspended
|
|
|
-on_message({?PORTAL_SUSPENDED, <<>>}, State) ->
|
|
|
- State2 = finish(State,
|
|
|
- suspended,
|
|
|
- {partial, lists:reverse(State#state.rows)}),
|
|
|
- {noreply, State2};
|
|
|
-
|
|
|
%% CommandComplete
|
|
|
-on_message({?COMMAND_COMPLETE, Bin}, State0) ->
|
|
|
+on_message(?COMMAND_COMPLETE = Msg, Bin, State) ->
|
|
|
Complete = epgsql_wire:decode_complete(Bin),
|
|
|
- State = State0#state{complete_status = Complete},
|
|
|
- Command = command_tag(State),
|
|
|
- Notice = {complete, Complete},
|
|
|
- Rows = lists:reverse(State#state.rows),
|
|
|
- Columns = get_columns(State),
|
|
|
- State2 = case {Command, Complete, Columns} of
|
|
|
- {execute, {_, Count}, []} ->
|
|
|
- finish(State, Notice, {ok, Count});
|
|
|
- {execute, {_, Count}, _} ->
|
|
|
- finish(State, Notice, {ok, Count, Rows});
|
|
|
- {execute, _, _} ->
|
|
|
- finish(State, Notice, {ok, Rows});
|
|
|
- {execute_batch, {_, Count}, []} ->
|
|
|
- add_result(State, Notice, {ok, Count});
|
|
|
- {execute_batch, {_, Count}, _} ->
|
|
|
- add_result(State, Notice, {ok, Count, Rows});
|
|
|
- {execute_batch, _, _} ->
|
|
|
- add_result(State, Notice, {ok, Rows});
|
|
|
- {C, {_, Count}, []} when C == squery; C == equery; C == prepared_query ->
|
|
|
- add_result(State, Notice, {ok, Count});
|
|
|
- {C, {_, Count}, _} when C == squery; C == equery; C == prepared_query ->
|
|
|
- add_result(State, Notice, {ok, Count, Columns, Rows});
|
|
|
- {C, _, _} when C == squery; C == equery; C == prepared_query ->
|
|
|
- add_result(State, Notice, {ok, Columns, Rows})
|
|
|
- end,
|
|
|
- {noreply, State2};
|
|
|
-
|
|
|
-%% EmptyQueryResponse
|
|
|
-on_message({?EMPTY_QUERY, _Bin}, State) ->
|
|
|
- Notice = {complete, empty},
|
|
|
- State2 = case command_tag(State) of
|
|
|
- execute ->
|
|
|
- finish(State, Notice, {ok, [], []});
|
|
|
- C when C == squery; C == equery; C == prepared_query ->
|
|
|
- add_result(State, Notice, {ok, [], []})
|
|
|
- end,
|
|
|
- {noreply, State2};
|
|
|
+ command_handle_message(Msg, Bin, State#state{complete_status = Complete});
|
|
|
|
|
|
%% ReadyForQuery
|
|
|
-on_message({?READY_FOR_QUERY, <<Status:8>>}, State) ->
|
|
|
- State2 = case command_tag(State) of
|
|
|
- squery ->
|
|
|
- case State#state.results of
|
|
|
- [Result] ->
|
|
|
- finish(State, done, Result);
|
|
|
- Results ->
|
|
|
- finish(State, done, lists:reverse(Results))
|
|
|
- end;
|
|
|
- execute_batch ->
|
|
|
- finish(State, done, lists:reverse(State#state.results));
|
|
|
- Command when Command == equery; Command == prepared_query ->
|
|
|
- case State#state.results of
|
|
|
- [Result] ->
|
|
|
- finish(State, done, Result);
|
|
|
- [] ->
|
|
|
- finish(State, done)
|
|
|
- end;
|
|
|
- sync ->
|
|
|
- finish(State, ok)
|
|
|
- end,
|
|
|
- {noreply, State2#state{txstatus = Status}};
|
|
|
-
|
|
|
-on_message(Error = {error, Reason}, State) ->
|
|
|
- case queue:is_empty(State#state.queue) of
|
|
|
- true ->
|
|
|
+on_message(?READY_FOR_QUERY = Msg, <<Status:8>> = Bin, State) ->
|
|
|
+ command_handle_message(Msg, Bin, State#state{txstatus = Status});
|
|
|
+
|
|
|
+%% Error
|
|
|
+on_message(?ERROR = Msg, Err, #state{current_cmd = CurrentCmd} = State) ->
|
|
|
+ Reason = epgsql_wire:decode_error(Err),
|
|
|
+ case CurrentCmd of
|
|
|
+ undefined ->
|
|
|
+ %% Message generated by server asynchronously
|
|
|
{stop, {shutdown, Reason}, State};
|
|
|
- false ->
|
|
|
- State2 = case command_tag(State) of
|
|
|
- C when C == squery; C == equery; C == execute_batch; C == prepared_query ->
|
|
|
- add_result(State, Error, Error);
|
|
|
- _ ->
|
|
|
- sync_required(finish(State, Error))
|
|
|
- end,
|
|
|
- {noreply, State2}
|
|
|
+ _ ->
|
|
|
+ command_handle_message(Msg, Reason, State)
|
|
|
end;
|
|
|
|
|
|
%% NoticeResponse
|
|
|
-on_message({?NOTICE, Data}, State) ->
|
|
|
+on_message(?NOTICE, Data, State) ->
|
|
|
notify_async(State, {notice, epgsql_wire:decode_error(Data)}),
|
|
|
{noreply, State};
|
|
|
|
|
|
%% ParameterStatus
|
|
|
-on_message({?PARAMETER_STATUS, Data}, State) ->
|
|
|
+on_message(?PARAMETER_STATUS, Data, State) ->
|
|
|
[Name, Value] = epgsql_wire:decode_strings(Data),
|
|
|
Parameters2 = lists:keystore(Name, 1, State#state.parameters,
|
|
|
{Name, Value}),
|
|
|
{noreply, State#state{parameters = Parameters2}};
|
|
|
|
|
|
%% NotificationResponse
|
|
|
-on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
|
|
|
+on_message(?NOTIFICATION, <<Pid:?int32, Strings/binary>>, State) ->
|
|
|
{Channel1, Payload1} = case epgsql_wire:decode_strings(Strings) of
|
|
|
[Channel, Payload] -> {Channel, Payload};
|
|
|
[Channel] -> {Channel, <<>>}
|
|
@@ -825,46 +507,72 @@ on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
|
|
|
notify_async(State, {notification, Channel1, Pid, Payload1}),
|
|
|
{noreply, State};
|
|
|
|
|
|
+%% ParseComplete
|
|
|
+%% ParameterDescription
|
|
|
+%% RowDescription
|
|
|
+%% NoData
|
|
|
+%% BindComplete
|
|
|
+%% CloseComplete
|
|
|
+%% DataRow
|
|
|
+%% PortalSuspended
|
|
|
+%% EmptyQueryResponse
|
|
|
+%% CopyData
|
|
|
%% CopyBothResponse
|
|
|
-on_message({?COPY_BOTH_RESPONSE, _Data}, State) ->
|
|
|
- State2 = finish(State, ok),
|
|
|
- {noreply, State2};
|
|
|
+on_message(Msg, Payload, State) ->
|
|
|
+ command_handle_message(Msg, Payload, State).
|
|
|
|
|
|
-%% CopyData for COPY command. COPY command not supported yet.
|
|
|
-on_message({?COPY_DATA, _Data}, #state{repl_cbmodule = undefined, repl_receiver = undefined} = State) ->
|
|
|
- {stop, {error, copy_command_not_supported}, State};
|
|
|
|
|
|
%% CopyData for Replication mode
|
|
|
-on_message({?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>},
|
|
|
- #state{repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
|
|
|
- repl_align_lsn = AlignLsn} = State) ->
|
|
|
- case ReplyRequired of
|
|
|
- 1 when AlignLsn ->
|
|
|
- send(State, ?COPY_DATA,
|
|
|
- epgsql_wire:encode_standby_status_update(LSN, LSN, LSN)),
|
|
|
- {noreply, State#state{repl_feedback_required = false,
|
|
|
- repl_last_received_lsn = LSN,
|
|
|
- repl_last_applied_lsn = LSN,
|
|
|
- repl_last_flushed_lsn = LSN}};
|
|
|
- 1 when not AlignLsn ->
|
|
|
- send(State, ?COPY_DATA,
|
|
|
- epgsql_wire:encode_standby_status_update(LSN, LastFlushedLSN, LastAppliedLSN)),
|
|
|
- {noreply, State#state{repl_feedback_required = false, repl_last_received_lsn = LSN}};
|
|
|
- _ ->
|
|
|
- {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = LSN}}
|
|
|
- end;
|
|
|
+on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
|
|
|
+ #state{repl = #repl{last_flushed_lsn = LastFlushedLSN,
|
|
|
+ last_applied_lsn = LastAppliedLSN,
|
|
|
+ align_lsn = AlignLsn} = Repl} = State) ->
|
|
|
+ Repl1 =
|
|
|
+ case ReplyRequired of
|
|
|
+ 1 when AlignLsn ->
|
|
|
+ send(State, ?COPY_DATA,
|
|
|
+ epgsql_wire:encode_standby_status_update(LSN, LSN, LSN)),
|
|
|
+ Repl#repl{feedback_required = false,
|
|
|
+ last_received_lsn = LSN, last_applied_lsn = LSN, last_flushed_lsn = LSN};
|
|
|
+ 1 when not AlignLsn ->
|
|
|
+ send(State, ?COPY_DATA,
|
|
|
+ epgsql_wire:encode_standby_status_update(LSN, LastFlushedLSN, LastAppliedLSN)),
|
|
|
+ Repl#repl{feedback_required = false,
|
|
|
+ last_received_lsn = LSN};
|
|
|
+ _ ->
|
|
|
+ Repl#repl{feedback_required = true,
|
|
|
+ last_received_lsn = LSN}
|
|
|
+ end,
|
|
|
+ {noreply, State#state{repl = Repl1}};
|
|
|
|
|
|
-%% CopyData for Replication mode. with async messages
|
|
|
-on_message({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64, _Timestamp:?int64, WALRecord/binary>>},
|
|
|
- #state{repl_cbmodule = undefined, repl_receiver = Receiver} = State) ->
|
|
|
+%% CopyData for Replication mode
|
|
|
+on_replication(?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
|
|
|
+ _Timestamp:?int64, WALRecord/binary>>,
|
|
|
+ #state{repl = Repl} = State) ->
|
|
|
+ Repl1 = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
|
|
|
+ {noreply, State#state{repl = Repl1}};
|
|
|
+on_replication(?ERROR, Err, State) ->
|
|
|
+ Reason = epgsql_wire:decode_error(Err),
|
|
|
+ {stop, {error, Reason}, State};
|
|
|
+on_replication(M, Data, Sock) when M == ?NOTICE;
|
|
|
+ M == ?NOTIFICATION;
|
|
|
+ M == ?PARAMETER_STATUS ->
|
|
|
+ on_message(M, Data, Sock).
|
|
|
+
|
|
|
+
|
|
|
+handle_xlog_data(StartLSN, EndLSN, WALRecord, #repl{cbmodule = undefined,
|
|
|
+ receiver = Receiver} = Repl) ->
|
|
|
+ %% with async messages
|
|
|
Receiver ! {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}},
|
|
|
- {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = EndLSN}};
|
|
|
-
|
|
|
-%% CopyData for Replication mode. with callback method
|
|
|
-on_message({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64, _Timestamp:?int64, WALRecord/binary>>},
|
|
|
- #state{repl_cbmodule = CbModule, repl_cbstate = CbState, repl_receiver = undefined} = State) ->
|
|
|
+ Repl#repl{feedback_required = true,
|
|
|
+ last_received_lsn = EndLSN};
|
|
|
+handle_xlog_data(StartLSN, EndLSN, WALRecord,
|
|
|
+ #repl{cbmodule = CbModule, cbstate = CbState, receiver = undefined} = Repl) ->
|
|
|
+ %% with callback method
|
|
|
{ok, LastFlushedLSN, LastAppliedLSN, NewCbState} =
|
|
|
- CbModule:handle_x_log_data(StartLSN, EndLSN, WALRecord, CbState),
|
|
|
- {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = EndLSN,
|
|
|
- repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
|
|
|
- repl_cbstate = NewCbState}}.
|
|
|
+ epgsql:handle_x_log_data(CbModule, StartLSN, EndLSN, WALRecord, CbState),
|
|
|
+ Repl#repl{feedback_required = true,
|
|
|
+ last_received_lsn = EndLSN,
|
|
|
+ last_flushed_lsn = LastFlushedLSN,
|
|
|
+ last_applied_lsn = LastAppliedLSN,
|
|
|
+ cbstate = NewCbState}.
|