Browse Source

Make sure a response is always sent with HTTP/2

Previously only DATA was sent, with missing HEADERS.
Loïc Hoguin 9 years ago
parent
commit
44f16f3b1e
1 changed files with 62 additions and 46 deletions
  1. 62 46
      src/cowboy_http2.erl

+ 62 - 46
src/cowboy_http2.erl

@@ -26,7 +26,7 @@
 	id = undefined :: cowboy_stream:streamid(),
 	id = undefined :: cowboy_stream:streamid(),
 	state = undefined :: any(),
 	state = undefined :: any(),
 	%% Whether we finished sending data.
 	%% Whether we finished sending data.
-	local = nofin :: cowboy_stream:fin(),
+	local = idle :: idle | cowboy_stream:fin(),
 	%% Whether we finished receiving data.
 	%% Whether we finished receiving data.
 	remote = nofin :: cowboy_stream:fin(),
 	remote = nofin :: cowboy_stream:fin(),
 	%% Request body length.
 	%% Request body length.
@@ -235,8 +235,8 @@ parse_settings_preface(State, _, _, _) ->
 %% and terminate the stream if this is the end of it.
 %% and terminate the stream if this is the end of it.
 
 
 %% DATA frame.
 %% DATA frame.
-frame(State=#state{handler=Handler, streams=Streams0}, {data, StreamID, IsFin0, Data}) ->
-	case lists:keyfind(StreamID, #stream.id, Streams0) of
+frame(State=#state{handler=Handler, streams=Streams}, {data, StreamID, IsFin0, Data}) ->
+	case lists:keyfind(StreamID, #stream.id, Streams) of
 		Stream = #stream{state=StreamState0, remote=nofin, body_length=Len0} ->
 		Stream = #stream{state=StreamState0, remote=nofin, body_length=Len0} ->
 			Len = Len0 + byte_size(Data),
 			Len = Len0 + byte_size(Data),
 			IsFin = case IsFin0 of
 			IsFin = case IsFin0 of
@@ -245,9 +245,7 @@ frame(State=#state{handler=Handler, streams=Streams0}, {data, StreamID, IsFin0,
 			end,
 			end,
 			try Handler:data(StreamID, IsFin, Data, StreamState0) of
 			try Handler:data(StreamID, IsFin, Data, StreamState0) of
 				{Commands, StreamState} ->
 				{Commands, StreamState} ->
-					Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
-						Stream#stream{state=StreamState, body_length=Len}),
-					commands(State#state{streams=Streams}, StreamID, Commands)
+					commands(State, Stream#stream{state=StreamState, body_length=Len}, Commands)
 			catch Class:Reason ->
 			catch Class:Reason ->
 				error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
 				error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
 					[Handler, StreamID, IsFin0, Data, StreamState0, Class, Reason]),
 					[Handler, StreamID, IsFin0, Data, StreamState0, Class, Reason]),
@@ -342,14 +340,12 @@ down(State=#state{children=Children0}, Pid, Msg) ->
 			State
 			State
 	end.
 	end.
 
 
-info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) ->
-	case lists:keyfind(StreamID, #stream.id, Streams0) of
+info(State=#state{handler=Handler, streams=Streams}, StreamID, Msg) ->
+	case lists:keyfind(StreamID, #stream.id, Streams) of
 		Stream = #stream{state=StreamState0} ->
 		Stream = #stream{state=StreamState0} ->
 			try Handler:info(StreamID, Msg, StreamState0) of
 			try Handler:info(StreamID, Msg, StreamState0) of
 				{Commands, StreamState} ->
 				{Commands, StreamState} ->
-					Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
-						Stream#stream{state=StreamState}),
-					commands(State#state{streams=Streams}, StreamID, Commands)
+					commands(State, Stream#stream{state=StreamState}, Commands)
 			catch Class:Reason ->
 			catch Class:Reason ->
 				error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.",
 				error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.",
 					[Handler, StreamID, Msg, StreamState0, Class, Reason]),
 					[Handler, StreamID, Msg, StreamState0, Class, Reason]),
@@ -361,37 +357,41 @@ info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) ->
 			State
 			State
 	end.
 	end.
 
 
-commands(State, _, []) ->
-	State;
+commands(State, Stream, []) ->
+	after_commands(State, Stream);
 %% Send response headers.
 %% Send response headers.
 %%
 %%
 %% @todo Kill the stream if it sent a response when one has already been sent.
 %% @todo Kill the stream if it sent a response when one has already been sent.
 %% @todo Keep IsFin in the state.
 %% @todo Keep IsFin in the state.
 %% @todo Same two things above apply to DATA, possibly promise too.
 %% @todo Same two things above apply to DATA, possibly promise too.
-commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0}, StreamID,
-		[{response, StatusCode, Headers0, Body}|Tail]) ->
+commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0},
+		Stream=#stream{id=StreamID, local=idle}, [{response, StatusCode, Headers0, Body}|Tail]) ->
 	Headers = Headers0#{<<":status">> => integer_to_binary(StatusCode)},
 	Headers = Headers0#{<<":status">> => integer_to_binary(StatusCode)},
 	{HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0),
 	{HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0),
-	Response = cow_http2:headers(StreamID, nofin, HeaderBlock),
 	case Body of
 	case Body of
+		<<>> ->
+			Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
+			commands(State#state{encode_state=EncodeState}, Stream#stream{local=fin}, Tail);
 		{sendfile, O, B, P} ->
 		{sendfile, O, B, P} ->
-			Transport:send(Socket, Response),
-			commands(State#state{encode_state=EncodeState}, StreamID,
+			Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
+			commands(State#state{encode_state=EncodeState}, Stream#stream{local=nofin},
 				[{sendfile, fin, O, B, P}|Tail]);
 				[{sendfile, fin, O, B, P}|Tail]);
 		_ ->
 		_ ->
 			Transport:send(Socket, [
 			Transport:send(Socket, [
-				Response,
+				cow_http2:headers(StreamID, nofin, HeaderBlock),
 				cow_http2:data(StreamID, fin, Body)
 				cow_http2:data(StreamID, fin, Body)
 			]),
 			]),
-			commands(State#state{encode_state=EncodeState}, StreamID, Tail)
+			commands(State#state{encode_state=EncodeState}, Stream#stream{local=fin}, Tail)
 	end;
 	end;
+%% @todo response when local!=idle
 %% Send response headers and initiate chunked encoding.
 %% Send response headers and initiate chunked encoding.
-commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0}, StreamID,
-		[{headers, StatusCode, Headers0}|Tail]) ->
+commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0},
+		Stream=#stream{id=StreamID, local=idle}, [{headers, StatusCode, Headers0}|Tail]) ->
 	Headers = Headers0#{<<":status">> => integer_to_binary(StatusCode)},
 	Headers = Headers0#{<<":status">> => integer_to_binary(StatusCode)},
 	{HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0),
 	{HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0),
 	Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
 	Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
-	commands(State#state{encode_state=EncodeState}, StreamID, Tail);
+	commands(State#state{encode_state=EncodeState}, Stream#stream{local=nofin}, Tail);
+%% @todo headers when local!=idle
 %% Send a response body chunk.
 %% Send a response body chunk.
 %%
 %%
 %% @todo WINDOW_UPDATE stuff require us to buffer some data.
 %% @todo WINDOW_UPDATE stuff require us to buffer some data.
@@ -403,10 +403,11 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta
 %% split into multiple calls and flow control should be used to make
 %% split into multiple calls and flow control should be used to make
 %% sure we only send as fast as the client can receive and don't block
 %% sure we only send as fast as the client can receive and don't block
 %% anything.
 %% anything.
-commands(State=#state{socket=Socket, transport=Transport}, StreamID,
+commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin},
 		[{data, IsFin, Data}|Tail]) ->
 		[{data, IsFin, Data}|Tail]) ->
 	Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)),
 	Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)),
-	commands(State, StreamID, Tail);
+	commands(State, Stream#stream{local=IsFin}, Tail);
+%% @todo data when local!=nofin
 %% Send a file.
 %% Send a file.
 %%
 %%
 %% @todo This implementation is terrible. A good implementation would
 %% @todo This implementation is terrible. A good implementation would
@@ -418,17 +419,18 @@ commands(State=#state{socket=Socket, transport=Transport}, StreamID,
 %% to ensure the file is sent in chunks (which would require a better
 %% to ensure the file is sent in chunks (which would require a better
 %% flow control at the stream handler level). One thing for sure, the
 %% flow control at the stream handler level). One thing for sure, the
 %% implementation necessarily varies between HTTP/1.1 and HTTP/2.
 %% implementation necessarily varies between HTTP/1.1 and HTTP/2.
-commands(State=#state{socket=Socket, transport=Transport}, StreamID,
+commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin},
 		[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
 		[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
 	Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)),
 	Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)),
 	Transport:sendfile(Socket, Path, Offset, Bytes),
 	Transport:sendfile(Socket, Path, Offset, Bytes),
-	commands(State, StreamID, Tail);
+	commands(State, Stream#stream{local=IsFin}, Tail);
+%% @todo sendfile when local!=nofin
 %% Send a push promise.
 %% Send a push promise.
 %%
 %%
 %% @todo We need to keep track of what promises we made so that we don't
 %% @todo We need to keep track of what promises we made so that we don't
 %% end up with an infinite loop of promises.
 %% end up with an infinite loop of promises.
 commands(State0=#state{socket=Socket, transport=Transport, server_streamid=PromisedStreamID,
 commands(State0=#state{socket=Socket, transport=Transport, server_streamid=PromisedStreamID,
-		encode_state=EncodeState0}, StreamID,
+		encode_state=EncodeState0}, Stream=#stream{id=StreamID},
 		[{promise, Method, Scheme, Authority, Path, Headers0}|Tail]) ->
 		[{promise, Method, Scheme, Authority, Path, Headers0}|Tail]) ->
 	Headers = Headers0#{<<":method">> => Method,
 	Headers = Headers0#{<<":method">> => Method,
 			<<":scheme">> => Scheme,
 			<<":scheme">> => Scheme,
@@ -439,30 +441,38 @@ commands(State0=#state{socket=Socket, transport=Transport, server_streamid=Promi
 	%% @todo iolist_to_binary(HeaderBlock) isn't optimal. Need a shortcut.
 	%% @todo iolist_to_binary(HeaderBlock) isn't optimal. Need a shortcut.
 	State = stream_init(State0#state{server_streamid=PromisedStreamID + 2, encode_state=EncodeState},
 	State = stream_init(State0#state{server_streamid=PromisedStreamID + 2, encode_state=EncodeState},
 		PromisedStreamID, fin, iolist_to_binary(HeaderBlock)),
 		PromisedStreamID, fin, iolist_to_binary(HeaderBlock)),
-	commands(State, StreamID, Tail);
+	commands(State, Stream, Tail);
 %% @todo Update the flow control state.
 %% @todo Update the flow control state.
-commands(State, StreamID, [{flow, _Size}|Tail]) ->
-	commands(State, StreamID, Tail);
+commands(State, Stream, [{flow, _Size}|Tail]) ->
+	commands(State, Stream, Tail);
 %% Supervise a child process.
 %% Supervise a child process.
-commands(State=#state{children=Children}, StreamID, [{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
-	 commands(State#state{children=[{Pid, StreamID}|Children]}, StreamID, Tail);
+commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
+		[{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
+	 commands(State#state{children=[{Pid, StreamID}|Children]}, Stream, Tail);
 %% Error handling.
 %% Error handling.
-commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
+commands(State, Stream=#stream{id=StreamID}, [Error = {internal_error, _, _}|_Tail]) ->
+	%% @todo Do we want to run the commands after an internal_error?
+	%% @todo Do we even allow commands after?
 	%% @todo Only reset when the stream still exists.
 	%% @todo Only reset when the stream still exists.
-	commands(stream_reset(State, StreamID, Error), StreamID, Tail);
+	stream_reset(after_commands(State, Stream), StreamID, Error);
 %% Upgrade to a new protocol.
 %% Upgrade to a new protocol.
 %%
 %%
 %% @todo Implementation.
 %% @todo Implementation.
 %% @todo Can only upgrade if: there are no other streams and there are no children left alive.
 %% @todo Can only upgrade if: there are no other streams and there are no children left alive.
 %% @todo For HTTP/1.1 we should reject upgrading if pipelining is used.
 %% @todo For HTTP/1.1 we should reject upgrading if pipelining is used.
-commands(State, StreamID, [{upgrade, _Mod, _ModState}]) ->
-	commands(State, StreamID, []);
-commands(State, StreamID, [{upgrade, _Mod, _ModState}|Tail]) ->
+commands(State, Stream, [{upgrade, _Mod, _ModState}]) ->
+	commands(State, Stream, []);
+commands(State, Stream, [{upgrade, _Mod, _ModState}|Tail]) ->
 	%% @todo This is an error. Not sure what to do here yet.
 	%% @todo This is an error. Not sure what to do here yet.
-	commands(State, StreamID, Tail);
-commands(State, StreamID, [stop|_Tail]) ->
+	commands(State, Stream, Tail);
+commands(State, Stream=#stream{id=StreamID}, [stop|_Tail]) ->
 	%% @todo Do we want to run the commands after a stop?
 	%% @todo Do we want to run the commands after a stop?
-	stream_terminate(State, StreamID, normal).
+	%% @todo Do we even allow commands after?
+	stream_terminate(after_commands(State, Stream), StreamID, normal).
+
+after_commands(State=#state{streams=Streams0}, Stream=#stream{id=StreamID}) ->
+	Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream),
+	State#state{streams=Streams}.
 
 
 terminate(#state{socket=Socket, transport=Transport, handler=Handler,
 terminate(#state{socket=Socket, transport=Transport, handler=Handler,
 		streams=Streams, children=Children}, Reason) ->
 		streams=Streams, children=Children}, Reason) ->
@@ -534,11 +544,10 @@ stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, decode_st
 			'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
 			'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
 	end.
 	end.
 
 
-stream_handler_init(State=#state{handler=Handler, opts=Opts, streams=Streams0}, StreamID, IsFin, Req) ->
+stream_handler_init(State=#state{handler=Handler, opts=Opts}, StreamID, IsFin, Req) ->
 	try Handler:init(StreamID, Req, Opts) of
 	try Handler:init(StreamID, Req, Opts) of
 		{Commands, StreamState} ->
 		{Commands, StreamState} ->
-			Streams = [#stream{id=StreamID, state=StreamState, remote=IsFin}|Streams0],
-			commands(State#state{streams=Streams}, StreamID, Commands)
+			commands(State, #stream{id=StreamID, state=StreamState, remote=IsFin}, Commands)
 	catch Class:Reason ->
 	catch Class:Reason ->
 		error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) "
 		error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) "
 			"with reason ~p:~p.",
 			"with reason ~p:~p.",
@@ -557,9 +566,16 @@ stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID,
 	Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
 	Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
 	stream_terminate(State, StreamID, StreamError).
 	stream_terminate(State, StreamID, StreamError).
 
 
-stream_terminate(State=#state{socket=Socket, transport=Transport,
-		handler=Handler, streams=Streams0, children=Children0}, StreamID, Reason) ->
+stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler,
+		streams=Streams0, children=Children0, encode_state=EncodeState0}, StreamID, Reason) ->
 	case lists:keytake(StreamID, #stream.id, Streams0) of
 	case lists:keytake(StreamID, #stream.id, Streams0) of
+		{value, #stream{state=StreamState, local=idle}, Streams} when Reason =:= normal ->
+			Headers = #{<<":status">> => <<"204">>},
+			{HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0),
+			Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
+			stream_call_terminate(StreamID, Reason, Handler, StreamState),
+			Children = stream_terminate_children(Children0, StreamID, []),
+			State#state{streams=Streams, children=Children, encode_state=EncodeState};
 		{value, #stream{state=StreamState, local=nofin}, Streams} when Reason =:= normal ->
 		{value, #stream{state=StreamState, local=nofin}, Streams} when Reason =:= normal ->
 			Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
 			Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
 			stream_call_terminate(StreamID, Reason, Handler, StreamState),
 			stream_call_terminate(StreamID, Reason, Handler, StreamState),