cowboy_http.erl 50 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http).
  15. -export([init/5]).
  16. -export([system_continue/3]).
  17. -export([system_terminate/4]).
  18. -export([system_code_change/4]).
  19. -type opts() :: #{
  20. connection_type => worker | supervisor,
  21. env => cowboy_middleware:env(),
  22. idle_timeout => timeout(),
  23. inactivity_timeout => timeout(),
  24. linger_timeout => timeout(),
  25. max_empty_lines => non_neg_integer(),
  26. max_header_name_length => non_neg_integer(),
  27. max_header_value_length => non_neg_integer(),
  28. max_headers => non_neg_integer(),
  29. max_keepalive => non_neg_integer(),
  30. max_method_length => non_neg_integer(),
  31. max_request_line_length => non_neg_integer(),
  32. middlewares => [module()],
  33. request_timeout => timeout(),
  34. shutdown_timeout => timeout(),
  35. stream_handlers => [module()]
  36. }.
  37. -export_type([opts/0]).
  38. -record(ps_request_line, {
  39. empty_lines = 0 :: non_neg_integer()
  40. }).
  41. -record(ps_header, {
  42. method = undefined :: binary(),
  43. path = undefined :: binary(),
  44. qs = undefined :: binary(),
  45. version = undefined :: cowboy:http_version(),
  46. headers = undefined :: map() | undefined, %% @todo better type than map()
  47. name = undefined :: binary() | undefined
  48. }).
  49. %% @todo We need a state where we wait for the stream process to ask for the body
  50. %% and do not attempt to read from the socket while in that state (we should read
  51. %% up to a certain length, and then wait, basically implementing flow control but
  52. %% by not reading from the socket when the window is empty).
  53. -record(ps_body, {
  54. length :: non_neg_integer() | undefined,
  55. received = 0 :: non_neg_integer(),
  56. %% @todo flow
  57. transfer_decode_fun :: fun(), %% @todo better type
  58. transfer_decode_state :: any() %% @todo better type
  59. }).
  60. -record(stream, {
  61. id = undefined :: cowboy_stream:streamid(),
  62. %% Stream handlers and their state.
  63. state = undefined :: {module(), any()},
  64. %% Request method.
  65. method = undefined :: binary(),
  66. %% Client HTTP version for this stream.
  67. version = undefined :: cowboy:http_version(),
  68. %% Unparsed te header. Used to know if we can send trailers.
  69. te :: undefined | binary(),
  70. %% Commands queued.
  71. queue = [] :: cowboy_stream:commands()
  72. }).
  73. -type stream() :: #stream{}.
  74. -record(state, {
  75. parent :: pid(),
  76. ref :: ranch:ref(),
  77. socket :: inet:socket(),
  78. transport :: module(),
  79. opts = #{} :: map(),
  80. %% Remote address and port for the connection.
  81. peer = undefined :: {inet:ip_address(), inet:port_number()},
  82. %% Local address and port for the connection.
  83. sock = undefined :: {inet:ip_address(), inet:port_number()},
  84. %% Client certificate (TLS only).
  85. cert :: undefined | binary(),
  86. timer = undefined :: undefined | reference(),
  87. %% Identifier for the stream currently being read (or waiting to be received).
  88. in_streamid = 1 :: pos_integer(),
  89. %% Parsing state for the current stream or stream-to-be.
  90. in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
  91. %% Identifier for the stream currently being written.
  92. %% Note that out_streamid =< in_streamid.
  93. out_streamid = 1 :: pos_integer(),
  94. %% Whether we finished writing data for the current stream.
  95. out_state = wait :: wait | chunked | done,
  96. %% The connection will be closed after this stream.
  97. last_streamid = undefined :: pos_integer(),
  98. %% Currently active HTTP/1.1 streams.
  99. streams = [] :: [stream()],
  100. %% Children processes created by streams.
  101. children = cowboy_children:init() :: cowboy_children:children()
  102. }).
  103. -include_lib("cowlib/include/cow_inline.hrl").
  104. -include_lib("cowlib/include/cow_parse.hrl").
  105. -spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
  106. init(Parent, Ref, Socket, Transport, Opts) ->
  107. Peer0 = Transport:peername(Socket),
  108. Sock0 = Transport:sockname(Socket),
  109. Cert1 = case Transport:name() of
  110. ssl ->
  111. case ssl:peercert(Socket) of
  112. {error, no_peercert} ->
  113. {ok, undefined};
  114. Cert0 ->
  115. Cert0
  116. end;
  117. _ ->
  118. {ok, undefined}
  119. end,
  120. case {Peer0, Sock0, Cert1} of
  121. {{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
  122. LastStreamID = maps:get(max_keepalive, Opts, 100),
  123. before_loop(set_timeout(#state{
  124. parent=Parent, ref=Ref, socket=Socket,
  125. transport=Transport, opts=Opts,
  126. peer=Peer, sock=Sock, cert=Cert,
  127. last_streamid=LastStreamID}), <<>>);
  128. {{error, Reason}, _, _} ->
  129. terminate(undefined, {socket_error, Reason,
  130. 'A socket error occurred when retrieving the peer name.'});
  131. {_, {error, Reason}, _} ->
  132. terminate(undefined, {socket_error, Reason,
  133. 'A socket error occurred when retrieving the sock name.'});
  134. {_, _, {error, Reason}} ->
  135. terminate(undefined, {socket_error, Reason,
  136. 'A socket error occurred when retrieving the client TLS certificate.'})
  137. end.
  138. before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
  139. %% @todo disable this when we get to the body, until the stream asks for it?
  140. %% Perhaps have a threshold for how much we're willing to read before waiting.
  141. Transport:setopts(Socket, [{active, once}]),
  142. loop(State, Buffer).
  143. loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
  144. timer=TimerRef, children=Children, streams=Streams}, Buffer) ->
  145. {OK, Closed, Error} = Transport:messages(),
  146. InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
  147. receive
  148. %% Socket messages.
  149. {OK, Socket, Data} ->
  150. %% Only reset the timeout if it is idle_timeout (active streams).
  151. State1 = case Streams of
  152. [] -> State;
  153. _ -> set_timeout(State)
  154. end,
  155. parse(<< Buffer/binary, Data/binary >>, State1);
  156. {Closed, Socket} ->
  157. terminate(State, {socket_error, closed, 'The socket has been closed.'});
  158. {Error, Socket, Reason} ->
  159. terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
  160. %% Timeouts.
  161. {timeout, Ref, {shutdown, Pid}} ->
  162. cowboy_children:shutdown_timeout(Children, Ref, Pid),
  163. loop(State, Buffer);
  164. {timeout, TimerRef, Reason} ->
  165. timeout(State, Reason);
  166. {timeout, _, _} ->
  167. loop(State, Buffer);
  168. %% System messages.
  169. {'EXIT', Parent, Reason} ->
  170. %% @todo We should exit gracefully, if possible.
  171. exit(Reason);
  172. {system, From, Request} ->
  173. sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
  174. %% Messages pertaining to a stream.
  175. {{Pid, StreamID}, Msg} when Pid =:= self() ->
  176. loop(info(State, StreamID, Msg), Buffer);
  177. %% Exit signal from children.
  178. Msg = {'EXIT', Pid, _} ->
  179. loop(down(State, Pid, Msg), Buffer);
  180. %% Calls from supervisor module.
  181. {'$gen_call', From, Call} ->
  182. cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
  183. loop(State, Buffer);
  184. %% Unknown messages.
  185. Msg ->
  186. error_logger:error_msg("Received stray message ~p.~n", [Msg]),
  187. loop(State, Buffer)
  188. after InactivityTimeout ->
  189. terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
  190. end.
  191. %% We set request_timeout when there are no active streams,
  192. %% and idle_timeout otherwise.
  193. set_timeout(State0=#state{opts=Opts, streams=Streams}) ->
  194. State = cancel_timeout(State0),
  195. {Name, Default} = case Streams of
  196. [] -> {request_timeout, 5000};
  197. _ -> {idle_timeout, 60000}
  198. end,
  199. TimerRef = case maps:get(Name, Opts, Default) of
  200. infinity -> undefined;
  201. Timeout -> erlang:start_timer(Timeout, self(), Name)
  202. end,
  203. State#state{timer=TimerRef}.
  204. cancel_timeout(State=#state{timer=TimerRef}) ->
  205. ok = case TimerRef of
  206. undefined -> ok;
  207. _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
  208. end,
  209. State#state{timer=undefined}.
  210. -spec timeout(_, _) -> no_return().
  211. timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
  212. terminate(State, {connection_error, timeout,
  213. 'No request-line received before timeout.'});
  214. timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
  215. error_terminate(408, State, {connection_error, timeout,
  216. 'Request headers not received before timeout.'});
  217. timeout(State, idle_timeout) ->
  218. terminate(State, {connection_error, timeout,
  219. 'Connection idle longer than configuration allows.'}).
  220. %% Request-line.
  221. parse(<<>>, State) ->
  222. before_loop(State, <<>>);
  223. parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
  224. after_parse(parse_request(Buffer, State, EmptyLines));
  225. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
  226. after_parse(parse_header(Buffer,
  227. State#state{in_state=PS#ps_header{headers=undefined}},
  228. Headers));
  229. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
  230. after_parse(parse_hd_before_value(Buffer,
  231. State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
  232. Headers, Name));
  233. parse(Buffer, State=#state{in_state=#ps_body{}}) ->
  234. %% @todo We do not want to get the body automatically if the request doesn't ask for it.
  235. %% We may want to get bodies that are below a threshold without waiting, and buffer them
  236. %% until the request asks, though.
  237. after_parse(parse_body(Buffer, State)).
  238. %% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
  239. after_parse({request, Req=#{streamid := StreamID, method := Method,
  240. headers := Headers, version := Version},
  241. State0=#state{opts=Opts, streams=Streams0}, Buffer}) ->
  242. try cowboy_stream:init(StreamID, Req, Opts) of
  243. {Commands, StreamState} ->
  244. TE = maps:get(<<"te">>, Headers, undefined),
  245. Streams = [#stream{id=StreamID, state=StreamState,
  246. method=Method, version=Version, te=TE}|Streams0],
  247. State1 = case maybe_req_close(State0, Headers, Version) of
  248. close -> State0#state{streams=Streams, last_streamid=StreamID};
  249. keepalive -> State0#state{streams=Streams}
  250. end,
  251. State = set_timeout(State1),
  252. parse(Buffer, commands(State, StreamID, Commands))
  253. catch Class:Exception ->
  254. cowboy_stream:report_error(init,
  255. [StreamID, Req, Opts],
  256. Class, Exception, erlang:get_stacktrace()),
  257. early_error(500, State0, {internal_error, {Class, Exception},
  258. 'Unhandled exception in cowboy_stream:init/3.'}, Req),
  259. parse(Buffer, State0)
  260. end;
  261. %% Streams are sequential so the body is always about the last stream created
  262. %% unless that stream has terminated.
  263. after_parse({data, StreamID, IsFin, Data, State=#state{
  264. streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
  265. try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
  266. {Commands, StreamState} ->
  267. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  268. Stream#stream{state=StreamState}),
  269. parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
  270. catch Class:Exception ->
  271. cowboy_stream:report_error(data,
  272. [StreamID, IsFin, Data, StreamState0],
  273. Class, Exception, erlang:get_stacktrace()),
  274. stream_reset(State, StreamID, {internal_error, {Class, Exception},
  275. 'Unhandled exception in cowboy_stream:data/4.'})
  276. end;
  277. %% No corresponding stream. We must skip the body of the previous request
  278. %% in order to process the next one.
  279. after_parse({data, _, _, _, State, Buffer}) ->
  280. before_loop(State, Buffer);
  281. after_parse({more, State, Buffer}) ->
  282. before_loop(State, Buffer).
  283. %% Request-line.
  284. -spec parse_request(Buffer, State, non_neg_integer())
  285. -> {request, cowboy_req:req(), State, Buffer}
  286. | {data, cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State, Buffer}
  287. | {more, State, Buffer}
  288. when Buffer::binary(), State::#state{}.
  289. %% Empty lines must be using \r\n.
  290. parse_request(<< $\n, _/bits >>, State, _) ->
  291. error_terminate(400, State, {connection_error, protocol_error,
  292. 'Empty lines between requests must use the CRLF line terminator. (RFC7230 3.5)'});
  293. parse_request(<< $\s, _/bits >>, State, _) ->
  294. error_terminate(400, State, {connection_error, protocol_error,
  295. 'The request-line must not begin with a space. (RFC7230 3.1.1, RFC7230 3.5)'});
  296. %% We limit the length of the Request-line to MaxLength to avoid endlessly
  297. %% reading from the socket and eventually crashing.
  298. parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
  299. MaxLength = maps:get(max_request_line_length, Opts, 8000),
  300. MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
  301. case match_eol(Buffer, 0) of
  302. nomatch when byte_size(Buffer) > MaxLength ->
  303. error_terminate(414, State, {connection_error, limit_reached,
  304. 'The request-line length is larger than configuration allows. (RFC7230 3.1.1)'});
  305. nomatch ->
  306. {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
  307. 1 when EmptyLines =:= MaxEmptyLines ->
  308. error_terminate(400, State, {connection_error, limit_reached,
  309. 'More empty lines were received than configuration allows. (RFC7230 3.5)'});
  310. 1 ->
  311. << _:16, Rest/bits >> = Buffer,
  312. parse_request(Rest, State, EmptyLines + 1);
  313. _ ->
  314. case Buffer of
  315. %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
  316. << "OPTIONS * ", Rest/bits >> ->
  317. parse_version(Rest, State, <<"OPTIONS">>, <<"*">>, <<>>);
  318. <<"CONNECT ", _/bits>> ->
  319. error_terminate(501, State, {connection_error, no_error,
  320. 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'});
  321. <<"TRACE ", _/bits>> ->
  322. error_terminate(501, State, {connection_error, no_error,
  323. 'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
  324. %% Accept direct HTTP/2 only at the beginning of the connection.
  325. << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
  326. %% @todo Might be worth throwing to get a clean stacktrace.
  327. http2_upgrade(State, Buffer);
  328. _ ->
  329. parse_method(Buffer, State, <<>>,
  330. maps:get(max_method_length, Opts, 32))
  331. end
  332. end.
  333. match_eol(<< $\n, _/bits >>, N) ->
  334. N;
  335. match_eol(<< _, Rest/bits >>, N) ->
  336. match_eol(Rest, N + 1);
  337. match_eol(_, _) ->
  338. nomatch.
  339. parse_method(_, State, _, 0) ->
  340. error_terminate(501, State, {connection_error, limit_reached,
  341. 'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
  342. parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
  343. case C of
  344. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  345. 'The method name must not be followed with a line break. (RFC7230 3.1.1)'});
  346. $\s -> parse_uri(Rest, State, SoFar);
  347. _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
  348. _ -> error_terminate(400, State, {connection_error, protocol_error,
  349. 'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
  350. end.
  351. parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
  352. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  353. P =:= $p orelse P =:= $P ->
  354. parse_uri_skip_host(Rest, State, Method, <<>>);
  355. parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
  356. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  357. P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
  358. parse_uri_skip_host(Rest, State, Method, <<>>);
  359. parse_uri(<< $/, Rest/bits >>, State, Method) ->
  360. parse_uri_path(Rest, State, Method, << $/ >>);
  361. parse_uri(_, State, _) ->
  362. error_terminate(400, State, {connection_error, protocol_error,
  363. 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
  364. parse_uri_skip_host(<< C, Rest/bits >>, State, Method, SoFar) ->
  365. case C of
  366. $\r ->
  367. error_terminate(400, State, {connection_error, protocol_error,
  368. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  369. $@ ->
  370. error_terminate(400, State, {connection_error, protocol_error,
  371. 'Absolute URIs must not include a userinfo component. (RFC7230 2.7.1)'});
  372. C when SoFar =:= <<>> andalso
  373. ((C =:= $/) orelse (C =:= $\s) orelse (C =:= $?) orelse (C =:= $#)) ->
  374. error_terminate(400, State, {connection_error, protocol_error,
  375. 'Absolute URIs must include an authority component. (RFC7230 2.7.1)'});
  376. $/ -> parse_uri_path(Rest, State, Method, <<"/">>);
  377. $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>);
  378. $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>);
  379. $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>);
  380. C -> parse_uri_skip_host(Rest, State, Method, <<SoFar/binary, C>>)
  381. end.
  382. parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
  383. case C of
  384. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  385. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  386. $\s -> parse_version(Rest, State, Method, SoFar, <<>>);
  387. $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
  388. $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
  389. _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
  390. end.
  391. parse_uri_query(<< C, Rest/bits >>, State, M, P, SoFar) ->
  392. case C of
  393. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  394. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  395. $\s -> parse_version(Rest, State, M, P, SoFar);
  396. $# -> skip_uri_fragment(Rest, State, M, P, SoFar);
  397. _ -> parse_uri_query(Rest, State, M, P, << SoFar/binary, C >>)
  398. end.
  399. skip_uri_fragment(<< C, Rest/bits >>, State, M, P, Q) ->
  400. case C of
  401. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  402. 'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
  403. $\s -> parse_version(Rest, State, M, P, Q);
  404. _ -> skip_uri_fragment(Rest, State, M, P, Q)
  405. end.
  406. parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, P, Q) ->
  407. parse_headers(Rest, State, M, P, Q, 'HTTP/1.1');
  408. parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, P, Q) ->
  409. parse_headers(Rest, State, M, P, Q, 'HTTP/1.0');
  410. parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
  411. error_terminate(400, State, {connection_error, protocol_error,
  412. 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
  413. parse_version(<< C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
  414. error_terminate(400, State, {connection_error, protocol_error,
  415. 'The separator between request target and version must be a single SP. (RFC7230 3.1.1)'});
  416. parse_version(_, State, _, _, _) ->
  417. error_terminate(505, State, {connection_error, protocol_error,
  418. 'Unsupported HTTP version. (RFC7230 2.6)'}).
  419. parse_headers(Rest, State, M, P, Q, V) ->
  420. parse_header(Rest, State#state{in_state=#ps_header{
  421. method=M, path=P, qs=Q, version=V}}, #{}).
  422. %% Headers.
  423. %% We need two or more bytes in the buffer to continue.
  424. parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
  425. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
  426. parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
  427. request(Rest, S, Headers);
  428. parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  429. MaxHeaders = maps:get(max_headers, Opts, 100),
  430. NumHeaders = maps:size(Headers),
  431. if
  432. NumHeaders >= MaxHeaders ->
  433. error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
  434. {connection_error, limit_reached,
  435. 'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  436. true ->
  437. parse_header_colon(Buffer, State, Headers)
  438. end.
  439. parse_header_colon(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  440. MaxLength = maps:get(max_header_name_length, Opts, 64),
  441. case match_colon(Buffer, 0) of
  442. nomatch when byte_size(Buffer) > MaxLength ->
  443. error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
  444. {connection_error, limit_reached,
  445. 'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  446. nomatch ->
  447. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
  448. _ ->
  449. parse_hd_name(Buffer, State, Headers, <<>>)
  450. end.
  451. match_colon(<< $:, _/bits >>, N) ->
  452. N;
  453. match_colon(<< _, Rest/bits >>, N) ->
  454. match_colon(Rest, N + 1);
  455. match_colon(_, _) ->
  456. nomatch.
  457. parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
  458. parse_hd_before_value(Rest, State, H, SoFar);
  459. parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, <<>>) when ?IS_WS(C) ->
  460. error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
  461. {connection_error, protocol_error,
  462. 'Whitespace is not allowed before the header name. (RFC7230 3.2)'});
  463. parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, _) when ?IS_WS(C) ->
  464. error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
  465. {connection_error, protocol_error,
  466. 'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2.4)'});
  467. parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
  468. ?LOWER(parse_hd_name, Rest, State, H, SoFar).
  469. parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) ->
  470. parse_hd_before_value(Rest, S, H, N);
  471. parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) ->
  472. parse_hd_before_value(Rest, S, H, N);
  473. parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
  474. MaxLength = maps:get(max_header_value_length, Opts, 4096),
  475. case match_eol(Buffer, 0) of
  476. nomatch when byte_size(Buffer) > MaxLength ->
  477. error_terminate(431, State#state{in_state=PS#ps_header{headers=H}},
  478. {connection_error, limit_reached,
  479. 'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
  480. nomatch ->
  481. {more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer};
  482. _ ->
  483. parse_hd_value(Buffer, State, H, N, <<>>)
  484. end.
  485. parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers0, Name, SoFar) ->
  486. Value = clean_value_ws_end(SoFar, byte_size(SoFar) - 1),
  487. Headers = case maps:get(Name, Headers0, undefined) of
  488. undefined -> Headers0#{Name => Value};
  489. %% The cookie header does not use proper HTTP header lists.
  490. Value0 when Name =:= <<"cookie">> -> Headers0#{Name => << Value0/binary, "; ", Value/binary >>};
  491. Value0 -> Headers0#{Name => << Value0/binary, ", ", Value/binary >>}
  492. end,
  493. parse_header(Rest, S, Headers);
  494. parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
  495. parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).
  496. clean_value_ws_end(_, -1) ->
  497. <<>>;
  498. clean_value_ws_end(Value, N) ->
  499. case binary:at(Value, N) of
  500. $\s -> clean_value_ws_end(Value, N - 1);
  501. $\t -> clean_value_ws_end(Value, N - 1);
  502. _ ->
  503. S = N + 1,
  504. << Value2:S/binary, _/bits >> = Value,
  505. Value2
  506. end.
  507. -ifdef(TEST).
  508. clean_value_ws_end_test_() ->
  509. Tests = [
  510. {<<>>, <<>>},
  511. {<<" ">>, <<>>},
  512. {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  513. "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
  514. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  515. "text/html;level=2;q=0.4, */*;q=0.5">>}
  516. ],
  517. [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
  518. horse_clean_value_ws_end() ->
  519. horse:repeat(200000,
  520. clean_value_ws_end(
  521. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  522. "text/html;level=2;q=0.4, */*;q=0.5 ">>,
  523. byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  524. "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
  525. ).
  526. -endif.
  527. request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
  528. in_state=PS=#ps_header{version=Version}}, Headers) ->
  529. case maps:get(<<"host">>, Headers, undefined) of
  530. undefined when Version =:= 'HTTP/1.1' ->
  531. %% @todo Might want to not close the connection on this and next one.
  532. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  533. {stream_error, StreamID, protocol_error,
  534. 'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
  535. undefined ->
  536. request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
  537. RawHost ->
  538. try cow_http_hd:parse_host(RawHost) of
  539. {Host, undefined} ->
  540. request(Buffer, State, Headers, Host, default_port(Transport:secure()));
  541. {Host, Port} ->
  542. request(Buffer, State, Headers, Host, Port)
  543. catch _:_ ->
  544. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
  545. {stream_error, StreamID, protocol_error,
  546. 'The host header is invalid. (RFC7230 5.4)'})
  547. end
  548. end.
  549. -spec default_port(boolean()) -> 80 | 443.
  550. default_port(true) -> 443;
  551. default_port(_) -> 80.
  552. %% End of request parsing.
  553. request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
  554. in_streamid=StreamID, in_state=
  555. PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
  556. Headers0, Host, Port) ->
  557. Scheme = case Transport:secure() of
  558. true -> <<"https">>;
  559. false -> <<"http">>
  560. end,
  561. {Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of
  562. #{<<"transfer-encoding">> := TransferEncoding0} ->
  563. try cow_http_hd:parse_transfer_encoding(TransferEncoding0) of
  564. [<<"chunked">>] ->
  565. {maps:remove(<<"content-length">>, Headers0),
  566. true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
  567. _ ->
  568. error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
  569. {stream_error, StreamID, protocol_error,
  570. 'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'})
  571. catch _:_ ->
  572. error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
  573. {stream_error, StreamID, protocol_error,
  574. 'The transfer-encoding header is invalid. (RFC7230 3.3.1)'})
  575. end;
  576. #{<<"content-length">> := <<"0">>} ->
  577. {Headers0, false, 0, undefined, undefined};
  578. #{<<"content-length">> := BinLength} ->
  579. Length = try
  580. cow_http_hd:parse_content_length(BinLength)
  581. catch _:_ ->
  582. error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
  583. {stream_error, StreamID, protocol_error,
  584. 'The content-length header is invalid. (RFC7230 3.3.2)'})
  585. end,
  586. {Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
  587. _ ->
  588. {Headers0, false, 0, undefined, undefined}
  589. end,
  590. Req = #{
  591. ref => Ref,
  592. pid => self(),
  593. streamid => StreamID,
  594. peer => Peer,
  595. sock => Sock,
  596. cert => Cert,
  597. method => Method,
  598. scheme => Scheme,
  599. host => Host,
  600. port => Port,
  601. path => Path,
  602. qs => Qs,
  603. version => Version,
  604. %% We are transparently taking care of transfer-encodings so
  605. %% the user code has no need to know about it.
  606. headers => maps:remove(<<"transfer-encoding">>, Headers),
  607. has_body => HasBody,
  608. body_length => BodyLength
  609. },
  610. case is_http2_upgrade(Headers, Version) of
  611. false ->
  612. State = case HasBody of
  613. true ->
  614. State0#state{in_state=#ps_body{
  615. length = BodyLength,
  616. transfer_decode_fun = TDecodeFun,
  617. transfer_decode_state = TDecodeState
  618. }};
  619. false ->
  620. State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}
  621. end,
  622. {request, Req, State, Buffer};
  623. {true, HTTP2Settings} ->
  624. %% We save the headers in case the upgrade will fail
  625. %% and we need to pass them to cowboy_stream:early_error.
  626. http2_upgrade(State0#state{in_state=PS#ps_header{headers=Headers}},
  627. Buffer, HTTP2Settings, Req)
  628. end.
  629. %% HTTP/2 upgrade.
  630. %% @todo We must not upgrade to h2c over a TLS connection.
  631. is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
  632. <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
  633. Conns = cow_http_hd:parse_connection(Conn),
  634. case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
  635. {true, true} ->
  636. Protocols = cow_http_hd:parse_upgrade(Upgrade),
  637. case lists:member(<<"h2c">>, Protocols) of
  638. true ->
  639. {true, HTTP2Settings};
  640. false ->
  641. false
  642. end;
  643. _ ->
  644. false
  645. end;
  646. is_http2_upgrade(_, _) ->
  647. false.
  648. %% Prior knowledge upgrade, without an HTTP/1.1 request.
  649. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  650. opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
  651. case Transport:secure() of
  652. false ->
  653. _ = cancel_timeout(State),
  654. cowboy_http2:init(Parent, Ref, Socket, Transport, Opts,
  655. Peer, Sock, Cert, Buffer);
  656. true ->
  657. error_terminate(400, State, {connection_error, protocol_error,
  658. 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
  659. end.
  660. %% Upgrade via an HTTP/1.1 request.
  661. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  662. opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer, HTTP2Settings, Req) ->
  663. %% @todo
  664. %% However if the client sent a body, we need to read the body in full
  665. %% and if we can't do that, return a 413 response. Some options are in order.
  666. %% Always half-closed stream coming from this side.
  667. try cow_http_hd:parse_http2_settings(HTTP2Settings) of
  668. Settings ->
  669. _ = cancel_timeout(State),
  670. cowboy_http2:init(Parent, Ref, Socket, Transport, Opts,
  671. Peer, Sock, Cert, Buffer, Settings, Req)
  672. catch _:_ ->
  673. error_terminate(400, State, {connection_error, protocol_error,
  674. 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
  675. end.
  676. %% Request body parsing.
  677. parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
  678. PS=#ps_body{received=Received, transfer_decode_fun=TDecode,
  679. transfer_decode_state=TState0}}) ->
  680. %% @todo Proper trailers.
  681. try TDecode(Buffer, TState0) of
  682. more ->
  683. %% @todo Asks for 0 or more bytes.
  684. {more, State, Buffer};
  685. {more, Data, TState} ->
  686. %% @todo Asks for 0 or more bytes.
  687. {data, StreamID, nofin, Data, State#state{in_state=
  688. PS#ps_body{received=Received + byte_size(Data),
  689. transfer_decode_state=TState}}, <<>>};
  690. {more, Data, _Length, TState} when is_integer(_Length) ->
  691. %% @todo Asks for Length more bytes.
  692. {data, StreamID, nofin, Data, State#state{in_state=
  693. PS#ps_body{received=Received + byte_size(Data),
  694. transfer_decode_state=TState}}, <<>>};
  695. {more, Data, Rest, TState} ->
  696. %% @todo Asks for 0 or more bytes.
  697. {data, StreamID, nofin, Data, State#state{in_state=
  698. PS#ps_body{received=Received + byte_size(Data),
  699. transfer_decode_state=TState}}, Rest};
  700. {done, _HasTrailers, Rest} ->
  701. {data, StreamID, fin, <<>>, set_timeout(
  702. State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
  703. {done, Data, _HasTrailers, Rest} ->
  704. {data, StreamID, fin, Data, set_timeout(
  705. State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
  706. catch _:_ ->
  707. Reason = {connection_error, protocol_error,
  708. 'Failure to decode the content. (RFC7230 4)'},
  709. terminate(stream_terminate(State, StreamID, Reason), Reason)
  710. end.
  711. %% Message handling.
  712. down(State=#state{children=Children0}, Pid, Msg) ->
  713. case cowboy_children:down(Children0, Pid) of
  714. %% The stream was terminated already.
  715. {ok, undefined, Children} ->
  716. State#state{children=Children};
  717. %% The stream is still running.
  718. {ok, StreamID, Children} ->
  719. info(State#state{children=Children}, StreamID, Msg);
  720. %% The process was unknown.
  721. error ->
  722. error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]),
  723. State
  724. end.
  725. info(State=#state{streams=Streams0}, StreamID, Msg) ->
  726. case lists:keyfind(StreamID, #stream.id, Streams0) of
  727. Stream = #stream{state=StreamState0} ->
  728. try cowboy_stream:info(StreamID, Msg, StreamState0) of
  729. {Commands, StreamState} ->
  730. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  731. Stream#stream{state=StreamState}),
  732. commands(State#state{streams=Streams}, StreamID, Commands)
  733. catch Class:Exception ->
  734. cowboy_stream:report_error(info,
  735. [StreamID, Msg, StreamState0],
  736. Class, Exception, erlang:get_stacktrace()),
  737. stream_reset(State, StreamID, {internal_error, {Class, Exception},
  738. 'Unhandled exception in cowboy_stream:info/3.'})
  739. end;
  740. false ->
  741. error_logger:error_msg("Received message ~p for unknown stream ~p.~n", [Msg, StreamID]),
  742. State
  743. end.
  744. %% Commands.
  745. commands(State, _, []) ->
  746. State;
  747. %% Supervise a child process.
  748. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
  749. commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
  750. StreamID, Tail);
  751. %% Error handling.
  752. commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
  753. commands(stream_reset(State, StreamID, Error), StreamID, Tail);
  754. %% Commands for a stream currently inactive.
  755. commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
  756. when Current =/= StreamID ->
  757. %% @todo We still want to handle some commands...
  758. Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
  759. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  760. Stream#stream{queue=Queue ++ Commands}),
  761. State#state{streams=Streams};
  762. %% Read the request body.
  763. commands(State, StreamID, [{flow, _Length}|Tail]) ->
  764. %% @todo We only read from socket if buffer is empty, otherwise
  765. %% we decode the buffer.
  766. %% @todo Set the body reading length to min(Length, BodyLength)
  767. commands(State, StreamID, Tail);
  768. %% Error responses are sent only if a response wasn't sent already.
  769. commands(State=#state{out_state=wait}, StreamID, [{error_response, Status, Headers0, Body}|Tail]) ->
  770. %% We close the connection when the error response is 408, as it
  771. %% indicates a timeout and the RFC recommends that we stop here. (RFC7231 6.5.7)
  772. Headers = case Status of
  773. 408 -> Headers0#{<<"connection">> => <<"close">>};
  774. <<"408", _/bits>> -> Headers0#{<<"connection">> => <<"close">>};
  775. _ -> Headers0
  776. end,
  777. commands(State, StreamID, [{response, Status, Headers, Body}|Tail]);
  778. commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
  779. commands(State, StreamID, Tail);
  780. %% Send an informational response.
  781. commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
  782. StreamID, [{inform, StatusCode, Headers}|Tail]) ->
  783. %% @todo I'm pretty sure the last stream in the list is the one we want
  784. %% considering all others are queued.
  785. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  786. _ = case Version of
  787. 'HTTP/1.1' ->
  788. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
  789. headers_to_list(Headers)));
  790. %% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
  791. 'HTTP/1.0' ->
  792. ok
  793. end,
  794. commands(State, StreamID, Tail);
  795. %% Send a full response.
  796. %%
  797. %% @todo Kill the stream if it sent a response when one has already been sent.
  798. %% @todo Keep IsFin in the state.
  799. %% @todo Same two things above apply to DATA, possibly promise too.
  800. commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID,
  801. [{response, StatusCode, Headers0, Body}|Tail]) ->
  802. %% @todo I'm pretty sure the last stream in the list is the one we want
  803. %% considering all others are queued.
  804. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  805. {State, Headers} = connection(State0, Headers0, StreamID, Version),
  806. %% @todo Ensure content-length is set.
  807. Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
  808. case Body of
  809. {sendfile, O, B, P} ->
  810. Transport:send(Socket, Response),
  811. commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]);
  812. _ ->
  813. Transport:send(Socket, [Response, Body]),
  814. commands(State#state{out_state=done}, StreamID, Tail)
  815. end;
  816. %% Send response headers and initiate chunked encoding.
  817. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  818. [{headers, StatusCode, Headers0}|Tail]) ->
  819. %% @todo Same as above.
  820. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  821. {State1, Headers1} = case Version of
  822. 'HTTP/1.1' ->
  823. {State0, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
  824. %% Close the connection after streaming the data to HTTP/1.0 client.
  825. %% @todo I'm guessing we need to differentiate responses with a content-length and others.
  826. 'HTTP/1.0' ->
  827. {State0#state{last_streamid=StreamID}, Headers0}
  828. end,
  829. {State, Headers} = connection(State1, Headers1, StreamID, Version),
  830. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
  831. commands(State#state{out_state=chunked}, StreamID, Tail);
  832. %% Send a response body chunk.
  833. %%
  834. %% @todo WINDOW_UPDATE stuff require us to buffer some data.
  835. %% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
  836. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  837. [{data, IsFin, Data}|Tail]) ->
  838. %% Do not send anything when the user asks to send an empty
  839. %% data frame, as that would break the protocol.
  840. Size = iolist_size(Data),
  841. case Size of
  842. 0 ->
  843. %% We send the last chunk only if version is HTTP/1.1 and IsFin=fin.
  844. case lists:keyfind(StreamID, #stream.id, Streams) of
  845. #stream{method= <<"HEAD">>} ->
  846. ok;
  847. #stream{version='HTTP/1.1'} when IsFin =:= fin ->
  848. Transport:send(Socket, <<"0\r\n\r\n">>);
  849. _ ->
  850. ok
  851. end;
  852. _ ->
  853. %% @todo We need to kill the stream if it tries to send data before headers.
  854. %% @todo Same as above.
  855. case lists:keyfind(StreamID, #stream.id, Streams) of
  856. #stream{method= <<"HEAD">>} ->
  857. ok;
  858. #stream{version='HTTP/1.1'} ->
  859. Transport:send(Socket, [
  860. integer_to_binary(Size, 16), <<"\r\n">>, Data,
  861. case IsFin of
  862. fin -> <<"\r\n0\r\n\r\n">>;
  863. nofin -> <<"\r\n">>
  864. end
  865. ]);
  866. #stream{version='HTTP/1.0'} ->
  867. Transport:send(Socket, Data)
  868. end
  869. end,
  870. State = case IsFin of
  871. fin -> State0#state{out_state=done};
  872. nofin -> State0
  873. end,
  874. commands(State, StreamID, Tail);
  875. %% Send trailers.
  876. commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  877. [{trailers, Trailers}|Tail]) ->
  878. TE = case lists:keyfind(StreamID, #stream.id, Streams) of
  879. %% HTTP/1.0 doesn't support chunked transfer-encoding.
  880. #stream{version='HTTP/1.0'} ->
  881. not_chunked;
  882. %% No TE header was sent.
  883. #stream{te=undefined} ->
  884. no_trailers;
  885. #stream{te=TE0} ->
  886. try cow_http_hd:parse_te(TE0) of
  887. {TE1, _} -> TE1
  888. catch _:_ ->
  889. %% If we can't parse the TE header, assume we can't send trailers.
  890. no_trailers
  891. end
  892. end,
  893. case TE of
  894. trailers ->
  895. Transport:send(Socket, [
  896. <<"0\r\n">>,
  897. cow_http:headers(maps:to_list(Trailers)),
  898. <<"\r\n">>
  899. ]);
  900. no_trailers ->
  901. Transport:send(Socket, <<"0\r\n\r\n">>);
  902. not_chunked ->
  903. ok
  904. end,
  905. commands(State#state{out_state=done}, StreamID, Tail);
  906. %% Send a file.
  907. commands(State0=#state{socket=Socket, transport=Transport}, StreamID,
  908. [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
  909. %% We wrap the sendfile call into a try/catch because on OTP-20
  910. %% and earlier a few different crashes could occur for sockets
  911. %% that were closing or closed. For example a badarg in
  912. %% erlang:port_get_data(#Port<...>) or a badmatch like
  913. %% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
  914. %%
  915. %% OTP-21 uses a NIF instead of a port so the implementation
  916. %% and behavior has dramatically changed and it is unclear
  917. %% whether it will be necessary in the future.
  918. %%
  919. %% This try/catch prevents some noisy logs to be written
  920. %% when these errors occur.
  921. try
  922. Transport:sendfile(Socket, Path, Offset, Bytes),
  923. State = case IsFin of
  924. fin -> State0#state{out_state=done}
  925. %% @todo Add the sendfile command.
  926. % nofin -> State0
  927. end,
  928. commands(State, StreamID, Tail)
  929. catch _:_ ->
  930. terminate(State0, {socket_error, sendfile_crash,
  931. 'An error occurred when using the sendfile function.'})
  932. end;
  933. %% Protocol takeover.
  934. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
  935. opts=Opts, children=Children}, StreamID,
  936. [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
  937. %% @todo This should be the last stream running otherwise we need to wait before switching.
  938. %% @todo If there's streams opened after this one, fail instead of 101.
  939. State = cancel_timeout(State0),
  940. %% Before we send the 101 response we need to stop receiving data
  941. %% from the socket, otherwise the data might be receive before the
  942. %% call to flush/0 and we end up inadvertently dropping a packet.
  943. %%
  944. %% @todo Handle cases where the request came with a body. We need
  945. %% to process or skip the body before the upgrade can be completed.
  946. Transport:setopts(Socket, [{active, false}]),
  947. %% Send a 101 response, then terminate the stream.
  948. #state{streams=Streams} = info(State, StreamID, {inform, 101, Headers}),
  949. #stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
  950. %% @todo We need to shutdown processes here first.
  951. stream_call_terminate(StreamID, switch_protocol, StreamState),
  952. %% Terminate children processes and flush any remaining messages from the mailbox.
  953. cowboy_children:terminate(Children),
  954. flush(Parent),
  955. %% @todo This is no good because commands return a state normally and here it doesn't
  956. %% we need to let this module go entirely. Perhaps it should be handled directly in
  957. %% cowboy_clear/cowboy_tls?
  958. Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
  959. %% Stream shutdown.
  960. commands(State, StreamID, [stop|Tail]) ->
  961. %% @todo Do we want to run the commands after a stop?
  962. %% @todo We currently wait for the stop command before we
  963. %% continue with the next request/response. In theory, if
  964. %% the request body was read fully and the response body
  965. %% was sent fully we should be able to start working on
  966. %% the next request concurrently. This can be done as a
  967. %% future optimization.
  968. maybe_terminate(State, StreamID, Tail);
  969. %% HTTP/1.1 does not support push; ignore.
  970. commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
  971. commands(State, StreamID, Tail).
  972. %% The set-cookie header is special; we can only send one cookie per header.
  973. headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
  974. Headers1 = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
  975. Headers1 ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
  976. headers_to_list(Headers) ->
  977. maps:to_list(Headers).
  978. %% Flush messages specific to cowboy_http before handing over the
  979. %% connection to another protocol.
  980. flush(Parent) ->
  981. receive
  982. {timeout, _, _} ->
  983. flush(Parent);
  984. {{Pid, _}, _} when Pid =:= self() ->
  985. flush(Parent);
  986. {'EXIT', Pid, _} when Pid =/= Parent ->
  987. flush(Parent)
  988. after 0 ->
  989. ok
  990. end.
  991. %% @todo In these cases I'm not sure if we should continue processing commands.
  992. maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
  993. terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
  994. maybe_terminate(State, StreamID, _Tail) ->
  995. stream_terminate(State, StreamID, normal).
  996. stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
  997. %% @todo headers
  998. %% @todo Don't send this if there are no streams left.
  999. % Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
  1000. % {<<"content-length">>, <<"0">>}
  1001. % ])),
  1002. %% @todo update IsFin local
  1003. % stream_terminate(State#state{out_state=done}, StreamID, StreamError).
  1004. stream_terminate(State, StreamID, StreamError).
  1005. stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
  1006. out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
  1007. children=Children0}, StreamID, Reason) ->
  1008. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0),
  1009. State1 = #state{streams=Streams1} = case OutState of
  1010. wait when element(1, Reason) =:= internal_error ->
  1011. info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>});
  1012. wait when element(1, Reason) =:= connection_error ->
  1013. info(State0, StreamID, {response, 400, #{<<"content-length">> => <<"0">>}, <<>>});
  1014. wait ->
  1015. info(State0, StreamID, {response, 204, #{}, <<>>});
  1016. chunked when Version =:= 'HTTP/1.1' ->
  1017. info(State0, StreamID, {data, fin, <<>>});
  1018. _ -> %% done or Version =:= 'HTTP/1.0'
  1019. State0
  1020. end,
  1021. %% Remove the stream from the state.
  1022. {value, #stream{state=StreamState}, Streams}
  1023. = lists:keytake(StreamID, #stream.id, Streams1),
  1024. State2 = State1#state{streams=Streams},
  1025. %% Stop the stream.
  1026. stream_call_terminate(StreamID, Reason, StreamState),
  1027. Children = cowboy_children:shutdown(Children0, StreamID),
  1028. %% We reset the timeout if there are no active streams anymore.
  1029. State = case Streams of
  1030. [] -> set_timeout(State2);
  1031. _ -> State2
  1032. end,
  1033. %% We want to drop the connection if the body was not read fully
  1034. %% and we don't know its length or more remains to be read than
  1035. %% configuration allows.
  1036. %% @todo Only do this if Current =:= StreamID.
  1037. MaxSkipBodyLength = maps:get(max_skip_body_length, Opts, 1000000),
  1038. case InState of
  1039. #ps_body{length=undefined}
  1040. when InStreamID =:= OutStreamID ->
  1041. terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length);
  1042. #ps_body{length=Len, received=Received}
  1043. when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
  1044. terminate(State#state{streams=Streams, children=Children}, skip_body_too_large);
  1045. _ ->
  1046. %% Move on to the next stream.
  1047. NextOutStreamID = OutStreamID + 1,
  1048. case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
  1049. false ->
  1050. %% @todo This is clearly wrong, if the stream is gone we need to check if
  1051. %% there used to be such a stream, and if there was to send an error.
  1052. State#state{out_streamid=NextOutStreamID, out_state=wait,
  1053. streams=Streams, children=Children};
  1054. #stream{queue=Commands} ->
  1055. %% @todo Remove queue from the stream.
  1056. commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
  1057. streams=Streams, children=Children}, NextOutStreamID, Commands)
  1058. end
  1059. end.
  1060. stream_call_terminate(StreamID, Reason, StreamState) ->
  1061. try
  1062. cowboy_stream:terminate(StreamID, Reason, StreamState)
  1063. catch Class:Exception ->
  1064. cowboy_stream:report_error(terminate,
  1065. [StreamID, Reason, StreamState],
  1066. Class, Exception, erlang:get_stacktrace())
  1067. end.
  1068. %% @todo max_reqs also
  1069. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
  1070. Conns = cow_http_hd:parse_connection(Conn),
  1071. case lists:member(<<"keep-alive">>, Conns) of
  1072. true -> keepalive;
  1073. false -> close
  1074. end;
  1075. maybe_req_close(_, _, 'HTTP/1.0') ->
  1076. close;
  1077. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
  1078. case connection_hd_is_close(Conn) of
  1079. true -> close;
  1080. false -> keepalive
  1081. end;
  1082. maybe_req_close(_State, _, _) ->
  1083. keepalive.
  1084. connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  1085. case connection_hd_is_close(Conn) of
  1086. true -> {State, Headers};
  1087. %% @todo Here we need to remove keep-alive and add close, not just add close.
  1088. false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
  1089. end;
  1090. connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
  1091. {State, Headers#{<<"connection">> => <<"close">>}};
  1092. connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  1093. case connection_hd_is_close(Conn) of
  1094. true -> {State#state{last_streamid=StreamID}, Headers};
  1095. %% @todo Here we need to set keep-alive only if it wasn't set before.
  1096. false -> {State, Headers}
  1097. end;
  1098. connection(State, Headers, _, 'HTTP/1.0') ->
  1099. {State, Headers#{<<"connection">> => <<"keep-alive">>}};
  1100. connection(State, Headers, _, _) ->
  1101. {State, Headers}.
  1102. connection_hd_is_close(Conn) ->
  1103. Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
  1104. lists:member(<<"close">>, Conns).
  1105. %% This function is only called when an error occurs on a new stream.
  1106. -spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
  1107. error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamState}, Reason) ->
  1108. PartialReq = case StreamState of
  1109. #ps_request_line{} -> #{
  1110. ref => Ref,
  1111. peer => Peer
  1112. };
  1113. #ps_header{method=Method, path=Path, qs=Qs,
  1114. version=Version, headers=ReqHeaders} -> #{
  1115. ref => Ref,
  1116. peer => Peer,
  1117. method => Method,
  1118. path => Path,
  1119. qs => Qs,
  1120. version => Version,
  1121. headers => case ReqHeaders of
  1122. undefined -> #{};
  1123. _ -> ReqHeaders
  1124. end
  1125. }
  1126. end,
  1127. early_error(StatusCode, State, Reason, PartialReq, #{<<"connection">> => <<"close">>}),
  1128. terminate(State, Reason).
  1129. early_error(StatusCode, State, Reason, PartialReq) ->
  1130. early_error(StatusCode, State, Reason, PartialReq, #{}).
  1131. early_error(StatusCode0, #state{socket=Socket, transport=Transport,
  1132. opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) ->
  1133. RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>},
  1134. Resp = {response, StatusCode0, RespHeaders1, <<>>},
  1135. try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
  1136. {response, StatusCode, RespHeaders, RespBody} ->
  1137. Transport:send(Socket, [
  1138. cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
  1139. %% @todo We shouldn't send the body when the method is HEAD.
  1140. %% @todo Technically we allow the sendfile tuple.
  1141. RespBody
  1142. ])
  1143. catch Class:Exception ->
  1144. cowboy_stream:report_error(early_error,
  1145. [StreamID, Reason, PartialReq, Resp, Opts],
  1146. Class, Exception, erlang:get_stacktrace()),
  1147. %% We still need to send an error response, so send what we initially
  1148. %% wanted to send. It's better than nothing.
  1149. Transport:send(Socket, cow_http:response(StatusCode0,
  1150. 'HTTP/1.1', maps:to_list(RespHeaders1)))
  1151. end,
  1152. ok.
  1153. -spec terminate(_, _) -> no_return().
  1154. terminate(undefined, Reason) ->
  1155. exit({shutdown, Reason});
  1156. terminate(State=#state{streams=Streams, children=Children}, Reason) ->
  1157. terminate_all_streams(Streams, Reason),
  1158. cowboy_children:terminate(Children),
  1159. terminate_linger(State),
  1160. exit({shutdown, Reason}).
  1161. terminate_all_streams([], _) ->
  1162. ok;
  1163. terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason) ->
  1164. stream_call_terminate(StreamID, Reason, StreamState),
  1165. terminate_all_streams(Tail, Reason).
  1166. terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
  1167. case Transport:shutdown(Socket, write) of
  1168. ok ->
  1169. case maps:get(linger_timeout, Opts, 1000) of
  1170. 0 ->
  1171. ok;
  1172. infinity ->
  1173. terminate_linger_loop(State, undefined);
  1174. Timeout ->
  1175. TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
  1176. terminate_linger_loop(State, TimerRef)
  1177. end;
  1178. {error, _} ->
  1179. ok
  1180. end.
  1181. terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
  1182. {OK, Closed, Error} = Transport:messages(),
  1183. %% We may already have a message in the mailbox when we do this
  1184. %% but it's OK because we are shutting down anyway.
  1185. case Transport:setopts(Socket, [{active, once}]) of
  1186. ok ->
  1187. receive
  1188. {OK, Socket, _} ->
  1189. terminate_linger_loop(State, TimerRef);
  1190. {Closed, Socket} ->
  1191. ok;
  1192. {Error, Socket, _} ->
  1193. ok;
  1194. {timeout, TimerRef, linger_timeout} ->
  1195. ok;
  1196. _ ->
  1197. terminate_linger_loop(State, TimerRef)
  1198. end;
  1199. {error, _} ->
  1200. ok
  1201. end.
  1202. %% System callbacks.
  1203. -spec system_continue(_, _, {#state{}, binary()}) -> ok.
  1204. system_continue(_, _, {State, Buffer}) ->
  1205. loop(State, Buffer).
  1206. -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
  1207. system_terminate(Reason, _, _, {State, _}) ->
  1208. %% @todo We should exit gracefully, if possible.
  1209. terminate(State, Reason).
  1210. -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
  1211. system_code_change(Misc, _, _, _) ->
  1212. {ok, Misc}.