Browse Source

Allow creation of scope subclusters.

Roberto Ostinelli 3 years ago
parent
commit
730d3f7d57
6 changed files with 185 additions and 106 deletions
  1. 13 17
      src/syn.erl
  2. 3 3
      src/syn_groups.erl
  3. 10 10
      src/syn_registry.erl
  4. 30 5
      src/syn_scopes_sup.erl
  5. 125 69
      test/syn_registry_SUITE.erl
  6. 4 2
      test/syn_test_suite_helper.erl

+ 13 - 17
src/syn.erl

@@ -27,9 +27,7 @@
 
 %% API
 -export([start/0, stop/0]).
--export([register/2, register/3]).
--export([lookup/1]).
--export([unregister/1]).
+-export([get_node_scopes/0, add_node_to_scope/1, add_node_to_scopes/1]).
 
 %% ===================================================================
 %% API
@@ -43,19 +41,17 @@ start() ->
 stop() ->
     application:stop(syn).
 
-%% ----- \/ registry -------------------------------------------------
--spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
-register(Name, Pid) ->
-    syn_registry:register(Name, Pid).
+%% ----- \/ scopes ---------------------------------------------------
+%% Returns the scopes of the local node, as reported by syn_scopes_sup.
+-spec get_node_scopes() -> [atom()].
+get_node_scopes() -> syn_scopes_sup:get_node_scopes().
 
--spec register(Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
-register(Name, Pid, Meta) ->
-    syn_registry:register(Name, Pid, Meta).
+%% Adds the local node to `Scope' by delegating to syn_scopes_sup.
+-spec add_node_to_scope(Scope :: atom()) -> ok.
+add_node_to_scope(Scope) -> syn_scopes_sup:add_node_to_scope(Scope).
 
--spec lookup(Name :: any()) -> {pid(), Meta :: term()} | undefined.
-lookup(Name) ->
-    syn_registry:lookup(Name).
-
--spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
-unregister(Name) ->
-    syn_registry:unregister(Name).
+%% Adds the local node to every scope in `Scopes', one at a time and in list order.
+-spec add_node_to_scopes(Scopes :: [atom()]) -> ok.
+add_node_to_scopes(Scopes) ->
+    lists:foreach(fun syn_scopes_sup:add_node_to_scope/1, Scopes).

+ 3 - 3
src/syn_groups.erl

@@ -44,7 +44,7 @@
 -spec start_link(Scope :: atom()) -> {ok, pid()} | {error, any()}.
 start_link(Scope) when is_atom(Scope) ->
     Args = [],
-    ProcessName = get_process_name(Scope),
+    ProcessName = get_process_name_for(Scope),
     gen_server:start_link({local, ProcessName}, ?MODULE, Args, []).
 
 %% ===================================================================
@@ -120,8 +120,8 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec get_process_name(Scope :: atom()) -> atom().
-get_process_name(Scope) ->
+-spec get_process_name_for(Scope :: atom()) -> atom().
+get_process_name_for(Scope) ->
     ModuleBin = atom_to_binary(?MODULE),
     ScopeBin = atom_to_binary(Scope),
     binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).

+ 10 - 10
src/syn_registry.erl

@@ -52,12 +52,12 @@
 %% ===================================================================
 -spec start_link(Scope :: atom()) -> {ok, pid()} | {error, any()}.
 start_link(Scope) when is_atom(Scope) ->
-    ProcessName = get_process_name(Scope),
+    ProcessName = get_process_name_for(Scope),
     Args = [Scope, ProcessName],
     gen_server:start_link({local, ProcessName}, ?MODULE, Args, []).
 
 get_nodes(Scope) ->
-    ProcessName = get_process_name(Scope),
+    ProcessName = get_process_name_for(Scope),
     gen_server:call(ProcessName, get_nodes).
 
 %% ===================================================================
@@ -78,7 +78,7 @@ sync(RemoteNode, ProcessName) ->
 %% ----------------------------------------------------------------------------------------------------------
 -spec init([term()]) ->
     {ok, #state{}} |
-    {ok, #state{}, Timeout :: pos_integer()} |
+    {ok, #state{}, Timeout :: non_neg_integer()} |
     ignore |
     {stop, Reason :: any()}.
 init([Scope, ProcessName]) ->
@@ -97,9 +97,9 @@ init([Scope, ProcessName]) ->
 %% ----------------------------------------------------------------------------------------------------------
 -spec handle_call(Request :: any(), From :: any(), #state{}) ->
     {reply, Reply :: any(), #state{}} |
-    {reply, Reply :: any(), #state{}, Timeout :: pos_integer()} |
+    {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
     {noreply, #state{}} |
-    {noreply, #state{}, Timeout :: pos_integer()} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), Reply :: any(), #state{}} |
     {stop, Reason :: any(), #state{}}.
 handle_call(get_nodes, _From, #state{
@@ -116,7 +116,7 @@ handle_call(Request, From, State) ->
 %% ----------------------------------------------------------------------------------------------------------
 -spec handle_cast(Msg :: any(), #state{}) ->
     {noreply, #state{}} |
-    {noreply, #state{}, Timeout :: pos_integer()} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
 handle_cast({announce, RemoteScopePid}, #state{
     scope = Scope,
@@ -132,7 +132,7 @@ handle_cast({announce, RemoteScopePid}, #state{
         true ->
             %% already known, ignore
             {noreply, State};
-        
+
         false ->
             %% monitor & announce
             _MRef = monitor(process, RemoteScopePid),
@@ -168,7 +168,7 @@ handle_cast(Msg, State) ->
 %% ----------------------------------------------------------------------------------------------------------
 -spec handle_info(Info :: any(), #state{}) ->
     {noreply, #state{}} |
-    {noreply, #state{}, Timeout :: pos_integer()} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
 handle_info(timeout, #state{
     scope = Scope,
@@ -225,8 +225,8 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec get_process_name(Scope :: atom()) -> atom().
-get_process_name(Scope) ->
+-spec get_process_name_for(Scope :: atom()) -> atom().
+get_process_name_for(Scope) ->
     ModuleBin = atom_to_binary(?MODULE),
     ScopeBin = atom_to_binary(Scope),
     binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).

+ 30 - 5
src/syn_scopes_sup.erl

@@ -28,6 +28,7 @@
 
 %% API
 -export([start_link/0]).
+-export([get_node_scopes/0, add_node_to_scope/1]).
 
 %% Supervisor callbacks
 -export([init/1]).
@@ -39,17 +40,41 @@
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
+%% Returns the scopes this node belongs to: `default' first, followed by the
+%% custom scopes saved in the `syn_custom_scopes' application env.
+%% `default' is listed exactly once even when it was also saved as a custom
+%% scope: init/1 builds a pair of child specs per returned scope, so a
+%% duplicate entry would produce duplicated child specs on supervisor restart.
+-spec get_node_scopes() -> [atom()].
+get_node_scopes() ->
+    case application:get_env(syn, syn_custom_scopes) of
+        undefined -> [default];
+        {ok, Scopes} -> [default | lists:delete(default, maps:keys(Scopes))]
+    end.
+
+%% Adds the local node to `Scope': saves the scope in the `syn_custom_scopes'
+%% application env, then starts the scope's syn_registry and syn_groups
+%% children under this supervisor.
+%% Always returns `ok'. A child that is already started is accepted silently;
+%% any other start failure is logged instead of being discarded.
+%% Non-atom scopes are rejected up front (the scope processes' start_link/1
+%% only accept atoms) so that an unusable scope is never saved to the env.
+-spec add_node_to_scope(Scope :: atom()) -> ok.
+add_node_to_scope(Scope) when is_atom(Scope) ->
+    error_logger:info_msg("SYN[~p] Adding node to scope: ~p~n", [node(), Scope]),
+    %% save to ENV (failsafe if sup is restarted)
+    CustomScopes0 = case application:get_env(syn, syn_custom_scopes) of
+        undefined -> #{};
+        {ok, Scopes} -> Scopes
+    end,
+    CustomScopes = CustomScopes0#{Scope => #{}},
+    application:set_env(syn, syn_custom_scopes, CustomScopes),
+    %% start children (registry first, then groups)
+    lists:foreach(fun(Module) ->
+        case supervisor:start_child(?MODULE, scope_child_spec(Module, Scope)) of
+            {ok, _Child} ->
+                ok;
+            {ok, _Child, _Info} ->
+                ok;
+            {error, {already_started, _Child}} ->
+                %% node was already added to this scope
+                ok;
+            {error, Reason} ->
+                error_logger:error_msg(
+                    "SYN[~p] Could not start ~p for scope ~p: ~p~n",
+                    [node(), Module, Scope, Reason]
+                )
+        end
+    end, [syn_registry, syn_groups]),
+    ok.
+
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
 -spec init([]) ->
     {ok, {{supervisor:strategy(), non_neg_integer(), pos_integer()}, [supervisor:child_spec()]}}.
 init([]) ->
-    %% start default scopes
-    Children = [
-        scope_child_spec(syn_registry, default),
-        scope_child_spec(syn_groups, default)
-    ],
+    Children = lists:foldl(fun(Scope, Acc) ->
+        [
+            scope_child_spec(syn_registry, Scope),
+            scope_child_spec(syn_groups, Scope)
+            | Acc
+        ]
+    end, [], get_node_scopes()),
     {ok, {{one_for_one, 10, 10}, Children}}.
 
 %% ===================================================================

+ 125 - 69
test/syn_registry_SUITE.erl

@@ -33,7 +33,8 @@
 
 %% tests
 -export([
-    three_nodes_discover_default_scope/1
+    three_nodes_discover_default_scope/1,
+    three_nodes_discover_custom_scope/1
 %%    three_nodes_register_unregister_and_monitor_default_scope/1
 ]).
 
@@ -72,7 +73,8 @@ all() ->
 groups() ->
     [
         {three_nodes_process_registration, [shuffle], [
-            three_nodes_discover_default_scope
+            three_nodes_discover_default_scope,
+            three_nodes_discover_custom_scope
 %%            three_nodes_register_unregister_and_monitor_default_scope
         ]}
     ].
@@ -172,22 +174,10 @@ three_nodes_discover_default_scope(Config) ->
     ok = rpc:call(SlaveNode2, syn, start, []),
     timer:sleep(100),
 
-    %% check scope network
-    NodesMap0_0 = syn_registry:get_nodes(default),
-    Nodes0_0 = maps:keys(NodesMap0_0),
-    2 = length(Nodes0_0),
-    true = lists:member(SlaveNode1, Nodes0_0),
-    true = lists:member(SlaveNode2, Nodes0_0),
-    NodesMap0_1 = rpc:call(SlaveNode1, syn_registry, get_nodes, [default]),
-    Nodes0_1 = maps:keys(NodesMap0_1),
-    2 = length(Nodes0_1),
-    true = lists:member(node(), Nodes0_1),
-    true = lists:member(SlaveNode2, Nodes0_1),
-    NodesMap0_2 = rpc:call(SlaveNode2, syn_registry, get_nodes, [default]),
-    Nodes0_2 = maps:keys(NodesMap0_2),
-    2 = length(Nodes0_2),
-    true = lists:member(node(), Nodes0_2),
-    true = lists:member(SlaveNode1, Nodes0_2),
+    %% check
+    assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
 
     %% simulate full netsplit
     rpc:call(SlaveNode1, syn_test_suite_helper, disconnect_node, [SlaveNode2]),
@@ -195,69 +185,124 @@ three_nodes_discover_default_scope(Config) ->
     syn_test_suite_helper:disconnect_node(SlaveNode2),
     timer:sleep(100),
 
-    %% check scope network
-    NodesMap1_0 = syn_registry:get_nodes(default),
-    Nodes1_0 = maps:keys(NodesMap1_0),
-    0 = length(Nodes1_0),
+    %% check
+    assert_scope_subcluster(node(), default, []),
 
-    %% reconnect one node
+    %% reconnect slave 1
     syn_test_suite_helper:connect_node(SlaveNode1),
     ok = syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1]),
 
-    %% check scope network
-    NodesMap2_0 = syn_registry:get_nodes(default),
-    Nodes2_0 = maps:keys(NodesMap2_0),
-    1 = length(Nodes2_0),
-    true = lists:member(SlaveNode1, Nodes2_0),
-    false = lists:member(SlaveNode2, Nodes2_0),
-    NodesMap2_1 = rpc:call(SlaveNode1, syn_registry, get_nodes, [default]),
-    Nodes2_1 = maps:keys(NodesMap2_1),
-    1 = length(Nodes2_1),
-    true = lists:member(node(), Nodes2_1),
-    false = lists:member(SlaveNode2, Nodes2_1),
+    %% check
+    assert_scope_subcluster(node(), default, [SlaveNode1]),
+    assert_scope_subcluster(SlaveNode1, default, [node()]),
 
     %% reconnect all
     syn_test_suite_helper:connect_node(SlaveNode2),
     rpc:call(SlaveNode1, syn_test_suite_helper, connect_node, [SlaveNode2]),
     ok = syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1, SlaveNode2]),
 
-    %% check scope network
-    NodesMap3_0 = syn_registry:get_nodes(default),
-    Nodes3_0 = maps:keys(NodesMap3_0),
-    2 = length(Nodes3_0),
-    true = lists:member(SlaveNode1, Nodes3_0),
-    true = lists:member(SlaveNode2, Nodes3_0),
-    NodesMap3_1 = rpc:call(SlaveNode1, syn_registry, get_nodes, [default]),
-    Nodes3_1 = maps:keys(NodesMap3_1),
-    2 = length(Nodes3_1),
-    true = lists:member(node(), Nodes3_1),
-    true = lists:member(SlaveNode2, Nodes3_1),
-    NodesMap3_2 = rpc:call(SlaveNode2, syn_registry, get_nodes, [default]),
-    Nodes3_2 = maps:keys(NodesMap3_2),
-    2 = length(Nodes3_2),
-    true = lists:member(node(), Nodes3_2),
-    true = lists:member(SlaveNode1, Nodes3_2),
-
-    %% crash the scope process on slave1
-    rpc:call(SlaveNode1, syn_test_suite_helper, kill_process, [syn_registry_default]),
+    %% check
+    assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
+
+    %% crash the scope process on local
+    syn_test_suite_helper:kill_process(syn_registry_default),
+    timer:sleep(100),
+
+    %% check, it should have rebuilt after supervisor restarts it
+    assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]),
+
+    %% crash scopes supervisor on local
+    syn_test_suite_helper:kill_process(syn_scopes_sup),
+    timer:sleep(100),
+
+    %% check
+    assert_scope_subcluster(node(), default, [SlaveNode1, SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, default, [node(), SlaveNode2]),
+    assert_scope_subcluster(SlaveNode2, default, [node(), SlaveNode1]).
+
+three_nodes_discover_custom_scope(Config) ->
+    %% get the two slave nodes from the test config
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+    %% start syn on the local node and on both slaves
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    timer:sleep(100),
+
+    %% add custom scopes: local -> a, all; slave 1 -> a, b, all; slave 2 -> b, c, all
+    ok = syn:add_node_to_scope(custom_scope_a),
+    ok = syn:add_node_to_scope(custom_scope_all),
+    ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[custom_scope_a, custom_scope_b, custom_scope_all]]),
+    ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[custom_scope_b, custom_scope_c, custom_scope_all]]),
+    timer:sleep(100),
+
+    %% check: each node only sees the other nodes that joined the same scope
+    assert_scope_subcluster(node(), custom_scope_a, [SlaveNode1]),
+    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_a, [node()]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_b, [SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+    assert_scope_subcluster(SlaveNode2, custom_scope_b, [SlaveNode1]),
+    assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
+    assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]),
+
+    %% disconnect slave 2 from the local node only (slave 1 can still see slave 2)
+    syn_test_suite_helper:disconnect_node(SlaveNode2),
     timer:sleep(100),
 
-    %% check scope network, it should have rebuilt after supervisor restarts it
-    NodesMap3_0 = syn_registry:get_nodes(default),
-    Nodes3_0 = maps:keys(NodesMap3_0),
-    2 = length(Nodes3_0),
-    true = lists:member(SlaveNode1, Nodes3_0),
-    true = lists:member(SlaveNode2, Nodes3_0),
-    NodesMap3_1 = rpc:call(SlaveNode1, syn_registry, get_nodes, [default]),
-    Nodes3_1 = maps:keys(NodesMap3_1),
-    2 = length(Nodes3_1),
-    true = lists:member(node(), Nodes3_1),
-    true = lists:member(SlaveNode2, Nodes3_1),
-    NodesMap3_2 = rpc:call(SlaveNode2, syn_registry, get_nodes, [default]),
-    Nodes3_2 = maps:keys(NodesMap3_2),
-    2 = length(Nodes3_2),
-    true = lists:member(node(), Nodes3_2),
-    true = lists:member(SlaveNode1, Nodes3_2).
+    %% check: local no longer sees slave 2, slave 1 still sees both other nodes
+    assert_scope_subcluster(node(), custom_scope_a, [SlaveNode1]),
+    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_a, [node()]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_b, [SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+
+    %% reconnect slave 2 to the local node
+    syn_test_suite_helper:connect_node(SlaveNode2),
+    ok = syn_test_suite_helper:wait_cluster_connected([node(), SlaveNode1, SlaveNode2]),
+
+    %% check: same subclusters as before the disconnection
+    assert_scope_subcluster(node(), custom_scope_a, [SlaveNode1]),
+    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_a, [node()]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_b, [SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+    assert_scope_subcluster(SlaveNode2, custom_scope_b, [SlaveNode1]),
+    assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
+    assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]),
+
+    %% crash the custom_scope_b registry process on slave 2
+    rpc:call(SlaveNode2, syn_test_suite_helper, kill_process, [syn_registry_custom_scope_b]),
+    timer:sleep(100),
+
+    %% check: subclusters are unchanged (presumably rebuilt once the supervisor restarts the process)
+    assert_scope_subcluster(node(), custom_scope_a, [SlaveNode1]),
+    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_a, [node()]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_b, [SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+    assert_scope_subcluster(SlaveNode2, custom_scope_b, [SlaveNode1]),
+    assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
+    assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]),
+
+    %% crash the scopes supervisor on the local node
+    syn_test_suite_helper:kill_process(syn_scopes_sup),
+    timer:sleep(100),
+
+    %% check: custom scopes are still joined (presumably restored from the app env saved by add_node_to_scope)
+    assert_scope_subcluster(node(), custom_scope_a, [SlaveNode1]),
+    assert_scope_subcluster(node(), custom_scope_all, [SlaveNode1, SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_a, [node()]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_b, [SlaveNode2]),
+    assert_scope_subcluster(SlaveNode1, custom_scope_all, [node(), SlaveNode2]),
+    assert_scope_subcluster(SlaveNode2, custom_scope_b, [SlaveNode1]),
+    assert_scope_subcluster(SlaveNode2, custom_scope_c, []),
+    assert_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]).
 
 %%three_nodes_register_unregister_and_monitor_default_scope(Config) ->
 %%    %% get slaves
@@ -342,3 +387,14 @@ three_nodes_discover_default_scope(Config) ->
 %%    undefined = syn:lookup({remote_pid_on, slave_1}),
 %%    undefined = rpc:call(SlaveNode1, syn, lookup, [{remote_pid_on, slave_1}]),
 %%    undefined = rpc:call(SlaveNode2, syn, lookup, [{remote_pid_on, slave_1}]).
+
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+%% Asserts that, as seen from `Node', the subcluster of `Scope' has as many
+%% nodes as `ExpectedNodes' and contains every one of them.
+%% Crashes with a badmatch on the first expectation that does not hold.
+assert_scope_subcluster(Node, Scope, ExpectedNodes) ->
+    ActualNodes = maps:keys(rpc:call(Node, syn_registry, get_nodes, [Scope])),
+    ExpectedCount = length(ExpectedNodes),
+    ExpectedCount = length(ActualNodes),
+    _ = [true = lists:member(Expected, ActualNodes) || Expected <- ExpectedNodes],
+    ok.

+ 4 - 2
test/syn_test_suite_helper.erl

@@ -87,8 +87,10 @@ start_process(Node, Loop) ->
     Pid = spawn(Node, Loop),
     Pid.
 
-kill_process(Pid) ->
-    exit(Pid, kill).
+%% Kills a process, given either its pid or its locally registered name.
+kill_process(RegisteredName) when is_atom(RegisteredName) ->
+    Pid = whereis(RegisteredName),
+    exit(Pid, kill);
+kill_process(Pid) when is_pid(Pid) ->
+    exit(Pid, kill).
 
 wait_cluster_connected(Nodes) ->
     wait_cluster_connected(Nodes, os:system_time(millisecond)).