|
@@ -102,7 +102,7 @@
|
|
|
|
|
|
%% Streams can spawn zero or more children which are then managed
|
|
|
%% by this module if operating as a supervisor.
|
|
|
- children = [] :: [{pid(), cowboy_stream:streamid()}],
|
|
|
+ children = cowboy_children:init() :: cowboy_children:children(),
|
|
|
|
|
|
%% The client starts by sending a sequence of bytes as a preface,
|
|
|
%% followed by a potentially empty SETTINGS frame. Then the connection
|
|
@@ -194,6 +194,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|
|
exit(Reason);
|
|
|
{system, From, Request} ->
|
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
|
|
|
+ %% Timeouts.
|
|
|
+ {timeout, Ref, {shutdown, Pid}} ->
|
|
|
+ cowboy_children:shutdown_timeout(Children, Ref, Pid),
|
|
|
+ loop(State, Buffer);
|
|
|
{timeout, TRef, preface_timeout} ->
|
|
|
case PS of
|
|
|
{preface, _, TRef} ->
|
|
@@ -210,14 +214,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|
|
loop(down(State, Pid, Msg), Buffer);
|
|
|
%% Calls from supervisor module.
|
|
|
{'$gen_call', {From, Tag}, which_children} ->
|
|
|
- Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _} <- Children],
|
|
|
- From ! {Tag, Workers},
|
|
|
+ From ! {Tag, cowboy_children:which_children(Children, ?MODULE)},
|
|
|
loop(State, Buffer);
|
|
|
{'$gen_call', {From, Tag}, count_children} ->
|
|
|
- NbChildren = length(Children),
|
|
|
- Counts = [{specs, 1}, {active, NbChildren},
|
|
|
- {supervisors, 0}, {workers, NbChildren}],
|
|
|
- From ! {Tag, Counts},
|
|
|
+ From ! {Tag, cowboy_children:count_children(Children)},
|
|
|
loop(State, Buffer);
|
|
|
{'$gen_call', {From, Tag}, _} ->
|
|
|
From ! {Tag, {error, ?MODULE}},
|
|
@@ -422,11 +422,16 @@ continuation_frame(State, _) ->
|
|
|
'An invalid frame was received while expecting a CONTINUATION frame. (RFC7540 6.2)'}).
|
|
|
|
|
|
down(State=#state{children=Children0}, Pid, Msg) ->
|
|
|
- case lists:keytake(Pid, 1, Children0) of
|
|
|
- {value, {_, StreamID}, Children} ->
|
|
|
+ case cowboy_children:down(Children0, Pid) of
|
|
|
+ %% The stream was terminated already.
|
|
|
+ {ok, undefined, Children} ->
|
|
|
+ State#state{children=Children};
|
|
|
+ %% The stream is still running.
|
|
|
+ {ok, StreamID, Children} ->
|
|
|
info(State#state{children=Children}, StreamID, Msg);
|
|
|
- false ->
|
|
|
- error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]),
|
|
|
+ %% The process was unknown.
|
|
|
+ error ->
|
|
|
+ error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]),
|
|
|
State
|
|
|
end.
|
|
|
|
|
@@ -559,8 +564,9 @@ commands(State=#state{socket=Socket, transport=Transport, remote_window=ConnWind
|
|
|
Stream#stream{remote_window=StreamWindow + Size}, Tail);
|
|
|
%% Supervise a child process.
|
|
|
commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
|
|
|
- [{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
|
|
|
- commands(State#state{children=[{Pid, StreamID}|Children]}, Stream, Tail);
|
|
|
+ [{spawn, Pid, Shutdown}|Tail]) ->
|
|
|
+ commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
|
|
|
+ Stream, Tail);
|
|
|
%% Error handling.
|
|
|
commands(State, Stream=#stream{id=StreamID}, [Error = {internal_error, _, _}|_Tail]) ->
|
|
|
%% @todo Do we want to run the commands after an internal_error?
|
|
@@ -681,7 +687,8 @@ terminate(#state{socket=Socket, transport=Transport, client_streamid=LastStreamI
|
|
|
%% @todo We might want to optionally send the Reason value
|
|
|
%% as debug data in the GOAWAY frame here. Perhaps more.
|
|
|
Transport:send(Socket, cow_http2:goaway(LastStreamID, terminate_reason(Reason), <<>>)),
|
|
|
- terminate_all_streams(Streams, Reason, Children),
|
|
|
+ terminate_all_streams(Streams, Reason),
|
|
|
+ cowboy_children:terminate(Children),
|
|
|
Transport:close(Socket),
|
|
|
exit({shutdown, Reason}).
|
|
|
|
|
@@ -690,15 +697,14 @@ terminate_reason({stop, _, _}) -> no_error;
|
|
|
terminate_reason({socket_error, _, _}) -> internal_error;
|
|
|
terminate_reason({internal_error, _, _}) -> internal_error.
|
|
|
|
|
|
-terminate_all_streams([], _, []) ->
|
|
|
+terminate_all_streams([], _) ->
|
|
|
ok;
|
|
|
%% This stream was already terminated and is now just flushing the data out. Skip it.
|
|
|
-terminate_all_streams([#stream{state=flush}|Tail], Reason, Children) ->
|
|
|
- terminate_all_streams(Tail, Reason, Children);
|
|
|
-terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Children0) ->
|
|
|
+terminate_all_streams([#stream{state=flush}|Tail], Reason) ->
|
|
|
+ terminate_all_streams(Tail, Reason);
|
|
|
+terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason) ->
|
|
|
stream_call_terminate(StreamID, Reason, StreamState),
|
|
|
- Children = stream_terminate_children(Children0, StreamID, []),
|
|
|
- terminate_all_streams(Tail, Reason, Children).
|
|
|
+ terminate_all_streams(Tail, Reason).
|
|
|
|
|
|
%% Stream functions.
|
|
|
|
|
@@ -793,26 +799,26 @@ stream_terminate(State=#state{socket=Socket, transport=Transport,
|
|
|
{value, #stream{state=StreamState, local=idle}, Streams} when Reason =:= normal ->
|
|
|
State1 = info(State, StreamID, {response, 204, #{}, <<>>}),
|
|
|
stream_call_terminate(StreamID, Reason, StreamState),
|
|
|
- Children = stream_terminate_children(Children0, StreamID, []),
|
|
|
+ Children = cowboy_children:shutdown(Children0, StreamID),
|
|
|
State1#state{streams=Streams, children=Children};
|
|
|
%% When a response was sent but not terminated, we need to close the stream.
|
|
|
{value, #stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams}
|
|
|
when Reason =:= normal ->
|
|
|
Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
|
|
|
stream_call_terminate(StreamID, Reason, StreamState),
|
|
|
- Children = stream_terminate_children(Children0, StreamID, []),
|
|
|
+ Children = cowboy_children:shutdown(Children0, StreamID),
|
|
|
State#state{streams=Streams, children=Children};
|
|
|
%% Unless there is still data in the buffer. We can however reset
|
|
|
%% a few fields and set a special local state to avoid confusion.
|
|
|
{value, Stream=#stream{state=StreamState, local=nofin}, Streams} ->
|
|
|
stream_call_terminate(StreamID, Reason, StreamState),
|
|
|
- Children = stream_terminate_children(Children0, StreamID, []),
|
|
|
+ Children = cowboy_children:shutdown(Children0, StreamID),
|
|
|
State#state{streams=[Stream#stream{state=flush, local=flush}|Streams],
|
|
|
children=Children};
|
|
|
%% Otherwise we sent an RST_STREAM and/or the stream is already closed.
|
|
|
{value, #stream{state=StreamState}, Streams} ->
|
|
|
stream_call_terminate(StreamID, Reason, StreamState),
|
|
|
- Children = stream_terminate_children(Children0, StreamID, []),
|
|
|
+ Children = cowboy_children:shutdown(Children0, StreamID),
|
|
|
State#state{streams=Streams, children=Children};
|
|
|
false ->
|
|
|
%% @todo Unknown stream. Not sure what to do here. Check again once all
|
|
@@ -829,17 +835,6 @@ stream_call_terminate(StreamID, Reason, StreamState) ->
|
|
|
[StreamID, Reason, StreamState, Class, Reason])
|
|
|
end.
|
|
|
|
|
|
-stream_terminate_children([], _, Acc) ->
|
|
|
- Acc;
|
|
|
-stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) ->
|
|
|
- %% We unlink and flush the mailbox to avoid receiving a stray message.
|
|
|
- unlink(Pid),
|
|
|
- receive {'EXIT', Pid, _} -> ok after 0 -> ok end,
|
|
|
- exit(Pid, kill),
|
|
|
- stream_terminate_children(Tail, StreamID, Acc);
|
|
|
-stream_terminate_children([Child|Tail], StreamID, Acc) ->
|
|
|
- stream_terminate_children(Tail, StreamID, [Child|Acc]).
|
|
|
-
|
|
|
%% Headers encode/decode.
|
|
|
|
|
|
headers_decode(HeaderBlock, DecodeState0) ->
|
|
@@ -874,9 +869,9 @@ headers_encode(Headers0, EncodeState) ->
|
|
|
system_continue(_, _, {State, Buffer}) ->
|
|
|
loop(State, Buffer).
|
|
|
|
|
|
--spec system_terminate(any(), _, _, _) -> no_return().
|
|
|
-system_terminate(Reason, _, _, _) ->
|
|
|
- exit(Reason).
|
|
|
+-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
|
|
|
+system_terminate(Reason, _, _, {State, _}) ->
|
|
|
+ terminate(State, Reason).
|
|
|
|
|
|
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
|
|
|
system_code_change(Misc, _, _, _) ->
|