Browse Source

[optimization] Add multicast pid.

Roberto Ostinelli 3 years ago
parent
commit
369f0b389f
3 changed files with 35 additions and 11 deletions
  1. 3 2
      src/syn.hrl
  2. 31 9
      src/syn_gen_scope.erl
  3. 1 0
      test/syn_registry_SUITE.erl

+ 3 - 2
src/syn.hrl

@@ -62,6 +62,7 @@
     handler = undefined :: undefined | module(),
     handler_state = undefined :: any(),
     scope = default :: atom(),
-    process_name = syn_registry_default :: atom(),
-    nodes = #{} :: #{node() => pid()}
+    process_name :: atom(),
+    nodes = #{} :: #{node() => pid()},
+    multicast_pid :: pid()
 }).

+ 31 - 9
src/syn_gen_scope.erl

@@ -49,6 +49,9 @@
     code_change/3
 ]).
 
+%% internal
+-export([multicast_loop/0]).
+
 %% callbacks
 -callback init(Args :: term()) ->
     {ok, State :: term()}.
@@ -102,16 +105,12 @@ broadcast(Message, State) ->
     broadcast(Message, [], State).
 
 -spec broadcast(Message :: any(), ExcludedNodes :: [node()], #state{}) -> any().
-broadcast(Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}) ->
-    lists:foreach(fun(RemoteNode) ->
-        erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
-    end, maps:keys(Nodes) -- ExcludedNodes).
+broadcast(Message, ExcludedNodes, #state{multicast_pid = MulticastPid} = State) ->
+    MulticastPid ! {broadcast, Message, ExcludedNodes, State}.
 
 -spec broadcast_all_cluster(Message :: any(), #state{}) -> any().
-broadcast_all_cluster(Message, #state{process_name = ProcessName}) ->
-    lists:foreach(fun(RemoteNode) ->
-        erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
-    end, nodes()).
+broadcast_all_cluster(Message, #state{multicast_pid = MulticastPid} = State) ->
+    MulticastPid ! {broadcast_all_cluster, Message, State}.
 
 -spec send_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
 send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
@@ -131,6 +130,8 @@ send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
     {stop, Reason :: any()} |
     {continue, any()}.
 init([Handler, Scope, ProcessName, Args]) ->
+    %% start multicast process
+    MulticastPid = spawn_link(?MODULE, multicast_loop, []),
     %% call init
     {ok, HandlerState} = Handler:init(Args),
     %% monitor nodes
@@ -140,7 +141,8 @@ init([Handler, Scope, ProcessName, Args]) ->
         handler = Handler,
         handler_state = HandlerState,
         scope = Scope,
-        process_name = ProcessName
+        process_name = ProcessName,
+        multicast_pid = MulticastPid
     },
     {ok, State, {continue, after_init}}.
 
@@ -296,3 +298,23 @@ get_process_name_for_scope(Handler, Scope) ->
     ModuleBin = atom_to_binary(Handler),
     ScopeBin = atom_to_binary(Scope),
     binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).
+
+
+-spec multicast_loop() -> terminated.
+multicast_loop() ->
+    receive
+        {broadcast, Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}} ->
+            lists:foreach(fun(RemoteNode) ->
+                erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
+            end, maps:keys(Nodes) -- ExcludedNodes),
+            multicast_loop();
+
+        {broadcast_all_cluster, Message, #state{process_name = ProcessName}} ->
+            lists:foreach(fun(RemoteNode) ->
+                erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
+            end, nodes()),
+            multicast_loop();
+
+        terminate ->
+            terminated
+    end.

+ 1 - 0
test/syn_registry_SUITE.erl

@@ -828,6 +828,7 @@ three_nodes_cluster_conflicts(Config) ->
     syn_test_suite_helper:assert_cluster(node(), [SlaveNode1, SlaveNode2]),
     syn_test_suite_helper:assert_cluster(SlaveNode1, [node(), SlaveNode2]),
     syn_test_suite_helper:assert_cluster(SlaveNode2, [node(), SlaveNode1]),
+    timer:sleep(500),
 
     %% retrieve
     {Pid2RemoteOn2, "meta-2"} = syn:lookup("proc-confict"),