Browse Source

Add new benchmark.

Roberto Ostinelli 5 years ago
parent
commit
0dda6551d1
2 changed files with 111 additions and 177 deletions
  1. 1 1
      Makefile
  2. 110 176
      test/syn_benchmark.erl

+ 1 - 1
Makefile

@@ -41,7 +41,7 @@ endif
 bench: compile_test
 	@erl -pa `rebar3 as test path` \
 	-pa `rebar3 as test path`/../test \
-	-name syn_bench@127.0.0.1 \
+	-name syn_bench_master@127.0.0.1 \
 	+K true \
 	-mnesia schema_location ram \
 	-noshell \

+ 110 - 176
test/syn_benchmark.erl

@@ -27,9 +27,12 @@
 
 %% API
 -export([start/0]).
--export([register/1, unregister/1]).
--export([register_on_node/1, unregister_on_node/1]).
 -export([process_loop/0]).
+-export([collection_loop/2]).
+-export([run_test_on_node/3]).
+-export([register_pids/1]).
+-export([unregister_pids/1]).
+-export([wait_for_unregistered/1]).
 
 %% macros
 -define(MAX_RETRIEVE_WAITING_TIME, 60000).
@@ -39,197 +42,128 @@
 %% ===================================================================
 start() ->
     %% init
-    NodeCount = list_to_integer(os:getenv("SYN_BENCH_NODE_COUNT", "4")),
+    NodeCount = list_to_integer(os:getenv("SYN_BENCH_NODE_COUNT", "2")),
     ProcessCount = list_to_integer(os:getenv("SYN_PROCESS_COUNT", "100000")),
     %% start nodes
-    SlaveNodes = lists:foldl(fun(Count, Acc) ->
-        ShortName = list_to_atom("syn_slave_" ++ integer_to_list(Count)),
-        {ok, SlaveNode} = syn_test_suite_helper:start_slave(ShortName),
-        [SlaveNode | Acc]
-    end, [], lists:seq(1, NodeCount - 1)),
-    io:format("-----> Started ~p nodes: ~p~n", [NodeCount, [node() | SlaveNodes]]),
-
-    %% start syn
+    lists:foreach(fun(Count) ->
+        ShortName = list_to_atom("syn_bench_slave_" ++ integer_to_list(Count)),
+        {ok, _} = syn_test_suite_helper:start_slave(ShortName)
+    end, lists:seq(1, NodeCount - 1)),
+
+    Nodes = [node() | nodes()],
+    io:format("-----> Started ~p nodes: ~p~n", [length(Nodes), Nodes]),
+
+    %% start syn everywhere
     lists:foreach(fun(Node) ->
         ok = rpc:call(Node, syn, start, [])
+    end, Nodes),
+    timer:sleep(1000),
+
+    %% compute names per node
+    ProcessesPerNode = round(ProcessCount / length(Nodes)),
+    io:format("-----> ~p processes per node~n", [ProcessesPerNode]),
+
+    %% launch test everywhere
+    CollectingPid = self(),
+    lists:foldl(fun(Node, Acc) ->
+        FromName = Acc * ProcessesPerNode + 1,
+        ToName = FromName + ProcessesPerNode - 1,
+        rpc:cast(Node, ?MODULE, run_test_on_node, [CollectingPid, FromName, ToName]),
+        Acc + 1
+    end, 0, Nodes),
+
+    Results = collection_loop(Nodes, []),
+    io:format("-----> Results: ~p~n", [Results]),
+
+    %% stop syn everywhere
+    lists:foreach(fun(Node) ->
+        ok = rpc:call(Node, syn, stop, [])
     end, [node() | nodes()]),
     timer:sleep(1000),
 
-    try
-        %% launch processes
-        {UpperName, PidInfos} = launch_processes(ProcessCount),
-
-        %% benchmark: register
-        {TimeReg, _} = timer:tc(?MODULE, register, [PidInfos]),
-        io:format("-----> Registered processes in ~p sec, at a rate of ~p/sec~n", [
-            TimeReg / 1000000,
-            ProcessCount / TimeReg * 1000000
-        ]),
-
-        %% benchmark: registration propagation
-        {RetrievedInMs1, RetrieveProcess1} = retrieve(pid, UpperName),
-        io:format("-----> Check that process with Name ~p was found: ~p in ~p ms~n", [
-            UpperName, RetrieveProcess1, RetrievedInMs1
-        ]),
-
-        %% benchmark: unregister
-        {TimeUnreg, _} = timer:tc(?MODULE, unregister, [PidInfos]),
-        io:format("-----> Unregistered processes in ~p sec, at a rate of ~p/sec~n", [
-            TimeUnreg / 1000000,
-            ProcessCount / TimeUnreg * 1000000
-        ]),
-
-        %% benchmark: unregistration propagation
-        {RetrievedInMs2, RetrieveProcess2} = retrieve(undefined, UpperName),
-        io:format("-----> Check that process with Name ~p was NOT found: ~p in ~p ms~n", [
-            UpperName, RetrieveProcess2, RetrievedInMs2
-        ]),
-
-        %% benchmark: re-registering
-        {TimeReg2, _} = timer:tc(?MODULE, register, [PidInfos]),
-        io:format("-----> Re-registered processes in ~p sec, at a rate of ~p/sec~n", [
-            TimeReg2 / 1000000,
-            ProcessCount / TimeReg2 * 1000000
-        ]),
-
-        %% benchmark: re-registration propagation
-        {RetrievedInMs3, RetrieveProcess3} = retrieve(pid, UpperName),
-        io:format("-----> Check that process with Name ~p was found: ~p in ~p ms~n", [
-            UpperName, RetrieveProcess3, RetrievedInMs3
-        ]),
-
-        %% benchmark: monitoring
-        kill_processes(PidInfos),
-        {RetrievedInMs4, RetrieveProcess4} = retrieve(undefined, UpperName),
-        io:format("-----> Check that process with Name ~p was NOT found: ~p in ~p ms~n", [
-            UpperName, RetrieveProcess4, RetrievedInMs4
-        ])
-
-    after
-        %% stop syn
-        lists:foreach(fun(Node) ->
-            ok = rpc:call(Node, syn, stop, [])
-        end, [node() | nodes()]),
-        timer:sleep(1000),
-
-        %% stop nodes
-        lists:foreach(fun(SlaveNode) ->
-            syn_test_suite_helper:connect_node(SlaveNode),
-            ShortName = list_to_atom(lists:nth(1, string:split(atom_to_list(SlaveNode), "@"))),
-            syn_test_suite_helper:stop_slave(ShortName)
-        end, SlaveNodes),
-        io:format("-----> Stopped ~p nodes: ~p~n", [length(SlaveNodes) + 1, [node() | SlaveNodes]]),
-
-        %% stop node
-        init:stop()
-    end.
+    %% stop nodes
+    lists:foreach(fun(SlaveNode) ->
+        syn_test_suite_helper:connect_node(SlaveNode),
+        ShortName = list_to_atom(lists:nth(1, string:split(atom_to_list(SlaveNode), "@"))),
+        syn_test_suite_helper:stop_slave(ShortName)
+    end, nodes()),
+    io:format("-----> Stopped ~p nodes: ~p~n", [length(Nodes), Nodes]),
+
+    %% stop node
+    init:stop().
+
+run_test_on_node(CollectingPid, FromName, ToName) ->
+    %% launch processes - list is in format {Name, pid()}
+    PidTuples = [{Name, spawn(?MODULE, process_loop, [])} || Name <- lists:seq(FromName, ToName)],
+    {ToName, ToPid} = lists:last(PidTuples),
+
+    %% register
+    {TimeReg0, _} = timer:tc(?MODULE, register_pids, [PidTuples]),
+    RegisteringRate1 = length(PidTuples) / TimeReg0 * 1000000,
+    %% check
+    ToPid = syn:whereis(ToName),
+
+    %% unregister
+    {TimeReg1, _} = timer:tc(?MODULE, unregister_pids, [PidTuples]),
+    UnRegisteringRate1 = length(PidTuples) / TimeReg1 * 1000000,
+    %% check
+    undefined = syn:whereis(ToName),
+
+    %% re-register
+    {TimeReg3, _} = timer:tc(?MODULE, register_pids, [PidTuples]),
+    RegisteringRate2 = length(PidTuples) / TimeReg3 * 1000000,
+    %% check
+    ToPid = syn:whereis(ToName),
+
+    %% kill all
+    lists:foreach(fun({_Name, Pid}) ->
+        exit(Pid, kill)
+    end, PidTuples),
+
+    %% check all unregistered
+    {TimeReg4, _} = timer:tc(?MODULE, wait_for_unregistered, [ToName]),
+    CheckAllKilled = TimeReg4 / 1000000,
+
+    %% return
+    CollectingPid ! {node(), [
+        {registering_rate_1, RegisteringRate1},
+        {unregistering_rate_1, UnRegisteringRate1},
+        {registering_rate_2, RegisteringRate2},
+        {check_all_killed, CheckAllKilled}
+    ]}.
 
 %% ===================================================================
 %% Internal
 %% ===================================================================
+collection_loop([], Results) -> Results;
+collection_loop(Nodes, Results) ->
+    receive
+        {Node, Result} ->
+            collection_loop(lists:delete(Node, Nodes), [{Node, Result} | Results])
+    after ?MAX_RETRIEVE_WAITING_TIME ->
+        io:format("COULD NOT COMPLETE TEST IN ~p seconds", [?MAX_RETRIEVE_WAITING_TIME])
+    end.
+
 process_loop() ->
     receive
         _ -> ok
     end.
 
-launch_processes(ProcessCount) ->
-    %% return the processes info in format [{Node, [{Name, Pid}]}, ...]
-    Nodes = [node() | nodes()],
-    ProcessesPerNode = round(ProcessCount / length(Nodes)),
-    UpperName = integer_to_list(ProcessesPerNode * length(Nodes)),
-    F = fun(Node, Acc) ->
-        StartingName = length(Acc) * ProcessesPerNode,
-        Pids = launch_processes_on_node(ProcessesPerNode, StartingName, Node),
-        [{Node, Pids} | Acc]
-    end,
-    {UpperName, lists:foldl(F, [], Nodes)}.
-
-launch_processes_on_node(ProcessesPerNode, StartingName, Node) ->
-    %% return the name and process in a list of format [{Name, Pid}, ...]
-    Seq = [
-        integer_to_list(Name)
-        || Name <- lists:seq(StartingName + 1, ProcessesPerNode + StartingName)
-    ],
-    [{Name, spawn(Node, ?MODULE, process_loop, [])} || Name <- Seq].
-
-register(PidInfos) ->
-    %% register in parallel on all nodes
-    F = fun({Node, NodePidInfos}, Acc) ->
-        RpcKey = rpc:async_call(Node, ?MODULE, register_on_node, [NodePidInfos]),
-        [{Node, RpcKey} | Acc]
-    end,
-    RpcKeys = lists:foldl(F, [], PidInfos),
-    %% wait for registration to complete on all nodes
-    FResult = fun({Node, RpcKey}) ->
-        Registered = rpc:yield(RpcKey),
-        io:format("       Registered ~p processes on node ~p~n", [Registered, Node])
-    end,
-    lists:foreach(FResult, RpcKeys).
-
-register_on_node(NodePidInfos) ->
-    F = fun({Name, Pid}) ->
-        syn:register(Name, Pid)
-    end,
-    lists:foreach(F, NodePidInfos),
-    length(NodePidInfos).
-
-retrieve(Expected, Name) ->
-    StartTime = epoch_time_ms(),
-    retrieve(Expected, Name, StartTime).
-retrieve(pid, Name, StartTime) ->
-    %% wait for a pid to be returned
-    case syn:whereis(Name) of
-        undefined ->
-            timer:sleep(50),
-            case epoch_time_ms() > StartTime + ?MAX_RETRIEVE_WAITING_TIME of
-                true -> {error, timeout_during_retrieve};
-                false -> retrieve(pid, Name, StartTime)
-            end;
-        Pid ->
-            RetrievedInMs = epoch_time_ms() - StartTime,
-            {RetrievedInMs, Pid}
-    end;
-retrieve(undefined, Name, StartTime) ->
-    %% wait for undefined to be returned
-    case syn:whereis(Name) of
+register_pids([]) -> ok;
+register_pids([{Name, Pid} | TPidTuples]) ->
+    ok = syn:register(Name, Pid),
+    register_pids(TPidTuples).
+
+wait_for_unregistered(ToName) ->
+    case syn:whereis(ToName) of
         undefined ->
-            RetrievedInMs = epoch_time_ms() - StartTime,
-            {RetrievedInMs, undefined};
-        _Pid ->
+            ok;
+        _ ->
             timer:sleep(50),
-            case epoch_time_ms() > StartTime + ?MAX_RETRIEVE_WAITING_TIME of
-                true -> {error, timeout_during_retrieve};
-                false -> retrieve(undefined, Name, StartTime)
-            end
+            wait_for_unregistered(ToName)
     end.
 
-unregister(PidInfos) ->
-    %% unregister in parallel on all nodes
-    F = fun({Node, NodePidInfos}, Acc) ->
-        RpcKey = rpc:async_call(Node, ?MODULE, unregister_on_node, [NodePidInfos]),
-        [{Node, RpcKey} | Acc]
-    end,
-    RpcKeys = lists:foldl(F, [], PidInfos),
-    %% wait for unregistration to complete on all nodes
-    FResult = fun({Node, RpcKey}) ->
-        Unregistered = rpc:yield(RpcKey),
-        io:format("       Unregistered ~p processes on node ~p~n", [Unregistered, Node])
-    end,
-    lists:foreach(FResult, RpcKeys).
-
-unregister_on_node(NodePidInfos) ->
-    F = fun({Name, _Pid}) ->
-        syn:unregister(Name)
-    end,
-    lists:foreach(F, NodePidInfos),
-    length(NodePidInfos).
-
-kill_processes(PidInfos) ->
-    F = fun({_Node, NodePidInfos}) ->
-        [exit(Pid, kill) || {_Name, Pid} <- NodePidInfos]
-    end,
-    lists:foreach(F, PidInfos).
-
-epoch_time_ms() ->
-    {Mega, Sec, Micro} = os:timestamp(),
-    (Mega * 1000000 + Sec) * 1000 + round(Micro / 1000).
+unregister_pids([]) -> ok;
+unregister_pids([{Name, _Pid} | TPidTuples]) ->
+    ok = syn:unregister(Name),
+    unregister_pids(TPidTuples).