Browse Source

Add registry update.

Roberto Ostinelli 3 years ago
parent
commit
0be0cbf42c
4 changed files with 176 additions and 13 deletions
  1. 25 1
      src/syn.erl
  2. 2 0
      src/syn_pg.erl
  3. 49 10
      src/syn_registry.erl
  4. 100 2
      test/syn_registry_SUITE.erl

+ 25 - 1
src/syn.erl

@@ -134,7 +134,7 @@
 -export([set_event_handler/1]).
 %% registry
 -export([lookup/2]).
--export([register/3, register/4]).
+-export([register/3, register/4, update_registry/3]).
 -export([unregister/2]).
 -export([registry_count/1, registry_count/2]).
 -export([local_registry_count/1]).
@@ -344,6 +344,30 @@ register(Scope, Name, Pid) ->
 register(Scope, Name, Pid, Meta) ->
     syn_registry:register(Scope, Name, Pid, Meta).
 
+%% @doc Updates the registered Name metadata in the specified Scope.
+%%
+%% Atomically calls Fun with the current metadata, and stores the return value as new metadata.
+%%
+%% <h2>Examples</h2>
+%% <h3>Elixir</h3>
+%% ```
+%% iex> :syn.register(:devices, "SN-123-456789", self(), 10)
+%% :ok
+%% iex> :syn.update_registry(:devices, "area-1", fn existing_meta -> existing_meta * 2 end)
+%% {:ok, {#PID<0.105.0>, 20}}
+%% '''
+%% <h3>Erlang</h3>
+%% ```
+%% 1> syn:register(devices, "SN-123-456789", self(), 10).
+%% ok
+%% 2> syn:update_registry(devices, "SN-123-456789", fun(ExistingMeta) -> ExistingMeta * 2 end).
+%% {ok, {<0.69.0>, 20}}
+%% '''
+-spec update_registry(Scope :: atom(), Name :: term(), Fun :: function()) ->
+    {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
+update_registry(Scope, Name, Fun) ->
+    syn_registry:update(Scope, Name, Fun).
+
 %% @doc Unregisters a process from specified Scope.
 %%
 %% Possible error reasons:

+ 2 - 0
src/syn_pg.erl

@@ -156,6 +156,8 @@ join(Scope, GroupName, Pid, Meta) ->
 update_member(Scope, GroupName, Pid, Fun) when is_function(Fun) ->
     join_or_update(Scope, GroupName, Pid, Fun).
 
+-spec join_or_update(Scope :: atom(), GroupName :: term(), Pid :: pid(), MetaOrFun :: term() | function()) ->
+    {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
 join_or_update(Scope, GroupName, Pid, MetaOrFun) ->
     case syn_backbone:is_strict_mode() of
         true when Pid =/= self() ->

+ 49 - 10
src/syn_registry.erl

@@ -32,6 +32,7 @@
 -export([subcluster_nodes/1]).
 -export([lookup/2]).
 -export([register/4]).
+-export([update/3]).
 -export([unregister/2]).
 -export([count/1, count/2]).
 
@@ -83,23 +84,52 @@ lookup(Scope, Name) ->
 
 -spec register(Scope :: atom(), Name :: term(), Pid :: pid(), Meta :: term()) -> ok | {error, Reason :: term()}.
 register(Scope, Name, Pid, Meta) ->
+    case register_or_update(Scope, Name, Pid, Meta) of
+        {ok, _} -> ok;
+        {error, Reason} -> {error, Reason}
+    end.
+
+-spec update(Scope :: atom(), Name :: term(), Fun :: function()) ->
+    {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
+update(Scope, Name, Fun) ->
+    case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
+        undefined ->
+            error({invalid_scope, Scope});
+
+        TableByName ->
+            % get process' node
+            case find_registry_entry_by_name(Name, TableByName) of
+                undefined ->
+                    {error, undefined};
+
+                {Name, Pid, _, _, _, _} ->
+                    register_or_update(Scope, Name, Pid, Fun)
+            end
+    end.
+
+-spec register_or_update(Scope :: atom(), Name :: term(), Pid :: pid(), MetaOrFun :: term() | function()) ->
+    {ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
+register_or_update(Scope, Name, Pid, MetaOrFun) ->
     case syn_backbone:is_strict_mode() of
         true when Pid =/= self() ->
             {error, not_self};
 
         _ ->
             Node = node(Pid),
-            case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', register_on_node, node(), Name, Pid, Meta}) of
-                {ok, {CallbackMethod, Time, TableByName, TableByPid}} when Node =/= node() ->
+            case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', register_or_update_on_node, node(), Name, Pid, MetaOrFun}) of
+                {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}} when Node =/= node() ->
                     %% update table on caller node immediately so that subsequent calls have an updated registry
                     add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
                     %% callback
                     syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta, normal]),
                     %% return
-                    ok;
+                    {ok, {Pid, Meta}};
+
+                {ok, {_, Meta, _, _, _}} ->
+                    {ok, {Pid, Meta}};
 
-                {Response, _} ->
-                    Response
+                {{error, Reason}, _} ->
+                    {error, Reason}
             end
     end.
 
@@ -184,28 +214,36 @@ init(#state{
     {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
     {stop, Reason :: term(), Reply :: term(), #state{}} |
     {stop, Reason :: term(), #state{}}.
-handle_call({'3.0', register_on_node, RequesterNode, Name, Pid, Meta}, _From, #state{
+handle_call({'3.0', register_or_update_on_node, RequesterNode, Name, Pid, MetaOrFun}, _From, #state{
     table_by_name = TableByName,
     table_by_pid = TableByPid
 } = State) ->
     case is_process_alive(Pid) of
         true ->
             case find_registry_entry_by_name(Name, TableByName) of
+                undefined when is_function(MetaOrFun) ->
+                    {reply, {{error, undefined}, undefined}, State};
+
                 undefined ->
                     %% available
                     MRef = case find_monitor_for_pid(Pid, TableByPid) of
                         undefined -> erlang:monitor(process, Pid);  %% process is not monitored yet, add
                         MRef0 -> MRef0
                     end,
-                    do_register_on_node(Name, Pid, Meta, MRef, normal, RequesterNode, on_process_registered, State);
+                    do_register_on_node(Name, Pid, MetaOrFun, MRef, normal, RequesterNode, on_process_registered, State);
 
-                {Name, Pid, Meta, _, _, _} ->
+                {Name, Pid, TableMeta, _, MRef, _} when is_function(MetaOrFun) ->
+                    %% update with fun
+                    Meta = MetaOrFun(TableMeta),
+                    do_register_on_node(Name, Pid, Meta, MRef, normal, RequesterNode, on_registry_process_updated, State);
+
+                {Name, Pid, MetaOrFun, _, _, _} ->
                     %% same pid, same meta
                     {reply, {ok, noop}, State};
 
                 {Name, Pid, _, _, MRef, _} ->
                     %% same pid, different meta
-                    do_register_on_node(Name, Pid, Meta, MRef, normal, RequesterNode, on_registry_process_updated, State);
+                    do_register_on_node(Name, Pid, MetaOrFun, MRef, normal, RequesterNode, on_registry_process_updated, State);
 
                 _ ->
                     {reply, {{error, taken}, undefined}, State}
@@ -387,6 +425,7 @@ do_rebuild_monitors([{Name, Pid, Meta, Time} | T], NewMRefs, Scope, TableByName,
         reply,
         {ok, {
             CallbackMethod :: atom(),
+            Meta :: term(),
             Time :: non_neg_integer(),
             TableByName :: atom(),
             TableByPid :: atom()
@@ -406,7 +445,7 @@ do_register_on_node(Name, Pid, Meta, MRef, Reason, RequesterNode, CallbackMethod
     %% broadcast
     syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, [RequesterNode], State),
     %% return
-    {reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
+    {reply, {ok, {CallbackMethod, Meta, Time, TableByName, TableByPid}}, State}.
 
 -spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
 get_registry_tuples_for_node(Node, TableByName) ->

+ 100 - 2
test/syn_registry_SUITE.erl

@@ -43,7 +43,8 @@
     three_nodes_cluster_changes/1,
     three_nodes_cluster_conflicts/1,
     three_nodes_custom_event_handler_reg_unreg/1,
-    three_nodes_custom_event_handler_conflict_resolution/1
+    three_nodes_custom_event_handler_conflict_resolution/1,
+    three_nodes_update/1
 ]).
 -export([
     four_nodes_concurrency/1
@@ -96,7 +97,8 @@ groups() ->
             three_nodes_cluster_changes,
             three_nodes_cluster_conflicts,
             three_nodes_custom_event_handler_reg_unreg,
-            three_nodes_custom_event_handler_conflict_resolution
+            three_nodes_custom_event_handler_conflict_resolution,
+            three_nodes_update
         ]},
         {four_nodes_registry, [shuffle], [
             four_nodes_concurrency
@@ -1454,6 +1456,102 @@ three_nodes_custom_event_handler_conflict_resolution(Config) ->
         fun() -> rpc:call(SlaveNode2, erlang, is_process_alive, [PidOn2]) end
     ).
 
+three_nodes_update(Config) ->
+    %% get slaves
+    SlaveNode1 = proplists:get_value(syn_slave_1, Config),
+    SlaveNode2 = proplists:get_value(syn_slave_2, Config),
+
+    %% start syn
+    ok = syn:start(),
+    ok = rpc:call(SlaveNode1, syn, start, []),
+    ok = rpc:call(SlaveNode2, syn, start, []),
+
+    %% add scopes
+    ok = syn:add_node_to_scopes([scope_all]),
+    ok = rpc:call(SlaveNode1, syn, add_node_to_scopes, [[scope_all, scope_bc]]),
+    ok = rpc:call(SlaveNode2, syn, add_node_to_scopes, [[scope_all, scope_bc]]),
+
+    %% start processes
+    Pid = syn_test_suite_helper:start_process(),
+    PidOn1 = syn_test_suite_helper:start_process(SlaveNode1),
+
+    %% init
+    TestPid = self(),
+    LocalNode = node(),
+
+    %% register
+    ok = syn:register(scope_all, "my-proc", Pid, {recipient, TestPid, 10}),
+
+    %% add custom handler for resolution (using method call)
+    syn:set_event_handler(syn_test_event_handler_callbacks),
+    rpc:call(SlaveNode1, syn, set_event_handler, [syn_test_event_handler_callbacks]),
+    rpc:call(SlaveNode2, syn, set_event_handler, [syn_test_event_handler_callbacks]),
+
+    %% errors
+    {error, undefined} = syn:update_registry(scope_all, "unknown", fun(ExistingMeta) -> ExistingMeta end),
+
+    %% update
+    {ok, {Pid, {recipient, TestPid, 20}}} = syn:update_registry(scope_all, "my-proc", fun({recipient, TestPid0, Count}) ->
+        {recipient, TestPid0, Count * 2}
+    end),
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        {Pid, {recipient, TestPid, 20}},
+        fun() -> syn:lookup(scope_all, "my-proc") end
+    ),
+    syn_test_suite_helper:assert_wait(
+        {Pid, {recipient, TestPid, 20}},
+        fun() -> rpc:call(SlaveNode1, syn, lookup, [scope_all, "my-proc"]) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        {Pid, {recipient, TestPid, 20}},
+        fun() -> rpc:call(SlaveNode2, syn, lookup, [scope_all, "my-proc"]) end
+    ),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_registry_process_updated, LocalNode, scope_all, "my-proc", Pid, 20, normal},
+        {on_registry_process_updated, SlaveNode1, scope_all, "my-proc", Pid, 20, normal},
+        {on_registry_process_updated, SlaveNode2, scope_all, "my-proc", Pid, 20, normal}
+    ]),
+
+    %% register on remote
+    ok = syn:register(scope_all, "my-proc-on-1", PidOn1, {recipient, TestPid, 1000}),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_process_registered, LocalNode, scope_all, "my-proc-on-1", PidOn1, 1000, normal},
+        {on_process_registered, SlaveNode1, scope_all, "my-proc-on-1", PidOn1, 1000, normal},
+        {on_process_registered, SlaveNode2, scope_all, "my-proc-on-1", PidOn1, 1000, normal}
+    ]),
+
+    %% update on remote
+    {ok, {PidOn1, {recipient, TestPid, 1001}}} = syn:update_registry(scope_all, "my-proc-on-1", fun({recipient, TestPid0, Count}) ->
+        {recipient, TestPid0, Count + 1}
+    end),
+
+    %% retrieve
+    syn_test_suite_helper:assert_wait(
+        {PidOn1, {recipient, TestPid, 1001}},
+        fun() -> syn:lookup(scope_all, "my-proc-on-1") end
+    ),
+    syn_test_suite_helper:assert_wait(
+        {PidOn1, {recipient, TestPid, 1001}},
+        fun() -> rpc:call(SlaveNode1, syn, lookup, [scope_all, "my-proc-on-1"]) end
+    ),
+    syn_test_suite_helper:assert_wait(
+        {PidOn1, {recipient, TestPid, 1001}},
+        fun() -> rpc:call(SlaveNode2, syn, lookup, [scope_all, "my-proc-on-1"]) end
+    ),
+
+    %% check callbacks called
+    syn_test_suite_helper:assert_received_messages([
+        {on_registry_process_updated, LocalNode, scope_all, "my-proc-on-1", PidOn1, 1001, normal},
+        {on_registry_process_updated, SlaveNode1, scope_all, "my-proc-on-1", PidOn1, 1001, normal},
+        {on_registry_process_updated, SlaveNode2, scope_all, "my-proc-on-1", PidOn1, 1001, normal}
+    ]).
+
 four_nodes_concurrency(Config) ->
     %% get slaves
     SlaveNode1 = proplists:get_value(syn_slave_1, Config),