Browse Source

Allow to multi_call group processes.

Roberto Ostinelli 5 years ago
parent
commit
099380894d
3 changed files with 206 additions and 23 deletions
  1. 21 6
      src/syn.erl
  2. 65 3
      src/syn_groups.erl
  3. 120 14
      test/syn_groups_SUITE.erl

+ 21 - 6
src/syn.erl

@@ -39,6 +39,7 @@
 -export([local_member/2]).
 -export([publish/2]).
 -export([publish_to_local/2]).
+-export([multi_call/2, multi_call/3, multi_call_reply/2]).
 
 %% gen_server via interface
 -export([register_name/2, unregister_name/1, whereis_name/1, send/2]).
@@ -150,10 +151,24 @@ get_local_members(GroupName, with_meta) ->
 local_member(GroupName, Pid) ->
     syn_groups:local_member(GroupName, Pid).
 
--spec publish(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
-publish(Name, Message) ->
-    syn_groups:publish(Name, Message).
+-spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish(GroupName, Message) ->
+    syn_groups:publish(GroupName, Message).
 
--spec publish_to_local(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
-publish_to_local(Name, Message) ->
-    syn_groups:publish_to_local(Name, Message).
+-spec publish_to_local(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish_to_local(GroupName, Message) ->
+    syn_groups:publish_to_local(GroupName, Message).
+
+-spec multi_call(GroupName :: any(), Message :: any()) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(GroupName, Message) ->
+    syn_groups:multi_call(GroupName, Message).
+
+-spec multi_call(GroupName :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(GroupName, Message, Timeout) ->
+    syn_groups:multi_call(GroupName, Message, Timeout).
+
+-spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
+multi_call_reply(CallerPid, Reply) ->
+    syn_groups:multi_call_reply(CallerPid, Reply).

+ 65 - 3
src/syn_groups.erl

@@ -36,6 +36,7 @@
 -export([local_member/2]).
 -export([publish/2]).
 -export([publish_to_local/2]).
+-export([multi_call/2, multi_call/3, multi_call_reply/2]).
 
 %% sync API
 -export([sync_join/3, sync_leave/2]).
@@ -44,9 +45,15 @@
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
+%% internal
+-export([multi_call_and_receive/4]).
+
 %% records
 -record(state, {}).
 
+%% macros
+-define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
+
 %% includes
 -include("syn.hrl").
 
@@ -143,15 +150,34 @@ publish(GroupName, Message) ->
     lists:foreach(FSend, MemberPids),
     {ok, length(MemberPids)}.
 
--spec publish_to_local(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
-publish_to_local(Name, Message) ->
-    MemberPids = get_local_members(Name),
+-spec publish_to_local(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
+publish_to_local(GroupName, Message) ->
+    MemberPids = get_local_members(GroupName),
     FSend = fun(Pid) ->
         Pid ! Message
     end,
     lists:foreach(FSend, MemberPids),
     {ok, length(MemberPids)}.
 
+-spec multi_call(GroupName :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(GroupName, Message) ->
+    multi_call(GroupName, Message, ?DEFAULT_MULTI_CALL_TIMEOUT_MS).
+
+-spec multi_call(GroupName :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(GroupName, Message, Timeout) ->
+    Self = self(),
+    MemberPids = get_members(GroupName),
+    FSend = fun(Pid) ->
+        spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Message, Timeout])
+    end,
+    lists:foreach(FSend, MemberPids),
+    collect_replies(MemberPids).
+
+-spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
+multi_call_reply(CallerPid, Reply) ->
+    CallerPid ! {syn_multi_call_reply, self(), Reply}.
+
 -spec sync_join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
 sync_join(GroupName, Pid, Meta) ->
     gen_server:cast(?MODULE, {sync_join, GroupName, Pid, Meta}).
@@ -449,3 +475,39 @@ purge_group_entries_for_remote_node(Node) when Node =/= node() ->
     %% delete
     DelF = fun(Record) -> mnesia:dirty_delete_object(syn_groups_table, Record) end,
     lists:foreach(DelF, ObjectsToDelete).
+
+-spec multi_call_and_receive(
+    CollectorPid :: pid(),
+    Pid :: pid(),
+    Message :: any(),
+    Timeout :: non_neg_integer()
+) -> any().
+multi_call_and_receive(CollectorPid, Pid, Message, Timeout) ->
+    MonitorRef = monitor(process, Pid),
+    Pid ! {syn_multi_call, self(), Message},
+
+    receive
+        {syn_multi_call_reply, Pid, Reply} ->
+            CollectorPid ! {reply, Pid, Reply};
+        {'DOWN', MonitorRef, _, _, _} ->
+            CollectorPid ! {bad_pid, Pid}
+    after Timeout ->
+        CollectorPid ! {bad_pid, Pid}
+    end.
+
+-spec collect_replies(MemberPids :: [pid()]) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+collect_replies(MemberPids) ->
+    collect_replies(MemberPids, [], []).
+
+-spec collect_replies(MemberPids :: [pid()], [{pid(), Reply :: any()}], [pid()]) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+collect_replies([], Replies, BadPids) -> {Replies, BadPids};
+collect_replies(MemberPids, Replies, BadPids) ->
+    receive
+        {reply, Pid, Reply} ->
+            MemberPids1 = lists:delete(Pid, MemberPids),
+            collect_replies(MemberPids1, [{Pid, Reply} | Replies], BadPids);
+        {bad_pid, Pid} ->
+            MemberPids1 = lists:delete(Pid, MemberPids),
+            collect_replies(MemberPids1, Replies, [Pid | BadPids])
+    end.

+ 120 - 14
test/syn_groups_SUITE.erl

@@ -36,13 +36,16 @@
     single_node_join_and_monitor/1,
     single_node_join_and_leave/1,
     single_node_join_errors/1,
-    single_node_publish/1
+    single_node_publish/1,
+    single_node_multicall/1,
+    single_node_multicall_with_custom_timeout/1
 ]).
 -export([
     two_nodes_join_monitor_and_unregister/1,
     two_nodes_local_members/1,
     two_nodes_publish/1,
-    two_nodes_local_publish/1
+    two_nodes_local_publish/1,
+    two_nodes_multicall/1
 ]).
 -export([
     three_nodes_partial_netsplit_consistency/1,
@@ -66,9 +69,9 @@
 %% -------------------------------------------------------------------
 all() ->
     [
-        {group, single_node_groups}
-%%        {group, two_nodes_groups}
-%%        {group, three_nodes_groups}
+        {group, single_node_groups},
+        {group, two_nodes_groups},
+        {group, three_nodes_groups}
     ].
 
 %% -------------------------------------------------------------------
@@ -86,17 +89,19 @@ all() ->
 groups() ->
     [
         {single_node_groups, [shuffle], [
-%%            single_node_join_and_monitor,
-%%            single_node_join_and_leave,
-%%            single_node_join_errors,
-%%            single_node_publish,
-            single_node_multicall
+            single_node_join_and_monitor,
+            single_node_join_and_leave,
+            single_node_join_errors,
+            single_node_publish,
+            single_node_multicall,
+            single_node_multicall_with_custom_timeout
         ]},
         {two_nodes_groups, [shuffle], [
-%%            two_nodes_join_monitor_and_unregister,
-%%            two_nodes_local_members,
-%%            two_nodes_publish,
-%%            two_nodes_local_publish,
+            two_nodes_join_monitor_and_unregister,
+            two_nodes_local_members,
+            two_nodes_publish,
+            two_nodes_local_publish,
+            two_nodes_multicall
         ]},
         {three_nodes_groups, [shuffle], [
             three_nodes_partial_netsplit_consistency,
@@ -321,6 +326,72 @@ single_node_publish(_Config) ->
     syn_test_suite_helper:kill_process(Pid),
     syn_test_suite_helper:kill_process(Pid2).
 
+single_node_multicall(_Config) ->
+    GroupName = <<"my group">>,
+    %% start
+    ok = syn:start(),
+    %% start processes
+    F = fun() ->
+        receive
+            {syn_multi_call, RequestorPid, get_pid_name} ->
+                syn:multi_call_reply(RequestorPid, {pong, self()})
+        end
+    end,
+    Pid1 = syn_test_suite_helper:start_process(F),
+    Pid2 = syn_test_suite_helper:start_process(F),
+    PidUnresponsive = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:join(GroupName, Pid1),
+    ok = syn:join(GroupName, Pid2),
+    ok = syn:join(GroupName, PidUnresponsive),
+    %% call
+    {Replies, BadPids} = syn:multi_call(GroupName, get_pid_name),
+    %% check responses
+    true = lists:sort([
+        {Pid1, {pong, Pid1}},
+        {Pid2, {pong, Pid2}}
+    ]) =:= lists:sort(Replies),
+    [PidUnresponsive] = BadPids,
+    %% kill processes
+    syn_test_suite_helper:kill_process(Pid1),
+    syn_test_suite_helper:kill_process(Pid2),
+    syn_test_suite_helper:kill_process(PidUnresponsive).
+
+single_node_multicall_with_custom_timeout(_Config) ->
+    GroupName = <<"my group">>,
+    %% start
+    ok = syn:start(),
+    %% start processes
+    F1 = fun() ->
+        receive
+            {syn_multi_call, RequestorPid, get_pid_name} ->
+                syn:multi_call_reply(RequestorPid, {pong, self()})
+        end
+    end,
+    Pid1 = syn_test_suite_helper:start_process(F1),
+    F2 = fun() ->
+        receive
+            {syn_multi_call, RequestorPid, get_pid_name} ->
+                timer:sleep(5000),
+                syn:multi_call_reply(RequestorPid, {pong, self()})
+        end
+    end,
+    PidTakesLong = syn_test_suite_helper:start_process(F2),
+    PidUnresponsive = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:join(GroupName, Pid1),
+    ok = syn:join(GroupName, PidTakesLong),
+    ok = syn:join(GroupName, PidUnresponsive),
+    %% call
+    {Replies, BadPids} = syn:multi_call(GroupName, get_pid_name, 2000),
+    %% check responses
+    [{Pid1, {pong, Pid1}}] = Replies,
+    true = lists:sort([PidTakesLong, PidUnresponsive]) =:= lists:sort(BadPids),
+    %% kill processes
+    syn_test_suite_helper:kill_process(Pid1),
+    syn_test_suite_helper:kill_process(PidTakesLong),
+    syn_test_suite_helper:kill_process(PidUnresponsive).
+
 two_nodes_join_monitor_and_unregister(Config) ->
     GroupName = "my group",
     %% get slave
@@ -590,6 +661,41 @@ two_nodes_local_publish(Config) ->
     syn_test_suite_helper:kill_process(RemotePid2),
     syn_test_suite_helper:kill_process(OtherPid).
 
+two_nodes_multicall(Config) ->
+    GroupName = <<"my group">>,
+    %% get slave
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% start
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    timer:sleep(100),
+    %% start processes
+    F = fun() ->
+        receive
+            {syn_multi_call, RequestorPid, get_pid_name} ->
+                syn:multi_call_reply(RequestorPid, {pong, self()})
+        end
+    end,
+    Pid1 = syn_test_suite_helper:start_process(F),
+    Pid2 = syn_test_suite_helper:start_process(SlaveNode, F),
+    PidUnresponsive = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:join(GroupName, Pid1),
+    ok = syn:join(GroupName, Pid2),
+    ok = syn:join(GroupName, PidUnresponsive),
+    %% call
+    {Replies, BadPids} = syn:multi_call(GroupName, get_pid_name),
+    %% check responses
+    true = lists:sort([
+        {Pid1, {pong, Pid1}},
+        {Pid2, {pong, Pid2}}
+    ]) =:= lists:sort(Replies),
+    [PidUnresponsive] = BadPids,
+    %% kill processes
+    syn_test_suite_helper:kill_process(Pid1),
+    syn_test_suite_helper:kill_process(Pid2),
+    syn_test_suite_helper:kill_process(PidUnresponsive).
+
 three_nodes_partial_netsplit_consistency(Config) ->
     GroupName = "my group",
     %% get slaves