123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887 |
- %% Copyright (c) 2023, Loïc Hoguin <essen@ninenines.eu>
- %%
- %% Permission to use, copy, modify, and/or distribute this software for any
- %% purpose with or without fee is hereby granted, provided that the above
- %% copyright notice and this permission notice appear in all copies.
- %%
- %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- %% A key difference between cowboy_http2 and cowboy_http3
- %% is that HTTP/3 streams are QUIC streams and therefore
- %% much of the connection state is handled outside of
- %% Cowboy.
- -module(cowboy_http3).
- -export([init/3]).
- %% Temporary callback to do sendfile over QUIC.
- -export([send/2]).
- -record(stream, {
- id :: non_neg_integer(), %% @todo specs
- %% Whether the stream is currently in a special state.
- status :: header | normal | {data, non_neg_integer()} | stopping,
- %% Stream buffer.
- buffer = <<>> :: binary(),
- %% Stream state.
- state = undefined :: undefined | {module, any()}
- }).
- -record(state, {
- parent :: pid(),
- conn :: any(), %% @todo specs
- opts = #{} :: any(), %% @todo opts(),
- %% Remote address and port for the connection.
- peer = undefined :: {inet:ip_address(), inet:port_number()},
- %% Local address and port for the connection.
- sock = undefined :: {inet:ip_address(), inet:port_number()},
- %% HTTP/3 state machine.
- http3_machine :: cow_http3_machine:http3_machine(),
- %% Quick pointers for commonly used streams.
- local_control_id :: any(), %% @todo specs Control stream must not be closed.
- local_encoder_id :: any(), %% @todo specs
- local_decoder_id :: any(), %% @todo specs
- %% Bidirectional streams used for requests and responses,
- %% as well as unidirectional streams initiated by the client.
- streams = #{} :: map(), %% @todo specs
- %% @todo a ref/id map because stream_closed we don't have the id
- %% Lingering streams that were recently reset. We may receive
- %% pending data or messages for these streams a short while
- %% after they have been reset.
- lingering_streams = [] :: [non_neg_integer()],
- %% Streams can spawn zero or more children which are then managed
- %% by this module if operating as a supervisor.
- children = cowboy_children:init() :: cowboy_children:children()
- }).
- -spec init(_, _, _) -> no_return().
- init(Parent, Conn, Opts) ->
- %ct:pal("init"),
- {ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(server, Opts),
- %% Immediately open a control, encoder and decoder stream.
- {ok, ControlID} = cowboy_quicer:start_unidi_stream(Conn, [<<0>>, SettingsBin]),
- {ok, EncoderID} = cowboy_quicer:start_unidi_stream(Conn, <<2>>),
- {ok, DecoderID} = cowboy_quicer:start_unidi_stream(Conn, <<3>>),
- %ct:pal("control ~p encoder ~p decoder ~p", [ControlID, EncoderID, DecoderID]),
- %% Set the control, encoder and decoder streams in the machine.
- HTTP3Machine = cow_http3_machine:init_unidi_local_streams(
- ControlID, EncoderID, DecoderID, HTTP3Machine0),
- %% Get the peername/sockname.
- Peer0 = cowboy_quicer:peername(Conn),
- Sock0 = cowboy_quicer:sockname(Conn),
- %% @todo Get the peer certificate here if it makes sense.
- case {Peer0, Sock0} of
- {{ok, Peer}, {ok, Sock}} ->
- %% Quick! Let's go!
- loop(#state{parent=Parent, conn=Conn, opts=Opts,
- peer=Peer, sock=Sock, http3_machine=HTTP3Machine,
- local_control_id=ControlID,
- local_encoder_id=EncoderID,
- local_decoder_id=DecoderID});
- {{error, Reason}, _} ->
- terminate(undefined, {socket_error, Reason,
- 'A socket error occurred when retrieving the peer name.'});
- {_, {error, Reason}} ->
- terminate(undefined, {socket_error, Reason,
- 'A socket error occurred when retrieving the sock name.'})
- end.
- loop(State0=#state{children=Children}) ->
- %ct:pal("~p", [process_info(self(), messages)]),
- receive
- Msg when element(1, Msg) =:= quic ->
- handle_quic_msg(State0, Msg);
- %% Timeouts.
- {timeout, Ref, {shutdown, Pid}} ->
- cowboy_children:shutdown_timeout(Children, Ref, Pid),
- loop(State0);
- %% Messages pertaining to a stream.
- {{Pid, StreamID}, Msg} when Pid =:= self() ->
- loop(info(State0, StreamID, Msg));
- %% Exit signal from children.
- Msg = {'EXIT', Pid, _} ->
- loop(down(State0, Pid, Msg));
- Msg ->
- ct:pal("cowboy msg ~p", [Msg]),
- loop(State0)
- end.
- handle_quic_msg(State0, Msg) ->
- case cowboy_quicer:handle(Msg) of
- {data, StreamID, IsFin, Data} ->
- % ct:pal("{data, ~p, ~p, ~p}", [StreamID, IsFin, Data]),
- parse(State0, Data, StreamID, IsFin);
- {stream_started, StreamID, StreamType} ->
- % ct:pal("~p stream_started ~p ~p", [self(), StreamID, StreamType]),
- State = stream_new_remote(State0, StreamID, StreamType),
- loop(State);
- {stream_closed, StreamID, ErrorCode} ->
- % ct:pal("stream_closed ~p state ~p code ~p", [StreamID, State0, ErrorCode]),
- State = stream_closed(State0, StreamID, ErrorCode),
- loop(State);
- closed ->
- %% @todo terminate here?
- ok;
- ok ->
- loop(State0)
- end.
- parse(State=#state{opts=Opts}, Data, StreamID, IsFin) ->
- case stream_get(State, StreamID) of
- Stream=#stream{buffer= <<>>} ->
- parse1(State, Data, Stream, IsFin);
- Stream=#stream{buffer=Buffer} ->
- Stream1 = Stream#stream{buffer= <<>>},
- parse1(stream_store(State, Stream1),
- <<Buffer/binary, Data/binary>>, Stream1, IsFin);
- %% Pending data for a stream that has been reset. Ignore.
- error ->
- case is_lingering_stream(State, StreamID) of
- true ->
- ok;
- false ->
- %% We avoid logging the data as it could be quite large.
- cowboy:log(warning, "Received data for unknown stream ~p.",
- [StreamID], Opts)
- end,
- loop(State)
- end.
- %% @todo Swap Data and Stream/StreamID?
- parse1(State, Data, Stream=#stream{status=header}, IsFin) ->
- parse_unidirectional_stream_header(State, Data, Stream, IsFin);
- parse1(State, Data, Stream=#stream{status={data, Len}, id=StreamID}, IsFin) ->
- DataLen = byte_size(Data),
- if
- DataLen < Len ->
- %% We don't have the full frame but this is the end of the
- %% data we have. So FrameIsFin is equivalent to IsFin here.
- loop(frame(State, Stream#stream{status={data, Len - DataLen}}, {data, Data}, IsFin));
- true ->
- <<Data1:Len/binary, Rest/bits>> = Data,
- FrameIsFin = is_fin(IsFin, Rest),
- parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin),
- Rest, StreamID, IsFin)
- end;
- %% @todo Clause that discards receiving data for stopping streams.
- %% We may receive a few more frames after we abort receiving.
- parse1(State, Data, Stream=#stream{id=StreamID}, IsFin) ->
- case cow_http3:parse(Data) of
- {ok, Frame, Rest} ->
- FrameIsFin = is_fin(IsFin, Rest),
- % ct:pal("parse1 Frame= ~p Rest= ~p", [Frame, Rest]),
- parse(frame(State, Stream, Frame, FrameIsFin), Rest, StreamID, IsFin);
- {more, Frame, Len} ->
- %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
- case IsFin of
- nofin ->
- loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin));
- fin ->
- terminate(State, {connection_error, h3_frame_error,
- 'Last frame on stream was truncated. (RFC9114 7.1)'})
- end;
- {ignore, Rest} ->
- parse(ignored_frame(State, Stream), Rest, StreamID, IsFin);
- Error = {connection_error, _, _} ->
- terminate(State, Error);
- more when Data =:= <<>> ->
- loop(stream_store(State, Stream#stream{buffer=Data}));
- more ->
- %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
- case IsFin of
- nofin ->
- loop(stream_store(State, Stream#stream{buffer=Data}));
- fin ->
- terminate(State, {connection_error, h3_frame_error,
- 'Last frame on stream was truncated. (RFC9114 7.1)'})
- end
- end.
- %% We may receive multiple frames in a single QUIC packet.
- %% The FIN flag applies to the QUIC packet, not to the frame.
- %% We must therefore only consider the frame to have a FIN
- %% flag if there's no data remaining to be read.
- is_fin(fin, <<>>) -> fin;
- is_fin(_, _) -> nofin.
- parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
- Data, Stream0=#stream{id=StreamID}, IsFin) ->
- case cow_http3:parse_unidi_stream_header(Data) of
- {ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder ->
- case cow_http3_machine:set_unidi_remote_stream_type(
- StreamID, Type, HTTP3Machine0) of
- {ok, HTTP3Machine} ->
- State = State0#state{http3_machine=HTTP3Machine},
- Stream = Stream0#stream{status=normal},
- parse(stream_store(State, Stream), Rest, StreamID, IsFin);
- {error, Error={connection_error, _, _}, HTTP3Machine} ->
- terminate(State0#state{http3_machine=HTTP3Machine}, Error)
- end;
- {ok, push, _} ->
- terminate(State0, {connection_error, h3_stream_creation_error,
- 'Only servers can push. (RFC9114 6.2.2)'});
- %% Unknown stream types must be ignored. We choose to abort the
- %% stream instead of reading and discarding the incoming data.
- {undefined, _} ->
- loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
- end.
- frame(State=#state{http3_machine=HTTP3Machine0, conn=Conn, local_decoder_id=DecoderID},
- Stream=#stream{id=StreamID}, Frame, IsFin) ->
- % ct:pal("cowboy frame ~p ~p", [Frame, IsFin]),
- case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of
- {ok, HTTP3Machine} ->
- State#state{http3_machine=HTTP3Machine};
- {ok, {data, Data}, HTTP3Machine} ->
- data_frame(State#state{http3_machine=HTTP3Machine}, Stream, IsFin, Data);
- %% @todo I don't think we need the IsFin in the {headers tuple.
- {ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP3Machine} ->
- headers_frame(State#state{http3_machine=HTTP3Machine},
- Stream, IsFin, Headers, PseudoHeaders, BodyLen);
- {ok, {headers, IsFin, Headers, PseudoHeaders, BodyLen}, DecData, HTTP3Machine} ->
- %% Send the decoder data.
- ok = cowboy_quicer:send(Conn, DecoderID, DecData),
- headers_frame(State#state{http3_machine=HTTP3Machine},
- Stream, IsFin, Headers, PseudoHeaders, BodyLen);
- {ok, {trailers, _Trailers}, HTTP3Machine} ->
- %% @todo Propagate trailers.
- State#state{http3_machine=HTTP3Machine};
- {ok, GoAway={goaway, _}, HTTP3Machine} ->
- goaway(State#state{http3_machine=HTTP3Machine}, GoAway);
- {error, Error={stream_error, _Reason, _Human}, HTTP3Machine} ->
- reset_stream(State#state{http3_machine=HTTP3Machine}, Stream, Error);
- {error, Error={connection_error, _, _}, HTTP3Machine} ->
- terminate(State#state{http3_machine=HTTP3Machine}, Error)
- end.
- data_frame(State=#state{opts=Opts},
- Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) ->
- try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
- {Commands, StreamState} ->
- commands(State, Stream#stream{state=StreamState}, Commands)
- catch Class:Exception:Stacktrace ->
- cowboy:log(cowboy_stream:make_error_log(data,
- [StreamID, IsFin, Data, StreamState0],
- Class, Exception, Stacktrace), Opts),
- reset_stream(State, Stream, {internal_error, {Class, Exception},
- 'Unhandled exception in cowboy_stream:data/4.'})
- end.
- headers_frame(State, Stream, IsFin, Headers,
- PseudoHeaders=#{method := <<"CONNECT">>}, _)
- when map_size(PseudoHeaders) =:= 2 ->
- early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
- 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)');
- headers_frame(State, Stream, IsFin, Headers,
- PseudoHeaders=#{method := <<"TRACE">>}, _) ->
- early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
- 'The TRACE method is currently not implemented. (RFC9114 4.4, RFC7231 4.3.8)');
- headers_frame(State, Stream, IsFin, Headers, PseudoHeaders=#{authority := Authority}, BodyLen) ->
- headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
- headers_frame(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen) ->
- case lists:keyfind(<<"host">>, 1, Headers) of
- {_, Authority} ->
- headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
- _ ->
- reset_stream(State, Stream, {stream_error, h3_message_error,
- 'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'})
- end.
- headers_frame_parse_host(State=#state{peer=Peer, sock=Sock},
- Stream=#stream{id=StreamID}, IsFin, Headers,
- PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs},
- BodyLen, Authority) ->
- try cow_http_hd:parse_host(Authority) of
- {Host, Port0} ->
- Port = ensure_port(Scheme, Port0),
- try cow_http:parse_fullpath(PathWithQs) of
- {<<>>, _} ->
- reset_stream(State, Stream, {stream_error, h3_message_error,
- 'The path component must not be empty. (RFC7540 8.1.2.3)'});
- {Path, Qs} ->
- Req0 = #{
- ref => quic, %% @todo Ref,
- pid => self(),
- streamid => StreamID,
- peer => Peer,
- sock => Sock,
- cert => undefined, %Cert, %% @todo
- method => Method,
- scheme => Scheme,
- host => Host,
- port => Port,
- path => Path,
- qs => Qs,
- version => 'HTTP/3',
- headers => headers_to_map(Headers, #{}),
- has_body => IsFin =:= nofin,
- body_length => BodyLen
- },
- %% We add the protocol information for extended CONNECTs.
- Req = case PseudoHeaders of
- #{protocol := Protocol} -> Req0#{protocol => Protocol};
- _ -> Req0
- end,
- headers_frame(State, Stream, Req)
- catch _:_ ->
- reset_stream(State, Stream, {stream_error, h3_message_error,
- 'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'})
- end
- catch _:_ ->
- reset_stream(State, Stream, {stream_error, h3_message_error,
- 'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'})
- end.
- %% @todo Copied from cowboy_http2.
- %% @todo Remove "http"? Probably.
- ensure_port(<<"http">>, undefined) -> 80;
- ensure_port(<<"https">>, undefined) -> 443;
- ensure_port(_, Port) -> Port.
- %% @todo Copied from cowboy_http2.
- %% This function is necessary to properly handle duplicate headers
- %% and the special-case cookie header.
- headers_to_map([], Acc) ->
- Acc;
- headers_to_map([{Name, Value}|Tail], Acc0) ->
- Acc = case Acc0 of
- %% The cookie header does not use proper HTTP header lists.
- #{Name := Value0} when Name =:= <<"cookie">> ->
- Acc0#{Name => << Value0/binary, "; ", Value/binary >>};
- #{Name := Value0} ->
- Acc0#{Name => << Value0/binary, ", ", Value/binary >>};
- _ ->
- Acc0#{Name => Value}
- end,
- headers_to_map(Tail, Acc).
- headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
- %ct:pal("req ~p", [Req]),
- try cowboy_stream:init(StreamID, Req, Opts) of
- {Commands, StreamState} ->
- %logger:error("~p", [Commands]),
- %logger:error("~p", [StreamState]),
- commands(State, Stream#stream{state=StreamState}, Commands)
- catch Class:Exception:Stacktrace ->
- cowboy:log(cowboy_stream:make_error_log(init,
- [StreamID, Req, Opts],
- Class, Exception, Stacktrace), Opts),
- reset_stream(State, Stream, {internal_error, {Class, Exception},
- 'Unhandled exception in cowboy_stream:init/3.'})
- end.
- early_error(State0=#state{opts=Opts, peer=Peer},
- Stream=#stream{id=StreamID}, _IsFin, Headers, #{method := Method},
- StatusCode0, HumanReadable) ->
- %% We automatically terminate the stream but it is not an error
- %% per se (at least not in the first implementation).
- Reason = {stream_error, h3_no_error, HumanReadable},
- %% The partial Req is minimal for now. We only have one case
- %% where it can be called (when a method is completely disabled).
- %% @todo Fill in the other elements.
- PartialReq = #{
- ref => quic, %% @todo Ref,
- peer => Peer,
- method => Method,
- headers => headers_to_map(Headers, #{})
- },
- Resp = {response, StatusCode0, RespHeaders0=#{<<"content-length">> => <<"0">>}, <<>>},
- try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
- {response, StatusCode, RespHeaders, RespBody} ->
- send_response(State0, Stream, StatusCode, RespHeaders, RespBody)
- catch Class:Exception:Stacktrace ->
- cowboy:log(cowboy_stream:make_error_log(early_error,
- [StreamID, Reason, PartialReq, Resp, Opts],
- Class, Exception, Stacktrace), Opts),
- %% We still need to send an error response, so send what we initially
- %% wanted to send. It's better than nothing.
- send_headers(State0, StreamID, fin, StatusCode0, RespHeaders0)
- end.
- %% Erlang messages.
- down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
- State = case cowboy_children:down(Children0, Pid) of
- %% The stream was terminated already.
- {ok, undefined, Children} ->
- State0#state{children=Children};
- %% The stream is still running.
- {ok, StreamID, Children} ->
- info(State0#state{children=Children}, StreamID, Msg);
- %% The process was unknown.
- error ->
- cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
- [Msg, Pid], Opts),
- State0
- end,
- if
- %% @todo
- % State#state.http2_status =:= closing, State#state.streams =:= #{} ->
- % terminate(State, {stop, normal, 'The connection is going away.'});
- true ->
- State
- end.
- info(State=#state{opts=Opts, http3_machine=_HTTP3Machine}, StreamID, Msg) ->
- %ct:pal("INFO ~p ~p ~p", [State, StreamID, Msg]),
- case stream_get(State, StreamID) of
- Stream=#stream{state=StreamState0} ->
- try cowboy_stream:info(StreamID, Msg, StreamState0) of
- {Commands, StreamState} ->
- %ct:pal("~p", [Commands]),
- %logger:error("~p ~p", [StreamID, Streams]),
- commands(State, Stream#stream{state=StreamState}, Commands)
- catch Class:Exception:Stacktrace ->
- cowboy:log(cowboy_stream:make_error_log(info,
- [StreamID, Msg, StreamState0],
- Class, Exception, Stacktrace), Opts),
- reset_stream(State, Stream, {internal_error, {Class, Exception},
- 'Unhandled exception in cowboy_stream:info/3.'})
- end;
- error ->
- case is_lingering_stream(State, StreamID) of
- true ->
- ok;
- false ->
- cowboy:log(warning, "Received message ~p for unknown stream ~p.",
- [Msg, StreamID], Opts)
- end,
- State
- end.
- %% Stream handler commands.
- commands(State, Stream, []) ->
- stream_store(State, Stream);
- %% Error responses are sent only if a response wasn't sent already.
- commands(State=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID},
- [{error_response, StatusCode, Headers, Body}|Tail]) ->
- case cow_http3_machine:get_stream_local_state(StreamID, HTTP3Machine) of
- {ok, idle} ->
- commands(State, Stream, [{response, StatusCode, Headers, Body}|Tail]);
- _ ->
- commands(State, Stream, Tail)
- end;
- %% Send an informational response.
- commands(State0, Stream, [{inform, StatusCode, Headers}|Tail]) ->
- State = send_headers(State0, Stream, idle, StatusCode, Headers),
- commands(State, Stream, Tail);
- %% Send response headers.
- commands(State0, Stream, [{response, StatusCode, Headers, Body}|Tail]) ->
- % ct:pal("~p commands response ~p ~p ~p", [self(), StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]),
- State = send_response(State0, Stream, StatusCode, Headers, Body),
- commands(State, Stream, Tail);
- %% Send response headers.
- commands(State0, Stream, [{headers, StatusCode, Headers}|Tail]) ->
- % ct:pal("commands headers ~p ~p", [StatusCode, Headers]),
- State = send_headers(State0, Stream, nofin, StatusCode, Headers),
- commands(State, Stream, Tail);
- %%% Send a response body chunk.
- commands(State0=#state{conn=Conn}, Stream=#stream{id=StreamID}, [{data, IsFin, Data}|Tail]) ->
- % ct:pal("commands data ~p ~p", [IsFin, try iolist_size(Data) catch _:_ -> Data end]),
- _ = case Data of
- {sendfile, Offset, Bytes, Path} ->
- %% Temporary solution to do sendfile over QUIC.
- {ok, _} = ranch_transport:sendfile(?MODULE, {Conn, StreamID},
- Path, Offset, Bytes, []),
- ok = cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), IsFin);
- _ ->
- ok = cowboy_quicer:send(Conn, StreamID, cow_http3:data(Data), IsFin)
- end,
- State = maybe_send_is_fin(State0, Stream, IsFin),
- commands(State, Stream, Tail);
- %%% Send trailers.
- commands(State=#state{conn=Conn, http3_machine=HTTP3Machine0},
- Stream=#stream{id=StreamID}, [{trailers, Trailers}|Tail]) ->
- % ct:pal("commands trailers ~p", [Trailers]),
- HTTP3Machine = case cow_http3_machine:prepare_trailers(
- StreamID, HTTP3Machine0, maps:to_list(Trailers)) of
- {trailers, HeaderBlock, _EncData, HTTP3Machine1} ->
- % ct:pal("trailers"),
- %% @todo EncData!!
- ok = cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), fin),
- HTTP3Machine1;
- {no_trailers, HTTP3Machine1} ->
- % ct:pal("no_trailers"),
- ok = cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), fin),
- HTTP3Machine1
- end,
- commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
- %% Send a push promise.
- %%
- %% @todo Responses sent as a result of a push_promise request
- %% must not send push_promise frames themselves.
- %%
- %% @todo We should not send push_promise frames when we are
- %% in the closing http2_status.
- %commands(State0=#state{socket=Socket, transport=Transport, http3_machine=HTTP3Machine0},
- % Stream, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
- % Authority = case {Scheme, Port} of
- % {<<"http">>, 80} -> Host;
- % {<<"https">>, 443} -> Host;
- % _ -> iolist_to_binary([Host, $:, integer_to_binary(Port)])
- % end,
- % PathWithQs = iolist_to_binary(case Qs of
- % <<>> -> Path;
- % _ -> [Path, $?, Qs]
- % end),
- % PseudoHeaders = #{
- % method => Method,
- % scheme => Scheme,
- % authority => Authority,
- % path => PathWithQs
- % },
- % %% We need to make sure the header value is binary before we can
- % %% create the Req object, as it expects them to be flat.
- % Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)),
- % %% @todo
- % State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP3Machine0,
- % PseudoHeaders, Headers) of
- % {ok, PromisedStreamID, HeaderBlock, HTTP3Machine} ->
- % Transport:send(Socket, cow_http2:push_promise(
- % StreamID, PromisedStreamID, HeaderBlock)),
- % headers_frame(State0#state{http3_machine=HTTP2Machine},
- % PromisedStreamID, fin, Headers, PseudoHeaders, 0);
- % {error, no_push} ->
- % State0
- % end,
- % commands(State, Stream, Tail);
- %%% Read the request body.
- %commands(State0=#state{flow=Flow, streams=Streams}, Stream, [{flow, Size}|Tail]) ->
- commands(State, Stream, [{flow, _Size}|Tail]) ->
- %% @todo We should tell the QUIC stream to increase its window size.
- % #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
- % State = update_window(State0#state{flow=Flow + Size,
- % streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
- % StreamID),
- commands(State, Stream, Tail);
- %% Supervise a child process.
- commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
- [{spawn, Pid, Shutdown}|Tail]) ->
- commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
- Stream, Tail);
- %% Error handling.
- commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) ->
- %% @todo Do we want to run the commands after an internal_error?
- %% @todo Do we even allow commands after?
- %% @todo Only reset when the stream still exists.
- reset_stream(State, Stream, Error);
- %% Use a different protocol within the stream (CONNECT :protocol).
- %% @todo Make sure we error out when the feature is disabled.
- commands(State0, Stream0=#stream{id=StreamID},
- [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
- State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
- Stream = stream_get(State, StreamID),
- commands(State, Stream, Tail);
- %% Set options dynamically.
- commands(State, Stream, [{set_options, _Opts}|Tail]) ->
- commands(State, Stream, Tail);
- commands(State, Stream, [stop|_Tail]) ->
- % ct:pal("stop"),
- %% @todo Do we want to run the commands after a stop?
- %% @todo Do we even allow commands after?
- stop_stream(stream_store(State, Stream), Stream);
- %% Log event.
- commands(State=#state{opts=Opts}, Stream, [Log={log, _, _, _}|Tail]) ->
- cowboy:log(Log, Opts),
- commands(State, Stream, Tail).
- send_response(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
- Stream=#stream{id=StreamID}, StatusCode, Headers, Body) ->
- Size = case Body of
- {sendfile, _, Bytes0, _} -> Bytes0;
- _ -> iolist_size(Body)
- end,
- case Size of
- 0 ->
- State = send_headers(State0, Stream, fin, StatusCode, Headers),
- maybe_send_is_fin(State, Stream, fin);
- _ ->
- %% @todo Add a test for HEAD to make sure we don't send the body when
- %% returning {response...} from a stream handler (or {headers...} then {data...}).
- %% @todo We must send EncData!
- {ok, _IsFin, HeaderBlock, _EncData, HTTP3Machine}
- = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, nofin,
- #{status => cow_http:status_to_integer(StatusCode)},
- headers_to_list(Headers)),
- %% @todo It might be better to do async sends.
- _ = case Body of
- {sendfile, Offset, Bytes, Path} ->
- ok = cowboy_quicer:send(Conn, StreamID,
- cow_http3:headers(HeaderBlock)),
- %% Temporary solution to do sendfile over QUIC.
- {ok, _} = ranch_transport:sendfile(?MODULE, {Conn, StreamID},
- Path, Offset, Bytes, []),
- ok = cowboy_quicer:send(Conn, StreamID,
- cow_http3:data(<<>>), fin);
- _ ->
- ok = cowboy_quicer:send(Conn, StreamID, [
- cow_http3:headers(HeaderBlock),
- cow_http3:data(Body)
- ], fin)
- end,
- maybe_send_is_fin(State0#state{http3_machine=HTTP3Machine}, Stream, fin)
- end.
- maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0},
- Stream=#stream{id=StreamID}, fin) ->
- HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamID, HTTP3Machine0),
- maybe_terminate_stream(State#state{http3_machine=HTTP3Machine}, Stream);
- maybe_send_is_fin(State, _, _) ->
- State.
- %% Temporary callback to do sendfile over QUIC.
- -spec send(_, _) -> _. %% @todo
- send({Conn, StreamID}, IoData) ->
- cowboy_quicer:send(Conn, StreamID, cow_http3:data(IoData)).
- send_headers(State=#state{conn=Conn, http3_machine=HTTP3Machine0},
- #stream{id=StreamID}, IsFin0, StatusCode, Headers) ->
- {ok, IsFin, HeaderBlock, _EncData, HTTP3Machine}
- = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, IsFin0,
- #{status => cow_http:status_to_integer(StatusCode)},
- headers_to_list(Headers)),
- ok = cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), IsFin),
- %% @todo Send _EncData.
- State#state{http3_machine=HTTP3Machine}.
- %% The set-cookie header is special; we can only send one cookie per header.
- headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
- Headers = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
- Headers ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
- headers_to_list(Headers) ->
- maps:to_list(Headers).
- reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
- Stream=#stream{id=StreamID}, Error) ->
- %ct:pal("~p reset_stream ~p ~p", [self(), Stream, Error]),
- Reason = case Error of
- {internal_error, _, _} -> h3_internal_error;
- {stream_error, Reason0, _} -> Reason0
- end,
- %% @todo Do we want to close both sides?
- %% @todo Should we close the send side if the receive side was already closed?
- Res = cowboy_quicer:shutdown_stream(Conn, StreamID,
- both, cow_http3:error_to_code(Reason)),
- % ct:pal("~p reset_stream res ~p", [self(), Res]),
- State1 = case cow_http3_machine:reset_stream(StreamID, HTTP3Machine0) of
- {ok, HTTP3Machine} ->
- terminate_stream(State0#state{http3_machine=HTTP3Machine}, Stream, Error);
- {error, not_found} ->
- terminate_stream(State0, Stream, Error)
- end,
- %% @todo
- % case reset_rate(State1) of
- % {ok, State} ->
- % State;
- % error ->
- % terminate(State1, {connection_error, enhance_your_calm,
- % 'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'})
- % end.
- State1.
- stop_stream(State0=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID}) ->
- %ct:pal("stop_stream ~p ~p", [State0, Stream]),
- %% We abort reading when stopping the stream but only
- %% if the client was not finished sending data.
- %% We mark the stream as 'stopping' either way.
- State = case cow_http3_machine:get_stream_remote_state(StreamID, HTTP3Machine) of
- {ok, fin} ->
- stream_store(State0, Stream#stream{status=stopping});
- {error, not_found} ->
- stream_store(State0, Stream#stream{status=stopping});
- _ ->
- stream_abort_receive(State0, Stream, h3_no_error)
- end,
- %% Then we may need to send a response or terminate it
- %% if the stream handler did not do so already.
- case cow_http3_machine:get_stream_local_state(StreamID, HTTP3Machine) of
- %% When the stream terminates normally (without resetting the stream)
- %% and no response was sent, we need to send a proper response back to the client.
- {ok, idle} ->
- info(State, StreamID, {response, 204, #{}, <<>>});
- %% When a response was sent but not terminated, we need to close the stream.
- %% We send a final DATA frame to complete the stream.
- {ok, nofin} ->
- % ct:pal("error nofin"),
- info(State, StreamID, {data, fin, <<>>});
- %% When a response was sent fully we can terminate the stream,
- %% regardless of the stream being in half-closed or closed state.
- _ ->
- terminate_stream(State, Stream, normal)
- end.
- maybe_terminate_stream(State, Stream=#stream{status=stopping}) ->
- terminate_stream(State, Stream, normal);
- %% The Stream will be stored in the State at the end of commands processing.
- maybe_terminate_stream(State, _) ->
- State.
- terminate_stream(State=#state{streams=Streams0, children=Children0},
- #stream{id=StreamID, state=StreamState}, Reason) ->
- Streams = maps:remove(StreamID, Streams0),
- terminate_stream_handler(State, StreamID, Reason, StreamState),
- Children = cowboy_children:shutdown(Children0, StreamID),
- stream_linger(State#state{streams=Streams, children=Children}, StreamID).
- terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
- try
- cowboy_stream:terminate(StreamID, Reason, StreamState)
- catch Class:Exception:Stacktrace ->
- cowboy:log(cowboy_stream:make_error_log(terminate,
- [StreamID, Reason, StreamState],
- Class, Exception, Stacktrace), Opts)
- end.
- ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{id=StreamID}) ->
- case cow_http3_machine:ignored_frame(StreamID, HTTP3Machine0) of
- {ok, HTTP3Machine} ->
- State#state{http3_machine=HTTP3Machine};
- {error, Error={connection_error, _, _}, HTTP3Machine} ->
- terminate(State#state{http3_machine=HTTP3Machine}, Error)
- end.
- stream_abort_receive(State=#state{conn=Conn}, Stream=#stream{id=StreamID}, Reason) ->
- cowboy_quicer:shutdown_stream(Conn, StreamID,
- receiving, cow_http3:error_to_code(Reason)),
- stream_store(State, Stream#stream{status=stopping}).
- %% @todo Graceful connection shutdown.
- %% We terminate the connection immediately if it hasn't fully been initialized.
- goaway(State, {goaway, _}) ->
- terminate(State, {stop, goaway, 'The connection is going away.'}).
- terminate(State=#state{conn=Conn, %http3_status=Status,
- %http3_machine=HTTP3Machine,
- streams=Streams, children=Children}, Reason) ->
- % if
- % Status =:= connected; Status =:= closing_initiated ->
- %% @todo
- % ok = cowboy_quicer:send(Conn, ControlID, cow_http3:goaway(
- % cow_http3_machine:get_last_streamid(HTTP3Machine))),
- %% We already sent the GOAWAY frame.
- % Status =:= closing ->
- % ok
- % end,
- terminate_all_streams(State, maps:to_list(Streams), Reason),
- cowboy_children:terminate(Children),
- % terminate_linger(State),
- cowboy_quicer:shutdown(Conn, cow_http3:error_to_code(terminate_reason(Reason))),
- exit({shutdown, Reason}).
- terminate_reason({connection_error, Reason, _}) -> Reason;
- terminate_reason({stop, _, _}) -> h3_no_error.
- %terminate_reason({socket_error, _, _}) -> internal_error;
- %terminate_reason({internal_error, _, _}) -> internal_error.
- terminate_all_streams(_, [], _) ->
- ok;
- terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
- terminate_stream_handler(State, StreamID, Reason, StreamState),
- terminate_all_streams(State, Tail, Reason).
- stream_get(#state{streams=Streams}, StreamID) ->
- maps:get(StreamID, Streams, error).
- stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
- StreamID, StreamType) ->
- {HTTP3Machine, Status} = case StreamType of
- unidi ->
- {cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0),
- header};
- bidi ->
- {cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0),
- normal}
- end,
- Stream = #stream{id=StreamID, status=Status},
- State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
- %% Stream closed message for a local (write-only) unidi stream.
- stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) ->
- stream_closed1(State, StreamID);
- stream_closed(State=#state{local_encoder_id=StreamID}, StreamID, _) ->
- stream_closed1(State, StreamID);
- stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
- stream_closed1(State, StreamID);
- stream_closed(State=#state{opts=Opts,
- streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
- case maps:take(StreamID, Streams0) of
- {#stream{state=undefined}, Streams} ->
- %% Unidi stream has no handler/children.
- stream_closed1(State#state{streams=Streams}, StreamID);
- %% We only stop bidi streams if the stream was closed with an error
- %% or the stream was already in the process of stopping.
- {#stream{status=Status, state=StreamState}, Streams}
- when Status =:= stopping; ErrorCode =/= 0 ->
- terminate_stream_handler(State, StreamID, closed, StreamState),
- Children = cowboy_children:shutdown(Children0, StreamID),
- stream_closed1(State#state{streams=Streams, children=Children}, StreamID);
- %% Don't remove a stream that terminated properly but
- %% has chosen to remain up (custom stream handlers).
- {_, _} ->
- stream_closed1(State, StreamID);
- %% Stream closed message for a stream that has been reset. Ignore.
- error ->
- case is_lingering_stream(State, StreamID) of
- true ->
- ok;
- false ->
- %% We avoid logging the data as it could be quite large.
- cowboy:log(warning, "Received stream_closed for unknown stream ~p. ~p ~p",
- [StreamID, self(), Streams0], Opts)
- end,
- State
- end.
- stream_closed1(State=#state{http3_machine=HTTP3Machine0}, StreamID) ->
- case cow_http3_machine:close_stream(StreamID, HTTP3Machine0) of
- {ok, HTTP3Machine} ->
- State#state{http3_machine=HTTP3Machine};
- {error, Error={connection_error, _, _}, HTTP3Machine} ->
- terminate(State#state{http3_machine=HTTP3Machine}, Error)
- end.
- stream_store(State=#state{streams=Streams}, Stream=#stream{id=StreamID}) ->
- State#state{streams=Streams#{StreamID => Stream}}.
- stream_linger(State=#state{lingering_streams=Lingering0}, StreamID) ->
- %% We only keep up to 100 streams in this state. @todo Make it configurable?
- Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)],
- State#state{lingering_streams=Lingering}.
- is_lingering_stream(#state{lingering_streams=Lingering}, StreamID) ->
- lists:member(StreamID, Lingering).
|