cowboy_stream_h.erl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_stream_h).
  15. -behavior(cowboy_stream).
  16. -export([init/3]).
  17. -export([data/4]).
  18. -export([info/3]).
  19. -export([terminate/3]).
  20. -export([early_error/5]).
  21. -export([request_process/3]).
  22. -export([execute/3]).
  23. -export([resume/5]).
  24. %% @todo Need to call subsequent handlers.
  25. -record(state, {
  26. ref = undefined :: ranch:ref(),
  27. pid = undefined :: pid(),
  28. expect = undefined :: undefined | continue,
  29. read_body_ref = undefined :: reference() | undefined,
  30. read_body_timer_ref = undefined :: reference() | undefined,
  31. read_body_length = 0 :: non_neg_integer() | infinity,
  32. read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
  33. read_body_buffer = <<>> :: binary(),
  34. body_length = 0 :: non_neg_integer()
  35. }).
  36. %% @todo For shutting down children we need to have a timeout before we terminate
  37. %% the stream like supervisors do. So here just send a message to yourself first,
  38. %% and then decide what to do when receiving this message.
  39. -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
  40. -> {[{spawn, pid(), timeout()}], #state{}}.
  41. init(_StreamID, Req=#{ref := Ref}, Opts) ->
  42. Env = maps:get(env, Opts, #{}),
  43. Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
  44. Shutdown = maps:get(shutdown_timeout, Opts, 5000),
  45. Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
  46. Expect = expect(Req),
  47. {[{spawn, Pid, Shutdown}], #state{ref=Ref, pid=Pid, expect=Expect}}.
  48. %% Ignore the expect header in HTTP/1.0.
  49. expect(#{version := 'HTTP/1.0'}) ->
  50. undefined;
  51. expect(Req) ->
  52. try cowboy_req:parse_header(<<"expect">>, Req) of
  53. Expect ->
  54. Expect
  55. catch _:_ ->
  56. undefined
  57. end.
  58. %% If we receive data and stream is waiting for data:
  59. %% If we accumulated enough data or IsFin=fin, send it.
  60. %% If not, buffer it.
  61. %% If not, buffer it.
  62. %%
  63. %% We always reset the expect field when we receive data,
  64. %% since the client started sending the request body before
  65. %% we could send a 100 continue response.
  66. -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
  67. -> {cowboy_stream:commands(), State} when State::#state{}.
  68. data(_StreamID, IsFin, Data, State=#state{
  69. read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
  70. {[], State#state{
  71. expect=undefined,
  72. read_body_is_fin=IsFin,
  73. read_body_buffer= << Buffer/binary, Data/binary >>,
  74. body_length=BodyLen + byte_size(Data)}};
  75. data(_StreamID, nofin, Data, State=#state{
  76. read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
  77. when byte_size(Data) + byte_size(Buffer) < ReadLen ->
  78. {[], State#state{
  79. expect=undefined,
  80. read_body_buffer= << Buffer/binary, Data/binary >>,
  81. body_length=BodyLen + byte_size(Data)}};
  82. data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
  83. read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
  84. BodyLen = BodyLen0 + byte_size(Data),
  85. ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
  86. send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
  87. {[], State#state{
  88. expect=undefined,
  89. read_body_ref=undefined,
  90. read_body_timer_ref=undefined,
  91. read_body_buffer= <<>>,
  92. body_length=BodyLen}}.
  93. -spec info(cowboy_stream:streamid(), any(), State)
  94. -> {cowboy_stream:commands(), State} when State::#state{}.
  95. info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
  96. {[stop], State};
  97. info(_StreamID, {'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}}, State=#state{pid=Pid}) ->
  98. %% @todo Optionally report the crash to help debugging.
  99. %%report_crash(Ref, StreamID, Pid, Reason, Stacktrace),
  100. Status = case Reason of
  101. timeout -> 408;
  102. payload_too_large -> 413;
  103. _ -> 400
  104. end,
  105. %% @todo Headers? Details in body? More stuff in debug only?
  106. {[{error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>}, stop], State};
  107. info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
  108. report_crash(Ref, StreamID, Pid, Reason, Stacktrace),
  109. {[
  110. {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>},
  111. {internal_error, Exit, 'Stream process crashed.'}
  112. ], State};
  113. %% Request body, body buffered large enough or complete.
  114. %%
  115. %% We do not send a 100 continue response if the client
  116. %% already started sending the body.
  117. info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid,
  118. read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
  119. when IsFin =:= fin; byte_size(Buffer) >= Length ->
  120. send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
  121. {[], State#state{read_body_buffer= <<>>}};
  122. %% Request body, not enough to send yet.
  123. info(StreamID, {read_body, Ref, Length, Period}, State=#state{expect=Expect}) ->
  124. Commands = case Expect of
  125. continue -> [{inform, 100, #{}}, {flow, Length}];
  126. undefined -> [{flow, Length}]
  127. end,
  128. TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
  129. {Commands, State#state{
  130. read_body_ref=Ref,
  131. read_body_timer_ref=TRef,
  132. read_body_length=Length}};
  133. %% Request body reading timeout; send what we got.
  134. info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
  135. read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
  136. send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
  137. {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}};
  138. info(_StreamID, {read_body_timeout, _}, State) ->
  139. {[], State};
  140. %% Response.
  141. %%
  142. %% We reset the expect field when a 100 continue response
  143. %% is sent or when any final response is sent.
  144. info(_StreamID, Inform = {inform, Status, _}, State0) ->
  145. State = case Status of
  146. 100 -> State0#state{expect=undefined};
  147. <<"100">> -> State0#state{expect=undefined};
  148. <<"100 ", _/bits>> -> State0#state{expect=undefined};
  149. _ -> State0
  150. end,
  151. {[Inform], State};
  152. info(_StreamID, Response = {response, _, _, _}, State) ->
  153. {[Response], State#state{expect=undefined}};
  154. info(_StreamID, Headers = {headers, _, _}, State) ->
  155. {[Headers], State#state{expect=undefined}};
  156. info(_StreamID, Data = {data, _, _}, State) ->
  157. {[Data], State};
  158. info(_StreamID, Trailers = {trailers, _}, State) ->
  159. {[Trailers], State};
  160. info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) ->
  161. {[Push], State};
  162. info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
  163. {[SwitchProtocol], State#state{expect=undefined}};
  164. %% Stray message.
  165. info(_StreamID, _Info, State) ->
  166. {[], State}.
  167. -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
  168. terminate(_StreamID, _Reason, _State) ->
  169. ok.
  170. -spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
  171. cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
  172. when Resp::cowboy_stream:resp_command().
  173. early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
  174. cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
  175. send_request_body(Pid, Ref, nofin, _, Data) ->
  176. Pid ! {request_body, Ref, nofin, Data},
  177. ok;
  178. send_request_body(Pid, Ref, fin, BodyLen, Data) ->
  179. Pid ! {request_body, Ref, fin, BodyLen, Data},
  180. ok.
  181. %% We use ~999999p here instead of ~w because the latter doesn't
  182. %% support printable strings.
  183. report_crash(_, _, _, normal, _) ->
  184. ok;
  185. report_crash(_, _, _, shutdown, _) ->
  186. ok;
  187. report_crash(_, _, _, {shutdown, _}, _) ->
  188. ok;
  189. report_crash(Ref, StreamID, Pid, Reason, Stacktrace) ->
  190. error_logger:error_msg(
  191. "Ranch listener ~p, connection process ~p, stream ~p "
  192. "had its request process ~p exit with reason "
  193. "~999999p and stacktrace ~999999p~n",
  194. [Ref, self(), StreamID, Pid, Reason, Stacktrace]).
  195. %% Request process.
  196. %% We catch all exceptions in order to add the stacktrace to
  197. %% the exit reason as it is not propagated by proc_lib otherwise
  198. %% and therefore not present in the 'EXIT' message. We want
  199. %% the stacktrace in order to simplify debugging of errors.
  200. %%
  201. %% This + the behavior in proc_lib means that we will get a
  202. %% {Reason, Stacktrace} tuple for every exceptions, instead of
  203. %% just for errors and throws.
  204. %%
  205. %% @todo Better spec.
  206. -spec request_process(_, _, _) -> _.
  207. request_process(Req, Env, Middlewares) ->
  208. OTP = erlang:system_info(otp_release),
  209. try
  210. execute(Req, Env, Middlewares)
  211. catch
  212. exit:Reason ->
  213. Stacktrace = erlang:get_stacktrace(),
  214. erlang:raise(exit, {Reason, Stacktrace}, Stacktrace);
  215. %% OTP 19 does not propagate any exception stacktraces,
  216. %% we therefore add it for every class of exception.
  217. _:Reason when OTP =:= "19" ->
  218. Stacktrace = erlang:get_stacktrace(),
  219. erlang:raise(exit, {Reason, Stacktrace}, Stacktrace);
  220. %% @todo I don't think this clause is necessary.
  221. Class:Reason ->
  222. erlang:raise(Class, Reason, erlang:get_stacktrace())
  223. end.
  224. %% @todo
  225. %-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
  226. % -> ok.
  227. -spec execute(_, _, _) -> _.
  228. execute(_, _, []) ->
  229. ok; %% @todo Maybe error reason should differ here and there.
  230. execute(Req, Env, [Middleware|Tail]) ->
  231. case Middleware:execute(Req, Env) of
  232. {ok, Req2, Env2} ->
  233. execute(Req2, Env2, Tail);
  234. {suspend, Module, Function, Args} ->
  235. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
  236. {stop, _Req2} ->
  237. ok %% @todo Maybe error reason should differ here and there.
  238. end.
  239. -spec resume(cowboy_middleware:env(), [module()],
  240. module(), module(), [any()]) -> ok.
  241. resume(Env, Tail, Module, Function, Args) ->
  242. case apply(Module, Function, Args) of
  243. {ok, Req2, Env2} ->
  244. execute(Req2, Env2, Tail);
  245. {suspend, Module2, Function2, Args2} ->
  246. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
  247. {stop, _Req2} ->
  248. ok %% @todo Maybe error reason should differ here and there.
  249. end.