Browse Source

Register acceptors through ranch_server

Loïc Hoguin 12 years ago
parent
commit
1d2940b379
4 changed files with 51 additions and 16 deletions
  1. 4 3
      src/ranch_acceptor.erl
  2. 6 6
      src/ranch_acceptors_sup.erl
  3. 1 1
      src/ranch_listener_sup.erl
  4. 40 6
      src/ranch_server.erl

+ 4 - 3
src/ranch_acceptor.erl

@@ -16,19 +16,20 @@
 -module(ranch_acceptor).
 
 %% API.
--export([start_link/6]).
+-export([start_link/7]).
 
 %% Internal.
 -export([acceptor/7]).
 
 %% API.
 
--spec start_link(inet:socket(), module(), module(), any(),
+-spec start_link(any(), inet:socket(), module(), module(), any(),
 	pid(), pid()) -> {ok, pid()}.
-start_link(LSocket, Transport, Protocol, Opts,
+start_link(Ref, LSocket, Transport, Protocol, Opts,
 		ListenerPid, ConnsSup) ->
 	Pid = spawn_link(?MODULE, acceptor,
 		[LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]),
+	ok = ranch_server:add_acceptor(Ref, Pid),
 	{ok, Pid}.
 
 %% Internal.

+ 6 - 6
src/ranch_acceptors_sup.erl

@@ -17,29 +17,29 @@
 -behaviour(supervisor).
 
 %% API.
--export([start_link/7]).
+-export([start_link/8]).
 
 %% supervisor.
 -export([init/1]).
 
 %% API.
 
--spec start_link(non_neg_integer(), module(), any(),
+-spec start_link(any(), non_neg_integer(), module(), any(),
 	module(), any(), pid(), pid()) -> {ok, pid()}.
-start_link(NbAcceptors, Transport, TransOpts,
+start_link(Ref, NbAcceptors, Transport, TransOpts,
 		Protocol, ProtoOpts, ListenerPid, ConnsPid) ->
-	supervisor:start_link(?MODULE, [NbAcceptors, Transport, TransOpts,
+	supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts,
 		Protocol, ProtoOpts, ListenerPid, ConnsPid]).
 
 %% supervisor.
 
-init([NbAcceptors, Transport, TransOpts,
+init([Ref, NbAcceptors, Transport, TransOpts,
 		Protocol, ProtoOpts, ListenerPid, ConnsPid]) ->
 	{ok, LSocket} = Transport:listen(TransOpts),
 	{ok, {_, Port}} = Transport:sockname(LSocket),
 	ranch_listener:set_port(ListenerPid, Port),
 	Procs = [{{acceptor, self(), N}, {ranch_acceptor, start_link, [
-				LSocket, Transport, Protocol, ProtoOpts,
+				Ref, LSocket, Transport, Protocol, ProtoOpts,
 				ListenerPid, ConnsPid
       ]}, permanent, brutal_kill, worker, []}
 		|| N <- lists:seq(1, NbAcceptors)],

+ 1 - 1
src/ranch_listener_sup.erl

@@ -38,7 +38,7 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
 		 permanent, 5000, supervisor, [ranch_conns_sup]}),
 	{ok, _PoolPid} = supervisor:start_child(SupPid,
 		{ranch_acceptors_sup, {ranch_acceptors_sup, start_link, [
-			NbAcceptors, Transport, TransOpts,
+			Ref, NbAcceptors, Transport, TransOpts,
 			Protocol, ProtoOpts, ListenerPid, ConnsPid
 		]}, permanent, 5000, supervisor, [ranch_acceptors_sup]}),
 	{ok, SupPid}.

+ 40 - 6
src/ranch_server.erl

@@ -20,6 +20,8 @@
 -export([start_link/0]).
 -export([insert_listener/2]).
 -export([lookup_listener/1]).
+-export([add_acceptor/2]).
+-export([send_to_acceptors/2]).
 
 %% gen_server.
 -export([init/1]).
@@ -31,9 +33,10 @@
 
 -define(TAB, ?MODULE).
 
--type key() :: {listener, any()}.
+-type key() :: {listener | acceptors, any()}.
+-type monitors() :: [{{reference(), pid()}, key()}].
 -record(state, {
-	monitors = [] :: [{{reference(), pid()}, key()}]
+	monitors = [] :: monitors()
 }).
 
 %% API.
@@ -54,6 +57,18 @@ insert_listener(Ref, Pid) ->
 lookup_listener(Ref) ->
 	ets:lookup_element(?TAB, {listener, Ref}, 2).
 
+%% @doc Add an acceptor for the given listener.
+-spec add_acceptor(any(), pid()) -> ok.
+add_acceptor(Ref, Pid) ->
+	gen_server:cast(?MODULE, {add_acceptor, Ref, Pid}).
+
+%% @doc Send a message to all acceptors of the given listener.
+-spec send_to_acceptors(any(), any()) -> ok.
+send_to_acceptors(Ref, Msg) ->
+	Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
+	_ = [Pid ! Msg || Pid <- Acceptors],
+	ok.
+
 %% gen_server.
 
 %% @private
@@ -68,17 +83,24 @@ handle_call(_Request, _From, State) ->
 
 %% @private
 handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) ->
+	true = ets:insert_new(?TAB, {{acceptors, Ref}, []}),
 	MonitorRef = erlang:monitor(process, Pid),
 	{noreply, State#state{
 		monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}};
+handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) ->
+	MonitorRef = erlang:monitor(process, Pid),
+	Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
+	true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}),
+	{noreply, State#state{
+		monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}};
 handle_cast(_Request, State) ->
 	{noreply, State}.
 
 %% @private
-handle_info({'DOWN', Ref, process, Pid, _}, State=#state{monitors=Monitors}) ->
-	{_, Key} = lists:keyfind({Ref, Pid}, 1, Monitors),
-	true = ets:delete(?TAB, Key),
-	Monitors2 = lists:keydelete({Ref, Pid}, 1, Monitors),
+handle_info({'DOWN', MonitorRef, process, Pid, _},
+		State=#state{monitors=Monitors}) ->
+	{_, Key} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
+	Monitors2 = remove_process(Key, MonitorRef, Pid, Monitors),
 	{noreply, State#state{monitors=Monitors2}};
 handle_info(_Info, State) ->
 	{noreply, State}.
@@ -90,3 +112,15 @@ terminate(_Reason, _State) ->
 %% @private
 code_change(_OldVsn, State, _Extra) ->
 	{ok, State}.
+
+%% Internal.
+
+-spec remove_process(key(), reference(), pid(), Monitors)
+	-> Monitors when Monitors::monitors() .
+remove_process(Key = {listener, _}, MonitorRef, Pid, Monitors) ->
+	true = ets:delete(?TAB, Key),
+	lists:keydelete({MonitorRef, Pid}, 1, Monitors);
+remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) ->
+	Acceptors = ets:lookup_element(?TAB, Key, 2),
+	true = ets:insert(?TAB, {Key, lists:delete(Pid, Acceptors)}),
+	lists:keydelete({MonitorRef, Pid}, 1, Monitors).