Просмотр исходного кода

Add a function to wait for number of connections

LH: Reworked validation style and added a -dialyzer
attribute to acceptor_SUITE to silence expected errors.
j.uhlig 7 лет назад
Родитель
Сommit
e381ccccdf
3 измененных файлов с 145 добавлено и 2 удалено
  1. 21 1
      doc/src/manual/ranch.asciidoc
  2. 47 0
      src/ranch.erl
  3. 77 1
      test/acceptor_SUITE.erl

+ 21 - 1
doc/src/manual/ranch.asciidoc

@@ -120,7 +120,7 @@ Ref = ref():: Listener name.
 
 Return the status of the given listener.
 
-=== get_transport_options(Ref) -> ProtoOpts
+=== get_transport_options(Ref) -> TransOpts
 
 Ref = ref():: Listener name.
 TransOpts = any():: Current transport options.
@@ -138,6 +138,7 @@ Return detailed information about all Ranch listeners.
 The following keys are defined:
 
 pid:: Pid of the listener's top-level supervisor.
+status:: Listener status, either running or suspended.
 ip:: Interface Ranch listens on.
 port:: Port number Ranch listens on.
 num_acceptors:: Number of acceptor processes.
@@ -149,6 +150,16 @@ transport_options:: Transport options.
 protocol:: Protocol module.
 protocol_options:: Protocol options.
 
+=== info(Ref) -> [{Key, Value}]
+
+Ref = ref():: Listener name.
+Key = atom():: Information key.
+Value = any():: Information value.
+
+Return detailed information about a specific Ranch listener.
+
+See `info/0` for a description of the defined keys.
+
 === procs(Ref, acceptors | connections) -> [pid()]
 
 Ref = ref():: Listener name.
@@ -253,3 +264,12 @@ If the listener is already suspended, nothing will happen.
 The listener will stop listening and accepting connections by
 closing the listening port, but will not stop running connection
 processes.
+
+=== wait_for_connections(Ref, Operator, NumConnections) -> ok
+
+Ref = ref():: Listener name.
+Operator = '>' | '>=' | '==' | '=<' | '<':: Comparison operator.
+NumConnections = non_neg_integer():: Number of connections to wait for.
+
+Wait until the number of connections on the given listener matches
+the given operator and number of connections.

+ 47 - 0
src/ranch.erl

@@ -35,6 +35,8 @@
 -export([info/0]).
 -export([info/1]).
 -export([procs/2]).
+-export([wait_for_connections/3]).
+-export([wait_for_connections/4]).
 -export([filter_options/3]).
 -export([set_option_default/3]).
 -export([require/1]).
@@ -272,6 +274,51 @@ procs1(Ref, Sup) ->
 		[]
 	end.
 
+-spec wait_for_connections
+	(ref(), '>' | '>=' | '==' | '=<', non_neg_integer()) -> ok;
+	(ref(), '<', pos_integer()) -> ok.
+wait_for_connections(Ref, Op, NumConns) ->
+	wait_for_connections(Ref, Op, NumConns, 1000).
+
+-spec wait_for_connections
+	(ref(), '>' | '>=' | '==' | '=<', non_neg_integer(), non_neg_integer()) -> ok;
+	(ref(), '<', pos_integer(), non_neg_integer()) -> ok.
+wait_for_connections(Ref, Op, NumConns, Interval) ->
+	validate_op(Op, NumConns),
+	validate_num_conns(NumConns),
+	validate_interval(Interval),
+	wait_for_connections_loop(Ref, Op, NumConns, Interval).
+
+validate_op('>', _) -> ok;
+validate_op('>=', _) -> ok;
+validate_op('==', _) -> ok;
+validate_op('=<', _) -> ok;
+validate_op('<', NumConns) when NumConns > 0 -> ok;
+validate_op(_, _) -> error(badarg).
+
+validate_num_conns(NumConns) when is_integer(NumConns), NumConns >= 0 -> ok;
+validate_num_conns(_) -> error(badarg).
+
+validate_interval(Interval) when is_integer(Interval), Interval >= 0 -> ok;
+validate_interval(_) -> error(badarg).
+
+wait_for_connections_loop(Ref, Op, NumConns, Interval) ->
+	CurConns = try
+		ConnsSup = ranch_server:get_connections_sup(Ref),
+		proplists:get_value(active, supervisor:count_children(ConnsSup))
+	catch _:_ ->
+		0
+	end,
+	case erlang:Op(CurConns, NumConns) of
+		true ->
+			ok;
+		false when Interval > 0 ->
+			wait_for_connections_loop(Ref, Op, NumConns, Interval);
+		false ->
+			timer:sleep(Interval),
+			wait_for_connections_loop(Ref, Op, NumConns, Interval)
+	end.
+
 -spec filter_options([inet | inet6 | {atom(), any()} | {raw, any(), any(), any()}],
 	[atom()], Acc) -> Acc when Acc :: [any()].
 filter_options(UserOptions, DisallowedKeys, DefaultOptions) ->

+ 77 - 1
test/acceptor_SUITE.erl

@@ -16,6 +16,8 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-dialyzer({nowarn_function, misc_wait_for_connections/1}).
+
 -import(ct_helper, [doc/1]).
 -import(ct_helper, [name/0]).
 
@@ -59,7 +61,8 @@ groups() ->
 		misc_bad_transport,
 		misc_bad_transport_options,
 		misc_info,
-		misc_info_embedded
+		misc_info_embedded,
+		misc_wait_for_connections
 	]}, {supervisor, [
 		connection_type_supervisor,
 		connection_type_supervisor_separate_from_connection,
@@ -266,6 +269,79 @@ misc_info_embedded(_) ->
 do_get_listener_info(ListenerGroup) ->
 	lists:sort([L || L={{G, _}, _} <- ranch:info(), G=:=ListenerGroup]).
 
+misc_wait_for_connections(_) ->
+	doc("Ensure wait for connections works."),
+	Name = name(),
+	Self = self(),
+	%% Ensure invalid arguments are rejected.
+	{'EXIT', {badarg, _}} = begin catch ranch:wait_for_connections(Name, 'foo', 0) end,
+	{'EXIT', {badarg, _}} = begin catch ranch:wait_for_connections(Name, '==', -1) end,
+	{'EXIT', {badarg, _}} = begin catch ranch:wait_for_connections(Name, '==', 0, -1) end,
+	{'EXIT', {badarg, _}} = begin catch ranch:wait_for_connections(Name, '<', 0) end,
+	%% Create waiters for increasing number of connections.
+	Pid1GT = do_create_waiter(Self, Name, '>', 0),
+	Pid1GE = do_create_waiter(Self, Name, '>=', 1),
+	Pid1EQ = do_create_waiter(Self, Name, '==', 1),
+	Pid2GT = do_create_waiter(Self, Name, '>', 1),
+	Pid2GE = do_create_waiter(Self, Name, '>=', 2),
+	Pid2EQ = do_create_waiter(Self, Name, '==', 2),
+	{ok, _} = ranch:start_listener(Name,
+		ranch_tcp, [{num_acceptors, 1}],
+		remove_conn_and_wait_protocol, [{remove, true, 2500}]),
+	Port = ranch:get_port(Name),
+	%% Create some connections, ensure that waiters respond.
+	{ok, Sock1} = gen_tcp:connect("localhost", Port, []),
+	ok = do_expect_waiter(Pid1GT),
+	ok = do_expect_waiter(Pid1GE),
+	ok = do_expect_waiter(Pid1EQ),
+	ok = do_expect_waiter(undefined),
+	{ok, Sock2} = gen_tcp:connect("localhost", Port, []),
+	ok = do_expect_waiter(Pid2GT),
+	ok = do_expect_waiter(Pid2GE),
+	ok = do_expect_waiter(Pid2EQ),
+	ok = do_expect_waiter(undefined),
+	%% Create waiters for decreasing number of connections.
+	Pid3LT = do_create_waiter(Self, Name, '<', 2),
+	Pid3LE = do_create_waiter(Self, Name, '=<', 1),
+	Pid3EQ = do_create_waiter(Self, Name, '==', 1),
+	Pid4LT = do_create_waiter(Self, Name, '<', 1),
+	Pid4LE = do_create_waiter(Self, Name, '=<', 0),
+	Pid4EQ = do_create_waiter(Self, Name, '==', 0),
+	%% Close connections, ensure that waiters respond.
+	ok = gen_tcp:close(Sock1),
+	ok = do_expect_waiter(Pid3LT),
+	ok = do_expect_waiter(Pid3LE),
+	ok = do_expect_waiter(Pid3EQ),
+	ok = do_expect_waiter(undefined),
+	ok = gen_tcp:close(Sock2),
+	ok = do_expect_waiter(Pid4LT),
+	ok = do_expect_waiter(Pid4LE),
+	ok = do_expect_waiter(Pid4EQ),
+	ok = do_expect_waiter(undefined),
+	ok = ranch:stop_listener(Name),
+	%% Make sure the listener stopped.
+	{'EXIT', _} = begin catch ranch:get_port(Name) end,
+	ok.
+
+do_create_waiter(ReplyTo, Ref, Op, NumConns) ->
+	spawn(fun () -> ok = ranch:wait_for_connections(Ref, Op, NumConns, 100),
+		ReplyTo ! {wait_connections, self()} end).
+
+do_expect_waiter(WaiterPid) ->
+	receive
+		{wait_connections, _} when WaiterPid=:=undefined ->
+			error;
+		{wait_connections, Pid} when Pid=:=WaiterPid ->
+			ok
+	after 1000 ->
+			case WaiterPid of
+				undefined ->
+					ok;
+				_ ->
+					timeout
+			end
+	end.
+
 %% ssl.
 
 ssl_accept_error(_) ->