|
@@ -3,13 +3,12 @@
|
|
-include_lib("kvs/include/kvs.hrl").
|
|
-include_lib("kvs/include/kvs.hrl").
|
|
-include_lib("kvs/include/products.hrl").
|
|
-include_lib("kvs/include/products.hrl").
|
|
-include_lib("kvs/include/purchases.hrl").
|
|
-include_lib("kvs/include/purchases.hrl").
|
|
--include_lib("kvs/include/users.hrl").
|
|
|
|
|
|
+-include_lib("kvs/include/user.hrl").
|
|
-include_lib("kvs/include/groups.hrl").
|
|
-include_lib("kvs/include/groups.hrl").
|
|
-include_lib("kvs/include/feeds.hrl").
|
|
-include_lib("kvs/include/feeds.hrl").
|
|
-include_lib("kvs/include/accounts.hrl").
|
|
-include_lib("kvs/include/accounts.hrl").
|
|
-include_lib("kvs/include/config.hrl").
|
|
-include_lib("kvs/include/config.hrl").
|
|
-include_lib("kvs/include/feed_state.hrl").
|
|
-include_lib("kvs/include/feed_state.hrl").
|
|
--include_lib("mqs/include/mqs.hrl").
|
|
|
|
-compile(export_all).
|
|
-compile(export_all).
|
|
|
|
|
|
init(Backend) ->
|
|
init(Backend) ->
|
|
@@ -26,31 +25,6 @@ delete(Name) ->
|
|
{ok, Product};
|
|
{ok, Product};
|
|
E -> E end.
|
|
E -> E end.
|
|
|
|
|
|
-subscription_mq(Type, Action, Who, Whom) ->
|
|
|
|
- case mqs:open([]) of
|
|
|
|
- {ok,Channel} ->
|
|
|
|
- case {Type,Action} of
|
|
|
|
- {user,add}->
|
|
|
|
- mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom));
|
|
|
|
- {user,remove} ->
|
|
|
|
- mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom)) end,
|
|
|
|
- mqs_channel:close(Channel);
|
|
|
|
- {error,Reason} -> error_logger:info_msg("subscription_mq error: ~p",[Reason]) end.
|
|
|
|
-
|
|
|
|
-init_mq(Product=#product{}) ->
|
|
|
|
- Groups = kvs_group:participate(Product),
|
|
|
|
- ProductExchange = ?USER_EXCHANGE(Product#product.id),
|
|
|
|
- ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
|
|
|
|
- case mqs:open([]) of
|
|
|
|
- {ok, Channel} ->
|
|
|
|
- error_logger:info_msg("Cration Exchange: ~p,",[{Channel,ProductExchange,ExchangeOptions}]),
|
|
|
|
- mqs_channel:create_exchange(Channel, ProductExchange, ExchangeOptions),
|
|
|
|
- Relations = build_product_relations(Product, Groups),
|
|
|
|
- [ mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Product#product.id), ?NOTIFICATIONS_EX, Route)
|
|
|
|
- || Route <- Relations],
|
|
|
|
- mqs_channel:close(Channel);
|
|
|
|
- {error,Reason} -> error_logger:info_msg("init_mq error: ~p",[Reason]) end.
|
|
|
|
-
|
|
|
|
build_product_relations(Product, Groups) -> [
|
|
build_product_relations(Product, Groups) -> [
|
|
mqs:key( [kvs_product, '*', Product]),
|
|
mqs:key( [kvs_product, '*', Product]),
|
|
mqs:key( [kvs_feed, product, Product, '*', '*', '*']),
|
|
mqs:key( [kvs_feed, product, Product, '*', '*', '*']),
|
|
@@ -86,14 +60,14 @@ handle_notice(["kvs_product", "subscribe", Who],
|
|
Message, #state{owner = _Owner, type =_Type} = State) ->
|
|
Message, #state{owner = _Owner, type =_Type} = State) ->
|
|
{Whom} = Message,
|
|
{Whom} = Message,
|
|
kvs_product:subscribe(Who, Whom),
|
|
kvs_product:subscribe(Who, Whom),
|
|
- subscription_mq(user, add, Who, Whom),
|
|
|
|
|
|
+% subscription_mq(user, add, Who, Whom),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
handle_notice(["kvs_product", "unsubscribe", Who],
|
|
handle_notice(["kvs_product", "unsubscribe", Who],
|
|
Message, #state{owner = _Owner, type =_Type} = State) ->
|
|
Message, #state{owner = _Owner, type =_Type} = State) ->
|
|
{Whom} = Message,
|
|
{Whom} = Message,
|
|
kvs_product:unsubscribe(Who, Whom),
|
|
kvs_product:unsubscribe(Who, Whom),
|
|
- subscription_mq(user, remove, Who, Whom),
|
|
|
|
|
|
+% subscription_mq(user, remove, Who, Whom),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
handle_notice([kvs_product, Owner, add],
|
|
handle_notice([kvs_product, Owner, add],
|