kvs_feed.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. -module(kvs_feed).
  2. -copyright('Synrc Research Center, s.r.o.').
  3. -compile(export_all).
  4. -include_lib("kvs/include/feeds.hrl").
  5. -include_lib("kvs/include/users.hrl").
  6. -include_lib("kvs/include/groups.hrl").
  7. -include_lib("kvs/include/feed_state.hrl").
  8. -include_lib("kvs/include/log.hrl").
  9. create() ->
  10. FId = kvs:next_id("feed", 1),
  11. ok = kvs:put(#feed{id = FId} ),
  12. FId.
  13. add_entry(FId, User, To, EntryId,Desc,Medias,Type,SharedBy) ->
  14. case kvs:get(entry,{EntryId, FId}) of
  15. {ok, _} -> ok;
  16. _ -> add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy, dont_check) end.
  17. add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy, _) ->
  18. {ok,Feed} = kvs:get(feed, erlang:integer_to_list(FId)),
  19. Id = {EntryId, FId},
  20. Next = undefined,
  21. Prev = case Feed#feed.top of
  22. undefined -> undefined;
  23. X -> case kvs:get(entry, X) of
  24. {ok, TopEntry} -> EditedEntry = TopEntry#entry{next = Id}, kvs:put(EditedEntry), TopEntry#entry.id;
  25. {error,notfound} -> undefined end end,
  26. kvs:put(#feed{id = FId, top = {EntryId, FId}}), % update feed top with current
  27. Entry = #entry{id = {EntryId, FId},
  28. entry_id = EntryId,
  29. feed_id = FId,
  30. from = User,
  31. to = To,
  32. type = Type,
  33. media = Medias,
  34. created_time = now(),
  35. description = Desc,
  36. raw_description = Desc,
  37. shared = SharedBy,
  38. next = Next,
  39. prev = Prev},
  40. ModEntry = case catch feedformat:format(Entry) of
  41. {_, Reason} -> ?ERROR("feedformat error: ~p", [Reason]), Entry;
  42. #entry{} = ME -> ME end,
  43. kvs:put(ModEntry),
  44. {ok, ModEntry}.
  45. entry_traversal(undefined, _) -> [];
  46. entry_traversal(_, 0) -> [];
  47. entry_traversal(Next, Count)->
  48. case kvs:get(entry, Next) of
  49. {error,notfound} -> [];
  50. {ok, R} ->
  51. Prev = element(#entry.prev, R),
  52. Count1 = case Count of
  53. C when is_integer(C) -> case R#entry.type of
  54. {_, system} -> C; % temporal entries are entries too, but they shouldn't be counted
  55. {_, system_note} -> C;
  56. _ -> C - 1
  57. end;
  58. _-> Count
  59. end,
  60. [R | entry_traversal(Prev, Count1)]
  61. end.
  62. entries(FeedId, undefined, PageAmount) ->
  63. case kvs:get(feed, FeedId) of
  64. {ok, O} -> entry_traversal(O#feed.top, PageAmount);
  65. {error, notfound} -> [] end;
  66. entries(FeedId, StartFrom, PageAmount) ->
  67. case kvs:get(entry,{StartFrom, FeedId}) of
  68. {ok, #entry{prev = Prev}} -> entry_traversal(Prev, PageAmount);
  69. _ -> [] end.
  70. add_like(Fid, Eid, Uid) ->
  71. Write_one_like = fun(Next) ->
  72. Self_id = kvs:next_id("one_like", 1),
  73. kvs:put(#one_like{ % add one like
  74. id = Self_id,
  75. user_id = Uid,
  76. entry_id = Eid,
  77. feed_id = Fid,
  78. created_time = now(),
  79. next = Next
  80. }),
  81. Self_id
  82. end,
  83. % add entry - like
  84. case kvs:get(entry_likes, Eid) of
  85. {ok, ELikes} ->
  86. kvs:put(ELikes#entry_likes{
  87. one_like_head = Write_one_like(ELikes#entry_likes.one_like_head),
  88. total_count = ELikes#entry_likes.total_count + 1
  89. });
  90. {error, notfound} ->
  91. kvs:put(#entry_likes{
  92. entry_id = Eid,
  93. one_like_head = Write_one_like(undefined),
  94. total_count = 1
  95. })
  96. end,
  97. % add user - like
  98. case kvs:get(user_likes, Uid) of
  99. {ok, ULikes} ->
  100. kvs:put(ULikes#user_likes{
  101. one_like_head = Write_one_like(ULikes#user_likes.one_like_head),
  102. total_count = ULikes#user_likes.total_count + 1
  103. });
  104. {error, notfound} ->
  105. kvs:put(#user_likes{
  106. user_id = Uid,
  107. one_like_head = Write_one_like(undefined),
  108. total_count = 1
  109. })
  110. end.
  111. entries_count(Uid) ->
  112. case kvs:get(user_etries_count, Uid) of
  113. {ok, UEC} -> UEC#user_etries_count.entries;
  114. {error, notfound} -> 0 end.
  115. comments_count(Uid) ->
  116. case kvs:get(user_etries_count, Uid) of
  117. {ok, UEC} -> UEC#user_etries_count.comments;
  118. {error, notfound} -> 0 end.
  119. remove_entry(FeedId, EId) ->
  120. {ok, #feed{top = TopId} = Feed} = kvs:get(feed,FeedId),
  121. case kvs:get(entry, {EId, FeedId}) of
  122. {ok, #entry{prev = Prev, next = Next}}->
  123. ?INFO("P: ~p, N: ~p", [Prev, Next]),
  124. case kvs:get(entry, Next) of {ok, NE} -> kvs:put(NE#entry{prev = Prev}); _ -> ok end,
  125. case kvs:get(entry, Prev) of {ok, PE} -> kvs:put(PE#entry{next = Next}); _ -> ok end,
  126. case TopId of {EId, FeedId} -> kvs:put(Feed#feed{top = Prev}); _ -> ok end;
  127. {error, notfound} -> ?INFO("Not found"), ok
  128. end,
  129. kvs:delete(entry, {EId, FeedId}).
  130. edit_entry(FeedId, EId, NewDescription) ->
  131. case kvs:get(entry,{EId, FeedId}) of
  132. {ok, OldEntry} ->
  133. NewEntryRaw = OldEntry#entry{description = NewDescription, raw_description = NewDescription},
  134. NewEntry = feedformat:format(NewEntryRaw),
  135. kvs:put(NewEntry);
  136. {error, notfound}-> {error, notfound} end.
  137. remove_entry_comments(FId, EId) ->
  138. AllComments = kvs:comments_by_entry(FId, EId),
  139. [begin kvs:delete(comment, ID) end || #comment{id = ID, media = M} <- AllComments].
  140. entry_add_comment(FId, User, EntryId, ParentComment, CommentId, Content, Medias) ->
  141. case kvs:get(entry,{EntryId, FId}) of
  142. {ok, _E} -> kvs_comment:add(FId, User, EntryId, ParentComment, CommentId, Content, Medias);
  143. _ -> ok end.
  144. like_list(undefined) -> [];
  145. like_list(Id) -> {ok, OneLike} = kvs:get(one_like, Id), [OneLike] ++ like_list(OneLike#one_like.next).
  146. like_list(undefined, _) -> [];
  147. like_list(_, 0) -> [];
  148. like_list(Id, N) -> {ok, OneLike} = kvs:get(one_like, Id), [OneLike] ++ like_list(OneLike#one_like.next, N-1).
  149. entry_likes(Entry_id) ->
  150. case kvs:get(entry_likes, Entry_id) of
  151. {ok, Likes} -> get_one_like_list(Likes#entry_likes.one_like_head);
  152. {error, notfound} -> [] end.
  153. entry_likes_count(Entry_id) ->
  154. case kvs:get(entry_likes, Entry_id) of
  155. {ok, Likes} -> Likes#entry_likes.total_count;
  156. {error, notfound} -> 0 end.
  157. user_likes_count(UserId) ->
  158. case kvs:get(user_likes, UserId) of
  159. {ok, Likes} -> Likes#user_likes.total_count;
  160. {error, notfound} -> 0 end.
  161. user_likes(UserId) ->
  162. case kvs:get(user_likes, UserId) of
  163. {ok, Likes} -> get_one_like_list(Likes#user_likes.one_like_head);
  164. {error, notfound} -> [] end.
  165. user_likes(UserId, {Page, PageAmount}) ->
  166. case kvs:get(user_likes, UserId) of
  167. {ok, Likes} -> lists:nthtail((Page-1)*PageAmount, get_one_like_list(Likes#user_likes.one_like_head, PageAmount*Page));
  168. {error, notfound} -> [] end.
  169. comments_entries(UserUid, _, Page, PageAmount) ->
  170. Pids = [Eid || #comment{entry_id=Eid} <- kvs:select(comment,
  171. fun(#comment{author_id=Who}) when Who=:=UserUid ->true;(_)->false end)],
  172. lists:flatten([kvs:select(entry,[{where, fun(#entry{entry_id=ID})-> ID=:=Pid end},
  173. {order, {1, descending}},{limit, {1,1}}]) || Pid <- Pids]).
  174. get_my_discussions(FId, Page, PageAmount, UserUid) ->
  175. Offset = case (Page-1)*PageAmount of 0 -> 1; M-> M end,
  176. Pids = [Eid || #comment{entry_id=Eid} <- kvs:select(comment,
  177. fun(#comment{author_id=Who}) when Who=:=UserUid ->true;(_)->false end)],
  178. lists:flatten([kvs:select(entry,[{where, fun(#entry{entry_id=ID})-> ID=:=Pid end},
  179. {order, {1, descending}},{limit, {1,1}}]) || Pid <- Pids]).
  180. purge_feed(FeedId) ->
  181. {ok,Feed} = kvs:get(feed,FeedId),
  182. Removal = entry_traversal(Feed#feed.top, -1),
  183. [kvs:delete(entry,Id)||#entry{id=Id}<-Removal],
  184. kvs:put(Feed#feed{top=undefined}).
  185. purge_unverified_feeds() ->
  186. [purge_feed(FeedId) || #user{feed=FeedId,status=S,email=E} <- kvs:all(user),E==undefined].
  187. %% MQ API
  188. handle_notice(["kvs_feed", "delete", Owner] = Route,
  189. Message, #state{owner = Owner} = State) ->
  190. ?INFO("feed(~p): notification received: User=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  191. {stop, normal, State};
  192. handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add"] = Route, [From|_] = Message, #state{owner = Owner, feed = Feed} = State) ->
  193. ?INFO("feed(~p): group message: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  194. [From, _Destinations, Desc, Medias] = Message,
  195. kvs_feed:add_entry(Feed, From, [{GroupId, group}], EntryId, Desc, Medias, {group, direct}, ""),
  196. case Owner == GroupId of
  197. false -> ok;
  198. true ->
  199. {ok, Group} = kvs:get(group, GroupId),
  200. GE = Group#group.entries_count,
  201. kvs:put(Group#group{entries_count = GE+1}),
  202. {ok, Subs} = kvs:get(group_subs, {From, GroupId}),
  203. SE = Subs#group_subscription.user_posts_count,
  204. kvs:put(Subs#group_subscription{user_posts_count = SE+1})
  205. end,
  206. self() ! {feed_refresh,Feed,20},
  207. {noreply, State};
  208. handle_notice(["kvs_feed", "user", FeedOwner, "entry", EntryId, "add"] = Route,
  209. [From|_] = Message, #state{owner = WorkerOwner, feed = Feed, direct = Direct} = State) ->
  210. ?INFO("feed(~p): message: Owner=~p, Route=~p, Message=~p", [self(), WorkerOwner, Route, Message]),
  211. [From, Destinations, Desc, Medias] = Message,
  212. if
  213. %% user added message to own feed
  214. FeedOwner == From andalso FeedOwner == WorkerOwner->
  215. FilteredDst = [D || {_, group} = D <- Destinations],
  216. kvs_feed:add_entry(Feed, From, FilteredDst, EntryId, Desc, Medias, {user, normal},""), self() ! {feed_refresh,Feed,20};
  217. %% friend added message to public feed
  218. FeedOwner == From ->
  219. kvs_feed:add_entry(Feed, From, [], EntryId, Desc, Medias, {user, normal},""), self() ! {feed_refresh,Feed,20};
  220. %% direct message to worker owner
  221. FeedOwner == WorkerOwner ->
  222. kvs_feed:add_entry(Direct, From, [{FeedOwner, user}], EntryId, Desc, Medias, {user,direct}, ""), self() ! {direct_refresh,Direct,20};
  223. %% user sent direct message to friend, add copy to his direct feed
  224. From == WorkerOwner ->
  225. kvs_feed:add_entry(Direct, WorkerOwner, Destinations, EntryId, Desc, Medias, {user, direct}, ""), self() ! {direct_refresh,Direct,20};
  226. true -> ?INFO("not matched case in entry->add")
  227. end,
  228. {noreply, State};
  229. handle_notice(["kvs_feed", "user", _FeedOwner, "entry", EntryId, "add_system"] = Route,
  230. [From|_] = Message, #state{owner = WorkerOwner, feed = Feed, direct = _Direct} = State) ->
  231. ?INFO("feed(~p): system message: Owner=~p, Route=~p, Message=~p", [self(), WorkerOwner, Route, Message]),
  232. [From, _Destinations, Desc, Medias] = Message,
  233. kvs_feed:add_entry(Feed, From, [], EntryId, Desc, Medias, {user, system}, ""),
  234. {noreply, State};
  235. handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add_system"] = Route,
  236. [From|_] = Message, #state{owner = Owner, feed = Feed} = State) ->
  237. ?INFO("feed(~p): group system message: Owner=~p, Route=~p, Message=~p",
  238. [self(), Owner, Route, Message]),
  239. [From, _Destinations, Desc, Medias] = Message,
  240. kvs_feed:add_entry(Feed, From, [{GroupId, group}], EntryId, Desc, Medias, {group, system}, ""),
  241. {noreply, State};
  242. handle_notice(["feed", "user", UId, "post_note"] = Route,
  243. Message, #state{owner = Owner, feed = Feed} = State) ->
  244. ?INFO("feed(~p): post_note: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  245. Note = Message,
  246. Id = utils:uuid_ex(),
  247. kvs_feed:add_entry(Feed, UId, [], Id, Note, [], {user, system_note}, ""),
  248. {noreply, State};
  249. handle_notice(["kvs_feed", _, WhoShares, "entry", NewEntryId, "share"],
  250. #entry{entry_id = _EntryId, raw_description = Desc, media = Medias, to = Destinations,
  251. from = From} = E, #state{feed = Feed, type = user} = State) ->
  252. %% FIXME: sharing is like posting to the wall
  253. ?INFO("share: ~p, WhoShares: ~p", [E, WhoShares]),
  254. kvs_feed:add_entry(Feed, From, Destinations, NewEntryId, Desc, Medias, {user, normal}, WhoShares),
  255. {noreply, State};
  256. handle_notice(["kvs_feed", "group", _Group, "entry", EntryId, "delete"] = Route,
  257. Message, #state{owner = Owner, feed = Feed} = State) ->
  258. ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p",
  259. [self(), Owner, Route, Message]),
  260. %% all group subscribers shold delete entry from their feeds
  261. kvs_feed:remove_entry(Feed, EntryId),
  262. self() ! {feed_refresh,Feed,20},
  263. {noreply, State};
  264. handle_notice(["kvs_feed", _Type, EntryOwner, "entry", EntryId, "delete"] = Route,
  265. Message, #state{owner = Owner, feed=Feed, direct=Direct} = State) ->
  266. case {EntryOwner, Message} of
  267. %% owner of the antry has deleted entry, we will delete it too
  268. {_, [EntryOwner|_]} ->
  269. ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  270. kvs_feed:remove_entry(Feed, EntryId),
  271. kvs_feed:remove_entry(Direct, EntryId);
  272. %% we are owner of the entry - delete it
  273. {Owner, _} ->
  274. ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  275. kvs_feed:remove_entry(Feed, EntryId),
  276. kvs_feed:remove_entry(Direct, EntryId);
  277. %% one of the friends has deleted some entry from his feed. Ignore
  278. _ -> ok end,
  279. self() ! {feed_refresh, State#state.feed,20},
  280. {noreply, State};
  281. handle_notice(["kvs_feed", _Type, _EntryOwner, "entry", EntryId, "edit"] = Route,
  282. Message, #state{owner = Owner, feed=Feed} = State) ->
  283. [NewDescription|_] = Message,
  284. ?INFO("feed(~p): edit: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  285. kvs_feed:edit_entry(Feed, EntryId, NewDescription),
  286. {noreply, State};
  287. handle_notice(["kvs_feed", _Type, _EntryOwner, "comment", CommentId, "add"] = Route,
  288. Message, #state{owner = Owner, feed=Feed} = State) ->
  289. [From, EntryId, ParentComment, Content, Medias] = Message,
  290. ?INFO("feed(~p): add comment: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
  291. kvs_comment:add(Feed, From, EntryId, ParentComment, CommentId, Content, Medias),
  292. {noreply, State};
  293. handle_notice(["kvs_feed", "user", UId, "count_entry_in_statistics"] = Route,
  294. Message, #state{owner = Owner, type =Type} = State) ->
  295. ?INFO("queue_action(~p): count_entry_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  296. case kvs:get(user_etries_count, UId) of
  297. {ok, UEC} ->
  298. kvs:put(UEC#user_etries_count{entries = UEC#user_etries_count.entries+1 }),
  299. kvs_users:attempt_active_user_top(UId, UEC#user_etries_count.entries+1);
  300. {error, notfound} ->
  301. kvs:put(#user_etries_count{user_id = UId, entries = 1 }),
  302. kvs_users:attempt_active_user_top(UId, 1) end,
  303. {noreply, State};
  304. handle_notice(["kvs_feed", "user", UId, "count_comment_in_statistics"] = Route,
  305. Message, #state{owner = Owner, type =Type} = State) ->
  306. ?INFO("queue_action(~p): count_comment_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  307. case kvs:get(user_etries_count, UId) of
  308. {ok, UEC} -> kvs:put(UEC#user_etries_count{comments = UEC#user_etries_count.comments+1 });
  309. {error, notfound} -> kvs:put(#user_etries_count{ user_id = UId, comments = 1 }) end,
  310. {noreply, State};
  311. handle_notice(["kvs_feed","likes", _, _, "add_like"] = Route, % _, _ is here beacause of the same message used for comet update
  312. Message, #state{owner = Owner, type =Type} = State) ->
  313. ?INFO("queue_action(~p): add_like: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  314. {UId, E} = Message,
  315. {EId, FId} = E#entry.id,
  316. kvs_feed:add_like(FId, EId, UId),
  317. {noreply, State};
  318. handle_notice(Route, Message, State) -> error_logger:error_msg("Unknown FEED notice").