Просмотр исходного кода

Generalize conflict naming resolution in case of race conrditions.

Roberto Ostinelli 5 лет назад
Родитель
Сommit
1bab0806eb
1 измененных файлов с 137 добавлено и 140 удалено
  1. 137 140
      src/syn_registry.erl

+ 137 - 140
src/syn_registry.erl

@@ -34,8 +34,9 @@
 -export([count/0, count/1]).
 
 %% sync API
--export([sync_register/4, sync_unregister/2]).
 -export([sync_get_local_registry_tuples/1]).
+-export([raise_inconsistent_name_data/5]).
+-export([add_remote_to_local_table/3]).
 -export([add_to_local_table/4]).
 -export([remove_from_local_table/1]).
 
@@ -101,19 +102,21 @@ count(Node) ->
     RegistryTuples = get_registry_tuples_for_node(Node),
     length(RegistryTuples).
 
--spec sync_register(RemoteNode :: node(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
-sync_register(RemoteNode, Name, Pid, Meta) ->
-    gen_server:cast({?MODULE, RemoteNode}, {sync_register, Name, Pid, Meta}).
-
--spec sync_unregister(RemoteNode :: node(), Name :: any()) -> ok.
-sync_unregister(RemoteNode, Name) ->
-    gen_server:cast({?MODULE, RemoteNode}, {sync_unregister, Name}).
-
 -spec sync_get_local_registry_tuples(FromNode :: node()) -> [syn_registry_tuple()].
 sync_get_local_registry_tuples(FromNode) ->
-    error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node: ~p~n", [node(), FromNode]),
+    error_logger:info_msg("Syn(~p): Received request of local registry tuples from remote node ~p~n", [node(), FromNode]),
     get_registry_tuples_for_node(node()).
 
+-spec raise_inconsistent_name_data(
+    RemoteNode :: node(),
+    OriginatingNode :: node(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any()
+) -> ok.
+raise_inconsistent_name_data(RemoteNode, OriginatingNode, Name, Pid, Meta) ->
+    gen_server:cast({?MODULE, RemoteNode}, {inconsistent_name_data, OriginatingNode, Name, Pid, Meta}).
+
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
@@ -199,16 +202,40 @@ handle_call(Request, From, State) ->
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
 
-handle_cast({sync_register, Name, Pid, Meta}, State) ->
-    %% add to table
-    add_remote_to_local_table(Name, Pid, Meta, State),
-    %% return
-    {noreply, State};
-
-handle_cast({sync_unregister, Name}, State) ->
-    %% remove from table
-    remove_from_local_table(Name),
-    %% return
+handle_cast({inconsistent_name_data, OriginatingNode, Name, RemotePid, RemoteMeta}, State) ->
+    error_logger:warning_msg("Syn(~p): Inconsistent name ~p signalled from node ~p~n", [node(), Name, OriginatingNode]),
+    global:trans({{?MODULE, auto_merge_registry}, self()},
+        fun() ->
+            case find_process_entry_by_name(Name) of
+                undefined ->
+                    error_logger:info_msg("Syn(~p): No local data for name ~p, skipping~n", [node(), Name]);
+                Entry ->
+                    error_logger:info_msg(
+                        "Syn(~p): REGISTRY NAME MERGE ----> Initiating for originating node ~p~n",
+                        [node(), OriginatingNode]
+                    ),
+                    LocalPid = Entry#syn_registry_table.pid,
+                    LocalMeta = Entry#syn_registry_table.meta,
+                    resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta},
+                        %% keep local
+                        fun() ->
+                            ok = rpc:call(OriginatingNode, syn_registry, add_to_local_table, [Name, LocalPid, LocalMeta, undefined])
+                        end,
+                        %% keep remote
+                        fun() ->
+                            add_to_local_table(Name, RemotePid, RemoteMeta, undefined)
+                        end,
+                        State
+                    ),
+                    %% exit
+                    error_logger:info_msg(
+                        "Syn(~p): REGISTRY NAME MERGE ----> Done for originating node ~p~n",
+                        [node(), OriginatingNode]
+                    )
+            end
+        end
+    ),
+    %% resume
     {noreply, State};
 
 handle_cast(Msg, State) ->
@@ -247,19 +274,52 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
     {noreply, State};
 
 handle_info({nodeup, RemoteNode}, State) ->
-    error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
+    error_logger:warning_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
     global:trans({{?MODULE, auto_merge_registry}, self()},
         fun() ->
-            error_logger:warning_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
+            error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
             %% get registry tuples from remote node
             RegistryTuples = rpc:call(RemoteNode, ?MODULE, sync_get_local_registry_tuples, [node()]),
-            error_logger:warning_msg(
-                "Syn(~p): Received ~p registry entrie(s) from remote node ~p, writing to local~n",
+            error_logger:info_msg(
+                "Syn(~p): Received ~p registry tuple(s) from remote node ~p, writing to local~n",
                 [node(), length(RegistryTuples), RemoteNode]
             ),
-            sync_registry_tuples(RemoteNode, RegistryTuples, State),
+            %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
+            purge_registry_entries_for_remote_node(RemoteNode),
+            %% loop
+            F = fun({Name, RemotePid, RemoteMeta}) ->
+                %% check if same name is registered
+                case find_process_entry_by_name(Name) of
+                    undefined ->
+                        %% no conflict
+                        register_on_node(Name, RemotePid, RemoteMeta);
+
+                    Entry ->
+                        LocalPid = Entry#syn_registry_table.pid,
+                        LocalMeta = Entry#syn_registry_table.meta,
+
+                        error_logger:warning_msg(
+                            "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
+                            [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
+                        ),
+
+                        resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta},
+                            %% keep local
+                            fun() ->
+                                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
+                            end,
+                            %% keep remote
+                            fun() ->
+                                add_to_local_table(Name, RemotePid, RemoteMeta, undefined)
+                            end,
+                            State
+                        )
+                end
+            end,
+            %% add to table
+            lists:foreach(F, RegistryTuples),
             %% exit
-            error_logger:warning_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
+            error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
         end
     ),
     %% resume
@@ -295,16 +355,20 @@ code_change(_OldVsn, State, _Extra) ->
 -spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any()) -> pid().
 multicast_register(Name, Pid, Meta) ->
     spawn_link(fun() ->
-        lists:foreach(fun(RemoteNode) ->
-            sync_register(RemoteNode, Name, Pid, Meta)
-        end, nodes())
+        rpc:eval_everywhere(nodes(), ?MODULE, add_remote_to_local_table, [Name, Pid, Meta])
     end).
 
 -spec multicast_unregister(Name :: any()) -> pid().
 multicast_unregister(Name) ->
     spawn_link(fun() ->
+        rpc:eval_everywhere(nodes(), ?MODULE, remove_from_local_table, [Name])
+    end).
+
+-spec multicast_inconsistent_name_data(Name :: any(), Pid :: pid(), Meta :: any()) -> pid().
+multicast_inconsistent_name_data(Name, Pid, Meta) ->
+    spawn_link(fun() ->
         lists:foreach(fun(RemoteNode) ->
-            sync_unregister(RemoteNode, Name)
+            raise_inconsistent_name_data(RemoteNode, node(), Name, Pid, Meta)
         end, nodes())
     end).
 
@@ -343,48 +407,54 @@ add_to_local_table(Name, Pid, Meta, MonitorRef) ->
         monitor_ref = MonitorRef
     }).
 
--spec add_remote_to_local_table(Name :: any(), RemotePid :: pid(), RemoteMeta :: any(), State :: #state{}) -> ok.
-add_remote_to_local_table(Name, RemotePid, RemoteMeta, State) ->
+-spec add_remote_to_local_table(Name :: any(), RemotePid :: pid(), RemoteMeta :: any()) -> ok.
+add_remote_to_local_table(Name, RemotePid, RemoteMeta) ->
     %% check for conflicts
     case find_process_entry_by_name(Name) of
         undefined ->
             %% no conflict
             add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
-        _Entry ->
-            %% conflict found, enter resolution
-            global:trans({{?MODULE, race_condition_registry}, self()},
-                fun() ->
-                    %% get entry (for first node entering lock it exists, for subsequent nodes too
-                    %% because of data written during resolve)
-                    Entry = find_process_entry_by_name(Name),
-                    case Entry#syn_registry_table.pid =:= RemotePid of
-                        true ->
-                            error_logger:info_msg(
-                                "Syn(~p): Conflicting name from multicast ~p already resolved, skipping~n",
-                                [node(), Name]
-                            );
-                        false ->
-                            LocalPid = Entry#syn_registry_table.pid,
-                            LocalMeta = Entry#syn_registry_table.meta,
-
-                            error_logger:warning_msg(
-                                "Syn(~p): Conflicting name from multicast found for: ~p, processes are ~p, ~p~n",
-                                [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
-                            ),
-
-                            resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta},
-                                fun() ->
-                                    RemoteNode = node(RemotePid),
-                                    ok = rpc:call(RemoteNode, syn_registry, add_to_local_table, [Name, LocalPid, LocalMeta, undefined])
-                                end,
-                                fun() ->
-                                    add_to_local_table(Name, RemotePid, RemoteMeta, undefined)
-                                end,
-                                State
-                            )
-                    end
-                end
-            )
+        Entry ->
+            %% conflict found, raise resolution
+            multicast_inconsistent_name_data(Name, Entry#syn_registry_table.pid, Entry#syn_registry_table.meta)
+    end.
+
+-spec remove_from_local_table(Name :: any()) -> ok.
+remove_from_local_table(Name) ->
+    mnesia:dirty_delete(syn_registry_table, Name).
+
+-spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: [#syn_registry_table{}].
+find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
+    mnesia:dirty_index_read(syn_registry_table, Pid, #syn_registry_table.pid).
+
+-spec find_process_entry_by_name(Name :: any()) -> Entry :: #syn_registry_table{} | undefined.
+find_process_entry_by_name(Name) ->
+    case mnesia:dirty_read(syn_registry_table, Name) of
+        [Entry] -> Entry;
+        _ -> undefined
+    end.
+
+-spec get_registry_tuples_for_node(Node :: node()) -> [syn_registry_tuple()].
+get_registry_tuples_for_node(Node) ->
+    %% build match specs
+    MatchHead = #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
+    Guard = {'=:=', '$3', Node},
+    RegistryTupleFormat = {{'$1', '$2', '$4'}},
+    %% select
+    mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [RegistryTupleFormat]}]).
+
+-spec handle_process_down(Name :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
+handle_process_down(Name, Pid, Meta, Reason, #state{
+    custom_event_handler = CustomEventHandler
+}) ->
+    case Name of
+        undefined ->
+            error_logger:warning_msg(
+                "Syn(~p): Received a DOWN message from an unregistered process ~p with reason: ~p~n",
+                [node(), Pid, Reason]
+            );
+        _ ->
+            syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler)
     end.
 
 -spec resolve_conflict(
@@ -439,79 +509,6 @@ resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}, KeepLocal
             )
     end.
 
--spec remove_from_local_table(Name :: any()) -> ok.
-remove_from_local_table(Name) ->
-    mnesia:dirty_delete(syn_registry_table, Name).
-
--spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: [#syn_registry_table{}].
-find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
-    mnesia:dirty_index_read(syn_registry_table, Pid, #syn_registry_table.pid).
-
--spec find_process_entry_by_name(Name :: any()) -> Entry :: #syn_registry_table{} | undefined.
-find_process_entry_by_name(Name) ->
-    case mnesia:dirty_read(syn_registry_table, Name) of
-        [Entry] -> Entry;
-        _ -> undefined
-    end.
-
--spec get_registry_tuples_for_node(Node :: node()) -> [syn_registry_tuple()].
-get_registry_tuples_for_node(Node) ->
-    %% build match specs
-    MatchHead = #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
-    Guard = {'=:=', '$3', Node},
-    RegistryTupleFormat = {{'$1', '$2', '$4'}},
-    %% select
-    mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [RegistryTupleFormat]}]).
-
--spec handle_process_down(Name :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
-handle_process_down(Name, Pid, Meta, Reason, #state{
-    custom_event_handler = CustomEventHandler
-}) ->
-    case Name of
-        undefined ->
-            error_logger:warning_msg(
-                "Syn(~p): Received a DOWN message from an unregistered process ~p with reason: ~p~n",
-                [node(), Pid, Reason]
-            );
-        _ ->
-            syn_event_handler:do_on_process_exit(Name, Pid, Meta, Reason, CustomEventHandler)
-    end.
-
--spec sync_registry_tuples(RemoteNode :: node(), RegistryTuples :: [syn_registry_tuple()], #state{}) -> ok.
-sync_registry_tuples(RemoteNode, RegistryTuples, State) ->
-    %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
-    purge_registry_entries_for_remote_node(RemoteNode),
-    %% loop
-    F = fun({Name, RemotePid, RemoteMeta}) ->
-        %% check if same name is registered
-        case find_process_entry_by_name(Name) of
-            undefined ->
-                %% no conflict
-                register_on_node(Name, RemotePid, RemoteMeta);
-
-            Entry ->
-                LocalPid = Entry#syn_registry_table.pid,
-                LocalMeta = Entry#syn_registry_table.meta,
-
-                error_logger:warning_msg(
-                    "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
-                    [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
-                ),
-
-                resolve_conflict(Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta},
-                    fun() ->
-                        ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
-                    end,
-                    fun() ->
-                        add_to_local_table(Name, RemotePid, RemoteMeta, undefined)
-                    end,
-                    State
-                )
-        end
-    end,
-    %% add to table
-    lists:foreach(F, RegistryTuples).
-
 -spec purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
 purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
     %% NB: no demonitoring is done, hence why this needs to run for a remote node