|
@@ -163,7 +163,8 @@ handle_call(take_member, {CPid, _Tag},
|
|
|
error_no_members ->
|
|
|
{reply, error_no_members, NewState};
|
|
|
MaxFreePoolName ->
|
|
|
- {NewPid, State2} = take_member(MaxFreePoolName, CPid, NewState),
|
|
|
+ {NewPid, State2} = take_member(MaxFreePoolName, CPid,
|
|
|
+ NewState),
|
|
|
{reply, NewPid, State2}
|
|
|
end;
|
|
|
{NewPid, NewState} ->
|
|
@@ -232,22 +233,21 @@ props_to_pool(P) ->
|
|
|
add_pids(error, _N, State) ->
|
|
|
{bad_pool_name, State};
|
|
|
add_pids(PoolName, N, State) ->
|
|
|
- #state{pools = Pools, pool_sups = PoolSups,
|
|
|
- all_members = AllMembers} = State,
|
|
|
- Pool = dict:fetch(PoolName, Pools),
|
|
|
+ #state{pools = Pools, all_members = AllMembers} = State,
|
|
|
+ Pool = fetch_pool(PoolName, Pools),
|
|
|
#pool{max_count = Max, free_pids = Free,
|
|
|
in_use_count = NumInUse, free_count = NumFree} = Pool,
|
|
|
Total = NumFree + NumInUse,
|
|
|
case Total + N =< Max of
|
|
|
true ->
|
|
|
- PoolSup = dict:fetch(PoolName, PoolSups),
|
|
|
+ PoolSup = dict:fetch(PoolName, State#state.pool_sups),
|
|
|
{AllMembers1, NewPids} = start_n_pids(N, PoolName, PoolSup,
|
|
|
AllMembers),
|
|
|
% should we sanity check or take length(Free ++ NewPids)
|
|
|
% as free_count?
|
|
|
Pool1 = Pool#pool{free_pids = Free ++ NewPids,
|
|
|
free_count = NumFree + N},
|
|
|
- {ok, State#state{pools = dict:store(PoolName, Pool1, Pools),
|
|
|
+ {ok, State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
|
all_members = AllMembers1}};
|
|
|
false ->
|
|
|
{max_count_reached, State}
|
|
@@ -257,7 +257,7 @@ add_pids(PoolName, N, State) ->
|
|
|
{error_no_members | pid(), #state{}}.
|
|
|
take_member(PoolName, From, State) ->
|
|
|
#state{pools = Pools, consumer_to_pid = CPMap} = State,
|
|
|
- Pool = dict:fetch(PoolName, Pools),
|
|
|
+ Pool = fetch_pool(PoolName, Pools),
|
|
|
#pool{max_count = Max, free_pids = Free, in_use_count = NumInUse,
|
|
|
free_count = NumFree} = Pool,
|
|
|
case Free of
|
|
@@ -274,29 +274,28 @@ take_member(PoolName, From, State) ->
|
|
|
erlang:link(From),
|
|
|
Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
|
|
|
free_count = NumFree - 1},
|
|
|
- CPMap1 = dict:update(From, fun(O) -> [Pid|O] end, [Pid], CPMap),
|
|
|
- AllMembers =
|
|
|
- dict:update(Pid,
|
|
|
- fun({PName, free, Time}) -> {PName, From, Time} end,
|
|
|
- State#state.all_members),
|
|
|
- {Pid, State#state{pools = dict:store(PoolName, Pool1, Pools),
|
|
|
- consumer_to_pid = CPMap1,
|
|
|
- all_members = AllMembers}}
|
|
|
+ {Pid, State#state{
|
|
|
+ pools = store_pool(PoolName, Pool1, Pools),
|
|
|
+ consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
|
|
|
+ all_members = set_cpid_for_member(Pid, From,
|
|
|
+ State#state.all_members)
|
|
|
+ }}
|
|
|
end.
|
|
|
|
|
|
-spec do_return_member(pid(), ok | fail, #state{}) -> #state{}.
|
|
|
-do_return_member(Pid, ok, State = #state{all_members = AllMembers}) ->
|
|
|
- {PoolName, CPid, _} = dict:fetch(Pid, AllMembers),
|
|
|
- Pool = dict:fetch(PoolName, State#state.pools),
|
|
|
+do_return_member(Pid, ok, State = #state{}) ->
|
|
|
+ {PoolName, CPid, _} = dict:fetch(Pid, State#state.all_members),
|
|
|
+ Pool = fetch_pool(PoolName, State#state.pools),
|
|
|
#pool{free_pids = Free, in_use_count = NumInUse,
|
|
|
free_count = NumFree} = Pool,
|
|
|
Pool1 = Pool#pool{free_pids = [Pid | Free], in_use_count = NumInUse - 1,
|
|
|
free_count = NumFree + 1},
|
|
|
- State#state{pools = dict:store(PoolName, Pool1, State#state.pools),
|
|
|
- all_members = dict:store(Pid, {PoolName, free, os:timestamp()},
|
|
|
- AllMembers),
|
|
|
- consumer_to_pid = cpmap_remove(Pid, CPid,
|
|
|
- State#state.consumer_to_pid)};
|
|
|
+ Entry = {PoolName, free, os:timestamp()},
|
|
|
+ State#state{pools = store_pool(PoolName, Pool1, State#state.pools),
|
|
|
+ all_members = store_all_members(Pid, Entry,
|
|
|
+ State#state.all_members),
|
|
|
+ consumer_to_pid = cpmap_remove(Pid, CPid,
|
|
|
+ State#state.consumer_to_pid)};
|
|
|
do_return_member(Pid, fail, State = #state{all_members = AllMembers}) ->
|
|
|
% for the fail case, perhaps the member crashed and was alerady
|
|
|
% removed, so use find instead of fetch and ignore missing.
|
|
@@ -352,18 +351,18 @@ remove_pid(Pid, State) ->
|
|
|
case dict:find(Pid, AllMembers) of
|
|
|
{ok, {PoolName, free, _Time}} ->
|
|
|
% remove an unused member
|
|
|
- Pool = dict:fetch(PoolName, Pools),
|
|
|
+ Pool = fetch_pool(PoolName, Pools),
|
|
|
FreePids = lists:delete(Pid, Pool#pool.free_pids),
|
|
|
NumFree = Pool#pool.free_count - 1,
|
|
|
Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
|
|
|
exit(Pid, kill),
|
|
|
- State#state{pools = dict:store(PoolName, Pool1, Pools),
|
|
|
+ State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
|
all_members = dict:erase(Pid, AllMembers)};
|
|
|
{ok, {PoolName, CPid, _Time}} ->
|
|
|
- Pool = dict:fetch(PoolName, Pools),
|
|
|
+ Pool = fetch_pool(PoolName, Pools),
|
|
|
Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
|
|
|
exit(Pid, kill),
|
|
|
- State#state{pools = dict:store(PoolName, Pool1, Pools),
|
|
|
+ State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
|
consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
|
|
|
all_members = dict:erase(Pid, AllMembers)};
|
|
|
error ->
|
|
@@ -401,7 +400,34 @@ start_n_pids(N, PoolName, PoolSup, AllMembers) ->
|
|
|
end, lists:seq(1, N)),
|
|
|
AllMembers1 = lists:foldl(
|
|
|
fun(M, Dict) ->
|
|
|
- Time = os:timestamp(),
|
|
|
- dict:store(M, {PoolName, free, Time}, Dict)
|
|
|
+ Entry = {PoolName, free, os:timestamp()},
|
|
|
+ store_all_members(M, Entry, Dict)
|
|
|
end, AllMembers, NewPids),
|
|
|
{AllMembers1, NewPids}.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+-spec fetch_pool(string(), dict()) -> #pool{}.
|
|
|
+fetch_pool(PoolName, Pools) ->
|
|
|
+ dict:fetch(PoolName, Pools).
|
|
|
+
|
|
|
+-spec store_pool(string(), #pool{}, dict()) -> dict().
|
|
|
+store_pool(PoolName, Pool = #pool{}, Pools) ->
|
|
|
+ dict:store(PoolName, Pool, Pools).
|
|
|
+
|
|
|
+-spec store_all_members(pid(),
|
|
|
+ {string(), free | pid(), {_, _, _}}, dict()) ->
|
|
|
+ dict().
|
|
|
+store_all_members(Pid, Val = {_PoolName, _CPid, _Time}, AllMembers) ->
|
|
|
+ dict:store(Pid, Val, AllMembers).
|
|
|
+
|
|
|
+-spec set_cpid_for_member(pid(), pid(), dict()) -> dict().
|
|
|
+set_cpid_for_member(MemberPid, CPid, AllMembers) ->
|
|
|
+ dict:update(MemberPid,
|
|
|
+ fun({PoolName, free, Time = {_, _, _}}) ->
|
|
|
+ {PoolName, CPid, Time}
|
|
|
+ end, AllMembers).
|
|
|
+
|
|
|
+-spec add_member_to_consumer(pid(), pid(), dict()) -> dict().
|
|
|
+add_member_to_consumer(MemberPid, CPid, CPMap) ->
|
|
|
+ dict:update(CPid, fun(O) -> [MemberPid|O] end, [MemberPid], CPMap).
|