Browse Source

Merge pull request #142 from uwiger/uw-demonitor-bug

fix demonitor + add test cases
Ulf Wiger 7 years ago
parent
commit
52ef4dfca7
6 changed files with 151 additions and 25 deletions
  1. 34 12
      src/gproc.erl
  2. 46 12
      src/gproc_dist.erl
  3. 8 0
      src/gproc_lib.erl
  4. 32 0
      test/gproc_dist_tests.erl
  5. 14 1
      test/gproc_test_lib.erl
  6. 17 0
      test/gproc_tests.erl

+ 34 - 12
src/gproc.erl

@@ -2331,18 +2331,40 @@ handle_call({monitor, {T,l,_} = Key, Pid, Type}, _From, S)
     {reply, Ref, S};
 handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S)
   when T==n; T==a; T==rc ->
-    _ = case where(Key) of
-	    undefined ->
-		ok;  % be nice
-	    RegPid ->
-		case ets:lookup(?TAB, {RegPid, Key}) of
-		    [{_K,r}] ->
-			ok;   % be nice
-		    [{K, Opts}] ->
-			ets:insert(?TAB, {K, gproc_lib:remove_monitor(
-					       Opts, Pid, Ref)})
-		end
-	end,
+    _ = case ets:lookup(?TAB, {Key, T}) of
+            [] ->
+                ok;  % be nice
+            [{_, Waiters}] ->
+                case lists:filter(fun({P, R, _}) ->
+                                          P =/= Pid orelse R =/= Ref
+                                  end, Waiters) of
+                    [] ->
+                        ets:delete(?TAB, {Pid, Key}),
+                        ets:delete(?TAB, {Key, T});
+                    NewWaiters ->
+                        case lists:keymember(Pid, 1, NewWaiters) of
+                            true ->
+                                ok;
+                            false ->
+                                ets:delete(?TAB, {Pid, Key})
+                        end,
+                        ets:insert(?TAB, {{Key, T}, Waiters})
+                end;
+            [{_, RegPid, _}] ->
+                case ets:lookup(?TAB, {RegPid, Key}) of
+                    [{_K,r}] ->
+                        ok;   % be nice
+                    [{K, Opts}] ->
+                        Opts1 = gproc_lib:remove_monitor(Opts, Pid, Ref),
+                        ets:insert(?TAB, {K, Opts1}),
+                        case gproc_lib:does_pid_monitor(Pid, Opts) of
+                            true ->
+                                ok;
+                            false ->
+                                ets:delete(?TAB, {Pid, Key})
+                        end
+                end
+        end,
     {reply, ok, S};
 handle_call({reg_shared, {_T,l,_} = Key, Val, Attrs, Op}, _From, S) ->
     case try_insert_reg(Key, Val, shared) of

+ 46 - 12
src/gproc_dist.erl

@@ -439,13 +439,32 @@ handle_leader_call({demonitor, {T,g,_} = K, MPid, Ref}, _From, S, _E) ->
             Opts1 = gproc_lib:remove_monitors(Opts, MPid, Ref),
             Obj = {{Pid,K}, Opts1},
             ets:insert(?TAB, Obj),
-            ets:delete(?TAB, {MPid, K}),
-            {reply, ok, [{delete, [{MPid,K}]},
-                         {insert, [Obj]}], S};
+            Del = case gproc_lib:does_pid_monitor(MPid, Opts1) of
+                      true -> [];
+                      false ->
+                          ets:delete(?TAB, {MPid, K}),
+                          [{delete, [{MPid, K}]}]
+                  end,
+            {reply, ok, Del ++ [{insert, [Obj]}], S};
         [{Key, Waiters}] ->
-            NewWaiters = [W || W <- Waiters,
-                               W =/= {MPid, Ref, follow}],
-            {reply, ok, [{insert, [{Key, NewWaiters}]}], S};
+            case lists:filter(fun({P, R, _}) ->
+                                      P =/= MPid orelse R =/= Ref
+                              end, Waiters) of
+                [] ->
+                    ets:delete(?TAB, {MPid, K}),
+                    ets:delete(?TAB, Key),
+                    {reply, ok, [{delete, [{MPid, K}, Key]}], S};
+                NewWaiters ->
+                    ets:insert(?TAB, {Key, NewWaiters}),
+                    Del = case lists:keymember(MPid, 1, NewWaiters) of
+                              false ->
+                                  ets:delete(?TAB, {MPid, K}),
+                                  [{delete, [{MPid, K}]}];
+                              true ->
+                                  []
+                          end,
+                    {reply, ok, Del ++ [{insert, [{Key, NewWaiters}]}], S}
+            end;
         _ ->
             {reply, ok, S}
     end;
@@ -789,7 +808,7 @@ remove_entry(Key, Pid, Event) ->
 	    ets:delete(?TAB, {Pid, Key}),
 	    [];
         [{_, _Waiters}] ->
-            %% Skip
+            ets:delete(?TAB, K),
             [];
 	[] -> []
     end.
@@ -837,7 +856,10 @@ insert_globals(Globals) ->
 	      ets:insert_new(?TAB, {{Pid,Key}, []}),
 	      gproc_lib:ensure_monitor(Pid,g),
 	      A;
-	 ({{P,_K}, Opts} = Obj, A) when is_pid(P), is_list(Opts),Opts =/= [] ->
+         ({{{_,_,_},_}, _} = Obj, A) ->
+              ets:insert(?TAB, Obj),
+              A;
+         ({{P,_K}, Opts} = Obj, A) when is_pid(P), is_list(Opts) ->
 	      ets:insert(?TAB, Obj),
 	      gproc_lib:ensure_monitor(P,g),
 	      [Obj] ++ A;
@@ -857,9 +879,11 @@ delete_globals(Globals) ->
 	      ets:delete(?TAB, {Pid, Key})
       end, Globals).
 
-do_notify([{P, Msg}|T]) when is_pid(P) ->
+do_notify([{P, Msg}|T]) when is_pid(P), node(P) =:= node() ->
     P ! Msg,
     do_notify(T);
+do_notify([{P, Msg}|T]) when is_pid(P) ->
+    do_notify(T);
 do_notify([{K, P, E}|T]) ->
     case ets:lookup(?TAB, {P,K}) of
         [{_, Opts}] when is_list(Opts) ->
@@ -1060,10 +1084,10 @@ tell_waiters([], _, _, _, _) ->
 
 add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) ->
     Obj = {{K,T}, [{Pid, Ref, follow}|Waiters]},
-    Rev = {{Pid,K}, []},
-    ets:insert(?TAB, [Obj, Rev]),
+    ets:insert(?TAB, Obj),
+    Rev = ensure_rev({Pid, K}),
     Msg = {gproc, unreg, Ref, K},
-    if node(Pid) == node() ->
+    if node(Pid) =:= node() ->
             Pid ! Msg,
             {reply, Ref, [{insert, [Obj, Rev]}], S};
        true ->
@@ -1071,5 +1095,15 @@ add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) ->
                           {notify, [{Pid, Msg}]}], S}
     end.
 
+ensure_rev(K) ->
+    case ets:lookup(?TAB, K) of
+        [Rev] ->
+            Rev;
+        [] ->
+            Rev = {K, []},
+            ets:insert(?TAB, Rev),
+            Rev
+    end.
+
 regged_new(reg   ) -> true;
 regged_new(ensure) -> new.

+ 8 - 0
src/gproc_lib.erl

@@ -34,6 +34,7 @@
          standbys/1,
          followers/1,
          remove_monitor_pid/2,
+         does_pid_monitor/2,
 	 add_monitor/4,
 	 remove_monitor/3,
 	 remove_monitors/3,
@@ -302,6 +303,13 @@ remove_monitors(Key, Pid, MPid) ->
 	    []
     end.
 
+does_pid_monitor(Pid, Opts) ->
+    case lists:keyfind(monitors, 1, Opts) of
+        false ->
+            false;
+        {_, Ms} ->
+            lists:keymember(Pid, 1, Ms)
+    end.
 
 mk_reg_objs(T, Scope, Pid, L) when T==n; T==a; T==rc ->
     lists:map(fun({K,V}) ->

+ 32 - 0
test/gproc_dist_tests.erl

@@ -87,6 +87,7 @@ basic_tests(Ns) ->
      ?f(t_standby_monitor(Ns)),
      ?f(t_standby_monitor_unreg(Ns)),
      ?f(t_follow_monitor(Ns)),
+     ?f(t_monitor_demonitor(Ns)),
      ?f(t_subscribe(Ns))
     ].
 
@@ -496,6 +497,36 @@ t_follow_monitor([A,B|_]) ->
     ok = t_call(Pb, die),
     ok = t_call(Pa, die).
 
+t_monitor_demonitor([A,B|_]) ->
+    Na = ?T_NAME,
+    Pa = t_spawn(A, Selective = true),
+    Pa2 = t_spawn(A, Selective),
+    Pb = t_spawn(B, Selective),
+    Pb2 = t_spawn(B, Selective),
+    RefA = t_call(Pa, {apply, gproc, monitor, [Na, follow]}),
+    RefA2 = t_call(Pa2, {apply, gproc, monitor, [Na, follow]}),
+    RefB = t_call(Pb, {apply, gproc, monitor, [Na, follow]}),
+    RefB2 = t_call(Pb2, {apply, gproc, monitor, [Na, follow]}),
+    Msg1 = {gproc, unreg, RefA, Na},
+    {Msg1, Msg1} = {got_msg(Pa), Msg1},
+    Msg2 = {gproc, unreg, RefA2, Na},
+    {Msg2, Msg2} = {got_msg(Pa2), Msg2},
+    Msg3 = {gproc, unreg, RefB, Na},
+    {Msg3, Msg3} = {got_msg(Pb), Msg3},
+    Msg4 = {gproc, unreg, RefB2, Na},
+    {Msg4, Msg4} = {got_msg(Pb2), Msg4},
+    ok = t_call(Pa, {apply, gproc, demonitor, [Na, RefA]}),
+    ok = t_call(Pb, {apply, gproc, demonitor, [Na, RefB]}),
+    Pr = t_spawn_reg(B, Na),
+    Msg5 = {gproc, registered, RefA2, Na},
+    {Msg5, Msg5} = {got_msg(Pa2), Msg5},
+    Msg6 = {gproc, registered, RefB2, Na},
+    {Msg6, Msg6} = {got_msg(Pb2), Msg6},
+    ok = no_msg(Pa, 500),
+    ok = no_msg(Pb, 500),
+    [ ok = t_call(P, die) || P <- [Pa, Pa2, Pb, Pb2, Pr]],
+    ok.
+
 t_subscribe([A,B|_] = Ns) ->
     Na = ?T_NAME,
     Pb = t_spawn(B, _Selective = true),
@@ -639,6 +670,7 @@ t_spawn_reg(Node, N, V, As) -> gproc_test_lib:t_spawn_reg(Node, N, V, As).
 t_spawn_reg_shared(Node, N, V) -> gproc_test_lib:t_spawn_reg_shared(Node, N, V).
 got_msg(P) -> gproc_test_lib:got_msg(P).
 got_msg(P, Tag) -> gproc_test_lib:got_msg(P, Tag).
+no_msg(P, Timeout) -> gproc_test_lib:no_msg(P, Timeout).
 
 t_call(P, Req) ->
     gproc_test_lib:t_call(P, Req).

+ 14 - 1
test/gproc_test_lib.erl

@@ -7,7 +7,8 @@
          t_call/2,
          t_loop/0, t_loop/1,
 	 t_pool_contains_atleast/2,
-         got_msg/1, got_msg/2]).
+         got_msg/1, got_msg/2,
+         no_msg/2]).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -139,6 +140,18 @@ got_msg(Pb, Tag) ->
 		    end
 	    end}).
 
+no_msg(Pb, Timeout) ->
+    t_call(Pb,
+           {apply_fun,
+            fun() ->
+                    receive
+                        M ->
+                            erlang:error({unexpected_msg, M})
+                    after Timeout ->
+                            ok
+                    end
+            end}).
+
 t_pool_contains_atleast(Pool,N)->
     Existing = lists:foldl(fun({_X,_Y},Acc)->
                                    Acc+1;

+ 17 - 0
test/gproc_tests.erl

@@ -152,6 +152,8 @@ reg_test_() ->
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_monitor_follow()))}
       , ?_test(t_is_clean())
+      , {spawn, ?_test(?debugVal(t_monitor_demonitor()))}
+      , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_subscribe()))}
       , ?_test(t_is_clean())
       , {spawn, ?_test(?debugVal(t_gproc_info()))}
@@ -916,6 +918,21 @@ t_monitor_follow() ->
     [exit(P,kill) || P <- [P1,P3]],
     ok.
 
+t_monitor_demonitor() ->
+    Name = ?T_NAME,
+    P1 = t_spawn(Selective = true),
+    Ref = t_call(P1, {apply, gproc, monitor, [Name, follow]}),
+    {gproc, unreg, Ref, Name} = got_msg(P1),
+    ok = t_call(P1, {apply, gproc, demonitor, [Name, Ref]}),
+    P2 = t_spawn(Selective),
+    Ref2 = t_call(P2, {apply, gproc, monitor, [Name, follow]}),
+    {gproc, unreg, Ref2, Name} = got_msg(P2),
+    P3 = t_spawn_reg(Name),
+    {gproc, registered, Ref2, Name} = got_msg(P2),
+    ok = gproc_test_lib:no_msg(P1, 300),
+    [exit(P, kill) || P <- [P1, P2, P3]],
+    ok.
+
 t_subscribe() ->
     Key = {n,l,a},
     ?assertEqual(ok, gproc_monitor:subscribe(Key)),