ranch_conns_sup.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. %% Copyright (c) 2011-2018, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% Make sure to never reload this module outside a release upgrade,
  15. %% as calling l(ranch_conns_sup) twice will kill the process and all
  16. %% the currently open connections.
  17. -module(ranch_conns_sup).
  18. %% API.
  19. -export([start_link/3]).
  20. -export([start_protocol/2]).
  21. -export([active_connections/1]).
  22. %% Supervisor internals.
  23. -export([init/4]).
  24. -export([system_continue/3]).
  25. -export([system_terminate/4]).
  26. -export([system_code_change/4]).
  27. -type conn_type() :: worker | supervisor.
  28. -type shutdown() :: brutal_kill | timeout().
  %% Loop state of the connections supervisor. Every field is filled
  %% in by init/4 before the receive loop is entered.
  -record(state, {
      %% Process given as Parent to init/4; its exit terminates us.
      parent = undefined :: pid(),
      %% Listener reference.
      ref :: ranch:ref(),
      %% Whether the started children are workers or supervisors.
      conn_type :: conn_type(),
      %% How children are stopped on termination.
      shutdown :: shutdown(),
      %% Transport callback module.
      transport = undefined :: module(),
      %% Protocol callback module; its start_link/4 starts children.
      protocol = undefined :: module(),
      %% Protocol options handed to Protocol:start_link/4.
      opts :: any(),
      %% Timeout forwarded to the child in the handshake message.
      handshake_timeout :: timeout(),
      %% Connection count at which acceptors are put to sleep.
      max_conns = undefined :: ranch:max_conns()
  }).
  40. %% API.
  %% Start the connections supervisor as a process linked to the
  %% caller. The caller's pid is handed to init/4 as the parent.
  -spec start_link(ranch:ref(), module(), module()) -> {ok, pid()}.
  start_link(Ref, Transport, Protocol) ->
      InitArgs = [self(), Ref, Transport, Protocol],
      proc_lib:start_link(?MODULE, init, InitArgs).
  45. %% We can safely assume we are on the same node as the supervisor.
  46. %%
  47. %% We can also safely avoid having a monitor and a timeout here
  48. %% because only three things can happen:
  49. %% * The supervisor died; rest_for_one strategy killed all acceptors
  50. %% so this very calling process is going to di--
  51. %% * There's too many connections, the supervisor will resume the
  52. %% acceptor only when we get below the limit again.
  53. %% * The supervisor is overloaded, there's either too many acceptors
  54. %% or the max_connections limit is too large. It's better if we
  55. %% don't keep accepting connections because this leaves
  56. %% more room for the situation to be resolved.
  57. %%
  58. %% We do not need the reply, we only need the ok from the supervisor
  59. %% to continue. The supervisor sends its own pid when the acceptor can
  60. %% continue.
  %% Hand Socket over to the connections supervisor SupPid and block
  %% until the supervisor sends back its own pid, which is the signal
  %% that the caller may continue. There is deliberately no timeout.
  -spec start_protocol(pid(), inet:socket()) -> ok.
  start_protocol(SupPid, Socket) ->
      Request = {?MODULE, start_protocol, self(), Socket},
      _ = erlang:send(SupPid, Request),
      receive
          SupPid ->
              ok
      end.
  65. %% We can't make the above assumptions here. This function might be
  66. %% called from anywhere.
  %% Ask the connections supervisor SupPid for its current connection
  %% count. The supervisor is monitored for the duration of the call.
  %%
  %% Exits with {nodedown, Node} when the connection to the remote node
  %% is lost, with the supervisor's exit reason when it goes down, and
  %% with timeout when no reply arrives within 5 seconds.
  -spec active_connections(pid()) -> non_neg_integer().
  active_connections(SupPid) ->
      MRef = erlang:monitor(process, SupPid),
      %% noconnect: do not try to set up a connection to the remote node.
      %% Any failure to send is ignored; the monitor or the timeout
      %% below reports the problem instead.
      catch erlang:send(SupPid, {?MODULE, active_connections, self(), MRef},
          [noconnect]),
      receive
          {MRef, Count} ->
              erlang:demonitor(MRef, [flush]),
              Count;
          {'DOWN', MRef, _Type, _Object, Why} ->
              case Why of
                  noconnection ->
                      exit({nodedown, node(SupPid)});
                  _ ->
                      exit(Why)
              end
      after 5000 ->
          erlang:demonitor(MRef, [flush]),
          exit(timeout)
      end.
  84. %% Supervisor internals.
  %% Entry point of the supervisor process spawned by start_link/3.
  %% Traps exits, registers itself with ranch_server as the connections
  %% supervisor of Ref, reads the listener configuration, acknowledges
  %% the start to Parent and enters the receive loop with no
  %% connections, no children and no sleeping acceptors.
  -spec init(pid(), ranch:ref(), module(), module()) -> no_return().
  init(Parent, Ref, Transport, Protocol) ->
      process_flag(trap_exit, true),
      ok = ranch_server:set_connections_sup(Ref, self()),
      MaxConns = ranch_server:get_max_connections(Ref),
      TransOpts = ranch_server:get_transport_options(Ref),
      ProtoOpts = ranch_server:get_protocol_options(Ref),
      InitialState = #state{
          parent = Parent,
          ref = Ref,
          conn_type = maps:get(connection_type, TransOpts, worker),
          shutdown = maps:get(shutdown, TransOpts, 5000),
          transport = Transport,
          protocol = Protocol,
          opts = ProtoOpts,
          handshake_timeout = maps:get(handshake_timeout, TransOpts, 5000),
          max_conns = MaxConns
      },
      ok = proc_lib:init_ack(Parent, {ok, self()}),
      loop(InitialState, 0, 0, []).
  %% Main receive loop of the connections supervisor.
  %%
  %% State holds the listener configuration. CurConns is the number of
  %% connections counted against max_conns, NbChildren the number of
  %% children that have not exited yet, and Sleepers the list of
  %% acceptors waiting to be resumed.
  %%
  %% Children are tracked in the process dictionary: the key is the
  %% supervised pid and the value is either `active' (counted in
  %% CurConns) or `removed' (still a child, but no longer counted).
  loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
          transport=Transport, protocol=Protocol, opts=Opts,
          max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->
      receive
          %% An acceptor hands over a freshly accepted socket.
          {?MODULE, start_protocol, To, Socket} ->
              try Protocol:start_link(Ref, Socket, Transport, Opts) of
                  {ok, Pid} ->
                      handshake(State, CurConns, NbChildren, Sleepers, To, Socket, Pid, Pid);
                  {ok, SupPid, ProtocolPid} when ConnType =:= supervisor ->
                      handshake(State, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid);
                  Ret ->
                      %% Resume the acceptor before doing anything else.
                      To ! self(),
                      error_logger:error_msg(
                          "Ranch listener ~p connection process start failure; "
                          "~p:start_link/4 returned: ~999999p~n",
                          [Ref, Protocol, Ret]),
                      Transport:close(Socket),
                      loop(State, CurConns, NbChildren, Sleepers)
              catch Class:Reason ->
                  To ! self(),
                  error_logger:error_msg(
                      "Ranch listener ~p connection process start failure; "
                      "~p:start_link/4 crashed with reason: ~p:~999999p~n",
                      [Ref, Protocol, Class, Reason]),
                  %% No connection process took the socket over, so close
                  %% it here as in the bad-return case above; otherwise
                  %% it would stay open with nobody using it.
                  Transport:close(Socket),
                  loop(State, CurConns, NbChildren, Sleepers)
              end;
          {?MODULE, active_connections, To, Tag} ->
              To ! {Tag, CurConns},
              loop(State, CurConns, NbChildren, Sleepers);
          %% Remove a connection from the count of connections.
          {remove_connection, Ref, Pid} ->
              case put(Pid, removed) of
                  active ->
                      loop(State, CurConns - 1, NbChildren, Sleepers);
                  %% Already removed earlier: a repeated request must
                  %% neither decrement the count again nor crash us.
                  removed ->
                      loop(State, CurConns, NbChildren, Sleepers);
                  %% Not one of our children: undo the put/2 above.
                  undefined ->
                      _ = erase(Pid),
                      loop(State, CurConns, NbChildren, Sleepers)
              end;
          %% Upgrade the max number of connections allowed concurrently.
          %% We resume all sleeping acceptors if this number increases.
          {set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
              _ = [To ! self() || To <- Sleepers],
              loop(State#state{max_conns=MaxConns2},
                  CurConns, NbChildren, []);
          {set_max_conns, MaxConns2} ->
              loop(State#state{max_conns=MaxConns2},
                  CurConns, NbChildren, Sleepers);
          %% Upgrade the protocol options.
          {set_opts, Opts2} ->
              loop(State#state{opts=Opts2},
                  CurConns, NbChildren, Sleepers);
          %% The parent exited: stop all children and exit as well.
          {'EXIT', Parent, Reason} ->
              terminate(State, Reason, NbChildren);
          %% A linked process exited and no acceptor is sleeping.
          {'EXIT', Pid, Reason} when Sleepers =:= [] ->
              case erase(Pid) of
                  active ->
                      report_error(Ref, Protocol, Pid, Reason),
                      loop(State, CurConns - 1, NbChildren - 1, Sleepers);
                  removed ->
                      report_error(Ref, Protocol, Pid, Reason),
                      loop(State, CurConns, NbChildren - 1, Sleepers);
                  %% Not a tracked child; nothing to account for.
                  undefined ->
                      loop(State, CurConns, NbChildren, Sleepers)
              end;
          %% Resume a sleeping acceptor if needed.
          {'EXIT', Pid, Reason} ->
              case erase(Pid) of
                  %% Still above the limit: keep every acceptor asleep.
                  active when CurConns > MaxConns ->
                      report_error(Ref, Protocol, Pid, Reason),
                      loop(State, CurConns - 1, NbChildren - 1, Sleepers);
                  active ->
                      report_error(Ref, Protocol, Pid, Reason),
                      [To|Sleepers2] = Sleepers,
                      To ! self(),
                      loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
                  removed ->
                      report_error(Ref, Protocol, Pid, Reason),
                      loop(State, CurConns, NbChildren - 1, Sleepers);
                  undefined ->
                      loop(State, CurConns, NbChildren, Sleepers)
              end;
          {system, From, Request} ->
              sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                  {State, CurConns, NbChildren, Sleepers});
          %% Calls from the supervisor module.
          {'$gen_call', {To, Tag}, which_children} ->
              Children = [{Protocol, Pid, ConnType, [Protocol]}
                  || {Pid, Type} <- get(),
                  Type =:= active orelse Type =:= removed],
              To ! {Tag, Children},
              loop(State, CurConns, NbChildren, Sleepers);
          {'$gen_call', {To, Tag}, count_children} ->
              Counts = case ConnType of
                  worker -> [{supervisors, 0}, {workers, NbChildren}];
                  supervisor -> [{supervisors, NbChildren}, {workers, 0}]
              end,
              Counts2 = [{specs, 1}, {active, NbChildren}|Counts],
              To ! {Tag, Counts2},
              loop(State, CurConns, NbChildren, Sleepers);
          %% Any other call gets an error reply.
          {'$gen_call', {To, Tag}, _} ->
              To ! {Tag, {error, ?MODULE}},
              loop(State, CurConns, NbChildren, Sleepers);
          %% Log and drop anything else so the mailbox cannot fill up.
          Msg ->
              error_logger:error_msg(
                  "Ranch listener ~p received unexpected message ~p~n",
                  [Ref, Msg]),
              loop(State, CurConns, NbChildren, Sleepers)
      end.
  %% Give the socket to the protocol process and start tracking the new
  %% child. SupPid is the pid that is tracked (and killed on failure);
  %% ProtocolPid is the pid that receives the socket and the handshake
  %% message. Both are the same pid when the child is a plain worker.
  %%
  %% The acceptor To is resumed right away while the new connection
  %% count is below max_conns; otherwise it is added to the sleepers.
  handshake(State=#state{ref=Ref, transport=Transport,
          handshake_timeout=HandshakeTimeout, max_conns=MaxConns},
          CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
      case Transport:controlling_process(Socket, ProtocolPid) of
          ok ->
              ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout},
              put(SupPid, active),
              NewConns = CurConns + 1,
              NewSleepers = case NewConns < MaxConns of
                  true ->
                      To ! self(),
                      Sleepers;
                  false ->
                      [To|Sleepers]
              end,
              loop(State, NewConns, NbChildren + 1, NewSleepers);
          {error, _} ->
              Transport:close(Socket),
              %% Killing the supervised pid is enough: when the connection
              %% process is a different pid, it is expected to be linked
              %% underneath the supervised one.
              exit(SupPid, kill),
              To ! self(),
              loop(State, CurConns, NbChildren, Sleepers)
      end.
  %% Stop every tracked child (both `active' and `removed' ones) and
  %% then exit with Reason.
  %%
  %% With shutdown=brutal_kill the children are killed outright.
  %% Otherwise they are sent a shutdown exit signal and given Shutdown
  %% milliseconds (or forever, for infinity) before being killed.
  -spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
  terminate(#state{shutdown=brutal_kill}, Reason, _NbChildren) ->
      kill_children(get_keys(active)),
      kill_children(get_keys(removed)),
      exit(Reason);
  terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
      shutdown_children(get_keys(active)),
      shutdown_children(get_keys(removed)),
      %% Arm the deadline; wait_children/1 kills the stragglers when
      %% the kill message arrives.
      _ = case Shutdown of
          infinity ->
              ok;
          _ ->
              erlang:send_after(Shutdown, self(), kill)
      end,
      wait_children(NbChildren),
      exit(Reason).
  247. %% Kill all children and then exit. We unlink first to avoid
  248. %% getting a message for each child getting killed.
  %% Unlink from and then kill each pid in Pids. Always returns ok.
  kill_children(Pids) ->
      lists:foreach(fun(Pid) ->
          unlink(Pid),
          exit(Pid, kill)
      end, Pids).
  255. %% Monitor processes so we can know which ones have shutdown
  256. %% before the timeout. Unlink so we avoid receiving an extra
  257. %% message. Then send a shutdown exit signal.
  %% For each pid in Pids: monitor it, unlink from it, then send it a
  %% shutdown exit signal. Always returns ok.
  shutdown_children(Pids) ->
      lists:foreach(fun(Pid) ->
          monitor(process, Pid),
          unlink(Pid),
          exit(Pid, shutdown)
      end, Pids).
  %% Wait until the given number of tracked children have gone down.
  %% Only 'DOWN' messages for pids recorded as `active' or `removed'
  %% in the process dictionary count towards the total. When the kill
  %% message arrives first, every child still tracked is killed and
  %% the wait ends immediately.
  wait_children(0) ->
      ok;
  wait_children(Remaining) ->
      receive
          {'DOWN', _MRef, process, Pid, _Reason} ->
              case erase(Pid) of
                  Status when Status =:= active; Status =:= removed ->
                      wait_children(Remaining - 1);
                  _ ->
                      wait_children(Remaining)
              end;
          kill ->
              Kill = fun(Pid) -> exit(Pid, kill) end,
              lists:foreach(Kill, get_keys(active)),
              lists:foreach(Kill, get_keys(removed)),
              ok
      end.
  %% sys callback: resume the receive loop with the loop arguments
  %% that were packed into a tuple when the system message was handled.
  system_continue(_Parent, _Debug, Misc) ->
      {State, CurConns, NbChildren, Sleepers} = Misc,
      loop(State, CurConns, NbChildren, Sleepers).
  %% sys callback: stop all children and exit with Reason.
  -spec system_terminate(any(), _, _, _) -> no_return().
  system_terminate(Reason, _Parent, _Debug, {State, _CurConns, NbChildren, _Sleepers}) ->
      terminate(State, Reason, NbChildren).
  %% sys callback: the loop state needs no conversion on code change,
  %% so it is returned unchanged.
  system_code_change(LoopState, _Module, _OldVsn, _Extra) ->
      {ok, LoopState}.
  289. %% We use ~999999p here instead of ~w because the latter doesn't
  290. %% support printable strings.
  %% Log the exit of a connection process unless the exit reason is
  %% normal, shutdown or {shutdown, _}, which are not treated as errors.
  report_error(Ref, Protocol, Pid, Reason) ->
      case Reason of
          normal ->
              ok;
          shutdown ->
              ok;
          {shutdown, _} ->
              ok;
          _ ->
              error_logger:error_msg(
                  "Ranch listener ~p had connection process started with "
                  "~p:start_link/4 at ~p exit with reason: ~999999p~n",
                  [Ref, Protocol, Pid, Reason])
      end.