Browse Source

Add multi_call to groups.

Roberto Ostinelli 9 years ago
parent
commit
a492bf43fb
4 changed files with 164 additions and 12 deletions
  1. 44 7
      README.md
  2. 10 0
      src/syn.erl
  3. 37 0
      src/syn_groups.erl
  4. 73 5
      test/syn_groups_SUITE.erl

+ 44 - 7
README.md

@@ -313,6 +313,25 @@ Types:
 
 > You don't need to remove processes that are about to die, since they are monitored by Syn and they will be removed automatically from their groups.
 
+To get a list of the members of a group:
+
+```erlang
+syn:get_members(Name) -> [pid()].
+
+Types:
+	Name = any()
+```
+
+To know if a process is a member of a group:
+
+```erlang
+syn:member(Pid, Name) -> boolean().
+
+Types:
+	Pid = pid()
+	Name = any()
+```
+
 To publish a message to all group members:
 
 ```erlang
@@ -324,27 +343,45 @@ Types:
 	RecipientCount = non_neg_integer()
 ```
 
-To get a list of the members of a group:
+To call all group members and get their replies:
 
 ```erlang
-syn:get_members(Name) -> [pid()].
+syn:multi_call(Name, Message) -> {Replies, BadPids}.
 
 Types:
 	Name = any()
+	Message = any()
+	Replies = [{MemberPid, Reply}]
+	BadPids = [MemberPid]
+	  MemberPid = pid()
+	  Reply = any()
 ```
 
-To know if a process is a member of a group:
+> Syn will wait up to 5 seconds to receive all replies from the members. The members that do not reply in time will be added to the BadPids list.
+
+When this call is issued, all members will receive a tuple in the format:
 
 ```erlang
-syn:member(Pid, Name) -> boolean().
+{syn_multi_call, CallerPid, Message}
 
 Types:
-	Pid = pid()
-	Name = any()
+	CallerPid = pid()
+	Message = any()
+```
+
+To reply, every member must use the method:
+
+```erlang
+syn:multi_call_reply(CallerPid, Reply) -> ok.
+
+Types:
+	CallerPid = pid()
+	Reply = any()
 ```
 
+
 ## Internals
-Under the hood, Syn performs dirty reads and writes into a distributed in-memory Mnesia table, replicated across all the nodes of the cluster.
+Under the hood, Syn performs dirty reads and writes into distributed in-memory Mnesia tables, replicated across all the nodes of the cluster.
 
 To automatically handle conflict resolution, Syn implements a specialized and simplified version of the mechanisms used in Ulf Wiger's [unsplit](https://github.com/uwiger/unsplit) framework.
 

+ 10 - 0
src/syn.erl

@@ -42,6 +42,8 @@
 -export([member/2]).
 -export([get_members/1]).
 -export([publish/2]).
+-export([multi_call/2]).
+-export([multi_call_reply/2]).
 
 %% ===================================================================
 %% API
@@ -116,6 +118,14 @@ get_members(Name) ->
 publish(Name, Message) ->
     syn_groups:publish(Name, Message).
 
+-spec multi_call(Name :: any(), Message :: any()) -> [{pid(), Response :: any()}].
+multi_call(Name, Message) ->
+    syn_groups:multi_call(Name, Message).
+
+-spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> ok.
+multi_call_reply(CallerPid, Reply) ->
+    syn_groups:multi_call_reply(CallerPid, Reply).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================

+ 37 - 0
src/syn_groups.erl

@@ -32,6 +32,8 @@
 -export([member/2]).
 -export([get_members/1]).
 -export([publish/2]).
+-export([multi_call/2]).
+-export([multi_call_reply/2]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -39,6 +41,9 @@
 %% records
 -record(state, {}).
 
+%% macros
+-define(MULTI_CALL_TIMEOUT_MS, 5000).
+
 %% include
 -include("syn.hrl").
 
@@ -78,6 +83,21 @@ publish(Name, Message) ->
     lists:foreach(FSend, MemberPids),
     {ok, length(MemberPids)}.
 
+-spec multi_call(Name :: any(), Message :: any()) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+multi_call(Name, Message) ->
+    Self = self(),
+    MemberPids = i_get_members(Name),
+    FSend = fun(Pid) ->
+        Pid ! {syn_multi_call, Self, Message}
+    end,
+    lists:foreach(FSend, MemberPids),
+    collect_replies(MemberPids).
+
+-spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> ok.
+multi_call_reply(CallerPid, Reply) ->
+    CallerPid ! {syn_multi_call_reply, self(), Reply}.
+
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
@@ -251,3 +271,20 @@ i_find_by_pid(Pid) ->
 -spec remove_process(Process :: #syn_groups_table{}) -> ok.
 remove_process(Process) ->
     mnesia:dirty_delete_object(syn_groups_table, Process).
+
+-spec collect_replies(MemberPids :: [pid()]) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+collect_replies(MemberPids) ->
+    collect_replies(MemberPids, []).
+
+-spec collect_replies(MemberPids :: [pid()], Replies :: [{pid(), Reply :: any()}]) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+collect_replies([], Replies) -> {Replies, []};
+collect_replies(MemberPids, Replies) ->
+    receive
+        {syn_multi_call_reply, Pid, Reply} ->
+            MemberPids1 = lists:delete(Pid, MemberPids),
+            collect_replies(MemberPids1, [{Pid, Reply} | Replies])
+    after ?MULTI_CALL_TIMEOUT_MS ->
+        {Replies, MemberPids}
+    end.

+ 73 - 5
test/syn_groups_SUITE.erl

@@ -35,15 +35,18 @@
 -export([
     single_node_leave/1,
     single_node_kill/1,
-    single_node_publish/1
+    single_node_publish/1,
+    single_node_call/1
 ]).
 -export([
     two_nodes_kill/1,
-    two_nodes_publish/1
+    two_nodes_publish/1,
+    two_nodes_call/1
 ]).
 
 %% internal
 -export([recipient_loop/1]).
+-export([called_loop/1]).
 
 %% include
 -include_lib("common_test/include/ct.hrl").
@@ -83,11 +86,13 @@ groups() ->
         {single_node_process_groups, [shuffle], [
             single_node_leave,
             single_node_kill,
-            single_node_publish
+            single_node_publish,
+            single_node_call
         ]},
         {two_nodes_process_groups, [shuffle], [
             two_nodes_kill,
-            two_nodes_publish
+            two_nodes_publish,
+            two_nodes_call
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -221,7 +226,7 @@ single_node_publish(_Config) ->
     %% start
     ok = syn:start(),
     ok = syn:init(),
-    %% start process
+    %% start processes
     ResultPid = self(),
     F = fun() -> recipient_loop(ResultPid) end,
     Pid1 = syn_test_suite_helper:start_process(F),
@@ -246,6 +251,32 @@ single_node_publish(_Config) ->
     syn_test_suite_helper:kill_process(Pid1),
     syn_test_suite_helper:kill_process(Pid2).
 
+single_node_call(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    %% start
+    ok = syn:start(),
+    ok = syn:init(),
+    %% start processes
+    Pid1 = syn_test_suite_helper:start_process(fun() -> called_loop(pid1) end),
+    Pid2 = syn_test_suite_helper:start_process(fun() -> called_loop(pid2) end),
+    PidUnresponsive = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:join(<<"my group">>, Pid1),
+    ok = syn:join(<<"my group">>, Pid2),
+    ok = syn:join(<<"my group">>, PidUnresponsive),
+    %% call
+    {Replies, BadPids} = syn:multi_call(<<"my group">>, get_pid_name),
+    %% check responses
+    2 = length(Replies),
+    pid1 = proplists:get_value(Pid1, Replies),
+    pid2 = proplists:get_value(Pid2, Replies),
+    [PidUnresponsive] = BadPids,
+    %% kill processes
+    syn_test_suite_helper:kill_process(Pid1),
+    syn_test_suite_helper:kill_process(Pid2),
+    syn_test_suite_helper:kill_process(PidUnresponsive).
+
 two_nodes_kill(Config) ->
     %% get slave
     SlaveNode = proplists:get_value(slave_node, Config),
@@ -327,6 +358,38 @@ two_nodes_publish(Config) ->
     syn_test_suite_helper:kill_process(PidLocal),
     syn_test_suite_helper:kill_process(PidSlave).
 
+two_nodes_call(Config) ->
+    %% get slave
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    rpc:call(SlaveNode, mnesia, schema_location, [ram]),
+    %% start
+    ok = syn:start(),
+    ok = syn:init(),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
+    timer:sleep(100),
+    %% start processes
+    PidLocal = syn_test_suite_helper:start_process(fun() -> called_loop(pid1) end),
+    PidSlave = syn_test_suite_helper:start_process(SlaveNode, fun() -> called_loop(pid2) end),
+    PidUnresponsive = syn_test_suite_helper:start_process(),
+    %% register
+    ok = syn:join(<<"my group">>, PidLocal),
+    ok = syn:join(<<"my group">>, PidSlave),
+    ok = syn:join(<<"my group">>, PidUnresponsive),
+    %% call
+    {Replies, BadPids} = syn:multi_call(<<"my group">>, get_pid_name),
+    %% check responses
+    2 = length(Replies),
+    pid1 = proplists:get_value(PidLocal, Replies),
+    pid2 = proplists:get_value(PidSlave, Replies),
+    [PidUnresponsive] = BadPids,
+    %% kill processes
+    syn_test_suite_helper:kill_process(PidLocal),
+    syn_test_suite_helper:kill_process(PidSlave),
+    syn_test_suite_helper:kill_process(PidUnresponsive).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================
@@ -334,3 +397,8 @@ recipient_loop(Pid) ->
     receive
         Message -> Pid ! {received, self(), Message}
     end.
+
+called_loop(PidName) ->
+    receive
+        {syn_multi_call, CallerPid, get_pid_name} -> syn:multi_call_reply(CallerPid, PidName)
+    end.