Browse Source

Allow to create PG and list their contents.

Roberto Ostinelli 9 years ago
parent
commit
1ff00ebd68
5 changed files with 211 additions and 23 deletions
  1. 5 0
      include/syn.hrl
  2. 13 0
      src/syn.erl
  3. 33 23
      src/syn_backbone.erl
  4. 159 0
      src/syn_pg.erl
  5. 1 0
      src/syn_sup.erl

+ 5 - 0
include/syn.hrl

@@ -30,3 +30,8 @@
     node = undefined :: atom(),
     meta = undefined :: any()
 }).
+-record(syn_pg_table, {
+    name = undefined :: any(),
+    pid = undefined :: undefined | pid() | atom(),
+    node = undefined :: atom()
+}).

+ 13 - 0
src/syn.erl

@@ -28,12 +28,17 @@
 %% API
 -export([start/0, stop/0]).
 -export([init/0]).
+
+%% global
 -export([register/2, register/3]).
 -export([unregister/1]).
 -export([find_by_key/1, find_by_key/2]).
 -export([find_by_pid/1, find_by_pid/2]).
 -export([registry_count/0, registry_count/1]).
 
+%% PG
+-export([add_to_pg/2]).
+-export([pids_of_pg/1]).
 
 %% ===================================================================
 %% API
@@ -88,6 +93,14 @@ registry_count() ->
 registry_count(Node) ->
     syn_global:count(Node).
 
+-spec add_to_pg(Name :: any(), Pid :: pid()) -> ok | {error, pid_already_in_group}.
+add_to_pg(Name, Pid) ->
+    syn_pg:add_to_pg(Name, Pid).
+
+-spec pids_of_pg(Name :: any()) -> [pid()].
+pids_of_pg(Name) ->
+    syn_pg:pids_of_pg(Name).
+
 %% ===================================================================
 %% Internal
 %% ===================================================================

+ 33 - 23
src/syn_backbone.erl

@@ -38,50 +38,60 @@
 
 -spec initdb() -> ok | {error, any()}.
 initdb() ->
-    %% get nodes
-    CurrentNode = node(),
-    ClusterNodes = [CurrentNode | nodes()],
     %% ensure all nodes are added
+    ClusterNodes = [node() | nodes()],
     mnesia:change_config(extra_db_nodes, ClusterNodes),
-    %% ensure table exists
-    case mnesia:create_table(syn_global_table, [
+    %% create tables
+    create_table(syn_global_table, [
         {type, set},
         {ram_copies, ClusterNodes},
         {attributes, record_info(fields, syn_global_table)},
         {index, [#syn_global_table.pid]},
         {storage_properties, [{ets, [{read_concurrency, true}]}]}
-    ]) of
+    ]),
+    create_table(syn_pg_table, [
+        {type, bag},
+        {ram_copies, ClusterNodes},
+        {attributes, record_info(fields, syn_pg_table)},
+        {index, [#syn_pg_table.pid]},
+        {storage_properties, [{ets, [{read_concurrency, true}]}]}
+    ]).
+
+create_table(TableName, Options) ->
+    CurrentNode = node(),
+    %% ensure table exists
+    case mnesia:create_table(TableName, Options) of
         {atomic, ok} ->
-            error_logger:info_msg("syn_global_table was successfully created"),
+            error_logger:info_msg("~p was successfully created", [TableName]),
             ok;
-        {aborted, {already_exists, syn_global_table}} ->
+        {aborted, {already_exists, TableName}} ->
             %% table already exists, try to add current node as copy
-            add_table_copy_to_current_node();
-        {aborted, {already_exists, syn_global_table, CurrentNode}} ->
+            add_table_copy_to_current_node(TableName);
+        {aborted, {already_exists, TableName, CurrentNode}} ->
             %% table already exists, try to add current node as copy
-            add_table_copy_to_current_node();
+            add_table_copy_to_current_node(TableName);
         Other ->
-            error_logger:error_msg("Error while creating syn_global_table: ~p", [Other]),
+            error_logger:error_msg("Error while creating ~p: ~p", [TableName, Other]),
             {error, Other}
     end.
 
--spec add_table_copy_to_current_node() -> ok | {error, any()}.
-add_table_copy_to_current_node() ->
+-spec add_table_copy_to_current_node(TableName :: atom()) -> ok | {error, any()}.
+add_table_copy_to_current_node(TableName) ->
+    CurrentNode = node(),
     %% wait for table
-    mnesia:wait_for_tables([syn_global_table], 10000),
+    mnesia:wait_for_tables([TableName], 10000),
     %% add copy
-    CurrentNode = node(),
-    case mnesia:add_table_copy(syn_global_table, CurrentNode, ram_copies) of
+    case mnesia:add_table_copy(TableName, CurrentNode, ram_copies) of
         {atomic, ok} ->
-            error_logger:info_msg("Copy of syn_global_table was successfully added to current node"),
+            error_logger:info_msg("Copy of ~p was successfully added to current node", [TableName]),
             ok;
-        {aborted, {already_exists, syn_global_table}} ->
-            error_logger:info_msg("Copy of syn_global_table is already added to current node"),
+        {aborted, {already_exists, TableName}} ->
+            error_logger:info_msg("Copy of ~p is already added to current node", [TableName]),
             ok;
-        {aborted, {already_exists, syn_global_table, CurrentNode}} ->
-            error_logger:info_msg("Copy of syn_global_table is already added to current node"),
+        {aborted, {already_exists, TableName, CurrentNode}} ->
+            error_logger:info_msg("Copy of ~p is already added to current node", [TableName]),
             ok;
         {aborted, Reason} ->
-            error_logger:error_msg("Error while creating copy of syn_global_table: ~p", [Reason]),
+            error_logger:error_msg("Error while creating copy of ~p: ~p", [TableName, Reason]),
             {error, Reason}
     end.

+ 159 - 0
src/syn_pg.erl

@@ -0,0 +1,159 @@
+%% ==========================================================================================================
+%% Syn - A global process registry.
+%%
+%% The MIT License (MIT)
+%%
+%% Copyright (c) 2016 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+%% ==========================================================================================================
+-module(syn_pg).
+
+%% API
+-export([start_link/0]).
+-export([add_to_pg/2]).
+-export([pids_of_pg/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+%% records
+-record(state, {}).
+
+%% include
+-include("syn.hrl").
+
+
+%% ===================================================================
+%% API
+%% ===================================================================
+-spec start_link() -> {ok, pid()} | {error, any()}.
+start_link() ->
+    Options = [],
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
+
+-spec add_to_pg(Name :: any(), Pid :: pid()) -> ok | {error, pid_already_in_group}.
+add_to_pg(Name, Pid) ->
+    Node = node(Pid),
+    gen_server:call({?MODULE, Node}, {add_to_pg, Name, Pid}).
+
+-spec pids_of_pg(Name :: any()) -> [pid()].
+pids_of_pg(Name) ->
+    i_pids_of_pg(Name).
+
+%% ===================================================================
+%% Callbacks
+%% ===================================================================
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Init
+%% ----------------------------------------------------------------------------------------------------------
+-spec init([]) ->
+    {ok, #state{}} |
+    {ok, #state{}, Timeout :: non_neg_integer()} |
+    ignore |
+    {stop, Reason :: any()}.
+init([]) ->
+    %% trap linked processes signal
+    process_flag(trap_exit, true),
+
+    %% build state
+    {ok, #state{}}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Call messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_call(Request :: any(), From :: any(), #state{}) ->
+    {reply, Reply :: any(), #state{}} |
+    {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
+    {noreply, #state{}} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), Reply :: any(), #state{}} |
+    {stop, Reason :: any(), #state{}}.
+
+handle_call({add_to_pg, Name, Pid}, _From, State) ->
+    PidsOfPg = i_pids_of_pg(Name),
+    case lists:member(Pid, PidsOfPg) of
+        false ->
+            %% add to table
+            mnesia:dirty_write(#syn_pg_table{
+                name = Name,
+                pid = Pid,
+                node = node()
+            }),
+            %% link
+            erlang:link(Pid),
+            %% return
+            {reply, ok, State};
+        _ ->
+            {reply, pid_already_in_group, State}
+    end;
+
+handle_call(Request, From, State) ->
+    error_logger:warning_msg("Received from ~p an unknown call message: ~p", [Request, From]),
+    {reply, undefined, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Cast messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_cast(Msg :: any(), #state{}) ->
+    {noreply, #state{}} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), #state{}}.
+
+handle_cast(Msg, State) ->
+    error_logger:warning_msg("Received an unknown cast message: ~p", [Msg]),
+    {noreply, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% All non Call / Cast messages
+%% ----------------------------------------------------------------------------------------------------------
+-spec handle_info(Info :: any(), #state{}) ->
+    {noreply, #state{}} |
+    {noreply, #state{}, Timeout :: non_neg_integer()} |
+    {stop, Reason :: any(), #state{}}.
+
+handle_info(Info, State) ->
+    error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
+    {noreply, State}.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Terminate
+%% ----------------------------------------------------------------------------------------------------------
+-spec terminate(Reason :: any(), #state{}) -> terminated.
+terminate(Reason, _State) ->
+    error_logger:info_msg("Terminating syn pg with reason: ~p", [Reason]),
+    terminated.
+
+%% ----------------------------------------------------------------------------------------------------------
+%% Convert process state when code is changed.
+%% ----------------------------------------------------------------------------------------------------------
+-spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ===================================================================
+%% Internal
+%% ===================================================================
+-spec i_pids_of_pg(Name :: any()) -> [Process :: #syn_global_table{}].
+i_pids_of_pg(Name) ->
+    Processes = mnesia:dirty_read(syn_pg_table, Name),
+    lists:map(fun(Process) ->
+        Process#syn_pg_table.pid
+    end, Processes).

+ 1 - 0
src/syn_sup.erl

@@ -51,6 +51,7 @@ start_link() ->
 init([]) ->
     Children = [
         ?CHILD(syn_global, worker),
+        ?CHILD(syn_pg, worker),
         ?CHILD(syn_consistency, worker)
     ],
     {ok, {{one_for_one, 10, 10}, Children}}.