Browse Source

Finalize PG tests.

Roberto Ostinelli 9 years ago
parent
commit
f32f907a62

+ 2 - 1
src/syn.app.src

@@ -3,8 +3,9 @@
         {description, "A global process registry."},
         {vsn, "1.0.0"},
         {registered, [
+            syn_consistency,
             syn_global,
-            syn_consistency
+            syn_pg
         ]},
         {applications, [
             kernel,

+ 59 - 8
src/syn_consistency.erl

@@ -38,6 +38,8 @@
 %% internal
 -export([get_processes_info_of_node/1]).
 -export([write_processes_info_to_node/2]).
+-export([get_pgs_info_of_node/1]).
+-export([write_pgs_info_to_node/2]).
 
 %% records
 -record(state, {
@@ -125,7 +127,8 @@ handle_info({mnesia_system_event, {inconsistent_database, Context, RemoteNode}},
 
 handle_info({mnesia_system_event, {mnesia_down, RemoteNode}}, State) when RemoteNode =/= node() ->
     error_logger:error_msg("Received a MNESIA down event, removing on node ~p all pids of node ~p", [node(), RemoteNode]),
-    delete_pids_of_disconnected_node(RemoteNode),
+    delete_global_pids_of_disconnected_node(RemoteNode),
+    delete_pg_pids_of_disconnected_node(RemoteNode),
     {noreply, State};
 
 handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
@@ -149,7 +152,7 @@ handle_info(Info, State) ->
 %% ----------------------------------------------------------------------------------------------------------
 -spec terminate(Reason :: any(), #state{}) -> terminated.
 terminate(Reason, _State) ->
-    error_logger:info_msg("Terminating syn netsplits with reason: ~p", [Reason]),
+    error_logger:info_msg("Terminating syn consistency with reason: ~p", [Reason]),
     terminated.
 
 %% ----------------------------------------------------------------------------------------------------------
@@ -163,8 +166,8 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec delete_pids_of_disconnected_node(RemoteNode :: atom()) -> ok.
-delete_pids_of_disconnected_node(RemoteNode) ->
+-spec delete_global_pids_of_disconnected_node(RemoteNode :: atom()) -> ok.
+delete_global_pids_of_disconnected_node(RemoteNode) ->
     %% build match specs
     MatchHead = #syn_global_table{key = '$1', node = '$2', _ = '_'},
     Guard = {'=:=', '$2', RemoteNode},
@@ -174,6 +177,15 @@ delete_pids_of_disconnected_node(RemoteNode) ->
     NodePids = mnesia:dirty_select(syn_global_table, [{MatchHead, [Guard], [IdFormat]}]),
     lists:foreach(DelF, NodePids).
 
+-spec delete_pg_pids_of_disconnected_node(RemoteNode :: atom()) -> ok.
+delete_pg_pids_of_disconnected_node(RemoteNode) ->
+    %% build match specs
+    Pattern = #syn_pg_table{node = RemoteNode, _ = '_'},
+    ObjectsToDelete = mnesia:dirty_match_object(syn_pg_table, Pattern),
+    %% delete
+    DelF = fun(Record) -> mnesia:dirty_delete_object(syn_pg_table, Record) end,
+    lists:foreach(DelF, ObjectsToDelete).
+
 -spec automerge(RemoteNode :: atom()) -> ok.
 automerge(RemoteNode) ->
     global:trans({{?MODULE, automerge}, self()},
@@ -202,17 +214,18 @@ stitch(RemoteNode) ->
     mnesia_controller:connect_nodes(
         [RemoteNode],
         fun(MergeF) ->
-            catch case MergeF([syn_global_table]) of
+            catch case MergeF([syn_global_table, syn_pg_table]) of
                 {merged, _, _} = Res ->
-                    stitch_tab(RemoteNode),
+                    stitch_global_tab(RemoteNode),
+                    stitch_pg_tab(RemoteNode),
                     Res;
                 Other ->
                     Other
             end
         end).
 
--spec stitch_tab(RemoteNode :: atom()) -> ok.
-stitch_tab(RemoteNode) ->
+-spec stitch_global_tab(RemoteNode :: atom()) -> ok.
+stitch_global_tab(RemoteNode) ->
     %% get remote processes info
     RemoteProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]),
     %% get local processes info
@@ -312,3 +325,41 @@ purge_double_processes(ConflictingProcessCallbackModule, ConflictingProcessCallb
             end)
     end,
     lists:foreach(F, DoubleProcessesInfo).
+
+-spec stitch_pg_tab(RemoteNode :: atom()) -> ok.
+stitch_pg_tab(RemoteNode) ->
+    %% get remote processes info
+    RemotePgsInfo = rpc:call(RemoteNode, ?MODULE, get_pgs_info_of_node, [RemoteNode]),
+    %% get local processes info
+    LocalPgsInfo = get_pgs_info_of_node(node()),
+    %% write
+    write_remote_pgs_to_local(RemoteNode, RemotePgsInfo),
+    write_local_pgs_to_remote(RemoteNode, LocalPgsInfo).
+
+-spec get_pgs_info_of_node(Node :: atom()) -> list().
+get_pgs_info_of_node(Node) ->
+    %% build match specs
+    MatchHead = #syn_pg_table{name = '$1', pid = '$2', node = '$3'},
+    Guard = {'=:=', '$3', Node},
+    PgInfoFormat = {{'$1', '$2'}},
+    %% select
+    mnesia:dirty_select(syn_pg_table, [{MatchHead, [Guard], [PgInfoFormat]}]).
+
+-spec write_remote_pgs_to_local(RemoteNode :: atom(), RemotePgsInfo :: list()) -> ok.
+write_remote_pgs_to_local(RemoteNode, RemotePgsInfo) ->
+    write_pgs_info_to_node(RemoteNode, RemotePgsInfo).
+
+-spec write_local_pgs_to_remote(RemoteNode :: atom(), LocalPgsInfo :: list()) -> ok.
+write_local_pgs_to_remote(RemoteNode, LocalPgsInfo) ->
+    ok = rpc:call(RemoteNode, ?MODULE, write_pgs_info_to_node, [node(), LocalPgsInfo]).
+
+-spec write_pgs_info_to_node(Node :: atom(), PgsInfo :: list()) -> ok.
+write_pgs_info_to_node(Node, PgsInfo) ->
+    FWrite = fun({Name, Pid}) ->
+        mnesia:dirty_write(#syn_pg_table{
+            name = Name,
+            pid = Pid,
+            node = Node
+        })
+    end,
+    lists:foreach(FWrite, PgsInfo).

+ 1 - 1
test/syn-test.config

@@ -19,7 +19,7 @@
         %% If this is not desired, you can set the conflicting_process_callback option here below to instruct Syn
         %% to trigger a callback, so that you can perform custom operations (such as a graceful shutdown).
 
-        {conflicting_process_callback, [syn_consistency_SUITE, conflicting_process_callback_dummy]}
+        {conflicting_process_callback, [syn_registry_consistency_SUITE, conflicting_process_callback_dummy]}
 
     ]}
 

+ 13 - 1
test/syn_process_groups_SUITE.erl

@@ -62,7 +62,7 @@
 %% -------------------------------------------------------------------
 all() ->
     [
-%%        {group, single_node_process_groups},
+        {group, single_node_process_groups},
         {group, two_nodes_process_groups}
     ].
 
@@ -169,6 +169,8 @@ end_per_testcase(_TestCase, Config) ->
 %% Tests
 %% ===================================================================
 single_node_leave(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
     %% start
     ok = syn:start(),
     ok = syn:init(),
@@ -191,6 +193,8 @@ single_node_leave(_Config) ->
     syn_test_suite_helper:kill_process(Pid).
 
 single_node_kill(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
     %% start
     ok = syn:start(),
     ok = syn:init(),
@@ -212,6 +216,8 @@ single_node_kill(_Config) ->
     false = syn:member(Pid, <<"my pg">>).
 
 single_node_publish(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
     %% start
     ok = syn:start(),
     ok = syn:init(),
@@ -243,6 +249,9 @@ single_node_publish(_Config) ->
 two_nodes_kill(Config) ->
     %% get slave
     SlaveNode = proplists:get_value(slave_node, Config),
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    rpc:call(SlaveNode, mnesia, schema_location, [ram]),
     %% start
     ok = syn:start(),
     ok = syn:init(),
@@ -284,6 +293,9 @@ two_nodes_kill(Config) ->
 two_nodes_publish(Config) ->
     %% get slave
     SlaveNode = proplists:get_value(slave_node, Config),
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    rpc:call(SlaveNode, mnesia, schema_location, [ram]),
     %% start
     ok = syn:start(),
     ok = syn:init(),

+ 229 - 0
test/syn_process_groups_consistency_SUITE.erl

@@ -0,0 +1,229 @@
+%% ==========================================================================================================
+%% Syn - A global process registry.
+%%
+%% The MIT License (MIT)
+%%
+%% Copyright (c) 2016 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+-module(syn_process_groups_consistency_SUITE).
+
+%% callbacks
+-export([all/0]).
+-export([init_per_suite/1, end_per_suite/1]).
+-export([groups/0, init_per_group/2, end_per_group/2]).
+-export([init_per_testcase/2, end_per_testcase/2]).
+
+%% tests
+-export([
+    two_nodes_netsplit_when_there_are_no_conflicts/1
+]).
+
+%% include
+-include_lib("common_test/include/ct.hrl").
+
+
+%% ===================================================================
+%% Callbacks
+%% ===================================================================
+
+%% -------------------------------------------------------------------
+%% Function: all() -> GroupsAndTestCases | {skip,Reason}
+%% GroupsAndTestCases = [{group,GroupName} | TestCase]
+%% GroupName = atom()
+%% TestCase = atom()
+%% Reason = term()
+%% -------------------------------------------------------------------
+all() ->
+    [
+        {group, two_nodes_netsplits}
+    ].
+
+%% -------------------------------------------------------------------
+%% Function: groups() -> [Group]
+%% Group = {GroupName,Properties,GroupsAndTestCases}
+%% GroupName = atom()
+%% Properties = [parallel | sequence | Shuffle | {RepeatType,N}]
+%% GroupsAndTestCases = [Group | {group,GroupName} | TestCase]
+%% TestCase = atom()
+%% Shuffle = shuffle | {shuffle,{integer(),integer(),integer()}}
+%% RepeatType = repeat | repeat_until_all_ok | repeat_until_all_fail |
+%%			   repeat_until_any_ok | repeat_until_any_fail
+%% N = integer() | forever
+%% -------------------------------------------------------------------
+groups() ->
+    [
+        {two_nodes_netsplits, [shuffle], [
+            two_nodes_netsplit_when_there_are_no_conflicts
+        ]}
+    ].
+%% -------------------------------------------------------------------
+%% Function: init_per_suite(Config0) ->
+%%				Config1 | {skip,Reason} |
+%%              {skip_and_save,Reason,Config1}
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% -------------------------------------------------------------------
+init_per_suite(Config) ->
+    %% init
+    SlaveNodeShortName = syn_slave,
+    %% start slave
+    {ok, SlaveNode} = syn_test_suite_helper:start_slave(SlaveNodeShortName),
+    %% config
+    [
+        {slave_node_short_name, SlaveNodeShortName},
+        {slave_node, SlaveNode}
+        | Config
+    ].
+
+%% -------------------------------------------------------------------
+%% Function: end_per_suite(Config0) -> void() | {save_config,Config1}
+%% Config0 = Config1 = [tuple()]
+%% -------------------------------------------------------------------
+end_per_suite(Config) ->
+    %% get slave node name
+    SlaveNodeShortName = proplists:get_value(slave_node_short_name, Config),
+    %% stop slave
+    syn_test_suite_helper:stop_slave(SlaveNodeShortName).
+
+%% -------------------------------------------------------------------
+%% Function: init_per_group(GroupName, Config0) ->
+%%				Config1 | {skip,Reason} |
+%%              {skip_and_save,Reason,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% -------------------------------------------------------------------
+init_per_group(_GroupName, Config) -> Config.
+
+%% -------------------------------------------------------------------
+%% Function: end_per_group(GroupName, Config0) ->
+%%				void() | {save_config,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% -------------------------------------------------------------------
+end_per_group(_GroupName, _Config) -> ok.
+
+% ----------------------------------------------------------------------------------------------------------
+% Function: init_per_testcase(TestCase, Config0) ->
+%				Config1 | {skip,Reason} | {skip_and_save,Reason,Config1}
+% TestCase = atom()
+% Config0 = Config1 = [tuple()]
+% Reason = term()
+% ----------------------------------------------------------------------------------------------------------
+init_per_testcase(_TestCase, Config) ->
+    %% get slave
+    SlaveNode = proplists:get_value(slave_node, Config),
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    rpc:call(SlaveNode, mnesia, schema_location, [ram]),
+    %% return
+    Config.
+
+% ----------------------------------------------------------------------------------------------------------
+% Function: end_per_testcase(TestCase, Config0) ->
+%				void() | {save_config,Config1} | {fail,Reason}
+% TestCase = atom()
+% Config0 = Config1 = [tuple()]
+% Reason = term()
+% ----------------------------------------------------------------------------------------------------------
+end_per_testcase(_TestCase, Config) ->
+    %% get slave
+    SlaveNode = proplists:get_value(slave_node, Config),
+    syn_test_suite_helper:clean_after_test(SlaveNode).
+
+%% ===================================================================
+%% Tests
+%% ===================================================================
+two_nodes_netsplit_when_there_are_no_conflicts(Config) ->
+    %% get slave
+    SlaveNode = proplists:get_value(slave_node, Config),
+    CurrentNode = node(),
+
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    rpc:call(SlaveNode, mnesia, schema_location, [ram]),
+
+    %% start syn
+    ok = syn:start(),
+    ok = syn:init(),
+    ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
+    timer:sleep(100),
+
+    %% start processes
+    LocalPid = syn_test_suite_helper:start_process(),
+    SlavePidLocal = syn_test_suite_helper:start_process(SlaveNode),
+    SlavePidSlave = syn_test_suite_helper:start_process(SlaveNode),
+
+    %% join
+    ok = syn:join(test_pg, LocalPid),
+    ok = syn:join(test_pg, SlavePidLocal),    %% joined on local node
+    ok = rpc:call(SlaveNode, syn, join, [test_pg, SlavePidSlave]),    %% joined on slave node
+    timer:sleep(100),
+
+    %% check tables
+    3 = mnesia:table_info(syn_pg_table, size),
+    3 = rpc:call(SlaveNode, mnesia, table_info, [syn_pg_table, size]),
+
+    LocalActiveReplicas = mnesia:table_info(syn_pg_table, active_replicas),
+    2 = length(LocalActiveReplicas),
+    true = lists:member(SlaveNode, LocalActiveReplicas),
+    true = lists:member(CurrentNode, LocalActiveReplicas),
+
+    SlaveActiveReplicas = rpc:call(SlaveNode, mnesia, table_info, [syn_pg_table, active_replicas]),
+    2 = length(SlaveActiveReplicas),
+    true = lists:member(SlaveNode, SlaveActiveReplicas),
+    true = lists:member(CurrentNode, SlaveActiveReplicas),
+
+    %% simulate net split
+    syn_test_suite_helper:disconnect_node(SlaveNode),
+    timer:sleep(1000),
+
+    %% check tables
+    1 = mnesia:table_info(syn_pg_table, size),
+    [CurrentNode] = mnesia:table_info(syn_pg_table, active_replicas),
+
+    %% reconnect
+    syn_test_suite_helper:connect_node(SlaveNode),
+    timer:sleep(1000),
+
+    %% check tables
+    3 = mnesia:table_info(syn_pg_table, size),
+    3 = rpc:call(SlaveNode, mnesia, table_info, [syn_pg_table, size]),
+
+    LocalActiveReplicasAfter = mnesia:table_info(syn_pg_table, active_replicas),
+    2 = length(LocalActiveReplicasAfter),
+    true = lists:member(SlaveNode, LocalActiveReplicasAfter),
+    true = lists:member(CurrentNode, LocalActiveReplicasAfter),
+
+    SlaveActiveReplicasAfter = rpc:call(SlaveNode, mnesia, table_info, [syn_pg_table, active_replicas]),
+    2 = length(SlaveActiveReplicasAfter),
+    true = lists:member(SlaveNode, SlaveActiveReplicasAfter),
+    true = lists:member(CurrentNode, SlaveActiveReplicasAfter),
+
+    %% check grouos
+    3 = length(syn:get_members(test_pg)),
+    true = syn:member(LocalPid, test_pg),
+    true = syn:member(SlavePidLocal, test_pg),
+    true = syn:member(SlavePidSlave, test_pg),
+    3 = length(rpc:call(SlaveNode, syn, get_members, [test_pg])),
+    true = rpc:call(SlaveNode, syn, member, [LocalPid, test_pg]),
+    true = rpc:call(SlaveNode, syn, member, [SlavePidLocal, test_pg]),
+    true = rpc:call(SlaveNode, syn, member, [SlavePidSlave, test_pg]).

+ 2 - 2
test/syn_registry_consistency_SUITE.erl

@@ -62,8 +62,8 @@
 %% -------------------------------------------------------------------
 all() ->
     [
-%%        {group, two_nodes_netsplits},
-%%        {group, three_nodes_netsplits}
+        {group, two_nodes_netsplits},
+        {group, three_nodes_netsplits}
     ].
 
 %% -------------------------------------------------------------------