|
@@ -11,7 +11,6 @@
|
|
|
|
|
|
-export([init/1, handle_event/3, handle_sync_event/4]).
|
|
-export([init/1, handle_event/3, handle_sync_event/4]).
|
|
-export([handle_info/3, terminate/3, code_change/4]).
|
|
-export([handle_info/3, terminate/3, code_change/4]).
|
|
--export([read/3]).
|
|
|
|
|
|
|
|
-export([startup/3, auth/2, initializing/2, ready/2, ready/3]).
|
|
-export([startup/3, auth/2, initializing/2, ready/2, ready/3]).
|
|
-export([querying/2, parsing/2, binding/2, describing/2]).
|
|
-export([querying/2, parsing/2, binding/2, describing/2]).
|
|
@@ -93,7 +92,7 @@ handle_event(Event, _State_Name, State) ->
|
|
handle_sync_event(Event, _From, _State_Name, State) ->
|
|
handle_sync_event(Event, _From, _State_Name, State) ->
|
|
{stop, {unsupported_sync_event, Event}, State}.
|
|
{stop, {unsupported_sync_event, Event}, State}.
|
|
|
|
|
|
-handle_info({'EXIT', Pid, Reason}, _State_Name, State = #state{reader = Pid}) ->
|
|
|
|
|
|
+handle_info({'EXIT', Pid, Reason}, _State_Name, State = #state{sock = Pid}) ->
|
|
{stop, Reason, State};
|
|
{stop, Reason, State};
|
|
|
|
|
|
handle_info(Info, _State_Name, State) ->
|
|
handle_info(Info, _State_Name, State) ->
|
|
@@ -101,8 +100,7 @@ handle_info(Info, _State_Name, State) ->
|
|
|
|
|
|
terminate(_Reason, _State_Name, State = #state{sock = Sock})
|
|
terminate(_Reason, _State_Name, State = #state{sock = Sock})
|
|
when Sock =/= undefined ->
|
|
when Sock =/= undefined ->
|
|
- send(State, $X, []),
|
|
|
|
- gen_tcp:close(Sock);
|
|
|
|
|
|
+ send(State, $X, []);
|
|
|
|
|
|
terminate(_Reason, _State_Name, _State) ->
|
|
terminate(_Reason, _State_Name, _State) ->
|
|
ok.
|
|
ok.
|
|
@@ -113,25 +111,11 @@ code_change(_Old_Vsn, State_Name, State, _Extra) ->
|
|
%% -- states --
|
|
%% -- states --
|
|
|
|
|
|
startup({connect, Host, Username, Password, Opts}, From, State) ->
|
|
startup({connect, Host, Username, Password, Opts}, From, State) ->
|
|
- Port = proplists:get_value(port, Opts, 5432),
|
|
|
|
- Sock_Opts = [{active, false}, {packet, raw}, binary],
|
|
|
|
- case gen_tcp:connect(Host, Port, Sock_Opts) of
|
|
|
|
|
|
+ case pgsql_sock:start_link(self(), Host, Username, Opts) of
|
|
{ok, Sock} ->
|
|
{ok, Sock} ->
|
|
- Reader = spawn_link(?MODULE, read, [self(), Sock, <<>>]),
|
|
|
|
-
|
|
|
|
- Opts2 = ["user", 0, Username, 0],
|
|
|
|
- case proplists:get_value(database, Opts, undefined) of
|
|
|
|
- undefined -> Opts3 = Opts2;
|
|
|
|
- Database -> Opts3 = [Opts2 | ["database", 0, Database, 0]]
|
|
|
|
- end,
|
|
|
|
-
|
|
|
|
put(username, Username),
|
|
put(username, Username),
|
|
put(password, Password),
|
|
put(password, Password),
|
|
- State2 = State#state{reader = Reader,
|
|
|
|
- sock = Sock,
|
|
|
|
- reply_to = From},
|
|
|
|
- send(State2, [<<196608:32>>, Opts3, 0]),
|
|
|
|
-
|
|
|
|
|
|
+ State2 = State#state{sock = Sock, reply_to = From},
|
|
{next_state, auth, State2};
|
|
{next_state, auth, State2};
|
|
Error ->
|
|
Error ->
|
|
{stop, normal, Error, State}
|
|
{stop, normal, Error, State}
|
|
@@ -167,9 +151,8 @@ auth({$R, <<M:?int32, _/binary>>}, State) ->
|
|
{stop, normal, State};
|
|
{stop, normal, State};
|
|
|
|
|
|
%% ErrorResponse
|
|
%% ErrorResponse
|
|
-auth({$E, Bin}, State) ->
|
|
|
|
- Error = decode_error(Bin),
|
|
|
|
- case Error#error.code of
|
|
|
|
|
|
+auth({error, E}, State) ->
|
|
|
|
+ case E#error.code of
|
|
<<"28000">> -> Why = invalid_authorization_specification;
|
|
<<"28000">> -> Why = invalid_authorization_specification;
|
|
Any -> Why = Any
|
|
Any -> Why = Any
|
|
end,
|
|
end,
|
|
@@ -182,9 +165,8 @@ initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
|
|
{next_state, initializing, State2};
|
|
{next_state, initializing, State2};
|
|
|
|
|
|
%% ErrorResponse
|
|
%% ErrorResponse
|
|
-initializing({$E, Bin}, State) ->
|
|
|
|
- Error = decode_error(Bin),
|
|
|
|
- case Error#error.code of
|
|
|
|
|
|
+initializing({error, E}, State) ->
|
|
|
|
+ case E#error.code of
|
|
<<"28000">> -> Why = invalid_authorization_specification;
|
|
<<"28000">> -> Why = invalid_authorization_specification;
|
|
Any -> Why = Any
|
|
Any -> Why = Any
|
|
end,
|
|
end,
|
|
@@ -311,9 +293,8 @@ querying({$I, _Bin}, State) ->
|
|
{next_state, querying, State};
|
|
{next_state, querying, State};
|
|
|
|
|
|
%% ErrorResponse
|
|
%% ErrorResponse
|
|
-querying({$E, Bin}, State) ->
|
|
|
|
- Error = decode_error(Bin),
|
|
|
|
- notify(State, {error, Error}),
|
|
|
|
|
|
+querying({error, E}, State) ->
|
|
|
|
+ notify(State, {error, E}),
|
|
{next_state, querying, State};
|
|
{next_state, querying, State};
|
|
|
|
|
|
%% ReadyForQuery
|
|
%% ReadyForQuery
|
|
@@ -326,8 +307,8 @@ parsing({$1, <<>>}, State) ->
|
|
{next_state, describing, State};
|
|
{next_state, describing, State};
|
|
|
|
|
|
%% ErrorResponse
|
|
%% ErrorResponse
|
|
-parsing({$E, Bin}, State) ->
|
|
|
|
- Reply = {error, decode_error(Bin)},
|
|
|
|
|
|
+parsing({error, E}, State) ->
|
|
|
|
+ Reply = {error, E},
|
|
send(State, $S, []),
|
|
send(State, $S, []),
|
|
{next_state, parsing, State#state{reply = Reply}};
|
|
{next_state, parsing, State#state{reply = Reply}};
|
|
|
|
|
|
@@ -343,8 +324,8 @@ binding({$2, <<>>}, State) ->
|
|
{next_state, ready, State};
|
|
{next_state, ready, State};
|
|
|
|
|
|
%% ErrorResponse
|
|
%% ErrorResponse
|
|
-binding({$E, Bin}, State) ->
|
|
|
|
- Reply = {error, decode_error(Bin)},
|
|
|
|
|
|
+binding({error, E}, State) ->
|
|
|
|
+ Reply = {error, E},
|
|
send(State, $S, []),
|
|
send(State, $S, []),
|
|
{next_state, binding, State#state{reply = Reply}};
|
|
{next_state, binding, State#state{reply = Reply}};
|
|
|
|
|
|
@@ -375,8 +356,8 @@ describing({$n, <<>>}, State) ->
|
|
{next_state, ready, State};
|
|
{next_state, ready, State};
|
|
|
|
|
|
%% ErrorResponse
|
|
%% ErrorResponse
|
|
-describing({$E, Bin}, State) ->
|
|
|
|
- Reply = {error, decode_error(Bin)},
|
|
|
|
|
|
+describing({error, E}, State) ->
|
|
|
|
+ Reply = {error, E},
|
|
send(State, $S, []),
|
|
send(State, $S, []),
|
|
{next_state, describing, State#state{reply = Reply}};
|
|
{next_state, describing, State#state{reply = Reply}};
|
|
|
|
|
|
@@ -409,8 +390,8 @@ executing({$I, _Bin}, State) ->
|
|
{next_state, ready, State};
|
|
{next_state, ready, State};
|
|
|
|
|
|
%% ErrorResponse
|
|
%% ErrorResponse
|
|
-executing({$E, Bin}, State) ->
|
|
|
|
- notify(State, {error, decode_error(Bin)}),
|
|
|
|
|
|
+executing({error, E}, State) ->
|
|
|
|
+ notify(State, {error, E}),
|
|
{next_state, executing, State}.
|
|
{next_state, executing, State}.
|
|
|
|
|
|
%% CloseComplete
|
|
%% CloseComplete
|
|
@@ -419,14 +400,14 @@ closing({$3, <<>>}, State) ->
|
|
{next_state, ready, State};
|
|
{next_state, ready, State};
|
|
|
|
|
|
%% ErrorResponse
|
|
%% ErrorResponse
|
|
-closing({$E, Bin}, State) ->
|
|
|
|
- Error = {error, decode_error(Bin)},
|
|
|
|
|
|
+closing({error, E}, State) ->
|
|
|
|
+ Error = {error, E},
|
|
gen_fsm:reply(State#state.reply_to, Error),
|
|
gen_fsm:reply(State#state.reply_to, Error),
|
|
{next_state, ready, State}.
|
|
{next_state, ready, State}.
|
|
|
|
|
|
%% ErrorResponse
|
|
%% ErrorResponse
|
|
-synchronizing({$E, Bin}, State) ->
|
|
|
|
- Reply = {error, decode_error(Bin)},
|
|
|
|
|
|
+synchronizing({error, E}, State) ->
|
|
|
|
+ Reply = {error, E},
|
|
{next_state, synchronizing, State#state{reply = Reply}};
|
|
{next_state, synchronizing, State#state{reply = Reply}};
|
|
|
|
|
|
%% ReadyForQuery
|
|
%% ReadyForQuery
|
|
@@ -437,35 +418,6 @@ synchronizing({$Z, <<Status:8>>}, State) ->
|
|
|
|
|
|
%% -- internal functions --
|
|
%% -- internal functions --
|
|
|
|
|
|
-%% decode a single null-terminated string
|
|
|
|
-decode_string(Bin) ->
|
|
|
|
- decode_string(Bin, <<>>).
|
|
|
|
-
|
|
|
|
-decode_string(<<0, Rest/binary>>, Str) ->
|
|
|
|
- {Str, Rest};
|
|
|
|
-decode_string(<<C, Rest/binary>>, Str) ->
|
|
|
|
- decode_string(Rest, <<Str/binary, C>>).
|
|
|
|
-
|
|
|
|
-%% decode multiple null-terminated string
|
|
|
|
-decode_strings(Bin) ->
|
|
|
|
- decode_strings(Bin, []).
|
|
|
|
-
|
|
|
|
-decode_strings(<<>>, Acc) ->
|
|
|
|
- lists:reverse(Acc);
|
|
|
|
-decode_strings(Bin, Acc) ->
|
|
|
|
- {Str, Rest} = decode_string(Bin),
|
|
|
|
- decode_strings(Rest, [Str | Acc]).
|
|
|
|
-
|
|
|
|
-%% decode field
|
|
|
|
-decode_fields(Bin) ->
|
|
|
|
- decode_fields(Bin, []).
|
|
|
|
-
|
|
|
|
-decode_fields(<<0>>, Acc) ->
|
|
|
|
- Acc;
|
|
|
|
-decode_fields(<<Type:8, Rest/binary>>, Acc) ->
|
|
|
|
- {Str, Rest2} = decode_string(Rest),
|
|
|
|
- decode_fields(Rest2, [{Type, Str} | Acc]).
|
|
|
|
-
|
|
|
|
%% decode data
|
|
%% decode data
|
|
decode_data(Columns, Bin) ->
|
|
decode_data(Columns, Bin) ->
|
|
decode_data(Columns, Bin, []).
|
|
decode_data(Columns, Bin, []).
|
|
@@ -488,7 +440,7 @@ decode_columns(Count, Bin) ->
|
|
decode_columns(0, _Bin, Acc) ->
|
|
decode_columns(0, _Bin, Acc) ->
|
|
lists:reverse(Acc);
|
|
lists:reverse(Acc);
|
|
decode_columns(N, Bin, Acc) ->
|
|
decode_columns(N, Bin, Acc) ->
|
|
- {Name, Rest} = decode_string(Bin),
|
|
|
|
|
|
+ {Name, Rest} = pgsql_sock:decode_string(Bin),
|
|
<<_Table_Oid:?int32, _Attrib_Num:?int16, Type_Oid:?int32,
|
|
<<_Table_Oid:?int32, _Attrib_Num:?int16, Type_Oid:?int32,
|
|
Size:?int16, Modifier:?int32, Format:?int16, Rest2/binary>> = Rest,
|
|
Size:?int16, Modifier:?int32, Format:?int16, Rest2/binary>> = Rest,
|
|
Desc = #column{
|
|
Desc = #column{
|
|
@@ -504,36 +456,14 @@ decode_complete(<<"SELECT", 0>>) -> select;
|
|
decode_complete(<<"BEGIN", 0>>) -> 'begin';
|
|
decode_complete(<<"BEGIN", 0>>) -> 'begin';
|
|
decode_complete(<<"ROLLBACK", 0>>) -> rollback;
|
|
decode_complete(<<"ROLLBACK", 0>>) -> rollback;
|
|
decode_complete(Bin) ->
|
|
decode_complete(Bin) ->
|
|
- {Str, _} = decode_string(Bin),
|
|
|
|
|
|
+ {Str, _} = pgsql_sock:decode_string(Bin),
|
|
case string:tokens(binary_to_list(Str), " ") of
|
|
case string:tokens(binary_to_list(Str), " ") of
|
|
["INSERT", _Oid, Rows] -> {insert, list_to_integer(Rows)};
|
|
["INSERT", _Oid, Rows] -> {insert, list_to_integer(Rows)};
|
|
["UPDATE", Rows] -> {update, list_to_integer(Rows)};
|
|
["UPDATE", Rows] -> {update, list_to_integer(Rows)};
|
|
["DELETE", Rows] -> {delete, list_to_integer(Rows)};
|
|
["DELETE", Rows] -> {delete, list_to_integer(Rows)};
|
|
["MOVE", Rows] -> {move, list_to_integer(Rows)};
|
|
["MOVE", Rows] -> {move, list_to_integer(Rows)};
|
|
["FETCH", Rows] -> {fetch, list_to_integer(Rows)};
|
|
["FETCH", Rows] -> {fetch, list_to_integer(Rows)};
|
|
- [Type | _Rest] -> lower_atom(Type)
|
|
|
|
- end.
|
|
|
|
-
|
|
|
|
-%% decode ErrorResponse
|
|
|
|
-decode_error(Bin) ->
|
|
|
|
- Fields = decode_fields(Bin),
|
|
|
|
- Error = #error{
|
|
|
|
- severity = lower_atom(proplists:get_value($S, Fields)),
|
|
|
|
- code = proplists:get_value($C, Fields),
|
|
|
|
- message = proplists:get_value($M, Fields),
|
|
|
|
- extra = decode_error_extra(Fields)},
|
|
|
|
- Error.
|
|
|
|
-
|
|
|
|
-decode_error_extra(Fields) ->
|
|
|
|
- Types = [{$D, detail}, {$H, hint}, {$P, position}],
|
|
|
|
- decode_error_extra(Types, Fields, []).
|
|
|
|
-
|
|
|
|
-decode_error_extra([], _Fields, Extra) ->
|
|
|
|
- Extra;
|
|
|
|
-decode_error_extra([{Type, Name} | T], Fields, Extra) ->
|
|
|
|
- case proplists:get_value(Type, Fields) of
|
|
|
|
- undefined -> decode_error_extra(T, Fields, Extra);
|
|
|
|
- Value -> decode_error_extra(T, Fields, [{Name, Value} | Extra])
|
|
|
|
|
|
+ [Type | _Rest] -> pgsql_sock:lower_atom(Type)
|
|
end.
|
|
end.
|
|
|
|
|
|
%% encode types
|
|
%% encode types
|
|
@@ -599,11 +529,6 @@ encode_list(L) ->
|
|
notify(#state{reply_to = {Pid, _Tag}}, Msg) ->
|
|
notify(#state{reply_to = {Pid, _Tag}}, Msg) ->
|
|
Pid ! {pgsql, self(), Msg}.
|
|
Pid ! {pgsql, self(), Msg}.
|
|
|
|
|
|
-lower_atom(Str) when is_binary(Str) ->
|
|
|
|
- lower_atom(binary_to_list(Str));
|
|
|
|
-lower_atom(Str) when is_list(Str) ->
|
|
|
|
- list_to_atom(string:to_lower(Str)).
|
|
|
|
-
|
|
|
|
to_binary(B) when is_binary(B) -> B;
|
|
to_binary(B) when is_binary(B) -> B;
|
|
to_binary(L) when is_list(L) -> list_to_binary(L).
|
|
to_binary(L) when is_list(L) -> list_to_binary(L).
|
|
|
|
|
|
@@ -616,36 +541,4 @@ hex(Bin) ->
|
|
%% send data to server
|
|
%% send data to server
|
|
|
|
|
|
send(#state{sock = Sock}, Type, Data) ->
|
|
send(#state{sock = Sock}, Type, Data) ->
|
|
- Bin = iolist_to_binary(Data),
|
|
|
|
- gen_tcp:send(Sock, <<Type:8, (byte_size(Bin) + 4):?int32, Bin/binary>>).
|
|
|
|
-
|
|
|
|
-send(#state{sock = Sock}, Data) ->
|
|
|
|
- Bin = iolist_to_binary(Data),
|
|
|
|
- gen_tcp:send(Sock, <<(byte_size(Bin) + 4):?int32, Bin/binary>>).
|
|
|
|
-
|
|
|
|
-%% -- socket read loop --
|
|
|
|
-
|
|
|
|
-read(Fsm, Sock, Tail) ->
|
|
|
|
- case gen_tcp:recv(Sock, 0) of
|
|
|
|
- {ok, Bin} -> decode(Fsm, Sock, <<Tail/binary, Bin/binary>>);
|
|
|
|
- Error -> exit(Error)
|
|
|
|
- end.
|
|
|
|
-
|
|
|
|
-decode(Fsm, Sock, <<Type:8, Len:?int32, Rest/binary>> = Bin) ->
|
|
|
|
- Len2 = Len - 4,
|
|
|
|
- case Rest of
|
|
|
|
- <<Data:Len2/binary, Tail/binary>> when Type == $N ->
|
|
|
|
- gen_fsm:send_all_state_event(Fsm, {notice, decode_error(Data)}),
|
|
|
|
- decode(Fsm, Sock, Tail);
|
|
|
|
- <<Data:Len2/binary, Tail/binary>> when Type == $S ->
|
|
|
|
- [Name, Value] = decode_strings(Data),
|
|
|
|
- gen_fsm:send_all_state_event(Fsm, {parameter_status, Name, Value}),
|
|
|
|
- decode(Fsm, Sock, Tail);
|
|
|
|
- <<Data:Len2/binary, Tail/binary>> ->
|
|
|
|
- gen_fsm:send_event(Fsm, {Type, Data}),
|
|
|
|
- decode(Fsm, Sock, Tail);
|
|
|
|
- _Other ->
|
|
|
|
- ?MODULE:read(Fsm, Sock, Bin)
|
|
|
|
- end;
|
|
|
|
-decode(Fsm, Sock, Bin) ->
|
|
|
|
- ?MODULE:read(Fsm, Sock, Bin).
|
|
|
|
|
|
+ pgsql_sock:send(Sock, Type, Data).
|