|
@@ -70,6 +70,8 @@
|
|
worker_pool/1, % (Pool)
|
|
worker_pool/1, % (Pool)
|
|
pick/1, % (Pool)
|
|
pick/1, % (Pool)
|
|
pick/2, % (Pool, Value)
|
|
pick/2, % (Pool, Value)
|
|
|
|
+ pick_worker/1, % (Pool)
|
|
|
|
+ pick_worker/2, % (Pool, Value)
|
|
claim/2, % (Pool, Fun)
|
|
claim/2, % (Pool, Fun)
|
|
log/1, % (WorkerId)
|
|
log/1, % (WorkerId)
|
|
randomize/1]). % (Pool)
|
|
randomize/1]). % (Pool)
|
|
@@ -91,7 +93,7 @@
|
|
|
|
|
|
-record(st, {}).
|
|
-record(st, {}).
|
|
|
|
|
|
-%% @spec new(Pool::any()) -> true
|
|
|
|
|
|
+%% @spec new(Pool::any()) -> ok
|
|
%%
|
|
%%
|
|
%% @equiv new(Pool, round_robin, [])
|
|
%% @equiv new(Pool, round_robin, [])
|
|
new(Pool) ->
|
|
new(Pool) ->
|
|
@@ -294,7 +296,21 @@ pick(Pool) ->
|
|
case gproc:get_value(?POOL(Pool), shared) of
|
|
case gproc:get_value(?POOL(Pool), shared) of
|
|
{0, _} -> false;
|
|
{0, _} -> false;
|
|
{Sz, Type} when Type == round_robin; Type == random ->
|
|
{Sz, Type} when Type == round_robin; Type == random ->
|
|
- pick(Pool, Sz, Type);
|
|
|
|
|
|
+ pick(Pool, Sz, Type, name);
|
|
|
|
+ _ ->
|
|
|
|
+ error(badarg)
|
|
|
|
+ end.
|
|
|
|
+
|
|
|
|
+%% @spec pick_worker(Pool::any()) -> pid() | false
|
|
|
|
+%% @doc Pick a worker pid from the pool given the pool's load-balancing algorithm.
|
|
|
|
+%%
|
|
|
|
+%% Like {@link pick/1}, but returns the worker pid instead of the name.
|
|
|
|
+%% @end
|
|
|
|
+pick_worker(Pool) ->
|
|
|
|
+ case gproc:get_value(?POOL(Pool), shared) of
|
|
|
|
+ {0, _} -> false;
|
|
|
|
+ {Sz, Type} when Type == round_robin; Type == random ->
|
|
|
|
+ pick(Pool, Sz, Type, pid);
|
|
_ ->
|
|
_ ->
|
|
error(badarg)
|
|
error(badarg)
|
|
end.
|
|
end.
|
|
@@ -315,57 +331,82 @@ pick(Pool, N) ->
|
|
case gproc:get_value(?POOL(Pool), shared) of
|
|
case gproc:get_value(?POOL(Pool), shared) of
|
|
{0, _} -> false;
|
|
{0, _} -> false;
|
|
{Sz, Type} when Type == hash; Type == direct ->
|
|
{Sz, Type} when Type == hash; Type == direct ->
|
|
- pick(Pool, Sz, Type, N);
|
|
|
|
|
|
+ pick(Pool, Sz, Type, N, name);
|
|
_ ->
|
|
_ ->
|
|
error(badarg)
|
|
error(badarg)
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
+%% @spec pick_worker(Pool::any(), Value::any()) -> pid() | false
|
|
|
|
+%% @doc Pick a worker pid from the pool given the pool's load-balancing algorithm.
|
|
|
|
+%%
|
|
|
|
+%% Like {@link pick/2}, but returns the worker pid instead of the name.
|
|
|
|
+%% @end
|
|
|
|
+pick_worker(Pool, N) ->
|
|
|
|
+ case gproc:get_value(?POOL(Pool), shared) of
|
|
|
|
+ {0, _} -> false;
|
|
|
|
+ {Sz, Type} when Type == hash; Type == direct ->
|
|
|
|
+ pick(Pool, Sz, Type, N, pid);
|
|
|
|
+ _ ->
|
|
|
|
+ error(badarg)
|
|
|
|
+ end.
|
|
|
|
|
|
-pick(Pool, Sz, round_robin) ->
|
|
|
|
|
|
+pick(Pool, Sz, round_robin, Ret) ->
|
|
Next = incr(Pool, 1, Sz),
|
|
Next = incr(Pool, 1, Sz),
|
|
- case gproc:next({l,n}, {n,l,[?MODULE,Pool,Next]}) of
|
|
|
|
- {n,l,[?MODULE,Pool,Actual,_Name]} = Pick ->
|
|
|
|
|
|
+ case ets:next(gproc, {{n,l,[?MODULE,Pool,Next]},n}) of
|
|
|
|
+ {{n,l,[?MODULE,Pool,Actual,_Name]} = Pick, _} ->
|
|
case Actual - Next of
|
|
case Actual - Next of
|
|
Diff when Diff > 1 ->
|
|
Diff when Diff > 1 ->
|
|
gproc:update_counter(
|
|
gproc:update_counter(
|
|
?POOL_CUR(Pool), shared, {Diff, Sz, 1}),
|
|
?POOL_CUR(Pool), shared, {Diff, Sz, 1}),
|
|
- Pick;
|
|
|
|
|
|
+ ret(Pick, Ret);
|
|
_ ->
|
|
_ ->
|
|
- Pick
|
|
|
|
|
|
+ ret(Pick, Ret)
|
|
end;
|
|
end;
|
|
_ ->
|
|
_ ->
|
|
- case gproc:next({l,n}, {n,l,[?MODULE,Pool,0]}) of
|
|
|
|
- {n,l,[?MODULE,Pool,Actual1,_Name1]} = Pick ->
|
|
|
|
|
|
+ case ets:next(gproc, {{n,l,[?MODULE,Pool,0]}, n}) of
|
|
|
|
+ {{n,l,[?MODULE,Pool,Actual1,_Name1]} = Pick, _} ->
|
|
incr(Pool, Sz-Next+Actual1, Sz),
|
|
incr(Pool, Sz-Next+Actual1, Sz),
|
|
%% gproc:update_counter(
|
|
%% gproc:update_counter(
|
|
%% ?POOL_CUR(Pool), shared, {Sz-Next+Actual1, Sz, 1}),
|
|
%% ?POOL_CUR(Pool), shared, {Sz-Next+Actual1, Sz, 1}),
|
|
- Pick;
|
|
|
|
|
|
+ ret(Pick, Ret);
|
|
_ ->
|
|
_ ->
|
|
false
|
|
false
|
|
end
|
|
end
|
|
end;
|
|
end;
|
|
-pick(Pool, Sz, random) ->
|
|
|
|
- pick_near(Pool, crypto:rand_uniform(1, Sz + 1)).
|
|
|
|
-
|
|
|
|
-pick(Pool, Sz, hash, Val) ->
|
|
|
|
- pick_near(Pool, erlang:phash2(Val, Sz) + 1);
|
|
|
|
-pick(Pool, Sz, direct, N) when is_integer(N), N > 0 ->
|
|
|
|
- pick_near(Pool, case (N rem Sz-1) + 1 of 0 -> Sz; N1 -> N1 end).
|
|
|
|
-
|
|
|
|
-pick_near(Pool, N) ->
|
|
|
|
- case gproc:next({l,n}, {n,l,[?MODULE,Pool,N]}) of
|
|
|
|
- {n,l,[?MODULE,Pool,_,_]} = Pick ->
|
|
|
|
- Pick;
|
|
|
|
|
|
+pick(Pool, Sz, random, Ret) ->
|
|
|
|
+ pick_near(Pool, crypto:rand_uniform(1, Sz + 1), Ret).
|
|
|
|
+
|
|
|
|
+pick(Pool, Sz, hash, Val, Ret) ->
|
|
|
|
+ pick_near(Pool, erlang:phash2(Val, Sz) + 1, Ret);
|
|
|
|
+pick(Pool, Sz, direct, N, Ret) when is_integer(N), N > 0 ->
|
|
|
|
+ pick_near(Pool, case (N rem Sz-1) + 1 of 0 -> Sz; N1 -> N1 end, Ret).
|
|
|
|
+
|
|
|
|
+pick_near(Pool, N, Ret) ->
|
|
|
|
+ case ets:next(gproc, {{n,l,[?MODULE,Pool,N]}, n}) of
|
|
|
|
+ {{n,l,[?MODULE,Pool,_,_]} = Pick, _} ->
|
|
|
|
+ ret(Pick, Ret);
|
|
_ ->
|
|
_ ->
|
|
%% wrap
|
|
%% wrap
|
|
- case gproc:next({l,n}, {n,l,[?MODULE,Pool,1]}) of
|
|
|
|
- {n,l,[?MODULE,Pool,_,_]} = Pick ->
|
|
|
|
- Pick;
|
|
|
|
|
|
+ case ets:next(gproc, {{n,l,[?MODULE,Pool,1]}, n}) of
|
|
|
|
+ {{n,l,[?MODULE,Pool,_,_]} = Pick, _} ->
|
|
|
|
+ ret(Pick, Ret);
|
|
_ ->
|
|
_ ->
|
|
false
|
|
false
|
|
end
|
|
end
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
+ret(Name, name) ->
|
|
|
|
+ Name;
|
|
|
|
+ret(Name, pid) ->
|
|
|
|
+ case ets:lookup(gproc, {Name,n}) of
|
|
|
|
+ [{_, Pid, _}] ->
|
|
|
|
+ Pid;
|
|
|
|
+ [] ->
|
|
|
|
+ %% possible race
|
|
|
|
+ false
|
|
|
|
+ end.
|
|
|
|
+
|
|
|
|
+
|
|
%% @spec claim(Pool::any(), Fun::function()) -> {true, Res} | false
|
|
%% @spec claim(Pool::any(), Fun::function()) -> {true, Res} | false
|
|
%% @doc Picks the first available worker in the pool and applies `Fun'.
|
|
%% @doc Picks the first available worker in the pool and applies `Fun'.
|
|
%%
|
|
%%
|