Browse Source

added experimental branch

git-svn-id: http://svn.ulf.wiger.net/gproc/branches/experimental-0906/gproc@14 f3948e33-8234-0410-8a80-a07eae3b6c4d
uwiger 16 years ago
parent
commit
3ea07ef79e
1 changed files with 49 additions and 398 deletions
  1. 49 398
      src/gproc.erl

+ 49 - 398
src/gproc.erl

@@ -20,9 +20,9 @@
 %% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
 %% @end
 -module(gproc).
--behaviour(gen_leader).
+-behaviour(gen_server).
 
--export([start_link/0, start_link/1,
+-export([start_link/0,
 	 reg/2, unreg/1,
 	 mreg/3,
 	 set_value/2,
@@ -37,53 +37,32 @@
 	 last/1,
 	 table/1, table/2]).
 
--export([start_local/0, go_global/0, go_global/1]).
-
 %%% internal exports
 -export([init/1,
 	 handle_cast/2,
 	 handle_call/3,
 	 handle_info/2,
-	 handle_leader_call/4,
-	 handle_leader_cast/3,
-	 handle_DOWN/3,
-	 elected/2,
-	 surrendered/3,
-	 from_leader/3,
-	 code_change/4,
+	 code_change/3,
 	 terminate/2]).
 
--define(TAB, ?MODULE).
+-include("gproc.hrl").
+
 -define(SERVER, ?MODULE).
 
--record(state, {mode, is_leader}).
+-define(CHK_DIST, 
+	case whereis(gproc_dist) of
+	    undefined ->
+		erlang:error(local_only);
+	    _ ->
+		ok
+	end).
 
-start_local() ->
-    create_tabs(),
-    gen_leader:start(?SERVER, ?MODULE, [], []).
-
-go_global() ->
-    erlang:display({"calling go_global (Ns = ~p)~n", [node()|nodes()]}),
-    go_global([node()|nodes()]).
-
-go_global(Nodes) when is_list(Nodes) ->
-    erlang:display({"calling go_global(~p)~n", [node()|nodes()]}),
-    case whereis(?SERVER) of
-	undefined ->
-	    start_link(Nodes);
-	Pid ->
-	    link(Pid),
-	    ok = call({go_global, Nodes}),
-	    {ok, Pid}
-    end.
+-record(state, {}).
 
 start_link() ->
-    start_link([node()|nodes()]).
-
-start_link(Nodes) ->
     create_tabs(),
-    gen_leader:start_link(
-      ?SERVER, Nodes, [],?MODULE, [], [{debug,[trace]}]).
+    gen_server:start({local, ?SERVER}, ?MODULE, [], []).
+
 
 %%% @spec({Class,Scope, Key}, Value) -> true
 %%% @doc
@@ -95,7 +74,8 @@ start_link(Nodes) ->
 %%%
 reg({_,g,_} = Key, Value) ->
     %% anything global
-    leader_call({reg, Key, Value, self()});
+    ?CHK_DIST,
+    gproc_dist:reg(Key, Value);
 reg({T,l,_} = Key, Value) when T==n; T==a ->
     %% local names and aggregated counters
     call({reg, Key, Value});
@@ -113,9 +93,8 @@ reg(_, _) ->
     erlang:error(badarg).
 
 mreg(T, g, KVL) ->
-    if is_list(KVL) -> leader_call({mreg, T, g, KVL, self()});
-       true -> erlang:error(badarg)
-    end;
+    ?CHK_DIST,
+    gproc_dist:mreg(T, KVL);
 mreg(T, l, KVL) when T==a; T==n ->
     if is_list(KVL) -> call({mreg, T, l, KVL});
        true -> erlang:error(badarg)
@@ -127,13 +106,15 @@ mreg(_, _, _) ->
 
 unreg(Key) ->
     case Key of
-	{_, g, _} -> leader_call({unreg, Key, self()});
+	{_, g, _} ->
+	    ?CHK_DIST,
+	    gproc_dist:unreg(Key);
 	{T, l, _} when T == n;
 		       T == a -> call({unreg, Key});
 	{_, l, _} ->
 	    case ets:member(?TAB, {Key,self()}) of
 		true ->
-		    remove_reg(Key, self());
+		    gproc_lib:remove_reg(Key, self());
 		false ->
 		    erlang:error(badarg)
 	    end
@@ -153,98 +134,25 @@ select(Scope, Pat, NObjs) ->
 %%% no other process can interfere.
 %%%
 local_reg(Key, Value) ->
-    case insert_reg(Key, Value, self(), l) of
+    case gproc_lib:insert_reg(Key, Value, self(), l) of
 	false -> erlang:error(badarg);
 	true  -> monitor_me()
     end.
 
 local_mreg(_, []) -> true;
 local_mreg(T, [_|_] = KVL) ->
-    case insert_many(T, l, KVL, self()) of
+    case gproc_lib:insert_many(T, l, KVL, self()) of
 	false     -> erlang:error(badarg);
 	{true,_}  -> monitor_me()
     end.
 
 
-remove_reg(Key, Pid) ->
-    remove_reg_1(Key, Pid),
-    ets:delete(?TAB, {Pid,Key}).
 
-remove_reg_1({c,_,_} = Key, Pid) ->
-    remove_counter_1(Key, ets:lookup_element(?TAB, {Key,Pid}, 3), Pid);
-remove_reg_1({T,_,_} = Key, _Pid) when T==a; T==n ->
-    ets:delete(?TAB, {Key,T});
-remove_reg_1({_,_,_} = Key, Pid) ->
-    ets:delete(?TAB, {Key, Pid}).
-    
-remove_counter_1({c,C,N} = Key, Val, Pid) ->
-    update_aggr_counter(C, N, -Val),
-    ets:delete(?TAB, {Key, Pid}).
-
-
-insert_reg({T,_,Name} = K, Value, Pid, C) when T==a; T==n ->
-    %%% We want to store names and aggregated counters with the same
-    %%% structure as properties, but at the same time, we must ensure
-    %%% that the key is unique. We replace the Pid in the key part with
-    %%% an atom. To know which Pid owns the object, we lug the Pid around
-    %%% as payload as well. This is a bit redundant, but symmetric.
-    %%%
-    case ets:insert_new(?TAB, [{{K, T}, Pid, Value}, {{Pid,K}}]) of
-	true ->
-	    if T == a ->
-		    Initial = scan_existing_counters(C, Name),
-		    ets:insert(?TAB, {{K,a}, Pid, Initial});
-	       T == c ->
-		    update_aggr_counter(l, Name, Value);
-	       true ->
-		    true
-	    end,
-	    true;
-	false ->
-	    false
-    end;
-insert_reg(Key, Value, Pid, _C) ->
-    %% Non-unique keys; store Pid in the key part
-    K = {Key, Pid},
-    Kr = {Pid, Key},
-    ets:insert_new(?TAB, [{K, Pid, Value}, {Kr}]).
-
-insert_many(T, C, KVL, Pid) ->
-    Objs = mk_reg_objs(T, C, Pid, KVL),
-    case ets:insert_new(?TAB, Objs) of
-	true ->
-	    RevObjs = mk_reg_rev_objs(T, C, Pid, KVL),
-	    ets:insert(?TAB, RevObjs),
-	    {true, Objs};
-	false ->
-	    false
-    end.
 
-mk_reg_objs(T, C, _, L) when T == n; T == a ->
-    lists:map(fun({K,V}) ->
-		      {{{T,C,K},T}, V};
-		 (_) ->
-		      erlang:error(badarg)
-	      end, L);
-mk_reg_objs(p = T, C, Pid, L) ->
-    lists:map(fun({K,V}) ->
-		      {{{T,C,K},Pid}, V};
-		 (_) ->
-		      erlang:error(badarg)
-	      end, L).
-
-mk_reg_rev_objs(T, C, Pid, L) ->
-    [{Pid,{T,C,K}} || {K,_} <- L].
-			  
-
-set_value({T,g,_} = Key, Value) when T==a; T==c ->
-    if is_integer(Value) ->
-	    leader_call({set, Key, Value});
-       true ->
-	    erlang:error(badarg)
-    end;
+
 set_value({_,g,_} = Key, Value) ->
-    leader_call({set, Key, Value, self()});
+    ?CHK_DIST,
+    gproc_dist:set_value(Key, Value);
 set_value({a,l,_} = Key, Value) when is_integer(Value) ->
     call({set, Key, Value});
 set_value({n,l,_} = Key, Value) ->
@@ -254,34 +162,17 @@ set_value({n,l,_} = Key, Value) ->
 set_value({p,l,_} = Key, Value) ->
     %% we _can_ to this locally, since there is no race condition - no
     %% other process can update our properties.
-    case do_set_value(Key, Value, self()) of
+    case gproc_lib:do_set_value(Key, Value, self()) of
 	true -> true;
 	false ->
 	    erlang:error(badarg)
     end;
 set_value({c,l,_} = Key, Value) when is_integer(Value) ->
-    do_set_counter_value(Key, Value, self());
+    gproc_lib:do_set_counter_value(Key, Value, self());
 set_value(_, _) ->
     erlang:error(badarg).
 
 
-do_set_value({T,_,_} = Key, Value, Pid) ->
-    K2 = if T==n -> T;
-	    true -> Pid
-	 end,
-    case ets:member(?TAB, {Key, K2}) of
-	true ->
-	    ets:insert(?TAB, {{Key, K2}, Pid, Value});
-	false ->
-	    false
-    end.
-
-do_set_counter_value({_,C,N} = Key, Value, Pid) ->
-    OldVal = ets:lookup_element(?TAB, {Key, Pid}, 3), % may fail with badarg
-    update_aggr_counter(C, N, Value - OldVal),
-    ets:insert(?TAB, {{Key, Pid}, Pid, Value}).
-
-
 
 
 %%% @spec (Key) -> Value
@@ -303,18 +194,15 @@ get_value(_, _) ->
     erlang:error(badarg).
 
 
-update_counter({c,l,Ctr} = Key, Incr) when is_integer(Incr) ->
-    update_aggr_counter(l, Ctr, Incr),
-    ets:update_counter(?TAB, Key, {3,Incr});
+update_counter({c,l,_} = Key, Incr) when is_integer(Incr) ->
+    gproc_lib:update_counter(Key, Incr);
 update_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
-    leader_call({update_counter, Key, Incr, self()});
+    ?CHK_DIST,
+    gproc_dist:update_counter(Key, Incr);
 update_counter(_, _) ->
     erlang:error(badarg).
 
 
-update_aggr_counter(C, N, Val) ->
-    catch ets:update_counter(?TAB, {{a,C,N},a}, {3, Val}).
-
 
 
 send({T,C,_} = Key, Msg) when C==l; C==g ->
@@ -417,16 +305,8 @@ handle_cast({monitor_me, Pid}, S) ->
     erlang:monitor(process, Pid),
     {ok, S}.
 
-handle_call({go_global, Nodes}, _, S) ->
-    erlang:display({"got go_global (~p)~n", [Nodes]}),
-    case S#state.mode of
-	local ->
-	    {activate, Nodes, [], ok, S#state{mode = global}};
-	global ->
-	    {reply, badarg, S}
-    end;
 handle_call({reg, {_,l,_} = Key, Val}, {Pid,_}, S) ->
-    case insert_reg(Key, Val, Pid, l) of
+    case gproc_lib:insert_reg(Key, Val, Pid, l) of
 	false ->
 	    {reply, badarg, S};
 	true ->
@@ -436,20 +316,20 @@ handle_call({reg, {_,l,_} = Key, Val}, {Pid,_}, S) ->
 handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
     case ets:member(?TAB, {Pid,Key}) of
 	true ->
-	    remove_reg(Key, Pid),
+	    gproc_lib:remove_reg(Key, Pid),
 	    {reply, true, S};
 	false ->
 	    {reply, badarg, S}
     end;
 handle_call({mreg, T, l, L}, {Pid,_}, S) ->
-    try	insert_many(T, l, L, Pid) of
+    try	gproc_lib:insert_many(T, l, L, Pid) of
 	{true,_} -> {reply, true, S};
 	false    -> {reply, badarg, S}
     catch
 	error:_  -> {reply, badarg, S}
     end;
 handle_call({set, {_,l,_} = Key, Value}, {Pid,_}, S) ->
-    case do_set_value(Key, Value, Pid) of
+    case gproc_lib:do_set_value(Key, Value, Pid) of
 	true ->
 	    {reply, true, S};
 	false ->
@@ -459,162 +339,18 @@ handle_call(_, _, S) ->
     {reply, badarg, S}.
 
 handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
-    Keys = ets:select(?TAB, [{{{Pid,'$1'}}, [], ['$1']}]),
-    case lists:keymember(g, 2, Keys) of
-	true ->
-	    leader_cast({pid_is_DOWN, Pid});
-	false ->
-	    ok
-    end,
-    ets:select_delete(?TAB, [{{{Pid,'_'}}, [], [true]}]),
+    Keys = ets:select(?TAB, [{{{Pid,'$1'}},
+			      [{'==',{element,2,'$1'},l}], ['$1']}]),
+    ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}}}, [], [true]}]),
     ets:delete(?TAB, Pid),
-    lists:foreach(fun(Key) -> remove_reg_1(Key, Pid) end, Keys),
+    lists:foreach(fun(Key) -> gproc_lib:remove_reg_1(Key, Pid) end, Keys),
     {ok, S};
 handle_info(_, S) ->
     {ok, S}.
 
 
-elected(S, _E) ->
-    Globs = ets:select(?TAB, [{{{{'_',g,'_'},'_'},'_','_'},[],['$_']}]),
-    {ok, {globals, Globs}, S#state{is_leader = true}}.
-
-surrendered(S, {globals, Globs}, _E) ->
-    %% globals from this node should be more correct in our table than
-    %% in the leader's
-    surrendered_1(Globs),
-    {ok, S#state{is_leader = false}}.
-
 
-handle_DOWN(Node, S, _E) ->
-    Head = {{{'_',g,'_'},'_'},'$1','_'},
-    Gs = [{'==', {node,'$1'},Node}],
-    Globs = ets:select(?TAB, [{Head, Gs, [{element,1,'$_'}]}]),
-    ets:select_delete(?TAB, [{Head, Gs, [true]}]),
-    {ok, [{delete, Globs}], S}.
-
-handle_leader_call(_, _, #state{mode = local} = S, _) ->
-    {reply, badarg, S};
-handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
-    case insert_reg(K, Value, Pid, g) of
-	false ->
-	    {reply, badarg, S};
-	true ->
-	    ensure_monitor(Pid),
-	    Vals =
-		if C == a ->
-			ets:lookup(?TAB, {K,a});
-		   C == c ->
-			case ets:lookup(?TAB, {{a,g,Name},a}) of
-			    [] ->
-				ets:lookup(?TAB, {K,Pid});
-			    [AC] ->
-				[AC | ets:lookup(?TAB, {K,Pid})]
-			end;
-		   C == n ->
-			[{{K,n},Pid,Value}];
-		   true ->
-			[{{K,Pid},Pid,Value}]
-		end,
-	    {reply, true, [{insert, Vals}], S}
-    end;
-handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
-    Key = if T == n; T == a -> {K,T};
-	     true -> {K, Pid}
-	  end,
-    case ets:member(?TAB, Key) of
-	true ->
-	    remove_reg(K, Pid),
-	    if T == c ->
-		    case ets:lookup(?TAB, {{a,g,Name},a}) of
-			[Aggr] ->
-			    %% updated by remove_reg/2
-			    {reply, true, [{delete,[{Key,Pid}]},
-					   {insert, [Aggr]}], S};
-			[] ->
-			    {reply, true, [{delete, [{Key, Pid}]}], S}
-		    end;
-	       true ->
-		    {reply, true, [{delete, [{Key,Pid}]}], S}
-	    end;
-	false ->
-	    {reply, badarg, S}
-    end;
-handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
-    if T==p; T==n ->
-	    try insert_many(T, g, Pid, L) of
-		{true,Objs} -> {reply, true, [{insert,Objs}], S};
-		false       -> {reply, badarg, S}
-	    catch
-		error:_     -> {reply, badarg, S}
-	    end;
-       true -> {reply, badarg, S}
-    end;
-handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
-    if T == a ->
-	    if is_integer(V) ->
-		    case do_set_value(K, V, Pid) of
-			true  -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
-			false -> {reply, badarg, S}
-		    end
-	    end;
-       T == c ->
-	    try do_set_counter_value(K, V, Pid),
-		AKey = {{a,g,N},a},
-		Aggr = ets:lookup(?TAB, AKey),  % may be []
-		{reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
-	    catch
-		error:_ ->
-		    {reply, badarg, S}
-	    end;
-       true ->
-	    case do_set_value(K, V, Pid) of
-		true ->
-		    Obj = if T==n -> {{K, T}, Pid, V};
-			     true -> {{K, Pid}, Pid, V}
-			  end,
-		    {reply, true, [{insert,[Obj]}], S};
-		false ->
-		    {reply, badarg, S}
-	    end
-    end;
-handle_leader_call(_, _, S, _E) ->
-    {reply, badarg, S}.
-
-handle_leader_cast(_, #state{mode = local} = S, _E) ->
-    {ok, S};
-handle_leader_cast({add_globals, Missing}, S, _E) ->
-    %% This is an audit message: a peer (non-leader) had info about granted
-    %% global resources that we didn't know of when we became leader.
-    %% This could happen due to a race condition when the old leader died.
-    ets:insert(?TAB, Missing),
-    {ok, [{insert, Missing}], S};
-handle_leader_cast({remove_globals, Globals}, S, _E) ->
-    delete_globals(Globals),
-    {ok, S};
-handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
-    Keys = ets:select(?TAB, [{{{Pid,'$1'}},[],['$1']}]),
-    Globals = if node(Pid) =/= node() ->
-		      Keys;
-		 true ->
-		      [K || K <- Keys, element(2,K) == g]
-	      end,
-    ets:select_delete(?TAB, [{{{Pid,'_'}},[],[true]}]),
-    ets:delete(?TAB, Pid),
-    Modified = 
-	lists:foldl(
-	  fun({T,_,_}=K,A) when T==a;T==n -> ets:delete(?TAB, {K,T}), A;
-	     ({c,_,_}=K,A) -> cleanup_counter(K, Pid, A);
-	     (K,A) -> ets:delete(?TAB, {K,Pid}), A
-	  end, [], Keys),
-    case [{Op,Objs} || {Op,Objs} <- [{insert,Modified},
-				     {remove,Globals}], Objs =/= []] of
-	[] ->
-	    {ok, S};
-	Broadcast ->
-	    {ok, Broadcast, S}
-    end.
-
-code_change(_FromVsn, S, _Extra, _E) ->
+code_change(_FromVsn, S, _Extra) ->
     {ok, S}.
 
 terminate(_Reason, _S) ->
@@ -623,105 +359,24 @@ terminate(_Reason, _S) ->
 
 
 
-cleanup_counter({c,g,N}=K, Pid, Acc) ->
-    remove_reg(K,Pid),
-    case ets:lookup(?TAB, {{a,g,N},a}) of
-	[Aggr] ->
-	    [Aggr|Acc];
-	[] ->
-	    Acc
-    end;
-cleanup_counter(K, Pid, Acc) ->
-    remove_reg(K,Pid),
-    Acc.
-
-from_leader(Ops, S, _E) ->
-    lists:foreach(
-      fun({delete, Globals}) ->
-	      delete_globals(Globals);
-	 ({insert, Globals}) ->
-	      ets:insert(?TAB, Globals),
-	      lists:foreach(
-		fun({{{_,g,_}=Key,_}, P, _}) ->
-			ets:insert(?TAB, {{P,Key}}),
-			ensure_monitor(P)
-		end, Globals)
-      end, Ops),
-    {ok, S}.
-
-delete_globals(Globals) ->
-    lists:foreach(
-      fun({{Key,_}=K, Pid}) ->
-	      ets:delete(?TAB, K),
-	      ets:delete(?TAB, {{Pid, Key}})
-      end, Globals).
-    
-
-
 call(Req) ->
-    case gen_leader:call(?MODULE, Req) of
+    case gen_server:call(?MODULE, Req) of
 	badarg -> erlang:error(badarg, Req);
 	Reply  -> Reply
     end.
 
 cast(Msg) ->
-    gen_leader:cast(?MODULE, Msg).
-
-leader_call(Req) ->
-    case gen_leader:leader_call(?MODULE, Req) of
-	badarg -> erlang:error(badarg, Req);
-	Reply  -> Reply
-    end.
+    gen_server:cast(?MODULE, Msg).
 
-leader_cast(Msg) ->
-    gen_leader:leader_cast(?MODULE, Msg).
-	     
 
 
 create_tabs() ->
     ets:new(?MODULE, [ordered_set, public, named_table]).
 
-init({local_only,[]}) ->
-    {ok, #state{mode = local}};
 init([]) ->
-    {ok, #state{mode = global}}.
-
-
-surrendered_1(Globs) ->
-    My_local_globs =
-	ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '_'},
-			   [{'==', {node,'$1'}, node()}],
-			   ['$_']}]),
-    %% remove all remote globals - we don't have monitors on them.
-    ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'}, '$1', '_'},
-			      [{'=/=', {node,'$1'}, node()}],
-			      [true]}]),
-    %% insert new non-local globals, collect the leader's version of
-    %% what my globals are
-    Ldr_local_globs =
-	lists:foldl(
-	  fun({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
-		  ets:insert(?TAB, [{K, Pid, V}, {{Pid,Key}}]),
-		  Acc;
-	     ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
-		  [Obj|Acc]
-	  end, [], Globs),
-    case [{K,P,V} || {K,P,V} <- My_local_globs,
-		   not(lists:keymember(K, 1, Ldr_local_globs))] of
-	[] ->
-	    %% phew! We have the same picture
-	    ok;
-	[_|_] = Missing ->
-	    %% This is very unlikely, I think
-	    leader_cast({add_globals, Missing})
-    end,
-    case [{K,P} || {K,P,_} <- Ldr_local_globs,
-		   not(lists:keymember(K, 1, My_local_globs))] of
-	[] ->
-	    ok;
-	[_|_] = Remove ->
-	    leader_cast({remove_globals, Remove})
-    end.
+    {ok, #state{}}.
+
+
 
 
 ensure_monitor(Pid) when node(Pid) == node() ->
@@ -741,11 +396,6 @@ monitor_me() ->
     end.
 
 
-scan_existing_counters(Ctxt, Name) ->
-    Head = {{{c,Ctxt,Name},'_'},'_','$1'},
-    Cs = ets:select(?TAB, [{Head, [], ['$1']}]),
-    lists:sum(Cs).
-
 
 
 pattern([{'_', Gs, As}], T) ->
@@ -984,3 +634,4 @@ is_unique({_,n}) -> true;
 is_unique({_,a}) -> true;
 is_unique(_) -> false.
      
+