Browse Source

bug fixes + test case for distr standby mon

Ulf Wiger 11 years ago
parent
commit
a2d87354e2
5 changed files with 79 additions and 38 deletions
  1. 4 0
      src/gproc.erl
  2. 44 30
      src/gproc_dist.erl
  3. 10 7
      src/gproc_lib.erl
  4. 3 1
      src/gproc_monitor.erl
  5. 18 0
      test/gproc_dist_tests.erl

+ 4 - 0
src/gproc.erl

@@ -2075,6 +2075,10 @@ handle_call({monitor, {T,l,_} = Key, Pid, Type}, _From, S)
     _ = case {where(Key), Type} of
 	    {undefined, info} ->
 		Pid ! {gproc, unreg, Ref, Key};
+            {undefined, standby} ->
+                true = gproc_lib:insert_reg(Key, undefined, Pid, l),
+                Pid ! {gproc, {failover, Pid}, Ref, Key},
+                _ = gproc_lib:ensure_monitor(Pid, l);
 	    {RegPid, _} ->
                 _ = gproc_lib:ensure_monitor(Pid, l),
 		case ets:lookup(?TAB, {RegPid, Key}) of

+ 44 - 30
src/gproc_dist.erl

@@ -477,7 +477,8 @@ handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
                             {reply, true, [{delete, [Key, {Pid,K}]}], S}
                     end;
                true ->
-                    {reply, true, [{delete, [Key, {Pid,K}]}], S}
+                    {reply, true, [{notify, [{K, Pid, unreg}]},
+                                   {delete, [Key, {Pid,K}]}], S}
             end;
         false ->
             {reply, badarg, S}
@@ -498,14 +499,16 @@ handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
                     Rev = {Pid, K},
                     ets:delete(?TAB, Rev),
                     gproc_lib:notify({migrated, ToPid}, K, Opts),
-                    {reply, ToPid, [{delete, [Key, Rev]},
-				    {insert, [{Key, ToPid, Value}]}], S};
+                    {reply, ToPid, [{insert, [{Key, ToPid, Value}]},
+                                    {notify, [{K, Pid, {migrated, ToPid}}]},
+				    {delete, [Rev]}], S};
                 undefined ->
                     ets:delete(?TAB, Key),
                     Rev = {Pid, K},
                     ets:delete(?TAB, Rev),
                     gproc_lib:notify(unreg, K, Opts),
-                    {reply, undefined, [{delete, [Key, Rev]}], S}
+                    {reply, undefined, [{notify, [{K, Pid, unreg}]},
+                                        {delete, [Key, Rev]}], S}
             end;
         _ ->
             {reply, badarg, S}
@@ -643,7 +646,7 @@ mk_broadcast_insert_vals(Objs) ->
 
 
 process_globals(Globals) ->
-    Modified =
+    {Modified, Notifications} =
         lists:foldl(
           fun({{T,_,_} = Key, Pid}, A) when T==n; T==a ->
                   case ets:lookup(?TAB, {Pid,Key}) of
@@ -652,38 +655,39 @@ process_globals(Globals) ->
                       _ ->
                           A
                   end;
-             ({{T,_,_} = Key, Pid}, A) ->
-                  A1 = case T of
-                           c ->
-                               Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
-                               update_aggr_counter(Key, -Incr) ++ A;
-                           _ ->
-                               A
-                       end,
-                  remove_entry(Key, Pid, unreg),
-                  A1
-          end, [], Globals),
+             ({{T,_,_} = Key, Pid}, {MA,NA}) ->
+                  MA1 = case T of
+                            c ->
+                                Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
+                                update_aggr_counter(Key, -Incr) ++ MA;
+                            _ ->
+                               MA
+                        end,
+                  N = remove_entry(Key, Pid, unreg),
+                  {MA1, N ++ NA}
+          end, {[],[]}, Globals),
     [{insert, Modified} || Modified =/= []] ++
+        [{notify, Notifications} || Notifications =/= []] ++
 	[{delete, Globals} || Globals =/= []].
 
-maybe_failover({T,_,_} = Key, Pid, Opts, Acc) ->
+maybe_failover({T,_,_} = Key, Pid, Opts, {MAcc, NAcc}) ->
     Opts = get_opts(Pid, Key),
     case filter_standbys(gproc_lib:standbys(Opts)) of
         [] ->
-            remove_entry(Key, Pid, unreg),
-            Acc;
+            Notify = remove_entry(Key, Pid, unreg),
+            {MAcc, Notify ++ NAcc};
         [{ToPid,Ref,_}|_] ->
             Value = case ets:lookup(?TAB, {Key,T}) of
                         [{_, _, V}] -> V;
                         _ -> undefined
                     end,
-            remove_rev_entry(Opts, Pid, Key, {failover, ToPid}),
+            Notify = remove_rev_entry(Opts, Pid, Key, {failover, ToPid}),
             Opts1 = gproc_lib:remove_monitor(Opts, ToPid, Ref),
             _ = gproc_lib:ensure_monitor(ToPid, g),
             NewReg = {{Key,T}, ToPid, Value},
             NewRev = {{ToPid, Key}, Opts1},
             ets:insert(?TAB, [NewReg, NewRev]),
-            [NewReg, NewRev | Acc]
+            {[NewReg, NewRev | MAcc], Notify ++ NAcc}
     end.
 
 filter_standbys(SBs) ->
@@ -708,9 +712,11 @@ remove_entry(Key, Pid, Event) ->
 remove_rev_entry(Opts, Pid, {T,g,_} = K, Event) when T==n; T==a ->
     Key = {Pid, K},
     gproc_lib:notify(Event, K, Opts),
-    ets:delete(?TAB, Key);
+    ets:delete(?TAB, Key),
+    [{K, Pid, Event}];
 remove_rev_entry(_, Pid, K, _Event) ->
-    ets:delete(?TAB, {Pid, K}).
+    ets:delete(?TAB, {Pid, K}),
+    [].
 
 get_opts(Pid, K) ->
     case ets:lookup(?TAB, {Pid, K}) of
@@ -733,22 +739,19 @@ from_leader(Ops, S, _E) ->
       fun({delete, Globals}) ->
               delete_globals(Globals);
          ({insert, Globals}) ->
-	      _ = insert_globals(Globals)
+	      _ = insert_globals(Globals);
+         ({notify, Events}) ->
+              do_notify(Events)
       end, Ops),
     {ok, S}.
 
 insert_globals(Globals) ->
     lists:foldl(
-      fun({{{_,_,_} = Key,Pid}, Pid, _} = Obj, A) ->
+      fun({{{_,_,_} = Key,_}, Pid, _} = Obj, A) ->
               ets:insert(?TAB, Obj),
 	      ets:insert_new(?TAB, {{Pid,Key}, []}),
 	      gproc_lib:ensure_monitor(Pid,g),
 	      A;
-	 ({{{_,_,_} = Key, _}, Pid, _} = Obj, A) ->
-              ets:insert(?TAB, Obj),
-              ets:insert_new(?TAB, {{Pid,Key}, []}),
-	      gproc_lib:ensure_monitor(Pid,g),
-	      A;
 	 ({{P,_K}, Opts} = Obj, A) when is_pid(P), is_list(Opts),Opts =/= [] ->
 	      ets:insert(?TAB, Obj),
 	      gproc_lib:ensure_monitor(P,g),
@@ -768,6 +771,17 @@ delete_globals(Globals) ->
 	      ets:delete(?TAB, {Pid, Key})
       end, Globals).
 
+do_notify([{K, P, E}|T]) ->
+    case ets:lookup(?TAB, {P,K}) of
+        [{_, Opts}] when is_list(Opts) ->
+            gproc_lib:notify(E, K, Opts);
+        _ ->
+            do_notify(T)
+    end;
+do_notify([]) ->
+    ok.
+
+
 ets_key({T,_,_} = K, _) when T==n; T==a ->
     {K, T};
 ets_key(K, Pid) ->

+ 10 - 7
src/gproc_lib.erl

@@ -64,13 +64,13 @@ insert_reg({T,_,Name} = K, Value, Pid, Scope) when T==a; T==n ->
                                 true
                         end
                 end,
-    Info = [{{K, T}, Pid, Value}, {{Pid,K}, []}],
-    case ets:insert_new(?TAB, Info) of
+    case ets:insert_new(?TAB, {{K,T}, Pid, Value}) of
         true ->
+            _ = ets:insert_new(?TAB, {{Pid,K}, []}),
             MaybeScan();
         false ->
             if T==n ->
-                    maybe_waiters(K, Pid, Value, T, Info);
+                    maybe_waiters(K, Pid, Value, T);
                true ->
                     false
             end
@@ -187,13 +187,13 @@ await({T,C,_} = Key, WPid, {_Pid, Ref} = From) ->
 
 
 
-maybe_waiters(K, Pid, Value, T, Info) ->
+maybe_waiters(K, Pid, Value, T) ->
     case ets:lookup(?TAB, {K,T}) of
         [{_, Waiters}] when is_list(Waiters) ->
-            ets:insert(?TAB, Info),
+            ets:insert(?TAB, [{{K,T}, Pid, Value}, {{Pid,K}, []}]),
             notify_waiters(Waiters, K, Pid, Value),
             true;
-        [_] ->
+        _ ->
             false
     end.
 
@@ -352,7 +352,10 @@ notify(Event, Key, Opts) ->
 notify_([{Pid,Ref}|T], Event, Key) ->
     Pid ! {gproc, Event, Ref, Key},
     notify_(T, Event, Key);
-notify_([{Pid,Ref,_}|T], Event, Key) ->
+notify_([{Pid,Ref,_}|T], Event, {_,l,_} = Key) ->
+    Pid ! {gproc, Event, Ref, Key},
+    notify_(T, Event, Key);
+notify_([{Pid,Ref,_}|T], Event, {_,g,_} = Key) when node(Pid) == node() ->
     Pid ! {gproc, Event, Ref, Key},
     notify_(T, Event, Key);
 notify_([_|T], Event, Key) ->

+ 3 - 1
src/gproc_monitor.erl

@@ -244,7 +244,9 @@ do_monitor(Name, undefined) ->
     case ets:member(?TAB, {w, Name}) of
 	false ->
 	    Ref = gproc:nb_wait(Name),
-	    ets:insert(?TAB, {{w, Name}, Ref})
+	    ets:insert(?TAB, {{w, Name}, Ref});
+        true ->
+            ok
     end;
 do_monitor(Name, Pid) when is_pid(Pid) ->
     case ets:member(?TAB, {m, Name}) of

+ 18 - 0
test/gproc_dist_tests.erl

@@ -88,6 +88,9 @@ dist_test_() ->
                                        ?debugVal(t_monitor(Ns))
                                end,
                                fun() ->
+                                       ?debugVal(t_standby_monitor(Ns))
+                               end,
+                               fun() ->
                                        ?debugVal(t_subscribe(Ns))
                                end
                            ]
@@ -312,6 +315,21 @@ t_monitor([A,B|_]) ->
     ?assertMatch(true, t_call(Pc, {apply, gproc, unreg, [Na]})),
     ?assertMatch({gproc,unreg,Ref1,Na}, got_msg(Pb, gproc)).
 
+t_standby_monitor([A,B|_] = Ns) ->
+    Na = ?T_NAME,
+    Pa = t_spawn_reg(A, Na),
+    Pb = t_spawn(B, _Selective = true),
+    Ref = t_call(Pb, {apply, gproc, monitor, [Na, standby]}),
+    ?assert(is_reference(Ref)),
+    ?assertMatch(ok, t_call(Pa, die)),
+    ?assertMatch({gproc,{failover,Pb},Ref,Na}, got_msg(Pb, gproc)),
+    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
+    Pc = t_spawn(A, true),
+    Ref1 = t_call(Pc, {apply, gproc, monitor, [Na, standby]}),
+    ?assertMatch(true, t_call(Pb, {apply, gproc, unreg, [Na]})),
+    ?assertMatch({gproc,unreg,Ref1,Na}, got_msg(Pc, gproc)),
+    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)).
+
 t_subscribe([A,B|_] = Ns) ->
     Na = ?T_NAME,
     Pb = t_spawn(B, _Selective = true),