|
@@ -80,7 +80,7 @@ is_member(Scope, GroupName, Pid) ->
|
|
error({invalid_scope, Scope});
|
|
error({invalid_scope, Scope});
|
|
|
|
|
|
TableByName ->
|
|
TableByName ->
|
|
- case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
|
|
|
|
+ case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
undefined -> false;
|
|
undefined -> false;
|
|
_ -> true
|
|
_ -> true
|
|
end
|
|
end
|
|
@@ -111,7 +111,7 @@ is_local_member(Scope, GroupName, Pid) ->
|
|
error({invalid_scope, Scope});
|
|
error({invalid_scope, Scope});
|
|
|
|
|
|
TableByName ->
|
|
TableByName ->
|
|
- case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
|
|
|
|
+ case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
{{_, _}, _, _, _, Node} when Node =:= node() -> true;
|
|
{{_, _}, _, _, _, Node} when Node =:= node() -> true;
|
|
_ -> false
|
|
_ -> false
|
|
end
|
|
end
|
|
@@ -182,12 +182,12 @@ group_names_ordset(Scope, NodeParam) ->
|
|
error({invalid_scope, Scope});
|
|
error({invalid_scope, Scope});
|
|
|
|
|
|
TableByName ->
|
|
TableByName ->
|
|
- Groups = ets:select(TableByName, [{
|
|
|
|
|
|
+ DuplicatedGroups = ets:select(TableByName, [{
|
|
{{'$1', '_'}, '_', '_', '_', NodeParam},
|
|
{{'$1', '_'}, '_', '_', '_', NodeParam},
|
|
[],
|
|
[],
|
|
['$1']
|
|
['$1']
|
|
}]),
|
|
}]),
|
|
- ordsets:from_list(Groups)
|
|
|
|
|
|
+ ordsets:from_list(DuplicatedGroups)
|
|
end.
|
|
end.
|
|
|
|
|
|
-spec publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
-spec publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
@@ -240,7 +240,7 @@ init(#state{
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
}) ->
|
|
}) ->
|
|
%% purge remote & rebuild
|
|
%% purge remote & rebuild
|
|
- purge_groups_for_remote_nodes(Scope, TableByName, TableByPid),
|
|
|
|
|
|
+ purge_pg_for_remote_nodes(Scope, TableByName, TableByPid),
|
|
rebuild_monitors(Scope, TableByName, TableByPid),
|
|
rebuild_monitors(Scope, TableByName, TableByPid),
|
|
%% init
|
|
%% init
|
|
HandlerState = #{},
|
|
HandlerState = #{},
|
|
@@ -262,7 +262,7 @@ handle_call({'3.0', join_on_node, RequesterNode, GroupName, Pid, Meta}, _From, #
|
|
} = State) ->
|
|
} = State) ->
|
|
case is_process_alive(Pid) of
|
|
case is_process_alive(Pid) of
|
|
true ->
|
|
true ->
|
|
- case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
|
|
|
|
+ case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
undefined ->
|
|
undefined ->
|
|
%% add
|
|
%% add
|
|
MRef = case find_monitor_for_pid(Pid, TableByPid) of
|
|
MRef = case find_monitor_for_pid(Pid, TableByPid) of
|
|
@@ -288,7 +288,7 @@ handle_call({'3.0', leave_on_node, RequesterNode, GroupName, Pid}, _From, #state
|
|
table_by_name = TableByName,
|
|
table_by_name = TableByName,
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
} = State) ->
|
|
} = State) ->
|
|
- case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
|
|
|
|
+ case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
undefined ->
|
|
undefined ->
|
|
{reply, {{error, not_in_group}, undefined}, State};
|
|
{reply, {{error, not_in_group}, undefined}, State};
|
|
|
|
|
|
@@ -319,7 +319,7 @@ handle_call(Request, From, #state{scope = Scope} = State) ->
|
|
handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) ->
|
|
handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) ->
|
|
case maps:is_key(node(Pid), NodesMap) of
|
|
case maps:is_key(node(Pid), NodesMap) of
|
|
true ->
|
|
true ->
|
|
- handle_groups_sync(GroupName, Pid, Meta, Time, Reason, State);
|
|
|
|
|
|
+ handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State);
|
|
|
|
|
|
false ->
|
|
false ->
|
|
%% ignore, race condition
|
|
%% ignore, race condition
|
|
@@ -332,7 +332,7 @@ handle_info({'3.0', sync_leave, GroupName, Pid, Meta, Reason}, #state{
|
|
table_by_name = TableByName,
|
|
table_by_name = TableByName,
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
} = State) ->
|
|
} = State) ->
|
|
- case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
|
|
|
|
+ case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
undefined ->
|
|
undefined ->
|
|
%% not in table, nothing to do
|
|
%% not in table, nothing to do
|
|
ok;
|
|
ok;
|
|
@@ -351,7 +351,7 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
|
|
table_by_name = TableByName,
|
|
table_by_name = TableByName,
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
} = State) ->
|
|
} = State) ->
|
|
- case find_groups_entries_by_pid(Pid, TableByPid) of
|
|
|
|
|
|
+ case find_pg_entries_by_pid(Pid, TableByPid) of
|
|
[] ->
|
|
[] ->
|
|
error_logger:warning_msg(
|
|
error_logger:warning_msg(
|
|
"SYN[~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
|
|
"SYN[~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
|
|
@@ -380,14 +380,14 @@ handle_info(Info, #state{scope = Scope} = State) ->
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
-spec get_local_data(State :: term()) -> {ok, Data :: term()} | undefined.
|
|
-spec get_local_data(State :: term()) -> {ok, Data :: term()} | undefined.
|
|
get_local_data(#state{table_by_name = TableByName}) ->
|
|
get_local_data(#state{table_by_name = TableByName}) ->
|
|
- {ok, get_groups_tuples_for_node(node(), TableByName)}.
|
|
|
|
|
|
+ {ok, get_pg_tuples_for_node(node(), TableByName)}.
|
|
|
|
|
|
-spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
|
|
-spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
|
|
-save_remote_data(GroupsTuplesOfRemoteNode, #state{scope = Scope} = State) ->
|
|
|
|
|
|
+save_remote_data(PgTuplesOfRemoteNode, #state{scope = Scope} = State) ->
|
|
%% insert tuples
|
|
%% insert tuples
|
|
lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
|
|
lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
|
|
- handle_groups_sync(GroupName, Pid, Meta, Time, {syn_remote_scope_node_up, Scope, node(Pid)}, State)
|
|
|
|
- end, GroupsTuplesOfRemoteNode).
|
|
|
|
|
|
+ handle_pg_sync(GroupName, Pid, Meta, Time, {syn_remote_scope_node_up, Scope, node(Pid)}, State)
|
|
|
|
+ end, PgTuplesOfRemoteNode).
|
|
|
|
|
|
-spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
|
|
-spec purge_local_data_for_node(Node :: node(), State :: term()) -> any().
|
|
purge_local_data_for_node(Node, #state{
|
|
purge_local_data_for_node(Node, #state{
|
|
@@ -395,15 +395,15 @@ purge_local_data_for_node(Node, #state{
|
|
table_by_name = TableByName,
|
|
table_by_name = TableByName,
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
}) ->
|
|
}) ->
|
|
- purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid).
|
|
|
|
|
|
+ purge_pg_for_remote_node(Scope, Node, TableByName, TableByPid).
|
|
|
|
|
|
%% ===================================================================
|
|
%% ===================================================================
|
|
%% Internal
|
|
%% Internal
|
|
%% ===================================================================
|
|
%% ===================================================================
|
|
-spec rebuild_monitors(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> ok.
|
|
-spec rebuild_monitors(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> ok.
|
|
rebuild_monitors(Scope, TableByName, TableByPid) ->
|
|
rebuild_monitors(Scope, TableByName, TableByPid) ->
|
|
- GroupsTuples = get_groups_tuples_for_node(node(), TableByName),
|
|
|
|
- do_rebuild_monitors(GroupsTuples, #{}, Scope, TableByName, TableByPid).
|
|
|
|
|
|
+ PgTuples = get_pg_tuples_for_node(node(), TableByName),
|
|
|
|
+ do_rebuild_monitors(PgTuples, #{}, Scope, TableByName, TableByPid).
|
|
|
|
|
|
-spec do_rebuild_monitors(
|
|
-spec do_rebuild_monitors(
|
|
[syn_pg_tuple()],
|
|
[syn_pg_tuple()],
|
|
@@ -471,8 +471,8 @@ do_join_on_node(GroupName, Pid, Meta, MRef, Reason, RequesterNode, CallbackMetho
|
|
%% return
|
|
%% return
|
|
{reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
|
|
{reply, {ok, {CallbackMethod, Time, TableByName, TableByPid}}, State}.
|
|
|
|
|
|
--spec get_groups_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_pg_tuple()].
|
|
|
|
-get_groups_tuples_for_node(Node, TableByName) ->
|
|
|
|
|
|
+-spec get_pg_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_pg_tuple()].
|
|
|
|
+get_pg_tuples_for_node(Node, TableByName) ->
|
|
ets:select(TableByName, [{
|
|
ets:select(TableByName, [{
|
|
{{'$1', '$2'}, '$3', '$4', '_', Node},
|
|
{{'$1', '$2'}, '$3', '$4', '_', Node},
|
|
[],
|
|
[],
|
|
@@ -492,16 +492,16 @@ find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
|
|
'$end_of_table' -> undefined
|
|
'$end_of_table' -> undefined
|
|
end.
|
|
end.
|
|
|
|
|
|
--spec find_groups_entry_by_name_and_pid(GroupName :: term(), Pid :: pid(), TableByName :: atom()) ->
|
|
|
|
|
|
+-spec find_pg_entry_by_name_and_pid(GroupName :: term(), Pid :: pid(), TableByName :: atom()) ->
|
|
Entry :: syn_pg_entry() | undefined.
|
|
Entry :: syn_pg_entry() | undefined.
|
|
-find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
|
|
|
|
|
|
+find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
|
|
case ets:lookup(TableByName, {GroupName, Pid}) of
|
|
case ets:lookup(TableByName, {GroupName, Pid}) of
|
|
[] -> undefined;
|
|
[] -> undefined;
|
|
[Entry] -> Entry
|
|
[Entry] -> Entry
|
|
end.
|
|
end.
|
|
|
|
|
|
--spec find_groups_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupEntries :: [syn_pg_entry()].
|
|
|
|
-find_groups_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
|
|
|
|
|
|
+-spec find_pg_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> GroupEntries :: [syn_pg_entry()].
|
|
|
|
+find_pg_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
|
|
ets:select(TableByPid, [{
|
|
ets:select(TableByPid, [{
|
|
{{Pid, '_'}, '_', '_', '_', '_'},
|
|
{{Pid, '_'}, '_', '_', '_', '_'},
|
|
[],
|
|
[],
|
|
@@ -551,34 +551,34 @@ remove_from_local_table(GroupName, Pid, TableByName, TableByPid) ->
|
|
true = ets:delete(TableByName, {GroupName, Pid}),
|
|
true = ets:delete(TableByName, {GroupName, Pid}),
|
|
true = ets:delete(TableByPid, {Pid, GroupName}).
|
|
true = ets:delete(TableByPid, {Pid, GroupName}).
|
|
|
|
|
|
--spec purge_groups_for_remote_nodes(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> any().
|
|
|
|
-purge_groups_for_remote_nodes(Scope, TableByName, TableByPid) ->
|
|
|
|
|
|
+-spec purge_pg_for_remote_nodes(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> any().
|
|
|
|
+purge_pg_for_remote_nodes(Scope, TableByName, TableByPid) ->
|
|
LocalNode = node(),
|
|
LocalNode = node(),
|
|
- RemoteNodesWithDoubles = ets:select(TableByName, [{
|
|
|
|
|
|
+ DuplicatedRemoteNodes = ets:select(TableByName, [{
|
|
{{'_', '_'}, '_', '_', '_', '$6'},
|
|
{{'_', '_'}, '_', '_', '_', '$6'},
|
|
[{'=/=', '$6', LocalNode}],
|
|
[{'=/=', '$6', LocalNode}],
|
|
['$6']
|
|
['$6']
|
|
}]),
|
|
}]),
|
|
- RemoteNodes = ordsets:from_list(RemoteNodesWithDoubles),
|
|
|
|
|
|
+ RemoteNodes = ordsets:from_list(DuplicatedRemoteNodes),
|
|
ordsets:fold(fun(RemoteNode, _) ->
|
|
ordsets:fold(fun(RemoteNode, _) ->
|
|
- purge_groups_for_remote_node(Scope, RemoteNode, TableByName, TableByPid)
|
|
|
|
|
|
+ purge_pg_for_remote_node(Scope, RemoteNode, TableByName, TableByPid)
|
|
end, undefined, RemoteNodes).
|
|
end, undefined, RemoteNodes).
|
|
|
|
|
|
--spec purge_groups_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
|
|
|
|
-purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
|
|
|
|
|
|
+-spec purge_pg_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
|
|
|
|
+purge_pg_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
|
|
%% loop elements for callback in a separate process to free scope process
|
|
%% loop elements for callback in a separate process to free scope process
|
|
- GroupsTuples = get_groups_tuples_for_node(Node, TableByName),
|
|
|
|
|
|
+ PgTuples = get_pg_tuples_for_node(Node, TableByName),
|
|
spawn(fun() ->
|
|
spawn(fun() ->
|
|
lists:foreach(fun({GroupName, Pid, Meta, _Time}) ->
|
|
lists:foreach(fun({GroupName, Pid, Meta, _Time}) ->
|
|
syn_event_handler:call_event_handler(on_process_left,
|
|
syn_event_handler:call_event_handler(on_process_left,
|
|
[Scope, GroupName, Pid, Meta, {syn_remote_scope_node_down, Scope, Node}]
|
|
[Scope, GroupName, Pid, Meta, {syn_remote_scope_node_down, Scope, Node}]
|
|
)
|
|
)
|
|
- end, GroupsTuples)
|
|
|
|
|
|
+ end, PgTuples)
|
|
end),
|
|
end),
|
|
ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
|
|
ets:match_delete(TableByName, {{'_', '_'}, '_', '_', '_', Node}),
|
|
ets:match_delete(TableByPid, {{'_', '_'}, '_', '_', '_', Node}).
|
|
ets:match_delete(TableByPid, {{'_', '_'}, '_', '_', '_', Node}).
|
|
|
|
|
|
--spec handle_groups_sync(
|
|
|
|
|
|
+-spec handle_pg_sync(
|
|
GroupName :: term(),
|
|
GroupName :: term(),
|
|
Pid :: pid(),
|
|
Pid :: pid(),
|
|
Meta :: term(),
|
|
Meta :: term(),
|
|
@@ -586,12 +586,12 @@ purge_groups_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/=
|
|
Reason :: term(),
|
|
Reason :: term(),
|
|
#state{}
|
|
#state{}
|
|
) -> any().
|
|
) -> any().
|
|
-handle_groups_sync(GroupName, Pid, Meta, Time, Reason, #state{
|
|
|
|
|
|
+handle_pg_sync(GroupName, Pid, Meta, Time, Reason, #state{
|
|
scope = Scope,
|
|
scope = Scope,
|
|
table_by_name = TableByName,
|
|
table_by_name = TableByName,
|
|
table_by_pid = TableByPid
|
|
table_by_pid = TableByPid
|
|
}) ->
|
|
}) ->
|
|
- case find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
|
|
|
|
+ case find_pg_entry_by_name_and_pid(GroupName, Pid, TableByName) of
|
|
undefined ->
|
|
undefined ->
|
|
%% new
|
|
%% new
|
|
add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
|
|
add_to_local_table(GroupName, Pid, Meta, Time, undefined, TableByName, TableByPid),
|