Browse Source

Add multiple workers per node to better simulate reality.

Roberto Ostinelli 3 years ago
parent
commit
914a8af14a
1 changed files with 90 additions and 27 deletions
  1. 90 27
      test/syn_benchmark.erl

+ 90 - 27
test/syn_benchmark.erl

@@ -30,10 +30,8 @@
     start/0,
     start/0,
     start_processes/1,
     start_processes/1,
     process_loop/0,
     process_loop/0,
-    register_on_node/3,
-    do_register_on_node/2,
-    unregister_on_node/3,
-    do_unregister_on_node/2,
+    register_on_node/4,
+    unregister_on_node/4,
     wait_registration_propagation/1,
     wait_registration_propagation/1,
     wait_unregistration_propagation/0
     wait_unregistration_propagation/0
 ]).
 ]).
@@ -46,16 +44,19 @@
 %% API
 %% API
 %% ===================================================================
 %% ===================================================================
 
 
-%% example run: `PROCESS_COUNT=100000 NODES_COUNT=2 make bench`
+%% example run: `PROCESS_COUNT=100000 WORKERS_PER_NODE=100 NODES_COUNT=2 make bench`
 start() ->
 start() ->
     %% init
     %% init
     SlavesCount = list_to_integer(os:getenv("NODES_COUNT", "1")),
     SlavesCount = list_to_integer(os:getenv("NODES_COUNT", "1")),
+    WorkersPerNode = list_to_integer(os:getenv("WORKERS_PER_NODE", "100")),
     ProcessCount = list_to_integer(os:getenv("PROCESS_COUNT", "100000")),
     ProcessCount = list_to_integer(os:getenv("PROCESS_COUNT", "100000")),
 
 
     ProcessesPerNode = round(ProcessCount / SlavesCount),
     ProcessesPerNode = round(ProcessCount / SlavesCount),
-    io:format("-----> Starting benchmark on ~w nodes (~w slaves) (for ~w processes total (~w / slave node)~n",
-        [SlavesCount + 1, SlavesCount, ProcessCount, ProcessesPerNode]
-    ),
+
+    io:format("-----> Starting benchmark~n"),
+    io:format("       --> Nodes: ~w (~w slaves)~n", [SlavesCount + 1, SlavesCount]),
+    io:format("       --> Total processes: ~w (~w / slave node)~n", [ProcessCount, ProcessesPerNode]),
+    io:format("       --> Workers per node: ~w~n~n", [WorkersPerNode]),
 
 
     %% start nodes
     %% start nodes
     NodesInfo = lists:foldl(fun(I, Acc) ->
     NodesInfo = lists:foldl(fun(I, Acc) ->
@@ -93,7 +94,7 @@ start() ->
     %% start registration
     %% start registration
     lists:foreach(fun({Node, FromName, _ToName}) ->
     lists:foreach(fun({Node, FromName, _ToName}) ->
         Pids = maps:get(Node, PidsMap),
         Pids = maps:get(Node, PidsMap),
-        rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, FromName, Pids])
+        rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
     end, NodesInfo),
     end, NodesInfo),
 
 
     %% wait
     %% wait
@@ -116,7 +117,7 @@ start() ->
 
 
     %% start unregistration
     %% start unregistration
     lists:foreach(fun({Node, FromName, ToName}) ->
     lists:foreach(fun({Node, FromName, ToName}) ->
-        rpc:cast(Node, ?MODULE, unregister_on_node, [CollectorPid, FromName, ToName])
+        rpc:cast(Node, ?MODULE, unregister_on_node, [CollectorPid, WorkersPerNode, FromName, ToName])
     end, NodesInfo),
     end, NodesInfo),
 
 
     %% wait
     %% wait
@@ -138,7 +139,7 @@ start() ->
     %% start re-registration
     %% start re-registration
     lists:foreach(fun({Node, FromName, _ToName}) ->
     lists:foreach(fun({Node, FromName, _ToName}) ->
         Pids = maps:get(Node, PidsMap),
         Pids = maps:get(Node, PidsMap),
-        rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, FromName, Pids])
+        rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
     end, NodesInfo),
     end, NodesInfo),
 
 
     %% wait
     %% wait
@@ -173,27 +174,89 @@ start() ->
     %% stop node
     %% stop node
     init:stop().
     init:stop().
 
 
-register_on_node(CollectorPid, FromName, Pids) ->
-    {TimeMs, _} = timer:tc(?MODULE, do_register_on_node, [FromName, Pids]),
-    Time = TimeMs / 1000000,
-    io:format("----> Registered on node ~p on ~p secs.~n", [node(), Time]),
-    CollectorPid ! {done, node(), Time}.
+register_on_node(CollectorPid, WorkersPerNode, FromName, Pids) ->
+    %% split pids in workers
+    PidsPerNode = round(length(Pids) / WorkersPerNode),
+    {WorkerInfo, []} = lists:foldl(fun(I, {WInfo, RPids}) ->
+        {WorkerPids, RestOfPids} = case I of
+            WorkersPerNode ->
+                %% last in the loop, get remaining pids
+                {RPids, []};
+            _ ->
+                %% get portion of pids
+                lists:split(PidsPerNode, RPids)
+        end,
+        WorkerFromName = FromName + (PidsPerNode * (I - 1)),
+        {[{WorkerFromName, WorkerPids} | WInfo], RestOfPids}
+    end, {[], Pids}, lists:seq(1, WorkersPerNode)),
+    %% start measure, spawn time is irrelevant
+    StartAt = os:system_time(millisecond),
+    %% spawn workers
+    ReplyPid = self(),
+    lists:foreach(fun({WorkerFromName, WorkerPids}) ->
+        spawn(fun() ->
+            worker_register_on_node(WorkerFromName, WorkerPids),
+            ReplyPid ! done
+        end)
+    end, WorkerInfo),
+    %% wait
+    wait_register_on_node(CollectorPid, StartAt, WorkersPerNode).
 
 
-do_register_on_node(_Name, []) -> ok;
-do_register_on_node(Name, [Pid | PidsTail]) ->
+worker_register_on_node(_Name, []) -> ok;
+worker_register_on_node(Name, [Pid | PidsTail]) ->
     ok = syn:register(Name, Pid),
     ok = syn:register(Name, Pid),
-    do_register_on_node(Name + 1, PidsTail).
+    worker_register_on_node(Name + 1, PidsTail).
 
 
-unregister_on_node(CollectorPid, FromName, ToName) ->
-    {TimeMs, _} = timer:tc(?MODULE, do_unregister_on_node, [FromName, ToName]),
-    Time = TimeMs / 1000000,
-    io:format("----> Unregistered on node ~p on ~p secs.~n", [node(), Time]),
-    CollectorPid ! {done, node(), Time}.
+wait_register_on_node(CollectorPid, StartAt, 0) ->
+    Time = (os:system_time(millisecond) - StartAt) / 1000,
+    io:format("----> Registered on node ~p on ~p secs.~n", [node(), Time]),
+    CollectorPid ! {done, node(), Time};
+wait_register_on_node(CollectorPid, StartAt, WorkersRemainingCount) ->
+    receive
+        done -> wait_register_on_node(CollectorPid, StartAt, WorkersRemainingCount - 1)
+    end.
 
 
-do_unregister_on_node(FromName, ToName) when FromName > ToName -> ok;
-do_unregister_on_node(Name, ToName) ->
+unregister_on_node(CollectorPid, WorkersPerNode, FromName, ToName) ->
+    %% split pids in workers
+    ProcessesPerNode = ToName - FromName + 1,
+    ProcessesPerWorker = round(ProcessesPerNode / WorkersPerNode),
+    WorkerInfo = lists:foldl(fun(I, Acc) ->
+        {WorkerFromName, WorkerToName} = case I of
+            WorkersPerNode ->
+                %% last in the loop
+                {FromName + (I - 1) * ProcessesPerWorker, ToName};
+
+            _ ->
+                {FromName + (I - 1) * ProcessesPerWorker, FromName + I * ProcessesPerWorker - 1}
+        end,
+        [{WorkerFromName, WorkerToName} | Acc]
+    end, [], lists:seq(1, WorkersPerNode)),
+    %% start measure, spawn time is irrelevant
+    StartAt = os:system_time(millisecond),
+    %% spawn workers
+    ReplyPid = self(),
+    lists:foreach(fun({WorkerFromName, WorkerToName}) ->
+        spawn(fun() ->
+            worker_unregister_on_node(WorkerFromName, WorkerToName),
+            ReplyPid ! done
+        end)
+    end, WorkerInfo),
+    %% wait
+    wait_unregister_on_node(CollectorPid, StartAt, WorkersPerNode).
+
+worker_unregister_on_node(FromName, ToName) when FromName > ToName -> ok;
+worker_unregister_on_node(Name, ToName) ->
     ok = syn:unregister(Name),
     ok = syn:unregister(Name),
-    do_unregister_on_node(Name + 1, ToName).
+    worker_unregister_on_node(Name + 1, ToName).
+
+wait_unregister_on_node(CollectorPid, StartAt, 0) ->
+    Time = (os:system_time(millisecond) - StartAt) / 1000,
+    io:format("----> Unregistered on node ~p on ~p secs.~n", [node(), Time]),
+    CollectorPid ! {done, node(), Time};
+wait_unregister_on_node(CollectorPid, StartAt, WorkersRemainingCount) ->
+    receive
+        done -> wait_unregister_on_node(CollectorPid, StartAt, WorkersRemainingCount - 1)
+    end.
 
 
 start_processes(Count) ->
 start_processes(Count) ->
     start_processes(Count, []).
     start_processes(Count, []).