Maxim Sokhatsky 12 лет назад
Родитель
Сommit
68ed8a2b1b
8 измененных файлов с 264 добавлено и 443 удалено
  1. 1 0
      src/feed_server_api.erl
  2. 0 22
      src/kvs.erl
  3. 2 4
      src/kvs_account.erl
  4. 2 7
      src/kvs_feed.erl
  5. 179 125
      src/kvs_group.erl
  6. 10 33
      src/kvs_meeting.erl
  7. 4 4
      src/kvs_payment.erl
  8. 66 248
      src/kvs_user.erl

+ 1 - 0
src/feed_server_api.erl

@@ -2,4 +2,5 @@
 
 
 handle([Module|Parameters],Message,State) ->
 handle([Module|Parameters],Message,State) ->
     Module = list_to_atom(binary_to_list(term_to_binary(Module))),
     Module = list_to_atom(binary_to_list(term_to_binary(Module))),
+    error_logger:info_msg("handle_notice Route: ~p Message ~p",[[Module|Parameters],Message]),
     Module:handle_notice([Module|Parameters],Message,State).
     Module:handle_notice([Module|Parameters],Message,State).

+ 0 - 22
src/kvs.erl

@@ -363,28 +363,6 @@ handle_notice(["kvs","system", "delete"] = Route,
     kvs:delete(Where, What),
     kvs:delete(Where, What),
     {noreply, State};
     {noreply, State};
 
 
-handle_notice(["db", "group", GroupId, "update_group"] = Route, 
-    Message, #state{owner=ThisGroupOwner, type=Type} = State) ->
-    ?INFO("queue_action(~p): update_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, ThisGroupOwner}, Route, Message]),    
-    {_UId, _GroupUsername, Name, Description, Owner, Publicity} = Message,
-    SanePublicity = case Publicity of
-        "public" -> public;
-        "moderated" -> moderated;
-        "private" -> private;
-        _ -> undefined
-    end,
-    SaneOwner = case kvs:get(user, Owner) of
-        {ok, _} -> Owner;
-        _ -> undefined
-    end,
-    {ok, #group{}=Group} = kvs:get(group, GroupId),
-    NewGroup = Group#group{
-                   name = coalesce(Name,Group#group.name),
-                   description = coalesce(Description,Group#group.description),
-                   publicity = coalesce(SanePublicity,Group#group.publicity),
-                   owner = coalesce(SaneOwner,Group#group.owner)},
-    kvs:put(NewGroup),
-    {noreply, State};
 
 
 handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown KVS notice").
 handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown KVS notice").
 
 

+ 2 - 4
src/kvs_account.erl

@@ -115,19 +115,17 @@ tx_list(UserId, StartFrom, Limit) ->
 
 
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
 
-handle_notice(["transaction", "user", User, "add_transaction"] = Route,
+handle_notice(["kvs_account", "user", User, "transaction"] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): add_transaction: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),    
     ?INFO("queue_action(~p): add_transaction: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),    
     MP = Message,
     MP = Message,
-    kvs:add_transaction_to_user(User,MP),
+    add_transaction_to_user(User,MP),
     {noreply, State};
     {noreply, State};
 
 
 handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown ACCOUNTS notice").
 handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown ACCOUNTS notice").
 
 
-
 %%%%%%%%%%%%%%%%%%%%%%%%%
 %%%%%%%%%%%%%%%%%%%%%%%%%
 
 
-
 add_transaction_to_user(UserId,Purchase) ->
 add_transaction_to_user(UserId,Purchase) ->
     {ok,Team} = case kvs:get(user_transaction, UserId) of
     {ok,Team} = case kvs:get(user_transaction, UserId) of
                      {ok,T} -> {ok,T};
                      {ok,T} -> {ok,T};

+ 2 - 7
src/kvs_feed.erl

@@ -196,11 +196,6 @@ purge_unverified_feeds() ->
 
 
 %% MQ API
 %% MQ API
 
 
-handle_notice(["kvs_feed", "delete", Owner] = Route,
-    Message, #state{owner = Owner} = State) ->
-    ?INFO("feed(~p): notification received: User=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
-    {stop, normal, State};
-
 handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add"] = Route, [From|_] = Message, #state{owner = Owner, feed = Feed} = State) ->
 handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add"] = Route, [From|_] = Message, #state{owner = Owner, feed = Feed} = State) ->
     ?INFO("feed(~p): group message: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
     ?INFO("feed(~p): group message: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
     [From, _Destinations, Desc, Medias] = Message,
     [From, _Destinations, Desc, Medias] = Message,
@@ -211,7 +206,7 @@ handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add"] = Route, [
             {ok, Group} = kvs:get(group, GroupId),
             {ok, Group} = kvs:get(group, GroupId),
             GE = Group#group.entries_count,
             GE = Group#group.entries_count,
             kvs:put(Group#group{entries_count = GE+1}),
             kvs:put(Group#group{entries_count = GE+1}),
-            {ok, Subs} = kvs:get(group_subs, {From, GroupId}),
+            {ok, Subs} = kvs:get(group_subscription, {From, GroupId}),
             SE = Subs#group_subscription.user_posts_count,
             SE = Subs#group_subscription.user_posts_count,
             kvs:put(Subs#group_subscription{user_posts_count = SE+1})
             kvs:put(Subs#group_subscription{user_posts_count = SE+1})
     end,
     end,
@@ -274,7 +269,7 @@ handle_notice(["feed", "user", UId, "post_note"] = Route,
     kvs_feed:add_entry(Feed, UId, [], Id, Note, [], {user, system_note}, ""),
     kvs_feed:add_entry(Feed, UId, [], Id, Note, [], {user, system_note}, ""),
     {noreply, State};
     {noreply, State};
 
 
-handle_notice(["kvs_feed", _, WhoShares, "entry", NewEntryId, "share"],
+handle_notice(["kvs_feed", _, WhoShares, "entry", NewEntryId, "share"] = Route,
                 #entry{entry_id = _EntryId, raw_description = Desc, media = Medias, to = Destinations,
                 #entry{entry_id = _EntryId, raw_description = Desc, media = Medias, to = Destinations,
                 from = From} = E, #state{feed = Feed, type = user} = State) ->
                 from = From} = E, #state{feed = Feed, type = user} = State) ->
     %% FIXME: sharing is like posting to the wall
     %% FIXME: sharing is like posting to the wall

+ 179 - 125
src/kvs_group.erl

@@ -2,148 +2,202 @@
 -compile(export_all).
 -compile(export_all).
 -include_lib("kvs/include/users.hrl").
 -include_lib("kvs/include/users.hrl").
 -include_lib("kvs/include/groups.hrl").
 -include_lib("kvs/include/groups.hrl").
--include_lib("kvs/include/feeds.hrl").
+-include_lib("kvs/include/accounts.hrl").
 -include_lib("kvs/include/log.hrl").
 -include_lib("kvs/include/log.hrl").
+-include_lib("kvs/include/feed_state.hrl").
+-include_lib("mqs/include/mqs.hrl").
 
 
 retrieve_groups(User) ->
 retrieve_groups(User) ->
     ?INFO("retrieve group for user: ~p",[User]),
     ?INFO("retrieve group for user: ~p",[User]),
     case participate(User) of
     case participate(User) of
          [] -> [];
          [] -> [];
          Gs -> UC_GId = lists:sublist(lists:reverse(
          Gs -> UC_GId = lists:sublist(lists:reverse(
-                              lists:sort([{group_members_count(GId), GId} || GId <- Gs])), 
+                              lists:sort([{members_count(GId), GId} || GId <- Gs])), 
                                     20),
                                     20),
-               Result = [begin case get_group(GId) of
+               Result = [begin case kvs:get(group,GId) of
                                    {ok, Group} -> {Group#group.name,GId,UC};
                                    {ok, Group} -> {Group#group.name,GId,UC};
                                    _ -> undefined end end || {UC, GId} <- UC_GId],
                                    _ -> undefined end end || {UC, GId} <- UC_GId],
                [X||X<-Result,X/=undefined] end.
                [X||X<-Result,X/=undefined] end.
 
 
-create_group_directly_to_db(UId, GId, Name, Desc, Publicity) ->
-    FId = kvs:feed_create(),
-    CTime = erlang:now(),
-    Group = #group{username = GId,
-                      name = Name,
-                      description = Desc,
-                      publicity = Publicity,
-                      creator = UId,
-                      created = CTime,
-                      owner = UId,
-                     feed = FId},
+create(Creator, GroupName, GroupFullName, Desc, Publicity) ->
+    Feed = kvs_feed:create(),
+    Time = erlang:now(),
+    Group = #group{username = GroupName, name = GroupFullName, description = Desc, publicity = Publicity,
+                   creator = Creator, created = Time, owner = Creator, feed = Feed},
     kvs:put(Group),
     kvs:put(Group),
-    kvs_users:init_mq(Group),
-    add_to_group_directly_to_db(UId, GId, member),
-    GId.
-
-add_to_group(Who, GId, Type, Owner) -> mqs:notify(["subscription", "user", Owner, "add_to_group"], {GId, Who, Type}).
-
-add_to_group_directly_to_db(UId, GId, Type) ->
-    kvs:put(#group_subscription{key={UId,GId},user_id=UId, group_id=GId, user_type=Type}),
-    {ok, Group} = kvs:get(group, GId),
-    GU = Group#group.users_count,
-    kvs:put(Group#group{users_count = GU+1}).
-
-delete_group(GId) ->
-    {_, Group} = get_group(GId),
-    case Group of 
-        notfound -> ok;
-        _ ->
-            mqs:notify([feed, delete, GId], empty),
-            kvs:delete_by_index(group_subscription, <<"group_subs_group_id_bin">>, GId),         
+    init_mq(Group),
+    mqs:notify([group, init], {GroupName, Feed}),
+    add(Creator, GroupName, member),
+    GroupName.
+
+
+delete(GroupName) ->
+    case kvs:get(group,GroupName) of 
+        {error,_} -> ok;
+        {ok, Group} ->
+            mqs:notify([feed, delete, GroupName], empty),
+            kvs:delete_by_index(group_subscription, <<"where_bin">>, GroupName),
             kvs:delete(feed, Group#group.feed),
             kvs:delete(feed, Group#group.feed),
-            kvs:delete(group, GId),
-            % unbind exchange
-            {ok, Channel} = mqs:open([]),
-            Routes = kvs_users:rk_group_feed(GId),
-            kvs_users:unbind_group_exchange(Channel, GId, Routes),
-            mqs_channel:close(Channel)
-    end.
-
-participate(UId) -> [GId || #group_subscription{group_id=GId} <- kvs:all_by_index(group_subs, <<"group_subs_user_id_bin">>, UId) ].
-members(GId) -> [UId || #group_subscription{user_id=UId, user_type=UT} <- kvs:all_by_index(group_subs, <<"group_subs_group_id_bin">>, GId), UT == member ].
-members_by_type(GId, Type) -> [UId || #group_subscription{user_id=UId, user_type=UT} <- kvs:all_by_index(group_subs, <<"group_subs_group_id_bin">>, GId), UT == Type ].
-members_with_types(GId) -> [{UId, UType} || #group_subscription{user_id=UId, user_type=UType} <- kvs:all_by_index(group_subs, <<"group_subs_group_id_bin">>, list_to_binary(GId)) ].
-
-get_group(GId) -> kvs:get(group, GId).
-
-user_is_owner(UId, GId) ->
-    {R, Group} = kvs:get(group, GId),
-    case R of
-        ok -> case Group#group.owner of
-                UId -> true;
-                _ -> false
-            end;
-        _ -> false
-    end.
-
-user_in_group(UId, GId) ->
-    case kvs:get(group_subs, {UId, GId}) of
-        {error, notfound} -> false;
-        _ -> true
-    end.
-
-group_user_type(UId, GId) ->
-    case kvs:get(group_subs, {UId, GId}) of
+            kvs:delete(group, GroupName),
+            case mqs:open([]) of
+                {ok, Channel} ->
+                    Routes = kvs_users:rk_group_feed(GroupName),
+                    kvs_users:unbind_group_exchange(Channel, GroupName, Routes),
+                    mqs_channel:close(Channel);
+                {error,Reason} -> ?ERROR("delete group failed: ~p",[Reason]) end end.
+
+participate(UserName) -> [GroupName || #group_subscription{group_id=GroupName} <- kvs:all_by_index(group_subscription, <<"who_bin">>, UserName) ].
+members(GroupName) -> [UserName || #group_subscription{user_id=UserName} <- kvs:all_by_index(group_subscription, <<"where_bin">>, GroupName) ].
+members_by_type(GroupName, Type) -> [UserName || #group_subscription{user_id=UserName, user_type=T} <- kvs:all_by_index(group_subscription, <<"where_bin">>, GroupName), T == Type ].
+members_with_types(GroupName) -> [{UserName, Type} || #group_subscription{user_id=UserName, user_type=Type} <- kvs:all_by_index(group_subscriptioin, <<"where_bin">>, list_to_binary(GroupName)) ].
+
+owner(UserName, GroupName) ->
+    case kvs:get(group, GroupName) of
+        {ok,Group} -> case Group#group.owner of UserName -> true;  _ -> false end;
+        _ -> false end.
+
+member(UserName, GroupName) ->
+    case kvs:get(group_subscription, {UserName, GroupName}) of
+        {error, _} -> false;
+        {ok,_} -> true end.
+
+member_type(UserName, GroupName) ->
+    case kvs:get(group_subs, {UserName, GroupName}) of
         {error, notfound} -> not_in_group;
         {error, notfound} -> not_in_group;
-        {ok, #group_subscription{user_type=Type}} -> Type
-    end.
-
-join_group(GId, User) ->
-    {ok, Group} = get_group(GId),
-    case Group of
-        #group{username = GId, publicity = public} ->
-            % Join to this group
-            add_to_group(User, GId, member, Group#group.owner),
+        {ok, #group_subscription{user_type=Type}} -> Type end.
+
+add(UserName, GroupName, Type) ->
+    kvs:put(#group_subscription{key={UserName,GroupName},user_id=UserName, group_id=GroupName, user_type=Type}),
+    {ok, Group} = kvs:get(group, GroupName),
+    Users = Group#group.users_count,
+    kvs:put(Group#group{users_count = Users + 1}).
+
+join(UserName,GroupName) ->
+    case kvs:get(group,GroupName) of
+        {ok, #group{username = GroupName, publicity = public}} ->
+            add(UserName, GroupName, member),
             {ok, joined};
             {ok, joined};
-        #group{username = GId, publicity = _, feed=_Feed} ->
-            case group_user_type(User, GId) of
+        {ok, #group{username = GroupName}} ->
+            case member_type(UserName, GroupName) of
                 member -> {ok, joined};
                 member -> {ok, joined};
                 req -> {error, already_sent};
                 req -> {error, already_sent};
-                reqrejected -> {error, request_rejected};
-                not_in_group -> add_to_group(User, GId, req, Group#group.owner), {ok, requested};
-                _ -> {error, unknown_type}
-            end;
-        _ -> {error, notfound}
-    end.
-
-approve_request(UId, GId, Owner) -> add_to_group(UId, GId, member, Owner).
-reject_request(UId, GId, Owner) -> add_to_group(UId, GId, reqrejected, Owner).
-change_group_user_type(UId, GId, Type) -> mqs:notify(["subscription", "user", UId, "add_to_group"], {GId, UId, Type}).
-
-group_exists(GId) ->
-    {R, _} = get_group(GId),
-    case R of
-        ok -> true;
-        _ -> false
-    end.
-
-group_publicity(GId) ->
-    {_, Group} = get_group(GId),
-    case Group of
-        notfound ->
-            no_such_group;
-        _ ->
-            Group#group.publicity
-    end.
-
-group_members_count(GId) ->
-    {_, Group} = get_group(GId),
-    case Group of
-        notfound ->
-            no_such_group;
-        _ ->
-            Group#group.users_count
-    end.
-
-user_has_access(UId, GId) ->
-    UType = group_user_type(UId, GId),
-    {_, Group} = get_group(GId),
-    case Group of
-        notfound ->
-            false;
-        _ ->
-            GPublicity = Group#group.publicity,
-            case {GPublicity, UType} of
+                req_rejected -> {error, request_rejected};
+                not_in_group -> add(UserName, GroupName, req), {ok, requested};
+                _ -> {error, unknown_type} end;
+        Error -> Error end.
+
+leave(UserName,GroupName) ->
+    kvs:delete(group_subscription, {UserName, GroupName}),
+    case kvs:get(group, GroupName) of
+        {error,_} -> ?ERROR("Remove ~p from group failed reading group ~p", [UserName, GroupName]);
+        {ok,Group} -> kvs:put(Group#group{users_count = Group#group.users_count - 1}) end.
+
+approve_request(UserName, GroupName) -> add(UserName, GroupName, member).
+reject_request(UserName, GroupName) -> add(UserName, GroupName, req_rejected).
+member_type(UserName, GroupName, Type) -> mqs:notify(["subscription", "user", GroupName, "add_to_group"], {GroupName, UserName, Type}).
+
+exists(GroupName) -> case kvs:get(group,GroupName) of {ok,_} -> true; _ -> false end.
+
+coalesce(undefined, B) -> B;
+coalesce(A, _) -> A.
+
+publicity(GroupName) ->
+    case kvs:get(group,GroupName) of
+        {error,_} -> no_such_group;
+        {ok,Group} -> Group#group.publicity end.
+
+members_count(GroupName) ->
+    case kvs:get(group,GroupName) of
+        {error,_} -> no_such_group;
+        {ok,Group} -> Group#group.users_count end.
+
+user_has_access(UserName, GroupName) ->
+    Type = member_type(UserName, GroupName),
+    case kvs:get(group, GroupName) of
+        {error,_} -> false;
+        {ok,Group} ->
+            Publicity = Group#group.publicity,
+            case {Publicity, Type} of
                 {public, _} -> true;
                 {public, _} -> true;
                 {private, member} -> true;
                 {private, member} -> true;
-                _ -> false
-            end
-    end.
+                _ -> false end end.
+
+handle_notice(["kvs_group", "create"] = Route, 
+    Message, #state{owner = Owner, type =Type} = State) ->
+    ?INFO("queue_action(~p): create_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
+    {Creator, GroupName, FullName, Desc, Publicity} = Message,
+    create(Creator, GroupName, FullName, Desc, Publicity),
+    {noreply, State};
+
+handle_notice(["kvs_group", "update", GroupName] = Route, 
+    Message, #state{owner=ThisGroupOwner, type=Type} = State) ->
+    ?INFO("queue_action(~p): update_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, ThisGroupOwner}, Route, Message]),    
+    {_UId, _GroupUsername, Name, Description, Owner, Publicity} = Message,
+    case kvs:get(group, GroupName) of
+        {ok, Group} ->
+            NewGroup = Group#group{name = coalesce(Name,Group#group.name),
+                description = coalesce(Description,Group#group.description),
+                publicity = coalesce(Publicity,Group#group.publicity),
+                owner = coalesce(Owner,Group#group.owner)},
+            kvs:put(NewGroup);
+        {error,Reason} -> ?ERROR("Cannot update group ~p",[Reason]) end,
+    {noreply, State};
+
+handle_notice(["kvs_group", "remove", GroupName] = Route, 
+    Message, #state{owner = Owner, type = Type} = State) ->
+    ?INFO("queue_action(~p): remove_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
+    delete(GroupName),
+    {noreply, State};
+
+handle_notice(["kvs_group", "join", GroupName] = Route,
+    Message, #state{owner = Owner, type =Type} = State) ->
+    {GroupName, UserName, Type} = Message,
+    join(UserName, GroupName),
+    subscription_mq(group, add, UserName, GroupName),
+    {noreply, State};
+
+handle_notice(["kvs_group", "leave", GroupName] = Route,
+    Message, #state{owner = Owner, type =Type} = State) ->
+    ?INFO("queue_action(~p): remove_from_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
+    {UserName} = Message,
+    leave(UserName,GroupName),
+    subscription_mq(group, remove, UserName, GroupName),
+    {noreply, State};
+
+handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown GROUP notice").
+
+build_group_relations(Group) -> [
+    msq:key( [kvs_group, create] ),
+    msq:key( [kvs_group, update, Group] ),
+    msq:key( [kvs_group, remove, Group] ),
+    msq:key( [kvs_group, join, Group] ),
+    msq:key( [kvs_group, leave, Group] ),
+    msq:key( [kvs_group, like, Group]),   % for comet mostly
+    msq:key( [kvs_feed, delete, Group] ),
+    msq:key( [kvs_feed, group, Group, '*', '*', '*'] )
+    ].
+
+init_mq(Group=#group{}) ->
+    GroupExchange = ?GROUP_EXCHANGE(Group#group.username),
+    ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
+    case mqs:open([]) of
+        {ok, Channel} ->
+            mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
+            Relations = build_group_relations(Group),
+            [bind_group_exchange(Channel, Group, RK) || RK <- Relations],
+            mqs_channel:close(Channel);
+        {error, Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
+
+rk_group_feed(Group) -> mqs_lib:list_to_key([feed, group, Group, '*', '*', '*']).
+
+bind_group_exchange(Channel, Group, Route) -> {bind, Route, mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, Route)}.
+unbind_group_exchange(Channel, Group, Route) -> {unbind, Route, mqs_channel:unbind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, Route)}.
+
+subscription_mq(Type, Action, MeId, ToId) ->
+    case mqs:open([]) of
+        {ok,Channel} ->
+            case {Type,Action} of 
+                {group,add} -> bind_group_exchange(Channel, MeId, rk_group_feed(ToId));
+                {group,remove} -> unbind_group_exchange(Channel, MeId, rk_group_feed(ToId)) end,
+            mqs_channel:close(Channel);
+        {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.

+ 10 - 33
src/kvs_meeting.erl

@@ -14,26 +14,11 @@ create(UID, Name) -> create(UID, Name, "", date(), time(), 100, 100, undefined,
 create(UID, Name, Desc, Date, Time, Players, Quota, Awards, Type, Game, Mode, Tours, Speed) ->
 create(UID, Name, Desc, Date, Time, Players, Quota, Awards, Type, Game, Mode, Tours, Speed) ->
     NodeAtom = nsx_opt:get_env(store,game_srv_node,'game@doxtop.cc'),
     NodeAtom = nsx_opt:get_env(store,game_srv_node,'game@doxtop.cc'),
     TID = rpc:call(NodeAtom, game_manager, gen_game_id, []),
     TID = rpc:call(NodeAtom, game_manager, gen_game_id, []),
-
     CTime = erlang:now(),
     CTime = erlang:now(),
-    ok = kvs:put(#meeting{name = Name,
-                                   id = TID,
-                                   description = Desc,
-                                   quota = Quota,
-                                   players_count = Players,
-                                   start_date = Date,
-                                   awards = Awards,
-                                   creator = UID,
-                                   created = CTime,
-                                   game_type = Game,
-                                   game_mode = Mode,
-                                   type = Type,
-                                   tours = Tours,
-                                   speed = Speed,
-                                   start_time = Time,
-                                   status = created,
-                                   owner = UID}),
-
+    kvs:put(#meeting{name = Name, id = TID, description = Desc, quota = Quota,
+        players_count = Players, start_date = Date, awards = Awards,
+        creator = UID, created = CTime, game_type = Game, game_mode = Mode,
+        type = Type, tours = Tours, speed = Speed, start_time = Time, status = created, owner = UID}),
     TID.
     TID.
 
 
 get(TID) ->
 get(TID) ->
@@ -60,39 +45,34 @@ clear() -> [destroy(T#meeting.id) || T <- kvs:all(meeting)].
 lost() -> lists:usort([erlang:element(3, I) || I <- kvs:all(play_record)]).
 lost() -> lists:usort([erlang:element(3, I) || I <- kvs:all(play_record)]).
 fake_join(TID) -> [kvs_meeting:join(auth:ima_gio2(X),TID)||X<-lists:seq(1,30)].
 fake_join(TID) -> [kvs_meeting:join(auth:ima_gio2(X),TID)||X<-lists:seq(1,30)].
 
 
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
-handle_notice(["tournaments", "user", UId, "create"] = Route,
+handle_notice(["kvs_meeting", "user", UId, "create"] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): create: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     ?INFO("queue_action(~p): create: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {TourName, TourDesc, {Y,M,D}, Time, MaxPlayers, Quota, Award, TourType, GameType} = Message,
     {TourName, TourDesc, {Y,M,D}, Time, MaxPlayers, Quota, Award, TourType, GameType} = Message,
     case kvs_meeting:create(UId, TourName, TourDesc, {Y,M,D}, Time, MaxPlayers, Quota, Award, TourType, GameType) of
     case kvs_meeting:create(UId, TourName, TourDesc, {Y,M,D}, Time, MaxPlayers, Quota, Award, TourType, GameType) of
         {error,X} -> 
         {error,X} -> 
             ?ERROR("Error creating tournament: ~p", X);
             ?ERROR("Error creating tournament: ~p", X);
-        TId -> skip
-    end,
+        TId -> skip end,
     {noreply, State};
     {noreply, State};
 
 
-handle_notice(["tournaments", "user", UId, "create_and_join"] = Route,
+handle_notice(["kvs_meeting", "user", UId, "create_and_join"] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): create_and_join: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     ?INFO("queue_action(~p): create_and_join: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {TourName, TourDesc, {Y,M,D}, Time, MaxPlayers, Quota, Award, TourType, GameType} = Message,
     {TourName, TourDesc, {Y,M,D}, Time, MaxPlayers, Quota, Award, TourType, GameType} = Message,
     case kvs_meeting:create(UId, TourName, TourDesc, {Y,M,D}, Time, MaxPlayers, Quota, Award, TourType, GameType) of
     case kvs_meeting:create(UId, TourName, TourDesc, {Y,M,D}, Time, MaxPlayers, Quota, Award, TourType, GameType) of
         {error,X} -> 
         {error,X} -> 
             ?ERROR("Error creating tournament: ~p", X);
             ?ERROR("Error creating tournament: ~p", X);
-        TId -> 
-            kvs_meeting:join(UId, TId)
-    end,
+        TId -> kvs_meeting:join(UId, TId) end,
     {noreply, State};
     {noreply, State};
 
 
-handle_notice(["system", "tournament_join"] = Route,
+handle_notice(["kvs_meeting", "user", UId, "join"] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): tournament_join: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     ?INFO("queue_action(~p): tournament_join: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {UId, TId} = Message,
     {UId, TId} = Message,
     kvs_meeting:join(UId, TId),
     kvs_meeting:join(UId, TId),
     {noreply, State};
     {noreply, State};
 
 
-handle_notice(["system", "tournament_remove"] = Route,
+handle_notice(["kvs_meeting", "user", UId, "leave"] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): tournament_remove: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     ?INFO("queue_action(~p): tournament_remove: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {UId, TId} = Message,
     {UId, TId} = Message,
@@ -101,9 +81,6 @@ handle_notice(["system", "tournament_remove"] = Route,
 
 
 handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown MEETINGS notice").
 handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown MEETINGS notice").
 
 
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
-
 join_tournament(UserId, TournamentId) ->
 join_tournament(UserId, TournamentId) ->
     case kvs:get(user, UserId) of
     case kvs:get(user, UserId) of
         {ok, User} ->
         {ok, User} ->

+ 4 - 4
src/kvs_payment.erl

@@ -132,28 +132,28 @@ payment_id() ->
     NextId = kvs:next_id("payment"),
     NextId = kvs:next_id("payment"),
     lists:concat([kvs_membership:timestamp(), "_", NextId]).
     lists:concat([kvs_membership:timestamp(), "_", NextId]).
 
 
-handle_notice(["kvs_payment", "user", _, "set_purchase_state"] = Route,
+handle_notice(["kvs_payment", "user", _, "set_state"] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): set_purchase_state: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),  
     ?INFO("queue_action(~p): set_purchase_state: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),  
     {MPId, NewState, Info} = Message,
     {MPId, NewState, Info} = Message,
     set_payment_state(MPId, NewState, Info),
     set_payment_state(MPId, NewState, Info),
     {noreply, State};
     {noreply, State};
 
 
-handle_notice(["kvs_payment", "user", _, "add_purchase"] = Route,
+handle_notice(["kvs_payment", "user", _, "add"] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): add_purchase: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),    
     ?INFO("queue_action(~p): add_purchase: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),    
     {MP} = Message,
     {MP} = Message,
     add_payment(MP),
     add_payment(MP),
     {noreply, State};
     {noreply, State};
 
 
-handle_notice(["kvs_payment", "user", _, "set_purchase_external_id"] = Route,
+handle_notice(["kvs_payment", "user", _, "set_external_id"] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): set_purchase_external_id: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     ?INFO("queue_action(~p): set_purchase_external_id: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {PurchaseId, TxnId} = Message,
     {PurchaseId, TxnId} = Message,
     set_payment_external_id(PurchaseId, TxnId),
     set_payment_external_id(PurchaseId, TxnId),
     {noreply, State};
     {noreply, State};
 
 
-handle_notice(["kvs_payment", "user", _, "set_purchase_info"] = Route,
+handle_notice(["kvs_payment", "user", _, "set_info"] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): set_purchase_info: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     ?INFO("queue_action(~p): set_purchase_info: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {OrderId, Info} = Message,
     {OrderId, Info} = Message,

+ 66 - 248
src/kvs_user.erl

@@ -41,6 +41,7 @@ process_register(#user{username=U} = RegisterData0) ->
     {ok, DefaultQuota} = kvs:get(config, "accounts/default_quota",  300),
     {ok, DefaultQuota} = kvs:get(config, "accounts/default_quota",  300),
     kvs_account:transaction(U, quota, DefaultQuota, #tx_default_assignment{}),
     kvs_account:transaction(U, quota, DefaultQuota, #tx_default_assignment{}),
     init_mq(U),
     init_mq(U),
+    mqs:notify([user, init], {U, RegisterData#user.feed}),
     {ok, U}.
     {ok, U}.
 
 
 check_username(Name, FbId) ->
 check_username(Name, FbId) ->
@@ -51,15 +52,16 @@ check_username(Name, FbId) ->
 
 
 delete(UserName) ->
 delete(UserName) ->
     case kvs_users:get(UserName) of
     case kvs_users:get(UserName) of
-        {ok, User} -> GIds = kvs_group:list_groups_per_user(UserName),
-                      [mqs:notify(["subscription", "user", UserName, "remove_from_group"], {GId}) || GId <- GIds],
-                      F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(User) ],
-                      [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
-                      [ unsubscribe(FrId, MeId) || {MeId, FrId} <- F2U ],
-                      kvs:delete(user_status, UserName),
-                      kvs:delete(user, UserName),
-                      {ok, User};
-                 E -> E end.
+        {ok, User} ->
+            GIds = kvs_group:list_groups_per_user(UserName),
+            [ mqs:notify(["subscription", "user", UserName, "remove_from_group"], {GId}) || GId <- GIds ],
+            F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(User) ],
+            [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
+            [ unsubscribe(FrId, MeId) || {MeId, FrId} <- F2U ],
+            kvs:delete(user_status, UserName),
+            kvs:delete(user, UserName),
+            {ok, User};
+        E -> E end.
 
 
 get({username, UserName}) -> kvs:user_by_username(UserName);
 get({username, UserName}) -> kvs:user_by_username(UserName);
 get({facebook, FBId}) -> kvs:user_by_facebook_id(FBId);
 get({facebook, FBId}) -> kvs:user_by_facebook_id(FBId);
@@ -68,13 +70,11 @@ get(UId) -> kvs:get(user, UId).
 
 
 subscribe(Who, Whom) ->
 subscribe(Who, Whom) ->
     Record = #subscription{key={Who,Whom}, who = Who, whom = Whom},
     Record = #subscription{key={Who,Whom}, who = Who, whom = Whom},
-    kvs:put(Record),
-    subscription_mq(user, add, Who, Whom).
+    kvs:put(Record).
 
 
 unsubscribe(Who, Whom) ->
 unsubscribe(Who, Whom) ->
     case subscribed(Who, Whom) of
     case subscribed(Who, Whom) of
-        true  -> kvs:delete(subscription, {Who, Whom}),
-                 subscription_mq(user, remove, Who, Whom);
+        true  -> kvs:delete(subscription, {Who, Whom});
         false -> skip end.
         false -> skip end.
 
 
 subscriptions(undefined)-> [];
 subscriptions(undefined)-> [];
@@ -86,24 +86,13 @@ subscribed(Who, Whom) ->
         {ok, _} -> true;
         {ok, _} -> true;
         _ -> false end.
         _ -> false end.
 
 
-update_user(#user{username=UId,name=Name,surname=Surname} = NewUser) ->
-    OldUser = case kvs:get(user,UId) of
-        {error,notfound} -> NewUser;
-        {ok,#user{}=User} -> User
-    end,
-    kvs:put(NewUser),
-    case Name==OldUser#user.name andalso Surname==OldUser#user.surname of
-        true -> ok;
-        false -> kvs:update_user_name(UId,Name,Surname)
-    end.
-
 subscription_mq(Type, Action, MeId, ToId) ->
 subscription_mq(Type, Action, MeId, ToId) ->
     case mqs:open([]) of
     case mqs:open([]) of
-        {ok,Channel} -> Routes = case Type of user -> rk_user_feed(ToId); group -> rk_group_feed(ToId) end,
-                        case Action of
-                            add -> bind_user_exchange(Channel, MeId, Routes);
-                            remove -> unbind_user_exchange(Channel, MeId, Routes) end,
-                        mqs_channel:close(Channel);
+        {ok,Channel} ->
+            case {Type,Action} of 
+                {user,add}     -> bind_user_exchange(Channel, MeId, rk_user_feed(ToId));
+                {user,remove}  -> unbind_user_exchange(Channel, MeId, rk_user_feed(ToId)) end,
+            mqs_channel:close(Channel);
         {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.
         {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.
 
 
 init_mq(User=#user{}) ->
 init_mq(User=#user{}) ->
@@ -118,235 +107,42 @@ init_mq(User=#user{}) ->
             Relations = build_user_relations(User, Groups),
             Relations = build_user_relations(User, Groups),
             [bind_user_exchange(Channel, User, RK) || RK <- Relations],
             [bind_user_exchange(Channel, User, RK) || RK <- Relations],
             mqs_channel:close(Channel);
             mqs_channel:close(Channel);
-        {error,Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end;
-
-init_mq(Group=#group{}) ->
-    GroupExchange = ?GROUP_EXCHANGE(Group#group.username),
-    ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
-    case mqs:open([]) of
-        {ok, Channel} ->
-            mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
-            Relations = build_group_relations(Group),
-            [bind_group_exchange(Channel, Group, RK) || RK <- Relations],
-            mqs_channel:close(Channel);
-        {error, Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
+        {error,Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
 
 
 build_user_relations(User, Groups) -> [
 build_user_relations(User, Groups) -> [
-    rk( [db, user, User, put] ),
-    rk( [subscription, user, User, add_to_group]),
-    rk( [subscription, user, User, remove_from_group]),
-    rk( [subscription, user, User, leave_group]),
-    rk( [login, user, User, update_after_login]),
-    rk( [likes, user, User, add_like]),
-    rk( [feed, delete, User]),
-    rk( [feed, user, User, '*', '*', '*']),
-    rk( [feed, user, User, count_entry_in_statistics] ),
-    rk( [feed, user, User, count_comment_in_statistics] ),
-    rk( [feed, user, User, post_note] ),
-    rk( [subscription, user, User, subscribe_user]),
-    rk( [subscription, user, User, remove_subscribe]),
-    rk( [subscription, user, User, set_user_game_status]),
-    rk( [subscription, user, User, update_user]),
-    rk( [subscription, user, User, block_user]),
-    rk( [subscription, user, User, unblock_user]),
-    rk( [payment, user, User, set_purchase_external_id]),
-    rk( [payment, user, User, set_purchase_state]),
-    rk( [payment, user, User, set_purchase_info]),
-    rk( [payment, user, User, add]),
-    rk( [transaction, user, User, add]),
-    rk( [invite, user, User, add]),
-    rk( [meeting, user, User, create]),
-    rk( [meeting, user, User, join]),
-    rk( [purchase, user, User, buy_gift]),
-    rk( [purchase, user, User, give_gift]),
-    rk( [purchase, user, User, mark_gift_as_deliving]),
-    rk( [feed, system, '*', '*']) |
-    [rk_group_feed(G) || G <- Groups]
-    ].
-
-build_group_relations(Group) -> [
-    rk( [db, group, Group, put] ),
-    rk( [db, group, Group, update_group] ),
-    rk( [db, group, Group, remove_group] ),
-    rk( [likes, group, Group, add_like]),   % for comet mostly
-    rk( [feed, delete, Group] ),
-    rk( [feed, group, Group, '*', '*', '*'] )
+    msq:key( [kvs_user, '*', User]),
+    msq:key( [kvs_feed, user, User, '*', '*', '*']),
+    msq:key( [kvs_feed, user, User, '*'] ),
+    msq:key( [kvs_payment, user, User, '*']),
+    msq:key( [kvs_payment, user, User, '*']),
+    msq:key( [kvs_account, user, User, '*']),
+    msq:key( [kvs_meeting, user, User, '*']),
+    msq:key( [kvs_purchase, user, User, '*']) |
+  [ mqs:key( [kvs_feed, group, G, '*', '*', '*']) || G <- Groups ]
     ].
     ].
 
 
-rk_user_feed(User) -> rk([feed, user, User, '*', '*', '*']).
-rk_group_feed(Group) -> rk([feed, group, Group, '*', '*', '*']).
-
-bind_user_exchange(Channel, User, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
-unbind_user_exchange(Channel, User, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, RoutingKey)}.
-bind_group_exchange(Channel, Group, RoutingKey) -> {bind, RoutingKey, mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
-unbind_group_exchange(Channel, Group, RoutingKey) -> {unbind, RoutingKey, mqs_channel:unbind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, RoutingKey)}.
+rk_user_feed(User) -> mqs:key([feed, user, User, '*', '*', '*']).
 
 
-rk(List) -> mqs_lib:list_to_key(List).
+bind_user_exchange(Channel, User, Route) -> {bind, Route, mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, Route)}.
+unbind_user_exchange(Channel, User, Route) -> {unbind, Route, mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, Route)}.
 
 
 retrieve_connections(Id,Type) ->
 retrieve_connections(Id,Type) ->
     Friends = case Type of 
     Friends = case Type of 
-                  user -> kvs_users:list_subscr_usernames(Id);
-                     _ -> kvs_group:list_group_members(Id) end,
+        user -> kvs_users:list_subscr_usernames(Id);
+        _ -> kvs_group:list_group_members(Id) end,
     case Friends of
     case Friends of
-	[] -> [];
-	Full -> Sub = lists:sublist(Full, 10),
-                case Sub of
-                     [] -> [];
-                      _ -> Data = [begin case kvs:get(user,Who) of
-                                       {ok,User} -> RealName = kvs_users:user_realname_user(User),
-                                                    Paid = kvs_account:user_paid(Who),
-                                                    {Who,Paid,RealName};
-				               _ -> undefined end end || Who <- Sub],
-			   [X||X<-Data, X/=undefined] end end.
-
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
-handle_notice(["system", "create_group"] = Route, 
-    Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO("queue_action(~p): create_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
-    {UId, GId, Name, Desc, Publicity} = Message,
-    FId = kvs:feed_create(),
-    CTime = erlang:now(),
-
-    Group =#group{username = GId,
-                              name = Name,
-                              description = Desc,
-                              publicity = Publicity,
-                              creator = UId,
-                              created = CTime,
-                              owner = UId,
-                              feed = FId},
-    kvs:put(Group),
-    mqs:notify([group, init], {GId, FId}),
-    kvs_users:init_mq(Group),
-
-    {noreply, State};
-
-handle_notice(["db", "group", GId, "remove_group"] = Route, 
-    Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO("queue_action(~p): remove_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
-    {_, Group} = kvs_group:get_group(GId),
-    case Group of 
-        notfound -> ok;
-        _ ->
-            mqs:notify([feed, delete, GId], empty),
-            kvs:delete_by_index(group_subs, <<"group_subs_group_id_bin">>, GId),         
-            kvs:delete(feed, Group#group.feed),
-            kvs:delete(group, GId),
-            % unbind exchange
-            {ok, Channel} = mqs:open([]),
-            Routes = kvs_users:rk_group_feed(GId),
-            kvs_users:unbind_group_exchange(Channel, GId, Routes),
-            mqs_channel:close(Channel)
-    end,
-    {noreply, State};
-
-handle_notice(["subscription", "user", UId, "add_to_group"] = Route,
-    Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO("queue_action(~p): add_to_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
-    {GId, Who, UType} = Message,
-
-    case kvs:get(group_subs, {UId, GId}) of
-        {error, notfound} ->
-            {R, Group} = kvs:get(group, GId),
-            case R of 
-                error -> ?INFO("Add to group failed reading group");
+        [] -> [];
+        Full -> Sub = lists:sublist(Full, 10),
+            case Sub of
+                [] -> [];
                 _ ->
                 _ ->
-                    GU = Group#group.users_count,
-                    kvs:put(Group#group{users_count = GU+1})
-            end;
-        _ ->
-            ok
-    end,
-
-    OK = kvs:put({group_subs,UId,GId,Type,0}),
-
-%    add_to_group(Who, GId, UType),
-    ?INFO("add ~p to group ~p with Type ~p by ~p", [Who, GId,UType,UId]),
-    kvs_users:subscribemq(group, add, Who, GId),
-    {noreply, State};
-
-handle_notice(["subscription", "user", UId, "remove_from_group"] = Route,
-    Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO("queue_action(~p): remove_from_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),    
-    {GId} = Message,
-    ?INFO("remove ~p from group ~p", [UId, GId]),
-    kvs_users:remove_subscription_mq(group, UId, GId),
-
-    kvs:delete(group_subs, {UId, GId}),
-    {R, Group} = kvs:get(group, GId),
-    case R of
-        error -> ?INFO("Remove ~p from group failed reading group ~p", [UId, GId]);
-        _ ->
-            GU = Group#group.users_count,
-            kvs:put(Group#group{users_count = GU-1})
-    end,
-
-    {noreply, State};
-
-handle_notice(["subscription", "user", UId, "leave_group"] = Route,
-    Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO(" queue_action(~p): leave_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
-    {GId} = Message,
-    {R, Group} = kvs:get(group, GId),
-    case R of 
-        error -> ?ERROR(" Error reading group ~p for leave_group", [GId]);
-        ok ->
-            case Group#group.owner of
-                UId -> % User is owner, transfer ownership to someone else
-                    Members = kvs_group:list_group_members(GId),
-                    case Members of
-                        [ FirstOne | _ ] ->
-                            ok = kvs:put(Group#group{owner = FirstOne}),
-                            mqs:notify(["subscription", "user", UId, "remove_from_group"], {GId});
-                        [] ->
-                            % Nobody left in group, remove group at all
-                            mqs:notify([db, group, GId, remove_group], [])
-                    end;
-                _ -> % Plain user removes -- just remove it
-                    mqs:notify(["subscription", "user", UId, "remove_from_group"], {GId})
-            end;
-        _ -> % user is just someone, remove it
-            mqs:notify(["subscription", "user", UId, "remove_from_group"], {GId})
-    end,
-    {noreply, State};
-
-handle_notice(["subscription", "user", UId, "subscribe"] = Route,
-    Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO(" queue_action(~p): subscribe_user: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
-    {Whom} = Message,
-    kvs_users:subscribe(UId, Whom),
-    {noreply, State};
-
-handle_notice(["subscription", "user", UId, "unsubscribe"] = Route,
-    Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO(" queue_action(~p): remove_subscribe: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
-    {Whom} = Message,
-    kvs_users:unsubscribe(UId, Whom),
-    {noreply, State};
-
-handle_notice(["subscription", "user", _UId, "update"] = Route,
-    Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO(" queue_action(~p): update_user: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
-    {NewUser} = Message,
-    kvs_users:update_user(NewUser),
-    {noreply, State};
-
-handle_notice(["login", "user", UId, "update_after_login"] = Route,
-    Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO("queue_action(~p): update_after_login: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),    
-    Update = case kvs_users:user_status(UId) of
-        {error, status_info_not_found} -> #user_status{username = UId, last_login = erlang:now()};
-        {ok, UserStatus} -> UserStatus#user_status{last_login = erlang:now()} end,
-    kvs:put(Update),
-    {noreply, State};
-
-handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown USERS notice").
-
-
-%%%%%%%%%%%%%%%%%%%%%%
-
-% user backlinks
+                    Data = [begin
+                        case kvs:get(user,Who) of
+                            {ok,User} -> RealName = kvs_users:user_realname_user(User),
+                            Paid = kvs_account:user_paid(Who),
+                            {Who,Paid,RealName};
+                        _ -> undefined end end || Who <- Sub],
+                    [ X || X <- Data, X/=undefined ] end end.
 
 
 user_by_verification_code(Code) ->
 user_by_verification_code(Code) ->
     case kvs:get(code,Code) of
     case kvs:get(code,Code) of
@@ -367,3 +163,25 @@ user_by_username(Name) ->
     case X = kvs:get(user,Name) of
     case X = kvs:get(user,Name) of
         {ok,_Res} -> X;
         {ok,_Res} -> X;
         Else -> Else end.
         Else -> Else end.
+
+handle_notice(["kvs_user", "subscribe", Who] = Route,
+    Message, #state{owner = Owner, type =Type} = State) ->
+    {Whom} = Message,
+    kvs_users:subscribe(Who, Whom),
+    subscription_mq(user, add, Who, Whom),
+    {noreply, State};
+
+handle_notice(["kvs_user", "unsubscribe", Who] = Route,
+    Message, #state{owner = Owner, type =Type} = State) ->
+    {Whom} = Message,
+    kvs_users:unsubscribe(Who, Whom),
+    subscription_mq(user, remove, Who, Whom),
+    {noreply, State};
+
+handle_notice(["kvs_user", "update", Who] = Route,
+    Message, #state{owner = Owner, type =Type} = State) ->
+    {NewUser} = Message,
+    kvs:put(NewUser),
+    {noreply, State};
+
+handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown USERS notice").