cowboy_http.erl 52 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http).
  15. -export([init/5]).
  16. -export([system_continue/3]).
  17. -export([system_terminate/4]).
  18. -export([system_code_change/4]).
  19. -type opts() :: #{
  20. connection_type => worker | supervisor,
  21. env => cowboy_middleware:env(),
  22. idle_timeout => timeout(),
  23. inactivity_timeout => timeout(),
  24. linger_timeout => timeout(),
  25. max_authority_length => non_neg_integer(),
  26. max_empty_lines => non_neg_integer(),
  27. max_header_name_length => non_neg_integer(),
  28. max_header_value_length => non_neg_integer(),
  29. max_headers => non_neg_integer(),
  30. max_keepalive => non_neg_integer(),
  31. max_method_length => non_neg_integer(),
  32. max_request_line_length => non_neg_integer(),
  33. middlewares => [module()],
  34. request_timeout => timeout(),
  35. shutdown_timeout => timeout(),
  36. stream_handlers => [module()]
  37. }.
  38. -export_type([opts/0]).
  39. -record(ps_request_line, {
  40. empty_lines = 0 :: non_neg_integer()
  41. }).
  42. -record(ps_header, {
  43. method = undefined :: binary(),
  44. authority = undefined :: binary() | undefined,
  45. path = undefined :: binary(),
  46. qs = undefined :: binary(),
  47. version = undefined :: cowboy:http_version(),
  48. headers = undefined :: map() | undefined, %% @todo better type than map()
  49. name = undefined :: binary() | undefined
  50. }).
  51. %% @todo We need a state where we wait for the stream process to ask for the body
  52. %% and do not attempt to read from the socket while in that state (we should read
  53. %% up to a certain length, and then wait, basically implementing flow control but
  54. %% by not reading from the socket when the window is empty).
  55. -record(ps_body, {
  56. length :: non_neg_integer() | undefined,
  57. received = 0 :: non_neg_integer(),
  58. %% @todo flow
  59. transfer_decode_fun :: fun(), %% @todo better type
  60. transfer_decode_state :: any() %% @todo better type
  61. }).
  62. -record(stream, {
  63. id = undefined :: cowboy_stream:streamid(),
  64. %% Stream handlers and their state.
  65. state = undefined :: {module(), any()},
  66. %% Request method.
  67. method = undefined :: binary(),
  68. %% Client HTTP version for this stream.
  69. version = undefined :: cowboy:http_version(),
  70. %% Unparsed te header. Used to know if we can send trailers.
  71. te :: undefined | binary(),
  72. %% Commands queued.
  73. queue = [] :: cowboy_stream:commands()
  74. }).
  75. -type stream() :: #stream{}.
  76. -record(state, {
  77. parent :: pid(),
  78. ref :: ranch:ref(),
  79. socket :: inet:socket(),
  80. transport :: module(),
  81. opts = #{} :: map(),
  82. %% Remote address and port for the connection.
  83. peer = undefined :: {inet:ip_address(), inet:port_number()},
  84. %% Local address and port for the connection.
  85. sock = undefined :: {inet:ip_address(), inet:port_number()},
  86. %% Client certificate (TLS only).
  87. cert :: undefined | binary(),
  88. timer = undefined :: undefined | reference(),
  89. %% Identifier for the stream currently being read (or waiting to be received).
  90. in_streamid = 1 :: pos_integer(),
  91. %% Parsing state for the current stream or stream-to-be.
  92. in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
  93. %% Identifier for the stream currently being written.
  94. %% Note that out_streamid =< in_streamid.
  95. out_streamid = 1 :: pos_integer(),
  96. %% Whether we finished writing data for the current stream.
  97. out_state = wait :: wait | chunked | done,
  98. %% The connection will be closed after this stream.
  99. last_streamid = undefined :: pos_integer(),
  100. %% Currently active HTTP/1.1 streams.
  101. streams = [] :: [stream()],
  102. %% Children processes created by streams.
  103. children = cowboy_children:init() :: cowboy_children:children()
  104. }).
  105. -include_lib("cowlib/include/cow_inline.hrl").
  106. -include_lib("cowlib/include/cow_parse.hrl").
  107. -spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
  108. init(Parent, Ref, Socket, Transport, Opts) ->
  109. Peer0 = Transport:peername(Socket),
  110. Sock0 = Transport:sockname(Socket),
  111. Cert1 = case Transport:name() of
  112. ssl ->
  113. case ssl:peercert(Socket) of
  114. {error, no_peercert} ->
  115. {ok, undefined};
  116. Cert0 ->
  117. Cert0
  118. end;
  119. _ ->
  120. {ok, undefined}
  121. end,
  122. case {Peer0, Sock0, Cert1} of
  123. {{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
  124. LastStreamID = maps:get(max_keepalive, Opts, 100),
  125. before_loop(set_timeout(#state{
  126. parent=Parent, ref=Ref, socket=Socket,
  127. transport=Transport, opts=Opts,
  128. peer=Peer, sock=Sock, cert=Cert,
  129. last_streamid=LastStreamID}), <<>>);
  130. {{error, Reason}, _, _} ->
  131. terminate(undefined, {socket_error, Reason,
  132. 'A socket error occurred when retrieving the peer name.'});
  133. {_, {error, Reason}, _} ->
  134. terminate(undefined, {socket_error, Reason,
  135. 'A socket error occurred when retrieving the sock name.'});
  136. {_, _, {error, Reason}} ->
  137. terminate(undefined, {socket_error, Reason,
  138. 'A socket error occurred when retrieving the client TLS certificate.'})
  139. end.
  140. before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
  141. %% @todo disable this when we get to the body, until the stream asks for it?
  142. %% Perhaps have a threshold for how much we're willing to read before waiting.
  143. Transport:setopts(Socket, [{active, once}]),
  144. loop(State, Buffer).
  145. loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
  146. timer=TimerRef, children=Children, streams=Streams}, Buffer) ->
  147. {OK, Closed, Error} = Transport:messages(),
  148. InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
  149. receive
  150. %% Socket messages.
  151. {OK, Socket, Data} ->
  152. %% Only reset the timeout if it is idle_timeout (active streams).
  153. State1 = case Streams of
  154. [] -> State;
  155. _ -> set_timeout(State)
  156. end,
  157. parse(<< Buffer/binary, Data/binary >>, State1);
  158. {Closed, Socket} ->
  159. terminate(State, {socket_error, closed, 'The socket has been closed.'});
  160. {Error, Socket, Reason} ->
  161. terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
  162. %% Timeouts.
  163. {timeout, Ref, {shutdown, Pid}} ->
  164. cowboy_children:shutdown_timeout(Children, Ref, Pid),
  165. loop(State, Buffer);
  166. {timeout, TimerRef, Reason} ->
  167. timeout(State, Reason);
  168. {timeout, _, _} ->
  169. loop(State, Buffer);
  170. %% System messages.
  171. {'EXIT', Parent, Reason} ->
  172. %% @todo We should exit gracefully, if possible.
  173. exit(Reason);
  174. {system, From, Request} ->
  175. sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
  176. %% Messages pertaining to a stream.
  177. {{Pid, StreamID}, Msg} when Pid =:= self() ->
  178. loop(info(State, StreamID, Msg), Buffer);
  179. %% Exit signal from children.
  180. Msg = {'EXIT', Pid, _} ->
  181. loop(down(State, Pid, Msg), Buffer);
  182. %% Calls from supervisor module.
  183. {'$gen_call', From, Call} ->
  184. cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
  185. loop(State, Buffer);
  186. %% Unknown messages.
  187. Msg ->
  188. error_logger:error_msg("Received stray message ~p.~n", [Msg]),
  189. loop(State, Buffer)
  190. after InactivityTimeout ->
  191. terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
  192. end.
  193. %% We set request_timeout when there are no active streams,
  194. %% and idle_timeout otherwise.
  195. set_timeout(State0=#state{opts=Opts, streams=Streams}) ->
  196. State = cancel_timeout(State0),
  197. {Name, Default} = case Streams of
  198. [] -> {request_timeout, 5000};
  199. _ -> {idle_timeout, 60000}
  200. end,
  201. TimerRef = case maps:get(Name, Opts, Default) of
  202. infinity -> undefined;
  203. Timeout -> erlang:start_timer(Timeout, self(), Name)
  204. end,
  205. State#state{timer=TimerRef}.
  206. cancel_timeout(State=#state{timer=TimerRef}) ->
  207. ok = case TimerRef of
  208. undefined -> ok;
  209. _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
  210. end,
  211. State#state{timer=undefined}.
  212. -spec timeout(_, _) -> no_return().
  213. timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
  214. terminate(State, {connection_error, timeout,
  215. 'No request-line received before timeout.'});
  216. timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
  217. error_terminate(408, State, {connection_error, timeout,
  218. 'Request headers not received before timeout.'});
  219. timeout(State, idle_timeout) ->
  220. terminate(State, {connection_error, timeout,
  221. 'Connection idle longer than configuration allows.'}).
  222. %% Request-line.
  223. parse(<<>>, State) ->
  224. before_loop(State, <<>>);
  225. parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
  226. after_parse(parse_request(Buffer, State, EmptyLines));
  227. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
  228. after_parse(parse_header(Buffer,
  229. State#state{in_state=PS#ps_header{headers=undefined}},
  230. Headers));
  231. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
  232. after_parse(parse_hd_before_value(Buffer,
  233. State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
  234. Headers, Name));
  235. parse(Buffer, State=#state{in_state=#ps_body{}}) ->
  236. %% @todo We do not want to get the body automatically if the request doesn't ask for it.
  237. %% We may want to get bodies that are below a threshold without waiting, and buffer them
  238. %% until the request asks, though.
  239. after_parse(parse_body(Buffer, State)).
  240. %% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
  241. after_parse({request, Req=#{streamid := StreamID, method := Method,
  242. headers := Headers, version := Version},
  243. State0=#state{opts=Opts, streams=Streams0}, Buffer}) ->
  244. try cowboy_stream:init(StreamID, Req, Opts) of
  245. {Commands, StreamState} ->
  246. TE = maps:get(<<"te">>, Headers, undefined),
  247. Streams = [#stream{id=StreamID, state=StreamState,
  248. method=Method, version=Version, te=TE}|Streams0],
  249. State1 = case maybe_req_close(State0, Headers, Version) of
  250. close -> State0#state{streams=Streams, last_streamid=StreamID};
  251. keepalive -> State0#state{streams=Streams}
  252. end,
  253. State = set_timeout(State1),
  254. parse(Buffer, commands(State, StreamID, Commands))
  255. catch Class:Exception ->
  256. cowboy_stream:report_error(init,
  257. [StreamID, Req, Opts],
  258. Class, Exception, erlang:get_stacktrace()),
  259. early_error(500, State0, {internal_error, {Class, Exception},
  260. 'Unhandled exception in cowboy_stream:init/3.'}, Req),
  261. parse(Buffer, State0)
  262. end;
  263. %% Streams are sequential so the body is always about the last stream created
  264. %% unless that stream has terminated.
  265. after_parse({data, StreamID, IsFin, Data, State=#state{
  266. streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
  267. try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
  268. {Commands, StreamState} ->
  269. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  270. Stream#stream{state=StreamState}),
  271. parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
  272. catch Class:Exception ->
  273. cowboy_stream:report_error(data,
  274. [StreamID, IsFin, Data, StreamState0],
  275. Class, Exception, erlang:get_stacktrace()),
  276. stream_reset(State, StreamID, {internal_error, {Class, Exception},
  277. 'Unhandled exception in cowboy_stream:data/4.'})
  278. end;
  279. %% No corresponding stream. We must skip the body of the previous request
  280. %% in order to process the next one.
  281. after_parse({data, _, _, _, State, Buffer}) ->
  282. before_loop(State, Buffer);
  283. after_parse({more, State, Buffer}) ->
  284. before_loop(State, Buffer).
  285. %% Request-line.
  286. -spec parse_request(Buffer, State, non_neg_integer())
  287. -> {request, cowboy_req:req(), State, Buffer}
  288. | {data, cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State, Buffer}
  289. | {more, State, Buffer}
  290. when Buffer::binary(), State::#state{}.
  291. %% Empty lines must be using \r\n.
  292. parse_request(<< $\n, _/bits >>, State, _) ->
  293. error_terminate(400, State, {connection_error, protocol_error,
  294. 'Empty lines between requests must use the CRLF line terminator. (RFC7230 3.5)'});
  295. parse_request(<< $\s, _/bits >>, State, _) ->
  296. error_terminate(400, State, {connection_error, protocol_error,
  297. 'The request-line must not begin with a space. (RFC7230 3.1.1, RFC7230 3.5)'});
  298. %% We limit the length of the Request-line to MaxLength to avoid endlessly
  299. %% reading from the socket and eventually crashing.
  300. parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
  301. MaxLength = maps:get(max_request_line_length, Opts, 8000),
  302. MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
  303. case match_eol(Buffer, 0) of
  304. nomatch when byte_size(Buffer) > MaxLength ->
  305. error_terminate(414, State, {connection_error, limit_reached,
  306. 'The request-line length is larger than configuration allows. (RFC7230 3.1.1)'});
  307. nomatch ->
  308. {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
  309. 1 when EmptyLines =:= MaxEmptyLines ->
  310. error_terminate(400, State, {connection_error, limit_reached,
  311. 'More empty lines were received than configuration allows. (RFC7230 3.5)'});
  312. 1 ->
  313. << _:16, Rest/bits >> = Buffer,
  314. parse_request(Rest, State, EmptyLines + 1);
  315. _ ->
  316. case Buffer of
  317. %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
  318. << "OPTIONS * ", Rest/bits >> ->
  319. parse_version(Rest, State, <<"OPTIONS">>, undefined, <<"*">>, <<>>);
  320. <<"CONNECT ", _/bits>> ->
  321. error_terminate(501, State, {connection_error, no_error,
  322. 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'});
  323. <<"TRACE ", _/bits>> ->
  324. error_terminate(501, State, {connection_error, no_error,
  325. 'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
  326. %% Accept direct HTTP/2 only at the beginning of the connection.
  327. << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
  328. %% @todo Might be worth throwing to get a clean stacktrace.
  329. http2_upgrade(State, Buffer);
  330. _ ->
  331. parse_method(Buffer, State, <<>>,
  332. maps:get(max_method_length, Opts, 32))
  333. end
  334. end.
  335. match_eol(<< $\n, _/bits >>, N) ->
  336. N;
  337. match_eol(<< _, Rest/bits >>, N) ->
  338. match_eol(Rest, N + 1);
  339. match_eol(_, _) ->
  340. nomatch.
  341. parse_method(_, State, _, 0) ->
  342. error_terminate(501, State, {connection_error, limit_reached,
  343. 'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
  344. parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
  345. case C of
  346. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  347. 'The method name must not be followed with a line break. (RFC7230 3.1.1)'});
  348. $\s -> parse_uri(Rest, State, SoFar);
  349. _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
  350. _ -> error_terminate(400, State, {connection_error, protocol_error,
  351. 'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
  352. end.
  353. parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
  354. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  355. P =:= $p orelse P =:= $P ->
  356. parse_uri_authority(Rest, State, Method);
  357. parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
  358. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  359. P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
  360. parse_uri_authority(Rest, State, Method);
  361. parse_uri(<< $/, Rest/bits >>, State, Method) ->
  362. parse_uri_path(Rest, State, Method, undefined, <<$/>>);
  363. parse_uri(_, State, _) ->
  364. error_terminate(400, State, {connection_error, protocol_error,
  365. 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
  366. %% @todo We probably want to apply max_authority_length also
  367. %% to the host header and to document this option. It might
  368. %% also be useful for HTTP/2 requests.
  369. parse_uri_authority(Rest, State=#state{opts=Opts}, Method) ->
  370. parse_uri_authority(Rest, State, Method, <<>>,
  371. maps:get(max_authority_length, Opts, 255)).
  372. parse_uri_authority(_, State, _, _, 0) ->
  373. error_terminate(414, State, {connection_error, limit_reached,
  374. 'The authority component of the absolute URI is longer than configuration allows. (RFC7230 2.7.1)'});
  375. parse_uri_authority(<<C, Rest/bits>>, State, Method, SoFar, Remaining) ->
  376. case C of
  377. $\r ->
  378. error_terminate(400, State, {connection_error, protocol_error,
  379. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  380. $@ ->
  381. error_terminate(400, State, {connection_error, protocol_error,
  382. 'Absolute URIs must not include a userinfo component. (RFC7230 2.7.1)'});
  383. C when SoFar =:= <<>> andalso
  384. ((C =:= $/) orelse (C =:= $\s) orelse (C =:= $?) orelse (C =:= $#)) ->
  385. error_terminate(400, State, {connection_error, protocol_error,
  386. 'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
  387. $: when SoFar =:= <<>> ->
  388. error_terminate(400, State, {connection_error, protocol_error,
  389. 'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
  390. $/ -> parse_uri_path(Rest, State, Method, SoFar, <<"/">>);
  391. $\s -> parse_version(Rest, State, Method, SoFar, <<"/">>, <<>>);
  392. $? -> parse_uri_query(Rest, State, Method, SoFar, <<"/">>, <<>>);
  393. $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<"/">>, <<>>);
  394. C -> parse_uri_authority(Rest, State, Method, <<SoFar/binary, C>>, Remaining - 1)
  395. end.
  396. parse_uri_path(<<C, Rest/bits>>, State, Method, Authority, SoFar) ->
  397. case C of
  398. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  399. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  400. $\s -> parse_version(Rest, State, Method, Authority, SoFar, <<>>);
  401. $? -> parse_uri_query(Rest, State, Method, Authority, SoFar, <<>>);
  402. $# -> skip_uri_fragment(Rest, State, Method, Authority, SoFar, <<>>);
  403. _ -> parse_uri_path(Rest, State, Method, Authority, <<SoFar/binary, C>>)
  404. end.
  405. parse_uri_query(<<C, Rest/bits>>, State, M, A, P, SoFar) ->
  406. case C of
  407. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  408. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  409. $\s -> parse_version(Rest, State, M, A, P, SoFar);
  410. $# -> skip_uri_fragment(Rest, State, M, A, P, SoFar);
  411. _ -> parse_uri_query(Rest, State, M, A, P, <<SoFar/binary, C>>)
  412. end.
  413. skip_uri_fragment(<<C, Rest/bits>>, State, M, A, P, Q) ->
  414. case C of
  415. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  416. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  417. $\s -> parse_version(Rest, State, M, A, P, Q);
  418. _ -> skip_uri_fragment(Rest, State, M, A, P, Q)
  419. end.
  420. parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, A, P, Q) ->
  421. before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.1');
  422. parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, A, P, Q) ->
  423. before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.0');
  424. parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _, _) when C =:= $\s; C =:= $\t ->
  425. error_terminate(400, State, {connection_error, protocol_error,
  426. 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
  427. parse_version(<< C, _/bits >>, State, _, _, _, _) when C =:= $\s; C =:= $\t ->
  428. error_terminate(400, State, {connection_error, protocol_error,
  429. 'The separator between request target and version must be a single SP. (RFC7230 3.1.1)'});
  430. parse_version(_, State, _, _, _, _) ->
  431. error_terminate(505, State, {connection_error, protocol_error,
  432. 'Unsupported HTTP version. (RFC7230 2.6)'}).
  433. before_parse_headers(Rest, State, M, A, P, Q, V) ->
  434. parse_header(Rest, State#state{in_state=#ps_header{
  435. method=M, authority=A, path=P, qs=Q, version=V}}, #{}).
  436. %% Headers.
  437. %% We need two or more bytes in the buffer to continue.
  438. parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
  439. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
  440. parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
  441. request(Rest, S, Headers);
  442. parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  443. MaxHeaders = maps:get(max_headers, Opts, 100),
  444. NumHeaders = maps:size(Headers),
  445. if
  446. NumHeaders >= MaxHeaders ->
  447. error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
  448. {connection_error, limit_reached,
  449. 'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  450. true ->
  451. parse_header_colon(Buffer, State, Headers)
  452. end.
  453. parse_header_colon(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  454. MaxLength = maps:get(max_header_name_length, Opts, 64),
  455. case match_colon(Buffer, 0) of
  456. nomatch when byte_size(Buffer) > MaxLength ->
  457. error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
  458. {connection_error, limit_reached,
  459. 'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  460. nomatch ->
  461. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
  462. _ ->
  463. parse_hd_name(Buffer, State, Headers, <<>>)
  464. end.
  465. match_colon(<< $:, _/bits >>, N) ->
  466. N;
  467. match_colon(<< _, Rest/bits >>, N) ->
  468. match_colon(Rest, N + 1);
  469. match_colon(_, _) ->
  470. nomatch.
  471. parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
  472. parse_hd_before_value(Rest, State, H, SoFar);
  473. parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, <<>>) when ?IS_WS(C) ->
  474. error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
  475. {connection_error, protocol_error,
  476. 'Whitespace is not allowed before the header name. (RFC7230 3.2)'});
  477. parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, _) when ?IS_WS(C) ->
  478. error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
  479. {connection_error, protocol_error,
  480. 'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2.4)'});
  481. parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
  482. ?LOWER(parse_hd_name, Rest, State, H, SoFar).
  483. parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) ->
  484. parse_hd_before_value(Rest, S, H, N);
  485. parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) ->
  486. parse_hd_before_value(Rest, S, H, N);
  487. parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
  488. MaxLength = maps:get(max_header_value_length, Opts, 4096),
  489. case match_eol(Buffer, 0) of
  490. nomatch when byte_size(Buffer) > MaxLength ->
  491. error_terminate(431, State#state{in_state=PS#ps_header{headers=H}},
  492. {connection_error, limit_reached,
  493. 'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  494. nomatch ->
  495. {more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer};
  496. _ ->
  497. parse_hd_value(Buffer, State, H, N, <<>>)
  498. end.
  499. parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers0, Name, SoFar) ->
  500. Value = clean_value_ws_end(SoFar, byte_size(SoFar) - 1),
  501. Headers = case maps:get(Name, Headers0, undefined) of
  502. undefined -> Headers0#{Name => Value};
  503. %% The cookie header does not use proper HTTP header lists.
  504. Value0 when Name =:= <<"cookie">> -> Headers0#{Name => << Value0/binary, "; ", Value/binary >>};
  505. Value0 -> Headers0#{Name => << Value0/binary, ", ", Value/binary >>}
  506. end,
  507. parse_header(Rest, S, Headers);
  508. parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
  509. parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).
  510. clean_value_ws_end(_, -1) ->
  511. <<>>;
  512. clean_value_ws_end(Value, N) ->
  513. case binary:at(Value, N) of
  514. $\s -> clean_value_ws_end(Value, N - 1);
  515. $\t -> clean_value_ws_end(Value, N - 1);
  516. _ ->
  517. S = N + 1,
  518. << Value2:S/binary, _/bits >> = Value,
  519. Value2
  520. end.
  521. -ifdef(TEST).
  522. clean_value_ws_end_test_() ->
  523. Tests = [
  524. {<<>>, <<>>},
  525. {<<" ">>, <<>>},
  526. {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  527. "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
  528. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  529. "text/html;level=2;q=0.4, */*;q=0.5">>}
  530. ],
  531. [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
  532. horse_clean_value_ws_end() ->
  533. horse:repeat(200000,
  534. clean_value_ws_end(
  535. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  536. "text/html;level=2;q=0.4, */*;q=0.5 ">>,
  537. byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  538. "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
  539. ).
  540. -endif.
  541. request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
  542. in_state=PS=#ps_header{authority=Authority, version=Version}}, Headers) ->
  543. case maps:get(<<"host">>, Headers, undefined) of
  544. undefined when Version =:= 'HTTP/1.1' ->
  545. %% @todo Might want to not close the connection on this and next one.
  546. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  547. {stream_error, StreamID, protocol_error,
  548. 'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
  549. undefined ->
  550. request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
  551. %% @todo When CONNECT requests come in we need to ignore the RawHost
  552. %% and instead use the Authority as the source of host.
  553. RawHost when Authority =:= undefined; Authority =:= RawHost ->
  554. request_parse_host(Buffer, State, Headers, RawHost);
  555. %% RFC7230 does not explicitly ask us to reject requests
  556. %% that have a different authority component and host header.
  557. %% However it DOES ask clients to set them to the same value,
  558. %% so we enforce that.
  559. _ ->
  560. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  561. {stream_error, StreamID, protocol_error,
  562. 'The host header is different than the absolute-form authority component. (RFC7230 5.4)'})
  563. end.
  564. request_parse_host(Buffer, State=#state{transport=Transport,
  565. in_streamid=StreamID, in_state=PS}, Headers, RawHost) ->
  566. try cow_http_hd:parse_host(RawHost) of
  567. {Host, undefined} ->
  568. request(Buffer, State, Headers, Host, default_port(Transport:secure()));
  569. {Host, Port} when Port > 0, Port =< 65535 ->
  570. request(Buffer, State, Headers, Host, Port);
  571. _ ->
  572. error_terminate(400, State, {stream_error, StreamID, protocol_error,
  573. 'The port component of the absolute-form is not in the range 0..65535. (RFC7230 2.7.1)'})
  574. catch _:_ ->
  575. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  576. {stream_error, StreamID, protocol_error,
  577. 'The host header is invalid. (RFC7230 5.4)'})
  578. end.
  579. -spec default_port(boolean()) -> 80 | 443.
  580. default_port(true) -> 443;
  581. default_port(_) -> 80.
  582. %% End of request parsing.
  583. request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
  584. in_streamid=StreamID, in_state=
  585. PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
  586. Headers0, Host, Port) ->
  587. Scheme = case Transport:secure() of
  588. true -> <<"https">>;
  589. false -> <<"http">>
  590. end,
  591. {Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of
  592. #{<<"transfer-encoding">> := TransferEncoding0} ->
  593. try cow_http_hd:parse_transfer_encoding(TransferEncoding0) of
  594. [<<"chunked">>] ->
  595. {maps:remove(<<"content-length">>, Headers0),
  596. true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
  597. _ ->
  598. error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
  599. {stream_error, StreamID, protocol_error,
  600. 'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'})
  601. catch _:_ ->
  602. error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
  603. {stream_error, StreamID, protocol_error,
  604. 'The transfer-encoding header is invalid. (RFC7230 3.3.1)'})
  605. end;
  606. #{<<"content-length">> := <<"0">>} ->
  607. {Headers0, false, 0, undefined, undefined};
  608. #{<<"content-length">> := BinLength} ->
  609. Length = try
  610. cow_http_hd:parse_content_length(BinLength)
  611. catch _:_ ->
  612. error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
  613. {stream_error, StreamID, protocol_error,
  614. 'The content-length header is invalid. (RFC7230 3.3.2)'})
  615. end,
  616. {Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
  617. _ ->
  618. {Headers0, false, 0, undefined, undefined}
  619. end,
  620. Req = #{
  621. ref => Ref,
  622. pid => self(),
  623. streamid => StreamID,
  624. peer => Peer,
  625. sock => Sock,
  626. cert => Cert,
  627. method => Method,
  628. scheme => Scheme,
  629. host => Host,
  630. port => Port,
  631. path => Path,
  632. qs => Qs,
  633. version => Version,
  634. %% We are transparently taking care of transfer-encodings so
  635. %% the user code has no need to know about it.
  636. headers => maps:remove(<<"transfer-encoding">>, Headers),
  637. has_body => HasBody,
  638. body_length => BodyLength
  639. },
  640. case is_http2_upgrade(Headers, Version) of
  641. false ->
  642. State = case HasBody of
  643. true ->
  644. State0#state{in_state=#ps_body{
  645. length = BodyLength,
  646. transfer_decode_fun = TDecodeFun,
  647. transfer_decode_state = TDecodeState
  648. }};
  649. false ->
  650. State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}
  651. end,
  652. {request, Req, State, Buffer};
  653. {true, HTTP2Settings} ->
  654. %% We save the headers in case the upgrade will fail
  655. %% and we need to pass them to cowboy_stream:early_error.
  656. http2_upgrade(State0#state{in_state=PS#ps_header{headers=Headers}},
  657. Buffer, HTTP2Settings, Req)
  658. end.
  659. %% HTTP/2 upgrade.
  660. %% @todo We must not upgrade to h2c over a TLS connection.
  661. is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
  662. <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
  663. Conns = cow_http_hd:parse_connection(Conn),
  664. case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
  665. {true, true} ->
  666. Protocols = cow_http_hd:parse_upgrade(Upgrade),
  667. case lists:member(<<"h2c">>, Protocols) of
  668. true ->
  669. {true, HTTP2Settings};
  670. false ->
  671. false
  672. end;
  673. _ ->
  674. false
  675. end;
  676. is_http2_upgrade(_, _) ->
  677. false.
  678. %% Prior knowledge upgrade, without an HTTP/1.1 request.
  679. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  680. opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
  681. case Transport:secure() of
  682. false ->
  683. _ = cancel_timeout(State),
  684. cowboy_http2:init(Parent, Ref, Socket, Transport, Opts,
  685. Peer, Sock, Cert, Buffer);
  686. true ->
  687. error_terminate(400, State, {connection_error, protocol_error,
  688. 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
  689. end.
  690. %% Upgrade via an HTTP/1.1 request.
  691. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  692. opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer, HTTP2Settings, Req) ->
  693. %% @todo
  694. %% However if the client sent a body, we need to read the body in full
  695. %% and if we can't do that, return a 413 response. Some options are in order.
  696. %% Always half-closed stream coming from this side.
  697. try cow_http_hd:parse_http2_settings(HTTP2Settings) of
  698. Settings ->
  699. _ = cancel_timeout(State),
  700. cowboy_http2:init(Parent, Ref, Socket, Transport, Opts,
  701. Peer, Sock, Cert, Buffer, Settings, Req)
  702. catch _:_ ->
  703. error_terminate(400, State, {connection_error, protocol_error,
  704. 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
  705. end.
  706. %% Request body parsing.
  707. parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
  708. PS=#ps_body{received=Received, transfer_decode_fun=TDecode,
  709. transfer_decode_state=TState0}}) ->
  710. %% @todo Proper trailers.
  711. try TDecode(Buffer, TState0) of
  712. more ->
  713. %% @todo Asks for 0 or more bytes.
  714. {more, State, Buffer};
  715. {more, Data, TState} ->
  716. %% @todo Asks for 0 or more bytes.
  717. {data, StreamID, nofin, Data, State#state{in_state=
  718. PS#ps_body{received=Received + byte_size(Data),
  719. transfer_decode_state=TState}}, <<>>};
  720. {more, Data, _Length, TState} when is_integer(_Length) ->
  721. %% @todo Asks for Length more bytes.
  722. {data, StreamID, nofin, Data, State#state{in_state=
  723. PS#ps_body{received=Received + byte_size(Data),
  724. transfer_decode_state=TState}}, <<>>};
  725. {more, Data, Rest, TState} ->
  726. %% @todo Asks for 0 or more bytes.
  727. {data, StreamID, nofin, Data, State#state{in_state=
  728. PS#ps_body{received=Received + byte_size(Data),
  729. transfer_decode_state=TState}}, Rest};
  730. {done, _HasTrailers, Rest} ->
  731. {data, StreamID, fin, <<>>, set_timeout(
  732. State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
  733. {done, Data, _HasTrailers, Rest} ->
  734. {data, StreamID, fin, Data, set_timeout(
  735. State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
  736. catch _:_ ->
  737. Reason = {connection_error, protocol_error,
  738. 'Failure to decode the content. (RFC7230 4)'},
  739. terminate(stream_terminate(State, StreamID, Reason), Reason)
  740. end.
  741. %% Message handling.
  742. down(State=#state{children=Children0}, Pid, Msg) ->
  743. case cowboy_children:down(Children0, Pid) of
  744. %% The stream was terminated already.
  745. {ok, undefined, Children} ->
  746. State#state{children=Children};
  747. %% The stream is still running.
  748. {ok, StreamID, Children} ->
  749. info(State#state{children=Children}, StreamID, Msg);
  750. %% The process was unknown.
  751. error ->
  752. error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]),
  753. State
  754. end.
  755. info(State=#state{streams=Streams0}, StreamID, Msg) ->
  756. case lists:keyfind(StreamID, #stream.id, Streams0) of
  757. Stream = #stream{state=StreamState0} ->
  758. try cowboy_stream:info(StreamID, Msg, StreamState0) of
  759. {Commands, StreamState} ->
  760. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  761. Stream#stream{state=StreamState}),
  762. commands(State#state{streams=Streams}, StreamID, Commands)
  763. catch Class:Exception ->
  764. cowboy_stream:report_error(info,
  765. [StreamID, Msg, StreamState0],
  766. Class, Exception, erlang:get_stacktrace()),
  767. stream_reset(State, StreamID, {internal_error, {Class, Exception},
  768. 'Unhandled exception in cowboy_stream:info/3.'})
  769. end;
  770. false ->
  771. error_logger:error_msg("Received message ~p for unknown stream ~p.~n", [Msg, StreamID]),
  772. State
  773. end.
  774. %% Commands.
  775. commands(State, _, []) ->
  776. State;
  777. %% Supervise a child process.
  778. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
  779. commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
  780. StreamID, Tail);
  781. %% Error handling.
  782. commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
  783. commands(stream_reset(State, StreamID, Error), StreamID, Tail);
  784. %% Commands for a stream currently inactive.
  785. commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
  786. when Current =/= StreamID ->
  787. %% @todo We still want to handle some commands...
  788. Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
  789. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  790. Stream#stream{queue=Queue ++ Commands}),
  791. State#state{streams=Streams};
  792. %% Read the request body.
  793. commands(State, StreamID, [{flow, _Length}|Tail]) ->
  794. %% @todo We only read from socket if buffer is empty, otherwise
  795. %% we decode the buffer.
  796. %% @todo Set the body reading length to min(Length, BodyLength)
  797. commands(State, StreamID, Tail);
  798. %% Error responses are sent only if a response wasn't sent already.
  799. commands(State=#state{out_state=wait}, StreamID, [{error_response, Status, Headers0, Body}|Tail]) ->
  800. %% We close the connection when the error response is 408, as it
  801. %% indicates a timeout and the RFC recommends that we stop here. (RFC7231 6.5.7)
  802. Headers = case Status of
  803. 408 -> Headers0#{<<"connection">> => <<"close">>};
  804. <<"408", _/bits>> -> Headers0#{<<"connection">> => <<"close">>};
  805. _ -> Headers0
  806. end,
  807. commands(State, StreamID, [{response, Status, Headers, Body}|Tail]);
  808. commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
  809. commands(State, StreamID, Tail);
  810. %% Send an informational response.
  811. commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
  812. StreamID, [{inform, StatusCode, Headers}|Tail]) ->
  813. %% @todo I'm pretty sure the last stream in the list is the one we want
  814. %% considering all others are queued.
  815. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  816. _ = case Version of
  817. 'HTTP/1.1' ->
  818. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
  819. headers_to_list(Headers)));
  820. %% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
  821. 'HTTP/1.0' ->
  822. ok
  823. end,
  824. commands(State, StreamID, Tail);
  825. %% Send a full response.
  826. %%
  827. %% @todo Kill the stream if it sent a response when one has already been sent.
  828. %% @todo Keep IsFin in the state.
  829. %% @todo Same two things above apply to DATA, possibly promise too.
  830. commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID,
  831. [{response, StatusCode, Headers0, Body}|Tail]) ->
  832. %% @todo I'm pretty sure the last stream in the list is the one we want
  833. %% considering all others are queued.
  834. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  835. {State, Headers} = connection(State0, Headers0, StreamID, Version),
  836. %% @todo Ensure content-length is set.
  837. Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
  838. case Body of
  839. {sendfile, O, B, P} ->
  840. Transport:send(Socket, Response),
  841. commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]);
  842. _ ->
  843. Transport:send(Socket, [Response, Body]),
  844. commands(State#state{out_state=done}, StreamID, Tail)
  845. end;
  846. %% Send response headers and initiate chunked encoding.
  847. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  848. [{headers, StatusCode, Headers0}|Tail]) ->
  849. %% @todo Same as above.
  850. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  851. {State1, Headers1} = case Version of
  852. 'HTTP/1.1' ->
  853. {State0, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
  854. %% Close the connection after streaming the data to HTTP/1.0 client.
  855. %% @todo I'm guessing we need to differentiate responses with a content-length and others.
  856. 'HTTP/1.0' ->
  857. {State0#state{last_streamid=StreamID}, Headers0}
  858. end,
  859. {State, Headers} = connection(State1, Headers1, StreamID, Version),
  860. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
  861. commands(State#state{out_state=chunked}, StreamID, Tail);
  862. %% Send a response body chunk.
  863. %%
  864. %% @todo WINDOW_UPDATE stuff require us to buffer some data.
  865. %% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
  866. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  867. [{data, IsFin, Data}|Tail]) ->
  868. %% Do not send anything when the user asks to send an empty
  869. %% data frame, as that would break the protocol.
  870. Size = iolist_size(Data),
  871. case Size of
  872. 0 ->
  873. %% We send the last chunk only if version is HTTP/1.1 and IsFin=fin.
  874. case lists:keyfind(StreamID, #stream.id, Streams) of
  875. #stream{method= <<"HEAD">>} ->
  876. ok;
  877. #stream{version='HTTP/1.1'} when IsFin =:= fin ->
  878. Transport:send(Socket, <<"0\r\n\r\n">>);
  879. _ ->
  880. ok
  881. end;
  882. _ ->
  883. %% @todo We need to kill the stream if it tries to send data before headers.
  884. %% @todo Same as above.
  885. case lists:keyfind(StreamID, #stream.id, Streams) of
  886. #stream{method= <<"HEAD">>} ->
  887. ok;
  888. #stream{version='HTTP/1.1'} ->
  889. Transport:send(Socket, [
  890. integer_to_binary(Size, 16), <<"\r\n">>, Data,
  891. case IsFin of
  892. fin -> <<"\r\n0\r\n\r\n">>;
  893. nofin -> <<"\r\n">>
  894. end
  895. ]);
  896. #stream{version='HTTP/1.0'} ->
  897. Transport:send(Socket, Data)
  898. end
  899. end,
  900. State = case IsFin of
  901. fin -> State0#state{out_state=done};
  902. nofin -> State0
  903. end,
  904. commands(State, StreamID, Tail);
  905. %% Send trailers.
  906. commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  907. [{trailers, Trailers}|Tail]) ->
  908. TE = case lists:keyfind(StreamID, #stream.id, Streams) of
  909. %% HTTP/1.0 doesn't support chunked transfer-encoding.
  910. #stream{version='HTTP/1.0'} ->
  911. not_chunked;
  912. %% No TE header was sent.
  913. #stream{te=undefined} ->
  914. no_trailers;
  915. #stream{te=TE0} ->
  916. try cow_http_hd:parse_te(TE0) of
  917. {TE1, _} -> TE1
  918. catch _:_ ->
  919. %% If we can't parse the TE header, assume we can't send trailers.
  920. no_trailers
  921. end
  922. end,
  923. case TE of
  924. trailers ->
  925. Transport:send(Socket, [
  926. <<"0\r\n">>,
  927. cow_http:headers(maps:to_list(Trailers)),
  928. <<"\r\n">>
  929. ]);
  930. no_trailers ->
  931. Transport:send(Socket, <<"0\r\n\r\n">>);
  932. not_chunked ->
  933. ok
  934. end,
  935. commands(State#state{out_state=done}, StreamID, Tail);
  936. %% Send a file.
  937. commands(State0=#state{socket=Socket, transport=Transport}, StreamID,
  938. [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
  939. %% We wrap the sendfile call into a try/catch because on OTP-20
  940. %% and earlier a few different crashes could occur for sockets
  941. %% that were closing or closed. For example a badarg in
  942. %% erlang:port_get_data(#Port<...>) or a badmatch like
  943. %% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
  944. %%
  945. %% OTP-21 uses a NIF instead of a port so the implementation
  946. %% and behavior has dramatically changed and it is unclear
  947. %% whether it will be necessary in the future.
  948. %%
  949. %% This try/catch prevents some noisy logs to be written
  950. %% when these errors occur.
  951. try
  952. Transport:sendfile(Socket, Path, Offset, Bytes),
  953. State = case IsFin of
  954. fin -> State0#state{out_state=done}
  955. %% @todo Add the sendfile command.
  956. % nofin -> State0
  957. end,
  958. commands(State, StreamID, Tail)
  959. catch _:_ ->
  960. terminate(State0, {socket_error, sendfile_crash,
  961. 'An error occurred when using the sendfile function.'})
  962. end;
  963. %% Protocol takeover.
  964. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
  965. opts=Opts, children=Children}, StreamID,
  966. [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
  967. %% @todo This should be the last stream running otherwise we need to wait before switching.
  968. %% @todo If there's streams opened after this one, fail instead of 101.
  969. State = cancel_timeout(State0),
  970. %% Before we send the 101 response we need to stop receiving data
  971. %% from the socket, otherwise the data might be receive before the
  972. %% call to flush/0 and we end up inadvertently dropping a packet.
  973. %%
  974. %% @todo Handle cases where the request came with a body. We need
  975. %% to process or skip the body before the upgrade can be completed.
  976. Transport:setopts(Socket, [{active, false}]),
  977. %% Send a 101 response, then terminate the stream.
  978. #state{streams=Streams} = info(State, StreamID, {inform, 101, Headers}),
  979. #stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
  980. %% @todo We need to shutdown processes here first.
  981. stream_call_terminate(StreamID, switch_protocol, StreamState),
  982. %% Terminate children processes and flush any remaining messages from the mailbox.
  983. cowboy_children:terminate(Children),
  984. flush(Parent),
  985. %% @todo This is no good because commands return a state normally and here it doesn't
  986. %% we need to let this module go entirely. Perhaps it should be handled directly in
  987. %% cowboy_clear/cowboy_tls?
  988. Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
  989. %% Stream shutdown.
  990. commands(State, StreamID, [stop|Tail]) ->
  991. %% @todo Do we want to run the commands after a stop?
  992. %% @todo We currently wait for the stop command before we
  993. %% continue with the next request/response. In theory, if
  994. %% the request body was read fully and the response body
  995. %% was sent fully we should be able to start working on
  996. %% the next request concurrently. This can be done as a
  997. %% future optimization.
  998. maybe_terminate(State, StreamID, Tail);
  999. %% HTTP/1.1 does not support push; ignore.
  1000. commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
  1001. commands(State, StreamID, Tail).
  1002. %% The set-cookie header is special; we can only send one cookie per header.
  1003. headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
  1004. Headers1 = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
  1005. Headers1 ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
  1006. headers_to_list(Headers) ->
  1007. maps:to_list(Headers).
  1008. %% Flush messages specific to cowboy_http before handing over the
  1009. %% connection to another protocol.
  1010. flush(Parent) ->
  1011. receive
  1012. {timeout, _, _} ->
  1013. flush(Parent);
  1014. {{Pid, _}, _} when Pid =:= self() ->
  1015. flush(Parent);
  1016. {'EXIT', Pid, _} when Pid =/= Parent ->
  1017. flush(Parent)
  1018. after 0 ->
  1019. ok
  1020. end.
  1021. %% @todo In these cases I'm not sure if we should continue processing commands.
  1022. maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
  1023. terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
  1024. maybe_terminate(State, StreamID, _Tail) ->
  1025. stream_terminate(State, StreamID, normal).
  1026. stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
  1027. %% @todo headers
  1028. %% @todo Don't send this if there are no streams left.
  1029. % Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
  1030. % {<<"content-length">>, <<"0">>}
  1031. % ])),
  1032. %% @todo update IsFin local
  1033. % stream_terminate(State#state{out_state=done}, StreamID, StreamError).
  1034. stream_terminate(State, StreamID, StreamError).
  1035. stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
  1036. out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
  1037. children=Children0}, StreamID, Reason) ->
  1038. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0),
  1039. State1 = #state{streams=Streams1} = case OutState of
  1040. wait when element(1, Reason) =:= internal_error ->
  1041. info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>});
  1042. wait when element(1, Reason) =:= connection_error ->
  1043. info(State0, StreamID, {response, 400, #{<<"content-length">> => <<"0">>}, <<>>});
  1044. wait ->
  1045. info(State0, StreamID, {response, 204, #{}, <<>>});
  1046. chunked when Version =:= 'HTTP/1.1' ->
  1047. info(State0, StreamID, {data, fin, <<>>});
  1048. _ -> %% done or Version =:= 'HTTP/1.0'
  1049. State0
  1050. end,
  1051. %% Remove the stream from the state.
  1052. {value, #stream{state=StreamState}, Streams}
  1053. = lists:keytake(StreamID, #stream.id, Streams1),
  1054. State2 = State1#state{streams=Streams},
  1055. %% Stop the stream.
  1056. stream_call_terminate(StreamID, Reason, StreamState),
  1057. Children = cowboy_children:shutdown(Children0, StreamID),
  1058. %% We reset the timeout if there are no active streams anymore.
  1059. State = case Streams of
  1060. [] -> set_timeout(State2);
  1061. _ -> State2
  1062. end,
  1063. %% We want to drop the connection if the body was not read fully
  1064. %% and we don't know its length or more remains to be read than
  1065. %% configuration allows.
  1066. %% @todo Only do this if Current =:= StreamID.
  1067. MaxSkipBodyLength = maps:get(max_skip_body_length, Opts, 1000000),
  1068. case InState of
  1069. #ps_body{length=undefined}
  1070. when InStreamID =:= OutStreamID ->
  1071. terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length);
  1072. #ps_body{length=Len, received=Received}
  1073. when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
  1074. terminate(State#state{streams=Streams, children=Children}, skip_body_too_large);
  1075. _ ->
  1076. %% Move on to the next stream.
  1077. NextOutStreamID = OutStreamID + 1,
  1078. case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
  1079. false ->
  1080. %% @todo This is clearly wrong, if the stream is gone we need to check if
  1081. %% there used to be such a stream, and if there was to send an error.
  1082. State#state{out_streamid=NextOutStreamID, out_state=wait,
  1083. streams=Streams, children=Children};
  1084. #stream{queue=Commands} ->
  1085. %% @todo Remove queue from the stream.
  1086. commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
  1087. streams=Streams, children=Children}, NextOutStreamID, Commands)
  1088. end
  1089. end.
  1090. stream_call_terminate(StreamID, Reason, StreamState) ->
  1091. try
  1092. cowboy_stream:terminate(StreamID, Reason, StreamState)
  1093. catch Class:Exception ->
  1094. cowboy_stream:report_error(terminate,
  1095. [StreamID, Reason, StreamState],
  1096. Class, Exception, erlang:get_stacktrace())
  1097. end.
  1098. %% @todo max_reqs also
  1099. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
  1100. Conns = cow_http_hd:parse_connection(Conn),
  1101. case lists:member(<<"keep-alive">>, Conns) of
  1102. true -> keepalive;
  1103. false -> close
  1104. end;
  1105. maybe_req_close(_, _, 'HTTP/1.0') ->
  1106. close;
  1107. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
  1108. case connection_hd_is_close(Conn) of
  1109. true -> close;
  1110. false -> keepalive
  1111. end;
  1112. maybe_req_close(_State, _, _) ->
  1113. keepalive.
  1114. connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  1115. case connection_hd_is_close(Conn) of
  1116. true -> {State, Headers};
  1117. %% @todo Here we need to remove keep-alive and add close, not just add close.
  1118. false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
  1119. end;
  1120. connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
  1121. {State, Headers#{<<"connection">> => <<"close">>}};
  1122. connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  1123. case connection_hd_is_close(Conn) of
  1124. true -> {State#state{last_streamid=StreamID}, Headers};
  1125. %% @todo Here we need to set keep-alive only if it wasn't set before.
  1126. false -> {State, Headers}
  1127. end;
  1128. connection(State, Headers, _, 'HTTP/1.0') ->
  1129. {State, Headers#{<<"connection">> => <<"keep-alive">>}};
  1130. connection(State, Headers, _, _) ->
  1131. {State, Headers}.
  1132. connection_hd_is_close(Conn) ->
  1133. Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
  1134. lists:member(<<"close">>, Conns).
  1135. %% This function is only called when an error occurs on a new stream.
  1136. -spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
  1137. error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamState}, Reason) ->
  1138. PartialReq = case StreamState of
  1139. #ps_request_line{} -> #{
  1140. ref => Ref,
  1141. peer => Peer
  1142. };
  1143. #ps_header{method=Method, path=Path, qs=Qs,
  1144. version=Version, headers=ReqHeaders} -> #{
  1145. ref => Ref,
  1146. peer => Peer,
  1147. method => Method,
  1148. path => Path,
  1149. qs => Qs,
  1150. version => Version,
  1151. headers => case ReqHeaders of
  1152. undefined -> #{};
  1153. _ -> ReqHeaders
  1154. end
  1155. }
  1156. end,
  1157. early_error(StatusCode, State, Reason, PartialReq, #{<<"connection">> => <<"close">>}),
  1158. terminate(State, Reason).
  1159. early_error(StatusCode, State, Reason, PartialReq) ->
  1160. early_error(StatusCode, State, Reason, PartialReq, #{}).
  1161. early_error(StatusCode0, #state{socket=Socket, transport=Transport,
  1162. opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) ->
  1163. RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>},
  1164. Resp = {response, StatusCode0, RespHeaders1, <<>>},
  1165. try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
  1166. {response, StatusCode, RespHeaders, RespBody} ->
  1167. Transport:send(Socket, [
  1168. cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
  1169. %% @todo We shouldn't send the body when the method is HEAD.
  1170. %% @todo Technically we allow the sendfile tuple.
  1171. RespBody
  1172. ])
  1173. catch Class:Exception ->
  1174. cowboy_stream:report_error(early_error,
  1175. [StreamID, Reason, PartialReq, Resp, Opts],
  1176. Class, Exception, erlang:get_stacktrace()),
  1177. %% We still need to send an error response, so send what we initially
  1178. %% wanted to send. It's better than nothing.
  1179. Transport:send(Socket, cow_http:response(StatusCode0,
  1180. 'HTTP/1.1', maps:to_list(RespHeaders1)))
  1181. end,
  1182. ok.
  1183. -spec terminate(_, _) -> no_return().
  1184. terminate(undefined, Reason) ->
  1185. exit({shutdown, Reason});
  1186. terminate(State=#state{streams=Streams, children=Children}, Reason) ->
  1187. terminate_all_streams(Streams, Reason),
  1188. cowboy_children:terminate(Children),
  1189. terminate_linger(State),
  1190. exit({shutdown, Reason}).
  1191. terminate_all_streams([], _) ->
  1192. ok;
  1193. terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason) ->
  1194. stream_call_terminate(StreamID, Reason, StreamState),
  1195. terminate_all_streams(Tail, Reason).
  1196. terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
  1197. case Transport:shutdown(Socket, write) of
  1198. ok ->
  1199. case maps:get(linger_timeout, Opts, 1000) of
  1200. 0 ->
  1201. ok;
  1202. infinity ->
  1203. terminate_linger_loop(State, undefined);
  1204. Timeout ->
  1205. TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
  1206. terminate_linger_loop(State, TimerRef)
  1207. end;
  1208. {error, _} ->
  1209. ok
  1210. end.
  1211. terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
  1212. {OK, Closed, Error} = Transport:messages(),
  1213. %% We may already have a message in the mailbox when we do this
  1214. %% but it's OK because we are shutting down anyway.
  1215. case Transport:setopts(Socket, [{active, once}]) of
  1216. ok ->
  1217. receive
  1218. {OK, Socket, _} ->
  1219. terminate_linger_loop(State, TimerRef);
  1220. {Closed, Socket} ->
  1221. ok;
  1222. {Error, Socket, _} ->
  1223. ok;
  1224. {timeout, TimerRef, linger_timeout} ->
  1225. ok;
  1226. _ ->
  1227. terminate_linger_loop(State, TimerRef)
  1228. end;
  1229. {error, _} ->
  1230. ok
  1231. end.
  1232. %% System callbacks.
  1233. -spec system_continue(_, _, {#state{}, binary()}) -> ok.
  1234. system_continue(_, _, {State, Buffer}) ->
  1235. loop(State, Buffer).
  1236. -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
  1237. system_terminate(Reason, _, _, {State, _}) ->
  1238. %% @todo We should exit gracefully, if possible.
  1239. terminate(State, Reason).
  1240. -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
  1241. system_code_change(Misc, _, _, _) ->
  1242. {ok, Misc}.