Browse Source

Set scopes in benchmark.

Roberto Ostinelli 3 years ago
parent
commit
10b8802d77
1 changed file with 53 additions and 40 deletions
  1. 53 40
      test/syn_benchmark.erl

+ 53 - 40
test/syn_benchmark.erl

@@ -35,7 +35,7 @@
     join_on_node/3,
     join_on_node/3,
     leave_on_node/3,
     leave_on_node/3,
     wait_registry_propagation/1,
     wait_registry_propagation/1,
-    wait_groups_propagation/1
+    wait_pg_propagation/1
 ]).
 ]).
 -export([
 -export([
     start_profiling/1,
     start_profiling/1,
@@ -45,6 +45,7 @@
 ]).
 ]).
 
 
 %% macros
 %% macros
+-define(TEST_SCOPE, scope_test).
 -define(TEST_GROUP_NAME, <<"test-group">>).
 -define(TEST_GROUP_NAME, <<"test-group">>).
 
 
 %% ===================================================================
 %% ===================================================================
@@ -55,17 +56,17 @@
 start() ->
 start() ->
     %% init
     %% init
     ProcessCount = list_to_integer(os:getenv("PROCESS_COUNT", "100000")),
     ProcessCount = list_to_integer(os:getenv("PROCESS_COUNT", "100000")),
-    WorkersPerNode = list_to_integer(os:getenv("WORKERS_PER_NODE", "1")),
-    SlavesCount = list_to_integer(os:getenv("NODES_COUNT", "1")),
+    WorkersPerNode = list_to_integer(os:getenv("WORKERS_PER_NODE", "100")),
+    SlavesCount = list_to_integer(os:getenv("NODES_COUNT", "2")),
     SkipRegistry = case os:getenv("SKIP_REGISTRY") of false -> false; _ -> true end,
     SkipRegistry = case os:getenv("SKIP_REGISTRY") of false -> false; _ -> true end,
-    SkipGroups = case os:getenv("SKIP_GROUPS") of false -> false; _ -> true end,
+    SkipPG = case os:getenv("SKIP_PG") of false -> false; _ -> true end,
 
 
     ProcessesPerNode = round(ProcessCount / SlavesCount),
     ProcessesPerNode = round(ProcessCount / SlavesCount),
 
 
-    io:format("-----> Starting benchmark~n"),
-    io:format("       --> Nodes: ~w / ~w slave(s)~n", [SlavesCount + 1, SlavesCount]),
-    io:format("       --> Total processes: ~w (~w / slave node)~n", [ProcessCount, ProcessesPerNode]),
-    io:format("       --> Workers per node: ~w~n~n", [WorkersPerNode]),
+    io:format("====> Starting benchmark~n"),
+    io:format("      --> Nodes: ~w / ~w slave(s)~n", [SlavesCount + 1, SlavesCount]),
+    io:format("      --> Total processes: ~w (~w / slave node)~n", [ProcessCount, ProcessesPerNode]),
+    io:format("      --> Workers per node: ~w~n~n", [WorkersPerNode]),
 
 
     %% start nodes
     %% start nodes
     NodesInfo = lists:foldl(fun(I, Acc) ->
     NodesInfo = lists:foldl(fun(I, Acc) ->
@@ -80,7 +81,8 @@ start() ->
         CodePath = code:get_path(),
         CodePath = code:get_path(),
         true = rpc:call(Node, code, set_path, [CodePath]),
         true = rpc:call(Node, code, set_path, [CodePath]),
         %% start syn
         %% start syn
-        rpc:call(Node, syn, start, []),
+        ok = rpc:call(Node, syn, start, []),
+        ok = rpc:call(Node, syn, add_node_to_scopes, [[?TEST_SCOPE]]),
         %% gather data
         %% gather data
         FromName = (I - 1) * ProcessesPerNode + 1,
         FromName = (I - 1) * ProcessesPerNode + 1,
         ToName = FromName + ProcessesPerNode - 1,
         ToName = FromName + ProcessesPerNode - 1,
@@ -90,21 +92,24 @@ start() ->
 
 
     %% start syn locally
     %% start syn locally
     ok = syn:start(),
     ok = syn:start(),
+    ok = syn:add_node_to_scopes([?TEST_SCOPE]),
     timer:sleep(1000),
     timer:sleep(1000),
 
 
     CollectorPid = self(),
     CollectorPid = self(),
 
 
-    %% start processes
-    PidsMap = lists:foldl(fun({Node, _FromName, _ToName}, Acc) ->
-        Pids = rpc:call(Node, ?MODULE, start_processes, [ProcessesPerNode]),
-        maps:put(Node, Pids, Acc)
-    end, #{}, NodesInfo),
-
     case SkipRegistry of
     case SkipRegistry of
         false ->
         false ->
+            io:format("~n====> Starting REGISTRY benchmark~n~n"),
+
+            %% start processes
+            RegistryPidsMap = lists:foldl(fun({Node, _FromName, _ToName}, Acc) ->
+                Pids = rpc:call(Node, ?MODULE, start_processes, [ProcessesPerNode]),
+                maps:put(Node, Pids, Acc)
+            end, #{}, NodesInfo),
+
             %% start registration
             %% start registration
             lists:foreach(fun({Node, FromName, _ToName}) ->
             lists:foreach(fun({Node, FromName, _ToName}) ->
-                Pids = maps:get(Node, PidsMap),
+                Pids = maps:get(Node, RegistryPidsMap),
                 rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
                 rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
             end, NodesInfo),
             end, NodesInfo),
 
 
@@ -147,7 +152,7 @@ start() ->
 
 
             %% start re-registration
             %% start re-registration
             lists:foreach(fun({Node, FromName, _ToName}) ->
             lists:foreach(fun({Node, FromName, _ToName}) ->
-                Pids = maps:get(Node, PidsMap),
+                Pids = maps:get(Node, RegistryPidsMap),
                 rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
                 rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
             end, NodesInfo),
             end, NodesInfo),
 
 
@@ -170,7 +175,7 @@ start() ->
             %% kill all processes
             %% kill all processes
             maps:foreach(fun(_Node, Pids) ->
             maps:foreach(fun(_Node, Pids) ->
                 lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids)
                 lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids)
-            end, PidsMap),
+            end, RegistryPidsMap),
 
 
             %% wait all unregistered
             %% wait all unregistered
             {RegKillPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [0]),
             {RegKillPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [0]),
@@ -184,11 +189,19 @@ start() ->
             io:format("~n====> Skipping REGISTRY.~n~n")
             io:format("~n====> Skipping REGISTRY.~n~n")
     end,
     end,
 
 
-    case SkipGroups of
+    case SkipPG of
         false ->
         false ->
+            io:format("~n====> Starting PG benchmark~n~n"),
+
+            %% start processes
+            PgPidsMap = lists:foldl(fun({Node, _FromName, _ToName}, Acc) ->
+                Pids = rpc:call(Node, ?MODULE, start_processes, [ProcessesPerNode]),
+                maps:put(Node, Pids, Acc)
+            end, #{}, NodesInfo),
+
             %% start joining
             %% start joining
             lists:foreach(fun({Node, _FromName, _ToName}) ->
             lists:foreach(fun({Node, _FromName, _ToName}) ->
-                Pids = maps:get(Node, PidsMap),
+                Pids = maps:get(Node, PgPidsMap),
                 rpc:cast(Node, ?MODULE, join_on_node, [CollectorPid, WorkersPerNode, Pids])
                 rpc:cast(Node, ?MODULE, join_on_node, [CollectorPid, WorkersPerNode, Pids])
             end, NodesInfo),
             end, NodesInfo),
 
 
@@ -199,7 +212,7 @@ start() ->
             io:format("      --> MIN: ~p secs.~n", [lists:min(JoinRemoteNodesTimes)]),
             io:format("      --> MIN: ~p secs.~n", [lists:min(JoinRemoteNodesTimes)]),
             io:format("      --> MAX: ~p secs.~n", [lists:max(JoinRemoteNodesTimes)]),
             io:format("      --> MAX: ~p secs.~n", [lists:max(JoinRemoteNodesTimes)]),
 
 
-            {JoinPropagationTimeMs, _} = timer:tc(?MODULE, wait_groups_propagation, [ProcessCount]),
+            {JoinPropagationTimeMs, _} = timer:tc(?MODULE, wait_pg_propagation, [ProcessCount]),
             JoinPropagationTime = JoinPropagationTimeMs / 1000000,
             JoinPropagationTime = JoinPropagationTimeMs / 1000000,
             io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [JoinPropagationTime]),
             io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [JoinPropagationTime]),
 
 
@@ -210,7 +223,7 @@ start() ->
 
 
             %% start leaving
             %% start leaving
             lists:foreach(fun({Node, _FromName, _ToName}) ->
             lists:foreach(fun({Node, _FromName, _ToName}) ->
-                Pids = maps:get(Node, PidsMap),
+                Pids = maps:get(Node, PgPidsMap),
                 rpc:cast(Node, ?MODULE, leave_on_node, [CollectorPid, WorkersPerNode, Pids])
                 rpc:cast(Node, ?MODULE, leave_on_node, [CollectorPid, WorkersPerNode, Pids])
             end, NodesInfo),
             end, NodesInfo),
 
 
@@ -221,7 +234,7 @@ start() ->
             io:format("      --> MIN: ~p secs.~n", [lists:min(LeaveRemoteNodesTimes)]),
             io:format("      --> MIN: ~p secs.~n", [lists:min(LeaveRemoteNodesTimes)]),
             io:format("      --> MAX: ~p secs.~n", [lists:max(LeaveRemoteNodesTimes)]),
             io:format("      --> MAX: ~p secs.~n", [lists:max(LeaveRemoteNodesTimes)]),
 
 
-            {LeavePropagationTimeMs, _} = timer:tc(?MODULE, wait_groups_propagation, [0]),
+            {LeavePropagationTimeMs, _} = timer:tc(?MODULE, wait_pg_propagation, [0]),
             LeavePropagationTime = LeavePropagationTimeMs / 1000000,
             LeavePropagationTime = LeavePropagationTimeMs / 1000000,
             io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [LeavePropagationTime]),
             io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [LeavePropagationTime]),
 
 
@@ -232,7 +245,7 @@ start() ->
 
 
             %% start re-joining
             %% start re-joining
             lists:foreach(fun({Node, _FromName, _ToName}) ->
             lists:foreach(fun({Node, _FromName, _ToName}) ->
-                Pids = maps:get(Node, PidsMap),
+                Pids = maps:get(Node, PgPidsMap),
                 rpc:cast(Node, ?MODULE, join_on_node, [CollectorPid, WorkersPerNode, Pids])
                 rpc:cast(Node, ?MODULE, join_on_node, [CollectorPid, WorkersPerNode, Pids])
             end, NodesInfo),
             end, NodesInfo),
 
 
@@ -243,7 +256,7 @@ start() ->
             io:format("      --> MIN: ~p secs.~n", [lists:min(ReJoinRemoteNodesTimes)]),
             io:format("      --> MIN: ~p secs.~n", [lists:min(ReJoinRemoteNodesTimes)]),
             io:format("      --> MAX: ~p secs.~n", [lists:max(ReJoinRemoteNodesTimes)]),
             io:format("      --> MAX: ~p secs.~n", [lists:max(ReJoinRemoteNodesTimes)]),
 
 
-            {ReJoinPropagationTimeMs, _} = timer:tc(?MODULE, wait_groups_propagation, [ProcessCount]),
+            {ReJoinPropagationTimeMs, _} = timer:tc(?MODULE, wait_pg_propagation, [ProcessCount]),
             ReJoinPropagationTime = ReJoinPropagationTimeMs / 1000000,
             ReJoinPropagationTime = ReJoinPropagationTimeMs / 1000000,
             io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [ReJoinPropagationTime]),
             io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [ReJoinPropagationTime]),
 
 
@@ -255,18 +268,18 @@ start() ->
             %% kill all processes
             %% kill all processes
             maps:foreach(fun(_Node, Pids) ->
             maps:foreach(fun(_Node, Pids) ->
                 lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids)
                 lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids)
-            end, PidsMap),
+            end, PgPidsMap),
 
 
             %% wait all unregistered
             %% wait all unregistered
-            {GroupsKillPropagationTimeMs, _} = timer:tc(?MODULE, wait_groups_propagation, [0]),
-            GroupsKillPropagationTime = GroupsKillPropagationTimeMs / 1000000,
-            io:format("----> Time to propagate killed process to to master: ~p secs.~n", [GroupsKillPropagationTime]),
+            {PgKillPropagationTimeMs, _} = timer:tc(?MODULE, wait_pg_propagation, [0]),
+            PgKillPropagationTime = PgKillPropagationTimeMs / 1000000,
+            io:format("----> Time to propagate killed process to master: ~p secs.~n", [PgKillPropagationTime]),
 
 
-            GroupsKillRate = ProcessCount / GroupsKillPropagationTime,
-            io:format("====> Left after kill rate (with propagation): ~p/sec.~n~n", [GroupsKillRate]);
+            PgKillRate = ProcessCount / PgKillPropagationTime,
+            io:format("====> Left after kill rate (with propagation): ~p/sec.~n~n", [PgKillRate]);
 
 
         true ->
         true ->
-            io:format("~n====> Skipping GROUPS.~n")
+            io:format("~n====> Skipping PG.~n")
     end,
     end,
 
 
     %% stop node
     %% stop node
@@ -303,7 +316,7 @@ register_on_node(CollectorPid, WorkersPerNode, FromName, Pids) ->
 
 
 worker_register_on_node(_Name, []) -> ok;
 worker_register_on_node(_Name, []) -> ok;
 worker_register_on_node(Name, [Pid | PidsTail]) ->
 worker_register_on_node(Name, [Pid | PidsTail]) ->
-    ok = syn:register(Name, Pid),
+    ok = syn:register(?TEST_SCOPE, Name, Pid),
     worker_register_on_node(Name + 1, PidsTail).
     worker_register_on_node(Name + 1, PidsTail).
 
 
 unregister_on_node(CollectorPid, WorkersPerNode, FromName, ToName) ->
 unregister_on_node(CollectorPid, WorkersPerNode, FromName, ToName) ->
@@ -337,7 +350,7 @@ unregister_on_node(CollectorPid, WorkersPerNode, FromName, ToName) ->
 
 
 worker_unregister_on_node(FromName, ToName) when FromName > ToName -> ok;
 worker_unregister_on_node(FromName, ToName) when FromName > ToName -> ok;
 worker_unregister_on_node(Name, ToName) ->
 worker_unregister_on_node(Name, ToName) ->
-    ok = syn:unregister(Name),
+    ok = syn:unregister(?TEST_SCOPE, Name),
     worker_unregister_on_node(Name + 1, ToName).
     worker_unregister_on_node(Name + 1, ToName).
 
 
 join_on_node(CollectorPid, WorkersPerNode, Pids) ->
 join_on_node(CollectorPid, WorkersPerNode, Pids) ->
@@ -370,7 +383,7 @@ join_on_node(CollectorPid, WorkersPerNode, Pids) ->
 
 
 worker_join_on_node([]) -> ok;
 worker_join_on_node([]) -> ok;
 worker_join_on_node([Pid | PidsTail]) ->
 worker_join_on_node([Pid | PidsTail]) ->
-    ok = syn:join(?TEST_GROUP_NAME, Pid),
+    ok = syn:join(?TEST_SCOPE, ?TEST_GROUP_NAME, Pid),
     worker_join_on_node(PidsTail).
     worker_join_on_node(PidsTail).
 
 
 leave_on_node(CollectorPid, WorkersPerNode, Pids) ->
 leave_on_node(CollectorPid, WorkersPerNode, Pids) ->
@@ -403,7 +416,7 @@ leave_on_node(CollectorPid, WorkersPerNode, Pids) ->
 
 
 worker_leave_on_node([]) -> ok;
 worker_leave_on_node([]) -> ok;
 worker_leave_on_node([Pid | PidsTail]) ->
 worker_leave_on_node([Pid | PidsTail]) ->
-    ok = syn:leave(?TEST_GROUP_NAME, Pid),
+    ok = syn:leave(?TEST_SCOPE, ?TEST_GROUP_NAME, Pid),
     worker_leave_on_node(PidsTail).
     worker_leave_on_node(PidsTail).
 
 
 wait_done_on_node(CollectorPid, Time, 0) ->
 wait_done_on_node(CollectorPid, Time, 0) ->
@@ -437,7 +450,7 @@ wait_from_all_remote_nodes([RemoteNode | Tail], Times) ->
     end.
     end.
 
 
 wait_registry_propagation(DesiredCount) ->
 wait_registry_propagation(DesiredCount) ->
-    case syn:registry_count(default) of
+    case syn:registry_count(?TEST_SCOPE) of
         DesiredCount ->
         DesiredCount ->
             ok;
             ok;
 
 
@@ -446,14 +459,14 @@ wait_registry_propagation(DesiredCount) ->
             wait_registry_propagation(DesiredCount)
             wait_registry_propagation(DesiredCount)
     end.
     end.
 
 
-wait_groups_propagation(DesiredCount) ->
-    case length(syn:members(?TEST_GROUP_NAME)) of
+wait_pg_propagation(DesiredCount) ->
+    case length(syn:members(?TEST_SCOPE, ?TEST_GROUP_NAME)) of
         DesiredCount ->
         DesiredCount ->
             ok;
             ok;
 
 
         _ ->
         _ ->
             timer:sleep(50),
             timer:sleep(50),
-            wait_groups_propagation(DesiredCount)
+            wait_pg_propagation(DesiredCount)
     end.
     end.
 
 
 start_profiling(NodesInfo) ->
 start_profiling(NodesInfo) ->