|
@@ -267,8 +267,6 @@ handle_call(Request, From, State) ->
|
|
{stop, Reason :: any(), #state{}}.
|
|
{stop, Reason :: any(), #state{}}.
|
|
|
|
|
|
handle_cast({sync_register, Name, RemotePid, RemoteMeta}, State) ->
|
|
handle_cast({sync_register, Name, RemotePid, RemoteMeta}, State) ->
|
|
- %% get remote node
|
|
|
|
- RemoteNode = node(RemotePid),
|
|
|
|
%% check for conflicts
|
|
%% check for conflicts
|
|
case find_registry_tuple_by_name(Name) of
|
|
case find_registry_tuple_by_name(Name) of
|
|
undefined ->
|
|
undefined ->
|
|
@@ -285,27 +283,28 @@ handle_cast({sync_register, Name, RemotePid, RemoteMeta}, State) ->
|
|
fun() ->
|
|
fun() ->
|
|
error_logger:warning_msg(
|
|
error_logger:warning_msg(
|
|
"Syn(~p): REGISTRY INCONSISTENCY (name: ~p for ~p and ~p) ----> Received from remote node ~p~n",
|
|
"Syn(~p): REGISTRY INCONSISTENCY (name: ~p for ~p and ~p) ----> Received from remote node ~p~n",
|
|
- [node(), Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, RemoteNode]
|
|
|
|
|
|
+ [node(), Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, node(RemotePid)]
|
|
),
|
|
),
|
|
|
|
|
|
case resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, State) of
|
|
case resolve_conflict(Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, State) of
|
|
{TablePid, KillOtherPid} ->
|
|
{TablePid, KillOtherPid} ->
|
|
%% keep local
|
|
%% keep local
|
|
%% demonitor
|
|
%% demonitor
|
|
- MonitorRef = rpc:call(RemoteNode, syn_registry, find_monitor_for_pid, [RemotePid]),
|
|
|
|
|
|
+ MonitorRef = rpc:call(node(RemotePid), syn_registry, find_monitor_for_pid, [RemotePid]),
|
|
sync_demonitor_and_kill_on_node(Name, RemotePid, RemoteMeta, MonitorRef, KillOtherPid),
|
|
sync_demonitor_and_kill_on_node(Name, RemotePid, RemoteMeta, MonitorRef, KillOtherPid),
|
|
- %% overwrite local data to all remote nodes
|
|
|
|
|
|
+ %% overwrite local data to all remote nodes, except TablePid's node
|
|
|
|
+ NodesExceptLocalAndTablePidNode = nodes() -- [node(TablePid)],
|
|
lists:foreach(fun(RNode) ->
|
|
lists:foreach(fun(RNode) ->
|
|
ok = rpc:call(RNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined])
|
|
ok = rpc:call(RNode, syn_registry, add_to_local_table, [Name, TablePid, TableMeta, undefined])
|
|
- end, nodes());
|
|
|
|
|
|
+ end, NodesExceptLocalAndTablePidNode);
|
|
|
|
|
|
{RemotePid, KillOtherPid} ->
|
|
{RemotePid, KillOtherPid} ->
|
|
%% keep remote
|
|
%% keep remote
|
|
%% demonitor
|
|
%% demonitor
|
|
MonitorRef = rpc:call(node(TablePid), syn_registry, find_monitor_for_pid, [TablePid]),
|
|
MonitorRef = rpc:call(node(TablePid), syn_registry, find_monitor_for_pid, [TablePid]),
|
|
sync_demonitor_and_kill_on_node(Name, TablePid, TableMeta, MonitorRef, KillOtherPid),
|
|
sync_demonitor_and_kill_on_node(Name, TablePid, TableMeta, MonitorRef, KillOtherPid),
|
|
- %% overwrite remote data to all other nodes (including local)
|
|
|
|
- NodesExceptRemoteNode = [node() | nodes()] -- [RemoteNode],
|
|
|
|
|
|
+ %% overwrite remote data to all other nodes (including local), except RemotePid's node
|
|
|
|
+ NodesExceptRemoteNode = [node() | nodes()] -- [node(RemotePid)],
|
|
lists:foreach(fun(RNode) ->
|
|
lists:foreach(fun(RNode) ->
|
|
ok = rpc:call(RNode, syn_registry, add_to_local_table, [Name, RemotePid, RemoteMeta, undefined])
|
|
ok = rpc:call(RNode, syn_registry, add_to_local_table, [Name, RemotePid, RemoteMeta, undefined])
|
|
end, NodesExceptRemoteNode);
|
|
end, NodesExceptRemoteNode);
|
|
@@ -512,15 +511,6 @@ unregister_on_node(Name) ->
|
|
|
|
|
|
-spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
|
|
-spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
|
|
add_to_local_table(Name, Pid, Meta, MonitorRef) ->
|
|
add_to_local_table(Name, Pid, Meta, MonitorRef) ->
|
|
- %% keep monitor reference if it exists for pid & new monitor is undefined
|
|
|
|
- %% (rpc calls from other nodes during conflict resolution may otherwise overwrite this)
|
|
|
|
- MonitorRef1 = case MonitorRef of
|
|
|
|
- undefined ->
|
|
|
|
- find_monitor_for_pid(Pid);
|
|
|
|
-
|
|
|
|
- _ ->
|
|
|
|
- MonitorRef
|
|
|
|
- end,
|
|
|
|
%% remove entry if previous exists
|
|
%% remove entry if previous exists
|
|
case find_registry_tuple_by_name(Name) of
|
|
case find_registry_tuple_by_name(Name) of
|
|
undefined ->
|
|
undefined ->
|
|
@@ -530,8 +520,8 @@ add_to_local_table(Name, Pid, Meta, MonitorRef) ->
|
|
ets:delete(syn_registry_by_pid, {OldPid, Name})
|
|
ets:delete(syn_registry_by_pid, {OldPid, Name})
|
|
end,
|
|
end,
|
|
%% overwrite & add
|
|
%% overwrite & add
|
|
- ets:insert(syn_registry_by_name, {Name, Pid, Meta, MonitorRef1, node(Pid)}),
|
|
|
|
- ets:insert(syn_registry_by_pid, {{Pid, Name}, Meta, MonitorRef1, node(Pid)}),
|
|
|
|
|
|
+ ets:insert(syn_registry_by_name, {Name, Pid, Meta, MonitorRef, node(Pid)}),
|
|
|
|
+ ets:insert(syn_registry_by_pid, {{Pid, Name}, Meta, MonitorRef, node(Pid)}),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
-spec remove_from_local_table(Name :: any(), Pid :: pid()) -> ok.
|
|
-spec remove_from_local_table(Name :: any(), Pid :: pid()) -> ok.
|
|
@@ -649,7 +639,7 @@ registry_automerge(RemoteNode, State) ->
|
|
raw_purge_registry_entries_for_remote_node(RemoteNode),
|
|
raw_purge_registry_entries_for_remote_node(RemoteNode),
|
|
%% loop
|
|
%% loop
|
|
F = fun({Name, RemotePid, RemoteMeta}) ->
|
|
F = fun({Name, RemotePid, RemoteMeta}) ->
|
|
- resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteNode, State)
|
|
|
|
|
|
+ resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, State)
|
|
end,
|
|
end,
|
|
%% add to table
|
|
%% add to table
|
|
lists:foreach(F, Entries),
|
|
lists:foreach(F, Entries),
|
|
@@ -663,10 +653,9 @@ registry_automerge(RemoteNode, State) ->
|
|
Name :: any(),
|
|
Name :: any(),
|
|
RemotePid :: pid(),
|
|
RemotePid :: pid(),
|
|
RemoteMeta :: any(),
|
|
RemoteMeta :: any(),
|
|
- RemoteNode :: node(),
|
|
|
|
#state{}
|
|
#state{}
|
|
) -> any().
|
|
) -> any().
|
|
-resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteNode, State) ->
|
|
|
|
|
|
+resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, State) ->
|
|
%% check if same name is registered
|
|
%% check if same name is registered
|
|
case find_registry_tuple_by_name(Name) of
|
|
case find_registry_tuple_by_name(Name) of
|
|
undefined ->
|
|
undefined ->
|
|
@@ -683,15 +672,17 @@ resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteNode, State) ->
|
|
{TablePid, KillOtherPid} ->
|
|
{TablePid, KillOtherPid} ->
|
|
%% keep local
|
|
%% keep local
|
|
%% demonitor
|
|
%% demonitor
|
|
- MonitorRef = rpc:call(RemoteNode, syn_registry, find_monitor_for_pid, [RemotePid]),
|
|
|
|
|
|
+ %% TODO: test demonitor
|
|
|
|
+ MonitorRef = rpc:call(node(RemotePid), syn_registry, find_monitor_for_pid, [RemotePid]),
|
|
sync_demonitor_and_kill_on_node(Name, RemotePid, RemoteMeta, MonitorRef, KillOtherPid),
|
|
sync_demonitor_and_kill_on_node(Name, RemotePid, RemoteMeta, MonitorRef, KillOtherPid),
|
|
%% remote data still on remote node, remove there
|
|
%% remote data still on remote node, remove there
|
|
- ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid]);
|
|
|
|
|
|
+ ok = rpc:call(node(RemotePid), syn_registry, remove_from_local_table, [Name, RemotePid]);
|
|
|
|
|
|
{RemotePid, KillOtherPid} ->
|
|
{RemotePid, KillOtherPid} ->
|
|
%% keep remote
|
|
%% keep remote
|
|
%% demonitor
|
|
%% demonitor
|
|
- MonitorRef = rpc:call(RemoteNode, syn_registry, find_monitor_for_pid, [TablePid]),
|
|
|
|
|
|
+ %% TODO: test demonitor
|
|
|
|
+ MonitorRef = rpc:call(node(TablePid), syn_registry, find_monitor_for_pid, [TablePid]),
|
|
sync_demonitor_and_kill_on_node(Name, TablePid, TableMeta, MonitorRef, KillOtherPid),
|
|
sync_demonitor_and_kill_on_node(Name, TablePid, TableMeta, MonitorRef, KillOtherPid),
|
|
%% overwrite remote data to local
|
|
%% overwrite remote data to local
|
|
add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
|
|
add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
|
|
@@ -699,7 +690,7 @@ resolve_tuple_in_automerge(Name, RemotePid, RemoteMeta, RemoteNode, State) ->
|
|
undefined ->
|
|
undefined ->
|
|
%% both are dead, remove from local & remote
|
|
%% both are dead, remove from local & remote
|
|
remove_from_local_table(Name, TablePid),
|
|
remove_from_local_table(Name, TablePid),
|
|
- ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name, RemotePid])
|
|
|
|
|
|
+ ok = rpc:call(node(RemotePid), syn_registry, remove_from_local_table, [Name, RemotePid])
|
|
end
|
|
end
|
|
end.
|
|
end.
|
|
|
|
|