Browse Source

Add a commands-based interface to Websocket handlers

This feature is currently experimental. It will become the
preferred way to use Websocket handlers once it becomes
documented.

A commands-based interface enables adding commands without
having to change the interface much. It mirrors the interface
of stream handlers or gen_statem. It will enable adding
commands that have been needed for some time but were not
implemented for fear of making the interface too complex.
Loïc Hoguin 6 years ago
parent
commit
8404b1c908

+ 42 - 4
src/cowboy_websocket.erl

@@ -31,7 +31,12 @@
 -export([system_terminate/4]).
 -export([system_code_change/4]).
 
--type call_result(State) :: {ok, State}
+-type commands() :: [cow_ws:frame()].
+-export_type([commands/0]).
+
+-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}.
+
+-type deprecated_call_result(State) :: {ok, State}
 	| {ok, State, hibernate}
 	| {reply, cow_ws:frame() | [cow_ws:frame()], State}
 	| {reply, cow_ws:frame() | [cow_ws:frame()], State, hibernate}
@@ -48,13 +53,13 @@
 	when Req::cowboy_req:req().
 
 -callback websocket_init(State)
-	-> call_result(State) when State::any().
+	-> call_result(State) | deprecated_call_result(State) when State::any().
 -optional_callbacks([websocket_init/1]).
 
 -callback websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State)
-	-> call_result(State) when State::any().
+	-> call_result(State) | deprecated_call_result(State) when State::any().
 -callback websocket_info(any(), State)
-	-> call_result(State) when State::any().
+	-> call_result(State) | deprecated_call_result(State) when State::any().
 
 -callback terminate(any(), cowboy_req:req(), any()) -> ok.
 -optional_callbacks([terminate/3]).
@@ -457,6 +462,13 @@ handler_call(State=#state{handler=Handler}, HandlerState,
 		websocket_init -> Handler:websocket_init(HandlerState);
 		_ -> Handler:Callback(Message, HandlerState)
 	end of
+		{Commands, HandlerState2} when is_list(Commands) ->
+			handler_call_result(State,
+				HandlerState2, ParseState, NextState, Commands);
+		{Commands, HandlerState2, hibernate} when is_list(Commands) ->
+			handler_call_result(State#state{hibernate=true},
+				HandlerState2, ParseState, NextState, Commands);
+		%% The following call results are deprecated.
 		{ok, HandlerState2} ->
 			NextState(State, HandlerState2, ParseState);
 		{ok, HandlerState2, hibernate} ->
@@ -488,6 +500,32 @@ handler_call(State=#state{handler=Handler}, HandlerState,
 		erlang:raise(Class, Reason, erlang:get_stacktrace())
 	end.
 
+-spec handler_call_result(#state{}, any(), parse_state(), fun(), commands()) -> no_return().
+handler_call_result(State0, HandlerState, ParseState, NextState, Commands) ->
+	case commands(Commands, State0, []) of
+		{ok, State} ->
+			NextState(State, HandlerState, ParseState);
+		{stop, State} ->
+			terminate(State, HandlerState, stop);
+		{Error = {error, _}, State} ->
+			terminate(State, HandlerState, Error)
+	end.
+
+commands([], State, []) ->
+	{ok, State};
+commands([], State, Data) ->
+	Result = transport_send(State, nofin, lists:reverse(Data)),
+	{Result, State};
+commands([Frame|Tail], State=#state{extensions=Extensions}, Data0) ->
+	Data = [cow_ws:frame(Frame, Extensions)|Data0],
+	case is_close_frame(Frame) of
+		true ->
+			_ = transport_send(State, fin, lists:reverse(Data)),
+			{stop, State};
+		false ->
+			commands(Tail, State, Data)
+	end.
+
 transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) ->
 	Pid ! {Stream, {data, IsFin, Data}},
 	ok;

+ 31 - 0
test/handlers/ws_handle_commands_h.erl

@@ -0,0 +1,31 @@
+%% This module takes commands from the x-commands header
+%% and returns them in the websocket_handle/2 callback.
+
+-module(ws_handle_commands_h).
+-behavior(cowboy_websocket).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req=#{pid := Pid}, RunOrHibernate) ->
+	Commands0 = cowboy_req:header(<<"x-commands">>, Req),
+	Commands = binary_to_term(base64:decode(Commands0)),
+	case Commands of
+		bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+		_ -> ok
+	end,
+	{cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+
+websocket_init(State) ->
+	{[], State}.
+
+websocket_handle(_, State={Commands, run}) ->
+	{Commands, State};
+websocket_handle(_, State={Commands, hibernate}) ->
+	{Commands, State, hibernate}.
+
+websocket_info(_, State) ->
+	{[], State}.
+

+ 32 - 0
test/handlers/ws_info_commands_h.erl

@@ -0,0 +1,32 @@
+%% This module takes commands from the x-commands header
+%% and returns them in the websocket_info/2 callback.
+%% This callback is triggered via a message.
+
+-module(ws_info_commands_h).
+-behavior(cowboy_websocket).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req=#{pid := Pid}, RunOrHibernate) ->
+	Commands0 = cowboy_req:header(<<"x-commands">>, Req),
+	Commands = binary_to_term(base64:decode(Commands0)),
+	case Commands of
+		bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+		_ -> ok
+	end,
+	{cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+
+websocket_init(State) ->
+	self() ! shoot,
+	{[], State}.
+
+websocket_handle(_, State) ->
+	{[], State}.
+
+websocket_info(_, State={Commands, run}) ->
+	{Commands, State};
+websocket_info(_, State={Commands, hibernate}) ->
+	{Commands, State, hibernate}.

+ 30 - 0
test/handlers/ws_init_commands_h.erl

@@ -0,0 +1,30 @@
+%% This module takes commands from the x-commands header
+%% and returns them in the websocket_init/1 callback.
+
+-module(ws_init_commands_h).
+-behavior(cowboy_websocket).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req=#{pid := Pid}, RunOrHibernate) ->
+	Commands0 = cowboy_req:header(<<"x-commands">>, Req),
+	Commands = binary_to_term(base64:decode(Commands0)),
+	case Commands of
+		bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+		_ -> ok
+	end,
+	{cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+
+websocket_init(State={Commands, run}) ->
+	{Commands, State};
+websocket_init(State={Commands, hibernate}) ->
+	{Commands, State, hibernate}.
+
+websocket_handle(_, State) ->
+	{[], State}.
+
+websocket_info(_, State) ->
+	{[], State}.

+ 207 - 0
test/ws_handler_SUITE.erl

@@ -0,0 +1,207 @@
+%% Copyright (c) 2018, Loïc Hoguin <essen@ninenines.eu>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(ws_handler_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [config/2]).
+-import(ct_helper, [doc/1]).
+-import(cowboy_test, [gun_open/1]).
+-import(cowboy_test, [gun_down/1]).
+
+%% ct.
+
+all() ->
+	[{group, ws}, {group, ws_hibernate}].
+
+groups() ->
+	AllTests = ct_helper:all(?MODULE),
+	[{ws, [parallel], AllTests}, {ws_hibernate, [parallel], AllTests}].
+
+init_per_group(Name, Config) ->
+	cowboy_test:init_http(Name, #{
+		env => #{dispatch => init_dispatch(Name)}
+	}, Config).
+
+end_per_group(Name, _) ->
+	cowboy:stop_listener(Name).
+
+%% Dispatch configuration.
+
+init_dispatch(Name) ->
+	RunOrHibernate = case Name of
+		ws -> run;
+		ws_hibernate -> hibernate
+	end,
+	cowboy_router:compile([{'_', [
+		{"/init", ws_init_commands_h, RunOrHibernate},
+		{"/handle", ws_handle_commands_h, RunOrHibernate},
+		{"/info", ws_info_commands_h, RunOrHibernate}
+	]}]).
+
+%% Support functions for testing using Gun.
+
+gun_open_ws(Config, Path, Commands) ->
+	ConnPid = gun_open(Config),
+	StreamRef = gun:ws_upgrade(ConnPid, Path, [
+		{<<"x-commands">>, base64:encode(term_to_binary(Commands))}
+	]),
+	receive
+		{gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} ->
+			{ok, ConnPid, StreamRef};
+		{gun_response, ConnPid, _, _, Status, Headers} ->
+			exit({ws_upgrade_failed, Status, Headers});
+		{gun_error, ConnPid, StreamRef, Reason} ->
+			exit({ws_upgrade_failed, Reason})
+	after 1000 ->
+		error(timeout)
+	end.
+
+receive_ws(ConnPid, StreamRef) ->
+	receive
+		{gun_ws, ConnPid, StreamRef, Frame} ->
+			{ok, Frame}
+	after 1000 ->
+		{error, timeout}
+	end.
+
+ensure_handle_is_called(ConnPid, "/handle") ->
+	gun:ws_send(ConnPid, {text, <<"Necessary to trigger websocket_handle/2.">>});
+ensure_handle_is_called(_, _) ->
+	ok.
+
+%% Tests.
+
+websocket_init_nothing(Config) ->
+	doc("Nothing happens when websocket_init/1 returns no commands."),
+	do_nothing(Config, "/init").
+
+websocket_handle_nothing(Config) ->
+	doc("Nothing happens when websocket_handle/2 returns no commands."),
+	do_nothing(Config, "/handle").
+
+websocket_info_nothing(Config) ->
+	doc("Nothing happens when websocket_info/2 returns no commands."),
+	do_nothing(Config, "/info").
+
+do_nothing(Config, Path) ->
+	{ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, []),
+	ensure_handle_is_called(ConnPid, Path),
+	{error, timeout} = receive_ws(ConnPid, StreamRef),
+	ok.
+
+websocket_init_invalid(Config) ->
+	doc("The connection must be closed when websocket_init/1 returns an invalid command."),
+	do_invalid(Config, "/init").
+
+websocket_handle_invalid(Config) ->
+	doc("The connection must be closed when websocket_handle/2 returns an invalid command."),
+	do_invalid(Config, "/init").
+
+websocket_info_invalid(Config) ->
+	doc("The connection must be closed when websocket_info/2 returns an invalid command."),
+	do_invalid(Config, "/info").
+
+do_invalid(Config, Path) ->
+	{ok, ConnPid, _} = gun_open_ws(Config, Path, bad),
+	ensure_handle_is_called(ConnPid, Path),
+	gun_down(ConnPid).
+
+websocket_init_one_frame(Config) ->
+	doc("A single frame is received when websocket_init/1 returns it as a command."),
+	do_one_frame(Config, "/init").
+
+websocket_handle_one_frame(Config) ->
+	doc("A single frame is received when websocket_handle/2 returns it as a command."),
+	do_one_frame(Config, "/handle").
+
+websocket_info_one_frame(Config) ->
+	doc("A single frame is received when websocket_info/2 returns it as a command."),
+	do_one_frame(Config, "/info").
+
+do_one_frame(Config, Path) ->
+	{ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [
+		{text, <<"One frame!">>}
+	]),
+	ensure_handle_is_called(ConnPid, Path),
+	{ok, {text, <<"One frame!">>}} = receive_ws(ConnPid, StreamRef),
+	ok.
+
+websocket_init_many_frames(Config) ->
+	doc("Multiple frames are received when websocket_init/1 returns them as commands."),
+	do_many_frames(Config, "/init").
+
+websocket_handle_many_frames(Config) ->
+	doc("Multiple frames are received when websocket_handle/2 returns them as commands."),
+	do_many_frames(Config, "/handle").
+
+websocket_info_many_frames(Config) ->
+	doc("Multiple frames are received when websocket_info/2 returns them as commands."),
+	do_many_frames(Config, "/info").
+
+do_many_frames(Config, Path) ->
+	{ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [
+		{text, <<"One frame!">>},
+		{binary, <<"Two frames!">>}
+	]),
+	ensure_handle_is_called(ConnPid, Path),
+	{ok, {text, <<"One frame!">>}} = receive_ws(ConnPid, StreamRef),
+	{ok, {binary, <<"Two frames!">>}} = receive_ws(ConnPid, StreamRef),
+	ok.
+
+websocket_init_close_frame(Config) ->
+	doc("A single close frame is received when websocket_init/1 returns it as a command."),
+	do_close_frame(Config, "/init").
+
+websocket_handle_close_frame(Config) ->
+	doc("A single close frame is received when websocket_handle/2 returns it as a command."),
+	do_close_frame(Config, "/handle").
+
+websocket_info_close_frame(Config) ->
+	doc("A single close frame is received when websocket_info/2 returns it as a command."),
+	do_close_frame(Config, "/info").
+
+do_close_frame(Config, Path) ->
+	{ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [close]),
+	ensure_handle_is_called(ConnPid, Path),
+	{ok, close} = receive_ws(ConnPid, StreamRef),
+	gun_down(ConnPid).
+
+websocket_init_many_frames_then_close_frame(Config) ->
+	doc("Multiple frames are received followed by a close frame "
+		"when websocket_init/1 returns them as commands."),
+	do_many_frames_then_close_frame(Config, "/init").
+
+websocket_handle_many_frames_then_close_frame(Config) ->
+	doc("Multiple frames are received followed by a close frame "
+		"when websocket_handle/2 returns them as commands."),
+	do_many_frames_then_close_frame(Config, "/handle").
+
+websocket_info_many_frames_then_close_frame(Config) ->
+	doc("Multiple frames are received followed by a close frame "
+		"when websocket_info/2 returns them as commands."),
+	do_many_frames_then_close_frame(Config, "/info").
+
+do_many_frames_then_close_frame(Config, Path) ->
+	{ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [
+		{text, <<"One frame!">>},
+		{binary, <<"Two frames!">>},
+		close
+	]),
+	ensure_handle_is_called(ConnPid, Path),
+	{ok, {text, <<"One frame!">>}} = receive_ws(ConnPid, StreamRef),
+	{ok, {binary, <<"Two frames!">>}} = receive_ws(ConnPid, StreamRef),
+	{ok, close} = receive_ws(ConnPid, StreamRef),
+	gun_down(ConnPid).