%% File: cow_http2_machine.erl
  %% Copyright (c) 2018, Loïc Hoguin <essen@ninenines.eu>
  %%
  %% Permission to use, copy, modify, and/or distribute this software for any
  %% purpose with or without fee is hereby granted, provided that the above
  %% copyright notice and this permission notice appear in all copies.
  %%
  %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  -module(cow_http2_machine).

  %% Public API, listed in the order the functions were originally exported.
  -export([
      init/2,
      init_stream/2,
      init_upgrade_stream/2,
      frame/2,
      ignored_frame/1,
      timeout/3,
      prepare_headers/5,
      prepare_push_promise/4,
      prepare_trailers/3,
      send_or_queue_data/4,
      ensure_window/2,
      ensure_window/3,
      update_window/2,
      update_window/3,
      reset_stream/2,
      get_connection_local_buffer_size/1,
      get_local_setting/2,
      get_last_streamid/1,
      get_stream_local_buffer_size/2,
      get_stream_local_state/2,
      get_stream_remote_state/2,
      is_lingering_stream/2
  ]).
  %% Options map accepted by init/2. Every key is optional.
  -type opts() :: #{
      connection_window_margin_size => 0..16#7fffffff,
      connection_window_update_threshold => 0..16#7fffffff,
      enable_connect_protocol => boolean(),
      initial_connection_window_size => 65535..16#7fffffff,
      initial_stream_window_size => 0..16#7fffffff,
      max_connection_window_size => 0..16#7fffffff,
      max_concurrent_streams => non_neg_integer() | infinity,
      max_decode_table_size => non_neg_integer(),
      max_encode_table_size => non_neg_integer(),
      max_frame_size_received => 16384..16777215,
      max_frame_size_sent => 16384..16777215 | infinity,
      max_stream_window_size => 0..16#7fffffff,
      preface_timeout => timeout(),
      settings_timeout => timeout(),
      stream_window_data_threshold => 0..16#7fffffff,
      stream_window_margin_size => 0..16#7fffffff,
      stream_window_update_threshold => 0..16#7fffffff
  }.
  -export_type([opts/0]).
  %% Description of a file region queued for sending.
  %% The order of the fields is significant: do not reorder them.
  -record(sendfile, {
      offset :: non_neg_integer(),
      bytes :: pos_integer(),
      path :: file:name_all()
  }).
  %% Per-stream state tracked by the machine.
  -record(stream, {
      id = undefined :: cow_http2:streamid(),

      %% Request method.
      method = undefined :: binary(),

      %% Sending side: idle until we start sending, then nofin/fin
      %% depending on whether we finished sending data.
      local = idle :: idle | cow_http2:fin(),

      %% Local flow control window (how much we can send).
      local_window :: integer(),

      %% Data buffered until the flow control window increases.
      local_buffer = queue:new() ::
          queue:queue({cow_http2:fin(), non_neg_integer(), {data, iodata()} | #sendfile{}}),
      local_buffer_size = 0 :: non_neg_integer(),
      local_trailers = undefined :: undefined | cow_http:headers(),

      %% Receiving side: whether we finished receiving data.
      remote = idle :: idle | cow_http2:fin(),

      %% Remote flow control window (how much we accept to receive).
      remote_window :: integer(),

      %% Body size announced for the request and size read so far.
      remote_expected_size = undefined :: undefined | non_neg_integer(),
      remote_read_size = 0 :: non_neg_integer(),

      %% Unparsed te header. Used to know if we can send trailers.
      %% Note that we can always send trailers to the server.
      te :: undefined | binary()
  }).
  -type stream() :: #stream{}.

  %% Frame kept in the machine state while waiting for CONTINUATION frames.
  -type continued_frame() ::
      {headers, cow_http2:streamid(), cow_http2:fin(), cow_http2:head_fin(), binary()}
      | {push_promise, cow_http2:streamid(), cow_http2:head_fin(), cow_http2:streamid(), binary()}.
  %% Connection-wide state of the HTTP/2 machine.
  -record(http2_machine, {
      %% Whether the HTTP/2 endpoint is a client or a server.
      mode :: client | server,

      %% HTTP/2 SETTINGS customization.
      opts = #{} :: opts(),

      %% Connection-wide frame processing state.
      state = settings :: settings | normal
          | {continuation, request | response | trailers | push_promise, continued_frame()},

      %% Timer for the connection preface.
      preface_timer = undefined :: undefined | reference(),

      %% Timer for the ack for a SETTINGS frame we sent.
      settings_timer = undefined :: undefined | reference(),

      %% Settings are separate for each endpoint. In addition, settings
      %% must be acknowledged before they can be expected to be applied.
      %%
      %% Only initial_window_size is stored explicitly; the other
      %% settings are listed below as comments only:
      %%   header_table_size => 4096
      %%   enable_push => true
      %%   max_concurrent_streams => infinity
      %%   max_frame_size => 16384
      %%   max_header_list_size => infinity
      local_settings = #{
          initial_window_size => 65535
      } :: map(),
      next_settings = undefined :: undefined | map(),
      remote_settings = #{
          initial_window_size => 65535
      } :: map(),

      %% Connection-wide flow control window.
      local_window = 65535 :: integer(), %% How much we can send.
      remote_window = 65535 :: integer(), %% How much we accept to receive.

      %% Stream identifiers.
      local_streamid :: pos_integer(), %% The next streamid to be used.
      remote_streamid = 0 :: non_neg_integer(), %% The last streamid received.

      %% Currently active HTTP/2 streams. Streams may be initiated either
      %% by the client or by the server through PUSH_PROMISE frames.
      streams = [] :: [stream()],

      %% HTTP/2 streams that have recently been reset locally.
      %% We are expected to keep receiving additional frames after
      %% sending an RST_STREAM.
      local_lingering_streams = [] :: [cow_http2:streamid()],

      %% HTTP/2 streams that have recently been reset remotely.
      %% We keep a few of these around in order to reject subsequent
      %% frames on these streams.
      remote_lingering_streams = [] :: [cow_http2:streamid()],

      %% HPACK decoding and encoding state.
      decode_state = cow_hpack:init() :: cow_hpack:state(),
      encode_state = cow_hpack:init() :: cow_hpack:state()
  }).

  -opaque http2_machine() :: #http2_machine{}.
  -export_type([http2_machine/0]).
  %% Pseudo-headers extracted from a header block, by kind of block.
  -type pseudo_headers() ::
      %% Trailers.
      #{}
      %% Responses.
      | #{
          status := cow_http:status()
      }
      %% Normal CONNECT requests.
      | #{
          method := binary(),
          authority := binary()
      }
      %% Other requests and extended CONNECT requests.
      | #{
          method := binary(),
          scheme := binary(),
          authority := binary(),
          path := binary(),
          protocol => binary()
      }.
  %% Guard-safe tests telling whether StreamID was initiated locally.
  %% Server-initiated streams are even-numbered, client-initiated
  %% streams are odd-numbered.
  -define(IS_SERVER_LOCAL(StreamID), ((StreamID rem 2) =:= 0)).
  -define(IS_CLIENT_LOCAL(StreamID), ((StreamID rem 2) =:= 1)).
  -define(IS_LOCAL(Mode, StreamID), (
      ((Mode =:= server) andalso ?IS_SERVER_LOCAL(StreamID))
      orelse
      ((Mode =:= client) andalso ?IS_CLIENT_LOCAL(StreamID))
  )).
  %% Create the machine for a new connection and return the preface to send.
  %%
  %% Both timers are started here. Clients use odd local stream
  %% identifiers (starting at 1) and additionally send the connection
  %% preface sequence; servers use even ones (starting at 2).
  -spec init(client | server, opts()) -> {ok, iodata(), http2_machine()}.
  init(Mode, Opts) when Mode =:= client; Mode =:= server ->
      NextSettings = settings_init(Opts),
      PrefaceTimer = start_timer(preface_timeout, Opts),
      SettingsTimer = start_timer(settings_timeout, Opts),
      FirstStreamID = case Mode of
          client -> 1;
          server -> 2
      end,
      Machine = #http2_machine{
          mode=Mode,
          opts=Opts,
          preface_timer=PrefaceTimer,
          settings_timer=SettingsTimer,
          next_settings=NextSettings,
          local_streamid=FirstStreamID
      },
      case Mode of
          client -> client_preface(Machine);
          server -> common_preface(Machine)
      end.
  %% Start the timer named Name using the timeout found in Opts
  %% (5000 when the option is absent). No timer is started, and
  %% undefined is returned, when the timeout is infinity.
  start_timer(Name, Opts) ->
      Timeout = maps:get(Name, Opts, 5000),
      if
          Timeout =:= infinity ->
              undefined;
          true ->
              erlang:start_timer(Timeout, self(), {?MODULE, Name})
      end.
  %% Client preface: the fixed connection preface sequence followed
  %% by the frames common to both endpoints.
  client_preface(State0) ->
      {ok, Frames, State} = common_preface(State0),
      Magic = <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
      {ok, [Magic, Frames], State}.
  %% We send next_settings and use defaults until we get an ack.
  %%
  %% When the user configured an initial_connection_window_size other
  %% than the default 65535, a connection-level WINDOW_UPDATE frame
  %% covering the difference follows the SETTINGS frame and the
  %% machine's window is updated by the same amount.
  common_preface(State=#http2_machine{opts=Opts, next_settings=NextSettings}) ->
      WindowSize = maps:get(initial_connection_window_size, Opts, 65535),
      SettingsFrame = cow_http2:settings(NextSettings),
      case WindowSize of
          65535 ->
              {ok, SettingsFrame, State};
          _ ->
              Increment = WindowSize - 65535,
              WindowUpdateFrame = cow_http2:window_update(Increment),
              {ok, [SettingsFrame, WindowUpdateFrame], update_window(Increment, State)}
      end.
  %% Build the SETTINGS map to advertise from the user options.
  %% Each row is {OptionName, SettingName, ProtocolDefault}; values
  %% equal to the default are left out of the result.
  settings_init(Opts) ->
      %% @todo max_header_list_size
      Table = [
          {max_decode_table_size, header_table_size, 4096},
          {max_concurrent_streams, max_concurrent_streams, infinity},
          {initial_stream_window_size, initial_window_size, 65535},
          {max_frame_size_received, max_frame_size, 16384},
          {enable_connect_protocol, enable_connect_protocol, false}
      ],
      lists:foldl(fun({OptName, SettingName, Default}, Settings) ->
          setting_from_opt(Settings, Opts, OptName, SettingName, Default)
      end, #{}, Table).
  %% Store the value of option OptName under SettingName, unless the
  %% option is absent or exactly equal to Default, in which case
  %% Settings is returned untouched.
  setting_from_opt(Settings, Opts, OptName, SettingName, Default) ->
      Value = maps:get(OptName, Opts, Default),
      case Value =:= Default of
          true -> Settings;
          false -> Settings#{SettingName => Value}
      end.
  %% Client only: allocate the next local stream identifier and
  %% register a new stream for a request using Method.
  %%
  %% The stream's send window comes from the remote settings and its
  %% receive window from our local settings.
  -spec init_stream(binary(), State)
      -> {ok, cow_http2:streamid(), State} when State::http2_machine().
  init_stream(Method, State0=#http2_machine{
          mode=client,
          local_streamid=StreamID,
          local_settings=#{initial_window_size := ReceiveWindow},
          remote_settings=#{initial_window_size := SendWindow}}) ->
      NewStream = #stream{
          id=StreamID,
          method=Method,
          local_window=SendWindow,
          remote_window=ReceiveWindow
      },
      State = State0#http2_machine{local_streamid=StreamID + 2},
      {ok, StreamID, stream_store(NewStream, State)}.
  %% Server only, before any stream was received: register stream 1.
  %%
  %% The stream is created with its remote side already finished and
  %% an expected body size of 0.
  -spec init_upgrade_stream(binary(), State)
      -> {ok, cow_http2:streamid(), State} when State::http2_machine().
  init_upgrade_stream(Method, State0=#http2_machine{
          mode=server,
          remote_streamid=0,
          local_settings=#{initial_window_size := ReceiveWindow},
          remote_settings=#{initial_window_size := SendWindow}}) ->
      UpgradeStream = #stream{
          id=1,
          method=Method,
          remote=fin,
          remote_expected_size=0,
          local_window=SendWindow,
          remote_window=ReceiveWindow,
          te=undefined
      },
      State = State0#http2_machine{remote_streamid=1},
      {ok, 1, stream_store(UpgradeStream, State)}.
  %% Feed a received frame to the machine.
  %%
  %% While in the settings state the frame is handed to
  %% settings_frame/2 after the preface timer, if any, is cancelled.
  %% While a header block is being continued every frame goes to
  %% continuation_frame/2. Otherwise frames are dispatched on their
  %% type; unrecognized types go to ignored_frame/1.
  -spec frame(cow_http2:frame(), State)
      -> {ok, State}
      | {ok, {data, cow_http2:streamid(), cow_http2:fin(), binary()}, State}
      | {ok, {headers, cow_http2:streamid(), cow_http2:fin(),
          cow_http:headers(), pseudo_headers(), non_neg_integer() | undefined}, State}
      | {ok, {trailers, cow_http2:streamid(), cow_http:headers()}, State}
      | {ok, {rst_stream, cow_http2:streamid(), cow_http2:error()}, State}
      | {ok, {push_promise, cow_http2:streamid(), cow_http2:streamid(),
          cow_http:headers(), pseudo_headers()}, State}
      | {ok, {goaway, cow_http2:streamid(), cow_http2:error(), binary()}, State}
      | {send, [{cow_http2:streamid(), cow_http2:fin(),
          [{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}]}], State}
      | {error, {stream_error, cow_http2:streamid(), cow_http2:error(), atom()}, State}
      | {error, {connection_error, cow_http2:error(), atom()}, State}
      when State::http2_machine().
  %% First frame of the connection, no preface timer running.
  frame(Frame, State=#http2_machine{state=settings, preface_timer=undefined}) ->
      settings_frame(Frame, State#http2_machine{state=normal});
  %% First frame of the connection, preface timer to be cancelled.
  frame(Frame, State=#http2_machine{state=settings, preface_timer=TimerRef}) ->
      ok = erlang:cancel_timer(TimerRef, [{async, true}, {info, false}]),
      settings_frame(Frame, State#http2_machine{state=normal, preface_timer=undefined});
  frame(Frame, State=#http2_machine{state={continuation, _, _}}) ->
      continuation_frame(Frame, State);
  frame(settings_ack, State=#http2_machine{state=normal}) ->
      settings_ack_frame(State);
  frame(Frame, State=#http2_machine{state=normal}) ->
      FrameType = element(1, Frame),
      case FrameType of
          data -> data_frame(Frame, State);
          headers -> headers_frame(Frame, State);
          priority -> priority_frame(Frame, State);
          rst_stream -> rst_stream_frame(Frame, State);
          settings -> settings_frame(Frame, State);
          push_promise -> push_promise_frame(Frame, State);
          ping -> ping_frame(Frame, State);
          ping_ack -> ping_ack_frame(Frame, State);
          goaway -> goaway_frame(Frame, State);
          window_update -> window_update_frame(Frame, State);
          continuation -> unexpected_continuation_frame(Frame, State);
          _ -> ignored_frame(State)
      end.
  %% DATA frame.
  %%
  %% Checks are done in this order: the stream must not be idle, the
  %% data must fit the connection window, then the state of the stream
  %% itself is examined. The connection window is decremented before
  %% the stream is looked up.
  data_frame({data, StreamID, _, _}, State=#http2_machine{mode=Mode,
          local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
          when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
          orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
      Reason = 'DATA frame received on a stream in idle state. (RFC7540 5.1)',
      {error, {connection_error, protocol_error, Reason}, State};
  data_frame({data, _, _, Data}, State=#http2_machine{remote_window=ConnWindow})
          when byte_size(Data) > ConnWindow ->
      Reason = 'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)',
      {error, {connection_error, flow_control_error, Reason}, State};
  data_frame(Frame={data, StreamID, _, Data}, State0=#http2_machine{
          remote_window=ConnWindow, local_lingering_streams=Lingering}) ->
      Size = byte_size(Data),
      State = State0#http2_machine{remote_window=ConnWindow - Size},
      case stream_get(StreamID, State) of
          #stream{remote_window=StreamWindow} when StreamWindow < Size ->
              stream_reset(StreamID, State, flow_control_error,
                  'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)');
          Stream = #stream{remote=nofin} ->
              data_frame(Frame, State, Stream, Size);
          #stream{remote=idle} ->
              stream_reset(StreamID, State, protocol_error,
                  'DATA frame received before a HEADERS frame. (RFC7540 8.1, RFC7540 8.1.2.6)');
          #stream{remote=fin} ->
              stream_reset(StreamID, State, stream_closed,
                  'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)');
          undefined ->
              %% After we send an RST_STREAM frame and terminate a stream,
              %% the remote endpoint might still be sending us some more
              %% frames until it can process this RST_STREAM. Such frames
              %% are silently accepted.
              IsLingering = lists:member(StreamID, Lingering),
              if
                  IsLingering ->
                      {ok, State};
                  true ->
                      {error, {connection_error, stream_closed,
                          'DATA frame received for a closed stream. (RFC7540 5.1)'},
                          State}
              end
      end.
  %% Account for a DATA frame on a stream that is still receiving:
  %% update the stream's remote state, window and read size, store
  %% it, then verify the size against the expected body size.
  data_frame(Frame={data, _, IsFin, _}, State0, Stream0=#stream{id=StreamID,
          remote_window=Window, remote_read_size=ReadSoFar}, Size) ->
      Stream = Stream0#stream{
          remote=IsFin,
          remote_window=Window - Size,
          remote_read_size=ReadSoFar + Size
      },
      State = stream_store(Stream, State0),
      case is_body_size_valid(Stream) of
          false ->
              stream_reset(StreamID, State, protocol_error,
                  'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)');
          true ->
              {ok, Frame, State}
      end.
  %% Check the size read so far against the expected body size.
  %%
  %% It's always valid when no expected size was recorded.
  is_body_size_valid(#stream{remote_expected_size=undefined}) ->
      true;
  %% Body still being read: invalid as soon as we read more than expected.
  is_body_size_valid(#stream{remote=nofin, remote_expected_size=Expected,
          remote_read_size=Read}) ->
      Read =< Expected;
  %% Body fully read: the size read must be exactly the one expected.
  is_body_size_valid(#stream{remote=fin, remote_expected_size=Expected,
          remote_read_size=Read}) ->
      Read =:= Expected;
  %% Anything else is considered invalid.
  is_body_size_valid(_) ->
      false.
  %% HEADERS frame.
  %%
  %% We always close the connection when we detect errors before
  %% decoding the headers to not waste resources on non-compliant
  %% endpoints, making us stricter than the RFC requires.

  %% Convenience record to manipulate the HEADERS frame tuple
  %% (the variant without priority information).
  %% The order of the fields matters.
  -record(headers, {
      id :: cow_http2:streamid(),
      fin :: cow_http2:fin(),
      head :: cow_http2:head_fin(),
      data :: binary()
  }).
  %% Dispatch a HEADERS frame to the server or client handling code.
  headers_frame(Frame=#headers{}, State=#http2_machine{mode=Mode}) ->
      case Mode of
          server -> server_headers_frame(Frame, State);
          client -> client_headers_frame(Frame, State)
      end;
  %% HEADERS frame carrying priority information: the priority fields
  %% are dropped and the frame is processed like a plain one.
  %% @todo Handle the PRIORITY data, but only if this returns an ok tuple.
  %% @todo Do not lose the PRIORITY information if CONTINUATION frames follow.
  headers_frame({headers, StreamID, IsFin, IsHeadFin,
          _IsExclusive, _DepStreamID, _Weight, HeaderData},
          State=#http2_machine{}) ->
      Plain = #headers{id=StreamID, fin=IsFin, head=IsHeadFin, data=HeaderData},
      headers_frame(Plain, State).
  %% Server side handling of HEADERS frames.
  %%
  %% Reject HEADERS frames with even-numbered streamid.
  server_headers_frame(#headers{id=StreamID}, State)
          when ?IS_SERVER_LOCAL(StreamID) ->
      Reason = 'HEADERS frame received with even-numbered streamid. (RFC7540 5.1.1)',
      {error, {connection_error, protocol_error, Reason}, State};
  %% HEADERS frame on an idle stream: new request. The header block
  %% is decoded right away when complete, otherwise we wait for the
  %% CONTINUATION frames.
  server_headers_frame(Frame=#headers{id=StreamID, head=IsHeadFin},
          State=#http2_machine{mode=server, remote_streamid=LastStreamID})
          when StreamID > LastStreamID ->
      case IsHeadFin of
          head_nofin ->
              {ok, State#http2_machine{state={continuation, request, Frame}}};
          head_fin ->
              headers_decode(Frame, State, request, undefined)
      end;
  %% Either a HEADERS frame received on (half-)closed stream,
  %% or a HEADERS frame containing the trailers.
  server_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin}, State) ->
      case {stream_get(StreamID, State), IsFin} of
          %% Trailers.
          {Stream=#stream{remote=nofin}, fin} ->
              case IsHeadFin of
                  head_nofin ->
                      {ok, State#http2_machine{state={continuation, trailers, Frame}}};
                  head_fin ->
                      headers_decode(Frame, State, trailers, Stream)
              end;
          {#stream{remote=nofin}, _} ->
              {error, {connection_error, protocol_error,
                  'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'},
                  State};
          _ ->
              {error, {connection_error, stream_closed,
                  'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'},
                  State}
      end.
  %% Client side handling of HEADERS frames.
  %%
  %% The first clause covers streams that are not idle: either a
  %% HEADERS frame received on an (half-)closed stream, or a HEADERS
  %% frame containing the response or the trailers.
  client_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin},
          State=#http2_machine{local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
          when (?IS_CLIENT_LOCAL(StreamID) andalso (StreamID < LocalStreamID))
          orelse ((not ?IS_CLIENT_LOCAL(StreamID)) andalso (StreamID =< RemoteStreamID)) ->
      case {stream_get(StreamID, State), IsFin} of
          %% Response.
          {Stream=#stream{remote=idle}, _} ->
              case IsHeadFin of
                  head_nofin ->
                      {ok, State#http2_machine{state={continuation, response, Frame}}};
                  head_fin ->
                      headers_decode(Frame, State, response, Stream)
              end;
          %% Trailers.
          {Stream=#stream{remote=nofin}, fin} ->
              case IsHeadFin of
                  head_nofin ->
                      {ok, State#http2_machine{state={continuation, trailers, Frame}}};
                  head_fin ->
                      headers_decode(Frame, State, trailers, Stream)
              end;
          {#stream{remote=nofin}, _} ->
              {error, {connection_error, protocol_error,
                  'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'},
                  State};
          _ ->
              {error, {connection_error, stream_closed,
                  'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'},
                  State}
      end;
  %% Reject HEADERS frames received on idle streams.
  client_headers_frame(_, State) ->
      Reason = 'HEADERS frame received on an idle stream. (RFC7540 5.1.1)',
      {error, {connection_error, protocol_error, Reason}, State}.
  %% Decode a complete HPACK header block and continue processing.
  %%
  %% Requests go through the concurrency limit check first; all
  %% other header block types go straight to pseudo-header checks.
  %% Any exception raised by the decoder becomes a connection error
  %% and leaves the decoder state unchanged.
  headers_decode(Frame=#headers{head=head_fin, data=HeaderBlock},
          State0=#http2_machine{decode_state=DecodeState0}, Type, Stream) ->
      try cow_hpack:decode(HeaderBlock, DecodeState0) of
          {Headers, DecodeState} ->
              State = State0#http2_machine{decode_state=DecodeState},
              case Type of
                  request ->
                      headers_enforce_concurrency_limit(Frame, State, Type, Stream, Headers);
                  _ ->
                      headers_pseudo_headers(Frame, State, Type, Stream, Headers)
              end
      catch _:_ ->
          {error, {connection_error, compression_error,
              'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'},
              State0}
      end.
  %% Refuse the new stream when the max_concurrent_streams local
  %% setting (infinity when unset) has been reached.
  headers_enforce_concurrency_limit(Frame=#headers{id=StreamID},
          State=#http2_machine{local_settings=LocalSettings, streams=Streams},
          Type, Stream, Headers) ->
      Limit = maps:get(max_concurrent_streams, LocalSettings, infinity),
      Active = length(Streams),
      %% Using < is correct because this new stream is not included
      %% in the Streams variable yet and so we'll end up with +1 stream.
      if
          Active < Limit ->
              headers_pseudo_headers(Frame, State, Type, Stream, Headers);
          true ->
              {error, {stream_error, StreamID, refused_stream,
                  'Maximum number of concurrent streams has been reached. (RFC7540 5.1.2)'},
                  State}
      end.
  %% Extract and validate the pseudo-headers of a decoded header
  %% block, then move on to the regular headers.
  %%
  %% Requests and push promises: the accepted pseudo-header sets
  %% depend on the method and on whether the extended CONNECT
  %% protocol is enabled locally. Errors are reported as malformed
  %% requests (stream errors).
  headers_pseudo_headers(Frame, State=#http2_machine{local_settings=LocalSettings},
          Type, Stream, Headers0) when Type =:= request; Type =:= push_promise ->
      ExtendedConnect = maps:get(enable_connect_protocol, LocalSettings, false),
      case request_pseudo_headers(Headers0, #{}) of
          %% Extended CONNECT method (RFC8441).
          {ok, Pseudo=#{method := <<"CONNECT">>, scheme := _,
                  authority := _, path := _, protocol := _}, Headers}
                  when ExtendedConnect ->
              headers_regular_headers(Frame, State, Type, Stream, Pseudo, Headers);
          {ok, #{method := <<"CONNECT">>, scheme := _,
                  authority := _, path := _}, _}
                  when ExtendedConnect ->
              headers_malformed(Frame, State,
                  'The :protocol pseudo-header MUST be sent with an extended CONNECT. (RFC8441 4)');
          {ok, #{protocol := _}, _} ->
              headers_malformed(Frame, State,
                  'The :protocol pseudo-header is only defined for the extended CONNECT. (RFC8441 4)');
          %% Normal CONNECT (no scheme/path).
          {ok, Pseudo=#{method := <<"CONNECT">>, authority := _}, Headers}
                  when map_size(Pseudo) =:= 2 ->
              headers_regular_headers(Frame, State, Type, Stream, Pseudo, Headers);
          {ok, #{method := <<"CONNECT">>}, _} ->
              headers_malformed(Frame, State,
                  'CONNECT requests only use the :method and :authority pseudo-headers. (RFC7540 8.3)');
          %% Other requests.
          {ok, Pseudo=#{method := _, scheme := _, path := _}, Headers} ->
              headers_regular_headers(Frame, State, Type, Stream, Pseudo, Headers);
          {ok, _, _} ->
              headers_malformed(Frame, State,
                  'A required pseudo-header was not found. (RFC7540 8.1.2.3)');
          {error, Reason} ->
              headers_malformed(Frame, State, Reason)
      end;
  %% Responses: :status is required. Errors reset the stream.
  headers_pseudo_headers(Frame=#headers{id=StreamID},
          State, Type=response, Stream, Headers0) ->
      case response_pseudo_headers(Headers0, #{}) of
          {ok, Pseudo=#{status := _}, Headers} ->
              headers_regular_headers(Frame, State, Type, Stream, Pseudo, Headers);
          {ok, _, _} ->
              stream_reset(StreamID, State, protocol_error,
                  'A required pseudo-header was not found. (RFC7540 8.1.2.4)');
          {error, Reason} ->
              stream_reset(StreamID, State, protocol_error, Reason)
      end;
  %% Trailers: no pseudo-header is allowed at all.
  headers_pseudo_headers(Frame=#headers{id=StreamID},
          State, Type=trailers, Stream, Headers) ->
      case trailers_contain_pseudo_headers(Headers) of
          true ->
              stream_reset(StreamID, State, protocol_error,
                  'Trailer header blocks must not contain pseudo-headers. (RFC7540 8.1.2.1)');
          false ->
              headers_regular_headers(Frame, State, Type, Stream, #{}, Headers)
      end.
  %% Build the stream error (protocol_error) returned for a malformed
  %% header block on the stream the frame belongs to.
  headers_malformed(#headers{id=StreamID}, State, HumanReadable) ->
      StreamError = {stream_error, StreamID, protocol_error, HumanReadable},
      {error, StreamError, State}.
  %% Collect the leading pseudo-headers of a request header list.
  %%
  %% Returns {ok, PseudoHeaders, RemainingHeaders} once a header whose
  %% name does not start with ":" (or the end of the list) is reached,
  %% or {error, Reason} on a duplicate or unknown pseudo-header.
  request_pseudo_headers([{<<":", Name/bits>>, Value}|Tail], PseudoHeaders) ->
      %% Map the pseudo-header name to its key and duplicate error.
      Known = case Name of
          <<"method">> ->
              {method, 'Multiple :method pseudo-headers were found. (RFC7540 8.1.2.3)'};
          <<"scheme">> ->
              {scheme, 'Multiple :scheme pseudo-headers were found. (RFC7540 8.1.2.3)'};
          <<"authority">> ->
              {authority, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'};
          <<"path">> ->
              {path, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'};
          <<"protocol">> ->
              {protocol, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'};
          _ ->
              unknown
      end,
      case Known of
          unknown ->
              {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
          {Key, DuplicateError} ->
              case maps:is_key(Key, PseudoHeaders) of
                  true -> {error, DuplicateError};
                  false -> request_pseudo_headers(Tail, PseudoHeaders#{Key => Value})
              end
      end;
  request_pseudo_headers(Headers, PseudoHeaders) ->
      {ok, PseudoHeaders, Headers}.
  %% Collect the leading pseudo-headers of a response header list.
  %%
  %% Only :status is accepted; its value is converted with
  %% cow_http:status_to_integer/1 and stored under the status key.
  %% Returns {ok, PseudoHeaders, Rest} or {error, Reason}.
  response_pseudo_headers([{<<":", Name/bits>>, Value}|Tail], PseudoHeaders) ->
      case {Name, PseudoHeaders} of
          {<<"status">>, #{status := _}} ->
              {error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'};
          {<<"status">>, _} ->
              %% Only the conversion is protected by the try; the
              %% recursive call sits in the unprotected "of" section.
              try cow_http:status_to_integer(Value) of
                  IntStatus ->
                      response_pseudo_headers(Tail, PseudoHeaders#{status => IntStatus})
              catch _:_ ->
                  {error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'}
              end;
          _ ->
              {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'}
      end;
  response_pseudo_headers(Headers, PseudoHeaders) ->
      {ok, PseudoHeaders, Headers}.
  %% Tell whether any entry of a trailer header list has a name
  %% starting with a colon, i.e. is a pseudo-header.
  trailers_contain_pseudo_headers(Headers) ->
      lists:any(fun
          ({<<":", _/bits>>, _}) -> true;
          (_) -> false
      end, Headers).
  %% Validate the regular headers, then hand over to the next processing
  %% step for the kind of header block being handled.
  %%
  %% Invalid headers in a request produce a malformed-headers stream error;
  %% for every other type the stream is reset with protocol_error.
  %%
  %% Rejecting invalid regular headers might be a bit too strong for clients.
  headers_regular_headers(Frame=#headers{id=StreamID},
          State, Type, Stream, PseudoHeaders, Headers) ->
      case {Type, regular_headers(Headers, Type)} of
          {request, ok} ->
              request_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers);
          {push_promise, ok} ->
              push_promise_frame(Frame, State, Stream, PseudoHeaders, Headers);
          {response, ok} ->
              response_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers);
          {trailers, ok} ->
              trailers_frame(Frame, State, Stream, Headers);
          {request, {error, HumanReadable}} ->
              headers_malformed(Frame, State, HumanReadable);
          {_, {error, HumanReadable}} ->
              stream_reset(StreamID, State, protocol_error, HumanReadable)
      end.
  %% Validate the regular (non-pseudo) headers of a header block.
  %%
  %% Returns ok when every header is acceptable, or {error, Reason}
  %% for the first offending header encountered.
  regular_headers([], _) ->
      ok;
  regular_headers([{Name, Value}|Tail], Type) ->
      case regular_header_check(Name, Value, Type) of
          ok -> regular_headers(Tail, Type);
          Error = {error, _} -> Error
      end.

  %% Check a single regular header. Clause order matters: the specific
  %% forbidden names are tested before the generic lowercase check.
  regular_header_check(<<>>, _, _) ->
      {error, 'Empty header names are not valid regular headers. (CVE-2019-9516)'};
  regular_header_check(<<":", _/bits>>, _, _) ->
      {error, 'Pseudo-headers were found after regular headers. (RFC7540 8.1.2.1)'};
  regular_header_check(<<"connection">>, _, _) ->
      {error, 'The connection header is not allowed. (RFC7540 8.1.2.2)'};
  regular_header_check(<<"keep-alive">>, _, _) ->
      {error, 'The keep-alive header is not allowed. (RFC7540 8.1.2.2)'};
  regular_header_check(<<"proxy-authenticate">>, _, _) ->
      {error, 'The proxy-authenticate header is not allowed. (RFC7540 8.1.2.2)'};
  regular_header_check(<<"proxy-authorization">>, _, _) ->
      {error, 'The proxy-authorization header is not allowed. (RFC7540 8.1.2.2)'};
  regular_header_check(<<"transfer-encoding">>, _, _) ->
      {error, 'The transfer-encoding header is not allowed. (RFC7540 8.1.2.2)'};
  regular_header_check(<<"upgrade">>, _, _) ->
      {error, 'The upgrade header is not allowed. (RFC7540 8.1.2.2)'};
  regular_header_check(<<"te">>, Value, request) when Value =/= <<"trailers">> ->
      {error, 'The te header with a value other than "trailers" is not allowed. (RFC7540 8.1.2.2)'};
  regular_header_check(<<"te">>, _, Type) when Type =/= request ->
      {error, 'The te header is only allowed in request headers. (RFC7540 8.1.2.2)'};
  regular_header_check(Name, _, _) ->
      %% Any ASCII uppercase letter in the name is an error.
      Uppercase = [<<Char>> || Char <- lists:seq($A, $Z)],
      case binary:match(Name, Uppercase) of
          nomatch -> ok;
          _ -> {error, 'Header names must be lowercase. (RFC7540 8.1.2)'}
      end.
  %% Work out the expected request body size from the content-length
  %% header(s) and the END_STREAM flag, then continue with headers_frame/7.
  %%
  %% The frame is reported as malformed when END_STREAM comes with a
  %% content-length other than "0", or when several content-length
  %% headers are present.
  request_expected_size(Frame=#headers{fin=IsFin}, State, Type, Stream, PseudoHeaders, Headers) ->
      ContentLengths = [Value || {Name, Value} <- Headers, Name =:= <<"content-length">>],
      case {ContentLengths, IsFin} of
          {[], fin} ->
              headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
          {[], _} ->
              headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined);
          {[<<"0">>], fin} ->
              headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
          {[_], fin} ->
              headers_malformed(Frame, State,
                  'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
          {[BinLen], _} ->
              headers_parse_expected_size(Frame, State, Type, Stream,
                  PseudoHeaders, Headers, BinLen);
          {_, _} ->
              headers_malformed(Frame, State,
                  'Multiple content-length headers were received. (RFC7230 3.3.2)')
      end.
  %% Work out the expected response body size from the content-length
  %% header(s), the response status, the request method stored in the
  %% stream and the END_STREAM flag, then continue with headers_frame/7.
  %%
  %% The clauses are order-dependent: a single content-length header is
  %% first checked against the cases where it is forbidden (stream reset),
  %% then against the cases where it is ignored (expected size 0), and
  %% only then is its value parsed.
  response_expected_size(Frame=#headers{id=StreamID, fin=IsFin}, State, Type,
          Stream=#stream{method=Method}, PseudoHeaders=#{status := Status}, Headers) ->
      case [CL || {<<"content-length">>, CL} <- Headers] of
          %% No content-length: the size is 0 if the stream ends here,
          %% unknown otherwise.
          [] when IsFin =:= fin ->
              headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
          [] ->
              headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined);
          [_] when Status >= 100, Status =< 199 ->
              stream_reset(StreamID, State, protocol_error,
                  'Content-length header received in a 1xx response. (RFC7230 3.3.2)');
          [_] when Status =:= 204 ->
              stream_reset(StreamID, State, protocol_error,
                  'Content-length header received in a 204 response. (RFC7230 3.3.2)');
          [_] when Status >= 200, Status =< 299, Method =:= <<"CONNECT">> ->
              stream_reset(StreamID, State, protocol_error,
                  'Content-length header received in a 2xx response to a CONNECT request. (RFC7230 3.3.2).');
          %% Responses to HEAD requests, and 304 responses may contain
          %% a content-length header that must be ignored. (RFC7230 3.3.2)
          [_] when Method =:= <<"HEAD">> ->
              headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
          [_] when Status =:= 304 ->
              headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
          [<<"0">>] when IsFin =:= fin ->
              headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
          [_] when IsFin =:= fin ->
              stream_reset(StreamID, State, protocol_error,
                  'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
          %% A single content-length with more data to come: parse it.
          [BinLen] ->
              headers_parse_expected_size(Frame, State, Type, Stream,
                  PseudoHeaders, Headers, BinLen);
          %% Two or more content-length headers.
          _ ->
              stream_reset(StreamID, State, protocol_error,
                  'Multiple content-length headers were received. (RFC7230 3.3.2)')
      end.
  %% Parse the content-length value and continue with headers_frame/7
  %% using the parsed length as the expected body size.
  %%
  %% When parsing fails, a request is reported as malformed while a
  %% response causes the stream to be reset with protocol_error.
  headers_parse_expected_size(Frame=#headers{id=StreamID},
          State, Type, Stream, PseudoHeaders, Headers, BinLen) ->
      try cow_http_hd:parse_content_length(BinLen) of
          ExpectedSize ->
              headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, ExpectedSize)
      catch _:_ ->
          Reason = 'The content-length header is invalid. (RFC7230 3.3.2)',
          case Type of
              response -> stream_reset(StreamID, State, protocol_error, Reason);
              request -> headers_malformed(Frame, State, Reason)
          end
      end.
  %% Finish processing a validated request or response header block:
  %% create or update the stream, store it, and return the headers event
  %% {headers, StreamID, IsFin, Headers, PseudoHeaders, Len}.
  %%
  %% Note the cross-binding of the window sizes: the initial_window_size
  %% from our local settings is used as the stream's remote_window, and
  %% the one from the remote settings as the stream's local_window.
  headers_frame(#headers{id=StreamID, fin=IsFin}, State0=#http2_machine{
          local_settings=#{initial_window_size := RemoteWindow},
          remote_settings=#{initial_window_size := LocalWindow}},
          Type, Stream0, PseudoHeaders, Headers, Len) ->
      {Stream, State1} = case Type of
          request ->
              %% A request creates a brand new stream; the te header value
              %% (if any) is kept in the stream record.
              TE = case lists:keyfind(<<"te">>, 1, Headers) of
                  {_, TE0} -> TE0;
                  false -> undefined
              end,
              {#stream{id=StreamID, method=maps:get(method, PseudoHeaders),
                  remote=IsFin, remote_expected_size=Len,
                  local_window=LocalWindow, remote_window=RemoteWindow, te=TE},
                  %% Record this stream as the latest remote stream ID.
                  State0#http2_machine{remote_streamid=StreamID}};
          response ->
              Stream1 = case PseudoHeaders of
                  %% 1xx responses leave the existing stream untouched.
                  #{status := Status} when Status >= 100, Status =< 199 -> Stream0;
                  _ -> Stream0#stream{remote=IsFin, remote_expected_size=Len}
              end,
              {Stream1, State0}
      end,
      State = stream_store(Stream, State1),
      {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, Len}, State}.
  %% Handle a validated trailers header block: mark the remote side of
  %% the stream as finished, store it, and return the trailers event.
  %% The stream is reset with protocol_error when is_body_size_valid/1
  %% rejects the finished stream.
  trailers_frame(#headers{id=StreamID}, State0, Stream0, Headers) ->
      Finished = Stream0#stream{remote=fin},
      State = stream_store(Finished, State0),
      case is_body_size_valid(Finished) of
          false ->
              stream_reset(StreamID, State, protocol_error,
                  'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)');
          true ->
              {ok, {trailers, StreamID, Headers}, State}
      end.
  %% PRIORITY frame.
  %%
  %% The frame is currently accepted and ignored; the state is
  %% returned as is.
  %%
  %% @todo Handle PRIORITY frames.
  priority_frame(_Priority, Machine) ->
      {ok, Machine}.
  %% RST_STREAM frame.
  %%
  %% When the guard built from ?IS_LOCAL and the local/remote stream ID
  %% counters holds, the frame is rejected with a connection error.
  %% Otherwise the stream is removed from the list of streams and its ID
  %% is remembered at the head of the lingering streams list.
  rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode,
          local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
          when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
          orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
      ConnError = {connection_error, protocol_error,
          'RST_STREAM frame received on a stream in idle state. (RFC7540 5.1)'},
      {error, ConnError, State};
  rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{
          streams=Streams0, remote_lingering_streams=Lingering0}) ->
      Remaining = lists:keydelete(StreamID, #stream.id, Streams0),
      %% We only keep up to 10 streams in this state. @todo Make it configurable?
      Kept = lists:sublist(Lingering0, 10 - 1),
      NewState = State#http2_machine{streams=Remaining,
          remote_lingering_streams=[StreamID|Kept]},
      {ok, {rst_stream, StreamID, Reason}, NewState}.
  %% SETTINGS frame.
  %%
  %% The received settings are merged over the previously known remote
  %% settings, then each received setting is applied in turn:
  %%  - header_table_size resizes the HPACK encoder table, capped by the
  %%    max_encode_table_size option (4096 when the option is not set);
  %%  - initial_window_size shifts the local window of all streams by the
  %%    difference with the previous value (65535 when none was known).
  %% When initial_window_size was part of the frame, send_data/1 is called
  %% afterwards, so the result may be {send, ...} instead of {ok, State}.
  settings_frame({settings, Settings}, State0=#http2_machine{
          opts=Opts, remote_settings=Settings0}) ->
      State1 = State0#http2_machine{remote_settings=maps:merge(Settings0, Settings)},
      State2 = maps:fold(fun
          (header_table_size, NewSize, State=#http2_machine{encode_state=EncodeState0}) ->
              MaxSize = maps:get(max_encode_table_size, Opts, 4096),
              EncodeState = cow_hpack:set_max_size(min(NewSize, MaxSize), EncodeState0),
              State#http2_machine{encode_state=EncodeState};
          (initial_window_size, NewWindowSize, State) ->
              %% The old value is read from the settings known before this frame.
              OldWindowSize = maps:get(initial_window_size, Settings0, 65535),
              streams_update_local_window(State, NewWindowSize - OldWindowSize);
          (_, _, State) ->
              State
      end, State1, Settings),
      case Settings of
          #{initial_window_size := _} -> send_data(State2);
          _ -> {ok, State2}
      end;
  %% We expect to receive a SETTINGS frame as part of the preface.
  %% Anything that is not a {settings, _} tuple is a connection error.
  settings_frame(_F, State) ->
      {error, {connection_error, protocol_error,
          'The preface sequence must be followed by a SETTINGS frame. (RFC7540 3.5)'},
          State}.
  %% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
  %% the local stream windows for all active streams and perhaps
  %% resume sending data.
  %%
  %% Increment is added to the local_window of every stream record in
  %% the state; it may be negative.
  streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) ->
      Shift = fun
          (Stream=#stream{local_window=Window}) ->
              {true, Stream#stream{local_window=Window + Increment}};
          (_) ->
              false
      end,
      State#http2_machine{streams=lists:filtermap(Shift, Streams0)}.
  %% Ack for a previously sent SETTINGS frame.
  %%
  %% Cancels the settings timer when one is set, merges the pending
  %% next_settings into the local settings and clears them, then applies
  %% each newly acknowledged setting:
  %%  - header_table_size resizes the HPACK decoder table;
  %%  - initial_window_size shifts the remote window of all streams by the
  %%    difference with the previous local value (65535 when none was set).
  settings_ack_frame(State0=#http2_machine{settings_timer=TRef,
          local_settings=Local0, next_settings=NextSettings}) ->
      %% With {async, true} and {info, false}, cancel_timer/2 returns ok,
      %% which is what the match asserts.
      ok = case TRef of
          undefined -> ok;
          _ -> erlang:cancel_timer(TRef, [{async, true}, {info, false}])
      end,
      Local = maps:merge(Local0, NextSettings),
      State1 = State0#http2_machine{settings_timer=undefined,
          local_settings=Local, next_settings=#{}},
      {ok, maps:fold(fun
          (header_table_size, MaxSize, State=#http2_machine{decode_state=DecodeState0}) ->
              DecodeState = cow_hpack:set_max_size(MaxSize, DecodeState0),
              State#http2_machine{decode_state=DecodeState};
          (initial_window_size, NewWindowSize, State) ->
              %% The old value comes from the local settings before the merge.
              OldWindowSize = maps:get(initial_window_size, Local0, 65535),
              streams_update_remote_window(State, NewWindowSize - OldWindowSize);
          (_, _, State) ->
              State
      end, State1, NextSettings)}.
  %% When we receive an ack to a SETTINGS frame we sent we need to update
  %% the remote stream windows for all active streams.
  %%
  %% Increment is added to the remote_window of every stream record in
  %% the state; it may be negative.
  streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) ->
      Shift = fun
          (Stream=#stream{remote_window=Window}) ->
              {true, Stream#stream{remote_window=Window + Increment}};
          (_) ->
              false
      end,
      State#http2_machine{streams=lists:filtermap(Shift, Streams0)}.
  %% PUSH_PROMISE frame.
  %% Convenience record to manipulate the tuple.
  %% The order of the fields matters: the record must have the same
  %% layout as the frame tuple it is matched against.
  -record(push_promise, {
      %% Stream the PUSH_PROMISE frame was received on.
      id :: cow_http2:streamid(),
      %% Whether the header block is complete (head_fin) or continues
      %% in CONTINUATION frames (head_nofin).
      head :: cow_http2:head_fin(),
      %% Stream ID reserved for the promised stream.
      promised_id :: cow_http2:streamid(),
      %% Header block fragment carried by the frame.
      data :: binary()
  }).
  %% Handle a received PUSH_PROMISE frame.
  %%
  %% The clauses reject, in order, with a connection error:
  %%  - any PUSH_PROMISE received while in server mode;
  %%  - a PUSH_PROMISE received while our enable_push setting is false;
  %%  - a promised stream ID not greater than the last remote stream ID;
  %%  - a frame on a stream for which ?IS_CLIENT_LOCAL does not hold.
  %% Otherwise the associated stream must exist with remote=idle. A complete
  %% header block (head_fin) is decoded right away; an incomplete one
  %% (head_nofin) switches the machine to the continuation state.
  push_promise_frame(_, State=#http2_machine{mode=server}) ->
      {error, {connection_error, protocol_error,
          'PUSH_PROMISE frames MUST NOT be sent by the client. (RFC7540 6.6)'},
          State};
  push_promise_frame(_, State=#http2_machine{local_settings=#{enable_push := false}}) ->
      {error, {connection_error, protocol_error,
          'PUSH_PROMISE frame received despite SETTINGS_ENABLE_PUSH set to 0. (RFC7540 6.6)'},
          State};
  push_promise_frame(#push_promise{promised_id=PromisedStreamID},
          State=#http2_machine{remote_streamid=RemoteStreamID})
          when PromisedStreamID =< RemoteStreamID ->
      {error, {connection_error, protocol_error,
          'PUSH_PROMISE frame received for a promised stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
          State};
  push_promise_frame(#push_promise{id=StreamID}, State)
          when not ?IS_CLIENT_LOCAL(StreamID) ->
      {error, {connection_error, protocol_error,
          'PUSH_PROMISE frame received on a server-initiated stream. (RFC7540 6.6)'},
          State};
  push_promise_frame(Frame=#push_promise{id=StreamID, head=IsHeadFin,
          promised_id=PromisedStreamID, data=HeaderData}, State) ->
      case stream_get(StreamID, State) of
          Stream=#stream{remote=idle} ->
              case IsHeadFin of
                  head_fin ->
                      %% The header block is decoded as a #headers{} frame
                      %% carrying the promised stream ID, with fin set.
                      headers_decode(#headers{id=PromisedStreamID,
                          fin=fin, head=IsHeadFin, data=HeaderData},
                          State, push_promise, Stream);
                  head_nofin ->
                      %% Keep the frame around until the CONTINUATION
                      %% frames complete the header block.
                      {ok, State#http2_machine{state={continuation, push_promise, Frame}}}
              end;
          _ ->
  %% @todo Check if the stream is lingering. If it is, decode the frame
  %% and do what? That's the big question and why it's not implemented yet.
  % However, an endpoint that
  % has sent RST_STREAM on the associated stream MUST handle PUSH_PROMISE
  % frames that might have been created before the RST_STREAM frame is
  % received and processed. (RFC7540 6.6)
              {error, {connection_error, stream_closed,
                  'PUSH_PROMISE frame received on a stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
                  State}
      end.
  %% Finish processing a validated PUSH_PROMISE header block: create the
  %% promised stream with its local side already finished, record it as
  %% the latest remote stream ID, store it, and return the push_promise
  %% event for the stream the frame was received on.
  push_promise_frame(#headers{id=PromisedStreamID},
          State0=#http2_machine{
              local_settings=#{initial_window_size := RemoteWindow},
              remote_settings=#{initial_window_size := LocalWindow}},
          #stream{id=StreamID}, PseudoHeaders=#{method := Method}, Headers) ->
      %% Keep the te header value of the promised request, if any.
      TE = case lists:keyfind(<<"te">>, 1, Headers) of
          false -> undefined;
          {_, Value} -> Value
      end,
      Promised = #stream{id=PromisedStreamID, method=Method,
          local=fin, local_window=LocalWindow,
          remote_window=RemoteWindow, te=TE},
      State1 = State0#http2_machine{remote_streamid=PromisedStreamID},
      State = stream_store(Promised, State1),
      {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, State}.
  %% PING frame.
  %%
  %% The state is returned unchanged.
  ping_frame({ping, _Opaque}, Machine) ->
      {ok, Machine}.
  %% Ack for a previously sent PING frame.
  %%
  %% The opaque value is not inspected and the state is returned unchanged.
  %%
  %% @todo Might want to check contents but probably a waste of time.
  ping_ack_frame({ping_ack, _Opaque}, Machine) ->
      {ok, Machine}.
  %% GOAWAY frame.
  %%
  %% The frame is handed back to the caller as is, along with the
  %% unchanged state.
  goaway_frame({goaway, _, _, _}=GoAway, Machine) ->
      {ok, GoAway, Machine}.
  %% WINDOW_UPDATE frame.
  %%
  %% The two-element tuple form targets the connection window, the
  %% three-element form a stream window. In both cases an increment that
  %% would take the window above 16#7fffffff is a flow_control_error, and
  %% a successful update is followed by an attempt to send queued data.
  %% Connection-wide WINDOW_UPDATE frame.
  window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow})
          when ConnWindow + Increment > 16#7fffffff ->
      {error, {connection_error, flow_control_error,
          'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'},
          State};
  window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow}) ->
      send_data(State#http2_machine{local_window=ConnWindow + Increment});
  %% Stream-specific WINDOW_UPDATE frame.
  window_update_frame({window_update, StreamID, _}, State=#http2_machine{mode=Mode,
          local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
          when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
          orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
      {error, {connection_error, protocol_error,
          'WINDOW_UPDATE frame received on a stream in idle state. (RFC7540 5.1)'},
          State};
  window_update_frame({window_update, StreamID, Increment},
          State0=#http2_machine{remote_lingering_streams=Lingering}) ->
      case stream_get(StreamID, State0) of
          %% Overflow on a stream window resets only that stream.
          #stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff ->
              stream_reset(StreamID, State0, flow_control_error,
                  'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)');
          Stream0 = #stream{local_window=StreamWindow} ->
              send_data(Stream0#stream{local_window=StreamWindow + Increment}, State0);
          undefined ->
              %% WINDOW_UPDATE frames may be received for a short period of time
              %% after a stream is closed. They must be ignored.
              %% The exception is a stream listed in remote_lingering_streams:
              %% for those the frame results in a stream_closed reset.
              case lists:member(StreamID, Lingering) of
                  false -> {ok, State0};
                  true -> stream_reset(StreamID, State0, stream_closed,
                      'WINDOW_UPDATE frame received after the stream was reset. (RFC7540 5.1)')
              end
      end.
  %% CONTINUATION frame.
  %% Convenience record to manipulate the tuple.
  %% The order of the fields matters: the record must have the same
  %% layout as the frame tuple it is matched against.
  -record(continuation, {
      %% Stream the CONTINUATION frame was received on.
      id :: cow_http2:streamid(),
      %% Whether this frame ends the header block (head_fin) or more
      %% CONTINUATION frames follow (head_nofin).
      head :: cow_http2:head_fin(),
      %% Header block fragment carried by the frame.
      data :: binary()
  }).
  %% Build the connection error returned for a CONTINUATION frame that
  %% does not follow a HEADERS or PUSH_PROMISE frame. The state is
  %% returned unchanged.
  unexpected_continuation_frame(#continuation{}, State) ->
      ConnError = {connection_error, protocol_error,
          'CONTINUATION frames MUST be preceded by a HEADERS or PUSH_PROMISE frame. (RFC7540 6.10)'},
      {error, ConnError, State}.
  %% Handle a CONTINUATION frame received while a header block is
  %% being accumulated (machine state {continuation, Type, Frame}).
  %%
  %% The frame must be on the same stream as the HEADERS or PUSH_PROMISE
  %% frame that opened the header block. Its fragment is appended to the
  %% data accumulated so far. When the frame ends the header block
  %% (head_fin) the machine goes back to the normal state and the complete
  %% block is decoded; otherwise the machine stays in the continuation
  %% state with the extended data.
  %%
  %% Any other combination results in a connection error.
  continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
          State=#http2_machine{state={continuation, Type,
              Frame=#headers{id=StreamID, data=HeaderFragment0}}}) ->
      HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
      headers_decode(Frame#headers{head=head_fin, data=HeaderData},
          State#http2_machine{state=normal}, Type, stream_get(StreamID, State));
  continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
          State=#http2_machine{state={continuation, Type, #push_promise{
              id=StreamID, promised_id=PromisedStreamID, data=HeaderFragment0}}}) ->
      HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
      %% The stream the PUSH_PROMISE was received on must be passed along,
      %% exactly as when the PUSH_PROMISE frame carries the whole header
      %% block: the final processing step needs its ID to build the
      %% push_promise event. Passing undefined here made a PUSH_PROMISE
      %% split across CONTINUATION frames crash with function_clause.
      headers_decode(#headers{id=PromisedStreamID, fin=fin, head=head_fin, data=HeaderData},
          State#http2_machine{state=normal}, Type, stream_get(StreamID, State));
  continuation_frame(#continuation{id=StreamID, data=HeaderFragment1},
          State=#http2_machine{state={continuation, Type, ContinuedFrame0}})
          %% Both #headers{} and #push_promise{} keep the stream ID in
          %% their first field, which is element 2 of the record tuple.
          when element(2, ContinuedFrame0) =:= StreamID ->
      ContinuedFrame = case ContinuedFrame0 of
          #headers{data=HeaderFragment0} ->
              HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
              ContinuedFrame0#headers{data=HeaderData};
          #push_promise{data=HeaderFragment0} ->
              HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
              ContinuedFrame0#push_promise{data=HeaderData}
      end,
      {ok, State#http2_machine{state={continuation, Type, ContinuedFrame}}};
  continuation_frame(_F, State) ->
      {error, {connection_error, protocol_error,
          'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
          State}.
  %% Ignored frames.
  %%
  %% A frame that is otherwise ignored is still a connection error when
  %% it arrives while a header block is being accumulated.

  -spec ignored_frame(State)
      -> {ok, State}
      | {error, {connection_error, protocol_error, atom()}, State}
      when State::http2_machine().
  ignored_frame(State) ->
      case State of
          #http2_machine{state={continuation, _, _}} ->
              {error, {connection_error, protocol_error,
                  'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
                  State};
          %% @todo It might be useful to error out when we receive
          %% too many unknown frames. (RFC7540 10.5)
          _ ->
              {ok, State}
      end.
  %% Timeouts.
  %%
  %% A timeout only results in a connection error when its reference is
  %% the one currently stored in the state for that timer; any other
  %% timeout is ignored.

  -spec timeout(preface_timeout | settings_timeout, reference(), State)
      -> {ok, State}
      | {error, {connection_error, cow_http2:error(), atom()}, State}
      when State::http2_machine().
  timeout(Name, TRef, State) ->
      %% TRef is already bound, so the record patterns below only match
      %% when the stored timer reference is that same reference.
      case {Name, State} of
          {preface_timeout, #http2_machine{preface_timer=TRef}} ->
              {error, {connection_error, protocol_error,
                  'The preface was not received in a reasonable amount of time.'},
                  State};
          {settings_timeout, #http2_machine{settings_timer=TRef}} ->
              {error, {connection_error, settings_timeout,
                  'The SETTINGS ack was not received within the configured time. (RFC7540 6.5.3)'},
                  State};
          _ ->
              {ok, State}
      end.
  %% Functions for sending a message header or body. Note that
  %% this module does not send data directly, instead it returns
  %% a value that can then be used to send the frames.

  %% Encode the headers to send on a stream whose local side is idle.
  %%
  %% The returned fin flag is nofin when idle was given, fin when the
  %% stream's method is HEAD, and the given value otherwise. The stream
  %% itself is stored with the local state exactly as given by the caller.
  -spec prepare_headers(cow_http2:streamid(), State, idle | cow_http2:fin(),
      pseudo_headers(), cow_http:headers())
      -> {ok, cow_http2:fin(), iodata(), State} when State::http2_machine().
  prepare_headers(StreamID, State=#http2_machine{encode_state=EncodeState0},
          IsFin0, PseudoHeaders, Headers0) ->
      Stream = #stream{method=Method, local=idle} = stream_get(StreamID, State),
      IsFin = if
          IsFin0 =:= idle -> nofin;
          Method =:= <<"HEAD">> -> fin;
          true -> IsFin0
      end,
      Http2Headers = remove_http11_headers(Headers0),
      {HeaderBlock, EncodeState} = cow_hpack:encode(
          merge_pseudo_headers(PseudoHeaders, Http2Headers), EncodeState0),
      NewState = stream_store(Stream#stream{local=IsFin0},
          State#http2_machine{encode_state=EncodeState}),
      {ok, IsFin, HeaderBlock, NewState}.
  %% Encode the headers of a push promise associated with a stream whose
  %% local side is idle, and create the promised stream.
  %%
  %% Returns {error, no_push} when the remote settings have enable_push
  %% set to false. Otherwise the promised stream takes the next local
  %% stream ID, is stored with its remote side finished and an expected
  %% size of 0, and the local stream ID counter advances by 2.
  -spec prepare_push_promise(cow_http2:streamid(), State, pseudo_headers(), cow_http:headers())
      -> {ok, cow_http2:streamid(), iodata(), State}
      | {error, no_push} when State::http2_machine().
  prepare_push_promise(_, #http2_machine{remote_settings=#{enable_push := false}}, _, _) ->
      {error, no_push};
  prepare_push_promise(StreamID, State=#http2_machine{encode_state=EncodeState0,
          local_settings=#{initial_window_size := RemoteWindow},
          remote_settings=#{initial_window_size := LocalWindow},
          local_streamid=LocalStreamID}, PseudoHeaders, Headers0) ->
      #stream{local=idle} = stream_get(StreamID, State),
      %% Keep the te header value of the promised request, if any.
      TE = case lists:keyfind(<<"te">>, 1, Headers0) of
          false -> undefined;
          {_, Value} -> Value
      end,
      {HeaderBlock, EncodeState} = cow_hpack:encode(
          merge_pseudo_headers(PseudoHeaders, remove_http11_headers(Headers0)),
          EncodeState0),
      Promised = #stream{id=LocalStreamID, method=maps:get(method, PseudoHeaders),
          remote=fin, remote_expected_size=0,
          local_window=LocalWindow, remote_window=RemoteWindow, te=TE},
      NextState = State#http2_machine{encode_state=EncodeState,
          local_streamid=LocalStreamID + 2},
      {ok, LocalStreamID, HeaderBlock, stream_store(Promised, NextState)}.
  %% Drop the HTTP/1.1 connection-specific headers from a header list.
  %%
  %% keep-alive, proxy-connection, transfer-encoding and upgrade are
  %% always removed. When a connection header is present it is removed
  %% too, along with every header it names.
  remove_http11_headers(Headers) ->
      AlwaysDropped = [
          <<"keep-alive">>,
          <<"proxy-connection">>,
          <<"transfer-encoding">>,
          <<"upgrade">>
      ],
      Dropped = case lists:keyfind(<<"connection">>, 1, Headers) of
          {_, ConnHd} ->
              %% We do not need to worry about any "close" header because
              %% that header name is reserved.
              Named = cow_http_hd:parse_connection(ConnHd),
              Named ++ [<<"connection">>|AlwaysDropped];
          false ->
              AlwaysDropped
      end,
      IsKept = fun({Name, _}) -> not lists:member(Name, Dropped) end,
      [Header || Header <- Headers, IsKept(Header)].
  %% Prepend the pseudo-headers map to a header list, as name/value pairs
  %% whose names start with a colon. An integer status is converted to
  %% its decimal binary form; any other value is used as is.
  %%
  %% The pairs end up in the reverse of the maps:to_list/1 order.
  merge_pseudo_headers(PseudoHeaders, Headers0) ->
      ToHeader = fun
          ({status, Status}) when is_integer(Status) ->
              {<<":status">>, integer_to_binary(Status)};
          ({Name, Value}) ->
              {iolist_to_binary([$:, atom_to_binary(Name, latin1)]), Value}
      end,
      Pairs = [ToHeader(Pair) || Pair <- maps:to_list(PseudoHeaders)],
      lists:reverse(Pairs, Headers0).
  %% Encode the trailers to send on a stream whose local side is nofin,
  %% and store the stream with its local side finished.
  -spec prepare_trailers(cow_http2:streamid(), State, cow_http:headers())
      -> {ok, iodata(), State} when State::http2_machine().
  prepare_trailers(StreamID, State=#http2_machine{encode_state=EncodeState0}, Trailers) ->
      Stream = #stream{local=nofin} = stream_get(StreamID, State),
      {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0),
      Finished = Stream#stream{local=fin},
      NewState = stream_store(Finished, State#http2_machine{encode_state=EncodeState}),
      {ok, HeaderBlock, NewState}.
  %% Send data, a file or trailers on a stream whose local side is nofin,
  %% or queue it when it cannot be sent now.
  %%
  %% Returns {ok, State} when everything was queued, or
  %% {send, [{StreamID, IsFin, Items}], State} with the items the caller
  %% must send now.
  -spec send_or_queue_data(cow_http2:streamid(), State, cow_http2:fin(), DataOrFileOrTrailers)
      -> {ok, State}
      | {send, [{cow_http2:streamid(), cow_http2:fin(), [DataOrFileOrTrailers]}], State}
      when State::http2_machine(), DataOrFileOrTrailers::
      {data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}.
  send_or_queue_data(StreamID, State0=#http2_machine{opts=Opts, local_window=ConnWindow},
          IsFin0, DataOrFileOrTrailers0) ->
      %% @todo Probably just ignore if the method was HEAD.
      Stream0 = #stream{
          local=nofin,
          local_window=StreamWindow,
          local_buffer_size=BufferSize,
          te=TE0
      } = stream_get(StreamID, State0),
      %% Trailers are replaced by an empty data item unless the te value
      %% stored in the stream allows them.
      DataOrFileOrTrailers = case DataOrFileOrTrailers0 of
          {trailers, _} ->
              %% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1).
              TE = try cow_http_hd:parse_te(TE0) of
                  {trailers, []} -> trailers;
                  _ -> no_trailers
              catch _:_ ->
                  %% If we can't parse the TE header, assume we can't send trailers.
                  no_trailers
              end,
              case TE of
                  trailers ->
                      DataOrFileOrTrailers0;
                  no_trailers ->
                      {data, <<>>}
              end;
          _ ->
              DataOrFileOrTrailers0
      end,
      %% Total size that would need to go out: what is already buffered
      %% plus the new item. Trailers count as 0.
      SendSize = case DataOrFileOrTrailers of
          {data, D} -> BufferSize + iolist_size(D);
          #sendfile{bytes=B} -> BufferSize + B;
          {trailers, _} -> 0
      end,
      MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
      if
          %% If we cannot send the data all at once and the window
          %% is smaller than we are willing to send at a minimum,
          %% we queue the data directly.
          (StreamWindow < MinSendSize)
                  andalso ((StreamWindow < SendSize) orelse (ConnWindow < SendSize)) ->
              {ok, stream_store(queue_data(Stream0, IsFin0, DataOrFileOrTrailers, in), State0)};
          true ->
              case send_or_queue_data(Stream0, State0, [], IsFin0, DataOrFileOrTrailers, in) of
                  {ok, Stream, State, []} ->
                      {ok, stream_store(Stream, State)};
                  %% The send accumulator is reversed before being
                  %% handed to the caller.
                  {ok, Stream=#stream{local=IsFin}, State, SendData} ->
                      {send, [{StreamID, IsFin, lists:reverse(SendData)}], stream_store(Stream, State)}
              end
      end.
  %% Internal data sending/queuing functions.

  %% Try to send queued data for all streams. Returns {ok, State} when
  %% nothing can be sent, {send, Send, State} otherwise.
  %%
  %% @todo Should we ever want to implement the PRIORITY mechanism,
  %% this would be the place to do it. Right now, we just go over
  %% all streams and send what we can until either everything is
  %% sent or we run out of space in the window.
  send_data(State0=#http2_machine{streams=Streams0}) ->
      {ok, Streams, State1, Send} = send_data_for_all_streams(Streams0, State0, [], []),
      State = State1#http2_machine{streams=Streams},
      case Send of
          [] -> {ok, State};
          _ -> {send, Send, State}
      end.
  %% Go over the streams in order and collect what can be sent for each.
  %%
  %% Acc accumulates the processed streams in reverse order and Send the
  %% {StreamID, IsFin, SendData} entries, most recent first. Returns
  %% {ok, Streams, State, Send} with Streams back in the original order.
  send_data_for_all_streams([], State, Acc, Send) ->
      {ok, lists:reverse(Acc), State, Send};
  %% While technically we should never get < 0 here, let's be on the safe side.
  %% The connection window is exhausted: the remaining streams are
  %% appended untouched.
  send_data_for_all_streams(Tail, State=#http2_machine{local_window=ConnWindow}, Acc, Send)
          when ConnWindow =< 0 ->
      {ok, lists:reverse(Acc, Tail), State, Send};
  %% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream.
  send_data_for_all_streams([Stream0|Tail], State0, Acc, Send) ->
      case send_data_for_one_stream(Stream0, State0, []) of
          %% Nothing to send for this stream; keep it as returned.
          {ok, Stream, State, []} ->
              send_data_for_all_streams(Tail, State, [Stream|Acc], Send);
          %% We need to remove the stream here because we do not use stream_store/2.
          %% A stream finished on both sides is left out of Acc.
          {ok, #stream{id=StreamID, local=fin, remote=fin}, State, SendData} ->
              send_data_for_all_streams(Tail, State, Acc,
                  [{StreamID, fin, SendData}|Send]);
          {ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} ->
              send_data_for_all_streams(Tail, State, [Stream|Acc],
                  [{StreamID, IsFin, SendData}|Send])
      end.
  %% Try to send queued data for a single stream, then store the stream.
  %% Returns {ok, State} when nothing can be sent, or
  %% {send, [{StreamID, IsFin, SendData}], State} otherwise.
  send_data(Stream0, State0) ->
      {ok, Stream, State, SendData} = send_data_for_one_stream(Stream0, State0, []),
      case SendData of
          [] ->
              {ok, stream_store(Stream, State)};
          _ ->
              #stream{id=StreamID, local=IsFin} = Stream,
              {send, [{StreamID, IsFin, SendData}], stream_store(Stream, State)}
      end.
  %% Collect as much buffered data as can currently be sent for one stream.
  %%
  %% SendAcc holds the items collected so far, most recent first; every
  %% terminating clause returns it reversed as the last element of
  %% {ok, Stream, State, SendData}. The clauses are order-dependent.

  %% The buffer is empty and trailers are pending: emit the trailers.
  send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer_size=0,
          local_trailers=Trailers}, State, SendAcc) when Trailers =/= undefined ->
      {ok, Stream, State, lists:reverse([{trailers, Trailers}|SendAcc])};
  %% The buffer holds no bytes and no trailers are pending.
  send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer=Q0, local_buffer_size=0},
          State, SendAcc) ->
      case queue:len(Q0) of
          0 ->
              {ok, Stream, State, lists:reverse(SendAcc)};
          1 ->
              %% We know there is a final empty data frame in the queue.
              %% We need to mark the stream as complete.
              {{value, {fin, 0, _}}, Q} = queue:out(Q0),
              {ok, Stream#stream{local=fin, local_buffer=Q}, State, lists:reverse(SendAcc)}
      end;
  %% Nothing more can be sent: a window is exhausted, the local side is
  %% already finished, or the buffer is empty.
  send_data_for_one_stream(Stream=#stream{local=IsFin, local_window=StreamWindow,
          local_buffer_size=BufferSize}, State=#http2_machine{local_window=ConnWindow}, SendAcc)
          when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
      {ok, Stream, State, lists:reverse(SendAcc)};
  send_data_for_one_stream(Stream0=#stream{local_window=StreamWindow,
          local_buffer=Q0, local_buffer_size=BufferSize},
          State0=#http2_machine{opts=Opts, local_window=ConnWindow}, SendAcc0) ->
      MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
      if
          %% If we cannot send the entire buffer at once and the window
          %% is smaller than we are willing to send at a minimum, do nothing.
          %%
          %% We only do this check the first time we go through this function;
          %% we want to send as much data as possible IF we send some.
          (SendAcc0 =:= []) andalso (StreamWindow < MinSendSize)
                  andalso ((StreamWindow < BufferSize) orelse (ConnWindow < BufferSize)) ->
              {ok, Stream0, State0, []};
          true ->
              %% We know there is an item in the queue.
              {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
              %% The item is taken out of the buffer before the attempt
              %% to send it; send_or_queue_data/6 is called with in_r.
              Stream1 = Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
              {ok, Stream, State, SendAcc}
                  = send_or_queue_data(Stream1, State0, SendAcc0, IsFin, Data, in_r),
              send_data_for_one_stream(Stream, State, SendAcc)
      end.
  %% Either add the given data to the send accumulator (in reverse order)
  %% or queue it in the stream's local buffer, depending on the windows.
  %%
  %% Trailers go out right away when the buffer is empty and are kept in
  %% the stream otherwise. Trailers are sent even if the window is empty.
  send_or_queue_data(Stream=#stream{local_buffer_size=0},
          State, SendAcc, fin, {trailers, Trailers}, _) ->
      {ok, Stream, State, [{trailers, Trailers}|SendAcc]};
  send_or_queue_data(Stream, State, SendAcc, fin, {trailers, Trailers}, _) ->
      {ok, Stream#stream{local_trailers=Trailers}, State, SendAcc};
  %% A window is exhausted: buffer the data.
  send_or_queue_data(Stream=#stream{local_window=StreamWindow},
          State=#http2_machine{local_window=ConnWindow},
          SendAcc, IsFin, Data, In)
          when (ConnWindow =< 0) orelse (StreamWindow =< 0) ->
      {ok, queue_data(Stream, IsFin, Data, In), State, SendAcc};
  %% Both windows are open: send as much as the windows and the maximum
  %% frame size allow, then recurse with the remainder, if any.
  send_or_queue_data(Stream=#stream{local_window=StreamWindow},
          State=#http2_machine{opts=Opts, remote_settings=RemoteSettings,
              local_window=ConnWindow}, SendAcc, IsFin, Data, In) ->
      WindowLimit = min(ConnWindow, StreamWindow),
      FrameLimit = min(
          maps:get(max_frame_size, RemoteSettings, 16384),
          maps:get(max_frame_size_sent, Opts, infinity)),
      MaxSendSize = min(WindowLimit, FrameLimit),
      case Data of
          %% The file range fits entirely.
          #sendfile{bytes=Bytes} when Bytes =< MaxSendSize ->
              {ok, Stream#stream{local=IsFin, local_window=StreamWindow - Bytes},
                  State#http2_machine{local_window=ConnWindow - Bytes},
                  [Data|SendAcc]};
          %% Send the first MaxSendSize bytes of the range and continue
          %% with the rest of it.
          #sendfile{offset=Offset, bytes=Bytes} ->
              Chunk = Data#sendfile{bytes=MaxSendSize},
              Rest = Data#sendfile{offset=Offset + MaxSendSize,
                  bytes=Bytes - MaxSendSize},
              send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize},
                  State#http2_machine{local_window=ConnWindow - MaxSendSize},
                  [Chunk|SendAcc], IsFin, Rest, In);
          {data, Iolist0} ->
              case iolist_size(Iolist0) of
                  %% The iolist fits entirely.
                  IolistSize when IolistSize =< MaxSendSize ->
                      {ok, Stream#stream{local=IsFin,
                              local_window=StreamWindow - IolistSize},
                          State#http2_machine{local_window=ConnWindow - IolistSize},
                          [{data, Iolist0}|SendAcc]};
                  %% Split the iolist and continue with what remains.
                  _ ->
                      {Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0),
                      send_or_queue_data(
                          Stream#stream{local_window=StreamWindow - MaxSendSize},
                          State#http2_machine{local_window=ConnWindow - MaxSendSize},
                          [{data, Iolist}|SendAcc], IsFin, {data, More}, In)
              end
      end.
  %% Add data to the stream's local buffer and update the buffered size.
  %%
  %% In is the name of the queue module function used for the insertion;
  %% send_data_for_one_stream/3 passes in_r when putting data back.
  queue_data(Stream=#stream{local_buffer=Queue0, local_buffer_size=BufferSize},
          IsFin, Data, In) ->
      DataSize = case Data of
          {data, Iolist} -> iolist_size(Iolist);
          {sendfile, _, Bytes, _} -> Bytes
      end,
      if
          %% Never queue non-final empty data frames.
          DataSize =:= 0, IsFin =:= nofin ->
              Stream;
          true ->
              Queue = queue:In({IsFin, DataSize, Data}, Queue0),
              Stream#stream{local_buffer=Queue,
                  local_buffer_size=BufferSize + DataSize}
      end.
  %% Public interface to update the flow control window.
  %%
  %% Two families of functions exist. The ensure_window functions rely on
  %% heuristics and only update the window when it is deemed necessary.
  %% The update_window functions always update the window.
  %%
  %% Call ensure_window both when requesting more data (for example when
  %% reading a request or response body) and when new data is received.
  %% Not doing so may result in the window being depleted.
  %%
  %% Whether the window gets updated, and by how much, is decided based
  %% on three options (margin, max and threshold) and on the Size argument.
  %% The increment returned may therefore be smaller than Size, while the
  %% total window allocated over many calls may end up larger than the
  %% initial Size. As a result the caller is responsible for making sure
  %% that Size is never lower than 0.
  %%
  %% This arity operates on the connection window. It returns ok when no
  %% update is needed, and {ok, Increment, State} otherwise.
  -spec ensure_window(non_neg_integer(), State)
      -> ok | {ok, pos_integer(), State} when State::http2_machine().
  ensure_window(Size, State=#http2_machine{opts=Opts, remote_window=Window0}) ->
      case ensure_window(Size, Window0, connection, Opts) of
          {ok, Increment} ->
              Window = Window0 + Increment,
              {ok, Increment, State#http2_machine{remote_window=Window}};
          ok ->
              ok
      end.
  %% Same as ensure_window/2 but for the window of the given stream.
  %%
  %% Returns ok when no update is needed, and {ok, Increment, State} with
  %% the updated stream stored in the state otherwise.
  -spec ensure_window(cow_http2:streamid(), non_neg_integer(), State)
      -> ok | {ok, pos_integer(), State} when State::http2_machine().
  ensure_window(StreamID, Size, State=#http2_machine{opts=Opts}) ->
      case stream_get(StreamID, State) of
          Stream = #stream{remote_window=Window0} ->
              case ensure_window(Size, Window0, stream, Opts) of
                  {ok, Increment} ->
                      Updated = Stream#stream{remote_window=Window0 + Increment},
                      {ok, Increment, stream_store(Updated, State)};
                  ok ->
                      ok
              end;
          %% To keep things simple, ensuring the window of a terminated
          %% stream is not an error. We behave as though the stream
          %% window was large enough.
          undefined ->
              ok
      end.
  %% Decide whether the window must be increased and by how much.
  %%
  %% Arguments are the expected Size, the current Window, the window Type
  %% (connection or stream) and the options map. Returns ok when no update
  %% is needed and {ok, Increment} with a strictly positive Increment
  %% otherwise.
  %%
  %% No need to update the window when we are not expecting data.
  ensure_window(0, _, _, _) ->
      ok;
  %% No need to update the window when it is already high enough.
  ensure_window(Size, Window, _, _) when Size =< Window ->
      ok;
  ensure_window(Size0, Window, Type, Opts) ->
      case Window > ensure_window_threshold(Type, Opts) of
          %% We do not update the window when it is higher than the threshold.
          true ->
              ok;
          false ->
              Size = Size0 + ensure_window_margin(Type, Opts),
              %% We cannot go above the maximum window size.
              Target = min(Size, ensure_window_max(Type, Opts)),
              %% The window may already be at or above the maximum (for
              %% example when the maximum is configured lower than the
              %% current window). The increment would then be zero or
              %% negative, and no update must be requested.
              case Target - Window of
                  Increment when Increment > 0 -> {ok, Increment};
                  _ -> ok
              end
      end.
  %% Margin added to the requested size. The option consulted depends on
  %% the window type; both default to the default initial window size.
  ensure_window_margin(Type, Opts) when Type =:= connection; Type =:= stream ->
      Key = case Type of
          connection -> connection_window_margin_size;
          stream -> stream_window_margin_size
      end,
      maps:get(Key, Opts, 65535).
  %% Maximum window size. When the option is not set, the max value
  %% allowed by the protocol is used.
  ensure_window_max(connection, Opts) ->
      case maps:find(max_connection_window_size, Opts) of
          {ok, MaxWindow} -> MaxWindow;
          error -> 16#7fffffff
      end;
  ensure_window_max(stream, Opts) ->
      case maps:find(max_stream_window_size, Opts) of
          {ok, MaxWindow} -> MaxWindow;
          error -> 16#7fffffff
      end.
  %% Window size above which no update is performed. When the option is
  %% not set, 10 times the default frame size is used.
  ensure_window_threshold(connection, Opts) ->
      maps:get(connection_window_update_threshold, Opts, 10 * 16384);
  ensure_window_threshold(stream, Opts) ->
      maps:get(stream_window_update_threshold, Opts, 10 * 16384).
  %% Unconditionally increase the connection's remote window by Size.
  -spec update_window(1..16#7fffffff, State)
      -> State when State::http2_machine().
  update_window(Size, State=#http2_machine{remote_window=Window0})
          when Size > 0 ->
      Window = Window0 + Size,
      State#http2_machine{remote_window=Window}.
  %% Unconditionally increase the remote window of the given stream by
  %% Size. The stream must exist: the match fails otherwise.
  -spec update_window(cow_http2:streamid(), 1..16#7fffffff, State)
      -> State when State::http2_machine().
  update_window(StreamID, Size, State)
          when Size > 0 ->
      #stream{remote_window=Window0} = Stream0 = stream_get(StreamID, State),
      Stream = Stream0#stream{remote_window=Window0 + Size},
      stream_store(Stream, State).
  %% Public interface to reset streams.
  %%
  %% The stream is removed from the list of streams and its ID is added to
  %% the local lingering streams. Returns {error, not_found} when no stream
  %% with this ID is in the list.
  -spec reset_stream(cow_http2:streamid(), State)
      -> {ok, State} | {error, not_found} when State::http2_machine().
  reset_stream(StreamID, State0=#http2_machine{streams=Streams0}) ->
      case lists:keytake(StreamID, #stream.id, Streams0) of
          false ->
              {error, not_found};
          {value, _, Streams} ->
              State = State0#http2_machine{streams=Streams},
              {ok, stream_linger(StreamID, State)}
      end.
  %% Sum of the local buffer sizes of all the streams in the state.
  -spec get_connection_local_buffer_size(http2_machine()) -> non_neg_integer().
  get_connection_local_buffer_size(#http2_machine{streams=Streams}) ->
      lists:sum([Stream#stream.local_buffer_size || Stream <- Streams]).
  %% Look up a local setting, falling back to its default value when it
  %% is not set.
  -spec get_local_setting(atom(), http2_machine()) -> atom() | integer().
  get_local_setting(Key, #http2_machine{local_settings=Settings}) ->
      %% The default is computed up front, so a key without a default
      %% value fails even when the setting is present.
      Default = default_setting_value(Key),
      case maps:find(Key, Settings) of
          {ok, Value} -> Value;
          error -> Default
      end.
  %% Value used for a setting that has not been set.
  %%
  %% Settings that default to a size.
  default_setting_value(header_table_size) -> 4096;
  default_setting_value(initial_window_size) -> 65535;
  default_setting_value(max_frame_size) -> 16384;
  %% Settings that have no limit by default.
  default_setting_value(max_concurrent_streams) -> infinity;
  default_setting_value(max_header_list_size) -> infinity;
  %% Settings that are switches.
  default_setting_value(enable_push) -> true;
  default_setting_value(enable_connect_protocol) -> false.
  %% Return the last known streamid received, as recorded in the state.
  %% This is meant to be used when sending a GOAWAY frame and closing
  %% the connection.
  -spec get_last_streamid(http2_machine()) -> cow_http2:streamid().
  get_last_streamid(State=#http2_machine{}) ->
      State#http2_machine.remote_streamid.
  %% Return the size of the local buffer of the given stream.
  %%
  %% When the stream is not in the state, {error, closed} is returned if
  %% its ID is a local one lower than local_streamid, or a non-local one
  %% lower than or equal to remote_streamid. {error, not_found} is
  %% returned in all other cases.
  -spec get_stream_local_buffer_size(cow_http2:streamid(), http2_machine())
      -> {ok, non_neg_integer()} | {error, not_found | closed}.
  get_stream_local_buffer_size(StreamID, State=#http2_machine{mode=Mode,
          local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
      case stream_get(StreamID, State) of
          undefined when ?IS_LOCAL(Mode, StreamID), StreamID < LocalStreamID ->
              {error, closed};
          undefined when (not ?IS_LOCAL(Mode, StreamID)), StreamID =< RemoteStreamID ->
              {error, closed};
          undefined ->
              {error, not_found};
          #stream{local_buffer_size=Size} ->
              {ok, Size}
      end.
  %% Return the local state of the given stream along with the state
  %% found at the rear of its queue (empty when nothing is queued).
  %%
  %% When the stream is not in the state, {error, closed} is returned if
  %% its ID is a local one lower than local_streamid, or a non-local one
  %% lower than or equal to remote_streamid. {error, not_found} is
  %% returned in all other cases.
  -spec get_stream_local_state(cow_http2:streamid(), http2_machine())
      -> {ok, idle | cow_http2:fin(), empty | nofin | fin} | {error, not_found | closed}.
  get_stream_local_state(StreamID, State=#http2_machine{mode=Mode,
          local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
      case stream_get(StreamID, State) of
          undefined when ?IS_LOCAL(Mode, StreamID), StreamID < LocalStreamID ->
              {error, closed};
          undefined when (not ?IS_LOCAL(Mode, StreamID)), StreamID =< RemoteStreamID ->
              {error, closed};
          undefined ->
              {error, not_found};
          %% No trailers are pending: look at the last item queued.
          #stream{local=IsFin, local_buffer=Queue, local_trailers=undefined} ->
              case queue:peek_r(Queue) of
                  {value, {IsQueueFin, _, _}} -> {ok, IsFin, IsQueueFin};
                  empty -> {ok, IsFin, empty}
              end;
          %% Trailers are pending, therefore the local state will be fin
          %% once the queue has been drained.
          #stream{local=IsFin} ->
              {ok, IsFin, fin}
      end.
  %% Return the remote state of the given stream.
  %%
  %% When the stream is not in the state, {error, closed} is returned if
  %% its ID is a local one lower than local_streamid, or a non-local one
  %% lower than or equal to remote_streamid. {error, not_found} is
  %% returned in all other cases.
  -spec get_stream_remote_state(cow_http2:streamid(), http2_machine())
      -> {ok, idle | cow_http2:fin()} | {error, not_found | closed}.
  get_stream_remote_state(StreamID, State=#http2_machine{mode=Mode,
          local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
      case stream_get(StreamID, State) of
          undefined when ?IS_LOCAL(Mode, StreamID), StreamID < LocalStreamID ->
              {error, closed};
          undefined when (not ?IS_LOCAL(Mode, StreamID)), StreamID =< RemoteStreamID ->
              {error, closed};
          undefined ->
              {error, not_found};
          #stream{remote=IsFin} ->
              {ok, IsFin}
      end.
  %% Query whether the stream is lingering, meaning its ID is found in
  %% either the local or the remote list of lingering streams.
  -spec is_lingering_stream(cow_http2:streamid(), http2_machine()) -> boolean().
  is_lingering_stream(StreamID, #http2_machine{
          local_lingering_streams=Local, remote_lingering_streams=Remote}) ->
      lists:member(StreamID, Local) orelse lists:member(StreamID, Remote).
  %% Stream-related functions.

  %% Find the stream with the given ID in the state. Returns undefined
  %% when there is none.
  stream_get(StreamID, #http2_machine{streams=Streams}) ->
      case lists:keyfind(StreamID, #stream.id, Streams) of
          Stream when is_tuple(Stream) -> Stream;
          false -> undefined
      end.
  %% Store the stream in the state, replacing any stream with the same ID.
  %% A stream that is fin on both the local and the remote side is removed
  %% from the state instead.
  stream_store(Stream=#stream{id=StreamID, local=Local, remote=Remote},
          State=#http2_machine{streams=Streams0}) ->
      Streams = case {Local, Remote} of
          {fin, fin} ->
              lists:keydelete(StreamID, #stream.id, Streams0);
          _ ->
              lists:keystore(StreamID, #stream.id, Streams0, Stream)
      end,
      State#http2_machine{streams=Streams}.
  %% Build the stream error return value and mark the stream as lingering.
  %%
  %% @todo Don't send an RST_STREAM if one was already sent.
  stream_reset(StreamID, State0, Reason, HumanReadable) ->
      State = stream_linger(StreamID, State0),
      Error = {stream_error, StreamID, Reason, HumanReadable},
      {error, Error, State}.
  %% Add the stream ID at the front of the local lingering streams.
  stream_linger(StreamID, State=#http2_machine{local_lingering_streams=Lingering0}) ->
      %% We only keep up to 100 streams in this state, dropping the ones
      %% at the end of the list. @todo Make it configurable?
      Lingering = lists:sublist([StreamID|Lingering0], 100),
      State#http2_machine{local_lingering_streams=Lingering}.