Browse Source

Add basic registering and monitoring functionalities.

Roberto Ostinelli 10 years ago
parent
commit
3f15544399
3 changed files with 218 additions and 0 deletions
  1. 10 0
      src/syn.erl
  2. 75 0
      src/syn_backbone.erl
  3. 133 0
      test/syn_register_processes_SUITE.erl

+ 10 - 0
src/syn.erl

@@ -2,6 +2,8 @@
 
 %% API
 -export([start/0, stop/0]).
+-export([register/2]).
+-export([find_by_key/1]).
 
 
 %% ===================================================================
@@ -16,3 +18,11 @@ start() ->
 -spec stop() -> ok.
 stop() ->
     ok = application:stop(syn).
+
+-spec register(Key :: any(), Pid :: pid()) -> ok | {error, key_taken}.
+register(Key, Pid) ->
+    syn_backbone:register(Key, Pid).
+
+-spec find_by_key(Key :: any()) -> pid() | undefined.
+find_by_key(Key) ->
+    syn_backbone:find_by_key(Key).

+ 75 - 0
src/syn_backbone.erl

@@ -3,6 +3,8 @@
 
 %% API
 -export([start_link/0]).
+-export([register/2]).
+-export([find_by_key/1, find_by_pid/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -22,6 +24,31 @@ start_link() ->
     Options = [],
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
 
+-spec find_by_key(Key :: any()) -> pid() | undefined.
+find_by_key(Key) ->
+    case mnesia:dirty_read(syn_processes_table, Key) of
+        [Process] -> return_pid_if_on_connected_node(Process);
+        _ -> undefined
+    end.
+
+-spec find_by_pid(Pid :: pid()) -> Key :: any() | undefined.
+find_by_pid(Pid) ->
+    case mnesia:dirty_index_read(syn_processes_table, Pid, #syn_processes_table.pid) of
+        [Process] -> return_key_if_on_connected_node(Process);
+        _ -> undefined
+    end.
+
+-spec register(Key :: any(), Pid :: pid()) -> ok | {error, key_taken}.
+register(Key, Pid) ->
+    %% add to table
+    mnesia:dirty_write(#syn_processes_table{
+        key = Key,
+        pid = Pid,
+        node = node()
+    }),
+    %% link
+    gen_server:call(?MODULE, {link_process, Pid}).
+
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
@@ -35,6 +62,8 @@ start_link() ->
     ignore |
     {stop, Reason :: any()}.
 init([]) ->
+    %% trap linked processes signal
+    process_flag(trap_exit, true),
     %% init
     case initdb() of
         ok ->
@@ -54,6 +83,10 @@ init([]) ->
     {stop, Reason :: any(), Reply :: any(), #state{}} |
     {stop, Reason :: any(), #state{}}.
 
+handle_call({link_process, Pid}, _From, State) ->
+    erlang:link(Pid),
+    {reply, ok, State};
+
 handle_call(Request, From, State) ->
     error_logger:warning_msg("Received from ~p an unknown call message: ~p", [Request, From]),
     {reply, undefined, State}.
@@ -78,6 +111,30 @@ handle_cast(Msg, State) ->
     {noreply, #state{}, Timeout :: non_neg_integer()} |
     {stop, Reason :: any(), #state{}}.
 
+handle_info({'EXIT', Pid, Reason}, State) ->
+    %% do not lock backbone
+    spawn(fun() ->
+        %% check if pid is in table
+        case find_by_pid(Pid) of
+            undefined ->
+                case Reason of
+                    normal -> ok;
+                    _ -> error_logger:warning_msg("Received a crash message from an unlinked process ~p with reason: ~p", [Pid, Reason])
+                end;
+            Key ->
+                %% delete from table
+                remove_process_by_key(Key),
+                %% log
+                case Reason of
+                    normal -> ok;
+                    killed -> ok;
+                    _ -> error_logger:error_msg("Process with key ~p crashed with reason: ~p", [Key, Reason])
+                end
+        end
+    end),
+    %% return
+    {noreply, State};
+
 handle_info(Info, State) ->
     error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
     {noreply, State}.
@@ -144,3 +201,21 @@ add_table_copy_to_local_node() ->
             error_logger:error_msg("Error while creating copy of syn_processes_table: ~p", [Reason]),
             {error, Reason}
     end.
+
+-spec return_pid_if_on_connected_node(Process :: #syn_processes_table{}) -> pid() | undefined.
+return_pid_if_on_connected_node(Process) ->
+    case lists:member(Process#syn_processes_table.node, [node() | nodes()]) of
+        true -> Process#syn_processes_table.pid;
+        _ -> undefined
+    end.
+
+-spec return_key_if_on_connected_node(Process :: #syn_processes_table{}) -> pid() | undefined.
+return_key_if_on_connected_node(Process) ->
+    case lists:member(Process#syn_processes_table.node, [node() | nodes()]) of
+        true -> Process#syn_processes_table.key;
+        _ -> undefined
+    end.
+
+-spec remove_process_by_key(Key :: any()) -> ok.
+remove_process_by_key(Key) ->
+    mnesia:dirty_delete(syn_processes_table, Key).

+ 133 - 0
test/syn_register_processes_SUITE.erl

@@ -0,0 +1,133 @@
+-module(syn_register_processes_SUITE).
+
+%% callbacks
+-export([all/0]).
+-export([init_per_suite/1, end_per_suite/1]).
+-export([groups/0, init_per_group/2, end_per_group/2]).
+
+%% tests
+-export([
+    single_node_when_mnesia_is_ram_simple_registration/1
+]).
+
+%% include
+-include_lib("common_test/include/ct.hrl").
+
+
+%% ===================================================================
+%% Callbacks
+%% ===================================================================
+
+%% -------------------------------------------------------------------
+%% Function: all() -> GroupsAndTestCases | {skip,Reason}
+%% GroupsAndTestCases = [{group,GroupName} | TestCase]
+%% GroupName = atom()
+%% TestCase = atom()
+%% Reason = term()
+%% -------------------------------------------------------------------
+all() ->
+    [
+        {group, single_node_process_registration}
+    ].
+
+%% -------------------------------------------------------------------
+%% Function: groups() -> [Group]
+%% Group = {GroupName,Properties,GroupsAndTestCases}
+%% GroupName = atom()
+%% Properties = [parallel | sequence | Shuffle | {RepeatType,N}]
+%% GroupsAndTestCases = [Group | {group,GroupName} | TestCase]
+%% TestCase = atom()
+%% Shuffle = shuffle | {shuffle,{integer(),integer(),integer()}}
+%% RepeatType = repeat | repeat_until_all_ok | repeat_until_all_fail |
+%%			   repeat_until_any_ok | repeat_until_any_fail
+%% N = integer() | forever
+%% -------------------------------------------------------------------
+groups() ->
+    [
+        {single_node_process_registration, [shuffle], [
+            single_node_when_mnesia_is_ram_simple_registration
+        ]}
+    ].
+%% -------------------------------------------------------------------
+%% Function: init_per_suite(Config0) ->
+%%				Config1 | {skip,Reason} |
+%%              {skip_and_save,Reason,Config1}
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% -------------------------------------------------------------------
+init_per_suite(Config) ->
+    %% config
+    [
+        {slave_node_short_name, syn_slave}
+        | Config
+    ].
+
+%% -------------------------------------------------------------------
+%% Function: end_per_suite(Config0) -> void() | {save_config,Config1}
+%% Config0 = Config1 = [tuple()]
+%% -------------------------------------------------------------------
+end_per_suite(_Config) -> ok.
+
+%% -------------------------------------------------------------------
+%% Function: init_per_group(GroupName, Config0) ->
+%%				Config1 | {skip,Reason} |
+%%              {skip_and_save,Reason,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% -------------------------------------------------------------------
+init_per_group(_GroupName, Config) -> Config.
+
+%% -------------------------------------------------------------------
+%% Function: end_per_group(GroupName, Config0) ->
+%%				void() | {save_config,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% -------------------------------------------------------------------
+end_per_group(_GroupName, _Config) ->
+    clean_after_test().
+
+%% ===================================================================
+%% Tests
+%% ===================================================================
+single_node_when_mnesia_is_ram_simple_registration(_Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    %% start
+    ok = syn:start(),
+    %% start process
+    Pid = start_process(),
+    %% retrieve
+    undefined = syn:find_by_key(<<"my proc">>),
+    %% register
+    syn:register(<<"my proc">>, Pid),
+    %% retrieve
+    Pid = syn:find_by_key(<<"my proc">>),
+    %% kill process
+    kill_process(Pid),
+    timer:sleep(100),
+    %% retrieve
+    undefined = syn:find_by_key(<<"my proc">>).
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+clean_after_test() ->
+    %% stop mnesia
+    mnesia:stop(),
+    %% delete schema
+    mnesia:delete_schema([node()]),
+    %% stop syn
+    syn:stop().
+
+start_process() ->
+    Pid = spawn(?MODULE, process_main, []),
+    Pid.
+
+kill_process(Pid) ->
+    exit(Pid, kill).
+
+process_main() ->
+    receive
+        shutdown -> ok
+    end.