Browse Source

Implement backpressure on cowboy_req:stream_body

This should limit the amount of memory that Cowboy is using
when a handler is sending data much faster than the network.

The new max_stream_buffer_size is a soft limit and only has
an effect when the cowboy_stream_h handler is used.
Loïc Hoguin 5 years ago
parent
commit
49af57d546

+ 10 - 1
doc/src/manual/cowboy_http2.asciidoc

@@ -31,6 +31,7 @@ opts() :: #{
     max_encode_table_size          => non_neg_integer(),
     max_encode_table_size          => non_neg_integer(),
     max_frame_size_received        => 16384..16777215,
     max_frame_size_received        => 16384..16777215,
     max_frame_size_sent            => 16384..16777215 | infinity,
     max_frame_size_sent            => 16384..16777215 | infinity,
+    max_stream_buffer_size         => non_neg_integer(),
     max_stream_window_size         => 0..16#7fffffff,
     max_stream_window_size         => 0..16#7fffffff,
     preface_timeout                => timeout(),
     preface_timeout                => timeout(),
     proxy_header                   => boolean(),
     proxy_header                   => boolean(),
@@ -136,6 +137,12 @@ following the client's advertised maximum.
 Note that actual frame sizes may be lower than the limit when
 Note that actual frame sizes may be lower than the limit when
 there is not enough space left in the flow control window.
 there is not enough space left in the flow control window.
 
 
+max_stream_buffer_size (8000000)::
+
+Maximum stream buffer size in bytes. This is a soft limit used
+to apply backpressure to handlers that send data faster than
+the HTTP/2 connection allows.
+
 max_stream_window_size (16#7fffffff)::
 max_stream_window_size (16#7fffffff)::
 
 
 Maximum stream window size in bytes. This is used as an upper bound
 Maximum stream window size in bytes. This is used as an upper bound
@@ -186,7 +193,9 @@ too many `WINDOW_UPDATE` frames.
          `max_connection_window_size`, `max_stream_window_size`,
          `max_connection_window_size`, `max_stream_window_size`,
          `stream_window_margin_size` and
          `stream_window_margin_size` and
          `stream_window_update_threshold` to configure
          `stream_window_update_threshold` to configure
-         behavior on sending WINDOW_UPDATE frames.
+         behavior on sending WINDOW_UPDATE frames, and
+         `max_stream_buffer_size` to apply backpressure
+         when sending data too fast.
 * *2.6*: The `proxy_header` and `sendfile` options were added.
 * *2.6*: The `proxy_header` and `sendfile` options were added.
 * *2.4*: Add the options `initial_connection_window_size`,
 * *2.4*: Add the options `initial_connection_window_size`,
          `initial_stream_window_size`, `max_concurrent_streams`,
          `initial_stream_window_size`, `max_concurrent_streams`,

+ 33 - 3
src/cowboy_http2.erl

@@ -45,6 +45,7 @@
 	max_encode_table_size => non_neg_integer(),
 	max_encode_table_size => non_neg_integer(),
 	max_frame_size_received => 16384..16777215,
 	max_frame_size_received => 16384..16777215,
 	max_frame_size_sent => 16384..16777215 | infinity,
 	max_frame_size_sent => 16384..16777215 | infinity,
+	max_stream_buffer_size => non_neg_integer(),
 	max_stream_window_size => 0..16#7fffffff,
 	max_stream_window_size => 0..16#7fffffff,
 	metrics_callback => cowboy_metrics_h:metrics_callback(),
 	metrics_callback => cowboy_metrics_h:metrics_callback(),
 	middlewares => [module()],
 	middlewares => [module()],
@@ -292,7 +293,11 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
 		{ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
 		{ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
 			goaway(State#state{http2_machine=HTTP2Machine}, GoAway);
 			goaway(State#state{http2_machine=HTTP2Machine}, GoAway);
 		{send, SendData, HTTP2Machine} ->
 		{send, SendData, HTTP2Machine} ->
-			send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData);
+			%% We may need to send an alarm for each of the streams sending data.
+			lists:foldl(
+				fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end,
+				send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData),
+				SendData);
 		{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
 		{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
 			reset_stream(State#state{http2_machine=HTTP2Machine},
 			reset_stream(State#state{http2_machine=HTTP2Machine},
 				StreamID, {stream_error, Reason, Human});
 				StreamID, {stream_error, Reason, Human});
@@ -712,7 +717,7 @@ maybe_send_data(State0=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Dat
 	end,
 	end,
 	case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
 	case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
 		{ok, HTTP2Machine} ->
 		{ok, HTTP2Machine} ->
-			State0#state{http2_machine=HTTP2Machine};
+			maybe_send_data_alarm(State0#state{http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID);
 		{send, SendData, HTTP2Machine} ->
 		{send, SendData, HTTP2Machine} ->
 			State = #state{http2_status=Status, streams=Streams}
 			State = #state{http2_status=Status, streams=Streams}
 				= send_data(State0#state{http2_machine=HTTP2Machine}, SendData),
 				= send_data(State0#state{http2_machine=HTTP2Machine}, SendData),
@@ -721,7 +726,7 @@ maybe_send_data(State0=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Dat
 				Status =:= closing, Streams =:= #{} ->
 				Status =:= closing, Streams =:= #{} ->
 					terminate(State, {stop, normal, 'The connection is going away.'});
 					terminate(State, {stop, normal, 'The connection is going away.'});
 				true ->
 				true ->
-					State
+					maybe_send_data_alarm(State, HTTP2Machine0, StreamID)
 			end
 			end
 	end.
 	end.
 
 
@@ -759,6 +764,31 @@ send_data_frame(State=#state{socket=Socket, transport=Transport,
 	Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
 	Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
 	State#state{http2_machine=HTTP2Machine}.
 	State#state{http2_machine=HTTP2Machine}.
 
 
+%% Set or clear the stream_buffer_full alarm after data was sent or queued.
+%% HTTP2Machine0 is the machine before the operation, the one in State is the
+%% machine after it; an alarm is only sent when the limit is crossed.
+maybe_send_data_alarm(State=#state{opts=Opts, http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID) ->
+	{ok, BufferSizeBefore} = cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine0), %% Crashes (badmatch) unless the lookup in the "before" machine succeeds.
+	%% When the stream ends up closed after it finished sending data,
+	%% we do not want to trigger an alarm. We act as if the buffer
+	%% size did not change, so neither branch of the 'if' below can match.
+	BufferSizeAfter = case cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine) of
+		{ok, BSA} -> BSA;
+		{error, closed} -> BufferSizeBefore
+	end,
+	MaxBufferSize = maps:get(max_stream_buffer_size, Opts, 8000000), %% Soft limit; defaults to 8000000 when the option is unset.
+	%% The alarm events are intentionally left undocumented for now: it is
+	%% not yet settled whether they should be {alarm, Name, on|off},
+	%% {internal_event, E} or something else entirely.
+	if
+		BufferSizeBefore >= MaxBufferSize, BufferSizeAfter < MaxBufferSize -> %% Dropped back below the limit.
+			info(State, StreamID, {alarm, stream_buffer_full, off});
+		BufferSizeBefore < MaxBufferSize, BufferSizeAfter >= MaxBufferSize -> %% Reached or exceeded the limit.
+			info(State, StreamID, {alarm, stream_buffer_full, on});
+		true -> %% Stayed on the same side of the limit: nothing to signal.
+			State
+	end.
+
 %% Terminate a stream or the connection.
 %% Terminate a stream or the connection.
 
 
 %% We may have to cancel streams even if we receive multiple
 %% We may have to cancel streams even if we receive multiple

+ 15 - 18
src/cowboy_req.erl

@@ -826,34 +826,31 @@ stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) ->
 %% is converted to a data tuple, however.
 %% is converted to a data tuple, however.
 stream_body({sendfile, _, 0, _}, nofin, _) ->
 stream_body({sendfile, _, 0, _}, nofin, _) ->
 	ok;
 	ok;
-stream_body({sendfile, _, 0, _}, IsFin=fin,
+stream_body({sendfile, _, 0, _}, IsFin=fin, Req=#{has_sent_resp := headers}) ->
-		#{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
+	stream_body({data, IsFin, <<>>}, Req);
-	Pid ! {{Pid, StreamID}, {data, IsFin, <<>>}},
+stream_body({sendfile, O, B, P}, IsFin, Req=#{has_sent_resp := headers})
-	ok;
-stream_body({sendfile, O, B, P}, IsFin,
-		#{pid := Pid, streamid := StreamID, has_sent_resp := headers})
 		when is_integer(O), O >= 0, is_integer(B), B > 0 ->
 		when is_integer(O), O >= 0, is_integer(B), B > 0 ->
-	Pid ! {{Pid, StreamID}, {data, IsFin, {sendfile, O, B, P}}},
+	stream_body({data, IsFin, {sendfile, O, B, P}}, Req);
-	ok;
+stream_body(Data, IsFin=nofin, Req=#{has_sent_resp := headers})
-stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
 		when not is_tuple(Data) ->
 		when not is_tuple(Data) ->
 	case iolist_size(Data) of
 	case iolist_size(Data) of
 		0 -> ok;
 		0 -> ok;
-		_ ->
+		_ -> stream_body({data, IsFin, Data}, Req)
-			Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
-			ok
 	end;
 	end;
-stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+stream_body(Data, IsFin, Req=#{has_sent_resp := headers})
 		when not is_tuple(Data) ->
 		when not is_tuple(Data) ->
-	Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
+	stream_body({data, IsFin, Data}, Req).
-	ok.
+
+%% Send Msg for this stream to the process in the Req's pid key, then wait for its ack. @todo Do we need a timeout?
+stream_body(Msg, #{pid := Pid, streamid := StreamID}) ->
+	Pid ! {{Pid, StreamID}, Msg},
+	receive {data_ack, Pid} -> ok end. %% No 'after' clause: blocks until {data_ack, Pid} arrives.
 
 
 -spec stream_events(cow_sse:event() | [cow_sse:event()], fin | nofin, req()) -> ok.
 -spec stream_events(cow_sse:event() | [cow_sse:event()], fin | nofin, req()) -> ok.
 stream_events(Event, IsFin, Req) when is_map(Event) ->
 stream_events(Event, IsFin, Req) when is_map(Event) ->
 	stream_events([Event], IsFin, Req);
 	stream_events([Event], IsFin, Req);
-stream_events(Events, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
+stream_events(Events, IsFin, Req=#{has_sent_resp := headers}) ->
-	Pid ! {{Pid, StreamID}, {data, IsFin, cow_sse:events(Events)}},
+	stream_body({data, IsFin, cow_sse:events(Events)}, Req).
-	ok.
 
 
 -spec stream_trailers(cowboy:http_headers(), req()) -> ok.
 -spec stream_trailers(cowboy:http_headers(), req()) -> ok.
 stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
 stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->

+ 22 - 2
src/cowboy_stream_h.erl

@@ -39,7 +39,8 @@
 	read_body_length = 0 :: non_neg_integer() | infinity | auto,
 	read_body_length = 0 :: non_neg_integer() | infinity | auto,
 	read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
 	read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
 	read_body_buffer = <<>> :: binary(),
 	read_body_buffer = <<>> :: binary(),
-	body_length = 0 :: non_neg_integer()
+	body_length = 0 :: non_neg_integer(),
+	stream_body_status = normal :: normal | blocking | blocked
 }).
 }).
 
 
 %% @todo For shutting down children we need to have a timeout before we terminate
 %% @todo For shutting down children we need to have a timeout before we terminate
@@ -219,8 +220,27 @@ info(StreamID, Response={response, _, _, _}, State) ->
 	do_info(StreamID, Response, [Response], State#state{expect=undefined});
 	do_info(StreamID, Response, [Response], State#state{expect=undefined});
 info(StreamID, Headers={headers, _, _}, State) ->
 info(StreamID, Headers={headers, _, _}, State) ->
 	do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
 	do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
-info(StreamID, Data={data, _, _}, State) ->
+%% Sending data involves the data message and the stream_buffer_full alarm.
+%% We stop sending acks when the alarm is on; the withheld ack is sent when it goes off.
+info(StreamID, Data={data, _, _}, State0=#state{pid=Pid, stream_body_status=Status}) ->
+	State = case Status of
+		normal -> %% No alarm: ack immediately.
+			Pid ! {data_ack, self()},
+			State0;
+		blocking -> %% Alarm is on: withhold the ack and remember that one is owed.
+			State0#state{stream_body_status=blocked};
+		blocked -> %% An ack is already owed: still no ack, state unchanged.
+			State0
+	end,
 	do_info(StreamID, Data, [Data], State);
 	do_info(StreamID, Data, [Data], State);
+info(StreamID, Alarm={alarm, stream_buffer_full, on}, State) ->
+	do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking}); %% The alarm itself produces no commands ([]).
+info(StreamID, Alarm={alarm, stream_buffer_full, off}, State=#state{pid=Pid, stream_body_status=Status}) ->
+	_ = case Status of %% NOTE(review): no 'normal' clause; an 'off' alarm received in that state would raise case_clause - confirm it cannot happen.
+		blocking -> ok; %% No data arrived while the alarm was on: no ack is owed.
+		blocked -> Pid ! {data_ack, self()} %% Send the ack that was withheld.
+	end,
+	do_info(StreamID, Alarm, [], State#state{stream_body_status=normal});
 info(StreamID, Trailers={trailers, _}, State) ->
 info(StreamID, Trailers={trailers, _}, State) ->
 	do_info(StreamID, Trailers, [Trailers], State);
 	do_info(StreamID, Trailers, [Trailers], State);
 info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->
 info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->

+ 5 - 0
test/handlers/resp_h.erl

@@ -221,6 +221,11 @@ do(<<"stream_body">>, Req0, Opts) ->
 			cowboy_req:stream_body(<<"world">>, nofin, Req),
 			cowboy_req:stream_body(<<"world">>, nofin, Req),
 			cowboy_req:stream_body(<<"!">>, fin, Req),
 			cowboy_req:stream_body(<<"!">>, fin, Req),
 			{ok, Req, Opts};
 			{ok, Req, Opts};
+		<<"loop">> ->
+			Req = cowboy_req:stream_reply(200, Req0),
+			_ = [cowboy_req:stream_body(<<0:1000000/unit:8>>, nofin, Req)
+				|| _ <- lists:seq(1, 32)],
+			{ok, Req, Opts};
 		<<"nofin">> ->
 		<<"nofin">> ->
 			Req = cowboy_req:stream_reply(200, Req0),
 			Req = cowboy_req:stream_reply(200, Req0),
 			cowboy_req:stream_body(<<"Hello world!">>, nofin, Req),
 			cowboy_req:stream_body(<<"Hello world!">>, nofin, Req),

+ 6 - 1
test/req_SUITE.erl

@@ -98,7 +98,7 @@ do_get(Path, Headers, Config) ->
 	Ref = gun:get(ConnPid, Path, [{<<"accept-encoding">>, <<"gzip">>}|Headers]),
 	Ref = gun:get(ConnPid, Path, [{<<"accept-encoding">>, <<"gzip">>}|Headers]),
 	{response, IsFin, Status, RespHeaders} = gun:await(ConnPid, Ref),
 	{response, IsFin, Status, RespHeaders} = gun:await(ConnPid, Ref),
 	{ok, RespBody} = case IsFin of
 	{ok, RespBody} = case IsFin of
-		nofin -> gun:await_body(ConnPid, Ref);
+		nofin -> gun:await_body(ConnPid, Ref, 30000);
 		fin -> {ok, <<>>}
 		fin -> {ok, <<>>}
 	end,
 	end,
 	gun:close(ConnPid),
 	gun:close(ConnPid),
@@ -891,6 +891,11 @@ stream_body_multiple(Config) ->
 	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/multiple", Config),
 	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/multiple", Config),
 	ok.
 	ok.
 
 
+stream_body_loop(Config) ->
+	doc("Streamed body via a fast loop."),
+	{200, _, <<0:32000000/unit:8>>} = do_get("/resp/stream_body/loop", Config), %% 32 chunks of 1000000 zero bytes, as sent by the resp_h "loop" clause.
+	ok.
+
 stream_body_nofin(Config) ->
 stream_body_nofin(Config) ->
 	doc("Unfinished streamed body."),
 	doc("Unfinished streamed body."),
 	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/nofin", Config),
 	{200, _, <<"Hello world!">>} = do_get("/resp/stream_body/nofin", Config),