Просмотр исходного кода

Add init/0 function to ensure syn is initialized when nodes are connected.

Roberto Ostinelli 10 лет назад
Родитель
Сommit
c6c1d00345

+ 9 - 0
README.md

@@ -45,6 +45,15 @@ Ensure to start Syn from your application. This can be done by either providing
 syn:start().
 ```
 
+Your application will need to connect to the nodes in your cluster. Once that is done, ensure to initialize Syn:
+
+```erlang
+syn:init().
+```
+
+Syn is now ready to register processes.
+
+
 ### Basic
 To register a process:
 

+ 5 - 0
src/syn.erl

@@ -30,6 +30,7 @@
 
 %% API
 -export([start/0, stop/0]).
+-export([init/0]).
 -export([register/2, unregister/1]).
 -export([find_by_key/1, find_by_pid/1]).
 -export([count/0, count/1]).
@@ -48,6 +49,10 @@ start() ->
 stop() ->
     ok = application:stop(syn).
 
+-spec init() -> ok.
+init() ->
+    ok = syn_backbone:initdb().
+
 -spec register(Key :: any(), Pid :: pid()) -> ok | {error, taken}.
 register(Key, Pid) ->
     syn_backbone:register(Key, Pid).

+ 37 - 42
src/syn_backbone.erl

@@ -31,6 +31,7 @@
 
 %% API
 -export([start_link/0]).
+-export([initdb/0]).
 -export([register/2, unregister/1]).
 -export([find_by_key/1, find_by_pid/1]).
 -export([count/0, count/1]).
@@ -56,6 +57,10 @@ start_link() ->
     Options = [],
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
 
+-spec initdb() -> ok | {error, any()}.
+initdb() ->
+    initdb_do().
+
 -spec find_by_key(Key :: any()) -> pid() | undefined.
 find_by_key(Key) ->
     case mnesia:dirty_read(syn_processes_table, Key) of
@@ -128,24 +133,18 @@ count(Node) ->
 init([]) ->
     %% trap linked processes signal
     process_flag(trap_exit, true),
-    %% monitor mnesia events
-    mnesia:subscribe(system),
-    %% init
-    case ensure_mnesia_table_is_configured() of
-        ok ->
-            %% get options
-            {ok, [ProcessExitCallbackModule, ProcessExitCallbackFunction]} = syn_utils:get_env_value(
-                process_exit_callback,
-                [undefined, undefined]
-            ),
-            %% build state
-            {ok, #state{
-                process_exit_callback_module = ProcessExitCallbackModule,
-                process_exit_callback_function = ProcessExitCallbackFunction
-            }};
-        Other ->
-            {stop, Other}
-    end.
+
+    %% get options
+    {ok, [ProcessExitCallbackModule, ProcessExitCallbackFunction]} = syn_utils:get_env_value(
+        process_exit_callback,
+        [undefined, undefined]
+    ),
+
+    %% build state
+    {ok, #state{
+        process_exit_callback_module = ProcessExitCallbackModule,
+        process_exit_callback_function = ProcessExitCallbackFunction
+    }}.
 
 %% ----------------------------------------------------------------------------------------------------------
 %% Call messages
@@ -225,15 +224,6 @@ handle_info({'EXIT', Pid, Reason}, #state{
     %% return
     {noreply, State};
 
-handle_info({mnesia_system_event, {mnesia_up, Node}}, State) when Node =/= node() ->
-    error_logger:info_msg("Received a MNESIA up event, ensuring db is properly initialized with node ~p~n", [Node]),
-    ensure_mnesia_table_is_configured(),
-    {noreply, State};
-
-handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
-    %% ignore mnesia event
-    {noreply, State};
-
 handle_info(Info, State) ->
     error_logger:warning_msg("Received an unknown info message: ~p~n", [Info]),
     {noreply, State}.
@@ -256,45 +246,50 @@ code_change(_OldVsn, State, _Extra) ->
 %% ===================================================================
 %% Internal
 %% ===================================================================
--spec ensure_mnesia_table_is_configured() -> ok | {error, any()}.
-ensure_mnesia_table_is_configured() ->
-    %% ensure all nodes are added - this covers when mnesia is in ram only mode
-    mnesia:change_config(extra_db_nodes, [node() | nodes()]),
-    %% ensure table exists
+-spec initdb_do() -> ok | {error, any()}.
+initdb_do() ->
+    %% get nodes
     CurrentNode = node(),
+    ClusterNodes = [CurrentNode | nodes()],
+    %% ensure all nodes are added
+    mnesia:change_config(extra_db_nodes, ClusterNodes),
+    %% ensure table exists
     case mnesia:create_table(syn_processes_table, [
         {type, set},
-        {ram_copies, [node() | nodes()]},
+        {ram_copies, ClusterNodes},
         {attributes, record_info(fields, syn_processes_table)},
         {index, [#syn_processes_table.pid]},
         {storage_properties, [{ets, [{read_concurrency, true}]}]}
     ]) of
         {atomic, ok} ->
-            error_logger:info_msg("syn_processes_table was successfully created.~n"),
+            error_logger:info_msg("syn_processes_table was successfully created~n"),
             ok;
         {aborted, {already_exists, syn_processes_table}} ->
             %% table already exists, try to add current node as copy
-            add_table_copy_to_local_node();
+            add_table_copy_to_current_node();
         {aborted, {already_exists, syn_processes_table, CurrentNode}} ->
             %% table already exists, try to add current node as copy
-            add_table_copy_to_local_node();
+            add_table_copy_to_current_node();
         Other ->
             error_logger:error_msg("Error while creating syn_processes_table: ~p~n", [Other]),
             {error, Other}
     end.
 
--spec add_table_copy_to_local_node() -> ok | {error, any()}.
-add_table_copy_to_local_node() ->
+-spec add_table_copy_to_current_node() -> ok | {error, any()}.
+add_table_copy_to_current_node() ->
+    %% wait for table
+    mnesia:wait_for_tables([syn_processes_table], 10000),
+    %% add copy
     CurrentNode = node(),
-    case mnesia:add_table_copy(syn_processes_table, node(), ram_copies) of
+    case mnesia:add_table_copy(syn_processes_table, CurrentNode, ram_copies) of
         {atomic, ok} ->
-            error_logger:info_msg("Copy of syn_processes_table was successfully added to current node.~n"),
+            error_logger:info_msg("Copy of syn_processes_table was successfully added to current node~n"),
             ok;
         {aborted, {already_exists, syn_processes_table}} ->
-            error_logger:info_msg("Copy of syn_processes_table is already added to current node.~n"),
+            error_logger:info_msg("Copy of syn_processes_table is already added to current node~n"),
             ok;
         {aborted, {already_exists, syn_processes_table, CurrentNode}} ->
-            error_logger:info_msg("Copy of syn_processes_table is already added to current node.~n"),
+            error_logger:info_msg("Copy of syn_processes_table is already added to current node~n"),
             ok;
         {aborted, Reason} ->
             error_logger:error_msg("Error while creating copy of syn_processes_table: ~p~n", [Reason]),

+ 12 - 0
test/syn_create_mnesia_SUITE.erl

@@ -179,6 +179,7 @@ single_node_when_mnesia_is_ram(_Config) ->
     application:set_env(mnesia, schema_location, ram),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% check table exists
     true = lists:member(syn_processes_table, mnesia:system_info(tables)).
 
@@ -187,6 +188,7 @@ single_node_when_mnesia_is_opt_disc_no_schema_exists(_Config) ->
     application:set_env(mnesia, schema_location, opt_disc),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% check table exists
     true = lists:member(syn_processes_table, mnesia:system_info(tables)).
 
@@ -197,6 +199,7 @@ single_node_when_mnesia_is_opt_disc_schema_exists(_Config) ->
     mnesia:create_schema([node()]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% check table exists
     true = lists:member(syn_processes_table, mnesia:system_info(tables)).
 
@@ -207,6 +210,7 @@ single_node_when_mnesia_is_disc(_Config) ->
     mnesia:create_schema([node()]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% check table exists
     true = lists:member(syn_processes_table, mnesia:system_info(tables)).
 
@@ -218,7 +222,9 @@ two_nodes_when_mnesia_is_ram(Config) ->
     rpc:call(SlaveNode, mnesia, schema_location, [ram]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
     %% check table exists on local
     true = lists:member(syn_processes_table, mnesia:system_info(tables)),
@@ -234,7 +240,9 @@ two_nodes_when_mnesia_is_opt_disc_no_schema_exists(Config) ->
     rpc:call(SlaveNode, mnesia, schema_location, [opt_disc]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
     %% check table exists on local
     true = lists:member(syn_processes_table, mnesia:system_info(tables)),
@@ -252,7 +260,9 @@ two_nodes_when_mnesia_is_opt_disc_schema_exists(Config) ->
     mnesia:create_schema([node(), SlaveNode]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
     %% check table exists on local
     true = lists:member(syn_processes_table, mnesia:system_info(tables)),
@@ -270,7 +280,9 @@ two_nodes_when_mnesia_is_disc(Config) ->
     mnesia:create_schema([node(), SlaveNode]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
     %% check table exists on local
     true = lists:member(syn_processes_table, mnesia:system_info(tables)),

+ 9 - 0
test/syn_netsplits_SUITE.erl

@@ -193,7 +193,9 @@ two_nodes_netsplit_when_there_are_no_conflicts(Config) ->
 
     %% start syn
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
 
     %% start processes
@@ -268,7 +270,9 @@ two_nodes_netsplit_kill_resolution_when_there_are_conflicts(Config) ->
 
     %% start syn
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
 
     %% start processes
@@ -327,7 +331,9 @@ two_nodes_netsplit_callback_resolution_when_there_are_conflicts(Config) ->
 
     %% start syn
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
 
     %% start processes
@@ -395,8 +401,11 @@ three_nodes_netsplit_kill_resolution_when_there_are_conflicts(Config) ->
 
     %% start syn
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     ok = rpc:call(SlaveNode2, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, init, []),
     timer:sleep(100),
 
     %% start processes

+ 15 - 0
test/syn_register_processes_SUITE.erl

@@ -188,6 +188,7 @@ single_node_when_mnesia_is_ram_find_by_key(_Config) ->
     application:set_env(mnesia, schema_location, ram),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% start process
     Pid = syn_test_suite_helper:start_process(),
     %% retrieve
@@ -207,6 +208,7 @@ single_node_when_mnesia_is_ram_find_by_pid(_Config) ->
     application:set_env(mnesia, schema_location, ram),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% start process
     Pid = syn_test_suite_helper:start_process(),
     %% register
@@ -224,6 +226,7 @@ single_node_when_mnesia_is_ram_re_register_error(_Config) ->
     application:set_env(mnesia, schema_location, ram),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% start process
     Pid = syn_test_suite_helper:start_process(),
     Pid2 = syn_test_suite_helper:start_process(),
@@ -252,6 +255,7 @@ single_node_when_mnesia_is_ram_unregister(_Config) ->
     application:set_env(mnesia, schema_location, ram),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% start process
     Pid = syn_test_suite_helper:start_process(),
     %% unregister
@@ -273,6 +277,7 @@ single_node_when_mnesia_is_ram_process_count(_Config) ->
     application:set_env(mnesia, schema_location, ram),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% count
     0 = syn:count(),
     %% start process
@@ -301,6 +306,7 @@ single_node_when_mnesia_is_ram_callback_on_process_exit(_Config) ->
     syn_test_suite_helper:set_environment_variables(),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% register global process
     ResultPid = self(),
     global:register_name(syn_register_process_SUITE_result, ResultPid),
@@ -324,6 +330,7 @@ single_node_when_mnesia_is_disc_find_by_key(_Config) ->
     mnesia:create_schema([node()]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     %% start process
     Pid = syn_test_suite_helper:start_process(),
     %% retrieve
@@ -346,7 +353,9 @@ two_nodes_when_mnesia_is_ram_find_by_key(Config) ->
     rpc:call(SlaveNode, mnesia, schema_location, [ram]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
     %% start process
     Pid = syn_test_suite_helper:start_process(),
@@ -374,7 +383,9 @@ two_nodes_when_mnesia_is_ram_process_count(Config) ->
     rpc:call(SlaveNode, mnesia, schema_location, [ram]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
     %% count
     0 = syn:count(),
@@ -424,7 +435,9 @@ two_nodes_when_mnesia_is_ram_callback_on_process_exit(Config) ->
     syn_test_suite_helper:set_environment_variables(SlaveNode),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
     %% register global process
     ResultPid = self(),
@@ -460,7 +473,9 @@ two_nodes_when_mnesia_is_disc_find_by_pid(Config) ->
     mnesia:create_schema([node(), SlaveNode]),
     %% start
     ok = syn:start(),
+    ok = syn:init(),
     ok = rpc:call(SlaveNode, syn, start, []),
+    ok = rpc:call(SlaveNode, syn, init, []),
     timer:sleep(100),
     %% start process
     Pid = syn_test_suite_helper:start_process(),