Просмотр исходного кода

Rewrite gproc_dist:sync() to hopefully be more robust

The old version used gen_leader:leader_call(), which doesn't do well
if the leader dies, or similar things happen. The new solution calls
the local gproc_dist, which remembers the request and casts to the
leader (assuming it's not itself the leader). The leader performs
the sync as before, then notifies the initiating node that the sync
is complete. If a new leader is elected, the other nodes check
pending sync requests and re-initiate them.

Timeouts, which were recently increased, have been returned to their
previous values.
Ulf Wiger 7 лет назад
Родитель
Commit
9c9c0cafaf
2 измененных файлов с 105 добавлено и 38 удалено
  1. 101 35
      src/gproc_dist.erl
  2. 4 3
      test/gproc_dist_tests.erl

+ 101 - 35
src/gproc_dist.erl

@@ -69,6 +69,7 @@
 -record(state, {
           always_broadcast = false,
           is_leader,
+          sync_clients = [],
           sync_requests = []}).
 
 -include("gproc_trace.hrl").
@@ -266,7 +267,7 @@ reset_counter(_) ->
 %%
 sync() ->
     %% Increase timeout since gen_leader can take some time ...
-    leader_call(sync, 10000).
+    gen_server:call(?MODULE, sync, 5000).
 
 %% @spec get_leader() -> node()
 %% @doc Returns the node of the current gproc leader.
@@ -283,6 +284,8 @@ handle_cast(_Msg, S, _) ->
 
 handle_call(get_leader, _, S, E) ->
     {reply, gen_leader:leader_node(E), S};
+handle_call(sync, From, S, E) ->
+    {noreply, initiate_sync(From, S, E)};
 handle_call(_, _, S, _) ->
     {reply, badarg, S}.
 
@@ -326,16 +329,16 @@ globs() ->
 surrendered(#state{is_leader = true} = S, {globals, Globs}, _E) ->
     %% Leader conflict!
     surrendered_1(Globs),
-    {ok, S#state{is_leader = false}};
+    {ok, maybe_reinitiate_sync(S#state{is_leader = false})};
 surrendered(S, {globals, Globs}, _E) ->
     %% globals from this node should be more correct in our table than
     %% in the leader's
     surrendered_1(Globs),
-    {ok, S#state{is_leader = false}}.
+    {ok, maybe_reinitiate_sync(S#state{is_leader = false})}.
 
 
-handle_DOWN(Node, S, _E) ->
-    S1 = check_sync_requests(Node, S),
+handle_DOWN(Node, S, E) ->
+    S1 = check_sync_requests(Node, S, E),
     Head = {{{'_',g,'_'},'_'},'$1','_'},
     Gs = [{'==', {node,'$1'},Node}],
     Globs = ets:select(?TAB, [{Head, Gs, [{{{element,1,{element,1,'$_'}},
@@ -347,28 +350,31 @@ handle_DOWN(Node, S, _E) ->
             {ok, Broadcast, S1}
     end.
 
-check_sync_requests(Node, #state{sync_requests = SReqs} = S) ->
-    SReqs1 = lists:flatmap(
-               fun({From, Ns}) ->
-                       case Ns -- [Node] of
-                           [] ->
-                               gen_leader:reply(From, {leader, reply, true}),
-                               [];
-                           Ns1 ->
-                               [{From, Ns1}]
-                       end
-               end, SReqs),
-    S#state{sync_requests = SReqs1}.
-
-handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
-    GenLeader = gen_leader,
-    case GenLeader:alive(E) -- [node()] of
+check_sync_requests(Node, #state{sync_requests = SReqs} = S, E) ->
+    check_sync_requests(SReqs, Node, S, E).
+
+check_sync_requests([], _, S, _) ->
+    S;
+check_sync_requests([{From, Ns}|Reqs], Node, S, E) ->
+    case lists:member(Node, Ns) of
+        true ->
+            remove_node_from_sync_request(Node, Ns, From, S, E);
+        false ->
+            check_sync_requests(Reqs, Node, S, E)
+    end.
+
+remove_node_from_sync_request(Node, Ns, From, S, E) ->
+    case Ns -- [Node] of
         [] ->
-            {reply, true, S};
-        Alive ->
-            GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
-            {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
-    end;
+            check_sync_requests(Node, send_sync_complete(From, S, E), E);
+        Ns1 ->
+            Rs1 = lists:keyreplace(
+                    From, 1, S#state.sync_requests, {From, Ns1}),
+            %% Yes, we start over and run through the list from the top,
+            %% with updated state; simpler code that way.
+            check_sync_requests(Node, S#state{sync_requests = Rs1}, E)
+    end.
+
 handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E)
   when Reg==reg; Reg==reg_other ->
     case gproc_lib:insert_reg(K, Value, Pid, g) of
@@ -667,7 +673,18 @@ handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
 handle_leader_call(_, _, S, _E) ->
     {reply, badarg, S}.
 
-handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
+handle_leader_cast({initiate_sync, Ref}, S, E) ->
+    case gen_leader:alive(E) -- [node()] of
+        [] ->
+            %% ???
+            {noreply, send_sync_complete(Ref, S, E)};
+        Alive ->
+            gen_leader:broadcast({from_leader, {sync, Ref}}, Alive, E),
+            {noreply, S#state{sync_requests =
+                                  [{Ref, Alive}|S#state.sync_requests]}}
+    end;
+
+handle_leader_cast({sync_reply, Node, Ref}, S, E) ->
     #state{sync_requests = SReqs} = S,
     case lists:keyfind(Ref, 1, SReqs) of
         false ->
@@ -679,8 +696,7 @@ handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
         {_, Ns} ->
             case lists:delete(Node, Ns) of
                 [] ->
-                    gen_leader:reply(Ref, {leader, reply, true}),
-                    {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
+                    {ok, send_sync_complete(Ref, S, E)};
                 Ns1 ->
                     SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
                     {ok, S#state{sync_requests = SReqs1}}
@@ -843,6 +859,16 @@ terminate(_Reason, _S) ->
 from_leader({sync, Ref}, S, _E) ->
     gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
     {ok, S};
+from_leader({sync_complete, Ref}, S, _E) ->
+    case Ref of
+        {From, _} when node(From) == node() ->
+            {ok, reply_to_sync_client(Ref, S)};
+        _ ->
+            %% we shouldn't have to, but ensure that we don't have
+            %% the sync request in our state.
+            {ok, S#state{sync_requests = lists:keydelete(
+                                           Ref, 1, S#state.sync_requests)}}
+    end;
 from_leader(Ops, S, _E) ->
     lists:foreach(
       fun({delete, Globals}) ->
@@ -887,7 +913,7 @@ delete_globals(Globals) ->
 do_notify([{P, Msg}|T]) when is_pid(P), node(P) =:= node() ->
     P ! Msg,
     do_notify(T);
-do_notify([{P, Msg}|T]) when is_pid(P) ->
+do_notify([{P, _Msg}|T]) when is_pid(P) ->
     do_notify(T);
 do_notify([{K, P, E}|T]) ->
     case ets:lookup(?TAB, {P,K}) of
@@ -911,11 +937,11 @@ leader_call(Req) ->
         Reply  -> Reply
     end.
 
-leader_call(Req, Timeout) ->
-    case gen_leader:leader_call(?MODULE, Req, Timeout) of
-        badarg -> ?THROW_GPROC_ERROR(badarg);
-        Reply  -> Reply
-    end.
+%% leader_call(Req, Timeout) ->
+%%     case gen_leader:leader_call(?MODULE, Req, Timeout) of
+%%         badarg -> ?THROW_GPROC_ERROR(badarg);
+%%         Reply  -> Reply
+%%     end.
 
 leader_cast(Msg) ->
     gen_leader:leader_cast(?MODULE, Msg).
@@ -1118,3 +1144,43 @@ ensure_rev(K) ->
 
 regged_new(reg   ) -> true;
 regged_new(ensure) -> new.
+
+
+initiate_sync(From, #state{is_leader = true} = S, E) ->
+    case gen_leader:alive(E) -- [node()] of
+        [] ->
+            %% I'm alone - sync is trivial
+            gen_server:reply(From, true),
+            S;
+        Alive ->
+            gen_leader:broadcast(
+              {from_leader, {sync, From}}, Alive, E),
+            S#state{sync_requests =
+                        [{From, Alive}|S#state.sync_requests]}
+    end;
+initiate_sync(From, S, _E) ->
+    leader_cast({initiate_sync, From}),
+    S.
+
+maybe_reinitiate_sync(#state{sync_clients = []} = S) ->
+    S;
+maybe_reinitiate_sync(#state{sync_clients = Cs} = S) ->
+    _ = [leader_cast({initiate_sync, From}) || From <- Cs],
+    S.
+
+send_sync_complete({From, _} = Ref, S, _E) when node(From) == node() ->
+    reply_to_sync_client(Ref, S);
+send_sync_complete({From, _} = Ref, S, E) ->
+    %% Notify the node that initiated the sync
+    %% 'broadcasting' to exactly one node.
+    gen_leader:broadcast(
+      {from_leader, {sync_complete, Ref}}, [node(From)], E),
+    S#state{sync_requests =
+                lists:keydelete(Ref, 1, S#state.sync_requests)}.
+
+reply_to_sync_client(Ref, S) ->
+    gen_server:reply(Ref, true),
+    S#state{sync_clients =
+                S#state.sync_clients -- [Ref],
+            sync_requests =
+                lists:keydelete(Ref, 1, S#state.sync_requests)}.

+ 4 - 3
test/gproc_dist_tests.erl

@@ -25,7 +25,7 @@
 -define(f(E), fun() -> ?debugVal(E) end).
 
 dist_test_() ->
-    {timeout, 180,
+    {timeout, 120,
      [
       %% {setup,
       %%  fun dist_setup/0,
@@ -49,7 +49,7 @@ dist_test_() ->
                 tests(Ns, [?f(t_fail_node(Ns))])
         end,
         fun(Ns) ->
-                tests(Ns, [{timeout, 15, ?f(t_master_dies(Ns))}])
+                tests(Ns, [{timeout, 10, ?f(t_master_dies(Ns))}])
         end
        ]}
      ]}.
@@ -575,6 +575,7 @@ t_sync_cand_dies([A,B,C]) ->
     %% immediately. Therefore, we should have our answer well within 1 sec.
     ?assertMatch({value, true}, rpc:nb_yield(Key, 1000)).
 
+
 %% Verify that the registry updates consistently if a non-leader node
 %% dies.
 t_fail_node(Ns) ->
@@ -628,7 +629,7 @@ try_sync(N, Ns) ->
                "  ~p~n"
                "Status = ~p~n",
                [Err, N,
-                {Ns, rpc:multicall(Ns, sys, get_status, [gproc_dist])}]),
+                {Ns, rpc:multicall([N|Ns], sys, get_status, [gproc_dist])}]),
             Err;
         true ->
             true