cowboy_http.erl 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http).
  15. -export([init/5]).
  16. -export([system_continue/3]).
  17. -export([system_terminate/4]).
  18. -export([system_code_change/4]).
  %% Protocol options for an HTTP/1.1 connection.
  %%
  %% This module reads every option with maps:get/3, so the options are a
  %% map in which all keys are optional. request_timeout and
  %% max_method_length are listed because the code below reads them.
  -type opts() :: #{
      compress => boolean(),
      env => cowboy_middleware:env(),
      max_empty_lines => non_neg_integer(),
      max_header_name_length => non_neg_integer(),
      max_header_value_length => non_neg_integer(),
      max_headers => non_neg_integer(),
      max_keepalive => non_neg_integer(),
      max_method_length => non_neg_integer(),
      max_request_line_length => non_neg_integer(),
      middlewares => [module()],
      onresponse => cowboy:onresponse_fun(),
      %% Passed to erlang:start_timer/3, hence no 'infinity'.
      request_timeout => non_neg_integer(),
      timeout => timeout()
  }.
  -export_type([opts/0]).
  %% Parser state used while the request-line has not been fully received.
  -record(ps_request_line, {
      %% Count of empty lines already skipped ahead of the request-line.
      empty_lines = 0 :: non_neg_integer()
  }).
  %% Parser state used while the header section is being read.
  %%
  %% Every field defaults to 'undefined', so 'undefined' is part of each
  %% field type (Dialyzer no longer adds it implicitly).
  -record(ps_header, {
      %% Components of the request-line that was just parsed.
      method = undefined :: undefined | binary(),
      path = undefined :: undefined | binary(),
      qs = undefined :: undefined | binary(),
      version = undefined :: undefined | cowboy:http_version(),
      %% Headers parsed so far; only stored here while waiting for more data.
      headers = undefined :: map() | undefined, %% @todo better type than map()
      %% Name of the header whose value is still incomplete, if any.
      name = undefined :: binary() | undefined
  }).
  %% @todo Do we need a state where we wait for the stream process to ask
  %% for the body?
  %%
  %% In HTTP/2 we start receiving data before the body is asked for (this is
  %% optional, but it is the default), so HTTP/1.1 needs to be able to do the
  %% same. That means that when we receive data, up to a certain limit, we
  %% read from the socket and decode. Once the limit is reached we stop
  %% reading from the socket until the stream process asks for more or the
  %% stream ends.
  %%
  %% Consequently a buffer must be kept in the stream handler until the stream
  %% process asks for it, and the body state must say how much is left to
  %% read so that reading from the socket can be stopped and resumed.
  -record(ps_body, {
      %% @todo flow
      %% Function used to decode the transfer-encoding of the body.
      transfer_decode_fun :: fun(), %% @todo better type
      %% State threaded through successive calls to the decode function.
      transfer_decode_state :: any() %% @todo better type
  }).
  %% One HTTP/1.1 stream (request/response exchange) on the connection.
  %%
  %% Fields that default to 'undefined' carry 'undefined' in their type,
  %% since Dialyzer does not add it implicitly.
  -record(stream, {
      id = undefined :: undefined | cowboy_stream:streamid(),
      %% Stream handlers and their state.
      state = undefined :: undefined | {module(), any()},
      %% Client HTTP version for this stream.
      version = undefined :: undefined | cowboy:http_version(),
      %% Commands queued.
      queue = [] :: cowboy_stream:commands()
  }).
  -type stream() :: #stream{}.
  %% Connection-wide state carried through the receive loop.
  %%
  %% Fields that default to 'undefined' carry 'undefined' in their type,
  %% since Dialyzer does not add it implicitly.
  -record(state, {
      parent :: pid(),
      ref :: ranch:ref(),
      socket :: inet:socket(),
      transport :: module(),
      opts = #{} :: map(),
      %% Remote address and port for the connection.
      peer = undefined :: undefined | {inet:ip_address(), inet:port_number()},
      %% Reference of the running request timer, if one is set.
      timer = undefined :: undefined | reference(),
      %% Identifier for the stream currently being read (or waiting to be received).
      in_streamid = 1 :: pos_integer(),
      %% Parsing state for the current stream or stream-to-be.
      in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
      %% Identifier for the stream currently being written.
      %% Note that out_streamid =< in_streamid.
      out_streamid = 1 :: pos_integer(),
      %% Whether we finished writing data for the current stream.
      out_state = wait :: wait | headers | chunked | done,
      %% The connection will be closed after this stream.
      last_streamid = undefined :: undefined | pos_integer(),
      %% Currently active HTTP/1.1 streams.
      streams = [] :: [stream()],
      %% Children which are in the process of shutting down.
      children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}]
  }).
  93. -include_lib("cowlib/include/cow_inline.hrl").
  94. -include_lib("cowlib/include/cow_parse.hrl").
  %% Entry point of the connection process.
  %%
  %% Looks up the peer address, builds the initial state (the max_keepalive
  %% option, 100 by default, becomes the last stream id), starts the request
  %% timer and enters the receive loop with an empty buffer. When the peer
  %% address cannot be obtained the connection is terminated right away.
  -spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
  init(Parent, Ref, Socket, Transport, Opts) ->
      case Transport:peername(Socket) of
          {error, Reason} ->
              %% Couldn't read the peer address; connection is gone.
              terminate(undefined,
                  {socket_error, Reason, 'An error has occurred on the socket.'});
          {ok, Peer} ->
              MaxKeepalive = maps:get(max_keepalive, Opts, 100),
              InitialState = #state{
                  parent=Parent, ref=Ref, socket=Socket,
                  transport=Transport, opts=Opts,
                  peer=Peer, last_streamid=MaxKeepalive},
              before_loop(set_request_timeout(InitialState), <<>>)
      end.
  108. %% @todo Send a response depending on in_state and whether one was already sent.
  109. %% @todo
  110. %% Timeouts:
  111. %% - waiting for new request (if no stream is currently running)
  112. %% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state
  113. %% - waiting for new request, or body (when a stream is currently running)
  114. %% -> idle_timeout: amount of time we wait without receiving any data
  115. %% - if we skip the body, skip only for a specific duration
  116. %% -> skip_body_timeout: also have a skip_body_length
  117. %% - global
  118. %% -> inactivity_timeout: max time to wait without anything happening before giving up
  %% Ask the transport for exactly one more socket message, then wait for it
  %% in loop/2.
  %%
  %% @todo Disable this when we get to the body, until the stream asks for it?
  %% Perhaps have a threshold for how much we're willing to read before waiting.
  before_loop(State, Buffer) ->
      #state{socket=Socket, transport=Transport} = State,
      Transport:setopts(Socket, [{active, once}]),
      loop(State, Buffer).
  %% Main receive loop of the connection process.
  %%
  %% Buffer holds the bytes received but not yet consumed by the parser.
  %% The clause order matters: more specific messages are listed before the
  %% generic ones that would otherwise swallow them.
  loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
          timer=TimerRef, children=Children}, Buffer) ->
      {DataTag, ClosedTag, ErrorTag} = Transport:messages(),
      receive
          %% Socket messages: new data is appended to the buffer and parsed.
          {DataTag, Socket, Data} ->
              parse(<< Buffer/binary, Data/binary >>, State);
          {ClosedTag, Socket} ->
              terminate(State,
                  {socket_error, closed, 'The socket has been closed.'});
          {ErrorTag, Socket, SocketReason} ->
              terminate(State,
                  {socket_error, SocketReason, 'An error has occurred on the socket.'});
          %% Timeouts: only the timer currently stored in the state is acted
          %% upon; messages from any other timer are dropped.
          {timeout, TimerRef, TimeoutReason} ->
              timeout(State, TimeoutReason);
          {timeout, _, _} ->
              loop(State, Buffer);
          %% System messages.
          {'EXIT', Parent, ExitReason} ->
              exit(ExitReason);
          {system, From, Request} ->
              sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                  {State, Buffer});
          %% Messages pertaining to a stream of this connection.
          {{Pid, StreamID}, StreamMsg} when Pid =:= self() ->
              loop(info(State, StreamID, StreamMsg), Buffer);
          %% Exit signal from children.
          ExitMsg = {'EXIT', ChildPid, _} ->
              loop(down(State, ChildPid, ExitMsg), Buffer);
          %% Calls from supervisor module.
          {'$gen_call', {Caller, Tag}, which_children} ->
              Caller ! {Tag, [{?MODULE, ChildPid, worker, [?MODULE]}
                  || {ChildPid, _, _} <- Children]},
              loop(State, Buffer);
          {'$gen_call', {Caller, Tag}, count_children} ->
              Count = length(Children),
              Caller ! {Tag, [{specs, 1}, {active, Count},
                  {supervisors, 0}, {workers, Count}]},
              loop(State, Buffer);
          {'$gen_call', {Caller, Tag}, _} ->
              Caller ! {Tag, {error, ?MODULE}},
              loop(State, Buffer);
          %% Unknown messages are logged and discarded.
          Stray ->
              error_logger:error_msg("Received stray message ~p.~n", [Stray]),
              loop(State, Buffer)
      %% @todo Configurable timeout. This should be a global inactivity timeout
      %% that triggers when really nothing happens (ie something went really wrong).
      after 300000 ->
          terminate(State, {internal_error, timeout,
              'No message or data received before timeout.'})
      end.
  %% (Re)start the request timer: any running timer is cancelled first, then a
  %% new one is started for request_timeout milliseconds (5000 by default).
  %% The timer delivers {timeout, Ref, request_timeout} to this process.
  set_request_timeout(State) ->
      Cancelled = #state{opts=Opts} = cancel_request_timeout(State),
      Delay = maps:get(request_timeout, Opts, 5000),
      Cancelled#state{timer=erlang:start_timer(Delay, self(), request_timeout)}.
  %% Cancel the request timer, if any, and clear it from the state.
  %% The cancellation is asynchronous and asks for no result message, in
  %% which case erlang:cancel_timer/2 returns ok.
  cancel_request_timeout(State=#state{timer=undefined}) ->
      State;
  cancel_request_timeout(State=#state{timer=TimerRef}) ->
      ok = erlang:cancel_timer(TimerRef, [{async, true}, {info, false}]),
      State#state{timer=undefined}.
  %% Handle the expiry of the request timer.
  %%
  %% While still waiting for a request-line the connection is simply
  %% terminated; once headers are being read a 408 response is sent first.
  %%
  %% @todo Honestly it would be much better if we didn't enable pipelining yet.
  -spec timeout(_, _) -> no_return().
  timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
      %% @todo If other streams are running, just set the connection to be closed
      %% and stop trying to read from the socket?
      Reason = {connection_error, timeout, 'No request-line received before timeout.'},
      terminate(State, Reason);
  timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
      %% @todo If other streams are running, maybe wait for their reply before sending 408?
      %% -> Definitely. Either way, stop reading from the socket and make that stream the last.
      #state{socket=Socket, transport=Transport} = State,
      Response = cow_http:response(408, 'HTTP/1.1', []),
      Transport:send(Socket, Response),
      Reason = {connection_error, timeout, 'Request headers not received before timeout.'},
      terminate(State, Reason).
  196. %% Request-line.
  %% Resume parsing Buffer according to the parser state stored in the
  %% connection state, then hand the parser's result to after_parse/1.
  %% An empty buffer goes straight back to waiting for data.
  parse(<<>>, State) ->
      before_loop(State, <<>>);
  parse(Buffer, State=#state{in_state=InState}) ->
      Result = case InState of
          #ps_request_line{empty_lines=EmptyLines} ->
              parse_request(Buffer, State, EmptyLines);
          %% Waiting for the start of a header line: the headers collected so
          %% far are taken out of the state and passed as an argument.
          #ps_header{headers=Headers, name=undefined} ->
              Cleared = InState#ps_header{headers=undefined},
              parse_header(Buffer, State#state{in_state=Cleared}, Headers);
          %% Waiting for the remainder of the value of header Name.
          #ps_header{headers=Headers, name=Name} ->
              Cleared = InState#ps_header{headers=undefined, name=undefined},
              parse_hd_before_value(Buffer, State#state{in_state=Cleared},
                  Headers, Name);
          %% @todo We do not want to get the body automatically if the request
          %% doesn't ask for it. We may want to get bodies that are below a
          %% threshold without waiting, and buffer them until the request
          %% asks, though.
          #ps_body{} ->
              parse_body(Buffer, State)
      end,
      after_parse(Result).
  214. %% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
  %% Act on the outcome of a parsing step.
  %%
  %% {request, ...}: a full request head was parsed; the stream is
  %%   initialised, recorded, and the commands it returned are executed.
  %% {data, ...}: body data was decoded; it is passed to the stream handler
  %%   when it belongs to the most recently created stream, dropped otherwise.
  %% {more, ...}: more data is needed; go back to waiting on the socket.
  after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version},
          State0=#state{opts=Opts, streams=Streams0}, Buffer}) ->
      try cowboy_stream:init(StreamID, Req, Opts) of
          {Commands, StreamState} ->
              NewStream = #stream{id=StreamID, state=StreamState, version=Version},
              State1 = State0#state{streams=[NewStream|Streams0]},
              %% When the request asks for the connection to be closed, this
              %% stream becomes the last one of the connection.
              State = case maybe_req_close(State0, Headers, Version) of
                  close -> State1#state{last_streamid=StreamID};
                  keepalive -> State1
              end,
              parse(Buffer, commands(State, StreamID, Commands))
      catch Class:Reason ->
          error_logger:error_msg("Exception occurred in "
              "cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.",
              [StreamID, Req, Opts, Class, Reason]),
          %% @todo Send a proper response with a status code and reset the
          %% stream (a stream_reset with an internal_error reason was planned).
          %% Note that terminate must NOT be called.
          ok
      end;
  %% Streams are sequential so the body is always about the last stream created
  %% unless that stream has terminated.
  after_parse({data, StreamID, IsFin, Data, State=#state{
          streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
      try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
          {Commands, StreamState} ->
              Updated = Stream#stream{state=StreamState},
              Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Updated),
              parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
      catch Class:Reason ->
          error_logger:error_msg("Exception occurred in "
              "cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
              [StreamID, IsFin, Data, StreamState0, Class, Reason]),
          %% @todo Bad value returned here. Crashes. The stream should be
          %% reset instead (a stream_reset with an internal_error reason was
          %% planned).
          ok
      end;
  %% No corresponding stream, skip.
  after_parse({data, _, _, _, State, Buffer}) ->
      before_loop(State, Buffer);
  after_parse({more, State, Buffer}) ->
      before_loop(State, Buffer).
  %% Request-line.
  %%
  %% Parse the beginning of a request. EmptyLines is the number of empty
  %% lines already skipped in front of the request-line.
  %%
  %% Returns {more, State, Buffer} when the line is not complete yet;
  %% otherwise hands over to the method, version or HTTP/2 upgrade code.
  -spec parse_request(Buffer, State, non_neg_integer())
      -> {request, cowboy_req:req(), State, Buffer}
      | {data, cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State, Buffer}
      | {more, State, Buffer}
      when Buffer::binary(), State::#state{}.
  %% Empty lines must be using \r\n.
  parse_request(<< $\n, _/bits >>, State, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'Empty lines between requests must use the CRLF line terminator. (RFC7230 3.5)'});
  parse_request(<< $\s, _/bits >>, State, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-line must not begin with a space. (RFC7230 3.1.1, RFC7230 3.5)'});
  %% A real empty line. Only CRLF qualifies: a two-byte line such as "a\n"
  %% must not be discarded silently, it goes through the normal request-line
  %% parsing below and is rejected there.
  parse_request(<< $\r, $\n, Rest/bits >>, State=#state{opts=Opts}, EmptyLines) ->
      case maps:get(max_empty_lines, Opts, 5) of
          MaxEmptyLines when EmptyLines =:= MaxEmptyLines ->
              error_terminate(400, State, {connection_error, limit_reached,
                  'More empty lines were received than configuration allows. (RFC7230 3.5)'});
          _ ->
              parse_request(Rest, State, EmptyLines + 1)
      end;
  %% We limit the length of the Request-line to MaxLength to avoid endlessly
  %% reading from the socket and eventually crashing.
  parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
      MaxLength = maps:get(max_request_line_length, Opts, 8000),
      case match_eol(Buffer, 0) of
          nomatch when byte_size(Buffer) > MaxLength ->
              error_terminate(414, State, {connection_error, limit_reached,
                  'The request-line length is larger than configuration allows. (RFC7230 3.1.1)'});
          nomatch ->
              %% Remember how many empty lines were skipped for the next call.
              {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
          _ ->
              case Buffer of
                  %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
                  << "OPTIONS * ", Rest/bits >> ->
                      parse_version(Rest, State, <<"OPTIONS">>, <<"*">>, <<>>);
                  %% @todo CONNECT requests (authority-form request-target).
                  %% Accept direct HTTP/2 only at the beginning of the connection.
                  << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
                      %% @todo Might be worth throwing to get a clean stacktrace.
                      http2_upgrade(State, Buffer);
                  _ ->
                      parse_method(Buffer, State, <<>>,
                          maps:get(max_method_length, Opts, 32))
              end
      end.
  %% Return Pos plus the offset of the first LF in Buffer, or 'nomatch' when
  %% Buffer contains no LF.
  match_eol(Buffer, Pos) ->
      case Buffer of
          << $\n, _/bits >> -> Pos;
          << _, Tail/bits >> -> match_eol(Tail, Pos + 1);
          _ -> nomatch
      end.
  %% Accumulate the method name one byte at a time until the space that ends
  %% it. Remaining is the number of bytes the method may still grow by; when
  %% it reaches 0 the request is rejected with a 501.
  parse_method(_, State, _, 0) ->
      error_terminate(501, State, {connection_error, limit_reached,
          'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
  parse_method(<< $\r, _/bits >>, State, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The method name must not be followed with a line break. (RFC7230 3.1.1)'});
  parse_method(<< $\s, Rest/bits >>, State, Method, _) ->
      parse_uri(Rest, State, Method);
  parse_method(<< C, Rest/bits >>, State, Method, Remaining) when ?IS_TOKEN(C) ->
      parse_method(Rest, State, << Method/binary, C >>, Remaining - 1);
  parse_method(<< _, _/bits >>, State, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The method name must contain only valid token characters. (RFC7230 3.1.1)'}).
  %% Parse the start of the request-target.
  %%
  %% Absolute-form targets ("http://" or "https://", in any letter case) have
  %% their authority skipped; origin-form targets start with a slash.
  %% Anything else is rejected with a 400.
  %%
  %% In a guard ',' means AND while ';' means OR, so every letter test is
  %% joined with ','. Each 't' is bound to its own variable so that mixed
  %% case such as "hTtp" is accepted as well.
  parse_uri(<< H, T1, T2, P, "://", Rest/bits >>, State, Method)
          when (H =:= $h orelse H =:= $H),
              (T1 =:= $t orelse T1 =:= $T),
              (T2 =:= $t orelse T2 =:= $T),
              (P =:= $p orelse P =:= $P) ->
      parse_uri_skip_host(Rest, State, Method);
  parse_uri(<< H, T1, T2, P, S, "://", Rest/bits >>, State, Method)
          when (H =:= $h orelse H =:= $H),
              (T1 =:= $t orelse T1 =:= $T),
              (T2 =:= $t orelse T2 =:= $T),
              (P =:= $p orelse P =:= $P),
              (S =:= $s orelse S =:= $S) ->
      parse_uri_skip_host(Rest, State, Method);
  parse_uri(<< $/, Rest/bits >>, State, Method) ->
      parse_uri_path(Rest, State, Method, << $/ >>);
  parse_uri(_, State, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
  %% Skip the authority of an absolute-form request-target. The path defaults
  %% to "/" when the authority is directly followed by a space, a query or a
  %% fragment.
  parse_uri_skip_host(<< $\r, _/bits >>, State, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  parse_uri_skip_host(<< $/, Rest/bits >>, State, Method) ->
      parse_uri_path(Rest, State, Method, <<"/">>);
  parse_uri_skip_host(<< $\s, Rest/bits >>, State, Method) ->
      parse_version(Rest, State, Method, <<"/">>, <<>>);
  parse_uri_skip_host(<< $?, Rest/bits >>, State, Method) ->
      parse_uri_query(Rest, State, Method, <<"/">>, <<>>);
  parse_uri_skip_host(<< $#, Rest/bits >>, State, Method) ->
      skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>);
  parse_uri_skip_host(<< _, Rest/bits >>, State, Method) ->
      parse_uri_skip_host(Rest, State, Method).
  %% Accumulate the path until a space (end of target), '?' (query) or
  %% '#' (fragment) is met.
  parse_uri_path(<< $\r, _/bits >>, State, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  parse_uri_path(<< $\s, Rest/bits >>, State, Method, Path) ->
      parse_version(Rest, State, Method, Path, <<>>);
  parse_uri_path(<< $?, Rest/bits >>, State, Method, Path) ->
      parse_uri_query(Rest, State, Method, Path, <<>>);
  parse_uri_path(<< $#, Rest/bits >>, State, Method, Path) ->
      skip_uri_fragment(Rest, State, Method, Path, <<>>);
  parse_uri_path(<< C, Rest/bits >>, State, Method, Path) ->
      parse_uri_path(Rest, State, Method, << Path/binary, C >>).
  %% Accumulate the query string until a space (end of target) or
  %% '#' (fragment) is met.
  parse_uri_query(<< $\r, _/bits >>, State, _, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  parse_uri_query(<< $\s, Rest/bits >>, State, Method, Path, Query) ->
      parse_version(Rest, State, Method, Path, Query);
  parse_uri_query(<< $#, Rest/bits >>, State, Method, Path, Query) ->
      skip_uri_fragment(Rest, State, Method, Path, Query);
  parse_uri_query(<< C, Rest/bits >>, State, Method, Path, Query) ->
      parse_uri_query(Rest, State, Method, Path, << Query/binary, C >>).
  %% Discard the fragment of the request-target, up to the space that ends
  %% the target.
  skip_uri_fragment(<< $\r, _/bits >>, State, _, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  skip_uri_fragment(<< $\s, Rest/bits >>, State, Method, Path, Query) ->
      parse_version(Rest, State, Method, Path, Query);
  skip_uri_fragment(<< _, Rest/bits >>, State, Method, Path, Query) ->
      skip_uri_fragment(Rest, State, Method, Path, Query).
  %% Parse the HTTP version that ends the request-line. Only HTTP/1.1 and
  %% HTTP/1.0 immediately followed by CRLF are accepted; stray whitespace
  %% yields a 400 and anything else a 505.
  parse_version(Buffer, State, Method, Path, Query) ->
      case Buffer of
          << "HTTP/1.1\r\n", Rest/bits >> ->
              parse_headers(Rest, State, Method, Path, Query, 'HTTP/1.1');
          << "HTTP/1.0\r\n", Rest/bits >> ->
              parse_headers(Rest, State, Method, Path, Query, 'HTTP/1.0');
          << "HTTP/1.", _, After, _/bits >> when After =:= $\s; After =:= $\t ->
              error_terminate(400, State, {connection_error, protocol_error,
                  'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
          << First, _/bits >> when First =:= $\s; First =:= $\t ->
              error_terminate(400, State, {connection_error, protocol_error,
                  'The separator between request target and version must be a single SP. (RFC7230 3.1.1)'});
          _ ->
              error_terminate(505, State, {connection_error, protocol_error,
                  'Unsupported HTTP version. (RFC7230 2.6)'})
      end.
  %% Switch to the header parsing state, recording what the request-line
  %% provided, and start parsing headers with an empty header map.
  parse_headers(Rest, State, Method, Path, Query, Version) ->
      HeaderState = #ps_header{method=Method, path=Path, qs=Query, version=Version},
      parse_header(Rest, State#state{in_state=HeaderState}, #{}).
  %% Headers.
  %%
  %% Parse one header line, or detect the empty line ending the headers.
  %% Headers is the map of headers parsed so far.
  %%
  %% Both limits are enforced whether or not the colon is already in the
  %% buffer; otherwise a request received in a single packet would never be
  %% checked against max_headers or max_header_name_length.
  %%
  %% We need two or more bytes in the buffer to continue.
  parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
      {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
  parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
      request(Rest, S, Headers);
  parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
      MaxLength = maps:get(max_header_name_length, Opts, 64),
      MaxHeaders = maps:get(max_headers, Opts, 100),
      case match_colon(Buffer, 0) of
          %% Another header line is starting although the limit is reached.
          _ when map_size(Headers) >= MaxHeaders ->
              error_terminate(431, State, {connection_error, limit_reached,
                  'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
          nomatch when byte_size(Buffer) > MaxLength ->
              error_terminate(431, State, {connection_error, limit_reached,
                  'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
          nomatch ->
              {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
          %% The offset of the colon is the length of the header name.
          NameLength when NameLength > MaxLength ->
              error_terminate(431, State, {connection_error, limit_reached,
                  'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
          _ ->
              parse_hd_name(Buffer, State, Headers, <<>>)
      end.
  %% Return Pos plus the offset of the first colon in Buffer, or 'nomatch'
  %% when Buffer contains no colon.
  match_colon(Buffer, Pos) ->
      case Buffer of
          << $:, _/bits >> -> Pos;
          << _, Tail/bits >> -> match_colon(Tail, Pos + 1);
          _ -> nomatch
      end.
  %% Accumulate the header name up to the colon. Whitespace before any name
  %% byte is an error; whitespace after the name is handed to
  %% parse_hd_name_ws/4.
  parse_hd_name(<< $:, Rest/bits >>, State, Headers, Name) ->
      parse_hd_before_value(Rest, State, Headers, Name);
  parse_hd_name(<< C, Rest/bits >>, State, Headers, Name) when ?IS_WS(C) ->
      case Name of
          <<>> ->
              error_terminate(400, State, {connection_error, protocol_error,
                  'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2)'});
          _ ->
              parse_hd_name_ws(Rest, State, Headers, Name)
      end;
  %% NOTE(review): ?LOWER comes from cow_inline.hrl and presumably reads the
  %% variable C to append its lowercased form to Name before recursing, so C
  %% must keep this exact name - confirm against the header.
  parse_hd_name(<< C, Rest/bits >>, State, Headers, Name) ->
      ?LOWER(parse_hd_name, Rest, State, Headers, Name).
  %% Skip the whitespace found after a header name, up to the colon.
  %%
  %% Anything other than whitespace or the colon (for example "foo bar: x")
  %% comes from the client and must be answered with a 400 rather than crash
  %% the connection process with a case_clause error.
  parse_hd_name_ws(<< C, Rest/bits >>, S, H, Name) ->
      case C of
          $\s -> parse_hd_name_ws(Rest, S, H, Name);
          $\t -> parse_hd_name_ws(Rest, S, H, Name);
          $: -> parse_hd_before_value(Rest, S, H, Name);
          _ -> error_terminate(400, S, {connection_error, protocol_error,
              'A header name must not contain whitespace. (RFC7230 3.2)'})
      end.
  %% Skip the whitespace in front of a header value, then make sure the whole
  %% line is available before parsing the value. While the line is incomplete
  %% the headers and the pending header name are saved in the parser state.
  parse_hd_before_value(<< C, Rest/bits >>, S, H, N) when C =:= $\s; C =:= $\t ->
      parse_hd_before_value(Rest, S, H, N);
  parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
      Limit = maps:get(max_header_value_length, Opts, 4096),
      case match_eol(Buffer, 0) of
          nomatch when byte_size(Buffer) > Limit ->
              error_terminate(431, State, {connection_error, limit_reached,
                  'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
          nomatch ->
              Pending = PS#ps_header{headers=H, name=N},
              {more, State#state{in_state=Pending}, Buffer};
          _ ->
              parse_hd_value(Buffer, State, H, N, <<>>)
      end.
  %% Accumulate a header value up to CRLF, strip its trailing whitespace and
  %% store it. A header seen more than once has its values joined with ", ",
  %% except for cookie which is joined with "; ".
  parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers0, Name, SoFar) ->
      Value = clean_value_ws_end(SoFar, byte_size(SoFar) - 1),
      Headers = case maps:get(Name, Headers0, undefined) of
          undefined ->
              Headers0#{Name => Value};
          Previous ->
              %% The cookie header does not use proper HTTP header lists.
              Separator = case Name of
                  <<"cookie">> -> <<"; ">>;
                  _ -> <<", ">>
              end,
              Headers0#{Name => << Previous/binary, Separator/binary, Value/binary >>}
      end,
      parse_header(Rest, S, Headers);
  parse_hd_value(<< Byte, Rest/bits >>, S, H, N, SoFar) ->
      parse_hd_value(Rest, S, H, N, << SoFar/binary, Byte >>).
  %% Strip trailing spaces and tabs from Value. N is the index of the last
  %% byte to consider; callers start with byte_size(Value) - 1. Reaching -1
  %% means the value was empty or made only of whitespace.
  clean_value_ws_end(_, -1) ->
      <<>>;
  clean_value_ws_end(Value, N) ->
      case binary:at(Value, N) of
          Byte when Byte =:= $\s; Byte =:= $\t ->
              clean_value_ws_end(Value, N - 1);
          _ ->
              %% Keep everything up to and including byte N.
              binary:part(Value, 0, N + 1)
      end.
  -ifdef(TEST).
  %% EUnit generator: each case pairs an input with the value expected once
  %% trailing whitespace is stripped. The match inside the fun is the
  %% assertion.
  clean_value_ws_end_test_() ->
      Cases = [
          {<<>>, <<>>},
          {<<" ">>, <<>>},
          {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
              "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
              <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                  "text/html;level=2;q=0.4, */*;q=0.5">>}
      ],
      [{Input, fun() ->
          Expected = clean_value_ws_end(Input, byte_size(Input) - 1)
      end} || {Input, Expected} <- Cases].

  %% Benchmark entry point for the horse tool.
  horse_clean_value_ws_end() ->
      horse:repeat(200000,
          clean_value_ws_end(
              <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                  "text/html;level=2;q=0.4, */*;q=0.5 ">>,
              byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                  "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
      ).
  -endif.
  %% All headers were received: work out the host and port of the request.
  %%
  %% HTTP/1.1 requests must carry a host header; HTTP/1.0 requests without
  %% one get an empty host. When no port is given the default port of the
  %% transport is used.
  request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
          in_state=#ps_header{version=Version}}, Headers) ->
      case maps:get(<<"host">>, Headers, undefined) of
          undefined when Version =:= 'HTTP/1.1' ->
              %% @todo Might want to not close the connection on this and next one.
              error_terminate(400, State, {stream_error, StreamID, protocol_error,
                  'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
          undefined ->
              request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
          RawHost ->
              try cow_http_hd:parse_host(RawHost) of
                  {Host, MaybePort} ->
                      Port = case MaybePort of
                          undefined -> default_port(Transport:secure());
                          _ -> MaybePort
                      end,
                      request(Buffer, State, Headers, Host, Port)
              catch _:_ ->
                  error_terminate(400, State, {stream_error, StreamID, protocol_error,
                      'The host header is invalid. (RFC7230 5.4)'})
              end
      end.
  %% Default port for the connection: 443 when the transport is secure,
  %% 80 otherwise.
  -spec default_port(boolean()) -> 80 | 443.
  default_port(Secure) ->
      case Secure of
          true -> 443;
          _ -> 80
      end.
  %% End of request parsing.
  %%
  %% Build the request map handed to the stream handlers and decide how the
  %% body, if any, will be decoded. Returns {request, Req, State, Buffer}, or
  %% hands the connection over to HTTP/2 when the request is a valid h2c
  %% upgrade.
  request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, in_streamid=StreamID,
          in_state=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
          Headers, Host, Port) ->
      Scheme = case Transport:secure() of
          true -> <<"https">>;
          false -> <<"http">>
      end,
      {HasBody, BodyLength, TDecodeFun, TDecodeState, BodyHeaders} = case Headers of
          %% Transfer-Encoding overrides Content-Length when both are present
          %% (RFC7230 3.3.3); honouring Content-Length instead would let the
          %% two ends of a proxy chain disagree on where the body stops.
          %% The now meaningless content-length is dropped from the headers.
          %% @todo Better handling of transfer decoding.
          #{<<"transfer-encoding">> := <<"chunked">>} ->
              {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0},
                  maps:remove(<<"content-length">>, Headers)};
          #{<<"content-length">> := <<"0">>} ->
              {false, 0, undefined, undefined, Headers};
          #{<<"content-length">> := BinLength} ->
              Length = try
                  cow_http_hd:parse_content_length(BinLength)
              catch _:_ ->
                  %% NOTE(review): error_terminate/3 is defined outside this
                  %% view and is expected not to return - confirm.
                  error_terminate(400, State0, {stream_error, StreamID, protocol_error,
                      'The content-length header is invalid. (RFC7230 3.3.2)'})
              end,
              {true, Length, fun cow_http_te:stream_identity/2, {0, Length}, Headers};
          _ ->
              {false, 0, undefined, undefined, Headers}
      end,
      Req = #{
          ref => Ref,
          pid => self(),
          streamid => StreamID,
          peer => Peer,
          method => Method,
          scheme => Scheme,
          host => Host,
          port => Port,
          %% @todo The path component needs to be normalized.
          path => Path,
          qs => Qs,
          version => Version,
          %% We are transparently taking care of transfer-encodings so
          %% the user code has no need to know about it.
          headers => maps:remove(<<"transfer-encoding">>, BodyHeaders),
          has_body => HasBody,
          body_length => BodyLength
      },
      case is_http2_upgrade(Headers, Version) of
          false ->
              State = case HasBody of
                  true ->
                      %% The request head is complete: stop the request timer
                      %% and switch to reading the body.
                      cancel_request_timeout(State0#state{in_state=#ps_body{
                          %% @todo Don't need length anymore?
                          transfer_decode_fun = TDecodeFun,
                          transfer_decode_state = TDecodeState
                      }});
                  false ->
                      %% No body: the next bytes belong to the next request.
                      set_request_timeout(State0#state{
                          in_streamid=StreamID + 1, in_state=#ps_request_line{}})
              end,
              {request, Req, State, Buffer};
          {true, HTTP2Settings} ->
              http2_upgrade(State0, Buffer, HTTP2Settings, Req)
      end.
  %% HTTP/2 upgrade.
  %%
  %% Tell whether an HTTP/1.1 request asks for an upgrade to cleartext HTTP/2.
  %% This requires connection, upgrade and http2-settings headers, with the
  %% connection header listing both "upgrade" and "http2-settings" and the
  %% upgrade header listing "h2c". Returns {true, HTTP2Settings} with the raw
  %% http2-settings value, or false.
  is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
          <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
      Conns = cow_http_hd:parse_connection(Conn),
      HasTokens = lists:member(<<"upgrade">>, Conns)
          andalso lists:member(<<"http2-settings">>, Conns),
      %% The upgrade header is only parsed once the connection header qualifies.
      case HasTokens andalso
              lists:member(<<"h2c">>, cow_http_hd:parse_upgrade(Upgrade)) of
          true -> {true, HTTP2Settings};
          false -> false
      end;
  is_http2_upgrade(_, _) ->
      false.
  587. %% Upgrade through an HTTP/1.1 request.
  %% Prior knowledge upgrade, without an HTTP/1.1 request.
  %%
  %% On a transport that is not secure the request timeout is cancelled and
  %% the connection, along with the bytes already buffered, is handed over to
  %% cowboy_http2:init/7. On a secure transport the connection is terminated
  %% with a 400 response.
  http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
          opts=Opts, peer=Peer}, Buffer) ->
      case Transport:secure() of
          true ->
              error_terminate(400, State, {connection_error, protocol_error,
                  'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'});
          false ->
              _ = cancel_request_timeout(State),
              cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer)
      end.
  %% Upgrade to HTTP/2 through an HTTP/1.1 request.
  %%
  %% HTTP2Settings is the raw value of the http2-settings request header.
  %% When it can be parsed, a 101 response is sent, the request timeout is
  %% cancelled and the connection is handed over to cowboy_http2:init/9 along
  %% with the parsed settings and the request that triggered the upgrade.
  %% When parsing raises, the connection is terminated with a 400 response.
  http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
          opts=Opts, peer=Peer}, Buffer, HTTP2Settings, Req) ->
      %% @todo
      %% However if the client sent a body, we need to read the body in full
      %% and if we can't do that, return a 413 response. Some options are in order.
      %% Always half-closed stream coming from this side.
      try cow_http_hd:parse_http2_settings(HTTP2Settings) of
          Settings ->
              Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', [
                  {<<"connection">>, <<"Upgrade">>},
                  {<<"upgrade">>, <<"h2c">>}
              ])),
              %% @todo Possibly redirect the request if it was https.
              _ = cancel_request_timeout(State),
              cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, Settings, Req)
      catch _:_ ->
          %% Anything raised while parsing the header is reported to the
          %% client as a protocol error.
          error_terminate(400, State, {connection_error, protocol_error,
              'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
      end.
  618. %% Request body parsing.
  %% Feed the buffer to the transfer decoder of the request body being read.
  %%
  %% Returns {more, State, Buffer} when the decoder could not produce anything
  %% yet; otherwise returns {data, StreamID, IsFin, Data, State, Rest} where
  %% Rest is what remains to be parsed. Once the decoder reports the body as
  %% done, the parser state goes back to expecting a request line for the next
  %% stream ID and set_request_timeout/1 is applied to the new state.
  parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
          BodyState=#ps_body{transfer_decode_fun=Decode, transfer_decode_state=DecodeState0}}) ->
      %% Some body data was decoded but the body is not finished.
      Partial = fun(Chunk, DecodeState, Remaining) ->
          {data, StreamID, nofin, Chunk, State#state{in_state=
              BodyState#ps_body{transfer_decode_state=DecodeState}}, Remaining}
      end,
      %% The whole body has been read.
      Final = fun(Chunk, Total, Remaining) ->
          {data, StreamID, {fin, Total}, Chunk, set_request_timeout(
              State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Remaining}
      end,
      %% @todo Proper trailers.
      case Decode(Buffer, DecodeState0) of
          more ->
              %% @todo Asks for 0 or more bytes.
              {more, State, Buffer};
          {more, Data, DecodeState} ->
              %% @todo Asks for 0 or more bytes.
              Partial(Data, DecodeState, <<>>);
          {more, Data, Length, DecodeState} when is_integer(Length) ->
              %% @todo Asks for Length more bytes.
              Partial(Data, DecodeState, <<>>);
          {more, Data, Rest, DecodeState} ->
              %% @todo Asks for 0 or more bytes.
              Partial(Data, DecodeState, Rest);
          {done, TotalLength, Rest} ->
              Final(<<>>, TotalLength, Rest);
          {done, Data, TotalLength, Rest} ->
              Final(Data, TotalLength, Rest)
      end.
  645. %% Message handling.
  646. %% @todo There is a difference in behavior between HTTP/1.1 and HTTP/2
  647. %% when an error or crash occurs after sending a 500 response. In HTTP/2
  648. %% the error will be printed, in HTTP/1.1 the error will be ignored.
  649. %% This is due to HTTP/1.1 disabling streams differently after both
  650. %% requests and responses have been sent.
  %% Handle the exit of a child process.
  %%
  %% The child is removed from the list of children. When it is still attached
  %% to a stream, Msg is forwarded to that stream through info/3; when its
  %% stream ID is undefined nothing more is done. An exit from a process that
  %% is not a known child is logged and the state is returned unchanged.
  down(State=#state{children=Children0}, Pid, Msg) ->
      case lists:keytake(Pid, 1, Children0) of
          false ->
              error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]),
              State;
          {value, {_, StreamID, _}, Remaining} ->
              NewState = State#state{children=Remaining},
              case StreamID of
                  undefined -> NewState;
                  _ -> info(NewState, StreamID, Msg)
              end
      end.
  %% Forward a message to the handler of the given stream.
  %%
  %% The handler state returned by cowboy_stream:info/3 is stored back into
  %% the stream and the commands it returned are executed. A message for an
  %% unknown stream is logged and ignored. Every branch returns the connection
  %% state.
  info(State=#state{streams=Streams0}, StreamID, Msg) ->
      case lists:keyfind(StreamID, #stream.id, Streams0) of
          Stream = #stream{state=StreamState0} ->
              try cowboy_stream:info(StreamID, Msg, StreamState0) of
                  {Commands, StreamState} ->
                      Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
                          Stream#stream{state=StreamState}),
                      commands(State#state{streams=Streams}, StreamID, Commands)
              catch Class:Reason ->
                  error_logger:error_msg("Exception occurred in "
                      "cowboy_stream:info(~p, ~p, ~p) with reason ~p:~p.",
                      [StreamID, Msg, StreamState0, Class, Reason]),
                  %% The caller uses the return value as the connection state,
                  %% so the state must be returned here, not a bare atom. The
                  %% stream keeps the handler state it had before the call.
                  State
                  %% @todo
                  % stream_reset(State, StreamID, {internal_error, {Class, Reason},
                  %     'Exception occurred in StreamHandler:info/3 call.'})
              end;
          false ->
              error_logger:error_msg("Received message ~p for unknown stream ~p.~n", [Msg, StreamID]),
              State
      end.
  682. %% Commands.
  %% Execute the commands returned by a stream handler, in order.
  %%
  %% Clause order matters: apart from spawn and internal_error, commands for
  %% a stream that is not the one currently being written to the socket are
  %% appended to that stream's queue instead of being executed.
  commands(State, _, []) ->
      State;
  %% Supervise a child process.
  commands(State=#state{children=Supervised}, StreamID, [{spawn, Pid, Shutdown}|Rest]) ->
      Child = {Pid, StreamID, Shutdown},
      commands(State#state{children=[Child|Supervised]}, StreamID, Rest);
  %% Error handling.
  commands(State0, StreamID, [{internal_error, _, _} = Error|Rest]) ->
      State = stream_reset(State0, StreamID, Error),
      commands(State, StreamID, Rest);
  %% Commands for a stream currently inactive.
  commands(State=#state{out_streamid=ActiveID, streams=Streams}, StreamID, Commands)
          when StreamID =/= ActiveID ->
      %% @todo We still want to handle some commands...
      #stream{queue=Queued} = Inactive = lists:keyfind(StreamID, #stream.id, Streams),
      Requeued = Inactive#stream{queue=Queued ++ Commands},
      State#state{streams=lists:keyreplace(StreamID, #stream.id, Streams, Requeued)};
  %% Read the request body.
  commands(State, StreamID, [{flow, _}|Rest]) ->
      %% @todo We only read from socket if buffer is empty, otherwise
      %% we decode the buffer.
      %% @todo Set the body reading length to min(Length, BodyLength)
      commands(State, StreamID, Rest);
  %% Error responses are sent only if a response wasn't sent already.
  commands(State=#state{out_state=wait}, StreamID,
          [{error_response, Status, RespHeaders, RespBody}|Rest]) ->
      Response = {response, Status, RespHeaders, RespBody},
      commands(State, StreamID, [Response|Rest]);
  commands(State, StreamID, [{error_response, _, _, _}|Rest]) ->
      commands(State, StreamID, Rest);
  %% Send a full response.
  %%
  %% @todo Kill the stream if it sent a response when one has already been sent.
  %% @todo Keep IsFin in the state.
  %% @todo Same two things above apply to DATA, possibly promise too.
  commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
          StreamID, [{response, Status, RespHeaders0, RespBody}|Rest]) ->
      %% @todo I'm pretty sure the last stream in the list is the one we want
      %% considering all others are queued.
      #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
      {State1, RespHeaders} = connection(State0, RespHeaders0, StreamID, Version),
      %% @todo Ensure content-length is set.
      Head = cow_http:response(Status, 'HTTP/1.1', headers_to_list(RespHeaders)),
      Done = State1#state{out_state=done},
      case RespBody of
          {sendfile, Offset, Bytes, Path} ->
              %% Only the head goes out here; the file is sent by the
              %% sendfile command pushed in front of the remaining commands.
              Transport:send(Socket, Head),
              commands(Done, StreamID, [{sendfile, fin, Offset, Bytes, Path}|Rest]);
          _ ->
              Transport:send(Socket, [Head, RespBody]),
              %% @todo If max number of requests, close connection.
              %% @todo If IsFin, maybe skip body of current request.
              maybe_terminate(Done, StreamID, Rest, fin)
      end;
  %% Send response headers and initiate chunked encoding.
  commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
          [{headers, Status, RespHeaders0}|Rest]) ->
      %% @todo Same as above.
      #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
      {State1, RespHeaders1} = case Version of
          'HTTP/1.1' ->
              {State0, RespHeaders0#{<<"transfer-encoding">> => <<"chunked">>}};
          %% Close the connection after streaming the data to HTTP/1.0 client.
          %% @todo I'm guessing we need to differentiate responses with a content-length and others.
          'HTTP/1.0' ->
              {State0#state{last_streamid=StreamID}, RespHeaders0}
      end,
      {State2, RespHeaders} = connection(State1, RespHeaders1, StreamID, Version),
      Head = cow_http:response(Status, 'HTTP/1.1', headers_to_list(RespHeaders)),
      Transport:send(Socket, Head),
      commands(State2#state{out_state=chunked}, StreamID, Rest);
  %% Send a response body chunk.
  %%
  %% @todo WINDOW_UPDATE stuff require us to buffer some data.
  %% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
  commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
          [{data, IsFin, Data}|Rest]) ->
      %% Do not send anything when the user asks to send an empty
      %% data frame, as that would break the protocol.
      case iolist_size(Data) of
          0 ->
              ok;
          Size ->
              %% @todo We need to kill the stream if it tries to send data before headers.
              %% @todo Same as above.
              Payload = case lists:keyfind(StreamID, #stream.id, Streams) of
                  #stream{version='HTTP/1.1'} ->
                      [integer_to_binary(Size, 16), <<"\r\n">>, Data, <<"\r\n">>];
                  #stream{version='HTTP/1.0'} ->
                      Data
              end,
              Transport:send(Socket, Payload)
      end,
      maybe_terminate(State, StreamID, Rest, IsFin);
  %% Send a file.
  commands(State=#state{socket=Socket, transport=Transport}, StreamID,
          [{sendfile, IsFin, Offset, Bytes, Path}|Rest]) ->
      Transport:sendfile(Socket, Path, Offset, Bytes),
      maybe_terminate(State, StreamID, Rest, IsFin);
  %% Protocol takeover.
  commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
          opts=Opts, children=Supervised}, StreamID,
          [{switch_protocol, RespHeaders, Protocol, InitialState}|_]) ->
      %% @todo This should be the last stream running otherwise we need to wait before switching.
      %% @todo If there's streams opened after this one, fail instead of 101.
      State = cancel_request_timeout(State0),
      %% @todo When we actually do the upgrade, we only have the one stream left, plus
      %% possibly some processes terminating. We need a smart strategy for handling the
      %% children shutdown. We can start with brutal_kill and discarding the EXIT messages
      %% received before switching to Websocket. Something better would be to let the
      %% stream processes finish but that implies the Websocket module to know about
      %% them and filter the messages. For now, kill them all and discard all messages
      %% in the mailbox.
      _ = [exit(ChildPid, kill) || {ChildPid, _, _} <- Supervised],
      flush(),
      %% Everything good, upgrade!
      Switching = {response, 101, RespHeaders, <<>>},
      _ = commands(State, StreamID, [Switching]),
      %% @todo This is no good because commands return a state normally and here it doesn't
      %% we need to let this module go entirely. Perhaps it should be handled directly in
      %% cowboy_clear/cowboy_tls? Perhaps not. We do want that Buffer.
      Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
  %% Stream shutdown.
  commands(State, StreamID, [stop|Rest]) ->
      %% @todo Do we want to run the commands after a stop?
      % commands(stream_terminate(State, StreamID, stop), StreamID, Tail).
      %% @todo I think that's where we need to terminate streams.
      maybe_terminate(State, StreamID, Rest, fin);
  %% HTTP/1.1 does not support push; ignore.
  commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Rest]) ->
      commands(State, StreamID, Rest).
  %% Convert the response headers map into a list of {Name, Value} pairs.
  %%
  %% The set-cookie header is special; we can only send one cookie per header.
  %% Its value is a list of cookies, each of which becomes its own pair,
  %% placed after all the other headers.
  headers_to_list(Headers) ->
      case Headers of
          #{<<"set-cookie">> := Cookies} ->
              Others = maps:to_list(maps:remove(<<"set-cookie">>, Headers)),
              Others ++ [{<<"set-cookie">>, Cookie} || Cookie <- Cookies];
          _ ->
              maps:to_list(Headers)
      end.
  %% Discard every message currently in the mailbox of the calling process.
  %% Never blocks: returns ok as soon as the mailbox is empty.
  flush() ->
      receive
          _Discarded ->
              flush()
      after 0 ->
          ok
      end.
  %% Decide what happens after a command that may end the response.
  %%
  %% With nofin the remaining commands are executed. With fin the stream is
  %% terminated and the remaining commands are dropped; when the stream is the
  %% last one allowed on this connection the connection is terminated as well.
  maybe_terminate(State, StreamID, Remaining, nofin) ->
      commands(State, StreamID, Remaining);
  %% @todo In these cases I'm not sure if we should continue processing commands.
  maybe_terminate(State=#state{last_streamid=LastID}, StreamID, _, fin)
          when LastID =:= StreamID ->
      Stopped = stream_terminate(State, StreamID, normal),
      terminate(Stopped, normal); %% @todo Reason ok?
  maybe_terminate(State, StreamID, _, fin) ->
      stream_terminate(State, StreamID, normal).
  %% Reset a stream following an internal error.
  %%
  %% For now this only terminates the stream, using the error as the reason.
  stream_reset(State, StreamID, {internal_error, _, _} = Error) ->
      %% @todo headers
      %% @todo Don't send this if there are no streams left.
      % Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
      %     {<<"content-length">>, <<"0">>}
      % ])),
      %% @todo update IsFin local
      % stream_terminate(State#state{out_state=done}, StreamID, StreamError).
      stream_terminate(State, StreamID, Error).
  %% Terminate a stream and move on to the next output stream.
  %%
  %% The stream is removed from the list of streams. If no response was sent
  %% an empty 204 response is written; if a chunked HTTP/1.1 response is in
  %% progress the final chunk is written. The terminate callback is then
  %% called, the children of the stream are detached from it, and the commands
  %% queued by the stream with the next ID, if any, are executed.
  stream_terminate(State=#state{socket=Socket, transport=Transport,
          out_streamid=OutStreamID, out_state=OutState,
          streams=Streams0, children=Children0}, StreamID, Reason) ->
      {value, #stream{state=HandlerState, version=Version}, Streams}
          = lists:keytake(StreamID, #stream.id, Streams0),
      _ = if
          OutState =:= wait ->
              Transport:send(Socket, cow_http:response(204, 'HTTP/1.1', []));
          OutState =:= chunked, Version =:= 'HTTP/1.1' ->
              Transport:send(Socket, <<"0\r\n\r\n">>);
          true -> %% done or Version =:= 'HTTP/1.0'
              ok
      end,
      stream_call_terminate(StreamID, Reason, HandlerState),
      %% @todo initiate children shutdown
      % Children = stream_terminate_children(Children0, StreamID, []),
      %% Children of this stream are kept in the list, no longer attached to it.
      Detach = fun
          ({Pid, ID, Shutdown}) when ID =:= StreamID -> {Pid, undefined, Shutdown};
          (Other) -> Other
      end,
      Children = [Detach(Child) || Child <- Children0],
      %% @todo Skip the body, if any, or drop the connection if too large.
      %% @todo Only do this if Current =:= StreamID.
      NextID = OutStreamID + 1,
      NextState = State#state{out_streamid=NextID, out_state=wait,
          streams=Streams, children=Children},
      case lists:keyfind(NextID, #stream.id, Streams) of
          false ->
              %% @todo This is clearly wrong, if the stream is gone we need to check if
              %% there used to be such a stream, and if there was to send an error.
              NextState;
          #stream{queue=Queued} ->
              %% @todo Remove queue from the stream.
              commands(NextState, NextID, Queued)
      end.
  %% @todo Taken directly from _http2
  %%
  %% Call the terminate callback of the stream handlers.
  %%
  %% Any exception raised by the callback is logged and swallowed so that a
  %% faulty handler cannot prevent the stream from being cleaned up.
  stream_call_terminate(StreamID, Reason, StreamState) ->
      try
          cowboy_stream:terminate(StreamID, Reason, StreamState)
      catch Class:Exception ->
          %% The exception must be bound to a fresh variable: matching it
          %% against the already bound Reason would only catch exceptions
          %% equal to the termination reason and let all others escape.
          error_logger:error_msg("Exception occurred in "
              "cowboy_stream:terminate(~p, ~p, ~p) with reason ~p:~p.",
              [StreamID, Reason, StreamState, Class, Exception])
      end.
  873. %stream_terminate_children([], _, Acc) ->
  874. % Acc;
  875. %stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) ->
  876. % exit(Pid, kill),
  877. % stream_terminate_children(Tail, StreamID, Acc);
  878. %stream_terminate_children([Child|Tail], StreamID, Acc) ->
  879. % stream_terminate_children(Tail, StreamID, [Child|Acc]).
  %% Tell from the request headers and version whether the connection is to
  %% be kept open (keepalive) or closed (close) after this request.
  %%
  %% HTTP/1.0 closes unless the connection header lists keep-alive;
  %% HTTP/1.1 stays open unless the connection header lists close.
  %% Any other version yields keepalive. The first argument is unused.
  %%
  %% @todo max_reqs also
  maybe_req_close(_, Headers, 'HTTP/1.0') ->
      case Headers of
          #{<<"connection">> := Conn} ->
              Tokens = cow_http_hd:parse_connection(Conn),
              case lists:member(<<"keep-alive">>, Tokens) of
                  true -> keepalive;
                  false -> close
              end;
          _ ->
              close
      end;
  maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
      case connection_hd_is_close(Conn) of
          false -> keepalive;
          true -> close
      end;
  maybe_req_close(_, _, _) ->
      keepalive.
  %% Adjust the connection response header and the state before a response.
  %%
  %% Returns {State, Headers}. When the stream is the last one allowed on the
  %% connection, close is added to the header unless already there. Otherwise,
  %% a response that asks for close makes this stream the last one, and an
  %% HTTP/1.0 response without a connection header gets keep-alive.
  connection(State=#state{last_streamid=LastID}, Headers=#{<<"connection">> := Conn}, StreamID, _)
          when LastID =:= StreamID ->
      Advertised = case connection_hd_is_close(Conn) of
          true ->
              Headers;
          %% @todo Here we need to remove keep-alive and add close, not just add close.
          false ->
              Headers#{<<"connection">> => [<<"close, ">>, Conn]}
      end,
      {State, Advertised};
  connection(State=#state{last_streamid=LastID}, Headers, StreamID, _)
          when LastID =:= StreamID ->
      {State, Headers#{<<"connection">> => <<"close">>}};
  connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
      NextState = case connection_hd_is_close(Conn) of
          true ->
              State#state{last_streamid=StreamID};
          %% @todo Here we need to set keep-alive only if it wasn't set before.
          false ->
              State
      end,
      {NextState, Headers};
  connection(State, Headers, _, 'HTTP/1.0') ->
      {State, Headers#{<<"connection">> => <<"keep-alive">>}};
  connection(State, Headers, _, _) ->
      {State, Headers}.
  %% Tell whether the value of a connection header lists the close token.
  %% The value may be any iodata; it is flattened to a binary before parsing.
  connection_hd_is_close(Conn) ->
      lists:member(<<"close">>,
          cow_http_hd:parse_connection(iolist_to_binary(Conn))).
  %% Send an empty response with the given status code, then terminate the
  %% connection with the given reason. Never returns.
  -spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
  error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Reason) ->
      EmptyResponse = cow_http:response(StatusCode, 'HTTP/1.1',
          [{<<"content-length">>, <<"0">>}]),
      Transport:send(Socket, EmptyResponse),
      terminate(State, Reason).
  %% Stop the connection process. Both arguments are currently ignored and
  %% the process always exits with reason normal.
  %%
  %% @todo We probably don't want to exit normal on errors.
  -spec terminate(_, _) -> no_return().
  terminate(_, _) ->
      erlang:exit(normal).
  926. %% System callbacks.
  %% Go back to the main loop with the state and buffer given in the third
  %% argument. The first two arguments are ignored.
  -spec system_continue(_, _, {#state{}, binary()}) -> ok.
  system_continue(_Parent, _Debug, {State, Buffer}) ->
      loop(State, Buffer).
  %% Exit the process with the given reason. The other arguments are ignored.
  -spec system_terminate(any(), _, _, _) -> no_return().
  system_terminate(Reason, _Parent, _Debug, _Misc) ->
      erlang:exit(Reason).
  %% Return the state and buffer untouched; nothing is converted on code change.
  -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
  system_code_change(Misc, _Module, _OldVsn, _Extra) ->
      {ok, Misc}.