cowboy_http3.erl 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887
  1. %% Copyright (c) 2023, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% A key difference between cowboy_http2 and cowboy_http3
  15. %% is that HTTP/3 streams are QUIC streams and therefore
  16. %% much of the connection state is handled outside of
  17. %% Cowboy.
  18. -module(cowboy_http3).
  19. -export([init/3]).
  20. %% Temporary callback to do sendfile over QUIC.
  21. -export([send/2]).
%% Per-stream information kept by the connection process.
-record(stream, {
    %% Identifier of the stream this record describes.
    id :: non_neg_integer(), %% @todo specs

    %% Whether the stream is currently in a special state.
    %% While the status is 'header' incoming data is handed to the
    %% unidirectional stream header parser; {data, Len} means that
    %% Len more bytes of the current data frame are still expected.
    status :: header | normal | {data, non_neg_integer()} | stopping,

    %% Stream buffer. Holds data that could not be parsed yet and is
    %% prepended to the next data received for the stream.
    buffer = <<>> :: binary(),

    %% Stream state. This is the state returned by the
    %% cowboy_stream callbacks for this stream.
    state = undefined :: undefined | {module, any()}
}).
%% Connection state threaded through the receive loop.
-record(state, {
    %% Process given as the first argument to init/3.
    parent :: pid(),
    %% Connection handle passed to the cowboy_quicer functions.
    conn :: any(), %% @todo specs
    %% Options given to init/3; also used when logging.
    opts = #{} :: any(), %% @todo opts(),

    %% Remote address and port for the connection.
    peer = undefined :: {inet:ip_address(), inet:port_number()},

    %% Local address and port for the connection.
    sock = undefined :: {inet:ip_address(), inet:port_number()},

    %% HTTP/3 state machine.
    http3_machine :: cow_http3_machine:http3_machine(),

    %% Quick pointers for commonly used streams.
    local_control_id :: any(), %% @todo specs Control stream must not be closed.
    local_encoder_id :: any(), %% @todo specs
    local_decoder_id :: any(), %% @todo specs

    %% Bidirectional streams used for requests and responses,
    %% as well as unidirectional streams initiated by the client.
    streams = #{} :: map(), %% @todo specs
    %% @todo a ref/id map because stream_closed we don't have the id

    %% Lingering streams that were recently reset. We may receive
    %% pending data or messages for these streams a short while
    %% after they have been reset.
    lingering_streams = [] :: [non_neg_integer()],

    %% Streams can spawn zero or more children which are then managed
    %% by this module if operating as a supervisor.
    children = cowboy_children:init() :: cowboy_children:children()
}).
  57. -spec init(_, _, _) -> no_return().
  58. init(Parent, Conn, Opts) ->
  59. %ct:pal("init"),
  60. {ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(server, Opts),
  61. %% Immediately open a control, encoder and decoder stream.
  62. {ok, ControlID} = cowboy_quicer:start_unidi_stream(Conn, [<<0>>, SettingsBin]),
  63. {ok, EncoderID} = cowboy_quicer:start_unidi_stream(Conn, <<2>>),
  64. {ok, DecoderID} = cowboy_quicer:start_unidi_stream(Conn, <<3>>),
  65. %ct:pal("control ~p encoder ~p decoder ~p", [ControlID, EncoderID, DecoderID]),
  66. %% Set the control, encoder and decoder streams in the machine.
  67. HTTP3Machine = cow_http3_machine:init_unidi_local_streams(
  68. ControlID, EncoderID, DecoderID, HTTP3Machine0),
  69. %% Get the peername/sockname.
  70. Peer0 = cowboy_quicer:peername(Conn),
  71. Sock0 = cowboy_quicer:sockname(Conn),
  72. %% @todo Get the peer certificate here if it makes sense.
  73. case {Peer0, Sock0} of
  74. {{ok, Peer}, {ok, Sock}} ->
  75. %% Quick! Let's go!
  76. loop(#state{parent=Parent, conn=Conn, opts=Opts,
  77. peer=Peer, sock=Sock, http3_machine=HTTP3Machine,
  78. local_control_id=ControlID,
  79. local_encoder_id=EncoderID,
  80. local_decoder_id=DecoderID});
  81. {{error, Reason}, _} ->
  82. terminate(undefined, {socket_error, Reason,
  83. 'A socket error occurred when retrieving the peer name.'});
  84. {_, {error, Reason}} ->
  85. terminate(undefined, {socket_error, Reason,
  86. 'A socket error occurred when retrieving the sock name.'})
  87. end.
  88. loop(State0=#state{children=Children}) ->
  89. %ct:pal("~p", [process_info(self(), messages)]),
  90. receive
  91. Msg when element(1, Msg) =:= quic ->
  92. handle_quic_msg(State0, Msg);
  93. %% Timeouts.
  94. {timeout, Ref, {shutdown, Pid}} ->
  95. cowboy_children:shutdown_timeout(Children, Ref, Pid),
  96. loop(State0);
  97. %% Messages pertaining to a stream.
  98. {{Pid, StreamID}, Msg} when Pid =:= self() ->
  99. loop(info(State0, StreamID, Msg));
  100. %% Exit signal from children.
  101. Msg = {'EXIT', Pid, _} ->
  102. loop(down(State0, Pid, Msg));
  103. Msg ->
  104. ct:pal("cowboy msg ~p", [Msg]),
  105. loop(State0)
  106. end.
  107. handle_quic_msg(State0, Msg) ->
  108. case cowboy_quicer:handle(Msg) of
  109. {data, StreamID, IsFin, Data} ->
  110. % ct:pal("{data, ~p, ~p, ~p}", [StreamID, IsFin, Data]),
  111. parse(State0, Data, StreamID, IsFin);
  112. {stream_started, StreamID, StreamType} ->
  113. % ct:pal("~p stream_started ~p ~p", [self(), StreamID, StreamType]),
  114. State = stream_new_remote(State0, StreamID, StreamType),
  115. loop(State);
  116. {stream_closed, StreamID, ErrorCode} ->
  117. % ct:pal("stream_closed ~p state ~p code ~p", [StreamID, State0, ErrorCode]),
  118. State = stream_closed(State0, StreamID, ErrorCode),
  119. loop(State);
  120. closed ->
  121. %% @todo terminate here?
  122. ok;
  123. ok ->
  124. loop(State0)
  125. end.
  126. parse(State=#state{opts=Opts}, Data, StreamID, IsFin) ->
  127. case stream_get(State, StreamID) of
  128. Stream=#stream{buffer= <<>>} ->
  129. parse1(State, Data, Stream, IsFin);
  130. Stream=#stream{buffer=Buffer} ->
  131. Stream1 = Stream#stream{buffer= <<>>},
  132. parse1(stream_store(State, Stream1),
  133. <<Buffer/binary, Data/binary>>, Stream1, IsFin);
  134. %% Pending data for a stream that has been reset. Ignore.
  135. error ->
  136. case is_lingering_stream(State, StreamID) of
  137. true ->
  138. ok;
  139. false ->
  140. %% We avoid logging the data as it could be quite large.
  141. cowboy:log(warning, "Received data for unknown stream ~p.",
  142. [StreamID], Opts)
  143. end,
  144. loop(State)
  145. end.
  146. %% @todo Swap Data and Stream/StreamID?
  147. parse1(State, Data, Stream=#stream{status=header}, IsFin) ->
  148. parse_unidirectional_stream_header(State, Data, Stream, IsFin);
  149. parse1(State, Data, Stream=#stream{status={data, Len}, id=StreamID}, IsFin) ->
  150. DataLen = byte_size(Data),
  151. if
  152. DataLen < Len ->
  153. %% We don't have the full frame but this is the end of the
  154. %% data we have. So FrameIsFin is equivalent to IsFin here.
  155. loop(frame(State, Stream#stream{status={data, Len - DataLen}}, {data, Data}, IsFin));
  156. true ->
  157. <<Data1:Len/binary, Rest/bits>> = Data,
  158. FrameIsFin = is_fin(IsFin, Rest),
  159. parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin),
  160. Rest, StreamID, IsFin)
  161. end;
  162. %% @todo Clause that discards receiving data for stopping streams.
  163. %% We may receive a few more frames after we abort receiving.
  164. parse1(State, Data, Stream=#stream{id=StreamID}, IsFin) ->
  165. case cow_http3:parse(Data) of
  166. {ok, Frame, Rest} ->
  167. FrameIsFin = is_fin(IsFin, Rest),
  168. % ct:pal("parse1 Frame= ~p Rest= ~p", [Frame, Rest]),
  169. parse(frame(State, Stream, Frame, FrameIsFin), Rest, StreamID, IsFin);
  170. {more, Frame, Len} ->
  171. %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
  172. case IsFin of
  173. nofin ->
  174. loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin));
  175. fin ->
  176. terminate(State, {connection_error, h3_frame_error,
  177. 'Last frame on stream was truncated. (RFC9114 7.1)'})
  178. end;
  179. {ignore, Rest} ->
  180. parse(ignored_frame(State, Stream), Rest, StreamID, IsFin);
  181. Error = {connection_error, _, _} ->
  182. terminate(State, Error);
  183. more when Data =:= <<>> ->
  184. loop(stream_store(State, Stream#stream{buffer=Data}));
  185. more ->
  186. %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
  187. case IsFin of
  188. nofin ->
  189. loop(stream_store(State, Stream#stream{buffer=Data}));
  190. fin ->
  191. terminate(State, {connection_error, h3_frame_error,
  192. 'Last frame on stream was truncated. (RFC9114 7.1)'})
  193. end
  194. end.
  195. %% We may receive multiple frames in a single QUIC packet.
  196. %% The FIN flag applies to the QUIC packet, not to the frame.
  197. %% We must therefore only consider the frame to have a FIN
  198. %% flag if there's no data remaining to be read.
  199. is_fin(fin, <<>>) -> fin;
  200. is_fin(_, _) -> nofin.
  201. parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
  202. Data, Stream0=#stream{id=StreamID}, IsFin) ->
  203. case cow_http3:parse_unidi_stream_header(Data) of
  204. {ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder ->
  205. case cow_http3_machine:set_unidi_remote_stream_type(
  206. StreamID, Type, HTTP3Machine0) of
  207. {ok, HTTP3Machine} ->
  208. State = State0#state{http3_machine=HTTP3Machine},
  209. Stream = Stream0#stream{status=normal},
  210. parse(stream_store(State, Stream), Rest, StreamID, IsFin);
  211. {error, Error={connection_error, _, _}, HTTP3Machine} ->
  212. terminate(State0#state{http3_machine=HTTP3Machine}, Error)
  213. end;
  214. {ok, push, _} ->
  215. terminate(State0, {connection_error, h3_stream_creation_error,
  216. 'Only servers can push. (RFC9114 6.2.2)'});
  217. %% Unknown stream types must be ignored. We choose to abort the
  218. %% stream instead of reading and discarding the incoming data.
  219. {undefined, _} ->
  220. loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
  221. end.
  222. frame(State=#state{http3_machine=HTTP3Machine0, conn=Conn, local_decoder_id=DecoderID},
  223. Stream=#stream{id=StreamID}, Frame, IsFin) ->
  224. % ct:pal("cowboy frame ~p ~p", [Frame, IsFin]),
  225. case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of
  226. {ok, HTTP3Machine} ->
  227. State#state{http3_machine=HTTP3Machine};
  228. {ok, {data, Data}, HTTP3Machine} ->
  229. data_frame(State#state{http3_machine=HTTP3Machine}, Stream, IsFin, Data);
  230. %% @todo I don't think we need the IsFin in the {headers tuple.
  231. {ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP3Machine} ->
  232. headers_frame(State#state{http3_machine=HTTP3Machine},
  233. Stream, IsFin, Headers, PseudoHeaders, BodyLen);
  234. {ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, DecData, HTTP3Machine} ->
  235. %% Send the decoder data.
  236. ok = cowboy_quicer:send(Conn, DecoderID, DecData),
  237. headers_frame(State#state{http3_machine=HTTP3Machine},
  238. Stream, IsFin, Headers, PseudoHeaders, BodyLen);
  239. {ok, {trailers, _Trailers}, HTTP3Machine} ->
  240. %% @todo Propagate trailers.
  241. State#state{http3_machine=HTTP3Machine};
  242. {ok, GoAway={goaway, _}, HTTP3Machine} ->
  243. goaway(State#state{http3_machine=HTTP3Machine}, GoAway);
  244. {error, Error={stream_error, _Reason, _Human}, HTTP3Machine} ->
  245. reset_stream(State#state{http3_machine=HTTP3Machine}, Stream, Error);
  246. {error, Error={connection_error, _, _}, HTTP3Machine} ->
  247. terminate(State#state{http3_machine=HTTP3Machine}, Error)
  248. end.
  249. data_frame(State=#state{opts=Opts},
  250. Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) ->
  251. try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
  252. {Commands, StreamState} ->
  253. commands(State, Stream#stream{state=StreamState}, Commands)
  254. catch Class:Exception:Stacktrace ->
  255. cowboy:log(cowboy_stream:make_error_log(data,
  256. [StreamID, IsFin, Data, StreamState0],
  257. Class, Exception, Stacktrace), Opts),
  258. reset_stream(State, Stream, {internal_error, {Class, Exception},
  259. 'Unhandled exception in cowboy_stream:data/4.'})
  260. end.
  261. headers_frame(State, Stream, IsFin, Headers,
  262. PseudoHeaders=#{method := <<"CONNECT">>}, _)
  263. when map_size(PseudoHeaders) =:= 2 ->
  264. early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
  265. 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)');
  266. headers_frame(State, Stream, IsFin, Headers,
  267. PseudoHeaders=#{method := <<"TRACE">>}, _) ->
  268. early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
  269. 'The TRACE method is currently not implemented. (RFC9114 4.4, RFC7231 4.3.8)');
  270. headers_frame(State, Stream, IsFin, Headers, PseudoHeaders=#{authority := Authority}, BodyLen) ->
  271. headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
  272. headers_frame(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen) ->
  273. case lists:keyfind(<<"host">>, 1, Headers) of
  274. {_, Authority} ->
  275. headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
  276. _ ->
  277. reset_stream(State, Stream, {stream_error, h3_message_error,
  278. 'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'})
  279. end.
  280. headers_frame_parse_host(State=#state{peer=Peer, sock=Sock},
  281. Stream=#stream{id=StreamID}, IsFin, Headers,
  282. PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs},
  283. BodyLen, Authority) ->
  284. try cow_http_hd:parse_host(Authority) of
  285. {Host, Port0} ->
  286. Port = ensure_port(Scheme, Port0),
  287. try cow_http:parse_fullpath(PathWithQs) of
  288. {<<>>, _} ->
  289. reset_stream(State, Stream, {stream_error, h3_message_error,
  290. 'The path component must not be empty. (RFC7540 8.1.2.3)'});
  291. {Path, Qs} ->
  292. Req0 = #{
  293. ref => quic, %% @todo Ref,
  294. pid => self(),
  295. streamid => StreamID,
  296. peer => Peer,
  297. sock => Sock,
  298. cert => undefined, %Cert, %% @todo
  299. method => Method,
  300. scheme => Scheme,
  301. host => Host,
  302. port => Port,
  303. path => Path,
  304. qs => Qs,
  305. version => 'HTTP/3',
  306. headers => headers_to_map(Headers, #{}),
  307. has_body => IsFin =:= nofin,
  308. body_length => BodyLen
  309. },
  310. %% We add the protocol information for extended CONNECTs.
  311. Req = case PseudoHeaders of
  312. #{protocol := Protocol} -> Req0#{protocol => Protocol};
  313. _ -> Req0
  314. end,
  315. headers_frame(State, Stream, Req)
  316. catch _:_ ->
  317. reset_stream(State, Stream, {stream_error, h3_message_error,
  318. 'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'})
  319. end
  320. catch _:_ ->
  321. reset_stream(State, Stream, {stream_error, h3_message_error,
  322. 'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'})
  323. end.
  324. %% @todo Copied from cowboy_http2.
  325. %% @todo Remove "http"? Probably.
  326. ensure_port(<<"http">>, undefined) -> 80;
  327. ensure_port(<<"https">>, undefined) -> 443;
  328. ensure_port(_, Port) -> Port.
  329. %% @todo Copied from cowboy_http2.
  330. %% This function is necessary to properly handle duplicate headers
  331. %% and the special-case cookie header.
  332. headers_to_map([], Acc) ->
  333. Acc;
  334. headers_to_map([{Name, Value}|Tail], Acc0) ->
  335. Acc = case Acc0 of
  336. %% The cookie header does not use proper HTTP header lists.
  337. #{Name := Value0} when Name =:= <<"cookie">> ->
  338. Acc0#{Name => << Value0/binary, "; ", Value/binary >>};
  339. #{Name := Value0} ->
  340. Acc0#{Name => << Value0/binary, ", ", Value/binary >>};
  341. _ ->
  342. Acc0#{Name => Value}
  343. end,
  344. headers_to_map(Tail, Acc).
  345. headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
  346. %ct:pal("req ~p", [Req]),
  347. try cowboy_stream:init(StreamID, Req, Opts) of
  348. {Commands, StreamState} ->
  349. %logger:error("~p", [Commands]),
  350. %logger:error("~p", [StreamState]),
  351. commands(State, Stream#stream{state=StreamState}, Commands)
  352. catch Class:Exception:Stacktrace ->
  353. cowboy:log(cowboy_stream:make_error_log(init,
  354. [StreamID, Req, Opts],
  355. Class, Exception, Stacktrace), Opts),
  356. reset_stream(State, Stream, {internal_error, {Class, Exception},
  357. 'Unhandled exception in cowboy_stream:init/3.'})
  358. end.
  359. early_error(State0=#state{opts=Opts, peer=Peer},
  360. Stream=#stream{id=StreamID}, _IsFin, Headers, #{method := Method},
  361. StatusCode0, HumanReadable) ->
  362. %% We automatically terminate the stream but it is not an error
  363. %% per se (at least not in the first implementation).
  364. Reason = {stream_error, h3_no_error, HumanReadable},
  365. %% The partial Req is minimal for now. We only have one case
  366. %% where it can be called (when a method is completely disabled).
  367. %% @todo Fill in the other elements.
  368. PartialReq = #{
  369. ref => quic, %% @todo Ref,
  370. peer => Peer,
  371. method => Method,
  372. headers => headers_to_map(Headers, #{})
  373. },
  374. Resp = {response, StatusCode0, RespHeaders0=#{<<"content-length">> => <<"0">>}, <<>>},
  375. try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
  376. {response, StatusCode, RespHeaders, RespBody} ->
  377. send_response(State0, Stream, StatusCode, RespHeaders, RespBody)
  378. catch Class:Exception:Stacktrace ->
  379. cowboy:log(cowboy_stream:make_error_log(early_error,
  380. [StreamID, Reason, PartialReq, Resp, Opts],
  381. Class, Exception, Stacktrace), Opts),
  382. %% We still need to send an error response, so send what we initially
  383. %% wanted to send. It's better than nothing.
  384. send_headers(State0, StreamID, fin, StatusCode0, RespHeaders0)
  385. end.
  386. %% Erlang messages.
  387. down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
  388. State = case cowboy_children:down(Children0, Pid) of
  389. %% The stream was terminated already.
  390. {ok, undefined, Children} ->
  391. State0#state{children=Children};
  392. %% The stream is still running.
  393. {ok, StreamID, Children} ->
  394. info(State0#state{children=Children}, StreamID, Msg);
  395. %% The process was unknown.
  396. error ->
  397. cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
  398. [Msg, Pid], Opts),
  399. State0
  400. end,
  401. if
  402. %% @todo
  403. % State#state.http2_status =:= closing, State#state.streams =:= #{} ->
  404. % terminate(State, {stop, normal, 'The connection is going away.'});
  405. true ->
  406. State
  407. end.
  408. info(State=#state{opts=Opts, http3_machine=_HTTP3Machine}, StreamID, Msg) ->
  409. %ct:pal("INFO ~p ~p ~p", [State, StreamID, Msg]),
  410. case stream_get(State, StreamID) of
  411. Stream=#stream{state=StreamState0} ->
  412. try cowboy_stream:info(StreamID, Msg, StreamState0) of
  413. {Commands, StreamState} ->
  414. %ct:pal("~p", [Commands]),
  415. %logger:error("~p ~p", [StreamID, Streams]),
  416. commands(State, Stream#stream{state=StreamState}, Commands)
  417. catch Class:Exception:Stacktrace ->
  418. cowboy:log(cowboy_stream:make_error_log(info,
  419. [StreamID, Msg, StreamState0],
  420. Class, Exception, Stacktrace), Opts),
  421. reset_stream(State, Stream, {internal_error, {Class, Exception},
  422. 'Unhandled exception in cowboy_stream:info/3.'})
  423. end;
  424. error ->
  425. case is_lingering_stream(State, StreamID) of
  426. true ->
  427. ok;
  428. false ->
  429. cowboy:log(warning, "Received message ~p for unknown stream ~p.",
  430. [Msg, StreamID], Opts)
  431. end,
  432. State
  433. end.
%% Stream handler commands.
%%
%% Executes, in order, the list of commands returned by the stream
%% handlers for the given stream and returns the updated connection
%% state. The stream is stored in the state once the list has been
%% executed entirely. The internal_error and stop commands end the
%% execution early: the commands that follow them are not executed.
commands(State, Stream, []) ->
    stream_store(State, Stream);
%% Error responses are sent only if a response wasn't sent already.
commands(State=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID},
        [{error_response, StatusCode, Headers, Body}|Tail]) ->
    case cow_http3_machine:get_stream_local_state(StreamID, HTTP3Machine) of
        %% Nothing was sent yet: process it like a normal response.
        {ok, idle} ->
            commands(State, Stream, [{response, StatusCode, Headers, Body}|Tail]);
        _ ->
            commands(State, Stream, Tail)
    end;
%% Send an informational response.
commands(State0, Stream, [{inform, StatusCode, Headers}|Tail]) ->
    State = send_headers(State0, Stream, idle, StatusCode, Headers),
    commands(State, Stream, Tail);
%% Send response headers.
commands(State0, Stream, [{response, StatusCode, Headers, Body}|Tail]) ->
    % ct:pal("~p commands response ~p ~p ~p", [self(), StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]),
    State = send_response(State0, Stream, StatusCode, Headers, Body),
    commands(State, Stream, Tail);
%% Send response headers.
commands(State0, Stream, [{headers, StatusCode, Headers}|Tail]) ->
    % ct:pal("commands headers ~p ~p", [StatusCode, Headers]),
    State = send_headers(State0, Stream, nofin, StatusCode, Headers),
    commands(State, Stream, Tail);
%%% Send a response body chunk.
commands(State0=#state{conn=Conn}, Stream=#stream{id=StreamID}, [{data, IsFin, Data}|Tail]) ->
    % ct:pal("commands data ~p ~p", [IsFin, try iolist_size(Data) catch _:_ -> Data end]),
    _ = case Data of
        {sendfile, Offset, Bytes, Path} ->
            %% Temporary solution to do sendfile over QUIC.
            {ok, _} = ranch_transport:sendfile(?MODULE, {Conn, StreamID},
                Path, Offset, Bytes, []),
            %% An empty data frame follows the file contents and
            %% carries the IsFin flag.
            ok = cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), IsFin);
        _ ->
            ok = cowboy_quicer:send(Conn, StreamID, cow_http3:data(Data), IsFin)
    end,
    State = maybe_send_is_fin(State0, Stream, IsFin),
    commands(State, Stream, Tail);
%%% Send trailers.
commands(State=#state{conn=Conn, http3_machine=HTTP3Machine0},
        Stream=#stream{id=StreamID}, [{trailers, Trailers}|Tail]) ->
    % ct:pal("commands trailers ~p", [Trailers]),
    HTTP3Machine = case cow_http3_machine:prepare_trailers(
            StreamID, HTTP3Machine0, maps:to_list(Trailers)) of
        {trailers, HeaderBlock, _EncData, HTTP3Machine1} ->
            % ct:pal("trailers"),
            %% @todo EncData!!
            ok = cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), fin),
            HTTP3Machine1;
        {no_trailers, HTTP3Machine1} ->
            % ct:pal("no_trailers"),
            %% No trailers are sent: an empty data frame is sent
            %% with the fin flag instead.
            ok = cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), fin),
            HTTP3Machine1
    end,
    commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
%% Send a push promise.
%%
%% @todo Responses sent as a result of a push_promise request
%% must not send push_promise frames themselves.
%%
%% @todo We should not send push_promise frames when we are
%% in the closing http2_status.
%commands(State0=#state{socket=Socket, transport=Transport, http3_machine=HTTP3Machine0},
% Stream, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
% Authority = case {Scheme, Port} of
% {<<"http">>, 80} -> Host;
% {<<"https">>, 443} -> Host;
% _ -> iolist_to_binary([Host, $:, integer_to_binary(Port)])
% end,
% PathWithQs = iolist_to_binary(case Qs of
% <<>> -> Path;
% _ -> [Path, $?, Qs]
% end),
% PseudoHeaders = #{
% method => Method,
% scheme => Scheme,
% authority => Authority,
% path => PathWithQs
% },
% %% We need to make sure the header value is binary before we can
% %% create the Req object, as it expects them to be flat.
% Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)),
% %% @todo
% State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP3Machine0,
% PseudoHeaders, Headers) of
% {ok, PromisedStreamID, HeaderBlock, HTTP3Machine} ->
% Transport:send(Socket, cow_http2:push_promise(
% StreamID, PromisedStreamID, HeaderBlock)),
% headers_frame(State0#state{http3_machine=HTTP2Machine},
% PromisedStreamID, fin, Headers, PseudoHeaders, 0);
% {error, no_push} ->
% State0
% end,
% commands(State, Stream, Tail);
%%% Read the request body.
%commands(State0=#state{flow=Flow, streams=Streams}, Stream, [{flow, Size}|Tail]) ->
%% The flow command is currently accepted and ignored.
commands(State, Stream, [{flow, _Size}|Tail]) ->
    %% @todo We should tell the QUIC stream to increase its window size.
    % #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
    % State = update_window(State0#state{flow=Flow + Size,
    % streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
    % StreamID),
    commands(State, Stream, Tail);
%% Supervise a child process.
commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
        [{spawn, Pid, Shutdown}|Tail]) ->
    commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
        Stream, Tail);
%% Error handling.
commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) ->
    %% @todo Do we want to run the commands after an internal_error?
    %% @todo Do we even allow commands after?
    %% @todo Only reset when the stream still exists.
    reset_stream(State, Stream, Error);
%% Use a different protocol within the stream (CONNECT :protocol).
%% @todo Make sure we error out when the feature is disabled.
commands(State0, Stream0=#stream{id=StreamID},
        [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
    %% The stream is stored first because info/3 looks it up in the
    %% state; it is then read back since info/3 may have updated it.
    State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
    Stream = stream_get(State, StreamID),
    commands(State, Stream, Tail);
%% Set options dynamically.
%% The set_options command is currently accepted and ignored.
commands(State, Stream, [{set_options, _Opts}|Tail]) ->
    commands(State, Stream, Tail);
commands(State, Stream, [stop|_Tail]) ->
    % ct:pal("stop"),
    %% @todo Do we want to run the commands after a stop?
    %% @todo Do we even allow commands after?
    stop_stream(stream_store(State, Stream), Stream);
%% Log event.
commands(State=#state{opts=Opts}, Stream, [Log={log, _, _, _}|Tail]) ->
    cowboy:log(Log, Opts),
    commands(State, Stream, Tail).
  569. send_response(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
  570. Stream=#stream{id=StreamID}, StatusCode, Headers, Body) ->
  571. Size = case Body of
  572. {sendfile, _, Bytes0, _} -> Bytes0;
  573. _ -> iolist_size(Body)
  574. end,
  575. case Size of
  576. 0 ->
  577. State = send_headers(State0, Stream, fin, StatusCode, Headers),
  578. maybe_send_is_fin(State, Stream, fin);
  579. _ ->
  580. %% @todo Add a test for HEAD to make sure we don't send the body when
  581. %% returning {response...} from a stream handler (or {headers...} then {data...}).
  582. %% @todo We must send EncData!
  583. {ok, _IsFin, HeaderBlock, _EncData, HTTP3Machine}
  584. = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, nofin,
  585. #{status => cow_http:status_to_integer(StatusCode)},
  586. headers_to_list(Headers)),
  587. %% @todo It might be better to do async sends.
  588. _ = case Body of
  589. {sendfile, Offset, Bytes, Path} ->
  590. ok = cowboy_quicer:send(Conn, StreamID,
  591. cow_http3:headers(HeaderBlock)),
  592. %% Temporary solution to do sendfile over QUIC.
  593. {ok, _} = ranch_transport:sendfile(?MODULE, {Conn, StreamID},
  594. Path, Offset, Bytes, []),
  595. ok = cowboy_quicer:send(Conn, StreamID,
  596. cow_http3:data(<<>>), fin);
  597. _ ->
  598. ok = cowboy_quicer:send(Conn, StreamID, [
  599. cow_http3:headers(HeaderBlock),
  600. cow_http3:data(Body)
  601. ], fin)
  602. end,
  603. maybe_send_is_fin(State0#state{http3_machine=HTTP3Machine}, Stream, fin)
  604. end.
  605. maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0},
  606. Stream=#stream{id=StreamID}, fin) ->
  607. HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamID, HTTP3Machine0),
  608. maybe_terminate_stream(State#state{http3_machine=HTTP3Machine}, Stream);
  609. maybe_send_is_fin(State, _, _) ->
  610. State.
  611. %% Temporary callback to do sendfile over QUIC.
  612. -spec send(_, _) -> _. %% @todo
  613. send({Conn, StreamID}, IoData) ->
  614. cowboy_quicer:send(Conn, StreamID, cow_http3:data(IoData)).
  %% Encode and send a HEADERS frame for the given status code and headers.
  %%
  %% The HTTP/3 machine produces the header block and may change the
  %% fin flag; the flag it returns is the one used for the send. The
  %% updated machine is stored in the returned state.
  send_headers(State=#state{conn=Conn, http3_machine=Machine0},
          #stream{id=StreamID}, IsFin0, StatusCode, Headers) ->
      PseudoHeaders = #{status => cow_http:status_to_integer(StatusCode)},
      HeaderList = headers_to_list(Headers),
      %% @todo Send _EncData.
      {ok, IsFin, HeaderBlock, _EncData, Machine} =
          cow_http3_machine:prepare_headers(StreamID, Machine0, IsFin0,
              PseudoHeaders, HeaderList),
      HeadersFrame = cow_http3:headers(HeaderBlock),
      ok = cowboy_quicer:send(Conn, StreamID, HeadersFrame, IsFin),
      State#state{http3_machine=Machine}.
  %% Convert the headers map to a list of {Name, Value} pairs.
  %%
  %% The set-cookie header is special; we can only send one cookie per
  %% header. Its value is a list of cookies, and each one is emitted as
  %% its own set-cookie entry after all the other headers.
  headers_to_list(Headers) ->
      case maps:take(<<"set-cookie">>, Headers) of
          {SetCookies, OtherHeaders} ->
              maps:to_list(OtherHeaders)
                  ++ [{<<"set-cookie">>, Cookie} || Cookie <- SetCookies];
          error ->
              maps:to_list(Headers)
      end.
  %% Reset a stream following a stream-level or internal error.
  %%
  %% Both sides of the QUIC stream are shut down with the HTTP/3 error
  %% code matching Error, the stream is reset in the HTTP/3 machine (when
  %% the machine still knows about it), and the stream handler is
  %% terminated with Error as the reason. Returns the updated state.
  %%
  %% Error must be {internal_error, _, _} or {stream_error, Reason, _}.
  reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
          Stream=#stream{id=StreamID}, Error) ->
      Reason = case Error of
          {internal_error, _, _} -> h3_internal_error;
          {stream_error, Reason0, _} -> Reason0
      end,
      %% @todo Do we want to close both sides?
      %% @todo Should we close the send side if the receive side was already closed?
      %% The shutdown result was never inspected; it is discarded explicitly
      %% rather than bound to an unused variable.
      _ = cowboy_quicer:shutdown_stream(Conn, StreamID,
          both, cow_http3:error_to_code(Reason)),
      State1 = case cow_http3_machine:reset_stream(StreamID, HTTP3Machine0) of
          {ok, HTTP3Machine} ->
              terminate_stream(State0#state{http3_machine=HTTP3Machine}, Stream, Error);
          {error, not_found} ->
              terminate_stream(State0, Stream, Error)
      end,
      %% @todo
      % case reset_rate(State1) of
      %     {ok, State} ->
      %         State;
      %     error ->
      %         terminate(State1, {connection_error, enhance_your_calm,
      %             'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'})
      % end.
      State1.
  %% Stop a stream at the request of its stream handler.
  %%
  %% The stream is always marked as 'stopping'. Reading is aborted only
  %% when the client had not finished sending data. Afterwards the
  %% response is completed or the stream terminated, depending on how
  %% much of the response was already sent.
  stop_stream(State0=#state{http3_machine=Machine}, Stream=#stream{id=StreamID}) ->
      RemoteState = cow_http3_machine:get_stream_remote_state(StreamID, Machine),
      State = if
          %% The client is done sending (or the machine no longer tracks
          %% the stream): there is nothing to abort.
          RemoteState =:= {ok, fin}; RemoteState =:= {error, not_found} ->
              stream_store(State0, Stream#stream{status=stopping});
          %% Otherwise we abort reading; this also marks the stream as stopping.
          true ->
              stream_abort_receive(State0, Stream, h3_no_error)
      end,
      %% Then we may need to send a response or terminate the stream
      %% if the stream handler did not do so already.
      LocalState = cow_http3_machine:get_stream_local_state(StreamID, Machine),
      case LocalState of
          %% The stream terminates normally (without being reset) and no
          %% response was sent: send a proper response back to the client.
          {ok, idle} ->
              info(State, StreamID, {response, 204, #{}, <<>>});
          %% A response was started but not finished: send a final
          %% DATA frame to complete the stream.
          {ok, nofin} ->
              info(State, StreamID, {data, fin, <<>>});
          %% The response was sent fully: terminate the stream, regardless
          %% of it being in half-closed or closed state.
          _ ->
              terminate_stream(State, Stream, normal)
      end.
  %% Terminate the stream with reason 'normal' if it is marked as stopping.
  %%
  %% Otherwise the state is returned unchanged: the Stream will be stored
  %% in the State at the end of commands processing.
  maybe_terminate_stream(State, Stream) ->
      case Stream of
          #stream{status=stopping} ->
              terminate_stream(State, Stream, normal);
          _ ->
              State
      end.
  %% Remove a stream from the connection state.
  %%
  %% The stream handler is terminated with Reason, the children processes
  %% belonging to the stream are shut down, and the stream ID is recorded
  %% as lingering. Returns the updated state.
  terminate_stream(State0=#state{streams=AllStreams, children=AllChildren},
          #stream{id=StreamID, state=HandlerState}, Reason) ->
      RemainingStreams = maps:remove(StreamID, AllStreams),
      terminate_stream_handler(State0, StreamID, Reason, HandlerState),
      RemainingChildren = cowboy_children:shutdown(AllChildren, StreamID),
      State = State0#state{streams=RemainingStreams, children=RemainingChildren},
      stream_linger(State, StreamID).
  %% Call the stream handler's terminate callback.
  %%
  %% Any exception raised by the callback is caught and logged using the
  %% connection options; it never propagates to the caller.
  terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
      try
          cowboy_stream:terminate(StreamID, Reason, StreamState)
      catch
          Class:Exception:Stacktrace ->
              ErrorLog = cowboy_stream:make_error_log(terminate,
                  [StreamID, Reason, StreamState],
                  Class, Exception, Stacktrace),
              cowboy:log(ErrorLog, Opts)
      end.
  %% Inform the HTTP/3 machine that a frame was ignored on this stream.
  %%
  %% Returns the state with the updated machine, or terminates the
  %% connection when the machine reports a connection error.
  ignored_frame(State=#state{http3_machine=Machine0}, #stream{id=StreamID}) ->
      case cow_http3_machine:ignored_frame(StreamID, Machine0) of
          {ok, Machine} ->
              State#state{http3_machine=Machine};
          {error, {connection_error, _, _} = ConnError, Machine} ->
              terminate(State#state{http3_machine=Machine}, ConnError)
      end.
  %% Abort the receiving side of a stream and mark the stream as stopping.
  %%
  %% Reason is converted to its HTTP/3 error code for the shutdown call.
  %% The updated stream is stored in the returned state.
  stream_abort_receive(State=#state{conn=Conn}, Stream=#stream{id=StreamID}, Reason) ->
      ErrorCode = cow_http3:error_to_code(Reason),
      cowboy_quicer:shutdown_stream(Conn, StreamID, receiving, ErrorCode),
      StoppingStream = Stream#stream{status=stopping},
      stream_store(State, StoppingStream).
  %% @todo Graceful connection shutdown.
  %% For now a goaway request always terminates the connection
  %% immediately, whatever the connection status is.
  goaway(State, {goaway, _}) ->
      StopReason = {stop, goaway, 'The connection is going away.'},
      terminate(State, StopReason).
  %% Terminate the connection process.
  %%
  %% All stream handlers are terminated with Reason, the children
  %% processes are terminated, the QUIC connection is shut down with the
  %% error code derived from Reason, and the process exits with
  %% {shutdown, Reason}. This function never returns.
  %%
  %% @todo Send a GOAWAY frame on the control stream, carrying the last
  %% stream ID known to the HTTP/3 machine, when the connection status is
  %% 'connected' or 'closing_initiated'. Nothing needs to be sent when
  %% the status is 'closing' since the GOAWAY frame was already sent.
  %% This requires the http3_status and http3_machine fields of the state.
  terminate(State=#state{conn=Conn, streams=Streams, children=Children}, Reason) ->
      StreamList = maps:to_list(Streams),
      terminate_all_streams(State, StreamList, Reason),
      cowboy_children:terminate(Children),
      %% @todo terminate_linger(State)
      ErrorCode = cow_http3:error_to_code(terminate_reason(Reason)),
      cowboy_quicer:shutdown(Conn, ErrorCode),
      exit({shutdown, Reason}).
  %% Select the HTTP/3 error to report when shutting down the connection.
  %%
  %% Connection errors report their own error; a deliberate stop reports
  %% h3_no_error.
  terminate_reason({connection_error, H3Error, _HumanReadable}) ->
      H3Error;
  terminate_reason({stop, _StopReason, _HumanReadable}) ->
      h3_no_error.
  %% @todo Reasons that are not handled yet:
  %terminate_reason({socket_error, _, _}) -> internal_error;
  %terminate_reason({internal_error, _, _}) -> internal_error.
  %% Call the terminate callback of every stream handler in the list.
  %%
  %% The list contains {StreamID, Stream} pairs. Always returns ok.
  terminate_all_streams(State, StreamList, Reason) ->
      lists:foreach(
          fun({StreamID, #stream{state=StreamState}}) ->
              terminate_stream_handler(State, StreamID, Reason, StreamState)
          end,
          StreamList).
  %% Look up a stream by ID.
  %%
  %% Returns the stream record, or the atom 'error' when the connection
  %% has no stream with this ID.
  stream_get(#state{streams=Streams}, StreamID) ->
      case maps:find(StreamID, Streams) of
          {ok, Stream} -> Stream;
          error -> error
      end.
  %% Register a stream opened by the peer.
  %%
  %% The stream is initialised in the HTTP/3 machine according to its
  %% type, then stored in the state. Unidirectional streams start with
  %% status 'header'; bidirectional streams start with status 'normal'.
  stream_new_remote(State=#state{http3_machine=Machine0, streams=Streams},
          StreamID, StreamType) ->
      {Machine, InitialStatus} = case StreamType of
          unidi ->
              UnidiMachine = cow_http3_machine:init_unidi_stream(
                  StreamID, unidi_remote, Machine0),
              {UnidiMachine, header};
          bidi ->
              BidiMachine = cow_http3_machine:init_bidi_stream(StreamID, Machine0),
              {BidiMachine, normal}
      end,
      NewStream = #stream{id=StreamID, status=InitialStatus},
      State#state{
          http3_machine=Machine,
          streams=maps:put(StreamID, NewStream, Streams)
      }.
  %% Handle the notification that a stream was closed.
  %%
  %% ErrorCode is compared against 0 to tell a clean close from a close
  %% with an error. Returns the updated state; the connection may be
  %% terminated if the HTTP/3 machine reports a connection error.

  %% Stream closed message for a local (write-only) unidi stream.
  stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) ->
      stream_closed1(State, StreamID);
  stream_closed(State=#state{local_encoder_id=StreamID}, StreamID, _) ->
      stream_closed1(State, StreamID);
  stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
      stream_closed1(State, StreamID);
  stream_closed(State=#state{opts=Opts,
          streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
      case maps:take(StreamID, Streams0) of
          {#stream{state=undefined}, Streams} ->
              %% Unidi stream has no handler/children.
              stream_closed1(State#state{streams=Streams}, StreamID);
          %% We only stop bidi streams if the stream was closed with an error
          %% or the stream was already in the process of stopping.
          {#stream{status=Status, state=StreamState}, Streams}
                  when Status =:= stopping; ErrorCode =/= 0 ->
              terminate_stream_handler(State, StreamID, closed, StreamState),
              Children = cowboy_children:shutdown(Children0, StreamID),
              stream_closed1(State#state{streams=Streams, children=Children}, StreamID);
          %% Don't remove a stream that terminated properly but
          %% has chosen to remain up (custom stream handlers).
          {_, _} ->
              stream_closed1(State, StreamID);
          %% Stream closed message for a stream that has been reset. Ignore.
          error ->
              case is_lingering_stream(State, StreamID) of
                  true ->
                      ok;
                  false ->
                      %% We avoid logging the stream records as they hold the
                      %% stream handlers' state, which could be quite large.
                      %% Only the IDs of the known streams are logged.
                      cowboy:log(warning, "Received stream_closed for unknown stream ~p. ~p ~p",
                          [StreamID, self(), maps:keys(Streams0)], Opts)
              end,
              State
      end.
  %% Close the stream in the HTTP/3 machine.
  %%
  %% Returns the state with the updated machine, or terminates the
  %% connection when the machine reports a connection error.
  stream_closed1(State=#state{http3_machine=Machine0}, StreamID) ->
      case cow_http3_machine:close_stream(StreamID, Machine0) of
          {ok, Machine} ->
              State#state{http3_machine=Machine};
          {error, {connection_error, _, _} = ConnError, Machine} ->
              terminate(State#state{http3_machine=Machine}, ConnError)
      end.
  %% Store the stream in the state under its ID, replacing any
  %% previous entry for the same ID.
  stream_store(State=#state{streams=Streams0}, Stream=#stream{id=StreamID}) ->
      Streams = maps:put(StreamID, Stream, Streams0),
      State#state{streams=Streams}.
  %% Remember the ID of a stream that was just terminated.
  %%
  %% The newest ID goes to the front of the list and the oldest entries
  %% are dropped once the limit is reached.
  stream_linger(State=#state{lingering_streams=Lingering0}, StreamID) ->
      %% We only keep up to 100 streams in this state. @todo Make it configurable?
      MaxLingering = 100,
      Kept = lists:sublist(Lingering0, MaxLingering - 1),
      State#state{lingering_streams=[StreamID|Kept]}.
  %% Tell whether the stream ID belongs to a recently terminated stream
  %% that is still remembered in the lingering list.
  is_lingering_stream(#state{lingering_streams=LingeringIDs}, StreamID) ->
      lists:member(StreamID, LingeringIDs).