|
@@ -41,8 +41,8 @@
|
|
|
|
|
|
%% records
|
|
|
-record(state, {
|
|
|
- conflicting_mode = kill :: kill | send_message,
|
|
|
- message = undefined :: any()
|
|
|
+ netsplit_conflicting_process_callback_module = undefined :: atom(),
|
|
|
+ netsplit_conflicting_process_callback_function = undefined :: atom()
|
|
|
}).
|
|
|
|
|
|
%% include
|
|
@@ -75,20 +75,14 @@ init([]) ->
|
|
|
%% monitor mnesia events
|
|
|
mnesia:subscribe(system),
|
|
|
%% get options
|
|
|
- {ok, NetsplitSendMessageToProcess} = syn_utils:get_env_value(
|
|
|
- netsplit_send_message_to_process,
|
|
|
- syn_do_not_send_any_message_to_conflicting_process
|
|
|
+ {ok, [NetsplitConflictingProcessCallbackModule, NetsplitConflictingProcessCallbackFunction]} = syn_utils:get_env_value(
|
|
|
+ netsplit_conflicting_process_callback,
|
|
|
+ [undefined, undefined]
|
|
|
),
|
|
|
- %% get state params
|
|
|
- {ConflictingMode, Message} = case NetsplitSendMessageToProcess of
|
|
|
- syn_do_not_send_any_message_to_conflicting_process -> {kill, undefined};
|
|
|
- _ -> {send_message, NetsplitSendMessageToProcess}
|
|
|
-
|
|
|
- end,
|
|
|
%% build state
|
|
|
{ok, #state{
|
|
|
- conflicting_mode = ConflictingMode,
|
|
|
- message = Message
|
|
|
+ netsplit_conflicting_process_callback_module = NetsplitConflictingProcessCallbackModule,
|
|
|
+ netsplit_conflicting_process_callback_function = NetsplitConflictingProcessCallbackFunction
|
|
|
}}.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
@@ -127,11 +121,11 @@ handle_cast(Msg, State) ->
|
|
|
{stop, Reason :: any(), #state{}}.
|
|
|
|
|
|
handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, #state{
|
|
|
- conflicting_mode = ConflictingMode,
|
|
|
- message = Message
|
|
|
+ netsplit_conflicting_process_callback_module = NetsplitConflictingProcessCallbackModule,
|
|
|
+ netsplit_conflicting_process_callback_function = NetsplitConflictingProcessCallbackFunction
|
|
|
} = State) ->
|
|
|
error_logger:warning_msg("MNESIA signalled an inconsistent database on node: ~p with context: ~p, initiating automerge~n", [Node, Context]),
|
|
|
- automerge(Node, ConflictingMode, Message),
|
|
|
+ automerge(Node, NetsplitConflictingProcessCallbackModule, NetsplitConflictingProcessCallbackFunction),
|
|
|
{noreply, State};
|
|
|
|
|
|
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
|
|
@@ -180,44 +174,44 @@ delete_pids_of_disconnected_node(Node) ->
|
|
|
lists:foreach(DelF, NodePids)
|
|
|
end).
|
|
|
|
|
|
--spec automerge(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) -> ok.
|
|
|
-automerge(RemoteNode, ConflictingMode, Message) ->
|
|
|
+-spec automerge(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) -> ok.
|
|
|
+automerge(RemoteNode, CallbackModule, CallbackFunction) ->
|
|
|
global:trans({{?MODULE, automerge}, self()},
|
|
|
fun() ->
|
|
|
error_logger:warning_msg("AUTOMERGE starting for remote node ~s (global lock is set)~n", [RemoteNode]),
|
|
|
- check_stitch(RemoteNode, ConflictingMode, Message),
|
|
|
+ check_stitch(RemoteNode, CallbackModule, CallbackFunction),
|
|
|
error_logger:warning_msg("AUTOMERGE done (global lock will be unset)~n")
|
|
|
end).
|
|
|
|
|
|
--spec check_stitch(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) -> ok.
|
|
|
-check_stitch(RemoteNode, ConflictingMode, Message) ->
|
|
|
+-spec check_stitch(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) -> ok.
|
|
|
+check_stitch(RemoteNode, CallbackModule, CallbackFunction) ->
|
|
|
case catch lists:member(RemoteNode, mnesia:system_info(running_db_nodes)) of
|
|
|
true ->
|
|
|
ok;
|
|
|
false ->
|
|
|
- stitch(RemoteNode, ConflictingMode, Message),
|
|
|
+ stitch(RemoteNode, CallbackModule, CallbackFunction),
|
|
|
ok;
|
|
|
Error ->
|
|
|
error_logger:error_msg("Could not check if node is stiched: ~p~n", [Error])
|
|
|
end.
|
|
|
|
|
|
--spec stitch(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) ->
|
|
|
+-spec stitch(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) ->
|
|
|
{'ok', any()} | {'error', any()}.
|
|
|
-stitch(RemoteNode, ConflictingMode, Message) ->
|
|
|
+stitch(RemoteNode, CallbackModule, CallbackFunction) ->
|
|
|
mnesia_controller:connect_nodes(
|
|
|
[RemoteNode],
|
|
|
fun(MergeF) ->
|
|
|
catch case MergeF([syn_processes_table]) of
|
|
|
{merged, _, _} = Res ->
|
|
|
- stitch_tab(RemoteNode, ConflictingMode, Message),
|
|
|
+ stitch_tab(RemoteNode, CallbackModule, CallbackFunction),
|
|
|
Res;
|
|
|
Other ->
|
|
|
Other
|
|
|
end
|
|
|
end).
|
|
|
|
|
|
--spec stitch_tab(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) -> ok.
|
|
|
-stitch_tab(RemoteNode, ConflictingMode, Message) ->
|
|
|
+-spec stitch_tab(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) -> ok.
|
|
|
+stitch_tab(RemoteNode, CallbackModule, CallbackFunction) ->
|
|
|
%% get remote processes info
|
|
|
RemoteProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]),
|
|
|
%% get local processes info
|
|
@@ -226,8 +220,8 @@ stitch_tab(RemoteNode, ConflictingMode, Message) ->
|
|
|
{LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_node(
|
|
|
LocalProcessesInfo,
|
|
|
RemoteProcessesInfo,
|
|
|
- ConflictingMode,
|
|
|
- Message
|
|
|
+ CallbackModule,
|
|
|
+ CallbackFunction
|
|
|
),
|
|
|
%% write
|
|
|
write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo1),
|
|
@@ -240,7 +234,7 @@ stitch_tab(RemoteNode, ConflictingMode, Message) ->
|
|
|
Message :: any()
|
|
|
) ->
|
|
|
{LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()}.
|
|
|
-purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo, ConflictingMode, Message) ->
|
|
|
+purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo, CallbackModule, CallbackFunction) ->
|
|
|
%% create ETS table
|
|
|
Tab = ets:new(syn_automerge_doubles_table, [set]),
|
|
|
|
|
@@ -258,9 +252,9 @@ purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo,
|
|
|
%% remove it from ETS
|
|
|
ets:delete(Tab, Key),
|
|
|
%% kill or send message
|
|
|
- case ConflictingMode of
|
|
|
- kill -> exit(LocalProcessPid, kill);
|
|
|
- send_message -> LocalProcessPid ! Message
|
|
|
+ case CallbackModule of
|
|
|
+ undefined -> exit(LocalProcessPid, kill);
|
|
|
+ _ -> spawn(fun() -> CallbackModule:CallbackFunction(Key, LocalProcessPid) end)
|
|
|
end
|
|
|
end
|
|
|
end,
|