Browse Source

Add multi_call.

Roberto Ostinelli 3 years ago
parent
commit
2b2f48126c
4 changed files with 208 additions and 15 deletions
  1. 20 0
      src/syn.erl
  2. 71 0
      src/syn_groups.erl
  3. 0 1
      src/syn_registry.erl
  4. 117 14
      test/syn_groups_SUITE.erl

+ 20 - 0
src/syn.erl

@@ -47,6 +47,7 @@
 -export([groups_count/1, groups_count/2]).
 -export([groups_count/1, groups_count/2]).
 -export([publish/2, publish/3]).
 -export([publish/2, publish/3]).
 -export([local_publish/2, local_publish/3]).
 -export([local_publish/2, local_publish/3]).
+-export([multi_call/2, multi_call/3, multi_call/4, multi_call_reply/2]).
 
 
 %% ===================================================================
 %% ===================================================================
 %% API
 %% API
@@ -224,3 +225,22 @@ local_publish(GroupName, Message) ->
 -spec local_publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
 -spec local_publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
 local_publish(Scope, GroupName, Message) ->
 local_publish(Scope, GroupName, Message) ->
     syn_groups:local_publish(Scope, GroupName, Message).
     syn_groups:local_publish(Scope, GroupName, Message).
+
+-spec multi_call(GroupName :: any(), Message :: any()) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(GroupName, Message) ->
+    syn_groups:multi_call(GroupName, Message).
+
+-spec multi_call(Scope :: atom(), GroupName :: any(), Message :: any()) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(Scope, GroupName, Message) ->
+    syn_groups:multi_call(Scope, GroupName, Message).
+
+-spec multi_call(Scope :: atom(), GroupName :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(Scope, GroupName, Message, Timeout) ->
+    syn_groups:multi_call(Scope, GroupName, Message, Timeout).
+
+-spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
+multi_call_reply(CallerPid, Reply) ->
+    syn_groups:multi_call_reply(CallerPid, Reply).

+ 71 - 0
src/syn_groups.erl

@@ -38,6 +38,7 @@
 -export([count/1, count/2]).
 -export([count/1, count/2]).
 -export([publish/2, publish/3]).
 -export([publish/2, publish/3]).
 -export([local_publish/2, local_publish/3]).
 -export([local_publish/2, local_publish/3]).
+-export([multi_call/2, multi_call/3, multi_call/4, multi_call_reply/2]).
 
 
 %% syn_gen_scope callbacks
 %% syn_gen_scope callbacks
 -export([
 -export([
@@ -49,6 +50,12 @@
     purge_local_data_for_node/2
     purge_local_data_for_node/2
 ]).
 ]).
 
 
+%% internal
+-export([multi_call_and_receive/5]).
+
+%% macros
+-define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
+
 %% includes
 %% includes
 -include("syn.hrl").
 -include("syn.hrl").
 
 
@@ -231,6 +238,28 @@ do_publish(Members, Message) ->
     end, Members),
     end, Members),
     {ok, length(Members)}.
     {ok, length(Members)}.
 
 
+-spec multi_call(GroupName :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(GroupName, Message) ->
+    multi_call(?DEFAULT_SCOPE, GroupName, Message).
+
+-spec multi_call(Scope :: atom(), GroupName :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(Scope, GroupName, Message) ->
+    multi_call(Scope, GroupName, Message, ?DEFAULT_MULTI_CALL_TIMEOUT_MS).
+
+-spec multi_call(Scope :: atom(), GroupName :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(Scope, GroupName, Message, Timeout) ->
+    Self = self(),
+    Members = members(Scope, GroupName),
+    lists:foreach(fun({Pid, Meta}) ->
+        spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Meta, Message, Timeout])
+    end, Members),
+    collect_replies(Members).
+
+-spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
+multi_call_reply(CallerPid, Reply) ->
+    CallerPid ! {syn_multi_call_reply, self(), Reply}.
+
 %% ===================================================================
 %% ===================================================================
 %% Callbacks
 %% Callbacks
 %% ===================================================================
 %% ===================================================================
@@ -575,3 +604,45 @@ handle_groups_sync(GroupName, Pid, Meta, Time, #state{
             %% race condition: incoming data is older, ignore
             %% race condition: incoming data is older, ignore
             ok
             ok
     end.
     end.
+
+-spec multi_call_and_receive(
+    CollectorPid :: pid(),
+    Pid :: pid(),
+    Meta :: term(),
+    Message :: any(),
+    Timeout :: non_neg_integer()
+) -> any().
+multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
+    %% monitor
+    MonitorRef = monitor(process, Pid),
+    %% send
+    Pid ! {syn_multi_call, self(), Meta, Message},
+    %% wait for reply
+    receive
+        {syn_multi_call_reply, Pid, Reply} ->
+            CollectorPid ! {reply, Pid, Reply};
+
+        {'DOWN', MonitorRef, _, _, _} ->
+            CollectorPid ! {bad_pid, Pid}
+
+    after Timeout ->
+        CollectorPid ! {bad_pid, Pid}
+    end.
+
+-spec collect_replies(Members :: [{pid(), Meta :: any()}]) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+collect_replies(Members) ->
+    collect_replies(Members, [], []).
+
+-spec collect_replies(MemberPids :: [{pid(), Meta :: any()}], [{pid(), Reply :: any()}], [pid()]) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+collect_replies([], Replies, BadPids) -> {Replies, BadPids};
+collect_replies(Members, Replies, BadPids) ->
+    receive
+        {reply, Pid, Reply} ->
+            {value, {Pid, Meta}, Members1} = lists:keytake(Pid, 1, Members),
+            collect_replies(Members1, [{{Pid, Meta}, Reply} | Replies], BadPids);
+
+        {bad_pid, Pid} ->
+            {value, {Pid, Meta}, Members1} = lists:keytake(Pid, 1, Members),
+            collect_replies(Members1, Replies, [{Pid, Meta} | BadPids])
+    end.

+ 0 - 1
src/syn_registry.erl

@@ -189,7 +189,6 @@ init(State) ->
     {stop, Reason :: term(), Reply :: term(), #state{}} |
     {stop, Reason :: term(), Reply :: term(), #state{}} |
     {stop, Reason :: term(), #state{}}.
     {stop, Reason :: term(), #state{}}.
 handle_call({register_on_node, RequesterNode, Name, Pid, Meta}, _From, #state{
 handle_call({register_on_node, RequesterNode, Name, Pid, Meta}, _From, #state{
-    scope = Scope,
     table_by_name = TableByName,
     table_by_name = TableByName,
     table_by_pid = TableByPid
     table_by_pid = TableByPid
 } = State) ->
 } = State) ->

+ 117 - 14
test/syn_groups_SUITE.erl

@@ -40,12 +40,15 @@
     three_nodes_cluster_changes/1,
     three_nodes_cluster_changes/1,
     three_nodes_custom_event_handler_joined_left/1,
     three_nodes_custom_event_handler_joined_left/1,
     three_nodes_publish_default_scope/1,
     three_nodes_publish_default_scope/1,
-    three_nodes_publish_custom_scope/1
+    three_nodes_publish_custom_scope/1,
+    three_nodes_multi_call_default_scope/1,
+    three_nodes_multi_call_custom_scope/1
 ]).
 ]).
 
 
 %% internals
 %% internals
 -export([
 -export([
-    subscriber_loop/2
+    subscriber_loop/2,
+    recipient_loop/0
 ]).
 ]).
 
 
 %% include
 %% include
@@ -90,7 +93,9 @@ groups() ->
             three_nodes_cluster_changes,
             three_nodes_cluster_changes,
             three_nodes_custom_event_handler_joined_left,
             three_nodes_custom_event_handler_joined_left,
             three_nodes_publish_default_scope,
             three_nodes_publish_default_scope,
-            three_nodes_publish_custom_scope
+            three_nodes_publish_custom_scope,
+            three_nodes_multi_call_default_scope,
+            three_nodes_multi_call_custom_scope
         ]}
         ]}
     ].
     ].
 %% -------------------------------------------------------------------
 %% -------------------------------------------------------------------
@@ -1658,8 +1663,8 @@ three_nodes_publish_default_scope(Config) ->
     ]),
     ]),
     syn_test_suite_helper:assert_empty_queue(self()),
     syn_test_suite_helper:assert_empty_queue(self()),
 
 
-    %% non-existent
-    {ok, 0} = syn:publish(<<"non-existent">>, TestMessage),
+    %% non-existant
+    {ok, 0} = syn:publish(<<"non-existant">>, TestMessage),
     syn_test_suite_helper:assert_empty_queue(self()),
     syn_test_suite_helper:assert_empty_queue(self()),
 
 
     %% ---> publish local
     %% ---> publish local
@@ -1670,8 +1675,8 @@ three_nodes_publish_default_scope(Config) ->
     ]),
     ]),
     syn_test_suite_helper:assert_empty_queue(self()),
     syn_test_suite_helper:assert_empty_queue(self()),
 
 
-    %% non-existent
-    {ok, 0} = syn:local_publish(<<"non-existent">>, TestMessage),
+    %% non-existant
+    {ok, 0} = syn:local_publish(<<"non-existant">>, TestMessage),
     syn_test_suite_helper:assert_empty_queue(self()).
     syn_test_suite_helper:assert_empty_queue(self()).
 
 
 three_nodes_publish_custom_scope(Config) ->
 three_nodes_publish_custom_scope(Config) ->
@@ -1703,8 +1708,8 @@ three_nodes_publish_custom_scope(Config) ->
     ok = syn:join(custom_scope_ab, <<"subscribers">>, Pid),
     ok = syn:join(custom_scope_ab, <<"subscribers">>, Pid),
     ok = syn:join(custom_scope_ab, <<"subscribers-2">>, Pid2),
     ok = syn:join(custom_scope_ab, <<"subscribers-2">>, Pid2),
     ok = syn:join(custom_scope_ab, <<"subscribers">>, PidRemoteOn1),
     ok = syn:join(custom_scope_ab, <<"subscribers">>, PidRemoteOn1),
-    ok = rpc:call(SlaveNode1, syn, join, [custom_scope_bc, <<"subscribers-bc">>, PidRemoteOn1]),
-    ok = rpc:call(SlaveNode2, syn, join, [custom_scope_bc, <<"subscribers-bc">>, PidRemoteOn2]),
+    ok = rpc:call(SlaveNode1, syn, join, [custom_scope_bc, <<"subscribers">>, PidRemoteOn1]),
+    ok = rpc:call(SlaveNode2, syn, join, [custom_scope_bc, <<"subscribers">>, PidRemoteOn2]),
 
 
     %% ---> publish
     %% ---> publish
     {ok, 2} = syn:publish(custom_scope_ab, <<"subscribers">>, TestMessage),
     {ok, 2} = syn:publish(custom_scope_ab, <<"subscribers">>, TestMessage),
@@ -1719,7 +1724,7 @@ three_nodes_publish_custom_scope(Config) ->
     {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:publish(custom_scope_bc, <<"subscribers">>, TestMessage),
     {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:publish(custom_scope_bc, <<"subscribers">>, TestMessage),
 
 
     %% on other scope
     %% on other scope
-    {ok, 2} = rpc:call(SlaveNode1, syn, publish, [custom_scope_bc, <<"subscribers-bc">>, TestMessage]),
+    {ok, 2} = rpc:call(SlaveNode1, syn, publish, [custom_scope_bc, <<"subscribers">>, TestMessage]),
 
 
     syn_test_suite_helper:assert_received_messages([
     syn_test_suite_helper:assert_received_messages([
         {done, PidRemoteOn1},
         {done, PidRemoteOn1},
@@ -1727,8 +1732,8 @@ three_nodes_publish_custom_scope(Config) ->
     ]),
     ]),
     syn_test_suite_helper:assert_empty_queue(self()),
     syn_test_suite_helper:assert_empty_queue(self()),
 
 
-    %% non-existent
-    {ok, 0} = syn:publish(custom_scope_ab, <<"non-existent">>, TestMessage),
+    %% non-existant
+    {ok, 0} = syn:publish(custom_scope_ab, <<"non-existant">>, TestMessage),
     syn_test_suite_helper:assert_empty_queue(self()),
     syn_test_suite_helper:assert_empty_queue(self()),
 
 
     %% ---> publish local
     %% ---> publish local
@@ -1739,10 +1744,101 @@ three_nodes_publish_custom_scope(Config) ->
     ]),
     ]),
     syn_test_suite_helper:assert_empty_queue(self()),
     syn_test_suite_helper:assert_empty_queue(self()),
 
 
-    %% non-existent
-    {ok, 0} = syn:local_publish(custom_scope_ab, <<"non-existent">>, TestMessage),
+    %% non-existant
+    {ok, 0} = syn:local_publish(custom_scope_ab, <<"non-existant">>, TestMessage),
     syn_test_suite_helper:assert_empty_queue(self()).
     syn_test_suite_helper:assert_empty_queue(self()).
 
 
+three_nodes_multi_call_default_scope(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+
+    %% start processes
+    RecipientLoopLate = fun() -> timer:sleep(200), recipient_loop() end,
+
+    Pid = syn_test_suite_helper:start_process(fun recipient_loop/0),
+    Pid2 = syn_test_suite_helper:start_process(RecipientLoopLate),
+    PidRemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1, fun recipient_loop/0),
+    PidRemoteOn2 = syn_test_suite_helper:start_process(SlaveNode2, fun recipient_loop/0),
+
+    %% join
+    ok = syn:join(<<"recipients">>, Pid, "meta-1"),
+    ok = syn:join(<<"recipients">>, Pid2),
+    ok = syn:join(<<"recipients">>, PidRemoteOn1, "meta-on-1"),
+    ok = syn:join(<<"recipients">>, PidRemoteOn2, "meta-on-2"),
+
+    %% ---> multi_call
+    {Replies, BadReplies} = syn:multi_call(default, <<"recipients">>, test_message, 100),
+
+    RepliesSorted = lists:sort(Replies),
+    RepliesSorted = lists:sort([
+        {{Pid, "meta-1"}, {reply, Pid, "meta-1", test_message}},
+        {{PidRemoteOn1, "meta-on-1"}, {reply, PidRemoteOn1, "meta-on-1", test_message}},
+        {{PidRemoteOn2, "meta-on-2"}, {reply, PidRemoteOn2, "meta-on-2", test_message}}
+    ]),
+    BadReplies = [{Pid2, undefined}],
+
+    %% empty
+    {[], []} = syn:multi_call(default, <<"non-existant">>, test_message, 100).
+
+three_nodes_multi_call_custom_scope(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+
+    %% add custom scopes
+    ok = syn:add_node_to_scope(custom_scope_ab),
+    ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[custom_scope_ab, custom_scope_bc]]),
+    ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[custom_scope_bc]]),
+
+    %% start processes
+    RecipientLoopLate = fun() -> timer:sleep(200), recipient_loop() end,
+
+    Pid = syn_test_suite_helper:start_process(fun recipient_loop/0),
+    Pid2 = syn_test_suite_helper:start_process(RecipientLoopLate),
+    PidRemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1, fun recipient_loop/0),
+    PidRemoteOn2 = syn_test_suite_helper:start_process(SlaveNode2, fun recipient_loop/0),
+
+    %% join
+    ok = syn:join(custom_scope_ab, <<"recipients">>, Pid, "meta-1"),
+    ok = syn:join(custom_scope_ab, <<"recipients">>, Pid2),
+    ok = syn:join(custom_scope_ab, <<"recipients">>, PidRemoteOn1, "meta-on-ab-1"),
+    ok = rpc:call(SlaveNode1, syn, join, [custom_scope_bc, <<"recipients">>, PidRemoteOn1, "meta-on-bc-1"]),
+    ok = rpc:call(SlaveNode2, syn, join, [custom_scope_bc, <<"recipients">>, PidRemoteOn2, "meta-on-bc-2"]),
+
+    %% errors
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:multi_call(custom_scope_bc, <<"recipients">>, test_message, 100),
+
+    %% ---> multi_call
+    {RepliesAB, BadRepliesAB} = syn:multi_call(custom_scope_ab, <<"recipients">>, test_message_ab, 100),
+
+    RepliesABSorted = lists:sort(RepliesAB),
+    RepliesABSorted = lists:sort([
+        {{Pid, "meta-1"}, {reply, Pid, "meta-1", test_message_ab}},
+        {{PidRemoteOn1, "meta-on-ab-1"}, {reply, PidRemoteOn1, "meta-on-ab-1", test_message_ab}}
+    ]),
+    BadRepliesAB = [{Pid2, undefined}],
+
+    %% different scope
+    {RepliesBC, BadRepliesBC} = rpc:call(SlaveNode1, syn, multi_call, [custom_scope_bc, <<"recipients">>, test_message_bc, 100]),
+
+    RepliesBCSorted = lists:sort(RepliesBC),
+    RepliesBCSorted = lists:sort([
+        {{PidRemoteOn1, "meta-on-bc-1"}, {reply, PidRemoteOn1, "meta-on-bc-1", test_message_bc}},
+        {{PidRemoteOn2, "meta-on-bc-2"}, {reply, PidRemoteOn2, "meta-on-bc-2", test_message_bc}}
+    ]),
+    BadRepliesBC = [].
+
 %% ===================================================================
 %% ===================================================================
 %% Internal
 %% Internal
 %% ===================================================================
 %% ===================================================================
@@ -1752,3 +1848,10 @@ subscriber_loop(TestPid, TestMessage) ->
             TestPid ! {done, self()},
             TestPid ! {done, self()},
             subscriber_loop(TestPid, TestMessage)
             subscriber_loop(TestPid, TestMessage)
     end.
     end.
+
+recipient_loop() ->
+    receive
+        {syn_multi_call, CallerPid, Meta, TestMessage} ->
+            syn:multi_call_reply(CallerPid, {reply, self(), Meta, TestMessage}),
+            recipient_loop()
+    end.