Browse Source

Allow to set process callback exit and netsplit_send_message_to_process from config file.

Roberto Ostinelli 10 years ago
parent
commit
07fd241094

+ 0 - 32
src/syn.erl

@@ -32,7 +32,6 @@
 -export([start/0, stop/0]).
 -export([register/2, unregister/1]).
 -export([find_by_key/1, find_by_pid/1]).
--export([options/1]).
 -export([count/0, count/1]).
 
 
@@ -65,10 +64,6 @@ find_by_key(Key) ->
 find_by_pid(Pid) ->
     syn_backbone:find_by_pid(Pid).
 
--spec options(list()) -> ok.
-options(Options) ->
-    set_options(Options).
-
 -spec count() -> non_neg_integer().
 count() ->
     syn_backbone:count().
@@ -76,30 +71,3 @@ count() ->
 -spec count(Node :: atom()) -> non_neg_integer().
 count(Node) ->
     syn_backbone:count(Node).
-
-%% ===================================================================
-%% Internal
-%% ===================================================================
-set_options([]) -> ok;
-set_options([{netsplit_conflicting_mode, Mode} | Options]) ->
-    case Mode of
-        kill ->
-            syn_netsplits:conflicting_mode(kill);
-        {send_message, Message} ->
-            syn_netsplits:conflicting_mode({send_message, Message});
-        _ ->
-            error(invalid_syn_option, {netsplit_conflicting_mode, Mode})
-    end,
-    set_options(Options);
-set_options([{process_exit_callback, ProcessExitCallback} | Options]) ->
-    case ProcessExitCallback of
-        undefined ->
-            syn_backbone:process_exit_callback(undefined);
-        _ when is_function(ProcessExitCallback) ->
-            syn_backbone:process_exit_callback(ProcessExitCallback);
-        _ ->
-            error(invalid_syn_option, {process_exit_callback, ProcessExitCallback})
-    end,
-    set_options(Options);
-set_options([InvalidSynOption | _Options]) ->
-    error(invalid_syn_option, InvalidSynOption).

+ 16 - 14
src/syn_backbone.erl

@@ -31,7 +31,6 @@
 
 %% API
 -export([start_link/0]).
--export([process_exit_callback/1]).
 -export([register/2, unregister/1]).
 -export([find_by_key/1, find_by_pid/1]).
 -export([count/0, count/1]).
@@ -41,7 +40,8 @@
 
 %% records
 -record(state, {
-    process_exit_callback = undefined :: undefined | function()
+    process_exit_callback_module = undefined :: atom(),
+    process_exit_callback_function = undefined :: atom()
 }).
 
 %% include
@@ -56,10 +56,6 @@ start_link() ->
     Options = [],
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
 
--spec process_exit_callback(function() | undefined) -> ok.
-process_exit_callback(ProcessExitCallback) ->
-    gen_server:multi_call(?MODULE, {process_exit_callback, ProcessExitCallback}).
-
 -spec find_by_key(Key :: any()) -> pid() | undefined.
 find_by_key(Key) ->
     case mnesia:dirty_read(syn_processes_table, Key) of
@@ -135,7 +131,16 @@ init([]) ->
     %% init
     case initdb() of
         ok ->
-            {ok, #state{}};
+            %% get options
+            {ok, [ProcessExitCallbackModule, ProcessExitCallbackFunction]} = syn_utils:get_env_value(
+                process_exit_callback,
+                [undefined, undefined]
+            ),
+            %% return
+            {ok, #state{
+                process_exit_callback_module = ProcessExitCallbackModule,
+                process_exit_callback_function = ProcessExitCallbackFunction
+            }};
         Other ->
             {stop, Other}
     end.
@@ -159,10 +164,6 @@ handle_call({unlink_process, Pid}, _From, State) ->
     erlang:unlink(Pid),
     {reply, ok, State};
 
-handle_call({process_exit_callback, ProcessExitCallback}, _From, State) ->
-    error_logger:info_msg("process_exit_callback set to: ~p~n", [ProcessExitCallback]),
-    {reply, ok, State#state{process_exit_callback = ProcessExitCallback}};
-
 handle_call(Request, From, State) ->
     error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [Request, From]),
     {reply, undefined, State}.
@@ -188,7 +189,8 @@ handle_cast(Msg, State) ->
     {stop, Reason :: any(), #state{}}.
 
 handle_info({'EXIT', Pid, Reason}, #state{
-    process_exit_callback = ProcessExitCallback
+    process_exit_callback_module = ProcessExitCallbackModule,
+    process_exit_callback_function = ProcessExitCallbackFunction
 } = State) ->
     %% do not lock backbone
     spawn(fun() ->
@@ -212,9 +214,9 @@ handle_info({'EXIT', Pid, Reason}, #state{
                         error_logger:error_msg("Process with key ~p exited with reason: ~p~n", [Key, Reason])
                 end,
                 %% callback
-                case ProcessExitCallback of
+                case ProcessExitCallbackModule of
                     undefined -> ok;
-                    _ -> spawn(fun() -> ProcessExitCallback(Key, Pid, Reason) end)
+                    _ -> spawn(fun() -> ProcessExitCallbackModule:ProcessExitCallbackFunction(Key, Pid, Reason) end)
                 end
         end
     end),

+ 16 - 19
src/syn_netsplits.erl

@@ -31,7 +31,6 @@
 
 %% API
 -export([start_link/0]).
--export([conflicting_mode/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -58,12 +57,6 @@ start_link() ->
     Options = [],
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
 
--spec conflicting_mode(kill | {send_message, any()}) -> ok.
-conflicting_mode(kill) ->
-    gen_server:multi_call(?MODULE, {conflicting_mode, kill});
-conflicting_mode({send_message, Message}) ->
-    gen_server:multi_call(?MODULE, {conflicting_mode, {send_message, Message}}).
-
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
@@ -81,8 +74,22 @@ init([]) ->
     process_flag(trap_exit, true),
     %% monitor mnesia events
     mnesia:subscribe(system),
-    %% init state
-    {ok, #state{}}.
+    %% get options
+    {ok, NetsplitSendMessageToProcess} = syn_utils:get_env_value(
+        netsplit_send_message_to_process,
+        syn_do_not_send_any_message_to_conflicting_process
+    ),
+    %% get state params
+    {ConflictingMode, Message} = case NetsplitSendMessageToProcess of
+        syn_do_not_send_any_message_to_conflicting_process -> {kill, undefined};
+        _ -> {send_message, NetsplitSendMessageToProcess}
+
+    end,
+    %% build state
+    {ok, #state{
+        conflicting_mode = ConflictingMode,
+        message = Message
+    }}.
 
 %% ----------------------------------------------------------------------------------------------------------
 %% Call messages
@@ -95,16 +102,6 @@ init([]) ->
     {stop, Reason :: any(), Reply :: any(), #state{}} |
     {stop, Reason :: any(), #state{}}.
 
-handle_call({conflicting_mode, kill}, _From, State) ->
-    {reply, ok, State#state{
-        conflicting_mode = kill,
-        message = undefined
-    }};
-handle_call({conflicting_mode, {send_message, Message}}, _From, State) ->
-    {reply, ok, State#state{
-        conflicting_mode = send_message,
-        message = Message
-    }};
 handle_call(Request, From, State) ->
     error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [Request, From]),
     {reply, undefined, State}.

+ 48 - 0
src/syn_utils.erl

@@ -0,0 +1,48 @@
+%% ==========================================================================================================
+%% Syn - A global process registry.
+%%
+%% Copyright (C) 2015, Roberto Ostinelli <roberto@ostinelli.net>.
+%% All rights reserved.
+%%
+%% The MIT License (MIT)
+%%
+%% Copyright (c) 2015 Roberto Ostinelli
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+%% ==========================================================================================================
+-module(syn_utils).
+
+%% API
+-export([get_env_value/1, get_env_value/2]).
+
+
+%% ===================================================================
+%% API
+%% ===================================================================
+%% get an environment value
+-spec get_env_value(Key :: any()) -> {ok, any()} | undefined.
+get_env_value(Key) ->
+    application:get_env(Key).
+
+-spec get_env_value(Key :: any(), Default :: any()) -> {ok, any()}.
+get_env_value(Key, Default) ->
+    case application:get_env(Key) of
+        undefined -> {ok, Default};
+        {ok, Val} -> {ok, Val}
+    end.

+ 27 - 0
test/syn-test.config

@@ -0,0 +1,27 @@
+%%%===================================================================
+%%% Syn - TEST CONFIGURATION FILE
+%%%===================================================================
+
+[
+
+%% Syn config
+    {syn, [
+
+        %% You can set a callback to be triggered when a process exits.
+        %% This callback will be called only on the node where the process was running.
+
+        {process_exit_callback, [syn_register_processes_SUITE, process_exit_callback_dummy]},
+
+        %% After a net split, when nodes reconnect, Syn will merge the data from all the nodes in the cluster.
+        %% If the same Key was used to register a process on different nodes during a net split, then there will be a conflict.
+        %% By default, Syn will discard the processes running on the node the conflict is being resolved on,
+        %% and will kill it by sending a `kill` signal with `exit(Pid, kill)`.
+        %% If this is not desired, you can set the netsplit_send_message_to_process option here below to instruct Syn
+        %% to send a message to the discarded process, so that you can trigger any actions on that process
+        %% (such as a graceful shutdown).
+
+        {netsplit_send_message_to_process, shutdown}
+
+    ]}
+
+].

+ 41 - 12
test/syn_netsplits_SUITE.erl

@@ -40,6 +40,9 @@
     two_nodes_netsplit_message_resolution_when_there_are_conflicts/1
 ]).
 
+%% internal
+-export([process_reply_main/0]).
+
 %% include
 -include_lib("common_test/include/ct.hrl").
 
@@ -140,10 +143,7 @@ init_per_testcase(_TestCase, Config) ->
     %% set schema location
     application:set_env(mnesia, schema_location, ram),
     rpc:call(SlaveNodeName, mnesia, schema_location, [ram]),
-    %% start syn
-    ok = syn:start(),
-    ok = rpc:call(SlaveNodeName, syn, start, []),
-    timer:sleep(100),
+    %% return
     Config.
 
 % ----------------------------------------------------------------------------------------------------------
@@ -166,6 +166,11 @@ two_nodes_netsplit_when_there_are_no_conflicts(Config) ->
     SlaveNodeName = proplists:get_value(slave_node_name, Config),
     CurrentNode = node(),
 
+    %% start syn
+    ok = syn:start(),
+    ok = rpc:call(SlaveNodeName, syn, start, []),
+    timer:sleep(100),
+
     %% start processes
     LocalPid = syn_test_suite_helper:start_process(),
     SlavePidLocal = syn_test_suite_helper:start_process(SlaveNodeName),
@@ -236,6 +241,11 @@ two_nodes_netsplit_kill_resolution_when_there_are_conflicts(Config) ->
     SlaveNodeName = proplists:get_value(slave_node_name, Config),
     CurrentNode = node(),
 
+    %% start syn
+    ok = syn:start(),
+    ok = rpc:call(SlaveNodeName, syn, start, []),
+    timer:sleep(100),
+
     %% start processes
     LocalPid = syn_test_suite_helper:start_process(),
     SlavePid = syn_test_suite_helper:start_process(SlaveNodeName),
@@ -286,12 +296,22 @@ two_nodes_netsplit_message_resolution_when_there_are_conflicts(Config) ->
     SlaveNodeName = proplists:get_value(slave_node_name, Config),
     CurrentNode = node(),
 
-    %% set resolution by message shutdown
-    syn:options([{netsplit_conflicting_mode, {send_message, {self(), shutdown}}}]),
+    %% load configuration variables from syn-test.config => this sets the netsplit_send_message_to_process option
+    syn_test_suite_helper:set_environment_variables(),
+    syn_test_suite_helper:set_environment_variables(SlaveNodeName),
+
+    %% start syn
+    ok = syn:start(),
+    ok = rpc:call(SlaveNodeName, syn, start, []),
+    timer:sleep(100),
 
     %% start processes
-    LocalPid = syn_test_suite_helper:start_process(),
-    SlavePid = syn_test_suite_helper:start_process(SlaveNodeName),
+    LocalPid = syn_test_suite_helper:start_process(fun process_reply_main/0),
+    SlavePid = syn_test_suite_helper:start_process(SlaveNodeName, fun process_reply_main/0),
+
+    %% register global process
+    ResultPid = self(),
+    global:register_name(syn_netsplits_SUITE_result, ResultPid),
 
     %% register
     ok = syn:register(conflicting_key, SlavePid),
@@ -333,12 +353,21 @@ two_nodes_netsplit_message_resolution_when_there_are_conflicts(Config) ->
     %% check message received from killed pid
     KilledPid = lists:nth(1, lists:delete(FoundPid, [LocalPid, SlavePid])),
     receive
-        {KilledPid, terminated} -> ok;
-        Other -> ct:pal("WUT?? ~p", [Other])
-    after 5 ->
-        ok = not_received
+        {exited, KilledPid} -> ok
+    after 2000 ->
+        ok = conflicting_process_did_not_receive_message
     end,
 
     %% kill processes
     syn_test_suite_helper:kill_process(LocalPid),
     syn_test_suite_helper:kill_process(SlavePid).
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+process_reply_main() ->
+    receive
+        shutdown ->
+            timer:sleep(100), %% wait for global processes to propagate
+            global:send(syn_netsplits_SUITE_result, {exited, self()})
+    end.

+ 26 - 21
test/syn_register_processes_SUITE.erl

@@ -51,6 +51,9 @@
     two_nodes_when_mnesia_is_disc_find_by_pid/1
 ]).
 
+%% internals
+-export([process_exit_callback_dummy/3]).
+
 %% include
 -include_lib("common_test/include/ct.hrl").
 
@@ -291,29 +294,27 @@ single_node_when_mnesia_is_ram_process_count(_Config) ->
     0 = syn:count().
 
 single_node_when_mnesia_is_ram_callback_on_process_exit(_Config) ->
+    CurrentNode = node(),
     %% set schema location
     application:set_env(mnesia, schema_location, ram),
+    %% load configuration variables from syn-test.config => this defines the callback
+    syn_test_suite_helper:set_environment_variables(),
     %% start
     ok = syn:start(),
-    %% define callback
-    Self = self(),
-    CallbackFun = fun(Key, Pid, Reason) ->
-        Self ! {exited, Key, Pid, Reason}
-    end,
-    syn:options([
-        {process_exit_callback, CallbackFun}
-    ]),
+    %% register global process
+    ResultPid = self(),
+    global:register_name(syn_register_process_SUITE_result, ResultPid),
     %% start process
     Pid = syn_test_suite_helper:start_process(),
     %% register
     ok = syn:register(<<"my proc">>, Pid),
     %% kill process
     syn_test_suite_helper:kill_process(Pid),
-    %% check callback was triggered
+    %% check callback were triggered
     receive
-        {exited, <<"my proc">>, Pid, killed} -> ok
+        {exited, CurrentNode, <<"my proc">>, Pid, killed} -> ok
     after 2000 ->
-        ok = process_exit_callback_was_not_called
+        ok = process_exit_callback_was_not_called_from_local_node
     end.
 
 single_node_when_mnesia_is_disc_find_by_key(_Config) ->
@@ -418,18 +419,16 @@ two_nodes_when_mnesia_is_ram_callback_on_process_exit(Config) ->
     %% set schema location
     application:set_env(mnesia, schema_location, ram),
     rpc:call(SlaveNodeName, mnesia, schema_location, [ram]),
+    %% load configuration variables from syn-test.config => this defines the callback
+    syn_test_suite_helper:set_environment_variables(),
+    syn_test_suite_helper:set_environment_variables(SlaveNodeName),
     %% start
     ok = syn:start(),
     ok = rpc:call(SlaveNodeName, syn, start, []),
     timer:sleep(100),
-    %% define callback
-    Self = self(),
-    CallbackFun = fun(Key, _Pid, _Reason) ->
-        Self ! {exited, Key, node()}
-    end,
-    syn:options([
-        {process_exit_callback, CallbackFun}
-    ]),
+    %% register global process
+    ResultPid = self(),
+    global:register_name(syn_register_process_SUITE_result, ResultPid),
     %% start processes
     PidLocal = syn_test_suite_helper:start_process(),
     PidSlave = syn_test_suite_helper:start_process(SlaveNodeName),
@@ -441,12 +440,12 @@ two_nodes_when_mnesia_is_ram_callback_on_process_exit(Config) ->
     syn_test_suite_helper:kill_process(PidSlave),
     %% check callback were triggered
     receive
-        {exited, <<"local">>, CurrentNode} -> ok
+        {exited, CurrentNode, <<"local">>, PidLocal, killed} -> ok
     after 2000 ->
         ok = process_exit_callback_was_not_called_from_local_node
     end,
     receive
-        {exited, <<"slave">>, SlaveNodeName} -> ok
+        {exited, SlaveNodeName, <<"slave">>, PidSlave, killed} -> ok
     after 2000 ->
         ok = process_exit_callback_was_not_called_from_slave_node
     end.
@@ -476,3 +475,9 @@ two_nodes_when_mnesia_is_disc_find_by_pid(Config) ->
     %% retrieve
     undefined = syn:find_by_pid(Pid),
     undefined = rpc:call(SlaveNodeName, syn, find_by_pid, [Pid]).
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+process_exit_callback_dummy(Key, Pid, Reason) ->
+    global:send(syn_register_process_SUITE_result, {exited, node(), Key, Pid, Reason}).

+ 41 - 7
test/syn_test_suite_helper.erl

@@ -29,15 +29,35 @@
 -module(syn_test_suite_helper).
 
 %% API
+-export([set_environment_variables/0, set_environment_variables/1]).
 -export([start_slave/1, stop_slave/1]).
 -export([connect_node/1, disconnect_node/1]).
 -export([clean_after_test/0, clean_after_test/1]).
--export([start_process/0, start_process/1, kill_process/1, process_main/0]).
+-export([start_process/0, start_process/1, start_process/2]).
+-export([kill_process/1]).
+
+%% internal
+-export([process_main/0]).
+
+%% macros
+-define(SYN_TEST_CONFIG_FILENAME, "syn-test.config").
 
 
 %% ===================================================================
 %% API
 %% ===================================================================
+set_environment_variables() ->
+    set_environment_variables(node()).
+set_environment_variables(Node) ->
+    % read config file
+    ConfigFilePath = filename:join([filename:dirname(code:which(?MODULE)), ?SYN_TEST_CONFIG_FILENAME]),
+    {ok, [AppsConfig]} = file:consult(ConfigFilePath),
+    % loop to set variables
+    F = fun({AppName, AppConfig}) ->
+        set_environment_for_app(Node, AppName, AppConfig)
+    end,
+    lists:foreach(F, AppsConfig).
+
 start_slave(NodeShortName) ->
     EbinFilePath = filename:join([filename:dirname(code:lib_dir(syn, ebin)), "ebin"]),
     TestFilePath = filename:join([filename:dirname(code:lib_dir(syn, ebin)), "test"]),
@@ -82,17 +102,31 @@ clean_after_test(NodeName) ->
     rpc:call(NodeName, syn, stop, []).
 
 start_process() ->
-    Pid = spawn(?MODULE, process_main, []),
+    Pid = spawn(fun process_main/0),
     Pid.
-
-start_process(NodeName) ->
-    Pid = spawn(NodeName, ?MODULE, process_main, []),
+start_process(NodeName) when is_atom(NodeName) ->
+    Pid = spawn(NodeName, fun process_main/0),
+    Pid;
+start_process(Loop) when is_function(Loop) ->
+    Pid = spawn(Loop),
+    Pid.
+start_process(NodeName, Loop)->
+    Pid = spawn(NodeName, Loop),
     Pid.
 
 kill_process(Pid) ->
     exit(Pid, kill).
 
+%% ===================================================================
+%% Internal
+%% ===================================================================
+set_environment_for_app(Node, AppName, AppConfig) ->
+    F = fun({Key, Val}) ->
+        ok = rpc:call(Node, application, set_env, [AppName, Key, Val])
+    end,
+    lists:foreach(F, AppConfig).
+
 process_main() ->
     receive
-        {From, shutdown} -> From ! {self(), terminated}
-    end.
+        _ -> process_main()
+    end.