Browse Source

Introduce cowboy_listener for managing a listener

Currently only supports limiting the maximum number of
connections by managing connection pools.
Loïc Hoguin 13 years ago
parent
commit
56369d5c1a
5 changed files with 192 additions and 28 deletions
  1. 5 0
      src/cowboy.erl
  2. 18 17
      src/cowboy_acceptor.erl
  3. 11 8
      src/cowboy_acceptors_sup.erl
  4. 151 0
      src/cowboy_listener.erl
  5. 7 3
      src/cowboy_listener_sup.erl

+ 5 - 0
src/cowboy.erl

@@ -32,6 +32,11 @@
 %% performance. The exact number depends of course on your hardware, on the
 %% protocol used and on the number of expected simultaneous connections.
 %%
+%% The <em>Transport</em> option <em>max_connections</em> allows you to define
+%% the maximum number of simultaneous connections for this listener. It defaults
+%% to 1024. See <em>cowboy_listener</em> for more details on limiting the number
+%% of connections.
+%%
 %% Although Cowboy includes a <em>cowboy_http_protocol</em> handler, other
 %% handlers can be created for different protocols like IRC, FTP and more.
 %%

+ 18 - 17
src/cowboy_acceptor.erl

@@ -15,29 +15,32 @@
 %% @private
 -module(cowboy_acceptor).
 
--export([start_link/6]). %% API.
--export([acceptor/6]). %% Internal.
+-export([start_link/7]). %% API.
+-export([acceptor/7]). %% Internal.
 
 %% API.
 
 -spec start_link(inet:socket(), module(), module(), any(),
-	non_neg_integer(), pid()) -> {ok, pid()}.
-start_link(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup) ->
+	non_neg_integer(), pid(), pid()) -> {ok, pid()}.
+start_link(LSocket, Transport, Protocol, Opts,
+		MaxConns, ListenerPid, ReqsSup) ->
 	Pid = spawn_link(?MODULE, acceptor,
-		[LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup]),
+		[LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup]),
 	{ok, Pid}.
 
 %% Internal.
 
 -spec acceptor(inet:socket(), module(), module(), any(),
-	non_neg_integer(), pid()) -> no_return().
-acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup) ->
+	non_neg_integer(), pid(), pid()) -> no_return().
+acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) ->
 	case Transport:accept(LSocket, 2000) of
 		{ok, CSocket} ->
 			{ok, Pid} = supervisor:start_child(ReqsSup,
 				[CSocket, Transport, Protocol, Opts]),
 			Transport:controlling_process(CSocket, Pid),
-			limit_reqs(MaxConns, ReqsSup);
+			{ok, NbConns} = cowboy_listener:add_connection(ListenerPid,
+				default, Pid),
+			limit_reqs(ListenerPid, NbConns, MaxConns);
 		{error, timeout} ->
 			ignore;
 		{error, _Reason} ->
@@ -45,13 +48,11 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup) ->
 			%%       we may want to try and listen again on the port?
 			ignore
 	end,
-	?MODULE:acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup).
+	?MODULE:acceptor(LSocket, Transport, Protocol, Opts,
+		MaxConns, ListenerPid, ReqsSup).
 
--spec limit_reqs(non_neg_integer(), pid()) -> ok.
-limit_reqs(MaxConns, ReqsSup) ->
-	Counts = supervisor:count_children(ReqsSup),
-	Active = lists:keyfind(active, 1, Counts),
-	case Active < MaxConns of
-		true -> ok;
-		false -> timer:sleep(1)
-	end.
+-spec limit_reqs(pid(), non_neg_integer(), non_neg_integer()) -> ok.
+limit_reqs(_ListenerPid, NbConns, MaxConns) when NbConns =< MaxConns ->
+	ok;
+limit_reqs(ListenerPid, _NbConns, MaxConns) ->
+	cowboy_listener:wait(ListenerPid, default, MaxConns).

+ 11 - 8
src/cowboy_acceptors_sup.erl

@@ -16,25 +16,28 @@
 -module(cowboy_acceptors_sup).
 -behaviour(supervisor).
 
--export([start_link/6]). %% API.
+-export([start_link/7]). %% API.
 -export([init/1]). %% supervisor.
 
 %% API.
 
--spec start_link(non_neg_integer(), module(), any(), module(), any(), pid())
-	-> {ok, pid()}.
-start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ReqsPid) ->
-	supervisor:start_link(?MODULE, [NbAcceptors,
-		Transport, TransOpts, Protocol, ProtoOpts, ReqsPid]).
+-spec start_link(non_neg_integer(), module(), any(),
+	module(), any(), pid(), pid()) -> {ok, pid()}.
+start_link(NbAcceptors, Transport, TransOpts,
+		Protocol, ProtoOpts, ListenerPid, ReqsPid) ->
+	supervisor:start_link(?MODULE, [NbAcceptors, Transport, TransOpts,
+		Protocol, ProtoOpts, ListenerPid, ReqsPid]).
 
 %% supervisor.
 
 -spec init(list()) -> {ok, {{one_for_one, 10, 10}, list()}}.
-init([NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ReqsPid]) ->
+init([NbAcceptors, Transport, TransOpts,
+		Protocol, ProtoOpts, ListenerPid, ReqsPid]) ->
 	{ok, LSocket} = Transport:listen(TransOpts),
 	MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
 	Procs = [{{acceptor, self(), N}, {cowboy_acceptor, start_link, [
-				LSocket, Transport, Protocol, ProtoOpts, MaxConns, ReqsPid
+				LSocket, Transport, Protocol, ProtoOpts,
+				MaxConns, ListenerPid, ReqsPid
 			]}, permanent, brutal_kill, worker, dynamic}
 		|| N <- lists:seq(1, NbAcceptors)],
 	{ok, {{one_for_one, 10, 10}, Procs}}.

+ 151 - 0
src/cowboy_listener.erl

@@ -0,0 +1,151 @@
+%% Copyright (c) 2011, Loïc Hoguin <essen@dev-extend.eu>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% @doc Public API for managing listeners.
+-module(cowboy_listener).
+-behaviour(gen_server).
+
+-export([start_link/0, stop/1,
+	add_connection/3, remove_connection/2, wait/3]). %% API.
+-export([init/1, handle_call/3, handle_cast/2,
+	handle_info/2, terminate/2, code_change/3]). %% gen_server.
+
+-record(state, {
+	req_pools = [] :: [{atom(), non_neg_integer()}],
+	reqs_table :: ets:tid(),
+	queue = [] :: [{pid(), reference()}]
+}).
+
+%% API.
+
+%% @private
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+	gen_server:start_link(?MODULE, [], []).
+
+%% @private
+-spec stop(pid()) -> stopped.
+stop(ServerPid) ->
+	gen_server:call(ServerPid, stop).
+
+%% @doc Add a connection to the given pool in the listener.
+%%
+%% Pools of connections are used to restrict the maximum number of connections
+%% depending on their type. By default, Cowboy add all connections to the
+%% pool <em>default</em>. It also checks for the maximum number of connections
+%% in that pool before accepting again.
+%%
+%% When a process managing a connection dies, the process is removed from the
+%% pool. If the socket has been sent to another process, it is up to the
+%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
+%% the previous and adding the new one.
+-spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}.
+add_connection(ServerPid, Pool, ConnPid) ->
+	gen_server:call(ServerPid, {add_connection, Pool, ConnPid}).
+
+%% @doc Remove the given connection from its pool.
+-spec remove_connection(pid(), pid()) -> ok.
+remove_connection(ServerPid, ConnPid) ->
+	gen_server:cast(ServerPid, {remove_connection, ConnPid}).
+
+%% @doc Wait until the number of connections in the given pool gets below
+%% the given threshold.
+%%
+%% This function will not return until the number of connections in the pool
+%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em>
+%% to make the process wait for a reply indefinitely.
+-spec wait(pid(), atom(), non_neg_integer()) -> ok.
+wait(ServerPid, Pool, MaxConns) ->
+	gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity).
+
+%% gen_server.
+
+%% @private
+-spec init([]) -> {ok, #state{}}.
+init([]) ->
+	ReqsTablePid = ets:new(requests_table, [set, private]),
+	{ok, #state{reqs_table=ReqsTablePid}}.
+
+%% @private
+-spec handle_call(_, _, State)
+	-> {reply, ignored, State} | {stop, normal, stopped, State}.
+handle_call({add_connection, Pool, ConnPid}, _From, State=#state{
+		req_pools=Pools, reqs_table=ReqsTable}) ->
+	MonitorRef = erlang:monitor(process, ConnPid),
+	{NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
+		false ->
+			{1, [{Pool, 1}|Pools]};
+		{Pool, NbConns} ->
+			NbConns2 = NbConns + 1,
+			{NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
+	end,
+	ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
+	{reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}};
+handle_call({wait, Pool, MaxConns}, From, State=#state{
+		req_pools=Pools, queue=Queue}) ->
+	case lists:keyfind(Pool, 1, Pools) of
+		{Pool, NbConns} when NbConns > MaxConns ->
+			{noreply, State#state{queue=[From|Queue]}};
+		_Any ->
+			{reply, ok, State}
+	end;
+handle_call(stop, _From, State) ->
+	{stop, normal, stopped, State};
+handle_call(_Request, _From, State) ->
+	{reply, ignored, State}.
+
+%% @private
+-spec handle_cast(_, State) -> {noreply, State}.
+handle_cast({remove_connection, ConnPid}, State) ->
+	State2 = remove_pid(ConnPid, State),
+	{noreply, State2};
+handle_cast(_Msg, State) ->
+	{noreply, State}.
+
+%% @private
+-spec handle_info(_, State) -> {noreply, State}.
+handle_info({'DOWN', _Ref, process, Pid, _Info}, State) ->
+	State2 = remove_pid(Pid, State),
+	{noreply, State2};
+handle_info(_Info, State) ->
+	{noreply, State}.
+
+%% @private
+-spec terminate(_, _) -> ok.
+terminate(_Reason, _State) ->
+	ok.
+
+%% @private
+-spec code_change(_, State, _) -> {ok, State}.
+code_change(_OldVsn, State, _Extra) ->
+	{ok, State}.
+
+%% Internal.
+
+%% @private
+-spec remove_pid(pid(), State) -> State.
+remove_pid(Pid, State=#state{
+		req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) ->
+	{MonitorRef, Pool} = ets:lookup_element(ReqsTable, Pid, 2),
+	erlang:demonitor(MonitorRef, [flush]),
+	{Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
+	Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)],
+	ets:delete(ReqsTable, Pid),
+	case Queue of
+		[] ->
+			State#state{req_pools=Pools2};
+		[Client|Queue2] ->
+			gen_server:reply(Client, ok),
+			State#state{req_pools=Pools2, queue=Queue2}
+	end.

+ 7 - 3
src/cowboy_listener_sup.erl

@@ -25,17 +25,21 @@
 	-> {ok, pid()}.
 start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
 	{ok, SupPid} = supervisor:start_link(?MODULE, []),
+	{ok, ListenerPid} = supervisor:start_child(SupPid,
+		{cowboy_listener, {cowboy_listener, start_link, []},
+		 permanent, 5000, worker, dynamic}),
 	{ok, ReqsPid} = supervisor:start_child(SupPid,
 		{cowboy_requests_sup, {cowboy_requests_sup, start_link, []},
 		 permanent, 5000, supervisor, [cowboy_requests_sup]}),
 	{ok, _PoolPid} = supervisor:start_child(SupPid,
 		{cowboy_acceptors_sup, {cowboy_acceptors_sup, start_link, [
-			NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ReqsPid
+			NbAcceptors, Transport, TransOpts,
+			Protocol, ProtoOpts, ListenerPid, ReqsPid
 		]}, permanent, 5000, supervisor, [cowboy_acceptors_sup]}),
 	{ok, SupPid}.
 
 %% supervisor.
 
--spec init([]) -> {ok, {{one_for_one, 0, 1}, []}}.
+-spec init([]) -> {ok, {{one_for_all, 10, 10}, []}}.
 init([]) ->
-	{ok, {{one_for_one, 0, 1}, []}}.
+	{ok, {{one_for_all, 10, 10}, []}}.