Browse Source

Add load balancing support using pg2 and an ets table

Pool configs with a group value other than 'undefined' are added to a
pg2 process group at pool startup. The pool joins the group when it
receives a timeout message triggered by init. This is done in order to
ensure that the pool is ready to serve requests as soon as it can be
seen as a member of a group.

In order to provide a simple API where only the group name is needed
to take and return a member, members that are accessed via the new
take_group_member/1 function are added to an ETS table that maps the
member pid to the pid of the pool to which the member belongs. When
returning a member via return_group_member/3 the ETS table is used to
locate the pid of the appropriate pool and return_member is then
called. At present the group name is not actually used upon return,
but keeping the symmetry seems reasonable and will allow more options
for some alternative implementations later if desired.

Whenever a member is returned, the code calls do_return_member, and
this function now ensures that a call to clean_group_table is
made. Only when the pool's group value is other than 'undefined' does
clean_group_table attempt to delete the entry for the member pid in
the ETS table that is used for tracking groups.

The pooler_sup is responsible for starting the ETS table. If a pool
crashes, it could orphan entries in the ETS table. The pg2 group will
remove processes that die.
Seth Falcon 12 years ago
parent
commit
670d5eb764
4 changed files with 184 additions and 3 deletions
  1. 86 2
      src/pooler.erl
  2. 2 0
      src/pooler.hrl
  3. 1 0
      src/pooler_sup.erl
  4. 95 1
      test/pooler_tests.erl

+ 86 - 2
src/pooler.erl

@@ -32,6 +32,9 @@
          start_link/1,
          stop/1,
          take_member/1,
+         take_group_member/1,
+         return_group_member/2,
+         return_group_member/3,
          return_member/2,
          return_member/3,
          pool_stats/1]).
@@ -73,6 +76,68 @@ stop(Name) ->
 take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
     gen_server:call(PoolName, take_member, infinity).
 
+%% @doc Take a member from a randomly selected pool in the group
+%% `GroupName'. Returns `MemberPid', `error_no_members', or
+%% `{error, {no_such_group, GroupName}}'. If the chosen pool has no
+%% free members, the other pools in the group are tried in order.
+-spec take_group_member(atom()) -> pid() | error_no_members.
+take_group_member(GroupName) ->
+    case pg2:get_local_members(GroupName) of
+        {error, {no_such_group, GroupName}} = Error ->
+            Error;
+        Members ->
+            %% Put a random member at the front of the list and then
+            %% return the first member you can take walking the list.
+            Idx = crypto:rand_uniform(1, length(Members) + 1),
+            {Pid, Rest} = extract_nth(Idx, Members),
+            take_first_member([Pid | Rest])
+    end.
+
+take_first_member([Pid | Rest]) ->
+    case take_member(Pid) of
+        error_no_members ->
+            take_first_member(Rest);
+        Member ->
+            ets:insert(?POOLER_GROUP_TABLE, {Member, Pid}),
+            Member
+    end;
+take_first_member([]) ->
+    error_no_members.
+
+%% Returns `{Nth_Elt, Rest}': the nth element of `L' and the remaining
+%% elements (those that preceded it end up in reverse order).
+extract_nth(N, L) ->
+    extract_nth(N, L, []).
+
+extract_nth(1, [H | T], Acc) ->
+    {H, Acc ++ T};
+extract_nth(N, [H | T], Acc) ->
+    extract_nth(N - 1, T, [H | Acc]);
+extract_nth(_, [], _) ->
+    error(badarg).
+
+%% @doc Return a member that was taken from the group
+%% `GroupName'. This is a convenience function for
+%% `return_group_member/3' with `Status' of `ok'.
+-spec return_group_member(atom(), pid() | error_no_members) -> ok.
+return_group_member(GroupName, MemberPid) ->
+    return_group_member(GroupName, MemberPid, ok).
+
+%% @doc Return a member that was taken from the group `GroupName'. If
+%% `Status' is `ok' the member is returned to the pool from which it
+%% came. If `Status' is `fail' the member will be terminated and a new
+%% member added to the appropriate pool.
+-spec return_group_member(atom(), pid() | error_no_members, ok | fail) -> ok.
+return_group_member(_, error_no_members, _) ->
+    ok;
+return_group_member(_GroupName, MemberPid, Status) ->
+    case ets:lookup(?POOLER_GROUP_TABLE, MemberPid) of
+        [{MemberPid, PoolPid}] ->
+            return_member(PoolPid, MemberPid, Status);
+        [] ->
+            ok
+    end.
+
 %% @doc Return a member to the pool so it can be reused.
 %%
 %% If `Status' is 'ok', the member is returned to the pool.  If
@@ -110,7 +175,7 @@ pool_stats(PoolName) ->
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
 
--spec init(#pool{}) -> {'ok', #pool{}}.
+-spec init(#pool{}) -> {'ok', #pool{}, 0}.
 init(#pool{}=Pool) ->
     %% FIXME: change to a monitor only model so that this doesn't have
     %% to be a system process.
@@ -119,7 +184,12 @@ init(#pool{}=Pool) ->
     MemberSup = pooler_pool_sup:member_sup_name(Pool),
     Pool1 = set_member_sup(Pool, MemberSup),
     Pool2 = cull_members_from_pool(Pool1),
-    add_pids(N, Pool2).
+    {ok, NewPool} = add_pids(N, Pool2),
+    %% trigger an immediate timeout, handled by handle_info to allow
+    %% us to register with pg2. We use the timeout mechanism to ensure
+    %% that a server is added to a group only when it is ready to
+    %% process messages.
+    {ok, NewPool, 0}.
 
 set_member_sup(#pool{} = Pool, MemberSup) ->
     Pool#pool{member_sup = MemberSup}.
@@ -142,6 +212,13 @@ handle_cast(_Msg, Pool) ->
     {noreply, Pool}.
 
 -spec handle_info(_, _) -> {'noreply', _}.
+handle_info(timeout, #pool{group = undefined} = Pool) ->
+    %% ignore
+    {noreply, Pool};
+handle_info(timeout, #pool{group = Group} = Pool) ->
+    ok = pg2:create(Group),
+    ok = pg2:join(Group, self()),
+    {noreply, Pool};
 handle_info({'EXIT', Pid, Reason}, State) ->
     State1 =
         case dict:find(Pid, State#pool.all_members) of
@@ -261,6 +338,7 @@ take_member_from_pool(#pool{name = PoolName,
 
 -spec do_return_member(pid(), ok | fail, #pool{}) -> #pool{}.
 do_return_member(Pid, ok, #pool{all_members = AllMembers} = Pool) ->
+    clean_group_table(Pid, Pool),
     case dict:find(Pid, AllMembers) of
         {ok, {PoolName, CPid, _}} ->
             #pool{free_pids = Free, in_use_count = NumInUse,
@@ -277,6 +355,7 @@ do_return_member(Pid, ok, #pool{all_members = AllMembers} = Pool) ->
 do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
     % for the fail case, perhaps the member crashed and was already
     % removed, so use find instead of fetch and ignore missing.
+    clean_group_table(Pid, Pool),
     case dict:find(Pid, AllMembers) of
         {ok, {_PoolName, _, _}} ->
             Pool1 = remove_pid(Pid, Pool),
@@ -294,6 +373,11 @@ do_return_member(Pid, fail, #pool{all_members = AllMembers} = Pool) ->
             Pool
     end.
 
+clean_group_table(_MemberPid, #pool{group = undefined}) ->
+    ok;
+clean_group_table(MemberPid, #pool{group = _GroupName}) ->
+    ets:delete(?POOLER_GROUP_TABLE, MemberPid).
+
 % @doc Remove `Pid' from the pid list associated with `CPid' in the
 % consumer to member map given by `CPMap'.
 %

+ 2 - 0
src/pooler.hrl

@@ -2,6 +2,8 @@
 -define(DEFAULT_CULL_INTERVAL, {0, min}).
 -define(DEFAULT_MAX_AGE, {0, min}).
 
+-define(POOLER_GROUP_TABLE, pooler_group_table).
+
 -type member_info() :: {string(), free | pid(), {_, _, _}}.
 -type free_member_info() :: {string(), free, {_, _, _}}.
 -type time_unit() :: min | sec | ms | mu.

+ 1 - 0
src/pooler_sup.erl

@@ -14,6 +14,7 @@ init([]) ->
     {ok, Config} = application:get_env(pooler, pools),
     Pools = [ pooler_config:list_to_pool(L) || L <- Config ],
     PoolSupSpecs = [ pool_sup_spec(Pool) || Pool <- Pools ],
+    ets:new(?POOLER_GROUP_TABLE, [set, public, named_table, {write_concurrency, true}]),
     {ok, {{one_for_one, 5, 60}, PoolSupSpecs}}.
 
 pool_sup_spec(#pool{name = Name} = Pool) ->

+ 95 - 1
test/pooler_tests.erl

@@ -119,7 +119,7 @@ pooler_basics_test_() ->
                        {start_mfa,
                         {pooled_gs, start_link, [{"type-0"}]}}]],
              application:set_env(pooler, pools, Pools),
-             %% error_logger:delete_report_handler(error_logger_tty_h),
+             error_logger:delete_report_handler(error_logger_tty_h),
              application:start(crypto),
              application:start(pooler)
      end,
@@ -265,6 +265,100 @@ pooler_basics_test_() ->
        end}
      ]}}.
 
+pooler_groups_test_() ->
+    {setup,
+     fun() ->
+             application:set_env(pooler, metrics_module, fake_metrics),
+             fake_metrics:start_link()
+     end,
+     fun(_X) ->
+             fake_metrics:stop()
+     end,
+    {foreach,
+     % setup
+     fun() ->
+             Pools = [[{name, test_pool_1},
+                       {group, group_1},
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-1-1"}]}}],
+                      [{name, test_pool_2},
+                       {group, group_1},
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-1-2"}]}}],
+                      %% test_pool_3 not part of the group
+                      [{name, test_pool_3},
+                       {group, undefined},
+                       {max_count, 3},
+                       {init_count, 2},
+                       {start_mfa,
+                        {pooled_gs, start_link, [{"type-3"}]}}]
+                     ],
+             application:set_env(pooler, pools, Pools),
+             %% error_logger:delete_report_handler(error_logger_tty_h),
+             application:start(crypto),
+             pg2:start(),
+             application:start(pooler)
+     end,
+     fun(_X) ->
+             application:stop(pooler)
+     end,
+     [
+      {"take and return one group member (repeated)",
+       fun() ->
+               Types = [ begin
+                             Pid = pooler:take_group_member(group_1),
+                             {Type, _} = pooled_gs:get_id(Pid),
+                             ?assertMatch("type-1" ++ _, Type),
+                             ok = pooler:return_group_member(group_1, Pid, ok),
+                             Type
+                         end
+                         || _I <- lists:seq(1, 50) ],
+               Type_1_1 = [ X || "type-1-1" = X <- Types ],
+               Type_1_2 = [ X || "type-1-2" = X <- Types ],
+               ?assert(length(Type_1_1) > 0),
+               ?assert(length(Type_1_2) > 0)
+       end},
+
+      {"take member from unknown group",
+       fun() ->
+               ?assertEqual({error, {no_such_group, not_a_group}},
+                            pooler:take_group_member(not_a_group))
+       end},
+
+      {"return member to group, implied ok",
+       fun() ->
+               Pid = pooler:take_group_member(group_1),
+               ?assertEqual(ok, pooler:return_group_member(group_1, Pid))
+       end},
+
+      {"return error_no_member to group",
+       fun() ->
+               ?assertEqual(ok, pooler:return_group_member(group_1, error_no_members))
+       end},
+      
+
+      {"exhaust pools in group",
+       fun() ->
+               Pids = [ pooler:take_group_member(group_1) || _I <- lists:seq(1, 6) ],
+               %% they should all be pids
+               [ begin
+                     {Type, _} = pooled_gs:get_id(P),
+                     ?assertMatch("type-1" ++ _, Type),
+                     ok
+                 end || P <- Pids ],
+               %% further attempts should be error
+               [error_no_members,
+                error_no_members,
+                error_no_members] = [ pooler:take_group_member(group_1)
+                                      || _I <- lists:seq(1, 3) ]
+       end}
+     ]}}.
+               
+
 pooler_limit_failed_adds_test_() ->
     %% verify that pooler crashes completely if too many failures are
     %% encountered while trying to add pids.