cowboy_stream_h.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_stream_h).
  15. -behavior(cowboy_stream).
  16. -ifdef(OTP_RELEASE).
  17. -compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}).
  18. -endif.
  19. -export([init/3]).
  20. -export([data/4]).
  21. -export([info/3]).
  22. -export([terminate/3]).
  23. -export([early_error/5]).
  24. -export([request_process/3]).
  25. -export([resume/5]).
  26. -record(state, {
  27. next :: any(),
  28. ref = undefined :: ranch:ref(),
  29. pid = undefined :: pid(),
  30. expect = undefined :: undefined | continue,
  31. read_body_pid = undefined :: pid() | undefined,
  32. read_body_ref = undefined :: reference() | undefined,
  33. read_body_timer_ref = undefined :: reference() | undefined,
  34. read_body_length = 0 :: non_neg_integer() | infinity | auto,
  35. read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
  36. read_body_buffer = <<>> :: binary(),
  37. body_length = 0 :: non_neg_integer(),
  38. stream_body_pid = undefined :: pid() | undefined,
  39. stream_body_status = normal :: normal | blocking | blocked
  40. }).
  41. -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
  42. -> {[{spawn, pid(), timeout()}], #state{}}.
  43. init(StreamID, Req=#{ref := Ref}, Opts) ->
  44. Env = maps:get(env, Opts, #{}),
  45. Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
  46. Shutdown = maps:get(shutdown_timeout, Opts, 5000),
  47. Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
  48. Expect = expect(Req),
  49. {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
  50. {[{spawn, Pid, Shutdown}|Commands],
  51. #state{next=Next, ref=Ref, pid=Pid, expect=Expect}}.
  52. %% Ignore the expect header in HTTP/1.0.
  53. expect(#{version := 'HTTP/1.0'}) ->
  54. undefined;
  55. expect(Req) ->
  56. try cowboy_req:parse_header(<<"expect">>, Req) of
  57. Expect ->
  58. Expect
  59. catch _:_ ->
  60. undefined
  61. end.
  62. %% If we receive data and stream is waiting for data:
  63. %% If we accumulated enough data or IsFin=fin, send it.
  64. %% If we are in auto mode, send it and update flow control.
  65. %% If not, buffer it.
  66. %% If not, buffer it.
  67. %%
  68. %% We always reset the expect field when we receive data,
  69. %% since the client started sending the request body before
  70. %% we could send a 100 continue response.
  71. -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
  72. -> {cowboy_stream:commands(), State} when State::#state{}.
  73. %% Stream isn't waiting for data.
  74. data(StreamID, IsFin, Data, State=#state{
  75. read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
  76. do_data(StreamID, IsFin, Data, [], State#state{
  77. expect=undefined,
  78. read_body_is_fin=IsFin,
  79. read_body_buffer= << Buffer/binary, Data/binary >>,
  80. body_length=BodyLen + byte_size(Data)
  81. });
  82. %% Stream is waiting for data using auto mode.
  83. %%
  84. %% There is no buffering done in auto mode.
  85. data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
  86. read_body_length=auto, body_length=BodyLen}) ->
  87. send_request_body(Pid, Ref, IsFin, BodyLen, Data),
  88. do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
  89. read_body_ref=undefined,
  90. body_length=BodyLen
  91. });
  92. %% Stream is waiting for data but we didn't receive enough to send yet.
  93. data(StreamID, IsFin=nofin, Data, State=#state{
  94. read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
  95. when byte_size(Data) + byte_size(Buffer) < ReadLen ->
  96. do_data(StreamID, IsFin, Data, [], State#state{
  97. expect=undefined,
  98. read_body_buffer= << Buffer/binary, Data/binary >>,
  99. body_length=BodyLen + byte_size(Data)
  100. });
  101. %% Stream is waiting for data and we received enough to send.
  102. data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
  103. read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
  104. BodyLen = BodyLen0 + byte_size(Data),
  105. ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
  106. send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
  107. do_data(StreamID, IsFin, Data, [], State#state{
  108. expect=undefined,
  109. read_body_ref=undefined,
  110. read_body_timer_ref=undefined,
  111. read_body_buffer= <<>>,
  112. body_length=BodyLen
  113. }).
  114. do_data(StreamID, IsFin, Data, Commands1, State=#state{next=Next0}) ->
  115. {Commands2, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
  116. {Commands1 ++ Commands2, State#state{next=Next}}.
  117. -spec info(cowboy_stream:streamid(), any(), State)
  118. -> {cowboy_stream:commands(), State} when State::#state{}.
  119. info(StreamID, Info={'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
  120. do_info(StreamID, Info, [stop], State);
  121. info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}},
  122. State=#state{pid=Pid}) ->
  123. Status = case Reason of
  124. timeout -> 408;
  125. payload_too_large -> 413;
  126. _ -> 400
  127. end,
  128. %% @todo Headers? Details in body? Log the crash? More stuff in debug only?
  129. do_info(StreamID, Info, [
  130. {error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>},
  131. stop
  132. ], State);
  133. info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
  134. Commands0 = [{internal_error, Exit, 'Stream process crashed.'}],
  135. Commands = case Reason of
  136. normal -> Commands0;
  137. shutdown -> Commands0;
  138. {shutdown, _} -> Commands0;
  139. _ -> [{log, error,
  140. "Ranch listener ~p, connection process ~p, stream ~p "
  141. "had its request process ~p exit with reason "
  142. "~999999p and stacktrace ~999999p~n",
  143. [Ref, self(), StreamID, Pid, Reason, Stacktrace]}
  144. |Commands0]
  145. end,
  146. do_info(StreamID, Exit, [
  147. {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}
  148. |Commands], State);
  149. %% Request body, auto mode, no body buffered.
  150. info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
  151. do_info(StreamID, Info, [], State#state{
  152. read_body_pid=Pid,
  153. read_body_ref=Ref,
  154. read_body_length=auto
  155. });
  156. %% Request body, auto mode, body buffered or complete.
  157. info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{
  158. read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
  159. send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
  160. do_info(StreamID, Info, [{flow, byte_size(Buffer)}],
  161. State#state{read_body_buffer= <<>>});
  162. %% Request body, body buffered large enough or complete.
  163. %%
  164. %% We do not send a 100 continue response if the client
  165. %% already started sending the body.
  166. info(StreamID, Info={read_body, Pid, Ref, Length, _}, State=#state{
  167. read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
  168. when IsFin =:= fin; byte_size(Buffer) >= Length ->
  169. send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
  170. do_info(StreamID, Info, [], State#state{read_body_buffer= <<>>});
  171. %% Request body, not enough to send yet.
  172. info(StreamID, Info={read_body, Pid, Ref, Length, Period}, State=#state{expect=Expect}) ->
  173. Commands = case Expect of
  174. continue -> [{inform, 100, #{}}, {flow, Length}];
  175. undefined -> [{flow, Length}]
  176. end,
  177. TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
  178. do_info(StreamID, Info, Commands, State#state{
  179. read_body_pid=Pid,
  180. read_body_ref=Ref,
  181. read_body_timer_ref=TRef,
  182. read_body_length=Length
  183. });
  184. %% Request body reading timeout; send what we got.
  185. info(StreamID, Info={read_body_timeout, Ref}, State=#state{read_body_pid=Pid, read_body_ref=Ref,
  186. read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
  187. send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
  188. do_info(StreamID, Info, [], State#state{
  189. read_body_ref=undefined,
  190. read_body_timer_ref=undefined,
  191. read_body_buffer= <<>>
  192. });
  193. info(StreamID, Info={read_body_timeout, _}, State) ->
  194. do_info(StreamID, Info, [], State);
  195. %% Response.
  196. %%
  197. %% We reset the expect field when a 100 continue response
  198. %% is sent or when any final response is sent.
  199. info(StreamID, Inform={inform, Status, _}, State0) ->
  200. State = case cow_http:status_to_integer(Status) of
  201. 100 -> State0#state{expect=undefined};
  202. _ -> State0
  203. end,
  204. do_info(StreamID, Inform, [Inform], State);
  205. info(StreamID, Response={response, _, _, _}, State) ->
  206. do_info(StreamID, Response, [Response], State#state{expect=undefined});
  207. info(StreamID, Headers={headers, _, _}, State) ->
  208. do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
  209. %% Sending data involves the data message, the stream_buffer_full alarm
  210. %% and the connection_buffer_full alarm. We stop sending acks when an alarm is on.
  211. %%
  212. %% We only apply backpressure when the message includes a pid. Otherwise
  213. %% it is a message from Cowboy, or the user circumventing the backpressure.
  214. %%
  215. %% We currently do not support sending data from multiple processes concurrently.
  216. info(StreamID, Data={data, _, _}, State) ->
  217. do_info(StreamID, Data, [Data], State);
  218. info(StreamID, Data0={data, Pid, _, _}, State0=#state{stream_body_status=Status}) ->
  219. State = case Status of
  220. normal ->
  221. Pid ! {data_ack, self()},
  222. State0;
  223. blocking ->
  224. State0#state{stream_body_pid=Pid, stream_body_status=blocked};
  225. blocked ->
  226. State0
  227. end,
  228. Data = erlang:delete_element(2, Data0),
  229. do_info(StreamID, Data, [Data], State);
  230. info(StreamID, Alarm={alarm, Name, on}, State)
  231. when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
  232. do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking});
  233. info(StreamID, Alarm={alarm, Name, off}, State=#state{stream_body_pid=Pid, stream_body_status=Status})
  234. when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
  235. _ = case Status of
  236. normal -> ok;
  237. blocking -> ok;
  238. blocked -> Pid ! {data_ack, self()}
  239. end,
  240. do_info(StreamID, Alarm, [], State#state{stream_body_pid=undefined, stream_body_status=normal});
  241. info(StreamID, Trailers={trailers, _}, State) ->
  242. do_info(StreamID, Trailers, [Trailers], State);
  243. info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->
  244. do_info(StreamID, Push, [Push], State);
  245. info(StreamID, SwitchProtocol={switch_protocol, _, _, _}, State) ->
  246. do_info(StreamID, SwitchProtocol, [SwitchProtocol], State#state{expect=undefined});
  247. %% Convert the set_options message to a command.
  248. info(StreamID, SetOptions={set_options, _}, State) ->
  249. do_info(StreamID, SetOptions, [SetOptions], State);
  250. %% Unknown message, either stray or meant for a handler down the line.
  251. info(StreamID, Info, State) ->
  252. do_info(StreamID, Info, [], State).
  253. do_info(StreamID, Info, Commands1, State0=#state{next=Next0}) ->
  254. {Commands2, Next} = cowboy_stream:info(StreamID, Info, Next0),
  255. {Commands1 ++ Commands2, State0#state{next=Next}}.
  256. -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
  257. terminate(StreamID, Reason, #state{next=Next}) ->
  258. cowboy_stream:terminate(StreamID, Reason, Next).
  259. -spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
  260. cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
  261. when Resp::cowboy_stream:resp_command().
  262. early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
  263. cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
  264. send_request_body(Pid, Ref, nofin, _, Data) ->
  265. Pid ! {request_body, Ref, nofin, Data},
  266. ok;
  267. send_request_body(Pid, Ref, fin, BodyLen, Data) ->
  268. Pid ! {request_body, Ref, fin, BodyLen, Data},
  269. ok.
  270. %% Request process.
  271. %% We catch all exceptions in order to add the stacktrace to
  272. %% the exit reason as it is not propagated by proc_lib otherwise
  273. %% and therefore not present in the 'EXIT' message. We want
  274. %% the stacktrace in order to simplify debugging of errors.
  275. %%
  276. %% This + the behavior in proc_lib means that we will get a
  277. %% {Reason, Stacktrace} tuple for every exceptions, instead of
  278. %% just for errors and throws.
  279. %%
  280. %% @todo Better spec.
  281. -spec request_process(cowboy_req:req(), cowboy_middleware:env(), [module()]) -> ok.
  282. request_process(Req, Env, Middlewares) ->
  283. OTP = erlang:system_info(otp_release),
  284. try
  285. execute(Req, Env, Middlewares)
  286. catch
  287. exit:Reason ->
  288. Stacktrace = erlang:get_stacktrace(),
  289. erlang:raise(exit, {Reason, Stacktrace}, Stacktrace);
  290. %% OTP 19 does not propagate any exception stacktraces,
  291. %% we therefore add it for every class of exception.
  292. _:Reason when OTP =:= "19" ->
  293. Stacktrace = erlang:get_stacktrace(),
  294. erlang:raise(exit, {Reason, Stacktrace}, Stacktrace);
  295. %% @todo I don't think this clause is necessary.
  296. Class:Reason ->
  297. erlang:raise(Class, Reason, erlang:get_stacktrace())
  298. end.
  299. execute(_, _, []) ->
  300. ok;
  301. execute(Req, Env, [Middleware|Tail]) ->
  302. case Middleware:execute(Req, Env) of
  303. {ok, Req2, Env2} ->
  304. execute(Req2, Env2, Tail);
  305. {suspend, Module, Function, Args} ->
  306. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
  307. {stop, _Req2} ->
  308. ok
  309. end.
  310. -spec resume(cowboy_middleware:env(), [module()], module(), atom(), [any()]) -> ok.
  311. resume(Env, Tail, Module, Function, Args) ->
  312. case apply(Module, Function, Args) of
  313. {ok, Req2, Env2} ->
  314. execute(Req2, Env2, Tail);
  315. {suspend, Module2, Function2, Args2} ->
  316. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
  317. {stop, _Req2} ->
  318. ok
  319. end.