123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680 |
- %% ==========================================================================================================
- %% Syn - A global Process Registry and Process Group manager.
- %%
- %% The MIT License (MIT)
- %%
- %% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
- %%
- %% Permission is hereby granted, free of charge, to any person obtaining a copy
- %% of this software and associated documentation files (the "Software"), to deal
- %% in the Software without restriction, including without limitation the rights
- %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- %% copies of the Software, and to permit persons to whom the Software is
- %% furnished to do so, subject to the following conditions:
- %%
- %% The above copyright notice and this permission notice shall be included in
- %% all copies or substantial portions of the Software.
- %%
- %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- %% IMPLIED, INCLUDING BUT NOT LIMITED TO THxE WARRANTIES OF MERCHANTABILITY,
- %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- %% THE SOFTWARE.
- %% ==========================================================================================================
- -module(syn_registry).
- -behaviour(gen_server).
- %% API
- -export([start_link/1]).
- -export([get_subcluster_nodes/1]).
- -export([lookup/1, lookup/2]).
- -export([register/2, register/3, register/4]).
- -export([unregister/1, unregister/2]).
- -export([count/1, count/2]).
- %% gen_server callbacks
- -export([
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- handle_continue/2,
- terminate/2,
- code_change/3
- ]).
- %% tests
- -ifdef(TEST).
- -export([add_to_local_table/6, remove_from_local_table/3]).
- -endif.
- %% records
- -record(state, {
- scope = default :: atom(),
- process_name = syn_registry_default :: atom(),
- nodes = #{} :: #{node() => pid()}
- }).
- %% includes
- -include("syn.hrl").
- %% ===================================================================
- %% API
- %% ===================================================================
- -spec start_link(Scope :: atom()) ->
- {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
- start_link(Scope) when is_atom(Scope) ->
- ProcessName = get_process_name_for_scope(Scope),
- Args = [Scope, ProcessName],
- gen_server:start_link({local, ProcessName}, ?MODULE, Args, []).
- -spec get_subcluster_nodes(Scope :: atom()) -> [node()].
- get_subcluster_nodes(Scope) ->
- ProcessName = get_process_name_for_scope(Scope),
- gen_server:call(ProcessName, get_subcluster_nodes).
- -spec lookup(Name :: any()) -> {pid(), Meta :: any()} | undefined.
- lookup(Name) ->
- lookup(default, Name).
- -spec lookup(Scope :: atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
- lookup(Scope, Name) ->
- try find_registry_entry_by_name(Scope, Name) of
- undefined -> undefined;
- {{Name, Pid}, Meta, _, _, _} -> {Pid, Meta}
- catch
- error:badarg -> error({invalid_scope, Scope})
- end.
- -spec register(Name :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
- register(Name, Pid) ->
- register(default, Name, Pid, undefined).
- -spec register(NameOrScope :: any(), PidOrName :: any(), MetaOrPid :: any()) -> ok | {error, Reason :: any()}.
- register(Name, Pid, Meta) when is_pid(Pid) ->
- register(default, Name, Pid, Meta);
- register(Scope, Name, Pid) when is_pid(Pid) ->
- register(Scope, Name, Pid, undefined).
- -spec register(Scope :: atom(), Name :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
- register(Scope, Name, Pid, Meta) ->
- ProcessName = get_process_name_for_scope(Scope),
- Node = node(Pid),
- try gen_server:call({ProcessName, Node}, {register_on_owner, node(), Name, Pid, Meta}) of
- {ok, {TablePid, TableMeta, Time}} when Node =/= node() ->
- %% update table on caller node immediately so that subsequent calls have an updated registry
- add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
- %% callback
- syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
- %% return
- ok;
- {Response, _} ->
- Response
- catch
- exit:{noproc, {gen_server, call, _}} -> error({invalid_scope, Scope})
- end.
- -spec unregister(Name :: any()) -> ok | {error, Reason :: any()}.
- unregister(Name) ->
- unregister(default, Name).
- -spec unregister(Scope :: atom(), Name :: any()) -> ok | {error, Reason :: any()}.
- unregister(Scope, Name) ->
- % get process' node
- try find_registry_entry_by_name(Scope, Name) of
- undefined ->
- {error, undefined};
- {{Name, Pid}, Meta, _, _, _} ->
- ProcessName = get_process_name_for_scope(Scope),
- Node = node(Pid),
- case gen_server:call({ProcessName, Node}, {unregister_on_owner, node(), Name, Pid}) of
- ok when Node =/= node() ->
- %% remove table on caller node immediately so that subsequent calls have an updated registry
- remove_from_local_table(Scope, Name, Pid),
- %% callback
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
- %% return
- ok;
- Response ->
- Response
- end
- catch
- exit:{noproc, {gen_server, call, _}} -> error({invalid_scope, Scope});
- error:badarg -> error({invalid_scope, Scope})
- end.
- -spec count(Scope :: atom()) -> non_neg_integer().
- count(Scope) ->
- case ets:info(syn_backbone:get_table_name(syn_registry_by_name, Scope), size) of
- undefined -> error({invalid_scope, Scope});
- Value -> Value
- end.
- -spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
- count(Scope, Node) ->
- case catch ets:select_count(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
- {{'_', '_'}, '_', '_', '_', Node},
- [],
- [true]
- }]) of
- {'EXIT', {badarg, [{ets, select_count, _, _} | _]}} -> error({invalid_scope, Scope});
- Value -> Value
- end.
- %% ===================================================================
- %% Callbacks
- %% ===================================================================
- %% ----------------------------------------------------------------------------------------------------------
- %% Init
- %% ----------------------------------------------------------------------------------------------------------
- -spec init(Args :: term()) ->
- {ok, State :: term()} | {ok, State :: term(), timeout() | hibernate | {continue, term()}} |
- {stop, Reason :: term()} | ignore.
- init([Scope, ProcessName]) ->
- %% monitor nodes
- ok = net_kernel:monitor_nodes(true),
- %% rebuild monitors (if after crash)
- rebuild_monitors(Scope),
- %% build state
- State = #state{
- scope = Scope,
- process_name = ProcessName
- },
- %% init
- {ok, State, {continue, after_init}}.
- %% ----------------------------------------------------------------------------------------------------------
- %% Call messages
- %% ----------------------------------------------------------------------------------------------------------
- -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
- State :: term()) ->
- {reply, Reply :: term(), NewState :: term()} |
- {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} |
- {noreply, NewState :: term()} |
- {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
- {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
- {stop, Reason :: term(), NewState :: term()}.
- handle_call(get_subcluster_nodes, _From, #state{
- nodes = Nodes
- } = State) ->
- {reply, Nodes, State};
- handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
- scope = Scope
- } = State) ->
- case is_process_alive(Pid) of
- true ->
- case find_registry_entry_by_name(Scope, Name) of
- undefined ->
- %% available
- MRef = case find_monitor_for_pid(Scope, Pid) of
- undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, add
- MRef0 -> MRef0
- end,
- %% add to local table
- Time = erlang:system_time(),
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
- %% callback
- syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta}),
- %% broadcast
- broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, [RequesterNode], State),
- %% return
- {reply, {ok, {undefined, undefined, Time}}, State};
- {{Name, Pid}, TableMeta, _TableTime, MRef, _TableNode} ->
- %% same pid, possibly new meta or time, overwrite
- Time = erlang:system_time(),
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
- %% callback
- syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta}),
- %% broadcast
- broadcast({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State),
- %% return
- {reply, {ok, {Pid, TableMeta, Time}}, State};
- _ ->
- {reply, {{error, taken}, undefined}, State}
- end;
- false ->
- {reply, {{error, not_alive}, undefined}, State}
- end;
- handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{scope = Scope} = State) ->
- case find_registry_entry_by_name(Scope, Name) of
- {{Name, Pid}, Meta, _Time, _MRef, _Node} ->
- %% demonitor if the process is not registered under other names
- maybe_demonitor(Scope, Pid),
- %% remove from table
- remove_from_local_table(Scope, Name, Pid),
- %% callback
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
- %% broadcast
- broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
- %% return
- {reply, ok, State};
- {{Name, _TablePid}, _Meta, _Time, _MRef, _Node} ->
- %% process is registered locally with another pid: race condition, wait for sync to happen & return error
- {reply, {error, race_condition}, State};
- undefined ->
- {reply, {error, undefined}, State}
- end;
- handle_call(Request, From, State) ->
- error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), Request, From]),
- {reply, undefined, State}.
- %% ----------------------------------------------------------------------------------------------------------
- %% Cast messages
- %% ----------------------------------------------------------------------------------------------------------
- -spec handle_cast(Request :: term(), State :: term()) ->
- {noreply, NewState :: term()} |
- {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
- {stop, Reason :: term(), NewState :: term()}.
- handle_cast(Msg, State) ->
- error_logger:warning_msg("SYN[~s] Received an unknown cast message: ~p", [node(), Msg]),
- {noreply, State}.
- %% ----------------------------------------------------------------------------------------------------------
- %% Info messages
- %% ----------------------------------------------------------------------------------------------------------
- -spec handle_info(Info :: timeout | term(), State :: term()) ->
- {noreply, NewState :: term()} |
- {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
- {stop, Reason :: term(), NewState :: term()}.
- handle_info({'3.0', sync_register, Scope, Name, Pid, Meta, Time}, State) ->
- handle_registry_sync(Scope, Name, Pid, Meta, Time, State),
- {noreply, State};
- handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{scope = Scope} = State) ->
- remove_from_local_table(Scope, Name, Pid),
- %% callback
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
- %% return
- {noreply, State};
- handle_info({'3.0', discover, RemoteScopePid}, #state{
- scope = Scope,
- nodes = Nodes
- } = State) ->
- RemoteScopeNode = node(RemoteScopePid),
- error_logger:info_msg("SYN[~s] Received DISCOVER request from node '~s' and scope '~s'", [node(), RemoteScopeNode, Scope]),
- %% send data
- RegistryTuplesOfLocalNode = get_registry_tuples_for_node(Scope, node()),
- send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
- %% is this a new node?
- case maps:is_key(RemoteScopeNode, Nodes) of
- true ->
- %% already known, ignore
- {noreply, State};
- false ->
- %% monitor
- _MRef = monitor(process, RemoteScopePid),
- {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
- end;
- handle_info({'3.0', ack_sync, RemoteScopePid, RegistryTuplesOfRemoteNode}, #state{
- scope = Scope,
- nodes = Nodes
- } = State) ->
- RemoteScopeNode = node(RemoteScopePid),
- error_logger:info_msg("SYN[~s] Received ACK SYNC (~w entries) from node '~s' and scope '~s'",
- [node(), length(RegistryTuplesOfRemoteNode), RemoteScopeNode, Scope]
- ),
- %% insert tuples
- lists:foreach(fun({Name, Pid, Meta, Time}) ->
- handle_registry_sync(Scope, Name, Pid, Meta, Time, State)
- end, RegistryTuplesOfRemoteNode),
- %% is this a new node?
- case maps:is_key(RemoteScopeNode, Nodes) of
- true ->
- %% already known, ignore
- {noreply, State};
- false ->
- %% monitor
- _MRef = monitor(process, RemoteScopePid),
- %% send data
- RegistryTuplesOfLocalNode = get_registry_tuples_for_node(Scope, node()),
- send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), RegistryTuplesOfLocalNode}, State),
- %% return
- {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
- end;
- handle_info({'DOWN', _MRef, process, Pid, _Reason}, #state{
- scope = Scope,
- nodes = Nodes
- } = State) when node(Pid) =/= node() ->
- %% scope process down
- RemoteNode = node(Pid),
- case maps:take(RemoteNode, Nodes) of
- {Pid, Nodes1} ->
- error_logger:info_msg("SYN[~s] Scope Process ~p is DOWN on node '~s'", [node(), Scope, RemoteNode]),
- purge_registry_for_remote_node(Scope, RemoteNode),
- {noreply, State#state{nodes = Nodes1}};
- error ->
- error_logger:warning_msg("SYN[~s] Received DOWN message from unknown pid: ~p", [node(), Pid]),
- {noreply, State}
- end;
- handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{scope = Scope} = State) ->
- case find_registry_entries_by_pid(Scope, Pid) of
- [] ->
- error_logger:warning_msg(
- "SYN[~s] Received a DOWN message from an unknown process ~p with reason: ~p",
- [node(), Pid, Reason]
- );
- Entries ->
- lists:foreach(fun({{Name, _Pid}, Meta, _, _, _}) ->
- %% remove from table
- remove_from_local_table(Scope, Name, Pid),
- %% callback
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
- %% broadcast
- broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State)
- end, Entries)
- end,
- %% return
- {noreply, State};
- handle_info({nodedown, _Node}, State) ->
- %% ignore & wait for monitor DOWN message
- {noreply, State};
- handle_info({nodeup, RemoteNode}, #state{scope = Scope} = State) ->
- error_logger:info_msg("SYN[~s] Node '~s' has joined the cluster, sending discover message for scope '~s'",
- [node(), RemoteNode, Scope]
- ),
- send_to_node(RemoteNode, {'3.0', discover, self()}, State),
- {noreply, State};
- handle_info(Info, State) ->
- error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [node(), Info]),
- {noreply, State}.
- %% ----------------------------------------------------------------------------------------------------------
- %% Continue messages
- %% ----------------------------------------------------------------------------------------------------------
- -spec handle_continue(Info :: term(), State :: term()) ->
- {noreply, NewState :: term()} |
- {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
- {stop, Reason :: term(), NewState :: term()}.
- handle_continue(after_init, #state{scope = Scope} = State) ->
- error_logger:info_msg("SYN[~s] Discovering the cluster with scope '~s'", [node(), Scope]),
- broadcast_all({'3.0', discover, self()}, State),
- {noreply, State}.
- %% ----------------------------------------------------------------------------------------------------------
- %% Terminate
- %% ----------------------------------------------------------------------------------------------------------
- -spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: term()) -> term().
- terminate(Reason, _State) ->
- error_logger:info_msg("SYN[~s] Terminating with reason: ~p", [node(), Reason]),
- terminated.
- %% ----------------------------------------------------------------------------------------------------------
- %% Convert process state when code is changed.
- %% ----------------------------------------------------------------------------------------------------------
- -spec code_change(OldVsn :: (term() | {down, term()}), State :: term(),
- Extra :: term()) ->
- {ok, NewState :: term()} | {error, Reason :: term()}.
- code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
- %% ===================================================================
- %% Internal
- %% ===================================================================
- -spec get_process_name_for_scope(Scope :: atom()) -> atom().
- get_process_name_for_scope(Scope) ->
- ModuleBin = atom_to_binary(?MODULE),
- ScopeBin = atom_to_binary(Scope),
- binary_to_atom(<<ModuleBin/binary, "_", ScopeBin/binary>>).
- -spec rebuild_monitors(Scope :: atom()) -> ok.
- rebuild_monitors(Scope) ->
- RegistryTuples = get_registry_tuples_for_node(Scope, node()),
- lists:foreach(fun({Name, Pid, Meta, Time}) ->
- remove_from_local_table(Scope, Name, Pid),
- case is_process_alive(Pid) of
- true ->
- MRef = erlang:monitor(process, Pid),
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef);
- _ ->
- ok
- end
- end, RegistryTuples).
- -spec broadcast(Message :: any(), #state{}) -> any().
- broadcast(Message, State) ->
- broadcast(Message, [], State).
- -spec broadcast(Message :: any(), ExcludedNodes :: [node()], #state{}) -> any().
- broadcast(Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}) ->
- lists:foreach(fun(RemoteNode) ->
- erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
- end, maps:keys(Nodes) -- ExcludedNodes).
- -spec broadcast_all(Message :: any(), #state{}) -> any().
- broadcast_all(Message, #state{process_name = ProcessName}) ->
- lists:foreach(fun(RemoteNode) ->
- erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
- end, nodes()).
- -spec send_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
- send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
- erlang:send({ProcessName, RemoteNode}, Message, [noconnect]).
- -spec get_registry_tuples_for_node(Scope :: atom(), Node :: node()) -> [syn_registry_tuple()].
- get_registry_tuples_for_node(Scope, Node) ->
- ets:select(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
- {{'$1', '$2'}, '$3', '$4', '_', Node},
- [],
- [{{'$1', '$2', '$3', '$4'}}]
- }]).
- -spec find_registry_entry_by_name(Scope :: atom(), Name :: any()) -> Entry :: syn_registry_entry() | undefined.
- find_registry_entry_by_name(Scope, Name) ->
- case ets:select(syn_backbone:get_table_name(syn_registry_by_name, Scope), [{
- {{Name, '_'}, '_', '_', '_', '_'},
- [],
- ['$_']
- }]) of
- [RegistryEntry] -> RegistryEntry;
- [] -> undefined
- end.
- -spec find_registry_entries_by_pid(Scope :: atom(), Pid :: pid()) -> RegistryEntries :: [syn_registry_entry()].
- find_registry_entries_by_pid(Scope, Pid) when is_pid(Pid) ->
- ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
- {{Pid, '$2'}, '$3', '$4', '$5', '$6'},
- [],
- [{{{{'$2', Pid}}, '$3', '$4', '$5', '$6'}}]
- }]).
- -spec find_monitor_for_pid(Scope :: atom(), Pid :: pid()) -> reference() | undefined.
- find_monitor_for_pid(Scope, Pid) when is_pid(Pid) ->
- case ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
- {{Pid, '_'}, '_', '_', '$5', '_'},
- [],
- ['$5']
- }], 1) of
- {[MRef], _} -> MRef;
- '$end_of_table' -> undefined
- end.
- -spec maybe_demonitor(Scope :: atom(), Pid :: pid()) -> ok.
- maybe_demonitor(Scope, Pid) ->
- %% try to retrieve 2 items
- %% if only 1 is returned it means that no other aliases exist for the Pid
- case ets:select(syn_backbone:get_table_name(syn_registry_by_pid, Scope), [{
- {{Pid, '_'}, '_', '_', '$5', '_'},
- [],
- ['$5']
- }], 2) of
- {[MRef], _} when is_reference(MRef) ->
- %% no other aliases, demonitor
- erlang:demonitor(MRef, [flush]),
- ok;
- _ ->
- ok
- end.
- -spec add_to_local_table(
- Scope :: atom(),
- Name :: any(),
- Pid :: pid(),
- Meta :: any(),
- Time :: integer(),
- MRef :: undefined | reference()
- ) -> true.
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef) ->
- %% insert
- true = ets:insert(syn_backbone:get_table_name(syn_registry_by_name, Scope),
- {{Name, Pid}, Meta, Time, MRef, node(Pid)}
- ),
- true = ets:insert(syn_backbone:get_table_name(syn_registry_by_pid, Scope),
- {{Pid, Name}, Meta, Time, MRef, node(Pid)}
- ).
- -spec remove_from_local_table(Scope :: atom(), Name :: any(), Pid :: pid()) -> true.
- remove_from_local_table(Scope, Name, Pid) ->
- true = ets:delete(syn_backbone:get_table_name(syn_registry_by_name, Scope), {Name, Pid}),
- true = ets:delete(syn_backbone:get_table_name(syn_registry_by_pid, Scope), {Pid, Name}).
- -spec update_local_table(
- Scope :: atom(),
- Name :: any(),
- PreviousPid :: pid(),
- {
- Pid :: pid(),
- Meta :: any(),
- Time :: integer(),
- MRef :: undefined | reference()
- }
- ) -> true.
- update_local_table(Scope, Name, PreviousPid, {Pid, Meta, Time, MRef}) ->
- maybe_demonitor(Scope, PreviousPid),
- remove_from_local_table(Scope, Name, PreviousPid),
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef).
- -spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom()) -> true.
- purge_registry_for_remote_node(Scope, Node) when Node =/= node() ->
- %% loop elements for callback in a separate process to free scope process
- RegistryTuples = get_registry_tuples_for_node(Scope, Node),
- spawn(fun() ->
- lists:foreach(fun({Name, Pid, Meta, _Time}) ->
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta)
- end, RegistryTuples)
- end),
- %% remove all from pid table
- true = ets:match_delete(syn_backbone:get_table_name(syn_registry_by_name, Scope), {{'_', '_'}, '_', '_', '_', Node}),
- true = ets:match_delete(syn_backbone:get_table_name(syn_registry_by_pid, Scope), {{'_', '_'}, '_', '_', '_', Node}).
- -spec handle_registry_sync(
- Scope :: atom(),
- Name :: any(),
- Pid :: pid(),
- Meta :: any(),
- Time :: non_neg_integer(),
- #state{}
- ) -> any().
- handle_registry_sync(Scope, Name, Pid, Meta, Time, State) ->
- case find_registry_entry_by_name(Scope, Name) of
- undefined ->
- %% no conflict
- add_to_local_table(Scope, Name, Pid, Meta, Time, undefined),
- %% callback
- syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta});
- {{Name, Pid}, TableMeta, _TableTime, MRef, _TableNode} ->
- %% same pid, more recent (because it comes from the same node, which means that it's sequential)
- add_to_local_table(Scope, Name, Pid, Meta, Time, MRef),
- %% callback
- syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta});
- {{Name, TablePid}, TableMeta, TableTime, TableMRef, _TableNode} when node(TablePid) =:= node() ->
- %% current node runs a conflicting process -> resolve
- %% * the conflict is resolved by the two nodes that own the conflicting processes
- %% * when a process is chosen, the time is updated
- %% * the node that runs the process that is kept sends the sync_register message
- %% * recipients check that the time is more recent that what they have to ensure that there are no race conditions
- resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State);
- {{Name, TablePid}, TableMeta, TableTime, _TableMRef, _TableNode} when TableTime < Time ->
- %% current node does not own any of the conflicting processes, update
- update_local_table(Scope, Name, TablePid, {Pid, Meta, Time, undefined}),
- %% callbacks
- syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
- syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});
- {{Name, _TablePid}, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
- %% race condition: incoming data is older, ignore
- ok
- end.
- -spec resolve_conflict(
- Scope :: atom(),
- Name :: any(),
- {Pid :: pid(), Meta :: any(), Time :: non_neg_integer()},
- {TablePid :: pid(), TableMeta :: any(), TableTime :: non_neg_integer(), TableMRef :: reference()},
- #state{}
- ) -> any().
- resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime, TableMRef}, State) ->
- %% call conflict resolution
- PidToKeep = syn_event_handler:do_resolve_registry_conflict(
- Scope,
- Name,
- {Pid, Meta, Time},
- {TablePid, TableMeta, TableTime}
- ),
- %% resolve
- case PidToKeep of
- Pid ->
- %% -> we keep the remote pid
- %% update locally, the incoming sync_register will update with the time coming from remote node
- update_local_table(Scope, Name, TablePid, {Pid, Meta, Time, undefined}),
- %% callbacks
- syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
- syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
- %% kill
- exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
- error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> chosen: ~p",
- [node(), Name, Scope, Pid, TablePid, Pid]
- );
- TablePid ->
- %% -> we keep the local pid
- %% overwrite with updated time
- ResolveTime = erlang:system_time(),
- add_to_local_table(Scope, Name, TablePid, TableMeta, ResolveTime, TableMRef),
- %% broadcast
- broadcast({'3.0', sync_register, Scope, Name, TablePid, TableMeta, ResolveTime}, State),
- error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> chosen: ~p",
- [node(), Name, Scope, Pid, TablePid, TablePid]
- );
- Invalid ->
- %% remove
- maybe_demonitor(Scope, TablePid),
- remove_from_local_table(Scope, Name, TablePid),
- %% callback
- syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
- %% kill local, remote will be killed by other node performing the same resolve
- exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
- error_logger:info_msg("SYN[~s] Registry CONFLICT for name ~p@~s: ~p ~p -> none chosen (got: ~p)",
- [node(), Name, Scope, Pid, TablePid, Invalid]
- )
- end.
|