|
@@ -98,37 +98,38 @@ update_user(#user{username=UId,name=Name,surname=Surname} = NewUser) ->
|
|
|
end.
|
|
|
|
|
|
subscription_mq(Type, Action, MeId, ToId) ->
|
|
|
- {ok, Channel} = mqs:open([]),
|
|
|
- Routes = case Type of
|
|
|
- user -> rk_user_feed(ToId);
|
|
|
- group -> rk_group_feed(ToId)
|
|
|
- end,
|
|
|
- case Action of
|
|
|
- add -> bind_user_exchange(Channel, MeId, Routes);
|
|
|
- remove -> unbind_user_exchange(Channel, MeId, Routes)
|
|
|
- end,
|
|
|
- mqs_channel:close(Channel).
|
|
|
+ case mqs:open([]) of
|
|
|
+ {ok,Channel} -> Routes = case Type of user -> rk_user_feed(ToId); group -> rk_group_feed(ToId) end,
|
|
|
+ case Action of
|
|
|
+ add -> bind_user_exchange(Channel, MeId, Routes);
|
|
|
+ remove -> unbind_user_exchange(Channel, MeId, Routes) end,
|
|
|
+ mqs_channel:close(Channel);
|
|
|
+ {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.
|
|
|
|
|
|
init_mq(User=#user{}) ->
|
|
|
- Groups = kvs_group:list_groups_per_user(User),
|
|
|
+ Groups = kvs_group:participate(User),
|
|
|
?INFO("~p init mq. users: ~p", [User, Groups]),
|
|
|
UserExchange = ?USER_EXCHANGE(User#user.username),
|
|
|
ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
|
|
|
- {ok, Channel} = mqs:open([]),
|
|
|
- ?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
|
|
|
- mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions), ?INFO("Created OK"),
|
|
|
- Relations = build_user_relations(User, Groups),
|
|
|
- [bind_user_exchange(Channel, User, RK) || RK <- Relations],
|
|
|
- mqs_channel:close(Channel);
|
|
|
+ case mqs:open([]) of
|
|
|
+ {ok, Channel} ->
|
|
|
+ ?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
|
|
|
+ mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions),
|
|
|
+ Relations = build_user_relations(User, Groups),
|
|
|
+ [bind_user_exchange(Channel, User, RK) || RK <- Relations],
|
|
|
+ mqs_channel:close(Channel);
|
|
|
+ {error,Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end;
|
|
|
|
|
|
init_mq(Group=#group{}) ->
|
|
|
GroupExchange = ?GROUP_EXCHANGE(Group#group.username),
|
|
|
ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
|
|
|
- {ok, Channel} = mqs:open([]),
|
|
|
- ok = mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
|
|
|
- Relations = build_group_relations(Group),
|
|
|
- [bind_group_exchange(Channel, Group, RK) || RK <- Relations],
|
|
|
- mqs_channel:close(Channel).
|
|
|
+ case mqs:open([]) of
|
|
|
+ {ok, Channel} ->
|
|
|
+ mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
|
|
|
+ Relations = build_group_relations(Group),
|
|
|
+ [bind_group_exchange(Channel, Group, RK) || RK <- Relations],
|
|
|
+ mqs_channel:close(Channel);
|
|
|
+ {error, Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
|
|
|
|
|
|
build_user_relations(User, Groups) -> [
|
|
|
rk( [db, user, User, put] ),
|