|
@@ -275,7 +275,9 @@ add_pids(PoolName, N, State) ->
|
|
true -> ok;
|
|
true -> ok;
|
|
false ->
|
|
false ->
|
|
error_logger:error_msg("tried to add ~B members, only added ~B~n",
|
|
error_logger:error_msg("tried to add ~B members, only added ~B~n",
|
|
- [N, NewPidCount])
|
|
|
|
|
|
+ [N, NewPidCount]),
|
|
|
|
+ send_metric(<<"pooler.events">>,
|
|
|
|
+ {add_pids_failed, N, NewPidCount}, history)
|
|
end,
|
|
end,
|
|
Pool1 = Pool#pool{free_pids = Free ++ NewPids,
|
|
Pool1 = Pool#pool{free_pids = Free ++ NewPids,
|
|
free_count = length(Free) + NewPidCount},
|
|
free_count = length(Free) + NewPidCount},
|
|
@@ -288,23 +290,30 @@ add_pids(PoolName, N, State) ->
|
|
-spec take_member(string(), pid(), #state{}) ->
|
|
-spec take_member(string(), pid(), #state{}) ->
|
|
{error_no_members | pid(), #state{}}.
|
|
{error_no_members | pid(), #state{}}.
|
|
take_member(PoolName, From, #state{pools = Pools, consumer_to_pid = CPMap} = State) ->
|
|
take_member(PoolName, From, #state{pools = Pools, consumer_to_pid = CPMap} = State) ->
|
|
|
|
+ send_metric(pool_metric(PoolName, take_rate), 1, meter),
|
|
Pool = fetch_pool(PoolName, Pools),
|
|
Pool = fetch_pool(PoolName, Pools),
|
|
#pool{max_count = Max, free_pids = Free, in_use_count = NumInUse,
|
|
#pool{max_count = Max, free_pids = Free, in_use_count = NumInUse,
|
|
free_count = NumFree} = Pool,
|
|
free_count = NumFree} = Pool,
|
|
case Free of
|
|
case Free of
|
|
[] when NumInUse =:= Max ->
|
|
[] when NumInUse =:= Max ->
|
|
|
|
+ send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
|
|
|
|
+ send_metric(<<"pooler.events">>, error_no_members, history),
|
|
{error_no_members, State};
|
|
{error_no_members, State};
|
|
[] when NumInUse < Max ->
|
|
[] when NumInUse < Max ->
|
|
case add_pids(PoolName, 1, State) of
|
|
case add_pids(PoolName, 1, State) of
|
|
{ok, State1} ->
|
|
{ok, State1} ->
|
|
take_member(PoolName, From, State1);
|
|
take_member(PoolName, From, State1);
|
|
{max_count_reached, _} ->
|
|
{max_count_reached, _} ->
|
|
|
|
+ send_metric(<<"pooler.error_no_members_count">>, {inc, 1}, counter),
|
|
|
|
+ send_metric(<<"pooler.events">>, error_no_members, history),
|
|
{error_no_members, State}
|
|
{error_no_members, State}
|
|
end;
|
|
end;
|
|
[Pid|Rest] ->
|
|
[Pid|Rest] ->
|
|
erlang:link(From),
|
|
erlang:link(From),
|
|
Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
|
|
Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1,
|
|
free_count = NumFree - 1},
|
|
free_count = NumFree - 1},
|
|
|
|
+ send_metric(pool_metric(PoolName, in_use_count), Pool1#pool.in_use_count, histogram),
|
|
|
|
+ send_metric(pool_metric(PoolName, free_count), Pool1#pool.free_count, histogram),
|
|
{Pid, State#state{
|
|
{Pid, State#state{
|
|
pools = store_pool(PoolName, Pool1, Pools),
|
|
pools = store_pool(PoolName, Pool1, Pools),
|
|
consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
|
|
consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
|
|
@@ -339,7 +348,9 @@ do_return_member(Pid, fail, #state{all_members = AllMembers} = State) ->
|
|
State2;
|
|
State2;
|
|
{Status, _} ->
|
|
{Status, _} ->
|
|
erlang:error({error, "unexpected return from add_pid",
|
|
erlang:error({error, "unexpected return from add_pid",
|
|
- Status, erlang:get_stacktrace()})
|
|
|
|
|
|
+ Status, erlang:get_stacktrace()}),
|
|
|
|
+ send_metric(<<"pooler.events">>, bad_return_from_add_pid,
|
|
|
|
+ history)
|
|
end;
|
|
end;
|
|
error ->
|
|
error ->
|
|
State
|
|
State
|
|
@@ -387,18 +398,21 @@ remove_pid(Pid, State) ->
|
|
NumFree = Pool#pool.free_count - 1,
|
|
NumFree = Pool#pool.free_count - 1,
|
|
Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
|
|
Pool1 = Pool#pool{free_pids = FreePids, free_count = NumFree},
|
|
exit(Pid, kill),
|
|
exit(Pid, kill),
|
|
|
|
+ send_metric(<<"pooler.killed_free_count">>, {inc, 1}, counter),
|
|
State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
all_members = dict:erase(Pid, AllMembers)};
|
|
all_members = dict:erase(Pid, AllMembers)};
|
|
{ok, {PoolName, CPid, _Time}} ->
|
|
{ok, {PoolName, CPid, _Time}} ->
|
|
Pool = fetch_pool(PoolName, Pools),
|
|
Pool = fetch_pool(PoolName, Pools),
|
|
Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
|
|
Pool1 = Pool#pool{in_use_count = Pool#pool.in_use_count - 1},
|
|
exit(Pid, kill),
|
|
exit(Pid, kill),
|
|
|
|
+ send_metric(<<"pooler.killed_in_use_count">>, {inc, 1}, counter),
|
|
State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
State#state{pools = store_pool(PoolName, Pool1, Pools),
|
|
consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
|
|
consumer_to_pid = cpmap_remove(Pid, CPid, CPMap),
|
|
all_members = dict:erase(Pid, AllMembers)};
|
|
all_members = dict:erase(Pid, AllMembers)};
|
|
error ->
|
|
error ->
|
|
error_logger:error_report({unknown_pid, Pid,
|
|
error_logger:error_report({unknown_pid, Pid,
|
|
erlang:get_stacktrace()}),
|
|
erlang:get_stacktrace()}),
|
|
|
|
+ send_metric(<<"pooler.event">>, unknown_pid, history),
|
|
State
|
|
State
|
|
end.
|
|
end.
|
|
|
|
|
|
@@ -495,3 +509,19 @@ expired_free_members(Members, Now, MaxAgeMin) ->
|
|
Micros = 60 * 1000 * 1000,
|
|
Micros = 60 * 1000 * 1000,
|
|
[ MI || MI = {_, {_, free, LastReturn}} <- Members,
|
|
[ MI || MI = {_, {_, free, LastReturn}} <- Members,
|
|
timer:now_diff(Now, LastReturn) > (MaxAgeMin * Micros) ].
|
|
timer:now_diff(Now, LastReturn) > (MaxAgeMin * Micros) ].
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+-spec send_metric(binary(), term(), atom()) -> ok.
|
|
|
|
+%% Send a metric using the metrics module from application config or
|
|
|
|
+%% do nothing.
|
|
|
|
+send_metric(Name, Value, Type) ->
|
|
|
|
+ case application:get_env(pooler, metrics_module) of
|
|
|
|
+ undefined -> ok;
|
|
|
|
+ Mod -> Mod:notify(Name, Value, Type)
|
|
|
|
+ end,
|
|
|
|
+ ok.
|
|
|
|
+
|
|
|
|
+-spec pool_metric(string(), string() | binary()) -> binary().
|
|
|
|
+pool_metric(PoolName, Metric) ->
|
|
|
|
+ iolist_to_binary([<<"pooler.">>, PoolName, ".",
|
|
|
|
+ atom_to_binary(Metric, utf8)]).
|