|
@@ -10,7 +10,8 @@
|
|
|
init_count = 10 :: non_neg_integer(),
|
|
|
start_mfa :: {atom(), atom(), [term()]},
|
|
|
free_pids = [] :: [pid()],
|
|
|
- in_use_count = 0 :: non_neg_integer()
|
|
|
+ in_use_count = 0 :: non_neg_integer(),
|
|
|
+ free_count = 0 :: non_neg_integer()
|
|
|
}).
|
|
|
|
|
|
-record(state, {
|
|
@@ -167,6 +168,7 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%% Internal Function Definitions
|
|
|
%% ------------------------------------------------------------------
|
|
|
|
|
|
+-spec props_to_pool([{atom(), term()}]) -> #pool{}.
|
|
|
props_to_pool(P) ->
|
|
|
#pool{ name = ?gv(name, P),
|
|
|
max_count = ?gv(max_count, P),
|
|
@@ -175,39 +177,39 @@ props_to_pool(P) ->
|
|
|
|
|
|
% FIXME: creation of new pids should probably happen
|
|
|
% in a spawned process to avoid tying up the loop.
|
|
|
+-spec add_pids(error | string(), non_neg_integer(), #state{}) ->
|
|
|
+ {bad_pool_name | max_count_reached | ok, #state{}}.
|
|
|
add_pids(error, _N, State) ->
|
|
|
{bad_pool_name, State};
|
|
|
add_pids(PoolName, N, State) ->
|
|
|
#state{pools = Pools, pool_sups = PoolSups,
|
|
|
all_members = AllMembers} = State,
|
|
|
Pool = dict:fetch(PoolName, Pools),
|
|
|
- #pool{max_count = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
|
|
|
- Total = length(Free) + NumInUse,
|
|
|
+ #pool{max_count = Max, free_pids = Free,
|
|
|
+ in_use_count = NumInUse, free_count = NumFree} = Pool,
|
|
|
+ Total = NumFree + NumInUse,
|
|
|
case Total + N =< Max of
|
|
|
true ->
|
|
|
- Sup = dict:fetch(PoolName, PoolSups),
|
|
|
- NewPids =
|
|
|
- lists:map(fun(_I) ->
|
|
|
- {ok, Pid} = supervisor:start_child(Sup, []),
|
|
|
- erlang:link(Pid),
|
|
|
- Pid
|
|
|
- end, lists:seq(1, N)),
|
|
|
- AllMembers1 = lists:foldl(
|
|
|
- fun(M, Dict) ->
|
|
|
- Time = os:timestamp(),
|
|
|
- dict:store(M, {PoolName, free, Time}, Dict)
|
|
|
- end, AllMembers, NewPids),
|
|
|
- Pool1 = Pool#pool{free_pids = Free ++ NewPids},
|
|
|
+ PoolSup = dict:fetch(PoolName, PoolSups),
|
|
|
+ {AllMembers1, NewPids} = start_n_pids(N, PoolName, PoolSup,
|
|
|
+ AllMembers),
|
|
|
+ % should we sanity check or take length(Free ++ NewPids)
|
|
|
+ % as free_count?
|
|
|
+ Pool1 = Pool#pool{free_pids = Free ++ NewPids,
|
|
|
+ free_count = NumFree + N},
|
|
|
{ok, State#state{pools = dict:store(PoolName, Pool1, Pools),
|
|
|
all_members = AllMembers1}};
|
|
|
false ->
|
|
|
{max_count_reached, State}
|
|
|
end.
|
|
|
|
|
|
+-spec take_member(string(), pid(), #state{}) ->
|
|
|
+ {error_no_members | pid(), #state{}}.
|
|
|
take_member(PoolName, From, State) ->
|
|
|
#state{pools = Pools, consumer_to_pid = CPMap} = State,
|
|
|
Pool = dict:fetch(PoolName, Pools),
|
|
|
- #pool{max_count = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
|
|
|
+ #pool{max_count = Max, free_pids = Free, in_use_count = NumInUse,
|
|
|
+ free_count = NumFree} = Pool,
|
|
|
case Free of
|
|
|
[] when NumInUse == Max ->
|
|
|
{error_no_members, State};
|
|
@@ -220,7 +222,8 @@ take_member(PoolName, From, State) ->
|
|
|
end;
|
|
|
[Pid|Rest] ->
|
|
|
erlang:link(From),
|
|
|
- Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1},
|
|
|
+ Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
|
|
|
+ free_count = NumFree - 1},
|
|
|
CPMap1 = dict:update(From, fun(O) -> [Pid|O] end, [Pid], CPMap),
|
|
|
AllMembers =
|
|
|
dict:update(Pid,
|
|
@@ -235,8 +238,10 @@ take_member(PoolName, From, State) ->
|
|
|
do_return_member2(Pid, ok, State = #state{all_members = AllMembers}) ->
|
|
|
{PoolName, _CPid, _} = dict:fetch(Pid, AllMembers),
|
|
|
Pool = dict:fetch(PoolName, State#state.pools),
|
|
|
- #pool{free_pids = Free, in_use_count = NumInUse} = Pool,
|
|
|
- Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1},
|
|
|
+ #pool{free_pids = Free, in_use_count = NumInUse,
|
|
|
+ free_count = NumFree} = Pool,
|
|
|
+ Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1,
|
|
|
+ free_count = NumFree + 1},
|
|
|
State#state{pools = dict:store(PoolName, Pool1, State#state.pools),
|
|
|
all_members = dict:store(Pid, {PoolName, free, os:timestamp()},
|
|
|
AllMembers)};
|
|
@@ -310,3 +315,34 @@ remove_pid(Pid, State) ->
|
|
|
erlang:get_stacktrace()}),
|
|
|
State
|
|
|
end.
|
|
|
+
|
|
|
+-spec max_free_pool(dict()) -> error_no_members | string().
|
|
|
+max_free_pool(Pools) ->
|
|
|
+ Fun = fun(Name, Pool, {CName, CMax}) ->
|
|
|
+ case Pool#pool.free_count > CMax of
|
|
|
+ true -> {Name, Pool#pool.free_count};
|
|
|
+ false -> {CName, CMax}
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ case dict:fold(Fun, {"", 0}, Pools) of
|
|
|
+ {"", 0} -> error_no_members;
|
|
|
+ {MaxFreePoolName, _} -> MaxFreePoolName
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec start_n_pids(non_neg_integer(), string(), pid(), dict()) ->
|
|
|
+ {dict(), [pid()]}.
|
|
|
+start_n_pids(N, PoolName, PoolSup, AllMembers) ->
|
|
|
+ NewPids = lists:map(
|
|
|
+ fun(_I) ->
|
|
|
+ {ok, Pid} = supervisor:start_child(PoolSup, []),
|
|
|
+ % FIXME: race condition here if child
|
|
|
+ % crashes early.
|
|
|
+ erlang:link(Pid),
|
|
|
+ Pid
|
|
|
+ end, lists:seq(1, N)),
|
|
|
+ AllMembers1 = lists:foldl(
|
|
|
+ fun(M, Dict) ->
|
|
|
+ Time = os:timestamp(),
|
|
|
+ dict:store(M, {PoolName, free, Time}, Dict)
|
|
|
+ end, AllMembers, NewPids),
|
|
|
+ {AllMembers1, NewPids}.
|