Browse Source

Fix a bug in HTTP/2 where connection gets closed prematurely

When the user code was sending a response fully without reading
the request body, the connection could get closed when receiving
DATA frames for that body. We now ask the client to stop sending
data via a NO_ERROR RST_STREAM, and linger any stream that has
been reset so that we can skip any pending frames from that
stream.

This fixes a number of intermittent failures in req_SUITE, which
now passes reliably.

In addition a small number of rfc7540_SUITE test cases have been
corrected as they were incorrect.
Loïc Hoguin 7 years ago
parent
commit
e7114577bb
2 changed files with 68 additions and 31 deletions
  1. 65 28
      src/cowboy_http2.erl
  2. 3 3
      test/rfc7540_SUITE.erl

+ 65 - 28
src/cowboy_http2.erl

@@ -98,6 +98,10 @@
 	%% by the client or by the server through PUSH_PROMISE frames.
 	%% by the client or by the server through PUSH_PROMISE frames.
 	streams = [] :: [stream()],
 	streams = [] :: [stream()],
 
 
+	%% HTTP/2 streams that have been reset recently. We are expected
+	%% to keep receiving additional frames after sending an RST_STREAM.
+	lingering_streams = [] :: [cowboy_stream:streamid()],
+
 	%% Streams can spawn zero or more children which are then managed
 	%% Streams can spawn zero or more children which are then managed
 	%% by this module if operating as a supervisor.
 	%% by this module if operating as a supervisor.
 	children = cowboy_children:init() :: cowboy_children:children(),
 	children = cowboy_children:init() :: cowboy_children:children(),
@@ -286,14 +290,12 @@ frame(State=#state{client_streamid=LastStreamID}, {data, StreamID, _, _})
 		when StreamID > LastStreamID ->
 		when StreamID > LastStreamID ->
 	terminate(State, {connection_error, protocol_error,
 	terminate(State, {connection_error, protocol_error,
 		'DATA frame received on a stream in idle state. (RFC7540 5.1)'});
 		'DATA frame received on a stream in idle state. (RFC7540 5.1)'});
-frame(State0=#state{remote_window=ConnWindow, streams=Streams},
+frame(State0=#state{remote_window=ConnWindow, streams=Streams, lingering_streams=Lingering},
 		{data, StreamID, IsFin, Data}) ->
 		{data, StreamID, IsFin, Data}) ->
 	DataLen = byte_size(Data),
 	DataLen = byte_size(Data),
 	State = State0#state{remote_window=ConnWindow - DataLen},
 	State = State0#state{remote_window=ConnWindow - DataLen},
 	case lists:keyfind(StreamID, #stream.id, Streams) of
 	case lists:keyfind(StreamID, #stream.id, Streams) of
 		Stream = #stream{state=flush, remote=nofin, remote_window=StreamWindow} ->
 		Stream = #stream{state=flush, remote=nofin, remote_window=StreamWindow} ->
-			%% @todo We need to cancel streams that we don't want to receive
-			%% the full body from after we finish flushing the response.
 			after_commands(State, Stream#stream{remote=IsFin, remote_window=StreamWindow - DataLen});
 			after_commands(State, Stream#stream{remote=IsFin, remote_window=StreamWindow - DataLen});
 		Stream = #stream{state=StreamState0, remote=nofin, remote_window=StreamWindow} ->
 		Stream = #stream{state=StreamState0, remote=nofin, remote_window=StreamWindow} ->
 			try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
 			try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
@@ -311,9 +313,17 @@ frame(State0=#state{remote_window=ConnWindow, streams=Streams},
 			stream_reset(State, StreamID, {stream_error, stream_closed,
 			stream_reset(State, StreamID, {stream_error, stream_closed,
 				'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)'});
 				'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)'});
 		false ->
 		false ->
-			%% @todo What about RST_STREAM? Sigh.
-			terminate(State, {connection_error, stream_closed,
-				'DATA frame received for a closed stream. (RFC7540 5.1)'})
+			%% After we send an RST_STREAM frame and terminate a stream,
+			%% the client still might be sending us some more frames
+			%% until it can process this RST_STREAM. We therefore ignore
+			%% DATA frames received for such lingering streams.
+			case lists:member(StreamID, Lingering) of
+				true ->
+					State0;
+				false ->
+					terminate(State, {connection_error, stream_closed,
+						'DATA frame received for a closed stream. (RFC7540 5.1)'})
+			end
 	end;
 	end;
 %% HEADERS frame with invalid even-numbered streamid.
 %% HEADERS frame with invalid even-numbered streamid.
 frame(State, {headers, StreamID, _, _, _}) when StreamID rem 2 =:= 0 ->
 frame(State, {headers, StreamID, _, _, _}) when StreamID rem 2 =:= 0 ->
@@ -574,9 +584,6 @@ status(Status) when is_integer(Status) ->
 status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0, U =< $9 ->
 status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0, U =< $9 ->
 	<< H, T, U >>.
 	<< H, T, U >>.
 
 
-
-
-
 %% @todo Should we ever want to implement the PRIORITY mechanism,
 %% @todo Should we ever want to implement the PRIORITY mechanism,
 %% this would be the place to do it. Right now, we just go over
 %% this would be the place to do it. Right now, we just go over
 %% all streams and send what we can until either everything is
 %% all streams and send what we can until either everything is
@@ -584,7 +591,6 @@ status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0,
 send_data(State=#state{streams=Streams}) ->
 send_data(State=#state{streams=Streams}) ->
 	resume_streams(State, Streams, []).
 	resume_streams(State, Streams, []).
 
 
-%% @todo When streams terminate we need to remove the stream.
 resume_streams(State, [], Acc) ->
 resume_streams(State, [], Acc) ->
 	State#state{streams=lists:reverse(Acc)};
 	State#state{streams=lists:reverse(Acc)};
 %% While technically we should never get < 0 here, let's be on the safe side.
 %% While technically we should never get < 0 here, let's be on the safe side.
@@ -593,8 +599,18 @@ resume_streams(State=#state{local_window=ConnWindow}, Streams, Acc)
 	State#state{streams=lists:reverse(Acc, Streams)};
 	State#state{streams=lists:reverse(Acc, Streams)};
 %% We rely on send_data/2 to do all the necessary checks about the stream.
 %% We rely on send_data/2 to do all the necessary checks about the stream.
 resume_streams(State0, [Stream0|Tail], Acc) ->
 resume_streams(State0, [Stream0|Tail], Acc) ->
-	{State, Stream} = send_data(State0, Stream0),
-	resume_streams(State, Tail, [Stream|Acc]).
+	{State1, Stream} = send_data(State0, Stream0),
+	case Stream of
+		%% We are done flushing, remove the stream.
+		%% Maybe skip the request body if it was not fully read.
+		#stream{state=flush, local=fin} ->
+			State = maybe_skip_body(State1, Stream, normal),
+			resume_streams(State, Tail, Acc);
+		%% Keep the stream. Either the stream handler is still running,
+		%% or we are not finished flushing.
+		_ ->
+			resume_streams(State1, Tail, [Stream|Acc])
+	end.
 
 
 %% @todo We might want to print an error if local=fin.
 %% @todo We might want to print an error if local=fin.
 %%
 %%
@@ -762,43 +778,54 @@ stream_handler_init(State=#state{opts=Opts,
 			'Unhandled exception in cowboy_stream:init/3.'})
 			'Unhandled exception in cowboy_stream:init/3.'})
 	end.
 	end.
 
 
-%% @todo We might need to keep track of which stream has been reset so we don't send lots of them.
-stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID,
-		StreamError={internal_error, _, _}) ->
-	Transport:send(Socket, cow_http2:rst_stream(StreamID, internal_error)),
-	stream_terminate(State, StreamID, StreamError);
-stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID,
-		StreamError={stream_error, Reason, _}) ->
+%% @todo Don't send an RST_STREAM if one was already sent.
+stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID, StreamError) ->
+	Reason = case StreamError of
+		{internal_error, _, _} -> internal_error;
+		{stream_error, Reason0, _} -> Reason0
+	end,
 	Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
 	Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
-	stream_terminate(State, StreamID, StreamError).
+	stream_terminate(stream_linger(State, StreamID), StreamID, StreamError).
 
 
-stream_terminate(State=#state{socket=Socket, transport=Transport,
+%% We only keep up to 100 streams in this state. @todo Make it configurable?
+stream_linger(State=#state{lingering_streams=Lingering0}, StreamID) ->
+	Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)],
+	State#state{lingering_streams=Lingering}.
+
+stream_terminate(State0=#state{socket=Socket, transport=Transport,
 		streams=Streams0, children=Children0}, StreamID, Reason) ->
 		streams=Streams0, children=Children0}, StreamID, Reason) ->
 	case lists:keytake(StreamID, #stream.id, Streams0) of
 	case lists:keytake(StreamID, #stream.id, Streams0) of
 		%% When the stream terminates normally (without sending RST_STREAM)
 		%% When the stream terminates normally (without sending RST_STREAM)
 		%% and no response was sent, we need to send a proper response back to the client.
 		%% and no response was sent, we need to send a proper response back to the client.
-		{value, #stream{local=idle}, Streams} when Reason =:= normal ->
-			State1 = #state{streams=Streams1} = info(State, StreamID, {response, 204, #{}, <<>>}),
+		{value, Stream=#stream{local=idle}, Streams} when Reason =:= normal ->
+			State1 = #state{streams=Streams1} = info(State0, StreamID, {response, 204, #{}, <<>>}),
+			State = maybe_skip_body(State1, Stream, Reason),
 			#stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams1),
 			#stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams1),
 			stream_call_terminate(StreamID, Reason, StreamState),
 			stream_call_terminate(StreamID, Reason, StreamState),
 			Children = cowboy_children:shutdown(Children0, StreamID),
 			Children = cowboy_children:shutdown(Children0, StreamID),
-			State1#state{streams=Streams, children=Children};
+			State#state{streams=Streams, children=Children};
 		%% When a response was sent but not terminated, we need to close the stream.
 		%% When a response was sent but not terminated, we need to close the stream.
-		{value, #stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams}
+		{value, Stream=#stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams}
 				when Reason =:= normal ->
 				when Reason =:= normal ->
 			Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
 			Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
+			State = maybe_skip_body(State0, Stream, Reason),
 			stream_call_terminate(StreamID, Reason, StreamState),
 			stream_call_terminate(StreamID, Reason, StreamState),
 			Children = cowboy_children:shutdown(Children0, StreamID),
 			Children = cowboy_children:shutdown(Children0, StreamID),
 			State#state{streams=Streams, children=Children};
 			State#state{streams=Streams, children=Children};
 		%% Unless there is still data in the buffer. We can however reset
 		%% Unless there is still data in the buffer. We can however reset
 		%% a few fields and set a special local state to avoid confusion.
 		%% a few fields and set a special local state to avoid confusion.
+		%%
+		%% We do not reset the stream in this case (to skip the body)
+		%% because we are still sending data via the buffer. We will
+		%% reset the stream if necessary once the buffer is empty.
 		{value, Stream=#stream{state=StreamState, local=nofin}, Streams} ->
 		{value, Stream=#stream{state=StreamState, local=nofin}, Streams} ->
 			stream_call_terminate(StreamID, Reason, StreamState),
 			stream_call_terminate(StreamID, Reason, StreamState),
 			Children = cowboy_children:shutdown(Children0, StreamID),
 			Children = cowboy_children:shutdown(Children0, StreamID),
-			State#state{streams=[Stream#stream{state=flush, local=flush}|Streams],
+			State0#state{streams=[Stream#stream{state=flush, local=flush}|Streams],
 				children=Children};
 				children=Children};
 		%% Otherwise we sent an RST_STREAM and/or the stream is already closed.
 		%% Otherwise we sent an RST_STREAM and/or the stream is already closed.
-		{value, #stream{state=StreamState}, Streams} ->
+		{value, Stream=#stream{state=StreamState}, Streams} ->
+			State = maybe_skip_body(State0, Stream, Reason),
 			stream_call_terminate(StreamID, Reason, StreamState),
 			stream_call_terminate(StreamID, Reason, StreamState),
 			Children = cowboy_children:shutdown(Children0, StreamID),
 			Children = cowboy_children:shutdown(Children0, StreamID),
 			State#state{streams=Streams, children=Children};
 			State#state{streams=Streams, children=Children};
@@ -807,9 +834,19 @@ stream_terminate(State=#state{socket=Socket, transport=Transport,
 		%% the cowboy_stream:init call failed, in which case doing nothing
 		%% the cowboy_stream:init call failed, in which case doing nothing
 		%% is correct.
 		%% is correct.
 		false ->
 		false ->
-			State
+			State0
 	end.
 	end.
 
 
+%% When the stream stops normally without reading the request
+%% body fully we need to tell the client to stop sending it.
+%% We do this by sending an RST_STREAM with reason NO_ERROR. (RFC7540 8.1.0)
+maybe_skip_body(State=#state{socket=Socket, transport=Transport},
+		#stream{id=StreamID, remote=nofin}, normal) ->
+	Transport:send(Socket, cow_http2:rst_stream(StreamID, no_error)),
+	stream_linger(State, StreamID);
+maybe_skip_body(State, _, _) ->
+	State.
+
 stream_call_terminate(StreamID, Reason, StreamState) ->
 stream_call_terminate(StreamID, Reason, StreamState) ->
 	try
 	try
 		cowboy_stream:terminate(StreamID, Reason, StreamState)
 		cowboy_stream:terminate(StreamID, Reason, StreamState)

+ 3 - 3
test/rfc7540_SUITE.erl

@@ -2032,7 +2032,7 @@ stream_closed_accept_priority(Config) ->
 		{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
 		{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
 		{<<":path">>, <<"/">>}
 		{<<":path">>, <<"/">>}
 	]),
 	]),
-	ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, HeadersBlock)),
+	ok = gen_tcp:send(Socket, cow_http2:headers(1, fin, HeadersBlock)),
 	%% Receive the response.
 	%% Receive the response.
 	{ok, << Length1:24, 1:8, _:40 >>} = gen_tcp:recv(Socket, 9, 6000),
 	{ok, << Length1:24, 1:8, _:40 >>} = gen_tcp:recv(Socket, 9, 6000),
 	{ok, _} = gen_tcp:recv(Socket, Length1, 6000),
 	{ok, _} = gen_tcp:recv(Socket, Length1, 6000),
@@ -2055,7 +2055,7 @@ stream_closed_accept_rst_stream(Config) ->
 		{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
 		{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
 		{<<":path">>, <<"/">>}
 		{<<":path">>, <<"/">>}
 	]),
 	]),
-	ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, HeadersBlock)),
+	ok = gen_tcp:send(Socket, cow_http2:headers(1, fin, HeadersBlock)),
 	%% Receive the response.
 	%% Receive the response.
 	{ok, << Length1:24, 1:8, _:40 >>} = gen_tcp:recv(Socket, 9, 6000),
 	{ok, << Length1:24, 1:8, _:40 >>} = gen_tcp:recv(Socket, 9, 6000),
 	{ok, _} = gen_tcp:recv(Socket, Length1, 6000),
 	{ok, _} = gen_tcp:recv(Socket, Length1, 6000),
@@ -2084,7 +2084,7 @@ stream_closed_accept_window_update(Config) ->
 		{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
 		{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
 		{<<":path">>, <<"/">>}
 		{<<":path">>, <<"/">>}
 	]),
 	]),
-	ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, HeadersBlock)),
+	ok = gen_tcp:send(Socket, cow_http2:headers(1, fin, HeadersBlock)),
 	%% Receive the response.
 	%% Receive the response.
 	{ok, << Length1:24, 1:8, _:40 >>} = gen_tcp:recv(Socket, 9, 6000),
 	{ok, << Length1:24, 1:8, _:40 >>} = gen_tcp:recv(Socket, 9, 6000),
 	{ok, _} = gen_tcp:recv(Socket, Length1, 6000),
 	{ok, _} = gen_tcp:recv(Socket, Length1, 6000),