|
@@ -77,7 +77,7 @@ lookup(Scope, Name) ->
|
|
TableByName ->
|
|
TableByName ->
|
|
case find_registry_entry_by_name(Name, TableByName) of
|
|
case find_registry_entry_by_name(Name, TableByName) of
|
|
undefined -> undefined;
|
|
undefined -> undefined;
|
|
- {{Name, Pid}, Meta, _, _, _} -> {Pid, Meta}
|
|
|
|
|
|
+ {Name, Pid, Meta, _, _, _} -> {Pid, Meta}
|
|
end
|
|
end
|
|
end.
|
|
end.
|
|
|
|
|
|
@@ -124,7 +124,7 @@ unregister(Scope, Name) ->
|
|
undefined ->
|
|
undefined ->
|
|
{error, undefined};
|
|
{error, undefined};
|
|
|
|
|
|
- {{Name, Pid}, Meta, _, _, _} ->
|
|
|
|
|
|
+ {Name, Pid, Meta, _, _, _} ->
|
|
Node = node(Pid),
|
|
Node = node(Pid),
|
|
case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_owner, node(), Name, Pid}) of
|
|
case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_owner, node(), Name, Pid}) of
|
|
{ok, TableByPid} when Node =/= node() ->
|
|
{ok, TableByPid} when Node =/= node() ->
|
|
@@ -157,7 +157,7 @@ count(Scope, Node) ->
|
|
|
|
|
|
TableByName ->
|
|
TableByName ->
|
|
ets:select_count(TableByName, [{
|
|
ets:select_count(TableByName, [{
|
|
- {{'_', '_'}, '_', '_', '_', Node},
|
|
|
|
|
|
+ {'_', '_', '_', '_', '_', Node},
|
|
[],
|
|
[],
|
|
[true]
|
|
[true]
|
|
}])
|
|
}])
|
|
@@ -213,7 +213,7 @@ handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
|
|
%% return
|
|
%% return
|
|
{reply, {ok, {undefined, undefined, Time, TableByName, TableByPid}}, State};
|
|
{reply, {ok, {undefined, undefined, Time, TableByName, TableByPid}}, State};
|
|
|
|
|
|
- {{Name, Pid}, TableMeta, _TableTime, MRef, _TableNode} ->
|
|
|
|
|
|
+ {Name, Pid, TableMeta, _TableTime, MRef, _TableNode} ->
|
|
%% same pid, possibly new meta or time, overwrite
|
|
%% same pid, possibly new meta or time, overwrite
|
|
Time = erlang:system_time(),
|
|
Time = erlang:system_time(),
|
|
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
@@ -238,7 +238,7 @@ handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
} = State) ->
|
|
} = State) ->
|
|
case find_registry_entry_by_name(Name, TableByName) of
|
|
case find_registry_entry_by_name(Name, TableByName) of
|
|
- {{Name, Pid}, Meta, _Time, _MRef, _Node} ->
|
|
|
|
|
|
+ {Name, Pid, Meta, _Time, _MRef, _Node} ->
|
|
%% demonitor if the process is not registered under other names
|
|
%% demonitor if the process is not registered under other names
|
|
maybe_demonitor(Pid, TableByPid),
|
|
maybe_demonitor(Pid, TableByPid),
|
|
%% remove from table
|
|
%% remove from table
|
|
@@ -250,7 +250,7 @@ handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{
|
|
%% return
|
|
%% return
|
|
{reply, {ok, TableByPid}, State};
|
|
{reply, {ok, TableByPid}, State};
|
|
|
|
|
|
- {{Name, _TablePid}, _Meta, _Time, _MRef, _Node} ->
|
|
|
|
|
|
+ {Name, _TablePid, _Meta, _Time, _MRef, _Node} ->
|
|
%% process is registered locally with another pid: race condition, wait for sync to happen & return error
|
|
%% process is registered locally with another pid: race condition, wait for sync to happen & return error
|
|
{reply, {{error, race_condition}, undefined}, State};
|
|
{reply, {{error, race_condition}, undefined}, State};
|
|
|
|
|
|
@@ -296,7 +296,7 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
|
|
);
|
|
);
|
|
|
|
|
|
Entries ->
|
|
Entries ->
|
|
- lists:foreach(fun({{Name, _Pid}, Meta, _, _, _}) ->
|
|
|
|
|
|
+ lists:foreach(fun({Name, _Pid, Meta, _, _, _}) ->
|
|
%% remove from table
|
|
%% remove from table
|
|
remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
%% callback
|
|
%% callback
|
|
@@ -358,21 +358,17 @@ rebuild_monitors(#state{
|
|
-spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
|
|
-spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
|
|
get_registry_tuples_for_node(Node, TableByName) ->
|
|
get_registry_tuples_for_node(Node, TableByName) ->
|
|
ets:select(TableByName, [{
|
|
ets:select(TableByName, [{
|
|
- {{'$1', '$2'}, '$3', '$4', '_', Node},
|
|
|
|
|
|
+ {'$1', '$2', '$3', '$4', '_', Node},
|
|
[],
|
|
[],
|
|
[{{'$1', '$2', '$3', '$4'}}]
|
|
[{{'$1', '$2', '$3', '$4'}}]
|
|
}]).
|
|
}]).
|
|
|
|
|
|
-spec find_registry_entry_by_name(Name :: any(), TableByName :: atom()) ->
|
|
-spec find_registry_entry_by_name(Name :: any(), TableByName :: atom()) ->
|
|
- Entry :: syn_registry_entry() | undefined | non_existent_table.
|
|
|
|
|
|
+ Entry :: syn_registry_entry() | undefined.
|
|
find_registry_entry_by_name(Name, TableByName) ->
|
|
find_registry_entry_by_name(Name, TableByName) ->
|
|
- case ets:select(TableByName, [{
|
|
|
|
- {{Name, '_'}, '_', '_', '_', '_'},
|
|
|
|
- [],
|
|
|
|
- ['$_']
|
|
|
|
- }]) of
|
|
|
|
- [RegistryEntry] -> RegistryEntry;
|
|
|
|
- [] -> undefined
|
|
|
|
|
|
+ case ets:lookup(TableByName, Name) of
|
|
|
|
+ [] -> undefined;
|
|
|
|
+ [Entry] -> Entry
|
|
end.
|
|
end.
|
|
|
|
|
|
-spec find_registry_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> RegistryEntries :: [syn_registry_entry()].
|
|
-spec find_registry_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> RegistryEntries :: [syn_registry_entry()].
|
|
@@ -380,7 +376,7 @@ find_registry_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
|
|
ets:select(TableByPid, [{
|
|
ets:select(TableByPid, [{
|
|
{{Pid, '$2'}, '$3', '$4', '$5', '$6'},
|
|
{{Pid, '$2'}, '$3', '$4', '$5', '$6'},
|
|
[],
|
|
[],
|
|
- [{{{{'$2', Pid}}, '$3', '$4', '$5', '$6'}}]
|
|
|
|
|
|
+ [{{'$2', Pid, '$3', '$4', '$5', '$6'}}]
|
|
}]).
|
|
}]).
|
|
|
|
|
|
-spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
|
|
-spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
|
|
@@ -422,7 +418,7 @@ maybe_demonitor(Pid, TableByPid) ->
|
|
) -> true.
|
|
) -> true.
|
|
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
|
|
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
|
|
%% insert
|
|
%% insert
|
|
- true = ets:insert(TableByName, {{Name, Pid}, Meta, Time, MRef, node(Pid)}),
|
|
|
|
|
|
+ true = ets:insert(TableByName, {Name, Pid, Meta, Time, MRef, node(Pid)}),
|
|
true = ets:insert(TableByPid, {{Pid, Name}, Meta, Time, MRef, node(Pid)}).
|
|
true = ets:insert(TableByPid, {{Pid, Name}, Meta, Time, MRef, node(Pid)}).
|
|
|
|
|
|
-spec remove_from_local_table(
|
|
-spec remove_from_local_table(
|
|
@@ -432,7 +428,7 @@ add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
|
|
TableByPid :: atom()
|
|
TableByPid :: atom()
|
|
) -> true.
|
|
) -> true.
|
|
remove_from_local_table(Name, Pid, TableByName, TableByPid) ->
|
|
remove_from_local_table(Name, Pid, TableByName, TableByPid) ->
|
|
- true = ets:delete(TableByName, {Name, Pid}),
|
|
|
|
|
|
+ true = ets:delete(TableByName, Name),
|
|
true = ets:delete(TableByPid, {Pid, Name}).
|
|
true = ets:delete(TableByPid, {Pid, Name}).
|
|
|
|
|
|
-spec update_local_table(
|
|
-spec update_local_table(
|
|
@@ -462,7 +458,7 @@ purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =
|
|
end, RegistryTuples)
|
|
end, RegistryTuples)
|
|
end),
|
|
end),
|
|
%% remove all from pid table
|
|
%% remove all from pid table
|
|
- true = ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
|
|
|
|
|
|
+ true = ets:match_delete(TableByName, {'_', '_', '_', '_', '_', Node}),
|
|
true = ets:match_delete(TableByPid, {{'_', '_'}, '_', '_', '_', Node}).
|
|
true = ets:match_delete(TableByPid, {{'_', '_'}, '_', '_', '_', Node}).
|
|
|
|
|
|
-spec handle_registry_sync(
|
|
-spec handle_registry_sync(
|
|
@@ -484,13 +480,13 @@ handle_registry_sync(Scope, Name, Pid, Meta, Time, #state{
|
|
%% callback
|
|
%% callback
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta});
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta});
|
|
|
|
|
|
- {{Name, Pid}, TableMeta, _TableTime, MRef, _TableNode} ->
|
|
|
|
|
|
+ {Name, Pid, TableMeta, _TableTime, MRef, _TableNode} ->
|
|
%% same pid, more recent (because it comes from the same node, which means that it's sequential)
|
|
%% same pid, more recent (because it comes from the same node, which means that it's sequential)
|
|
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
%% callback
|
|
%% callback
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta});
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta});
|
|
|
|
|
|
- {{Name, TablePid}, TableMeta, TableTime, TableMRef, _TableNode} when node(TablePid) =:= node() ->
|
|
|
|
|
|
+ {Name, TablePid, TableMeta, TableTime, TableMRef, _TableNode} when node(TablePid) =:= node() ->
|
|
%% current node runs a conflicting process -> resolve
|
|
%% current node runs a conflicting process -> resolve
|
|
%% * the conflict is resolved by the two nodes that own the conflicting processes
|
|
%% * the conflict is resolved by the two nodes that own the conflicting processes
|
|
%% * when a process is chosen, the time is updated
|
|
%% * when a process is chosen, the time is updated
|
|
@@ -498,14 +494,14 @@ handle_registry_sync(Scope, Name, Pid, Meta, Time, #state{
|
|
%% * recipients check that the time is more recent that what they have to ensure that there are no race conditions
|
|
%% * recipients check that the time is more recent that what they have to ensure that there are no race conditions
|
|
resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State);
|
|
resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State);
|
|
|
|
|
|
- {{Name, TablePid}, TableMeta, TableTime, _TableMRef, _TableNode} when TableTime < Time ->
|
|
|
|
|
|
+ {Name, TablePid, TableMeta, TableTime, _TableMRef, _TableNode} when TableTime < Time ->
|
|
%% current node does not own any of the conflicting processes, update
|
|
%% current node does not own any of the conflicting processes, update
|
|
update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
|
|
update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
|
|
%% callbacks
|
|
%% callbacks
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});
|
|
|
|
|
|
- {{Name, _TablePid}, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
|
|
|
|
|
|
+ {Name, _TablePid, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
|
|
%% race condition: incoming data is older, ignore
|
|
%% race condition: incoming data is older, ignore
|
|
ok
|
|
ok
|
|
end.
|
|
end.
|