|
@@ -16,16 +16,21 @@
|
|
-module(cowboy_listener).
|
|
-module(cowboy_listener).
|
|
-behaviour(gen_server).
|
|
-behaviour(gen_server).
|
|
|
|
|
|
--export([start_link/1, stop/1,
|
|
|
|
- add_connection/3, move_connection/3, remove_connection/2]). %% API.
|
|
|
|
|
|
+-export([start_link/2, stop/1,
|
|
|
|
+ add_connection/4, move_connection/3, remove_connection/2,
|
|
|
|
+ get_protocol_options/1, set_protocol_options/2]). %% API.
|
|
-export([init/1, handle_call/3, handle_cast/2,
|
|
-export([init/1, handle_call/3, handle_cast/2,
|
|
handle_info/2, terminate/2, code_change/3]). %% gen_server.
|
|
handle_info/2, terminate/2, code_change/3]). %% gen_server.
|
|
|
|
|
|
|
|
+-type pools() :: [{atom(), non_neg_integer()}].
|
|
|
|
+
|
|
-record(state, {
|
|
-record(state, {
|
|
- req_pools = [] :: [{atom(), non_neg_integer()}],
|
|
|
|
|
|
+ req_pools = [] :: pools(),
|
|
reqs_table :: ets:tid(),
|
|
reqs_table :: ets:tid(),
|
|
queue = [] :: [{pid(), reference()}],
|
|
queue = [] :: [{pid(), reference()}],
|
|
- max_conns = undefined :: non_neg_integer()
|
|
|
|
|
|
+ max_conns = undefined :: non_neg_integer(),
|
|
|
|
+ proto_opts :: any(),
|
|
|
|
+ proto_opts_vsn = 1 :: non_neg_integer()
|
|
}).
|
|
}).
|
|
|
|
|
|
%% API.
|
|
%% API.
|
|
@@ -37,9 +42,9 @@
|
|
%% Setting the process priority to high ensures the connection-related code
|
|
%% Setting the process priority to high ensures the connection-related code
|
|
%% will always be executed when a connection needs it, allowing Cowboy to
|
|
%% will always be executed when a connection needs it, allowing Cowboy to
|
|
%% scale far beyond what it would with a normal priority.
|
|
%% scale far beyond what it would with a normal priority.
|
|
--spec start_link(non_neg_integer()) -> {ok, pid()}.
|
|
|
|
-start_link(MaxConns) ->
|
|
|
|
- gen_server:start_link(?MODULE, [MaxConns],
|
|
|
|
|
|
+-spec start_link(non_neg_integer(), any()) -> {ok, pid()}.
|
|
|
|
+start_link(MaxConns, ProtoOpts) ->
|
|
|
|
+ gen_server:start_link(?MODULE, [MaxConns, ProtoOpts],
|
|
[{spawn_opt, [{priority, high}]}]).
|
|
[{spawn_opt, [{priority, high}]}]).
|
|
|
|
|
|
%% @private
|
|
%% @private
|
|
@@ -59,9 +64,15 @@ stop(ServerPid) ->
|
|
%% pool. If the socket has been sent to another process, it is up to the
|
|
%% pool. If the socket has been sent to another process, it is up to the
|
|
%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
|
|
%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
|
|
%% the previous and adding the new one.
|
|
%% the previous and adding the new one.
|
|
--spec add_connection(pid(), atom(), pid()) -> ok.
|
|
|
|
-add_connection(ServerPid, Pool, ConnPid) ->
|
|
|
|
- gen_server:call(ServerPid, {add_connection, Pool, ConnPid}, infinity).
|
|
|
|
|
|
+%%
|
|
|
|
+%% This function also returns whether the protocol options have been modified.
|
|
|
|
+%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of
|
|
|
|
+%% the atom 'ok'. The acceptor can then continue with the new protocol options.
|
|
|
|
+-spec add_connection(pid(), atom(), pid(), non_neg_integer())
|
|
|
|
+ -> ok | {upgrade, any(), non_neg_integer()}.
|
|
|
|
+add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->
|
|
|
|
+ gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn},
|
|
|
|
+ infinity).
|
|
|
|
|
|
%% @doc Move a connection from one pool to another.
|
|
%% @doc Move a connection from one pool to another.
|
|
-spec move_connection(pid(), atom(), pid()) -> ok.
|
|
-spec move_connection(pid(), atom(), pid()) -> ok.
|
|
@@ -73,35 +84,46 @@ move_connection(ServerPid, DestPool, ConnPid) ->
|
|
remove_connection(ServerPid, ConnPid) ->
|
|
remove_connection(ServerPid, ConnPid) ->
|
|
gen_server:cast(ServerPid, {remove_connection, ConnPid}).
|
|
gen_server:cast(ServerPid, {remove_connection, ConnPid}).
|
|
|
|
|
|
|
|
+%% @doc Return the current protocol options.
|
|
|
|
+-spec get_protocol_options(pid()) -> {ok, any()}.
|
|
|
|
+get_protocol_options(ServerPid) ->
|
|
|
|
+ gen_server:call(ServerPid, get_protocol_options).
|
|
|
|
+
|
|
|
|
+%% @doc Upgrade the protocol options.
|
|
|
|
+-spec set_protocol_options(pid(), any()) -> ok.
|
|
|
|
+set_protocol_options(ServerPid, ProtoOpts) ->
|
|
|
|
+ gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}).
|
|
|
|
+
|
|
%% gen_server.
|
|
%% gen_server.
|
|
|
|
|
|
%% @private
|
|
%% @private
|
|
-spec init(list()) -> {ok, #state{}}.
|
|
-spec init(list()) -> {ok, #state{}}.
|
|
-init([MaxConns]) ->
|
|
|
|
- ReqsTablePid = ets:new(requests_table, [set, private]),
|
|
|
|
- {ok, #state{reqs_table=ReqsTablePid, max_conns=MaxConns}}.
|
|
|
|
|
|
+init([MaxConns, ProtoOpts]) ->
|
|
|
|
+ ReqsTable = ets:new(requests_table, [set, private]),
|
|
|
|
+ {ok, #state{reqs_table=ReqsTable, max_conns=MaxConns,
|
|
|
|
+ proto_opts=ProtoOpts}}.
|
|
|
|
|
|
%% @private
|
|
%% @private
|
|
-spec handle_call(_, _, State)
|
|
-spec handle_call(_, _, State)
|
|
-> {reply, ignored, State} | {stop, normal, stopped, State}.
|
|
-> {reply, ignored, State} | {stop, normal, stopped, State}.
|
|
-handle_call({add_connection, Pool, ConnPid}, From, State=#state{
|
|
|
|
|
|
+handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{
|
|
req_pools=Pools, reqs_table=ReqsTable,
|
|
req_pools=Pools, reqs_table=ReqsTable,
|
|
- queue=Queue, max_conns=MaxConns}) ->
|
|
|
|
- MonitorRef = erlang:monitor(process, ConnPid),
|
|
|
|
- ConnPid ! {shoot, self()},
|
|
|
|
- {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
|
|
|
|
- false ->
|
|
|
|
- {1, [{Pool, 1}|Pools]};
|
|
|
|
- {Pool, NbConns} ->
|
|
|
|
- NbConns2 = NbConns + 1,
|
|
|
|
- {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
|
|
|
|
- end,
|
|
|
|
- ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
|
|
|
|
- if NbConnsRet > MaxConns ->
|
|
|
|
- {noreply, State#state{req_pools=Pools2, queue=[From|Queue]}};
|
|
|
|
|
|
+ queue=Queue, max_conns=MaxConns,
|
|
|
|
+ proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
|
|
|
|
+ {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable),
|
|
|
|
+ State2 = State#state{req_pools=Pools2},
|
|
|
|
+ if AccOptsVsn =/= LisOptsVsn ->
|
|
|
|
+ {reply, {ugprade, ProtoOpts, LisOptsVsn}, State2};
|
|
|
|
+ NbConns > MaxConns ->
|
|
|
|
+ {noreply, State2#state{queue=[From|Queue]}};
|
|
true ->
|
|
true ->
|
|
- {reply, ok, State#state{req_pools=Pools2}}
|
|
|
|
|
|
+ {reply, ok, State2}
|
|
end;
|
|
end;
|
|
|
|
+handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
|
|
|
|
+ {reply, {ok, ProtoOpts}, State};
|
|
|
|
+handle_call({set_protocol_options, ProtoOpts}, _From,
|
|
|
|
+ State=#state{proto_opts_vsn=OptsVsn}) ->
|
|
|
|
+ {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}};
|
|
handle_call(stop, _From, State) ->
|
|
handle_call(stop, _From, State) ->
|
|
{stop, normal, stopped, State};
|
|
{stop, normal, stopped, State};
|
|
handle_call(_Request, _From, State) ->
|
|
handle_call(_Request, _From, State) ->
|
|
@@ -148,6 +170,22 @@ code_change(_OldVsn, State, _Extra) ->
|
|
%% Internal.
|
|
%% Internal.
|
|
|
|
|
|
%% @private
|
|
%% @private
|
|
|
|
+-spec add_pid(pid(), atom(), pools(), ets:tid())
|
|
|
|
+ -> {non_neg_integer(), pools()}.
|
|
|
|
+add_pid(ConnPid, Pool, Pools, ReqsTable) ->
|
|
|
|
+ MonitorRef = erlang:monitor(process, ConnPid),
|
|
|
|
+ ConnPid ! {shoot, self()},
|
|
|
|
+ {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
|
|
|
|
+ false ->
|
|
|
|
+ {1, [{Pool, 1}|Pools]};
|
|
|
|
+ {Pool, NbConns} ->
|
|
|
|
+ NbConns2 = NbConns + 1,
|
|
|
|
+ {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
|
|
|
|
+ end,
|
|
|
|
+ ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
|
|
|
|
+ {NbConnsRet, Pools2}.
|
|
|
|
+
|
|
|
|
+%% @private
|
|
-spec remove_pid(pid(), State) -> State.
|
|
-spec remove_pid(pid(), State) -> State.
|
|
remove_pid(Pid, State=#state{
|
|
remove_pid(Pid, State=#state{
|
|
req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) ->
|
|
req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) ->
|