|
@@ -146,7 +146,7 @@
|
|
|
|
|
|
%% Currently active HTTP/2 streams. Streams may be initiated either
|
|
%% Currently active HTTP/2 streams. Streams may be initiated either
|
|
%% by the client or by the server through PUSH_PROMISE frames.
|
|
%% by the client or by the server through PUSH_PROMISE frames.
|
|
- streams = [] :: [stream()],
|
|
|
|
|
|
+ streams = #{} :: #{cow_http2:streamid() => stream()},
|
|
|
|
|
|
%% HTTP/2 streams that have recently been reset locally.
|
|
%% HTTP/2 streams that have recently been reset locally.
|
|
%% We are expected to keep receiving additional frames after
|
|
%% We are expected to keep receiving additional frames after
|
|
@@ -519,7 +519,7 @@ headers_enforce_concurrency_limit(Frame=#headers{id=StreamID},
|
|
MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity),
|
|
MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity),
|
|
%% Using < is correct because this new stream is not included
|
|
%% Using < is correct because this new stream is not included
|
|
%% in the Streams variable yet and so we'll end up with +1 stream.
|
|
%% in the Streams variable yet and so we'll end up with +1 stream.
|
|
- case length(Streams) < MaxConcurrentStreams of
|
|
|
|
|
|
+ case map_size(Streams) < MaxConcurrentStreams of
|
|
true ->
|
|
true ->
|
|
headers_pseudo_headers(Frame, State, Type, Stream, Headers);
|
|
headers_pseudo_headers(Frame, State, Type, Stream, Headers);
|
|
false ->
|
|
false ->
|
|
@@ -803,7 +803,7 @@ rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode,
|
|
State};
|
|
State};
|
|
rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{
|
|
rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{
|
|
streams=Streams0, remote_lingering_streams=Lingering0}) ->
|
|
streams=Streams0, remote_lingering_streams=Lingering0}) ->
|
|
- Streams = lists:keydelete(StreamID, #stream.id, Streams0),
|
|
|
|
|
|
+ Streams = maps:remove(StreamID, Streams0),
|
|
%% We only keep up to 10 streams in this state. @todo Make it configurable?
|
|
%% We only keep up to 10 streams in this state. @todo Make it configurable?
|
|
Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)],
|
|
Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)],
|
|
{ok, {rst_stream, StreamID, Reason},
|
|
{ok, {rst_stream, StreamID, Reason},
|
|
@@ -839,9 +839,9 @@ settings_frame(_F, State) ->
|
|
%% the local stream windows for all active streams and perhaps
|
|
%% the local stream windows for all active streams and perhaps
|
|
%% resume sending data.
|
|
%% resume sending data.
|
|
streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) ->
|
|
streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) ->
|
|
- Streams = [
|
|
|
|
|
|
+ Streams = maps:map(fun(_, S=#stream{local_window=StreamWindow}) ->
|
|
S#stream{local_window=StreamWindow + Increment}
|
|
S#stream{local_window=StreamWindow + Increment}
|
|
- || S=#stream{local_window=StreamWindow} <- Streams0],
|
|
|
|
|
|
+ end, Streams0),
|
|
State#http2_machine{streams=Streams}.
|
|
State#http2_machine{streams=Streams}.
|
|
|
|
|
|
%% Ack for a previously sent SETTINGS frame.
|
|
%% Ack for a previously sent SETTINGS frame.
|
|
@@ -869,9 +869,9 @@ settings_ack_frame(State0=#http2_machine{settings_timer=TRef,
|
|
%% When we receive an ack to a SETTINGS frame we sent we need to update
|
|
%% When we receive an ack to a SETTINGS frame we sent we need to update
|
|
%% the remote stream windows for all active streams.
|
|
%% the remote stream windows for all active streams.
|
|
streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) ->
|
|
streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) ->
|
|
- Streams = [
|
|
|
|
|
|
+ Streams = maps:map(fun(_, S=#stream{remote_window=StreamWindow}) ->
|
|
S#stream{remote_window=StreamWindow + Increment}
|
|
S#stream{remote_window=StreamWindow + Increment}
|
|
- || S=#stream{remote_window=StreamWindow} <- Streams0],
|
|
|
|
|
|
+ end, Streams0),
|
|
State#http2_machine{streams=Streams}.
|
|
State#http2_machine{streams=Streams}.
|
|
|
|
|
|
%% PUSH_PROMISE frame.
|
|
%% PUSH_PROMISE frame.
|
|
@@ -1213,11 +1213,11 @@ send_or_queue_data(StreamID, State0=#http2_machine{opts=Opts, local_window=ConnW
|
|
%% all streams and send what we can until either everything is
|
|
%% all streams and send what we can until either everything is
|
|
%% sent or we run out of space in the window.
|
|
%% sent or we run out of space in the window.
|
|
send_data(State0=#http2_machine{streams=Streams0}) ->
|
|
send_data(State0=#http2_machine{streams=Streams0}) ->
|
|
- case send_data_for_all_streams(Streams0, State0, [], []) of
|
|
|
|
|
|
+ case send_data_for_all_streams(maps:to_list(Streams0), State0, [], []) of
|
|
{ok, Streams, State, []} ->
|
|
{ok, Streams, State, []} ->
|
|
- {ok, State#http2_machine{streams=Streams}};
|
|
|
|
|
|
+ {ok, State#http2_machine{streams=maps:from_list(Streams)}};
|
|
{ok, Streams, State, Send} ->
|
|
{ok, Streams, State, Send} ->
|
|
- {send, Send, State#http2_machine{streams=Streams}}
|
|
|
|
|
|
+ {send, Send, State#http2_machine{streams=maps:from_list(Streams)}}
|
|
end.
|
|
end.
|
|
|
|
|
|
send_data_for_all_streams([], State, Acc, Send) ->
|
|
send_data_for_all_streams([], State, Acc, Send) ->
|
|
@@ -1227,16 +1227,16 @@ send_data_for_all_streams(Tail, State=#http2_machine{local_window=ConnWindow}, A
|
|
when ConnWindow =< 0 ->
|
|
when ConnWindow =< 0 ->
|
|
{ok, lists:reverse(Acc, Tail), State, Send};
|
|
{ok, lists:reverse(Acc, Tail), State, Send};
|
|
%% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream.
|
|
%% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream.
|
|
-send_data_for_all_streams([Stream0|Tail], State0, Acc, Send) ->
|
|
|
|
|
|
+send_data_for_all_streams([{StreamID, Stream0}|Tail], State0, Acc, Send) ->
|
|
case send_data_for_one_stream(Stream0, State0, []) of
|
|
case send_data_for_one_stream(Stream0, State0, []) of
|
|
{ok, Stream, State, []} ->
|
|
{ok, Stream, State, []} ->
|
|
- send_data_for_all_streams(Tail, State, [Stream|Acc], Send);
|
|
|
|
|
|
+ send_data_for_all_streams(Tail, State, [{StreamID, Stream}|Acc], Send);
|
|
%% We need to remove the stream here because we do not use stream_store/2.
|
|
%% We need to remove the stream here because we do not use stream_store/2.
|
|
{ok, #stream{id=StreamID, local=fin, remote=fin}, State, SendData} ->
|
|
{ok, #stream{id=StreamID, local=fin, remote=fin}, State, SendData} ->
|
|
send_data_for_all_streams(Tail, State, Acc,
|
|
send_data_for_all_streams(Tail, State, Acc,
|
|
[{StreamID, fin, SendData}|Send]);
|
|
[{StreamID, fin, SendData}|Send]);
|
|
{ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} ->
|
|
{ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} ->
|
|
- send_data_for_all_streams(Tail, State, [Stream|Acc],
|
|
|
|
|
|
+ send_data_for_all_streams(Tail, State, [{StreamID, Stream}|Acc],
|
|
[{StreamID, IsFin, SendData}|Send])
|
|
[{StreamID, IsFin, SendData}|Send])
|
|
end.
|
|
end.
|
|
|
|
|
|
@@ -1460,10 +1460,10 @@ update_window(StreamID, Size, State)
|
|
-spec reset_stream(cow_http2:streamid(), State)
|
|
-spec reset_stream(cow_http2:streamid(), State)
|
|
-> {ok, State} | {error, not_found} when State::http2_machine().
|
|
-> {ok, State} | {error, not_found} when State::http2_machine().
|
|
reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
|
|
reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
|
|
- case lists:keytake(StreamID, #stream.id, Streams0) of
|
|
|
|
- {value, _, Streams} ->
|
|
|
|
|
|
+ case maps:take(StreamID, #stream.id, Streams0) of
|
|
|
|
+ {_, Streams} ->
|
|
{ok, stream_linger(StreamID, State#http2_machine{streams=Streams})};
|
|
{ok, stream_linger(StreamID, State#http2_machine{streams=Streams})};
|
|
- false ->
|
|
|
|
|
|
+ error ->
|
|
{error, not_found}
|
|
{error, not_found}
|
|
end.
|
|
end.
|
|
|
|
|
|
@@ -1471,7 +1471,7 @@ reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
|
|
|
|
|
|
-spec get_connection_local_buffer_size(http2_machine()) -> non_neg_integer().
|
|
-spec get_connection_local_buffer_size(http2_machine()) -> non_neg_integer().
|
|
get_connection_local_buffer_size(#http2_machine{streams=Streams}) ->
|
|
get_connection_local_buffer_size(#http2_machine{streams=Streams}) ->
|
|
- lists:foldl(fun(#stream{local_buffer_size=Size}, Acc) ->
|
|
|
|
|
|
+ maps:fold(fun(_, #stream{local_buffer_size=Size}, Acc) ->
|
|
Acc + Size
|
|
Acc + Size
|
|
end, 0, Streams).
|
|
end, 0, Streams).
|
|
|
|
|
|
@@ -1564,19 +1564,15 @@ is_lingering_stream(StreamID, #http2_machine{
|
|
%% Stream-related functions.
|
|
%% Stream-related functions.
|
|
|
|
|
|
stream_get(StreamID, #http2_machine{streams=Streams}) ->
|
|
stream_get(StreamID, #http2_machine{streams=Streams}) ->
|
|
- case lists:keyfind(StreamID, #stream.id, Streams) of
|
|
|
|
- false -> undefined;
|
|
|
|
- Stream -> Stream
|
|
|
|
- end.
|
|
|
|
|
|
+ maps:get(StreamID, Streams, undefined).
|
|
|
|
|
|
stream_store(#stream{id=StreamID, local=fin, remote=fin},
|
|
stream_store(#stream{id=StreamID, local=fin, remote=fin},
|
|
State=#http2_machine{streams=Streams0}) ->
|
|
State=#http2_machine{streams=Streams0}) ->
|
|
- Streams = lists:keydelete(StreamID, #stream.id, Streams0),
|
|
|
|
|
|
+ Streams = maps:remove(StreamID, Streams0),
|
|
State#http2_machine{streams=Streams};
|
|
State#http2_machine{streams=Streams};
|
|
stream_store(Stream=#stream{id=StreamID},
|
|
stream_store(Stream=#stream{id=StreamID},
|
|
- State=#http2_machine{streams=Streams0}) ->
|
|
|
|
- Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream),
|
|
|
|
- State#http2_machine{streams=Streams}.
|
|
|
|
|
|
+ State=#http2_machine{streams=Streams}) ->
|
|
|
|
+ State#http2_machine{streams=Streams#{StreamID => Stream}}.
|
|
|
|
|
|
%% @todo Don't send an RST_STREAM if one was already sent.
|
|
%% @todo Don't send an RST_STREAM if one was already sent.
|
|
stream_reset(StreamID, State, Reason, HumanReadable) ->
|
|
stream_reset(StreamID, State, Reason, HumanReadable) ->
|