|
@@ -281,10 +281,10 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
|
|
|
%% get process info
|
|
|
GroupName = Entry#syn_groups_table.name,
|
|
|
Meta = Entry#syn_groups_table.meta,
|
|
|
- %% handle
|
|
|
- handle_process_down(GroupName, Pid, Meta, Reason, State),
|
|
|
%% remove from table
|
|
|
remove_from_local_table(Entry),
|
|
|
+ %% handle
|
|
|
+ handle_process_down(GroupName, Pid, Meta, Reason, State),
|
|
|
%% multicast
|
|
|
multicast_leave(GroupName, Pid)
|
|
|
end, Entries)
|
|
@@ -303,7 +303,17 @@ handle_info({nodeup, RemoteNode}, State) ->
|
|
|
"Syn(~p): Received ~p group entrie(s) from remote node ~p",
|
|
|
[node(), length(GroupTuples), RemoteNode]
|
|
|
),
|
|
|
- write_group_tuples_for_node(GroupTuples, RemoteNode),
|
|
|
+ %% ensure that groups doesn't have any joining node's entries
|
|
|
+ raw_purge_group_entries_for_node(RemoteNode),
|
|
|
+ %% add
|
|
|
+ lists:foreach(fun({GroupName, RemotePid, RemoteMeta}) ->
|
|
|
+ case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
|
|
|
+ true ->
|
|
|
+ add_to_local_table(GroupName, RemotePid, RemoteMeta, undefined);
|
|
|
+ _ ->
|
|
|
+ ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [GroupName, RemotePid])
|
|
|
+ end
|
|
|
+ end, GroupTuples),
|
|
|
%% exit
|
|
|
error_logger:warning_msg("Syn(~p): GROUPS AUTOMERGE <---- Done for remote node ~p", [node(), RemoteNode])
|
|
|
end
|
|
@@ -447,20 +457,6 @@ handle_process_down(GroupName, Pid, Meta, Reason, #state{
|
|
|
syn_event_handler:do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler)
|
|
|
end.
|
|
|
|
|
|
--spec write_group_tuples_for_node(GroupTuples :: [syn_registry_tuple()], RemoteNode :: node()) -> ok.
|
|
|
-write_group_tuples_for_node(GroupTuples, RemoteNode) ->
|
|
|
- %% ensure that groups doesn't have any joining node's entries
|
|
|
- raw_purge_group_entries_for_node(RemoteNode),
|
|
|
- %% add
|
|
|
- lists:foreach(fun({GroupName, RemotePid, RemoteMeta}) ->
|
|
|
- case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
|
|
|
- true ->
|
|
|
- add_to_local_table(GroupName, RemotePid, RemoteMeta, undefined);
|
|
|
- _ ->
|
|
|
- ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [GroupName, RemotePid])
|
|
|
- end
|
|
|
- end, GroupTuples).
|
|
|
-
|
|
|
-spec raw_purge_group_entries_for_node(Node :: atom()) -> ok.
|
|
|
raw_purge_group_entries_for_node(Node) ->
|
|
|
%% NB: no demonitoring is done, this is why it's raw
|
|
@@ -477,6 +473,7 @@ raw_purge_group_entries_for_node(Node) ->
|
|
|
Message :: any(),
|
|
|
Timeout :: non_neg_integer()
|
|
|
) -> any().
|
|
|
+
|
|
|
multi_call_and_receive(CollectorPid, Pid, Message, Timeout) ->
|
|
|
MonitorRef = monitor(process, Pid),
|
|
|
Pid ! {syn_multi_call, self(), Message},
|