Browse Source

Add cowboy_req:set_resp_body_fun/2

This allows streaming a body without knowing the length in advance.
Also allows {stream, StreamFun} response body in the REST code.
Loïc Hoguin 12 years ago
parent
commit
faeb37ed80
4 changed files with 73 additions and 16 deletions
  1. 39 11
      src/cowboy_req.erl
  2. 4 2
      src/cowboy_rest.erl
  3. 22 0
      test/http_SUITE.erl
  4. 8 3
      test/http_handler_stream_body.erl

+ 39 - 11
src/cowboy_req.erl

@@ -89,6 +89,7 @@
 -export([set_resp_cookie/4]).
 -export([set_resp_cookie/4]).
 -export([set_resp_header/3]).
 -export([set_resp_header/3]).
 -export([set_resp_body/2]).
 -export([set_resp_body/2]).
+-export([set_resp_body_fun/2]).
 -export([set_resp_body_fun/3]).
 -export([set_resp_body_fun/3]).
 -export([has_resp_header/2]).
 -export([has_resp_header/2]).
 -export([has_resp_body/1]).
 -export([has_resp_body/1]).
@@ -157,7 +158,8 @@
 	%% Response.
 	%% Response.
 	resp_state = waiting :: locked | waiting | chunks | done,
 	resp_state = waiting :: locked | waiting | chunks | done,
 	resp_headers = [] :: cowboy_http:headers(),
 	resp_headers = [] :: cowboy_http:headers(),
-	resp_body = <<>> :: iodata() | {non_neg_integer(), resp_body_fun()},
+	resp_body = <<>> :: iodata() | resp_body_fun()
+		| {non_neg_integer(), resp_body_fun()},
 
 
 	%% Functions.
 	%% Functions.
 	onresponse = undefined :: undefined | cowboy_protocol:onresponse_fun()
 	onresponse = undefined :: undefined | cowboy_protocol:onresponse_fun()
@@ -821,20 +823,33 @@ set_resp_header(Name, Value, Req=#http_req{resp_headers=RespHeaders}) ->
 set_resp_body(Body, Req) ->
 set_resp_body(Body, Req) ->
 	Req#http_req{resp_body=Body}.
 	Req#http_req{resp_body=Body}.
 
 
+%% @doc Add a body stream function to the response.
+%%
+%% The body set here is ignored if the response is later sent using
+%% anything other than reply/2 or reply/3.
+%%
+%% Setting a response stream function without a length means that the
+%% body will be sent until the connection is closed. Cowboy will make
+%% sure that the connection is closed with no extra step required.
+%%
+%% To inform the client that a body has been sent with this request,
+%% Cowboy will add a "Transfer-Encoding: identity" header to the
+%% response.
+-spec set_resp_body_fun(resp_body_fun(), Req) -> Req when Req::req().
+set_resp_body_fun(StreamFun, Req) ->
+	Req#http_req{resp_body=StreamFun}.
+
 %% @doc Add a body function to the response.
 %% @doc Add a body function to the response.
 %%
 %%
-%% The response body may also be set to a content-length - stream-function pair.
-%% If the response body is of this type normal response headers will be sent.
-%% After the response headers has been sent the body function is applied.
-%% The body function is expected to write the response body directly to the
-%% socket using the transport module.
+%% The body set here is ignored if the response is later sent using
+%% anything other than reply/2 or reply/3.
 %%
 %%
-%% If the body function crashes while writing the response body or writes fewer
-%% bytes than declared the behaviour is undefined. The body set here is ignored
-%% if the response is later sent using anything other than `reply/2' or
-%% `reply/3'.
+%% Cowboy will call the given response stream function after sending the
+%% headers. This function must send the specified number of bytes to the
+%% socket it will receive as argument.
 %%
 %%
-%% @see cowboy_req:transport/1.
+%% If the body function crashes while writing the response body or writes
+%% fewer bytes than declared the behaviour is undefined.
 -spec set_resp_body_fun(non_neg_integer(), resp_body_fun(), Req)
 -spec set_resp_body_fun(non_neg_integer(), resp_body_fun(), Req)
 	-> Req when Req::req().
 	-> Req when Req::req().
 set_resp_body_fun(StreamLen, StreamFun, Req) ->
 set_resp_body_fun(StreamLen, StreamFun, Req) ->
@@ -884,7 +899,20 @@ reply(Status, Headers, Body, Req=#http_req{
 		_ -> []
 		_ -> []
 	end,
 	end,
 	case Body of
 	case Body of
+		BodyFun when is_function(BodyFun) ->
+			%% We stream the response body until we close the connection.
+			{RespType, Req2} = response(Status, Headers, RespHeaders, [
+					{<<"connection">>, <<"close">>},
+					{<<"date">>, cowboy_clock:rfc1123()},
+					{<<"server">>, <<"Cowboy">>},
+					{<<"transfer-encoding">>, <<"identity">>}
+				], <<>>, Req#http_req{connection=close}),
+			if	RespType =/= hook, Method =/= <<"HEAD">> ->
+					BodyFun(Socket, Transport);
+				true -> ok
+			end;
 		{ContentLength, BodyFun} ->
 		{ContentLength, BodyFun} ->
+			%% We stream the response body for ContentLength bytes.
 			{RespType, Req2} = response(Status, Headers, RespHeaders, [
 			{RespType, Req2} = response(Status, Headers, RespHeaders, [
 					{<<"content-length">>, integer_to_list(ContentLength)},
 					{<<"content-length">>, integer_to_list(ContentLength)},
 					{<<"date">>, cowboy_clock:rfc1123()},
 					{<<"date">>, cowboy_clock:rfc1123()},

+ 4 - 2
src/cowboy_rest.erl

@@ -796,8 +796,10 @@ set_resp_body(Req, State=#state{handler=Handler, handler_state=HandlerState,
 		{Body, Req6, HandlerState} ->
 		{Body, Req6, HandlerState} ->
 			State5 = State4#state{handler_state=HandlerState},
 			State5 = State4#state{handler_state=HandlerState},
 			Req7 = case Body of
 			Req7 = case Body of
-				{stream, Len, Fun1} ->
-					cowboy_req:set_resp_body_fun(Len, Fun1, Req6);
+				{stream, StreamFun} ->
+					cowboy_req:set_resp_body_fun(StreamFun, Req6);
+				{stream, Len, StreamFun} ->
+					cowboy_req:set_resp_body_fun(Len, StreamFun, Req6);
 				_Contents ->
 				_Contents ->
 					cowboy_req:set_resp_body(Body, Req6)
 					cowboy_req:set_resp_body(Body, Req6)
 			end,
 			end,

+ 22 - 0
test/http_SUITE.erl

@@ -68,6 +68,7 @@
 -export([static_test_file/1]).
 -export([static_test_file/1]).
 -export([static_test_file_css/1]).
 -export([static_test_file_css/1]).
 -export([stream_body_set_resp/1]).
 -export([stream_body_set_resp/1]).
+-export([stream_body_set_resp_close/1]).
 -export([te_chunked/1]).
 -export([te_chunked/1]).
 -export([te_chunked_delayed/1]).
 -export([te_chunked_delayed/1]).
 -export([te_identity/1]).
 -export([te_identity/1]).
@@ -117,6 +118,7 @@ groups() ->
 		static_test_file,
 		static_test_file,
 		static_test_file_css,
 		static_test_file_css,
 		stream_body_set_resp,
 		stream_body_set_resp,
+		stream_body_set_resp_close,
 		te_chunked,
 		te_chunked,
 		te_chunked_delayed,
 		te_chunked_delayed,
 		te_identity
 		te_identity
@@ -235,6 +237,10 @@ init_dispatch(Config) ->
 				[{body, <<"A flameless dance does not equal a cycle">>}]},
 				[{body, <<"A flameless dance does not equal a cycle">>}]},
 			{[<<"stream_body">>, <<"set_resp">>], http_handler_stream_body,
 			{[<<"stream_body">>, <<"set_resp">>], http_handler_stream_body,
 				[{reply, set_resp}, {body, <<"stream_body_set_resp">>}]},
 				[{reply, set_resp}, {body, <<"stream_body_set_resp">>}]},
+			{[<<"stream_body">>, <<"set_resp_close">>],
+				http_handler_stream_body, [
+					{reply, set_resp_close},
+					{body, <<"stream_body_set_resp_close">>}]},
 			{[<<"static">>, '...'], cowboy_static,
 			{[<<"static">>, '...'], cowboy_static,
 				[{directory, ?config(static_dir, Config)},
 				[{directory, ?config(static_dir, Config)},
 				 {mimetypes, [{<<".css">>, [<<"text/css">>]}]}]},
 				 {mimetypes, [{<<".css">>, [<<"text/css">>]}]}]},
@@ -892,6 +898,22 @@ stream_body_set_resp(Config) ->
 	{ok, <<"stream_body_set_resp">>, _}
 	{ok, <<"stream_body_set_resp">>, _}
 		= cowboy_client:response_body(Client3).
 		= cowboy_client:response_body(Client3).
 
 
+stream_body_set_resp_close(Config) ->
+	Client = ?config(client, Config),
+	{ok, Client2} = cowboy_client:request(<<"GET">>,
+		build_url("/stream_body/set_resp_close", Config), Client),
+	{ok, 200, _, Client3} = cowboy_client:response(Client2),
+	{ok, Transport, Socket} = cowboy_client:transport(Client3),
+	case element(7, Client3) of
+		<<"stream_body_set_resp_close">> ->
+			ok;
+		Buffer ->
+			{ok, Rest} = Transport:recv(Socket, 26 - size(Buffer), 1000),
+			<<"stream_body_set_resp_close">> = << Buffer/binary, Rest/binary >>,
+			ok
+	end,
+	{error, closed} = Transport:recv(Socket, 0, 1000).
+
 te_chunked(Config) ->
 te_chunked(Config) ->
 	Client = ?config(client, Config),
 	Client = ?config(client, Config),
 	Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])),
 	Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])),

+ 8 - 3
test/http_handler_stream_body.erl

@@ -12,10 +12,15 @@ init({_Transport, http}, Req, Opts) ->
 	Reply = proplists:get_value(reply, Opts),
 	Reply = proplists:get_value(reply, Opts),
 	{ok, Req, #state{headers=Headers, body=Body, reply=Reply}}.
 	{ok, Req, #state{headers=Headers, body=Body, reply=Reply}}.
 
 
-handle(Req, State=#state{headers=_Headers, body=Body, reply=set_resp}) ->
+handle(Req, State=#state{headers=_Headers, body=Body, reply=Reply}) ->
 	SFun = fun(Socket, Transport) -> Transport:send(Socket, Body) end,
 	SFun = fun(Socket, Transport) -> Transport:send(Socket, Body) end,
-	SLen = iolist_size(Body),
-	Req2 = cowboy_req:set_resp_body_fun(SLen, SFun, Req),
+	Req2 = case Reply of
+		set_resp ->
+			SLen = iolist_size(Body),
+			cowboy_req:set_resp_body_fun(SLen, SFun, Req);
+		set_resp_close ->
+			cowboy_req:set_resp_body_fun(SFun, Req)
+	end,
 	{ok, Req3} = cowboy_req:reply(200, Req2),
 	{ok, Req3} = cowboy_req:reply(200, Req2),
 	{ok, Req3, State}.
 	{ok, Req3, State}.