Browse Source

Resolve with system time as default.

Roberto Ostinelli 5 years ago
parent
commit
e78e1f1d71
3 changed files with 69 additions and 56 deletions
  1. 11 2
      src/syn_event_handler.erl
  2. 1 1
      src/syn_registry.erl
  3. 57 53
      test/syn_registry_SUITE.erl

+ 11 - 2
src/syn_event_handler.erl

@@ -103,8 +103,10 @@ do_resolve_registry_conflict(Name, {Pid1, Meta1, Time1}, {Pid2, Meta2, Time2}, C
             try CustomEventHandler:resolve_registry_conflict(Name, {Pid1, Meta1}, {Pid2, Meta2}) of
                 PidToKeep when is_pid(PidToKeep) ->
                     {PidToKeep, false};
+
                 _ ->
                     {undefined, false}
+
             catch Exception:Reason ->
                 error_logger:error_msg(
                     "Syn(~p): Error ~p in custom handler resolve_registry_conflict: ~p~n",
@@ -112,7 +114,14 @@ do_resolve_registry_conflict(Name, {Pid1, Meta1, Time1}, {Pid2, Meta2, Time2}, C
                 ),
                 {undefined, false}
             end;
+
         _ ->
-            %% by default, keep pid that generated the conflict & kill the one in the local table
-            {Pid2, true}
+            %% by default, keep pid registered more recently
+            %% this is a simple mechanism that can be imprecise, as system clocks are not perfectly aligned in a cluster
+            %% if something more elaborate is desired (such as vector clocks) use Meta to store data and a custom event handler
+            PidToKeep = case Time1 > Time2 of
+                true -> Pid1;
+                _ -> Pid2
+            end,
+            {PidToKeep, true}
     end.

+ 1 - 1
src/syn_registry.erl

@@ -464,7 +464,7 @@ register_on_node(Name, Pid, Meta) ->
             MRef
     end,
     %% add to table
-    Time = erlang:monotonic_time(),
+    Time = erlang:system_time(),
     add_to_local_table(Name, Pid, Meta, Time, MonitorRef),
     {ok, Time}.
 

+ 57 - 53
test/syn_registry_SUITE.erl

@@ -45,7 +45,8 @@
 -export([
     two_nodes_register_monitor_and_unregister/1,
     two_nodes_registry_count/1,
-    two_nodes_registration_race_condition_conflict_resolution_keep_remote/1,
+    two_nodes_registration_race_condition_conflict_resolution_keep_more_recent_remote/1,
+    two_nodes_registration_race_condition_conflict_resolution_keep_more_recent_local/1,
     two_nodes_registration_race_condition_conflict_resolution_keep_remote_with_custom_handler/1,
     two_nodes_registration_race_condition_conflict_resolution_keep_local_with_custom_handler/1,
     two_nodes_registration_race_condition_conflict_resolution_when_process_died/1,
@@ -56,7 +57,7 @@
 -export([
     three_nodes_partial_netsplit_consistency/1,
     three_nodes_full_netsplit_consistency/1,
-    three_nodes_start_syn_before_connecting_cluster_with_conflict/1,
+    three_nodes_start_syn_before_connecting_cluster_with_conflict_keep_more_recent/1,
     three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution_keep_remote/1,
     three_nodes_registration_race_condition_custom_conflict_resolution/1,
     three_nodes_anti_entropy/1,
@@ -121,7 +122,8 @@ groups() ->
         {two_nodes_process_registration, [shuffle], [
             two_nodes_register_monitor_and_unregister,
             two_nodes_registry_count,
-            two_nodes_registration_race_condition_conflict_resolution_keep_remote,
+            two_nodes_registration_race_condition_conflict_resolution_keep_more_recent_remote,
+            two_nodes_registration_race_condition_conflict_resolution_keep_more_recent_local,
             two_nodes_registration_race_condition_conflict_resolution_keep_remote_with_custom_handler,
             two_nodes_registration_race_condition_conflict_resolution_keep_local_with_custom_handler,
             two_nodes_registration_race_condition_conflict_resolution_when_process_died,
@@ -132,7 +134,7 @@ groups() ->
         {three_nodes_process_registration, [shuffle], [
             three_nodes_partial_netsplit_consistency,
             three_nodes_full_netsplit_consistency,
-            three_nodes_start_syn_before_connecting_cluster_with_conflict,
+            three_nodes_start_syn_before_connecting_cluster_with_conflict_keep_more_recent,
             three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution_keep_remote,
             three_nodes_registration_race_condition_custom_conflict_resolution,
             three_nodes_anti_entropy,
@@ -500,7 +502,7 @@ two_nodes_registry_count(Config) ->
     0 = syn:registry_count(node()),
     0 = syn:registry_count(SlaveNode).
 
-two_nodes_registration_race_condition_conflict_resolution_keep_remote(Config) ->
+two_nodes_registration_race_condition_conflict_resolution_keep_more_recent_remote(Config) ->
     ConflictingName = "COMMON",
     %% get slaves
     SlaveNode = proplists:get_value(slave_node, Config),
@@ -512,7 +514,7 @@ two_nodes_registration_race_condition_conflict_resolution_keep_remote(Config) ->
     Pid0 = syn_test_suite_helper:start_process(),
     Pid1 = syn_test_suite_helper:start_process(SlaveNode),
     %% inject into syn to simulate concurrent registration
-    ok = syn_registry:add_to_local_table(ConflictingName, Pid0, node(), erlang:monotonic_time() - 1000, undefined),
+    ok = syn_registry:add_to_local_table(ConflictingName, Pid0, node(), erlang:system_time() - 1000000000, undefined),
     %% register on slave node to trigger conflict resolution on master node
     ok = rpc:call(SlaveNode, syn, register, [ConflictingName, Pid1, SlaveNode]),
     timer:sleep(1000),
@@ -523,6 +525,30 @@ two_nodes_registration_race_condition_conflict_resolution_keep_remote(Config) ->
     false = is_process_alive(Pid0),
     true = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]).
 
+two_nodes_registration_race_condition_conflict_resolution_keep_more_recent_local(Config) ->
+    ConflictingName = "COMMON",
+    %% get slaves
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(1000),
+    %% start processes
+    Pid0 = syn_test_suite_helper:start_process(),
+    Pid1 = syn_test_suite_helper:start_process(SlaveNode),
+    %% inject into syn to simulate concurrent registration
+    ok = syn_registry:add_to_local_table(ConflictingName, Pid0, node(), erlang:system_time() + 1000000000, undefined),
+    %% register on slave node to trigger conflict resolution on master node
+    ok = rpc:call(SlaveNode, syn, register, [ConflictingName, Pid1, SlaveNode]),
+    timer:sleep(1000),
+    %% check metadata, resolution happens on master node
+    Node = node(),
+    {Pid0, Node} = syn:whereis(ConflictingName, with_meta),
+    {Pid0, Node} = rpc:call(SlaveNode, syn, whereis, [ConflictingName, with_meta]),
+    %% check that other processes are not alive because syn killed them
+    true = is_process_alive(Pid0),
+    false = rpc:call(SlaveNode, erlang, is_process_alive, [Pid1]).
+
 two_nodes_registration_race_condition_conflict_resolution_keep_remote_with_custom_handler(Config) ->
     ConflictingName = "COMMON",
     %% get slaves
@@ -540,7 +566,7 @@ two_nodes_registration_race_condition_conflict_resolution_keep_remote_with_custo
     %% register
     ok = syn:register(ConflictingName, Pid0, node()),
     %% trigger conflict resolution on master node with something less recent (which would be discarded without a custom handler)
-    ok = syn_registry:sync_register(node(), ConflictingName, Pid1, keep_this_one, erlang:monotonic_time() - 1000),
+    ok = syn_registry:sync_register(node(), ConflictingName, Pid1, keep_this_one, erlang:system_time() - 1000000000),
     timer:sleep(1000),
     %% check metadata, resolution happens on master node
     {Pid1, keep_this_one} = syn:whereis(ConflictingName, with_meta),
@@ -566,7 +592,7 @@ two_nodes_registration_race_condition_conflict_resolution_keep_local_with_custom
     Pid0 = syn_test_suite_helper:start_process(),
     Pid1 = syn_test_suite_helper:start_process(SlaveNode),
     %% inject into syn to simulate concurrent registration with something more recent (which would be picked without a custom handler)
-    ok = syn_registry:add_to_local_table(ConflictingName, Pid0, keep_this_one, undefined, erlang:monotonic_time() + 1000),
+    ok = syn_registry:add_to_local_table(ConflictingName, Pid0, keep_this_one, undefined, erlang:system_time() + 1000000000),
     %% register on slave node to trigger conflict resolution on master node
     ok = rpc:call(SlaveNode, syn, register, [ConflictingName, Pid1, SlaveNode]),
     timer:sleep(1000),
@@ -850,7 +876,7 @@ three_nodes_full_netsplit_consistency(Config) ->
     Pid1 = rpc:call(SlaveNode2, syn, whereis, [<<"proc1">>]),
     Pid2 = rpc:call(SlaveNode2, syn, whereis, [<<"proc2">>]).
 
-three_nodes_start_syn_before_connecting_cluster_with_conflict(Config) ->
+three_nodes_start_syn_before_connecting_cluster_with_conflict_keep_more_recent(Config) ->
     ConflictingName = "COMMON",
     %% get slaves
     SlaveNode1 = proplists:get_value(slave_node_1, Config),
@@ -860,15 +886,17 @@ three_nodes_start_syn_before_connecting_cluster_with_conflict(Config) ->
     Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
     Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
     %% start delayed
-    start_syn_delayed_and_register_local_process(ConflictingName, Pid0, 1500),
-    rpc:cast(SlaveNode1, ?MODULE, start_syn_delayed_and_register_local_process, [ConflictingName, Pid1, 1500]),
-    rpc:cast(SlaveNode2, ?MODULE, start_syn_delayed_and_register_local_process, [ConflictingName, Pid2, 1500]),
-    timer:sleep(500),
+    start_syn_delayed_and_register_local_process(ConflictingName, Pid0, 2000),
+    timer:sleep(250),
+    rpc:cast(SlaveNode1, ?MODULE, start_syn_delayed_and_register_local_process, [ConflictingName, Pid1, 2000]),
+    timer:sleep(250),
+    rpc:cast(SlaveNode2, ?MODULE, start_syn_delayed_and_register_local_process, [ConflictingName, Pid2, 2000]),
+    timer:sleep(1000),
     %% disconnect all
     rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
     syn_test_suite_helper:disconnect_node(SlaveNode1),
     syn_test_suite_helper:disconnect_node(SlaveNode2),
-    timer:sleep(2000),
+    timer:sleep(3000),
     [] = nodes(),
     %% reconnect all
     syn_test_suite_helper:connect_node(SlaveNode1),
@@ -884,37 +912,13 @@ three_nodes_start_syn_before_connecting_cluster_with_conflict(Config) ->
     true = lists:member(rpc:call(SlaveNode1, syn, whereis, [ConflictingName]), [Pid0, Pid1, Pid2]),
     true = lists:member(rpc:call(SlaveNode2, syn, whereis, [ConflictingName]), [Pid0, Pid1, Pid2]),
     %% check metadata
-    case syn:whereis(ConflictingName, with_meta) of
-        {Pid0, Meta} ->
-            CurrentNode = node(),
-            %% check that other nodes' data corresponds
-            {Pid0, CurrentNode} = rpc:call(SlaveNode1, syn, whereis, [ConflictingName, with_meta]),
-            {Pid0, CurrentNode} = rpc:call(SlaveNode2, syn, whereis, [ConflictingName, with_meta]),
-            %% check that other processes are not alive because syn killed them
-            true = is_process_alive(Pid0),
-            false = rpc:call(SlaveNode1, erlang, is_process_alive, [Pid1]),
-            false = rpc:call(SlaveNode2, erlang, is_process_alive, [Pid2]);
-        {Pid1, Meta} ->
-            SlaveNode1 = Meta,
-            %% check that other nodes' data corresponds
-            {Pid1, Meta} = rpc:call(SlaveNode1, syn, whereis, [ConflictingName, with_meta]),
-            {Pid1, Meta} = rpc:call(SlaveNode2, syn, whereis, [ConflictingName, with_meta]),
-            %% check that other processes are not alive because syn killed them
-            false = is_process_alive(Pid0),
-            true = rpc:call(SlaveNode1, erlang, is_process_alive, [Pid1]),
-            false = rpc:call(SlaveNode2, erlang, is_process_alive, [Pid2]);
-        {Pid2, Meta} ->
-            SlaveNode2 = Meta,
-            %% check that other nodes' data corresponds
-            {Pid2, Meta} = rpc:call(SlaveNode1, syn, whereis, [ConflictingName, with_meta]),
-            {Pid2, Meta} = rpc:call(SlaveNode2, syn, whereis, [ConflictingName, with_meta]),
-            %% check that other processes are not alive because syn killed them
-            false = is_process_alive(Pid0),
-            false = rpc:call(SlaveNode1, erlang, is_process_alive, [Pid1]),
-            true = rpc:call(SlaveNode2, erlang, is_process_alive, [Pid2]);
-        _ ->
-            ok = no_process_is_registered_with_conflicting_name
-    end.
+    {Pid2, SlaveNode2} = syn:whereis(ConflictingName, with_meta),
+    {Pid2, SlaveNode2} = rpc:call(SlaveNode1, syn, whereis, [ConflictingName, with_meta]),
+    {Pid2, SlaveNode2} = rpc:call(SlaveNode2, syn, whereis, [ConflictingName, with_meta]),
+    %% check that other processes are not alive because syn killed them
+    false = is_process_alive(Pid0),
+    false = rpc:call(SlaveNode1, erlang, is_process_alive, [Pid1]),
+    true = rpc:call(SlaveNode2, erlang, is_process_alive, [Pid2]).
 
 three_nodes_start_syn_before_connecting_cluster_with_custom_conflict_resolution_keep_remote(Config) ->
     ConflictingName = "COMMON",
@@ -992,7 +996,7 @@ three_nodes_registration_race_condition_custom_conflict_resolution(Config) ->
     Pid1 = syn_test_suite_helper:start_process(SlaveNode1),
     Pid2 = syn_test_suite_helper:start_process(SlaveNode2),
     %% inject into syn to simulate concurrent registration with something less recent (which would be discarded without a custom handler)
-    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, [ConflictingName, Pid1, keep_this_one, erlang:monotonic_time() - 1000, undefined]),
+    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, [ConflictingName, Pid1, keep_this_one, erlang:system_time() - 1000000000, undefined]),
     %% register on master node to trigger conflict resolution
     ok = syn:register(ConflictingName, Pid0, node()),
     timer:sleep(1000),
@@ -1038,9 +1042,9 @@ three_nodes_anti_entropy(Config) ->
     ok = syn_registry:add_to_local_table("pid0", Pid0, node(), 0, undefined),
     ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["pid1", Pid1, SlaveNode1, 0, undefined]),
     ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["pid2", Pid2, SlaveNode2, 0, undefined]),
-    ok = syn_registry:add_to_local_table("conflict", Pid0Conflict, node(), erlang:monotonic_time() + 1000, undefined),
-    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["conflict", Pid1Conflict, keep_this_one, erlang:monotonic_time(), undefined]),
-    ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["conflict", Pid2Conflict, SlaveNode2, erlang:monotonic_time() + 1000, undefined]),
+    ok = syn_registry:add_to_local_table("conflict", Pid0Conflict, node(), erlang:system_time() + 1000000000, undefined),
+    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["conflict", Pid1Conflict, keep_this_one, erlang:system_time(), undefined]),
+    ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["conflict", Pid2Conflict, SlaveNode2, erlang:system_time() + 1000000000, undefined]),
     %% wait to let anti-entropy settle
     timer:sleep(5000),
     %% check
@@ -1083,9 +1087,9 @@ three_nodes_anti_entropy_manual(Config) ->
     ok = syn_registry:add_to_local_table("pid0", Pid0, node(), 0, undefined),
     ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["pid1", Pid1, SlaveNode1, 0, undefined]),
     ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["pid2", Pid2, SlaveNode2, 0, undefined]),
-    ok = syn_registry:add_to_local_table("conflict", Pid0Conflict, node(), erlang:monotonic_time() + 1000, undefined),
-    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["conflict", Pid1Conflict, keep_this_one, erlang:monotonic_time(), undefined]),
-    ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["conflict", Pid2Conflict, SlaveNode2, erlang:monotonic_time() + 1000, undefined]),
+    ok = syn_registry:add_to_local_table("conflict", Pid0Conflict, node(), erlang:system_time() + 1000000000, undefined),
+    ok = rpc:call(SlaveNode1, syn_registry, add_to_local_table, ["conflict", Pid1Conflict, keep_this_one, erlang:system_time(), undefined]),
+    ok = rpc:call(SlaveNode2, syn_registry, add_to_local_table, ["conflict", Pid2Conflict, SlaveNode2, erlang:system_time() + 1000000000, undefined]),
     %% call anti entropy
     ok = syn:force_cluster_sync(registry),
     timer:sleep(1000),
@@ -1158,7 +1162,7 @@ three_nodes_resolve_conflict_on_all_nodes(Config) ->
     {Pid1, SlaveNode1} = rpc:call(SlaveNode1, syn, whereis, [CommonName, with_meta]),
     {Pid1, SlaveNode1} = rpc:call(SlaveNode2, syn, whereis, [CommonName, with_meta]),
     %% force a sync registration conflict on master node from slave 2
-    syn_registry:sync_register(node(), CommonName, Pid2, SlaveNode2, erlang:monotonic_time() - 1000),
+    syn_registry:sync_register(node(), CommonName, Pid2, SlaveNode2, erlang:system_time() + 1000000000),
     timer:sleep(1000),
     %% check
     {Pid2, SlaveNode2} = syn:whereis(CommonName, with_meta),