cowboy_http.erl 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http).
  15. -export([init/6]).
  16. -export([system_continue/3]).
  17. -export([system_terminate/4]).
  18. -export([system_code_change/4]).
  19. -type opts() :: #{
  20. active_n => pos_integer(),
  21. chunked => boolean(),
  22. compress_buffering => boolean(),
  23. compress_threshold => non_neg_integer(),
  24. connection_type => worker | supervisor,
  25. env => cowboy_middleware:env(),
  26. http10_keepalive => boolean(),
  27. idle_timeout => timeout(),
  28. inactivity_timeout => timeout(),
  29. initial_stream_flow_size => non_neg_integer(),
  30. linger_timeout => timeout(),
  31. logger => module(),
  32. max_authority_length => non_neg_integer(),
  33. max_empty_lines => non_neg_integer(),
  34. max_header_name_length => non_neg_integer(),
  35. max_header_value_length => non_neg_integer(),
  36. max_headers => non_neg_integer(),
  37. max_keepalive => non_neg_integer(),
  38. max_method_length => non_neg_integer(),
  39. max_request_line_length => non_neg_integer(),
  40. metrics_callback => cowboy_metrics_h:metrics_callback(),
  41. metrics_req_filter => fun((cowboy_req:req()) -> map()),
  42. metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
  43. middlewares => [module()],
  44. proxy_header => boolean(),
  45. request_timeout => timeout(),
  46. sendfile => boolean(),
  47. shutdown_timeout => timeout(),
  48. stream_handlers => [module()],
  49. tracer_callback => cowboy_tracer_h:tracer_callback(),
  50. tracer_flags => [atom()],
  51. tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
  52. %% Open ended because configured stream handlers might add options.
  53. _ => _
  54. }.
  55. -export_type([opts/0]).
  56. -record(ps_request_line, {
  57. empty_lines = 0 :: non_neg_integer()
  58. }).
  59. -record(ps_header, {
  60. method = undefined :: binary(),
  61. authority = undefined :: binary() | undefined,
  62. path = undefined :: binary(),
  63. qs = undefined :: binary(),
  64. version = undefined :: cowboy:http_version(),
  65. headers = undefined :: cowboy:http_headers() | undefined,
  66. name = undefined :: binary() | undefined
  67. }).
  68. -record(ps_body, {
  69. length :: non_neg_integer() | undefined,
  70. received = 0 :: non_neg_integer(),
  71. transfer_decode_fun :: fun((binary(), cow_http_te:state()) -> cow_http_te:decode_ret()),
  72. transfer_decode_state :: cow_http_te:state()
  73. }).
  74. -record(stream, {
  75. id = undefined :: cowboy_stream:streamid(),
  76. %% Stream handlers and their state.
  77. state = undefined :: {module(), any()},
  78. %% Request method.
  79. method = undefined :: binary(),
  80. %% Client HTTP version for this stream.
  81. version = undefined :: cowboy:http_version(),
  82. %% Unparsed te header. Used to know if we can send trailers.
  83. te :: undefined | binary(),
  84. %% Expected body size.
  85. local_expected_size = undefined :: undefined | non_neg_integer(),
  86. %% Sent body size.
  87. local_sent_size = 0 :: non_neg_integer(),
  88. %% Commands queued.
  89. queue = [] :: cowboy_stream:commands()
  90. }).
  91. -type stream() :: #stream{}.
  92. -record(state, {
  93. parent :: pid(),
  94. ref :: ranch:ref(),
  95. socket :: inet:socket(),
  96. transport :: module(),
  97. proxy_header :: undefined | ranch_proxy_header:proxy_info(),
  98. opts = #{} :: cowboy:opts(),
  99. buffer = <<>> :: binary(),
  100. %% Some options may be overriden for the current stream.
  101. overriden_opts = #{} :: cowboy:opts(),
  102. %% Remote address and port for the connection.
  103. peer = undefined :: {inet:ip_address(), inet:port_number()},
  104. %% Local address and port for the connection.
  105. sock = undefined :: {inet:ip_address(), inet:port_number()},
  106. %% Client certificate (TLS only).
  107. cert :: undefined | binary(),
  108. timer = undefined :: undefined | reference(),
  109. %% Whether we are currently receiving data from the socket.
  110. active = true :: boolean(),
  111. %% Identifier for the stream currently being read (or waiting to be received).
  112. in_streamid = 1 :: pos_integer(),
  113. %% Parsing state for the current stream or stream-to-be.
  114. in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
  115. %% Flow requested for the current stream.
  116. flow = infinity :: non_neg_integer() | infinity,
  117. %% Identifier for the stream currently being written.
  118. %% Note that out_streamid =< in_streamid.
  119. out_streamid = 1 :: pos_integer(),
  120. %% Whether we finished writing data for the current stream.
  121. out_state = wait :: wait | chunked | streaming | done,
  122. %% The connection will be closed after this stream.
  123. last_streamid = undefined :: pos_integer(),
  124. %% Currently active HTTP/1.1 streams.
  125. streams = [] :: [stream()],
  126. %% Children processes created by streams.
  127. children = cowboy_children:init() :: cowboy_children:children()
  128. }).
  129. -include_lib("cowlib/include/cow_inline.hrl").
  130. -include_lib("cowlib/include/cow_parse.hrl").
  131. -spec init(pid(), ranch:ref(), inet:socket(), module(),
  132. ranch_proxy_header:proxy_info(), cowboy:opts()) -> ok.
  133. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
  134. Peer0 = Transport:peername(Socket),
  135. Sock0 = Transport:sockname(Socket),
  136. Cert1 = case Transport:name() of
  137. ssl ->
  138. case ssl:peercert(Socket) of
  139. {error, no_peercert} ->
  140. {ok, undefined};
  141. Cert0 ->
  142. Cert0
  143. end;
  144. _ ->
  145. {ok, undefined}
  146. end,
  147. case {Peer0, Sock0, Cert1} of
  148. {{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
  149. State = #state{
  150. parent=Parent, ref=Ref, socket=Socket,
  151. transport=Transport, proxy_header=ProxyHeader, opts=Opts,
  152. peer=Peer, sock=Sock, cert=Cert,
  153. last_streamid=maps:get(max_keepalive, Opts, 100)},
  154. setopts_active(State),
  155. loop(set_timeout(State, request_timeout));
  156. {{error, Reason}, _, _} ->
  157. terminate(undefined, {socket_error, Reason,
  158. 'A socket error occurred when retrieving the peer name.'});
  159. {_, {error, Reason}, _} ->
  160. terminate(undefined, {socket_error, Reason,
  161. 'A socket error occurred when retrieving the sock name.'});
  162. {_, _, {error, Reason}} ->
  163. terminate(undefined, {socket_error, Reason,
  164. 'A socket error occurred when retrieving the client TLS certificate.'})
  165. end.
  166. setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
  167. N = maps:get(active_n, Opts, 100),
  168. Transport:setopts(Socket, [{active, N}]).
  169. active(State) ->
  170. setopts_active(State),
  171. State#state{active=true}.
  172. passive(State=#state{socket=Socket, transport=Transport}) ->
  173. Transport:setopts(Socket, [{active, false}]),
  174. Messages = Transport:messages(),
  175. flush_passive(Socket, Messages),
  176. State#state{active=false}.
  177. flush_passive(Socket, Messages) ->
  178. receive
  179. {Passive, Socket} when Passive =:= element(4, Messages);
  180. %% Hardcoded for compatibility with Ranch 1.x.
  181. Passive =:= tcp_passive; Passive =:= ssl_passive ->
  182. flush_passive(Socket, Messages)
  183. after 0 ->
  184. ok
  185. end.
  186. loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
  187. buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
  188. last_streamid=LastStreamID}) ->
  189. Messages = Transport:messages(),
  190. InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
  191. receive
  192. %% Discard data coming in after the last request
  193. %% we want to process was received fully.
  194. {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
  195. loop(State);
  196. %% Socket messages.
  197. {OK, Socket, Data} when OK =:= element(1, Messages) ->
  198. parse(<< Buffer/binary, Data/binary >>, State);
  199. {Closed, Socket} when Closed =:= element(2, Messages) ->
  200. terminate(State, {socket_error, closed, 'The socket has been closed.'});
  201. {Error, Socket, Reason} when Error =:= element(3, Messages) ->
  202. terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
  203. {Passive, Socket} when Passive =:= element(4, Messages);
  204. %% Hardcoded for compatibility with Ranch 1.x.
  205. Passive =:= tcp_passive; Passive =:= ssl_passive ->
  206. setopts_active(State),
  207. loop(State);
  208. %% Timeouts.
  209. {timeout, Ref, {shutdown, Pid}} ->
  210. cowboy_children:shutdown_timeout(Children, Ref, Pid),
  211. loop(State);
  212. {timeout, TimerRef, Reason} ->
  213. timeout(State, Reason);
  214. {timeout, _, _} ->
  215. loop(State);
  216. %% System messages.
  217. {'EXIT', Parent, Reason} ->
  218. terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
  219. {system, From, Request} ->
  220. sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
  221. %% Messages pertaining to a stream.
  222. {{Pid, StreamID}, Msg} when Pid =:= self() ->
  223. loop(info(State, StreamID, Msg));
  224. %% Exit signal from children.
  225. Msg = {'EXIT', Pid, _} ->
  226. loop(down(State, Pid, Msg));
  227. %% Calls from supervisor module.
  228. {'$gen_call', From, Call} ->
  229. cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
  230. loop(State);
  231. %% Unknown messages.
  232. Msg ->
  233. cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
  234. loop(State)
  235. after InactivityTimeout ->
  236. terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
  237. end.
  238. %% We do not set request_timeout if there are active streams.
  239. set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
  240. State;
  241. %% We do not set request_timeout if we are skipping a body.
  242. set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
  243. State;
  244. %% We do not set idle_timeout if there are no active streams,
  245. %% unless when we are skipping a body.
  246. set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
  247. when element(1, InState) =/= ps_body ->
  248. State;
  249. %% Otherwise we can set the timeout.
  250. set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
  251. State = cancel_timeout(State0),
  252. Default = case Name of
  253. request_timeout -> 5000;
  254. idle_timeout -> 60000
  255. end,
  256. Timeout = case Override of
  257. %% The timeout may have been overriden for the current stream.
  258. #{Name := Timeout0} -> Timeout0;
  259. _ -> maps:get(Name, Opts, Default)
  260. end,
  261. TimerRef = case Timeout of
  262. infinity -> undefined;
  263. Timeout -> erlang:start_timer(Timeout, self(), Name)
  264. end,
  265. State#state{timer=TimerRef}.
  266. cancel_timeout(State=#state{timer=TimerRef}) ->
  267. ok = case TimerRef of
  268. undefined ->
  269. ok;
  270. _ ->
  271. %% Do a synchronous cancel and remove the message if any
  272. %% to avoid receiving stray messages.
  273. _ = erlang:cancel_timer(TimerRef),
  274. receive
  275. {timeout, TimerRef, _} -> ok
  276. after 0 ->
  277. ok
  278. end
  279. end,
  280. State#state{timer=undefined}.
  281. -spec timeout(_, _) -> no_return().
  282. timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
  283. terminate(State, {connection_error, timeout,
  284. 'No request-line received before timeout.'});
  285. timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
  286. error_terminate(408, State, {connection_error, timeout,
  287. 'Request headers not received before timeout.'});
  288. timeout(State, idle_timeout) ->
  289. terminate(State, {connection_error, timeout,
  290. 'Connection idle longer than configuration allows.'}).
  291. parse(<<>>, State) ->
  292. loop(State#state{buffer= <<>>});
  293. %% Do not process requests that come in after the last request
  294. %% and discard the buffer if any to save memory.
  295. parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
  296. last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
  297. loop(State#state{buffer= <<>>});
  298. parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
  299. after_parse(parse_request(Buffer, State, EmptyLines));
  300. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
  301. after_parse(parse_header(Buffer,
  302. State#state{in_state=PS#ps_header{headers=undefined}},
  303. Headers));
  304. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
  305. after_parse(parse_hd_before_value(Buffer,
  306. State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
  307. Headers, Name));
  308. parse(Buffer, State=#state{in_state=#ps_body{}}) ->
  309. after_parse(parse_body(Buffer, State)).
  310. after_parse({request, Req=#{streamid := StreamID, method := Method,
  311. headers := Headers, version := Version},
  312. State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) ->
  313. try cowboy_stream:init(StreamID, Req, Opts) of
  314. {Commands, StreamState} ->
  315. Flow = maps:get(initial_stream_flow_size, Opts, 65535),
  316. TE = maps:get(<<"te">>, Headers, undefined),
  317. Streams = [#stream{id=StreamID, state=StreamState,
  318. method=Method, version=Version, te=TE}|Streams0],
  319. State1 = case maybe_req_close(State0, Headers, Version) of
  320. close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow};
  321. keepalive -> State0#state{streams=Streams, flow=Flow}
  322. end,
  323. State = set_timeout(State1, idle_timeout),
  324. parse(Buffer, commands(State, StreamID, Commands))
  325. catch Class:Exception:Stacktrace ->
  326. cowboy:log(cowboy_stream:make_error_log(init,
  327. [StreamID, Req, Opts],
  328. Class, Exception, Stacktrace), Opts),
  329. early_error(500, State0, {internal_error, {Class, Exception},
  330. 'Unhandled exception in cowboy_stream:init/3.'}, Req),
  331. parse(Buffer, State0)
  332. end;
  333. %% Streams are sequential so the body is always about the last stream created
  334. %% unless that stream has terminated.
  335. after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer,
  336. streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) ->
  337. try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
  338. {Commands, StreamState} ->
  339. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  340. Stream#stream{state=StreamState}),
  341. State1 = set_timeout(State0, case IsFin of
  342. fin -> request_timeout;
  343. nofin -> idle_timeout
  344. end),
  345. State = update_flow(IsFin, Data, State1#state{streams=Streams}),
  346. parse(Buffer, commands(State, StreamID, Commands))
  347. catch Class:Exception:Stacktrace ->
  348. cowboy:log(cowboy_stream:make_error_log(data,
  349. [StreamID, IsFin, Data, StreamState0],
  350. Class, Exception, Stacktrace), Opts),
  351. %% @todo Should call parse after this.
  352. stream_terminate(State0, StreamID, {internal_error, {Class, Exception},
  353. 'Unhandled exception in cowboy_stream:data/4.'})
  354. end;
  355. %% No corresponding stream. We must skip the body of the previous request
  356. %% in order to process the next one.
  357. after_parse({data, _, IsFin, _, State}) ->
  358. loop(set_timeout(State, case IsFin of
  359. fin -> request_timeout;
  360. nofin -> idle_timeout
  361. end));
  362. after_parse({more, State}) ->
  363. loop(set_timeout(State, idle_timeout)).
  364. update_flow(fin, _, State) ->
  365. %% This function is only called after parsing, therefore we
  366. %% are expecting to be in active mode already.
  367. State#state{flow=infinity};
  368. update_flow(nofin, Data, State0=#state{flow=Flow0}) ->
  369. Flow = Flow0 - byte_size(Data),
  370. State = State0#state{flow=Flow},
  371. if
  372. Flow0 > 0, Flow =< 0 ->
  373. passive(State);
  374. true ->
  375. State
  376. end.
  377. %% Request-line.
  378. -spec parse_request(Buffer, State, non_neg_integer())
  379. -> {request, cowboy_req:req(), State}
  380. | {data, cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State}
  381. | {more, State}
  382. when Buffer::binary(), State::#state{}.
  383. %% Empty lines must be using \r\n.
  384. parse_request(<< $\n, _/bits >>, State, _) ->
  385. error_terminate(400, State, {connection_error, protocol_error,
  386. 'Empty lines between requests must use the CRLF line terminator. (RFC7230 3.5)'});
  387. parse_request(<< $\s, _/bits >>, State, _) ->
  388. error_terminate(400, State, {connection_error, protocol_error,
  389. 'The request-line must not begin with a space. (RFC7230 3.1.1, RFC7230 3.5)'});
  390. %% We limit the length of the Request-line to MaxLength to avoid endlessly
  391. %% reading from the socket and eventually crashing.
  392. parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
  393. MaxLength = maps:get(max_request_line_length, Opts, 8000),
  394. MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
  395. case match_eol(Buffer, 0) of
  396. nomatch when byte_size(Buffer) > MaxLength ->
  397. error_terminate(414, State, {connection_error, limit_reached,
  398. 'The request-line length is larger than configuration allows. (RFC7230 3.1.1)'});
  399. nomatch ->
  400. {more, State#state{buffer=Buffer, in_state=#ps_request_line{empty_lines=EmptyLines}}};
  401. 1 when EmptyLines =:= MaxEmptyLines ->
  402. error_terminate(400, State, {connection_error, limit_reached,
  403. 'More empty lines were received than configuration allows. (RFC7230 3.5)'});
  404. 1 ->
  405. << _:16, Rest/bits >> = Buffer,
  406. parse_request(Rest, State, EmptyLines + 1);
  407. _ ->
  408. case Buffer of
  409. %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
  410. << "OPTIONS * ", Rest/bits >> ->
  411. parse_version(Rest, State, <<"OPTIONS">>, undefined, <<"*">>, <<>>);
  412. <<"CONNECT ", _/bits>> ->
  413. error_terminate(501, State, {connection_error, no_error,
  414. 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'});
  415. <<"TRACE ", _/bits>> ->
  416. error_terminate(501, State, {connection_error, no_error,
  417. 'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
  418. %% Accept direct HTTP/2 only at the beginning of the connection.
  419. << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
  420. %% @todo Might be worth throwing to get a clean stacktrace.
  421. http2_upgrade(State, Buffer);
  422. _ ->
  423. parse_method(Buffer, State, <<>>,
  424. maps:get(max_method_length, Opts, 32))
  425. end
  426. end.
  427. match_eol(<< $\n, _/bits >>, N) ->
  428. N;
  429. match_eol(<< _, Rest/bits >>, N) ->
  430. match_eol(Rest, N + 1);
  431. match_eol(_, _) ->
  432. nomatch.
  433. parse_method(_, State, _, 0) ->
  434. error_terminate(501, State, {connection_error, limit_reached,
  435. 'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
  436. parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
  437. case C of
  438. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  439. 'The method name must not be followed with a line break. (RFC7230 3.1.1)'});
  440. $\s -> parse_uri(Rest, State, SoFar);
  441. _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
  442. _ -> error_terminate(400, State, {connection_error, protocol_error,
  443. 'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
  444. end.
  445. parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
  446. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  447. P =:= $p orelse P =:= $P ->
  448. parse_uri_authority(Rest, State, Method);
  449. parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
  450. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  451. P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
  452. parse_uri_authority(Rest, State, Method);
  453. parse_uri(<< $/, Rest/bits >>, State, Method) ->
  454. parse_uri_path(Rest, State, Method, undefined, <<$/>>);
  455. parse_uri(_, State, _) ->
  456. error_terminate(400, State, {connection_error, protocol_error,
  457. 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
  458. %% @todo We probably want to apply max_authority_length also
  459. %% to the host header and to document this option. It might
  460. %% also be useful for HTTP/2 requests.
  461. parse_uri_authority(Rest, State=#state{opts=Opts}, Method) ->
  462. parse_uri_authority(Rest, State, Method, <<>>,
  463. maps:get(max_authority_length, Opts, 255)).
  464. parse_uri_authority(_, State, _, _, 0) ->
  465. error_terminate(414, State, {connection_error, limit_reached,
  466. 'The authority component of the absolute URI is longer than configuration allows. (RFC7230 2.7.1)'});
  467. parse_uri_authority(<<C, Rest/bits>>, State, Method, SoFar, Remaining) ->
  468. case C of
  469. $\r ->
  470. error_terminate(400, State, {connection_error, protocol_error,
  471. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  472. $@ ->
  473. error_terminate(400, State, {connection_error, protocol_error,
  474. 'Absolute URIs must not include a userinfo component. (RFC7230 2.7.1)'});
  475. C when SoFar =:= <<>> andalso
  476. ((C =:= $/) orelse (C =:= $\s) orelse (C =:= $?) orelse (C =:= $#)) ->
  477. error_terminate(400, State, {connection_error, protocol_error,
  478. 'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
  479. $: when SoFar =:= <<>> ->
  480. error_terminate(400, State, {connection_error, protocol_error,
  481. 'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
  482. $/ -> parse_uri_path(Rest, State, Method, SoFar, <<"/">>);
  483. $\s -> parse_version(Rest, State, Method, SoFar, <<"/">>, <<>>);
  484. $? -> parse_uri_query(Rest, State, Method, SoFar, <<"/">>, <<>>);
  485. $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<"/">>, <<>>);
  486. C -> parse_uri_authority(Rest, State, Method, <<SoFar/binary, C>>, Remaining - 1)
  487. end.
  488. parse_uri_path(<<C, Rest/bits>>, State, Method, Authority, SoFar) ->
  489. case C of
  490. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  491. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  492. $\s -> parse_version(Rest, State, Method, Authority, SoFar, <<>>);
  493. $? -> parse_uri_query(Rest, State, Method, Authority, SoFar, <<>>);
  494. $# -> skip_uri_fragment(Rest, State, Method, Authority, SoFar, <<>>);
  495. _ -> parse_uri_path(Rest, State, Method, Authority, <<SoFar/binary, C>>)
  496. end.
  497. parse_uri_query(<<C, Rest/bits>>, State, M, A, P, SoFar) ->
  498. case C of
  499. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  500. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  501. $\s -> parse_version(Rest, State, M, A, P, SoFar);
  502. $# -> skip_uri_fragment(Rest, State, M, A, P, SoFar);
  503. _ -> parse_uri_query(Rest, State, M, A, P, <<SoFar/binary, C>>)
  504. end.
  505. skip_uri_fragment(<<C, Rest/bits>>, State, M, A, P, Q) ->
  506. case C of
  507. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  508. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  509. $\s -> parse_version(Rest, State, M, A, P, Q);
  510. _ -> skip_uri_fragment(Rest, State, M, A, P, Q)
  511. end.
  512. parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, A, P, Q) ->
  513. before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.1');
  514. parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, A, P, Q) ->
  515. before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.0');
  516. parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _, _) when C =:= $\s; C =:= $\t ->
  517. error_terminate(400, State, {connection_error, protocol_error,
  518. 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
  519. parse_version(<< C, _/bits >>, State, _, _, _, _) when C =:= $\s; C =:= $\t ->
  520. error_terminate(400, State, {connection_error, protocol_error,
  521. 'The separator between request target and version must be a single SP. (RFC7230 3.1.1)'});
  522. parse_version(_, State, _, _, _, _) ->
  523. error_terminate(505, State, {connection_error, protocol_error,
  524. 'Unsupported HTTP version. (RFC7230 2.6)'}).
  525. before_parse_headers(Rest, State, M, A, P, Q, V) ->
  526. parse_header(Rest, State#state{in_state=#ps_header{
  527. method=M, authority=A, path=P, qs=Q, version=V}}, #{}).
  528. %% Headers.
  529. %% We need two or more bytes in the buffer to continue.
  530. parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
  531. {more, State#state{buffer=Rest, in_state=PS#ps_header{headers=Headers}}};
  532. parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
  533. request(Rest, S, Headers);
  534. parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  535. MaxHeaders = maps:get(max_headers, Opts, 100),
  536. NumHeaders = maps:size(Headers),
  537. if
  538. NumHeaders >= MaxHeaders ->
  539. error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
  540. {connection_error, limit_reached,
  541. 'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  542. true ->
  543. parse_header_colon(Buffer, State, Headers)
  544. end.
  545. parse_header_colon(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  546. MaxLength = maps:get(max_header_name_length, Opts, 64),
  547. case match_colon(Buffer, 0) of
  548. nomatch when byte_size(Buffer) > MaxLength ->
  549. error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
  550. {connection_error, limit_reached,
  551. 'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  552. nomatch ->
  553. %% We don't have a colon but we might have an invalid header line,
  554. %% so check if we have an LF and abort with an error if we do.
  555. case match_eol(Buffer, 0) of
  556. nomatch ->
  557. {more, State#state{buffer=Buffer, in_state=PS#ps_header{headers=Headers}}};
  558. _ ->
  559. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  560. {connection_error, protocol_error,
  561. 'A header line is missing a colon separator. (RFC7230 3.2.4)'})
  562. end;
  563. _ ->
  564. parse_hd_name(Buffer, State, Headers, <<>>)
  565. end.
  566. match_colon(<< $:, _/bits >>, N) ->
  567. N;
  568. match_colon(<< _, Rest/bits >>, N) ->
  569. match_colon(Rest, N + 1);
  570. match_colon(_, _) ->
  571. nomatch.
  572. parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
  573. parse_hd_before_value(Rest, State, H, SoFar);
  574. parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, <<>>) when ?IS_WS(C) ->
  575. error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
  576. {connection_error, protocol_error,
  577. 'Whitespace is not allowed before the header name. (RFC7230 3.2)'});
  578. parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, _) when ?IS_WS(C) ->
  579. error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
  580. {connection_error, protocol_error,
  581. 'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2.4)'});
  582. parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
  583. ?LOWER(parse_hd_name, Rest, State, H, SoFar).
  584. parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) ->
  585. parse_hd_before_value(Rest, S, H, N);
  586. parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) ->
  587. parse_hd_before_value(Rest, S, H, N);
  588. parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
  589. MaxLength = maps:get(max_header_value_length, Opts, 4096),
  590. case match_eol(Buffer, 0) of
  591. nomatch when byte_size(Buffer) > MaxLength ->
  592. error_terminate(431, State#state{in_state=PS#ps_header{headers=H}},
  593. {connection_error, limit_reached,
  594. 'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  595. nomatch ->
  596. {more, State#state{buffer=Buffer, in_state=PS#ps_header{headers=H, name=N}}};
  597. _ ->
  598. parse_hd_value(Buffer, State, H, N, <<>>)
  599. end.
  600. parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers0, Name, SoFar) ->
  601. Value = clean_value_ws_end(SoFar, byte_size(SoFar) - 1),
  602. Headers = case maps:get(Name, Headers0, undefined) of
  603. undefined -> Headers0#{Name => Value};
  604. %% The cookie header does not use proper HTTP header lists.
  605. Value0 when Name =:= <<"cookie">> -> Headers0#{Name => << Value0/binary, "; ", Value/binary >>};
  606. Value0 -> Headers0#{Name => << Value0/binary, ", ", Value/binary >>}
  607. end,
  608. parse_header(Rest, S, Headers);
  609. parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
  610. parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).
  611. clean_value_ws_end(_, -1) ->
  612. <<>>;
  613. clean_value_ws_end(Value, N) ->
  614. case binary:at(Value, N) of
  615. $\s -> clean_value_ws_end(Value, N - 1);
  616. $\t -> clean_value_ws_end(Value, N - 1);
  617. _ ->
  618. S = N + 1,
  619. << Value2:S/binary, _/bits >> = Value,
  620. Value2
  621. end.
  622. -ifdef(TEST).
  623. clean_value_ws_end_test_() ->
  624. Tests = [
  625. {<<>>, <<>>},
  626. {<<" ">>, <<>>},
  627. {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  628. "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
  629. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  630. "text/html;level=2;q=0.4, */*;q=0.5">>}
  631. ],
  632. [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
  633. horse_clean_value_ws_end() ->
  634. horse:repeat(200000,
  635. clean_value_ws_end(
  636. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  637. "text/html;level=2;q=0.4, */*;q=0.5 ">>,
  638. byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  639. "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
  640. ).
  641. -endif.
  642. request(Buffer, State=#state{transport=Transport,
  643. in_state=PS=#ps_header{authority=Authority, version=Version}}, Headers) ->
  644. case maps:get(<<"host">>, Headers, undefined) of
  645. undefined when Version =:= 'HTTP/1.1' ->
  646. %% @todo Might want to not close the connection on this and next one.
  647. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  648. {stream_error, protocol_error,
  649. 'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
  650. undefined ->
  651. request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
  652. %% @todo When CONNECT requests come in we need to ignore the RawHost
  653. %% and instead use the Authority as the source of host.
  654. RawHost when Authority =:= undefined; Authority =:= RawHost ->
  655. request_parse_host(Buffer, State, Headers, RawHost);
  656. %% RFC7230 does not explicitly ask us to reject requests
  657. %% that have a different authority component and host header.
  658. %% However it DOES ask clients to set them to the same value,
  659. %% so we enforce that.
  660. _ ->
  661. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  662. {stream_error, protocol_error,
  663. 'The host header is different than the absolute-form authority component. (RFC7230 5.4)'})
  664. end.
  665. request_parse_host(Buffer, State=#state{transport=Transport, in_state=PS}, Headers, RawHost) ->
  666. try cow_http_hd:parse_host(RawHost) of
  667. {Host, undefined} ->
  668. request(Buffer, State, Headers, Host, default_port(Transport:secure()));
  669. {Host, Port} when Port > 0, Port =< 65535 ->
  670. request(Buffer, State, Headers, Host, Port);
  671. _ ->
  672. error_terminate(400, State, {stream_error, protocol_error,
  673. 'The port component of the absolute-form is not in the range 0..65535. (RFC7230 2.7.1)'})
  674. catch _:_ ->
  675. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  676. {stream_error, protocol_error,
  677. 'The host header is invalid. (RFC7230 5.4)'})
  678. end.
  679. -spec default_port(boolean()) -> 80 | 443.
  680. default_port(true) -> 443;
  681. default_port(_) -> 80.
  682. %% End of request parsing.
  683. request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
  684. proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
  685. PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
  686. Headers0, Host, Port) ->
  687. Scheme = case Transport:secure() of
  688. true -> <<"https">>;
  689. false -> <<"http">>
  690. end,
  691. {Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of
  692. #{<<"transfer-encoding">> := TransferEncoding0} ->
  693. try cow_http_hd:parse_transfer_encoding(TransferEncoding0) of
  694. [<<"chunked">>] ->
  695. {maps:remove(<<"content-length">>, Headers0),
  696. true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
  697. _ ->
  698. error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
  699. {stream_error, protocol_error,
  700. 'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'})
  701. catch _:_ ->
  702. error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
  703. {stream_error, protocol_error,
  704. 'The transfer-encoding header is invalid. (RFC7230 3.3.1)'})
  705. end;
  706. #{<<"content-length">> := <<"0">>} ->
  707. {Headers0, false, 0, undefined, undefined};
  708. #{<<"content-length">> := BinLength} ->
  709. Length = try
  710. cow_http_hd:parse_content_length(BinLength)
  711. catch _:_ ->
  712. error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
  713. {stream_error, protocol_error,
  714. 'The content-length header is invalid. (RFC7230 3.3.2)'})
  715. end,
  716. {Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
  717. _ ->
  718. {Headers0, false, 0, undefined, undefined}
  719. end,
  720. Req0 = #{
  721. ref => Ref,
  722. pid => self(),
  723. streamid => StreamID,
  724. peer => Peer,
  725. sock => Sock,
  726. cert => Cert,
  727. method => Method,
  728. scheme => Scheme,
  729. host => Host,
  730. port => Port,
  731. path => Path,
  732. qs => Qs,
  733. version => Version,
  734. %% We are transparently taking care of transfer-encodings so
  735. %% the user code has no need to know about it.
  736. headers => maps:remove(<<"transfer-encoding">>, Headers),
  737. has_body => HasBody,
  738. body_length => BodyLength
  739. },
  740. %% We add the PROXY header information if any.
  741. Req = case ProxyHeader of
  742. undefined -> Req0;
  743. _ -> Req0#{proxy_header => ProxyHeader}
  744. end,
  745. case is_http2_upgrade(Headers, Version) of
  746. false ->
  747. State = case HasBody of
  748. true ->
  749. State0#state{in_state=#ps_body{
  750. length = BodyLength,
  751. transfer_decode_fun = TDecodeFun,
  752. transfer_decode_state = TDecodeState
  753. }};
  754. false ->
  755. State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}
  756. end,
  757. {request, Req, State#state{buffer=Buffer}};
  758. {true, HTTP2Settings} ->
  759. %% We save the headers in case the upgrade will fail
  760. %% and we need to pass them to cowboy_stream:early_error.
  761. http2_upgrade(State0#state{in_state=PS#ps_header{headers=Headers}},
  762. Buffer, HTTP2Settings, Req)
  763. end.
  764. %% HTTP/2 upgrade.
  765. %% @todo We must not upgrade to h2c over a TLS connection.
  766. is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
  767. <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
  768. Conns = cow_http_hd:parse_connection(Conn),
  769. case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
  770. {true, true} ->
  771. Protocols = cow_http_hd:parse_upgrade(Upgrade),
  772. case lists:member(<<"h2c">>, Protocols) of
  773. true ->
  774. {true, HTTP2Settings};
  775. false ->
  776. false
  777. end;
  778. _ ->
  779. false
  780. end;
  781. is_http2_upgrade(_, _) ->
  782. false.
  783. %% Prior knowledge upgrade, without an HTTP/1.1 request.
  784. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  785. proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
  786. case Transport:secure() of
  787. false ->
  788. _ = cancel_timeout(State),
  789. cowboy_http2:init(Parent, Ref, Socket, Transport,
  790. ProxyHeader, Opts, Peer, Sock, Cert, Buffer);
  791. true ->
  792. error_terminate(400, State, {connection_error, protocol_error,
  793. 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
  794. end.
  795. %% Upgrade via an HTTP/1.1 request.
  796. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  797. proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
  798. Buffer, HTTP2Settings, Req) ->
  799. %% @todo
  800. %% However if the client sent a body, we need to read the body in full
  801. %% and if we can't do that, return a 413 response. Some options are in order.
  802. %% Always half-closed stream coming from this side.
  803. try cow_http_hd:parse_http2_settings(HTTP2Settings) of
  804. Settings ->
  805. _ = cancel_timeout(State),
  806. cowboy_http2:init(Parent, Ref, Socket, Transport,
  807. ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req)
  808. catch _:_ ->
  809. error_terminate(400, State, {connection_error, protocol_error,
  810. 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
  811. end.
  812. %% Request body parsing.
  813. parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
  814. PS=#ps_body{received=Received, transfer_decode_fun=TDecode,
  815. transfer_decode_state=TState0}}) ->
  816. %% @todo Proper trailers.
  817. try TDecode(Buffer, TState0) of
  818. more ->
  819. {more, State#state{buffer=Buffer}};
  820. {more, Data, TState} ->
  821. {data, StreamID, nofin, Data, State#state{buffer= <<>>,
  822. in_state=PS#ps_body{received=Received + byte_size(Data),
  823. transfer_decode_state=TState}}};
  824. {more, Data, _Length, TState} when is_integer(_Length) ->
  825. {data, StreamID, nofin, Data, State#state{buffer= <<>>,
  826. in_state=PS#ps_body{received=Received + byte_size(Data),
  827. transfer_decode_state=TState}}};
  828. {more, Data, Rest, TState} ->
  829. {data, StreamID, nofin, Data, State#state{buffer=Rest,
  830. in_state=PS#ps_body{received=Received + byte_size(Data),
  831. transfer_decode_state=TState}}};
  832. {done, _HasTrailers, Rest} ->
  833. {data, StreamID, fin, <<>>,
  834. State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}};
  835. {done, Data, _HasTrailers, Rest} ->
  836. {data, StreamID, fin, Data,
  837. State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
  838. catch _:_ ->
  839. Reason = {connection_error, protocol_error,
  840. 'Failure to decode the content. (RFC7230 4)'},
  841. terminate(stream_terminate(State, StreamID, Reason), Reason)
  842. end.
  843. %% Message handling.
  844. down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
  845. case cowboy_children:down(Children0, Pid) of
  846. %% The stream was terminated already.
  847. {ok, undefined, Children} ->
  848. State#state{children=Children};
  849. %% The stream is still running.
  850. {ok, StreamID, Children} ->
  851. info(State#state{children=Children}, StreamID, Msg);
  852. %% The process was unknown.
  853. error ->
  854. cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
  855. [Msg, Pid], Opts),
  856. State
  857. end.
  858. info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
  859. case lists:keyfind(StreamID, #stream.id, Streams0) of
  860. Stream = #stream{state=StreamState0} ->
  861. try cowboy_stream:info(StreamID, Msg, StreamState0) of
  862. {Commands, StreamState} ->
  863. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  864. Stream#stream{state=StreamState}),
  865. commands(State#state{streams=Streams}, StreamID, Commands)
  866. catch Class:Exception:Stacktrace ->
  867. cowboy:log(cowboy_stream:make_error_log(info,
  868. [StreamID, Msg, StreamState0],
  869. Class, Exception, Stacktrace), Opts),
  870. stream_terminate(State, StreamID, {internal_error, {Class, Exception},
  871. 'Unhandled exception in cowboy_stream:info/3.'})
  872. end;
  873. false ->
  874. cowboy:log(warning, "Received message ~p for unknown stream ~p.~n",
  875. [Msg, StreamID], Opts),
  876. State
  877. end.
  878. %% Commands.
  879. commands(State, _, []) ->
  880. State;
  881. %% Supervise a child process.
  882. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
  883. commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
  884. StreamID, Tail);
  885. %% Error handling.
  886. commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
  887. commands(stream_terminate(State, StreamID, Error), StreamID, Tail);
  888. %% Commands for a stream currently inactive.
  889. commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
  890. when Current =/= StreamID ->
  891. %% @todo We still want to handle some commands...
  892. Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
  893. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  894. Stream#stream{queue=Queue ++ Commands}),
  895. State#state{streams=Streams};
  896. %% When we have finished reading the request body, do nothing.
  897. commands(State=#state{flow=infinity}, StreamID, [{flow, _}|Tail]) ->
  898. commands(State, StreamID, Tail);
  899. %% Read the request body.
  900. commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) ->
  901. %% We must read *at least* Size of data otherwise functions
  902. %% like cowboy_req:read_body/1,2 will wait indefinitely.
  903. Flow = if
  904. Flow0 < 0 -> Size;
  905. true -> Flow0 + Size
  906. end,
  907. %% Reenable active mode if necessary.
  908. State = if
  909. Flow0 =< 0, Flow > 0 ->
  910. active(State0);
  911. true ->
  912. State0
  913. end,
  914. commands(State#state{flow=Flow}, StreamID, Tail);
  915. %% Error responses are sent only if a response wasn't sent already.
  916. commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
  917. [{error_response, Status, Headers0, Body}|Tail]) ->
  918. %% We close the connection when the error response is 408, as it
  919. %% indicates a timeout and the RFC recommends that we stop here. (RFC7231 6.5.7)
  920. Headers = case Status of
  921. 408 -> Headers0#{<<"connection">> => <<"close">>};
  922. <<"408", _/bits>> -> Headers0#{<<"connection">> => <<"close">>};
  923. _ -> Headers0
  924. end,
  925. commands(State, StreamID, [{response, Status, Headers, Body}|Tail]);
  926. commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
  927. commands(State, StreamID, Tail);
  928. %% Send an informational response.
  929. commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
  930. StreamID, [{inform, StatusCode, Headers}|Tail]) ->
  931. %% @todo I'm pretty sure the last stream in the list is the one we want
  932. %% considering all others are queued.
  933. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  934. _ = case Version of
  935. 'HTTP/1.1' ->
  936. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
  937. headers_to_list(Headers)));
  938. %% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
  939. 'HTTP/1.0' ->
  940. ok
  941. end,
  942. commands(State, StreamID, Tail);
  943. %% Send a full response.
  944. %%
  945. %% @todo Kill the stream if it sent a response when one has already been sent.
  946. %% @todo Keep IsFin in the state.
  947. %% @todo Same two things above apply to DATA, possibly promise too.
  948. commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID,
  949. [{response, StatusCode, Headers0, Body}|Tail]) ->
  950. %% @todo I'm pretty sure the last stream in the list is the one we want
  951. %% considering all others are queued.
  952. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  953. {State1, Headers} = connection(State0, Headers0, StreamID, Version),
  954. State = State1#state{out_state=done},
  955. %% @todo Ensure content-length is set.
  956. Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
  957. case Body of
  958. {sendfile, _, _, _} ->
  959. Transport:send(Socket, Response),
  960. sendfile(State, Body);
  961. _ ->
  962. Transport:send(Socket, [Response, Body])
  963. end,
  964. commands(State, StreamID, Tail);
  965. %% Send response headers and initiate chunked encoding or streaming.
  966. commands(State0=#state{socket=Socket, transport=Transport,
  967. opts=Opts, overriden_opts=Override, streams=Streams0, out_state=OutState},
  968. StreamID, [{headers, StatusCode, Headers0}|Tail]) ->
  969. %% @todo Same as above (about the last stream in the list).
  970. Stream = #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0),
  971. Status = cow_http:status_to_integer(StatusCode),
  972. ContentLength = maps:get(<<"content-length">>, Headers0, undefined),
  973. %% Chunked transfer-encoding can be disabled on a per-request basis.
  974. Chunked = case Override of
  975. #{chunked := Chunked0} -> Chunked0;
  976. _ -> maps:get(chunked, Opts, true)
  977. end,
  978. {State1, Headers1} = case {Status, ContentLength, Version} of
  979. {204, _, 'HTTP/1.1'} ->
  980. {State0#state{out_state=done}, Headers0};
  981. {304, _, 'HTTP/1.1'} ->
  982. {State0#state{out_state=done}, Headers0};
  983. {_, undefined, 'HTTP/1.1'} when Chunked ->
  984. {State0#state{out_state=chunked}, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
  985. %% Close the connection after streaming without content-length
  986. %% to all HTTP/1.0 clients and to HTTP/1.1 clients when chunked is disabled.
  987. {_, undefined, _} ->
  988. {State0#state{out_state=streaming, last_streamid=StreamID}, Headers0};
  989. %% Stream the response body without chunked transfer-encoding.
  990. _ ->
  991. ExpectedSize = cow_http_hd:parse_content_length(ContentLength),
  992. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  993. Stream#stream{local_expected_size=ExpectedSize}),
  994. {State0#state{out_state=streaming, streams=Streams}, Headers0}
  995. end,
  996. Headers2 = case stream_te(OutState, Stream) of
  997. trailers -> Headers1;
  998. _ -> maps:remove(<<"trailer">>, Headers1)
  999. end,
  1000. {State, Headers} = connection(State1, Headers2, StreamID, Version),
  1001. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
  1002. commands(State, StreamID, Tail);
  1003. %% Send a response body chunk.
  1004. %% @todo We need to kill the stream if it tries to send data before headers.
  1005. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
  1006. StreamID, [{data, IsFin, Data}|Tail]) ->
  1007. %% Do not send anything when the user asks to send an empty
  1008. %% data frame, as that would break the protocol.
  1009. Size = case Data of
  1010. {sendfile, _, B, _} -> B;
  1011. _ -> iolist_size(Data)
  1012. end,
  1013. %% Depending on the current state we may need to send nothing,
  1014. %% the last chunk, chunked data with/without the last chunk,
  1015. %% or just the data as-is.
  1016. Stream = case lists:keyfind(StreamID, #stream.id, Streams0) of
  1017. Stream0=#stream{method= <<"HEAD">>} ->
  1018. Stream0;
  1019. Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked ->
  1020. Transport:send(Socket, <<"0\r\n\r\n">>),
  1021. Stream0;
  1022. Stream0 when Size =:= 0 ->
  1023. Stream0;
  1024. Stream0 when is_tuple(Data), OutState =:= chunked ->
  1025. Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]),
  1026. sendfile(State0, Data),
  1027. Transport:send(Socket,
  1028. case IsFin of
  1029. fin -> <<"\r\n0\r\n\r\n">>;
  1030. nofin -> <<"\r\n">>
  1031. end),
  1032. Stream0;
  1033. Stream0 when OutState =:= chunked ->
  1034. Transport:send(Socket, [
  1035. integer_to_binary(Size, 16), <<"\r\n">>, Data,
  1036. case IsFin of
  1037. fin -> <<"\r\n0\r\n\r\n">>;
  1038. nofin -> <<"\r\n">>
  1039. end
  1040. ]),
  1041. Stream0;
  1042. Stream0 when OutState =:= streaming ->
  1043. #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0,
  1044. SentSize = SentSize0 + Size,
  1045. if
  1046. %% ExpectedSize may be undefined, which is > any integer value.
  1047. SentSize > ExpectedSize ->
  1048. terminate(State0, response_body_too_large);
  1049. is_tuple(Data) ->
  1050. sendfile(State0, Data);
  1051. true ->
  1052. Transport:send(Socket, Data)
  1053. end,
  1054. Stream0#stream{local_sent_size=SentSize}
  1055. end,
  1056. State = case IsFin of
  1057. fin -> State0#state{out_state=done};
  1058. nofin -> State0
  1059. end,
  1060. Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
  1061. commands(State#state{streams=Streams}, StreamID, Tail);
  1062. commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
  1063. StreamID, [{trailers, Trailers}|Tail]) ->
  1064. case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
  1065. trailers ->
  1066. Transport:send(Socket, [
  1067. <<"0\r\n">>,
  1068. cow_http:headers(maps:to_list(Trailers)),
  1069. <<"\r\n">>
  1070. ]);
  1071. no_trailers ->
  1072. Transport:send(Socket, <<"0\r\n\r\n">>);
  1073. not_chunked ->
  1074. ok
  1075. end,
  1076. commands(State#state{out_state=done}, StreamID, Tail);
  1077. %% Protocol takeover.
  1078. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
  1079. out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
  1080. [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
  1081. %% @todo If there's streams opened after this one, fail instead of 101.
  1082. State1 = cancel_timeout(State0),
  1083. %% Before we send the 101 response we need to stop receiving data
  1084. %% from the socket, otherwise the data might be receive before the
  1085. %% call to flush/0 and we end up inadvertently dropping a packet.
  1086. %%
  1087. %% @todo Handle cases where the request came with a body. We need
  1088. %% to process or skip the body before the upgrade can be completed.
  1089. State = passive(State1),
  1090. %% Send a 101 response if necessary, then terminate the stream.
  1091. #state{streams=Streams} = case OutState of
  1092. wait -> info(State, StreamID, {inform, 101, Headers});
  1093. _ -> State
  1094. end,
  1095. #stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
  1096. %% @todo We need to shutdown processes here first.
  1097. stream_call_terminate(StreamID, switch_protocol, StreamState, State),
  1098. %% Terminate children processes and flush any remaining messages from the mailbox.
  1099. cowboy_children:terminate(Children),
  1100. flush(Parent),
  1101. Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
  1102. %% Set options dynamically.
  1103. commands(State0=#state{overriden_opts=Opts},
  1104. StreamID, [{set_options, SetOpts}|Tail]) ->
  1105. State1 = case SetOpts of
  1106. #{idle_timeout := IdleTimeout} ->
  1107. set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
  1108. idle_timeout);
  1109. _ ->
  1110. State0
  1111. end,
  1112. State = case SetOpts of
  1113. #{chunked := Chunked} ->
  1114. State1#state{overriden_opts=Opts#{chunked => Chunked}};
  1115. _ ->
  1116. State1
  1117. end,
  1118. commands(State, StreamID, Tail);
  1119. %% Stream shutdown.
  1120. commands(State, StreamID, [stop|Tail]) ->
  1121. %% @todo Do we want to run the commands after a stop?
  1122. %% @todo We currently wait for the stop command before we
  1123. %% continue with the next request/response. In theory, if
  1124. %% the request body was read fully and the response body
  1125. %% was sent fully we should be able to start working on
  1126. %% the next request concurrently. This can be done as a
  1127. %% future optimization.
  1128. maybe_terminate(State, StreamID, Tail);
  1129. %% Log event.
  1130. commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
  1131. cowboy:log(Log, Opts),
  1132. commands(State, StreamID, Tail);
  1133. %% HTTP/1.1 does not support push; ignore.
  1134. commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
  1135. commands(State, StreamID, Tail).
  1136. %% The set-cookie header is special; we can only send one cookie per header.
  1137. headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
  1138. Headers1 = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
  1139. Headers1 ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
  1140. headers_to_list(Headers) ->
  1141. maps:to_list(Headers).
  1142. %% We wrap the sendfile call into a try/catch because on OTP-20
  1143. %% and earlier a few different crashes could occur for sockets
  1144. %% that were closing or closed. For example a badarg in
  1145. %% erlang:port_get_data(#Port<...>) or a badmatch like
  1146. %% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
  1147. %%
  1148. %% OTP-21 uses a NIF instead of a port so the implementation
  1149. %% and behavior has dramatically changed and it is unclear
  1150. %% whether it will be necessary in the future.
  1151. %%
  1152. %% This try/catch prevents some noisy logs to be written
  1153. %% when these errors occur.
  1154. sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts},
  1155. {sendfile, Offset, Bytes, Path}) ->
  1156. try
  1157. %% When sendfile is disabled we explicitly use the fallback.
  1158. _ = case maps:get(sendfile, Opts, true) of
  1159. true -> Transport:sendfile(Socket, Path, Offset, Bytes);
  1160. false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
  1161. end,
  1162. ok
  1163. catch _:_ ->
  1164. terminate(State, {socket_error, sendfile_crash,
  1165. 'An error occurred when using the sendfile function.'})
  1166. end.
  1167. %% Flush messages specific to cowboy_http before handing over the
  1168. %% connection to another protocol.
  1169. flush(Parent) ->
  1170. receive
  1171. {timeout, _, _} ->
  1172. flush(Parent);
  1173. {{Pid, _}, _} when Pid =:= self() ->
  1174. flush(Parent);
  1175. {'EXIT', Pid, _} when Pid =/= Parent ->
  1176. flush(Parent)
  1177. after 0 ->
  1178. ok
  1179. end.
  1180. %% @todo In these cases I'm not sure if we should continue processing commands.
  1181. maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
  1182. terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
  1183. maybe_terminate(State, StreamID, _Tail) ->
  1184. stream_terminate(State, StreamID, normal).
  1185. stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
  1186. out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
  1187. children=Children0}, StreamID, Reason) ->
  1188. #stream{version=Version, local_expected_size=ExpectedSize, local_sent_size=SentSize}
  1189. = lists:keyfind(StreamID, #stream.id, Streams0),
  1190. %% Send a response or terminate chunks depending on the current output state.
  1191. State1 = #state{streams=Streams1} = case OutState of
  1192. wait when element(1, Reason) =:= internal_error ->
  1193. info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>});
  1194. wait when element(1, Reason) =:= connection_error ->
  1195. info(State0, StreamID, {response, 400, #{<<"content-length">> => <<"0">>}, <<>>});
  1196. wait ->
  1197. info(State0, StreamID, {response, 204, #{}, <<>>});
  1198. chunked when Version =:= 'HTTP/1.1' ->
  1199. info(State0, StreamID, {data, fin, <<>>});
  1200. streaming when SentSize < ExpectedSize ->
  1201. terminate(State0, response_body_too_small);
  1202. _ -> %% done or Version =:= 'HTTP/1.0'
  1203. State0
  1204. end,
  1205. %% Stop the stream, shutdown children and reset overriden options.
  1206. {value, #stream{state=StreamState}, Streams}
  1207. = lists:keytake(StreamID, #stream.id, Streams1),
  1208. stream_call_terminate(StreamID, Reason, StreamState, State1),
  1209. Children = cowboy_children:shutdown(Children0, StreamID),
  1210. State = State1#state{overriden_opts=#{}, streams=Streams, children=Children},
  1211. %% We want to drop the connection if the body was not read fully
  1212. %% and we don't know its length or more remains to be read than
  1213. %% configuration allows.
  1214. MaxSkipBodyLength = maps:get(max_skip_body_length, Opts, 1000000),
  1215. case InState of
  1216. #ps_body{length=undefined}
  1217. when InStreamID =:= OutStreamID ->
  1218. terminate(State, skip_body_unknown_length);
  1219. #ps_body{length=Len, received=Received}
  1220. when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
  1221. terminate(State, skip_body_too_large);
  1222. #ps_body{} when InStreamID =:= OutStreamID ->
  1223. stream_next(State#state{flow=infinity});
  1224. _ ->
  1225. stream_next(State)
  1226. end.
  1227. stream_next(State0=#state{opts=Opts, active=Active, out_streamid=OutStreamID, streams=Streams}) ->
  1228. NextOutStreamID = OutStreamID + 1,
  1229. case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
  1230. false ->
  1231. State0#state{out_streamid=NextOutStreamID, out_state=wait};
  1232. #stream{queue=Commands} ->
  1233. State = case Active of
  1234. true -> State0;
  1235. false -> active(State0)
  1236. end,
  1237. %% @todo Remove queue from the stream.
  1238. %% We set the flow to the initial flow size even though
  1239. %% we might have sent some data through already due to pipelining.
  1240. Flow = maps:get(initial_stream_flow_size, Opts, 65535),
  1241. commands(State#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait},
  1242. NextOutStreamID, Commands)
  1243. end.
  1244. stream_call_terminate(StreamID, Reason, StreamState, #state{opts=Opts}) ->
  1245. try
  1246. cowboy_stream:terminate(StreamID, Reason, StreamState)
  1247. catch Class:Exception:Stacktrace ->
  1248. cowboy:log(cowboy_stream:make_error_log(terminate,
  1249. [StreamID, Reason, StreamState],
  1250. Class, Exception, Stacktrace), Opts)
  1251. end.
  1252. maybe_req_close(#state{opts=#{http10_keepalive := false}}, _, 'HTTP/1.0') ->
  1253. close;
  1254. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
  1255. Conns = cow_http_hd:parse_connection(Conn),
  1256. case lists:member(<<"keep-alive">>, Conns) of
  1257. true -> keepalive;
  1258. false -> close
  1259. end;
  1260. maybe_req_close(_, _, 'HTTP/1.0') ->
  1261. close;
  1262. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
  1263. case connection_hd_is_close(Conn) of
  1264. true -> close;
  1265. false -> keepalive
  1266. end;
  1267. maybe_req_close(_, _, _) ->
  1268. keepalive.
  1269. connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  1270. case connection_hd_is_close(Conn) of
  1271. true -> {State, Headers};
  1272. %% @todo Here we need to remove keep-alive and add close, not just add close.
  1273. false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
  1274. end;
  1275. connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
  1276. {State, Headers#{<<"connection">> => <<"close">>}};
  1277. connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  1278. case connection_hd_is_close(Conn) of
  1279. true -> {State#state{last_streamid=StreamID}, Headers};
  1280. %% @todo Here we need to set keep-alive only if it wasn't set before.
  1281. false -> {State, Headers}
  1282. end;
  1283. connection(State, Headers, _, 'HTTP/1.0') ->
  1284. {State, Headers#{<<"connection">> => <<"keep-alive">>}};
  1285. connection(State, Headers, _, _) ->
  1286. {State, Headers}.
  1287. connection_hd_is_close(Conn) ->
  1288. Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
  1289. lists:member(<<"close">>, Conns).
  1290. stream_te(streaming, _) ->
  1291. not_chunked;
  1292. %% No TE header was sent.
  1293. stream_te(_, #stream{te=undefined}) ->
  1294. no_trailers;
  1295. stream_te(_, #stream{te=TE0}) ->
  1296. try cow_http_hd:parse_te(TE0) of
  1297. {TE1, _} -> TE1
  1298. catch _:_ ->
  1299. %% If we can't parse the TE header, assume we can't send trailers.
  1300. no_trailers
  1301. end.
  1302. %% This function is only called when an error occurs on a new stream.
  1303. -spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
  1304. error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamState}, Reason) ->
  1305. PartialReq = case StreamState of
  1306. #ps_request_line{} -> #{
  1307. ref => Ref,
  1308. peer => Peer
  1309. };
  1310. #ps_header{method=Method, path=Path, qs=Qs,
  1311. version=Version, headers=ReqHeaders} -> #{
  1312. ref => Ref,
  1313. peer => Peer,
  1314. method => Method,
  1315. path => Path,
  1316. qs => Qs,
  1317. version => Version,
  1318. headers => case ReqHeaders of
  1319. undefined -> #{};
  1320. _ -> ReqHeaders
  1321. end
  1322. }
  1323. end,
  1324. early_error(StatusCode, State, Reason, PartialReq, #{<<"connection">> => <<"close">>}),
  1325. terminate(State, Reason).
  1326. early_error(StatusCode, State, Reason, PartialReq) ->
  1327. early_error(StatusCode, State, Reason, PartialReq, #{}).
  1328. early_error(StatusCode0, #state{socket=Socket, transport=Transport,
  1329. opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) ->
  1330. RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>},
  1331. Resp = {response, StatusCode0, RespHeaders1, <<>>},
  1332. try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
  1333. {response, StatusCode, RespHeaders, RespBody} ->
  1334. Transport:send(Socket, [
  1335. cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
  1336. %% @todo We shouldn't send the body when the method is HEAD.
  1337. %% @todo Technically we allow the sendfile tuple.
  1338. RespBody
  1339. ])
  1340. catch Class:Exception:Stacktrace ->
  1341. cowboy:log(cowboy_stream:make_error_log(early_error,
  1342. [StreamID, Reason, PartialReq, Resp, Opts],
  1343. Class, Exception, Stacktrace), Opts),
  1344. %% We still need to send an error response, so send what we initially
  1345. %% wanted to send. It's better than nothing.
  1346. Transport:send(Socket, cow_http:response(StatusCode0,
  1347. 'HTTP/1.1', maps:to_list(RespHeaders1)))
  1348. end,
  1349. ok.
  1350. -spec terminate(_, _) -> no_return().
  1351. terminate(undefined, Reason) ->
  1352. exit({shutdown, Reason});
  1353. terminate(State=#state{streams=Streams, children=Children}, Reason) ->
  1354. terminate_all_streams(State, Streams, Reason),
  1355. cowboy_children:terminate(Children),
  1356. terminate_linger(State),
  1357. exit({shutdown, Reason}).
  1358. terminate_all_streams(_, [], _) ->
  1359. ok;
  1360. terminate_all_streams(State, [#stream{id=StreamID, state=StreamState}|Tail], Reason) ->
  1361. stream_call_terminate(StreamID, Reason, StreamState, State),
  1362. terminate_all_streams(State, Tail, Reason).
  1363. terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
  1364. case Transport:shutdown(Socket, write) of
  1365. ok ->
  1366. case maps:get(linger_timeout, Opts, 1000) of
  1367. 0 ->
  1368. ok;
  1369. infinity ->
  1370. terminate_linger_before_loop(State, undefined, Transport:messages());
  1371. Timeout ->
  1372. TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
  1373. terminate_linger_before_loop(State, TimerRef, Transport:messages())
  1374. end;
  1375. {error, _} ->
  1376. ok
  1377. end.
  1378. terminate_linger_before_loop(State, TimerRef, Messages) ->
  1379. %% We may already be in active mode when we do this
  1380. %% but it's OK because we are shutting down anyway.
  1381. case setopts_active(State) of
  1382. ok ->
  1383. terminate_linger_loop(State, TimerRef, Messages);
  1384. {error, _} ->
  1385. ok
  1386. end.
  1387. terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
  1388. receive
  1389. {OK, Socket, _} when OK =:= element(1, Messages) ->
  1390. terminate_linger_loop(State, TimerRef, Messages);
  1391. {Closed, Socket} when Closed =:= element(2, Messages) ->
  1392. ok;
  1393. {Error, Socket, _} when Error =:= element(3, Messages) ->
  1394. ok;
  1395. {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
  1396. terminate_linger_before_loop(State, TimerRef, Messages);
  1397. {timeout, TimerRef, linger_timeout} ->
  1398. ok;
  1399. _ ->
  1400. terminate_linger_loop(State, TimerRef, Messages)
  1401. end.
  1402. %% System callbacks.
  1403. -spec system_continue(_, _, #state{}) -> ok.
  1404. system_continue(_, _, State) ->
  1405. loop(State).
  1406. -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
  1407. system_terminate(Reason, _, _, State) ->
  1408. terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}).
  1409. -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
  1410. system_code_change(Misc, _, _, _) ->
  1411. {ok, Misc}.