123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- -module(kvs_user).
- -include_lib("kvs/include/users.hrl").
- -include_lib("kvs/include/groups.hrl").
- -include_lib("kvs/include/accounts.hrl").
- -include_lib("kvs/include/log.hrl").
- -include_lib("kvs/include/feed_state.hrl").
- -include_lib("mqs/include/mqs.hrl").
- -compile(export_all).
- register(#user{username=U, email=Email, facebook_id = FbId} = RegisterData0) ->
- FindUser = case check_username(U, FbId) of
- {error, E} -> {error, E};
- {ok, NewName} -> case kvs_users:get({email, Email}) of
- {error, _} -> {ok, NewName};
- {ok, _} -> {error, email_taken} end end,
- FindUser2 = case FindUser of
- {ok, UserName} -> case kvs_group:get(UserName) of
- {error, _} -> {ok, UserName};
- _ -> {error, username_taken} end;
- A -> A end,
- case FindUser2 of
- {ok, Name} -> process_register(RegisterData0#user{username=Name});
- {error, username_taken} -> {error, user_exist};
- {error, email_taken} -> {error, email_taken} end.
- process_register(#user{username=U} = RegisterData0) ->
- HashedPassword = case RegisterData0#user.password of
- undefined -> undefined;
- PlainPassword -> utils:sha(PlainPassword) end,
- RegisterData = RegisterData0#user {
- feed = kvs:feed_create(),
- direct = kvs:feed_create(),
- pinned = kvs:feed_create(),
- starred = kvs:feed_create(),
- password = HashedPassword },
- kvs:put(RegisterData),
- kvs_account:create_account(U),
- {ok, DefaultQuota} = kvs:get(config, "accounts/default_quota", 300),
- kvs_account:transaction(U, quota, DefaultQuota, #tx_default_assignment{}),
- init_mq(U),
- {ok, U}.
- check_username(Name, FbId) ->
- case kvs_users:get(Name) of
- {error, notfound} -> {ok, Name};
- {ok, User} when FbId =/= undefined -> check_username(User#user.username ++ integer_to_list(crypto:rand_uniform(0,10)), FbId);
- {ok, _}-> {error, username_taken} end.
- delete(UserName) ->
- case kvs_users:get(UserName) of
- {ok, User} -> GIds = kvs_group:list_groups_per_user(UserName),
- [mqs:notify(["subscription", "user", UserName, "remove_from_group"], {GId}) || GId <- GIds],
- F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(User) ],
- [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
- [ unsubscribe(FrId, MeId) || {MeId, FrId} <- F2U ],
- kvs:delete(user_status, UserName),
- kvs:delete(user, UserName),
- {ok, User};
- E -> E end.
- get({username, UserName}) -> kvs:user_by_username(UserName);
- get({facebook, FBId}) -> kvs:user_by_facebook_id(FBId);
- get({email, Email}) -> kvs:user_by_email(Email);
- get(UId) -> kvs:get(user, UId).
- subscribe(Who, Whom) ->
- Record = #subscription{key={Who,Whom}, who = Who, whom = Whom},
- kvs:put(Record),
- subscription_mq(user, add, Who, Whom).
- unsubscribe(Who, Whom) ->
- case subscribed(Who, Whom) of
- true -> kvs:delete(subscription, {Who, Whom}),
- subscription_mq(user, remove, Who, Whom);
- false -> skip end.
- subscriptions(undefined)-> [];
- subscriptions(#user{username = UId}) -> subscriptions(UId);
- subscriptions(UId) when is_list(UId) -> lists:sort( kvs:all_by_index(subs, <<"subs_who_bin">>, list_to_binary(UId)) ).
- subscribed(Who, Whom) ->
- case kvs:get(subscription, {Who, Whom}) of
- {ok, _} -> true;
- _ -> false end.
- update_user(#user{username=UId,name=Name,surname=Surname} = NewUser) ->
- OldUser = case kvs:get(user,UId) of
- {error,notfound} -> NewUser;
- {ok,#user{}=User} -> User
- end,
- kvs:put(NewUser),
- case Name==OldUser#user.name andalso Surname==OldUser#user.surname of
- true -> ok;
- false -> kvs:update_user_name(UId,Name,Surname)
- end.
- subscription_mq(Type, Action, MeId, ToId) ->
- case mqs:open([]) of
- {ok,Channel} -> Routes = case Type of user -> rk_user_feed(ToId); group -> rk_group_feed(ToId) end,
- case Action of
- add -> bind_user_exchange(Channel, MeId, Routes);
- remove -> unbind_user_exchange(Channel, MeId, Routes) end,
- mqs_channel:close(Channel);
- {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.
- init_mq(User=#user{}) ->
- Groups = kvs_group:participate(User),
- ?INFO("~p init mq. users: ~p", [User, Groups]),
- UserExchange = ?USER_EXCHANGE(User#user.username),
- ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
- case mqs:open([]) of
- {ok, Channel} ->
- ?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
- mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions),
- Relations = build_user_relations(User, Groups),
- [bind_user_exchange(Channel, User, RK) || RK <- Relations],
- mqs_channel:close(Channel);
- {error,Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end;
- init_mq(Group=#group{}) ->
- GroupExchange = ?GROUP_EXCHANGE(Group#group.username),
- ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
- case mqs:open([]) of
- {ok, Channel} ->
- mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
- Relations = build_group_relations(Group),
- [bind_group_exchange(Channel, Group, RK) || RK <- Relations],
- mqs_channel:close(Channel);
- {error, Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
- build_user_relations(User, Groups) -> [
- rk( [db, user, User, put] ),
- rk( [subscription, user, User, add_to_group]),
- rk( [subscription, user, User, remove_from_group]),
- rk( [subscription, user, User, leave_group]),
- rk( [login, user, User, update_after_login]),
- rk( [likes, user, User, add_like]),
- rk( [feed, delete, User]),
- rk( [feed, user, User, '*', '*', '*']),
- rk( [feed, user, User, count_entry_in_statistics] ),
- rk( [feed, user, User, count_comment_in_statistics] ),
- rk( [feed, user, User, post_note] ),
- rk( [subscription, user, User, subscribe_user]),
- rk( [subscription, user, User, remove_subscribe]),
- rk( [subscription, user, User, set_user_game_status]),
- rk( [subscription, user, User, update_user]),
- rk( [subscription, user, User, block_user]),
- rk( [subscription, user, User, unblock_user]),
- rk( [payment, user, User, set_purchase_external_id]),
- rk( [payment, user, User, set_purchase_state]),
- rk( [payment, user, User, set_purchase_info]),
- rk( [payment, user, User, add]),
- rk( [transaction, user, User, add]),
- rk( [invite, user, User, add]),
- rk( [meeting, user, User, create]),
- rk( [meeting, user, User, join]),
- rk( [purchase, user, User, buy_gift]),
- rk( [purchase, user, User, give_gift]),
- rk( [purchase, user, User, mark_gift_as_deliving]),
- rk( [feed, system, '*', '*']) |
- [rk_group_feed(G) || G <- Groups]
- ].
- build_group_relations(Group) -> [
- rk( [db, group, Group, put] ),
- rk( [db, group, Group, update_group] ),
- rk( [db, group, Group, remove_group] ),
- rk( [likes, group, Group, add_like]), % for comet mostly
- rk( [feed, delete, Group] ),
- rk( [feed, group, Group, '*', '*', '*'] )
- ].
- rk_user_feed(User) -> rk([feed, user, User, '*', '*', '*']).
- rk_group_feed(Group) -> rk([feed, group, Group, '*', '*', '*']).
- bind_user_exchange(Channel, User, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
- unbind_user_exchange(Channel, User, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
- bind_group_exchange(Channel, Group, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
- unbind_group_exchange(Channel, Group, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
- rk(List) -> mqs_lib:list_to_key(List).
- retrieve_connections(Id,Type) ->
- Friends = case Type of
- user -> kvs_users:list_subscr_usernames(Id);
- _ -> kvs_group:list_group_members(Id) end,
- case Friends of
- [] -> [];
- Full -> Sub = lists:sublist(Full, 10),
- case Sub of
- [] -> [];
- _ -> Data = [begin case kvs:get(user,Who) of
- {ok,User} -> RealName = kvs_users:user_realname_user(User),
- Paid = kvs_account:user_paid(Who),
- {Who,Paid,RealName};
- _ -> undefined end end || Who <- Sub],
- [X||X<-Data, X/=undefined] end end.
- %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
- handle_notice(["system", "create_group"] = Route,
- Message, #state{owner = Owner, type =Type} = State) ->
- ?INFO("queue_action(~p): create_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
- {UId, GId, Name, Desc, Publicity} = Message,
- FId = kvs:feed_create(),
- CTime = erlang:now(),
- Group =#group{username = GId,
- name = Name,
- description = Desc,
- publicity = Publicity,
- creator = UId,
- created = CTime,
- owner = UId,
- feed = FId},
- kvs:put(Group),
- mqs:notify([group, init], {GId, FId}),
- kvs_users:init_mq(Group),
- {noreply, State};
- handle_notice(["db", "group", GId, "remove_group"] = Route,
- Message, #state{owner = Owner, type =Type} = State) ->
- ?INFO("queue_action(~p): remove_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
- {_, Group} = kvs_group:get_group(GId),
- case Group of
- notfound -> ok;
- _ ->
- mqs:notify([feed, delete, GId], empty),
- kvs:delete_by_index(group_subs, <<"group_subs_group_id_bin">>, GId),
- kvs:delete(feed, Group#group.feed),
- kvs:delete(group, GId),
- % unbind exchange
- {ok, Channel} = mqs:open([]),
- Routes = kvs_users:rk_group_feed(GId),
- kvs_users:unbind_group_exchange(Channel, GId, Routes),
- mqs_channel:close(Channel)
- end,
- {noreply, State};
- handle_notice(["subscription", "user", UId, "add_to_group"] = Route,
- Message, #state{owner = Owner, type =Type} = State) ->
- ?INFO("queue_action(~p): add_to_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
- {GId, Who, UType} = Message,
- case kvs:get(group_subs, {UId, GId}) of
- {error, notfound} ->
- {R, Group} = kvs:get(group, GId),
- case R of
- error -> ?INFO("Add to group failed reading group");
- _ ->
- GU = Group#group.users_count,
- kvs:put(Group#group{users_count = GU+1})
- end;
- _ ->
- ok
- end,
- OK = kvs:put({group_subs,UId,GId,Type,0}),
- % add_to_group(Who, GId, UType),
- ?INFO("add ~p to group ~p with Type ~p by ~p", [Who, GId,UType,UId]),
- kvs_users:subscribemq(group, add, Who, GId),
- {noreply, State};
- handle_notice(["subscription", "user", UId, "remove_from_group"] = Route,
- Message, #state{owner = Owner, type =Type} = State) ->
- ?INFO("queue_action(~p): remove_from_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
- {GId} = Message,
- ?INFO("remove ~p from group ~p", [UId, GId]),
- kvs_users:remove_subscription_mq(group, UId, GId),
- kvs:delete(group_subs, {UId, GId}),
- {R, Group} = kvs:get(group, GId),
- case R of
- error -> ?INFO("Remove ~p from group failed reading group ~p", [UId, GId]);
- _ ->
- GU = Group#group.users_count,
- kvs:put(Group#group{users_count = GU-1})
- end,
- {noreply, State};
- handle_notice(["subscription", "user", UId, "leave_group"] = Route,
- Message, #state{owner = Owner, type =Type} = State) ->
- ?INFO(" queue_action(~p): leave_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
- {GId} = Message,
- {R, Group} = kvs:get(group, GId),
- case R of
- error -> ?ERROR(" Error reading group ~p for leave_group", [GId]);
- ok ->
- case Group#group.owner of
- UId -> % User is owner, transfer ownership to someone else
- Members = kvs_group:list_group_members(GId),
- case Members of
- [ FirstOne | _ ] ->
- ok = kvs:put(Group#group{owner = FirstOne}),
- mqs:notify(["subscription", "user", UId, "remove_from_group"], {GId});
- [] ->
- % Nobody left in group, remove group at all
- mqs:notify([db, group, GId, remove_group], [])
- end;
- _ -> % Plain user removes -- just remove it
- mqs:notify(["subscription", "user", UId, "remove_from_group"], {GId})
- end;
- _ -> % user is just someone, remove it
- mqs:notify(["subscription", "user", UId, "remove_from_group"], {GId})
- end,
- {noreply, State};
- handle_notice(["subscription", "user", UId, "subscribe"] = Route,
- Message, #state{owner = Owner, type =Type} = State) ->
- ?INFO(" queue_action(~p): subscribe_user: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
- {Whom} = Message,
- kvs_users:subscribe(UId, Whom),
- {noreply, State};
- handle_notice(["subscription", "user", UId, "unsubscribe"] = Route,
- Message, #state{owner = Owner, type =Type} = State) ->
- ?INFO(" queue_action(~p): remove_subscribe: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
- {Whom} = Message,
- kvs_users:unsubscribe(UId, Whom),
- {noreply, State};
- handle_notice(["subscription", "user", _UId, "update"] = Route,
- Message, #state{owner = Owner, type =Type} = State) ->
- ?INFO(" queue_action(~p): update_user: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
- {NewUser} = Message,
- kvs_users:update_user(NewUser),
- {noreply, State};
- handle_notice(["login", "user", UId, "update_after_login"] = Route,
- Message, #state{owner = Owner, type =Type} = State) ->
- ?INFO("queue_action(~p): update_after_login: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
- Update = case kvs_users:user_status(UId) of
- {error, status_info_not_found} -> #user_status{username = UId, last_login = erlang:now()};
- {ok, UserStatus} -> UserStatus#user_status{last_login = erlang:now()} end,
- kvs:put(Update),
- {noreply, State};
- handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown USERS notice").
- %%%%%%%%%%%%%%%%%%%%%%
- % user backlinks
- user_by_verification_code(Code) ->
- case kvs:get(code,Code) of
- {ok,{_,User,_}} -> kvs:get(user,User);
- Else -> Else end.
- user_by_facebook_id(FBId) ->
- case kvs:get(facebook,FBId) of
- {ok,{_,User,_}} -> kvs:get(user,User);
- Else -> Else end.
- user_by_email(Email) ->
- case kvs:get(email,Email) of
- {ok,{_,User,_}} -> kvs:get(user,User);
- Else -> Else end.
- user_by_username(Name) ->
- case X = kvs:get(user,Name) of
- {ok,_Res} -> X;
- Else -> Else end.
|