Browse Source

surrender/2 -> give_away/2; gproc_dist tests

Ulf Wiger 14 years ago
parent
commit
0b9d80c8ee
3 changed files with 213 additions and 52 deletions
  1. 7 2
      rebar.config
  2. 48 43
      src/gproc.erl
  3. 158 7
      src/gproc_dist.erl

+ 7 - 2
rebar.config

@@ -1,3 +1,8 @@
+%% -*- erlang -*-
 {erl_opts, [debug_info]}.
-{deps, [{edown, ".*", {git, "git://github.com/esl/edown.git", "HEAD"}}]}.
-{edoc_opts, [{doclet, edown_doclet}]}.
+{deps, [
+	{edown, ".*", {git, "git://github.com/esl/edown.git", "HEAD"}},
+	{gen_leader, ".*",
+	 {git, "git://github.com/abecciu/gen_leader_revival.git", "HEAD"}}
+       ]}.
+{edoc_opts, [{doclet, edown_doclet}]}.

+ 48 - 43
src/gproc.erl

@@ -52,7 +52,7 @@
 	 lookup_value/1,
          lookup_values/1,
          update_counter/2,
-	 surrender/2,
+	 give_away/2,
          send/2,
          info/1, info/2,
          select/1, select/2, select/3,
@@ -641,22 +641,22 @@ update_counter(_, _) ->
 %%
 %% `To' must be either a pid or a unique name (name or aggregated counter), but
 %% does not necessarily have to resolve to an existing process. If there is 
-%% no process registered with the `To' key, `surrender/2' returns `undefined',
+%% no process registered with the `To' key, `give_away/2' returns `undefined',
 %% and the `From' key is effectively unregistered.
 %%
-%% It is allowed to surrender a key to oneself, but of course, this operation
+%% It is allowed to give away a key to oneself, but of course, this operation
 %% will have no effect.
 %%
 %% Fails with `badarg' if the calling process does not have a `From' key 
 %% registered.
 %% @end
-surrender({_,l,_} = Key, ToPid) when is_pid(ToPid), node(ToPid) == node() ->
-    call({surrender, Key, ToPid});
-surrender({_,l,_} = Key, {n,l,_} = ToKey) ->
-    call({surrender, Key, ToKey});
-surrender({_,g,_} = Key, To) ->
+give_away({_,l,_} = Key, ToPid) when is_pid(ToPid), node(ToPid) == node() ->
+    call({give_away, Key, ToPid});
+give_away({_,l,_} = Key, {n,l,_} = ToKey) ->
+    call({give_away, Key, ToKey});
+give_away({_,g,_} = Key, To) ->
     ?CHK_DIST,
-    gproc_dist:surrender(Key, To).
+    gproc_dist:give_away(Key, To).
 
 %% @spec (Key::key(), Msg::any()) -> Msg
 %%
@@ -887,8 +887,8 @@ handle_call({audit_process, Pid}, _, S) ->
 	    ignore
     end,
     {reply, ok, S};
-handle_call({surrender, Key, To}, {Pid,_}, S) ->
-    Reply = do_surrender(Key, To, Pid),
+handle_call({give_away, Key, To}, {Pid,_}, S) ->
+    Reply = do_give_away(Key, To, Pid),
     {reply, Reply, S};
 handle_call(_, _, S) ->
     {reply, badarg, S}.
@@ -1011,14 +1011,14 @@ process_is_down(Pid) ->
     ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
     ok.
 
-do_surrender({T,l,_} = K, To, Pid) when T==n; T==a ->
+do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
     Key = {K, T},
     case ets:lookup(?TAB, Key) of
 	[{_, Pid, Value}] ->
-	    %% Pid owns the reg; allowed to surrender
-	    case pid_to_surrender_to(To) of
+	    %% Pid owns the reg; allowed to give_away
+	    case pid_to_give_away_to(To) of
 		Pid ->
-		    %% Surrender to ourselves? Why not? We'll allow it,
+		    %% Give away to ourselves? Why not? We'll allow it,
 		    %% but nothing needs to be done.
 		    Pid;
 		ToPid when is_pid(ToPid) ->
@@ -1034,19 +1034,24 @@ do_surrender({T,l,_} = K, To, Pid) when T==n; T==a ->
 	_ ->
 	    badarg
     end;
-do_surrender({T,l,_} = K, To, Pid) when T==c; T==p ->
+do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
     Key = {K, Pid},
     case ets:lookup(?TAB, Key) of
 	[{_, Pid, Value}] ->
-	    case pid_to_surrender_to(To) of
+	    case pid_to_give_away_to(To) of
 		ToPid when is_pid(ToPid) ->
 		    ToKey = {K, ToPid},
-		    ets:insert(?TAB, [{ToKey, ToPid, Value},
-				      {{ToPid, K}, r}]),
-		    ets:delete(?TAB, {Pid, K}),
-		    ets:delete(?TAB, Key),
-		    gproc_lib:ensure_monitor(ToPid, l),
-		    ToPid;
+		    case ets:member(?TAB, ToKey) of
+			true ->
+			    badarg;
+			false ->
+			    ets:insert(?TAB, [{ToKey, ToPid, Value},
+					      {{ToPid, K}, r}]),
+			    ets:delete(?TAB, {Pid, K}),
+			    ets:delete(?TAB, Key),
+			    gproc_lib:ensure_monitor(ToPid, l),
+			    ToPid
+		    end;
 		undefined ->
 		    gproc_lib:remove_reg(K, Pid),
 		    undefined
@@ -1056,9 +1061,9 @@ do_surrender({T,l,_} = K, To, Pid) when T==c; T==p ->
     end.
 			
 
-pid_to_surrender_to(P) when is_pid(P), node(P) == node() ->		    
+pid_to_give_away_to(P) when is_pid(P), node(P) == node() ->		    
     P;
-pid_to_surrender_to({T,l,_} = Key) when T==n; T==a ->
+pid_to_give_away_to({T,l,_} = Key) when T==n; T==a ->
     case ets:lookup(?TAB, {Key, T}) of
 	[{_, Pid, _}] ->
 	    Pid;
@@ -1391,15 +1396,15 @@ reg_test_() ->
       , ?_test(t_is_clean())
       , {spawn, ?_test(t_cancel_wait_and_register())}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_surrender_to_pid())}
+      , {spawn, ?_test(t_give_away_to_pid())}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_surrender_to_self())}
+      , {spawn, ?_test(t_give_away_to_self())}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_surrender_badarg())}
+      , {spawn, ?_test(t_give_away_badarg())}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_surrender_to_unknown())}
+      , {spawn, ?_test(t_give_away_to_unknown())}
       , ?_test(t_is_clean())
-      , {spawn, ?_test(t_surrender_and_back())}
+      , {spawn, ?_test(t_give_away_and_back())}
       , ?_test(t_is_clean())
      ]}.
 
@@ -1487,57 +1492,57 @@ t_cancel_wait_and_register() ->
     end.
 
 
-t_surrender_to_pid() ->
+t_give_away_to_pid() ->
     From = {n, l, foo},
     Me = self(),
     P = spawn_link(fun t_loop/0),
     ?assertEqual(true, gproc:reg(From, undefined)),
     ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(P, gproc:surrender(From, P)),
+    ?assertEqual(P, gproc:give_away(From, P)),
     ?assertEqual(P, gproc:where(From)),
     ?assertEqual(ok, t_call(P, die)).
 
-t_surrender_to_self() ->
+t_give_away_to_self() ->
     From = {n, l, foo},
     Me = self(),
     ?assertEqual(true, gproc:reg(From, undefined)),
     ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(Me, gproc:surrender(From, Me)),
+    ?assertEqual(Me, gproc:give_away(From, Me)),
     ?assertEqual(Me, gproc:where(From)),
     ?assertEqual(true, gproc:unreg(From)).
 
-t_surrender_badarg() ->
+t_give_away_badarg() ->
     From = {n, l, foo},
     Me = self(),
     ?assertEqual(undefined, gproc:where(From)),
-    ?assertError(badarg, gproc:surrender(From, Me)).
+    ?assertError(badarg, gproc:give_away(From, Me)).
 
-t_surrender_to_unknown() ->
+t_give_away_to_unknown() ->
     From = {n, l, foo},
     Unknown = {n, l, unknown},
     Me = self(),
     ?assertEqual(true, gproc:reg(From, undefined)),
     ?assertEqual(Me, gproc:where(From)),
     ?assertEqual(undefined, gproc:where(Unknown)),
-    ?assertEqual(undefined, gproc:surrender(From, Unknown)),
+    ?assertEqual(undefined, gproc:give_away(From, Unknown)),
     ?assertEqual(undefined, gproc:where(From)).
 
-t_surrender_and_back() ->
+t_give_away_and_back() ->
     From = {n, l, foo},
     Me = self(),
     P = spawn_link(fun t_loop/0),
     ?assertEqual(true, gproc:reg(From, undefined)),
     ?assertEqual(Me, gproc:where(From)),
-    ?assertEqual(P, gproc:surrender(From, P)),
+    ?assertEqual(P, gproc:give_away(From, P)),
     ?assertEqual(P, gproc:where(From)),
-    ?assertEqual(ok, t_call(P, {surrender, From})),
+    ?assertEqual(ok, t_call(P, {give_away, From})),
     ?assertEqual(Me, gproc:where(From)),
     ?assertEqual(ok, t_call(P, die)).
 
 t_loop() ->
     receive
-	{From, {surrender, Key}} ->
-	    ?assertEqual(From, gproc:surrender(Key, From)),
+	{From, {give_away, Key}} ->
+	    ?assertEqual(From, gproc:give_away(Key, From)),
 	    From ! {self(), ok},
 	    t_loop();
 	{From, die} ->

+ 158 - 7
src/gproc_dist.erl

@@ -26,7 +26,7 @@
 	 reg/1, reg/2, unreg/1,
 	 mreg/2,
 	 set_value/2,
-	 surrender/2,
+	 give_away/2,
 	 update_counter/2]).
 
 -export([leader_call/1, leader_cast/1]).
@@ -48,6 +48,11 @@
 
 -include("gproc.hrl").
 
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-export([t_spawn/1, t_spawn_reg/2]).
+-endif.
+
 -define(SERVER, ?MODULE).
 
 -record(state, {
@@ -110,8 +115,8 @@ set_value({_,g,_} = Key, Value) ->
 set_value(_, _) ->
     erlang:error(badarg).
 
-surrender({_,g,_} = Key, To) ->
-    leader_call({surrender, Key, To, self()}).
+give_away({_,g,_} = Key, To) ->
+    leader_call({give_away, Key, To, self()}).
 
 
 update_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
@@ -231,12 +236,12 @@ handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
 	false ->
 	    {reply, badarg, S}
     end;
-handle_leader_call({surrender, {T,g,_} = K, To, Pid}, _From, S, _E)
+handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
   when T == a; T == n ->
     Key = {K, T},
     case ets:lookup(?TAB, Key) of
 	[{_, Pid, Value}] ->
-	    case pid_to_surrender_to(To) of 
+	    case pid_to_give_away_to(To) of 
 		Pid ->
 		    {reply, Pid, S};
 		ToPid when is_pid(ToPid) ->
@@ -459,12 +464,158 @@ update_aggr_counter({c,g,Ctr}, Incr) ->
             [New]
     end.
 
-pid_to_surrender_to(P) when is_pid(P) ->                 
+pid_to_give_away_to(P) when is_pid(P) ->                 
     P;
-pid_to_surrender_to({T,g,_} = Key) when T==n; T==a ->
+pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
     case ets:lookup(?TAB, {Key, T}) of
         [{_, Pid, _}] ->
             Pid;
         _ ->
             undefined
     end.
+
+-ifdef(TEST).
+
+dist_test_() ->
+    {foreach,
+     fun() ->
+	     Ns = start_slaves([n1, n2, n3]),
+	     %% dbg:tracer(),
+	     %% [dbg:n(N) || N <- Ns],
+	     %% dbg:tpl(gproc_dist, x),
+	     %% dbg:p(all,[c]),
+	     Ns
+     end,
+     fun(Ns) ->
+	     ?debugVal([rpc:call(N, init, stop, []) || N <- Ns])
+     end,
+     [
+      {with, [fun t_simple_reg/1,
+	      fun t_await_reg/1,
+	      fun t_give_away/1]}
+     ]}.
+
+t_simple_reg([H|_] = Ns) ->
+    ?debugMsg(t_simple_reg),
+    Name = {n, g, foo},
+    P = t_spawn_reg(H, Name),
+    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P)),
+    ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})),
+    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
+    ?assertMatch(ok, t_call(P, die)).
+
+t_await_reg([A,B|_]) ->
+    ?debugMsg(t_await_reg),
+    Name = {n, g, foo},
+    P = t_spawn(A),
+    P ! {self(), {apply, gproc, await, [Name]}},
+    P1 = t_spawn_reg(B, Name),
+    ?assert(P1 == receive
+		      {P, Res} ->
+			  element(1, Res)
+		  end),
+    ?assertMatch(ok, t_call(P, die)),
+    ?assertMatch(ok, t_call(P1, die)).
+
+t_give_away([A,B|_] = Ns) ->
+    ?debugMsg(t_give_away),
+    Na = {n, g, a},
+    Nb = {n, g, b},
+    Pa = t_spawn_reg(A, Na),
+    Pb = t_spawn_reg(B, Nb),
+    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
+    ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
+    ?debugHere,
+    ?assertMatch(Pb, ?debugVal(
+			t_call(Pa, {apply, {gproc, give_away, [Na, Nb]}}))),
+    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
+    ?debugHere,
+    ?assertMatch(Pa, t_call(Pa, {apply, {gproc, give_away, [Na, Pa]}})),
+    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
+    ?debugHere,
+    ?assertMatch(ok, t_call(Pa, die)),
+    ?assertMatch(ok, t_call(Pb, die)).
+    
+t_sleep() ->
+    timer:sleep(1000).
+
+t_lookup_everywhere(Key, Nodes, Exp) ->
+    t_lookup_everywhere(Key, Nodes, Exp, 3).
+
+t_lookup_everywhere(Key, _, Exp, 0) ->
+    {lookup_failed, Key, Exp};
+t_lookup_everywhere(Key, Nodes, Exp, I) ->
+    Expected = [{N, Exp} || N <- Nodes],
+    Found = [{N,rpc:call(N, gproc, where, [Key])} || N <- Nodes],
+    if Expected =/= Found ->
+	    ?debugFmt("lookup ~p failed (~p), retrying...~n", [Key, Found]),
+	    t_sleep(),
+	    t_lookup_everywhere(Key, Nodes, Exp, I-1);
+       true ->
+	    ok
+    end.
+				  
+
+t_spawn(Node) ->
+    Me = self(),
+    P = spawn(Node, fun() ->
+			    Me ! {self(), ok},
+			    t_loop()
+		    end),
+    receive
+	{P, ok} -> P
+    end.
+
+t_spawn_reg(Node, Name) ->
+    Me = self(),
+    spawn(Node, fun() ->
+			?assertMatch(true, gproc:reg(Name)),
+			Me ! {self(), ok},
+			t_loop()
+		end),
+    receive
+	{P, ok} -> P
+    end.
+
+t_call(P, Req) ->
+    P ! {self(), Req},
+    receive
+	{P, Res} ->
+	    Res
+    end.
+
+t_loop() ->
+    receive
+	{From, die} ->
+	    From ! {self(), ok};
+	{From, {apply, M, F, A}} ->
+	    From ! {self(), apply(M, F, A)},
+	    t_loop()
+    end.
+
+start_slaves(Ns) ->
+    [H|T] = Nodes = [start_slave(N) || N <- Ns],
+    ?debugVal([pong = rpc:call(H, net, ping, [N]) || N <- T]),
+    ?debugVal(rpc:multicall(Nodes, application, start, [gproc])),
+    Nodes.
+	       
+start_slave(Name) ->
+    case node() of
+        nonode@nohost ->
+            os:cmd("epmd -daemon"),
+            {ok, _} = net_kernel:start([gproc_master, shortnames]);
+        _ ->
+            ok
+    end,
+    {ok, Node} = slave:start(
+		   host(), Name,
+		   "-pa . -pz ../ebin -pa ../deps/gen_leader/ebin "
+		   "-gproc gproc_dist all"),
+    io:fwrite(user, "Slave node: ~p~n", [Node]),
+    Node.
+
+host() ->
+    [Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]),
+    list_to_atom(Host).
+
+-endif.