cowboy_stream_h.erl 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. %% Copyright (c) 2016, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_stream_h).
  15. %% @todo -behaviour(cowboy_stream).
  16. %% @todo Maybe have a callback for the type of process this is, worker or supervisor.
  17. -export([init/3]).
  18. -export([data/4]).
  19. -export([info/3]).
  20. -export([terminate/3]).
  21. -export([execute/3]).
  22. -export([resume/5]).
  23. -record(state, {
  24. pid = undefined :: pid(),
  25. read_body_ref = undefined :: reference(),
  26. read_body_length = 0 :: non_neg_integer(),
  27. read_body_is_fin = nofin :: nofin | fin,
  28. read_body_buffer = <<>> :: binary()
  29. }).
  30. %% @todo For shutting down children we need to have a timeout before we terminate
  31. %% the stream like supervisors do. So here just send a message to yourself first,
  32. %% and then decide what to do when receiving this message.
  33. %% @todo proper specs
  34. -spec init(_,_,_) -> _.
  35. init(_StreamID, Req, Opts) ->
  36. Env = maps:get(env, Opts, #{}),
  37. Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
  38. Shutdown = maps:get(shutdown, Opts, 5000),
  39. Pid = proc_lib:spawn_link(?MODULE, execute, [Req, Env, Middlewares]),
  40. {[{spawn, Pid, Shutdown}], #state{pid=Pid}}.
  41. %% If we receive data and stream is waiting for data:
  42. %% If we accumulated enough data or IsFin=fin, send it.
  43. %% If not, buffer it.
  44. %% If not, buffer it.
  45. %% @todo proper specs
  46. -spec data(_,_,_,_) -> _.
  47. data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) ->
  48. {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}};
  49. data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length ->
  50. {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}};
  51. data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_buffer=Buffer}) ->
  52. Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>},
  53. {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}.
  54. %% @todo proper specs
  55. -spec info(_,_,_) -> _.
  56. info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
  57. {[stop], State};
  58. info(_StreamID, Reason = {'EXIT', Pid, _}, State=#state{pid=Pid}) ->
  59. {[{internal_error, Reason, 'Stream process crashed.'}], State};
  60. %% Request body, no body buffer but IsFin=fin.
  61. info(_StreamID, {read_body, Ref, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) ->
  62. Pid ! {request_body, Ref, fin, <<>>},
  63. {[], State};
  64. %% Request body, body buffered large enough or complete.
  65. info(_StreamID, {read_body, Ref, Length}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data})
  66. when element(1, IsFin) =:= fin; byte_size(Data) >= Length ->
  67. Pid ! {request_body, Ref, IsFin, Data},
  68. {[], State#state{read_body_buffer= <<>>}};
  69. %% Request body, not enough to send yet.
  70. info(_StreamID, {read_body, Ref, Length}, State) ->
  71. {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}};
  72. %% Response.
  73. info(_StreamID, Response = {response, _, _, _}, State) ->
  74. {[Response], State};
  75. info(_StreamID, Headers = {headers, _, _}, State) ->
  76. {[Headers], State};
  77. info(_StreamID, Data = {data, _, _}, State) ->
  78. {[Data], State};
  79. info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
  80. {[SwitchProtocol], State};
  81. %% Stray message.
  82. info(_StreamID, _Msg, State) ->
  83. %% @todo Cleanup if no reply was sent when stream ends.
  84. {[], State}.
  85. %% @todo proper specs
  86. -spec terminate(_,_,_) -> _.
  87. terminate(_StreamID, _Reason, _State) ->
  88. ok.
  89. %% Request process.
  90. %% @todo
  91. %-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
  92. % -> ok.
  93. -spec execute(_, _, _) -> _.
  94. execute(_, _, []) ->
  95. ok; %% @todo Maybe error reason should differ here and there.
  96. execute(Req, Env, [Middleware|Tail]) ->
  97. case Middleware:execute(Req, Env) of
  98. {ok, Req2, Env2} ->
  99. execute(Req2, Env2, Tail);
  100. {suspend, Module, Function, Args} ->
  101. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
  102. {stop, _Req2} ->
  103. ok %% @todo Maybe error reason should differ here and there.
  104. end.
  105. -spec resume(cowboy_middleware:env(), [module()],
  106. module(), module(), [any()]) -> ok.
  107. resume(Env, Tail, Module, Function, Args) ->
  108. case apply(Module, Function, Args) of
  109. {ok, Req2, Env2} ->
  110. execute(Req2, Env2, Tail);
  111. {suspend, Module2, Function2, Args2} ->
  112. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
  113. {stop, _Req2} ->
  114. ok %% @todo Maybe error reason should differ here and there.
  115. end.