Browse Source

Allow to set netsplit conflicting mode.

Roberto Ostinelli 10 years ago
parent
commit
315a29a1c6
5 changed files with 191 additions and 49 deletions
  1. 54 25
      README.md
  2. 14 1
      src/syn.erl
  3. 56 19
      src/syn_netsplits.erl
  4. 66 3
      test/syn_netsplits_SUITE.erl
  5. 1 1
      test/syn_test_suite_helper.erl

+ 54 - 25
README.md

@@ -48,13 +48,42 @@ syn:start().
 To register a process:
 
 ```erlang
-syn:register(Key, Port)
+syn:register(Key, Pid) -> ok | {error, Error}.
+
+Types:
+	Key = any()
+	Pid = pid()
+	Error = taken
 ```
 
-To retrieve a Pid for a Key:
+To retrieve a Pid from a Key:
 
 ```erlang
-syn:find(Key)
+syn:find_by_key(Key) -> Pid | undefined.
+
+Types:
+	Key = any()
+	Pid = pid()
+```
+
+To retrieve a Key from a Pid:
+
+```erlang
+syn:find_by_pid(Pid) -> Key | undefined.
+
+Types:
+	Pid = pid()
+	Key = any()
+```
+
+To unregister a previously registered Key:
+
+```erlang
+syn:unregister(Key) -> ok | {error, Error}.
+
+Types:
+	Key = any()
+	Error = undefined
 ```
 
 Processes are automatically monitored and removed from the registry if they die.
@@ -62,39 +91,39 @@ Processes are automatically monitored and removed from the registry if they die.
 ### Conflict resolution
 After a net split, when nodes reconnect, Syn will merge the data from all the nodes in the cluster.
 
-If the same Key was used to register a process on different nodes during a net split, then there will be a conflict. By default, Syn will kill the processes of the node the conflict is being resolved on. The killing of the unwanted process happens by sending a `kill` signal with `exit(Pid, kill)`.
+If the same Key was used to register a process on different nodes during a net split, then there will be a conflict. By default, Syn will kill the processes of the node the conflict is being resolved on, by sending a `kill` signal with `exit(Pid, kill)`.
+
+If this is not desired, you can change the `netsplit_conflicting_mode` to send a message to the discarded process, so that you can trigger any actions on that process (such as a graceful shutdown).
 
-If this is not desired, you can set a resolve callback function to be called for the conflicting items. The resolve function is defined as follows:
+To do so, you can use the `syn:options/1` function.
 
 ```erlang
-ResolveFun = fun((Key, {LocalPid, LocalNode}, {RemotePid, RemoteNode}) -> ChosenPid)
-Key = term()
-LocalPid = RemotePid = ChosenPid = pid()
-LocalNode = RemoteNode = atom()
+syn:options(SynOptions) -> ok.
+
+Types:
+	SynOptions = [SynOption]
+	SynOption = NetsplitConflictingMode
+	NetsplitConflictingMode = {netsplit_conflicting_mode, kill | {send_message, any()}}
 ```
 
-Where:
-  * `Key` is the conflicting Key.
-  * `LocalPid` is the Pid of the process running on the node where the conflict resolution is running (i.e. `LocalNode`, which corresponds to the value returned by `node/1`).
-  * `RemotePid` and `RemoteNode` are the Pid and Node of the other conflicting process.
+For example, if you want the message `shutdown` to be sent to the discarded process:
 
-Once the resolve function is defined, you can set it with `syn:resolve_fun/1`.
+```erlang
+syn:options([
+	{netsplit_conflicting_mode, {send_message, shutdown}}
+]).
+```
 
-The following is an example on how to set a resolve function to choose the process running on the remote node, and send a graceful `shutdown` signal to the local process instead of the abrupt killing of the process.
+If instead you want to ensure an `exit` signal is sent to the discarded process:
 
 ```erlang
-%% define the resolve fun
-ResolveFun = fun(_Key, {LocalPid, _LocalNode}, {RemotePid, _RemoteNode}) ->
-	%% send a shutdown message to the local pid
-	LocalPid ! shutdown,
-	%% select to keep the remote process
-	RemotePid
-end,
-
-%% set the fun
-syn:resolve_fun(ResolveFun).
+syn:options([
+	{netsplit_conflicting_mode, kill}
+]).
 ```
 
+This is the default, so you do not have to specify this behavior if you haven't changed it.
+
 ## Internals
 Under the hood, Syn performs dirty reads and writes into a distributed in-memory Mnesia table, synchronized across nodes.
 

+ 14 - 1
src/syn.erl

@@ -32,6 +32,7 @@
 -export([start/0, stop/0]).
 -export([register/2, unregister/1]).
 -export([find_by_key/1, find_by_pid/1]).
+-export([options/1]).
 
 
 %% ===================================================================
@@ -47,7 +48,7 @@ start() ->
 stop() ->
     ok = application:stop(syn).
 
--spec register(Key :: any(), Pid :: pid()) -> ok | {error, key_taken}.
+-spec register(Key :: any(), Pid :: pid()) -> ok | {error, taken}.
 register(Key, Pid) ->
     syn_backbone:register(Key, Pid).
 
@@ -62,3 +63,15 @@ find_by_key(Key) ->
 -spec find_by_pid(Pid :: pid()) -> Key :: any() | undefined.
 find_by_pid(Pid) ->
     syn_backbone:find_by_pid(Pid).
+
+-spec options(list()) -> ok.
+options(Options) ->
+    %% send message
+    case proplists:get_value(netsplit_conflicting_mode, Options) of
+        undefined ->
+            ok;
+        kill ->
+            syn_netsplits:conflicting_mode(kill);
+        {send_message, Message} ->
+            syn_netsplits:conflicting_mode({send_message, Message})
+    end.

+ 56 - 19
src/syn_netsplits.erl

@@ -38,9 +38,13 @@
 %% internal
 -export([get_processes_info_of_node/1]).
 -export([write_processes_info_to_node/2]).
+-export([conflicting_mode/1]).
 
 %% records
--record(state, {}).
+-record(state, {
+    conflicting_mode = kill :: kill | send_message,
+    message = undefined :: any()
+}).
 
 %% include
 -include("syn.hrl").
@@ -54,6 +58,11 @@ start_link() ->
     Options = [],
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
 
+-spec conflicting_mode(undefined | kill | {send_message, any()}) -> ok.
+conflicting_mode(undefined) -> ok;
+conflicting_mode(kill) -> gen_server:call(?MODULE, {conflicting_mode, kill});
+conflicting_mode({send_message, Message}) -> gen_server:call(?MODULE, {conflicting_mode, {send_message, Message}}).
+
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
@@ -71,6 +80,7 @@ init([]) ->
     process_flag(trap_exit, true),
     %% monitor mnesia events
     mnesia:subscribe(system),
+    %% init state
     {ok, #state{}}.
 
 %% ----------------------------------------------------------------------------------------------------------
@@ -84,6 +94,16 @@ init([]) ->
     {stop, Reason :: any(), Reply :: any(), #state{}} |
     {stop, Reason :: any(), #state{}}.
 
+handle_call({conflicting_mode, kill}, _From, State) ->
+    {reply, ok, State#state{
+        conflicting_mode = kill,
+        message = undefined
+    }};
+handle_call({conflicting_mode, {send_message, Message}}, _From, State) ->
+    {reply, ok, State#state{
+        conflicting_mode = send_message,
+        message = Message
+    }};
 handle_call(Request, From, State) ->
     error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [Request, From]),
     {reply, undefined, State}.
@@ -108,9 +128,12 @@ handle_cast(Msg, State) ->
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
 
-handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
+handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, #state{
+    conflicting_mode = ConflictingMode,
+    message = Message
+} = State) ->
     error_logger:warning_msg("MNESIA signalled an inconsistent database on node: ~p with context: ~p, initiating automerge~n", [Node, Context]),
-    automerge(Node),
+    automerge(Node, ConflictingMode, Message),
     {noreply, State};
 
 handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
@@ -159,54 +182,65 @@ delete_pids_of_disconnected_node(Node) ->
         lists:foreach(DelF, NodePids)
     end).
 
--spec automerge(RemoteNode :: atom()) -> ok.
-automerge(RemoteNode) ->
+-spec automerge(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) -> ok.
+automerge(RemoteNode, ConflictingMode, Message) ->
     global:trans({{?MODULE, automerge}, self()},
         fun() ->
             error_logger:warning_msg("AUTOMERGE starting for remote node ~s (global lock is set)~n", [RemoteNode]),
-            check_stitch(RemoteNode),
+            check_stitch(RemoteNode, ConflictingMode, Message),
             error_logger:warning_msg("AUTOMERGE done (global lock will be unset)~n")
         end).
 
--spec check_stitch(RemoteNode :: atom()) -> ok.
-check_stitch(RemoteNode) ->
+-spec check_stitch(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) -> ok.
+check_stitch(RemoteNode, ConflictingMode, Message) ->
     case lists:member(RemoteNode, mnesia:system_info(running_db_nodes)) of
         true ->
             ok;
         false ->
-            stitch(RemoteNode),
+            stitch(RemoteNode, ConflictingMode, Message),
             ok
     end.
 
--spec stitch(RemoteNode :: atom()) -> {'ok', any()} | {'error', any()}.
-stitch(RemoteNode) ->
+-spec stitch(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) ->
+    {'ok', any()} | {'error', any()}.
+stitch(RemoteNode, ConflictingMode, Message) ->
     mnesia_controller:connect_nodes(
         [RemoteNode],
         fun(MergeF) ->
             catch case MergeF([syn_processes_table]) of
                 {merged, _, _} = Res ->
-                    stitch_tab(RemoteNode),
+                    stitch_tab(RemoteNode, ConflictingMode, Message),
                     Res;
                 Other ->
                     Other
             end
         end).
 
--spec stitch_tab(RemoteNode :: atom()) -> ok.
-stitch_tab(RemoteNode) ->
+-spec stitch_tab(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) -> ok.
+stitch_tab(RemoteNode, ConflictingMode, Message) ->
     %% get remote processes info
     RemoteProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]),
     %% get local processes info
     LocalProcessesInfo = get_processes_info_of_node(node()),
     %% purge doubles (if any)
-    {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo),
+    {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_node(
+        LocalProcessesInfo,
+        RemoteProcessesInfo,
+        ConflictingMode,
+        Message
+    ),
     %% write
     write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo1),
     write_local_processes_to_remote(RemoteNode, LocalProcessesInfo1).
 
--spec purge_double_processes_from_local_node(LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()) ->
+-spec purge_double_processes_from_local_node(
+    LocalProcessesInfo :: list(),
+    RemoteProcessesInfo :: list(),
+    ConflictingMode :: kill | send_message,
+    Message :: any()
+) ->
     {LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()}.
-purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo) ->
+purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo, ConflictingMode, Message) ->
     %% create ETS table
     Tab = ets:new(syn_automerge_doubles_table, [set]),
 
@@ -223,8 +257,11 @@ purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo)
                 mnesia:dirty_delete(syn_processes_table, Key),
                 %% remove it from ETS
                 ets:delete(Tab, Key),
-                %% kill the process
-                exit(LocalProcessPid, kill)
+                %% kill or send message
+                case ConflictingMode of
+                    kill -> exit(LocalProcessPid, kill);
+                    send_message -> LocalProcessPid ! Message
+                end
         end
     end,
     lists:foreach(F, RemoteProcessesInfo),

+ 66 - 3
test/syn_netsplits_SUITE.erl

@@ -36,7 +36,8 @@
 %% tests
 -export([
     two_nodes_netsplit_when_there_are_no_conflicts/1,
-    two_nodes_netsplit_when_there_are_conflicts/1
+    two_nodes_netsplit_kill_resolution_when_there_are_conflicts/1,
+    two_nodes_netsplit_message_resolution_when_there_are_conflicts/1
 ]).
 
 %% include
@@ -75,7 +76,8 @@ groups() ->
     [
         {two_nodes_netsplits, [shuffle], [
             two_nodes_netsplit_when_there_are_no_conflicts,
-            two_nodes_netsplit_when_there_are_conflicts
+            two_nodes_netsplit_kill_resolution_when_there_are_conflicts,
+            two_nodes_netsplit_message_resolution_when_there_are_conflicts
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -229,7 +231,7 @@ two_nodes_netsplit_when_there_are_no_conflicts(Config) ->
     syn_test_suite_helper:kill_process(SlavePidLocal),
     syn_test_suite_helper:kill_process(SlavePidSlave).
 
-two_nodes_netsplit_when_there_are_conflicts(Config) ->
+two_nodes_netsplit_kill_resolution_when_there_are_conflicts(Config) ->
     %% get slave
     SlaveNodeName = proplists:get_value(slave_node_name, Config),
     CurrentNode = node(),
@@ -273,9 +275,70 @@ two_nodes_netsplit_when_there_are_conflicts(Config) ->
 
     %% check process
     FoundPid = syn:find_by_key(conflicting_key),
+    true = lists:member(FoundPid, [LocalPid, SlavePid]),
+
+    %% kill processes
+    syn_test_suite_helper:kill_process(LocalPid),
+    syn_test_suite_helper:kill_process(SlavePid).
+
+two_nodes_netsplit_message_resolution_when_there_are_conflicts(Config) ->
+    %% get slave
+    SlaveNodeName = proplists:get_value(slave_node_name, Config),
+    CurrentNode = node(),
+
+    %% set resolution by message shutdown
+    syn:options([{netsplit_conflicting_mode, {send_message, {self(), shutdown}}}]),
+
+    %% start processes
+    LocalPid = syn_test_suite_helper:start_process(),
+    SlavePid = syn_test_suite_helper:start_process(SlaveNodeName),
+
+    %% register
+    ok = syn:register(conflicting_key, SlavePid),
+    timer:sleep(100),
+
+    %% check tables
+    1 = mnesia:table_info(syn_processes_table, size),
+    1 = rpc:call(SlaveNodeName, mnesia, table_info, [syn_processes_table, size]),
+
+    %% check process
+    SlavePid = syn:find_by_key(conflicting_key),
 
+    %% simulate net split
+    syn_test_suite_helper:disconnect_node(SlaveNodeName),
+    timer:sleep(1000),
+
+    %% check tables
+    0 = mnesia:table_info(syn_processes_table, size),
+    [CurrentNode] = mnesia:table_info(syn_processes_table, active_replicas),
+
+    %% now register the local pid with the same key
+    ok = syn:register(conflicting_key, LocalPid),
+
+    %% check process
+    LocalPid = syn:find_by_key(conflicting_key),
+
+    %% reconnect
+    syn_test_suite_helper:connect_node(SlaveNodeName),
+    timer:sleep(1000),
+
+    %% check tables
+    1 = mnesia:table_info(syn_processes_table, size),
+    1 = rpc:call(SlaveNodeName, mnesia, table_info, [syn_processes_table, size]),
+
+    %% check process
+    FoundPid = syn:find_by_key(conflicting_key),
     true = lists:member(FoundPid, [LocalPid, SlavePid]),
 
+    %% check message received from killed pid
+    KilledPid = lists:nth(1, lists:delete(FoundPid, [LocalPid, SlavePid])),
+    receive
+        {KilledPid, terminated} -> ok;
+        Other -> ct:pal("WUT?? ~p", [Other])
+    after 5 ->
+        ok = not_received
+    end,
+
     %% kill processes
     syn_test_suite_helper:kill_process(LocalPid),
     syn_test_suite_helper:kill_process(SlavePid).

+ 1 - 1
test/syn_test_suite_helper.erl

@@ -95,5 +95,5 @@ kill_process(Pid) ->
 
 process_main() ->
     receive
-        shutdown -> ok
+        {From, shutdown} -> From ! {self(), terminated}
     end.