cowboy_listener.erl 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. %% Copyright (c) 2011, Loïc Hoguin <essen@dev-extend.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% @doc Public API for managing listeners.
  -module(cowboy_listener).
  -behaviour(gen_server).

  %% API.
  -export([
      start_link/0,
      stop/1,
      add_connection/3,
      move_connection/3,
      remove_connection/2,
      wait/3
  ]).

  %% gen_server.
  -export([
      init/1,
      handle_call/3,
      handle_cast/2,
      handle_info/2,
      terminate/2,
      code_change/3
  ]).

  %% Listener state.
  %%
  %% req_pools  - {Pool, Count} pairs: connections currently tracked per pool.
  %% reqs_table - ETS table mapping ConnPid to {MonitorRef, Pool}.
  %% queue      - callers of wait/3 that have not been replied to yet.
  -record(state, {
      req_pools = [] :: [{atom(), non_neg_integer()}],
      reqs_table :: ets:tid(),
      queue = [] :: [{pid(), reference()}]
  }).
  26. %% API.
  %% @private
  %%
  %% Start the listener and link it to the caller.
  %%
  %% cowboy_listener is the central gen_server in Cowboy: every incoming
  %% connection is registered through it. It is therefore spawned with a high
  %% process priority, so that connection bookkeeping is scheduled as soon as
  %% a connection needs it, which lets Cowboy scale far beyond what a normal
  %% priority process would allow.
  -spec start_link() -> {ok, pid()}.
  start_link() ->
      SpawnOpts = [{priority, high}],
      gen_server:start_link(?MODULE, [], [{spawn_opt, SpawnOpts}]).
  %% @private
  %%
  %% Synchronously ask the listener to stop; it answers with `stopped'.
  -spec stop(pid()) -> stopped.
  stop(ServerPid) ->
      Request = stop,
      gen_server:call(ServerPid, Request).
  %% @doc Add a connection to the given pool in the listener.
  %%
  %% Pools of connections are used to restrict the maximum number of
  %% connections depending on their type. By default, Cowboy adds all
  %% connections to the pool <em>default</em>. It also checks for the maximum
  %% number of connections in that pool before accepting again.
  %%
  %% The listener monitors <em>ConnPid</em>: when the process managing a
  %% connection dies, it is removed from its pool. If the socket has been
  %% handed over to another process, it is up to the protocol code to inform
  %% the listener of the new <em>ConnPid</em> by removing the previous one and
  %% adding the new one.
  %%
  %% Returns the number of connections in the pool after the addition.
  -spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}.
  add_connection(ServerPid, Pool, ConnPid) ->
      Request = {add_connection, Pool, ConnPid},
      gen_server:call(ServerPid, Request).
  %% @doc Move a connection from its current pool to <em>DestPool</em>.
  %%
  %% The request is sent asynchronously; <em>ok</em> is returned immediately.
  -spec move_connection(pid(), atom(), pid()) -> ok.
  move_connection(ServerPid, DestPool, ConnPid) ->
      Message = {move_connection, DestPool, ConnPid},
      gen_server:cast(ServerPid, Message).
  %% @doc Remove the given connection from its pool.
  %%
  %% The request is sent asynchronously; <em>ok</em> is returned immediately.
  -spec remove_connection(pid(), pid()) -> ok.
  remove_connection(ServerPid, ConnPid) ->
      Message = {remove_connection, ConnPid},
      gen_server:cast(ServerPid, Message).
  %% @doc Wait until the listener allows more connections in the given pool.
  %%
  %% The listener answers right away when the pool holds at most
  %% <em>MaxConns</em> connections (or is not known to it). Otherwise the
  %% caller is queued and only gets its reply, through
  %% <em>gen_server:reply/2</em>, once a tracked connection is removed.
  %% The call is made with an <em>infinity</em> timeout so the caller can wait
  %% for that reply indefinitely.
  -spec wait(pid(), atom(), non_neg_integer()) -> ok.
  wait(ServerPid, Pool, MaxConns) ->
      Request = {wait, Pool, MaxConns},
      gen_server:call(ServerPid, Request, infinity).
  72. %% gen_server.
  %% @private
  %%
  %% Create the private ETS set used to track connections and start with no
  %% pools and no waiting callers.
  -spec init([]) -> {ok, #state{}}.
  init([]) ->
      ReqsTable = ets:new(requests_table, [set, private]),
      InitialState = #state{reqs_table = ReqsTable},
      {ok, InitialState}.
  %% @private
  %%
  %% Synchronous requests:
  %%   {add_connection, Pool, ConnPid} - monitor ConnPid, count it in Pool and
  %%       reply {ok, N} where N is the pool size after the addition.
  %%   {wait, Pool, MaxConns} - reply ok at once unless Pool currently holds
  %%       more than MaxConns connections; in that case the caller is queued
  %%       and is answered later with gen_server:reply/2.
  %%   stop - stop the server normally, replying stopped.
  %% Anything else is answered with ignored.
  %%
  %% The spec lists every return shape produced by the clauses below.
  -spec handle_call(_, _, State)
      -> {reply, ignored | ok | {ok, non_neg_integer()}, State}
      | {noreply, State}
      | {stop, normal, stopped, State}.
  handle_call({add_connection, Pool, ConnPid}, _From, State = #state{
          req_pools = Pools, reqs_table = ReqsTable}) ->
      MonitorRef = erlang:monitor(process, ConnPid),
      {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
          false ->
              %% First connection seen for this pool.
              {1, [{Pool, 1} | Pools]};
          {Pool, NbConns} ->
              NbConns2 = NbConns + 1,
              {NbConns2, [{Pool, NbConns2} | lists:keydelete(Pool, 1, Pools)]}
      end,
      %% Remember the monitor and the pool so the connection can be
      %% demonitored and uncounted when it is removed.
      ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
      {reply, {ok, NbConnsRet}, State#state{req_pools = Pools2}};
  handle_call({wait, Pool, MaxConns}, From, State = #state{
          req_pools = Pools, queue = Queue}) ->
      case lists:keyfind(Pool, 1, Pools) of
          {Pool, NbConns} when NbConns > MaxConns ->
              %% Over the limit: park the caller instead of replying.
              {noreply, State#state{queue = [From | Queue]}};
          _Any ->
              {reply, ok, State}
      end;
  handle_call(stop, _From, State) ->
      {stop, normal, stopped, State};
  handle_call(_Request, _From, State) ->
      {reply, ignored, State}.
  %% @private
  %%
  %% Asynchronous requests:
  %%   {move_connection, DestPool, ConnPid} - count ConnPid in DestPool
  %%       instead of its current pool.
  %%   {remove_connection, ConnPid} - stop tracking ConnPid.
  %% Anything else is ignored.
  -spec handle_cast(_, State) -> {noreply, State}.
  handle_cast({move_connection, DestPool, ConnPid}, State = #state{
          req_pools = Pools, reqs_table = ReqsTable}) ->
      case ets:lookup(ReqsTable, ConnPid) of
          [] ->
              %% The connection is not tracked (for instance its 'DOWN'
              %% message was processed before this cast): nothing to move.
              {noreply, State};
          [{ConnPid, {_MonitorRef, DestPool}}] ->
              %% Already in the destination pool. Going through the generic
              %% path below would store two entries for the same pool and
              %% corrupt its count, so leave the state alone.
              {noreply, State};
          [{ConnPid, {MonitorRef, SrcPool}}] ->
              ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}),
              {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
              DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
                  false -> 1;
                  {DestPool, NbConns} -> NbConns + 1
              end,
              Pools2 = lists:keydelete(SrcPool, 1,
                  lists:keydelete(DestPool, 1, Pools)),
              Pools3 = [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}
                  | Pools2],
              {noreply, State#state{req_pools = Pools3}}
      end;
  handle_cast({remove_connection, ConnPid}, State) ->
      State2 = remove_pid(ConnPid, State),
      {noreply, State2};
  handle_cast(_Msg, State) ->
      {noreply, State}.
  %% @private
  %%
  %% A 'DOWN' message means a monitored connection process has terminated:
  %% stop tracking it. Every other message is dropped.
  -spec handle_info(_, State) -> {noreply, State}.
  handle_info({'DOWN', _MonitorRef, process, ConnPid, _Reason}, State) ->
      {noreply, remove_pid(ConnPid, State)};
  handle_info(_Unexpected, State) ->
      {noreply, State}.
  %% @private
  %%
  %% No explicit cleanup is performed when the listener stops.
  -spec terminate(_, _) -> ok.
  terminate(_Why, _FinalState) ->
      ok.
  %% @private
  %%
  %% The state needs no conversion across code upgrades: hand it back as is.
  -spec code_change(_, State, _) -> {ok, State}.
  code_change(_FromVsn, State, _UpgradeArgs) ->
      Unchanged = State,
      {ok, Unchanged}.
  139. %% Internal.
  %% @private
  %%
  %% Stop tracking the connection process Pid and return the updated state.
  %%
  %% A pid that is not tracked is ignored: a remove_connection cast can reach
  %% the listener after the 'DOWN' message of the same process has already
  %% been handled, and that must not crash the listener.
  -spec remove_pid(pid(), State) -> State.
  remove_pid(Pid, State = #state{reqs_table = ReqsTable}) ->
      case ets:lookup(ReqsTable, Pid) of
          [{Pid, {MonitorRef, Pool}}] ->
              untrack_connection(Pid, MonitorRef, Pool, State);
          [] ->
              State
      end.

  %% @private
  %%
  %% Drop the monitor and the table entry of a tracked connection, decrement
  %% the count of its pool and release one waiting caller, if any.
  -spec untrack_connection(pid(), reference(), atom(), State) -> State.
  untrack_connection(Pid, MonitorRef, Pool, State = #state{
          req_pools = Pools, reqs_table = ReqsTable, queue = Queue}) ->
      %% flush also discards a 'DOWN' message that may already be queued.
      erlang:demonitor(MonitorRef, [flush]),
      {Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
      Pools2 = [{Pool, NbConns - 1} | lists:keydelete(Pool, 1, Pools)],
      ets:delete(ReqsTable, Pid),
      case Queue of
          [] ->
              State#state{req_pools = Pools2};
          [Client | Queue2] ->
              %% NOTE(review): the most recently queued caller is released on
              %% any removal, without checking the pool or threshold it was
              %% waiting on - confirm this is intended.
              gen_server:reply(Client, ok),
              State#state{req_pools = Pools2, queue = Queue2}
      end.