Просмотр исходного кода

Convert groups to ETS backend.

Roberto Ostinelli 5 лет назад
Родитель
Сommit
d22e14cf9a
6 измененных файлов с 151 добавлено и 126 удалено
  1. 8 10
      src/syn.hrl
  2. 8 2
      src/syn_backbone.erl
  3. 114 107
      src/syn_groups.erl
  4. 20 5
      src/syn_registry.erl
  5. 1 1
      src/syn_sup.erl
  6. 0 1
      test/syn_groups_SUITE.erl

+ 8 - 10
src/syn.hrl

@@ -23,15 +23,6 @@
 %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 %% THE SOFTWARE.
 %% ==========================================================================================================
-%% records
--record(syn_groups_table, {
-    name = undefined :: any(),
-    pid = undefined :: atom() | pid(),
-    node = undefined :: atom(),
-    meta = undefined :: any(),
-    monitor_ref = undefined :: atom() | reference()
-}).
-
 %% types
 -type syn_registry_entry() :: {
     Name :: any(),
@@ -45,7 +36,14 @@
     Pid :: pid(),
     Meta :: any()
 }.
--type syn_group_tuple() :: {
+-type syn_groups_entry() :: {
+    GroupName :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    MonitorRef :: undefined | reference(),
+    Node :: node()
+}.
+-type syn_groups_tuple() :: {
     GroupName :: any(),
     Pid :: pid(),
     Meta :: any()

+ 8 - 2
src/syn_backbone.erl

@@ -73,10 +73,14 @@ get_event_handler_module() ->
     {stop, Reason :: any()}.
 init([]) ->
     %% create ETS tables
-    %% objects have structure {Name, Pid, Meta, MonitorRef, Node}
+    %% entries have structure {Name, Pid, Meta, MonitorRef, Node}
     ets:new(syn_registry_by_name, [set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
-    %% objects have format {{Pid, Name}, Meta, MonitorRef, Node}
+    %% entries have format {{Pid, Name}, Meta, MonitorRef, Node}
     ets:new(syn_registry_by_pid, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
+    %% entries have format {{GroupName, Pid}, Meta, MonitorRef, Node}
+    ets:new(syn_groups_by_name, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
+    %% entries have format {{Pid, GroupName}, Meta, MonitorRef, Node}
+    ets:new(syn_groups_by_pid, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
     %% init
     {ok, #state{}}.
 
@@ -128,6 +132,8 @@ terminate(Reason, _State) ->
     %% delete ETS tables
     ets:delete(syn_registry_by_name),
     ets:delete(syn_registry_by_pid),
+    ets:delete(syn_groups_by_name),
+    ets:delete(syn_groups_by_pid),
     %% return
     terminated.
 

+ 114 - 107
src/syn_groups.erl

@@ -79,7 +79,7 @@ join(GroupName, Pid, Meta) when is_pid(Pid) ->
 
 -spec leave(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
 leave(GroupName, Pid) ->
-    case find_process_entry_by_name_and_pid(GroupName, Pid) of
+    case find_groups_entry_by_name_and_pid(GroupName, Pid) of
         undefined ->
             {error, not_in_group};
         _ ->
@@ -89,59 +89,57 @@ leave(GroupName, Pid) ->
 
 -spec get_members(Name :: any()) -> [pid()].
 get_members(GroupName) ->
-    Entries = mnesia:dirty_read(syn_groups_table, GroupName),
-    Pids = [Entry#syn_groups_table.pid || Entry <- Entries],
-    lists:sort(Pids).
+    lists:sort(ets:select(syn_groups_by_name, [{
+        {{GroupName, '$2'}, '_', '_', '_'},
+        [],
+        ['$2']
+    }])).
 
 -spec get_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
 get_members(GroupName, with_meta) ->
-    Entries = mnesia:dirty_read(syn_groups_table, GroupName),
-    Pids = [{Entry#syn_groups_table.pid, Entry#syn_groups_table.meta} || Entry <- Entries],
-    lists:sort(Pids).
+    Guard = case is_tuple(GroupName) of
+        true -> {'=:=', '$1', {GroupName}};
+        _ -> {'=:=', '$1', GroupName}
+    end,
+    lists:sort(ets:select(syn_groups_by_name, [{
+        {{'$1', '$2'}, '$3', '_', '_'},
+        [Guard],
+        [{{'$2', '$3'}}]
+    }])).
 
 -spec member(Pid :: pid(), GroupName :: any()) -> boolean().
 member(Pid, GroupName) ->
-    case find_process_entry_by_name_and_pid(GroupName, Pid) of
+    case find_groups_entry_by_name_and_pid(GroupName, Pid) of
         undefined -> false;
         _ -> true
     end.
 
 -spec get_local_members(Name :: any()) -> [pid()].
 get_local_members(GroupName) ->
-    %% build name guard
-    NameGuard = case is_tuple(GroupName) of
-        true -> {'==', '$1', {GroupName}};
-        _ -> {'=:=', '$1', GroupName}
-    end,
-    %% build match specs
-    MatchHead = #syn_groups_table{name = '$1', node = '$2', pid = '$3', _ = '_'},
-    Guards = [NameGuard, {'=:=', '$2', node()}],
-    Result = '$3',
-    %% select
-    Pids = mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]),
-    lists:sort(Pids).
+    Node = node(),
+    lists:sort(ets:select(syn_groups_by_name, [{
+        {{GroupName, '$2'}, '_', '_', Node},
+        [],
+        ['$2']
+    }])).
 
 -spec get_local_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
 get_local_members(GroupName, with_meta) ->
-    %% build name guard
-    NameGuard = case is_tuple(GroupName) of
-        true -> {'==', '$1', {GroupName}};
-        _ -> {'=:=', '$1', GroupName}
-    end,
-    %% build match specs
-    MatchHead = #syn_groups_table{name = '$1', node = '$2', pid = '$3', meta = '$4', _ = '_'},
-    Guards = [NameGuard, {'=:=', '$2', node()}],
-    Result = {{'$3', '$4'}},
-    %% select
-    PidsWithMeta = mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]),
-    lists:keysort(1, PidsWithMeta).
+    Node = node(),
+    lists:sort(ets:select(syn_groups_by_name, [{
+        {{GroupName, '$2'}, '$3', '_', Node},
+        [],
+        [{{'$2', '$3'}}]
+    }])).
 
 -spec local_member(Pid :: pid(), GroupName :: any()) -> boolean().
 local_member(Pid, GroupName) ->
-    case find_process_entry_by_name_and_pid(GroupName, Pid) of
-        undefined -> false;
-        Entry when Entry#syn_groups_table.node =:= node() -> true;
-        _ -> false
+    case find_groups_entry_by_name_and_pid(GroupName, Pid) of
+        {GroupName, Pid, _Meta, _MonitorRef, Node} when Node =:= node() ->
+            true;
+
+        _ ->
+            false
     end.
 
 -spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
@@ -189,7 +187,7 @@ sync_join(RemoteNode, GroupName, Pid, Meta) ->
 sync_leave(RemoteNode, GroupName, Pid) ->
     gen_server:cast({?MODULE, RemoteNode}, {sync_leave, GroupName, Pid}).
 
--spec sync_get_local_group_tuples(FromNode :: node()) -> list(syn_group_tuple()).
+-spec sync_get_local_group_tuples(FromNode :: node()) -> list(syn_groups_tuple()).
 sync_get_local_group_tuples(FromNode) ->
     error_logger:info_msg("Syn(~p): Received request of local group tuples from remote node: ~p~n", [node(), FromNode]),
     get_group_tuples_for_node(node()).
@@ -293,23 +291,20 @@ handle_cast(Msg, State) ->
     {stop, Reason :: any(), #state{}}.
 
 handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
-    case find_processes_entry_by_pid(Pid) of
+    case find_groups_tuples_by_pid(Pid) of
         [] ->
             %% handle
             handle_process_down(undefined, Pid, undefined, Reason, State);
 
-        Entries ->
-            lists:foreach(fun(Entry) ->
-                %% get process info
-                GroupName = Entry#syn_groups_table.name,
-                Meta = Entry#syn_groups_table.meta,
+        GroupTuples ->
+            lists:foreach(fun({GroupName, _Pid, Meta}) ->
                 %% remove from table
-                remove_from_local_table(Entry),
+                remove_from_local_table(GroupName, Pid),
                 %% handle
                 handle_process_down(GroupName, Pid, Meta, Reason, State),
                 %% multicast
                 multicast_leave(GroupName, Pid)
-            end, Entries)
+            end, GroupTuples)
     end,
     %% return
     {noreply, State};
@@ -326,7 +321,13 @@ handle_info({nodedown, RemoteNode}, State) ->
     {noreply, State};
 
 handle_info(sync_full_cluster, State) ->
-    error_logger:info_msg("Syn(~p): Initiating full cluster groups sync for nodes: ~p~n", [node(), nodes()]),
+    case length(nodes()) > 0 of
+        true ->
+            error_logger:info_msg("Syn(~p): Initiating full cluster groups sync for nodes: ~p~n", [node(), nodes()]);
+
+        _ ->
+            ok
+    end,
     lists:foreach(fun(RemoteNode) ->
         groups_automerge(RemoteNode)
     end, nodes()),
@@ -372,43 +373,45 @@ multicast_leave(GroupName, Pid) ->
 
 -spec join_on_node(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
 join_on_node(GroupName, Pid, Meta) ->
-    MonitorRef = case find_processes_entry_by_pid(Pid) of
-        [] ->
+    MonitorRef = case find_monitor_for_pid(Pid) of
+        undefined ->
             %% process is not monitored yet, add
             erlang:monitor(process, Pid);
-        [Entry | _] ->
-            Entry#syn_groups_table.monitor_ref
+
+        MRef ->
+            MRef
     end,
     %% add to table
     add_to_local_table(GroupName, Pid, Meta, MonitorRef).
 
 -spec leave_on_node(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
 leave_on_node(GroupName, Pid) ->
-    case find_process_entry_by_name_and_pid(GroupName, Pid) of
+    case find_groups_entry_by_name_and_pid(GroupName, Pid) of
         undefined ->
             {error, not_in_group};
 
-        Entry when Entry#syn_groups_table.monitor_ref =/= undefined ->
+        {GroupName, Pid, _Meta, MonitorRef, _Node} when MonitorRef =/= undefined ->
             %% is this the last group process is in?
-            case find_processes_entry_by_pid(Pid) of
-                [Entry] ->
-                    %% demonitor
-                    erlang:demonitor(Entry#syn_groups_table.monitor_ref, [flush]);
+            case find_groups_tuples_by_pid(Pid) of
+                [_GroupTuple] ->
+                    %% only one left (the one we're about to delete), demonitor
+                    erlang:demonitor(MonitorRef, [flush]);
+
                 _ ->
                     ok
             end,
             %% remove from table
-            remove_from_local_table(Entry);
+            remove_from_local_table(GroupName, Pid);
 
-        Entry when Entry#syn_groups_table.node =:= node() ->
+        {GroupName, Pid, _Meta, _MonitorRef, Node} = GroupsEntry when Node =:= node() ->
             error_logger:error_msg(
                 "Syn(~p): INTERNAL ERROR | Group entry ~p has no monitor but it's running on node~n",
-                [node(), Entry]
+                [node(), GroupsEntry]
             ),
             %% remove from table
-            remove_from_local_table(Entry);
+            remove_from_local_table(GroupName, Pid);
 
-        _Entry ->
+        _ ->
             %% race condition: leave request but entry in table is not a local pid (has no monitor)
             %% ignore it, sync messages will take care of it
             ok
@@ -416,55 +419,63 @@ leave_on_node(GroupName, Pid) ->
 
 -spec add_to_local_table(GroupName :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
 add_to_local_table(GroupName, Pid, Meta, MonitorRef) ->
-    %% clean if any
-    remove_from_local_table(GroupName, Pid),
-    %% write
-    mnesia:dirty_write(#syn_groups_table{
-        name = GroupName,
-        pid = Pid,
-        node = node(Pid),
-        meta = Meta,
-        monitor_ref = MonitorRef
-    }).
+    ets:insert(syn_groups_by_name, {{GroupName, Pid}, Meta, MonitorRef, node(Pid)}),
+    ets:insert(syn_groups_by_pid, {{Pid, GroupName}, Meta, MonitorRef, node(Pid)}),
+    ok.
 
 -spec remove_from_local_table(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
 remove_from_local_table(GroupName, Pid) ->
-    case find_process_entry_by_name_and_pid(GroupName, Pid) of
+    case ets:lookup(syn_groups_by_name, {GroupName, Pid}) of
         undefined ->
             {error, not_in_group};
-        Entry ->
-            %% remove from table
-            remove_from_local_table(Entry)
+
+        _ ->
+            ets:match_delete(syn_groups_by_name, {{GroupName, Pid}, '_', '_', '_'}),
+            ets:match_delete(syn_groups_by_name, {{Pid, GroupName}, '_', '_', '_'}),
+            ok
+    end.
+
+-spec find_groups_tuples_by_pid(Pid :: pid()) -> GroupTuples :: list(syn_groups_tuple()).
+find_groups_tuples_by_pid(Pid) when is_pid(Pid) ->
+    ets:select(syn_groups_by_pid, [{
+        {{'$1', '$2'}, '$3', '_', '_'},
+        [{'=:=', '$1', Pid}],
+        [{{'$2', '$1', '$3'}}]
+    }]).
+
+-spec find_groups_entry_by_name_and_pid(GroupName :: any(), Pid :: pid()) -> Entry :: syn_groups_entry() | undefined.
+find_groups_entry_by_name_and_pid(GroupName, Pid) ->
+    NameGuard = case is_tuple(GroupName) of
+        true -> {'=:=', '$1', {GroupName}};
+        _ -> {'=:=', '$1', GroupName}
+    end,
+    case ets:select(syn_groups_by_name, [{
+        {{'$1', '$2'}, '$3', '$4', '$5'},
+        [{'andalso', NameGuard, {'=:=', '$2', Pid}}],
+        [{{'$1', '$2', '$3', '$4', '$5'}}]
+    }]) of
+        [RegistryTuple] -> RegistryTuple;
+        _ -> undefined
     end.
 
--spec remove_from_local_table(Entry :: #syn_groups_table{}) -> ok.
-remove_from_local_table(Entry) ->
-    mnesia:dirty_delete_object(syn_groups_table, Entry).
-
--spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: list(#syn_groups_table{}).
-find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
-    mnesia:dirty_index_read(syn_groups_table, Pid, #syn_groups_table.pid).
-
--spec find_process_entry_by_name_and_pid(GroupName :: any(), Pid :: pid()) -> Entry :: #syn_groups_table{} | undefined.
-find_process_entry_by_name_and_pid(GroupName, Pid) ->
-    %% build match specs
-    MatchHead = #syn_groups_table{name = GroupName, pid = Pid, _ = '_'},
-    Guards = [],
-    Result = '$_',
-    %% select
-    case mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]) of
-        [Entry] -> Entry;
-        [] -> undefined
+-spec find_monitor_for_pid(Pid :: pid()) -> reference() | undefined.
+find_monitor_for_pid(Pid) when is_pid(Pid) ->
+    case ets:select(syn_groups_by_pid, [{
+        {{'$1', '_'}, '_', '$4', '_'},
+        [{'=:=', '$1', Pid}],
+        ['$4']
+    }], 1) of
+        {[MonitorRef], _} -> MonitorRef;
+        _ -> undefined
     end.
 
--spec get_group_tuples_for_node(Node :: node()) -> [syn_group_tuple()].
+-spec get_group_tuples_for_node(Node :: node()) -> [syn_groups_tuple()].
 get_group_tuples_for_node(Node) ->
-    %% build match specs
-    MatchHead = #syn_groups_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
-    Guard = {'=:=', '$3', Node},
-    GroupTupleFormat = {{'$1', '$2', '$4'}},
-    %% select
-    mnesia:dirty_select(syn_groups_table, [{MatchHead, [Guard], [GroupTupleFormat]}]).
+    ets:select(syn_groups_by_name, [{
+        {{'$1', '$2'}, '$3', '_', '$5'},
+        [{'=:=', '$5', Node}],
+        [{{'$1', '$2', '$3'}}]
+    }]).
 
 -spec handle_process_down(GroupName :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
 handle_process_down(GroupName, Pid, Meta, Reason, #state{
@@ -503,7 +514,7 @@ groups_automerge(RemoteNode) ->
                             true ->
                                 add_to_local_table(GroupName, RemotePid, RemoteMeta, undefined);
                             _ ->
-                                ok = rpc:call(RemoteNode, syn_registry, remove_from_local_table, [GroupName, RemotePid])
+                                ok = rpc:call(RemoteNode, syn_groups, remove_from_local_table, [GroupName, RemotePid])
                         end
                     end, GroupTuples),
                     %% exit
@@ -515,12 +526,8 @@ groups_automerge(RemoteNode) ->
 -spec raw_purge_group_entries_for_node(Node :: atom()) -> ok.
 raw_purge_group_entries_for_node(Node) ->
     %% NB: no demonitoring is done, this is why it's raw
-    %% build match specs
-    Pattern = #syn_groups_table{node = Node, _ = '_'},
-    ObjectsToDelete = mnesia:dirty_match_object(syn_groups_table, Pattern),
-    %% delete
-    DelF = fun(Record) -> mnesia:dirty_delete_object(syn_groups_table, Record) end,
-    lists:foreach(DelF, ObjectsToDelete).
+    ets:match_delete(syn_groups_by_name, {{'_', '_'}, '_', '_', Node}),
+    ets:match_delete(syn_groups_by_pid, {{'_', '_'}, '_', '_', Node}).
 
 -spec multi_call_and_receive(
     CollectorPid :: pid(),

+ 20 - 5
src/syn_registry.erl

@@ -290,7 +290,13 @@ handle_info({nodedown, RemoteNode}, State) ->
     {noreply, State};
 
 handle_info(sync_full_cluster, State) ->
-    error_logger:info_msg("Syn(~p): Initiating full cluster registry sync for nodes: ~p~n", [node(), nodes()]),
+    case length(nodes()) > 0 of
+        true ->
+            error_logger:info_msg("Syn(~p): Initiating full cluster registry sync for nodes: ~p~n", [node(), nodes()]);
+
+        _ ->
+            ok
+    end,
     lists:foreach(fun(RemoteNode) ->
         registry_automerge(RemoteNode, State)
     end, nodes()),
@@ -340,6 +346,7 @@ register_on_node(Name, Pid, Meta) ->
         undefined ->
             %% process is not monitored yet, add
             erlang:monitor(process, Pid);
+
         MRef ->
             MRef
     end,
@@ -352,7 +359,7 @@ unregister_on_node(Name) ->
         undefined ->
             {error, undefined};
 
-        {Name, _Pid, _Meta, MonitorRef, Node} when MonitorRef =/= undefined ->
+        {Name, _Pid, _Meta, MonitorRef, _Node} when MonitorRef =/= undefined ->
             %% demonitor
             erlang:demonitor(MonitorRef, [flush]),
             %% remove from table
@@ -400,20 +407,28 @@ remove_from_local_table(Name) ->
 
 -spec find_registry_tuple_by_name(Name :: any()) -> RegistryTuple :: syn_registry_tuple() | undefined.
 find_registry_tuple_by_name(Name) ->
+    Guard = case is_tuple(Name) of
+        true -> {'=:=', '$1', {Name}};
+        _ -> {'=:=', '$1', Name}
+    end,
     case ets:select(syn_registry_by_name, [{
         {'$1', '$2', '$3', '_', '_'},
-        [{'=:=', '$1', Name}],
+        [Guard],
         [{{'$1', '$2', '$3'}}]
     }]) of
         [RegistryTuple] -> RegistryTuple;
         _ -> undefined
     end.
 
--spec find_registry_entry_by_name(Name :: any()) -> Object :: syn_registry_tuple() | undefined.
+-spec find_registry_entry_by_name(Name :: any()) -> Entry :: syn_registry_entry() | undefined.
 find_registry_entry_by_name(Name) ->
+    Guard = case is_tuple(Name) of
+        true -> {'=:=', '$1', {Name}};
+        _ -> {'=:=', '$1', Name}
+    end,
     case ets:select(syn_registry_by_name, [{
         {'$1', '$2', '$3', '_', '_'},
-        [{'=:=', '$1', Name}],
+        [Guard],
         ['$_']
     }]) of
         [RegistryTuple] -> RegistryTuple;

+ 1 - 1
src/syn_sup.erl

@@ -51,7 +51,7 @@ start_link() ->
 init([]) ->
     Children = [
         ?CHILD(syn_backbone, worker),
-%%        ?CHILD(syn_groups, worker),
+        ?CHILD(syn_groups, worker),
         ?CHILD(syn_registry, worker)
     ],
     {ok, {{one_for_one, 10, 10}, Children}}.

+ 0 - 1
test/syn_groups_SUITE.erl

@@ -92,7 +92,6 @@ all() ->
 groups() ->
     [
         {single_node_groups, [shuffle], [
-            single_node_join_and_monitor,
             single_node_join_and_leave,
             single_node_join_errors,
             single_node_publish,