%% ==========================================================================================================
%% Syn - A global Process Registry and Process Group manager.
%%
%% The MIT License (MIT)
%%
%% Copyright (c) 2019-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in
%% all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
%% THE SOFTWARE.
%% ==========================================================================================================
-module(syn_benchmark).

%% API
-export([
    start/0,
    start_processes/1,
    process_loop/0,
    register_on_node/4,
    unregister_on_node/4,
    join_on_node/3,
    leave_on_node/3,
    wait_registry_propagation/1,
    wait_groups_propagation/1
]).
-export([
    start_profiling/1,
    stop_profiling/1,
    start_profiling_on_node/0,
    stop_profiling_on_node/0
]).

%% macros
-define(TEST_GROUP_NAME, <<"test-group">>).
%% ===================================================================
%% API
%% ===================================================================

%% example run: `PROCESS_COUNT=100000 WORKERS_PER_NODE=100 NODES_COUNT=2 make bench`
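%%
%% The settings below are read from the environment in start/0 (defaults in
%% parentheses):
%%   PROCESS_COUNT    (100000) total number of test processes across all slaves
%%   WORKERS_PER_NODE (1)      concurrent worker processes per slave node
%%   NODES_COUNT      (1)      number of slave nodes to start
%%   SKIP_REGISTRY    (unset)  when set to any value, the registry benchmark is skipped
%%   SKIP_GROUPS      (unset)  when set to any value, the groups benchmark is skipped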
start() ->
    %% init
    ProcessCount = list_to_integer(os:getenv("PROCESS_COUNT", "100000")),
    WorkersPerNode = list_to_integer(os:getenv("WORKERS_PER_NODE", "1")),
    SlavesCount = list_to_integer(os:getenv("NODES_COUNT", "1")),
    SkipRegistry = case os:getenv("SKIP_REGISTRY") of false -> false; _ -> true end,
    SkipGroups = case os:getenv("SKIP_GROUPS") of false -> false; _ -> true end,
    ProcessesPerNode = round(ProcessCount / SlavesCount),

    io:format("-----> Starting benchmark~n"),
    io:format(" --> Nodes: ~w / ~w slave(s)~n", [SlavesCount + 1, SlavesCount]),
    io:format(" --> Total processes: ~w (~w / slave node)~n", [ProcessCount, ProcessesPerNode]),
    io:format(" --> Workers per node: ~w~n~n", [WorkersPerNode]),
    %% start nodes
    NodesInfo = lists:foldl(fun(I, Acc) ->
        %% start slave
        CountBin = integer_to_binary(I),
        NodeShortName = binary_to_atom(<<"slave_", CountBin/binary>>),
        {ok, Node} = ct_slave:start(NodeShortName, [
            {boot_timeout, 10},
            {monitor_master, true}
        ]),
        %% add code path
        CodePath = code:get_path(),
        true = rpc:call(Node, code, set_path, [CodePath]),
        %% start syn
        rpc:call(Node, syn, start, []),
        %% gather data
        FromName = (I - 1) * ProcessesPerNode + 1,
        ToName = FromName + ProcessesPerNode - 1,
        %% fold
        [{Node, FromName, ToName} | Acc]
    end, [], lists:seq(1, SlavesCount)),

    %% start syn locally
    ok = syn:start(),
    timer:sleep(1000),

    CollectorPid = self(),

    %% start processes
    PidsMap = lists:foldl(fun({Node, _FromName, _ToName}, Acc) ->
        Pids = rpc:call(Node, ?MODULE, start_processes, [ProcessesPerNode]),
        maps:put(Node, Pids, Acc)
    end, #{}, NodesInfo),
    case SkipRegistry of
        false ->
            %% start registration
            lists:foreach(fun({Node, FromName, _ToName}) ->
                Pids = maps:get(Node, PidsMap),
                rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
            end, NodesInfo),

            %% wait
            RegRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
            io:format("----> Remote registration times:~n"),
            io:format(" --> MIN: ~p secs.~n", [lists:min(RegRemoteNodesTimes)]),
            io:format(" --> MAX: ~p secs.~n", [lists:max(RegRemoteNodesTimes)]),

            {RegPropagationTimeUs, _} = timer:tc(?MODULE, wait_registry_propagation, [ProcessCount]),
            RegPropagationTime = RegPropagationTimeUs / 1000000,
            io:format("----> Additional time to propagate all to master: ~p secs.~n", [RegPropagationTime]),

            %% sum
            RegTakenTime = (lists:max(RegRemoteNodesTimes) + RegPropagationTime),
            RegistrationRate = ProcessCount / RegTakenTime,
            io:format("====> Registration rate (with propagation): ~p/sec.~n~n", [RegistrationRate]),
            %% start unregistration
            lists:foreach(fun({Node, FromName, ToName}) ->
                rpc:cast(Node, ?MODULE, unregister_on_node, [CollectorPid, WorkersPerNode, FromName, ToName])
            end, NodesInfo),

            %% wait
            UnregRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
            io:format("----> Remote unregistration times:~n"),
            io:format(" --> MIN: ~p secs.~n", [lists:min(UnregRemoteNodesTimes)]),
            io:format(" --> MAX: ~p secs.~n", [lists:max(UnregRemoteNodesTimes)]),

            {UnregPropagationTimeUs, _} = timer:tc(?MODULE, wait_registry_propagation, [0]),
            UnregPropagationTime = UnregPropagationTimeUs / 1000000,
            io:format("----> Additional time to propagate all to master: ~p secs.~n", [UnregPropagationTime]),

            %% sum
            UnregTakenTime = (lists:max(UnregRemoteNodesTimes) + UnregPropagationTime),
            UnregistrationRate = ProcessCount / UnregTakenTime,
            io:format("====> Unregistration rate (with propagation): ~p/sec.~n~n", [UnregistrationRate]),
            %% start re-registration
            lists:foreach(fun({Node, FromName, _ToName}) ->
                Pids = maps:get(Node, PidsMap),
                rpc:cast(Node, ?MODULE, register_on_node, [CollectorPid, WorkersPerNode, FromName, Pids])
            end, NodesInfo),

            %% wait
            ReRegRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
            io:format("----> Remote re-registration times:~n"),
            io:format(" --> MIN: ~p secs.~n", [lists:min(ReRegRemoteNodesTimes)]),
            io:format(" --> MAX: ~p secs.~n", [lists:max(ReRegRemoteNodesTimes)]),

            {ReRegPropagationTimeUs, _} = timer:tc(?MODULE, wait_registry_propagation, [ProcessCount]),
            ReRegPropagationTime = ReRegPropagationTimeUs / 1000000,
            io:format("----> Additional time to propagate all to master: ~p secs.~n", [ReRegPropagationTime]),

            %% sum
            ReRegTakenTime = (lists:max(ReRegRemoteNodesTimes) + ReRegPropagationTime),
            ReRegistrationRate = ProcessCount / ReRegTakenTime,
            io:format("====> Re-registration rate (with propagation): ~p/sec.~n~n", [ReRegistrationRate]),
            %% kill all processes
            maps:foreach(fun(_Node, Pids) ->
                lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids)
            end, PidsMap),

            %% wait for all to be unregistered
            {RegKillPropagationTimeUs, _} = timer:tc(?MODULE, wait_registry_propagation, [0]),
            RegKillPropagationTime = RegKillPropagationTimeUs / 1000000,
            io:format("----> Time to propagate killed processes to master: ~p secs.~n", [RegKillPropagationTime]),

            RegKillRate = ProcessCount / RegKillPropagationTime,
            io:format("====> Unregistered after kill rate (with propagation): ~p/sec.~n~n", [RegKillRate]);

        true ->
            io:format("~n====> Skipping REGISTRY.~n~n")
    end,
    case SkipGroups of
        false ->
            %% start joining
            lists:foreach(fun({Node, _FromName, _ToName}) ->
                Pids = maps:get(Node, PidsMap),
                rpc:cast(Node, ?MODULE, join_on_node, [CollectorPid, WorkersPerNode, Pids])
            end, NodesInfo),

            %% wait
            JoinRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
            io:format("----> Remote join times:~n"),
            io:format(" --> MIN: ~p secs.~n", [lists:min(JoinRemoteNodesTimes)]),
            io:format(" --> MAX: ~p secs.~n", [lists:max(JoinRemoteNodesTimes)]),

            {JoinPropagationTimeUs, _} = timer:tc(?MODULE, wait_groups_propagation, [ProcessCount]),
            JoinPropagationTime = JoinPropagationTimeUs / 1000000,
            io:format("----> Additional time to propagate all to master: ~p secs.~n", [JoinPropagationTime]),

            %% sum
            JoinTakenTime = (lists:max(JoinRemoteNodesTimes) + JoinPropagationTime),
            JoinRate = ProcessCount / JoinTakenTime,
            io:format("====> Join rate (with propagation): ~p/sec.~n~n", [JoinRate]),
            %% start leaving
            lists:foreach(fun({Node, _FromName, _ToName}) ->
                Pids = maps:get(Node, PidsMap),
                rpc:cast(Node, ?MODULE, leave_on_node, [CollectorPid, WorkersPerNode, Pids])
            end, NodesInfo),

            %% wait
            LeaveRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
            io:format("----> Remote leave times:~n"),
            io:format(" --> MIN: ~p secs.~n", [lists:min(LeaveRemoteNodesTimes)]),
            io:format(" --> MAX: ~p secs.~n", [lists:max(LeaveRemoteNodesTimes)]),

            {LeavePropagationTimeUs, _} = timer:tc(?MODULE, wait_groups_propagation, [0]),
            LeavePropagationTime = LeavePropagationTimeUs / 1000000,
            io:format("----> Additional time to propagate all to master: ~p secs.~n", [LeavePropagationTime]),

            %% sum
            LeaveTakenTime = (lists:max(LeaveRemoteNodesTimes) + LeavePropagationTime),
            LeaveRate = ProcessCount / LeaveTakenTime,
            io:format("====> Leave rate (with propagation): ~p/sec.~n~n", [LeaveRate]),
            %% start re-joining
            lists:foreach(fun({Node, _FromName, _ToName}) ->
                Pids = maps:get(Node, PidsMap),
                rpc:cast(Node, ?MODULE, join_on_node, [CollectorPid, WorkersPerNode, Pids])
            end, NodesInfo),

            %% wait
            ReJoinRemoteNodesTimes = wait_from_all_remote_nodes(nodes(), []),
            io:format("----> Remote re-join times:~n"),
            io:format(" --> MIN: ~p secs.~n", [lists:min(ReJoinRemoteNodesTimes)]),
            io:format(" --> MAX: ~p secs.~n", [lists:max(ReJoinRemoteNodesTimes)]),

            {ReJoinPropagationTimeUs, _} = timer:tc(?MODULE, wait_groups_propagation, [ProcessCount]),
            ReJoinPropagationTime = ReJoinPropagationTimeUs / 1000000,
            io:format("----> Additional time to propagate all to master: ~p secs.~n", [ReJoinPropagationTime]),

            %% sum
            ReJoinTakenTime = (lists:max(ReJoinRemoteNodesTimes) + ReJoinPropagationTime),
            ReJoinRate = ProcessCount / ReJoinTakenTime,
            io:format("====> Re-join rate (with propagation): ~p/sec.~n~n", [ReJoinRate]),
            %% kill all processes
            maps:foreach(fun(_Node, Pids) ->
                lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids)
            end, PidsMap),

            %% wait for all to have left
            {GroupsKillPropagationTimeUs, _} = timer:tc(?MODULE, wait_groups_propagation, [0]),
            GroupsKillPropagationTime = GroupsKillPropagationTimeUs / 1000000,
            io:format("----> Time to propagate killed processes to master: ~p secs.~n", [GroupsKillPropagationTime]),

            GroupsKillRate = ProcessCount / GroupsKillPropagationTime,
            io:format("====> Left after kill rate (with propagation): ~p/sec.~n~n", [GroupsKillRate]);

        true ->
            io:format("~n====> Skipping GROUPS.~n")
    end,
    %% stop node
    init:stop().
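
%% Each phase above reports a rate computed from the slowest remote node time
%% plus the extra propagation time observed on the master. As a worked example
%% with hypothetical numbers: PROCESS_COUNT = 100000, a slowest node time of
%% 2.0 secs and a propagation time of 0.5 secs yield a rate of
%% 100000 / (2.0 + 0.5) = 40000/sec.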
register_on_node(CollectorPid, WorkersPerNode, FromName, Pids) ->
    %% split pids among workers
    PidsPerWorker = round(length(Pids) / WorkersPerNode),
    {WorkerInfo, []} = lists:foldl(fun(I, {WInfo, RPids}) ->
        {WorkerPids, RestOfPids} = case I of
            WorkersPerNode ->
                %% last in the loop, get remaining pids
                {RPids, []};
            _ ->
                %% get portion of pids
                lists:split(PidsPerWorker, RPids)
        end,
        WorkerFromName = FromName + (PidsPerWorker * (I - 1)),
        {[{WorkerFromName, WorkerPids} | WInfo], RestOfPids}
    end, {[], Pids}, lists:seq(1, WorkersPerNode)),

    %% spawn workers
    ReplyPid = self(),
    lists:foreach(fun({WorkerFromName, WorkerPids}) ->
        spawn(fun() ->
            StartAt = os:system_time(millisecond),
            worker_register_on_node(WorkerFromName, WorkerPids),
            Time = (os:system_time(millisecond) - StartAt) / 1000,
            ReplyPid ! {done, Time}
        end)
    end, WorkerInfo),

    %% wait
    Time = wait_done_on_node(CollectorPid, 0, WorkersPerNode),
    io:format("----> Registered on node ~p in ~p secs.~n", [node(), Time]).
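
%% A worked split, with hypothetical numbers: 10 pids, WorkersPerNode = 3 and
%% FromName = 1 give PidsPerWorker = round(10 / 3) = 3, so the workers receive
%% the name ranges 1-3, 4-6 and 7-10; the last worker always takes all of the
%% remaining pids, absorbing any rounding remainder.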
worker_register_on_node(_Name, []) -> ok;
worker_register_on_node(Name, [Pid | PidsTail]) ->
    ok = syn:register(Name, Pid),
    worker_register_on_node(Name + 1, PidsTail).
unregister_on_node(CollectorPid, WorkersPerNode, FromName, ToName) ->
    %% split name ranges among workers
    ProcessesPerNode = ToName - FromName + 1,
    ProcessesPerWorker = round(ProcessesPerNode / WorkersPerNode),
    WorkerInfo = lists:foldl(fun(I, Acc) ->
        {WorkerFromName, WorkerToName} = case I of
            WorkersPerNode ->
                %% last in the loop, take the range up to ToName
                {FromName + (I - 1) * ProcessesPerWorker, ToName};
            _ ->
                {FromName + (I - 1) * ProcessesPerWorker, FromName + I * ProcessesPerWorker - 1}
        end,
        [{WorkerFromName, WorkerToName} | Acc]
    end, [], lists:seq(1, WorkersPerNode)),

    %% spawn workers
    ReplyPid = self(),
    lists:foreach(fun({WorkerFromName, WorkerToName}) ->
        spawn(fun() ->
            StartAt = os:system_time(millisecond),
            worker_unregister_on_node(WorkerFromName, WorkerToName),
            Time = (os:system_time(millisecond) - StartAt) / 1000,
            ReplyPid ! {done, Time}
        end)
    end, WorkerInfo),

    %% wait
    Time = wait_done_on_node(CollectorPid, 0, WorkersPerNode),
    io:format("----> Unregistered on node ~p in ~p secs.~n", [node(), Time]).
worker_unregister_on_node(FromName, ToName) when FromName > ToName -> ok;
worker_unregister_on_node(Name, ToName) ->
    ok = syn:unregister(Name),
    worker_unregister_on_node(Name + 1, ToName).
join_on_node(CollectorPid, WorkersPerNode, Pids) ->
    %% split pids among workers
    PidsPerWorker = round(length(Pids) / WorkersPerNode),
    {WorkersPids, []} = lists:foldl(fun(I, {P, RPids}) ->
        {WPids, RestOfPids} = case I of
            WorkersPerNode ->
                %% last in the loop, get remaining pids
                {RPids, []};
            _ ->
                %% get portion of pids
                lists:split(PidsPerWorker, RPids)
        end,
        {[WPids | P], RestOfPids}
    end, {[], Pids}, lists:seq(1, WorkersPerNode)),

    %% spawn workers
    ReplyPid = self(),
    lists:foreach(fun(WorkerPids) ->
        spawn(fun() ->
            StartAt = os:system_time(millisecond),
            worker_join_on_node(WorkerPids),
            Time = (os:system_time(millisecond) - StartAt) / 1000,
            ReplyPid ! {done, Time}
        end)
    end, WorkersPids),

    %% wait
    Time = wait_done_on_node(CollectorPid, 0, WorkersPerNode),
    io:format("----> Joined on node ~p in ~p secs.~n", [node(), Time]).

worker_join_on_node([]) -> ok;
worker_join_on_node([Pid | PidsTail]) ->
    ok = syn:join(?TEST_GROUP_NAME, Pid),
    worker_join_on_node(PidsTail).
leave_on_node(CollectorPid, WorkersPerNode, Pids) ->
    %% split pids among workers
    PidsPerWorker = round(length(Pids) / WorkersPerNode),
    {WorkersPids, []} = lists:foldl(fun(I, {P, RPids}) ->
        {WPids, RestOfPids} = case I of
            WorkersPerNode ->
                %% last in the loop, get remaining pids
                {RPids, []};
            _ ->
                %% get portion of pids
                lists:split(PidsPerWorker, RPids)
        end,
        {[WPids | P], RestOfPids}
    end, {[], Pids}, lists:seq(1, WorkersPerNode)),

    %% spawn workers
    ReplyPid = self(),
    lists:foreach(fun(WorkerPids) ->
        spawn(fun() ->
            StartAt = os:system_time(millisecond),
            worker_leave_on_node(WorkerPids),
            Time = (os:system_time(millisecond) - StartAt) / 1000,
            ReplyPid ! {done, Time}
        end)
    end, WorkersPids),

    %% wait
    Time = wait_done_on_node(CollectorPid, 0, WorkersPerNode),
    io:format("----> Left on node ~p in ~p secs.~n", [node(), Time]).

worker_leave_on_node([]) -> ok;
worker_leave_on_node([Pid | PidsTail]) ->
    ok = syn:leave(?TEST_GROUP_NAME, Pid),
    worker_leave_on_node(PidsTail).
wait_done_on_node(CollectorPid, Time, 0) ->
    CollectorPid ! {done, node(), Time},
    Time;
wait_done_on_node(CollectorPid, Time, WorkersRemainingCount) ->
    receive
        {done, WorkerTime} ->
            Time1 = lists:max([WorkerTime, Time]),
            wait_done_on_node(CollectorPid, Time1, WorkersRemainingCount - 1)
    end.
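
%% Message flow, informally: each worker sends {done, WorkerTime} to the node
%% coordinator, which keeps the maximum and finally forwards
%% {done, node(), MaxTime} to the master's collector. A node's reported time is
%% therefore that of its slowest worker; wait_from_all_remote_nodes/2 below
%% gathers one such message per slave node.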
start_processes(Count) ->
    start_processes(Count, []).

start_processes(0, Pids) ->
    Pids;
start_processes(Count, Pids) ->
    Pid = spawn(fun process_loop/0),
    start_processes(Count - 1, [Pid | Pids]).

process_loop() ->
    receive
        _ -> ok
    end.
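
%% The spawned test processes are deliberately inert: process_loop/0 blocks
%% until the first message and then returns, ending the process. In the
%% benchmark they are instead terminated with exit(Pid, kill), which is what
%% the "after kill" propagation measurements above rely on.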
wait_from_all_remote_nodes([], Times) -> Times;
wait_from_all_remote_nodes([RemoteNode | Tail], Times) ->
    receive
        {done, RemoteNode, Time} ->
            wait_from_all_remote_nodes(Tail, [Time | Times])
    end.
wait_registry_propagation(DesiredCount) ->
    case syn:registry_count(default) of
        DesiredCount ->
            ok;
        _ ->
            timer:sleep(50),
            wait_registry_propagation(DesiredCount)
    end.

wait_groups_propagation(DesiredCount) ->
    case length(syn:members(?TEST_GROUP_NAME)) of
        DesiredCount ->
            ok;
        _ ->
            timer:sleep(50),
            wait_groups_propagation(DesiredCount)
    end.
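
%% Both helpers poll every 50 ms, so the propagation times measured through
%% them carry up to roughly 50 ms of extra latency; with large PROCESS_COUNT
%% values this is usually negligible compared to the totals being measured.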
start_profiling(NodesInfo) ->
    {Node, _FromName, _ToName} = hd(NodesInfo),
    ok = rpc:call(Node, ?MODULE, start_profiling_on_node, []).

stop_profiling(NodesInfo) ->
    {Node, _FromName, _ToName} = hd(NodesInfo),
    ok = rpc:call(Node, ?MODULE, stop_profiling_on_node, []).

start_profiling_on_node() ->
    {ok, P} = eprof:start(),
    %% profile every process on the node except the eprof server itself
    eprof:start_profiling(erlang:processes() -- [P]),
    ok.

stop_profiling_on_node() ->
    eprof:stop_profiling(),
    eprof:analyze(total),
    ok.
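
%% A minimal usage sketch, with hypothetical values, for profiling the first
%% slave while a phase runs (`NodesInfo` has the shape built in start/0,
%% i.e. [{Node, FromName, ToName} | _]):
%%
%%   NodesInfo = [{'slave_1@bench_host', 1, 100000}],
%%   ok = syn_benchmark:start_profiling(NodesInfo),
%%   %% ... run the phase of interest ...
%%   ok = syn_benchmark:stop_profiling(NodesInfo).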