|
@@ -19,6 +19,39 @@
|
|
-include("pgsql.hrl").
|
|
-include("pgsql.hrl").
|
|
-include("pgsql_binary.hrl").
|
|
-include("pgsql_binary.hrl").
|
|
|
|
|
|
|
|
+%% Commands defined as per this page:
|
|
|
|
+%% http://www.postgresql.org/docs/9.2/static/protocol-message-formats.html
|
|
|
|
+
|
|
|
|
+%% Commands
|
|
|
|
+-define(BIND, $B).
|
|
|
|
+-define(CLOSE, $C).
|
|
|
|
+-define(DESCRIBE, $D).
|
|
|
|
+-define(EXECUTE, $E).
|
|
|
|
+-define(FLUSH, $H).
|
|
|
|
+-define(PARSE, $P).
|
|
|
|
+-define(SIMPLEQUERY, $Q).
|
|
|
|
+-define(AUTHENTICATION_REQUEST, $R).
|
|
|
|
+-define(SYNC, $S).
|
|
|
|
+
|
|
|
|
+%% Parameters
|
|
|
|
+
|
|
|
|
+-define(PREPARED_STATEMENT, $S).
|
|
|
|
+-define(PORTAL, $P).
|
|
|
|
+
|
|
|
|
+%% Responses
|
|
|
|
+
|
|
|
|
+-define(PARSE_COMPLETE, $1).
|
|
|
|
+-define(BIND_COMPLETE, $2).
|
|
|
|
+-define(CLOSE_COMPLETE, $3).
|
|
|
|
+-define(COMMAND_COMPLETE, $C).
|
|
|
|
+-define(DATA_ROW, $D).
|
|
|
|
+-define(CANCELLATION_KEY, $K).
|
|
|
|
+-define(NO_DATA, $n).
|
|
|
|
+-define(PORTAL_SUSPENDED, $s).
|
|
|
|
+-define(PARAMETER_DESCRIPTION, $t).
|
|
|
|
+-define(ROW_DESCRIPTION, $T).
|
|
|
|
+-define(READY_FOR_QUERY, $Z).
|
|
|
|
+
|
|
-record(state, {mod,
|
|
-record(state, {mod,
|
|
sock,
|
|
sock,
|
|
data = <<>>,
|
|
data = <<>>,
|
|
@@ -146,7 +179,7 @@ command({connect, Host, Username, Password, Opts}, State) ->
|
|
async = Async}};
|
|
async = Async}};
|
|
|
|
|
|
command({squery, Sql}, State) ->
|
|
command({squery, Sql}, State) ->
|
|
- send(State, $Q, [Sql, 0]),
|
|
|
|
|
|
+ send(State, ?SIMPLEQUERY, [Sql, 0]),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
%% TODO add fast_equery command that doesn't need parsed statement,
|
|
%% TODO add fast_equery command that doesn't need parsed statement,
|
|
@@ -156,17 +189,17 @@ command({equery, Statement, Parameters}, State) ->
|
|
#statement{name = StatementName, columns = Columns} = Statement,
|
|
#statement{name = StatementName, columns = Columns} = Statement,
|
|
Bin1 = pgsql_wire:encode_parameters(Parameters),
|
|
Bin1 = pgsql_wire:encode_parameters(Parameters),
|
|
Bin2 = pgsql_wire:encode_formats(Columns),
|
|
Bin2 = pgsql_wire:encode_formats(Columns),
|
|
- send(State, $B, ["", 0, StatementName, 0, Bin1, Bin2]),
|
|
|
|
- send(State, $E, ["", 0, <<0:?int32>>]),
|
|
|
|
- send(State, $C, [$S, StatementName, 0]),
|
|
|
|
- send(State, $S, []),
|
|
|
|
|
|
+ send(State, ?BIND, ["", 0, StatementName, 0, Bin1, Bin2]),
|
|
|
|
+ send(State, ?EXECUTE, ["", 0, <<0:?int32>>]),
|
|
|
|
+ send(State, ?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]),
|
|
|
|
+ send(State, ?SYNC, []),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
command({parse, Name, Sql, Types}, State) ->
|
|
command({parse, Name, Sql, Types}, State) ->
|
|
Bin = pgsql_wire:encode_types(Types),
|
|
Bin = pgsql_wire:encode_types(Types),
|
|
- send(State, $P, [Name, 0, Sql, 0, Bin]),
|
|
|
|
- send(State, $D, [$S, Name, 0]),
|
|
|
|
- send(State, $H, []),
|
|
|
|
|
|
+ send(State, ?PARSE, [Name, 0, Sql, 0, Bin]),
|
|
|
|
+ send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
|
|
|
|
+ send(State, ?FLUSH, []),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
command({bind, Statement, PortalName, Parameters}, State) ->
|
|
command({bind, Statement, PortalName, Parameters}, State) ->
|
|
@@ -174,13 +207,13 @@ command({bind, Statement, PortalName, Parameters}, State) ->
|
|
Typed_Parameters = lists:zip(Types, Parameters),
|
|
Typed_Parameters = lists:zip(Types, Parameters),
|
|
Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
|
|
Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
|
|
Bin2 = pgsql_wire:encode_formats(Columns),
|
|
Bin2 = pgsql_wire:encode_formats(Columns),
|
|
- send(State, $B, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
|
|
|
|
- send(State, $H, []),
|
|
|
|
|
|
+ send(State, ?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
|
|
|
|
+ send(State, ?FLUSH, []),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
command({execute, _Statement, PortalName, MaxRows}, State) ->
|
|
command({execute, _Statement, PortalName, MaxRows}, State) ->
|
|
- send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
|
|
|
|
- send(State, $H, []),
|
|
|
|
|
|
+ send(State, ?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]),
|
|
|
|
+ send(State, ?FLUSH, []),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
command({execute_batch, Batch}, State) ->
|
|
command({execute_batch, Batch}, State) ->
|
|
@@ -194,36 +227,36 @@ command({execute_batch, Batch}, State) ->
|
|
Typed_Parameters = lists:zip(Types, Parameters),
|
|
Typed_Parameters = lists:zip(Types, Parameters),
|
|
Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
|
|
Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
|
|
Bin2 = pgsql_wire:encode_formats(Columns),
|
|
Bin2 = pgsql_wire:encode_formats(Columns),
|
|
- [pgsql_wire:encode($B, [0, StatementName, 0,
|
|
|
|
- Bin1, Bin2]),
|
|
|
|
- pgsql_wire:encode($E, [0, <<0:?int32>>])]
|
|
|
|
|
|
+ [pgsql_wire:encode(?BIND, [0, StatementName, 0,
|
|
|
|
+ Bin1, Bin2]),
|
|
|
|
+ pgsql_wire:encode(?EXECUTE, [0, <<0:?int32>>])]
|
|
end,
|
|
end,
|
|
Batch),
|
|
Batch),
|
|
- Sync = pgsql_wire:encode($S, []),
|
|
|
|
|
|
+ Sync = pgsql_wire:encode(?SYNC, []),
|
|
do_send(Mod, Sock, [BindExecute, Sync]),
|
|
do_send(Mod, Sock, [BindExecute, Sync]),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
command({describe_statement, Name}, State) ->
|
|
command({describe_statement, Name}, State) ->
|
|
- send(State, $D, [$S, Name, 0]),
|
|
|
|
- send(State, $H, []),
|
|
|
|
|
|
+ send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
|
|
|
|
+ send(State, ?FLUSH, []),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
command({describe_portal, Name}, State) ->
|
|
command({describe_portal, Name}, State) ->
|
|
- send(State, $D, [$P, Name, 0]),
|
|
|
|
- send(State, $H, []),
|
|
|
|
|
|
+ send(State, ?DESCRIBE, [?PORTAL, Name, 0]),
|
|
|
|
+ send(State, ?FLUSH, []),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
command({close, Type, Name}, State) ->
|
|
command({close, Type, Name}, State) ->
|
|
case Type of
|
|
case Type of
|
|
- statement -> Type2 = $S;
|
|
|
|
- portal -> Type2 = $P
|
|
|
|
|
|
+ statement -> Type2 = ?PREPARED_STATEMENT;
|
|
|
|
+ portal -> Type2 = ?PORTAL
|
|
end,
|
|
end,
|
|
- send(State, $C, [Type2, Name, 0]),
|
|
|
|
- send(State, $H, []),
|
|
|
|
|
|
+ send(State, ?CLOSE, [Type2, Name, 0]),
|
|
|
|
+ send(State, ?FLUSH, []),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
command(sync, State) ->
|
|
command(sync, State) ->
|
|
- send(State, $S, []),
|
|
|
|
|
|
+ send(State, ?SYNC, []),
|
|
{noreply, State#state{sync_required = false}}.
|
|
{noreply, State#state{sync_required = false}}.
|
|
|
|
|
|
start_ssl(S, Flag, Opts, State) ->
|
|
start_ssl(S, Flag, Opts, State) ->
|
|
@@ -406,22 +439,22 @@ hex(Bin) ->
|
|
%% -- backend message handling --
|
|
%% -- backend message handling --
|
|
|
|
|
|
%% AuthenticationOk
|
|
%% AuthenticationOk
|
|
-auth({$R, <<0:?int32>>}, State) ->
|
|
|
|
|
|
+auth({?AUTHENTICATION_REQUEST, <<0:?int32>>}, State) ->
|
|
{noreply, State#state{handler = initializing}};
|
|
{noreply, State#state{handler = initializing}};
|
|
|
|
|
|
%% AuthenticationCleartextPassword
|
|
%% AuthenticationCleartextPassword
|
|
-auth({$R, <<3:?int32>>}, State) ->
|
|
|
|
|
|
+auth({?AUTHENTICATION_REQUEST, <<3:?int32>>}, State) ->
|
|
send(State, $p, [get(password), 0]),
|
|
send(State, $p, [get(password), 0]),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
%% AuthenticationMD5Password
|
|
%% AuthenticationMD5Password
|
|
-auth({$R, <<5:?int32, Salt:4/binary>>}, State) ->
|
|
|
|
|
|
+auth({?AUTHENTICATION_REQUEST, <<5:?int32, Salt:4/binary>>}, State) ->
|
|
Digest1 = hex(erlang:md5([get(password), get(username)])),
|
|
Digest1 = hex(erlang:md5([get(password), get(username)])),
|
|
Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
|
|
Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
|
|
send(State, $p, Str),
|
|
send(State, $p, Str),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
-auth({$R, <<M:?int32, _/binary>>}, State) ->
|
|
|
|
|
|
+auth({?AUTHENTICATION_REQUEST, <<M:?int32, _/binary>>}, State) ->
|
|
case M of
|
|
case M of
|
|
2 -> Method = kerberosV5;
|
|
2 -> Method = kerberosV5;
|
|
4 -> Method = crypt;
|
|
4 -> Method = crypt;
|
|
@@ -446,11 +479,11 @@ auth(Other, State) ->
|
|
on_message(Other, State).
|
|
on_message(Other, State).
|
|
|
|
|
|
%% BackendKeyData
|
|
%% BackendKeyData
|
|
-initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
|
|
|
|
|
|
+initializing({?CANCELLATION_KEY, <<Pid:?int32, Key:?int32>>}, State) ->
|
|
{noreply, State#state{backend = {Pid, Key}}};
|
|
{noreply, State#state{backend = {Pid, Key}}};
|
|
|
|
|
|
%% ReadyForQuery
|
|
%% ReadyForQuery
|
|
-initializing({$Z, <<Status:8>>}, State) ->
|
|
|
|
|
|
+initializing({?READY_FOR_QUERY, <<Status:8>>}, State) ->
|
|
#state{parameters = Parameters} = State,
|
|
#state{parameters = Parameters} = State,
|
|
erase(username),
|
|
erase(username),
|
|
erase(password),
|
|
erase(password),
|
|
@@ -471,17 +504,17 @@ initializing(Other, State) ->
|
|
on_message(Other, State).
|
|
on_message(Other, State).
|
|
|
|
|
|
%% ParseComplete
|
|
%% ParseComplete
|
|
-on_message({$1, <<>>}, State) ->
|
|
|
|
|
|
+on_message({?PARSE_COMPLETE, <<>>}, State) ->
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
%% ParameterDescription
|
|
%% ParameterDescription
|
|
-on_message({$t, <<_Count:?int16, Bin/binary>>}, State) ->
|
|
|
|
|
|
+on_message({?PARAMETER_DESCRIPTION, <<_Count:?int16, Bin/binary>>}, State) ->
|
|
Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
|
|
Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
|
|
State2 = notify(State#state{types = Types}, {types, Types}),
|
|
State2 = notify(State#state{types = Types}, {types, Types}),
|
|
{noreply, State2};
|
|
{noreply, State2};
|
|
|
|
|
|
%% RowDescription
|
|
%% RowDescription
|
|
-on_message({$T, <<Count:?int16, Bin/binary>>}, State) ->
|
|
|
|
|
|
+on_message({?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>}, State) ->
|
|
Columns = pgsql_wire:decode_columns(Count, Bin),
|
|
Columns = pgsql_wire:decode_columns(Count, Bin),
|
|
Columns2 =
|
|
Columns2 =
|
|
case command_tag(State) of
|
|
case command_tag(State) of
|
|
@@ -504,7 +537,7 @@ on_message({$T, <<Count:?int16, Bin/binary>>}, State) ->
|
|
{noreply, State3};
|
|
{noreply, State3};
|
|
|
|
|
|
%% NoData
|
|
%% NoData
|
|
-on_message({$n, <<>>}, State) ->
|
|
|
|
|
|
+on_message({?NO_DATA, <<>>}, State) ->
|
|
State2 = case command_tag(State) of
|
|
State2 = case command_tag(State) of
|
|
C when C == parse; C == describe_statement ->
|
|
C when C == parse; C == describe_statement ->
|
|
finish(State, no_data, {ok, make_statement(State)});
|
|
finish(State, no_data, {ok, make_statement(State)});
|
|
@@ -514,7 +547,7 @@ on_message({$n, <<>>}, State) ->
|
|
{noreply, State2};
|
|
{noreply, State2};
|
|
|
|
|
|
%% BindComplete
|
|
%% BindComplete
|
|
-on_message({$2, <<>>}, State) ->
|
|
|
|
|
|
+on_message({?BIND_COMPLETE, <<>>}, State) ->
|
|
State2 = case command_tag(State) of
|
|
State2 = case command_tag(State) of
|
|
equery ->
|
|
equery ->
|
|
%% TODO send Describe as a part of equery, needs text format support
|
|
%% TODO send Describe as a part of equery, needs text format support
|
|
@@ -534,7 +567,7 @@ on_message({$2, <<>>}, State) ->
|
|
{noreply, State2};
|
|
{noreply, State2};
|
|
|
|
|
|
%% CloseComplete
|
|
%% CloseComplete
|
|
-on_message({$3, <<>>}, State) ->
|
|
|
|
|
|
+on_message({?CLOSE_COMPLETE, <<>>}, State) ->
|
|
State2 = case command_tag(State) of
|
|
State2 = case command_tag(State) of
|
|
equery ->
|
|
equery ->
|
|
State;
|
|
State;
|
|
@@ -544,19 +577,19 @@ on_message({$3, <<>>}, State) ->
|
|
{noreply, State2};
|
|
{noreply, State2};
|
|
|
|
|
|
%% DataRow
|
|
%% DataRow
|
|
-on_message({$D, <<_Count:?int16, Bin/binary>>}, State) ->
|
|
|
|
|
|
+on_message({?DATA_ROW, <<_Count:?int16, Bin/binary>>}, State) ->
|
|
Data = pgsql_wire:decode_data(get_columns(State), Bin),
|
|
Data = pgsql_wire:decode_data(get_columns(State), Bin),
|
|
{noreply, add_row(State, Data)};
|
|
{noreply, add_row(State, Data)};
|
|
|
|
|
|
%% PortalSuspended
|
|
%% PortalSuspended
|
|
-on_message({$s, <<>>}, State) ->
|
|
|
|
|
|
+on_message({?PORTAL_SUSPENDED, <<>>}, State) ->
|
|
State2 = finish(State,
|
|
State2 = finish(State,
|
|
suspended,
|
|
suspended,
|
|
{partial, lists:reverse(State#state.rows)}),
|
|
{partial, lists:reverse(State#state.rows)}),
|
|
{noreply, State2};
|
|
{noreply, State2};
|
|
|
|
|
|
%% CommandComplete
|
|
%% CommandComplete
|
|
-on_message({$C, Bin}, State) ->
|
|
|
|
|
|
+on_message({?COMMAND_COMPLETE, Bin}, State) ->
|
|
Complete = pgsql_wire:decode_complete(Bin),
|
|
Complete = pgsql_wire:decode_complete(Bin),
|
|
Command = command_tag(State),
|
|
Command = command_tag(State),
|
|
Notice = {complete, Complete},
|
|
Notice = {complete, Complete},
|