Просмотр исходного кода

Merge pull request #150 from uwiger/uw-rewrite-sync

Uw rewrite sync
Ulf Wiger 7 лет назад
Родитель
Сommit
c4cc57fc93
2 измененных файлов с 120 добавлено и 38 удалено
  1. 101 35
      src/gproc_dist.erl
  2. 19 3
      test/gproc_dist_tests.erl

+ 101 - 35
src/gproc_dist.erl

@@ -69,6 +69,7 @@
 -record(state, {
           always_broadcast = false,
           is_leader,
+          sync_clients = [],
           sync_requests = []}).
 
 -include("gproc_trace.hrl").
@@ -266,7 +267,7 @@ reset_counter(_) ->
 %%
 sync() ->
     %% Increase timeout since gen_leader can take some time ...
-    leader_call(sync, 10000).
+    gen_server:call(?MODULE, sync, 5000).
 
 %% @spec get_leader() -> node()
 %% @doc Returns the node of the current gproc leader.
@@ -283,6 +284,8 @@ handle_cast(_Msg, S, _) ->
 
 handle_call(get_leader, _, S, E) ->
     {reply, gen_leader:leader_node(E), S};
+handle_call(sync, From, S, E) ->
+    {noreply, initiate_sync(From, S, E)};
 handle_call(_, _, S, _) ->
     {reply, badarg, S}.
 
@@ -326,16 +329,16 @@ globs() ->
 surrendered(#state{is_leader = true} = S, {globals, Globs}, _E) ->
     %% Leader conflict!
     surrendered_1(Globs),
-    {ok, S#state{is_leader = false}};
+    {ok, maybe_reinitiate_sync(S#state{is_leader = false})};
 surrendered(S, {globals, Globs}, _E) ->
     %% globals from this node should be more correct in our table than
     %% in the leader's
     surrendered_1(Globs),
-    {ok, S#state{is_leader = false}}.
+    {ok, maybe_reinitiate_sync(S#state{is_leader = false})}.
 
 
-handle_DOWN(Node, S, _E) ->
-    S1 = check_sync_requests(Node, S),
+handle_DOWN(Node, S, E) ->
+    S1 = check_sync_requests(Node, S, E),
     Head = {{{'_',g,'_'},'_'},'$1','_'},
     Gs = [{'==', {node,'$1'},Node}],
     Globs = ets:select(?TAB, [{Head, Gs, [{{{element,1,{element,1,'$_'}},
@@ -347,28 +350,31 @@ handle_DOWN(Node, S, _E) ->
             {ok, Broadcast, S1}
     end.
 
-check_sync_requests(Node, #state{sync_requests = SReqs} = S) ->
-    SReqs1 = lists:flatmap(
-               fun({From, Ns}) ->
-                       case Ns -- [Node] of
-                           [] ->
-                               gen_leader:reply(From, {leader, reply, true}),
-                               [];
-                           Ns1 ->
-                               [{From, Ns1}]
-                       end
-               end, SReqs),
-    S#state{sync_requests = SReqs1}.
-
-handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
-    GenLeader = gen_leader,
-    case GenLeader:alive(E) -- [node()] of
+check_sync_requests(Node, #state{sync_requests = SReqs} = S, E) ->
+    check_sync_requests(SReqs, Node, S, E).
+
+check_sync_requests([], _, S, _) ->
+    S;
+check_sync_requests([{From, Ns}|Reqs], Node, S, E) ->
+    case lists:member(Node, Ns) of
+        true ->
+            remove_node_from_sync_request(Node, Ns, From, S, E);
+        false ->
+            check_sync_requests(Reqs, Node, S, E)
+    end.
+
+remove_node_from_sync_request(Node, Ns, From, S, E) ->
+    case Ns -- [Node] of
         [] ->
-            {reply, true, S};
-        Alive ->
-            GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
-            {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
-    end;
+            check_sync_requests(Node, send_sync_complete(From, S, E), E);
+        Ns1 ->
+            Rs1 = lists:keyreplace(
+                    From, 1, S#state.sync_requests, {From, Ns1}),
+            %% Yes, we start over and run through the list from the top,
+            %% with updated state; simpler code that way.
+            check_sync_requests(Node, S#state{sync_requests = Rs1}, E)
+    end.
+
 handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E)
   when Reg==reg; Reg==reg_other ->
     case gproc_lib:insert_reg(K, Value, Pid, g) of
@@ -667,7 +673,18 @@ handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
 handle_leader_call(_, _, S, _E) ->
     {reply, badarg, S}.
 
-handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
+handle_leader_cast({initiate_sync, Ref}, S, E) ->
+    case gen_leader:alive(E) -- [node()] of
+        [] ->
+            %% ???
+            {noreply, send_sync_complete(Ref, S, E)};
+        Alive ->
+            gen_leader:broadcast({from_leader, {sync, Ref}}, Alive, E),
+            {noreply, S#state{sync_requests =
+                                  [{Ref, Alive}|S#state.sync_requests]}}
+    end;
+
+handle_leader_cast({sync_reply, Node, Ref}, S, E) ->
     #state{sync_requests = SReqs} = S,
     case lists:keyfind(Ref, 1, SReqs) of
         false ->
@@ -679,8 +696,7 @@ handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
         {_, Ns} ->
             case lists:delete(Node, Ns) of
                 [] ->
-                    gen_leader:reply(Ref, {leader, reply, true}),
-                    {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
+                    {ok, send_sync_complete(Ref, S, E)};
                 Ns1 ->
                     SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
                     {ok, S#state{sync_requests = SReqs1}}
@@ -843,6 +859,16 @@ terminate(_Reason, _S) ->
 from_leader({sync, Ref}, S, _E) ->
     gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
     {ok, S};
+from_leader({sync_complete, Ref}, S, _E) ->
+    case Ref of
+        {From, _} when node(From) == node() ->
+            {ok, reply_to_sync_client(Ref, S)};
+        _ ->
+            %% we shouldn't have to, but ensure that we don't have
+            %% the sync request in our state.
+            {ok, S#state{sync_requests = lists:keydelete(
+                                           Ref, 1, S#state.sync_requests)}}
+    end;
 from_leader(Ops, S, _E) ->
     lists:foreach(
       fun({delete, Globals}) ->
@@ -887,7 +913,7 @@ delete_globals(Globals) ->
 do_notify([{P, Msg}|T]) when is_pid(P), node(P) =:= node() ->
     P ! Msg,
     do_notify(T);
-do_notify([{P, Msg}|T]) when is_pid(P) ->
+do_notify([{P, _Msg}|T]) when is_pid(P) ->
     do_notify(T);
 do_notify([{K, P, E}|T]) ->
     case ets:lookup(?TAB, {P,K}) of
@@ -911,11 +937,11 @@ leader_call(Req) ->
         Reply  -> Reply
     end.
 
-leader_call(Req, Timeout) ->
-    case gen_leader:leader_call(?MODULE, Req, Timeout) of
-        badarg -> ?THROW_GPROC_ERROR(badarg);
-        Reply  -> Reply
-    end.
+%% leader_call(Req, Timeout) ->
+%%     case gen_leader:leader_call(?MODULE, Req, Timeout) of
+%%         badarg -> ?THROW_GPROC_ERROR(badarg);
+%%         Reply  -> Reply
+%%     end.
 
 leader_cast(Msg) ->
     gen_leader:leader_cast(?MODULE, Msg).
@@ -1118,3 +1144,43 @@ ensure_rev(K) ->
 
 regged_new(reg   ) -> true;
 regged_new(ensure) -> new.
+
+
+initiate_sync(From, #state{is_leader = true} = S, E) ->
+    case gen_leader:alive(E) -- [node()] of
+        [] ->
+            %% I'm alone - sync is trivial
+            gen_server:reply(From, true),
+            S;
+        Alive ->
+            gen_leader:broadcast(
+              {from_leader, {sync, From}}, Alive, E),
+            S#state{sync_requests =
+                        [{From, Alive}|S#state.sync_requests]}
+    end;
+initiate_sync(From, S, _E) ->
+    leader_cast({initiate_sync, From}),
+    S.
+
+maybe_reinitiate_sync(#state{sync_clients = []} = S) ->
+    S;
+maybe_reinitiate_sync(#state{sync_clients = Cs} = S) ->
+    _ = [leader_cast({initiate_sync, From}) || From <- Cs],
+    S.
+
+send_sync_complete({From, _} = Ref, S, _E) when node(From) == node() ->
+    reply_to_sync_client(Ref, S);
+send_sync_complete({From, _} = Ref, S, E) ->
+    %% Notify the node that initiated the sync
+    %% 'broadcasting' to exactly one node.
+    gen_leader:broadcast(
+      {from_leader, {sync_complete, Ref}}, [node(From)], E),
+    S#state{sync_requests =
+                lists:keydelete(Ref, 1, S#state.sync_requests)}.
+
+reply_to_sync_client(Ref, S) ->
+    gen_server:reply(Ref, true),
+    S#state{sync_clients =
+                S#state.sync_clients -- [Ref],
+            sync_requests =
+                lists:keydelete(Ref, 1, S#state.sync_requests)}.

+ 19 - 3
test/gproc_dist_tests.erl

@@ -25,7 +25,7 @@
 -define(f(E), fun() -> ?debugVal(E) end).
 
 dist_test_() ->
-    {timeout, 180,
+    {timeout, 120,
      [
       %% {setup,
       %%  fun dist_setup/0,
@@ -49,7 +49,7 @@ dist_test_() ->
                 tests(Ns, [?f(t_fail_node(Ns))])
         end,
         fun(Ns) ->
-                tests(Ns, [{timeout, 15, ?f(t_master_dies(Ns))}])
+                tests(Ns, [{timeout, 10, ?f(t_master_dies(Ns))}])
         end
        ]}
      ]}.
@@ -575,6 +575,7 @@ t_sync_cand_dies([A,B,C]) ->
     %% immediately. Therefore, we should have our answer well within 1 sec.
     ?assertMatch({value, true}, rpc:nb_yield(Key, 1000)).
 
+
 %% Verify that the registry updates consistently if a non-leader node
 %% dies.
 t_fail_node(Ns) ->
@@ -613,12 +614,27 @@ t_master_dies([A,B,C] = Ns) ->
     ?assertMatch(ok, rpc:call(L, application, stop, [gproc])),
     Names = [{Na,Pa}, {Nb,Pb}, {Nc,Pc}] -- [{Nl, Pl}],
     RestNs = Ns -- [L],
-    ?assertMatch(true, rpc:call(hd(RestNs), gproc_dist, sync, [])),
+    %% ?assertMatch(true, rpc:call(hd(RestNs), gproc_dist, sync, [])),
+    ?assertMatch(true, try_sync(hd(RestNs), RestNs)),
     ?assertMatch(ok, t_lookup_everywhere(Nl, RestNs, undefined)),
     [?assertMatch(ok, t_lookup_everywhere(Nx, RestNs, Px))
      || {Nx, Px} <- Names],
     ok.
 
+try_sync(N, Ns) ->
+    case rpc:call(N, gproc_dist, sync, []) of
+        {badrpc, _} = Err ->
+            ?debugFmt(
+               "Error in gproc_dist:sync() (~p):~n"
+               "  ~p~n"
+               "Status = ~p~n",
+               [Err, N,
+                {Ns, rpc:multicall([N|Ns], sys, get_status, [gproc_dist])}]),
+            Err;
+        true ->
+            true
+    end.
+
 t_sleep() ->
     timer:sleep(500).