Browse Source

Add member & update in pg.

Roberto Ostinelli 3 years ago
parent
commit
e34f19062d
4 changed files with 218 additions and 27 deletions
  1. 47 2
      src/syn.erl
  2. 1 1
      src/syn_gen_scope.erl
  3. 63 22
      src/syn_pg.erl
  4. 107 2
      test/syn_pg_SUITE.erl

+ 47 - 2
src/syn.erl

@@ -141,7 +141,7 @@
 %% gen_server via interface
 -export([register_name/2, unregister_name/1, whereis_name/1, send/2]).
 %% groups
--export([members/2, is_member/3]).
+-export([members/2, member/3, is_member/3, update_member/4]).
 -export([local_members/2, is_local_member/3]).
 -export([join/3, join/4]).
 -export([leave/3]).
@@ -429,7 +429,7 @@ send({Scope, Name}, Message) ->
 %% <h2>Examples</h2>
 %% <h3>Elixir</h3>
 %% ```
-%% iex> :syn.join(:devices, "area-1")
+%% iex> :syn.join(:devices, "area-1", self())
 %% :ok
 %% iex> :syn.members(:devices, "area-1")
 %% [{#PID<0.105.0>, :undefined}]
@@ -445,11 +445,56 @@ send({Scope, Name}, Message) ->
 members(Scope, GroupName) ->
     syn_pg:members(Scope, GroupName).
 
+%% @doc Returns the member for GroupName in the specified Scope.
+%%
+%% <h2>Examples</h2>
+%% <h3>Elixir</h3>
+%% ```
+%% iex> :syn.join(:devices, "area-1", self(), :meta)
+%% :ok
+%% iex> :syn.member(:devices, "area-1", self())
+%% {#PID<0.105.0>, :meta}
+%% '''
+%% <h3>Erlang</h3>
+%% ```
+%% 1> syn:join(devices, "area-1", self(), meta).
+%% ok
+%% 2> syn:member(devices, "area-1", self()).
+%% [{<0.69.0>, meta}]
+%% '''
+-spec member(Scope :: atom(), GroupName :: term(), pid()) -> {pid(), Meta :: term()} | undefined.
+member(Scope, GroupName, Pid) ->
+    syn_pg:member(Scope, GroupName, Pid).
+
 %% @doc Returns whether a `pid()' is a member of GroupName in the specified Scope.
 -spec is_member(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> boolean().
 is_member(Scope, GroupName, Pid) ->
     syn_pg:is_member(Scope, GroupName, Pid).
 
+%% @doc Updates the GroupName member metadata in the specified Scope.
+%%
+%% Atomically calls Fun with the current metadata, and stores the return value as new metadata.
+%%
+%% <h2>Examples</h2>
+%% <h3>Elixir</h3>
+%% ```
+%% iex> :syn.join(:devices, "area-1", self(), 10)
+%% :ok
+%% iex> :syn.update_member(:devices, "area-1", self(), fn existing_meta -> existing_meta * 2 end)
+%% {:ok, {#PID<0.105.0>, 20}}
+%% '''
+%% <h3>Erlang</h3>
+%% ```
+%% 1> syn:join(devices, "area-1", self(), 10).
+%% ok
+%% 2> syn:update_member(devices, "area-1", self(), fun(ExistingMeta) -> ExistingMeta * 2 end).
+%% {ok, {<0.69.0>, 20}}
+%% '''
+-spec update_member(Scope :: atom(), GroupName :: term(), Pid :: pid(), Fun :: function()) ->
+    {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
+update_member(Scope, GroupName, Pid, Fun) ->
+    syn_pg:update_member(Scope, GroupName, Pid, Fun).
+
 %% @doc Returns the list of all members for GroupName in the specified Scope running on the local node.
 -spec local_members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
 local_members(Scope, GroupName) ->

+ 1 - 1
src/syn_gen_scope.erl

@@ -79,7 +79,7 @@
 %% API
 %% ===================================================================
 -spec start_link(Handler :: module(), HandlerLogName :: atom(), Scope :: atom()) ->
-    {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
+    {ok, pid()} | {error, {already_started, pid()}} | {error, Reason :: term()}.
 start_link(Handler, HandlerLogName, Scope) when is_atom(Scope) ->
     %% build name
     HandlerBin = list_to_binary(atom_to_list(Handler)),

+ 63 - 22
src/syn_pg.erl

@@ -33,7 +33,9 @@
 -export([join/4]).
 -export([leave/3]).
 -export([members/2]).
+-export([member/3]).
 -export([is_member/3]).
+-export([update_member/4]).
 -export([local_members/2]).
 -export([is_local_member/3]).
 -export([count/1, count/2]).
@@ -80,9 +82,16 @@ subcluster_nodes(Scope) ->
 
 -spec members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
 members(Scope, GroupName) ->
-    do_get_members(Scope, GroupName, '_').
+    do_get_members(Scope, GroupName, undefined, undefined).
 
--spec is_member(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> boolean().
+-spec member(Scope :: atom(), GroupName :: term(), pid()) -> {pid(), Meta :: term()} | undefined.
+member(Scope, GroupName, Pid) ->
+    case do_get_members(Scope, GroupName, Pid, undefined) of
+        [] -> undefined;
+        [Member] -> Member
+    end.
+
+-spec is_member(Scope :: atom(), GroupName :: term(), pid()) -> boolean().
 is_member(Scope, GroupName, Pid) ->
     case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
         undefined ->
@@ -95,25 +104,34 @@ is_member(Scope, GroupName, Pid) ->
             end
     end.
 
--spec local_members(Scope :: atom(), GroupName :: term()) -> [{Pid :: pid(), Meta :: term()}].
+-spec local_members(Scope :: atom(), GroupName :: term()) -> [{pid(), Meta :: term()}].
 local_members(Scope, GroupName) ->
-    do_get_members(Scope, GroupName, node()).
-
--spec do_get_members(Scope :: atom(), GroupName :: term(), NodeParam :: atom()) -> [{Pid :: pid(), Meta :: term()}].
-do_get_members(Scope, GroupName, NodeParam) ->
+    do_get_members(Scope, GroupName, undefined, node()).
+
+-spec do_get_members(Scope :: atom(), GroupName :: term(), pid() | undefined, Node :: node() | undefined) ->
+    [{pid(), Meta :: term()}].
+do_get_members(Scope, GroupName, Pid, Node) ->
+    PidParam = case Pid of
+        undefined -> '$2';
+        _ -> Pid
+    end,
+    NodeParam = case Node of
+        undefined -> '_';
+        _ -> Node
+    end,
     case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
         undefined ->
             error({invalid_scope, Scope});
 
         TableByName ->
             ets:select(TableByName, [{
-                {{GroupName, '$2'}, '$3', '_', '_', NodeParam},
+                {{GroupName, PidParam}, '$3', '_', '_', NodeParam},
                 [],
-                [{{'$2', '$3'}}]
+                [{{PidParam, '$3'}}]
             }])
     end.
 
--spec is_local_member(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> boolean().
+-spec is_local_member(Scope :: atom(), GroupName :: term(), pid()) -> boolean().
 is_local_member(Scope, GroupName, Pid) ->
     case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
         undefined ->
@@ -128,27 +146,41 @@ is_local_member(Scope, GroupName, Pid) ->
 
 -spec join(Scope :: atom(), GroupName :: term(), Pid :: pid(), Meta :: term()) -> ok.
 join(Scope, GroupName, Pid, Meta) ->
+    case join_or_update(Scope, GroupName, Pid, Meta) of
+        {ok, _} -> ok;
+        {error, Reason} -> {error, Reason}
+    end.
+
+-spec update_member(Scope :: atom(), GroupName :: term(), Pid :: pid(), Fun :: function()) ->
+    {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
+update_member(Scope, GroupName, Pid, Fun) when is_function(Fun) ->
+    join_or_update(Scope, GroupName, Pid, Fun).
+
+join_or_update(Scope, GroupName, Pid, MetaOrFun) ->
     case syn_backbone:is_strict_mode() of
         true when Pid =/= self() ->
             {error, not_self};
 
         _ ->
             Node = node(Pid),
-            case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', join_on_node, node(), GroupName, Pid, Meta}) of
-                {ok, {CallbackMethod, Time, TableByName, TableByPid}} when Node =/= node() ->
+            case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', join_or_update_on_node, node(), GroupName, Pid, MetaOrFun}) of
+                {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}} when Node =/= node() ->
                     %% update table on caller node immediately so that subsequent calls have an updated pg
                     add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
                     %% callback
                     syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, normal]),
                     %% return
-                    ok;
+                    {ok, {Pid, Meta}};
 
-                {Response, _} ->
-                    Response
+                {ok, {_, Meta, _, _, _}} ->
+                    {ok, {Pid, Meta}};
+
+                {{error, Reason}, _} ->
+                    {error, Reason}
             end
     end.
 
--spec leave(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
+-spec leave(Scope :: atom(), GroupName :: term(), pid()) -> ok | {error, Reason :: term()}.
 leave(Scope, GroupName, Pid) ->
     case syn_backbone:get_table_name(syn_pg_by_name, Scope) of
         undefined ->
@@ -236,7 +268,7 @@ multi_call(Scope, GroupName, Message, Timeout) ->
     end, Members),
     collect_replies(orddict:from_list(Members)).
 
--spec multi_call_reply({CallerPid :: pid(), reference()}, Reply :: term()) -> any().
+-spec multi_call_reply({CallerPid :: term(), reference()}, Reply :: term()) -> any().
 multi_call_reply({CallerPid, Ref}, Reply) ->
     CallerPid ! {syn_multi_call_reply, Ref, Reply}.
 
@@ -270,28 +302,36 @@ init(#state{
     {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
     {stop, Reason :: term(), Reply :: term(), #state{}} |
     {stop, Reason :: term(), #state{}}.
-handle_call({'3.0', join_on_node, RequesterNode, GroupName, Pid, Meta}, _From, #state{
+handle_call({'3.0', join_or_update_on_node, RequesterNode, GroupName, Pid, MetaOrFun}, _From, #state{
     table_by_name = TableByName,
     table_by_pid = TableByPid
 } = State) ->
     case is_process_alive(Pid) of
         true ->
             case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
+                undefined when is_function(MetaOrFun) ->
+                    {reply, {{error, undefined}, undefined}, State};
+
                 undefined ->
                     %% add
                     MRef = case find_monitor_for_pid(Pid, TableByPid) of
                         undefined -> erlang:monitor(process, Pid);  %% process is not monitored yet, create
                         MRef0 -> MRef0
                     end,
-                    do_join_on_node(GroupName, Pid, Meta, MRef, normal, RequesterNode, on_process_joined, State);
+                    do_join_on_node(GroupName, Pid, MetaOrFun, MRef, normal, RequesterNode, on_process_joined, State);
+
+                {{_, _}, TableMeta, _, MRef, _} when is_function(MetaOrFun) ->
+                    %% update with fun
+                    Meta = MetaOrFun(TableMeta),
+                    do_join_on_node(GroupName, Pid, Meta, MRef, normal, RequesterNode, on_group_process_updated, State);
 
-                {{_, _}, Meta, _, _, _} ->
+                {{_, _}, MetaOrFun, _, _, _} ->
                     %% re-joined with same meta
                     {reply, {ok, noop}, State};
 
                 {{_, _}, _, _, MRef, _} ->
                     %% re-joined with different meta
-                    do_join_on_node(GroupName, Pid, Meta, MRef, normal, RequesterNode, on_group_process_updated, State)
+                    do_join_on_node(GroupName, Pid, MetaOrFun, MRef, normal, RequesterNode, on_group_process_updated, State)
             end;
 
         false ->
@@ -467,6 +507,7 @@ do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, Scope, TableBy
         reply,
         {ok, {
             CallbackMethod :: atom(),
+            Meta :: term(),
             Time :: non_neg_integer(),
             TableByName :: atom(),
             TableByPid :: atom()
@@ -486,7 +527,7 @@ do_join_on_node(GroupName, Pid, Meta, MRef, Reason, RequesterNode, CallbackMetho
     %% broadcast
     syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, [RequesterNode], State),
     %% return
-    {reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
+    {reply, {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}}, State}.
 
 -spec get_pg_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_pg_tuple()].
 get_pg_tuples_for_node(Node, TableByName) ->

+ 107 - 2
test/syn_pg_SUITE.erl

@@ -43,7 +43,8 @@
     three_nodes_custom_event_handler_joined_left/1,
     three_nodes_publish/1,
     three_nodes_multi_call/1,
-    three_nodes_group_names/1
+    three_nodes_group_names/1,
+    three_nodes_member_and_update/1
 ]).
 -export([
     four_nodes_concurrency/1
@@ -102,7 +103,8 @@ groups() ->
             three_nodes_custom_event_handler_joined_left,
             three_nodes_publish,
             three_nodes_multi_call,
-            three_nodes_group_names
+            three_nodes_group_names,
+            three_nodes_member_and_update
         ]},
         {four_nodes_pg, [shuffle], [
             four_nodes_concurrency
@@ -1646,6 +1648,109 @@ three_nodes_group_names(Config) ->
         fun() -> lists:sort(rpc:call(SlaveNode2, syn, group_names, [scope_all, SlaveNode2])) end
     ).
 
+three_nodes_member_and_update(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(syn_slave_1, Config),
+    SlaveNode2 = proplists:get_value(syn_slave_2, Config),
+
+    %% start syn on nodes
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+
+    %% add scopes
+    ok = syn:add_node_to_scopes([scope_all]),
+    ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[scope_all]]),
+    ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[scope_all]]),
+
+    %% start processes
+    Pid = syn_test_suite_helper:start_process(),
+    PidOn1 = syn_test_suite_helper:start_process(SlaveNode1),
+
+    %% init
+    TestPid = self(),
+    LocalNode = node(),
+
+    %% join
+    ok = syn:join(scope_all, "my-group", Pid, {recipient, TestPid, 10}),
+
+    %% retrieve
+    {Pid, {recipient, TestPid, 10}} = syn:member(scope_all, "my-group", Pid),
+    undefined = syn:member(scope_all, "my-group", PidOn1),
+    {'EXIT', {{invalid_scope, custom}, _}} = (catch syn:member(custom, "group", PidOn1)),
+
+    %% add custom handler for callbacks
+    syn:set_event_handler(syn_test_event_handler_callbacks),
+    rpc:call(SlaveNode1, syn, set_event_handler, [syn_test_event_handler_callbacks]),
+    rpc:call(SlaveNode2, syn, set_event_handler, [syn_test_event_handler_callbacks]),
+
+    %% errors
+    {error, undefined} = syn:update_member(scope_all, "my-group", PidOn1, fun(ExistingMeta) -> ExistingMeta end),
+    InvalidPid = list_to_pid("<0.9999.0>"),
+    {error, not_alive} = syn:update_member(scope_all, "my-group", InvalidPid, fun(ExistingMeta) -> ExistingMeta end),
+
+    %% update
+    {ok, {Pid, {recipient, TestPid, 20}}} = syn:update_member(scope_all, "my-group", Pid, fun({recipient, TestPid0, Count}) ->
+        {recipient, TestPid0, Count * 2}
+    end),
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        {Pid, {recipient, TestPid, 20}},
+        fun() -> syn:member(scope_all, "my-group", Pid) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        {Pid, {recipient, TestPid, 20}},
+        fun() -> rpc:call(SlaveNode1, syn, member, [scope_all, "my-group", Pid]) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        {Pid, {recipient, TestPid, 20}},
+        fun() -> rpc:call(SlaveNode2, syn, member, [scope_all, "my-group", Pid]) end
+    ),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_group_process_updated, LocalNode, scope_all, "my-group", Pid, 20, normal},
+        {on_group_process_updated, SlaveNode1, scope_all, "my-group", Pid, 20, normal},
+        {on_group_process_updated, SlaveNode2, scope_all, "my-group", Pid, 20, normal}
+    ]),
+
+    %% join on remote
+    ok = syn:join(scope_all, "my-group", PidOn1, {recipient, TestPid, 1000}),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_process_joined, LocalNode, scope_all, "my-group", PidOn1, 1000, normal},
+        {on_process_joined, SlaveNode1, scope_all, "my-group", PidOn1, 1000, normal},
+        {on_process_joined, SlaveNode2, scope_all, "my-group", PidOn1, 1000, normal}
+    ]),
+
+    %% update on remote
+    {ok, {PidOn1, {recipient, TestPid, 1001}}} = syn:update_member(scope_all, "my-group", PidOn1, fun({recipient, TestPid0, Count}) ->
+        {recipient, TestPid0, Count + 1}
+    end),
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        {PidOn1, {recipient, TestPid, 1001}},
+        fun() -> syn:member(scope_all, "my-group", PidOn1) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        {PidOn1, {recipient, TestPid, 1001}},
+        fun() -> rpc:call(SlaveNode1, syn, member, [scope_all, "my-group", PidOn1]) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        {PidOn1, {recipient, TestPid, 1001}},
+        fun() -> rpc:call(SlaveNode2, syn, member, [scope_all, "my-group", PidOn1]) end
+    ),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_group_process_updated, LocalNode, scope_all, "my-group", PidOn1, 1001, normal},
+        {on_group_process_updated, SlaveNode1, scope_all, "my-group", PidOn1, 1001, normal},
+        {on_group_process_updated, SlaveNode2, scope_all, "my-group", PidOn1, 1001, normal}
+    ]).
+
 four_nodes_concurrency(Config) ->
     %% get slaves
     SlaveNode1 = proplists:get_value(syn_slave_1, Config),