|
@@ -16,7 +16,7 @@
|
|
|
%% all copies or substantial portions of the Software.
|
|
|
%%
|
|
|
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
|
-%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
|
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THxE WARRANTIES OF MERCHANTABILITY,
|
|
|
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
|
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
|
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
@@ -287,15 +287,15 @@ handle_cast({'3.0', sync_unregister, Name, Pid, Meta}, #state{scope = Scope} = S
|
|
|
%% return
|
|
|
{noreply, State};
|
|
|
|
|
|
-handle_cast({'3.0', announce, RemoteScopePid}, #state{
|
|
|
+handle_cast({'3.0', discover, RemoteScopePid}, #state{
|
|
|
scope = Scope,
|
|
|
nodes = Nodes
|
|
|
} = State) ->
|
|
|
RemoteScopeNode = node(RemoteScopePid),
|
|
|
- error_logger:info_msg("SYN[~p] Received announce request from node ~p and scope ~p", [node(), RemoteScopeNode, Scope]),
|
|
|
+ error_logger:info_msg("SYN[~s] Received DISCOVER request from node '~s' and scope '~s'", [node(), RemoteScopeNode, Scope]),
|
|
|
%% send data
|
|
|
RegistryTuplesOfLocalNode = get_registry_tuples_for_node(Scope, node()),
|
|
|
- cast_to_node(RemoteScopeNode, {'3.0', sync, self(), RegistryTuplesOfLocalNode}, State),
|
|
|
+ cast_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
|
|
|
%% is this a new node?
|
|
|
case maps:is_key(RemoteScopeNode, Nodes) of
|
|
|
true ->
|
|
@@ -303,18 +303,17 @@ handle_cast({'3.0', announce, RemoteScopePid}, #state{
|
|
|
{noreply, State};
|
|
|
|
|
|
false ->
|
|
|
- %% monitor & announce
|
|
|
+ %% monitor
|
|
|
_MRef = monitor(process, RemoteScopePid),
|
|
|
- cast_to_node(RemoteScopeNode, {'3.0', announce, self()}, State),
|
|
|
{noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
|
|
|
end;
|
|
|
|
|
|
-handle_cast({'3.0', sync, RemoteScopePid, RegistryTuplesOfRemoteNode}, #state{
|
|
|
+handle_cast({'3.0', ack_sync, RemoteScopePid, RegistryTuplesOfRemoteNode}, #state{
|
|
|
scope = Scope,
|
|
|
nodes = Nodes
|
|
|
} = State) ->
|
|
|
RemoteScopeNode = node(RemoteScopePid),
|
|
|
- error_logger:info_msg("SYN[~p] Received sync data (~p entries) from node ~p and scope ~p",
|
|
|
+ error_logger:info_msg("SYN[~s] Received ACK SYNC (~w entries) from node '~s' and scope '~s'",
|
|
|
[node(), length(RegistryTuplesOfRemoteNode), RemoteScopeNode, Scope]
|
|
|
),
|
|
|
%% insert tuples
|
|
@@ -328,9 +327,12 @@ handle_cast({'3.0', sync, RemoteScopePid, RegistryTuplesOfRemoteNode}, #state{
|
|
|
{noreply, State};
|
|
|
|
|
|
false ->
|
|
|
- %% if we don't know about the node, it is because it's the response to the first broadcast of announce message
|
|
|
- %% -> monitor
|
|
|
+ %% monitor
|
|
|
_MRef = monitor(process, RemoteScopePid),
|
|
|
+ %% send data
|
|
|
+ RegistryTuplesOfLocalNode = get_registry_tuples_for_node(Scope, node()),
|
|
|
+ cast_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
|
|
|
+ %% return
|
|
|
{noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
|
|
|
end.
|
|
|
|
|
@@ -349,12 +351,12 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, #state{
|
|
|
RemoteNode = node(Pid),
|
|
|
case maps:take(RemoteNode, Nodes) of
|
|
|
{Pid, Nodes1} ->
|
|
|
- error_logger:info_msg("SYN[~p] Scope Process ~p is DOWN on node ~p", [node(), Scope, RemoteNode]),
|
|
|
+ error_logger:info_msg("SYN[~s] Scope Process ~p is DOWN on node '~s'", [node(), Scope, RemoteNode]),
|
|
|
purge_registry_for_remote_node(Scope, RemoteNode),
|
|
|
{noreply, State#state{nodes = Nodes1}};
|
|
|
|
|
|
error ->
|
|
|
- error_logger:warning_msg("SYN[~p] Received DOWN message from unknown pid: ~p", [node(), Pid]),
|
|
|
+ error_logger:warning_msg("SYN[~s] Received DOWN message from unknown pid: ~p", [node(), Pid]),
|
|
|
{noreply, State}
|
|
|
end;
|
|
|
|
|
@@ -362,7 +364,7 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State
|
|
|
case find_registry_entries_by_pid(Scope, Pid) of
|
|
|
[] ->
|
|
|
error_logger:warning_msg(
|
|
|
- "SYN[~p] Received a DOWN message from an unknown process ~p with reason: ~p",
|
|
|
+ "SYN[~s] Received a DOWN message from an unknown process ~p with reason: ~p",
|
|
|
[node(), Pid, Reason]
|
|
|
);
|
|
|
|
|
@@ -383,17 +385,19 @@ handle_info({nodedown, _Node}, State) ->
|
|
|
%% ignore & wait for monitor DOWN message
|
|
|
{noreply, State};
|
|
|
|
|
|
-handle_info({nodeup, RemoteNode}, State) ->
|
|
|
- error_logger:info_msg("SYN[~p] Node ~p has joined the cluster, sending announce message", [node(), RemoteNode]),
|
|
|
- cast_to_node(RemoteNode, {'3.0', announce, self()}, State),
|
|
|
+handle_info({nodeup, RemoteNode}, #state{scope = Scope} = State) ->
|
|
|
+ error_logger:info_msg("SYN[~s] Node '~s' has joined the cluster, sending discover message for scope '~s'",
|
|
|
+ [node(), RemoteNode, Scope]
|
|
|
+ ),
|
|
|
+ cast_to_node(RemoteNode, {'3.0', discover, self()}, State),
|
|
|
{noreply, State}.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Continue messages
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
handle_continue(after_init, #state{scope = Scope} = State) ->
|
|
|
- error_logger:info_msg("SYN[~p] Announcing to all nodes in the cluster with scope: ~p", [node(), Scope]),
|
|
|
- broadcast_all({'3.0', announce, self()}, State),
|
|
|
+ error_logger:info_msg("SYN[~s] Discovering the cluster with scope '~s'", [node(), Scope]),
|
|
|
+ broadcast_all({'3.0', discover, self()}, State),
|
|
|
{noreply, State}.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
@@ -401,7 +405,7 @@ handle_continue(after_init, #state{scope = Scope} = State) ->
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
-spec terminate(Reason :: any(), #state{}) -> terminated.
|
|
|
terminate(Reason, _State) ->
|
|
|
- error_logger:info_msg("SYN[~p] Terminating with reason: ~p", [node(), Reason]),
|
|
|
+ error_logger:info_msg("SYN[~s] Terminating with reason: ~p", [node(), Reason]),
|
|
|
terminated.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
@@ -629,7 +633,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
|
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
|
|
|
%% kill
|
|
|
exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
|
|
|
- error_logger:info_msg("SYN[~p] Registry CONFLICT for name ~p@~p: ~p ~p -> chosen: ~p",
|
|
|
+ error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> chosen: ~p",
|
|
|
[node(), Name, Scope, Pid, TablePid, Pid]
|
|
|
);
|
|
|
|
|
@@ -640,7 +644,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
|
|
|
add_to_local_table(Scope, Name, TablePid, TableMeta, ResolveTime, TableMRef),
|
|
|
%% broadcast
|
|
|
broadcast({'3.0', sync_register, Scope, Name, TablePid, TableMeta, ResolveTime}, State),
|
|
|
- error_logger:info_msg("SYN[~p] Registry CONFLICT for name ~p@~p: ~p ~p -> chosen: ~p",
|
|
|
+ error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> chosen: ~p",
|
|
|
[node(), Name, Scope, Pid, TablePid, TablePid]
|
|
|
);
|
|
|
|
|
@@ -652,7 +656,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
|
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
|
|
|
%% kill local, remote will be killed by other node performing the same resolve
|
|
|
exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
|
|
|
- error_logger:info_msg("SYN[~p] Registry CONFLICT for name ~p@~p: ~p ~p -> none chosen (got: ~p)",
|
|
|
+ error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> none chosen (got: ~p)",
|
|
|
[node(), Name, Scope, Pid, TablePid, Invalid]
|
|
|
)
|
|
|
end.
|