|
@@ -78,7 +78,7 @@
|
|
cert :: undefined | binary(),
|
|
cert :: undefined | binary(),
|
|
|
|
|
|
%% HTTP/2 state machine.
|
|
%% HTTP/2 state machine.
|
|
- http2_init :: sequence | settings | upgrade | complete,
|
|
|
|
|
|
+ http2_status :: sequence | settings | upgrade | connected | closing,
|
|
http2_machine :: cow_http2_machine:http2_machine(),
|
|
http2_machine :: cow_http2_machine:http2_machine(),
|
|
|
|
|
|
%% Currently active HTTP/2 streams. Streams may be initiated either
|
|
%% Currently active HTTP/2 streams. Streams may be initiated either
|
|
@@ -129,7 +129,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
|
|
State = set_timeout(#state{parent=Parent, ref=Ref, socket=Socket,
|
|
State = set_timeout(#state{parent=Parent, ref=Ref, socket=Socket,
|
|
transport=Transport, proxy_header=ProxyHeader,
|
|
transport=Transport, proxy_header=ProxyHeader,
|
|
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
|
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
|
- http2_init=sequence, http2_machine=HTTP2Machine}),
|
|
|
|
|
|
+ http2_status=sequence, http2_machine=HTTP2Machine}),
|
|
Transport:send(Socket, Preface),
|
|
Transport:send(Socket, Preface),
|
|
case Buffer of
|
|
case Buffer of
|
|
<<>> -> loop(State, Buffer);
|
|
<<>> -> loop(State, Buffer);
|
|
@@ -149,7 +149,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
|
|
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
|
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
|
transport=Transport, proxy_header=ProxyHeader,
|
|
transport=Transport, proxy_header=ProxyHeader,
|
|
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
|
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
|
- http2_init=upgrade, http2_machine=HTTP2Machine},
|
|
|
|
|
|
+ http2_status=upgrade, http2_machine=HTTP2Machine},
|
|
State1 = headers_frame(State0#state{
|
|
State1 = headers_frame(State0#state{
|
|
http2_machine=HTTP2Machine}, StreamID, Req),
|
|
http2_machine=HTTP2Machine}, StreamID, Req),
|
|
%% We assume that the upgrade will be applied. A stream handler
|
|
%% We assume that the upgrade will be applied. A stream handler
|
|
@@ -158,7 +158,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
|
|
<<"connection">> => <<"Upgrade">>,
|
|
<<"connection">> => <<"Upgrade">>,
|
|
<<"upgrade">> => <<"h2c">>
|
|
<<"upgrade">> => <<"h2c">>
|
|
}, ?MODULE, undefined}), %% @todo undefined or #{}?
|
|
}, ?MODULE, undefined}), %% @todo undefined or #{}?
|
|
- State = set_timeout(State2#state{http2_init=sequence}),
|
|
|
|
|
|
+ State = set_timeout(State2#state{http2_status=sequence}),
|
|
Transport:send(Socket, Preface),
|
|
Transport:send(Socket, Preface),
|
|
case Buffer of
|
|
case Buffer of
|
|
<<>> -> loop(State, Buffer);
|
|
<<>> -> loop(State, Buffer);
|
|
@@ -181,6 +181,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
|
|
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
|
|
%% System messages.
|
|
%% System messages.
|
|
{'EXIT', Parent, Reason} ->
|
|
{'EXIT', Parent, Reason} ->
|
|
|
|
+ %% @todo Graceful shutdown here as well?
|
|
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
|
|
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
|
|
{system, From, Request} ->
|
|
{system, From, Request} ->
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
|
|
@@ -223,16 +224,16 @@ set_timeout(State=#state{opts=Opts, timer=TimerRef0}) ->
|
|
|
|
|
|
%% HTTP/2 protocol parsing.
|
|
%% HTTP/2 protocol parsing.
|
|
|
|
|
|
-parse(State=#state{http2_init=sequence}, Data) ->
|
|
|
|
|
|
+parse(State=#state{http2_status=sequence}, Data) ->
|
|
case cow_http2:parse_sequence(Data) of
|
|
case cow_http2:parse_sequence(Data) of
|
|
{ok, Rest} ->
|
|
{ok, Rest} ->
|
|
- parse(State#state{http2_init=settings}, Rest);
|
|
|
|
|
|
+ parse(State#state{http2_status=settings}, Rest);
|
|
more ->
|
|
more ->
|
|
loop(State, Data);
|
|
loop(State, Data);
|
|
Error = {connection_error, _, _} ->
|
|
Error = {connection_error, _, _} ->
|
|
terminate(State, Error)
|
|
terminate(State, Error)
|
|
end;
|
|
end;
|
|
-parse(State=#state{http2_machine=HTTP2Machine}, Data) ->
|
|
|
|
|
|
+parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Streams}, Data) ->
|
|
MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine),
|
|
MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine),
|
|
case cow_http2:parse(Data, MaxFrameSize) of
|
|
case cow_http2:parse(Data, MaxFrameSize) of
|
|
{ok, Frame, Rest} ->
|
|
{ok, Frame, Rest} ->
|
|
@@ -243,6 +244,9 @@ parse(State=#state{http2_machine=HTTP2Machine}, Data) ->
|
|
parse(reset_stream(State, StreamID, {stream_error, Reason, Human}), Rest);
|
|
parse(reset_stream(State, StreamID, {stream_error, Reason, Human}), Rest);
|
|
Error = {connection_error, _, _} ->
|
|
Error = {connection_error, _, _} ->
|
|
terminate(State, Error);
|
|
terminate(State, Error);
|
|
|
|
+ %% Terminate the connection if we are closing and all streams have completed.
|
|
|
|
+ more when Status =:= closing, Streams =:= #{} ->
|
|
|
|
+ terminate(State, {stop, normal, 'The connection is going away.'});
|
|
more ->
|
|
more ->
|
|
loop(State, Data)
|
|
loop(State, Data)
|
|
end.
|
|
end.
|
|
@@ -265,9 +269,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
|
|
State#state{http2_machine=HTTP2Machine};
|
|
State#state{http2_machine=HTTP2Machine};
|
|
{ok, {rst_stream, StreamID, Reason}, HTTP2Machine} ->
|
|
{ok, {rst_stream, StreamID, Reason}, HTTP2Machine} ->
|
|
rst_stream_frame(State#state{http2_machine=HTTP2Machine}, StreamID, Reason);
|
|
rst_stream_frame(State#state{http2_machine=HTTP2Machine}, StreamID, Reason);
|
|
- {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} ->
|
|
|
|
- terminate(State#state{http2_machine=HTTP2Machine},
|
|
|
|
- {stop, Frame, 'Client is going away.'});
|
|
|
|
|
|
+ {ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
|
|
|
|
+ goaway(State#state{http2_machine=HTTP2Machine}, GoAway);
|
|
{send, SendData, HTTP2Machine} ->
|
|
{send, SendData, HTTP2Machine} ->
|
|
send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData);
|
|
send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData);
|
|
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
|
|
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
|
|
@@ -277,10 +280,10 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
|
|
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
|
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
|
end.
|
|
end.
|
|
|
|
|
|
-%% We use this opportunity to mark the HTTP/2 initialization
|
|
|
|
-%% as complete if we were still waiting for a SETTINGS frame.
|
|
|
|
-maybe_ack(State=#state{http2_init=settings}, Frame) ->
|
|
|
|
- maybe_ack(State#state{http2_init=complete}, Frame);
|
|
|
|
|
|
+%% We use this opportunity to mark the HTTP/2 status as connected
|
|
|
|
+%% if we were still waiting for a SETTINGS frame.
|
|
|
|
+maybe_ack(State=#state{http2_status=settings}, Frame) ->
|
|
|
|
+ maybe_ack(State#state{http2_status=connected}, Frame);
|
|
maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
|
|
maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
|
|
case Frame of
|
|
case Frame of
|
|
{settings, _} -> Transport:send(Socket, cow_http2:settings_ack());
|
|
{settings, _} -> Transport:send(Socket, cow_http2:settings_ack());
|
|
@@ -549,6 +552,9 @@ commands(State0, StreamID, [{trailers, Trailers}|Tail]) ->
|
|
%%
|
|
%%
|
|
%% @todo Responses sent as a result of a push_promise request
|
|
%% @todo Responses sent as a result of a push_promise request
|
|
%% must not send push_promise frames themselves.
|
|
%% must not send push_promise frames themselves.
|
|
|
|
+%%
|
|
|
|
+%% @todo We should not send push_promise frames when we are
|
|
|
|
+%% in the closing http2_status.
|
|
commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
|
|
commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
|
|
StreamID, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
|
|
StreamID, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
|
|
Authority = case {Scheme, Port} of
|
|
Authority = case {Scheme, Port} of
|
|
@@ -600,7 +606,7 @@ commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) ->
|
|
%% @todo Only reset when the stream still exists.
|
|
%% @todo Only reset when the stream still exists.
|
|
reset_stream(State, StreamID, Error);
|
|
reset_stream(State, StreamID, Error);
|
|
%% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
|
|
%% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
|
|
-commands(State=#state{socket=Socket, transport=Transport, http2_init=upgrade},
|
|
|
|
|
|
+commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
|
|
StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
|
|
StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
|
|
%% @todo This 101 response needs to be passed through stream handlers.
|
|
%% @todo This 101 response needs to be passed through stream handlers.
|
|
Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))),
|
|
Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))),
|
|
@@ -654,16 +660,24 @@ headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
|
|
headers_to_list(Headers) ->
|
|
headers_to_list(Headers) ->
|
|
maps:to_list(Headers).
|
|
maps:to_list(Headers).
|
|
|
|
|
|
-maybe_send_data(State=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0) ->
|
|
|
|
|
|
+maybe_send_data(State0=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0) ->
|
|
Data = case is_tuple(Data0) of
|
|
Data = case is_tuple(Data0) of
|
|
false -> {data, Data0};
|
|
false -> {data, Data0};
|
|
true -> Data0
|
|
true -> Data0
|
|
end,
|
|
end,
|
|
case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
|
|
case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
|
|
{ok, HTTP2Machine} ->
|
|
{ok, HTTP2Machine} ->
|
|
- State#state{http2_machine=HTTP2Machine};
|
|
|
|
|
|
+ State0#state{http2_machine=HTTP2Machine};
|
|
{send, SendData, HTTP2Machine} ->
|
|
{send, SendData, HTTP2Machine} ->
|
|
- send_data(State#state{http2_machine=HTTP2Machine}, SendData)
|
|
|
|
|
|
+ State = #state{http2_status=Status, streams=Streams}
|
|
|
|
+ = send_data(State0#state{http2_machine=HTTP2Machine}, SendData),
|
|
|
|
+ %% Terminate the connection if we are closing and all streams have completed.
|
|
|
|
+ if
|
|
|
|
+ Status =:= closing, Streams =:= #{} ->
|
|
|
|
+ terminate(State, {stop, normal, 'The connection is going away.'});
|
|
|
|
+ true ->
|
|
|
|
+ State
|
|
|
|
+ end
|
|
end.
|
|
end.
|
|
|
|
|
|
send_data(State, []) ->
|
|
send_data(State, []) ->
|
|
@@ -702,16 +716,55 @@ send_data_frame(State=#state{socket=Socket, transport=Transport,
|
|
|
|
|
|
%% Terminate a stream or the connection.
|
|
%% Terminate a stream or the connection.
|
|
|
|
|
|
|
|
+%% We may have to cancel streams even if we receive multiple
|
|
|
|
+%% GOAWAY frames as the LastStreamID value may be lower than
|
|
|
|
+%% the one previously received.
|
|
|
|
+goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine,
|
|
|
|
+ http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _})
|
|
|
|
+ when Status =:= connected; Status =:= closing ->
|
|
|
|
+ Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID,
|
|
|
|
+ {stop, {goaway, Reason}, 'The connection is going away.'}, []),
|
|
|
|
+ State = State0#state{streams=maps:from_list(Streams)},
|
|
|
|
+ case Status of
|
|
|
|
+ connected ->
|
|
|
|
+ Transport:send(Socket, cow_http2:goaway(
|
|
|
|
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
|
|
|
|
+ no_error, <<>>)),
|
|
|
|
+ State#state{http2_status=closing};
|
|
|
|
+ _ ->
|
|
|
|
+ State
|
|
|
|
+ end;
|
|
|
|
+%% We terminate the connection immediately if it hasn't fully been initialized.
|
|
|
|
+goaway(State, {goaway, _, Reason, _}) ->
|
|
|
|
+ terminate(State, {stop, {goaway, Reason}, 'The connection is going away.'}).
|
|
|
|
+
|
|
|
|
+%% Cancel client-initiated streams that are above LastStreamID.
|
|
|
|
+goaway_streams(_, [], _, _, Acc) ->
|
|
|
|
+ Acc;
|
|
|
|
+goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc)
|
|
|
|
+ when StreamID > LastStreamID, (StreamID rem 2) =:= 0 ->
|
|
|
|
+ terminate_stream_handler(State, StreamID, Reason, StreamState),
|
|
|
|
+ goaway_streams(State, Tail, LastStreamID, Reason, Acc);
|
|
|
|
+goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) ->
|
|
|
|
+ goaway_streams(State, Tail, LastStreamID, Reason, [Stream|Acc]).
|
|
|
|
+
|
|
-spec terminate(#state{}, _) -> no_return().
|
|
-spec terminate(#state{}, _) -> no_return().
|
|
terminate(undefined, Reason) ->
|
|
terminate(undefined, Reason) ->
|
|
exit({shutdown, Reason});
|
|
exit({shutdown, Reason});
|
|
-terminate(State=#state{socket=Socket, transport=Transport, http2_init=complete,
|
|
|
|
- http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason) ->
|
|
|
|
|
|
+terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status,
|
|
|
|
+ http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason)
|
|
|
|
+ when Status =:= connected; Status =:= closing ->
|
|
%% @todo We might want to optionally send the Reason value
|
|
%% @todo We might want to optionally send the Reason value
|
|
%% as debug data in the GOAWAY frame here. Perhaps more.
|
|
%% as debug data in the GOAWAY frame here. Perhaps more.
|
|
- Transport:send(Socket, cow_http2:goaway(
|
|
|
|
- cow_http2_machine:get_last_streamid(HTTP2Machine),
|
|
|
|
- terminate_reason(Reason), <<>>)),
|
|
|
|
|
|
+ case Status of
|
|
|
|
+ connected ->
|
|
|
|
+ Transport:send(Socket, cow_http2:goaway(
|
|
|
|
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
|
|
|
|
+ terminate_reason(Reason), <<>>));
|
|
|
|
+ %% We already sent the GOAWAY frame.
|
|
|
|
+ closing ->
|
|
|
|
+ ok
|
|
|
|
+ end,
|
|
terminate_all_streams(State, maps:to_list(Streams), Reason),
|
|
terminate_all_streams(State, maps:to_list(Streams), Reason),
|
|
cowboy_children:terminate(Children),
|
|
cowboy_children:terminate(Children),
|
|
Transport:close(Socket),
|
|
Transport:close(Socket),
|
|
@@ -823,6 +876,7 @@ system_continue(_, _, {State, Buffer}) ->
|
|
|
|
|
|
-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
|
|
-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
|
|
system_terminate(Reason, _, _, {State, _}) ->
|
|
system_terminate(Reason, _, _, {State, _}) ->
|
|
|
|
+ %% @todo Graceful shutdown here as well?
|
|
terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}).
|
|
terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}).
|
|
|
|
|
|
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
|
|
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
|