Browse Source

Enable connection count alarms

juhlig 4 years ago
parent
commit
50f6191bf8
4 changed files with 282 additions and 9 deletions
  1. 61 0
      doc/src/guide/listeners.asciidoc
  2. 28 0
      src/ranch.erl
  3. 118 8
      src/ranch_conns_sup.erl
  4. 75 1
      test/acceptor_SUITE.erl

+ 61 - 0
doc/src/guide/listeners.asciidoc

@@ -328,6 +328,67 @@ processes.
     echo_protocol, []
     echo_protocol, []
 ).
 ).
 
 
+=== Setting connection count alarms
+
+The `alarms` transport options allows you to configure alarms
+which will be triggered when the number of connections under a connection
+supervisor reaches or exceeds the defined treshold.
+
+The `alarms` transport option takes a map with alarm names as keys and alarm
+options as values.
+
+Any term is allowed as an alarm name.
+
+Alarm options, defining the alarm behavior, are again a map with the following
+keys, all of which are mandatory:
+
+`type`::
+The alarm type. Currently, `num_connections` is the only allowed type.
+
+`treshold`::
+The alarm treshold. When the number of connections under a connection
+supervisor reaches or exceeds this value, the alarm will trigger and
+call the function given in the `callback` key.
+
+`callback`::
+The alarm function, that is, the function which will be called when the
+alarm is triggered. Its arguments are the listener name, the alarm
+name, the Pid of the triggering connection supervisor, and the Pids of
+all the connection processes under that supervisor.
+
+`cooldown`::
+The minimum time to elapse before the alarm can trigger again, in
+milliseconds.
+
+.Setting an alarm to log warnings when the number of connections exceed 100
+
+[source,erlang]
+----
+Alarms = #{
+    my_alarm => #{
+        type => num_connections,
+        treshold => 100,
+        callback => fun(Ref, Name, ConnSup, ConnPids]) ->
+            logger:warning("Warning (~s): "
+                    "Supervisor ~s of listener ~s "
+                    "has ~b connections",
+                [Name, Ref, ConnSup, length(ConnPids)])
+        end,
+        cooldown => 5000
+    }
+},
+{ok, _} = ranch:start_listener(tcp_echo,
+    ranch_tcp, #{alarms => Alarms, socket_opts => [{port, 5555}]},
+    echo_protocol, []
+).
+----
+
+In the example code, an alarm named `my_alarm` is defined, which will
+call the given function when the number of connections under a
+connection supervisor reaches or exceeds 100. When the number of
+connections is still (or again) above 100 after 5 seconds, the
+alarm will trigger again.
+
 === When running out of file descriptors
 === When running out of file descriptors
 
 
 Operating systems have limits on the number of sockets
 Operating systems have limits on the number of sockets

+ 28 - 0
src/ranch.erl

@@ -55,7 +55,17 @@
 -type opts() :: any() | transport_opts(any()).
 -type opts() :: any() | transport_opts(any()).
 -export_type([opts/0]).
 -export_type([opts/0]).
 
 
+-type alarm(Type, Callback) :: #{
+	type := Type,
+	callback := Callback,
+	treshold := non_neg_integer(),
+	cooldown := non_neg_integer()
+}.
+
+-type alarm_num_connections() :: alarm(num_connections, fun((ref(), term(), pid(), [pid()]) -> any())).
+
 -type transport_opts(SocketOpts) :: #{
 -type transport_opts(SocketOpts) :: #{
+	alarms => #{term() => alarm_num_connections()},
 	connection_type => worker | supervisor,
 	connection_type => worker | supervisor,
 	handshake_timeout => timeout(),
 	handshake_timeout => timeout(),
 	logger => module(),
 	logger => module(),
@@ -123,6 +133,16 @@ validate_transport_opt(max_connections, infinity, _) ->
 	true;
 	true;
 validate_transport_opt(max_connections, Value, _) ->
 validate_transport_opt(max_connections, Value, _) ->
 	is_integer(Value) andalso Value >= 0;
 	is_integer(Value) andalso Value >= 0;
+validate_transport_opt(alarms, Alarms, _) ->
+	maps:fold(
+		fun
+			(_, Opts, true) ->
+				validate_alarm(Opts);
+			(_, _, false) ->
+				false
+		end,
+		true,
+		Alarms);
 validate_transport_opt(logger, Value, _) ->
 validate_transport_opt(logger, Value, _) ->
 	is_atom(Value);
 	is_atom(Value);
 validate_transport_opt(num_acceptors, Value, _) ->
 validate_transport_opt(num_acceptors, Value, _) ->
@@ -145,6 +165,14 @@ validate_transport_opt(socket_opts, _, _) ->
 validate_transport_opt(_, _, _) ->
 validate_transport_opt(_, _, _) ->
 	false.
 	false.
 
 
+validate_alarm(#{type := num_connections, treshold := Treshold,
+		callback := Callback, cooldown := Cooldown}) ->
+	is_integer(Treshold) andalso Treshold >= 0
+	andalso is_integer(Cooldown) andalso Cooldown >= 0
+	andalso is_function(Callback, 4);
+validate_alarm(_) ->
+	false.
+
 maybe_started({error, {{shutdown,
 maybe_started({error, {{shutdown,
 		{failed_to_start_child, ranch_acceptors_sup,
 		{failed_to_start_child, ranch_acceptors_sup,
 			{listen_error, _, Reason}}}, _}} = Error) ->
 			{listen_error, _, Reason}}}, _}} = Error) ->

+ 118 - 8
src/ranch_conns_sup.erl

@@ -43,6 +43,7 @@
 	handshake_timeout :: timeout(),
 	handshake_timeout :: timeout(),
 	max_conns = undefined :: ranch:max_conns(),
 	max_conns = undefined :: ranch:max_conns(),
 	stats_counters_ref :: counters:counters_ref(),
 	stats_counters_ref :: counters:counters_ref(),
+	alarms = #{} :: #{term() => {undefined | reference(), map()}},
 	logger = undefined :: module()
 	logger = undefined :: module()
 }).
 }).
 
 
@@ -106,6 +107,7 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) ->
 	process_flag(trap_exit, true),
 	process_flag(trap_exit, true),
 	ok = ranch_server:set_connections_sup(Ref, Id, self()),
 	ok = ranch_server:set_connections_sup(Ref, Id, self()),
 	MaxConns = ranch_server:get_max_connections(Ref),
 	MaxConns = ranch_server:get_max_connections(Ref),
+	Alarms = get_alarms(TransOpts),
 	ConnType = maps:get(connection_type, TransOpts, worker),
 	ConnType = maps:get(connection_type, TransOpts, worker),
 	Shutdown = maps:get(shutdown, TransOpts, 5000),
 	Shutdown = maps:get(shutdown, TransOpts, 5000),
 	HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000),
 	HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000),
@@ -116,11 +118,12 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) ->
 		shutdown=Shutdown, transport=Transport, protocol=Protocol,
 		shutdown=Shutdown, transport=Transport, protocol=Protocol,
 		opts=ProtoOpts, stats_counters_ref=StatsCounters,
 		opts=ProtoOpts, stats_counters_ref=StatsCounters,
 		handshake_timeout=HandshakeTimeout,
 		handshake_timeout=HandshakeTimeout,
-		max_conns=MaxConns, logger=Logger}, 0, 0, []).
+		max_conns=MaxConns, alarms=Alarms,
+		logger=Logger}, 0, 0, []).
 
 
 loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
 loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
 		transport=Transport, protocol=Protocol, opts=Opts, stats_counters_ref=StatsCounters,
 		transport=Transport, protocol=Protocol, opts=Opts, stats_counters_ref=StatsCounters,
-		max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) ->
+		alarms=Alarms, max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) ->
 	receive
 	receive
 		{?MODULE, start_protocol, To, Socket} ->
 		{?MODULE, start_protocol, To, Socket} ->
 			try Protocol:start_link(Ref, Transport, Opts) of
 			try Protocol:start_link(Ref, Transport, Opts) of
@@ -181,6 +184,12 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
 		{set_protocol_options, Opts2} ->
 		{set_protocol_options, Opts2} ->
 			loop(State#state{opts=Opts2},
 			loop(State#state{opts=Opts2},
 				CurConns, NbChildren, Sleepers);
 				CurConns, NbChildren, Sleepers);
+		{timeout, _, {activate_alarm, AlarmName}} when is_map_key(AlarmName, Alarms) ->
+			{AlarmOpts, _} = maps:get(AlarmName, Alarms),
+			NewAlarm = trigger_alarm(Ref, AlarmName, {AlarmOpts, undefined}, CurConns),
+			loop(State#state{alarms=Alarms#{AlarmName => NewAlarm}}, CurConns, NbChildren, Sleepers);
+		{timeout, _, {activate_alarm, _}} ->
+			loop(State, CurConns, NbChildren, Sleepers);
 		{'EXIT', Parent, Reason} ->
 		{'EXIT', Parent, Reason} ->
 			terminate(State, Reason, NbChildren);
 			terminate(State, Reason, NbChildren);
 		{'EXIT', Pid, Reason} when Sleepers =:= [] ->
 		{'EXIT', Pid, Reason} when Sleepers =:= [] ->
@@ -245,18 +254,20 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
 	end.
 	end.
 
 
 handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout,
 handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout,
-		max_conns=MaxConns}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
+		max_conns=MaxConns, alarms=Alarms0}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
 	case Transport:controlling_process(Socket, ProtocolPid) of
 	case Transport:controlling_process(Socket, ProtocolPid) of
 		ok ->
 		ok ->
 			ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout},
 			ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout},
 			put(SupPid, active),
 			put(SupPid, active),
 			CurConns2 = CurConns + 1,
 			CurConns2 = CurConns + 1,
-			if CurConns2 < MaxConns ->
+			Sleepers2 = if CurConns2 < MaxConns ->
 					To ! self(),
 					To ! self(),
-					loop(State, CurConns2, NbChildren + 1, Sleepers);
+					Sleepers;
 				true ->
 				true ->
-					loop(State, CurConns2, NbChildren + 1, [To|Sleepers])
-			end;
+					[To|Sleepers]
+			end,
+			Alarms1 = trigger_alarms(Ref, Alarms0, CurConns2),
+			loop(State#state{alarms=Alarms1}, CurConns2, NbChildren + 1, Sleepers2);
 		{error, _} ->
 		{error, _} ->
 			Transport:close(Socket),
 			Transport:close(Socket),
 			%% Only kill the supervised pid, because the connection's pid,
 			%% Only kill the supervised pid, because the connection's pid,
@@ -266,6 +277,45 @@ handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=Handshake
 			loop(State, CurConns, NbChildren, Sleepers)
 			loop(State, CurConns, NbChildren, Sleepers)
 	end.
 	end.
 
 
+trigger_alarms(Ref, Alarms, CurConns) ->
+	maps:map(
+		fun
+			(AlarmName, Alarm) ->
+				trigger_alarm(Ref, AlarmName, Alarm, CurConns)
+		end,
+		Alarms
+	).
+
+trigger_alarm(Ref, AlarmName, {Opts=#{treshold := Treshold, callback := Callback}, undefined}, CurConns) when CurConns >= Treshold ->
+	ActiveConns = [Pid || {Pid, active} <- get()],
+	case Callback of
+		{Mod, Fun} ->
+			spawn(Mod, Fun, [Ref, AlarmName, self(), ActiveConns]);
+		_ ->
+			Self = self(),
+			spawn(fun () -> Callback(Ref, AlarmName, Self, ActiveConns) end)
+	end,
+	{Opts, schedule_activate_alarm(AlarmName, Opts)};
+trigger_alarm(_, _, Alarm, _) ->
+	Alarm.
+
+schedule_activate_alarm(AlarmName, #{cooldown := Cooldown}) when Cooldown > 0 ->
+	erlang:start_timer(Cooldown, self(), {activate_alarm, AlarmName});
+schedule_activate_alarm(_, _) ->
+	undefined.
+
+get_alarms(#{alarms := Alarms}) when is_map(Alarms) ->
+	maps:fold(
+		fun
+			(Name, Opts = #{type := num_connections}, Acc) -> Acc#{Name => {Opts, undefined}};
+			(_, _, Acc) -> Acc
+		end,
+		#{},
+		Alarms
+	);
+get_alarms(_) ->
+	#{}.
+
 set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, Sleepers0, TransOpts) ->
 set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, Sleepers0, TransOpts) ->
 	MaxConns1 = maps:get(max_connections, TransOpts, 1024),
 	MaxConns1 = maps:get(max_connections, TransOpts, 1024),
 	HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000),
 	HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000),
@@ -277,9 +327,69 @@ set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, S
 		false ->
 		false ->
 			Sleepers0
 			Sleepers0
 	end,
 	end,
-	loop(State#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown},
+	State1=set_alarm_option(State, TransOpts, CurConns),
+	loop(State1#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown},
 		CurConns, NbChildren, Sleepers1).
 		CurConns, NbChildren, Sleepers1).
 
 
+set_alarm_option(State=#state{ref=Ref, alarms=OldAlarms}, TransOpts, CurConns) ->
+	NewAlarms0 = get_alarms(TransOpts),
+	NewAlarms1 = merge_alarms(OldAlarms, NewAlarms0),
+	NewAlarms2 = trigger_alarms(Ref, NewAlarms1, CurConns),
+	State#state{alarms=NewAlarms2}.
+
+merge_alarms(Old, New) ->
+	OldList = lists:sort(maps:to_list(Old)),
+	NewList = lists:sort(maps:to_list(New)),
+	Merged = merge_alarms(OldList, NewList, []),
+	maps:from_list(Merged).
+
+merge_alarms([], News, Acc) ->
+	News ++ Acc;
+merge_alarms([{_, {_, undefined}}|Olds], [], Acc) ->
+	merge_alarms(Olds, [], Acc);
+merge_alarms([{_, {_, Timer}}|Olds], [], Acc) ->
+	_ = cancel_alarm_reactivation_timer(Timer),
+	merge_alarms(Olds, [], Acc);
+merge_alarms([{Name, {OldOpts, Timer}}|Olds], [{Name, {NewOpts, _}}|News], Acc) ->
+	merge_alarms(Olds, News, [{Name, {NewOpts, adapt_alarm_timer(Name, Timer, OldOpts, NewOpts)}}|Acc]);
+merge_alarms([{OldName, {_, Timer}}|Olds], News=[{NewName, _}|_], Acc) when OldName < NewName ->
+	_ = cancel_alarm_reactivation_timer(Timer),
+	merge_alarms(Olds, News, Acc);
+merge_alarms(Olds, [New|News], Acc) ->
+	merge_alarms(Olds, News, [New|Acc]).
+
+%% Not in cooldown.
+adapt_alarm_timer(_, undefined, _, _) ->
+	undefined;
+%% Cooldown unchanged.
+adapt_alarm_timer(_, Timer, #{cooldown := Cooldown}, #{cooldown := Cooldown}) ->
+	Timer;
+%% Cooldown changed to no cooldown, cancel cooldown timer.
+adapt_alarm_timer(_, Timer, _, #{cooldown := 0}) ->
+	_ = cancel_alarm_reactivation_timer(Timer),
+	undefined;
+%% Cooldown changed, cancel current and start new timer taking the already elapsed time into account.
+adapt_alarm_timer(Name, Timer, #{cooldown := OldCooldown}, #{cooldown := NewCooldown}) ->
+	OldTimeLeft = cancel_alarm_reactivation_timer(Timer),
+	case NewCooldown-OldCooldown+OldTimeLeft of
+		NewTimeLeft when NewTimeLeft>0 ->
+			erlang:start_timer(NewTimeLeft, self(), {activate_alarm, Name});
+		_ ->
+			undefined
+	end.
+
+cancel_alarm_reactivation_timer(Timer) ->
+	case erlang:cancel_timer(Timer) of
+		%% Timer had already expired when we tried to cancel it, so we flush the
+		%% reactivation message it sent and return 0 as remaining time.
+		false ->
+			ok = receive {timeout, Timer, {activate_alarm, _}} -> ok after 0 -> ok end,
+			0;
+		%% Timer has not yet expired, we return the amount of time that was remaining.
+		TimeLeft ->
+			TimeLeft
+	end.
+
 -spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
 -spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
 terminate(#state{shutdown=brutal_kill, id=Id,
 terminate(#state{shutdown=brutal_kill, id=Id,
 		stats_counters_ref=StatsCounters}, Reason, NbChildren) ->
 		stats_counters_ref=StatsCounters}, Reason, NbChildren) ->

+ 75 - 1
test/acceptor_SUITE.erl

@@ -109,7 +109,8 @@ groups() ->
 		misc_post_listen_callback_error,
 		misc_post_listen_callback_error,
 		misc_set_transport_options,
 		misc_set_transport_options,
 		misc_wait_for_connections,
 		misc_wait_for_connections,
-		misc_multiple_ip_local_socket_opts
+		misc_multiple_ip_local_socket_opts,
+		misc_connection_alarms
 	]}, {supervisor, [
 	]}, {supervisor, [
 		connection_type_supervisor,
 		connection_type_supervisor,
 		connection_type_supervisor_separate_from_connection,
 		connection_type_supervisor_separate_from_connection,
@@ -576,6 +577,79 @@ do_misc_multiple_ip_local_socket_opts() ->
 	{error, enoent} = file:read_file_info(SockFile2),
 	{error, enoent} = file:read_file_info(SockFile2),
 	ok.
 	ok.
 
 
+misc_connection_alarms(_) ->
+	doc("Ensure that connection alarms work."),
+	Name = name(),
+
+	Self = self(),
+	TransOpts0 = #{num_conns_sups => 1},
+	AlarmCallback = fun (Ref, AlarmName, _, ActiveConns) ->
+		Self ! {connection_alarm, {Ref, AlarmName, length(ActiveConns)}}
+	end,
+	Alarms0 = #{
+		test1 => Alarm1 = #{type => num_connections, treshold => 2, cooldown => 0, callback => AlarmCallback},
+		test2 => Alarm2 = #{type => num_connections, treshold => 3, cooldown => 0, callback => AlarmCallback}
+	},
+	ConnectOpts = [binary, {active, false}, {packet, raw}],
+
+	{ok, _} = ranch:start_listener(Name, ranch_tcp,
+		TransOpts0#{alarms => Alarms0}, notify_and_wait_protocol, #{pid => self()}),
+	Port = ranch:get_port(Name),
+
+	{ok, _} = gen_tcp:connect("localhost", Port, ConnectOpts),
+	{1, [Conn1]} = receive_loop(connected, 100),
+	#{test1 := undefined, test2 := undefined} = do_recv_connection_alarms(Name, 100),
+
+	{ok, _} = gen_tcp:connect("localhost", Port, ConnectOpts),
+	{1, [Conn2]} = receive_loop(connected, 100),
+	#{test1 := 2, test2 := undefined} = do_recv_connection_alarms(Name, 100),
+
+	{ok, _} = gen_tcp:connect("localhost", Port, ConnectOpts),
+	{1, [Conn3]} = receive_loop(connected, 100),
+	#{test1 := 3, test2 := 3} = do_recv_connection_alarms(Name, 100),
+
+	Alarms1 = #{
+		test1 => Alarm1#{cooldown => 100},
+		test2 => Alarm2#{cooldown => 100}
+	},
+	ok = ranch:set_transport_options(Name, TransOpts0#{alarms => Alarms1}),
+	ok = do_flush_connection_alarms(Name),
+	#{test1 := 3, test2 := 3} = do_recv_connection_alarms(Name, 100),
+	ok = do_flush_connection_alarms(Name),
+	#{test1 := 3, test2 := 3} = do_recv_connection_alarms(Name, 100),
+
+	Conn3 ! stop,
+	timer:sleep(100),
+	ok = do_flush_connection_alarms(Name),
+	#{test1 := 2, test2 := undefined} = do_recv_connection_alarms(Name, 100),
+
+	Conn2 ! stop,
+	timer:sleep(100),
+	ok = do_flush_connection_alarms(Name),
+	#{test1 := undefined, test2 := undefined} = do_recv_connection_alarms(Name, 100),
+
+	Conn1 ! stop,
+
+	ok = ranch:stop_listener(Name),
+	ok.
+
+do_recv_connection_alarms(Name, Timeout) ->
+	do_recv_connection_alarms(Name, Timeout, #{test1 => undefined, test2 => undefined}).
+
+do_recv_connection_alarms(Name, Timeout, Acc) ->
+	receive {connection_alarm, {Name, AlarmName, N}} ->
+		do_recv_connection_alarms(Name, Timeout, Acc#{AlarmName => N})
+	after Timeout ->
+		Acc
+	end.
+
+do_flush_connection_alarms(Name) ->
+	receive {connection_alarm, {Name, _, _}} ->
+		do_flush_connection_alarms(Name)
+	after 0 ->
+		ok
+	end.
+
 %% ssl.
 %% ssl.
 
 
 ssl_accept_error(_) ->
 ssl_accept_error(_) ->