|
@@ -196,24 +196,24 @@ before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
|
|
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
|
|
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
|
|
timer=TimerRef, children=Children, in_streamid=InStreamID,
|
|
timer=TimerRef, children=Children, in_streamid=InStreamID,
|
|
last_streamid=LastStreamID, streams=Streams}, Buffer) ->
|
|
last_streamid=LastStreamID, streams=Streams}, Buffer) ->
|
|
- {OK, Closed, Error} = Transport:messages(),
|
|
|
|
|
|
+ Messages = Transport:messages(),
|
|
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
|
|
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
|
|
receive
|
|
receive
|
|
%% Discard data coming in after the last request
|
|
%% Discard data coming in after the last request
|
|
%% we want to process was received fully.
|
|
%% we want to process was received fully.
|
|
- {OK, Socket, _} when InStreamID > LastStreamID ->
|
|
|
|
|
|
+ {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
|
|
before_loop(State, Buffer);
|
|
before_loop(State, Buffer);
|
|
%% Socket messages.
|
|
%% Socket messages.
|
|
- {OK, Socket, Data} ->
|
|
|
|
|
|
+ {OK, Socket, Data} when OK =:= element(1, Messages) ->
|
|
%% Only reset the timeout if it is idle_timeout (active streams).
|
|
%% Only reset the timeout if it is idle_timeout (active streams).
|
|
State1 = case Streams of
|
|
State1 = case Streams of
|
|
[] -> State;
|
|
[] -> State;
|
|
_ -> set_timeout(State)
|
|
_ -> set_timeout(State)
|
|
end,
|
|
end,
|
|
parse(<< Buffer/binary, Data/binary >>, State1);
|
|
parse(<< Buffer/binary, Data/binary >>, State1);
|
|
- {Closed, Socket} ->
|
|
|
|
|
|
+ {Closed, Socket} when Closed =:= element(2, Messages) ->
|
|
terminate(State, {socket_error, closed, 'The socket has been closed.'});
|
|
terminate(State, {socket_error, closed, 'The socket has been closed.'});
|
|
- {Error, Socket, Reason} ->
|
|
|
|
|
|
+ {Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
|
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
|
|
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
|
|
%% Timeouts.
|
|
%% Timeouts.
|
|
{timeout, Ref, {shutdown, Pid}} ->
|
|
{timeout, Ref, {shutdown, Pid}} ->
|
|
@@ -1406,17 +1406,17 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
|
|
end.
|
|
end.
|
|
|
|
|
|
terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
|
|
terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
|
|
- {OK, Closed, Error} = Transport:messages(),
|
|
|
|
|
|
+ Messages = Transport:messages(),
|
|
%% We may already have a message in the mailbox when we do this
|
|
%% We may already have a message in the mailbox when we do this
|
|
%% but it's OK because we are shutting down anyway.
|
|
%% but it's OK because we are shutting down anyway.
|
|
case Transport:setopts(Socket, [{active, once}]) of
|
|
case Transport:setopts(Socket, [{active, once}]) of
|
|
ok ->
|
|
ok ->
|
|
receive
|
|
receive
|
|
- {OK, Socket, _} ->
|
|
|
|
|
|
+ {OK, Socket, _} when OK =:= element(1, Messages) ->
|
|
terminate_linger_loop(State, TimerRef);
|
|
terminate_linger_loop(State, TimerRef);
|
|
- {Closed, Socket} ->
|
|
|
|
|
|
+ {Closed, Socket} when Closed =:= element(2, Messages) ->
|
|
ok;
|
|
ok;
|
|
- {Error, Socket, _} ->
|
|
|
|
|
|
+ {Error, Socket, _} when Error =:= element(3, Messages) ->
|
|
ok;
|
|
ok;
|
|
{timeout, TimerRef, linger_timeout} ->
|
|
{timeout, TimerRef, linger_timeout} ->
|
|
ok;
|
|
ok;
|