Browse Source

Merge branch 'websocket-fragments' of https://github.com/klaar/cowboy

Loïc Hoguin 13 years ago
parent
commit
e8ee64777c
3 changed files with 174 additions and 31 deletions
  1. 112 26
      src/cowboy_http_websocket.erl
  2. 2 2
      test/autobahn_SUITE_data/test.py
  3. 60 3
      test/ws_SUITE.erl

+ 112 - 26
src/cowboy_http_websocket.erl

@@ -46,6 +46,14 @@
 -type opcode() :: 0 | 1 | 2 | 8 | 9 | 10.
 -type mask_key() :: 0..16#ffffffff.
 
+%% The websocket_data/4 function may be called multiple times for a message.
+%% The websocket_dispatch/4 function is only called once for each message.
+-type frag_state() ::
+	undefined | %% no fragmentation has been seen.
+	{nofin, opcode()} | %% first fragment has been seen.
+	{nofin, opcode(), binary()} | %% first fragment has been unmasked.
+	{fin, opcode(), binary()}. %% last fragment has been seen.
+
 -record(state, {
 	version :: 0 | 7 | 8 | 13,
 	handler :: module(),
@@ -56,7 +64,8 @@
 	messages = undefined :: undefined | {atom(), atom(), atom()},
 	hibernate = false :: boolean(),
 	eop :: undefined | tuple(), %% hixie-76 specific.
-	origin = undefined :: undefined | binary() %% hixie-76 specific.
+	origin = undefined :: undefined | binary(), %% hixie-76 specific.
+	frag_state = undefined :: frag_state()
 }).
 
 %% @doc Upgrade a HTTP request to the WebSocket protocol.
@@ -273,31 +282,94 @@ websocket_data(State=#state{version=0, eop=EOP}, Req, HandlerState,
 websocket_data(State=#state{version=Version}, Req, HandlerState, Data)
 		when Version =/= 0, byte_size(Data) =:= 1 ->
 	handler_before_loop(State, Req, HandlerState, Data);
-%% hybi data frame.
-%% @todo Handle Fin.
-websocket_data(State=#state{version=Version}, Req, HandlerState, Data)
-		when Version =/= 0 ->
-	<< 1:1, 0:3, Opcode:4, Mask:1, PayloadLen:7, Rest/bits >> = Data,
-	case {PayloadLen, Rest} of
-		{126, _} when Opcode >= 8 -> websocket_close(
-			State, Req, HandlerState, {error, protocol});
-		{127, _} when Opcode >= 8 -> websocket_close(
-			State, Req, HandlerState, {error, protocol});
-		{126, << L:16, R/bits >>}  -> websocket_before_unmask(
-			State, Req, HandlerState, Data, R, Opcode, Mask, L);
-		{126, Rest} -> websocket_before_unmask(
-			State, Req, HandlerState, Data, Rest, Opcode, Mask, undefined);
-		{127, << 0:1, L:63, R/bits >>} -> websocket_before_unmask(
-			State, Req, HandlerState, Data, R, Opcode, Mask, L);
-		{127, Rest} -> websocket_before_unmask(
-			State, Req, HandlerState, Data, Rest, Opcode, Mask, undefined);
-		{PayloadLen, Rest} -> websocket_before_unmask(
-			State, Req, HandlerState, Data, Rest, Opcode, Mask, PayloadLen)
-	end;
-%% Something was wrong with the frame. Close the connection.
-websocket_data(State, Req, HandlerState, _Bad) ->
+%% 7 bit payload length prefix exists
+websocket_data(State, Req, HandlerState,
+		<< Fin:1, Rsv:3, Opcode:4, Mask:1, PayloadLen:7, Rest/bits >>
+		= Data) when PayloadLen < 126 ->
+	websocket_data(State, Req, HandlerState,
+		Fin, Rsv, Opcode, Mask, PayloadLen, Rest, Data);
+%% 7+16 bits payload length prefix exists
+websocket_data(State, Req, HandlerState,
+		<< Fin:1, Rsv:3, Opcode:4, Mask:1, 126:7, PayloadLen:16, Rest/bits >>
+		= Data) when PayloadLen > 125 ->
+	websocket_data(State, Req, HandlerState,
+		Fin, Rsv, Opcode, Mask, PayloadLen, Rest, Data);
+%% 7+16 bits payload length prefix missing
+websocket_data(State, Req, HandlerState,
+		<< _Fin:1, _Rsv:3, _Opcode:4, _Mask:1, 126:7, Rest/bits >>
+		= Data) when byte_size(Rest) < 2 ->
+	handler_before_loop(State, Req, HandlerState, Data);
+%% 7+64 bits payload length prefix exists
+websocket_data(State, Req, HandlerState,
+		<< Fin:1, Rsv:3, Opcode:4, Mask:1, 127:7, 0:1, PayloadLen:63,
+		   Rest/bits >> = Data) when PayloadLen > 16#FFFF ->
+	websocket_data(State, Req, HandlerState,
+		Fin, Rsv, Opcode, Mask, PayloadLen, Rest, Data);
+%% 7+64 bits payload length prefix missing
+websocket_data(State, Req, HandlerState,
+		<< _Fin:1, _Rsv:3, _Opcode:4, _Mask:1, 127:7, Rest/bits >>
+		= Data) when byte_size(Rest) < 8 ->
+	handler_before_loop(State, Req, HandlerState, Data);
+%% invalid payload length prefix.
+websocket_data(State, Req, HandlerState, _Data) ->
 	websocket_close(State, Req, HandlerState, {error, badframe}).
 
+
+-spec websocket_data(#state{}, #http_req{}, any(), non_neg_integer(),
+		non_neg_integer(), non_neg_integer(), non_neg_integer(),
+		non_neg_integer(), binary(), binary()) -> closed.
+%% A fragmented message MUST start a non-zero opcode.
+websocket_data(State=#state{frag_state=undefined}, Req, HandlerState,
+		_Fin=0, _Rsv=0, _Opcode=0, _Mask, _PayloadLen, _Rest, _Buffer) ->
+	websocket_close(State, Req, HandlerState, {error, badframe});
+%% A control message MUST NOT be fragmented.
+websocket_data(State, Req, HandlerState, _Fin=0, _Rsv=0, Opcode, _Mask,
+		_PayloadLen, _Rest, _Buffer) when Opcode >= 8 ->
+	websocket_close(State, Req, HandlerState, {error, badframe});
+%% The opcode is only included in the first message fragment.
+websocket_data(State=#state{frag_state=undefined}, Req, HandlerState,
+		_Fin=0, _Rsv=0, Opcode, Mask, PayloadLen, Rest, Data) ->
+	websocket_before_unmask(
+		State#state{frag_state={nofin, Opcode}}, Req, HandlerState,
+		Data, Rest, 0, Mask, PayloadLen);
+%% non-control opcode when expecting control message or next fragment.
+websocket_data(State=#state{frag_state={nofin, _, _}}, Req, HandlerState, _Fin,
+		_Rsv=0, Opcode, _Mask, _Ln, _Rest, _Data) when Opcode > 0, Opcode < 8 ->
+	websocket_close(State, Req, HandlerState, {error, badframe});
+%% If the first message fragment was incomplete, retry unmasking.
+websocket_data(State=#state{frag_state={nofin, Opcode}}, Req, HandlerState,
+		_Fin=0, _Rsv=0, Opcode, Mask, PayloadLen, Rest, Data) ->
+	websocket_before_unmask(
+		State#state{frag_state={nofin, Opcode}}, Req, HandlerState,
+		Data, Rest, 0, Mask, PayloadLen);
+%% if the opcode is zero and the fin flag is zero, unmask and await next.
+websocket_data(State=#state{frag_state={nofin, _Opcode, _Payloads}}, Req,
+		HandlerState, _Fin=0, _Rsv=0, _Opcode2=0, Mask, PayloadLen, Rest,
+		Data) ->
+	websocket_before_unmask(
+		State, Req, HandlerState, Data, Rest, 0, Mask, PayloadLen);
+%% when the last fragment is seen. Update the fragmentation status.
+websocket_data(State=#state{frag_state={nofin, Opcode, Payloads}}, Req,
+		HandlerState, _Fin=1, _Rsv=0, _Opcode=0, Mask, PayloadLen, Rest,
+		Data) ->
+	websocket_before_unmask(
+		State#state{frag_state={fin, Opcode, Payloads}},
+		Req, HandlerState, Data, Rest, 0, Mask, PayloadLen);
+%% control messages MUST NOT use 7+16 bits or 7+64 bits payload length prefixes
+websocket_data(State, Req, HandlerState, _Fin, _Rsv, Opcode, _Mask, PayloadLen,
+		_Rest, _Data) when Opcode >= 8, PayloadLen > 125 ->
+	 websocket_close(State, Req, HandlerState, {error, protocol});
+%% unfragmented message. unmask and dispatch the message.
+websocket_data(State=#state{version=Version}, Req, HandlerState, _Fin=1, _Rsv=0,
+		Opcode, Mask, PayloadLen, Rest, Data) when Version =/= 0 ->
+	websocket_before_unmask(
+			State, Req, HandlerState, Data, Rest, Opcode, Mask, PayloadLen);
+%% Something was wrong with the frame. Close the connection.
+websocket_data(State, Req, HandlerState, _Fin, _Rsv, _Opcode, _Mask,
+		_PayloadLen, _Rest, _Data) ->
+		websocket_close(State, Req, HandlerState, {error, badframe}).
+
+
 %% hybi routing depending on whether unmasking is needed.
 -spec websocket_before_unmask(#state{}, #http_req{}, any(), binary(),
 	binary(), opcode(), 0 | 1, non_neg_integer() | undefined) -> closed.
@@ -356,8 +428,22 @@ websocket_unmask(State, Req, HandlerState, RemainingData,
 %% hybi dispatching.
 -spec websocket_dispatch(#state{}, #http_req{}, any(), binary(),
 	opcode(), binary()) -> closed.
-%% @todo Fragmentation.
-%~ websocket_dispatch(State, Req, HandlerState, RemainingData, 0, Payload) ->
+%% First frame of a fragmented message unmasked. Expect intermediate or last.
+websocket_dispatch(State=#state{frag_state={nofin, Opcode}}, Req, HandlerState,
+		RemainingData, 0, Payload) ->
+	websocket_data(State#state{frag_state={nofin, Opcode, Payload}},
+		Req, HandlerState, RemainingData);
+%% Intermediate frame of a fragmented message unmasked. Add payload to buffer.
+websocket_dispatch(State=#state{frag_state={nofin, Opcode, Payloads}}, Req,
+		HandlerState, RemainingData, 0, Payload) ->
+	websocket_data(State#state{frag_state={nofin, Opcode,
+		<<Payloads/binary, Payload/binary>>}}, Req, HandlerState,
+		RemainingData);
+%% Last frame of a fragmented message unmasked. Dispatch to handler.
+websocket_dispatch(State=#state{frag_state={fin, Opcode, Payloads}}, Req,
+		HandlerState, RemainingData, 0, Payload) ->
+	websocket_dispatch(State#state{frag_state=undefined}, Req, HandlerState,
+		RemainingData, Opcode, <<Payloads/binary, Payload/binary>>);
 %% Text frame.
 websocket_dispatch(State, Req, HandlerState, RemainingData, 1, Payload) ->
 	handler_call(State, Req, HandlerState, RemainingData,

+ 2 - 2
test/autobahn_SUITE_data/test.py

@@ -29,7 +29,7 @@ def install_env(env):
     subprocess.check_call(["curl", "-sS", VIRTUALENV_URL, "-o", VIRTUALENV_BIN])
     subprocess.check_call(["python", VIRTUALENV_BIN, env])
     activate_env(env)
-    subprocess.check_call([PIP_BIN, "install", "Autobahn"])
+    subprocess.check_call([PIP_BIN, "install", "AutobahnTestSuite"])
 
 def client_config():
     """
@@ -54,7 +54,7 @@ def run_test(env, config):
     activate_env(env)
     from twisted.python import log
     from twisted.internet import reactor
-    from autobahn.fuzzing import FuzzingClientFactory
+    from autobahntestsuite.fuzzing import FuzzingClientFactory
     os.chdir(AB_TESTS_PRIV)
     log.startLogging(sys.stdout)
     fuzzer = FuzzingClientFactory(config)

+ 60 - 3
test/ws_SUITE.erl

@@ -19,7 +19,7 @@
 -export([all/0, groups/0, init_per_suite/1, end_per_suite/1,
 	init_per_group/2, end_per_group/2]). %% ct.
 -export([ws0/1, ws8/1, ws8_single_bytes/1, ws8_init_shutdown/1,
-	ws13/1, ws_timeout_hibernate/1]). %% ws.
+	ws13/1, ws_timeout_hibernate/1, ws_text_fragments/1]). %% ws.
 
 %% ct.
 
@@ -28,7 +28,7 @@ all() ->
 
 groups() ->
 	BaseTests = [ws0, ws8, ws8_single_bytes, ws8_init_shutdown, ws13,
-		ws_timeout_hibernate],
+		ws_timeout_hibernate, ws_text_fragments],
 	[{ws, [], BaseTests}].
 
 init_per_suite(Config) ->
@@ -60,7 +60,8 @@ init_dispatch() ->
 		{[<<"localhost">>], [
 			{[<<"websocket">>], websocket_handler, []},
 			{[<<"ws_timeout_hibernate">>], ws_timeout_hibernate_handler, []},
-			{[<<"ws_init_shutdown">>], websocket_handler_init_shutdown, []}
+			{[<<"ws_init_shutdown">>], websocket_handler_init_shutdown, []},
+			{[<<"ws_echo_handler">>], websocket_echo_handler, []}
 		]}
 	].
 
@@ -310,6 +311,62 @@ ws13(Config) ->
 	{error, closed} = gen_tcp:recv(Socket, 0, 6000),
 	ok.
 
+ws_text_fragments(Config) ->
+	{port, Port} = lists:keyfind(port, 1, Config),
+	{ok, Socket} = gen_tcp:connect("localhost", Port,
+		[binary, {active, false}, {packet, raw}]),
+	ok = gen_tcp:send(Socket, [
+		"GET /ws_echo_handler HTTP/1.1\r\n"
+		"Host: localhost\r\n"
+		"Connection: Upgrade\r\n"
+		"Upgrade: websocket\r\n"
+		"Sec-WebSocket-Origin: http://localhost\r\n"
+		"Sec-WebSocket-Version: 8\r\n"
+		"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
+		"\r\n"]),
+	{ok, Handshake} = gen_tcp:recv(Socket, 0, 6000),
+	{ok, {http_response, {1, 1}, 101, "Switching Protocols"}, Rest}
+		= erlang:decode_packet(http, Handshake, []),
+	[Headers, <<>>] = websocket_headers(
+		erlang:decode_packet(httph, Rest, []), []),
+	{'Connection', "Upgrade"} = lists:keyfind('Connection', 1, Headers),
+	{'Upgrade', "websocket"} = lists:keyfind('Upgrade', 1, Headers),
+	{"sec-websocket-accept", "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="}
+		= lists:keyfind("sec-websocket-accept", 1, Headers),
+
+	ok = gen_tcp:send(Socket, [
+		<< 0:1, 0:3, 1:4, 1:1, 5:7 >>,
+		<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
+		<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>]),
+	ok = gen_tcp:send(Socket, [
+		<< 1:1, 0:3, 0:4, 1:1, 5:7 >>,
+		<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
+		<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>]),
+	{ok, << 1:1, 0:3, 1:4, 0:1, 10:7, "HelloHello" >>}
+		= gen_tcp:recv(Socket, 0, 6000),
+
+	ok = gen_tcp:send(Socket, [
+		%% #1
+		<< 0:1, 0:3, 1:4, 1:1, 5:7 >>,
+		<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
+		<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>,
+		%% #2
+		<< 0:1, 0:3, 0:4, 1:1, 5:7 >>,
+		<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
+		<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>,
+		%% #3
+		<< 1:1, 0:3, 0:4, 1:1, 5:7 >>,
+		<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
+		<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>]),
+	{ok, << 1:1, 0:3, 1:4, 0:1, 15:7, "HelloHelloHello" >>}
+		= gen_tcp:recv(Socket, 0, 6000),
+
+	ok = gen_tcp:send(Socket, << 1:1, 0:3, 8:4, 0:8 >>), %% close
+	{ok, << 1:1, 0:3, 8:4, 0:8 >>} = gen_tcp:recv(Socket, 0, 6000),
+	{error, closed} = gen_tcp:recv(Socket, 0, 6000),
+	ok.
+
+
 websocket_headers({ok, http_eoh, Rest}, Acc) ->
 	[Acc, Rest];
 websocket_headers({ok, {http_header, _I, Key, _R, Value}, Rest}, Acc) ->