|
@@ -34,7 +34,8 @@
|
|
read_body_timer_ref = undefined :: reference() | undefined,
|
|
read_body_timer_ref = undefined :: reference() | undefined,
|
|
read_body_length = 0 :: non_neg_integer() | infinity,
|
|
read_body_length = 0 :: non_neg_integer() | infinity,
|
|
read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
|
|
read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
|
|
- read_body_buffer = <<>> :: binary()
|
|
|
|
|
|
+ read_body_buffer = <<>> :: binary(),
|
|
|
|
+ body_length = 0 :: non_neg_integer()
|
|
}).
|
|
}).
|
|
|
|
|
|
%% @todo For shutting down children we need to have a timeout before we terminate
|
|
%% @todo For shutting down children we need to have a timeout before we terminate
|
|
@@ -54,17 +55,31 @@ init(_StreamID, Req=#{ref := Ref}, Opts) ->
|
|
%% If we accumulated enough data or IsFin=fin, send it.
|
|
%% If we accumulated enough data or IsFin=fin, send it.
|
|
%% If not, buffer it.
|
|
%% If not, buffer it.
|
|
%% If not, buffer it.
|
|
%% If not, buffer it.
|
|
|
|
+
|
|
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
|
|
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
|
|
-> {cowboy_stream:commands(), State} when State::#state{}.
|
|
-> {cowboy_stream:commands(), State} when State::#state{}.
|
|
-data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) ->
|
|
|
|
- {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}};
|
|
|
|
-data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length ->
|
|
|
|
- {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}};
|
|
|
|
|
|
+data(_StreamID, IsFin, Data, State=#state{
|
|
|
|
+ read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
|
|
|
|
+ {[], State#state{
|
|
|
|
+ read_body_is_fin=IsFin,
|
|
|
|
+ read_body_buffer= << Buffer/binary, Data/binary >>,
|
|
|
|
+ body_length=BodyLen + byte_size(Data)}};
|
|
|
|
+data(_StreamID, nofin, Data, State=#state{
|
|
|
|
+ read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
|
|
|
|
+ when byte_size(Data) + byte_size(Buffer) < ReadLen ->
|
|
|
|
+ {[], State#state{
|
|
|
|
+ read_body_buffer= << Buffer/binary, Data/binary >>,
|
|
|
|
+ body_length=BodyLen + byte_size(Data)}};
|
|
data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
|
|
data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
|
|
- read_body_timer_ref=TRef, read_body_buffer=Buffer}) ->
|
|
|
|
|
|
+ read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
|
|
|
|
+ BodyLen = BodyLen0 + byte_size(Data),
|
|
ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
|
|
ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
|
|
- Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>},
|
|
|
|
- {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}.
|
|
|
|
|
|
+ send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
|
|
|
|
+ {[], State#state{
|
|
|
|
+ read_body_ref=undefined,
|
|
|
|
+ read_body_timer_ref=undefined,
|
|
|
|
+ read_body_buffer= <<>>,
|
|
|
|
+ body_length=BodyLen}}.
|
|
|
|
|
|
-spec info(cowboy_stream:streamid(), any(), State)
|
|
-spec info(cowboy_stream:streamid(), any(), State)
|
|
-> {cowboy_stream:commands(), State} when State::#state{}.
|
|
-> {cowboy_stream:commands(), State} when State::#state{}.
|
|
@@ -86,15 +101,11 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref,
|
|
{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>},
|
|
{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>},
|
|
{internal_error, Exit, 'Stream process crashed.'}
|
|
{internal_error, Exit, 'Stream process crashed.'}
|
|
], State};
|
|
], State};
|
|
-%% Request body, no body buffer but IsFin=fin.
|
|
|
|
-%info(_StreamID, {read_body, Ref, _, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) ->
|
|
|
|
-% Pid ! {request_body, Ref, fin, <<>>},
|
|
|
|
-% {[], State};
|
|
|
|
%% Request body, body buffered large enough or complete.
|
|
%% Request body, body buffered large enough or complete.
|
|
-info(_StreamID, {read_body, Ref, Length, _},
|
|
|
|
- State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data})
|
|
|
|
- when element(1, IsFin) =:= fin; byte_size(Data) >= Length ->
|
|
|
|
- Pid ! {request_body, Ref, IsFin, Data},
|
|
|
|
|
|
+info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid,
|
|
|
|
+ read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
|
|
|
|
+ when IsFin =:= fin; byte_size(Buffer) >= Length ->
|
|
|
|
+ send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
|
|
{[], State#state{read_body_buffer= <<>>}};
|
|
{[], State#state{read_body_buffer= <<>>}};
|
|
%% Request body, not enough to send yet.
|
|
%% Request body, not enough to send yet.
|
|
info(StreamID, {read_body, Ref, Length, Period}, State) ->
|
|
info(StreamID, {read_body, Ref, Length, Period}, State) ->
|
|
@@ -102,8 +113,8 @@ info(StreamID, {read_body, Ref, Length, Period}, State) ->
|
|
{[{flow, Length}], State#state{read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}};
|
|
{[{flow, Length}], State#state{read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}};
|
|
%% Request body reading timeout; send what we got.
|
|
%% Request body reading timeout; send what we got.
|
|
info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
|
|
info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
|
|
- read_body_is_fin=IsFin, read_body_buffer=Buffer}) ->
|
|
|
|
- Pid ! {request_body, Ref, IsFin, Buffer},
|
|
|
|
|
|
+ read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
|
|
|
|
+ send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
|
|
{[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}};
|
|
{[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}};
|
|
info(_StreamID, {read_body_timeout, _}, State) ->
|
|
info(_StreamID, {read_body_timeout, _}, State) ->
|
|
{[], State};
|
|
{[], State};
|
|
@@ -132,6 +143,13 @@ terminate(_StreamID, _Reason, _State) ->
|
|
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
|
|
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
|
|
cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
|
|
cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
|
|
|
|
|
|
|
|
+send_request_body(Pid, Ref, nofin, _, Data) ->
|
|
|
|
+ Pid ! {request_body, Ref, nofin, Data},
|
|
|
|
+ ok;
|
|
|
|
+send_request_body(Pid, Ref, fin, BodyLen, Data) ->
|
|
|
|
+ Pid ! {request_body, Ref, fin, BodyLen, Data},
|
|
|
|
+ ok.
|
|
|
|
+
|
|
%% We use ~999999p here instead of ~w because the latter doesn't
|
|
%% We use ~999999p here instead of ~w because the latter doesn't
|
|
%% support printable strings.
|
|
%% support printable strings.
|
|
report_crash(_, _, _, normal, _) ->
|
|
report_crash(_, _, _, normal, _) ->
|