|
@@ -29,7 +29,7 @@
|
|
|
|
|
|
%% API
|
|
|
-export([
|
|
|
- start_link/2,
|
|
|
+ start_link/3,
|
|
|
subcluster_nodes/2,
|
|
|
call/3, call/4
|
|
|
]).
|
|
@@ -78,9 +78,9 @@
|
|
|
%% ===================================================================
|
|
|
%% API
|
|
|
%% ===================================================================
|
|
|
--spec start_link(Handler :: module(), Scope :: atom()) ->
|
|
|
+-spec start_link(Handler :: module(), HandlerLogName :: atom(), Scope :: atom()) ->
|
|
|
{ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
|
|
|
-start_link(Handler, Scope) when is_atom(Scope) ->
|
|
|
+start_link(Handler, HandlerLogName, Scope) when is_atom(Scope) ->
|
|
|
%% build name
|
|
|
HandlerBin = list_to_binary(atom_to_list(Handler)),
|
|
|
ScopeBin = list_to_binary(atom_to_list(Scope)),
|
|
@@ -88,7 +88,7 @@ start_link(Handler, Scope) when is_atom(Scope) ->
|
|
|
%% save to lookup table
|
|
|
syn_backbone:save_process_name({Handler, Scope}, ProcessName),
|
|
|
%% create process
|
|
|
- gen_server:start_link({local, ProcessName}, ?MODULE, [Handler, Scope, ProcessName], []).
|
|
|
+ gen_server:start_link({local, ProcessName}, ?MODULE, [Handler, HandlerLogName, Scope, ProcessName], []).
|
|
|
|
|
|
-spec subcluster_nodes(Handler :: module(), Scope :: atom()) -> [node()].
|
|
|
subcluster_nodes(Handler, Scope) ->
|
|
@@ -138,7 +138,7 @@ send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
|
|
|
{ok, #state{}} |
|
|
|
{ok, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
{stop, Reason :: term()} | ignore.
|
|
|
-init([Handler, Scope, ProcessName]) ->
|
|
|
+init([Handler, HandlerLogName, Scope, ProcessName]) ->
|
|
|
%% monitor nodes
|
|
|
ok = net_kernel:monitor_nodes(true),
|
|
|
%% start multicast process
|
|
@@ -150,6 +150,7 @@ init([Handler, Scope, ProcessName]) ->
|
|
|
%% build state
|
|
|
State = #state{
|
|
|
handler = Handler,
|
|
|
+ handler_log_name = HandlerLogName,
|
|
|
scope = Scope,
|
|
|
process_name = ProcessName,
|
|
|
multicast_pid = MulticastPid,
|
|
@@ -199,12 +200,13 @@ handle_cast(Msg, #state{handler = Handler} = State) ->
|
|
|
{stop, Reason :: term(), #state{}}.
|
|
|
handle_info({'3.0', discover, RemoteScopePid}, #state{
|
|
|
handler = Handler,
|
|
|
+ handler_log_name = HandlerLogName,
|
|
|
scope = Scope,
|
|
|
nodes_map = NodesMap
|
|
|
} = State) ->
|
|
|
RemoteScopeNode = node(RemoteScopePid),
|
|
|
- error_logger:info_msg("SYN[~s<~s>] Received DISCOVER request from node '~s'",
|
|
|
- [Handler, Scope, RemoteScopeNode]
|
|
|
+ error_logger:info_msg("SYN[~s|~s<~s>] Received DISCOVER request from node ~s",
|
|
|
+ [node(), HandlerLogName, Scope, RemoteScopeNode]
|
|
|
),
|
|
|
%% send local data to remote
|
|
|
{ok, LocalData} = Handler:get_local_data(State),
|
|
@@ -223,12 +225,13 @@ handle_info({'3.0', discover, RemoteScopePid}, #state{
|
|
|
|
|
|
handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
|
|
|
handler = Handler,
|
|
|
+ handler_log_name = HandlerLogName,
|
|
|
nodes_map = NodesMap,
|
|
|
scope = Scope
|
|
|
} = State) ->
|
|
|
RemoteScopeNode = node(RemoteScopePid),
|
|
|
- error_logger:info_msg("SYN[~s<~s>] Received ACK SYNC (~w entries) from node '~s'",
|
|
|
- [Handler, Scope, length(Data), RemoteScopeNode]
|
|
|
+ error_logger:info_msg("SYN[~s|~s<~s>] Received ACK SYNC (~w entries) from node ~s",
|
|
|
+ [node(), HandlerLogName, Scope, length(Data), RemoteScopeNode]
|
|
|
),
|
|
|
%% save remote data
|
|
|
Handler:save_remote_data(Data, State),
|
|
@@ -250,6 +253,7 @@ handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
|
|
|
|
|
|
handle_info({'DOWN', MRef, process, Pid, Reason}, #state{
|
|
|
handler = Handler,
|
|
|
+ handler_log_name = HandlerLogName,
|
|
|
scope = Scope,
|
|
|
nodes_map = NodesMap
|
|
|
} = State) when node(Pid) =/= node() ->
|
|
@@ -257,8 +261,8 @@ handle_info({'DOWN', MRef, process, Pid, Reason}, #state{
|
|
|
RemoteNode = node(Pid),
|
|
|
case maps:take(RemoteNode, NodesMap) of
|
|
|
{Pid, NodesMap1} ->
|
|
|
- error_logger:info_msg("SYN[~s<~s>] Scope Process is DOWN on node '~s': ~p",
|
|
|
- [Handler, Scope, RemoteNode, Reason]
|
|
|
+ error_logger:info_msg("SYN[~s|~s<~s>] Scope Process is DOWN on node ~s: ~p",
|
|
|
+ [node(), HandlerLogName, Scope, RemoteNode, Reason]
|
|
|
),
|
|
|
Handler:purge_local_data_for_node(RemoteNode, State),
|
|
|
{noreply, State#state{nodes_map = NodesMap1}};
|
|
@@ -273,11 +277,11 @@ handle_info({nodedown, _Node}, State) ->
|
|
|
{noreply, State};
|
|
|
|
|
|
handle_info({nodeup, RemoteNode}, #state{
|
|
|
- handler = Handler,
|
|
|
+ handler_log_name = HandlerLogName,
|
|
|
scope = Scope
|
|
|
} = State) ->
|
|
|
- error_logger:info_msg("SYN[~s<~s>] Node '~s' has joined the cluster, sending discover message",
|
|
|
- [Handler, Scope, RemoteNode]
|
|
|
+ error_logger:info_msg("SYN[~s|~s<~s>] Node ~s has joined the cluster, sending discover message",
|
|
|
+ [node(), HandlerLogName, Scope, RemoteNode]
|
|
|
),
|
|
|
send_to_node(RemoteNode, {'3.0', discover, self()}, State),
|
|
|
{noreply, State};
|
|
@@ -293,11 +297,11 @@ handle_info(Info, #state{handler = Handler} = State) ->
|
|
|
{noreply, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
{stop, Reason :: term(), #state{}}.
|
|
|
handle_continue(after_init, #state{
|
|
|
- handler = Handler,
|
|
|
+ handler_log_name = HandlerLogName,
|
|
|
scope = Scope,
|
|
|
process_name = ProcessName
|
|
|
} = State) ->
|
|
|
- error_logger:info_msg("SYN[~s<~s>] Discovering the cluster", [Handler, Scope]),
|
|
|
+ error_logger:info_msg("SYN[~s|~s<~s>] Discovering the cluster", [node(), HandlerLogName, Scope]),
|
|
|
%% broadcasting is done in the scope process to avoid issues with ordering guarantees
|
|
|
lists:foreach(fun(RemoteNode) ->
|
|
|
{ProcessName, RemoteNode} ! {'3.0', discover, self()}
|
|
@@ -308,8 +312,8 @@ handle_continue(after_init, #state{
|
|
|
%% Terminate
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), #state{}) -> any().
|
|
|
-terminate(Reason, #state{handler = Handler, scope = Scope}) ->
|
|
|
- error_logger:info_msg("SYN[~s<~s>] ~s terminating with reason: ~p", [Handler, Scope, Handler, Reason]).
|
|
|
+terminate(Reason, #state{handler_log_name = HandlerLogName, scope = Scope}) ->
|
|
|
+ error_logger:info_msg("SYN[~s|~s<~s>] Terminating with reason: ~p", [node(), HandlerLogName, Scope, Reason]).
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Convert process state when code is changed.
|