ranch_conns_sup.erl 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. %% Copyright (c) 2011-2012, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% @private
  15. %%
  16. %% Make sure to never reload this module outside a release upgrade,
  17. %% as calling l(ranch_conns_sup) twice will kill the process and all
  18. %% the currently open connections.
  19. -module(ranch_conns_sup).
  20. %% API.
  21. -export([start_link/6]).
  22. -export([start_protocol/2]).
  23. -export([active_connections/1]).
  24. %% Supervisor internals.
  25. -export([init/7]).
  26. -export([system_continue/3]).
  27. -export([system_terminate/4]).
  28. -export([system_code_change/4]).
  29. -type conn_type() :: worker | supervisor.
  30. -type shutdown() :: brutal_kill | timeout().
  31. -record(state, {
  32. parent = undefined :: pid(),
  33. ref :: ranch:ref(),
  34. conn_type :: conn_type(),
  35. shutdown :: shutdown(),
  36. transport = undefined :: module(),
  37. protocol = undefined :: module(),
  38. opts :: any(),
  39. ack_timeout :: timeout(),
  40. max_conns = undefined :: ranch:max_conns()
  41. }).
  42. %% API.
  43. -spec start_link(ranch:ref(), conn_type(), shutdown(), module(),
  44. timeout(), module()) -> {ok, pid()}.
  45. start_link(Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol) ->
  46. proc_lib:start_link(?MODULE, init,
  47. [self(), Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol]).
  48. %% We can safely assume we are on the same node as the supervisor.
  49. %%
  50. %% We can also safely avoid having a monitor and a timeout here
  51. %% because only three things can happen:
  52. %% * The supervisor died; rest_for_one strategy killed all acceptors
  53. %% so this very calling process is going to di--
  54. %% * There's too many connections, the supervisor will resume the
  55. %% acceptor only when we get below the limit again.
  56. %% * The supervisor is overloaded, there's either too many acceptors
  57. %% or the max_connections limit is too large. It's better if we
  58. %% don't keep accepting connections because this leaves
  59. %% more room for the situation to be resolved.
  60. %%
  61. %% We do not need the reply, we only need the ok from the supervisor
  62. %% to continue. The supervisor sends its own pid when the acceptor can
  63. %% continue.
  64. -spec start_protocol(pid(), inet:socket()) -> ok.
  65. start_protocol(SupPid, Socket) ->
  66. SupPid ! {?MODULE, start_protocol, self(), Socket},
  67. receive SupPid -> ok end.
  68. %% We can't make the above assumptions here. This function might be
  69. %% called from anywhere.
  70. -spec active_connections(pid()) -> non_neg_integer().
  71. active_connections(SupPid) ->
  72. Tag = erlang:monitor(process, SupPid),
  73. catch erlang:send(SupPid, {?MODULE, active_connections, self(), Tag},
  74. [noconnect]),
  75. receive
  76. {Tag, Ret} ->
  77. erlang:demonitor(Tag, [flush]),
  78. Ret;
  79. {'DOWN', Tag, _, _, noconnection} ->
  80. exit({nodedown, node(SupPid)});
  81. {'DOWN', Tag, _, _, Reason} ->
  82. exit(Reason)
  83. after 5000 ->
  84. erlang:demonitor(Tag, [flush]),
  85. exit(timeout)
  86. end.
  87. %% Supervisor internals.
  88. -spec init(pid(), ranch:ref(), conn_type(), shutdown(),
  89. module(), timeout(), module()) -> no_return().
  90. init(Parent, Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol) ->
  91. process_flag(trap_exit, true),
  92. ok = ranch_server:set_connections_sup(Ref, self()),
  93. MaxConns = ranch_server:get_max_connections(Ref),
  94. Opts = ranch_server:get_protocol_options(Ref),
  95. ok = proc_lib:init_ack(Parent, {ok, self()}),
  96. loop(#state{parent=Parent, ref=Ref, conn_type=ConnType,
  97. shutdown=Shutdown, transport=Transport, protocol=Protocol,
  98. opts=Opts, ack_timeout=AckTimeout, max_conns=MaxConns}, 0, 0, []).
  99. loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
  100. transport=Transport, protocol=Protocol, opts=Opts,
  101. ack_timeout=AckTimeout, max_conns=MaxConns},
  102. CurConns, NbChildren, Sleepers) ->
  103. receive
  104. {?MODULE, start_protocol, To, Socket} ->
  105. case Protocol:start_link(Ref, Socket, Transport, Opts) of
  106. {ok, Pid} ->
  107. Transport:controlling_process(Socket, Pid),
  108. Pid ! {shoot, Ref, Transport, Socket, AckTimeout},
  109. put(Pid, true),
  110. CurConns2 = CurConns + 1,
  111. if CurConns2 < MaxConns ->
  112. To ! self(),
  113. loop(State, CurConns2, NbChildren + 1,
  114. Sleepers);
  115. true ->
  116. loop(State, CurConns2, NbChildren + 1,
  117. [To|Sleepers])
  118. end;
  119. Ret ->
  120. To ! self(),
  121. error_logger:error_msg(
  122. "Ranch listener ~p connection process start failure; "
  123. "~p:start_link/4 returned: ~999999p~n",
  124. [Ref, Protocol, Ret]),
  125. Transport:close(Socket),
  126. loop(State, CurConns, NbChildren, Sleepers)
  127. end;
  128. {?MODULE, active_connections, To, Tag} ->
  129. To ! {Tag, CurConns},
  130. loop(State, CurConns, NbChildren, Sleepers);
  131. %% Remove a connection from the count of connections.
  132. {remove_connection, Ref} ->
  133. loop(State, CurConns - 1, NbChildren, Sleepers);
  134. %% Upgrade the max number of connections allowed concurrently.
  135. %% We resume all sleeping acceptors if this number increases.
  136. {set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
  137. _ = [To ! self() || To <- Sleepers],
  138. loop(State#state{max_conns=MaxConns2},
  139. CurConns, NbChildren, []);
  140. {set_max_conns, MaxConns2} ->
  141. loop(State#state{max_conns=MaxConns2},
  142. CurConns, NbChildren, Sleepers);
  143. %% Upgrade the protocol options.
  144. {set_opts, Opts2} ->
  145. loop(State#state{opts=Opts2},
  146. CurConns, NbChildren, Sleepers);
  147. {'EXIT', Parent, Reason} ->
  148. terminate(State, Reason, NbChildren);
  149. {'EXIT', Pid, Reason} when Sleepers =:= [] ->
  150. report_error(Ref, Protocol, Pid, Reason),
  151. erase(Pid),
  152. loop(State, CurConns - 1, NbChildren - 1, Sleepers);
  153. %% Resume a sleeping acceptor if needed.
  154. {'EXIT', Pid, Reason} ->
  155. report_error(Ref, Protocol, Pid, Reason),
  156. erase(Pid),
  157. [To|Sleepers2] = Sleepers,
  158. To ! self(),
  159. loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
  160. {system, From, Request} ->
  161. sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
  162. {State, CurConns, NbChildren, Sleepers});
  163. %% Calls from the supervisor module.
  164. {'$gen_call', {To, Tag}, which_children} ->
  165. Pids = get_keys(true),
  166. Children = [{Protocol, Pid, ConnType, [Protocol]}
  167. || Pid <- Pids, is_pid(Pid)],
  168. To ! {Tag, Children},
  169. loop(State, CurConns, NbChildren, Sleepers);
  170. {'$gen_call', {To, Tag}, count_children} ->
  171. Counts = case ConnType of
  172. worker -> [{supervisors, 0}, {workers, NbChildren}];
  173. supervisor -> [{supervisors, NbChildren}, {workers, 0}]
  174. end,
  175. Counts2 = [{specs, 1}, {active, NbChildren}|Counts],
  176. To ! {Tag, Counts2},
  177. loop(State, CurConns, NbChildren, Sleepers);
  178. {'$gen_call', {To, Tag}, _} ->
  179. To ! {Tag, {error, ?MODULE}},
  180. loop(State, CurConns, NbChildren, Sleepers);
  181. Msg ->
  182. error_logger:error_msg(
  183. "Ranch listener ~p received unexpected message ~p~n",
  184. [Ref, Msg])
  185. end.
  186. -spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
  187. %% Kill all children and then exit. We unlink first to avoid
  188. %% getting a message for each child getting killed.
  189. terminate(#state{shutdown=brutal_kill}, Reason, _) ->
  190. Pids = get_keys(true),
  191. _ = [begin
  192. unlink(P),
  193. exit(P, kill)
  194. end || P <- Pids],
  195. exit(Reason);
  196. %% Attempt to gracefully shutdown all children.
  197. terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
  198. shutdown_children(),
  199. _ = if
  200. Shutdown =:= infinity ->
  201. ok;
  202. true ->
  203. erlang:send_after(Shutdown, self(), kill)
  204. end,
  205. wait_children(NbChildren),
  206. exit(Reason).
  207. %% Monitor processes so we can know which ones have shutdown
  208. %% before the timeout. Unlink so we avoid receiving an extra
  209. %% message. Then send a shutdown exit signal.
  210. shutdown_children() ->
  211. Pids = get_keys(true),
  212. _ = [begin
  213. monitor(process, P),
  214. unlink(P),
  215. exit(P, shutdown)
  216. end || P <- Pids],
  217. ok.
  218. wait_children(0) ->
  219. ok;
  220. wait_children(NbChildren) ->
  221. receive
  222. {'DOWN', _, process, Pid, _} ->
  223. _ = erase(Pid),
  224. wait_children(NbChildren - 1);
  225. kill ->
  226. Pids = get_keys(true),
  227. _ = [exit(P, kill) || P <- Pids],
  228. ok
  229. end.
  230. system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) ->
  231. loop(State, CurConns, NbChildren, Sleepers).
  232. -spec system_terminate(any(), _, _, _) -> no_return().
  233. system_terminate(Reason, _, _, {State, _, NbChildren, _}) ->
  234. terminate(State, Reason, NbChildren).
  235. system_code_change(Misc, _, _, _) ->
  236. {ok, Misc}.
  237. %% We use ~999999p here instead of ~w because the latter doesn't
  238. %% support printable strings.
  239. report_error(_, _, _, normal) ->
  240. ok;
  241. report_error(_, _, _, shutdown) ->
  242. ok;
  243. report_error(_, _, _, {shutdown, _}) ->
  244. ok;
  245. report_error(Ref, Protocol, Pid, Reason) ->
  246. error_logger:error_msg(
  247. "Ranch listener ~p had connection process started with "
  248. "~p:start_link/4 at ~p exit with reason: ~999999p~n",
  249. [Ref, Protocol, Pid, Reason]).