Browse Source

Merge pull request #108 from uwiger/uw-unreg-inconsistency

Address issue where crashed process wasn't unregged on remote node
Ulf Wiger 9 years ago
parent
commit
58b75ef086
3 changed files with 129 additions and 114 deletions
  1. 24 13
      src/gproc_dist.erl
  2. 5 0
      src/gproc_trace.hrl
  3. 100 101
      test/gproc_dist_tests.erl

+ 24 - 13
src/gproc_dist.erl

@@ -71,6 +71,7 @@
           is_leader,
           sync_requests = []}).
 
+-include("gproc_trace.hrl").
 %% ==========================================================
 %% Start functions
 
@@ -522,22 +523,22 @@ handle_leader_call({Unreg, {T,g,Name} = K, Pid}, _From, S, _E)
                     case ets:lookup(?TAB, {{a,g,Name},a}) of
                         [Aggr] ->
                             %% updated by remove_reg/3
-                            {reply, true, [{delete,[Key, {Pid,K}]},
+                            {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
                                            {insert, [Aggr]}], S};
                         [] ->
-                            {reply, true, [{delete, [Key, {Pid,K}]}], S}
+                            {reply, true, [{delete, [{K,Pid}, {Pid,K}]}], S}
                     end;
                T == r ->
                     case ets:lookup(?TAB, {{rc,g,Name},rc}) of
                         [RC] ->
-                            {reply, true, [{delete,[Key, {Pid,K}]},
+                            {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
                                            {insert, [RC]}], S};
                         [] ->
-                            {reply, true, [{delete, [Key, {Pid, K}]}], S}
+                            {reply, true, [{delete, [{K,Pid}, {Pid, K}]}], S}
                     end;
                true ->
                     {reply, true, [{notify, [{K, Pid, unreg}]},
-                                   {delete, [Key, {Pid,K}]}], S}
+                                   {delete, [{K, Pid}, {Pid,K}]}], S}
             end;
         false ->
             {reply, badarg, S}
@@ -560,14 +561,14 @@ handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
                     gproc_lib:notify({migrated, ToPid}, K, Opts),
                     {reply, ToPid, [{insert, [{Key, ToPid, Value}]},
                                     {notify, [{K, Pid, {migrated, ToPid}}]},
-				    {delete, [Rev]}], S};
+				    {delete, [{K, Pid}, Rev]}], S};
                 undefined ->
                     ets:delete(?TAB, Key),
                     Rev = {Pid, K},
                     ets:delete(?TAB, Rev),
                     gproc_lib:notify(unreg, K, Opts),
                     {reply, undefined, [{notify, [{K, Pid, unreg}]},
-                                        {delete, [Key, Rev]}], S}
+                                        {delete, [{K, Pid}, Rev]}], S}
             end;
         _ ->
             {reply, badarg, S}
@@ -771,12 +772,15 @@ filter_standbys([], _) ->
 remove_entry(Key, Pid, Event) ->
     K = ets_key(Key, Pid),
     case ets:lookup(?TAB, K) of
-	[{_, Pid, _}] ->
+	[{_, P, _}] when is_pid(P), P =:= Pid; is_atom(Pid) ->
 	    ets:delete(?TAB, K),
 	    remove_rev_entry(get_opts(Pid, Key), Pid, Key, Event);
 	[{_, _OtherPid, _}] ->
 	    ets:delete(?TAB, {Pid, Key}),
 	    [];
+        [{_, _Waiters}] ->
+            %% Skip
+            [];
 	[] -> []
     end.
 
@@ -834,8 +838,11 @@ insert_globals(Globals) ->
 
 delete_globals(Globals) ->
     lists:foreach(
-      fun({{_,g,_},T} = K) when is_atom(T); is_pid(T) ->
-              ets:delete(?TAB, K);
+      fun({{_,g,_} = K, T}) when is_atom(T); is_pid(T) ->
+              remove_entry(K, T, []);
+         ({{{_,g,_} = K, T}, P}) when is_pid(P), is_atom(T);
+                                          is_pid(P), is_pid(T) ->
+	      remove_entry(K, P, []);
          ({Pid, Key}) when is_pid(Pid); Pid==shared ->
 	      ets:delete(?TAB, {Pid, Key})
       end, Globals).
@@ -878,8 +885,9 @@ surrendered_1(Globs) ->
     My_local_globs =
         ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '$2'},
                            [{'==', {node,'$1'}, node()}],
-                           [{{ {element,1,{element,1,'$_'}}, '$1', '$2' }}]}]),
+                           [{{ {element,1,'$_'}, '$1', '$2' }}]}]),
     _ = [gproc_lib:ensure_monitor(Pid, g) || {_, Pid, _} <- My_local_globs],
+    ?event({'My_local_globs', My_local_globs}),
     %% remove all remote globals.
     ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'}, '$1', '_'},
                               [{'=/=', {node,'$1'}, node()}],
@@ -902,6 +910,7 @@ surrendered_1(Globs) ->
              ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
                   [Obj|Acc]
           end, [], Globs),
+    ?event({'Ldr_local_globs', Ldr_local_globs}),
     case [{K,P,V} || {K,P,V} <- My_local_globs,
 		     is_pid(P) andalso
 			 not(lists:keymember(K, 1, Ldr_local_globs))] of
@@ -910,14 +919,16 @@ surrendered_1(Globs) ->
             ok;
         [_|_] = Missing ->
             %% This is very unlikely, I think
+            ?event({'Missing', Missing}),
             leader_cast({add_globals, mk_broadcast_insert_vals(Missing)})
     end,
-    case [{K,P} || {K,P,_} <- Ldr_local_globs,
+    case [{K,P} || {{K,_}=R,P,_} <- Ldr_local_globs,
 		   is_pid(P) andalso
-		       not(lists:keymember(K, 1, My_local_globs))] of
+		       not(lists:keymember(R, 1, My_local_globs))] of
         [] ->
             ok;
         [_|_] = Remove ->
+            ?event({'Remove', Remove}),
             leader_cast({remove_globals, Remove})
     end.
 

+ 5 - 0
src/gproc_trace.hrl

@@ -0,0 +1,5 @@
+-define(event(E), event(?LINE, E, [])).
+-define(event(E, S), event(?LINE, E, S)).
+
+event(_L, _E, _S) ->
+    ok.

+ 100 - 101
test/gproc_dist_tests.erl

@@ -22,109 +22,82 @@
 -include_lib("eunit/include/eunit.hrl").
 -export([t_spawn/1, t_spawn_reg/2]).
 
+-define(f(E), fun() -> ?debugVal(E) end).
+
 dist_test_() ->
     {timeout, 120,
-     [{setup,
-       fun() ->
-               case run_dist_tests() of
-                   true ->
-                       Ns = start_slaves([dist_test_n1, dist_test_n2]),
-                       ?assertMatch({[ok,ok],[]},
-                                    rpc:multicall(Ns, application, set_env,
-                                                  [gproc, gproc_dist, Ns])),
-                       ?assertMatch({[ok,ok],[]},
-                                    rpc:multicall(
-                                      Ns, application, start, [gproc])),
-                       Ns;
-                   false ->
-                       skip
-               end
-       end,
-       fun(_Ns) ->
-               ok
-       end,
-       fun(skip) -> [];
-          (Ns) when is_list(Ns) ->
-               {inorder,
-                [
-                 {inorder, [
-                               fun() ->
-                                       ?debugVal(t_simple_reg(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_simple_reg_other(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_simple_reg_or_locate(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_simple_counter(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_aggr_counter(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_awaited_aggr_counter(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_simple_resource_count(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_awaited_resource_count(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_resource_count_on_zero(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_update_counters(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_shared_counter(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_prop(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_mreg(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_await_reg(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_await_self(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_await_reg_exists(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_give_away(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_sync(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_monitor(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_standby_monitor(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_follow_monitor(Ns))
-                               end,
-                               fun() ->
-                                       ?debugVal(t_subscribe(Ns))
-                               end
-                           ]
-                 },
-                 fun() ->
-                         ?debugVal(t_sync_cand_dies(Ns))
-                 end,
-                 {timeout, 90, [fun() ->
-                                        ?debugVal(t_fail_node(Ns))
-                                end]}
-                ]}
-       end
-      }]}.
+     [
+      %% {setup,
+      %%  fun dist_setup/0,
+      %%  fun dist_cleanup/1,
+      %%  fun(skip) -> [];
+      %%     (Ns) when is_list(Ns) ->
+      %%          {inorder, basic_tests(Ns)}
+      %%  end
+      %% },
+      {foreach,
+       fun dist_setup/0,
+       fun dist_cleanup/1,
+       [
+        fun(Ns) ->
+                [{inorder, basic_tests(Ns)}]
+        end,
+        fun(Ns) ->
+                [?f(t_sync_cand_dies(Ns))]
+        end,
+        fun(Ns) ->
+                [?f(t_fail_node(Ns))]
+        end,
+        fun(Ns) ->
+                [?f(t_master_dies(Ns))]
+        end
+       ]}
+     ]}.
+
+basic_tests(Ns) ->
+    [
+     ?f(t_simple_reg(Ns)),
+     ?f(t_simple_reg_other(Ns)),
+     ?f(t_simple_reg_or_locate(Ns)),
+     ?f(t_simple_counter(Ns)),
+     ?f(t_aggr_counter(Ns)),
+     ?f(t_awaited_aggr_counter(Ns)),
+     ?f(t_simple_resource_count(Ns)),
+     ?f(t_awaited_resource_count(Ns)),
+     ?f(t_resource_count_on_zero(Ns)),
+     ?f(t_update_counters(Ns)),
+     ?f(t_shared_counter(Ns)),
+     ?f(t_prop(Ns)),
+     ?f(t_mreg(Ns)),
+     ?f(t_await_reg(Ns)),
+     ?f(t_await_self(Ns)),
+     ?f(t_await_reg_exists(Ns)),
+     ?f(t_give_away(Ns)),
+     ?f(t_sync(Ns)),
+     ?f(t_monitor(Ns)),
+     ?f(t_standby_monitor(Ns)),
+     ?f(t_follow_monitor(Ns)),
+     ?f(t_subscribe(Ns))
+    ].
+
+dist_setup() ->
+    case run_dist_tests() of
+        true ->
+            Ns = start_slaves([dist_test_n1, dist_test_n2, dist_test_n3]),
+            ?assertMatch({[ok,ok,ok],[]},
+                         rpc:multicall(Ns, application, set_env,
+                                       [gproc, gproc_dist, Ns])),
+            ?assertMatch({[ok,ok,ok],[]},
+                         rpc:multicall(
+                           Ns, application, start, [gproc])),
+            Ns;
+        false ->
+            skip
+    end.
+
+dist_cleanup(Ns) ->
+    [slave:stop(N) || N <- Ns],
+    ok.
 
 run_dist_tests() ->
     case os:getenv("GPROC_DIST") of
@@ -530,6 +503,32 @@ t_fail_node([A,B|_] = Ns) ->
     ?assertMatch(ok, t_call(Pa, die)),
     ?assertMatch(ok, t_call(Pb, die)).
 
+t_master_dies([A,B,C] = Ns) ->
+    Na = ?T_NAME,
+    Nb = ?T_NAME,
+    Nc = ?T_NAME,
+    Pa = t_spawn_reg(A, Na),
+    Pb = t_spawn_reg(B, Nb),
+    Pc = t_spawn_reg(C, Nc),
+    L = rpc:call(A, gproc_dist, get_leader, []),
+    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
+    ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
+    ?assertMatch(ok, t_lookup_everywhere(Nc, Ns, Pc)),
+    {Nl, Pl} = case L of
+                   A -> {Na, Pa};
+                   B -> {Nb, Pb};
+                   C -> {Nc, Pc}
+               end,
+    ?assertMatch(true, rpc:call(A, gproc_dist, sync, [])),
+    ?assertMatch(ok, rpc:call(L, application, stop, [gproc])),
+    Names = [{Na,Pa}, {Nb,Pb}, {Nc,Pc}] -- [{Nl, Pl}],
+    RestNs = Ns -- [L],
+    ?assertMatch(true, rpc:call(hd(RestNs), gproc_dist, sync, [])),
+    ?assertMatch(ok, t_lookup_everywhere(Nl, RestNs, undefined)),
+    [?assertMatch(ok, t_lookup_everywhere(Nx, RestNs, Px))
+     || {Nx, Px} <- Names],
+    ok.
+
 t_sleep() ->
     timer:sleep(500).