Browse Source

Use active,N

This reduces the number of times we need to ask for more packets,
and as a result we get a fairly large boost in performance,
especially with HTTP/1.1.

Unfortunately this makes Cowboy require at least Erlang/OTP 21.3+
because the ssl application did not have active,N. For simplicity
the version required will be Erlang/OTP 22+.

In addition this change improves hibernate handling in
cowboy_websocket. Hibernate will now work for HTTP/2 transport
as well, and stray or unrelated messages will no longer cancel
hibernate (the process will handle the message and go back into
hibernation).

Thanks go to Stressgrid for benchmarking an early version of this
commit: https://stressgrid.com/blog/cowboy_performance_part_2/
Loïc Hoguin 5 years ago
parent
commit
db0d6f8d25

+ 2 - 2
Makefile

@@ -29,10 +29,10 @@ dep_gun = git https://github.com/ninenines/gun master
 dep_ci.erlang.mk = git https://github.com/ninenines/ci.erlang.mk master
 dep_ci.erlang.mk = git https://github.com/ninenines/ci.erlang.mk master
 DEP_EARLY_PLUGINS = ci.erlang.mk
 DEP_EARLY_PLUGINS = ci.erlang.mk
 
 
-AUTO_CI_OTP ?= OTP-LATEST-20+
+AUTO_CI_OTP ?= OTP-LATEST-22+
 AUTO_CI_HIPE ?= OTP-LATEST
 AUTO_CI_HIPE ?= OTP-LATEST
 # AUTO_CI_ERLLVM ?= OTP-LATEST
 # AUTO_CI_ERLLVM ?= OTP-LATEST
-AUTO_CI_WINDOWS ?= OTP-LATEST-20+
+AUTO_CI_WINDOWS ?= OTP-LATEST-22+
 
 
 # Standard targets.
 # Standard targets.
 
 

+ 1 - 1
doc/src/guide/introduction.asciidoc

@@ -35,7 +35,7 @@ guarantee that the experience will be safe and smooth. You are advised
 to perform the necessary testing and security audits prior to deploying
 to perform the necessary testing and security audits prior to deploying
 on other platforms.
 on other platforms.
 
 
-Cowboy is developed for Erlang/OTP 20.0 and newer.
+Cowboy is developed for Erlang/OTP 22.0 and newer.
 
 
 === License
 === License
 
 

+ 10 - 0
doc/src/manual/cowboy_http.asciidoc

@@ -17,6 +17,7 @@ as a Ranch protocol.
 [source,erlang]
 [source,erlang]
 ----
 ----
 opts() :: #{
 opts() :: #{
+    active_n                 => pos_integer(),
     chunked                  => boolean(),
     chunked                  => boolean(),
     connection_type          => worker | supervisor,
     connection_type          => worker | supervisor,
     http10_keepalive         => boolean(),
     http10_keepalive         => boolean(),
@@ -51,6 +52,14 @@ Ranch functions `ranch:get_protocol_options/1` and
 
 
 The default value is given next to the option name:
 The default value is given next to the option name:
 
 
+active_n (100)::
+
+The number of packets Cowboy will request from the socket at once.
+This can be used to tweak the performance of the server. Higher
+values reduce the number of times Cowboy need to request more
+packets from the port driver at the expense of potentially
+higher memory being used.
+
 chunked (true)::
 chunked (true)::
 
 
 Whether chunked transfer-encoding is enabled for HTTP/1.1 connections.
 Whether chunked transfer-encoding is enabled for HTTP/1.1 connections.
@@ -151,6 +160,7 @@ Ordered list of stream handlers that will handle all stream events.
 
 
 == Changelog
 == Changelog
 
 
+* *2.8*: The `active_n` option was added.
 * *2.7*: The `initial_stream_flow_size` and `logger` options were added.
 * *2.7*: The `initial_stream_flow_size` and `logger` options were added.
 * *2.6*: The `chunked`, `http10_keepalive`, `proxy_header` and `sendfile` options were added.
 * *2.6*: The `chunked`, `http10_keepalive`, `proxy_header` and `sendfile` options were added.
 * *2.5*: The `linger_timeout` option was added.
 * *2.5*: The `linger_timeout` option was added.

+ 10 - 0
doc/src/manual/cowboy_http2.asciidoc

@@ -17,6 +17,7 @@ as a Ranch protocol.
 [source,erlang]
 [source,erlang]
 ----
 ----
 opts() :: #{
 opts() :: #{
+    active_n                       => pos_integer(),
     connection_type                => worker | supervisor,
     connection_type                => worker | supervisor,
     connection_window_margin_size  => 0..16#7fffffff,
     connection_window_margin_size  => 0..16#7fffffff,
     connection_window_update_threshold => 0..16#7fffffff,
     connection_window_update_threshold => 0..16#7fffffff,
@@ -59,6 +60,14 @@ Ranch functions `ranch:get_protocol_options/1` and
 
 
 The default value is given next to the option name:
 The default value is given next to the option name:
 
 
+active_n (100)::
+
+The number of packets Cowboy will request from the socket at once.
+This can be used to tweak the performance of the server. Higher
+values reduce the number of times Cowboy need to request more
+packets from the port driver at the expense of potentially
+higher memory being used.
+
 connection_type (supervisor)::
 connection_type (supervisor)::
 
 
 Whether the connection process also acts as a supervisor.
 Whether the connection process also acts as a supervisor.
@@ -226,6 +235,7 @@ too many `WINDOW_UPDATE` frames.
 
 
 == Changelog
 == Changelog
 
 
+* *2.8*: The `active_n` option was added.
 * *2.7*: Add the options `connection_window_margin_size`,
 * *2.7*: Add the options `connection_window_margin_size`,
          `connection_window_update_threshold`,
          `connection_window_update_threshold`,
          `max_connection_window_size`, `max_stream_window_size`,
          `max_connection_window_size`, `max_stream_window_size`,

+ 12 - 0
doc/src/manual/cowboy_websocket.asciidoc

@@ -198,6 +198,7 @@ Cowboy does it automatically for you.
 [source,erlang]
 [source,erlang]
 ----
 ----
 opts() :: #{
 opts() :: #{
+    active_n       => pos_integer(),
     compress       => boolean(),
     compress       => boolean(),
     deflate_opts   => cow_ws:deflate_opts()
     deflate_opts   => cow_ws:deflate_opts()
     idle_timeout   => timeout(),
     idle_timeout   => timeout(),
@@ -221,6 +222,16 @@ init(Req, State) ->
 
 
 The default value is given next to the option name:
 The default value is given next to the option name:
 
 
+active_n (100)::
+
+The number of packets Cowboy will request from the socket at once.
+This can be used to tweak the performance of the server. Higher
+values reduce the number of times Cowboy need to request more
+packets from the port driver at the expense of potentially
+higher memory being used.
++
+This option does not apply to Websocket over HTTP/2.
+
 compress (false)::
 compress (false)::
 
 
 Whether to enable the Websocket frame compression
 Whether to enable the Websocket frame compression
@@ -274,6 +285,7 @@ normal circumstances if necessary.
 
 
 == Changelog
 == Changelog
 
 
+* *2.8*: The `active_n` option was added.
 * *2.7*: The commands based interface has been documented.
 * *2.7*: The commands based interface has been documented.
          The old interface is now deprecated.
          The old interface is now deprecated.
 * *2.7*: The command `shutdown_reason` was introduced.
 * *2.7*: The command `shutdown_reason` was introduced.

+ 78 - 39
src/cowboy_http.erl

@@ -21,6 +21,7 @@
 -export([system_code_change/4]).
 -export([system_code_change/4]).
 
 
 -type opts() :: #{
 -type opts() :: #{
+	active_n => pos_integer(),
 	chunked => boolean(),
 	chunked => boolean(),
 	compress_buffering => boolean(),
 	compress_buffering => boolean(),
 	compress_threshold => non_neg_integer(),
 	compress_threshold => non_neg_integer(),
@@ -121,6 +122,9 @@
 
 
 	timer = undefined :: undefined | reference(),
 	timer = undefined :: undefined | reference(),
 
 
+	%% Whether we are currently receiving data from the socket.
+	active = true :: boolean(),
+
 	%% Identifier for the stream currently being read (or waiting to be received).
 	%% Identifier for the stream currently being read (or waiting to be received).
 	in_streamid = 1 :: pos_integer(),
 	in_streamid = 1 :: pos_integer(),
 
 
@@ -173,7 +177,8 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
 				transport=Transport, proxy_header=ProxyHeader, opts=Opts,
 				transport=Transport, proxy_header=ProxyHeader, opts=Opts,
 				peer=Peer, sock=Sock, cert=Cert,
 				peer=Peer, sock=Sock, cert=Cert,
 				last_streamid=maps:get(max_keepalive, Opts, 100)},
 				last_streamid=maps:get(max_keepalive, Opts, 100)},
-			before_loop(set_timeout(State, request_timeout));
+			setopts_active(State),
+			loop(set_timeout(State, request_timeout));
 		{{error, Reason}, _, _} ->
 		{{error, Reason}, _, _} ->
 			terminate(undefined, {socket_error, Reason,
 			terminate(undefined, {socket_error, Reason,
 				'A socket error occurred when retrieving the peer name.'});
 				'A socket error occurred when retrieving the peer name.'});
@@ -185,12 +190,29 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
 				'A socket error occurred when retrieving the client TLS certificate.'})
 				'A socket error occurred when retrieving the client TLS certificate.'})
 	end.
 	end.
 
 
-%% Do not read from the socket unless flow is large enough.
-before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
-	loop(State);
-before_loop(State=#state{socket=Socket, transport=Transport}) ->
-	Transport:setopts(Socket, [{active, once}]),
-	loop(State).
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+	N = maps:get(active_n, Opts, 100),
+	Transport:setopts(Socket, [{active, N}]).
+
+active(State) ->
+	setopts_active(State),
+	State#state{active=true}.
+
+passive(State=#state{socket=Socket, transport=Transport}) ->
+	Transport:setopts(Socket, [{active, false}]),
+	Messages = Transport:messages(),
+	flush_passive(Socket, Messages),
+	State#state{active=false}.
+
+flush_passive(Socket, Messages) ->
+	receive
+		{Passive, Socket} when Passive =:= element(4, Messages);
+				%% Hardcoded for compatibility with Ranch 1.x.
+				Passive =:= tcp_passive; Passive =:= ssl_passive ->
+			flush_passive(Socket, Messages)
+	after 0 ->
+		ok
+	end.
 
 
 loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
 loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
 		buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
 		buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
@@ -201,7 +223,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
 		%% Discard data coming in after the last request
 		%% Discard data coming in after the last request
 		%% we want to process was received fully.
 		%% we want to process was received fully.
 		{OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
 		{OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
-			before_loop(State);
+			loop(State);
 		%% Socket messages.
 		%% Socket messages.
 		{OK, Socket, Data} when OK =:= element(1, Messages) ->
 		{OK, Socket, Data} when OK =:= element(1, Messages) ->
 			parse(<< Buffer/binary, Data/binary >>, State);
 			parse(<< Buffer/binary, Data/binary >>, State);
@@ -209,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
 			terminate(State, {socket_error, closed, 'The socket has been closed.'});
 			terminate(State, {socket_error, closed, 'The socket has been closed.'});
 		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
 		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
 			terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
 			terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+		{Passive, Socket} when Passive =:= element(4, Messages);
+				%% Hardcoded for compatibility with Ranch 1.x.
+				Passive =:= tcp_passive; Passive =:= ssl_passive ->
+			setopts_active(State),
+			loop(State);
 		%% Timeouts.
 		%% Timeouts.
 		{timeout, Ref, {shutdown, Pid}} ->
 		{timeout, Ref, {shutdown, Pid}} ->
 			cowboy_children:shutdown_timeout(Children, Ref, Pid),
 			cowboy_children:shutdown_timeout(Children, Ref, Pid),
@@ -297,12 +324,12 @@ timeout(State, idle_timeout) ->
 		'Connection idle longer than configuration allows.'}).
 		'Connection idle longer than configuration allows.'}).
 
 
 parse(<<>>, State) ->
 parse(<<>>, State) ->
-	before_loop(State#state{buffer= <<>>});
+	loop(State#state{buffer= <<>>});
 %% Do not process requests that come in after the last request
 %% Do not process requests that come in after the last request
 %% and discard the buffer if any to save memory.
 %% and discard the buffer if any to save memory.
 parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
 parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
 		last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
 		last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
-	before_loop(State#state{buffer= <<>>});
+	loop(State#state{buffer= <<>>});
 parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
 parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
 	after_parse(parse_request(Buffer, State, EmptyLines));
 	after_parse(parse_request(Buffer, State, EmptyLines));
 parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
 parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
@@ -364,17 +391,26 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer
 %% No corresponding stream. We must skip the body of the previous request
 %% No corresponding stream. We must skip the body of the previous request
 %% in order to process the next one.
 %% in order to process the next one.
 after_parse({data, _, IsFin, _, State}) ->
 after_parse({data, _, IsFin, _, State}) ->
-	before_loop(set_timeout(State, case IsFin of
+	loop(set_timeout(State, case IsFin of
 		fin -> request_timeout;
 		fin -> request_timeout;
 		nofin -> idle_timeout
 		nofin -> idle_timeout
 	end));
 	end));
 after_parse({more, State}) ->
 after_parse({more, State}) ->
-	before_loop(set_timeout(State, idle_timeout)).
+	loop(set_timeout(State, idle_timeout)).
 
 
 update_flow(fin, _, State) ->
 update_flow(fin, _, State) ->
+	%% This function is only called after parsing, therefore we
+	%% are expecting to be in active mode already.
 	State#state{flow=infinity};
 	State#state{flow=infinity};
-update_flow(nofin, Data, State=#state{flow=Flow0}) ->
-	State#state{flow=Flow0 - byte_size(Data)}.
+update_flow(nofin, Data, State0=#state{flow=Flow0}) ->
+	Flow = Flow0 - byte_size(Data),
+	State = State0#state{flow=Flow},
+	if
+		Flow0 > 0, Flow =< 0 ->
+			passive(State);
+		true ->
+			State
+	end.
 
 
 %% Request-line.
 %% Request-line.
 
 
@@ -935,8 +971,7 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
 		Stream#stream{queue=Queue ++ Commands}),
 		Stream#stream{queue=Queue ++ Commands}),
 	State#state{streams=Streams};
 	State#state{streams=Streams};
 %% Read the request body.
 %% Read the request body.
-commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
-		[{flow, Size}|Tail]) ->
+commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) ->
 	%% We must read *at least* Size of data otherwise functions
 	%% We must read *at least* Size of data otherwise functions
 	%% like cowboy_req:read_body/1,2 will wait indefinitely.
 	%% like cowboy_req:read_body/1,2 will wait indefinitely.
 	Flow = if
 	Flow = if
@@ -944,11 +979,11 @@ commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
 		true -> Flow0 + Size
 		true -> Flow0 + Size
 	end,
 	end,
 	%% Reenable active mode if necessary.
 	%% Reenable active mode if necessary.
-	_ = if
+	State = if
 		Flow0 =< 0, Flow > 0 ->
 		Flow0 =< 0, Flow > 0 ->
-			Transport:setopts(Socket, [{active, once}]);
+			active(State0);
 		true ->
 		true ->
-			ok
+			State0
 	end,
 	end,
 	commands(State#state{flow=Flow}, StreamID, Tail);
 	commands(State#state{flow=Flow}, StreamID, Tail);
 %% Error responses are sent only if a response wasn't sent already.
 %% Error responses are sent only if a response wasn't sent already.
@@ -1118,14 +1153,14 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
 		out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
 		out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
 		[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
 		[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
 	%% @todo If there's streams opened after this one, fail instead of 101.
 	%% @todo If there's streams opened after this one, fail instead of 101.
-	State = cancel_timeout(State0),
+	State1 = cancel_timeout(State0),
 	%% Before we send the 101 response we need to stop receiving data
 	%% Before we send the 101 response we need to stop receiving data
 	%% from the socket, otherwise the data might be receive before the
 	%% from the socket, otherwise the data might be receive before the
 	%% call to flush/0 and we end up inadvertently dropping a packet.
 	%% call to flush/0 and we end up inadvertently dropping a packet.
 	%%
 	%%
 	%% @todo Handle cases where the request came with a body. We need
 	%% @todo Handle cases where the request came with a body. We need
 	%% to process or skip the body before the upgrade can be completed.
 	%% to process or skip the body before the upgrade can be completed.
-	Transport:setopts(Socket, [{active, false}]),
+	State = passive(State1),
 	%% Send a 101 response if necessary, then terminate the stream.
 	%% Send a 101 response if necessary, then terminate the stream.
 	#state{streams=Streams} = case OutState of
 	#state{streams=Streams} = case OutState of
 		wait -> info(State, StreamID, {inform, 101, Headers});
 		wait -> info(State, StreamID, {inform, 101, Headers});
@@ -1415,37 +1450,41 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
 				0 ->
 				0 ->
 					ok;
 					ok;
 				infinity ->
 				infinity ->
-					terminate_linger_loop(State, undefined);
+					terminate_linger_before_loop(State, undefined, Transport:messages());
 				Timeout ->
 				Timeout ->
 					TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
 					TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
-					terminate_linger_loop(State, TimerRef)
+					terminate_linger_before_loop(State, TimerRef, Transport:messages())
 			end;
 			end;
 		{error, _} ->
 		{error, _} ->
 			ok
 			ok
 	end.
 	end.
 
 
-terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
-	Messages = Transport:messages(),
-	%% We may already have a message in the mailbox when we do this
+terminate_linger_before_loop(State, TimerRef, Messages) ->
+	%% We may already be in active mode when we do this
 	%% but it's OK because we are shutting down anyway.
 	%% but it's OK because we are shutting down anyway.
-	case Transport:setopts(Socket, [{active, once}]) of
+	case setopts_active(State) of
 		ok ->
 		ok ->
-			receive
-				{OK, Socket, _} when OK =:= element(1, Messages) ->
-					terminate_linger_loop(State, TimerRef);
-				{Closed, Socket} when Closed =:= element(2, Messages) ->
-					ok;
-				{Error, Socket, _} when Error =:= element(3, Messages) ->
-					ok;
-				{timeout, TimerRef, linger_timeout} ->
-					ok;
-				_ ->
-					terminate_linger_loop(State, TimerRef)
-			end;
+			terminate_linger_loop(State, TimerRef, Messages);
 		{error, _} ->
 		{error, _} ->
 			ok
 			ok
 	end.
 	end.
 
 
+terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
+	receive
+		{OK, Socket, _} when OK =:= element(1, Messages) ->
+			terminate_linger_loop(State, TimerRef, Messages);
+		{Closed, Socket} when Closed =:= element(2, Messages) ->
+			ok;
+		{Error, Socket, _} when Error =:= element(3, Messages) ->
+			ok;
+		{Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
+			terminate_linger_before_loop(State, TimerRef, Messages);
+		{timeout, TimerRef, linger_timeout} ->
+			ok;
+		_ ->
+			terminate_linger_loop(State, TimerRef, Messages)
+	end.
+
 %% System callbacks.
 %% System callbacks.
 
 
 -spec system_continue(_, _, #state{}) -> ok.
 -spec system_continue(_, _, #state{}) -> ok.

+ 15 - 2
src/cowboy_http2.erl

@@ -23,6 +23,7 @@
 -export([system_code_change/4]).
 -export([system_code_change/4]).
 
 
 -type opts() :: #{
 -type opts() :: #{
+	active_n => pos_integer(),
 	compress_buffering => boolean(),
 	compress_buffering => boolean(),
 	compress_threshold => non_neg_integer(),
 	compress_threshold => non_neg_integer(),
 	connection_type => worker | supervisor,
 	connection_type => worker | supervisor,
@@ -163,6 +164,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
 		opts=Opts, peer=Peer, sock=Sock, cert=Cert,
 		opts=Opts, peer=Peer, sock=Sock, cert=Cert,
 		http2_status=sequence, http2_machine=HTTP2Machine})),
 		http2_status=sequence, http2_machine=HTTP2Machine})),
 	Transport:send(Socket, Preface),
 	Transport:send(Socket, Preface),
+	setopts_active(State),
 	case Buffer of
 	case Buffer of
 		<<>> -> loop(State, Buffer);
 		<<>> -> loop(State, Buffer);
 		_ -> parse(State, Buffer)
 		_ -> parse(State, Buffer)
@@ -204,15 +206,21 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
 	}, ?MODULE, undefined}), %% @todo undefined or #{}?
 	}, ?MODULE, undefined}), %% @todo undefined or #{}?
 	State = set_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
 	State = set_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
 	Transport:send(Socket, Preface),
 	Transport:send(Socket, Preface),
+	setopts_active(State),
 	case Buffer of
 	case Buffer of
 		<<>> -> loop(State, Buffer);
 		<<>> -> loop(State, Buffer);
 		_ -> parse(State, Buffer)
 		_ -> parse(State, Buffer)
 	end.
 	end.
 
 
+%% Because HTTP/2 has flow control and Cowboy has other rate limiting
+%% mechanisms implemented, a very large active_n value should be fine,
+%% as long as the stream handlers do their work in a timely manner.
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+	N = maps:get(active_n, Opts, 100),
+	Transport:setopts(Socket, [{active, N}]).
+
 loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
 loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
 		opts=Opts, timer=TimerRef, children=Children}, Buffer) ->
 		opts=Opts, timer=TimerRef, children=Children}, Buffer) ->
-	%% @todo This should only be called when data was read.
-	Transport:setopts(Socket, [{active, once}]),
 	Messages = Transport:messages(),
 	Messages = Transport:messages(),
 	InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
 	InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
 	receive
 	receive
@@ -223,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
 			terminate(State, {socket_error, closed, 'The socket has been closed.'});
 			terminate(State, {socket_error, closed, 'The socket has been closed.'});
 		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
 		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
 			terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
 			terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+		{Passive, Socket} when Passive =:= element(4, Messages);
+				%% Hardcoded for compatibility with Ranch 1.x.
+				Passive =:= tcp_passive; Passive =:= ssl_passive ->
+			setopts_active(State),
+			loop(State, Buffer);
 		%% System messages.
 		%% System messages.
 		{'EXIT', Parent, Reason} ->
 		{'EXIT', Parent, Reason} ->
 			%% @todo Graceful shutdown here as well?
 			%% @todo Graceful shutdown here as well?

+ 1 - 0
src/cowboy_stream_h.erl

@@ -92,6 +92,7 @@ data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
 	send_request_body(Pid, Ref, IsFin, BodyLen, Data),
 	send_request_body(Pid, Ref, IsFin, BodyLen, Data),
 	do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
 	do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
 		read_body_ref=undefined,
 		read_body_ref=undefined,
+		%% @todo This is wrong, it's missing byte_size(Data).
 		body_length=BodyLen
 		body_length=BodyLen
 	});
 	});
 %% Stream is waiting for data but we didn't receive enough to send yet.
 %% Stream is waiting for data but we didn't receive enough to send yet.

+ 79 - 21
src/cowboy_websocket.erl

@@ -66,6 +66,7 @@
 -optional_callbacks([terminate/3]).
 -optional_callbacks([terminate/3]).
 
 
 -type opts() :: #{
 -type opts() :: #{
+	active_n => pos_integer(),
 	compress => boolean(),
 	compress => boolean(),
 	deflate_opts => cow_ws:deflate_opts(),
 	deflate_opts => cow_ws:deflate_opts(),
 	idle_timeout => timeout(),
 	idle_timeout => timeout(),
@@ -85,7 +86,8 @@
 	handler :: module(),
 	handler :: module(),
 	key = undefined :: undefined | binary(),
 	key = undefined :: undefined | binary(),
 	timeout_ref = undefined :: undefined | reference(),
 	timeout_ref = undefined :: undefined | reference(),
-	messages = undefined :: undefined | {atom(), atom(), atom()},
+	messages = undefined :: undefined | {atom(), atom(), atom()}
+		| {atom(), atom(), atom(), atom()},
 	hibernate = false :: boolean(),
 	hibernate = false :: boolean(),
 	frag_state = undefined :: cow_ws:frag_state(),
 	frag_state = undefined :: cow_ws:frag_state(),
 	frag_buffer = <<>> :: binary(),
 	frag_buffer = <<>> :: binary(),
@@ -300,28 +302,71 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
 	%% we still want to process that data if any.
 	%% we still want to process that data if any.
 	case erlang:function_exported(Handler, websocket_init, 1) of
 	case erlang:function_exported(Handler, websocket_init, 1) of
 		true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer},
 		true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer},
-			websocket_init, undefined, fun parse_header/3);
-		false -> parse_header(State, HandlerState, #ps_header{buffer=Buffer})
+			websocket_init, undefined, fun after_init/3);
+		false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
 	end.
 	end.
 
 
-before_loop(State=#state{active=false}, HandlerState, ParseState) ->
-	loop(State, HandlerState, ParseState);
-%% @todo We probably shouldn't do the setopts if we have not received a socket message.
-%% @todo We need to hibernate when HTTP/2 is used too.
-before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined},
-		HandlerState, ParseState) ->
+after_init(State=#state{active=true}, HandlerState, ParseState) ->
+	%% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
+	%% We must do this only after calling websocket_init/1 (if any)
+	%% to give the handler a chance to disable active mode immediately.
+	setopts_active(State),
+	maybe_read_body(State),
+	parse_header(State, HandlerState, ParseState);
+after_init(State, HandlerState, ParseState) ->
+	parse_header(State, HandlerState, ParseState).
+
+%% We have two ways of reading the body for Websocket. For HTTP/1.1
+%% we have full control of the socket and can therefore use active,N.
+%% For HTTP/2 we are just a stream, and are instead using read_body
+%% (automatic mode). Technically HTTP/2 will only go passive after
+%% receiving the next data message, while HTTP/1.1 goes passive
+%% immediately but there might still be data to be processed in
+%% the message queue.
+
+setopts_active(#state{transport=undefined}) ->
+	ok;
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+	N = maps:get(active_n, Opts, 100),
+	Transport:setopts(Socket, [{active, N}]).
+
+maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
 	%% @todo Keep Ref around.
 	%% @todo Keep Ref around.
 	ReadBodyRef = make_ref(),
 	ReadBodyRef = make_ref(),
 	Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
 	Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
-	loop(State, HandlerState, ParseState);
-before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true},
-		HandlerState, ParseState) ->
-	Transport:setopts(Socket, [{active, once}]),
+	ok;
+maybe_read_body(_) ->
+	ok.
+
+active(State) ->
+	setopts_active(State),
+	maybe_read_body(State),
+	State#state{active=true}.
+
+passive(State=#state{transport=undefined}) ->
+	%% Unfortunately we cannot currently cancel read_body.
+	%% But that's OK, we will just stop reading the body
+	%% after the next message.
+	State#state{active=false};
+passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
+	Transport:setopts(Socket, [{active, false}]),
+	flush_passive(Socket, Messages),
+	State#state{active=false}.
+
+flush_passive(Socket, Messages) ->
+	receive
+		{Passive, Socket} when Passive =:= element(4, Messages);
+				%% Hardcoded for compatibility with Ranch 1.x.
+				Passive =:= tcp_passive; Passive =:= ssl_passive ->
+			flush_passive(Socket, Messages)
+	after 0 ->
+		ok
+	end.
+
+before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
 	proc_lib:hibernate(?MODULE, loop,
 	proc_lib:hibernate(?MODULE, loop,
 		[State#state{hibernate=false}, HandlerState, ParseState]);
 		[State#state{hibernate=false}, HandlerState, ParseState]);
-before_loop(State=#state{socket=Socket, transport=Transport},
-		HandlerState, ParseState) ->
-	Transport:setopts(Socket, [{active, once}]),
+before_loop(State, HandlerState, ParseState) ->
 	loop(State, HandlerState, ParseState).
 	loop(State, HandlerState, ParseState).
 
 
 -spec loop_timeout(#state{}) -> #state{}.
 -spec loop_timeout(#state{}) -> #state{}.
@@ -350,22 +395,28 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
 			terminate(State, HandlerState, {error, closed});
 			terminate(State, HandlerState, {error, closed});
 		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
 		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
 			terminate(State, HandlerState, {error, Reason});
 			terminate(State, HandlerState, {error, Reason});
+		{Passive, Socket} when Passive =:= element(4, Messages);
+				%% Hardcoded for compatibility with Ranch 1.x.
+				Passive =:= tcp_passive; Passive =:= ssl_passive ->
+			setopts_active(State),
+			loop(State, HandlerState, ParseState);
 		%% Body reading messages. (HTTP/2)
 		%% Body reading messages. (HTTP/2)
 		{request_body, _Ref, nofin, Data} ->
 		{request_body, _Ref, nofin, Data} ->
+			maybe_read_body(State),
 			State2 = loop_timeout(State),
 			State2 = loop_timeout(State),
 			parse(State2, HandlerState, ParseState, Data);
 			parse(State2, HandlerState, ParseState, Data);
 		%% @todo We need to handle this case as if it was an {error, closed}
 		%% @todo We need to handle this case as if it was an {error, closed}
 		%% but not before we finish processing frames. We probably should have
 		%% but not before we finish processing frames. We probably should have
 		%% a check in before_loop to let us stop looping if a flag is set.
 		%% a check in before_loop to let us stop looping if a flag is set.
 		{request_body, _Ref, fin, _, Data} ->
 		{request_body, _Ref, fin, _, Data} ->
+			maybe_read_body(State),
 			State2 = loop_timeout(State),
 			State2 = loop_timeout(State),
 			parse(State2, HandlerState, ParseState, Data);
 			parse(State2, HandlerState, ParseState, Data);
 		%% Timeouts.
 		%% Timeouts.
 		{timeout, TRef, ?MODULE} ->
 		{timeout, TRef, ?MODULE} ->
 			websocket_close(State, HandlerState, timeout);
 			websocket_close(State, HandlerState, timeout);
 		{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
 		{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
-			%% @todo This should call before_loop.
-			loop(State, HandlerState, ParseState);
+			before_loop(State, HandlerState, ParseState);
 		%% System messages.
 		%% System messages.
 		{'EXIT', Parent, Reason} ->
 		{'EXIT', Parent, Reason} ->
 			%% @todo We should exit gracefully.
 			%% @todo We should exit gracefully.
@@ -376,8 +427,7 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
 		%% Calls from supervisor module.
 		%% Calls from supervisor module.
 		{'$gen_call', From, Call} ->
 		{'$gen_call', From, Call} ->
 			cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
 			cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
-			%% @todo This should call before_loop.
-			loop(State, HandlerState, ParseState);
+			before_loop(State, HandlerState, ParseState);
 		Message ->
 		Message ->
 			handler_call(State, HandlerState, ParseState,
 			handler_call(State, HandlerState, ParseState,
 				websocket_info, Message, fun before_loop/3)
 				websocket_info, Message, fun before_loop/3)
@@ -531,7 +581,15 @@ commands([], State, []) ->
 commands([], State, Data) ->
 commands([], State, Data) ->
 	Result = transport_send(State, nofin, lists:reverse(Data)),
 	Result = transport_send(State, nofin, lists:reverse(Data)),
 	{Result, State};
 	{Result, State};
-commands([{active, Active}|Tail], State, Data) when is_boolean(Active) ->
+commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) ->
+	State = if
+		Active, not Active0 ->
+			active(State0);
+		Active0, not Active ->
+			passive(State0);
+		true ->
+			State0
+	end,
 	commands(Tail, State#state{active=Active}, Data);
 	commands(Tail, State#state{active=Active}, Data);
 commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
 commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
 	commands(Tail, State#state{deflate=Deflate}, Data);
 	commands(Tail, State#state{deflate=Deflate}, Data);