123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531 |
- %% ==========================================================================================================
- %% Syn - A global Process Registry and Process Group manager.
- %%
- %% The MIT License (MIT)
- %%
- %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
- %%
- %% Permission is hereby granted, free of charge, to any person obtaining a copy
- %% of this software and associated documentation files (the "Software"), to deal
- %% in the Software without restriction, including without limitation the rights
- %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- %% copies of the Software, and to permit persons to whom the Software is
- %% furnished to do so, subject to the following conditions:
- %%
- %% The above copyright notice and this permission notice shall be included in
- %% all copies or substantial portions of the Software.
- %%
- %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- %% THE SOFTWARE.
- %% ==========================================================================================================
- -module(syn_groups).
- -behaviour(gen_server).
- %% API
- -export([start_link/0]).
- -export([join/2, join/3]).
- -export([leave/2]).
- -export([get_members/1, get_members/2]).
- -export([member/2]).
- -export([get_local_members/1, get_local_members/2]).
- -export([local_member/2]).
- -export([publish/2]).
- -export([publish_to_local/2]).
- -export([multi_call/2, multi_call/3, multi_call_reply/2]).
- %% sync API
- -export([sync_join/4, sync_leave/3]).
- -export([sync_get_local_group_tuples/1]).
- %% gen_server callbacks
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
- %% internal
- -export([multi_call_and_receive/4]).
- %% records
- -record(state, {
- custom_event_handler = undefined :: module()
- }).
- %% macros
- -define(DEFAULT_EVENT_HANDLER_MODULE, syn_event_handler).
- -define(DEFAULT_MULTI_CALL_TIMEOUT_MS, 5000).
- %% includes
- -include("syn.hrl").
- %% ===================================================================
- %% API
- %% ===================================================================
- -spec start_link() -> {ok, pid()} | {error, any()}.
- start_link() ->
- Options = [],
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
- -spec join(GroupName :: any(), Pid :: pid()) -> ok.
- join(GroupName, Pid) ->
- join(GroupName, Pid, undefined).
- -spec join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
- join(GroupName, Pid, Meta) when is_pid(Pid) ->
- Node = node(Pid),
- gen_server:call({?MODULE, Node}, {join_on_node, GroupName, Pid, Meta}).
- -spec leave(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
- leave(GroupName, Pid) ->
- case find_process_entry_by_name_and_pid(GroupName, Pid) of
- undefined ->
- {error, not_in_group};
- _ ->
- Node = node(Pid),
- gen_server:call({?MODULE, Node}, {leave_on_node, GroupName, Pid})
- end.
- -spec get_members(Name :: any()) -> [pid()].
- get_members(GroupName) ->
- Entries = mnesia:dirty_read(syn_groups_table, GroupName),
- Pids = [Entry#syn_groups_table.pid || Entry <- Entries],
- lists:sort(Pids).
- -spec get_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
- get_members(GroupName, with_meta) ->
- Entries = mnesia:dirty_read(syn_groups_table, GroupName),
- Pids = [{Entry#syn_groups_table.pid, Entry#syn_groups_table.meta} || Entry <- Entries],
- lists:sort(Pids).
- -spec member(Pid :: pid(), GroupName :: any()) -> boolean().
- member(Pid, GroupName) ->
- case find_process_entry_by_name_and_pid(GroupName, Pid) of
- undefined -> false;
- _ -> true
- end.
- -spec get_local_members(Name :: any()) -> [pid()].
- get_local_members(GroupName) ->
- %% build name guard
- NameGuard = case is_tuple(GroupName) of
- true -> {'==', '$1', {GroupName}};
- _ -> {'=:=', '$1', GroupName}
- end,
- %% build match specs
- MatchHead = #syn_groups_table{name = '$1', node = '$2', pid = '$3', _ = '_'},
- Guards = [NameGuard, {'=:=', '$2', node()}],
- Result = '$3',
- %% select
- Pids = mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]),
- lists:sort(Pids).
- -spec get_local_members(GroupName :: any(), with_meta) -> [{pid(), Meta :: any()}].
- get_local_members(GroupName, with_meta) ->
- %% build name guard
- NameGuard = case is_tuple(GroupName) of
- true -> {'==', '$1', {GroupName}};
- _ -> {'=:=', '$1', GroupName}
- end,
- %% build match specs
- MatchHead = #syn_groups_table{name = '$1', node = '$2', pid = '$3', meta = '$4', _ = '_'},
- Guards = [NameGuard, {'=:=', '$2', node()}],
- Result = {{'$3', '$4'}},
- %% select
- PidsWithMeta = mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]),
- lists:keysort(1, PidsWithMeta).
- -spec local_member(Pid :: pid(), GroupName :: any()) -> boolean().
- local_member(Pid, GroupName) ->
- case find_process_entry_by_name_and_pid(GroupName, Pid) of
- undefined -> false;
- Entry when Entry#syn_groups_table.node =:= node() -> true;
- _ -> false
- end.
- -spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
- publish(GroupName, Message) ->
- MemberPids = get_members(GroupName),
- FSend = fun(Pid) ->
- Pid ! Message
- end,
- lists:foreach(FSend, MemberPids),
- {ok, length(MemberPids)}.
- -spec publish_to_local(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
- publish_to_local(GroupName, Message) ->
- MemberPids = get_local_members(GroupName),
- FSend = fun(Pid) ->
- Pid ! Message
- end,
- lists:foreach(FSend, MemberPids),
- {ok, length(MemberPids)}.
- -spec multi_call(GroupName :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
- multi_call(GroupName, Message) ->
- multi_call(GroupName, Message, ?DEFAULT_MULTI_CALL_TIMEOUT_MS).
- -spec multi_call(GroupName :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
- {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
- multi_call(GroupName, Message, Timeout) ->
- Self = self(),
- MemberPids = get_members(GroupName),
- FSend = fun(Pid) ->
- spawn_link(?MODULE, multi_call_and_receive, [Self, Pid, Message, Timeout])
- end,
- lists:foreach(FSend, MemberPids),
- collect_replies(MemberPids).
- -spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
- multi_call_reply(CallerPid, Reply) ->
- CallerPid ! {syn_multi_call_reply, self(), Reply}.
- -spec sync_join(RemoteNode :: node(), GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
- sync_join(RemoteNode, GroupName, Pid, Meta) ->
- gen_server:cast({?MODULE, RemoteNode}, {sync_join, GroupName, Pid, Meta}).
- -spec sync_leave(RemoteNode :: node(), GroupName :: any(), Pid :: pid()) -> ok.
- sync_leave(RemoteNode, GroupName, Pid) ->
- gen_server:cast({?MODULE, RemoteNode}, {sync_leave, GroupName, Pid}).
- -spec sync_get_local_group_tuples(FromNode :: node()) -> list(syn_group_tuple()).
- sync_get_local_group_tuples(FromNode) ->
- error_logger:info_msg("Syn(~p): Received request of local group tuples from remote node: ~p~n", [node(), FromNode]),
- %% build match specs
- MatchHead = #syn_groups_table{name = '$1', pid = '$2', node = '$3', meta = '$4', _ = '_'},
- Guard = {'=:=', '$3', node()},
- GroupTupleFormat = {{'$1', '$2', '$4'}},
- %% select
- mnesia:dirty_select(syn_groups_table, [{MatchHead, [Guard], [GroupTupleFormat]}]).
- %% ===================================================================
- %% Callbacks
- %% ===================================================================
- %% ----------------------------------------------------------------------------------------------------------
- %% Init
- %% ----------------------------------------------------------------------------------------------------------
- -spec init([]) ->
- {ok, #state{}} |
- {ok, #state{}, Timeout :: non_neg_integer()} |
- ignore |
- {stop, Reason :: any()}.
- init([]) ->
- %% wait for table
- case mnesia:wait_for_tables([syn_groups_table], 10000) of
- ok ->
- %% monitor nodes
- ok = net_kernel:monitor_nodes(true),
- %% get handler
- CustomEventHandler = application:get_env(syn, event_handler, ?DEFAULT_EVENT_HANDLER_MODULE),
- %% ensure that is it loaded (not using code:ensure_loaded/1 to support embedded mode)
- catch CustomEventHandler:module_info(exports),
- %% init
- {ok, #state{
- custom_event_handler = CustomEventHandler
- }};
- Reason ->
- {stop, {error_waiting_for_groups_table, Reason}}
- end.
- %% ----------------------------------------------------------------------------------------------------------
- %% Call messages
- %% ----------------------------------------------------------------------------------------------------------
- -spec handle_call(Request :: any(), From :: any(), #state{}) ->
- {reply, Reply :: any(), #state{}} |
- {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
- {noreply, #state{}} |
- {noreply, #state{}, Timeout :: non_neg_integer()} |
- {stop, Reason :: any(), Reply :: any(), #state{}} |
- {stop, Reason :: any(), #state{}}.
- handle_call({join_on_node, GroupName, Pid, Meta}, _From, State) ->
- %% check if pid is alive
- case is_process_alive(Pid) of
- true ->
- join_on_node(GroupName, Pid, Meta),
- %% multicast
- multicast_join(GroupName, Pid, Meta),
- %% return
- {reply, ok, State};
- _ ->
- {reply, {error, not_alive}, State}
- end;
- handle_call({leave_on_node, GroupName, Pid}, _From, State) ->
- case leave_on_node(GroupName, Pid) of
- ok ->
- %% multicast
- multicast_leave(GroupName, Pid),
- %% return
- {reply, ok, State};
- {error, Reason} ->
- %% return
- {reply, {error, Reason}, State}
- end;
- handle_call(Request, From, State) ->
- error_logger:warning_msg("Syn(~p): Received from ~p an unknown call message: ~p~n", [node(), Request, From]),
- {reply, undefined, State}.
- %% ----------------------------------------------------------------------------------------------------------
- %% Cast messages
- %% ----------------------------------------------------------------------------------------------------------
- -spec handle_cast(Msg :: any(), #state{}) ->
- {noreply, #state{}} |
- {noreply, #state{}, Timeout :: non_neg_integer()} |
- {stop, Reason :: any(), #state{}}.
- handle_cast({sync_join, GroupName, Pid, Meta}, State) ->
- %% add to table
- add_to_local_table(GroupName, Pid, Meta, undefined),
- %% return
- {noreply, State};
- handle_cast({sync_leave, GroupName, Pid}, State) ->
- %% remove entry
- remove_from_local_table(GroupName, Pid),
- %% return
- {noreply, State};
- handle_cast(Msg, State) ->
- error_logger:warning_msg("Syn(~p): Received an unknown cast message: ~p~n", [node(), Msg]),
- {noreply, State}.
- %% ----------------------------------------------------------------------------------------------------------
- %% All non Call / Cast messages
- %% ----------------------------------------------------------------------------------------------------------
- -spec handle_info(Info :: any(), #state{}) ->
- {noreply, #state{}} |
- {noreply, #state{}, Timeout :: non_neg_integer()} |
- {stop, Reason :: any(), #state{}}.
- handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State) ->
- case find_processes_entry_by_pid(Pid) of
- [] ->
- %% handle
- handle_process_down(undefined, Pid, undefined, Reason, State);
- Entries ->
- lists:foreach(fun(Entry) ->
- %% get process info
- GroupName = Entry#syn_groups_table.name,
- Meta = Entry#syn_groups_table.meta,
- %% handle
- handle_process_down(GroupName, Pid, Meta, Reason, State),
- %% remove from table
- remove_from_local_table(Entry),
- %% multicast
- multicast_leave(GroupName, Pid)
- end, Entries)
- end,
- %% return
- {noreply, State};
- handle_info({nodeup, RemoteNode}, State) ->
- error_logger:info_msg("Syn(~p): Node ~p has joined the cluster~n", [node(), RemoteNode]),
- global:trans({{?MODULE, auto_merge_groups}, self()},
- fun() ->
- error_logger:warning_msg("Syn(~p): GROUPS AUTOMERGE ----> Initiating for remote node ~p~n", [node(), RemoteNode]),
- %% get group tuples from remote node
- GroupTuples = rpc:call(RemoteNode, ?MODULE, sync_get_local_group_tuples, [node()]),
- error_logger:warning_msg(
- "Syn(~p): Received ~p group entrie(s) from remote node ~p, writing to local~n",
- [node(), length(GroupTuples), RemoteNode]
- ),
- sync_group_tuples(RemoteNode, GroupTuples),
- %% exit
- error_logger:warning_msg("Syn(~p): GROUPS AUTOMERGE <---- Done for remote node ~p~n", [node(), RemoteNode])
- end
- ),
- %% resume
- {noreply, State};
- handle_info({nodedown, RemoteNode}, State) ->
- error_logger:warning_msg("Syn(~p): Node ~p has left the cluster, removing group entries on local~n", [node(), RemoteNode]),
- purge_group_entries_for_remote_node(RemoteNode),
- {noreply, State};
- handle_info(Info, State) ->
- error_logger:warning_msg("Syn(~p): Received an unknown info message: ~p~n", [node(), Info]),
- {noreply, State}.
- %% ----------------------------------------------------------------------------------------------------------
- %% Terminate
- %% ----------------------------------------------------------------------------------------------------------
- -spec terminate(Reason :: any(), #state{}) -> terminated.
- terminate(Reason, _State) ->
- error_logger:info_msg("Syn(~p): Terminating with reason: ~p~n", [node(), Reason]),
- terminated.
- %% ----------------------------------------------------------------------------------------------------------
- %% Convert process state when code is changed.
- %% ----------------------------------------------------------------------------------------------------------
- -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
- code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
- %% ===================================================================
- %% Internal
- %% ===================================================================
- -spec multicast_join(GroupName :: any(), Pid :: pid(), Meta :: any()) -> pid().
- multicast_join(GroupName, Pid, Meta) ->
- spawn_link(fun() ->
- lists:foreach(fun(RemoteNode) ->
- sync_join(RemoteNode, GroupName, Pid, Meta)
- end, nodes())
- end).
- -spec multicast_leave(GroupName :: any(), Pid :: pid()) -> pid().
- multicast_leave(GroupName, Pid) ->
- spawn_link(fun() ->
- lists:foreach(fun(RemoteNode) ->
- sync_leave(RemoteNode, GroupName, Pid)
- end, nodes())
- end).
- -spec join_on_node(GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok.
- join_on_node(GroupName, Pid, Meta) ->
- MonitorRef = case find_processes_entry_by_pid(Pid) of
- [] ->
- %% process is not monitored yet, add
- erlang:monitor(process, Pid);
- [Entry | _] ->
- Entry#syn_groups_table.monitor_ref
- end,
- %% add to table
- add_to_local_table(GroupName, Pid, Meta, MonitorRef).
- -spec leave_on_node(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
- leave_on_node(GroupName, Pid) ->
- case find_process_entry_by_name_and_pid(GroupName, Pid) of
- undefined ->
- {error, not_in_group};
- Entry when Entry#syn_groups_table.monitor_ref =/= undefined ->
- %% is this the last group process is in?
- case find_processes_entry_by_pid(Pid) of
- [Entry] ->
- %% demonitor
- erlang:demonitor(Entry#syn_groups_table.monitor_ref);
- _ ->
- ok
- end,
- %% remove from table
- remove_from_local_table(Entry)
- end.
- -spec add_to_local_table(GroupName :: any(), Pid :: pid(), Meta :: any(), MonitorRef :: undefined | reference()) -> ok.
- add_to_local_table(GroupName, Pid, Meta, MonitorRef) ->
- %% clean if any
- remove_from_local_table(GroupName, Pid),
- %% write
- mnesia:dirty_write(#syn_groups_table{
- name = GroupName,
- pid = Pid,
- node = node(Pid),
- meta = Meta,
- monitor_ref = MonitorRef
- }).
- -spec remove_from_local_table(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
- remove_from_local_table(GroupName, Pid) ->
- case find_process_entry_by_name_and_pid(GroupName, Pid) of
- undefined ->
- {error, not_in_group};
- Entry ->
- %% remove from table
- remove_from_local_table(Entry)
- end.
- -spec remove_from_local_table(Entry :: #syn_groups_table{}) -> ok.
- remove_from_local_table(Entry) ->
- mnesia:dirty_delete_object(syn_groups_table, Entry).
- -spec find_processes_entry_by_pid(Pid :: pid()) -> Entries :: list(#syn_groups_table{}).
- find_processes_entry_by_pid(Pid) when is_pid(Pid) ->
- mnesia:dirty_index_read(syn_groups_table, Pid, #syn_groups_table.pid).
- -spec find_process_entry_by_name_and_pid(GroupName :: any(), Pid :: pid()) -> Entry :: #syn_groups_table{} | undefined.
- find_process_entry_by_name_and_pid(GroupName, Pid) ->
- %% build match specs
- MatchHead = #syn_groups_table{name = GroupName, pid = Pid, _ = '_'},
- Guards = [],
- Result = '$_',
- %% select
- case mnesia:dirty_select(syn_groups_table, [{MatchHead, Guards, [Result]}]) of
- [Entry] -> Entry;
- [] -> undefined
- end.
- -spec handle_process_down(GroupName :: any(), Pid :: pid(), Meta :: any(), Reason :: any(), #state{}) -> ok.
- handle_process_down(GroupName, Pid, Meta, Reason, #state{
- custom_event_handler = CustomEventHandler
- }) ->
- case GroupName of
- undefined ->
- error_logger:warning_msg(
- "Syn(~p): Received a DOWN message from an unmonitored group process ~p with reason: ~p~n",
- [node(), Pid, Reason]
- );
- _ ->
- syn_event_handler:do_on_group_process_exit(GroupName, Pid, Meta, Reason, CustomEventHandler)
- end.
- -spec sync_group_tuples(RemoteNode :: node(), GroupTuples :: [syn_registry_tuple()]) -> ok.
- sync_group_tuples(RemoteNode, GroupTuples) ->
- %% ensure that groups doesn't have any joining node's entries (here again for race conditions)
- purge_group_entries_for_remote_node(RemoteNode),
- %% loop
- F = fun({Name, RemotePid, RemoteMeta}) ->
- join_on_node(Name, RemotePid, RemoteMeta)
- end,
- %% add to table
- lists:foreach(F, GroupTuples).
- -spec purge_group_entries_for_remote_node(Node :: atom()) -> ok.
- purge_group_entries_for_remote_node(Node) when Node =/= node() ->
- %% NB: no demonitoring is done, hence why this needs to run for a remote node
- %% build match specs
- Pattern = #syn_groups_table{node = Node, _ = '_'},
- ObjectsToDelete = mnesia:dirty_match_object(syn_groups_table, Pattern),
- %% delete
- DelF = fun(Record) -> mnesia:dirty_delete_object(syn_groups_table, Record) end,
- lists:foreach(DelF, ObjectsToDelete).
- -spec multi_call_and_receive(
- CollectorPid :: pid(),
- Pid :: pid(),
- Message :: any(),
- Timeout :: non_neg_integer()
- ) -> any().
- multi_call_and_receive(CollectorPid, Pid, Message, Timeout) ->
- MonitorRef = monitor(process, Pid),
- Pid ! {syn_multi_call, self(), Message},
- receive
- {syn_multi_call_reply, Pid, Reply} ->
- CollectorPid ! {reply, Pid, Reply};
- {'DOWN', MonitorRef, _, _, _} ->
- CollectorPid ! {bad_pid, Pid}
- after Timeout ->
- CollectorPid ! {bad_pid, Pid}
- end.
- -spec collect_replies(MemberPids :: [pid()]) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
- collect_replies(MemberPids) ->
- collect_replies(MemberPids, [], []).
- -spec collect_replies(MemberPids :: [pid()], [{pid(), Reply :: any()}], [pid()]) ->
- {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
- collect_replies([], Replies, BadPids) -> {Replies, BadPids};
- collect_replies(MemberPids, Replies, BadPids) ->
- receive
- {reply, Pid, Reply} ->
- MemberPids1 = lists:delete(Pid, MemberPids),
- collect_replies(MemberPids1, [{Pid, Reply} | Replies], BadPids);
- {bad_pid, Pid} ->
- MemberPids1 = lists:delete(Pid, MemberPids),
- collect_replies(MemberPids1, Replies, [Pid | BadPids])
- end.
|