cow_http2_machine.erl 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618
  1. %% Copyright (c) 2018, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cow_http2_machine).
  15. -export([init/2]).
  16. -export([init_stream/2]).
  17. -export([init_upgrade_stream/2]).
  18. -export([frame/2]).
  19. -export([ignored_frame/1]).
  20. -export([timeout/3]).
  21. -export([prepare_headers/5]).
  22. -export([prepare_push_promise/4]).
  23. -export([prepare_trailers/3]).
  24. -export([send_or_queue_data/4]).
  25. -export([ensure_window/2]).
  26. -export([ensure_window/3]).
  27. -export([update_window/2]).
  28. -export([update_window/3]).
  29. -export([reset_stream/2]).
  30. -export([get_connection_local_buffer_size/1]).
  31. -export([get_local_setting/2]).
  32. -export([get_remote_settings/1]).
  33. -export([get_last_streamid/1]).
  34. -export([get_stream_local_buffer_size/2]).
  35. -export([get_stream_local_state/2]).
  36. -export([get_stream_remote_state/2]).
  37. -export([is_lingering_stream/2]).
  38. -type opts() :: #{
  39. connection_window_margin_size => 0..16#7fffffff,
  40. connection_window_update_threshold => 0..16#7fffffff,
  41. enable_connect_protocol => boolean(),
  42. initial_connection_window_size => 65535..16#7fffffff,
  43. initial_stream_window_size => 0..16#7fffffff,
  44. max_connection_window_size => 0..16#7fffffff,
  45. max_concurrent_streams => non_neg_integer() | infinity,
  46. max_decode_table_size => non_neg_integer(),
  47. max_encode_table_size => non_neg_integer(),
  48. max_frame_size_received => 16384..16777215,
  49. max_frame_size_sent => 16384..16777215 | infinity,
  50. max_stream_window_size => 0..16#7fffffff,
  51. message_tag => any(),
  52. preface_timeout => timeout(),
  53. settings_timeout => timeout(),
  54. stream_window_data_threshold => 0..16#7fffffff,
  55. stream_window_margin_size => 0..16#7fffffff,
  56. stream_window_update_threshold => 0..16#7fffffff
  57. }.
  58. -export_type([opts/0]).
  59. %% The order of the fields is significant.
  60. -record(sendfile, {
  61. offset :: non_neg_integer(),
  62. bytes :: pos_integer(),
  63. path :: file:name_all()
  64. }).
  65. -record(stream, {
  66. id = undefined :: cow_http2:streamid(),
  67. %% Request method.
  68. method = undefined :: binary(),
  69. %% Whether we finished sending data.
  70. local = idle :: idle | cow_http2:fin(),
  71. %% Local flow control window (how much we can send).
  72. local_window :: integer(),
  73. %% Buffered data waiting for the flow control window to increase.
  74. local_buffer = queue:new() ::
  75. queue:queue({cow_http2:fin(), non_neg_integer(), {data, iodata()} | #sendfile{}}),
  76. local_buffer_size = 0 :: non_neg_integer(),
  77. local_trailers = undefined :: undefined | cow_http:headers(),
  78. %% Whether we finished receiving data.
  79. remote = idle :: idle | cow_http2:fin(),
  80. %% Remote flow control window (how much we accept to receive).
  81. remote_window :: integer(),
  82. %% Size expected and read from the request body.
  83. remote_expected_size = undefined :: undefined | non_neg_integer(),
  84. remote_read_size = 0 :: non_neg_integer(),
  85. %% Unparsed te header. Used to know if we can send trailers.
  86. %% Note that we can always send trailers to the server.
  87. te :: undefined | binary()
  88. }).
  89. -type stream() :: #stream{}.
  90. -type continued_frame() ::
  91. {headers, cow_http2:streamid(), cow_http2:fin(), cow_http2:head_fin(), binary()} |
  92. {push_promise, cow_http2:streamid(), cow_http2:head_fin(), cow_http2:streamid(), binary()}.
  93. -record(http2_machine, {
  94. %% Whether the HTTP/2 endpoint is a client or a server.
  95. mode :: client | server,
  96. %% HTTP/2 SETTINGS customization.
  97. opts = #{} :: opts(),
  98. %% Connection-wide frame processing state.
  99. state = settings :: settings | normal
  100. | {continuation, request | response | trailers | push_promise, continued_frame()},
  101. %% Timer for the connection preface.
  102. preface_timer = undefined :: undefined | reference(),
  103. %% Timer for the ack for a SETTINGS frame we sent.
  104. settings_timer = undefined :: undefined | reference(),
  105. %% Settings are separate for each endpoint. In addition, settings
  106. %% must be acknowledged before they can be expected to be applied.
  107. local_settings = #{
  108. % header_table_size => 4096,
  109. % enable_push => true,
  110. % max_concurrent_streams => infinity,
  111. initial_window_size => 65535
  112. % max_frame_size => 16384
  113. % max_header_list_size => infinity
  114. } :: map(),
  115. next_settings = undefined :: undefined | map(),
  116. remote_settings = #{
  117. initial_window_size => 65535
  118. } :: map(),
  119. %% Connection-wide flow control window.
  120. local_window = 65535 :: integer(), %% How much we can send.
  121. remote_window = 65535 :: integer(), %% How much we accept to receive.
  122. %% Stream identifiers.
  123. local_streamid :: pos_integer(), %% The next streamid to be used.
  124. remote_streamid = 0 :: non_neg_integer(), %% The last streamid received.
  125. %% Currently active HTTP/2 streams. Streams may be initiated either
  126. %% by the client or by the server through PUSH_PROMISE frames.
  127. streams = #{} :: #{cow_http2:streamid() => stream()},
  128. %% HTTP/2 streams that have recently been reset locally.
  129. %% We are expected to keep receiving additional frames after
  130. %% sending an RST_STREAM.
  131. local_lingering_streams = [] :: [cow_http2:streamid()],
  132. %% HTTP/2 streams that have recently been reset remotely.
  133. %% We keep a few of these around in order to reject subsequent
  134. %% frames on these streams.
  135. remote_lingering_streams = [] :: [cow_http2:streamid()],
  136. %% HPACK decoding and encoding state.
  137. decode_state = cow_hpack:init() :: cow_hpack:state(),
  138. encode_state = cow_hpack:init() :: cow_hpack:state()
  139. }).
  140. -opaque http2_machine() :: #http2_machine{}.
  141. -export_type([http2_machine/0]).
  142. -type pseudo_headers() :: #{} %% Trailers
  143. | #{ %% Responses.
  144. status := cow_http:status()
  145. } | #{ %% Normal CONNECT requests.
  146. method := binary(),
  147. authority := binary()
  148. } | #{ %% Other requests and extended CONNECT requests.
  149. method := binary(),
  150. scheme := binary(),
  151. authority := binary(),
  152. path := binary(),
  153. protocol => binary()
  154. }.
  155. %% Returns true when the given StreamID is for a local-initiated stream.
  156. -define(IS_SERVER_LOCAL(StreamID), ((StreamID rem 2) =:= 0)).
  157. -define(IS_CLIENT_LOCAL(StreamID), ((StreamID rem 2) =:= 1)).
  158. -define(IS_LOCAL(Mode, StreamID), (
  159. ((Mode =:= server) andalso ?IS_SERVER_LOCAL(StreamID))
  160. orelse
  161. ((Mode =:= client) andalso ?IS_CLIENT_LOCAL(StreamID))
  162. )).
  163. -spec init(client | server, opts()) -> {ok, iodata(), http2_machine()}.
  164. init(client, Opts) ->
  165. NextSettings = settings_init(Opts),
  166. client_preface(#http2_machine{
  167. mode=client,
  168. opts=Opts,
  169. preface_timer=start_timer(preface_timeout, Opts),
  170. settings_timer=start_timer(settings_timeout, Opts),
  171. next_settings=NextSettings,
  172. local_streamid=1
  173. });
  174. init(server, Opts) ->
  175. NextSettings = settings_init(Opts),
  176. common_preface(#http2_machine{
  177. mode=server,
  178. opts=Opts,
  179. preface_timer=start_timer(preface_timeout, Opts),
  180. settings_timer=start_timer(settings_timeout, Opts),
  181. next_settings=NextSettings,
  182. local_streamid=2
  183. }).
  184. %% @todo In Cowlib 3.0 we should always include MessageTag in the message.
  185. %% It can be set to 'undefined' if the option is missing.
  186. start_timer(Name, Opts=#{message_tag := MessageTag}) ->
  187. case maps:get(Name, Opts, 5000) of
  188. infinity -> undefined;
  189. Timeout -> erlang:start_timer(Timeout, self(), {?MODULE, MessageTag, Name})
  190. end;
  191. start_timer(Name, Opts) ->
  192. case maps:get(Name, Opts, 5000) of
  193. infinity -> undefined;
  194. Timeout -> erlang:start_timer(Timeout, self(), {?MODULE, Name})
  195. end.
  196. client_preface(State0) ->
  197. {ok, CommonPreface, State} = common_preface(State0),
  198. {ok, [
  199. <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
  200. CommonPreface
  201. ], State}.
  202. %% We send next_settings and use defaults until we get an ack.
  203. %%
  204. %% We also send a WINDOW_UPDATE frame for the connection when
  205. %% the user specified an initial_connection_window_size.
  206. common_preface(State=#http2_machine{opts=Opts, next_settings=NextSettings}) ->
  207. case maps:get(initial_connection_window_size, Opts, 65535) of
  208. 65535 ->
  209. {ok, cow_http2:settings(NextSettings), State};
  210. Size ->
  211. {ok, [
  212. cow_http2:settings(NextSettings),
  213. cow_http2:window_update(Size - 65535)
  214. ], update_window(Size - 65535, State)}
  215. end.
  216. settings_init(Opts) ->
  217. S0 = setting_from_opt(#{}, Opts, max_decode_table_size,
  218. header_table_size, 4096),
  219. S1 = setting_from_opt(S0, Opts, max_concurrent_streams,
  220. max_concurrent_streams, infinity),
  221. S2 = setting_from_opt(S1, Opts, initial_stream_window_size,
  222. initial_window_size, 65535),
  223. S3 = setting_from_opt(S2, Opts, max_frame_size_received,
  224. max_frame_size, 16384),
  225. %% @todo max_header_list_size
  226. setting_from_opt(S3, Opts, enable_connect_protocol,
  227. enable_connect_protocol, false).
  228. setting_from_opt(Settings, Opts, OptName, SettingName, Default) ->
  229. case maps:get(OptName, Opts, Default) of
  230. Default -> Settings;
  231. Value -> Settings#{SettingName => Value}
  232. end.
  233. -spec init_stream(binary(), State)
  234. -> {ok, cow_http2:streamid(), State} when State::http2_machine().
  235. init_stream(Method, State=#http2_machine{mode=client, local_streamid=LocalStreamID,
  236. local_settings=#{initial_window_size := RemoteWindow},
  237. remote_settings=#{initial_window_size := LocalWindow}}) ->
  238. Stream = #stream{id=LocalStreamID, method=Method,
  239. local_window=LocalWindow, remote_window=RemoteWindow},
  240. {ok, LocalStreamID, stream_store(Stream, State#http2_machine{
  241. local_streamid=LocalStreamID + 2})}.
  242. -spec init_upgrade_stream(binary(), State)
  243. -> {ok, cow_http2:streamid(), State} when State::http2_machine().
  244. init_upgrade_stream(Method, State=#http2_machine{mode=server, remote_streamid=0,
  245. local_settings=#{initial_window_size := RemoteWindow},
  246. remote_settings=#{initial_window_size := LocalWindow}}) ->
  247. Stream = #stream{id=1, method=Method,
  248. remote=fin, remote_expected_size=0,
  249. local_window=LocalWindow, remote_window=RemoteWindow, te=undefined},
  250. {ok, 1, stream_store(Stream, State#http2_machine{remote_streamid=1})}.
  251. -spec frame(cow_http2:frame(), State)
  252. -> {ok, State}
  253. | {ok, {data, cow_http2:streamid(), cow_http2:fin(), binary()}, State}
  254. | {ok, {headers, cow_http2:streamid(), cow_http2:fin(),
  255. cow_http:headers(), pseudo_headers(), non_neg_integer() | undefined}, State}
  256. | {ok, {trailers, cow_http2:streamid(), cow_http:headers()}, State}
  257. | {ok, {rst_stream, cow_http2:streamid(), cow_http2:error()}, State}
  258. | {ok, {push_promise, cow_http2:streamid(), cow_http2:streamid(),
  259. cow_http:headers(), pseudo_headers()}, State}
  260. | {ok, {goaway, cow_http2:streamid(), cow_http2:error(), binary()}, State}
  261. | {send, [{cow_http2:streamid(), cow_http2:fin(),
  262. [{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}]}], State}
  263. | {error, {stream_error, cow_http2:streamid(), cow_http2:error(), atom()}, State}
  264. | {error, {connection_error, cow_http2:error(), atom()}, State}
  265. when State::http2_machine().
  266. frame(Frame, State=#http2_machine{state=settings, preface_timer=TRef}) ->
  267. ok = case TRef of
  268. undefined -> ok;
  269. _ -> erlang:cancel_timer(TRef, [{async, true}, {info, false}])
  270. end,
  271. settings_frame(Frame, State#http2_machine{state=normal, preface_timer=undefined});
  272. frame(Frame, State=#http2_machine{state={continuation, _, _}}) ->
  273. continuation_frame(Frame, State);
  274. frame(settings_ack, State=#http2_machine{state=normal}) ->
  275. settings_ack_frame(State);
  276. frame(Frame, State=#http2_machine{state=normal}) ->
  277. case element(1, Frame) of
  278. data -> data_frame(Frame, State);
  279. headers -> headers_frame(Frame, State);
  280. priority -> priority_frame(Frame, State);
  281. rst_stream -> rst_stream_frame(Frame, State);
  282. settings -> settings_frame(Frame, State);
  283. push_promise -> push_promise_frame(Frame, State);
  284. ping -> ping_frame(Frame, State);
  285. ping_ack -> ping_ack_frame(Frame, State);
  286. goaway -> goaway_frame(Frame, State);
  287. window_update -> window_update_frame(Frame, State);
  288. continuation -> unexpected_continuation_frame(Frame, State);
  289. _ -> ignored_frame(State)
  290. end.
  291. %% DATA frame.
  292. data_frame({data, StreamID, _, _}, State=#http2_machine{mode=Mode,
  293. local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
  294. when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
  295. orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
  296. {error, {connection_error, protocol_error,
  297. 'DATA frame received on a stream in idle state. (RFC7540 5.1)'},
  298. State};
  299. data_frame({data, _, _, Data}, State=#http2_machine{remote_window=ConnWindow})
  300. when byte_size(Data) > ConnWindow ->
  301. {error, {connection_error, flow_control_error,
  302. 'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)'},
  303. State};
  304. data_frame(Frame={data, StreamID, _, Data}, State0=#http2_machine{
  305. remote_window=ConnWindow, local_lingering_streams=Lingering}) ->
  306. DataLen = byte_size(Data),
  307. State = State0#http2_machine{remote_window=ConnWindow - DataLen},
  308. case stream_get(StreamID, State) of
  309. #stream{remote_window=StreamWindow} when StreamWindow < DataLen ->
  310. stream_reset(StreamID, State, flow_control_error,
  311. 'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)');
  312. Stream = #stream{remote=nofin} ->
  313. data_frame(Frame, State, Stream, DataLen);
  314. #stream{remote=idle} ->
  315. stream_reset(StreamID, State, protocol_error,
  316. 'DATA frame received before a HEADERS frame. (RFC7540 8.1, RFC7540 8.1.2.6)');
  317. #stream{remote=fin} ->
  318. stream_reset(StreamID, State, stream_closed,
  319. 'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)');
  320. undefined ->
  321. %% After we send an RST_STREAM frame and terminate a stream,
  322. %% the remote endpoint might still be sending us some more
  323. %% frames until it can process this RST_STREAM.
  324. case lists:member(StreamID, Lingering) of
  325. true ->
  326. {ok, State};
  327. false ->
  328. {error, {connection_error, stream_closed,
  329. 'DATA frame received for a closed stream. (RFC7540 5.1)'},
  330. State}
  331. end
  332. end.
  333. data_frame(Frame={data, _, IsFin, _}, State0, Stream0=#stream{id=StreamID,
  334. remote_window=StreamWindow, remote_read_size=StreamRead}, DataLen) ->
  335. Stream = Stream0#stream{remote=IsFin,
  336. remote_window=StreamWindow - DataLen,
  337. remote_read_size=StreamRead + DataLen},
  338. State = stream_store(Stream, State0),
  339. case is_body_size_valid(Stream) of
  340. true ->
  341. {ok, Frame, State};
  342. false ->
  343. stream_reset(StreamID, State, protocol_error,
  344. 'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)')
  345. end.
  346. %% It's always valid when no content-length header was specified.
  347. is_body_size_valid(#stream{remote_expected_size=undefined}) ->
  348. true;
  349. %% We didn't finish reading the body but the size is already larger than expected.
  350. is_body_size_valid(#stream{remote=nofin, remote_expected_size=Expected,
  351. remote_read_size=Read}) when Read > Expected ->
  352. false;
  353. is_body_size_valid(#stream{remote=nofin}) ->
  354. true;
  355. is_body_size_valid(#stream{remote=fin, remote_expected_size=Expected,
  356. remote_read_size=Expected}) ->
  357. true;
  358. %% We finished reading the body and the size read is not the one expected.
  359. is_body_size_valid(_) ->
  360. false.
  361. %% HEADERS frame.
  362. %%
  363. %% We always close the connection when we detect errors before
  364. %% decoding the headers to not waste resources on non-compliant
  365. %% endpoints, making us stricter than the RFC requires.
  366. %% Convenience record to manipulate the tuple.
  367. %% The order of the fields matter.
  368. -record(headers, {
  369. id :: cow_http2:streamid(),
  370. fin :: cow_http2:fin(),
  371. head :: cow_http2:head_fin(),
  372. data :: binary()
  373. }).
  374. headers_frame(Frame=#headers{}, State=#http2_machine{mode=Mode}) ->
  375. case Mode of
  376. server -> server_headers_frame(Frame, State);
  377. client -> client_headers_frame(Frame, State)
  378. end;
  379. %% @todo Handle the PRIORITY data, but only if this returns an ok tuple.
  380. %% @todo Do not lose the PRIORITY information if CONTINUATION frames follow.
  381. headers_frame({headers, StreamID, IsFin, IsHeadFin,
  382. _IsExclusive, _DepStreamID, _Weight, HeaderData},
  383. State=#http2_machine{mode=Mode}) ->
  384. HeadersFrame = #headers{id=StreamID, fin=IsFin, head=IsHeadFin, data=HeaderData},
  385. case Mode of
  386. server -> server_headers_frame(HeadersFrame, State);
  387. client -> client_headers_frame(HeadersFrame, State)
  388. end.
  389. %% Reject HEADERS frames with even-numbered streamid.
  390. server_headers_frame(#headers{id=StreamID}, State)
  391. when ?IS_SERVER_LOCAL(StreamID) ->
  392. {error, {connection_error, protocol_error,
  393. 'HEADERS frame received with even-numbered streamid. (RFC7540 5.1.1)'},
  394. State};
  395. %% HEADERS frame on an idle stream: new request.
  396. server_headers_frame(Frame=#headers{id=StreamID, head=IsHeadFin},
  397. State=#http2_machine{mode=server, remote_streamid=RemoteStreamID})
  398. when StreamID > RemoteStreamID ->
  399. case IsHeadFin of
  400. head_fin ->
  401. headers_decode(Frame, State, request, undefined);
  402. head_nofin ->
  403. {ok, State#http2_machine{state={continuation, request, Frame}}}
  404. end;
  405. %% Either a HEADERS frame received on (half-)closed stream,
  406. %% or a HEADERS frame containing the trailers.
  407. server_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin}, State) ->
  408. case stream_get(StreamID, State) of
  409. %% Trailers.
  410. Stream = #stream{remote=nofin} when IsFin =:= fin ->
  411. case IsHeadFin of
  412. head_fin ->
  413. headers_decode(Frame, State, trailers, Stream);
  414. head_nofin ->
  415. {ok, State#http2_machine{state={continuation, trailers, Frame}}}
  416. end;
  417. #stream{remote=nofin} ->
  418. {error, {connection_error, protocol_error,
  419. 'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'},
  420. State};
  421. _ ->
  422. {error, {connection_error, stream_closed,
  423. 'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'},
  424. State}
  425. end.
  426. %% Either a HEADERS frame received on an (half-)closed stream,
  427. %% or a HEADERS frame containing the response or the trailers.
  428. client_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin},
  429. State=#http2_machine{local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
  430. when (?IS_CLIENT_LOCAL(StreamID) andalso (StreamID < LocalStreamID))
  431. orelse ((not ?IS_CLIENT_LOCAL(StreamID)) andalso (StreamID =< RemoteStreamID)) ->
  432. case stream_get(StreamID, State) of
  433. Stream = #stream{remote=idle} ->
  434. case IsHeadFin of
  435. head_fin ->
  436. headers_decode(Frame, State, response, Stream);
  437. head_nofin ->
  438. {ok, State#http2_machine{state={continuation, response, Frame}}}
  439. end;
  440. Stream = #stream{remote=nofin} when IsFin =:= fin ->
  441. case IsHeadFin of
  442. head_fin ->
  443. headers_decode(Frame, State, trailers, Stream);
  444. head_nofin ->
  445. {ok, State#http2_machine{state={continuation, trailers, Frame}}}
  446. end;
  447. #stream{remote=nofin} ->
  448. {error, {connection_error, protocol_error,
  449. 'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'},
  450. State};
  451. _ ->
  452. {error, {connection_error, stream_closed,
  453. 'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'},
  454. State}
  455. end;
  456. %% Reject HEADERS frames received on idle streams.
  457. client_headers_frame(_, State) ->
  458. {error, {connection_error, protocol_error,
  459. 'HEADERS frame received on an idle stream. (RFC7540 5.1.1)'},
  460. State}.
  461. headers_decode(Frame=#headers{head=head_fin, data=HeaderData},
  462. State=#http2_machine{decode_state=DecodeState0}, Type, Stream) ->
  463. try cow_hpack:decode(HeaderData, DecodeState0) of
  464. {Headers, DecodeState} when Type =:= request ->
  465. headers_enforce_concurrency_limit(Frame,
  466. State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers);
  467. {Headers, DecodeState} ->
  468. headers_pseudo_headers(Frame,
  469. State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers)
  470. catch _:_ ->
  471. {error, {connection_error, compression_error,
  472. 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'},
  473. State}
  474. end.
  475. headers_enforce_concurrency_limit(Frame=#headers{id=StreamID},
  476. State=#http2_machine{local_settings=LocalSettings, streams=Streams},
  477. Type, Stream, Headers) ->
  478. MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity),
  479. %% Using < is correct because this new stream is not included
  480. %% in the Streams variable yet and so we'll end up with +1 stream.
  481. case map_size(Streams) < MaxConcurrentStreams of
  482. true ->
  483. headers_pseudo_headers(Frame, State, Type, Stream, Headers);
  484. false ->
  485. {error, {stream_error, StreamID, refused_stream,
  486. 'Maximum number of concurrent streams has been reached. (RFC7540 5.1.2)'},
  487. State}
  488. end.
  489. headers_pseudo_headers(Frame, State=#http2_machine{local_settings=LocalSettings},
  490. Type, Stream, Headers0) when Type =:= request; Type =:= push_promise ->
  491. IsExtendedConnectEnabled = maps:get(enable_connect_protocol, LocalSettings, false),
  492. case request_pseudo_headers(Headers0, #{}) of
  493. %% Extended CONNECT method (RFC8441).
  494. {ok, PseudoHeaders=#{method := <<"CONNECT">>, scheme := _,
  495. authority := _, path := _, protocol := _}, Headers}
  496. when IsExtendedConnectEnabled ->
  497. headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
  498. {ok, #{method := <<"CONNECT">>, scheme := _,
  499. authority := _, path := _}, _}
  500. when IsExtendedConnectEnabled ->
  501. headers_malformed(Frame, State,
  502. 'The :protocol pseudo-header MUST be sent with an extended CONNECT. (RFC8441 4)');
  503. {ok, #{protocol := _}, _} ->
  504. headers_malformed(Frame, State,
  505. 'The :protocol pseudo-header is only defined for the extended CONNECT. (RFC8441 4)');
  506. %% Normal CONNECT (no scheme/path).
  507. {ok, PseudoHeaders=#{method := <<"CONNECT">>, authority := _}, Headers}
  508. when map_size(PseudoHeaders) =:= 2 ->
  509. headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
  510. {ok, #{method := <<"CONNECT">>}, _} ->
  511. headers_malformed(Frame, State,
  512. 'CONNECT requests only use the :method and :authority pseudo-headers. (RFC7540 8.3)');
  513. %% Other requests.
  514. {ok, PseudoHeaders=#{method := _, scheme := _, path := _}, Headers} ->
  515. headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
  516. {ok, _, _} ->
  517. headers_malformed(Frame, State,
  518. 'A required pseudo-header was not found. (RFC7540 8.1.2.3)');
  519. {error, HumanReadable} ->
  520. headers_malformed(Frame, State, HumanReadable)
  521. end;
  522. headers_pseudo_headers(Frame=#headers{id=StreamID},
  523. State, Type=response, Stream, Headers0) ->
  524. case response_pseudo_headers(Headers0, #{}) of
  525. {ok, PseudoHeaders=#{status := _}, Headers} ->
  526. headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
  527. {ok, _, _} ->
  528. stream_reset(StreamID, State, protocol_error,
  529. 'A required pseudo-header was not found. (RFC7540 8.1.2.4)');
  530. {error, HumanReadable} ->
  531. stream_reset(StreamID, State, protocol_error, HumanReadable)
  532. end;
  533. headers_pseudo_headers(Frame=#headers{id=StreamID},
  534. State, Type=trailers, Stream, Headers) ->
  535. case trailers_contain_pseudo_headers(Headers) of
  536. false ->
  537. headers_regular_headers(Frame, State, Type, Stream, #{}, Headers);
  538. true ->
  539. stream_reset(StreamID, State, protocol_error,
  540. 'Trailer header blocks must not contain pseudo-headers. (RFC7540 8.1.2.1)')
  541. end.
  542. headers_malformed(#headers{id=StreamID}, State, HumanReadable) ->
  543. {error, {stream_error, StreamID, protocol_error, HumanReadable}, State}.
  544. request_pseudo_headers([{<<":method">>, _}|_], #{method := _}) ->
  545. {error, 'Multiple :method pseudo-headers were found. (RFC7540 8.1.2.3)'};
  546. request_pseudo_headers([{<<":method">>, Method}|Tail], PseudoHeaders) ->
  547. request_pseudo_headers(Tail, PseudoHeaders#{method => Method});
  548. request_pseudo_headers([{<<":scheme">>, _}|_], #{scheme := _}) ->
  549. {error, 'Multiple :scheme pseudo-headers were found. (RFC7540 8.1.2.3)'};
  550. request_pseudo_headers([{<<":scheme">>, Scheme}|Tail], PseudoHeaders) ->
  551. request_pseudo_headers(Tail, PseudoHeaders#{scheme => Scheme});
  552. request_pseudo_headers([{<<":authority">>, _}|_], #{authority := _}) ->
  553. {error, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'};
  554. request_pseudo_headers([{<<":authority">>, Authority}|Tail], PseudoHeaders) ->
  555. request_pseudo_headers(Tail, PseudoHeaders#{authority => Authority});
  556. request_pseudo_headers([{<<":path">>, _}|_], #{path := _}) ->
  557. {error, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'};
  558. request_pseudo_headers([{<<":path">>, Path}|Tail], PseudoHeaders) ->
  559. request_pseudo_headers(Tail, PseudoHeaders#{path => Path});
  560. request_pseudo_headers([{<<":protocol">>, _}|_], #{protocol := _}) ->
  561. {error, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'};
  562. request_pseudo_headers([{<<":protocol">>, Protocol}|Tail], PseudoHeaders) ->
  563. request_pseudo_headers(Tail, PseudoHeaders#{protocol => Protocol});
  564. request_pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
  565. {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
  566. request_pseudo_headers(Headers, PseudoHeaders) ->
  567. {ok, PseudoHeaders, Headers}.
  568. response_pseudo_headers([{<<":status">>, _}|_], #{status := _}) ->
  569. {error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'};
  570. response_pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) ->
  571. try cow_http:status_to_integer(Status) of
  572. IntStatus ->
  573. response_pseudo_headers(Tail, PseudoHeaders#{status => IntStatus})
  574. catch _:_ ->
  575. {error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'}
  576. end;
  577. response_pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
  578. {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
  579. response_pseudo_headers(Headers, PseudoHeaders) ->
  580. {ok, PseudoHeaders, Headers}.
  581. trailers_contain_pseudo_headers([]) ->
  582. false;
  583. trailers_contain_pseudo_headers([{<<":", _/bits>>, _}|_]) ->
  584. true;
  585. trailers_contain_pseudo_headers([_|Tail]) ->
  586. trailers_contain_pseudo_headers(Tail).
  587. %% Rejecting invalid regular headers might be a bit too strong for clients.
  588. headers_regular_headers(Frame=#headers{id=StreamID},
  589. State, Type, Stream, PseudoHeaders, Headers) ->
  590. case regular_headers(Headers, Type) of
  591. ok when Type =:= request ->
  592. request_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers);
  593. ok when Type =:= push_promise ->
  594. push_promise_frame(Frame, State, Stream, PseudoHeaders, Headers);
  595. ok when Type =:= response ->
  596. response_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers);
  597. ok when Type =:= trailers ->
  598. trailers_frame(Frame, State, Stream, Headers);
  599. {error, HumanReadable} when Type =:= request ->
  600. headers_malformed(Frame, State, HumanReadable);
  601. {error, HumanReadable} ->
  602. stream_reset(StreamID, State, protocol_error, HumanReadable)
  603. end.
  604. regular_headers([{<<>>, _}|_], _) ->
  605. {error, 'Empty header names are not valid regular headers. (CVE-2019-9516)'};
  606. regular_headers([{<<":", _/bits>>, _}|_], _) ->
  607. {error, 'Pseudo-headers were found after regular headers. (RFC7540 8.1.2.1)'};
  608. regular_headers([{<<"connection">>, _}|_], _) ->
  609. {error, 'The connection header is not allowed. (RFC7540 8.1.2.2)'};
  610. regular_headers([{<<"keep-alive">>, _}|_], _) ->
  611. {error, 'The keep-alive header is not allowed. (RFC7540 8.1.2.2)'};
  612. regular_headers([{<<"proxy-authenticate">>, _}|_], _) ->
  613. {error, 'The proxy-authenticate header is not allowed. (RFC7540 8.1.2.2)'};
  614. regular_headers([{<<"proxy-authorization">>, _}|_], _) ->
  615. {error, 'The proxy-authorization header is not allowed. (RFC7540 8.1.2.2)'};
  616. regular_headers([{<<"transfer-encoding">>, _}|_], _) ->
  617. {error, 'The transfer-encoding header is not allowed. (RFC7540 8.1.2.2)'};
  618. regular_headers([{<<"upgrade">>, _}|_], _) ->
  619. {error, 'The upgrade header is not allowed. (RFC7540 8.1.2.2)'};
  620. regular_headers([{<<"te">>, Value}|_], request) when Value =/= <<"trailers">> ->
  621. {error, 'The te header with a value other than "trailers" is not allowed. (RFC7540 8.1.2.2)'};
  622. regular_headers([{<<"te">>, _}|_], Type) when Type =/= request ->
  623. {error, 'The te header is only allowed in request headers. (RFC7540 8.1.2.2)'};
  624. regular_headers([{Name, _}|Tail], Type) ->
  625. Pattern = [
  626. <<$A>>, <<$B>>, <<$C>>, <<$D>>, <<$E>>, <<$F>>, <<$G>>, <<$H>>, <<$I>>,
  627. <<$J>>, <<$K>>, <<$L>>, <<$M>>, <<$N>>, <<$O>>, <<$P>>, <<$Q>>, <<$R>>,
  628. <<$S>>, <<$T>>, <<$U>>, <<$V>>, <<$W>>, <<$X>>, <<$Y>>, <<$Z>>
  629. ],
  630. case binary:match(Name, Pattern) of
  631. nomatch -> regular_headers(Tail, Type);
  632. _ -> {error, 'Header names must be lowercase. (RFC7540 8.1.2)'}
  633. end;
  634. regular_headers([], _) ->
  635. ok.
  636. request_expected_size(Frame=#headers{fin=IsFin}, State, Type, Stream, PseudoHeaders, Headers) ->
  637. case [CL || {<<"content-length">>, CL} <- Headers] of
  638. [] when IsFin =:= fin ->
  639. headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
  640. [] ->
  641. headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined);
  642. [<<"0">>] when IsFin =:= fin ->
  643. headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
  644. [_] when IsFin =:= fin ->
  645. headers_malformed(Frame, State,
  646. 'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
  647. [BinLen] ->
  648. headers_parse_expected_size(Frame, State, Type, Stream,
  649. PseudoHeaders, Headers, BinLen);
  650. _ ->
  651. headers_malformed(Frame, State,
  652. 'Multiple content-length headers were received. (RFC7230 3.3.2)')
  653. end.
  654. response_expected_size(Frame=#headers{id=StreamID, fin=IsFin}, State, Type,
  655. Stream=#stream{method=Method}, PseudoHeaders=#{status := Status}, Headers) ->
  656. case [CL || {<<"content-length">>, CL} <- Headers] of
  657. [] when IsFin =:= fin ->
  658. headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
  659. [] ->
  660. headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined);
  661. [_] when Status >= 100, Status =< 199 ->
  662. stream_reset(StreamID, State, protocol_error,
  663. 'Content-length header received in a 1xx response. (RFC7230 3.3.2)');
  664. [_] when Status =:= 204 ->
  665. stream_reset(StreamID, State, protocol_error,
  666. 'Content-length header received in a 204 response. (RFC7230 3.3.2)');
  667. [_] when Status >= 200, Status =< 299, Method =:= <<"CONNECT">> ->
  668. stream_reset(StreamID, State, protocol_error,
  669. 'Content-length header received in a 2xx response to a CONNECT request. (RFC7230 3.3.2).');
  670. %% Responses to HEAD requests, and 304 responses may contain
  671. %% a content-length header that must be ignored. (RFC7230 3.3.2)
  672. [_] when Method =:= <<"HEAD">> ->
  673. headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
  674. [_] when Status =:= 304 ->
  675. headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
  676. [<<"0">>] when IsFin =:= fin ->
  677. headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
  678. [_] when IsFin =:= fin ->
  679. stream_reset(StreamID, State, protocol_error,
  680. 'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
  681. [BinLen] ->
  682. headers_parse_expected_size(Frame, State, Type, Stream,
  683. PseudoHeaders, Headers, BinLen);
  684. _ ->
  685. stream_reset(StreamID, State, protocol_error,
  686. 'Multiple content-length headers were received. (RFC7230 3.3.2)')
  687. end.
  688. headers_parse_expected_size(Frame=#headers{id=StreamID},
  689. State, Type, Stream, PseudoHeaders, Headers, BinLen) ->
  690. try cow_http_hd:parse_content_length(BinLen) of
  691. Len ->
  692. headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, Len)
  693. catch
  694. _:_ ->
  695. HumanReadable = 'The content-length header is invalid. (RFC7230 3.3.2)',
  696. case Type of
  697. request -> headers_malformed(Frame, State, HumanReadable);
  698. response -> stream_reset(StreamID, State, protocol_error, HumanReadable)
  699. end
  700. end.
  701. headers_frame(#headers{id=StreamID, fin=IsFin}, State0=#http2_machine{
  702. local_settings=#{initial_window_size := RemoteWindow},
  703. remote_settings=#{initial_window_size := LocalWindow}},
  704. Type, Stream0, PseudoHeaders, Headers, Len) ->
  705. {Stream, State1} = case Type of
  706. request ->
  707. TE = case lists:keyfind(<<"te">>, 1, Headers) of
  708. {_, TE0} -> TE0;
  709. false -> undefined
  710. end,
  711. {#stream{id=StreamID, method=maps:get(method, PseudoHeaders),
  712. remote=IsFin, remote_expected_size=Len,
  713. local_window=LocalWindow, remote_window=RemoteWindow, te=TE},
  714. State0#http2_machine{remote_streamid=StreamID}};
  715. response ->
  716. Stream1 = case PseudoHeaders of
  717. #{status := Status} when Status >= 100, Status =< 199 -> Stream0;
  718. _ -> Stream0#stream{remote=IsFin, remote_expected_size=Len}
  719. end,
  720. {Stream1, State0}
  721. end,
  722. State = stream_store(Stream, State1),
  723. {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, Len}, State}.
  724. trailers_frame(#headers{id=StreamID}, State0, Stream0, Headers) ->
  725. Stream = Stream0#stream{remote=fin},
  726. State = stream_store(Stream, State0),
  727. case is_body_size_valid(Stream) of
  728. true ->
  729. {ok, {trailers, StreamID, Headers}, State};
  730. false ->
  731. stream_reset(StreamID, State, protocol_error,
  732. 'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)')
  733. end.
  734. %% PRIORITY frame.
  735. %%
  736. %% @todo Handle PRIORITY frames.
  737. priority_frame(_Frame, State) ->
  738. {ok, State}.
  739. %% RST_STREAM frame.
  740. rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode,
  741. local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
  742. when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
  743. orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
  744. {error, {connection_error, protocol_error,
  745. 'RST_STREAM frame received on a stream in idle state. (RFC7540 5.1)'},
  746. State};
  747. rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{
  748. streams=Streams0, remote_lingering_streams=Lingering0}) ->
  749. Streams = maps:remove(StreamID, Streams0),
  750. %% We only keep up to 10 streams in this state. @todo Make it configurable?
  751. Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)],
  752. {ok, {rst_stream, StreamID, Reason},
  753. State#http2_machine{streams=Streams, remote_lingering_streams=Lingering}}.
  754. %% SETTINGS frame.
  755. settings_frame({settings, Settings}, State0=#http2_machine{
  756. opts=Opts, remote_settings=Settings0}) ->
  757. State1 = State0#http2_machine{remote_settings=maps:merge(Settings0, Settings)},
  758. State2 = maps:fold(fun
  759. (header_table_size, NewSize, State=#http2_machine{encode_state=EncodeState0}) ->
  760. MaxSize = maps:get(max_encode_table_size, Opts, 4096),
  761. EncodeState = cow_hpack:set_max_size(min(NewSize, MaxSize), EncodeState0),
  762. State#http2_machine{encode_state=EncodeState};
  763. (initial_window_size, NewWindowSize, State) ->
  764. OldWindowSize = maps:get(initial_window_size, Settings0, 65535),
  765. streams_update_local_window(State, NewWindowSize - OldWindowSize);
  766. (_, _, State) ->
  767. State
  768. end, State1, Settings),
  769. case Settings of
  770. #{initial_window_size := _} -> send_data(State2);
  771. _ -> {ok, State2}
  772. end;
  773. %% We expect to receive a SETTINGS frame as part of the preface.
  774. settings_frame(_F, State=#http2_machine{mode=server}) ->
  775. {error, {connection_error, protocol_error,
  776. 'The preface sequence must be followed by a SETTINGS frame. (RFC7540 3.5)'},
  777. State};
  778. settings_frame(_F, State) ->
  779. {error, {connection_error, protocol_error,
  780. 'The preface must begin with a SETTINGS frame. (RFC7540 3.5)'},
  781. State}.
  782. %% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
  783. %% the local stream windows for all active streams and perhaps
  784. %% resume sending data.
  785. streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) ->
  786. Streams = maps:map(fun(_, S=#stream{local_window=StreamWindow}) ->
  787. S#stream{local_window=StreamWindow + Increment}
  788. end, Streams0),
  789. State#http2_machine{streams=Streams}.
  790. %% Ack for a previously sent SETTINGS frame.
  791. settings_ack_frame(State0=#http2_machine{settings_timer=TRef,
  792. local_settings=Local0, next_settings=NextSettings}) ->
  793. ok = case TRef of
  794. undefined -> ok;
  795. _ -> erlang:cancel_timer(TRef, [{async, true}, {info, false}])
  796. end,
  797. Local = maps:merge(Local0, NextSettings),
  798. State1 = State0#http2_machine{settings_timer=undefined,
  799. local_settings=Local, next_settings=#{}},
  800. {ok, maps:fold(fun
  801. (header_table_size, MaxSize, State=#http2_machine{decode_state=DecodeState0}) ->
  802. DecodeState = cow_hpack:set_max_size(MaxSize, DecodeState0),
  803. State#http2_machine{decode_state=DecodeState};
  804. (initial_window_size, NewWindowSize, State) ->
  805. OldWindowSize = maps:get(initial_window_size, Local0, 65535),
  806. streams_update_remote_window(State, NewWindowSize - OldWindowSize);
  807. (_, _, State) ->
  808. State
  809. end, State1, NextSettings)}.
  810. %% When we receive an ack to a SETTINGS frame we sent we need to update
  811. %% the remote stream windows for all active streams.
  812. streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) ->
  813. Streams = maps:map(fun(_, S=#stream{remote_window=StreamWindow}) ->
  814. S#stream{remote_window=StreamWindow + Increment}
  815. end, Streams0),
  816. State#http2_machine{streams=Streams}.
  817. %% PUSH_PROMISE frame.
  818. %% Convenience record to manipulate the tuple.
  819. %% The order of the fields matter.
  820. -record(push_promise, {
  821. id :: cow_http2:streamid(),
  822. head :: cow_http2:head_fin(),
  823. promised_id :: cow_http2:streamid(),
  824. data :: binary()
  825. }).
  826. push_promise_frame(_, State=#http2_machine{mode=server}) ->
  827. {error, {connection_error, protocol_error,
  828. 'PUSH_PROMISE frames MUST NOT be sent by the client. (RFC7540 6.6)'},
  829. State};
  830. push_promise_frame(_, State=#http2_machine{local_settings=#{enable_push := false}}) ->
  831. {error, {connection_error, protocol_error,
  832. 'PUSH_PROMISE frame received despite SETTINGS_ENABLE_PUSH set to 0. (RFC7540 6.6)'},
  833. State};
  834. push_promise_frame(#push_promise{promised_id=PromisedStreamID},
  835. State=#http2_machine{remote_streamid=RemoteStreamID})
  836. when PromisedStreamID =< RemoteStreamID ->
  837. {error, {connection_error, protocol_error,
  838. 'PUSH_PROMISE frame received for a promised stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
  839. State};
  840. push_promise_frame(#push_promise{id=StreamID}, State)
  841. when not ?IS_CLIENT_LOCAL(StreamID) ->
  842. {error, {connection_error, protocol_error,
  843. 'PUSH_PROMISE frame received on a server-initiated stream. (RFC7540 6.6)'},
  844. State};
  845. push_promise_frame(Frame=#push_promise{id=StreamID, head=IsHeadFin,
  846. promised_id=PromisedStreamID, data=HeaderData}, State) ->
  847. case stream_get(StreamID, State) of
  848. Stream=#stream{remote=idle} ->
  849. case IsHeadFin of
  850. head_fin ->
  851. headers_decode(#headers{id=PromisedStreamID,
  852. fin=fin, head=IsHeadFin, data=HeaderData},
  853. State, push_promise, Stream);
  854. head_nofin ->
  855. {ok, State#http2_machine{state={continuation, push_promise, Frame}}}
  856. end;
  857. _ ->
  858. %% @todo Check if the stream is lingering. If it is, decode the frame
  859. %% and do what? That's the big question and why it's not implemented yet.
  860. % However, an endpoint that
  861. % has sent RST_STREAM on the associated stream MUST handle PUSH_PROMISE
  862. % frames that might have been created before the RST_STREAM frame is
  863. % received and processed. (RFC7540 6.6)
  864. {error, {connection_error, stream_closed,
  865. 'PUSH_PROMISE frame received on a stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
  866. State}
  867. end.
  868. push_promise_frame(#headers{id=PromisedStreamID},
  869. State0=#http2_machine{
  870. local_settings=#{initial_window_size := RemoteWindow},
  871. remote_settings=#{initial_window_size := LocalWindow}},
  872. #stream{id=StreamID}, PseudoHeaders=#{method := Method}, Headers) ->
  873. TE = case lists:keyfind(<<"te">>, 1, Headers) of
  874. {_, TE0} -> TE0;
  875. false -> undefined
  876. end,
  877. PromisedStream = #stream{id=PromisedStreamID, method=Method,
  878. local=fin, local_window=LocalWindow,
  879. remote_window=RemoteWindow, te=TE},
  880. State = stream_store(PromisedStream,
  881. State0#http2_machine{remote_streamid=PromisedStreamID}),
  882. {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, State}.
  883. %% PING frame.
  884. ping_frame({ping, _}, State) ->
  885. {ok, State}.
  886. %% Ack for a previously sent PING frame.
  887. %%
  888. %% @todo Might want to check contents but probably a waste of time.
  889. ping_ack_frame({ping_ack, _}, State) ->
  890. {ok, State}.
  891. %% GOAWAY frame.
  892. goaway_frame(Frame={goaway, _, _, _}, State) ->
  893. {ok, Frame, State}.
  894. %% WINDOW_UPDATE frame.
  895. %% Connection-wide WINDOW_UPDATE frame.
  896. window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow})
  897. when ConnWindow + Increment > 16#7fffffff ->
  898. {error, {connection_error, flow_control_error,
  899. 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'},
  900. State};
  901. window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow}) ->
  902. send_data(State#http2_machine{local_window=ConnWindow + Increment});
  903. %% Stream-specific WINDOW_UPDATE frame.
  904. window_update_frame({window_update, StreamID, _}, State=#http2_machine{mode=Mode,
  905. local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
  906. when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
  907. orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
  908. {error, {connection_error, protocol_error,
  909. 'WINDOW_UPDATE frame received on a stream in idle state. (RFC7540 5.1)'},
  910. State};
  911. window_update_frame({window_update, StreamID, Increment},
  912. State0=#http2_machine{remote_lingering_streams=Lingering}) ->
  913. case stream_get(StreamID, State0) of
  914. #stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff ->
  915. stream_reset(StreamID, State0, flow_control_error,
  916. 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)');
  917. Stream0 = #stream{local_window=StreamWindow} ->
  918. send_data(Stream0#stream{local_window=StreamWindow + Increment}, State0);
  919. undefined ->
  920. %% WINDOW_UPDATE frames may be received for a short period of time
  921. %% after a stream is closed. They must be ignored.
  922. case lists:member(StreamID, Lingering) of
  923. false -> {ok, State0};
  924. true -> stream_reset(StreamID, State0, stream_closed,
  925. 'WINDOW_UPDATE frame received after the stream was reset. (RFC7540 5.1)')
  926. end
  927. end.
  928. %% CONTINUATION frame.
  929. %% Convenience record to manipulate the tuple.
  930. %% The order of the fields matter.
  931. -record(continuation, {
  932. id :: cow_http2:streamid(),
  933. head :: cow_http2:head_fin(),
  934. data :: binary()
  935. }).
  936. unexpected_continuation_frame(#continuation{}, State) ->
  937. {error, {connection_error, protocol_error,
  938. 'CONTINUATION frames MUST be preceded by a HEADERS or PUSH_PROMISE frame. (RFC7540 6.10)'},
  939. State}.
  940. continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
  941. State=#http2_machine{state={continuation, Type,
  942. Frame=#headers{id=StreamID, data=HeaderFragment0}}}) ->
  943. HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
  944. headers_decode(Frame#headers{head=head_fin, data=HeaderData},
  945. State#http2_machine{state=normal}, Type, stream_get(StreamID, State));
  946. continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
  947. State=#http2_machine{state={continuation, Type, #push_promise{
  948. id=StreamID, promised_id=PromisedStreamID, data=HeaderFragment0}}}) ->
  949. HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
  950. headers_decode(#headers{id=PromisedStreamID, fin=fin, head=head_fin, data=HeaderData},
  951. State#http2_machine{state=normal}, Type, undefined);
  952. continuation_frame(#continuation{id=StreamID, data=HeaderFragment1},
  953. State=#http2_machine{state={continuation, Type, ContinuedFrame0}})
  954. when element(2, ContinuedFrame0) =:= StreamID ->
  955. ContinuedFrame = case ContinuedFrame0 of
  956. #headers{data=HeaderFragment0} ->
  957. HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
  958. ContinuedFrame0#headers{data=HeaderData};
  959. #push_promise{data=HeaderFragment0} ->
  960. HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
  961. ContinuedFrame0#push_promise{data=HeaderData}
  962. end,
  963. {ok, State#http2_machine{state={continuation, Type, ContinuedFrame}}};
  964. continuation_frame(_F, State) ->
  965. {error, {connection_error, protocol_error,
  966. 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
  967. State}.
  968. %% Ignored frames.
  969. -spec ignored_frame(State)
  970. -> {ok, State}
  971. | {error, {connection_error, protocol_error, atom()}, State}
  972. when State::http2_machine().
  973. ignored_frame(State=#http2_machine{state={continuation, _, _}}) ->
  974. {error, {connection_error, protocol_error,
  975. 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
  976. State};
  977. %% @todo It might be useful to error out when we receive
  978. %% too many unknown frames. (RFC7540 10.5)
  979. ignored_frame(State) ->
  980. {ok, State}.
  981. %% Timeouts.
  982. -spec timeout(preface_timeout | settings_timeout, reference(), State)
  983. -> {ok, State}
  984. | {error, {connection_error, cow_http2:error(), atom()}, State}
  985. when State::http2_machine().
  986. timeout(preface_timeout, TRef, State=#http2_machine{preface_timer=TRef}) ->
  987. {error, {connection_error, protocol_error,
  988. 'The preface was not received in a reasonable amount of time.'},
  989. State};
  990. timeout(settings_timeout, TRef, State=#http2_machine{settings_timer=TRef}) ->
  991. {error, {connection_error, settings_timeout,
  992. 'The SETTINGS ack was not received within the configured time. (RFC7540 6.5.3)'},
  993. State};
  994. timeout(_, _, State) ->
  995. {ok, State}.
  996. %% Functions for sending a message header or body. Note that
  997. %% this module does not send data directly, instead it returns
  998. %% a value that can then be used to send the frames.
  999. -spec prepare_headers(cow_http2:streamid(), State, idle | cow_http2:fin(),
  1000. pseudo_headers(), cow_http:headers())
  1001. -> {ok, cow_http2:fin(), iodata(), State} when State::http2_machine().
  1002. prepare_headers(StreamID, State=#http2_machine{encode_state=EncodeState0},
  1003. IsFin0, PseudoHeaders, Headers0) ->
  1004. Stream = #stream{method=Method, local=idle} = stream_get(StreamID, State),
  1005. IsFin = case {IsFin0, Method} of
  1006. {idle, _} -> nofin;
  1007. {_, <<"HEAD">>} -> fin;
  1008. _ -> IsFin0
  1009. end,
  1010. Headers = merge_pseudo_headers(PseudoHeaders, remove_http11_headers(Headers0)),
  1011. {HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0),
  1012. {ok, IsFin, HeaderBlock, stream_store(Stream#stream{local=IsFin0},
  1013. State#http2_machine{encode_state=EncodeState})}.
  1014. -spec prepare_push_promise(cow_http2:streamid(), State, pseudo_headers(), cow_http:headers())
  1015. -> {ok, cow_http2:streamid(), iodata(), State}
  1016. | {error, no_push} when State::http2_machine().
  1017. prepare_push_promise(_, #http2_machine{remote_settings=#{enable_push := false}}, _, _) ->
  1018. {error, no_push};
  1019. prepare_push_promise(StreamID, State=#http2_machine{encode_state=EncodeState0,
  1020. local_settings=#{initial_window_size := RemoteWindow},
  1021. remote_settings=#{initial_window_size := LocalWindow},
  1022. local_streamid=LocalStreamID}, PseudoHeaders, Headers0) ->
  1023. #stream{local=idle} = stream_get(StreamID, State),
  1024. TE = case lists:keyfind(<<"te">>, 1, Headers0) of
  1025. {_, TE0} -> TE0;
  1026. false -> undefined
  1027. end,
  1028. Headers = merge_pseudo_headers(PseudoHeaders, remove_http11_headers(Headers0)),
  1029. {HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0),
  1030. {ok, LocalStreamID, HeaderBlock, stream_store(
  1031. #stream{id=LocalStreamID, method=maps:get(method, PseudoHeaders),
  1032. remote=fin, remote_expected_size=0,
  1033. local_window=LocalWindow, remote_window=RemoteWindow, te=TE},
  1034. State#http2_machine{encode_state=EncodeState, local_streamid=LocalStreamID + 2})}.
  1035. remove_http11_headers(Headers) ->
  1036. RemoveHeaders0 = [
  1037. <<"keep-alive">>,
  1038. <<"proxy-connection">>,
  1039. <<"transfer-encoding">>,
  1040. <<"upgrade">>
  1041. ],
  1042. RemoveHeaders = case lists:keyfind(<<"connection">>, 1, Headers) of
  1043. false ->
  1044. RemoveHeaders0;
  1045. {_, ConnHd} ->
  1046. %% We do not need to worry about any "close" header because
  1047. %% that header name is reserved.
  1048. Connection = cow_http_hd:parse_connection(ConnHd),
  1049. Connection ++ [<<"connection">>|RemoveHeaders0]
  1050. end,
  1051. lists:filter(fun({Name, _}) ->
  1052. not lists:member(Name, RemoveHeaders)
  1053. end, Headers).
  1054. merge_pseudo_headers(PseudoHeaders, Headers0) ->
  1055. lists:foldl(fun
  1056. ({status, Status}, Acc) when is_integer(Status) ->
  1057. [{<<":status">>, integer_to_binary(Status)}|Acc];
  1058. ({Name, Value}, Acc) ->
  1059. [{iolist_to_binary([$:, atom_to_binary(Name, latin1)]), Value}|Acc]
  1060. end, Headers0, maps:to_list(PseudoHeaders)).
  1061. -spec prepare_trailers(cow_http2:streamid(), State, cow_http:headers())
  1062. -> {ok, iodata(), State} when State::http2_machine().
  1063. prepare_trailers(StreamID, State=#http2_machine{encode_state=EncodeState0}, Trailers) ->
  1064. Stream = #stream{local=nofin} = stream_get(StreamID, State),
  1065. {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0),
  1066. {ok, HeaderBlock, stream_store(Stream#stream{local=fin},
  1067. State#http2_machine{encode_state=EncodeState})}.
  1068. -spec send_or_queue_data(cow_http2:streamid(), State, cow_http2:fin(), DataOrFileOrTrailers)
  1069. -> {ok, State}
  1070. | {send, [{cow_http2:streamid(), cow_http2:fin(), [DataOrFileOrTrailers]}], State}
  1071. when State::http2_machine(), DataOrFileOrTrailers::
  1072. {data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}.
  1073. send_or_queue_data(StreamID, State0=#http2_machine{opts=Opts, local_window=ConnWindow},
  1074. IsFin0, DataOrFileOrTrailers0) ->
  1075. %% @todo Probably just ignore if the method was HEAD.
  1076. Stream0 = #stream{
  1077. local=nofin,
  1078. local_window=StreamWindow,
  1079. local_buffer_size=BufferSize,
  1080. te=TE0
  1081. } = stream_get(StreamID, State0),
  1082. DataOrFileOrTrailers = case DataOrFileOrTrailers0 of
  1083. {trailers, _} ->
  1084. %% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1).
  1085. TE = try cow_http_hd:parse_te(TE0) of
  1086. {trailers, []} -> trailers;
  1087. _ -> no_trailers
  1088. catch _:_ ->
  1089. %% If we can't parse the TE header, assume we can't send trailers.
  1090. no_trailers
  1091. end,
  1092. case TE of
  1093. trailers ->
  1094. DataOrFileOrTrailers0;
  1095. no_trailers ->
  1096. {data, <<>>}
  1097. end;
  1098. _ ->
  1099. DataOrFileOrTrailers0
  1100. end,
  1101. SendSize = case DataOrFileOrTrailers of
  1102. {data, D} -> BufferSize + iolist_size(D);
  1103. #sendfile{bytes=B} -> BufferSize + B;
  1104. {trailers, _} -> 0
  1105. end,
  1106. MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
  1107. if
  1108. %% If we cannot send the data all at once and the window
  1109. %% is smaller than we are willing to send at a minimum,
  1110. %% we queue the data directly.
  1111. (StreamWindow < MinSendSize)
  1112. andalso ((StreamWindow < SendSize) orelse (ConnWindow < SendSize)) ->
  1113. {ok, stream_store(queue_data(Stream0, IsFin0, DataOrFileOrTrailers, in), State0)};
  1114. true ->
  1115. case send_or_queue_data(Stream0, State0, [], IsFin0, DataOrFileOrTrailers, in) of
  1116. {ok, Stream, State, []} ->
  1117. {ok, stream_store(Stream, State)};
  1118. {ok, Stream=#stream{local=IsFin}, State, SendData} ->
  1119. {send, [{StreamID, IsFin, lists:reverse(SendData)}], stream_store(Stream, State)}
  1120. end
  1121. end.
  1122. %% Internal data sending/queuing functions.
  1123. %% @todo Should we ever want to implement the PRIORITY mechanism,
  1124. %% this would be the place to do it. Right now, we just go over
  1125. %% all streams and send what we can until either everything is
  1126. %% sent or we run out of space in the window.
  1127. send_data(State0=#http2_machine{streams=Streams0}) ->
  1128. Iterator = maps:iterator(Streams0),
  1129. case send_data_for_all_streams(maps:next(Iterator), Streams0, State0, []) of
  1130. {ok, Streams, State, []} ->
  1131. {ok, State#http2_machine{streams=Streams}};
  1132. {ok, Streams, State, Send} ->
  1133. {send, Send, State#http2_machine{streams=Streams}}
  1134. end.
  %% Walk the streams map iterator and try to send data for each stream.
  %%
  %% Returns {ok, Streams, State, Send}. Send holds one
  %% {StreamID, IsFin, SendData} entry for every stream that produced
  %% data, with the most recently visited stream first.
  send_data_for_all_streams(none, Streams, State, Send) ->
      %% The iterator is exhausted.
      {ok, Streams, State, Send};
  send_data_for_all_streams(_, Streams, State=#http2_machine{local_window=ConnWindow}, Send)
          when ConnWindow =< 0 ->
      %% The connection window is used up. While technically we should
      %% never get < 0 here, let's be on the safe side.
      {ok, Streams, State, Send};
  send_data_for_all_streams({StreamID, Stream0, Iterator}, Streams0, State0, Send0) ->
      %% We rely on send_data_for_one_stream/3 to do all the necessary
      %% checks about the stream.
      {ok, Stream, State, SendData} = send_data_for_one_stream(Stream0, State0, []),
      {Streams, Send} = case {SendData, Stream} of
          {[], _} ->
              {Streams0#{StreamID => Stream}, Send0};
          %% stream_store/2 is not used here, so a stream that is
          %% finished in both directions must be removed by hand.
          {_, #stream{local=fin, remote=fin}} ->
              {maps:remove(StreamID, Streams0), [{StreamID, fin, SendData}|Send0]};
          {_, #stream{local=IsFin}} ->
              {Streams0#{StreamID => Stream}, [{StreamID, IsFin, SendData}|Send0]}
      end,
      send_data_for_all_streams(maps:next(Iterator), Streams, State, Send).
  %% Try to send queued data for a single stream.
  %%
  %% The stream is always written back through stream_store/2.
  %% Returns {ok, State} when nothing could be sent, and
  %% {send, [{StreamID, IsFin, SendData}], State} otherwise.
  send_data(Stream0, State0) ->
      {ok, Stream, State1, SendData} = send_data_for_one_stream(Stream0, State0, []),
      State = stream_store(Stream, State1),
      case SendData of
          [] ->
              {ok, State};
          _ ->
              #stream{id=StreamID, local=IsFin} = Stream,
              {send, [{StreamID, IsFin, SendData}], State}
      end.
  %% Drain as much of a stream's local buffer as the windows allow.
  %%
  %% SendAcc is built in reverse; every terminating clause returns
  %% {ok, Stream, State, Send} with Send in sending order.

  %% The buffer holds no bytes and trailers are pending: emit the trailers.
  send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer_size=0,
          local_trailers=Trailers}, State, SendAcc) when Trailers =/= undefined ->
      {ok, Stream, State, lists:reverse([{trailers, Trailers}|SendAcc])};
  %% The buffer holds no bytes and there are no trailers.
  send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer=Queue0, local_buffer_size=0},
          State, SendAcc) ->
      Send = lists:reverse(SendAcc),
      case queue:len(Queue0) of
          0 ->
              {ok, Stream, State, Send};
          1 ->
              %% The only possible item is a final empty data frame.
              %% Taking it out marks the stream as complete.
              {{value, {fin, 0, _}}, Queue} = queue:out(Queue0),
              {ok, Stream#stream{local=fin, local_buffer=Queue}, State, Send}
      end;
  %% Nothing more can be sent: a window is used up, the stream is
  %% already finished locally, or the buffer is empty.
  send_data_for_one_stream(Stream=#stream{local=IsFin, local_window=StreamWindow,
          local_buffer_size=BufferSize}, State=#http2_machine{local_window=ConnWindow}, SendAcc)
          when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
      {ok, Stream, State, lists:reverse(SendAcc)};
  send_data_for_one_stream(Stream0=#stream{local_window=StreamWindow,
          local_buffer=Queue0, local_buffer_size=BufferSize},
          State0=#http2_machine{opts=Opts, local_window=ConnWindow}, SendAcc0) ->
      MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
      %% The minimum size check only applies on the first pass through
      %% this function; once we send something we send as much as possible.
      IsFirstPass = SendAcc0 =:= [],
      %% True when the entire buffer cannot be sent at once.
      IsPartial = (StreamWindow < BufferSize) orelse (ConnWindow < BufferSize),
      case IsFirstPass andalso (StreamWindow < MinSendSize) andalso IsPartial of
          true ->
              %% The window is smaller than we are willing to send
              %% at a minimum, so do nothing.
              {ok, Stream0, State0, []};
          false ->
              %% The guards above guarantee there is an item in the queue.
              {{value, {IsFin, DataSize, Data}}, Queue} = queue:out(Queue0),
              Stream1 = Stream0#stream{local_buffer=Queue,
                  local_buffer_size=BufferSize - DataSize},
              %% Whatever cannot be sent is queued again with in_r.
              {ok, Stream, State, SendAcc}
                  = send_or_queue_data(Stream1, State0, SendAcc0, IsFin, Data, in_r),
              send_data_for_one_stream(Stream, State, SendAcc)
      end.
  %% Send a piece of data now or queue it in the stream's local buffer.
  %%
  %% Returns {ok, Stream, State, SendAcc} where SendAcc has the frames
  %% to send prepended in reverse order. The In argument is the name of
  %% the queue function used by queue_data/4 when data must be buffered.

  %% Trailers go out immediately when no bytes are buffered, regardless
  %% of the window; otherwise they are kept in the stream for later.
  send_or_queue_data(Stream=#stream{local_buffer_size=0},
          State, SendAcc, fin, {trailers, Trailers}, _) ->
      {ok, Stream, State, [{trailers, Trailers}|SendAcc]};
  send_or_queue_data(Stream, State, SendAcc, fin, {trailers, Trailers}, _) ->
      {ok, Stream#stream{local_trailers=Trailers}, State, SendAcc};
  %% No window left on the connection or the stream: buffer the data.
  send_or_queue_data(Stream=#stream{local_window=StreamWindow},
          State=#http2_machine{local_window=ConnWindow},
          SendAcc, IsFin, Data, In)
          when ConnWindow =< 0; StreamWindow =< 0 ->
      {ok, queue_data(Stream, IsFin, Data, In), State, SendAcc};
  %% There is window available: send at most MaxSendSize bytes and run
  %% the remainder through this function again.
  send_or_queue_data(Stream=#stream{local_window=StreamWindow},
          State=#http2_machine{opts=Opts, remote_settings=RemoteSettings,
          local_window=ConnWindow}, SendAcc, IsFin, Data, In) ->
      %% The size is bounded by both windows, the peer's max frame size
      %% and the locally configured max frame size.
      MaxSendSize = lists:min([
          ConnWindow,
          StreamWindow,
          maps:get(max_frame_size, RemoteSettings, 16384),
          maps:get(max_frame_size_sent, Opts, infinity)
      ]),
      case Data of
          #sendfile{offset=Offset, bytes=Bytes} when Bytes > MaxSendSize ->
              Head = Data#sendfile{bytes=MaxSendSize},
              Tail = Data#sendfile{offset=Offset + MaxSendSize, bytes=Bytes - MaxSendSize},
              send_or_queue_data(
                  Stream#stream{local_window=StreamWindow - MaxSendSize},
                  State#http2_machine{local_window=ConnWindow - MaxSendSize},
                  [Head|SendAcc], IsFin, Tail, In);
          #sendfile{bytes=Bytes} ->
              %% The whole file range fits; IsFin applies to this last piece.
              {ok,
                  Stream#stream{local=IsFin, local_window=StreamWindow - Bytes},
                  State#http2_machine{local_window=ConnWindow - Bytes},
                  [Data|SendAcc]};
          {data, Iolist0} ->
              case iolist_size(Iolist0) of
                  IolistSize when IolistSize =< MaxSendSize ->
                      {ok,
                          Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize},
                          State#http2_machine{local_window=ConnWindow - IolistSize},
                          [{data, Iolist0}|SendAcc]};
                  _ ->
                      {Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0),
                      send_or_queue_data(
                          Stream#stream{local_window=StreamWindow - MaxSendSize},
                          State#http2_machine{local_window=ConnWindow - MaxSendSize},
                          [{data, Iolist}|SendAcc], IsFin, {data, More}, In)
              end
      end.
  %% Add data to the stream's local buffer and account for its size.
  %%
  %% In is the name of the queue module function used for insertion;
  %% callers in this file pass in_r to put unsent data back.
  queue_data(Stream=#stream{local_buffer=Queue0, local_buffer_size=BufferSize}, IsFin, Data, In) ->
      DataSize = case Data of
          {sendfile, _, Bytes, _} -> Bytes;
          {data, Iolist} -> iolist_size(Iolist)
      end,
      if
          %% Never queue non-final empty data frames.
          DataSize =:= 0, IsFin =:= nofin ->
              Stream;
          true ->
              Queue = queue:In({IsFin, DataSize, Data}, Queue0),
              Stream#stream{local_buffer=Queue, local_buffer_size=BufferSize + DataSize}
      end.
  1260. %% Public interface to update the flow control window.
  1261. %%
  1262. %% The ensure_window function applies heuristics to avoid updating the
  1263. %% window when it is not necessary. The update_window function updates
  1264. %% the window unconditionally.
  1265. %%
  1266. %% The ensure_window function should be called when requesting more
  1267. %% data (for example when reading a request or response body) as well
  1268. %% as when receiving new data. Failure to do so may result in the
  1269. %% window being depleted.
  1270. %%
  1271. %% The heuristics dictating whether the window must be updated and
  1272. %% what the window size is depends on three options (margin, max
  1273. %% and threshold) along with the Size argument. The window increment
  1274. %% returned by this function may therefore be smaller than the Size
  1275. %% argument. On the other hand the total window allocated over many
  1276. %% calls may end up being larger than the initial Size argument. As
  1277. %% a result, it is the responsibility of the caller to ensure that
  1278. %% the Size argument is never lower than 0.
  %% Connection-level variant: apply the heuristics to the connection's
  %% remote window and record the increment in the state when there is one.
  -spec ensure_window(non_neg_integer(), State)
      -> ok | {ok, pos_integer(), State} when State::http2_machine().
  ensure_window(Size, State=#http2_machine{opts=Opts, remote_window=RemoteWindow}) ->
      case ensure_window(Size, RemoteWindow, connection, Opts) of
          {ok, Increment} ->
              NewWindow = RemoteWindow + Increment,
              {ok, Increment, State#http2_machine{remote_window=NewWindow}};
          ok ->
              ok
      end.
  %% Stream-level variant: apply the heuristics to the remote window of
  %% the given stream and store the updated stream when there is an increment.
  -spec ensure_window(cow_http2:streamid(), non_neg_integer(), State)
      -> ok | {ok, pos_integer(), State} when State::http2_machine().
  ensure_window(StreamID, Size, State=#http2_machine{opts=Opts}) ->
      case stream_get(StreamID, State) of
          Stream = #stream{remote_window=RemoteWindow} ->
              case ensure_window(Size, RemoteWindow, stream, Opts) of
                  {ok, Increment} ->
                      NewWindow = RemoteWindow + Increment,
                      {ok, Increment, stream_store(Stream#stream{remote_window=NewWindow}, State)};
                  ok ->
                      ok
              end;
          %% For simplicity's sake, attempts to ensure the window of a
          %% stream we do not know are not considered errors. We simply
          %% act as if the stream window is large enough.
          undefined ->
              ok
      end.
  %% Decide whether a flow control window must be increased.
  %%
  %% Size is the amount of data the caller expects, Window the current
  %% window, Type either connection or stream (it selects which options
  %% are read) and Opts the options map.
  %%
  %% Returns ok when no update is needed, or {ok, Increment} with a
  %% strictly positive Increment otherwise.

  %% No need to update the window when we are not expecting data.
  ensure_window(0, _, _, _) ->
      ok;
  %% No need to update the window when it is already high enough.
  ensure_window(Size, Window, _, _) when Size =< Window ->
      ok;
  ensure_window(Size0, Window, Type, Opts) ->
      Threshold = ensure_window_threshold(Type, Opts),
      if
          %% We do not update the window when it is higher than the threshold.
          Window > Threshold ->
              ok;
          true ->
              Margin = ensure_window_margin(Type, Opts),
              MaxWindow = ensure_window_max(Type, Opts),
              %% We cannot go above the maximum window size.
              TargetWindow = min(Size0 + Margin, MaxWindow),
              %% The current window may already be at or above the
              %% configured maximum (for example when the maximum is set
              %% lower than the initial window). The difference is then
              %% zero or negative, which is not a valid increment, so no
              %% update is requested in that case.
              case TargetWindow - Window of
                  Increment when Increment > 0 -> {ok, Increment};
                  _ -> ok
              end
      end.
  %% Margin added on top of the requested size. The option read depends
  %% on the window type; both default to the default initial window size.
  ensure_window_margin(Type, Opts) ->
      Key = case Type of
          connection -> connection_window_margin_size;
          stream -> stream_window_margin_size
      end,
      maps:get(Key, Opts, 65535).
  %% Upper bound for the window. The option read depends on the window
  %% type; both default to the max value allowed by the protocol.
  ensure_window_max(Type, Opts) ->
      Key = case Type of
          connection -> max_connection_window_size;
          stream -> max_stream_window_size
      end,
      maps:get(Key, Opts, 16#7fffffff).
  %% Window size above which no update is issued. The option read depends
  %% on the window type; both default to 10 times the default frame size.
  ensure_window_threshold(Type, Opts) ->
      Key = case Type of
          connection -> connection_window_update_threshold;
          stream -> stream_window_update_threshold
      end,
      maps:get(Key, Opts, 163840).
  %% Unconditionally increase the connection's remote window by Size.
  -spec update_window(1..16#7fffffff, State)
      -> State when State::http2_machine().
  update_window(Size, State=#http2_machine{remote_window=RemoteWindow})
          when Size > 0 ->
      NewWindow = RemoteWindow + Size,
      State#http2_machine{remote_window=NewWindow}.
  %% Unconditionally increase the remote window of a stream by Size.
  %% The stream must exist: the match below fails otherwise.
  -spec update_window(cow_http2:streamid(), 1..16#7fffffff, State)
      -> State when State::http2_machine().
  update_window(StreamID, Size, State)
          when Size > 0 ->
      #stream{remote_window=RemoteWindow} = Stream = stream_get(StreamID, State),
      NewWindow = RemoteWindow + Size,
      stream_store(Stream#stream{remote_window=NewWindow}, State).
  %% Public interface to reset streams.
  %%
  %% A known stream is removed from the streams map and added to the
  %% lingering streams via stream_linger/2. An unknown stream yields
  %% {error, not_found}.
  -spec reset_stream(cow_http2:streamid(), State)
      -> {ok, State} | {error, not_found} when State::http2_machine().
  reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
      case maps:is_key(StreamID, Streams0) of
          true ->
              Streams = maps:remove(StreamID, Streams0),
              {ok, stream_linger(StreamID, State#http2_machine{streams=Streams})};
          false ->
              {error, not_found}
      end.
  %% Retrieve the buffer size for all streams, that is the sum of the
  %% local buffer sizes of every stream in the streams map.
  -spec get_connection_local_buffer_size(http2_machine()) -> non_neg_integer().
  get_connection_local_buffer_size(#http2_machine{streams=Streams}) ->
      AddSize = fun(#stream{local_buffer_size=Size}, Total) -> Total + Size end,
      lists:foldl(AddSize, 0, maps:values(Streams)).
  %% Retrieve a setting value, or its default value if not set.
  %% The default is looked up first, so Key must be a setting known
  %% to default_setting_value/1.
  -spec get_local_setting(atom(), http2_machine()) -> atom() | integer().
  get_local_setting(Key, #http2_machine{local_settings=Settings}) ->
      Default = default_setting_value(Key),
      case maps:find(Key, Settings) of
          {ok, Value} -> Value;
          error -> Default
      end.
  %% Retrieve the remote settings with defaults filled in for every
  %% setting the remote endpoint has not set. The enable_connect_protocol
  %% default is only included in server mode.
  -spec get_remote_settings(http2_machine()) -> map().
  get_remote_settings(#http2_machine{mode=Mode, remote_settings=Settings}) ->
      CommonKeys = [
          header_table_size,
          enable_push,
          max_concurrent_streams,
          initial_window_size,
          max_frame_size,
          max_header_list_size
      ],
      Keys = case Mode of
          server -> [enable_connect_protocol|CommonKeys];
          client -> CommonKeys
      end,
      Defaults = maps:from_list([{Key, default_setting_value(Key)} || Key <- Keys]),
      %% Values in Settings take precedence over the defaults.
      maps:merge(Defaults, Settings).
  %% Default value for each known setting. There is deliberately no
  %% catch-all clause: an unknown setting fails with function_clause.

  %% Settings with a numeric default.
  default_setting_value(header_table_size) -> 4096;
  default_setting_value(max_frame_size) -> 16384;
  default_setting_value(initial_window_size) -> 65535;
  %% Settings without a limit by default.
  default_setting_value(max_concurrent_streams) -> infinity;
  default_setting_value(max_header_list_size) -> infinity;
  %% Boolean settings.
  default_setting_value(enable_push) -> true;
  default_setting_value(enable_connect_protocol) -> false.
  %% Obtain the last known streamid received, for the purposes of
  %% sending a GOAWAY frame and closing the connection.
  -spec get_last_streamid(http2_machine()) -> cow_http2:streamid().
  get_last_streamid(State=#http2_machine{}) ->
      State#http2_machine.remote_streamid.
  %% Retrieve the local buffer size for a stream.
  %%
  %% When the stream is not in the streams map, {error, closed} is
  %% returned if its ID is below the next local stream ID (for local
  %% streams) or not above the last remote stream ID (for remote
  %% streams), and {error, not_found} otherwise.
  -spec get_stream_local_buffer_size(cow_http2:streamid(), http2_machine())
      -> {ok, non_neg_integer()} | {error, not_found | closed}.
  get_stream_local_buffer_size(StreamID, State=#http2_machine{mode=Mode,
          local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
      case stream_get(StreamID, State) of
          #stream{local_buffer_size=Size} ->
              {ok, Size};
          undefined when
                  ?IS_LOCAL(Mode, StreamID), StreamID < LocalStreamID;
                  not ?IS_LOCAL(Mode, StreamID), StreamID =< RemoteStreamID ->
              {error, closed};
          undefined ->
              {error, not_found}
      end.
  %% Retrieve the local state for a stream, including the state in the queue.
  %%
  %% On success the result is {ok, IsFin, IsQueueFin}. IsQueueFin is
  %% empty when nothing is queued, otherwise the fin flag of the item
  %% at the rear of the queue, or fin when trailers are pending.
  -spec get_stream_local_state(cow_http2:streamid(), http2_machine())
      -> {ok, idle | cow_http2:fin(), empty | nofin | fin} | {error, not_found | closed}.
  get_stream_local_state(StreamID, State=#http2_machine{mode=Mode,
          local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
      case stream_get(StreamID, State) of
          #stream{local=IsFin, local_buffer=Queue, local_trailers=undefined} ->
              case queue:peek_r(Queue) of
                  empty -> {ok, IsFin, empty};
                  {value, {IsQueueFin, _, _}} -> {ok, IsFin, IsQueueFin}
              end;
          %% Trailers are queued so the local state is fin after the queue is drained.
          #stream{local=IsFin} ->
              {ok, IsFin, fin};
          undefined when
                  ?IS_LOCAL(Mode, StreamID), StreamID < LocalStreamID;
                  not ?IS_LOCAL(Mode, StreamID), StreamID =< RemoteStreamID ->
              {error, closed};
          undefined ->
              {error, not_found}
      end.
  %% Retrieve the remote state for a stream.
  %%
  %% Unknown streams are reported as closed or not_found using the same
  %% stream ID comparison as the other stream getters.
  -spec get_stream_remote_state(cow_http2:streamid(), http2_machine())
      -> {ok, idle | cow_http2:fin()} | {error, not_found | closed}.
  get_stream_remote_state(StreamID, State=#http2_machine{mode=Mode,
          local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
      case stream_get(StreamID, State) of
          #stream{remote=IsFin} ->
              {ok, IsFin};
          undefined when
                  ?IS_LOCAL(Mode, StreamID), StreamID < LocalStreamID;
                  not ?IS_LOCAL(Mode, StreamID), StreamID =< RemoteStreamID ->
              {error, closed};
          undefined ->
              {error, not_found}
      end.
  %% Query whether the stream was reset recently, that is whether its ID
  %% is in either the local or the remote list of lingering streams.
  -spec is_lingering_stream(cow_http2:streamid(), http2_machine()) -> boolean().
  is_lingering_stream(StreamID, #http2_machine{
          local_lingering_streams=Local, remote_lingering_streams=Remote}) ->
      lists:member(StreamID, Local) orelse lists:member(StreamID, Remote).
  %% Stream-related functions.

  %% Look up a stream by ID; returns undefined when it is not in the map.
  stream_get(StreamID, #http2_machine{streams=Streams}) ->
      case maps:find(StreamID, Streams) of
          {ok, Stream} -> Stream;
          error -> undefined
      end.
  %% Write a stream back into the streams map. A stream that is finished
  %% in both directions is removed from the map instead of being stored.
  stream_store(Stream=#stream{id=StreamID}, State=#http2_machine{streams=Streams0}) ->
      Streams = case Stream of
          #stream{local=fin, remote=fin} -> maps:remove(StreamID, Streams0);
          _ -> Streams0#{StreamID => Stream}
      end,
      State#http2_machine{streams=Streams}.
  %% Build the stream error return value and mark the stream as lingering.
  %% @todo Don't send an RST_STREAM if one was already sent.
  stream_reset(StreamID, State0, Reason, HumanReadable) ->
      State = stream_linger(StreamID, State0),
      {error, {stream_error, StreamID, Reason, HumanReadable}, State}.
  %% Put the stream ID at the front of the local lingering streams list.
  stream_linger(StreamID, State=#http2_machine{local_lingering_streams=Lingering0}) ->
      %% We only keep up to 100 streams in this state, so at most 99 of
      %% the existing entries are retained. @todo Make it configurable?
      Retained = lists:sublist(Lingering0, 99),
      State#http2_machine{local_lingering_streams=[StreamID|Retained]}.