Browse Source

Add chunked response body fun

Adds a new type of streaming response fun. It can be set in a similar
way to a streaming body fun with known length:

Req2 = cowboy_req:set_resp_body_fun(chunked, StreamFun, Req)

The fun, StreamFun, should accept a fun as its single argument. This
fun, ChunkFun, is used to send chunks of iodata:

ok = ChunkFun(IoData)

ChunkFun should not be called with an empty binary or iolist as this
will cause HTTP 1.1 clients to believe the stream is over. The final (0
length) chunk will be sent automatically - even if it has already been
sent - assuming no exception is raised.

Also note that the connection will close after the last chunk for HTTP
1.0 clients.
James Fish 12 years ago
parent
commit
c8242ab396
4 changed files with 102 additions and 19 deletions
  1. 48 18
      src/cowboy_req.erl
  2. 2 0
      src/cowboy_rest.erl
  3. 47 0
      test/http_SUITE.erl
  4. 5 1
      test/http_SUITE_data/http_stream_body.erl

+ 48 - 18
src/cowboy_req.erl

@@ -122,6 +122,8 @@
 -export_type([cookie_opts/0]).
 
 -type resp_body_fun() :: fun((inet:socket(), module()) -> ok).
+-type send_chunk_fun() :: fun((iodata()) -> ok | {error, atom()}).
+-type resp_chunked_fun() :: fun((send_chunk_fun()) -> ok).
 
 -record(http_req, {
 	%% Transport.
@@ -159,7 +161,8 @@
 	resp_state = waiting :: locked | waiting | chunks | done,
 	resp_headers = [] :: cowboy_http:headers(),
 	resp_body = <<>> :: iodata() | resp_body_fun()
-		| {non_neg_integer(), resp_body_fun()},
+		| {non_neg_integer(), resp_body_fun()}
+		| {chunked, resp_chunked_fun()},
 
 	%% Functions.
 	onresponse = undefined :: undefined | already_called
@@ -892,10 +895,15 @@ set_resp_body_fun(StreamFun, Req) when is_function(StreamFun) ->
 %% If the body function crashes while writing the response body or writes
 %% fewer bytes than declared the behaviour is undefined.
 -spec set_resp_body_fun(non_neg_integer(), resp_body_fun(), Req)
+	-> Req when Req::req();
+	(chunked, resp_chunked_fun(), Req)
 	-> Req when Req::req().
 set_resp_body_fun(StreamLen, StreamFun, Req)
 		when is_integer(StreamLen), is_function(StreamFun) ->
-	Req#http_req{resp_body={StreamLen, StreamFun}}.
+	Req#http_req{resp_body={StreamLen, StreamFun}};
+set_resp_body_fun(chunked, StreamFun, Req)
+		when is_function(StreamFun) ->
+	Req#http_req{resp_body={chunked, StreamFun}}.
 
 %% @doc Return whether the given header has been set for the response.
 -spec has_resp_header(binary(), req()) -> boolean().
@@ -906,6 +914,8 @@ has_resp_header(Name, #http_req{resp_headers=RespHeaders}) ->
 -spec has_resp_body(req()) -> boolean().
 has_resp_body(#http_req{resp_body=RespBody}) when is_function(RespBody) ->
 	true;
+has_resp_body(#http_req{resp_body={chunked, _}}) ->
+	true;
 has_resp_body(#http_req{resp_body={Length, _}}) ->
 	Length > 0;
 has_resp_body(#http_req{resp_body=RespBody}) ->
@@ -957,6 +967,20 @@ reply(Status, Headers, Body, Req=#http_req{
 				true -> ok
 			end,
 			Req2#http_req{connection=RespConn};
+		{chunked, BodyFun} ->
+			%% We stream the response body in chunks.
+			{RespType, Req2} = chunked_response(Status, Headers, Req),
+			if	RespType =/= hook, Method =/= <<"HEAD">> ->
+					ChunkFun = fun(IoData) -> chunk(IoData, Req2) end,
+					BodyFun(ChunkFun),
+					%% Terminate the chunked body for HTTP/1.1 only.
+					_ = case Version of
+						{1, 0} -> ok;
+						_ -> Transport:send(Socket, <<"0\r\n\r\n">>)
+					end;
+				true -> ok
+			end,
+			Req2;
 		{ContentLength, BodyFun} ->
 			%% We stream the response body for ContentLength bytes.
 			RespConn = response_connection(Headers, Connection),
@@ -1035,22 +1059,9 @@ chunked_reply(Status, Req) ->
 %% @see cowboy_req:chunk/2
 -spec chunked_reply(cowboy_http:status(), cowboy_http:headers(), Req)
 	-> {ok, Req} when Req::req().
-chunked_reply(Status, Headers, Req=#http_req{
-		version=Version, connection=Connection,
-		resp_state=waiting, resp_headers=RespHeaders}) ->
-	RespConn = response_connection(Headers, Connection),
-	HTTP11Headers = case Version of
-		{1, 1} -> [
-			{<<"connection">>, atom_to_connection(Connection)},
-			{<<"transfer-encoding">>, <<"chunked">>}];
-		_ -> []
-	end,
-	{_, Req2} = response(Status, Headers, RespHeaders, [
-		{<<"date">>, cowboy_clock:rfc1123()},
-		{<<"server">>, <<"Cowboy">>}
-	|HTTP11Headers], <<>>, Req),
-	{ok, Req2#http_req{connection=RespConn, resp_state=chunks,
-		resp_headers=[], resp_body= <<>>}}.
+chunked_reply(Status, Headers, Req) ->
+	{_, Req2} = chunked_response(Status, Headers, Req),
+	{ok, Req2}.
 
 %% @doc Send a chunk of data.
 %%
@@ -1205,6 +1216,25 @@ to_list(Req) ->
 
 %% Internal.
 
+-spec chunked_response(cowboy_http:status(), cowboy_http:headers(), Req) ->
+	{normal | hook, Req} when Req::req().
+chunked_response(Status, Headers, Req=#http_req{
+		version=Version, connection=Connection,
+		resp_state=waiting, resp_headers=RespHeaders}) ->
+	RespConn = response_connection(Headers, Connection),
+	HTTP11Headers = case Version of
+		{1, 1} -> [
+			{<<"connection">>, atom_to_connection(Connection)},
+			{<<"transfer-encoding">>, <<"chunked">>}];
+		_ -> []
+	end,
+	{RespType, Req2} = response(Status, Headers, RespHeaders, [
+		{<<"date">>, cowboy_clock:rfc1123()},
+		{<<"server">>, <<"Cowboy">>}
+	|HTTP11Headers], <<>>, Req),
+	{RespType, Req2#http_req{connection=RespConn, resp_state=chunks,
+			resp_headers=[], resp_body= <<>>}}.
+
 -spec response(cowboy_http:status(), cowboy_http:headers(),
 	cowboy_http:headers(), cowboy_http:headers(), iodata(), Req)
 	-> {normal | hook, Req} when Req::req().

+ 2 - 0
src/cowboy_rest.erl

@@ -945,6 +945,8 @@ set_resp_body(Req, State=#state{handler=Handler, handler_state=HandlerState,
 					cowboy_req:set_resp_body_fun(StreamFun, Req2);
 				{stream, Len, StreamFun} ->
 					cowboy_req:set_resp_body_fun(Len, StreamFun, Req2);
+				{chunked, StreamFun} ->
+					cowboy_req:set_resp_body_fun(chunked, StreamFun, Req2);
 				_Contents ->
 					cowboy_req:set_resp_body(Body, Req2)
 			end,

+ 47 - 0
test/http_SUITE.erl

@@ -82,6 +82,8 @@
 -export([static_test_file_css/1]).
 -export([stream_body_set_resp/1]).
 -export([stream_body_set_resp_close/1]).
+-export([stream_body_set_resp_chunked/1]).
+-export([stream_body_set_resp_chunked10/1]).
 -export([te_chunked/1]).
 -export([te_chunked_chopped/1]).
 -export([te_chunked_delayed/1]).
@@ -153,6 +155,8 @@ groups() ->
 		static_test_file_css,
 		stream_body_set_resp,
 		stream_body_set_resp_close,
+		stream_body_set_resp_chunked,
+		stream_body_set_resp_chunked10,
 		te_chunked,
 		te_chunked_chopped,
 		te_chunked_delayed,
@@ -338,6 +342,10 @@ init_dispatch(Config) ->
 				http_stream_body, [
 					{reply, set_resp_close},
 					{body, <<"stream_body_set_resp_close">>}]},
+			{"/stream_body/set_resp_chunked",
+				http_stream_body, [
+					{reply, set_resp_chunked},
+					{body, [<<"stream_body">>, <<"_set_resp_chunked">>]}]},
 			{"/static/[...]", cowboy_static,
 				[{directory, ?config(static_dir, Config)},
 				 {mimetypes, [{<<".css">>, [<<"text/css">>]}]}]},
@@ -1211,6 +1219,45 @@ stream_body_set_resp_close(Config) ->
 	end,
 	{error, closed} = Transport:recv(Socket, 0, 1000).
 
+stream_body_set_resp_chunked(Config) ->
+	Client = ?config(client, Config),
+	{ok, Client2} = cowboy_client:request(<<"GET">>,
+		build_url("/stream_body/set_resp_chunked", Config), Client),
+	{ok, 200, Headers, Client3} = cowboy_client:response(Client2),
+	{_, <<"chunked">>} = lists:keyfind(<<"transfer-encoding">>, 1, Headers),
+	{ok, Transport, Socket} = cowboy_client:transport(Client3),
+	case element(7, Client3) of
+		<<"B\r\nstream_body\r\n11\r\n_set_resp_chunked\r\n0\r\n\r\n">> ->
+			ok;
+		Buffer ->
+			{ok, Rest} = Transport:recv(Socket, 44 - byte_size(Buffer), 1000),
+			<<"B\r\nstream_body\r\n11\r\n_set_resp_chunked\r\n0\r\n\r\n">>
+				= <<Buffer/binary, Rest/binary>>,
+			ok
+	end.
+
+stream_body_set_resp_chunked10(Config) ->
+	Client = ?config(client, Config),
+	Transport = ?config(transport, Config),
+	{ok, Client2} = cowboy_client:connect(
+		Transport, "localhost", ?config(port, Config), Client),
+	Data = ["GET /stream_body/set_resp_chunked HTTP/1.0\r\n",
+		"Host: localhost\r\n\r\n"],
+	{ok, Client3} = cowboy_client:raw_request(Data, Client2),
+	{ok, 200, Headers, Client4} = cowboy_client:response(Client3),
+	false = lists:keymember(<<"transfer-encoding">>, 1, Headers),
+	{ok, Transport, Socket} = cowboy_client:transport(Client4),
+	case element(7, Client4) of
+		<<"stream_body_set_resp_chunked">> ->
+			ok;
+		Buffer ->
+			{ok, Rest} = Transport:recv(Socket, 28 - byte_size(Buffer), 1000),
+			<<"stream_body_set_resp_chunked">>
+				= <<Buffer/binary, Rest/binary>>,
+			ok
+	end,
+	{error, closed} = Transport:recv(Socket, 0, 1000).
+
 te_chunked(Config) ->
 	Client = ?config(client, Config),
 	Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])),

+ 5 - 1
test/http_SUITE_data/http_stream_body.erl

@@ -19,7 +19,11 @@ handle(Req, State=#state{headers=_Headers, body=Body, reply=Reply}) ->
 			SLen = iolist_size(Body),
 			cowboy_req:set_resp_body_fun(SLen, SFun, Req);
 		set_resp_close ->
-			cowboy_req:set_resp_body_fun(SFun, Req)
+			cowboy_req:set_resp_body_fun(SFun, Req);
+		set_resp_chunked ->
+			%% Here Body should be a list of chunks, not a binary.
+			SFun2 = fun(SendFun) -> lists:foreach(SendFun, Body) end,
+			cowboy_req:set_resp_body_fun(chunked, SFun2, Req)
 	end,
 	{ok, Req3} = cowboy_req:reply(200, Req2),
 	{ok, Req3, State}.