Просмотр исходного кода

Add backbone and netsplit core.

Roberto Ostinelli 10 лет назад
Родитель
Сommit
93251c1add
5 измененных файлов с 401 добавлено и 0 удалено
  1. 6 0
      include/syn.hrl
  2. 138 0
      src/syn_backbone.erl
  3. 108 0
      src/syn_netsplits.erl
  4. 2 0
      src/syn_sup.erl
  5. 147 0
      test/syn_create_mnesia_SUITE.erl

+ 6 - 0
include/syn.hrl

@@ -0,0 +1,6 @@
+%% records
+-record(syn_processes_table, {
+    key = undefined :: undefined | any(),
+    pid = undefined :: undefined | pid() | atom(),
+    node = undefined :: atom()
+}).

+ 138 - 0
src/syn_backbone.erl

@@ -0,0 +1,138 @@
+-module(syn_backbone).
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+%% records
+-record(state, {}).
+
+%% include
+-include("syn.hrl").
+
+
+%% ===================================================================
+%% API
+%% ===================================================================
+-spec start_link() -> {ok, pid()} | {error, any()}.
+start_link() ->
+    Options = [],
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
+
+%% ===================================================================
+%% Callbacks
+%% ===================================================================
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Init
+%% ----------------------------------------------------------------------------------------------------------
+-spec init([]) ->
+    {ok, #state{}} |
+    {ok, #state{}, Timeout :: non_neg_integer()} |
+    ignore |
+    {stop, Reason :: any()}.
+init([]) ->
+    %% init
+    case initdb() of
+        ok ->
+            {ok, #state{}};
+        Other ->
+            {stop, Other}
+    end.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Call messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_call(Request :: any(), From :: any(), #state{}) ->
+    {reply, Reply :: any(), #state{}} |
+    {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
+    {noreply, #state{}} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), Reply :: any(), #state{}} |
+    {stop, Reason :: any(), #state{}}.
+
+handle_call(Request, From, State) ->
+    error_logger:warning_msg("Received from ~p an unknown call message: ~p", [Request, From]),
+    {reply, undefined, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Cast messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_cast(Msg :: any(), #state{}) ->
+    {noreply, #state{}} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), #state{}}.
+
+handle_cast(Msg, State) ->
+    error_logger:warning_msg("Received an unknown cast message: ~p", [Msg]),
+    {noreply, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% All non Call / Cast messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_info(Info :: any(), #state{}) ->
+    {noreply, #state{}} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), #state{}}.
+
+handle_info(Info, State) ->
+    error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
+    {noreply, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Terminate
+%% ----------------------------------------------------------------------------------------------------------
+-spec terminate(Reason :: any(), #state{}) -> terminated.
+terminate(Reason, _State) ->
+    error_logger:info_msg("Terminating syn with reason: ~p", [Reason]),
+    terminated.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Convert process state when code is changed.
+%% ----------------------------------------------------------------------------------------------------------
+-spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+-spec initdb() -> ok | {error, any()}.
+initdb() ->
+    %% ensure all nodes are added - this covers when mnesia is in ram only mode
+    mnesia:change_config(extra_db_nodes, [node() | nodes()]),
+    %% ensure table exists
+    case mnesia:create_table(syn_processes_table, [
+        {type, set},
+        {ram_copies, [node() | nodes()]},
+        {attributes, record_info(fields, syn_processes_table)},
+        {index, [#syn_processes_table.pid]},
+        {storage_properties, [{ets, [{read_concurrency, true}]}]}
+    ]) of
+        {atomic, ok} ->
+            error_logger:info_msg("syn_processes_table was successfully created."),
+            ok;
+        {aborted, {already_exists, syn_processes_table}} ->
+            %% table already exists, try to add current node as copy
+            add_table_copy_to_local_node();
+        Other ->
+            error_logger:error_msg("Error while creating syn_processes_table: ~p", [Other]),
+            {error, Other}
+    end.
+
+-spec add_table_copy_to_local_node() -> ok | {error, any()}.
+add_table_copy_to_local_node() ->
+    case mnesia:add_table_copy(syn_processes_table, node(), ram_copies) of
+        {atomic, ok} ->
+            error_logger:info_msg("Copy of syn_processes_table was successfully added to current node."),
+            ok;
+        {aborted, {already_exists, syn_processes_table}} ->
+            %% a copy of syn_processes_table is already added to current node
+            ok;
+        {aborted, Reason} ->
+            error_logger:error_msg("Error while creating copy of syn_processes_table: ~p", [Reason]),
+            {error, Reason}
+    end.

+ 108 - 0
src/syn_netsplits.erl

@@ -0,0 +1,108 @@
+-module(syn_netsplits).
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+%% records
+-record(state, {}).
+
+%% include
+-include("syn.hrl").
+
+
+%% ===================================================================
+%% API
+%% ===================================================================
+-spec start_link() -> {ok, pid()} | {error, any()}.
+start_link() ->
+    Options = [],
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
+
+%% ===================================================================
+%% Callbacks
+%% ===================================================================
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Init
+%% ----------------------------------------------------------------------------------------------------------
+-spec init([]) ->
+    {ok, #state{}} |
+    {ok, #state{}, Timeout :: non_neg_integer()} |
+    ignore |
+    {stop, Reason :: any()}.
+init([]) ->
+    %% monitor mnesia events
+    mnesia:subscribe(system),
+    {ok, #state{}}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Call messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_call(Request :: any(), From :: any(), #state{}) ->
+    {reply, Reply :: any(), #state{}} |
+    {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
+    {noreply, #state{}} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), Reply :: any(), #state{}} |
+    {stop, Reason :: any(), #state{}}.
+
+handle_call(Request, From, State) ->
+    error_logger:warning_msg("Received from ~p an unknown call message: ~p", [Request, From]),
+    {reply, undefined, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Cast messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_cast(Msg :: any(), #state{}) ->
+    {noreply, #state{}} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), #state{}}.
+
+handle_cast(Msg, State) ->
+    error_logger:warning_msg("Received an unknown cast message: ~p", [Msg]),
+    {noreply, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% All non Call / Cast messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_info(Info :: any(), #state{}) ->
+    {noreply, #state{}} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), #state{}}.
+
+handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
+    error_logger:warning_msg("MNESIA signalled an inconsistent database on node: ~p with context: ~p, initiating automerge", [Node, Context]),
+    %% automerge(Node),
+    {noreply, State};
+
+handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
+    error_logger:warning_msg("Received a MNESIA down event, removing all pids of node ~p", [Node]),
+    %% delete_pids_of_disconnected_node(Node),
+    {noreply, State};
+
+handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
+    %% ignore mnesia event
+    {noreply, State};
+
+handle_info(Info, State) ->
+    error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
+    {noreply, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Terminate
+%% ----------------------------------------------------------------------------------------------------------
+-spec terminate(Reason :: any(), #state{}) -> terminated.
+terminate(Reason, _State) ->
+    error_logger:info_msg("Terminating syn with reason: ~p", [Reason]),
+    terminated.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Convert process state when code is changed.
+%% ----------------------------------------------------------------------------------------------------------
+-spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.

+ 2 - 0
src/syn_sup.erl

@@ -25,5 +25,7 @@ start_link() ->
     {ok, {{supervisor:strategy(), non_neg_integer(), non_neg_integer()}, [supervisor:child_spec()]}}.
 init([]) ->
     Children = [
+        ?CHILD(syn_backbone, worker),
+        ?CHILD(syn_netsplits, worker)
     ],
     {ok, {{one_for_one, 10, 10}, Children}}.

+ 147 - 0
test/syn_create_mnesia_SUITE.erl

@@ -0,0 +1,147 @@
+-module(syn_create_mnesia_SUITE).
+
+%% callbacks
+-export([all/0]).
+-export([init_per_suite/1, end_per_suite/1]).
+-export([groups/0, init_per_group/2, end_per_group/2]).
+
+%% tests
+-export([
+    single_node_when_mnesia_is_ram/1,
+    single_node_when_mnesia_is_opt_disc_no_schema_exists/1,
+    single_node_when_mnesia_is_opt_disc_schema_exists/1,
+    single_node_when_mnesia_is_disc/1
+]).
+-export([
+    two_nodes_when_mnesia_is_ram/1
+]).
+
+%% include
+-include_lib("common_test/include/ct.hrl").
+
+
+%% ===================================================================
+%% Callbacks
+%% ===================================================================
+
+%% -------------------------------------------------------------------
+%% Function: all() -> GroupsAndTestCases | {skip,Reason}
+%% GroupsAndTestCases = [{group,GroupName} | TestCase]
+%% GroupName = atom()
+%% TestCase = atom()
+%% Reason = term()
+%% -------------------------------------------------------------------
+all() ->
+    [
+        {group, mnesia_creation_single_node}
+    ].
+
+%% -------------------------------------------------------------------
+%% Function: groups() -> [Group]
+%% Group = {GroupName,Properties,GroupsAndTestCases}
+%% GroupName = atom()
+%% Properties = [parallel | sequence | Shuffle | {RepeatType,N}]
+%% GroupsAndTestCases = [Group | {group,GroupName} | TestCase]
+%% TestCase = atom()
+%% Shuffle = shuffle | {shuffle,{integer(),integer(),integer()}}
+%% RepeatType = repeat | repeat_until_all_ok | repeat_until_all_fail |
+%%			   repeat_until_any_ok | repeat_until_any_fail
+%% N = integer() | forever
+%% -------------------------------------------------------------------
+groups() ->
+    [
+        {mnesia_creation_single_node, [shuffle], [
+            single_node_when_mnesia_is_ram,
+            single_node_when_mnesia_is_opt_disc_no_schema_exists,
+            single_node_when_mnesia_is_opt_disc_schema_exists,
+            single_node_when_mnesia_is_disc
+        ]}
+    ].
+%% -------------------------------------------------------------------
+%% Function: init_per_suite(Config0) ->
+%%				Config1 | {skip,Reason} |
+%%              {skip_and_save,Reason,Config1}
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% -------------------------------------------------------------------
+init_per_suite(Config) ->
+    %% config
+    [
+        {slave_node_bare_name, syn_slave}
+        | Config
+    ].
+
+%% -------------------------------------------------------------------
+%% Function: end_per_suite(Config0) -> void() | {save_config,Config1}
+%% Config0 = Config1 = [tuple()]
+%% -------------------------------------------------------------------
+end_per_suite(_Config) -> ok.
+
+%% -------------------------------------------------------------------
+%% Function: init_per_group(GroupName, Config0) ->
+%%				Config1 | {skip,Reason} |
+%%              {skip_and_save,Reason,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% -------------------------------------------------------------------
+init_per_group(_GroupName, Config) -> Config.
+
+%% -------------------------------------------------------------------
+%% Function: end_per_group(GroupName, Config0) ->
+%%				void() | {save_config,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% -------------------------------------------------------------------
+end_per_group(_GroupName, _Config) ->
+    clean_after_test().
+
+%% ===================================================================
+%% Tests
+%% ===================================================================
+single_node_when_mnesia_is_ram(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    %% start
+    ok = syn:start(),
+    %% check table exists
+    true = lists:member(syn_processes_table, mnesia:system_info(tables)).
+
+single_node_when_mnesia_is_opt_disc_no_schema_exists(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, opt_disc),
+    %% start
+    ok = syn:start(),
+    %% check table exists
+    true = lists:member(syn_processes_table, mnesia:system_info(tables)).
+
+single_node_when_mnesia_is_opt_disc_schema_exists(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, opt_disc),
+    %% create schema
+    mnesia:create_schema([node()]),
+    %% start
+    ok = syn:start(),
+    %% check table exists
+    true = lists:member(syn_processes_table, mnesia:system_info(tables)).
+
+single_node_when_mnesia_is_disc(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, disc),
+    %% create schema
+    mnesia:create_schema([node()]),
+    %% start
+    ok = syn:start(),
+    %% check table exists
+    true = lists:member(syn_processes_table, mnesia:system_info(tables)).
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+clean_after_test() ->
+    %% stop mnesia
+    mnesia:stop(),
+    %% delete schema
+    mnesia:delete_schema([node()]),
+    %% stop syn
+    syn:stop().