|
@@ -113,7 +113,7 @@ count() ->
|
|
|
-spec count(Node :: node()) -> non_neg_integer().
|
|
|
count(Node) ->
|
|
|
ets:select_count(syn_registry_by_name, [{
|
|
|
- {'_', '_', '_', '_', '_', Node},
|
|
|
+ {{'_', '_'}, '_', '_', '_', Node},
|
|
|
[],
|
|
|
[true]
|
|
|
}]).
|
|
@@ -293,7 +293,7 @@ handle_cast({sync_register, Name, RemotePid, RemoteMeta, RemoteTime, Force}, Sta
|
|
|
|
|
|
case resolve_conflict(Name, {TablePid, TableMeta, TableTime}, {RemotePid, RemoteMeta, RemoteTime}, State) of
|
|
|
{TablePid, KillOtherPid} ->
|
|
|
- %% keep local
|
|
|
+ %% keep table
|
|
|
%% demonitor
|
|
|
MonitorRef = rpc:call(node(RemotePid), syn_registry, find_monitor_for_pid, [RemotePid]),
|
|
|
sync_demonitor_and_kill_on_node(Name, RemotePid, RemoteMeta, MonitorRef, KillOtherPid),
|
|
@@ -486,7 +486,7 @@ unregister_on_node(Name) ->
|
|
|
undefined ->
|
|
|
{error, undefined};
|
|
|
|
|
|
- {Name, Pid, _Meta, _Clock, MonitorRef, _Node} when MonitorRef =/= undefined ->
|
|
|
+ {{Name, Pid}, _Meta, _Clock, MonitorRef, _Node} when MonitorRef =/= undefined ->
|
|
|
%% demonitor
|
|
|
erlang:demonitor(MonitorRef, [flush]),
|
|
|
%% remove from table
|
|
@@ -494,7 +494,7 @@ unregister_on_node(Name) ->
|
|
|
%% return
|
|
|
{ok, Pid};
|
|
|
|
|
|
- {Name, Pid, _Meta, _Clock, _MonitorRef, Node} = RegistryEntry when Node =:= node() ->
|
|
|
+ {{Name, Pid}, _Meta, _Clock, _MonitorRef, Node} = RegistryEntry when Node =:= node() ->
|
|
|
error_logger:error_msg(
|
|
|
"Syn(~p): INTERNAL ERROR | Registry entry ~p has no monitor but it's running on node~n",
|
|
|
[node(), RegistryEntry]
|
|
@@ -528,35 +528,22 @@ add_to_local_table(Name, Pid, Meta, Time, MonitorRef) ->
|
|
|
undefined;
|
|
|
|
|
|
{Name, PreviousPid, _, _} ->
|
|
|
- ets:delete(syn_registry_by_pid, {PreviousPid, Name})
|
|
|
+ remove_from_local_table(Name, PreviousPid)
|
|
|
end,
|
|
|
%% overwrite & add
|
|
|
- ets:insert(syn_registry_by_name, {Name, Pid, Meta, Time, MonitorRef, node(Pid)}),
|
|
|
+ ets:insert(syn_registry_by_name, {{Name, Pid}, Meta, Time, MonitorRef, node(Pid)}),
|
|
|
ets:insert(syn_registry_by_pid, {{Pid, Name}, Meta, Time, MonitorRef, node(Pid)}),
|
|
|
ok.
|
|
|
|
|
|
-spec remove_from_local_table(Name :: any(), Pid :: pid()) -> ok.
|
|
|
remove_from_local_table(Name, Pid) ->
|
|
|
- case find_registry_tuple_by_name(Name) of
|
|
|
- undefined ->
|
|
|
- ok;
|
|
|
-
|
|
|
- {Name, Pid, _, _} ->
|
|
|
- ets:delete(syn_registry_by_name, Name),
|
|
|
- ets:delete(syn_registry_by_pid, {Pid, Name}),
|
|
|
- ok;
|
|
|
-
|
|
|
- {Name, TablePid, _, _} ->
|
|
|
- error_logger:info_msg(
|
|
|
- "Syn(~p): Request to delete registry name ~p for pid ~p but locally have ~p, ignoring~n",
|
|
|
- [node(), Name, Pid, TablePid]
|
|
|
- )
|
|
|
- end.
|
|
|
+ ets:delete(syn_registry_by_name, {Name, Pid}),
|
|
|
+ ets:delete(syn_registry_by_pid, {Pid, Name}).
|
|
|
|
|
|
-spec find_registry_tuple_by_name(Name :: any()) -> RegistryTuple :: syn_registry_tuple() | undefined.
|
|
|
find_registry_tuple_by_name(Name) ->
|
|
|
case ets:select(syn_registry_by_name, [{
|
|
|
- {Name, '$2', '$3', '$4', '_', '_'},
|
|
|
+ {{Name, '$2'}, '$3', '$4', '_', '_'},
|
|
|
[],
|
|
|
[{{{const, Name}, '$2', '$3', '$4'}}]
|
|
|
}]) of
|
|
@@ -567,7 +554,7 @@ find_registry_tuple_by_name(Name) ->
|
|
|
-spec find_registry_entry_by_name(Name :: any()) -> Entry :: syn_registry_entry() | undefined.
|
|
|
find_registry_entry_by_name(Name) ->
|
|
|
case ets:select(syn_registry_by_name, [{
|
|
|
- {Name, '$2', '$3', '_', '_', '_'},
|
|
|
+ {{Name, '$2'}, '$3', '_', '_', '_'},
|
|
|
[],
|
|
|
['$_']
|
|
|
}]) of
|
|
@@ -586,7 +573,7 @@ find_monitor_for_pid(Pid) when is_pid(Pid) ->
|
|
|
_ -> undefined
|
|
|
end.
|
|
|
|
|
|
--spec find_registry_tuples_by_pid(Pid :: pid()) -> Entries :: [syn_registry_tuple()].
|
|
|
+-spec find_registry_tuples_by_pid(Pid :: pid()) -> RegistryTuples :: [syn_registry_tuple()].
|
|
|
find_registry_tuples_by_pid(Pid) when is_pid(Pid) ->
|
|
|
ets:select(syn_registry_by_pid, [{
|
|
|
{{Pid, '$2'}, '$3', '$4', '_', '_'},
|
|
@@ -597,7 +584,7 @@ find_registry_tuples_by_pid(Pid) when is_pid(Pid) ->
|
|
|
-spec get_registry_tuples_for_node(Node :: node()) -> [syn_registry_tuple()].
|
|
|
get_registry_tuples_for_node(Node) ->
|
|
|
ets:select(syn_registry_by_name, [{
|
|
|
- {'$1', '$2', '$3', '$4', '_', Node},
|
|
|
+ {{'$1', '$2'}, '$3', '$4', '_', Node},
|
|
|
[],
|
|
|
[{{'$1', '$2', '$3', '$4'}}]
|
|
|
}]).
|
|
@@ -636,10 +623,10 @@ registry_automerge(RemoteNode, State) ->
|
|
|
[node(), RemoteNode]
|
|
|
);
|
|
|
|
|
|
- Entries ->
|
|
|
+ RegistryTuples ->
|
|
|
error_logger:info_msg(
|
|
|
"Syn(~p): Received ~p registry tuple(s) from remote node ~p~n",
|
|
|
- [node(), length(Entries), RemoteNode]
|
|
|
+ [node(), length(RegistryTuples), RemoteNode]
|
|
|
),
|
|
|
%% ensure that registry doesn't have any joining node's entries (here again for race conditions)
|
|
|
raw_purge_registry_entries_for_remote_node(RemoteNode),
|
|
@@ -648,7 +635,7 @@ registry_automerge(RemoteNode, State) ->
|
|
|
resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteTime, State)
|
|
|
end,
|
|
|
%% add to table
|
|
|
- lists:foreach(F, Entries),
|
|
|
+ lists:foreach(F, RegistryTuples),
|
|
|
%% exit
|
|
|
error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
|
|
|
end
|
|
@@ -780,13 +767,13 @@ do_sync_from_full_cluster(State) ->
|
|
|
-spec raw_purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
|
|
|
raw_purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
|
|
|
%% NB: no demonitoring is done, this is why it's raw
|
|
|
- ets:match_delete(syn_registry_by_name, {'_', '_', '_', '_', '_', Node}),
|
|
|
+ ets:match_delete(syn_registry_by_name, {{'_', '_'}, '_', '_', '_', Node}),
|
|
|
ets:match_delete(syn_registry_by_pid, {{'_', '_'}, '_', '_', '_', Node}),
|
|
|
ok.
|
|
|
|
|
|
-spec rebuild_monitors() -> ok.
|
|
|
rebuild_monitors() ->
|
|
|
- Entries = get_registry_tuples_for_node(node()),
|
|
|
+ RegistryTuples = get_registry_tuples_for_node(node()),
|
|
|
lists:foreach(fun({Name, Pid, Meta, Time}) ->
|
|
|
case is_process_alive(Pid) of
|
|
|
true ->
|
|
@@ -796,7 +783,7 @@ rebuild_monitors() ->
|
|
|
_ ->
|
|
|
remove_from_local_table(Name, Pid)
|
|
|
end
|
|
|
- end, Entries).
|
|
|
+ end, RegistryTuples).
|
|
|
|
|
|
-spec set_timer_for_anti_entropy(#state{}) -> ok.
|
|
|
set_timer_for_anti_entropy(#state{anti_entropy_interval_ms = undefined}) -> ok;
|