Browse Source

Make sure cowboy_stream_h calls subsequent stream handlers

Loïc Hoguin 7 years ago
parent
commit
34f57ebbd3
1 changed files with 80 additions and 56 deletions
  1. 80 56
      src/cowboy_stream_h.erl

+ 80 - 56
src/cowboy_stream_h.erl

@@ -29,9 +29,8 @@
 -export([execute/3]).
 -export([execute/3]).
 -export([resume/5]).
 -export([resume/5]).
 
 
-%% @todo Need to call subsequent handlers.
-
 -record(state, {
 -record(state, {
+	next :: any(),
 	ref = undefined :: ranch:ref(),
 	ref = undefined :: ranch:ref(),
 	pid = undefined :: pid(),
 	pid = undefined :: pid(),
 	expect = undefined :: undefined | continue,
 	expect = undefined :: undefined | continue,
@@ -49,13 +48,15 @@
 
 
 -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
 -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
 	-> {[{spawn, pid(), timeout()}], #state{}}.
 	-> {[{spawn, pid(), timeout()}], #state{}}.
-init(_StreamID, Req=#{ref := Ref}, Opts) ->
+init(StreamID, Req=#{ref := Ref}, Opts) ->
 	Env = maps:get(env, Opts, #{}),
 	Env = maps:get(env, Opts, #{}),
 	Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
 	Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
 	Shutdown = maps:get(shutdown_timeout, Opts, 5000),
 	Shutdown = maps:get(shutdown_timeout, Opts, 5000),
 	Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
 	Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
 	Expect = expect(Req),
 	Expect = expect(Req),
-	{[{spawn, Pid, Shutdown}], #state{ref=Ref, pid=Pid, expect=Expect}}.
+	{Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
+	{[{spawn, Pid, Shutdown}|Commands],
+		#state{next=Next, ref=Ref, pid=Pid, expect=Expect}}.
 
 
 %% Ignore the expect header in HTTP/1.0.
 %% Ignore the expect header in HTTP/1.0.
 expect(#{version := 'HTTP/1.0'}) ->
 expect(#{version := 'HTTP/1.0'}) ->
@@ -81,49 +82,58 @@ expect(Req) ->
 -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
 -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
 	-> {cowboy_stream:commands(), State} when State::#state{}.
 	-> {cowboy_stream:commands(), State} when State::#state{}.
 %% Stream isn't waiting for data.
 %% Stream isn't waiting for data.
-data(_StreamID, IsFin, Data, State=#state{
+data(StreamID, IsFin, Data, State=#state{
 		read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
 		read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
-	{[], State#state{
+	do_data(StreamID, IsFin, Data, [], State#state{
 		expect=undefined,
 		expect=undefined,
 		read_body_is_fin=IsFin,
 		read_body_is_fin=IsFin,
 		read_body_buffer= << Buffer/binary, Data/binary >>,
 		read_body_buffer= << Buffer/binary, Data/binary >>,
-		body_length=BodyLen + byte_size(Data)}};
+		body_length=BodyLen + byte_size(Data)
+	});
 %% Stream is waiting for data using auto mode.
 %% Stream is waiting for data using auto mode.
 %%
 %%
 %% There is no buffering done in auto mode.
 %% There is no buffering done in auto mode.
-data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
+data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
 		read_body_length=auto, body_length=BodyLen}) ->
 		read_body_length=auto, body_length=BodyLen}) ->
 	send_request_body(Pid, Ref, IsFin, BodyLen, Data),
 	send_request_body(Pid, Ref, IsFin, BodyLen, Data),
-	{[{flow, byte_size(Data)}], State#state{
+	do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
 		read_body_ref=undefined,
 		read_body_ref=undefined,
-		body_length=BodyLen}};
+		body_length=BodyLen
+	});
 %% Stream is waiting for data but we didn't receive enough to send yet.
 %% Stream is waiting for data but we didn't receive enough to send yet.
-data(_StreamID, nofin, Data, State=#state{
+data(StreamID, IsFin=nofin, Data, State=#state{
 		read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
 		read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
 		when byte_size(Data) + byte_size(Buffer) < ReadLen ->
 		when byte_size(Data) + byte_size(Buffer) < ReadLen ->
-	{[], State#state{
+	do_data(StreamID, IsFin, Data, [], State#state{
 		expect=undefined,
 		expect=undefined,
 		read_body_buffer= << Buffer/binary, Data/binary >>,
 		read_body_buffer= << Buffer/binary, Data/binary >>,
-		body_length=BodyLen + byte_size(Data)}};
+		body_length=BodyLen + byte_size(Data)
+	});
 %% Stream is waiting for data and we received enough to send.
 %% Stream is waiting for data and we received enough to send.
-data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
+data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
 		read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
 		read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
 	BodyLen = BodyLen0 + byte_size(Data),
 	BodyLen = BodyLen0 + byte_size(Data),
 	%% @todo Handle the infinity case where no TRef was defined.
 	%% @todo Handle the infinity case where no TRef was defined.
 	ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
 	ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
 	send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
 	send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
-	{[], State#state{
+	do_data(StreamID, IsFin, Data, [], State#state{
 		expect=undefined,
 		expect=undefined,
 		read_body_ref=undefined,
 		read_body_ref=undefined,
 		read_body_timer_ref=undefined,
 		read_body_timer_ref=undefined,
 		read_body_buffer= <<>>,
 		read_body_buffer= <<>>,
-		body_length=BodyLen}}.
+		body_length=BodyLen
+	}).
+
+do_data(StreamID, IsFin, Data, Commands1, State=#state{next=Next0}) ->
+	{Commands2, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
+	{Commands1 ++ Commands2, State#state{next=Next}}.
 
 
 -spec info(cowboy_stream:streamid(), any(), State)
 -spec info(cowboy_stream:streamid(), any(), State)
 	-> {cowboy_stream:commands(), State} when State::#state{}.
 	-> {cowboy_stream:commands(), State} when State::#state{}.
-info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
-	{[stop], State};
-info(_StreamID, {'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}}, State=#state{pid=Pid}) ->
+info(StreamID, Info={'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
+	do_info(StreamID, Info, [stop], State);
+info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}},
+		State=#state{pid=Pid}) ->
 	%% @todo Optionally report the crash to help debugging.
 	%% @todo Optionally report the crash to help debugging.
 	%%report_crash(Ref, StreamID, Pid, Reason, Stacktrace),
 	%%report_crash(Ref, StreamID, Pid, Reason, Stacktrace),
 	Status = case Reason of
 	Status = case Reason of
@@ -132,80 +142,94 @@ info(_StreamID, {'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}}, Sta
 		_ -> 400
 		_ -> 400
 	end,
 	end,
 	%% @todo Headers? Details in body? More stuff in debug only?
 	%% @todo Headers? Details in body? More stuff in debug only?
-	{[{error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>}, stop], State};
-info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
+	do_info(StreamID, Info, [
+		{error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>},
+		stop
+	], State);
+info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
 	report_crash(Ref, StreamID, Pid, Reason, Stacktrace),
 	report_crash(Ref, StreamID, Pid, Reason, Stacktrace),
-	{[
+	do_info(StreamID, Exit, [
 		{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>},
 		{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>},
 		{internal_error, Exit, 'Stream process crashed.'}
 		{internal_error, Exit, 'Stream process crashed.'}
-	], State};
+	], State);
 %% Request body, auto mode, no body buffered.
 %% Request body, auto mode, no body buffered.
-info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
-	{[], State#state{
+info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
+	do_info(StreamID, Info, [], State#state{
 		read_body_ref=Ref,
 		read_body_ref=Ref,
-		read_body_length=auto}};
+		read_body_length=auto
+	});
 %% Request body, auto mode, body buffered or complete.
 %% Request body, auto mode, body buffered or complete.
-info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{pid=Pid,
+info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{pid=Pid,
 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
-	{[{flow, byte_size(Buffer)}], State#state{read_body_buffer= <<>>}};
+	do_info(StreamID, Info, [{flow, byte_size(Buffer)}],
+		State#state{read_body_buffer= <<>>});
 %% Request body, body buffered large enough or complete.
 %% Request body, body buffered large enough or complete.
 %%
 %%
 %% We do not send a 100 continue response if the client
 %% We do not send a 100 continue response if the client
 %% already started sending the body.
 %% already started sending the body.
-info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid,
+info(StreamID, Info={read_body, Ref, Length, _}, State=#state{pid=Pid,
 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
 		when IsFin =:= fin; byte_size(Buffer) >= Length ->
 		when IsFin =:= fin; byte_size(Buffer) >= Length ->
 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
-	{[], State#state{read_body_buffer= <<>>}};
+	do_info(StreamID, Info, [], State#state{read_body_buffer= <<>>});
 %% Request body, not enough to send yet.
 %% Request body, not enough to send yet.
-info(StreamID, {read_body, Ref, Length, Period}, State=#state{expect=Expect}) ->
+info(StreamID, Info={read_body, Ref, Length, Period}, State=#state{expect=Expect}) ->
 	Commands = case Expect of
 	Commands = case Expect of
 		continue -> [{inform, 100, #{}}, {flow, Length}];
 		continue -> [{inform, 100, #{}}, {flow, Length}];
 		undefined -> [{flow, Length}]
 		undefined -> [{flow, Length}]
 	end,
 	end,
 	%% @todo Handle the case where Period =:= infinity.
 	%% @todo Handle the case where Period =:= infinity.
 	TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
 	TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
-	{Commands, State#state{
+	do_info(StreamID, Info, Commands, State#state{
 		read_body_ref=Ref,
 		read_body_ref=Ref,
 		read_body_timer_ref=TRef,
 		read_body_timer_ref=TRef,
-		read_body_length=Length}};
+		read_body_length=Length
+	});
 %% Request body reading timeout; send what we got.
 %% Request body reading timeout; send what we got.
-info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
+info(StreamID, Info={read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
-	{[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}};
-info(_StreamID, {read_body_timeout, _}, State) ->
-	{[], State};
+	do_info(StreamID, Info, [], State#state{
+		read_body_ref=undefined,
+		read_body_timer_ref=undefined,
+		read_body_buffer= <<>>
+	});
+info(StreamID, Info={read_body_timeout, _}, State) ->
+	do_info(StreamID, Info, [], State);
 %% Response.
 %% Response.
 %%
 %%
 %% We reset the expect field when a 100 continue response
 %% We reset the expect field when a 100 continue response
 %% is sent or when any final response is sent.
 %% is sent or when any final response is sent.
-info(_StreamID, Inform = {inform, Status, _}, State0) ->
+info(StreamID, Inform={inform, Status, _}, State0) ->
 	State = case cow_http:status_to_integer(Status) of
 	State = case cow_http:status_to_integer(Status) of
 		100 -> State0#state{expect=undefined};
 		100 -> State0#state{expect=undefined};
 		_ -> State0
 		_ -> State0
 	end,
 	end,
-	{[Inform], State};
-info(_StreamID, Response = {response, _, _, _}, State) ->
-	{[Response], State#state{expect=undefined}};
-info(_StreamID, Headers = {headers, _, _}, State) ->
-	{[Headers], State#state{expect=undefined}};
-info(_StreamID, Data = {data, _, _}, State) ->
-	{[Data], State};
-info(_StreamID, Trailers = {trailers, _}, State) ->
-	{[Trailers], State};
-info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) ->
-	{[Push], State};
-info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
-	{[SwitchProtocol], State#state{expect=undefined}};
-%% Stray message.
-info(_StreamID, _Info, State) ->
-	{[], State}.
+	do_info(StreamID, Inform, [Inform], State);
+info(StreamID, Response={response, _, _, _}, State) ->
+	do_info(StreamID, Response, [Response], State#state{expect=undefined});
+info(StreamID, Headers={headers, _, _}, State) ->
+	do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
+info(StreamID, Data={data, _, _}, State) ->
+	do_info(StreamID, Data, [Data], State);
+info(StreamID, Trailers={trailers, _}, State) ->
+	do_info(StreamID, Trailers, [Trailers], State);
+info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->
+	do_info(StreamID, Push, [Push], State);
+info(StreamID, SwitchProtocol={switch_protocol, _, _, _}, State) ->
+	do_info(StreamID, SwitchProtocol, [SwitchProtocol], State#state{expect=undefined});
+%% Unknown message, either stray or meant for a handler down the line.
+info(StreamID, Info, State) ->
+	do_info(StreamID, Info, [], State).
+
+do_info(StreamID, Info, Commands1, State0=#state{next=Next0}) ->
+	{Commands2, Next} = cowboy_stream:info(StreamID, Info, Next0),
+	{Commands1 ++ Commands2, State0#state{next=Next}}.
 
 
 -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
 -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
-terminate(_StreamID, _Reason, _State) ->
-	ok.
+terminate(StreamID, Reason, #state{next=Next}) ->
+	cowboy_stream:terminate(StreamID, Reason, Next).
 
 
 -spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
 -spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
 	cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
 	cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp