|
@@ -33,9 +33,11 @@
|
|
|
|
|
|
%% Internal transport functions.
|
|
|
-export([name/0]).
|
|
|
+-export([messages/0]).
|
|
|
-export([recv/3]).
|
|
|
-export([send/2]).
|
|
|
-export([sendfile/2]).
|
|
|
+-export([setopts/2]).
|
|
|
|
|
|
-type streamid() :: non_neg_integer().
|
|
|
-type socket() :: {pid(), streamid()}.
|
|
@@ -45,8 +47,8 @@
|
|
|
pid :: pid(),
|
|
|
input = nofin :: fin | nofin,
|
|
|
in_buffer = <<>> :: binary(),
|
|
|
- is_recv = false :: {true, {non_neg_integer(), pid()},
|
|
|
- pid(), non_neg_integer(), reference()} | false,
|
|
|
+ is_recv = false :: false | {active, socket(), pid()}
|
|
|
+ | {passive, socket(), pid(), non_neg_integer(), reference()},
|
|
|
output = nofin :: fin | nofin
|
|
|
}).
|
|
|
|
|
@@ -138,15 +140,15 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|
|
TRef = erlang:send_after(Timeout, self(),
|
|
|
{recv_timeout, FromSocket}),
|
|
|
loop(replace_child(Child#child{
|
|
|
- is_recv={true, FromSocket, FromPid, Length, TRef}},
|
|
|
+ is_recv={passive, FromSocket, FromPid, Length, TRef}},
|
|
|
State))
|
|
|
end;
|
|
|
{recv_timeout, {Pid, StreamID}}
|
|
|
when Pid =:= self() ->
|
|
|
- Child = #child{is_recv={true, FromSocket, FromPid, _, _}}
|
|
|
+ Child = #child{is_recv={passive, FromSocket, FromPid, _, _}}
|
|
|
= get_child(StreamID, State),
|
|
|
FromPid ! {recv, FromSocket, {error, timeout}},
|
|
|
- loop(replace_child(Child#child{is_recv=false}, State));
|
|
|
+ loop(replace_child(Child#child{is_recv=passive}, State));
|
|
|
{reply, {Pid, StreamID}, Status, Headers}
|
|
|
when Pid =:= self() ->
|
|
|
Child = #child{output=nofin} = get_child(StreamID, State),
|
|
@@ -178,6 +180,22 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|
|
Child = #child{output=nofin} = get_child(StreamID, State),
|
|
|
data_from_file(State, StreamID, Filepath),
|
|
|
loop(replace_child(Child#child{output=fin}, State));
|
|
|
+ {active, FromSocket = {Pid, StreamID}, FromPid} when Pid =:= self() ->
|
|
|
+ Child = #child{in_buffer=InBuffer, is_recv=false}
|
|
|
+ = get_child(StreamID, State),
|
|
|
+ case InBuffer of
|
|
|
+ <<>> ->
|
|
|
+ loop(replace_child(Child#child{
|
|
|
+ is_recv={active, FromSocket, FromPid}}, State));
|
|
|
+ _ ->
|
|
|
+ FromPid ! {spdy, FromSocket, InBuffer},
|
|
|
+ loop(replace_child(Child#child{in_buffer= <<>>}, State))
|
|
|
+ end;
|
|
|
+ {passive, FromSocket = {Pid, StreamID}, FromPid} when Pid =:= self() ->
|
|
|
+ Child = #child{is_recv=IsRecv} = get_child(StreamID, State),
|
|
|
+ %% Make sure we aren't in the middle of a recv call.
|
|
|
+ case IsRecv of false -> ok; {active, FromSocket, FromPid} -> ok end,
|
|
|
+ loop(replace_child(Child#child{is_recv=false}, State));
|
|
|
{'EXIT', Parent, Reason} ->
|
|
|
exit(Reason);
|
|
|
{'EXIT', Pid, _} ->
|
|
@@ -262,11 +280,14 @@ handle_frame(State, {data, StreamID, IsFin, Data}) ->
|
|
|
Data2 = << Buffer/binary, Data/binary >>,
|
|
|
IsFin2 = if IsFin -> fin; true -> nofin end,
|
|
|
Child2 = case IsRecv of
|
|
|
- {true, FromSocket, FromPid, 0, TRef} ->
|
|
|
+ {active, FromSocket, FromPid} ->
|
|
|
+ FromPid ! {spdy, FromSocket, Data},
|
|
|
+ Child#child{input=IsFin2, is_recv=false};
|
|
|
+ {passive, FromSocket, FromPid, 0, TRef} ->
|
|
|
FromPid ! {recv, FromSocket, {ok, Data2}},
|
|
|
cancel_recv_timeout(StreamID, TRef),
|
|
|
Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false};
|
|
|
- {true, FromSocket, FromPid, Length, TRef}
|
|
|
+ {passive, FromSocket, FromPid, Length, TRef}
|
|
|
when byte_size(Data2) >= Length ->
|
|
|
<< Data3:Length/binary, Rest/binary >> = Data2,
|
|
|
FromPid ! {recv, FromSocket, {ok, Data3}},
|
|
@@ -443,6 +464,10 @@ stream_close(Socket = {Pid, _}) ->
|
|
|
name() ->
|
|
|
spdy.
|
|
|
|
|
|
+-spec messages() -> {spdy, spdy_closed, spdy_error}.
|
|
|
+messages() ->
|
|
|
+ {spdy, spdy_closed, spdy_error}.
|
|
|
+
|
|
|
-spec recv(socket(), non_neg_integer(), timeout())
|
|
|
-> {ok, binary()} | {error, timeout}.
|
|
|
recv(Socket = {Pid, _}, Length, Timeout) ->
|
|
@@ -463,3 +488,11 @@ send(Socket, Data) ->
|
|
|
sendfile(Socket = {Pid, _}, Filepath) ->
|
|
|
_ = Pid ! {sendfile, Socket, Filepath},
|
|
|
{ok, undefined}.
|
|
|
+
|
|
|
+-spec setopts(inet:socket(), list()) -> ok.
|
|
|
+setopts(Socket = {Pid, _}, [{active, once}]) ->
|
|
|
+ _ = Pid ! {active, Socket, self()},
|
|
|
+ ok;
|
|
|
+setopts(Socket = {Pid, _}, [{active, false}]) ->
|
|
|
+ _ = Pid ! {passive, Socket, self()},
|
|
|
+ ok.
|