Browse Source

Add accept_ack on all transports and ack_timeout transport option

Doing this in the connection process allows us to free acceptors
to start accepting more connections quicker, especially under load.
Loïc Hoguin 11 years ago
parent
commit
99242f3342

+ 1 - 1
manual/ranch_transport.md

@@ -17,7 +17,7 @@ Callbacks
 ---------
 ---------
 
 
 ### accept(LSocket, Timeout)
 ### accept(LSocket, Timeout)
-	-> {ok, CSocket} | {error, closed | timeout | atom() | tuple()}
+	-> {ok, CSocket} | {error, closed | timeout | atom()}
 
 
 > Types:
 > Types:
 >  *  LSocket = CSocket = any()
 >  *  LSocket = CSocket = any()

+ 3 - 1
src/ranch.erl

@@ -128,7 +128,9 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
 %% the protocol process before starting to use it.
 %% the protocol process before starting to use it.
 -spec accept_ack(ref()) -> ok.
 -spec accept_ack(ref()) -> ok.
 accept_ack(Ref) ->
 accept_ack(Ref) ->
-	receive {shoot, Ref} -> ok end.
+	receive {shoot, Ref, Transport, Socket, AckTimeout} ->
+		Transport:accept_ack(Socket, AckTimeout)
+	end.
 
 
 %% @doc Remove the calling process' connection from the pool.
 %% @doc Remove the calling process' connection from the pool.
 %%
 %%

+ 15 - 11
src/ranch_conns_sup.erl

@@ -20,12 +20,12 @@
 -module(ranch_conns_sup).
 -module(ranch_conns_sup).
 
 
 %% API.
 %% API.
--export([start_link/4]).
+-export([start_link/5]).
 -export([start_protocol/2]).
 -export([start_protocol/2]).
 -export([active_connections/1]).
 -export([active_connections/1]).
 
 
 %% Supervisor internals.
 %% Supervisor internals.
--export([init/5]).
+-export([init/6]).
 -export([system_continue/3]).
 -export([system_continue/3]).
 -export([system_terminate/4]).
 -export([system_terminate/4]).
 -export([system_code_change/4]).
 -export([system_code_change/4]).
@@ -39,15 +39,17 @@
 	transport = undefined :: module(),
 	transport = undefined :: module(),
 	protocol = undefined :: module(),
 	protocol = undefined :: module(),
 	opts :: any(),
 	opts :: any(),
-	max_conns = undefined :: non_neg_integer() | infinity
+	ack_timeout :: timeout(),
+	max_conns = undefined :: ranch:max_conns()
 }).
 }).
 
 
 %% API.
 %% API.
 
 
--spec start_link(ranch:ref(), conn_type(), module(), module()) -> {ok, pid()}.
-start_link(Ref, ConnType, Transport, Protocol) ->
+-spec start_link(ranch:ref(), conn_type(), module(), timeout(), module())
+	-> {ok, pid()}.
+start_link(Ref, ConnType, Transport, AckTimeout, Protocol) ->
 	proc_lib:start_link(?MODULE, init,
 	proc_lib:start_link(?MODULE, init,
-		[self(), Ref, ConnType, Transport, Protocol]).
+		[self(), Ref, ConnType, Transport, AckTimeout, Protocol]).
 
 
 %% We can safely assume we are on the same node as the supervisor.
 %% We can safely assume we are on the same node as the supervisor.
 %%
 %%
@@ -92,8 +94,9 @@ active_connections(SupPid) ->
 
 
 %% Supervisor internals.
 %% Supervisor internals.
 
 
--spec init(pid(), ranch:ref(), conn_type(), module(), module()) -> no_return().
-init(Parent, Ref, ConnType, Transport, Protocol) ->
+-spec init(pid(), ranch:ref(), conn_type(), module(), timeout(), module())
+	-> no_return().
+init(Parent, Ref, ConnType, Transport, AckTimeout, Protocol) ->
 	process_flag(trap_exit, true),
 	process_flag(trap_exit, true),
 	ok = ranch_server:set_connections_sup(Ref, self()),
 	ok = ranch_server:set_connections_sup(Ref, self()),
 	MaxConns = ranch_server:get_max_connections(Ref),
 	MaxConns = ranch_server:get_max_connections(Ref),
@@ -101,17 +104,18 @@ init(Parent, Ref, ConnType, Transport, Protocol) ->
 	ok = proc_lib:init_ack(Parent, {ok, self()}),
 	ok = proc_lib:init_ack(Parent, {ok, self()}),
 	loop(#state{parent=Parent, ref=Ref, conn_type=ConnType,
 	loop(#state{parent=Parent, ref=Ref, conn_type=ConnType,
 		transport=Transport, protocol=Protocol, opts=Opts,
 		transport=Transport, protocol=Protocol, opts=Opts,
-		max_conns=MaxConns}, 0, 0, []).
+		ack_timeout=AckTimeout, max_conns=MaxConns}, 0, 0, []).
 
 
 loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
 loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
 		transport=Transport, protocol=Protocol, opts=Opts,
 		transport=Transport, protocol=Protocol, opts=Opts,
-		max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->
+		ack_timeout=AckTimeout, max_conns=MaxConns},
+		CurConns, NbChildren, Sleepers) ->
 	receive
 	receive
 		{?MODULE, start_protocol, To, Socket} ->
 		{?MODULE, start_protocol, To, Socket} ->
 			case Protocol:start_link(Ref, Socket, Transport, Opts) of
 			case Protocol:start_link(Ref, Socket, Transport, Opts) of
 				{ok, Pid} ->
 				{ok, Pid} ->
 					Transport:controlling_process(Socket, Pid),
 					Transport:controlling_process(Socket, Pid),
-					Pid ! {shoot, Ref},
+					Pid ! {shoot, Ref, Transport, Socket, AckTimeout},
 					put(Pid, true),
 					put(Pid, true),
 					CurConns2 = CurConns + 1,
 					CurConns2 = CurConns + 1,
 					if CurConns2 < MaxConns ->
 					if CurConns2 < MaxConns ->

+ 2 - 1
src/ranch_listener_sup.erl

@@ -36,10 +36,11 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
 %% supervisor.
 %% supervisor.
 
 
 init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) ->
 init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) ->
+	AckTimeout = proplists:get_value(ack_timeout, TransOpts, 5000),
 	ConnType = proplists:get_value(connection_type, TransOpts, worker),
 	ConnType = proplists:get_value(connection_type, TransOpts, worker),
 	ChildSpecs = [
 	ChildSpecs = [
 		{ranch_conns_sup, {ranch_conns_sup, start_link,
 		{ranch_conns_sup, {ranch_conns_sup, start_link,
-				[Ref, ConnType, Transport, Protocol]},
+				[Ref, ConnType, Transport, AckTimeout, Protocol]},
 			permanent, infinity, supervisor, [ranch_conns_sup]},
 			permanent, infinity, supervisor, [ranch_conns_sup]},
 		{ranch_acceptors_sup, {ranch_acceptors_sup, start_link,
 		{ranch_acceptors_sup, {ranch_acceptors_sup, start_link,
 				[Ref, NbAcceptors, Transport, TransOpts]},
 				[Ref, NbAcceptors, Transport, TransOpts]},

+ 11 - 21
src/ranch_ssl.erl

@@ -30,6 +30,7 @@
 -export([messages/0]).
 -export([messages/0]).
 -export([listen/1]).
 -export([listen/1]).
 -export([accept/2]).
 -export([accept/2]).
+-export([accept_ack/2]).
 -export([connect/3]).
 -export([connect/3]).
 -export([recv/3]).
 -export([recv/3]).
 -export([send/2]).
 -export([send/2]).
@@ -164,13 +165,18 @@ listen(Opts) ->
 %% @see ssl:transport_accept/2
 %% @see ssl:transport_accept/2
 %% @see ssl:ssl_accept/2
 %% @see ssl:ssl_accept/2
 -spec accept(ssl:sslsocket(), timeout())
 -spec accept(ssl:sslsocket(), timeout())
-	-> {ok, ssl:sslsocket()} | {error, closed | timeout | atom() | tuple()}.
+	-> {ok, ssl:sslsocket()} | {error, closed | timeout | atom()}.
 accept(LSocket, Timeout) ->
 accept(LSocket, Timeout) ->
-	case ssl:transport_accept(LSocket, Timeout) of
-		{ok, CSocket} ->
-			ssl_accept(CSocket, Timeout);
+	ssl:transport_accept(LSocket, Timeout).
+
+-spec accept_ack(ssl:sslsocket(), timeout()) -> ok.
+accept_ack(CSocket, Timeout) ->
+	case ssl:ssl_accept(CSocket, Timeout) of
+		ok ->
+			ok;
 		{error, Reason} ->
 		{error, Reason} ->
-			{error, Reason}
+			ok = close(CSocket),
+			error(Reason)
 	end.
 	end.
 
 
 %% @private Experimental. Open a connection to the given host and port number.
 %% @private Experimental. Open a connection to the given host and port number.
@@ -263,22 +269,6 @@ close(Socket) ->
 
 
 %% Internal.
 %% Internal.
 
 
-%% This call always times out, either because a numeric timeout value
-%% was given, or because we've decided to use 5000ms instead of infinity.
-%% This value should be reasonable enough for the moment.
--spec ssl_accept(ssl:sslsocket(), timeout())
-	-> {ok, ssl:sslsocket()} | {error, {ssl_accept, atom()}}.
-ssl_accept(Socket, infinity) ->
-	ssl_accept(Socket, 5000);
-ssl_accept(Socket, Timeout) ->
-	case ssl:ssl_accept(Socket, Timeout) of
-		ok ->
-			{ok, Socket};
-		{error, Reason} ->
-			ok = close(Socket),
-			{error, {ssl_accept, Reason}}
-	end.
-
 %% Unfortunately the implementation of elliptic-curve ciphers that has
 %% Unfortunately the implementation of elliptic-curve ciphers that has
 %% been introduced in R16B01 is incomplete.  Depending on the particular
 %% been introduced in R16B01 is incomplete.  Depending on the particular
 %% client, this can cause the TLS handshake to break during key
 %% client, this can cause the TLS handshake to break during key

+ 5 - 0
src/ranch_tcp.erl

@@ -24,6 +24,7 @@
 -export([messages/0]).
 -export([messages/0]).
 -export([listen/1]).
 -export([listen/1]).
 -export([accept/2]).
 -export([accept/2]).
+-export([accept_ack/2]).
 -export([connect/3]).
 -export([connect/3]).
 -export([recv/3]).
 -export([recv/3]).
 -export([send/2]).
 -export([send/2]).
@@ -89,6 +90,10 @@ listen(Opts) ->
 accept(LSocket, Timeout) ->
 accept(LSocket, Timeout) ->
 	gen_tcp:accept(LSocket, Timeout).
 	gen_tcp:accept(LSocket, Timeout).
 
 
+-spec accept_ack(inet:socket(), timeout()) -> ok.
+accept_ack(_, _) ->
+	ok.
+
 %% @private Experimental. Open a connection to the given host and port number.
 %% @private Experimental. Open a connection to the given host and port number.
 %% @see gen_tcp:connect/3
 %% @see gen_tcp:connect/3
 %% @todo Probably filter Opts?
 %% @todo Probably filter Opts?

+ 4 - 1
src/ranch_transport.erl

@@ -46,7 +46,10 @@
 
 
 %% Accept connections with the given listening socket.
 %% Accept connections with the given listening socket.
 -callback accept(socket(), timeout())
 -callback accept(socket(), timeout())
-	-> {ok, socket()} | {error, closed | timeout | atom() | tuple()}.
+	-> {ok, socket()} | {error, closed | timeout | atom()}.
+
+%% Perform post-accept operations on the socket.
+-callback accept_ack(socket(), timeout()) -> ok.
 
 
 %% Experimental. Open a connection to the given host and port number.
 %% Experimental. Open a connection to the given host and port number.
 -callback connect(string(), inet:port_number(), opts())
 -callback connect(string(), inet:port_number(), opts())

+ 1 - 0
test/sendfile_SUITE.erl

@@ -305,6 +305,7 @@ sockets(Config) ->
 	end,
 	end,
 	_ = spawn_link(Fun),
 	_ = spawn_link(Fun),
 	{ok, Server} = Transport:accept(LSocket, 500),
 	{ok, Server} = Transport:accept(LSocket, 500),
+	ok = Transport:accept_ack(Server, 500),
 	receive
 	receive
 		{ok, Client} ->
 		{ok, Client} ->
 			ok = Transport:close(LSocket),
 			ok = Transport:close(LSocket),