123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- %% ==========================================================================================================
- %% Syn - A global process registry.
- %%
- %% Copyright (C) 2015, Roberto Ostinelli <roberto@ostinelli.net>.
- %% All rights reserved.
- %%
- %% The MIT License (MIT)
- %%
- %% Copyright (c) 2015 Roberto Ostinelli
- %%
- %% Permission is hereby granted, free of charge, to any person obtaining a copy
- %% of this software and associated documentation files (the "Software"), to deal
- %% in the Software without restriction, including without limitation the rights
- %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- %% copies of the Software, and to permit persons to whom the Software is
- %% furnished to do so, subject to the following conditions:
- %%
- %% The above copyright notice and this permission notice shall be included in
- %% all copies or substantial portions of the Software.
- %%
- %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- %% THE SOFTWARE.
- %% ==========================================================================================================
- -module(syn_backbone).
- -behaviour(gen_server).
- %% API
- -export([start_link/0]).
- -export([register/2, unregister/1]).
- -export([find_by_key/1, find_by_pid/1]).
- -export([count/0, count/1]).
- %% gen_server callbacks
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
- %% records
- -record(state, {}).
- %% include
- -include("syn.hrl").
- %% ===================================================================
- %% API
- %% ===================================================================
%% Starts the backbone gen_server, registers it locally under the module
%% name and links it to the caller. No gen_server options are passed.
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% Looks up the pid registered under Key with a dirty read.
%% Returns `undefined` when the read does not yield exactly one entry, or
%% when the entry's node is neither the local node nor a connected one.
-spec find_by_key(Key :: any()) -> pid() | undefined.
find_by_key(Key) ->
    Entries = mnesia:dirty_read(syn_processes_table, Key),
    case Entries of
        [Entry] ->
            return_pid_if_on_connected_node(Entry);
        _NoSingleEntry ->
            undefined
    end.
%% Looks up the key under which Pid is registered, using the index on the
%% pid field. Returns `undefined` when the index read does not yield exactly
%% one entry, or when the entry's node is neither local nor connected.
-spec find_by_pid(Pid :: pid()) -> Key :: any() | undefined.
find_by_pid(Pid) ->
    PidIndex = #syn_processes_table.pid,
    Entries = mnesia:dirty_index_read(syn_processes_table, Pid, PidIndex),
    case Entries of
        [Entry] ->
            return_key_if_on_connected_node(Entry);
        _NoSingleEntry ->
            undefined
    end.
%% Registers Pid under Key.
%% When find_by_key/1 reports no process for Key, the entry is written with
%% a dirty write and the backbone running on the process's own node is asked
%% to link to it; the result of that call is returned. Otherwise the key is
%% considered taken.
-spec register(Key :: any(), Pid :: pid()) -> ok | {error, taken}.
register(Key, Pid) ->
    case find_by_key(Key) of
        undefined ->
            %% the entry records the node the process runs on
            PidNode = node(Pid),
            Entry = #syn_processes_table{key = Key, pid = Pid, node = PidNode},
            mnesia:dirty_write(Entry),
            %% the link is established by the backbone on the process's node
            gen_server:call({?MODULE, PidNode}, {link_process, Pid});
        _RegisteredPid ->
            {error, taken}
    end.
%% Removes the registration stored under Key.
%% Returns `{error, undefined}` when find_by_key/1 reports no process for
%% Key. Otherwise the entry is deleted and the backbone that holds the link
%% is asked to unlink from the process; the result of that call is returned.
-spec unregister(Key :: any()) -> ok | {error, undefined}.
unregister(Key) ->
    case find_by_key(Key) of
        undefined ->
            {error, undefined};
        Pid ->
            remove_process_by_key(Key),
            %% register/2 has the link created by the backbone running on the
            %% process's node, so the unlink request must be sent to that same
            %% backbone; the local one may not be linked to Pid at all.
            gen_server:call({?MODULE, node(Pid)}, {unlink_process, Pid})
    end.
%% Returns the number of entries in the processes table as reported by
%% mnesia. No connected-node filtering is applied here.
-spec count() -> non_neg_integer().
count() ->
    TableSize = mnesia:table_info(syn_processes_table, size),
    TableSize.
%% Returns the number of table entries whose node field is exactly Node.
-spec count(Node :: atom()) -> non_neg_integer().
count(Node) ->
    %% bind the node field to '$2', keep only entries where it equals Node
    Pattern = #syn_processes_table{node = '$2', _ = '_'},
    Conditions = [{'=:=', '$2', Node}],
    MatchSpec = [{Pattern, Conditions, ['$2']}],
    Matches = mnesia:dirty_select(syn_processes_table, MatchSpec),
    length(Matches).
- %% ===================================================================
- %% Callbacks
- %% ===================================================================
- %% ----------------------------------------------------------------------------------------------------------
- %% Init
- %% ----------------------------------------------------------------------------------------------------------
%% gen_server init callback.
%% Enables exit trapping so that exits of linked processes arrive as
%% 'EXIT' messages, then prepares the table through initdb/0. Any result
%% other than `ok` stops the server with that result as the reason.
-spec init([]) ->
    {ok, #state{}} |
    {ok, #state{}, Timeout :: non_neg_integer()} |
    ignore |
    {stop, Reason :: any()}.
init([]) ->
    process_flag(trap_exit, true),
    case initdb() of
        ok -> {ok, #state{}};
        Failure -> {stop, Failure}
    end.
- %% ----------------------------------------------------------------------------------------------------------
- %% Call messages
- %% ----------------------------------------------------------------------------------------------------------
%% gen_server call callback.
%%   {link_process, Pid}   - links this server to Pid and replies `ok`.
%%   {unlink_process, Pid} - unlinks this server from Pid and replies `ok`.
%%   anything else         - logs a warning and replies `undefined`.
-spec handle_call(Request :: any(), From :: any(), #state{}) ->
    {reply, Reply :: any(), #state{}} |
    {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
    {noreply, #state{}} |
    {noreply, #state{}, Timeout :: non_neg_integer()} |
    {stop, Reason :: any(), Reply :: any(), #state{}} |
    {stop, Reason :: any(), #state{}}.
handle_call({link_process, Pid}, _From, State) ->
    erlang:link(Pid),
    {reply, ok, State};
handle_call({unlink_process, Pid}, _From, State) ->
    erlang:unlink(Pid),
    {reply, ok, State};
handle_call(Request, From, State) ->
    %% the format string names the sender first and the message second, so the
    %% argument list must be in that same order
    error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [From, Request]),
    {reply, undefined, State}.
- %% ----------------------------------------------------------------------------------------------------------
- %% Cast messages
- %% ----------------------------------------------------------------------------------------------------------
%% gen_server cast callback. No cast is part of the protocol: every cast is
%% logged as a warning and the state is left unchanged.
-spec handle_cast(Msg :: any(), #state{}) ->
    {noreply, #state{}} |
    {noreply, #state{}, Timeout :: non_neg_integer()} |
    {stop, Reason :: any(), #state{}}.
handle_cast(UnknownMsg, State) ->
    LogArgs = [UnknownMsg],
    error_logger:warning_msg("Received an unknown cast message: ~p~n", LogArgs),
    {noreply, State}.
- %% ----------------------------------------------------------------------------------------------------------
- %% All non Call / Cast messages
- %% ----------------------------------------------------------------------------------------------------------
%% gen_server info callback.
%% An 'EXIT' message is handed to a separate process, so the table cleanup
%% does not block the backbone. Any other message is logged as a warning.
-spec handle_info(Info :: any(), #state{}) ->
    {noreply, #state{}} |
    {noreply, #state{}, Timeout :: non_neg_integer()} |
    {stop, Reason :: any(), #state{}}.
handle_info({'EXIT', Pid, Reason}, State) ->
    %% plain spawn: the helper is deliberately not linked to this server
    spawn(fun() -> process_exit_cleanup(Pid, Reason) end),
    {noreply, State};
handle_info(Info, State) ->
    error_logger:warning_msg("Received an unknown info message: ~p~n", [Info]),
    {noreply, State}.

%% Removes the table entry of an exited process, if there is one, and logs
%% the exit unless the reason is `normal` or `killed`.
-spec process_exit_cleanup(Pid :: pid(), Reason :: any()) -> ok.
process_exit_cleanup(Pid, Reason) ->
    case find_by_pid(Pid) of
        undefined ->
            log_exit_of_unknown_process(Pid, Reason);
        Key ->
            remove_process_by_key(Key),
            log_exit_of_registered_process(Key, Reason)
    end.

%% Logs a warning for an exit of a process that has no table entry.
-spec log_exit_of_unknown_process(Pid :: pid(), Reason :: any()) -> ok.
log_exit_of_unknown_process(_Pid, normal) ->
    ok;
log_exit_of_unknown_process(_Pid, killed) ->
    ok;
log_exit_of_unknown_process(Pid, Reason) ->
    error_logger:warning_msg("Received a crash message from an unlinked process ~p with reason: ~p~n", [Pid, Reason]).

%% Logs an error for an exit of a process that was registered under Key.
-spec log_exit_of_registered_process(Key :: any(), Reason :: any()) -> ok.
log_exit_of_registered_process(_Key, normal) ->
    ok;
log_exit_of_registered_process(_Key, killed) ->
    ok;
log_exit_of_registered_process(Key, Reason) ->
    error_logger:error_msg("Process with key ~p crashed with reason: ~p~n", [Key, Reason]).
- %% ----------------------------------------------------------------------------------------------------------
- %% Terminate
- %% ----------------------------------------------------------------------------------------------------------
%% gen_server terminate callback. Logs the termination reason and returns
%% the atom `terminated`.
-spec terminate(Reason :: any(), #state{}) -> terminated.
terminate(Reason, _State) ->
    LogArgs = [Reason],
    error_logger:info_msg("Terminating syn backbone with reason: ~p~n", LogArgs),
    terminated.
- %% ----------------------------------------------------------------------------------------------------------
- %% Convert process state when code is changed.
- %% ----------------------------------------------------------------------------------------------------------
%% gen_server code_change callback. The state needs no conversion and is
%% returned as received.
-spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
code_change(_PreviousVsn, CurrentState, _ExtraArgs) ->
    {ok, CurrentState}.
- %% ===================================================================
- %% Internal
- %% ===================================================================
%% Prepares the processes table.
%% Tries to create it as a ram-copies set on the local and all connected
%% nodes, indexed on the pid field. When mnesia reports that the table
%% already exists, a copy is added to the local node instead. Any other
%% outcome is logged and returned as `{error, Outcome}`.
-spec initdb() -> ok | {error, any()}.
initdb() ->
    %% make every known node an extra db node - this covers the case where
    %% mnesia runs in ram only mode
    mnesia:change_config(extra_db_nodes, [node() | nodes()]),
    LocalNode = node(),
    TableOptions = [
        {type, set},
        {ram_copies, [node() | nodes()]},
        {attributes, record_info(fields, syn_processes_table)},
        {index, [#syn_processes_table.pid]},
        {storage_properties, [{ets, [{read_concurrency, true}]}]}
    ],
    case mnesia:create_table(syn_processes_table, TableOptions) of
        {atomic, ok} ->
            error_logger:info_msg("syn_processes_table was successfully created.~n"),
            ok;
        {aborted, {already_exists, syn_processes_table}} ->
            %% the table is already there: make sure this node holds a copy
            add_table_copy_to_local_node();
        {aborted, {already_exists, syn_processes_table, LocalNode}} ->
            %% same situation, reported together with the local node name
            add_table_copy_to_local_node();
        Unexpected ->
            error_logger:error_msg("Error while creating syn_processes_table: ~p~n", [Unexpected]),
            {error, Unexpected}
    end.
%% Adds a ram copy of the processes table to the local node.
%% An "already exists" answer from mnesia counts as success; any other
%% abort reason is logged and returned as `{error, Reason}`.
-spec add_table_copy_to_local_node() -> ok | {error, any()}.
add_table_copy_to_local_node() ->
    LocalNode = node(),
    Outcome = mnesia:add_table_copy(syn_processes_table, LocalNode, ram_copies),
    case Outcome of
        {atomic, ok} ->
            error_logger:info_msg("Copy of syn_processes_table was successfully added to current node.~n"),
            ok;
        {aborted, {already_exists, syn_processes_table}} ->
            error_logger:info_msg("Copy of syn_processes_table is already added to current node.~n"),
            ok;
        {aborted, {already_exists, syn_processes_table, LocalNode}} ->
            error_logger:info_msg("Copy of syn_processes_table is already added to current node.~n"),
            ok;
        {aborted, AbortReason} ->
            error_logger:error_msg("Error while creating copy of syn_processes_table: ~p~n", [AbortReason]),
            {error, AbortReason}
    end.
%% Returns the pid stored in Process when the entry's node is the local node
%% or one of the currently connected nodes; `undefined` otherwise.
-spec return_pid_if_on_connected_node(Process :: #syn_processes_table{}) -> pid() | undefined.
return_pid_if_on_connected_node(Process) ->
    EntryNode = Process#syn_processes_table.node,
    IsReachable = lists:member(EntryNode, [node() | nodes()]),
    case IsReachable of
        true -> Process#syn_processes_table.pid;
        false -> undefined
    end.
%% Returns the key stored in Process when the entry's node is the local node
%% or one of the currently connected nodes; `undefined` otherwise.
%% The spec declares the key (any term) as the return type, since that is
%% what the function returns, not a pid.
-spec return_key_if_on_connected_node(Process :: #syn_processes_table{}) -> Key :: any() | undefined.
return_key_if_on_connected_node(Process) ->
    case lists:member(Process#syn_processes_table.node, [node() | nodes()]) of
        true -> Process#syn_processes_table.key;
        _ -> undefined
    end.
%% Deletes the entry stored under Key from the processes table with a dirty
%% delete.
-spec remove_process_by_key(Key :: any()) -> ok.
remove_process_by_key(Key) ->
    Table = syn_processes_table,
    mnesia:dirty_delete(Table, Key).
|