|
@@ -96,11 +96,11 @@ register(Scope, Name, Pid) when is_pid(Pid) ->
|
|
|
register(Scope, Name, Pid, Meta) ->
|
|
|
Node = node(Pid),
|
|
|
case syn_gen_scope:call(?MODULE, Node, Scope, {register_on_owner, node(), Name, Pid, Meta}) of
|
|
|
- {ok, {TablePid, TableMeta, Time, TableByName, TableByPid}} when Node =/= node() ->
|
|
|
+ {ok, {CallbackMethod, Time, TableByName, TableByPid}} when Node =/= node() ->
|
|
|
%% update table on caller node immediately so that subsequent calls have an updated registry
|
|
|
add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
- syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta}),
|
|
|
+ syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta]),
|
|
|
%% return
|
|
|
ok;
|
|
|
|
|
@@ -131,7 +131,7 @@ unregister(Scope, Name) ->
|
|
|
%% remove table on caller node immediately so that subsequent calls have an updated registry
|
|
|
remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
+ syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta]),
|
|
|
%% return
|
|
|
ok;
|
|
|
|
|
@@ -206,22 +206,26 @@ handle_call({register_on_owner, RequesterNode, Name, Pid, Meta}, _From, #state{
|
|
|
Time = erlang:system_time(),
|
|
|
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
- syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta}),
|
|
|
+ syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]),
|
|
|
%% broadcast
|
|
|
syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time}, [RequesterNode], State),
|
|
|
%% return
|
|
|
- {reply, {ok, {undefined, undefined, Time, TableByName, TableByPid}}, State};
|
|
|
+ {reply, {ok, {on_process_registered, Time, TableByName, TableByPid}}, State};
|
|
|
|
|
|
- {Name, Pid, TableMeta, _TableTime, MRef, _TableNode} ->
|
|
|
- %% same pid, possibly new meta or time, overwrite
|
|
|
+ {Name, Pid, Meta, _, _, _} ->
|
|
|
+ %% same pid, same meta
|
|
|
+ {reply, {ok, noop}, State};
|
|
|
+
|
|
|
+ {Name, Pid, _, _, MRef, _} ->
|
|
|
+ %% same pid, new meta
|
|
|
Time = erlang:system_time(),
|
|
|
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
- syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta}),
|
|
|
+ syn_event_handler:call_event_handler(on_registry_process_updated, [Scope, Name, Pid, Meta]),
|
|
|
%% broadcast
|
|
|
syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time}, State),
|
|
|
%% return
|
|
|
- {reply, {ok, {Pid, TableMeta, Time, TableByName, TableByPid}}, State};
|
|
|
+ {reply, {ok, {on_registry_process_updated, Time, TableByName, TableByPid}}, State};
|
|
|
|
|
|
_ ->
|
|
|
{reply, {{error, taken}, undefined}, State}
|
|
@@ -243,7 +247,7 @@ handle_call({unregister_on_owner, RequesterNode, Name, Pid}, _From, #state{
|
|
|
%% remove from table
|
|
|
remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
+ syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta]),
|
|
|
%% broadcast
|
|
|
syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, [RequesterNode], State),
|
|
|
%% return
|
|
@@ -279,7 +283,7 @@ handle_info({'3.0', sync_unregister, Name, Pid, Meta}, #state{
|
|
|
} = State) ->
|
|
|
remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
+ syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta]),
|
|
|
%% return
|
|
|
{noreply, State};
|
|
|
|
|
@@ -300,7 +304,7 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
|
|
|
%% remove from table
|
|
|
remove_from_local_table(Name, Pid, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta),
|
|
|
+ syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta]),
|
|
|
%% broadcast
|
|
|
syn_gen_scope:broadcast({'3.0', sync_unregister, Name, Pid, Meta}, State)
|
|
|
end, Entries)
|
|
@@ -469,7 +473,7 @@ purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =
|
|
|
RegistryTuples = get_registry_tuples_for_node(Node, TableByName),
|
|
|
spawn(fun() ->
|
|
|
lists:foreach(fun({Name, Pid, Meta, _Time}) ->
|
|
|
- syn_event_handler:do_on_process_unregistered(Scope, Name, Pid, Meta)
|
|
|
+ syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, Pid, Meta])
|
|
|
end, RegistryTuples)
|
|
|
end),
|
|
|
%% remove all from pid table
|
|
@@ -493,13 +497,17 @@ handle_registry_sync(Name, Pid, Meta, Time, #state{
|
|
|
%% no conflict
|
|
|
add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
|
%% callback
|
|
|
- syn_event_handler:do_on_process_registered(Scope, Name, {undefined, undefined}, {Pid, Meta});
|
|
|
+ syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]);
|
|
|
|
|
|
{Name, Pid, TableMeta, _TableTime, MRef, _TableNode} ->
|
|
|
%% same pid, more recent (because it comes from the same node, which means that it's sequential)
|
|
|
+ %% maybe updated meta or time only
|
|
|
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
|
- %% callback
|
|
|
- syn_event_handler:do_on_process_registered(Scope, Name, {Pid, TableMeta}, {Pid, Meta});
|
|
|
+ %% callback (call only if meta update)
|
|
|
+ case TableMeta =/= Meta of
|
|
|
+ true -> syn_event_handler:call_event_handler(on_registry_process_updated, [Scope, Name, Pid, Meta]);
|
|
|
+ _ -> ok
|
|
|
+ end;
|
|
|
|
|
|
{Name, TablePid, TableMeta, TableTime, TableMRef, _TableNode} when node(TablePid) =:= node() ->
|
|
|
%% current node runs a conflicting process -> resolve
|
|
@@ -513,8 +521,8 @@ handle_registry_sync(Name, Pid, Meta, Time, #state{
|
|
|
%% current node does not own any of the conflicting processes, update
|
|
|
update_local_table(Name, TablePid, {Pid, Meta, Time, undefined}, TableByName, TableByPid),
|
|
|
%% callbacks
|
|
|
- syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
|
|
|
- syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});
|
|
|
+ syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, TablePid, TableMeta]),
|
|
|
+ syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]);
|
|
|
|
|
|
{Name, _TablePid, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
|
|
|
%% race condition: incoming data is older, ignore
|
|
@@ -551,8 +559,8 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
|
|
|
%% kill
|
|
|
exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
|
|
|
%% callbacks
|
|
|
- syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta),
|
|
|
- syn_event_handler:do_on_process_registered(Scope, Name, {TablePid, TableMeta}, {Pid, Meta});
|
|
|
+ syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, TablePid, TableMeta]),
|
|
|
+ syn_event_handler:call_event_handler(on_process_registered, [Scope, Name, Pid, Meta]);
|
|
|
|
|
|
TablePid ->
|
|
|
%% -> we keep the local pid
|
|
@@ -575,5 +583,5 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
|
|
|
%% kill local, remote will be killed by other node performing the same resolve
|
|
|
exit(TablePid, {syn_resolve_kill, Name, TableMeta}),
|
|
|
%% callback
|
|
|
- syn_event_handler:do_on_process_unregistered(Scope, Name, TablePid, TableMeta)
|
|
|
+ syn_event_handler:call_event_handler(on_process_unregistered, [Scope, Name, TablePid, TableMeta])
|
|
|
end.
|