Browse Source

Add custom groups scopes.

Roberto Ostinelli 3 years ago
parent
commit
49ec0caf0b
3 changed files with 249 additions and 11 deletions
  1. 4 1
      src/syn_gen_scope.erl
  2. 13 5
      src/syn_groups.erl
  3. 232 5
      test/syn_groups_SUITE.erl

+ 4 - 1
src/syn_gen_scope.erl

@@ -104,7 +104,10 @@ call(Handler, Scope, Message) ->
 call(Handler, Node, Scope, Message) ->
 call(Handler, Node, Scope, Message) ->
     case get_process_name_for_scope(Handler, Scope) of
     case get_process_name_for_scope(Handler, Scope) of
         undefined -> error({invalid_scope, Scope});
         undefined -> error({invalid_scope, Scope});
-        ProcessName -> gen_server:call({ProcessName, Node}, Message)
+        ProcessName ->
+            try gen_server:call({ProcessName, Node}, Message)
+            catch exit:{noproc, {gen_server, call, _}} -> error({invalid_scope, Scope})
+            end
     end.
     end.
 
 
 %% ===================================================================
 %% ===================================================================

+ 13 - 5
src/syn_groups.erl

@@ -333,17 +333,25 @@ rebuild_monitors(#state{
     table_by_pid = TableByPid
     table_by_pid = TableByPid
 }) ->
 }) ->
     GroupsTuples = get_groups_tuples_for_node(node(), TableByName),
     GroupsTuples = get_groups_tuples_for_node(node(), TableByName),
-    lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
+    lists:foldl(fun({GroupName, Pid, Meta, Time}, NewMonitorRefs) ->
         remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
         remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
         case is_process_alive(Pid) of
         case is_process_alive(Pid) of
             true ->
             true ->
-                MRef = erlang:monitor(process, Pid),
-                add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid);
+                case maps:find(Pid, NewMonitorRefs) of
+                    error ->
+                        MRef = erlang:monitor(process, Pid),
+                        add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
+                        maps:put(Pid, MRef, NewMonitorRefs);
+
+                    {ok, MRef} ->
+                        add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
+                        NewMonitorRefs
+                end;
 
 
             _ ->
             _ ->
-                ok
+                NewMonitorRefs
         end
         end
-    end, GroupsTuples).
+    end, #{}, GroupsTuples).
 
 
 -spec get_groups_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_groups_tuple()].
 -spec get_groups_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_groups_tuple()].
 get_groups_tuples_for_node(Node, TableByName) ->
 get_groups_tuples_for_node(Node, TableByName) ->

+ 232 - 5
test/syn_groups_SUITE.erl

@@ -35,7 +35,8 @@
 -export([
 -export([
     three_nodes_discover_default_scope/1,
     three_nodes_discover_default_scope/1,
     three_nodes_discover_custom_scope/1,
     three_nodes_discover_custom_scope/1,
-    three_nodes_join_leave_and_monitor_default_scope/1
+    three_nodes_join_leave_and_monitor_default_scope/1,
+    three_nodes_join_leave_and_monitor_custom_scope/1
 ]).
 ]).
 
 
 %% include
 %% include
@@ -73,9 +74,10 @@ all() ->
 groups() ->
 groups() ->
     [
     [
         {three_nodes_groups, [shuffle], [
         {three_nodes_groups, [shuffle], [
-%%            three_nodes_discover_default_scope,
-%%            three_nodes_discover_custom_scope,
-            three_nodes_join_leave_and_monitor_default_scope
+            three_nodes_discover_default_scope,
+            three_nodes_discover_custom_scope,
+            three_nodes_join_leave_and_monitor_default_scope,
+            three_nodes_join_leave_and_monitor_custom_scope
         ]}
         ]}
     ].
     ].
 %% -------------------------------------------------------------------
 %% -------------------------------------------------------------------
@@ -480,4 +482,229 @@ three_nodes_join_leave_and_monitor_default_scope(Config) ->
     1 = syn:groups_count(default),
     1 = syn:groups_count(default),
     1 = syn:groups_count(default, node()),
     1 = syn:groups_count(default, node()),
     0 = syn:groups_count(default, SlaveNode1),
     0 = syn:groups_count(default, SlaveNode1),
-    0 = syn:groups_count(default, SlaveNode2).
+    0 = syn:groups_count(default, SlaveNode2),
+
+    %% errors
+    {error, not_in_group} = syn:leave({group, "one"}, PidWithMeta).
+
+three_nodes_join_leave_and_monitor_custom_scope(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+
+    %% add custom scopes
+    ok = syn:add_node_to_scope(custom_scope_ab),
+    ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[custom_scope_ab, custom_scope_bc]]),
+    ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[custom_scope_bc]]),
+
+    %% start processes
+    Pid = syn_test_suite_helper:start_process(),
+    PidWithMeta = syn_test_suite_helper:start_process(),
+    PidRemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1),
+    PidRemoteOn2 = syn_test_suite_helper:start_process(SlaveNode2),
+
+    %% check
+    [] = syn:get_members(custom_scope_ab, {group, "one"}),
+    [] = rpc:call(SlaveNode1, syn, get_members, [custom_scope_ab, {group, "one"}]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, get_members, [custom_scope_ab, {group, "one"}]),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:get_members(custom_scope_bc, {group, "one"}),
+    [] = rpc:call(SlaveNode1, syn, get_members, [custom_scope_bc, {group, "one"}]),
+    [] = rpc:call(SlaveNode2, syn, get_members, [custom_scope_bc, {group, "one"}]),
+    [] = syn:get_members(custom_scope_ab, {group, "two"}),
+    [] = rpc:call(SlaveNode1, syn, get_members, [custom_scope_ab, {group, "two"}]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, get_members, [custom_scope_ab, {group, "two"}]),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:get_members(custom_scope_bc, {group, "two"}),
+    [] = rpc:call(SlaveNode1, syn, get_members, [custom_scope_bc, {group, "two"}]),
+    [] = rpc:call(SlaveNode2, syn, get_members, [custom_scope_bc, {group, "two"}]),
+
+    %% join
+    ok = syn:join(custom_scope_ab, {group, "one"}, Pid),
+    ok = syn:join(custom_scope_ab, {group, "one"}, PidWithMeta, <<"with meta">>),
+    ok = rpc:call(SlaveNode1, syn, join, [custom_scope_bc, {group, "two"}, PidRemoteOn1]),
+    ok = syn:join(custom_scope_ab, {group, "two"}, Pid),
+    ok = syn:join(custom_scope_ab, {group, "two"}, PidWithMeta, "with-meta-2"),
+
+    %% errors
+    {error, not_alive} = syn:join({"pid not alive"}, list_to_pid("<0.9999.0>")),
+    {error, not_in_group} = syn:leave({group, "three"}, Pid),
+    {'EXIT', {{invalid_scope, custom_scope_ab}, _}} = catch syn:join(custom_scope_ab, {group, "one"}, PidRemoteOn2),
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with meta">>}]),
+        fun() -> lists:sort(syn:get_members(custom_scope_ab, {group, "one"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with meta">>}]),
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [custom_scope_ab, {group, "one"}])) end
+    ),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, get_members, [custom_scope_ab, {group, "one"}]),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(syn:get_members(custom_scope_ab, {group, "two"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [custom_scope_ab, {group, "two"}])) end
+    ),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, get_members, [custom_scope_ab, {group, "two"}]),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:get_members(custom_scope_bc, {group, "two"}),
+    syn_test_suite_helper:assert_wait(
+        [{PidRemoteOn1, undefined}],
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [custom_scope_bc, {group, "two"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [{PidRemoteOn1, undefined}],
+        fun() -> lists:sort(rpc:call(SlaveNode2, syn, get_members, [custom_scope_bc, {group, "two"}])) end
+    ),
+    2 = syn:groups_count(custom_scope_ab),
+    2 = syn:groups_count(custom_scope_ab, node()),
+    0 = syn:groups_count(custom_scope_ab, SlaveNode1),
+    0 = syn:groups_count(custom_scope_ab, SlaveNode2),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc, node()),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc, SlaveNode1),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc, SlaveNode2),
+    2 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab]),
+    2 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab, node()]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab, SlaveNode1]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab, SlaveNode2]),
+    1 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc, node()]),
+    1 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc, SlaveNode1]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc, SlaveNode2]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab, node()]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab, SlaveNode1]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab, SlaveNode2]),
+    1 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc]),
+    0 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, node()]),
+    1 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, SlaveNode1]),
+    0 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, SlaveNode2]),
+
+    %% re-join to edit meta
+    ok = syn:join(custom_scope_ab, {group, "one"}, PidWithMeta, <<"with updated meta">>),
+    ok = rpc:call(SlaveNode2, syn, join, [custom_scope_bc, {group, "two"}, PidRemoteOn1, added_meta]), %% updated on slave 2
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with updated meta">>}]),
+        fun() -> lists:sort(syn:get_members(custom_scope_ab, {group, "one"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with updated meta">>}]),
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [custom_scope_ab, {group, "one"}])) end
+    ),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, get_members, [custom_scope_ab, {group, "one"}]),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(syn:get_members(custom_scope_ab, {group, "two"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [custom_scope_ab, {group, "two"}])) end
+    ),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, get_members, [custom_scope_ab, {group, "two"}]),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:get_members(custom_scope_bc, {group, "two"}),
+    syn_test_suite_helper:assert_wait(
+        [{PidRemoteOn1, added_meta}],
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [custom_scope_bc, {group, "two"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [{PidRemoteOn1, added_meta}],
+        fun() -> lists:sort(rpc:call(SlaveNode2, syn, get_members, [custom_scope_bc, {group, "two"}])) end
+    ),
+    2 = syn:groups_count(custom_scope_ab),
+    2 = syn:groups_count(custom_scope_ab, node()),
+    0 = syn:groups_count(custom_scope_ab, SlaveNode1),
+    0 = syn:groups_count(custom_scope_ab, SlaveNode2),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc, node()),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc, SlaveNode1),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc, SlaveNode2),
+    2 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab]),
+    2 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab, node()]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab, SlaveNode1]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab, SlaveNode2]),
+    1 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc, node()]),
+    1 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc, SlaveNode1]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc, SlaveNode2]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab, node()]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab, SlaveNode1]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab, SlaveNode2]),
+    1 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc]),
+    0 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, node()]),
+    1 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, SlaveNode1]),
+    0 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, SlaveNode2]),
+
+    %% crash scope process to ensure that monitors get recreated
+    exit(whereis(syn_groups_custom_scope_ab), kill),
+    syn_test_suite_helper:wait_process_name_ready(syn_groups_custom_scope_ab),
+
+    %% kill process
+    syn_test_suite_helper:kill_process(Pid),
+    syn_test_suite_helper:kill_process(PidRemoteOn1),
+    %% leave
+    ok = syn:leave(custom_scope_ab, {group, "one"}, PidWithMeta),
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        [],
+        fun() -> lists:sort(syn:get_members(custom_scope_ab, {group, "one"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [],
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [custom_scope_ab, {group, "one"}])) end
+    ),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, get_members, [custom_scope_ab, {group, "one"}]),
+    syn_test_suite_helper:assert_wait(
+        [{PidWithMeta, "with-meta-2"}],
+        fun() -> lists:sort(syn:get_members(custom_scope_ab, {group, "two"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [{PidWithMeta, "with-meta-2"}],
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [custom_scope_ab, {group, "two"}])) end
+    ),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, get_members, [custom_scope_ab, {group, "two"}]),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:get_members(custom_scope_bc, {group, "two"}),
+    syn_test_suite_helper:assert_wait(
+        [],
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [custom_scope_bc, {group, "two"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [], fun() -> lists:sort(rpc:call(SlaveNode2, syn, get_members, [custom_scope_bc, {group, "two"}])) end
+    ),
+    1 = syn:groups_count(custom_scope_ab),
+    1 = syn:groups_count(custom_scope_ab, node()),
+    0 = syn:groups_count(custom_scope_ab, SlaveNode1),
+    0 = syn:groups_count(custom_scope_ab, SlaveNode2),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc, node()),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc, SlaveNode1),
+    {'EXIT', {{invalid_scope, custom_scope_bc}, _}} = catch syn:groups_count(custom_scope_bc, SlaveNode2),
+    1 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab]),
+    1 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab, node()]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab, SlaveNode1]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_ab, SlaveNode2]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc, node()]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc, SlaveNode1]),
+    0 = rpc:call(SlaveNode1, syn, groups_count, [custom_scope_bc, SlaveNode2]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab, node()]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab, SlaveNode1]),
+    {badrpc, {'EXIT', {{invalid_scope, custom_scope_ab}, _}}} = catch rpc:call(SlaveNode2, syn, groups_count, [custom_scope_ab, SlaveNode2]),
+    0 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc]),
+    0 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, node()]),
+    0 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, SlaveNode1]),
+    0 = rpc:call(SlaveNode2, syn, groups_count, [custom_scope_bc, SlaveNode2]),
+
+    %% errors
+    {error, not_in_group} = syn:leave(custom_scope_ab, {group, "one"}, PidWithMeta).