%% cowboy_stream_h.erl
%%
%% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

  -module(cowboy_stream_h).
  -behavior(cowboy_stream).

  %% Silence the deprecation warning for erlang:get_stacktrace/0
  %% when the OTP_RELEASE macro is defined.
  -ifdef(OTP_RELEASE).
  -compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}).
  -endif.

  %% cowboy_stream callbacks.
  -export([init/3, data/4, info/3, terminate/3, early_error/5]).

  %% Functions run inside the request process. request_process/3 and
  %% resume/5 are invoked by module and function name (through
  %% proc_lib:spawn_link/3 and proc_lib:hibernate/3 respectively).
  -export([request_process/3, execute/3, resume/5]).

  %% Per-stream state of this handler.
  %%
  %% The field types agree with the defaults and with how the fields are
  %% used in this module: ref and pid default to undefined, and
  %% read_body_is_fin only ever holds the fin/nofin flag received by data/4.
  -record(state, {
      %% State of the next stream handler, threaded through the
      %% cowboy_stream:init/data/info/terminate calls.
      next :: any(),
      %% Listener reference taken from the request; used in crash logs.
      ref = undefined :: ranch:ref() | undefined,
      %% Request process spawned by init/3.
      pid = undefined :: pid() | undefined,
      %% Parsed expect header; continue means a 100 response may still be sent.
      expect = undefined :: undefined | continue,
      %% Reference of the pending read_body request, undefined when the
      %% request process is not waiting for body data.
      read_body_ref = undefined :: reference() | undefined,
      %% Timer that delivers the read_body_timeout message, if any.
      read_body_timer_ref = undefined :: reference() | undefined,
      %% Length asked for by the pending read_body request, or auto.
      read_body_length = 0 :: non_neg_integer() | infinity | auto,
      %% IsFin flag of the last data received while no read was pending.
      read_body_is_fin = nofin :: nofin | fin,
      %% Body data received but not yet sent to the request process.
      read_body_buffer = <<>> :: binary(),
      %% Number of body bytes received so far; sent along with fin.
      body_length = 0 :: non_neg_integer(),
      %% Acknowledgement state for data messages:
      %%   normal   - every data message is acked right away;
      %%   blocking - a buffer-full alarm is on, the next data message
      %%              is not acked;
      %%   blocked  - an ack is owed and is sent when the alarm turns off.
      stream_body_status = normal :: normal | blocking | blocked
  }).

  %% @todo For shutting down children we need to have a timeout before we terminate
  %% the stream like supervisors do. So here just send a message to yourself first,
  %% and then decide what to do when receiving this message.

  %% Spawn the request process for this stream and initialise the next
  %% stream handler.
  %%
  %% Options read from Opts:
  %%   env              - initial middleware environment (default #{});
  %%   middlewares      - middleware modules to run
  %%                      (default [cowboy_router, cowboy_handler]);
  %%   shutdown_timeout - timeout placed in the spawn command (default 5000).
  %%
  %% The returned command list starts with the spawn command and is
  %% followed by the commands returned by the next handler, which is why
  %% the spec returns cowboy_stream:commands() and not only spawn tuples.
  -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
      -> {cowboy_stream:commands(), #state{}}.
  init(StreamID, Req=#{ref := Ref}, Opts) ->
      Env = maps:get(env, Opts, #{}),
      Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
      Shutdown = maps:get(shutdown_timeout, Opts, 5000),
      Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
      Expect = expect(Req),
      {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
      {[{spawn, Pid, Shutdown}|Commands],
          #state{next=Next, ref=Ref, pid=Pid, expect=Expect}}.

  %% Return the parsed value of the expect header of the request.
  %%
  %% The header is ignored for HTTP/1.0 requests, and any exception raised
  %% while parsing it is turned into undefined.
  expect(#{version := 'HTTP/1.0'}) ->
      undefined;
  expect(Req) ->
      try
          cowboy_req:parse_header(<<"expect">>, Req)
      catch
          _:_ ->
              undefined
      end.

  %% Handle request body data received from the client.
  %%
  %% If the stream is waiting for data (a read_body request is pending):
  %%   If we are in auto mode, send it and update flow control.
  %%   If we accumulated enough data or IsFin=fin, send it.
  %%   If not, buffer it.
  %% If the stream is not waiting for data, buffer it.
  %%
  %% We always reset the expect field when we receive data,
  %% since the client started sending the request body before
  %% we could send a 100 continue response.
  %%
  %% body_length always counts every byte received, whatever the mode,
  %% because it is the total sent to the request process along with fin.
  %%
  %% Data must be a binary: it is measured with byte_size/1 and appended
  %% to the buffer.
  -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State)
      -> {cowboy_stream:commands(), State} when State::#state{}.
  %% Stream isn't waiting for data.
  data(StreamID, IsFin, Data, State=#state{
          read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
      do_data(StreamID, IsFin, Data, [], State#state{
          expect=undefined,
          read_body_is_fin=IsFin,
          read_body_buffer= << Buffer/binary, Data/binary >>,
          body_length=BodyLen + byte_size(Data)
      });
  %% Stream is waiting for data using auto mode.
  %%
  %% There is no buffering done in auto mode.
  data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
          read_body_length=auto, body_length=BodyLen0}) ->
      %% Count this chunk before reporting the length to the request process.
      BodyLen = BodyLen0 + byte_size(Data),
      send_request_body(Pid, Ref, IsFin, BodyLen, Data),
      do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
          expect=undefined,
          read_body_ref=undefined,
          body_length=BodyLen
      });
  %% Stream is waiting for data but we didn't receive enough to send yet.
  data(StreamID, IsFin=nofin, Data, State=#state{
          read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
          when byte_size(Data) + byte_size(Buffer) < ReadLen ->
      do_data(StreamID, IsFin, Data, [], State#state{
          expect=undefined,
          read_body_buffer= << Buffer/binary, Data/binary >>,
          body_length=BodyLen + byte_size(Data)
      });
  %% Stream is waiting for data and we received enough to send.
  data(StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
          read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
      BodyLen = BodyLen0 + byte_size(Data),
      %% The timer reference is undefined when no timer is running for
      %% this read; there is nothing to cancel in that case.
      ok = case TRef of
          undefined -> ok;
          _ -> erlang:cancel_timer(TRef, [{async, true}, {info, false}])
      end,
      send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
      do_data(StreamID, IsFin, Data, [], State#state{
          expect=undefined,
          read_body_ref=undefined,
          read_body_timer_ref=undefined,
          read_body_buffer= <<>>,
          body_length=BodyLen
      }).

  %% Pass the data on to the next stream handler, then return this
  %% handler's commands followed by the ones the next handler produced,
  %% together with the state holding the next handler's new state.
  do_data(StreamID, IsFin, Data, OwnCommands, State=#state{next=NextState0}) ->
      {NextCommands, NextState} =
          cowboy_stream:data(StreamID, IsFin, Data, NextState0),
      AllCommands = lists:append(OwnCommands, NextCommands),
      {AllCommands, State#state{next=NextState}}.

  %% Handle a message received for this stream.
  %%
  %% Messages understood by this handler are converted to commands and/or
  %% state changes; every message, known or not, is then forwarded to the
  %% next stream handler through do_info/4.
  -spec info(cowboy_stream:streamid(), any(), State)
      -> {cowboy_stream:commands(), State} when State::#state{}.
  %% The request process exited normally: stop the stream.
  info(StreamID, Info={'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
      do_info(StreamID, Info, [stop], State);
  %% The request process exited with a request_error: reply with the
  %% matching 4xx status and stop the stream.
  info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}},
          State=#state{pid=Pid}) ->
      Status = case Reason of
          timeout -> 408;
          payload_too_large -> 413;
          _ -> 400
      end,
      %% @todo Headers? Details in body? Log the crash? More stuff in debug only?
      do_info(StreamID, Info, [
          {error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>},
          stop
      ], State);
  %% The request process exited with a {Reason, Stacktrace} pair: reply
  %% with a 500, log the crash unless the reason is normal or a shutdown,
  %% and report an internal error.
  info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
      Commands0 = [{internal_error, Exit, 'Stream process crashed.'}],
      Commands = case Reason of
          normal -> Commands0;
          shutdown -> Commands0;
          {shutdown, _} -> Commands0;
          _ -> [{log, error,
                  "Ranch listener ~p, connection process ~p, stream ~p "
                  "had its request process ~p exit with reason "
                  "~999999p and stacktrace ~999999p~n",
                  [Ref, self(), StreamID, Pid, Reason, Stacktrace]}
              |Commands0]
      end,
      do_info(StreamID, Exit, [
          {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}
      |Commands], State);
  %% Request body, auto mode, no body buffered.
  info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
      do_info(StreamID, Info, [], State#state{
          read_body_ref=Ref,
          read_body_length=auto
      });
  %% Request body, auto mode, body buffered or complete.
  info(StreamID, Info={read_body, Ref, auto, infinity}, State=#state{pid=Pid,
          read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
      send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
      do_info(StreamID, Info, [{flow, byte_size(Buffer)}],
          State#state{read_body_buffer= <<>>});
  %% Request body, body buffered large enough or complete.
  %%
  %% We do not send a 100 continue response if the client
  %% already started sending the body.
  info(StreamID, Info={read_body, Ref, Length, _}, State=#state{pid=Pid,
          read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
          when IsFin =:= fin; byte_size(Buffer) >= Length ->
      send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
      do_info(StreamID, Info, [], State#state{read_body_buffer= <<>>});
  %% Request body, not enough to send yet.
  info(StreamID, Info={read_body, Ref, Length, Period}, State=#state{expect=Expect}) ->
      Commands = case Expect of
          continue -> [{inform, 100, #{}}, {flow, Length}];
          undefined -> [{flow, Length}]
      end,
      %% erlang:send_after/3 only accepts an integer time. When the period
      %% is infinity no timer is started and the timer reference stays
      %% undefined: the read then only completes once enough data (or the
      %% end of the body) has been received.
      TRef = case Period of
          infinity ->
              undefined;
          _ ->
              erlang:send_after(Period, self(),
                  {{self(), StreamID}, {read_body_timeout, Ref}})
      end,
      do_info(StreamID, Info, Commands, State#state{
          read_body_ref=Ref,
          read_body_timer_ref=TRef,
          read_body_length=Length
      });
  %% Request body reading timeout; send what we got.
  info(StreamID, Info={read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
          read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
      send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
      do_info(StreamID, Info, [], State#state{
          read_body_ref=undefined,
          read_body_timer_ref=undefined,
          read_body_buffer= <<>>
      });
  %% Timeout for a read that is no longer pending: nothing to send.
  info(StreamID, Info={read_body_timeout, _}, State) ->
      do_info(StreamID, Info, [], State);
  %% Response.
  %%
  %% We reset the expect field when a 100 continue response
  %% is sent or when any final response is sent.
  info(StreamID, Inform={inform, Status, _}, State0) ->
      State = case cow_http:status_to_integer(Status) of
          100 -> State0#state{expect=undefined};
          _ -> State0
      end,
      do_info(StreamID, Inform, [Inform], State);
  info(StreamID, Response={response, _, _, _}, State) ->
      do_info(StreamID, Response, [Response], State#state{expect=undefined});
  info(StreamID, Headers={headers, _, _}, State) ->
      do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
  %% Sending data involves the data message, the stream_buffer_full alarm
  %% and the connection_buffer_full alarm. We stop sending acks when an alarm is on.
  info(StreamID, Data={data, _, _}, State0=#state{pid=Pid, stream_body_status=Status}) ->
      State = case Status of
          normal ->
              Pid ! {data_ack, self()},
              State0;
          blocking ->
              %% The ack is withheld until the alarm is turned off.
              State0#state{stream_body_status=blocked};
          blocked ->
              State0
      end,
      do_info(StreamID, Data, [Data], State);
  info(StreamID, Alarm={alarm, Name, on}, State)
          when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
      do_info(StreamID, Alarm, [], State#state{stream_body_status=blocking});
  info(StreamID, Alarm={alarm, Name, off}, State=#state{pid=Pid, stream_body_status=Status})
          when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
      %% Send the ack that was withheld while the alarm was on, if any.
      _ = case Status of
          normal -> ok;
          blocking -> ok;
          blocked -> Pid ! {data_ack, self()}
      end,
      do_info(StreamID, Alarm, [], State#state{stream_body_status=normal});
  info(StreamID, Trailers={trailers, _}, State) ->
      do_info(StreamID, Trailers, [Trailers], State);
  info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->
      do_info(StreamID, Push, [Push], State);
  info(StreamID, SwitchProtocol={switch_protocol, _, _, _}, State) ->
      do_info(StreamID, SwitchProtocol, [SwitchProtocol], State#state{expect=undefined});
  %% Convert the set_options message to a command.
  info(StreamID, SetOptions={set_options, _}, State) ->
      do_info(StreamID, SetOptions, [SetOptions], State);
  %% Unknown message, either stray or meant for a handler down the line.
  info(StreamID, Info, State) ->
      do_info(StreamID, Info, [], State).

  %% Pass the message on to the next stream handler, then return this
  %% handler's commands followed by the ones the next handler produced,
  %% together with the state holding the next handler's new state.
  do_info(StreamID, Info, OwnCommands, State=#state{next=NextState0}) ->
      {NextCommands, NextState} = cowboy_stream:info(StreamID, Info, NextState0),
      AllCommands = lists:append(OwnCommands, NextCommands),
      {AllCommands, State#state{next=NextState}}.

  %% Terminate the stream: this handler has nothing of its own to clean
  %% up here, it only hands the next handler's state over to
  %% cowboy_stream:terminate/3.
  -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
  terminate(StreamID, Reason, State=#state{}) ->
      cowboy_stream:terminate(StreamID, Reason, State#state.next).

  %% Early errors are not altered by this handler: all arguments are
  %% passed unchanged to cowboy_stream:early_error/5 and its result is
  %% returned as is.
  -spec early_error(
      cowboy_stream:streamid(),
      cowboy_stream:reason(),
      cowboy_stream:partial_req(),
      Resp,
      cowboy:opts()
  ) -> Resp when Resp::cowboy_stream:resp_command().
  early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
      cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).

  %% Send a chunk of the request body to the process Pid as a
  %% request_body message tagged with Ref.
  %%
  %% The total body length is only part of the message for the final
  %% chunk (fin); it is ignored for intermediate chunks (nofin).
  %% Always returns ok.
  send_request_body(Pid, Ref, nofin, _BodyLen, Data) ->
      _ = erlang:send(Pid, {request_body, Ref, nofin, Data}),
      ok;
  send_request_body(Pid, Ref, fin, BodyLen, Data) ->
      _ = erlang:send(Pid, {request_body, Ref, fin, BodyLen, Data}),
      ok.

  %% Request process.

  %% We catch exceptions in order to add the stacktrace to
  %% the exit reason as it is not propagated by proc_lib otherwise
  %% and therefore not present in the 'EXIT' message. We want
  %% the stacktrace in order to simplify debugging of errors.
  %%
  %% This + the behavior in proc_lib means that we will get a
  %% {Reason, Stacktrace} tuple for every exception, instead of
  %% just for errors and throws.
  %%
  %% Two implementations are provided. When OTP_RELEASE is defined
  %% (OTP 21 and later) the stacktrace is bound directly in the catch
  %% clause, because erlang:get_stacktrace/0 is deprecated there and is
  %% not available on recent releases. Older releases do not support that
  %% syntax and keep using erlang:get_stacktrace/0.
  %%
  %% @todo Better spec.
  -spec request_process(_, _, _) -> _.
  -ifdef(OTP_RELEASE).
  request_process(Req, Env, Middlewares) ->
      try
          execute(Req, Env, Middlewares)
      catch
          %% Only exits need wrapping. Errors and throws are left to
          %% propagate untouched, which is what re-raising them with
          %% their own stacktrace amounted to.
          exit:Reason:Stacktrace ->
              erlang:raise(exit, {Reason, Stacktrace}, Stacktrace)
      end.
  -else.
  request_process(Req, Env, Middlewares) ->
      OTP = erlang:system_info(otp_release),
      try
          execute(Req, Env, Middlewares)
      catch
          exit:Reason ->
              Stacktrace = erlang:get_stacktrace(),
              erlang:raise(exit, {Reason, Stacktrace}, Stacktrace);
          %% OTP 19 does not propagate any exception stacktraces,
          %% we therefore add it for every class of exception.
          _:Reason when OTP =:= "19" ->
              Stacktrace = erlang:get_stacktrace(),
              erlang:raise(exit, {Reason, Stacktrace}, Stacktrace);
          %% @todo I don't think this clause is necessary.
          Class:Reason ->
              erlang:raise(Class, Reason, erlang:get_stacktrace())
      end.
  -endif.

  %% Run the middlewares one after the other inside the request process.
  %%
  %% Each middleware is called as Middleware:execute(Req, Env) and decides
  %% what happens next:
  %%   {ok, Req, Env}           - carry on with the next middleware;
  %%   {suspend, M, F, A}       - hibernate; resume/5 calls M:F(A) on wake-up;
  %%   {stop, Req}              - skip the remaining middlewares.
  %% Returns ok once the list is exhausted or a middleware stops.
  %%
  %% @todo
  %-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
  %	-> ok.
  -spec execute(_, _, _) -> _.
  execute(_Req, _Env, []) ->
      ok; %% @todo Maybe error reason should differ here and there.
  execute(Req0, Env0, [Current|Remaining]) ->
      Result = Current:execute(Req0, Env0),
      case Result of
          {stop, _Req} ->
              ok; %% @todo Maybe error reason should differ here and there.
          {suspend, Mod, Fun, FunArgs} ->
              proc_lib:hibernate(?MODULE, resume,
                  [Env0, Remaining, Mod, Fun, FunArgs]);
          {ok, Req, Env} ->
              execute(Req, Env, Remaining)
      end.

  %% Entry point used when a suspended middleware wakes up from
  %% hibernation.
  %%
  %% Calls Module:Function with Args and handles the result the same way
  %% execute/3 handles the result of a middleware: continue with the
  %% remaining middlewares, suspend again, or stop.
  -spec resume(cowboy_middleware:env(), [module()],
      module(), module(), [any()]) -> ok.
  resume(Env, Tail, Module, Function, Args) ->
      Result = apply(Module, Function, Args),
      case Result of
          {stop, _Req} ->
              ok; %% @todo Maybe error reason should differ here and there.
          {suspend, NextModule, NextFunction, NextArgs} ->
              proc_lib:hibernate(?MODULE, resume,
                  [Env, Tail, NextModule, NextFunction, NextArgs]);
          {ok, Req, NewEnv} ->
              execute(Req, NewEnv, Tail)
      end.
  316. {stop, _Req2} ->
  317. ok %% @todo Maybe error reason should differ here and there.
  318. end.