%% cowboy_listener.erl
  1. %% Copyright (c) 2011, Loïc Hoguin <essen@dev-extend.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% @doc Public API for managing listeners.
  15. -module(cowboy_listener).
  16. -behaviour(gen_server).
  17. -export([start_link/2, stop/1,
  18. add_connection/4, move_connection/3, remove_connection/2,
  19. get_protocol_options/1, set_protocol_options/2]). %% API.
  20. -export([init/1, handle_call/3, handle_cast/2,
  21. handle_info/2, terminate/2, code_change/3]). %% gen_server.
  %% Per-pool connection counters: one {PoolName, NbConns} entry per pool.
  -type pools() :: [{atom(), non_neg_integer()}].

  %% State of the listener gen_server.
  %%
  %% max_conns deliberately has no explicit default: 'undefined' is not a
  %% non_neg_integer(), so declaring it as the default contradicted the field
  %% type. init/1 always provides the value.
  -record(state, {
      %% Number of connections currently registered in each pool.
      req_pools = [] :: pools(),
      %% ETS table mapping ConnPid -> {MonitorRef, Pool}.
      reqs_table :: ets:tid(),
      %% add_connection callers left without a reply until a connection is
      %% removed. New waiters are pushed at the head.
      queue = [] :: [{pid(), reference()}],
      %% Pool size above which add_connection callers are queued.
      max_conns :: non_neg_integer(),
      %% Current protocol options handed to acceptors.
      proto_opts :: any(),
      %% Incremented every time the protocol options are replaced.
      proto_opts_vsn = 1 :: non_neg_integer()
  }).
  31. %% API.
  %% @private
  %%
  %% Start the listener, linked to the calling process.
  %%
  %% The process runs at high priority: cowboy_listener is the central
  %% gen_server in Cowboy and manages all the incoming connections. A high
  %% priority ensures the connection-related code is always executed when a
  %% connection needs it, which lets Cowboy scale far beyond what it would
  %% with a normal priority.
  -spec start_link(non_neg_integer(), any()) -> {ok, pid()}.
  start_link(MaxConns, ProtoOpts) ->
      InitArgs = [MaxConns, ProtoOpts],
      SpawnOpts = [{priority, high}],
      gen_server:start_link(?MODULE, InitArgs, [{spawn_opt, SpawnOpts}]).
  %% @private
  %%
  %% Synchronously ask the listener to stop and return its reply.
  -spec stop(pid()) -> stopped.
  stop(ServerPid) ->
      Request = stop,
      gen_server:call(ServerPid, Request).
  %% @doc Add a connection to the given pool in the listener.
  %%
  %% Pools of connections are used to restrict the maximum number of
  %% connections depending on their type. By default, Cowboy adds all
  %% connections to the pool <em>default</em>. It also checks the maximum
  %% number of connections in that pool before accepting again: this call
  %% blocks, without a timeout, until there is free space in the pool.
  %%
  %% When a process managing a connection dies, it is removed from the pool.
  %% If the socket has been handed to another process, the protocol code is
  %% responsible for telling the listener about the new <em>ConnPid</em> by
  %% removing the previous one and adding the new one.
  %%
  %% The return value also tells whether the protocol options have changed.
  %% If they have, {upgrade, ProtoOpts, OptsVsn} is returned instead of the
  %% atom 'ok', and the acceptor can carry on with the new protocol options.
  -spec add_connection(pid(), atom(), pid(), non_neg_integer())
      -> ok | {upgrade, any(), non_neg_integer()}.
  add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->
      Request = {add_connection, Pool, ConnPid, OptsVsn},
      gen_server:call(ServerPid, Request, infinity).
  %% @doc Move a connection from its current pool to <em>DestPool</em>.
  %%
  %% The request is sent as a cast, so this returns immediately.
  -spec move_connection(pid(), atom(), pid()) -> ok.
  move_connection(ServerPid, DestPool, ConnPid) ->
      Msg = {move_connection, DestPool, ConnPid},
      gen_server:cast(ServerPid, Msg).
  %% @doc Remove the given connection from the pool it belongs to.
  %%
  %% The request is sent as a cast, so this returns immediately.
  -spec remove_connection(pid(), pid()) -> ok.
  remove_connection(ServerPid, ConnPid) ->
      Msg = {remove_connection, ConnPid},
      gen_server:cast(ServerPid, Msg).
  %% @doc Return the protocol options currently held by the listener.
  -spec get_protocol_options(pid()) -> {ok, any()}.
  get_protocol_options(ServerPid) ->
      Request = get_protocol_options,
      gen_server:call(ServerPid, Request).
  %% @doc Replace the protocol options held by the listener.
  -spec set_protocol_options(pid(), any()) -> ok.
  set_protocol_options(ServerPid, ProtoOpts) ->
      Request = {set_protocol_options, ProtoOpts},
      gen_server:call(ServerPid, Request).
  84. %% gen_server.
  %% @private
  %%
  %% Build the initial state: a fresh private ETS table for the connection
  %% bookkeeping, plus the connection limit and protocol options passed to
  %% start_link/2.
  -spec init(list()) -> {ok, #state{}}.
  init([MaxConns, ProtoOpts]) ->
      State = #state{
          reqs_table = ets:new(requests_table, [set, private]),
          max_conns = MaxConns,
          proto_opts = ProtoOpts
      },
      {ok, State}.
  %% @private
  %%
  %% Synchronous requests.
  %%
  %% {add_connection, Pool, ConnPid, OptsVsn}: register ConnPid in Pool, then
  %%   - reply {upgrade, ProtoOpts, OptsVsn} if the caller's options version
  %%     differs from the listener's;
  %%   - otherwise, if the pool now holds more than max_conns connections,
  %%     send no reply and push the caller onto the queue; remove_pid/2
  %%     replies 'ok' to the queue head when a connection is removed;
  %%   - otherwise reply 'ok'.
  %% get_protocol_options: reply {ok, ProtoOpts}.
  %% {set_protocol_options, ProtoOpts}: store the options, bump the version.
  %% stop: reply 'stopped' and terminate normally.
  %% Anything else: reply 'ignored'.
  -spec handle_call(_, _, State)
      -> {reply, ignored | ok | {ok, any()}
          | {upgrade, any(), non_neg_integer()}, State}
      | {noreply, State}
      | {stop, normal, stopped, State}.
  handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{
          req_pools=Pools, reqs_table=ReqsTable,
          queue=Queue, max_conns=MaxConns,
          proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
      {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable),
      State2 = State#state{req_pools=Pools2},
      if
          AccOptsVsn =/= LisOptsVsn ->
              %% The tag must be 'upgrade', as documented and specified by
              %% add_connection/4; it used to be misspelled here.
              {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2};
          NbConns > MaxConns ->
              {noreply, State2#state{queue=[From|Queue]}};
          true ->
              {reply, ok, State2}
      end;
  handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
      {reply, {ok, ProtoOpts}, State};
  handle_call({set_protocol_options, ProtoOpts}, _From,
          State=#state{proto_opts_vsn=OptsVsn}) ->
      {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}};
  handle_call(stop, _From, State) ->
      {stop, normal, stopped, State};
  handle_call(_Request, _From, State) ->
      {reply, ignored, State}.
  %% @private
  %%
  %% Asynchronous requests.
  %%
  %% {move_connection, DestPool, ConnPid}: re-tag ConnPid as belonging to
  %% DestPool and shift one unit from the source pool's counter to DestPool's,
  %% creating the DestPool counter if needed. Moving a connection into the
  %% pool it is already in is a no-op: without that check two entries for the
  %% same pool would be prepended (N - 1 and N + 1) and later lookups would
  %% see the wrong count.
  %%
  %% {remove_connection, ConnPid}: drop ConnPid through remove_pid/2.
  %%
  %% Any other message is ignored.
  -spec handle_cast(_, State) -> {noreply, State}.
  handle_cast({move_connection, DestPool, ConnPid}, State=#state{
          req_pools=Pools, reqs_table=ReqsTable}) ->
      case ets:lookup_element(ReqsTable, ConnPid, 2) of
          {_MonitorRef, DestPool} ->
              %% Already in the destination pool; nothing to update.
              {noreply, State};
          {MonitorRef, SrcPool} ->
              ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}),
              {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
              DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
                  false -> 1;
                  {DestPool, NbConns} -> NbConns + 1
              end,
              Pools2 = lists:keydelete(SrcPool, 1,
                  lists:keydelete(DestPool, 1, Pools)),
              Pools3 = [{SrcPool, SrcNbConns - 1},
                  {DestPool, DestNbConns}|Pools2],
              {noreply, State#state{req_pools=Pools3}}
      end;
  handle_cast({remove_connection, ConnPid}, State) ->
      State2 = remove_pid(ConnPid, State),
      {noreply, State2};
  handle_cast(_Msg, State) ->
      {noreply, State}.
  %% @private
  %%
  %% A 'DOWN' message for a monitored process removes that process from its
  %% pool through remove_pid/2. Every other message is dropped.
  -spec handle_info(_, State) -> {noreply, State}.
  handle_info({'DOWN', _MonitorRef, process, ConnPid, _Reason}, State) ->
      {noreply, remove_pid(ConnPid, State)};
  handle_info(_Other, State) ->
      {noreply, State}.
  %% @private
  %%
  %% Nothing to clean up on shutdown; always returns 'ok'.
  -spec terminate(_, _) -> ok.
  terminate(_Why, _FinalState) -> ok.
  %% @private
  %%
  %% No state migration is performed: the state is returned unchanged.
  -spec code_change(_, State, _) -> {ok, State}.
  code_change(_FromVsn, State, _Extra) -> {ok, State}.
  150. %% Internal.
  %% @private
  %%
  %% Register ConnPid as a member of Pool.
  %%
  %% Monitors ConnPid, sends it {shoot, ListenerPid}, increments the counter of
  %% Pool (creating it at 1 if the pool is unknown) and records
  %% {ConnPid, {MonitorRef, Pool}} in the ETS table. Returns the new number of
  %% connections in Pool along with the updated pool list, in which the entry
  %% for Pool comes first.
  -spec add_pid(pid(), atom(), pools(), ets:tid())
      -> {non_neg_integer(), pools()}.
  add_pid(ConnPid, Pool, Pools, ReqsTable) ->
      MonitorRef = erlang:monitor(process, ConnPid),
      ConnPid ! {shoot, self()},
      Count = case lists:keyfind(Pool, 1, Pools) of
          {Pool, Current} -> Current + 1;
          false -> 1
      end,
      %% keydelete/3 leaves the list untouched when Pool has no entry yet.
      UpdatedPools = [{Pool, Count}|lists:keydelete(Pool, 1, Pools)],
      ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
      {Count, UpdatedPools}.
  %% @private
  %%
  %% Forget a connection process.
  %%
  %% Looks up the monitor and pool recorded for Pid, drops the monitor
  %% (flushing any pending 'DOWN' message), decrements the pool counter and
  %% deletes the ETS entry. If an add_connection caller is waiting in the
  %% queue, the one at the head is sent 'ok' and taken off the queue.
  -spec remove_pid(pid(), State) -> State.
  remove_pid(Pid, State=#state{
          req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) ->
      {MonitorRef, Pool} = ets:lookup_element(ReqsTable, Pid, 2),
      erlang:demonitor(MonitorRef, [flush]),
      {Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
      RemainingPools = lists:keydelete(Pool, 1, Pools),
      ets:delete(ReqsTable, Pid),
      Queue2 = case Queue of
          [] ->
              [];
          [Waiter|Rest] ->
              gen_server:reply(Waiter, ok),
              Rest
      end,
      State#state{req_pools=[{Pool, NbConns - 1}|RemainingPools], queue=Queue2}.