|
@@ -29,13 +29,13 @@
|
|
%% API
|
|
%% API
|
|
-export([start_link/0]).
|
|
-export([start_link/0]).
|
|
-export([register/2, register/3]).
|
|
-export([register/2, register/3]).
|
|
--export([reregister/2, reregister/3]).
|
|
|
|
|
|
+-export([force_register/2, force_register/3]).
|
|
-export([unregister/1]).
|
|
-export([unregister/1]).
|
|
-export([whereis/1, whereis/2]).
|
|
-export([whereis/1, whereis/2]).
|
|
-export([count/0, count/1]).
|
|
-export([count/0, count/1]).
|
|
|
|
|
|
%% sync API
|
|
%% sync API
|
|
--export([sync_register/5, sync_unregister/3]).
|
|
|
|
|
|
+-export([sync_register/6, sync_unregister/3]).
|
|
-export([sync_demonitor_and_kill_on_node/5]).
|
|
-export([sync_demonitor_and_kill_on_node/5]).
|
|
-export([sync_get_local_registry_tuples/1]).
|
|
-export([sync_get_local_registry_tuples/1]).
|
|
-export([force_cluster_sync/0]).
|
|
-export([force_cluster_sync/0]).
|
|
@@ -70,38 +70,16 @@ register(Name, Pid) ->
|
|
-spec register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
|
|
-spec register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
|
|
register(Name, Pid, Meta) when is_pid(Pid) ->
|
|
register(Name, Pid, Meta) when is_pid(Pid) ->
|
|
Node = node(Pid),
|
|
Node = node(Pid),
|
|
- gen_server:call({?MODULE, Node}, {register_on_node, Name, Pid, Meta}).
|
|
|
|
-
|
|
|
|
--spec reregister(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
|
|
|
|
-reregister(Name, Pid) ->
|
|
|
|
- reregister(Name, Pid, undefined).
|
|
|
|
-
|
|
|
|
--spec reregister(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
|
|
|
|
-reregister(Name, Pid, Meta) when is_pid(Pid) ->
|
|
|
|
- reregister(Name, Pid, Meta, 0).
|
|
|
|
-
|
|
|
|
--spec reregister(Name :: any(), Pid :: pid(), Meta :: any(), RetryCount :: non_neg_integer()) ->
|
|
|
|
- ok | {error, Reason :: any()}.
|
|
|
|
-reregister(Name, Pid, Meta, RetryCount) when RetryCount > 40 ->
|
|
|
|
- exit(self(), {timeout, {gen_server, call, [?MODULE, reregister, {Name, Pid, Meta}]}});
|
|
|
|
-reregister(Name, Pid, Meta, RetryCount) when is_pid(Pid) ->
|
|
|
|
- ?MODULE:unregister(Name),
|
|
|
|
- case find_registry_tuple_by_name(Name) of
|
|
|
|
- undefined ->
|
|
|
|
- case ?MODULE:register(Name, Pid, Meta) of
|
|
|
|
- {error, taken} ->
|
|
|
|
- %% race conditions, retry
|
|
|
|
- timer:sleep(100),
|
|
|
|
- reregister(Name, Pid, Meta, RetryCount + 1);
|
|
|
|
-
|
|
|
|
- Result ->
|
|
|
|
- Result
|
|
|
|
- end;
|
|
|
|
|
|
+ gen_server:call({?MODULE, Node}, {register_on_node, Name, Pid, Meta, false}).
|
|
|
|
|
|
- {Name, _, _, _} ->
|
|
|
|
- timer:sleep(100),
|
|
|
|
- reregister(Name, Pid, Meta, RetryCount + 1)
|
|
|
|
- end.
|
|
|
|
|
|
+-spec force_register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
|
|
|
|
+force_register(Name, Pid) ->
|
|
|
|
+ force_register(Name, Pid, undefined).
|
|
|
|
+
|
|
|
|
+-spec force_register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
|
|
|
|
+force_register(Name, Pid, Meta) when is_pid(Pid) ->
|
|
|
|
+ Node = node(Pid),
|
|
|
|
+ gen_server:call({?MODULE, Node}, {register_on_node, Name, Pid, Meta, true}).
|
|
|
|
|
|
-spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
|
|
-spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
|
|
unregister(Name) ->
|
|
unregister(Name) ->
|
|
@@ -140,10 +118,17 @@ count(Node) ->
|
|
[true]
|
|
[true]
|
|
}]).
|
|
}]).
|
|
|
|
|
|
--spec sync_register(RemoteNode :: node(), Name :: any(), RemotePid :: pid(), RemoteMeta :: any(), RemoteTime :: integer()) ->
|
|
|
|
|
|
+-spec sync_register(
|
|
|
|
+ RemoteNode :: node(),
|
|
|
|
+ Name :: any(),
|
|
|
|
+ RemotePid :: pid(),
|
|
|
|
+ RemoteMeta :: any(),
|
|
|
|
+ RemoteTime :: integer(),
|
|
|
|
+ Force :: boolean()
|
|
|
|
+) ->
|
|
ok.
|
|
ok.
|
|
-sync_register(RemoteNode, Name, RemotePid, RemoteMeta, RemoteTime) ->
|
|
|
|
- gen_server:cast({?MODULE, RemoteNode}, {sync_register, Name, RemotePid, RemoteMeta, RemoteTime}).
|
|
|
|
|
|
+sync_register(RemoteNode, Name, RemotePid, RemoteMeta, RemoteTime, Force) ->
|
|
|
|
+ gen_server:cast({?MODULE, RemoteNode}, {sync_register, Name, RemotePid, RemoteMeta, RemoteTime, Force}).
|
|
|
|
|
|
-spec sync_unregister(RemoteNode :: node(), Name :: any(), Pid :: pid()) -> ok.
|
|
-spec sync_unregister(RemoteNode :: node(), Name :: any(), Pid :: pid()) -> ok.
|
|
sync_unregister(RemoteNode, Name, Pid) ->
|
|
sync_unregister(RemoteNode, Name, Pid) ->
|
|
@@ -216,28 +201,42 @@ init([]) ->
|
|
{stop, Reason :: any(), Reply :: any(), #state{}} |
|
|
{stop, Reason :: any(), Reply :: any(), #state{}} |
|
|
{stop, Reason :: any(), #state{}}.
|
|
{stop, Reason :: any(), #state{}}.
|
|
|
|
|
|
-handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
|
|
|
|
|
|
+handle_call({register_on_node, Name, Pid, Meta, Force}, _From, State) ->
|
|
%% check if pid is alive
|
|
%% check if pid is alive
|
|
case is_process_alive(Pid) of
|
|
case is_process_alive(Pid) of
|
|
true ->
|
|
true ->
|
|
%% check if name available
|
|
%% check if name available
|
|
case find_registry_tuple_by_name(Name) of
|
|
case find_registry_tuple_by_name(Name) of
|
|
undefined ->
|
|
undefined ->
|
|
|
|
+ %% available
|
|
{ok, Time} = register_on_node(Name, Pid, Meta),
|
|
{ok, Time} = register_on_node(Name, Pid, Meta),
|
|
%% multicast
|
|
%% multicast
|
|
- multicast_register(Name, Pid, Meta, Time),
|
|
|
|
|
|
+ multicast_register(Name, Pid, Meta, Time, false),
|
|
%% return
|
|
%% return
|
|
{reply, ok, State};
|
|
{reply, ok, State};
|
|
|
|
|
|
{Name, Pid, _, _} ->
|
|
{Name, Pid, _, _} ->
|
|
|
|
+ % same pid, overwrite
|
|
{ok, Time} = register_on_node(Name, Pid, Meta),
|
|
{ok, Time} = register_on_node(Name, Pid, Meta),
|
|
%% multicast
|
|
%% multicast
|
|
- multicast_register(Name, Pid, Meta, Time),
|
|
|
|
|
|
+ multicast_register(Name, Pid, Meta, Time, false),
|
|
%% return
|
|
%% return
|
|
{reply, ok, State};
|
|
{reply, ok, State};
|
|
|
|
|
|
- _ ->
|
|
|
|
- {reply, {error, taken}, State}
|
|
|
|
|
|
+ {Name, _, _, _} ->
|
|
|
|
+ %% same name, different pid
|
|
|
|
+ case Force of
|
|
|
|
+ true ->
|
|
|
|
+ %% force register
|
|
|
|
+ {ok, Time} = register_on_node(Name, Pid, Meta),
|
|
|
|
+ %% multicast
|
|
|
|
+ multicast_register(Name, Pid, Meta, Time, true),
|
|
|
|
+ %% return
|
|
|
|
+ {reply, ok, State};
|
|
|
|
+
|
|
|
|
+ _ ->
|
|
|
|
+ {reply, {error, taken}, State}
|
|
|
|
+ end
|
|
end;
|
|
end;
|
|
_ ->
|
|
_ ->
|
|
{reply, {error, not_alive}, State}
|
|
{reply, {error, not_alive}, State}
|
|
@@ -266,7 +265,7 @@ handle_call(Request, From, State) ->
|
|
{noreply, #state{}, Timeout :: non_neg_integer()} |
|
|
{noreply, #state{}, Timeout :: non_neg_integer()} |
|
|
{stop, Reason :: any(), #state{}}.
|
|
{stop, Reason :: any(), #state{}}.
|
|
|
|
|
|
-handle_cast({sync_register, Name, RemotePid, RemoteMeta, RemoteTime}, State) ->
|
|
|
|
|
|
+handle_cast({sync_register, Name, RemotePid, RemoteMeta, RemoteTime, Force}, State) ->
|
|
%% check for conflicts
|
|
%% check for conflicts
|
|
case find_registry_tuple_by_name(Name) of
|
|
case find_registry_tuple_by_name(Name) of
|
|
undefined ->
|
|
undefined ->
|
|
@@ -277,7 +276,20 @@ handle_cast({sync_register, Name, RemotePid, RemoteMeta, RemoteTime}, State) ->
|
|
%% same process, no conflict, overwrite
|
|
%% same process, no conflict, overwrite
|
|
add_to_local_table(Name, RemotePid, RemoteMeta, RemoteTime, undefined);
|
|
add_to_local_table(Name, RemotePid, RemoteMeta, RemoteTime, undefined);
|
|
|
|
|
|
- {Name, TablePid, TableMeta, TableTime} ->
|
|
|
|
|
|
+ {Name, TablePid, _, _} when Force =:= true ->
|
|
|
|
+ case node(TablePid) =:= node() of
|
|
|
|
+ true ->
|
|
|
|
+ %% demonitor
|
|
|
|
+ MonitorRef = syn_registry:find_monitor_for_pid(TablePid),
|
|
|
|
+ catch erlang:demonitor(MonitorRef, [flush]);
|
|
|
|
+
|
|
|
|
+ _ ->
|
|
|
|
+ ok
|
|
|
|
+ end,
|
|
|
|
+ %% overwrite
|
|
|
|
+ add_to_local_table(Name, RemotePid, RemoteMeta, RemoteTime, undefined);
|
|
|
|
+
|
|
|
|
+ {Name, TablePid, TableMeta, TableTime} when Force =:= false ->
|
|
%% different pid, we have a conflict
|
|
%% different pid, we have a conflict
|
|
global:trans({{?MODULE, {inconsistent_name, Name}}, self()},
|
|
global:trans({{?MODULE, {inconsistent_name, Name}}, self()},
|
|
fun() ->
|
|
fun() ->
|
|
@@ -444,11 +456,11 @@ code_change(_OldVsn, State, _Extra) ->
|
|
%% ===================================================================
|
|
%% ===================================================================
|
|
%% Internal
|
|
%% Internal
|
|
%% ===================================================================
|
|
%% ===================================================================
|
|
--spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any(), Time :: integer()) -> pid().
|
|
|
|
-multicast_register(Name, Pid, Meta, Time) ->
|
|
|
|
|
|
+-spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any(), Time :: integer(), Force :: boolean()) -> pid().
|
|
|
|
+multicast_register(Name, Pid, Meta, Time, Force) ->
|
|
spawn_link(fun() ->
|
|
spawn_link(fun() ->
|
|
lists:foreach(fun(RemoteNode) ->
|
|
lists:foreach(fun(RemoteNode) ->
|
|
- sync_register(RemoteNode, Name, Pid, Meta, Time)
|
|
|
|
|
|
+ sync_register(RemoteNode, Name, Pid, Meta, Time, Force)
|
|
end, nodes())
|
|
end, nodes())
|
|
end).
|
|
end).
|
|
|
|
|
|
@@ -522,8 +534,8 @@ add_to_local_table(Name, Pid, Meta, Time, MonitorRef) ->
|
|
undefined ->
|
|
undefined ->
|
|
undefined;
|
|
undefined;
|
|
|
|
|
|
- {Name, OldPid, _, _} ->
|
|
|
|
- ets:delete(syn_registry_by_pid, {OldPid, Name})
|
|
|
|
|
|
+ {Name, PreviousPid, _, _} ->
|
|
|
|
+ ets:delete(syn_registry_by_pid, {PreviousPid, Name})
|
|
end,
|
|
end,
|
|
%% overwrite & add
|
|
%% overwrite & add
|
|
ets:insert(syn_registry_by_name, {Name, Pid, Meta, Time, MonitorRef, node(Pid)}),
|
|
ets:insert(syn_registry_by_name, {Name, Pid, Meta, Time, MonitorRef, node(Pid)}),
|