|
@@ -15,10 +15,7 @@
|
|
|
%% A key difference between cowboy_http2 and cowboy_http3
|
|
|
%% is that HTTP/3 streams are QUIC streams and therefore
|
|
|
%% much of the connection state is handled outside of
|
|
|
-%% Cowboy. The quicer library uses a reference for
|
|
|
-%% identifying streams, and we use that same reference
|
|
|
-%% for our StreamID internally. The real StreamID can
|
|
|
-%% be retrieved via quicer:get_stream_id(StreamRef).
|
|
|
+%% Cowboy.
|
|
|
|
|
|
-module(cowboy_http3).
|
|
|
|
|
@@ -30,12 +27,11 @@
|
|
|
-include_lib("quicer/include/quicer.hrl").
|
|
|
|
|
|
-record(stream, {
|
|
|
- %% @todo We shouldn't use the QUIC reference because it is a NIF object
|
|
|
- %% and if it's stored somewhere by the user it'll never get GC.
|
|
|
+ id :: non_neg_integer(), %% @todo specs
|
|
|
ref :: any(), %% @todo specs
|
|
|
|
|
|
%% Whether the stream is currently in a special state.
|
|
|
- status :: header | normal | {data, non_neg_integer()} | discard,
|
|
|
+ status :: header | normal | {data, non_neg_integer()} | stopping,
|
|
|
|
|
|
%% Stream buffer.
|
|
|
buffer = <<>> :: binary(),
|
|
@@ -58,7 +54,13 @@
|
|
|
%% HTTP/3 state machine.
|
|
|
http3_machine :: cow_http3_machine:http3_machine(),
|
|
|
|
|
|
- %% Bidirectional streams are used for requests and responses.
|
|
|
+ %% Quick pointers for commonly used streams.
|
|
|
+ local_control_ref :: any(), %% @todo specs Control stream must not be closed.
|
|
|
+ local_encoder_ref :: any(), %% @todo specs
|
|
|
+ local_decoder_ref :: any(), %% @todo specs
|
|
|
+
|
|
|
+ %% Bidirectional streams used for requests and responses,
|
|
|
+ %% as well as unidirectional streams initiated by the client.
|
|
|
streams = #{} :: map(), %% @todo specs
|
|
|
|
|
|
%% Lingering streams that were recently reset. We may receive
|
|
@@ -77,16 +79,19 @@ init(Parent, Conn, Opts) ->
|
|
|
%% Immediately open a control, encoder and decoder stream.
|
|
|
{ok, ControlRef} = quicer:start_stream(Conn,
|
|
|
#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
|
|
|
+ {ok, ControlID} = quicer:get_stream_id(ControlRef),
|
|
|
{ok, _} = quicer:send(ControlRef, [<<0>>, SettingsBin]),
|
|
|
{ok, EncoderRef} = quicer:start_stream(Conn,
|
|
|
#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
|
|
|
+ {ok, EncoderID} = quicer:get_stream_id(EncoderRef),
|
|
|
{ok, _} = quicer:send(EncoderRef, <<2>>),
|
|
|
{ok, DecoderRef} = quicer:start_stream(Conn,
|
|
|
#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
|
|
|
+ {ok, DecoderID} = quicer:get_stream_id(DecoderRef),
|
|
|
{ok, _} = quicer:send(DecoderRef, <<3>>),
|
|
|
%% Set the control, encoder and decoder streams in the machine.
|
|
|
HTTP3Machine = cow_http3_machine:init_unidi_local_streams(
|
|
|
- ControlRef, EncoderRef, DecoderRef, HTTP3Machine0),
|
|
|
+ ControlID, EncoderID, DecoderID, HTTP3Machine0),
|
|
|
%% Get the peername/sockname.
|
|
|
Peer0 = quicer:peername(Conn),
|
|
|
Sock0 = quicer:sockname(Conn),
|
|
@@ -95,7 +100,10 @@ init(Parent, Conn, Opts) ->
|
|
|
{{ok, Peer}, {ok, Sock}} ->
|
|
|
%% Quick! Let's go!
|
|
|
loop(#state{parent=Parent, conn=Conn, opts=Opts,
|
|
|
- peer=Peer, sock=Sock, http3_machine=HTTP3Machine});
|
|
|
+ peer=Peer, sock=Sock, http3_machine=HTTP3Machine,
|
|
|
+ local_control_ref=ControlRef,
|
|
|
+ local_encoder_ref=EncoderRef,
|
|
|
+ local_decoder_ref=DecoderRef});
|
|
|
{{error, Reason}, _} ->
|
|
|
terminate(undefined, {socket_error, Reason,
|
|
|
'A socket error occurred when retrieving the peer name.'});
|
|
@@ -110,17 +118,20 @@ loop(State0=#state{conn=Conn}) ->
|
|
|
%% @todo IsFin is inside Props. But it may not be set once the data was sent.
|
|
|
{quic, Data, StreamRef, Props} when is_binary(Data) ->
|
|
|
% ct:pal("DATA ~p props ~p", [StreamRef, Props]),
|
|
|
- parse(State0, Data, StreamRef, Props);
|
|
|
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
|
|
|
+ parse(State0, Data, StreamID, Props);
|
|
|
%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED
|
|
|
{quic, new_stream, StreamRef, #{flags := Flags}} ->
|
|
|
% ct:pal("new_stream ~p flags ~p", [StreamRef, Flags]),
|
|
|
ok = quicer:setopt(StreamRef, active, true),
|
|
|
- State = stream_new_remote(State0, StreamRef, Flags),
|
|
|
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
|
|
|
+ State = stream_new_remote(State0, StreamID, StreamRef, Flags),
|
|
|
loop(State);
|
|
|
%% QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE
|
|
|
{quic, stream_closed, StreamRef, Flags} ->
|
|
|
% ct:pal("stream_closed ~p flags ~p", [StreamRef, Flags]),
|
|
|
- State = stream_closed(State0, StreamRef, Flags),
|
|
|
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
|
|
|
+ State = stream_closed(State0, StreamID, Flags),
|
|
|
loop(State);
|
|
|
%% QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE
|
|
|
%%
|
|
@@ -146,8 +157,8 @@ loop(State0=#state{conn=Conn}) ->
|
|
|
{quic, send_shutdown_complete, _StreamRef, _IsGraceful} ->
|
|
|
loop(State0);
|
|
|
%% Messages pertaining to a stream.
|
|
|
- {{Pid, StreamRef}, Msg} when Pid =:= self() ->
|
|
|
- loop(info(State0, StreamRef, Msg));
|
|
|
+ {{Pid, StreamID}, Msg} when Pid =:= self() ->
|
|
|
+ loop(info(State0, StreamID, Msg));
|
|
|
%% Exit signal from children.
|
|
|
Msg = {'EXIT', Pid, _} ->
|
|
|
loop(down(State0, Pid, Msg));
|
|
@@ -156,34 +167,31 @@ loop(State0=#state{conn=Conn}) ->
|
|
|
loop(State0)
|
|
|
end.
|
|
|
|
|
|
-parse(State=#state{streams=Streams, opts=Opts}, Data, StreamRef, Props) ->
|
|
|
- case Streams of
|
|
|
- #{StreamRef := Stream=#stream{buffer= <<>>}} ->
|
|
|
+parse(State=#state{opts=Opts}, Data, StreamID, Props) ->
|
|
|
+ case stream_get(State, StreamID) of
|
|
|
+ Stream=#stream{buffer= <<>>} ->
|
|
|
parse1(State, Data, Stream, Props);
|
|
|
- #{StreamRef := Stream=#stream{buffer=Buffer}} ->
|
|
|
- %% @todo OK we should only keep the StreamRef forward
|
|
|
- %% and update the stream in the state here.
|
|
|
+ Stream=#stream{buffer=Buffer} ->
|
|
|
Stream1 = Stream#stream{buffer= <<>>},
|
|
|
- parse1(stream_update(State, Stream1),
|
|
|
+ parse1(stream_store(State, Stream1),
|
|
|
<<Buffer/binary, Data/binary>>, Stream1, Props);
|
|
|
%% Pending data for a stream that has been reset. Ignore.
|
|
|
- %% @todo Maybe keep a few pending to ignore this and stream process messages.
|
|
|
- #{} ->
|
|
|
- case is_lingering_stream(State, StreamRef) of
|
|
|
+ error ->
|
|
|
+ case is_lingering_stream(State, StreamID) of
|
|
|
true ->
|
|
|
ok;
|
|
|
false ->
|
|
|
%% We avoid logging the data as it could be quite large.
|
|
|
cowboy:log(warning, "Received data for unknown stream ~p.",
|
|
|
- [StreamRef], Opts)
|
|
|
+ [StreamID], Opts)
|
|
|
end,
|
|
|
loop(State)
|
|
|
end.
|
|
|
|
|
|
-%% @todo Swap Data and Stream/StreamRef.
|
|
|
+%% @todo Swap Data and Stream/StreamID?
|
|
|
parse1(State, Data, Stream=#stream{status=header}, Props) ->
|
|
|
parse_unidirectional_stream_header(State, Data, Stream, Props);
|
|
|
-parse1(State, Data, Stream=#stream{status={data, Len}, ref=StreamRef}, Props) ->
|
|
|
+parse1(State, Data, Stream=#stream{status={data, Len}, id=StreamID}, Props) ->
|
|
|
DataLen = byte_size(Data),
|
|
|
if
|
|
|
DataLen < Len ->
|
|
@@ -193,15 +201,16 @@ parse1(State, Data, Stream=#stream{status={data, Len}, ref=StreamRef}, Props) ->
|
|
|
<<Data1:Len/binary, Rest/bits>> = Data,
|
|
|
IsFin = is_fin(Props, Rest),
|
|
|
parse(frame(State, Stream#stream{status=normal}, {data, Data1}, IsFin),
|
|
|
- Rest, StreamRef, Props)
|
|
|
+ Rest, StreamID, Props)
|
|
|
end;
|
|
|
-%% @todo Clause that discards receiving data for aborted streams.
|
|
|
-parse1(State, Data, Stream=#stream{ref=StreamRef}, Props) ->
|
|
|
+%% @todo Clause that discards receiving data for stopping streams.
|
|
|
+%% We may receive a few more frames after we abort receiving.
|
|
|
+parse1(State, Data, Stream=#stream{id=StreamID}, Props) ->
|
|
|
case cow_http3:parse(Data) of
|
|
|
{ok, Frame, Rest} ->
|
|
|
IsFin = is_fin(Props, Rest),
|
|
|
% ct:pal("parse1 Frame= ~p Rest= ~p", [Frame, Rest]),
|
|
|
- parse(frame(State, Stream, Frame, IsFin), Rest, StreamRef, Props);
|
|
|
+ parse(frame(State, Stream, Frame, IsFin), Rest, StreamID, Props);
|
|
|
{more, Frame, Len} ->
|
|
|
IsFin = is_fin(Props, <<>>),
|
|
|
case IsFin of
|
|
@@ -212,16 +221,16 @@ parse1(State, Data, Stream=#stream{ref=StreamRef}, Props) ->
|
|
|
'Last frame on stream was truncated. (RFC9114 7.1)'})
|
|
|
end;
|
|
|
{ignore, Rest} ->
|
|
|
- parse(ignored_frame(State, Stream), Rest, StreamRef, Props);
|
|
|
+ parse(ignored_frame(State, Stream), Rest, StreamID, Props);
|
|
|
Error = {connection_error, _, _} ->
|
|
|
terminate(State, Error);
|
|
|
more when Data =:= <<>> ->
|
|
|
- loop(stream_update(State, Stream#stream{buffer=Data}));
|
|
|
+ loop(stream_store(State, Stream#stream{buffer=Data}));
|
|
|
more ->
|
|
|
IsFin = is_fin(Props, <<>>),
|
|
|
case IsFin of
|
|
|
nofin ->
|
|
|
- loop(stream_update(State, Stream#stream{buffer=Data}));
|
|
|
+ loop(stream_store(State, Stream#stream{buffer=Data}));
|
|
|
fin ->
|
|
|
terminate(State, {connection_error, h3_frame_error,
|
|
|
'Last frame on stream was truncated. (RFC9114 7.1)'})
|
|
@@ -239,15 +248,15 @@ is_fin(#{flags := Flags}, Rest) ->
|
|
|
end.
|
|
|
|
|
|
parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
|
|
|
- Data, Stream0=#stream{ref=StreamRef}, Props) ->
|
|
|
+ Data, Stream0=#stream{id=StreamID}, Props) ->
|
|
|
case cow_http3:parse_unidi_stream_header(Data) of
|
|
|
{ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder ->
|
|
|
case cow_http3_machine:set_unidi_remote_stream_type(
|
|
|
- StreamRef, Type, HTTP3Machine0) of
|
|
|
+ StreamID, Type, HTTP3Machine0) of
|
|
|
{ok, HTTP3Machine} ->
|
|
|
State = State0#state{http3_machine=HTTP3Machine},
|
|
|
Stream = Stream0#stream{status=normal},
|
|
|
- parse(stream_update(State, Stream), Rest, StreamRef, Props);
|
|
|
+ parse(stream_store(State, Stream), Rest, StreamID, Props);
|
|
|
{error, Error={connection_error, _, _}, HTTP3Machine} ->
|
|
|
terminate(State0#state{http3_machine=HTTP3Machine}, Error)
|
|
|
end;
|
|
@@ -260,9 +269,10 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
|
|
|
loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
|
|
|
end.
|
|
|
|
|
|
-frame(State=#state{http3_machine=HTTP3Machine0}, Stream=#stream{ref=StreamRef}, Frame, IsFin) ->
|
|
|
+frame(State=#state{http3_machine=HTTP3Machine0, local_decoder_ref=DecoderRef},
|
|
|
+ Stream=#stream{id=StreamID}, Frame, IsFin) ->
|
|
|
% ct:pal("cowboy frame ~p ~p", [Frame, IsFin]),
|
|
|
- case cow_http3_machine:frame(Frame, IsFin, StreamRef, HTTP3Machine0) of
|
|
|
+ case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of
|
|
|
{ok, HTTP3Machine} ->
|
|
|
State#state{http3_machine=HTTP3Machine};
|
|
|
{ok, {data, Data}, HTTP3Machine} ->
|
|
@@ -271,8 +281,7 @@ frame(State=#state{http3_machine=HTTP3Machine0}, Stream=#stream{ref=StreamRef},
|
|
|
{ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP3Machine} ->
|
|
|
headers_frame(State#state{http3_machine=HTTP3Machine},
|
|
|
Stream, IsFin, Headers, PseudoHeaders, BodyLen);
|
|
|
- {ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen},
|
|
|
- {DecoderRef, DecData}, HTTP3Machine} ->
|
|
|
+ {ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, DecData, HTTP3Machine} ->
|
|
|
%% Send the decoder data.
|
|
|
{ok, _} = quicer:send(DecoderRef, DecData),
|
|
|
headers_frame(State#state{http3_machine=HTTP3Machine},
|
|
@@ -283,40 +292,46 @@ frame(State=#state{http3_machine=HTTP3Machine0}, Stream=#stream{ref=StreamRef},
|
|
|
{ok, GoAway={goaway, _}, HTTP3Machine} ->
|
|
|
goaway(State#state{http3_machine=HTTP3Machine}, GoAway);
|
|
|
{error, Error={stream_error, _Reason, _Human}, HTTP3Machine} ->
|
|
|
- reset_stream(State#state{http3_machine=HTTP3Machine}, StreamRef, Error);
|
|
|
+ reset_stream(State#state{http3_machine=HTTP3Machine}, Stream, Error);
|
|
|
{error, Error={connection_error, _, _}, HTTP3Machine} ->
|
|
|
terminate(State#state{http3_machine=HTTP3Machine}, Error)
|
|
|
end.
|
|
|
|
|
|
-data_frame(State=#state{opts=Opts, streams=Streams},
|
|
|
- Stream=#stream{ref=StreamRef, state=StreamState0}, IsFin, Data) ->
|
|
|
- try cowboy_stream:data(StreamRef, IsFin, Data, StreamState0) of
|
|
|
+data_frame(State=#state{opts=Opts},
|
|
|
+ Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) ->
|
|
|
+ try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
|
|
|
{Commands, StreamState} ->
|
|
|
- commands(State#state{
|
|
|
- streams=Streams#{StreamRef => Stream#stream{state=StreamState}}},
|
|
|
- StreamRef, Commands)
|
|
|
+ commands(State, Stream#stream{state=StreamState}, Commands)
|
|
|
catch Class:Exception:Stacktrace ->
|
|
|
cowboy:log(cowboy_stream:make_error_log(data,
|
|
|
- [StreamRef, IsFin, Data, StreamState0],
|
|
|
+ [StreamID, IsFin, Data, StreamState0],
|
|
|
Class, Exception, Stacktrace), Opts),
|
|
|
- reset_stream(State, StreamRef, {internal_error, {Class, Exception},
|
|
|
+ reset_stream(State, Stream, {internal_error, {Class, Exception},
|
|
|
'Unhandled exception in cowboy_stream:data/4.'})
|
|
|
end.
|
|
|
|
|
|
-%% @todo CONNECT, TRACE.
|
|
|
+headers_frame(State, Stream, IsFin, Headers,
|
|
|
+ PseudoHeaders=#{method := <<"CONNECT">>}, _)
|
|
|
+ when map_size(PseudoHeaders) =:= 2 ->
|
|
|
+ early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
|
|
|
+ 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)');
|
|
|
+headers_frame(State, Stream, IsFin, Headers,
|
|
|
+ PseudoHeaders=#{method := <<"TRACE">>}, _) ->
|
|
|
+ early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
|
|
|
+ 'The TRACE method is currently not implemented. (RFC9114 4.4, RFC7231 4.3.8)');
|
|
|
headers_frame(State, Stream, IsFin, Headers, PseudoHeaders=#{authority := Authority}, BodyLen) ->
|
|
|
headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
|
|
|
-headers_frame(State, Stream=#stream{ref=StreamRef}, IsFin, Headers, PseudoHeaders, BodyLen) ->
|
|
|
+headers_frame(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen) ->
|
|
|
case lists:keyfind(<<"host">>, 1, Headers) of
|
|
|
{_, Authority} ->
|
|
|
headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
|
|
|
_ ->
|
|
|
- reset_stream(State, StreamRef, {stream_error, h3_message_error,
|
|
|
+ reset_stream(State, Stream, {stream_error, h3_message_error,
|
|
|
'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'})
|
|
|
end.
|
|
|
|
|
|
headers_frame_parse_host(State=#state{peer=Peer, sock=Sock},
|
|
|
- Stream=#stream{ref=StreamRef}, IsFin, Headers,
|
|
|
+ Stream=#stream{id=StreamID}, IsFin, Headers,
|
|
|
#{method := Method, scheme := Scheme, path := PathWithQs},
|
|
|
BodyLen, Authority) ->
|
|
|
try cow_http_hd:parse_host(Authority) of
|
|
@@ -324,13 +339,13 @@ headers_frame_parse_host(State=#state{peer=Peer, sock=Sock},
|
|
|
Port = ensure_port(Scheme, Port0),
|
|
|
try cow_http:parse_fullpath(PathWithQs) of
|
|
|
{<<>>, _} ->
|
|
|
- reset_stream(State, StreamRef, {stream_error, h3_message_error,
|
|
|
+ reset_stream(State, Stream, {stream_error, h3_message_error,
|
|
|
'The path component must not be empty. (RFC7540 8.1.2.3)'});
|
|
|
{Path, Qs} ->
|
|
|
Req = #{
|
|
|
ref => quic, %% @todo Ref,
|
|
|
pid => self(),
|
|
|
- streamid => StreamRef,
|
|
|
+ streamid => StreamID,
|
|
|
peer => Peer,
|
|
|
sock => Sock,
|
|
|
cert => undefined, %Cert, %% @todo
|
|
@@ -352,11 +367,11 @@ headers_frame_parse_host(State=#state{peer=Peer, sock=Sock},
|
|
|
% end,
|
|
|
headers_frame(State, Stream, Req)
|
|
|
catch _:_ ->
|
|
|
- reset_stream(State, StreamRef, {stream_error, h3_message_error,
|
|
|
+ reset_stream(State, Stream, {stream_error, h3_message_error,
|
|
|
'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'})
|
|
|
end
|
|
|
catch _:_ ->
|
|
|
- reset_stream(State, StreamRef, {stream_error, h3_message_error,
|
|
|
+ reset_stream(State, Stream, {stream_error, h3_message_error,
|
|
|
'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'})
|
|
|
end.
|
|
|
|
|
@@ -383,24 +398,60 @@ headers_to_map([{Name, Value}|Tail], Acc0) ->
|
|
|
end,
|
|
|
headers_to_map(Tail, Acc).
|
|
|
|
|
|
-headers_frame(State=#state{opts=Opts, streams=Streams},
|
|
|
- Stream=#stream{ref=StreamRef}, Req) ->
|
|
|
+headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
|
|
|
ct:pal("req ~p", [Req]),
|
|
|
- try cowboy_stream:init(StreamRef, Req, Opts) of
|
|
|
+ try cowboy_stream:init(StreamID, Req, Opts) of
|
|
|
{Commands, StreamState} ->
|
|
|
%logger:error("~p", [Commands]),
|
|
|
%logger:error("~p", [StreamState]),
|
|
|
- commands(State#state{
|
|
|
- streams=Streams#{StreamRef => Stream#stream{state=StreamState}}},
|
|
|
- StreamRef, Commands)
|
|
|
+ commands(State, Stream#stream{state=StreamState}, Commands)
|
|
|
catch Class:Exception:Stacktrace ->
|
|
|
cowboy:log(cowboy_stream:make_error_log(init,
|
|
|
- [StreamRef, Req, Opts],
|
|
|
+ [StreamID, Req, Opts],
|
|
|
Class, Exception, Stacktrace), Opts),
|
|
|
- reset_stream(State, StreamRef, {internal_error, {Class, Exception},
|
|
|
+ reset_stream(State, Stream, {internal_error, {Class, Exception},
|
|
|
'Unhandled exception in cowboy_stream:init/3.'})
|
|
|
end.
|
|
|
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+early_error(State0=#state{opts=Opts, peer=Peer},
|
|
|
+ Stream=#stream{id=StreamID}, _IsFin, Headers, #{method := Method},
|
|
|
+ StatusCode0, HumanReadable) ->
|
|
|
+ %% We automatically terminate the stream but it is not an error
|
|
|
+ %% per se (at least not in the first implementation).
|
|
|
+ Reason = {stream_error, h3_no_error, HumanReadable},
|
|
|
+ %% The partial Req is minimal for now. We only have one case
|
|
|
+ %% where it can be called (when a method is completely disabled).
|
|
|
+ %% @todo Fill in the other elements.
|
|
|
+ PartialReq = #{
|
|
|
+ ref => quic, %% @todo Ref,
|
|
|
+ peer => Peer,
|
|
|
+ method => Method,
|
|
|
+ headers => headers_to_map(Headers, #{})
|
|
|
+ },
|
|
|
+ Resp = {response, StatusCode0, RespHeaders0=#{<<"content-length">> => <<"0">>}, <<>>},
|
|
|
+ try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
|
|
|
+ {response, StatusCode, RespHeaders, RespBody} ->
|
|
|
+ send_response(State0, Stream, StatusCode, RespHeaders, RespBody)
|
|
|
+ catch Class:Exception:Stacktrace ->
|
|
|
+ cowboy:log(cowboy_stream:make_error_log(early_error,
|
|
|
+ [StreamID, Reason, PartialReq, Resp, Opts],
|
|
|
+ Class, Exception, Stacktrace), Opts),
|
|
|
+ %% We still need to send an error response, so send what we initially
|
|
|
+ %% wanted to send. It's better than nothing.
|
|
|
+ send_headers(State0, StreamID, fin, StatusCode0, RespHeaders0)
|
|
|
+ end.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
%% Erlang messages.
|
|
|
|
|
|
down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
|
|
@@ -409,8 +460,8 @@ down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
|
|
|
{ok, undefined, Children} ->
|
|
|
State0#state{children=Children};
|
|
|
%% The stream is still running.
|
|
|
- {ok, StreamRef, Children} ->
|
|
|
- info(State0#state{children=Children}, StreamRef, Msg);
|
|
|
+ {ok, StreamID, Children} ->
|
|
|
+ info(State0#state{children=Children}, StreamID, Msg);
|
|
|
%% The process was unknown.
|
|
|
error ->
|
|
|
cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
|
|
@@ -425,64 +476,62 @@ down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
|
|
|
State
|
|
|
end.
|
|
|
|
|
|
-info(State=#state{opts=Opts, http3_machine=_HTTP3Machine, streams=Streams}, StreamRef, Msg) ->
|
|
|
+info(State=#state{opts=Opts, http3_machine=_HTTP3Machine}, StreamID, Msg) ->
|
|
|
%ct:pal("INFO ~p", [Msg]),
|
|
|
- case Streams of
|
|
|
- #{StreamRef := Stream=#stream{state=StreamState0}} ->
|
|
|
- try cowboy_stream:info(StreamRef, Msg, StreamState0) of
|
|
|
+ case stream_get(State, StreamID) of
|
|
|
+ Stream=#stream{state=StreamState0} ->
|
|
|
+ try cowboy_stream:info(StreamID, Msg, StreamState0) of
|
|
|
{Commands, StreamState} ->
|
|
|
%ct:pal("~p", [Commands]),
|
|
|
-%logger:error("~p ~p", [StreamRef, Streams]),
|
|
|
- commands(State#state{streams=Streams#{StreamRef => Stream#stream{state=StreamState}}},
|
|
|
- StreamRef, Commands)
|
|
|
+%logger:error("~p ~p", [StreamID, Streams]),
|
|
|
+ commands(State, Stream#stream{state=StreamState}, Commands)
|
|
|
catch Class:Exception:Stacktrace ->
|
|
|
cowboy:log(cowboy_stream:make_error_log(info,
|
|
|
- [StreamRef, Msg, StreamState0],
|
|
|
+ [StreamID, Msg, StreamState0],
|
|
|
Class, Exception, Stacktrace), Opts),
|
|
|
- reset_stream(State, StreamRef, {internal_error, {Class, Exception},
|
|
|
+ reset_stream(State, Stream, {internal_error, {Class, Exception},
|
|
|
'Unhandled exception in cowboy_stream:info/3.'})
|
|
|
end;
|
|
|
- _ ->
|
|
|
- case is_lingering_stream(State, StreamRef) of
|
|
|
+ error ->
|
|
|
+ case is_lingering_stream(State, StreamID) of
|
|
|
true ->
|
|
|
ok;
|
|
|
false ->
|
|
|
cowboy:log(warning, "Received message ~p for unknown stream ~p.",
|
|
|
- [Msg, StreamRef], Opts)
|
|
|
+ [Msg, StreamID], Opts)
|
|
|
end,
|
|
|
State
|
|
|
end.
|
|
|
|
|
|
%% Stream handler commands.
|
|
|
|
|
|
-commands(State, _, []) ->
|
|
|
- State;
|
|
|
+commands(State, Stream, []) ->
|
|
|
+ stream_store(State, Stream);
|
|
|
%% Error responses are sent only if a response wasn't sent already.
|
|
|
-commands(State=#state{http3_machine=HTTP3Machine}, StreamRef,
|
|
|
+commands(State=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID},
|
|
|
[{error_response, StatusCode, Headers, Body}|Tail]) ->
|
|
|
- %% @todo
|
|
|
-% case cow_http2_machine:get_stream_local_state(StreamRef, HTTP2Machine) of
|
|
|
-% {ok, idle, _} ->
|
|
|
- commands(State, StreamRef, [{response, StatusCode, Headers, Body}|Tail]);
|
|
|
-% _ ->
|
|
|
-% commands(State, StreamRef, Tail)
|
|
|
-% end;
|
|
|
+ case cow_http3_machine:get_stream_local_state(StreamID, HTTP3Machine) of
|
|
|
+ {ok, idle} ->
|
|
|
+ commands(State, Stream, [{response, StatusCode, Headers, Body}|Tail]);
|
|
|
+ _ ->
|
|
|
+ commands(State, Stream, Tail)
|
|
|
+ end;
|
|
|
%% Send an informational response.
|
|
|
-commands(State0, StreamRef, [{inform, StatusCode, Headers}|Tail]) ->
|
|
|
- State = send_headers(State0, StreamRef, idle, StatusCode, Headers),
|
|
|
- commands(State, StreamRef, Tail);
|
|
|
+commands(State0, Stream, [{inform, StatusCode, Headers}|Tail]) ->
|
|
|
+ State = send_headers(State0, Stream, idle, StatusCode, Headers),
|
|
|
+ commands(State, Stream, Tail);
|
|
|
%% Send response headers.
|
|
|
-commands(State0, StreamRef, [{response, StatusCode, Headers, Body}|Tail]) ->
|
|
|
+commands(State0, Stream, [{response, StatusCode, Headers, Body}|Tail]) ->
|
|
|
ct:pal("commands response ~p ~p ~p", [StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]),
|
|
|
- State = send_response(State0, StreamRef, StatusCode, Headers, Body),
|
|
|
- commands(State, StreamRef, Tail);
|
|
|
+ State = send_response(State0, Stream, StatusCode, Headers, Body),
|
|
|
+ commands(State, Stream, Tail);
|
|
|
%% Send response headers.
|
|
|
-commands(State0, StreamRef, [{headers, StatusCode, Headers}|Tail]) ->
|
|
|
+commands(State0, Stream, [{headers, StatusCode, Headers}|Tail]) ->
|
|
|
ct:pal("commands headers ~p ~p", [StatusCode, Headers]),
|
|
|
- State = send_headers(State0, StreamRef, nofin, StatusCode, Headers),
|
|
|
- commands(State, StreamRef, Tail);
|
|
|
+ State = send_headers(State0, Stream, nofin, StatusCode, Headers),
|
|
|
+ commands(State, Stream, Tail);
|
|
|
%%% Send a response body chunk.
|
|
|
-commands(State0, StreamRef, [{data, IsFin, Data}|Tail]) ->
|
|
|
+commands(State0, Stream=#stream{ref=StreamRef}, [{data, IsFin, Data}|Tail]) ->
|
|
|
ct:pal("commands data ~p ~p", [IsFin, try iolist_size(Data) catch _:_ -> Data end]),
|
|
|
_ = case Data of
|
|
|
{sendfile, Offset, Bytes, Path} ->
|
|
@@ -493,13 +542,15 @@ commands(State0, StreamRef, [{data, IsFin, Data}|Tail]) ->
|
|
|
_ ->
|
|
|
{ok, _} = quicer:send(StreamRef, cow_http3:data(Data), send_flag(IsFin))
|
|
|
end,
|
|
|
- State = maybe_send_is_fin(State0, StreamRef, IsFin),
|
|
|
- commands(State, StreamRef, Tail);
|
|
|
+ State = maybe_send_is_fin(State0, Stream, IsFin),
|
|
|
+ commands(State, Stream, Tail);
|
|
|
%%% Send trailers.
|
|
|
-commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trailers}|Tail]) ->
|
|
|
+commands(State=#state{http3_machine=HTTP3Machine0},
|
|
|
+ Stream=#stream{id=StreamID, ref=StreamRef},
|
|
|
+ [{trailers, Trailers}|Tail]) ->
|
|
|
ct:pal("commands trailers ~p", [Trailers]),
|
|
|
HTTP3Machine = case cow_http3_machine:prepare_trailers(
|
|
|
- StreamRef, HTTP3Machine0, maps:to_list(Trailers)) of
|
|
|
+ StreamID, HTTP3Machine0, maps:to_list(Trailers)) of
|
|
|
{trailers, HeaderBlock, _EncData, HTTP3Machine1} ->
|
|
|
ct:pal("trailers"),
|
|
|
%% @todo EncData!!
|
|
@@ -510,7 +561,7 @@ commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trail
|
|
|
{ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(fin)),
|
|
|
HTTP3Machine1
|
|
|
end,
|
|
|
- commands(State#state{http3_machine=HTTP3Machine}, StreamRef, Tail);
|
|
|
+ commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
|
|
|
%% Send a push promise.
|
|
|
%%
|
|
|
%% @todo Responses sent as a result of a push_promise request
|
|
@@ -519,7 +570,7 @@ commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trail
|
|
|
%% @todo We should not send push_promise frames when we are
|
|
|
%% in the closing http2_status.
|
|
|
%commands(State0=#state{socket=Socket, transport=Transport, http3_machine=HTTP3Machine0},
|
|
|
-% StreamRef, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
|
|
|
+% Stream, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
|
|
|
% Authority = case {Scheme, Port} of
|
|
|
% {<<"http">>, 80} -> Host;
|
|
|
% {<<"https">>, 443} -> Host;
|
|
@@ -539,75 +590,77 @@ commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trail
|
|
|
% %% create the Req object, as it expects them to be flat.
|
|
|
% Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)),
|
|
|
% %% @todo
|
|
|
-% State = case cow_http2_machine:prepare_push_promise(StreamRef, HTTP3Machine0,
|
|
|
+% State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP3Machine0,
|
|
|
% PseudoHeaders, Headers) of
|
|
|
-% {ok, PromisedStreamRef, HeaderBlock, HTTP3Machine} ->
|
|
|
+% {ok, PromisedStreamID, HeaderBlock, HTTP3Machine} ->
|
|
|
% Transport:send(Socket, cow_http2:push_promise(
|
|
|
-% StreamRef, PromisedStreamRef, HeaderBlock)),
|
|
|
+% StreamID, PromisedStreamID, HeaderBlock)),
|
|
|
% headers_frame(State0#state{http3_machine=HTTP2Machine},
|
|
|
-% PromisedStreamRef, fin, Headers, PseudoHeaders, 0);
|
|
|
+% PromisedStreamID, fin, Headers, PseudoHeaders, 0);
|
|
|
% {error, no_push} ->
|
|
|
% State0
|
|
|
% end,
|
|
|
-% commands(State, StreamRef, Tail);
|
|
|
+% commands(State, Stream, Tail);
|
|
|
%%% Read the request body.
|
|
|
-%commands(State0=#state{flow=Flow, streams=Streams}, StreamRef, [{flow, Size}|Tail]) ->
|
|
|
-commands(State, StreamRef, [{flow, _Size}|Tail]) ->
|
|
|
+%commands(State0=#state{flow=Flow, streams=Streams}, Stream, [{flow, Size}|Tail]) ->
|
|
|
+commands(State, Stream, [{flow, _Size}|Tail]) ->
|
|
|
%% @todo We should tell the QUIC stream to increase its window size.
|
|
|
-% #{StreamRef := Stream=#stream{flow=StreamFlow}} = Streams,
|
|
|
+% #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
|
|
|
% State = update_window(State0#state{flow=Flow + Size,
|
|
|
-% streams=Streams#{StreamRef => Stream#stream{flow=StreamFlow + Size}}},
|
|
|
-% StreamRef),
|
|
|
- commands(State, StreamRef, Tail);
|
|
|
+% streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
|
|
|
+% StreamID),
|
|
|
+ commands(State, Stream, Tail);
|
|
|
%% Supervise a child process.
|
|
|
-commands(State=#state{children=Children}, StreamRef, [{spawn, Pid, Shutdown}|Tail]) ->
|
|
|
- commands(State#state{children=cowboy_children:up(Children, Pid, StreamRef, Shutdown)},
|
|
|
- StreamRef, Tail);
|
|
|
+commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
|
|
|
+ [{spawn, Pid, Shutdown}|Tail]) ->
|
|
|
+ commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
|
|
|
+ Stream, Tail);
|
|
|
%% Error handling.
|
|
|
-%commands(State, StreamRef, [Error = {internal_error, _, _}|_Tail]) ->
|
|
|
-% %% @todo Do we want to run the commands after an internal_error?
|
|
|
-% %% @todo Do we even allow commands after?
|
|
|
-% %% @todo Only reset when the stream still exists.
|
|
|
-% reset_stream(State, StreamRef, Error);
|
|
|
+commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) ->
|
|
|
+ %% @todo Do we want to run the commands after an internal_error?
|
|
|
+ %% @todo Do we even allow commands after?
|
|
|
+ %% @todo Only reset when the stream still exists.
|
|
|
+ reset_stream(State, Stream, Error);
|
|
|
%% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
|
|
|
%commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
|
|
|
-% StreamRef, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
|
|
|
+% Stream, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
|
|
|
% %% @todo This 101 response needs to be passed through stream handlers.
|
|
|
% Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))),
|
|
|
-% commands(State, StreamRef, Tail);
|
|
|
+% commands(State, Stream, Tail);
|
|
|
%% Use a different protocol within the stream (CONNECT :protocol).
|
|
|
%% @todo Make sure we error out when the feature is disabled.
|
|
|
-%commands(State0, StreamRef, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
|
|
|
-% State = info(State0, StreamRef, {headers, 200, Headers}),
|
|
|
-% commands(State, StreamRef, Tail);
|
|
|
+%commands(State0, Stream, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
|
|
|
+% State = info(State0, Stream, {headers, 200, Headers}),
|
|
|
+% commands(State, Stream, Tail);
|
|
|
%% Set options dynamically.
|
|
|
-commands(State, StreamRef, [{set_options, _Opts}|Tail]) ->
|
|
|
- commands(State, StreamRef, Tail);
|
|
|
-commands(State, StreamRef, [stop|_Tail]) ->
|
|
|
+commands(State, Stream, [{set_options, _Opts}|Tail]) ->
|
|
|
+ commands(State, Stream, Tail);
|
|
|
+commands(State, Stream, [stop|_Tail]) ->
|
|
|
ct:pal("stop"),
|
|
|
%% @todo Do we want to run the commands after a stop?
|
|
|
%% @todo Do we even allow commands after?
|
|
|
- stop_stream(State, StreamRef);
|
|
|
+ stop_stream(stream_store(State, Stream), Stream);
|
|
|
%% Log event.
|
|
|
-commands(State=#state{opts=Opts}, StreamRef, [Log={log, _, _, _}|Tail]) ->
|
|
|
+commands(State=#state{opts=Opts}, Stream, [Log={log, _, _, _}|Tail]) ->
|
|
|
cowboy:log(Log, Opts),
|
|
|
- commands(State, StreamRef, Tail).
|
|
|
+ commands(State, Stream, Tail).
|
|
|
|
|
|
-send_response(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, StatusCode, Headers, Body) ->
|
|
|
+send_response(State0=#state{http3_machine=HTTP3Machine0},
|
|
|
+ Stream=#stream{id=StreamID, ref=StreamRef}, StatusCode, Headers, Body) ->
|
|
|
Size = case Body of
|
|
|
{sendfile, _, Bytes0, _} -> Bytes0;
|
|
|
_ -> iolist_size(Body)
|
|
|
end,
|
|
|
case Size of
|
|
|
0 ->
|
|
|
- State = send_headers(State0, StreamRef, fin, StatusCode, Headers),
|
|
|
- maybe_send_is_fin(State, StreamRef, fin);
|
|
|
+ State = send_headers(State0, Stream, fin, StatusCode, Headers),
|
|
|
+ maybe_send_is_fin(State, Stream, fin);
|
|
|
_ ->
|
|
|
%% @todo Add a test for HEAD to make sure we don't send the body when
|
|
|
%% returning {response...} from a stream handler (or {headers...} then {data...}).
|
|
|
%% @todo We must send EncData!
|
|
|
{ok, _IsFin, HeaderBlock, _EncData, HTTP3Machine}
|
|
|
- = cow_http3_machine:prepare_headers(StreamRef, HTTP3Machine0, nofin,
|
|
|
+ = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, nofin,
|
|
|
#{status => cow_http:status_to_integer(StatusCode)},
|
|
|
headers_to_list(Headers)),
|
|
|
%% @todo It might be better to do async sends.
|
|
@@ -624,12 +677,13 @@ send_response(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, StatusCode,
|
|
|
cow_http3:data(Body)
|
|
|
], send_flag(fin))
|
|
|
end,
|
|
|
- maybe_send_is_fin(State0#state{http3_machine=HTTP3Machine}, StreamRef, fin)
|
|
|
+ maybe_send_is_fin(State0#state{http3_machine=HTTP3Machine}, Stream, fin)
|
|
|
end.
|
|
|
|
|
|
-maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0}, StreamRef, fin) ->
|
|
|
- HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamRef, HTTP3Machine0),
|
|
|
- State#state{http3_machine=HTTP3Machine};
|
|
|
+maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0},
|
|
|
+ Stream=#stream{id=StreamID}, fin) ->
|
|
|
+ HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamID, HTTP3Machine0),
|
|
|
+ maybe_terminate_stream(State#state{http3_machine=HTTP3Machine}, Stream);
|
|
|
maybe_send_is_fin(State, _, _) ->
|
|
|
State.
|
|
|
|
|
@@ -641,9 +695,9 @@ send(StreamRef, IoData) ->
|
|
|
end.
|
|
|
|
|
|
send_headers(State=#state{http3_machine=HTTP3Machine0},
|
|
|
- StreamRef, IsFin0, StatusCode, Headers) ->
|
|
|
+ #stream{id=StreamID, ref=StreamRef}, IsFin0, StatusCode, Headers) ->
|
|
|
{ok, IsFin, HeaderBlock, _EncData, HTTP3Machine}
|
|
|
- = cow_http3_machine:prepare_headers(StreamRef, HTTP3Machine0, IsFin0,
|
|
|
+ = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, IsFin0,
|
|
|
#{status => cow_http:status_to_integer(StatusCode)},
|
|
|
headers_to_list(Headers)),
|
|
|
{ok, _} = quicer:send(StreamRef, cow_http3:headers(HeaderBlock), send_flag(IsFin)),
|
|
@@ -660,7 +714,8 @@ headers_to_list(Headers) ->
|
|
|
send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE;
|
|
|
send_flag(fin) -> ?QUIC_SEND_FLAG_FIN.
|
|
|
|
|
|
-reset_stream(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, Error) ->
|
|
|
+reset_stream(State0=#state{http3_machine=HTTP3Machine0},
|
|
|
+ Stream=#stream{id=StreamID, ref=StreamRef}, Error) ->
|
|
|
Reason = case Error of
|
|
|
{internal_error, _, _} -> h3_internal_error;
|
|
|
{stream_error, Reason0, _} -> Reason0
|
|
@@ -669,11 +724,11 @@ reset_stream(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, Error) ->
|
|
|
%% @todo Should we close the send side if the receive side was already closed?
|
|
|
quicer:shutdown_stream(StreamRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT,
|
|
|
cow_http3:error_to_code(Reason), infinity),
|
|
|
- State1 = case cow_http3_machine:reset_stream(StreamRef, HTTP3Machine0) of
|
|
|
+ State1 = case cow_http3_machine:reset_stream(StreamID, HTTP3Machine0) of
|
|
|
{ok, HTTP3Machine} ->
|
|
|
- terminate_stream(State0#state{http3_machine=HTTP3Machine}, StreamRef, Error);
|
|
|
+ terminate_stream(State0#state{http3_machine=HTTP3Machine}, Stream, Error);
|
|
|
{error, not_found} ->
|
|
|
- terminate_stream(State0, StreamRef, Error)
|
|
|
+ terminate_stream(State0, Stream, Error)
|
|
|
end,
|
|
|
%% @todo
|
|
|
% case reset_rate(State1) of
|
|
@@ -685,55 +740,58 @@ reset_stream(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, Error) ->
|
|
|
% end.
|
|
|
State1.
|
|
|
|
|
|
-stop_stream(State0=#state{http3_machine=HTTP3Machine, streams=Streams}, StreamRef) ->
|
|
|
- #{StreamRef := Stream} = Streams,
|
|
|
+stop_stream(State0=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID}) ->
|
|
|
%% We abort reading when stopping the stream but only
|
|
|
%% if the client was not finished sending data.
|
|
|
- State = case cow_http3_machine:get_stream_remote_state(StreamRef, HTTP3Machine) of
|
|
|
+ %% We mark the stream as 'stopping' either way.
|
|
|
+ State = case cow_http3_machine:get_stream_remote_state(StreamID, HTTP3Machine) of
|
|
|
{ok, fin} ->
|
|
|
- State0;
|
|
|
+ stream_store(State0, Stream#stream{status=stopping});
|
|
|
_ ->
|
|
|
stream_abort_receive(State0, Stream, h3_no_error)
|
|
|
end,
|
|
|
%% Then we may need to send a response or terminate it
|
|
|
%% if the stream handler did not do so already.
|
|
|
- case cow_http3_machine:get_stream_local_state(StreamRef, HTTP3Machine) of
|
|
|
+ case cow_http3_machine:get_stream_local_state(StreamID, HTTP3Machine) of
|
|
|
%% When the stream terminates normally (without resetting the stream)
|
|
|
%% and no response was sent, we need to send a proper response back to the client.
|
|
|
{ok, idle} ->
|
|
|
- info(State, StreamRef, {response, 204, #{}, <<>>});
|
|
|
+ info(State, StreamID, {response, 204, #{}, <<>>});
|
|
|
%% When a response was sent but not terminated, we need to close the stream.
|
|
|
%% We send a final DATA frame to complete the stream.
|
|
|
{ok, nofin} ->
|
|
|
ct:pal("error nofin"),
|
|
|
- info(State, StreamRef, {data, fin, <<>>});
|
|
|
+ info(State, StreamID, {data, fin, <<>>});
|
|
|
%% When a response was sent fully we can terminate the stream,
|
|
|
%% regardless of the stream being in half-closed or closed state.
|
|
|
_ ->
|
|
|
- terminate_stream(State, StreamRef, h3_no_error)
|
|
|
+ terminate_stream(State, Stream, normal)
|
|
|
end.
|
|
|
|
|
|
-terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamRef, Reason) ->
|
|
|
- case maps:take(StreamRef, Streams0) of
|
|
|
- {#stream{state=StreamState}, Streams} ->
|
|
|
- terminate_stream_handler(State, StreamRef, Reason, StreamState),
|
|
|
- Children = cowboy_children:shutdown(Children0, StreamRef),
|
|
|
- stream_linger(State#state{streams=Streams, children=Children}, StreamRef);
|
|
|
- error ->
|
|
|
- State
|
|
|
- end.
|
|
|
+maybe_terminate_stream(State, Stream=#stream{status=stopping}) ->
|
|
|
+ terminate_stream(State, Stream, normal);
|
|
|
+%% The Stream will be stored in the State at the end of commands processing.
|
|
|
+maybe_terminate_stream(State, _) ->
|
|
|
+ State.
|
|
|
+
|
|
|
+terminate_stream(State=#state{streams=Streams0, children=Children0},
|
|
|
+ #stream{id=StreamID, state=StreamState}, Reason) ->
|
|
|
+ Streams = maps:remove(StreamID, Streams0),
|
|
|
+ terminate_stream_handler(State, StreamID, Reason, StreamState),
|
|
|
+ Children = cowboy_children:shutdown(Children0, StreamID),
|
|
|
+ stream_linger(State#state{streams=Streams, children=Children}, StreamID).
|
|
|
|
|
|
-terminate_stream_handler(#state{opts=Opts}, StreamRef, Reason, StreamState) ->
|
|
|
+terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
|
|
|
try
|
|
|
- cowboy_stream:terminate(StreamRef, Reason, StreamState)
|
|
|
+ cowboy_stream:terminate(StreamID, Reason, StreamState)
|
|
|
catch Class:Exception:Stacktrace ->
|
|
|
cowboy:log(cowboy_stream:make_error_log(terminate,
|
|
|
- [StreamRef, Reason, StreamState],
|
|
|
+ [StreamID, Reason, StreamState],
|
|
|
Class, Exception, Stacktrace), Opts)
|
|
|
end.
|
|
|
|
|
|
-ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{ref=StreamRef}) ->
|
|
|
- case cow_http3_machine:ignored_frame(StreamRef, HTTP3Machine0) of
|
|
|
+ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{id=StreamID}) ->
|
|
|
+ case cow_http3_machine:ignored_frame(StreamID, HTTP3Machine0) of
|
|
|
{ok, HTTP3Machine} ->
|
|
|
State#state{http3_machine=HTTP3Machine};
|
|
|
{error, Error={connection_error, _, _}, HTTP3Machine} ->
|
|
@@ -743,7 +801,7 @@ ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{ref=StreamRef})
|
|
|
stream_abort_receive(State, Stream=#stream{ref=StreamRef}, Reason) ->
|
|
|
quicer:shutdown_stream(StreamRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE,
|
|
|
cow_http3:error_to_code(Reason), infinity),
|
|
|
- stream_update(State, Stream#stream{status=discard}).
|
|
|
+ stream_store(State, Stream#stream{status=stopping}).
|
|
|
|
|
|
%% @todo Graceful connection shutdown.
|
|
|
%% We terminate the connection immediately if it hasn't fully been initialized.
|
|
@@ -785,39 +843,42 @@ terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reas
|
|
|
|
|
|
|
|
|
|
|
|
+stream_get(#state{streams=Streams}, StreamID) ->
|
|
|
+ maps:get(StreamID, Streams, error).
|
|
|
|
|
|
-stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, StreamRef, Flags) ->
|
|
|
+stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
|
|
|
+ StreamID, StreamRef, Flags) ->
|
|
|
{HTTP3Machine, Status} = case quicer:is_unidirectional(Flags) of
|
|
|
true ->
|
|
|
- {cow_http3_machine:init_unidi_stream(StreamRef, unidi_remote, HTTP3Machine0),
|
|
|
+ {cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0),
|
|
|
header};
|
|
|
false ->
|
|
|
- {cow_http3_machine:init_bidi_stream(StreamRef, HTTP3Machine0),
|
|
|
+ {cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0),
|
|
|
normal}
|
|
|
end,
|
|
|
- Stream = #stream{ref=StreamRef, status=Status},
|
|
|
+ Stream = #stream{id=StreamID, ref=StreamRef, status=Status},
|
|
|
% ct:pal("new stream ~p ~p", [Stream, HTTP3Machine]),
|
|
|
- State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamRef => Stream}}.
|
|
|
+ State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
|
|
|
|
|
|
stream_closed(State=#state{http3_machine=HTTP3Machine0, streams=Streams0},
|
|
|
- StreamRef, _Flags) ->
|
|
|
- case cow_http3_machine:close_stream(StreamRef, HTTP3Machine0) of
|
|
|
+ StreamID, _Flags) ->
|
|
|
+ case cow_http3_machine:close_stream(StreamID, HTTP3Machine0) of
|
|
|
{ok, HTTP3Machine} ->
|
|
|
%% @todo Some streams may not be bidi or remote.
|
|
|
- Streams = maps:remove(StreamRef, Streams0),
|
|
|
- %% @todo terminate stream
|
|
|
+ Streams = maps:remove(StreamID, Streams0),
|
|
|
+ %% @todo terminate stream if necessary
|
|
|
State#state{http3_machine=HTTP3Machine, streams=Streams};
|
|
|
{error, Error={connection_error, _, _}, HTTP3Machine} ->
|
|
|
terminate(State#state{http3_machine=HTTP3Machine}, Error)
|
|
|
end.
|
|
|
|
|
|
-stream_update(State=#state{streams=Streams}, Stream=#stream{ref=StreamRef}) ->
|
|
|
- State#state{streams=Streams#{StreamRef => Stream}}.
|
|
|
+stream_store(State=#state{streams=Streams}, Stream=#stream{id=StreamID}) ->
|
|
|
+ State#state{streams=Streams#{StreamID => Stream}}.
|
|
|
|
|
|
-stream_linger(State=#state{lingering_streams=Lingering0}, StreamRef) ->
|
|
|
+stream_linger(State=#state{lingering_streams=Lingering0}, StreamID) ->
|
|
|
%% We only keep up to 100 streams in this state. @todo Make it configurable?
|
|
|
- Lingering = [StreamRef|lists:sublist(Lingering0, 100 - 1)],
|
|
|
+ Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)],
|
|
|
State#state{lingering_streams=Lingering}.
|
|
|
|
|
|
-is_lingering_stream(#state{lingering_streams=Lingering}, StreamRef) ->
|
|
|
- lists:member(StreamRef, Lingering).
|
|
|
+is_lingering_stream(#state{lingering_streams=Lingering}, StreamID) ->
|
|
|
+ lists:member(StreamID, Lingering).
|