Просмотр исходного кода

Add start_members_sync/3 to pooler_starter

This function starts Count members in parallel and returns the list of
new member pids.
Seth Falcon 12 лет назад
Родитель
Сommit
ccf2cd92e3
1 измененных файлов с 48 добавлено и 5 удалено
  1. 48 5
      src/pooler_starter.erl

+ 48 - 5
src/pooler_starter.erl

@@ -56,6 +56,14 @@ start_member(Starter, #pool{} = Pool) ->
     gen_server:cast(Starter, {start_member, Pool, Ref}),
     Ref.
 
+%% @doc Start `Count' members in parallel and return a list of created
+%% members. The returned list may contain fewer than `Count' members
+%% if errors occured for some member starts.
+-spec start_members_sync(atom() | pid(), #pool{},
+                         non_neg_integer()) -> [pid()].
+start_members_sync(Starter, #pool{} = Pool, Count) ->
+    gen_server:call(Starter, {start_members_sync, Pool, Count}, infinity).
+
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
@@ -64,10 +72,12 @@ start_member(Starter, #pool{} = Pool) ->
 init([]) ->
     {ok, {}}.
 
-handle_call(stop, _From, Pool) ->
-    {stop, normal, stop_ok, Pool};
-handle_call(_Request, _From, Pool) ->
-    {noreply, Pool}.
+handle_call({start_members_sync, Pool, Count}, _From, State) ->
+    {reply, do_start_members_sync(Pool, Count), State};
+handle_call(stop, _From, State) ->
+    {stop, normal, stop_ok, State};
+handle_call(_Request, _From, State) ->
+    {noreply, State}.
 
 handle_cast({start_member, Pool, Ref}, State) ->
     ok = do_start_member(Pool, Ref),
@@ -86,7 +96,40 @@ do_start_member(#pool{name = PoolName,
             pooler:accept_member(PoolName, {Ref, Error}),
             ok
     end.
-            
+
+do_start_members_sync(#pool{name = PoolName,
+                            member_sup = PoolSup}, Count) ->
+    Parent = self(),
+    StarterPids = [ launch_starter(Parent, PoolName, PoolSup)
+                    || _I <- lists:seq(1, Count) ],
+    gather_pids(StarterPids, []).
+
+launch_starter(Parent, PoolName, PoolSup) ->
+    proc_lib:spawn_link(fun() ->
+                                Result = start_or_log_error(PoolName, PoolSup),
+                                Parent ! {self(), Result}
+                        end).
+
+start_or_log_error(PoolName, PoolSup) ->
+    case supervisor:start_child(PoolSup, []) of
+        {ok, Pid} ->
+            {ok, Pid};
+        Error ->
+            error_logger:error_msg("pool '~s' failed to start member: ~p",
+                                   [PoolName, Error]),
+            error
+    end.
+
+gather_pids([Pid|Rest], Acc) ->
+    receive
+        {Pid, error} ->
+            gather_pids(Rest, Acc);
+        {Pid, {ok, MemberPid}} ->
+            gather_pids(Rest, [MemberPid | Acc])
+    end;
+gather_pids([], Acc) ->
+    Acc.
+
 -spec handle_info(_, _) -> {'noreply', _}.
 handle_info(_Info, State) ->
     {noreply, State}.