Browse Source

Progress!!

Loïc Hoguin 1 year ago
parent
commit
7979c314e9
9 changed files with 365 additions and 204 deletions
  1. 6 0
      Makefile
  2. 1 1
      ebin/cowboy.app
  3. 2 0
      src/cow_http3_machine.erl
  4. 32 20
      src/cowboy.erl
  5. 146 171
      src/cowboy_http3.erl
  6. 154 0
      src/cowboy_quicer.erl
  7. 1 1
      test/cowboy_test.erl
  8. 3 1
      test/metrics_SUITE.erl
  9. 20 10
      test/rfc9114_SUITE.erl

+ 6 - 0
Makefile

@@ -18,6 +18,7 @@ DEPS = cowlib ranch
 dep_cowlib = git https://github.com/ninenines/cowlib qpack
 dep_ranch = git https://github.com/ninenines/ranch 1.8.0
 
+# @todo Only if COWBOY_QUICER is set.
 DEPS += quicer
 dep_quicer = git https://github.com/emqx/quic main
 
@@ -70,6 +71,11 @@ TEST_ERLC_OPTS += +'{parse_transform, eunit_autoexport}'
 
 app:: rebar.config
 
+# Fix quicer compilation for HTTP/3.
+
+autopatch-quicer::
+	$(verbose) printf "%s\n" "all: ;" > $(DEPS_DIR)/quicer/c_src/Makefile.erlang.mk
+
 # Dialyze the tests.
 
 DIALYZER_OPTS += --src -r test

+ 1 - 1
ebin/cowboy.app

@@ -1,7 +1,7 @@
 {application, 'cowboy', [
 	{description, "Small, fast, modern HTTP server."},
 	{vsn, "2.10.0"},
-	{modules, ['cow_http3','cow_http3_machine','cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_http3','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket','quic_hello_h']},
+	{modules, ['cow_http3','cow_http3_machine','cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_http3','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_quicer','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket','quic_hello_h']},
 	{registered, [cowboy_sup,cowboy_clock]},
 	{applications, [kernel,stdlib,crypto,cowlib,ranch,quicer]},
 	{mod, {cowboy_app, []}},

+ 2 - 0
src/cow_http3_machine.erl

@@ -157,11 +157,13 @@ close_bidi_stream_for_sending(StreamID, State=#http3_machine{streams=Streams}) -
 -spec close_stream(_, _) -> _. %% @todo
 
 close_stream(StreamID, State=#http3_machine{streams=Streams0}) ->
+%	ct:pal("~p close_stream ~p ~p", [self(), StreamID, Streams0]),
 	case maps:take(StreamID, Streams0) of
 		{#stream{type=control}, Streams} ->
 			{error, {connection_error, h3_closed_critical_stream,
 				'The control stream was closed. (RFC9114 6.2.1)'},
 				State#http3_machine{streams=Streams}};
+		%% @todo We should also error out when the QPACK streams get closed.
 		{_, Streams} ->
 			{ok, State#http3_machine{streams=Streams}}
 	end.

+ 32 - 20
src/cowboy.erl

@@ -82,27 +82,39 @@ start_quic(TransOpts, ProtoOpts) ->
 		{peer_unidi_stream_count, 3}, %% We only need control and QPACK enc/dec.
 		{peer_bidi_stream_count, 100}
 	|SocketOpts2],
-	{ok, Listener} = quicer:listen(Port, SocketOpts),
-	ct:pal("listen ~p", [Listener]),
-	_ListenerPid = spawn(fun AcceptLoop() ->
-		{ok, Conn} = quicer:accept(Listener, []),
-		ct:pal("accept ~p", [Conn]),
-		{ok, Conn} = quicer:handshake(Conn),
-		ct:pal("handshake ~p", [Conn]),
-		Pid = spawn(fun() ->
-			receive go -> ok end,
-			process_flag(trap_exit, true), %% @todo Only if supervisor though.
-			try cowboy_http3:init(Parent, Conn, ProtoOpts)
-			catch
-				exit:{shutdown,_} -> ok;
-				C:E:S -> ct:pal("CRASH ~p:~p:~p", [C,E,S])
-			end
-		end),
-		ok = quicer:controlling_process(Conn, Pid),
-		Pid ! go,
-		AcceptLoop()
+	_ListenerPid = spawn(fun() ->
+		{ok, Listener} = quicer:listen(Port, SocketOpts),
+		Parent ! {ok, Listener},
+		ct:pal("listen ~p", [Listener]),
+		_AcceptorPid = [spawn(fun AcceptLoop() ->
+			{ok, Conn} = quicer:accept(Listener, []),
+%			ct:pal("accept ~p", [Conn]),
+			Pid = spawn(fun() ->
+				receive go -> ok end,
+				%% We have to do the handshake after handing control of
+				%% the connection otherwise streams may come in before
+				%% the controlling process is changed and messages will
+				%% not be sent to the correct process.
+				{ok, Conn} = quicer:handshake(Conn),
+%				ct:pal("handshake ~p", [Conn]),
+				process_flag(trap_exit, true), %% @todo Only if supervisor though.
+				try cowboy_http3:init(Parent, Conn, ProtoOpts)
+				catch
+					exit:{shutdown,_} -> ok;
+					C:E:S -> ct:pal("CRASH ~p:~p:~p", [C,E,S])
+				end
+			end),
+			ok = quicer:controlling_process(Conn, Pid),
+			Pid ! go,
+			AcceptLoop()
+		end) || _ <- lists:seq(1, 20)],
+		%% Listener process must not terminate.
+		receive after infinity -> ok end
 	end),
-	{ok, Listener}.
+	receive
+		{ok, Listener} ->
+			{ok, Listener}
+	end.
 
 %% Select a random UDP port using gen_udp because quicer
 %% does not provide equivalent functionality. Taken from

+ 146 - 171
src/cowboy_http3.erl

@@ -24,11 +24,8 @@
 %% Temporary callback to do sendfile over QUIC.
 -export([send/2]).
 
--include_lib("quicer/include/quicer.hrl").
-
 -record(stream, {
 	id :: non_neg_integer(), %% @todo specs
-	ref :: any(), %% @todo specs
 
 	%% Whether the stream is currently in a special state.
 	status :: header | normal | {data, non_neg_integer()} | stopping,
@@ -55,18 +52,19 @@
 	http3_machine :: cow_http3_machine:http3_machine(),
 
 	%% Quick pointers for commonly used streams.
-	local_control_ref :: any(), %% @todo specs Control stream must not be closed.
-	local_encoder_ref :: any(), %% @todo specs
-	local_decoder_ref :: any(), %% @todo specs
+	local_control_id :: any(), %% @todo specs Control stream must not be closed.
+	local_encoder_id :: any(), %% @todo specs
+	local_decoder_id :: any(), %% @todo specs
 
 	%% Bidirectional streams used for requests and responses,
 	%% as well as unidirectional streams initiated by the client.
 	streams = #{} :: map(), %% @todo specs
+	%% @todo a ref/id map because stream_closed we don't have the id
 
 	%% Lingering streams that were recently reset. We may receive
 	%% pending data or messages for these streams a short while
 	%% after they have been reset.
-	lingering_streams = [] :: [reference()],
+	lingering_streams = [] :: [non_neg_integer()],
 
 	%% Streams can spawn zero or more children which are then managed
 	%% by this module if operating as a supervisor.
@@ -74,37 +72,30 @@
 }).
 
 -spec init(_, _, _) -> no_return().
+
 init(Parent, Conn, Opts) ->
-ct:pal("init"),
+%ct:pal("init"),
 	{ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(server, Opts),
 	%% Immediately open a control, encoder and decoder stream.
-	{ok, ControlRef} = quicer:start_stream(Conn,
-		#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
-	{ok, ControlID} = quicer:get_stream_id(ControlRef),
-	{ok, _} = quicer:send(ControlRef, [<<0>>, SettingsBin]),
-	{ok, EncoderRef} = quicer:start_stream(Conn,
-		#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
-	{ok, EncoderID} = quicer:get_stream_id(EncoderRef),
-	{ok, _} = quicer:send(EncoderRef, <<2>>),
-	{ok, DecoderRef} = quicer:start_stream(Conn,
-		#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
-	{ok, DecoderID} = quicer:get_stream_id(DecoderRef),
-	{ok, _} = quicer:send(DecoderRef, <<3>>),
+	{ok, ControlID} = cowboy_quicer:start_unidi_stream(Conn, [<<0>>, SettingsBin]),
+	{ok, EncoderID} = cowboy_quicer:start_unidi_stream(Conn, <<2>>),
+	{ok, DecoderID} = cowboy_quicer:start_unidi_stream(Conn, <<3>>),
+%ct:pal("control ~p encoder ~p decoder ~p", [ControlID, EncoderID, DecoderID]),
 	%% Set the control, encoder and decoder streams in the machine.
 	HTTP3Machine = cow_http3_machine:init_unidi_local_streams(
 		ControlID, EncoderID, DecoderID, HTTP3Machine0),
 	%% Get the peername/sockname.
-	Peer0 = quicer:peername(Conn),
-	Sock0 = quicer:sockname(Conn),
+	Peer0 = cowboy_quicer:peername(Conn),
+	Sock0 = cowboy_quicer:sockname(Conn),
 	%% @todo Get the peer certificate here if it makes sense.
 	case {Peer0, Sock0} of
 		{{ok, Peer}, {ok, Sock}} ->
 			%% Quick! Let's go!
 			loop(#state{parent=Parent, conn=Conn, opts=Opts,
 				peer=Peer, sock=Sock, http3_machine=HTTP3Machine,
-				local_control_ref=ControlRef,
-				local_encoder_ref=EncoderRef,
-				local_decoder_ref=DecoderRef});
+				local_control_id=ControlID,
+				local_encoder_id=EncoderID,
+				local_decoder_id=DecoderID});
 		{{error, Reason}, _} ->
 			terminate(undefined, {socket_error, Reason,
 				'A socket error occurred when retrieving the peer name.'});
@@ -113,51 +104,11 @@ ct:pal("init"),
 				'A socket error occurred when retrieving the sock name.'})
 	end.
 
-loop(State0=#state{conn=Conn, children=Children}) ->
+loop(State0=#state{children=Children}) ->
 %ct:pal("~p", [process_info(self(), messages)]),
 	receive
-		%% Stream data.
-		%% @todo IsFin is inside Props. But it may not be set once the data was sent.
-		{quic, Data, StreamRef, Props} when is_binary(Data) ->
-%			ct:pal("DATA ~p props ~p", [StreamRef, Props]),
-			{ok, StreamID} = quicer:get_stream_id(StreamRef),
-			parse(State0, Data, StreamID, Props);
-		%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED
-		{quic, new_stream, StreamRef, #{flags := Flags}} ->
-%			ct:pal("new_stream ~p flags ~p", [StreamRef, Flags]),
-			ok = quicer:setopt(StreamRef, active, true),
-			{ok, StreamID} = quicer:get_stream_id(StreamRef),
-			State = stream_new_remote(State0, StreamID, StreamRef, Flags),
-			loop(State);
-		%% QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE
-		{quic, stream_closed, StreamRef, Flags} ->
-%			ct:pal("stream_closed ~p flags ~p", [StreamRef, Flags]),
-			{ok, StreamID} = quicer:get_stream_id(StreamRef),
-			State = stream_closed(State0, StreamID, Flags),
-			loop(State);
-		%% QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE
-		%%
-		%% Connection closed.
-		{quic, closed, Conn, _Flags} ->
-			quicer:close_connection(Conn),
-			%% @todo terminate here?
-			ok;
-		%%
-		%% The following events are currently ignored either because
-		%% I do not know what they do or because we do not need to
-		%% take action.
-		%%
-		%% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT
-		{quic, transport_shutdown, Conn, _Flags} ->
-			%% @todo Why isn't it BY_PEER when using curl?
-			loop(State0);
-		%% QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN
-		{quic, peer_send_shutdown, _StreamRef, undefined} ->
-%			ct:pal("peer_send_shutdown ~p", [StreamRef]),
-			loop(State0);
-		%% QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE
-		{quic, send_shutdown_complete, _StreamRef, _IsGraceful} ->
-			loop(State0);
+		Msg when element(1, Msg) =:= quic ->
+			handle_quic_msg(State0, Msg);
 		%% Timeouts.
 		{timeout, Ref, {shutdown, Pid}} ->
 			cowboy_children:shutdown_timeout(Children, Ref, Pid),
@@ -173,14 +124,34 @@ loop(State0=#state{conn=Conn, children=Children}) ->
 			loop(State0)
 	end.
 
-parse(State=#state{opts=Opts}, Data, StreamID, Props) ->
+handle_quic_msg(State0, Msg) ->
+	case cowboy_quicer:handle(Msg) of
+		{data, StreamID, IsFin, Data} ->
+%			ct:pal("{data, ~p, ~p, ~p}", [StreamID, IsFin, Data]),
+			parse(State0, Data, StreamID, IsFin);
+		{stream_started, StreamID, StreamType} ->
+%			ct:pal("~p stream_started ~p ~p", [self(), StreamID, StreamType]),
+			State = stream_new_remote(State0, StreamID, StreamType),
+			loop(State);
+		{stream_closed, StreamID, ErrorCode} ->
+%			ct:pal("stream_closed ~p state ~p code ~p", [StreamID, State0, ErrorCode]),
+			State = stream_closed(State0, StreamID, ErrorCode),
+			loop(State);
+		closed ->
+			%% @todo terminate here?
+			ok;
+		ok ->
+			loop(State0)
+	end.
+
+parse(State=#state{opts=Opts}, Data, StreamID, IsFin) ->
 	case stream_get(State, StreamID) of
 		Stream=#stream{buffer= <<>>} ->
-			parse1(State, Data, Stream, Props);
+			parse1(State, Data, Stream, IsFin);
 		Stream=#stream{buffer=Buffer} ->
 			Stream1 = Stream#stream{buffer= <<>>},
 			parse1(stream_store(State, Stream1),
-				<<Buffer/binary, Data/binary>>, Stream1, Props);
+				<<Buffer/binary, Data/binary>>, Stream1, IsFin);
 		%% Pending data for a stream that has been reset. Ignore.
 		error ->
 			case is_lingering_stream(State, StreamID) of
@@ -195,30 +166,31 @@ parse(State=#state{opts=Opts}, Data, StreamID, Props) ->
 	end.
 
 %% @todo Swap Data and Stream/StreamID?
-parse1(State, Data, Stream=#stream{status=header}, Props) ->
-	parse_unidirectional_stream_header(State, Data, Stream, Props);
-parse1(State, Data, Stream=#stream{status={data, Len}, id=StreamID}, Props) ->
+parse1(State, Data, Stream=#stream{status=header}, IsFin) ->
+	parse_unidirectional_stream_header(State, Data, Stream, IsFin);
+parse1(State, Data, Stream=#stream{status={data, Len}, id=StreamID}, IsFin) ->
 	DataLen = byte_size(Data),
 	if
 		DataLen < Len ->
-			IsFin = is_fin(Props, <<>>),
+			%% We don't have the full frame but this is the end of the
+			%% data we have. So FrameIsFin is equivalent to IsFin here.
 			loop(frame(State, Stream#stream{status={data, Len - DataLen}}, {data, Data}, IsFin));
 		true ->
 			<<Data1:Len/binary, Rest/bits>> = Data,
-			IsFin = is_fin(Props, Rest),
-			parse(frame(State, Stream#stream{status=normal}, {data, Data1}, IsFin),
-				Rest, StreamID, Props)
+			FrameIsFin = is_fin(IsFin, Rest),
+			parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin),
+				Rest, StreamID, IsFin)
 	end;
 %% @todo Clause that discards receiving data for stopping streams.
 %%       We may receive a few more frames after we abort receiving.
-parse1(State, Data, Stream=#stream{id=StreamID}, Props) ->
+parse1(State, Data, Stream=#stream{id=StreamID}, IsFin) ->
 	case cow_http3:parse(Data) of
 		{ok, Frame, Rest} ->
-			IsFin = is_fin(Props, Rest),
+			FrameIsFin = is_fin(IsFin, Rest),
 %			ct:pal("parse1 Frame= ~p Rest= ~p", [Frame, Rest]),
-			parse(frame(State, Stream, Frame, IsFin), Rest, StreamID, Props);
+			parse(frame(State, Stream, Frame, FrameIsFin), Rest, StreamID, IsFin);
 		{more, Frame, Len} ->
-			IsFin = is_fin(Props, <<>>),
+			%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
 			case IsFin of
 				nofin ->
 					loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin));
@@ -227,13 +199,13 @@ parse1(State, Data, Stream=#stream{id=StreamID}, Props) ->
 						'Last frame on stream was truncated. (RFC9114 7.1)'})
 			end;
 		{ignore, Rest} ->
-			parse(ignored_frame(State, Stream), Rest, StreamID, Props);
+			parse(ignored_frame(State, Stream), Rest, StreamID, IsFin);
 		Error = {connection_error, _, _} ->
 			terminate(State, Error);
 		more when Data =:= <<>> ->
 			loop(stream_store(State, Stream#stream{buffer=Data}));
 		more ->
-			IsFin = is_fin(Props, <<>>),
+			%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
 			case IsFin of
 				nofin ->
 					loop(stream_store(State, Stream#stream{buffer=Data}));
@@ -247,14 +219,11 @@ parse1(State, Data, Stream=#stream{id=StreamID}, Props) ->
 %% The FIN flag applies to the QUIC packet, not to the frame.
 %% We must therefore only consider the frame to have a FIN
 %% flag if there's no data remaining to be read.
-is_fin(#{flags := Flags}, Rest) ->
-	case Flags band ?QUIC_RECEIVE_FLAG_FIN of
-		?QUIC_RECEIVE_FLAG_FIN when Rest =:= <<>> -> fin;
-		_ -> nofin
-	end.
+is_fin(fin, <<>>) -> fin;
+is_fin(_, _) -> nofin.
 
 parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
-		Data, Stream0=#stream{id=StreamID}, Props) ->
+		Data, Stream0=#stream{id=StreamID}, IsFin) ->
 	case cow_http3:parse_unidi_stream_header(Data) of
 		{ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder ->
 			case cow_http3_machine:set_unidi_remote_stream_type(
@@ -262,7 +231,7 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
 				{ok, HTTP3Machine} ->
 					State = State0#state{http3_machine=HTTP3Machine},
 					Stream = Stream0#stream{status=normal},
-					parse(stream_store(State, Stream), Rest, StreamID, Props);
+					parse(stream_store(State, Stream), Rest, StreamID, IsFin);
 				{error, Error={connection_error, _, _}, HTTP3Machine} ->
 					terminate(State0#state{http3_machine=HTTP3Machine}, Error)
 			end;
@@ -275,7 +244,7 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
 			loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
 	end.
 
-frame(State=#state{http3_machine=HTTP3Machine0, local_decoder_ref=DecoderRef},
+frame(State=#state{http3_machine=HTTP3Machine0, conn=Conn, local_decoder_id=DecoderID},
 		Stream=#stream{id=StreamID}, Frame, IsFin) ->
 %	ct:pal("cowboy frame ~p ~p", [Frame, IsFin]),
 	case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of
@@ -289,7 +258,7 @@ frame(State=#state{http3_machine=HTTP3Machine0, local_decoder_ref=DecoderRef},
 				Stream, IsFin, Headers, PseudoHeaders, BodyLen);
 		{ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, DecData, HTTP3Machine} ->
 			%% Send the decoder data.
-			{ok, _} = quicer:send(DecoderRef, DecData),
+			ok = cowboy_quicer:send(Conn, DecoderID, DecData),
 			headers_frame(State#state{http3_machine=HTTP3Machine},
 				Stream, IsFin, Headers, PseudoHeaders, BodyLen);
 		{ok, {trailers, _Trailers}, HTTP3Machine} ->
@@ -405,7 +374,7 @@ headers_to_map([{Name, Value}|Tail], Acc0) ->
 	headers_to_map(Tail, Acc).
 
 headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
-ct:pal("req ~p", [Req]),
+%ct:pal("req ~p", [Req]),
 	try cowboy_stream:init(StreamID, Req, Opts) of
 		{Commands, StreamState} ->
 %logger:error("~p", [Commands]),
@@ -528,7 +497,7 @@ commands(State0, Stream, [{inform, StatusCode, Headers}|Tail]) ->
 	commands(State, Stream, Tail);
 %% Send response headers.
 commands(State0, Stream, [{response, StatusCode, Headers, Body}|Tail]) ->
-%	ct:pal("commands response ~p ~p ~p", [StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]),
+%	ct:pal("~p commands response ~p ~p ~p", [self(), StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]),
 	State = send_response(State0, Stream, StatusCode, Headers, Body),
 	commands(State, Stream, Tail);
 %% Send response headers.
@@ -537,34 +506,33 @@ commands(State0, Stream, [{headers, StatusCode, Headers}|Tail]) ->
 	State = send_headers(State0, Stream, nofin, StatusCode, Headers),
 	commands(State, Stream, Tail);
 %%% Send a response body chunk.
-commands(State0, Stream=#stream{ref=StreamRef}, [{data, IsFin, Data}|Tail]) ->
+commands(State0=#state{conn=Conn}, Stream=#stream{id=StreamID}, [{data, IsFin, Data}|Tail]) ->
 %	ct:pal("commands data ~p ~p", [IsFin, try iolist_size(Data) catch _:_ -> Data end]),
 	_ = case Data of
 		{sendfile, Offset, Bytes, Path} ->
 			%% Temporary solution to do sendfile over QUIC.
-			{ok, _} = ranch_transport:sendfile(?MODULE, StreamRef,
+			{ok, _} = ranch_transport:sendfile(?MODULE, {Conn, StreamID},
 				Path, Offset, Bytes, []),
-			{ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(IsFin));
+			ok = cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), IsFin);
 		_ ->
-			{ok, _} = quicer:send(StreamRef, cow_http3:data(Data), send_flag(IsFin))
+			ok = cowboy_quicer:send(Conn, StreamID, cow_http3:data(Data), IsFin)
 	end,
 	State = maybe_send_is_fin(State0, Stream, IsFin),
 	commands(State, Stream, Tail);
 %%% Send trailers.
-commands(State=#state{http3_machine=HTTP3Machine0},
-		Stream=#stream{id=StreamID, ref=StreamRef},
-		[{trailers, Trailers}|Tail]) ->
+commands(State=#state{conn=Conn, http3_machine=HTTP3Machine0},
+		Stream=#stream{id=StreamID}, [{trailers, Trailers}|Tail]) ->
 %	ct:pal("commands trailers ~p", [Trailers]),
 	HTTP3Machine = case cow_http3_machine:prepare_trailers(
 			StreamID, HTTP3Machine0, maps:to_list(Trailers)) of
 		{trailers, HeaderBlock, _EncData, HTTP3Machine1} ->
-			ct:pal("trailers"),
+%			ct:pal("trailers"),
 			%% @todo EncData!!
-			{ok, _} = quicer:send(StreamRef, cow_http3:headers(HeaderBlock), send_flag(fin)),
+			ok = cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), fin),
 			HTTP3Machine1;
 		{no_trailers, HTTP3Machine1} ->
-			ct:pal("no_trailers"),
-			{ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(fin)),
+%			ct:pal("no_trailers"),
+			ok = cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), fin),
 			HTTP3Machine1
 	end,
 	commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
@@ -638,7 +606,7 @@ commands(State0, Stream0=#stream{id=StreamID},
 commands(State, Stream, [{set_options, _Opts}|Tail]) ->
 	commands(State, Stream, Tail);
 commands(State, Stream, [stop|_Tail]) ->
-	ct:pal("stop"),
+%	ct:pal("stop"),
 	%% @todo Do we want to run the commands after a stop?
 	%% @todo Do we even allow commands after?
 	stop_stream(stream_store(State, Stream), Stream);
@@ -647,8 +615,8 @@ commands(State=#state{opts=Opts}, Stream, [Log={log, _, _, _}|Tail]) ->
 	cowboy:log(Log, Opts),
 	commands(State, Stream, Tail).
 
-send_response(State0=#state{http3_machine=HTTP3Machine0},
-		Stream=#stream{id=StreamID, ref=StreamRef}, StatusCode, Headers, Body) ->
+send_response(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+		Stream=#stream{id=StreamID}, StatusCode, Headers, Body) ->
 	Size = case Body of
 		{sendfile, _, Bytes0, _} -> Bytes0;
 		_ -> iolist_size(Body)
@@ -668,16 +636,18 @@ send_response(State0=#state{http3_machine=HTTP3Machine0},
 			%% @todo It might be better to do async sends.
 			_ = case Body of
 				{sendfile, Offset, Bytes, Path} ->
-					{ok, _} = quicer:send(StreamRef, cow_http3:headers(HeaderBlock)),
+					ok = cowboy_quicer:send(Conn, StreamID,
+						cow_http3:headers(HeaderBlock)),
 					%% Temporary solution to do sendfile over QUIC.
-					{ok, _} = ranch_transport:sendfile(?MODULE, StreamRef,
+					{ok, _} = ranch_transport:sendfile(?MODULE, {Conn, StreamID},
 						Path, Offset, Bytes, []),
-					{ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(fin));
+					ok = cowboy_quicer:send(Conn, StreamID,
+						cow_http3:data(<<>>), fin);
 				_ ->
-					{ok, _} = quicer:send(StreamRef, [
+					ok = cowboy_quicer:send(Conn, StreamID, [
 						cow_http3:headers(HeaderBlock),
 						cow_http3:data(Body)
-					], send_flag(fin))
+					], fin)
 			end,
 			maybe_send_is_fin(State0#state{http3_machine=HTTP3Machine}, Stream, fin)
 	end.
@@ -690,19 +660,18 @@ maybe_send_is_fin(State, _, _) ->
 	State.
 
 %% Temporary callback to do sendfile over QUIC.
-send(StreamRef, IoData) ->
-	case quicer:send(StreamRef, cow_http3:data(IoData)) of
-		{ok, _} -> ok;
-		Error -> Error
-	end.
+-spec send(_, _) -> _. %% @todo
+
+send({Conn, StreamID}, IoData) ->
+	cowboy_quicer:send(Conn, StreamID, cow_http3:data(IoData)).
 
-send_headers(State=#state{http3_machine=HTTP3Machine0},
-		#stream{id=StreamID, ref=StreamRef}, IsFin0, StatusCode, Headers) ->
+send_headers(State=#state{conn=Conn, http3_machine=HTTP3Machine0},
+		#stream{id=StreamID}, IsFin0, StatusCode, Headers) ->
 	{ok, IsFin, HeaderBlock, _EncData, HTTP3Machine}
 		= cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, IsFin0,
 			#{status => cow_http:status_to_integer(StatusCode)},
 			headers_to_list(Headers)),
-	quicer:send(StreamRef, cow_http3:headers(HeaderBlock), send_flag(IsFin)),
+	ok = cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), IsFin),
 	%% @todo Send _EncData.
 	State#state{http3_machine=HTTP3Machine}.
 
@@ -713,20 +682,18 @@ headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
 headers_to_list(Headers) ->
 	maps:to_list(Headers).
 
-send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE;
-send_flag(fin) -> ?QUIC_SEND_FLAG_FIN.
-
-reset_stream(State0=#state{http3_machine=HTTP3Machine0},
-		Stream=#stream{id=StreamID, ref=StreamRef}, Error) ->
-%ct:pal("~p ~p", [Stream, Error]),
+reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+		Stream=#stream{id=StreamID}, Error) ->
+%ct:pal("~p reset_stream ~p ~p", [self(), Stream, Error]),
 	Reason = case Error of
 		{internal_error, _, _} -> h3_internal_error;
 		{stream_error, Reason0, _} -> Reason0
 	end,
 	%% @todo Do we want to close both sides?
 	%% @todo Should we close the send side if the receive side was already closed?
-	quicer:shutdown_stream(StreamRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT,
-		cow_http3:error_to_code(Reason), infinity),
+	Res = cowboy_quicer:shutdown_stream(Conn, StreamID,
+		both, cow_http3:error_to_code(Reason)),
+%	ct:pal("~p reset_stream res ~p", [self(), Res]),
 	State1 = case cow_http3_machine:reset_stream(StreamID, HTTP3Machine0) of
 		{ok, HTTP3Machine} ->
 			terminate_stream(State0#state{http3_machine=HTTP3Machine}, Stream, Error);
@@ -804,9 +771,9 @@ ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{id=StreamID}) -
 			terminate(State#state{http3_machine=HTTP3Machine}, Error)
 	end.
 
-stream_abort_receive(State, Stream=#stream{ref=StreamRef}, Reason) ->
-	quicer:shutdown_stream(StreamRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE,
-		cow_http3:error_to_code(Reason), infinity),
+stream_abort_receive(State=#state{conn=Conn}, Stream=#stream{id=StreamID}, Reason) ->
+	cowboy_quicer:shutdown_stream(Conn, StreamID,
+		receiving, cow_http3:error_to_code(Reason)),
 	stream_store(State, Stream#stream{status=stopping}).
 
 %% @todo Graceful connection shutdown.
@@ -820,7 +787,7 @@ terminate(State=#state{conn=Conn, %http3_status=Status,
 %	if
 %		Status =:= connected; Status =:= closing_initiated ->
 %% @todo
-%			{ok, _} = quicer:send(ControlRef, cow_http3:goaway(
+%			ok = cowboy_quicer:send(Conn, ControlID, cow_http3:goaway(
 %				cow_http3_machine:get_last_streamid(HTTP3Machine))),
 		%% We already sent the GOAWAY frame.
 %		Status =:= closing ->
@@ -829,9 +796,7 @@ terminate(State=#state{conn=Conn, %http3_status=Status,
 	terminate_all_streams(State, maps:to_list(Streams), Reason),
 	cowboy_children:terminate(Children),
 %	terminate_linger(State),
-	quicer:shutdown_connection(Conn,
-		?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
-		cow_http3:error_to_code(terminate_reason(Reason))),
+	cowboy_quicer:shutdown(Conn, cow_http3:error_to_code(terminate_reason(Reason))),
 	exit({shutdown, Reason}).
 
 terminate_reason({connection_error, Reason, _}) -> Reason;
@@ -853,49 +818,59 @@ stream_get(#state{streams=Streams}, StreamID) ->
 	maps:get(StreamID, Streams, error).
 
 stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
-		StreamID, StreamRef, Flags) ->
-	{HTTP3Machine, Status} = case quicer:is_unidirectional(Flags) of
-		true ->
+		StreamID, StreamType) ->
+	{HTTP3Machine, Status} = case StreamType of
+		unidi ->
 			{cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0),
 				header};
-		false ->
+		bidi ->
 			{cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0),
 				normal}
 	end,
-	Stream = #stream{id=StreamID, ref=StreamRef, status=Status},
-%	ct:pal("new stream ~p ~p", [Stream, HTTP3Machine]),
+	Stream = #stream{id=StreamID, status=Status},
 	State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
 
-stream_closed(State=#state{opts=Opts, http3_machine=HTTP3Machine0,
-		streams=Streams0, children=Children0}, StreamID, #{error := ErrorCode}) ->
+%% Stream closed message for a local (write-only) unidi stream.
+stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) ->
+	stream_closed1(State, StreamID);
+stream_closed(State=#state{local_encoder_id=StreamID}, StreamID, _) ->
+	stream_closed1(State, StreamID);
+stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
+	stream_closed1(State, StreamID);
+stream_closed(State=#state{opts=Opts,
+		streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
+	case maps:take(StreamID, Streams0) of
+		{#stream{state=undefined}, Streams} ->
+			%% Unidi stream has no handler/children.
+			stream_closed1(State#state{streams=Streams}, StreamID);
+		%% We only stop bidi streams if the stream was closed with an error
+		%% or the stream was already in the process of stopping.
+		{#stream{status=Status, state=StreamState}, Streams}
+				when Status =:= stopping; ErrorCode =/= 0 ->
+			terminate_stream_handler(State, StreamID, closed, StreamState),
+			Children = cowboy_children:shutdown(Children0, StreamID),
+			stream_closed1(State#state{streams=Streams, children=Children}, StreamID);
+		%% Don't remove a stream that terminated properly but
+		%% has chosen to remain up (custom stream handlers).
+		{_, _} ->
+			stream_closed1(State, StreamID);
+		%% Stream closed message for a stream that has been reset. Ignore.
+		error ->
+			case is_lingering_stream(State, StreamID) of
+				true ->
+					ok;
+				false ->
+					%% We avoid logging the data as it could be quite large.
+					cowboy:log(warning, "Received stream_closed for unknown stream ~p. ~p ~p",
+						[StreamID, self(), Streams0], Opts)
+			end,
+			State
+	end.
+
+stream_closed1(State=#state{http3_machine=HTTP3Machine0}, StreamID) ->
 	case cow_http3_machine:close_stream(StreamID, HTTP3Machine0) of
 		{ok, HTTP3Machine} ->
-			case maps:take(StreamID, Streams0) of
-				{#stream{state=undefined}, Streams} ->
-					%% Unidi stream has no handler/children.
-					State#state{http3_machine=HTTP3Machine, streams=Streams};
-				%% We only stop bidi streams if the stream was closed with an error
-				%% or the stream was already in the process of stopping.
-				{#stream{status=Status, state=StreamState}, Streams}
-						when Status =:= stopping; ErrorCode =/= 0 ->
-					terminate_stream_handler(State, StreamID, closed, StreamState),
-					Children = cowboy_children:shutdown(Children0, StreamID),
-					State#state{http3_machine=HTTP3Machine, streams=Streams, children=Children};
-				%% Don't remove a stream that terminated properly but
-				%% has chosen to remain up (custom stream handlers).
-				{_, _} ->
-					State#state{http3_machine=HTTP3Machine};
-				error ->
-					case is_lingering_stream(State, StreamID) of
-						true ->
-							ok;
-						false ->
-							%% We avoid logging the data as it could be quite large.
-							cowboy:log(warning, "Received stream_closed for unknown stream ~p.",
-								[StreamID], Opts)
-					end,
-					State
-			end;
+			State#state{http3_machine=HTTP3Machine};
 		{error, Error={connection_error, _, _}, HTTP3Machine} ->
 			terminate(State#state{http3_machine=HTTP3Machine}, Error)
 	end.

+ 154 - 0
src/cowboy_quicer.erl

@@ -0,0 +1,154 @@
+%% Copyright (c) 2023, Loïc Hoguin <essen@ninenines.eu>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% QUIC transport using the emqx/quicer NIF.
+
+-module(cowboy_quicer).
+
+%% Connection.
+-export([peername/1]).
+-export([sockname/1]).
+-export([shutdown/2]).
+
+%% Streams.
+-export([start_unidi_stream/2]).
+-export([send/3]).
+-export([send/4]).
+-export([shutdown_stream/4]).
+
+%% Messages.
+-export([handle/1]).
+
+-define(COWBOY_QUICER, 1). %% @todo Remove when merging to master.
+
+-ifndef(COWBOY_QUICER).
+
+%% @todo Error out on all callbacks.
+
+-else.
+
+-include_lib("quicer/include/quicer.hrl").
+
+%% Connection.
+
+-spec peername(_) -> _. %% @todo
+
+peername(Conn) ->
+	quicer:peername(Conn).
+
+-spec sockname(_) -> _. %% @todo
+
+sockname(Conn) ->
+	quicer:sockname(Conn).
+
+-spec shutdown(_, _) -> _. %% @todo
+
+shutdown(Conn, ErrorCode) ->
+	quicer:shutdown_connection(Conn,
+		?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
+		ErrorCode).
+
+%% Streams.
+
+-spec start_unidi_stream(_, _) -> _. %% @todo
+
+start_unidi_stream(Conn, HeaderData) ->
+	{ok, StreamRef} = quicer:start_stream(Conn,
+		#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
+	{ok, _} = quicer:send(StreamRef, HeaderData),
+	{ok, StreamID} = quicer:get_stream_id(StreamRef),
+	put({quicer_stream, StreamID}, StreamRef),
+	{ok, StreamID}.
+
+-spec send(_, _, _) -> _. %% @todo
+
+send(Conn, StreamID, Data) ->
+	send(Conn, StreamID, Data, nofin).
+
+-spec send(_, _, _, _) -> _. %% @todo
+
+send(_Conn, StreamID, Data, IsFin) ->
+	StreamRef = get({quicer_stream, StreamID}),
+	Size = iolist_size(Data),
+	case quicer:send(StreamRef, Data, send_flag(IsFin)) of
+		{ok, _} -> ok;
+		Error -> Error
+	end.
+
+send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE;
+send_flag(fin) -> ?QUIC_SEND_FLAG_FIN.
+
+-spec shutdown_stream(_, _, _, _) -> _. %% @todo
+
+shutdown_stream(_Conn, StreamID, Dir, ErrorCode) ->
+	StreamRef = get({quicer_stream, StreamID}),
+	Res = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity),
+%	ct:pal("~p shutdown_stream res ~p", [self(), Res]),
+	ok.
+
+shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT;
+shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE.
+
+%% Messages.
+
+%% @todo Probably should have the Conn given too?
+-spec handle(_) -> _. %% @todo
+
+handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) ->
+	{ok, StreamID} = quicer:get_stream_id(StreamRef),
+	IsFin = case Flags band ?QUIC_RECEIVE_FLAG_FIN of
+		?QUIC_RECEIVE_FLAG_FIN -> fin;
+		_ -> nofin
+	end,
+	{data, StreamID, IsFin, Data};
+%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED.
+handle({quic, new_stream, StreamRef, #{flags := Flags}}) ->
+	ok = quicer:setopt(StreamRef, active, true),
+	{ok, StreamID} = quicer:get_stream_id(StreamRef),
+	put({quicer_stream, StreamID}, StreamRef),
+	StreamType = case quicer:is_unidirectional(Flags) of
+		true -> unidi;
+		false -> bidi
+	end,
+	{stream_started, StreamID, StreamType};
+%% QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE.
+handle({quic, stream_closed, StreamRef, #{error := ErrorCode}}) ->
+	{ok, StreamID} = quicer:get_stream_id(StreamRef),
+	{stream_closed, StreamID, ErrorCode};
+%% QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE.
+handle({quic, closed, Conn, _Flags}) ->
+	quicer:close_connection(Conn),
+	closed;
+%% The following events are currently ignored either because
+%% I do not know what they do or because we do not need to
+%% take action.
+handle({quic, streams_available, _Conn, Props}) ->
+	ok;
+handle({quic, dgram_state_changed, _Conn, _Props}) ->
+	ok;
+%% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT
+handle({quic, transport_shutdown, _Conn, _Flags}) ->
+	ok;
+handle({quic, peer_send_shutdown, _StreamRef, undefined}) ->
+%	ct:pal("peer_send_shutdown ~p", [StreamRef]),
+	ok;
+handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) ->
+	ok;
+handle({quic, shutdown, _Conn, success}) ->
+	ok;
+handle(Msg) ->
+	ct:pal("Ignoring quicer message ~p", [Msg]),
+	ok.
+
+-endif.

+ 1 - 1
test/cowboy_test.erl

@@ -91,7 +91,7 @@ common_groups(Tests) ->
 		{https, Opts, Tests},
 		{h2, Opts, Tests},
 		{h2c, Opts, Tests},
-		{h3, [], Tests}, %% @todo Enable parallel when issues get fixed.
+		{h3, [parallel], Tests}, %% @todo Enable parallel when issues get fixed.
 		{http_compress, Opts, Tests},
 		{https_compress, Opts, Tests},
 		{h2_compress, Opts, Tests},

+ 3 - 1
test/metrics_SUITE.erl

@@ -380,7 +380,9 @@ stream_reply(Config) ->
 ws(Config) ->
 	case config(protocol, Config) of
 		http -> do_ws(Config);
-		http2 -> doc("It is not currently possible to switch to Websocket over HTTP/2.")
+		%% @todo The test can be implemented for HTTP/2.
+		http2 -> doc("It is not currently possible to switch to Websocket over HTTP/2.");
+		http3 -> {skip, "Gun does not currently support Websocket over HTTP/3."}
 	end.
 
 do_ws(Config) ->

+ 20 - 10
test/rfc9114_SUITE.erl

@@ -21,15 +21,15 @@
 
 -include_lib("quicer/include/quicer.hrl").
 
-all() -> [{group, quic}].
+all() -> [{group, h3}].
 
 groups() ->
 	%% @todo Enable parallel tests but for this issues in the
 	%% QUIC accept loop need to be figured out (can't connect
 	%% concurrently somehow, no backlog?).
-	[{quic, [], ct_helper:all(?MODULE)}].
+	[{h3, [], ct_helper:all(?MODULE)}].
 
-init_per_group(Name = quic, Config) ->
+init_per_group(Name = h3, Config) ->
 	cowboy_test:init_http3(Name, #{
 		env => #{dispatch => cowboy_router:compile(init_routes(Config))}
 	}, Config).
@@ -50,7 +50,7 @@ alpn(Config) ->
 	doc("Successful ALPN negotiation. (RFC9114 3.1)"),
 	{ok, Conn} = quicer:connect("localhost", config(port, Config),
 		#{alpn => ["h3"], verify => none}, 5000),
-	{ok, <<"h3">>} = quicer:getopt(Conn, param_tls_negotiated_alpn, quic_tls),
+	{ok, <<"h3">>} = quicer:negotiated_protocol(Conn),
 	%% To make sure the connection is fully established we wait
 	%% to receive the SETTINGS frame on the control stream.
 	{ok, _ControlRef, _Settings} = do_wait_settings(Conn),
@@ -1219,12 +1219,20 @@ unidi_allow_at_least_three(Config) ->
 		#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
 	{ok, _} = quicer:send(DecoderRef, <<3>>),
 	%% Streams shouldn't get closed.
-	receive
-		Msg ->
-			error(Msg)
-	after 1000 ->
-		ok
-	end.
+	fun Loop() ->
+		receive
+			%% We don't care about these messages.
+			{quic, dgram_state_changed, Conn, _} ->
+				Loop();
+			{quic, peer_needs_streams, Conn, _} ->
+				Loop();
+			%% Any other we do care.
+			Msg ->
+				error(Msg)
+		after 1000 ->
+			ok
+		end
+	end().
 
 unidi_create_critical_first(Config) ->
 	doc("Endpoints should create the HTTP control stream as well as "
@@ -1495,6 +1503,8 @@ control_local_closed_abort(Config) ->
 	%% Close the control stream.
 	quicer:async_shutdown_stream(ControlRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 0),
 	%% The connection should have been closed.
+	timer:sleep(1000),
+	ct:pal("~p", [process_info(self(), messages)]),
 	#{reason := h3_closed_critical_stream} = do_wait_connection_closed(Conn),
 	ok.