cowboy_stream_h.erl 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_stream_h).
  15. -behavior(cowboy_stream).
  16. %% @todo Maybe have a callback for the type of process this is, worker or supervisor.
  17. -export([init/3]).
  18. -export([data/4]).
  19. -export([info/3]).
  20. -export([terminate/3]).
  21. -export([proc_lib_hack/3]).
  22. -export([execute/3]).
  23. -export([resume/5]).
  24. %% @todo Need to call subsequent handlers.
  25. -record(state, {
  26. ref = undefined :: ranch:ref(),
  27. pid = undefined :: pid(),
  28. read_body_ref = undefined :: reference() | undefined,
  29. read_body_timer_ref = undefined :: reference() | undefined,
  30. read_body_length = 0 :: non_neg_integer() | infinity,
  31. read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
  32. read_body_buffer = <<>> :: binary()
  33. }).
  34. %% @todo For shutting down children we need to have a timeout before we terminate
  35. %% the stream like supervisors do. So here just send a message to yourself first,
  36. %% and then decide what to do when receiving this message.
  37. -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
  38. -> {[{spawn, pid(), timeout()}], #state{}}.
  39. init(_StreamID, Req=#{ref := Ref}, Opts) ->
  40. Env = maps:get(env, Opts, #{}),
  41. Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
  42. Shutdown = maps:get(shutdown, Opts, 5000),
  43. Pid = proc_lib:spawn_link(?MODULE, proc_lib_hack, [Req, Env, Middlewares]),
  44. {[{spawn, Pid, Shutdown}], #state{ref=Ref, pid=Pid}}.
  45. %% If we receive data and stream is waiting for data:
  46. %% If we accumulated enough data or IsFin=fin, send it.
  47. %% If not, buffer it.
  48. %% If not, buffer it.
  49. -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
  50. -> {cowboy_stream:commands(), State} when State::#state{}.
  51. data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) ->
  52. {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}};
  53. data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length ->
  54. {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}};
  55. data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
  56. read_body_timer_ref=TRef, read_body_buffer=Buffer}) ->
  57. ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
  58. Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>},
  59. {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}.
  60. -spec info(cowboy_stream:streamid(), any(), State)
  61. -> {cowboy_stream:commands(), State} when State::#state{}.
  62. info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
  63. %% @todo Do we even reach this clause?
  64. {[stop], State};
  65. info(_StreamID, {'EXIT', Pid, {_Reason, [_, {cow_http_hd, _, _, _}|_]}}, State=#state{pid=Pid}) ->
  66. %% @todo Have an option to enable/disable this specific crash report?
  67. %%report_crash(Ref, StreamID, Pid, Reason, Stacktrace),
  68. %% @todo Headers? Details in body? More stuff in debug only?
  69. {[{error_response, 400, #{}, <<>>}, stop], State};
  70. info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
  71. report_crash(Ref, StreamID, Pid, Reason, Stacktrace),
  72. {[
  73. {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>},
  74. {internal_error, Exit, 'Stream process crashed.'}
  75. ], State};
  76. %% Request body, no body buffer but IsFin=fin.
  77. %info(_StreamID, {read_body, Ref, _, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) ->
  78. % Pid ! {request_body, Ref, fin, <<>>},
  79. % {[], State};
  80. %% Request body, body buffered large enough or complete.
  81. info(_StreamID, {read_body, Ref, Length, _},
  82. State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data})
  83. when element(1, IsFin) =:= fin; byte_size(Data) >= Length ->
  84. Pid ! {request_body, Ref, IsFin, Data},
  85. {[], State#state{read_body_buffer= <<>>}};
  86. %% Request body, not enough to send yet.
  87. info(StreamID, {read_body, Ref, Length, Period}, State) ->
  88. TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
  89. {[{flow, Length}], State#state{read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}};
  90. %% Request body reading timeout; send what we got.
  91. info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
  92. read_body_is_fin=IsFin, read_body_buffer=Buffer}) ->
  93. Pid ! {request_body, Ref, IsFin, Buffer},
  94. {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}};
  95. info(_StreamID, {read_body_timeout, _}, State) ->
  96. {[], State};
  97. %% Response.
  98. info(_StreamID, Response = {response, _, _, _}, State) ->
  99. {[Response], State};
  100. info(_StreamID, Headers = {headers, _, _}, State) ->
  101. {[Headers], State};
  102. info(_StreamID, Data = {data, _, _}, State) ->
  103. {[Data], State};
  104. info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) ->
  105. {[Push], State};
  106. info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
  107. {[SwitchProtocol], State};
  108. %% Stray message.
  109. info(_StreamID, _Info, State) ->
  110. %% @todo Error report.
  111. %% @todo Cleanup if no reply was sent when stream ends.
  112. {[], State}.
  113. -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
  114. terminate(_StreamID, _Reason, _State) ->
  115. ok.
  116. %% We use ~999999p here instead of ~w because the latter doesn't
  117. %% support printable strings.
  118. report_crash(_, _, _, normal, _) ->
  119. ok;
  120. report_crash(_, _, _, shutdown, _) ->
  121. ok;
  122. report_crash(_, _, _, {shutdown, _}, _) ->
  123. ok;
  124. report_crash(Ref, StreamID, Pid, Reason, Stacktrace) ->
  125. error_logger:error_msg(
  126. "Ranch listener ~p, connection process ~p, stream ~p "
  127. "had its request process ~p exit with reason "
  128. "~999999p and stacktrace ~999999p~n",
  129. [Ref, self(), StreamID, Pid, Reason, Stacktrace]).
  130. %% Request process.
  131. %% @todo This should wrap with try/catch to get the full error
  132. %% in the stream handler. Only then can we decide what to do
  133. %% about it. This means that we should remove any other try/catch
  134. %% in the request process.
  135. %% This hack is necessary because proc_lib does not propagate
  136. %% stacktraces by default. This is ugly because we end up
  137. %% having two try/catch instead of one (the one in proc_lib),
  138. %% just to add the stacktrace information.
  139. %%
  140. %% @todo Remove whenever proc_lib propagates stacktraces.
  141. -spec proc_lib_hack(_, _, _) -> _.
  142. proc_lib_hack(Req, Env, Middlewares) ->
  143. try
  144. execute(Req, Env, Middlewares)
  145. catch
  146. _:Reason when element(1, Reason) =:= cowboy_handler ->
  147. exit(Reason);
  148. _:Reason ->
  149. exit({Reason, erlang:get_stacktrace()})
  150. end.
  151. %% @todo
  152. %-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
  153. % -> ok.
  154. -spec execute(_, _, _) -> _.
  155. execute(_, _, []) ->
  156. ok; %% @todo Maybe error reason should differ here and there.
  157. execute(Req, Env, [Middleware|Tail]) ->
  158. case Middleware:execute(Req, Env) of
  159. {ok, Req2, Env2} ->
  160. execute(Req2, Env2, Tail);
  161. {suspend, Module, Function, Args} ->
  162. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
  163. {stop, _Req2} ->
  164. ok %% @todo Maybe error reason should differ here and there.
  165. end.
  166. -spec resume(cowboy_middleware:env(), [module()],
  167. module(), module(), [any()]) -> ok.
  168. resume(Env, Tail, Module, Function, Args) ->
  169. case apply(Module, Function, Args) of
  170. {ok, Req2, Env2} ->
  171. execute(Req2, Env2, Tail);
  172. {suspend, Module2, Function2, Args2} ->
  173. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
  174. {stop, _Req2} ->
  175. ok %% @todo Maybe error reason should differ here and there.
  176. end.