Allow processes to leave groups.

Roberto Ostinelli, 3 years ago
commit a206a40a19

4 changed files with 232 additions and 3 deletions:

  1. src/syn.erl (+9, -0)
  2. src/syn_groups.erl (+147, -3)
  3. src/syn_registry.erl (+1, -0)
  4. test/syn_groups_SUITE.erl (+75, -0)

src/syn.erl (+9, -0)

@@ -40,6 +40,7 @@
 %% groups
 -export([get_members/1, get_members/2]).
 -export([join/2, join/3, join/4]).
+-export([leave/2, leave/3]).
 -export([groups_count/1, groups_count/2]).
 
 %% ===================================================================
@@ -163,6 +164,14 @@ join(GroupNameOrScope, PidOrGroupName, MetaOrPid) ->
 join(Scope, GroupName, Pid, Meta) ->
     syn_groups:join(Scope, GroupName, Pid, Meta).
 
+-spec leave(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
+leave(GroupName, Pid) ->
+    syn_groups:leave(GroupName, Pid).
+
+-spec leave(Scope :: atom(), GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
+leave(Scope, GroupName, Pid) ->
+    syn_groups:leave(Scope, GroupName, Pid).
+
 -spec groups_count(Scope :: atom()) -> non_neg_integer().
 groups_count(Scope) ->
     syn_groups:count(Scope).
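
A minimal usage sketch of the API exported above (the group name my_group, the spawned pid and the custom scope my_scope are illustrative, not part of the commit; per the diff, leave/2 acts on the default scope and leave/3 takes an explicit scope):

    Pid = spawn(fun() -> receive stop -> ok end end),
    ok = syn:join(my_group, Pid),
    %% reads on the joining node reflect the join once join/2 returns
    [{Pid, undefined}] = syn:get_members(my_group),
    ok = syn:leave(my_group, Pid),
    %% leaving a group the process is not a member of returns an error
    {error, not_in_group} = syn:leave(my_group, Pid),
    %% scoped variants; my_scope is hypothetical and the node must be part of
    %% that scope, otherwise these calls raise {invalid_scope, my_scope}
    ok = syn:join(my_scope, my_group, Pid),
    ok = syn:leave(my_scope, my_group, Pid).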

src/syn_groups.erl (+147, -3)

@@ -30,6 +30,7 @@
 -export([start_link/1]).
 -export([get_subcluster_nodes/1]).
 -export([join/2, join/3, join/4]).
+-export([leave/2, leave/3]).
 -export([get_members/1, get_members/2]).
 -export([count/1, count/2]).
 
@@ -103,6 +104,32 @@ join(Scope, GroupName, Pid, Meta) ->
             Response
     end.
 
+-spec leave(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
+leave(GroupName, Pid) ->
+    leave(default, GroupName, Pid).
+
+-spec leave(Scope :: atom(), GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
+leave(Scope, GroupName, Pid) ->
+    case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
+        undefined ->
+            error({invalid_scope, Scope});
+
+        TableByName ->
+            Node = node(Pid),
+            case syn_gen_scope:call(?MODULE, Node, Scope, {leave_on_owner, node(), GroupName, Pid}) of
+                {ok, TableByPid} when Node =/= node() ->
+                    %% remove the entry on the caller node immediately so that subsequent calls see updated membership
+                    remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
+                    %% callback
+                    %%syn_event_handler:do_on_process_left(Scope, GroupName, Pid, Meta),
+                    %% return
+                    ok;
+
+                {Response, _} ->
+                    Response
+            end
+    end.
+
 -spec count(Scope :: atom()) -> non_neg_integer().
 count(Scope) ->
     case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
@@ -143,8 +170,10 @@ count(Scope, Node) ->
 %% Init
 %% ----------------------------------------------------------------------------------------------------------
 -spec init(#state{}) -> {ok, HandlerState :: term()}.
-init(_State) ->
+init(State) ->
     HandlerState = #{},
+    %% rebuild
+    rebuild_monitors(State),
     %% init
     {ok, HandlerState}.
 
@@ -184,6 +213,25 @@ handle_call({join_on_owner, RequesterNode, GroupName, Pid, Meta}, _From, #state{
             {reply, {{error, not_alive}, undefined}, State}
     end;
 
+handle_call({leave_on_owner, RequesterNode, GroupName, Pid}, _From, #state{
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+} = State) ->
+    case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
+        undefined ->
+            {reply, {{error, not_in_group}, undefined}, State};
+
+        _ ->
+            %% is this the last group the process is in?
+            maybe_demonitor(Pid, TableByPid),
+            %% remove from table
+            remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
+            %% broadcast
+            syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid}, [RequesterNode], State),
+            %% return
+            {reply, {ok, TableByPid}, State}
+    end;
+
 handle_call(Request, From, State) ->
     error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), From, Request]),
     {reply, undefined, State}.
@@ -207,18 +255,51 @@ handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time}, #state{
             %%syn_event_handler:do_on_process_joined(Scope, GroupName, {undefined, undefined}, {Pid, Meta});
             {noreply, State};
 
-        {GroupName, Pid, _TableMeta, TableTime, _MRef, _TableNode} when Time > TableTime ->
+        {{GroupName, Pid}, _TableMeta, TableTime, _MRef, _TableNode} when Time > TableTime ->
             %% update meta
             add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
             %% callback
             %%syn_event_handler:do_on_process_joined(Scope, GroupName, {undefined, undefined}, {Pid, Meta});
             {noreply, State};
 
-        {GroupName, Pid, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
+        {{GroupName, Pid}, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
             %% race condition: incoming data is older, ignore
             {noreply, State}
     end;
 
+handle_info({'3.0', sync_leave, GroupName, Pid}, #state{
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+} = State) ->
+    %% remove from table
+    remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
+    %% return
+    {noreply, State};
+
+handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+} = State) ->
+    case find_groups_entries_by_pid(Pid, TableByPid) of
+        [] ->
+            error_logger:warning_msg(
+                "SYN[~s] Received a DOWN message from an unknown process ~p with reason: ~p",
+                [node(), Pid, Reason]
+            );
+
+        Entries ->
+            lists:foreach(fun({{_Pid, GroupName}, Meta, _, _, _}) ->
+                %% remove from table
+                remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
+                %% callback
+                %%syn_event_handler:do_on_process_left(Scope, GroupName, Pid, Meta),
+                %% broadcast
+                syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid}, State)
+            end, Entries)
+    end,
+    %% return
+    {noreply, State};
+
 handle_info(Info, State) ->
     error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [node(), Info]),
     {noreply, State}.
@@ -246,6 +327,32 @@ purge_local_data_for_node(Node, #state{
 %% ===================================================================
 %% Internal
 %% ===================================================================
+-spec rebuild_monitors(#state{}) -> ok.
+rebuild_monitors(#state{
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+}) ->
+    GroupsTuples = get_groups_tuples_for_node(node(), TableByName),
+    lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
+        remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
+        case is_process_alive(Pid) of
+            true ->
+                MRef = erlang:monitor(process, Pid),
+                add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid);
+
+            _ ->
+                ok
+        end
+    end, GroupsTuples).
+
+-spec get_groups_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_groups_tuple()].
+get_groups_tuples_for_node(Node, TableByName) ->
+    ets:select(TableByName, [{
+        {{'$1', '$2'}, '$3', '$4', '_', Node},
+        [],
+        [{{'$1', '$2', '$3', '$4'}}]
+    }]).
+
 -spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
 find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
     %% we use select instead of lookup to limit the results and thus cover the case
@@ -267,6 +374,33 @@ find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
         [Entry] -> Entry
     end.
 
+-spec find_groups_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupTuples :: [syn_groups_tuple()].
+find_groups_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
+    ets:select(TableByPid, [{
+        {{Pid, '_'}, '_', '_', '_', '_'},
+        [],
+        ['$_']
+    }]).
+
+-spec maybe_demonitor(Pid :: pid(), TableByPid :: atom()) -> ok.
+maybe_demonitor(Pid, TableByPid) ->
+    %% select 2: if only 1 entry is returned, the Pid is in no other group
+    %% we use select instead of lookup to limit the results and thus cover the case
+    %% when a process has joined a considerable number of groups
+    case ets:select(TableByPid, [{
+        {{Pid, '_'}, '_', '_', '$5', '_'},
+        [],
+        ['$5']
+    }], 2) of
+        {[MRef], _} when is_reference(MRef) ->
+            %% no other aliases, demonitor
+            erlang:demonitor(MRef, [flush]),
+            ok;
+
+        _ ->
+            ok
+    end.
+
 -spec add_to_local_table(
     GroupName :: term(),
     Pid :: pid(),
@@ -280,3 +414,13 @@ add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
     %% insert
     ets:insert(TableByName, {{GroupName, Pid}, Meta, Time, MRef, node(Pid)}),
     ets:insert(TableByPid, {{Pid, GroupName}, Meta, Time, MRef, node(Pid)}).
+
+-spec remove_from_local_table(
+    Name :: term(),
+    Pid :: pid(),
+    TableByName :: atom(),
+    TableByPid :: atom()
+) -> true.
+remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
+    true = ets:delete(TableByName, {GroupName, Pid}),
+    true = ets:delete(TableByPid, {Pid, GroupName}).
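
For context, a standalone sketch of the dual-key ETS layout that add_to_local_table/7 and remove_from_local_table/4 maintain (table names and the group term here are illustrative, not taken from the commit):

    TableByName = ets:new(by_name_sketch, [set]),
    TableByPid = ets:new(by_pid_sketch, [set]),
    Pid = self(),
    Meta = undefined,
    Time = erlang:system_time(),
    %% a membership is written twice: keyed by {GroupName, Pid} in the by-name
    %% table and by {Pid, GroupName} in the by-pid table, so group lookups and
    %% per-pid cleanup (e.g. on 'DOWN') are both cheap
    true = ets:insert(TableByName, {{my_group, Pid}, Meta, Time, undefined, node()}),
    true = ets:insert(TableByPid, {{Pid, my_group}, Meta, Time, undefined, node()}),
    %% leaving deletes both rows by their composite keys, mirroring
    %% remove_from_local_table/4 above
    true = ets:delete(TableByName, {my_group, Pid}),
    true = ets:delete(TableByPid, {Pid, my_group}).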

src/syn_registry.erl (+1, -0)

@@ -282,6 +282,7 @@ handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{
     syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
     %% return
     {noreply, State};
+
 handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
     scope = Scope,
     table_by_name = TableByName,

test/syn_groups_SUITE.erl (+75, -0)

@@ -376,6 +376,7 @@ three_nodes_join_leave_and_monitor_default_scope(Config) ->
 
     %% errors
     {error, not_alive} = syn:join({"pid not alive"}, list_to_pid("<0.9999.0>")),
+    {error, not_in_group} = syn:leave({group, "three"}, Pid),
 
     %% retrieve
     syn_test_suite_helper:assert_wait(
@@ -405,4 +406,78 @@ three_nodes_join_leave_and_monitor_default_scope(Config) ->
     2 = syn:groups_count(default),
     2 = syn:groups_count(default, node()),
     1 = syn:groups_count(default, SlaveNode1),
+    0 = syn:groups_count(default, SlaveNode2),
+
+    %% re-join to edit meta
+    ok = syn:join({group, "one"}, PidWithMeta, <<"with updated meta">>),
+    ok = rpc:call(SlaveNode2, syn, join, [{group, "one"}, PidRemoteOn1, added_meta]), %% updated on slave 2
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with updated meta">>}, {PidRemoteOn1, added_meta}]),
+        fun() -> lists:sort(syn:get_members({group, "one"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with updated meta">>}, {PidRemoteOn1, added_meta}]),
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [{group, "one"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with updated meta">>}, {PidRemoteOn1, added_meta}]),
+        fun() -> lists:sort(rpc:call(SlaveNode2, syn, get_members, [{group, "one"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(syn:get_members({group, "two"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [{group, "two"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(rpc:call(SlaveNode2, syn, get_members, [{group, "two"}])) end
+    ),
+    2 = syn:groups_count(default),
+    2 = syn:groups_count(default, node()),
+    1 = syn:groups_count(default, SlaveNode1),
+    0 = syn:groups_count(default, SlaveNode2),
+
+    %% crash scope process to ensure that monitors get recreated
+    exit(whereis(syn_groups_default), kill),
+    syn_test_suite_helper:wait_process_name_ready(syn_groups_default),
+
+    %% kill process
+    syn_test_suite_helper:kill_process(Pid),
+    syn_test_suite_helper:kill_process(PidRemoteOn1),
+    %% leave
+    ok = syn:leave({group, "one"}, PidWithMeta),
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        [],
+        fun() -> lists:sort(syn:get_members({group, "one"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [],
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [{group, "one"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [],
+        fun() -> lists:sort(rpc:call(SlaveNode2, syn, get_members, [{group, "one"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [{PidWithMeta, "with-meta-2"}],
+        fun() -> lists:sort(syn:get_members({group, "two"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [{PidWithMeta, "with-meta-2"}],
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [{group, "two"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        [{PidWithMeta, "with-meta-2"}],
+        fun() -> lists:sort(rpc:call(SlaveNode2, syn, get_members, [{group, "two"}])) end
+    ),
+    1 = syn:groups_count(default),
+    1 = syn:groups_count(default, node()),
+    0 = syn:groups_count(default, SlaveNode1),
     0 = syn:groups_count(default, SlaveNode2).
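
Because the scope process now monitors members and handles 'DOWN' (see syn_groups.erl above), no explicit leave is needed for a process that dies. A rough sketch of the behaviour the suite asserts, with an illustrative group name (cross-node propagation is asynchronous, hence assert_wait):

    Pid = spawn(fun() -> receive stop -> ok end end),
    ok = syn:join({group, "temp"}, Pid),
    syn_test_suite_helper:kill_process(Pid),
    %% the 'DOWN' handler removes the entry locally and broadcasts sync_leave,
    %% so the member list eventually empties on every node
    syn_test_suite_helper:assert_wait(
        [],
        fun() -> syn:get_members({group, "temp"}) end
    ).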