Просмотр исходного кода

Merge branch 'fix/listener_sup_failures' of git://github.com/keynslug/syncranch

Loïc Hoguin 12 лет назад
Родитель
Commit
058ad09e8b

+ 1 - 0
src/ranch.erl

@@ -55,6 +55,7 @@
 start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
 		when is_integer(NbAcceptors) andalso is_atom(Transport)
 		andalso is_atom(Protocol) ->
+	_ = code:ensure_loaded(Transport),
 	case erlang:function_exported(Transport, name, 0) of
 		false ->
 			{error, badarg};

+ 7 - 7
src/ranch_acceptors_sup.erl

@@ -17,7 +17,7 @@
 -behaviour(supervisor).
 
 %% API.
--export([start_link/7]).
+-export([start_link/5]).
 
 %% supervisor.
 -export([init/1]).
@@ -25,16 +25,16 @@
 %% API.
 
 -spec start_link(any(), non_neg_integer(), module(), any(),
-	module(), pid(), pid()) -> {ok, pid()}.
-start_link(Ref, NbAcceptors, Transport, TransOpts,
-		Protocol, ListenerPid, ConnsPid) ->
+	module()) -> {ok, pid()}.
+start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol) ->
 	supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts,
-		Protocol, ListenerPid, ConnsPid]).
+		Protocol]).
 
 %% supervisor.
 
-init([Ref, NbAcceptors, Transport, TransOpts,
-		Protocol, ListenerPid, ConnsPid]) ->
+init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) ->
+	ListenerPid = ranch_server:lookup_listener(Ref),
+	ConnsPid = ranch_server:lookup_connections_sup(Ref),
 	LSocket = case proplists:get_value(socket, TransOpts) of
 		undefined ->
 			{ok, Socket} = Transport:listen(TransOpts),

+ 6 - 5
src/ranch_conns_sup.erl

@@ -17,7 +17,7 @@
 -behaviour(supervisor).
 
 %% API.
--export([start_link/0]).
+-export([start_link/1]).
 -export([start_protocol/5]).
 
 %% supervisor.
@@ -25,9 +25,9 @@
 
 %% API.
 
--spec start_link() -> {ok, pid()}.
-start_link() ->
-	supervisor:start_link(?MODULE, []).
+%% @doc Start a connections supervisor for the listener identified by Ref.
+%% Ref is handed unchanged to init/1 as the supervisor's start argument.
+-spec start_link(any()) -> {ok, pid()}.
+start_link(Ref) ->
+	supervisor:start_link(?MODULE, Ref).
 
 -spec start_protocol(pid(), inet:socket(), module(), module(), any())
 	-> {ok, pid()}.
@@ -36,6 +36,7 @@ start_protocol(ListenerPid, Socket, Transport, Protocol, Opts) ->
 
 %% supervisor.
 
-init([]) ->
+init(Ref) ->
+	ok = ranch_server:set_connections_sup(Ref, self()),
 	{ok, {{simple_one_for_one, 0, 1}, [{?MODULE, {?MODULE, start_protocol, []},
 		temporary, brutal_kill, worker, [?MODULE]}]}}.

+ 1 - 0
src/ranch_listener.erl

@@ -101,6 +101,7 @@ set_protocol_options(ServerPid, ProtoOpts) ->
 
 %% @private
 init([Ref, MaxConns, ProtoOpts]) ->
+	ok = ranch_server:insert_listener(Ref, self()),
 	{ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}.
 
 %% @private

+ 19 - 17
src/ranch_listener_sup.erl

@@ -28,23 +28,25 @@
 	-> {ok, pid()}.
 start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
 	MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
-	{ok, SupPid} = supervisor:start_link(?MODULE, []),
-	{ok, ListenerPid} = supervisor:start_child(SupPid,
-		{ranch_listener, {ranch_listener, start_link,
-			[Ref, MaxConns, ProtoOpts]},
-		 permanent, 5000, worker, [ranch_listener]}),
-	ok = ranch_server:insert_listener(Ref, ListenerPid),
-	{ok, ConnsPid} = supervisor:start_child(SupPid,
-		{ranch_conns_sup, {ranch_conns_sup, start_link, []},
-		 permanent, 5000, supervisor, [ranch_conns_sup]}),
-	{ok, _PoolPid} = supervisor:start_child(SupPid,
-		{ranch_acceptors_sup, {ranch_acceptors_sup, start_link, [
-			Ref, NbAcceptors, Transport, TransOpts,
-			Protocol, ListenerPid, ConnsPid
-		]}, permanent, 5000, supervisor, [ranch_acceptors_sup]}),
-	{ok, SupPid}.
+	supervisor:start_link(?MODULE, {
+		Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts
+		}).
 
 %% supervisor.
 
-init([]) ->
-	{ok, {{one_for_all, 10, 10}, []}}.
+%% Build the static child list for one listener's supervision tree.
+%%
+%% Children are listed in start order: the listener, then the connections
+%% supervisor, then the acceptors supervisor. Under rest_for_one, when a
+%% child terminates only the children started after it are restarted
+%% along with it.
+init({Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts}) ->
+	Listener = {ranch_listener,
+		{ranch_listener, start_link, [Ref, MaxConns, ProtoOpts]},
+		permanent, 5000, worker, [ranch_listener]},
+	ConnsSup = {ranch_conns_sup,
+		{ranch_conns_sup, start_link, [Ref]},
+		permanent, infinity, supervisor, [ranch_conns_sup]},
+	AcceptorsSup = {ranch_acceptors_sup,
+		{ranch_acceptors_sup, start_link,
+			[Ref, NbAcceptors, Transport, TransOpts, Protocol]},
+		permanent, infinity, supervisor, [ranch_acceptors_sup]},
+	RestartStrategy = {rest_for_one, 10, 10},
+	{ok, {RestartStrategy, [Listener, ConnsSup, AcceptorsSup]}}.
+

+ 21 - 3
src/ranch_server.erl

@@ -20,6 +20,8 @@
 -export([start_link/0]).
 -export([insert_listener/2]).
 -export([lookup_listener/1]).
+-export([set_connections_sup/2]).
+-export([lookup_connections_sup/1]).
 -export([add_acceptor/2]).
 -export([send_to_acceptors/2]).
 -export([add_connection/1]).
@@ -52,7 +54,7 @@ start_link() ->
 %% @doc Insert a listener into the database.
 -spec insert_listener(any(), pid()) -> ok.
 insert_listener(Ref, Pid) ->
-	true = ets:insert_new(?TAB, {{listener, Ref}, Pid}),
+	true = ets:insert_new(?TAB, {{listener, Ref}, Pid, undefined}),
 	gen_server:cast(?MODULE, {insert_listener, Ref, Pid}).
 
 %% @doc Lookup a listener in the database.
@@ -60,6 +62,17 @@ insert_listener(Ref, Pid) ->
 lookup_listener(Ref) ->
 	ets:lookup_element(?TAB, {listener, Ref}, 2).
 
+%% @doc Record the connections supervisor pid for the given listener.
+%%
+%% The pid is written into the third element of the listener's row. The
+%% row must already exist: ets:update_element/3 returns false for a
+%% missing key, which makes the assertive match below crash.
+-spec set_connections_sup(any(), pid()) -> ok.
+set_connections_sup(Ref, Pid) ->
+	ListenerKey = {listener, Ref},
+	true = ets:update_element(?TAB, ListenerKey, {3, Pid}),
+	ok.
+
+%% @doc Look up the connections supervisor registered for a listener.
+%%
+%% Reads the third element of the listener's row.
+-spec lookup_connections_sup(any()) -> pid() | undefined.
+lookup_connections_sup(Ref) ->
+	ListenerKey = {listener, Ref},
+	ets:lookup_element(?TAB, ListenerKey, 3).
+
 %% @doc Add an acceptor for the given listener.
 -spec add_acceptor(any(), pid()) -> ok.
 add_acceptor(Ref, Pid) ->
@@ -147,6 +160,11 @@ remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) ->
 	true = ets:delete(?TAB, {connections, Pid}),
 	lists:keydelete({MonitorRef, Pid}, 1, Monitors);
 remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) ->
-	Acceptors = ets:lookup_element(?TAB, Key, 2),
-	true = ets:insert(?TAB, {Key, lists:delete(Pid, Acceptors)}),
+	try
+		Acceptors = ets:lookup_element(?TAB, Key, 2),
+		true = ets:update_element(?TAB, Key, {2, lists:delete(Pid, Acceptors)})
+	catch
+		error:_ ->
+			ok
+	end,
 	lists:keydelete({MonitorRef, Pid}, 1, Monitors).

+ 123 - 1
test/acceptor_SUITE.erl

@@ -41,10 +41,15 @@
 -export([tcp_max_connections_and_beyond/1]).
 -export([tcp_upgrade/1]).
 
+%% supervisor.
+-export([supervisor_clean_restart/1]).
+-export([supervisor_clean_child_restart/1]).
+-export([supervisor_conns_alive/1]).
+
 %% ct.
 
 all() ->
-	[{group, tcp}, {group, ssl}, {group, misc}].
+	[{group, tcp}, {group, ssl}, {group, misc}, {group, supervisor}].
 
 groups() ->
 	[{tcp, [
@@ -61,6 +66,10 @@ groups() ->
 		ssl_echo
 	]}, {misc, [
 		misc_bad_transport
+	]}, {supervisor, [
+		supervisor_clean_restart,
+		supervisor_clean_child_restart,
+		supervisor_conns_alive
 	]}].
 
 init_per_suite(Config) ->
@@ -261,6 +270,119 @@ tcp_upgrade(_) ->
 	ok = connect_loop(Port, 1, 0),
 	receive upgraded -> ok after 1000 -> error(timeout) end.
 
+%% Supervisor tests
+
+supervisor_clean_restart(_) ->
+	%% Verify that killing an established listener process does not take
+	%% the whole listener supervisor down, and that the supervisor then
+	%% restarts all of its children properly.
+	Ref = supervisor_clean_restart,
+	NbAcc = 4,
+	{ok, Pid} = ranch:start_listener(Ref,
+		NbAcc, ranch_tcp, [{port, 0}], echo_protocol, []),
+	%% Trace supervisor spawns.
+	1 = erlang:trace(Pid, true, [procs, set_on_spawn]),
+	ListenerPid0 = ranch_server:lookup_listener(Ref),
+	erlang:exit(ListenerPid0, kill),
+	%% Give the supervisor time to react before inspecting it.
+	receive after 1000 -> ok end,
+	%% Verify that the supervisor is alive...
+	true = is_process_alive(Pid),
+	%% ...but the old listener is dead.
+	false = is_process_alive(ListenerPid0),
+	%% Receive traces from the newly started children. Every receive is
+	%% bounded so that a missing restart fails the test with a timeout
+	%% instead of blocking it indefinitely.
+	ListenerPid = receive {trace, Pid, spawn, Pid1, _} -> Pid1
+		after 5000 -> error(timeout) end,
+	_ConnSupPid = receive {trace, Pid, spawn, Pid2, _} -> Pid2
+		after 5000 -> error(timeout) end,
+	AccSupPid = receive {trace, Pid, spawn, Pid3, _} -> Pid3
+		after 5000 -> error(timeout) end,
+	%% ...and from the acceptors of the new acceptors supervisor.
+	_ = [receive {trace, AccSupPid, spawn, _, _} -> ok
+			after 5000 -> error(timeout) end ||
+		_ <- lists:seq(1, NbAcc)],
+	%% There must be no further spawns from either supervisor.
+	receive
+		{trace, EPid, spawn, _, _} when EPid == Pid; EPid == AccSupPid ->
+			error(invalid_restart)
+	after 1000 -> ok end,
+	%% Verify that the new listener registered itself properly.
+	ListenerPid = ranch_server:lookup_listener(Ref),
+	_ = erlang:trace(all, false, [all]),
+	ok = clean_traces().
+
+supervisor_clean_child_restart(_) ->
+	%% Verify that only the affected part of the supervision tree is
+	%% restarted when the listening socket is closed.
+	Ref = supervisor_clean_child_restart,
+	%% Trace socket allocations.
+	_ = erlang:trace(new, true, [call]),
+	1 = erlang:trace_pattern({ranch_tcp, listen, 1}, [{'_', [], [{return_trace}]}], [global]),
+	{ok, Pid} = ranch:start_listener(Ref,
+		1, ranch_tcp, [{port, 0}], echo_protocol, []),
+	%% Trace supervisor spawns.
+	1 = erlang:trace(Pid, true, [procs, set_on_spawn]),
+	ListenerPid0 = ranch_server:lookup_listener(Ref),
+	%% Pick the listening socket out of the return trace and shut it
+	%% down manually.
+	LSocket = receive
+		{trace, _, return_from, {ranch_tcp, listen, 1}, {ok, Socket}} ->
+			Socket
+	after 0 ->
+		error(lsocket_unknown)
+	end,
+	ok = gen_tcp:close(LSocket),
+	%% Give the supervision tree time to react before inspecting it.
+	receive after 1000 -> ok end,
+	%% Verify that the supervisor and the listener are still alive.
+	true = is_process_alive(Pid),
+	true = is_process_alive(ListenerPid0),
+	%% Check that acceptors_sup is restarted properly. Every receive is
+	%% bounded so that a missing restart fails the test with a timeout
+	%% instead of blocking it indefinitely.
+	AccSupPid = receive {trace, Pid, spawn, Pid1, _} -> Pid1
+		after 5000 -> error(timeout) end,
+	AccPid = receive {trace, AccSupPid, spawn, Pid2, _} -> Pid2
+		after 5000 -> error(timeout) end,
+	receive {trace, AccPid, spawn, _, _} -> ok
+		after 5000 -> error(timeout) end,
+	%% There must be no further spawns.
+	receive
+		{trace, _, spawn, _, _} -> error(invalid_restart)
+	after 1000 -> ok end,
+	%% Verify that the original listener is still the registered one.
+	ListenerPid0 = ranch_server:lookup_listener(Ref),
+	_ = erlang:trace_pattern({ranch_tcp, listen, 1}, false, []),
+	_ = erlang:trace(all, false, [all]),
+	ok = clean_traces().
+
+supervisor_conns_alive(_) ->
+	%% Verify that live connections are not killed when only part of the
+	%% supervision tree fails.
+	Ref = supervisor_conns_alive,
+	%% Trace socket allocations.
+	_ = erlang:trace(new, true, [call]),
+	1 = erlang:trace_pattern({ranch_tcp, listen, 1}, [{'_', [], [{return_trace}]}], [global]),
+	{ok, _} = ranch:start_listener(Ref,
+		1, ranch_tcp, [{port, 0}], remove_conn_and_wait_protocol, [{remove, false}]),
+	%% Pick the listening socket out of the return trace.
+	LSocket = receive
+		{trace, _, return_from, {ranch_tcp, listen, 1}, {ok, S}} ->
+			S
+	after 0 ->
+		error(lsocket_unknown)
+	end,
+	TcpPort = ranch:get_port(Ref),
+	{ok, Socket} = gen_tcp:connect("localhost", TcpPort,
+		[binary, {active, true}, {packet, raw}]),
+	%% Shut the listening socket down.
+	ok = gen_tcp:close(LSocket),
+	%% Assert that the client connection survives: no close notification
+	%% may arrive within 1500 ms, and sending must still succeed.
+	receive {tcp_closed, _} -> error(closed) after 1500 -> ok end,
+	ok = gen_tcp:send(Socket, <<"poke">>),
+	%% Wait for the connection to be closed.
+	%% NOTE(review): this receive is unbounded because the lifetime of
+	%% remove_conn_and_wait_protocol is not visible here; consider adding
+	%% an `after' clause once its wait time is confirmed.
+	receive {tcp_closed, _} -> ok end,
+	_ = erlang:trace(all, false, [all]),
+	ok = clean_traces().
+
+%% Drop every trace message (4- or 5-element tuples tagged `trace')
+%% currently in the caller's mailbox without blocking. Other messages
+%% are left in place.
+clean_traces() ->
+	Dropped = receive
+		{trace, _, _, _} -> true;
+		{trace, _, _, _, _} -> true
+	after 0 ->
+		false
+	end,
+	case Dropped of
+		true -> clean_traces();
+		false -> ok
+	end.
+
 %% Utility functions.
 
 connect_loop(_, 0, _) ->