ranch_conns_sup.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. %% Copyright (c) 2011-2018, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% Make sure to never reload this module outside a release upgrade,
  15. %% as calling l(ranch_conns_sup) twice will kill the process and all
  16. %% the currently open connections.
  17. -module(ranch_conns_sup).
  18. %% API.
  19. -export([start_link/4]).
  20. -export([start_protocol/3]).
  21. -export([active_connections/1]).
  22. %% Supervisor internals.
  23. -export([init/5]).
  24. -export([system_continue/3]).
  25. -export([system_terminate/4]).
  26. -export([system_code_change/4]).
  %% Kind of process started for each connection. Read from the
  %% connection_type transport option in init/5, defaulting to worker.
  -type conn_type() :: worker | supervisor.
  %% How children are stopped when this process terminates: brutal_kill
  %% kills them right away, a timeout asks them to shut down first and
  %% kills whatever is left once the timeout fires (see terminate/3).
  -type shutdown() :: brutal_kill | timeout().
  %% State carried through loop/4.
  -record(state, {
      %% Process that called start_link/4. Its 'EXIT' message triggers
      %% terminate/3; it is also handed to sys:handle_system_msg/6.
      parent = undefined :: pid(),
      %% Listener reference; passed to the protocol and used in log messages.
      ref :: ranch:ref(),
      %% Reported to which_children/count_children; also gates the
      %% {ok, SupPid, ProtocolPid} return of Protocol:start_link/3.
      conn_type :: conn_type(),
      %% Shutdown strategy applied to children in terminate/3.
      shutdown :: shutdown(),
      %% Transport module; used for controlling_process/2 and close/1.
      transport = undefined :: module(),
      %% Protocol module whose start_link/3 is called for each connection.
      protocol = undefined :: module(),
      %% Protocol options passed to Protocol:start_link/3; replaced by set_opts.
      opts :: any(),
      %% Forwarded to the protocol process in the handshake message.
      handshake_timeout :: timeout(),
      %% Connection limit; acceptors are put to sleep once the count of
      %% active connections is no longer below it. Replaced by set_max_conns.
      max_conns = undefined :: ranch:max_conns(),
      %% Logger module handed to ranch:log/4.
      logger = undefined :: module()
  }).
  41. %% API.
  %% Start the connections supervisor identified by Id for listener Ref.
  %% The calling process becomes the parent of the new process and is
  %% linked to it; the call returns once init/5 has acknowledged startup.
  -spec start_link(ranch:ref(), pos_integer(), module(), module()) -> {ok, pid()}.
  start_link(Ref, Id, Transport, Protocol) ->
      Parent = self(),
      InitArgs = [Parent, Ref, Id, Transport, Protocol],
      proc_lib:start_link(?MODULE, init, InitArgs).
  46. %% We can safely assume we are on the same node as the supervisor.
  47. %%
  48. %% We can also safely avoid having a monitor and a timeout here
  49. %% because only three things can happen:
  50. %% * The supervisor died; rest_for_one strategy killed all acceptors
  51. %% so this very calling process is going to di--
  52. %% * There are too many connections, the supervisor will resume the
  53. %% acceptor only when we get below the limit again.
  54. %% * The supervisor is overloaded, there are either too many acceptors
  55. %% or the max_connections limit is too large. It's better if we
  56. %% don't keep accepting connections because this leaves
  57. %% more room for the situation to be resolved.
  58. %%
  59. %% We do not need the reply, we only need the ok from the supervisor
  60. %% to continue. The supervisor sends its own pid when the acceptor can
  61. %% continue.
  -spec start_protocol(pid(), reference(), inet:socket()) -> ok.
  start_protocol(SupPid, MonitorRef, Socket) ->
      Acceptor = self(),
      %% Hand the socket over and block until the supervisor lets us go on.
      _ = erlang:send(SupPid, {?MODULE, start_protocol, Acceptor, Socket}),
      receive
          %% The supervisor went down while we were waiting: crash with
          %% the same reason.
          {'DOWN', MonitorRef, process, SupPid, Reason} ->
              error(Reason);
          %% The supervisor replies with its own pid when we may continue.
          SupPid ->
              ok
      end.
  71. %% We can't make the above assumptions here. This function might be
  72. %% called from anywhere.
  %% Ask the connections supervisor SupPid how many connections it
  %% currently counts. Exits with {nodedown, Node} when the node is not
  %% reachable, with the supervisor's exit reason when it is down, and
  %% with timeout when no answer arrives within 5 seconds.
  -spec active_connections(pid()) -> non_neg_integer().
  active_connections(SupPid) ->
      Tag = erlang:monitor(process, SupPid),
      Request = {?MODULE, active_connections, self(), Tag},
      %% A failed send is ignored on purpose: the monitor reports the
      %% problem through a 'DOWN' message.
      catch erlang:send(SupPid, Request, [noconnect]),
      receive
          {Tag, Count} ->
              erlang:demonitor(Tag, [flush]),
              Count;
          {'DOWN', Tag, _, _, DownReason} ->
              case DownReason of
                  noconnection ->
                      exit({nodedown, node(SupPid)});
                  _ ->
                      exit(DownReason)
              end
      after 5000 ->
          erlang:demonitor(Tag, [flush]),
          exit(timeout)
      end.
  90. %% Supervisor internals.
  %% Entry point of the process spawned by start_link/4. Registers itself
  %% with ranch_server, reads the listener configuration, acknowledges
  %% startup to Parent and then enters loop/4 with no connection, no child
  %% and no sleeping acceptor.
  -spec init(pid(), ranch:ref(), pos_integer(), module(), module()) -> no_return().
  init(Parent, Ref, Id, Transport, Protocol) ->
      %% Child exits must arrive as 'EXIT' messages handled in loop/4.
      process_flag(trap_exit, true),
      ok = ranch_server:set_connections_sup(Ref, Id, self()),
      Limit = ranch_server:get_max_connections(Ref),
      TransOpts = ranch_server:get_transport_options(Ref),
      Type = maps:get(connection_type, TransOpts, worker),
      ShutdownOpt = maps:get(shutdown, TransOpts, 5000),
      HsTimeout = maps:get(handshake_timeout, TransOpts, 5000),
      LogMod = maps:get(logger, TransOpts, logger),
      ProtoOpts = ranch_server:get_protocol_options(Ref),
      State = #state{
          parent=Parent,
          ref=Ref,
          conn_type=Type,
          shutdown=ShutdownOpt,
          transport=Transport,
          protocol=Protocol,
          opts=ProtoOpts,
          handshake_timeout=HsTimeout,
          max_conns=Limit,
          logger=LogMod
      },
      ok = proc_lib:init_ack(Parent, {ok, self()}),
      loop(State, 0, 0, []).
  %% Main receive loop of the connections supervisor.
  %%
  %% Besides the state record the loop carries:
  %%  * CurConns   - connections currently counted against max_conns;
  %%  * NbChildren - connection processes still alive, counted or not;
  %%  * Sleepers   - acceptors waiting for our pid before accepting again.
  %%
  %% Children are tracked in the process dictionary, keyed by pid, with the
  %% value active (counted) or removed (alive but no longer counted).
  loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
          transport=Transport, protocol=Protocol, opts=Opts,
          max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) ->
      receive
          {?MODULE, start_protocol, To, Socket} ->
              try Protocol:start_link(Ref, Transport, Opts) of
                  {ok, Pid} ->
                      handshake(State, CurConns, NbChildren, Sleepers, To, Socket, Pid, Pid);
                  {ok, SupPid, ProtocolPid} when ConnType =:= supervisor ->
                      handshake(State, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid);
                  Ret ->
                      To ! self(),
                      ranch:log(error,
                          "Ranch listener ~p connection process start failure; "
                          "~p:start_link/3 returned: ~999999p~n",
                          [Ref, Protocol, Ret], Logger),
                      Transport:close(Socket),
                      loop(State, CurConns, NbChildren, Sleepers)
              catch Class:Reason ->
                  To ! self(),
                  ranch:log(error,
                      "Ranch listener ~p connection process start failure; "
                      "~p:start_link/3 crashed with reason: ~p:~999999p~n",
                      [Ref, Protocol, Class, Reason], Logger),
                  %% No connection process will ever own this socket, so
                  %% close it here exactly like in the bad-return branch.
                  Transport:close(Socket),
                  loop(State, CurConns, NbChildren, Sleepers)
              end;
          {?MODULE, active_connections, To, Tag} ->
              To ! {Tag, CurConns},
              loop(State, CurConns, NbChildren, Sleepers);
          %% Remove a connection from the count of connections.
          %% This frees a slot, so resume a sleeping acceptor under the
          %% same conditions as when an active connection exits.
          {remove_connection, Ref, Pid} ->
              case put(Pid, removed) of
                  active when Sleepers =:= []; CurConns > MaxConns ->
                      loop(State, CurConns - 1, NbChildren, Sleepers);
                  active ->
                      [To|Sleepers2] = Sleepers,
                      To ! self(),
                      loop(State, CurConns - 1, NbChildren, Sleepers2);
                  removed ->
                      loop(State, CurConns, NbChildren, Sleepers);
                  undefined ->
                      %% Not one of our children: undo the put/2 above.
                      _ = erase(Pid),
                      loop(State, CurConns, NbChildren, Sleepers)
              end;
          %% Upgrade the max number of connections allowed concurrently.
          %% We resume all sleeping acceptors if this number increases.
          {set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
              _ = [To ! self() || To <- Sleepers],
              loop(State#state{max_conns=MaxConns2},
                  CurConns, NbChildren, []);
          {set_max_conns, MaxConns2} ->
              loop(State#state{max_conns=MaxConns2},
                  CurConns, NbChildren, Sleepers);
          %% Upgrade the protocol options.
          {set_opts, Opts2} ->
              loop(State#state{opts=Opts2},
                  CurConns, NbChildren, Sleepers);
          {'EXIT', Parent, Reason} ->
              terminate(State, Reason, NbChildren);
          %% A child exited and no acceptor is waiting.
          {'EXIT', Pid, Reason} when Sleepers =:= [] ->
              case erase(Pid) of
                  active ->
                      report_error(Logger, Ref, Protocol, Pid, Reason),
                      loop(State, CurConns - 1, NbChildren - 1, Sleepers);
                  removed ->
                      report_error(Logger, Ref, Protocol, Pid, Reason),
                      loop(State, CurConns, NbChildren - 1, Sleepers);
                  undefined ->
                      loop(State, CurConns, NbChildren, Sleepers)
              end;
          %% Resume a sleeping acceptor if needed.
          {'EXIT', Pid, Reason} ->
              case erase(Pid) of
                  %% Still above the limit: keep everyone asleep.
                  active when CurConns > MaxConns ->
                      report_error(Logger, Ref, Protocol, Pid, Reason),
                      loop(State, CurConns - 1, NbChildren - 1, Sleepers);
                  active ->
                      report_error(Logger, Ref, Protocol, Pid, Reason),
                      [To|Sleepers2] = Sleepers,
                      To ! self(),
                      loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
                  removed ->
                      report_error(Logger, Ref, Protocol, Pid, Reason),
                      loop(State, CurConns, NbChildren - 1, Sleepers);
                  undefined ->
                      loop(State, CurConns, NbChildren, Sleepers)
              end;
          {system, From, Request} ->
              sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                  {State, CurConns, NbChildren, Sleepers});
          %% Calls from the supervisor module.
          {'$gen_call', {To, Tag}, which_children} ->
              Children = [{Protocol, Pid, ConnType, [Protocol]}
                  || {Pid, Type} <- get(),
                  Type =:= active orelse Type =:= removed],
              To ! {Tag, Children},
              loop(State, CurConns, NbChildren, Sleepers);
          {'$gen_call', {To, Tag}, count_children} ->
              Counts = case ConnType of
                  worker -> [{supervisors, 0}, {workers, NbChildren}];
                  supervisor -> [{supervisors, NbChildren}, {workers, 0}]
              end,
              Counts2 = [{specs, 1}, {active, NbChildren}|Counts],
              To ! {Tag, Counts2},
              loop(State, CurConns, NbChildren, Sleepers);
          %% Any other supervisor call is not supported.
          {'$gen_call', {To, Tag}, _} ->
              To ! {Tag, {error, ?MODULE}},
              loop(State, CurConns, NbChildren, Sleepers);
          Msg ->
              ranch:log(error,
                  "Ranch listener ~p received unexpected message ~p~n",
                  [Ref, Msg], Logger),
              loop(State, CurConns, NbChildren, Sleepers)
      end.
  %% Give the socket to the freshly started connection process and tell it
  %% to begin the handshake. SupPid is the pid we supervise; ProtocolPid is
  %% the pid that owns the socket (the two are the same for workers).
  %% The acceptor To is resumed right away while the new connection count
  %% is below max_conns, and is queued as a sleeper otherwise.
  handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout,
          max_conns=MaxConns}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
      case Transport:controlling_process(Socket, ProtocolPid) of
          {error, _} ->
              Transport:close(Socket),
              %% Only kill the supervised pid, because the connection's pid,
              %% when different, is supposed to be sitting under it and linked.
              exit(SupPid, kill),
              To ! self(),
              loop(State, CurConns, NbChildren, Sleepers);
          ok ->
              ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout},
              put(SupPid, active),
              NewCount = CurConns + 1,
              NewSleepers = case NewCount < MaxConns of
                  true ->
                      To ! self(),
                      Sleepers;
                  false ->
                      [To|Sleepers]
              end,
              loop(State, NewCount, NbChildren + 1, NewSleepers)
      end.
  %% Stop every tracked child, then exit with Reason.
  %% With brutal_kill the children are killed without waiting. Otherwise
  %% they are asked to shut down and we wait for them; unless the shutdown
  %% value is infinity, a kill message is scheduled so the wait is bounded.
  -spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
  terminate(#state{shutdown=brutal_kill}, Reason, _) ->
      lists:foreach(fun(Status) ->
          kill_children(get_keys(Status))
      end, [active, removed]),
      exit(Reason);
  terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
      lists:foreach(fun(Status) ->
          shutdown_children(get_keys(Status))
      end, [active, removed]),
      _ = case Shutdown of
          infinity ->
              ok;
          _ ->
              erlang:send_after(Shutdown, self(), kill)
      end,
      wait_children(NbChildren),
      exit(Reason).
  255. %% Kill all children and then exit. We unlink first to avoid
  256. %% getting a message for each child getting killed.
  kill_children(Pids) ->
      lists:foreach(fun(Pid) ->
          unlink(Pid),
          exit(Pid, kill)
      end, Pids).
  263. %% Monitor processes so we can know which ones have shutdown
  264. %% before the timeout. Unlink so we avoid receiving an extra
  265. %% message. Then send a shutdown exit signal.
  shutdown_children(Pids) ->
      lists:foreach(fun(Pid) ->
          monitor(process, Pid),
          unlink(Pid),
          exit(Pid, shutdown)
      end, Pids).
  %% Wait until NbChildren tracked children have gone down. A kill message
  %% ends the wait early: every child still tracked is killed and we
  %% return without waiting any further.
  wait_children(0) ->
      ok;
  wait_children(NbChildren) ->
      receive
          {'DOWN', _, process, Pid, _} ->
              Remaining = case erase(Pid) of
                  active -> NbChildren - 1;
                  removed -> NbChildren - 1;
                  %% Not a tracked child: the count is unchanged.
                  _ -> NbChildren
              end,
              wait_children(Remaining);
          kill ->
              lists:foreach(fun(P) -> exit(P, kill) end, get_keys(active)),
              lists:foreach(fun(P) -> exit(P, kill) end, get_keys(removed)),
              ok
      end.
  %% sys callback: go back to the main loop with the values that were
  %% handed to sys:handle_system_msg/6.
  system_continue(_Parent, _Debug, {State, CurConns, NbChildren, Sleepers}) ->
      loop(State, CurConns, NbChildren, Sleepers).
  %% sys callback: stop the children and exit with Reason.
  -spec system_terminate(any(), _, _, _) -> no_return().
  system_terminate(Reason, _Parent, _Debug, {State, _CurConns, NbChildren, _Sleepers}) ->
      terminate(State, Reason, NbChildren).
  %% sys callback: the loop data needs no conversion on code change.
  system_code_change(Misc, _Module, _OldVsn, _Extra) ->
      {ok, Misc}.
  297. %% We use ~999999p here instead of ~w because the latter doesn't
  298. %% support printable strings.
  %% Log the exit of a connection process unless it stopped for an expected
  %% reason (normal, shutdown or {shutdown, _}). Always returns what
  %% ranch:log/4 returns, or ok when nothing is logged.
  %%
  %% The message names start_link/3 because that is the arity of the
  %% Protocol:start_link(Ref, Transport, Opts) call that started the process.
  report_error(_, _, _, _, normal) ->
      ok;
  report_error(_, _, _, _, shutdown) ->
      ok;
  report_error(_, _, _, _, {shutdown, _}) ->
      ok;
  report_error(Logger, Ref, Protocol, Pid, Reason) ->
      ranch:log(error,
          "Ranch listener ~p had connection process started with "
          "~p:start_link/3 at ~p exit with reason: ~999999p~n",
          [Ref, Protocol, Pid, Reason], Logger).