|
@@ -30,6 +30,7 @@
|
|
-record(state, {
|
|
-record(state, {
|
|
ref = undefined :: ranch:ref(),
|
|
ref = undefined :: ranch:ref(),
|
|
pid = undefined :: pid(),
|
|
pid = undefined :: pid(),
|
|
|
|
+ expect = undefined :: undefined | continue,
|
|
read_body_ref = undefined :: reference() | undefined,
|
|
read_body_ref = undefined :: reference() | undefined,
|
|
read_body_timer_ref = undefined :: reference() | undefined,
|
|
read_body_timer_ref = undefined :: reference() | undefined,
|
|
read_body_length = 0 :: non_neg_integer() | infinity,
|
|
read_body_length = 0 :: non_neg_integer() | infinity,
|
|
@@ -49,18 +50,35 @@ init(_StreamID, Req=#{ref := Ref}, Opts) ->
|
|
Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
|
|
Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
|
|
Shutdown = maps:get(shutdown_timeout, Opts, 5000),
|
|
Shutdown = maps:get(shutdown_timeout, Opts, 5000),
|
|
Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
|
|
Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
|
|
- {[{spawn, Pid, Shutdown}], #state{ref=Ref, pid=Pid}}.
|
|
|
|
|
|
+ Expect = expect(Req),
|
|
|
|
+ {[{spawn, Pid, Shutdown}], #state{ref=Ref, pid=Pid, expect=Expect}}.
|
|
|
|
+
|
|
|
|
+%% Ignore the expect header in HTTP/1.0.
|
|
|
|
+expect(#{version := 'HTTP/1.0'}) ->
|
|
|
|
+ undefined;
|
|
|
|
+expect(Req) ->
|
|
|
|
+ try cowboy_req:parse_header(<<"expect">>, Req) of
|
|
|
|
+ Expect ->
|
|
|
|
+ Expect
|
|
|
|
+ catch _:_ ->
|
|
|
|
+ undefined
|
|
|
|
+ end.
|
|
|
|
|
|
%% If we receive data and stream is waiting for data:
|
|
%% If we receive data and stream is waiting for data:
|
|
%% If we accumulated enough data or IsFin=fin, send it.
|
|
%% If we accumulated enough data or IsFin=fin, send it.
|
|
%% If not, buffer it.
|
|
%% If not, buffer it.
|
|
%% If not, buffer it.
|
|
%% If not, buffer it.
|
|
|
|
+%%
|
|
|
|
+%% We always reset the expect field when we receive data,
|
|
|
|
+%% since the client started sending the request body before
|
|
|
|
+%% we could send a 100 continue response.
|
|
|
|
|
|
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
|
|
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
|
|
-> {cowboy_stream:commands(), State} when State::#state{}.
|
|
-> {cowboy_stream:commands(), State} when State::#state{}.
|
|
data(_StreamID, IsFin, Data, State=#state{
|
|
data(_StreamID, IsFin, Data, State=#state{
|
|
read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
|
|
read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
|
|
{[], State#state{
|
|
{[], State#state{
|
|
|
|
+ expect=undefined,
|
|
read_body_is_fin=IsFin,
|
|
read_body_is_fin=IsFin,
|
|
read_body_buffer= << Buffer/binary, Data/binary >>,
|
|
read_body_buffer= << Buffer/binary, Data/binary >>,
|
|
body_length=BodyLen + byte_size(Data)}};
|
|
body_length=BodyLen + byte_size(Data)}};
|
|
@@ -68,6 +86,7 @@ data(_StreamID, nofin, Data, State=#state{
|
|
read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
|
|
read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
|
|
when byte_size(Data) + byte_size(Buffer) < ReadLen ->
|
|
when byte_size(Data) + byte_size(Buffer) < ReadLen ->
|
|
{[], State#state{
|
|
{[], State#state{
|
|
|
|
+ expect=undefined,
|
|
read_body_buffer= << Buffer/binary, Data/binary >>,
|
|
read_body_buffer= << Buffer/binary, Data/binary >>,
|
|
body_length=BodyLen + byte_size(Data)}};
|
|
body_length=BodyLen + byte_size(Data)}};
|
|
data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
|
|
data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
|
|
@@ -76,6 +95,7 @@ data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
|
|
ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
|
|
ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
|
|
send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
|
|
send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
|
|
{[], State#state{
|
|
{[], State#state{
|
|
|
|
+ expect=undefined,
|
|
read_body_ref=undefined,
|
|
read_body_ref=undefined,
|
|
read_body_timer_ref=undefined,
|
|
read_body_timer_ref=undefined,
|
|
read_body_buffer= <<>>,
|
|
read_body_buffer= <<>>,
|
|
@@ -102,15 +122,25 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref,
|
|
{internal_error, Exit, 'Stream process crashed.'}
|
|
{internal_error, Exit, 'Stream process crashed.'}
|
|
], State};
|
|
], State};
|
|
%% Request body, body buffered large enough or complete.
|
|
%% Request body, body buffered large enough or complete.
|
|
|
|
+%%
|
|
|
|
+%% We do not send a 100 continue response if the client
|
|
|
|
+%% already started sending the body.
|
|
info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid,
|
|
info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid,
|
|
read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
|
|
read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
|
|
when IsFin =:= fin; byte_size(Buffer) >= Length ->
|
|
when IsFin =:= fin; byte_size(Buffer) >= Length ->
|
|
send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
|
|
send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
|
|
{[], State#state{read_body_buffer= <<>>}};
|
|
{[], State#state{read_body_buffer= <<>>}};
|
|
%% Request body, not enough to send yet.
|
|
%% Request body, not enough to send yet.
|
|
-info(StreamID, {read_body, Ref, Length, Period}, State) ->
|
|
|
|
|
|
+info(StreamID, {read_body, Ref, Length, Period}, State=#state{expect=Expect}) ->
|
|
|
|
+ Commands = case Expect of
|
|
|
|
+ continue -> [{inform, 100, #{}}, {flow, Length}];
|
|
|
|
+ undefined -> [{flow, Length}]
|
|
|
|
+ end,
|
|
TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
|
|
TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
|
|
- {[{flow, Length}], State#state{read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}};
|
|
|
|
|
|
+ {Commands, State#state{
|
|
|
|
+ read_body_ref=Ref,
|
|
|
|
+ read_body_timer_ref=TRef,
|
|
|
|
+ read_body_length=Length}};
|
|
%% Request body reading timeout; send what we got.
|
|
%% Request body reading timeout; send what we got.
|
|
info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
|
|
info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
|
|
read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
|
|
read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
|
|
@@ -119,18 +149,27 @@ info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Re
|
|
info(_StreamID, {read_body_timeout, _}, State) ->
|
|
info(_StreamID, {read_body_timeout, _}, State) ->
|
|
{[], State};
|
|
{[], State};
|
|
%% Response.
|
|
%% Response.
|
|
-info(_StreamID, Inform = {inform, _, _}, State) ->
|
|
|
|
|
|
+%%
|
|
|
|
+%% We reset the expect field when a 100 continue response
|
|
|
|
+%% is sent or when any final response is sent.
|
|
|
|
+info(_StreamID, Inform = {inform, Status, _}, State0) ->
|
|
|
|
+ State = case Status of
|
|
|
|
+ 100 -> State0#state{expect=undefined};
|
|
|
|
+ <<"100">> -> State0#state{expect=undefined};
|
|
|
|
+ <<"100 ", _/bits>> -> State0#state{expect=undefined};
|
|
|
|
+ _ -> State0
|
|
|
|
+ end,
|
|
{[Inform], State};
|
|
{[Inform], State};
|
|
info(_StreamID, Response = {response, _, _, _}, State) ->
|
|
info(_StreamID, Response = {response, _, _, _}, State) ->
|
|
- {[Response], State};
|
|
|
|
|
|
+ {[Response], State#state{expect=undefined}};
|
|
info(_StreamID, Headers = {headers, _, _}, State) ->
|
|
info(_StreamID, Headers = {headers, _, _}, State) ->
|
|
- {[Headers], State};
|
|
|
|
|
|
+ {[Headers], State#state{expect=undefined}};
|
|
info(_StreamID, Data = {data, _, _}, State) ->
|
|
info(_StreamID, Data = {data, _, _}, State) ->
|
|
{[Data], State};
|
|
{[Data], State};
|
|
info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) ->
|
|
info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) ->
|
|
{[Push], State};
|
|
{[Push], State};
|
|
info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
|
|
info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
|
|
- {[SwitchProtocol], State};
|
|
|
|
|
|
+ {[SwitchProtocol], State#state{expect=undefined}};
|
|
%% Stray message.
|
|
%% Stray message.
|
|
info(_StreamID, _Info, State) ->
|
|
info(_StreamID, _Info, State) ->
|
|
{[], State}.
|
|
{[], State}.
|