Browse Source

Add sendfile support to cowboy_req:stream_body

It is now possible to stream one or more sendfile tuples.
A simple example of what can now be done would be for
example to build a tar file on the fly using the sendfile
syscall for sending the files, or to support Range requests
with more than one range with the sendfile syscall.

When using cowboy_compress_h unfortunately we have to read
the file in order to send it. More options will be added
at a later time to make sure users don't read too much
into memory. This is a new feature however so existing
code is not affected.

Also rework cowboy_http's data sending to be flatter.
Loïc Hoguin 6 years ago
parent
commit
d7b7580b39

+ 2 - 1
doc/src/manual/cowboy_req.stream_body.asciidoc

@@ -10,7 +10,7 @@ cowboy_req:stream_body - Stream the response body
 ----
 stream_body(Data, IsFin, Req :: cowboy_req:req()) -> ok
 
-Data  :: iodata()
+Data  :: cowboy_req:resp_body()
 IsFin :: fin | nofin
 ----
 
@@ -58,6 +58,7 @@ The atom `ok` is always returned. It can be safely ignored.
 
 == Changelog
 
+* *2.6*: The `Data` argument can now be a sendfile tuple.
 * *2.0*: Function introduced. Replaces `chunk/2`.
 
 == Examples

+ 2 - 1
doc/src/manual/cowboy_stream.asciidoc

@@ -133,7 +133,7 @@ Send data to the client.
 
 [source,erlang]
 ----
-{data, fin(), iodata()}
+{data, fin(), cowboy_req:resp_body()}
 ----
 
 [[trailers_command]]
@@ -420,6 +420,7 @@ tuple.
 
 == Changelog
 
+* *2.6*: The data command can now contain a sendfile tuple.
 * *2.6*: The `{stop, {exit, any()}, HumanReadable}` terminate reason was added.
 * *2.2*: The trailers command was introduced.
 * *2.0*: Module introduced.

+ 28 - 1
src/cowboy_compress_h.erl

@@ -98,7 +98,7 @@ fold(Commands, State) ->
 
 fold([], State, Acc) ->
 	{lists:reverse(Acc), State};
-%% We do not compress sendfile bodies.
+%% We do not compress full sendfile bodies.
 fold([Response={response, _, _, {sendfile, _, _, _}}|Tail], State, Acc) ->
 	fold(Tail, State, [Response|Acc]);
 %% We compress full responses directly, unless they are lower than
@@ -171,6 +171,21 @@ gzip_headers({headers, Status, Headers0}, State) ->
 		<<"content-encoding">> => <<"gzip">>
 	}}, State#state{deflate=Z}}.
 
+%% It is not possible to combine zlib and the sendfile
+%% syscall as far as I can tell, because the zlib format
+%% includes a checksum at the end of the stream. We have
+%% to read the file in memory, making this not suitable for
+%% large files.
+gzip_data({data, nofin, Sendfile={sendfile, _, _, _}}, State=#state{deflate=Z}) ->
+	{ok, Data0} = read_file(Sendfile),
+	Data = zlib:deflate(Z, Data0),
+	{{data, nofin, Data}, State};
+gzip_data({data, fin, Sendfile={sendfile, _, _, _}}, State=#state{deflate=Z}) ->
+	{ok, Data0} = read_file(Sendfile),
+	Data = zlib:deflate(Z, Data0, finish),
+	zlib:deflateEnd(Z),
+	zlib:close(Z),
+	{{data, fin, Data}, State#state{deflate=undefined}};
 gzip_data({data, nofin, Data0}, State=#state{deflate=Z}) ->
 	Data = zlib:deflate(Z, Data0),
 	{{data, nofin, Data}, State};
@@ -179,3 +194,15 @@ gzip_data({data, fin, Data0}, State=#state{deflate=Z}) ->
 	zlib:deflateEnd(Z),
 	zlib:close(Z),
 	{{data, fin, Data}, State#state{deflate=undefined}}.
+
+read_file({sendfile, Offset, Bytes, Path}) ->
+	{ok, IoDevice} = file:open(Path, [read, raw, binary]),
+	try
+		_ = case Offset of
+			0 -> ok;
+			_ -> file:position(IoDevice, {bof, Offset})
+		end,
+		file:read(IoDevice, Bytes)
+	after
+		file:close(IoDevice)
+	end.

+ 79 - 80
src/cowboy_http.erl

@@ -940,17 +940,18 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea
 	%% @todo I'm pretty sure the last stream in the list is the one we want
 	%% considering all others are queued.
 	#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
-	{State, Headers} = connection(State0, Headers0, StreamID, Version),
+	{State1, Headers} = connection(State0, Headers0, StreamID, Version),
+	State = State1#state{out_state=done},
 	%% @todo Ensure content-length is set.
 	Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
 	case Body of
-		{sendfile, O, B, P} ->
+		{sendfile, _, _, _} ->
 			Transport:send(Socket, Response),
-			commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]);
+			sendfile(State, Body);
 		_ ->
-			Transport:send(Socket, [Response, Body]),
-			commands(State#state{out_state=done}, StreamID, Tail)
-	end;
+			Transport:send(Socket, [Response, Body])
+	end,
+	commands(State, StreamID, Tail);
 %% Send response headers and initiate chunked encoding or streaming.
 commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
 		StreamID, [{headers, StatusCode, Headers0}|Tail]) ->
@@ -981,53 +982,57 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out
 	Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
 	commands(State, StreamID, Tail);
 %% Send a response body chunk.
-%%
-%% @todo WINDOW_UPDATE stuff require us to buffer some data.
-%% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
+%% @todo We need to kill the stream if it tries to send data before headers.
 commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
 		StreamID, [{data, IsFin, Data}|Tail]) ->
 	%% Do not send anything when the user asks to send an empty
 	%% data frame, as that would break the protocol.
-	Size = iolist_size(Data),
-	Stream0 = lists:keyfind(StreamID, #stream.id, Streams0),
-	Stream = case Size of
-		0 ->
-			%% We send the last chunk only if version is HTTP/1.1 and IsFin=fin.
-			case {OutState, Stream0} of
-				{_, #stream{method= <<"HEAD">>}} ->
-					ok;
-				{chunked, _} when IsFin =:= fin ->
-					Transport:send(Socket, <<"0\r\n\r\n">>);
-				_ ->
-					ok
-			end,
+	Size = case Data of
+		{sendfile, _, B, _} -> B;
+		_ -> iolist_size(Data)
+	end,
+	%% Depending on the current state we may need to send nothing,
+	%% the last chunk, chunked data with/without the last chunk,
+	%% or just the data as-is.
+	Stream = case lists:keyfind(StreamID, #stream.id, Streams0) of
+		Stream0=#stream{method= <<"HEAD">>} ->
 			Stream0;
-		_ ->
-			%% @todo We need to kill the stream if it tries to send data before headers.
-			%% @todo Same as above.
-			case {OutState, Stream0} of
-				{_, #stream{method= <<"HEAD">>}} ->
-					Stream0;
-				{chunked, _} ->
-					Transport:send(Socket, [
-						integer_to_binary(Size, 16), <<"\r\n">>, Data,
-						case IsFin of
-							fin -> <<"\r\n0\r\n\r\n">>;
-							nofin -> <<"\r\n">>
-						end
-					]),
-					Stream0;
-				{streaming, #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize}} ->
-					SentSize = SentSize0 + Size,
-					if
-						%% undefined is > any integer value.
-						SentSize > ExpectedSize ->
-							terminate(State0, response_body_too_large);
-						true ->
-							Transport:send(Socket, Data),
-							Stream0#stream{local_sent_size=SentSize}
-					end
-			end
+		Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked ->
+			Transport:send(Socket, <<"0\r\n\r\n">>),
+			Stream0;
+		Stream0 when Size =:= 0 ->
+			Stream0;
+		Stream0 when is_tuple(Data), OutState =:= chunked ->
+			Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]),
+			sendfile(State0, Data),
+			Transport:send(Socket,
+				case IsFin of
+					fin -> <<"\r\n0\r\n\r\n">>;
+					nofin -> <<"\r\n">>
+				end),
+			Stream0;
+		Stream0 when OutState =:= chunked ->
+			Transport:send(Socket, [
+				integer_to_binary(Size, 16), <<"\r\n">>, Data,
+				case IsFin of
+					fin -> <<"\r\n0\r\n\r\n">>;
+					nofin -> <<"\r\n">>
+				end
+			]),
+			Stream0;
+		Stream0 when OutState =:= streaming ->
+			#stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0,
+			SentSize = SentSize0 + Size,
+			if
+				%% ExpectedSize may be undefined, which is > any integer value.
+				SentSize > ExpectedSize ->
+					terminate(State0, response_body_too_large);
+				is_tuple(Data) ->
+					sendfile(State0, Data);
+				true ->
+					Transport:send(Socket, Data)
+			end,
+			Stream0#stream{local_sent_size=SentSize}
 	end,
 	State = case IsFin of
 		fin -> State0#state{out_state=done};
@@ -1050,38 +1055,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s
 			ok
 	end,
 	commands(State#state{out_state=done}, StreamID, Tail);
-%% Send a file.
-commands(State0=#state{socket=Socket, transport=Transport, opts=Opts}, StreamID,
-		[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
-	%% @todo exit with response_body_too_large if we exceed content-length
-	%% We wrap the sendfile call into a try/catch because on OTP-20
-	%% and earlier a few different crashes could occur for sockets
-	%% that were closing or closed. For example a badarg in
-	%% erlang:port_get_data(#Port<...>) or a badmatch like
-	%% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
-	%%
-	%% OTP-21 uses a NIF instead of a port so the implementation
-	%% and behavior has dramatically changed and it is unclear
-	%% whether it will be necessary in the future.
-	%%
-	%% This try/catch prevents some noisy logs to be written
-	%% when these errors occur.
-	try
-		%% When sendfile is disabled we explicitly use the fallback.
-		_ = case maps:get(sendfile, Opts, true) of
-			true -> Transport:sendfile(Socket, Path, Offset, Bytes);
-			false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
-		end,
-		State = case IsFin of
-			fin -> State0#state{out_state=done}
-%% @todo Add the sendfile command.
-%			nofin -> State0
-		end,
-		commands(State, StreamID, Tail)
-	catch _:_ ->
-		terminate(State0, {socket_error, sendfile_crash,
-			'An error occurred when using the sendfile function.'})
-	end;
 %% Protocol takeover.
 commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
 		out_state=OutState, opts=Opts, children=Children}, StreamID,
@@ -1136,6 +1109,32 @@ headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
 headers_to_list(Headers) ->
 	maps:to_list(Headers).
 
+%% We wrap the sendfile call into a try/catch because on OTP-20
+%% and earlier a few different crashes could occur for sockets
+%% that were closing or closed. For example a badarg in
+%% erlang:port_get_data(#Port<...>) or a badmatch like
+%% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
+%%
+%% OTP-21 uses a NIF instead of a port so the implementation
+%% and behavior has dramatically changed and it is unclear
+%% whether it will be necessary in the future.
+%%
+%% This try/catch prevents some noisy logs to be written
+%% when these errors occur.
+sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts},
+		{sendfile, Offset, Bytes, Path}) ->
+	try
+		%% When sendfile is disabled we explicitly use the fallback.
+		_ = case maps:get(sendfile, Opts, true) of
+			true -> Transport:sendfile(Socket, Path, Offset, Bytes);
+			false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
+		end,
+		ok
+	catch _:_ ->
+		terminate(State, {socket_error, sendfile_crash,
+			'An error occurred when using the sendfile function.'})
+	end.
+
 %% Flush messages specific to cowboy_http before handing over the
 %% connection to another protocol.
 flush(Parent) ->

+ 0 - 6
src/cowboy_http2.erl

@@ -510,12 +510,6 @@ commands(State0, StreamID, [{data, IsFin, Data}|Tail]) ->
 commands(State0, StreamID, [{trailers, Trailers}|Tail]) ->
 	State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}),
 	commands(State, StreamID, Tail);
-%% Send a file.
-%% @todo Add the sendfile command.
-%commands(State0, Stream0=#stream{local=nofin},
-%		[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
-%	{State, Stream} = maybe_send_data(State0, Stream0, IsFin, {sendfile, Offset, Bytes, Path}),
-%	commands(State, Stream, Tail);
 %% Send a push promise.
 %%
 %% @todo Responses sent as a result of a push_promise request

+ 2 - 0
src/cowboy_metrics_h.erl

@@ -243,6 +243,8 @@ fold([{headers, Status, Headers}|Tail],
 		end,
 		resp_start=RespStart
 	});
+%% @todo It might be worthwhile to keep the sendfile information around,
+%% especially if these frames ultimately result in a sendfile syscall.
 fold([{data, nofin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) ->
 	fold(Tail, State#state{
 		resp_body_length=RespBodyLen + resp_body_length(Data)

+ 18 - 4
src/cowboy_req.erl

@@ -808,21 +808,35 @@ stream_reply(Status, Headers=#{}, Req=#{pid := Pid, streamid := StreamID})
 	Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}},
 	done_replying(Req, headers).
 
--spec stream_body(iodata(), fin | nofin, req()) -> ok.
+-spec stream_body(resp_body(), fin | nofin, req()) -> ok.
 %% Error out if headers were not sent.
 %% Don't send any body for HEAD responses.
 stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) ->
 	ok;
 %% Don't send a message if the data is empty, except for the
-%% very last message with IsFin=fin.
-stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
+%% very last message with IsFin=fin. When using sendfile this
+%% is converted to a data tuple, however.
+stream_body({sendfile, _, 0, _}, nofin, _) ->
+	ok;
+stream_body({sendfile, _, 0, _}, IsFin=fin,
+		#{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
+	Pid ! {{Pid, StreamID}, {data, IsFin, <<>>}},
+	ok;
+stream_body({sendfile, O, B, P}, IsFin,
+		#{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+		when is_integer(O), O >= 0, is_integer(B), B > 0 ->
+	Pid ! {{Pid, StreamID}, {data, IsFin, {sendfile, O, B, P}}},
+	ok;
+stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+		when not is_tuple(Data) ->
 	case iolist_size(Data) of
 		0 -> ok;
 		_ ->
 			Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
 			ok
 	end;
-stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
+stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+		when not is_tuple(Data) ->
 	Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
 	ok.
 

+ 1 - 1
src/cowboy_stream.erl

@@ -32,7 +32,7 @@
 -type commands() :: [{inform, cowboy:http_status(), cowboy:http_headers()}
 	| resp_command()
 	| {headers, cowboy:http_status(), cowboy:http_headers()}
-	| {data, fin(), iodata()}
+	| {data, fin(), cowboy_req:resp_body()}
 	| {trailers, cowboy:http_headers()}
 	| {push, binary(), binary(), binary(), inet:port_number(),
 		binary(), binary(), cowboy:http_headers()}

+ 18 - 0
test/compress_SUITE.erl

@@ -121,6 +121,24 @@ gzip_stream_reply(Config) ->
 	_ = zlib:gunzip(GzBody),
 	ok.
 
+gzip_stream_reply_sendfile(Config) ->
+	doc("Stream reply using sendfile for some chunks; get a gzipped response."),
+	{200, Headers, GzBody} = do_get("/stream_reply/sendfile",
+		[{<<"accept-encoding">>, <<"gzip">>}], Config),
+	{_, <<"gzip">>} = lists:keyfind(<<"content-encoding">>, 1, Headers),
+	file:write_file("/tmp/test.gz", GzBody),
+	_ = zlib:gunzip(GzBody),
+	ok.
+
+gzip_stream_reply_sendfile_fin(Config) ->
+	doc("Stream reply using sendfile for some chunks; get a gzipped response."),
+	{200, Headers, GzBody} = do_get("/stream_reply/sendfile_fin",
+		[{<<"accept-encoding">>, <<"gzip">>}], Config),
+	{_, <<"gzip">>} = lists:keyfind(<<"content-encoding">>, 1, Headers),
+	file:write_file("/tmp/test.gz", GzBody),
+	_ = zlib:gunzip(GzBody),
+	ok.
+
 gzip_stream_reply_content_encoding(Config) ->
 	doc("Stream reply with content-encoding header; get an uncompressed response."),
 	{200, Headers, Body} = do_get("/stream_reply/content-encoding",

+ 24 - 1
test/handlers/compress_h.erl

@@ -27,7 +27,30 @@ init(Req0, State=stream_reply) ->
 		<<"large">> ->
 			stream_reply(#{}, Req0);
 		<<"content-encoding">> ->
-			stream_reply(#{<<"content-encoding">> => <<"compress">>}, Req0)
+			stream_reply(#{<<"content-encoding">> => <<"compress">>}, Req0);
+		<<"sendfile">> ->
+			Data = lists:duplicate(10000, $a),
+			AppFile = code:where_is_file("cowboy.app"),
+			Size = filelib:file_size(AppFile),
+			Req1 = cowboy_req:stream_reply(200, Req0),
+			%% We send a few files interspersed into other data.
+			cowboy_req:stream_body(Data, nofin, Req1),
+			cowboy_req:stream_body({sendfile, 0, Size, AppFile}, nofin, Req1),
+			cowboy_req:stream_body(Data, nofin, Req1),
+			cowboy_req:stream_body({sendfile, 0, Size, AppFile}, nofin, Req1),
+			cowboy_req:stream_body(Data, fin, Req1),
+			Req1;
+		<<"sendfile_fin">> ->
+			Data = lists:duplicate(10000, $a),
+			AppFile = code:where_is_file("cowboy.app"),
+			Size = filelib:file_size(AppFile),
+			Req1 = cowboy_req:stream_reply(200, Req0),
+			%% We send a few files interspersed into other data.
+			cowboy_req:stream_body(Data, nofin, Req1),
+			cowboy_req:stream_body({sendfile, 0, Size, AppFile}, nofin, Req1),
+			cowboy_req:stream_body(Data, nofin, Req1),
+			cowboy_req:stream_body({sendfile, 0, Size, AppFile}, fin, Req1),
+			Req1
 	end,
 	{ok, Req, State}.
 

+ 17 - 0
test/handlers/resp_h.erl

@@ -219,6 +219,23 @@ do(<<"stream_body">>, Req0, Opts) ->
 			Req = cowboy_req:stream_reply(200, Req0),
 			cowboy_req:stream_body(<<"Hello world!">>, nofin, Req),
 			{ok, Req, Opts};
+		<<"sendfile">> ->
+			AppFile = code:where_is_file("cowboy.app"),
+			AppSize = filelib:file_size(AppFile),
+			Req = cowboy_req:stream_reply(200, Req0),
+			cowboy_req:stream_body(<<"Hello ">>, nofin, Req),
+			cowboy_req:stream_body({sendfile, 0, AppSize, AppFile}, nofin, Req),
+			cowboy_req:stream_body(<<" interspersed ">>, nofin, Req),
+			cowboy_req:stream_body({sendfile, 0, AppSize, AppFile}, nofin, Req),
+			cowboy_req:stream_body(<<" world!">>, fin, Req),
+			{ok, Req, Opts};
+		<<"sendfile_fin">> ->
+			AppFile = code:where_is_file("cowboy.app"),
+			AppSize = filelib:file_size(AppFile),
+			Req = cowboy_req:stream_reply(200, Req0),
+			cowboy_req:stream_body(<<"Hello! ">>, nofin, Req),
+			cowboy_req:stream_body({sendfile, 0, AppSize, AppFile}, fin, Req),
+			{ok, Req, Opts};
 		_ ->
 			%% Call stream_body without initiating streaming.
 			cowboy_req:stream_body(<<0:800000>>, fin, Req0),

+ 28 - 5
test/req_SUITE.erl

@@ -885,21 +885,44 @@ stream_reply3(Config) ->
 	{500, _, _} = do_get("/resp/stream_reply3/error", Config),
 	ok.
 
-stream_body_multiple(Config) ->
-	doc("Streamed body via multiple calls."),
-	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/multiple", Config),
-	ok.
-
 stream_body_fin0(Config) ->
 	doc("Streamed body with last chunk of size 0."),
 	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/fin0", Config),
 	ok.
 
+stream_body_multiple(Config) ->
+	doc("Streamed body via multiple calls."),
+	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/multiple", Config),
+	ok.
+
 stream_body_nofin(Config) ->
 	doc("Unfinished streamed body."),
 	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/nofin", Config),
 	ok.
 
+stream_body_sendfile(Config) ->
+	doc("Streamed body via multiple calls, including sendfile calls."),
+	{ok, AppFile} = file:read_file(code:where_is_file("cowboy.app")),
+	ExpectedBody = iolist_to_binary([
+		<<"Hello ">>,
+		AppFile,
+		<<" interspersed ">>,
+		AppFile,
+		<<" world!">>
+	]),
+	{200, _, ExpectedBody} = do_get("/resp/stream_body/sendfile", Config),
+	ok.
+
+stream_body_sendfile_fin(Config) ->
+	doc("Streamed body via multiple calls, including a sendfile final call."),
+	{ok, AppFile} = file:read_file(code:where_is_file("cowboy.app")),
+	ExpectedBody = iolist_to_binary([
+		<<"Hello! ">>,
+		AppFile
+	]),
+	{200, _, ExpectedBody} = do_get("/resp/stream_body/sendfile_fin", Config),
+	ok.
+
 stream_body_content_length_multiple(Config) ->
 	doc("Streamed body via multiple calls."),
 	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body_content_length/multiple", Config),