Browse Source

Add the {active, boolean()} Websocket command

This command is currently not documented. It allows disabling
the reading of incoming data from the socket, and can be used
as a poor man's flow control.
Loïc Hoguin 6 years ago
parent
commit
f810d8dd64

+ 4 - 0
doc/src/guide/migrating_from_2.4.asciidoc

@@ -31,6 +31,10 @@ also been worked on.
   sent or commands yet to be introduced. New commands will
   be available only through this new interface.
 
+* Add the `{active, boolean()}` Websocket handler command.
+  It allows disabling reading from the socket when `false`
+  is returned. `true` reenables reading from the socket.
+
 * Add the protocol option `logger` that allows configuring
   which logger module will be used. The logger module must
   follow the interface of the new `logger` module in Erlang/OTP 21,

+ 5 - 0
src/cowboy_websocket.erl

@@ -77,6 +77,7 @@
 	ref :: ranch:ref(),
 	socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined,
 	transport = undefined :: module() | undefined,
+	active = true :: boolean(),
 	handler :: module(),
 	key = undefined :: undefined | binary(),
 	timeout = infinity :: timeout(),
@@ -295,6 +296,8 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
 		false -> before_loop(State, HandlerState, #ps_header{buffer=Buffer})
 	end.
 
+before_loop(State=#state{active=false}, HandlerState, ParseState) ->
+	loop(State, HandlerState, ParseState);
 %% @todo We probably shouldn't do the setopts if we have not received a socket message.
 %% @todo We need to hibernate when HTTP/2 is used too.
 before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined},
@@ -516,6 +519,8 @@ commands([], State, []) ->
 commands([], State, Data) ->
 	Result = transport_send(State, nofin, lists:reverse(Data)),
 	{Result, State};
+commands([{active, Active}|Tail], State, Data) when is_boolean(Active) ->
+	commands(Tail, State#state{active=Active}, Data);
 commands([Frame|Tail], State=#state{extensions=Extensions}, Data0) ->
 	Data = [cow_ws:frame(Frame, Extensions)|Data0],
 	case is_close_frame(Frame) of

+ 30 - 0
test/handlers/ws_active_commands_h.erl

@@ -0,0 +1,30 @@
+%% This module takes commands from the x-commands header
+%% and returns them in the websocket_init/1 callback.
+
+-module(ws_active_commands_h).
+-behavior(cowboy_websocket).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req, RunOrHibernate) ->
+	{cowboy_websocket, Req, RunOrHibernate}.
+
+websocket_init(State=run) ->
+	erlang:send_after(1500, self(), active_true),
+	{[{active, false}], State};
+websocket_init(State=hibernate) ->
+	erlang:send_after(1500, self(), active_true),
+	{[{active, false}], State, hibernate}.
+
+websocket_handle(Frame, State=run) ->
+	{[Frame], State};
+websocket_handle(Frame, State=hibernate) ->
+	{[Frame], State, hibernate}.
+
+websocket_info(active_true, State=run) ->
+	{[{active, true}], State};
+websocket_info(active_true, State=hibernate) ->
+	{[{active, true}], State, hibernate}.

+ 13 - 1
test/ws_handler_SUITE.erl

@@ -26,6 +26,7 @@
 all() ->
 	[{group, ws}, {group, ws_hibernate}].
 
+%% @todo Test against HTTP/2 too.
 groups() ->
 	AllTests = ct_helper:all(?MODULE),
 	[{ws, [parallel], AllTests}, {ws_hibernate, [parallel], AllTests}].
@@ -48,7 +49,8 @@ init_dispatch(Name) ->
 	cowboy_router:compile([{'_', [
 		{"/init", ws_init_commands_h, RunOrHibernate},
 		{"/handle", ws_handle_commands_h, RunOrHibernate},
-		{"/info", ws_info_commands_h, RunOrHibernate}
+		{"/info", ws_info_commands_h, RunOrHibernate},
+		{"/active", ws_active_commands_h, RunOrHibernate}
 	]}]).
 
 %% Support functions for testing using Gun.
@@ -205,3 +207,13 @@ do_many_frames_then_close_frame(Config, Path) ->
 	{ok, {binary, <<"Two frames!">>}} = receive_ws(ConnPid, StreamRef),
 	{ok, close} = receive_ws(ConnPid, StreamRef),
 	gun_down(ConnPid).
+
+websocket_active_false(Config) ->
+	doc("The {active, false} command stops receiving data from the socket. "
+		"The {active, true} command reenables it."),
+	{ok, ConnPid, StreamRef} = gun_open_ws(Config, "/active", []),
+	gun:ws_send(ConnPid, {text, <<"Not received until the handler enables active again.">>}),
+	{error, timeout} = receive_ws(ConnPid, StreamRef),
+	{ok, {text, <<"Not received until the handler enables active again.">>}}
+		= receive_ws(ConnPid, StreamRef),
+	ok.