Browse Source

Make accept asynchronous

Ranch now accepts connection asynchronously through a separate
process. The accept process is linked to the acceptor, calls
accept and does nothing else but send the socket back to the
acceptor. This allows us to receive messages in the acceptor
to handle upgrades instead of polling. This will also allow us
later to make acceptors system processes.

Remove support for connection pools in favor of a simpler
max_connections setting. Connections can be removed from the
count, allowing us to have as many long-lived connections as
we want while still limiting the number of short-lived ones.

Add max_connections, max_connections with long-lived connections,
and upgrade tests.
Loïc Hoguin 12 years ago
parent
commit
6b354c1124

+ 45 - 26
src/ranch_acceptor.erl

@@ -19,43 +19,62 @@
 -export([start_link/6]).
 
 %% Internal.
--export([acceptor/7]).
+-export([init/7]).
+-export([loop/7]).
 
 %% API.
 
 -spec start_link(any(), inet:socket(), module(), module(), pid(), pid())
 	-> {ok, pid()}.
 start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) ->
+	{ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
 	{ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
-	Pid = spawn_link(?MODULE, acceptor,
-		[LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]),
+	Pid = spawn_link(?MODULE, init,
+		[LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]),
 	ok = ranch_server:add_acceptor(Ref, Pid),
 	{ok, Pid}.
 
 %% Internal.
 
--spec acceptor(inet:socket(), module(), module(), any(),
-	non_neg_integer(), pid(), pid()) -> no_return().
-acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ConnsSup) ->
-	Res = case Transport:accept(LSocket, 2000) of
-		{ok, CSocket} ->
-			{ok, Pid} = supervisor:start_child(ConnsSup,
+-spec init(inet:socket(), module(), module(),
+	non_neg_integer(), any(), pid(), pid()) -> no_return().
+init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
+	async_accept(LSocket, Transport),
+	loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup).
+
+-spec loop(inet:socket(), module(), module(),
+	non_neg_integer(), any(), pid(), pid()) -> no_return().
+loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
+	receive
+		{accept, CSocket} ->
+			{ok, ConnPid} = supervisor:start_child(ConnsSup,
 				[ListenerPid, CSocket, Transport, Protocol, Opts]),
-			Transport:controlling_process(CSocket, Pid),
-			ranch_listener:add_connection(ListenerPid,
-				default, Pid, OptsVsn);
-		{error, timeout} ->
-			ranch_listener:check_upgrades(ListenerPid, OptsVsn);
-		{error, _Reason} ->
-			%% @todo Probably do something here. If the socket was closed,
-			%%       we may want to try and listen again on the port?
-			ok
-	end,
-	case Res of
-		ok ->
-			?MODULE:acceptor(LSocket, Transport, Protocol,
-				Opts, OptsVsn, ListenerPid, ConnsSup);
-		{upgrade, Opts2, OptsVsn2} ->
-			?MODULE:acceptor(LSocket, Transport, Protocol,
-				Opts2, OptsVsn2, ListenerPid, ConnsSup)
+			Transport:controlling_process(CSocket, ConnPid),
+			ConnPid ! {shoot, ListenerPid},
+			NbConns = ranch_listener:add_connection(ListenerPid, ConnPid),
+			maybe_wait(ListenerPid, MaxConns, NbConns),
+			?MODULE:init(LSocket, Transport, Protocol,
+				MaxConns, Opts, ListenerPid, ConnsSup);
+		{set_opts, Opts2} ->
+			?MODULE:loop(LSocket, Transport, Protocol,
+				MaxConns, Opts2, ListenerPid, ConnsSup)
 	end.
+
+-spec maybe_wait(pid(), non_neg_integer(), non_neg_integer()) -> ok.
+maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns ->
+	ok;
+maybe_wait(ListenerPid, MaxConns, _) ->
+	erlang:yield(),
+	NbConns2 = ranch_server:count_connections(ListenerPid),
+	maybe_wait(ListenerPid, MaxConns, NbConns2).
+
+-spec async_accept(inet:socket(), module()) -> ok.
+async_accept(LSocket, Transport) ->
+	AcceptorPid = self(),
+	_ = spawn_link(fun() ->
+		%% @todo {error, closed} must be handled and other errors ignored.
+		{ok, CSocket} = Transport:accept(LSocket, infinity),
+		Transport:controlling_process(CSocket, AcceptorPid),
+		AcceptorPid ! {accept, CSocket}
+	end),
+	ok.

+ 47 - 146
src/ranch_listener.erl

@@ -17,14 +17,13 @@
 -behaviour(gen_server).
 
 %% API.
--export([start_link/2]).
+-export([start_link/3]).
 -export([stop/1]).
--export([add_connection/4]).
--export([move_connection/3]).
--export([remove_connection/2]).
--export([check_upgrades/2]).
+-export([add_connection/2]).
+-export([remove_connection/1]).
 -export([get_port/1]).
 -export([set_port/2]).
+-export([get_max_connections/1]).
 -export([get_protocol_options/1]).
 -export([set_protocol_options/2]).
 
@@ -36,74 +35,41 @@
 -export([terminate/2]).
 -export([code_change/3]).
 
--type pools() :: [{atom(), non_neg_integer()}].
-
 -record(state, {
-	conn_pools = [] :: pools(),
-	conns_table :: ets:tid(),
-	queue = undefined :: queue(),
+	ref :: any(),
 	max_conns = undefined :: non_neg_integer(),
 	port = undefined :: undefined | inet:port_number(),
-	proto_opts :: any(),
-	proto_opts_vsn = 1 :: non_neg_integer()
+	proto_opts = undefined :: any(),
+	rm_diff = 0 :: non_neg_integer()
 }).
 
 %% API.
 
 %% @private
-%%
-%% We set the process priority to high because ranch_listener is the central
-%% gen_server in Ranch and is used to manage all the incoming connections.
-%% Setting the process priority to high ensures the connection-related code
-%% will always be executed when a connection needs it, allowing Ranch to
-%% scale far beyond what it would with a normal priority.
--spec start_link(non_neg_integer(), any()) -> {ok, pid()}.
-start_link(MaxConns, ProtoOpts) ->
-	gen_server:start_link(?MODULE, [MaxConns, ProtoOpts],
-		[{spawn_opt, [{priority, high}]}]).
+-spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}.
+start_link(Ref, MaxConns, ProtoOpts) ->
+	gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []).
 
 %% @private
 -spec stop(pid()) -> stopped.
 stop(ServerPid) ->
 	gen_server:call(ServerPid, stop).
 
-%% @doc Add a connection to the given pool in the listener.
-%%
-%% Pools of connections are used to restrict the maximum number of connections
-%% depending on their type. By default, Ranch add all connections to the
-%% pool <em>default</em>. It also checks for the maximum number of connections
-%% in that pool before accepting again. This function only returns when there
-%% is free space in the pool.
-%%
-%% When a process managing a connection dies, the process is removed from the
-%% pool. If the socket has been sent to another process, it is up to the
-%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
-%% the previous and adding the new one.
+%% @doc Add a connection to the listener's pool.
+-spec add_connection(pid(), pid()) -> non_neg_integer().
+add_connection(ServerPid, ConnPid) ->
+	ok = gen_server:cast(ServerPid, {add_connection, ConnPid}),
+	ranch_server:add_connection(ServerPid).
+
+%% @doc Remove this process' connection from the pool.
 %%
-%% This function also returns whether the protocol options have been modified.
-%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of
-%% the atom 'ok'. The acceptor can then continue with the new protocol options.
--spec add_connection(pid(), atom(), pid(), non_neg_integer())
-	-> ok | {upgrade, any(), non_neg_integer()}.
-add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->
-	gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn},
-		infinity).
-
-%% @doc Move a connection from one pool to another.
--spec move_connection(pid(), atom(), pid()) -> ok.
-move_connection(ServerPid, DestPool, ConnPid) ->
-	gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}).
-
-%% @doc Remove the given connection from its pool.
--spec remove_connection(pid(), pid()) -> ok.
-remove_connection(ServerPid, ConnPid) ->
-	gen_server:cast(ServerPid, {remove_connection, ConnPid}).
-
-%% @doc Return whether a protocol upgrade is required.
--spec check_upgrades(pid(), non_neg_integer())
-	-> ok | {upgrade, any(), non_neg_integer()}.
-check_upgrades(ServerPid, OptsVsn) ->
-	gen_server:call(ServerPid, {check_upgrades, OptsVsn}).
+%% Useful if you have long-lived connections that aren't taking up
+%% resources and shouldn't be counted in the limited number of running
+%% connections.
+-spec remove_connection(pid()) -> non_neg_integer().
+remove_connection(ServerPid) ->
+	ok = gen_server:cast(ServerPid, remove_connection),
+	ranch_server:remove_connection(ServerPid).
 
 %% @doc Return the listener's port.
 -spec get_port(pid()) -> {ok, inet:port_number()}.
@@ -115,6 +81,12 @@ get_port(ServerPid) ->
 set_port(ServerPid, Port) ->
 	gen_server:cast(ServerPid, {set_port, Port}).
 
+%% @doc Return the max number of connections allowed concurrently.
+%% @todo Add set_max_connections.
+-spec get_max_connections(pid()) -> {ok, non_neg_integer()}.
+get_max_connections(ServerPid) ->
+	gen_server:call(ServerPid, get_max_connections).
+
 %% @doc Return the current protocol options.
 -spec get_protocol_options(pid()) -> {ok, any()}.
 get_protocol_options(ServerPid) ->
@@ -128,65 +100,41 @@ set_protocol_options(ServerPid, ProtoOpts) ->
 %% gen_server.
 
 %% @private
-init([MaxConns, ProtoOpts]) ->
-	ConnsTable = ets:new(connections_table, [set, private]),
-	Queue = queue:new(),
-	{ok, #state{conns_table=ConnsTable, max_conns=MaxConns,
-		proto_opts=ProtoOpts, queue=Queue}}.
+init([Ref, MaxConns, ProtoOpts]) ->
+	{ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}.
 
 %% @private
-handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{
-		conn_pools=Pools, conns_table=ConnsTable,
-		queue=Queue, max_conns=MaxConns,
-		proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
-	{NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ConnsTable),
-	State2 = State#state{conn_pools=Pools2},
-	if	AccOptsVsn =/= LisOptsVsn ->
-			{reply, {upgrade, ProtoOpts, LisOptsVsn}, State2};
-		NbConns > MaxConns ->
-			Queue2 = queue:in(From, Queue),
-			{noreply, State2#state{queue=Queue2}};
-		true ->
-			{reply, ok, State2}
-	end;
-handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{
-		proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
-	if	AccOptsVsn =/= LisOptsVsn ->
-			{reply, {upgrade, ProtoOpts, LisOptsVsn}, State};
-		true ->
-			{reply, ok, State}
-	end;
 handle_call(get_port, _From, State=#state{port=Port}) ->
 	{reply, {ok, Port}, State};
+handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) ->
+	{reply, {ok, MaxConns}, State};
 handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
 	{reply, {ok, ProtoOpts}, State};
-handle_call({set_protocol_options, ProtoOpts}, _From,
-		State=#state{proto_opts_vsn=OptsVsn}) ->
-	{reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}};
+handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) ->
+	ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}),
+	{reply, ok, State#state{proto_opts=ProtoOpts}};
 handle_call(stop, _From, State) ->
 	{stop, normal, stopped, State};
 handle_call(_, _From, State) ->
 	{reply, ignored, State}.
 
 %% @private
+handle_cast({add_connection, ConnPid}, State) ->
+	_ = erlang:monitor(process, ConnPid),
+	{noreply, State};
+handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) ->
+	{noreply, State#state{rm_diff=RmDiff + 1}};
 handle_cast({set_port, Port}, State) ->
 	{noreply, State#state{port=Port}};
-handle_cast({move_connection, DestPool, ConnPid}, State=#state{
-		conn_pools=Pools, conns_table=ConnsTable}) ->
-	Pools2 = move_pid(ConnPid, DestPool, Pools, ConnsTable),
-	{noreply, State#state{conn_pools=Pools2}};
-handle_cast({remove_connection, ConnPid}, State=#state{
-		conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) ->
-	{Pools2, Queue2} = remove_pid(ConnPid, Pools, ConnsTable, Queue),
-	{noreply, State#state{conn_pools=Pools2, queue=Queue2}};
 handle_cast(_Msg, State) ->
 	{noreply, State}.
 
 %% @private
-handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{
-		conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) ->
-	{Pools2, Queue2} = remove_pid(Pid, Pools, ConnsTable, Queue),
-	{noreply, State#state{conn_pools=Pools2, queue=Queue2}};
+handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) ->
+	_ = ranch_server:remove_connection(self()),
+	{noreply, State};
+handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) ->
+	{noreply, State#state{rm_diff=RmDiff - 1}};
 handle_info(_Info, State) ->
 	{noreply, State}.
 
@@ -197,50 +145,3 @@ terminate(_Reason, _State) ->
 %% @private
 code_change(_OldVsn, State, _Extra) ->
 	{ok, State}.
-
-%% Internal.
-
-%% @private
--spec add_pid(pid(), atom(), pools(), ets:tid())
-	-> {non_neg_integer(), pools()}.
-add_pid(ConnPid, Pool, Pools, ConnsTable) ->
-	MonitorRef = erlang:monitor(process, ConnPid),
-	ConnPid ! {shoot, self()},
-	{NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
-		false ->
-			{1, [{Pool, 1}|Pools]};
-		{Pool, NbConns} ->
-			NbConns2 = NbConns + 1,
-			{NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
-	end,
-	ets:insert(ConnsTable, {ConnPid, {MonitorRef, Pool}}),
-	{NbConnsRet, Pools2}.
-
-%% @private
--spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools().
-move_pid(ConnPid, DestPool, Pools, ConnsTable) ->
-	{MonitorRef, SrcPool} = ets:lookup_element(ConnsTable, ConnPid, 2),
-	ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}),
-	{SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
-	DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
-		false -> 1;
-		{DestPool, NbConns} -> NbConns + 1
-	end,
-	Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)),
-	[{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2].
-
-%% @private
--spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}.
-remove_pid(Pid, Pools, ConnsTable, Queue) ->
-	{MonitorRef, Pool} = ets:lookup_element(ConnsTable, Pid, 2),
-	erlang:demonitor(MonitorRef, [flush]),
-	{Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
-	Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)],
-	ets:delete(ConnsTable, Pid),
-	case queue:out(Queue) of
-		{{value, Client}, Queue2} ->
-			gen_server:reply(Client, ok),
-			{Pools2, Queue2};
-		_ ->
-			{Pools2, Queue}
-	end.

+ 2 - 1
src/ranch_listener_sup.erl

@@ -30,7 +30,8 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
 	MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
 	{ok, SupPid} = supervisor:start_link(?MODULE, []),
 	{ok, ListenerPid} = supervisor:start_child(SupPid,
-		{ranch_listener, {ranch_listener, start_link, [MaxConns, ProtoOpts]},
+		{ranch_listener, {ranch_listener, start_link,
+			[Ref, MaxConns, ProtoOpts]},
 		 permanent, 5000, worker, [ranch_listener]}),
 	ok = ranch_server:insert_listener(Ref, ListenerPid),
 	{ok, ConnsPid} = supervisor:start_child(SupPid,

+ 28 - 1
src/ranch_server.erl

@@ -22,6 +22,9 @@
 -export([lookup_listener/1]).
 -export([add_acceptor/2]).
 -export([send_to_acceptors/2]).
+-export([add_connection/1]).
+-export([count_connections/1]).
+-export([remove_connection/1]).
 
 %% gen_server.
 -export([init/1]).
@@ -69,12 +72,31 @@ send_to_acceptors(Ref, Msg) ->
 	_ = [Pid ! Msg || Pid <- Acceptors],
 	ok.
 
+%% @doc Add a connection to the connection pool.
+%%
+%% Also return the number of connections in the pool after this operation.
+-spec add_connection(pid()) -> non_neg_integer().
+add_connection(ListenerPid) ->
+	ets:update_counter(?TAB, {connections, ListenerPid}, 1).
+
+%% @doc Count the number of connections in the connection pool.
+-spec count_connections(pid()) -> non_neg_integer().
+count_connections(ListenerPid) ->
+	ets:update_counter(?TAB, {connections, ListenerPid}, 0).
+
+%% @doc Remove a connection from the connection pool.
+%%
+%% Also return the number of connections in the pool after this operation.
+-spec remove_connection(pid()) -> non_neg_integer().
+remove_connection(ListenerPid) ->
+	ets:update_counter(?TAB, {connections, ListenerPid}, -1).
+
 %% gen_server.
 
 %% @private
 init([]) ->
 	?TAB = ets:new(?TAB, [
-		ordered_set, public, named_table, {read_concurrency, true}]),
+		ordered_set, public, named_table, {write_concurrency, true}]),
 	{ok, #state{}}.
 
 %% @private
@@ -84,6 +106,7 @@ handle_call(_Request, _From, State) ->
 %% @private
 handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) ->
 	true = ets:insert_new(?TAB, {{acceptors, Ref}, []}),
+	true = ets:insert_new(?TAB, {{connections, Pid}, 0}),
 	MonitorRef = erlang:monitor(process, Pid),
 	{noreply, State#state{
 		monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}};
@@ -93,6 +116,9 @@ handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) ->
 	true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}),
 	{noreply, State#state{
 		monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}};
+handle_cast({add_connection, Pid}, State) ->
+	_ = erlang:monitor(process, Pid),
+	{noreply, State};
 handle_cast(_Request, State) ->
 	{noreply, State}.
 
@@ -120,6 +146,7 @@ code_change(_OldVsn, State, _Extra) ->
 remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) ->
 	true = ets:delete(?TAB, Key),
 	true = ets:delete(?TAB, {acceptors, Ref}),
+	true = ets:delete(?TAB, {connections, Pid}),
 	lists:keydelete({MonitorRef, Pid}, 1, Monitors);
 remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) ->
 	Acceptors = ets:lookup_element(?TAB, Key, 2),

+ 66 - 1
test/acceptor_SUITE.erl

@@ -29,6 +29,9 @@
 
 %% tcp.
 -export([tcp_echo/1]).
+-export([tcp_max_connections/1]).
+-export([tcp_max_connections_and_beyond/1]).
+-export([tcp_upgrade/1]).
 
 %% ct.
 
@@ -37,7 +40,10 @@ all() ->
 
 groups() ->
 	[{tcp, [
-		tcp_echo
+		tcp_echo,
+		tcp_max_connections,
+		tcp_max_connections_and_beyond,
+		tcp_upgrade
 	]}, {ssl, [
 		ssl_echo
 	]}].
@@ -100,3 +106,62 @@ tcp_echo(_) ->
 	%% Make sure the listener stopped.
 	{'EXIT', _} = begin catch ranch:get_port(tcp_echo) end,
 	ok.
+
+tcp_max_connections(_) ->
+	{ok, _} = ranch:start_listener(tcp_max_connections, 1,
+		ranch_tcp, [{port, 0}, {max_connections, 10}],
+		notify_and_wait_protocol, [{msg, connected}, {pid, self()}]),
+	Port = ranch:get_port(tcp_max_connections),
+	%% @todo We'll probably want a more direct interface to count_connections.
+	ListenerPid = ranch_server:lookup_listener(tcp_max_connections),
+	ok = connect_loop(Port, 11, 150),
+	10 = ranch_server:count_connections(ListenerPid),
+	10 = receive_loop(connected, 400),
+	1 = receive_loop(connected, 1000).
+
+tcp_max_connections_and_beyond(_) ->
+	{ok, _} = ranch:start_listener(tcp_max_connections_and_beyond, 1,
+		ranch_tcp, [{port, 0}, {max_connections, 10}],
+		remove_conn_and_wait_protocol, [{remove, true}]),
+	Port = ranch:get_port(tcp_max_connections_and_beyond),
+	%% @todo We'll probably want a more direct interface to count_connections.
+	ListenerPid = ranch_server:lookup_listener(tcp_max_connections_and_beyond),
+	ok = connect_loop(Port, 10, 0),
+	0 = ranch_server:count_connections(ListenerPid),
+	ranch:set_protocol_options(tcp_max_connections_and_beyond,
+		[{remove, false}]),
+	receive after 500 -> ok end,
+	ok = connect_loop(Port, 10, 0),
+	receive after 500 -> ok end,
+	10 = ranch_server:count_connections(ListenerPid).
+
+tcp_upgrade(_) ->
+	receive after 20000 -> ok end,
+	{ok, _} = ranch:start_listener(tcp_upgrade, 1,
+		ranch_tcp, [{port, 0}],
+		notify_and_wait_protocol, [{msg, connected}, {pid, self()}]),
+	Port = ranch:get_port(tcp_upgrade),
+	ok = connect_loop(Port, 1, 0),
+	receive connected -> ok after 1000 -> error(timeout) end,
+	ranch:set_protocol_options(tcp_upgrade, [{msg, upgraded}, {pid, self()}]),
+	ok = connect_loop(Port, 1, 0),
+	receive upgraded -> ok after 1000 -> error(timeout) end.
+
+%% Utility functions.
+
+connect_loop(_, 0, _) ->
+	ok;
+connect_loop(Port, N, Sleep) ->
+	{ok, _} = gen_tcp:connect("localhost", Port,
+		[binary, {active, false}, {packet, raw}]),
+	receive after Sleep -> ok end,
+	connect_loop(Port, N - 1, Sleep).
+
+receive_loop(Message, Timeout) ->
+	receive_loop(Message, Timeout, 0).
+receive_loop(Message, Timeout, N) ->
+	receive Message ->
+		receive_loop(Message, Timeout, N + 1)
+	after Timeout ->
+		N
+	end.

+ 11 - 0
test/notify_and_wait_protocol.erl

@@ -0,0 +1,11 @@
+-module(notify_and_wait_protocol).
+-export([start_link/4]).
+-export([init/2]).
+
+start_link(_, _, _, [{msg, Msg}, {pid, TestPid}]) ->
+	Pid = spawn_link(?MODULE, init, [Msg, TestPid]),
+	{ok, Pid}.
+
+init(Msg, Pid) ->
+	Pid ! Msg,
+	receive after 2500 -> ok end.

+ 17 - 0
test/remove_conn_and_wait_protocol.erl

@@ -0,0 +1,17 @@
+-module(remove_conn_and_wait_protocol).
+-export([start_link/4]).
+-export([init/2]).
+
+start_link(ListenerPid, _, _, [{remove, MaybeRemove}]) ->
+	Pid = spawn_link(?MODULE, init, [ListenerPid, MaybeRemove]),
+	{ok, Pid}.
+
+init(ListenerPid, MaybeRemove) ->
+	ranch:accept_ack(ListenerPid),
+	case MaybeRemove of
+		true ->
+			ranch_listener:remove_connection(ListenerPid);
+		false ->
+			ok
+	end,
+	receive after 2500 -> ok end.