Browse Source

Add groups benchmark.

Roberto Ostinelli 3 years ago
parent
commit
16f9ea885f
1 changed files with 253 additions and 80 deletions
  1. 253 80
      test/syn_benchmark.erl

+ 253 - 80
test/syn_benchmark.erl

@@ -32,13 +32,19 @@
     process_loop/0,
     process_loop/0,
     register_on_node/4,
     register_on_node/4,
     unregister_on_node/4,
     unregister_on_node/4,
-    wait_registry_propagation/1
+    join_on_node/3,
+    leave_on_node/3,
+    wait_registry_propagation/1,
+    wait_groups_propagation/1
 ]).
 ]).
 -export([
 -export([
     start_profiling/0,
     start_profiling/0,
     stop_profiling/0
     stop_profiling/0
 ]).
 ]).
 
 
+%% macros
+-define(TEST_GROUP_NAME, <<"test-group">>).
+
 %% ===================================================================
 %% ===================================================================
 %% API
 %% API
 %% ===================================================================
 %% ===================================================================
@@ -49,6 +55,8 @@ start() ->
     ProcessCount = list_to_integer(os:getenv("PROCESS_COUNT", "100000")),
     ProcessCount = list_to_integer(os:getenv("PROCESS_COUNT", "100000")),
     WorkersPerNode = list_to_integer(os:getenv("WORKERS_PER_NODE", "1")),
     WorkersPerNode = list_to_integer(os:getenv("WORKERS_PER_NODE", "1")),
     SlavesCount = list_to_integer(os:getenv("NODES_COUNT", "1")),
     SlavesCount = list_to_integer(os:getenv("NODES_COUNT", "1")),
+    SkipRegistry = case os:getenv("SKIP_REGISTRY") of false -> false; _ -> true end,
+    SkipGroups = case os:getenv("SKIP_GROUPS") of false -> false; _ -> true end,
 
 
     ProcessesPerNode = round(ProcessCount / SlavesCount),
     ProcessesPerNode = round(ProcessCount / SlavesCount),
 
 
@@ -90,85 +98,174 @@ start() ->
         maps:put(Node, Pids, Acc)
         maps:put(Node, Pids, Acc)
     end, #{}, NodesInfo),
     end, #{}, NodesInfo),
 
 
-    %% start registration
-    lists:foreach(fun({Node, FromName, _ToName}) ->
-        Pids = maps:get(Node, PidsMap),
-        rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
-    end, NodesInfo),
-
-    %% wait
-    RegRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
-
-    io:format("----> Remote registration times:~n"),
-    io:format("      --> MIN: ~p secs.~n", [lists:min(RegRemoteNodesTimes)]),
-    io:format("      --> MAX: ~p secs.~n", [lists:max(RegRemoteNodesTimes)]),
-
-    {RegPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [ProcessCount]),
-    RegPropagationTime = RegPropagationTimeMs / 1000000,
-    io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [RegPropagationTime]),
-
-    %% sum
-    RegTakenTime = (lists:max(RegRemoteNodesTimes) + RegPropagationTime),
-    RegistrationRate = ProcessCount / RegTakenTime,
-    io:format("====> Registeration rate (with propagation): ~p/sec.~n~n", [RegistrationRate]),
-
-    timer:sleep(1000),
-
-    %% start unregistration
-    lists:foreach(fun({Node, FromName, ToName}) ->
-        rpc:cast(Node, ?MODULE, unregister_on_node, [CollectorPid, WorkersPerNode, FromName, ToName])
-    end, NodesInfo),
-
-    %% wait
-    UnregRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
-
-    io:format("----> Remote unregistration times:~n"),
-    io:format("      --> MIN: ~p secs.~n", [lists:min(UnregRemoteNodesTimes)]),
-    io:format("      --> MAX: ~p secs.~n", [lists:max(UnregRemoteNodesTimes)]),
-
-    {UnregPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [0]),
-    UnregPropagationTime = UnregPropagationTimeMs / 1000000,
-    io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [UnregPropagationTime]),
-
-    %% sum
-    UnregTakenTime = (lists:max(UnregRemoteNodesTimes) + UnregPropagationTime),
-    UnregistrationRate = ProcessCount / UnregTakenTime,
-    io:format("====> Unregisteration rate (with propagation): ~p/sec.~n~n", [UnregistrationRate]),
-
-    %% start re-registration
-    lists:foreach(fun({Node, FromName, _ToName}) ->
-        Pids = maps:get(Node, PidsMap),
-        rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
-    end, NodesInfo),
-
-    %% wait
-    ReRegRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
-
-    io:format("----> Remote re-registration times:~n"),
-    io:format("      --> MIN: ~p secs.~n", [lists:min(ReRegRemoteNodesTimes)]),
-    io:format("      --> MAX: ~p secs.~n", [lists:max(ReRegRemoteNodesTimes)]),
-
-    {ReRegPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [ProcessCount]),
-    ReRegPropagationTime = ReRegPropagationTimeMs / 1000000,
-    io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [ReRegPropagationTime]),
-
-    %% sum
-    ReRegTakenTime = (lists:max(ReRegRemoteNodesTimes) + ReRegPropagationTime),
-    ReRegistrationRate = ProcessCount / ReRegTakenTime,
-    io:format("====> Re-registeration rate (with propagation): ~p/sec.~n~n", [ReRegistrationRate]),
-
-    %% kill all processes
-    maps:foreach(fun(_Node, Pids) ->
-        lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids)
-    end, PidsMap),
-
-    %% wait all unregistered
-    {KillPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [0]),
-    KillPropagationTime = KillPropagationTimeMs / 1000000,
-    io:format("----> Time to propagate killed process to to master: ~p secs.~n", [KillPropagationTime]),
-
-    KillRate = ProcessCount / KillPropagationTime,
-    io:format("====> Unregistered after kill rate (with propagation): ~p/sec.~n~n", [KillRate]),
+    case SkipRegistry of
+        false ->
+            %% start registration
+            lists:foreach(fun({Node, FromName, _ToName}) ->
+                Pids = maps:get(Node, PidsMap),
+                rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
+            end, NodesInfo),
+
+            %% wait
+            RegRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
+
+            io:format("----> Remote registration times:~n"),
+            io:format("      --> MIN: ~p secs.~n", [lists:min(RegRemoteNodesTimes)]),
+            io:format("      --> MAX: ~p secs.~n", [lists:max(RegRemoteNodesTimes)]),
+
+            {RegPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [ProcessCount]),
+            RegPropagationTime = RegPropagationTimeMs / 1000000,
+            io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [RegPropagationTime]),
+
+            %% sum
+            RegTakenTime = (lists:max(RegRemoteNodesTimes) + RegPropagationTime),
+            RegistrationRate = ProcessCount / RegTakenTime,
+            io:format("====> Registeration rate (with propagation): ~p/sec.~n~n", [RegistrationRate]),
+
+            %% start unregistration
+            lists:foreach(fun({Node, FromName, ToName}) ->
+                rpc:cast(Node, ?MODULE, unregister_on_node, [CollectorPid, WorkersPerNode, FromName, ToName])
+            end, NodesInfo),
+
+            %% wait
+            UnregRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
+
+            io:format("----> Remote unregistration times:~n"),
+            io:format("      --> MIN: ~p secs.~n", [lists:min(UnregRemoteNodesTimes)]),
+            io:format("      --> MAX: ~p secs.~n", [lists:max(UnregRemoteNodesTimes)]),
+
+            {UnregPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [0]),
+            UnregPropagationTime = UnregPropagationTimeMs / 1000000,
+            io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [UnregPropagationTime]),
+
+            %% sum
+            UnregTakenTime = (lists:max(UnregRemoteNodesTimes) + UnregPropagationTime),
+            UnregistrationRate = ProcessCount / UnregTakenTime,
+            io:format("====> Unregisteration rate (with propagation): ~p/sec.~n~n", [UnregistrationRate]),
+
+            %% start re-registration
+            lists:foreach(fun({Node, FromName, _ToName}) ->
+                Pids = maps:get(Node, PidsMap),
+                rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
+            end, NodesInfo),
+
+            %% wait
+            ReRegRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
+
+            io:format("----> Remote re-registration times:~n"),
+            io:format("      --> MIN: ~p secs.~n", [lists:min(ReRegRemoteNodesTimes)]),
+            io:format("      --> MAX: ~p secs.~n", [lists:max(ReRegRemoteNodesTimes)]),
+
+            {ReRegPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [ProcessCount]),
+            ReRegPropagationTime = ReRegPropagationTimeMs / 1000000,
+            io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [ReRegPropagationTime]),
+
+            %% sum
+            ReRegTakenTime = (lists:max(ReRegRemoteNodesTimes) + ReRegPropagationTime),
+            ReRegistrationRate = ProcessCount / ReRegTakenTime,
+            io:format("====> Re-registeration rate (with propagation): ~p/sec.~n~n", [ReRegistrationRate]),
+
+            %% kill all processes
+            maps:foreach(fun(_Node, Pids) ->
+                lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids)
+            end, PidsMap),
+
+            %% wait all unregistered
+            {RegKillPropagationTimeMs, _} = timer:tc(?MODULE, wait_registry_propagation, [0]),
+            RegKillPropagationTime = RegKillPropagationTimeMs / 1000000,
+            io:format("----> Time to propagate killed process to to master: ~p secs.~n", [RegKillPropagationTime]),
+
+            RegKillRate = ProcessCount / RegKillPropagationTime,
+            io:format("====> Unregistered after kill rate (with propagation): ~p/sec.~n~n", [RegKillRate]);
+
+        true ->
+            io:format("====> Skipping REGISTRY.~n")
+    end,
+
+    case SkipGroups of
+        false ->
+            %% start joining
+            lists:foreach(fun({Node, _FromName, _ToName}) ->
+                Pids = maps:get(Node, PidsMap),
+                rpc:cast(Node, ?MODULE, join_on_node, [CollectorPid, WorkersPerNode, Pids])
+            end, NodesInfo),
+
+            %% wait
+            JoinRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
+
+            io:format("----> Remote join times:~n"),
+            io:format("      --> MIN: ~p secs.~n", [lists:min(JoinRemoteNodesTimes)]),
+            io:format("      --> MAX: ~p secs.~n", [lists:max(JoinRemoteNodesTimes)]),
+
+            {JoinPropagationTimeMs, _} = timer:tc(?MODULE, wait_groups_propagation, [ProcessCount]),
+            JoinPropagationTime = JoinPropagationTimeMs / 1000000,
+            io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [JoinPropagationTime]),
+
+            %% sum
+            JoinTakenTime = (lists:max(JoinRemoteNodesTimes) + JoinPropagationTime),
+            JoinRate = ProcessCount / JoinTakenTime,
+            io:format("====> Join rate (with propagation): ~p/sec.~n~n", [JoinRate]),
+
+            %% start leaving
+            lists:foreach(fun({Node, _FromName, _ToName}) ->
+                Pids = maps:get(Node, PidsMap),
+                rpc:cast(Node, ?MODULE, leave_on_node, [CollectorPid, WorkersPerNode, Pids])
+            end, NodesInfo),
+
+            %% wait
+            LeaveRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
+
+            io:format("----> Remote leave times:~n"),
+            io:format("      --> MIN: ~p secs.~n", [lists:min(LeaveRemoteNodesTimes)]),
+            io:format("      --> MAX: ~p secs.~n", [lists:max(LeaveRemoteNodesTimes)]),
+
+            {LeavePropagationTimeMs, _} = timer:tc(?MODULE, wait_groups_propagation, [0]),
+            LeavePropagationTime = LeavePropagationTimeMs / 1000000,
+            io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [LeavePropagationTime]),
+
+            %% sum
+            LeaveTakenTime = (lists:max(LeaveRemoteNodesTimes) + LeavePropagationTime),
+            LeaveRate = ProcessCount / LeaveTakenTime,
+            io:format("====> Leave rate (with propagation): ~p/sec.~n~n", [LeaveRate]),
+
+            %% start re-joining
+            lists:foreach(fun({Node, _FromName, _ToName}) ->
+                Pids = maps:get(Node, PidsMap),
+                rpc:cast(Node, ?MODULE, join_on_node, [CollectorPid, WorkersPerNode, Pids])
+            end, NodesInfo),
+
+            %% wait
+            ReJoinRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
+
+            io:format("----> Remote join times:~n"),
+            io:format("      --> MIN: ~p secs.~n", [lists:min(ReJoinRemoteNodesTimes)]),
+            io:format("      --> MAX: ~p secs.~n", [lists:max(ReJoinRemoteNodesTimes)]),
+
+            {ReJoinPropagationTimeMs, _} = timer:tc(?MODULE, wait_groups_propagation, [ProcessCount]),
+            ReJoinPropagationTime = ReJoinPropagationTimeMs / 1000000,
+            io:format("----> Eventual additional time to propagate all to master: ~p secs.~n", [ReJoinPropagationTime]),
+
+            %% sum
+            ReJoinTakenTime = (lists:max(ReJoinRemoteNodesTimes) + ReJoinPropagationTime),
+            ReJoinRate = ProcessCount / ReJoinTakenTime,
+            io:format("====> Re-join rate (with propagation): ~p/sec.~n~n", [ReJoinRate]),
+
+            %% kill all processes
+            maps:foreach(fun(_Node, Pids) ->
+                lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids)
+            end, PidsMap),
+
+            %% wait until all killed processes have been removed from the group
+            {GroupsKillPropagationTimeMs, _} = timer:tc(?MODULE, wait_groups_propagation, [0]),
+            GroupsKillPropagationTime = GroupsKillPropagationTimeMs / 1000000,
+            io:format("----> Time to propagate killed process to to master: ~p secs.~n", [GroupsKillPropagationTime]),
+
+            GroupsKillRate = ProcessCount / GroupsKillPropagationTime,
+            io:format("====> Left after kill rate (with propagation): ~p/sec.~n~n", [GroupsKillRate]);
+
+        true ->
+            io:format("====> Skipping GROUPS.~n")
+    end,
 
 
     %% stop node
     %% stop node
     init:stop().
     init:stop().
@@ -241,6 +338,72 @@ worker_unregister_on_node(Name, ToName) ->
     ok = syn:unregister(Name),
     ok = syn:unregister(Name),
     worker_unregister_on_node(Name + 1, ToName).
     worker_unregister_on_node(Name + 1, ToName).
 
 
+%% Joins every pid in Pids to the ?TEST_GROUP_NAME group from the local node,
+%% spreading the work over WorkersPerNode concurrently running worker processes.
+%%
+%% CollectorPid   - passed through to wait_done_on_node/3, which reports this
+%%                  node's time to it as {done, node(), Time}.
+%% WorkersPerNode - number of worker processes to spawn (must be >= 1).
+%% Pids           - pids to join; split into WorkersPerNode consecutive chunks.
+%%
+%% Prints the time returned by wait_done_on_node/3 and returns the result of
+%% io:format/2 (ok).
+join_on_node(CollectorPid, WorkersPerNode, Pids) ->
+    %% split pids in workers. Integer division is required here: rounding to
+    %% nearest can round up (e.g. 9 pids over 6 workers -> 2 per worker), which
+    %% makes lists:split/2 fail with badarg once fewer pids remain than the
+    %% chunk size. With `div` the first WorkersPerNode - 1 chunks always fit and
+    %% the last worker takes whatever is left.
+    PidsPerWorker = length(Pids) div WorkersPerNode,
+    {WorkersPids, []} = lists:foldl(fun(I, {Acc, RemainingPids}) ->
+        {WPids, RestOfPids} = case I of
+            WorkersPerNode ->
+                %% last in the loop, get remaining pids
+                {RemainingPids, []};
+            _ ->
+                %% get portion of pids
+                lists:split(PidsPerWorker, RemainingPids)
+        end,
+        {[WPids | Acc], RestOfPids}
+    end, {[], Pids}, lists:seq(1, WorkersPerNode)),
+    %% spawn workers: each one joins its chunk and reports its elapsed seconds
+    ReplyPid = self(),
+    lists:foreach(fun(WorkerPids) ->
+        spawn(fun() ->
+            %% monotonic clock: elapsed time is not affected by wall-clock adjustments
+            StartAt = erlang:monotonic_time(millisecond),
+            worker_join_on_node(WorkerPids),
+            Time = (erlang:monotonic_time(millisecond) - StartAt) / 1000,
+            ReplyPid ! {done, Time}
+        end)
+    end, WorkersPids),
+    %% wait (presumably for one {done, Time} message per worker - the receiving
+    %% clauses of wait_done_on_node/3 are defined elsewhere in this module)
+    Time = wait_done_on_node(CollectorPid, 0, WorkersPerNode),
+    io:format("----> Joined on node ~p on ~p secs.~n", [node(), Time]).
+
+%% Joins each pid of the given list, in order, to the ?TEST_GROUP_NAME group.
+%% Crashes with badmatch on the first syn:join/2 call that does not return ok.
+%% Returns ok once every pid has been joined (also for an empty list).
+worker_join_on_node(Pids) ->
+    lists:foreach(fun(Pid) ->
+        ok = syn:join(?TEST_GROUP_NAME, Pid)
+    end, Pids).
+
+%% Removes every pid in Pids from the ?TEST_GROUP_NAME group from the local
+%% node, spreading the work over WorkersPerNode concurrently running workers.
+%%
+%% CollectorPid   - passed through to wait_done_on_node/3, which reports this
+%%                  node's time to it as {done, node(), Time}.
+%% WorkersPerNode - number of worker processes to spawn (must be >= 1).
+%% Pids           - pids to remove; split into WorkersPerNode consecutive chunks.
+%%
+%% Prints the time returned by wait_done_on_node/3 and returns the result of
+%% io:format/2 (ok).
+leave_on_node(CollectorPid, WorkersPerNode, Pids) ->
+    %% split pids in workers. Integer division is required here: rounding to
+    %% nearest can round up (e.g. 9 pids over 6 workers -> 2 per worker), which
+    %% makes lists:split/2 fail with badarg once fewer pids remain than the
+    %% chunk size. With `div` the first WorkersPerNode - 1 chunks always fit and
+    %% the last worker takes whatever is left.
+    PidsPerWorker = length(Pids) div WorkersPerNode,
+    {WorkersPids, []} = lists:foldl(fun(I, {Acc, RemainingPids}) ->
+        {WPids, RestOfPids} = case I of
+            WorkersPerNode ->
+                %% last in the loop, get remaining pids
+                {RemainingPids, []};
+            _ ->
+                %% get portion of pids
+                lists:split(PidsPerWorker, RemainingPids)
+        end,
+        {[WPids | Acc], RestOfPids}
+    end, {[], Pids}, lists:seq(1, WorkersPerNode)),
+    %% spawn workers: each one leaves with its chunk and reports its elapsed seconds
+    ReplyPid = self(),
+    lists:foreach(fun(WorkerPids) ->
+        spawn(fun() ->
+            %% monotonic clock: elapsed time is not affected by wall-clock adjustments
+            StartAt = erlang:monotonic_time(millisecond),
+            worker_leave_on_node(WorkerPids),
+            Time = (erlang:monotonic_time(millisecond) - StartAt) / 1000,
+            ReplyPid ! {done, Time}
+        end)
+    end, WorkersPids),
+    %% wait (presumably for one {done, Time} message per worker - the receiving
+    %% clauses of wait_done_on_node/3 are defined elsewhere in this module)
+    Time = wait_done_on_node(CollectorPid, 0, WorkersPerNode),
+    io:format("----> Left on node ~p on ~p secs.~n", [node(), Time]).
+
+%% Removes each pid of the given list, in order, from the ?TEST_GROUP_NAME group.
+%% Crashes with badmatch on the first syn:leave/2 call that does not return ok.
+%% Returns ok once every pid has left (also for an empty list).
+worker_leave_on_node(Pids) ->
+    lists:foreach(fun(Pid) ->
+        ok = syn:leave(?TEST_GROUP_NAME, Pid)
+    end, Pids).
+
 wait_done_on_node(CollectorPid, Time, 0) ->
 wait_done_on_node(CollectorPid, Time, 0) ->
     CollectorPid ! {done, node(), Time},
     CollectorPid ! {done, node(), Time},
     Time;
     Time;
@@ -281,6 +444,16 @@ wait_registry_propagation(DesiredCount) ->
             wait_registry_propagation(DesiredCount)
             wait_registry_propagation(DesiredCount)
     end.
     end.
 
 
+%% Blocks until syn:members/1 reports exactly DesiredCount members for the
+%% ?TEST_GROUP_NAME group, re-checking every 50 ms. Returns ok.
+%% There is no timeout: the call never returns if the count is never reached.
+wait_groups_propagation(DesiredCount) ->
+    CurrentCount = length(syn:members(?TEST_GROUP_NAME)),
+    if
+        CurrentCount =:= DesiredCount ->
+            ok;
+
+        true ->
+            %% not there yet, poll again shortly
+            timer:sleep(50),
+            wait_groups_propagation(DesiredCount)
+    end.
+
 start_profiling() ->
 start_profiling() ->
     {ok, P} = eprof:start(),
     {ok, P} = eprof:start(),
     eprof:start_profiling(erlang:processes() -- [P]).
     eprof:start_profiling(erlang:processes() -- [P]).