|
@@ -153,6 +153,8 @@ count(Scope, Node) ->
|
|
|
init([Scope, ProcessName]) ->
|
|
|
%% monitor nodes
|
|
|
ok = net_kernel:monitor_nodes(true),
|
|
|
+ %% rebuild monitors (if after crash)
|
|
|
+ rebuild_monitors(Scope),
|
|
|
%% build state
|
|
|
State = #state{
|
|
|
scope = Scope,
|
|
@@ -335,7 +337,7 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State
|
|
|
);
|
|
|
|
|
|
Entries ->
|
|
|
- lists:foreach(fun({{Name, Pid}, _, _, _, _}) ->
|
|
|
+ lists:foreach(fun({{Name, _Pid}, _, _, _, _}) ->
|
|
|
%% remove from table
|
|
|
remove_from_local_table(Scope, Name, Pid),
|
|
|
%% broadcast
|
|
@@ -382,6 +384,21 @@ get_process_name_for_scope(Scope) ->
|
|
|
ScopeBin = atom_to_binary(Scope),
|
|
|
binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).
|
|
|
|
|
|
+-spec rebuild_monitors(Scope :: atom()) -> ok.
|
|
|
+rebuild_monitors(Scope) ->
|
|
|
+ RegistryTuples = get_registry_tuples_for_node(Scope, node()),
|
|
|
+ lists:foreach(fun({Name, Pid, Meta, Time}) ->
|
|
|
+ remove_from_local_table(Scope, Name, Pid),
|
|
|
+ case is_process_alive(Pid) of
|
|
|
+ true ->
|
|
|
+ MRef = erlang:monitor(process, Pid),
|
|
|
+ add_to_local_table(Scope, Name, Pid, Meta, Time, MRef);
|
|
|
+
|
|
|
+ _ ->
|
|
|
+ ok
|
|
|
+ end
|
|
|
+ end, RegistryTuples).
|
|
|
+
|
|
|
-spec broadcast(Message :: any(), #state{}) -> any().
|
|
|
broadcast(Message, #state{process_name = ProcessName, nodes = Nodes}) ->
|
|
|
lists:foreach(fun(RemoteNode) -> gen_server:cast({ProcessName, RemoteNode}, Message) end, maps:keys(Nodes)).
|
|
@@ -396,6 +413,14 @@ cast_to_node(RemoteNode, Message, #state{
|
|
|
}) ->
|
|
|
gen_server:cast({ProcessName, RemoteNode}, Message).
|
|
|
|
|
|
+-spec get_registry_tuples_for_node(Scope :: atom(), Node :: node()) -> [syn_registry_tuple()].
|
|
|
+get_registry_tuples_for_node(Scope, Node) ->
|
|
|
+ ets:select(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
|
|
|
+ {{'$1', '$2'}, '$3', '$4', '_', Node},
|
|
|
+ [],
|
|
|
+ [{{'$1', '$2', '$3', '$4'}}]
|
|
|
+ }]).
|
|
|
+
|
|
|
-spec find_registry_entry_by_name(Scope :: atom(), Name :: any()) -> Entry :: syn_registry_entry() | undefined.
|
|
|
find_registry_entry_by_name(Scope, Name) ->
|
|
|
case ets:select(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
|