|
@@ -39,7 +39,7 @@ process_register(#user{username=U} = RegisterData0) ->
|
|
accounts:create_account(U),
|
|
accounts:create_account(U),
|
|
{ok, DefaultQuota} = kvs:get(config, "accounts/default_quota", 300),
|
|
{ok, DefaultQuota} = kvs:get(config, "accounts/default_quota", 300),
|
|
accounts:transaction(U, quota, DefaultQuota, #tx_default_assignment{}),
|
|
accounts:transaction(U, quota, DefaultQuota, #tx_default_assignment{}),
|
|
- init_mq(U, []),
|
|
|
|
|
|
+ init_mq(U),
|
|
{ok, U}.
|
|
{ok, U}.
|
|
|
|
|
|
check_username(Name, FbId) ->
|
|
check_username(Name, FbId) ->
|
|
@@ -66,17 +66,14 @@ get({email, Email}) -> kvs:user_by_email(Email);
|
|
get(UId) -> kvs:get(user, UId).
|
|
get(UId) -> kvs:get(user, UId).
|
|
|
|
|
|
subscribe(Who, Whom) ->
|
|
subscribe(Who, Whom) ->
|
|
- case is_user_blocked(Who, Whom) of
|
|
|
|
- false -> Record = #subscription{key={Who,Whom}, who = Who, whom = Whom},
|
|
|
|
- kvs:put(Record),
|
|
|
|
- subscribe_user_mq(user, Who, Whom);
|
|
|
|
- true -> do_nothing
|
|
|
|
- end.
|
|
|
|
|
|
+ Record = #subscription{key={Who,Whom}, who = Who, whom = Whom},
|
|
|
|
+ kvs:put(Record),
|
|
|
|
+ subscription_mq(user, add, Who, Whom).
|
|
|
|
|
|
unsubscribe(Who, Whom) ->
|
|
unsubscribe(Who, Whom) ->
|
|
case subscribed(Who, Whom) of
|
|
case subscribed(Who, Whom) of
|
|
true -> kvs:delete(subscription, {Who, Whom}),
|
|
true -> kvs:delete(subscription, {Who, Whom}),
|
|
- remove_subscription_mq(user, Who, Whom);
|
|
|
|
|
|
+ subscription_mq(user, remove, Who, Whom);
|
|
false -> skip end.
|
|
false -> skip end.
|
|
|
|
|
|
subscriptions(undefined)-> [];
|
|
subscriptions(undefined)-> [];
|
|
@@ -88,26 +85,6 @@ subscribed(Who, Whom) ->
|
|
{ok, _} -> true;
|
|
{ok, _} -> true;
|
|
_ -> false end.
|
|
_ -> false end.
|
|
|
|
|
|
-block(Who, Whom) ->
|
|
|
|
- ?INFO("~w:block_user/2 Who=~p Whom=~p", [?MODULE, Who, Whom]),
|
|
|
|
- unsubscribe(Who, Whom),
|
|
|
|
- kvs:block_user(Who, Whom),
|
|
|
|
- nsx_msg:notify_user_block(Who, Whom).
|
|
|
|
-
|
|
|
|
-unblock(Who, Whom) ->
|
|
|
|
- ?INFO("~w:unblock_user/2 Who=~p Whom=~p", [?MODULE, Who, Whom]),
|
|
|
|
- kvs:unblock_user(Who, Whom),
|
|
|
|
- nsx_msg:notify_user_unblock(Who, Whom).
|
|
|
|
-
|
|
|
|
-blocked_users(UserId) -> kvs:list_blocks(UserId).
|
|
|
|
-
|
|
|
|
-get_blocked_users_feed_id(UserId) ->
|
|
|
|
- UsersId = kvs:list_blocks(UserId),
|
|
|
|
- Users = kvs:select(user, fun(#user{username=U})-> lists:member(U, UsersId) end),
|
|
|
|
- {UsersId, [Fid || #user{feed=Fid} <- Users]}.
|
|
|
|
-
|
|
|
|
-is_user_blocked(Who, Whom) -> kvs:is_user_blocked(Who,Whom).
|
|
|
|
-
|
|
|
|
update_user(#user{username=UId,name=Name,surname=Surname} = NewUser) ->
|
|
update_user(#user{username=UId,name=Name,surname=Surname} = NewUser) ->
|
|
OldUser = case kvs:get(user,UId) of
|
|
OldUser = case kvs:get(user,UId) of
|
|
{error,notfound} -> NewUser;
|
|
{error,notfound} -> NewUser;
|
|
@@ -119,9 +96,7 @@ update_user(#user{username=UId,name=Name,surname=Surname} = NewUser) ->
|
|
false -> kvs:update_user_name(UId,Name,Surname)
|
|
false -> kvs:update_user_name(UId,Name,Surname)
|
|
end.
|
|
end.
|
|
|
|
|
|
-subscribe_user_mq(Type, MeId, ToId) -> process_subscription_mq(Type, add, MeId, ToId).
|
|
|
|
-remove_subscription_mq(Type, MeId, ToId) -> process_subscription_mq(Type, delete, MeId, ToId).
|
|
|
|
-process_subscription_mq(Type, Action, MeId, ToId) ->
|
|
|
|
|
|
+subscription_mq(Type, Action, MeId, ToId) ->
|
|
{ok, Channel} = mqs:open([]),
|
|
{ok, Channel} = mqs:open([]),
|
|
Routes = case Type of
|
|
Routes = case Type of
|
|
user -> rk_user_feed(ToId);
|
|
user -> rk_user_feed(ToId);
|
|
@@ -129,11 +104,12 @@ process_subscription_mq(Type, Action, MeId, ToId) ->
|
|
end,
|
|
end,
|
|
case Action of
|
|
case Action of
|
|
add -> bind_user_exchange(Channel, MeId, Routes);
|
|
add -> bind_user_exchange(Channel, MeId, Routes);
|
|
- delete -> catch(unbind_user_exchange(Channel, MeId, Routes))
|
|
|
|
|
|
+ remove -> unbind_user_exchange(Channel, MeId, Routes)
|
|
end,
|
|
end,
|
|
mqs_channel:close(Channel).
|
|
mqs_channel:close(Channel).
|
|
|
|
|
|
-init_mq(User, Groups) ->
|
|
|
|
|
|
+init_mq(User=#user{}) ->
|
|
|
|
+ Groups = groups:list_groups_per_user(User),
|
|
?INFO("~p init mq. users: ~p", [User, Groups]),
|
|
?INFO("~p init mq. users: ~p", [User, Groups]),
|
|
UserExchange = ?USER_EXCHANGE(User),
|
|
UserExchange = ?USER_EXCHANGE(User),
|
|
ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
|
|
ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
|
|
@@ -141,83 +117,70 @@ init_mq(User, Groups) ->
|
|
?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
|
|
?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
|
|
mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions), ?INFO("Created OK"),
|
|
mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions), ?INFO("Created OK"),
|
|
Relations = build_user_relations(User, Groups),
|
|
Relations = build_user_relations(User, Groups),
|
|
- [bind_user_exchange(Channel, User, RK) || RK <- [rk([feed, delete, User])|Relations]],
|
|
|
|
- mqs_channel:close(Channel),
|
|
|
|
- ok.
|
|
|
|
-
|
|
|
|
-init_mq_for_user(User) -> init_mq(User, groups:list_groups_per_user(User) ).
|
|
|
|
-
|
|
|
|
-build_user_relations(User, Groups) ->
|
|
|
|
- %% Feed Keys. Subscribe for self events, system and groups events
|
|
|
|
- %% feed.FeedOwnerType.FeedOwnerId.ElementType.ElementId.Action
|
|
|
|
- %% feed.system.ElementType.Action
|
|
|
|
- [rk_user_feed(User),
|
|
|
|
- rk( [db, user, User, put] ),
|
|
|
|
- rk( [subscription, user, User, add_to_group]),
|
|
|
|
- rk( [subscription, user, User, remove_from_group]),
|
|
|
|
- rk( [subscription, user, User, leave_group]),
|
|
|
|
- rk( [login, user, User, update_after_login]),
|
|
|
|
- rk( [likes, user, User, add_like]),
|
|
|
|
- rk( [personal_score, user, User, add]),
|
|
|
|
- rk( [feed, user, User, count_entry_in_statistics] ),
|
|
|
|
- rk( [feed, user, User, count_comment_in_statistics] ),
|
|
|
|
- rk( [feed, user, User, post_note] ),
|
|
|
|
- rk( [subscription, user, User, subscribe_user]),
|
|
|
|
- rk( [subscription, user, User, remove_subscribe]),
|
|
|
|
- rk( [subscription, user, User, set_user_game_status]),
|
|
|
|
- rk( [subscription, user, User, update_user]),
|
|
|
|
- rk( [subscription, user, User, block_user]),
|
|
|
|
- rk( [subscription, user, User, unblock_user]),
|
|
|
|
- rk( [affiliates, user, User, create_affiliate]),
|
|
|
|
- rk( [affiliates, user, User, delete_affiliate]),
|
|
|
|
- rk( [affiliates, user, User, enable_to_look_details]),
|
|
|
|
- rk( [affiliates, user, User, disable_to_look_details]),
|
|
|
|
- rk( [purchase, user, User, set_purchase_external_id]),
|
|
|
|
- rk( [purchase, user, User, set_purchase_state]),
|
|
|
|
- rk( [purchase, user, User, set_purchase_info]),
|
|
|
|
- rk( [purchase, user, User, add_purchase]),
|
|
|
|
- rk( [transaction, user, User, add_transaction]),
|
|
|
|
- rk( [invite, user, User, add_invite_to_issuer]),
|
|
|
|
- rk( [tournaments, user, User, create]),
|
|
|
|
- rk( [tournaments, user, User, create_and_join]),
|
|
|
|
- rk( [gifts, user, User, buy_gift]),
|
|
|
|
- rk( [gifts, user, User, give_gift]),
|
|
|
|
- rk( [gifts, user, User, mark_gift_as_deliving]),
|
|
|
|
- rk( [feed, system, '*', '*']) |
|
|
|
|
- [rk_group_feed(G) || G <- Groups]].
|
|
|
|
|
|
+ [bind_user_exchange(Channel, User, RK) || RK <- Relations],
|
|
|
|
+ mqs_channel:close(Channel);
|
|
|
|
|
|
-bind_user_exchange(Channel, User, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
|
|
|
|
-unbind_user_exchange(Channel, User, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
|
|
|
|
-bind_group_exchange(Channel, Group, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
|
|
|
|
-unbind_group_exchange(Channel, Group, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
|
|
|
|
-
|
|
|
|
-init_mq_for_group(Group) ->
|
|
|
|
|
|
+init_mq(Group=#group{}) ->
|
|
GroupExchange = ?GROUP_EXCHANGE(Group),
|
|
GroupExchange = ?GROUP_EXCHANGE(Group),
|
|
- ExchangeOptions = [{type, <<"fanout">>},
|
|
|
|
- durable,
|
|
|
|
- {auto_delete, false}],
|
|
|
|
|
|
+ ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
|
|
{ok, Channel} = mqs:open([]),
|
|
{ok, Channel} = mqs:open([]),
|
|
ok = mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
|
|
ok = mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
|
|
Relations = build_group_relations(Group),
|
|
Relations = build_group_relations(Group),
|
|
[bind_group_exchange(Channel, Group, RK) || RK <- Relations],
|
|
[bind_group_exchange(Channel, Group, RK) || RK <- Relations],
|
|
- mqs_channel:close(Channel),
|
|
|
|
- ok.
|
|
|
|
-
|
|
|
|
-build_group_relations(Group) ->
|
|
|
|
- [
|
|
|
|
- rk( [db, group, Group, put] ),
|
|
|
|
- rk( [db, group, Group, update_group] ),
|
|
|
|
- rk( [db, group, Group, remove_group] ),
|
|
|
|
- rk( [likes, group, Group, add_like]), % for comet mostly
|
|
|
|
- rk( [feed, delete, Group] ),
|
|
|
|
- rk( [feed, group, Group, '*', '*', '*'] )
|
|
|
|
|
|
+ mqs_channel:close(Channel).
|
|
|
|
+
|
|
|
|
+build_user_relations(User, Groups) -> [
|
|
|
|
+ rk( [db, user, User, put] ),
|
|
|
|
+ rk( [subscription, user, User, add_to_group]),
|
|
|
|
+ rk( [subscription, user, User, remove_from_group]),
|
|
|
|
+ rk( [subscription, user, User, leave_group]),
|
|
|
|
+ rk( [login, user, User, update_after_login]),
|
|
|
|
+ rk( [likes, user, User, add_like]),
|
|
|
|
+ rk( [feed, delete, User]),
|
|
|
|
+ rk( [feed, user, User, '*', '*', '*']),
|
|
|
|
+ rk( [feed, user, User, count_entry_in_statistics] ),
|
|
|
|
+ rk( [feed, user, User, count_comment_in_statistics] ),
|
|
|
|
+ rk( [feed, user, User, post_note] ),
|
|
|
|
+ rk( [subscription, user, User, subscribe_user]),
|
|
|
|
+ rk( [subscription, user, User, remove_subscribe]),
|
|
|
|
+ rk( [subscription, user, User, set_user_game_status]),
|
|
|
|
+ rk( [subscription, user, User, update_user]),
|
|
|
|
+ rk( [subscription, user, User, block_user]),
|
|
|
|
+ rk( [subscription, user, User, unblock_user]),
|
|
|
|
+ rk( [payment, user, User, set_purchase_external_id]),
|
|
|
|
+ rk( [payment, user, User, set_purchase_state]),
|
|
|
|
+ rk( [payment, user, User, set_purchase_info]),
|
|
|
|
+ rk( [payment, user, User, add]),
|
|
|
|
+ rk( [transaction, user, User, add]),
|
|
|
|
+ rk( [invite, user, User, add]),
|
|
|
|
+ rk( [meeting, user, User, create]),
|
|
|
|
+ rk( [meeting, user, User, join]),
|
|
|
|
+ rk( [purchase, user, User, buy_gift]),
|
|
|
|
+ rk( [purchase, user, User, give_gift]),
|
|
|
|
+ rk( [purchase, user, User, mark_gift_as_deliving]),
|
|
|
|
+ rk( [feed, system, '*', '*']) |
|
|
|
|
+ [rk_group_feed(G) || G <- Groups]
|
|
].
|
|
].
|
|
|
|
|
|
|
|
+build_group_relations(Group) -> [
|
|
|
|
+ rk( [db, group, Group, put] ),
|
|
|
|
+ rk( [db, group, Group, update_group] ),
|
|
|
|
+ rk( [db, group, Group, remove_group] ),
|
|
|
|
+ rk( [likes, group, Group, add_like]), % for comet mostly
|
|
|
|
+ rk( [feed, delete, Group] ),
|
|
|
|
+ rk( [feed, group, Group, '*', '*', '*'] )
|
|
|
|
+ ].
|
|
|
|
|
|
-rk(List) -> mqs_lib:list_to_key(List).
|
|
|
|
rk_user_feed(User) -> rk([feed, user, User, '*', '*', '*']).
|
|
rk_user_feed(User) -> rk([feed, user, User, '*', '*', '*']).
|
|
rk_group_feed(Group) -> rk([feed, group, Group, '*', '*', '*']).
|
|
rk_group_feed(Group) -> rk([feed, group, Group, '*', '*', '*']).
|
|
|
|
|
|
|
|
+bind_user_exchange(Channel, User, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
|
|
|
|
+unbind_user_exchange(Channel, User, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
|
|
|
|
+bind_group_exchange(Channel, Group, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
|
|
|
|
+unbind_group_exchange(Channel, Group, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
|
|
|
|
+
|
|
|
|
+rk(List) -> mqs_lib:list_to_key(List).
|
|
|
|
+
|
|
retrieve_connections(Id,Type) ->
|
|
retrieve_connections(Id,Type) ->
|
|
Friends = case Type of
|
|
Friends = case Type of
|
|
user -> users:list_subscr_usernames(Id);
|
|
user -> users:list_subscr_usernames(Id);
|