Browse Source

Add experimental num_listen_sockets option

juhlig 6 years ago
parent
commit
4abd27adb5
3 changed files with 138 additions and 13 deletions
  1. 1 0
      src/ranch.erl
  2. 35 11
      src/ranch_acceptors_sup.erl
  3. 102 2
      test/acceptor_SUITE.erl

+ 1 - 0
src/ranch.erl

@@ -53,6 +53,7 @@
 	logger => module(),
 	num_acceptors => pos_integer(),
 	num_conns_sups => pos_integer(),
+	num_listen_sockets => pos_integer(),
 	shutdown => timeout() | brutal_kill,
 	socket_opts => any()
 }.

+ 35 - 11
src/ranch_acceptors_sup.erl

@@ -26,27 +26,51 @@ start_link(Ref, NumAcceptors, Transport) ->
 init([Ref, NumAcceptors, Transport]) ->
 	TransOpts = ranch_server:get_transport_options(Ref),
 	Logger = maps:get(logger, TransOpts, error_logger),
+	NumListenSockets = maps:get(num_listen_sockets, TransOpts, 1),
 	SocketOpts = maps:get(socket_opts, TransOpts, []),
 	%% We temporarily put the logger in the process dictionary
 	%% so that it can be used from ranch:filter_options. The
 	%% interface as it currently is does not allow passing it
 	%% down otherwise.
 	put(logger, Logger),
-	LSocket = case Transport:listen(SocketOpts) of
+	LSockets = start_listen_sockets(Ref, NumListenSockets, Transport, SocketOpts, Logger),
+	erase(logger),
+	Procs = [begin
+		LSocketId = (AcceptorId rem NumListenSockets) + 1,
+		{_, LSocket} = lists:keyfind(LSocketId, 1, LSockets),
+		{
+			{acceptor, self(), AcceptorId},
+			{ranch_acceptor, start_link, [Ref, AcceptorId, LSocket, Transport, Logger]},
+			permanent, brutal_kill, worker, []
+		}
+	end || AcceptorId <- lists:seq(1, NumAcceptors)],
+	{ok, {{one_for_one, 1, 5}, Procs}}.
+
+-spec start_listen_sockets(any(), pos_integer(), module(), list(), module())
+	-> [{pos_integer(), inet:socket()}].
+start_listen_sockets(Ref, NumListenSockets, Transport, SocketOpts0, Logger) when NumListenSockets > 0 ->
+	BaseSocket = start_listen_socket(Ref, Transport, SocketOpts0, Logger),
+	{ok, Addr={_, Port}} = Transport:sockname(BaseSocket),
+	ranch_server:set_addr(Ref, Addr),
+	SocketOpts = case lists:keyfind(port, 1, SocketOpts0) of
+		{port, Port} ->
+			SocketOpts0;
+		_ ->
+			[{port, Port}|lists:keydelete(port, 1, SocketOpts0)]
+	end,
+	ExtraSockets = [
+		{N, start_listen_socket(Ref, Transport, SocketOpts, Logger)}
+	|| N <- lists:seq(2, NumListenSockets)],
+	[{1, BaseSocket}|ExtraSockets].
+
+-spec start_listen_socket(any(), module(), list(), module()) -> inet:socket().
+start_listen_socket(Ref, Transport, SocketOpts, Logger) ->
+	case Transport:listen(SocketOpts) of
 		{ok, Socket} ->
-			erase(logger),
 			Socket;
 		{error, Reason} ->
 			listen_error(Ref, Transport, SocketOpts, Reason, Logger)
-	end,
-	{ok, Addr} = Transport:sockname(LSocket),
-	ranch_server:set_addr(Ref, Addr),
-	Procs = [
-		{{acceptor, self(), N}, {ranch_acceptor, start_link, [
-			Ref, N, LSocket, Transport, Logger
-		]}, permanent, brutal_kill, worker, []}
-			|| N <- lists:seq(1, NumAcceptors)],
-	{ok, {{one_for_one, 1, 5}, Procs}}.
+	end.
 
 -spec listen_error(any(), module(), any(), atom(), module()) -> no_return().
 listen_error(Ref, Transport, SocketOpts0, Reason, Logger) ->

+ 102 - 2
test/acceptor_SUITE.erl

@@ -41,6 +41,8 @@ groups() ->
 		tcp_getopts_capability,
 		tcp_getstat_capability,
 		tcp_upgrade,
+		tcp_10_acceptors_10_listen_sockets,
+		tcp_many_listen_sockets_no_reuseport,
 		tcp_error_eaddrinuse,
 		tcp_error_eacces
 	]}, {ssl, [
@@ -53,6 +55,8 @@ groups() ->
 		ssl_upgrade_from_tcp,
 		ssl_getopts_capability,
 		ssl_getstat_capability,
+		ssl_10_acceptors_10_listen_sockets,
+		ssl_many_listen_sockets_no_reuseport,
 		ssl_error_eaddrinuse,
 		ssl_error_no_cert,
 		ssl_error_eacces
@@ -399,6 +403,50 @@ ssl_accept_error(_) ->
 	true = is_process_alive(AcceptorPid),
 	ok = ranch:stop_listener(Name).
 
+ssl_10_acceptors_10_listen_sockets(_) ->
+	case do_os_supports_reuseport() of
+		true ->
+			ok = do_ssl_10_acceptors_10_listen_sockets();
+		false ->
+			{skip, "No SO_REUSEPORT support."}
+	end.
+
+do_ssl_10_acceptors_10_listen_sockets() ->
+	doc("Ensure that we can use 10 listen sockets across 10 acceptors with SSL."),
+	Name = name(),
+	Opts = ct_helper:get_certs_from_ets(),
+	{ok, ListenerSupPid} = ranch:start_listener(Name,
+		ranch_ssl, #{
+			num_acceptors => 10,
+			num_listen_sockets => 10,
+			socket_opts => [{raw, 1, 15, <<1:32/native>>}|Opts]},
+		echo_protocol, []),
+	10 = length(do_get_listener_sockets(ListenerSupPid)),
+	ok = ranch:stop_listener(Name),
+	{'EXIT', _} = begin catch ranch:get_port(Name) end,
+	ok.
+
+ssl_many_listen_sockets_no_reuseport(_) ->
+	case do_os_supports_reuseport() of
+		true ->
+			ok = do_ssl_many_listen_sockets_no_reuseport();
+		false ->
+			{skip, "No SO_REUSEPORT support."}
+	end.
+
+do_ssl_many_listen_sockets_no_reuseport() ->
+	doc("Confirm that ranch:start_listener/5 fails when SO_REUSEPORT is not available with SSL."),
+	Name = name(),
+	Opts = ct_helper:get_certs_from_ets(),
+	{error, eaddrinuse} = ranch:start_listener(Name,
+		ranch_ssl, #{
+			num_acceptors => 10,
+			num_listen_sockets => 10,
+			socket_opts => [{raw, 1, 15, <<0:32/native>>}|Opts]},
+		echo_protocol, []),
+	{'EXIT', _} = begin catch ranch:get_port(Name) end,
+	ok.
+
 ssl_active_echo(_) ->
 	doc("Ensure that active mode works with SSL transport."),
 	Name = name(),
@@ -622,6 +670,48 @@ ssl_error_eacces(_) ->
 
 %% tcp.
 
+tcp_10_acceptors_10_listen_sockets(_) ->
+	case do_os_supports_reuseport() of
+		true ->
+			ok = do_tcp_10_acceptors_10_listen_sockets();
+		false ->
+			{skip, "No SO_REUSEPORT support."}
+	end.
+
+do_tcp_10_acceptors_10_listen_sockets() ->
+	doc("Ensure that we can use 10 listen sockets across 10 acceptors with TCP."),
+	Name = name(),
+	{ok, ListenerSupPid} = ranch:start_listener(Name,
+		ranch_tcp, #{
+			num_acceptors => 10,
+			num_listen_sockets => 10,
+			socket_opts => [{raw, 1, 15, <<1:32/native>>}]},
+		echo_protocol, []),
+	10 = length(do_get_listener_sockets(ListenerSupPid)),
+	ok = ranch:stop_listener(Name),
+	{'EXIT', _} = begin catch ranch:get_port(Name) end,
+	ok.
+
+tcp_many_listen_sockets_no_reuseport(_) ->
+	case do_os_supports_reuseport() of
+		true ->
+			ok = do_tcp_many_listen_sockets_no_reuseport();
+		false ->
+			{skip, "No SO_REUSEPORT support."}
+	end.
+
+do_tcp_many_listen_sockets_no_reuseport() ->
+	doc("Confirm that ranch:start_listener/5 fails when SO_REUSEPORT is not available with TCP."),
+	Name = name(),
+	{error, eaddrinuse} = ranch:start_listener(Name,
+		ranch_tcp, #{
+			num_acceptors => 10,
+			num_listen_sockets => 10,
+			socket_opts => [{raw, 1, 15, <<0:32/native>>}]},
+		echo_protocol, []),
+	{'EXIT', _} = begin catch ranch:get_port(Name) end,
+	ok.
+
 tcp_active_echo(_) ->
 	doc("Ensure that active mode works with TCP transport."),
 	Name = name(),
@@ -1246,11 +1336,14 @@ clean_traces() ->
 	end.
 
 do_get_listener_socket(ListenerSupPid) ->
+	[LSocket] = do_get_listener_sockets(ListenerSupPid),
+	LSocket.
+
+do_get_listener_sockets(ListenerSupPid) ->
 	[AcceptorsSupPid] = [Pid || {ranch_acceptors_sup, Pid, supervisor, _}
 		<- supervisor:which_children(ListenerSupPid)],
 	{links, Links} = erlang:process_info(AcceptorsSupPid, links),
-	[LSocket] = [P || P <- Links, is_port(P)],
-	LSocket.
+	[P || P <- Links, is_port(P)].
 
 do_conns_which_children(Name) ->
 	Conns = [supervisor:which_children(ConnsSup) ||
@@ -1273,3 +1366,10 @@ do_conns_count_children(Name) ->
 		[supervisor:count_children(ConnsSup) ||
 			{_, ConnsSup} <- ranch_server:get_connections_sups(Name)]
 	).
+
+do_os_supports_reuseport() ->
+	case {os:type(), os:version()} of
+		{{unix, linux}, {Major, _, _}} when Major > 3 -> true;
+		{{unix, linux}, {3, Minor, _}} when Minor >= 9 -> true;
+		_ -> false
+	end.