syn_benchmark.erl 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_benchmark).
  27. %% API
  28. -export([start/0]).
  29. -export([register/1, unregister/1]).
  30. -export([register_on_node/1, unregister_on_node/1]).
  31. -export([process_loop/0]).
  32. %% macros
  33. -define(MAX_RETRIEVE_WAITING_TIME, 60000).
  34. %% ===================================================================
  35. %% API
  36. %% ===================================================================
  37. start() ->
  38. %% init
  39. NodeCount = list_to_integer(os:getenv("SYN_BENCH_NODE_COUNT", "4")),
  40. ProcessCount = list_to_integer(os:getenv("SYN_PROCESS_COUNT", "100000")),
  41. %% start nodes
  42. SlaveNodes = lists:foldl(fun(Count, Acc) ->
  43. ShortName = list_to_atom("syn_slave_" ++ integer_to_list(Count)),
  44. {ok, SlaveNode} = syn_test_suite_helper:start_slave(ShortName),
  45. [SlaveNode | Acc]
  46. end, [], lists:seq(1, NodeCount - 1)),
  47. io:format("-----> Started ~p nodes: ~p~n", [NodeCount, [node() | SlaveNodes]]),
  48. %% start syn
  49. lists:foreach(fun(Node) ->
  50. ok = rpc:call(Node, syn, start, [])
  51. end, [node() | nodes()]),
  52. timer:sleep(1000),
  53. try
  54. %% launch processes
  55. {UpperName, PidInfos} = launch_processes(ProcessCount),
  56. %% benchmark: register
  57. {TimeReg, _} = timer:tc(?MODULE, register, [PidInfos]),
  58. io:format("-----> Registered processes in ~p sec, at a rate of ~p/sec~n", [
  59. TimeReg / 1000000,
  60. ProcessCount / TimeReg * 1000000
  61. ]),
  62. %% benchmark: registration propagation
  63. {RetrievedInMs1, RetrieveProcess1} = retrieve(pid, UpperName),
  64. io:format("-----> Check that process with Name ~p was found: ~p in ~p ms~n", [
  65. UpperName, RetrieveProcess1, RetrievedInMs1
  66. ]),
  67. %% benchmark: unregister
  68. {TimeUnreg, _} = timer:tc(?MODULE, unregister, [PidInfos]),
  69. io:format("-----> Unregistered processes in ~p sec, at a rate of ~p/sec~n", [
  70. TimeUnreg / 1000000,
  71. ProcessCount / TimeUnreg * 1000000
  72. ]),
  73. %% benchmark: unregistration propagation
  74. {RetrievedInMs2, RetrieveProcess2} = retrieve(undefined, UpperName),
  75. io:format("-----> Check that process with Name ~p was NOT found: ~p in ~p ms~n", [
  76. UpperName, RetrieveProcess2, RetrievedInMs2
  77. ]),
  78. %% benchmark: re-registering
  79. {TimeReg2, _} = timer:tc(?MODULE, register, [PidInfos]),
  80. io:format("-----> Re-registered processes in ~p sec, at a rate of ~p/sec~n", [
  81. TimeReg2 / 1000000,
  82. ProcessCount / TimeReg2 * 1000000
  83. ]),
  84. %% benchmark: re-registration propagation
  85. {RetrievedInMs3, RetrieveProcess3} = retrieve(pid, UpperName),
  86. io:format("-----> Check that process with Name ~p was found: ~p in ~p ms~n", [
  87. UpperName, RetrieveProcess3, RetrievedInMs3
  88. ]),
  89. %% benchmark: monitoring
  90. kill_processes(PidInfos),
  91. {RetrievedInMs4, RetrieveProcess4} = retrieve(undefined, UpperName),
  92. io:format("-----> Check that process with Name ~p was NOT found: ~p in ~p ms~n", [
  93. UpperName, RetrieveProcess4, RetrievedInMs4
  94. ])
  95. after
  96. %% stop syn
  97. lists:foreach(fun(Node) ->
  98. ok = rpc:call(Node, syn, stop, [])
  99. end, [node() | nodes()]),
  100. timer:sleep(1000),
  101. %% stop nodes
  102. lists:foreach(fun(SlaveNode) ->
  103. syn_test_suite_helper:connect_node(SlaveNode),
  104. ShortName = list_to_atom(lists:nth(1, string:split(atom_to_list(SlaveNode), "@"))),
  105. syn_test_suite_helper:stop_slave(ShortName)
  106. end, SlaveNodes),
  107. io:format("-----> Stopped ~p nodes: ~p~n", [length(SlaveNodes) + 1, [node() | SlaveNodes]]),
  108. %% stop node
  109. init:stop()
  110. end.
  111. %% ===================================================================
  112. %% Internal
  113. %% ===================================================================
  114. process_loop() ->
  115. receive
  116. _ -> ok
  117. end.
  118. launch_processes(ProcessCount) ->
  119. %% return the processes info in format [{Node, [{Name, Pid}]}, ...]
  120. Nodes = [node() | nodes()],
  121. ProcessesPerNode = round(ProcessCount / length(Nodes)),
  122. UpperName = integer_to_list(ProcessesPerNode * length(Nodes)),
  123. F = fun(Node, Acc) ->
  124. StartingName = length(Acc) * ProcessesPerNode,
  125. Pids = launch_processes_on_node(ProcessesPerNode, StartingName, Node),
  126. [{Node, Pids} | Acc]
  127. end,
  128. {UpperName, lists:foldl(F, [], Nodes)}.
  129. launch_processes_on_node(ProcessesPerNode, StartingName, Node) ->
  130. %% return the name and process in a list of format [{Name, Pid}, ...]
  131. Seq = [
  132. integer_to_list(Name)
  133. || Name <- lists:seq(StartingName + 1, ProcessesPerNode + StartingName)
  134. ],
  135. [{Name, spawn(Node, ?MODULE, process_loop, [])} || Name <- Seq].
  136. register(PidInfos) ->
  137. %% register in parallel on all nodes
  138. F = fun({Node, NodePidInfos}, Acc) ->
  139. RpcKey = rpc:async_call(Node, ?MODULE, register_on_node, [NodePidInfos]),
  140. [{Node, RpcKey} | Acc]
  141. end,
  142. RpcKeys = lists:foldl(F, [], PidInfos),
  143. %% wait for registration to complete on all nodes
  144. FResult = fun({Node, RpcKey}) ->
  145. Registered = rpc:yield(RpcKey),
  146. io:format(" Registered ~p processes on node ~p~n", [Registered, Node])
  147. end,
  148. lists:foreach(FResult, RpcKeys).
  149. register_on_node(NodePidInfos) ->
  150. F = fun({Name, Pid}) ->
  151. syn:register(Name, Pid)
  152. end,
  153. lists:foreach(F, NodePidInfos),
  154. length(NodePidInfos).
  155. retrieve(Expected, Name) ->
  156. StartTime = epoch_time_ms(),
  157. retrieve(Expected, Name, StartTime).
  158. retrieve(pid, Name, StartTime) ->
  159. %% wait for a pid to be returned
  160. case syn:whereis(Name) of
  161. undefined ->
  162. timer:sleep(50),
  163. case epoch_time_ms() > StartTime + ?MAX_RETRIEVE_WAITING_TIME of
  164. true -> {error, timeout_during_retrieve};
  165. false -> retrieve(pid, Name, StartTime)
  166. end;
  167. Pid ->
  168. RetrievedInMs = epoch_time_ms() - StartTime,
  169. {RetrievedInMs, Pid}
  170. end;
  171. retrieve(undefined, Name, StartTime) ->
  172. %% wait for undefined to be returned
  173. case syn:whereis(Name) of
  174. undefined ->
  175. RetrievedInMs = epoch_time_ms() - StartTime,
  176. {RetrievedInMs, undefined};
  177. _Pid ->
  178. timer:sleep(50),
  179. case epoch_time_ms() > StartTime + ?MAX_RETRIEVE_WAITING_TIME of
  180. true -> {error, timeout_during_retrieve};
  181. false -> retrieve(undefined, Name, StartTime)
  182. end
  183. end.
  184. unregister(PidInfos) ->
  185. %% unregister in parallel on all nodes
  186. F = fun({Node, NodePidInfos}, Acc) ->
  187. RpcKey = rpc:async_call(Node, ?MODULE, unregister_on_node, [NodePidInfos]),
  188. [{Node, RpcKey} | Acc]
  189. end,
  190. RpcKeys = lists:foldl(F, [], PidInfos),
  191. %% wait for unregistration to complete on all nodes
  192. FResult = fun({Node, RpcKey}) ->
  193. Unregistered = rpc:yield(RpcKey),
  194. io:format(" Unregistered ~p processes on node ~p~n", [Unregistered, Node])
  195. end,
  196. lists:foreach(FResult, RpcKeys).
  197. unregister_on_node(NodePidInfos) ->
  198. F = fun({Name, _Pid}) ->
  199. syn:unregister(Name)
  200. end,
  201. lists:foreach(F, NodePidInfos),
  202. length(NodePidInfos).
  203. kill_processes(PidInfos) ->
  204. F = fun({_Node, NodePidInfos}) ->
  205. [exit(Pid, kill) || {_Name, Pid} <- NodePidInfos]
  206. end,
  207. lists:foreach(F, PidInfos).
  208. epoch_time_ms() ->
  209. {Mega, Sec, Micro} = os:timestamp(),
  210. (Mega * 1000000 + Sec) * 1000 + round(Micro / 1000).