Browse Source

Monitor recipient processes during multi_calls.

[ostinelli/syn#12]
Roberto Ostinelli 9 years ago
parent
commit
96904230c3
3 changed files with 73 additions and 17 deletions
  1. 1 1
      README.md
  2. 35 13
      src/syn_groups.erl
  3. 37 3
      test/syn_groups_SUITE.erl

+ 1 - 1
README.md

@@ -296,7 +296,7 @@ Types:
 	  Reply = any()
 ```
 
-> Syn will wait up to the value specified in `Timeout` to receive all replies from the members. The members that do not reply in time will be added to the `BadPids` list.
+> Syn will wait up to the value specified in `Timeout` to receive all replies from the members. The members that do not reply in time or that crash before sending a reply will be added to the `BadPids` list.
 
 When this call is issued, all members will receive a tuple in the format:
 

+ 35 - 13
src/syn_groups.erl

@@ -39,6 +39,9 @@
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
+%% internal
+-export([multi_call_and_receive/4]).
+
 %% records
 -record(state, {}).
 
@@ -94,10 +97,10 @@ multi_call(Name, Message, Timeout) ->
     Self = self(),
     MemberPids = i_get_members(Name),
     FSend = fun(Pid) ->
-        Pid ! {syn_multi_call, Self, Message}
+        spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Message, Timeout])
     end,
     lists:foreach(FSend, MemberPids),
-    collect_replies(MemberPids, Timeout).
+    collect_replies(MemberPids).
 
 -spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
 multi_call_reply(CallerPid, Reply) ->
@@ -278,19 +281,38 @@ i_find_by_pid(Pid) ->
 remove_process(Process) ->
     mnesia:dirty_delete_object(syn_groups_table, Process).
 
--spec collect_replies(MemberPids :: [pid()], Timeout :: non_neg_integer()) ->
-    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
-collect_replies(MemberPids, Timeout) ->
-    collect_replies(MemberPids, Timeout, []).
+-spec multi_call_and_receive(
+    CollectorPid :: pid(),
+    Pid :: pid(),
+    Message :: any(),
+    Timeout :: non_neg_integer()
+) -> any().
+multi_call_and_receive(CollectorPid, Pid, Message, Timeout) ->
+    MonitorRef = monitor(process, Pid),
+    Pid ! {syn_multi_call, self(), Message},
 
--spec collect_replies(MemberPids :: [pid()], Timeout :: non_neg_integer(), Replies :: [{pid(), Reply :: any()}]) ->
-    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
-collect_replies([], _Timeout, Replies) -> {Replies, []};
-collect_replies(MemberPids, Timeout, Replies) ->
     receive
         {syn_multi_call_reply, Pid, Reply} ->
-            MemberPids1 = lists:delete(Pid, MemberPids),
-            collect_replies(MemberPids1, Timeout, [{Pid, Reply} | Replies])
+            CollectorPid ! {reply, Pid, Reply};
+        {'DOWN', MonitorRef, _, _, _} ->
+            CollectorPid ! {bad_pid, Pid}
     after Timeout ->
-        {Replies, MemberPids}
+        CollectorPid ! {bad_pid, Pid}
+    end.
+
+-spec collect_replies(MemberPids :: [pid()]) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+collect_replies(MemberPids) ->
+    collect_replies(MemberPids, [], []). %% start with no replies and no bad pids collected
+
+-spec collect_replies(MemberPids :: [pid()], [{pid(), Reply :: any()}], [pid()]) ->
+    {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
+collect_replies([], Replies, BadPids) -> {Replies, BadPids}; %% every member has been accounted for
+collect_replies(MemberPids, Replies, BadPids) ->
+    receive %% NOTE(review): no 'after' clause here; this waits until a reply/bad_pid message arrives for each remaining member
+        {reply, Pid, Reply} -> %% sent by multi_call_and_receive/4 when the member answered
+            MemberPids1 = lists:delete(Pid, MemberPids),
+            collect_replies(MemberPids1, [{Pid, Reply} | Replies], BadPids);
+        {bad_pid, Pid} -> %% sent by multi_call_and_receive/4 when the member went down or timed out
+            MemberPids1 = lists:delete(Pid, MemberPids),
+            collect_replies(MemberPids1, Replies, [Pid | BadPids])
     end.

+ 37 - 3
test/syn_groups_SUITE.erl

@@ -36,7 +36,8 @@
     single_node_leave/1,
     single_node_kill/1,
     single_node_publish/1,
-    single_node_multi_call/1
+    single_node_multi_call/1,
+    single_node_multi_call_when_recipient_crashes/1
 ]).
 -export([
     two_nodes_kill/1,
@@ -46,7 +47,7 @@
 
 %% internal
 -export([recipient_loop/1]).
--export([called_loop/1]).
+-export([called_loop/1, called_loop_that_crashes/1]).
 
 %% include
 -include_lib("common_test/include/ct.hrl").
@@ -87,7 +88,8 @@ groups() ->
             single_node_leave,
             single_node_kill,
             single_node_publish,
-            single_node_multi_call
+            single_node_multi_call,
+            single_node_multi_call_when_recipient_crashes
         ]},
         {two_nodes_process_groups, [shuffle], [
             two_nodes_kill,
@@ -277,6 +279,33 @@ single_node_multi_call(_Config) ->
     syn_test_suite_helper:kill_process(Pid2),
     syn_test_suite_helper:kill_process(PidUnresponsive).
 
+single_node_multi_call_when_recipient_crashes(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    %% start
+    ok = syn:start(),
+    ok = syn:init(),
+    %% start processes
+    Pid1 = syn_test_suite_helper:start_process(fun() -> called_loop(pid1) end),
+    Pid2 = syn_test_suite_helper:start_process(fun() -> called_loop(pid2) end),
+    PidCrashes = syn_test_suite_helper:start_process(fun() -> called_loop_that_crashes(pid_crashes) end),
+    %% join group
+    ok = syn:join(<<"my group">>, Pid1),
+    ok = syn:join(<<"my group">>, Pid2),
+    ok = syn:join(<<"my group">>, PidCrashes),
+    %% call
+    {Time, {Replies, BadPids}} = timer:tc(syn, multi_call, [<<"my group">>, get_pid_name]),
+    %% check that the crashing pid was monitored: the call must return in under 1 second, no need to wait for timeout
+    true = Time/1000 < 1000,
+    %% check responses
+    2 = length(Replies),
+    pid1 = proplists:get_value(Pid1, Replies),
+    pid2 = proplists:get_value(Pid2, Replies),
+    [PidCrashes] = BadPids,
+    %% kill processes
+    syn_test_suite_helper:kill_process(Pid1),
+    syn_test_suite_helper:kill_process(Pid2).
+
 two_nodes_kill(Config) ->
     %% get slave
     SlaveNode = proplists:get_value(slave_node, Config),
@@ -399,3 +428,8 @@ called_loop(PidName) ->
     receive
         {syn_multi_call, CallerPid, get_pid_name} -> syn:multi_call_reply(CallerPid, PidName)
     end.
+
+called_loop_that_crashes(_PidName) ->
+    receive %% on a multi_call request, exit instead of replying to simulate a recipient that crashes
+        {syn_multi_call, _CallerPid, get_pid_name} -> exit(recipient_crashed_on_purpose)
+    end.