Browse Source

chasing timing issues with gproc_dist_tests

Ulf Wiger 14 years ago
parent
commit
fdc3283dbb
4 changed files with 126 additions and 257 deletions
  1. 10 208
      src/gproc.erl
  2. 5 4
      src/gproc_dist.erl
  3. 15 9
      src/gproc_lib.erl
  4. 96 36
      test/gproc_dist_tests.erl

+ 10 - 208
src/gproc.erl

@@ -315,7 +315,13 @@ request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
                _ ->
                    erlang:error(badarg, [Key, Timeout])
            end,
-    WRef = call({await,Key,self()}, C),
+    WRef = case {call({await,Key,self()}, C), C} of
+	       {{R, {Kg,Pg,Vg}}, g} ->
+		   self() ! {gproc, R, registered, {Kg,Pg,Vg}},
+		   R;
+	       {R,_} ->
+		   R
+	   end,
     receive
         {gproc, WRef, registered, {_K, Pid, V}} ->
 	    case TRef of
@@ -855,11 +861,11 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
         false ->
             {reply, badarg, S}
     end;
-handle_call({await, {_,l,_} = Key, Pid}, {_, Ref}, S) ->
+handle_call({await, {_,l,_} = Key, Pid}, From, S) ->
     %% Passing the pid explicitly is needed when leader_call is used,
     %% since the Pid given as From in the leader is the local gen_leader
     %% instance on the calling node.
-    case gproc_lib:await(Key, {Pid, Ref}) of
+    case gproc_lib:await(Key, Pid, From) of
         noreply ->
             {noreply, S};
         {reply, Reply, _} ->
@@ -929,7 +935,7 @@ call(Req, g) ->
 chk_reply(Reply, Req) ->
     case Reply of
         badarg -> erlang:error(badarg, Req);
-        Reply  -> Reply
+        _  -> Reply
     end.
 
 
@@ -1372,207 +1378,3 @@ is_unique({_,n}) -> true;
 is_unique({_,a}) -> true;
 is_unique(_) -> false.
 
-
-%% =============== EUNIT tests
-
-reg_test_() ->
-    {setup,
-     fun() ->
-             application:start(gproc)
-     end,
-     fun(_) ->
-             application:stop(gproc)
-     end,
-     [
-      {spawn, ?_test(t_simple_reg())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_simple_prop())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_await())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_simple_mreg())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_gproc_crash())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_cancel_wait_and_register())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_to_pid())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_to_self())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_badarg())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_to_unknown())}
-      , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_and_back())}
-      , ?_test(t_is_clean())
-     ]}.
-
-t_simple_reg() ->
-    ?assert(gproc:reg({n,l,name}) =:= true),
-    ?assert(gproc:where({n,l,name}) =:= self()),
-    ?assert(gproc:unreg({n,l,name}) =:= true),
-    ?assert(gproc:where({n,l,name}) =:= undefined).
-
-
-                       
-t_simple_prop() ->
-    ?assert(gproc:reg({p,l,prop}) =:= true),
-    ?assert(t_other_proc(fun() ->
-                                 ?assert(gproc:reg({p,l,prop}) =:= true)
-                         end) =:= ok),
-    ?assert(gproc:unreg({p,l,prop}) =:= true).
-
-t_other_proc(F) ->
-    {_Pid,Ref} = spawn_monitor(fun() -> exit(F()) end),
-    receive
-        {'DOWN',Ref,_,_,R} ->
-            R
-    after 10000 ->
-            erlang:error(timeout)
-    end.
-
-t_await() ->
-    Me = self(),
-    {_Pid,Ref} = spawn_monitor(
-                   fun() -> exit(?assert(gproc:await({n,l,t_await}) =:= {Me,val})) end),
-    ?assert(gproc:reg({n,l,t_await},val) =:= true),
-    receive
-        {'DOWN', Ref, _, _, R} ->
-            ?assertEqual(R, ok)
-    after 10000 ->
-            erlang:error(timeout)
-    end.
-
-t_is_clean() ->
-    sys:get_status(gproc), % in order to synch
-    T = ets:tab2list(gproc),
-    ?assert(T =:= []).
-                                        
-
-t_simple_mreg() ->
-    ok.
-
-
-t_gproc_crash() ->
-    P = spawn_helper(),
-    ?assert(gproc:where({n,l,P}) =:= P),
-    exit(whereis(gproc), kill),
-    give_gproc_some_time(100),
-    ?assert(whereis(gproc) =/= undefined),
-    %%
-    %% Check that the registration is still there using an ets:lookup(),
-    %% Once we've killed the process, gproc will always return undefined
-    %% if the process is not alive, regardless of whether the registration
-    %% is still there. So, here, the lookup should find something...
-    %%
-    ?assert(ets:lookup(gproc,{{n,l,P},n}) =/= []),
-    ?assert(gproc:where({n,l,P}) =:= P),
-    exit(P, kill),
-    %% ...and here, it shouldn't.
-    %% (sleep for a while first to let gproc handle the EXIT
-    give_gproc_some_time(10),
-    ?assert(ets:lookup(gproc,{{n,l,P},n}) =:= []).
-
-t_cancel_wait_and_register() ->
-    Alias = {n, l, foo},
-    Me = self(),
-    P = spawn(fun() ->
-		      {'EXIT',_} = (catch gproc:await(Alias, 100)),
-		      ?assert(element(1,sys:get_status(gproc)) == status),
-		      Me ! {self(), go_ahead},
-		      timer:sleep(infinity)
-	      end),
-    receive
-	{P, go_ahead} ->
-	    ?assertEqual(gproc:reg(Alias, undefined), true),
-	    exit(P, kill),
-	    timer:sleep(500),
-	    ?assert(element(1,sys:get_status(gproc)) == status)
-    end.
-
-
-t_give_away_to_pid() ->
-    From = {n, l, foo},
-    Me = self(),
-    P = spawn_link(fun t_loop/0),
-    ?assertEqual(true, gproc:reg(From, undefined)),
-    ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(P, gproc:give_away(From, P)),
-    ?assertEqual(P, gproc:where(From)),
-    ?assertEqual(ok, t_call(P, die)).
-
-t_give_away_to_self() ->
-    From = {n, l, foo},
-    Me = self(),
-    ?assertEqual(true, gproc:reg(From, undefined)),
-    ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(Me, gproc:give_away(From, Me)),
-    ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(true, gproc:unreg(From)).
-
-t_give_away_badarg() ->
-    From = {n, l, foo},
-    Me = self(),
-    ?assertEqual(undefined, gproc:where(From)),
-    ?assertError(badarg, gproc:give_away(From, Me)).
-
-t_give_away_to_unknown() ->
-    From = {n, l, foo},
-    Unknown = {n, l, unknown},
-    Me = self(),
-    ?assertEqual(true, gproc:reg(From, undefined)),
-    ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(undefined, gproc:where(Unknown)),
-    ?assertEqual(undefined, gproc:give_away(From, Unknown)),
-    ?assertEqual(undefined, gproc:where(From)).
-
-t_give_away_and_back() ->
-    From = {n, l, foo},
-    Me = self(),
-    P = spawn_link(fun t_loop/0),
-    ?assertEqual(true, gproc:reg(From, undefined)),
-    ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(P, gproc:give_away(From, P)),
-    ?assertEqual(P, gproc:where(From)),
-    ?assertEqual(ok, t_call(P, {give_away, From})),
-    ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(ok, t_call(P, die)).
-
-t_loop() ->
-    receive
-	{From, {give_away, Key}} ->
-	    ?assertEqual(From, gproc:give_away(Key, From)),
-	    From ! {self(), ok},
-	    t_loop();
-	{From, die} ->
-	    From ! {self(), ok}
-    end.
-    
-
-t_call(P, Msg) ->
-    P ! {self(), Msg},
-    receive
-	{P, Reply} ->
-	    Reply
-    end.
-
-spawn_helper() ->
-    Parent = self(),
-    P = spawn(fun() ->
-		      ?assert(gproc:reg({n,l,self()}) =:= true),
-		      Ref = erlang:monitor(process, Parent),
-		      Parent ! {ok,self()},
-		      receive 
-			  {'DOWN', Ref, _, _, _} ->
-			      ok
-		      end
-	      end),
-    receive
-	{ok,P} ->
-	     P
-    end.
-
-give_gproc_some_time(T) ->
-    timer:sleep(T),
-    sys:get_status(gproc).

+ 5 - 4
src/gproc_dist.erl

@@ -293,12 +293,13 @@ handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
 		    {reply, badarg, S}
 	    end
     end;
-handle_leader_call({await, Key, Pid}, {_,Ref} = _From, S, _E) ->
+handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
     %% The pid in _From is of the gen_leader instance that forwarded the
     %% call - not of the client. This is why the Pid is explicitly passed.
-    case gproc_lib:await(Key, {Pid,Ref}) of
-        noreply ->
-            {noreply, S};
+    %% case gproc_lib:await(Key, {Pid,Ref}) of
+    case gproc_lib:await(Key, Pid, From) of
+	{reply, {Ref, {K, P, V}}} ->
+	    {reply, {Ref, {K, P, V}}, S};
         {reply, Reply, Insert} ->
             {reply, Reply, [{insert, Insert}], S}
     end;

+ 15 - 9
src/gproc_lib.erl

@@ -117,24 +117,30 @@ insert_objects(Objs) ->
       end, Objs).
 
 
-await({T,C,_} = Key, {Pid, Ref} = From) ->
-    Rev = {{Pid,Key}, r},
+await({T,C,_} = Key, WPid, {_Pid, Ref} = From) ->
+    Rev = {{WPid,Key}, r},
     case ets:lookup(?TAB, {Key,T}) of
         [{_, P, Value}] ->
             %% for symmetry, we always reply with Ref and then send a message
-            gen_server:reply(From, Ref),
-            Pid ! {gproc, Ref, registered, {Key, P, Value}},
-            noreply;
+	    if C == g ->
+		    %% in the global case, we bundle the reply, since otherwise
+		    %% the messages can pass each other
+		    {reply, {Ref, {Key, P, Value}}};
+	       true ->
+		    gen_server:reply(From, Ref),
+		    WPid ! {gproc, Ref, registered, {Key, P, Value}},
+		    noreply
+	    end;
         [{K, Waiters}] ->
-            NewWaiters = [{Pid,Ref} | Waiters],
+            NewWaiters = [{WPid,Ref} | Waiters],
             W = {K, NewWaiters},
             ets:insert(?TAB, [W, Rev]),
-            gproc_lib:ensure_monitor(Pid,C),
+            gproc_lib:ensure_monitor(WPid,C),
             {reply, Ref, [W,Rev]};
         [] ->
-            W = {{Key,T}, [{Pid,Ref}]},
+            W = {{Key,T}, [{WPid,Ref}]},
             ets:insert(?TAB, [W, Rev]),
-            gproc_lib:ensure_monitor(Pid,C),
+            gproc_lib:ensure_monitor(WPid,C),
             {reply, Ref, [W,Rev]}
     end.
 

+ 96 - 36
test/gproc_dist_tests.erl

@@ -22,32 +22,44 @@
 -export([t_spawn/1, t_spawn_reg/2]).
 
 dist_test_() ->
-    {timeout, 60,
-     [{foreach,
+    {timeout, 90,
+     [{setup,
        fun() ->
 	       Ns = start_slaves([n1, n2]),
-	       %% dbg:tracer(),
-	       %% [dbg:n(N) || N <- Ns],
-	       %% dbg:tpl(gproc_dist, x),
-	       %% dbg:p(all,[c]),
-	       Ns
+	       ?assertMatch({[ok,ok],[]},
+			    rpc:multicall(Ns, application, start, [gproc])),
+	       %% Without this trace output, the test times out on my Mac...
+	       dbg:tracer(),
+	       dbg:tpl(?MODULE, x),
+	       dbg:p(all,[c]),
+	       ?debugVal(Ns)
        end,
        fun(Ns) ->
 	       [rpc:call(N, init, stop, []) || N <- Ns]
        end,
-       [
-	{with, [fun(_) -> {in_parallel, [fun(X) ->
-						 ?debugVal(t_simple_reg(X)) end,
-					 fun(X) ->
-						 ?debugVal(t_await_reg(X))
-					 end,
-					 fun(X) ->
-						 ?debugVal(t_give_away(X))
-					 end]
-			  }
-		end]}
-       ]}
-     ]}.
+       fun(Ns) ->
+	       {inorder,
+		[
+		 {inparallel, [fun() ->
+				       ?debugVal(t_simple_reg(Ns))
+			       end,
+			       fun() ->
+			       	       ?debugVal(t_await_reg(Ns))
+			       end,
+			       fun() ->
+			       	       ?debugVal(t_await_reg_exists(Ns))
+			       end,
+			       fun() ->
+				       ?debugVal(t_give_away(Ns))
+			       end
+			      ]
+		 },
+		 {timeout, 60, [fun() ->
+					?debugVal(t_fail_node(Ns))
+				end]}
+		]}
+       end
+      }]}.
 
 -define(T_NAME, {n, g, {?MODULE, ?LINE}}).
 
@@ -64,11 +76,40 @@ t_await_reg([A,B|_]) ->
     ?debugMsg(t_await_reg),
     Name = ?T_NAME,
     P = t_spawn(A),
-    P ! {self(), {apply, gproc, await, [Name]}},
+    Ref = erlang:monitor(process, P),
+    P ! {self(), Ref, {apply, gproc, await, [Name]}},
+    t_sleep(),
+    P1 = t_spawn_reg(B, Name),
+    ?assert(P1 == receive
+		      {P, Ref, Res} ->
+			  element(1, Res);
+		      {'DOWN', Ref, _, _, Reason} ->
+			  erlang:error(Reason);
+		      Other ->
+			  erlang:error({received,Other})
+		  end),
+    ?assertMatch(ok, t_call(P, die)),
+    ?assertMatch(ok, t_call(P1, die)).
+
+t_await_reg_exists([A,B|_]) ->
+    ?debugMsg(t_await_reg_exists),
+    Name = ?T_NAME,
+    P = t_spawn(A),
+    Ref = erlang:monitor(process, P),
+     %% dbg:tracer(),
+     %% [dbg:n(N) || N <- Ns],
+     %% dbg:tpl(gproc_dist,x),
+     %% dbg:tpl(gproc_lib,await,x),
+     %% dbg:p(all,[c]),
     P1 = t_spawn_reg(B, Name),
+    P ! {self(), Ref, {apply, gproc, await, [Name]}},
     ?assert(P1 == receive
-		      {P, Res} ->
-			  element(1, Res)
+		      {P, Ref, Res} ->
+			  element(1, Res);
+		      {'DOWN', Ref, _, _, Reason} ->
+			  erlang:error(Reason);
+		      Other ->
+			  erlang:error({received,Other})
 		  end),
     ?assertMatch(ok, t_call(P, die)),
     ?assertMatch(ok, t_call(P1, die)).
@@ -81,18 +122,31 @@ t_give_away([A,B|_] = Ns) ->
     Pb = t_spawn_reg(B, Nb),
     ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
     ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
-    %% ?debugHere,
     ?assertMatch(Pb, t_call(Pa, {apply, {gproc, give_away, [Na, Nb]}})),
     ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
-    %% ?debugHere,
     ?assertMatch(Pa, t_call(Pa, {apply, {gproc, give_away, [Na, Pa]}})),
     ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
-    %% ?debugHere,
     ?assertMatch(ok, t_call(Pa, die)),
     ?assertMatch(ok, t_call(Pb, die)).
+
+t_fail_node([A,B|_] = Ns) ->
+    ?debugMsg(t_fail_node),
+    Na = ?T_NAME,
+    Nb = ?T_NAME,
+    Pa = t_spawn_reg(A, Na),
+    Pb = t_spawn_reg(B, Nb),
+    ?assertMatch(ok, rpc:call(A, application, stop, [gproc])),
+    ?assertMatch(ok, t_lookup_everywhere(Na, Ns -- [A], undefined)),
+    ?assertMatch(ok, t_lookup_everywhere(Nb, Ns -- [A], Pb)),
+    ?assertMatch(ok, rpc:call(A, application, start, [gproc])),
+    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)),
+    ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
+    ?assertMatch(ok, t_call(Pa, die)),
+    ?assertMatch(ok, t_call(Pb, die)).
+    
     
 t_sleep() ->
-    timer:sleep(1000).
+    timer:sleep(500).
 
 t_lookup_everywhere(Key, Nodes, Exp) ->
     t_lookup_everywhere(Key, Nodes, Exp, 3).
@@ -133,29 +187,35 @@ t_spawn_reg(Node, Name) ->
     end.
 
 t_call(P, Req) ->
-    P ! {self(), Req},
+    Ref = erlang:monitor(process, P),
+    P ! {self(), Ref, Req},
     receive
-	{P, Res} ->
-	    Res
+	{P, Ref, Res} ->
+	    erlang:demonitor(Ref),
+	    Res;
+	{'DOWN', Ref, _, _, Error} ->
+	    erlang:error({'DOWN', P, Error})
     end.
 
 t_loop() ->
     receive
-	{From, die} ->
-	    From ! {self(), ok};
-	{From, {apply, M, F, A}} ->
-	    From ! {self(), apply(M, F, A)},
+	{From, Ref, die} ->
+	    From ! {self(), Ref, ok};
+	{From, Ref, {apply, M, F, A}} ->
+	    From ! {self(), Ref, apply(M, F, A)},
 	    t_loop()
     end.
 
 start_slaves(Ns) ->
-    [start_slave(N) || N <- Ns].
+    [H|T] = Nodes = [start_slave(N) || N <- Ns],
+    _ = [{N, rpc:call(H, net, ping, [N])} || N <- T],
+    Nodes.
 	       
 start_slave(Name) ->
     case node() of
         nonode@nohost ->
             os:cmd("epmd -daemon"),
-            {ok, _} = net_kernel:start([gproc_master, shortnames]);
+            {ok, _} = net_kernel:start([gproc_master, longnames]);
         _ ->
             ok
     end,