cowboy_http2.erl 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225
  1. %% Copyright (c) 2015-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http2).
  15. -export([init/6]).
  16. -export([init/10]).
  17. -export([init/12]).
  18. -export([system_continue/3]).
  19. -export([system_terminate/4]).
  20. -export([system_code_change/4]).
  %% Options map accepted by the HTTP/2 connection process.
  %%
  %% Every association is optional (`=>`). In the code visible in this
  %% module, options are read with maps:get/3 and an explicit default at
  %% the point of use.
  -type opts() :: #{
      active_n => pos_integer(),
      compress_buffering => boolean(),
      compress_threshold => non_neg_integer(),
      connection_type => worker | supervisor,
      connection_window_margin_size => 0..16#7fffffff,
      connection_window_update_threshold => 0..16#7fffffff,
      enable_connect_protocol => boolean(),
      env => cowboy_middleware:env(),
      goaway_initial_timeout => timeout(),
      goaway_complete_timeout => timeout(),
      idle_timeout => timeout(),
      inactivity_timeout => timeout(),
      initial_connection_window_size => 65535..16#7fffffff,
      initial_stream_window_size => 0..16#7fffffff,
      linger_timeout => timeout(),
      logger => module(),
      max_concurrent_streams => non_neg_integer() | infinity,
      max_connection_buffer_size => non_neg_integer(),
      max_connection_window_size => 0..16#7fffffff,
      max_decode_table_size => non_neg_integer(),
      max_encode_table_size => non_neg_integer(),
      max_frame_size_received => 16384..16777215,
      max_frame_size_sent => 16384..16777215 | infinity,
      %% {Number of frames, Period}; see init_frame_rate_limiting/2.
      max_received_frame_rate => {pos_integer(), timeout()},
      %% {Number of resets, Period}; see init_reset_rate_limiting/2.
      max_reset_stream_rate => {pos_integer(), timeout()},
      max_stream_buffer_size => non_neg_integer(),
      max_stream_window_size => 0..16#7fffffff,
      metrics_callback => cowboy_metrics_h:metrics_callback(),
      metrics_req_filter => fun((cowboy_req:req()) -> map()),
      metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
      middlewares => [module()],
      preface_timeout => timeout(),
      proxy_header => boolean(),
      sendfile => boolean(),
      settings_timeout => timeout(),
      shutdown_timeout => timeout(),
      stream_handlers => [module()],
      stream_window_data_threshold => 0..16#7fffffff,
      stream_window_margin_size => 0..16#7fffffff,
      stream_window_update_threshold => 0..16#7fffffff,
      tracer_callback => cowboy_tracer_h:tracer_callback(),
      tracer_flags => [atom()],
      tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
      %% Open ended because configured stream handlers might add options.
      _ => _
  }.
  -export_type([opts/0]).
  %% Per-stream bookkeeping kept by the connection process, stored in
  %% #state.streams under the stream identifier.
  -record(stream, {
      %% Whether the stream is currently stopping.
      status = running :: running | stopping,

      %% Flow requested for this stream.
      flow = 0 :: non_neg_integer(),

      %% Stream handler state, as returned by cowboy_stream:init/3 and
      %% threaded through cowboy_stream:data/4 and cowboy_stream:info/3.
      %% The first element uses the built-in type module(); the bare atom
      %% `module` would only admit that one literal atom.
      %% NOTE(review): presumably {Handler, HandlerState}; confirm against
      %% cowboy_stream whether `undefined` must also be admitted here.
      state :: {module(), any()}
  }).
  %% State of one HTTP/2 connection process.
  -record(state, {
      %% Process and socket identity, set once in init.
      parent = undefined :: pid(),
      ref :: ranch:ref(),
      socket = undefined :: inet:socket(),
      transport :: module(),
      proxy_header :: undefined | ranch_proxy_header:proxy_info(),
      opts = #{} :: opts(),

      %% Timer for idle_timeout; also used for goaway timers.
      timer = undefined :: undefined | reference(),

      %% Remote address and port for the connection.
      peer = undefined :: {inet:ip_address(), inet:port_number()},

      %% Local address and port for the connection.
      sock = undefined :: {inet:ip_address(), inet:port_number()},

      %% Client certificate (TLS only).
      cert :: undefined | binary(),

      %% HTTP/2 state machine.
      http2_status :: sequence | settings | upgrade | connected | closing_initiated | closing,
      http2_machine :: cow_http2_machine:http2_machine(),

      %% HTTP/2 frame rate flood protection: frames left in the current
      %% period and the monotonic time (ms) at which the period ends.
      frame_rate_num :: undefined | pos_integer(),
      frame_rate_time :: undefined | integer(),

      %% HTTP/2 reset stream flood protection, same layout as above.
      reset_rate_num :: undefined | pos_integer(),
      reset_rate_time :: undefined | integer(),

      %% Flow requested for all streams.
      flow = 0 :: non_neg_integer(),

      %% Currently active HTTP/2 streams. Streams may be initiated either
      %% by the client or by the server through PUSH_PROMISE frames.
      streams = #{} :: #{cow_http2:streamid() => #stream{}},

      %% Streams can spawn zero or more children which are then managed
      %% by this module if operating as a supervisor.
      children = cowboy_children:init() :: cowboy_children:children()
  }).
  %% Entry point for a connection that starts directly in HTTP/2.
  %%
  %% Queries the peer name, the local socket name and, when the transport
  %% reports the name `ssl`, the client certificate. When all three
  %% lookups succeed the work continues in init/10 with an empty buffer;
  %% otherwise the process terminates with a socket_error naming the
  %% lookup that failed.
  -spec init(pid(), ranch:ref(), inet:socket(), module(),
      ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok.
  init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
      PeerResult = Transport:peername(Socket),
      SockResult = Transport:sockname(Socket),
      CertResult = case Transport:name() of
          ssl ->
              case ssl:peercert(Socket) of
                  %% A client that sent no certificate is not an error.
                  {error, no_peercert} -> {ok, undefined};
                  Other -> Other
              end;
          _ ->
              {ok, undefined}
      end,
      case {PeerResult, SockResult, CertResult} of
          {{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
              init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>);
          {{error, PeerError}, _, _} ->
              terminate(undefined, {socket_error, PeerError,
                  'A socket error occurred when retrieving the peer name.'});
          {_, {error, SockError}, _} ->
              terminate(undefined, {socket_error, SockError,
                  'A socket error occurred when retrieving the sock name.'});
          {_, _, {error, CertError}} ->
              terminate(undefined, {socket_error, CertError,
                  'A socket error occurred when retrieving the client TLS certificate.'})
      end.
  %% Starts the connection once peer, sock and certificate are known.
  %%
  %% Creates the HTTP/2 state machine, builds the initial #state{} in the
  %% `sequence` status, arms rate limiting and the idle timer, sends the
  %% server preface and switches the socket to active mode. Any bytes
  %% already present in Buffer are parsed immediately; otherwise the
  %% process enters the receive loop.
  -spec init(pid(), ranch:ref(), inet:socket(), module(),
      ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
      {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
      binary() | undefined, binary()) -> ok.
  init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
      {ok, Preface, Machine} = cow_http2_machine:init(server, Opts),
      Fresh = #state{
          parent=Parent, ref=Ref, socket=Socket, transport=Transport,
          proxy_header=ProxyHeader, opts=Opts,
          peer=Peer, sock=Sock, cert=Cert,
          http2_status=sequence, http2_machine=Machine
      },
      Limited = init_rate_limiting(Fresh),
      State = set_idle_timeout(Limited),
      Transport:send(Socket, Preface),
      setopts_active(State),
      if
          Buffer =:= <<>> -> loop(State, Buffer);
          true -> parse(State, Buffer)
      end.
  %% Arms both flood-protection counters (received frames and stream
  %% resets) from a single monotonic timestamp in milliseconds.
  init_rate_limiting(State0) ->
      Now = erlang:monotonic_time(millisecond),
      State1 = init_frame_rate_limiting(State0, Now),
      init_reset_rate_limiting(State1, Now).
  %% (Re)starts the received-frame rate window.
  %%
  %% Reads {Num, Period} from the max_received_frame_rate option
  %% (default {10000, 10000}) and stores the frame budget together with
  %% the time at which the window ends (CurrentTime + Period, or
  %% `infinity` when the period is `infinity`).
  init_frame_rate_limiting(State=#state{opts=Opts}, CurrentTime) ->
      {Num, Period} = maps:get(max_received_frame_rate, Opts, {10000, 10000}),
      Deadline = add_period(CurrentTime, Period),
      State#state{frame_rate_num=Num, frame_rate_time=Deadline}.
  %% (Re)starts the stream-reset rate window.
  %%
  %% Reads {Num, Period} from the max_reset_stream_rate option
  %% (default {10, 10000}) and stores the reset budget together with the
  %% time at which the window ends.
  init_reset_rate_limiting(State=#state{opts=Opts}, CurrentTime) ->
      {Num, Period} = maps:get(max_reset_stream_rate, Opts, {10, 10000}),
      Deadline = add_period(CurrentTime, Period),
      State#state{reset_rate_num=Num, reset_rate_time=Deadline}.
  %% Returns the end of a period starting at Time. An `infinity` period
  %% never ends, so `infinity` is returned unchanged.
  add_period(Time, Period) when Period =/= infinity ->
      Time + Period;
  add_period(_, infinity) ->
      infinity.
  %% Entry point used when the connection is taken over together with an
  %% already received request (Req), which becomes the first stream.
  %%
  %% The state machine is created and an upgrade stream is initialised
  %% for the request method. The request is then handed to the stream
  %% handlers, a switch_protocol message carrying the
  %% `connection: Upgrade` / `upgrade: h2c` headers is delivered to
  %% stream 1, and from there startup proceeds as in init/10.
  %%
  %% @todo Add an argument for the request body.
  -spec init(pid(), ranch:ref(), inet:socket(), module(),
      ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
      {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
      binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
  init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
          _Settings, Req=#{method := Method}) ->
      {ok, Preface, Machine0} = cow_http2_machine:init(server, Opts),
      {ok, StreamID, Machine} = cow_http2_machine:init_upgrade_stream(Method, Machine0),
      Upgrading = #state{
          parent=Parent, ref=Ref, socket=Socket, transport=Transport,
          proxy_header=ProxyHeader, opts=Opts,
          peer=Peer, sock=Sock, cert=Cert,
          http2_status=upgrade, http2_machine=Machine
      },
      WithStream = headers_frame(Upgrading, StreamID, Req),
      %% We assume that the upgrade will be applied. A stream handler
      %% must not prevent the normal operations of the server.
      SwitchHeaders = #{
          <<"connection">> => <<"Upgrade">>,
          <<"upgrade">> => <<"h2c">>
      },
      %% @todo undefined or #{}?
      Switched = info(WithStream, 1,
          {switch_protocol, SwitchHeaders, ?MODULE, undefined}),
      Limited = init_rate_limiting(Switched#state{http2_status=sequence}),
      State = set_idle_timeout(Limited),
      Transport:send(Socket, Preface),
      setopts_active(State),
      if
          Buffer =:= <<>> -> loop(State, Buffer);
          true -> parse(State, Buffer)
      end.
  %% Puts the socket in {active, N} mode, N coming from the active_n
  %% option (default 100).
  %%
  %% Because HTTP/2 has flow control and Cowboy has other rate limiting
  %% mechanisms implemented, a very large active_n value should be fine,
  %% as long as the stream handlers do their work in a timely manner.
  setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
      Transport:setopts(Socket, [{active, maps:get(active_n, Opts, 100)}]).
  %% Main receive loop of the connection process.
  %%
  %% Buffer holds bytes received but not yet parsed. The clauses are tried
  %% in order: socket events, system messages, timers, stream messages,
  %% child exits, supervisor calls, and finally a catch-all that logs and
  %% drops anything else. If nothing arrives within the inactivity_timeout
  %% option (default 300000 ms) the connection is terminated.
  %%
  %% The element/2 lookups stay inside the guards on purpose: a guard that
  %% fails on a shorter message tuple simply does not match.
  loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
          opts=Opts, timer=TimerRef, children=Children, http2_status=Status}, Buffer) ->
      Messages = Transport:messages(),
      InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
      receive
          %% Socket messages.
          {OK, Socket, Data} when OK =:= element(1, Messages) ->
              parse(set_idle_timeout(State), <<Buffer/binary, Data/binary>>);
          {Closed, Socket} when Closed =:= element(2, Messages) ->
              %% A close while we are already closing is an orderly stop.
              case Status of
                  closing ->
                      terminate(State, {stop, closed, 'The client is going away.'});
                  _ ->
                      terminate(State, {socket_error, closed, 'The socket has been closed.'})
              end;
          {Error, Socket, SocketError} when Error =:= element(3, Messages) ->
              terminate(State, {socket_error, SocketError, 'An error has occurred on the socket.'});
          {Passive, Socket} when Passive =:= element(4, Messages);
                  %% Hardcoded for compatibility with Ranch 1.x.
                  Passive =:= tcp_passive; Passive =:= ssl_passive ->
              setopts_active(State),
              loop(State, Buffer);
          %% System messages.
          {'EXIT', Parent, shutdown} ->
              ShutdownReason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
              loop(initiate_closing(State, ShutdownReason), Buffer);
          {'EXIT', Parent, ExitReason} ->
              terminate(State, {stop, {exit, ExitReason}, 'Parent process terminated.'});
          {system, From, Request} ->
              sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
          %% Timeouts.
          {timeout, TimerRef, idle_timeout} ->
              terminate(State, {stop, timeout,
                  'Connection idle longer than configuration allows.'});
          {timeout, ChildRef, {shutdown, ChildPid}} ->
              cowboy_children:shutdown_timeout(Children, ChildRef, ChildPid),
              loop(State, Buffer);
          {timeout, MachineRef, {cow_http2_machine, Name}} ->
              loop(timeout(State, Name, MachineRef), Buffer);
          {timeout, TimerRef, {goaway_initial_timeout, GoAwayReason}} ->
              loop(closing(State, GoAwayReason), Buffer);
          {timeout, TimerRef, {goaway_complete_timeout, GoAwayReason}} ->
              terminate(State, {stop, stop_reason(GoAwayReason),
                  'Graceful shutdown timed out.'});
          %% Messages pertaining to a stream.
          {{Pid, StreamID}, StreamMsg} when Pid =:= self() ->
              loop(info(State, StreamID, StreamMsg), Buffer);
          %% Exit signal from children.
          ExitMsg = {'EXIT', ChildPid, _} ->
              loop(down(State, ChildPid, ExitMsg), Buffer);
          %% Calls from supervisor module.
          {'$gen_call', From, Call} ->
              cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
              loop(State, Buffer);
          Stray ->
              cowboy:log(warning, "Received stray message ~p.", [Stray], Opts),
              loop(State, Buffer)
      after InactivityTimeout ->
          terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
      end.
  %% Restarts the idle timer using the idle_timeout option (default 60000).
  %%
  %% While the connection is in the closing_initiated or closing status
  %% and a timer is already set, the state is returned untouched: the
  %% timer slot is then in use by a goaway timer (see #state.timer).
  set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef, opts=Opts}) ->
      IsClosing = Status =:= closing_initiated orelse Status =:= closing,
      if
          IsClosing, TimerRef =/= undefined ->
              State;
          true ->
              set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout)
      end.
  %% Replaces the connection timer.
  %%
  %% Any existing timer is cancelled asynchronously and without a result
  %% message. A Timeout of `infinity` leaves the connection without a
  %% timer; otherwise a new timer is started that delivers
  %% {timeout, TimerRef, Message} to this process.
  set_timeout(State=#state{timer=OldTimer}, Timeout, Message) ->
      case OldTimer of
          undefined ->
              ok;
          _ ->
              ok = erlang:cancel_timer(OldTimer, [{async, true}, {info, false}])
      end,
      NewTimer = if
          Timeout =:= infinity -> undefined;
          true -> erlang:start_timer(Timeout, self(), Message)
      end,
      State#state{timer=NewTimer}.
  %% HTTP/2 protocol parsing.
  %%
  %% In the `sequence` status the client connection preface is expected
  %% first; once it has been consumed the status becomes `settings` and
  %% frame parsing starts. Frames are parsed one at a time, each one going
  %% through frame-rate accounting before being handled. When the data is
  %% incomplete the process returns to the receive loop with the
  %% remainder, unless it is closing and no streams remain, in which case
  %% the connection is terminated.
  parse(State=#state{http2_status=sequence}, Data) ->
      case cow_http2:parse_sequence(Data) of
          {ok, AfterPreface} ->
              parse(State#state{http2_status=settings}, AfterPreface);
          more ->
              loop(State, Data);
          PrefaceError = {connection_error, _, _} ->
              terminate(State, PrefaceError)
      end;
  parse(State=#state{http2_status=Status, http2_machine=Machine, streams=Streams}, Data) ->
      MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, Machine),
      case cow_http2:parse(Data, MaxFrameSize) of
          {ok, Frame, Rest} ->
              parse(frame_rate(State, Frame), Rest);
          {ignore, Rest} ->
              parse(frame_rate(State, ignore), Rest);
          {stream_error, StreamID, Reason, Human, Rest} ->
              StreamError = {stream_error, Reason, Human},
              parse(reset_stream(State, StreamID, StreamError), Rest);
          ConnError = {connection_error, _, _} ->
              terminate(State, ConnError);
          more ->
              if
                  %% Terminate the connection if we are closing and all
                  %% streams have completed.
                  Status =:= closing, Streams =:= #{} ->
                      terminate(State, {stop, normal, 'The connection is going away.'});
                  true ->
                      loop(State, Data)
              end
      end.
  %% Frame rate flood protection.
  %%
  %% Each received frame (including ignored ones, passed as the atom
  %% `ignore`) consumes one unit of the budget. When the budget runs out
  %% before the window deadline the connection is terminated with
  %% enhance_your_calm. When it runs out at or after the deadline a new
  %% window is started. Otherwise the frame is dispatched to
  %% ignored_frame/1 or frame/2.
  frame_rate(State0=#state{frame_rate_num=Remaining0, frame_rate_time=Deadline}, Frame) ->
      Outcome = case Remaining0 - 1 of
          0 ->
              Now = erlang:monotonic_time(millisecond),
              if
                  Now < Deadline ->
                      flood;
                  true ->
                      %% When the option has a period of infinity we
                      %% cannot reach this clause.
                      {ok, init_frame_rate_limiting(State0, Now)}
              end;
          Remaining ->
              {ok, State0#state{frame_rate_num=Remaining}}
      end,
      case Outcome of
          {ok, State} when Frame =:= ignore ->
              ignored_frame(State);
          {ok, State} ->
              frame(State, Frame);
          flood ->
              terminate(State0, {connection_error, enhance_your_calm,
                  'Frame rate larger than configuration allows. Flood? (CVE-2019-9512, CVE-2019-9515, CVE-2019-9518)'})
      end.
  %% Frames received.
  %%
  %% Feeds the frame to the HTTP/2 state machine and acts on its verdict:
  %% acknowledge, hand DATA/HEADERS/RST_STREAM/GOAWAY events to the
  %% matching handler, flush data the machine released for sending, reset
  %% a stream, or terminate the connection.
  %%
  %% NOTE(review): the comment previously found here said lingering DATA
  %% frames are deliberately ignored because their flow was already
  %% removed from the connection flow. No such branch is visible in this
  %% function; presumably cow_http2_machine:frame/2 covers it. Confirm.
  frame(State=#state{http2_machine=Machine0}, Frame) ->
      case cow_http2_machine:frame(Frame, Machine0) of
          {ok, Machine} ->
              maybe_ack(State#state{http2_machine=Machine}, Frame);
          {ok, {data, StreamID, IsFin, Data}, Machine} ->
              data_frame(State#state{http2_machine=Machine}, StreamID, IsFin, Data);
          {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, Machine} ->
              headers_frame(State#state{http2_machine=Machine},
                  StreamID, IsFin, Headers, PseudoHeaders, BodyLen);
          {ok, {trailers, _StreamID, _Trailers}, Machine} ->
              %% @todo Propagate trailers.
              State#state{http2_machine=Machine};
          {ok, {rst_stream, StreamID, Reason}, Machine} ->
              rst_stream_frame(State#state{http2_machine=Machine}, StreamID, Reason);
          {ok, GoAway={goaway, _, _, _}, Machine} ->
              goaway(State#state{http2_machine=Machine}, GoAway);
          {send, SendData, Machine} ->
              Acked = maybe_ack(State#state{http2_machine=Machine}, Frame),
              Sent = send_data(Acked, SendData, []),
              %% We may need to send an alarm for each of the streams
              %% sending data. The alarm check receives the machine as it
              %% was before this frame was processed.
              lists:foldl(
                  fun({StreamID, _, _}, Acc) ->
                      maybe_send_data_alarm(Acc, Machine0, StreamID)
                  end,
                  Sent, SendData);
          {error, {stream_error, StreamID, Reason, Human}, Machine} ->
              reset_stream(State#state{http2_machine=Machine},
                  StreamID, {stream_error, Reason, Human});
          {error, ConnError={connection_error, _, _}, Machine} ->
              terminate(State#state{http2_machine=Machine}, ConnError)
      end.
  %% Sends the acknowledgement required by SETTINGS and PING frames and
  %% returns the state; any other frame is a no-op.
  %%
  %% We use this opportunity to mark the HTTP/2 status as connected
  %% if we were still waiting for a SETTINGS frame.
  maybe_ack(State=#state{http2_status=settings}, Frame) ->
      maybe_ack(State#state{http2_status=connected}, Frame);
  maybe_ack(State=#state{socket=Socket, transport=Transport}, {settings, _}) ->
      Transport:send(Socket, cow_http2:settings_ack()),
      State;
  maybe_ack(State=#state{socket=Socket, transport=Transport}, {ping, Opaque}) ->
      Transport:send(Socket, cow_http2:ping_ack(Opaque)),
      State;
  maybe_ack(State=#state{}, _) ->
      State.
  %% Handles a DATA frame for a stream.
  %%
  %% For a running stream the data is passed to the stream handlers, the
  %% connection and stream flow counters are reduced by the payload size,
  %% the window is updated and the returned commands are executed. An
  %% exception in the handlers is logged and resets the stream. DATA for
  %% any other stream is dropped.
  data_frame(State0=#state{opts=Opts, flow=ConnFlow, streams=Streams}, StreamID, IsFin, Data) ->
      case Streams of
          #{StreamID := Stream0=#stream{status=running, flow=StreamFlow, state=Handler0}} ->
              try cowboy_stream:data(StreamID, IsFin, Data, Handler0) of
                  {Commands, Handler} ->
                      %% Remove the amount of data received from the flow.
                      %% We may receive more data than we requested. We
                      %% ensure that the flow value doesn't go lower than 0.
                      Received = byte_size(Data),
                      Stream = Stream0#stream{
                          flow=max(0, StreamFlow - Received),
                          state=Handler
                      },
                      State1 = State0#state{
                          flow=max(0, ConnFlow - Received),
                          streams=Streams#{StreamID => Stream}
                      },
                      commands(update_window(State1, StreamID), StreamID, Commands)
              catch Class:Exception:Stacktrace ->
                  ErrorLog = cowboy_stream:make_error_log(data,
                      [StreamID, IsFin, Data, Handler0],
                      Class, Exception, Stacktrace),
                  cowboy:log(ErrorLog, Opts),
                  reset_stream(State0, StreamID, {internal_error, {Class, Exception},
                      'Unhandled exception in cowboy_stream:data/4.'})
              end;
          %% We ignore DATA frames for streams that are stopping.
          #{} ->
              State0
      end.
  %% Handles a HEADERS frame that opens a request.
  %%
  %% CONNECT with exactly two pseudo-headers and TRACE are answered with
  %% an early 501 response. Otherwise the authority is taken from the
  %% :authority pseudo-header or, failing that, from the host header; a
  %% request with neither is reset with a protocol_error.
  headers_frame(State, StreamID, IsFin, Headers,
          Pseudo=#{method := <<"CONNECT">>}, _BodyLen)
          when map_size(Pseudo) =:= 2 ->
      early_error(State, StreamID, IsFin, Headers, Pseudo, 501,
          'The CONNECT method is currently not implemented. (RFC7231 4.3.6)');
  headers_frame(State, StreamID, IsFin, Headers,
          Pseudo=#{method := <<"TRACE">>}, _BodyLen) ->
      early_error(State, StreamID, IsFin, Headers, Pseudo, 501,
          'The TRACE method is currently not implemented. (RFC7231 4.3.8)');
  headers_frame(State, StreamID, IsFin, Headers,
          Pseudo=#{authority := Authority}, BodyLen) ->
      headers_frame_parse_host(State, StreamID, IsFin, Headers, Pseudo, BodyLen, Authority);
  headers_frame(State, StreamID, IsFin, Headers, Pseudo, BodyLen) ->
      %% No :authority pseudo-header: fall back to the host header.
      case lists:keyfind(<<"host">>, 1, Headers) of
          {_, HostHeader} ->
              headers_frame_parse_host(State, StreamID, IsFin, Headers,
                  Pseudo, BodyLen, HostHeader);
          _ ->
              reset_stream(State, StreamID, {stream_error, protocol_error,
                  'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'})
      end.
  %% Builds the Req map for a new request and hands it to headers_frame/3.
  %%
  %% The authority and the path are parsed first. A parse failure of
  %% either, or an empty path, resets the stream with a protocol_error.
  %% Only the two parse calls are protected by the try expressions;
  %% building the Req and starting the stream happen in the unprotected
  %% `of` sections.
  headers_frame_parse_host(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert, proxy_header=ProxyHeader},
          StreamID, IsFin, Headers, PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs},
          BodyLen, Authority) ->
      try cow_http_hd:parse_host(Authority) of
          {Host, Port0} ->
              Port = ensure_port(Scheme, Port0),
              try cow_http:parse_fullpath(PathWithQs) of
                  {<<>>, _} ->
                      reset_stream(State, StreamID, {stream_error, protocol_error,
                          'The path component must not be empty. (RFC7540 8.1.2.3)'});
                  {Path, Qs} ->
                      BaseReq = #{
                          ref => Ref,
                          pid => self(),
                          streamid => StreamID,
                          peer => Peer,
                          sock => Sock,
                          cert => Cert,
                          method => Method,
                          scheme => Scheme,
                          host => Host,
                          port => Port,
                          path => Path,
                          qs => Qs,
                          version => 'HTTP/2',
                          headers => headers_to_map(Headers, #{}),
                          has_body => IsFin =:= nofin,
                          body_length => BodyLen
                      },
                      %% We add the PROXY header information if any.
                      WithProxy = if
                          ProxyHeader =:= undefined -> BaseReq;
                          true -> BaseReq#{proxy_header => ProxyHeader}
                      end,
                      %% We add the protocol information for extended CONNECTs.
                      Req = case maps:find(protocol, PseudoHeaders) of
                          {ok, Protocol} -> WithProxy#{protocol => Protocol};
                          error -> WithProxy
                      end,
                      headers_frame(State, StreamID, Req)
              catch _:_ ->
                  reset_stream(State, StreamID, {stream_error, protocol_error,
                      'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'})
              end
      catch _:_ ->
          reset_stream(State, StreamID, {stream_error, protocol_error,
              'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'})
      end.
  %% Fills in the default port when the authority carried none:
  %% 80 for the "http" scheme and 443 for "https". An explicit port, or an
  %% undefined port with any other scheme, is returned as is.
  ensure_port(Scheme, undefined) ->
      case Scheme of
          <<"http">> -> 80;
          <<"https">> -> 443;
          _ -> undefined
      end;
  ensure_port(_, Port) ->
      Port.
  %% Folds a list of {Name, Value} headers into the map Acc0.
  %%
  %% This function is necessary to properly handle duplicate headers
  %% and the special-case cookie header: repeated names are joined in
  %% order of appearance with ", ", except cookie which is joined
  %% with "; " because it does not use proper HTTP header lists.
  headers_to_map(Headers, Acc0) ->
      lists:foldl(fun({Name, Value}, Acc) ->
          case Acc of
              #{Name := Previous} ->
                  Separator = case Name of
                      <<"cookie">> -> <<"; ">>;
                      _ -> <<", ">>
                  end,
                  Acc#{Name => <<Previous/binary, Separator/binary, Value/binary>>};
              _ ->
                  Acc#{Name => Value}
          end
      end, Acc0, Headers).
  %% Starts the stream handlers for a fully built request.
  %%
  %% On success the new stream is recorded (running, flow 0) and the
  %% commands returned by the handlers are executed. An exception in
  %% cowboy_stream:init/3 is logged and the stream is reset.
  headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) ->
      try cowboy_stream:init(StreamID, Req, Opts) of
          {Commands, HandlerState} ->
              Stream = #stream{state=HandlerState},
              commands(State#state{streams=Streams#{StreamID => Stream}},
                  StreamID, Commands)
      catch Class:Exception:Stacktrace ->
          ErrorLog = cowboy_stream:make_error_log(init,
              [StreamID, Req, Opts],
              Class, Exception, Stacktrace),
          cowboy:log(ErrorLog, Opts),
          reset_stream(State, StreamID, {internal_error, {Class, Exception},
              'Unhandled exception in cowboy_stream:init/3.'})
      end.
  %% Answers a request that is rejected before a stream is started.
  %%
  %% A default empty-bodied response with the given status is offered to
  %% the stream handlers through cowboy_stream:early_error/5, and whatever
  %% response they return is sent. If the handlers raise, the exception is
  %% logged and the default headers are sent on their own.
  early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
          StreamID, _IsFin, Headers, #{method := Method},
          StatusCode0, HumanReadable) ->
      %% We automatically terminate the stream but it is not an error
      %% per se (at least not in the first implementation).
      Reason = {stream_error, no_error, HumanReadable},
      %% The partial Req is minimal for now. We only have one case
      %% where it can be called (when a method is completely disabled).
      %% @todo Fill in the other elements.
      PartialReq = #{
          ref => Ref,
          peer => Peer,
          method => Method,
          headers => headers_to_map(Headers, #{})
      },
      RespHeaders0 = #{<<"content-length">> => <<"0">>},
      Resp = {response, StatusCode0, RespHeaders0, <<>>},
      try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
          {response, StatusCode, RespHeaders, RespBody} ->
              send_response(State0, StreamID, StatusCode, RespHeaders, RespBody)
      catch Class:Exception:Stacktrace ->
          ErrorLog = cowboy_stream:make_error_log(early_error,
              [StreamID, Reason, PartialReq, Resp, Opts],
              Class, Exception, Stacktrace),
          cowboy:log(ErrorLog, Opts),
          %% We still need to send an error response, so send what we
          %% initially wanted to send. It's better than nothing.
          send_headers(State0, StreamID, fin, StatusCode0, RespHeaders0)
      end.
  %% Handles an RST_STREAM event from the peer.
  %%
  %% A known stream is removed from the state: its handlers are
  %% terminated with Reason, then its children are asked to shut down.
  %% An unknown stream leaves the state unchanged.
  rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
      case maps:take(StreamID, Streams0) of
          error ->
              State;
          {#stream{state=HandlerState}, Streams} ->
              terminate_stream_handler(State, StreamID, Reason, HandlerState),
              Children = cowboy_children:shutdown(Children0, StreamID),
              State#state{streams=Streams, children=Children}
      end.
  %% Tells the HTTP/2 state machine that a frame was ignored by the
  %% parser. The updated machine is stored; a connection error reported
  %% by the machine terminates the connection.
  ignored_frame(State=#state{http2_machine=Machine0}) ->
      case cow_http2_machine:ignored_frame(Machine0) of
          {ok, Machine} ->
              State#state{http2_machine=Machine};
          {error, ConnError={connection_error, _, _}, Machine} ->
              terminate(State#state{http2_machine=Machine}, ConnError)
      end.
  %% HTTP/2 timeouts.
  %%
  %% Forwards an expired state-machine timer (Name, TRef) to
  %% cow_http2_machine:timeout/3. The updated machine is stored; a
  %% connection error reported by the machine terminates the connection.
  timeout(State=#state{http2_machine=Machine0}, Name, TRef) ->
      case cow_http2_machine:timeout(Name, TRef, Machine0) of
          {ok, Machine} ->
              State#state{http2_machine=Machine};
          {error, ConnError={connection_error, _, _}, Machine} ->
              terminate(State#state{http2_machine=Machine}, ConnError)
      end.
  %% Erlang messages.
  %%
  %% Handles the 'EXIT' signal of a child process. The child is removed
  %% from the children registry; if its stream is still running the
  %% signal is forwarded to the stream handlers. A signal from an unknown
  %% process is only logged. Afterwards, a connection that is closing and
  %% has no streams left is terminated.
  down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
      State = case cowboy_children:down(Children0, Pid) of
          %% The stream was terminated already.
          {ok, undefined, Children} ->
              State0#state{children=Children};
          %% The stream is still running.
          {ok, StreamID, Children} ->
              info(State0#state{children=Children}, StreamID, Msg);
          %% The process was unknown.
          error ->
              cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
                  [Msg, Pid], Opts),
              State0
      end,
      case State of
          #state{http2_status=closing, streams=Streams} when Streams =:= #{} ->
              terminate(State, {stop, normal, 'The connection is going away.'});
          _ ->
              State
      end.
  %% Delivers an Erlang message to the stream handlers of StreamID.
  %%
  %% For a known stream the handlers' new state is stored and the
  %% returned commands are executed; an exception is logged and resets
  %% the stream. For an unknown stream the message is dropped, with a
  %% warning unless the state machine reports the stream as lingering.
  info(State=#state{opts=Opts, http2_machine=Machine, streams=Streams}, StreamID, Msg) ->
      case Streams of
          #{StreamID := Stream=#stream{state=Handler0}} ->
              try cowboy_stream:info(StreamID, Msg, Handler0) of
                  {Commands, Handler} ->
                      Updated = Streams#{StreamID => Stream#stream{state=Handler}},
                      commands(State#state{streams=Updated}, StreamID, Commands)
              catch Class:Exception:Stacktrace ->
                  ErrorLog = cowboy_stream:make_error_log(info,
                      [StreamID, Msg, Handler0],
                      Class, Exception, Stacktrace),
                  cowboy:log(ErrorLog, Opts),
                  reset_stream(State, StreamID, {internal_error, {Class, Exception},
                      'Unhandled exception in cowboy_stream:info/3.'})
              end;
          _ ->
              case cow_http2_machine:is_lingering_stream(StreamID, Machine) of
                  true ->
                      ok;
                  false ->
                      cowboy:log(warning, "Received message ~p for unknown stream ~p.",
                          [Msg, StreamID], Opts)
              end,
              State
      end.
  591. %% Stream handler commands.
  592. %%
  593. %% @todo Kill the stream if it tries to send a response, headers,
  594. %% data or push promise when the stream is closed or half-closed.
  595. commands(State, _, []) ->
  596. State;
  597. %% Error responses are sent only if a response wasn't sent already.
  598. commands(State=#state{http2_machine=HTTP2Machine}, StreamID,
  599. [{error_response, StatusCode, Headers, Body}|Tail]) ->
  600. case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
  601. {ok, idle, _} ->
  602. commands(State, StreamID, [{response, StatusCode, Headers, Body}|Tail]);
  603. _ ->
  604. commands(State, StreamID, Tail)
  605. end;
  606. %% Send an informational response.
  607. commands(State0, StreamID, [{inform, StatusCode, Headers}|Tail]) ->
  608. State = send_headers(State0, StreamID, idle, StatusCode, Headers),
  609. commands(State, StreamID, Tail);
  610. %% Send response headers.
  611. commands(State0, StreamID, [{response, StatusCode, Headers, Body}|Tail]) ->
  612. State = send_response(State0, StreamID, StatusCode, Headers, Body),
  613. commands(State, StreamID, Tail);
  614. %% Send response headers.
  615. commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) ->
  616. State = send_headers(State0, StreamID, nofin, StatusCode, Headers),
  617. commands(State, StreamID, Tail);
  618. %% Send a response body chunk.
  619. commands(State0, StreamID, [{data, IsFin, Data}|Tail]) ->
  620. State = maybe_send_data(State0, StreamID, IsFin, Data, []),
  621. commands(State, StreamID, Tail);
  622. %% Send trailers.
  623. commands(State0, StreamID, [{trailers, Trailers}|Tail]) ->
  624. State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}, []),
  625. commands(State, StreamID, Tail);
  626. %% Send a push promise.
  627. %%
  628. %% @todo Responses sent as a result of a push_promise request
  629. %% must not send push_promise frames themselves.
  630. %%
  631. %% @todo We should not send push_promise frames when we are
  632. %% in the closing http2_status.
  633. commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
  634. StreamID, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
  635. Authority = case {Scheme, Port} of
  636. {<<"http">>, 80} -> Host;
  637. {<<"https">>, 443} -> Host;
  638. _ -> iolist_to_binary([Host, $:, integer_to_binary(Port)])
  639. end,
  640. PathWithQs = iolist_to_binary(case Qs of
  641. <<>> -> Path;
  642. _ -> [Path, $?, Qs]
  643. end),
  644. PseudoHeaders = #{
  645. method => Method,
  646. scheme => Scheme,
  647. authority => Authority,
  648. path => PathWithQs
  649. },
  650. %% We need to make sure the header value is binary before we can
  651. %% create the Req object, as it expects them to be flat.
  652. Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)),
  653. State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP2Machine0,
  654. PseudoHeaders, Headers) of
  655. {ok, PromisedStreamID, HeaderBlock, HTTP2Machine} ->
  656. Transport:send(Socket, cow_http2:push_promise(
  657. StreamID, PromisedStreamID, HeaderBlock)),
  658. headers_frame(State0#state{http2_machine=HTTP2Machine},
  659. PromisedStreamID, fin, Headers, PseudoHeaders, 0);
  660. {error, no_push} ->
  661. State0
  662. end,
  663. commands(State, StreamID, Tail);
  664. %% Read the request body.
  665. commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) ->
  666. #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
  667. State = update_window(State0#state{flow=Flow + Size,
  668. streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
  669. StreamID),
  670. commands(State, StreamID, Tail);
  671. %% Supervise a child process.
  672. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
  673. commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
  674. StreamID, Tail);
  675. %% Error handling.
  676. commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) ->
  677. %% @todo Do we want to run the commands after an internal_error?
  678. %% @todo Do we even allow commands after?
  679. %% @todo Only reset when the stream still exists.
  680. reset_stream(State, StreamID, Error);
  681. %% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
  682. commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
  683. StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
  684. %% @todo This 101 response needs to be passed through stream handlers.
  685. Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))),
  686. commands(State, StreamID, Tail);
  687. %% Use a different protocol within the stream (CONNECT :protocol).
  688. %% @todo Make sure we error out when the feature is disabled.
  689. commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
  690. State = info(State0, StreamID, {headers, 200, Headers}),
  691. commands(State, StreamID, Tail);
  692. %% Set options dynamically.
  693. commands(State, StreamID, [{set_options, _Opts}|Tail]) ->
  694. commands(State, StreamID, Tail);
  695. commands(State, StreamID, [stop|_Tail]) ->
  696. %% @todo Do we want to run the commands after a stop?
  697. %% @todo Do we even allow commands after?
  698. stop_stream(State, StreamID);
  699. %% Log event.
  700. commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
  701. cowboy:log(Log, Opts),
  702. commands(State, StreamID, Tail).
  703. %% Tentatively update the window after the flow was updated.
  704. update_window(State=#state{socket=Socket, transport=Transport,
  705. http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) ->
  706. #{StreamID := #stream{flow=StreamFlow}} = Streams,
  707. {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of
  708. ok -> {<<>>, HTTP2Machine0};
  709. {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
  710. end,
  711. {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of
  712. ok -> {<<>>, HTTP2Machine2};
  713. {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
  714. end,
  715. case {Data1, Data2} of
  716. {<<>>, <<>>} -> ok;
  717. _ -> Transport:send(Socket, [Data1, Data2])
  718. end,
  719. State#state{http2_machine=HTTP2Machine}.
  720. %% Send the response, trailers or data.
  721. send_response(State0=#state{http2_machine=HTTP2Machine0}, StreamID, StatusCode, Headers, Body) ->
  722. Size = case Body of
  723. {sendfile, _, Bytes, _} -> Bytes;
  724. _ -> iolist_size(Body)
  725. end,
  726. case Size of
  727. 0 ->
  728. State = send_headers(State0, StreamID, fin, StatusCode, Headers),
  729. maybe_terminate_stream(State, StreamID, fin);
  730. _ ->
  731. %% @todo Add a test for HEAD to make sure we don't send the body when
  732. %% returning {response...} from a stream handler (or {headers...} then {data...}).
  733. {ok, _IsFin, HeaderBlock, HTTP2Machine}
  734. = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, nofin,
  735. #{status => cow_http:status_to_integer(StatusCode)},
  736. headers_to_list(Headers)),
  737. maybe_send_data(State0#state{http2_machine=HTTP2Machine}, StreamID, fin, Body,
  738. [cow_http2:headers(StreamID, nofin, HeaderBlock)])
  739. end.
  740. send_headers(State=#state{socket=Socket, transport=Transport,
  741. http2_machine=HTTP2Machine0}, StreamID, IsFin0, StatusCode, Headers) ->
  742. {ok, IsFin, HeaderBlock, HTTP2Machine}
  743. = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, IsFin0,
  744. #{status => cow_http:status_to_integer(StatusCode)},
  745. headers_to_list(Headers)),
  746. Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
  747. State#state{http2_machine=HTTP2Machine}.
  748. %% The set-cookie header is special; we can only send one cookie per header.
  749. headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
  750. Headers = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
  751. Headers ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
  752. headers_to_list(Headers) ->
  753. maps:to_list(Headers).
  754. maybe_send_data(State0=#state{socket=Socket, transport=Transport,
  755. http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0, Prefix) ->
  756. Data = case is_tuple(Data0) of
  757. false -> {data, Data0};
  758. true -> Data0
  759. end,
  760. case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
  761. {ok, HTTP2Machine} ->
  762. %% If we have prefix data (like a HEADERS frame) we need to send it
  763. %% even if we do not send any DATA frames.
  764. case Prefix of
  765. [] -> ok;
  766. _ -> Transport:send(Socket, Prefix)
  767. end,
  768. maybe_send_data_alarm(State0#state{http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID);
  769. {send, SendData, HTTP2Machine} ->
  770. State = #state{http2_status=Status, streams=Streams}
  771. = send_data(State0#state{http2_machine=HTTP2Machine}, SendData, Prefix),
  772. %% Terminate the connection if we are closing and all streams have completed.
  773. if
  774. Status =:= closing, Streams =:= #{} ->
  775. terminate(State, {stop, normal, 'The connection is going away.'});
  776. true ->
  777. maybe_send_data_alarm(State, HTTP2Machine0, StreamID)
  778. end
  779. end.
  780. send_data(State0=#state{socket=Socket, transport=Transport, opts=Opts}, SendData, Prefix) ->
  781. {Acc, State} = prepare_data(State0, SendData, [], Prefix),
  782. _ = [case Data of
  783. {sendfile, Offset, Bytes, Path} ->
  784. %% When sendfile is disabled we explicitly use the fallback.
  785. _ = case maps:get(sendfile, Opts, true) of
  786. true -> Transport:sendfile(Socket, Path, Offset, Bytes);
  787. false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
  788. end;
  789. _ ->
  790. Transport:send(Socket, Data)
  791. end || Data <- Acc],
  792. send_data_terminate(State, SendData).
  793. send_data_terminate(State, []) ->
  794. State;
  795. send_data_terminate(State0, [{StreamID, IsFin, _}|Tail]) ->
  796. State = maybe_terminate_stream(State0, StreamID, IsFin),
  797. send_data_terminate(State, Tail).
  798. prepare_data(State, [], Acc, []) ->
  799. {lists:reverse(Acc), State};
  800. prepare_data(State, [], Acc, Buffer) ->
  801. {lists:reverse([lists:reverse(Buffer)|Acc]), State};
  802. prepare_data(State0, [{StreamID, IsFin, SendData}|Tail], Acc0, Buffer0) ->
  803. {Acc, Buffer, State} = prepare_data(State0, StreamID, IsFin, SendData, Acc0, Buffer0),
  804. prepare_data(State, Tail, Acc, Buffer).
  805. prepare_data(State, _, _, [], Acc, Buffer) ->
  806. {Acc, Buffer, State};
  807. prepare_data(State0, StreamID, IsFin, [FrameData|Tail], Acc, Buffer) ->
  808. FrameIsFin = case Tail of
  809. [] -> IsFin;
  810. _ -> nofin
  811. end,
  812. case prepare_data_frame(State0, StreamID, FrameIsFin, FrameData) of
  813. {{MoreData, Sendfile}, State} when is_tuple(Sendfile) ->
  814. case Buffer of
  815. [] ->
  816. prepare_data(State, StreamID, IsFin, Tail,
  817. [Sendfile, MoreData|Acc], []);
  818. _ ->
  819. prepare_data(State, StreamID, IsFin, Tail,
  820. [Sendfile, lists:reverse([MoreData|Buffer])|Acc], [])
  821. end;
  822. {MoreData, State} ->
  823. prepare_data(State, StreamID, IsFin, Tail,
  824. Acc, [MoreData|Buffer])
  825. end.
  826. prepare_data_frame(State, StreamID, IsFin, {data, Data}) ->
  827. {cow_http2:data(StreamID, IsFin, Data),
  828. State};
  829. prepare_data_frame(State, StreamID, IsFin, Sendfile={sendfile, _, Bytes, _}) ->
  830. {{cow_http2:data_header(StreamID, IsFin, Bytes), Sendfile},
  831. State};
  832. %% The stream is terminated in cow_http2_machine:prepare_trailers.
  833. prepare_data_frame(State=#state{http2_machine=HTTP2Machine0},
  834. StreamID, nofin, {trailers, Trailers}) ->
  835. {ok, HeaderBlock, HTTP2Machine}
  836. = cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers),
  837. {cow_http2:headers(StreamID, fin, HeaderBlock),
  838. State#state{http2_machine=HTTP2Machine}}.
  839. %% After we have sent or queued data we may need to set or clear an alarm.
  840. %% We do this by comparing the HTTP2Machine buffer state before/after for
  841. %% the relevant streams.
  842. maybe_send_data_alarm(State=#state{opts=Opts, http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID) ->
  843. ConnBufferSizeBefore = cow_http2_machine:get_connection_local_buffer_size(HTTP2Machine0),
  844. ConnBufferSizeAfter = cow_http2_machine:get_connection_local_buffer_size(HTTP2Machine),
  845. {ok, StreamBufferSizeBefore} = cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine0),
  846. %% When the stream ends up closed after it finished sending data,
  847. %% we do not want to trigger an alarm. We act as if the buffer
  848. %% size did not change.
  849. StreamBufferSizeAfter = case cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine) of
  850. {ok, BSA} -> BSA;
  851. {error, closed} -> StreamBufferSizeBefore
  852. end,
  853. MaxConnBufferSize = maps:get(max_connection_buffer_size, Opts, 16000000),
  854. MaxStreamBufferSize = maps:get(max_stream_buffer_size, Opts, 8000000),
  855. %% I do not want to document these internal events yet. I am not yet
  856. %% convinced it should be {alarm, Name, on|off} and not {internal_event, E}
  857. %% or something else entirely. Though alarms are probably right.
  858. if
  859. ConnBufferSizeBefore >= MaxConnBufferSize, ConnBufferSizeAfter < MaxConnBufferSize ->
  860. connection_alarm(State, connection_buffer_full, off);
  861. ConnBufferSizeBefore < MaxConnBufferSize, ConnBufferSizeAfter >= MaxConnBufferSize ->
  862. connection_alarm(State, connection_buffer_full, on);
  863. StreamBufferSizeBefore >= MaxStreamBufferSize, StreamBufferSizeAfter < MaxStreamBufferSize ->
  864. stream_alarm(State, StreamID, stream_buffer_full, off);
  865. StreamBufferSizeBefore < MaxStreamBufferSize, StreamBufferSizeAfter >= MaxStreamBufferSize ->
  866. stream_alarm(State, StreamID, stream_buffer_full, on);
  867. true ->
  868. State
  869. end.
  870. connection_alarm(State0=#state{streams=Streams}, Name, Value) ->
  871. lists:foldl(fun(StreamID, State) ->
  872. stream_alarm(State, StreamID, Name, Value)
  873. end, State0, maps:keys(Streams)).
  874. stream_alarm(State, StreamID, Name, Value) ->
  875. info(State, StreamID, {alarm, Name, Value}).
  876. %% Terminate a stream or the connection.
  877. %% We may have to cancel streams even if we receive multiple
  878. %% GOAWAY frames as the LastStreamID value may be lower than
  879. %% the one previously received.
  880. goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0,
  881. http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _})
  882. when Status =:= connected; Status =:= closing_initiated; Status =:= closing ->
  883. Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID,
  884. {stop, {goaway, Reason}, 'The connection is going away.'}, []),
  885. State = State0#state{streams=maps:from_list(Streams)},
  886. if
  887. Status =:= connected; Status =:= closing_initiated ->
  888. {OurLastStreamID, HTTP2Machine} =
  889. cow_http2_machine:set_last_streamid(HTTP2Machine0),
  890. Transport:send(Socket, cow_http2:goaway(
  891. OurLastStreamID, no_error, <<>>)),
  892. State#state{http2_status=closing,
  893. http2_machine=HTTP2Machine};
  894. true ->
  895. State
  896. end;
  897. %% We terminate the connection immediately if it hasn't fully been initialized.
  898. goaway(State, {goaway, _, Reason, _}) ->
  899. terminate(State, {stop, {goaway, Reason}, 'The connection is going away.'}).
%% Cancel server-initiated streams that are above LastStreamID.
  901. goaway_streams(_, [], _, _, Acc) ->
  902. Acc;
  903. goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc)
  904. when StreamID > LastStreamID, (StreamID rem 2) =:= 0 ->
  905. terminate_stream_handler(State, StreamID, Reason, StreamState),
  906. goaway_streams(State, Tail, LastStreamID, Reason, Acc);
  907. goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) ->
  908. goaway_streams(State, Tail, LastStreamID, Reason, [Stream|Acc]).
  909. %% A server that is attempting to gracefully shut down a connection SHOULD send
  910. %% an initial GOAWAY frame with the last stream identifier set to 2^31-1 and a
  911. %% NO_ERROR code. This signals to the client that a shutdown is imminent and
  912. %% that initiating further requests is prohibited. After allowing time for any
  913. %% in-flight stream creation (at least one round-trip time), the server can send
  914. %% another GOAWAY frame with an updated last stream identifier. This ensures
  915. %% that a connection can be cleanly shut down without losing requests.
  916. -spec initiate_closing(#state{}, _) -> #state{}.
  917. initiate_closing(State=#state{http2_status=connected, socket=Socket,
  918. transport=Transport, opts=Opts}, Reason) ->
  919. Transport:send(Socket, cow_http2:goaway(16#7fffffff, no_error, <<>>)),
  920. Timeout = maps:get(goaway_initial_timeout, Opts, 1000),
  921. Message = {goaway_initial_timeout, Reason},
  922. set_timeout(State#state{http2_status=closing_initiated}, Timeout, Message);
  923. initiate_closing(State=#state{http2_status=Status}, _Reason)
  924. when Status =:= closing_initiated; Status =:= closing ->
  925. %% This happens if sys:terminate/2,3 is called twice or if the supervisor
  926. %% tells us to shutdown after sys:terminate/2,3 is called or vice versa.
  927. State;
  928. initiate_closing(State, Reason) ->
  929. terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}).
  930. %% Switch to 'closing' state and stop accepting new streams.
  931. -spec closing(#state{}, Reason :: term()) -> #state{}.
  932. closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} ->
  933. terminate(State, Reason);
  934. closing(State=#state{http2_status=closing_initiated,
  935. http2_machine=HTTP2Machine0, socket=Socket, transport=Transport},
  936. Reason) ->
  937. %% Stop accepting new streams.
  938. {LastStreamID, HTTP2Machine} =
  939. cow_http2_machine:set_last_streamid(HTTP2Machine0),
  940. Transport:send(Socket, cow_http2:goaway(LastStreamID, no_error, <<>>)),
  941. closing(State#state{http2_status=closing, http2_machine=HTTP2Machine}, Reason);
  942. closing(State=#state{http2_status=closing, opts=Opts}, Reason) ->
  943. %% If client sent GOAWAY, we may already be in 'closing' but without the
  944. %% goaway complete timeout set.
  945. Timeout = maps:get(goaway_complete_timeout, Opts, 3000),
  946. Message = {goaway_complete_timeout, Reason},
  947. set_timeout(State, Timeout, Message).
  948. stop_reason({stop, Reason, _}) -> Reason;
  949. stop_reason(Reason) -> Reason.
  950. -spec terminate(#state{}, _) -> no_return().
  951. terminate(undefined, Reason) ->
  952. exit({shutdown, Reason});
  953. terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status,
  954. http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason)
  955. when Status =:= connected; Status =:= closing_initiated; Status =:= closing ->
  956. %% @todo We might want to optionally send the Reason value
  957. %% as debug data in the GOAWAY frame here. Perhaps more.
  958. if
  959. Status =:= connected; Status =:= closing_initiated ->
  960. Transport:send(Socket, cow_http2:goaway(
  961. cow_http2_machine:get_last_streamid(HTTP2Machine),
  962. terminate_reason(Reason), <<>>));
  963. %% We already sent the GOAWAY frame.
  964. Status =:= closing ->
  965. ok
  966. end,
  967. terminate_all_streams(State, maps:to_list(Streams), Reason),
  968. cowboy_children:terminate(Children),
  969. terminate_linger(State),
  970. exit({shutdown, Reason});
  971. terminate(#state{socket=Socket, transport=Transport}, Reason) ->
  972. Transport:close(Socket),
  973. exit({shutdown, Reason}).
  974. terminate_reason({connection_error, Reason, _}) -> Reason;
  975. terminate_reason({stop, _, _}) -> no_error;
  976. terminate_reason({socket_error, _, _}) -> internal_error;
  977. terminate_reason({internal_error, _, _}) -> internal_error.
  978. terminate_all_streams(_, [], _) ->
  979. ok;
  980. terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
  981. terminate_stream_handler(State, StreamID, Reason, StreamState),
  982. terminate_all_streams(State, Tail, Reason).
  983. %% This code is copied from cowboy_http.
  984. terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
  985. case Transport:shutdown(Socket, write) of
  986. ok ->
  987. case maps:get(linger_timeout, Opts, 1000) of
  988. 0 ->
  989. ok;
  990. infinity ->
  991. terminate_linger_before_loop(State, undefined, Transport:messages());
  992. Timeout ->
  993. TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
  994. terminate_linger_before_loop(State, TimerRef, Transport:messages())
  995. end;
  996. {error, _} ->
  997. ok
  998. end.
  999. terminate_linger_before_loop(State, TimerRef, Messages) ->
  1000. %% We may already be in active mode when we do this
  1001. %% but it's OK because we are shutting down anyway.
  1002. case setopts_active(State) of
  1003. ok ->
  1004. terminate_linger_loop(State, TimerRef, Messages);
  1005. {error, _} ->
  1006. ok
  1007. end.
  1008. terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
  1009. receive
  1010. {OK, Socket, _} when OK =:= element(1, Messages) ->
  1011. terminate_linger_loop(State, TimerRef, Messages);
  1012. {Closed, Socket} when Closed =:= element(2, Messages) ->
  1013. ok;
  1014. {Error, Socket, _} when Error =:= element(3, Messages) ->
  1015. ok;
  1016. {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
  1017. terminate_linger_before_loop(State, TimerRef, Messages);
  1018. {timeout, TimerRef, linger_timeout} ->
  1019. ok;
  1020. _ ->
  1021. terminate_linger_loop(State, TimerRef, Messages)
  1022. end.
  1023. %% @todo Don't send an RST_STREAM if one was already sent.
  1024. reset_stream(State0=#state{socket=Socket, transport=Transport,
  1025. http2_machine=HTTP2Machine0}, StreamID, Error) ->
  1026. Reason = case Error of
  1027. {internal_error, _, _} -> internal_error;
  1028. {stream_error, Reason0, _} -> Reason0
  1029. end,
  1030. Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
  1031. State1 = case cow_http2_machine:reset_stream(StreamID, HTTP2Machine0) of
  1032. {ok, HTTP2Machine} ->
  1033. terminate_stream(State0#state{http2_machine=HTTP2Machine}, StreamID, Error);
  1034. {error, not_found} ->
  1035. terminate_stream(State0, StreamID, Error)
  1036. end,
  1037. case reset_rate(State1) of
  1038. {ok, State} ->
  1039. State;
  1040. error ->
  1041. terminate(State1, {connection_error, enhance_your_calm,
  1042. 'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'})
  1043. end.
  1044. reset_rate(State0=#state{reset_rate_num=Num0, reset_rate_time=Time}) ->
  1045. case Num0 - 1 of
  1046. 0 ->
  1047. CurrentTime = erlang:monotonic_time(millisecond),
  1048. if
  1049. CurrentTime < Time ->
  1050. error;
  1051. true ->
  1052. %% When the option has a period of infinity we cannot reach this clause.
  1053. {ok, init_reset_rate_limiting(State0, CurrentTime)}
  1054. end;
  1055. Num ->
  1056. {ok, State0#state{reset_rate_num=Num}}
  1057. end.
  1058. stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) ->
  1059. case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
  1060. %% When the stream terminates normally (without sending RST_STREAM)
  1061. %% and no response was sent, we need to send a proper response back to the client.
  1062. %% We delay the termination of the stream until the response is fully sent.
  1063. {ok, idle, _} ->
  1064. info(stopping(State, StreamID), StreamID, {response, 204, #{}, <<>>});
  1065. %% When a response was sent but not terminated, we need to close the stream.
  1066. %% We delay the termination of the stream until the response is fully sent.
  1067. {ok, nofin, fin} ->
  1068. stopping(State, StreamID);
  1069. %% We only send a final DATA frame if there isn't one queued yet.
  1070. {ok, nofin, _} ->
  1071. info(stopping(State, StreamID), StreamID, {data, fin, <<>>});
  1072. %% When a response was sent fully we can terminate the stream,
  1073. %% regardless of the stream being in half-closed or closed state.
  1074. _ ->
  1075. terminate_stream(State, StreamID)
  1076. end.
  1077. stopping(State=#state{streams=Streams}, StreamID) ->
  1078. #{StreamID := Stream} = Streams,
  1079. State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}.
  1080. %% If we finished sending data and the stream is stopping, terminate it.
  1081. maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) ->
  1082. case Streams of
  1083. #{StreamID := #stream{status=stopping}} ->
  1084. terminate_stream(State, StreamID);
  1085. _ ->
  1086. State
  1087. end;
  1088. maybe_terminate_stream(State, _, _) ->
  1089. State.
  1090. %% When the stream stops normally without reading the request
  1091. %% body fully we need to tell the client to stop sending it.
  1092. %% We do this by sending an RST_STREAM with reason NO_ERROR. (RFC7540 8.1.0)
  1093. terminate_stream(State0=#state{socket=Socket, transport=Transport,
  1094. http2_machine=HTTP2Machine0}, StreamID) ->
  1095. State = case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine0) of
  1096. {ok, fin, _} ->
  1097. Transport:send(Socket, cow_http2:rst_stream(StreamID, no_error)),
  1098. {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
  1099. State0#state{http2_machine=HTTP2Machine};
  1100. {error, closed} ->
  1101. State0
  1102. end,
  1103. terminate_stream(State, StreamID, normal).
  1104. %% We remove the stream flow from the connection flow. Any further
  1105. %% data received for this stream is therefore fully contained within
  1106. %% the extra window we allocated for this stream.
  1107. terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) ->
  1108. case maps:take(StreamID, Streams0) of
  1109. {#stream{flow=StreamFlow, state=StreamState}, Streams} ->
  1110. terminate_stream_handler(State, StreamID, Reason, StreamState),
  1111. Children = cowboy_children:shutdown(Children0, StreamID),
  1112. State#state{flow=Flow - StreamFlow, streams=Streams, children=Children};
  1113. error ->
  1114. State
  1115. end.
  1116. terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
  1117. try
  1118. cowboy_stream:terminate(StreamID, Reason, StreamState)
  1119. catch Class:Exception:Stacktrace ->
  1120. cowboy:log(cowboy_stream:make_error_log(terminate,
  1121. [StreamID, Reason, StreamState],
  1122. Class, Exception, Stacktrace), Opts)
  1123. end.
  1124. %% System callbacks.
  1125. -spec system_continue(_, _, {#state{}, binary()}) -> ok.
  1126. system_continue(_, _, {State, Buffer}) ->
  1127. loop(State, Buffer).
  1128. -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
  1129. system_terminate(Reason0, _, _, {State, Buffer}) ->
  1130. Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
  1131. loop(initiate_closing(State, Reason), Buffer).
  1132. -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
  1133. system_code_change(Misc, _, _, _) ->
  1134. {ok, Misc}.