Browse Source

Use a map for streams in cow_http2_machine

Loïc Hoguin 5 years ago
parent
commit
e7852121a0
1 changed files with 30 additions and 32 deletions
  1. 30 32
      src/cow_http2_machine.erl

+ 30 - 32
src/cow_http2_machine.erl

@@ -146,7 +146,7 @@
 
 
 	%% Currently active HTTP/2 streams. Streams may be initiated either
 	%% Currently active HTTP/2 streams. Streams may be initiated either
 	%% by the client or by the server through PUSH_PROMISE frames.
 	%% by the client or by the server through PUSH_PROMISE frames.
-	streams = [] :: [stream()],
+	streams = #{} :: #{cow_http2:streamid() => stream()},
 
 
 	%% HTTP/2 streams that have recently been reset locally.
 	%% HTTP/2 streams that have recently been reset locally.
 	%% We are expected to keep receiving additional frames after
 	%% We are expected to keep receiving additional frames after
@@ -519,7 +519,7 @@ headers_enforce_concurrency_limit(Frame=#headers{id=StreamID},
 	MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity),
 	MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity),
 	%% Using < is correct because this new stream is not included
 	%% Using < is correct because this new stream is not included
 	%% in the Streams variable yet and so we'll end up with +1 stream.
 	%% in the Streams variable yet and so we'll end up with +1 stream.
-	case length(Streams) < MaxConcurrentStreams of
+	case map_size(Streams) < MaxConcurrentStreams of
 		true ->
 		true ->
 			headers_pseudo_headers(Frame, State, Type, Stream, Headers);
 			headers_pseudo_headers(Frame, State, Type, Stream, Headers);
 		false ->
 		false ->
@@ -803,7 +803,7 @@ rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode,
 		State};
 		State};
 rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{
 rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{
 		streams=Streams0, remote_lingering_streams=Lingering0}) ->
 		streams=Streams0, remote_lingering_streams=Lingering0}) ->
-	Streams = lists:keydelete(StreamID, #stream.id, Streams0),
+	Streams = maps:remove(StreamID, Streams0),
 	%% We only keep up to 10 streams in this state. @todo Make it configurable?
 	%% We only keep up to 10 streams in this state. @todo Make it configurable?
 	Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)],
 	Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)],
 	{ok, {rst_stream, StreamID, Reason},
 	{ok, {rst_stream, StreamID, Reason},
@@ -839,9 +839,9 @@ settings_frame(_F, State) ->
 %% the local stream windows for all active streams and perhaps
 %% the local stream windows for all active streams and perhaps
 %% resume sending data.
 %% resume sending data.
 streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) ->
 streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) ->
-	Streams = [
+	Streams = maps:map(fun(_, S=#stream{local_window=StreamWindow}) ->
 		S#stream{local_window=StreamWindow + Increment}
 		S#stream{local_window=StreamWindow + Increment}
-	|| S=#stream{local_window=StreamWindow} <- Streams0],
+	end, Streams0),
 	State#http2_machine{streams=Streams}.
 	State#http2_machine{streams=Streams}.
 
 
 %% Ack for a previously sent SETTINGS frame.
 %% Ack for a previously sent SETTINGS frame.
@@ -869,9 +869,9 @@ settings_ack_frame(State0=#http2_machine{settings_timer=TRef,
 %% When we receive an ack to a SETTINGS frame we sent we need to update
 %% When we receive an ack to a SETTINGS frame we sent we need to update
 %% the remote stream windows for all active streams.
 %% the remote stream windows for all active streams.
 streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) ->
 streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) ->
-	Streams = [
+	Streams = maps:map(fun(_, S=#stream{remote_window=StreamWindow}) ->
 		S#stream{remote_window=StreamWindow + Increment}
 		S#stream{remote_window=StreamWindow + Increment}
-	|| S=#stream{remote_window=StreamWindow} <- Streams0],
+	end, Streams0),
 	State#http2_machine{streams=Streams}.
 	State#http2_machine{streams=Streams}.
 
 
 %% PUSH_PROMISE frame.
 %% PUSH_PROMISE frame.
@@ -1213,31 +1213,33 @@ send_or_queue_data(StreamID, State0=#http2_machine{opts=Opts, local_window=ConnW
 %% all streams and send what we can until either everything is
 %% all streams and send what we can until either everything is
 %% sent or we run out of space in the window.
 %% sent or we run out of space in the window.
 send_data(State0=#http2_machine{streams=Streams0}) ->
 send_data(State0=#http2_machine{streams=Streams0}) ->
-	case send_data_for_all_streams(Streams0, State0, [], []) of
+	Iterator = maps:iterator(Streams0),
+	case send_data_for_all_streams(maps:next(Iterator), Streams0, State0, []) of
 		{ok, Streams, State, []} ->
 		{ok, Streams, State, []} ->
 			{ok, State#http2_machine{streams=Streams}};
 			{ok, State#http2_machine{streams=Streams}};
 		{ok, Streams, State, Send} ->
 		{ok, Streams, State, Send} ->
 			{send, Send, State#http2_machine{streams=Streams}}
 			{send, Send, State#http2_machine{streams=Streams}}
 	end.
 	end.
 
 
-send_data_for_all_streams([], State, Acc, Send) ->
-	{ok, lists:reverse(Acc), State, Send};
+send_data_for_all_streams(none, Streams, State, Send) ->
+	{ok, Streams, State, Send};
 %% While technically we should never get < 0 here, let's be on the safe side.
 %% While technically we should never get < 0 here, let's be on the safe side.
-send_data_for_all_streams(Tail, State=#http2_machine{local_window=ConnWindow}, Acc, Send)
+send_data_for_all_streams(_, Streams, State=#http2_machine{local_window=ConnWindow}, Send)
 		when ConnWindow =< 0 ->
 		when ConnWindow =< 0 ->
-	{ok, lists:reverse(Acc, Tail), State, Send};
+	{ok, Streams, State, Send};
 %% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream.
 %% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream.
-send_data_for_all_streams([Stream0|Tail], State0, Acc, Send) ->
+send_data_for_all_streams({StreamID, Stream0, Iterator}, Streams, State0, Send) ->
 	case send_data_for_one_stream(Stream0, State0, []) of
 	case send_data_for_one_stream(Stream0, State0, []) of
 		{ok, Stream, State, []} ->
 		{ok, Stream, State, []} ->
-			send_data_for_all_streams(Tail, State, [Stream|Acc], Send);
+			send_data_for_all_streams(maps:next(Iterator),
+				Streams#{StreamID => Stream}, State, Send);
 		%% We need to remove the stream here because we do not use stream_store/2.
 		%% We need to remove the stream here because we do not use stream_store/2.
-		{ok, #stream{id=StreamID, local=fin, remote=fin}, State, SendData} ->
-			send_data_for_all_streams(Tail, State, Acc,
-				[{StreamID, fin, SendData}|Send]);
-		{ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} ->
-			send_data_for_all_streams(Tail, State, [Stream|Acc],
-				[{StreamID, IsFin, SendData}|Send])
+		{ok, #stream{local=fin, remote=fin}, State, SendData} ->
+			send_data_for_all_streams(maps:next(Iterator),
+				maps:remove(StreamID, Streams), State, [{StreamID, fin, SendData}|Send]);
+		{ok, Stream=#stream{local=IsFin}, State, SendData} ->
+			send_data_for_all_streams(maps:next(Iterator),
+				Streams#{StreamID => Stream}, State, [{StreamID, IsFin, SendData}|Send])
 	end.
 	end.
 
 
 send_data(Stream0, State0) ->
 send_data(Stream0, State0) ->
@@ -1460,10 +1462,10 @@ update_window(StreamID, Size, State)
 -spec reset_stream(cow_http2:streamid(), State)
 -spec reset_stream(cow_http2:streamid(), State)
 	-> {ok, State} | {error, not_found} when State::http2_machine().
 	-> {ok, State} | {error, not_found} when State::http2_machine().
 reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
 reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
-	case lists:keytake(StreamID, #stream.id, Streams0) of
-		{value, _, Streams} ->
+	case maps:take(StreamID, Streams0) of
+		{_, Streams} ->
 			{ok, stream_linger(StreamID, State#http2_machine{streams=Streams})};
 			{ok, stream_linger(StreamID, State#http2_machine{streams=Streams})};
-		false ->
+		error ->
 			{error, not_found}
 			{error, not_found}
 	end.
 	end.
 
 
@@ -1471,7 +1473,7 @@ reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
 
 
 -spec get_connection_local_buffer_size(http2_machine()) -> non_neg_integer().
 -spec get_connection_local_buffer_size(http2_machine()) -> non_neg_integer().
 get_connection_local_buffer_size(#http2_machine{streams=Streams}) ->
 get_connection_local_buffer_size(#http2_machine{streams=Streams}) ->
-	lists:foldl(fun(#stream{local_buffer_size=Size}, Acc) ->
+	maps:fold(fun(_, #stream{local_buffer_size=Size}, Acc) ->
 		Acc + Size
 		Acc + Size
 	end, 0, Streams).
 	end, 0, Streams).
 
 
@@ -1564,19 +1566,15 @@ is_lingering_stream(StreamID, #http2_machine{
 %% Stream-related functions.
 %% Stream-related functions.
 
 
 stream_get(StreamID, #http2_machine{streams=Streams}) ->
 stream_get(StreamID, #http2_machine{streams=Streams}) ->
-	case lists:keyfind(StreamID, #stream.id, Streams) of
-		false -> undefined;
-		Stream -> Stream
-	end.
+	maps:get(StreamID, Streams, undefined).
 
 
 stream_store(#stream{id=StreamID, local=fin, remote=fin},
 stream_store(#stream{id=StreamID, local=fin, remote=fin},
 		State=#http2_machine{streams=Streams0}) ->
 		State=#http2_machine{streams=Streams0}) ->
-	Streams = lists:keydelete(StreamID, #stream.id, Streams0),
+	Streams = maps:remove(StreamID, Streams0),
 	State#http2_machine{streams=Streams};
 	State#http2_machine{streams=Streams};
 stream_store(Stream=#stream{id=StreamID},
 stream_store(Stream=#stream{id=StreamID},
-		State=#http2_machine{streams=Streams0}) ->
-	Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream),
-	State#http2_machine{streams=Streams}.
+		State=#http2_machine{streams=Streams}) ->
+	State#http2_machine{streams=Streams#{StreamID => Stream}}.
 
 
 %% @todo Don't send an RST_STREAM if one was already sent.
 %% @todo Don't send an RST_STREAM if one was already sent.
 stream_reset(StreamID, State, Reason, HumanReadable) ->
 stream_reset(StreamID, State, Reason, HumanReadable) ->