Roberto Ostinelli 3 лет назад
Родитель
Сommit
3d329c020f
3 измененных файлов с 186 добавлено и 2 удалено
  1. 18 0
      src/syn.erl
  2. 26 0
      src/syn_groups.erl
  3. 142 2
      test/syn_groups_SUITE.erl

+ 18 - 0
src/syn.erl

@@ -45,6 +45,8 @@
 -export([join/2, join/3, join/4]).
 -export([leave/2, leave/3]).
 -export([groups_count/1, groups_count/2]).
+-export([publish/2, publish/3]).
+-export([local_publish/2, local_publish/3]).
 
 %% ===================================================================
 %% API
@@ -206,3 +208,19 @@ groups_count(Scope) ->
 -spec groups_count(Scope :: atom(), Node :: node()) -> non_neg_integer().
 groups_count(Scope, Node) ->
     syn_groups:count(Scope, Node).
+
+-spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish(GroupName, Message) ->
+    syn_groups:publish(GroupName, Message).
+
+-spec publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish(Scope, GroupName, Message) ->
+    syn_groups:publish(Scope, GroupName, Message).
+
+-spec local_publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+local_publish(GroupName, Message) ->
+    syn_groups:local_publish(GroupName, Message).
+
+-spec local_publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+local_publish(Scope, GroupName, Message) ->
+    syn_groups:local_publish(Scope, GroupName, Message).

+ 26 - 0
src/syn_groups.erl

@@ -36,6 +36,8 @@
 -export([local_members/1, local_members/2]).
 -export([is_local_member/2, is_local_member/3]).
 -export([count/1, count/2]).
+-export([publish/2, publish/3]).
+-export([local_publish/2, local_publish/3]).
 
 %% syn_gen_scope callbacks
 -export([
@@ -211,6 +213,30 @@ count(Scope, Node) ->
             sets:size(Set)
     end.
 
+-spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish(GroupName, Message) ->
+    publish(?DEFAULT_SCOPE, GroupName, Message).
+
+-spec publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish(Scope, GroupName, Message) ->
+    Members = members(Scope, GroupName),
+    lists:foreach(fun({Pid, _Meta}) ->
+        Pid ! Message
+    end, Members),
+    {ok, length(Members)}.
+
+-spec local_publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+local_publish(GroupName, Message) ->
+    local_publish(?DEFAULT_SCOPE, GroupName, Message).
+
+-spec local_publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+local_publish(Scope, GroupName, Message) ->
+    Members = local_members(Scope, GroupName),
+    lists:foreach(fun({Pid, _Meta}) ->
+        Pid ! Message
+    end, Members),
+    {ok, length(Members)}.
+
 %% ===================================================================
 %% Callbacks
 %% ===================================================================

+ 142 - 2
test/syn_groups_SUITE.erl

@@ -38,7 +38,14 @@
     three_nodes_join_leave_and_monitor_default_scope/1,
     three_nodes_join_leave_and_monitor_custom_scope/1,
     three_nodes_cluster_changes/1,
-    three_nodes_custom_event_handler_joined_left/1
+    three_nodes_custom_event_handler_joined_left/1,
+    three_nodes_publish_default_scope/1,
+    three_nodes_publish_custom_scope/1
+]).
+
+%% internals
+-export([
+    subscriber_loop/2
 ]).
 
 %% include
@@ -81,7 +88,9 @@ groups() ->
             three_nodes_join_leave_and_monitor_default_scope,
             three_nodes_join_leave_and_monitor_custom_scope,
             three_nodes_cluster_changes,
-            three_nodes_custom_event_handler_joined_left
+            three_nodes_custom_event_handler_joined_left,
+            three_nodes_publish_default_scope,
+            three_nodes_publish_custom_scope
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -1612,3 +1621,134 @@ three_nodes_custom_event_handler_joined_left(Config) ->
         ok,
         fun() -> syn_test_suite_helper:assert_empty_queue(self()) end
     ).
+
+three_nodes_publish_default_scope(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+
+    %% start processes
+    TestMessage = test_message,
+    TestPid = self(),
+    SubscriberLoop = fun() -> subscriber_loop(TestPid, TestMessage) end,
+
+    Pid = syn_test_suite_helper:start_process(SubscriberLoop),
+    OtherPid = syn_test_suite_helper:start_process(SubscriberLoop),
+    PidRemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1, SubscriberLoop),
+    PidRemoteOn2 = syn_test_suite_helper:start_process(SlaveNode2, SubscriberLoop),
+
+    %% join
+    ok = syn:join(<<"subscribers">>, Pid),
+    ok = syn:join(<<"ignore">>, OtherPid),
+    ok = syn:join(<<"subscribers">>, PidRemoteOn1),
+    ok = syn:join(<<"subscribers">>, PidRemoteOn2),
+
+    %% ---> publish
+    {ok, 3} = syn:publish(<<"subscribers">>, TestMessage),
+
+    syn_test_suite_helper:assert_received_messages([
+        {done, Pid},
+        {done, PidRemoteOn1},
+        {done, PidRemoteOn2}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% non-existent
+    {ok, 0} = syn:publish(<<"non-existent">>, TestMessage),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% ---> publish local
+    {ok, 1} = syn:local_publish(<<"subscribers">>, test_message),
+
+    syn_test_suite_helper:assert_received_messages([
+        {done, Pid}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% non-existent
+    {ok, 0} = syn:local_publish(<<"non-existent">>, TestMessage),
+    syn_test_suite_helper:assert_empty_queue(self()).
+
+three_nodes_publish_custom_scope(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+
+    %% add custom scopes
+    ok = syn:add_node_to_scope(custom_scope_ab),
+    ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[custom_scope_ab, custom_scope_bc]]),
+    ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[custom_scope_bc]]),
+
+    %% start processes
+    TestMessage = test_message,
+    TestPid = self(),
+    SubscriberLoop = fun() -> subscriber_loop(TestPid, TestMessage) end,
+
+    Pid = syn_test_suite_helper:start_process(SubscriberLoop),
+    Pid2 = syn_test_suite_helper:start_process(SubscriberLoop),
+    PidRemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1, SubscriberLoop),
+    PidRemoteOn2 = syn_test_suite_helper:start_process(SlaveNode2, SubscriberLoop),
+
+    %% join
+    ok = syn:join(custom_scope_ab, <<"subscribers">>, Pid),
+    ok = syn:join(custom_scope_ab, <<"subscribers-2">>, Pid2),
+    ok = syn:join(custom_scope_ab, <<"subscribers">>, PidRemoteOn1),
+    ok = rpc:call(SlaveNode1, syn, join, [custom_scope_bc, <<"subscribers-bc">>, PidRemoteOn1]),
+    ok = rpc:call(SlaveNode2, syn, join, [custom_scope_bc, <<"subscribers-bc">>, PidRemoteOn2]),
+
+    %% ---> publish
+    {ok, 2} = syn:publish(custom_scope_ab, <<"subscribers">>, TestMessage),
+
+    syn_test_suite_helper:assert_received_messages([
+        {done, Pid},
+        {done, PidRemoteOn1}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% errors
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:publish(custom_scope_bc, <<"subscribers">>, TestMessage),
+
+    %% on other scope
+    {ok, 2} = rpc:call(SlaveNode1, syn, publish, [custom_scope_bc, <<"subscribers-bc">>, TestMessage]),
+
+    syn_test_suite_helper:assert_received_messages([
+        {done, PidRemoteOn1},
+        {done, PidRemoteOn2}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% non-existent
+    {ok, 0} = syn:publish(custom_scope_ab, <<"non-existent">>, TestMessage),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% ---> publish local
+    {ok, 1} = syn:local_publish(custom_scope_ab, <<"subscribers">>, test_message),
+
+    syn_test_suite_helper:assert_received_messages([
+        {done, Pid}
+    ]),
+    syn_test_suite_helper:assert_empty_queue(self()),
+
+    %% non-existent
+    {ok, 0} = syn:local_publish(custom_scope_ab, <<"non-existent">>, TestMessage),
+    syn_test_suite_helper:assert_empty_queue(self()).
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+subscriber_loop(TestPid, TestMessage) ->
+    receive
+        TestMessage ->
+            TestPid ! {done, self()},
+            subscriber_loop(TestPid, TestMessage)
+    end.