Browse Source

bug in gproc:send/2 + gproc_dist work

Ulf Wiger 15 years ago
parent
commit
27210d2576
3 changed files with 97 additions and 22 deletions
  1. 55 3
      src/Unit-Quick-Files/gproc_eqc.erl
  2. 6 10
      src/gproc.erl
  3. 36 9
      src/gproc_dist.erl

+ 55 - 3
src/Unit-Quick-Files/gproc_eqc.erl

@@ -47,6 +47,7 @@
         {pids=[]        %% [pid()]
          , killed=[]    %% [pid()]
          , regs=[]      %% [reg()]
+         , waiters=[]   %% [{key(),pid()}]
         }).
 
 
@@ -70,6 +71,8 @@ command(S) ->
       %% where
       ++ [ {call,?MODULE,where, [key()]} ]
 
+      ++ [ {call,?MODULE,await_new, [key()]} ]
+
       %% kill
       ++ [ oneof([
                   %% kill
@@ -95,6 +98,13 @@ command(S) ->
                   , {call,?MODULE,lookup_pids,    [key()]}
                  ])
            || S#state.pids/=[] ]
+
+      ++ [
+          %% await on existing value
+          {call,?MODULE,await_existing, [elements(S#state.regs)]}
+          || S#state.regs/=[] ]
+
+
      ).
 
 %% generator class
@@ -117,8 +127,8 @@ value() -> frequency([{8, int()}, {1, undefined}, {1, make_ref()}]).
 %% helpers
 is_register_ok(_S,_Pid,#key{class=c},Value) when not is_integer(Value) ->
     false;
-is_register_ok(_S,_Pid,#key{class=a},Value) when not is_integer(Value) ->
-    false;
+is_register_ok(_S,_Pid,#key{class=a},Value) ->
+    Value == undefined;
 is_register_ok(S,Pid,Key,_Value) ->
     [] == [ Pid1 || #reg{pid=Pid1,key=Key1}
                         <- S#state.regs, is_register_eq(Pid,Key,Pid1,Key1) ].
@@ -266,6 +276,8 @@ next_state(S,_V,{call,_,_,_}) ->
 
 %% Precondition, checked before command is added to the command
 %% sequence
+precondition(S, {call,_,await_new,[Key]}) ->
+    not lists:keymember(Key,#reg.key,S#state.regs);
 precondition(_S,{call,_,_,_}) ->
     true.
 
@@ -300,7 +312,8 @@ postcondition(S,{call,_,where,[#key{class=Class}=Key]},Res) ->
 postcondition(S,{call,_,reg,[Pid,Key,Value]},Res) ->
     case Res of
         true ->
-            is_register_ok(S,Pid,Key,Value);
+            is_register_ok(S,Pid,Key,Value) andalso
+                check_waiters(Pid, Key, Value, S#state.waiters);
         {'EXIT', {badarg, _}} ->
             is_unregister_ok(S,Pid,Key)
                 orelse not is_register_ok(S,Pid,Key,Value)
@@ -362,6 +375,8 @@ postcondition(S,{call,_,lookup_pids,[#key{class=Class}=Key]},Res)
     Pids = [ Pid1 || #reg{pid=Pid1,key=Key1} <- S#state.regs
                          , Key==Key1 ],
     lists:sort(Res) == lists:sort(Pids);
+postcondition(S,{call,_,await_new,[#key{}=Key]}, Pid) ->
+    is_pid(Pid);
 %% postcondition(_S,{call,_,lookup_pids,[_Key]},Res) ->
 %%     case Res of {'EXIT', {badarg, _}} -> true; _ -> false end;
 %% otherwise
@@ -499,3 +514,40 @@ do(Pid, F) ->
     after 3000 ->
             {'EXIT', timeout}
     end.
+
+
+await_existing(#key{class=Class,scope=Scope,name=Name}) ->
+    %% short timeout, this call is expected to work
+    gproc:await({Class,Scope,Name}, 1000).
+
+await_new(#key{class=Class,scope=Scope,name=Name} = Key) ->
+    spawn_link(
+      fun() ->
+              Res = (catch gproc:await({Class,Scope,Name}, infinity)),
+              receive
+                  {From, send_result} ->
+                      From ! {result, Key, Res}
+              end
+      end).
+
+check_waiters(Pid, Key, Value, Waiters) ->
+    case [W || {Key, W} <- Waiters] of
+        [] ->
+            true;
+        WPids ->
+            lists:all(fun(WPid) ->
+                              check_waiter(WPid, Key, Value)
+                      end, WPids)
+    end.
+
+check_waiter(Pid, Key, Value) ->   
+    MRef = erlang:monitor(process, Pid),
+    Pid ! {self(), send_result},
+    receive
+        {result, Res} ->
+            Res == {Key, {Pid, Value}};
+        {'DOWN', MRef, _, _, R} ->
+            erlang:error(R)
+    after 1000 ->
+            erlang:error(timeout)
+    end.

+ 6 - 10
src/gproc.erl

@@ -334,13 +334,11 @@ reg({_,g,_} = Key, Value) ->
     gproc_dist:reg(Key, Value);
 reg({p,l,_} = Key, Value) ->
     local_reg(Key, Value);
-reg({T,l,_} = Key, Value) when T==n; T==c; T==a ->
-    %% local names, counters and aggregated counters
-    if T =:= n orelse is_integer(Value) ->
-			true;
-       true ->
-            erlang:error(badarg)
-    end,
+reg({a,l,_} = Key, undefined) ->
+    call({reg, Key, undefined});
+reg({c,l,_} = Key, Value) when is_integer(Value) ->
+    call({reg, Key, Value});
+reg({n,l,_} = Key, Value) ->
     call({reg, Key, Value});
 reg(_, _) ->
     erlang:error(badarg).
@@ -599,11 +597,9 @@ send({T,C,_} = Key, Msg) when C==l; C==g ->
        T==p orelse T==c ->
             %% BUG - if the key part contains select wildcards, we may end up
             %% sending multiple messages to the same pid
-            Head = {{Key,'$1'},'_'},
-            Pids = ets:select(?TAB, [{Head,[],['$1']}]),
             lists:foreach(fun(Pid) ->
                                   Pid ! Msg
-                          end, Pids),
+                          end, lookup_pids(Key)),
             Msg;
        true ->
             erlang:error(badarg)

+ 36 - 9
src/gproc_dist.erl

@@ -36,7 +36,8 @@
 	 handle_leader_call/4,
 	 handle_leader_cast/3,
 	 handle_DOWN/3,
-	 elected/2,
+         elected/2,  % original version
+	 elected/3,  
 	 surrendered/3,
 	 from_leader/3,
 	 code_change/4,
@@ -46,15 +47,21 @@
 
 -define(SERVER, ?MODULE).
 
--record(state, {is_leader}).
+-record(state, {
+          always_broadcast = false,
+          is_leader}).
 
 
 start_link() ->
-    start_link([node()|nodes()]).
+    start_link({[node()|nodes()], []}).
 
-start_link(Nodes) ->
+start_link(Nodes) when is_list(Nodes) ->
+    start_link({Nodes, []});
+start_link({Nodes, Opts}) ->
     gen_leader:start_link(
-      ?SERVER, Nodes, [],?MODULE, [], [{debug,[trace]}]).
+      ?SERVER, Nodes, Opts, ?MODULE, [], []).
+    
+%%       ?SERVER, Nodes, [],?MODULE, [], [{debug,[trace]}]).
 
 %%% @spec({Class,Scope, Key}, Value) -> true
 %%% @doc
@@ -129,8 +136,25 @@ handle_info(_, S) ->
 
 
 elected(S, _E) ->
-    Globs = ets:select(?TAB, [{{{{'_',g,'_'},'_'},'_','_'},[],['$_']}]),
-    {ok, {globals, Globs}, S#state{is_leader = true}}.
+    {ok, {globals,globs()}, S#state{is_leader = true}}.
+
+elected(S, _E, undefined) ->
+    %% I have become leader; full synch
+    {ok, {globals, globs()}, S#state{is_leader = true}};
+elected(S, _E, _Node) ->
+    Synch = {globals, globs()},
+    if not S#state.always_broadcast ->
+            %% Another node recognized us as the leader.
+            %% Don't broadcast all data to everyone else
+            {reply, Synch, S};
+       true ->
+            %% Main reason for doing this is if we are using a gen_leader
+            %% that doesn't support the 'reply' return value
+            {ok, Synch, S}
+    end.
+
+globs() ->
+    ets:select(?TAB, [{{{{'_',g,'_'},'_'},'_','_'},[],['$_']}]).
 
 surrendered(S, {globals, Globs}, _E) ->
     %% globals from this node should be more correct in our table than
@@ -302,8 +326,11 @@ leader_cast(Msg) ->
 	     
 
 
-init([]) ->
-    {ok, #state{}}.
+init(Opts) ->
+    S0 = #state{},
+    AlwaysBcast = proplists:get_value(always_broadcast, Opts,
+                                      S0#state.always_broadcast),
+    {ok, #state{always_broadcast = AlwaysBcast}}.
 
 
 surrendered_1(Globs) ->