cowboy_http.erl 41 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054
  1. %% Copyright (c) 2016, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_http).
  15. -export([init/6]).
  16. -export([system_continue/3]).
  17. -export([system_terminate/4]).
  18. -export([system_code_change/4]).
  19. %% @todo map
  20. -type opts() :: [{compress, boolean()}
  21. | {env, cowboy_middleware:env()}
  22. | {max_empty_lines, non_neg_integer()}
  23. | {max_header_name_length, non_neg_integer()}
  24. | {max_header_value_length, non_neg_integer()}
  25. | {max_headers, non_neg_integer()}
  26. | {max_keepalive, non_neg_integer()}
  27. | {max_request_line_length, non_neg_integer()}
  28. | {middlewares, [module()]}
  29. | {onresponse, cowboy:onresponse_fun()}
  30. | {timeout, timeout()}].
  31. -export_type([opts/0]).
  32. -record(ps_request_line, {
  33. empty_lines = 0 :: non_neg_integer()
  34. }).
  35. -record(ps_header, {
  36. method = undefined :: binary(),
  37. path = undefined :: binary(),
  38. qs = undefined :: binary(),
  39. version = undefined :: cowboy:http_version(),
  40. headers = undefined :: map() | undefined, %% @todo better type than map()
  41. name = undefined :: binary()
  42. }).
  43. %% @todo We need a state where we wait for the stream process to ask for the body.
  44. %% OR DO WE
  45. %% In HTTP/2 we start receiving data before the body asks for it, even if optionally
  46. %% (and by default), so we need to be able to do the same for HTTP/1.1 too. This means
  47. %% that when we receive data (up to a certain limit, we read from the socket and decode.
  48. %% When we reach a limit, we stop reading from the socket momentarily until the stream
  49. %% process asks for more or the stream ends.
  50. %% This means that we need to keep a buffer in the stream handler (until the stream
  51. %% process asks for it). And that we need the body state to indicate how much we have
  52. %% left to read (and stop/start reading from the socket depending on value).
  53. -record(ps_body, {
  54. %% @todo flow
  55. transfer_decode_fun :: fun(), %% @todo better type
  56. transfer_decode_state :: any() %% @todo better type
  57. }).
  58. -record(stream, {
  59. %% Stream identifier.
  60. id = undefined :: cowboy_stream:streamid(),
  61. %% Stream handler state.
  62. state = undefined :: any(),
  63. %% Client HTTP version for this stream.
  64. version = undefined :: cowboy:http_version(),
  65. %% Commands queued.
  66. queue = [] :: [] %% @todo better type
  67. }).
  68. -type stream() :: #stream{}.
  69. -record(state, {
  70. parent :: pid(),
  71. ref :: ranch:ref(),
  72. socket :: inet:socket(),
  73. transport :: module(),
  74. opts = #{} :: map(),
  75. handler :: module(),
  76. %% Remote address and port for the connection.
  77. peer = undefined :: {inet:ip_address(), inet:port_number()},
  78. timer = undefined :: undefined | reference(),
  79. %% Identifier for the stream currently being read (or waiting to be received).
  80. in_streamid = 1 :: pos_integer(),
  81. %% Parsing state for the current stream or stream-to-be.
  82. in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
  83. %% Identifier for the stream currently being written.
  84. %% Note that out_streamid =< in_streamid.
  85. out_streamid = 1 :: pos_integer(),
  86. %% Whether we finished writing data for the current stream.
  87. out_state = wait :: wait | headers | chunked,
  88. %% The connection will be closed after this stream.
  89. last_streamid = undefined :: pos_integer(),
  90. %% Currently active HTTP/1.1 streams.
  91. streams = [] :: [stream()],
  92. %% Children which are in the process of shutting down.
  93. children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}]
  94. %% @todo Automatic compression. (compress option?)
  95. %% @todo onresponse? Equivalent using streams.
  96. }).
  97. -include_lib("cowlib/include/cow_inline.hrl").
  98. -include_lib("cowlib/include/cow_parse.hrl").
  99. -spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok.
  100. init(Parent, Ref, Socket, Transport, Opts, Handler) ->
  101. case Transport:peername(Socket) of
  102. {ok, Peer} ->
  103. LastStreamID = maps:get(max_keepalive, Opts, 100),
  104. before_loop(set_request_timeout(#state{
  105. parent=Parent, ref=Ref, socket=Socket,
  106. transport=Transport, opts=Opts, handler=Handler,
  107. peer=Peer, last_streamid=LastStreamID}), <<>>);
  108. {error, Reason} ->
  109. %% Couldn't read the peer address; connection is gone.
  110. terminate(undefined, {socket_error, Reason, 'An error has occurred on the socket.'})
  111. end.
  112. %% @todo Send a response depending on in_state and whether one was already sent.
  113. %% @todo
  114. %% Timeouts:
  115. %% - waiting for new request (if no stream is currently running)
  116. %% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state
  117. %% - waiting for body (if a stream requested the body to be read)
  118. %% -> read_body_timeout: amount of time we wait without receiving any data when reading the body
  119. %% - if we skip the body, skip only for a specific duration
  120. %% -> skip_body_timeout: also have a skip_body_length
  121. %% - none if we have a stream running and it didn't request the body to be read
  122. %% - global
  123. %% -> inactivity_timeout: max time to wait without anything happening before giving up
  124. before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
  125. %% @todo disable this when we get to the body, until the stream asks for it?
  126. %% Perhaps have a threshold for how much we're willing to read before waiting.
  127. Transport:setopts(Socket, [{active, once}]),
  128. loop(State, Buffer).
  129. loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
  130. handler=_Handler, timer=TimerRef, children=Children}, Buffer) ->
  131. {OK, Closed, Error} = Transport:messages(),
  132. receive
  133. %% Socket messages.
  134. {OK, Socket, Data} ->
  135. parse(<< Buffer/binary, Data/binary >>, State);
  136. {Closed, Socket} ->
  137. terminate(State, {socket_error, closed, 'The socket has been closed.'});
  138. {Error, Socket, Reason} ->
  139. terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
  140. %% Timeouts.
  141. {timeout, TimerRef, Reason} ->
  142. timeout(State, Reason);
  143. {timeout, _, _} ->
  144. loop(State, Buffer);
  145. %% System messages.
  146. {'EXIT', Parent, Reason} ->
  147. exit(Reason);
  148. {system, From, Request} ->
  149. sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
  150. %% Messages pertaining to a stream.
  151. {{Pid, StreamID}, Msg} when Pid =:= self() ->
  152. loop(info(State, StreamID, Msg), Buffer);
  153. %% Exit signal from children.
  154. Msg = {'EXIT', Pid, _} ->
  155. loop(down(State, Pid, Msg), Buffer);
  156. %% Calls from supervisor module.
  157. {'$gen_call', {From, Tag}, which_children} ->
  158. Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children],
  159. From ! {Tag, Workers},
  160. loop(State, Buffer);
  161. {'$gen_call', {From, Tag}, count_children} ->
  162. NbChildren = length(Children),
  163. Counts = [{specs, 1}, {active, NbChildren},
  164. {supervisors, 0}, {workers, NbChildren}],
  165. From ! {Tag, Counts},
  166. loop(State, Buffer);
  167. {'$gen_call', {From, Tag}, _} ->
  168. From ! {Tag, {error, ?MODULE}},
  169. loop(State, Buffer);
  170. %% Unknown messages.
  171. Msg ->
  172. error_logger:error_msg("Received stray message ~p.~n", [Msg]),
  173. loop(State, Buffer)
  174. %% @todo Configurable timeout. This should be a global inactivity timeout
  175. %% that triggers when really nothing happens (ie something went really wrong).
  176. after 300000 ->
  177. terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
  178. end.
  179. set_request_timeout(State0=#state{opts=Opts}) ->
  180. State = cancel_request_timeout(State0),
  181. Timeout = maps:get(request_timeout, Opts, 5000),
  182. TimerRef = erlang:start_timer(Timeout, self(), request_timeout),
  183. State#state{timer=TimerRef}.
  184. cancel_request_timeout(State=#state{timer=TimerRef}) ->
  185. ok = case TimerRef of
  186. undefined -> ok;
  187. _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
  188. end,
  189. State#state{timer=undefined}.
  190. %% @todo Honestly it would be much better if we didn't enable pipelining yet.
  191. timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
  192. %% @todo If other streams are running, just set the connection to be closed
  193. %% and stop trying to read from the socket?
  194. terminate(State, {connection_error, timeout, 'No request-line received before timeout.'});
  195. timeout(State=#state{socket=Socket, transport=Transport, in_state=#ps_header{}}, request_timeout) ->
  196. %% @todo If other streams are running, maybe wait for their reply before sending 408?
  197. %% -> Definitely. Either way, stop reading from the socket and make that stream the last.
  198. Transport:send(Socket, cow_http:response(408, 'HTTP/1.1', [])),
  199. terminate(State, {connection_error, timeout, 'Request headers not received before timeout.'}).
  200. %% Request-line.
  201. parse(<<>>, State) ->
  202. before_loop(State, <<>>);
  203. parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
  204. after_parse(parse_request(Buffer, State, EmptyLines));
  205. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
  206. after_parse(parse_header(Buffer,
  207. State#state{in_state=PS#ps_header{headers=undefined}},
  208. Headers));
  209. parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
  210. after_parse(parse_hd_before_value(Buffer,
  211. State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
  212. Headers, Name));
  213. parse(Buffer, State=#state{in_state=#ps_body{}}) ->
  214. %% @todo We do not want to get the body automatically if the request doesn't ask for it.
  215. %% We may want to get bodies that are below a threshold without waiting, and buffer them
  216. %% until the request asks, though.
  217. %% @todo Transfer-decoding must be done here.
  218. after_parse(parse_body(Buffer, State)).
  219. %% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
  220. after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version},
  221. State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) ->
  222. %% @todo Opts at the end. Maybe pass the same Opts we got?
  223. try Handler:init(StreamID, Req, Opts) of
  224. {Commands, StreamState} ->
  225. Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0],
  226. State = case maybe_req_close(State0, Headers, Version) of
  227. close -> State0#state{streams=Streams, last_streamid=StreamID};
  228. keepalive -> State0#state{streams=Streams}
  229. end,
  230. parse(Buffer, commands(State, StreamID, Commands))
  231. catch Class:Reason ->
  232. error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) "
  233. "with reason ~p:~p.",
  234. [Handler, StreamID, Req, Opts, Class, Reason]),
  235. %% @todo Bad value returned here. Crashes.
  236. ok
  237. %% @todo Status code.
  238. % stream_reset(State, StreamID, {internal_error, {Class, Reason},
  239. % 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity.
  240. end;
  241. %% Streams are sequential so the body is always about the last stream created
  242. %% unless that stream has terminated.
  243. after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler,
  244. streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
  245. try Handler:data(StreamID, IsFin, Data, StreamState0) of
  246. {Commands, StreamState} ->
  247. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  248. Stream#stream{state=StreamState}),
  249. parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
  250. catch Class:Reason ->
  251. error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
  252. [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]),
  253. %% @todo Bad value returned here. Crashes.
  254. ok
  255. %% @todo
  256. % stream_reset(State, StreamID, {internal_error, {Class, Reason},
  257. % 'Exception occurred in StreamHandler:data/4 call.'})
  258. end;
  259. %% No corresponding stream, skip.
  260. after_parse({data, _, _, _, State, Buffer}) ->
  261. before_loop(State, Buffer);
  262. after_parse({more, State, Buffer}) ->
  263. before_loop(State, Buffer).
  264. %% Request-line.
  265. -spec parse_request(binary(), #state{}, non_neg_integer()) -> ok.
  266. %% Empty lines must be using \r\n.
  267. parse_request(<< $\n, _/bits >>, State, _) ->
  268. error_terminate(400, State, {connection_error, protocol_error,
  269. ''}); %% @todo
  270. parse_request(<< $\s, _/bits >>, State, _) ->
  271. error_terminate(400, State, {connection_error, protocol_error,
  272. ''}); %% @todo
  273. %% We limit the length of the Request-line to MaxLength to avoid endlessly
  274. %% reading from the socket and eventually crashing.
  275. parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
  276. MaxLength = maps:get(max_request_line_length, Opts, 8000),
  277. MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
  278. case match_eol(Buffer, 0) of
  279. nomatch when byte_size(Buffer) > MaxLength ->
  280. error_terminate(414, State, {connection_error, limit_reached,
  281. ''}); %% @todo
  282. nomatch ->
  283. {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
  284. 1 when EmptyLines =:= MaxEmptyLines ->
  285. error_terminate(400, State, {connection_error, limit_reached,
  286. ''}); %% @todo
  287. 1 ->
  288. << _:16, Rest/bits >> = Buffer,
  289. parse_request(Rest, State, EmptyLines + 1);
  290. _ ->
  291. case Buffer of
  292. %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
  293. << "OPTIONS * ", Rest/bits >> ->
  294. parse_version(Rest, State, <<"OPTIONS">>, <<"*">>, <<>>);
  295. % << "CONNECT ", Rest/bits >> ->
  296. % parse_authority( %% @todo
  297. %% Accept direct HTTP/2 only at the beginning of the connection.
  298. << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
  299. %% @todo Might be worth throwing to get a clean stacktrace.
  300. http2_upgrade(State, Buffer);
  301. _ ->
  302. parse_method(Buffer, State, <<>>,
  303. maps:get(max_method_length, Opts, 32))
  304. end
  305. end.
  306. match_eol(<< $\n, _/bits >>, N) ->
  307. N;
  308. match_eol(<< _, Rest/bits >>, N) ->
  309. match_eol(Rest, N + 1);
  310. match_eol(_, _) ->
  311. nomatch.
  312. parse_method(_, State, _, 0) ->
  313. error_terminate(501, State, {connection_error, limit_reached,
  314. 'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
  315. parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
  316. case C of
  317. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  318. ''}); %% @todo
  319. $\s -> parse_uri(Rest, State, SoFar);
  320. _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
  321. _ -> error_terminate(400, State, {connection_error, protocol_error,
  322. 'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
  323. end.
  324. parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
  325. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  326. P =:= $p orelse P =:= $P ->
  327. parse_uri_skip_host(Rest, State, Method);
  328. parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
  329. when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
  330. P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
  331. parse_uri_skip_host(Rest, State, Method);
  332. parse_uri(<< $/, Rest/bits >>, State, Method) ->
  333. parse_uri_path(Rest, State, Method, << $/ >>);
  334. parse_uri(_, State, _) ->
  335. error_terminate(400, State, {connection_error, protocol_error,
  336. 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
  337. parse_uri_skip_host(<< C, Rest/bits >>, State, Method) ->
  338. case C of
  339. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  340. ''}); %% @todo
  341. $/ -> parse_uri_path(Rest, State, Method, <<"/">>);
  342. $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>);
  343. $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>);
  344. $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>);
  345. _ -> parse_uri_skip_host(Rest, State, Method)
  346. end.
  347. parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
  348. case C of
  349. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  350. ''}); %% @todo
  351. $\s -> parse_version(Rest, State, Method, SoFar, <<>>);
  352. $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
  353. $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
  354. _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
  355. end.
  356. parse_uri_query(<< C, Rest/bits >>, State, M, P, SoFar) ->
  357. case C of
  358. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  359. ''}); %% @todo
  360. $\s -> parse_version(Rest, State, M, P, SoFar);
  361. $# -> skip_uri_fragment(Rest, State, M, P, SoFar);
  362. _ -> parse_uri_query(Rest, State, M, P, << SoFar/binary, C >>)
  363. end.
  364. skip_uri_fragment(<< C, Rest/bits >>, State, M, P, Q) ->
  365. case C of
  366. $\r -> error_terminate(400, State, {connection_error, protocol_error,
  367. ''}); %% @todo
  368. $\s -> parse_version(Rest, State, M, P, Q);
  369. _ -> skip_uri_fragment(Rest, State, M, P, Q)
  370. end.
  371. %% @todo Calls to parse_header should update the state.
  372. parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, P, Q) ->
  373. parse_headers(Rest, State, M, P, Q, 'HTTP/1.1');
  374. parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, P, Q) ->
  375. parse_headers(Rest, State, M, P, Q, 'HTTP/1.0');
  376. parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
  377. error_terminate(400, State, {connection_error, protocol_error,
  378. 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
  379. parse_version(<< C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
  380. error_terminate(400, State, {connection_error, protocol_error,
  381. 'The separator between request target and version must be a single SP.'});
  382. parse_version(_, State, _, _, _) ->
  383. error_terminate(505, State, {connection_error, protocol_error,
  384. ''}). %% @todo
  385. parse_headers(Rest, State, M, P, Q, V) ->
  386. %% @todo Figure out the parse states.
  387. parse_header(Rest, State#state{in_state=#ps_header{
  388. method=M, path=P, qs=Q, version=V}}, #{}).
  389. %% Headers.
  390. %% We need two or more bytes in the buffer to continue.
  391. parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
  392. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
  393. parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
  394. request(Rest, S, Headers);
  395. parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
  396. MaxLength = maps:get(max_header_name_length, Opts, 64),
  397. MaxHeaders = maps:get(max_headers, Opts, 100),
  398. case match_colon(Buffer, 0) of
  399. nomatch when byte_size(Buffer) > MaxLength ->
  400. error_terminate(400, State, {connection_error, limit_reached,
  401. ''}); %% @todo
  402. nomatch when length(Headers) >= MaxHeaders ->
  403. error_terminate(400, State, {connection_error, limit_reached,
  404. ''}); %% @todo
  405. nomatch ->
  406. {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
  407. _ ->
  408. parse_hd_name(Buffer, State, Headers, <<>>)
  409. end.
  410. match_colon(<< $:, _/bits >>, N) ->
  411. N;
  412. match_colon(<< _, Rest/bits >>, N) ->
  413. match_colon(Rest, N + 1);
  414. match_colon(_, _) ->
  415. nomatch.
  416. parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
  417. parse_hd_before_value(Rest, State, H, SoFar);
  418. parse_hd_name(<< C, _/bits >>, State, _, <<>>) when ?IS_WS(C) ->
  419. error_terminate(400, State, {connection_error, protocol_error,
  420. ''}); %% @todo
  421. parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) when ?IS_WS(C) ->
  422. parse_hd_name_ws(Rest, State, H, SoFar);
  423. parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
  424. ?LOWER(parse_hd_name, Rest, State, H, SoFar).
  425. parse_hd_name_ws(<< C, Rest/bits >>, S, H, Name) ->
  426. case C of
  427. $\s -> parse_hd_name_ws(Rest, S, H, Name);
  428. $\t -> parse_hd_name_ws(Rest, S, H, Name);
  429. $: -> parse_hd_before_value(Rest, S, H, Name)
  430. end.
  431. parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) ->
  432. parse_hd_before_value(Rest, S, H, N);
  433. parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) ->
  434. parse_hd_before_value(Rest, S, H, N);
  435. parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
  436. MaxLength = maps:get(max_header_value_length, Opts, 4096),
  437. case match_eol(Buffer, 0) of
  438. nomatch when byte_size(Buffer) > MaxLength ->
  439. error_terminate(400, State, {connection_error, limit_reached,
  440. ''}); %% @todo
  441. nomatch ->
  442. {more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer};
  443. _ ->
  444. parse_hd_value(Buffer, State, H, N, <<>>)
  445. end.
  446. parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers0, Name, SoFar) ->
  447. Value = clean_value_ws_end(SoFar, byte_size(SoFar) - 1),
  448. Headers = case maps:get(Name, Headers0, undefined) of
  449. undefined -> Headers0#{Name => Value};
  450. %% The cookie header does not use proper HTTP header lists.
  451. Value0 when Name =:= <<"cookie">> -> Headers0#{Name => << Value0/binary, "; ", Value/binary >>};
  452. Value0 -> Headers0#{Name => << Value0/binary, ", ", Value/binary >>}
  453. end,
  454. parse_header(Rest, S, Headers);
  455. parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
  456. parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).
  457. clean_value_ws_end(_, -1) ->
  458. <<>>;
  459. clean_value_ws_end(Value, N) ->
  460. case binary:at(Value, N) of
  461. $\s -> clean_value_ws_end(Value, N - 1);
  462. $\t -> clean_value_ws_end(Value, N - 1);
  463. _ ->
  464. S = N + 1,
  465. << Value2:S/binary, _/bits >> = Value,
  466. Value2
  467. end.
  468. -ifdef(TEST).
  469. clean_value_ws_end_test_() ->
  470. Tests = [
  471. {<<>>, <<>>},
  472. {<<" ">>, <<>>},
  473. {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  474. "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
  475. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  476. "text/html;level=2;q=0.4, */*;q=0.5">>}
  477. ],
  478. [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
  479. horse_clean_value_ws_end() ->
  480. horse:repeat(200000,
  481. clean_value_ws_end(
  482. <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  483. "text/html;level=2;q=0.4, */*;q=0.5 ">>,
  484. byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
  485. "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
  486. ).
  487. -endif.
  488. request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
  489. in_state=#ps_header{version=Version}}, Headers) ->
  490. case maps:get(<<"host">>, Headers, undefined) of
  491. undefined when Version =:= 'HTTP/1.1' ->
  492. %% @todo Might want to not close the connection on this and next one.
  493. error_terminate(400, State, {stream_error, StreamID, protocol_error,
  494. ''}); %% @todo
  495. undefined ->
  496. request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
  497. RawHost ->
  498. try parse_host(RawHost, false, <<>>) of
  499. {Host, undefined} ->
  500. request(Buffer, State, Headers, Host, default_port(Transport:secure()));
  501. {Host, Port} ->
  502. request(Buffer, State, Headers, Host, Port)
  503. catch _:_ ->
  504. error_terminate(400, State, {stream_error, StreamID, protocol_error,
  505. ''}) %% @todo
  506. end
  507. end.
  508. -spec default_port(boolean()) -> 80 | 443.
  509. default_port(true) -> 443;
  510. default_port(_) -> 80.
  511. %% @todo Yeah probably just call the cowlib function.
  512. %% Same code as cow_http:parse_fullhost/1, but inline because we
  513. %% really want this to go fast.
  514. parse_host(<< $[, Rest/bits >>, false, <<>>) ->
  515. parse_host(Rest, true, << $[ >>);
  516. parse_host(<<>>, false, Acc) ->
  517. {Acc, undefined};
  518. parse_host(<< $:, Rest/bits >>, false, Acc) ->
  519. {Acc, list_to_integer(binary_to_list(Rest))};
  520. parse_host(<< $], Rest/bits >>, true, Acc) ->
  521. parse_host(Rest, false, << Acc/binary, $] >>);
  522. parse_host(<< C, Rest/bits >>, E, Acc) ->
  523. ?LOWER(parse_host, Rest, E, Acc).
  524. %% End of request parsing.
  525. request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, in_streamid=StreamID,
  526. in_state=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
  527. Headers, Host, Port) ->
  528. Scheme = case Transport:secure() of
  529. true -> <<"https">>;
  530. false -> <<"http">>
  531. end,
  532. {HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers of
  533. #{<<"content-length">> := <<"0">>} ->
  534. {false, 0, undefined, undefined};
  535. #{<<"content-length">> := BinLength} ->
  536. Length = try
  537. cow_http_hd:parse_content_length(BinLength)
  538. catch _:_ ->
  539. error_terminate(400, State0, {stream_error, StreamID, protocol_error,
  540. ''}) %% @todo
  541. %% @todo Err should terminate here...
  542. end,
  543. {true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
  544. %% @todo Better handling of transfer decoding.
  545. #{<<"transfer-encoding">> := <<"chunked">>} ->
  546. {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
  547. _ ->
  548. {false, 0, undefined, undefined}
  549. end,
  550. Req = #{
  551. ref => Ref,
  552. pid => self(),
  553. streamid => StreamID,
  554. peer => Peer,
  555. method => Method,
  556. scheme => Scheme,
  557. host => Host,
  558. port => Port,
  559. %% @todo So the path component needs to be normalized.
  560. path => Path,
  561. qs => Qs,
  562. version => Version,
  563. %% We are transparently taking care of transfer-encodings so
  564. %% the user code has no need to know about it.
  565. headers => maps:remove(<<"transfer-encoding">>, Headers),
  566. has_body => HasBody,
  567. body_length => BodyLength
  568. %% @todo multipart? keep state separate
  569. %% meta values (cowboy_websocket, cowboy_rest)
  570. },
  571. case is_http2_upgrade(Headers, Version) of
  572. false ->
  573. State = case HasBody of
  574. true ->
  575. cancel_request_timeout(State0#state{in_state=#ps_body{
  576. %% @todo Don't need length anymore?
  577. transfer_decode_fun = TDecodeFun,
  578. transfer_decode_state = TDecodeState
  579. }});
  580. false ->
  581. set_request_timeout(State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}})
  582. end,
  583. {request, Req, State, Buffer};
  584. {true, HTTP2Settings} ->
  585. http2_upgrade(State0, Buffer, HTTP2Settings, Req)
  586. end.
  587. %% HTTP/2 upgrade.
  588. is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
  589. <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
  590. Conns = cow_http_hd:parse_connection(Conn),
  591. case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
  592. {true, true} ->
  593. Protocols = cow_http_hd:parse_upgrade(Upgrade),
  594. case lists:member(<<"h2c">>, Protocols) of
  595. true ->
  596. {true, HTTP2Settings};
  597. false ->
  598. false
  599. end;
  600. _ ->
  601. false
  602. end;
  603. is_http2_upgrade(_, _) ->
  604. false.
  605. %% Upgrade through an HTTP/1.1 request.
  606. %% Prior knowledge upgrade, without an HTTP/1.1 request.
  607. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  608. opts=Opts, handler=Handler, peer=Peer}, Buffer) ->
  609. case Transport:secure() of
  610. false ->
  611. _ = cancel_request_timeout(State),
  612. cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer);
  613. true ->
  614. error_terminate(400, State, {connection_error, protocol_error,
  615. 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
  616. end.
  617. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
  618. opts=Opts, handler=Handler, peer=Peer}, Buffer, HTTP2Settings, Req) ->
  619. %% @todo
  620. %% However if the client sent a body, we need to read the body in full
  621. %% and if we can't do that, return a 413 response. Some options are in order.
  622. %% Always half-closed stream coming from this side.
  623. try cow_http_hd:parse_http2_settings(HTTP2Settings) of
  624. Settings ->
  625. Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(#{
  626. <<"connection">> => <<"Upgrade">>,
  627. <<"upgrade">> => <<"h2c">>
  628. }))),
  629. %% @todo Possibly redirect the request if it was https.
  630. _ = cancel_request_timeout(State),
  631. cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer, Settings, Req)
  632. catch _:_ ->
  633. error_terminate(400, State, {connection_error, protocol_error,
  634. 'The HTTP2-Settings header contains a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
  635. end.
  636. %% Request body parsing.
  637. parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
  638. PS=#ps_body{transfer_decode_fun=TDecode, transfer_decode_state=TState0}}) ->
  639. %% @todo Proper trailers.
  640. case TDecode(Buffer, TState0) of
  641. more ->
  642. %% @todo Asks for 0 or more bytes.
  643. {more, State, Buffer};
  644. {more, Data, TState} ->
  645. %% @todo Asks for 0 or more bytes.
  646. {data, StreamID, nofin, Data, State#state{in_state=
  647. PS#ps_body{transfer_decode_state=TState}}, <<>>};
  648. {more, Data, _Length, TState} when is_integer(_Length) ->
  649. %% @todo Asks for Length more bytes.
  650. {data, StreamID, nofin, Data, State#state{in_state=
  651. PS#ps_body{transfer_decode_state=TState}}, <<>>};
  652. {more, Data, Rest, TState} ->
  653. %% @todo Asks for 0 or more bytes.
  654. {data, StreamID, nofin, Data, State#state{in_state=
  655. PS#ps_body{transfer_decode_state=TState}}, Rest};
  656. {done, TotalLength, Rest} ->
  657. {data, StreamID, {fin, TotalLength}, <<>>, set_request_timeout(
  658. State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
  659. {done, Data, TotalLength, Rest} ->
  660. {data, StreamID, {fin, TotalLength}, Data, set_request_timeout(
  661. State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
  662. end.
  663. %% Message handling.
  664. %% @todo There is a difference in behavior between HTTP/1.1 and HTTP/2
  665. %% when an error or crash occurs after sending a 500 response. In HTTP/2
  666. %% the error will be printed, in HTTP/1.1 the error will be ignored.
  667. %% This is due to HTTP/1.1 disabling streams differently after both
  668. %% requests and responses have been sent.
  669. down(State=#state{children=Children0}, Pid, Msg) ->
  670. case lists:keytake(Pid, 1, Children0) of
  671. {value, {_, undefined, _}, Children} ->
  672. State#state{children=Children};
  673. {value, {_, StreamID, _}, Children} ->
  674. info(State#state{children=Children}, StreamID, Msg);
  675. false ->
  676. error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]),
  677. State
  678. end.
  679. info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) ->
  680. case lists:keyfind(StreamID, #stream.id, Streams0) of
  681. Stream = #stream{state=StreamState0} ->
  682. try Handler:info(StreamID, Msg, StreamState0) of
  683. {Commands, StreamState} ->
  684. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  685. Stream#stream{state=StreamState}),
  686. commands(State#state{streams=Streams}, StreamID, Commands)
  687. catch Class:Reason ->
  688. error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.",
  689. [Handler, StreamID, Msg, StreamState0, Class, Reason]),
  690. ok
  691. %% @todo
  692. % stream_reset(State, StreamID, {internal_error, {Class, Reason},
  693. % 'Exception occurred in StreamHandler:info/3 call.'})
  694. end;
  695. false ->
  696. error_logger:error_msg("Received message ~p for unknown stream ~p.~n", [Msg, StreamID]),
  697. State
  698. end.
  699. %% Commands.
  700. commands(State, _, []) ->
  701. State;
  702. %% Supervise a child process.
  703. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
  704. commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail);
  705. %% Error handling.
  706. commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
  707. commands(stream_reset(State, StreamID, Error), StreamID, Tail);
  708. %% Commands for a stream currently inactive.
  709. commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
  710. when Current =/= StreamID ->
  711. %% @todo We still want to handle some commands...
  712. Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
  713. Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
  714. Stream#stream{queue=Queue ++ Commands}),
  715. State#state{streams=Streams};
  716. %% Read the request body.
  717. commands(State, StreamID, [{flow, _Length}|Tail]) ->
  718. %% @todo We only read from socket if buffer is empty, otherwise
  719. %% we decode the buffer.
  720. %% @todo Set the body reading length to min(Length, BodyLength)
  721. commands(State, StreamID, Tail);
  722. %% Error responses are sent only if a response wasn't sent already.
  723. commands(State=#state{out_state=wait}, StreamID, [{error_response, StatusCode, Headers, Body}|Tail]) ->
  724. commands(State, StreamID, [{response, StatusCode, Headers, Body}|Tail]);
  725. commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
  726. commands(State, StreamID, Tail);
  727. %% Send a full response.
  728. %%
  729. %% @todo Kill the stream if it sent a response when one has already been sent.
  730. %% @todo Keep IsFin in the state.
  731. %% @todo Same two things above apply to DATA, possibly promise too.
  732. commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID,
  733. [{response, StatusCode, Headers0, Body}|Tail]) ->
  734. %% @todo I'm pretty sure the last stream in the list is the one we want
  735. %% considering all others are queued.
  736. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  737. {State, Headers} = connection(State0, Headers0, StreamID, Version),
  738. %% @todo Ensure content-length is set.
  739. Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
  740. case Body of
  741. {sendfile, O, B, P} ->
  742. Transport:send(Socket, Response),
  743. commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]);
  744. _ ->
  745. Transport:send(Socket, [Response, Body]),
  746. %% @todo If max number of requests, close connection.
  747. %% @todo If IsFin, maybe skip body of current request.
  748. maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin)
  749. end;
  750. %% Send response headers and initiate chunked encoding.
  751. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  752. [{headers, StatusCode, Headers0}|Tail]) ->
  753. %% @todo Same as above.
  754. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
  755. {State1, Headers1} = case Version of
  756. 'HTTP/1.1' ->
  757. {State0, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
  758. %% Close the connection after streaming the data to HTTP/1.0 client.
  759. %% @todo I'm guessing we need to differentiate responses with a content-length and others.
  760. 'HTTP/1.0' ->
  761. {State0#state{last_streamid=StreamID}, Headers0}
  762. end,
  763. {State, Headers} = connection(State1, Headers1, StreamID, Version),
  764. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
  765. commands(State#state{out_state=chunked}, StreamID, Tail);
  766. %% Send a response body chunk.
  767. %%
  768. %% @todo WINDOW_UPDATE stuff require us to buffer some data.
  769. %% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
  770. commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
  771. [{data, IsFin, Data}|Tail]) ->
  772. %% @todo We need to kill the stream if it tries to send data before headers.
  773. %% @todo Same as above.
  774. case lists:keyfind(StreamID, #stream.id, Streams) of
  775. #stream{version='HTTP/1.1'} ->
  776. Size = iolist_size(Data),
  777. Transport:send(Socket, [integer_to_list(Size, 16), <<"\r\n">>, Data, <<"\r\n">>]);
  778. #stream{version='HTTP/1.0'} ->
  779. Transport:send(Socket, Data)
  780. end,
  781. maybe_terminate(State, StreamID, Tail, IsFin);
  782. %% Send a file.
  783. commands(State=#state{socket=Socket, transport=Transport}, StreamID,
  784. [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
  785. Transport:sendfile(Socket, Path, Offset, Bytes),
  786. maybe_terminate(State, StreamID, Tail, IsFin);
  787. %% Protocol takeover.
  788. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
  789. opts=Opts, children=Children}, StreamID,
  790. [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
  791. %% @todo This should be the last stream running otherwise we need to wait before switching.
  792. %% @todo If there's streams opened after this one, fail instead of 101.
  793. State = cancel_request_timeout(State0),
  794. %% @todo When we actually do the upgrade, we only have the one stream left, plus
  795. %% possibly some processes terminating. We need a smart strategy for handling the
  796. %% children shutdown. We can start with brutal_kill and discarding the EXIT messages
  797. %% received before switching to Websocket. Something better would be to let the
  798. %% stream processes finish but that implies the Websocket module to know about
  799. %% them and filter the messages. For now, kill them all and discard all messages
  800. %% in the mailbox.
  801. _ = [exit(Pid, kill) || {Pid, _, _} <- Children],
  802. flush(),
  803. %% Everything good, upgrade!
  804. _ = commands(State, StreamID, [{response, 101, Headers, <<>>}]),
  805. %% @todo This is no good because commands return a state normally and here it doesn't
  806. %% we need to let this module go entirely. Perhaps it should be handled directly in
  807. %% cowboy_clear/cowboy_tls? Perhaps not. We do want that Buffer.
  808. Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
  809. %% Stream shutdown.
  810. commands(State, StreamID, [stop|Tail]) ->
  811. %% @todo Do we want to run the commands after a stop?
  812. % commands(stream_terminate(State, StreamID, stop), StreamID, Tail).
  813. %% @todo I think that's where we need to terminate streams.
  814. maybe_terminate(State, StreamID, Tail, fin);
  815. %% HTTP/1.1 does not support push; ignore.
  816. commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
  817. commands(State, StreamID, Tail).
  818. %% The set-cookie header is special; we can only send one cookie per header.
  819. headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
  820. Headers1 = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
  821. Headers1 ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
  822. headers_to_list(Headers) ->
  823. maps:to_list(Headers).
  824. flush() ->
  825. receive _ -> flush() after 0 -> ok end.
  826. maybe_terminate(State, StreamID, Tail, nofin) ->
  827. commands(State, StreamID, Tail);
  828. %% @todo In these cases I'm not sure if we should continue processing commands.
  829. maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) ->
  830. terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
  831. maybe_terminate(State, StreamID, _Tail, fin) ->
  832. stream_terminate(State, StreamID, normal).
  833. stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
  834. %% @todo headers
  835. %% @todo Don't send this if there are no streams left.
  836. % Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
  837. % {<<"content-length">>, <<"0">>}
  838. % ])),
  839. %% @todo update IsFin local
  840. % stream_terminate(State#state{out_state=done}, StreamID, StreamError).
  841. stream_terminate(State, StreamID, StreamError).
  842. stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler,
  843. out_streamid=OutStreamID, out_state=OutState,
  844. streams=Streams0, children=Children0}, StreamID, Reason) ->
  845. {value, #stream{state=StreamState, version=Version}, Streams}
  846. = lists:keytake(StreamID, #stream.id, Streams0),
  847. _ = case OutState of
  848. wait ->
  849. Transport:send(Socket, cow_http:response(204, 'HTTP/1.1', []));
  850. chunked when Version =:= 'HTTP/1.1' ->
  851. Transport:send(Socket, <<"0\r\n\r\n">>);
  852. _ -> %% done or Version =:= 'HTTP/1.0'
  853. ok
  854. end,
  855. stream_call_terminate(StreamID, Reason, Handler, StreamState),
  856. %% @todo initiate children shutdown
  857. % Children = stream_terminate_children(Children0, StreamID, []),
  858. Children = [case C of
  859. {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown};
  860. _ -> C
  861. end || C <- Children0],
  862. %% @todo Skip the body, if any, or drop the connection if too large.
  863. %% @todo Only do this if Current =:= StreamID.
  864. NextOutStreamID = OutStreamID + 1,
  865. case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
  866. false ->
  867. %% @todo This is clearly wrong, if the stream is gone we need to check if
  868. %% there used to be such a stream, and if there was to send an error.
  869. State#state{out_streamid=NextOutStreamID, out_state=wait, streams=Streams, children=Children};
  870. #stream{queue=Commands} ->
  871. %% @todo Remove queue from the stream.
  872. commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
  873. streams=Streams, children=Children}, NextOutStreamID, Commands)
  874. end.
  875. %% @todo Taken directly from _http2
  876. stream_call_terminate(StreamID, Reason, Handler, StreamState) ->
  877. try
  878. Handler:terminate(StreamID, Reason, StreamState),
  879. ok
  880. catch Class:Reason ->
  881. error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.",
  882. [Handler, StreamID, Reason, StreamState, Class, Reason])
  883. end.
  884. %stream_terminate_children([], _, Acc) ->
  885. % Acc;
  886. %stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) ->
  887. % exit(Pid, kill),
  888. % stream_terminate_children(Tail, StreamID, Acc);
  889. %stream_terminate_children([Child|Tail], StreamID, Acc) ->
  890. % stream_terminate_children(Tail, StreamID, [Child|Acc]).
  891. %% @todo max_reqs also
  892. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
  893. Conns = cow_http_hd:parse_connection(Conn),
  894. case lists:member(<<"keep-alive">>, Conns) of
  895. true -> keepalive;
  896. false -> close
  897. end;
  898. maybe_req_close(_, _, 'HTTP/1.0') ->
  899. close;
  900. maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
  901. case connection_hd_is_close(Conn) of
  902. true -> close;
  903. false -> keepalive
  904. end;
  905. maybe_req_close(_State, _, _) ->
  906. keepalive.
  907. connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  908. case connection_hd_is_close(Conn) of
  909. true -> {State, Headers};
  910. %% @todo Here we need to remove keep-alive and add close, not just add close.
  911. false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
  912. end;
  913. connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
  914. {State, Headers#{<<"connection">> => <<"close">>}};
  915. connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
  916. case connection_hd_is_close(Conn) of
  917. true -> {State#state{last_streamid=StreamID}, Headers};
  918. %% @todo Here we need to set keep-alive only if it wasn't set before.
  919. false -> {State, Headers}
  920. end;
  921. connection(State, Headers, _, 'HTTP/1.0') ->
  922. {State, Headers#{<<"connection">> => <<"keep-alive">>}};
  923. connection(State, Headers, _, _) ->
  924. {State, Headers}.
  925. connection_hd_is_close(Conn) ->
  926. Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
  927. lists:member(<<"close">>, Conns).
  928. error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Reason) ->
  929. Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', [
  930. {<<"content-length">>, <<"0">>}
  931. ])),
  932. terminate(State, Reason).
  933. terminate(_State, _Reason) ->
  934. exit(normal). %% @todo
  935. %% System callbacks.
  936. -spec system_continue(_, _, #state{}) -> ok.
  937. system_continue(_, _, {State, Buffer}) ->
  938. loop(State, Buffer).
  939. -spec system_terminate(any(), _, _, _) -> no_return().
  940. system_terminate(Reason, _, _, _) ->
  941. exit(Reason).
  942. -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
  943. system_code_change(Misc, _, _, _) ->
  944. {ok, Misc}.