Browse Source

Fix bugs related to HTTP/1.1 pipelining

The flow control is now only set to infinity when we are
skipping the request body of the stream that is being
terminated. This fixes a bug where it was set to infinity
while reading a subsequent request's body, leading to a
crash.

The timeout is no longer reset on stream termination.
Timeout handling is already done when receiving data
from the socket and doing a reset on stream termination
was leading to the wrong timeout being set or the right
timeout being reset needlessly.
Loïc Hoguin 5 years ago
parent
commit
752297b153
2 changed files with 21 additions and 19 deletions
  1. 18 17
      src/cowboy_http.erl
  2. 3 2
      test/rfc7230_SUITE.erl

+ 18 - 17
src/cowboy_http.erl

@@ -1266,6 +1266,7 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
 		children=Children0}, StreamID, Reason) ->
 	#stream{version=Version, local_expected_size=ExpectedSize, local_sent_size=SentSize}
 		= lists:keyfind(StreamID, #stream.id, Streams0),
+	%% Send a response or terminate chunks depending on the current output state.
 	State1 = #state{streams=Streams1} = case OutState of
 		wait when element(1, Reason) =:= internal_error ->
 			info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>});
@@ -1280,19 +1281,15 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
 		_ -> %% done or Version =:= 'HTTP/1.0'
 			State0
 	end,
-	%% Remove the stream from the state and reset the overriden options.
+	%% Stop the stream, shutdown children and reset overriden options.
 	{value, #stream{state=StreamState}, Streams}
 		= lists:keytake(StreamID, #stream.id, Streams1),
-	State2 = State1#state{streams=Streams, overriden_opts=#{}, flow=infinity},
-	%% Stop the stream.
-	stream_call_terminate(StreamID, Reason, StreamState, State2),
+	stream_call_terminate(StreamID, Reason, StreamState, State1),
 	Children = cowboy_children:shutdown(Children0, StreamID),
-	%% We reset the timeout if there are no active streams anymore.
-	State = set_timeout(State2#state{streams=Streams, children=Children}, request_timeout),
+	State = State1#state{overriden_opts=#{}, streams=Streams, children=Children},
 	%% We want to drop the connection if the body was not read fully
 	%% and we don't know its length or more remains to be read than
 	%% configuration allows.
-	%% @todo Only do this if Current =:= StreamID.
 	MaxSkipBodyLength = maps:get(max_skip_body_length, Opts, 1000000),
 	case InState of
 		#ps_body{length=undefined}
@@ -1301,17 +1298,21 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
 		#ps_body{length=Len, received=Received}
 				when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
 			terminate(State, skip_body_too_large);
+		#ps_body{} when InStreamID =:= OutStreamID ->
+			stream_next(State#state{flow=infinity});
 		_ ->
-			%% Move on to the next stream.
-			NextOutStreamID = OutStreamID + 1,
-			case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
-				false ->
-					State#state{out_streamid=NextOutStreamID, out_state=wait};
-				#stream{queue=Commands} ->
-					%% @todo Remove queue from the stream.
-					commands(State#state{out_streamid=NextOutStreamID, out_state=wait},
-						NextOutStreamID, Commands)
-			end
+			stream_next(State)
+	end.
+
+stream_next(State=#state{out_streamid=OutStreamID, streams=Streams}) ->
+	NextOutStreamID = OutStreamID + 1,
+	case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
+		false ->
+			State#state{out_streamid=NextOutStreamID, out_state=wait};
+		#stream{queue=Commands} ->
+			%% @todo Remove queue from the stream.
+			commands(State#state{out_streamid=NextOutStreamID, out_state=wait},
+				NextOutStreamID, Commands)
 	end.
 
 stream_call_terminate(StreamID, Reason, StreamState, #state{opts=Opts}) ->

+ 3 - 2
test/rfc7230_SUITE.erl

@@ -40,6 +40,7 @@ init_routes(_) -> [
 	{"localhost", [
 		{"/", hello_h, []},
 		{"/echo/:key[/:arg]", echo_h, []},
+		{"/full/:key[/:arg]", echo_h, []},
 		{"/length/echo/:key", echo_h, []},
 		{"/resp/:key[/:arg]", resp_h, []},
 		{"/send_message", send_message_h, []},
@@ -1553,13 +1554,13 @@ pipeline(Config) ->
 	ConnPid = gun_open(Config),
 	Refs = [{
 		gun:get(ConnPid, "/"),
-		gun:delete(ConnPid, "/echo/method")
+		gun:post(ConnPid, "/full/read_body", [], <<0:800000>>)
 	} || _ <- lists:seq(1, 25)],
 	_ = [begin
 		{response, nofin, 200, _} = gun:await(ConnPid, Ref1),
 		{ok, <<"Hello world!">>} = gun:await_body(ConnPid, Ref1),
 		{response, nofin, 200, _} = gun:await(ConnPid, Ref2),
-		{ok, <<"DELETE">>} = gun:await_body(ConnPid, Ref2)
+		{ok, <<0:800000>>} = gun:await_body(ConnPid, Ref2)
 	end || {Ref1, Ref2} <- Refs],
 	ok.