|
@@ -157,7 +157,7 @@ join(Scope, GroupName, Pid, Meta) ->
|
|
%% update table on caller node immediately so that subsequent calls have an updated registry
|
|
%% update table on caller node immediately so that subsequent calls have an updated registry
|
|
add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
%% callback
|
|
%% callback
|
|
- syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta]),
|
|
|
|
|
|
+ syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, normal]),
|
|
%% return
|
|
%% return
|
|
ok;
|
|
ok;
|
|
|
|
|
|
@@ -182,7 +182,7 @@ leave(Scope, GroupName, Pid) ->
|
|
%% remove table on caller node immediately so that subsequent calls have an updated registry
|
|
%% remove table on caller node immediately so that subsequent calls have an updated registry
|
|
remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
|
|
remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
|
|
%% callback
|
|
%% callback
|
|
- syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta]),
|
|
|
|
|
|
+ syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
|
|
%% return
|
|
%% return
|
|
ok;
|
|
ok;
|
|
|
|
|
|
@@ -343,14 +343,14 @@ handle_call({join_on_node, RequesterNode, GroupName, Pid, Meta}, _From, #state{
|
|
undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, create
|
|
undefined -> erlang:monitor(process, Pid); %% process is not monitored yet, create
|
|
MRef0 -> MRef0
|
|
MRef0 -> MRef0
|
|
end,
|
|
end,
|
|
- do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, on_process_joined, State);
|
|
|
|
|
|
+ do_join_on_node(GroupName, Pid, Meta, MRef, normal, RequesterNode, on_process_joined, State);
|
|
|
|
|
|
{{_, Meta}, _, _, _, _} ->
|
|
{{_, Meta}, _, _, _, _} ->
|
|
%% re-joined with same meta
|
|
%% re-joined with same meta
|
|
{ok, noop};
|
|
{ok, noop};
|
|
|
|
|
|
{{_, _}, _, _, MRef, _} ->
|
|
{{_, _}, _, _, MRef, _} ->
|
|
- do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, on_group_process_updated, State)
|
|
|
|
|
|
+ do_join_on_node(GroupName, Pid, Meta, MRef, normal, RequesterNode, on_group_process_updated, State)
|
|
end;
|
|
end;
|
|
|
|
|
|
false ->
|
|
false ->
|
|
@@ -372,9 +372,9 @@ handle_call({leave_on_node, RequesterNode, GroupName, Pid}, _From, #state{
|
|
%% remove from table
|
|
%% remove from table
|
|
remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
|
|
remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
|
|
%% callback
|
|
%% callback
|
|
- syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta]),
|
|
|
|
|
|
+ syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, normal]),
|
|
%% broadcast
|
|
%% broadcast
|
|
- syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta}, [RequesterNode], State),
|
|
|
|
|
|
+ syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, normal}, [RequesterNode], State),
|
|
%% return
|
|
%% return
|
|
{reply, {ok, {Meta, TableByPid}}, State}
|
|
{reply, {ok, {Meta, TableByPid}}, State}
|
|
end;
|
|
end;
|
|
@@ -390,19 +390,26 @@ handle_call(Request, From, #state{scope = Scope} = State) ->
|
|
{noreply, #state{}} |
|
|
{noreply, #state{}} |
|
|
{noreply, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
{noreply, #state{}, timeout() | hibernate | {continue, term()}} |
|
|
{stop, Reason :: term(), #state{}}.
|
|
{stop, Reason :: term(), #state{}}.
|
|
-handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time}, State) ->
|
|
|
|
- handle_groups_sync(GroupName, Pid, Meta, Time, State),
|
|
|
|
|
|
+handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, State) ->
|
|
|
|
+ handle_groups_sync(GroupName, Pid, Meta, Time, Reason, State),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
-handle_info({'3.0', sync_leave, GroupName, Pid, Meta}, #state{
|
|
|
|
|
|
+handle_info({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, #state{
|
|
scope = Scope,
|
|
scope = Scope,
|
|
table_by_name = TableByName,
|
|
table_by_name = TableByName,
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
} = State) ->
|
|
} = State) ->
|
|
- %% remove from table
|
|
|
|
- remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
|
|
|
|
- %% callback
|
|
|
|
- syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta]),
|
|
|
|
|
|
+ case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
|
|
+ undefined ->
|
|
|
|
+ %% not in table, nothing to do
|
|
|
|
+ ok;
|
|
|
|
+
|
|
|
|
+ _ ->
|
|
|
|
+ %% remove from table
|
|
|
|
+ remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
|
|
|
|
+ %% callback
|
|
|
|
+ syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason])
|
|
|
|
+ end,
|
|
%% return
|
|
%% return
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
@@ -423,9 +430,9 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
|
|
%% remove from table
|
|
%% remove from table
|
|
remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
|
|
remove_from_local_table(GroupName, Pid, TableByName, TableByPid),
|
|
%% callback
|
|
%% callback
|
|
- syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta]),
|
|
|
|
|
|
+ syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta, Reason]),
|
|
%% broadcast
|
|
%% broadcast
|
|
- syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta}, State)
|
|
|
|
|
|
+ syn_gen_scope:broadcast({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, State)
|
|
end, Entries)
|
|
end, Entries)
|
|
end,
|
|
end,
|
|
%% return
|
|
%% return
|
|
@@ -443,10 +450,10 @@ get_local_data(#state{table_by_name = TableByName}) ->
|
|
{ok, get_groups_tuples_for_node(node(), TableByName)}.
|
|
{ok, get_groups_tuples_for_node(node(), TableByName)}.
|
|
|
|
|
|
-spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
|
|
-spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
|
|
-save_remote_data(GroupsTuplesOfRemoteNode, State) ->
|
|
|
|
|
|
+save_remote_data(GroupsTuplesOfRemoteNode, #state{scope = Scope} = State) ->
|
|
%% insert tuples
|
|
%% insert tuples
|
|
lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
|
|
lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
|
|
- handle_groups_sync(GroupName, Pid, Meta, Time, State)
|
|
|
|
|
|
+ handle_groups_sync(GroupName, Pid, Meta, Time, {syn_remote_scope_node_up, Scope, node(Pid)}, State)
|
|
end, GroupsTuplesOfRemoteNode).
|
|
end, GroupsTuplesOfRemoteNode).
|
|
|
|
|
|
-spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
|
|
-spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
|
|
@@ -496,6 +503,7 @@ do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, #state{
|
|
Pid :: pid(),
|
|
Pid :: pid(),
|
|
Meta :: term(),
|
|
Meta :: term(),
|
|
MRef :: reference() | undefined,
|
|
MRef :: reference() | undefined,
|
|
|
|
+ Reason :: term(),
|
|
RequesterNode :: node(),
|
|
RequesterNode :: node(),
|
|
CallbackMethod :: atom(),
|
|
CallbackMethod :: atom(),
|
|
#state{}
|
|
#state{}
|
|
@@ -510,7 +518,7 @@ do_rebuild_monitors([{GroupName, Pid, Meta, Time} | T], NewMRefs, #state{
|
|
}},
|
|
}},
|
|
#state{}
|
|
#state{}
|
|
}.
|
|
}.
|
|
-do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, CallbackMethod, #state{
|
|
|
|
|
|
+do_join_on_node(GroupName, Pid, Meta, MRef, Reason, RequesterNode, CallbackMethod, #state{
|
|
scope = Scope,
|
|
scope = Scope,
|
|
table_by_name = TableByName,
|
|
table_by_name = TableByName,
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
@@ -519,9 +527,9 @@ do_join_on_node(GroupName, Pid, Meta, MRef, RequesterNode, CallbackMethod, #stat
|
|
%% add to local table
|
|
%% add to local table
|
|
add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
add_to_local_table(GroupName, Pid, Meta, Time, MRef, TableByName, TableByPid),
|
|
%% callback
|
|
%% callback
|
|
- syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta]),
|
|
|
|
|
|
+ syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, Reason]),
|
|
%% broadcast
|
|
%% broadcast
|
|
- syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time}, [RequesterNode], State),
|
|
|
|
|
|
+ syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, [RequesterNode], State),
|
|
%% return
|
|
%% return
|
|
{reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
|
|
{reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
|
|
|
|
|
|
@@ -611,7 +619,9 @@ purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/=
|
|
GroupsTuples = get_groups_tuples_for_node(Node, TableByName),
|
|
GroupsTuples = get_groups_tuples_for_node(Node, TableByName),
|
|
spawn(fun() ->
|
|
spawn(fun() ->
|
|
lists:foreach(fun({GroupName, Pid, Meta, _Time}) ->
|
|
lists:foreach(fun({GroupName, Pid, Meta, _Time}) ->
|
|
- syn_event_handler:call_event_handler(on_process_left, [Scope, GroupName, Pid, Meta])
|
|
|
|
|
|
+ syn_event_handler:call_event_handler(on_process_left,
|
|
|
|
+ [Scope, GroupName, Pid, Meta, {syn_remote_scope_node_down, Scope, Node}]
|
|
|
|
+ )
|
|
end, GroupsTuples)
|
|
end, GroupsTuples)
|
|
end),
|
|
end),
|
|
ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
|
|
ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
|
|
@@ -622,9 +632,10 @@ purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/=
|
|
Pid :: pid(),
|
|
Pid :: pid(),
|
|
Meta :: term(),
|
|
Meta :: term(),
|
|
Time :: non_neg_integer(),
|
|
Time :: non_neg_integer(),
|
|
|
|
+ Reason :: term(),
|
|
#state{}
|
|
#state{}
|
|
) -> any().
|
|
) -> any().
|
|
-handle_groups_sync(GroupName, Pid, Meta, Time, #state{
|
|
|
|
|
|
+handle_groups_sync(GroupName, Pid, Meta, Time, Reason, #state{
|
|
scope = Scope,
|
|
scope = Scope,
|
|
table_by_name = TableByName,
|
|
table_by_name = TableByName,
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
@@ -634,18 +645,19 @@ handle_groups_sync(GroupName, Pid, Meta, Time, #state{
|
|
%% new
|
|
%% new
|
|
add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
%% callback
|
|
%% callback
|
|
- syn_event_handler:call_event_handler(on_process_joined, [Scope, GroupName, Pid, Meta]);
|
|
|
|
|
|
+ syn_event_handler:call_event_handler(on_process_joined, [Scope, GroupName, Pid, Meta, Reason]);
|
|
|
|
|
|
- {{GroupName, Pid}, TableMeta, TableTime, _MRef, _TableNode} when Time > TableTime ->
|
|
|
|
|
|
+ {{GroupName, Pid}, TableMeta, TableTime, _, _} when Time > TableTime ->
|
|
%% maybe updated meta or time only
|
|
%% maybe updated meta or time only
|
|
add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
%% callback (call only if meta update)
|
|
%% callback (call only if meta update)
|
|
case TableMeta =/= Meta of
|
|
case TableMeta =/= Meta of
|
|
- true -> syn_event_handler:call_event_handler(on_group_process_updated, [Scope, GroupName, Pid, Meta]);
|
|
|
|
|
|
+ true ->
|
|
|
|
+ syn_event_handler:call_event_handler(on_group_process_updated, [Scope, GroupName, Pid, Meta, Reason]);
|
|
_ -> ok
|
|
_ -> ok
|
|
end;
|
|
end;
|
|
|
|
|
|
- {{GroupName, Pid}, _TableMeta, _TableTime, _TableMRef, _TableNode} ->
|
|
|
|
|
|
+ _ ->
|
|
%% race condition: incoming data is older, ignore
|
|
%% race condition: incoming data is older, ignore
|
|
ok
|
|
ok
|
|
end.
|
|
end.
|