Browse Source

Parsing + part of state machine

Loïc Hoguin 2 years ago
parent
commit
9887c9dae0
4 changed files with 731 additions and 59 deletions
  1. 1 1
      ebin/cowboy.app
  2. 260 0
      src/cow_http3.erl
  3. 329 0
      src/cow_http3_machine.erl
  4. 141 58
      src/cowboy_http3.erl

+ 1 - 1
ebin/cowboy.app

@@ -1,7 +1,7 @@
 {application, 'cowboy', [
 	{description, "Small, fast, modern HTTP server."},
 	{vsn, "2.10.0"},
-	{modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_http3','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket']},
+	{modules, ['cow_http3','cow_http3_machine','cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_http3','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket']},
 	{registered, [cowboy_sup,cowboy_clock]},
 	{applications, [kernel,stdlib,crypto,cowlib,ranch,quicer]},
 	{mod, {cowboy_app, []}},

+ 260 - 0
src/cow_http3.erl

@@ -0,0 +1,260 @@
+%% Copyright (c) 2023, Loïc Hoguin <essen@ninenines.eu>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cow_http3).
+
+-export([parse/1]).
+-export([parse_unidi_stream_header/1]).
+
+-spec parse(_) -> _. %% @todo
+
+%%
+%% DATA frames.
+%%
+parse(<<0, 0:2, Len:6, Data:Len/binary, Rest/bits>>) ->
+	{ok, {data, Data}, Rest};
+parse(<<0, 1:2, Len:14, Data:Len/binary, Rest/bits>>) ->
+	{ok, {data, Data}, Rest};
+parse(<<0, 2:2, Len:30, Data:Len/binary, Rest/bits>>) ->
+	{ok, {data, Data}, Rest};
+parse(<<0, 3:2, Len:62, Data:Len/binary, Rest/bits>>) ->
+	{ok, {data, Data}, Rest};
+%% DATA frames may be split over multiple QUIC packets
+%% but we want to process them immediately rather than
+%% risk buffering a very large payload.
+parse(<<0, 0:2, Len:6, Data/bits>>) when byte_size(Data) < Len ->
+	{more, {data, Data}, Len - byte_size(Data)};
+parse(<<0, 1:2, Len:14, Data/bits>>) when byte_size(Data) < Len ->
+	{more, {data, Data}, Len - byte_size(Data)};
+parse(<<0, 2:2, Len:30, Data/bits>>) when byte_size(Data) < Len ->
+	{more, {data, Data}, Len - byte_size(Data)};
+parse(<<0, 3:2, Len:62, Data/bits>>) when byte_size(Data) < Len ->
+	{more, {data, Data}, Len - byte_size(Data)};
+%%
+%% HEADERS frames.
+%%
+parse(<<1, 0:2, Len:6, EncodedFieldSection:Len/binary, Rest/bits>>) ->
+	{ok, {headers, EncodedFieldSection}, Rest};
+parse(<<1, 1:2, Len:14, EncodedFieldSection:Len/binary, Rest/bits>>) ->
+	{ok, {headers, EncodedFieldSection}, Rest};
+parse(<<1, 2:2, Len:30, EncodedFieldSection:Len/binary, Rest/bits>>) ->
+	{ok, {headers, EncodedFieldSection}, Rest};
+parse(<<1, 3:2, Len:62, EncodedFieldSection:Len/binary, Rest/bits>>) ->
+	{ok, {headers, EncodedFieldSection}, Rest};
+%%
+%% CANCEL_PUSH frames.
+%%
+parse(<<3, 0:2, 1:6, 0:2, PushID:6, Rest/bits>>) ->
+	{ok, {cancel_push, PushID}, Rest};
+parse(<<3, 0:2, 2:6, 1:2, PushID:14, Rest/bits>>) ->
+	{ok, {cancel_push, PushID}, Rest};
+parse(<<3, 0:2, 4:6, 2:2, PushID:30, Rest/bits>>) ->
+	{ok, {cancel_push, PushID}, Rest};
+parse(<<3, 0:2, 8:6, 3:2, PushID:62, Rest/bits>>) ->
+	{ok, {cancel_push, PushID}, Rest};
+parse(<<3, _/bits>>) ->
+	{connection_error, h3_frame_error,
+		'CANCEL_PUSH frames payload MUST be 1, 2, 4 or 8 bytes wide. (RFC9114 7.1, RFC9114 7.2.3)'};
+%%
+%% SETTINGS frames.
+%%
+parse(<<4, 0:2, Len:6, Rest/bits>>) when byte_size(Rest) >= Len ->
+	parse_settings_id(Rest, Len, #{});
+parse(<<4, 1:2, Len:14, Rest/bits>>) when byte_size(Rest) >= Len ->
+	parse_settings_id(Rest, Len, #{});
+parse(<<4, 2:2, Len:30, Rest/bits>>) when byte_size(Rest) >= Len ->
+	parse_settings_id(Rest, Len, #{});
+parse(<<4, 3:2, Len:62, Rest/bits>>) when byte_size(Rest) >= Len ->
+	parse_settings_id(Rest, Len, #{});
+%%
+%% PUSH_PROMISE frames.
+%%
+parse(<<5, 0:2, Len:6, Rest/bits>>) when byte_size(Rest) >= Len ->
+	parse_push_promise(Rest, Len);
+parse(<<5, 1:2, Len:14, Rest/bits>>) when byte_size(Rest) >= Len ->
+	parse_push_promise(Rest, Len);
+parse(<<5, 2:2, Len:30, Rest/bits>>) when byte_size(Rest) >= Len ->
+	parse_push_promise(Rest, Len);
+parse(<<5, 3:2, Len:62, Rest/bits>>) when byte_size(Rest) >= Len ->
+	parse_push_promise(Rest, Len);
+%%
+%% GOAWAY frames.
+%%
+parse(<<7, 0:2, 1:6, 0:2, StreamOrPushID:6, Rest/bits>>) ->
+	{ok, {goaway, StreamOrPushID}, Rest};
+parse(<<7, 0:2, 2:6, 1:2, StreamOrPushID:14, Rest/bits>>) ->
+	{ok, {goaway, StreamOrPushID}, Rest};
+parse(<<7, 0:2, 4:6, 2:2, StreamOrPushID:30, Rest/bits>>) ->
+	{ok, {goaway, StreamOrPushID}, Rest};
+parse(<<7, 0:2, 8:6, 3:2, StreamOrPushID:62, Rest/bits>>) ->
+	{ok, {goaway, StreamOrPushID}, Rest};
+parse(<<7, _/bits>>) ->
+	{connection_error, h3_frame_error,
+		'GOAWAY frames payload MUST be 1, 2, 4 or 8 bytes wide. (RFC9114 7.1, RFC9114 7.2.6)'};
+%%
+%% MAX_PUSH_ID frames.
+%%
+parse(<<13, 0:2, 1:6, 0:2, PushID:6, Rest/bits>>) ->
+	{ok, {max_push_id, PushID}, Rest};
+parse(<<13, 0:2, 2:6, 1:2, PushID:14, Rest/bits>>) ->
+	{ok, {max_push_id, PushID}, Rest};
+parse(<<13, 0:2, 4:6, 2:2, PushID:30, Rest/bits>>) ->
+	{ok, {max_push_id, PushID}, Rest};
+parse(<<13, 0:2, 8:6, 3:2, PushID:62, Rest/bits>>) ->
+	{ok, {max_push_id, PushID}, Rest};
+parse(<<13, _/bits>>) ->
+	{connection_error, h3_frame_error,
+		'MAX_PUSH_ID frames payload MUST be 1, 2, 4 or 8 bytes wide. (RFC9114 7.1, RFC9114 7.2.6)'};
+%%
+%% HTTP/2 frame types must be rejected.
+%%
+parse(<<2, _/bits>>) ->
+	{connection_error, h3_frame_unexpected,
+		'HTTP/2 PRIORITY frame not defined for HTTP/3 must be rejected. (RFC9114 7.2.8)'};
+parse(<<6, _/bits>>) ->
+	{connection_error, h3_frame_unexpected,
+		'HTTP/2 PING frame not defined for HTTP/3 must be rejected. (RFC9114 7.2.8)'};
+parse(<<8, _/bits>>) ->
+	{connection_error, h3_frame_unexpected,
+		'HTTP/2 WINDOW_UPDATE frame not defined for HTTP/3 must be rejected. (RFC9114 7.2.8)'};
+parse(<<9, _/bits>>) ->
+	{connection_error, h3_frame_unexpected,
+		'HTTP/2 CONTINUATION frame not defined for HTTP/3 must be rejected. (RFC9114 7.2.8)'};
+%%
+%% Unknown frames must be ignored.
+%%
+%% @todo This can lead to DoS especially for larger frames
+%%       and HTTP/3 doesn't have a limit in SETTINGS. Perhaps
+%%       we should have an option to limit the stream buffer
+%%       size and error out (h3_excessive_load) when exceeded.
+parse(<<0:2, Type:6, 0:2, Len:6, _:Len/binary, Rest/bits>>)
+		when Type =:= 10; Type =:= 11; Type =:= 12; Type > 13 ->
+	{ignore, Rest};
+parse(<<0:2, Type:6, 1:2, Len:14, _:Len/binary, Rest/bits>>)
+		when Type =:= 10; Type =:= 11; Type =:= 12; Type > 13 ->
+	{ignore, Rest};
+parse(<<0:2, Type:6, 2:2, Len:30, _:Len/binary, Rest/bits>>)
+		when Type =:= 10; Type =:= 11; Type =:= 12; Type > 13 ->
+	{ignore, Rest};
+parse(<<0:2, Type:6, 3:2, Len:62, _:Len/binary, Rest/bits>>)
+		when Type =:= 10; Type =:= 11; Type =:= 12; Type > 13 ->
+	{ignore, Rest};
+parse(<<1:2, _:14, 0:2, Len:6, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<1:2, _:14, 1:2, Len:14, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<1:2, _:14, 2:2, Len:30, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<1:2, _:14, 3:2, Len:62, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<2:2, _:30, 0:2, Len:6, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<2:2, _:30, 1:2, Len:14, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<2:2, _:30, 2:2, Len:30, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<2:2, _:30, 3:2, Len:62, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<3:2, _:62, 0:2, Len:6, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<3:2, _:62, 1:2, Len:14, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<3:2, _:62, 2:2, Len:30, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+parse(<<3:2, _:62, 3:2, Len:62, _:Len/binary, Rest/bits>>) ->
+	{ignore, Rest};
+%%
+%% Incomplete frames for those we fully process only.
+%%
+parse(_) ->
+	more.
+
+parse_settings_id(Rest, 0, Settings) ->
+	{ok, {settings, Settings}, Rest};
+parse_settings_id(<<0:2, Identifier:6, Rest/bits>>, Len, Settings) when Len >= 1 ->
+	parse_settings_val(Rest, Len - 1, Settings, Identifier);
+parse_settings_id(<<1:2, Identifier:14, Rest/bits>>, Len, Settings) when Len >= 2 ->
+	parse_settings_val(Rest, Len - 2, Settings, Identifier);
+parse_settings_id(<<2:2, Identifier:30, Rest/bits>>, Len, Settings) when Len >= 4 ->
+	parse_settings_val(Rest, Len - 4, Settings, Identifier);
+parse_settings_id(<<3:2, Identifier:62, Rest/bits>>, Len, Settings) when Len >= 8 ->
+	parse_settings_val(Rest, Len - 8, Settings, Identifier);
+parse_settings_id(_, _, _) ->
+	{connection_error, h3_frame_error,
+		'SETTINGS payload size exceeds the length given. (RFC9114 7.1, RFC9114 7.2.4)'}.
+
+parse_settings_val(<<0:2, Value:6, Rest/bits>>, Len, Settings, Identifier) when Len >= 1 ->
+	parse_settings_id_val(Rest, Len - 1, Settings, Identifier, Value);
+parse_settings_val(<<1:2, Value:14, Rest/bits>>, Len, Settings, Identifier) when Len >= 2 ->
+	parse_settings_id_val(Rest, Len - 2, Settings, Identifier, Value);
+parse_settings_val(<<2:2, Value:30, Rest/bits>>, Len, Settings, Identifier) when Len >= 4 ->
+	parse_settings_id_val(Rest, Len - 4, Settings, Identifier, Value);
+parse_settings_val(<<3:2, Value:62, Rest/bits>>, Len, Settings, Identifier) when Len >= 8 ->
+	parse_settings_id_val(Rest, Len - 8, Settings, Identifier, Value);
+parse_settings_val(_, _, _, _) ->
+	{connection_error, h3_frame_error,
+		'SETTINGS payload size exceeds the length given. (RFC9114 7.1, RFC9114 7.2.4)'}.
+
+parse_settings_id_val(Rest, Len, Settings, Identifier, Value) ->
+	case Identifier of
+		6 ->
+			parse_settings_key_val(Rest, Len, Settings, max_field_section_size, Value);
+		_ when Identifier < 6, Identifier =/= 1 ->
+			{connection_error, h3_settings_error,
+				'HTTP/2 setting not defined for HTTP/3 must be rejected. (RFC9114 7.2.4.1)'};
+		%% Unknown settings must be ignored.
+		_ ->
+			parse_settings_id(Rest, Len, Settings)
+	end.
+
+parse_settings_key_val(Rest, Len, Settings, Key, Value) ->
+	case Settings of
+		#{Key := _} ->
+			{connection_error, h3_settings_error,
+				'A duplicate setting identifier was found. (RFC9114 7.2.4)'};
+		_ ->
+			parse_settings_id(Rest, Len, Settings#{Key => Value})
+	end.
+
+parse_push_promise(<<0:2, PushID:6, Data/bits>>, Len) ->
+	<<EncodedFieldSection:(Len - 1)/bytes, Rest/bits>> = Data,
+	{ok, {push_promise, PushID, EncodedFieldSection}, Rest};
+parse_push_promise(<<1:2, PushID:14, Data/bits>>, Len) ->
+	<<EncodedFieldSection:(Len - 2)/bytes, Rest/bits>> = Data,
+	{ok, {push_promise, PushID, EncodedFieldSection}, Rest};
+parse_push_promise(<<2:2, PushID:30, Data/bits>>, Len) ->
+	<<EncodedFieldSection:(Len - 4)/bytes, Rest/bits>> = Data,
+	{ok, {push_promise, PushID, EncodedFieldSection}, Rest};
+parse_push_promise(<<3:2, PushID:62, Data/bits>>, Len) ->
+	<<EncodedFieldSection:(Len - 8)/bytes, Rest/bits>> = Data,
+	{ok, {push_promise, PushID, EncodedFieldSection}, Rest}.
+
+-spec parse_unidi_stream_header(_) -> _. %% @todo
+
+parse_unidi_stream_header(<<0, Rest/bits>>) ->
+	{ok, control, Rest};
+parse_unidi_stream_header(<<1, Rest/bits>>) ->
+	{ok, push, Rest};
+parse_unidi_stream_header(<<2, Rest/bits>>) ->
+	{ok, encoder, Rest};
+parse_unidi_stream_header(<<3, Rest/bits>>) ->
+	{ok, decoder, Rest};
+parse_unidi_stream_header(<<0:2, _:6, Rest/bits>>) ->
+	{undefined, Rest};
+parse_unidi_stream_header(<<1:2, _:14, Rest/bits>>) ->
+	{undefined, Rest};
+parse_unidi_stream_header(<<2:2, _:30, Rest/bits>>) ->
+	{undefined, Rest};
+parse_unidi_stream_header(<<3:2, _:62, Rest/bits>>) ->
+	{undefined, Rest}.

+ 329 - 0
src/cow_http3_machine.erl

@@ -0,0 +1,329 @@
+%% Copyright (c) 2023, Loïc Hoguin <essen@ninenines.eu>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cow_http3_machine).
+
+-export([init/2]).
+-export([init_unidi_local_streams/7]).
+-export([init_stream/5]).
+-export([set_unidi_remote_stream_type/3]).
+-export([frame/4]).
+
+-record(stream, {
+	ref :: any(), %% @todo specs
+	id = undefined :: non_neg_integer(), %% @todo spec from quicer?
+	dir :: unidi_local | unidi_remote | bidi,
+	type :: undefined | req | control | push | encoder | decoder,
+
+	%% Further fields are only used by bidi streams.
+	%% @todo Perhaps have two different records?
+
+	%% Request method.
+	method = undefined :: binary(),
+
+	%% Whether we finished sending data.
+	local = idle :: idle | cow_http2:fin(),
+
+	%% Whether we finished receiving data.
+	remote = idle :: idle | cow_http2:fin(),
+
+	%% Size expected and read from the request body.
+	remote_expected_size = undefined :: undefined | non_neg_integer(),
+	remote_read_size = 0 :: non_neg_integer(),
+
+	%% Unparsed te header. Used to know if we can send trailers.
+	%% Note that we can always send trailers to the server.
+	te :: undefined | binary()
+}).
+
+-type stream() :: #stream{}.
+
+-record(http3_machine, {
+	%% Whether the HTTP/3 endpoint is a client or a server.
+	mode :: client | server,
+
+	%% Quick pointers for commonly used streams.
+	local_encoder_ref :: any(), %% @todo specs
+	local_decoder_ref :: any(), %% @todo specs
+
+	%% Currently active HTTP/3 streams. Streams may be initiated either
+	%% by the client or by the server through PUSH_PROMISE frames.
+	streams = #{} :: #{reference() => stream()},
+
+	%% QPACK decoding and encoding state.
+	decode_state = cow_qpack:init() :: cow_qpack:state(),
+	encode_state = cow_qpack:init() :: cow_qpack:state()
+}).
+
+-spec init(_, _) -> _. %% @todo
+
+init(Mode, _Opts) ->
+	{ok, <<>>, #http3_machine{mode=Mode}}.
+
+-spec init_unidi_local_streams(_, _, _, _, _ ,_ ,_) -> _. %% @todo
+
+init_unidi_local_streams(ControlRef, ControlID,
+		EncoderRef, EncoderID, DecoderRef, DecoderID,
+		State=#http3_machine{streams=Streams}) ->
+	State#http3_machine{
+		local_encoder_ref=EncoderRef,
+		local_decoder_ref=DecoderRef,
+		streams=Streams#{
+			ControlRef => #stream{ref=ControlRef, id=ControlID, dir=unidi_local, type=control},
+			EncoderRef => #stream{ref=EncoderRef, id=EncoderID, dir=unidi_local, type=encoder},
+			DecoderRef => #stream{ref=DecoderRef, id=DecoderID, dir=unidi_local, type=decoder}
+	}}.
+
+-spec init_stream(_, _, _, _, _) -> _. %% @todo
+
+init_stream(StreamRef, StreamID, StreamDir, StreamType,
+		State=#http3_machine{streams=Streams}) ->
+	State#http3_machine{streams=Streams#{StreamRef => #stream{
+		ref=StreamRef, id=StreamID, dir=StreamDir, type=StreamType}}}.
+
+-spec set_unidi_remote_stream_type(_, _, _) -> _. %% @todo
+
+set_unidi_remote_stream_type(StreamRef, Type,
+		State=#http3_machine{streams=Streams}) ->
+	#{StreamRef := Stream} = Streams,
+	State#http3_machine{streams=Streams#{StreamRef => Stream#stream{type=Type}}}.
+
+-spec frame(_, _, _, _) -> _. %% @todo
+
+frame(Frame, IsFin, StreamRef, State) ->
+	case element(1, Frame) of
+		headers -> headers_frame(Frame, IsFin, StreamRef, State);
+		settings -> {ok, State} %% @todo
+	end.
+
+headers_frame(Frame, IsFin, StreamRef, State=#http3_machine{mode=Mode}) ->
+	case Mode of
+		server -> server_headers_frame(Frame, IsFin, StreamRef, State)
+	end.
+
+%% @todo We may receive HEADERS before or after DATA.
+server_headers_frame(Frame, IsFin, StreamRef, State=#http3_machine{streams=Streams}) ->
+	case Streams of
+		%% Headers.
+		#{StreamRef := Stream=#stream{remote=idle}} ->
+			headers_decode(Frame, IsFin, Stream, State, request);
+		%% Trailers.
+		%% @todo Error out if we didn't get the full body.
+		#{StreamRef := _Stream=#stream{remote=nofin}} ->
+			todo_trailers; %% @todo
+		%% Additional frame received after trailers.
+		#{StreamRef := _Stream=#stream{remote=fin}} ->
+			todo_error %% @todo
+	end.
+
+%% @todo Check whether connection_error or stream_error fits better.
+headers_decode({headers, EncodedFieldSection}, IsFin, Stream=#stream{id=StreamID},
+		State=#http3_machine{decode_state=DecodeState0}, Type) ->
+	try cow_qpack:decode_field_section(EncodedFieldSection, StreamID, DecodeState0) of
+		{ok, Headers, DecData, DecodeState} ->
+			headers_pseudo_headers(Stream,
+				State#http3_machine{decode_state=DecodeState}, IsFin, Type, DecData, Headers);
+		{error, Reason, Human} ->
+			{error, {connection_error, Reason, Human}, State}
+	catch _:_ ->
+		{error, {connection_error, qpack_decompression_failed,
+			'Error while trying to decode QPACK-encoded header block. (RFC9204 6)'},
+			State}
+	end.
+
+%% @todo Much of the headers handling past this point is common between h2 and h3.
+
+headers_pseudo_headers(Stream, State,%=#http3_machine{local_settings=LocalSettings},
+		IsFin, Type, DecData, Headers0) when Type =:= request ->%; Type =:= push_promise ->
+%	IsExtendedConnectEnabled = maps:get(enable_connect_protocol, LocalSettings, false),
+	case request_pseudo_headers(Headers0, #{}) of
+		%% Extended CONNECT method (RFC9220).
+%		{ok, PseudoHeaders=#{method := <<"CONNECT">>, scheme := _,
+%			authority := _, path := _, protocol := _}, Headers}
+%			when IsExtendedConnectEnabled ->
+%			headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
+%		{ok, #{method := <<"CONNECT">>, scheme := _,
+%			authority := _, path := _}, _}
+%			when IsExtendedConnectEnabled ->
+%			headers_malformed(Stream, State,
+%				'The :protocol pseudo-header MUST be sent with an extended CONNECT. (RFC8441 4)');
+		{ok, #{protocol := _}, _} ->
+			headers_malformed(Stream, State,
+				'The :protocol pseudo-header is only defined for the extended CONNECT. (RFC8441 4)');
+		%% Normal CONNECT (no scheme/path).
+		{ok, PseudoHeaders=#{method := <<"CONNECT">>, authority := _}, Headers}
+				when map_size(PseudoHeaders) =:= 2 ->
+			headers_regular_headers(Stream, State, IsFin, Type, DecData, PseudoHeaders, Headers);
+		{ok, #{method := <<"CONNECT">>}, _} ->
+			headers_malformed(Stream, State,
+				'CONNECT requests only use the :method and :authority pseudo-headers. (RFC7540 8.3)');
+		%% Other requests.
+		{ok, PseudoHeaders=#{method := _, scheme := _, path := _}, Headers} ->
+			headers_regular_headers(Stream, State, IsFin, Type, DecData, PseudoHeaders, Headers);
+		{ok, _, _} ->
+			headers_malformed(Stream, State,
+				'A required pseudo-header was not found. (RFC7540 8.1.2.3)');
+		{error, HumanReadable} ->
+			headers_malformed(Stream, State, HumanReadable)
+	end.
+
+%% @todo This function was copy pasted from cow_http2_machine. Export instead.
+request_pseudo_headers([{<<":method">>, _}|_], #{method := _}) ->
+	{error, 'Multiple :method pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":method">>, Method}|Tail], PseudoHeaders) ->
+	request_pseudo_headers(Tail, PseudoHeaders#{method => Method});
+request_pseudo_headers([{<<":scheme">>, _}|_], #{scheme := _}) ->
+	{error, 'Multiple :scheme pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":scheme">>, Scheme}|Tail], PseudoHeaders) ->
+	request_pseudo_headers(Tail, PseudoHeaders#{scheme => Scheme});
+request_pseudo_headers([{<<":authority">>, _}|_], #{authority := _}) ->
+	{error, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":authority">>, Authority}|Tail], PseudoHeaders) ->
+	request_pseudo_headers(Tail, PseudoHeaders#{authority => Authority});
+request_pseudo_headers([{<<":path">>, _}|_], #{path := _}) ->
+	{error, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":path">>, Path}|Tail], PseudoHeaders) ->
+	request_pseudo_headers(Tail, PseudoHeaders#{path => Path});
+request_pseudo_headers([{<<":protocol">>, _}|_], #{protocol := _}) ->
+	{error, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":protocol">>, Protocol}|Tail], PseudoHeaders) ->
+	request_pseudo_headers(Tail, PseudoHeaders#{protocol => Protocol});
+request_pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
+	{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
+request_pseudo_headers(Headers, PseudoHeaders) ->
+	{ok, PseudoHeaders, Headers}.
+
+headers_malformed(#stream{id=StreamID}, State, HumanReadable) ->
+	{error, {stream_error, StreamID, h3_message_error, HumanReadable}, State}.
+
+%% Rejecting invalid regular headers might be a bit too strong for clients.
+headers_regular_headers(Stream=#stream{id=_StreamID},
+		State, IsFin, Type, DecData, PseudoHeaders, Headers) ->
+	case regular_headers(Headers, Type) of
+		ok when Type =:= request ->
+			request_expected_size(Stream, State, IsFin, Type, DecData, PseudoHeaders, Headers);
+%		ok when Type =:= push_promise ->
+%			push_promise_frame(Frame, State, Stream, PseudoHeaders, Headers);
+%		ok when Type =:= response ->
+%			response_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers);
+%		ok when Type =:= trailers ->
+%			trailers_frame(Frame, State, Stream, Headers);
+		{error, HumanReadable} when Type =:= request ->
+			headers_malformed(Stream, State, HumanReadable)%;
+%		{error, HumanReadable} ->
+%			stream_reset(StreamID, State, protocol_error, HumanReadable)
+	end.
+
+%% @todo This function was copy pasted from cow_http2_machine. Export instead.
+%% @todo The error reasons refer to the h2 RFC but then again h3 doesn't cover it in as much details.
+regular_headers([{<<>>, _}|_], _) ->
+	{error, 'Empty header names are not valid regular headers. (CVE-2019-9516)'};
+regular_headers([{<<":", _/bits>>, _}|_], _) ->
+	{error, 'Pseudo-headers were found after regular headers. (RFC7540 8.1.2.1)'};
+regular_headers([{<<"connection">>, _}|_], _) ->
+	{error, 'The connection header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"keep-alive">>, _}|_], _) ->
+	{error, 'The keep-alive header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"proxy-authenticate">>, _}|_], _) ->
+	{error, 'The proxy-authenticate header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"proxy-authorization">>, _}|_], _) ->
+	{error, 'The proxy-authorization header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"transfer-encoding">>, _}|_], _) ->
+	{error, 'The transfer-encoding header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"upgrade">>, _}|_], _) ->
+	{error, 'The upgrade header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"te">>, Value}|_], request) when Value =/= <<"trailers">> ->
+	{error, 'The te header with a value other than "trailers" is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"te">>, _}|_], Type) when Type =/= request ->
+	{error, 'The te header is only allowed in request headers. (RFC7540 8.1.2.2)'};
+regular_headers([{Name, _}|Tail], Type) ->
+	Pattern = [
+		<<$A>>, <<$B>>, <<$C>>, <<$D>>, <<$E>>, <<$F>>, <<$G>>, <<$H>>, <<$I>>,
+		<<$J>>, <<$K>>, <<$L>>, <<$M>>, <<$N>>, <<$O>>, <<$P>>, <<$Q>>, <<$R>>,
+		<<$S>>, <<$T>>, <<$U>>, <<$V>>, <<$W>>, <<$X>>, <<$Y>>, <<$Z>>
+	],
+	case binary:match(Name, Pattern) of
+		nomatch -> regular_headers(Tail, Type);
+		_ -> {error, 'Header names must be lowercase. (RFC7540 8.1.2)'}
+	end;
+regular_headers([], _) ->
+	ok.
+
+%% @todo Much of the logic can probably be put in its own function shared between h2 and h3.
+request_expected_size(Stream, State, IsFin, Type, DecData, PseudoHeaders, Headers) ->
+	case [CL || {<<"content-length">>, CL} <- Headers] of
+		[] when IsFin =:= fin ->
+			headers_frame(Stream, State, IsFin, Type, DecData, PseudoHeaders, Headers, 0);
+		[] ->
+			headers_frame(Stream, State, IsFin, Type, DecData, PseudoHeaders, Headers, undefined);
+		[<<"0">>] when IsFin =:= fin ->
+			headers_frame(Stream, State, IsFin, Type, DecData, PseudoHeaders, Headers, 0);
+		[_] when IsFin =:= fin ->
+			headers_malformed(Stream, State,
+				'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
+		[BinLen] ->
+			headers_parse_expected_size(Stream, State, IsFin, Type, DecData,
+				PseudoHeaders, Headers, BinLen);
+		_ ->
+			headers_malformed(Stream, State,
+				'Multiple content-length headers were received. (RFC7230 3.3.2)')
+	end.
+
+headers_parse_expected_size(Stream=#stream{id=_StreamID},
+		State, IsFin, Type, DecData, PseudoHeaders, Headers, BinLen) ->
+	try cow_http_hd:parse_content_length(BinLen) of
+		Len ->
+			headers_frame(Stream, State, IsFin, Type, DecData, PseudoHeaders, Headers, Len)
+	catch
+		_:_ ->
+			HumanReadable = 'The content-length header is invalid. (RFC7230 3.3.2)',
+			case Type of
+				request -> headers_malformed(Stream, State, HumanReadable)%;
+%				response -> stream_reset(StreamID, State, protocol_error, HumanReadable)
+			end
+	end.
+
+headers_frame(Stream0, State0=#http3_machine{local_decoder_ref=DecoderRef},
+		IsFin, Type, DecData, PseudoHeaders, Headers, Len) ->
+	Stream = case Type of
+		request ->
+			TE = case lists:keyfind(<<"te">>, 1, Headers) of
+				{_, TE0} -> TE0;
+				false -> undefined
+			end,
+			Stream0#stream{method=maps:get(method, PseudoHeaders),
+				remote=IsFin, remote_expected_size=Len, te=TE}%;
+%		response ->
+%			Stream1 = case PseudoHeaders of
+%				#{status := Status} when Status >= 100, Status =< 199 -> Stream0;
+%				_ -> Stream0#stream{remote=IsFin, remote_expected_size=Len}
+%			end,
+%			{Stream1, State0}
+	end,
+	State = stream_store(Stream, State0),
+	%% @todo Maybe don't return DecData if empty, but return the StreamRef with it if we must send.
+	case DecData of
+		<<>> ->
+			{ok, {headers, IsFin, Headers, PseudoHeaders, Len}, State};
+		_ ->
+			{ok, {headers, IsFin, Headers, PseudoHeaders, Len}, {DecoderRef, DecData}, State}
+	end.
+
+stream_store(#stream{ref=StreamRef, local=fin, remote=fin},
+		State=#http3_machine{streams=Streams0}) ->
+	Streams = maps:remove(StreamRef, Streams0),
+	State#http3_machine{streams=Streams};
+stream_store(Stream=#stream{ref=StreamRef},
+		State=#http3_machine{streams=Streams}) ->
+	State#http3_machine{streams=Streams#{StreamRef => Stream}}.

+ 141 - 58
src/cowboy_http3.erl

@@ -19,18 +19,24 @@
 -include_lib("quicer/include/quicer.hrl").
 
 -record(stream, {
-	id :: non_neg_integer(), %% @todo specs
-	dir :: unidi_local | unidi_remote | bidi,
-	ref :: any(), %% @todo specs
-	role :: undefined | req | control | push | encoder | decoder
+	ref :: any(), %% @todo specs; is it useful in the record?
+
+	%% Whether the stream is currently in a special state.
+	status :: header | normal | data | discard,
+
+	%% Stream buffer.
+	buffer = <<>> :: binary(),
+
+	%% Stream state.
+	state :: {module, any()}
 }).
 
 -record(state, {
 	parent :: pid(),
 	conn :: any(), %% @todo specs
 
-	%% Quick pointers for commonly used streams.
-	local_encoder_stream :: any(), %% @todo specs
+	%% HTTP/3 state machine.
+	http3_machine :: cow_http3_machine:http3_machine(),
 
 	%% Bidirectional streams are used for requests and responses.
 	streams = #{} :: map() %% @todo specs
@@ -38,11 +44,13 @@
 
 -spec init(_, _) -> no_return().
 init(Parent, Conn) ->
+	Opts = #{}, %% @todo
+	{ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(server, Opts),
 	{ok, Conn} = quicer:async_accept_stream(Conn, []),
 	%% Immediately open a control, encoder and decoder stream.
 	{ok, ControlRef} = quicer:start_stream(Conn,
 		#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
-	quicer:send(ControlRef, <<0>>), %% @todo Also send settings frame.
+	quicer:send(ControlRef, [<<0>>, SettingsBin]),
 	{ok, ControlID} = quicer:get_stream_id(ControlRef),
 	{ok, EncoderRef} = quicer:start_stream(Conn,
 		#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
@@ -52,19 +60,20 @@ init(Parent, Conn) ->
 		#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
 	quicer:send(DecoderRef, <<3>>),
 	{ok, DecoderID} = quicer:get_stream_id(DecoderRef),
+	%% Set the control, encoder and decoder streams in the machine.
+	HTTP3Machine = cow_http3_machine:init_unidi_local_streams(
+		ControlRef, ControlID, EncoderRef, EncoderID, DecoderRef, DecoderID,
+		HTTP3Machine0),
 	%% Quick! Let's go!
-	loop(#state{parent=Parent, conn=Conn, local_encoder_stream=EncoderRef, streams=#{
-		ControlRef => #stream{id=ControlID, dir=unidi_local, ref=ControlRef, role=control},
-		EncoderRef => #stream{id=EncoderID, dir=unidi_local, ref=EncoderRef, role=encoder},
-		DecoderRef => #stream{id=DecoderID, dir=unidi_local, ref=DecoderRef, role=decoder}
-	}}).
+	loop(#state{parent=Parent, conn=Conn, http3_machine=HTTP3Machine}).
 
 loop(State0=#state{conn=Conn}) ->
 	receive
 		%% Stream data.
+		%% @todo IsFin is inside Props. But it may not be set once the data was sent.
 		{quic, Data, StreamRef, Props} when is_binary(Data) ->
-			State = stream_data(Data, State0, StreamRef, Props),
-			loop(State);
+			logger:error("DATA ~p props ~p", [StreamRef, Props]),
+			parse(State0, Data, StreamRef, Props);
 		%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED
 		{quic, new_stream, StreamRef, Flags} ->
 			%% Conn does not change.
@@ -102,68 +111,142 @@ loop(State0=#state{conn=Conn}) ->
 			loop(State0)
 	end.
 
-stream_new_remote(State=#state{streams=Streams}, StreamRef, Flags) ->
-	{ok, StreamID} = quicer:get_stream_id(StreamRef),
-	{StreamDir, Role} = case quicer:is_unidirectional(Flags) of
-		true -> {unidi_remote, undefined};
-		false -> {bidi, req}
+parse(State=#state{streams=Streams}, Data, StreamRef, Props) ->
+	#{StreamRef := Stream} = Streams,
+	case Stream of
+		#stream{buffer= <<>>} ->
+			parse1(State, Data, Stream, Props);
+		#stream{buffer=Buffer} ->
+			parse1(State, <<Buffer/binary, Data/binary>>,
+				Stream#stream{buffer= <<>>}, Props)
+	end.
+
+%% @todo Swap Data and Stream/StreamRef.
+parse1(State, Data, Stream=#stream{status=header}, Props) ->
+	parse_unidirectional_stream_header(State, Data, Stream, Props);
+%% @todo Continuation clause for data frames.
+%% @todo Clause that discards receiving data for aborted streams.
+parse1(State, Data, Stream, Props) ->
+	case cow_http3:parse(Data) of
+		{ok, Frame, Rest} ->
+			parse1(frame(State, Stream, Frame, Props), Rest, Stream, Props);
+		{more, Frame, _Len} ->
+			%% @todo Change state of stream to expect more data frames.
+			loop(frame(State, Stream, Frame, Props));
+		{ignore, Rest} ->
+			parse1(ignored_frame(State, Stream), Rest, Stream, Props);
+		Error = {connection_error, _, _} ->
+			terminate(State, Error);
+		more ->
+			loop(stream_update(State, Stream#stream{buffer=Data}))
+	end.
+
+parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
+		Data, Stream0=#stream{ref=StreamRef}, Props) ->
+	case cow_http3:parse_unidi_stream_header(Data) of
+		{ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder ->
+			HTTP3Machine = cow_http3_machine:set_unidi_remote_stream_type(
+				StreamRef, Type, HTTP3Machine0),
+			State = State0#state{http3_machine=HTTP3Machine},
+			Stream = Stream0#stream{status=normal},
+			parse1(stream_update(State, Stream), Rest, Stream, Props);
+		{ok, push, _} ->
+			terminate(State0, {connection_error, h3_stream_creation_error,
+				'Only servers can push. (RFC9114 6.2.2)'});
+		%% Unknown stream types must be ignored. We choose to abort the
+		%% stream instead of reading and discarding the incoming data.
+		{undefined, _} ->
+			loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
+	end.
+
+frame(State=#state{http3_machine=HTTP3Machine0}, Stream=#stream{ref=StreamRef}, Frame, Props) ->
+	#{flags := Flags} = Props,
+	IsFin = case Flags band ?QUIC_RECEIVE_FLAG_FIN of
+		?QUIC_RECEIVE_FLAG_FIN -> fin;
+		_ -> nofin
 	end,
-	Stream = #stream{id=StreamID, dir=StreamDir, ref=StreamRef, role=Role},
-	logger:debug("new stream ~p", [Stream]),
-	State#state{streams=Streams#{StreamRef => Stream}}.
+	case cow_http3_machine:frame(Frame, IsFin, StreamRef, HTTP3Machine0) of
+		{ok, HTTP3Machine} ->
+			State#state{http3_machine=HTTP3Machine};
+		{ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP3Machine} ->
+			headers_frame(State#state{http3_machine=HTTP3Machine},
+				Stream, Headers, PseudoHeaders, BodyLen);
+		{ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen},
+				{DecoderRef, DecData}, HTTP3Machine} ->
+			%% Send the decoder data.
+			quicer:send(DecoderRef, DecData),
+			headers_frame(State#state{http3_machine=HTTP3Machine},
+				Stream, Headers, PseudoHeaders, BodyLen)
+	end.
 
-stream_data(Data, State=#state{streams=Streams}, StreamRef, _Props) ->
-	#{StreamRef := Stream} = Streams,
-	stream_data2(Data, State, Stream).
-
-stream_data2(Data, State, Stream=#stream{role=req}) ->
-	stream_data_req(State, Data, Stream);
-stream_data2(_Data, State, _Stream=#stream{role=control}) ->
-	State; %stream_data_control(...);
-stream_data2(_Data, State, _Stream=#stream{role=encoder}) ->
-	State; %stream_data_encoder(...);
-stream_data2(_Data, State, _Stream=#stream{role=decoder}) ->
-	State; %stream_data_decoder(...);
-stream_data2(Data, State, Stream=#stream{role=undefined, dir=unidi_remote}) ->
-	stream_data_undefined(State, Data, Stream).
-
-%% @todo Frame type and length are using https://www.rfc-editor.org/rfc/rfc9000.html#name-variable-length-integer-enc
-%% @todo Check stream state and update it afterwards.
-stream_data_req(State=#state{local_encoder_stream=EncoderRef},
-		Req = <<1, _Len, FieldsBin/binary>>, #stream{ref=StreamRef}) ->
-	logger:debug("data ~p~nfields ~p", [Req, cow_qpack:decode_field_section(FieldsBin, 0, cow_qpack:init())]),
-	StreamID = quicer:get_stream_id(StreamRef),
+headers_frame(State, Stream=#stream{ref=StreamRef}, Headers, PseudoHeaders, BodyLen) ->
+	logger:error("~p~n~p~n~p~n~p~n~p", [State, Stream, Headers, PseudoHeaders, BodyLen]),
+	{ok, StreamID} = quicer:get_stream_id(StreamRef),
 	{ok, Data, EncData, _} = cow_qpack:encode_field_section([
 		{<<":status">>, <<"200">>},
 		{<<"content-length">>, <<"12">>},
 		{<<"content-type">>, <<"text/plain">>}
 	], StreamID, cow_qpack:init()),
-	%% Send the encoder data.
-	quicer:send(EncoderRef, EncData),
+%	%% Send the encoder data.
+%	quicer:send(EncoderRef, EncData),
 	%% Then the response data.
 	DataLen = iolist_size(Data),
 	quicer:send(StreamRef, [<<1, DataLen>>, Data]),
 	quicer:send(StreamRef, <<0,12,"Hello world!">>, ?QUIC_SEND_FLAG_FIN),
 %	quicer:shutdown_stream(StreamRef),
-	logger:debug("sent response ~p~nenc data ~p", [iolist_to_binary([<<1, DataLen>>, Data]), EncData]),
+	logger:error("sent response ~p~nenc data ~p", [iolist_to_binary([<<1, DataLen>>, Data]), EncData]),
 	State.
 
-%% @todo stream_control
-%% @todo stream_encoder
-%% @todo stream_decoder
+%% @todo In ignored_frame we must check for example that the frame
+%%       we received wasn't the first frame in a control stream
+%%       as that one must be SETTINGS.
+ignored_frame(State, _) ->
+	State.
 
-%% @todo We should probably reject, not crash, unknown/bad types.
-stream_data_undefined(State, <<TypeBin, Rest/bits>>, Stream0) ->
-	Role = case TypeBin of
-		0 -> control;
-		2 -> encoder;
-		3 -> decoder
+stream_abort_receive(State, Stream=#stream{ref=StreamRef}, Reason) ->
+	quicer:shutdown_stream(StreamRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE,
+		error_code(Reason), infinity),
+	stream_update(State, Stream#stream{status=discard}).
+
+%% @todo
+terminate(_State, Error) ->
+	exit({shutdown, Error}).
+
+%% @todo qpack errors
+error_code(h3_no_error) -> 16#0100;
+error_code(h3_general_protocol_error) -> 16#0101;
+error_code(h3_internal_error) -> 16#0102;
+error_code(h3_stream_creation_error) -> 16#0103;
+error_code(h3_closed_critical_stream) -> 16#0104;
+error_code(h3_frame_unexpected) -> 16#0105;
+error_code(h3_frame_error) -> 16#0106;
+error_code(h3_excessive_load) -> 16#0107;
+error_code(h3_id_error) -> 16#0108;
+error_code(h3_settings_error) -> 16#0109;
+error_code(h3_missing_settings) -> 16#010a;
+error_code(h3_request_rejected) -> 16#010b;
+error_code(h3_request_cancelled) -> 16#010c;
+error_code(h3_request_incomplete) -> 16#010d;
+error_code(h3_message_error) -> 16#010e;
+error_code(h3_connect_error) -> 16#010f;
+error_code(h3_version_fallback) -> 16#0110.
+
+stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, StreamRef, Flags) ->
+	{ok, StreamID} = quicer:get_stream_id(StreamRef),
+	{StreamDir, StreamType, Status} = case quicer:is_unidirectional(Flags) of
+		true -> {unidi_remote, undefined, header};
+		false -> {bidi, req, normal}
 	end,
-	Stream = Stream0#stream{role=Role},
-	stream_data2(Rest, stream_update(State, Stream), Stream).
+	HTTP3Machine = cow_http3_machine:init_stream(StreamRef,
+		StreamID, StreamDir, StreamType, HTTP3Machine0),
+	Stream = #stream{ref=StreamRef, status=Status},
+	logger:error("new stream ~p ~p", [Stream, HTTP3Machine]),
+	State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamRef => Stream}}.
 
 stream_closed(State=#state{streams=Streams0}, StreamRef, _Flags) ->
-	{_Stream, Streams} = maps:take(StreamRef, Streams0),
+	%% @todo Some streams may not be bidi or remote. Need to inform cow_http3_machine too.
+	logger:error("stream_closed ~p", [StreamRef]),
+	Streams = maps:remove(StreamRef, Streams0),
 	%% @todo terminate stream
 	State#state{streams=Streams}.