
More thorough gproc_dist:sync() + gproc_dist:get_leader()

gproc_dist:sync() now waits for the candidates to respond, ensuring that
when the call returns, any gproc update that happened before the call
will have been replicated to all participating nodes.

gproc_dist:get_leader() will ask the local gproc_dist server which node
is the leader. This is used in the gproc_dist_tests suite, but should be
useful in other contexts as well.
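A minimal usage sketch of the two calls together (the registered name is
hypothetical; per the doc below, sync/0 returns true on success):

    %% register a global name, then make sure the update has reached
    %% all participating nodes before relying on it elsewhere
    gproc:reg({n, g, my_service}),
    true = gproc_dist:sync(),

    %% ask the local gproc_dist server which node currently leads
    Leader = gproc_dist:get_leader().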
Ulf Wiger · 14 years ago · commit 349a90bacc
3 changed files with 105 additions and 12 deletions:

  1. doc/gproc_dist.md (+20 −4)
  2. src/gproc_dist.erl (+61 −8)
  3. test/gproc_dist_tests.erl (+24 −0)

doc/gproc_dist.md (+20 −4)

@@ -30,7 +30,7 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.



-<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#code_change-4">code_change/4</a></td><td></td></tr><tr><td valign="top"><a href="#elected-2">elected/2</a></td><td></td></tr><tr><td valign="top"><a href="#elected-3">elected/3</a></td><td></td></tr><tr><td valign="top"><a href="#from_leader-3">from_leader/3</a></td><td></td></tr><tr><td valign="top"><a href="#give_away-2">give_away/2</a></td><td></td></tr><tr><td valign="top"><a href="#handle_DOWN-3">handle_DOWN/3</a></td><td></td></tr><tr><td valign="top"><a href="#handle_call-4">handle_call/4</a></td><td></td></tr><tr><td valign="top"><a href="#handle_cast-3">handle_cast/3</a></td><td></td></tr><tr><td valign="top"><a href="#handle_info-2">handle_info/2</a></td><td></td></tr><tr><td valign="top"><a href="#handle_leader_call-4">handle_leader_call/4</a></td><td></td></tr><tr><td valign="top"><a href="#handle_leader_cast-3">handle_leader_cast/3</a></td><td></td></tr><tr><td valign="top"><a href="#init-1">init/1</a></td><td></td></tr><tr><td valign="top"><a href="#leader_call-1">leader_call/1</a></td><td></td></tr><tr><td valign="top"><a href="#leader_cast-1">leader_cast/1</a></td><td></td></tr><tr><td valign="top"><a href="#mreg-2">mreg/2</a></td><td></td></tr><tr><td valign="top"><a href="#reg-1">reg/1</a></td><td></td></tr><tr><td valign="top"><a href="#reg-2">reg/2</a></td><td>
+<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#code_change-4">code_change/4</a></td><td></td></tr><tr><td valign="top"><a href="#elected-2">elected/2</a></td><td></td></tr><tr><td valign="top"><a href="#elected-3">elected/3</a></td><td></td></tr><tr><td valign="top"><a href="#from_leader-3">from_leader/3</a></td><td></td></tr><tr><td valign="top"><a href="#get_leader-0">get_leader/0</a></td><td></td></tr><tr><td valign="top"><a href="#give_away-2">give_away/2</a></td><td></td></tr><tr><td valign="top"><a href="#handle_DOWN-3">handle_DOWN/3</a></td><td></td></tr><tr><td valign="top"><a href="#handle_call-4">handle_call/4</a></td><td></td></tr><tr><td valign="top"><a href="#handle_cast-3">handle_cast/3</a></td><td></td></tr><tr><td valign="top"><a href="#handle_info-2">handle_info/2</a></td><td></td></tr><tr><td valign="top"><a href="#handle_leader_call-4">handle_leader_call/4</a></td><td></td></tr><tr><td valign="top"><a href="#handle_leader_cast-3">handle_leader_cast/3</a></td><td></td></tr><tr><td valign="top"><a href="#init-1">init/1</a></td><td></td></tr><tr><td valign="top"><a href="#leader_call-1">leader_call/1</a></td><td></td></tr><tr><td valign="top"><a href="#leader_cast-1">leader_cast/1</a></td><td></td></tr><tr><td valign="top"><a href="#mreg-2">mreg/2</a></td><td></td></tr><tr><td valign="top"><a href="#reg-1">reg/1</a></td><td></td></tr><tr><td valign="top"><a href="#reg-2">reg/2</a></td><td>
Class = n  - unique name
| p  - non-unique property
| c  - counter
@@ -83,6 +83,16 @@ Scope = l | g (global or local).</td></tr><tr><td valign="top"><a href="#set_val

`from_leader(Ops, S, E) -> any()`

+<a name="get_leader-0"></a>
+
+<h3>get_leader/0</h3>
+
+
+
+
+
+`get_leader() -> any()`
+
<a name="give_away-2"></a>

<h3>give_away/2</h3>
@@ -111,7 +121,7 @@ Scope = l | g (global or local).</td></tr><tr><td valign="top"><a href="#set_val



-`handle_call(X1, X2, S, X4) -> any()`
+`handle_call(X1, X2, S, E) -> any()`

<a name="handle_cast-3"></a>

@@ -141,7 +151,7 @@ Scope = l | g (global or local).</td></tr><tr><td valign="top"><a href="#set_val



-`handle_leader_call(X1, From, S, E) -> any()`
+`handle_leader_call(X1, From, State, E) -> any()`

<a name="handle_leader_cast-3"></a>

@@ -279,7 +289,13 @@ Scope = l | g (global or local)<a name="set_value-2"></a>
Synchronize with the gproc leader

This function can be used to ensure that data has been replicated from the
-leader to the current node.<a name="terminate-2"></a>
+leader to the current node. It does so by asking the leader to ping all
+live participating nodes. The call will return `true` when all these nodes
+have either responded or died. In the special case where the leader dies
+during an ongoing sync, the call will fail with a timeout exception.
+(Actually, it should be a `leader_died` exception; more study needed to find out
+why gen_leader times out in this situation, rather than reporting that the
+leader died.)<a name="terminate-2"></a>

<h3>terminate/2</h3>

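A side note on the behavior documented above: since a leader death during an ongoing sync surfaces as a timeout exception, a caller that must survive leader failure may want to trap it. A minimal sketch (the function name safe_sync is hypothetical):

    safe_sync() ->
        try gproc_dist:sync()
        catch
            %% per the doc text above, gen_leader's call machinery
            %% exits with a timeout when the leader dies mid-sync
            exit:_Reason -> false
        end.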

src/gproc_dist.erl (+61 −8)

@@ -31,7 +31,8 @@

-export([leader_call/1,
	 leader_cast/1,
-	 sync/0]).
+	 sync/0,
+	 get_leader/0]).

%%% internal exports
-export([init/1,
@@ -54,7 +55,8 @@

-record(state, {
          always_broadcast = false,
-          is_leader}).
+          is_leader,
+	  sync_requests = []}).


start_link() ->
@@ -127,18 +129,29 @@ update_counter(_, _) ->
%% @doc Synchronize with the gproc leader
%%
%% This function can be used to ensure that data has been replicated from the
-%% leader to the current node.
+%% leader to the current node. It does so by asking the leader to ping all
+%% live participating nodes. The call will return `true' when all these nodes
+%% have either responded or died. In the special case where the leader dies
+%% during an ongoing sync, the call will fail with a timeout exception.
+%% (Actually, it should be a `leader_died' exception; more study needed to find out
+%% why gen_leader times out in this situation, rather than reporting that the
+%% leader died.)
%% @end
%%
sync() ->
    leader_call(sync).

+get_leader() ->
+    gen_leader:call(?MODULE, get_leader).
+
%%% ==========================================================


handle_cast(_Msg, S, _) ->
    {stop, unknown_cast, S}.

+handle_call(get_leader, _, S, E) ->
+    {reply, gen_leader:leader_node(E), S};
handle_call(_, _, S, _) ->
    {reply, badarg, S}.

@@ -181,21 +194,41 @@ surrendered(S, {globals, Globs}, _E) ->


handle_DOWN(Node, S, _E) ->
+    S1 = check_sync_requests(Node, S),
    Head = {{{'_',g,'_'},'_'},'$1','_'},
    Gs = [{'==', {node,'$1'},Node}],
    Globs = ets:select(?TAB, [{Head, Gs, [{{{element,1,{element,1,'$_'}},
                                            {element,2,'$_'}}}]}]),
    case process_globals(Globs) of
        [] ->
-            {ok, S};
+            {ok, S1};
        Broadcast ->
-            {ok, Broadcast, S}
+            {ok, Broadcast, S1}
    end.
%%     ets:select_delete(?TAB, [{Head, Gs, [true]}]),
%%     {ok, [{delete, Globs}], S}.

-handle_leader_call(sync, _From, S, _E) ->
-    {reply, true, sync, S};
+check_sync_requests(Node, #state{sync_requests = SReqs} = S) ->
+    SReqs1 = lists:flatmap(
+	       fun({From, Ns}) ->
+		       case Ns -- [Node] of
+			   [] ->
+			       gen_leader:reply(From, {leader, reply, true}),
+			       [];
+			   Ns1 ->
+			       [{From, Ns1}]
+		       end
+	       end, SReqs),
+    S#state{sync_requests = SReqs1}.
+
+handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
+    case gen_leader:alive(E) -- [node()] of
+	[] ->
+	    {reply, true, S};
+	Alive ->
+	    gen_leader:broadcast({from_leader, {sync, From}}, Alive, E),
+	    {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
+    end;
handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
    case gproc_lib:insert_reg(K, Value, Pid, g) of
	false ->
@@ -318,6 +351,25 @@ handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
handle_leader_call(_, _, S, _E) ->
    {reply, badarg, S}.

+handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
+    #state{sync_requests = SReqs} = S,
+    case lists:keyfind(Ref, 1, SReqs) of
+	false ->
+	    %% This should never happen, except perhaps if the leader who
+	    %% received the sync request died, and the new leader gets the
+	    %% sync reply. In that case, we trust that the client has been notified
+	    %% anyway, and ignore the message.
+	    {ok, S};
+	{_, Ns} ->
+	    case lists:delete(Node, Ns) of
+		[] ->
+		    gen_leader:reply(Ref, {leader, reply, true}),
+		    {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
+		Ns1 ->
+		    SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
+		    {ok, S#state{sync_requests = SReqs1}}
+	    end
+    end;
handle_leader_cast({add_globals, Missing}, S, _E) ->
    %% This is an audit message: a peer (non-leader) had info about granted
    %% global resources that we didn't know of when we became leader.
@@ -366,7 +418,8 @@ terminate(_Reason, _S) ->



-from_leader(sync, S, _E) ->
+from_leader({sync, Ref}, S, _E) ->
+    gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
    {ok, S};
from_leader(Ops, S, _E) ->
    lists:foreach(
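Taken together, the server changes above form a simple acknowledgment round trip: handle_leader_call(sync, ...) broadcasts {from_leader, {sync, From}} to every other live candidate and parks the caller in the sync_requests list; each candidate's from_leader/3 answers with a {sync_reply, node(), From} leader_cast; and the leader replies true to the caller once every node in the request's list has either acknowledged (handle_leader_cast) or gone down (check_sync_requests, called from handle_DOWN).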

test/gproc_dist_tests.erl (+24 −0)

@@ -60,6 +60,9 @@ dist_test_() ->
			       end
			      ]
		 },
+		 fun() ->
+			 ?debugVal(t_sync_cand_dies(Ns))
+		 end,
		 {timeout, 90, [fun() ->
		 			?debugVal(t_fail_node(Ns))
		 		end]}
@@ -137,6 +140,27 @@ t_sync(Ns) ->
    [?assertMatch(true, rpc:call(N, gproc_dist, sync, []))
     || N <- Ns].

+%% Verify that the gproc_dist:sync() call returns true even if a candidate dies
+%% while the sync is underway. This test makes use of sys:suspend() to ensure that
+%% the other candidate doesn't respond too quickly.
+t_sync_cand_dies([A,B|_] = Ns) ->
+    Leader = rpc:call(A, gproc_dist, get_leader, []),
+    Other = case Leader of
+		A -> B;
+		B -> A
+	    end,
+    ?assertMatch(ok, rpc:call(Other, sys, suspend, [gproc_dist])),
+    P = rpc:call(Other, erlang, whereis, [gproc_dist]),
+    Key = rpc:async_call(Leader, gproc_dist, sync, []),
+    %% The overall timeout for gproc_dist:sync() is 5 seconds. Here, we should
+    %% still be waiting.
+    ?assertMatch(timeout, rpc:nb_yield(Key, 1000)),
+    exit(P, kill),
+    %% The leader should detect that the other candidate died and respond
+    %% immediately. Therefore, we should have our answer well within 1 sec.
+    ?assertMatch({value, true}, rpc:nb_yield(Key, 1000)).
+

t_fail_node([A,B|_] = Ns) ->
    Na = ?T_NAME,
    Nb = ?T_NAME,
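A note on the test technique: rpc:async_call/4 returns a key immediately, and rpc:nb_yield/2 polls for the result without blocking indefinitely. That is what lets the test assert both that sync() is still blocked while the suspended candidate holds it up (timeout) and that it completes promptly once that candidate is killed ({value, true}). The general shape of the pattern, as a sketch (names hypothetical):

    Key = rpc:async_call(Node, Mod, Fun, Args),  %% fire the remote call, don't wait
    case rpc:nb_yield(Key, 1000) of              %% poll, giving it up to one second
        timeout    -> still_running;
        {value, V} -> V
    end.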