Browse Source

Add group publishing.

Roberto Ostinelli 5 years ago
parent
commit
609822812e
3 changed files with 118 additions and 6 deletions
  1. 5 0
      src/syn.erl
  2. 10 0
      src/syn_groups.erl
  3. 103 6
      test/syn_groups_SUITE.erl

+ 5 - 0
src/syn.erl

@@ -37,6 +37,7 @@
 -export([member/2]).
 -export([get_local_members/1, get_local_members/2]).
 -export([local_member/2]).
+-export([publish/2]).
 
 %% gen_server via interface
 -export([register_name/2, unregister_name/1, whereis_name/1, send/2]).
@@ -147,3 +148,7 @@ get_local_members(GroupName, with_meta) ->
 -spec local_member(GroupName :: any(), Pid :: pid()) -> boolean().
 local_member(GroupName, Pid) ->
     syn_groups:local_member(GroupName, Pid).
+
+-spec publish(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish(Name, Message) ->
+    syn_groups:publish(Name, Message).

+ 10 - 0
src/syn_groups.erl

@@ -34,6 +34,7 @@
 -export([member/2]).
 -export([get_local_members/1, get_local_members/2]).
 -export([local_member/2]).
+-export([publish/2]).
 
 %% sync API
 -export([sync_join/3, sync_leave/2]).
@@ -132,6 +133,15 @@ local_member(Pid, GroupName) ->
         _ -> false
     end.
 
+-spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish(GroupName, Message) ->
+    MemberPids = get_members(GroupName),
+    FSend = fun(Pid) ->
+        Pid ! Message
+    end,
+    lists:foreach(FSend, MemberPids),
+    {ok, length(MemberPids)}.
+
 -spec sync_join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
 sync_join(GroupName, Pid, Meta) ->
     gen_server:cast(?MODULE, {sync_join, GroupName, Pid, Meta}).

+ 103 - 6
test/syn_groups_SUITE.erl

@@ -35,11 +35,13 @@
 -export([
     single_node_join_and_monitor/1,
     single_node_join_and_leave/1,
-    single_node_join_errors/1
+    single_node_join_errors/1,
+    single_node_publish/1
 ]).
 -export([
     two_nodes_join_monitor_and_unregister/1,
-    two_nodes_local_members/1
+    two_nodes_local_members/1,
+    two_nodes_publish/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -85,11 +87,13 @@ groups() ->
         {single_node_groups, [shuffle], [
             single_node_join_and_monitor,
             single_node_join_and_leave,
-            single_node_join_errors
+            single_node_join_errors,
+            single_node_publish
         ]},
         {two_nodes_groups, [shuffle], [
             two_nodes_join_monitor_and_unregister,
-            two_nodes_local_members
+            two_nodes_local_members,
+            two_nodes_publish
         ]},
         {three_nodes_groups, [shuffle], [
             three_nodes_partial_netsplit_consistency,
@@ -277,6 +281,43 @@ single_node_join_errors(_Config) ->
     %% kill processes
     syn_test_suite_helper:kill_process(Pid).
 
+single_node_publish(_Config) ->
+    GroupName = "my group",
+    Message = {test, message},
+    %% start
+    ok = syn:start(),
+    %% start processes
+    ResultPid = self(),
+    F = fun() ->
+        receive
+            Message -> ResultPid ! {received, self(), Message}
+        end
+    end,
+    Pid = syn_test_suite_helper:start_process(F),
+    Pid2 = syn_test_suite_helper:start_process(F),
+    OtherPid = syn_test_suite_helper:start_process(F),
+    %% join
+    ok = syn:join(GroupName, Pid),
+    ok = syn:join(GroupName, Pid2),
+    true = syn:member(Pid, GroupName),
+    true = syn:member(Pid2, GroupName),
+    %% send
+    {ok, 2} = syn:publish(GroupName, Message),
+    %% check
+    receive
+        {received, Pid, Message} -> ok
+    after 2000 ->
+        ok = published_message_was_not_received_by_pid_1
+    end,
+    receive
+        {received, Pid2, Message} -> ok
+    after 2000 ->
+        ok = published_message_was_not_received_by_pid_2
+    end,
+    %% kill processes
+    syn_test_suite_helper:kill_process(Pid),
+    syn_test_suite_helper:kill_process(Pid2).
+
 two_nodes_join_monitor_and_unregister(Config) ->
     GroupName = "my group",
     %% get slave
@@ -418,6 +459,62 @@ two_nodes_local_members(Config) ->
     syn_test_suite_helper:kill_process(LocalPid),
     syn_test_suite_helper:kill_process(RemotePid).
 
+two_nodes_publish(Config) ->
+    GroupName = "my group",
+    Message = {test, message},
+    %% get slave
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% start
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(100),
+    %% start processes
+    ResultPid = self(),
+    F = fun() ->
+        receive
+            Message -> ResultPid ! {received, self(), Message}
+        end
+    end,
+    LocalPid = syn_test_suite_helper:start_process(F),
+    LocalPid2 = syn_test_suite_helper:start_process(F),
+    RemotePid = syn_test_suite_helper:start_process(SlaveNode, F),
+    RemotePid2 = syn_test_suite_helper:start_process(SlaveNode, F),
+    OtherPid = syn_test_suite_helper:start_process(F),
+    %% join
+    ok = syn:join(GroupName, LocalPid),
+    ok = syn:join(GroupName, LocalPid2),
+    ok = syn:join(GroupName, RemotePid),
+    ok = syn:join(GroupName, RemotePid2),
+    timer:sleep(200),
+    %% send
+    {ok, 4} = syn:publish(GroupName, Message),
+    %% check
+    receive
+        {received, LocalPid, Message} -> ok
+    after 2000 ->
+        ok = published_message_was_not_received_by_local_pid_1
+    end,
+    receive
+        {received, LocalPid2, Message} -> ok
+    after 2000 ->
+        ok = published_message_was_not_received_by_local_pid_2
+    end,
+    receive
+        {received, RemotePid, Message} -> ok
+    after 2000 ->
+        ok = published_message_was_not_received_by_remote_pid_2
+    end,
+    receive
+        {received, RemotePid2, Message} -> ok
+    after 2000 ->
+        ok = published_message_was_not_received_by_remote_pid_2
+    end,
+    %% kill processes
+    syn_test_suite_helper:kill_process(LocalPid),
+    syn_test_suite_helper:kill_process(LocalPid2),
+    syn_test_suite_helper:kill_process(RemotePid),
+    syn_test_suite_helper:kill_process(RemotePid2).
+
 three_nodes_partial_netsplit_consistency(Config) ->
     GroupName = "my group",
     %% get slaves
@@ -579,7 +676,7 @@ three_nodes_partial_netsplit_consistency(Config) ->
     true = rpc:call(SlaveNode1, syn, member, [Pid2, GroupName]),
     false = rpc:call(SlaveNode1, syn, member, [OtherPid, GroupName]),
     %% retrieve slave 2
-    true = lists:sort([Pid0,  Pid1, Pid2])
+    true = lists:sort([Pid0, Pid1, Pid2])
         =:= lists:sort(rpc:call(SlaveNode2, syn, get_members, [GroupName])),
     true = lists:sort([
         {Pid0, undefined},
@@ -707,7 +804,7 @@ three_nodes_full_netsplit_consistency(Config) ->
     rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
     timer:sleep(1500),
     %% retrieve local
-    true = lists:sort([Pid0,  Pid1, Pid2]) =:= lists:sort(syn:get_members(GroupName)),
+    true = lists:sort([Pid0, Pid1, Pid2]) =:= lists:sort(syn:get_members(GroupName)),
     true = lists:sort([
         {Pid0, undefined},
         {Pid1, undefined},