Browse Source

make qlc prune dead entries; fn clause in await (g)

Ulf Wiger 13 years ago
parent
commit
beddb19a8f
2 changed files with 141 additions and 26 deletions
  1. 80 26
      src/gproc.erl
  2. 61 0
      test/gproc_tests.erl

+ 80 - 26
src/gproc.erl

@@ -165,6 +165,9 @@
                 ok
         end).
 
+-define(PID_IS_DEAD(Pid),
+	(node(Pid) == node() andalso is_process_alive(Pid) == false)).
+
 -record(state, {}).
 
 %% @spec () -> {ok, pid()}
@@ -671,6 +674,8 @@ await1(_, _, _) ->
     throw(badarg).
 
 
+request_wait({_,g,_} = Key, Timeout) ->
+    request_wait(undefined, Key, Timeout);
 request_wait(Key, Timeout) ->
     request_wait(node(), Key, Timeout).
 
@@ -841,6 +846,8 @@ cancel_wait1({_,l,_} = Key, Ref) ->
     cast({cancel_wait, self(), Key, Ref}, l),
     ok.
 
+cancel_wait1(undefined, {_,g,_} = Key, Ref) ->
+    cast({cancel_wait, self(), Key, Ref}, g);
 cancel_wait1(N, {_,l,_} = Key, Ref) ->
     cast(N, {cancel_wait, self(), Key, Ref}, l).
 
@@ -1900,7 +1907,11 @@ call(Req, g) ->
     chk_reply(gproc_dist:leader_call(Req)).
 
 call(N, Req, l) ->
-    chk_reply(gen_server:call({?MODULE, N}, Req)).
+    chk_reply(gen_server:call({?MODULE, N}, Req));
+call(undefined, Req, g) ->
+    %% we always call the leader
+    chk_reply(gproc_dist:leader_call(Req)).
+
 
 chk_reply(Reply) ->
     case Reply of
@@ -2294,9 +2305,11 @@ table(Context, Opts) ->
                  fun() -> qlc_next(Ctxt, first(Ctxt)) end;
              last_prev -> fun() -> qlc_prev(Ctxt, last(Ctxt)) end;
              select ->
-                 fun(MS) -> qlc_select(select(Ctxt, MS, NObjs)) end;
+                 fun(MS) -> qlc_select(
+			      select(Ctxt, wrap_qlc_ms_prod(MS), NObjs)) end;
              {select,MS} ->
-                 fun() -> qlc_select(select(Ctxt, MS, NObjs)) end;
+                 fun() -> qlc_select(
+			    select(Ctxt, wrap_qlc_ms_prod(MS), NObjs)) end;
              _ ->
                  erlang:error(badarg, [Ctxt,Opts])
          end,
@@ -2317,39 +2330,66 @@ table(Context, Opts) ->
                    {lookup_fun, LookupFun}] ++ [{K,V} || {K,V} <- Opts,
                                                          K =/= traverse,
                                                          K =/= n_objects]).
+
+wrap_qlc_ms_prod(Pats) ->
+    [ wrap_qlc_ms_prod_(P) || P <- Pats ].
+
+wrap_qlc_ms_prod_({H, Gs, [P]}) ->
+    {H, Gs, [{{ {element, 2, '$_'}, P }}]}.
+
 qlc_lookup(_Scope, 1, Keys) ->
     lists:flatmap(
       fun(Key) ->
-              ets:select(?TAB, [{ {{Key,'_'},'_','_'}, [],
-                                  [{{ {element,1,{element,1,'$_'}},
-                                      {element,2,'$_'},
-                                      {element,3,'$_'} }}] }])
+              remove_dead(
+		ets:select(?TAB, [{ {{Key,'_'},'_','_'}, [],
+				    [{{ {element,1,{element,1,'$_'}},
+					{element,2,'$_'},
+					{element,3,'$_'} }}] }]))
       end, Keys);
 qlc_lookup(Scope, 2, Pids) ->
     lists:flatmap(fun(Pid) ->
-                          Found =
-                              ets:select(?TAB, [{{{Pid, rev_keypat(Scope)}, '_'},
-						 [], ['$_']}]),
-                          lists:flatmap(
-                            fun({{_,{T,_,_}=K}, _}) ->
-                                    K2 = if T==n orelse T==a -> T;
-                                            true -> Pid
-                                         end,
-                                    case ets:lookup(?TAB, {K,K2}) of
-                                        [{{Key,_},_,Value}] ->
-                                            [{Key, Pid, Value}];
-                                        [] ->
-                                            []
-                                    end
-                            end, Found)
-                  end, Pids).
+			  qlc_lookup_pid(Pid, Scope)
+		  end, Pids).
+
+remove_dead(Objs) ->
+    [ Reg || {_, Pid, _} = Reg <- Objs,
+	     not ?PID_IS_DEAD(Pid) ].
+
+qlc_lookup_pid(Pid, Scope) ->
+    case ?PID_IS_DEAD(Pid) of
+	true ->
+	    [];
+	false ->
+	    Found =
+		ets:select(?TAB, [{{{Pid, rev_keypat(Scope)}, '_'},
+				   [], ['$_']}]),
+	    lists:flatmap(
+	      fun({{_,{T,_,_}=K}, _}) ->
+		      K2 = if T==n orelse T==a -> T;
+			      true -> Pid
+			   end,
+		      case ets:lookup(?TAB, {K,K2}) of
+			  [{{Key,_},_,Value}] ->
+			      [{Key, Pid, Value}];
+			  [] ->
+			      []
+		      end
+	      end, Found)
+    end.
 
 
 qlc_next(_, '$end_of_table') -> [];
 qlc_next(Scope, K) ->
     case ets:lookup(?TAB, K) of
         [{{Key,_}, Pid, V}] ->
-            [{Key,Pid,V}] ++ fun() -> qlc_next(Scope, next(Scope, K)) end;
+	    case ?PID_IS_DEAD(Pid) of
+		true ->
+		    qlc_next(Scope, next(Scope, K));
+		false ->
+		    [{Key,Pid,V}] ++ fun() ->
+					     qlc_next(Scope, next(Scope, K))
+				     end
+	    end;
         [] ->
             qlc_next(Scope, next(Scope, K))
     end.
@@ -2358,7 +2398,14 @@ qlc_prev(_, '$end_of_table') -> [];
 qlc_prev(Scope, K) ->
     case ets:lookup(?TAB, K) of
         [{{Key,_},Pid,V}] ->
-            [{Key,Pid,V}] ++ fun() -> qlc_prev(Scope, prev(Scope, K)) end;
+	    case ?PID_IS_DEAD(Pid) of
+		true ->
+		    qlc_prev(Scope, prev(Scope, K));
+		false ->
+		    [{Key,Pid,V}] ++ fun() ->
+					     qlc_prev(Scope, prev(Scope, K))
+				     end
+	    end;
         [] ->
             qlc_prev(Scope, prev(Scope, K))
     end.
@@ -2366,7 +2413,14 @@ qlc_prev(Scope, K) ->
 qlc_select('$end_of_table') ->
     [];
 qlc_select({Objects, Cont}) ->
-    Objects ++ fun() -> qlc_select(ets:select(Cont)) end.
+    case [O || {Pid,O} <- Objects,
+	       not ?PID_IS_DEAD(Pid)] of
+	[] ->
+	    %% re-run search
+	    qlc_select(ets:select(Cont));
+	Found ->
+	    Found ++ fun() -> qlc_select(ets:select(Cont)) end
+    end.
 
 
 is_unique(n) -> true;

+ 61 - 0
test/gproc_tests.erl

@@ -107,6 +107,8 @@ reg_test_() ->
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_qlc()))}
       , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_qlc_dead()))}
+      , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_get_env()))}
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_get_set_env()))}
@@ -450,6 +452,65 @@ t_qlc() ->
 		 qlc:e(qlc:q([{K,P,V} || {K,P,V} <-
 					     gproc:table(all), P == self()]))).
 
+t_qlc_dead() ->
+    P0 = self(),
+    ?assertEqual(true, gproc:reg({n, l, {n,1}}, x)),
+    ?assertEqual(true, gproc:reg({p, l, {p,1}}, x)),
+    P1 = spawn(fun() ->
+		       Ref = erlang:monitor(process, P0),
+		       gproc:reg({n, l, {n,2}}, y),
+		       gproc:reg({p, l, {p,2}}, y),
+		       P0 ! {self(), ok},
+		       receive
+			   {P, goodbye} -> ok;
+			   {'DOWN', Ref, _, _, _} ->
+			       ok
+		       end
+	       end),
+    receive {P1, ok} -> ok end,
+    %% now, suspend gproc so it doesn't do cleanup
+    try  sys:suspend(gproc),
+	 exit(P1, kill),
+	 %% local names
+	 Exp1 = [{{n,l,{n,1}},self(),x}],
+	 ?assertEqual(Exp1,
+		      qlc:e(qlc:q([N || N <- gproc:table(names)]))),
+	 ?assertEqual(Exp1,
+		      qlc:e(qlc:q([N || {{n,l,_},_,_} = N <-
+					    gproc:table(names)]))),
+	 %% match local names on value
+	 Exp2 = [{{n,l,{n,1}},self(),x}],
+	 ?assertEqual(Exp2,
+		      qlc:e(qlc:q([N || {{n,l,_},_,x} = N <-
+					    gproc:table(names)]))),
+	 ?assertEqual([],
+		      qlc:e(qlc:q([N || {{n,l,_},_,y} = N <-
+					    gproc:table(names)]))),
+	 %% match all on value
+	 Exp3 = [{{n,l,{n,1}},self(),x},
+		 {{p,l,{p,1}},self(),x}],
+	 ?assertEqual(Exp3,
+		      qlc:e(qlc:q([N || {_,_,x} = N <- gproc:table(all)]))),
+	 ?assertEqual([],
+		      qlc:e(qlc:q([N || {_,_,y} = N <- gproc:table(all)]))),
+
+	 %% match all
+	 Exp4 = [{{n,l,{n,1}},self(),x},
+		 {{p,l,{p,1}},self(),x}],
+	 ?assertEqual(Exp4,
+		      qlc:e(qlc:q([X || X <- gproc:table(all)]))),
+	 %% match on pid
+	 ?assertEqual(Exp4,
+		 qlc:e(qlc:q([{K,P,V} || {K,P,V} <-
+					     gproc:table(all), P =:= self()]))),
+	 ?assertEqual([],
+		 qlc:e(qlc:q([{K,P,V} || {K,P,V} <-
+					     gproc:table(all), P =:= P1])))
+    after
+	sys:resume(gproc)
+    end.
+
+
 t_get_env() ->
     ?assertEqual(ok, application:set_env(gproc, ssss, "s1")),
     ?assertEqual(true, os:putenv("SSSS", "s2")),