kvs_group.erl 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. -module(kvs_group).
  2. -compile(export_all).
  3. -include_lib("kvs/include/users.hrl").
  4. -include_lib("kvs/include/groups.hrl").
  5. -include_lib("kvs/include/accounts.hrl").
  6. -include_lib("kvs/include/log.hrl").
  7. -include_lib("kvs/include/feed_state.hrl").
  8. -include_lib("mqs/include/mqs.hrl").
  9. retrieve_groups(User) ->
  10. ?INFO("retrieve group for user: ~p",[User]),
  11. case participate(User) of
  12. [] -> [];
  13. Gs -> UC_GId = lists:sublist(lists:reverse(
  14. lists:sort([{members_count(GId), GId} || GId <- Gs])),
  15. 20),
  16. Result = [begin case kvs:get(group,GId) of
  17. {ok, Group} -> {Group#group.name,GId,UC};
  18. _ -> undefined end end || {UC, GId} <- UC_GId],
  19. [X||X<-Result,X/=undefined] end.
  20. create(Creator, GroupName, GroupFullName, Desc, Publicity) ->
  21. Feed = kvs_feed:create(),
  22. Time = erlang:now(),
  23. Group = #group{username = GroupName, name = GroupFullName, description = Desc, publicity = Publicity,
  24. creator = Creator, created = Time, owner = Creator, feed = Feed},
  25. kvs:put(Group),
  26. init_mq(Group),
  27. mqs:notify([group, init], {GroupName, Feed}),
  28. add(Creator, GroupName, member),
  29. GroupName.
  30. delete(GroupName) ->
  31. case kvs:get(group,GroupName) of
  32. {error,_} -> ok;
  33. {ok, Group} ->
  34. mqs:notify([feed, delete, GroupName], empty),
  35. kvs:delete_by_index(group_subscription, <<"where_bin">>, GroupName),
  36. kvs:delete(feed, Group#group.feed),
  37. kvs:delete(group, GroupName),
  38. case mqs:open([]) of
  39. {ok, Channel} ->
  40. Routes = kvs_users:rk_group_feed(GroupName),
  41. kvs_users:unbind_group_exchange(Channel, GroupName, Routes),
  42. mqs_channel:close(Channel);
  43. {error,Reason} -> ?ERROR("delete group failed: ~p",[Reason]) end end.
  44. participate(UserName) -> [GroupName || #group_subscription{group_id=GroupName} <- kvs:all_by_index(group_subscription, <<"who_bin">>, UserName) ].
  45. members(GroupName) -> [UserName || #group_subscription{user_id=UserName} <- kvs:all_by_index(group_subscription, <<"where_bin">>, GroupName) ].
  46. members_by_type(GroupName, Type) -> [UserName || #group_subscription{user_id=UserName, user_type=T} <- kvs:all_by_index(group_subscription, <<"where_bin">>, GroupName), T == Type ].
  47. members_with_types(GroupName) -> [{UserName, Type} || #group_subscription{user_id=UserName, user_type=Type} <- kvs:all_by_index(group_subscriptioin, <<"where_bin">>, list_to_binary(GroupName)) ].
  48. owner(UserName, GroupName) ->
  49. case kvs:get(group, GroupName) of
  50. {ok,Group} -> case Group#group.owner of UserName -> true; _ -> false end;
  51. _ -> false end.
  52. member(UserName, GroupName) ->
  53. case kvs:get(group_subscription, {UserName, GroupName}) of
  54. {error, _} -> false;
  55. {ok,_} -> true end.
  56. member_type(UserName, GroupName) ->
  57. case kvs:get(group_subs, {UserName, GroupName}) of
  58. {error, notfound} -> not_in_group;
  59. {ok, #group_subscription{user_type=Type}} -> Type end.
  60. add(UserName, GroupName, Type) ->
  61. kvs:put(#group_subscription{key={UserName,GroupName},user_id=UserName, group_id=GroupName, user_type=Type}),
  62. {ok, Group} = kvs:get(group, GroupName),
  63. Users = Group#group.users_count,
  64. kvs:put(Group#group{users_count = Users + 1}).
  65. join(UserName,GroupName) ->
  66. case kvs:get(group,GroupName) of
  67. {ok, #group{username = GroupName, publicity = public}} ->
  68. add(UserName, GroupName, member),
  69. {ok, joined};
  70. {ok, #group{username = GroupName}} ->
  71. case member_type(UserName, GroupName) of
  72. member -> {ok, joined};
  73. req -> {error, already_sent};
  74. req_rejected -> {error, request_rejected};
  75. not_in_group -> add(UserName, GroupName, req), {ok, requested};
  76. _ -> {error, unknown_type} end;
  77. Error -> Error end.
  78. leave(UserName,GroupName) ->
  79. kvs:delete(group_subscription, {UserName, GroupName}),
  80. case kvs:get(group, GroupName) of
  81. {error,_} -> ?ERROR("Remove ~p from group failed reading group ~p", [UserName, GroupName]);
  82. {ok,Group} -> kvs:put(Group#group{users_count = Group#group.users_count - 1}) end.
  83. approve_request(UserName, GroupName) -> add(UserName, GroupName, member).
  84. reject_request(UserName, GroupName) -> add(UserName, GroupName, req_rejected).
  85. member_type(UserName, GroupName, Type) -> mqs:notify(["subscription", "user", GroupName, "add_to_group"], {GroupName, UserName, Type}).
  86. exists(GroupName) -> case kvs:get(group,GroupName) of {ok,_} -> true; _ -> false end.
  87. coalesce(undefined, B) -> B;
  88. coalesce(A, _) -> A.
  89. publicity(GroupName) ->
  90. case kvs:get(group,GroupName) of
  91. {error,_} -> no_such_group;
  92. {ok,Group} -> Group#group.publicity end.
  93. members_count(GroupName) ->
  94. case kvs:get(group,GroupName) of
  95. {error,_} -> no_such_group;
  96. {ok,Group} -> Group#group.users_count end.
  97. user_has_access(UserName, GroupName) ->
  98. Type = member_type(UserName, GroupName),
  99. case kvs:get(group, GroupName) of
  100. {error,_} -> false;
  101. {ok,Group} ->
  102. Publicity = Group#group.publicity,
  103. case {Publicity, Type} of
  104. {public, _} -> true;
  105. {private, member} -> true;
  106. _ -> false end end.
  107. handle_notice(["kvs_group", "create"] = Route,
  108. Message, #state{owner = Owner, type =Type} = State) ->
  109. ?INFO("queue_action(~p): create_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  110. {Creator, GroupName, FullName, Desc, Publicity} = Message,
  111. create(Creator, GroupName, FullName, Desc, Publicity),
  112. {noreply, State};
  113. handle_notice(["kvs_group", "update", GroupName] = Route,
  114. Message, #state{owner=ThisGroupOwner, type=Type} = State) ->
  115. ?INFO("queue_action(~p): update_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, ThisGroupOwner}, Route, Message]),
  116. {_UId, _GroupUsername, Name, Description, Owner, Publicity} = Message,
  117. case kvs:get(group, GroupName) of
  118. {ok, Group} ->
  119. NewGroup = Group#group{name = coalesce(Name,Group#group.name),
  120. description = coalesce(Description,Group#group.description),
  121. publicity = coalesce(Publicity,Group#group.publicity),
  122. owner = coalesce(Owner,Group#group.owner)},
  123. kvs:put(NewGroup);
  124. {error,Reason} -> ?ERROR("Cannot update group ~p",[Reason]) end,
  125. {noreply, State};
  126. handle_notice(["kvs_group", "remove", GroupName] = Route,
  127. Message, #state{owner = Owner, type = Type} = State) ->
  128. ?INFO("queue_action(~p): remove_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  129. delete(GroupName),
  130. {noreply, State};
  131. handle_notice(["kvs_group", "join", GroupName] = Route,
  132. Message, #state{owner = Owner, type =Type} = State) ->
  133. {GroupName, UserName, Type} = Message,
  134. join(UserName, GroupName),
  135. subscription_mq(group, add, UserName, GroupName),
  136. {noreply, State};
  137. handle_notice(["kvs_group", "leave", GroupName] = Route,
  138. Message, #state{owner = Owner, type =Type} = State) ->
  139. ?INFO("queue_action(~p): remove_from_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
  140. {UserName} = Message,
  141. leave(UserName,GroupName),
  142. subscription_mq(group, remove, UserName, GroupName),
  143. {noreply, State};
  144. handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown GROUP notice").
  145. build_group_relations(Group) -> [
  146. msq:key( [kvs_group, create] ),
  147. msq:key( [kvs_group, update, Group] ),
  148. msq:key( [kvs_group, remove, Group] ),
  149. msq:key( [kvs_group, join, Group] ),
  150. msq:key( [kvs_group, leave, Group] ),
  151. msq:key( [kvs_group, like, Group]), % for comet mostly
  152. msq:key( [kvs_feed, delete, Group] ),
  153. msq:key( [kvs_feed, group, Group, '*', '*', '*'] )
  154. ].
  155. init_mq(Group=#group{}) ->
  156. GroupExchange = ?GROUP_EXCHANGE(Group#group.username),
  157. ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
  158. case mqs:open([]) of
  159. {ok, Channel} ->
  160. mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
  161. Relations = build_group_relations(Group),
  162. [bind_group_exchange(Channel, Group, RK) || RK <- Relations],
  163. mqs_channel:close(Channel);
  164. {error, Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
  165. rk_group_feed(Group) -> mqs_lib:list_to_key([feed, group, Group, '*', '*', '*']).
  166. bind_group_exchange(Channel, Group, Route) -> {bind, Route, mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, Route)}.
  167. unbind_group_exchange(Channel, Group, Route) -> {unbind, Route, mqs_channel:unbind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, Route)}.
  168. subscription_mq(Type, Action, MeId, ToId) ->
  169. case mqs:open([]) of
  170. {ok,Channel} ->
  171. case {Type,Action} of
  172. {group,add} -> bind_group_exchange(Channel, MeId, rk_group_feed(ToId));
  173. {group,remove} -> unbind_group_exchange(Channel, MeId, rk_group_feed(ToId)) end,
  174. mqs_channel:close(Channel);
  175. {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.