|
@@ -121,7 +121,7 @@ init(Parent, Ref, Socket, Transport, Opts) ->
|
|
|
case Transport:peername(Socket) of
|
|
|
{ok, Peer} ->
|
|
|
LastStreamID = maps:get(max_keepalive, Opts, 100),
|
|
|
- before_loop(set_request_timeout(#state{
|
|
|
+ before_loop(set_timeout(#state{
|
|
|
parent=Parent, ref=Ref, socket=Socket,
|
|
|
transport=Transport, opts=Opts,
|
|
|
peer=Peer, last_streamid=LastStreamID}), <<>>);
|
|
@@ -150,12 +150,17 @@ before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
|
|
|
loop(State, Buffer).
|
|
|
|
|
|
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|
|
- timer=TimerRef, children=Children}, Buffer) ->
|
|
|
+ timer=TimerRef, children=Children, streams=Streams}, Buffer) ->
|
|
|
{OK, Closed, Error} = Transport:messages(),
|
|
|
receive
|
|
|
%% Socket messages.
|
|
|
{OK, Socket, Data} ->
|
|
|
- parse(<< Buffer/binary, Data/binary >>, State);
|
|
|
+ %% Only reset the timeout if it is idle_timeout (active streams).
|
|
|
+ State1 = case Streams of
|
|
|
+ [] -> State;
|
|
|
+ _ -> set_timeout(State)
|
|
|
+ end,
|
|
|
+ parse(<< Buffer/binary, Data/binary >>, State1);
|
|
|
{Closed, Socket} ->
|
|
|
terminate(State, {socket_error, closed, 'The socket has been closed.'});
|
|
|
{Error, Socket, Reason} ->
|
|
@@ -200,13 +205,19 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|
|
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
|
|
|
end.
|
|
|
|
|
|
-set_request_timeout(State0=#state{opts=Opts}) ->
|
|
|
- State = cancel_request_timeout(State0),
|
|
|
- Timeout = maps:get(request_timeout, Opts, 5000),
|
|
|
- TimerRef = erlang:start_timer(Timeout, self(), request_timeout),
|
|
|
+%% We set request_timeout when there are no active streams,
|
|
|
+%% and idle_timeout otherwise.
|
|
|
+set_timeout(State0=#state{opts=Opts, streams=Streams}) ->
|
|
|
+ State = cancel_timeout(State0),
|
|
|
+ {Name, Default} = case Streams of
|
|
|
+ [] -> {request_timeout, 5000};
|
|
|
+ _ -> {idle_timeout, 60000}
|
|
|
+ end,
|
|
|
+ Timeout = maps:get(Name, Opts, Default),
|
|
|
+ TimerRef = erlang:start_timer(Timeout, self(), Name),
|
|
|
State#state{timer=TimerRef}.
|
|
|
|
|
|
-cancel_request_timeout(State=#state{timer=TimerRef}) ->
|
|
|
+cancel_timeout(State=#state{timer=TimerRef}) ->
|
|
|
ok = case TimerRef of
|
|
|
undefined -> ok;
|
|
|
_ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
|
|
@@ -214,15 +225,15 @@ cancel_request_timeout(State=#state{timer=TimerRef}) ->
|
|
|
State#state{timer=undefined}.
|
|
|
|
|
|
-spec timeout(_, _) -> no_return().
|
|
|
-%% @todo Honestly it would be much better if we didn't enable pipelining yet.
|
|
|
timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
|
|
|
- %% @todo If other streams are running, just set the connection to be closed
|
|
|
- %% and stop trying to read from the socket?
|
|
|
- terminate(State, {connection_error, timeout, 'No request-line received before timeout.'});
|
|
|
+ terminate(State, {connection_error, timeout,
|
|
|
+ 'No request-line received before timeout.'});
|
|
|
timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
|
|
|
- %% @todo If other streams are running, maybe wait for their reply before sending 408?
|
|
|
- %% -> Definitely. Either way, stop reading from the socket and make that stream the last.
|
|
|
- error_terminate(408, State, {connection_error, timeout, 'Request headers not received before timeout.'}).
|
|
|
+ error_terminate(408, State, {connection_error, timeout,
|
|
|
+ 'Request headers not received before timeout.'});
|
|
|
+timeout(State, idle_timeout) ->
|
|
|
+ terminate(State, {connection_error, timeout,
|
|
|
+ 'Connection idle longer than configuration allows.'}).
|
|
|
|
|
|
%% Request-line.
|
|
|
parse(<<>>, State) ->
|
|
@@ -249,10 +260,11 @@ after_parse({request, Req=#{streamid := StreamID, headers := Headers, version :=
|
|
|
try cowboy_stream:init(StreamID, Req, Opts) of
|
|
|
{Commands, StreamState} ->
|
|
|
Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0],
|
|
|
- State = case maybe_req_close(State0, Headers, Version) of
|
|
|
+ State1 = case maybe_req_close(State0, Headers, Version) of
|
|
|
close -> State0#state{streams=Streams, last_streamid=StreamID};
|
|
|
keepalive -> State0#state{streams=Streams}
|
|
|
end,
|
|
|
+ State = set_timeout(State1),
|
|
|
parse(Buffer, commands(State, StreamID, Commands))
|
|
|
catch Class:Reason ->
|
|
|
error_logger:error_msg("Exception occurred in "
|
|
@@ -617,13 +629,13 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, in_stream
|
|
|
false ->
|
|
|
State = case HasBody of
|
|
|
true ->
|
|
|
- cancel_request_timeout(State0#state{in_state=#ps_body{
|
|
|
+ State0#state{in_state=#ps_body{
|
|
|
%% @todo Don't need length anymore?
|
|
|
transfer_decode_fun = TDecodeFun,
|
|
|
transfer_decode_state = TDecodeState
|
|
|
- }});
|
|
|
+ }};
|
|
|
false ->
|
|
|
- set_request_timeout(State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}})
|
|
|
+ State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}
|
|
|
end,
|
|
|
{request, Req, State, Buffer};
|
|
|
{true, HTTP2Settings} ->
|
|
@@ -661,7 +673,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
|
|
|
opts=Opts, peer=Peer}, Buffer) ->
|
|
|
case Transport:secure() of
|
|
|
false ->
|
|
|
- _ = cancel_request_timeout(State),
|
|
|
+ _ = cancel_timeout(State),
|
|
|
cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer);
|
|
|
true ->
|
|
|
error_terminate(400, State, {connection_error, protocol_error,
|
|
@@ -676,7 +688,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
|
|
|
%% Always half-closed stream coming from this side.
|
|
|
try cow_http_hd:parse_http2_settings(HTTP2Settings) of
|
|
|
Settings ->
|
|
|
- _ = cancel_request_timeout(State),
|
|
|
+ _ = cancel_timeout(State),
|
|
|
cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, Settings, Req)
|
|
|
catch _:_ ->
|
|
|
error_terminate(400, State, {connection_error, protocol_error,
|
|
@@ -705,10 +717,10 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
|
|
|
{data, StreamID, nofin, Data, State#state{in_state=
|
|
|
PS#ps_body{transfer_decode_state=TState}}, Rest};
|
|
|
{done, TotalLength, Rest} ->
|
|
|
- {data, StreamID, {fin, TotalLength}, <<>>, set_request_timeout(
|
|
|
+ {data, StreamID, {fin, TotalLength}, <<>>, set_timeout(
|
|
|
State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
|
|
|
{done, Data, TotalLength, Rest} ->
|
|
|
- {data, StreamID, {fin, TotalLength}, Data, set_request_timeout(
|
|
|
+ {data, StreamID, {fin, TotalLength}, Data, set_timeout(
|
|
|
State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
|
|
|
end.
|
|
|
|
|
@@ -857,7 +869,7 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
|
|
|
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
|
|
|
%% @todo This should be the last stream running otherwise we need to wait before switching.
|
|
|
%% @todo If there's streams opened after this one, fail instead of 101.
|
|
|
- State = cancel_request_timeout(State0),
|
|
|
+ State = cancel_timeout(State0),
|
|
|
%% @todo When we actually do the upgrade, we only have the one stream left, plus
|
|
|
%% possibly some processes terminating. We need a smart strategy for handling the
|
|
|
%% children shutdown. We can start with brutal_kill and discarding the EXIT messages
|
|
@@ -918,7 +930,7 @@ stream_terminate(State0=#state{socket=Socket, transport=Transport,
|
|
|
streams=Streams0, children=Children0}, StreamID, Reason) ->
|
|
|
{value, #stream{state=StreamState, version=Version}, Streams}
|
|
|
= lists:keytake(StreamID, #stream.id, Streams0),
|
|
|
- State = case OutState of
|
|
|
+ State1 = case OutState of
|
|
|
wait ->
|
|
|
info(State0, StreamID, {response, 204, #{}, <<>>});
|
|
|
chunked when Version =:= 'HTTP/1.1' ->
|
|
@@ -927,6 +939,11 @@ stream_terminate(State0=#state{socket=Socket, transport=Transport,
|
|
|
_ -> %% done or Version =:= 'HTTP/1.0'
|
|
|
State0
|
|
|
end,
|
|
|
+ %% We reset the timeout if there are no active streams anymore.
|
|
|
+ State = case Streams of
|
|
|
+ [] -> set_timeout(State1);
|
|
|
+ _ -> State1
|
|
|
+ end,
|
|
|
|
|
|
stream_call_terminate(StreamID, Reason, StreamState),
|
|
|
%% @todo initiate children shutdown
|