|
@@ -36,10 +36,10 @@
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
|
|
|
|
%% internal
|
|
|
--export([get_processes_info_of_node/1]).
|
|
|
--export([write_processes_info_to_node/2]).
|
|
|
--export([get_groups_info_of_node/1]).
|
|
|
--export([write_groups_info_to_node/2]).
|
|
|
+-export([get_registry_processes_info_of_node/1]).
|
|
|
+-export([write_registry_processes_info_to_node/2]).
|
|
|
+-export([get_groups_processes_info_of_node/1]).
|
|
|
+-export([write_groups_processes_info_to_node/2]).
|
|
|
|
|
|
%% records
|
|
|
-record(state, {
|
|
@@ -75,14 +75,14 @@ init([]) ->
|
|
|
%% monitor mnesia events
|
|
|
mnesia:subscribe(system),
|
|
|
%% get options
|
|
|
- {ok, [ConflictingProcessCallbackModule, ConflictingProcessCallbackFunction]} = syn_utils:get_env_value(
|
|
|
+ {ok, [RegistryConflictingProcessCallbackModule, RegistryConflictingProcessCallbackFunction]} = syn_utils:get_env_value(
|
|
|
registry_conflicting_process_callback,
|
|
|
[undefined, undefined]
|
|
|
),
|
|
|
%% build state
|
|
|
{ok, #state{
|
|
|
- registry_conflicting_process_callback_module = ConflictingProcessCallbackModule,
|
|
|
- registry_conflicting_process_callback_function = ConflictingProcessCallbackFunction
|
|
|
+ registry_conflicting_process_callback_module = RegistryConflictingProcessCallbackModule,
|
|
|
+ registry_conflicting_process_callback_function = RegistryConflictingProcessCallbackFunction
|
|
|
}}.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
@@ -135,12 +135,12 @@ handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
|
|
|
%% ignore mnesia event
|
|
|
{noreply, State};
|
|
|
|
|
|
-handle_info({purge_double_processes, DoubleProcessesInfo}, #state{
|
|
|
- registry_conflicting_process_callback_module = ConflictingProcessCallbackModule,
|
|
|
- registry_conflicting_process_callback_function = ConflictingProcessCallbackFunction
|
|
|
+handle_info({purge_registry_double_processes, DoubleRegistryProcessesInfo}, #state{
|
|
|
+ registry_conflicting_process_callback_module = RegistryConflictingProcessCallbackModule,
|
|
|
+ registry_conflicting_process_callback_function = RegistryConflictingProcessCallbackFunction
|
|
|
} = State) ->
|
|
|
error_logger:warning_msg("About to purge double processes after netsplit"),
|
|
|
- purge_double_processes(ConflictingProcessCallbackModule, ConflictingProcessCallbackFunction, DoubleProcessesInfo),
|
|
|
+ purge_registry_double_processes(RegistryConflictingProcessCallbackModule, RegistryConflictingProcessCallbackFunction, DoubleRegistryProcessesInfo),
|
|
|
{noreply, State};
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
@@ -227,29 +227,29 @@ stitch(RemoteNode) ->
|
|
|
-spec stitch_registry_tab(RemoteNode :: atom()) -> ok.
|
|
|
stitch_registry_tab(RemoteNode) ->
|
|
|
%% get remote processes info
|
|
|
- RemoteProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]),
|
|
|
+ RemoteRegistryProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_registry_processes_info_of_node, [RemoteNode]),
|
|
|
%% get local processes info
|
|
|
- LocalProcessesInfo = get_processes_info_of_node(node()),
|
|
|
+ LocalRegistryProcessesInfo = get_registry_processes_info_of_node(node()),
|
|
|
%% purge doubles (if any)
|
|
|
- {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_mnesia(
|
|
|
- LocalProcessesInfo,
|
|
|
- RemoteProcessesInfo
|
|
|
+ {LocalRegistryProcessesInfo1, RemoteRegistryProcessesInfo1} = purge_registry_double_processes_from_local_mnesia(
|
|
|
+ LocalRegistryProcessesInfo,
|
|
|
+ RemoteRegistryProcessesInfo
|
|
|
),
|
|
|
%% write
|
|
|
- write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo1),
|
|
|
- write_local_processes_to_remote(RemoteNode, LocalProcessesInfo1).
|
|
|
+ write_remote_registry_processes_to_local(RemoteNode, RemoteRegistryProcessesInfo1),
|
|
|
+ write_local_registry_processes_to_remote(RemoteNode, LocalRegistryProcessesInfo1).
|
|
|
|
|
|
--spec purge_double_processes_from_local_mnesia(
|
|
|
- LocalProcessesInfo :: list(),
|
|
|
- RemoteProcessesInfo :: list()
|
|
|
+-spec purge_registry_double_processes_from_local_mnesia(
|
|
|
+ LocalRegistryProcessesInfo :: list(),
|
|
|
+ RemoteRegistryProcessesInfo :: list()
|
|
|
) ->
|
|
|
- {LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()}.
|
|
|
-purge_double_processes_from_local_mnesia(LocalProcessesInfo, RemoteProcessesInfo) ->
|
|
|
+ {LocalRegistryProcessesInfo :: list(), RemoteRegistryProcessesInfo :: list()}.
|
|
|
+purge_registry_double_processes_from_local_mnesia(LocalRegistryProcessesInfo, RemoteRegistryProcessesInfo) ->
|
|
|
%% create ETS table
|
|
|
Tab = ets:new(syn_automerge_doubles_table, [set]),
|
|
|
|
|
|
%% insert local processes info
|
|
|
- ets:insert(Tab, LocalProcessesInfo),
|
|
|
+ ets:insert(Tab, LocalRegistryProcessesInfo),
|
|
|
|
|
|
%% find doubles
|
|
|
F = fun({Key, _RemoteProcessPid, _RemoteProcessMeta}, Acc) ->
|
|
@@ -264,28 +264,28 @@ purge_double_processes_from_local_mnesia(LocalProcessesInfo, RemoteProcessesInfo
|
|
|
[{Key, LocalProcessPid, LocalProcessMeta} | Acc]
|
|
|
end
|
|
|
end,
|
|
|
- DoubleProcessesInfo = lists:foldl(F, [], RemoteProcessesInfo),
|
|
|
+ DoubleRegistryProcessesInfo = lists:foldl(F, [], RemoteRegistryProcessesInfo),
|
|
|
|
|
|
%% send to syn_consistency gen_server to handle double processes once merging is done
|
|
|
- ?MODULE ! {purge_double_processes, DoubleProcessesInfo},
|
|
|
+ ?MODULE ! {purge_registry_double_processes, DoubleRegistryProcessesInfo},
|
|
|
|
|
|
%% compute local processes without doubles
|
|
|
- LocalProcessesInfo1 = ets:tab2list(Tab),
|
|
|
+ LocalRegistryProcessesInfo1 = ets:tab2list(Tab),
|
|
|
%% delete ETS table
|
|
|
ets:delete(Tab),
|
|
|
%% return
|
|
|
- {LocalProcessesInfo1, RemoteProcessesInfo}.
|
|
|
+ {LocalRegistryProcessesInfo1, RemoteRegistryProcessesInfo}.
|
|
|
|
|
|
--spec write_remote_processes_to_local(RemoteNode :: atom(), RemoteProcessesInfo :: list()) -> ok.
|
|
|
-write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo) ->
|
|
|
- write_processes_info_to_node(RemoteNode, RemoteProcessesInfo).
|
|
|
+-spec write_remote_registry_processes_to_local(RemoteNode :: atom(), RemoteRegistryProcessesInfo :: list()) -> ok.
|
|
|
+write_remote_registry_processes_to_local(RemoteNode, RemoteRegistryProcessesInfo) ->
|
|
|
+ write_registry_processes_info_to_node(RemoteNode, RemoteRegistryProcessesInfo).
|
|
|
|
|
|
--spec write_local_processes_to_remote(RemoteNode :: atom(), LocalProcessesInfo :: list()) -> ok.
|
|
|
-write_local_processes_to_remote(RemoteNode, LocalProcessesInfo) ->
|
|
|
- ok = rpc:call(RemoteNode, ?MODULE, write_processes_info_to_node, [node(), LocalProcessesInfo]).
|
|
|
+-spec write_local_registry_processes_to_remote(RemoteNode :: atom(), LocalRegistryProcessesInfo :: list()) -> ok.
|
|
|
+write_local_registry_processes_to_remote(RemoteNode, LocalRegistryProcessesInfo) ->
|
|
|
+ ok = rpc:call(RemoteNode, ?MODULE, write_registry_processes_info_to_node, [node(), LocalRegistryProcessesInfo]).
|
|
|
|
|
|
--spec get_processes_info_of_node(Node :: atom()) -> list().
|
|
|
-get_processes_info_of_node(Node) ->
|
|
|
+-spec get_registry_processes_info_of_node(Node :: atom()) -> list().
|
|
|
+get_registry_processes_info_of_node(Node) ->
|
|
|
%% build match specs
|
|
|
MatchHead = #syn_registry_table{key = '$1', pid = '$2', node = '$3', meta = '$4'},
|
|
|
Guard = {'=:=', '$3', Node},
|
|
@@ -293,8 +293,8 @@ get_processes_info_of_node(Node) ->
|
|
|
%% select
|
|
|
mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [ProcessInfoFormat]}]).
|
|
|
|
|
|
--spec write_processes_info_to_node(Node :: atom(), ProcessesInfo :: list()) -> ok.
|
|
|
-write_processes_info_to_node(Node, ProcessesInfo) ->
|
|
|
+-spec write_registry_processes_info_to_node(Node :: atom(), RegistryProcessesInfo :: list()) -> ok.
|
|
|
+write_registry_processes_info_to_node(Node, RegistryProcessesInfo) ->
|
|
|
FWrite = fun({Key, ProcessPid, ProcessMeta}) ->
|
|
|
mnesia:dirty_write(#syn_registry_table{
|
|
|
key = Key,
|
|
@@ -303,41 +303,41 @@ write_processes_info_to_node(Node, ProcessesInfo) ->
|
|
|
meta = ProcessMeta
|
|
|
})
|
|
|
end,
|
|
|
- lists:foreach(FWrite, ProcessesInfo).
|
|
|
+ lists:foreach(FWrite, RegistryProcessesInfo).
|
|
|
|
|
|
--spec purge_double_processes(
|
|
|
- ConflictingProcessCallbackModule :: atom(),
|
|
|
- ConflictingProcessCallbackFunction :: atom(),
|
|
|
- DoubleProcessesInfo :: list()
|
|
|
+-spec purge_registry_double_processes(
|
|
|
+ RegistryConflictingProcessCallbackModule :: atom(),
|
|
|
+ RegistryConflictingProcessCallbackFunction :: atom(),
|
|
|
+ DoubleRegistryProcessesInfo :: list()
|
|
|
) -> ok.
|
|
|
-purge_double_processes(undefined, _, DoubleProcessesInfo) ->
|
|
|
+purge_registry_double_processes(undefined, _, DoubleRegistryProcessesInfo) ->
|
|
|
F = fun({Key, LocalProcessPid, _LocalProcessMeta}) ->
|
|
|
error_logger:warning_msg("Found a double process for ~s, killing it on local node ~p", [Key, node()]),
|
|
|
exit(LocalProcessPid, kill)
|
|
|
end,
|
|
|
- lists:foreach(F, DoubleProcessesInfo);
|
|
|
-purge_double_processes(ConflictingProcessCallbackModule, ConflictingProcessCallbackFunction, DoubleProcessesInfo) ->
|
|
|
+ lists:foreach(F, DoubleRegistryProcessesInfo);
|
|
|
+purge_registry_double_processes(RegistryConflictingProcessCallbackModule, RegistryConflictingProcessCallbackFunction, DoubleRegistryProcessesInfo) ->
|
|
|
F = fun({Key, LocalProcessPid, LocalProcessMeta}) ->
|
|
|
spawn(
|
|
|
fun() ->
|
|
|
error_logger:warning_msg("Found a double process for ~s, about to trigger callback on local node ~p", [Key, node()]),
|
|
|
- ConflictingProcessCallbackModule:ConflictingProcessCallbackFunction(Key, LocalProcessPid, LocalProcessMeta)
|
|
|
+ RegistryConflictingProcessCallbackModule:RegistryConflictingProcessCallbackFunction(Key, LocalProcessPid, LocalProcessMeta)
|
|
|
end)
|
|
|
end,
|
|
|
- lists:foreach(F, DoubleProcessesInfo).
|
|
|
+ lists:foreach(F, DoubleRegistryProcessesInfo).
|
|
|
|
|
|
-spec stitch_group_tab(RemoteNode :: atom()) -> ok.
|
|
|
stitch_group_tab(RemoteNode) ->
|
|
|
%% get remote processes info
|
|
|
- RemoteGroupsInfo = rpc:call(RemoteNode, ?MODULE, get_groups_info_of_node, [RemoteNode]),
|
|
|
+ RemoteGroupsRegistryProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_groups_processes_info_of_node, [RemoteNode]),
|
|
|
%% get local processes info
|
|
|
- LocalGroupsInfo = get_groups_info_of_node(node()),
|
|
|
+ LocalGroupsRegistryProcessesInfo = get_groups_processes_info_of_node(node()),
|
|
|
%% write
|
|
|
- write_remote_groups_to_local(RemoteNode, RemoteGroupsInfo),
|
|
|
- write_local_groups_to_remote(RemoteNode, LocalGroupsInfo).
|
|
|
+ write_remote_groups_processes_info_to_local(RemoteNode, RemoteGroupsRegistryProcessesInfo),
|
|
|
+ write_local_groups_processes_info_to_remote(RemoteNode, LocalGroupsRegistryProcessesInfo).
|
|
|
|
|
|
--spec get_groups_info_of_node(Node :: atom()) -> list().
|
|
|
-get_groups_info_of_node(Node) ->
|
|
|
+-spec get_groups_processes_info_of_node(Node :: atom()) -> list().
|
|
|
+get_groups_processes_info_of_node(Node) ->
|
|
|
%% build match specs
|
|
|
MatchHead = #syn_groups_table{name = '$1', pid = '$2', node = '$3'},
|
|
|
Guard = {'=:=', '$3', Node},
|
|
@@ -345,16 +345,16 @@ get_groups_info_of_node(Node) ->
|
|
|
%% select
|
|
|
mnesia:dirty_select(syn_groups_table, [{MatchHead, [Guard], [GroupInfoFormat]}]).
|
|
|
|
|
|
--spec write_remote_groups_to_local(RemoteNode :: atom(), RemoteGroupsInfo :: list()) -> ok.
|
|
|
-write_remote_groups_to_local(RemoteNode, RemoteGroupsInfo) ->
|
|
|
- write_groups_info_to_node(RemoteNode, RemoteGroupsInfo).
|
|
|
+-spec write_remote_groups_processes_info_to_local(RemoteNode :: atom(), RemoteGroupsRegistryProcessesInfo :: list()) -> ok.
|
|
|
+write_remote_groups_processes_info_to_local(RemoteNode, RemoteGroupsRegistryProcessesInfo) ->
|
|
|
+ write_groups_processes_info_to_node(RemoteNode, RemoteGroupsRegistryProcessesInfo).
|
|
|
|
|
|
--spec write_local_groups_to_remote(RemoteNode :: atom(), LocalGroupsInfo :: list()) -> ok.
|
|
|
-write_local_groups_to_remote(RemoteNode, LocalGroupsInfo) ->
|
|
|
- ok = rpc:call(RemoteNode, ?MODULE, write_groups_info_to_node, [node(), LocalGroupsInfo]).
|
|
|
+-spec write_local_groups_processes_info_to_remote(RemoteNode :: atom(), LocalGroupsRegistryProcessesInfo :: list()) -> ok.
|
|
|
+write_local_groups_processes_info_to_remote(RemoteNode, LocalGroupsRegistryProcessesInfo) ->
|
|
|
+ ok = rpc:call(RemoteNode, ?MODULE, write_groups_processes_info_to_node, [node(), LocalGroupsRegistryProcessesInfo]).
|
|
|
|
|
|
--spec write_groups_info_to_node(Node :: atom(), GroupsInfo :: list()) -> ok.
|
|
|
-write_groups_info_to_node(Node, GroupsInfo) ->
|
|
|
+-spec write_groups_processes_info_to_node(Node :: atom(), GroupsRegistryProcessesInfo :: list()) -> ok.
|
|
|
+write_groups_processes_info_to_node(Node, GroupsRegistryProcessesInfo) ->
|
|
|
FWrite = fun({Name, Pid}) ->
|
|
|
mnesia:dirty_write(#syn_groups_table{
|
|
|
name = Name,
|
|
@@ -362,4 +362,4 @@ write_groups_info_to_node(Node, GroupsInfo) ->
|
|
|
node = Node
|
|
|
})
|
|
|
end,
|
|
|
- lists:foreach(FWrite, GroupsInfo).
|
|
|
+ lists:foreach(FWrite, GroupsRegistryProcessesInfo).
|