123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- %% @author Seth Falcon <seth@userprimary.net>
- %% @copyright 2011 Seth Falcon
- %% @doc This is the main interface to the pooler application
- %%
- %% To integrate with your application, you probably want to call
- %% application:start(pooler) after having specified appropriate
- %% configuration for the pooler application (either via a config file
- %% or appropriate calls to the application module to set the
- %% application's config).
- %%
- -module(pooler).
- -behaviour(gen_server).
- -define(SERVER, ?MODULE).
- -include_lib("eunit/include/eunit.hrl").
%% Per-pool bookkeeping. One record is kept for each configured pool,
%% keyed by `name' in the server state.
-record(pool, {
          %% pool name; used as the key in the state's pool dicts
          name             :: string(),
          %% upper bound on free + in-use members (checked in add_pids/3)
          max_count = 100  :: non_neg_integer(),
          %% number of members started for this pool by init/1
          init_count = 10  :: non_neg_integer(),
          %% handed to the pool's member supervisor when it is started
          start_mfa        :: {atom(), atom(), [term()]},
          %% members currently available to be taken
          free_pids = []   :: [pid()],
          %% number of members currently handed out to consumers
          in_use_count = 0 :: non_neg_integer(),
          %% number of members in free_pids, tracked separately
          free_count = 0   :: non_neg_integer()
         }).
%% gen_server state.
-record(state, {
          %% number of configured pools
          npools                       :: non_neg_integer(),
          %% pool name -> #pool{}
          pools = dict:new()           :: dict(),
          %% pool name -> pid of that pool's member supervisor
          pool_sups = dict:new()       :: dict(),
          %% member pid -> {PoolName, free | ConsumerPid, Timestamp}
          all_members = dict:new()     :: dict(),
          %% consumer pid -> list of member pids it currently holds
          consumer_to_pid = dict:new() :: dict(),
          %% pool names indexed 0..npools-1, used for random selection
          pool_selector                :: array()
         }).
- -define(gv(X, Y), proplists:get_value(X, Y)).
- -define(gv(X, Y, D), proplists:get_value(X, Y, D)).
- %% ------------------------------------------------------------------
- %% API Function Exports
- %% ------------------------------------------------------------------
- -export([start/1,
- start_link/1,
- stop/0,
- take_member/0,
- return_member/2,
- % remove_pool/2,
- % add_pool/1,
- pool_stats/0]).
- %% ------------------------------------------------------------------
- %% gen_server Function Exports
- %% ------------------------------------------------------------------
- -export([init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3]).
- %% ------------------------------------------------------------------
- %% API Function Definitions
- %% ------------------------------------------------------------------
%% @doc Start the pooler server linked to the caller and register it
%% locally under the module name. `Config' is handed unchanged to
%% init/1.
start_link(Config) ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, Config, []).
%% @doc Start the pooler server WITHOUT linking it to the caller and
%% register it locally under the module name. `Config' is handed
%% unchanged to init/1.
%%
%% Use start_link/1 when the server should be tied to the caller
%% (e.g. when started from a supervisor).
start(Config) ->
    %% gen_server:start/4, not start_link/4: the unlinked variant is
    %% the whole point of having start/1 next to start_link/1.
    gen_server:start({local, ?SERVER}, ?MODULE, Config, []).
%% @doc Synchronously ask the registered pooler server to stop.
%% Returns the server's reply to the `stop' request (`stop_ok').
stop() ->
    Request = stop,
    gen_server:call(?SERVER, Request).
%% @doc Check out a member for exclusive use by the calling process.
%%
%% A pool is chosen at random first. When that pool cannot supply a
%% member, the pool with the most free members is tried instead. When
%% no pool has a free member, the atom 'error_no_members' is returned.
%%
-spec take_member() -> pid() | error_no_members.
take_member() ->
    Request = take_member,
    gen_server:call(?SERVER, Request).
%% @doc Hand a previously taken member back to the server.
%%
%% With `Status' 'ok' the member goes back into its pool for reuse.
%% With `Status' 'fail' the member is destroyed and a fresh member is
%% started in its place. The request is sent asynchronously, so 'ok'
%% is returned immediately.
-spec return_member(pid(), ok | fail) -> ok.
return_member(Pid, Status) when Status =:= ok; Status =:= fail ->
    %% the caller's own pid travels with the request
    Request = {return_member, Pid, Status, self()},
    gen_server:cast(?SERVER, Request),
    ok.
- % TODO:
- % remove_pool(Name, How) when How == graceful; How == immediate ->
- % gen_server:call(?SERVER, {remove_pool, Name, How}).
- % TODO:
- % add_pool(Pool) ->
- % gen_server:call(?SERVER, {add_pool, Pool}).
%% @doc Fetch a snapshot of the server's member table for all pools.
%%
%% The shape of the returned list is not a stable API and may change.
-spec pool_stats() -> [tuple()].
pool_stats() ->
    Request = pool_stats,
    gen_server:call(?SERVER, Request).
- %% ------------------------------------------------------------------
- %% gen_server Function Definitions
- %% ------------------------------------------------------------------
%% @doc gen_server callback: build the initial state from `Config'.
%%
%% `Config' is a proplist whose `pools' entry is a list of pool
%% proplists (see props_to_pool/1). For every pool a member supervisor
%% is started under `pooler_pool_sup', then `init_count' members are
%% started via add_pids/3. Any failure crashes init (assertive
%% matches), which aborts server start-up.
-spec init([any()]) -> {'ok', #state{}}.
init(Config) ->
    %% Trap exits BEFORE any member is started: add_pids/3 links to
    %% every member it creates, and without trap_exit a member dying
    %% during start-up would take the server down instead of arriving
    %% as an 'EXIT' message for handle_info/2.
    process_flag(trap_exit, true),
    PoolRecs = [ props_to_pool(P) || P <- ?gv(pools, Config) ],
    Pools = [ {Pool#pool.name, Pool} || Pool <- PoolRecs ],
    PoolSups =
        lists:map(
          fun(#pool{name = Name, start_mfa = MFA}) ->
                  {ok, SupPid} = supervisor:start_child(pooler_pool_sup, [MFA]),
                  {Name, SupPid}
          end, PoolRecs),
    State0 = #state{npools = length(Pools),
                    pools = dict:from_list(Pools),
                    pool_sups = dict:from_list(PoolSups),
                    pool_selector = array:from_list([PN || {PN, _} <- Pools])
                   },
    %% The accumulator must stay {ok, _}; any other add_pids/3 result
    %% fails the fun head match and aborts start-up.
    {ok, State} = lists:foldl(
                    fun(#pool{name = PName, init_count = N}, {ok, AccState}) ->
                            add_pids(PName, N, AccState)
                    end, {ok, State0}, PoolRecs),
    {ok, State}.
%% @doc gen_server callback for synchronous requests.
%%
%% `take_member'  - reply with a member pid or 'error_no_members'.
%% `stop'         - reply 'stop_ok' and stop the server normally.
%% `pool_stats'   - reply with the member table as a list.
%% anything else  - ignored; no reply is sent, so the caller's
%%                  gen_server:call/2 will time out.
-spec handle_call(_, _, _) ->
                         {'noreply', #state{}} |
                         {'reply',
                          'error_no_members' | pid() | [{_,_}],
                          #state{}} |
                         {'stop', 'normal', 'stop_ok', _}.
handle_call(take_member, {CPid, _Tag},
            #state{pool_selector = PS, npools = NP} = State) ->
    %% Try a randomly selected pool first. If it has no member to
    %% give, fall back to the pool with the most free members.
    PoolName = array:get(crypto:rand_uniform(0, NP), PS),
    case take_member(PoolName, CPid, State) of
        {error_no_members, NewState} ->
            case max_free_pool(NewState#state.pools) of
                error_no_members ->
                    {reply, error_no_members, NewState};
                MaxFreePoolName ->
                    {NewPid, State2} = take_member(MaxFreePoolName, CPid,
                                                   NewState),
                    {reply, NewPid, State2}
            end;
        {NewPid, NewState} ->
            {reply, NewPid, NewState}
    end;
handle_call(stop, _From, State) ->
    {stop, normal, stop_ok, State};
handle_call(pool_stats, _From, State) ->
    {reply, dict:to_list(State#state.all_members), State};
handle_call(_Request, _From, State) ->
    %% Must be a 2-tuple: {noreply, ok, State} would be read by
    %% gen_server as {noreply, NewState = ok, Timeout = State},
    %% replacing the state with 'ok' and crashing the server.
    {noreply, State}.
%% @doc gen_server callback for asynchronous requests.
%%
%% `{return_member, Pid, Status, CPid}' hands a member back via
%% do_return_member/3; every other message is dropped.
-spec handle_cast(_,_) -> {'noreply', _}.
handle_cast({return_member, Member, Status, _CPid}, State) ->
    NewState = do_return_member(Member, Status, State),
    {noreply, NewState};
handle_cast(_Other, State) ->
    {noreply, State}.
%% @doc gen_server callback for raw messages.
%%
%% Handles 'EXIT' messages from linked processes:
%%  - the pid is a pool member: treat it as a failed member (it is
%%    removed and a replacement is started);
%%  - the pid is a consumer: hand back every member it still held;
%%  - otherwise: ignore.
%% All other messages are dropped.
-spec handle_info(_, _) -> {'noreply', _}.
handle_info({'EXIT', Pid, Reason}, State) ->
    State1 =
        case dict:find(Pid, State#state.all_members) of
            {ok, {_PoolName, _ConsumerPid, _Time}} ->
                do_return_member(Pid, fail, State);
            error ->
                return_consumer_members(Pid, Reason, State)
        end,
    {noreply, State1};
handle_info(_Info, State) ->
    {noreply, State}.

%% Hand back every member held by the exited consumer `CPid'.
%%
%% A consumer that exited with reason 'normal' returns its members
%% with status 'ok'; any other reason returns them with 'fail'. The
%% consumer's entry is then erased so dead consumers do not pile up in
%% consumer_to_pid.
-spec return_consumer_members(pid(), term(), #state{}) -> #state{}.
return_consumer_members(CPid, Reason, State) ->
    case dict:find(CPid, State#state.consumer_to_pid) of
        {ok, Pids} ->
            Status = case Reason of
                         normal -> ok;
                         _Crash -> fail
                     end,
            State1 = lists:foldl(
                       fun(P, S) -> do_return_member(P, Status, S) end,
                       State, Pids),
            CPMap = dict:erase(CPid, State1#state.consumer_to_pid),
            State1#state{consumer_to_pid = CPMap};
        error ->
            State
    end.
%% @doc gen_server callback run when the server shuts down. Performs
%% no cleanup and always returns 'ok'.
-spec terminate(_, _) -> 'ok'.
terminate(_Why, _FinalState) ->
    ok.
%% @doc gen_server callback for hot code upgrades. The state is
%% carried over unchanged.
-spec code_change(_, _, _) -> {'ok', _}.
code_change(_FromVsn, CurrentState, _UpgradeData) ->
    {ok, CurrentState}.
- %% ------------------------------------------------------------------
- %% Internal Function Definitions
- %% ------------------------------------------------------------------
%% Build a #pool{} record from a pool configuration proplist.
%%
%% Recognised keys: `name', `max_count', `init_count', `start_mfa'.
%% A missing `max_count' or `init_count' falls back to the record's
%% declared default instead of becoming 'undefined' (which would
%% silently override the default and break the arithmetic in
%% add_pids/3). `name' and `start_mfa' have no default and are
%% 'undefined' when absent.
-spec props_to_pool([{atom(), term()}]) -> #pool{}.
props_to_pool(P) ->
    Defaults = #pool{},
    #pool{name = proplists:get_value(name, P),
          max_count = proplists:get_value(max_count, P,
                                          Defaults#pool.max_count),
          init_count = proplists:get_value(init_count, P,
                                           Defaults#pool.init_count),
          start_mfa = proplists:get_value(start_mfa, P)}.
%% Start `N' new members in pool `PoolName' and add them to its free
%% list.
%%
%% Returns {ok, NewState} on success, {max_count_reached, State} when
%% the new members would push free + in-use above the pool's
%% max_count (nothing is started), and {bad_pool_name, State} when
%% called with the atom 'error' in place of a pool name.
%%
%% FIXME: members are started inside the server loop; this should
%% probably happen in a spawned process to avoid tying up the loop.
-spec add_pids(error | string(), non_neg_integer(), #state{}) ->
    {bad_pool_name | max_count_reached | ok, #state{}}.
add_pids(error, _N, State) ->
    {bad_pool_name, State};
add_pids(PoolName, N, State) ->
    #state{pools = Pools, pool_sups = PoolSups,
           all_members = AllMembers} = State,
    Pool = dict:fetch(PoolName, Pools),
    #pool{max_count = Max, free_pids = Free,
          in_use_count = NumInUse, free_count = NumFree} = Pool,
    %% computed outside the guard so bad arithmetic still raises
    Wanted = NumFree + NumInUse + N,
    if
        Wanted > Max ->
            {max_count_reached, State};
        true ->
            PoolSup = dict:fetch(PoolName, PoolSups),
            {Members, Started} = start_n_pids(N, PoolName, PoolSup,
                                              AllMembers),
            %% free_count is advanced by N, not by length(Started)
            Grown = Pool#pool{free_pids = Free ++ Started,
                              free_count = NumFree + N},
            {ok, State#state{pools = dict:store(PoolName, Grown, Pools),
                             all_members = Members}}
    end.
%% Take one member out of pool `PoolName' on behalf of consumer `From'.
%%
%% With a free member available: link to the consumer, move the member
%% from free to in-use, record it under the consumer and mark it as
%% owned by `From' in all_members. With no free member and room to
%% grow, one member is started and the take is retried. Otherwise
%% {error_no_members, State} is returned with the state untouched.
-spec take_member(string(), pid(), #state{}) ->
    {error_no_members | pid(), #state{}}.
take_member(PoolName, From, State) ->
    Pools = State#state.pools,
    Pool = dict:fetch(PoolName, Pools),
    #pool{max_count = Max, in_use_count = InUse,
          free_count = NumFree} = Pool,
    case Pool#pool.free_pids of
        [Member | StillFree] ->
            %% link so the consumer's exit is seen by handle_info/2
            erlang:link(From),
            Taken = Pool#pool{free_pids = StillFree,
                              in_use_count = InUse + 1,
                              free_count = NumFree - 1},
            Consumers = dict:update(From,
                                    fun(Held) -> [Member | Held] end,
                                    [Member],
                                    State#state.consumer_to_pid),
            %% only a member currently marked 'free' may be handed out
            MarkInUse = fun({PName, free, Time}) -> {PName, From, Time} end,
            Members = dict:update(Member, MarkInUse,
                                  State#state.all_members),
            {Member, State#state{pools = dict:store(PoolName, Taken, Pools),
                                 consumer_to_pid = Consumers,
                                 all_members = Members}};
        [] when InUse == Max ->
            {error_no_members, State};
        [] when InUse < Max ->
            case add_pids(PoolName, 1, State) of
                {ok, Grown} ->
                    take_member(PoolName, From, Grown);
                {max_count_reached, _} ->
                    {error_no_members, State}
            end
    end.
%% Hand member `Pid' back to its pool.
%%
%% Status 'ok': an in-use member is put back on its pool's free list,
%% marked free in all_members, and removed from its consumer's entry
%% in consumer_to_pid. Removing it there matters: otherwise the
%% consumer's later exit would return the same member a second time
%% and corrupt the pool counters. A pid that is unknown or already
%% free leaves the state unchanged, so a stray or duplicate return
%% from a client cannot crash the server.
%%
%% Status 'fail': the member is removed and killed, and one
%% replacement is started. An unknown pid is ignored.
-spec do_return_member(pid(), ok | fail, #state{}) -> #state{}.
do_return_member(Pid, ok, #state{all_members = AllMembers} = State) ->
    case dict:find(Pid, AllMembers) of
        {ok, {_PoolName, free, _Time}} ->
            %% duplicate return of a member that is already free
            State;
        {ok, {PoolName, CPid, _Time}} ->
            Pool = dict:fetch(PoolName, State#state.pools),
            #pool{free_pids = Free, in_use_count = NumInUse,
                  free_count = NumFree} = Pool,
            Pool1 = Pool#pool{free_pids = [Pid | Free],
                              in_use_count = NumInUse - 1,
                              free_count = NumFree + 1},
            CPMap = cpmap_remove(Pid, CPid, State#state.consumer_to_pid),
            State#state{pools = dict:store(PoolName, Pool1,
                                           State#state.pools),
                        consumer_to_pid = CPMap,
                        all_members = dict:store(Pid,
                                                 {PoolName, free,
                                                  os:timestamp()},
                                                 AllMembers)};
        error ->
            %% not a member we track (e.g. it already crashed)
            State
    end;
do_return_member(Pid, fail, #state{all_members = AllMembers} = State) ->
    %% for the fail case, perhaps the member crashed and was already
    %% removed, so use find instead of fetch and ignore missing.
    case dict:find(Pid, AllMembers) of
        {ok, {PoolName, _, _}} ->
            State1 = remove_pid(Pid, State),
            case add_pids(PoolName, 1, State1) of
                {ok, State2} ->
                    State2;
                {max_count_reached, State2} ->
                    State2;
                {Status, _} ->
                    erlang:error({error, "unexpected return from add_pid",
                                  Status, erlang:get_stacktrace()})
            end;
        error ->
            State
    end.
-
% @doc Remove `Pid' from the pid list associated with `CPid' in the
% consumer to member map given by `CPMap'.
%
% If `Pid' is the last element in `CPid's pid list, then the `CPid'
% entry is removed entirely and the link to `CPid' is dropped. While
% the consumer still holds other members the link is kept, so that
% its exit is still noticed and those members can be reclaimed.
%
% An unknown `CPid' leaves `CPMap' unchanged.
%
-spec cpmap_remove(pid(), pid(), dict()) -> dict().
cpmap_remove(Pid, CPid, CPMap) ->
    case dict:find(CPid, CPMap) of
        {ok, Pids0} ->
            case lists:delete(Pid, Pids0) of
                [_H|_T] = Pids1 ->
                    dict:store(CPid, Pids1, CPMap);
                [] ->
                    %% last member returned: the consumer no longer
                    %% needs watching.
                    %% FIXME: flush msg queue here?
                    unlink(CPid),
                    dict:erase(CPid, CPMap)
            end;
        error ->
            % FIXME: this shouldn't happen, should we log or error?
            CPMap
    end.
% @doc Remove and kill a pool member.
%
% Handles in-use and free members. A free member is taken off its
% pool's free list and free_count is decremented; an in-use member
% has in_use_count decremented and is removed from its consumer's
% entry in consumer_to_pid. In both cases the member is killed and
% erased from all_members. Logs an error and returns the state
% unchanged if the pid is not tracked in state.all_members.
%
-spec remove_pid(pid(), #state{}) -> #state{}.
remove_pid(Pid, State) ->
    #state{all_members = AllMembers, pools = Pools,
           consumer_to_pid = CPMap} = State,
    case dict:find(Pid, AllMembers) of
        {ok, {PoolName, free, _Time}} ->
            % remove an unused member
            Pool = dict:fetch(PoolName, Pools),
            % update the record: storing the bare list returned by
            % lists:delete/2 would replace the #pool{} in the dict
            Pool1 = Pool#pool{free_pids = lists:delete(Pid,
                                                       Pool#pool.free_pids),
                              free_count = Pool#pool.free_count - 1},
            exit(Pid, kill),
            State#state{pools = dict:store(PoolName, Pool1, Pools),
                        all_members = dict:erase(Pid, AllMembers)};
        {ok, {PoolName, CPid, _Time}} ->
            % remove a member currently held by consumer CPid
            Pool = dict:fetch(PoolName, Pools),
            Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
            exit(Pid, kill),
            State#state{pools = dict:store(PoolName, Pool1, Pools),
                        consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
                        all_members = dict:erase(Pid, AllMembers)};
        error ->
            error_logger:error_report({unknown_pid, Pid,
                                       erlang:get_stacktrace()}),
            State
    end.
%% Return the name of the pool with the highest free_count, or
%% 'error_no_members' when no pool has a free_count above zero.
-spec max_free_pool(dict()) -> error_no_members | string().
max_free_pool(Pools) ->
    %% the fold only leaves this seed untouched when no pool beats a
    %% free count of 0
    Best = dict:fold(fun fold_max_free_count/3, {"", 0}, Pools),
    case Best of
        {"", 0} ->
            error_no_members;
        {Winner, _FreeCount} ->
            Winner
    end.
%% dict:fold/3 step for max_free_pool/1: keep whichever of the current
%% best {Name, FreeCount} and the visited pool has the strictly larger
%% free count. On a tie the earlier best is kept.
-spec fold_max_free_count(string(), #pool{}, {string(), non_neg_integer()}) ->
    {string(), non_neg_integer()}.
fold_max_free_count(Name, #pool{free_count = NumFree}, {_CName, CMax})
  when NumFree > CMax ->
    {Name, NumFree};
fold_max_free_count(_Name, #pool{}, {_CName, _CMax} = Best) ->
    Best.
%% Start `N' members under supervisor `PoolSup', link to each of them,
%% and register each in `AllMembers' as {PoolName, free, Timestamp}.
%% Returns the updated member dict together with the new pids in start
%% order. All members are started before any is registered.
-spec start_n_pids(non_neg_integer(), string(), pid(), dict()) ->
    {dict(), [pid()]}.
start_n_pids(N, PoolName, PoolSup, AllMembers) ->
    StartMember =
        fun() ->
                {ok, Pid} = supervisor:start_child(PoolSup, []),
                %% FIXME: race condition here if child
                %% crashes early.
                erlang:link(Pid),
                Pid
        end,
    NewPids = [ StartMember() || _ <- lists:seq(1, N) ],
    Register =
        fun(Member, Acc) ->
                dict:store(Member, {PoolName, free, os:timestamp()}, Acc)
        end,
    {lists:foldl(Register, AllMembers, NewPids), NewPids}.
|