|
@@ -28,25 +28,28 @@ delete(Name) ->
|
|
|
|
|
|
subscription_mq(Type, Action, Who, Whom) ->
|
|
subscription_mq(Type, Action, Who, Whom) ->
|
|
case mqs:open([]) of
|
|
case mqs:open([]) of
|
|
- {ok,Channel} ->
|
|
+ {ok,Channel} ->
|
|
- case {Type,Action} of
|
|
+ case {Type,Action} of
|
|
- {user,add} -> mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom));
|
|
+ {user,add}->
|
|
- {user,remove} -> mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom)) end,
|
|
+ mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom));
|
|
- mqs_channel:close(Channel);
|
|
+ {user,remove} ->
|
|
- {error,Reason} -> error_logger:info_msg("subscription_mq error: ~p",[Reason]) end.
|
|
+ mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom)) end,
|
|
|
|
+ mqs_channel:close(Channel);
|
|
|
|
+ {error,Reason} -> error_logger:info_msg("subscription_mq error: ~p",[Reason]) end.
|
|
|
|
|
|
init_mq(Product=#product{}) ->
|
|
init_mq(Product=#product{}) ->
|
|
Groups = kvs_group:participate(Product),
|
|
Groups = kvs_group:participate(Product),
|
|
ProductExchange = ?USER_EXCHANGE(Product#product.id),
|
|
ProductExchange = ?USER_EXCHANGE(Product#product.id),
|
|
ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
|
|
ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
|
|
case mqs:open([]) of
|
|
case mqs:open([]) of
|
|
- {ok, Channel} ->
|
|
+ {ok, Channel} ->
|
|
- error_logger:info_msg("Cration Exchange: ~p,",[{Channel,ProductExchange,ExchangeOptions}]),
|
|
+ error_logger:info_msg("Cration Exchange: ~p,",[{Channel,ProductExchange,ExchangeOptions}]),
|
|
- mqs_channel:create_exchange(Channel, ProductExchange, ExchangeOptions),
|
|
+ mqs_channel:create_exchange(Channel, ProductExchange, ExchangeOptions),
|
|
- Relations = build_product_relations(Product, Groups),
|
|
+ Relations = build_product_relations(Product, Groups),
|
|
- [ mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Product#product.id), ?NOTIFICATIONS_EX, Route) || Route <- Relations],
|
|
+ [ mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Product#product.id), ?NOTIFICATIONS_EX, Route)
|
|
- mqs_channel:close(Channel);
|
|
+ || Route <- Relations],
|
|
- {error,Reason} -> error_logger:info_msg("init_mq error: ~p",[Reason]) end.
|
|
+ mqs_channel:close(Channel);
|
|
|
|
+ {error,Reason} -> error_logger:info_msg("init_mq error: ~p",[Reason]) end.
|
|
|
|
|
|
build_product_relations(Product, Groups) -> [
|
|
build_product_relations(Product, Groups) -> [
|
|
mqs:key( [kvs_product, '*', Product]),
|
|
mqs:key( [kvs_product, '*', Product]),
|
|
@@ -99,6 +102,51 @@ handle_notice(["kvs_product", "update", _Who],
|
|
kvs:put(NewProduct),
|
|
kvs:put(NewProduct),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
|
|
+handle_notice([kvs_product, Owner, update],
|
|
|
|
+ [#product{}=Product, Recipients, Is, Fs], #state{owner=Owner} = State) ->
|
|
|
|
+ error_logger:info_msg("[kvs_product] Update product ~p", [Owner]),
|
|
|
|
+ case kvs:get(product, Owner) of {error,E}->
|
|
|
|
+ msg:notify([kvs_product, product, Owner, updated], [{error,E}, Is, Fs]);
|
|
|
|
+ {ok, #product{}=P} ->
|
|
|
|
+ Id = P#product.id,
|
|
|
|
+ UpdProduct = P#product{
|
|
|
|
+ title = Product#product.title,
|
|
|
|
+ brief = Product#product.brief,
|
|
|
|
+ cover = Product#product.cover,
|
|
|
|
+ price = Product#product.price,
|
|
|
|
+ currency = Product#product.currency},
|
|
|
|
+ kvs:put(UpdProduct),
|
|
|
|
+
|
|
|
|
+ Medias = case Product#product.cover of undefined -> [];
|
|
|
|
+ File -> [#media{url=File,
|
|
|
|
+ thumbnail_url = filename:join([filename:dirname(File),"thumbnail",filename:basename(File)]) }] end,
|
|
|
|
+
|
|
|
|
+ Entry = #entry{
|
|
|
|
+ created=P#product.created,
|
|
|
|
+ entry_id=Id,
|
|
|
|
+ from=P#product.owner,
|
|
|
|
+ type= product,
|
|
|
|
+ media=Medias,
|
|
|
|
+ title=Product#product.title,
|
|
|
|
+ description=Product#product.brief,
|
|
|
|
+ shared=""},
|
|
|
|
+
|
|
|
|
+ Groups = ordsets:from_list([Gid || {Type,Gid} <- Recipients, Type==group]),
|
|
|
|
+ Participate = ordsets:from_list([Gid || #group_subscription{where=Gid} <- kvs_group:participate(Id)]),
|
|
|
|
+ Intersection = ordsets:intersection(Groups, Participate),
|
|
|
|
+ Leave = ordsets:subtract(Participate, Intersection),
|
|
|
|
+ Join = ordsets:subtract(Groups, Intersection),
|
|
|
|
+
|
|
|
|
+ [msg:notify([kvs_group, leave, Gid], [{products, Id}]) || Gid <- Leave],
|
|
|
|
+ [msg:notify([kvs_group, join, Gid], [{products, Id}, Entry]) || Gid <- Join],
|
|
|
|
+
|
|
|
|
+ [msg:notify([kvs_feed, RouteType, To, entry, {Id, products}, edit], [Entry])
|
|
|
|
+ || {RouteType, To} <- [{user, P#product.owner} | [{group, G} || G <- Intersection]]],
|
|
|
|
+ msg:notify([kvs_product, product, Owner, updated], [UpdProduct, Is])
|
|
|
|
+ end,
|
|
|
|
+ {noreply, State};
|
|
|
|
+
|
|
|
|
+
|
|
handle_notice(_Route, _Message, State) ->
|
|
handle_notice(_Route, _Message, State) ->
|
|
%error_logger:info_msg("Unknown USERS notice"),
|
|
%error_logger:info_msg("Unknown USERS notice"),
|
|
{noreply, State}.
|
|
{noreply, State}.
|