kvs_feed.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. -module(kvs_feed).
  2. -author('Maxim Sokhatsky').
  3. -author('Andrii Zadorozhnii').
  4. -author('Alexander Kalenuk').
  5. -copyright('Synrc Research Center, s.r.o.').
  6. -compile(export_all).
  7. -include_lib("kvs/include/feeds.hrl").
  8. -include_lib("kvs/include/users.hrl").
  9. -include_lib("kvs/include/groups.hrl").
  10. -include_lib("kvs/include/feed_state.hrl").
  11. -include_lib("kvs/include/log.hrl").
  12. -define(CACHED_ENTRIES, 20).
  13. create() ->
  14. FId = kvs:next_id("feed", 1),
  15. ok = kvs:put(#feed{id = FId} ),
  16. FId.
  17. add_entry(FId, From, To, EntryId, Title, Desc, Medias, Type, SharedBy) ->
  18. case kvs:get(feed, FId) of
  19. {ok,Feed} ->
  20. Id = {EntryId, FId},
  21. Next = undefined,
  22. Prev = case Feed#feed.top of
  23. undefined -> undefined;
  24. X -> case kvs:get(entry, X) of
  25. {ok, TopEntry} -> EditedEntry = TopEntry#entry{next = Id}, kvs:put(EditedEntry), TopEntry#entry.id;
  26. {error, _} -> undefined end end,
  27. kvs:put(#feed{id = FId, top = {EntryId, FId}}), % update feed top with current
  28. Entry = #entry{id = {EntryId, FId}, entry_id = EntryId, feed_id = FId, from = From,
  29. to = To, type = Type, media = Medias, created = now(),
  30. title=Title, description = Desc, shared = SharedBy,
  31. next = Next, prev = Prev},
  32. kvs:put(Entry),
  33. error_logger:info_msg("PUT entry: ~p", [Entry]),
  34. {ok, Entry};
  35. {error, not_found} -> error_logger:info_msg("Add entry failed. No feed ~p", [FId])
  36. end.
  37. entry_traversal(undefined, _) -> [];
  38. entry_traversal(_, 0) -> [];
  39. entry_traversal(Next, Count)->
  40. case kvs:get(entry, Next) of
  41. {error, _} -> [];
  42. {ok, R} ->
  43. Prev = element(#entry.prev, R),
  44. Count1 = case Count of
  45. C when is_integer(C) -> case R#entry.type of
  46. {_, system} -> C; % temporal entries are entries too, but they shouldn't be counted
  47. {_, system_note} -> C;
  48. _ -> C - 1 end;
  49. _-> Count end,
  50. [R | entry_traversal(Prev, Count1)] end.
  51. entries(FeedId, undefined, PageAmount) ->
  52. case kvs:get(feed, FeedId) of
  53. {ok, O} -> entry_traversal(O#feed.top, PageAmount);
  54. {error, _} -> [] end;
  55. entries(FeedId, StartFrom, PageAmount) ->
  56. case kvs:get(entry,{StartFrom, FeedId}) of
  57. {ok, #entry{prev = Prev}} -> entry_traversal(Prev, PageAmount);
  58. _ -> [] end.
  59. add_like(Fid, Eid, Uid) ->
  60. Write_one_like = fun(Next) ->
  61. Self_id = kvs:next_id("one_like", 1),
  62. kvs:put(#one_like{ % add one like
  63. id = Self_id,
  64. user_id = Uid,
  65. entry_id = Eid,
  66. feed_id = Fid,
  67. created_time = now(),
  68. next = Next
  69. }),
  70. Self_id
  71. end,
  72. % add entry - like
  73. case kvs:get(entry_likes, Eid) of
  74. {ok, ELikes} ->
  75. kvs:put(ELikes#entry_likes{
  76. one_like_head = Write_one_like(ELikes#entry_likes.one_like_head),
  77. total_count = ELikes#entry_likes.total_count + 1
  78. });
  79. {error, _} ->
  80. kvs:put(#entry_likes{
  81. entry_id = Eid,
  82. one_like_head = Write_one_like(undefined),
  83. total_count = 1
  84. })
  85. end,
  86. % add user - like
  87. case kvs:get(user_likes, Uid) of
  88. {ok, ULikes} ->
  89. kvs:put(ULikes#user_likes{
  90. one_like_head = Write_one_like(ULikes#user_likes.one_like_head),
  91. total_count = ULikes#user_likes.total_count + 1
  92. });
  93. {error, _} ->
  94. kvs:put(#user_likes{
  95. user_id = Uid,
  96. one_like_head = Write_one_like(undefined),
  97. total_count = 1
  98. })
  99. end.
  100. entries_count(Uid) ->
  101. case kvs:get(user_etries_count, Uid) of
  102. {ok, UEC} -> UEC#user_etries_count.entries;
  103. {error, _} -> 0 end.
  104. comments_count(Uid) ->
  105. case kvs:get(user_etries_count, Uid) of
  106. {ok, UEC} -> UEC#user_etries_count.comments;
  107. {error, _} -> 0 end.
  108. remove_entry(FeedId, EId) ->
  109. {ok, #feed{top = TopId} = Feed} = kvs:get(feed,FeedId),
  110. case kvs:get(entry, {EId, FeedId}) of
  111. {ok, #entry{prev = Prev, next = Next}}->
  112. case kvs:get(entry, Next) of {ok, NE} -> kvs:put(NE#entry{prev = Prev}); _ -> ok end,
  113. case kvs:get(entry, Prev) of {ok, PE} -> kvs:put(PE#entry{next = Next}); _ -> ok end,
  114. case TopId of {EId, FeedId} -> kvs:put(Feed#feed{top = Prev}); _ -> ok end;
  115. {error, _} -> error_logger:info_msg("Not found"), ok
  116. end,
  117. kvs:delete(entry, {EId, FeedId}).
  118. edit_entry(FeedId, EId, NewDescription) ->
  119. case kvs:get(entry,{EId, FeedId}) of
  120. {ok, OldEntry} ->
  121. NewEntryRaw = OldEntry#entry{description = NewDescription},
  122. NewEntry = feedformat:format(NewEntryRaw),
  123. kvs:put(NewEntry);
  124. {error, Reason}-> {error, Reason} end.
  125. like_list(undefined) -> [];
  126. like_list(Id) -> {ok, OneLike} = kvs:get(one_like, Id), [OneLike] ++ like_list(OneLike#one_like.next).
  127. like_list(undefined, _) -> [];
  128. like_list(_, 0) -> [];
  129. like_list(Id, N) -> {ok, OneLike} = kvs:get(one_like, Id), [OneLike] ++ like_list(OneLike#one_like.next, N-1).
  130. entry_likes(Entry_id) ->
  131. case kvs:get(entry_likes, Entry_id) of
  132. {ok, Likes} -> like_list(Likes#entry_likes.one_like_head);
  133. {error, _} -> [] end.
  134. entry_likes_count(Entry_id) ->
  135. case kvs:get(entry_likes, Entry_id) of
  136. {ok, Likes} -> Likes#entry_likes.total_count;
  137. {error, _} -> 0 end.
  138. user_likes_count(UserId) ->
  139. case kvs:get(user_likes, UserId) of
  140. {ok, Likes} -> Likes#user_likes.total_count;
  141. {error, _} -> 0 end.
  142. user_likes(UserId) ->
  143. case kvs:get(user_likes, UserId) of
  144. {ok, Likes} -> like_list(Likes#user_likes.one_like_head);
  145. {error, _} -> [] end.
  146. user_likes(UserId, {Page, PageAmount}) ->
  147. case kvs:get(user_likes, UserId) of
  148. {ok, Likes} -> lists:nthtail((Page-1)*PageAmount, like_list(Likes#user_likes.one_like_head, PageAmount*Page));
  149. {error, _} -> [] end.
  150. purge_feed(FeedId) ->
  151. {ok,Feed} = kvs:get(feed,FeedId),
  152. Removal = entry_traversal(Feed#feed.top, -1),
  153. [kvs:delete(entry,Id)||#entry{id=Id}<-Removal],
  154. kvs:put(Feed#feed{top=undefined}).
  155. purge_unverified_feeds() ->
  156. [purge_feed(FeedId) || #user{feed=FeedId, email=E} <- kvs:all(user), E==undefined].
  157. %% MQ API
  158. handle_notice([kvs_feed, Totype, Toid, entry, EntryId, add],
  159. [Fid, From, Title, Desc, Medias, EntryType, _, _, _],
  160. #state{owner=Owner, feed=Feed}=State)->
  161. if Owner == Toid ->
  162. % handle user direct feed
  163. error_logger:info_msg("Add: entry ~p worker ~p feed ~p", [EntryId, Owner, Feed]),
  164. add_entry(case Totype of product -> Fid; _ -> Feed end, From, {Toid, Totype}, EntryId, Title, Desc, Medias, EntryType, ""),
  165. case Totype of
  166. group ->
  167. {ok, Group} = kvs:get(group, Toid),
  168. GE = Group#group.entries_count,
  169. kvs:put(Group#group{entries_count = GE+1}),
  170. {ok, Subs} = kvs:get(group_subscription, {From, Toid}),
  171. SE = Subs#group_subscription.posts_count,
  172. kvs:put(Subs#group_subscription{posts_count = SE+1});
  173. _ -> skip
  174. end,
  175. self() ! {feed_refresh, Fid, ?CACHED_ENTRIES};
  176. true -> skip end,
  177. {noreply, State};
  178. handle_notice([kvs_feed, _, Toid, entry, {Eid,_}, edit],
  179. [_, _, Title, Desc],
  180. #state{owner=Owner, feed=Fid}=State) ->
  181. if Owner == Toid ->
  182. error_logger:info_msg("Edit: worker ~p entry ~p feed ~p" , [Owner, Eid, Fid] ),
  183. case kvs:get(entry, {Eid, Fid}) of {error, not_found}-> skip; {ok, Entry} -> kvs:put(Entry#entry{title=Title, description=Desc}) end;
  184. true -> skip end,
  185. {noreply, State};
  186. handle_notice([kvs_feed, Totype, Toid, entry, {Eid,Fid}, delete],
  187. [_From|_], #state{owner=Owner, feed=Feed} = State) ->
  188. if Owner == Toid ->
  189. error_logger:info_msg("Delete: worker ~p entry ~p feed ~p", [Owner, Eid, Fid]),
  190. FeedId = case Totype of product -> Fid; _ -> Feed end, %kvs_acl:check_access(From, {feature, admin})
  191. kvs_feed:remove_entry(FeedId, Eid),
  192. self() ! {feed_refresh, FeedId, ?CACHED_ENTRIES};
  193. true-> skip
  194. end,
  195. {noreply, State};
  196. handle_notice([kvs_feed, entry, {Eid, FeedId}, comment, Cid, add],
  197. [From, Parent, Content, Medias, _, _],
  198. #state{owner=Owner, feed=Fid} = State) ->
  199. if FeedId == Fid ->
  200. [begin error_logger:info_msg("Comment: worker ~p entry ~p cid ~p",[Owner, Eid, Cid]),
  201. kvs_comment:add(E#entry.feed_id, From, E#entry.entry_id, Parent, Cid, Content, Medias)
  202. end || E <- kvs:all_by_index(entry, entry_id, Eid)];
  203. true -> skip end,
  204. {noreply, State};
  205. handle_notice(["feed", "user", UId, "post_note"] = Route,
  206. Message, #state{owner = Owner, feed = Feed} = State) ->
  207. error_logger:info_msg("feed(~p): post_note: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  208. Note = Message,
  209. Id = utils:uuid_ex(),
  210. kvs_feed:add_entry(Feed, UId, [], Id, Note, [], {user, system_note}, ""),
  211. {noreply, State};
  212. handle_notice(["kvs_feed", _, WhoShares, "entry", NewEntryId, "share"],
  213. #entry{entry_id = _EntryId, description = Desc, media = Medias, to = Destinations,
  214. from = From} = E, #state{feed = Feed, type = user} = State) ->
  215. %% FIXME: sharing is like posting to the wall
  216. error_logger:info_msg("share: ~p, WhoShares: ~p", [E, WhoShares]),
  217. kvs_feed:add_entry(Feed, From, Destinations, NewEntryId, Desc, Medias, {user, normal}, WhoShares),
  218. {noreply, State};
  219. handle_notice(["kvs_feed", "group", _Group, "entry", EntryId, "delete"] = Route,
  220. Message, #state{owner = Owner, feed = Feed} = State) ->
  221. error_logger:info_msg("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p",
  222. [self(), Owner, Route, Message]),
  223. %% all group subscribers shold delete entry from their feeds
  224. kvs_feed:remove_entry(Feed, EntryId),
  225. self() ! {feed_refresh,Feed, ?CACHED_ENTRIES},
  226. {noreply, State};
  227. handle_notice(["kvs_feed", _Type, EntryOwner, "entry", EntryId, "delete"] = Route,
  228. Message, #state{owner = Owner, feed=Feed, direct=Direct} = State) ->
  229. case {EntryOwner, Message} of
  230. %% owner of the antry has deleted entry, we will delete it too
  231. {_, [EntryOwner|_]} ->
  232. error_logger:info_msg("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  233. kvs_feed:remove_entry(Feed, EntryId),
  234. kvs_feed:remove_entry(Direct, EntryId);
  235. %% we are owner of the entry - delete it
  236. {Owner, _} ->
  237. error_logger:info_msg("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  238. kvs_feed:remove_entry(Feed, EntryId),
  239. kvs_feed:remove_entry(Direct, EntryId);
  240. %% one of the friends has deleted some entry from his feed. Ignore
  241. _ -> ok end,
  242. self() ! {feed_refresh, State#state.feed, ?CACHED_ENTRIES},
  243. {noreply, State};
  244. handle_notice(["kvs_feed", "user", UId, "count_entry_in_statistics"] = Route,
  245. Message, #state{owner = Owner, type =Type} = State) ->
  246. error_logger:info_msg("queue_action(~p): count_entry_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  247. case kvs:get(user_etries_count, UId) of
  248. {ok, UEC} ->
  249. kvs:put(UEC#user_etries_count{entries = UEC#user_etries_count.entries+1 }),
  250. kvs_users:attempt_active_user_top(UId, UEC#user_etries_count.entries+1);
  251. {error, _} ->
  252. kvs:put(#user_etries_count{user_id = UId, entries = 1 }),
  253. kvs_users:attempt_active_user_top(UId, 1) end,
  254. {noreply, State};
  255. handle_notice(["kvs_feed", "user", UId, "count_comment_in_statistics"] = Route,
  256. Message, #state{owner = Owner, type =Type} = State) ->
  257. error_logger:info_msg("queue_action(~p): count_comment_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  258. case kvs:get(user_etries_count, UId) of
  259. {ok, UEC} -> kvs:put(UEC#user_etries_count{comments = UEC#user_etries_count.comments+1 });
  260. {error, _} -> kvs:put(#user_etries_count{ user_id = UId, comments = 1 }) end,
  261. {noreply, State};
  262. handle_notice(["kvs_feed","likes", _, _, "add_like"] = Route, % _, _ is here beacause of the same message used for comet update
  263. Message, #state{owner = Owner, type =Type} = State) ->
  264. error_logger:info_msg("queue_action(~p): add_like: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  265. {UId, E} = Message,
  266. {EId, FId} = E#entry.id,
  267. kvs_feed:add_like(FId, EId, UId),
  268. {noreply, State};
  269. handle_notice(Route, _Message, State) -> error_logger:error_msg("Unknown FEED notice ~p", [Route]), {noreply, State}.