Просмотр исходного кода

Use a linked process to multicast instead of spawning everytime.

Roberto Ostinelli 5 лет назад
Родитель
Сommit
3cd9694076
3 измененных файлов с 105 добавлено и 47 удалено
  1. 47 22
      src/syn_groups.erl
  2. 57 24
      src/syn_registry.erl
  3. 1 1
      test/syn_groups_SUITE.erl

+ 47 - 22
src/syn_groups.erl

@@ -45,6 +45,9 @@
 -export([force_cluster_sync/0]).
 -export([remove_from_local_table/2]).
 
+%% internal
+-export([multicast_loop/0]).
+
 %% tests
 -ifdef(TEST).
 -export([add_to_local_table/4]).
@@ -60,7 +63,8 @@
 -record(state, {
     custom_event_handler :: undefined | module(),
     anti_entropy_interval_ms :: undefined | non_neg_integer(),
-    anti_entropy_interval_max_deviation_ms :: undefined | non_neg_integer()
+    anti_entropy_interval_max_deviation_ms :: undefined | non_neg_integer(),
+    multicast_pid :: undefined | pid()
 }).
 
 %% macros
@@ -240,6 +244,8 @@ init([]) ->
     ok = net_kernel:monitor_nodes(true),
     %% rebuild
     rebuild_monitors(),
+    %% start multicast process
+    MulticastPid = spawn_link(?MODULE, multicast_loop, []),
     %% get handler
     CustomEventHandler = syn_backbone:get_event_handler_module(),
     %% get anti-entropy interval
@@ -248,7 +254,8 @@ init([]) ->
     State = #state{
         custom_event_handler = CustomEventHandler,
         anti_entropy_interval_ms = AntiEntropyIntervalMs,
-        anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
+        anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs,
+        multicast_pid = MulticastPid
     },
     %% send message to initiate full cluster sync
     timer:send_after(0, self(), sync_from_full_cluster),
@@ -274,7 +281,7 @@ handle_call({join_on_node, GroupName, Pid, Meta}, _From, State) ->
         true ->
             join_on_node(GroupName, Pid, Meta),
             %% multicast
-            multicast_join(GroupName, Pid, Meta),
+            multicast_join(GroupName, Pid, Meta, State),
             %% return
             {reply, ok, State};
         _ ->
@@ -285,7 +292,7 @@ handle_call({leave_on_node, GroupName, Pid}, _From, State) ->
     case leave_on_node(GroupName, Pid) of
         ok ->
             %% multicast
-            multicast_leave(GroupName, Pid),
+            multicast_leave(GroupName, Pid, State),
             %% return
             {reply, ok, State};
         {error, Reason} ->
@@ -347,7 +354,7 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
                 %% handle
                 handle_process_down(GroupName, Pid, Meta, Reason, State),
                 %% multicast
-                multicast_leave(GroupName, Pid)
+                multicast_leave(GroupName, Pid, State)
             end, GroupTuples)
     end,
     %% return
@@ -394,8 +401,11 @@ handle_info(Info, State) ->
 %% Terminate
 %% ----------------------------------------------------------------------------------------------------------
 -spec terminate(Reason :: any(), #state{}) -> terminated.
-terminate(Reason, _State) ->
+terminate(Reason, #state{
+    multicast_pid = MulticastPid
+}) ->
     error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", [node(), Reason]),
+    MulticastPid ! terminate,
     terminated.
 
 %% ----------------------------------------------------------------------------------------------------------
@@ -408,21 +418,17 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec multicast_join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> pid().
-multicast_join(GroupName, Pid, Meta) ->
-    spawn_link(fun() ->
-        lists:foreach(fun(RemoteNode) ->
-            sync_join(RemoteNode, GroupName, Pid, Meta)
-        end, nodes())
-    end).
-
--spec multicast_leave(GroupName :: any(), Pid :: pid()) -> pid().
-multicast_leave(GroupName, Pid) ->
-    spawn_link(fun() ->
-        lists:foreach(fun(RemoteNode) ->
-            sync_leave(RemoteNode, GroupName, Pid)
-        end, nodes())
-    end).
+-spec multicast_join(GroupName :: any(), Pid :: pid(), Meta :: any(), #state{}) -> pid().
+multicast_join(GroupName, Pid, Meta, #state{
+    multicast_pid = MulticastPid
+}) ->
+    MulticastPid ! {multicast_join, GroupName, Pid, Meta}.
+
+-spec multicast_leave(GroupName :: any(), Pid :: pid(), #state{}) -> pid().
+multicast_leave(GroupName, Pid, #state{
+    multicast_pid = MulticastPid
+}) ->
+    MulticastPid ! {multicast_leave, GroupName, Pid}.
 
 -spec join_on_node(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
 join_on_node(GroupName, Pid, Meta) ->
@@ -633,7 +639,7 @@ rebuild_monitors() ->
             true ->
                 join_on_node(GroupName, Pid, Meta);
             _ ->
-                multicast_leave(GroupName, Pid)
+                remove_from_local_table(GroupName, Pid)
         end
     end, GroupTuples).
 
@@ -646,3 +652,22 @@ set_timer_for_anti_entropy(#state{
     IntervalMs = round(AntiEntropyIntervalMs + rand:uniform() * AntiEntropyIntervalMaxDeviationMs),
     {ok, _} = timer:send_after(IntervalMs, self(), sync_anti_entropy),
     ok.
+
+-spec multicast_loop() -> terminated.
+multicast_loop() ->
+    receive
+        {multicast_join, GroupName, Pid, Meta} ->
+            lists:foreach(fun(RemoteNode) ->
+                sync_join(RemoteNode, GroupName, Pid, Meta)
+            end, nodes()),
+            multicast_loop();
+
+        {multicast_leave, GroupName, Pid} ->
+            lists:foreach(fun(RemoteNode) ->
+                sync_leave(RemoteNode, GroupName, Pid)
+            end, nodes()),
+            multicast_loop();
+
+        terminate ->
+            terminated
+    end.

+ 57 - 24
src/syn_registry.erl

@@ -42,6 +42,9 @@
 -export([add_to_local_table/5, remove_from_local_table/2]).
 -export([find_monitor_for_pid/1]).
 
+%% internal
+-export([multicast_loop/0]).
+
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
@@ -49,7 +52,8 @@
 -record(state, {
     custom_event_handler :: undefined | module(),
     anti_entropy_interval_ms :: undefined | non_neg_integer(),
-    anti_entropy_interval_max_deviation_ms :: undefined | non_neg_integer()
+    anti_entropy_interval_max_deviation_ms :: undefined | non_neg_integer(),
+    multicast_pid :: undefined | pid()
 }).
 
 %% includes
@@ -173,6 +177,8 @@ init([]) ->
     ok = net_kernel:monitor_nodes(true),
     %% rebuild monitors (if coming after a crash)
     rebuild_monitors(),
+    %% start multicast process
+    MulticastPid = spawn_link(?MODULE, multicast_loop, []),
     %% get handler
     CustomEventHandler = syn_backbone:get_event_handler_module(),
     %% get anti-entropy interval
@@ -181,7 +187,8 @@ init([]) ->
     State = #state{
         custom_event_handler = CustomEventHandler,
         anti_entropy_interval_ms = AntiEntropyIntervalMs,
-        anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs
+        anti_entropy_interval_max_deviation_ms = AntiEntropyIntervalMaxDeviationMs,
+        multicast_pid = MulticastPid
     },
     %% send message to initiate full cluster sync
     timer:send_after(0, self(), sync_from_full_cluster),
@@ -211,7 +218,7 @@ handle_call({register_on_node, Name, Pid, Meta, Force}, _From, State) ->
                     %% available
                     {ok, Time} = register_on_node(Name, Pid, Meta),
                     %% multicast
-                    multicast_register(Name, Pid, Meta, Time, false),
+                    multicast_register(Name, Pid, Meta, Time, false, State),
                     %% return
                     {reply, ok, State};
 
@@ -219,7 +226,7 @@ handle_call({register_on_node, Name, Pid, Meta, Force}, _From, State) ->
                     % same pid, overwrite
                     {ok, Time} = register_on_node(Name, Pid, Meta),
                     %% multicast
-                    multicast_register(Name, Pid, Meta, Time, false),
+                    multicast_register(Name, Pid, Meta, Time, false, State),
                     %% return
                     {reply, ok, State};
 
@@ -231,7 +238,7 @@ handle_call({register_on_node, Name, Pid, Meta, Force}, _From, State) ->
                             %% force register
                             {ok, Time} = register_on_node(Name, Pid, Meta),
                             %% multicast
-                            multicast_register(Name, Pid, Meta, Time, true),
+                            multicast_register(Name, Pid, Meta, Time, true, State),
                             %% return
                             {reply, ok, State};
 
@@ -246,7 +253,7 @@ handle_call({register_on_node, Name, Pid, Meta, Force}, _From, State) ->
 handle_call({unregister_on_node, Name}, _From, State) ->
     case unregister_on_node(Name) of
         {ok, RemovedPid} ->
-            multicast_unregister(Name, RemovedPid),
+            multicast_unregister(Name, RemovedPid, State),
             %% return
             {reply, ok, State};
         {error, Reason} ->
@@ -388,7 +395,7 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
                 %% remove from table
                 remove_from_local_table(Name, Pid),
                 %% multicast
-                multicast_unregister(Name, Pid)
+                multicast_unregister(Name, Pid, State)
             end, Entries)
     end,
     %% return
@@ -435,8 +442,11 @@ handle_info(Info, State) ->
 %% Terminate
 %% ----------------------------------------------------------------------------------------------------------
 -spec terminate(Reason :: any(), #state{}) -> terminated.
-terminate(Reason, _State) ->
+terminate(Reason, #state{
+    multicast_pid = MulticastPid
+}) ->
     error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", [node(), Reason]),
+    MulticastPid ! terminate,
     terminated.
 
 %% ----------------------------------------------------------------------------------------------------------
@@ -449,21 +459,24 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any(), Time :: integer(), Force :: boolean()) -> pid().
-multicast_register(Name, Pid, Meta, Time, Force) ->
-    spawn_link(fun() ->
-        lists:foreach(fun(RemoteNode) ->
-            sync_register(RemoteNode, Name, Pid, Meta, Time, Force)
-        end, nodes())
-    end).
-
--spec multicast_unregister(Name :: any(), Pid :: pid()) -> pid().
-multicast_unregister(Name, Pid) ->
-    spawn_link(fun() ->
-        lists:foreach(fun(RemoteNode) ->
-            sync_unregister(RemoteNode, Name, Pid)
-        end, nodes())
-    end).
+-spec multicast_register(
+    Name :: any(),
+    Pid :: pid(),
+    Meta :: any(),
+    Time :: integer(),
+    Force :: boolean(),
+    #state{}
+) -> pid().
+multicast_register(Name, Pid, Meta, Time, Force, #state{
+    multicast_pid = MulticastPid
+}) ->
+    MulticastPid ! {multicast_register, Name, Pid, Meta, Time, Force}.
+
+-spec multicast_unregister(Name :: any(), Pid :: pid(), #state{}) -> pid().
+multicast_unregister(Name, Pid, #state{
+    multicast_pid = MulticastPid
+}) ->
+    MulticastPid ! {multicast_unregister, Name, Pid}.
 
 -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> {ok, Time :: integer()}.
 register_on_node(Name, Pid, Meta) ->
@@ -538,7 +551,8 @@ add_to_local_table(Name, Pid, Meta, Time, MonitorRef) ->
 -spec remove_from_local_table(Name :: any(), Pid :: pid()) -> ok.
 remove_from_local_table(Name, Pid) ->
     ets:delete(syn_registry_by_name, {Name, Pid}),
-    ets:delete(syn_registry_by_pid, {Pid, Name}).
+    ets:delete(syn_registry_by_pid, {Pid, Name}),
+    ok.
 
 -spec find_registry_tuple_by_name(Name :: any()) -> RegistryTuple :: syn_registry_tuple() | undefined.
 find_registry_tuple_by_name(Name) ->
@@ -807,3 +821,22 @@ demonitor_if_local(Pid) ->
         _ ->
             ok
     end.
+
+-spec multicast_loop() -> terminated.
+multicast_loop() ->
+    receive
+        {multicast_register, Name, Pid, Meta, Time, Force} ->
+            lists:foreach(fun(RemoteNode) ->
+                sync_register(RemoteNode, Name, Pid, Meta, Time, Force)
+            end, nodes()),
+            multicast_loop();
+
+        {multicast_unregister, Name, Pid} ->
+            lists:foreach(fun(RemoteNode) ->
+                sync_unregister(RemoteNode, Name, Pid)
+            end, nodes()),
+            multicast_loop();
+
+        terminate ->
+            terminated
+    end.

+ 1 - 1
test/syn_groups_SUITE.erl

@@ -1095,7 +1095,7 @@ three_nodes_full_netsplit_consistency(Config) ->
     timer:sleep(2000),
     %% leave 0Changed
     ok = syn:leave(GroupName, Pid0Changed),
-    timer:sleep(250),
+    timer:sleep(500),
     %% retrieve local
     true = lists:sort([Pid0]) =:= lists:sort(syn:get_members(GroupName)),
     true = lists:sort([