|
@@ -47,9 +47,9 @@
|
|
%% example run: `PROCESS_COUNT=100000 WORKERS_PER_NODE=100 NODES_COUNT=2 make bench`
|
|
%% example run: `PROCESS_COUNT=100000 WORKERS_PER_NODE=100 NODES_COUNT=2 make bench`
|
|
start() ->
|
|
start() ->
|
|
%% init
|
|
%% init
|
|
- SlavesCount = list_to_integer(os:getenv("NODES_COUNT", "1")),
|
|
|
|
- WorkersPerNode = list_to_integer(os:getenv("WORKERS_PER_NODE", "100")),
|
|
|
|
ProcessCount = list_to_integer(os:getenv("PROCESS_COUNT", "100000")),
|
|
ProcessCount = list_to_integer(os:getenv("PROCESS_COUNT", "100000")),
|
|
|
|
+ WorkersPerNode = list_to_integer(os:getenv("WORKERS_PER_NODE", "1")),
|
|
|
|
+ SlavesCount = list_to_integer(os:getenv("NODES_COUNT", "1")),
|
|
|
|
|
|
ProcessesPerNode = round(ProcessCount / SlavesCount),
|
|
ProcessesPerNode = round(ProcessCount / SlavesCount),
|
|
|
|
|
|
@@ -189,31 +189,32 @@ register_on_node(CollectorPid, WorkersPerNode, FromName, Pids) ->
|
|
WorkerFromName = FromName + (PidsPerNode * (I - 1)),
|
|
WorkerFromName = FromName + (PidsPerNode * (I - 1)),
|
|
{[{WorkerFromName, WorkerPids} | WInfo], RestOfPids}
|
|
{[{WorkerFromName, WorkerPids} | WInfo], RestOfPids}
|
|
end, {[], Pids}, lists:seq(1, WorkersPerNode)),
|
|
end, {[], Pids}, lists:seq(1, WorkersPerNode)),
|
|
- %% start measure, spawn time is irrelevant
|
|
|
|
- StartAt = os:system_time(millisecond),
|
|
|
|
%% spawn workers
|
|
%% spawn workers
|
|
ReplyPid = self(),
|
|
ReplyPid = self(),
|
|
lists:foreach(fun({WorkerFromName, WorkerPids}) ->
|
|
lists:foreach(fun({WorkerFromName, WorkerPids}) ->
|
|
spawn(fun() ->
|
|
spawn(fun() ->
|
|
|
|
+ StartAt = os:system_time(millisecond),
|
|
worker_register_on_node(WorkerFromName, WorkerPids),
|
|
worker_register_on_node(WorkerFromName, WorkerPids),
|
|
- ReplyPid ! done
|
|
|
|
|
|
+ Time = (os:system_time(millisecond) - StartAt) / 1000,
|
|
|
|
+ ReplyPid ! {done, Time}
|
|
end)
|
|
end)
|
|
end, WorkerInfo),
|
|
end, WorkerInfo),
|
|
%% wait
|
|
%% wait
|
|
- wait_register_on_node(CollectorPid, StartAt, WorkersPerNode).
|
|
|
|
|
|
+ wait_register_on_node(CollectorPid, 0, WorkersPerNode).
|
|
|
|
|
|
worker_register_on_node(_Name, []) -> ok;
|
|
worker_register_on_node(_Name, []) -> ok;
|
|
worker_register_on_node(Name, [Pid | PidsTail]) ->
|
|
worker_register_on_node(Name, [Pid | PidsTail]) ->
|
|
ok = syn:register(Name, Pid),
|
|
ok = syn:register(Name, Pid),
|
|
worker_register_on_node(Name + 1, PidsTail).
|
|
worker_register_on_node(Name + 1, PidsTail).
|
|
|
|
|
|
-wait_register_on_node(CollectorPid, StartAt, 0) ->
|
|
|
|
- Time = (os:system_time(millisecond) - StartAt) / 1000,
|
|
|
|
|
|
+wait_register_on_node(CollectorPid, Time, 0) ->
|
|
io:format("----> Registered on node ~p on ~p secs.~n", [node(), Time]),
|
|
io:format("----> Registered on node ~p on ~p secs.~n", [node(), Time]),
|
|
CollectorPid ! {done, node(), Time};
|
|
CollectorPid ! {done, node(), Time};
|
|
-wait_register_on_node(CollectorPid, StartAt, WorkersRemainingCount) ->
|
|
|
|
|
|
+wait_register_on_node(CollectorPid, Time, WorkersRemainingCount) ->
|
|
receive
|
|
receive
|
|
- done -> wait_register_on_node(CollectorPid, StartAt, WorkersRemainingCount - 1)
|
|
|
|
|
|
+ {done, WorkerTime} ->
|
|
|
|
+ Time1 = lists:max([WorkerTime, Time]),
|
|
|
|
+ wait_register_on_node(CollectorPid, Time1, WorkersRemainingCount - 1)
|
|
end.
|
|
end.
|
|
|
|
|
|
unregister_on_node(CollectorPid, WorkersPerNode, FromName, ToName) ->
|
|
unregister_on_node(CollectorPid, WorkersPerNode, FromName, ToName) ->
|
|
@@ -231,31 +232,32 @@ unregister_on_node(CollectorPid, WorkersPerNode, FromName, ToName) ->
|
|
end,
|
|
end,
|
|
[{WorkerFromName, WorkerToName} | Acc]
|
|
[{WorkerFromName, WorkerToName} | Acc]
|
|
end, [], lists:seq(1, WorkersPerNode)),
|
|
end, [], lists:seq(1, WorkersPerNode)),
|
|
- %% start measure, spawn time is irrelevant
|
|
|
|
- StartAt = os:system_time(millisecond),
|
|
|
|
%% spawn workers
|
|
%% spawn workers
|
|
ReplyPid = self(),
|
|
ReplyPid = self(),
|
|
lists:foreach(fun({WorkerFromName, WorkerToName}) ->
|
|
lists:foreach(fun({WorkerFromName, WorkerToName}) ->
|
|
spawn(fun() ->
|
|
spawn(fun() ->
|
|
|
|
+ StartAt = os:system_time(millisecond),
|
|
worker_unregister_on_node(WorkerFromName, WorkerToName),
|
|
worker_unregister_on_node(WorkerFromName, WorkerToName),
|
|
- ReplyPid ! done
|
|
|
|
|
|
+ Time = (os:system_time(millisecond) - StartAt) / 1000,
|
|
|
|
+ ReplyPid ! {done, Time}
|
|
end)
|
|
end)
|
|
end, WorkerInfo),
|
|
end, WorkerInfo),
|
|
%% wait
|
|
%% wait
|
|
- wait_unregister_on_node(CollectorPid, StartAt, WorkersPerNode).
|
|
|
|
|
|
+ wait_unregister_on_node(CollectorPid, 0, WorkersPerNode).
|
|
|
|
|
|
worker_unregister_on_node(FromName, ToName) when FromName > ToName -> ok;
|
|
worker_unregister_on_node(FromName, ToName) when FromName > ToName -> ok;
|
|
worker_unregister_on_node(Name, ToName) ->
|
|
worker_unregister_on_node(Name, ToName) ->
|
|
ok = syn:unregister(Name),
|
|
ok = syn:unregister(Name),
|
|
worker_unregister_on_node(Name + 1, ToName).
|
|
worker_unregister_on_node(Name + 1, ToName).
|
|
|
|
|
|
-wait_unregister_on_node(CollectorPid, StartAt, 0) ->
|
|
|
|
- Time = (os:system_time(millisecond) - StartAt) / 1000,
|
|
|
|
|
|
+wait_unregister_on_node(CollectorPid, Time, 0) ->
|
|
io:format("----> Unregistered on node ~p on ~p secs.~n", [node(), Time]),
|
|
io:format("----> Unregistered on node ~p on ~p secs.~n", [node(), Time]),
|
|
CollectorPid ! {done, node(), Time};
|
|
CollectorPid ! {done, node(), Time};
|
|
-wait_unregister_on_node(CollectorPid, StartAt, WorkersRemainingCount) ->
|
|
|
|
|
|
+wait_unregister_on_node(CollectorPid, Time, WorkersRemainingCount) ->
|
|
receive
|
|
receive
|
|
- done -> wait_unregister_on_node(CollectorPid, StartAt, WorkersRemainingCount - 1)
|
|
|
|
|
|
+ {done, WorkerTime} ->
|
|
|
|
+ Time1 = lists:max([WorkerTime, Time]),
|
|
|
|
+ wait_unregister_on_node(CollectorPid, Time1, WorkersRemainingCount - 1)
|
|
end.
|
|
end.
|
|
|
|
|
|
start_processes(Count) ->
|
|
start_processes(Count) ->
|