Просмотр исходного кода

Purge doubles only after the mnesia merging has finished.

Roberto Ostinelli 9 лет назад
Родитель
Сommit
e5c6c8fe35
1 измененных файлов с 59 добавлено и 43 удалено
  1. 59 43
      src/syn_consistency.erl

+ 59 - 43
src/syn_consistency.erl

@@ -121,12 +121,9 @@ handle_cast(Msg, State) ->
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
 
-handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, #state{
-    conflicting_process_callback_module = ConflictingProcessCallbackModule,
-    conflicting_process_callback_function = ConflictingProcessCallbackFunction
-} = State) ->
+handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
     error_logger:warning_msg("MNESIA signalled an inconsistent database on node: ~p with context: ~p, initiating automerge", [Node, Context]),
-    automerge(Node, ConflictingProcessCallbackModule, ConflictingProcessCallbackFunction),
+    automerge(Node),
     {noreply, State};
 
 handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
@@ -138,6 +135,14 @@ handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
     %% ignore mnesia event
     {noreply, State};
 
+handle_info({purge_double_processes, DoubleProcessesInfo}, #state{
+    conflicting_process_callback_module = ConflictingProcessCallbackModule,
+    conflicting_process_callback_function = ConflictingProcessCallbackFunction
+} = State) ->
+    error_logger:warning_msg("About to purge double processes after netsplit"),
+    purge_double_processes(ConflictingProcessCallbackModule, ConflictingProcessCallbackFunction, DoubleProcessesInfo),
+    {noreply, State};
+
 handle_info(Info, State) ->
     error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
     {noreply, State}.
@@ -175,67 +180,63 @@ delete_pids_of_disconnected_node(Node) ->
         lists:foreach(DelF, NodePids)
     end).
 
--spec automerge(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) -> ok.
-automerge(RemoteNode, CallbackModule, CallbackFunction) ->
+-spec automerge(RemoteNode :: atom()) -> ok.
+automerge(RemoteNode) ->
     global:trans({{?MODULE, automerge}, self()},
         fun() ->
             error_logger:warning_msg("AUTOMERGE starting for remote node ~s (global lock is set)", [RemoteNode]),
-            check_stitch(RemoteNode, CallbackModule, CallbackFunction),
-            error_logger:warning_msg("AUTOMERGE done (global lock is about to be unset)")
+            check_stitch(RemoteNode),
+            error_logger:warning_msg("AUTOMERGE done (global lock about to be unset)")
         end).
 
--spec check_stitch(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) -> ok.
-check_stitch(RemoteNode, CallbackModule, CallbackFunction) ->
+-spec check_stitch(RemoteNode :: atom()) -> ok.
+check_stitch(RemoteNode) ->
     case catch lists:member(RemoteNode, mnesia:system_info(running_db_nodes)) of
         true ->
             ok;
         false ->
-            catch stitch(RemoteNode, CallbackModule, CallbackFunction),
+            catch stitch(RemoteNode),
             ok;
         Error ->
-            error_logger:error_msg("Could not check if node is stiched: ~p", [Error])
+            error_logger:error_msg("Could not check if node is stiched: ~p", [Error]),
+            ok
     end.
 
--spec stitch(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) ->
-    {'ok', any()} | {'error', any()}.
-stitch(RemoteNode, CallbackModule, CallbackFunction) ->
+-spec stitch(RemoteNode :: atom()) -> {ok, any()} | {error, any()}.
+stitch(RemoteNode) ->
     mnesia_controller:connect_nodes(
         [RemoteNode],
         fun(MergeF) ->
             catch case MergeF([syn_processes_table]) of
                 {merged, _, _} = Res ->
-                    stitch_tab(RemoteNode, CallbackModule, CallbackFunction),
+                    stitch_tab(RemoteNode),
                     Res;
                 Other ->
                     Other
             end
         end).
 
--spec stitch_tab(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) -> ok.
-stitch_tab(RemoteNode, CallbackModule, CallbackFunction) ->
+-spec stitch_tab(RemoteNode :: atom()) -> ok.
+stitch_tab(RemoteNode) ->
     %% get remote processes info
     RemoteProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]),
     %% get local processes info
     LocalProcessesInfo = get_processes_info_of_node(node()),
     %% purge doubles (if any)
-    {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_node(
+    {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_mnesia(
         LocalProcessesInfo,
-        RemoteProcessesInfo,
-        CallbackModule,
-        CallbackFunction
+        RemoteProcessesInfo
     ),
     %% write
     write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo1),
     write_local_processes_to_remote(RemoteNode, LocalProcessesInfo1).
 
--spec purge_double_processes_from_local_node(
+-spec purge_double_processes_from_local_mnesia(
     LocalProcessesInfo :: list(),
-    RemoteProcessesInfo :: list(),
-    ConflictingMode :: kill | send_message,
-    Message :: any()
+    RemoteProcessesInfo :: list()
 ) ->
     {LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()}.
-purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo, CallbackModule, CallbackFunction) ->
+purge_double_processes_from_local_mnesia(LocalProcessesInfo, RemoteProcessesInfo) ->
     %% create ETS table
     Tab = ets:new(syn_automerge_doubles_table, [set]),
 
@@ -243,28 +244,22 @@ purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo,
     ets:insert(Tab, LocalProcessesInfo),
 
     %% find doubles
-    F = fun({Key, _RemoteProcessPid, _RemoteProcessMeta}) ->
+    F = fun({Key, _RemoteProcessPid, _RemoteProcessMeta}, Acc) ->
         case ets:lookup(Tab, Key) of
-            [] -> ok;
+            [] -> Acc;
             [{Key, LocalProcessPid, LocalProcessMeta}] ->
-                %% remove it from local mnesia table
+                %% found a double process, remove it from local mnesia table
                 mnesia:dirty_delete(syn_processes_table, Key),
                 %% remove it from ETS
                 ets:delete(Tab, Key),
-                %% kill or send message
-                case CallbackModule of
-                    undefined ->
-                        error_logger:warning_msg("Found a double process for ~s, killing it on local node ~p", [Key, node()]),
-                        exit(LocalProcessPid, kill);
-                    _ -> spawn(
-                        fun() ->
-                            error_logger:warning_msg("Found a double process for ~s, about to trigger callback on local node ~p", [Key, node()]),
-                            CallbackModule:CallbackFunction(Key, LocalProcessPid, LocalProcessMeta)
-                        end)
-                end
+                %% add it to acc
+                [{Key, LocalProcessPid, LocalProcessMeta} | Acc]
         end
     end,
-    lists:foreach(F, RemoteProcessesInfo),
+    DoubleProcessesInfo = lists:foldl(F, [], RemoteProcessesInfo),
+
+    %% send to syn_consistency gen_server to handle double processes once merging is done
+    ?MODULE ! {purge_double_processes, DoubleProcessesInfo},
 
     %% compute local processes without doubles
     LocalProcessesInfo1 = ets:tab2list(Tab),
@@ -301,3 +296,24 @@ write_processes_info_to_node(Node, ProcessesInfo) ->
         })
     end,
     lists:foreach(FWrite, ProcessesInfo).
+
+-spec purge_double_processes(
+    ConflictingProcessCallbackModule :: atom(),
+    ConflictingProcessCallbackFunction :: atom(),
+    DoubleProcessesInfo :: list()
+) -> ok.
+purge_double_processes(undefined, _, DoubleProcessesInfo) ->
+    F = fun({Key, LocalProcessPid, _LocalProcessMeta}) ->
+        error_logger:warning_msg("Found a double process for ~s, killing it on local node ~p", [Key, node()]),
+        exit(LocalProcessPid, kill)
+    end,
+    lists:foreach(F, DoubleProcessesInfo);
+purge_double_processes(ConflictingProcessCallbackModule, ConflictingProcessCallbackFunction, DoubleProcessesInfo) ->
+    F = fun({Key, LocalProcessPid, LocalProcessMeta}) ->
+        spawn(
+            fun() ->
+                error_logger:warning_msg("Found a double process for ~s, about to trigger callback on local node ~p", [Key, node()]),
+                ConflictingProcessCallbackModule:ConflictingProcessCallbackFunction(Key, LocalProcessPid, LocalProcessMeta)
+            end)
+    end,
+    lists:foreach(F, DoubleProcessesInfo).