kvs_user.erl 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. -module(kvs_user).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -include_lib("kvs/include/users.hrl").
  4. -include_lib("kvs/include/groups.hrl").
  5. -include_lib("kvs/include/accounts.hrl").
  6. -include_lib("kvs/include/log.hrl").
  7. -include_lib("kvs/include/config.hrl").
  8. -include_lib("kvs/include/feed_state.hrl").
  9. -include_lib("mqs/include/mqs.hrl").
  10. -compile(export_all).
  %% @doc Registers a new user.
  %% The requested username is first passed through check_username/1. The
  %% registration is then rejected with {error, email_taken} when
  %% kvs_user:get({email, Email}) finds a user, or with {error, username_taken}
  %% when a group is stored under the chosen name. Otherwise the record, with
  %% the chosen username filled in, is handed to process_register/1 and its
  %% result is returned.
  register(#user{username=UserName, email=Email} = Registration) ->
      case check_username(UserName) of
          {error, Reason} -> {error, Reason};
          {ok, Name} -> register_unique_email(Name, Email, Registration)
      end.

  %% Continues registration only when no user is found for Email.
  register_unique_email(Name, Email, Registration) ->
      case kvs_user:get({email, Email}) of
          {ok, _} -> {error, email_taken};
          {error, _} -> register_no_group_clash(Name, Registration)
      end.

  %% Continues registration only when no group is stored under Name.
  register_no_group_clash(Name, Registration) ->
      case kvs:get(group, Name) of
          {ok, _} -> {error, username_taken};
          {error, _} -> process_register(Registration#user{username=Name})
      end.
  %% @doc Completes a registration.
  %% Hashes the password with kvs:sha/1 unless it is undefined, creates the
  %% feed, direct, pinned and starred feeds with kvs_feed:create/0, and passes
  %% the resulting record to kvs:put/1. Always returns {ok, Record}; the
  %% result of kvs:put/1 is not inspected.
  process_register(#user{email=E} = RegisterData0) ->
      Password = RegisterData0#user.password,
      HashedPassword =
          case Password of
              undefined -> undefined;
              _ -> kvs:sha(Password)
          end,
      %% Feeds are created one by one, in record-field order.
      Feed = kvs_feed:create(),
      Direct = kvs_feed:create(),
      Pinned = kvs_feed:create(),
      Starred = kvs_feed:create(),
      RegisterData = RegisterData0#user{
          feed = Feed,
          direct = Direct,
          pinned = Pinned,
          starred = Starred,
          password = HashedPassword
      },
      error_logger:info_msg("PUT USER ~p", [E]),
      kvs:put(RegisterData),
      %% Currently disabled follow-up steps (account, quota, MQ setup):
      % kvs_account:create_account(E),
      % {ok, DefaultQuota} = kvs:get(config, "accounts/default_quota", 300),
      % kvs_account:transaction(E, quota, DefaultQuota, #tx_default_assignment{}),
      % init_mq(RegisterData),
      % mqs:notify([user, init], {E, RegisterData#user.feed}),
      {ok, RegisterData}.
  %% @doc Finds a username that kvs_user:get/1 does not know.
  %% Returns {ok, Name} when the lookup fails. While the candidate is taken,
  %% a random decimal digit (0..9) is appended and the lookup is retried.
  %% Name is extended with ++, so it has to be a string (list).
  check_username(Name) ->
      case kvs_user:get(Name) of
          {error, _} -> {ok, Name};
          {ok, _User} ->
              %% rand:uniform(10) yields 1..10; subtracting 1 keeps the 0..9
              %% range that the deprecated crypto:rand_uniform(0, 10) produced.
              Digit = rand:uniform(10) - 1,
              check_username(Name ++ integer_to_list(Digit))
      end.
  %% @doc Deletes a user.
  %% When kvs_user:get/1 finds the user: sends a "remove_from_group"
  %% notification for every group returned by kvs_group:participate/1,
  %% removes the subscriptions listed by subscriptions/1 in both directions,
  %% deletes the user_status and user entries, and returns {ok, User}.
  %% Any other lookup result is returned unchanged.
  delete(UserName) ->
      case kvs_user:get(UserName) of
          {ok, User} ->
              lists:foreach(
                  fun(GId) ->
                      mqs:notify(["subscription", "user", UserName, "remove_from_group"], {GId})
                  end,
                  kvs_group:participate(UserName)),
              %% Only #subscription records contribute a {Who, Whom} pair.
              Pairs = [ {Me, Friend}
                        || #subscription{who = Me, whom = Friend} <- subscriptions(User) ],
              lists:foreach(fun({Me, Friend}) -> unsubscribe(Me, Friend) end, Pairs),
              lists:foreach(fun({Me, Friend}) -> unsubscribe(Friend, Me) end, Pairs),
              kvs:delete(user_status, UserName),
              kvs:delete(user, UserName),
              {ok, User};
          NotFound ->
              NotFound
      end.
  %% @doc Looks up a user.
  %% Tagged keys {facebook, Id}, {googleplus, Id} and {email, Email} are
  %% resolved through the matching user_by_* function; any other term is
  %% used directly as the key of the user table.
  get(Key) ->
      case Key of
          {facebook, FBId} -> user_by_facebook_id(FBId);
          {googleplus, GId} -> user_by_googleplus_id(GId);
          {email, Email} -> user_by_email(Email);
          UId -> kvs:get(user, UId)
      end.
  %% @doc Stores a #subscription record keyed by {Who, Whom} and returns the
  %% result of kvs:put/1.
  subscribe(Who, Whom) ->
      kvs:put(#subscription{key = {Who, Whom}, who = Who, whom = Whom}).
  %% @doc Deletes the {Who, Whom} subscription when subscribed/2 reports it;
  %% returns the atom skip otherwise.
  unsubscribe(Who, Whom) ->
      Exists = subscribed(Who, Whom),
      case Exists of
          false -> skip;
          true -> kvs:delete(subscription, {Who, Whom})
      end.
  %% @doc Lists subscriptions through the backend module named by ?DBA.
  %% undefined yields []; a #user record is reduced to its username first;
  %% any other term is passed to the backend as the user id.
  subscriptions(Subject) ->
      case Subject of
          undefined ->
              [];
          #user{username = UserName} ->
              subscriptions(UserName);
          UId ->
              Backend = ?DBA,
              Backend:subscriptions(UId)
      end.
  %% @doc Delegates to subscribed/1 of the backend module named by ?DBA.
  subscribed(Who) ->
      Backend = ?DBA,
      Backend:subscribed(Who).
  %% @doc Returns true when a subscription entry with key {Who, Whom} can be
  %% read from the store, false for any other lookup result.
  subscribed(Who, Whom) ->
      Lookup = kvs:get(subscription, {Who, Whom}),
      case Lookup of
          {ok, _Found} -> true;
          _Other -> false
      end.
  %% @doc Adds or removes the exchange binding that corresponds to a user
  %% subscription.
  %% Opens a channel with mqs:open/1, binds ({user, add}) or unbinds
  %% ({user, remove}) the exchange of Who using the feed routing key of Whom,
  %% then closes the channel. A failure to open the channel is only logged.
  subscription_mq(Type, Action, Who, Whom) ->
      case mqs:open([]) of
          {error,Reason} ->
              ?ERROR("subscription_mq error: ~p",[Reason]);
          {ok,Channel} ->
              case {Type,Action} of
                  {user,remove} ->
                      mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(Who),
                                                  ?NOTIFICATIONS_EX, rk_user_feed(Whom));
                  {user,add} ->
                      mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Who),
                                                ?NOTIFICATIONS_EX, rk_user_feed(Whom))
              end,
              mqs_channel:close(Channel)
      end.
  %% @doc Sets up the message-queue exchange of a user.
  %% Creates a durable, non-auto-deleted fanout exchange for the user and
  %% binds it for every routing key produced by build_user_relations/2, then
  %% closes the channel. A failure to open the channel is only logged.
  init_mq(User=#user{}) ->
      Groups = kvs_group:participate(User),
      UserExchange = ?USER_EXCHANGE(User#user.username),
      ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
      case mqs:open([]) of
          {error,Reason} ->
              ?ERROR("init_mq error: ~p",[Reason]);
          {ok, Channel} ->
              ?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
              mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions),
              BindRoute =
                  fun(Route) ->
                      mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User#user.username),
                                                ?NOTIFICATIONS_EX, Route)
                  end,
              lists:foreach(BindRoute, build_user_relations(User, Groups)),
              mqs_channel:close(Channel)
      end.
  %% @doc Builds the list of routing keys for a user: seven user-scoped keys
  %% followed by one kvs_feed group key per entry of Groups.
  build_user_relations(User, Groups) ->
      UserPatterns = [
          [kvs_user, '*', User],
          [kvs_feed, user, User, '*', '*', '*'],
          [kvs_feed, user, User, '*'],
          [kvs_payment, user, User, '*'],
          [kvs_account, user, User, '*'],
          [kvs_meeting, user, User, '*'],
          [kvs_purchase, user, User, '*']
      ],
      UserKeys = [ mqs:key(Pattern) || Pattern <- UserPatterns ],
      GroupKeys = [ mqs:key([kvs_feed, group, G, '*', '*', '*']) || G <- Groups ],
      UserKeys ++ GroupKeys.
  %% @doc Routing key for the kvs_feed entries of User, with the last three
  %% segments left as wildcards.
  rk_user_feed(User) ->
      Pattern = [kvs_feed, user, User, '*', '*', '*'],
      mqs:key(Pattern).
  %% @doc Returns up to ten connections of Id as {Who, Paid, RealName} tuples.
  %% For Type user the candidates come from kvs_user:list_subscr_usernames/1,
  %% for any other Type from kvs_group:list_group_members/1. Candidates whose
  %% user record cannot be read are left out.
  %% NOTE(review): list_subscr_usernames/1 and user_realname_user/1 are called
  %% on kvs_user but are not defined in the visible part of this module -
  %% confirm they exist.
  retrieve_connections(Id,Type) ->
      Members =
          case Type of
              user -> kvs_user:list_subscr_usernames(Id);
              _ -> kvs_group:list_group_members(Id)
          end,
      %% Only the first ten members are looked up.
      Shown = lists:sublist(Members, 10),
      [ begin
            RealName = kvs_user:user_realname_user(User),
            Paid = kvs_payment:user_paid(Who),
            {Who, Paid, RealName}
        end
        || Who <- Shown, {ok, User} <- [kvs:get(user, Who)] ].
  %% @doc Resolves a user through the facebook table.
  %% When the entry is a 3-tuple, its second element is used as the key of
  %% the user table; any other lookup result is returned unchanged.
  user_by_facebook_id(FBId) ->
      Lookup = kvs:get(facebook, FBId),
      case Lookup of
          {ok, {_, UserId, _}} -> kvs:get(user, UserId);
          _ -> Lookup
      end.
  %% @doc Resolves a user through the googleplus table.
  %% When the entry is a 3-tuple, its second element is used as the key of
  %% the user table; any other lookup result is returned unchanged.
  user_by_googleplus_id(GId) ->
      Lookup = kvs:get(googleplus, GId),
      case Lookup of
          {ok, {_, UserId, _}} -> kvs:get(user, UserId);
          _ -> Lookup
      end.
  %% @doc Resolves a user through the email table.
  %% When the entry is a 3-tuple, its second element is used as the key of
  %% the user table; any other lookup result is returned unchanged.
  user_by_email(Email) ->
      Lookup = kvs:get(email, Email),
      case Lookup of
          {ok, {_, UserId, _}} -> kvs:get(user, UserId);
          _ -> Lookup
      end.
  %% @doc Handles notices routed to this module.
  %%
  %% Routes handled:
  %%   ["kvs_user", "subscribe", Who]   with Message {Whom}    - stores the
  %%       subscription and adds the exchange binding;
  %%   ["kvs_user", "unsubscribe", Who] with Message {Whom}    - removes the
  %%       subscription and the exchange binding;
  %%   ["kvs_user", "update", _]        with Message {NewUser} - stores NewUser.
  %% Any other notice is logged as unknown.
  %%
  %% Every clause returns {noreply, State}. The fallback clause previously
  %% returned the result of the logging call instead, which was inconsistent
  %% with the other clauses.
  handle_notice(["kvs_user", "subscribe", Who], Message, #state{} = State) ->
      {Whom} = Message,
      kvs_user:subscribe(Who, Whom),
      subscription_mq(user, add, Who, Whom),
      {noreply, State};
  handle_notice(["kvs_user", "unsubscribe", Who], Message, #state{} = State) ->
      {Whom} = Message,
      kvs_user:unsubscribe(Who, Whom),
      subscription_mq(user, remove, Who, Whom),
      {noreply, State};
  handle_notice(["kvs_user", "update", _Who], Message, #state{} = State) ->
      {NewUser} = Message,
      kvs:put(NewUser),
      {noreply, State};
  handle_notice(_Route, _Message, State) ->
      error_logger:info_msg("Unknown USERS notice"),
      {noreply, State}.