Browse Source

Add registration (no errors & no conflict resolution yet).

Roberto Ostinelli 3 years ago
parent
commit
090f35888b
3 changed files with 243 additions and 29 deletions
  1. 28 0
      src/syn.erl
  2. 212 26
      src/syn_registry.erl
  3. 3 3
      test/syn_registry_SUITE.erl

+ 28 - 0
src/syn.erl

@@ -28,6 +28,9 @@
 %% API
 -export([start/0, stop/0]).
 -export([get_node_scopes/0, add_node_to_scope/1, add_node_to_scopes/1]).
+-export([lookup/1]).
+-export([register/2, register/3, register/4]).
+-export([unregister/1, unregister/2]).
 
 %% ===================================================================
 %% API
@@ -55,3 +58,28 @@ add_node_to_scopes(Scopes) ->
     lists:foreach(fun(Scope) ->
         syn_scopes_sup:add_node_to_scope(Scope)
     end, Scopes).
+
+%% ----- \/ registry -------------------------------------------------
+-spec lookup(Name :: any()) -> {pid(), Meta :: any()} | undefined.
+lookup(Name) ->
+    syn_registry:lookup(Name).
+
+-spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
+register(Name, Pid) ->
+    syn_registry:register(Name, Pid).
+
+-spec register(NameOrScope :: any(), PidOrName :: any(), MetaOrPid :: any()) -> ok | {error, Reason :: any()}.
+register(NameOrScope, PidOrName, MetaOrPid) ->
+    syn_registry:register(NameOrScope, PidOrName, MetaOrPid).
+
+-spec register(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
+register(Scope, Name, Pid, Meta) ->
+    syn_registry:register(Scope, Name, Pid, Meta).
+
+-spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
+unregister(Name) ->
+    syn_registry:unregister(Name).
+
+-spec unregister(Scope :: atom(), Name :: any()) -> ok | {error, Reason :: any()}.
+unregister(Scope, Name) ->
+    syn_registry:unregister(Scope, Name).

+ 212 - 26
src/syn_registry.erl

@@ -29,10 +29,9 @@
 %% API
 -export([start_link/1]).
 -export([get_subcluster_nodes/1]).
-
-%% Cluster API
--export([announce/2]).
--export([sync/2]).
+-export([lookup/1]).
+-export([register/2, register/3, register/4]).
+-export([unregister/1, unregister/2]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -52,23 +51,58 @@
 %% ===================================================================
 -spec start_link(Scope :: atom()) -> {ok, pid()} | {error, any()}.
 start_link(Scope) when is_atom(Scope) ->
-    ProcessName = get_process_name_for(Scope),
+    ProcessName = get_process_name_for_scope(Scope),
     Args = [Scope, ProcessName],
     gen_server:start_link({local, ProcessName}, ?MODULE, Args, []).
 
 -spec get_subcluster_nodes(Scope :: atom()) -> [node()].
 get_subcluster_nodes(Scope) ->
-    ProcessName = get_process_name_for(Scope),
+    ProcessName = get_process_name_for_scope(Scope),
     gen_server:call(ProcessName, get_subcluster_nodes).
 
-%% ===================================================================
-%% Cluster API
-%% ===================================================================
-announce(RemoteNode, ProcessName) ->
-    gen_server:cast({ProcessName, RemoteNode}, {announce, self()}).
+-spec lookup(Name :: any()) -> {pid(), Meta :: any()} | undefined.
+lookup(Name) ->
+    lookup(default, Name).
+
+-spec lookup(Scope :: atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
+lookup(Scope, Name) ->
+    case find_registry_tuple_by_name(Scope, Name) of
+        undefined -> undefined;
+        {Name, Pid, Meta, _} -> {Pid, Meta}
+    end.
+
+-spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
+register(Name, Pid) ->
+    register(default, Name, Pid, undefined).
+
+-spec register(NameOrScope :: any(), PidOrName :: any(), MetaOrPid :: any()) -> ok | {error, Reason :: any()}.
+register(Name, Pid, Meta) when is_pid(Pid) ->
+    register(default, Name, Pid, Meta);
+
+register(Scope, Name, Pid) when is_pid(Pid) ->
+    register(Scope, Name, Pid, undefined).
+
+-spec register(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
+register(Scope, Name, Pid, Meta) ->
+    ProcessName = get_process_name_for_scope(Scope),
+    Node = node(Pid),
+    gen_server:call({ProcessName, Node}, {register_on_node, Name, Pid, Meta}).
+
+-spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
+unregister(Name) ->
+    unregister(default, Name).
 
-sync(RemoteNode, ProcessName) ->
-    gen_server:cast({ProcessName, RemoteNode}, {sync, self(), []}).
+-spec unregister(Scope :: atom(), Name :: any()) -> ok | {error, Reason :: any()}.
+unregister(Scope, Name) ->
+    % get process' node
+    case find_registry_tuple_by_name(Scope, Name) of
+        undefined ->
+            {error, undefined};
+        {Name, Pid, _, _} ->
+            ProcessName = get_process_name_for_scope(Scope),
+            Node = node(Pid),
+            gen_server:call({ProcessName, Node}, {unregister_on_node, Name})
+    end.
 
 %% ===================================================================
 %% Callbacks
@@ -108,6 +142,37 @@ handle_call(get_subcluster_nodes, _From, #state{
 } = State) ->
     {reply, Nodes, State};
 
+handle_call({register_on_node, Name, Pid, Meta}, _From, #state{
+    scope = Scope
+} = State) ->
+    MRef = case find_monitor_for_pid(Scope, Pid) of
+        undefined -> erlang:monitor(process, Pid);  %% process is not monitored yet, add
+        MRef0 -> MRef0
+    end,
+    %% add to local table
+    Time = erlang:system_time(),
+    add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
+    %% broadcast
+    broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
+    %% return
+    {reply, ok, State};
+
+handle_call({unregister_on_node, Name}, _From, #state{scope = Scope} = State) ->
+    case find_registry_entry_by_name(Scope, Name) of
+        {{Name, Pid}, _Meta, _Clock, _MRef, _Node} ->
+            %% demonitor if the process is not registered under other names
+            maybe_demonitor(Scope, Pid),
+            %% remove from table
+            remove_from_local_table(Scope, Name, Pid),
+            %% broadcast
+            broadcast({'3.0', sync_unregister, Name, Pid}, State),
+            %% return
+            {reply, ok, State};
+
+        _ ->
+            {reply, ok, State}
+    end;
+
 handle_call(Request, From, State) ->
     error_logger:warning_msg("SYN[~p] Received from ~p an unknown call message: ~p~n", [node(), Request, From]),
     {reply, undefined, State}.
@@ -119,15 +184,22 @@ handle_call(Request, From, State) ->
     {noreply, #state{}} |
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
-handle_cast({announce, RemoteScopePid}, #state{
+handle_cast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State) ->
+    add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
+    {noreply, State};
+
+handle_cast({'3.0', sync_unregister, Name, Pid}, #state{scope = Scope} = State) ->
+    remove_from_local_table(Scope, Name, Pid),
+    {noreply, State};
+
+handle_cast({'3.0', announce, RemoteScopePid}, #state{
     scope = Scope,
-    process_name = ProcessName,
     nodes = Nodes
 } = State) ->
     RemoteScopeNode = node(RemoteScopePid),
     error_logger:info_msg("SYN[~p] Received announce request from node ~p and scope ~p~n", [node(), RemoteScopeNode, Scope]),
     %% send data
-    sync(RemoteScopeNode, ProcessName),
+    cast_to_node(RemoteScopeNode, {'3.0', sync, self(), []}, State),
     %% is this a new node?
     case maps:is_key(RemoteScopeNode, Nodes) of
         true ->
@@ -137,11 +209,11 @@ handle_cast({announce, RemoteScopePid}, #state{
         false ->
             %% monitor & announce
             _MRef = monitor(process, RemoteScopePid),
-            announce(RemoteScopeNode, ProcessName),
+            cast_to_node(RemoteScopeNode, {'3.0', announce, self()}, State),
             {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
     end;
 
-handle_cast({sync, RemoteScopePid, _Data}, #state{
+handle_cast({'3.0', sync, RemoteScopePid, _Data}, #state{
     scope = Scope,
     nodes = Nodes
 } = State) ->
@@ -172,17 +244,16 @@ handle_cast(Msg, State) ->
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
 handle_info(timeout, #state{
-    scope = Scope,
-    process_name = ProcessName
+    scope = Scope
 } = State) ->
     error_logger:info_msg("SYN[~p] Announcing to all nodes in the cluster with scope: ~p~n", [node(), Scope]),
-    lists:foreach(fun(RemoteNode) -> announce(RemoteNode, ProcessName) end, nodes()),
+    broadcast_all({'3.0', announce, self()}, State),
     {noreply, State};
 
 handle_info({'DOWN', _MRef, process, Pid, _Reason}, #state{
     scope = Scope,
     nodes = Nodes
-} = State) ->
+} = State) when node(Pid) =/= node() ->
     PidNode = node(Pid),
     case maps:take(PidNode, Nodes) of
         {Pid, Nodes1} ->
@@ -195,13 +266,32 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, #state{
             {noreply, State}
     end;
 
+handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State) ->
+    case find_registry_tuples_by_pid(Scope, Pid) of
+        [] ->
+            error_logger:warning_msg(
+                "SYN[~p] Received a DOWN message from an unknown process ~p with reason: ~p~n",
+                [node(), Pid, Reason]
+            );
+
+        Entries ->
+            lists:foreach(fun({Name, _Pid, _Meta, _Time}) ->
+                %% remove from table
+                remove_from_local_table(Scope, Name, Pid),
+                %% broadcast
+                broadcast({'3.0', sync_unregister, Name, Pid}, State)
+            end, Entries)
+    end,
+    %% return
+    {noreply, State};
+
 handle_info({nodedown, _Node}, State) ->
     %% ignore & wait for monitor DOWN message
     {noreply, State};
 
-handle_info({nodeup, RemoteNode}, #state{process_name = ProcessName} = State) ->
+handle_info({nodeup, RemoteNode}, State) ->
     error_logger:info_msg("SYN[~p] Node ~p has joined the cluster, sending announce message~n", [node(), RemoteNode]),
-    announce(RemoteNode, ProcessName),
+    cast_to_node(RemoteNode, {'3.0', announce, self()}, State),
     {noreply, State};
 
 handle_info(Info, State) ->
@@ -226,8 +316,104 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec get_process_name_for(Scope :: atom()) -> atom().
-get_process_name_for(Scope) ->
+-spec get_process_name_for_scope(Scope :: atom()) -> atom().
+get_process_name_for_scope(Scope) ->
     ModuleBin = atom_to_binary(?MODULE),
     ScopeBin = atom_to_binary(Scope),
     binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).
+
+-spec broadcast(Message :: any(), #state{}) -> any().
+broadcast(Message, #state{process_name = ProcessName, nodes = Nodes}) ->
+    lists:foreach(fun(RemoteNode) -> gen_server:cast({ProcessName, RemoteNode}, Message) end, maps:keys(Nodes)).
+
+-spec broadcast_all(Message :: any(), #state{}) -> any().
+broadcast_all(Message, #state{process_name = ProcessName}) ->
+    lists:foreach(fun(RemoteNode) -> gen_server:cast({ProcessName, RemoteNode}, Message) end, nodes()).
+
+-spec cast_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
+cast_to_node(RemoteNode, Message, #state{
+    process_name = ProcessName
+}) ->
+    gen_server:cast({ProcessName, RemoteNode}, Message).
+
+-spec find_registry_tuple_by_name(Scope :: atom(), Name :: any()) ->
+    RegistryTuple :: syn_registry_tuple() | undefined.
+find_registry_tuple_by_name(Scope, Name) ->
+    TableName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
+    case ets:select(TableName, [{
+        {{Name, '$2'}, '$3', '$4', '_', '_'},
+        [],
+        [{{{const, Name}, '$2', '$3', '$4'}}]
+    }]) of
+        [RegistryTuple] -> RegistryTuple;
+        _ -> undefined
+    end.
+
+-spec find_registry_entry_by_name(Scope :: atom(), Name :: any()) -> Entry :: syn_registry_entry() | undefined.
+find_registry_entry_by_name(Scope, Name) ->
+    case ets:select(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
+        {{Name, '$2'}, '$3', '_', '_', '_'},
+        [],
+        ['$_']
+    }]) of
+        [RegistryEntry] -> RegistryEntry;
+        _ -> undefined
+    end.
+
+-spec find_monitor_for_pid(Scope :: atom(), Pid :: pid()) -> reference() | undefined.
+find_monitor_for_pid(Scope, Pid) when is_pid(Pid) ->
+    TableName = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
+    case ets:select(TableName, [{
+        {{Pid, '_'}, '_', '_', '$5', '_'},
+        [],
+        ['$5']
+    }], 1) of
+        {[MonitorRef], _} -> MonitorRef;
+        _ -> undefined
+    end.
+
+-spec find_registry_tuples_by_pid(Scope :: atom(), Pid :: pid()) -> RegistryTuples :: [syn_registry_tuple()].
+find_registry_tuples_by_pid(Scope, Pid) when is_pid(Pid) ->
+    ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
+        {{Pid, '$2'}, '$3', '$4', '_', '_'},
+        [],
+        [{{'$2', Pid, '$3', '$4'}}]
+    }]).
+
+-spec maybe_demonitor(Scope :: atom(), Pid :: pid()) -> ok.
+maybe_demonitor(Scope, Pid) ->
+    %% try to retrieve 2 items
+    %% if only 1 is returned it means that no other aliases exist for the Pid
+    case ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
+        {{Pid, '_'}, '_', '_', '$5', '_'},
+        [],
+        ['$5']
+    }], 2) of
+        {[MRef], _} when MRef =/= undefined ->
+            %% no other aliases, demonitor
+            erlang:demonitor(MRef, [flush]),
+            ok;
+        _ ->
+            ok
+    end.
+
+-spec add_to_local_table(
+    Scope :: atom(),
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Time :: integer(),
+    MonitorRef :: undefined | reference()
+) -> true.
+add_to_local_table(Scope, Name, Pid, Meta, Time, MonitorRef) ->
+    ets:insert(syn_backbone:get_table_name(syn_registry_by_name, Scope),
+        {{Name, Pid}, Meta, Time, MonitorRef, node(Pid)}
+    ),
+    ets:insert(syn_backbone:get_table_name(syn_registry_by_pid, Scope),
+        {{Pid, Name}, Meta, Time, MonitorRef, node(Pid)}
+    ).
+
+-spec remove_from_local_table(Scope :: atom(), Name :: any(), Pid :: pid()) -> true.
+remove_from_local_table(Scope, Name, Pid) ->
+    ets:delete(syn_backbone:get_table_name(syn_registry_by_name, Scope), {Name, Pid}),
+    ets:delete(syn_backbone:get_table_name(syn_registry_by_pid, Scope), {Pid, Name}).

+ 3 - 3
test/syn_registry_SUITE.erl

@@ -74,8 +74,8 @@ groups() ->
     [
         {three_nodes_process_registration, [shuffle], [
             three_nodes_discover_default_scope,
-            three_nodes_discover_custom_scope
-%%            three_nodes_register_unregister_and_monitor_default_scope
+            three_nodes_discover_custom_scope,
+            three_nodes_register_unregister_and_monitor_default_scope
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -376,7 +376,7 @@ three_nodes_register_unregister_and_monitor_default_scope(Config) ->
     syn_test_suite_helper:kill_process(Pid),
     syn_test_suite_helper:kill_process(PidRemote1),
     %% unregister process
-    syn:unregister(<<"my proc with meta">>),
+    ok = syn:unregister(<<"my proc with meta">>),
     timer:sleep(100),
 
     %% retrieve