Browse Source

subscription, monitor local,dist + test cases

Ulf Wiger 13 years ago
parent
commit
a4d7f6fbce
8 changed files with 519 additions and 121 deletions
  1. 19 2
      doc/gproc.md
  2. 59 5
      doc/gproc_lib.md
  3. 50 49
      src/gproc.erl
  4. 97 25
      src/gproc_dist.erl
  5. 94 9
      src/gproc_lib.erl
  6. 62 7
      src/gproc_monitor.erl
  7. 46 1
      test/gproc_dist_tests.erl
  8. 92 23
      test/gproc_tests.erl

+ 19 - 2
doc/gproc.md

@@ -161,7 +161,7 @@ a = aggregate_counter
 ##Function Index##
 
 
-<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#add_global_aggr_counter-1">add_global_aggr_counter/1</a></td><td>Registers a global (unique) aggregated counter.</td></tr><tr><td valign="top"><a href="#add_global_counter-2">add_global_counter/2</a></td><td>Registers a global (non-unique) counter.</td></tr><tr><td valign="top"><a href="#add_global_name-1">add_global_name/1</a></td><td>Registers a global (unique) name.</td></tr><tr><td valign="top"><a href="#add_global_property-2">add_global_property/2</a></td><td>Registers a global (non-unique) property.</td></tr><tr><td valign="top"><a href="#add_local_aggr_counter-1">add_local_aggr_counter/1</a></td><td>Registers a local (unique) aggregated counter.</td></tr><tr><td valign="top"><a href="#add_local_counter-2">add_local_counter/2</a></td><td>Registers a local (non-unique) counter.</td></tr><tr><td valign="top"><a href="#add_local_name-1">add_local_name/1</a></td><td>Registers a local (unique) name.</td></tr><tr><td valign="top"><a href="#add_local_property-2">add_local_property/2</a></td><td>Registers a local (non-unique) property.</td></tr><tr><td valign="top"><a href="#add_shared_local_counter-2">add_shared_local_counter/2</a></td><td>Registers a local shared (unique) counter.</td></tr><tr><td valign="top"><a href="#audit_process-1">audit_process/1</a></td><td></td></tr><tr><td valign="top"><a href="#await-1">await/1</a></td><td>Equivalent to <a href="#await-2"><tt>await(Key, infinity)</tt></a>.</td></tr><tr><td valign="top"><a href="#await-2">await/2</a></td><td>Wait for a local name to be registered.</td></tr><tr><td valign="top"><a href="#cancel_wait-2">cancel_wait/2</a></td><td></td></tr><tr><td valign="top"><a href="#default-1">default/1</a></td><td></td></tr><tr><td valign="top"><a href="#demonitor-2">demonitor/2</a></td><td>Remove a monitor on a registered name
+<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#add_global_aggr_counter-1">add_global_aggr_counter/1</a></td><td>Registers a global (unique) aggregated counter.</td></tr><tr><td valign="top"><a href="#add_global_counter-2">add_global_counter/2</a></td><td>Registers a global (non-unique) counter.</td></tr><tr><td valign="top"><a href="#add_global_name-1">add_global_name/1</a></td><td>Registers a global (unique) name.</td></tr><tr><td valign="top"><a href="#add_global_property-2">add_global_property/2</a></td><td>Registers a global (non-unique) property.</td></tr><tr><td valign="top"><a href="#add_local_aggr_counter-1">add_local_aggr_counter/1</a></td><td>Registers a local (unique) aggregated counter.</td></tr><tr><td valign="top"><a href="#add_local_counter-2">add_local_counter/2</a></td><td>Registers a local (non-unique) counter.</td></tr><tr><td valign="top"><a href="#add_local_name-1">add_local_name/1</a></td><td>Registers a local (unique) name.</td></tr><tr><td valign="top"><a href="#add_local_property-2">add_local_property/2</a></td><td>Registers a local (non-unique) property.</td></tr><tr><td valign="top"><a href="#add_shared_local_counter-2">add_shared_local_counter/2</a></td><td>Registers a local shared (unique) counter.</td></tr><tr><td valign="top"><a href="#audit_process-1">audit_process/1</a></td><td></td></tr><tr><td valign="top"><a href="#await-1">await/1</a></td><td>Equivalent to <a href="#await-2"><tt>await(Key, infinity)</tt></a>.</td></tr><tr><td valign="top"><a href="#await-2">await/2</a></td><td>Wait for a local name to be registered.</td></tr><tr><td valign="top"><a href="#cancel_wait-2">cancel_wait/2</a></td><td>Cancels a previous call to nb_wait/1.</td></tr><tr><td valign="top"><a href="#cancel_wait_or_monitor-1">cancel_wait_or_monitor/1</a></td><td></td></tr><tr><td valign="top"><a href="#default-1">default/1</a></td><td></td></tr><tr><td valign="top"><a href="#demonitor-2">demonitor/2</a></td><td>Remove a monitor on a registered name
 This function is the reverse of monitor/1.</td></tr><tr><td valign="top"><a href="#first-1">first/1</a></td><td>Behaves as ets:first(Tab) for a given type of registration object.</td></tr><tr><td valign="top"><a href="#get_env-3">get_env/3</a></td><td>Equivalent to <a href="#get_env-4"><tt>get_env(Scope, App, Key, [app_env])</tt></a>.</td></tr><tr><td valign="top"><a href="#get_env-4">get_env/4</a></td><td>Read an environment value, potentially cached as a <code>gproc_env</code> property.</td></tr><tr><td valign="top"><a href="#get_set_env-3">get_set_env/3</a></td><td>Equivalent to <a href="#get_set_env-4"><tt>get_set_env(Scope, App, Key, [app_env])</tt></a>.</td></tr><tr><td valign="top"><a href="#get_set_env-4">get_set_env/4</a></td><td>Fetch and cache an environment value, if not already cached.</td></tr><tr><td valign="top"><a href="#get_value-1">get_value/1</a></td><td>Reads the value stored with a key registered to the current process.</td></tr><tr><td valign="top"><a href="#get_value-2">get_value/2</a></td><td>Reads the value stored with a key registered to the process Pid.</td></tr><tr><td valign="top"><a href="#give_away-2">give_away/2</a></td><td>Atomically transfers the key <code>From</code> to the process identified by <code>To</code>.</td></tr><tr><td valign="top"><a href="#goodbye-0">goodbye/0</a></td><td>Unregister all items of the calling process and inform gproc  
 to forget about the calling process.</td></tr><tr><td valign="top"><a href="#i-0">i/0</a></td><td>Similar to the built-in shell command <code>i()</code> but inserts information
 about names and properties registered in Gproc, where applicable.</td></tr><tr><td valign="top"><a href="#info-1">info/1</a></td><td>Similar to <code>process_info(Pid)</code> but with additional gproc info.</td></tr><tr><td valign="top"><a href="#info-2">info/2</a></td><td>Similar to process_info(Pid, Item), but with additional gproc info.</td></tr><tr><td valign="top"><a href="#last-1">last/1</a></td><td>Behaves as ets:last(Tab) for a given type of registration object.</td></tr><tr><td valign="top"><a href="#lookup_global_aggr_counter-1">lookup_global_aggr_counter/1</a></td><td>Lookup a global (unique) aggregated counter and returns its value.</td></tr><tr><td valign="top"><a href="#lookup_global_counters-1">lookup_global_counters/1</a></td><td>Look up all global (non-unique) instances of a given Counter.</td></tr><tr><td valign="top"><a href="#lookup_global_name-1">lookup_global_name/1</a></td><td>Lookup a global unique name.</td></tr><tr><td valign="top"><a href="#lookup_global_properties-1">lookup_global_properties/1</a></td><td>Look up all global (non-unique) instances of a given Property.</td></tr><tr><td valign="top"><a href="#lookup_local_aggr_counter-1">lookup_local_aggr_counter/1</a></td><td>Lookup a local (unique) aggregated counter and returns its value.</td></tr><tr><td valign="top"><a href="#lookup_local_counters-1">lookup_local_counters/1</a></td><td>Look up all local (non-unique) instances of a given Counter.</td></tr><tr><td valign="top"><a href="#lookup_local_name-1">lookup_local_name/1</a></td><td>Lookup a local unique name.</td></tr><tr><td valign="top"><a href="#lookup_local_properties-1">lookup_local_properties/1</a></td><td>Look up all local (non-unique) instances of a given Property.</td></tr><tr><td valign="top"><a href="#lookup_pid-1">lookup_pid/1</a></td><td>Lookup the Pid stored with a key.</td></tr><tr><td valign="top"><a href="#lookup_pids-1">lookup_pids/1</a></td><td>Returns a list of pids with the published key Key.</td></tr><tr><td valign="top"><a href="#lookup_value-1">lookup_value/1</a></td><td>Lookup the value stored with a key.</td></tr><tr><td valign="top"><a href="#lookup_values-1">lookup_values/1</a></td><td>Retrieve the <code>{Pid,Value}</code> pairs corresponding to Key.</td></tr><tr><td valign="top"><a href="#monitor-1">monitor/1</a></td><td>monitor a registered name
@@ -328,7 +328,24 @@ registered (the difference: await/2 also returns the value).<a name="cancel_wait
 
 
 
-`cancel_wait(Key, Ref) -> any()`
+<pre>cancel_wait(Key::[key()](#type-key), Ref) -&gt; ok</pre>
+<ul class="definitions"><li><pre>Ref = all | reference()</pre></li></ul>
+
+
+
+
+
+Cancels a previous call to nb_wait/1
+
+If `Ref = all`, all wait requests on `Key` from the calling process
+are canceled.<a name="cancel_wait_or_monitor-1"></a>
+
+###cancel_wait_or_monitor/1##
+
+
+
+
+`cancel_wait_or_monitor(Key) -> any()`
 
 <a name="default-1"></a>
 

+ 59 - 5
doc/gproc_lib.md

@@ -23,13 +23,22 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.<a name="index"></a
 ##Function Index##
 
 
-<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#await-3">await/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_counter_value-3">do_set_counter_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_value-3">do_set_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#ensure_monitor-2">ensure_monitor/2</a></td><td></td></tr><tr><td valign="top"><a href="#insert_many-4">insert_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#insert_reg-4">insert_reg/4</a></td><td></td></tr><tr><td valign="top"><a href="#notify-2">notify/2</a></td><td></td></tr><tr><td valign="top"><a href="#remove_many-4">remove_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-2">remove_reg/2</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-3">remove_reg/3</a></td><td></td></tr><tr><td valign="top"><a href="#update_aggr_counter-3">update_aggr_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-3">update_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#valid_opts-2">valid_opts/2</a></td><td></td></tr></table>
+<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#add_monitor-3">add_monitor/3</a></td><td></td></tr><tr><td valign="top"><a href="#await-3">await/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_counter_value-3">do_set_counter_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#do_set_value-3">do_set_value/3</a></td><td></td></tr><tr><td valign="top"><a href="#ensure_monitor-2">ensure_monitor/2</a></td><td></td></tr><tr><td valign="top"><a href="#insert_many-4">insert_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#insert_reg-4">insert_reg/4</a></td><td></td></tr><tr><td valign="top"><a href="#notify-2">notify/2</a></td><td></td></tr><tr><td valign="top"><a href="#notify-3">notify/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_many-4">remove_many/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_monitor-3">remove_monitor/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_monitors-3">remove_monitors/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-3">remove_reg/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reg-4">remove_reg/4</a></td><td></td></tr><tr><td valign="top"><a href="#remove_reverse_mapping-3">remove_reverse_mapping/3</a></td><td></td></tr><tr><td valign="top"><a href="#remove_wait-4">remove_wait/4</a></td><td></td></tr><tr><td valign="top"><a href="#update_aggr_counter-3">update_aggr_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-3">update_counter/3</a></td><td></td></tr><tr><td valign="top"><a href="#valid_opts-2">valid_opts/2</a></td><td></td></tr></table>
 
 
 <a name="functions"></a>
 
 ##Function Details##
 
+<a name="add_monitor-3"></a>
+
+###add_monitor/3##
+
+
+
+
+`add_monitor(T, Pid, Ref) -> any()`
+
 <a name="await-3"></a>
 
 ###await/3##
@@ -97,6 +106,15 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.<a name="index"></a
 
 `notify(Key, Opts) -> any()`
 
+<a name="notify-3"></a>
+
+###notify/3##
+
+
+
+
+`notify(Event, Key, Opts) -> any()`
+
 <a name="remove_many-4"></a>
 
 ###remove_many/4##
@@ -106,14 +124,23 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.<a name="index"></a
 
 `remove_many(T, Scope, L, Pid) -> any()`
 
-<a name="remove_reg-2"></a>
+<a name="remove_monitor-3"></a>
+
+###remove_monitor/3##
+
+
+
+
+`remove_monitor(T, Pid, Ref) -> any()`
+
+<a name="remove_monitors-3"></a>
 
-###remove_reg/2##
+###remove_monitors/3##
 
 
 
 
-`remove_reg(Key, Pid) -> any()`
+`remove_monitors(Key, Pid, MPid) -> any()`
 
 <a name="remove_reg-3"></a>
 
@@ -122,7 +149,34 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.<a name="index"></a
 
 
 
-`remove_reg(Key, Pid, Opts) -> any()`
+`remove_reg(Key, Pid, Event) -> any()`
+
+<a name="remove_reg-4"></a>
+
+###remove_reg/4##
+
+
+
+
+`remove_reg(Key, Pid, Event, Opts) -> any()`
+
+<a name="remove_reverse_mapping-3"></a>
+
+###remove_reverse_mapping/3##
+
+
+
+
+`remove_reverse_mapping(Event, Pid, Key) -> any()`
+
+<a name="remove_wait-4"></a>
+
+###remove_wait/4##
+
+
+
+
+`remove_wait(Key, Pid, Ref, Waiters) -> any()`
 
 <a name="update_aggr_counter-3"></a>
 

+ 50 - 49
src/gproc.erl

@@ -73,6 +73,7 @@
          await/1, await/2,
          nb_wait/1,
          cancel_wait/2,
+	 cancel_wait_or_monitor/1,
 	 monitor/1,
 	 demonitor/2,
          lookup_pid/1,
@@ -659,6 +660,15 @@ nb_wait({n,l,_} = Key) ->
 nb_wait(Key) ->
     erlang:error(badarg, [Key]).
 
+%% @spec cancel_wait(Key::key(), Ref) -> ok
+%%    Ref = all | reference()
+%%
+%% @doc Cancels a previous call to nb_wait/1
+%%
+%% If `Ref = all', all wait requests on `Key' from the calling process
+%% are canceled.
+%% @end
+%%
 cancel_wait({_,g,_} = Key, Ref) ->
     ?CHK_DIST,
     cast({cancel_wait, self(), Key, Ref}, g),
@@ -667,6 +677,14 @@ cancel_wait({_,l,_} = Key, Ref) ->
     cast({cancel_wait, self(), Key, Ref}, l),
     ok.
 
+cancel_wait_or_monitor({_,g,_} = Key) ->
+    ?CHK_DIST,
+    cast({cancel_wait_or_monitor, self(), Key}, g),
+    ok;
+cancel_wait_or_monitor({_,l,_} = Key) ->
+    cast({cancel_wait_or_monitor, self(), Key}, l),
+    ok.
+
 
 %% @spec monitor(key()) -> reference()
 %%
@@ -680,9 +698,9 @@ cancel_wait({_,l,_} = Key, Ref) ->
 %% @end
 monitor({T,g,_} = Key) when T==n; T==a ->
     ?CHK_DIST,
-    call({monitor, Key}, g);
+    call({monitor, Key, self()}, g);
 monitor({T,l,_} = Key) when T==n; T==a ->
-    call({monitor, Key}, l);
+    call({monitor, Key, self()}, l);
 monitor(Key) ->
     erlang:error(badarg, [Key]).
 
@@ -694,9 +712,9 @@ monitor(Key) ->
 %% @end
 demonitor({T,g,_} = Key, Ref) when T==n; T==a ->
     ?CHK_DIST,
-    call({demonitor, Key, Ref}, g);
+    call({demonitor, Key, Ref, self()}, g);
 demonitor({T,l,_} = Key, Ref) when T==n; T==a ->
-    call({demonitor, Key, Ref}, l);
+    call({demonitor, Key, Ref, self()}, l);
 demonitor(Key, Ref) ->
     erlang:error(badarg, [Key, Ref]).
 
@@ -830,7 +848,7 @@ unreg(Key) ->
         {_, l, _} ->
             case ets:member(?TAB, {Key,self()}) of
                 true ->
-                    _ = gproc_lib:remove_reg(Key, self()),
+                    _ = gproc_lib:remove_reg(Key, self(), unreg),
                     true;
                 false ->
                     erlang:error(badarg)
@@ -936,7 +954,7 @@ local_mreg(T, [_|_] = KVL) ->
     end.
 
 local_munreg(T, L) when T==p; T==c ->
-    _ = [gproc_lib:remove_reg({T,l,K}, self()) || K <- L],
+    _ = [gproc_lib:remove_reg({T,l,K}, self(), unreg) || K <- L],
     true.
 
 %% @spec (Key :: key(), Value) -> true
@@ -1350,27 +1368,22 @@ handle_cast({monitor_me, Pid}, S) ->
     erlang:monitor(process, Pid),
     {noreply, S};
 handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
-    Rev = {Pid,Key},
     case ets:lookup(?TAB, {Key,T}) of
-        [{K, Waiters}] ->
-            case Waiters -- [{Pid,Ref}] of
-                [] ->
-                    ets:delete(?TAB, K),
-                    ets:delete(?TAB, Rev);
-                NewWaiters ->
-                    ets:insert(?TAB, {K, NewWaiters}),
-                    case lists:keymember(Pid, 1, NewWaiters) of
-                        true ->
-                            %% should be extremely unlikely
-                            ok;
-                        false ->
-                            %% delete the reverse entry
-                            ets:delete(?TAB, Rev)
-                    end
-            end;
+        [{_, Waiters}] ->
+	    gproc_lib:remove_wait(Key, Pid, Ref, Waiters);
         _ ->
             ignore
     end,
+    {noreply, S};
+handle_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S) ->
+    case ets:lookup(?TAB, {Key, T}) of
+	[{_, Waiters}] ->
+	    gproc_lib:remove_wait(Key, Pid, all, Waiters);
+	[{_, OtherPid, _}] ->
+	    gproc_lib:remove_monitors(Key, OtherPid, Pid);
+	_ ->
+	    ok
+    end,
     {noreply, S}.
 
 %% @hidden
@@ -1382,7 +1395,7 @@ handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
         false ->
             {reply, badarg, S}
     end;
-handle_call({monitor, {T,_,_} = Key}, {Pid, _}, S)
+handle_call({monitor, {T,l,_} = Key, Pid}, _From, S)
   when T==n; T==a ->
     Ref = make_ref(),
     case where(Key) of
@@ -1391,13 +1404,13 @@ handle_call({monitor, {T,_,_} = Key}, {Pid, _}, S)
 	RegPid ->
 	    case ets:lookup(?TAB, {RegPid, Key}) of
 		[{K,r}] ->
-		    ets:insert(?TAB, {K, [{monitor, [Pid]}]});
+		    ets:insert(?TAB, {K, [{monitor, [{Pid,Ref}]}]});
 		[{K, Opts}] ->
-		    ets:insert(?TAB, {K, add_monitor(Opts, Pid, Ref)})
+		    ets:insert(?TAB, {K, gproc_lib:add_monitor(Opts, Pid, Ref)})
 	    end
     end,
     {reply, Ref, S};
-handle_call({demonitor, {T,_,_} = Key, Ref}, {Pid,_}, S)
+handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S)
   when T==n; T==a ->
     case where(Key) of
 	undefined ->
@@ -1407,7 +1420,8 @@ handle_call({demonitor, {T,_,_} = Key, Ref}, {Pid,_}, S)
 		[{_K,r}] ->
 		    ok;   % be nice
 		[{K, Opts}] ->
-		    ets:insert(?TAB, {K, remove_monitor(Opts, Pid, Ref)})
+		    ets:insert(?TAB, {K, gproc_lib:remove_monitor(
+					   Opts, Pid, Ref)})
 	    end
     end,
     {reply, ok, S};
@@ -1422,10 +1436,10 @@ handle_call({reg_shared, {_T,l,_} = Key, Val}, _From, S) ->
 handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
     case ets:lookup(?TAB, {Pid,Key}) of
         [{_, r}] ->
-            _ = gproc_lib:remove_reg(Key, Pid),
+            _ = gproc_lib:remove_reg(Key, Pid, unreg),
             {reply, true, S};
         [{_, Opts}] when is_list(Opts) ->
-            _ = gproc_lib:remove_reg(Key, Pid, Opts),
+            _ = gproc_lib:remove_reg(Key, Pid, unreg, Opts),
             {reply, true, S};
         [] ->
             {reply, badarg, S}
@@ -1433,9 +1447,9 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
 handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
     case ets:lookup(?TAB, {shared, Key}) of
 	[{_, r}] ->
-	    _ = gproc_lib:remove_reg(Key, shared);
+	    _ = gproc_lib:remove_reg(Key, shared, unreg);
 	[{_, Opts}] ->
-	    _ = gproc_lib:remove_reg(Key, shared, Opts);
+	    _ = gproc_lib:remove_reg(Key, shared, unreg, Opts);
 	[] ->
 	    %% don't crash if shared key already unregged.
 	    ok
@@ -1552,6 +1566,7 @@ try_insert_reg({T,l,_} = Key, Val, Pid) ->
             true
     end.
 
+
 %% try_insert_shared({c,l,_} = Key, Val) ->
 %%     ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
 %% try_insert_shared({a,l,_} = Key, Val) ->
@@ -1629,11 +1644,11 @@ do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
                 ToPid when is_pid(ToPid) ->
                     ets:insert(?TAB, [{Key, ToPid, Value},
                                       {{ToPid, K}, []}]),
-                    ets:delete(?TAB, {Pid, K}),
+		    gproc_lib:remove_reverse_mapping({migrated,ToPid}, Pid, K),
                     _ = gproc_lib:ensure_monitor(ToPid, l),
                     ToPid;
                 undefined ->
-                    _ = gproc_lib:remove_reg(K, Pid),
+                    _ = gproc_lib:remove_reg(K, Pid, unreg),
                     undefined
             end;
         _ ->
@@ -1658,7 +1673,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
                             ToPid
                     end;
                 undefined ->
-                    _ = gproc_lib:remove_reg(K, Pid),
+                    _ = gproc_lib:remove_reg(K, Pid, migrated),
                     undefined
             end;
         _ ->
@@ -1702,20 +1717,6 @@ set_monitors({Pids, Cont}) ->
     _ = [erlang:monitor(process,Pid) || Pid <- Pids],
     set_monitors(ets:select(Cont)).
 
-add_monitor([{monitor, Mons}|T], Pid, Ref) ->
-    [{monitor, [{Pid,Ref}|Mons]}|T];
-add_monitor([H|T], Pid, Ref) ->
-    [H|add_monitor(T, Pid, Ref)];
-add_monitor([], Pid, Ref) ->
-    [{monitor, [{Pid, Ref}]}].
-
-remove_monitor([{monitor, Mons}|T], Pid, Ref) ->
-    [{monitor, Mons -- [{Pid, Ref}]}|T];
-remove_monitor([H|T], Pid, Ref) ->
-    [H|remove_monitor(T, Pid, Ref)];
-remove_monitor([], _Pid, _Ref) ->
-    [].
-
 monitor_me() ->
     case ets:insert_new(?TAB, {{self(),l}}) of
         false -> true;

+ 97 - 25
src/gproc_dist.erl

@@ -279,6 +279,39 @@ handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
                 end,
             {reply, true, [{insert, Vals}], S}
     end;
+handle_leader_call({monitor, {T,g,_} = Key, Pid}, _From, S, _E)
+  when T==n; T==a ->
+    Ref = make_ref(),
+    case gproc:where(Key) of
+	undefined ->
+	    Pid ! {gproc, unreg, Ref, Key},
+	    {reply, Ref, [], S};
+	RegPid ->
+	    NewRev =
+		case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
+		    r ->
+			{K, [{monitor, [{Pid,Ref}]}]};
+		    Opts ->
+			{K, gproc_lib:add_monitor(Opts, Pid, Ref)}
+		end,
+	    ets:insert(?TAB, NewRev),
+	    {reply, Ref, [{insert, [NewRev]}], S}
+    end;
+handle_leader_call({demonitor, {T,g,_} = Key, Ref, Pid}, _From, S, _E)
+  when T==n; T==a ->
+    case gproc:where(Key) of
+	undefined ->
+	    {reply, ok, [], S};
+	RegPid ->
+	    case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
+		r ->
+		    {reply, ok, [], S};
+		Opts ->
+		    NewRev = {K, gproc_lib:remove_monitor(Opts, Pid, Ref)},
+		    ets:insert(?TAB, NewRev),
+		    {reply, Ref, [{insert, [NewRev]}], S}
+	    end
+    end;
 handle_leader_call({update_counter, {c,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
   when is_integer(Incr) ->
     try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
@@ -310,18 +343,18 @@ handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
           end,
     case ets:member(?TAB, Key) of
         true ->
-            _ = gproc_lib:remove_reg(K, Pid),
+            _ = gproc_lib:remove_reg(K, Pid, unreg),
             if T == c ->
                     case ets:lookup(?TAB, {{a,g,Name},a}) of
                         [Aggr] ->
-                            %% updated by remove_reg/2
-                            {reply, true, [{delete,[Key, {Pid,K}]},
+                            %% updated by remove_reg/3
+                            {reply, true, [{delete,[Key, {Pid,K}], unreg},
                                            {insert, [Aggr]}], S};
                         [] ->
-                            {reply, true, [{delete, [Key, {Pid,K}]}], S}
+                            {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
                     end;
                true ->
-                    {reply, true, [{delete, [Key]}], S}
+                    {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
             end;
         false ->
             {reply, badarg, S}
@@ -338,12 +371,12 @@ handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
                     ets:insert(?TAB, [{Key, ToPid, Value},
                                       {{ToPid,K}, []}]),
                     _ = gproc_lib:ensure_monitor(ToPid, g),
-                    {reply, ToPid, [{delete, [Key, {Pid,K}]},
-                                   {insert, [{Key, ToPid, Value}]}], S};
+                    {reply, ToPid, [{delete, [Key, {Pid,K}], {migrated,ToPid}},
+				    {insert, [{Key, ToPid, Value}]}], S};
                 undefined ->
                     ets:delete(?TAB, Key),
-                    ets:delete(?TAB, {Pid, K}),
-                    {reply, undefined, [{delete, [Key, {Pid,K}]}], S}
+		    Rev = gproc_lib:remove_reverse_mapping(unreg, Pid, K),
+                    {reply, undefined, [{delete, [Key, Rev], unreg}], S}
             end;
         _ ->
             {reply, badarg, S}
@@ -363,7 +396,7 @@ handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
         [] ->
             {reply, true, S};
         Objs ->
-            {reply, true, [{delete, Objs}], S}
+            {reply, true, [{delete, Objs, unreg}], S}
     catch
         error:_ -> {reply, badarg, S}
     end;
@@ -434,8 +467,25 @@ handle_leader_cast({add_globals, Missing}, S, _E) ->
     ets:insert(?TAB, Missing),
     {ok, [{insert, Missing}], S};
 handle_leader_cast({remove_globals, Globals}, S, _E) ->
-    delete_globals(Globals),
+    delete_globals(Globals, []),
     {ok, S};
+handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
+    case ets:lookup(?TAB, {Key, T}) of
+	[{_, Waiters}] ->
+	    Ops = gproc_lib:remove_wait(Key, Pid, Ref, Waiters),
+	    {ok, Ops, S};
+	_ ->
+	    {ok, [], S}
+    end;
+handle_leader_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S, _E) ->
+    case ets:lookup(?TAB, {Key, T}) of
+	[{_, Waiters}] ->
+	    Ops = gproc_lib:remove_wait(Key, Pid, all, Waiters),
+	    {ok, Ops, S};
+	[{_, OtherPid, _}] ->
+	    Ops = gproc_lib:remove_monitors(Key, OtherPid, Pid),
+	    {ok, Ops, S}
+    end;
 handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
     Globals = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
                                  [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
@@ -460,11 +510,24 @@ process_globals(Globals) ->
                        end,
                   K = ets_key(Key, Pid),
                   ets:delete(?TAB, K),
-                  ets:delete(?TAB, {Pid,Key}),
+                  remove_rev_entry(Pid, Key, unreg),
                   A1
           end, [], Globals),
-    [{Op,Objs} || {Op,Objs} <- [{insert,Modified},
-                                {delete,Globals}], Objs =/= []].
+    [{insert, Modified} || Modified =/= []] ++
+	[{delete, Globals, unreg} || Globals =/= []].
+
+remove_rev_entry(Pid, {T,g,_} = K, Event) when T==n; T==a ->
+    Key = {Pid, K},
+    case ets:lookup(?TAB, Key) of
+	[]       -> ok;
+	[{_, r}] -> ok;
+	[{_, Opts}] when is_list(Opts) ->
+	    gproc_lib:notify(Event, K, Opts)
+    end,
+    ets:delete(?TAB, Key);
+remove_rev_entry(Pid, K, _Event) ->
+    ets:delete(?TAB, {Pid, K}).
+
 
 code_change(_FromVsn, S, _Extra, _E) ->
     {ok, S}.
@@ -477,15 +540,16 @@ from_leader({sync, Ref}, S, _E) ->
     {ok, S};
 from_leader(Ops, S, _E) ->
     lists:foreach(
-      fun({delete, Globals}) ->
-              delete_globals(Globals);
+      fun({delete, Globals, Event}) ->
+              delete_globals(Globals, Event);
          ({insert, Globals}) ->
               ets:insert(?TAB, Globals),
               lists:foreach(
                 fun({{{_,g,_}=Key,_}, P, _}) ->
-                        ets:insert(?TAB, {{P,Key}, []}),
+                        ets:insert_new(?TAB, {{P,Key}, []}),
                         gproc_lib:ensure_monitor(P,g);
-                   ({{P,_K}, _}) when is_pid(P) ->
+                   ({{P,_K}, _Opts} = Obj) when is_pid(P) ->
+			ets:insert(?TAB, Obj),
                         gproc_lib:ensure_monitor(P,g);
                    (_) ->
                         skip
@@ -493,16 +557,18 @@ from_leader(Ops, S, _E) ->
       end, Ops),
     {ok, S}.
 
-delete_globals(Globals) ->
+delete_globals(Globals, Event) ->
     lists:foreach(
       fun({{_,g,_},T} = K) when is_atom(T) ->
               ets:delete(?TAB, K);
          ({Key, Pid}) when is_pid(Pid); Pid==shared ->
               K = ets_key(Key,Pid),
               ets:delete(?TAB, K),
-              ets:delete(?TAB, {Pid, Key});
-         ({Pid, K}) when is_pid(Pid); Pid==shared ->
-              ets:delete(?TAB, {Pid, K})
+	      remove_rev_entry(Pid, Key, Event);
+         ({Pid, Key}) when is_pid(Pid); Pid==shared ->
+	      K = ets_key(Key, Pid),
+	      ets:delete(?TAB, K),
+	      remove_rev_entry(Pid, Key, Event)
       end, Globals).
 
 ets_key({T,_,_} = K, _) when T==n; T==a ->
@@ -539,13 +605,18 @@ surrendered_1(Globs) ->
     Ldr_local_globs =
         lists:foldl(
           fun({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
-                  ets:insert(?TAB, [{K, Pid, V}, {{Pid,Key}}]),
+                  ets:insert(?TAB, {K, Pid, V}),
+		  ets:insert_new(?TAB, {{Pid,Key}, r}),
                   Acc;
+	     ({{Pid,_}=K, Opts}, Acc) when node(Pid) =/= node() ->
+		     ets:insert(?TAB, {K, Opts}),
+		     Acc;
              ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
                   [Obj|Acc]
           end, [], Globs),
     case [{K,P,V} || {K,P,V} <- My_local_globs,
-                   not(lists:keymember(K, 1, Ldr_local_globs))] of
+		     is_pid(P) andalso
+			 not(lists:keymember(K, 1, Ldr_local_globs))] of
         [] ->
             %% phew! We have the same picture
             ok;
@@ -554,7 +625,8 @@ surrendered_1(Globs) ->
             leader_cast({add_globals, Missing})
     end,
     case [{K,P} || {K,P,_} <- Ldr_local_globs,
-                   not(lists:keymember(K, 1, My_local_globs))] of
+		   is_pid(P) andalso
+		       not(lists:keymember(K, 1, My_local_globs))] of
         [] ->
             ok;
         [_|_] = Remove ->

+ 94 - 9
src/gproc_lib.erl

@@ -28,8 +28,13 @@
          insert_many/4,
          insert_reg/4,
          remove_many/4,
-         remove_reg/2, remove_reg/3,
-	 notify/2,
+         remove_reg/3, remove_reg/4,
+	 add_monitor/3,
+	 remove_monitor/3,
+	 remove_monitors/3,
+	 remove_reverse_mapping/3,
+	 notify/2, notify/3,
+	 remove_wait/4,
          update_aggr_counter/3,
          update_counter/3,
 	 valid_opts/2]).
@@ -182,6 +187,51 @@ notify_waiters(Waiters, K, Pid, V) ->
          end || {P, Ref} <- Waiters],
     ok.
 
+remove_wait({T,_,_} = Key, Pid, Ref, Waiters) ->
+    Rev = {Pid,Key},
+    case remove_from_waiters(Waiters, Pid, Ref) of
+	[] ->
+	    ets:delete(?TAB, {Key,T}),
+	    ets:delete(?TAB, Rev),
+	    [{delete, [{Key,T}, Rev], []}];
+	NewWaiters ->
+	    ets:insert(?TAB, {Key, NewWaiters}),
+	    case lists:keymember(Pid, 1, NewWaiters) of
+		true ->
+		    %% should be extremely unlikely
+		    [{insert, [{Key, NewWaiters}]}];
+		false ->
+		    %% delete the reverse entry
+		    ets:delete(?TAB, Rev),
+		    [{insert, [{Key, NewWaiters}]},
+		     {delete, [Rev], []}]
+	    end
+    end.
+
+remove_from_waiters(Waiters, Pid, all) ->
+    [{P,R} || {P,R} <- Waiters,
+	      P =/= Pid];
+remove_from_waiters(Waiters, Pid, Ref) ->
+    Waiters -- [{Pid, Ref}].
+
+remove_monitors(Key, Pid, MPid) ->
+    case ets:lookup(?TAB, {Pid, Key}) of
+	[{_, r}] ->
+	    [];
+	[{K, Opts}] when is_list(Opts) ->
+	    case lists:keyfind(monitors, 1, Opts) of
+		false ->
+		    [];
+		{_, Ms} ->
+		    Ms1 = [{P,R} || {P,R} <- Ms,
+				    P =/= MPid],
+		    NewMs = lists:keyreplace(monitors, 1, {monitors,Ms1}),
+		    ets:insert(?TAB, {K, NewMs}),
+		    [{insert, [{{Pid,Key}, NewMs}]}]
+	    end;
+	_ ->
+	    []
+    end.
 
 
 mk_reg_objs(T, Scope, Pid, L) when T==n; T==a ->
@@ -209,27 +259,62 @@ ensure_monitor(Pid, Scope) when Scope==g; Scope==l ->
         true  -> erlang:monitor(process, Pid)
     end.
 
-remove_reg(Key, Pid) ->
-    remove_reg(Key, Pid, []).
+remove_reg(Key, Pid, Event) ->
+    remove_reg(Key, Pid, Event, []).
 
-remove_reg(Key, Pid, Opts) ->
+remove_reg(Key, Pid, Event, Opts) ->
     Reg = remove_reg_1(Key, Pid),
-    ets:delete(?TAB, Rev = {Pid,Key}),
-    notify(Key, Opts),
+    Rev = remove_reverse_mapping(Event, Pid, Key, Opts),
     [Reg, Rev].
 
+remove_reverse_mapping(Event, Pid, Key) ->
+    Opts = case ets:lookup(?TAB, {Pid, Key}) of
+	       [] ->       [];
+	       [{_, r}] -> [];
+	       [{_, L}] when is_list(L) ->
+		   L
+	   end,
+    remove_reverse_mapping(Event, Pid, Key, Opts).
+
+remove_reverse_mapping(Event, Pid, Key, Opts) when Event==unreg;
+						   element(1,Event)==migrated ->
+    Rev = {Pid, Key},
+    notify(Event, Key, Opts),
+    ets:delete(?TAB, Rev),
+    Rev.
+
 notify(Key, Opts) ->
+    notify(unreg, Key, Opts).
+
+notify([], _, _) ->
+    [];
+notify(Event, Key, Opts) ->
     case lists:keyfind(monitor, 1, Opts) of
 	false ->
 	    [];
 	{_, Mons} ->
-	    [begin P ! {gproc, unreg, Ref, Key}, P end || {P, Ref} <- Mons]
+	    [begin P ! {gproc, Event, Ref, Key}, P end || {P, Ref} <- Mons,
+							  node(P) == node()]
     end.
 
+add_monitor([{monitor, Mons}|T], Pid, Ref) ->
+    [{monitor, [{Pid,Ref}|Mons]}|T];
+add_monitor([H|T], Pid, Ref) ->
+    [H|add_monitor(T, Pid, Ref)];
+add_monitor([], Pid, Ref) ->
+    [{monitor, [{Pid, Ref}]}].
+
+remove_monitor([{monitor, Mons}|T], Pid, Ref) ->
+    [{monitor, Mons -- [{Pid, Ref}]}|T];
+remove_monitor([H|T], Pid, Ref) ->
+    [H|remove_monitor(T, Pid, Ref)];
+remove_monitor([], _Pid, _Ref) ->
+    [].
+
 remove_many(T, Scope, L, Pid) ->
     lists:flatmap(fun(K) ->
                           Key = {T, Scope, K},
-                          remove_reg(Key, Pid, unreg_opts(Key, Pid))
+                          remove_reg(Key, Pid, unreg, unreg_opts(Key, Pid))
                   end, L).
 
 unreg_opts(Key, Pid) ->

+ 62 - 7
src/gproc_monitor.erl

@@ -82,7 +82,7 @@ unsubscribe({T,S,_} = Key) when (T==n orelse T==a)
     catch
 	error:badarg -> ok
     end,
-    ok.
+    gen_server:cast(?SERVER, {unsubscribe, self(), Key}).
 
 %%--------------------------------------------------------------------
 %% @doc
@@ -120,6 +120,7 @@ start_link() ->
 %% @end
 %%--------------------------------------------------------------------
 init(Parent) ->
+    process_flag(priority, high),
     register(?SERVER, self()),
     proc_lib:init_ack(Parent, {ok, self()}),
     receive {'ETS-TRANSFER',?TAB,_,_} -> ok end,
@@ -155,8 +156,13 @@ handle_call(_Request, _From, State) ->
 %%--------------------------------------------------------------------
 handle_cast({subscribe, Pid, Key}, State) ->
     Status = gproc:where(Key),
+    add_subscription(Pid, Key),
     do_monitor(Key, Status),
     Pid ! {?MODULE, Key, Status},
+    monitor_pid(Pid),
+    {noreply, State};
+handle_cast({unsubscribe, Pid, Key}, State) ->
+    del_subscription(Pid, Key),
     {noreply, State}.
 
 %%--------------------------------------------------------------------
@@ -170,16 +176,25 @@ handle_cast({subscribe, Pid, Key}, State) ->
 %% @end
 %%--------------------------------------------------------------------
 handle_info({gproc, unreg, _Ref, Name}, State) ->
-    ets:delete(?TAB, Name),
+    ets:delete(?TAB, {m, Name}),
     notify(Name, undefined),
     do_monitor(Name, undefined),
     {noreply, State};
+handle_info({gproc, {migrated,ToPid}, _Ref, Name}, State) ->
+    ets:delete(?TAB, {m, Name}),
+    notify(Name, {migrated, ToPid}),
+    do_monitor(Name, ToPid),
+    {noreply, State};
 handle_info({gproc, _, registered, {{T,_,_} = Name, Pid, _}}, State)
   when T==n; T==a ->
+    ets:delete(?TAB, {w, Name}),
     notify(Name, Pid),
     do_monitor(Name, Pid),
     {noreply, State};
-handle_info(_, State) ->
+handle_info({'DOWN', _, process, Pid, _}, State) ->
+    pid_is_down(Pid),
+    {noreply, State};
+handle_info(_Msg, State) ->
     {noreply, State}.
 
 
@@ -212,16 +227,56 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%===================================================================
 
+add_subscription(Pid, {_,_,_} = Key) when is_pid(Pid) ->
+    ets:insert(?TAB, [{{s, Key, Pid}}, {{r, Pid, Key}}]).
+
+del_subscription(Pid, Key) ->
+    ets:delete(?TAB, {{s, Key, Pid}}),
+    ets:delete(?TAB, {{r, Pid, Key}}),
+    maybe_cancel_wait(Key).
+
 do_monitor(Name, undefined) ->
-    gproc:nb_wait(Name);
-do_monitor(Name, Pid) when is_pid(Pid) ->
-    case ets:member(?TAB, Name) of
+    case ets:member(?TAB, {w, Name}) of
 	false ->
+	    Ref = gproc:nb_wait(Name),
+	    ets:insert(?TAB, {{w, Name}, Ref})
+    end;
+do_monitor(Name, Pid) when is_pid(Pid) ->
+    case ets:member(?TAB, {m, Name}) of
+	true ->
+	    ok;
+	_ ->
 	    Ref = gproc:monitor(Name),
-	    ets:insert(?TAB, {Name, Ref});
+	    ets:insert(?TAB, {{m, Name}, Ref})
+    end.
+
+monitor_pid(Pid) when is_pid(Pid) ->
+    case ets:member(?TAB, {p,Pid}) of
+	false ->
+	    Ref = erlang:monitor(process, Pid),
+	    ets:insert(?TAB, {{p,Pid}, Ref});
 	true ->
 	    ok
     end.
 
+pid_is_down(Pid) ->
+    Keys = ets:select(?TAB, [{ {{r, Pid, '$1'}}, [], ['$1'] }]),
+    ets:select_delete(?TAB, [{ {{r, Pid, '$1'}}, [], [true] }]),
+    lists:foreach(fun(K) ->
+			  ets:delete(?TAB, {s,K,Pid}),
+			  maybe_cancel_wait(K)
+		  end, Keys),
+    ets:delete(?TAB, {p, Pid}).
+
+maybe_cancel_wait(K) ->
+    case ets:next(?TAB, {s,K}) of
+	{s,K,P} when is_pid(P) ->
+	    ok;
+	_ ->
+	    gproc:cancel_wait_or_monitor(K),
+	    ets:delete(?TAB, {m, K}),
+	    ets:delete(?TAB, {w, K})
+    end.
+
 notify(Name, Where) ->
     gproc:send({p, l, {?MODULE, Name}}, {?MODULE, Name, Where}).

+ 46 - 1
test/gproc_dist_tests.erl

@@ -69,6 +69,12 @@ dist_test_() ->
 			       end,
 			       fun() ->
 				       ?debugVal(t_sync(Ns))
+			       end,
+			       fun() ->
+				       ?debugVal(t_monitor(Ns))
+			       end,
+			       fun() ->
+				       ?debugVal(t_subscribe(Ns))
 			       end
 			      ]
 		 },
@@ -164,7 +170,7 @@ t_await_reg([A,B|_]) ->
 
 t_await_self([A|_]) ->
     Name = ?T_NAME,
-    P = t_spawn(A, false),  % buffer unknowns
+    P = t_spawn(A, false),  % don't buffer unknowns
     Ref = t_call(P, {apply, gproc, nb_wait, [Name]}),
     ?assertMatch(ok, t_call(P, {selective, true})),
     ?assertMatch(true, t_call(P, {apply, gproc, reg, [Name, some_value]})),
@@ -216,6 +222,45 @@ t_sync(Ns) ->
     [?assertMatch(true, rpc:call(N, gproc_dist, sync, []))
      || N <- Ns].
 
+t_monitor([A,B|_]) ->
+    Na = ?T_NAME,
+    Pa = t_spawn_reg(A, Na),
+    Pb = t_spawn(B, _Selective = true),
+    Ref = t_call(Pb, {apply, gproc, monitor, [Na]}),
+    ?assert(is_reference(Ref)),
+    ?assertMatch(ok, t_call(Pa, die)),
+    ?assertMatch({gproc,unreg,Ref,Na}, got_msg(Pb, gproc)),
+    Pc = t_spawn_reg(A, Na),
+    Ref1 = t_call(Pb, {apply, gproc, monitor, [Na]}),
+    ?assertMatch(true, t_call(Pc, {apply, gproc, unreg, [Na]})),
+    ?assertMatch({gproc,unreg,Ref1,Na}, got_msg(Pb, gproc)).
+
+t_subscribe([A,B|_] = Ns) ->
+    Na = ?T_NAME,
+    Pb = t_spawn(B, _Selective = true),
+    ?assertEqual(ok, t_call(Pb, {apply, gproc_monitor, subscribe, [Na]})),
+    ?assertMatch({gproc_monitor, Na, undefined}, got_msg(Pb, gproc_monitor)),
+    Pa = t_spawn_reg(A, Na),
+    ?assertMatch({gproc_monitor, Na, Pa}, got_msg(Pb, gproc_monitor)),
+    Pc = t_spawn(A),
+    t_call(Pa, {apply, gproc, give_away, [Na, Pc]}),
+    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pc)),
+    ?assertEqual({gproc_monitor,Na,{migrated,Pc}}, got_msg(Pb, gproc_monitor)),
+    ?assertEqual(ok, t_call(Pc, die)),
+    ?assertEqual({gproc_monitor,Na,undefined}, got_msg(Pb, gproc_monitor)).
+
+got_msg(Pb, Tag) ->
+    t_call(Pb,
+	   {apply_fun,
+	    fun() ->
+		    receive
+			M when element(1, M) == Tag ->
+			    M
+		    after 1000 ->
+			    timeout
+		    end
+	    end}).
+
 %% Verify that the gproc_dist:sync() call returns true even if a candidate dies
 %% while the sync is underway. This test makes use of sys:suspend() to ensure that
 %% the other candidate doesn't respond too quickly.

+ 92 - 23
test/gproc_tests.erl

@@ -69,47 +69,53 @@ reg_test_() ->
 	     application:stop(mnesia)
      end,
      [
-      {spawn, ?_test(t_simple_reg())}
+      {spawn, ?_test(?debugVal(t_simple_reg()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_simple_counter())}
+      , {spawn, ?_test(?debugVal(t_simple_counter()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_simple_aggr_counter())}
+      , {spawn, ?_test(?debugVal(t_simple_aggr_counter()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_simple_prop())}
+      , {spawn, ?_test(?debugVal(t_simple_prop()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_await())}
+      , {spawn, ?_test(?debugVal(t_await()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_await_self())}
+      , {spawn, ?_test(?debugVal(t_await_self()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_simple_mreg())}
+      , {spawn, ?_test(?debugVal(t_simple_mreg()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_gproc_crash())}
+      , {spawn, ?_test(?debugVal(t_gproc_crash()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_cancel_wait_and_register())}
+      , {spawn, ?_test(?debugVal(t_cancel_wait_and_register()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_to_pid())}
+      , {spawn, ?_test(?debugVal(t_give_away_to_pid()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_to_self())}
+      , {spawn, ?_test(?debugVal(t_give_away_to_self()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_badarg())}
+      , {spawn, ?_test(?debugVal(t_give_away_badarg()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_to_unknown())}
+      , {spawn, ?_test(?debugVal(t_give_away_to_unknown()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_give_away_and_back())}
+      , {spawn, ?_test(?debugVal(t_give_away_and_back()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_select())}
+      , {spawn, ?_test(?debugVal(t_select()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_select_count())}
+      , {spawn, ?_test(?debugVal(t_select_count()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_qlc())}
+      , {spawn, ?_test(?debugVal(t_qlc()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_get_env())}
+      , {spawn, ?_test(?debugVal(t_get_env()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_get_set_env())}
+      , {spawn, ?_test(?debugVal(t_get_set_env()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_set_env())}
+      , {spawn, ?_test(?debugVal(t_set_env()))}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_get_env_inherit())}
+      , {spawn, ?_test(?debugVal(t_get_env_inherit()))}
+      , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_monitor()))}
+      , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_monitor_give_away()))}
+      , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_subscribe()))}
       , ?_test(t_is_clean())
      ]}.
 
@@ -195,8 +201,11 @@ t_await_self() ->
 
 t_is_clean() ->
     sys:get_status(gproc), % in order to synch
+    sys:get_status(gproc_monitor),
     T = ets:tab2list(gproc),
-    ?assertMatch([], T).
+    Tm = ets:tab2list(gproc_monitor),
+    ?assertMatch([], Tm),
+    ?assertMatch([], T -- [{{whereis(gproc_monitor), l}}]).
 
 t_simple_mreg() ->
     P = self(),
@@ -451,9 +460,69 @@ t_get_env_inherit() ->
 		   end),
     ?assertEqual({P,undefined}, gproc:await({n,l,get_env_p},1000)),
     ?assertEqual(bar, gproc:get_env(l, gproc, foo, [{inherit, P}])),
-    ?assertEqual(bar, gproc:get_env(l, gproc, foo, [{inherit, {n,l,get_env_p}}])),
+    ?assertEqual(bar, gproc:get_env(l, gproc, foo,
+				    [{inherit, {n,l,get_env_p}}])),
     ?assertEqual(ok, t_call(P, die)).
 
+t_monitor() ->
+    Me = self(),
+    P = spawn_link(fun() ->
+			   gproc:reg({n,l,a}),
+			   Me ! continue,
+			   t_loop()
+		   end),
+    receive continue ->
+	    ok
+    end,
+    Ref = gproc:monitor({n,l,a}),
+    ?assertEqual(ok, t_call(P, die)),
+    receive
+	M ->
+	    ?assertEqual({gproc,unreg,Ref,{n,l,a}}, M)
+    end.
+
+t_monitor_give_away() ->
+    Me = self(),
+    P = spawn_link(fun() ->
+			   gproc:reg({n,l,a}),
+			   Me ! continue,
+			   t_loop()
+		   end),
+    receive continue ->
+	    ok
+    end,
+    Ref = gproc:monitor({n,l,a}),
+    ?assertEqual(ok, t_call(P, {give_away, {n,l,a}})),
+    receive
+	M ->
+	    ?assertEqual({gproc,{migrated,Me},Ref,{n,l,a}}, M)
+    end,
+    ?assertEqual(ok, t_call(P, die)).
+
+t_subscribe() ->
+    Key = {n,l,a},
+    ?assertEqual(ok, gproc_monitor:subscribe(Key)),
+    ?assertEqual({gproc_monitor, Key, undefined}, get_msg()),
+    P = spawn_link(fun() ->
+			   gproc:reg({n,l,a}),
+			   t_loop()
+		   end),
+    ?assertEqual({gproc_monitor, Key, P}, get_msg()),
+    ?assertEqual(ok, t_call(P, {give_away, Key})),
+    ?assertEqual({gproc_monitor, Key, {migrated,self()}}, get_msg()),
+    gproc:give_away(Key, P),
+    ?assertEqual({gproc_monitor, Key, {migrated,P}}, get_msg()),
+    ?assertEqual(ok, t_call(P, die)),
+    ?assertEqual({gproc_monitor, Key, undefined}, get_msg()),
+    ?assertEqual(ok, gproc_monitor:unsubscribe(Key)).
+
+get_msg() ->
+    receive M ->
+	    M
+    after 1000 ->
+	    timeout
+    end.
+
 t_loop() ->
     receive
 	{From, {give_away, Key}} ->