|
@@ -136,8 +136,16 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
|
|
To ! {Tag, CurConns},
|
|
To ! {Tag, CurConns},
|
|
loop(State, CurConns, NbChildren, Sleepers);
|
|
loop(State, CurConns, NbChildren, Sleepers);
|
|
%% Remove a connection from the count of connections.
|
|
%% Remove a connection from the count of connections.
|
|
- {remove_connection, Ref} ->
|
|
|
|
- loop(State, CurConns - 1, NbChildren, Sleepers);
|
|
|
|
|
|
+ {remove_connection, Ref, Pid} ->
|
|
|
|
+ case put(Pid, removed) of
|
|
|
|
+ active ->
|
|
|
|
+ loop(State, CurConns - 1, NbChildren, Sleepers);
|
|
|
|
+ remove ->
|
|
|
|
+ loop(State, CurConns, NbChildren, Sleepers);
|
|
|
|
+ undefined ->
|
|
|
|
+ _ = erase(Pid),
|
|
|
|
+ loop(State, CurConns, NbChildren, Sleepers)
|
|
|
|
+ end;
|
|
%% Upgrade the max number of connections allowed concurrently.
|
|
%% Upgrade the max number of connections allowed concurrently.
|
|
%% We resume all sleeping acceptors if this number increases.
|
|
%% We resume all sleeping acceptors if this number increases.
|
|
{set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
|
|
{set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
|
|
@@ -154,24 +162,41 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
|
|
{'EXIT', Parent, Reason} ->
|
|
{'EXIT', Parent, Reason} ->
|
|
terminate(State, Reason, NbChildren);
|
|
terminate(State, Reason, NbChildren);
|
|
{'EXIT', Pid, Reason} when Sleepers =:= [] ->
|
|
{'EXIT', Pid, Reason} when Sleepers =:= [] ->
|
|
- report_error(Ref, Protocol, Pid, Reason),
|
|
|
|
- erase(Pid),
|
|
|
|
- loop(State, CurConns - 1, NbChildren - 1, Sleepers);
|
|
|
|
|
|
+ case erase(Pid) of
|
|
|
|
+ active ->
|
|
|
|
+ report_error(Ref, Protocol, Pid, Reason),
|
|
|
|
+ loop(State, CurConns - 1, NbChildren - 1, Sleepers);
|
|
|
|
+ removed ->
|
|
|
|
+ report_error(Ref, Protocol, Pid, Reason),
|
|
|
|
+ loop(State, CurConns, NbChildren - 1, Sleepers);
|
|
|
|
+ undefined ->
|
|
|
|
+ loop(State, CurConns, NbChildren, Sleepers)
|
|
|
|
+ end;
|
|
%% Resume a sleeping acceptor if needed.
|
|
%% Resume a sleeping acceptor if needed.
|
|
{'EXIT', Pid, Reason} ->
|
|
{'EXIT', Pid, Reason} ->
|
|
- report_error(Ref, Protocol, Pid, Reason),
|
|
|
|
- erase(Pid),
|
|
|
|
- [To|Sleepers2] = Sleepers,
|
|
|
|
- To ! self(),
|
|
|
|
- loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
|
|
|
|
|
|
+ case erase(Pid) of
|
|
|
|
+ active when CurConns > MaxConns ->
|
|
|
|
+ report_error(Ref, Protocol, Pid, Reason),
|
|
|
|
+ loop(State, CurConns - 1, NbChildren - 1, Sleepers);
|
|
|
|
+ active ->
|
|
|
|
+ report_error(Ref, Protocol, Pid, Reason),
|
|
|
|
+ [To|Sleepers2] = Sleepers,
|
|
|
|
+ To ! self(),
|
|
|
|
+ loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
|
|
|
|
+ removed ->
|
|
|
|
+ report_error(Ref, Protocol, Pid, Reason),
|
|
|
|
+ loop(State, CurConns, NbChildren - 1, Sleepers);
|
|
|
|
+ undefined ->
|
|
|
|
+ loop(State, CurConns, NbChildren, Sleepers)
|
|
|
|
+ end;
|
|
{system, From, Request} ->
|
|
{system, From, Request} ->
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
|
|
{State, CurConns, NbChildren, Sleepers});
|
|
{State, CurConns, NbChildren, Sleepers});
|
|
%% Calls from the supervisor module.
|
|
%% Calls from the supervisor module.
|
|
{'$gen_call', {To, Tag}, which_children} ->
|
|
{'$gen_call', {To, Tag}, which_children} ->
|
|
- Pids = get_keys(true),
|
|
|
|
Children = [{Protocol, Pid, ConnType, [Protocol]}
|
|
Children = [{Protocol, Pid, ConnType, [Protocol]}
|
|
- || Pid <- Pids, is_pid(Pid)],
|
|
|
|
|
|
+ || {Pid, Type} <- get(),
|
|
|
|
+ Type =:= active orelse Type =:= removed],
|
|
To ! {Tag, Children},
|
|
To ! {Tag, Children},
|
|
loop(State, CurConns, NbChildren, Sleepers);
|
|
loop(State, CurConns, NbChildren, Sleepers);
|
|
{'$gen_call', {To, Tag}, count_children} ->
|
|
{'$gen_call', {To, Tag}, count_children} ->
|
|
@@ -196,7 +221,7 @@ shoot(State=#state{ref=Ref, transport=Transport, ack_timeout=AckTimeout, max_con
|
|
case Transport:controlling_process(Socket, ProtocolPid) of
|
|
case Transport:controlling_process(Socket, ProtocolPid) of
|
|
ok ->
|
|
ok ->
|
|
ProtocolPid ! {shoot, Ref, Transport, Socket, AckTimeout},
|
|
ProtocolPid ! {shoot, Ref, Transport, Socket, AckTimeout},
|
|
- put(SupPid, true),
|
|
|
|
|
|
+ put(SupPid, active),
|
|
CurConns2 = CurConns + 1,
|
|
CurConns2 = CurConns + 1,
|
|
if CurConns2 < MaxConns ->
|
|
if CurConns2 < MaxConns ->
|
|
To ! self(),
|
|
To ! self(),
|
|
@@ -213,18 +238,14 @@ shoot(State=#state{ref=Ref, transport=Transport, ack_timeout=AckTimeout, max_con
|
|
end.
|
|
end.
|
|
|
|
|
|
-spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
|
|
-spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
|
|
-%% Kill all children and then exit. We unlink first to avoid
|
|
|
|
-%% getting a message for each child getting killed.
|
|
|
|
terminate(#state{shutdown=brutal_kill}, Reason, _) ->
|
|
terminate(#state{shutdown=brutal_kill}, Reason, _) ->
|
|
- Pids = get_keys(true),
|
|
|
|
- _ = [begin
|
|
|
|
- unlink(P),
|
|
|
|
- exit(P, kill)
|
|
|
|
- end || P <- Pids],
|
|
|
|
|
|
+ kill_children(get_keys(active)),
|
|
|
|
+ kill_children(get_keys(removed)),
|
|
exit(Reason);
|
|
exit(Reason);
|
|
%% Attempt to gracefully shutdown all children.
|
|
%% Attempt to gracefully shutdown all children.
|
|
terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
|
|
terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
|
|
- shutdown_children(),
|
|
|
|
|
|
+ shutdown_children(get_keys(active)),
|
|
|
|
+ shutdown_children(get_keys(removed)),
|
|
_ = if
|
|
_ = if
|
|
Shutdown =:= infinity ->
|
|
Shutdown =:= infinity ->
|
|
ok;
|
|
ok;
|
|
@@ -234,11 +255,19 @@ terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
|
|
wait_children(NbChildren),
|
|
wait_children(NbChildren),
|
|
exit(Reason).
|
|
exit(Reason).
|
|
|
|
|
|
|
|
+%% Kill all children and then exit. We unlink first to avoid
|
|
|
|
+%% getting a message for each child getting killed.
|
|
|
|
+kill_children(Pids) ->
|
|
|
|
+ _ = [begin
|
|
|
|
+ unlink(P),
|
|
|
|
+ exit(P, kill)
|
|
|
|
+ end || P <- Pids],
|
|
|
|
+ ok.
|
|
|
|
+
|
|
%% Monitor processes so we can know which ones have shutdown
|
|
%% Monitor processes so we can know which ones have shutdown
|
|
%% before the timeout. Unlink so we avoid receiving an extra
|
|
%% before the timeout. Unlink so we avoid receiving an extra
|
|
%% message. Then send a shutdown exit signal.
|
|
%% message. Then send a shutdown exit signal.
|
|
-shutdown_children() ->
|
|
|
|
- Pids = get_keys(true),
|
|
|
|
|
|
+shutdown_children(Pids) ->
|
|
_ = [begin
|
|
_ = [begin
|
|
monitor(process, P),
|
|
monitor(process, P),
|
|
unlink(P),
|
|
unlink(P),
|
|
@@ -251,11 +280,16 @@ wait_children(0) ->
|
|
wait_children(NbChildren) ->
|
|
wait_children(NbChildren) ->
|
|
receive
|
|
receive
|
|
{'DOWN', _, process, Pid, _} ->
|
|
{'DOWN', _, process, Pid, _} ->
|
|
- _ = erase(Pid),
|
|
|
|
- wait_children(NbChildren - 1);
|
|
|
|
|
|
+ case erase(Pid) of
|
|
|
|
+ active -> wait_children(NbChildren - 1);
|
|
|
|
+ removed -> wait_children(NbChildren - 1);
|
|
|
|
+ _ -> wait_children(NbChildren)
|
|
|
|
+ end;
|
|
kill ->
|
|
kill ->
|
|
- Pids = get_keys(true),
|
|
|
|
- _ = [exit(P, kill) || P <- Pids],
|
|
|
|
|
|
+ Active = get_keys(active),
|
|
|
|
+ _ = [exit(P, kill) || P <- Active],
|
|
|
|
+ Removed = get_keys(removed),
|
|
|
|
+ _ = [exit(P, kill) || P <- Removed],
|
|
ok
|
|
ok
|
|
end.
|
|
end.
|
|
|
|
|