kvs_user.erl 15 KB


  1. -module(kvs_user).
  2. -include_lib("kvs/include/users.hrl").
  3. -include_lib("kvs/include/groups.hrl").
  4. -include_lib("kvs/include/accounts.hrl").
  5. -include_lib("kvs/include/log.hrl").
  6. -include_lib("kvs/include/feed_state.hrl").
  7. -include_lib("mqs/include/mqs.hrl").
  8. -compile(export_all).
  9. register(#user{username=U, email=Email, facebook_id = FbId} = RegisterData0) ->
  10. FindUser = case check_username(U, FbId) of
  11. {error, E} -> {error, E};
  12. {ok, NewName} -> case kvs_users:get({email, Email}) of
  13. {error, _} -> {ok, NewName};
  14. {ok, _} -> {error, email_taken} end end,
  15. FindUser2 = case FindUser of
  16. {ok, UserName} -> case kvs_group:get(UserName) of
  17. {error, _} -> {ok, UserName};
  18. _ -> {error, username_taken} end;
  19. A -> A end,
  20. case FindUser2 of
  21. {ok, Name} -> process_register(RegisterData0#user{username=Name});
  22. {error, username_taken} -> {error, user_exist};
  23. {error, email_taken} -> {error, email_taken} end.
  24. process_register(#user{username=U} = RegisterData0) ->
  25. HashedPassword = case RegisterData0#user.password of
  26. undefined -> undefined;
  27. PlainPassword -> utils:sha(PlainPassword) end,
  28. RegisterData = RegisterData0#user {
  29. feed = kvs:feed_create(),
  30. direct = kvs:feed_create(),
  31. pinned = kvs:feed_create(),
  32. starred = kvs:feed_create(),
  33. password = HashedPassword },
  34. kvs:put(RegisterData),
  35. kvs_account:create_account(U),
  36. {ok, DefaultQuota} = kvs:get(config, "accounts/default_quota", 300),
  37. kvs_account:transaction(U, quota, DefaultQuota, #tx_default_assignment{}),
  38. init_mq(U),
  39. {ok, U}.
  40. check_username(Name, FbId) ->
  41. case kvs_users:get(Name) of
  42. {error, notfound} -> {ok, Name};
  43. {ok, User} when FbId =/= undefined -> check_username(User#user.username ++ integer_to_list(crypto:rand_uniform(0,10)), FbId);
  44. {ok, _}-> {error, username_taken} end.
  45. delete(UserName) ->
  46. case kvs_users:get(UserName) of
  47. {ok, User} -> GIds = kvs_group:list_groups_per_user(UserName),
  48. [mqs:notify(["subscription", "user", UserName, "remove_from_group"], {GId}) || GId <- GIds],
  49. F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(User) ],
  50. [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
  51. [ unsubscribe(FrId, MeId) || {MeId, FrId} <- F2U ],
  52. kvs:delete(user_status, UserName),
  53. kvs:delete(user, UserName),
  54. {ok, User};
  55. E -> E end.
  56. get({username, UserName}) -> kvs:user_by_username(UserName);
  57. get({facebook, FBId}) -> kvs:user_by_facebook_id(FBId);
  58. get({email, Email}) -> kvs:user_by_email(Email);
  59. get(UId) -> kvs:get(user, UId).
  60. subscribe(Who, Whom) ->
  61. Record = #subscription{key={Who,Whom}, who = Who, whom = Whom},
  62. kvs:put(Record),
  63. subscription_mq(user, add, Who, Whom).
  64. unsubscribe(Who, Whom) ->
  65. case subscribed(Who, Whom) of
  66. true -> kvs:delete(subscription, {Who, Whom}),
  67. subscription_mq(user, remove, Who, Whom);
  68. false -> skip end.
  69. subscriptions(undefined)-> [];
  70. subscriptions(#user{username = UId}) -> subscriptions(UId);
  71. subscriptions(UId) when is_list(UId) -> lists:sort( kvs:all_by_index(subs, <<"subs_who_bin">>, list_to_binary(UId)) ).
  72. subscribed(Who, Whom) ->
  73. case kvs:get(subscription, {Who, Whom}) of
  74. {ok, _} -> true;
  75. _ -> false end.
  76. update_user(#user{username=UId,name=Name,surname=Surname} = NewUser) ->
  77. OldUser = case kvs:get(user,UId) of
  78. {error,notfound} -> NewUser;
  79. {ok,#user{}=User} -> User
  80. end,
  81. kvs:put(NewUser),
  82. case Name==OldUser#user.name andalso Surname==OldUser#user.surname of
  83. true -> ok;
  84. false -> kvs:update_user_name(UId,Name,Surname)
  85. end.
  86. subscription_mq(Type, Action, MeId, ToId) ->
  87. case mqs:open([]) of
  88. {ok,Channel} -> Routes = case Type of user -> rk_user_feed(ToId); group -> rk_group_feed(ToId) end,
  89. case Action of
  90. add -> bind_user_exchange(Channel, MeId, Routes);
  91. remove -> unbind_user_exchange(Channel, MeId, Routes) end,
  92. mqs_channel:close(Channel);
  93. {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.
  94. init_mq(User=#user{}) ->
  95. Groups = kvs_group:participate(User),
  96. ?INFO("~p init mq. users: ~p", [User, Groups]),
  97. UserExchange = ?USER_EXCHANGE(User#user.username),
  98. ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
  99. case mqs:open([]) of
  100. {ok, Channel} ->
  101. ?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
  102. mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions),
  103. Relations = build_user_relations(User, Groups),
  104. [bind_user_exchange(Channel, User, RK) || RK <- Relations],
  105. mqs_channel:close(Channel);
  106. {error,Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end;
  107. init_mq(Group=#group{}) ->
  108. GroupExchange = ?GROUP_EXCHANGE(Group#group.username),
  109. ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
  110. case mqs:open([]) of
  111. {ok, Channel} ->
  112. mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
  113. Relations = build_group_relations(Group),
  114. [bind_group_exchange(Channel, Group, RK) || RK <- Relations],
  115. mqs_channel:close(Channel);
  116. {error, Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
  117. build_user_relations(User, Groups) -> [
  118. rk( [db, user, User, put] ),
  119. rk( [subscription, user, User, add_to_group]),
  120. rk( [subscription, user, User, remove_from_group]),
  121. rk( [subscription, user, User, leave_group]),
  122. rk( [login, user, User, update_after_login]),
  123. rk( [likes, user, User, add_like]),
  124. rk( [feed, delete, User]),
  125. rk( [feed, user, User, '*', '*', '*']),
  126. rk( [feed, user, User, count_entry_in_statistics] ),
  127. rk( [feed, user, User, count_comment_in_statistics] ),
  128. rk( [feed, user, User, post_note] ),
  129. rk( [subscription, user, User, subscribe_user]),
  130. rk( [subscription, user, User, remove_subscribe]),
  131. rk( [subscription, user, User, set_user_game_status]),
  132. rk( [subscription, user, User, update_user]),
  133. rk( [subscription, user, User, block_user]),
  134. rk( [subscription, user, User, unblock_user]),
  135. rk( [payment, user, User, set_purchase_external_id]),
  136. rk( [payment, user, User, set_purchase_state]),
  137. rk( [payment, user, User, set_purchase_info]),
  138. rk( [payment, user, User, add]),
  139. rk( [transaction, user, User, add]),
  140. rk( [invite, user, User, add]),
  141. rk( [meeting, user, User, create]),
  142. rk( [meeting, user, User, join]),
  143. rk( [purchase, user, User, buy_gift]),
  144. rk( [purchase, user, User, give_gift]),
  145. rk( [purchase, user, User, mark_gift_as_deliving]),
  146. rk( [feed, system, '*', '*']) |
  147. [rk_group_feed(G) || G <- Groups]
  148. ].
  149. build_group_relations(Group) -> [
  150. rk( [db, group, Group, put] ),
  151. rk( [db, group, Group, update_group] ),
  152. rk( [db, group, Group, remove_group] ),
  153. rk( [likes, group, Group, add_like]), % for comet mostly
  154. rk( [feed, delete, Group] ),
  155. rk( [feed, group, Group, '*', '*', '*'] )
  156. ].
  157. rk_user_feed(User) -> rk([feed, user, User, '*', '*', '*']).
  158. rk_group_feed(Group) -> rk([feed, group, Group, '*', '*', '*']).
  159. bind_user_exchange(Channel, User, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
  160. unbind_user_exchange(Channel, User, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
  161. bind_group_exchange(Channel, Group, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
  162. unbind_group_exchange(Channel, Group, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
  163. rk(List) -> mqs_lib:list_to_key(List).
  164. retrieve_connections(Id,Type) ->
  165. Friends = case Type of
  166. user -> kvs_users:list_subscr_usernames(Id);
  167. _ -> kvs_group:list_group_members(Id) end,
  168. case Friends of
  169. [] -> [];
  170. Full -> Sub = lists:sublist(Full, 10),
  171. case Sub of
  172. [] -> [];
  173. _ -> Data = [begin case kvs:get(user,Who) of
  174. {ok,User} -> RealName = kvs_users:user_realname_user(User),
  175. Paid = kvs_account:user_paid(Who),
  176. {Who,Paid,RealName};
  177. _ -> undefined end end || Who <- Sub],
  178. [X||X<-Data, X/=undefined] end end.
  179. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  180. handle_notice(["system", "create_group"] = Route,
  181. Message, #state{owner = Owner, type =Type} = State) ->
  182. ?INFO("queue_action(~p): create_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  183. {UId, GId, Name, Desc, Publicity} = Message,
  184. FId = kvs:feed_create(),
  185. CTime = erlang:now(),
  186. Group =#group{username = GId,
  187. name = Name,
  188. description = Desc,
  189. publicity = Publicity,
  190. creator = UId,
  191. created = CTime,
  192. owner = UId,
  193. feed = FId},
  194. kvs:put(Group),
  195. mqs:notify([group, init], {GId, FId}),
  196. kvs_users:init_mq(Group),
  197. {noreply, State};
  198. handle_notice(["db", "group", GId, "remove_group"] = Route,
  199. Message, #state{owner = Owner, type =Type} = State) ->
  200. ?INFO("queue_action(~p): remove_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  201. {_, Group} = kvs_group:get_group(GId),
  202. case Group of
  203. notfound -> ok;
  204. _ ->
  205. mqs:notify([feed, delete, GId], empty),
  206. kvs:delete_by_index(group_subs, <<"group_subs_group_id_bin">>, GId),
  207. kvs:delete(feed, Group#group.feed),
  208. kvs:delete(group, GId),
  209. % unbind exchange
  210. {ok, Channel} = mqs:open([]),
  211. Routes = kvs_users:rk_group_feed(GId),
  212. kvs_users:unbind_group_exchange(Channel, GId, Routes),
  213. mqs_channel:close(Channel)
  214. end,
  215. {noreply, State};
  216. handle_notice(["subscription", "user", UId, "add_to_group"] = Route,
  217. Message, #state{owner = Owner, type =Type} = State) ->
  218. ?INFO("queue_action(~p): add_to_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  219. {GId, Who, UType} = Message,
  220. case kvs:get(group_subs, {UId, GId}) of
  221. {error, notfound} ->
  222. {R, Group} = kvs:get(group, GId),
  223. case R of
  224. error -> ?INFO("Add to group failed reading group");
  225. _ ->
  226. GU = Group#group.users_count,
  227. kvs:put(Group#group{users_count = GU+1})
  228. end;
  229. _ ->
  230. ok
  231. end,
  232. OK = kvs:put({group_subs,UId,GId,Type,0}),
  233. % add_to_group(Who, GId, UType),
  234. ?INFO("add ~p to group ~p with Type ~p by ~p", [Who, GId,UType,UId]),
  235. kvs_users:subscribemq(group, add, Who, GId),
  236. {noreply, State};
  237. handle_notice(["subscription", "user", UId, "remove_from_group"] = Route,
  238. Message, #state{owner = Owner, type =Type} = State) ->
  239. ?INFO("queue_action(~p): remove_from_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  240. {GId} = Message,
  241. ?INFO("remove ~p from group ~p", [UId, GId]),
  242. kvs_users:remove_subscription_mq(group, UId, GId),
  243. kvs:delete(group_subs, {UId, GId}),
  244. {R, Group} = kvs:get(group, GId),
  245. case R of
  246. error -> ?INFO("Remove ~p from group failed reading group ~p", [UId, GId]);
  247. _ ->
  248. GU = Group#group.users_count,
  249. kvs:put(Group#group{users_count = GU-1})
  250. end,
  251. {noreply, State};
  252. handle_notice(["subscription", "user", UId, "leave_group"] = Route,
  253. Message, #state{owner = Owner, type =Type} = State) ->
  254. ?INFO(" queue_action(~p): leave_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  255. {GId} = Message,
  256. {R, Group} = kvs:get(group, GId),
  257. case R of
  258. error -> ?ERROR(" Error reading group ~p for leave_group", [GId]);
  259. ok ->
  260. case Group#group.owner of
  261. UId -> % User is owner, transfer ownership to someone else
  262. Members = kvs_group:list_group_members(GId),
  263. case Members of
  264. [ FirstOne | _ ] ->
  265. ok = kvs:put(Group#group{owner = FirstOne}),
  266. mqs:notify(["subscription", "user", UId, "remove_from_group"], {GId});
  267. [] ->
  268. % Nobody left in group, remove group at all
  269. mqs:notify([db, group, GId, remove_group], [])
  270. end;
  271. _ -> % Plain user removes -- just remove it
  272. mqs:notify(["subscription", "user", UId, "remove_from_group"], {GId})
  273. end;
  274. _ -> % user is just someone, remove it
  275. mqs:notify(["subscription", "user", UId, "remove_from_group"], {GId})
  276. end,
  277. {noreply, State};
  278. handle_notice(["subscription", "user", UId, "subscribe"] = Route,
  279. Message, #state{owner = Owner, type =Type} = State) ->
  280. ?INFO(" queue_action(~p): subscribe_user: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  281. {Whom} = Message,
  282. kvs_users:subscribe(UId, Whom),
  283. {noreply, State};
  284. handle_notice(["subscription", "user", UId, "unsubscribe"] = Route,
  285. Message, #state{owner = Owner, type =Type} = State) ->
  286. ?INFO(" queue_action(~p): remove_subscribe: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  287. {Whom} = Message,
  288. kvs_users:unsubscribe(UId, Whom),
  289. {noreply, State};
  290. handle_notice(["subscription", "user", _UId, "update"] = Route,
  291. Message, #state{owner = Owner, type =Type} = State) ->
  292. ?INFO(" queue_action(~p): update_user: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  293. {NewUser} = Message,
  294. kvs_users:update_user(NewUser),
  295. {noreply, State};
  296. handle_notice(["login", "user", UId, "update_after_login"] = Route,
  297. Message, #state{owner = Owner, type =Type} = State) ->
  298. ?INFO("queue_action(~p): update_after_login: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  299. Update = case kvs_users:user_status(UId) of
  300. {error, status_info_not_found} -> #user_status{username = UId, last_login = erlang:now()};
  301. {ok, UserStatus} -> UserStatus#user_status{last_login = erlang:now()} end,
  302. kvs:put(Update),
  303. {noreply, State};
  304. handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown USERS notice").
  305. %%%%%%%%%%%%%%%%%%%%%%
  306. % user backlinks
  307. user_by_verification_code(Code) ->
  308. case kvs:get(code,Code) of
  309. {ok,{_,User,_}} -> kvs:get(user,User);
  310. Else -> Else end.
  311. user_by_facebook_id(FBId) ->
  312. case kvs:get(facebook,FBId) of
  313. {ok,{_,User,_}} -> kvs:get(user,User);
  314. Else -> Else end.
  315. user_by_email(Email) ->
  316. case kvs:get(email,Email) of
  317. {ok,{_,User,_}} -> kvs:get(user,User);
  318. Else -> Else end.
  319. user_by_username(Name) ->
  320. case X = kvs:get(user,Name) of
  321. {ok,_Res} -> X;
  322. Else -> Else end.