kvs_user.erl 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. -module(kvs_user).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -include_lib("kvs/include/users.hrl").
  4. -include_lib("kvs/include/groups.hrl").
  5. -include_lib("kvs/include/accounts.hrl").
  6. -include_lib("kvs/include/config.hrl").
  7. -include_lib("kvs/include/feeds.hrl").
  8. -include_lib("kvs/include/feed_state.hrl").
  9. -include_lib("mqs/include/mqs.hrl").
  10. -compile(export_all).
  %% @doc Delete the user stored under `UserName' together with its
  %% subscriptions and status record.
  %%
  %% The user is read straight from the `user' table (the same table and key
  %% that are deleted at the end). kvs_user:get/1 in this module only resolves
  %% `{facebook, Id}' / `{googleplus, Id}' keys, so calling it with a plain
  %% user name would fail with `function_clause'.
  %%
  %% Steps, in order:
  %%   1. publish a "remove_from_group" notice for every group id returned by
  %%      kvs_group:participate/1;
  %%   2. drop every subscription returned by subscriptions/1, in both
  %%      directions (who -> whom and whom -> who);
  %%   3. delete the `user_status' and `user' records.
  %%
  %% Returns `{ok, User}' (the record as it was before deletion) or whatever
  %% the lookup returned when it was not `{ok, _}'.
  delete(UserName) ->
      case kvs:get(user, UserName) of
          {ok, User} ->
              GroupIds = kvs_group:participate(UserName),
              lists:foreach(
                  fun(GroupId) ->
                      mqs:notify(["subscription", "user", UserName, "remove_from_group"],
                                 {GroupId})
                  end,
                  GroupIds),
              Pairs = [{MeId, FrId}
                       || #subscription{who = MeId, whom = FrId} <- subscriptions(User)],
              %% All forward links are removed first, then all reverse links.
              lists:foreach(fun({MeId, FrId}) -> unsubscribe(MeId, FrId) end, Pairs),
              lists:foreach(fun({MeId, FrId}) -> unsubscribe(FrId, MeId) end, Pairs),
              kvs:delete(user_status, UserName),
              kvs:delete(user, UserName),
              {ok, User};
          Error ->
              Error
      end.
  %% @doc Resolve a user from a social-network identity.
  %% Accepts `{facebook, Id}' or `{googleplus, Id}' and delegates to the
  %% matching lookup helper. No other argument shape is handled here.
  get({googleplus, GoogleId}) ->
      user_by_googleplus_id(GoogleId);
  get({facebook, FacebookId}) ->
      user_by_facebook_id(FacebookId).
  %% @doc Store a subscription of `Who' to `Whom'.
  %% The record is keyed by the `{Who, Whom}' pair; returns the result of
  %% kvs:put/1.
  subscribe(Who, Whom) ->
      kvs:put(#subscription{key = {Who, Whom}, who = Who, whom = Whom}).
  %% @doc Remove the subscription of `Who' to `Whom' if it exists.
  %% Returns the result of kvs:delete/2 when a subscription was found and the
  %% atom `skip' otherwise.
  unsubscribe(Who, Whom) ->
      Key = {Who, Whom},
      case subscribed(Who, Whom) of
          false -> skip;
          true -> kvs:delete(subscription, Key)
      end.
  %% @doc List the subscriptions of a user.
  %% `undefined' yields an empty list, a `#user{}' record is reduced to its
  %% `username', and any other value is passed as the id to the storage
  %% backend named by the `?DBA' macro.
  subscriptions(undefined) ->
      [];
  subscriptions(#user{username = UserName}) ->
      subscriptions(UserName);
  subscriptions(UserId) ->
      Backend = ?DBA,
      Backend:subscriptions(UserId).
  %% @doc Delegate to the `subscribed/1' function of the storage backend
  %% named by the `?DBA' macro and return its result unchanged.
  subscribed(Who) ->
      Backend = ?DBA,
      Backend:subscribed(Who).
  %% @doc Tell whether a subscription record keyed `{Who, Whom}' exists.
  %% Returns `true' when kvs:get/2 answers `{ok, _}' and `false' for any
  %% other answer.
  subscribed(Who, Whom) ->
      Lookup = kvs:get(subscription, {Who, Whom}),
      case Lookup of
          {ok, _Found} -> true;
          _Other -> false
      end.
  %% @doc Bind or unbind the exchange of `Who' to the notifications exchange
  %% for the feed routing key of `Whom'.
  %%
  %% Only `{user, add}' and `{user, remove}' are handled; any other
  %% `{Type, Action}' pair raises `case_clause' after the channel has been
  %% opened (the channel is not closed on that path).
  %% On success the channel is closed and the result of
  %% mqs_channel:close/1 is returned. If the channel cannot be opened the
  %% reason is logged and the logger's result is returned.
  subscription_mq(Type, Action, Who, Whom) ->
      case mqs:open([]) of
          {error, Reason} ->
              error_logger:info_msg("subscription_mq error: ~p",[Reason]);
          {ok, Channel} ->
              case {Type, Action} of
                  {user, remove} ->
                      mqs_channel:unbind_exchange(Channel,
                                                  ?USER_EXCHANGE(Who),
                                                  ?NOTIFICATIONS_EX,
                                                  rk_user_feed(Whom));
                  {user, add} ->
                      mqs_channel:bind_exchange(Channel,
                                                ?USER_EXCHANGE(Who),
                                                ?NOTIFICATIONS_EX,
                                                rk_user_feed(Whom))
              end,
              mqs_channel:close(Channel)
      end.
  %% @doc Create the message-queue exchange of a user and bind it to the
  %% notifications exchange for every routing key produced by
  %% build_user_relations/2.
  %%
  %% The exchange is declared as a durable, non-auto-deleted fanout exchange.
  %% On success the channel is closed and the result of mqs_channel:close/1
  %% is returned; if the channel cannot be opened the reason is logged.
  %%
  %% NOTE(review): kvs_group:participate/1 and build_user_relations/2 receive
  %% the whole `#user{}' record here, while delete/1 passes a user name to
  %% participate/1 - confirm both forms are accepted.
  init_mq(User = #user{}) ->
      Groups = kvs_group:participate(User),
      UserExchange = ?USER_EXCHANGE(User#user.username),
      ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
      case mqs:open([]) of
          {ok, Channel} ->
              error_logger:info_msg("Creating exchange: ~p",
                                    [{Channel, UserExchange, ExchangeOptions}]),
              mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions),
              Relations = build_user_relations(User, Groups),
              %% Bindings are made purely for their side effect.
              lists:foreach(
                  fun(Route) ->
                      mqs_channel:bind_exchange(Channel, UserExchange,
                                                ?NOTIFICATIONS_EX, Route)
                  end,
                  Relations),
              mqs_channel:close(Channel);
          {error, Reason} ->
              error_logger:info_msg("init_mq error: ~p", [Reason])
      end.
  %% @doc Build the list of routing keys a user's exchange is bound with.
  %% The result holds seven keys that mention `User' (user, feed, payment,
  %% account, meeting and purchase routes) followed by one feed key per
  %% entry of `Groups', in the order the groups are given.
  build_user_relations(User, Groups) ->
      UserRoutes = [
          [kvs_user, '*', User],
          [kvs_feed, user, User, '*', '*', '*'],
          [kvs_feed, user, User, '*'],
          [kvs_payment, user, User, '*'],
          [kvs_account, user, User, '*'],
          [kvs_meeting, user, User, '*'],
          [kvs_purchase, user, User, '*']
      ],
      UserKeys = [mqs:key(Route) || Route <- UserRoutes],
      GroupKeys = [mqs:key([kvs_feed, group, Group, '*', '*', '*']) || Group <- Groups],
      UserKeys ++ GroupKeys.
  %% @doc Routing key for the feed of `User', built by mqs:key/1 from
  %% `[kvs_feed, user, User, '*', '*', '*']'.
  rk_user_feed(User) ->
      Route = [kvs_feed, user, User, '*', '*', '*'],
      mqs:key(Route).
  %% @doc Describe up to ten connections of a user or a group.
  %%
  %% For `Type =:= user' the candidates come from
  %% kvs_user:list_subscr_usernames/1; for any other type they come from
  %% kvs_group:list_group_members/1. Only the first ten candidates are
  %% considered.
  %%
  %% Returns a list of `{Who, Paid, RealName}' tuples, one for each candidate
  %% whose `user' record could be read; candidates for which kvs:get/2 does
  %% not answer `{ok, _}' are left out. An empty candidate list yields `[]'.
  retrieve_connections(Id, Type) ->
      Friends = case Type of
          user -> kvs_user:list_subscr_usernames(Id);
          _ -> kvs_group:list_group_members(Id)
      end,
      %% lists:sublist/2 and the comprehension both cope with [], so no
      %% separate empty-list branches are needed.
      [begin
           RealName = kvs_user:user_realname_user(User),
           Paid = kvs_payment:user_paid(Who),
           {Who, Paid, RealName}
       end
       || Who <- lists:sublist(Friends, 10),
          {ok, User} <- [kvs:get(user, Who)]].
  %% @doc Find the user linked to a Facebook id.
  %% Reads the `facebook' table; when the answer is `{ok, {_, User, _}}' the
  %% second element is used as the key into the `user' table and that lookup
  %% is returned. Any other answer is returned unchanged.
  user_by_facebook_id(FBId) ->
      Lookup = kvs:get(facebook, FBId),
      case Lookup of
          {ok, {_, UserKey, _}} -> kvs:get(user, UserKey);
          _ -> Lookup
      end.
  %% @doc Find the user linked to a Google+ id.
  %% Reads the `googleplus' table; when the answer is `{ok, {_, User, _}}'
  %% the second element is used as the key into the `user' table and that
  %% lookup is returned. Any other answer is returned unchanged.
  user_by_googleplus_id(GId) ->
      Lookup = kvs:get(googleplus, GId),
      case Lookup of
          {ok, {_, UserKey, _}} -> kvs:get(user, UserKey);
          _ -> Lookup
      end.
  %% @doc React to a notification delivered to a user worker.
  %%
  %% Arguments are the route (a list), the message payload and the worker
  %% `#state{}'. Every clause returns `{noreply, State}' with the state
  %% unchanged.
  %%
  %% Handled routes:
  %%   * `[kvs_user, user, registered]' for the owner of this worker:
  %%     creates the account and assigns the default quota read from config
  %%     key "accounts/default_quota" (default 300);
  %%   * `[kvs_user, login, user, Who, update_status]' for the owner:
  %%     stamps `last_login' on the user's status record, creating the record
  %%     when kvs answers `{error, not_found}';
  %%   * `["kvs_user", "subscribe", Who]' with payload `{Whom}': stores the
  %%     subscription and binds the exchanges;
  %%   * `["kvs_user", "unsubscribe", Who]' with payload `{Whom}': removes the
  %%     subscription and unbinds the exchanges;
  %%   * `["kvs_user", "update", Who]' with payload `{NewUser}': stores the
  %%     new record with kvs:put/1;
  %%   * anything else is ignored.
  %%
  %% NOTE(review): the first two routes are matched as atoms while the others
  %% are matched as strings - confirm which form the dispatcher delivers.
  handle_notice([kvs_user, user, registered],
                {_, _, #user{id = Who} = U},
                #state{owner = Who} = State) ->
      error_logger:info_msg("Notification about registered me ~p", [U]),
      kvs_account:create_account(Who),
      {ok, DefaultQuota} = kvs:get(config, "accounts/default_quota", 300),
      kvs_account:transaction(Who, quota, DefaultQuota, #tx_default_assignment{}),
      {noreply, State};
  handle_notice([kvs_user, login, user, Who, update_status],
                _Message,
                #state{owner = Who} = State) ->
      error_logger:info_msg("update status ~p", [Who]),
      %% os:timestamp/0 returns the same {MegaSecs, Secs, MicroSecs} shape as
      %% the deprecated erlang:now/0.
      LoginTime = os:timestamp(),
      Update = case kvs:get(user_status, Who) of
          {error, not_found} -> #user_status{email = Who, last_login = LoginTime};
          {ok, UserStatus} -> UserStatus#user_status{last_login = LoginTime}
      end,
      kvs:put(Update),
      {noreply, State};
  handle_notice(["kvs_user", "subscribe", Who], Message, #state{} = State) ->
      %% A payload that is not a one-element tuple is a badmatch on purpose.
      {Whom} = Message,
      kvs_user:subscribe(Who, Whom),
      subscription_mq(user, add, Who, Whom),
      {noreply, State};
  handle_notice(["kvs_user", "unsubscribe", Who], Message, #state{} = State) ->
      {Whom} = Message,
      kvs_user:unsubscribe(Who, Whom),
      subscription_mq(user, remove, Who, Whom),
      {noreply, State};
  handle_notice(["kvs_user", "update", _Who], Message, #state{} = State) ->
      {NewUser} = Message,
      kvs:put(NewUser),
      {noreply, State};
  handle_notice(_Route, _Message, State) ->
      %% Unknown notices are dropped silently.
      {noreply, State}.