Browse Source

Add groups join.

Roberto Ostinelli 3 years ago
parent
commit
aba9cb4bfd
6 changed files with 296 additions and 8 deletions
  1. 36 1
      src/syn.erl
  2. 5 2
      src/syn.hrl
  3. 2 2
      src/syn_backbone.erl
  4. 164 0
      src/syn_groups.erl
  5. 72 3
      test/syn_groups_SUITE.erl
  6. 17 0
      test/syn_test_suite_helper.erl

+ 36 - 1
src/syn.erl

@@ -27,14 +27,20 @@
 
 %% API
 -export([start/0, stop/0]).
+%% scopes
 -export([get_node_scopes/0, add_node_to_scope/1, add_node_to_scopes/1]).
 -export([set_event_handler/1]).
+%% registry
 -export([lookup/1, lookup/2]).
 -export([register/2, register/3, register/4]).
 -export([unregister/1, unregister/2]).
 -export([registry_count/1, registry_count/2]).
 %% gen_server via interface
 -export([register_name/2, unregister_name/1, whereis_name/1, send/2]).
+%% groups
+-export([get_members/1, get_members/2]).
+-export([join/2, join/3, join/4]).
+-export([groups_count/1, groups_count/2]).
 
 %% ===================================================================
 %% API
@@ -72,7 +78,7 @@ set_event_handler(Module) ->
 lookup(Name) ->
     syn_registry:lookup(Name).
 
--spec lookup(Scope ::atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
+-spec lookup(Scope :: atom(), Name :: any()) -> {pid(), Meta :: any()} | undefined.
 lookup(Scope, Name) ->
     syn_registry:lookup(Scope, Name).
 
@@ -135,3 +141,32 @@ send(Name, Message) ->
             Pid ! Message,
             Pid
     end.
+
+%% ----- \/ groups ---------------------------------------------------
+-spec get_members(GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
+get_members(GroupName) ->
+    syn_groups:get_members(GroupName).
+
+-spec get_members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
+get_members(Scope, GroupName) ->
+    syn_groups:get_members(Scope, GroupName).
+
+-spec join(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
+join(GroupName, Pid) ->
+    syn_groups:join(GroupName, Pid).
+
+-spec join(GroupNameOrScope :: any(), PidOrGroupName :: any(), MetaOrPid :: any()) -> ok | {error, Reason :: any()}.
+join(GroupNameOrScope, PidOrGroupName, MetaOrPid) ->
+    syn_groups:join(GroupNameOrScope, PidOrGroupName, MetaOrPid).
+
+-spec join(Scope :: atom(), GroupName :: any(), Pid :: pid(), Meta :: any()) -> ok | {error, Reason :: any()}.
+join(Scope, GroupName, Pid, Meta) ->
+    syn_groups:join(Scope, GroupName, Pid, Meta).
+
+-spec groups_count(Scope :: atom()) -> non_neg_integer().
+groups_count(Scope) ->
+    syn_groups:count(Scope).
+
+-spec groups_count(Scope :: atom(), Node :: node()) -> non_neg_integer().
+groups_count(Scope, Node) ->
+    syn_groups:count(Scope, Node).

+ 5 - 2
src/syn.hrl

@@ -51,9 +51,12 @@
     Time :: integer()
 }.
 -type syn_groups_entry() :: {
-    GroupName :: any(),
-    Pid :: pid(),
+    {
+        GroupName :: any(),
+        Pid :: pid()
+    },
     Meta :: any(),
+    Time :: integer(),
     MRef :: undefined | reference(),
     Node :: node()
 }.

+ 2 - 2
src/syn_backbone.erl

@@ -101,8 +101,8 @@ handle_call({create_tables_for_scope, Scope}, _From, State) ->
     error_logger:info_msg("SYN[~s] Creating tables for scope '~s'", [node(), Scope]),
     ensure_table_exists(set, syn_registry_by_name, Scope),
     ensure_table_exists(bag, syn_registry_by_pid, Scope),
-    ensure_table_exists(set, syn_groups_by_name, Scope),
-    ensure_table_exists(bag, syn_groups_by_pid, Scope),
+    ensure_table_exists(ordered_set, syn_groups_by_name, Scope),
+    ensure_table_exists(ordered_set, syn_groups_by_pid, Scope),
     {reply, ok, State};
 
 handle_call(Request, From, State) ->

+ 164 - 0
src/syn_groups.erl

@@ -29,6 +29,9 @@
 %% API
 -export([start_link/1]).
 -export([get_subcluster_nodes/1]).
+-export([join/2, join/3, join/4]).
+-export([get_members/1, get_members/2]).
+-export([count/1, count/2]).
 
 %% syn_gen_scope callbacks
 -export([
@@ -55,6 +58,83 @@ start_link(Scope) when is_atom(Scope) ->
 get_subcluster_nodes(Scope) ->
     syn_gen_scope:get_subcluster_nodes(?MODULE, Scope).
 
+-spec get_members(GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
+get_members(GroupName) ->
+    get_members(default, GroupName).
+
+-spec get_members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
+get_members(Scope, GroupName) ->
+    case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
+        undefined ->
+            error({invalid_scope, Scope});
+
+        TableByName ->
+            ets:select(TableByName, [{
+                {{GroupName, '$2'}, '$3', '_', '_', '_'},
+                [],
+                [{{'$2', '$3'}}]
+            }])
+    end.
+
+-spec join(GroupName :: term(), Pid :: pid()) -> ok.
+join(GroupName, Pid) ->
+    join(GroupName, Pid, undefined).
+
+-spec join(GroupNameOrScope :: term(), PidOrGroupName :: term(), MetaOrPid :: term()) -> ok.
+join(GroupName, Pid, Meta) when is_pid(Pid) ->
+    join(default, GroupName, Pid, Meta);
+
+join(Scope, GroupName, Pid) when is_pid(Pid) ->
+    join(Scope, GroupName, Pid, undefined).
+
+-spec join(Scope :: atom(), GroupName :: term(), Pid :: pid(), Meta :: term()) -> ok.
+join(Scope, GroupName, Pid, Meta) ->
+    Node = node(Pid),
+    case syn_gen_scope:call(?MODULE, Node, Scope, {join_on_owner, node(), GroupName, Pid, Meta}) of
+        {ok, {Time, TableByName, TableByPid}} when Node =/= node() ->
+            %% update table on caller node immediately so that subsequent calls have an updated registry
+            add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
+            %% callback
+            %%syn_event_handler:do_on_process_joined(Scope, GroupName, {TablePid, TableMeta}, {Pid, Meta}),
+            %% return
+            ok;
+
+        {Response, _} ->
+            Response
+    end.
+
+-spec count(Scope :: atom()) -> non_neg_integer().
+count(Scope) ->
+    case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
+        undefined ->
+            error({invalid_scope, Scope});
+
+        TableByName ->
+            Entries = ets:select(TableByName, [{
+                {{'$1', '_'}, '_', '_', '_', '_'},
+                [],
+                ['$1']
+            }]),
+            Set = sets:from_list(Entries),
+            sets:size(Set)
+    end.
+
+-spec count(Scope :: atom(), Node :: node()) -> non_neg_integer().
+count(Scope, Node) ->
+    case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
+        undefined ->
+            error({invalid_scope, Scope});
+
+        TableByName ->
+            Entries = ets:select(TableByName, [{
+                {{'$1', '_'}, '_', '_', '_', Node},
+                [],
+                ['$1']
+            }]),
+            Set = sets:from_list(Entries),
+            sets:size(Set)
+    end.
+
 %% ===================================================================
 %% Callbacks
 %% ===================================================================
@@ -78,6 +158,32 @@ init(_State) ->
     {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
     {stop, Reason :: term(), Reply :: term(), #state{}} |
     {stop, Reason :: term(), #state{}}.
+handle_call({join_on_owner, RequesterNode, GroupName, Pid, Meta}, _From, #state{
+    scope = Scope,
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+} = State) ->
+    case is_process_alive(Pid) of
+        true ->
+            %% available
+            MRef = case find_monitor_for_pid(Pid, TableByPid) of
+                undefined -> erlang:monitor(process, Pid);  %% process is not monitored yet, add
+                MRef0 -> MRef0
+            end,
+            %% add to local table
+            Time = erlang:system_time(),
+            add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
+            %% callback
+            %%syn_event_handler:do_on_process_joined(Scope, GroupName, {undefined, undefined}, {Pid, Meta}),
+            %% broadcast
+            syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time}, [RequesterNode], State),
+            %% return
+            {reply, {ok, {Time, TableByName, TableByPid}}, State};
+
+        false ->
+            {reply, {{error, not_alive}, undefined}, State}
+    end;
+
 handle_call(Request, From, State) ->
     error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), From, Request]),
     {reply, undefined, State}.
@@ -89,6 +195,30 @@ handle_call(Request, From, State) ->
     {noreply, #state{}} |
     {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
     {stop, Reason :: term(), #state{}}.
+handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time}, #state{
+    table_by_name = TableByName,
+    table_by_pid = TableByPid
+} = State) ->
+    case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
+        undefined ->
+            %% new
+            add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
+            %% callback
+            %%syn_event_handler:do_on_process_joined(Scope, GroupName, {undefined, undefined}, {Pid, Meta});
+            {noreply, State};
+
+        {GroupName, Pid, _TableMeta, TableTime, _MRef, _TableNode} when Time > TableTime ->
+            %% update meta
+            add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
+            %% callback
+            %%syn_event_handler:do_on_process_joined(Scope, GroupName, {undefined, undefined}, {Pid, Meta});
+            {noreply, State};
+
+        {GroupName, Pid, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
+            %% race condition: incoming data is older, ignore
+            {noreply, State}
+    end;
+
 handle_info(Info, State) ->
     error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [node(), Info]),
     {noreply, State}.
@@ -116,3 +246,37 @@ purge_local_data_for_node(Node, #state{
 %% ===================================================================
 %% Internal
 %% ===================================================================
+-spec find_monitor_for_pid(Pid :: pid(), TableByPid :: atom()) -> reference() | undefined.
+find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
+    %% we use select instead of lookup to limit the results and thus cover the case
+    %% when a process is registered with a considerable amount of names
+    case ets:select(TableByPid, [{
+        {{Pid, '_'}, '_', '_', '$5', '_'},
+        [],
+        ['$5']
+    }], 1) of
+        {[MRef], _} -> MRef;
+        '$end_of_table' -> undefined
+    end.
+
+-spec find_groups_entry_by_name_and_pid(GroupName :: any(), Pid :: pid(), TableByName :: atom()) ->
+    Entry :: syn_groups_entry() | undefined.
+find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
+    case ets:lookup(TableByName, {GroupName, Pid}) of
+        [] -> undefined;
+        [Entry] -> Entry
+    end.
+
+-spec add_to_local_table(
+    GroupName :: term(),
+    Pid :: pid(),
+    Meta :: term(),
+    Time :: integer(),
+    MRef :: undefined | reference(),
+    TableByName :: atom(),
+    TableByPid :: atom()
+) -> true.
+add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
+    %% insert
+    ets:insert(TableByName, {{GroupName, Pid}, Meta, Time, MRef, node(Pid)}),
+    ets:insert(TableByPid, {{Pid, GroupName}, Meta, Time, MRef, node(Pid)}).

+ 72 - 3
test/syn_groups_SUITE.erl

@@ -34,7 +34,8 @@
 %% tests
 -export([
     three_nodes_discover_default_scope/1,
-    three_nodes_discover_custom_scope/1
+    three_nodes_discover_custom_scope/1,
+    three_nodes_join_leave_and_monitor_default_scope/1
 ]).
 
 %% include
@@ -72,8 +73,9 @@ all() ->
 groups() ->
     [
         {three_nodes_groups, [shuffle], [
-            three_nodes_discover_default_scope,
-            three_nodes_discover_custom_scope
+%%            three_nodes_discover_default_scope,
+%%            three_nodes_discover_custom_scope,
+            three_nodes_join_leave_and_monitor_default_scope
         ]}
     ].
 %% -------------------------------------------------------------------
@@ -337,3 +339,70 @@ three_nodes_discover_custom_scope(Config) ->
     syn_test_suite_helper:assert_groups_scope_subcluster(SlaveNode2, custom_scope_bc, [SlaveNode1]),
     syn_test_suite_helper:assert_groups_scope_subcluster(SlaveNode2, custom_scope_c, []),
     syn_test_suite_helper:assert_groups_scope_subcluster(SlaveNode2, custom_scope_all, [node(), SlaveNode1]).
+
+three_nodes_join_leave_and_monitor_default_scope(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(slave_node_1, Config),
+    SlaveNode2 = proplists:get_value(slave_node_2, Config),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+
+    %% start processes
+    Pid = syn_test_suite_helper:start_process(),
+    PidWithMeta = syn_test_suite_helper:start_process(),
+    PidRemoteOn1 = syn_test_suite_helper:start_process(SlaveNode1),
+
+    %% check
+    [] = syn:get_members({group, "one"}),
+    [] = rpc:call(SlaveNode1, syn, get_members, [{group, "one"}]),
+    [] = rpc:call(SlaveNode2, syn, get_members, [{group, "one"}]),
+    [] = syn:get_members({group, "two"}),
+    [] = rpc:call(SlaveNode1, syn, get_members, [{group, "two"}]),
+    [] = rpc:call(SlaveNode2, syn, get_members, [{group, "two"}]),
+    0 = syn:groups_count(default),
+    0 = syn:groups_count(default, node()),
+    0 = syn:groups_count(default, SlaveNode1),
+    0 = syn:groups_count(default, SlaveNode2),
+
+    %% join
+    ok = syn:join({group, "one"}, Pid),
+    ok = syn:join({group, "one"}, PidWithMeta, <<"with meta">>),
+    ok = syn:join({group, "one"}, PidRemoteOn1),
+    ok = syn:join({group, "two"}, Pid),
+    ok = syn:join({group, "two"}, PidWithMeta, "with-meta-2"),
+
+    %% errors
+    {error, not_alive} = syn:join({"pid not alive"}, list_to_pid("<0.9999.0>")),
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with meta">>}, {PidRemoteOn1, undefined}]),
+        fun() -> lists:sort(syn:get_members({group, "one"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with meta">>}, {PidRemoteOn1, undefined}]),
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [{group, "one"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, <<"with meta">>}, {PidRemoteOn1, undefined}]),
+        fun() -> lists:sort(rpc:call(SlaveNode2, syn, get_members, [{group, "one"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(syn:get_members({group, "two"})) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(rpc:call(SlaveNode1, syn, get_members, [{group, "two"}])) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        lists:sort([{Pid, undefined}, {PidWithMeta, "with-meta-2"}]),
+        fun() -> lists:sort(rpc:call(SlaveNode2, syn, get_members, [{group, "two"}])) end
+    ),
+    2 = syn:groups_count(default),
+    2 = syn:groups_count(default, node()),
+    1 = syn:groups_count(default, SlaveNode1),
+    0 = syn:groups_count(default, SlaveNode2).

+ 17 - 0
test/syn_test_suite_helper.erl

@@ -38,6 +38,7 @@
 -export([assert_registry_scope_subcluster/3, assert_groups_scope_subcluster/3]).
 -export([assert_received_messages/1]).
 -export([assert_empty_queue/1]).
+-export([assert_same_array_with_same_members/2]).
 -export([assert_wait/2]).
 -export([send_error_logger_to_disk/0]).
 
@@ -217,6 +218,22 @@ assert_empty_queue(Pid) when is_pid(Pid) ->
             ct:fail("~n\tMessage queue was not empty, got:~n\t~p~n", [Messages])
     end.
 
+assert_same_array_with_same_members(Arr1, Arr2) ->
+    assert_same_array_with_same_members(Arr1, Arr2, []).
+assert_same_array_with_same_members([], [], []) ->
+    ok;
+assert_same_array_with_same_members([], RemArr2, RemArr1) ->
+    ct:fail("~n\tIn 1 only: ~p~n\tIn 2 only: ~p~n", [RemArr1, RemArr2]);
+assert_same_array_with_same_members([E1 | Arr1], Arr2, RemArr1) ->
+    case lists:member(E1, Arr2) of
+        false ->
+            assert_same_array_with_same_members(Arr1, Arr2, [E1 | RemArr1]);
+
+        true ->
+            NewArr2 = lists:delete(E1, Arr2),
+            assert_same_array_with_same_members(Arr1, NewArr2, RemArr1)
+    end.
+
 assert_wait(ExpectedResult, Fun) ->
     assert_wait(ExpectedResult, Fun, os:system_time(millisecond)).
 assert_wait(ExpectedResult, Fun, StartAt) ->