|
- -module(gproc_pool).
- -behavior(gen_server).
- -export([new/1,
- new/3,
- delete/1,
- force_delete/1,
- add_worker/2,
- add_worker/3,
- remove_worker/2,
- connect_worker/2,
- disconnect_worker/2,
- whereis_worker/2,
- worker_id/2,
- active_workers/1,
- defined_workers/1,
- worker_pool/1,
- pick/1,
- pick/2,
- pick_worker/1,
- pick_worker/2,
- claim/2,
- claim/3,
- log/1,
- randomize/1]).
- -export([start_link/0]).
- -export([init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3]).
- -export([test/1, test/3, ptest/4, test_run/2, test_run1/2, test_run2/2,
- test_run0/2, setup_test_pool/3, setup_test_pool/4,
- remove_test_pool/1]).
- -define(POOL(Pool), {p,l,{?MODULE,Pool}}).
- -define(POOL_CUR(Pool), {c,l,{?MODULE,Pool,cur}}).
- -define(POOL_WRK(Pool,Name), {c,l,{?MODULE,Pool,w,Name}}).
- -record(st, {}).
- new(Pool) ->
- new(Pool, round_robin, []).
- new(Pool, Type, Opts) when Type == round_robin;
- Type == random;
- Type == hash;
- Type == direct;
- Type == claim ->
- call({new, Pool, Type, Opts}).
- delete(Pool) ->
- call({delete, Pool}).
- force_delete(Pool) ->
-
-
-
-
-
- force_delete_(Pool).
- add_worker(Pool, Name) ->
- call({add_worker, Pool, Name}).
- add_worker(Pool, Name, Slot) ->
- call({add_worker, Pool, Name, Slot}).
- connect_worker(Pool, Name) ->
- gproc:reg(worker_id(Pool, Name), 0).
- disconnect_worker(Pool, Name) ->
- gproc:unreg(worker_id(Pool, Name)).
- remove_worker(Pool, Name) ->
- call({remove_worker, Pool, Name}).
- whereis_worker(Pool, Name) ->
- ID = worker_id(Pool, Name),
- gproc:where(ID).
- worker_id(Pool, Name) ->
- N = gproc:get_attribute(?POOL_WRK(Pool, Name), shared, n),
- {n, l, [?MODULE, Pool, N, Name]}.
- active_workers(Pool) ->
- gproc:select(
- {l,n},
- [{ {{n,l,[?MODULE,Pool,'$1','$2']},'$3','_'}, [{is_integer, '$1'}],
- [{{'$2', '$3'}}] }]).
- defined_workers(Pool) ->
- K = ?POOL(Pool),
- [{N, Pos, gproc:get_value(?POOL_WRK(Pool, N), shared)}
- || {N, Pos} <- get_workers_(K)].
- worker_pool(Pool) ->
- get_workers_(?POOL(Pool)).
- pick(Pool) ->
- case gproc:get_value(?POOL(Pool), shared) of
- {0, _} -> false;
- {Sz, Type} when Type == round_robin; Type == random ->
- pick(Pool, Sz, Type, name);
- _ ->
- error(badarg)
- end.
- pick_worker(Pool) ->
- case gproc:get_value(?POOL(Pool), shared) of
- {0, _} -> false;
- {Sz, Type} when Type == round_robin; Type == random ->
- pick(Pool, Sz, Type, pid);
- _ ->
- error(badarg)
- end.
- pick(Pool, N) ->
- case gproc:get_value(?POOL(Pool), shared) of
- {0, _} -> false;
- {Sz, Type} when Type == hash; Type == direct ->
- pick(Pool, Sz, Type, N, name);
- _ ->
- error(badarg)
- end.
- pick_worker(Pool, N) ->
- case gproc:get_value(?POOL(Pool), shared) of
- {0, _} -> false;
- {Sz, Type} when Type == hash; Type == direct ->
- pick(Pool, Sz, Type, N, pid);
- _ ->
- error(badarg)
- end.
- pick(Pool, Sz, round_robin, Ret) ->
- Next = incr(Pool, 1, Sz),
- case ets:next(gproc, {{n,l,[?MODULE,Pool,Next]},n}) of
- {{n,l,[?MODULE,Pool,Actual,_Name]} = Pick, _} ->
- case Actual - Next of
- Diff when Diff > 1 ->
- gproc:update_counter(
- ?POOL_CUR(Pool), shared, {Diff, Sz, 1}),
- ret(Pick, Ret);
- _ ->
- ret(Pick, Ret)
- end;
- _ ->
- case ets:next(gproc, {{n,l,[?MODULE,Pool,0]}, n}) of
- {{n,l,[?MODULE,Pool,Actual1,_Name1]} = Pick, _} ->
- incr(Pool, Sz-Next+Actual1, Sz),
-
-
- ret(Pick, Ret);
- _ ->
- false
- end
- end;
- pick(Pool, Sz, random, Ret) ->
- pick_near(Pool, crypto:rand_uniform(1, Sz + 1), Ret).
- pick(Pool, Sz, hash, Val, Ret) ->
- pick_near(Pool, erlang:phash2(Val, Sz) + 1, Ret);
- pick(Pool, Sz, direct, N, Ret) when is_integer(N), N > 0 ->
- pick_near(Pool, case (N rem Sz-1) + 1 of 0 -> Sz; N1 -> N1 end, Ret).
- pick_near(Pool, N, Ret) ->
- case ets:next(gproc, {{n,l,[?MODULE,Pool,N]}, n}) of
- {{n,l,[?MODULE,Pool,_,_]} = Pick, _} ->
- ret(Pick, Ret);
- _ ->
-
- case ets:next(gproc, {{n,l,[?MODULE,Pool,1]}, n}) of
- {{n,l,[?MODULE,Pool,_,_]} = Pick, _} ->
- ret(Pick, Ret);
- _ ->
- false
- end
- end.
- ret(Name, name) ->
- Name;
- ret(Name, pid) ->
- case ets:lookup(gproc, {Name,n}) of
- [{_, Pid, _}] ->
- Pid;
- [] ->
-
- false
- end.
- claim(Pool, F) when is_function(F, 2) ->
- claim(Pool, F, nowait).
- claim(Pool, F, Wait) ->
- case gproc:get_value(?POOL(Pool), shared) of
- {0, _} -> false;
- {_, claim} ->
- W = setup_wait(Wait, Pool),
- claim_w(Pool, F, W);
- _ ->
- error(badarg)
- end.
- claim_w(_Pool, _F, timeout) ->
- false;
- claim_w(Pool, F, W) ->
- case claim_(Pool, F) of
- false ->
- claim_w(Pool, F, do_wait(W));
- Other ->
- clear_wait(W),
- Other
- end.
- -define(CLAIM_CHUNK, 5).
- claim_(Pool, F) ->
-
- case ets:select(gproc, [{ {{{n,l,[?MODULE,Pool,'_','_']},n}, '$1', 0}, [],
- [{{ {element,1,{element,1,'$_'}}, '$1' }}]}],
- ?CLAIM_CHUNK) of
- {[_|_] = Workers, Cont} ->
- case try_claim(Workers, F) of
- {true, _} = True ->
- True;
- false ->
- claim_cont(Cont, F)
- end;
- _ ->
- false
- end.
- claim_cont('$end_of_table', _) ->
- false;
- claim_cont(Cont, F) ->
- case ets:select(Cont) of
- {[_|_] = Workers, Cont1} ->
- case try_claim(Workers, F) of
- {true, _} = True ->
- True;
- false ->
- claim_cont(Cont1, F)
- end;
- _ ->
- false
- end.
- try_claim([], _) ->
- false;
- try_claim([{K,Pid}|T], F) ->
- case try_claim(K, Pid, F) of
- false ->
- try_claim(T, F);
- Other ->
- Other
- end.
- try_claim(K, Pid, F) ->
- case gproc:update_counter(K, [0, {1, 1, 1}]) of
- [0, 1] ->
-
- try Res = F(K, Pid),
- {true, Res}
- after
- gproc:reset_counter(K)
- end;
- [1, 1] ->
-
- false
- end.
- setup_wait(nowait, _) ->
- nowait;
- setup_wait({busy_wait, MS}, Pool) ->
- Ref = erlang:send_after(MS, self(), {claim, Pool}),
- {busy_wait, Ref}.
- do_wait(nowait) ->
- timeout;
- do_wait({busy_wait, Ref} = W) ->
-
-
-
- erlang:yield(),
- case erlang:read_timer(Ref) of
- false ->
- erlang:cancel_timer(Ref),
- timeout;
- _ ->
- W
- end.
- clear_wait(nowait) ->
- ok;
- clear_wait({busy_wait, Ref}) ->
- erlang:cancel_timer(Ref),
- ok.
- log({n,l,[?MODULE,Pool,_,Name]}) ->
- gproc:update_shared_counter(?POOL_WRK(Pool,Name), 1).
- randomize(Pool) ->
- case pool_size(Pool) of
- 0 -> 0;
- 1 -> 1;
- Sz ->
- incr(Pool, crypto:rand_uniform(0, Sz), Sz)
- end.
- pool_size(Pool) ->
- {Sz, _} = gproc:get_value(?POOL(Pool), shared),
- Sz.
- start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
- init([]) ->
- {ok, #st{}}.
- call(Req) ->
- case gen_server:call(?MODULE, Req) of
- badarg ->
- error(badarg);
- {badarg, Reason} ->
- error(Reason);
- Reply ->
- Reply
- end.
- handle_call(Req, From, S) ->
- try handle_call_(Req, From, S)
- catch
- error:Reason ->
- {reply, {badarg, Reason}, S}
- end.
- handle_call_({new, Pool, Type, Opts}, _, S) ->
- new_(Pool, Type, Opts),
- {reply, ok, S};
- handle_call_({delete, Pool}, _, S) ->
- delete_(Pool),
- {reply, ok, S};
- handle_call_({force_delete, Pool}, _, S) ->
- force_delete_(Pool),
- {reply, ok, S};
- handle_call_({add_worker, Pool, Name}, _, S) ->
- N = add_worker_(Pool, Name),
- {reply, N, S};
- handle_call_({add_worker, Pool, Name, Pos}, _, S) ->
- N = add_worker_(Pool, Name, Pos),
- {reply, N, S};
- handle_call_({set_pool_size, Pool, Sz}, _, S) ->
- Workers = get_workers_(Pool),
- case get_last_worker_n(Workers) of
- N when N > Sz ->
- {reply, badarg, S};
- _ ->
- set_pool_size_(?POOL(Pool), Sz, Workers),
- {reply, true, S}
- end;
- handle_call_({remove_worker, Pool, Name}, _, S) ->
- ok = remove_worker_(Pool, Name),
- {reply, true, S}.
- handle_cast(_, S) ->
- {noreply, S}.
- handle_info(_, S) ->
- {noreply, S}.
- terminate(_, _) ->
- ok.
- code_change(_, S, _) ->
- {ok, S}.
- new_(Pool, Type, Opts) ->
- valid_type(Type),
- Size = proplists:get_value(size, Opts, 0),
- Workers = lists:seq(1, Size),
- K = ?POOL(Pool),
- try gproc:reg_shared(K, {Size, Type})
- catch
- error:_ -> error(exists)
- end,
- Opts1 =
- case lists:keyfind(auto_size, 1, Opts) of
- false ->
- Opts ++ [{auto_size, not lists:keymember(size, 1, Opts)}];
- {_, Bool} when is_boolean(Bool) ->
- Opts
- end,
- gproc:set_attributes_shared(K, Opts1),
- set_workers(K, Workers),
- gproc:reg_shared(?POOL_CUR(Pool), Size).
- valid_type(T) when T==round_robin; T==hash; T==random; T==direct; T==claim ->
- true;
- valid_type(_) ->
- error(invalid_type).
- set_pool_size_(K, Sz, Workers) ->
- {_, Type} = gproc:get_value(K, shared),
- case length(Workers) of
- Sz ->
- set_workers(K, Workers);
- Len when Len > Sz ->
- Workers1 = lists:sublist(Workers, 1, Sz),
- set_workers(K, Workers1);
- Len when Len < Sz ->
- Workers1 = Workers ++ lists:seq(Len+1, Sz),
- set_workers(K, Workers1)
- end,
- gproc:set_value_shared(K, {Sz, Type}).
- delete_(Pool) ->
- K = ?POOL(Pool),
- Ws = get_workers_(K),
- case [1 || {_,_} <- Ws] of
- [] ->
- gproc:unreg_shared(K),
- gproc:unreg_shared(?POOL_CUR(Pool));
- [_|_] ->
- error(not_empty)
- end.
- force_delete_(Pool) ->
- Props = gproc:select({l,p}, [{ {?POOL(Pool), '_', '_'}, [], ['$_']}]),
- Cur = gproc:select({l,c}, [{ {?POOL_CUR(Pool), '_', '_'}, [], ['$_']}]),
- Workers = gproc:select(
- {l,c}, [{ {?POOL_WRK(Pool,'_'), '_', '_'}, [], ['$_']}]),
- Names = find_names(Pool, '_'),
- lists:foreach(
- fun({Key, Pid, _}) when Pid == self() -> gproc:unreg(Key);
- ({_, Pid, _}) when is_pid(Pid) -> exit(Pid, kill)
- end, Names),
- [gproc:unreg_shared(W) || {W,shared,_} <- Cur ++ Props ++ Workers],
- true.
- find_names(Pool, Pid) ->
- gproc:select(
- {l,n}, [{ {{n,l,[?MODULE,Pool,Pid,'_']}, '_', '_'}, [], ['$_']}]).
- add_worker_(Pool, Name) ->
- K = ?POOL(Pool),
- {Sz, Type} = gproc:get_value(K, shared),
- AutoSz = gproc:get_attribute(K, shared, auto_size),
- Ws0 = get_workers_(K),
- {N,Ws1} =
- case lists:keymember(Name, 1, Ws0) of
- false ->
- case find_slot(Name, K, Ws0, Sz, Type, AutoSz) of
- {_, _} = Res ->
- Res;
- false ->
- error(pool_full)
- end;
- true ->
- error(exists)
- end,
- if N > Sz ->
- set_pool_size_(K, N, Ws1);
- true ->
-
- set_workers(K, Ws1)
- end,
- reg_worker(Pool, Name, N),
- N.
- add_worker_(Pool, Name, Pos) ->
- K = ?POOL(Pool),
- {Sz, _} = gproc:get_value(K, shared),
- Ws0 = get_workers_(K),
- if Pos > Sz ->
- case gproc:get_attribute(K, shared, auto_size) of
- true ->
- Ws1 = Ws0 ++ lists:seq(Sz+1,Pos-1) ++ [{Name, Pos}],
- set_pool_size_(K, Pos, Ws1);
- false ->
- error(out_of_range)
- end;
- true ->
- case lists:nth(Pos, Ws0) of
- {_,_} -> error(exists);
- P when is_integer(P) ->
- Ws1 = set_pos(Pos, Ws0, {Name, Pos}),
- set_workers(K, Ws1)
- end
- end,
- reg_worker(Pool, Name, Pos),
- Pos.
- reg_worker(Pool, Name, Pos) ->
- gproc:reg_shared(Wrk = ?POOL_WRK(Pool, Name), 0),
- gproc:set_attributes_shared(Wrk, [{n, Pos}]).
- remove_worker_(Pool, Name) ->
- case whereis_worker(Pool, Name) of
- Pid when is_pid(Pid) ->
- error({worker_connected, Pid});
- undefined ->
- do_remove_worker_(Pool, Name)
- end.
- do_remove_worker_(Pool, Name) ->
- K = ?POOL(Pool),
- Ws0 = get_workers_(K),
- Ws1 = del_slot(Name, Ws0),
- gproc:unreg_shared(?POOL_WRK(Pool, Name)),
- case (NewLen = length(Ws1)) - length(Ws0) of
- 0 -> ok;
- Diff when Diff < 0 ->
- {_, Type} = gproc:get_value(K, shared),
- gproc:set_value_shared(K, {NewLen, Type})
- end,
- gproc:set_attributes_shared(K, [{workers, Ws1}]),
- ok.
- del_slot(Name, [{Name,_}]) ->
- [];
- del_slot(Name, [{Name, Pos}|T]) ->
- [Pos|T];
- del_slot(Name, [H|T]) ->
- [H|del_slot(Name, T)].
- find_slot(Name, _, [], Sz, _, Auto) ->
- case {Sz, Auto} of
- {0, false} -> false;
- {_, _} ->
- {1, [{Name, 1}]}
- end;
- find_slot(Name, Key, Workers, Sz, Type, AutoSz) ->
- case get_strategy(Key, Type) of
- packed ->
- find_slot_packed(Name, Workers, AutoSz);
- sparse ->
- find_slot_sparse(Name, Workers, Sz, AutoSz)
- end.
- get_last_worker_n(Ws) ->
- get_last_worker_n(Ws, 0, 1).
- get_last_worker_n([{_,_}|T], _, P) ->
- get_last_worker_n(T, P, P+1);
- get_last_worker_n([H|T], Last, P) when is_integer(H) ->
- get_last_worker_n(T, Last, P+1);
- get_last_worker_n([], Last, _) ->
- Last.
- find_slot_packed(Name, Workers, AutoSz) ->
- find_slot_packed(Name, Workers, AutoSz, []).
- find_slot_packed(Name, [N|T], _, Acc) when is_integer(N) ->
- {N, lists:reverse(Acc) ++ [{Name, N}|T]};
- find_slot_packed(Name, [{_,Prev} = Last], true, Acc) ->
- New = Prev+1,
- {New, lists:reverse([{Name, New}, Last|Acc])};
- find_slot_packed(_, [_], false, _) ->
- false;
- find_slot_packed(Name, [{_,_} = H|T], Auto, Acc) ->
- find_slot_packed(Name, T, Auto, [H|Acc]).
- find_slot_sparse(Name, Ws, Sz, Auto) ->
-
-
- case lists:foldl(
- fun(N, {Prev, StartP, First, Last, Max, MaxP}) when is_integer(N) ->
- case Prev+1 of
- Gap when Gap > Max ->
- {Gap, StartP, First, Last, Gap, StartP};
- Gap ->
- {Gap, StartP, First, Last, Max, MaxP}
- end;
- (N, []) when is_integer(N) ->
-
- [];
- ({_, Pos}, []) ->
- {0, Pos, _First = Pos, _Last = Pos, 0, 0};
- ({_, Pos}, {Prev, StartP, First, _PrevLast, Max, MaxP}) ->
- if Prev > Max ->
- {0, Pos, First, Pos, Prev, StartP};
- true ->
- {0, Pos, First, Pos, Max, MaxP}
- end
- end, [], Ws) of
- [] ->
-
- case {Sz, Auto} of
- {0, false} ->
- false;
- {0, true} ->
- {1, [{Name, 1}]};
- {_, _} when is_integer(Sz), Sz > 0 ->
- {1, [{Name, 1}|tl(Ws)]}
- end;
- {_, _, 1, Last, 0, _} ->
-
- if Auto ->
- NewPos = Last + 1,
- {NewPos, Ws ++ [{Name, NewPos}]};
- true ->
- false
- end;
- {_, _, First, Last, MaxGap, StartPos} ->
- WrapGap = (Sz - Last) + First - 1,
- NewPos = if WrapGap >= MaxGap ->
- (Last + (WrapGap div 2) + 1) rem (Sz+1);
- true ->
- (StartPos + (MaxGap div 2) + 1) rem (Sz+1)
- end,
- {NewPos, set_pos(NewPos, Ws, {Name, NewPos})}
- end.
- set_pos(P, L, X) when P > 0, is_list(L) ->
- set_pos(P, 1, L, X).
- set_pos(P, P, [_|T], X) ->
- [X|T];
- set_pos(P, C, [H|T], X) when C < P ->
- [H|set_pos(P, C+1, T, X)].
- get_workers_(K) ->
- case gproc:get_attribute(K, shared, workers) of
- undefined ->
- [];
- L when is_list(L) ->
- L
- end.
- set_workers(K, L) when is_list(L) ->
- gproc:set_attributes_shared(K, [{workers, L}]).
- get_strategy(Key, Type) ->
- Default = case Type of
- round_robin -> packed;
- random -> sparse;
- hash -> sparse;
- direct -> packed;
- claim -> packed
- end,
- attribute(Key, fill_strategy, Default).
- attribute(Key, A, Default) ->
- case gproc:get_attribute(Key, shared, A) of
- undefined -> Default;
- Value -> Value
- end.
- incr(Pool, Incr, Sz) ->
- gproc:update_counter(?POOL_CUR(Pool), shared, {Incr, Sz, 1}).
- test(N) when N > 0 ->
- test(N, round_robin, []).
- test(N, Type, Opts) when Type==round_robin;
- Type==random;
- Type==hash;
- Type==direct;
- Type==claim ->
- P = ?LINE,
- setup_test_pool(P, Type, Opts),
- try timer:tc(?MODULE, f(Type), [N, P])
- after
- remove_test_pool(P)
- end.
- ptest(N, I, Type, Opts) ->
- P = ?LINE,
- setup_test_pool(P, Type, Opts),
- F = f(Type),
- Pids =
- [spawn_monitor(fun() -> exit({ok, timer:tc(?MODULE, F, [I, P])}) end)
- || _ <- lists:seq(1, N)],
- try collect(Pids)
- after
- remove_test_pool(P)
- end.
- collect(Pids) ->
- Results = [receive
- {'DOWN', Ref, _, _, Reason} ->
- Reason
- end || {_, Ref} <- Pids],
- {Times, Avgs} = lists:foldr(fun({ok, {T, Avg}}, {A,B}) ->
- {[T|A], [Avg|B]} end,
- {[],[]}, Results),
- {Times, lists:sum(Times)/length(Times),
- lists:sum(Avgs)/length(Avgs)}.
- f(Type) when Type==hash; Type==direct ->
- test_run1;
- f(Type) when Type==claim ->
- test_run2;
- f({empty,_}) ->
- test_run0;
- f(_) ->
- test_run.
- setup_test_pool(P, Type, Opts) ->
- setup_test_pool(P, Type, Opts, test_workers()).
- setup_test_pool(P, Type0, Opts, Workers) ->
- Type = case Type0 of {_, T} -> T; T when is_atom(T) -> T end,
- new(P, Type, Opts),
- [begin R = add_worker(P, W),
- io:fwrite("add_worker(~p, ~p) -> ~p; Ws = ~p~n",
- [P, W, R, get_workers_(?POOL(P))]),
- connect_worker(P, W)
- end || W <- Workers].
- remove_test_pool(P) ->
- io:fwrite("worker stats (~p):~n"
- "~p~n", [P, gproc:select(
- {l,c},
- [{ {{c,l,{?MODULE,P,w,'$1'}},'_','$2'}, [],
- [{{'$1','$2'}}] }])]),
- [begin disconnect_worker(P, W),
- remove_worker(P, W)
- end || W <- test_workers()],
- delete(P).
- test_workers() -> [a,b,c,d,e,f].
- test_run(N, P) ->
- test_run(N, P, 0, 0).
- test_run(N, P, S, M) when N > 0 ->
- {T, Worker} = timer:tc(?MODULE, pick, [P]),
- true = (Worker =/= false),
- log(Worker),
- timer:sleep(crypto:rand_uniform(1,50)),
- test_run(N-1, P, S+T, M+1);
- test_run(_, _, S, M) ->
- S/M.
- test_run1(N, P) ->
- test_run1(N, P, 0, 0).
- test_run1(N, P, S, M) when N > 0 ->
- {T, Worker} = timer:tc(?MODULE, pick, [P, N]),
- true = (Worker =/= false),
- log(Worker),
- timer:sleep(crypto:rand_uniform(1,50)),
- test_run1(N-1, P, S+T, M+1);
- test_run1(_, _, S, M) ->
- S/M.
- test_run2(N, P) ->
- test_run2(N, P, fun(K,_) ->
- R = log(K),
- timer:sleep(crypto:rand_uniform(1,50)),
- R
- end, 0, 0).
- test_run2(N, P, F, S, M) when N > 0 ->
- {T, {true, _}} = timer:tc(?MODULE, claim, [P, F, {busy_wait, 5000}]),
- test_run2(N-1, P, F, S+T, M+1);
- test_run2(_, _, _, S, M) ->
- S/M.
- test_run0(N, X) when N > 0 ->
- test_run0(N-1, X);
- test_run0(_, _) ->
- ok.
|