|
@@ -52,33 +52,33 @@
|
|
|
%% internal
|
|
|
-export([multicast_loop/0]).
|
|
|
|
|
|
+%% includes
|
|
|
+-include("syn.hrl").
|
|
|
+
|
|
|
%% callbacks
|
|
|
--callback init(State :: term()) ->
|
|
|
+-callback init(#state{}) ->
|
|
|
{ok, HandlerState :: term()}.
|
|
|
-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
|
- State :: term()) ->
|
|
|
- {reply, Reply :: term(), NewState :: term()} |
|
|
|
- {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} |
|
|
|
- {noreply, NewState :: term()} |
|
|
|
- {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
|
|
|
- {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
|
|
|
- {stop, Reason :: term(), NewState :: term()}.
|
|
|
--callback handle_info(Info :: timeout | term(), State :: term()) ->
|
|
|
- {noreply, NewState :: term()} |
|
|
|
- {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
|
|
|
- {stop, Reason :: term(), NewState :: term()}.
|
|
|
--callback save_remote_data(RemoteData :: any(), State :: term()) -> any().
|
|
|
--callback get_local_data(State :: term()) -> {ok, Data :: any()} | undefined.
|
|
|
--callback purge_local_data_for_node(Node :: node(), State :: term()) -> any().
|
|
|
-
|
|
|
-%% includes
|
|
|
--include("syn.hrl").
|
|
|
+ #state{}) ->
|
|
|
+ {reply, Reply :: term(), #state{}} |
|
|
|
+ {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
+ {noreply, #state{}} |
|
|
|
+ {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
+ {stop, Reason :: term(), Reply :: term(), #state{}} |
|
|
|
+ {stop, Reason :: term(), #state{}}.
|
|
|
+-callback handle_info(Info :: timeout | term(), #state{}) ->
|
|
|
+ {noreply, #state{}} |
|
|
|
+ {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
+ {stop, Reason :: term(), #state{}}.
|
|
|
+-callback save_remote_data(RemoteData :: term(), #state{}) -> any().
|
|
|
+-callback get_local_data(#state{}) -> {ok, Data :: term()} | undefined.
|
|
|
+-callback purge_local_data_for_node(Node :: node(), #state{}) -> any().
|
|
|
|
|
|
%% ===================================================================
|
|
|
%% API
|
|
|
%% ===================================================================
|
|
|
-spec start_link(Handler :: module(), Scope :: atom()) ->
|
|
|
- {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
|
|
|
+ {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
|
|
|
start_link(Handler, Scope) when is_atom(Scope) ->
|
|
|
%% build name
|
|
|
HandlerBin = atom_to_binary(Handler),
|
|
@@ -96,11 +96,11 @@ get_subcluster_nodes(Handler, Scope) ->
|
|
|
ProcessName -> gen_server:call(ProcessName, get_subcluster_nodes)
|
|
|
end.
|
|
|
|
|
|
--spec call(Handler :: module(), Scope :: atom(), Message :: any()) -> Response :: any().
|
|
|
+-spec call(Handler :: module(), Scope :: atom(), Message :: term()) -> Response :: term().
|
|
|
call(Handler, Scope, Message) ->
|
|
|
call(Handler, node(), Scope, Message).
|
|
|
|
|
|
--spec call(Handler :: module(), Node :: atom(), Scope :: atom(), Message :: any()) -> Response :: any().
|
|
|
+-spec call(Handler :: module(), Node :: atom(), Scope :: atom(), Message :: term()) -> Response :: term().
|
|
|
call(Handler, Node, Scope, Message) ->
|
|
|
case get_process_name_for_scope(Handler, Scope) of
|
|
|
undefined -> error({invalid_scope, Scope});
|
|
@@ -110,19 +110,19 @@ call(Handler, Node, Scope, Message) ->
|
|
|
%% ===================================================================
|
|
|
%% In-Process API
|
|
|
%% ===================================================================
|
|
|
--spec broadcast(Message :: any(), #state{}) -> any().
|
|
|
+-spec broadcast(Message :: term(), #state{}) -> any().
|
|
|
broadcast(Message, State) ->
|
|
|
broadcast(Message, [], State).
|
|
|
|
|
|
--spec broadcast(Message :: any(), ExcludedNodes :: [node()], #state{}) -> any().
|
|
|
+-spec broadcast(Message :: term(), ExcludedNodes :: [node()], #state{}) -> any().
|
|
|
broadcast(Message, ExcludedNodes, #state{multicast_pid = MulticastPid} = State) ->
|
|
|
MulticastPid ! {broadcast, Message, ExcludedNodes, State}.
|
|
|
|
|
|
--spec broadcast_all_cluster(Message :: any(), #state{}) -> any().
|
|
|
+-spec broadcast_all_cluster(Message :: term(), #state{}) -> any().
|
|
|
broadcast_all_cluster(Message, #state{multicast_pid = MulticastPid} = State) ->
|
|
|
MulticastPid ! {broadcast_all_cluster, Message, State}.
|
|
|
|
|
|
--spec send_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
|
|
|
+-spec send_to_node(RemoteNode :: node(), Message :: term(), #state{}) -> any().
|
|
|
send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
|
|
|
erlang:send({ProcessName, RemoteNode}, Message, [noconnect]).
|
|
|
|
|
@@ -135,10 +135,8 @@ send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
-spec init([term()]) ->
|
|
|
{ok, #state{}} |
|
|
|
- {ok, #state{}, Timeout :: non_neg_integer()} |
|
|
|
- ignore |
|
|
|
- {stop, Reason :: any()} |
|
|
|
- {continue, any()}.
|
|
|
+ {ok, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
+ {stop, Reason :: term()} | ignore.
|
|
|
init([Handler, Scope, ProcessName]) ->
|
|
|
%% monitor nodes
|
|
|
ok = net_kernel:monitor_nodes(true),
|
|
@@ -165,17 +163,17 @@ init([Handler, Scope, ProcessName]) ->
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Call messages
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
--spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
|
- State :: term()) ->
|
|
|
- {reply, Reply :: term(), NewState :: term()} |
|
|
|
- {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} |
|
|
|
- {noreply, NewState :: term()} |
|
|
|
- {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
|
|
|
- {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
|
|
|
- {stop, Reason :: term(), NewState :: term()}.
|
|
|
+-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
|
|
|
+ {reply, Reply :: term(), #state{}} |
|
|
|
+ {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
+ {noreply, #state{}} |
|
|
|
+ {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
+ {stop, Reason :: term(), Reply :: term(), #state{}} |
|
|
|
+ {stop, Reason :: term(), #state{}}.
|
|
|
handle_call(get_subcluster_nodes, _From, #state{
|
|
|
- nodes = Nodes
|
|
|
+ nodes_map = NodesMap
|
|
|
} = State) ->
|
|
|
+ Nodes = maps:keys(NodesMap),
|
|
|
{reply, Nodes, State};
|
|
|
|
|
|
handle_call(Request, From, #state{handler = Handler} = State) ->
|
|
@@ -184,24 +182,24 @@ handle_call(Request, From, #state{handler = Handler} = State) ->
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Cast messages
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
--spec handle_cast(Request :: term(), State :: term()) ->
|
|
|
- {noreply, NewState :: term()} |
|
|
|
- {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
|
|
|
- {stop, Reason :: term(), NewState :: term()}.
|
|
|
+-spec handle_cast(Request :: term(), #state{}) ->
|
|
|
+ {noreply, #state{}} |
|
|
|
+ {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
+ {stop, Reason :: term(), #state{}}.
|
|
|
handle_cast(Msg, #state{handler = Handler} = State) ->
|
|
|
Handler:handle_cast(Msg, State).
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Info messages
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
--spec handle_info(Info :: timeout | term(), State :: term()) ->
|
|
|
- {noreply, NewState :: term()} |
|
|
|
- {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
|
|
|
- {stop, Reason :: term(), NewState :: term()}.
|
|
|
+-spec handle_info(Info :: timeout | term(), #state{}) ->
|
|
|
+ {noreply, #state{}} |
|
|
|
+ {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
+ {stop, Reason :: term(), #state{}}.
|
|
|
handle_info({'3.0', discover, RemoteScopePid}, #state{
|
|
|
handler = Handler,
|
|
|
scope = Scope,
|
|
|
- nodes = Nodes
|
|
|
+ nodes_map = NodesMap
|
|
|
} = State) ->
|
|
|
RemoteScopeNode = node(RemoteScopePid),
|
|
|
error_logger:info_msg("SYN[~s] Received DISCOVER request from node '~s'for ~s and scope '~s'",
|
|
@@ -211,7 +209,7 @@ handle_info({'3.0', discover, RemoteScopePid}, #state{
|
|
|
{ok, LocalData} = Handler:get_local_data(State),
|
|
|
send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
|
|
|
%% is this a new node?
|
|
|
- case maps:is_key(RemoteScopeNode, Nodes) of
|
|
|
+ case maps:is_key(RemoteScopeNode, NodesMap) of
|
|
|
true ->
|
|
|
%% already known, ignore
|
|
|
{noreply, State};
|
|
@@ -219,12 +217,12 @@ handle_info({'3.0', discover, RemoteScopePid}, #state{
|
|
|
false ->
|
|
|
%% monitor
|
|
|
_MRef = monitor(process, RemoteScopePid),
|
|
|
- {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
|
|
|
+ {noreply, State#state{nodes_map = NodesMap#{RemoteScopeNode => RemoteScopePid}}}
|
|
|
end;
|
|
|
|
|
|
handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
|
|
|
handler = Handler,
|
|
|
- nodes = Nodes,
|
|
|
+ nodes_map = NodesMap,
|
|
|
scope = Scope
|
|
|
} = State) ->
|
|
|
RemoteScopeNode = node(RemoteScopePid),
|
|
@@ -234,7 +232,7 @@ handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
|
|
|
%% save remote data
|
|
|
Handler:save_remote_data(Data, State),
|
|
|
%% is this a new node?
|
|
|
- case maps:is_key(RemoteScopeNode, Nodes) of
|
|
|
+ case maps:is_key(RemoteScopeNode, NodesMap) of
|
|
|
true ->
|
|
|
%% already known
|
|
|
{noreply, State};
|
|
@@ -246,23 +244,23 @@ handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
|
|
|
{ok, LocalData} = Handler:get_local_data(State),
|
|
|
send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
|
|
|
%% return
|
|
|
- {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
|
|
|
+ {noreply, State#state{nodes_map = NodesMap#{RemoteScopeNode => RemoteScopePid}}}
|
|
|
end;
|
|
|
|
|
|
handle_info({'DOWN', MRef, process, Pid, Reason}, #state{
|
|
|
handler = Handler,
|
|
|
scope = Scope,
|
|
|
- nodes = Nodes
|
|
|
+ nodes_map = NodesMap
|
|
|
} = State) when node(Pid) =/= node() ->
|
|
|
%% scope process down
|
|
|
RemoteNode = node(Pid),
|
|
|
- case maps:take(RemoteNode, Nodes) of
|
|
|
- {Pid, Nodes1} ->
|
|
|
+ case maps:take(RemoteNode, NodesMap) of
|
|
|
+ {Pid, NodesMap1} ->
|
|
|
error_logger:info_msg("SYN[~s] Scope Process '~s' for ~s is DOWN on node '~s': ~p",
|
|
|
[node(), Scope, Handler, RemoteNode, Reason]
|
|
|
),
|
|
|
Handler:purge_local_data_for_node(RemoteNode, State),
|
|
|
- {noreply, State#state{nodes = Nodes1}};
|
|
|
+ {noreply, State#state{nodes_map = NodesMap1}};
|
|
|
|
|
|
error ->
|
|
|
%% relay to handler
|
|
@@ -289,6 +287,10 @@ handle_info(Info, #state{handler = Handler} = State) ->
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Continue messages
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
+-spec handle_continue(Info :: term(), #state{}) ->
|
|
|
+ {noreply, #state{}} |
|
|
|
+ {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
|
+ {stop, Reason :: term(), #state{}}.
|
|
|
handle_continue(after_init, #state{
|
|
|
handler = Handler,
|
|
|
scope = Scope
|
|
@@ -300,15 +302,15 @@ handle_continue(after_init, #state{
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Terminate
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
--spec terminate(Reason :: any(), #state{}) -> terminated.
|
|
|
+-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), #state{}) -> any().
|
|
|
terminate(Reason, #state{handler = Handler}) ->
|
|
|
- error_logger:info_msg("SYN[~s] ~s terminating with reason: ~p", [node(), Handler, Reason]),
|
|
|
- terminated.
|
|
|
+ error_logger:info_msg("SYN[~s] ~s terminating with reason: ~p", [node(), Handler, Reason]).
|
|
|
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Convert process state when code is changed.
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
--spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
|
|
|
+-spec code_change(OldVsn :: (term() | {down, term()}), #state{}, Extra :: term()) ->
|
|
|
+ {ok, NewState :: term()} | {error, Reason :: term()}.
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
@@ -322,10 +324,10 @@ get_process_name_for_scope(Handler, Scope) ->
|
|
|
-spec multicast_loop() -> terminated.
|
|
|
multicast_loop() ->
|
|
|
receive
|
|
|
- {broadcast, Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}} ->
|
|
|
+ {broadcast, Message, ExcludedNodes, #state{process_name = ProcessName, nodes_map = NodesMap}} ->
|
|
|
lists:foreach(fun(RemoteNode) ->
|
|
|
erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
|
|
|
- end, maps:keys(Nodes) -- ExcludedNodes),
|
|
|
+ end, maps:keys(NodesMap) -- ExcludedNodes),
|
|
|
multicast_loop();
|
|
|
|
|
|
{broadcast_all_cluster, Message, #state{process_name = ProcessName}} ->
|