Browse Source

kvs_feed refactored

Maxim Sokhatsky 12 years ago
parent
commit
cd12479c82
3 changed files with 170 additions and 269 deletions
  1. 9 5
      README.md
  2. 1 10
      src/kvs_comment.erl
  3. 160 254
      src/kvs_feed.erl

+ 9 - 5
README.md

@@ -11,23 +11,27 @@ and provides high-level rich API to stored and extend following data:
 * Users
 * Groups
 * Subscriptions
-* Feeds
-* Accounts
+* Feeds and Comments
 * Meetings
-* Payments
-* Purchases
+* Accounts and Payments
+* Products and Purchases
 
 This Framework provides also a Plugin for Feed Server for sequential consistency.
 All write requests with given object keys will be handled by single processes
 in Feed Server so you may not worry about concurrent changes of user feeds.
 
+All write operations that are made to data with secondary indexes,
+i.e. not like linked lists could be potentially handled without feed_server.
+But some KV storages are not supporting secondary indexes add those backends carefully.
+
 Store Backends
 --------------
 
 Currently kvs includes following store backends:
 
 * Mnesia
-* Riak, supports secondary indexes via LevelDB
+* Riak
+* CouchDB
 
 Credits
 -------

+ 1 - 10
src/kvs_comment.erl

@@ -2,15 +2,6 @@
 -include("feeds.hrl").
 -compile(export_all).
 
-add(EId, User, Value) ->  add(EId, User, Value, []).
-add(EId, User, Value, Medias) ->
-    CId = kvs:next_id("comment"),
-    R = #comment{id = {CId, EId}, comment_id = CId, entry_id = EId,
-                 content = Value, author_id = User, media = Medias, create_time = now() },
-    case kvs:put(R) of
-        ok -> {ok, R};
-        A -> A end.
-
 select_by_entry_id(EntryId) ->
     kvs:comments_by_entry(EntryId).
 
@@ -37,7 +28,7 @@ comments_by_entry({EId, FId}) ->
     end.
 
 
-feed_add_comment(FId, User, EntryId, ParentComment, CommentId, Content, Medias) ->
+add(FId, User, EntryId, ParentComment, CommentId, Content, Medias) ->
     FullId = {CommentId, {EntryId, FId}},
 
     Prev = case ParentComment of

+ 160 - 254
src/kvs_feed.erl

@@ -1,4 +1,5 @@
 -module(kvs_feed).
+-copyright('Synrc Research Center, s.r.o.').
 -compile(export_all).
 -include_lib("kvs/include/feeds.hrl").
 -include_lib("kvs/include/users.hrl").
@@ -11,6 +12,73 @@ create() ->
     ok = kvs:put(#feed{id = FId} ),
     FId.
 
+add_entry(FId, User, To, EntryId,Desc,Medias,Type,SharedBy) ->
+    case kvs:get(entry,{EntryId, FId}) of
+        {ok, _} -> ok;
+        _ -> add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy, dont_check) end.
+
+add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy, _) ->
+    {ok,Feed} = kvs:get(feed, erlang:integer_to_list(FId)),
+
+    Id = {EntryId, FId},
+    Next = undefined,
+    Prev = case Feed#feed.top of
+               undefined -> undefined;
+               X -> case kvs:get(entry, X) of
+                       {ok, TopEntry} -> EditedEntry = TopEntry#entry{next = Id}, kvs:put(EditedEntry), TopEntry#entry.id;
+                       {error,notfound} -> undefined end end,
+
+    kvs:put(#feed{id = FId, top = {EntryId, FId}}), % update feed top with current
+
+    Entry  = #entry{id = {EntryId, FId},
+                    entry_id = EntryId,
+                    feed_id = FId,
+                    from = User,
+                    to = To,
+                    type = Type,
+                    media = Medias,
+                    created_time = now(),
+                    description = Desc,
+                    raw_description = Desc,
+                    shared = SharedBy,
+                    next = Next,
+                    prev = Prev},
+
+    ModEntry = case catch feedformat:format(Entry) of
+                   {_, Reason} -> ?ERROR("feedformat error: ~p", [Reason]), Entry;
+                   #entry{} = ME -> ME end,
+
+    kvs:put(ModEntry),
+
+    {ok, ModEntry}.
+
+entry_traversal(undefined, _) -> [];
+entry_traversal(_, 0) -> [];
+entry_traversal(Next, Count)->
+    case kvs:get(entry, Next) of
+        {error,notfound} -> [];
+        {ok, R} ->
+            Prev = element(#entry.prev, R),
+            Count1 = case Count of 
+                C when is_integer(C) -> case R#entry.type of
+                    {_, system} -> C;   % temporal entries are entries too, but they shouldn't be counted
+                    {_, system_note} -> C;
+                    _ -> C - 1
+                end;
+                _-> Count 
+            end,
+            [R | entry_traversal(Prev, Count1)]
+    end.
+
+entries(FeedId, undefined, PageAmount) ->
+    case kvs:get(feed, FeedId) of
+        {ok, O} -> entry_traversal(O#feed.top, PageAmount);
+        {error, notfound} -> [] end;
+entries(FeedId, StartFrom, PageAmount) ->
+    case kvs:get(entry,{StartFrom, FeedId}) of
+        {ok, #entry{prev = Prev}} -> entry_traversal(Prev, PageAmount);
+        _ -> [] end.
+
 add_like(Fid, Eid, Uid) ->
     Write_one_like = fun(Next) ->
         Self_id = kvs:next_id("one_like", 1),   
@@ -53,28 +121,16 @@ add_like(Fid, Eid, Uid) ->
             })
     end.
 
-% statistics
-
-get_entries_count(Uid) ->
+entries_count(Uid) ->
     case kvs:get(user_etries_count, Uid) of
         {ok, UEC} -> UEC#user_etries_count.entries;
         {error, notfound} -> 0 end.
 
-get_comments_count(Uid) ->
+comments_count(Uid) ->
     case kvs:get(user_etries_count, Uid) of
         {ok, UEC} -> UEC#user_etries_count.comments;
         {error, notfound} -> 0 end.
 
-create_message(Table) ->
-    EId = kvs:next_id("entry", 1),
-    #entry{id = {EId, system_info},
-        entry_id = EId,
-        from = system,
-        type = {system, new_table},
-        created_time = now(),
-        description = Table}.
-
-
 remove_entry(FeedId, EId) ->
     {ok, #feed{top = TopId} = Feed} = kvs:get(feed,FeedId),
     case kvs:get(entry, {EId, FeedId}) of
@@ -90,13 +146,10 @@ remove_entry(FeedId, EId) ->
 edit_entry(FeedId, EId, NewDescription) ->
     case kvs:get(entry,{EId, FeedId}) of
         {ok, OldEntry} ->
-            NewEntryRaw =  OldEntry#entry{description = NewDescription,
-                                          raw_description = NewDescription},
+            NewEntryRaw =  OldEntry#entry{description = NewDescription, raw_description = NewDescription},
             NewEntry = feedformat:format(NewEntryRaw),
             kvs:put(NewEntry);
-        {error, notfound}->
-            {error, notfound}
-    end.
+        {error, notfound}-> {error, notfound} end.
 
 remove_entry_comments(FId, EId) ->
     AllComments = kvs:comments_by_entry(FId, EId),
@@ -104,82 +157,74 @@ remove_entry_comments(FId, EId) ->
 
 entry_add_comment(FId, User, EntryId, ParentComment, CommentId, Content, Medias) ->
      case kvs:get(entry,{EntryId, FId}) of
-         {ok, _E} -> kvs:add_comment(FId, User, EntryId, ParentComment, CommentId, Content, Medias);
+         {ok, _E} -> kvs_comment:add(FId, User, EntryId, ParentComment, CommentId, Content, Medias);
          _ -> ok end.
 
-get_one_like_list(undefined) -> [];
-get_one_like_list(Id) -> {ok, OneLike} = kvs:get(one_like, Id),
-    [OneLike] ++ get_one_like_list(OneLike#one_like.next).
+like_list(undefined) -> [];
+like_list(Id) -> {ok, OneLike} = kvs:get(one_like, Id), [OneLike] ++ like_list(OneLike#one_like.next).
+like_list(undefined, _) -> [];
+like_list(_, 0) -> [];
+like_list(Id, N) -> {ok, OneLike} = kvs:get(one_like, Id), [OneLike] ++ like_list(OneLike#one_like.next, N-1).
 
-get_entries_likes(Entry_id) ->
+entry_likes(Entry_id) ->
     case kvs:get(entry_likes, Entry_id) of
         {ok, Likes} -> get_one_like_list(Likes#entry_likes.one_like_head);
         {error, notfound} -> [] end.
 
-get_entries_likes_count(Entry_id) ->
+entry_likes_count(Entry_id) ->
     case kvs:get(entry_likes, Entry_id) of
-        {ok, Likes} ->
-            Likes#entry_likes.total_count;
-        {error, notfound} -> 0
-    end.
+        {ok, Likes} -> Likes#entry_likes.total_count;
+        {error, notfound} -> 0 end.
 
-get_user_likes_count(UserId) ->
+user_likes_count(UserId) ->
     case kvs:get(user_likes, UserId) of
         {ok, Likes} -> Likes#user_likes.total_count;
-        {error, notfound} -> 0
-    end.
+        {error, notfound} -> 0 end.
 
-get_user_likes(UserId) ->
+user_likes(UserId) ->
     case kvs:get(user_likes, UserId) of
         {ok, Likes} -> get_one_like_list(Likes#user_likes.one_like_head);
-        {error, notfound} -> []
-    end.
+        {error, notfound} -> [] end.
 
-get_one_like_list(undefined, _) -> [];
-get_one_like_list(_, 0) -> [];
-get_one_like_list(Id, N) -> {ok, OneLike} = kvs:get(one_like, Id),
-    [OneLike] ++ get_one_like_list(OneLike#one_like.next, N-1).
 
-get_user_likes(UserId, {Page, PageAmount}) ->
+user_likes(UserId, {Page, PageAmount}) ->
     case kvs:get(user_likes, UserId) of
         {ok, Likes} -> lists:nthtail((Page-1)*PageAmount, get_one_like_list(Likes#user_likes.one_like_head, PageAmount*Page));
-        {error, notfound} -> []
-    end.
+        {error, notfound} -> [] end.
 
-get_comments_entries(UserUid, _, _Page, _PageAmount) ->
+comments_entries(UserUid, _, Page, PageAmount) ->
     Pids = [Eid || #comment{entry_id=Eid} <- kvs:select(comment,
         fun(#comment{author_id=Who}) when Who=:=UserUid ->true;(_)->false end)],
-    %?PRINT({"GCE pids length: ", length(Pids)}),
     lists:flatten([kvs:select(entry,[{where, fun(#entry{entry_id=ID})-> ID=:=Pid end},
         {order, {1, descending}},{limit, {1,1}}]) || Pid <- Pids]).
 
-get_my_discussions(_FId, Page, PageAmount, UserUid) ->
-    _Offset= case (Page-1)*PageAmount of
-        0 -> 1
-        ;M-> M
-    end,
+get_my_discussions(FId, Page, PageAmount, UserUid) ->
+    Offset = case (Page-1)*PageAmount of 0 -> 1; M-> M  end,
     Pids = [Eid || #comment{entry_id=Eid} <- kvs:select(comment,
         fun(#comment{author_id=Who}) when Who=:=UserUid ->true;(_)->false end)],
     lists:flatten([kvs:select(entry,[{where, fun(#entry{entry_id=ID})-> ID=:=Pid end},
         {order, {1, descending}},{limit, {1,1}}]) || Pid <- Pids]).
 
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+purge_feed(FeedId) ->
+    {ok,Feed} = kvs:get(feed,FeedId),
+    Removal = entry_traversal(Feed#feed.top, -1),
+    [kvs:delete(entry,Id)||#entry{id=Id}<-Removal],
+    kvs:put(Feed#feed{top=undefined}).
+
+purge_unverified_feeds() ->
+    [purge_feed(FeedId) || #user{feed=FeedId,status=S,email=E} <- kvs:all(user),E==undefined].
 
-handle_notice(["feed", "delete", Owner] = Route, Message,
-              #state{owner = Owner} = State) ->
-    ?INFO("feed(~p): notification received: User=~p, Route=~p, Message=~p",
-          [self(), Owner, Route, Message]),
+%% MQ API
+
+handle_notice(["kvs_feed", "delete", Owner] = Route,
+    Message, #state{owner = Owner} = State) ->
+    ?INFO("feed(~p): notification received: User=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
     {stop, normal, State};
 
-handle_notice(["feed", "group", GroupId, "entry", EntryId, "add"] = Route,
-              [From|_] = Message,
-              #state{owner = Owner, feed = Feed} = State) ->
-    ?INFO("feed(~p): group message: Owner=~p, Route=~p, Message=~p",
-          [self(), Owner, Route, Message]),
+handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add"] = Route, [From|_] = Message, #state{owner = Owner, feed = Feed} = State) ->
+    ?INFO("feed(~p): group message: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
     [From, _Destinations, Desc, Medias] = Message,
-    kvs_feed:add_group_entry(Feed, From, [{GroupId, group}], EntryId,
-                         Desc, Medias, {group, direct}),
-    % statistics
+    kvs_feed:add_entry(Feed, From, [{GroupId, group}], EntryId, Desc, Medias, {group, direct}, ""),
     case Owner == GroupId of
         false -> ok;
         true ->
@@ -189,87 +234,72 @@ handle_notice(["feed", "group", GroupId, "entry", EntryId, "add"] = Route,
             {ok, Subs} = kvs:get(group_subs, {From, GroupId}),
             SE = Subs#group_subscription.user_posts_count,
             kvs:put(Subs#group_subscription{user_posts_count = SE+1})
-    end,    
-
+    end,
     self() ! {feed_refresh,Feed,20},
     {noreply, State};
 
-handle_notice(["feed", "user", FeedOwner, "entry", EntryId, "add"] = Route,
-              [From|_] = Message,
-              #state{owner = WorkerOwner, feed = Feed, direct = Direct} = State) ->
-    ?INFO("feed(~p): message: Owner=~p, Route=~p, Message=~p",
-          [self(), WorkerOwner, Route, Message]),
+handle_notice(["kvs_feed", "user", FeedOwner, "entry", EntryId, "add"] = Route,
+    [From|_] = Message, #state{owner = WorkerOwner, feed = Feed, direct = Direct} = State) ->
+    ?INFO("feed(~p): message: Owner=~p, Route=~p, Message=~p", [self(), WorkerOwner, Route, Message]),
+
     [From, Destinations, Desc, Medias] = Message,
 
     if
         %% user added message to own feed
         FeedOwner == From andalso FeedOwner == WorkerOwner->
             FilteredDst = [D || {_, group} = D <- Destinations],
-            kvs_feed:add_entry(Feed, From, FilteredDst, EntryId, Desc, Medias,
-                           {user, normal}), self() ! {feed_refresh,Feed,20};
+            kvs_feed:add_entry(Feed, From, FilteredDst, EntryId, Desc, Medias, {user, normal},""), self() ! {feed_refresh,Feed,20};
 
         %% friend added message to public feed
-        FeedOwner == From ->
-            kvs_feed:add_entry(Feed, From, [], EntryId, Desc, Medias,
-                           {user, normal}), self() ! {feed_refresh,Feed,20};
+        FeedOwner == From -> 
+            kvs_feed:add_entry(Feed, From, [], EntryId, Desc, Medias, {user, normal},""), self() ! {feed_refresh,Feed,20};
 
         %% direct message to worker owner
-        FeedOwner == WorkerOwner ->
-            kvs_feed:add_direct_message(Direct, From, [{FeedOwner, user}],
-                                    EntryId, Desc, Medias), self() ! {direct_refresh,Direct,20};
+        FeedOwner == WorkerOwner -> 
+            kvs_feed:add_entry(Direct, From, [{FeedOwner, user}], EntryId, Desc, Medias, {user,direct}, ""), self() ! {direct_refresh,Direct,20};
 
         %% user sent direct message to friend, add copy to his direct feed
         From == WorkerOwner ->
-            kvs_feed:add_direct_message(Direct, WorkerOwner, Destinations,
-                                    EntryId, Desc, Medias), self() ! {direct_refresh,Direct,20};
-        true ->
-            ?INFO("not matched case in entry->add")
+            kvs_feed:add_entry(Direct, WorkerOwner, Destinations, EntryId, Desc, Medias, {user, direct}, ""), self() ! {direct_refresh,Direct,20};
+
+        true -> ?INFO("not matched case in entry->add")
     end,
-    
+
     {noreply, State};
 
-% add/delete system message
-handle_notice(["feed", "user", _FeedOwner, "entry", EntryId, "add_system"] = Route, 
-              [From|_] = Message,
-              #state{owner = WorkerOwner, feed = Feed, direct = _Direct} = State) ->
-    ?INFO("feed(~p): system message: Owner=~p, Route=~p, Message=~p",
-          [self(), WorkerOwner, Route, Message]),
+handle_notice(["kvs_feed", "user", _FeedOwner, "entry", EntryId, "add_system"] = Route,
+    [From|_] = Message, #state{owner = WorkerOwner, feed = Feed, direct = _Direct} = State) ->
+    ?INFO("feed(~p): system message: Owner=~p, Route=~p, Message=~p", [self(), WorkerOwner, Route, Message]),
     [From, _Destinations, Desc, Medias] = Message,
-
-    kvs_feed:add_entry(Feed, From, [], EntryId, Desc, Medias, {user, system}),
+    kvs_feed:add_entry(Feed, From, [], EntryId, Desc, Medias, {user, system}, ""),
     {noreply, State};
 
-handle_notice(["feed", "group", GroupId, "entry", EntryId, "add_system"] = Route,
-              [From|_] = Message,
-              #state{owner = Owner, feed = Feed} = State) ->
+handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add_system"] = Route,
+              [From|_] = Message, #state{owner = Owner, feed = Feed} = State) ->
     ?INFO("feed(~p): group system message: Owner=~p, Route=~p, Message=~p",
           [self(), Owner, Route, Message]),
     [From, _Destinations, Desc, Medias] = Message,
-    kvs_feed:add_group_entry(Feed, From, [{GroupId, group}], EntryId,
-                         Desc, Medias, {group, system}),
+    kvs_feed:add_entry(Feed, From, [{GroupId, group}], EntryId, Desc, Medias, {group, system}, ""),
     {noreply, State};
 
-handle_notice(["feed", "user", UId, "post_note"] = Route, Message, 
-        #state{owner = Owner, feed = Feed} = State) ->
-    ?INFO("feed(~p): post_note: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
+handle_notice(["feed", "user", UId, "post_note"] = Route,
+    Message, #state{owner = Owner, feed = Feed} = State) ->
+     ?INFO("feed(~p): post_note: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
     Note = Message,
     Id = utils:uuid_ex(),
-    kvs_feed:add_entry(Feed, UId, [], Id, Note, [], {user, system_note}),
+    kvs_feed:add_entry(Feed, UId, [], Id, Note, [], {user, system_note}, ""),
     {noreply, State};
 
-handle_notice(["feed", _, WhoShares, "entry", NewEntryId, "share"],
-              #entry{entry_id = _EntryId, raw_description = Desc, media = Medias,
-                     to = Destinations, from = From} = E,
-              #state{feed = Feed, type = user} = State) ->
+handle_notice(["kvs_feed", _, WhoShares, "entry", NewEntryId, "share"],
+                #entry{entry_id = _EntryId, raw_description = Desc, media = Medias, to = Destinations,
+                from = From} = E, #state{feed = Feed, type = user} = State) ->
     %% FIXME: sharing is like posting to the wall
     ?INFO("share: ~p, WhoShares: ~p", [E, WhoShares]),
-%    NewEntryId = utils:uuid_ex(),
-    kvs_feed:add_shared_entry(Feed, From, Destinations, NewEntryId, Desc, Medias, {user, normal}, WhoShares),
+    kvs_feed:add_entry(Feed, From, Destinations, NewEntryId, Desc, Medias, {user, normal}, WhoShares),
     {noreply, State};
 
-handle_notice(["feed", "group", _Group, "entry", EntryId, "delete"] = Route,
-              Message,
-              #state{owner = Owner, feed = Feed} = State) ->
+handle_notice(["kvs_feed", "group", _Group, "entry", EntryId, "delete"] = Route,
+              Message, #state{owner = Owner, feed = Feed} = State) ->
     ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p",
           [self(), Owner, Route, Message]),
     %% all group subscribers shold delete entry from their feeds
@@ -277,188 +307,64 @@ handle_notice(["feed", "group", _Group, "entry", EntryId, "delete"] = Route,
     self() ! {feed_refresh,Feed,20},
     {noreply, State};
 
-handle_notice(["feed", _Type, EntryOwner, "entry", EntryId, "delete"] = Route,
-              Message,
-              #state{owner = Owner, feed=Feed, direct=Direct} = State) ->
+handle_notice(["kvs_feed", _Type, EntryOwner, "entry", EntryId, "delete"] = Route,
+              Message, #state{owner = Owner, feed=Feed, direct=Direct} = State) ->
     case {EntryOwner, Message} of
         %% owner of the antry has deleted entry, we will delete it too
         {_, [EntryOwner|_]} ->
-            ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p",
-                  [self(), Owner, Route, Message]),
+            ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
             kvs_feed:remove_entry(Feed, EntryId),
             kvs_feed:remove_entry(Direct, EntryId);
         %% we are owner of the entry - delete it
         {Owner, _} ->
-            ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p",
-                  [self(), Owner, Route, Message]),
+            ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
             kvs_feed:remove_entry(Feed, EntryId),
             kvs_feed:remove_entry(Direct, EntryId);
         %% one of the friends has deleted some entry from his feed. Ignore
-        _ ->
-            ok
-    end,
+        _ -> ok end,
     self() ! {feed_refresh, State#state.feed,20},
     {noreply, State};
 
-handle_notice(["feed", _Type, _EntryOwner, "entry", EntryId, "edit"] = Route,
-              Message,
-              #state{owner = Owner, feed=Feed} = State) ->
+handle_notice(["kvs_feed", _Type, _EntryOwner, "entry", EntryId, "edit"] = Route,
+              Message, #state{owner = Owner, feed=Feed} = State) ->
     [NewDescription|_] = Message,
-    ?INFO("feed(~p): edit: Owner=~p, Route=~p, Message=~p",
-          [self(), Owner, Route, Message]),
-
-    %% edit entry in all feeds
+    ?INFO("feed(~p): edit: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
     kvs_feed:edit_entry(Feed, EntryId, NewDescription),
-
     {noreply, State};
 
-handle_notice(["feed", _Type, _EntryOwner, "comment", CommentId, "add"] = Route,
-              Message,
-              #state{owner = Owner, feed=Feed} = State) ->
+handle_notice(["kvs_feed", _Type, _EntryOwner, "comment", CommentId, "add"] = Route,
+              Message, #state{owner = Owner, feed=Feed} = State) ->
     [From, EntryId, ParentComment, Content, Medias] = Message,
-
-    ?INFO("feed(~p): add comment: Owner=~p, Route=~p, Message=~p",
-          [self(), Owner, Route, Message]),
-    kvs_feed:entry_add_comment(Feed, From, EntryId, ParentComment, CommentId, Content, Medias),
+    ?INFO("feed(~p): add comment: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
+    kvs_comment:add(Feed, From, EntryId, ParentComment, CommentId, Content, Medias),
     {noreply, State};
 
-handle_notice(["feed", "user", UId, "count_entry_in_statistics"] = Route, 
+handle_notice(["kvs_feed", "user", UId, "count_entry_in_statistics"] = Route, 
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): count_entry_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     case kvs:get(user_etries_count, UId) of
         {ok, UEC} -> 
-            kvs:put(UEC#user_etries_count{
-                entries = UEC#user_etries_count.entries+1
-            }),
+            kvs:put(UEC#user_etries_count{entries = UEC#user_etries_count.entries+1 }),
             kvs_users:attempt_active_user_top(UId, UEC#user_etries_count.entries+1);
         {error, notfound} ->
-            kvs:put(#user_etries_count{
-                user_id = UId,
-                entries = 1
-            }),
-            kvs_users:attempt_active_user_top(UId, 1)
-    end,
+            kvs:put(#user_etries_count{user_id = UId, entries = 1 }),
+            kvs_users:attempt_active_user_top(UId, 1) end,
     {noreply, State};
 
-handle_notice(["feed", "user", UId, "count_comment_in_statistics"] = Route, 
+handle_notice(["kvs_feed", "user", UId, "count_comment_in_statistics"] = Route, 
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): count_comment_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     case kvs:get(user_etries_count, UId) of
-        {ok, UEC} -> 
-            kvs:put(UEC#user_etries_count{
-                comments = UEC#user_etries_count.comments+1
-            });
-        {error, notfound} ->
-            kvs:put(#user_etries_count{
-                user_id = UId,
-                comments = 1
-            })
-    end,
+        {ok, UEC} -> kvs:put(UEC#user_etries_count{comments = UEC#user_etries_count.comments+1 });
+        {error, notfound} -> kvs:put(#user_etries_count{ user_id = UId, comments = 1 }) end,
     {noreply, State};
 
-handle_notice(["likes", _, _, "add_like"] = Route,  % _, _ is here beacause of the same message used for comet update
+handle_notice(["kvs_feed","likes", _, _, "add_like"] = Route,  % _, _ is here beacause of the same message used for comet update
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): add_like: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {UId, E} = Message,
     {EId, FId} = E#entry.id,
-    feed:add_like(FId, EId, UId),
+    kvs_feed:add_like(FId, EId, UId),
     {noreply, State};
 
-handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown FEEDS notice").
-
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
-feed_add_direct_message(FId,User,To,EntryId,Desc,Medias) -> feed_add_entry(FId,User,To,EntryId,Desc,Medias,{user,direct},"").
-feed_add_entry(FId,From,EntryId,Desc,Medias) -> feed_add_entry(FId,From,undefined,EntryId,Desc,Medias,{user,normal},"").
-feed_add_entry(FId, User, To, EntryId,Desc,Medias,Type,SharedBy) ->
-    %% prevent adding of duplicate records to feed
-    case kvs:get(entry,{EntryId, FId}) of
-        {ok, _} -> ok;
-        _ -> do_feed_add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy)
-    end.
-
-do_feed_add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy) ->
-    {ok,Feed} = kvs:get(feed,erlang:integer_to_list(FId)),
-    Id = {EntryId, FId},
-    Next = undefined,
-    Prev = case Feed#feed.top of
-               undefined ->
-                   undefined;
-               X ->
-                   case kvs:get(entry, X) of
-                       {ok, TopEntry} ->
-                           EditedEntry = TopEntry#entry{next = Id},
-                           % update prev entry
-                           kvs:put(EditedEntry),
-                           TopEntry#entry.id;
-                       {error,notfound} ->
-                           undefined
-                   end
-           end,
-
-    kvs:put(#feed{id = FId, top = {EntryId, FId}}), % update feed top with current
-
-    Entry  = #entry{id = {EntryId, FId},
-                    entry_id = EntryId,
-                    feed_id = FId,
-                    from = User,
-                    to = To,
-                    type = Type,
-                    media = Medias,
-                    created_time = now(),
-                    description = Desc,
-                    raw_description = Desc,
-                    shared = SharedBy,
-                    next = Next,
-                    prev = Prev},
-
-    ModEntry = case catch feedformat:format(Entry) of
-                   {_, Reason} ->
-                       ?ERROR("feedformat error: ~p", [Reason]),
-                       Entry;
-                   #entry{} = ME ->
-                       ME
-               end,
-
-    kvs:put(ModEntry),
-    {ok, ModEntry}.
-
-purge_feed(FeedId) ->
-    {ok,Feed} = kvs:get(feed,FeedId),
-    Removal = riak_entry_traversal(Feed#feed.top, -1),
-    [kvs:delete(entry,Id)||#entry{id=Id}<-Removal],
-    kvs:put(Feed#feed{top=undefined}).
-
-purge_unverified_feeds() ->
-    [purge_feed(FeedId) || #user{feed=FeedId,status=S,email=E} <- kvs:all(user),E==undefined].
-
-entry_traversal(undefined, _) -> [];
-entry_traversal(_, 0) -> [];
-entry_traversal(Next, Count)->
-    case kvs:get(entry, Next) of
-        {error,notfound} -> [];
-        {ok, R} ->
-            Prev = element(#entry.prev, R),
-            Count1 = case Count of 
-                C when is_integer(C) -> case R#entry.type of
-                    {_, system} -> C;   % temporal entries are entries too, but they shouldn't be counted
-                    {_, system_note} -> C;
-                    _ -> C - 1
-                end;
-                _-> Count 
-            end,
-            [R | entry_traversal(Prev, Count1)]
-    end.
-
-entries_in_feed(FeedId, undefined, PageAmount) ->
-    case kvs:get(feed, FeedId) of
-        {ok, O} -> entry_traversal(O#feed.top, PageAmount);
-        {error, notfound} -> []
-    end;
-entries_in_feed(FeedId, StartFrom, PageAmount) ->
-    case kvs:get(entry,{StartFrom, FeedId}) of
-        {ok, #entry{prev = Prev}} -> entry_traversal(Prev, PageAmount);
-        _ -> []
-    end.
-
-
+handle_notice(Route, Message, State) -> error_logger:error_msg("Unknown FEED notice").