|
@@ -51,6 +51,9 @@
|
|
%% @todo I haven't put as much thought as I should have on this,
|
|
%% @todo I haven't put as much thought as I should have on this,
|
|
%% the final settings handling will be very different.
|
|
%% the final settings handling will be very different.
|
|
local_settings = #{} :: map(),
|
|
local_settings = #{} :: map(),
|
|
|
|
+ %% @todo We need a TimerRef to do SETTINGS_TIMEOUT errors.
|
|
|
|
+ %% We need to be careful there. It's well possible that we send
|
|
|
|
+ %% two SETTINGS frames before we receive a SETTINGS ack.
|
|
next_settings = #{} :: undefined | map(), %% @todo perhaps set to undefined by default
|
|
next_settings = #{} :: undefined | map(), %% @todo perhaps set to undefined by default
|
|
remote_settings = #{} :: map(),
|
|
remote_settings = #{} :: map(),
|
|
|
|
|
|
@@ -71,7 +74,9 @@
|
|
%% is established and continues normally. An exception is when a HEADERS
|
|
%% is established and continues normally. An exception is when a HEADERS
|
|
%% frame is sent followed by CONTINUATION frames: no other frame can be
|
|
%% frame is sent followed by CONTINUATION frames: no other frame can be
|
|
%% sent in between.
|
|
%% sent in between.
|
|
- parse_state = preface :: preface | settings | normal
|
|
|
|
|
|
+ parse_state = undefined :: {preface, sequence, reference()}
|
|
|
|
+ | {preface, settings, reference()}
|
|
|
|
+ | normal
|
|
| {continuation, cowboy_stream:streamid(), cowboy_stream:fin(), binary()},
|
|
| {continuation, cowboy_stream:streamid(), cowboy_stream:fin(), binary()},
|
|
|
|
|
|
%% HPACK decoding and encoding state.
|
|
%% HPACK decoding and encoding state.
|
|
@@ -86,7 +91,8 @@ init(Parent, Ref, Socket, Transport, Opts, Handler) ->
|
|
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module(), binary()) -> ok.
|
|
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module(), binary()) -> ok.
|
|
init(Parent, Ref, Socket, Transport, Opts, Handler, Buffer) ->
|
|
init(Parent, Ref, Socket, Transport, Opts, Handler, Buffer) ->
|
|
State = #state{parent=Parent, ref=Ref, socket=Socket,
|
|
State = #state{parent=Parent, ref=Ref, socket=Socket,
|
|
- transport=Transport, opts=Opts, handler=Handler},
|
|
|
|
|
|
+ transport=Transport, opts=Opts, handler=Handler,
|
|
|
|
+ parse_state={preface, sequence, preface_timeout(Opts)}},
|
|
preface(State),
|
|
preface(State),
|
|
case Buffer of
|
|
case Buffer of
|
|
<<>> -> before_loop(State, Buffer);
|
|
<<>> -> before_loop(State, Buffer);
|
|
@@ -98,7 +104,8 @@ init(Parent, Ref, Socket, Transport, Opts, Handler, Buffer) ->
|
|
binary(), binary() | undefined, cowboy_req:req()) -> ok.
|
|
binary(), binary() | undefined, cowboy_req:req()) -> ok.
|
|
init(Parent, Ref, Socket, Transport, Opts, Handler, Buffer, _Settings, Req) ->
|
|
init(Parent, Ref, Socket, Transport, Opts, Handler, Buffer, _Settings, Req) ->
|
|
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
|
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
|
- transport=Transport, opts=Opts, handler=Handler},
|
|
|
|
|
|
+ transport=Transport, opts=Opts, handler=Handler,
|
|
|
|
+ parse_state={preface, sequence, preface_timeout(Opts)}},
|
|
preface(State0),
|
|
preface(State0),
|
|
%% @todo Apply settings.
|
|
%% @todo Apply settings.
|
|
%% StreamID from HTTP/1.1 Upgrade requests is always 1.
|
|
%% StreamID from HTTP/1.1 Upgrade requests is always 1.
|
|
@@ -113,11 +120,16 @@ preface(#state{socket=Socket, transport=Transport, next_settings=Settings}) ->
|
|
%% We send next_settings and use defaults until we get a ack.
|
|
%% We send next_settings and use defaults until we get a ack.
|
|
ok = Transport:send(Socket, cow_http2:settings(Settings)).
|
|
ok = Transport:send(Socket, cow_http2:settings(Settings)).
|
|
|
|
|
|
|
|
+preface_timeout(Opts) ->
|
|
|
|
+ PrefaceTimeout = maps:get(preface_timeout, Opts, 5000),
|
|
|
|
+ erlang:start_timer(PrefaceTimeout, self(), preface_timeout).
|
|
|
|
+
|
|
%% @todo Add the timeout for last time since we heard of connection.
|
|
%% @todo Add the timeout for last time since we heard of connection.
|
|
before_loop(State, Buffer) ->
|
|
before_loop(State, Buffer) ->
|
|
loop(State, Buffer).
|
|
loop(State, Buffer).
|
|
|
|
|
|
-loop(State=#state{parent=Parent, socket=Socket, transport=Transport, children=Children}, Buffer) ->
|
|
|
|
|
|
+loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|
|
|
+ children=Children, parse_state=PS}, Buffer) ->
|
|
Transport:setopts(Socket, [{active, once}]),
|
|
Transport:setopts(Socket, [{active, once}]),
|
|
{OK, Closed, Error} = Transport:messages(),
|
|
{OK, Closed, Error} = Transport:messages(),
|
|
receive
|
|
receive
|
|
@@ -133,6 +145,14 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, children=Ch
|
|
exit(Reason);
|
|
exit(Reason);
|
|
{system, From, Request} ->
|
|
{system, From, Request} ->
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
|
|
|
|
+ {timeout, TRef, preface_timeout} ->
|
|
|
|
+ case PS of
|
|
|
|
+ {preface, _, TRef} ->
|
|
|
|
+ terminate(State, {connection_error, protocol_error,
|
|
|
|
+ 'The preface was not received in a reasonable amount of time.'});
|
|
|
|
+ _ ->
|
|
|
|
+ loop(State, Buffer)
|
|
|
|
+ end;
|
|
%% Messages pertaining to a stream.
|
|
%% Messages pertaining to a stream.
|
|
{{Pid, StreamID}, Msg} when Pid =:= self() ->
|
|
{{Pid, StreamID}, Msg} when Pid =:= self() ->
|
|
loop(info(State, StreamID, Msg), Buffer);
|
|
loop(info(State, StreamID, Msg), Buffer);
|
|
@@ -161,10 +181,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, children=Ch
|
|
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
|
|
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
|
|
end.
|
|
end.
|
|
|
|
|
|
-parse(State=#state{socket=Socket, transport=Transport, parse_state=preface}, Data) ->
|
|
|
|
|
|
+parse(State=#state{socket=Socket, transport=Transport, parse_state={preface, sequence, TRef}}, Data) ->
|
|
case Data of
|
|
case Data of
|
|
<< "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", Rest/bits >> ->
|
|
<< "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", Rest/bits >> ->
|
|
- parse(State#state{parse_state=settings}, Rest);
|
|
|
|
|
|
+ parse(State#state{parse_state={preface, settings, TRef}}, Rest);
|
|
_ when byte_size(Data) >= 24 ->
|
|
_ when byte_size(Data) >= 24 ->
|
|
Transport:close(Socket),
|
|
Transport:close(Socket),
|
|
exit({shutdown, {connection_error, protocol_error,
|
|
exit({shutdown, {connection_error, protocol_error,
|
|
@@ -187,9 +207,12 @@ parse(State=#state{parse_state=ParseState}, Data) ->
|
|
case cow_http2:parse(Data) of
|
|
case cow_http2:parse(Data) of
|
|
{ok, Frame, Rest} ->
|
|
{ok, Frame, Rest} ->
|
|
case ParseState of
|
|
case ParseState of
|
|
- normal -> parse(frame(State, Frame), Rest);
|
|
|
|
- settings -> parse(frame(State, Frame), Rest);
|
|
|
|
- _ -> parse(continuation_frame(State, Frame), Rest)
|
|
|
|
|
|
+ normal ->
|
|
|
|
+ parse(frame(State, Frame), Rest);
|
|
|
|
+ {preface, settings, TRef} ->
|
|
|
|
+ parse_settings_preface(State, Frame, Rest, TRef);
|
|
|
|
+ {continuation, _, _, _} ->
|
|
|
|
+ parse(continuation_frame(State, Frame), Rest)
|
|
end;
|
|
end;
|
|
{stream_error, StreamID, Reason, Human, Rest} ->
|
|
{stream_error, StreamID, Reason, Human, Rest} ->
|
|
parse(stream_reset(State, StreamID, {stream_error, Reason, Human}), Rest);
|
|
parse(stream_reset(State, StreamID, {stream_error, Reason, Human}), Rest);
|
|
@@ -199,13 +222,20 @@ parse(State=#state{parse_state=ParseState}, Data) ->
|
|
before_loop(State, Data)
|
|
before_loop(State, Data)
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
+parse_settings_preface(State, Frame={settings, _}, Rest, TRef) ->
|
|
|
|
+ erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
|
|
|
|
+ parse(frame(State#state{parse_state=normal}, Frame), Rest);
|
|
|
|
+parse_settings_preface(State, _, _, _) ->
|
|
|
|
+ terminate(State, {connection_error, protocol_error,
|
|
|
|
+ 'The preface sequence must be followed by a SETTINGS frame. (RFC7540 3.5)'}).
|
|
|
|
+
|
|
%% @todo When we get a 'fin' we need to check if the stream had a 'fin' sent back
|
|
%% @todo When we get a 'fin' we need to check if the stream had a 'fin' sent back
|
|
%% and terminate the stream if this is the end of it.
|
|
%% and terminate the stream if this is the end of it.
|
|
|
|
|
|
%% DATA frame.
|
|
%% DATA frame.
|
|
frame(State=#state{handler=Handler, streams=Streams0}, {data, StreamID, IsFin, Data}) ->
|
|
frame(State=#state{handler=Handler, streams=Streams0}, {data, StreamID, IsFin, Data}) ->
|
|
case lists:keyfind(StreamID, #stream.id, Streams0) of
|
|
case lists:keyfind(StreamID, #stream.id, Streams0) of
|
|
- Stream = #stream{state=StreamState0} -> %% @todo in=open
|
|
|
|
|
|
+ Stream = #stream{state=StreamState0, remote=nofin} ->
|
|
try Handler:data(StreamID, IsFin, Data, StreamState0) of
|
|
try Handler:data(StreamID, IsFin, Data, StreamState0) of
|
|
{Commands, StreamState} ->
|
|
{Commands, StreamState} ->
|
|
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
|
|
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
|
|
@@ -217,7 +247,7 @@ frame(State=#state{handler=Handler, streams=Streams0}, {data, StreamID, IsFin, D
|
|
stream_reset(State, StreamID, {internal_error, {Class, Reason},
|
|
stream_reset(State, StreamID, {internal_error, {Class, Reason},
|
|
'Exception occurred in StreamHandler:data/4 call.'})
|
|
'Exception occurred in StreamHandler:data/4 call.'})
|
|
end;
|
|
end;
|
|
- false ->
|
|
|
|
|
|
+ _ ->
|
|
stream_reset(State, StreamID, {stream_error, stream_closed,
|
|
stream_reset(State, StreamID, {stream_error, stream_closed,
|
|
'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
|
|
'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
|
|
end;
|
|
end;
|