Browse Source

Add pg concurrency test.

Roberto Ostinelli 3 years ago
parent
commit
2e7a463c96
2 changed files with 122 additions and 18 deletions
  1. 101 1
      test/syn_pg_SUITE.erl
  2. 21 17
      test/syn_registry_SUITE.erl

+ 101 - 1
test/syn_pg_SUITE.erl

@@ -42,6 +42,9 @@
     three_nodes_multi_call/1,
     three_nodes_group_names/1
 ]).
+-export([
+    four_nodes_concurrency/1
+]).
 
 %% internals
 -export([
@@ -66,7 +69,8 @@
 %% -------------------------------------------------------------------
 all() ->
     [
-        {group, three_nodes_pg}
+        {group, three_nodes_pg},
+        {group, four_nodes_pg}
     ].
 
 %% -------------------------------------------------------------------
@@ -92,6 +96,9 @@ groups() ->
             three_nodes_publish,
             three_nodes_multi_call,
             three_nodes_group_names
+        ]},
+        {four_nodes_pg, [shuffle], [
+            four_nodes_concurrency
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -128,6 +135,15 @@ init_per_group(three_nodes_pg, Config) ->
         NodesConfig ->
             NodesConfig ++ Config
     end;
+init_per_group(four_nodes_pg, Config) ->
+    case syn_test_suite_helper:init_cluster(4) of
+        {error_initializing_cluster, Other} ->
+            end_per_group(four_nodes_pg, Config),
+            {skip, Other};
+
+        NodesConfig ->
+            NodesConfig ++ Config
+    end;
 
 init_per_group(_GroupName, Config) ->
     Config.
@@ -140,6 +156,8 @@ init_per_group(_GroupName, Config) ->
 %% -------------------------------------------------------------------
 end_per_group(three_nodes_pg, Config) ->
     syn_test_suite_helper:end_cluster(3, Config);
+end_per_group(four_nodes_pg, Config) ->
+    syn_test_suite_helper:end_cluster(4, Config);
 end_per_group(_GroupName, _Config) ->
     syn_test_suite_helper:clean_after_test().
 
@@ -1612,6 +1630,88 @@ three_nodes_group_names(Config) ->
         fun() -> lists:sort(rpc:call(SlaveNode2, syn, group_names, [scope_all, SlaveNode2])) end
     ).
 
+four_nodes_concurrency(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(syn_slave_1, Config),
+    SlaveNode2 = proplists:get_value(syn_slave_2, Config),
+    SlaveNode3 = proplists:get_value(syn_slave_3, Config),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+    ok = rpc:call(SlaveNode3, syn, start, []),
+
+    %% add scopes
+    ok = syn:add_node_to_scopes([scope_all]),
+    ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[scope_all]]),
+    ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[scope_all]]),
+    ok = rpc:call(SlaveNode3, syn, add_node_to_scopes, [[scope_all]]),
+
+    %% ref
+    TestPid = self(),
+    Iterations = 250,
+
+    %% pids
+    PidLocal = syn_test_suite_helper:start_process(),
+    PidOn1 = syn_test_suite_helper:start_process(SlaveNode1),
+    PidOn2 = syn_test_suite_helper:start_process(SlaveNode2),
+    PidOn3 = syn_test_suite_helper:start_process(SlaveNode3),
+
+    LocalNode = node(),
+    PidMap = #{
+        LocalNode => PidLocal,
+        SlaveNode1 => PidOn1,
+        SlaveNode2 => PidOn2,
+        SlaveNode3 => PidOn3
+    },
+
+    %% concurrent test
+    WorkerFun = fun() ->
+        Pid = maps:get(node(), PidMap),
+        lists:foreach(fun(_) ->
+            %% loop
+            RandomMeta = rand:uniform(99999),
+            ok = syn:join(scope_all, <<"concurrent-scope">>, Pid, RandomMeta),
+            case rand:uniform(5) of
+                1 -> syn:leave(scope_all, <<"concurrent-scope">>, Pid);
+                _ -> ok
+            end,
+            RndTime = rand:uniform(30),
+            timer:sleep(RndTime)
+        end, lists:seq(1, Iterations)),
+        TestPid ! {done, node()}
+    end,
+
+    %% spawn concurrent
+    spawn(LocalNode, WorkerFun),
+    spawn(SlaveNode1, WorkerFun),
+    spawn(SlaveNode2, WorkerFun),
+    spawn(SlaveNode3, WorkerFun),
+
+    %% wait for workers done
+    syn_test_suite_helper:assert_received_messages([
+        {done, LocalNode},
+        {done, SlaveNode1},
+        {done, SlaveNode2},
+        {done, SlaveNode3}
+    ]),
+
+    %% check results are same across network
+    syn_test_suite_helper:assert_wait(
+        1,
+        fun() ->
+            ResultPidLocal = lists:sort(syn:members(scope_all, <<"concurrent-scope">>)),
+            ResultPidOn1 = lists:sort(rpc:call(SlaveNode1, syn, members, [scope_all, <<"concurrent-scope">>])),
+            ResultPidOn2 = lists:sort(rpc:call(SlaveNode2, syn, members, [scope_all, <<"concurrent-scope">>])),
+            ResultPidOn3 = lists:sort(rpc:call(SlaveNode3, syn, members, [scope_all, <<"concurrent-scope">>])),
+
+            %% if unique set is of 1 element then they all contain the same result
+            Ordset = ordsets:from_list([ResultPidLocal, ResultPidOn1, ResultPidOn2, ResultPidOn3]),
+            ordsets:size(Ordset)
+        end
+    ).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================

+ 21 - 17
test/syn_registry_SUITE.erl

@@ -65,9 +65,9 @@
 %% -------------------------------------------------------------------
 all() ->
     [
-        {group, one_node_process_registration},
-        {group, three_nodes_process_registration},
-        {group, four_nodes_process_registration}
+        {group, one_node_registry},
+        {group, three_nodes_registry},
+        {group, four_nodes_registry}
     ].
 
 %% -------------------------------------------------------------------
@@ -84,10 +84,10 @@ all() ->
 %% -------------------------------------------------------------------
 groups() ->
     [
-        {one_node_process_registration, [shuffle], [
+        {one_node_registry, [shuffle], [
             one_node_via_register_unregister
         ]},
-        {three_nodes_process_registration, [shuffle], [
+        {three_nodes_registry, [shuffle], [
             three_nodes_discover,
             three_nodes_register_unregister_and_monitor,
             three_nodes_register_filter_unknown_node,
@@ -96,7 +96,7 @@ groups() ->
             three_nodes_custom_event_handler_reg_unreg,
             three_nodes_custom_event_handler_conflict_resolution
         ]},
-        {four_nodes_process_registration, [shuffle], [
+        {four_nodes_registry, [shuffle], [
             four_nodes_concurrency
         ]}
     ].
@@ -125,20 +125,20 @@ end_per_suite(_Config) ->
 %% Config0 = Config1 = [tuple()]
 %% Reason = any()
 %% -------------------------------------------------------------------
-init_per_group(three_nodes_process_registration, Config) ->
+init_per_group(three_nodes_registry, Config) ->
     case syn_test_suite_helper:init_cluster(3) of
         {error_initializing_cluster, Other} ->
-            end_per_group(three_nodes_process_registration, Config),
+            end_per_group(three_nodes_registry, Config),
             {skip, Other};
 
         NodesConfig ->
             NodesConfig ++ Config
     end;
 
-init_per_group(four_nodes_process_registration, Config) ->
+init_per_group(four_nodes_registry, Config) ->
     case syn_test_suite_helper:init_cluster(4) of
         {error_initializing_cluster, Other} ->
-            end_per_group(four_nodes_process_registration, Config),
+            end_per_group(four_nodes_registry, Config),
             {skip, Other};
 
         NodesConfig ->
@@ -154,9 +154,9 @@ init_per_group(_GroupName, Config) ->
 %% GroupName = atom()
 %% Config0 = Config1 = [tuple()]
 %% -------------------------------------------------------------------
-end_per_group(three_nodes_process_registration, Config) ->
+end_per_group(three_nodes_registry, Config) ->
     syn_test_suite_helper:end_cluster(3, Config);
-end_per_group(four_nodes_process_registration, Config) ->
+end_per_group(four_nodes_registry, Config) ->
     syn_test_suite_helper:end_cluster(4, Config);
 end_per_group(_GroupName, _Config) ->
     syn_test_suite_helper:clean_after_test().
@@ -1472,8 +1472,9 @@ four_nodes_concurrency(Config) ->
         lists:foreach(fun(_) ->
             %% start pid
             Pid = syn_test_suite_helper:start_process(),
+            RandomMeta = rand:uniform(99999),
             %% loop
-            case syn:register(scope_all, <<"concurrent">>, Pid) of
+            case syn:register(scope_all, <<"concurrent">>, Pid, RandomMeta) of
                 ok ->
                     ok;
 
@@ -1484,7 +1485,7 @@ four_nodes_concurrency(Config) ->
                         {error, race_condition} ->
                             ok;
                         ok ->
-                            syn:register(scope_all, <<"concurrent">>, Pid)
+                            syn:register(scope_all, <<"concurrent">>, Pid, RandomMeta)
                     end
             end,
             RndTime = rand:uniform(30),
@@ -1500,7 +1501,7 @@ four_nodes_concurrency(Config) ->
     spawn(SlaveNode2, WorkerFun),
     spawn(SlaveNode3, WorkerFun),
 
-    %% check results are same across network
+    %% wait for workers done
     syn_test_suite_helper:assert_received_messages([
         {done, LocalNode},
         {done, SlaveNode1},
@@ -1508,6 +1509,7 @@ four_nodes_concurrency(Config) ->
         {done, SlaveNode3}
     ]),
 
+    %% check results are same across network
     syn_test_suite_helper:assert_wait(
         1,
         fun() ->
@@ -1515,8 +1517,10 @@ four_nodes_concurrency(Config) ->
             ResultPidOn1 = rpc:call(SlaveNode1, syn, lookup, [scope_all, <<"concurrent">>]),
             ResultPidOn2 = rpc:call(SlaveNode2, syn, lookup, [scope_all, <<"concurrent">>]),
             ResultPidOn3 = rpc:call(SlaveNode3, syn, lookup, [scope_all, <<"concurrent">>]),
-            O = ordsets:from_list([ResultPidLocal, ResultPidOn1, ResultPidOn2, ResultPidOn3]),
-            ordsets:size(O)
+
+            %% if unique set is of 1 element then they all contain the same result
+            Ordset = ordsets:from_list([ResultPidLocal, ResultPidOn1, ResultPidOn2, ResultPidOn3]),
+            ordsets:size(Ordset)
         end
     ).