|
@@ -66,7 +66,7 @@
|
|
|
%% API
|
|
|
%% ===================================================================
|
|
|
-spec start_link(Scope :: atom()) ->
|
|
|
- {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
|
|
|
+ {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
|
|
|
start_link(Scope) when is_atom(Scope) ->
|
|
|
syn_gen_scope:start_link(?MODULE, Scope).
|
|
|
|
|
@@ -82,11 +82,11 @@ members(GroupName) ->
|
|
|
members(Scope, GroupName) ->
|
|
|
do_get_members(Scope, GroupName, '_').
|
|
|
|
|
|
--spec is_member(GroupName :: any(), Pid :: pid()) -> boolean().
|
|
|
+-spec is_member(GroupName :: term(), Pid :: pid()) -> boolean().
|
|
|
is_member(GroupName, Pid) ->
|
|
|
is_member(?DEFAULT_SCOPE, GroupName, Pid).
|
|
|
|
|
|
--spec is_member(Scope :: atom(), GroupName :: any(), Pid :: pid()) -> boolean().
|
|
|
+-spec is_member(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> boolean().
|
|
|
is_member(Scope, GroupName, Pid) ->
|
|
|
case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
|
|
|
undefined ->
|
|
@@ -121,11 +121,11 @@ do_get_members(Scope, GroupName, NodeParam) ->
|
|
|
}])
|
|
|
end.
|
|
|
|
|
|
--spec is_local_member(GroupName :: any(), Pid :: pid()) -> boolean().
|
|
|
+-spec is_local_member(GroupName :: term(), Pid :: pid()) -> boolean().
|
|
|
is_local_member(GroupName, Pid) ->
|
|
|
is_local_member(?DEFAULT_SCOPE, GroupName, Pid).
|
|
|
|
|
|
--spec is_local_member(Scope :: atom(), GroupName :: any(), Pid :: pid()) -> boolean().
|
|
|
+-spec is_local_member(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> boolean().
|
|
|
is_local_member(Scope, GroupName, Pid) ->
|
|
|
case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
|
|
|
undefined ->
|
|
@@ -165,11 +165,11 @@ join(Scope, GroupName, Pid, Meta) ->
|
|
|
Response
|
|
|
end.
|
|
|
|
|
|
--spec leave(GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
|
|
|
+-spec leave(GroupName :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
|
|
|
leave(GroupName, Pid) ->
|
|
|
leave(?DEFAULT_SCOPE, GroupName, Pid).
|
|
|
|
|
|
--spec leave(Scope :: atom(), GroupName :: any(), Pid :: pid()) -> ok | {error, Reason :: any()}.
|
|
|
+-spec leave(Scope :: atom(), GroupName :: term(), Pid :: pid()) -> ok | {error, Reason :: term()}.
|
|
|
leave(Scope, GroupName, Pid) ->
|
|
|
case syn_backbone:get_table_name(syn_groups_by_name, Scope) of
|
|
|
undefined ->
|
|
@@ -263,25 +263,25 @@ do_group_names(Scope, NodeParam) ->
|
|
|
ordsets:to_list(Set)
|
|
|
end.
|
|
|
|
|
|
--spec publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
|
+-spec publish(GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
|
publish(GroupName, Message) ->
|
|
|
publish(?DEFAULT_SCOPE, GroupName, Message).
|
|
|
|
|
|
--spec publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
|
+-spec publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
|
publish(Scope, GroupName, Message) ->
|
|
|
Members = members(Scope, GroupName),
|
|
|
do_publish(Members, Message).
|
|
|
|
|
|
--spec local_publish(GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
|
+-spec local_publish(GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
|
local_publish(GroupName, Message) ->
|
|
|
local_publish(?DEFAULT_SCOPE, GroupName, Message).
|
|
|
|
|
|
--spec local_publish(Scope :: atom(), GroupName :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
|
+-spec local_publish(Scope :: atom(), GroupName :: term(), Message :: term()) -> {ok, RecipientCount :: non_neg_integer()}.
|
|
|
local_publish(Scope, GroupName, Message) ->
|
|
|
Members = local_members(Scope, GroupName),
|
|
|
do_publish(Members, Message).
|
|
|
|
|
|
--spec do_publish(Members :: [{Pid :: pid(), Meta :: term()}], Message :: any()) ->
|
|
|
+-spec do_publish(Members :: [{Pid :: pid(), Meta :: term()}], Message :: term()) ->
|
|
|
{ok, RecipientCount :: non_neg_integer()}.
|
|
|
do_publish(Members, Message) ->
|
|
|
lists:foreach(fun({Pid, _Meta}) ->
|
|
@@ -289,16 +289,16 @@ do_publish(Members, Message) ->
|
|
|
end, Members),
|
|
|
{ok, length(Members)}.
|
|
|
|
|
|
--spec multi_call(GroupName :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
|
|
|
+-spec multi_call(GroupName :: term(), Message :: term()) -> {[{pid(), Reply :: term()}], [BadPid :: pid()]}.
|
|
|
multi_call(GroupName, Message) ->
|
|
|
multi_call(?DEFAULT_SCOPE, GroupName, Message).
|
|
|
|
|
|
--spec multi_call(Scope :: atom(), GroupName :: any(), Message :: any()) -> {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
|
|
|
+-spec multi_call(Scope :: atom(), GroupName :: term(), Message :: term()) -> {[{pid(), Reply :: term()}], [BadPid :: pid()]}.
|
|
|
multi_call(Scope, GroupName, Message) ->
|
|
|
multi_call(Scope, GroupName, Message, ?DEFAULT_MULTI_CALL_TIMEOUT_MS).
|
|
|
|
|
|
--spec multi_call(Scope :: atom(), GroupName :: any(), Message :: any(), Timeout :: non_neg_integer()) ->
|
|
|
- {[{pid(), Reply :: any()}], [BadPid :: pid()]}.
|
|
|
+-spec multi_call(Scope :: atom(), GroupName :: term(), Message :: term(), Timeout :: non_neg_integer()) ->
|
|
|
+ {[{pid(), Reply :: term()}], [BadPid :: pid()]}.
|
|
|
multi_call(Scope, GroupName, Message, Timeout) ->
|
|
|
Self = self(),
|
|
|
Members = members(Scope, GroupName),
|
|
@@ -307,7 +307,7 @@ multi_call(Scope, GroupName, Message, Timeout) ->
|
|
|
end, Members),
|
|
|
collect_replies(orddict:from_list(Members)).
|
|
|
|
|
|
--spec multi_call_reply(CallerPid :: pid(), Reply :: any()) -> {syn_multi_call_reply, pid(), Reply :: any()}.
|
|
|
+-spec multi_call_reply(CallerPid :: pid(), Reply :: term()) -> {syn_multi_call_reply, pid(), Reply :: term()}.
|
|
|
multi_call_reply(CallerPid, Reply) ->
|
|
|
CallerPid ! {syn_multi_call_reply, self(), Reply}.
|
|
|
|
|
@@ -444,11 +444,11 @@ handle_info(Info, #state{scope = Scope} = State) ->
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
%% Data callbacks
|
|
|
%% ----------------------------------------------------------------------------------------------------------
|
|
|
--spec get_local_data(State :: term()) -> {ok, Data :: any()} | undefined.
|
|
|
+-spec get_local_data(State :: term()) -> {ok, Data :: term()} | undefined.
|
|
|
get_local_data(#state{table_by_name = TableByName}) ->
|
|
|
{ok, get_groups_tuples_for_node(node(), TableByName)}.
|
|
|
|
|
|
--spec save_remote_data(RemoteData :: any(), State :: term()) -> any().
|
|
|
+-spec save_remote_data(RemoteData :: term(), State :: term()) -> any().
|
|
|
save_remote_data(GroupsTuplesOfRemoteNode, State) ->
|
|
|
%% insert tuples
|
|
|
lists:foreach(fun({GroupName, Pid, Meta, Time}) ->
|
|
@@ -552,7 +552,7 @@ find_monitor_for_pid(Pid, TableByPid) when is_pid(Pid) ->
|
|
|
'$end_of_table' -> undefined
|
|
|
end.
|
|
|
|
|
|
--spec find_groups_entry_by_name_and_pid(GroupName :: any(), Pid :: pid(), TableByName :: atom()) ->
|
|
|
+-spec find_groups_entry_by_name_and_pid(GroupName :: term(), Pid :: pid(), TableByName :: atom()) ->
|
|
|
Entry :: syn_groups_entry() | undefined.
|
|
|
find_groups_entry_by_name_and_pid(GroupName, Pid, TableByName) ->
|
|
|
case ets:lookup(TableByName, {GroupName, Pid}) of
|
|
@@ -660,7 +660,7 @@ handle_groups_sync(GroupName, Pid, Meta, Time, #state{
|
|
|
CollectorPid :: pid(),
|
|
|
Pid :: pid(),
|
|
|
Meta :: term(),
|
|
|
- Message :: any(),
|
|
|
+ Message :: term(),
|
|
|
Timeout :: non_neg_integer()
|
|
|
) -> any().
|
|
|
multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
|
|
@@ -680,21 +680,21 @@ multi_call_and_receive(CollectorPid, Pid, Meta, Message, Timeout) ->
|
|
|
CollectorPid ! {bad_pid, Pid}
|
|
|
end.
|
|
|
|
|
|
--spec collect_replies(MembersOD :: orddict:orddict({pid(), Meta :: any()})) ->
|
|
|
+-spec collect_replies(MembersOD :: orddict:orddict({pid(), Meta :: term()})) ->
|
|
|
{
|
|
|
- Replies :: [{{pid(), Meta :: term()}, Reply :: any()}],
|
|
|
+ Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
|
|
|
BadReplies :: [{pid(), Meta :: term()}]
|
|
|
}.
|
|
|
collect_replies(MembersOD) ->
|
|
|
collect_replies(MembersOD, [], []).
|
|
|
|
|
|
-spec collect_replies(
|
|
|
- MembersOD :: orddict:orddict({pid(), Meta :: any()}),
|
|
|
- Replies :: [{{pid(), Meta :: term()}, Reply :: any()}],
|
|
|
+ MembersOD :: orddict:orddict({pid(), Meta :: term()}),
|
|
|
+ Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
|
|
|
BadReplies :: [{pid(), Meta :: term()}]
|
|
|
) ->
|
|
|
{
|
|
|
- Replies :: [{{pid(), Meta :: term()}, Reply :: any()}],
|
|
|
+ Replies :: [{{pid(), Meta :: term()}, Reply :: term()}],
|
|
|
BadReplies :: [{pid(), Meta :: term()}]
|
|
|
}.
|
|
|
collect_replies([], Replies, BadReplies) -> {Replies, BadReplies};
|