123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
%% Module identity and the OTP behaviour it implements.
-module(syn_consistency).
-behaviour(gen_server).

%% Public API.
-export([
    start_link/0
]).
-export([
    resume_local_syn_registry/0
]).

%% gen_server callbacks.
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

%% Server state: an empty record, no fields are kept between callbacks.
-record(state, {}).

%% Shared record definitions.
%% NOTE(review): presumably this header declares #syn_registry_table{}, which
%% the functions below rely on -- confirm against syn_records.hrl.
-include("syn_records.hrl").
%% Starts the consistency server linked to the caller and registers it locally
%% under the module name. No gen_server start options are passed.
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% Resumes the process registered locally as `syn_registry` through
%% sys:resume/1, i.e. undoes a previous sys:suspend/1 on that process.
-spec resume_local_syn_registry() -> ok.
resume_local_syn_registry() ->
    RegistryProcess = syn_registry,
    sys:resume(RegistryProcess).
%% gen_server init callback.
%% Subscribes this process to nodeup/nodedown messages, then waits up to
%% 10000 ms for the syn_registry_table mnesia table. Startup succeeds only when
%% mnesia reports `ok`; any other answer stops the server with the answer
%% wrapped in `error_waiting_for_process_registry_table`.
-spec init([]) ->
    {ok, #state{}} |
    {ok, #state{}, Timeout :: non_neg_integer()} |
    ignore |
    {stop, Reason :: any()}.
init([]) ->
    ok = net_kernel:monitor_nodes(true),
    WaitResult = mnesia:wait_for_tables([syn_registry_table], 10000),
    init_outcome(WaitResult).

%% Maps the result of mnesia:wait_for_tables/2 to the init/1 return value.
init_outcome(ok) ->
    {ok, #state{}};
init_outcome(Failure) ->
    {stop, {error_waiting_for_process_registry_table, Failure}}.
%% gen_server call callback.
%% This server defines no synchronous protocol: every request is logged as
%% unknown and the caller receives `undefined`. The state is left untouched.
-spec handle_call(Request :: any(), From :: any(), #state{}) ->
    {reply, Reply :: any(), #state{}} |
    {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
    {noreply, #state{}} |
    {noreply, #state{}, Timeout :: non_neg_integer()} |
    {stop, Reason :: any(), Reply :: any(), #state{}} |
    {stop, Reason :: any(), #state{}}.
handle_call(Request, From, State) ->
    %% The arguments must follow the format string: the sender fills the first
    %% ~p ("Received from ~p") and the request fills the second one.
    error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [From, Request]),
    {reply, undefined, State}.
%% gen_server cast callback.
%% No asynchronous protocol is defined either: the message is logged as
%% unknown and dropped, and the state is returned unchanged.
-spec handle_cast(Msg :: any(), #state{}) ->
    {noreply, #state{}} |
    {noreply, #state{}, Timeout :: non_neg_integer()} |
    {stop, Reason :: any(), #state{}}.
handle_cast(UnknownMsg, ServerState) ->
    LogArgs = [UnknownMsg],
    error_logger:warning_msg("Received an unknown cast message: ~p~n", LogArgs),
    {noreply, ServerState}.
%% gen_server info callback. Handles the node monitoring messages requested in
%% init/1 plus a logging fallback:
%%   {nodeup, Node}   - merge Node's registry tuples into the local table;
%%   {nodedown, Node} - drop every registry entry that belongs to Node;
%%   anything else    - log and ignore.
-spec handle_info(Info :: any(), #state{}) ->
    {noreply, #state{}} |
    {noreply, #state{}, Timeout :: non_neg_integer()} |
    {stop, Reason :: any(), #state{}}.
handle_info({nodeup, RemoteNode}, State) ->
    error_logger:info_msg("Node ~p has joined the cluster of local node ~p~n", [RemoteNode, node()]),
    %% The merge runs under a global lock keyed on this module, so it is
    %% serialised with any other holder of the same lock id.
    global:trans({{?MODULE, auto_merge_node_up}, self()},
        fun() ->
            error_logger:info_msg("Merge: ----> Initiating on ~p for remote node ~p~n", [node(), RemoteNode]),
            %% Ask the remote registry for its tuples; judging by its name the
            %% remote call also suspends the remote registry.
            case rpc:call(RemoteNode, syn_registry, get_local_registry_tuples_and_suspend, [node()]) of
                {badrpc, FetchError} ->
                    %% Do not hand a badrpc tuple to sync_registry_tuples/2: it
                    %% would purge the remote node's entries and then crash
                    %% while iterating over a non-list.
                    error_logger:error_msg(
                        "Merge: <---- Aborted on ~p for remote node ~p, could not fetch registry tuples: ~p~n",
                        [node(), RemoteNode, FetchError]
                    );
                RegistryTuples ->
                    sync_registry_tuples(RemoteNode, RegistryTuples),
                    %% A completed merge is informational, not an error.
                    error_logger:info_msg("Merge: <---- Done on ~p for remote node ~p~n", [node(), RemoteNode])
            end
        end
    ),
    %% Always try to resume the remote registry, even when the merge was
    %% aborted. The node may have gone away again in the meantime, so a failed
    %% call is logged instead of crashing this server with a badmatch.
    case rpc:call(RemoteNode, sys, resume, [syn_registry]) of
        ok ->
            ok;
        {badrpc, ResumeError} ->
            error_logger:warning_msg(
                "Could not resume syn_registry on remote node ~p: ~p~n",
                [RemoteNode, ResumeError]
            )
    end,
    {noreply, State};
handle_info({nodedown, RemoteNode}, State) ->
    error_logger:warning_msg("Node ~p has left the cluster of local node ~p~n", [RemoteNode, node()]),
    %% Remove every registry entry that was recorded for the departed node.
    purge_registry_entries_for_node(RemoteNode),
    {noreply, State};
handle_info(Info, State) ->
    error_logger:warning_msg("Received an unknown info message: ~p~n", [Info]),
    {noreply, State}.
%% Replaces the local view of RemoteNode's registrations with RegistryTuples.
%% All existing entries for RemoteNode are purged first; then each
%% {Name, Pid, Node, Meta} tuple is registered through
%% syn_registry:sync_register/3. When a name is still present in the registry
%% after the purge, the process currently holding it is killed before the
%% remote registration is written.
sync_registry_tuples(RemoteNode, RegistryTuples) ->
    purge_registry_entries_for_node(RemoteNode),
    lists:foreach(fun sync_registry_tuple/1, RegistryTuples).

%% Registers one remote tuple, resolving a name conflict first if there is one.
sync_registry_tuple({Name, RemotePid, _RemoteNode, RemoteMeta}) ->
    Existing = syn_registry:find_process_entry_by_name(Name),
    resolve_name_conflict(Name, RemotePid, Existing),
    syn_registry:sync_register(Name, RemotePid, RemoteMeta).

%% No entry under this name: nothing to resolve. Otherwise log the conflict and
%% kill the process recorded in the existing entry.
resolve_name_conflict(_Name, _RemotePid, undefined) ->
    ok;
resolve_name_conflict(Name, RemotePid, Entry) ->
    ExistingPid = Entry#syn_registry_table.pid,
    error_logger:warning_msg(
        "Conflicting name process found for: ~p, processes are ~p, ~p, killing local~n",
        [Name, ExistingPid, RemotePid]
    ),
    exit(ExistingPid, kill).
%% gen_server terminate callback.
%% Logs the shutdown reason and returns the atom `terminated`; the state is
%% not inspected and nothing is cleaned up here.
-spec terminate(Reason :: any(), #state{}) -> terminated.
terminate(StopReason, _ServerState) ->
    LogArgs = [StopReason],
    error_logger:info_msg("Terminating with reason: ~p~n", LogArgs),
    terminated.
%% gen_server code_change callback.
%% The state needs no conversion between versions, so it is handed back as is.
-spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
code_change(_PreviousVsn, CurrentState, _ExtraArgs) ->
    {ok, CurrentState}.
%% Deletes every syn_registry_table record whose `node` field equals Node.
%% Uses dirty mnesia operations: the keys (record names) are selected first and
%% then removed one by one.
-spec purge_registry_entries_for_node(Node :: atom()) -> ok.
purge_registry_entries_for_node(Node) ->
    %% Match spec: bind name to '$1' and node to '$2', keep the rows where
    %% '$2' is exactly Node, and return only the name.
    MatchSpec = [{
        #syn_registry_table{name = '$1', node = '$2', _ = '_'},
        [{'=:=', '$2', Node}],
        ['$1']
    }],
    Names = mnesia:dirty_select(syn_registry_table, MatchSpec),
    lists:foreach(
        fun(Name) -> mnesia:dirty_delete({syn_registry_table, Name}) end,
        Names
    ).
|