cowboy_http.erl 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http).
  15. -ifdef(OTP_RELEASE).
  16. -compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}).
  17. -endif.
  18. -export([init/6]).
  19. -export([system_continue/3]).
  20. -export([system_terminate/4]).
  21. -export([system_code_change/4]).
  22. -type opts() :: #{
  23. chunked => boolean(),
  24. compress_buffering => boolean(),
  25. compress_threshold => non_neg_integer(),
  26. connection_type => worker | supervisor,
  27. env => cowboy_middleware:env(),
  28. http10_keepalive => boolean(),
  29. idle_timeout => timeout(),
  30. inactivity_timeout => timeout(),
  31. linger_timeout => timeout(),
  32. logger => module(),
  33. max_authority_length => non_neg_integer(),
  34. max_empty_lines => non_neg_integer(),
  35. max_header_name_length => non_neg_integer(),
  36. max_header_value_length => non_neg_integer(),
  37. max_headers => non_neg_integer(),
  38. max_keepalive => non_neg_integer(),
  39. max_method_length => non_neg_integer(),
  40. max_request_line_length => non_neg_integer(),
  41. metrics_callback => cowboy_metrics_h:metrics_callback(),
  42. middlewares => [module()],
  43. proxy_header => boolean(),
  44. request_timeout => timeout(),
  45. sendfile => boolean(),
  46. shutdown_timeout => timeout(),
  47. stream_handlers => [module()],
  48. tracer_callback => cowboy_tracer_h:tracer_callback(),
  49. tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
  50. %% Open ended because configured stream handlers might add options.
  51. _ => _
  52. }.
  53. -export_type([opts/0]).
  54. -record(ps_request_line, {
  55. empty_lines = 0 :: non_neg_integer()
  56. }).
  57. -record(ps_header, {
  58. method = undefined :: binary(),
  59. authority = undefined :: binary() | undefined,
  60. path = undefined :: binary(),
  61. qs = undefined :: binary(),
  62. version = undefined :: cowboy:http_version(),
  63. headers = undefined :: cowboy:http_headers() | undefined,
  64. name = undefined :: binary() | undefined
  65. }).
  66. %% @todo We need a state where we wait for the stream process to ask for the body
  67. %% and do not attempt to read from the socket while in that state (we should read
  68. %% up to a certain length, and then wait, basically implementing flow control but
  69. %% by not reading from the socket when the window is empty).
  70. -record(ps_body, {
  71. length :: non_neg_integer() | undefined,
  72. received = 0 :: non_neg_integer(),
  73. %% @todo flow
  74. transfer_decode_fun :: fun(), %% @todo better type
  75. transfer_decode_state :: any() %% @todo better type
  76. }).
  77. -record(stream, {
  78. id = undefined :: cowboy_stream:streamid(),
  79. %% Stream handlers and their state.
  80. state = undefined :: {module(), any()},
  81. %% Request method.
  82. method = undefined :: binary(),
  83. %% Client HTTP version for this stream.
  84. version = undefined :: cowboy:http_version(),
  85. %% Unparsed te header. Used to know if we can send trailers.
  86. te :: undefined | binary(),
  87. %% Expected body size.
  88. local_expected_size = undefined :: undefined | non_neg_integer(),
  89. %% Sent body size.
  90. local_sent_size = 0 :: non_neg_integer(),
  91. %% Commands queued.
  92. queue = [] :: cowboy_stream:commands()
  93. }).
  94. -type stream() :: #stream{}.
  95. -record(state, {
  96. parent :: pid(),
  97. ref :: ranch:ref(),
  98. socket :: inet:socket(),
  99. transport :: module(),
  100. proxy_header :: undefined | ranch_proxy_header:proxy_info(),
  101. opts = #{} :: cowboy:opts(),
  102. %% Some options may be overriden for the current stream.
  103. overriden_opts = #{} :: cowboy:opts(),
  104. %% Remote address and port for the connection.
  105. peer = undefined :: {inet:ip_address(), inet:port_number()},
  106. %% Local address and port for the connection.
  107. sock = undefined :: {inet:ip_address(), inet:port_number()},
  108. %% Client certificate (TLS only).
  109. cert :: undefined | binary(),
  110. timer = undefined :: undefined | reference(),
  111. %% Identifier for the stream currently being read (or waiting to be received).
  112. in_streamid = 1 :: pos_integer(),
  113. %% Parsing state for the current stream or stream-to-be.
  114. in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
  115. %% Identifier for the stream currently being written.
  116. %% Note that out_streamid =< in_streamid.
  117. out_streamid = 1 :: pos_integer(),
  118. %% Whether we finished writing data for the current stream.
  119. out_state = wait :: wait | chunked | streaming | done,
  120. %% The connection will be closed after this stream.
  121. last_streamid = undefined :: pos_integer(),
  122. %% Currently active HTTP/1.1 streams.
  123. streams = [] :: [stream()],
  124. %% Children processes created by streams.
  125. children = cowboy_children:init() :: cowboy_children:children()
  126. }).
  127. -include_lib("cowlib/include/cow_inline.hrl").
  128. -include_lib("cowlib/include/cow_parse.hrl").
  129. -spec init(pid(), ranch:ref(), inet:socket(), module(),
  130. ranch_proxy_header:proxy_info(), cowboy:opts()) -> ok.
  131. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
  132. Peer0 = Transport:peername(Socket),
  133. Sock0 = Transport:sockname(Socket),
  134. Cert1 = case Transport:name() of
  135. ssl ->
  136. case ssl:peercert(Socket) of
  137. {error, no_peercert} ->
  138. {ok, undefined};
  139. Cert0 ->
  140. Cert0
  141. end;
  142. _ ->
  143. {ok, undefined}
  144. end,
  145. case {Peer0, Sock0, Cert1} of
  146. {{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
  147. LastStreamID = maps:get(max_keepalive, Opts, 100),
  148. before_loop(set_timeout(#state{
  149. parent=Parent, ref=Ref, socket=Socket,
  150. transport=Transport, proxy_header=ProxyHeader, opts=Opts,
  151. peer=Peer, sock=Sock, cert=Cert,
  152. last_streamid=LastStreamID}), <<>>);
  153. {{error, Reason}, _, _} ->
  154. terminate(undefined, {socket_error, Reason,
  155. 'A socket error occurred when retrieving the peer name.'});
  156. {_, {error, Reason}, _} ->
  157. terminate(undefined, {socket_error, Reason,
  158. 'A socket error occurred when retrieving the sock name.'});
  159. {_, _, {error, Reason}} ->
  160. terminate(undefined, {socket_error, Reason,
  161. 'A socket error occurred when retrieving the client TLS certificate.'})
  162. end.
  163. before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
  164. %% @todo disable this when we get to the body, until the stream asks for it?
  165. %% Perhaps have a threshold for how much we're willing to read before waiting.
  166. Transport:setopts(Socket, [{active, once}]),
  167. loop(State, Buffer).
  168. loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
  169. timer=TimerRef, children=Children, in_streamid=InStreamID,
  170. last_streamid=LastStreamID, streams=Streams}, Buffer) ->
  171. Messages = Transport:messages(),
  172. InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
  173. receive
  174. %% Discard data coming in after the last request
  175. %% we want to process was received fully.
  176. {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
  177. before_loop(State, Buffer);
  178. %% Socket messages.
  179. {OK, Socket, Data} when OK =:= element(1, Messages) ->
  180. %% Only reset the timeout if it is idle_timeout (active streams).
  181. State1 = case Streams of
  182. [] -> State;
  183. _ -> set_timeout(State)
  184. end,
  185. parse(<< Buffer/binary, Data/binary >>, State1);
  186. {Closed, Socket} when Closed =:= element(2, Messages) ->
  187. terminate(State, {socket_error, closed, 'The socket has been closed.'});
  188. {Error, Socket, Reason} when Error =:= element(3, Messages) ->
  189. terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
  190. %% Timeouts.
  191. {timeout, Ref, {shutdown, Pid}} ->
  192. cowboy_children:shutdown_timeout(Children, Ref, Pid),
  193. loop(State, Buffer);
  194. {timeout, TimerRef, Reason} ->
  195. timeout(State, Reason);
  196. {timeout, _, _} ->
  197. loop(State, Buffer);
  198. %% System messages.
  199. {'EXIT', Parent, Reason} ->
  200. terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
  201. {system, From, Request} ->
  202. sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
  203. %% Messages pertaining to a stream.
  204. {{Pid, StreamID}, Msg} when Pid =:= self() ->
  205. loop(info(State, StreamID, Msg), Buffer);
  206. %% Exit signal from children.
  207. Msg = {'EXIT', Pid, _} ->
  208. loop(down(State, Pid, Msg), Buffer);
  209. %% Calls from supervisor module.
  210. {'$gen_call', From, Call} ->
  211. cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
  212. loop(State, Buffer);
  213. %% Unknown messages.
  214. Msg ->
  215. cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
  216. loop(State, Buffer)
  217. after InactivityTimeout ->
  218. terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
  219. end.
  220. %% We set request_timeout when there are no active streams,
  221. %% and idle_timeout otherwise.
  222. set_timeout(State0=#state{opts=Opts, overriden_opts=Override, streams=Streams}) ->
  223. State = cancel_timeout(State0),
  224. {Name, Default} = case Streams of
  225. [] -> {request_timeout, 5000};
  226. _ -> {idle_timeout, 60000}
  227. end,
  228. Timeout = case Override of
  229. %% The timeout may have been overriden for the current stream.
  230. #{Name := Timeout0} -> Timeout0;
  231. _ -> maps:get(Name, Opts, Default)
  232. end,
  233. TimerRef = case Timeout of
  234. infinity -> undefined;
  235. Timeout -> erlang:start_timer(Timeout, self(), Name)
  236. end,
  237. State#state{timer=TimerRef}.
  238. cancel_timeout(State=#state{timer=TimerRef}) ->
  239. ok = case TimerRef of
  240. undefined ->
  241. ok;
  242. _ ->
  243. %% Do a synchronous cancel and remove the message if any
  244. %% to avoid receiving stray messages.
  245. _ = erlang:cancel_timer(TimerRef),
  246. receive
  247. {timeout, TimerRef, _} -> ok
  248. after 0 ->
  249. ok
  250. end
  251. end,
  252. State#state{timer=undefined}.
  253. -spec timeout(_, _) -> no_return().
  254. timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
  255. terminate(State, {connection_error, timeout,
  256. 'No request-line received before timeout.'});
  257. timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
  258. error_terminate(408, State, {connection_error, timeout,
  259. 'Request headers not received before timeout.'});
  260. timeout(State, idle_timeout) ->
  261. terminate(State, {connection_error, timeout,
  262. 'Connection idle longer than configuration allows.'}).
  263. parse(<<>>, State) ->
  264. before_loop(State, <<>>);
  265. %% Do not process requests that come in after the last request
  266. %% and discard the buffer if any to save memory.
  267. parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
  268. last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
  269. before_loop(State, <<>>);
  270. parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
  271. after_parse(parse_request(Buffer, State, EmptyLines));
  272. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
  273. after_parse(parse_header(Buffer,
  274. State#state{in_state=PS#ps_header{headers=undefined}},
  275. Headers));
  276. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
  277. after_parse(parse_hd_before_value(Buffer,
  278. State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
  279. Headers, Name));
  280. parse(Buffer, State=#state{in_state=#ps_body{}}) ->
  281. %% @todo We do not want to get the body automatically if the request doesn't ask for it.
  282. %% We may want to get bodies that are below a threshold without waiting, and buffer them
  283. %% until the request asks, though.
  284. after_parse(parse_body(Buffer, State)).
  285. after_parse({request, Req=#{streamid := StreamID, method := Method,
  286. headers := Headers, version := Version},
  287. State0=#state{opts=Opts, streams=Streams0}, Buffer}) ->
  288. try cowboy_stream:init(StreamID, Req, Opts) of
  289. {Commands, StreamState} ->
  290. TE = maps:get(<<"te">>, Headers, undefined),
  291. Streams = [#stream{id=StreamID, state=StreamState,
  292. method=Method, version=Version, te=TE}|Streams0],
  293. State1 = case maybe_req_close(State0, Headers, Version) of
  294. close -> State0#state{streams=Streams, last_streamid=StreamID};
  295. keepalive -> State0#state{streams=Streams}
  296. end,
  297. State = set_timeout(State1),
  298. parse(Buffer, commands(State, StreamID, Commands))
  299. catch Class:Exception ->
  300. cowboy:log(cowboy_stream:make_error_log(init,
  301. [StreamID, Req, Opts],
  302. Class, Exception, erlang:get_stacktrace()), Opts),
  303. early_error(500, State0, {internal_error, {Class, Exception},
  304. 'Unhandled exception in cowboy_stream:init/3.'}, Req),
  305. parse(Buffer, State0)
  306. end;
  307. %% Streams are sequential so the body is always about the last stream created
  308. %% unless that stream has terminated.
  309. after_parse({data, StreamID, IsFin, Data, State=#state{opts=Opts,
  310. streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
  311. try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
  312. {Commands, StreamState} ->
  313. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  314. Stream#stream{state=StreamState}),
  315. parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
  316. catch Class:Exception ->
  317. cowboy:log(cowboy_stream:make_error_log(data,
  318. [StreamID, IsFin, Data, StreamState0],
  319. Class, Exception, erlang:get_stacktrace()), Opts),
  320. stream_reset(State, StreamID, {internal_error, {Class, Exception},
  321. 'Unhandled exception in cowboy_stream:data/4.'})
  322. end;
  323. %% No corresponding stream. We must skip the body of the previous request
  324. %% in order to process the next one.
  325. after_parse({data, _, _, _, State, Buffer}) ->
  326. before_loop(State, Buffer);
  327. after_parse({more, State, Buffer}) ->
  328. before_loop(State, Buffer).
  329. %% Request-line.
  330. -spec parse_request(Buffer, State, non_neg_integer())
  331. -> {request, cowboy_req:req(), State, Buffer}
  332. | {data, cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State, Buffer}
  333. | {more, State, Buffer}
  334. when Buffer::binary(), State::#state{}.
  335. %% Empty lines must be using \r\n.
  336. parse_request(<< $\n, _/bits >>, State, _) ->
  337. error_terminate(400, State, {connection_error, protocol_error,
  338. 'Empty lines between requests must use the CRLF line terminator. (RFC7230 3.5)'});
  339. parse_request(<< $\s, _/bits >>, State, _) ->
  340. error_terminate(400, State, {connection_error, protocol_error,
  341. 'The request-line must not begin with a space. (RFC7230 3.1.1, RFC7230 3.5)'});
  342. %% We limit the length of the Request-line to MaxLength to avoid endlessly
  343. %% reading from the socket and eventually crashing.
  344. parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
  345. MaxLength = maps:get(max_request_line_length, Opts, 8000),
  346. MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
  347. case match_eol(Buffer, 0) of
  348. nomatch when byte_size(Buffer) > MaxLength ->
  349. error_terminate(414, State, {connection_error, limit_reached,
  350. 'The request-line length is larger than configuration allows. (RFC7230 3.1.1)'});
  351. nomatch ->
  352. {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
  353. 1 when EmptyLines =:= MaxEmptyLines ->
  354. error_terminate(400, State, {connection_error, limit_reached,
  355. 'More empty lines were received than configuration allows. (RFC7230 3.5)'});
  356. 1 ->
  357. << _:16, Rest/bits >> = Buffer,
  358. parse_request(Rest, State, EmptyLines + 1);
  359. _ ->
  360. case Buffer of
  361. %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
  362. << "OPTIONS * ", Rest/bits >> ->
  363. parse_version(Rest, State, <<"OPTIONS">>, undefined, <<"*">>, <<>>);
  364. <<"CONNECT ", _/bits>> ->
  365. error_terminate(501, State, {connection_error, no_error,
  366. 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'});
  367. <<"TRACE ", _/bits>> ->
  368. error_terminate(501, State, {connection_error, no_error,
  369. 'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
  370. %% Accept direct HTTP/2 only at the beginning of the connection.
  371. << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
  372. %% @todo Might be worth throwing to get a clean stacktrace.
  373. http2_upgrade(State, Buffer);
  374. _ ->
  375. parse_method(Buffer, State, <<>>,
  376. maps:get(max_method_length, Opts, 32))
  377. end
  378. end.
  379. match_eol(<< $\n, _/bits >>, N) ->
  380. N;
  381. match_eol(<< _, Rest/bits >>, N) ->
  382. match_eol(Rest, N + 1);
  383. match_eol(_, _) ->
  384. nomatch.
  385. parse_method(_, State, _, 0) ->
  386. error_terminate(501, State, {connection_error, limit_reached,
  387. 'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
  388. parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
  389. case C of
  390. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  391. 'The method name must not be followed with a line break. (RFC7230 3.1.1)'});
  392. $\s -> parse_uri(Rest, State, SoFar);
  393. _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
  394. _ -> error_terminate(400, State, {connection_error, protocol_error,
  395. 'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
  396. end.
  397. parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
  398. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  399. P =:= $p orelse P =:= $P ->
  400. parse_uri_authority(Rest, State, Method);
  401. parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
  402. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  403. P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
  404. parse_uri_authority(Rest, State, Method);
  405. parse_uri(<< $/, Rest/bits >>, State, Method) ->
  406. parse_uri_path(Rest, State, Method, undefined, <<$/>>);
  407. parse_uri(_, State, _) ->
  408. error_terminate(400, State, {connection_error, protocol_error,
  409. 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
  410. %% @todo We probably want to apply max_authority_length also
  411. %% to the host header and to document this option. It might
  412. %% also be useful for HTTP/2 requests.
  413. parse_uri_authority(Rest, State=#state{opts=Opts}, Method) ->
  414. parse_uri_authority(Rest, State, Method, <<>>,
  415. maps:get(max_authority_length, Opts, 255)).
  416. parse_uri_authority(_, State, _, _, 0) ->
  417. error_terminate(414, State, {connection_error, limit_reached,
  418. 'The authority component of the absolute URI is longer than configuration allows. (RFC7230 2.7.1)'});
  419. parse_uri_authority(<<C, Rest/bits>>, State, Method, SoFar, Remaining) ->
  420. case C of
  421. $\r ->
  422. error_terminate(400, State, {connection_error, protocol_error,
  423. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  424. $@ ->
  425. error_terminate(400, State, {connection_error, protocol_error,
  426. 'Absolute URIs must not include a userinfo component. (RFC7230 2.7.1)'});
  427. C when SoFar =:= <<>> andalso
  428. ((C =:= $/) orelse (C =:= $\s) orelse (C =:= $?) orelse (C =:= $#)) ->
  429. error_terminate(400, State, {connection_error, protocol_error,
  430. 'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
  431. $: when SoFar =:= <<>> ->
  432. error_terminate(400, State, {connection_error, protocol_error,
  433. 'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
  434. $/ -> parse_uri_path(Rest, State, Method, SoFar, <<"/">>);
  435. $\s -> parse_version(Rest, State, Method, SoFar, <<"/">>, <<>>);
  436. $? -> parse_uri_query(Rest, State, Method, SoFar, <<"/">>, <<>>);
  437. $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<"/">>, <<>>);
  438. C -> parse_uri_authority(Rest, State, Method, <<SoFar/binary, C>>, Remaining - 1)
  439. end.
  440. parse_uri_path(<<C, Rest/bits>>, State, Method, Authority, SoFar) ->
  441. case C of
  442. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  443. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  444. $\s -> parse_version(Rest, State, Method, Authority, SoFar, <<>>);
  445. $? -> parse_uri_query(Rest, State, Method, Authority, SoFar, <<>>);
  446. $# -> skip_uri_fragment(Rest, State, Method, Authority, SoFar, <<>>);
  447. _ -> parse_uri_path(Rest, State, Method, Authority, <<SoFar/binary, C>>)
  448. end.
  449. parse_uri_query(<<C, Rest/bits>>, State, M, A, P, SoFar) ->
  450. case C of
  451. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  452. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  453. $\s -> parse_version(Rest, State, M, A, P, SoFar);
  454. $# -> skip_uri_fragment(Rest, State, M, A, P, SoFar);
  455. _ -> parse_uri_query(Rest, State, M, A, P, <<SoFar/binary, C>>)
  456. end.
  457. skip_uri_fragment(<<C, Rest/bits>>, State, M, A, P, Q) ->
  458. case C of
  459. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  460. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  461. $\s -> parse_version(Rest, State, M, A, P, Q);
  462. _ -> skip_uri_fragment(Rest, State, M, A, P, Q)
  463. end.
  464. parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, A, P, Q) ->
  465. before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.1');
  466. parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, A, P, Q) ->
  467. before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.0');
  468. parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _, _) when C =:= $\s; C =:= $\t ->
  469. error_terminate(400, State, {connection_error, protocol_error,
  470. 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
  471. parse_version(<< C, _/bits >>, State, _, _, _, _) when C =:= $\s; C =:= $\t ->
  472. error_terminate(400, State, {connection_error, protocol_error,
  473. 'The separator between request target and version must be a single SP. (RFC7230 3.1.1)'});
  474. parse_version(_, State, _, _, _, _) ->
  475. error_terminate(505, State, {connection_error, protocol_error,
  476. 'Unsupported HTTP version. (RFC7230 2.6)'}).
  477. before_parse_headers(Rest, State, M, A, P, Q, V) ->
  478. parse_header(Rest, State#state{in_state=#ps_header{
  479. method=M, authority=A, path=P, qs=Q, version=V}}, #{}).
  480. %% Headers.
  481. %% We need two or more bytes in the buffer to continue.
  482. parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
  483. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
  484. parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
  485. request(Rest, S, Headers);
  486. parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  487. MaxHeaders = maps:get(max_headers, Opts, 100),
  488. NumHeaders = maps:size(Headers),
  489. if
  490. NumHeaders >= MaxHeaders ->
  491. error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
  492. {connection_error, limit_reached,
  493. 'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  494. true ->
  495. parse_header_colon(Buffer, State, Headers)
  496. end.
  497. parse_header_colon(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  498. MaxLength = maps:get(max_header_name_length, Opts, 64),
  499. case match_colon(Buffer, 0) of
  500. nomatch when byte_size(Buffer) > MaxLength ->
  501. error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
  502. {connection_error, limit_reached,
  503. 'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  504. nomatch ->
  505. %% We don't have a colon but we might have an invalid header line,
  506. %% so check if we have an LF and abort with an error if we do.
  507. case match_eol(Buffer, 0) of
  508. nomatch ->
  509. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
  510. _ ->
  511. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  512. {connection_error, protocol_error,
  513. 'A header line is missing a colon separator. (RFC7230 3.2.4)'})
  514. end;
  515. _ ->
  516. parse_hd_name(Buffer, State, Headers, <<>>)
  517. end.
  518. match_colon(<< $:, _/bits >>, N) ->
  519. N;
  520. match_colon(<< _, Rest/bits >>, N) ->
  521. match_colon(Rest, N + 1);
  522. match_colon(_, _) ->
  523. nomatch.
  524. parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
  525. parse_hd_before_value(Rest, State, H, SoFar);
  526. parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, <<>>) when ?IS_WS(C) ->
  527. error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
  528. {connection_error, protocol_error,
  529. 'Whitespace is not allowed before the header name. (RFC7230 3.2)'});
  530. parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, _) when ?IS_WS(C) ->
  531. error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
  532. {connection_error, protocol_error,
  533. 'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2.4)'});
  534. parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
  535. ?LOWER(parse_hd_name, Rest, State, H, SoFar).
  %% Skip the optional whitespace (spaces and tabs) that follows the colon,
  %% then make sure a complete line is buffered before parsing the value.
  %%
  %% Without an end of line in the buffer we either ask for more data or,
  %% when the buffer already exceeds max_header_value_length (default 4096),
  %% terminate the connection with a 431.
  parse_hd_before_value(<<C, Tail/bits>>, State, Headers, Name)
          when C =:= $\s; C =:= $\t ->
      parse_hd_before_value(Tail, State, Headers, Name);
  parse_hd_before_value(Buffer, State = #state{opts = Opts, in_state = PS}, Headers, Name) ->
      Limit = maps:get(max_header_value_length, Opts, 4096),
      case match_eol(Buffer, 0) of
          nomatch when byte_size(Buffer) > Limit ->
              error_terminate(431, State#state{in_state = PS#ps_header{headers = Headers}},
                  {connection_error, limit_reached,
                      'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
          nomatch ->
              %% Remember the name so parsing can resume at the value.
              {more, State#state{in_state = PS#ps_header{headers = Headers, name = Name}}, Buffer};
          _ ->
              parse_hd_value(Buffer, State, Headers, Name, <<>>)
      end.
  %% Accumulate the header value until CRLF, then store it in the headers map.
  %%
  %% Trailing spaces and tabs are stripped. When the header name was
  %% already seen, the new value is appended to the previous one, joined
  %% with "; " for cookie and ", " for every other header.
  parse_hd_value(<<$\r, $\n, Tail/bits>>, State, Headers0, Name, Acc) ->
      Value = clean_value_ws_end(Acc, byte_size(Acc) - 1),
      Headers = case maps:get(Name, Headers0, undefined) of
          undefined ->
              Headers0#{Name => Value};
          Previous ->
              %% The cookie header does not use proper HTTP header lists.
              Separator = case Name of
                  <<"cookie">> -> <<"; ">>;
                  _ -> <<", ">>
              end,
              Headers0#{Name => <<Previous/binary, Separator/binary, Value/binary>>}
      end,
      parse_header(Tail, State, Headers);
  parse_hd_value(<<C, Tail/bits>>, State, Headers, Name, Acc) ->
      parse_hd_value(Tail, State, Headers, Name, <<Acc/binary, C>>).
  %% Strip trailing spaces and tabs from Value.
  %%
  %% Last is the index of the last byte still under consideration; callers
  %% start with byte_size(Value) - 1. Reaching -1 means the value was
  %% empty or made only of whitespace, and the result is the empty binary.
  clean_value_ws_end(_, -1) ->
      <<>>;
  clean_value_ws_end(Value, Last) ->
      case binary:at(Value, Last) of
          C when C =:= $\s; C =:= $\t ->
              clean_value_ws_end(Value, Last - 1);
          _ ->
              %% Keep everything up to and including the byte at Last.
              binary:part(Value, 0, Last + 1)
      end.
  -ifdef(TEST).
  %% EUnit generator: each case is titled with its input and asserts, by
  %% pattern matching, that the cleaned value equals the expected one.
  clean_value_ws_end_test_() ->
      Accept = <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
          "text/html;level=2;q=0.4, */*;q=0.5">>,
      Cases = [
          {<<>>, <<>>},
          {<<" ">>, <<>>},
          {<<Accept/binary, " \t \t ">>, Accept}
      ],
      [{Input, fun() -> Expected = clean_value_ws_end(Input, byte_size(Input) - 1) end}
          || {Input, Expected} <- Cases].

  %% Benchmark entry point. The expression given to horse:repeat/2 is
  %% deliberately left exactly as written, literals included.
  horse_clean_value_ws_end() ->
      horse:repeat(200000,
          clean_value_ws_end(
              <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                  "text/html;level=2;q=0.4, */*;q=0.5 ">>,
              byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                  "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
      ).
  -endif.
  %% Validate the host header once all headers have been parsed.
  %%
  %% - No host header on HTTP/1.1: the request is rejected with a 400.
  %% - No host header otherwise: continue with an empty host and the
  %%   default port for the transport.
  %% - Host header present: it must equal the request-line authority when
  %%   there is one, otherwise the request is rejected with a 400.
  request(Buffer, State = #state{transport = Transport,
          in_state = PS = #ps_header{authority = Authority, version = Version}}, Headers) ->
      case {maps:get(<<"host">>, Headers, undefined), Version} of
          {undefined, 'HTTP/1.1'} ->
              %% @todo Might want to not close the connection on this and next one.
              error_terminate(400, State#state{in_state = PS#ps_header{headers = Headers}},
                  {stream_error, protocol_error,
                      'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
          {undefined, _} ->
              request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
          %% @todo When CONNECT requests come in we need to ignore the RawHost
          %% and instead use the Authority as the source of host.
          {RawHost, _} when Authority =:= undefined; Authority =:= RawHost ->
              request_parse_host(Buffer, State, Headers, RawHost);
          %% RFC7230 does not explicitly ask us to reject requests
          %% that have a different authority component and host header.
          %% However it DOES ask clients to set them to the same value,
          %% so we enforce that.
          _ ->
              error_terminate(400, State#state{in_state = PS#ps_header{headers = Headers}},
                  {stream_error, protocol_error,
                      'The host header is different than the absolute-form authority component. (RFC7230 5.4)'})
      end.
  %% Parse the raw host header into a host and a port, then continue with
  %% request/5.
  %%
  %% A missing port falls back to the default port for the transport.
  %% An explicit port is accepted only when 0 < Port =< 65535. Any other
  %% parse result, or a failure of cow_http_hd:parse_host/1, terminates
  %% with a 400.
  %%
  %% Both error branches store the parsed headers in the in_state before
  %% calling error_terminate/3, as the other header-stage error paths in
  %% this module do. The invalid-port branch used to pass the bare State.
  request_parse_host(Buffer, State = #state{transport = Transport, in_state = PS},
          Headers, RawHost) ->
      %% Only the parse_host call is protected; the `of' clauses are not.
      try cow_http_hd:parse_host(RawHost) of
          {Host, undefined} ->
              request(Buffer, State, Headers, Host, default_port(Transport:secure()));
          {Host, Port} when Port > 0, Port =< 65535 ->
              request(Buffer, State, Headers, Host, Port);
          _ ->
              error_terminate(400, State#state{in_state = PS#ps_header{headers = Headers}},
                  {stream_error, protocol_error,
                      'The port component of the absolute-form is not in the range 0..65535. (RFC7230 2.7.1)'})
      catch _:_ ->
          error_terminate(400, State#state{in_state = PS#ps_header{headers = Headers}},
              {stream_error, protocol_error,
                  'The host header is invalid. (RFC7230 5.4)'})
      end.
  %% Default port for the connection: 443 when the transport is secure,
  %% 80 for anything else.
  -spec default_port(boolean()) -> 80 | 443.
  default_port(Secure) ->
      case Secure of
          true -> 443;
          _ -> 80
      end.
  634. %% End of request parsing.
  %% Build the request map from the parsed request line and headers, and
  %% decide how the body (if any) will be decoded.
  %%
  %% Returns {request, Req, State, Buffer} for a regular request, or hands
  %% over to http2_upgrade/4 when the headers ask for an h2c upgrade.
  %% Invalid transfer-encoding or content-length headers terminate the
  %% stream with a 400.
  request(Buffer, State0 = #state{ref = Ref, transport = Transport, peer = Peer, sock = Sock,
          cert = Cert, proxy_header = ProxyHeader, in_streamid = StreamID,
          in_state = PS = #ps_header{method = Method, path = Path, qs = Qs, version = Version}},
          Headers0, Host, Port) ->
      Scheme = case Transport:secure() of
          true -> <<"https">>;
          false -> <<"http">>
      end,
      %% State handed to error_terminate/3 by every failure path below.
      ErrorState = State0#state{in_state = PS#ps_header{headers = Headers0}},
      {Headers, HasBody, BodyLength, DecodeFun, DecodeState} = case Headers0 of
          %% transfer-encoding is checked before content-length; when it is
          %% chunked, any content-length header is dropped.
          #{<<"transfer-encoding">> := RawTransferEncoding} ->
              try cow_http_hd:parse_transfer_encoding(RawTransferEncoding) of
                  [<<"chunked">>] ->
                      {maps:remove(<<"content-length">>, Headers0),
                          true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
                  _ ->
                      error_terminate(400, ErrorState,
                          {stream_error, protocol_error,
                              'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'})
              catch _:_ ->
                  error_terminate(400, ErrorState,
                      {stream_error, protocol_error,
                          'The transfer-encoding header is invalid. (RFC7230 3.3.1)'})
              end;
          #{<<"content-length">> := <<"0">>} ->
              {Headers0, false, 0, undefined, undefined};
          #{<<"content-length">> := RawLength} ->
              Length = try
                  cow_http_hd:parse_content_length(RawLength)
              catch _:_ ->
                  error_terminate(400, ErrorState,
                      {stream_error, protocol_error,
                          'The content-length header is invalid. (RFC7230 3.3.2)'})
              end,
              {Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
          _ ->
              {Headers0, false, 0, undefined, undefined}
      end,
      BaseReq = #{
          ref => Ref,
          pid => self(),
          streamid => StreamID,
          peer => Peer,
          sock => Sock,
          cert => Cert,
          method => Method,
          scheme => Scheme,
          host => Host,
          port => Port,
          path => Path,
          qs => Qs,
          version => Version,
          %% We are transparently taking care of transfer-encodings so
          %% the user code has no need to know about it.
          headers => maps:remove(<<"transfer-encoding">>, Headers),
          has_body => HasBody,
          body_length => BodyLength
      },
      %% We add the PROXY header information if any.
      Req = case ProxyHeader of
          undefined -> BaseReq;
          _ -> BaseReq#{proxy_header => ProxyHeader}
      end,
      case is_http2_upgrade(Headers, Version) of
          {true, HTTP2Settings} ->
              %% We save the headers in case the upgrade will fail
              %% and we need to pass them to cowboy_stream:early_error.
              http2_upgrade(State0#state{in_state = PS#ps_header{headers = Headers}},
                  Buffer, HTTP2Settings, Req);
          false ->
              State = case HasBody of
                  false ->
                      %% No body: the next bytes belong to the next request.
                      State0#state{in_streamid = StreamID + 1, in_state = #ps_request_line{}};
                  true ->
                      State0#state{in_state = #ps_body{
                          length = BodyLength,
                          transfer_decode_fun = DecodeFun,
                          transfer_decode_state = DecodeState
                      }}
              end,
              {request, Req, State, Buffer}
      end.
  716. %% HTTP/2 upgrade.
  717. %% @todo We must not upgrade to h2c over a TLS connection.
  %% Tell whether an HTTP/1.1 request asks for an upgrade to h2c.
  %%
  %% Returns {true, HTTP2Settings} when the connection header lists both
  %% "upgrade" and "http2-settings" and the upgrade header lists "h2c";
  %% false in every other case, including any version other than HTTP/1.1
  %% and requests missing one of the three headers.
  is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
          <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
      Tokens = cow_http_hd:parse_connection(Conn),
      case lists:member(<<"upgrade">>, Tokens)
              andalso lists:member(<<"http2-settings">>, Tokens) of
          true ->
              %% The upgrade header is parsed only once the connection
              %% header has been found to qualify.
              case lists:member(<<"h2c">>, cow_http_hd:parse_upgrade(Upgrade)) of
                  true -> {true, HTTP2Settings};
                  false -> false
              end;
          false ->
              false
      end;
  is_http2_upgrade(_, _) ->
      false.
  %% Prior knowledge upgrade, without an HTTP/1.1 request.
  %%
  %% Over a secure transport this is refused with a 400. Otherwise the
  %% timeout is cancelled and the connection is handed to cowboy_http2
  %% together with the data still in the buffer.
  http2_upgrade(State = #state{parent = Parent, ref = Ref, socket = Socket,
          transport = Transport, proxy_header = ProxyHeader, opts = Opts,
          peer = Peer, sock = Sock, cert = Cert}, Buffer) ->
      case Transport:secure() of
          true ->
              error_terminate(400, State, {connection_error, protocol_error,
                  'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'});
          false ->
              _ = cancel_timeout(State),
              cowboy_http2:init(Parent, Ref, Socket, Transport,
                  ProxyHeader, Opts, Peer, Sock, Cert, Buffer)
      end.
  %% Upgrade via an HTTP/1.1 request.
  %%
  %% The HTTP2-Settings header value is parsed first. On success the
  %% timeout is cancelled and the connection is handed to cowboy_http2
  %% along with the parsed settings and the request that triggered the
  %% upgrade. On a parse failure the connection is terminated with a 400.
  http2_upgrade(State = #state{parent = Parent, ref = Ref, socket = Socket,
          transport = Transport, proxy_header = ProxyHeader, opts = Opts,
          peer = Peer, sock = Sock, cert = Cert},
          Buffer, HTTP2Settings, Req) ->
      %% @todo
      %% However if the client sent a body, we need to read the body in full
      %% and if we can't do that, return a 413 response. Some options are in order.
      %% Always half-closed stream coming from this side.
      try cow_http_hd:parse_http2_settings(HTTP2Settings) of
          ParsedSettings ->
              _ = cancel_timeout(State),
              cowboy_http2:init(Parent, Ref, Socket, Transport,
                  ProxyHeader, Opts, Peer, Sock, Cert, Buffer, ParsedSettings, Req)
      catch _:_ ->
          error_terminate(400, State, {connection_error, protocol_error,
              'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
      end.
  764. %% Request body parsing.
  %% Run the buffered data through the transfer decoding function of the
  %% current body.
  %%
  %% Returns {more, State, Buffer} when the decoder needs more data, or
  %% {data, StreamID, IsFin, Data, State, Rest} when it produced some.
  %% Once the body is complete the state moves on to the next request line
  %% and the timeout is reset. A decoder exception terminates the
  %% connection.
  parse_body(Buffer, State = #state{in_streamid = StreamID,
          in_state = PS = #ps_body{received = Received, transfer_decode_fun = Decode,
              transfer_decode_state = DecodeState0}}) ->
      %% Result for a partial body: account for the received bytes and
      %% keep the new decoder state.
      Partial = fun(Data, DecodeState, Remaining) ->
          {data, StreamID, nofin, Data, State#state{in_state =
              PS#ps_body{received = Received + byte_size(Data),
                  transfer_decode_state = DecodeState}}, Remaining}
      end,
      %% Result for the end of the body: get ready for the next request.
      Finished = fun(Data, Remaining) ->
          {data, StreamID, fin, Data, set_timeout(
              State#state{in_streamid = StreamID + 1, in_state = #ps_request_line{}}), Remaining}
      end,
      %% @todo Proper trailers.
      try Decode(Buffer, DecodeState0) of
          more ->
              %% @todo Asks for 0 or more bytes.
              {more, State, Buffer};
          {more, Data, DecodeState} ->
              %% @todo Asks for 0 or more bytes.
              Partial(Data, DecodeState, <<>>);
          {more, Data, Length, DecodeState} when is_integer(Length) ->
              %% @todo Asks for Length more bytes.
              Partial(Data, DecodeState, <<>>);
          {more, Data, Rest, DecodeState} ->
              %% @todo Asks for 0 or more bytes.
              Partial(Data, DecodeState, Rest);
          {done, _HasTrailers, Rest} ->
              Finished(<<>>, Rest);
          {done, Data, _HasTrailers, Rest} ->
              Finished(Data, Rest)
      catch _:_ ->
          Reason = {connection_error, protocol_error,
              'Failure to decode the content. (RFC7230 4)'},
          terminate(stream_terminate(State, StreamID, Reason), Reason)
      end.
  799. %% Message handling.
  %% Handle the exit of a process, reported through Msg.
  %%
  %% The process is looked up in the children registry. If its stream is
  %% still known, the message is forwarded to that stream through info/3;
  %% if the stream is already gone only the registry is updated; an
  %% unknown process is logged as a warning.
  down(State = #state{opts = Opts, children = Children0}, Pid, Msg) ->
      case cowboy_children:down(Children0, Pid) of
          error ->
              %% The process was unknown.
              cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
                  [Msg, Pid], Opts),
              State;
          {ok, undefined, Children} ->
              %% The stream was terminated already.
              State#state{children = Children};
          {ok, StreamID, Children} ->
              %% The stream is still running.
              info(State#state{children = Children}, StreamID, Msg)
      end.
  %% Deliver a message to the stream handlers of StreamID and execute the
  %% commands they return.
  %%
  %% A message for an unknown stream is logged and dropped. An exception
  %% raised by cowboy_stream:info/3 is logged and the stream is reset with
  %% an internal_error reason.
  info(State = #state{opts = Opts, streams = Streams0}, StreamID, Msg) ->
      case lists:keyfind(StreamID, #stream.id, Streams0) of
          false ->
              cowboy:log(warning, "Received message ~p for unknown stream ~p.~n",
                  [Msg, StreamID], Opts),
              State;
          Stream = #stream{state = HandlerState0} ->
              try cowboy_stream:info(StreamID, Msg, HandlerState0) of
                  {Commands, HandlerState} ->
                      Updated = Stream#stream{state = HandlerState},
                      Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Updated),
                      commands(State#state{streams = Streams}, StreamID, Commands)
              catch Class:Exception ->
                  %% NOTE(review): erlang:get_stacktrace/0 is deprecated in
                  %% recent OTP releases; confirm the supported OTP range
                  %% before switching to the Class:Reason:Stacktrace form.
                  cowboy:log(cowboy_stream:make_error_log(info,
                      [StreamID, Msg, HandlerState0],
                      Class, Exception, erlang:get_stacktrace()), Opts),
                  stream_reset(State, StreamID, {internal_error, {Class, Exception},
                      'Unhandled exception in cowboy_stream:info/3.'})
              end
      end.
  834. %% Commands.
  %% Execute the commands returned by the stream handlers of StreamID, in
  %% order, threading the connection state through. Returns the new state,
  %% except for switch_protocol which hands the connection over to
  %% another module.
  %%
  %% Commands for a stream other than the one currently producing output
  %% (out_streamid) are appended to that stream's queue instead of being
  %% executed.
  commands(State, _, []) ->
      State;
  %% Supervise a child process.
  commands(State = #state{children = Children}, StreamID, [{spawn, Pid, Shutdown} | Tail]) ->
      commands(State#state{children = cowboy_children:up(Children, Pid, StreamID, Shutdown)},
          StreamID, Tail);
  %% Error handling.
  commands(State, StreamID, [Error = {internal_error, _, _} | Tail]) ->
      commands(stream_reset(State, StreamID, Error), StreamID, Tail);
  %% Commands for a stream currently inactive.
  commands(State = #state{out_streamid = Current, streams = Streams0}, StreamID, Commands)
          when Current =/= StreamID ->
      %% @todo We still want to handle some commands...
      Stream = #stream{queue = Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
      Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
          Stream#stream{queue = Queue ++ Commands}),
      State#state{streams = Streams};
  %% Read the request body.
  commands(State, StreamID, [{flow, _Length} | Tail]) ->
      %% @todo We only read from socket if buffer is empty, otherwise
      %% we decode the buffer.
      %% @todo Set the body reading length to min(Length, BodyLength)
      commands(State, StreamID, Tail);
  %% Error responses are sent only if a response wasn't sent already.
  commands(State = #state{out_state = wait, out_streamid = StreamID}, StreamID,
          [{error_response, Status, Headers0, Body} | Tail]) ->
      %% We close the connection when the error response is 408, as it
      %% indicates a timeout and the RFC recommends that we stop here. (RFC7231 6.5.7)
      Headers = case Status of
          408 -> Headers0#{<<"connection">> => <<"close">>};
          <<"408", _/bits>> -> Headers0#{<<"connection">> => <<"close">>};
          _ -> Headers0
      end,
      commands(State, StreamID, [{response, Status, Headers, Body} | Tail]);
  commands(State, StreamID, [{error_response, _, _, _} | Tail]) ->
      commands(State, StreamID, Tail);
  %% Send an informational response.
  commands(State = #state{socket = Socket, transport = Transport, out_state = wait,
          streams = Streams}, StreamID, [{inform, StatusCode, Headers} | Tail]) ->
      %% @todo I'm pretty sure the last stream in the list is the one we want
      %% considering all others are queued.
      #stream{version = Version} = lists:keyfind(StreamID, #stream.id, Streams),
      _ = case Version of
          'HTTP/1.1' ->
              Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
                  headers_to_list(Headers)));
          %% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
          'HTTP/1.0' ->
              ok
      end,
      commands(State, StreamID, Tail);
  %% Send a full response.
  %%
  %% @todo Kill the stream if it sent a response when one has already been sent.
  %% @todo Keep IsFin in the state.
  %% @todo Same two things above apply to DATA, possibly promise too.
  commands(State0 = #state{socket = Socket, transport = Transport, out_state = wait,
          streams = Streams}, StreamID, [{response, StatusCode, Headers0, Body} | Tail]) ->
      %% @todo I'm pretty sure the last stream in the list is the one we want
      %% considering all others are queued.
      #stream{version = Version} = lists:keyfind(StreamID, #stream.id, Streams),
      {State1, Headers} = connection(State0, Headers0, StreamID, Version),
      State = State1#state{out_state = done},
      %% @todo Ensure content-length is set.
      Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
      case Body of
          {sendfile, _, _, _} ->
              Transport:send(Socket, Response),
              sendfile(State, Body);
          _ ->
              Transport:send(Socket, [Response, Body])
      end,
      commands(State, StreamID, Tail);
  %% Send response headers and initiate chunked encoding or streaming.
  commands(State0 = #state{socket = Socket, transport = Transport,
          opts = Opts, overriden_opts = Override, streams = Streams0, out_state = OutState},
          StreamID, [{headers, StatusCode, Headers0} | Tail]) ->
      %% @todo Same as above (about the last stream in the list).
      Stream = #stream{version = Version} = lists:keyfind(StreamID, #stream.id, Streams0),
      Status = cow_http:status_to_integer(StatusCode),
      ContentLength = maps:get(<<"content-length">>, Headers0, undefined),
      %% Chunked transfer-encoding can be disabled on a per-request basis.
      Chunked = case Override of
          #{chunked := Chunked0} -> Chunked0;
          _ -> maps:get(chunked, Opts, true)
      end,
      {State1, Headers1} = case {Status, ContentLength, Version} of
          {204, _, 'HTTP/1.1'} ->
              {State0#state{out_state = done}, Headers0};
          {304, _, 'HTTP/1.1'} ->
              {State0#state{out_state = done}, Headers0};
          {_, undefined, 'HTTP/1.1'} when Chunked ->
              {State0#state{out_state = chunked},
                  Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
          %% Close the connection after streaming without content-length
          %% to all HTTP/1.0 clients and to HTTP/1.1 clients when chunked is disabled.
          {_, undefined, _} ->
              {State0#state{out_state = streaming, last_streamid = StreamID}, Headers0};
          %% Stream the response body without chunked transfer-encoding.
          _ ->
              ExpectedSize = cow_http_hd:parse_content_length(ContentLength),
              Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
                  Stream#stream{local_expected_size = ExpectedSize}),
              {State0#state{out_state = streaming, streams = Streams}, Headers0}
      end,
      %% The trailer header is kept only when trailers can be sent. Note
      %% that stream_te/2 is given the output state from before this command.
      Headers2 = case stream_te(OutState, Stream) of
          trailers -> Headers1;
          _ -> maps:remove(<<"trailer">>, Headers1)
      end,
      {State, Headers} = connection(State1, Headers2, StreamID, Version),
      Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
          headers_to_list(Headers))),
      commands(State, StreamID, Tail);
  %% Send a response body chunk.
  %% @todo We need to kill the stream if it tries to send data before headers.
  commands(State0 = #state{socket = Socket, transport = Transport, streams = Streams0,
          out_state = OutState}, StreamID, [{data, IsFin, Data} | Tail]) ->
      %% Do not send anything when the user asks to send an empty
      %% data frame, as that would break the protocol.
      Size = case Data of
          {sendfile, _, B, _} -> B;
          _ -> iolist_size(Data)
      end,
      %% Depending on the current state we may need to send nothing,
      %% the last chunk, chunked data with/without the last chunk,
      %% or just the data as-is.
      Stream = case lists:keyfind(StreamID, #stream.id, Streams0) of
          Stream0 = #stream{method = <<"HEAD">>} ->
              Stream0;
          Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked ->
              Transport:send(Socket, <<"0\r\n\r\n">>),
              Stream0;
          Stream0 when Size =:= 0 ->
              Stream0;
          Stream0 when is_tuple(Data), OutState =:= chunked ->
              Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]),
              sendfile(State0, Data),
              Transport:send(Socket,
                  case IsFin of
                      fin -> <<"\r\n0\r\n\r\n">>;
                      nofin -> <<"\r\n">>
                  end),
              Stream0;
          Stream0 when OutState =:= chunked ->
              Transport:send(Socket, [
                  integer_to_binary(Size, 16), <<"\r\n">>, Data,
                  case IsFin of
                      fin -> <<"\r\n0\r\n\r\n">>;
                      nofin -> <<"\r\n">>
                  end
              ]),
              Stream0;
          Stream0 when OutState =:= streaming ->
              #stream{local_sent_size = SentSize0, local_expected_size = ExpectedSize} = Stream0,
              SentSize = SentSize0 + Size,
              if
                  %% ExpectedSize may be undefined, which is > any integer value.
                  SentSize > ExpectedSize ->
                      terminate(State0, response_body_too_large);
                  is_tuple(Data) ->
                      sendfile(State0, Data);
                  true ->
                      Transport:send(Socket, Data)
              end,
              Stream0#stream{local_sent_size = SentSize}
      end,
      State = case IsFin of
          fin -> State0#state{out_state = done};
          nofin -> State0
      end,
      Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
      commands(State#state{streams = Streams}, StreamID, Tail);
  %% Send trailers, or just the final chunk when trailers cannot be sent.
  commands(State = #state{socket = Socket, transport = Transport, streams = Streams,
          out_state = OutState}, StreamID, [{trailers, Trailers} | Tail]) ->
      case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
          trailers ->
              Transport:send(Socket, [
                  <<"0\r\n">>,
                  cow_http:headers(maps:to_list(Trailers)),
                  <<"\r\n">>
              ]);
          no_trailers ->
              Transport:send(Socket, <<"0\r\n\r\n">>);
          not_chunked ->
              ok
      end,
      commands(State#state{out_state = done}, StreamID, Tail);
  %% Protocol takeover.
  commands(State0 = #state{ref = Ref, parent = Parent, socket = Socket, transport = Transport,
          out_state = OutState, opts = Opts, children = Children}, StreamID,
          [{switch_protocol, Headers, Protocol, InitialState} | _Tail]) ->
      %% @todo This should be the last stream running otherwise we need to wait before switching.
      %% @todo If there's streams opened after this one, fail instead of 101.
      State = cancel_timeout(State0),
      %% Before we send the 101 response we need to stop receiving data
      %% from the socket, otherwise the data might be received before the
      %% call to flush/0 and we end up inadvertently dropping a packet.
      %%
      %% @todo Handle cases where the request came with a body. We need
      %% to process or skip the body before the upgrade can be completed.
      Transport:setopts(Socket, [{active, false}]),
      %% Send a 101 response if necessary, then terminate the stream.
      #state{streams = Streams} = case OutState of
          wait -> info(State, StreamID, {inform, 101, Headers});
          _ -> State
      end,
      #stream{state = StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
      %% @todo We need to shutdown processes here first.
      stream_call_terminate(StreamID, switch_protocol, StreamState, State),
      %% Terminate children processes and flush any remaining messages from the mailbox.
      cowboy_children:terminate(Children),
      flush(Parent),
      %% @todo This is no good because commands return a state normally and here it doesn't
      %% we need to let this module go entirely. Perhaps it should be handled directly in
      %% cowboy_clear/cowboy_tls?
      Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
  %% Set options dynamically.
  commands(State0 = #state{overriden_opts = Opts0},
          StreamID, [{set_options, SetOpts} | Tail]) ->
      State1 = case SetOpts of
          #{idle_timeout := IdleTimeout} ->
              set_timeout(State0#state{overriden_opts = Opts0#{idle_timeout => IdleTimeout}});
          _ ->
              State0
      end,
      State = case SetOpts of
          #{chunked := Chunked} ->
              %% Build on the overrides already stored in State1, so an
              %% idle_timeout set by the same command is not lost.
              #state{overriden_opts = Opts1} = State1,
              State1#state{overriden_opts = Opts1#{chunked => Chunked}};
          _ ->
              State1
      end,
      commands(State, StreamID, Tail);
  %% Stream shutdown.
  commands(State, StreamID, [stop | Tail]) ->
      %% @todo Do we want to run the commands after a stop?
      %% @todo We currently wait for the stop command before we
      %% continue with the next request/response. In theory, if
      %% the request body was read fully and the response body
      %% was sent fully we should be able to start working on
      %% the next request concurrently. This can be done as a
      %% future optimization.
      maybe_terminate(State, StreamID, Tail);
  %% Log event.
  commands(State = #state{opts = Opts}, StreamID, [Log = {log, _, _, _} | Tail]) ->
      cowboy:log(Log, Opts),
      commands(State, StreamID, Tail);
  %% HTTP/1.1 does not support push; ignore.
  commands(State, StreamID, [{push, _, _, _, _, _, _, _} | Tail]) ->
      commands(State, StreamID, Tail).
  %% Convert the headers map into a list of {Name, Value} pairs.
  %%
  %% The set-cookie header is special; we can only send one cookie per
  %% header. Its value is a list of cookies, and each one becomes its own
  %% pair, placed after all the other headers.
  headers_to_list(Headers) ->
      case maps:take(<<"set-cookie">>, Headers) of
          {SetCookies, Others} ->
              maps:to_list(Others) ++ [{<<"set-cookie">>, Cookie} || Cookie <- SetCookies];
          error ->
              maps:to_list(Headers)
      end.
  %% Send part of a file over the connection socket.
  %%
  %% We wrap the sendfile call into a try/catch because on OTP-20
  %% and earlier a few different crashes could occur for sockets
  %% that were closing or closed. For example a badarg in
  %% erlang:port_get_data(#Port<...>) or a badmatch like
  %% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
  %%
  %% OTP-21 uses a NIF instead of a port so the implementation
  %% and behavior has dramatically changed and it is unclear
  %% whether it will be necessary in the future.
  %%
  %% This try/catch prevents some noisy logs from being written
  %% when these errors occur.
  %%
  %% The value returned by the underlying sendfile call is ignored: this
  %% function returns ok unless an exception is raised, in which case the
  %% connection is terminated.
  sendfile(State = #state{socket = Socket, transport = Transport, opts = Opts},
          {sendfile, Offset, Bytes, Path}) ->
      try
          case maps:get(sendfile, Opts, true) of
              false ->
                  %% When sendfile is disabled we explicitly use the fallback.
                  _ = ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []),
                  ok;
              true ->
                  _ = Transport:sendfile(Socket, Path, Offset, Bytes),
                  ok
          end
      catch _:_ ->
          terminate(State, {socket_error, sendfile_crash,
              'An error occurred when using the sendfile function.'})
      end.
  %% Flush messages specific to cowboy_http before handing over the
  %% connection to another protocol.
  %%
  %% Three kinds of messages are dropped from the mailbox: timeout
  %% messages, messages tagged with a {Pid, _} tuple whose pid is this
  %% process, and 'EXIT' messages from any process other than Parent.
  %% The function returns ok as soon as none of those is waiting.
  flush(Parent) ->
      Self = self(),
      receive
          {timeout, _, _} ->
              flush(Parent);
          {{Self, _}, _} ->
              flush(Parent);
          {'EXIT', Pid, _} when Pid =/= Parent ->
              flush(Parent)
      after 0 ->
          ok
      end.
  %% Terminate the stream normally; when it is the stream recorded as the
  %% last one of the connection, terminate the connection as well.
  %%
  %% @todo In these cases I'm not sure if we should continue processing commands.
  maybe_terminate(State, StreamID, _Tail) ->
      case State of
          #state{last_streamid = StreamID} ->
              %% @todo Reason ok?
              terminate(stream_terminate(State, StreamID, normal), normal);
          _ ->
              stream_terminate(State, StreamID, normal)
      end.
  %% Reset a stream following an internal error. This currently does
  %% nothing more than terminate the stream with the error as the reason.
  %%
  %% @todo headers
  %% @todo Don't send this if there are no streams left.
  %% @todo update IsFin local
  stream_reset(State, StreamID, Error = {internal_error, _, _}) ->
      stream_terminate(State, StreamID, Error).
  %% Terminate a stream: finish its response if needed, call the stream
  %% handlers' terminate callback, shut down its children and move the
  %% output on to the next stream.
  %%
  %% The connection itself is terminated when a streamed response was
  %% shorter than announced, or when the request body was not fully read
  %% and either its length is unknown or more than max_skip_body_length
  %% bytes (default 1000000) remain.
  stream_terminate(State0 = #state{opts = Opts, in_streamid = InStreamID, in_state = InState,
          out_streamid = OutStreamID, out_state = OutState, streams = Streams0,
          children = Children0}, StreamID, Reason) ->
      #stream{version = Version, local_expected_size = ExpectedSize, local_sent_size = SentSize}
          = lists:keyfind(StreamID, #stream.id, Streams0),
      %% Complete the response according to the current output state.
      State1 = #state{streams = Streams1} = case OutState of
          wait when element(1, Reason) =:= internal_error ->
              info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>});
          wait when element(1, Reason) =:= connection_error ->
              info(State0, StreamID, {response, 400, #{<<"content-length">> => <<"0">>}, <<>>});
          wait ->
              info(State0, StreamID, {response, 204, #{}, <<>>});
          chunked when Version =:= 'HTTP/1.1' ->
              info(State0, StreamID, {data, fin, <<>>});
          streaming when SentSize < ExpectedSize ->
              terminate(State0, response_body_too_small);
          _ -> %% done or Version =:= 'HTTP/1.0'
              State0
      end,
      %% Remove the stream from the state and reset the overriden options.
      {value, #stream{state = HandlerState}, Streams}
          = lists:keytake(StreamID, #stream.id, Streams1),
      State2 = State1#state{streams = Streams, overriden_opts = #{}},
      %% Stop the stream.
      stream_call_terminate(StreamID, Reason, HandlerState, State2),
      Children = cowboy_children:shutdown(Children0, StreamID),
      %% We reset the timeout if there are no active streams anymore.
      State3 = case Streams of
          [] -> set_timeout(State2);
          _ -> State2
      end,
      State = State3#state{streams = Streams, children = Children},
      %% We want to drop the connection if the body was not read fully
      %% and we don't know its length or more remains to be read than
      %% configuration allows.
      %% @todo Only do this if Current =:= StreamID.
      MaxSkip = maps:get(max_skip_body_length, Opts, 1000000),
      case InState of
          #ps_body{length = undefined}
                  when InStreamID =:= OutStreamID ->
              terminate(State, skip_body_unknown_length);
          #ps_body{length = Len, received = Received}
                  when InStreamID =:= OutStreamID, Received + MaxSkip < Len ->
              terminate(State, skip_body_too_large);
          _ ->
              %% Move on to the next stream.
              NextID = OutStreamID + 1,
              NextState = State#state{out_streamid = NextID, out_state = wait},
              case lists:keyfind(NextID, #stream.id, Streams) of
                  false ->
                      %% @todo This is clearly wrong, if the stream is gone we need to check if
                      %% there used to be such a stream, and if there was to send an error.
                      NextState;
                  #stream{queue = Queued} ->
                      %% @todo Remove queue from the stream.
                      commands(NextState, NextID, Queued)
              end
      end.
  %% Call the stream handlers' terminate callback. An exception raised by
  %% the callback is logged through cowboy:log/2 and not propagated.
  stream_call_terminate(StreamID, Reason, HandlerState, #state{opts = Opts}) ->
      try
          cowboy_stream:terminate(StreamID, Reason, HandlerState)
      catch Class:Exception ->
          %% NOTE(review): erlang:get_stacktrace/0 is deprecated in recent
          %% OTP releases; confirm the supported OTP range before
          %% switching to the Class:Reason:Stacktrace form.
          cowboy:log(cowboy_stream:make_error_log(terminate,
              [StreamID, Reason, HandlerState],
              Class, Exception, erlang:get_stacktrace()), Opts)
      end.
  %% Decide from the request whether the connection must be closed after
  %% the response (close) or may be reused (keepalive).
  %%
  %% HTTP/1.0: close when the http10_keepalive option is false or when the
  %% connection header does not list keep-alive. HTTP/1.1: close only when
  %% the connection header lists close. Everything else is keepalive.
  maybe_req_close(#state{opts = #{http10_keepalive := false}}, _, 'HTTP/1.0') ->
      close;
  maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
      Tokens = cow_http_hd:parse_connection(Conn),
      case lists:member(<<"keep-alive">>, Tokens) of
          false -> close;
          true -> keepalive
      end;
  maybe_req_close(_, _, 'HTTP/1.0') ->
      close;
  maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
      case connection_hd_is_close(Conn) of
          false -> keepalive;
          true -> close
      end;
  maybe_req_close(_, _, _) ->
      keepalive.
  1223. connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  1224. case connection_hd_is_close(Conn) of
  1225. true -> {State, Headers};
  1226. %% @todo Here we need to remove keep-alive and add close, not just add close.
  1227. false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
  1228. end;
  1229. connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
  1230. {State, Headers#{<<"connection">> => <<"close">>}};
  1231. connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  1232. case connection_hd_is_close(Conn) of
  1233. true -> {State#state{last_streamid=StreamID}, Headers};
  1234. %% @todo Here we need to set keep-alive only if it wasn't set before.
  1235. false -> {State, Headers}
  1236. end;
  1237. connection(State, Headers, _, 'HTTP/1.0') ->
  1238. {State, Headers#{<<"connection">> => <<"keep-alive">>}};
  1239. connection(State, Headers, _, _) ->
  1240. {State, Headers}.
  1241. connection_hd_is_close(Conn) ->
  1242. Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
  1243. lists:member(<<"close">>, Conns).
  1244. stream_te(streaming, _) ->
  1245. not_chunked;
  1246. %% No TE header was sent.
  1247. stream_te(_, #stream{te=undefined}) ->
  1248. no_trailers;
  1249. stream_te(_, #stream{te=TE0}) ->
  1250. try cow_http_hd:parse_te(TE0) of
  1251. {TE1, _} -> TE1
  1252. catch _:_ ->
  1253. %% If we can't parse the TE header, assume we can't send trailers.
  1254. no_trailers
  1255. end.
  1256. %% This function is only called when an error occurs on a new stream.
  1257. -spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
  1258. error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamState}, Reason) ->
  1259. PartialReq = case StreamState of
  1260. #ps_request_line{} -> #{
  1261. ref => Ref,
  1262. peer => Peer
  1263. };
  1264. #ps_header{method=Method, path=Path, qs=Qs,
  1265. version=Version, headers=ReqHeaders} -> #{
  1266. ref => Ref,
  1267. peer => Peer,
  1268. method => Method,
  1269. path => Path,
  1270. qs => Qs,
  1271. version => Version,
  1272. headers => case ReqHeaders of
  1273. undefined -> #{};
  1274. _ -> ReqHeaders
  1275. end
  1276. }
  1277. end,
  1278. early_error(StatusCode, State, Reason, PartialReq, #{<<"connection">> => <<"close">>}),
  1279. terminate(State, Reason).
  1280. early_error(StatusCode, State, Reason, PartialReq) ->
  1281. early_error(StatusCode, State, Reason, PartialReq, #{}).
  1282. early_error(StatusCode0, #state{socket=Socket, transport=Transport,
  1283. opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) ->
  1284. RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>},
  1285. Resp = {response, StatusCode0, RespHeaders1, <<>>},
  1286. try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
  1287. {response, StatusCode, RespHeaders, RespBody} ->
  1288. Transport:send(Socket, [
  1289. cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
  1290. %% @todo We shouldn't send the body when the method is HEAD.
  1291. %% @todo Technically we allow the sendfile tuple.
  1292. RespBody
  1293. ])
  1294. catch Class:Exception ->
  1295. cowboy:log(cowboy_stream:make_error_log(early_error,
  1296. [StreamID, Reason, PartialReq, Resp, Opts],
  1297. Class, Exception, erlang:get_stacktrace()), Opts),
  1298. %% We still need to send an error response, so send what we initially
  1299. %% wanted to send. It's better than nothing.
  1300. Transport:send(Socket, cow_http:response(StatusCode0,
  1301. 'HTTP/1.1', maps:to_list(RespHeaders1)))
  1302. end,
  1303. ok.
  1304. -spec terminate(_, _) -> no_return().
  1305. terminate(undefined, Reason) ->
  1306. exit({shutdown, Reason});
  1307. terminate(State=#state{streams=Streams, children=Children}, Reason) ->
  1308. terminate_all_streams(State, Streams, Reason),
  1309. cowboy_children:terminate(Children),
  1310. terminate_linger(State),
  1311. exit({shutdown, Reason}).
  1312. terminate_all_streams(_, [], _) ->
  1313. ok;
  1314. terminate_all_streams(State, [#stream{id=StreamID, state=StreamState}|Tail], Reason) ->
  1315. stream_call_terminate(StreamID, Reason, StreamState, State),
  1316. terminate_all_streams(State, Tail, Reason).
  1317. terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
  1318. case Transport:shutdown(Socket, write) of
  1319. ok ->
  1320. case maps:get(linger_timeout, Opts, 1000) of
  1321. 0 ->
  1322. ok;
  1323. infinity ->
  1324. terminate_linger_loop(State, undefined);
  1325. Timeout ->
  1326. TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
  1327. terminate_linger_loop(State, TimerRef)
  1328. end;
  1329. {error, _} ->
  1330. ok
  1331. end.
  1332. terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
  1333. Messages = Transport:messages(),
  1334. %% We may already have a message in the mailbox when we do this
  1335. %% but it's OK because we are shutting down anyway.
  1336. case Transport:setopts(Socket, [{active, once}]) of
  1337. ok ->
  1338. receive
  1339. {OK, Socket, _} when OK =:= element(1, Messages) ->
  1340. terminate_linger_loop(State, TimerRef);
  1341. {Closed, Socket} when Closed =:= element(2, Messages) ->
  1342. ok;
  1343. {Error, Socket, _} when Error =:= element(3, Messages) ->
  1344. ok;
  1345. {timeout, TimerRef, linger_timeout} ->
  1346. ok;
  1347. _ ->
  1348. terminate_linger_loop(State, TimerRef)
  1349. end;
  1350. {error, _} ->
  1351. ok
  1352. end.
  1353. %% System callbacks.
  1354. -spec system_continue(_, _, {#state{}, binary()}) -> ok.
  1355. system_continue(_, _, {State, Buffer}) ->
  1356. loop(State, Buffer).
  1357. -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
  1358. system_terminate(Reason, _, _, {State, _}) ->
  1359. terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}).
  1360. -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
  1361. system_code_change(Misc, _, _, _) ->
  1362. {ok, Misc}.