ranch_conns_sup.erl 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. %% Copyright (c) 2011-2015, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% Make sure to never reload this module outside a release upgrade,
  15. %% as calling l(ranch_conns_sup) twice will kill the process and all
  16. %% the currently open connections.
  17. -module(ranch_conns_sup).
  18. %% API.
  19. -export([start_link/6]).
  20. -export([start_protocol/2]).
  21. -export([active_connections/1]).
  22. %% Supervisor internals.
  23. -export([init/7]).
  24. -export([system_continue/3]).
  25. -export([system_terminate/4]).
  26. -export([system_code_change/4]).
  %% Kind of process started for each connection.
  -type conn_type() :: worker | supervisor.

  %% How children are stopped when the supervisor terminates: killed
  %% outright, or asked to shut down and given a timeout to comply.
  -type shutdown() :: brutal_kill | timeout().

  %% State carried through the supervisor loop.
  -record(state, {
      %% Process that started this supervisor; its exit signal and the
      %% system messages are handled with it.
      parent = undefined :: pid(),
      %% Listener reference, used for lookups and in log messages.
      ref :: ranch:ref(),
      %% Whether connection processes are workers or supervisors.
      conn_type :: conn_type(),
      %% Shutdown policy applied to children on termination.
      shutdown :: shutdown(),
      %% Transport module, used to hand over and close sockets.
      transport = undefined :: module(),
      %% Protocol module whose start_link/4 starts connection processes.
      protocol = undefined :: module(),
      %% Options passed to the protocol's start_link/4.
      opts :: any(),
      %% Timeout forwarded to the connection process in the shoot message.
      ack_timeout :: timeout(),
      %% Connection count at which acceptors are put to sleep.
      max_conns = undefined :: ranch:max_conns()
  }).
  40. %% API.
  %% Start the connections supervisor for listener Ref, linked to the caller.
  %%
  %% The new process runs init/7 through proc_lib, with the calling process
  %% passed along as its parent.
  -spec start_link(ranch:ref(), conn_type(), shutdown(), module(),
      timeout(), module()) -> {ok, pid()}.
  start_link(Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol) ->
      InitArgs = [self(), Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol],
      proc_lib:start_link(?MODULE, init, InitArgs).
  %% Hand a freshly accepted socket over to the connections supervisor and
  %% block until the supervisor lets the calling acceptor continue.
  %%
  %% The caller is assumed to run on the same node as the supervisor.
  %%
  %% No monitor and no timeout are used here, because only three things
  %% can happen:
  %%  * The supervisor died; the rest_for_one strategy killed all acceptors,
  %%    so the calling process is about to die as well.
  %%  * There are too many connections; the supervisor resumes the acceptor
  %%    only once the count is below the limit again.
  %%  * The supervisor is overloaded: there are either too many acceptors
  %%    or the max_connections limit is too large. Not accepting further
  %%    connections leaves more room for the situation to be resolved.
  %%
  %% The reply carries no information. The supervisor simply sends its own
  %% pid when the acceptor may continue.
  -spec start_protocol(pid(), inet:socket()) -> ok.
  start_protocol(SupPid, Socket) ->
      Request = {?MODULE, start_protocol, self(), Socket},
      _ = erlang:send(SupPid, Request),
      receive
          SupPid ->
              ok
      end.
  %% Ask the connections supervisor how many connections it currently counts.
  %%
  %% Unlike start_protocol/2, this function may be called from anywhere, so
  %% the supervisor is monitored and the wait is bounded. The caller exits
  %% with {nodedown, Node} when the monitor reports noconnection, with the
  %% reported reason for any other 'DOWN' message, and with timeout when no
  %% answer arrives within 5 seconds.
  -spec active_connections(pid()) -> non_neg_integer().
  active_connections(SupPid) ->
      MRef = erlang:monitor(process, SupPid),
      Request = {?MODULE, active_connections, self(), MRef},
      %% A failing send is ignored on purpose; the monitor set up above is
      %% what tells us about an unreachable supervisor.
      _ = (catch erlang:send(SupPid, Request, [noconnect])),
      receive
          {MRef, Count} ->
              erlang:demonitor(MRef, [flush]),
              Count;
          {'DOWN', MRef, _, _, DownReason} ->
              case DownReason of
                  noconnection ->
                      exit({nodedown, node(SupPid)});
                  _ ->
                      exit(DownReason)
              end
      after 5000 ->
          erlang:demonitor(MRef, [flush]),
          exit(timeout)
      end.
  85. %% Supervisor internals.
  %% Entry point of the supervisor process started by start_link/6.
  %%
  %% Traps exits, registers itself with ranch_server as the connections
  %% supervisor of Ref, reads the connection limit and the protocol options,
  %% acknowledges the start to Parent and then enters the main loop with no
  %% connections, no children and no sleeping acceptors.
  -spec init(pid(), ranch:ref(), conn_type(), shutdown(),
      module(), timeout(), module()) -> no_return().
  init(Parent, Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol) ->
      _ = process_flag(trap_exit, true),
      ok = ranch_server:set_connections_sup(Ref, self()),
      MaxConns = ranch_server:get_max_connections(Ref),
      ProtoOpts = ranch_server:get_protocol_options(Ref),
      ok = proc_lib:init_ack(Parent, {ok, self()}),
      InitialState = #state{
          parent = Parent,
          ref = Ref,
          conn_type = ConnType,
          shutdown = Shutdown,
          transport = Transport,
          protocol = Protocol,
          opts = ProtoOpts,
          ack_timeout = AckTimeout,
          max_conns = MaxConns
      },
      loop(InitialState, 0, 0, []).
  %% Main receive loop of the connections supervisor.
  %%
  %% Arguments besides State:
  %%  * CurConns   - number of connections counted against max_conns.
  %%  * NbChildren - number of connection processes being supervised.
  %%  * Sleepers   - acceptors blocked in start_protocol/2 that have not
  %%                 been told to continue yet.
  %%
  %% Supervised pids are kept in the process dictionary with the value true
  %% (stored by shoot/8). Every clause either recurses, terminates the
  %% process, or hands control to sys; the function never returns.
  loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
          transport=Transport, protocol=Protocol, opts=Opts,
          max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->
      receive
          %% An acceptor hands over a new socket: start the connection process.
          {?MODULE, start_protocol, To, Socket} ->
              try Protocol:start_link(Ref, Socket, Transport, Opts) of
                  {ok, Pid} ->
                      shoot(State, CurConns, NbChildren, Sleepers, To, Socket, Pid, Pid);
                  {ok, SupPid, ProtocolPid} when ConnType =:= supervisor ->
                      shoot(State, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid);
                  Ret ->
                      To ! self(),
                      error_logger:error_msg(
                          "Ranch listener ~p connection process start failure; "
                          "~p:start_link/4 returned: ~999999p~n",
                          [Ref, Protocol, Ret]),
                      Transport:close(Socket),
                      loop(State, CurConns, NbChildren, Sleepers)
              catch Class:Reason ->
                  To ! self(),
                  error_logger:error_msg(
                      "Ranch listener ~p connection process start failure; "
                      "~p:start_link/4 crashed with reason: ~p:~999999p~n",
                      [Ref, Protocol, Class, Reason]),
                  %% Close the socket here as well, exactly like the bad-return
                  %% path above; otherwise it stays open with nobody using it.
                  Transport:close(Socket),
                  loop(State, CurConns, NbChildren, Sleepers)
              end;
          {?MODULE, active_connections, To, Tag} ->
              To ! {Tag, CurConns},
              loop(State, CurConns, NbChildren, Sleepers);
          %% Remove a connection from the count of connections. Only matches
          %% when the message carries this listener's Ref.
          %% NOTE(review): the later 'EXIT' of that same process lowers
          %% CurConns a second time - confirm whether this is intended.
          {remove_connection, Ref} ->
              loop(State, CurConns - 1, NbChildren, Sleepers);
          %% Upgrade the max number of connections allowed concurrently.
          %% We resume all sleeping acceptors if this number increases.
          {set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
              _ = [To ! self() || To <- Sleepers],
              loop(State#state{max_conns=MaxConns2},
                  CurConns, NbChildren, []);
          {set_max_conns, MaxConns2} ->
              loop(State#state{max_conns=MaxConns2},
                  CurConns, NbChildren, Sleepers);
          %% Upgrade the protocol options.
          {set_opts, Opts2} ->
              loop(State#state{opts=Opts2},
                  CurConns, NbChildren, Sleepers);
          %% The parent exited: stop all children and exit with its reason.
          {'EXIT', Parent, Reason} ->
              terminate(State, Reason, NbChildren);
          {'EXIT', Pid, Reason} ->
              report_error(Ref, Protocol, Pid, Reason),
              case erase(Pid) of
                  true ->
                      %% A supervised child is gone: release its slot and
                      %% resume a sleeping acceptor if there is one.
                      Sleepers2 = resume_one_sleeper(Sleepers),
                      loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
                  undefined ->
                      %% The pid was never counted (for example a process
                      %% killed by shoot/8 after a failed socket handover),
                      %% so the counters must not be decremented for it.
                      loop(State, CurConns, NbChildren, Sleepers)
              end;
          {system, From, Request} ->
              sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                  {State, CurConns, NbChildren, Sleepers});
          %% Calls from the supervisor module.
          {'$gen_call', {To, Tag}, which_children} ->
              Pids = get_keys(true),
              Children = [{Protocol, Pid, ConnType, [Protocol]}
                  || Pid <- Pids, is_pid(Pid)],
              To ! {Tag, Children},
              loop(State, CurConns, NbChildren, Sleepers);
          {'$gen_call', {To, Tag}, count_children} ->
              Counts = case ConnType of
                  worker -> [{supervisors, 0}, {workers, NbChildren}];
                  supervisor -> [{supervisors, NbChildren}, {workers, 0}]
              end,
              Counts2 = [{specs, 1}, {active, NbChildren}|Counts],
              To ! {Tag, Counts2},
              loop(State, CurConns, NbChildren, Sleepers);
          %% Any other call is answered with an error.
          {'$gen_call', {To, Tag}, _} ->
              To ! {Tag, {error, ?MODULE}},
              loop(State, CurConns, NbChildren, Sleepers);
          Msg ->
              error_logger:error_msg(
                  "Ranch listener ~p received unexpected message ~p~n",
                  [Ref, Msg]),
              %% Keep looping: returning here would end the process and take
              %% the whole supervisor down because of one stray message.
              loop(State, CurConns, NbChildren, Sleepers)
      end.

  %% Tell the first sleeping acceptor, if any, that it may continue and
  %% return the acceptors that are still asleep.
  resume_one_sleeper([]) ->
      [];
  resume_one_sleeper([To|Sleepers]) ->
      To ! self(),
      Sleepers.
  %% Give ownership of Socket to the connection process and let it proceed.
  %%
  %% SupPid is the pid that gets supervised (stored in the process
  %% dictionary); ProtocolPid is the pid that receives the socket. They are
  %% the same pid when the protocol returned {ok, Pid}.
  %%
  %% On success the connection is counted, and the acceptor To is either
  %% resumed right away or added to Sleepers when the new count is not
  %% below MaxConns. On failure the socket is closed, the supervised pid is
  %% killed and the acceptor is resumed, since no connection slot was taken.
  shoot(State=#state{ref=Ref, transport=Transport, ack_timeout=AckTimeout, max_conns=MaxConns},
          CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
      case Transport:controlling_process(Socket, ProtocolPid) of
          ok ->
              ProtocolPid ! {shoot, Ref, Transport, Socket, AckTimeout},
              put(SupPid, true),
              CurConns2 = CurConns + 1,
              if CurConns2 < MaxConns ->
                      To ! self(),
                      loop(State, CurConns2, NbChildren + 1, Sleepers);
                  true ->
                      loop(State, CurConns2, NbChildren + 1, [To|Sleepers])
              end;
          {error, _} ->
              Transport:close(Socket),
              %% Only kill the supervised pid, because the connection's pid,
              %% when different, is supposed to be sitting under it and linked.
              exit(SupPid, kill),
              %% The acceptor is blocked in start_protocol/2 waiting for our
              %% pid; without this reply it would never accept again.
              To ! self(),
              loop(State, CurConns, NbChildren, Sleepers)
      end.
  -spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
  %% Stop every supervised child, then exit with Reason.
  %%
  %% With brutal_kill, children are killed immediately. They are unlinked
  %% first so that no exit message is received for each of them.
  terminate(#state{shutdown=brutal_kill}, Reason, _) ->
      lists:foreach(fun(Child) ->
          unlink(Child),
          exit(Child, kill)
      end, get_keys(true)),
      exit(Reason);
  %% Otherwise ask children to shut down gracefully. Unless the shutdown
  %% value is infinity, a kill message is scheduled to ourselves so that
  %% wait_children/1 stops waiting once the timeout has passed.
  terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
      shutdown_children(),
      _ = case Shutdown of
          infinity ->
              ok;
          _ ->
              erlang:send_after(Shutdown, self(), kill)
      end,
      wait_children(NbChildren),
      exit(Reason).
  %% Ask every supervised child to shut down.
  %%
  %% Each child is monitored so we learn which ones stop before the
  %% timeout, unlinked so no extra exit message arrives, and then sent
  %% a shutdown exit signal.
  shutdown_children() ->
      lists:foreach(fun(Child) ->
          _ = monitor(process, Child),
          unlink(Child),
          exit(Child, shutdown)
      end, get_keys(true)).
  %% Wait until the given number of children have gone down.
  %%
  %% Every 'DOWN' message removes that child from the process dictionary.
  %% When a kill message arrives first, all children still present in the
  %% dictionary are killed and the wait ends immediately.
  wait_children(0) ->
      ok;
  wait_children(Remaining) ->
      receive
          {'DOWN', _, process, Child, _} ->
              _ = erase(Child),
              wait_children(Remaining - 1);
          kill ->
              lists:foreach(fun(Child) -> exit(Child, kill) end, get_keys(true))
      end.
  %% sys callback: go back to the main loop with the arguments that were
  %% handed to sys:handle_system_msg/6.
  system_continue(_Parent, _Debug, {State, CurConns, NbChildren, Sleepers}) ->
      loop(State, CurConns, NbChildren, Sleepers).
  %% sys callback: stop all children and exit with Reason. The connection
  %% count and the sleeping acceptors are not needed for that.
  -spec system_terminate(any(), _, _, _) -> no_return().
  system_terminate(Reason, _Parent, _Debug, {State, _CurConns, NbChildren, _Sleepers}) ->
      terminate(State, Reason, NbChildren).
  %% sys callback: the loop arguments are kept unchanged on a code change.
  system_code_change(Misc, _Module, _OldVsn, _Extra) ->
      {ok, Misc}.
  %% Log the exit of a connection process, unless the exit reason is
  %% normal, shutdown or {shutdown, _}.
  %%
  %% The reason is printed with ~999999p instead of ~w because the latter
  %% doesn't support printable strings.
  report_error(Ref, Protocol, Pid, Reason) ->
      case Reason of
          normal ->
              ok;
          shutdown ->
              ok;
          {shutdown, _} ->
              ok;
          _ ->
              error_logger:error_msg(
                  "Ranch listener ~p had connection process started with "
                  "~p:start_link/4 at ~p exit with reason: ~999999p~n",
                  [Ref, Protocol, Pid, Reason])
      end.