Browse Source

Unify 'decode_message' handling

Сергей Прохоров 7 years ago
parent
commit
b136fb9e23
2 changed files with 44 additions and 45 deletions
  1. 43 39
      src/epgsql_sock.erl
  2. 1 6
      src/epgsql_wire.erl

+ 43 - 39
src/epgsql_sock.erl

@@ -18,7 +18,7 @@
 -export([init/1, code_change/3, terminate/2]).
 
 %% state callbacks
--export([auth/2, initializing/2, on_message/2, on_replication/2]).
+-export([auth/3, initializing/3, on_message/3, on_replication/3]).
 
 -include("epgsql.hrl").
 -include("epgsql_binary.hrl").
@@ -66,6 +66,7 @@
 -define(NOTIFICATION, $A).
 -define(COMMAND_COMPLETE, $C).
 -define(DATA_ROW, $D).
+-define(ERROR, $E).
 -define(EMPTY_QUERY, $I).
 -define(CANCELLATION_KEY, $K).
 -define(NO_DATA, $n).
@@ -521,8 +522,8 @@ do_send(Mod, Sock, Bin) ->
 
 loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
     case epgsql_wire:decode_message(Data) of
-        {Message, Tail} ->
-            case ?MODULE:Handler(Message, State#state{data = Tail}) of
+        {Type, Payload, Tail} ->
+            case ?MODULE:Handler(Type, Payload, State#state{data = Tail}) of
                 {noreply, State2} ->
                     loop(State2);
                 R = {stop, _Reason2, _State2} ->
@@ -672,22 +673,22 @@ hex(Bin) ->
 %% -- backend message handling --
 
 %% AuthenticationOk
-auth({?AUTHENTICATION_REQUEST, <<0:?int32>>}, State) ->
+auth(?AUTHENTICATION_REQUEST, <<0:?int32>>, State) ->
     {noreply, State#state{handler = initializing}};
 
 %% AuthenticationCleartextPassword
-auth({?AUTHENTICATION_REQUEST, <<3:?int32>>}, State) ->
+auth(?AUTHENTICATION_REQUEST, <<3:?int32>>, State) ->
     send(State, ?PASSWORD, [get(password), 0]),
     {noreply, State};
 
 %% AuthenticationMD5Password
-auth({?AUTHENTICATION_REQUEST, <<5:?int32, Salt:4/binary>>}, State) ->
+auth(?AUTHENTICATION_REQUEST, <<5:?int32, Salt:4/binary>>, State) ->
     Digest1 = hex(erlang:md5([get(password), get(username)])),
     Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
     send(State, ?PASSWORD, Str),
     {noreply, State};
 
-auth({?AUTHENTICATION_REQUEST, <<M:?int32, _/binary>>}, State) ->
+auth(?AUTHENTICATION_REQUEST, <<M:?int32, _/binary>>, State) ->
     Method = case M of
         2 -> kerberosV5;
         4 -> crypt;
@@ -700,7 +701,8 @@ auth({?AUTHENTICATION_REQUEST, <<M:?int32, _/binary>>}, State) ->
     {stop, normal, State2};
 
 %% ErrorResponse
-auth({error, E}, State) ->
+auth(?ERROR, Err, State) ->
+    E = epgsql_wire:decode_error(Err),
     Why = case E#error.code of
         <<"28000">> -> invalid_authorization_specification;
         <<"28P01">> -> invalid_password;
@@ -708,16 +710,15 @@ auth({error, E}, State) ->
     end,
     {stop, normal, finish(State, {error, Why})};
 
-auth(Other, State) ->
-    on_message(Other, State).
+auth(OtherType, OtherData, State) ->
+    on_message(OtherType, OtherData, State).
 
 %% BackendKeyData
-initializing({?CANCELLATION_KEY, <<Pid:?int32, Key:?int32>>}, State) ->
+initializing(?CANCELLATION_KEY, <<Pid:?int32, Key:?int32>>, State) ->
     {noreply, State#state{backend = {Pid, Key}}};
 
 %% ReadyForQuery
-initializing({?READY_FOR_QUERY, <<Status:8>>}, State) ->
-    #state{parameters = Parameters} = State,
+initializing(?READY_FOR_QUERY, <<Status:8>>, #state{parameters = Parameters} = State) ->
     erase(username),
     erase(password),
     %% TODO decode dates to now() format
@@ -731,24 +732,25 @@ initializing({?READY_FOR_QUERY, <<Status:8>>}, State) ->
                    connected),
     {noreply, State2};
 
-initializing({error, _} = Error, State) ->
-    {stop, normal, finish(State, Error)};
+initializing(?ERROR, Err, State) ->
+    E = epgsql_wire:decode_error(Err),
+    {stop, normal, finish(State, {error, E})};
 
-initializing(Other, State) ->
-    on_message(Other, State).
+initializing(OtherType, OtherData, State) ->
+    on_message(OtherType, OtherData, State).
 
 %% ParseComplete
-on_message({?PARSE_COMPLETE, <<>>}, State) ->
+on_message(?PARSE_COMPLETE, <<>>, State) ->
     {noreply, State};
 
 %% ParameterDescription
-on_message({?PARAMETER_DESCRIPTION, <<_Count:?int16, Bin/binary>>}, State) ->
+on_message(?PARAMETER_DESCRIPTION, <<_Count:?int16, Bin/binary>>, State) ->
     Types = [epgsql_binary:oid2type(Oid, State#state.codec) || <<Oid:?int32>> <= Bin],
     State2 = notify(State#state{types = Types}, {types, Types}),
     {noreply, State2};
 
 %% RowDescription
-on_message({?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>}, State) ->
+on_message(?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>, State) ->
     Columns = epgsql_wire:decode_columns(Count, Bin, State#state.codec),
     Columns2 =
         case command_tag(State) of
@@ -771,7 +773,7 @@ on_message({?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>}, State) ->
     {noreply, State3};
 
 %% NoData
-on_message({?NO_DATA, <<>>}, State) ->
+on_message(?NO_DATA, <<>>, State) ->
     State2 = case command_tag(State) of
                  C when C == parse; C == describe_statement ->
                      finish(State, no_data, {ok, make_statement(State)});
@@ -781,7 +783,7 @@ on_message({?NO_DATA, <<>>}, State) ->
     {noreply, State2};
 
 %% BindComplete
-on_message({?BIND_COMPLETE, <<>>}, State) ->
+on_message(?BIND_COMPLETE, <<>>, State) ->
     State2 = case command_tag(State) of
                  Command when Command == equery; Command == prepared_query ->
                      %% TODO send Describe as a part of equery, needs text format support
@@ -801,7 +803,7 @@ on_message({?BIND_COMPLETE, <<>>}, State) ->
     {noreply, State2};
 
 %% CloseComplete
-on_message({?CLOSE_COMPLETE, <<>>}, State) ->
+on_message(?CLOSE_COMPLETE, <<>>, State) ->
     State2 = case command_tag(State) of
                  Command when Command == equery; Command == prepared_query ->
                      State;
@@ -811,19 +813,19 @@ on_message({?CLOSE_COMPLETE, <<>>}, State) ->
     {noreply, State2};
 
 %% DataRow
-on_message({?DATA_ROW, <<_Count:?int16, Bin/binary>>}, State) ->
+on_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>, State) ->
     Data = epgsql_wire:decode_data(get_columns(State), Bin, State#state.codec),
     {noreply, add_row(State, Data)};
 
 %% PortalSuspended
-on_message({?PORTAL_SUSPENDED, <<>>}, State) ->
+on_message(?PORTAL_SUSPENDED, <<>>, State) ->
     State2 = finish(State,
                    suspended,
                    {partial, lists:reverse(State#state.rows)}),
     {noreply, State2};
 
 %% CommandComplete
-on_message({?COMMAND_COMPLETE, Bin}, State0) ->
+on_message(?COMMAND_COMPLETE, Bin, State0) ->
     Complete = epgsql_wire:decode_complete(Bin),
     State = State0#state{complete_status = Complete},
     Command = command_tag(State),
@@ -853,7 +855,7 @@ on_message({?COMMAND_COMPLETE, Bin}, State0) ->
     {noreply, State2};
 
 %% EmptyQueryResponse
-on_message({?EMPTY_QUERY, _Bin}, State) ->
+on_message(?EMPTY_QUERY, _Bin, State) ->
     Notice = {complete, empty},
     State2 = case command_tag(State) of
                  execute ->
@@ -864,7 +866,7 @@ on_message({?EMPTY_QUERY, _Bin}, State) ->
     {noreply, State2};
 
 %% ReadyForQuery
-on_message({?READY_FOR_QUERY, <<Status:8>>}, State) ->
+on_message(?READY_FOR_QUERY, <<Status:8>>, State) ->
     State2 = case command_tag(State) of
                  squery ->
                      case State#state.results of
@@ -887,7 +889,9 @@ on_message({?READY_FOR_QUERY, <<Status:8>>}, State) ->
              end,
     {noreply, State2#state{txstatus = Status}};
 
-on_message(Error = {error, Reason}, State) ->
+on_message(?ERROR, Err, State) ->
+    Reason = epgsql_wire:decode_error(Err),
+    Error = {error, Reason},
     case queue:is_empty(State#state.queue) of
         true ->
             {stop, {shutdown, Reason}, State};
@@ -902,19 +906,19 @@ on_message(Error = {error, Reason}, State) ->
     end;
 
 %% NoticeResponse
-on_message({?NOTICE, Data}, State) ->
+on_message(?NOTICE, Data, State) ->
     notify_async(State, {notice, epgsql_wire:decode_error(Data)}),
     {noreply, State};
 
 %% ParameterStatus
-on_message({?PARAMETER_STATUS, Data}, State) ->
+on_message(?PARAMETER_STATUS, Data, State) ->
     [Name, Value] = epgsql_wire:decode_strings(Data),
     Parameters2 = lists:keystore(Name, 1, State#state.parameters,
                                  {Name, Value}),
     {noreply, State#state{parameters = Parameters2}};
 
 %% NotificationResponse
-on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
+on_message(?NOTIFICATION, <<Pid:?int32, Strings/binary>>, State) ->
     {Channel1, Payload1} = case epgsql_wire:decode_strings(Strings) of
         [Channel, Payload] -> {Channel, Payload};
         [Channel]          -> {Channel, <<>>}
@@ -923,19 +927,19 @@ on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
     {noreply, State};
 
 %% CopyData for COPY command. COPY command not supported yet.
-on_message({?COPY_DATA, _Data}, State) ->
+on_message(?COPY_DATA, _Data, State) ->
     {stop, {error, copy_command_not_supported}, State}.
 
 
 %% CopyBothResponse
-on_replication({?COPY_BOTH_RESPONSE, _Data}, State) ->
+on_replication(?COPY_BOTH_RESPONSE, _Data, State) ->
     State2 = finish(State, ok),
     {noreply, State2};
 
 %% CopyData for Replication mode
-on_replication({?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>},
-    #state{repl = #repl{last_flushed_lsn = LastFlushedLSN,
-                        last_applied_lsn = LastAppliedLSN} = Repl} = State) ->
+on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
+               #state{repl = #repl{last_flushed_lsn = LastFlushedLSN,
+                                   last_applied_lsn = LastAppliedLSN} = Repl} = State) ->
     Repl1 =
         case ReplyRequired of
             1 ->
@@ -950,8 +954,8 @@ on_replication({?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timesta
     {noreply, State#state{repl = Repl1}};
 
 %% CopyData for Replication mode
-on_replication({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
-                              _Timestamp:?int64, WALRecord/binary>>},
+on_replication(?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
+                             _Timestamp:?int64, WALRecord/binary>>,
                #state{repl = Repl} = State) ->
     Repl1 = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
     {noreply, State#state{repl = Repl1}}.

+ 1 - 6
src/epgsql_wire.erl

@@ -24,12 +24,7 @@ decode_message(<<Type:8, Len:?int32, Rest/binary>> = Bin) ->
     Len2 = Len - 4,
     case Rest of
         <<Data:Len2/binary, Tail/binary>> ->
-            case Type of
-                $E ->
-                    {{error, decode_error(Data)}, Tail};
-                _ ->
-                    {{Type, Data}, Tail}
-            end;
+            {Type, Data, Tail};
         _Other ->
             Bin
     end;