Browse Source

Allow to publish only to local groups.

Roberto Ostinelli 5 years ago
parent
commit
03500a0654
3 changed files with 104 additions and 14 deletions
  1. 5 0
      src/syn.erl
  2. 10 0
      src/syn_groups.erl
  3. 89 14
      test/syn_groups_SUITE.erl

+ 5 - 0
src/syn.erl

@@ -38,6 +38,7 @@
 -export([get_local_members/1, get_local_members/2]).
 -export([local_member/2]).
 -export([publish/2]).
+-export([publish_to_local/2]).
 
 %% gen_server via interface
 -export([register_name/2, unregister_name/1, whereis_name/1, send/2]).
@@ -152,3 +153,7 @@ local_member(GroupName, Pid) ->
 -spec publish(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
 publish(Name, Message) ->
     syn_groups:publish(Name, Message).
+
+-spec publish_to_local(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish_to_local(Name, Message) ->
+    syn_groups:publish_to_local(Name, Message).

+ 10 - 0
src/syn_groups.erl

@@ -35,6 +35,7 @@
 -export([get_local_members/1, get_local_members/2]).
 -export([local_member/2]).
 -export([publish/2]).
+-export([publish_to_local/2]).
 
 %% sync API
 -export([sync_join/3, sync_leave/2]).
@@ -142,6 +143,15 @@ publish(GroupName, Message) ->
     lists:foreach(FSend, MemberPids),
     {ok, length(MemberPids)}.
 
+-spec publish_to_local(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish_to_local(Name, Message) ->
+    MemberPids = get_local_members(Name),
+    FSend = fun(Pid) ->
+        Pid ! Message
+    end,
+    lists:foreach(FSend, MemberPids),
+    {ok, length(MemberPids)}.
+
 -spec sync_join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
 sync_join(GroupName, Pid, Meta) ->
     gen_server:cast(?MODULE, {sync_join, GroupName, Pid, Meta}).

+ 89 - 14
test/syn_groups_SUITE.erl

@@ -41,7 +41,8 @@
 -export([
     two_nodes_join_monitor_and_unregister/1,
     two_nodes_local_members/1,
-    two_nodes_publish/1
+    two_nodes_publish/1,
+    two_nodes_local_publish/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -65,9 +66,9 @@
 %% -------------------------------------------------------------------
 all() ->
     [
-        {group, single_node_groups},
-        {group, two_nodes_groups},
-        {group, three_nodes_groups}
+        {group, single_node_groups}
+%%        {group, two_nodes_groups}
+%%        {group, three_nodes_groups}
     ].
 
 %% -------------------------------------------------------------------
@@ -85,15 +86,17 @@ all() ->
 groups() ->
     [
         {single_node_groups, [shuffle], [
-            single_node_join_and_monitor,
-            single_node_join_and_leave,
-            single_node_join_errors,
-            single_node_publish
+%%            single_node_join_and_monitor,
+%%            single_node_join_and_leave,
+%%            single_node_join_errors,
+%%            single_node_publish,
+            single_node_multicall
         ]},
         {two_nodes_groups, [shuffle], [
-            two_nodes_join_monitor_and_unregister,
-            two_nodes_local_members,
-            two_nodes_publish
+%%            two_nodes_join_monitor_and_unregister,
+%%            two_nodes_local_members,
+%%            two_nodes_publish,
+%%            two_nodes_local_publish,
         ]},
         {three_nodes_groups, [shuffle], [
             three_nodes_partial_netsplit_consistency,
@@ -492,7 +495,7 @@ two_nodes_publish(Config) ->
     receive
         {received, LocalPid, Message} -> ok
     after 2000 ->
-        ok = published_message_was_not_received_by_local_pid_1
+        ok = published_message_was_not_received_by_local_pid
     end,
     receive
         {received, LocalPid2, Message} -> ok
@@ -502,18 +505,90 @@ two_nodes_publish(Config) ->
     receive
         {received, RemotePid, Message} -> ok
     after 2000 ->
-        ok = published_message_was_not_received_by_remote_pid_2
+        ok = published_message_was_not_received_by_remote_pid
     end,
     receive
         {received, RemotePid2, Message} -> ok
     after 2000 ->
         ok = published_message_was_not_received_by_remote_pid_2
     end,
+    receive
+        {received, OtherPid, Message} ->
+            ok = published_message_was_received_by_other_pid
+    after 250 ->
+        ok
+    end,
+    %% kill processes
+    syn_test_suite_helper:kill_process(LocalPid),
+    syn_test_suite_helper:kill_process(LocalPid2),
+    syn_test_suite_helper:kill_process(RemotePid),
+    syn_test_suite_helper:kill_process(RemotePid2),
+    syn_test_suite_helper:kill_process(OtherPid).
+
+two_nodes_local_publish(Config) ->
+    GroupName = "my group",
+    Message = {test, message},
+    %% get slave
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% start
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(100),
+    %% start processes
+    ResultPid = self(),
+    F = fun() ->
+        receive
+            Message -> ResultPid ! {received, self(), Message}
+        end
+    end,
+    LocalPid = syn_test_suite_helper:start_process(F),
+    LocalPid2 = syn_test_suite_helper:start_process(F),
+    RemotePid = syn_test_suite_helper:start_process(SlaveNode, F),
+    RemotePid2 = syn_test_suite_helper:start_process(SlaveNode, F),
+    OtherPid = syn_test_suite_helper:start_process(F),
+    %% join
+    ok = syn:join(GroupName, LocalPid),
+    ok = syn:join(GroupName, LocalPid2),
+    ok = syn:join(GroupName, RemotePid),
+    ok = syn:join(GroupName, RemotePid2),
+    timer:sleep(200),
+    %% send
+    {ok, 2} = syn:publish_to_local(GroupName, Message),
+    %% check
+    receive
+        {received, LocalPid, Message} -> ok
+    after 2000 ->
+        ok = published_message_was_not_received_by_local_pid
+    end,
+    receive
+        {received, LocalPid2, Message} -> ok
+    after 2000 ->
+        ok = published_message_was_not_received_by_local_pid_2
+    end,
+    receive
+        {received, RemotePid, Message} ->
+            ok = published_message_was_received_by_remote_pid
+    after 250 ->
+        ok
+    end,
+    receive
+        {received, RemotePid, Message} ->
+            ok = published_message_was_received_by_remote_pid_2
+    after 250 ->
+        ok
+    end,
+    receive
+        {received, OtherPid, Message} ->
+            ok = published_message_was_received_by_other_pid
+    after 250 ->
+        ok
+    end,
     %% kill processes
     syn_test_suite_helper:kill_process(LocalPid),
     syn_test_suite_helper:kill_process(LocalPid2),
     syn_test_suite_helper:kill_process(RemotePid),
-    syn_test_suite_helper:kill_process(RemotePid2).
+    syn_test_suite_helper:kill_process(RemotePid2),
+    syn_test_suite_helper:kill_process(OtherPid).
 
 three_nodes_partial_netsplit_consistency(Config) ->
     GroupName = "my group",