cowboy_http.erl 39 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035
  1. %% Copyright (c) 2016, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http).
  15. -export([init/6]).
  16. -export([system_continue/3]).
  17. -export([system_terminate/4]).
  18. -export([system_code_change/4]).
  19. %% @todo map
  20. -type opts() :: [{compress, boolean()}
  21. | {env, cowboy_middleware:env()}
  22. | {max_empty_lines, non_neg_integer()}
  23. | {max_header_name_length, non_neg_integer()}
  24. | {max_header_value_length, non_neg_integer()}
  25. | {max_headers, non_neg_integer()}
  26. | {max_keepalive, non_neg_integer()}
  27. | {max_request_line_length, non_neg_integer()}
  28. | {middlewares, [module()]}
  29. | {onresponse, cowboy:onresponse_fun()}
  30. | {timeout, timeout()}].
  31. -export_type([opts/0]).
  32. -record(ps_request_line, {
  33. empty_lines = 0 :: non_neg_integer()
  34. }).
  35. -record(ps_header, {
  36. method = undefined :: binary(),
  37. path = undefined :: binary(),
  38. qs = undefined :: binary(),
  39. version = undefined :: cowboy:http_version(),
  40. headers = undefined :: map() | undefined, %% @todo better type than map()
  41. name = undefined :: binary()
  42. }).
  43. %% @todo We need a state where we wait for the stream process to ask for the body.
  44. %% OR DO WE
  45. %% In HTTP/2 we start receiving data before the body asks for it, even if optionally
  46. %% (and by default), so we need to be able to do the same for HTTP/1.1 too. This means
  47. %% that when we receive data (up to a certain limit, we read from the socket and decode.
  48. %% When we reach a limit, we stop reading from the socket momentarily until the stream
  49. %% process asks for more or the stream ends.
  50. %% This means that we need to keep a buffer in the stream handler (until the stream
  51. %% process asks for it). And that we need the body state to indicate how much we have
  52. %% left to read (and stop/start reading from the socket depending on value).
  53. -record(ps_body, {
  54. %% @todo flow
  55. transfer_decode_fun :: fun(), %% @todo better type
  56. transfer_decode_state :: any() %% @todo better type
  57. }).
  58. -record(stream, {
  59. %% Stream identifier.
  60. id = undefined :: cowboy_stream:streamid(),
  61. %% Stream handler state.
  62. state = undefined :: any(),
  63. %% Client HTTP version for this stream.
  64. version = undefined :: cowboy:http_version(),
  65. %% Commands queued.
  66. queue = [] :: [] %% @todo better type
  67. }).
  68. -type stream() :: #stream{}.
  69. -record(state, {
  70. parent :: pid(),
  71. ref :: ranch:ref(),
  72. socket :: inet:socket(),
  73. transport :: module(),
  74. opts = #{} :: map(),
  75. handler :: module(),
  76. timer = undefined :: undefined | reference(),
  77. %% Identifier for the stream currently being read (or waiting to be received).
  78. in_streamid = 1 :: pos_integer(),
  79. %% Parsing state for the current stream or stream-to-be.
  80. in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
  81. %% Identifier for the stream currently being written.
  82. %% Note that out_streamid =< in_streamid.
  83. out_streamid = 1 :: pos_integer(),
  84. %% Whether we finished writing data for the current stream.
  85. out_state = wait :: wait | headers | chunked,
  86. %% The connection will be closed after this stream.
  87. last_streamid = undefined :: pos_integer(),
  88. %% Currently active HTTP/1.1 streams. Streams may be initiated either
  89. %% by the client or by the server through PUSH_PROMISE frames.
  90. streams = [] :: [stream()],
  91. %% Children which are in the process of shutting down.
  92. children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}]
  93. %% @todo Automatic compression. (compress option?)
  94. %% @todo onresponse? Equivalent using streams.
  95. }).
  96. -include_lib("cowlib/include/cow_inline.hrl").
  97. -include_lib("cowlib/include/cow_parse.hrl").
  98. -spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok.
  99. init(Parent, Ref, Socket, Transport, Opts, Handler) ->
  100. LastStreamID = maps:get(max_keepalive, Opts, 100),
  101. before_loop(set_request_timeout(#state{
  102. parent=Parent, ref=Ref, socket=Socket,
  103. transport=Transport, opts=Opts, handler=Handler,
  104. last_streamid=LastStreamID}), <<>>).
  105. %% @todo Send a response depending on in_state and whether one was already sent.
  106. %% @todo
  107. %% Timeouts:
  108. %% - waiting for new request (if no stream is currently running)
  109. %% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state
  110. %% - waiting for body (if a stream requested the body to be read)
  111. %% -> read_body_timeout: amount of time we wait without receiving any data when reading the body
  112. %% - if we skip the body, skip only for a specific duration
  113. %% -> skip_body_timeout: also have a skip_body_length
  114. %% - none if we have a stream running and it didn't request the body to be read
  115. %% - global
  116. %% -> inactivity_timeout: max time to wait without anything happening before giving up
  117. before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
  118. %% @todo disable this when we get to the body, until the stream asks for it?
  119. %% Perhaps have a threshold for how much we're willing to read before waiting.
  120. Transport:setopts(Socket, [{active, once}]),
  121. loop(State, Buffer).
  122. loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
  123. handler=_Handler, timer=TimerRef, children=Children}, Buffer) ->
  124. {OK, Closed, Error} = Transport:messages(),
  125. receive
  126. %% Socket messages.
  127. {OK, Socket, Data} ->
  128. parse(<< Buffer/binary, Data/binary >>, State);
  129. {Closed, Socket} ->
  130. terminate(State, {socket_error, closed, 'The socket has been closed.'});
  131. {Error, Socket, Reason} ->
  132. terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
  133. %% Timeouts.
  134. {timeout, TimerRef, Reason} ->
  135. timeout(State, Reason);
  136. {timeout, _, _} ->
  137. loop(State, Buffer);
  138. %% System messages.
  139. {'EXIT', Parent, Reason} ->
  140. exit(Reason);
  141. {system, From, Request} ->
  142. sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
  143. %% Messages pertaining to a stream.
  144. {{Pid, StreamID}, Msg} when Pid =:= self() ->
  145. loop(info(State, StreamID, Msg), Buffer);
  146. %% Exit signal from children.
  147. Msg = {'EXIT', Pid, _} ->
  148. loop(down(State, Pid, Msg), Buffer);
  149. %% Calls from supervisor module.
  150. {'$gen_call', {From, Tag}, which_children} ->
  151. Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children],
  152. From ! {Tag, Workers},
  153. loop(State, Buffer);
  154. {'$gen_call', {From, Tag}, count_children} ->
  155. NbChildren = length(Children),
  156. Counts = [{specs, 1}, {active, NbChildren},
  157. {supervisors, 0}, {workers, NbChildren}],
  158. From ! {Tag, Counts},
  159. loop(State, Buffer);
  160. {'$gen_call', {From, Tag}, _} ->
  161. From ! {Tag, {error, ?MODULE}},
  162. loop(State, Buffer);
  163. %% Unknown messages.
  164. Msg ->
  165. error_logger:error_msg("Received stray message ~p.", [Msg]),
  166. loop(State, Buffer)
  167. %% @todo Configurable timeout. This should be a global inactivity timeout
  168. %% that triggers when really nothing happens (ie something went really wrong).
  169. after 300000 ->
  170. terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
  171. end.
  172. set_request_timeout(State0=#state{opts=Opts}) ->
  173. State = cancel_request_timeout(State0),
  174. Timeout = maps:get(request_timeout, Opts, 5000),
  175. TimerRef = erlang:start_timer(Timeout, self(), request_timeout),
  176. State#state{timer=TimerRef}.
  177. cancel_request_timeout(State=#state{timer=TimerRef}) ->
  178. ok = case TimerRef of
  179. undefined -> ok;
  180. _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
  181. end,
  182. State#state{timer=undefined}.
  183. %% @todo Honestly it would be much better if we didn't enable pipelining yet.
  184. timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
  185. %% @todo If other streams are running, just set the connection to be closed
  186. %% and stop trying to read from the socket?
  187. terminate(State, {connection_error, timeout, 'No request-line received before timeout.'});
  188. timeout(State=#state{socket=Socket, transport=Transport, in_state=#ps_header{}}, request_timeout) ->
  189. %% @todo If other streams are running, maybe wait for their reply before sending 408?
  190. %% -> Definitely. Either way, stop reading from the socket and make that stream the last.
  191. Transport:send(Socket, cow_http:response(408, 'HTTP/1.1', [])),
  192. terminate(State, {connection_error, timeout, 'Request headers not received before timeout.'}).
  193. %% Request-line.
  194. parse(<<>>, State) ->
  195. before_loop(State, <<>>);
  196. parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
  197. after_parse(parse_request(Buffer, State, EmptyLines));
  198. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
  199. after_parse(parse_header(Buffer,
  200. State#state{in_state=PS#ps_header{headers=undefined}},
  201. Headers));
  202. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
  203. after_parse(parse_hd_before_value(Buffer,
  204. State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
  205. Headers, Name));
  206. parse(Buffer, State=#state{in_state=#ps_body{}}) ->
  207. %% @todo We do not want to get the body automatically if the request doesn't ask for it.
  208. %% We may want to get bodies that are below a threshold without waiting, and buffer them
  209. %% until the request asks, though.
  210. %% @todo Transfer-decoding must be done here.
  211. after_parse(parse_body(Buffer, State)).
  212. %% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
  213. after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version},
  214. State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) ->
  215. %% @todo Opts at the end. Maybe pass the same Opts we got?
  216. try Handler:init(StreamID, Req, Opts) of
  217. {Commands, StreamState} ->
  218. Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0],
  219. State = case maybe_req_close(State0, Headers, Version) of
  220. close -> State0#state{streams=Streams, last_streamid=StreamID};
  221. keepalive -> State0#state{streams=Streams}
  222. end,
  223. parse(Buffer, commands(State, StreamID, Commands))
  224. catch Class:Reason ->
  225. error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) "
  226. "with reason ~p:~p.",
  227. [Handler, StreamID, Req, Opts, Class, Reason]),
  228. ok
  229. %% @todo Status code.
  230. % stream_reset(State, StreamID, {internal_error, {Class, Reason},
  231. % 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity.
  232. end;
  233. %% Streams are sequential so the body is always about the last stream created
  234. %% unless that stream has terminated.
  235. after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler,
  236. streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
  237. try Handler:data(StreamID, IsFin, Data, StreamState0) of
  238. {Commands, StreamState} ->
  239. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  240. Stream#stream{state=StreamState}),
  241. parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
  242. catch Class:Reason ->
  243. error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
  244. [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]),
  245. ok
  246. %% @todo
  247. % stream_reset(State, StreamID, {internal_error, {Class, Reason},
  248. % 'Exception occurred in StreamHandler:data/4 call.'})
  249. end;
  250. %% No corresponding stream, skip.
  251. after_parse({data, _, _, _, State, Buffer}) ->
  252. before_loop(State, Buffer);
  253. after_parse({more, State, Buffer}) ->
  254. before_loop(State, Buffer).
  255. %% Request-line.
  256. -spec parse_request(binary(), #state{}, non_neg_integer()) -> ok.
  257. %% Empty lines must be using \r\n.
  258. parse_request(<< $\n, _/bits >>, State, _) ->
  259. error_terminate(400, State, {connection_error, protocol_error,
  260. ''}); %% @todo
  261. parse_request(<< $\s, _/bits >>, State, _) ->
  262. error_terminate(400, State, {connection_error, protocol_error,
  263. ''}); %% @todo
  264. %% We limit the length of the Request-line to MaxLength to avoid endlessly
  265. %% reading from the socket and eventually crashing.
  266. parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
  267. MaxLength = maps:get(max_request_line_length, Opts, 8000),
  268. MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
  269. case match_eol(Buffer, 0) of
  270. nomatch when byte_size(Buffer) > MaxLength ->
  271. error_terminate(414, State, {connection_error, limit_reached,
  272. ''}); %% @todo
  273. nomatch ->
  274. {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
  275. 1 when EmptyLines =:= MaxEmptyLines ->
  276. error_terminate(400, State, {connection_error, limit_reached,
  277. ''}); %% @todo
  278. 1 ->
  279. << _:16, Rest/bits >> = Buffer,
  280. parse_request(Rest, State, EmptyLines + 1);
  281. _ ->
  282. case Buffer of
  283. %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
  284. << "OPTIONS * ", Rest/bits >> ->
  285. parse_version(Rest, State, <<"OPTIONS">>, <<"*">>, <<>>);
  286. % << "CONNECT ", Rest/bits >> ->
  287. % parse_authority( %% @todo
  288. %% Accept direct HTTP/2 only at the beginning of the connection.
  289. << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
  290. %% @todo Might be worth throwing to get a clean stacktrace.
  291. http2_upgrade(State, Buffer);
  292. _ ->
  293. parse_method(Buffer, State, <<>>,
  294. maps:get(max_method_length, Opts, 32))
  295. end
  296. end.
  297. match_eol(<< $\n, _/bits >>, N) ->
  298. N;
  299. match_eol(<< _, Rest/bits >>, N) ->
  300. match_eol(Rest, N + 1);
  301. match_eol(_, _) ->
  302. nomatch.
  303. parse_method(_, State, _, 0) ->
  304. error_terminate(501, State, {connection_error, limit_reached,
  305. 'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
  306. parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
  307. case C of
  308. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  309. ''}); %% @todo
  310. $\s -> parse_uri(Rest, State, SoFar);
  311. _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
  312. _ -> error_terminate(400, State, {connection_error, protocol_error,
  313. 'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
  314. end.
  315. parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
  316. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  317. P =:= $p orelse P =:= $P ->
  318. parse_uri_skip_host(Rest, State, Method);
  319. parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
  320. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  321. P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
  322. parse_uri_skip_host(Rest, State, Method);
  323. parse_uri(<< $/, Rest/bits >>, State, Method) ->
  324. parse_uri_path(Rest, State, Method, << $/ >>);
  325. parse_uri(_, State, _) ->
  326. error_terminate(400, State, {connection_error, protocol_error,
  327. 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
  328. parse_uri_skip_host(<< C, Rest/bits >>, State, Method) ->
  329. case C of
  330. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  331. ''}); %% @todo
  332. $/ -> parse_uri_path(Rest, State, Method, <<"/">>);
  333. $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>);
  334. $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>);
  335. $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>);
  336. _ -> parse_uri_skip_host(Rest, State, Method)
  337. end.
  338. parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
  339. case C of
  340. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  341. ''}); %% @todo
  342. $\s -> parse_version(Rest, State, Method, SoFar, <<>>);
  343. $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
  344. $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
  345. _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
  346. end.
  347. parse_uri_query(<< C, Rest/bits >>, State, M, P, SoFar) ->
  348. case C of
  349. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  350. ''}); %% @todo
  351. $\s -> parse_version(Rest, State, M, P, SoFar);
  352. $# -> skip_uri_fragment(Rest, State, M, P, SoFar);
  353. _ -> parse_uri_query(Rest, State, M, P, << SoFar/binary, C >>)
  354. end.
  355. skip_uri_fragment(<< C, Rest/bits >>, State, M, P, Q) ->
  356. case C of
  357. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  358. ''}); %% @todo
  359. $\s -> parse_version(Rest, State, M, P, Q);
  360. _ -> skip_uri_fragment(Rest, State, M, P, Q)
  361. end.
  362. %% @todo Calls to parse_header should update the state.
  363. parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, P, Q) ->
  364. parse_headers(Rest, State, M, P, Q, 'HTTP/1.1');
  365. parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, P, Q) ->
  366. parse_headers(Rest, State, M, P, Q, 'HTTP/1.0');
  367. parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
  368. error_terminate(400, State, {connection_error, protocol_error,
  369. 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
  370. parse_version(<< C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
  371. error_terminate(400, State, {connection_error, protocol_error,
  372. 'The separator between request target and version must be a single SP.'});
  373. parse_version(_, State, _, _, _) ->
  374. error_terminate(505, State, {connection_error, protocol_error,
  375. ''}). %% @todo
  376. parse_headers(Rest, State, M, P, Q, V) ->
  377. %% @todo Figure out the parse states.
  378. parse_header(Rest, State#state{in_state=#ps_header{
  379. method=M, path=P, qs=Q, version=V}}, #{}).
  380. %% Headers.
  381. %% We need two or more bytes in the buffer to continue.
  382. parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
  383. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
  384. parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
  385. request(Rest, S, Headers);
  386. parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  387. MaxLength = maps:get(max_header_name_length, Opts, 64),
  388. MaxHeaders = maps:get(max_headers, Opts, 100),
  389. case match_colon(Buffer, 0) of
  390. nomatch when byte_size(Buffer) > MaxLength ->
  391. error_terminate(400, State, {connection_error, limit_reached,
  392. ''}); %% @todo
  393. nomatch when length(Headers) >= MaxHeaders ->
  394. error_terminate(400, State, {connection_error, limit_reached,
  395. ''}); %% @todo
  396. nomatch ->
  397. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
  398. _ ->
  399. parse_hd_name(Buffer, State, Headers, <<>>)
  400. end.
  401. match_colon(<< $:, _/bits >>, N) ->
  402. N;
  403. match_colon(<< _, Rest/bits >>, N) ->
  404. match_colon(Rest, N + 1);
  405. match_colon(_, _) ->
  406. nomatch.
  407. parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
  408. parse_hd_before_value(Rest, State, H, SoFar);
  409. parse_hd_name(<< C, _/bits >>, State, _, <<>>) when ?IS_WS(C) ->
  410. error_terminate(400, State, {connection_error, protocol_error,
  411. ''}); %% @todo
  412. parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) when ?IS_WS(C) ->
  413. parse_hd_name_ws(Rest, State, H, SoFar);
  414. parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
  415. ?LOWER(parse_hd_name, Rest, State, H, SoFar).
  416. parse_hd_name_ws(<< C, Rest/bits >>, S, H, Name) ->
  417. case C of
  418. $\s -> parse_hd_name_ws(Rest, S, H, Name);
  419. $\t -> parse_hd_name_ws(Rest, S, H, Name);
  420. $: -> parse_hd_before_value(Rest, S, H, Name)
  421. end.
  422. parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) ->
  423. parse_hd_before_value(Rest, S, H, N);
  424. parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) ->
  425. parse_hd_before_value(Rest, S, H, N);
  426. parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
  427. MaxLength = maps:get(max_header_value_length, Opts, 4096),
  428. case match_eol(Buffer, 0) of
  429. nomatch when byte_size(Buffer) > MaxLength ->
  430. error_terminate(400, State, {connection_error, limit_reached,
  431. ''}); %% @todo
  432. nomatch ->
  433. {more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer};
  434. _ ->
  435. parse_hd_value(Buffer, State, H, N, <<>>)
  436. end.
  437. parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers, Name, SoFar) ->
  438. %% @todo What to do about duplicate header names.
  439. parse_header(Rest, S, Headers#{Name => clean_value_ws_end(SoFar, byte_size(SoFar) - 1)});
  440. parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
  441. parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).
  442. clean_value_ws_end(_, -1) ->
  443. <<>>;
  444. clean_value_ws_end(Value, N) ->
  445. case binary:at(Value, N) of
  446. $\s -> clean_value_ws_end(Value, N - 1);
  447. $\t -> clean_value_ws_end(Value, N - 1);
  448. _ ->
  449. S = N + 1,
  450. << Value2:S/binary, _/bits >> = Value,
  451. Value2
  452. end.
  453. -ifdef(TEST).
  454. clean_value_ws_end_test_() ->
  455. Tests = [
  456. {<<>>, <<>>},
  457. {<<" ">>, <<>>},
  458. {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  459. "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
  460. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  461. "text/html;level=2;q=0.4, */*;q=0.5">>}
  462. ],
  463. [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
  464. horse_clean_value_ws_end() ->
  465. horse:repeat(200000,
  466. clean_value_ws_end(
  467. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  468. "text/html;level=2;q=0.4, */*;q=0.5 ">>,
  469. byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  470. "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
  471. ).
  472. -endif.
  473. request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
  474. in_state=#ps_header{version=Version}}, Headers) ->
  475. case maps:get(<<"host">>, Headers, undefined) of
  476. undefined when Version =:= 'HTTP/1.1' ->
  477. %% @todo Might want to not close the connection on this and next one.
  478. error_terminate(400, State, {stream_error, StreamID, protocol_error,
  479. ''}); %% @todo
  480. undefined ->
  481. request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
  482. RawHost ->
  483. try parse_host(RawHost, false, <<>>) of
  484. {Host, undefined} ->
  485. request(Buffer, State, Headers, Host, default_port(Transport:secure()));
  486. {Host, Port} ->
  487. request(Buffer, State, Headers, Host, Port)
  488. catch _:_ ->
  489. error_terminate(400, State, {stream_error, StreamID, protocol_error,
  490. ''}) %% @todo
  491. end
  492. end.
  493. -spec default_port(boolean()) -> 80 | 443.
  494. default_port(true) -> 443;
  495. default_port(_) -> 80.
  496. %% @todo Yeah probably just call the cowlib function.
  497. %% Same code as cow_http:parse_fullhost/1, but inline because we
  498. %% really want this to go fast.
  499. parse_host(<< $[, Rest/bits >>, false, <<>>) ->
  500. parse_host(Rest, true, << $[ >>);
  501. parse_host(<<>>, false, Acc) ->
  502. {Acc, undefined};
  503. parse_host(<< $:, Rest/bits >>, false, Acc) ->
  504. {Acc, list_to_integer(binary_to_list(Rest))};
  505. parse_host(<< $], Rest/bits >>, true, Acc) ->
  506. parse_host(Rest, false, << Acc/binary, $] >>);
  507. parse_host(<< C, Rest/bits >>, E, Acc) ->
  508. ?LOWER(parse_host, Rest, E, Acc).
  509. %% End of request parsing.
  510. %% @todo We used to get the peername here, bad idea, should
  511. %% get it at the very start of the connection, or the first
  512. %% time requested if we go the route of handler sending a
  513. %% message to get it (we probably shouldn't).
  514. request(Buffer, State0=#state{ref=Ref, transport=Transport, in_streamid=StreamID,
  515. in_state=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
  516. Headers, Host, Port) ->
  517. Scheme = case Transport:secure() of
  518. true -> <<"https">>;
  519. false -> <<"http">>
  520. end,
  521. {HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers of
  522. #{<<"content-length">> := <<"0">>} ->
  523. {false, 0, undefined, undefined};
  524. #{<<"content-length">> := BinLength} ->
  525. Length = try
  526. cow_http_hd:parse_content_length(BinLength)
  527. catch _:_ ->
  528. error_terminate(400, State0, {stream_error, StreamID, protocol_error,
  529. ''}) %% @todo
  530. %% @todo Err should terminate here...
  531. end,
  532. {true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
  533. %% @todo Better handling of transfer decoding.
  534. #{<<"transfer-encoding">> := <<"chunked">>} ->
  535. {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
  536. _ ->
  537. {false, 0, undefined, undefined}
  538. end,
  539. Req = #{
  540. ref => Ref,
  541. pid => self(),
  542. streamid => StreamID,
  543. %% @todo peer
  544. %% @todo sockname
  545. %% @todo ssl client cert?
  546. method => Method,
  547. scheme => Scheme,
  548. host => Host,
  549. %% host_info (cowboy_router)
  550. port => Port,
  551. path => Path,
  552. %% path_info (cowboy_router)
  553. %% bindings (cowboy_router)
  554. qs => Qs,
  555. version => Version,
  556. %% We are transparently taking care of transfer-encodings so
  557. %% the user code has no need to know about it.
  558. headers => maps:remove(<<"transfer-encoding">>, Headers),
  559. has_body => HasBody,
  560. body_length => BodyLength
  561. %% @todo multipart? keep state separate
  562. %% meta values (cowboy_websocket, cowboy_rest)
  563. },
  564. case is_http2_upgrade(Headers, Version) of
  565. false ->
  566. State = case HasBody of
  567. true ->
  568. cancel_request_timeout(State0#state{in_state=#ps_body{
  569. %% @todo Don't need length anymore?
  570. transfer_decode_fun = TDecodeFun,
  571. transfer_decode_state = TDecodeState
  572. }});
  573. false ->
  574. set_request_timeout(State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}})
  575. end,
  576. {request, Req, State, Buffer};
  577. {true, SettingsPayload} ->
  578. http2_upgrade(State0, Buffer, SettingsPayload, Req)
  579. end.
  580. %% HTTP/2 upgrade.
  581. is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
  582. <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
  583. Conns = cow_http_hd:parse_connection(Conn),
  584. io:format(user, "CONNS ~p~n", [Conns]),
  585. case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
  586. {true, true} ->
  587. Protocols = cow_http_hd:parse_upgrade(Upgrade),
  588. io:format(user, "PROTOCOLS ~p~n", [Protocols]),
  589. case lists:member(<<"h2c">>, Protocols) of
  590. true ->
  591. SettingsPayload = cow_http_hd:parse_http2_settings(HTTP2Settings),
  592. {true, SettingsPayload};
  593. false ->
  594. false
  595. end;
  596. _ ->
  597. false
  598. end;
  599. is_http2_upgrade(_, _) ->
  600. false.
  601. %% Upgrade through an HTTP/1.1 request.
  602. %% Prior knowledge upgrade, without an HTTP/1.1 request.
  603. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  604. opts=Opts, handler=Handler}, Buffer) ->
  605. case Transport:secure() of
  606. false ->
  607. _ = cancel_request_timeout(State),
  608. cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Buffer);
  609. true ->
  610. error_terminate(400, State, {connection_error, protocol_error,
  611. 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
  612. end.
  613. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  614. opts=Opts, handler=Handler}, Buffer, SettingsPayload, Req) ->
  615. %% @todo
  616. %% However if the client sent a body, we need to read the body in full
  617. %% and if we can't do that, return a 413 response. Some options are in order.
  618. %% Always half-closed stream coming from this side.
  619. Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(#{
  620. <<"connection">> => <<"Upgrade">>,
  621. <<"upgrade">> => <<"h2c">>
  622. }))),
  623. %% @todo Possibly redirect the request if it was https.
  624. _ = cancel_request_timeout(State),
  625. cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Buffer, SettingsPayload, Req).
  626. %% Request body parsing.
  627. parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
  628. PS=#ps_body{transfer_decode_fun=TDecode, transfer_decode_state=TState0}}) ->
  629. %% @todo Proper trailers.
  630. case TDecode(Buffer, TState0) of
  631. more ->
  632. %% @todo Asks for 0 or more bytes.
  633. {more, State, Buffer};
  634. {more, Data, TState} ->
  635. %% @todo Asks for 0 or more bytes.
  636. {data, StreamID, nofin, Data, State#state{in_state=
  637. PS#ps_body{transfer_decode_state=TState}}, <<>>};
  638. {more, Data, _Length, TState} when is_integer(_Length) ->
  639. %% @todo Asks for Length more bytes.
  640. {data, StreamID, nofin, Data, State#state{in_state=
  641. PS#ps_body{transfer_decode_state=TState}}, <<>>};
  642. {more, Data, Rest, TState} ->
  643. %% @todo Asks for 0 or more bytes.
  644. {data, StreamID, nofin, Data, State#state{in_state=
  645. PS#ps_body{transfer_decode_state=TState}}, Rest};
  646. {done, TotalLength, Rest} ->
  647. {data, StreamID, {fin, TotalLength}, <<>>, set_request_timeout(
  648. State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
  649. {done, Data, TotalLength, Rest} ->
  650. {data, StreamID, {fin, TotalLength}, Data, set_request_timeout(
  651. State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
  652. end.
  653. %% Message handling.
  654. down(State=#state{children=Children0}, Pid, Msg) ->
  655. case lists:keytake(Pid, 1, Children0) of
  656. {value, {_, undefined, _}, Children} ->
  657. State#state{children=Children};
  658. {value, {_, StreamID, _}, Children} ->
  659. info(State#state{children=Children}, StreamID, Msg);
  660. false ->
  661. error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]),
  662. State
  663. end.
  664. info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) ->
  665. case lists:keyfind(StreamID, #stream.id, Streams0) of
  666. Stream = #stream{state=StreamState0} ->
  667. try Handler:info(StreamID, Msg, StreamState0) of
  668. {Commands, StreamState} ->
  669. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  670. Stream#stream{state=StreamState}),
  671. commands(State#state{streams=Streams}, StreamID, Commands)
  672. catch Class:Reason ->
  673. error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.",
  674. [Handler, StreamID, Msg, StreamState0, Class, Reason]),
  675. ok
  676. %% @todo
  677. % stream_reset(State, StreamID, {internal_error, {Class, Reason},
  678. % 'Exception occurred in StreamHandler:info/3 call.'})
  679. end;
  680. false ->
  681. error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]),
  682. State
  683. end.
  684. %% @todo commands/3
  685. %% @todo stream_reset
  686. %% Commands.
  687. commands(State, _, []) ->
  688. State;
  689. %% Supervise a child process.
  690. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
  691. commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail);
  692. %% Error handling.
  693. commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
  694. commands(stream_reset(State, StreamID, Error), StreamID, Tail);
  695. %% Commands for a stream currently inactive.
  696. commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
  697. when Current =/= StreamID ->
  698. %% @todo We still want to handle some commands...
  699. Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
  700. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  701. Stream#stream{queue=Queue ++ Commands}),
  702. State#state{streams=Streams};
  703. %% Read the request body.
  704. commands(State, StreamID, [{flow, _Length}|Tail]) ->
  705. %% @todo We only read from socket if buffer is empty, otherwise
  706. %% we decode the buffer.
  707. %% @todo Set the body reading length to min(Length, BodyLength)
  708. commands(State, StreamID, Tail);
  709. %% @todo Probably a good idea to have an atomic response send (single send call for resp+body).
  710. %% Send a full response.
  711. %%
  712. %% @todo Kill the stream if it sent a response when one has already been sent.
  713. %% @todo Keep IsFin in the state.
  714. %% @todo Same two things above apply to DATA, possibly promise too.
  715. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  716. [{response, StatusCode, Headers0, Body}|Tail]) ->
  717. %% @todo I'm pretty sure the last stream in the list is the one we want
  718. %% considering all others are queued.
  719. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  720. {State, Headers} = connection(State0, Headers0, StreamID, Version),
  721. %% @todo Ensure content-length is set.
  722. Response = cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers)),
  723. case Body of
  724. {sendfile, O, B, P} ->
  725. Transport:send(Socket, Response),
  726. commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]);
  727. _ ->
  728. Transport:send(Socket, [Response, Body]),
  729. %% @todo If max number of requests, close connection.
  730. %% @todo If IsFin, maybe skip body of current request.
  731. maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin)
  732. end;
  733. %% Send response headers and initiate chunked encoding.
  734. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  735. [{headers, StatusCode, Headers0}|Tail]) ->
  736. %% @todo Same as above.
  737. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  738. {State1, Headers1} = case Version of
  739. 'HTTP/1.1' ->
  740. {State0, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
  741. %% Close the connection after streaming the data to HTTP/1.0 client.
  742. %% @todo I'm guessing we need to differentiate responses with a content-length and others.
  743. 'HTTP/1.0' ->
  744. {State0#state{last_streamid=StreamID}, Headers0}
  745. end,
  746. {State, Headers} = connection(State1, Headers1, StreamID, Version),
  747. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers))),
  748. commands(State#state{out_state=chunked}, StreamID, Tail);
  749. %% Send a response body chunk.
  750. %%
  751. %% @todo WINDOW_UPDATE stuff require us to buffer some data.
  752. commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  753. [{data, IsFin, Data}|Tail]) ->
  754. %% @todo Same as above.
  755. case lists:keyfind(StreamID, #stream.id, Streams) of
  756. #stream{version='HTTP/1.1'} ->
  757. Size = iolist_size(Data),
  758. Transport:send(Socket, [integer_to_list(Size, 16), <<"\r\n">>, Data, <<"\r\n">>]);
  759. #stream{version='HTTP/1.0'} ->
  760. Transport:send(Socket, Data)
  761. end,
  762. maybe_terminate(State, StreamID, Tail, IsFin);
  763. %% Send a file.
  764. commands(State=#state{socket=Socket, transport=Transport}, StreamID,
  765. [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
  766. Transport:sendfile(Socket, Path, Offset, Bytes),
  767. maybe_terminate(State, StreamID, Tail, IsFin);
  768. %% Protocol takeover.
  769. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
  770. opts=Opts, children=Children}, StreamID,
  771. [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
  772. %% @todo This should be the last stream running otherwise we need to wait before switching.
  773. %% @todo If there's streams opened after this one, fail instead of 101.
  774. State = cancel_request_timeout(State0),
  775. %% @todo When we actually do the upgrade, we only have the one stream left, plus
  776. %% possibly some processes terminating. We need a smart strategy for handling the
  777. %% children shutdown. We can start with brutal_kill and discarding the EXIT messages
  778. %% received before switching to Websocket. Something better would be to let the
  779. %% stream processes finish but that implies the Websocket module to know about
  780. %% them and filter the messages. For now, kill them all and discard all messages
  781. %% in the mailbox.
  782. _ = [exit(Pid, kill) || {Pid, _, _} <- Children],
  783. flush(),
  784. %% Everything good, upgrade!
  785. _ = commands(State, StreamID, [{response, 101, Headers, <<>>}]),
  786. %% @todo This is no good because commands return a state normally and here it doesn't
  787. %% we need to let this module go entirely. Perhaps it should be handled directly in
  788. %% cowboy_clear/cowboy_tls? Perhaps not. We do want that Buffer.
  789. Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
  790. %% Stream shutdown.
  791. commands(State, StreamID, [stop|Tail]) ->
  792. %% @todo Do we want to run the commands after a stop?
  793. % commands(stream_terminate(State, StreamID, stop), StreamID, Tail).
  794. maybe_terminate(State, StreamID, Tail, fin).
  795. flush() ->
  796. receive _ -> flush() after 0 -> ok end.
  797. maybe_terminate(State, StreamID, Tail, nofin) ->
  798. commands(State, StreamID, Tail);
  799. %% @todo In these cases I'm not sure if we should continue processing commands.
  800. maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) ->
  801. terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
  802. maybe_terminate(State, StreamID, _Tail, fin) ->
  803. stream_terminate(State, StreamID, normal).
  804. stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID,
  805. StreamError={internal_error, _, _}) ->
  806. %% @todo headers
  807. %% @todo Don't send this if there are no streams left.
  808. Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
  809. {<<"content-length">>, <<"0">>}
  810. ])),
  811. %% @todo update IsFin local
  812. stream_terminate(State#state{out_state=done}, StreamID, StreamError).
  813. stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler,
  814. out_streamid=OutStreamID, out_state=OutState,
  815. streams=Streams0, children=Children0}, StreamID, Reason) ->
  816. {value, #stream{state=StreamState, version=Version}, Streams}
  817. = lists:keytake(StreamID, #stream.id, Streams0),
  818. _ = case OutState of
  819. wait ->
  820. Transport:send(Socket, cow_http:response(204, 'HTTP/1.1', []));
  821. chunked when Version =:= 'HTTP/1.1' ->
  822. Transport:send(Socket, <<"0\r\n\r\n">>);
  823. _ -> %% done or Version =:= 'HTTP/1.0'
  824. ok
  825. end,
  826. stream_call_terminate(StreamID, Reason, Handler, StreamState),
  827. %% @todo initiate children shutdown
  828. % Children = stream_terminate_children(Children0, StreamID, []),
  829. Children = [case C of
  830. {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown};
  831. _ -> C
  832. end || C <- Children0],
  833. %% @todo Skip the body, if any, or drop the connection if too large.
  834. %% @todo Only do this if Current =:= StreamID.
  835. NextOutStreamID = OutStreamID + 1,
  836. case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
  837. false ->
  838. %% @todo This is clearly wrong, if the stream is gone we need to check if
  839. %% there used to be such a stream, and if there was to send an error.
  840. State#state{out_streamid=NextOutStreamID, out_state=wait, streams=Streams, children=Children};
  841. #stream{queue=Commands} ->
  842. %% @todo Remove queue from the stream.
  843. commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
  844. streams=Streams, children=Children}, NextOutStreamID, Commands)
  845. end.
  846. %% @todo Taken directly from _http2
  847. stream_call_terminate(StreamID, Reason, Handler, StreamState) ->
  848. try
  849. Handler:terminate(StreamID, Reason, StreamState),
  850. ok
  851. catch Class:Reason ->
  852. error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.",
  853. [Handler, StreamID, Reason, StreamState, Class, Reason])
  854. end.
  855. %stream_terminate_children([], _, Acc) ->
  856. % Acc;
  857. %stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) ->
  858. % exit(Pid, kill),
  859. % stream_terminate_children(Tail, StreamID, Acc);
  860. %stream_terminate_children([Child|Tail], StreamID, Acc) ->
  861. % stream_terminate_children(Tail, StreamID, [Child|Acc]).
  862. %% @todo max_reqs also
  863. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
  864. Conns = cow_http_hd:parse_connection(Conn),
  865. case lists:member(<<"keep-alive">>, Conns) of
  866. true -> keepalive;
  867. false -> close
  868. end;
  869. maybe_req_close(_, _, 'HTTP/1.0') ->
  870. close;
  871. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
  872. case connection_hd_is_close(Conn) of
  873. true -> close;
  874. false -> keepalive
  875. end;
  876. maybe_req_close(_State, _, _) ->
  877. keepalive.
  878. connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  879. case connection_hd_is_close(Conn) of
  880. true -> {State, Headers};
  881. %% @todo Here we need to remove keep-alive and add close, not just add close.
  882. false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
  883. end;
  884. connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
  885. {State, Headers#{<<"connection">> => <<"close">>}};
  886. connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  887. case connection_hd_is_close(Conn) of
  888. true -> {State#state{last_streamid=StreamID}, Headers};
  889. %% @todo Here we need to set keep-alive only if it wasn't set before.
  890. false -> {State, Headers}
  891. end;
  892. connection(State, Headers, _, 'HTTP/1.0') ->
  893. {State, Headers#{<<"connection">> => <<"keep-alive">>}};
  894. connection(State, Headers, _, _) ->
  895. {State, Headers}.
  896. connection_hd_is_close(Conn) ->
  897. Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
  898. lists:member(<<"close">>, Conns).
  899. error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Reason) ->
  900. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', [
  901. {<<"content-length">>, <<"0">>}
  902. ])),
  903. terminate(State, Reason).
  904. terminate(_State, _Reason) ->
  905. exit(normal). %% @todo
  906. %% System callbacks.
  907. -spec system_continue(_, _, #state{}) -> ok.
  908. system_continue(_, _, {State, Buffer}) ->
  909. loop(State, Buffer).
  910. -spec system_terminate(any(), _, _, _) -> no_return().
  911. system_terminate(Reason, _, _, _) ->
  912. exit(Reason).
  913. -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
  914. system_code_change(Misc, _, _, _) ->
  915. {ok, Misc}.