Browse Source

Add streaming without chunking for HTTP/1.1

If content-length is set in the response headers
we can skip chunked transfer-encoding.
Eric Meadows-Jönsson 7 years ago
parent
commit
f08f4610a0
4 changed files with 140 additions and 38 deletions
  1. 3 1
      doc/src/manual/cowboy_req.stream_reply.asciidoc
  2. 62 37
      src/cowboy_http.erl
  3. 26 0
      test/handlers/resp_h.erl
  4. 49 0
      test/req_SUITE.erl

+ 3 - 1
doc/src/manual/cowboy_req.stream_reply.asciidoc

@@ -45,7 +45,9 @@ more efficiently.
 
 The streaming method varies depending on the protocol being
 used. HTTP/2 will use the usual DATA frames. HTTP/1.1 will
-use chunked transfer-encoding. HTTP/1.0 will send the body
+use chunked transfer-encoding, if the content-length
+response header is set the body will be sent without chunked
+chunked transfer-encoding. HTTP/1.0 will send the body
 unmodified and close the connection at the end if no
 content-length was set.
 

+ 62 - 37
src/cowboy_http.erl

@@ -78,6 +78,10 @@
 	version = undefined :: cowboy:http_version(),
 	%% Unparsed te header. Used to know if we can send trailers.
 	te :: undefined | binary(),
+	%% Expected body size.
+	local_expected_size = undefined :: undefined | non_neg_integer(),
+	%% Sent body size.
+	local_sent_size = 0 :: non_neg_integer(),
 	%% Commands queued.
 	queue = [] :: cowboy_stream:commands()
 }).
@@ -113,7 +117,7 @@
 	out_streamid = 1 :: pos_integer(),
 
 	%% Whether we finished writing data for the current stream.
-	out_state = wait :: wait | chunked | done,
+	out_state = wait :: wait | chunked | streaming | done,
 
 	%% The connection will be closed after this stream.
 	last_streamid = undefined :: pos_integer(),
@@ -924,22 +928,29 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea
 			Transport:send(Socket, [Response, Body]),
 			commands(State#state{out_state=done}, StreamID, Tail)
 	end;
-%% Send response headers and initiate chunked encoding.
-commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
-		[{headers, StatusCode, Headers0}|Tail]) ->
+%% Send response headers and initiate chunked encoding or streaming.
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
+		StreamID, [{headers, StatusCode, Headers0}|Tail]) ->
 	%% @todo Same as above (about the last stream in the list).
-	Stream = #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
-	{State1, Headers1} = case {cow_http:status_to_integer(StatusCode), Version} of
-		{204, 'HTTP/1.1'} ->
+	Stream = #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0),
+	Status = cow_http:status_to_integer(StatusCode),
+	ContentLength = maps:get(<<"content-length">>, Headers0, undefined),
+	{State1, Headers1} = case {Status, ContentLength, Version} of
+		{204, _, 'HTTP/1.1'} ->
 			{State0#state{out_state=done}, Headers0};
-		{_, 'HTTP/1.1'} ->
+		{_, undefined, 'HTTP/1.1'} ->
 			{State0#state{out_state=chunked}, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
-		%% Close the connection after streaming the data to HTTP/1.0 client.
-		%% @todo I'm guessing we need to differentiate responses with a content-length and others.
-		{_, 'HTTP/1.0'} ->
-			{State0#state{out_state=chunked, last_streamid=StreamID}, Headers0}
+		%% Close the connection after streaming without content-length to HTTP/1.0 client.
+		{_, undefined, 'HTTP/1.0'} ->
+			{State0#state{out_state=streaming, last_streamid=StreamID}, Headers0};
+		%% Stream the response body without chunked transfer-encoding.
+		_ ->
+			ExpectedSize = cow_http_hd:parse_content_length(ContentLength),
+			Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+				Stream#stream{local_expected_size=ExpectedSize}),
+			{State0#state{out_state=streaming, streams=Streams}, Headers0}
 	end,
-	Headers2 = case stream_te(Stream) of
+	Headers2 = case stream_te(OutState, Stream) of
 		trailers -> Headers1;
 		_ -> maps:remove(<<"trailer">>, Headers1)
 	end,
@@ -950,49 +961,60 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, Str
 %%
 %% @todo WINDOW_UPDATE stuff require us to buffer some data.
 %% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
-commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
-		[{data, IsFin, Data}|Tail]) ->
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
+		StreamID, [{data, IsFin, Data}|Tail]) ->
 	%% Do not send anything when the user asks to send an empty
 	%% data frame, as that would break the protocol.
 	Size = iolist_size(Data),
-	case Size of
+	Stream0 = lists:keyfind(StreamID, #stream.id, Streams0),
+	Stream = case Size of
 		0 ->
 			%% We send the last chunk only if version is HTTP/1.1 and IsFin=fin.
-			case lists:keyfind(StreamID, #stream.id, Streams) of
-				#stream{method= <<"HEAD">>} ->
+			case {OutState, Stream0} of
+				{_, #stream{method= <<"HEAD">>}} ->
 					ok;
-				#stream{version='HTTP/1.1'} when IsFin =:= fin ->
+				{chunked, _} when IsFin =:= fin ->
 					Transport:send(Socket, <<"0\r\n\r\n">>);
 				_ ->
 					ok
-			end;
+			end,
+			Stream0;
 		_ ->
 			%% @todo We need to kill the stream if it tries to send data before headers.
 			%% @todo Same as above.
-			case lists:keyfind(StreamID, #stream.id, Streams) of
-				#stream{method= <<"HEAD">>} ->
-					ok;
-				#stream{version='HTTP/1.1'} ->
+			case {OutState, Stream0} of
+				{_, #stream{method= <<"HEAD">>}} ->
+					Stream0;
+				{chunked, _} ->
 					Transport:send(Socket, [
 						integer_to_binary(Size, 16), <<"\r\n">>, Data,
 						case IsFin of
 							fin -> <<"\r\n0\r\n\r\n">>;
 							nofin -> <<"\r\n">>
 						end
-					]);
-				#stream{version='HTTP/1.0'} ->
-					Transport:send(Socket, Data)
+					]),
+					Stream0;
+				{streaming, #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize}} ->
+					SentSize = SentSize0 + Size,
+					if
+						%% undefined is > any integer value.
+						SentSize > ExpectedSize ->
+							terminate(State0, response_body_too_large);
+						true ->
+							Transport:send(Socket, Data),
+							Stream0#stream{local_sent_size=SentSize}
+					end
 			end
 	end,
 	State = case IsFin of
 		fin -> State0#state{out_state=done};
 		nofin -> State0
 	end,
-	commands(State, StreamID, Tail);
-%% Send trailers.
-commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
-		[{trailers, Trailers}|Tail]) ->
-	case stream_te(lists:keyfind(StreamID, #stream.id, Streams)) of
+	Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
+	commands(State#state{streams=Streams}, StreamID, Tail);
+commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
+		StreamID, [{trailers, Trailers}|Tail]) ->
+	case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
 		trailers ->
 			Transport:send(Socket, [
 				<<"0\r\n">>,
@@ -1008,6 +1030,7 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, Stre
 %% Send a file.
 commands(State0=#state{socket=Socket, transport=Transport}, StreamID,
 		[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
+	%% @todo exit with response_body_too_large if we exceed content-length
 	%% We wrap the sendfile call into a try/catch because on OTP-20
 	%% and earlier a few different crashes could occur for sockets
 	%% that were closing or closed. For example a badarg in
@@ -1112,7 +1135,8 @@ stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
 stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
 		out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
 		children=Children0}, StreamID, Reason) ->
-	#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0),
+	#stream{version=Version, local_expected_size=ExpectedSize, local_sent_size=SentSize}
+		= lists:keyfind(StreamID, #stream.id, Streams0),
 	State1 = #state{streams=Streams1} = case OutState of
 		wait when element(1, Reason) =:= internal_error ->
 			info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>});
@@ -1122,6 +1146,8 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
 			info(State0, StreamID, {response, 204, #{}, <<>>});
 		chunked when Version =:= 'HTTP/1.1' ->
 			info(State0, StreamID, {data, fin, <<>>});
+		streaming when ExpectedSize < SentSize ->
+			terminate(State0, response_body_too_small);
 		_ -> %% done or Version =:= 'HTTP/1.0'
 			State0
 	end,
@@ -1214,13 +1240,12 @@ connection_hd_is_close(Conn) ->
 	Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
 	lists:member(<<"close">>, Conns).
 
-%% HTTP/1.0 doesn't support chunked transfer-encoding.
-stream_te(#stream{version='HTTP/1.0'}) ->
+stream_te(streaming, _) ->
 	not_chunked;
 %% No TE header was sent.
-stream_te(#stream{te=undefined}) ->
+stream_te(_, #stream{te=undefined}) ->
 	no_trailers;
-stream_te(#stream{te=TE0}) ->
+stream_te(_, #stream{te=TE0}) ->
 	try cow_http_hd:parse_te(TE0) of
 		{TE1, _} -> TE1
 	catch _:_ ->

+ 26 - 0
test/handlers/resp_h.erl

@@ -220,6 +220,32 @@ do(<<"stream_body">>, Req0, Opts) ->
 			cowboy_req:stream_body(<<0:800000>>, fin, Req0),
 			{ok, Req0, Opts}
 	end;
+do(<<"stream_body_content_length">>, Req0, Opts) ->
+	case cowboy_req:binding(arg, Req0) of
+		<<"fin0">> ->
+			Req1 = cowboy_req:set_resp_header(<<"content-length">>, <<"12">>, Req0),
+			Req = cowboy_req:stream_reply(200, Req1),
+			cowboy_req:stream_body(<<"Hello world!">>, nofin, Req),
+			cowboy_req:stream_body(<<>>, fin, Req),
+			{ok, Req, Opts};
+		<<"multiple">> ->
+			Req1 = cowboy_req:set_resp_header(<<"content-length">>, <<"12">>, Req0),
+			Req = cowboy_req:stream_reply(200, Req1),
+			cowboy_req:stream_body(<<"Hello ">>, nofin, Req),
+			cowboy_req:stream_body(<<"world">>, nofin, Req),
+			cowboy_req:stream_body(<<"!">>, fin, Req),
+			{ok, Req, Opts};
+		<<"nofin">> ->
+			Req1 = cowboy_req:set_resp_header(<<"content-length">>, <<"12">>, Req0),
+			Req = cowboy_req:stream_reply(200, Req1),
+			cowboy_req:stream_body(<<"Hello world!">>, nofin, Req),
+			{ok, Req, Opts};
+		<<"nofin-error">> ->
+			Req1 = cowboy_req:set_resp_header(<<"content-length">>, <<"12">>, Req0),
+			Req = cowboy_req:stream_reply(200, Req1),
+			cowboy_req:stream_body(<<"Hello">>, nofin, Req),
+			{ok, Req, Opts}
+	end;
 do(<<"stream_trailers">>, Req0, Opts) ->
 	case cowboy_req:binding(arg, Req0) of
 		<<"large">> ->

+ 49 - 0
test/req_SUITE.erl

@@ -146,6 +146,23 @@ do_decode(Headers, Body) ->
 		_ -> Body
 	end.
 
+do_get_error(Path, Config) ->
+	do_get_error(Path, [], Config).
+
+do_get_error(Path, Headers, Config) ->
+	ConnPid = gun_open(Config),
+	Ref = gun:get(ConnPid, Path, [{<<"accept-encoding">>, <<"gzip">>}|Headers]),
+	{response, IsFin, Status, RespHeaders} = gun:await(ConnPid, Ref),
+	Result = case IsFin of
+		nofin -> gun:await_body(ConnPid, Ref);
+		fin -> {ok, <<>>}
+	end,
+	gun:close(ConnPid),
+	case Result of
+		{ok, RespBody} -> {Status, RespHeaders, do_decode(RespHeaders, RespBody)};
+		_ -> Result
+	end.
+
 %% Tests: Request.
 
 binding(Config) ->
@@ -856,6 +873,38 @@ stream_body_nofin(Config) ->
 	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/nofin", Config),
 	ok.
 
+stream_body_content_length_multiple(Config) ->
+	doc("Streamed body via multiple calls."),
+	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body_content_length/multiple", Config),
+	ok.
+
+stream_body_content_length_fin0(Config) ->
+	doc("Streamed body with last chunk of size 0."),
+	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body_content_length/fin0", Config),
+	ok.
+
+stream_body_content_length_nofin(Config) ->
+	doc("Unfinished streamed body."),
+	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body_content_length/nofin", Config),
+	ok.
+
+stream_body_content_length_nofin_error(Config) ->
+	doc("Not all of body sent."),
+	case config(protocol, Config) of
+		http ->
+			case do_get_error("/resp/stream_body_content_length/nofin-error", Config) of
+				{200, Headers, <<"Hello">>} ->
+					{_, <<"gzip">>} = lists:keyfind(<<"content-encoding">>, 1, Headers);
+				{error, {closed, "The connection was lost."}} ->
+					ok;
+				{error, timeout} ->
+					ok
+			end;
+		http2 ->
+			%% @todo HTTP2 should have the same content-length checks
+			ok
+	end.
+
 %% @todo Crash when calling stream_body after the fin flag has been set.
 %% @todo Crash when calling stream_body after calling reply.
 %% @todo Crash when calling stream_body before calling stream_reply.