cowboy_http2.erl 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048
  1. %% Copyright (c) 2015-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http2).
  15. -export([init/6]).
  16. -export([init/10]).
  17. -export([init/12]).
  18. -export([system_continue/3]).
  19. -export([system_terminate/4]).
  20. -export([system_code_change/4]).
  %% Options map accepted by this module. Every association is optional.
  %% The defaults applied when a key is absent are visible at the places
  %% where the option is read (maps:get/3 calls).
  -type opts() :: #{
      compress_buffering => boolean(),
      compress_threshold => non_neg_integer(),
      connection_type => worker | supervisor,
      connection_window_margin_size => 0..16#7fffffff,
      connection_window_update_threshold => 0..16#7fffffff,
      enable_connect_protocol => boolean(),
      env => cowboy_middleware:env(),
      idle_timeout => timeout(),
      inactivity_timeout => timeout(),
      initial_connection_window_size => 65535..16#7fffffff,
      initial_stream_window_size => 0..16#7fffffff,
      logger => module(),
      max_concurrent_streams => non_neg_integer() | infinity,
      max_connection_buffer_size => non_neg_integer(),
      max_connection_window_size => 0..16#7fffffff,
      max_decode_table_size => non_neg_integer(),
      max_encode_table_size => non_neg_integer(),
      max_frame_size_received => 16384..16777215,
      max_frame_size_sent => 16384..16777215 | infinity,
      %% {Number of frames, Period in milliseconds}.
      max_received_frame_rate => {pos_integer(), timeout()},
      %% {Number of resets, Period in milliseconds}.
      max_reset_stream_rate => {pos_integer(), timeout()},
      max_stream_buffer_size => non_neg_integer(),
      max_stream_window_size => 0..16#7fffffff,
      metrics_callback => cowboy_metrics_h:metrics_callback(),
      metrics_req_filter => fun((cowboy_req:req()) -> map()),
      metrics_resp_headers_filter =>
          fun((cowboy:http_headers()) -> cowboy:http_headers()),
      middlewares => [module()],
      preface_timeout => timeout(),
      proxy_header => boolean(),
      sendfile => boolean(),
      settings_timeout => timeout(),
      shutdown_timeout => timeout(),
      stream_handlers => [module()],
      stream_window_data_threshold => 0..16#7fffffff,
      stream_window_margin_size => 0..16#7fffffff,
      stream_window_update_threshold => 0..16#7fffffff,
      tracer_callback => cowboy_tracer_h:tracer_callback(),
      tracer_flags => [atom()],
      tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
      %% Open ended because configured stream handlers might add options.
      _ => _
  }.
  -export_type([opts/0]).
  %% Per-stream bookkeeping kept by the connection process.
  -record(stream, {
      %% Whether the stream is currently stopping.
      status = running :: running | stopping,
      %% Flow requested for this stream.
      flow = 0 :: non_neg_integer(),
      %% Stream state, as returned by the cowboy_stream callbacks.
      %% The first element is typed with the built-in module() type;
      %% the bare atom 'module' previously used here only described
      %% the literal atom and not a handler module name.
      state :: {module(), any()}
  }).
  %% Connection-level state threaded through loop/2 and parse/2.
  -record(state, {
      parent = undefined :: pid(),
      ref :: ranch:ref(),
      socket = undefined :: inet:socket(),
      transport :: module(),
      proxy_header :: undefined | ranch_proxy_header:proxy_info(),
      opts = #{} :: opts(),

      %% Timer for idle_timeout.
      timer = undefined :: undefined | reference(),

      %% Remote address and port for the connection.
      peer = undefined :: {inet:ip_address(), inet:port_number()},

      %% Local address and port for the connection.
      sock = undefined :: {inet:ip_address(), inet:port_number()},

      %% Client certificate (TLS only).
      cert :: undefined | binary(),

      %% HTTP/2 state machine.
      http2_status :: sequence | settings | upgrade | connected | closing,
      http2_machine :: cow_http2_machine:http2_machine(),

      %% HTTP/2 frame rate flood protection.
      frame_rate_num :: undefined | pos_integer(),
      frame_rate_time :: undefined | integer(),

      %% HTTP/2 reset stream flood protection.
      reset_rate_num :: undefined | pos_integer(),
      reset_rate_time :: undefined | integer(),

      %% Flow requested for all streams.
      flow = 0 :: non_neg_integer(),

      %% Currently active HTTP/2 streams. Streams may be initiated either
      %% by the client or by the server through PUSH_PROMISE frames.
      streams = #{} :: #{cow_http2:streamid() => #stream{}},

      %% Streams can spawn zero or more children which are then managed
      %% by this module if operating as a supervisor.
      children = cowboy_children:init() :: cowboy_children:children()
  }).
  %% Gathers the peer name, the socket name and (for the ssl transport)
  %% the client certificate, then hands over to init/10 with an empty
  %% buffer. Any failure to retrieve one of them terminates immediately.
  -spec init(pid(), ranch:ref(), inet:socket(), module(),
      ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok.
  init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
      PeerResult = Transport:peername(Socket),
      SockResult = Transport:sockname(Socket),
      CertResult = case Transport:name() of
          ssl ->
              %% The absence of a client certificate is not an error.
              case ssl:peercert(Socket) of
                  {error, no_peercert} -> {ok, undefined};
                  PeerCertResult -> PeerCertResult
              end;
          _ ->
              {ok, undefined}
      end,
      case {PeerResult, SockResult, CertResult} of
          {{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
              init(Parent, Ref, Socket, Transport, ProxyHeader, Opts,
                  Peer, Sock, Cert, <<>>);
          {{error, Reason}, _, _} ->
              terminate(undefined, {socket_error, Reason,
                  'A socket error occurred when retrieving the peer name.'});
          {_, {error, Reason}, _} ->
              terminate(undefined, {socket_error, Reason,
                  'A socket error occurred when retrieving the sock name.'});
          {_, _, {error, Reason}} ->
              terminate(undefined, {socket_error, Reason,
                  'A socket error occurred when retrieving the client TLS certificate.'})
      end.
  %% Builds the initial connection state, sends the server preface and
  %% either waits for data (empty buffer) or parses what was already read.
  -spec init(pid(), ranch:ref(), inet:socket(), module(),
      ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
      {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
      binary() | undefined, binary()) -> ok.
  init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
      {ok, Preface, Machine} = cow_http2_machine:init(server, Opts),
      BareState = #state{
          parent=Parent, ref=Ref, socket=Socket,
          transport=Transport, proxy_header=ProxyHeader,
          opts=Opts, peer=Peer, sock=Sock, cert=Cert,
          http2_status=sequence, http2_machine=Machine
      },
      RateLimitedState = init_rate_limiting(BareState),
      State = set_timeout(RateLimitedState),
      Transport:send(Socket, Preface),
      case Buffer of
          <<>> -> loop(State, Buffer);
          _ -> parse(State, Buffer)
      end.
  %% Seeds the frame and reset-stream flood protection counters from the
  %% max_received_frame_rate and max_reset_stream_rate options. Each
  %% deadline is the current monotonic time plus the configured period.
  init_rate_limiting(State=#state{opts=Opts}) ->
      {FrameNum, FramePeriod} = maps:get(max_received_frame_rate, Opts, {1000, 10000}),
      {ResetNum, ResetPeriod} = maps:get(max_reset_stream_rate, Opts, {10, 10000}),
      Now = erlang:monotonic_time(millisecond),
      FrameDeadline = add_period(Now, FramePeriod),
      ResetDeadline = add_period(Now, ResetPeriod),
      State#state{
          frame_rate_num=FrameNum, frame_rate_time=FrameDeadline,
          reset_rate_num=ResetNum, reset_rate_time=ResetDeadline
      }.
  %% Returns Time + Period, or infinity when the period is infinity.
  add_period(Time, Period) ->
      case Period of
          infinity -> infinity;
          _ -> Time + Period
      end.
  %% Entry point used when the connection is upgraded from HTTP/1.1.
  %%
  %% The request that triggered the upgrade is registered with the HTTP/2
  %% state machine as the upgrade stream, its stream handlers are
  %% initialised, and the 101 response is triggered via a switch_protocol
  %% message before the server preface is sent.
  %%
  %% @todo Add an argument for the request body.
  -spec init(pid(), ranch:ref(), inet:socket(), module(),
      ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
      {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
      binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
  init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
          _Settings, Req=#{method := Method}) ->
      {ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts),
      {ok, StreamID, HTTP2Machine}
          = cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
      State0 = #state{parent=Parent, ref=Ref, socket=Socket,
          transport=Transport, proxy_header=ProxyHeader,
          opts=Opts, peer=Peer, sock=Sock, cert=Cert,
          http2_status=upgrade, http2_machine=HTTP2Machine},
      State1 = headers_frame(State0, StreamID, Req),
      %% We assume that the upgrade will be applied. A stream handler
      %% must not prevent the normal operations of the server.
      %%
      %% The message is addressed to the stream the state machine
      %% registered for the upgrade rather than to a hard-coded ID.
      State2 = info(State1, StreamID, {switch_protocol, #{
          <<"connection">> => <<"Upgrade">>,
          <<"upgrade">> => <<"h2c">>
      }, ?MODULE, undefined}), %% @todo undefined or #{}?
      State = set_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
      Transport:send(Socket, Preface),
      case Buffer of
          <<>> -> loop(State, Buffer);
          _ -> parse(State, Buffer)
      end.
  %% Main receive loop of the connection process. The socket is set to
  %% {active, once}, then exactly one message is handled: socket data is
  %% appended to Buffer and parsed, everything else updates the state
  %% and loops again or terminates the connection.
  loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
          opts=Opts, timer=IdleTimerRef, children=Children}, Buffer) ->
      %% @todo This should only be called when data was read.
      Transport:setopts(Socket, [{active, once}]),
      Tags = Transport:messages(),
      InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
      receive
          %% Socket messages.
          {DataTag, Socket, Data} when DataTag =:= element(1, Tags) ->
              parse(set_timeout(State), << Buffer/binary, Data/binary >>);
          {ClosedTag, Socket} when ClosedTag =:= element(2, Tags) ->
              terminate(State, {socket_error, closed, 'The socket has been closed.'});
          {ErrorTag, Socket, SocketReason} when ErrorTag =:= element(3, Tags) ->
              terminate(State, {socket_error, SocketReason,
                  'An error has occurred on the socket.'});
          %% System messages.
          {'EXIT', Parent, ExitReason} ->
              %% @todo Graceful shutdown here as well?
              terminate(State, {stop, {exit, ExitReason}, 'Parent process terminated.'});
          {system, From, Request} ->
              sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
          %% Timeouts.
          {timeout, IdleTimerRef, idle_timeout} ->
              terminate(State, {stop, timeout,
                  'Connection idle longer than configuration allows.'});
          {timeout, ShutdownRef, {shutdown, ChildPid}} ->
              cowboy_children:shutdown_timeout(Children, ShutdownRef, ChildPid),
              loop(State, Buffer);
          {timeout, MachineRef, {cow_http2_machine, TimerName}} ->
              loop(timeout(State, TimerName, MachineRef), Buffer);
          %% Messages pertaining to a stream.
          {{Owner, StreamID}, StreamMsg} when Owner =:= self() ->
              loop(info(State, StreamID, StreamMsg), Buffer);
          %% Exit signal from children.
          ExitMsg = {'EXIT', ExitedPid, _} ->
              loop(down(State, ExitedPid, ExitMsg), Buffer);
          %% Calls from supervisor module.
          {'$gen_call', Caller, Call} ->
              cowboy_children:handle_supervisor_call(Call, Caller, Children, ?MODULE),
              loop(State, Buffer);
          Stray ->
              cowboy:log(warning, "Received stray message ~p.", [Stray], Opts),
              loop(State, Buffer)
      after InactivityTimeout ->
          terminate(State, {internal_error, timeout,
              'No message or data received before timeout.'})
      end.
  %% Restarts the idle_timeout timer: the previous timer, if any, is
  %% cancelled and a new one is started unless the option is infinity.
  set_timeout(State=#state{opts=Opts, timer=PreviousRef}) ->
      ok = case PreviousRef of
          undefined ->
              ok;
          _ ->
              erlang:cancel_timer(PreviousRef, [{async, true}, {info, false}])
      end,
      IdleTimeout = maps:get(idle_timeout, Opts, 60000),
      NewRef = case IdleTimeout of
          infinity -> undefined;
          _ -> erlang:start_timer(IdleTimeout, self(), idle_timeout)
      end,
      State#state{timer=NewRef}.
  %% HTTP/2 protocol parsing.
  %%
  %% While in the 'sequence' status the data is matched against the
  %% connection preface sequence. In any other status frames are
  %% parsed one at a time until more data is needed.
  parse(State=#state{http2_status=sequence}, Data) ->
      case cow_http2:parse_sequence(Data) of
          {ok, Rest} ->
              parse(State#state{http2_status=settings}, Rest);
          more ->
              loop(State, Data);
          ConnError = {connection_error, _, _} ->
              terminate(State, ConnError)
      end;
  parse(State=#state{http2_status=Status, http2_machine=Machine, streams=Streams}, Data) ->
      FrameSizeLimit = cow_http2_machine:get_local_setting(max_frame_size, Machine),
      case cow_http2:parse(Data, FrameSizeLimit) of
          {ok, Frame, Rest} ->
              parse(frame_rate(State, Frame), Rest);
          {ignore, Rest} ->
              parse(frame_rate(State, ignore), Rest);
          {stream_error, StreamID, Reason, Human, Rest} ->
              parse(reset_stream(State, StreamID, {stream_error, Reason, Human}), Rest);
          ConnError = {connection_error, _, _} ->
              terminate(State, ConnError);
          %% Terminate the connection if we are closing and all streams have completed.
          more when Status =:= closing, map_size(Streams) =:= 0 ->
              terminate(State, {stop, normal, 'The connection is going away.'});
          more ->
              loop(State, Data)
      end.
  %% Frame rate flood protection.
  %%
  %% Every frame decrements the remaining budget. When the budget runs
  %% out before the current period has elapsed the connection is
  %% terminated; otherwise a new budget and period are started.
  frame_rate(State0=#state{opts=Opts, frame_rate_num=Remaining0, frame_rate_time=Deadline},
          Frame) ->
      {Verdict, State} = case Remaining0 - 1 of
          0 ->
              Now = erlang:monotonic_time(millisecond),
              case Now < Deadline of
                  true ->
                      {error, State0};
                  false ->
                      %% When the option has a period of infinity we cannot reach this clause.
                      {Budget, Period} = maps:get(max_received_frame_rate, Opts,
                          {1000, 10000}),
                      {ok, State0#state{frame_rate_num=Budget,
                          frame_rate_time=Now + Period}}
              end;
          Remaining ->
              {ok, State0#state{frame_rate_num=Remaining}}
      end,
      case {Verdict, Frame} of
          {ok, ignore} ->
              ignored_frame(State);
          {ok, _} ->
              frame(State, Frame);
          {error, _} ->
              terminate(State, {connection_error, enhance_your_calm,
                  'Frame rate larger than configuration allows. Flood? (CVE-2019-9512, CVE-2019-9515, CVE-2019-9518)'})
      end.
  %% Frames received.
  %%
  %% The frame is handed to the HTTP/2 state machine and the connection
  %% acts on whatever the machine reports back.
  %%
  %% We do nothing when receiving a lingering DATA frame.
  %% We already removed the stream flow from the connection
  %% flow and are therefore already accounting for the window
  %% being reduced by these frames.
  frame(State=#state{http2_machine=PrevMachine}, Frame) ->
      case cow_http2_machine:frame(Frame, PrevMachine) of
          {ok, Machine} ->
              maybe_ack(State#state{http2_machine=Machine}, Frame);
          {ok, {data, StreamID, IsFin, Data}, Machine} ->
              data_frame(State#state{http2_machine=Machine}, StreamID, IsFin, Data);
          {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, Machine} ->
              headers_frame(State#state{http2_machine=Machine},
                  StreamID, IsFin, Headers, PseudoHeaders, BodyLen);
          {ok, {trailers, _StreamID, _Trailers}, Machine} ->
              %% @todo Propagate trailers.
              State#state{http2_machine=Machine};
          {ok, {rst_stream, StreamID, Reason}, Machine} ->
              rst_stream_frame(State#state{http2_machine=Machine}, StreamID, Reason);
          {ok, GoAway={goaway, _, _, _}, Machine} ->
              goaway(State#state{http2_machine=Machine}, GoAway);
          {send, SendData, Machine} ->
              AckedState = maybe_ack(State#state{http2_machine=Machine}, Frame),
              SentState = send_data(AckedState, SendData),
              %% We may need to send an alarm for each of the streams sending data.
              %% The alarm check is given the machine as it was before this frame.
              lists:foldl(
                  fun({StreamID, _, _}, Acc) ->
                      maybe_send_data_alarm(Acc, PrevMachine, StreamID)
                  end,
                  SentState,
                  SendData);
          {error, {stream_error, StreamID, Reason, Human}, Machine} ->
              reset_stream(State#state{http2_machine=Machine},
                  StreamID, {stream_error, Reason, Human});
          {error, ConnError={connection_error, _, _}, Machine} ->
              terminate(State#state{http2_machine=Machine}, ConnError)
      end.
  %% Acknowledges SETTINGS and PING frames; other frames need no ack.
  %%
  %% We use this opportunity to mark the HTTP/2 status as connected
  %% if we were still waiting for a SETTINGS frame.
  maybe_ack(State0=#state{socket=Socket, transport=Transport, http2_status=Status}, Frame) ->
      State = case Status of
          settings -> State0#state{http2_status=connected};
          _ -> State0
      end,
      _ = case Frame of
          {settings, _} ->
              Transport:send(Socket, cow_http2:settings_ack());
          {ping, Opaque} ->
              Transport:send(Socket, cow_http2:ping_ack(Opaque));
          _ ->
              ok
      end,
      State.
  %% Passes a DATA frame to the stream handlers of a running stream and
  %% executes the commands they return. An exception raised by the
  %% handlers is logged and resets the stream.
  data_frame(State0=#state{opts=Opts, flow=ConnFlow, streams=Streams},
          StreamID, IsFin, Data) ->
      case Streams of
          #{StreamID := Stream=#stream{status=running, flow=StreamFlow,
                  state=HandlerState0}} ->
              try cowboy_stream:data(StreamID, IsFin, Data, HandlerState0) of
                  {Commands, HandlerState} ->
                      %% Remove the amount of data received from the flow.
                      %% We may receive more data than we requested. We ensure
                      %% that the flow value doesn't go lower than 0.
                      Received = byte_size(Data),
                      UpdatedStream = Stream#stream{
                          flow=max(0, StreamFlow - Received),
                          state=HandlerState
                      },
                      State1 = State0#state{
                          flow=max(0, ConnFlow - Received),
                          streams=Streams#{StreamID => UpdatedStream}
                      },
                      State = update_window(State1, StreamID),
                      commands(State, StreamID, Commands)
              catch Class:Exception:Stacktrace ->
                  cowboy:log(cowboy_stream:make_error_log(data,
                      [StreamID, IsFin, Data, HandlerState0],
                      Class, Exception, Stacktrace), Opts),
                  reset_stream(State0, StreamID, {internal_error, {Class, Exception},
                      'Unhandled exception in cowboy_stream:data/4.'})
              end;
          %% We ignore DATA frames for streams that are stopping.
          #{} ->
              State0
      end.
  %% Handles the headers of a new request. Plain CONNECT (exactly two
  %% pseudo-headers) and TRACE are answered with a 501. For anything
  %% else the authority comes from the :authority pseudo-header or,
  %% failing that, from the host header; a request with neither is reset.
  headers_frame(State, StreamID, IsFin, Headers,
          Pseudo=#{method := <<"CONNECT">>}, _)
          when map_size(Pseudo) =:= 2 ->
      early_error(State, StreamID, IsFin, Headers, Pseudo, 501,
          'The CONNECT method is currently not implemented. (RFC7231 4.3.6)');
  headers_frame(State, StreamID, IsFin, Headers,
          Pseudo=#{method := <<"TRACE">>}, _) ->
      early_error(State, StreamID, IsFin, Headers, Pseudo, 501,
          'The TRACE method is currently not implemented. (RFC7231 4.3.8)');
  headers_frame(State, StreamID, IsFin, Headers,
          Pseudo=#{authority := Authority}, BodyLen) ->
      headers_frame_parse_host(State, StreamID, IsFin, Headers,
          Pseudo, BodyLen, Authority);
  headers_frame(State, StreamID, IsFin, Headers, Pseudo, BodyLen) ->
      case lists:keyfind(<<"host">>, 1, Headers) of
          {_, HostValue} ->
              headers_frame_parse_host(State, StreamID, IsFin, Headers,
                  Pseudo, BodyLen, HostValue);
          _ ->
              reset_stream(State, StreamID, {stream_error, protocol_error,
                  'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'})
      end.
  %% Parses the authority and the path, builds the Req map and starts
  %% the stream. Only the two parsing calls are protected by the try
  %% expressions: an invalid authority or path resets the stream with
  %% a protocol_error, while exceptions raised afterwards propagate.
  headers_frame_parse_host(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert,
          proxy_header=ProxyHeader}, StreamID, IsFin, Headers,
          PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs},
          BodyLen, Authority) ->
      try cow_http_hd:parse_host(Authority) of
          {Host, ParsedPort} ->
              Port = ensure_port(Scheme, ParsedPort),
              try cow_http:parse_fullpath(PathWithQs) of
                  {<<>>, _} ->
                      reset_stream(State, StreamID, {stream_error, protocol_error,
                          'The path component must not be empty. (RFC7540 8.1.2.3)'});
                  {Path, Qs} ->
                      BaseReq = #{
                          ref => Ref,
                          pid => self(),
                          streamid => StreamID,
                          peer => Peer,
                          sock => Sock,
                          cert => Cert,
                          method => Method,
                          scheme => Scheme,
                          host => Host,
                          port => Port,
                          path => Path,
                          qs => Qs,
                          version => 'HTTP/2',
                          headers => headers_to_map(Headers, #{}),
                          has_body => IsFin =:= nofin,
                          body_length => BodyLen
                      },
                      %% We add the PROXY header information if any.
                      ProxiedReq = if
                          ProxyHeader =:= undefined -> BaseReq;
                          true -> BaseReq#{proxy_header => ProxyHeader}
                      end,
                      %% We add the protocol information for extended CONNECTs.
                      Req = case maps:find(protocol, PseudoHeaders) of
                          {ok, Protocol} -> ProxiedReq#{protocol => Protocol};
                          error -> ProxiedReq
                      end,
                      headers_frame(State, StreamID, Req)
              catch _:_ ->
                  reset_stream(State, StreamID, {stream_error, protocol_error,
                      'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'})
              end
      catch _:_ ->
          reset_stream(State, StreamID, {stream_error, protocol_error,
              'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'})
      end.
  %% Fills in the default port of the http and https schemes when the
  %% authority did not carry one. Any other combination is returned as is.
  ensure_port(Scheme, undefined) ->
      case Scheme of
          <<"http">> -> 80;
          <<"https">> -> 443;
          _ -> undefined
      end;
  ensure_port(_, Port) ->
      Port.
  %% This function is necessary to properly handle duplicate headers
  %% and the special-case cookie header.
  %%
  %% Folds the header list into the accumulator map. A repeated name
  %% has its values concatenated in order of appearance, separated by
  %% ", " or, for cookie, by "; ".
  headers_to_map(Headers, Acc0) ->
      lists:foldl(fun({Name, Value}, Acc) ->
          case Acc of
              %% The cookie header does not use proper HTTP header lists.
              #{Name := Previous} when Name =:= <<"cookie">> ->
                  Acc#{Name => << Previous/binary, "; ", Value/binary >>};
              #{Name := Previous} ->
                  Acc#{Name => << Previous/binary, ", ", Value/binary >>};
              _ ->
                  Acc#{Name => Value}
          end
      end, Acc0, Headers).
  %% Initialises the stream handlers for a new request, records the
  %% stream in the connection state and runs the returned commands.
  %% An exception raised by the handlers is logged and resets the stream.
  headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) ->
      try cowboy_stream:init(StreamID, Req, Opts) of
          {Commands, HandlerState} ->
              NewStream = #stream{state=HandlerState},
              commands(State#state{streams=Streams#{StreamID => NewStream}},
                  StreamID, Commands)
      catch Class:Exception:Stacktrace ->
          cowboy:log(cowboy_stream:make_error_log(init,
              [StreamID, Req, Opts],
              Class, Exception, Stacktrace), Opts),
          reset_stream(State, StreamID, {internal_error, {Class, Exception},
              'Unhandled exception in cowboy_stream:init/3.'})
      end.
  %% Answers a request that is rejected before any stream is created.
  %% The stream handlers may replace the default empty response through
  %% cowboy_stream:early_error/5; if they raise, the exception is
  %% logged and the default status and headers are sent instead.
  early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
          StreamID, _IsFin, Headers, #{method := Method},
          StatusCode0, HumanReadable) ->
      %% We automatically terminate the stream but it is not an error
      %% per se (at least not in the first implementation).
      Reason = {stream_error, no_error, HumanReadable},
      %% The partial Req is minimal for now. We only have one case
      %% where it can be called (when a method is completely disabled).
      %% @todo Fill in the other elements.
      PartialReq = #{
          ref => Ref,
          peer => Peer,
          method => Method,
          headers => headers_to_map(Headers, #{})
      },
      DefaultHeaders = #{<<"content-length">> => <<"0">>},
      DefaultResp = {response, StatusCode0, DefaultHeaders, <<>>},
      try cowboy_stream:early_error(StreamID, Reason, PartialReq, DefaultResp, Opts) of
          {response, StatusCode, RespHeaders, RespBody} ->
              send_response(State0, StreamID, StatusCode, RespHeaders, RespBody)
      catch Class:Exception:Stacktrace ->
          cowboy:log(cowboy_stream:make_error_log(early_error,
              [StreamID, Reason, PartialReq, DefaultResp, Opts],
              Class, Exception, Stacktrace), Opts),
          %% We still need to send an error response, so send what we initially
          %% wanted to send. It's better than nothing.
          send_headers(State0, StreamID, fin, StatusCode0, DefaultHeaders)
      end.
  %% Handles a stream reset by the peer: the stream handlers are
  %% terminated, the stream is forgotten and its children are shut
  %% down. A reset for a stream we do not know is ignored.
  rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
      case maps:take(StreamID, Streams0) of
          error ->
              State;
          {#stream{state=HandlerState}, Streams} ->
              terminate_stream_handler(State, StreamID, Reason, HandlerState),
              State#state{
                  streams=Streams,
                  children=cowboy_children:shutdown(Children0, StreamID)
              }
      end.
  %% Informs the HTTP/2 state machine that a frame was ignored by the
  %% parser. The machine may answer with a connection error, in which
  %% case the connection is terminated.
  ignored_frame(State=#state{http2_machine=Machine0}) ->
      case cow_http2_machine:ignored_frame(Machine0) of
          {ok, Machine} ->
              State#state{http2_machine=Machine};
          {error, ConnError={connection_error, _, _}, Machine} ->
              terminate(State#state{http2_machine=Machine}, ConnError)
      end.
  %% HTTP/2 timeouts.
  %%
  %% Forwards a timer event to the HTTP/2 state machine and terminates
  %% the connection if the machine reports a connection error.
  timeout(State=#state{http2_machine=Machine0}, Name, TRef) ->
      case cow_http2_machine:timeout(Name, TRef, Machine0) of
          {ok, Machine} ->
              State#state{http2_machine=Machine};
          {error, ConnError={connection_error, _, _}, Machine} ->
              terminate(State#state{http2_machine=Machine}, ConnError)
      end.
  %% Erlang messages.
  %%
  %% Handles the exit of a process. When the process belonged to a
  %% stream that is still running, the exit message is forwarded to
  %% that stream's handlers. Exits of unknown processes are logged.
  down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
      case cowboy_children:down(Children0, Pid) of
          %% The stream was terminated already.
          {ok, undefined, Children} ->
              State#state{children=Children};
          %% The stream is still running.
          {ok, StreamID, Children} ->
              StateWithChildren = State#state{children=Children},
              info(StateWithChildren, StreamID, Msg);
          %% The process was unknown.
          error ->
              cowboy:log(warning,
                  "Received EXIT signal ~p for unknown process ~p.~n",
                  [Msg, Pid], Opts),
              State
      end.
  %% Delivers an Erlang message to the handlers of a stream and runs
  %% the commands they return. An exception raised by the handlers is
  %% logged and resets the stream. A message for a stream that is not
  %% in the streams map is dropped, with a warning unless the state
  %% machine reports the stream as lingering.
  info(State=#state{opts=Opts, http2_machine=Machine, streams=Streams}, StreamID, Msg) ->
      case Streams of
          #{StreamID := Stream=#stream{state=HandlerState0}} ->
              try cowboy_stream:info(StreamID, Msg, HandlerState0) of
                  {Commands, HandlerState} ->
                      UpdatedStream = Stream#stream{state=HandlerState},
                      commands(State#state{streams=Streams#{StreamID => UpdatedStream}},
                          StreamID, Commands)
              catch Class:Exception:Stacktrace ->
                  cowboy:log(cowboy_stream:make_error_log(info,
                      [StreamID, Msg, HandlerState0],
                      Class, Exception, Stacktrace), Opts),
                  reset_stream(State, StreamID, {internal_error, {Class, Exception},
                      'Unhandled exception in cowboy_stream:info/3.'})
              end;
          _ ->
              _ = case cow_http2_machine:is_lingering_stream(StreamID, Machine) of
                  true ->
                      ok;
                  false ->
                      cowboy:log(warning,
                          "Received message ~p for unknown stream ~p.",
                          [Msg, StreamID], Opts)
              end,
              State
      end.
  549. %% Stream handler commands.
  550. %%
  551. %% @todo Kill the stream if it tries to send a response, headers,
  552. %% data or push promise when the stream is closed or half-closed.
  553. commands(State, _, []) ->
  554. State;
  555. %% Error responses are sent only if a response wasn't sent already.
  556. commands(State=#state{http2_machine=HTTP2Machine}, StreamID,
  557. [{error_response, StatusCode, Headers, Body}|Tail]) ->
  558. case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
  559. {ok, idle, _} ->
  560. commands(State, StreamID, [{response, StatusCode, Headers, Body}|Tail]);
  561. _ ->
  562. commands(State, StreamID, Tail)
  563. end;
  564. %% Send an informational response.
  565. commands(State0, StreamID, [{inform, StatusCode, Headers}|Tail]) ->
  566. State = send_headers(State0, StreamID, idle, StatusCode, Headers),
  567. commands(State, StreamID, Tail);
  568. %% Send response headers.
  569. commands(State0, StreamID, [{response, StatusCode, Headers, Body}|Tail]) ->
  570. State = send_response(State0, StreamID, StatusCode, Headers, Body),
  571. commands(State, StreamID, Tail);
  572. %% Send response headers.
  573. commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) ->
  574. State = send_headers(State0, StreamID, nofin, StatusCode, Headers),
  575. commands(State, StreamID, Tail);
  576. %% Send a response body chunk.
  577. commands(State0, StreamID, [{data, IsFin, Data}|Tail]) ->
  578. State = maybe_send_data(State0, StreamID, IsFin, Data),
  579. commands(State, StreamID, Tail);
  580. %% Send trailers.
  581. commands(State0, StreamID, [{trailers, Trailers}|Tail]) ->
  582. State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}),
  583. commands(State, StreamID, Tail);
  584. %% Send a push promise.
  585. %%
  586. %% @todo Responses sent as a result of a push_promise request
  587. %% must not send push_promise frames themselves.
  588. %%
  589. %% @todo We should not send push_promise frames when we are
  590. %% in the closing http2_status.
  591. commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
  592. StreamID, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
  593. Authority = case {Scheme, Port} of
  594. {<<"http">>, 80} -> Host;
  595. {<<"https">>, 443} -> Host;
  596. _ -> iolist_to_binary([Host, $:, integer_to_binary(Port)])
  597. end,
  598. PathWithQs = iolist_to_binary(case Qs of
  599. <<>> -> Path;
  600. _ -> [Path, $?, Qs]
  601. end),
  602. PseudoHeaders = #{
  603. method => Method,
  604. scheme => Scheme,
  605. authority => Authority,
  606. path => PathWithQs
  607. },
  608. %% We need to make sure the header value is binary before we can
  609. %% create the Req object, as it expects them to be flat.
  610. Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)),
  611. State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP2Machine0,
  612. PseudoHeaders, Headers) of
  613. {ok, PromisedStreamID, HeaderBlock, HTTP2Machine} ->
  614. Transport:send(Socket, cow_http2:push_promise(
  615. StreamID, PromisedStreamID, HeaderBlock)),
  616. headers_frame(State0#state{http2_machine=HTTP2Machine},
  617. PromisedStreamID, fin, Headers, PseudoHeaders, 0);
  618. {error, no_push} ->
  619. State0
  620. end,
  621. commands(State, StreamID, Tail);
  622. %% Read the request body.
  623. commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) ->
  624. #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
  625. State = update_window(State0#state{flow=Flow + Size,
  626. streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
  627. StreamID),
  628. commands(State, StreamID, Tail);
  629. %% Supervise a child process.
  630. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
  631. commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
  632. StreamID, Tail);
  633. %% Error handling.
  634. commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) ->
  635. %% @todo Do we want to run the commands after an internal_error?
  636. %% @todo Do we even allow commands after?
  637. %% @todo Only reset when the stream still exists.
  638. reset_stream(State, StreamID, Error);
  639. %% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
  640. commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
  641. StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
  642. %% @todo This 101 response needs to be passed through stream handlers.
  643. Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))),
  644. commands(State, StreamID, Tail);
  645. %% Use a different protocol within the stream (CONNECT :protocol).
  646. %% @todo Make sure we error out when the feature is disabled.
  647. commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
  648. State = info(State0, StreamID, {headers, 200, Headers}),
  649. commands(State, StreamID, Tail);
  650. %% Set options dynamically.
  651. commands(State, StreamID, [{set_options, _Opts}|Tail]) ->
  652. commands(State, StreamID, Tail);
  653. commands(State, StreamID, [stop|_Tail]) ->
  654. %% @todo Do we want to run the commands after a stop?
  655. %% @todo Do we even allow commands after?
  656. stop_stream(State, StreamID);
  657. %% Log event.
  658. commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
  659. cowboy:log(Log, Opts),
  660. commands(State, StreamID, Tail).
  %% Tentatively update the flow control windows after the flow was updated.
  %%
  %% The connection flow and the stream flow are each passed to
  %% cow_http2_machine:ensure_window. When an increment is returned, the
  %% corresponding WINDOW_UPDATE frame is built. Both frames (if any) are
  %% sent with a single Transport:send/2 call; nothing is sent when neither
  %% call returns an increment. Returns the state with the updated machine.
  update_window(State=#state{socket=Socket, transport=Transport,
          http2_machine=Machine0, flow=ConnFlow, streams=Streams}, StreamID) ->
      %% The stream must be known; this match crashes otherwise.
      #{StreamID := #stream{flow=StreamFlow}} = Streams,
      %% Connection-level window first.
      {ConnFrame, Machine1} = case cow_http2_machine:ensure_window(ConnFlow, Machine0) of
          {ok, ConnIncrement, ConnMachine} ->
              {cow_http2:window_update(ConnIncrement), ConnMachine};
          ok ->
              {<<>>, Machine0}
      end,
      %% Then the stream-level window, using the machine from the previous step.
      {StreamFrame, Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, Machine1) of
          {ok, StreamIncrement, StreamMachine} ->
              {cow_http2:window_update(StreamID, StreamIncrement), StreamMachine};
          ok ->
              {<<>>, Machine1}
      end,
      if
          ConnFrame =:= <<>>, StreamFrame =:= <<>> ->
              ok;
          true ->
              Transport:send(Socket, [ConnFrame, StreamFrame])
      end,
      State#state{http2_machine=Machine}.
  %% Send the response, trailers or data.
  %%
  %% The body size is the byte count of a {sendfile, _, Bytes, _} tuple, or
  %% iolist_size/1 of the body otherwise. An empty body results in a single
  %% HEADERS frame flagged fin (after which the stream may be terminated);
  %% a non-empty body results in HEADERS flagged nofin followed by the body.
  send_response(State0, StreamID, StatusCode, Headers, Body) ->
      BodySize = case Body of
          {sendfile, _Offset, Length, _Path} -> Length;
          _ -> iolist_size(Body)
      end,
      if
          BodySize =:= 0 ->
              HeadersOnly = send_headers(State0, StreamID, fin, StatusCode, Headers),
              maybe_terminate_stream(HeadersOnly, StreamID, fin);
          true ->
              HeadersSent = send_headers(State0, StreamID, nofin, StatusCode, Headers),
              maybe_send_data(HeadersSent, StreamID, fin, Body)
      end.
  %% Encode and send a HEADERS frame for the given stream.
  %%
  %% The status code is converted to an integer and passed as the status
  %% pseudo-header; the header map is flattened with headers_to_list/1.
  %% The fin flag actually sent is the one returned by
  %% cow_http2_machine:prepare_headers, which may differ from IsFin0.
  %% Returns the state with the updated machine.
  send_headers(State=#state{socket=Socket, transport=Transport,
          http2_machine=Machine0}, StreamID, IsFin0, StatusCode, Headers) ->
      PseudoHeaders = #{status => cow_http:status_to_integer(StatusCode)},
      HeaderList = headers_to_list(Headers),
      {ok, IsFin, HeaderBlock, Machine} = cow_http2_machine:prepare_headers(
          StreamID, Machine0, IsFin0, PseudoHeaders, HeaderList),
      Frame = cow_http2:headers(StreamID, IsFin, HeaderBlock),
      Transport:send(Socket, Frame),
      State#state{http2_machine=Machine}.
  %% Convert the header map to a list of {Name, Value} pairs.
  %%
  %% The set-cookie header is special; we can only send one cookie per header.
  %% Its value is a list of cookies: each one becomes its own set-cookie
  %% entry, appended after all the other headers.
  headers_to_list(Headers) ->
      case maps:take(<<"set-cookie">>, Headers) of
          {SetCookies, OtherHeaders} ->
              maps:to_list(OtherHeaders)
                  ++ [{<<"set-cookie">>, Cookie} || Cookie <- SetCookies];
          error ->
              maps:to_list(Headers)
      end.
  %% Hand data over to the HTTP/2 machine, which either queues it or
  %% tells us what to send right away.
  %%
  %% Data that is not already a tuple is wrapped as {data, Data}. After the
  %% machine has been updated (and any frames sent), the buffer alarms are
  %% re-evaluated against the machine as it was before this call.
  maybe_send_data(State0=#state{http2_machine=Machine0}, StreamID, IsFin, Data0) ->
      Data = if
          is_tuple(Data0) -> Data0;
          true -> {data, Data0}
      end,
      case cow_http2_machine:send_or_queue_data(StreamID, Machine0, IsFin, Data) of
          {send, ToSend, Machine} ->
              State = #state{http2_status=Status, streams=Streams}
                  = send_data(State0#state{http2_machine=Machine}, ToSend),
              %% Terminate the connection if we are closing and all streams have completed.
              case Status =:= closing andalso Streams =:= #{} of
                  true ->
                      terminate(State, {stop, normal, 'The connection is going away.'});
                  false ->
                      maybe_send_data_alarm(State, Machine0, StreamID)
              end;
          {ok, Machine} ->
              %% Nothing to send right now.
              maybe_send_data_alarm(State0#state{http2_machine=Machine}, Machine0, StreamID)
      end.
  %% Send the data for a list of {StreamID, IsFin, SendData} entries,
  %% in order, threading the state through each send.
  send_data(State0, StreamsData) ->
      lists:foldl(fun({StreamID, IsFin, SendData}, State) ->
          send_data(State, StreamID, IsFin, SendData)
      end, State0, StreamsData).
  %% Send a non-empty list of data items for one stream.
  %%
  %% Every item but the last is sent flagged nofin. The last item carries
  %% the IsFin value given by the caller, after which the stream may be
  %% terminated if it was stopping.
  send_data(State0, StreamID, IsFin, [Item|Rest]) ->
      case Rest of
          [] ->
              Sent = send_data_frame(State0, StreamID, IsFin, Item),
              maybe_terminate_stream(Sent, StreamID, IsFin);
          _ ->
              Sent = send_data_frame(State0, StreamID, nofin, Item),
              send_data(Sent, StreamID, IsFin, Rest)
      end.
  %% Send a single item on the given stream. The item is one of:
  %%   {data, Data}                    - sent as a DATA frame;
  %%   {sendfile, Offset, Bytes, Path} - a DATA frame header followed by
  %%                                     the file contents;
  %%   {trailers, Trailers}            - sent as a HEADERS frame flagged fin.
  send_data_frame(State=#state{socket=Socket, transport=Transport},
          StreamID, IsFin, {data, Data}) ->
      Frame = cow_http2:data(StreamID, IsFin, Data),
      Transport:send(Socket, Frame),
      State;
  send_data_frame(State=#state{socket=Socket, transport=Transport, opts=Opts},
          StreamID, IsFin, {sendfile, Offset, Bytes, Path}) ->
      FrameHeader = cow_http2:data_header(StreamID, IsFin, Bytes),
      Transport:send(Socket, FrameHeader),
      %% When sendfile is disabled we explicitly use the fallback.
      %% The result of the sendfile call is deliberately ignored.
      UseSendfile = maps:get(sendfile, Opts, true),
      _ = case UseSendfile of
          false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []);
          true -> Transport:sendfile(Socket, Path, Offset, Bytes)
      end,
      State;
  %% The stream is terminated in cow_http2_machine:prepare_trailers.
  %% Trailers are only accepted with nofin; the frame itself is sent as fin.
  send_data_frame(State=#state{socket=Socket, transport=Transport,
          http2_machine=Machine0}, StreamID, nofin, {trailers, Trailers}) ->
      {ok, HeaderBlock, Machine}
          = cow_http2_machine:prepare_trailers(StreamID, Machine0, Trailers),
      Frame = cow_http2:headers(StreamID, fin, HeaderBlock),
      Transport:send(Socket, Frame),
      State#state{http2_machine=Machine}.
  %% After we have sent or queued data we may need to set or clear an alarm.
  %% We do this by comparing the HTTP2Machine buffer state before/after for
  %% the relevant streams.
  %%
  %% HTTP2Machine0 is the machine before the data was sent or queued; the
  %% machine in the state is the one after. At most one alarm transition is
  %% reported per call, and connection transitions take precedence over
  %% stream transitions.
  maybe_send_data_alarm(State=#state{opts=Opts, http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID) ->
      ConnBefore = cow_http2_machine:get_connection_local_buffer_size(HTTP2Machine0),
      ConnAfter = cow_http2_machine:get_connection_local_buffer_size(HTTP2Machine),
      {ok, StreamBefore} = cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine0),
      %% When the stream ends up closed after it finished sending data,
      %% we do not want to trigger an alarm. We act as if the buffer
      %% size did not change.
      StreamAfter = case cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine) of
          {error, closed} -> StreamBefore;
          {ok, Size} -> Size
      end,
      MaxConn = maps:get(max_connection_buffer_size, Opts, 16000000),
      MaxStream = maps:get(max_stream_buffer_size, Opts, 8000000),
      %% I do not want to document these internal events yet. I am not yet
      %% convinced it should be {alarm, Name, on|off} and not {internal_event, E}
      %% or something else entirely. Though alarms are probably right.
      %%
      %% Each element tells whether the corresponding size reached its limit:
      %% {connection before, connection after, stream before, stream after}.
      Full = {ConnBefore >= MaxConn, ConnAfter >= MaxConn,
          StreamBefore >= MaxStream, StreamAfter >= MaxStream},
      case Full of
          {true, false, _, _} ->
              connection_alarm(State, connection_buffer_full, off);
          {false, true, _, _} ->
              connection_alarm(State, connection_buffer_full, on);
          {_, _, true, false} ->
              stream_alarm(State, StreamID, stream_buffer_full, off);
          {_, _, false, true} ->
              stream_alarm(State, StreamID, stream_buffer_full, on);
          _ ->
              State
      end.
  %% Report a connection-wide alarm by notifying every stream currently
  %% tracked in the state, threading the state through each notification.
  connection_alarm(State0=#state{streams=Streams}, Name, Value) ->
      Notify = fun(StreamID, State) ->
          stream_alarm(State, StreamID, Name, Value)
      end,
      StreamIDs = maps:keys(Streams),
      lists:foldl(Notify, State0, StreamIDs).
  %% Report an alarm to a single stream as an {alarm, Name, Value} info message.
  stream_alarm(State, StreamID, Name, Value) ->
      Alarm = {alarm, Name, Value},
      info(State, StreamID, Alarm).
  %% Terminate a stream or the connection.

  %% Handle a received GOAWAY frame.
  %%
  %% We may have to cancel streams even if we receive multiple
  %% GOAWAY frames as the LastStreamID value may be lower than
  %% the one previously received.
  %%
  %% When still connected we answer with our own GOAWAY (no_error) and
  %% switch to the closing status; when already closing only the stream
  %% list is updated.
  goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=Machine,
          http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _})
          when Status =:= connected; Status =:= closing ->
      StopReason = {stop, {goaway, Reason}, 'The connection is going away.'},
      Remaining = goaway_streams(State0, maps:to_list(Streams0),
          LastStreamID, StopReason, []),
      State = State0#state{streams=maps:from_list(Remaining)},
      if
          Status =:= connected ->
              LastID = cow_http2_machine:get_last_streamid(Machine),
              Transport:send(Socket, cow_http2:goaway(LastID, no_error, <<>>)),
              State#state{http2_status=closing};
          true ->
              State
      end;
  %% We terminate the connection immediately if it hasn't fully been initialized.
  goaway(State, {goaway, _, Reason, _}) ->
      terminate(State, {stop, {goaway, Reason}, 'The connection is going away.'}).
  %% Cancel server-initiated streams that are above LastStreamID.
  %%
  %% Only streams with an even identifier are considered: those are the
  %% streams initiated by the server (RFC 7540 section 5.1.1). Their stream
  %% handler is terminated with Reason and they are dropped from the result.
  %% All other streams are prepended to Acc, so the returned list is in
  %% reverse order compared to the input.
  goaway_streams(State, Streams, LastStreamID, Reason, Acc0) ->
      lists:foldl(fun
          ({StreamID, #stream{state=StreamState}}, Acc)
                  when StreamID > LastStreamID, (StreamID rem 2) =:= 0 ->
              terminate_stream_handler(State, StreamID, Reason, StreamState),
              Acc;
          (Stream, Acc) ->
              [Stream|Acc]
      end, Acc0, Streams).
  %% Terminate the connection process. This function never returns:
  %% every clause ends with exit({shutdown, Reason}).
  %%
  %% - Without a state (undefined) the process exits immediately.
  %% - When connected or closing, a GOAWAY frame is sent (unless we are
  %%   closing, in which case it was sent already), all stream handlers
  %%   are terminated, the children are terminated and the socket closed.
  %% - In any other status only the socket is closed.
  %%
  %% The spec includes undefined because the first clause accepts it.
  -spec terminate(#state{} | undefined, _) -> no_return().
  terminate(undefined, Reason) ->
      exit({shutdown, Reason});
  terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status,
          http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason)
          when Status =:= connected; Status =:= closing ->
      %% @todo We might want to optionally send the Reason value
      %% as debug data in the GOAWAY frame here. Perhaps more.
      case Status of
          connected ->
              Transport:send(Socket, cow_http2:goaway(
                  cow_http2_machine:get_last_streamid(HTTP2Machine),
                  terminate_reason(Reason), <<>>));
          %% We already sent the GOAWAY frame.
          closing ->
              ok
      end,
      terminate_all_streams(State, maps:to_list(Streams), Reason),
      cowboy_children:terminate(Children),
      Transport:close(Socket),
      exit({shutdown, Reason});
  terminate(#state{socket=Socket, transport=Transport}, Reason) ->
      Transport:close(Socket),
      exit({shutdown, Reason}).
  %% Map a termination reason to the error code sent in the GOAWAY frame:
  %% a normal stop is no_error, a connection error keeps its own reason,
  %% and socket or internal errors are reported as internal_error.
  terminate_reason({stop, _, _}) ->
      no_error;
  terminate_reason({connection_error, Reason, _}) ->
      Reason;
  terminate_reason({Tag, _, _}) when Tag =:= socket_error; Tag =:= internal_error ->
      internal_error.
  %% Terminate the stream handler of every {StreamID, Stream} entry in
  %% the list, in order, with the same reason. Returns ok.
  terminate_all_streams(State, Streams, Reason) ->
      lists:foreach(fun({StreamID, #stream{state=StreamState}}) ->
          terminate_stream_handler(State, StreamID, Reason, StreamState)
      end, Streams).
  %% Reset a stream following an internal or stream error.
  %%
  %% An RST_STREAM frame is sent with internal_error for an internal error,
  %% or with the reason carried by the stream error. The stream is then
  %% reset in the machine (when the machine knows about it) and terminated.
  %% Finally the reset rate is checked: when it is exceeded the whole
  %% connection is terminated.
  %%
  %% @todo Don't send an RST_STREAM if one was already sent.
  reset_stream(State0=#state{socket=Socket, transport=Transport,
          http2_machine=Machine0}, StreamID, Error) ->
      ErrorCode = case Error of
          {stream_error, StreamReason, _} -> StreamReason;
          {internal_error, _, _} -> internal_error
      end,
      Transport:send(Socket, cow_http2:rst_stream(StreamID, ErrorCode)),
      ResetState = case cow_http2_machine:reset_stream(StreamID, Machine0) of
          {ok, Machine} -> State0#state{http2_machine=Machine};
          {error, not_found} -> State0
      end,
      Terminated = terminate_stream(ResetState, StreamID, Error),
      case reset_rate(Terminated) of
          error ->
              terminate(Terminated, {connection_error, enhance_your_calm,
                  'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'});
          {ok, State} ->
              State
      end.
  %% Account for one stream reset against the allowed reset rate.
  %%
  %% The counter of remaining resets is decremented. When it reaches zero
  %% the current monotonic time is compared with the stored deadline:
  %% before the deadline the rate is exceeded and error is returned;
  %% otherwise the counter and deadline are reinitialized from the
  %% max_reset_stream_rate option (default {10, 10000}).
  reset_rate(State0=#state{opts=Opts, reset_rate_num=Num0, reset_rate_time=Deadline}) ->
      Remaining = Num0 - 1,
      case Remaining of
          0 ->
              Now = erlang:monotonic_time(millisecond),
              case Now < Deadline of
                  true ->
                      error;
                  false ->
                      %% When the option has a period of infinity we cannot reach this clause.
                      {Num, Period} = maps:get(max_reset_stream_rate, Opts, {10, 10000}),
                      {ok, State0#state{reset_rate_num=Num, reset_rate_time=Now + Period}}
              end;
          _ ->
              {ok, State0#state{reset_rate_num=Remaining}}
      end.
  %% Stop a stream, based on the stream's local state as reported by
  %% cow_http2_machine:get_stream_local_state/2.
  stop_stream(State=#state{http2_machine=Machine}, StreamID) ->
      LocalState = cow_http2_machine:get_stream_local_state(StreamID, Machine),
      case LocalState of
          %% When the stream terminates normally (without sending RST_STREAM)
          %% and no response was sent, we need to send a proper response back
          %% to the client: an empty 204. We delay the termination of the
          %% stream until the response is fully sent.
          {ok, idle, _} ->
              Stopping = stopping(State, StreamID),
              info(Stopping, StreamID, {response, 204, #{}, <<>>});
          %% When a response was sent but not terminated, we need to close
          %% the stream. We delay the termination of the stream until the
          %% response is fully sent.
          {ok, nofin, fin} ->
              stopping(State, StreamID);
          %% We only send a final DATA frame if there isn't one queued yet.
          {ok, nofin, _} ->
              Stopping = stopping(State, StreamID),
              info(Stopping, StreamID, {data, fin, <<>>});
          %% When a response was sent fully we can terminate the stream,
          %% regardless of the stream being in half-closed or closed state.
          _ ->
              terminate_stream(State, StreamID)
      end.
  %% Mark an existing stream as stopping. The stream must be present in
  %% the state; the match below crashes otherwise.
  stopping(State=#state{streams=Streams0}, StreamID) ->
      #{StreamID := Stream0} = Streams0,
      Stream = Stream0#stream{status=stopping},
      Streams = Streams0#{StreamID => Stream},
      State#state{streams=Streams}.
  %% If we finished sending data and the stream is stopping, terminate it.
  %% In every other case the state is returned unchanged.
  maybe_terminate_stream(State, StreamID, IsFin) ->
      case {IsFin, State} of
          {fin, #state{streams=#{StreamID := #stream{status=stopping}}}} ->
              terminate_stream(State, StreamID);
          _ ->
              State
      end.
  %% Terminate a stream normally.
  %%
  %% When the stream stops normally without reading the request
  %% body fully we need to tell the client to stop sending it.
  %% We do this by sending an RST_STREAM with reason NO_ERROR. (RFC7540 8.1)
  %%
  %% Only two local states are expected here: {ok, fin, _}, in which case
  %% the RST_STREAM is sent and the stream reset in the machine, and
  %% {error, closed}, in which case nothing needs to be sent.
  terminate_stream(State0=#state{socket=Socket, transport=Transport,
          http2_machine=Machine0}, StreamID) ->
      case cow_http2_machine:get_stream_local_state(StreamID, Machine0) of
          {error, closed} ->
              terminate_stream(State0, StreamID, normal);
          {ok, fin, _} ->
              Transport:send(Socket, cow_http2:rst_stream(StreamID, no_error)),
              {ok, Machine} = cow_http2_machine:reset_stream(StreamID, Machine0),
              terminate_stream(State0#state{http2_machine=Machine}, StreamID, normal)
      end.
  %% Remove a stream from the state and terminate its handler with Reason.
  %%
  %% We remove the stream flow from the connection flow. Any further
  %% data received for this stream is therefore fully contained within
  %% the extra window we allocated for this stream.
  %%
  %% The children of the stream are shut down as well. When the stream
  %% is not known the state is returned unchanged.
  terminate_stream(State=#state{flow=ConnFlow, streams=Streams0, children=Children0}, StreamID, Reason) ->
      case maps:take(StreamID, Streams0) of
          error ->
              State;
          {#stream{flow=StreamFlow, state=StreamState}, Streams} ->
              terminate_stream_handler(State, StreamID, Reason, StreamState),
              Children = cowboy_children:shutdown(Children0, StreamID),
              State#state{
                  flow=ConnFlow - StreamFlow,
                  streams=Streams,
                  children=Children
              }
      end.
  %% Call the stream handler's terminate callback. Any exception it raises
  %% is caught and logged instead of being propagated to the caller.
  terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
      try
          cowboy_stream:terminate(StreamID, Reason, StreamState)
      catch Class:Exception:Stacktrace ->
          ErrorLog = cowboy_stream:make_error_log(terminate,
              [StreamID, Reason, StreamState],
              Class, Exception, Stacktrace),
          cowboy:log(ErrorLog, Opts)
      end.
  %% System callbacks.

  %% Resume the main loop with the state and buffer that were saved
  %% when the system message was handled.
  -spec system_continue(_, _, {#state{}, binary()}) -> ok.
  system_continue(_Parent, _Debug, {State, Buffer}) ->
      loop(State, Buffer).
  %% Terminate the connection as a normal stop; the saved buffer is discarded.
  -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
  system_terminate(Reason, _Parent, _Debug, {State, _Buffer}) ->
      %% @todo Graceful shutdown here as well?
      StopReason = {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'},
      terminate(State, StopReason).
  %% Code change callback: the state and buffer are kept as they are.
  -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
  system_code_change(Misc, _Module, _OldVsn, _Extra) ->
      {ok, Misc}.