Browse Source

Ensure we can read the request body from any process

Loïc Hoguin 5 years ago
parent
commit
20660d7566
5 changed files with 28 additions and 9 deletions
  1. 1 1
      src/cowboy_req.erl
  2. 10 7
      src/cowboy_stream_h.erl
  3. 1 1
      src/cowboy_websocket.erl
  4. 10 0
      test/handlers/echo_h.erl
  5. 6 0
      test/req_SUITE.erl

+ 1 - 1
src/cowboy_req.erl

@@ -491,7 +491,7 @@ read_body(Req=#{pid := Pid, streamid := StreamID}, Opts) ->
 	Period = maps:get(period, Opts, 15000),
 	Timeout = maps:get(timeout, Opts, Period + 1000),
 	Ref = make_ref(),
-	Pid ! {{Pid, StreamID}, {read_body, Ref, Length, Period}},
+	Pid ! {{Pid, StreamID}, {read_body, self(), Ref, Length, Period}},
 	receive
 		{request_body, Ref, nofin, Body} ->
 			{more, Body, Req};

+ 10 - 7
src/cowboy_stream_h.erl

@@ -34,6 +34,7 @@
 	ref = undefined :: ranch:ref(),
 	pid = undefined :: pid(),
 	expect = undefined :: undefined | continue,
+	read_body_pid = undefined :: pid() | undefined,
 	read_body_ref = undefined :: reference() | undefined,
 	read_body_timer_ref = undefined :: reference() | undefined,
 	read_body_length = 0 :: non_neg_integer() | infinity | auto,
@@ -94,7 +95,7 @@ data(StreamID, IsFin, Data, State=#state{
 %% Stream is waiting for data using auto mode.
 %%
 %% There is no buffering done in auto mode.
-data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
+data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
 		read_body_length=auto, body_length=BodyLen}) ->
 	send_request_body(Pid, Ref, IsFin, BodyLen, Data),
 	do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
@@ -111,7 +112,7 @@ data(StreamID, IsFin=nofin, Data, State=#state{
 		body_length=BodyLen + byte_size(Data)
 	});
 %% Stream is waiting for data and we received enough to send.
-data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
+data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
 		read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
 	BodyLen = BodyLen0 + byte_size(Data),
 	%% @todo Handle the infinity case where no TRef was defined.
@@ -162,13 +163,14 @@ info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, p
 		{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}
 	|Commands], State);
 %% Request body, auto mode, no body buffered.
-info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
+info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
 	do_info(StreamID, Info, [], State#state{
+		read_body_pid=Pid,
 		read_body_ref=Ref,
 		read_body_length=auto
 	});
 %% Request body, auto mode, body buffered or complete.
-info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{pid=Pid,
+info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{
 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
 	do_info(StreamID, Info, [{flow, byte_size(Buffer)}],
@@ -177,13 +179,13 @@ info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{pid=Pid,
 %%
 %% We do not send a 100 continue response if the client
 %% already started sending the body.
-info(StreamID, Info={read_body, Ref, Length, _}, State=#state{pid=Pid,
+info(StreamID, Info={read_body, Pid, Ref, Length, _}, State=#state{
 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
 		when IsFin =:= fin; byte_size(Buffer) >= Length ->
 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
 	do_info(StreamID, Info, [], State#state{read_body_buffer= <<>>});
 %% Request body, not enough to send yet.
-info(StreamID, Info={read_body, Ref, Length, Period}, State=#state{expect=Expect}) ->
+info(StreamID, Info={read_body, Pid, Ref, Length, Period}, State=#state{expect=Expect}) ->
 	Commands = case Expect of
 		continue -> [{inform, 100, #{}}, {flow, Length}];
 		undefined -> [{flow, Length}]
@@ -191,12 +193,13 @@ info(StreamID, Info={read_body, Ref, Length, Period}, State=#state{expect=Expect
 	%% @todo Handle the case where Period =:= infinity.
 	TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
 	do_info(StreamID, Info, Commands, State#state{
+		read_body_pid=Pid,
 		read_body_ref=Ref,
 		read_body_timer_ref=TRef,
 		read_body_length=Length
 	});
 %% Request body reading timeout; send what we got.
-info(StreamID, Info={read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
+info(StreamID, Info={read_body_timeout, Ref}, State=#state{read_body_pid=Pid, read_body_ref=Ref,
 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
 	do_info(StreamID, Info, [], State#state{

+ 1 - 1
src/cowboy_websocket.erl

@@ -305,7 +305,7 @@ before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined},
 		HandlerState, ParseState) ->
 	%% @todo Keep Ref around.
 	ReadBodyRef = make_ref(),
-	Pid ! {Stream, {read_body, ReadBodyRef, auto, infinity}},
+	Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
 	loop(State, HandlerState, ParseState);
 before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true},
 		HandlerState, ParseState) ->

+ 10 - 0
test/handlers/echo_h.erl

@@ -30,6 +30,16 @@ echo(<<"read_body">>, Req0, Opts) ->
 			Length = cowboy_req:body_length(Req1),
 			{ok, integer_to_binary(Length), Req1};
 		<<"/opts", _/bits>> -> cowboy_req:read_body(Req0, Opts);
+		<<"/spawn", _/bits>> ->
+			Parent = self(),
+			Pid = spawn_link(fun() ->
+				Parent ! {self(), cowboy_req:read_body(Req0)}
+			end),
+			receive
+				{Pid, Msg} -> Msg
+			after 5000 ->
+				error(timeout)
+			end;
 		_ -> cowboy_req:read_body(Req0)
 	end,
 	{ok, cowboy_req:reply(200, #{}, Body, Req), Opts};

+ 6 - 0
test/req_SUITE.erl

@@ -57,6 +57,7 @@ init_dispatch(Config) ->
 		{"/opts/:key/timeout", echo_h, #{timeout => 1000, crash => true}},
 		{"/100-continue/:key", echo_h, []},
 		{"/full/:key", echo_h, []},
+		{"/spawn/:key", echo_h, []},
 		{"/no/:key", echo_h, []},
 		{"/direct/:key/[...]", echo_h, []},
 		{"/:key/[...]", echo_h, []}
@@ -488,6 +489,11 @@ do_read_body_timeout(Path, Body, Config) ->
 	{response, _, 500, _} = gun:await(ConnPid, Ref),
 	gun:close(ConnPid).
 
+read_body_spawn(Config) ->
+	doc("Confirm we can use cowboy_req:read_body/1,2 from another process."),
+	<<"hello world!">> = do_body("POST", "/spawn/read_body", [], "hello world!", Config),
+	ok.
+
 read_body_expect_100_continue(Config) ->
 	doc("Request body with a 100-continue expect header."),
 	do_read_body_expect_100_continue("/read_body", Config).