Browse Source

Move conflict resolution on node that detects the conflict.

Resolution only happens between the node and the remote node that requested the write.
Roberto Ostinelli 5 years ago
parent
commit
864cbbf679
1 changed files with 32 additions and 56 deletions
  1. 32 56
      src/syn_registry.erl

+ 32 - 56
src/syn_registry.erl

@@ -35,8 +35,8 @@
 
 
 %% sync API
 %% sync API
 -export([sync_get_local_registry_tuples/1]).
 -export([sync_get_local_registry_tuples/1]).
--export([raise_inconsistent_name_data/5]).
--export([add_remote_to_local_table/3, add_to_local_table/4, remove_from_local_table/1]).
+-export([add_remote_to_local_table/4]).
+-export([add_to_local_table/4, remove_from_local_table/1]).
 
 
 %% gen_server callbacks
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -100,21 +100,15 @@ count(Node) ->
     RegistryTuples = get_registry_tuples_for_node(Node),
     RegistryTuples = get_registry_tuples_for_node(Node),
     length(RegistryTuples).
     length(RegistryTuples).
 
 
+-spec add_remote_to_local_table(RemoteNode :: node(), Name :: any(), RemotePid :: pid(), RemoteMeta :: any()) -> ok.
+add_remote_to_local_table(RemoteNode, Name, RemotePid, RemoteMeta) ->
+    gen_server:cast({?MODULE, RemoteNode}, {add_remote_to_local_table, Name, RemotePid, RemoteMeta}).
+
 -spec sync_get_local_registry_tuples(FromNode :: node()) -> [syn_registry_tuple()].
 -spec sync_get_local_registry_tuples(FromNode :: node()) -> [syn_registry_tuple()].
 sync_get_local_registry_tuples(FromNode) ->
 sync_get_local_registry_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node ~p", [node(), FromNode]),
     error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node ~p", [node(), FromNode]),
     get_registry_tuples_for_node(node()).
     get_registry_tuples_for_node(node()).
 
 
--spec raise_inconsistent_name_data(
-    RemoteNode :: node(),
-    OriginatingNode :: node(),
-    Name :: any(),
-    Pid :: pid(),
-    Meta :: any()
-) -> ok.
-raise_inconsistent_name_data(RemoteNode, OriginatingNode, Name, Pid, Meta) ->
-    gen_server:cast({?MODULE, RemoteNode}, {inconsistent_name_data, OriginatingNode, Name, Pid, Meta}).
-
 %% ===================================================================
 %% ===================================================================
 %% Callbacks
 %% Callbacks
 %% ===================================================================
 %% ===================================================================
@@ -200,28 +194,33 @@ handle_call(Request, From, State) ->
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
     {stop, Reason :: any(), #state{}}.
 
 
-handle_cast({inconsistent_name_data, OriginatingNode, Name, RemotePid, RemoteMeta}, State) ->
-    error_logger:warning_msg("Syn(~p): Inconsistent name ~p signalled from node ~p", [node(), Name, OriginatingNode]),
-    global:trans({{?MODULE, auto_merge_registry}, self()},
-        fun() ->
-            case find_process_entry_by_name(Name) of
-                undefined ->
-                    error_logger:info_msg("Syn(~p): No local data for name ~p, skipping", [node(), Name]);
-
-                Entry ->
-                    TablePid = Entry#syn_registry_table.pid,
-                    TableMeta = Entry#syn_registry_table.meta,
+handle_cast({add_remote_to_local_table, Name, RemotePid, RemoteMeta}, State) ->
+    %% check for conflicts
+    case find_process_entry_by_name(Name) of
+        undefined ->
+            %% no conflict
+            add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+        Entry when Entry#syn_registry_table.pid =:= RemotePid ->
+            %% no conflict - TODO: we still could have a conflict on meta, use vclocks?
+            add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
+        Entry ->
+            %% different pid, we have a conflict
+            RemoteNode = node(RemotePid),
 
 
+            global:trans({{?MODULE, {inconsistent_name, Name}}, self()},
+                fun() ->
                     error_logger:info_msg(
                     error_logger:info_msg(
-                        "Syn(~p): REGISTRY NAME MERGE ----> Initiating for originating node ~p",
-                        [node(), OriginatingNode]
+                        "Syn(~p): REGISTRY NAME INCONSISTENCY FOR ~p ----> Initiating for remote node ~p",
+                        [node(), Name, RemoteNode]
                     ),
                     ),
+                    TablePid = Entry#syn_registry_table.pid,
+                    TableMeta = Entry#syn_registry_table.meta,
 
 
                     resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta},
                     resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta},
                         %% keep currently in table
                         %% keep currently in table
                         fun() ->
                         fun() ->
                             %% overwrite
                             %% overwrite
-                            ok = rpc:call(OriginatingNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined])
+                            ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined])
                         end,
                         end,
                         %% keep remote
                         %% keep remote
                         fun() ->
                         fun() ->
@@ -229,16 +228,11 @@ handle_cast({inconsistent_name_data, OriginatingNode, Name, RemotePid, RemoteMet
                             add_to_local_table(Name, RemotePid, RemoteMeta, undefined)
                             add_to_local_table(Name, RemotePid, RemoteMeta, undefined)
                         end,
                         end,
                         State
                         State
-                    ),
-
-                    error_logger:info_msg(
-                        "Syn(~p): REGISTRY NAME MERGE ----> Done for originating node ~p",
-                        [node(), OriginatingNode]
                     )
                     )
-            end
-        end
-    ),
-    %% resume
+                end
+            )
+    end,
+    %% return
     {noreply, State};
     {noreply, State};
 
 
 handle_cast(Msg, State) ->
 handle_cast(Msg, State) ->
@@ -363,7 +357,9 @@ code_change(_OldVsn, State, _Extra) ->
 -spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any()) -> pid().
 -spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any()) -> pid().
 multicast_register(Name, Pid, Meta) ->
 multicast_register(Name, Pid, Meta) ->
     spawn_link(fun() ->
     spawn_link(fun() ->
-        rpc:eval_everywhere(nodes(), ?MODULE, add_remote_to_local_table, [Name, Pid, Meta])
+        lists:foreach(fun(RemoteNode) ->
+            add_remote_to_local_table(RemoteNode, Name, Pid, Meta)
+        end, nodes())
     end).
     end).
 
 
 -spec multicast_unregister(Name :: any()) -> pid().
 -spec multicast_unregister(Name :: any()) -> pid().
@@ -372,14 +368,6 @@ multicast_unregister(Name) ->
         rpc:eval_everywhere(nodes(), ?MODULE, remove_from_local_table, [Name])
         rpc:eval_everywhere(nodes(), ?MODULE, remove_from_local_table, [Name])
     end).
     end).
 
 
--spec multicast_inconsistent_name_data(Name :: any(), Pid :: pid(), Meta :: any()) -> pid().
-multicast_inconsistent_name_data(Name, Pid, Meta) ->
-    spawn_link(fun() ->
-        lists:foreach(fun(RemoteNode) ->
-            raise_inconsistent_name_data(RemoteNode, node(), Name, Pid, Meta)
-        end, nodes())
-    end).
-
 -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
 -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
 register_on_node(Name, Pid, Meta) ->
 register_on_node(Name, Pid, Meta) ->
     MonitorRef = case find_processes_entry_by_pid(Pid) of
     MonitorRef = case find_processes_entry_by_pid(Pid) of
@@ -405,18 +393,6 @@ unregister_on_node(Name) ->
             remove_from_local_table(Name)
             remove_from_local_table(Name)
     end.
     end.
 
 
--spec add_remote_to_local_table(Name :: any(), RemotePid :: pid(), RemoteMeta :: any()) -> ok.
-add_remote_to_local_table(Name, RemotePid, RemoteMeta) ->
-    %% check for conflicts
-    case find_process_entry_by_name(Name) of
-        undefined ->
-            %% no conflict
-            add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
-        Entry ->
-            %% conflict found, raise resolution
-            multicast_inconsistent_name_data(Name, Entry#syn_registry_table.pid, Entry#syn_registry_table.meta)
-    end.
-
 -spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
 -spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
 add_to_local_table(Name, Pid, Meta, MonitorRef) ->
 add_to_local_table(Name, Pid, Meta, MonitorRef) ->
     mnesia:dirty_write(#syn_registry_table{
     mnesia:dirty_write(#syn_registry_table{