ranch_listener.erl 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. %% Copyright (c) 2011-2012, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% @doc Public API for managing listeners.
  15. -module(ranch_listener).
  16. -behaviour(gen_server).
  17. %% API.
  18. -export([start_link/2]).
  19. -export([stop/1]).
  20. -export([add_connection/4]).
  21. -export([move_connection/3]).
  22. -export([remove_connection/2]).
  23. -export([check_upgrades/2]).
  24. -export([get_protocol_options/1]).
  25. -export([set_protocol_options/2]).
  26. %% gen_server.
  27. -export([init/1]).
  28. -export([handle_call/3]).
  29. -export([handle_cast/2]).
  30. -export([handle_info/2]).
  31. -export([terminate/2]).
  32. -export([code_change/3]).
  %% Per-pool connection counters: one `{PoolName, Count}' entry per pool.
  -type pools() :: [{atom(), non_neg_integer()}].

  %% State of the listener gen_server.
  %%
  %% Fields that default to `undefined' say so in their type, and the queue
  %% uses the remote type `queue:queue()' (the built-in `queue()' type no
  %% longer exists in current OTP releases).
  -record(state, {
      %% Number of tracked connections in each pool.
      conn_pools = [] :: pools(),
      %% ETS table holding `{ConnPid, {MonitorRef, Pool}}' entries.
      conns_table = undefined :: undefined | ets:tid(),
      %% `add_connection' callers whose reply is being withheld.
      queue = undefined :: undefined | queue:queue(),
      %% Pool size above which `add_connection' callers are queued.
      max_conns = undefined :: undefined | non_neg_integer(),
      %% Current protocol options handed out to acceptors.
      proto_opts :: any(),
      %% Version of `proto_opts'; incremented by each set_protocol_options.
      proto_opts_vsn = 1 :: non_neg_integer()
  }).
  42. %% API.
  %% @private
  %%
  %% Start the listener gen_server linked to the caller.
  %%
  %% The process is spawned with high priority: ranch_listener is the central
  %% gen_server in Ranch and handles every incoming connection, so its
  %% connection bookkeeping must run as soon as a connection needs it.
  -spec start_link(non_neg_integer(), any()) -> {ok, pid()}.
  start_link(MaxConns, ProtoOpts) ->
      InitArgs = [MaxConns, ProtoOpts],
      SpawnOpts = [{priority, high}],
      gen_server:start_link(?MODULE, InitArgs, [{spawn_opt, SpawnOpts}]).
  %% @private
  %%
  %% Synchronously send the `stop' request to the listener and return its
  %% reply.
  -spec stop(pid()) -> stopped.
  stop(ServerPid) ->
      Request = stop,
      gen_server:call(ServerPid, Request).
  %% @doc Register a connection process in one of the listener's pools.
  %%
  %% Pools limit the maximum number of connections per connection type.
  %% By default, Ranch adds all connections to the pool <em>default</em>
  %% and checks that pool's size before accepting again. The call blocks,
  %% without a timeout, until there is free space in the pool.
  %%
  %% The listener removes a connection from its pool when the process
  %% managing it dies. If the socket is handed over to another process, the
  %% protocol code must tell the listener about the new <em>ConnPid</em> by
  %% removing the previous one and adding the new one.
  %%
  %% The return value also tells the acceptor whether the protocol options
  %% changed: `{upgrade, ProtoOpts, OptsVsn}' is returned instead of `ok'
  %% when they did, so the acceptor can continue with the new options.
  -spec add_connection(pid(), atom(), pid(), non_neg_integer())
      -> ok | {upgrade, any(), non_neg_integer()}.
  add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->
      Request = {add_connection, Pool, ConnPid, OptsVsn},
      gen_server:call(ServerPid, Request, infinity).
  %% @doc Ask the listener to transfer a connection to another pool.
  %%
  %% The request is sent as a cast, so this returns `ok' immediately.
  -spec move_connection(pid(), atom(), pid()) -> ok.
  move_connection(ServerPid, DestPool, ConnPid) ->
      Request = {move_connection, DestPool, ConnPid},
      gen_server:cast(ServerPid, Request).
  %% @doc Ask the listener to drop the given connection from its pool.
  %%
  %% The request is sent as a cast, so this returns `ok' immediately.
  -spec remove_connection(pid(), pid()) -> ok.
  remove_connection(ServerPid, ConnPid) ->
      Request = {remove_connection, ConnPid},
      gen_server:cast(ServerPid, Request).
  %% @doc Tell the caller whether its protocol options are out of date.
  %%
  %% Returns `ok' when <em>OptsVsn</em> matches the listener's current
  %% options version, and `{upgrade, ProtoOpts, OptsVsn}' with the current
  %% options and version otherwise.
  -spec check_upgrades(pid(), non_neg_integer())
      -> ok | {upgrade, any(), non_neg_integer()}.
  check_upgrades(ServerPid, OptsVsn) ->
      Request = {check_upgrades, OptsVsn},
      gen_server:call(ServerPid, Request).
  %% @doc Fetch the protocol options currently held by the listener.
  -spec get_protocol_options(pid()) -> {ok, any()}.
  get_protocol_options(ServerPid) ->
      Request = get_protocol_options,
      gen_server:call(ServerPid, Request).
  %% @doc Replace the listener's protocol options.
  %%
  %% The listener also increments its options version, which is how later
  %% add_connection and check_upgrades calls detect the change.
  -spec set_protocol_options(pid(), any()) -> ok.
  set_protocol_options(ServerPid, ProtoOpts) ->
      Request = {set_protocol_options, ProtoOpts},
      gen_server:call(ServerPid, Request).
  100. %% gen_server.
  %% @private
  %%
  %% Build the initial state: a private ETS table for the tracked
  %% connections, an empty queue of waiting callers, and the connection
  %% limit and protocol options given to start_link/2.
  init([MaxConns, ProtoOpts]) ->
      InitialState = #state{
          conns_table = ets:new(connections_table, [set, private]),
          queue = queue:new(),
          max_conns = MaxConns,
          proto_opts = ProtoOpts
      },
      {ok, InitialState}.
  %% @private
  %%
  %% Synchronous requests:
  %%   {add_connection, Pool, ConnPid, Vsn} - track ConnPid in Pool. Replies
  %%       with an upgrade tuple when the caller's options version is stale;
  %%       otherwise withholds the reply (queues the caller) while the pool
  %%       holds more than max_conns connections, or replies `ok'.
  %%   {check_upgrades, Vsn} - `ok', or an upgrade tuple when Vsn is stale.
  %%   get_protocol_options - `{ok, ProtoOpts}'.
  %%   {set_protocol_options, Opts} - store Opts and bump the version.
  %%   stop - reply `stopped' and stop with reason `normal'.
  %% Any other request is answered with `ignored'.
  handle_call({add_connection, Pool, ConnPid, AcceptorVsn}, From,
          State = #state{conn_pools = CurrentPools, conns_table = Table,
              queue = Waiting, max_conns = Limit,
              proto_opts = Opts, proto_opts_vsn = CurrentVsn}) ->
      %% The connection is tracked before deciding how to answer, so it is
      %% counted even when the caller is told to upgrade or is queued.
      {Count, UpdatedPools} = add_pid(ConnPid, Pool, CurrentPools, Table),
      Tracked = State#state{conn_pools = UpdatedPools},
      case AcceptorVsn =:= CurrentVsn of
          false ->
              {reply, {upgrade, Opts, CurrentVsn}, Tracked};
          true when Count > Limit ->
              %% Pool is over its limit: park the caller until a
              %% connection is removed.
              {noreply, Tracked#state{queue = queue:in(From, Waiting)}};
          true ->
              {reply, ok, Tracked}
      end;
  handle_call({check_upgrades, AcceptorVsn}, _From,
          State = #state{proto_opts = Opts, proto_opts_vsn = CurrentVsn}) ->
      Reply = case AcceptorVsn of
          CurrentVsn -> ok;
          _Stale -> {upgrade, Opts, CurrentVsn}
      end,
      {reply, Reply, State};
  handle_call(get_protocol_options, _From,
          State = #state{proto_opts = Opts}) ->
      {reply, {ok, Opts}, State};
  handle_call({set_protocol_options, NewOpts}, _From,
          State = #state{proto_opts_vsn = CurrentVsn}) ->
      NextVsn = CurrentVsn + 1,
      {reply, ok, State#state{proto_opts = NewOpts, proto_opts_vsn = NextVsn}};
  handle_call(stop, _From, State) ->
      {stop, normal, stopped, State};
  handle_call(_Unknown, _From, State) ->
      {reply, ignored, State}.
  %% @private
  %%
  %% Asynchronous requests:
  %%   {move_connection, DestPool, ConnPid} - transfer ConnPid to DestPool.
  %%   {remove_connection, ConnPid} - stop tracking ConnPid.
  %% Any other cast is dropped.
  handle_cast({move_connection, DestPool, ConnPid},
          State = #state{conn_pools = CurrentPools, conns_table = Table}) ->
      UpdatedPools = move_pid(ConnPid, DestPool, CurrentPools, Table),
      {noreply, State#state{conn_pools = UpdatedPools}};
  handle_cast({remove_connection, ConnPid},
          State = #state{conn_pools = CurrentPools, conns_table = Table,
              queue = Waiting}) ->
      {UpdatedPools, StillWaiting} =
          remove_pid(ConnPid, CurrentPools, Table, Waiting),
      {noreply, State#state{conn_pools = UpdatedPools, queue = StillWaiting}};
  handle_cast(_Unknown, State) ->
      {noreply, State}.
  %% @private
  %%
  %% A 'DOWN' message for a monitored process removes that process from its
  %% pool, exactly like a remove_connection cast. Other messages are dropped.
  handle_info({'DOWN', _MonitorRef, process, DeadPid, _Reason},
          State = #state{conn_pools = CurrentPools, conns_table = Table,
              queue = Waiting}) ->
      {UpdatedPools, StillWaiting} =
          remove_pid(DeadPid, CurrentPools, Table, Waiting),
      {noreply, State#state{conn_pools = UpdatedPools, queue = StillWaiting}};
  handle_info(_Unknown, State) ->
      {noreply, State}.
  %% @private
  %%
  %% Nothing is cleaned up explicitly on termination.
  terminate(_Why, _FinalState) ->
      ok.
  %% @private
  %%
  %% The state is carried over unchanged across code upgrades.
  code_change(_FromVsn, CurrentState, _UpgradeData) ->
      {ok, CurrentState}.
  162. %% Internal.
  %% @private
  %%
  %% Start tracking ConnPid in Pool.
  %%
  %% Monitors the process, sends it `{shoot, self()}', increments the pool's
  %% counter (creating the pool entry when it does not exist yet) and stores
  %% `{ConnPid, {MonitorRef, Pool}}' in the connections table.
  %% Returns the pool's new size together with the updated pool list, in
  %% which the touched pool comes first.
  -spec add_pid(pid(), atom(), pools(), ets:tid())
      -> {non_neg_integer(), pools()}.
  add_pid(ConnPid, Pool, Pools, ConnsTable) ->
      Monitor = erlang:monitor(process, ConnPid),
      ConnPid ! {shoot, self()},
      Size = case lists:keyfind(Pool, 1, Pools) of
          {Pool, Previous} -> Previous + 1;
          false -> 1
      end,
      %% keydelete/3 returns the list untouched when the pool is new.
      UpdatedPools = [{Pool, Size} | lists:keydelete(Pool, 1, Pools)],
      ets:insert(ConnsTable, {ConnPid, {Monitor, Pool}}),
      {Size, UpdatedPools}.
  %% @private
  %%
  %% Move ConnPid from the pool it is currently tracked in to DestPool.
  %%
  %% The table entry is rewritten to point at DestPool, the source pool's
  %% counter is decremented and the destination's counter is incremented
  %% (starting at 1 when the pool does not exist yet). Both touched pools
  %% end up at the head of the returned list.
  %%
  %% The pool list is returned unchanged when:
  %%   - ConnPid is already in DestPool. Treating this as a regular move
  %%     would emit two entries for the same pool, and later lookups would
  %%     see only the decremented one, corrupting the count.
  %%   - ConnPid is not tracked at all, so a stray request cannot crash the
  %%     listener.
  -spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools().
  move_pid(ConnPid, DestPool, Pools, ConnsTable) ->
      case ets:lookup(ConnsTable, ConnPid) of
          [] ->
              Pools;
          [{ConnPid, {_MonitorRef, DestPool}}] ->
              Pools;
          [{ConnPid, {MonitorRef, SrcPool}}] ->
              ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}),
              {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
              DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
                  false -> 1;
                  {DestPool, NbConns} -> NbConns + 1
              end,
              Pools2 = lists:keydelete(SrcPool, 1,
                  lists:keydelete(DestPool, 1, Pools)),
              [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns} | Pools2]
      end.
  %% @private
  %%
  %% Stop tracking Pid and release one waiting add_connection caller.
  %%
  %% For a tracked Pid: drops its monitor (flushing any pending 'DOWN'
  %% message), decrements its pool's counter, deletes its table entry and,
  %% when a caller is queued, replies `ok' to the oldest one.
  %% Returns the updated pool list and queue.
  %%
  %% For a Pid that is not tracked (for example a repeated remove_connection
  %% request) pools and queue are returned unchanged instead of crashing
  %% the listener.
  -spec remove_pid(pid(), pools(), ets:tid(), queue:queue())
      -> {pools(), queue:queue()}.
  remove_pid(Pid, Pools, ConnsTable, Queue) ->
      case ets:lookup(ConnsTable, Pid) of
          [] ->
              {Pools, Queue};
          [{Pid, {MonitorRef, Pool}}] ->
              erlang:demonitor(MonitorRef, [flush]),
              {Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
              Pools2 = [{Pool, NbConns - 1} | lists:keydelete(Pool, 1, Pools)],
              ets:delete(ConnsTable, Pid),
              %% A slot was freed: let the longest-waiting acceptor proceed.
              case queue:out(Queue) of
                  {{value, Client}, Queue2} ->
                      gen_server:reply(Client, ok),
                      {Pools2, Queue2};
                  {empty, _} ->
                      {Pools2, Queue}
              end
      end.