|
@@ -58,7 +58,7 @@
|
|
|
-spec start_link(Scope :: atom()) ->
|
|
|
{ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
|
|
|
start_link(Scope) when is_atom(Scope) ->
|
|
|
- syn_gen_scope:start_link(?MODULE, Scope, [Scope]).
|
|
|
+ syn_gen_scope:start_link(?MODULE, Scope).
|
|
|
|
|
|
-spec get_subcluster_nodes(#state{}) -> [node()].
|
|
|
get_subcluster_nodes(State) ->
|
|
@@ -70,10 +70,15 @@ lookup(Name) ->
|
|
|
|
|
|
-spec lookup(Scope :: atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
|
|
|
lookup(Scope, Name) ->
|
|
|
- case find_registry_entry_by_name(Scope, Name) of
|
|
|
- undefined -> undefined;
|
|
|
- non_existent_table -> error({invalid_scope, Scope});
|
|
|
- {{Name, Pid}, Meta, _, _, _} -> {Pid, Meta}
|
|
|
+ case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
|
|
|
+ undefined ->
|
|
|
+ error({invalid_scope, Scope});
|
|
|
+
|
|
|
+ TableByName ->
|
|
|
+ case find_registry_entry_by_name(Name, TableByName) of
|
|
|
+ undefined -> undefined;
|
|
|
+ {{Name, Pid}, Meta, _, _, _} -> {Pid, Meta}
|
|
|
+ end
|
|
|
end.
|
|
|
|
|
|
-spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
|
|
@@ -91,9 +96,9 @@ register(Scope, Name, Pid) when is_pid(Pid) ->
|
|
|
register(Scope, Name, Pid, Meta) ->
|
|
|
Node = node(Pid),
|
|
|
case syn_gen_scope:call(?MODULE, Node, Scope, {register_on_owner, node(), Name, Pid, Meta}) of
|
|
|
- {ok, {TablePid, TableMeta, Time}} when Node =/= node() ->
|
|
|
+ {ok, {TablePid, TableMeta, Time, TableByName, TableByPid}} when Node =/= node() ->
|
|
|
%% update table on caller node immediately so that subsequent calls have an updated registry
|
|
|
- add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
|
|
|
+ add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
|
|
|
%% return
|
|
@@ -109,46 +114,53 @@ unregister(Name) ->
|
|
|
|
|
|
-spec unregister(Scope :: atom(), Name :: any()) -> ok | {error, Reason :: any()}.
|
|
|
unregister(Scope, Name) ->
|
|
|
- % get process' node
|
|
|
- case find_registry_entry_by_name(Scope, Name) of
|
|
|
+ case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
|
|
|
undefined ->
|
|
|
- {error, undefined};
|
|
|
-
|
|
|
- non_existent_table ->
|
|
|
error({invalid_scope, Scope});
|
|
|
|
|
|
- {{Name, Pid}, Meta, _, _, _} ->
|
|
|
- Node = node(Pid),
|
|
|
- case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_owner, node(), Name, Pid}) of
|
|
|
- ok when Node =/= node() ->
|
|
|
- %% remove table on caller node immediately so that subsequent calls have an updated registry
|
|
|
- remove_from_local_table(Scope, Name, Pid),
|
|
|
- %% callback
|
|
|
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
- %% return
|
|
|
- ok;
|
|
|
-
|
|
|
- Response ->
|
|
|
- Response
|
|
|
+ TableByName ->
|
|
|
+ % get process' node
|
|
|
+ case find_registry_entry_by_name(Name, TableByName) of
|
|
|
+ undefined ->
|
|
|
+ {error, undefined};
|
|
|
+
|
|
|
+ {{Name, Pid}, Meta, _, _, _} ->
|
|
|
+ Node = node(Pid),
|
|
|
+ case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_owner, node(), Name, Pid}) of
|
|
|
+ {ok, TableByPid} when Node =/= node() ->
|
|
|
+ %% remove table on caller node immediately so that subsequent calls have an updated registry
|
|
|
+ remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
|
+ %% callback
|
|
|
+ syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
+ %% return
|
|
|
+ ok;
|
|
|
+
|
|
|
+ {Response, _} ->
|
|
|
+ Response
|
|
|
+ end
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
-spec count(Scope :: atom()) -> non_neg_integer().
|
|
|
count(Scope) ->
|
|
|
- case ets:info(syn_backbone:get_table_name(syn_registry_by_name, Scope), size) of
|
|
|
+ TableByName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
|
|
|
+ case ets:info(TableByName, size) of
|
|
|
undefined -> error({invalid_scope, Scope});
|
|
|
Value -> Value
|
|
|
end.
|
|
|
|
|
|
-spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
|
|
|
count(Scope, Node) ->
|
|
|
- case catch ets:select_count(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
|
|
|
- {{'_', '_'}, '_', '_', '_', Node},
|
|
|
- [],
|
|
|
- [true]
|
|
|
- }]) of
|
|
|
- {'EXIT', {badarg, [{ets, select_count, _, _} | _]}} -> error({invalid_scope, Scope});
|
|
|
- Value -> Value
|
|
|
+ case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
|
|
|
+ undefined ->
|
|
|
+ error({invalid_scope, Scope});
|
|
|
+
|
|
|
+ TableByName ->
|
|
|
+ ets:select_count(TableByName, [{
|
|
|
+ {{'_', '_'}, '_', '_', '_', Node},
|
|
|
+ [],
|
|
|
+ [true]
|
|
|
+ }])
|
|
|
end.
|
|
|
|
|
|
%% ===================================================================
|
|
@@ -158,11 +170,11 @@ count(Scope, Node) ->
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Init
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
--spec init(Args :: term()) -> {ok, State :: term()}.
|
|
|
-init([Scope]) ->
|
|
|
+-spec init(State :: term()) -> {ok, State :: term()}.
|
|
|
+init(State) ->
|
|
|
HandlerState = #{},
|
|
|
%% rebuild
|
|
|
- rebuild_monitors(Scope),
|
|
|
+ rebuild_monitors(State),
|
|
|
%% init
|
|
|
{ok, HandlerState}.
|
|
|
|
|
@@ -178,37 +190,39 @@ init([Scope]) ->
|
|
|
{stop, Reason :: term(), Reply :: term(), NewState :: term()} |
|
|
|
{stop, Reason :: term(), NewState :: term()}.
|
|
|
handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
|
|
|
- scope = Scope
|
|
|
+ scope = Scope,
|
|
|
+ table_by_name = TableByName,
|
|
|
+ table_by_pid = TableByPid
|
|
|
} = State) ->
|
|
|
case is_process_alive(Pid) of
|
|
|
true ->
|
|
|
- case find_registry_entry_by_name(Scope, Name) of
|
|
|
+ case find_registry_entry_by_name(Name, TableByName) of
|
|
|
undefined ->
|
|
|
%% available
|
|
|
- MRef = case find_monitor_for_pid(Scope, Pid) of
|
|
|
+ MRef = case find_monitor_for_pid(Pid, TableByPid) of
|
|
|
undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, add
|
|
|
MRef0 -> MRef0
|
|
|
end,
|
|
|
%% add to local table
|
|
|
Time = erlang:system_time(),
|
|
|
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
|
|
|
+ add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta}),
|
|
|
%% broadcast
|
|
|
syn_gen_scope:broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, [RequesterNode], State),
|
|
|
%% return
|
|
|
- {reply, {ok, {undefined, undefined, Time}}, State};
|
|
|
+ {reply, {ok, {undefined, undefined, Time, TableByName, TableByPid}}, State};
|
|
|
|
|
|
{{Name, Pid}, TableMeta, _TableTime, MRef, _TableNode} ->
|
|
|
%% same pid, possibly new meta or time, overwrite
|
|
|
Time = erlang:system_time(),
|
|
|
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
|
|
|
+ add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta}),
|
|
|
%% broadcast
|
|
|
syn_gen_scope:broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
|
|
|
%% return
|
|
|
- {reply, {ok, {Pid, TableMeta, Time}}, State};
|
|
|
+ {reply, {ok, {Pid, TableMeta, Time, TableByName, TableByPid}}, State};
|
|
|
|
|
|
_ ->
|
|
|
{reply, {{error, taken}, undefined}, State}
|
|
@@ -218,26 +232,30 @@ handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
|
|
|
{reply, {{error, not_alive}, undefined}, State}
|
|
|
end;
|
|
|
|
|
|
-handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{scope = Scope} = State) ->
|
|
|
- case find_registry_entry_by_name(Scope, Name) of
|
|
|
+handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{
|
|
|
+ scope = Scope,
|
|
|
+ table_by_name = TableByName,
|
|
|
+ table_by_pid = TableByPid
|
|
|
+} = State) ->
|
|
|
+ case find_registry_entry_by_name(Name, TableByName) of
|
|
|
{{Name, Pid}, Meta, _Time, _MRef, _Node} ->
|
|
|
%% demonitor if the process is not registered under other names
|
|
|
- maybe_demonitor(Scope, Pid),
|
|
|
+ maybe_demonitor(Pid, TableByPid),
|
|
|
%% remove from table
|
|
|
- remove_from_local_table(Scope, Name, Pid),
|
|
|
+ remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
%% broadcast
|
|
|
syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
|
|
|
%% return
|
|
|
- {reply, ok, State};
|
|
|
+ {reply, {ok, TableByPid}, State};
|
|
|
|
|
|
{{Name, _TablePid}, _Meta, _Time, _MRef, _Node} ->
|
|
|
%% process is registered locally with another pid: race condition, wait for sync to happen & return error
|
|
|
- {reply, {error, race_condition}, State};
|
|
|
+ {reply, {{error, race_condition}, undefined}, State};
|
|
|
|
|
|
undefined ->
|
|
|
- {reply, {error, undefined}, State}
|
|
|
+ {reply, {{error, undefined}, undefined}, State}
|
|
|
end;
|
|
|
|
|
|
handle_call(Request, From, State) ->
|
|
@@ -255,14 +273,22 @@ handle_info({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State) ->
|
|
|
handle_registry_sync(Scope, Name, Pid, Meta, Time, State),
|
|
|
{noreply, State};
|
|
|
|
|
|
-handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{scope = Scope} = State) ->
|
|
|
- remove_from_local_table(Scope, Name, Pid),
|
|
|
+handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{
|
|
|
+ scope = Scope,
|
|
|
+ table_by_name = TableByName,
|
|
|
+ table_by_pid = TableByPid
|
|
|
+} = State) ->
|
|
|
+ remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
%% return
|
|
|
{noreply, State};
|
|
|
-handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State) ->
|
|
|
- case find_registry_entries_by_pid(Scope, Pid) of
|
|
|
+handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
|
|
|
+ scope = Scope,
|
|
|
+ table_by_name = TableByName,
|
|
|
+ table_by_pid = TableByPid
|
|
|
+} = State) ->
|
|
|
+ case find_registry_entries_by_pid(Pid, TableByPid) of
|
|
|
[] ->
|
|
|
error_logger:warning_msg(
|
|
|
"SYN[~s] Received a DOWN message from an unknown process ~p with reason: ~p",
|
|
@@ -272,7 +298,7 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State
|
|
|
Entries ->
|
|
|
lists:foreach(fun({{Name, _Pid}, Meta, _, _, _}) ->
|
|
|
%% remove from table
|
|
|
- remove_from_local_table(Scope, Name, Pid),
|
|
|
+ remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
%% broadcast
|
|
@@ -290,8 +316,8 @@ handle_info(Info, State) ->
|
|
|
%% Data
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
-spec get_local_data(State :: term()) -> {ok, Data :: any()} | undefined.
|
|
|
-get_local_data(#state{scope = Scope}) ->
|
|
|
- {ok, get_registry_tuples_for_node(Scope, node())}.
|
|
|
+get_local_data(#state{table_by_name = TableByName}) ->
|
|
|
+ {ok, get_registry_tuples_for_node(node(), TableByName)}.
|
|
|
|
|
|
-spec save_remote_data(RemoteData :: any(), State :: term()) -> any().
|
|
|
save_remote_data(RegistryTuplesOfRemoteNode, #state{scope = Scope} = State) ->
|
|
@@ -301,64 +327,65 @@ save_remote_data(RegistryTuplesOfRemoteNode, #state{scope = Scope} = State) ->
|
|
|
end, RegistryTuplesOfRemoteNode).
|
|
|
|
|
|
-spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
|
|
|
-purge_local_data_for_node(Node, #state{scope = Scope}) ->
|
|
|
- purge_registry_for_remote_node(Scope, Node).
|
|
|
+purge_local_data_for_node(Node, #state{
|
|
|
+ scope = Scope,
|
|
|
+ table_by_name = TableByName,
|
|
|
+ table_by_pid = TableByPid
|
|
|
+}) ->
|
|
|
+ purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid).
|
|
|
|
|
|
%% ===================================================================
|
|
|
%% Internal
|
|
|
%% ===================================================================
|
|
|
--spec rebuild_monitors(Scope :: atom()) -> ok.
|
|
|
-rebuild_monitors(Scope) ->
|
|
|
- RegistryTuples = get_registry_tuples_for_node(Scope, node()),
|
|
|
+-spec rebuild_monitors(#state{}) -> ok.
|
|
|
+rebuild_monitors(#state{
|
|
|
+ table_by_name = TableByName,
|
|
|
+ table_by_pid = TableByPid
|
|
|
+}) ->
|
|
|
+ RegistryTuples = get_registry_tuples_for_node(node(), TableByName),
|
|
|
lists:foreach(fun({Name, Pid, Meta, Time}) ->
|
|
|
- remove_from_local_table(Scope, Name, Pid),
|
|
|
+ remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
|
case is_process_alive(Pid) of
|
|
|
true ->
|
|
|
MRef = erlang:monitor(process, Pid),
|
|
|
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef);
|
|
|
+ add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid);
|
|
|
|
|
|
_ ->
|
|
|
ok
|
|
|
end
|
|
|
end, RegistryTuples).
|
|
|
|
|
|
--spec get_registry_tuples_for_node(Scope :: atom(), Node :: node()) -> [syn_registry_tuple()].
|
|
|
-get_registry_tuples_for_node(Scope, Node) ->
|
|
|
- ets:select(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
|
|
|
+-spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
|
|
|
+get_registry_tuples_for_node(Node, TableByName) ->
|
|
|
+ ets:select(TableByName, [{
|
|
|
{{'$1', '$2'}, '$3', '$4', '_', Node},
|
|
|
[],
|
|
|
[{{'$1', '$2', '$3', '$4'}}]
|
|
|
}]).
|
|
|
|
|
|
--spec find_registry_entry_by_name(Scope :: atom(), Name :: any()) ->
|
|
|
+-spec find_registry_entry_by_name(Name :: any(), TableByName :: atom()) ->
|
|
|
Entry :: syn_registry_entry() | undefined | non_existent_table.
|
|
|
-find_registry_entry_by_name(Scope, Name) ->
|
|
|
- case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
|
|
|
- undefined ->
|
|
|
- non_existent_table;
|
|
|
-
|
|
|
- TableName ->
|
|
|
- case ets:select(TableName, [{
|
|
|
- {{Name, '_'}, '_', '_', '_', '_'},
|
|
|
- [],
|
|
|
- ['$_']
|
|
|
- }]) of
|
|
|
- [RegistryEntry] -> RegistryEntry;
|
|
|
- [] -> undefined
|
|
|
- end
|
|
|
+find_registry_entry_by_name(Name, TableByName) ->
|
|
|
+ case ets:select(TableByName, [{
|
|
|
+ {{Name, '_'}, '_', '_', '_', '_'},
|
|
|
+ [],
|
|
|
+ ['$_']
|
|
|
+ }]) of
|
|
|
+ [RegistryEntry] -> RegistryEntry;
|
|
|
+ [] -> undefined
|
|
|
end.
|
|
|
|
|
|
--spec find_registry_entries_by_pid(Scope :: atom(), Pid :: pid()) -> RegistryEntries :: [syn_registry_entry()].
|
|
|
-find_registry_entries_by_pid(Scope, Pid) when is_pid(Pid) ->
|
|
|
- ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
|
|
|
+-spec find_registry_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> RegistryEntries :: [syn_registry_entry()].
|
|
|
+find_registry_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
|
|
|
+ ets:select(TableByPid, [{
|
|
|
{{Pid, '$2'}, '$3', '$4', '$5', '$6'},
|
|
|
[],
|
|
|
[{{{{'$2', Pid}}, '$3', '$4', '$5', '$6'}}]
|
|
|
}]).
|
|
|
|
|
|
--spec find_monitor_for_pid(Scope :: atom(), Pid :: pid()) -> reference() | undefined.
|
|
|
-find_monitor_for_pid(Scope, Pid) when is_pid(Pid) ->
|
|
|
- case ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
|
|
|
+-spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
|
|
|
+find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
|
|
|
+ case ets:select(TableByPid, [{
|
|
|
{{Pid, '_'}, '_', '_', '$5', '_'},
|
|
|
[],
|
|
|
['$5']
|
|
@@ -367,11 +394,11 @@ find_monitor_for_pid(Scope, Pid) when is_pid(Pid) ->
|
|
|
'$end_of_table' -> undefined
|
|
|
end.
|
|
|
|
|
|
--spec maybe_demonitor(Scope :: atom(), Pid :: pid()) -> ok.
|
|
|
-maybe_demonitor(Scope, Pid) ->
|
|
|
+-spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
|
|
|
+maybe_demonitor(Pid, TableByPid) ->
|
|
|
%% try to retrieve 2 items
|
|
|
%% if only 1 is returned it means that no other aliases exist for the Pid
|
|
|
- case ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
|
|
|
+ case ets:select(TableByPid, [{
|
|
|
{{Pid, '_'}, '_', '_', '$5', '_'},
|
|
|
[],
|
|
|
['$5']
|
|
@@ -385,29 +412,30 @@ maybe_demonitor(Scope, Pid) ->
|
|
|
end.
|
|
|
|
|
|
-spec add_to_local_table(
|
|
|
- Scope :: atom(),
|
|
|
Name :: any(),
|
|
|
Pid :: pid(),
|
|
|
Meta :: any(),
|
|
|
Time :: integer(),
|
|
|
- MRef :: undefined | reference()
|
|
|
+ MRef :: undefined | reference(),
|
|
|
+ TableByName :: atom(),
|
|
|
+ TableByPid :: atom()
|
|
|
) -> true.
|
|
|
-add_to_local_table(Scope, Name, Pid, Meta, Time, MRef) ->
|
|
|
+add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
|
|
|
%% insert
|
|
|
- true = ets:insert(syn_backbone:get_table_name(syn_registry_by_name, Scope),
|
|
|
- {{Name, Pid}, Meta, Time, MRef, node(Pid)}
|
|
|
- ),
|
|
|
- true = ets:insert(syn_backbone:get_table_name(syn_registry_by_pid, Scope),
|
|
|
- {{Pid, Name}, Meta, Time, MRef, node(Pid)}
|
|
|
- ).
|
|
|
+ true = ets:insert(TableByName, {{Name, Pid}, Meta, Time, MRef, node(Pid)}),
|
|
|
+ true = ets:insert(TableByPid, {{Pid, Name}, Meta, Time, MRef, node(Pid)}).
|
|
|
|
|
|
--spec remove_from_local_table(Scope :: atom(), Name :: any(), Pid :: pid()) -> true.
|
|
|
-remove_from_local_table(Scope, Name, Pid) ->
|
|
|
- true = ets:delete(syn_backbone:get_table_name(syn_registry_by_name, Scope), {Name, Pid}),
|
|
|
- true = ets:delete(syn_backbone:get_table_name(syn_registry_by_pid, Scope), {Pid, Name}).
|
|
|
+-spec remove_from_local_table(
|
|
|
+ Name :: any(),
|
|
|
+ Pid :: pid(),
|
|
|
+ TableByName :: atom(),
|
|
|
+ TableByPid :: atom()
|
|
|
+) -> true.
|
|
|
+remove_from_local_table(Name, Pid, TableByName, TableByPid) ->
|
|
|
+ true = ets:delete(TableByName, {Name, Pid}),
|
|
|
+ true = ets:delete(TableByPid, {Pid, Name}).
|
|
|
|
|
|
-spec update_local_table(
|
|
|
- Scope :: atom(),
|
|
|
Name :: any(),
|
|
|
PreviousPid :: pid(),
|
|
|
{
|
|
@@ -415,25 +443,27 @@ remove_from_local_table(Scope, Name, Pid) ->
|
|
|
Meta :: any(),
|
|
|
Time :: integer(),
|
|
|
MRef :: undefined | reference()
|
|
|
- }
|
|
|
+ },
|
|
|
+ TableByName :: atom(),
|
|
|
+ TableByPid :: atom()
|
|
|
) -> true.
|
|
|
-update_local_table(Scope, Name, PreviousPid, {Pid, Meta, Time, MRef}) ->
|
|
|
- maybe_demonitor(Scope, PreviousPid),
|
|
|
- remove_from_local_table(Scope, Name, PreviousPid),
|
|
|
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef).
|
|
|
+update_local_table(Name, PreviousPid, {Pid, Meta, Time, MRef}, TableByName, TableByPid) ->
|
|
|
+ maybe_demonitor(PreviousPid, TableByPid),
|
|
|
+ remove_from_local_table(Name, PreviousPid, TableByName, TableByPid),
|
|
|
+ add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid).
|
|
|
|
|
|
--spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom()) -> true.
|
|
|
-purge_registry_for_remote_node(Scope, Node) when Node =/= node() ->
|
|
|
+-spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
|
|
|
+purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
|
|
|
%% loop elements for callback in a separate process to free scope process
|
|
|
- RegistryTuples = get_registry_tuples_for_node(Scope, Node),
|
|
|
+ RegistryTuples = get_registry_tuples_for_node(Node, TableByName),
|
|
|
spawn(fun() ->
|
|
|
lists:foreach(fun({Name, Pid, Meta, _Time}) ->
|
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta)
|
|
|
end, RegistryTuples)
|
|
|
end),
|
|
|
%% remove all from pid table
|
|
|
- true = ets:match_delete(syn_backbone:get_table_name(syn_registry_by_name, Scope), {{'_', '_'}, '_', '_', '_', Node}),
|
|
|
- true = ets:match_delete(syn_backbone:get_table_name(syn_registry_by_pid, Scope), {{'_', '_'}, '_', '_', '_', Node}).
|
|
|
+ true = ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
|
|
|
+ true = ets:match_delete(TableByPid, {{'_', '_'}, '_', '_', '_', Node}).
|
|
|
|
|
|
-spec handle_registry_sync(
|
|
|
Scope :: atom(),
|
|
@@ -443,17 +473,20 @@ purge_registry_for_remote_node(Scope, Node) when Node =/= node() ->
|
|
|
Time :: non_neg_integer(),
|
|
|
#state{}
|
|
|
) -> any().
|
|
|
-handle_registry_sync(Scope, Name, Pid, Meta, Time, State) ->
|
|
|
- case find_registry_entry_by_name(Scope, Name) of
|
|
|
+handle_registry_sync(Scope, Name, Pid, Meta, Time, #state{
|
|
|
+ table_by_name = TableByName,
|
|
|
+ table_by_pid = TableByPid
|
|
|
+} = State) ->
|
|
|
+ case find_registry_entry_by_name(Name, TableByName) of
|
|
|
undefined ->
|
|
|
%% no conflict
|
|
|
- add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
|
|
|
+ add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta});
|
|
|
|
|
|
{{Name, Pid}, TableMeta, _TableTime, MRef, _TableNode} ->
|
|
|
%% same pid, more recent (because it comes from the same node, which means that it's sequential)
|
|
|
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
|
|
|
+ add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta});
|
|
|
|
|
@@ -467,7 +500,7 @@ handle_registry_sync(Scope, Name, Pid, Meta, Time, State) ->
|
|
|
|
|
|
{{Name, TablePid}, TableMeta, TableTime, _TableMRef, _TableNode} when TableTime < Time ->
|
|
|
%% current node does not own any of the conflicting processes, update
|
|
|
- update_local_table(Scope, Name, TablePid, {Pid, Meta, Time, undefined}),
|
|
|
+ update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
|
|
|
%% callbacks
|
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
|
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});
|
|
@@ -484,7 +517,10 @@ handle_registry_sync(Scope, Name, Pid, Meta, Time, State) ->
|
|
|
{TablePid :: pid(), TableMeta :: any(), TableTime :: non_neg_integer(), TableMRef :: reference()},
|
|
|
#state{}
|
|
|
) -> any().
|
|
|
-resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State) ->
|
|
|
+resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, #state{
|
|
|
+ table_by_name = TableByName,
|
|
|
+ table_by_pid = TableByPid
|
|
|
+} = State) ->
|
|
|
%% call conflict resolution
|
|
|
PidToKeep = syn_event_handler:do_resolve_registry_conflict(
|
|
|
Scope,
|
|
@@ -497,7 +533,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
|
|
|
Pid ->
|
|
|
%% -> we keep the remote pid
|
|
|
%% update locally, the incoming sync_register will update with the time coming from remote node
|
|
|
- update_local_table(Scope, Name, TablePid, {Pid, Meta, Time, undefined}),
|
|
|
+ update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
|
|
|
%% callbacks
|
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
|
|
|
syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
|
|
@@ -511,7 +547,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
|
|
|
%% -> we keep the local pid
|
|
|
%% overwrite with updated time
|
|
|
ResolveTime = erlang:system_time(),
|
|
|
- add_to_local_table(Scope, Name, TablePid, TableMeta, ResolveTime, TableMRef),
|
|
|
+ add_to_local_table(Name, TablePid, TableMeta, ResolveTime, TableMRef, TableByName, TableByPid),
|
|
|
%% broadcast
|
|
|
syn_gen_scope:broadcast({'3.0', sync_register, Scope, Name, TablePid, TableMeta, ResolveTime}, State),
|
|
|
error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> chosen: ~p",
|
|
@@ -520,8 +556,8 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
|
|
|
|
|
|
Invalid ->
|
|
|
%% remove
|
|
|
- maybe_demonitor(Scope, TablePid),
|
|
|
- remove_from_local_table(Scope, Name, TablePid),
|
|
|
+ maybe_demonitor(TablePid, TableByPid),
|
|
|
+ remove_from_local_table(Name, TablePid, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
|
|
|
%% kill local, remote will be killed by other node performing the same resolve
|
|
@@ -530,3 +566,29 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
|
|
|
[node(), Name, Scope, Pid, TablePid, Invalid]
|
|
|
)
|
|
|
end.
|
|
|
+
|
|
|
+%% ===================================================================
|
|
|
+%% Test support
|
|
|
+%% ===================================================================
|
|
|
+-spec add_to_local_table(
|
|
|
+ Scope :: atom(),
|
|
|
+ Name :: any(),
|
|
|
+ Pid :: pid(),
|
|
|
+ Meta :: any(),
|
|
|
+ Time :: integer(),
|
|
|
+ MRef :: undefined | reference()
|
|
|
+) -> true.
|
|
|
+add_to_local_table(Scope, Name, Pid, Meta, Time, MRef) ->
|
|
|
+ TableByName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
|
|
|
+ TableByPid = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
|
|
|
+ add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid).
|
|
|
+
|
|
|
+-spec remove_from_local_table(
|
|
|
+ Scope :: atom(),
|
|
|
+ Name :: any(),
|
|
|
+ Pid :: pid()
|
|
|
+) -> true.
|
|
|
+remove_from_local_table(Scope, Name, Pid) ->
|
|
|
+ TableByName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
|
|
|
+ TableByPid = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
|
|
|
+ remove_from_local_table(Name, Pid, TableByName, TableByPid).
|