|
@@ -35,6 +35,10 @@
|
|
%% gen_server callbacks
|
|
%% gen_server callbacks
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
|
|
|
|
|
|
+%% internal
|
|
|
|
+-export([get_processes_info_of_node/1]).
|
|
|
|
+-export([write_processes_info_to_node/2]).
|
|
|
|
+
|
|
%% records
|
|
%% records
|
|
-record(state, {}).
|
|
-record(state, {}).
|
|
|
|
|
|
@@ -63,6 +67,8 @@ start_link() ->
|
|
ignore |
|
|
ignore |
|
|
{stop, Reason :: any()}.
|
|
{stop, Reason :: any()}.
|
|
init([]) ->
|
|
init([]) ->
|
|
|
|
+ %% trap linked processes signal
|
|
|
|
+ process_flag(trap_exit, true),
|
|
%% monitor mnesia events
|
|
%% monitor mnesia events
|
|
mnesia:subscribe(system),
|
|
mnesia:subscribe(system),
|
|
{ok, #state{}}.
|
|
{ok, #state{}}.
|
|
@@ -79,7 +85,7 @@ init([]) ->
|
|
{stop, Reason :: any(), #state{}}.
|
|
{stop, Reason :: any(), #state{}}.
|
|
|
|
|
|
handle_call(Request, From, State) ->
|
|
handle_call(Request, From, State) ->
|
|
- error_logger:warning_msg("Received from ~p an unknown call message: ~p", [Request, From]),
|
|
|
|
|
|
+ error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [Request, From]),
|
|
{reply, undefined, State}.
|
|
{reply, undefined, State}.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
@@ -91,7 +97,7 @@ handle_call(Request, From, State) ->
|
|
{stop, Reason :: any(), #state{}}.
|
|
{stop, Reason :: any(), #state{}}.
|
|
|
|
|
|
handle_cast(Msg, State) ->
|
|
handle_cast(Msg, State) ->
|
|
- error_logger:warning_msg("Received an unknown cast message: ~p", [Msg]),
|
|
|
|
|
|
+ error_logger:warning_msg("Received an unknown cast message: ~p~n", [Msg]),
|
|
{noreply, State}.
|
|
{noreply, State}.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
@@ -103,13 +109,13 @@ handle_cast(Msg, State) ->
|
|
{stop, Reason :: any(), #state{}}.
|
|
{stop, Reason :: any(), #state{}}.
|
|
|
|
|
|
handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
|
|
handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
|
|
- error_logger:warning_msg("MNESIA signalled an inconsistent database on node: ~p with context: ~p, initiating automerge", [Node, Context]),
|
|
|
|
- %% automerge(Node),
|
|
|
|
|
|
+ error_logger:warning_msg("MNESIA signalled an inconsistent database on node: ~p with context: ~p, initiating automerge~n", [Node, Context]),
|
|
|
|
+ automerge(Node),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
|
|
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
|
|
- error_logger:warning_msg("Received a MNESIA down event, removing all pids of node ~p", [Node]),
|
|
|
|
- %% delete_pids_of_disconnected_node(Node),
|
|
|
|
|
|
+ error_logger:warning_msg("Received a MNESIA down event, removing all pids of node ~p~n", [Node]),
|
|
|
|
+ delete_pids_of_disconnected_node(Node),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
|
|
handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
|
|
@@ -117,7 +123,7 @@ handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
handle_info(Info, State) ->
|
|
- error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
|
|
|
|
|
|
+ error_logger:warning_msg("Received an unknown info message: ~p~n", [Info]),
|
|
{noreply, State}.
|
|
{noreply, State}.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
@@ -125,7 +131,7 @@ handle_info(Info, State) ->
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
-spec terminate(Reason :: any(), #state{}) -> terminated.
|
|
-spec terminate(Reason :: any(), #state{}) -> terminated.
|
|
terminate(Reason, _State) ->
|
|
terminate(Reason, _State) ->
|
|
- error_logger:info_msg("Terminating syn with reason: ~p", [Reason]),
|
|
|
|
|
|
+ error_logger:info_msg("Terminating syn with reason: ~p~n", [Reason]),
|
|
terminated.
|
|
terminated.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
@@ -134,3 +140,126 @@ terminate(Reason, _State) ->
|
|
-spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
|
|
-spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
{ok, State}.
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+%% ===================================================================
|
|
|
|
+%% Internal
|
|
|
|
+%% ===================================================================
|
|
|
|
+-spec delete_pids_of_disconnected_node(Node :: atom()) -> pid().
|
|
|
|
+delete_pids_of_disconnected_node(Node) ->
|
|
|
|
+ %% don't lock gen server
|
|
|
|
+ spawn(fun() ->
|
|
|
|
+ %% build match specs
|
|
|
|
+ MatchHead = #syn_processes_table{key = '$1', node = '$2', _ = '_'},
|
|
|
|
+ Guard = {'=:=', '$2', Node},
|
|
|
|
+ IdFormat = '$1',
|
|
|
|
+ %% delete
|
|
|
|
+ DelF = fun(Id) -> mnesia:dirty_delete({syn_processes_table, Id}) end,
|
|
|
|
+ NodePids = mnesia:dirty_select(syn_processes_table, [{MatchHead, [Guard], [IdFormat]}]),
|
|
|
|
+ lists:foreach(DelF, NodePids)
|
|
|
|
+ end).
|
|
|
|
+
|
|
|
|
+-spec automerge(RemoteNode :: atom()) -> ok.
|
|
|
|
+automerge(RemoteNode) ->
|
|
|
|
+ global:trans({{?MODULE, automerge}, self()},
|
|
|
|
+ fun() ->
|
|
|
|
+ error_logger:warning_msg("AUTOMERGE starting for remote node ~s (global lock is set)~n", [RemoteNode]),
|
|
|
|
+ check_stitch(RemoteNode),
|
|
|
|
+ error_logger:warning_msg("AUTOMERGE done (global lock will be unset)~n")
|
|
|
|
+ end).
|
|
|
|
+
|
|
|
|
+-spec check_stitch(RemoteNode :: atom()) -> ok.
|
|
|
|
+check_stitch(RemoteNode) ->
|
|
|
|
+ case lists:member(RemoteNode, mnesia:system_info(running_db_nodes)) of
|
|
|
|
+ true ->
|
|
|
|
+ ok;
|
|
|
|
+ false ->
|
|
|
|
+ stitch(RemoteNode),
|
|
|
|
+ ok
|
|
|
|
+ end.
|
|
|
|
+
|
|
|
|
+-spec stitch(RemoteNode :: atom()) -> {'ok', any()} | {'error', any()}.
|
|
|
|
+stitch(RemoteNode) ->
|
|
|
|
+ mnesia_controller:connect_nodes(
|
|
|
|
+ [RemoteNode],
|
|
|
|
+ fun(MergeF) ->
|
|
|
|
+ case MergeF([syn_processes_table]) of
|
|
|
|
+ {merged, _, _} = Res ->
|
|
|
|
+ stitch_tab(RemoteNode),
|
|
|
|
+ Res;
|
|
|
|
+ Other ->
|
|
|
|
+ Other
|
|
|
|
+ end
|
|
|
|
+ end).
|
|
|
|
+
|
|
|
|
+-spec stitch_tab(RemoteNode :: atom()) -> ok.
|
|
|
|
+stitch_tab(RemoteNode) ->
|
|
|
|
+ %% get remote processes info
|
|
|
|
+ RemoteProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]),
|
|
|
|
+ %% get local processes info
|
|
|
|
+ LocalProcessesInfo = get_processes_info_of_node(node()),
|
|
|
|
+ %% purge doubles (if any)
|
|
|
|
+ {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo),
|
|
|
|
+ %% write
|
|
|
|
+ write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo1),
|
|
|
|
+ write_local_processes_to_remote(RemoteNode, LocalProcessesInfo1).
|
|
|
|
+
|
|
|
|
+-spec purge_double_processes_from_local_node(LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()) ->
|
|
|
|
+ {LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()}.
|
|
|
|
+purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo) ->
|
|
|
|
+ %% create ETS table
|
|
|
|
+ Tab = ets:new(syn_automerge_doubles_table, [set]),
|
|
|
|
+
|
|
|
|
+ %% insert local processes info
|
|
|
|
+ ets:insert(Tab, LocalProcessesInfo),
|
|
|
|
+
|
|
|
|
+ %% find doubles
|
|
|
|
+ F = fun({Key, _RemoteProcessPid}) ->
|
|
|
|
+ case ets:lookup(Tab, Key) of
|
|
|
|
+ [] -> ok;
|
|
|
|
+ [{Key, LocalProcessPid}] ->
|
|
|
|
+ error_logger:warning_msg("Found a double process for ~s, killing it on local node~n", [Key]),
|
|
|
|
+ %% remove it from local mnesia table
|
|
|
|
+ mnesia:dirty_delete(syn_processes_table, Key),
|
|
|
|
+ %% remove it from ETS
|
|
|
|
+ ets:delete(Tab, Key),
|
|
|
|
+ %% kill the process
|
|
|
|
+ exit(LocalProcessPid, kill)
|
|
|
|
+ end
|
|
|
|
+ end,
|
|
|
|
+ lists:foreach(F, RemoteProcessesInfo),
|
|
|
|
+
|
|
|
|
+ %% compute local processes without doubles
|
|
|
|
+ LocalProcessesInfo1 = ets:tab2list(Tab),
|
|
|
|
+ %% delete ETS table
|
|
|
|
+ ets:delete(Tab),
|
|
|
|
+ %% return
|
|
|
|
+ {LocalProcessesInfo1, RemoteProcessesInfo}.
|
|
|
|
+
|
|
|
|
+-spec write_remote_processes_to_local(RemoteNode :: atom(), RemoteProcessesInfo :: list()) -> ok.
|
|
|
|
+write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo) ->
|
|
|
|
+ write_processes_info_to_node(RemoteNode, RemoteProcessesInfo).
|
|
|
|
+
|
|
|
|
+-spec write_local_processes_to_remote(RemoteNode :: atom(), LocalProcessesInfo :: list()) -> ok.
|
|
|
|
+write_local_processes_to_remote(RemoteNode, LocalProcessesInfo) ->
|
|
|
|
+ ok = rpc:call(RemoteNode, ?MODULE, write_processes_info_to_node, [node(), LocalProcessesInfo]).
|
|
|
|
+
|
|
|
|
+-spec get_processes_info_of_node(Node :: atom()) -> list().
|
|
|
|
+get_processes_info_of_node(Node) ->
|
|
|
|
+ %% build match specs
|
|
|
|
+ MatchHead = #syn_processes_table{key = '$1', pid = '$2', node = '$3'},
|
|
|
|
+ Guard = {'=:=', '$3', Node},
|
|
|
|
+ ProcessInfoFormat = {{'$1', '$2'}},
|
|
|
|
+ %% select
|
|
|
|
+ mnesia:dirty_select(syn_processes_table, [{MatchHead, [Guard], [ProcessInfoFormat]}]).
|
|
|
|
+
|
|
|
|
+-spec write_processes_info_to_node(Node :: atom(), ProcessesInfo :: list()) -> ok.
|
|
|
|
+write_processes_info_to_node(Node, ProcessesInfo) ->
|
|
|
|
+ FWrite = fun({Key, ProcessPid}) ->
|
|
|
|
+ mnesia:dirty_write(#syn_processes_table{
|
|
|
|
+ key = Key,
|
|
|
|
+ pid = ProcessPid,
|
|
|
|
+ node = Node
|
|
|
|
+ })
|
|
|
|
+ end,
|
|
|
|
+ lists:foreach(FWrite, ProcessesInfo).
|