Browse Source

Add preliminary support for trailers in responses

This depends on changes in Cowlib that are only available on
master.
Loïc Hoguin 7 years ago
parent
commit
39baed6c80
7 changed files with 120 additions and 7 deletions
  1. 1 1
      Makefile
  2. 36 1
      src/cowboy_http.erl
  3. 29 5
      src/cowboy_http2.erl
  4. 6 0
      src/cowboy_req.erl
  5. 2 0
      src/cowboy_stream_h.erl
  6. 12 0
      test/handlers/resp_h.erl
  7. 34 0
      test/req_SUITE.erl

+ 1 - 1
Makefile

@@ -16,7 +16,7 @@ CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl
 LOCAL_DEPS = crypto
 
 DEPS = cowlib ranch
-dep_cowlib = git https://github.com/ninenines/cowlib 2.0.1
+dep_cowlib = git https://github.com/ninenines/cowlib master
 dep_ranch = git https://github.com/ninenines/ranch 1.4.0
 
 DOC_DEPS = asciideck

+ 36 - 1
src/cowboy_http.erl

@@ -69,6 +69,8 @@
 	state = undefined :: {module(), any()},
 	%% Client HTTP version for this stream.
 	version = undefined :: cowboy:http_version(),
+	%% Unparsed te header. Used to know if we can send trailers.
+	te :: undefined | binary(),
 	%% Commands queued.
 	queue = [] :: cowboy_stream:commands()
 }).
@@ -267,7 +269,9 @@ after_parse({request, Req=#{streamid := StreamID, headers := Headers, version :=
 		State0=#state{opts=Opts, streams=Streams0}, Buffer}) ->
 	try cowboy_stream:init(StreamID, Req, Opts) of
 		{Commands, StreamState} ->
-			Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0],
+			TE = maps:get(<<"te">>, Headers, undefined),
+			Streams = [#stream{id=StreamID, state=StreamState,
+				version=Version, te=TE}|Streams0],
 			State1 = case maybe_req_close(State0, Headers, Version) of
 				close -> State0#state{streams=Streams, last_streamid=StreamID};
 				keepalive -> State0#state{streams=Streams}
@@ -900,6 +904,37 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, Str
 		nofin -> State0
 	end,
 	commands(State, StreamID, Tail);
+%% Send trailers.
+commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+		[{trailers, Trailers}|Tail]) ->
+	TE = case lists:keyfind(StreamID, #stream.id, Streams) of
+		%% HTTP/1.0 doesn't support chunked transfer-encoding.
+		#stream{version='HTTP/1.0'} ->
+			not_chunked;
+		%% No TE header was sent.
+		#stream{te=undefined} ->
+			no_trailers;
+		#stream{te=TE0} ->
+			try cow_http_hd:parse_te(TE0) of
+				{TE1, _} -> TE1
+			catch _:_ ->
+				%% If we can't parse the TE header, assume we can't send trailers.
+				no_trailers
+			end
+	end,
+	case TE of
+		trailers ->
+			Transport:send(Socket, [
+				<<"0\r\n">>,
+				cow_http:headers(maps:to_list(Trailers)),
+				<<"\r\n">>
+			]);
+		no_trailers ->
+			Transport:send(Socket, <<"0\r\n\r\n">>);
+		not_chunked ->
+			ok
+	end,
+	commands(State#state{out_state=done}, StreamID, Tail);
 %% Send a file.
 commands(State0=#state{socket=Socket, transport=Transport}, StreamID,
 		[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->

+ 29 - 5
src/cowboy_http2.erl

@@ -49,7 +49,9 @@
 	%% Whether we finished receiving data.
 	remote = nofin :: cowboy_stream:fin(),
 	%% Remote flow control window (how much we accept to receive).
-	remote_window :: integer()
+	remote_window :: integer(),
+	%% Unparsed te header. Used to know if we can send trailers.
+	te :: undefined | binary()
 }).
 
 -type stream() :: #stream{}.
@@ -537,9 +539,24 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta
 commands(State0, Stream0=#stream{local=nofin}, [{data, IsFin, Data}|Tail]) ->
 	{State, Stream} = send_data(State0, Stream0, IsFin, Data),
 	commands(State, Stream, Tail);
-
 %% @todo data when local!=nofin
-
+%% Send trailers.
+commands(State0, Stream0=#stream{local=nofin, te=TE0}, [{trailers, Trailers}|Tail]) ->
+	%% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1).
+	TE = try cow_http_hd:parse_te(TE0) of
+		{trailers, []} -> trailers;
+		_ -> no_trailers
+	catch _:_ ->
+		%% If we can't parse the TE header, assume we can't send trailers.
+		no_trailers
+	end,
+	{State, Stream} = case TE of
+		trailers ->
+			send_data(State0, Stream0, fin, {trailers, Trailers});
+		no_trailers ->
+			send_data(State0, Stream0, fin, <<>>)
+	end,
+	commands(State, Stream, Tail);
 %% Send a file.
 commands(State0, Stream0=#stream{local=nofin},
 		[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
@@ -664,6 +681,12 @@ send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}
 send_data(State, Stream, IsFin, Data) ->
 	send_data(State, Stream, IsFin, Data, in).
 
+%% Always send trailer frames even if the window is empty.
+send_data(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0},
+		Stream=#stream{id=StreamID}, fin, {trailers, Trailers}, _) ->
+	{HeaderBlock, EncodeState} = headers_encode(Trailers, EncodeState0),
+	Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
+	{State#state{encode_state=EncodeState}, Stream#stream{local=fin}};
 %% Send data immediately if we can, buffer otherwise.
 %% @todo We might want to print an error if local=fin.
 send_data(State=#state{local_window=ConnWindow},
@@ -800,13 +823,14 @@ stream_req_init(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert},
 stream_handler_init(State=#state{opts=Opts,
 		local_settings=#{initial_window_size := RemoteWindow},
 		remote_settings=#{initial_window_size := LocalWindow}},
-		StreamID, RemoteIsFin, LocalIsFin, Req) ->
+		StreamID, RemoteIsFin, LocalIsFin, Req=#{headers := Headers}) ->
 	try cowboy_stream:init(StreamID, Req, Opts) of
 		{Commands, StreamState} ->
 			commands(State#state{client_streamid=StreamID},
 				#stream{id=StreamID, state=StreamState,
 					remote=RemoteIsFin, local=LocalIsFin,
-					local_window=LocalWindow, remote_window=RemoteWindow},
+					local_window=LocalWindow, remote_window=RemoteWindow,
+					te=maps:get(<<"te">>, Headers, undefined)},
 				Commands)
 	catch Class:Exception ->
 		cowboy_stream:report_error(init,

+ 6 - 0
src/cowboy_req.erl

@@ -81,6 +81,7 @@
 %% @todo stream_body/2 (nofin)
 -export([stream_body/3]).
 %% @todo stream_event/2,3
+-export([stream_trailers/2]).
 -export([push/3]).
 -export([push/4]).
 
@@ -774,6 +775,11 @@ stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := he
 	Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
 	ok.
 
+-spec stream_trailers(cowboy:http_headers(), req()) -> ok.
+stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
+	Pid ! {{Pid, StreamID}, {trailers, Trailers}},
+	ok.
+
 -spec push(binary(), cowboy:http_headers(), req()) -> ok.
 push(Path, Headers, Req) ->
 	push(Path, Headers, Req, #{}).

+ 2 - 0
src/cowboy_stream_h.erl

@@ -166,6 +166,8 @@ info(_StreamID, Headers = {headers, _, _}, State) ->
 	{[Headers], State#state{expect=undefined}};
 info(_StreamID, Data = {data, _, _}, State) ->
 	{[Data], State};
+info(_StreamID, Trailers = {trailers, _}, State) ->
+	{[Trailers], State};
 info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) ->
 	{[Push], State};
 info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->

+ 12 - 0
test/handlers/resp_h.erl

@@ -204,6 +204,18 @@ do(<<"stream_body">>, Req0, Opts) ->
 			cowboy_req:stream_body(<<0:800000>>, fin, Req0),
 			{ok, Req0, Opts}
 	end;
+do(<<"stream_trailers">>, Req0, Opts) ->
+	case cowboy_req:binding(arg, Req0) of
+		_ ->
+			Req = cowboy_req:stream_reply(200, #{
+				<<"trailer">> => <<"grpc-status">>
+			}, Req0),
+			cowboy_req:stream_body(<<"Hello world!">>, nofin, Req),
+			cowboy_req:stream_trailers(#{
+				<<"grpc-status">> => <<"0">>
+			}, Req),
+			{ok, Req, Opts}
+	end;
 do(<<"push">>, Req, Opts) ->
 	case cowboy_req:binding(arg, Req) of
 		<<"method">> ->

+ 34 - 0
test/req_SUITE.erl

@@ -841,6 +841,40 @@ stream_body_nofin(Config) ->
 %% @todo Crash when calling stream_body after calling reply.
 %% @todo Crash when calling stream_body before calling stream_reply.
 
+stream_trailers(Config) ->
+	doc("Stream body followed by trailer headers."),
+	{200, RespHeaders, <<"Hello world!">>, [
+		{<<"grpc-status">>, <<"0">>}
+	]} = do_trailers("/resp/stream_trailers", Config),
+	{_, <<"grpc-status">>} = lists:keyfind(<<"trailer">>, 1, RespHeaders),
+	ok.
+
+stream_trailers_no_te(Config) ->
+	doc("Stream body followed by trailer headers."),
+	ConnPid = gun_open(Config),
+	Ref = gun:get(ConnPid, "/resp/stream_trailers", [
+		{<<"accept-encoding">>, <<"gzip">>}
+	]),
+	{response, nofin, 200, RespHeaders} = gun:await(ConnPid, Ref),
+	{ok, RespBody} = gun:await_body(ConnPid, Ref),
+	gun:close(ConnPid).
+
+do_trailers(Path, Config) ->
+	ConnPid = gun_open(Config),
+	Ref = gun:get(ConnPid, Path, [
+		{<<"accept-encoding">>, <<"gzip">>},
+		{<<"te">>, <<"trailers">>}
+	]),
+	{response, nofin, Status, RespHeaders} = gun:await(ConnPid, Ref),
+	{ok, RespBody, Trailers} = gun:await_body(ConnPid, Ref),
+	gun:close(ConnPid),
+	{Status, RespHeaders, do_decode(RespHeaders, RespBody), Trailers}.
+
+%% @todo Crash when calling stream_trailers twice.
+%% @todo Crash when calling stream_trailers after the fin flag has been set.
+%% @todo Crash when calling stream_trailers after calling reply.
+%% @todo Crash when calling stream_trailers before calling stream_reply.
+
 %% Tests: Push.
 
 %% @todo We want to crash when push is called after reply has been initiated.