kvs_group.erl 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. -module(kvs_group).
  2. -compile(export_all).
  3. -include_lib("kvs/include/kvs.hrl").
  4. -include_lib("kvs/include/user.hrl").
  5. -include_lib("kvs/include/groups.hrl").
  6. -include_lib("kvs/include/feeds.hrl").
  7. -include_lib("kvs/include/accounts.hrl").
  8. -include_lib("kvs/include/feed_state.hrl").
  9. -include_lib("kvs/include/config.hrl").
  10. init(Backend) ->
  11. ?CREATE_TAB(group_subscription),
  12. ?CREATE_TAB(group),
  13. Backend:add_table_index(group_subscription, who),
  14. Backend:add_table_index(group_subscription, where),
  15. ok.
  16. retrieve_groups(User) ->
  17. case participate(User) of
  18. [] -> [];
  19. Gs -> UC_GId = lists:reverse(lists:sort([{members_count(GId), GId} || GId <- Gs])),
  20. Result = [begin case kvs:get(group,GId) of
  21. {ok, Group} -> {Group#group.name,GId,UC};
  22. _ -> undefined end end || {UC, GId} <- UC_GId],
  23. [X||X<-Result,X/=undefined] end.
  24. delete(GroupName) ->
  25. case kvs:get(group,GroupName) of
  26. {error,_} -> ok;
  27. {ok, Group} ->
  28. % mqs:notify([feed, delete, GroupName], empty),
  29. kvs:delete_by_index(group_subscription, <<"where_bin">>, GroupName),
  30. [kvs:delete(Feed, Fid) || {Feed, Fid} <- Group#group.feeds],
  31. kvs:delete(group, GroupName),
  32. case mqs:open([]) of
  33. {ok, Channel} ->
  34. Routes = kvs_users:rk_group_feed(GroupName),
  35. kvs_users:unbind_group_exchange(Channel, GroupName, Routes),
  36. mqs_channel:close(Channel);
  37. {error,Reason} -> error_logger:info_msg("delete group failed: ~p",[Reason]) end end.
  38. participate(UserName) -> DBA=?DBA,DBA:participate(UserName).
  39. members(GroupName) -> DBA=?DBA,DBA:members(GroupName).
  40. owner(UserName, GroupName) ->
  41. case kvs:get(group, GroupName) of
  42. {ok,Group} -> case Group#group.owner of UserName -> true; _ -> false end;
  43. _ -> false end.
  44. member(UserName, GroupName) ->
  45. case kvs:get(group_subscription, {UserName, GroupName}) of
  46. {error, _} -> false;
  47. {ok,_} -> true end.
  48. member_type(UserName, GroupName) ->
  49. case kvs:get(group_subs, {UserName, GroupName}) of
  50. {error, _} -> not_in_group;
  51. {ok, #group_subscription{type=Type}} -> Type end.
  52. add(UserName, GroupName, Type) ->
  53. kvs:put(#group_subscription{key={UserName,GroupName},who=UserName, where=GroupName, type=Type}),
  54. {ok, Group} = kvs:get(group, GroupName),
  55. Users = Group#group.users_count,
  56. kvs:put(Group#group{users_count = Users + 1}).
  57. join(UserName,GroupName) ->
  58. case kvs:get(group,GroupName) of
  59. {ok, #group{id = GroupName, scope = public}} ->
  60. add(UserName, GroupName, member),
  61. {ok, joined};
  62. {ok, #group{id = GroupName}} ->
  63. case member_type(UserName, GroupName) of
  64. member -> {ok, joined};
  65. req -> {error, already_sent};
  66. req_rejected -> {error, request_rejected};
  67. not_in_group -> add(UserName, GroupName, req), {ok, requested};
  68. _ -> {error, unknown_type} end;
  69. Error -> Error end.
  70. leave(UserName,GroupName) ->
  71. kvs:delete(group_subscription, {UserName, GroupName}),
  72. case kvs:get(group, GroupName) of
  73. {error,_} -> error_logger:info_msg("Remove ~p from group failed reading group ~p", [UserName, GroupName]);
  74. {ok,Group} -> kvs:put(Group#group{users_count = Group#group.users_count - 1}) end.
  75. approve_request(UserName, GroupName) -> add(UserName, GroupName, member).
  76. reject_request(UserName, GroupName) -> add(UserName, GroupName, req_rejected).
  77. member_type(UserName, GroupName, Type) -> msg:notify(["subscription", "user", GroupName, "add_to_group"], {GroupName, UserName, Type}).
  78. exists(GroupName) -> case kvs:get(group,GroupName) of {ok,_} -> true; _ -> false end.
  79. coalesce(undefined, B) -> B;
  80. coalesce(A, _) -> A.
  81. publicity(GroupName) ->
  82. case kvs:get(group,GroupName) of
  83. {error,_} -> no_such_group;
  84. {ok,Group} -> Group#group.scope end.
  85. members_count(GroupName) ->
  86. case kvs:get(group,GroupName) of
  87. {error,_} -> no_such_group;
  88. {ok,Group} -> Group#group.users_count end.
  89. user_has_access(UserName, GroupName) ->
  90. Type = member_type(UserName, GroupName),
  91. case kvs:get(group, GroupName) of
  92. {error,_} -> false;
  93. {ok,Group} ->
  94. Publicity = Group#group.scope,
  95. case {Publicity, Type} of
  96. {public, _} -> true;
  97. {private, member} -> true;
  98. _ -> false end end.
  99. handle_notice([kvs_group, Owner, add],
  100. [#group{}=Group],
  101. #state{owner=Owner}=State) ->
  102. error_logger:info_msg("[kvs_group] Create group ~p", [Owner]),
  103. Created = case kvs:add(Group) of {error, E} -> {error, E};
  104. {ok, #group{id=Id} = G} ->
  105. Params = [{id, Id}, {type, group}, {feeds, element(#iterator.feeds, G)}],
  106. case workers_sup:start_child(Params) of {error, E} -> {error, E}; _ -> G end end,
  107. msg:notify([kvs_group, group, Group#group.id, added], [Created]),
  108. {noreply, State};
  109. handle_notice(["kvs_group", "update", GroupName] = Route,
  110. Message, #state{owner=ThisGroupOwner, type=Type} = State) ->
  111. error_logger:info_msg("queue_action(~p): update_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, ThisGroupOwner}, Route, Message]),
  112. {_UId, _GroupUsername, Name, Description, Owner, Publicity} = Message,
  113. case kvs:get(group, GroupName) of
  114. {ok, Group} ->
  115. NewGroup = Group#group{name = coalesce(Name,Group#group.name),
  116. description = coalesce(Description,Group#group.description),
  117. scope = coalesce(Publicity,Group#group.scope),
  118. owner = coalesce(Owner,Group#group.owner)},
  119. kvs:put(NewGroup);
  120. {error,Reason} -> error_logger:info_msg("Cannot update group ~p",[Reason]) end,
  121. {noreply, State};
  122. handle_notice([kvs_group, Owner, delete],
  123. [#group{}=P],
  124. #state{owner=Owner, feeds=Feeds}=State) ->
  125. error_logger:info_msg("[kvs_group] Delete group ~p ~p", [P#group.id, Owner]),
  126. Removed = case kvs:remove(P) of {error,E} -> {error,E};
  127. ok ->
  128. % todo: handle group feeds?
  129. supervisor:terminate_child(workers_sup, {group, P#group.id}),
  130. supervisor:delete_child(workers_sup, {group, P#group.id}),
  131. P
  132. end,
  133. msg:notify([kvs_product, group, P#group.id, deleted], [Removed]),
  134. {noreply, State};
  135. handle_notice([kvs_group, join, Owner],
  136. [{FeedName, Who}, Entry],
  137. #state{owner = Owner, feeds=Feeds} = State) ->
  138. error_logger:info_msg("[kvs_group] ~p Join group: ~p", [Who, Owner]),
  139. join(Who, Owner),
  140. case lists:keyfind(FeedName, 1, Feeds) of false -> skip;
  141. {_,Fid} -> msg:notify([kvs_feed, group, Owner, entry, Who, add],
  142. [Entry#entry{id={Who, Fid},feed_id=Fid,to={group, Owner}}]) end,
  143. % subscription_mq(group, add, UserName, GroupName),
  144. {noreply, State};
  145. handle_notice([kvs_group, leave, Owner],
  146. [{FeedName, Who}],
  147. #state{owner = Owner, feeds=Feeds} = State) ->
  148. error_logger:info_msg("[kvs_group] ~p leave group ~p", [Who, Owner]),
  149. leave(Who, Owner),
  150. case lists:keyfind(FeedName, 1, Feeds) of false -> skip;
  151. {_,Fid} -> msg:notify([kvs_feed, group, Owner, entry, delete], [#entry{id={Who,Fid}, feed_id=Fid}]) end,
  152. % subscription_mq(group, remove, UserName, GroupName),
  153. {noreply, State};
  154. handle_notice(_Route, _Message, State) ->
  155. % error_logger:info_msg("Unknown GROUP notice"),
  156. {noreply, State}.
  157. build_group_relations(Group) -> [
  158. mqs:key( [kvs_group, create] ),
  159. mqs:key( [kvs_group, update, Group] ),
  160. mqs:key( [kvs_group, remove, Group] ),
  161. mqs:key( [kvs_group, join, Group] ),
  162. mqs:key( [kvs_group, leave, Group] ),
  163. mqs:key( [kvs_group, like, Group]), % for comet mostly
  164. mqs:key( [kvs_feed, delete, Group] ),
  165. mqs:key( [kvs_feed, group, Group, '*', '*', '*'] )
  166. ].
  167. rk_group_feed(Group) -> mqs_lib:list_to_key([kvs_feed, group, Group, '*', '*', '*']).