|
@@ -24,9 +24,14 @@
|
|
|
|
|
|
-export([init/3]).
|
|
|
|
|
|
+%% Temporary callback to do sendfile over QUIC.
|
|
|
+-export([send/2]).
|
|
|
+
|
|
|
-include_lib("quicer/include/quicer.hrl").
|
|
|
|
|
|
-record(stream, {
|
|
|
+ %% @todo We shouldn't use the QUIC reference because it is a NIF object
|
|
|
+ %% and if it's stored somewhere by the user it'll never get GC.
|
|
|
ref :: any(), %% @todo specs
|
|
|
|
|
|
%% Whether the stream is currently in a special state.
|
|
@@ -143,8 +148,11 @@ loop(State0=#state{conn=Conn}) ->
|
|
|
%% Messages pertaining to a stream.
|
|
|
{{Pid, StreamRef}, Msg} when Pid =:= self() ->
|
|
|
loop(info(State0, StreamRef, Msg));
|
|
|
- _Msg ->
|
|
|
-% ct:pal("msg ~p", [Msg]),
|
|
|
+ %% Exit signal from children.
|
|
|
+ Msg = {'EXIT', Pid, _} ->
|
|
|
+ loop(down(State0, Pid, Msg));
|
|
|
+ Msg ->
|
|
|
+ ct:pal("cowboy msg ~p", [Msg]),
|
|
|
loop(State0)
|
|
|
end.
|
|
|
|
|
@@ -192,6 +200,7 @@ parse1(State, Data, Stream=#stream{ref=StreamRef}, Props) ->
|
|
|
case cow_http3:parse(Data) of
|
|
|
{ok, Frame, Rest} ->
|
|
|
IsFin = is_fin(Props, Rest),
|
|
|
+% ct:pal("parse1 Frame= ~p Rest= ~p", [Frame, Rest]),
|
|
|
parse(frame(State, Stream, Frame, IsFin), Rest, StreamRef, Props);
|
|
|
{more, Frame, Len} ->
|
|
|
IsFin = is_fin(Props, <<>>),
|
|
@@ -252,6 +261,7 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
|
|
|
end.
|
|
|
|
|
|
frame(State=#state{http3_machine=HTTP3Machine0}, Stream=#stream{ref=StreamRef}, Frame, IsFin) ->
|
|
|
+% ct:pal("cowboy frame ~p ~p", [Frame, IsFin]),
|
|
|
case cow_http3_machine:frame(Frame, IsFin, StreamRef, HTTP3Machine0) of
|
|
|
{ok, HTTP3Machine} ->
|
|
|
State#state{http3_machine=HTTP3Machine};
|
|
@@ -323,7 +333,7 @@ headers_frame_parse_host(State=#state{peer=Peer, sock=Sock},
|
|
|
streamid => StreamRef,
|
|
|
peer => Peer,
|
|
|
sock => Sock,
|
|
|
-% cert => Cert, %% @todo
|
|
|
+ cert => undefined, %Cert, %% @todo
|
|
|
method => Method,
|
|
|
scheme => Scheme,
|
|
|
host => Host,
|
|
@@ -351,6 +361,7 @@ headers_frame_parse_host(State=#state{peer=Peer, sock=Sock},
|
|
|
end.
|
|
|
|
|
|
%% @todo Copied from cowboy_http2.
|
|
|
+%% @todo Remove "http"? Probably.
|
|
|
ensure_port(<<"http">>, undefined) -> 80;
|
|
|
ensure_port(<<"https">>, undefined) -> 443;
|
|
|
ensure_port(_, Port) -> Port.
|
|
@@ -374,10 +385,11 @@ headers_to_map([{Name, Value}|Tail], Acc0) ->
|
|
|
|
|
|
headers_frame(State=#state{opts=Opts, streams=Streams},
|
|
|
Stream=#stream{ref=StreamRef}, Req) ->
|
|
|
+ct:pal("req ~p", [Req]),
|
|
|
try cowboy_stream:init(StreamRef, Req, Opts) of
|
|
|
{Commands, StreamState} ->
|
|
|
-logger:error("~p", [Commands]),
|
|
|
-logger:error("~p", [StreamState]),
|
|
|
+%logger:error("~p", [Commands]),
|
|
|
+%logger:error("~p", [StreamState]),
|
|
|
commands(State#state{
|
|
|
streams=Streams#{StreamRef => Stream#stream{state=StreamState}}},
|
|
|
StreamRef, Commands)
|
|
@@ -389,14 +401,38 @@ logger:error("~p", [StreamState]),
|
|
|
'Unhandled exception in cowboy_stream:init/3.'})
|
|
|
end.
|
|
|
|
|
|
+%% Erlang messages.
|
|
|
+
|
|
|
+down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
|
|
|
+ State = case cowboy_children:down(Children0, Pid) of
|
|
|
+ %% The stream was terminated already.
|
|
|
+ {ok, undefined, Children} ->
|
|
|
+ State0#state{children=Children};
|
|
|
+ %% The stream is still running.
|
|
|
+ {ok, StreamRef, Children} ->
|
|
|
+ info(State0#state{children=Children}, StreamRef, Msg);
|
|
|
+ %% The process was unknown.
|
|
|
+ error ->
|
|
|
+ cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
|
|
|
+ [Msg, Pid], Opts),
|
|
|
+ State0
|
|
|
+ end,
|
|
|
+ if
|
|
|
+%% @todo
|
|
|
+% State#state.http2_status =:= closing, State#state.streams =:= #{} ->
|
|
|
+% terminate(State, {stop, normal, 'The connection is going away.'});
|
|
|
+ true ->
|
|
|
+ State
|
|
|
+ end.
|
|
|
+
|
|
|
info(State=#state{opts=Opts, http3_machine=_HTTP3Machine, streams=Streams}, StreamRef, Msg) ->
|
|
|
-logger:error("~p", [Msg]),
|
|
|
+%ct:pal("INFO ~p", [Msg]),
|
|
|
case Streams of
|
|
|
#{StreamRef := Stream=#stream{state=StreamState0}} ->
|
|
|
try cowboy_stream:info(StreamRef, Msg, StreamState0) of
|
|
|
{Commands, StreamState} ->
|
|
|
-logger:error("~p", [Commands]),
|
|
|
-logger:error("~p ~p", [StreamRef, Streams]),
|
|
|
+%ct:pal("~p", [Commands]),
|
|
|
+%logger:error("~p ~p", [StreamRef, Streams]),
|
|
|
commands(State#state{streams=Streams#{StreamRef => Stream#stream{state=StreamState}}},
|
|
|
StreamRef, Commands)
|
|
|
catch Class:Exception:Stacktrace ->
|
|
@@ -422,35 +458,59 @@ logger:error("~p ~p", [StreamRef, Streams]),
|
|
|
commands(State, _, []) ->
|
|
|
State;
|
|
|
%% Error responses are sent only if a response wasn't sent already.
|
|
|
-%commands(State=#state{http3_machine=HTTP3Machine}, StreamRef,
|
|
|
-% [{error_response, StatusCode, Headers, Body}|Tail]) ->
|
|
|
-% %% @todo
|
|
|
+commands(State=#state{http3_machine=HTTP3Machine}, StreamRef,
|
|
|
+ [{error_response, StatusCode, Headers, Body}|Tail]) ->
|
|
|
+ %% @todo
|
|
|
% case cow_http2_machine:get_stream_local_state(StreamRef, HTTP2Machine) of
|
|
|
% {ok, idle, _} ->
|
|
|
-% commands(State, StreamRef, [{response, StatusCode, Headers, Body}|Tail]);
|
|
|
+ commands(State, StreamRef, [{response, StatusCode, Headers, Body}|Tail]);
|
|
|
% _ ->
|
|
|
% commands(State, StreamRef, Tail)
|
|
|
% end;
|
|
|
%% Send an informational response.
|
|
|
-%commands(State0, StreamRef, [{inform, StatusCode, Headers}|Tail]) ->
|
|
|
-% State = send_headers(State0, StreamRef, idle, StatusCode, Headers),
|
|
|
-% commands(State, StreamRef, Tail);
|
|
|
+commands(State0, StreamRef, [{inform, StatusCode, Headers}|Tail]) ->
|
|
|
+ State = send_headers(State0, StreamRef, idle, StatusCode, Headers),
|
|
|
+ commands(State, StreamRef, Tail);
|
|
|
%% Send response headers.
|
|
|
commands(State0, StreamRef, [{response, StatusCode, Headers, Body}|Tail]) ->
|
|
|
+ ct:pal("commands response ~p ~p ~p", [StatusCode, Headers, try iolist_size(Body) catch _:_ -> Body end]),
|
|
|
State = send_response(State0, StreamRef, StatusCode, Headers, Body),
|
|
|
commands(State, StreamRef, Tail);
|
|
|
%% Send response headers.
|
|
|
-%commands(State0, StreamRef, [{headers, StatusCode, Headers}|Tail]) ->
|
|
|
-% State = send_headers(State0, StreamRef, nofin, StatusCode, Headers),
|
|
|
-% commands(State, StreamRef, Tail);
|
|
|
+commands(State0, StreamRef, [{headers, StatusCode, Headers}|Tail]) ->
|
|
|
+ ct:pal("commands headers ~p ~p", [StatusCode, Headers]),
|
|
|
+ State = send_headers(State0, StreamRef, nofin, StatusCode, Headers),
|
|
|
+ commands(State, StreamRef, Tail);
|
|
|
%%% Send a response body chunk.
|
|
|
-%commands(State0, StreamRef, [{data, IsFin, Data}|Tail]) ->
|
|
|
-% State = maybe_send_data(State0, StreamRef, IsFin, Data, []),
|
|
|
-% commands(State, StreamRef, Tail);
|
|
|
+commands(State0, StreamRef, [{data, IsFin, Data}|Tail]) ->
|
|
|
+ ct:pal("commands data ~p ~p", [IsFin, try iolist_size(Data) catch _:_ -> Data end]),
|
|
|
+ _ = case Data of
|
|
|
+ {sendfile, Offset, Bytes, Path} ->
|
|
|
+ %% Temporary solution to do sendfile over QUIC.
|
|
|
+ {ok, _} = ranch_transport:sendfile(?MODULE, StreamRef,
|
|
|
+ Path, Offset, Bytes, []),
|
|
|
+ {ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(IsFin));
|
|
|
+ _ ->
|
|
|
+ {ok, _} = quicer:send(StreamRef, cow_http3:data(Data), send_flag(IsFin))
|
|
|
+ end,
|
|
|
+ State = maybe_send_is_fin(State0, StreamRef, IsFin),
|
|
|
+ commands(State, StreamRef, Tail);
|
|
|
%%% Send trailers.
|
|
|
-%commands(State0, StreamRef, [{trailers, Trailers}|Tail]) ->
|
|
|
-% State = maybe_send_data(State0, StreamRef, fin, {trailers, maps:to_list(Trailers)}, []),
|
|
|
-% commands(State, StreamRef, Tail);
|
|
|
+commands(State=#state{http3_machine=HTTP3Machine0}, StreamRef, [{trailers, Trailers}|Tail]) ->
|
|
|
+ ct:pal("commands trailers ~p", [Trailers]),
|
|
|
+ HTTP3Machine = case cow_http3_machine:prepare_trailers(
|
|
|
+ StreamRef, HTTP3Machine0, maps:to_list(Trailers)) of
|
|
|
+ {trailers, HeaderBlock, _EncData, HTTP3Machine1} ->
|
|
|
+ ct:pal("trailers"),
|
|
|
+ %% @todo EncData!!
|
|
|
+ {ok, _} = quicer:send(StreamRef, cow_http3:headers(HeaderBlock), send_flag(fin)),
|
|
|
+ HTTP3Machine1;
|
|
|
+ {no_trailers, HTTP3Machine1} ->
|
|
|
+ ct:pal("no_trailers"),
|
|
|
+ {ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(fin)),
|
|
|
+ HTTP3Machine1
|
|
|
+ end,
|
|
|
+ commands(State#state{http3_machine=HTTP3Machine}, StreamRef, Tail);
|
|
|
%% Send a push promise.
|
|
|
%%
|
|
|
%% @todo Responses sent as a result of a push_promise request
|
|
@@ -524,6 +584,7 @@ commands(State=#state{children=Children}, StreamRef, [{spawn, Pid, Shutdown}|Tai
|
|
|
commands(State, StreamRef, [{set_options, _Opts}|Tail]) ->
|
|
|
commands(State, StreamRef, Tail);
|
|
|
commands(State, StreamRef, [stop|_Tail]) ->
|
|
|
+ ct:pal("stop"),
|
|
|
%% @todo Do we want to run the commands after a stop?
|
|
|
%% @todo Do we even allow commands after?
|
|
|
stop_stream(State, StreamRef);
|
|
@@ -534,13 +595,13 @@ commands(State=#state{opts=Opts}, StreamRef, [Log={log, _, _, _}|Tail]) ->
|
|
|
|
|
|
send_response(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, StatusCode, Headers, Body) ->
|
|
|
Size = case Body of
|
|
|
- {sendfile, _, Bytes, _} -> Bytes;
|
|
|
+ {sendfile, _, Bytes0, _} -> Bytes0;
|
|
|
_ -> iolist_size(Body)
|
|
|
end,
|
|
|
case Size of
|
|
|
0 ->
|
|
|
State = send_headers(State0, StreamRef, fin, StatusCode, Headers),
|
|
|
- maybe_terminate_stream(State, StreamRef, fin);
|
|
|
+ maybe_send_is_fin(State, StreamRef, fin);
|
|
|
_ ->
|
|
|
%% @todo Add a test for HEAD to make sure we don't send the body when
|
|
|
%% returning {response...} from a stream handler (or {headers...} then {data...}).
|
|
@@ -550,12 +611,33 @@ send_response(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, StatusCode,
|
|
|
#{status => cow_http:status_to_integer(StatusCode)},
|
|
|
headers_to_list(Headers)),
|
|
|
%% @todo It might be better to do async sends.
|
|
|
- {ok, _} = quicer:send(StreamRef, [
|
|
|
- cow_http3:headers(HeaderBlock),
|
|
|
- cow_http3:data(Body)
|
|
|
- ], send_flag(fin)),
|
|
|
- State0#state{http3_machine=HTTP3Machine}
|
|
|
- %% @todo maybe_terminate_stream (see maybe_send_data for how to handle it)
|
|
|
+ _ = case Body of
|
|
|
+ {sendfile, Offset, Bytes, Path} ->
|
|
|
+ {ok, _} = quicer:send(StreamRef, cow_http3:headers(HeaderBlock)),
|
|
|
+ %% Temporary solution to do sendfile over QUIC.
|
|
|
+ {ok, _} = ranch_transport:sendfile(?MODULE, StreamRef,
|
|
|
+ Path, Offset, Bytes, []),
|
|
|
+ {ok, _} = quicer:send(StreamRef, cow_http3:data(<<>>), send_flag(fin));
|
|
|
+ _ ->
|
|
|
+ {ok, _} = quicer:send(StreamRef, [
|
|
|
+ cow_http3:headers(HeaderBlock),
|
|
|
+ cow_http3:data(Body)
|
|
|
+ ], send_flag(fin))
|
|
|
+ end,
|
|
|
+ maybe_send_is_fin(State0#state{http3_machine=HTTP3Machine}, StreamRef, fin)
|
|
|
+ end.
|
|
|
+
|
|
|
+maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0}, StreamRef, fin) ->
|
|
|
+ HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamRef, HTTP3Machine0),
|
|
|
+ State#state{http3_machine=HTTP3Machine};
|
|
|
+maybe_send_is_fin(State, _, _) ->
|
|
|
+ State.
|
|
|
+
|
|
|
+%% Temporary callback to do sendfile over QUIC.
|
|
|
+send(StreamRef, IoData) ->
|
|
|
+ case quicer:send(StreamRef, cow_http3:data(IoData)) of
|
|
|
+ {ok, _} -> ok;
|
|
|
+ Error -> Error
|
|
|
end.
|
|
|
|
|
|
send_headers(State=#state{http3_machine=HTTP3Machine0},
|
|
@@ -603,6 +685,34 @@ reset_stream(State0=#state{http3_machine=HTTP3Machine0}, StreamRef, Error) ->
|
|
|
% end.
|
|
|
State1.
|
|
|
|
|
|
+stop_stream(State0=#state{http3_machine=HTTP3Machine, streams=Streams}, StreamRef) ->
|
|
|
+ #{StreamRef := Stream} = Streams,
|
|
|
+ %% We abort reading when stopping the stream but only
|
|
|
+ %% if the client was not finished sending data.
|
|
|
+ State = case cow_http3_machine:get_stream_remote_state(StreamRef, HTTP3Machine) of
|
|
|
+ {ok, fin} ->
|
|
|
+ State0;
|
|
|
+ _ ->
|
|
|
+ stream_abort_receive(State0, Stream, h3_no_error)
|
|
|
+ end,
|
|
|
+ %% Then we may need to send a response or terminate it
|
|
|
+ %% if the stream handler did not do so already.
|
|
|
+ case cow_http3_machine:get_stream_local_state(StreamRef, HTTP3Machine) of
|
|
|
+ %% When the stream terminates normally (without resetting the stream)
|
|
|
+ %% and no response was sent, we need to send a proper response back to the client.
|
|
|
+ {ok, idle} ->
|
|
|
+ info(State, StreamRef, {response, 204, #{}, <<>>});
|
|
|
+ %% When a response was sent but not terminated, we need to close the stream.
|
|
|
+ %% We send a final DATA frame to complete the stream.
|
|
|
+ {ok, nofin} ->
|
|
|
+ ct:pal("error nofin"),
|
|
|
+ info(State, StreamRef, {data, fin, <<>>});
|
|
|
+ %% When a response was sent fully we can terminate the stream,
|
|
|
+ %% regardless of the stream being in half-closed or closed state.
|
|
|
+ _ ->
|
|
|
+ terminate_stream(State, StreamRef, h3_no_error)
|
|
|
+ end.
|
|
|
+
|
|
|
terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamRef, Reason) ->
|
|
|
case maps:take(StreamRef, Streams0) of
|
|
|
{#stream{state=StreamState}, Streams} ->
|
|
@@ -622,15 +732,6 @@ terminate_stream_handler(#state{opts=Opts}, StreamRef, Reason, StreamState) ->
|
|
|
Class, Exception, Stacktrace), Opts)
|
|
|
end.
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-stop_stream(_, _) ->
|
|
|
- error(todo).
|
|
|
-
|
|
|
-%% @todo Maybe do this on stream close instead.
|
|
|
-maybe_terminate_stream(State, _, _) ->
|
|
|
- State.
|
|
|
-
|
|
|
ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{ref=StreamRef}) ->
|
|
|
case cow_http3_machine:ignored_frame(StreamRef, HTTP3Machine0) of
|
|
|
{ok, HTTP3Machine} ->
|