|
@@ -267,7 +267,11 @@ handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{scope
|
|
|
|
|
|
undefined ->
|
|
|
{reply, {error, undefined}, State}
|
|
|
- end.
|
|
|
+ end;
|
|
|
+
|
|
|
+handle_call(Request, From, State) ->
|
|
|
+ error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), Request, From]),
|
|
|
+ {reply, undefined, State}.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Cast messages
|
|
@@ -276,18 +280,29 @@ handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{scope
|
|
|
{noreply, #state{}} |
|
|
|
{noreply, #state{}, Timeout :: non_neg_integer()} |
|
|
|
{stop, Reason :: any(), #state{}}.
|
|
|
-handle_cast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State) ->
|
|
|
+handle_cast(Msg, State) ->
|
|
|
+ error_logger:warning_msg("SYN[~s] Received an unknown cast message: ~p", [node(), Msg]),
|
|
|
+ {noreply, State}.
|
|
|
+
|
|
|
+%% ----------------------------------------------------------------------------------------------------------
|
|
|
+%% Info messages
|
|
|
+%% ----------------------------------------------------------------------------------------------------------
|
|
|
+-spec handle_info(Info :: any(), #state{}) ->
|
|
|
+ {noreply, #state{}} |
|
|
|
+ {noreply, #state{}, Timeout :: non_neg_integer()} |
|
|
|
+ {stop, Reason :: any(), #state{}}.
|
|
|
+handle_info({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State) ->
|
|
|
handle_registry_sync(Scope, Name, Pid, Meta, Time, State),
|
|
|
{noreply, State};
|
|
|
|
|
|
-handle_cast({'3.0', sync_unregister, Name, Pid, Meta}, #state{scope = Scope} = State) ->
|
|
|
+handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{scope = Scope} = State) ->
|
|
|
remove_from_local_table(Scope, Name, Pid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
%% return
|
|
|
{noreply, State};
|
|
|
|
|
|
-handle_cast({'3.0', discover, RemoteScopePid}, #state{
|
|
|
+handle_info({'3.0', discover, RemoteScopePid}, #state{
|
|
|
scope = Scope,
|
|
|
nodes = Nodes
|
|
|
} = State) ->
|
|
@@ -295,7 +310,7 @@ handle_cast({'3.0', discover, RemoteScopePid}, #state{
|
|
|
error_logger:info_msg("SYN[~s] Received DISCOVER request from node '~s' and scope '~s'", [node(), RemoteScopeNode, Scope]),
|
|
|
%% send data
|
|
|
RegistryTuplesOfLocalNode = get_registry_tuples_for_node(Scope, node()),
|
|
|
- cast_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
|
|
|
+ send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
|
|
|
%% is this a new node?
|
|
|
case maps:is_key(RemoteScopeNode, Nodes) of
|
|
|
true ->
|
|
@@ -308,7 +323,7 @@ handle_cast({'3.0', discover, RemoteScopePid}, #state{
|
|
|
{noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
|
|
|
end;
|
|
|
|
|
|
-handle_cast({'3.0', ack_sync, RemoteScopePid, RegistryTuplesOfRemoteNode}, #state{
|
|
|
+handle_info({'3.0', ack_sync, RemoteScopePid, RegistryTuplesOfRemoteNode}, #state{
|
|
|
scope = Scope,
|
|
|
nodes = Nodes
|
|
|
} = State) ->
|
|
@@ -331,18 +346,11 @@ handle_cast({'3.0', ack_sync, RemoteScopePid, RegistryTuplesOfRemoteNode}, #stat
|
|
|
_MRef = monitor(process, RemoteScopePid),
|
|
|
%% send data
|
|
|
RegistryTuplesOfLocalNode = get_registry_tuples_for_node(Scope, node()),
|
|
|
- cast_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
|
|
|
+ send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
|
|
|
%% return
|
|
|
{noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
|
|
|
- end.
|
|
|
+ end;
|
|
|
|
|
|
-%% ----------------------------------------------------------------------------------------------------------
|
|
|
-%% Info messages
|
|
|
-%% ----------------------------------------------------------------------------------------------------------
|
|
|
--spec handle_info(Info :: any(), #state{}) ->
|
|
|
- {noreply, #state{}} |
|
|
|
- {noreply, #state{}, Timeout :: non_neg_integer()} |
|
|
|
- {stop, Reason :: any(), #state{}}.
|
|
|
handle_info({'DOWN', _MRef, process, Pid, _Reason}, #state{
|
|
|
scope = Scope,
|
|
|
nodes = Nodes
|
|
@@ -389,7 +397,11 @@ handle_info({nodeup, RemoteNode}, #state{scope = Scope} = State) ->
|
|
|
error_logger:info_msg("SYN[~s] Node '~s' has joined the cluster, sending discover message for scope '~s'",
|
|
|
[node(), RemoteNode, Scope]
|
|
|
),
|
|
|
- cast_to_node(RemoteNode, {'3.0', discover, self()}, State),
|
|
|
+ send_to_node(RemoteNode, {'3.0', discover, self()}, State),
|
|
|
+ {noreply, State};
|
|
|
+
|
|
|
+handle_info(Info, State) ->
|
|
|
+ error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [node(), Info]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
@@ -446,18 +458,18 @@ broadcast(Message, State) ->
|
|
|
-spec broadcast(Message :: any(), ExcludedNodes :: [node()], #state{}) -> any().
|
|
|
broadcast(Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}) ->
|
|
|
lists:foreach(fun(RemoteNode) ->
|
|
|
- gen_server:cast({ProcessName, RemoteNode}, Message)
|
|
|
+ erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
|
|
|
end, maps:keys(Nodes) -- ExcludedNodes).
|
|
|
|
|
|
-spec broadcast_all(Message :: any(), #state{}) -> any().
|
|
|
broadcast_all(Message, #state{process_name = ProcessName}) ->
|
|
|
- lists:foreach(fun(RemoteNode) -> gen_server:cast({ProcessName, RemoteNode}, Message) end, nodes()).
|
|
|
+ lists:foreach(fun(RemoteNode) ->
|
|
|
+ erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
|
|
|
+ end, nodes()).
|
|
|
|
|
|
--spec cast_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
|
|
|
-cast_to_node(RemoteNode, Message, #state{
|
|
|
- process_name = ProcessName
|
|
|
-}) ->
|
|
|
- gen_server:cast({ProcessName, RemoteNode}, Message).
|
|
|
+-spec send_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
|
|
|
+send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
|
|
|
+ erlang:send({ProcessName, RemoteNode}, Message, [noconnect]).
|
|
|
|
|
|
-spec get_registry_tuples_for_node(Scope :: atom(), Node :: node()) -> [syn_registry_tuple()].
|
|
|
get_registry_tuples_for_node(Scope, Node) ->
|