Просмотр исходного кода

Sync in spawned processes to avoid blocking main.

Roberto Ostinelli 5 лет назад
Родитель
Сommit
adf09630ff
2 измененных файлов с 39 добавлено и 22 удалено
  1. 19 9
      src/syn_groups.erl
  2. 20 13
      src/syn_registry.erl

+ 19 - 9
src/syn_groups.erl

@@ -237,9 +237,7 @@ handle_call({join_on_node, GroupName, Pid, Meta}, _From, State) ->
         true ->
             join_on_node(GroupName, Pid, Meta),
             %% multicast
-            lists:foreach(fun(RemoteNode) ->
-                sync_join(RemoteNode, GroupName, Pid, Meta)
-            end, nodes()),
+            multicast_join(GroupName, Pid, Meta),
             %% return
             {reply, ok, State};
         _ ->
@@ -250,9 +248,7 @@ handle_call({leave_on_node, GroupName, Pid}, _From, State) ->
     case leave_on_node(GroupName, Pid) of
         ok ->
             %% multicast
-            lists:foreach(fun(RemoteNode) ->
-                sync_leave(RemoteNode, GroupName, Pid)
-            end, nodes()),
+            multicast_leave(GroupName, Pid),
             %% return
             {reply, ok, State};
         {error, Reason} ->
@@ -311,9 +307,7 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
                 %% remove from table
                 remove_from_local_table(Entry),
                 %% multicast
-                lists:foreach(fun(RemoteNode) ->
-                    sync_leave(RemoteNode, GroupName, Pid)
-                end, nodes())
+                multicast_leave(GroupName, Pid)
             end, Entries)
     end,
     %% return
@@ -365,6 +359,22 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
+-spec multicast_join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
+multicast_join(GroupName, Pid, Meta) ->
+    spawn_link(fun() ->
+        lists:foreach(fun(RemoteNode) ->
+            sync_join(RemoteNode, GroupName, Pid, Meta)
+        end, nodes())
+    end).
+
+-spec multicast_leave(GroupName :: any(), Pid :: pid()) -> ok.
+multicast_leave(GroupName, Pid) ->
+    spawn_link(fun() ->
+        lists:foreach(fun(RemoteNode) ->
+            sync_leave(RemoteNode, GroupName, Pid)
+        end, nodes())
+    end).
+
 -spec join_on_node(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
 join_on_node(GroupName, Pid, Meta) ->
     MonitorRef = case find_processes_entry_by_pid(Pid) of

+ 20 - 13
src/syn_registry.erl

@@ -176,18 +176,14 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
                 undefined ->
                     register_on_node(Name, Pid, Meta),
                     %% multicast
-                    lists:foreach(fun(RemoteNode) ->
-                        sync_register(RemoteNode, Name, Pid, Meta)
-                    end, nodes()),
+                    multicast_register(Name, Pid, Meta),
                     %% return
                     {reply, ok, State};
 
                 Entry when Entry#syn_registry_table.pid == Pid ->
                     register_on_node(Name, Pid, Meta),
                     %% multicast
-                    lists:foreach(fun(RemoteNode) ->
-                        sync_register(RemoteNode, Name, Pid, Meta)
-                    end, nodes()),
+                    multicast_register(Name, Pid, Meta),
                     %% return
                     {reply, ok, State};
 
@@ -201,10 +197,7 @@ handle_call({register_on_node, Name, Pid, Meta}, _From, State) ->
 handle_call({unregister_on_node, Name}, _From, State) ->
     case unregister_on_node(Name) of
         ok ->
-            %% multicast
-            lists:foreach(fun(RemoteNode) ->
-                sync_unregister(RemoteNode, Name)
-            end, nodes()),
+            multicast_unregister(Name),
             %% return
             {reply, ok, State};
         {error, Reason} ->
@@ -265,9 +258,7 @@ handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
                 %% remove from table
                 remove_from_local_table(Name),
                 %% multicast
-                lists:foreach(fun(RemoteNode) ->
-                    sync_unregister(RemoteNode, Name)
-                end, nodes())
+                multicast_unregister(Name)
             end, Entries)
     end,
     %% return
@@ -319,6 +310,22 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
+-spec multicast_register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
+multicast_register(Name, Pid, Meta) ->
+    spawn_link(fun() ->
+        lists:foreach(fun(RemoteNode) ->
+            sync_register(RemoteNode, Name, Pid, Meta)
+        end, nodes())
+    end).
+
+-spec multicast_unregister(Name :: any()) -> ok.
+multicast_unregister(Name) ->
+    spawn_link(fun() ->
+        lists:foreach(fun(RemoteNode) ->
+            sync_unregister(RemoteNode, Name)
+        end, nodes())
+    end).
+
 -spec register_on_node(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
 register_on_node(Name, Pid, Meta) ->
     MonitorRef = case find_processes_entry_by_pid(Pid) of