Browse Source

Allow overriding cowboy_http's idle_timeout per request

This allows requests that expect to run longer to do so
without impacting the configuration of other requests.
Loïc Hoguin 6 years ago
parent
commit
1949357f0c
3 changed files with 106 additions and 7 deletions
  1. 22 4
      src/cowboy_http.erl
  2. 22 0
      test/handlers/set_options_h.erl
  3. 62 3
      test/http_SUITE.erl

+ 22 - 4
src/cowboy_http.erl

@@ -111,6 +111,9 @@
 	proxy_header :: undefined | ranch_proxy_header:proxy_info(),
 	proxy_header :: undefined | ranch_proxy_header:proxy_info(),
 	opts = #{} :: cowboy:opts(),
 	opts = #{} :: cowboy:opts(),
 
 
+	%% Some options may be overriden for the current stream.
+	overriden_opts = #{} :: cowboy:opts(),
+
 	%% Remote address and port for the connection.
 	%% Remote address and port for the connection.
 	peer = undefined :: {inet:ip_address(), inet:port_number()},
 	peer = undefined :: {inet:ip_address(), inet:port_number()},
 
 
@@ -244,13 +247,18 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
 
 
 %% We set request_timeout when there are no active streams,
 %% We set request_timeout when there are no active streams,
 %% and idle_timeout otherwise.
 %% and idle_timeout otherwise.
-set_timeout(State0=#state{opts=Opts, streams=Streams}) ->
+set_timeout(State0=#state{opts=Opts, overriden_opts=Override, streams=Streams}) ->
 	State = cancel_timeout(State0),
 	State = cancel_timeout(State0),
 	{Name, Default} = case Streams of
 	{Name, Default} = case Streams of
 		[] -> {request_timeout, 5000};
 		[] -> {request_timeout, 5000};
 		_ -> {idle_timeout, 60000}
 		_ -> {idle_timeout, 60000}
 	end,
 	end,
-	TimerRef = case maps:get(Name, Opts, Default) of
+	Timeout = case Override of
+		%% The timeout may have been overriden for the current stream.
+		#{Name := Timeout0} -> Timeout0;
+		_ -> maps:get(Name, Opts, Default)
+	end,
+	TimerRef = case Timeout of
 		infinity -> undefined;
 		infinity -> undefined;
 		Timeout -> erlang:start_timer(Timeout, self(), Name)
 		Timeout -> erlang:start_timer(Timeout, self(), Name)
 	end,
 	end,
@@ -1088,6 +1096,16 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
 	%% we need to let this module go entirely. Perhaps it should be handled directly in
 	%% we need to let this module go entirely. Perhaps it should be handled directly in
 	%% cowboy_clear/cowboy_tls?
 	%% cowboy_clear/cowboy_tls?
 	Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
 	Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
+%% Set options dynamically.
+commands(State0=#state{overriden_opts=Opts},
+		StreamID, [{set_options, SetOpts}|Tail]) ->
+	State = case SetOpts of
+		#{idle_timeout := IdleTimeout} ->
+			set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}});
+		_ ->
+			State0
+	end,
+	commands(State, StreamID, Tail);
 %% Stream shutdown.
 %% Stream shutdown.
 commands(State, StreamID, [stop|Tail]) ->
 commands(State, StreamID, [stop|Tail]) ->
 	%% @todo Do we want to run the commands after a stop?
 	%% @todo Do we want to run the commands after a stop?
@@ -1188,10 +1206,10 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
 		_ -> %% done or Version =:= 'HTTP/1.0'
 		_ -> %% done or Version =:= 'HTTP/1.0'
 			State0
 			State0
 	end,
 	end,
-	%% Remove the stream from the state.
+	%% Remove the stream from the state and reset the overriden options.
 	{value, #stream{state=StreamState}, Streams}
 	{value, #stream{state=StreamState}, Streams}
 		= lists:keytake(StreamID, #stream.id, Streams1),
 		= lists:keytake(StreamID, #stream.id, Streams1),
-	State2 = State1#state{streams=Streams},
+	State2 = State1#state{streams=Streams, overriden_opts=#{}},
 	%% Stop the stream.
 	%% Stop the stream.
 	stream_call_terminate(StreamID, Reason, StreamState, State2),
 	stream_call_terminate(StreamID, Reason, StreamState, State2),
 	Children = cowboy_children:shutdown(Children0, StreamID),
 	Children = cowboy_children:shutdown(Children0, StreamID),

+ 22 - 0
test/handlers/set_options_h.erl

@@ -0,0 +1,22 @@
+%% This module sets options dynamically and performs
+%% some related relevant operation for testing the change.
+
+-module(set_options_h).
+
+-export([init/2]).
+
+init(Req, State) ->
+	set_options(cowboy_req:binding(key, Req), Req, State).
+
+set_options(<<"idle_timeout_short">>, Req0, State) ->
+	%% @todo This should be replaced by a cowboy_req:cast/cowboy_stream:cast.
+	#{pid := Pid, streamid := StreamID} = Req0,
+	Pid ! {{Pid, StreamID}, {set_options, #{idle_timeout => 500}}},
+	{_, Body, Req} = cowboy_req:read_body(Req0),
+	{ok, cowboy_req:reply(200, #{}, Body, Req), State};
+set_options(<<"idle_timeout_long">>, Req0, State) ->
+	%% @todo This should be replaced by a cowboy_req:cast/cowboy_stream:cast.
+	#{pid := Pid, streamid := StreamID} = Req0,
+	Pid ! {{Pid, StreamID}, {set_options, #{idle_timeout => 60000}}},
+	{_, Body, Req} = cowboy_req:read_body(Req0),
+	{ok, cowboy_req:reply(200, #{}, Body, Req), State}.

+ 62 - 3
test/http_SUITE.erl

@@ -32,7 +32,8 @@ groups() -> [{clear, [parallel], ct_helper:all(?MODULE)}].
 init_routes(_) -> [
 init_routes(_) -> [
 	{"localhost", [
 	{"localhost", [
 		{"/", hello_h, []},
 		{"/", hello_h, []},
-		{"/echo/:key", echo_h, []}
+		{"/echo/:key", echo_h, []},
+		{"/set_options/:key", set_options_h, []}
 	]}
 	]}
 ].
 ].
 
 
@@ -65,11 +66,13 @@ idle_timeout_infinity(Config) ->
 	doc("Ensure the idle_timeout option accepts the infinity value."),
 	doc("Ensure the idle_timeout option accepts the infinity value."),
 	{ok, _} = cowboy:start_clear(name(), [{port, 0}], #{
 	{ok, _} = cowboy:start_clear(name(), [{port, 0}], #{
 		env => #{dispatch => cowboy_router:compile(init_routes(Config))},
 		env => #{dispatch => cowboy_router:compile(init_routes(Config))},
-		request_timeout => infinity
+		request_timeout => 500,
+		idle_timeout => infinity
 	}),
 	}),
 	Port = ranch:get_port(name()),
 	Port = ranch:get_port(name()),
 	ConnPid = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]),
 	ConnPid = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]),
-	_ = gun:post(ConnPid, "/echo/read_body", [], <<"TEST">>),
+	_ = gun:post(ConnPid, "/echo/read_body",
+		[{<<"content-type">>, <<"text/plain">>}]),
 	#{socket := Socket} = gun:info(ConnPid),
 	#{socket := Socket} = gun:info(ConnPid),
 	Pid = get_remote_pid_tcp(Socket),
 	Pid = get_remote_pid_tcp(Socket),
 	Ref = erlang:monitor(process, Pid),
 	Ref = erlang:monitor(process, Pid),
@@ -98,6 +101,62 @@ request_timeout_infinity(Config) ->
 		ok
 		ok
 	end.
 	end.
 
 
+set_options_idle_timeout(Config) ->
+	doc("Confirm that the idle_timeout option can be dynamically "
+		"set to change how long Cowboy will wait before it closes the connection."),
+	%% We start with a long timeout and then cut it short.
+	{ok, _} = cowboy:start_clear(name(), [{port, 0}], #{
+		env => #{dispatch => cowboy_router:compile(init_routes(Config))},
+		idle_timeout => 60000
+	}),
+	Port = ranch:get_port(name()),
+	ConnPid = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]),
+	_ = gun:post(ConnPid, "/set_options/idle_timeout_short",
+		[{<<"content-type">>, <<"text/plain">>}]),
+	#{socket := Socket} = gun:info(ConnPid),
+	Pid = get_remote_pid_tcp(Socket),
+	Ref = erlang:monitor(process, Pid),
+	receive
+		{'DOWN', Ref, process, Pid, _} ->
+			ok
+	after 2000 ->
+		error(timeout)
+	end.
+
+set_options_idle_timeout_only_applies_to_current_request(Config) ->
+	doc("Confirm that changes to the idle_timeout option only apply to the current stream."),
+	%% We start with a long timeout and then cut it short.
+	{ok, _} = cowboy:start_clear(name(), [{port, 0}], #{
+		env => #{dispatch => cowboy_router:compile(init_routes(Config))},
+		idle_timeout => 500
+	}),
+	Port = ranch:get_port(name()),
+	ConnPid = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]),
+	StreamRef = gun:post(ConnPid, "/set_options/idle_timeout_long",
+		[{<<"content-type">>, <<"text/plain">>}]),
+	#{socket := Socket} = gun:info(ConnPid),
+	Pid = get_remote_pid_tcp(Socket),
+	Ref = erlang:monitor(process, Pid),
+	receive
+		{'DOWN', Ref, process, Pid, Reason} ->
+			error(Reason)
+	after 2000 ->
+		ok
+	end,
+	%% Finish the first request and start a second one to confirm
+	%% the idle_timeout option is back to normal.
+	gun:data(ConnPid, StreamRef, fin, <<"Hello!">>),
+	{response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+	{ok, <<"Hello!">>} = gun:await_body(ConnPid, StreamRef),
+	_ = gun:post(ConnPid, "/echo/read_body",
+		[{<<"content-type">>, <<"text/plain">>}]),
+	receive
+		{'DOWN', Ref, process, Pid, _} ->
+			ok
+	after 2000 ->
+		error(timeout)
+	end.
+
 switch_protocol_flush(Config) ->
 switch_protocol_flush(Config) ->
 	doc("Confirm that switch_protocol does not flush unrelated messages."),
 	doc("Confirm that switch_protocol does not flush unrelated messages."),
 	ProtoOpts = #{
 	ProtoOpts = #{