Browse Source

[optimization] Use lookup tables instead of rebuilding process & atom names.

Roberto Ostinelli 3 years ago
parent
commit
11831874c7
5 changed files with 74 additions and 43 deletions
  1. 28 16
      src/syn_backbone.erl
  2. 16 8
      src/syn_gen_scope.erl
  3. 23 19
      src/syn_registry.erl
  4. 6 0
      test/syn_benchmark.erl
  5. 1 0
      test/syn_registry_SUITE.erl

+ 28 - 16
src/syn_backbone.erl

@@ -49,11 +49,12 @@ start_link() ->
 create_tables_for_scope(Scope) ->
     gen_server:call(?MODULE, {create_tables_for_scope, Scope}).
 
--spec get_table_name(TableName :: atom(), Scope :: atom()) -> atom().
-get_table_name(TableName, Scope) ->
-    TableNameBin = atom_to_binary(TableName),
-    ScopeBin = atom_to_binary(Scope),
-    binary_to_atom(<<TableNameBin/binary, "_", ScopeBin/binary>>).
+-spec get_table_name(TableId :: atom(), Scope :: atom()) -> TableName :: atom() | undefined.
+get_table_name(TableId, Scope) ->
+    case ets:lookup(syn_table_names, {TableId, Scope}) of
+        [{_, TableName}] -> TableName;
+        [] -> undefined
+    end.
 
 %% ===================================================================
 %% Callbacks
@@ -68,6 +69,9 @@ get_table_name(TableName, Scope) ->
     ignore |
     {stop, Reason :: any()}.
 init([]) ->
+    %% create table names table
+    ets:new(syn_table_names, [set, public, named_table, {read_concurrency, true}, {decentralized_counters, true}]),
+    ets:new(syn_process_names, [set, public, named_table, {read_concurrency, true}, {decentralized_counters, true}]),
     %% init
     {ok, #{}}.
 
@@ -82,15 +86,15 @@ init([]) ->
     {stop, Reason :: any(), Reply :: any(), State :: map()} |
     {stop, Reason :: any(), State :: map()}.
 handle_call({create_tables_for_scope, Scope}, _From, State) ->
-    error_logger:info_msg("SYN[~p] Creating tables for scope: ~p", [node(), Scope]),
-    ensure_table_exists(get_table_name(syn_registry_by_name, Scope)),
-    ensure_table_exists(get_table_name(syn_registry_by_pid, Scope)),
-    ensure_table_exists(get_table_name(syn_groups_by_name, Scope)),
-    ensure_table_exists(get_table_name(syn_groups_by_pid, Scope)),
+    error_logger:info_msg("SYN[~s] Creating tables for scope '~s'", [node(), Scope]),
+    ensure_table_exists(syn_registry_by_name, Scope),
+    ensure_table_exists(syn_registry_by_pid, Scope),
+    ensure_table_exists(syn_groups_by_name, Scope),
+    ensure_table_exists(syn_groups_by_pid, Scope),
     {reply, ok, State};
 
 handle_call(Request, From, State) ->
-    error_logger:warning_msg("SYN[~p] Received from ~p an unknown call message: ~p", [node(), Request, From]),
+    error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), From, Request]),
     {reply, undefined, State}.
 
 %% ----------------------------------------------------------------------------------------------------------
@@ -134,15 +138,23 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec ensure_table_exists(Name :: atom()) -> atom().
-ensure_table_exists(Name) ->
-    case ets:whereis(Name) of
+-spec ensure_table_exists(TableId :: atom(), Scope :: atom()) -> ok.
+ensure_table_exists(TableId, Scope) ->
+    %% build name
+    TableIdBin = atom_to_binary(TableId),
+    ScopeBin = atom_to_binary(Scope),
+    TableName = binary_to_atom(<<TableIdBin/binary, "_", ScopeBin/binary>>),
+    %% save to loopkup table
+    true = ets:insert(syn_table_names, {{TableId, Scope}, TableName}),
+    %% check or create
+    case ets:whereis(TableName) of
         undefined ->
             %% regarding decentralized_counters: <https://blog.erlang.org/scalable-ets-counters/>
-            ets:new(Name, [
+            ets:new(TableName, [
                 ordered_set, public, named_table,
                 {read_concurrency, true}, {write_concurrency, true}, {decentralized_counters, true}
-            ]);
+            ]),
+            ok;
 
         _ ->
             ok

+ 16 - 8
src/syn_gen_scope.erl

@@ -80,7 +80,13 @@
 -spec start_link(Handler :: module(), Scope :: atom(), Args :: [any()]) ->
     {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
 start_link(Handler, Scope, Args) when is_atom(Scope) ->
-    ProcessName = get_process_name_for_scope(Handler, Scope),
+    %% build name
+    HandlerBin = atom_to_binary(Handler),
+    ScopeBin = atom_to_binary(Scope),
+    ProcessName = binary_to_atom(<<HandlerBin/binary, "_", ScopeBin/binary>>),
+    %% save to lookup table
+    true = ets:insert(syn_process_names, {{Handler, Scope}, ProcessName}),
+    %% create process
     gen_server:start_link({local, ProcessName}, ?MODULE, [Handler, Scope, ProcessName, Args], []).
 
 -spec get_subcluster_nodes(Handler :: module(), Scope :: atom()) -> [node()].
@@ -94,8 +100,10 @@ call(Handler, Scope, Message) ->
 
 -spec call(Handler :: module(), Node :: atom(), Scope :: atom(), Message :: any()) -> Response :: any().
 call(Handler, Node, Scope, Message) ->
-    ProcessName = get_process_name_for_scope(Handler, Scope),
-    gen_server:call({ProcessName, Node}, Message).
+    case get_process_name_for_scope(Handler, Scope) of
+        undefined -> error({invalid_scope, Scope});
+        ProcessName ->  gen_server:call({ProcessName, Node}, Message)
+    end.
 
 %% ===================================================================
 %% In-Process API
@@ -293,12 +301,12 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec get_process_name_for_scope(Handler :: module(), Scope :: atom()) -> atom().
+-spec get_process_name_for_scope(Handler :: module(), Scope :: atom()) -> ProcessName :: atom() | undefined.
 get_process_name_for_scope(Handler, Scope) ->
-    ModuleBin = atom_to_binary(Handler),
-    ScopeBin = atom_to_binary(Scope),
-    binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).
-
+    case ets:lookup(syn_process_names, {Handler, Scope}) of
+        [{_, ProcessName}] -> ProcessName;
+        [] -> undefined
+    end.
 
 -spec multicast_loop() -> terminated.
 multicast_loop() ->

+ 23 - 19
src/syn_registry.erl

@@ -70,11 +70,10 @@ lookup(Name) ->
 
 -spec lookup(Scope :: atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
 lookup(Scope, Name) ->
-    try find_registry_entry_by_name(Scope, Name) of
+    case find_registry_entry_by_name(Scope, Name) of
         undefined -> undefined;
+        non_existent_table -> error({invalid_scope, Scope});
         {{Name, Pid}, Meta, _, _, _} -> {Pid, Meta}
-    catch
-        error:badarg -> error({invalid_scope, Scope})
     end.
 
 -spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
@@ -91,7 +90,7 @@ register(Scope, Name, Pid) when is_pid(Pid) ->
 -spec register(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
 register(Scope, Name, Pid, Meta) ->
     Node = node(Pid),
-    try syn_gen_scope:call(?MODULE, Node, Scope, {register_on_owner, node(), Name, Pid, Meta}) of
+    case syn_gen_scope:call(?MODULE, Node, Scope, {register_on_owner, node(), Name, Pid, Meta}) of
         {ok, {TablePid, TableMeta, Time}} when Node =/= node() ->
             %% update table on caller node immediately so that subsequent calls have an updated registry
             add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
@@ -102,8 +101,6 @@ register(Scope, Name, Pid, Meta) ->
 
         {Response, _} ->
             Response
-    catch
-        exit:{noproc, {gen_server, call, _}} -> error({invalid_scope, Scope})
     end.
 
 -spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
@@ -113,10 +110,13 @@ unregister(Name) ->
 -spec unregister(Scope :: atom(), Name :: any()) -> ok | {error, Reason :: any()}.
 unregister(Scope, Name) ->
     % get process' node
-    try find_registry_entry_by_name(Scope, Name) of
+    case find_registry_entry_by_name(Scope, Name) of
         undefined ->
             {error, undefined};
 
+        non_existent_table ->
+            error({invalid_scope, Scope});
+
         {{Name, Pid}, Meta, _, _, _} ->
             Node = node(Pid),
             case syn_gen_scope:call(?MODULE, Node, Scope, {unregister_on_owner, node(), Name, Pid}) of
@@ -131,9 +131,6 @@ unregister(Scope, Name) ->
                 Response ->
                     Response
             end
-    catch
-        exit:{noproc, {gen_server, call, _}} -> error({invalid_scope, Scope});
-        error:badarg -> error({invalid_scope, Scope})
     end.
 
 -spec count(Scope :: atom()) -> non_neg_integer().
@@ -244,7 +241,7 @@ handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{scope
     end;
 
 handle_call(Request, From, State) ->
-    error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), Request, From]),
+    error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), From, Request]),
     {reply, undefined, State}.
 
 %% ----------------------------------------------------------------------------------------------------------
@@ -333,15 +330,22 @@ get_registry_tuples_for_node(Scope, Node) ->
         [{{'$1', '$2', '$3', '$4'}}]
     }]).
 
--spec find_registry_entry_by_name(Scope :: atom(), Name :: any()) -> Entry :: syn_registry_entry() | undefined.
+-spec find_registry_entry_by_name(Scope :: atom(), Name :: any()) ->
+    Entry :: syn_registry_entry() | undefined | non_existent_table.
 find_registry_entry_by_name(Scope, Name) ->
-    case ets:select(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
-        {{Name, '_'}, '_', '_', '_', '_'},
-        [],
-        ['$_']
-    }]) of
-        [RegistryEntry] -> RegistryEntry;
-        [] -> undefined
+    case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
+        undefined ->
+            non_existent_table;
+
+        TableName ->
+            case ets:select(TableName, [{
+                {{Name, '_'}, '_', '_', '_', '_'},
+                [],
+                ['$_']
+            }]) of
+                [RegistryEntry] -> RegistryEntry;
+                [] -> undefined
+            end
     end.
 
 -spec find_registry_entries_by_pid(Scope :: atom(), Pid :: pid()) -> RegistryEntries :: [syn_registry_entry()].

+ 6 - 0
test/syn_benchmark.erl

@@ -76,6 +76,9 @@ start() ->
         maps:put(Node, Pids, Acc)
     end, #{}, NodesInfo),
 
+%%    {ok, P} = eprof:start(),
+%%    eprof:start_profiling(erlang:processes() -- [P]),
+
     %% start registration
     lists:foreach(fun({Node, FromName, _ToName}) ->
         Pids = maps:get(Node, PidsMap),
@@ -156,6 +159,9 @@ start() ->
     KillRate = ProcessCount / KillPropagationTime,
     io:format("====> Unregistered after kill rate (with propagation): ~p/sec.~n~n", [KillRate]),
 
+%%    eprof:stop_profiling(),
+%%    eprof:analyze(total),
+
     %% stop node
     init:stop().
 

+ 1 - 0
test/syn_registry_SUITE.erl

@@ -457,6 +457,7 @@ three_nodes_register_unregister_and_monitor_default_scope(Config) ->
     Pid1 = syn_test_suite_helper:start_process(),
     Pid2 = syn_test_suite_helper:start_process(),
     ok = syn:register(<<"my proc">>, Pid1),
+    timer:sleep(100),
     syn_registry:remove_from_local_table(default, <<"my proc">>, Pid1),
     syn_registry:add_to_local_table(default, <<"my proc">>, Pid2, undefined, 0, undefined),
     {error, race_condition} = rpc:call(SlaveNode1, syn, unregister, [<<"my proc">>]).