%% cowboy_http.erl
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http).
  15. -ifdef(OTP_RELEASE).
  16. -compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}).
  17. -endif.
  18. -export([init/6]).
  19. -export([system_continue/3]).
  20. -export([system_terminate/4]).
  21. -export([system_code_change/4]).
  %% Option map type exported by this module.
  %% Every key is optional (all associations use `=>`).
  -type opts() :: #{
      chunked => boolean(),
      compress_buffering => boolean(),
      compress_threshold => non_neg_integer(),
      connection_type => worker | supervisor,
      env => cowboy_middleware:env(),
      http10_keepalive => boolean(),
      idle_timeout => timeout(),
      inactivity_timeout => timeout(),
      initial_stream_flow_size => non_neg_integer(),
      linger_timeout => timeout(),
      logger => module(),
      max_authority_length => non_neg_integer(),
      max_empty_lines => non_neg_integer(),
      max_header_name_length => non_neg_integer(),
      max_header_value_length => non_neg_integer(),
      max_headers => non_neg_integer(),
      max_keepalive => non_neg_integer(),
      max_method_length => non_neg_integer(),
      max_request_line_length => non_neg_integer(),
      metrics_callback => cowboy_metrics_h:metrics_callback(),
      metrics_req_filter => fun((cowboy_req:req()) -> map()),
      metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
      middlewares => [module()],
      proxy_header => boolean(),
      request_timeout => timeout(),
      sendfile => boolean(),
      shutdown_timeout => timeout(),
      stream_handlers => [module()],
      tracer_callback => cowboy_tracer_h:tracer_callback(),
      tracer_flags => [atom()],
      tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
      %% Left open: stream handlers configured by the user may define
      %% options of their own.
      _ => _
  }.
  -export_type([opts/0]).
  %% Parser state used while waiting for a request-line.
  -record(ps_request_line, {
      %% Number of empty lines already skipped in front of the request-line.
      empty_lines = 0 :: non_neg_integer()
  }).
  %% Parser state used while reading the header section of a request.
  %% The first five fields are filled in from the request-line.
  -record(ps_header, {
      method = undefined :: binary(),
      authority = undefined :: binary() | undefined,
      path = undefined :: binary(),
      qs = undefined :: binary(),
      version = undefined :: cowboy:http_version(),
      %% Headers collected so far. Stored here only while waiting for
      %% more data; parse/2 resets the field to undefined when it resumes.
      headers = undefined :: cowboy:http_headers() | undefined,
      %% Name of the header whose value has not been fully received yet.
      name = undefined :: binary() | undefined
  }).
  %% Parser state used while reading a request body.
  -record(ps_body, {
      length :: non_neg_integer() | undefined,
      received = 0 :: non_neg_integer(),
      %% Transfer decoding function and the state it is called with.
      transfer_decode_fun :: fun((binary(), cow_http_te:state()) -> cow_http_te:decode_ret()),
      transfer_decode_state :: cow_http_te:state()
  }).
  %% One HTTP/1.1 request/response exchange.
  -record(stream, {
      id = undefined :: cowboy_stream:streamid(),

      %% Stream handlers together with their state.
      state = undefined :: {module(), any()},

      %% Method of the request.
      method = undefined :: binary(),

      %% HTTP version the client used for this stream.
      version = undefined :: cowboy:http_version(),

      %% Raw te request header; tells us whether trailers may be sent.
      te :: undefined | binary(),

      %% Size of the body we expect to send.
      local_expected_size = undefined :: undefined | non_neg_integer(),

      %% Size of the body sent so far.
      local_sent_size = 0 :: non_neg_integer(),

      %% Queued commands.
      queue = [] :: cowboy_stream:commands()
  }).

  -type stream() :: #stream{}.
  %% Connection state carried through the receive loop.
  -record(state, {
      parent :: pid(),
      ref :: ranch:ref(),
      socket :: inet:socket(),
      transport :: module(),
      proxy_header :: undefined | ranch_proxy_header:proxy_info(),
      opts = #{} :: cowboy:opts(),

      %% Data received from the socket that has not been parsed yet.
      buffer = <<>> :: binary(),

      %% Options that were overridden for the current stream only.
      overriden_opts = #{} :: cowboy:opts(),

      %% Remote address and port of the connection.
      peer = undefined :: {inet:ip_address(), inet:port_number()},

      %% Local address and port of the connection.
      sock = undefined :: {inet:ip_address(), inet:port_number()},

      %% Client certificate (TLS only).
      cert :: undefined | binary(),

      %% Reference of the request/idle timer currently running, if any.
      timer = undefined :: undefined | reference(),

      %% Whether the socket is currently delivering data to us.
      active = false :: boolean(),

      %% ID of the stream being read, or of the next one we wait for.
      in_streamid = 1 :: pos_integer(),

      %% Parser state for that stream.
      in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},

      %% Flow requested for the current stream.
      flow = infinity :: non_neg_integer() | infinity,

      %% ID of the stream being written.
      %% Invariant: out_streamid =< in_streamid.
      out_streamid = 1 :: pos_integer(),

      %% Progress of the response for the stream being written.
      out_state = wait :: wait | chunked | streaming | done,

      %% The connection is closed once this stream is done.
      last_streamid = undefined :: pos_integer(),

      %% HTTP/1.1 streams that are currently active.
      streams = [] :: [stream()],

      %% Child processes started by streams.
      children = cowboy_children:init() :: cowboy_children:children()
  }).
  131. -include_lib("cowlib/include/cow_inline.hrl").
  132. -include_lib("cowlib/include/cow_parse.hrl").
  %% Entry point of the connection process.
  %%
  %% Looks up the peer address, the local address and, for the ssl
  %% transport, the client certificate. When all three lookups succeed the
  %% initial state is built, the request timeout is armed and the receive
  %% loop is entered. Otherwise the connection is terminated with a
  %% socket_error reason naming the lookup that failed.
  -spec init(pid(), ranch:ref(), inet:socket(), module(),
      ranch_proxy_header:proxy_info(), cowboy:opts()) -> ok.
  init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
      PeerResult = Transport:peername(Socket),
      SockResult = Transport:sockname(Socket),
      CertResult = case Transport:name() of
          ssl ->
              %% A client that sent no certificate is not an error.
              case ssl:peercert(Socket) of
                  {error, no_peercert} -> {ok, undefined};
                  PeerCertResult -> PeerCertResult
              end;
          _ ->
              {ok, undefined}
      end,
      case {PeerResult, SockResult, CertResult} of
          {{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
              %% The keepalive limit is used directly as the ID of the
              %% last stream accepted on this connection.
              LastStreamID = maps:get(max_keepalive, Opts, 100),
              State = #state{
                  parent=Parent,
                  ref=Ref,
                  socket=Socket,
                  transport=Transport,
                  proxy_header=ProxyHeader,
                  opts=Opts,
                  peer=Peer,
                  sock=Sock,
                  cert=Cert,
                  last_streamid=LastStreamID
              },
              before_loop(set_timeout(State, request_timeout));
          {{error, Reason}, _, _} ->
              terminate(undefined, {socket_error, Reason,
                  'A socket error occurred when retrieving the peer name.'});
          {_, {error, Reason}, _} ->
              terminate(undefined, {socket_error, Reason,
                  'A socket error occurred when retrieving the sock name.'});
          {_, _, {error, Reason}} ->
              terminate(undefined, {socket_error, Reason,
                  'A socket error occurred when retrieving the client TLS certificate.'})
      end.
  %% Switches the socket to {active, 100} mode and records in the state
  %% that we are receiving data.
  active(#state{socket=Socket, transport=Transport} = State0) ->
      Transport:setopts(Socket, [{active, 100}]),
      State0#state{active=true}.
  %% Switches the socket to passive mode and records in the state that we
  %% are no longer receiving data.
  passive(#state{socket=Socket, transport=Transport} = State0) ->
      Transport:setopts(Socket, [{active, false}]),
      State0#state{active=false}.
  %% Enters the receive loop, enabling socket reads first when needed.
  %%
  %% The socket is left untouched when it is already active. It is also
  %% left untouched when the flow is exhausted (zero or negative), because
  %% we do not read from the socket unless flow is large enough.
  before_loop(State=#state{active=Active, flow=Flow}) ->
      case Active =:= true orelse Flow =< 0 of
          true -> loop(State);
          false -> loop(active(State))
      end.
  %% Main receive loop of the connection process.
  %%
  %% Handles, in this order of precedence: socket data, socket closure and
  %% errors, passive-mode notifications, timers, system messages, messages
  %% addressed to a stream, exits of children, supervisor calls and finally
  %% any stray message. The connection is terminated when nothing at all
  %% is received within inactivity_timeout (300000 ms by default).
  loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
          buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
          last_streamid=LastStreamID}) ->
      %% The transport tells us which tags its socket messages use.
      Messages = Transport:messages(),
      DataTag = element(1, Messages),
      ClosedTag = element(2, Messages),
      ErrorTag = element(3, Messages),
      InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
      receive
          %% Data arriving after the last request we want to process
          %% was fully received is thrown away.
          {DataTag, Socket, _} when InStreamID > LastStreamID ->
              before_loop(State);
          %% Socket messages.
          {DataTag, Socket, Data} ->
              parse(<< Buffer/binary, Data/binary >>, State);
          {ClosedTag, Socket} ->
              terminate(State, {socket_error, closed, 'The socket has been closed.'});
          {ErrorTag, Socket, Reason} ->
              terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
          %% The {active, N} budget ran out; re-arm the socket.
          {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
              loop(active(State));
          %% Timeouts.
          {timeout, ShutdownRef, {shutdown, ChildPid}} ->
              cowboy_children:shutdown_timeout(Children, ShutdownRef, ChildPid),
              loop(State);
          {timeout, TimerRef, Reason} ->
              timeout(State, Reason);
          %% Timer that is no longer the current one: ignore it.
          {timeout, _, _} ->
              loop(State);
          %% System messages.
          {'EXIT', Parent, Reason} ->
              terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
          {system, From, Request} ->
              sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
          %% Messages pertaining to a stream.
          {{Pid, StreamID}, StreamMsg} when Pid =:= self() ->
              loop(info(State, StreamID, StreamMsg));
          %% Exit signal from children.
          ExitMsg = {'EXIT', ChildPid, _} ->
              loop(down(State, ChildPid, ExitMsg));
          %% Calls from supervisor module.
          {'$gen_call', From, Call} ->
              cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
              loop(State);
          %% Unknown messages.
          Stray ->
              cowboy:log(warning, "Received stray message ~p.~n", [Stray], Opts),
              loop(State)
      after InactivityTimeout ->
          terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
      end.
  %% Arms the timer named Name (request_timeout or idle_timeout),
  %% replacing any timer currently running, and returns the new state.
  %%
  %% The state is returned unchanged in three situations:
  %%  - request_timeout is not set while there are active streams;
  %%  - request_timeout is not set while we are skipping a body;
  %%  - idle_timeout is not set when there are no active streams,
  %%    except when we are skipping a body.
  set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
      State;
  set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
      State;
  set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
          when element(1, InState) =/= ps_body ->
      State;
  set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
      State = cancel_timeout(State0),
      Default = case Name of
          request_timeout -> 5000;
          idle_timeout -> 60000
      end,
      %% A value overridden for the current stream wins over the
      %% connection-wide option.
      Timeout = case maps:find(Name, Override) of
          {ok, Overridden} -> Overridden;
          error -> maps:get(Name, Opts, Default)
      end,
      case Timeout of
          infinity ->
              State#state{timer=undefined};
          _ ->
              State#state{timer=erlang:start_timer(Timeout, self(), Name)}
      end.
  %% Stops the timer referenced by the state, if any, and clears the
  %% reference.
  cancel_timeout(State=#state{timer=undefined}) ->
      State;
  cancel_timeout(State=#state{timer=TimerRef}) ->
      %% Cancel synchronously, then flush a timeout message that may
      %% already be in the mailbox so that it is not received as a
      %% stray message later on.
      _ = erlang:cancel_timer(TimerRef),
      receive
          {timeout, TimerRef, _} -> ok
      after 0 ->
          ok
      end,
      State#state{timer=undefined}.
  %% Reacts to the expiry of the connection timer.
  %%
  %% An idle timeout always terminates the connection. A request timeout
  %% terminates it silently while the request-line is still awaited and
  %% with a 408 response once headers have started to arrive.
  -spec timeout(_, _) -> no_return().
  timeout(State, idle_timeout) ->
      terminate(State, {connection_error, timeout,
          'Connection idle longer than configuration allows.'});
  timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
      terminate(State, {connection_error, timeout,
          'No request-line received before timeout.'});
  timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
      error_terminate(408, State, {connection_error, timeout,
          'Request headers not received before timeout.'}).
  %% Resumes parsing of Buffer according to the current parser state.
  %%
  %% An empty buffer sends us straight back to the receive loop.
  parse(<<>>, State) ->
      before_loop(State#state{buffer= <<>>});
  parse(Buffer, State=#state{in_state=InState, in_streamid=InStreamID,
          last_streamid=LastStreamID}) ->
      case InState of
          %% Requests that come in after the last request are not
          %% processed; the buffer is dropped to save memory.
          #ps_request_line{} when InStreamID > LastStreamID ->
              before_loop(State#state{buffer= <<>>});
          #ps_request_line{empty_lines=EmptyLines} ->
              after_parse(parse_request(Buffer, State, EmptyLines));
          %% Between two header lines. The headers collected so far are
          %% taken out of the record and passed as an argument instead.
          #ps_header{headers=Headers, name=undefined} ->
              LineState = State#state{in_state=InState#ps_header{headers=undefined}},
              after_parse(parse_header(Buffer, LineState, Headers));
          %% In the middle of a header line: the name is known, the value
          %% is not complete yet.
          #ps_header{headers=Headers, name=Name} ->
              ValueState = State#state{
                  in_state=InState#ps_header{headers=undefined, name=undefined}},
              after_parse(parse_hd_before_value(Buffer, ValueState, Headers, Name));
          #ps_body{} ->
              after_parse(parse_body(Buffer, State))
      end.
  %% Acts on the result returned by one of the parsing functions.
  %%
  %% {request, Req, State}: a complete request head was parsed. The stream
  %% handlers are initialised, the new stream is put in front of the stream
  %% list and parsing continues with what is left in the buffer.
  after_parse({request, Req=#{streamid := StreamID, method := Method,
          headers := Headers, version := Version},
          State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) ->
      try cowboy_stream:init(StreamID, Req, Opts) of
          {Commands, StreamState} ->
              NewStream = #stream{id=StreamID, state=StreamState,
                  method=Method, version=Version,
                  te=maps:get(<<"te">>, Headers, undefined)},
              State1 = State0#state{
                  streams=[NewStream|Streams0],
                  flow=maps:get(initial_stream_flow_size, Opts, 65535)},
              %% When the connection is to be closed after this request,
              %% this stream becomes the last one.
              State2 = case maybe_req_close(State0, Headers, Version) of
                  close -> State1#state{last_streamid=StreamID};
                  keepalive -> State1
              end,
              State = set_timeout(State2, idle_timeout),
              parse(Buffer, commands(State, StreamID, Commands))
      catch Class:Exception ->
          Stacktrace = erlang:get_stacktrace(),
          cowboy:log(cowboy_stream:make_error_log(init,
              [StreamID, Req, Opts],
              Class, Exception, Stacktrace), Opts),
          early_error(500, State0, {internal_error, {Class, Exception},
              'Unhandled exception in cowboy_stream:init/3.'}, Req),
          parse(Buffer, State0)
      end;
  %% {data, ...} for the stream at the head of the list. Streams are
  %% sequential so the body is always about the last stream created
  %% unless that stream has terminated.
  after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer,
          streams=[Stream=#stream{id=StreamID, state=StreamState0}|Tail]}}) ->
      try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
          {Commands, StreamState} ->
              Streams = [Stream#stream{state=StreamState}|Tail],
              TimeoutName = case IsFin of
                  fin -> request_timeout;
                  nofin -> idle_timeout
              end,
              State1 = set_timeout(State0, TimeoutName),
              State = update_flow(IsFin, Data, State1#state{streams=Streams}),
              parse(Buffer, commands(State, StreamID, Commands))
      catch Class:Exception ->
          Stacktrace = erlang:get_stacktrace(),
          cowboy:log(cowboy_stream:make_error_log(data,
              [StreamID, IsFin, Data, StreamState0],
              Class, Exception, Stacktrace), Opts),
          %% @todo Should call parse after this.
          stream_terminate(State0, StreamID, {internal_error, {Class, Exception},
              'Unhandled exception in cowboy_stream:data/4.'})
      end;
  %% {data, ...} without a corresponding stream. We must skip the body of
  %% the previous request in order to process the next one.
  after_parse({data, _, IsFin, _, State}) ->
      TimeoutName = case IsFin of
          fin -> request_timeout;
          nofin -> idle_timeout
      end,
      before_loop(set_timeout(State, TimeoutName));
  %% More data is needed before parsing can make progress.
  after_parse({more, State}) ->
      before_loop(set_timeout(State, idle_timeout)).
  %% Updates the flow after body data was handed to the stream.
  %% The end of the body resets the flow to infinity; otherwise the size
  %% of the data is subtracted from it.
  update_flow(fin, _, State) ->
      State#state{flow=infinity};
  update_flow(nofin, Data, State=#state{flow=Remaining}) ->
      Consumed = byte_size(Data),
      State#state{flow=Remaining - Consumed}.
  %% Request-line.
  %%
  %% Skips empty lines in front of the request-line (at most
  %% max_empty_lines of them, 5 by default), then dispatches on the way
  %% the request-line begins.
  -spec parse_request(Buffer, State, non_neg_integer())
      -> {request, cowboy_req:req(), State}
      | {data, cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State}
      | {more, State}
      when Buffer::binary(), State::#state{}.
  %% Empty lines must be using \r\n.
  parse_request(<< $\n, _/bits >>, State, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'Empty lines between requests must use the CRLF line terminator. (RFC7230 3.5)'});
  parse_request(<< $\s, _/bits >>, State, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-line must not begin with a space. (RFC7230 3.1.1, RFC7230 3.5)'});
  parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
      MaxLength = maps:get(max_request_line_length, Opts, 8000),
      MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
      case match_eol(Buffer, 0) of
          %% No line feed yet. The length limit is applied here so that we
          %% do not keep reading from the socket endlessly and eventually
          %% crash.
          nomatch ->
              if
                  byte_size(Buffer) > MaxLength ->
                      error_terminate(414, State, {connection_error, limit_reached,
                          'The request-line length is larger than configuration allows. (RFC7230 3.1.1)'});
                  true ->
                      {more, State#state{buffer=Buffer,
                          in_state=#ps_request_line{empty_lines=EmptyLines}}}
              end;
          %% Line feed in second position: counted as an empty line and
          %% skipped by dropping the first two bytes.
          1 ->
              if
                  EmptyLines =:= MaxEmptyLines ->
                      error_terminate(400, State, {connection_error, limit_reached,
                          'More empty lines were received than configuration allows. (RFC7230 3.5)'});
                  true ->
                      << _:2/binary, Rest/bits >> = Buffer,
                      parse_request(Rest, State, EmptyLines + 1)
              end;
          _ ->
              case Buffer of
                  %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
                  << "OPTIONS * ", Rest/bits >> ->
                      parse_version(Rest, State, <<"OPTIONS">>, undefined, <<"*">>, <<>>);
                  << "CONNECT ", _/bits >> ->
                      error_terminate(501, State, {connection_error, no_error,
                          'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'});
                  << "TRACE ", _/bits >> ->
                      error_terminate(501, State, {connection_error, no_error,
                          'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
                  %% Accept direct HTTP/2 only at the beginning of the connection.
                  << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
                      %% @todo Might be worth throwing to get a clean stacktrace.
                      http2_upgrade(State, Buffer);
                  _ ->
                      MaxMethodLength = maps:get(max_method_length, Opts, 32),
                      parse_method(Buffer, State, <<>>, MaxMethodLength)
              end
      end.
  %% Returns Offset plus the number of bytes that precede the first line
  %% feed in Buffer, or nomatch when Buffer contains no line feed.
  match_eol(Buffer, Offset) ->
      case Buffer of
          << $\n, _/bits >> -> Offset;
          << _, Tail/bits >> -> match_eol(Tail, Offset + 1);
          _ -> nomatch
      end.
  %% Reads the method one byte at a time up to the first space, then hands
  %% over to parse_uri/3. Remaining is the number of bytes the method may
  %% still grow by; reaching zero is an error.
  parse_method(_, State, _, 0) ->
      error_terminate(501, State, {connection_error, limit_reached,
          'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
  parse_method(<< $\r, _/bits >>, State, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The method name must not be followed with a line break. (RFC7230 3.1.1)'});
  parse_method(<< $\s, Rest/bits >>, State, Method, _) ->
      parse_uri(Rest, State, Method);
  parse_method(<< C, Rest/bits >>, State, Method, Remaining) when ?IS_TOKEN(C) ->
      parse_method(Rest, State, << Method/binary, C >>, Remaining - 1);
  parse_method(<< _, _/bits >>, State, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The method name must contain only valid token characters. (RFC7230 3.1.1)'}).
  %% Dispatches on the form of the request-target.
  %%
  %% Absolute-form targets are accepted for the http and https schemes
  %% only. The scheme is compared case-insensitively, one character at a
  %% time, and every test is joined with ',' so that ALL of them must hold.
  %% Joining them with ';' would turn the guard into alternatives and let
  %% targets such as "xttp://" or "httx://" through. The two 't' are bound
  %% to separate variables so that mixed-case spellings such as "hTtp"
  %% are accepted too.
  %%
  %% Origin-form targets start with a slash. Anything else is rejected
  %% with a 400 response.
  parse_uri(<< H, T1, T2, P, "://", Rest/bits >>, State, Method)
          when (H =:= $h orelse H =:= $H),
              (T1 =:= $t orelse T1 =:= $T),
              (T2 =:= $t orelse T2 =:= $T),
              (P =:= $p orelse P =:= $P) ->
      parse_uri_authority(Rest, State, Method);
  parse_uri(<< H, T1, T2, P, S, "://", Rest/bits >>, State, Method)
          when (H =:= $h orelse H =:= $H),
              (T1 =:= $t orelse T1 =:= $T),
              (T2 =:= $t orelse T2 =:= $T),
              (P =:= $p orelse P =:= $P),
              (S =:= $s orelse S =:= $S) ->
      parse_uri_authority(Rest, State, Method);
  parse_uri(<< $/, Rest/bits >>, State, Method) ->
      parse_uri_path(Rest, State, Method, undefined, <<$/>>);
  parse_uri(_, State, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
  %% Reads the authority component of an absolute-form request-target.
  %%
  %% @todo We probably want to apply max_authority_length also
  %% to the host header and to document this option. It might
  %% also be useful for HTTP/2 requests.
  parse_uri_authority(Rest, State=#state{opts=Opts}, Method) ->
      MaxLength = maps:get(max_authority_length, Opts, 255),
      parse_uri_authority(Rest, State, Method, <<>>, MaxLength).

  %% The last argument is the number of bytes the authority may still grow
  %% by; reaching zero is an error.
  parse_uri_authority(_, State, _, _, 0) ->
      error_terminate(414, State, {connection_error, limit_reached,
          'The authority component of the absolute URI is longer than configuration allows. (RFC7230 2.7.1)'});
  parse_uri_authority(<< $\r, _/bits >>, State, _, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  parse_uri_authority(<< $@, _/bits >>, State, _, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'Absolute URIs must not include a userinfo component. (RFC7230 2.7.1)'});
  %% A delimiter right after the scheme means the host is empty.
  parse_uri_authority(<< C, _/bits >>, State, _, <<>>, _)
          when C =:= $/; C =:= $\s; C =:= $?; C =:= $#; C =:= $: ->
      error_terminate(400, State, {connection_error, protocol_error,
          'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
  %% End of the authority. When no path follows, "/" is used.
  parse_uri_authority(<< $/, Rest/bits >>, State, Method, Authority, _) ->
      parse_uri_path(Rest, State, Method, Authority, <<"/">>);
  parse_uri_authority(<< $\s, Rest/bits >>, State, Method, Authority, _) ->
      parse_version(Rest, State, Method, Authority, <<"/">>, <<>>);
  parse_uri_authority(<< $?, Rest/bits >>, State, Method, Authority, _) ->
      parse_uri_query(Rest, State, Method, Authority, <<"/">>, <<>>);
  parse_uri_authority(<< $#, Rest/bits >>, State, Method, Authority, _) ->
      skip_uri_fragment(Rest, State, Method, Authority, <<"/">>, <<>>);
  parse_uri_authority(<< C, Rest/bits >>, State, Method, Authority, Remaining) ->
      parse_uri_authority(Rest, State, Method, << Authority/binary, C >>, Remaining - 1).
  %% Accumulates the path of the request-target until a space, a question
  %% mark or a hash sign is found.
  parse_uri_path(<< $\r, _/bits >>, State, _, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  parse_uri_path(<< $\s, Rest/bits >>, State, Method, Authority, Path) ->
      parse_version(Rest, State, Method, Authority, Path, <<>>);
  parse_uri_path(<< $?, Rest/bits >>, State, Method, Authority, Path) ->
      parse_uri_query(Rest, State, Method, Authority, Path, <<>>);
  parse_uri_path(<< $#, Rest/bits >>, State, Method, Authority, Path) ->
      skip_uri_fragment(Rest, State, Method, Authority, Path, <<>>);
  parse_uri_path(<< C, Rest/bits >>, State, Method, Authority, Path) ->
      parse_uri_path(Rest, State, Method, Authority, << Path/binary, C >>).
  %% Accumulates the query string of the request-target until a space or
  %% a hash sign is found.
  parse_uri_query(<< $\r, _/bits >>, State, _, _, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  parse_uri_query(<< $\s, Rest/bits >>, State, M, A, P, Query) ->
      parse_version(Rest, State, M, A, P, Query);
  parse_uri_query(<< $#, Rest/bits >>, State, M, A, P, Query) ->
      skip_uri_fragment(Rest, State, M, A, P, Query);
  parse_uri_query(<< C, Rest/bits >>, State, M, A, P, Query) ->
      parse_uri_query(Rest, State, M, A, P, << Query/binary, C >>).
  %% Drops the fragment of the request-target: bytes are skipped, not
  %% stored, until the space that precedes the HTTP version.
  skip_uri_fragment(<< $\r, _/bits >>, State, _, _, _, _) ->
      error_terminate(400, State, {connection_error, protocol_error,
          'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  skip_uri_fragment(<< $\s, Rest/bits >>, State, M, A, P, Q) ->
      parse_version(Rest, State, M, A, P, Q);
  skip_uri_fragment(<< _, Rest/bits >>, State, M, A, P, Q) ->
      skip_uri_fragment(Rest, State, M, A, P, Q).
  %% Parses the HTTP version that ends the request-line.
  %%
  %% Only HTTP/1.1 and HTTP/1.0 immediately followed by CRLF are accepted.
  %% Whitespace after the version or in front of it gets a 400 response,
  %% anything else a 505 response.
  parse_version(Buffer, State, M, A, P, Q) ->
      case Buffer of
          << "HTTP/1.1\r\n", Rest/bits >> ->
              before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.1');
          << "HTTP/1.0\r\n", Rest/bits >> ->
              before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.0');
          << "HTTP/1.", _, C, _/bits >> when C =:= $\s; C =:= $\t ->
              error_terminate(400, State, {connection_error, protocol_error,
                  'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
          << C, _/bits >> when C =:= $\s; C =:= $\t ->
              error_terminate(400, State, {connection_error, protocol_error,
                  'The separator between request target and version must be a single SP. (RFC7230 3.1.1)'});
          _ ->
              error_terminate(505, State, {connection_error, protocol_error,
                  'Unsupported HTTP version. (RFC7230 2.6)'})
      end.
  %% Stores what was learnt from the request-line in a fresh header parser
  %% state and starts parsing headers with an empty header map.
  before_parse_headers(Rest, State, M, A, P, Q, V) ->
      HeaderState = #ps_header{method=M, authority=A, path=P, qs=Q, version=V},
      parse_header(Rest, State#state{in_state=HeaderState}, #{}).
  %% Headers.
  %%
  %% Parses one header line, or detects the empty line that ends the
  %% header section.

  %% We need two or more bytes in the buffer to continue.
  parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
      {more, State#state{buffer=Rest, in_state=PS#ps_header{headers=Headers}}};
  %% Empty line: the header section is complete.
  parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
      request(Rest, S, Headers);
  %% Another header line: refuse it when max_headers (100 by default)
  %% headers were already collected.
  parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
      MaxHeaders = maps:get(max_headers, Opts, 100),
      case maps:size(Headers) >= MaxHeaders of
          true ->
              error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
                  {connection_error, limit_reached,
                      'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
          false ->
              parse_header_colon(Buffer, State, Headers)
      end.
  %% Makes sure the buffer contains a colon before the header name is
  %% parsed. The max_header_name_length limit (64 by default) is checked
  %% only while no colon has been received.
  parse_header_colon(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
      MaxLength = maps:get(max_header_name_length, Opts, 64),
      case match_colon(Buffer, 0) of
          nomatch ->
              %% Put the headers back in the parser state: it is needed
              %% both for error reporting and for resuming later.
              Pending = State#state{in_state=PS#ps_header{headers=Headers}},
              if
                  byte_size(Buffer) > MaxLength ->
                      error_terminate(431, Pending,
                          {connection_error, limit_reached,
                              'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
                  true ->
                      %% We don't have a colon but we might have an invalid
                      %% header line, so check if we have an LF and abort
                      %% with an error if we do.
                      case match_eol(Buffer, 0) of
                          nomatch ->
                              {more, Pending#state{buffer=Buffer}};
                          _ ->
                              error_terminate(400, Pending,
                                  {connection_error, protocol_error,
                                      'A header line is missing a colon separator. (RFC7230 3.2.4)'})
                      end
              end;
          _ ->
              parse_hd_name(Buffer, State, Headers, <<>>)
      end.
  %% Return N plus the byte offset of the first colon in the binary,
  %% or nomatch when the binary contains no colon.
  match_colon(<< Byte, Tail/bits >>, N) ->
      case Byte of
          $: -> N;
          _ -> match_colon(Tail, N + 1)
      end;
  match_colon(_, _) ->
      nomatch.
  %% Accumulate the lowercased header name up to the colon.
  %%
  %% Whitespace is rejected with a 400; the error message depends on
  %% whether any part of the name was read before the whitespace.
  parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
      parse_hd_before_value(Rest, State, H, SoFar);
  parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, SoFar) when ?IS_WS(C) ->
      HumanReadable = case SoFar of
          <<>> ->
              'Whitespace is not allowed before the header name. (RFC7230 3.2)';
          _ ->
              'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2.4)'
      end,
      error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
          {connection_error, protocol_error, HumanReadable});
  %% The character must stay bound to C in this clause.
  %% NOTE(review): ?LOWER is defined outside this chunk; it presumably
  %% reads C from the enclosing clause -- confirm before renaming.
  parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
      ?LOWER(parse_hd_name, Rest, State, H, SoFar).
  %% Skip the optional whitespace (spaces and tabs) after the colon.
  parse_hd_before_value(<< C, Rest/bits >>, S, H, N) when C =:= $\s; C =:= $\t ->
      parse_hd_before_value(Rest, S, H, N);
  %% Only parse the value once the end of the line is in the buffer.
  %% Until then, reply 431 when the buffer is already longer than
  %% max_header_value_length (default 4096), or request more data while
  %% remembering the header name.
  parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
      ValueLimit = maps:get(max_header_value_length, Opts, 4096),
      case match_eol(Buffer, 0) of
          nomatch ->
              if
                  byte_size(Buffer) > ValueLimit ->
                      error_terminate(431, State#state{in_state=PS#ps_header{headers=H}},
                          {connection_error, limit_reached,
                              'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
                  true ->
                      {more, State#state{buffer=Buffer,
                          in_state=PS#ps_header{headers=H, name=N}}}
              end;
          _ ->
              parse_hd_value(Buffer, State, H, N, <<>>)
      end.
  %% Accumulate the header value up to the CRLF, strip its trailing
  %% whitespace and store it in the headers map.
  %%
  %% A repeated header is appended to the existing value, separated by
  %% ", ". The cookie header does not use proper HTTP header lists and
  %% is joined with "; " instead.
  parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers0, Name, SoFar) ->
      Value = clean_value_ws_end(SoFar, byte_size(SoFar) - 1),
      Headers = case Headers0 of
          #{Name := Previous} ->
              Separator = case Name of
                  <<"cookie">> -> <<"; ">>;
                  _ -> <<", ">>
              end,
              Headers0#{Name => << Previous/binary, Separator/binary, Value/binary >>};
          _ ->
              Headers0#{Name => Value}
      end,
      parse_header(Rest, S, Headers);
  parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
      parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).
  %% Strip trailing spaces and tabs from Value.
  %%
  %% N is the index of the last byte to examine; callers start with
  %% byte_size(Value) - 1. An N of -1 means that everything was
  %% whitespace (or that the value was empty to begin with).
  clean_value_ws_end(_, -1) ->
      <<>>;
  clean_value_ws_end(Value, N) ->
      Last = binary:at(Value, N),
      if
          Last =:= $\s; Last =:= $\t ->
              clean_value_ws_end(Value, N - 1);
          true ->
              Len = N + 1,
              << Trimmed:Len/binary, _/bits >> = Value,
              Trimmed
      end.
  604. -ifdef(TEST).
  %% EUnit generator: one test per {Input, Expected} pair, titled with
  %% the input binary. The match inside each fun is the assertion.
  clean_value_ws_end_test_() ->
      Accept = <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
          "text/html;level=2;q=0.4, */*;q=0.5">>,
      Cases = [
          {<<>>, <<>>},
          {<<" ">>, <<>>},
          {<< Accept/binary, " \t \t " >>, Accept}
      ],
      lists:map(fun({Input, Expected}) ->
          {Input, fun() ->
              Expected = clean_value_ws_end(Input, byte_size(Input) - 1)
          end}
      end, Cases).
  %% Benchmark for clean_value_ws_end/2 on a typical accept header value
  %% followed by trailing whitespace.
  %%
  %% NOTE(review): horse:repeat/2 comes from an external benchmarking
  %% tool; presumably it evaluates the given expression 200000 times --
  %% confirm against that tool's documentation before restructuring the
  %% expression (for example hoisting the literal into a variable).
  horse_clean_value_ws_end() ->
      horse:repeat(200000,
          clean_value_ws_end(
              <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                  "text/html;level=2;q=0.4, */*;q=0.5 ">>,
              %% Index of the last byte of the same literal.
              byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                  "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
      ).
  623. -endif.
  %% All headers have been received: validate the host header against
  %% the request line before building the request.
  %%
  %% - A host header equal to the absolute-form authority, or any host
  %%   header when there is no authority, is parsed further.
  %% - RFC7230 does not explicitly ask us to reject requests whose
  %%   authority component and host header differ. It DOES ask clients
  %%   to set them to the same value, so we enforce that.
  %% - HTTP/1.1 requests must include a host header; other versions
  %%   fall back to an empty host and the transport's default port.
  %%
  %% @todo When CONNECT requests come in we need to ignore the RawHost
  %% and instead use the Authority as the source of host.
  %% @todo Might want to not close the connection on the two errors.
  request(Buffer, State=#state{transport=Transport,
          in_state=PS=#ps_header{authority=Authority, version=Version}}, Headers) ->
      case Headers of
          #{<<"host">> := RawHost} when Authority =:= undefined; Authority =:= RawHost ->
              request_parse_host(Buffer, State, Headers, RawHost);
          #{<<"host">> := _} ->
              error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
                  {stream_error, protocol_error,
                      'The host header is different than the absolute-form authority component. (RFC7230 5.4)'});
          _ when Version =:= 'HTTP/1.1' ->
              error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
                  {stream_error, protocol_error,
                      'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
          _ ->
              request(Buffer, State, Headers, <<>>, default_port(Transport:secure()))
      end.
  %% Parse the raw host header into a host and a port.
  %%
  %% A missing port defaults to 443 for secure transports and 80
  %% otherwise. A port outside 1..65535, or a host header that
  %% cow_http_hd:parse_host/1 cannot parse, results in a 400 response.
  %%
  %% Both error paths store the parsed headers in the parser state
  %% before calling error_terminate/3, like every other header-phase
  %% error in this module, so that the headers are not lost when the
  %% error is reported.
  request_parse_host(Buffer, State=#state{transport=Transport, in_state=PS}, Headers, RawHost) ->
      try cow_http_hd:parse_host(RawHost) of
          {Host, undefined} ->
              request(Buffer, State, Headers, Host, default_port(Transport:secure()));
          {Host, Port} when Port > 0, Port =< 65535 ->
              request(Buffer, State, Headers, Host, Port);
          _ ->
              error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
                  {stream_error, protocol_error,
                      'The port component of the absolute-form is not in the range 0..65535. (RFC7230 2.7.1)'})
      catch _:_ ->
          error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
              {stream_error, protocol_error,
                  'The host header is invalid. (RFC7230 5.4)'})
      end.
  %% Default port for the connection: 443 when the argument is exactly
  %% true (secure transport), 80 for anything else.
  -spec default_port(boolean()) -> 80 | 443.
  default_port(Secure) ->
      case Secure of
          true -> 443;
          _ -> 80
      end.
  %% End of request parsing.
  %%
  %% Work out how the request body is framed, build the Req map and
  %% either return the request to the caller or hand the connection
  %% over to HTTP/2 when the client asked for an h2c upgrade.
  request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
          proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
              PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
          Headers0, Host, Port) ->
      Scheme = case Transport:secure() of
          true -> <<"https">>;
          false -> <<"http">>
      end,
      %% Body framing. Only "transfer-encoding: chunked" is supported and
      %% when it is used the content-length header is dropped. A
      %% content-length of exactly "0" and the absence of both headers
      %% mean that there is no body.
      {Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of
          #{<<"transfer-encoding">> := TransferEncoding} ->
              try cow_http_hd:parse_transfer_encoding(TransferEncoding) of
                  [<<"chunked">>] ->
                      {maps:remove(<<"content-length">>, Headers0),
                          true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
                  _ ->
                      error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
                          {stream_error, protocol_error,
                              'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'})
              catch _:_ ->
                  error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
                      {stream_error, protocol_error,
                          'The transfer-encoding header is invalid. (RFC7230 3.3.1)'})
              end;
          #{<<"content-length">> := BinLength} when BinLength =/= <<"0">> ->
              Length = try
                  cow_http_hd:parse_content_length(BinLength)
              catch _:_ ->
                  error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
                      {stream_error, protocol_error,
                          'The content-length header is invalid. (RFC7230 3.3.2)'})
              end,
              {Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
          _ ->
              {Headers0, false, 0, undefined, undefined}
      end,
      BaseReq = #{
          ref => Ref,
          pid => self(),
          streamid => StreamID,
          peer => Peer,
          sock => Sock,
          cert => Cert,
          method => Method,
          scheme => Scheme,
          host => Host,
          port => Port,
          path => Path,
          qs => Qs,
          version => Version,
          %% We are transparently taking care of transfer-encodings so
          %% the user code has no need to know about it.
          headers => maps:remove(<<"transfer-encoding">>, Headers),
          has_body => HasBody,
          body_length => BodyLength
      },
      %% We add the PROXY header information if any.
      Req = if
          ProxyHeader =:= undefined -> BaseReq;
          true -> BaseReq#{proxy_header => ProxyHeader}
      end,
      case is_http2_upgrade(Headers, Version) of
          {true, HTTP2Settings} ->
              %% We save the headers in case the upgrade will fail
              %% and we need to pass them to cowboy_stream:early_error.
              http2_upgrade(State0#state{in_state=PS#ps_header{headers=Headers}},
                  Buffer, HTTP2Settings, Req);
          false when HasBody ->
              %% The body of this request comes next on the connection.
              {request, Req, State0#state{buffer=Buffer, in_state=#ps_body{
                  length = BodyLength,
                  transfer_decode_fun = TDecodeFun,
                  transfer_decode_state = TDecodeState
              }}};
          false ->
              %% No body: the next bytes belong to the next request.
              {request, Req, State0#state{buffer=Buffer,
                  in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
      end.
  %% HTTP/2 upgrade.
  %%
  %% Returns {true, HTTP2Settings} for an HTTP/1.1 request that carries
  %% the connection, upgrade and http2-settings headers, lists both
  %% "upgrade" and "http2-settings" in the connection header and offers
  %% the "h2c" protocol. Returns false for everything else.
  %%
  %% @todo We must not upgrade to h2c over a TLS connection.
  is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
          <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
      Conns = cow_http_hd:parse_connection(Conn),
      HasTokens = lists:member(<<"upgrade">>, Conns)
          andalso lists:member(<<"http2-settings">>, Conns),
      %% The upgrade header is only parsed when both tokens are present.
      case HasTokens andalso lists:member(<<"h2c">>, cow_http_hd:parse_upgrade(Upgrade)) of
          true -> {true, HTTP2Settings};
          false -> false
      end;
  is_http2_upgrade(_, _) ->
      false.
  %% Prior knowledge upgrade, without an HTTP/1.1 request.
  %%
  %% Only allowed on clear-text connections: the timeout is cancelled
  %% and the connection is handed over to cowboy_http2 together with
  %% the data already buffered. On secure transports the request is
  %% rejected with a 400.
  http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
          proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
      case Transport:secure() of
          true ->
              Reason = {connection_error, protocol_error,
                  'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'},
              error_terminate(400, State, Reason);
          false ->
              _ = cancel_timeout(State),
              cowboy_http2:init(Parent, Ref, Socket, Transport,
                  ProxyHeader, Opts, Peer, Sock, Cert, Buffer)
      end.
  %% Upgrade via an HTTP/1.1 request.
  %%
  %% The HTTP2-Settings header value is decoded first. On success the
  %% timeout is cancelled and the connection, the decoded settings and
  %% the request are handed over to cowboy_http2. A header that cannot
  %% be decoded results in a 400. Only the decoding is protected by the
  %% try; failures inside cowboy_http2:init/12 are not caught here.
  %%
  %% @todo
  %% However if the client sent a body, we need to read the body in full
  %% and if we can't do that, return a 413 response. Some options are in order.
  %% Always half-closed stream coming from this side.
  http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
          proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
          Buffer, HTTP2Settings, Req) ->
      try cow_http_hd:parse_http2_settings(HTTP2Settings) of
          DecodedSettings ->
              _ = cancel_timeout(State),
              cowboy_http2:init(Parent, Ref, Socket, Transport,
                  ProxyHeader, Opts, Peer, Sock, Cert, Buffer, DecodedSettings, Req)
      catch _:_ ->
          Reason = {connection_error, protocol_error,
              'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'},
          error_terminate(400, State, Reason)
      end.
  %% Request body parsing.
  %%
  %% Runs the transfer decoding function over the buffer and translates
  %% its result: more data needed, a partial body (nofin) or the end of
  %% the body (fin). At the end of the body the parser goes back to
  %% expecting a request line for the next stream. A crash in the
  %% decoding function terminates the stream and the connection.
  %%
  %% @todo Proper trailers.
  parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
          PS=#ps_body{received=Received, transfer_decode_fun=TDecode,
              transfer_decode_state=TState0}}) ->
      %% Partial body: account for the bytes received and keep the
      %% decoder state for the next call.
      Partial = fun(Chunk, Remaining, NextTState) ->
          {data, StreamID, nofin, Chunk, State#state{buffer=Remaining,
              in_state=PS#ps_body{received=Received + byte_size(Chunk),
                  transfer_decode_state=NextTState}}}
      end,
      %% End of body: move on to the next request.
      Final = fun(Chunk, Remaining) ->
          {data, StreamID, fin, Chunk, State#state{buffer=Remaining,
              in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
      end,
      try TDecode(Buffer, TState0) of
          more ->
              {more, State#state{buffer=Buffer}};
          {more, Data, TState} ->
              Partial(Data, <<>>, TState);
          %% An integer third element is a length, not leftover data.
          {more, Data, Length, TState} when is_integer(Length) ->
              Partial(Data, <<>>, TState);
          {more, Data, Rest, TState} ->
              Partial(Data, Rest, TState);
          {done, _HasTrailers, Rest} ->
              Final(<<>>, Rest);
          {done, Data, _HasTrailers, Rest} ->
              Final(Data, Rest)
      catch _:_ ->
          Reason = {connection_error, protocol_error,
              'Failure to decode the content. (RFC7230 4)'},
          terminate(stream_terminate(State, StreamID, Reason), Reason)
      end.
  %% Message handling.
  %%
  %% Handle the exit of a process. The message is forwarded to the
  %% stream owning the process when that stream is still running. Exits
  %% of unknown processes are logged and otherwise ignored.
  down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
      case cowboy_children:down(Children0, Pid) of
          %% The process was unknown.
          error ->
              cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
                  [Msg, Pid], Opts),
              State;
          %% The stream was terminated already.
          {ok, undefined, Children} ->
              State#state{children=Children};
          %% The stream is still running.
          {ok, StreamID, Children} ->
              info(State#state{children=Children}, StreamID, Msg)
      end.
  %% Pass a message to the stream handlers of the given stream and run
  %% the commands they return. Messages for unknown streams are logged
  %% and dropped. An exception in cowboy_stream:info/3 is logged and
  %% terminates the stream with an internal_error reason.
  %%
  %% NOTE(review): erlang:get_stacktrace/0 is deprecated since OTP 21;
  %% moving to the Class:Exception:Stacktrace catch syntax requires
  %% dropping support for older releases -- confirm which OTP versions
  %% this module still has to build on.
  info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
      case lists:keyfind(StreamID, #stream.id, Streams0) of
          false ->
              cowboy:log(warning, "Received message ~p for unknown stream ~p.~n",
                  [Msg, StreamID], Opts),
              State;
          Stream = #stream{state=StreamState0} ->
              try cowboy_stream:info(StreamID, Msg, StreamState0) of
                  {Commands, StreamState} ->
                      Updated = Stream#stream{state=StreamState},
                      Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Updated),
                      commands(State#state{streams=Streams}, StreamID, Commands)
              catch Class:Exception ->
                  Log = cowboy_stream:make_error_log(info,
                      [StreamID, Msg, StreamState0],
                      Class, Exception, erlang:get_stacktrace()),
                  cowboy:log(Log, Opts),
                  stream_terminate(State, StreamID, {internal_error, {Class, Exception},
                      'Unhandled exception in cowboy_stream:info/3.'})
              end
      end.
  %% Commands.
  %%
  %% Execute, in order, the list of commands returned by the stream
  %% handlers for the stream StreamID, and return the updated state.
  %% Commands for a stream other than the one currently allowed to send
  %% its response (out_streamid) are queued in that stream, with the
  %% exception of spawn and internal_error commands which are handled
  %% before the check. The switch_protocol command hands the connection
  %% over to another protocol module instead of returning the state.
  commands(State, _, []) ->
      State;
  %% Supervise a child process.
  commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
      commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
          StreamID, Tail);
  %% Error handling.
  commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
      commands(stream_terminate(State, StreamID, Error), StreamID, Tail);
  %% Commands for a stream currently inactive.
  commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
          when Current =/= StreamID ->
      %% @todo We still want to handle some commands...
      Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
      Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
          Stream#stream{queue=Queue ++ Commands}),
      State#state{streams=Streams};
  %% Read the request body.
  commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) ->
      %% We must read *at least* Size of data otherwise functions
      %% like cowboy_req:read_body/1,2 will wait indefinitely.
      Flow = if
          Flow0 < 0 -> Size;
          true -> Flow0 + Size
      end,
      %% Reenable active mode if necessary.
      State = if
          Flow0 =< 0, Flow > 0 ->
              active(State0);
          true ->
              State0
      end,
      commands(State#state{flow=Flow}, StreamID, Tail);
  %% Error responses are sent only if a response wasn't sent already.
  commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
          [{error_response, Status, Headers0, Body}|Tail]) ->
      %% We close the connection when the error response is 408, as it
      %% indicates a timeout and the RFC recommends that we stop here. (RFC7231 6.5.7)
      Headers = case Status of
          408 -> Headers0#{<<"connection">> => <<"close">>};
          <<"408", _/bits>> -> Headers0#{<<"connection">> => <<"close">>};
          _ -> Headers0
      end,
      commands(State, StreamID, [{response, Status, Headers, Body}|Tail]);
  commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
      commands(State, StreamID, Tail);
  %% Send an informational response.
  commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
          StreamID, [{inform, StatusCode, Headers}|Tail]) ->
      %% @todo I'm pretty sure the last stream in the list is the one we want
      %% considering all others are queued.
      #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
      _ = case Version of
          'HTTP/1.1' ->
              Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
                  headers_to_list(Headers)));
          %% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
          'HTTP/1.0' ->
              ok
      end,
      commands(State, StreamID, Tail);
  %% Send a full response.
  %%
  %% @todo Kill the stream if it sent a response when one has already been sent.
  %% @todo Keep IsFin in the state.
  %% @todo Same two things above apply to DATA, possibly promise too.
  commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID,
          [{response, StatusCode, Headers0, Body}|Tail]) ->
      %% @todo I'm pretty sure the last stream in the list is the one we want
      %% considering all others are queued.
      #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
      {State1, Headers} = connection(State0, Headers0, StreamID, Version),
      State = State1#state{out_state=done},
      %% @todo Ensure content-length is set.
      Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
      case Body of
          {sendfile, _, _, _} ->
              Transport:send(Socket, Response),
              sendfile(State, Body);
          _ ->
              Transport:send(Socket, [Response, Body])
      end,
      commands(State, StreamID, Tail);
  %% Send response headers and initiate chunked encoding or streaming.
  commands(State0=#state{socket=Socket, transport=Transport,
          opts=Opts, overriden_opts=Override, streams=Streams0, out_state=OutState},
          StreamID, [{headers, StatusCode, Headers0}|Tail]) ->
      %% @todo Same as above (about the last stream in the list).
      Stream = #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0),
      Status = cow_http:status_to_integer(StatusCode),
      ContentLength = maps:get(<<"content-length">>, Headers0, undefined),
      %% Chunked transfer-encoding can be disabled on a per-request basis.
      Chunked = case Override of
          #{chunked := Chunked0} -> Chunked0;
          _ -> maps:get(chunked, Opts, true)
      end,
      {State1, Headers1} = case {Status, ContentLength, Version} of
          {204, _, 'HTTP/1.1'} ->
              {State0#state{out_state=done}, Headers0};
          {304, _, 'HTTP/1.1'} ->
              {State0#state{out_state=done}, Headers0};
          {_, undefined, 'HTTP/1.1'} when Chunked ->
              {State0#state{out_state=chunked}, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
          %% Close the connection after streaming without content-length
          %% to all HTTP/1.0 clients and to HTTP/1.1 clients when chunked is disabled.
          {_, undefined, _} ->
              {State0#state{out_state=streaming, last_streamid=StreamID}, Headers0};
          %% Stream the response body without chunked transfer-encoding.
          _ ->
              ExpectedSize = cow_http_hd:parse_content_length(ContentLength),
              Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
                  Stream#stream{local_expected_size=ExpectedSize}),
              {State0#state{out_state=streaming, streams=Streams}, Headers0}
      end,
      %% The trailer header is only kept when trailers can be sent.
      Headers2 = case stream_te(OutState, Stream) of
          trailers -> Headers1;
          _ -> maps:remove(<<"trailer">>, Headers1)
      end,
      {State, Headers} = connection(State1, Headers2, StreamID, Version),
      Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
      commands(State, StreamID, Tail);
  %% Send a response body chunk.
  %% @todo We need to kill the stream if it tries to send data before headers.
  commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
          StreamID, [{data, IsFin, Data}|Tail]) ->
      %% Do not send anything when the user asks to send an empty
      %% data frame, as that would break the protocol.
      Size = case Data of
          {sendfile, _, B, _} -> B;
          _ -> iolist_size(Data)
      end,
      %% Depending on the current state we may need to send nothing,
      %% the last chunk, chunked data with/without the last chunk,
      %% or just the data as-is.
      Stream = case lists:keyfind(StreamID, #stream.id, Streams0) of
          Stream0=#stream{method= <<"HEAD">>} ->
              Stream0;
          Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked ->
              Transport:send(Socket, <<"0\r\n\r\n">>),
              Stream0;
          Stream0 when Size =:= 0 ->
              Stream0;
          Stream0 when is_tuple(Data), OutState =:= chunked ->
              Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]),
              sendfile(State0, Data),
              Transport:send(Socket,
                  case IsFin of
                      fin -> <<"\r\n0\r\n\r\n">>;
                      nofin -> <<"\r\n">>
                  end),
              Stream0;
          Stream0 when OutState =:= chunked ->
              Transport:send(Socket, [
                  integer_to_binary(Size, 16), <<"\r\n">>, Data,
                  case IsFin of
                      fin -> <<"\r\n0\r\n\r\n">>;
                      nofin -> <<"\r\n">>
                  end
              ]),
              Stream0;
          Stream0 when OutState =:= streaming ->
              #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0,
              SentSize = SentSize0 + Size,
              if
                  %% ExpectedSize may be undefined, which is > any integer value.
                  SentSize > ExpectedSize ->
                      terminate(State0, response_body_too_large);
                  is_tuple(Data) ->
                      sendfile(State0, Data);
                  true ->
                      Transport:send(Socket, Data)
              end,
              Stream0#stream{local_sent_size=SentSize}
      end,
      State = case IsFin of
          fin -> State0#state{out_state=done};
          nofin -> State0
      end,
      Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
      commands(State#state{streams=Streams}, StreamID, Tail);
  %% Send trailers. They are only sent when stream_te/2 allows them;
  %% otherwise the chunked body is simply ended, or nothing is sent
  %% when the response is not chunked.
  commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
          StreamID, [{trailers, Trailers}|Tail]) ->
      case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
          trailers ->
              Transport:send(Socket, [
                  <<"0\r\n">>,
                  cow_http:headers(maps:to_list(Trailers)),
                  <<"\r\n">>
              ]);
          no_trailers ->
              Transport:send(Socket, <<"0\r\n\r\n">>);
          not_chunked ->
              ok
      end,
      commands(State#state{out_state=done}, StreamID, Tail);
  %% Protocol takeover.
  commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
          out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
          [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
      %% @todo If there's streams opened after this one, fail instead of 101.
      State1 = cancel_timeout(State0),
      %% Before we send the 101 response we need to stop receiving data
      %% from the socket, otherwise the data might be received before the
      %% call to flush/1 and we end up inadvertently dropping a packet.
      %%
      %% @todo Handle cases where the request came with a body. We need
      %% to process or skip the body before the upgrade can be completed.
      State = passive(State1),
      %% Send a 101 response if necessary, then terminate the stream.
      #state{streams=Streams} = case OutState of
          wait -> info(State, StreamID, {inform, 101, Headers});
          _ -> State
      end,
      #stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
      %% @todo We need to shutdown processes here first.
      stream_call_terminate(StreamID, switch_protocol, StreamState, State),
      %% Terminate children processes and flush any remaining messages from the mailbox.
      cowboy_children:terminate(Children),
      flush(Parent),
      Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
  %% Set options dynamically.
  commands(State0=#state{overriden_opts=Opts0},
          StreamID, [{set_options, SetOpts}|Tail]) ->
      State1 = case SetOpts of
          #{idle_timeout := IdleTimeout} ->
              set_timeout(State0#state{overriden_opts=Opts0#{idle_timeout => IdleTimeout}},
                  idle_timeout);
          _ ->
              State0
      end,
      State = case SetOpts of
          #{chunked := Chunked} ->
              %% Build on the options held by State1, not on Opts0:
              %% otherwise an idle_timeout override set just above by
              %% the same command would be dropped from overriden_opts.
              #state{overriden_opts=Opts1} = State1,
              State1#state{overriden_opts=Opts1#{chunked => Chunked}};
          _ ->
              State1
      end,
      commands(State, StreamID, Tail);
  %% Stream shutdown.
  commands(State, StreamID, [stop|Tail]) ->
      %% @todo Do we want to run the commands after a stop?
      %% @todo We currently wait for the stop command before we
      %% continue with the next request/response. In theory, if
      %% the request body was read fully and the response body
      %% was sent fully we should be able to start working on
      %% the next request concurrently. This can be done as a
      %% future optimization.
      maybe_terminate(State, StreamID, Tail);
  %% Log event.
  commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
      cowboy:log(Log, Opts),
      commands(State, StreamID, Tail);
  %% HTTP/1.1 does not support push; ignore.
  commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
      commands(State, StreamID, Tail).
  %% Convert the headers map into a list of {Name, Value} pairs.
  %%
  %% The set-cookie header is special; we can only send one cookie per
  %% header. Its value is a list and is expanded into one set-cookie
  %% pair per element, placed after all the other headers.
  headers_to_list(Headers) ->
      case Headers of
          #{<<"set-cookie">> := Cookies} ->
              Others = maps:to_list(maps:remove(<<"set-cookie">>, Headers)),
              Others ++ [{<<"set-cookie">>, Cookie} || Cookie <- Cookies];
          _ ->
              maps:to_list(Headers)
      end.
  %% Send Bytes bytes of the file at Path, starting at Offset.
  %%
  %% The transport's own sendfile is used unless the sendfile option is
  %% set to false, in which case the ranch_transport fallback is called
  %% explicitly. Returns ok; the result of the send itself is ignored.
  %%
  %% The call is wrapped in a try/catch because on OTP-20 and earlier a
  %% few different crashes could occur for sockets that were closing or
  %% closed, for example a badarg in erlang:port_get_data(#Port<...>)
  %% or a badmatch like
  %% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
  %%
  %% OTP-21 uses a NIF instead of a port so the implementation and
  %% behavior has dramatically changed and it is unclear whether the
  %% try/catch will be necessary in the future.
  %%
  %% Catching these errors prevents some noisy logs from being written;
  %% the connection is terminated with a socket_error reason instead.
  sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts},
          {sendfile, Offset, Bytes, Path}) ->
      try
          %% When sendfile is disabled we explicitly use the fallback.
          case maps:get(sendfile, Opts, true) of
              true -> Transport:sendfile(Socket, Path, Offset, Bytes);
              false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
          end
      of
          _ -> ok
      catch _:_ ->
          terminate(State, {socket_error, sendfile_crash,
              'An error occurred when using the sendfile function.'})
      end.
  %% Flush messages specific to cowboy_http before handing over the
  %% connection to another protocol.
  %%
  %% Removes from the mailbox, without blocking: {timeout, _, _}
  %% messages, messages of the form {{Pid, _}, _} where Pid is the
  %% calling process, and 'EXIT' messages from any process other than
  %% Parent. Any other message is left in the mailbox.
  flush(Parent) ->
      Self = self(),
      receive
          {timeout, _, _} ->
              flush(Parent);
          {{Self, _}, _} ->
              flush(Parent);
          {'EXIT', From, _} when From =/= Parent ->
              flush(Parent)
      after 0 ->
          ok
      end.
  %% Terminate the stream normally. When it was the last stream of the
  %% connection (last_streamid), terminate the connection as well.
  %%
  %% @todo In these cases I'm not sure if we should continue processing commands.
  maybe_terminate(State=#state{last_streamid=LastStreamID}, StreamID, _Tail) ->
      Stopped = stream_terminate(State, StreamID, normal),
      case LastStreamID of
          StreamID -> terminate(Stopped, normal); %% @todo Reason ok?
          _ -> Stopped
      end.
  %% Terminate a stream.
  %%
  %% The response is completed first when necessary: a 500, 400 or 204
  %% response when nothing was sent yet, or the final chunk of a chunked
  %% HTTP/1.1 response. The stream is then removed from the state, its
  %% handlers are terminated and its processes shut down. Finally we
  %% either terminate the connection, when the request body cannot be
  %% skipped, or move on to the next stream and run its queued commands.
  stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
          out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
          children=Children0}, StreamID, Reason) ->
      #stream{version=Version, local_expected_size=ExpectedSize, local_sent_size=SentSize}
          = lists:keyfind(StreamID, #stream.id, Streams0),
      Completed = #state{streams=Streams1} = case OutState of
          wait ->
              %% Nothing was sent yet; the status depends on the reason.
              %% The guards fail silently when Reason is not a tuple.
              Response = if
                  element(1, Reason) =:= internal_error ->
                      {response, 500, #{<<"content-length">> => <<"0">>}, <<>>};
                  element(1, Reason) =:= connection_error ->
                      {response, 400, #{<<"content-length">> => <<"0">>}, <<>>};
                  true ->
                      {response, 204, #{}, <<>>}
              end,
              info(State0, StreamID, Response);
          chunked when Version =:= 'HTTP/1.1' ->
              info(State0, StreamID, {data, fin, <<>>});
          streaming when SentSize < ExpectedSize ->
              terminate(State0, response_body_too_small);
          _ -> %% done or Version =:= 'HTTP/1.0'
              State0
      end,
      %% Remove the stream from the state and reset the overriden options.
      {value, #stream{state=StreamState}, Streams}
          = lists:keytake(StreamID, #stream.id, Streams1),
      Cleared = Completed#state{streams=Streams, overriden_opts=#{}, flow=infinity},
      %% Stop the stream.
      stream_call_terminate(StreamID, Reason, StreamState, Cleared),
      Children = cowboy_children:shutdown(Children0, StreamID),
      %% We reset the timeout if there are no active streams anymore.
      State = set_timeout(Cleared#state{streams=Streams, children=Children}, request_timeout),
      %% We want to drop the connection if the body was not read fully
      %% and we don't know its length or more remains to be read than
      %% configuration allows.
      %% @todo Only do this if Current =:= StreamID.
      MaxSkipBodyLength = maps:get(max_skip_body_length, Opts, 1000000),
      SkipBody = case InState of
          #ps_body{length=undefined}
                  when InStreamID =:= OutStreamID ->
              unknown_length;
          #ps_body{length=Len, received=Received}
                  when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
              too_large;
          _ ->
              ok
      end,
      case SkipBody of
          unknown_length ->
              terminate(State, skip_body_unknown_length);
          too_large ->
              terminate(State, skip_body_too_large);
          ok ->
              %% Move on to the next stream.
              NextOutStreamID = OutStreamID + 1,
              Next = State#state{out_streamid=NextOutStreamID, out_state=wait},
              case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
                  false ->
                      Next;
                  #stream{queue=Commands} ->
                      %% @todo Remove queue from the stream.
                      commands(Next, NextOutStreamID, Commands)
              end
      end.
  %% Call the terminate callback of the stream handlers. An exception
  %% raised by the callback is logged and not propagated.
  %%
  %% NOTE(review): erlang:get_stacktrace/0 is deprecated since OTP 21;
  %% see which OTP versions must be supported before replacing it with
  %% the Class:Exception:Stacktrace catch syntax.
  stream_call_terminate(StreamID, Reason, StreamState, #state{opts=Opts}) ->
      try
          cowboy_stream:terminate(StreamID, Reason, StreamState)
      catch Class:Exception ->
          Log = cowboy_stream:make_error_log(terminate,
              [StreamID, Reason, StreamState],
              Class, Exception, erlang:get_stacktrace()),
          cowboy:log(Log, Opts)
      end.
  %% Decide from the request whether the connection must be closed
  %% after the response (close) or may be kept open (keepalive).
  %%
  %% HTTP/1.0 connections are closed unless the request asks for
  %% keep-alive and the http10_keepalive option is not set to false.
  %% HTTP/1.1 connections are kept open unless the connection header
  %% contains "close".
  maybe_req_close(State, Headers, 'HTTP/1.0') ->
      case State of
          #state{opts=#{http10_keepalive := false}} ->
              close;
          _ ->
              case Headers of
                  #{<<"connection">> := Conn} ->
                      Conns = cow_http_hd:parse_connection(Conn),
                      case lists:member(<<"keep-alive">>, Conns) of
                          true -> keepalive;
                          false -> close
                      end;
                  _ ->
                      close
              end
      end;
  maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
      case connection_hd_is_close(Conn) of
          true -> close;
          false -> keepalive
      end;
  maybe_req_close(_, _, _) ->
      keepalive.
  %% Adjust the connection header of a response and keep track of the
  %% last stream of the connection.
  %%
  %% Arguments are the connection state, the response headers, the ID
  %% of the stream being answered and the HTTP version. Returns
  %% {State, Headers}. Clause order matters: the first two clauses
  %% only match when the stream ID equals last_streamid.
  connection(State=#state{last_streamid=LastID}, RespHeaders=#{<<"connection">> := ConnHd}, LastID, _Version) ->
      case connection_hd_is_close(ConnHd) of
          true ->
              {State, RespHeaders};
          %% @todo Here we need to remove keep-alive and add close, not just add close.
          false ->
              {State, RespHeaders#{<<"connection">> => [<<"close, ">>, ConnHd]}}
      end;
  connection(State=#state{last_streamid=LastID}, RespHeaders, LastID, _Version) ->
      {State, RespHeaders#{<<"connection">> => <<"close">>}};
  connection(State, RespHeaders=#{<<"connection">> := ConnHd}, StreamID, _Version) ->
      case connection_hd_is_close(ConnHd) of
          true ->
              %% The response asks to close: this stream becomes the last one.
              {State#state{last_streamid=StreamID}, RespHeaders};
          %% @todo Here we need to set keep-alive only if it wasn't set before.
          false ->
              {State, RespHeaders}
      end;
  connection(State, RespHeaders, _StreamID, 'HTTP/1.0') ->
      {State, RespHeaders#{<<"connection">> => <<"keep-alive">>}};
  connection(State, RespHeaders, _StreamID, _Version) ->
      {State, RespHeaders}.
  %% Tell whether a connection header value contains the close token.
  %% The value is flattened with iolist_to_binary/1 before parsing.
  connection_hd_is_close(Conn) ->
      lists:member(<<"close">>,
          cow_http_hd:parse_connection(iolist_to_binary(Conn))).
  %% Compute the TE information for a stream.
  %%
  %% Returns not_chunked when the first argument is streaming,
  %% no_trailers when the stream has no TE header or when it cannot
  %% be parsed, and otherwise the first element of the tuple returned
  %% by cow_http_hd:parse_te/1.
  stream_te(streaming, _Stream) ->
      not_chunked;
  %% No TE header was sent.
  stream_te(_Other, #stream{te=undefined}) ->
      no_trailers;
  stream_te(_Other, #stream{te=RawTE}) ->
      try cow_http_hd:parse_te(RawTE) of
          {Trailers, _} ->
              Trailers
      catch _:_ ->
          %% If we can't parse the TE header, assume we can't send trailers.
          no_trailers
      end.
  %% This function is only called when an error occurs on a new stream.
  %%
  %% It builds a partial request from whatever was parsed so far,
  %% sends an early error response carrying "connection: close" and
  %% then terminates the connection process.
  -spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
  error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamState}, Reason) ->
      %% Fields that are known regardless of the parsing stage.
      Base = #{ref => Ref, peer => Peer},
      PartialReq = case StreamState of
          #ps_request_line{} ->
              Base;
          #ps_header{method=Method, path=Path, qs=Qs,
                  version=Version, headers=ReqHeaders} ->
              %% Headers may not have been parsed yet.
              Headers = case ReqHeaders of
                  undefined -> #{};
                  _ -> ReqHeaders
              end,
              Base#{
                  method => Method,
                  path => Path,
                  qs => Qs,
                  version => Version,
                  headers => Headers
              }
      end,
      early_error(StatusCode, State, Reason, PartialReq, #{<<"connection">> => <<"close">>}),
      terminate(State, Reason).
  %% Send an early error response without any extra response headers.
  early_error(StatusCode, State, Reason, PartialReq) ->
      NoExtraHeaders = #{},
      early_error(StatusCode, State, Reason, PartialReq, NoExtraHeaders).
  %% Send an early error response on the socket.
  %%
  %% A default response with an empty body and "content-length: 0" is
  %% built from StatusCode0 and RespHeaders0, then handed to
  %% cowboy_stream:early_error/5 which may return a different
  %% response. If that call raises, the exception is logged and the
  %% default response is sent instead. Always returns ok.
  early_error(StatusCode0, #state{socket=Socket, transport=Transport,
          opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) ->
      RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>},
      Resp = {response, StatusCode0, RespHeaders1, <<>>},
      try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
          {response, StatusCode, RespHeaders, RespBody} ->
              Transport:send(Socket, [
                  cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
                  %% @todo We shouldn't send the body when the method is HEAD.
                  %% @todo Technically we allow the sendfile tuple.
                  RespBody
              ])
      catch Class:Exception:Stacktrace ->
          %% The stacktrace is bound in the catch pattern because
          %% erlang:get_stacktrace/0 is deprecated since OTP 21 and is
          %% no longer available in recent releases.
          cowboy:log(cowboy_stream:make_error_log(early_error,
              [StreamID, Reason, PartialReq, Resp, Opts],
              Class, Exception, Stacktrace), Opts),
          %% We still need to send an error response, so send what we initially
          %% wanted to send. It's better than nothing.
          Transport:send(Socket, cow_http:response(StatusCode0,
              'HTTP/1.1', maps:to_list(RespHeaders1)))
      end,
      ok.
  %% Terminate the connection process with exit reason
  %% {shutdown, Reason}.
  %%
  %% When a state is available, all streams are terminated first, then
  %% the children, and the socket is lingered on before exiting.
  -spec terminate(_, _) -> no_return().
  terminate(undefined, Reason) ->
      exit({shutdown, Reason});
  terminate(State=#state{streams=Streams, children=Children}, Reason) ->
      ExitReason = {shutdown, Reason},
      terminate_all_streams(State, Streams, Reason),
      cowboy_children:terminate(Children),
      terminate_linger(State),
      exit(ExitReason).
  %% Call the terminate callback of every stream in the list, in list
  %% order, with the given reason. Returns ok.
  terminate_all_streams(State, Streams, Reason) ->
      lists:foreach(fun(#stream{id=StreamID, state=StreamState}) ->
          stream_call_terminate(StreamID, Reason, StreamState, State)
      end, Streams).
  %% Shut down the write side of the socket and then wait according to
  %% the linger_timeout option (1000 when not set).
  %%
  %% A timeout of 0 skips the wait, infinity waits without a timer,
  %% and any other value starts a timer that sends a linger_timeout
  %% message to this process. Nothing is done when the shutdown fails.
  terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
      case Transport:shutdown(Socket, write) of
          {error, _} ->
              ok;
          ok ->
              LingerTimeout = maps:get(linger_timeout, Opts, 1000),
              case LingerTimeout of
                  0 ->
                      ok;
                  infinity ->
                      terminate_linger_loop(State, undefined);
                  _ ->
                      Timer = erlang:start_timer(LingerTimeout, self(), linger_timeout),
                      terminate_linger_loop(State, Timer)
              end
      end.
  %% Wait on the socket until it is closed, errors out, or the linger
  %% timer fires. Socket data and unrelated messages are discarded and
  %% the loop continues. Returns ok.
  terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
      Tags = Transport:messages(),
      %% We may already have a message in the mailbox when we do this
      %% but it's OK because we are shutting down anyway.
      %% @todo Use active,N here as well.
      case Transport:setopts(Socket, [{active, once}]) of
          {error, _} ->
              ok;
          ok ->
              receive
                  %% Data received on the socket: ignore it and keep waiting.
                  {DataTag, Socket, _} when DataTag =:= element(1, Tags) ->
                      terminate_linger_loop(State, TimerRef);
                  {ClosedTag, Socket} when ClosedTag =:= element(2, Tags) ->
                      ok;
                  {ErrorTag, Socket, _} when ErrorTag =:= element(3, Tags) ->
                      ok;
                  {timeout, TimerRef, linger_timeout} ->
                      ok;
                  _ ->
                      terminate_linger_loop(State, TimerRef)
              end
      end.
  %% System callbacks.

  %% Resume the connection loop with the given state.
  -spec system_continue(_, _, #state{}) -> ok.
  system_continue(_Parent, _Debug, State) ->
      loop(State).
  %% Terminate the connection following a system terminate request.
  %%
  %% The state is passed as-is to terminate/2, which only accepts
  %% undefined or a #state{} record; the spec reflects that and agrees
  %% with system_continue/3.
  -spec system_terminate(any(), _, _, #state{}) -> no_return().
  system_terminate(Reason, _, _, State) ->
      terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}).
  %% Code change system callback: the state is returned unchanged.
  %%
  %% The state type in the spec is #state{}, the same type that
  %% system_continue/3 declares for this argument.
  -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::#state{}.
  system_code_change(Misc, _, _, _) ->
      {ok, Misc}.