Browse Source

wire up supervisors for application:start(pidq)

Refactor pidq for using supervisors.
Seth Falcon 14 years ago
parent
commit
6711d69cfe
5 changed files with 90 additions and 49 deletions
  1. 3 0
      README.org
  2. 25 0
      pidq.config.example
  3. 1 0
      rebar.config
  4. 53 42
      src/pidq.erl
  5. 8 7
      src/pidq_sup.erl

+ 3 - 0
README.org

@@ -197,6 +197,9 @@ supervisor:start_child(S1, []).
 {ok, S2} = pidq_sup:start_link([]).
 supervisor:start_child(pidq_pool_sup, [A1]).
 
+application:load(pidq).
+C = application:get_all_env(pidq).
+pidq:start(C).
 #+END_SRC
 
 *** supervision strategy

+ 25 - 0
pidq.config.example

@@ -0,0 +1,25 @@
+% -*- mode: erlang -*-
+% pidq app config
+[
+ {pidq, [
+         {pools, [
+                  [{name, "rc8081"},
+                   {max_count, 5},
+                   {init_count, 3},
+                   {start_mfa,
+                    {riak_pb_socket, start_link, ["localhost", 8081]}}],
+
+                  [{name, "rc8082"},
+                   {max_count, 5},
+                   {init_count, 3},
+                   {start_mfa,
+                    {riak_pb_socket, start_link, ["localhost", 8082]}}],
+
+                  [{name, "rc8083"},
+                   {max_count, 5},
+                   {init_count, 3},
+                   {start_mfa,
+                    {riak_pb_socket, start_link, ["localhost", 8083]}}]
+                 ]}
+        ]}
+].

+ 1 - 0
rebar.config

@@ -0,0 +1 @@
+{erl_opts, [debug_info]}.

+ 53 - 42
src/pidq.erl

@@ -4,21 +4,22 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
--record(pool, {name,
-               max_pids = 100,
-               min_free = 3,
-               init_size = 10,
-               pid_starter_args = [],
-               free_pids,
-               in_use_count}).
+-record(pool, {
+          name             :: string(),
+          max_count = 100  :: non_neg_integer(),
+          init_count = 10  :: non_neg_integer(),
+          start_mfa        :: {atom(), atom(), [term()]},
+          free_pids = []   :: [pid()],
+          in_use_count = 0 :: non_neg_integer()
+         }).
 
 -record(state, {
-          npools,
-          pools = dict:new(),
-          in_use_pids = dict:new(),
-          consumer_to_pid = dict:new(),
-          pid_starter,
-          pid_stopper}).
+          npools                       :: non_neg_integer(),
+          pools = dict:new()           :: dict:dictionary(),
+          pool_sups = dict:new()       :: dict:dictionary(),
+          in_use_pids = dict:new()     :: dict:dictionary(),
+          consumer_to_pid = dict:new() :: dict:dictionary()
+         }).
 
 -define(gv(X, Y), proplists:get_value(X, Y)).
 -define(gv(X, Y, D), proplists:get_value(X, Y, D)).
@@ -41,7 +42,12 @@
 %% gen_server Function Exports
 %% ------------------------------------------------------------------
 
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
 
 %% ------------------------------------------------------------------
 %% API Function Definitions
@@ -81,13 +87,21 @@ status() ->
 %% ------------------------------------------------------------------
 
 init(Config) ->
+    ?debugMsg("in pidq init"),
     PoolRecs = [ props_to_pool(P) || P <- ?gv(pools, Config) ],
+    ?debugVal(hd(PoolRecs)),
     Pools = [ {Pool#pool.name, Pool} || Pool <-  PoolRecs ],
-    State = #state{pid_starter = ?gv(pid_starter, Config),
-                   pid_stopper = ?gv(pid_stopper, Config,
-                                     {?MODULE, default_stopper}),
-                   npools = length(Pools),
-                   pools = dict:from_list(Pools)},
+    ?debugMsg("have list"),
+    PoolSups = [ {P#pool.name,
+                  element(2, supervisor:start_child(pidq_pool_sup,
+                                                    [P#pool.start_mfa]))}
+                 || P <- PoolRecs ],
+    ?debugMsg("called start_child"),
+    % TODO: create initial workers here
+    State = #state{npools = length(Pools),
+                   pools = dict:from_list(Pools),
+                   pool_sups = dict:from_list(PoolSups)
+                  },
     process_flag(trap_exit, true),
     {ok, State}.
 
@@ -150,40 +164,40 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
-% default_stopper(Pid) ->
-%     exit(Pid, kill).
-
 props_to_pool(P) ->
-    Defs = [{free_pids, []}, {in_use_count, 0}],
-    % a record is just a tagged tuple
-    P2 = lists:append(Defs, P),
-    Values = [ ?gv(Field, P2) || Field <- record_info(fields, pool) ],
-    list_to_tuple([pool|Values]).
+    #pool{      name = ?gv(name, P),
+           max_count = ?gv(max_count, P),
+          init_count = ?gv(init_count, P),
+           start_mfa = ?gv(start_mfa, P)}.
 
+% FIXME: creation of new pids should probably happen
+% in a spawned process to avoid typing up the loop.
 add_pids(error, _N, State) ->
     {bad_pool_name, State};
 add_pids(PoolName, N, State) ->
-    #state{pools = Pools, pid_starter = {M, F}} = State,
+    #state{pools = Pools, pool_sups = PoolSups} = State,
     Pool = dict:fetch(PoolName, Pools),
-    #pool{max_pids = Max, free_pids = Free, in_use_count = NumInUse,
-          pid_starter_args = Args} = Pool,
+    #pool{max_count = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
     Total = length(Free) + NumInUse,
     case Total + N =< Max of
         true ->
-            % FIXME: we'll want to link to these pids so we'll know if
-            % they crash. Or should the starter function be expected
-            % to do spawn_link?
-            NewPids = [ apply(M, F, Args) || _X <- lists:seq(1, N) ],
-            Pool1 = Pool#pool{free_pids = lists:append(Free, NewPids)},
+            Sup = dict:fetch(PoolName, PoolSups),
+            NewPids =
+                lists:map(fun(_I) ->
+                                  {ok, Pid} = supervisor:start_child(Sup, []),
+                                  erlang:link(Pid),
+                                  Pid
+                          end, lists:seq(1, N)),
+            Pool1 = Pool#pool{free_pids = Free ++ NewPids},
             {ok, State#state{pools = dict:store(PoolName, Pool1, Pools)}};
         false ->
-            {max_pids_reached, State}
+            {max_count_reached, State}
     end.
 
 take_pid(PoolName, From, State) ->
     #state{pools = Pools, in_use_pids = InUse, consumer_to_pid = CPMap} = State,
     Pool = dict:fetch(PoolName, Pools),
-    #pool{max_pids = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
+    #pool{max_count = Max, free_pids = Free, in_use_count = NumInUse} = Pool,
     case Free of
         [] when NumInUse == Max ->
             {error_no_pids, State};
@@ -191,12 +205,10 @@ take_pid(PoolName, From, State) ->
             case add_pids(PoolName, 1, State) of
                 {ok, State1} ->
                     take_pid(PoolName, From, State1);
-                {max_pids_reached, _} ->
+                {max_count_reached, _} ->
                     {error_no_pids, State}
             end;
         [Pid|Rest] ->
-            % FIXME: handle min_free here -- should adding pids
-            % to satisfy min_free be done in a spawned worker?
             erlang:link(From),
             Pool1 = Pool#pool{free_pids = Rest, in_use_count = NumInUse + 1},
             CPMap1 = dict:update(From, fun(O) -> [Pid|O] end, [Pid], CPMap),
@@ -229,8 +241,7 @@ add_pid_to_free(Pid, Pool) ->
     Pool#pool{free_pids = [Pid|Free], in_use_count = NumInUse - 1}.
 
 handle_failed_pid(Pid, PoolName, Pool, State) ->
-    {M, F} = State#state.pid_stopper,
-    M:F(Pid),
+    exit(Pid, kill),
     {_, NewState} = add_pids(PoolName, 1, State),
     NumInUse = Pool#pool.in_use_count,
     {Pool#pool{in_use_count = NumInUse - 1}, NewState}.

+ 8 - 7
src/pidq_sup.erl

@@ -2,15 +2,16 @@
 
 -behaviour(supervisor).
 
--export([start_link/1, init/1]).
+-export([start_link/0, init/1]).
 
-start_link(Config) ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, [Config]).
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
-init(Config) ->
-    % Pidq = {pidq, {pidq, start_link, [Config]},
-    %         permanent, 5000, worker, [pidq]},
+init([]) ->
+    Config = application:get_all_env(),
+    Pidq = {pidq, {pidq, start_link, [Config]},
+            permanent, 5000, worker, [pidq]},
     PidqPool = {pidq_pool_sup, {pidq_pool_sup, start_link, []},
                 permanent, 5000, supervisor, [pidq_pool_sup]},
-    {ok, {{one_for_one, 5, 10}, [PidqPool]}}.
+    {ok, {{one_for_one, 5, 10}, [PidqPool, Pidq]}}.