Browse Source

add monitor(Key, follow), bug fixes for standby

Ulf Wiger 11 years ago
parent
commit
b8809f5541
9 changed files with 417 additions and 164 deletions
  1. 0 1
      doc/edoc-info
  2. 20 1
      doc/gproc.md
  3. 22 1
      doc/gproc_lib.md
  4. 60 14
      src/gproc.erl
  5. 64 3
      src/gproc_dist.erl
  6. 53 34
      src/gproc_lib.erl
  7. 33 100
      test/gproc_dist_tests.erl
  8. 126 0
      test/gproc_test_lib.erl
  9. 39 10
      test/gproc_tests.erl

+ 0 - 1
doc/edoc-info

@@ -1,4 +1,3 @@
-%% encoding: UTF-8
 {application,gproc}.
 {packages,[]}.
 {modules,[gproc,gproc_app,gproc_bcast,gproc_dist,gproc_info,gproc_init,

+ 20 - 1
doc/gproc.md

@@ -163,6 +163,18 @@ keypat() = {<a href="#type-sel_type">sel_type()</a> | <a href="#type-sel_var">se
 
 
 
+### <a name="type-monitor_type">monitor_type()</a> ###
+
+
+
+<pre><code>
+monitor_type() = info | standby | follow
+</code></pre>
+
+
+
+
+
 ### <a name="type-pidpat">pidpat()</a> ###
 
 
@@ -1119,7 +1131,7 @@ Equivalent to [`monitor(Key, info)`](#monitor-2).
 
 
 <pre><code>
-monitor(Key::<a href="#type-key">key()</a>, Type::info | standby) -&gt; reference()
+monitor(Key::<a href="#type-key">key()</a>, Type::<a href="#type-monitor_type">monitor_type()</a>) -&gt; reference()
 </code></pre>
 
 <br></br>
@@ -1143,9 +1155,16 @@ inherits the name, and a message `{gproc, {failover, ToPid}, Ref, Key}` is
 sent to all monitors, including the one that inherited the name.
 
 
+
 If the name is not yet registered, the unreg event is sent immediately.
 If the calling process in this case tried to start a `standby` monitoring,
 it receives the registered name and the failover event immediately.
+
+
+`monitor(Key, follow)` keeps monitoring the registered name even if it is
+temporarily unregistered. The messages received are the same as for the other
+monitor types, but `{gproc, registered, Ref, Key}` is also sent when a new
+process registers the name.
 <a name="mreg-3"></a>
 
 ### mreg/3 ###

+ 22 - 1
doc/gproc_lib.md

@@ -21,7 +21,7 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.
 ## Function Index ##
 
 
-<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#add_monitor-4">add_monitor/4</a></td><td></td></tr><tr><td valign="top"><a href="#await-3">await/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_counter_value-3">do_set_counter_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_value-3">do_set_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#ensure_monitor-2">ensure_monitor/2</a></td><td></td></tr><tr><td valign="top"><a href="#insert_attr-4">insert_attr/4</a></td><td></td></tr><tr><td valign="top"><a href="#insert_many-4">insert_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#insert_reg-4">insert_reg/4</a></td><td></td></tr><tr><td valign="top"><a href="#monitors-1">monitors/1</a></td><td></td></tr><tr><td valign="top"><a href="#notify-2">notify/2</a></td><td></td></tr><tr><td valign="top"><a href="#notify-3">notify/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_many-4">remove_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_monitor-3">remove_monitor/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_monitor_pid-2">remove_monitor_pid/2</a></td><td></td></tr><tr><td valign="top"><a href="#remove_monitors-3">remove_monitors/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-3">remove_reg/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-4">remove_reg/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reverse_mapping-3">remove_reverse_mapping/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reverse_mapping-4">remove_reverse_mapping/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_wait-4">remove_wait/4</a></td><td></td></tr><tr><td valign="top"><a href="#standbys-1">standbys/1</a></td><td></td></tr><tr><td valign="top"><a href="#update_aggr_counter-3">update_aggr_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-3">update_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#valid_opts-2">valid_opts/2</a></td><td></td></tr></table>
+<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#add_monitor-4">add_monitor/4</a></td><td></td></tr><tr><td valign="top"><a href="#await-3">await/3</a></td><td></td></tr><tr><td valign="top"><a href="#dbg-1">dbg/1</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_counter_value-3">do_set_counter_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_value-3">do_set_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#ensure_monitor-2">ensure_monitor/2</a></td><td></td></tr><tr><td valign="top"><a href="#followers-1">followers/1</a></td><td></td></tr><tr><td valign="top"><a href="#insert_attr-4">insert_attr/4</a></td><td></td></tr><tr><td valign="top"><a href="#insert_many-4">insert_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#insert_reg-4">insert_reg/4</a></td><td></td></tr><tr><td valign="top"><a href="#insert_reg-5">insert_reg/5</a></td><td></td></tr><tr><td valign="top"><a href="#monitors-1">monitors/1</a></td><td></td></tr><tr><td valign="top"><a href="#notify-2">notify/2</a></td><td></td></tr><tr><td valign="top"><a href="#notify-3">notify/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_many-4">remove_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_monitor-3">remove_monitor/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_monitor_pid-2">remove_monitor_pid/2</a></td><td></td></tr><tr><td valign="top"><a href="#remove_monitors-3">remove_monitors/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-3">remove_reg/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-4">remove_reg/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reverse_mapping-3">remove_reverse_mapping/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reverse_mapping-4">remove_reverse_mapping/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_wait-4">remove_wait/4</a></td><td></td></tr><tr><td valign="top"><a href="#standbys-1">standbys/1</a></td><td></td></tr><tr><td valign="top"><a href="#update_aggr_counter-3">update_aggr_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-3">update_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#valid_opts-2">valid_opts/2</a></td><td></td></tr></table>
 
 
 <a name="functions"></a>
@@ -42,6 +42,13 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.
 `await(Key, WPid, From) -> any()`
 
 
+<a name="dbg-1"></a>
+
+### dbg/1 ###
+
+`dbg(Mods) -> any()`
+
+
 <a name="do_set_counter_value-3"></a>
 
 ### do_set_counter_value/3 ###
@@ -63,6 +70,13 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.
 `ensure_monitor(Pid, Scope) -> any()`
 
 
+<a name="followers-1"></a>
+
+### followers/1 ###
+
+`followers(Opts) -> any()`
+
+
 <a name="insert_attr-4"></a>
 
 ### insert_attr/4 ###
@@ -96,6 +110,13 @@ insert_reg(K::<a href="#type-key">key()</a>, Value::any(), Pid::pid() | shared,
 
 
 
+<a name="insert_reg-5"></a>
+
+### insert_reg/5 ###
+
+`insert_reg(K, Value, Pid, Scope, Event) -> any()`
+
+
 <a name="monitors-1"></a>
 
 ### monitors/1 ###

+ 60 - 14
src/gproc.erl

@@ -46,6 +46,8 @@
 %% @type reg_id() = {type(), scope(), any()}.
 %% @type unique_id() = {n | a, scope(), any()}.
 %%
+%% @type monitor_type() = info | standby | follow.
+%%
 %% @type sel_scope() = scope | all | global | local.
 %% @type sel_type() = type() | names | props | counters | aggr_counters.
 %% @type context() = {scope(), type()} | type(). {'all','all'} is the default
@@ -879,7 +881,7 @@ cancel_wait_or_monitor1({_,l,_} = Key) ->
 monitor(Key) ->
     ?CATCH_GPROC_ERROR(monitor1(Key, info), [Key]).
 
-%% @spec monitor(key(), info | standby) -> reference()
+%% @spec monitor(key(), monitor_type()) -> reference()
 %%
 %% @doc monitor a registered name
 %% `monitor(Key, info)' works much like erlang:monitor(process, Pid), but monitors
@@ -898,8 +900,14 @@ monitor(Key) ->
 %% If the name is not yet registered, the unreg event is sent immediately.
 %% If the calling process in this case tried to start a `standby' monitoring,
 %% it receives the registered name and the failover event immediately.
+%%
+%% `monitor(Key, follow)' keeps monitoring the registered name even if it is
+%% temporarily unregistered. The messages received are the same as for the other
+%% monitor types, but `{gproc, registered, Ref, Key}' is also sent when a new
+%% process registers the name.
 %% @end
 monitor(Key, Type) when Type==info;
+                        Type==follow;
                         Type==standby ->
     ?CATCH_GPROC_ERROR(monitor1(Key, Type), [Key, Type]).
 
@@ -2072,23 +2080,40 @@ handle_call({reg_or_locate, {T,l,_} = Key, Val, P}, _, S) ->
 handle_call({monitor, {T,l,_} = Key, Pid, Type}, _From, S)
   when T==n; T==a ->
     Ref = make_ref(),
-    _ = case {where(Key), Type} of
-	    {undefined, info} ->
+    Lookup = ets:lookup(?TAB, {Key, T}),
+    IsRegged = is_regged(Lookup),
+    _ = case {IsRegged, Type} of
+	    {false, info} ->
 		Pid ! {gproc, unreg, Ref, Key};
-            {undefined, standby} ->
-                true = gproc_lib:insert_reg(Key, undefined, Pid, l),
-                Pid ! {gproc, {failover, Pid}, Ref, Key},
+            {false, follow} ->
+		Pid ! {gproc, unreg, Ref, Key},
+                _ = gproc_lib:ensure_monitor(Pid, l),
+                case Lookup of
+                    [{K, Waiters}] ->
+                        NewWaiters = [{Pid,Ref,follow}|Waiters],
+                        ets:insert(?TAB, {K, NewWaiters}),
+                        ets:insert_new(?TAB, {{Pid,Key}, []});
+                    [] ->
+                        ets:insert(?TAB, {{Key,T}, [{Pid,Ref,follow}]}),
+                        ets:insert_new(?TAB, {{Pid,Key}, []})
+                end;
+            {false, standby} ->
+                Evt = {failover, Pid},
+                true = gproc_lib:insert_reg(Key, undefined, Pid, l, Evt),
+                Pid ! {gproc, Evt, Ref, Key},
+                notify_waiters(Lookup, Evt, Key),
                 _ = gproc_lib:ensure_monitor(Pid, l);
-	    {RegPid, _} ->
+	    {true, _} ->
+                [{_, RegPid, _}] = Lookup,
                 _ = gproc_lib:ensure_monitor(Pid, l),
 		case ets:lookup(?TAB, {RegPid, Key}) of
 		    [{K,r}] ->
-			ets:insert(?TAB, [{K, [{monitor, [{Pid,Ref,Type}]}]},
-                                          {{Pid,Key}, []}]);
+			ets:insert(?TAB, {K, [{monitor, [{Pid,Ref,Type}]}]}),
+                        ets:insert_new({{Pid,Key}, []});
 		    [{K, Opts}] ->
-			ets:insert(?TAB, [{K, gproc_lib:add_monitor(
-                                                Opts, Pid, Ref, Type)},
-                                          {{Pid,Key}, []}])
+			ets:insert(?TAB, {K, gproc_lib:add_monitor(
+                                               Opts, Pid, Ref, Type)}),
+                        ets:insert_new(?TAB, {{Pid,Key}, []})
 		end
 	end,
     {reply, Ref, S};
@@ -2301,8 +2326,8 @@ process_is_down(Pid) when is_pid(Pid) ->
                               ets:delete(?TAB, Key),
 			      opt_notify(R, K, Pid, V);
                           [{_, Waiters}] ->
-                              case [W || {P,_} = W <- Waiters,
-                                         P =/= Pid] of
+                              case [W || W <- Waiters,
+                                         element(1,W) =/= Pid] of
                                   [] ->
                                       ets:delete(?TAB, Key);
                                   Waiters1 ->
@@ -2355,10 +2380,12 @@ opt_notify(r, _, _, _) ->
 opt_notify(Opts, {T,_,_} = Key, Pid, Value) ->
     case gproc_lib:standbys(Opts) of
         [] ->
+            keep_followers(Opts, Key),
             gproc_lib:notify(unreg, Key, Opts);
         SBs ->
             case pick_standby(SBs) of
                 false ->
+                    keep_followers(Opts, Key),
                     gproc_lib:notify(unreg, Key, Opts),
                     ok;
                 {ToPid, Ref} ->
@@ -2373,6 +2400,14 @@ opt_notify(Opts, {T,_,_} = Key, Pid, Value) ->
             end
     end.
 
+keep_followers(Opts, {T,_,_} = Key) ->
+    case gproc_lib:followers(Opts) of
+        [] ->
+            ok;
+        [_|_] = F ->
+            ets:insert(?TAB, {{Key,T}, F})
+    end.
+
 pick_standby([{Pid, Ref, standby}|T]) when node(Pid) =:= node() ->
     case is_process_alive(Pid) of
         true ->
@@ -2821,3 +2856,14 @@ qlc_select(false, {Objects, Cont}) ->
 is_unique(n) -> true;
 is_unique(a) -> true;
 is_unique(_) -> false.
+
+is_regged([{_, _, _}]) ->
+    true;
+is_regged(_) ->
+    false.
+
+notify_waiters([{_, Ws}], Evt, K) ->
+    [P ! {gproc, Evt, R, K} || {P,R,follow} <- Ws],
+    ok;
+notify_waiters(_, _, _) ->
+    ok.

+ 64 - 3
src/gproc_dist.erl

@@ -129,6 +129,7 @@ reg_shared(_, _) ->
     ?THROW_GPROC_ERROR(badarg).
 
 monitor({_,g,_} = Key, Type) when Type==info;
+                                  Type==follow;
                                   Type==standby ->
     leader_call({monitor, Key, self(), Type});
 monitor(_, _) ->
@@ -351,7 +352,7 @@ handle_leader_call({reg, {_C,g,_Name} = K, Value, Pid}, _From, S, _E) ->
             {reply, true, [{insert, Vals}], S}
     end;
 handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
-                                                                         T==a ->
+                                                                          T==a ->
     case ets:lookup(?TAB, {K, T}) of
         [{_, Pid, _}] ->
             Opts = get_opts(Pid, K),
@@ -362,15 +363,34 @@ handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
             Rev = {{MPid,K}, []},
             ets:insert(?TAB, [Obj, Rev]),
             {reply, Ref, [{insert, [Obj, Rev]}], S};
-        [] ->
+        LookupRes ->
             Ref = make_ref(),
             case Type of
                 standby ->
+                    Event = {failover, MPid},
+                    Msgs = insert_reg(LookupRes, K, undefined, MPid, Event),
                     Obj = {{K,T}, MPid, undefined},
                     Rev = {{MPid,K}, []},
                     ets:insert(?TAB, [Obj, Rev]),
                     MPid ! {gproc, {failover,MPid}, Ref, K},
-                    {reply, Ref, [{insert, [Obj, Rev]}], S};
+                    {reply, Ref, [{insert, [Obj, Rev]},
+                                  {notify, Msgs}], S};
+                follow ->
+                    case LookupRes of
+                        [{_, Waiters}] ->
+                            add_follow_to_waiters(Waiters, K, MPid, Ref, S);
+                        [] ->
+                            add_follow_to_waiters([], K, MPid, Ref, S);
+                        [{_, Pid, _}] ->
+                            case ets:lookup(?TAB, {Pid,K}) of
+                                [{_, Opts}] when is_list(Opts) ->
+                                    Opts1 = gproc_lib:add_monitor(
+                                              Opts, MPid, Ref, follow),
+                                    ets:insert(?TAB, {{Pid,K}, Opts1}),
+                                    {reply, Ref,
+                                     [{insert, [{{Pid,K}, Opts1}]}], S}
+                            end
+                    end;
                 _ ->
                     MPid ! {gproc, unreg, Ref, K},
                     {reply, Ref, S}
@@ -386,6 +406,10 @@ handle_leader_call({demonitor, {T,g,_} = K, MPid, Ref}, _From, S, _E) ->
             ets:delete(?TAB, {MPid, K}),
             {reply, ok, [{delete, [{MPid,K}]},
                          {insert, [Obj]}], S};
+        [{Key, Waiters}] ->
+            NewWaiters = [W || W <- Waiters,
+                               W =/= {MPid, Ref, follow}],
+            {reply, ok, [{insert, [{Key, NewWaiters}]}], S};
         _ ->
             {reply, ok, S}
     end;
@@ -771,6 +795,9 @@ delete_globals(Globals) ->
 	      ets:delete(?TAB, {Pid, Key})
       end, Globals).
 
+do_notify([{P, Msg}|T]) when is_pid(P) ->
+    P ! Msg,
+    do_notify(T);
 do_notify([{K, P, E}|T]) ->
     case ets:lookup(?TAB, {P,K}) of
         [{_, Opts}] when is_list(Opts) ->
@@ -929,3 +956,37 @@ pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
         _ ->
             undefined
     end.
+
+insert_reg([{_, Waiters}], K, Val, Pid, Event) ->
+    gproc_lib:insert_reg(K, Val, Pid, g, []),
+    tell_waiters(Waiters, K, Pid, Val, Event).
+
+tell_waiters([{P,R}|T], K, Pid, V, Event) ->
+    Msg = {gproc, R, registered, {K, Pid, V}},
+    if node(P) == node() ->
+            P ! Msg;
+       true ->
+            [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
+    end;
+tell_waiters([{P,R,follow}|T], K, Pid, V, Event) ->
+    Msg = {gproc, Event, R, K},
+    if node(P) == node() ->
+            P ! Msg;
+       true ->
+            [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
+    end;
+tell_waiters([], _, _, _, _) ->
+    [].
+
+add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) ->
+    Obj = {{K,T}, [{Pid, Ref, follow}|Waiters]},
+    Rev = {{Pid,K}, []},
+    ets:insert(?TAB, [Obj, Rev]),
+    Msg = {gproc, unreg, Ref, K},
+    if node(Pid) == node() ->
+            Pid ! Msg,
+            {reply, Ref, [{insert, [Obj, Rev]}], S};
+       true ->
+            {reply, Ref, [{insert, [Obj, Rev]},
+                          {notify, [{Pid, Msg}]}]}
+    end.

+ 53 - 34
src/gproc_lib.erl

@@ -26,12 +26,13 @@
          do_set_value/3,
          ensure_monitor/2,
          insert_many/4,
-         insert_reg/4,
+         insert_reg/4, insert_reg/5,
 	 insert_attr/4,
          remove_many/4,
          remove_reg/3, remove_reg/4,
          monitors/1,
          standbys/1,
+         followers/1,
          remove_monitor_pid/2,
 	 add_monitor/4,
 	 remove_monitor/3,
@@ -43,9 +44,17 @@
          update_counter/3,
 	 valid_opts/2]).
 
+-export([dbg/1]).
+
 -include("gproc_int.hrl").
 -include("gproc.hrl").
 
+dbg(Mods) ->
+    dbg:tracer(),
+    [dbg:tpl(M,x) || M <- Mods],
+    dbg:tp(ets,'_',[{[gproc,'_'], [], [{message,{exception_trace}}]}]),
+    dbg:p(all,[c]).
+
 %% We want to store names and aggregated counters with the same
 %% structure as properties, but at the same time, we must ensure
 %% that the key is unique. We replace the Pid in the key part
@@ -54,8 +63,10 @@
 %% symmetric.
 %%
 -spec insert_reg(key(), any(), pid() | shared, scope()) -> boolean().
+insert_reg(K, Value, Pid, Scope) ->
+    insert_reg(K, Value, Pid, Scope, registered).
 
-insert_reg({T,_,Name} = K, Value, Pid, Scope) when T==a; T==n ->
+insert_reg({T,_,Name} = K, Value, Pid, Scope, Event) when T==a; T==n ->
     MaybeScan = fun() ->
                         if T==a ->
                                 Initial = scan_existing_counters(Scope, Name),
@@ -69,18 +80,18 @@ insert_reg({T,_,Name} = K, Value, Pid, Scope) when T==a; T==n ->
             _ = ets:insert_new(?TAB, {{Pid,K}, []}),
             MaybeScan();
         false ->
-            if T==n ->
-                    maybe_waiters(K, Pid, Value, T);
+            if T==n; T==a ->
+                    maybe_waiters(K, Pid, Value, T, Event);
                true ->
                     false
             end
     end;
-insert_reg({p,Scope,_} = K, Value, shared, Scope)
+insert_reg({p,Scope,_} = K, Value, shared, Scope, _E)
   when Scope == g; Scope == l ->
     %% shared properties are unique
     Info = [{{K, shared}, shared, Value}, {{shared,K}, []}],
     ets:insert_new(?TAB, Info);
-insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope) when Scope==l; Scope==g ->
+insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope, _E) when Scope==l; Scope==g ->
     %% Non-unique keys; store Pid in the key part
     K = {Key, Pid},
     Kr = {Pid, Key},
@@ -92,7 +103,7 @@ insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope) when Scope==l; Scope==g ->
             ignore
     end,
     Res;
-insert_reg({_,_,_} = Key, Value, Pid, _Scope) when is_pid(Pid) ->
+insert_reg({_,_,_} = Key, Value, Pid, _Scope, _E) when is_pid(Pid) ->
     %% Non-unique keys; store Pid in the key part
     K = {Key, Pid},
     Kr = {Pid, Key},
@@ -153,7 +164,7 @@ insert_objects(Objs) ->
               case Existing of
                   [] -> ok;
                   [{_, Waiters}] ->
-                      notify_waiters(Waiters, Id, Pid, V)
+                      notify_waiters(Waiters, Id, Pid, V, registered)
               end
       end, Objs).
 
@@ -185,30 +196,30 @@ await({T,C,_} = Key, WPid, {_Pid, Ref} = From) ->
             {reply, Ref, [W,Rev]}
     end.
 
-
-
-maybe_waiters(K, Pid, Value, T) ->
+maybe_waiters(_, _, _, _, []) ->
+    false;
+maybe_waiters(K, Pid, Value, T, Event) ->
     case ets:lookup(?TAB, {K,T}) of
         [{_, Waiters}] when is_list(Waiters) ->
-            ets:insert(?TAB, [{{K,T}, Pid, Value}, {{Pid,K}, []}]),
-            notify_waiters(Waiters, K, Pid, Value),
+            Followers = [F || {_,_,follow} = F <- Waiters],
+            ets:insert(?TAB, [{{K,T}, Pid, Value},
+                              {{Pid,K}, [{monitor, Followers}
+                                         || Followers =/= []]}]),
+            notify_waiters(Waiters, K, Pid, Value, Event),
             true;
         _ ->
             false
     end.
 
-
--spec notify_waiters([{pid(), reference()}], key(), pid(), any()) -> ok.
-
-notify_waiters(Waiters, K, Pid, V) ->
-    _ = [begin
-             P ! {gproc, Ref, registered, {K, Pid, V}},
-             case P of
-		 Pid -> ignore;
-		 _ ->
-		     ets:delete(?TAB, {P, K})
-	     end
-         end || {P, Ref} <- Waiters],
+-spec notify_waiters([{pid(), reference()}], key(), pid(), any(), any()) -> ok.
+notify_waiters([{P, Ref}|T], K, Pid, V, E) ->
+    P ! {gproc, Ref, registered, {K, Pid, V}},
+    notify_waiters(T, K, Pid, V, E);
+notify_waiters([{P, Ref, follow}|T], K, Pid, V, E) ->
+    %% This is really a monitor, lurking in the Waiters list
+    P ! {gproc, E, Ref, K},
+    notify_waiters(T, K, Pid, V, E);
+notify_waiters([], _, _, _, _) ->
     ok.
 
 remove_wait({T,_,_} = Key, Pid, Ref, Waiters) ->
@@ -233,10 +244,15 @@ remove_wait({T,_,_} = Key, Pid, Ref, Waiters) ->
     end.
 
 remove_from_waiters(Waiters, Pid, all) ->
-    [{P,R} || {P,R} <- Waiters,
-	      P =/= Pid];
+    [W || W <- Waiters,
+	      element(1,W) =/= Pid];
 remove_from_waiters(Waiters, Pid, Ref) ->
-    Waiters -- [{Pid, Ref}].
+    [W || W <- Waiters, not is_waiter(W, Pid, Ref)].
+
+is_waiter({Pid, Ref}   , Pid, Ref) -> true;
+is_waiter({Pid, Ref, _}, Pid, Ref) -> true;
+is_waiter(_, _, _) ->
+    false.
 
 remove_monitors(Key, Pid, MPid) ->
     case ets:lookup(?TAB, {Pid, Key}) of
@@ -325,13 +341,16 @@ monitors(Opts) ->
     end.
 
 standbys(Opts) ->
-    standbys(monitors(Opts), []).
+    select_monitors(monitors(Opts), standby, []).
+
+followers(Opts) ->
+    select_monitors(monitors(Opts), follow, []).
 
-standbys([{_,_,standby}=H|T], Acc) ->
-    standbys(T, [H|Acc]);
-standbys([_|T], Acc) ->
-    standbys(T, Acc);
-standbys([], Acc) ->
+select_monitors([{_,_,Type}=H|T], Type, Acc) ->
+    select_monitors(T, Type, [H|Acc]);
+select_monitors([_|T], Type, Acc) ->
+    select_monitors(T, Type, Acc);
+select_monitors([], _, Acc) ->
     Acc.
 
 remove_monitor_pid([{monitor, Mons}|T], Pid) ->

+ 33 - 100
test/gproc_dist_tests.erl

@@ -91,6 +91,9 @@ dist_test_() ->
                                        ?debugVal(t_standby_monitor(Ns))
                                end,
                                fun() ->
+                                       ?debugVal(t_follow_monitor(Ns))
+                               end,
+                               fun() ->
                                        ?debugVal(t_subscribe(Ns))
                                end
                            ]
@@ -330,6 +333,16 @@ t_standby_monitor([A,B|_] = Ns) ->
     ?assertMatch({gproc,unreg,Ref1,Na}, got_msg(Pc, gproc)),
     ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)).
 
+t_follow_monitor([A,B|_] = Ns) ->
+    Na = ?T_NAME,
+    Pa = t_spawn(A, _Selective = true),
+    Ref = t_call(Pa, {apply, gproc, monitor, [Na, follow]}),
+    {gproc,unreg,Ref,Na} = got_msg(Pa),
+    Pb = t_spawn_reg(A, Na),
+    {gproc,registered,Ref,Na} = got_msg(Pa),
+    ok = t_call(Pb, die),
+    ok = t_call(Pa, die).
+
 t_subscribe([A,B|_] = Ns) ->
     Na = ?T_NAME,
     Pb = t_spawn(B, _Selective = true),
@@ -345,17 +358,17 @@ t_subscribe([A,B|_] = Ns) ->
     ?assertEqual({gproc_monitor,Na,undefined}, got_msg(Pb, gproc_monitor)),
     ok.
 
-got_msg(Pb, Tag) ->
-    t_call(Pb,
-	   {apply_fun,
-	    fun() ->
-		    receive
-			M when element(1, M) == Tag ->
-			    M
-		    after 1000 ->
-			    erlang:error({timeout, got_msg, [Pb, Tag]})
-		    end
-	    end}).
+%% got_msg(Pb, Tag) ->
+%%     t_call(Pb,
+%% 	   {apply_fun,
+%% 	    fun() ->
+%% 		    receive
+%% 			M when element(1, M) == Tag ->
+%% 			    M
+%% 		    after 1000 ->
+%% 			    erlang:error({timeout, got_msg, [Pb, Tag]})
+%% 		    end
+%% 	    end}).
 
 %% Verify that the gproc_dist:sync() call returns true even if a candidate dies
 %% while the sync is underway. This test makes use of sys:suspend() to ensure that
@@ -436,97 +449,17 @@ t_read_everywhere(Key, Pid, Nodes, Exp, I) ->
 read_result({badrpc, {'EXIT', {badarg, _}}}) -> badarg;
 read_result(R) -> R.
 
-t_spawn(Node) ->
-    t_spawn(Node, false).
-
-t_spawn(Node, Selective) when is_boolean(Selective) ->
-    Me = self(),
-    P = spawn(Node, fun() ->
-			    Me ! {self(), ok},
-			    t_loop(Selective)
-		    end),
-    receive
-	{P, ok} -> P
-    after 1000 ->
-            erlang:error({timeout, t_spawn, [Node, Selective]})
-    end.
-
-t_spawn_reg(Node, Name) ->
-    t_spawn_reg(Node, Name, default_value(Name)).
-
-t_spawn_reg(Node, Name, Value) ->
-    Me = self(),
-    P = spawn(Node, fun() ->
-                            ?assertMatch(true, gproc:reg(Name, Value)),
-                            Me ! {self(), ok},
-                            t_loop()
-                    end),
-    receive
-	{P, ok} -> P
-    after 1000 ->
-            erlang:error({timeout, t_spawn_reg, [Node, Name, Value]})
-    end.
-
-t_spawn_reg_shared(Node, Name, Value) ->
-    Me = self(),
-    P = spawn(Node, fun() ->
-                            ?assertMatch(true, gproc:reg_shared(Name, Value)),
-                            Me ! {self(), ok},
-                            t_loop()
-                    end),
-    receive
-	{P, ok} -> P
-    after 1000 ->
-              erlang:error({timeout, t_spawn_reg_shared, [Node,Name,Value]})
-    end.
-
-default_value({c,_,_}) -> 0;
-default_value(_) -> undefined.
-
-t_spawn_mreg(Node, KVL) ->
-    Me = self(),
-    P = spawn(Node, fun() ->
-                            ?assertMatch(true, gproc:mreg(n, g, KVL)),
-                            Me ! {self(), ok},
-                            t_loop()
-                    end),
-    receive
-	{P, ok} -> P
-    end.
+t_spawn(Node) -> gproc_test_lib:t_spawn(Node).
+t_spawn(Node, Selective) -> gproc_test_lib:t_spawn(Node, Selective).
+t_spawn_mreg(Node, KVL) -> gproc_test_lib:t_spawn_mreg(Node, KVL).
+t_spawn_reg(Node, N) -> gproc_test_lib:t_spawn_reg(Node, N).
+t_spawn_reg(Node, N, V) -> gproc_test_lib:t_spawn_reg(Node, N, V).
+t_spawn_reg_shared(Node, N, V) -> gproc_test_lib:t_spawn_reg_shared(Node, N, V).
+got_msg(P) -> gproc_test_lib:got_msg(P).
+got_msg(P, Tag) -> gproc_test_lib:got_msg(P, Tag).
 
 t_call(P, Req) ->
-    Ref = erlang:monitor(process, P),
-    P ! {self(), Ref, Req},
-    receive
-	{P, Ref, Res} ->
-	    erlang:demonitor(Ref, [flush]),
-	    Res;
-	{'DOWN', Ref, _, _, Error} ->
-	    erlang:error({'DOWN', P, Error})
-    after 1000 ->
-            erlang:error({timeout,t_call,[P,Req]})
-    end.
-
-t_loop() ->
-    t_loop(false).
-
-t_loop(Selective) when is_boolean(Selective) ->
-    receive
-	{From, Ref, die} ->
-	    From ! {self(), Ref, ok};
-	{From, Ref, {selective, Bool}} when is_boolean(Bool) ->
-	    From ! {self(), Ref, ok},
-	    t_loop(Bool);
-	{From, Ref, {apply, M, F, A}} ->
-	    From ! {self(), Ref, apply(M, F, A)},
-	    t_loop(Selective);
-	{From, Ref, {apply_fun, F}} ->
-	    From ! {self(), Ref, F()},
-	    t_loop(Selective);
-	Other when not Selective ->
-	    ?debugFmt("got unknown msg: ~p~n", [Other]),
-	    exit({unknown_msg, Other})
-    end.
+    gproc_test_lib:t_call(P, Req).
 
 start_slaves(Ns) ->
     [H|T] = Nodes = [start_slave(N) || N <- Ns],

+ 126 - 0
test/gproc_test_lib.erl

@@ -0,0 +1,126 @@
+-module(gproc_test_lib).
+
+-export([t_spawn/1, t_spawn/2,
+         t_spawn_reg/2, t_spawn_reg/3,
+         t_spawn_reg_shared/3,
+         t_spawn_mreg/2,
+         t_call/2,
+         t_loop/0, t_loop/1,
+         got_msg/1, got_msg/2]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+t_spawn(Node) ->
+    t_spawn(Node, false).
+
+t_spawn(Node, Selective) when is_boolean(Selective) ->
+    Me = self(),
+    P = spawn(Node, fun() ->
+			    Me ! {self(), ok},
+			    t_loop(Selective)
+		    end),
+    receive
+	{P, ok} -> P
+    after 1000 ->
+            erlang:error({timeout, t_spawn, [Node, Selective]})
+    end.
+
+t_spawn_reg(Node, Name) ->
+    t_spawn_reg(Node, Name, default_value(Name)).
+
+t_spawn_reg(Node, Name, Value) ->
+    Me = self(),
+    P = spawn(Node, fun() ->
+                            ?assertMatch(true, gproc:reg(Name, Value)),
+                            Me ! {self(), ok},
+                            t_loop()
+                    end),
+    receive
+	{P, ok} -> P
+    after 1000 ->
+            erlang:error({timeout, t_spawn_reg, [Node, Name, Value]})
+    end.
+
+t_spawn_mreg(Node, KVL) ->
+    Me = self(),
+    P = spawn(Node, fun() ->
+                            ?assertMatch(true, gproc:mreg(n, g, KVL)),
+                            Me ! {self(), ok},
+                            t_loop()
+                    end),
+    receive
+	{P, ok} -> P
+    end.
+
+
+t_spawn_reg_shared(Node, Name, Value) ->
+    Me = self(),
+    P = spawn(Node, fun() ->
+                            ?assertMatch(true, gproc:reg_shared(Name, Value)),
+                            Me ! {self(), ok},
+                            t_loop()
+                    end),
+    receive
+	{P, ok} -> P
+    after 1000 ->
+              erlang:error({timeout, t_spawn_reg_shared, [Node,Name,Value]})
+    end.
+
+default_value({c,_,_}) -> 0;
+default_value(_) -> undefined.
+
+t_call(P, Req) ->
+    Ref = erlang:monitor(process, P),
+    P ! {self(), Ref, Req},
+    receive
+	{P, Ref, Res} ->
+	    erlang:demonitor(Ref, [flush]),
+	    Res;
+	{'DOWN', Ref, _, _, Error} ->
+	    erlang:error({'DOWN', P, Error})
+    after 1000 ->
+            erlang:error({timeout,t_call,[P,Req]})
+    end.
+
+t_loop() ->
+    t_loop(false).
+
+t_loop(Selective) when is_boolean(Selective) ->
+    receive
+	{From, Ref, die} ->
+	    From ! {self(), Ref, ok};
+	{From, Ref, {selective, Bool}} when is_boolean(Bool) ->
+	    From ! {self(), Ref, ok},
+	    t_loop(Bool);
+	{From, Ref, {apply, M, F, A}} ->
+	    From ! {self(), Ref, apply(M, F, A)},
+	    t_loop(Selective);
+	{From, Ref, {apply_fun, F}} ->
+	    From ! {self(), Ref, F()},
+	    t_loop(Selective);
+	Other when not Selective ->
+	    ?debugFmt("got unknown msg: ~p~n", [Other]),
+	    exit({unknown_msg, Other})
+    end.
+
+got_msg(Pb) ->
+    t_call(Pb,
+           {apply_fun,
+            fun() ->
+                    receive M -> M
+                    after 1000 ->
+                            erlang:error({timeout, got_msg, [Pb]})
+                    end
+            end}).
+
+got_msg(Pb, Tag) ->
+    t_call(Pb,
+	   {apply_fun,
+	    fun() ->
+		    receive
+			M when element(1, M) == Tag ->
+			    M
+		    after 1000 ->
+			    erlang:error({timeout, got_msg, [Pb, Tag]})
+		    end
+	    end}).

+ 39 - 10
test/gproc_tests.erl

@@ -22,6 +22,8 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 
+-define(T_NAME, {n, l, {?MODULE, ?LINE, erlang:now()}}).
+
 conf_test_() ->
     {foreach,
      fun() ->
@@ -132,6 +134,8 @@ reg_test_() ->
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_monitor_standby()))}
       , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_monitor_follow()))}
+      , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_subscribe()))}
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_gproc_info()))}
@@ -383,7 +387,7 @@ t_give_away_to_pid() ->
     ?assertEqual(Me, gproc:where(From)),
     ?assertEqual(P, gproc:give_away(From, P)),
     ?assertEqual(P, gproc:where(From)),
-    ?assertEqual(ok, t_call(P, die)).
+    ?assertEqual(ok, t_lcall(P, die)).
 
 t_give_away_to_self() ->
     From = {n, l, foo},
@@ -418,9 +422,9 @@ t_give_away_and_back() ->
     ?assertEqual(Me, gproc:where(From)),
     ?assertEqual(P, gproc:give_away(From, P)),
     ?assertEqual(P, gproc:where(From)),
-    ?assertEqual(ok, t_call(P, {give_away, From})),
+    ?assertEqual(ok, t_lcall(P, {give_away, From})),
     ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(ok, t_call(P, die)).
+    ?assertEqual(ok, t_lcall(P, die)).
 
 t_select() ->
     ?assertEqual(true, gproc:reg({n, l, {n,1}}, x)),
@@ -659,7 +663,7 @@ t_get_env_inherit() ->
     ?assertEqual(bar, gproc:get_env(l, gproc, foo, [{inherit, P}])),
     ?assertEqual(bar, gproc:get_env(l, gproc, foo,
 				    [{inherit, {n,l,get_env_p}}])),
-    ?assertEqual(ok, t_call(P, die)).
+    ?assertEqual(ok, t_lcall(P, die)).
 
 %% What we test here is that we return the same current_function as the
 %% process_info() BIF. As we parse the backtrace dump, we check with some
@@ -706,7 +710,7 @@ t_monitor() ->
 	    ok
     end,
     Ref = gproc:monitor({n,l,a}),
-    ?assertEqual(ok, t_call(P, die)),
+    ?assertEqual(ok, t_lcall(P, die)),
     receive
 	M ->
 	    ?assertEqual({gproc,unreg,Ref,{n,l,a}}, M)
@@ -723,12 +727,12 @@ t_monitor_give_away() ->
 	    ok
     end,
     Ref = gproc:monitor({n,l,a}),
-    ?assertEqual(ok, t_call(P, {give_away, {n,l,a}})),
+    ?assertEqual(ok, t_lcall(P, {give_away, {n,l,a}})),
     receive
 	M ->
 	    ?assertEqual({gproc,{migrated,Me},Ref,{n,l,a}}, M)
     end,
-    ?assertEqual(ok, t_call(P, die)).
+    ?assertEqual(ok, t_lcall(P, die)).
 
 t_monitor_standby() ->
     Me = self(),
@@ -749,6 +753,23 @@ t_monitor_standby() ->
     gproc:unreg({n,l,a}),
     ok.
 
+t_monitor_follow() ->
+    Name = ?T_NAME,
+    P1 = t_spawn(_Selective = true),
+    Ref = t_call(P1, {apply, gproc, monitor, [Name, follow]}),
+    {gproc,unreg,Ref,Name} = got_msg(P1),
+    %% gproc_lib:dbg([gproc,gproc_lib]),
+    P2 = t_spawn_reg(Name),
+    {gproc,registered,Ref,Name} = got_msg(P1),
+    exit(P2, kill),
+    {gproc,unreg,Ref,Name} = got_msg(P1),
+    P3 = t_spawn(true),
+    Ref3 = t_call(P3, {apply, gproc, monitor, [Name, standby]}),
+    {gproc,{failover,P3},Ref,Name} = got_msg(P1),
+    {gproc,{failover,P3},Ref3,Name} = got_msg(P3),
+    [exit(P,kill) || P <- [P1,P3]],
+    ok.
+
 t_subscribe() ->
     Key = {n,l,a},
     ?assertEqual(ok, gproc_monitor:subscribe(Key)),
@@ -758,11 +779,11 @@ t_subscribe() ->
 			   t_loop()
 		   end),
     ?assertEqual({gproc_monitor, Key, P}, get_msg()),
-    ?assertEqual(ok, t_call(P, {give_away, Key})),
+    ?assertEqual(ok, t_lcall(P, {give_away, Key})),
     ?assertEqual({gproc_monitor, Key, {migrated,self()}}, get_msg()),
     gproc:give_away(Key, P),
     ?assertEqual({gproc_monitor, Key, {migrated,P}}, get_msg()),
-    ?assertEqual(ok, t_call(P, die)),
+    ?assertEqual(ok, t_lcall(P, die)),
     ?assertEqual({gproc_monitor, Key, undefined}, get_msg()),
     ?assertEqual(ok, gproc_monitor:unsubscribe(Key)).
 
@@ -773,6 +794,14 @@ get_msg() ->
 	    timeout
     end.
 
+%% t_spawn()      -> gproc_test_lib:t_spawn(node()).
+t_spawn(Sel)   -> gproc_test_lib:t_spawn(node(), Sel).
+t_spawn_reg(N) -> gproc_test_lib:t_spawn_reg(node(), N).
+t_call(P, Req) -> gproc_test_lib:t_call(P, Req).
+%% got_msg(P, M)  -> gproc_test_lib:got_msg(P, M).
+got_msg(P)     -> gproc_test_lib:got_msg(P).
+
+
 t_loop() ->
     receive
 	{From, {give_away, Key}} ->
@@ -783,7 +812,7 @@ t_loop() ->
 	    From ! {self(), ok}
     end.
 
-t_call(P, Msg) ->
+t_lcall(P, Msg) ->
     P ! {self(), Msg},
     receive
 	{P, Reply} ->