kvs_user.erl 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. -module(kvs_user).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -include_lib("kvs/include/users.hrl").
  4. -include_lib("kvs/include/groups.hrl").
  5. -include_lib("kvs/include/accounts.hrl").
  6. -include_lib("kvs/include/config.hrl").
  7. -include_lib("kvs/include/feeds.hrl").
  8. -include_lib("kvs/include/feed_state.hrl").
  9. -include_lib("mqs/include/mqs.hrl").
  10. -compile(export_all).
  11. register(#user{email=Email, feeds=Ch} = Registration, Feed) ->
  12. case kvs:get(iterator, Email) of {ok,_} -> {error, email_taken};
  13. {error, _} ->
  14. HashedPassword = case Registration#user.password of undefined -> undefined; PlainPassword -> kvs:sha(PlainPassword) end,
  15. RegisterData = Registration#user{feeds=[{Feed, kvs_feed:create()} || Feed <- Ch], password = HashedPassword},
  16. Next = undefined,
  17. Prev = case Feed#feed.top of undefined -> undefined;
  18. T -> case kvs:get(iterator, T) of {error,not_found} -> undefined;
  19. {ok, Top} -> UpdTop = Top#iterator{next=Email}, kvs:put(UpdTop), UpdTop#iterator.id end end,
  20. kvs:put(Feed#feed{top=Email, entries_count=Feed#feed.entries_count+1}),
  21. Iterator = #iterator{id=Email, object=RegisterData, next=Next, prev=Prev},
  22. kvs:put(Iterator),
  23. kvs:put(RegisterData),%todo: by_index support
  24. error_logger:info_msg("PUT USER: ~p", [RegisterData]),
  25. kvs_account:create_account(Email),
  26. {ok, DefaultQuota} = kvs:get(config, "accounts/default_quota", 300),
  27. kvs_account:transaction(Email, quota, DefaultQuota, #tx_default_assignment{}),
  28. {ok, RegisterData} end.
  29. delete(UserName) ->
  30. case kvs_user:get(UserName) of
  31. {ok, User} ->
  32. GIds = kvs_group:participate(UserName),
  33. [ mqs:notify(["subscription", "user", UserName, "remove_from_group"], {GId}) || GId <- GIds ],
  34. F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(User) ],
  35. [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
  36. [ unsubscribe(FrId, MeId) || {MeId, FrId} <- F2U ],
  37. kvs:delete(user_status, UserName),
  38. kvs:delete(user, UserName),
  39. {ok, User};
  40. E -> E end.
  41. get({facebook, FBId}) -> user_by_facebook_id(FBId);
  42. get({googleplus, GId}) -> user_by_googleplus_id(GId);
  43. get(UId) -> kvs:get(user, UId).
  44. subscribe(Who, Whom) ->
  45. Record = #subscription{key={Who,Whom}, who = Who, whom = Whom},
  46. kvs:put(Record).
  47. unsubscribe(Who, Whom) ->
  48. case subscribed(Who, Whom) of
  49. true -> kvs:delete(subscription, {Who, Whom});
  50. false -> skip end.
  51. subscriptions(undefined)-> [];
  52. subscriptions(#user{username = UId}) -> subscriptions(UId);
  53. subscriptions(UId) -> DBA=?DBA, DBA:subscriptions(UId).
  54. subscribed(Who) -> DBA=?DBA, DBA:subscribed(Who).
  55. subscribed(Who, Whom) ->
  56. case kvs:get(subscription, {Who, Whom}) of
  57. {ok, _} -> true;
  58. _ -> false end.
  59. subscription_mq(Type, Action, Who, Whom) ->
  60. case mqs:open([]) of
  61. {ok,Channel} ->
  62. case {Type,Action} of
  63. {user,add} -> mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_user_feed(Whom));
  64. {user,remove} -> mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_user_feed(Whom)) end,
  65. mqs_channel:close(Channel);
  66. {error,Reason} -> error_logger:info_msg("subscription_mq error: ~p",[Reason]) end.
  67. init_mq(User=#user{}) ->
  68. Groups = kvs_group:participate(User),
  69. UserExchange = ?USER_EXCHANGE(User#user.username),
  70. ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
  71. case mqs:open([]) of
  72. {ok, Channel} ->
  73. error_logger:info_msg("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
  74. mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions),
  75. Relations = build_user_relations(User, Groups),
  76. [ mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User#user.username), ?NOTIFICATIONS_EX, Route) || Route <- Relations],
  77. mqs_channel:close(Channel);
  78. {error,Reason} -> error_logger:info_msg("init_mq error: ~p",[Reason]) end.
  79. build_user_relations(User, Groups) -> [
  80. mqs:key( [kvs_user, '*', User]),
  81. mqs:key( [kvs_feed, user, User, '*', '*', '*']),
  82. mqs:key( [kvs_feed, user, User, '*'] ),
  83. mqs:key( [kvs_payment, user, User, '*']),
  84. mqs:key( [kvs_account, user, User, '*']),
  85. mqs:key( [kvs_meeting, user, User, '*']),
  86. mqs:key( [kvs_purchase, user, User, '*']) |
  87. [ mqs:key( [kvs_feed, group, G, '*', '*', '*']) || G <- Groups ]
  88. ].
  89. rk_user_feed(User) -> mqs:key([kvs_feed, user, User, '*', '*', '*']).
  90. retrieve_connections(Id,Type) ->
  91. Friends = case Type of
  92. user -> kvs_user:list_subscr_usernames(Id);
  93. _ -> kvs_group:list_group_members(Id) end,
  94. case Friends of
  95. [] -> [];
  96. Full -> Sub = lists:sublist(Full, 10),
  97. case Sub of
  98. [] -> [];
  99. _ ->
  100. Data = [begin
  101. case kvs:get(user,Who) of
  102. {ok,User} -> RealName = kvs_user:user_realname_user(User),
  103. Paid = kvs_payment:user_paid(Who),
  104. {Who,Paid,RealName};
  105. _ -> undefined end end || Who <- Sub],
  106. [ X || X <- Data, X/=undefined ] end end.
  107. user_by_facebook_id(FBId) ->
  108. case kvs:get(facebook,FBId) of
  109. {ok,{_,User,_}} -> kvs:get(user,User);
  110. Else -> Else end.
  111. user_by_googleplus_id(GId) ->
  112. case kvs:get(googleplus,GId) of
  113. {ok,{_,User,_}} -> kvs:get(user,User);
  114. Else -> Else end.
  115. handle_notice([kvs_user, login, user, Who, update_status],
  116. Message,
  117. #state{owner=Who} = State) ->
  118. error_logger:info_msg("update status ~p", [Who]),
  119. Update = case kvs:get(user_status, Who) of
  120. {error, not_found} -> #user_status{email = Who, last_login = erlang:now()};
  121. {ok, UserStatus} -> UserStatus#user_status{last_login = erlang:now()} end,
  122. kvs:put(Update),
  123. {noreply, State};
  124. handle_notice(["kvs_user", "subscribe", Who] = Route,
  125. Message, #state{owner = Owner, type =Type} = State) ->
  126. {Whom} = Message,
  127. kvs_user:subscribe(Who, Whom),
  128. subscription_mq(user, add, Who, Whom),
  129. {noreply, State};
  130. handle_notice(["kvs_user", "unsubscribe", Who] = Route,
  131. Message, #state{owner = Owner, type =Type} = State) ->
  132. {Whom} = Message,
  133. kvs_user:unsubscribe(Who, Whom),
  134. subscription_mq(user, remove, Who, Whom),
  135. {noreply, State};
  136. handle_notice(["kvs_user", "update", Who] = Route,
  137. Message, #state{owner = Owner, type =Type} = State) ->
  138. {NewUser} = Message,
  139. kvs:put(NewUser),
  140. {noreply, State};
  141. handle_notice(Route, Message, State) ->
  142. %error_logger:info_msg("Unknown USERS notice"),
  143. {noreply, State}.