Browse Source

First reasonably well working Distributed Gproc

This version requires the following gen_leader version

http://github.com/uwiger/gen_leader_revival/tree/master/hanssv+serge_version/

(At commit time, other gen_leader versions handle DOWN messages
incorrectly, causing gproc to malfunction.)
Ulf Wiger 15 years ago
parent
commit
0fae114946
4 changed files with 207 additions and 91 deletions
  1. 1 1
      Makefile
  2. 72 34
      src/gproc.erl
  3. 95 37
      src/gproc_dist.erl
  4. 39 19
      src/gproc_lib.erl

+ 1 - 1
Makefile

@@ -1,6 +1,6 @@
 ## The MIT License
 ##
-## Copyright (c) 2008-2010 Ulf Wiger <ulf.wiger@erlang-solutions.com>,
+## Copyright (c) 2008-2010 Ulf Wiger <ulf@wiger.net>,
 ##
 ## Permission is hereby granted, free of charge, to any person obtaining a
 ## copy of this software and associated documentation files (the "Software"),

+ 72 - 34
src/gproc.erl

@@ -281,6 +281,9 @@ default(_) -> undefined.
 await(Key) ->
     await(Key, infinity).
 
+await({n,g,_} = Key, Timeout) ->
+    ?CHK_DIST,
+    request_wait(Key, Timeout);
 await({n,l,_} = Key, Timeout) ->
     case ets:lookup(?TAB, {Key, n}) of
         [{_, Pid, Value}] ->
@@ -291,7 +294,7 @@ await({n,l,_} = Key, Timeout) ->
 await(K, T) ->
     erlang:error(badarg, [K, T]).
 
-request_wait({n,l,_} = Key, Timeout) ->
+request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
     TRef = case Timeout of
                infinity -> no_timer;
                T when is_integer(T), T > 0 ->
@@ -299,7 +302,7 @@ request_wait({n,l,_} = Key, Timeout) ->
                _ ->
                    erlang:error(badarg, [Key, Timeout])
            end,
-    WRef = call({await,Key}),
+    WRef = call({await,Key,self()}, C),
     receive
         {gproc, WRef, registered, {_K, Pid, V}} ->
             {Pid, V};
@@ -316,13 +319,20 @@ request_wait({n,l,_} = Key, Timeout) ->
 %% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
 %% @end
 %%
+nb_wait({n,g,_} = Key) ->
+    ?CHK_DIST,
+    call({await, Key, self()}, g);
 nb_wait({n,l,_} = Key) ->
-    call({await, Key});
+    call({await, Key, self()}, l);
 nb_wait(Key) ->
     erlang:error(badarg, [Key]).
 
-cancel_wait(Key, Ref) ->
-    cast({cancel_wait, self(), Key, Ref}),
+cancel_wait({_,g,_} = Key, Ref) ->
+    ?CHK_DIST,
+    cast({cancel_wait, self(), Key, Ref}, g),
+    ok;
+cancel_wait({_,l,_} = Key, Ref) ->
+    cast({cancel_wait, self(), Key, Ref}, l),
     ok.
             
 
@@ -754,7 +764,7 @@ handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
 handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
     case try_insert_reg(Key, Val, Pid) of
         true ->
-            ensure_monitor(Pid),
+            gproc_lib:ensure_monitor(Pid,l),
             {reply, true, S};
         false ->
             {reply, badarg, S}
@@ -767,24 +777,33 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
         false ->
             {reply, badarg, S}
     end;
-handle_call({await, {T,l,_} = Key}, {Pid,Ref} = From, S) ->
-    Rev = {{Pid,Key}, r},
-    case ets:lookup(?TAB, {Key,T}) of
-        [{_, P, Value}] ->
-            %% for symmetry, we always reply with Ref and then send a message
-            gen_server:reply(From, Ref),
-            Pid ! {gproc, Ref, registered, {Key, P, Value}},
+handle_call({await, {_,l,_} = Key, Pid}, {_, Ref}, S) ->
+    %% Passing the pid explicitly is needed when leader_call is used,
+    %% since the Pid given as From in the leader is the local gen_leader
+    %% instance on the calling node.
+    case gproc_lib:await(Key, {Pid, Ref}) of
+        noreply ->
             {noreply, S};
-        [{K, Waiters}] ->
-            NewWaiters = [{Pid,Ref} | Waiters],
-            ets:insert(?TAB, [{K, NewWaiters}, Rev]),
-            gproc_lib:ensure_monitor(Pid),
-            {reply, Ref, S};
-        [] ->
-            ets:insert(?TAB, [{{Key,T}, [{Pid,Ref}]}, Rev]),
-            gproc_lib:ensure_monitor(Pid),
-            {reply, Ref, S}
+        {reply, Reply, _} ->
+            {reply, Reply, S}
     end;
+%%     Rev = {{Pid,Key}, r},
+%%     case ets:lookup(?TAB, {Key,T}) of
+%%         [{_, P, Value}] ->
+%%             %% for symmetry, we always reply with Ref and then send a message
+%%             gen_server:reply(From, Ref),
+%%             Pid ! {gproc, Ref, registered, {Key, P, Value}},
+%%             {noreply, S};
+%%         [{K, Waiters}] ->
+%%             NewWaiters = [{Pid,Ref} | Waiters],
+%%             ets:insert(?TAB, [{K, NewWaiters}, Rev]),
+%%             gproc_lib:ensure_monitor(Pid,l),
+%%             {reply, Ref, S};
+%%         [] ->
+%%             ets:insert(?TAB, [{{Key,T}, [{Pid,Ref}]}, Rev]),
+%%             gproc_lib:ensure_monitor(Pid,l),
+%%             {reply, Ref, S}
+%%     end;
 handle_call({mreg, T, l, L}, {Pid,_}, S) ->
     try gproc_lib:insert_many(T, l, L, Pid) of
         {true,_} -> {reply, true, S};
@@ -812,6 +831,14 @@ handle_info(_, S) ->
 
 %% @hidden
 code_change(_FromVsn, S, _Extra) ->
+    %% We have changed local monitor markers from {Pid} to {Pid,l}.
+    case ets:select(?TAB, [{{'$1'},[],['$1']}]) of
+        [] ->
+            ok;
+        Pids ->
+            ets:insert(?TAB, [{P,l} || P <- Pids]),
+            ets:select_delete(?TAB, [{{'_'},[],[true]}])
+    end,
     {ok, S}.
 
 %% @hidden
@@ -819,17 +846,28 @@ terminate(_Reason, _S) ->
     ok.
 
 
+call(Req) ->
+    call(Req, l).
 
+call(Req, l) ->
+    chk_reply(gen_server:call(?MODULE, Req), Req);
+call(Req, g) ->
+    chk_reply(gproc_dist:leader_call(Req), Req).
 
-call(Req) ->
-    case gen_server:call(?MODULE, Req) of
+chk_reply(Reply, Req) ->
+    case Reply of
         badarg -> erlang:error(badarg, Req);
         Reply  -> Reply
     end.
 
 
 cast(Msg) ->
-    gen_server:cast(?MODULE, Msg).
+    cast(Msg, l).
+
+cast(Msg, l) ->
+    gen_server:cast(?MODULE, Msg);
+cast(Msg, g) ->
+    gproc_dist:leader_cast(Msg).
 
 
 
@@ -860,7 +898,7 @@ process_is_down(Pid) ->
     Keys = ets:select(?TAB, [{{{Pid,'$1'},'$2'},
                               [{'==',{element,2,'$1'},l}], [{{'$1','$2'}}]}]),
     ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
-    ets:delete(?TAB, Pid),
+    ets:delete(?TAB, {Pid,l}),
     lists:foreach(fun({Key,r}) ->
                           gproc_lib:remove_reg_1(Key, Pid);
                      ({Key,w}) ->
@@ -879,16 +917,16 @@ init([]) ->
 
 
 
-ensure_monitor(Pid) when node(Pid) == node() ->
-    case ets:insert_new(?TAB, {Pid}) of
-        false -> ok;
-        true  -> erlang:monitor(process, Pid)
-    end;
-ensure_monitor(_) ->
-    true.
+%% ensure_monitor(Pid) when node(Pid) == node() ->
+%%     case ets:insert_new(?TAB, {Pid}) of
+%%         false -> ok;
+%%         true  -> erlang:monitor(process, Pid)
+%%     end;
+%% ensure_monitor(_) ->
+%%     true.
 
 monitor_me() ->
-    case ets:insert_new(?TAB, {self()}) of
+    case ets:insert_new(?TAB, {self(),l}) of
         false -> true;
         true  ->
             cast({monitor_me,self()}),

+ 95 - 37
src/gproc_dist.erl

@@ -23,11 +23,13 @@
 -behaviour(gen_leader).
 
 -export([start_link/0, start_link/1,
-	 reg/2, unreg/1,
+	 reg/1, reg/2, unreg/1,
 	 mreg/2,
 	 set_value/2,
 	 update_counter/2]).
 
+-export([leader_call/1, leader_cast/1]).
+
 %%% internal exports
 -export([init/1,
 	 handle_cast/2,
@@ -63,6 +65,11 @@ start_link({Nodes, Opts}) ->
     
 %%       ?SERVER, Nodes, [],?MODULE, [], [{debug,[trace]}]).
 
+
+reg(Key) ->
+    reg(Key, gproc:default(Key)).
+
+
 %%% @spec({Class,Scope, Key}, Value) -> true
 %%% @doc
 %%%    Class = n  - unique name
@@ -120,22 +127,17 @@ handle_call(_, _, S) ->
     {reply, badarg, S}.
 
 handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
-    Keys = ets:select(?TAB, [{{{Pid,'$1'}}, [], ['$1']}]),
-    case lists:keymember(g, 2, Keys) of
-	true ->
-	    leader_cast({pid_is_DOWN, Pid});
-	false ->
-	    ok
-    end,
-    ets:select_delete(?TAB, [{{{Pid,'_'}}, [], [true]}]),
-    ets:delete(?TAB, Pid),
-    lists:foreach(fun(Key) -> gproc_lib:remove_reg_1(Key, Pid) end, Keys),
+    leader_cast({pid_is_DOWN, Pid}),
+%%     ets:select_delete(?TAB, [{{{Pid,'_'}}, [], [true]}]),
+%%     ets:delete(?TAB, Pid),
+%%     lists:foreach(fun(Key) -> gproc_lib:remove_reg_1(Key, Pid) end, Keys),
     {ok, S};
 handle_info(_, S) ->
     {ok, S}.
 
 
 elected(S, _E) ->
+    io:fwrite("elected(_, E = ~p)~n", [_E]),
     {ok, {globals,globs()}, S#state{is_leader = true}}.
 
 elected(S, _E, undefined) ->
@@ -166,26 +168,29 @@ surrendered(S, {globals, Globs}, _E) ->
 handle_DOWN(Node, S, _E) ->
     Head = {{{'_',g,'_'},'_'},'$1','_'},
     Gs = [{'==', {node,'$1'},Node}],
-    Globs = ets:select(?TAB, [{Head, Gs, [{element,1,'$_'}]}]),
-    ets:select_delete(?TAB, [{Head, Gs, [true]}]),
-    {ok, [{delete, Globs}], S}.
+    Globs = ets:select(?TAB, [{Head, Gs, [{{{element,1,{element,1,'$_'}},
+                                            {element,2,'$_'}}}]}]),
+    io:fwrite("handle_DOWN(~p); Globs = ~p~n", [Node, Globs]),
+    case process_globals(Globs) of
+        [] ->
+            {ok, S};
+        Broadcast ->
+            {ok, Broadcast, S}
+    end.
+%%     ets:select_delete(?TAB, [{Head, Gs, [true]}]),
+%%     {ok, [{delete, Globs}], S}.
 
 handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
     case gproc_lib:insert_reg(K, Value, Pid, g) of
 	false ->
 	    {reply, badarg, S};
 	true ->
-	    gproc_lib:ensure_monitor(Pid),
+	    gproc_lib:ensure_monitor(Pid,g),
 	    Vals =
 		if C == a ->
 			ets:lookup(?TAB, {K,a});
 		   C == c ->
-			case ets:lookup(?TAB, {{a,g,Name},a}) of
-			    [] ->
-				ets:lookup(?TAB, {K,Pid});
-			    [AC] ->
-				[AC | ets:lookup(?TAB, {K,Pid})]
-			end;
+                        [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
 		   C == n ->
 			[{{K,n},Pid,Value}];
 		   true ->
@@ -193,6 +198,15 @@ handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
 		end,
 	    {reply, true, [{insert, Vals}], S}
     end;
+handle_leader_call({update_counter, {c,g,Ctr} = Key, Incr, Pid}, _From, S, _E)
+  when is_integer(Incr) ->
+    try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
+        Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Ctr,Incr)],
+        {reply, New, [{insert, Vals}], S}
+    catch
+        error:_ ->
+            {reply, badarg, S}
+    end;
 handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
     Key = if T == n; T == a -> {K,T};
 	     true -> {K, Pid}
@@ -253,6 +267,15 @@ handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
 		    {reply, badarg, S}
 	    end
     end;
+handle_leader_call({await, Key, Pid}, {_,Ref} = _From, S, _E) ->
+    %% The pid in _From is of the gen_leader instance that forwarded the
+    %% call - not of the client. This is why the Pid is explicitly passed.
+    case gproc_lib:await(Key, {Pid,Ref}) of
+        noreply ->
+            {noreply, S};
+        {reply, Reply, Insert} ->
+            {reply, Reply, [{insert, Insert}], S}
+    end;
 handle_leader_call(_, _, S, _E) ->
     {reply, badarg, S}.
 
@@ -266,24 +289,38 @@ handle_leader_cast({remove_globals, Globals}, S, _E) ->
     delete_globals(Globals),
     {ok, S};
 handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
-    Globals = ets:select(?TAB, [{{{Pid,'$1'}},
-				 [{'==',{element,2,'$1'},g}],['$1']}]),
-    ets:select_delete(?TAB, [{{{Pid,{'_',g,'_'}}},[],[true]}]),
-    ets:delete(?TAB, Pid),
-    Modified = 
-	lists:foldl(
-	  fun({T,_,_}=K,A) when T==a;T==n -> ets:delete(?TAB, {K,T}), A;
-	     ({c,_,_}=K,A) -> gproc_lib:cleanup_counter(K, Pid, A);
-	     (K,A) -> ets:delete(?TAB, {K,Pid}), A
-	  end, [], Globals),
-    case [{Op,Objs} || {Op,Objs} <- [{insert,Modified},
-				     {remove,Globals}], Objs =/= []] of
+    Globals = ets:select(?TAB, [{{{Pid,'$1'},r},
+				 [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
+    io:fwrite("pid_is_DOWN(~p); Globals = ~p~n", [Pid,Globals]),
+%%     ets:select_delete(?TAB, [{{{Pid,{'_',g,'_'}},r},[],[true]}]),
+    ets:delete(?TAB, {Pid,g}),
+    case process_globals(Globals) of
 	[] ->
 	    {ok, S};
 	Broadcast ->
 	    {ok, Broadcast, S}
     end.
 
+process_globals(Globals) ->
+    Modified = 
+        lists:foldl(
+          fun({{T,_,_} = Key, Pid}, A) ->
+                  A1 = case T of
+                           c ->
+                               Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
+                               update_aggr_counter(Key, -Incr) ++ A;
+                           _ ->
+                               A
+                       end,
+                  K = ets_key(Key, Pid),
+                  ets:delete(?TAB, K),
+                  ets:delete(?TAB, {Pid,Key}),
+                  A1
+          end, [], Globals),
+    [{Op,Objs} || {Op,Objs} <- [{insert,Modified},
+                                {delete,Globals}], Objs =/= []].
+
+
 code_change(_FromVsn, S, _Extra, _E) ->
     {ok, S}.
 
@@ -301,18 +338,28 @@ from_leader(Ops, S, _E) ->
 	      ets:insert(?TAB, Globals),
 	      lists:foreach(
 		fun({{{_,g,_}=Key,_}, P, _}) ->
-			ets:insert(?TAB, {{P,Key}}),
-			gproc_lib:ensure_monitor(P)
+			ets:insert(?TAB, {{P,Key},r}),
+			gproc_lib:ensure_monitor(P,g);
+                   ({{P,_K},r}) ->
+                        gproc_lib:ensure_monitor(P,g);
+                   (_) ->
+                        skip
 		end, Globals)
       end, Ops),
     {ok, S}.
 
 delete_globals(Globals) ->
     lists:foreach(
-      fun({{Key,_}=K, Pid}) ->
+      fun({Key, Pid}) ->
+              K = ets_key(Key,Pid),
 	      ets:delete(?TAB, K),
-	      ets:delete(?TAB, {{Pid, Key}})
+	      ets:delete(?TAB, {Pid, Key})
       end, Globals).
+
+ets_key({T,_,_} = K, _) when T==n; T==a ->
+    {K, T};
+ets_key(K, Pid) ->
+    {K, Pid}.
     
 
 leader_call(Req) ->
@@ -369,3 +416,14 @@ surrendered_1(Globs) ->
 	    leader_cast({remove_globals, Remove})
     end.
 
+
+update_aggr_counter({c,g,Ctr}, Incr) ->
+    Key = {{a,g,Ctr},a},
+    case ets:lookup(?TAB, Key) of
+        [] ->
+            [];
+        [{K, Pid, Prev}] ->
+            New = {K, Pid, Prev+Incr},
+            ets:insert(?TAB, New),
+            [New]
+    end.

+ 39 - 19
src/gproc_lib.erl

@@ -51,12 +51,12 @@ insert_reg({T,_,Name} = K, Value, Pid, C) when T==a; T==n ->
                     false
             end
     end;
-insert_reg({c,l,Ctr} = Key, Value, Pid, _C) ->
+insert_reg({c,C,Ctr} = Key, Value, Pid, _C) when C==l; C==g ->
     %% Non-unique keys; store Pid in the key part
     K = {Key, Pid},
     Kr = {Pid, Key},
     Res = ets:insert_new(?TAB, [{K, Pid, Value}, {Kr,r}]),
-    update_aggr_counter(l, Ctr, Value),
+    update_aggr_counter(g, Ctr, Value),
     Res;
 insert_reg(Key, Value, Pid, _C) ->
     %% Non-unique keys; store Pid in the key part
@@ -96,6 +96,29 @@ insert_objects(Objs) ->
       end, Objs).
 
 
+await({T,C,_} = Key, {Pid, Ref} = From) ->
+    Rev = {{Pid,Key}, r},
+    case ets:lookup(?TAB, {Key,T}) of
+        [{_, P, Value}] ->
+            %% for symmetry, we always reply with Ref and then send a message
+            gen_server:reply(From, Ref),
+            Pid ! {gproc, Ref, registered, {Key, P, Value}},
+            noreply;
+        [{K, Waiters}] ->
+            NewWaiters = [{Pid,Ref} | Waiters],
+            W = {K, NewWaiters},
+            ets:insert(?TAB, [W, Rev]),
+            gproc_lib:ensure_monitor(Pid,C),
+            {reply, Ref, [W,Rev]};
+        [] ->
+            W = {{Key,T}, [{Pid,Ref}]},
+            ets:insert(?TAB, [W, Rev]),
+            gproc_lib:ensure_monitor(Pid,C),
+            {reply, Ref, [W,Rev]}
+    end.
+
+
+
 maybe_waiters(K, Pid, Value, T, Info) ->
     case ets:lookup(?TAB, {K,T}) of
         [{_, Waiters}] when is_list(Waiters) ->
@@ -131,14 +154,11 @@ mk_reg_rev_objs(T, C, Pid, L) ->
     [{{Pid,{T,C,K}},r} || {K,_} <- L].
 
 
-
-ensure_monitor(Pid) when node(Pid)==node() ->
-    case ets:insert_new(?TAB, {Pid}) of
+ensure_monitor(Pid,C) when C==g; C==l ->
+    case node(Pid) == node() andalso ets:insert_new(?TAB, {Pid,C}) of
         false -> ok;
         true  -> erlang:monitor(process, Pid)
-    end;
-ensure_monitor(_) ->
-    true.
+    end.
 
 remove_reg(Key, Pid) ->
     remove_reg_1(Key, Pid),
@@ -185,17 +205,17 @@ update_counter({c,l,Ctr} = Key, Incr, Pid) ->
 update_aggr_counter(C, N, Val) ->
     catch ets:update_counter(?TAB, {{a,C,N},a}, {3, Val}).
 
-cleanup_counter({c,g,N}=K, Pid, Acc) ->
-    remove_reg(K,Pid),
-    case ets:lookup(?TAB, {{a,g,N},a}) of
-        [Aggr] ->
-            [Aggr|Acc];
-        [] ->
-            Acc
-    end;
-cleanup_counter(K, Pid, Acc) ->
-    remove_reg(K,Pid),
-    Acc.
+%% cleanup_counter({c,g,N}=K, Pid, Acc) ->
+%%     remove_reg(K,Pid),
+%%     case ets:lookup(?TAB, {{a,g,N},a}) of
+%%         [Aggr] ->
+%%             [Aggr|Acc];
+%%         [] ->
+%%             Acc
+%%     end;
+%% cleanup_counter(K, Pid, Acc) ->
+%%     remove_reg(K,Pid),
+%%     Acc.
 
 scan_existing_counters(Ctxt, Name) ->
     Head = {{{c,Ctxt,Name},'_'},'_','$1'},