Browse Source

Ensure we can stream the response body from any process

Loïc Hoguin 5 years ago
parent
commit
eaa052616f
4 changed files with 38 additions and 9 deletions
  1. 5 5
      src/cowboy_req.erl
  2. 13 4
      src/cowboy_stream_h.erl
  3. 15 0
      test/handlers/resp_h.erl
  4. 5 0
      test/req_SUITE.erl

+ 5 - 5
src/cowboy_req.erl

@@ -827,19 +827,19 @@ stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) ->
 stream_body({sendfile, _, 0, _}, nofin, _) ->
 	ok;
 stream_body({sendfile, _, 0, _}, IsFin=fin, Req=#{has_sent_resp := headers}) ->
-	stream_body({data, IsFin, <<>>}, Req);
+	stream_body({data, self(), IsFin, <<>>}, Req);
 stream_body({sendfile, O, B, P}, IsFin, Req=#{has_sent_resp := headers})
 		when is_integer(O), O >= 0, is_integer(B), B > 0 ->
-	stream_body({data, IsFin, {sendfile, O, B, P}}, Req);
+	stream_body({data, self(), IsFin, {sendfile, O, B, P}}, Req);
 stream_body(Data, IsFin=nofin, Req=#{has_sent_resp := headers})
 		when not is_tuple(Data) ->
 	case iolist_size(Data) of
 		0 -> ok;
-		_ -> stream_body({data, IsFin, Data}, Req)
+		_ -> stream_body({data, self(), IsFin, Data}, Req)
 	end;
 stream_body(Data, IsFin, Req=#{has_sent_resp := headers})
 		when not is_tuple(Data) ->
-	stream_body({data, IsFin, Data}, Req).
+	stream_body({data, self(), IsFin, Data}, Req).
 
 %% @todo Do we need a timeout?
 stream_body(Msg, #{pid := Pid, streamid := StreamID}) ->
@@ -850,7 +850,7 @@ stream_body(Msg, #{pid := Pid, streamid := StreamID}) ->
 stream_events(Event, IsFin, Req) when is_map(Event) ->
 	stream_events([Event], IsFin, Req);
 stream_events(Events, IsFin, Req=#{has_sent_resp := headers}) ->
-	stream_body({data, IsFin, cow_sse:events(Events)}, Req).
+	stream_body({data, self(), IsFin, cow_sse:events(Events)}, Req).
 
 -spec stream_trailers(cowboy:http_headers(), req()) -> ok.
 stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->

+ 13 - 4
src/cowboy_stream_h.erl

@@ -41,6 +41,7 @@
 	read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
 	read_body_buffer = <<>> :: binary(),
 	body_length = 0 :: non_neg_integer(),
+	stream_body_pid = undefined :: pid() | undefined,
 	stream_body_status = normal :: normal | blocking | blocked
 }).
 
@@ -225,28 +226,36 @@ info(StreamID, Headers={headers, _, _}, State) ->
 	do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
 %% Sending data involves the data message, the stream_buffer_full alarm
 %% and the connection_buffer_full alarm. We stop sending acks when an alarm is on.
-info(StreamID, Data={data, _, _}, State0=#state{pid=Pid, stream_body_status=Status}) ->
+%%
+%% We only apply backpressure when the message includes a pid. Otherwise
+%% it is a message from Cowboy, or the user circumventing the backpressure.
+%%
+%% We currently do not support sending data from multiple processes concurrently.
+info(StreamID, Data={data, _, _}, State) ->
+	do_info(StreamID, Data, [Data], State);
+info(StreamID, Data0={data, Pid, _, _}, State0=#state{stream_body_status=Status}) ->
 	State = case Status of
 		normal ->
 			Pid ! {data_ack, self()},
 			State0;
 		blocking ->
-			State0#state{stream_body_status=blocked};
+			State0#state{stream_body_pid=Pid, stream_body_status=blocked};
 		blocked ->
 			State0
 	end,
+	Data = erlang:delete_element(2, Data0),
 	do_info(StreamID, Data, [Data], State);
 info(StreamID, Alarm={alarm, Name, on}, State)
 		when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
 	do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking});
-info(StreamID, Alarm={alarm, Name, off}, State=#state{pid=Pid, stream_body_status=Status})
+info(StreamID, Alarm={alarm, Name, off}, State=#state{stream_body_pid=Pid, stream_body_status=Status})
 		when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
 	_ = case Status of
 		normal -> ok;
 		blocking -> ok;
 		blocked -> Pid ! {data_ack, self()}
 	end,
-	do_info(StreamID, Alarm, [], State#state{stream_body_status=normal});
+	do_info(StreamID, Alarm, [], State#state{stream_body_pid=undefined, stream_body_status=normal});
 info(StreamID, Trailers={trailers, _}, State) ->
 	do_info(StreamID, Trailers, [Trailers], State);
 info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->

+ 15 - 0
test/handlers/resp_h.erl

@@ -247,6 +247,21 @@ do(<<"stream_body">>, Req0, Opts) ->
 			cowboy_req:stream_body(<<"Hello! ">>, nofin, Req),
 			cowboy_req:stream_body({sendfile, 0, AppSize, AppFile}, fin, Req),
 			{ok, Req, Opts};
+		<<"spawn">> ->
+			Req = cowboy_req:stream_reply(200, Req0),
+			Parent = self(),
+			Pid = spawn(fun() ->
+				cowboy_req:stream_body(<<"Hello ">>, nofin, Req),
+				cowboy_req:stream_body(<<"world">>, nofin, Req),
+				cowboy_req:stream_body(<<"!">>, fin, Req),
+				Parent ! {self(), ok}
+			end),
+			receive
+				{Pid, ok} -> ok
+			after 5000 ->
+				error(timeout)
+			end,
+			{ok, Req, Opts};
 		_ ->
 			%% Call stream_body without initiating streaming.
 			cowboy_req:stream_body(<<0:800000>>, fin, Req0),

+ 5 - 0
test/req_SUITE.erl

@@ -942,6 +942,11 @@ stream_body_sendfile_fin(Config) ->
 	{200, _, ExpectedBody} = do_get("/resp/stream_body/sendfile_fin", Config),
 	ok.
 
+stream_body_spawn(Config) ->
+	doc("Confirm we can use cowboy_req:stream_body/3 from another process."),
+	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/spawn", Config),
+	ok.
+
 stream_body_content_length_multiple(Config) ->
 	doc("Streamed body via multiple calls."),
 	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body_content_length/multiple", Config),