|
@@ -66,9 +66,9 @@ lookup(Name) ->
|
|
|
|
|
|
-spec lookup(Scope :: atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
|
|
|
lookup(Scope, Name) ->
|
|
|
- case find_registry_tuple_by_name(Scope, Name) of
|
|
|
+ case find_registry_entry_by_name(Scope, Name) of
|
|
|
undefined -> undefined;
|
|
|
- {Name, Pid, Meta, _} -> {Pid, Meta}
|
|
|
+ {{Name, Pid}, Meta, _, _, _} -> {Pid, Meta}
|
|
|
end.
|
|
|
|
|
|
-spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
|
|
@@ -95,10 +95,10 @@ unregister(Name) ->
|
|
|
-spec unregister(Scope :: atom(), Name :: any()) -> ok | {error, Reason :: any()}.
|
|
|
unregister(Scope, Name) ->
|
|
|
% get process' node
|
|
|
- case find_registry_tuple_by_name(Scope, Name) of
|
|
|
+ case find_registry_entry_by_name(Scope, Name) of
|
|
|
undefined ->
|
|
|
{error, undefined};
|
|
|
- {Name, Pid, _, _} ->
|
|
|
+ {{Name, Pid}, _, _, _, _} ->
|
|
|
ProcessName = get_process_name_for_scope(Scope),
|
|
|
Node = node(Pid),
|
|
|
gen_server:call({ProcessName, Node}, {unregister_on_node, Name})
|
|
@@ -145,17 +145,39 @@ handle_call(get_subcluster_nodes, _From, #state{
|
|
|
handle_call({register_on_node, Name, Pid, Meta}, _From, #state{
|
|
|
scope = Scope
|
|
|
} = State) ->
|
|
|
- MRef = case find_monitor_for_pid(Scope, Pid) of
|
|
|
- undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, add
|
|
|
- MRef0 -> MRef0
|
|
|
- end,
|
|
|
- %% add to local table
|
|
|
- Time = erlang:system_time(),
|
|
|
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
|
|
|
- %% broadcast
|
|
|
- broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
|
|
|
- %% return
|
|
|
- {reply, ok, State};
|
|
|
+ case is_process_alive(Pid) of
|
|
|
+ true ->
|
|
|
+ case find_registry_entry_by_name(Scope, Name) of
|
|
|
+ undefined ->
|
|
|
+ %% available
|
|
|
+ MRef = case find_monitor_for_pid(Scope, Pid) of
|
|
|
+ undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, add
|
|
|
+ MRef0 -> MRef0
|
|
|
+ end,
|
|
|
+ %% add to local table
|
|
|
+ Time = erlang:system_time(),
|
|
|
+ add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
|
|
|
+ %% broadcast
|
|
|
+ broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
|
|
|
+ %% return
|
|
|
+ {reply, ok, State};
|
|
|
+
|
|
|
+ {{Name, Pid}, _Meta, _Time, MRef, _Node} ->
|
|
|
+ %% same pid, possibly new meta, overwrite
|
|
|
+ Time = erlang:system_time(),
|
|
|
+ add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
|
|
|
+ %% broadcast
|
|
|
+ broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
|
|
|
+ %% return
|
|
|
+ {reply, ok, State};
|
|
|
+
|
|
|
+ _ ->
|
|
|
+ {reply, {error, taken}, State}
|
|
|
+ end;
|
|
|
+
|
|
|
+ false ->
|
|
|
+ {reply, {error, not_alive}, State}
|
|
|
+ end;
|
|
|
|
|
|
handle_call({unregister_on_node, Name}, _From, #state{scope = Scope} = State) ->
|
|
|
case find_registry_entry_by_name(Scope, Name) of
|
|
@@ -267,7 +289,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, #state{
|
|
|
end;
|
|
|
|
|
|
handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State) ->
|
|
|
- case find_registry_tuples_by_pid(Scope, Pid) of
|
|
|
+ case find_registry_entries_by_pid(Scope, Pid) of
|
|
|
[] ->
|
|
|
error_logger:warning_msg(
|
|
|
"SYN[~p] Received a DOWN message from an unknown process ~p with reason: ~p~n",
|
|
@@ -275,7 +297,7 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State
|
|
|
);
|
|
|
|
|
|
Entries ->
|
|
|
- lists:foreach(fun({Name, _Pid, _Meta, _Time}) ->
|
|
|
+ lists:foreach(fun({{Name, _Pid}, _Meta, _Time, _MRef, _Node}) ->
|
|
|
%% remove from table
|
|
|
remove_from_local_table(Scope, Name, Pid),
|
|
|
%% broadcast
|
|
@@ -336,23 +358,10 @@ cast_to_node(RemoteNode, Message, #state{
|
|
|
}) ->
|
|
|
gen_server:cast({ProcessName, RemoteNode}, Message).
|
|
|
|
|
|
--spec find_registry_tuple_by_name(Scope :: atom(), Name :: any()) ->
|
|
|
- RegistryTuple :: syn_registry_tuple() | undefined.
|
|
|
-find_registry_tuple_by_name(Scope, Name) ->
|
|
|
- TableName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
|
|
|
- case ets:select(TableName, [{
|
|
|
- {{Name, '$2'}, '$3', '$4', '_', '_'},
|
|
|
- [],
|
|
|
- [{{{const, Name}, '$2', '$3', '$4'}}]
|
|
|
- }]) of
|
|
|
- [RegistryTuple] -> RegistryTuple;
|
|
|
- _ -> undefined
|
|
|
- end.
|
|
|
-
|
|
|
-spec find_registry_entry_by_name(Scope :: atom(), Name :: any()) -> Entry :: syn_registry_entry() | undefined.
|
|
|
find_registry_entry_by_name(Scope, Name) ->
|
|
|
case ets:select(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
|
|
|
- {{Name, '$2'}, '$3', '_', '_', '_'},
|
|
|
+ {{Name, '_'}, '_', '_', '_', '_'},
|
|
|
[],
|
|
|
['$_']
|
|
|
}]) of
|
|
@@ -360,6 +369,14 @@ find_registry_entry_by_name(Scope, Name) ->
|
|
|
_ -> undefined
|
|
|
end.
|
|
|
|
|
|
+-spec find_registry_entries_by_pid(Scope :: atom(), Pid :: pid()) -> RegistryEntries :: [syn_registry_entry()].
|
|
|
+find_registry_entries_by_pid(Scope, Pid) when is_pid(Pid) ->
|
|
|
+ ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
|
|
|
+ {{Pid, '$2'}, '$3', '$4', '$5', '$6'},
|
|
|
+ [],
|
|
|
+ [{{{{'$2', Pid}}, '$3', '$4', '$5', '$6'}}]
|
|
|
+ }]).
|
|
|
+
|
|
|
-spec find_monitor_for_pid(Scope :: atom(), Pid :: pid()) -> reference() | undefined.
|
|
|
find_monitor_for_pid(Scope, Pid) when is_pid(Pid) ->
|
|
|
TableName = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
|
|
@@ -368,18 +385,10 @@ find_monitor_for_pid(Scope, Pid) when is_pid(Pid) ->
|
|
|
[],
|
|
|
['$5']
|
|
|
}], 1) of
|
|
|
- {[MonitorRef], _} -> MonitorRef;
|
|
|
+ {[MRef], _} -> MRef;
|
|
|
_ -> undefined
|
|
|
end.
|
|
|
|
|
|
--spec find_registry_tuples_by_pid(Scope :: atom(), Pid :: pid()) -> RegistryTuples :: [syn_registry_tuple()].
|
|
|
-find_registry_tuples_by_pid(Scope, Pid) when is_pid(Pid) ->
|
|
|
- ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
|
|
|
- {{Pid, '$2'}, '$3', '$4', '_', '_'},
|
|
|
- [],
|
|
|
- [{{'$2', Pid, '$3', '$4'}}]
|
|
|
- }]).
|
|
|
-
|
|
|
-spec maybe_demonitor(Scope :: atom(), Pid :: pid()) -> ok.
|
|
|
maybe_demonitor(Scope, Pid) ->
|
|
|
%% try to retrieve 2 items
|
|
@@ -403,14 +412,14 @@ maybe_demonitor(Scope, Pid) ->
|
|
|
Pid :: pid(),
|
|
|
Meta :: any(),
|
|
|
Time :: integer(),
|
|
|
- MonitorRef :: undefined | reference()
|
|
|
+ MRef :: undefined | reference()
|
|
|
) -> true.
|
|
|
-add_to_local_table(Scope, Name, Pid, Meta, Time, MonitorRef) ->
|
|
|
+add_to_local_table(Scope, Name, Pid, Meta, Time, MRef) ->
|
|
|
ets:insert(syn_backbone:get_table_name(syn_registry_by_name, Scope),
|
|
|
- {{Name, Pid}, Meta, Time, MonitorRef, node(Pid)}
|
|
|
+ {{Name, Pid}, Meta, Time, MRef, node(Pid)}
|
|
|
),
|
|
|
ets:insert(syn_backbone:get_table_name(syn_registry_by_pid, Scope),
|
|
|
- {{Pid, Name}, Meta, Time, MonitorRef, node(Pid)}
|
|
|
+ {{Pid, Name}, Meta, Time, MRef, node(Pid)}
|
|
|
).
|
|
|
|
|
|
-spec remove_from_local_table(Scope :: atom(), Name :: any(), Pid :: pid()) -> true.
|