Просмотр исходного кода

Merge remote branch 'esl/master'

Ulf Wiger 13 лет назад
Родитель
Сommit
0b0156f0eb
9 измененных файлов с 210 добавлено и 38 удалено
  1. 2 0
      .gitignore
  2. 23 6
      doc/gproc.md
  3. 11 1
      doc/gproc_lib.md
  4. 1 1
      src/gproc.app.src
  5. 29 11
      src/gproc.erl
  6. 2 1
      src/gproc_dist.erl
  7. 46 2
      src/gproc_lib.erl
  8. 40 10
      test/gproc_dist_tests.erl
  9. 56 6
      test/gproc_tests.erl

+ 2 - 0
.gitignore

@@ -2,3 +2,5 @@ current_counterexample.eqc
 deps/
 ebin/
 .eunit/
+*~
+*/*~

+ 23 - 6
doc/gproc.md

@@ -10,7 +10,8 @@ Module gproc
 * [Function Details](#functions)
 
 
-Extended process registry.
+Extended process registry  
+This module implements an extended process registry.
 
 
 
@@ -22,13 +23,32 @@ __Authors:__ Ulf Wiger ([`ulf.wiger@erlang-consulting.com`](mailto:ulf.wiger@erl
 
 
 
-This module implements an extended process registry
 
 
 For a detailed description, see
 [erlang07-wiger.pdf](erlang07-wiger.pdf).
 
-Type and scope for registration and lookup:
+
+
+<h2>Tuning Gproc performance</h2>
+
+
+
+
+
+Gproc relies on a central server and an ordered-set ets table.
+Effort is made to perform as much work as possible in the client without
+sacrificing consistency. A few things can be tuned by setting the following
+application environment variables in the top application of `gproc`
+(usually `gproc`):
+
+* `{ets_options, list()}` - Currently, the options `{write_concurrency, F}`
+and `{read_concurrency, F}` are allowed. The default is
+`[{write_concurrency, true}, {read_concurrency, true}]`
+* `{server_options, list()}` - These will be passed as spawn options when
+starting the `gproc` and `gproc_dist` servers. Default is `[]`. It is
+likely that `{priority, high | max}` and/or increasing `min_heap_size`
+will improve performance.
 
 
 
@@ -156,9 +176,6 @@ a = aggregate_counter
 <pre>unique_id() = {n | a, <a href="#type-scope">scope()</a>, any()}</pre>
 
 
-Type and scope for select(), qlc() and stepping:
-
-
 <h2><a name="index">Function Index</a></h2>
 
 

+ 11 - 1
doc/gproc_lib.md

@@ -28,7 +28,7 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.
 
 
 
-<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#await-3">await/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_counter_value-3">do_set_counter_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_value-3">do_set_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#ensure_monitor-2">ensure_monitor/2</a></td><td></td></tr><tr><td valign="top"><a href="#insert_many-4">insert_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#insert_reg-4">insert_reg/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_many-4">remove_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-2">remove_reg/2</a></td><td></td></tr><tr><td valign="top"><a href="#update_aggr_counter-3">update_aggr_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-3">update_counter/3</a></td><td></td></tr></table>
+<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#await-3">await/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_counter_value-3">do_set_counter_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_value-3">do_set_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#ensure_monitor-2">ensure_monitor/2</a></td><td></td></tr><tr><td valign="top"><a href="#insert_many-4">insert_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#insert_reg-4">insert_reg/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_many-4">remove_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-2">remove_reg/2</a></td><td></td></tr><tr><td valign="top"><a href="#update_aggr_counter-3">update_aggr_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-3">update_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#valid_opts-2">valid_opts/2</a></td><td></td></tr></table>
 
 
 
@@ -140,3 +140,13 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.
 
 `update_counter(Key, Incr, Pid) -> any()`
 
+<a name="valid_opts-2"></a>
+
+<h3>valid_opts/2</h3>
+
+
+
+
+
+`valid_opts(Type, Default) -> any()`
+

+ 1 - 1
src/gproc.app.src

@@ -5,7 +5,7 @@
 {application, gproc,
  [
   {description, "GPROC"},
-  {vsn, "0.2.7"},
+  {vsn, "0.2.8"},
   {id, "GPROC"},
   {registered, [ ] },
   %% NOTE: do not list applications which are load-only!

+ 29 - 11
src/gproc.erl

@@ -16,12 +16,29 @@
 %% @author Ulf Wiger <ulf.wiger@erlang-consulting.com>
 %%
 %% @doc Extended process registry
-%% <p>This module implements an extended process registry</p>
-%% <p>For a detailed description, see
-%% <a href="erlang07-wiger.pdf">erlang07-wiger.pdf</a>.</p>
+%% This module implements an extended process registry
 %%
-%% Type and scope for registration and lookup:
+%% For a detailed description, see
+%% <a href="erlang07-wiger.pdf">erlang07-wiger.pdf</a>.
 %%
+%% <h2>Tuning Gproc performance</h2>
+%%
+%% Gproc relies on a central server and an ordered-set ets table.
+%% Effort is made to perform as much work as possible in the client without
+%% sacrificing consistency. A few things can be tuned by setting the following
+%% application environment variables in the top application of `gproc'
+%% (usually `gproc'):
+%%
+%% * `{ets_options, list()}' - Currently, the options `{write_concurrency, F}'
+%%   and `{read_concurrency, F}' are allowed. The default is
+%%   `[{write_concurrency, true}, {read_concurrency, true}]'
+%% * `{server_options, list()}' - These will be passed as spawn options when 
+%%   starting the `gproc' and `gproc_dist' servers. Default is `[]'. It is 
+%%   likely that `{priority, high | max}' and/or increasing `min_heap_size'
+%%   will improve performance.
+%%
+%% @end
+
 %% @type type()  = n | p | c | a. n = name; p = property; c = counter;
 %%                                a = aggregate_counter
 %% @type scope() = l | g. l = local registration; g = global registration
@@ -29,8 +46,6 @@
 %% @type reg_id() = {type(), scope(), any()}.
 %% @type unique_id() = {n | a, scope(), any()}.
 %%
-%% Type and scope for select(), qlc() and stepping:
-%%
 %% @type sel_scope() = scope | all | global | local.
 %% @type sel_type() = type() | names | props | counters | aggr_counters.
 %% @type context() = {scope(), type()} | type(). {'all','all'} is the default
@@ -42,8 +57,8 @@
 %% @type pidpat() = pid() | sel_var().
 %% sel_var() = DollarVar | '_'.
 %% @type sel_pattern() = [{headpat(), Guards, Prod}].
-%% @type key()   = {type(), scope(), any()}
-%% @end
+%% @type key()   = {type(), scope(), any()}.
+
 -module(gproc).
 -behaviour(gen_server).
 
@@ -140,7 +155,9 @@
 %% @end
 start_link() ->
     _ = create_tabs(),
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+    SpawnOpts = gproc_lib:valid_opts(server_options, []),
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [],
+			  [{spawn_opt, SpawnOpts}]).
 
 %% spec(Name::any()) -> true
 %%
@@ -1403,14 +1420,15 @@ pid_to_give_away_to({T,l,_} = Key) when T==n; T==a ->
     end.
 
 create_tabs() ->
+    Opts = gproc_lib:valid_opts(ets_options, [{write_concurrency,true},
+					      {read_concurrency, true}]),
     case ets:info(?TAB, name) of
         undefined ->
-            ets:new(?TAB, [ordered_set, public, named_table]);
+            ets:new(?TAB, [ordered_set, public, named_table | Opts]);
         _ ->
             ok
     end.
 
-
 %% @hidden
 init([]) ->
     set_monitors(),

+ 2 - 1
src/gproc_dist.erl

@@ -70,8 +70,9 @@ start_link(all) ->
 start_link(Nodes) when is_list(Nodes) ->
     start_link({Nodes, []});
 start_link({Nodes, Opts}) ->
+    SpawnOpts = gproc_lib:valid_opts(server_options, []),
     gen_leader:start_link(
-      ?SERVER, Nodes, Opts, ?MODULE, [], []).
+      ?SERVER, Nodes, Opts, ?MODULE, [], [{spawn_opt, SpawnOpts}]).
 
 %% ==========================================================
 %% API

+ 46 - 2
src/gproc_lib.erl

@@ -30,7 +30,8 @@
          remove_many/4,
          remove_reg/2,
          update_aggr_counter/3,
-         update_counter/3]).
+         update_counter/3,
+	 valid_opts/2]).
 
 -include("gproc.hrl").
 
@@ -172,7 +173,11 @@ maybe_waiters(K, Pid, Value, T, Info) ->
 notify_waiters(Waiters, K, Pid, V) ->
     _ = [begin
              P ! {gproc, Ref, registered, {K, Pid, V}},
-             ets:delete(?TAB, {P, K})
+             case P of 
+		 Pid -> ignore;
+		 _ ->
+		     ets:delete(?TAB, {P, K})
+	     end
          end || {P, Ref} <- Waiters],
     ok.
 
@@ -261,3 +266,42 @@ scan_existing_counters(Ctxt, Name) ->
     Head = {{{c,Ctxt,Name},'_'},'_','$1'},
     Cs = ets:select(?TAB, [{Head, [], ['$1']}]),
     lists:sum(Cs).
+
+
+valid_opts(Type, Default) ->
+    Opts = get_app_env(Type, Default),
+    check_opts(Type, Opts).
+
+check_opts(Type, Opts) when is_list(Opts) ->
+    Check = check_option_f(Type),
+    lists:map(fun(X) ->
+		      case Check(X) of
+			  true -> X;
+			  false ->
+			      erlang:error({illegal_option, X}, [Type, Opts])
+		      end
+	      end, Opts);
+check_opts(Type, Other) ->
+    erlang:error(invalid_options, [Type, Other]).
+
+check_option_f(ets_options)    -> fun check_ets_option/1;
+check_option_f(server_options) -> fun check_server_option/1.
+
+check_ets_option({read_concurrency , B}) -> is_boolean(B);
+check_ets_option({write_concurrency, B}) -> is_boolean(B);
+check_ets_option(_) -> false.
+
+check_server_option({priority, P}) ->
+    %% Forbid setting priority to 'low' since that would
+    %% surely cause problems. Unsure about 'max'...
+    lists:member(P, [normal, high, max]);
+check_server_option(_) ->
+    %% assume it's a valid spawn option
+    true.
+
+get_app_env(Key, Default) ->
+    case application:get_env(Key) of
+	undefined       -> Default;
+	{ok, undefined} -> Default;
+	{ok, Value}     -> Value
+    end.

+ 40 - 10
test/gproc_dist_tests.erl

@@ -50,6 +50,9 @@ dist_test_() ->
 			       	       ?debugVal(t_await_reg(Ns))
 			       end,
 			       fun() ->
+			       	       ?debugVal(t_await_self(Ns))
+			       end,
+			       fun() ->
 			       	       ?debugVal(t_await_reg_exists(Ns))
 			       end,
 			       fun() ->
@@ -108,6 +111,24 @@ t_await_reg([A,B|_]) ->
     ?assertMatch(ok, t_call(P, die)),
     ?assertMatch(ok, t_call(P1, die)).
 
+t_await_self([A|_]) ->
+    Name = ?T_NAME,
+    P = t_spawn(A, false),  % buffer unknowns
+    Ref = t_call(P, {apply, gproc, nb_wait, [Name]}),
+    ?assertMatch(ok, t_call(P, {selective, true})),
+    ?assertMatch(true, t_call(P, {apply, gproc, reg, [Name, some_value]})),
+    ?assertMatch({registered, {Name, P, some_value}},
+		 t_call(P, {apply_fun, fun() ->
+					       receive
+						   {gproc, Ref, R, Wh} ->
+						       {R, Wh}
+					       after 10000 ->
+						       timeout
+					       end
+				       end})),
+    ?assertMatch(ok, t_call(P, {selective, false})),
+    ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})).
+
 t_await_reg_exists([A,B|_]) ->
     Name = ?T_NAME,
     P = t_spawn(A),
@@ -149,21 +170,20 @@ t_sync(Ns) ->
 %% the other candidate doesn't respond too quickly.
 t_sync_cand_dies([A,B|_]) ->
     Leader = rpc:call(A, gproc_dist, get_leader, []),
-    Other = case Leader of 
+    Other = case Leader of
 		A -> B;
 		B -> A
 	    end,
     ?assertMatch(ok, rpc:call(Other, sys, suspend, [gproc_dist])),
     P = rpc:call(Other, erlang, whereis, [gproc_dist]),
     Key = rpc:async_call(Leader, gproc_dist, sync, []),
-    %% The overall timeout for gproc_dist:sync() is 5 seconds. Here, we should 
+    %% The overall timeout for gproc_dist:sync() is 5 seconds. Here, we should
     %% still be waiting.
     ?assertMatch(timeout, rpc:nb_yield(Key, 1000)),
     exit(P, kill),
     %% The leader should detect that the other candidate died and respond
     %% immediately. Therefore, we should have our answer well within 1 sec.
     ?assertMatch({value, true}, rpc:nb_yield(Key, 1000)).
-		    
 
 t_fail_node([A,B|_] = Ns) ->
     Na = ?T_NAME,
@@ -178,8 +198,7 @@ t_fail_node([A,B|_] = Ns) ->
     ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
     ?assertMatch(ok, t_call(Pa, die)),
     ?assertMatch(ok, t_call(Pb, die)).
-    
-    
+
 t_sleep() ->
     timer:sleep(500).
 
@@ -198,13 +217,15 @@ t_lookup_everywhere(Key, Nodes, Exp, I) ->
        true ->
 	    ok
     end.
-				  
 
 t_spawn(Node) ->
+    t_spawn(Node, false).
+
+t_spawn(Node, Selective) when is_boolean(Selective) ->
     Me = self(),
     P = spawn(Node, fun() ->
 			    Me ! {self(), ok},
-			    t_loop()
+			    t_loop(Selective)
 		    end),
     receive
 	{P, ok} -> P
@@ -244,13 +265,22 @@ t_call(P, Req) ->
     end.
 
 t_loop() ->
+    t_loop(false).
+
+t_loop(Selective) when is_boolean(Selective) ->
     receive
 	{From, Ref, die} ->
 	    From ! {self(), Ref, ok};
+	{From, Ref, {selective, Bool}} when is_boolean(Bool) ->
+	    From ! {self(), Ref, ok},
+	    t_loop(Bool);
 	{From, Ref, {apply, M, F, A}} ->
 	    From ! {self(), Ref, apply(M, F, A)},
-	    t_loop();
-	Other ->
+	    t_loop(Selective);
+	{From, Ref, {apply_fun, F}} ->
+	    From ! {self(), Ref, F()},
+	    t_loop(Selective);
+	Other when not Selective ->
 	    ?debugFmt("got unknown msg: ~p~n", [Other]),
 	    exit({unknown_msg, Other})
     end.
@@ -259,7 +289,7 @@ start_slaves(Ns) ->
     [H|T] = Nodes = [start_slave(N) || N <- Ns],
     _ = [rpc:call(H, net, ping, [N]) || N <- T],
     Nodes.
-	       
+
 start_slave(Name) ->
     case node() of
         nonode@nohost ->

+ 56 - 6
test/gproc_tests.erl

@@ -22,6 +22,42 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 
+conf_test_() ->
+    {foreach,
+     fun() ->
+	     application:unload(gproc)
+     end,
+     fun(_) ->
+	     application:stop(gproc)
+     end,
+     [?_test(t_server_opts()),
+      ?_test(t_ets_opts())]}.
+
+t_server_opts() ->
+    H = 10000,
+    application:set_env(gproc, server_options, [{min_heap_size, H}]),
+    ?assert(ok == application:start(gproc)),
+    {min_heap_size, H1} = process_info(whereis(gproc), min_heap_size),
+    ?assert(is_integer(H1) andalso H1 > H).
+
+t_ets_opts() ->
+    %% Cannot inspect the write_concurrency attribute on an ets table in
+    %% any easy way, so trace on the ets:new/2 call and check the arguments.
+    application:set_env(gproc, ets_options, [{write_concurrency, false}]),
+    erlang:trace_pattern({ets,new, 2}, [{[gproc,'_'],[],[]}], [global]),
+    erlang:trace(new, true, [call]),
+    ?assert(ok == application:start(gproc)),
+    erlang:trace(new, false, [call]),
+    receive
+	{trace,_,call,{ets,new,[gproc,Opts]}} ->
+	    ?assertMatch({write_concurrency, false},
+			 lists:keyfind(write_concurrency,1,Opts))
+    after 3000 ->
+	    error(timeout)
+    end.
+
+
+
 reg_test_() ->
     {setup,
      fun() ->
@@ -39,6 +75,8 @@ reg_test_() ->
       , ?_test(t_is_clean())
       , {spawn, ?_test(t_await())}
       , ?_test(t_is_clean())
+      , {spawn, ?_test(t_await_self())}
+      , ?_test(t_is_clean())
       , {spawn, ?_test(t_simple_mreg())}
       , ?_test(t_is_clean())
       , {spawn, ?_test(t_gproc_crash())}
@@ -75,8 +113,6 @@ t_simple_reg() ->
     ?assert(gproc:unreg({n,l,name}) =:= true),
     ?assert(gproc:where({n,l,name}) =:= undefined).
 
-
-                       
 t_simple_prop() ->
     ?assert(gproc:reg({p,l,prop}) =:= true),
     ?assert(t_other_proc(fun() ->
@@ -96,7 +132,10 @@ t_other_proc(F) ->
 t_await() ->
     Me = self(),
     {_Pid,Ref} = spawn_monitor(
-                   fun() -> exit(?assert(gproc:await({n,l,t_await}) =:= {Me,val})) end),
+                   fun() ->
+			   exit(?assert(
+				   gproc:await({n,l,t_await}) =:= {Me,val}))
+		   end),
     ?assert(gproc:reg({n,l,t_await},val) =:= true),
     receive
         {'DOWN', Ref, _, _, R} ->
@@ -105,11 +144,23 @@ t_await() ->
             erlang:error(timeout)
     end.
 
+t_await_self() ->
+    Me = self(),
+    Ref = gproc:nb_wait({n, l, t_await_self}),
+    ?assert(gproc:reg({n, l, t_await_self}, some_value) =:= true),
+    ?assertEqual(true, receive
+			   {gproc, Ref, R, Wh} ->
+			       {registered, {{n, l, t_await_self},
+					     Me, some_value}} = {R, Wh},
+			       true
+		       after 10000 ->
+			       timeout
+		       end).
+
 t_is_clean() ->
     sys:get_status(gproc), % in order to synch
     T = ets:tab2list(gproc),
     ?assert(T =:= []).
-                                        
 
 t_simple_mreg() ->
     P = self(),
@@ -356,7 +407,6 @@ t_loop() ->
 	{From, die} ->
 	    From ! {self(), ok}
     end.
-    
 
 t_call(P, Msg) ->
     P ! {self(), Msg},
@@ -371,7 +421,7 @@ spawn_helper() ->
 		      ?assert(gproc:reg({n,l,self()}) =:= true),
 		      Ref = erlang:monitor(process, Parent),
 		      Parent ! {ok,self()},
-		      receive 
+		      receive
 			  {'DOWN', Ref, _, _, _} ->
 			      ok
 		      end