Browse Source

Convert registry to ETS backend.

Roberto Ostinelli 5 years ago
parent
commit
f9aaea0b6d
5 changed files with 115 additions and 87 deletions
  1. 7 11
      src/syn.hrl
  2. 3 1
      src/syn_backbone.erl
  3. 103 73
      src/syn_registry.erl
  4. 1 1
      src/syn_sup.erl
  5. 1 1
      test/syn_registry_SUITE.erl

+ 7 - 11
src/syn.hrl

@@ -24,13 +24,6 @@
 %% THE SOFTWARE.
 %% ==========================================================================================================
 %% records
--record(syn_registry_table, {
-    name = undefined :: any(),
-    pid = undefined :: atom() | pid(),
-    node = undefined :: atom(),
-    meta = undefined :: any(),
-    monitor_ref = undefined :: atom() | reference()
-}).
 -record(syn_groups_table, {
     name = undefined :: any(),
     pid = undefined :: atom() | pid(),
@@ -40,6 +33,13 @@
 }).
 
 %% types
+-type syn_registry_entry() :: {
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    MonitorRef :: undefined | reference(),
+    Node :: node()
+}.
 -type syn_registry_tuple() :: {
     Name :: any(),
     Pid :: pid(),
@@ -50,7 +50,3 @@
     Pid :: pid(),
     Meta :: any()
 }.
--export_type([
-    syn_registry_tuple/0,
-    syn_group_tuple/0
-]).

+ 3 - 1
src/syn_backbone.erl

@@ -73,8 +73,10 @@ get_event_handler_module() ->
     {stop, Reason :: any()}.
 init([]) ->
     %% create ETS tables
+    %% objects have structure {Name, Pid, Meta, MonitorRef, Node}
     ets:new(syn_registry_by_name, [set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
-    ets:new(syn_registry_by_pid, [set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
+    %% objects have format {{Pid, Name}, Meta, MonitorRef, Node}
+    ets:new(syn_registry_by_pid, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
     %% init
     {ok, #state{}}.
 

+ 103 - 73
src/syn_registry.erl

@@ -69,36 +69,39 @@ register(Name, Pid, Meta) when is_pid(Pid) ->
 -spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
 unregister(Name) ->
     % get process' node
-    case find_process_entry_by_name(Name) of
+    case find_registry_tuple_by_name(Name) of
         undefined ->
             {error, undefined};
-        Entry ->
-            Node = node(Entry#syn_registry_table.pid),
+        {Name, Pid, _Meta} ->
+            Node = node(Pid),
             gen_server:call({?MODULE, Node}, {unregister_on_node, Name})
     end.
 
 -spec whereis(Name :: any()) -> pid() | undefined.
 whereis(Name) ->
-    case find_process_entry_by_name(Name) of
+    case find_registry_tuple_by_name(Name) of
         undefined -> undefined;
-        Entry -> Entry#syn_registry_table.pid
+        {Name, Pid, _Meta} -> Pid
     end.
 
 -spec whereis(Name :: any(), with_meta) -> {pid(), Meta :: any()} | undefined.
 whereis(Name, with_meta) ->
-    case find_process_entry_by_name(Name) of
+    case find_registry_tuple_by_name(Name) of
         undefined -> undefined;
-        Entry -> {Entry#syn_registry_table.pid, Entry#syn_registry_table.meta}
+        {Name, Pid, Meta} -> {Pid, Meta}
     end.
 
 -spec count() -> non_neg_integer().
 count() ->
-    mnesia:table_info(syn_registry_table, size).
+    ets:info(syn_registry_by_name, size).
 
 -spec count(Node :: node()) -> non_neg_integer().
 count(Node) ->
-    RegistryTuples = get_registry_tuples_for_node(Node),
-    length(RegistryTuples).
+    ets:select_count(syn_registry_by_name, [{
+        {'_', '_', '_', '_', '$5'},
+        [{'=:=', '$5', Node}],
+        [true]
+    }]).
 
 -spec sync_register(RemoteNode :: node(), Name :: any(), RemotePid :: pid(), RemoteMeta :: any()) -> ok.
 sync_register(RemoteNode, Name, RemotePid, RemoteMeta) ->
@@ -155,7 +158,7 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
     case is_process_alive(Pid) of
         true ->
             %% check if name available
-            case find_process_entry_by_name(Name) of
+            case find_registry_tuple_by_name(Name) of
                 undefined ->
                     register_on_node(Name, Pid, Meta),
                     %% multicast
@@ -163,7 +166,7 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
                     %% return
                     {reply, ok, State};
 
-                Entry when Entry#syn_registry_table.pid == Pid ->
+                {Name, Pid, _OldMeta} ->
                     register_on_node(Name, Pid, Meta),
                     %% multicast
                     multicast_register(Name, Pid, Meta),
@@ -204,22 +207,19 @@ handle_cast({sync_register, Name, RemotePid, RemoteMeta}, State) ->
     %% get remote node
     RemoteNode = node(RemotePid),
     %% check for conflicts
-    case find_process_entry_by_name(Name) of
+    case find_registry_tuple_by_name(Name) of
         undefined ->
             %% no conflict
             add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
 
-        Entry when Entry#syn_registry_table.pid =:= RemotePid ->
+        {Name, RemotePid, _Meta} ->
             %% same process, no conflict, overwrite
             add_to_local_table(Name, RemotePid, RemoteMeta, undefined);
 
-        Entry ->
+        {Name, TablePid, TableMeta} ->
             %% different pid, we have a conflict
             global:trans({{?MODULE, {inconsistent_name, Name}}, self()},
                 fun() ->
-                    TablePid = Entry#syn_registry_table.pid,
-                    TableMeta = Entry#syn_registry_table.meta,
-
                     error_logger:warning_msg(
                         "Syn(~p): REGISTRY INCONSISTENCY (name: ~p for ~p and ~p) ----> Initiating for remote node ~p~n",
                         [node(), Name, {TablePid, TableMeta}, {RemotePid, RemoteMeta}, RemoteNode]
@@ -260,17 +260,13 @@ handle_cast(Msg, State) ->
     {stop, Reason :: any(), #state{}}.
 
 handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
-    case find_processes_entry_by_pid(Pid) of
+    case find_registry_tuples_by_pid(Pid) of
         [] ->
             %% handle
             handle_process_down(undefined, Pid, undefined, Reason, State);
 
         Entries ->
-            lists:foreach(fun(Entry) ->
-                %% get process info
-                Name = Entry#syn_registry_table.name,
-                Pid = Entry#syn_registry_table.pid,
-                Meta = Entry#syn_registry_table.meta,
+            lists:foreach(fun({Name, _Pid, Meta}) ->
                 %% handle
                 handle_process_down(Name, Pid, Meta, Reason, State),
                 %% remove from table
@@ -340,37 +336,37 @@ multicast_unregister(Name) ->
 
 -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
 register_on_node(Name, Pid, Meta) ->
-    MonitorRef = case find_processes_entry_by_pid(Pid) of
-        [] ->
+    MonitorRef = case find_monitor_for_pid(Pid) of
+        undefined ->
             %% process is not monitored yet, add
             erlang:monitor(process, Pid);
-        [Entry | _] ->
-            Entry#syn_registry_table.monitor_ref
+        MRef ->
+            MRef
     end,
     %% add to table
     add_to_local_table(Name, Pid, Meta, MonitorRef).
 
 -spec unregister_on_node(Name :: any()) -> ok | {error, Reason :: any()}.
 unregister_on_node(Name) ->
-    case find_process_entry_by_name(Name) of
+    case find_registry_entry_by_name(Name) of
         undefined ->
             {error, undefined};
 
-        Entry when Entry#syn_registry_table.monitor_ref =/= undefined ->
+        {Name, _Pid, _Meta, MonitorRef, Node} when MonitorRef =/= undefined ->
             %% demonitor
-            erlang:demonitor(Entry#syn_registry_table.monitor_ref, [flush]),
+            erlang:demonitor(MonitorRef, [flush]),
             %% remove from table
             remove_from_local_table(Name);
 
-        Entry when Entry#syn_registry_table.node =:= node() ->
+        {Name, _Pid, _Meta, _MonitorRef, Node} = RegistryTuple when Node =:= node() ->
             error_logger:error_msg(
                 "Syn(~p): INTERNAL ERROR | Registry entry ~p has no monitor but it's running on node~n",
-                [node(), Entry]
+                [node(), RegistryTuple]
             ),
             %% remove from table
             remove_from_local_table(Name);
 
-        _Entry ->
+        _ ->
             %% race condition: un-registration request but entry in table is not a local pid (has no monitor)
             %% ignore it, sync messages will take care of it
             ok
@@ -378,37 +374,78 @@ unregister_on_node(Name) ->
 
 -spec add_to_local_table(Name :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
 add_to_local_table(Name, Pid, Meta, MonitorRef) ->
-    mnesia:dirty_write(#syn_registry_table{
-        name = Name,
-        pid = Pid,
-        node = node(Pid),
-        meta = Meta,
-        monitor_ref = MonitorRef
-    }).
+    %% remove entry if previous exists
+    case find_registry_tuple_by_name(Name) of
+        undefined ->
+            ok;
+
+        {Name, OldPid, _OldMeta} ->
+            ets:delete(syn_registry_by_pid, {OldPid, Name})
+    end,
+    %% overwrite & add
+    ets:insert(syn_registry_by_name, {Name, Pid, Meta, MonitorRef, node(Pid)}),
+    ets:insert(syn_registry_by_pid, {{Pid, Name}, Meta, MonitorRef, node(Pid)}),
+    ok.
 
 -spec remove_from_local_table(Name :: any()) -> ok.
 remove_from_local_table(Name) ->
-    mnesia:dirty_delete(syn_registry_table, Name).
+    case ets:take(syn_registry_by_name, Name) of
+        [{Name, Pid, _, _, _}] ->
+            ets:match_delete(syn_registry_by_pid, {{Pid, Name}, '_', '_', '_'}),
+            ok;
+
+        _ ->
+            ok
+    end.
 
--spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: [#syn_registry_table{}].
-find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
-    mnesia:dirty_index_read(syn_registry_table, Pid, #syn_registry_table.pid).
+-spec find_registry_tuple_by_name(Name :: any()) -> RegistryTuple :: syn_registry_tuple() | undefined.
+find_registry_tuple_by_name(Name) ->
+    case ets:select(syn_registry_by_name, [{
+        {'$1', '$2', '$3', '_', '_'},
+        [{'=:=', '$1', Name}],
+        [{{'$1', '$2', '$3'}}]
+    }]) of
+        [RegistryTuple] -> RegistryTuple;
+        _ -> undefined
+    end.
+
+-spec find_registry_entry_by_name(Name :: any()) -> Object :: syn_registry_tuple() | undefined.
+find_registry_entry_by_name(Name) ->
+    case ets:select(syn_registry_by_name, [{
+        {'$1', '$2', '$3', '_', '_'},
+        [{'=:=', '$1', Name}],
+        ['$_']
+    }]) of
+        [RegistryTuple] -> RegistryTuple;
+        _ -> undefined
+    end.
 
--spec find_process_entry_by_name(Name :: any()) -> Entry :: #syn_registry_table{} | undefined.
-find_process_entry_by_name(Name) ->
-    case mnesia:dirty_read(syn_registry_table, Name) of
-        [Entry] -> Entry;
+-spec find_monitor_for_pid(Pid :: pid()) -> reference() | undefined.
+find_monitor_for_pid(Pid) when is_pid(Pid) ->
+    case ets:select(syn_registry_by_pid, [{
+        {{'$1', '_'}, '_', '$4', '_'},
+        [{'=:=', '$1', Pid}],
+        ['$4']
+    }], 1) of
+        {[MonitorRef], _} -> MonitorRef;
         _ -> undefined
     end.
 
+-spec find_registry_tuples_by_pid(Pid :: pid()) -> Entries :: [syn_registry_tuple()].
+find_registry_tuples_by_pid(Pid) when is_pid(Pid) ->
+    ets:select(syn_registry_by_pid, [{
+        {{'$1', '$2'}, '$3', '_', '_'},
+        [{'=:=', '$1', Pid}],
+        [{{'$2', '$1', '$3'}}]
+    }]).
+
 -spec get_registry_tuples_for_node(Node :: node()) -> [syn_registry_tuple()].
 get_registry_tuples_for_node(Node) ->
-    %% build match specs
-    MatchHead = #syn_registry_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
-    Guard = {'=:=', '$3', Node},
-    RegistryTupleFormat = {{'$1', '$2', '$4'}},
-    %% select
-    mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [RegistryTupleFormat]}]).
+    ets:select(syn_registry_by_name, [{
+        {'$1', '$2', '$3', '_', '$5'},
+        [{'=:=', '$5', Node}],
+        [{{'$1', '$2', '$3'}}]
+    }]).
 
 -spec handle_process_down(Name :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
 handle_process_down(Name, Pid, Meta, Reason, #state{
@@ -419,6 +456,7 @@ handle_process_down(Name, Pid, Meta, Reason, #state{
             case Reason of
                 {syn_resolve_kill, KillName, KillMeta} ->
                     syn_event_handler:do_on_process_exit(KillName, Pid, KillMeta, syn_resolve_kill, CustomEventHandler);
+
                 _ ->
                     error_logger:warning_msg(
                         "Syn(~p): Received a DOWN message from an unregistered process ~p with reason: ~p~n",
@@ -440,17 +478,17 @@ registry_automerge(RemoteNode, State) ->
                 {badrpc, _} ->
                     error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Syn not ready on remote node ~p, postponing~n", [node(), RemoteNode]);
 
-                RegistryTuples ->
+                Entries ->
                     error_logger:info_msg(
                         "Syn(~p): Received ~p registry tuple(s) from remote node ~p~n",
-                        [node(), length(RegistryTuples), RemoteNode]
+                        [node(), length(Entries), RemoteNode]
                     ),
                     %% ensure that registry doesn't have any joining node's entries (here again for race conditions)
                     raw_purge_registry_entries_for_remote_node(RemoteNode),
                     %% loop
                     F = fun({Name, RemotePid, RemoteMeta}) ->
                         %% check if same name is registered
-                        case find_process_entry_by_name(Name) of
+                        case find_registry_tuple_by_name(Name) of
                             undefined ->
                                 %% no conflict
                                 case rpc:call(node(RemotePid), erlang, is_process_alive, [RemotePid]) of
@@ -460,10 +498,7 @@ registry_automerge(RemoteNode, State) ->
                                         ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [Name])
                                 end;
 
-                            Entry ->
-                                LocalPid = Entry#syn_registry_table.pid,
-                                LocalMeta = Entry#syn_registry_table.meta,
-
+                            {Name, LocalPid, LocalMeta} ->
                                 error_logger:warning_msg(
                                     "Syn(~p): Conflicting name in auto merge for: ~p, processes are ~p, ~p~n",
                                     [node(), Name, {LocalPid, LocalMeta}, {RemotePid, RemoteMeta}]
@@ -477,7 +512,7 @@ registry_automerge(RemoteNode, State) ->
                         end
                     end,
                     %% add to table
-                    lists:foreach(F, RegistryTuples),
+                    lists:foreach(F, Entries),
                     %% exit
                     error_logger:info_msg("Syn(~p): REGISTRY AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
             end
@@ -574,18 +609,12 @@ syn_kill(PidToKill, Name, Meta) -> exit(PidToKill, {syn_resolve_kill, Name, Meta
 -spec raw_purge_registry_entries_for_remote_node(Node :: atom()) -> ok.
 raw_purge_registry_entries_for_remote_node(Node) when Node =/= node() ->
     %% NB: no demonitoring is done, this is why it's raw
-    %% build match specs
-    MatchHead = #syn_registry_table{name = '$1', node = '$2', _ = '_'},
-    Guard = {'=:=', '$2', Node},
-    IdFormat = '$1',
-    %% delete
-    NodePids = mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [IdFormat]}]),
-    DelF = fun(Id) -> mnesia:dirty_delete({syn_registry_table, Id}) end,
-    lists:foreach(DelF, NodePids).
+    ets:match_delete(syn_registry_by_name, {'_', '_', '_', '_', Node}),
+    ets:match_delete(syn_registry_by_pid, {{'_', '_'}, '_', '_', Node}).
 
 -spec rebuild_monitors() -> ok.
 rebuild_monitors() ->
-    RegistryTuples = get_registry_tuples_for_node(node()),
+    Entries = get_registry_tuples_for_node(node()),
     lists:foreach(fun({Name, Pid, Meta}) ->
         case is_process_alive(Pid) of
             true ->
@@ -595,4 +624,5 @@ rebuild_monitors() ->
             _ ->
                 remove_from_local_table(Name)
         end
-    end, RegistryTuples).
+    end, Entries).
+

+ 1 - 1
src/syn_sup.erl

@@ -51,7 +51,7 @@ start_link() ->
 init([]) ->
     Children = [
         ?CHILD(syn_backbone, worker),
-        ?CHILD(syn_groups, worker),
+%%        ?CHILD(syn_groups, worker),
         ?CHILD(syn_registry, worker)
     ],
     {ok, {{one_for_one, 10, 10}, Children}}.

+ 1 - 1
test/syn_registry_SUITE.erl

@@ -90,7 +90,7 @@ all() ->
 %% -------------------------------------------------------------------
 %% Function: groups() -> [Group]
 %% Group = {GroupName,Properties,GroupsAndTestCases}
-%% GroupName = atom()
+%% GroupName =  atom()
 %% Properties = [parallel | sequence | Shuffle | {RepeatType,N}]
 %% GroupsAndTestCases = [Group | {group,GroupName} | TestCase]
 %% TestCase = atom()