kvs_product.erl 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. -module(kvs_product).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -include_lib("kvs/include/kvs.hrl").
  4. -include_lib("kvs/include/products.hrl").
  5. -include_lib("kvs/include/purchases.hrl").
  6. -include_lib("kvs/include/users.hrl").
  7. -include_lib("kvs/include/groups.hrl").
  8. -include_lib("kvs/include/feeds.hrl").
  9. -include_lib("kvs/include/accounts.hrl").
  10. -include_lib("kvs/include/config.hrl").
  11. -include_lib("kvs/include/feed_state.hrl").
  12. -include_lib("mqs/include/mqs.hrl").
  13. -compile(export_all).
  %% @doc Invokes ?CREATE_TAB for each table name this module declares:
  %% `product', `user_product' and `product_category'. Returns `ok'.
  %%
  %% NOTE(review): `Backend' is not referenced directly in this body.
  %% ?CREATE_TAB comes from an included header and may expand to code that
  %% uses it, so confirm before renaming the argument to `_Backend'.
  init(Backend) ->
      ?CREATE_TAB(product),
      ?CREATE_TAB(user_product),
      ?CREATE_TAB(product_category),
      ok.
  %% @doc Deletes the product stored under `Name'.
  %%
  %% When the lookup succeeds, `kvs_group:leave/2' is called once for every
  %% element returned by `kvs_group:participate(Name)', the product record is
  %% removed, and `{ok, Product}' (the record as it was read) is returned.
  %% Any other lookup result is handed back to the caller unchanged.
  %%
  %% NOTE(review): the elements of `participate/1' are passed to `leave/2'
  %% as-is, whereas handle_notice/3 destructures them as
  %% `#group_subscription{where = Gid}' - confirm which form `leave/2' expects.
  delete(Name) ->
      case kvs:get(product, Name) of
          {ok, Found} ->
              Memberships = kvs_group:participate(Name),
              lists:foreach(fun(Gid) -> kvs_group:leave(Name, Gid) end,
                            Memberships),
              kvs:remove(product, Name),
              {ok, Found};
          Other ->
              Other
      end.
  %% @doc Binds (`add') or unbinds (`remove') the exchange of `Who' to the
  %% notifications exchange, using the product-feed routing key of `Whom'.
  %%
  %% A channel is opened for the operation and closed afterwards; the result
  %% of closing the channel is returned. If the channel cannot be opened the
  %% reason is logged and the logger's result is returned.
  %%
  %% Only `{user, add}' and `{user, remove}' are supported; any other
  %% `{Type, Action}' pair raises `case_clause'.
  %% NOTE(review): in that failure case the channel has already been opened
  %% and is not closed here.
  subscription_mq(Type, Action, Who, Whom) ->
      case mqs:open([]) of
          {error, Reason} ->
              error_logger:info_msg("subscription_mq error: ~p", [Reason]);
          {ok, Channel} ->
              case {Type, Action} of
                  {user, add} ->
                      mqs_channel:bind_exchange(Channel,
                                                ?USER_EXCHANGE(Who),
                                                ?NOTIFICATIONS_EX,
                                                rk_product_feed(Whom));
                  {user, remove} ->
                      mqs_channel:unbind_exchange(Channel,
                                                  ?USER_EXCHANGE(Who),
                                                  ?NOTIFICATIONS_EX,
                                                  rk_product_feed(Whom))
              end,
              mqs_channel:close(Channel)
      end.
  %% @doc Creates the exchange for a product and binds it to the
  %% notifications exchange once per routing key produced by
  %% build_product_relations/2.
  %%
  %% The exchange is declared as a durable, non-auto-deleted fanout exchange.
  %% Returns the result of closing the channel, or the logger's result when
  %% the channel could not be opened.
  %%
  %% NOTE(review): the whole `#product{}' record (not its id) is passed to
  %% `kvs_group:participate/1' and into the routing keys - confirm intended.
  init_mq(#product{id = ProductId} = Product) ->
      Groups = kvs_group:participate(Product),
      ProductExchange = ?USER_EXCHANGE(ProductId),
      ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
      case mqs:open([]) of
          {error, Reason} ->
              error_logger:info_msg("init_mq error: ~p", [Reason]);
          {ok, Channel} ->
              error_logger:info_msg("Cration Exchange: ~p,",
                                    [{Channel, ProductExchange, ExchangeOptions}]),
              mqs_channel:create_exchange(Channel, ProductExchange, ExchangeOptions),
              Routes = build_product_relations(Product, Groups),
              lists:foreach(
                fun(Route) ->
                        mqs_channel:bind_exchange(Channel,
                                                  ?USER_EXCHANGE(Product#product.id),
                                                  ?NOTIFICATIONS_EX,
                                                  Route)
                end,
                Routes),
              mqs_channel:close(Channel)
      end.
  %% @doc Builds the list of routing keys for `Product'.
  %%
  %% The result starts with seven product-scoped keys (kvs_product, two
  %% kvs_feed variants, kvs_payment, kvs_account, kvs_meeting, kvs_purchase),
  %% in that order, followed by one kvs_feed group key per element of `Groups'.
  build_product_relations(Product, Groups) ->
      ProductPaths = [[kvs_product, '*', Product],
                      [kvs_feed, product, Product, '*', '*', '*'],
                      [kvs_feed, product, Product, '*'],
                      [kvs_payment, product, Product, '*'],
                      [kvs_account, product, Product, '*'],
                      [kvs_meeting, product, Product, '*'],
                      [kvs_purchase, product, Product, '*']],
      ProductKeys = [mqs:key(Path) || Path <- ProductPaths],
      GroupKeys = [mqs:key([kvs_feed, group, G, '*', '*', '*']) || G <- Groups],
      ProductKeys ++ GroupKeys.
  %% @doc Routing key for the feed of `Product': the path
  %% `[kvs_feed, product, Product, '*', '*', '*']' passed through `mqs:key/1'.
  rk_product_feed(Product) ->
      Path = [kvs_feed, product, Product, '*', '*', '*'],
      mqs:key(Path).
  %% @doc Returns `{Who, Paid, RealName}' for up to ten connections of `Id'.
  %%
  %% For `Type =:= user' the connections come from
  %% `kvs_product:list_subscr_usernames/1'; for any other type they come from
  %% `kvs_group:list_group_members/1'. Only the first ten names are examined,
  %% and names for which `kvs:get(user, Who)' does not return `{ok, _}' are
  %% left out of the result. An empty connection list yields `[]'.
  retrieve_connections(Id, Type) ->
      Friends = case Type of
                    user -> kvs_product:list_subscr_usernames(Id);
                    _ -> kvs_group:list_group_members(Id)
                end,
      %% The `{ok, User} <- [...]' generator drops failed lookups directly,
      %% so no placeholder value has to be built and filtered afterwards.
      [begin
           RealName = kvs_product:user_realname_user(User),
           Paid = kvs_payment:user_paid(Who),
           {Who, Paid, RealName}
       end
       || Who <- lists:sublist(Friends, 10),
          {ok, User} <- [kvs:get(user, Who)]].
  %% @doc Dispatches a routed notice to the matching product operation.
  %% Every clause returns `{noreply, State}' with the state unchanged.
  %%
  %% Handled routes:
  %%   ["kvs_product", "subscribe", Who]   - subscribe Who to Whom and bind MQ
  %%   ["kvs_product", "unsubscribe", Who] - unsubscribe and unbind MQ
  %%   [kvs_product, Owner, create]        - store a product and announce it
  %%   [kvs_product, Owner, update]        - update fields and group membership
  %%   [kvs_product, Owner, delete]        - remove a product and its worker
  %% Anything else is ignored.
  %%
  %% NOTE(review): the subscribe/unsubscribe routes match strings while the
  %% other routes match atoms - confirm both forms are really produced.
  handle_notice(["kvs_product", "subscribe", Who], Message, #state{} = State) ->
      %% A message that is not a 1-tuple is a crash (badmatch), not a skip.
      {Whom} = Message,
      kvs_product:subscribe(Who, Whom),
      subscription_mq(user, add, Who, Whom),
      {noreply, State};
  handle_notice(["kvs_product", "unsubscribe", Who], Message, #state{} = State) ->
      {Whom} = Message,
      kvs_product:unsubscribe(Who, Whom),
      subscription_mq(user, remove, Who, Whom),
      {noreply, State};
  handle_notice([kvs_product, Owner, create],
                [#product{} = Product, Recipients],
                #state{owner = Owner, feeds = Feeds} = State) ->
      error_logger:info_msg("[kvs_product] Create product ~p", [Owner]),
      Created = notice_create(Product, Recipients, Owner, Feeds),
      msg:notify([kvs_product, product, Product#product.id, created], [Created]),
      {noreply, State};
  handle_notice([kvs_product, Owner, update],
                [#product{} = Product, Recipients],
                #state{owner = Owner} = State) ->
      error_logger:info_msg("[kvs_product] Update product ~p", [Owner]),
      notice_update(Owner, Product, Recipients),
      {noreply, State};
  handle_notice([kvs_product, Owner, delete],
                [#product{id = Id} = Product],
                #state{owner = Owner, feeds = Feeds} = State) ->
      error_logger:info_msg("[kvs_product] Delete product ~p ~p", [Id, Owner]),
      Removed = notice_delete(Product, Owner, Feeds),
      msg:notify([kvs_product, product, Id, deleted], [Removed]),
      {noreply, State};
  handle_notice(_Route, _Message, State) ->
      {noreply, State}.

  %% Stores the product, starts its worker, asks every recipient group to
  %% join it and adds an entry to the owner's `products' feed (if present).
  %% Returns the stored record, or `{error, Reason}' from the failing step.
  notice_create(Product, Recipients, Owner, Feeds) ->
      case kvs:add(Product) of
          {error, AddReason} ->
              {error, AddReason};
          {ok, #product{id = Id} = Stored} ->
              Params = [{id, Id},
                        {type, product},
                        {feeds, element(#iterator.feeds, Stored)}],
              case workers_sup:start_child(Params) of
                  {error, StartReason} ->
                      {error, StartReason};
                  _ ->
                      Entry = to_entry(Stored),
                      GroupIds = [Gid || {group, Gid} <- Recipients],
                      lists:foreach(
                        fun(Gid) ->
                                msg:notify([kvs_group, join, Gid],
                                           [{products, Id}, Entry])
                        end,
                        GroupIds),
                      case lists:keyfind(products, 1, Feeds) of
                          false ->
                              skip;
                          {_, Fid} ->
                              msg:notify([kvs_feed, user, Owner, entry, Id, add],
                                         [Entry#entry{feed_id = Fid}])
                      end,
                      Stored
              end
      end.

  %% Copies title, brief, cover, price and currency from `Changes' onto the
  %% stored product, reconciles group membership with `Recipients', and
  %% announces the outcome on the `updated' route.
  notice_update(Owner, Changes, Recipients) ->
      case kvs:get(product, Owner) of
          {error, Reason} ->
              msg:notify([kvs_product, product, Owner, updated], [{error, Reason}]);
          {ok, #product{id = Id} = Stored} ->
              Updated = Stored#product{title = Changes#product.title,
                                       brief = Changes#product.brief,
                                       cover = Changes#product.cover,
                                       price = Changes#product.price,
                                       currency = Changes#product.currency},
              kvs:put(Updated),
              Entry = to_entry(Updated),
              Wanted = ordsets:from_list([Gid || {group, Gid} <- Recipients]),
              Current = ordsets:from_list(
                          [Gid || #group_subscription{where = Gid}
                                      <- kvs_group:participate(Id)]),
              %% Groups in both sets only get an edit notice; the rest are
              %% either left or joined.
              Kept = ordsets:intersection(Wanted, Current),
              ToLeave = ordsets:subtract(Current, Kept),
              ToJoin = ordsets:subtract(Wanted, Kept),
              lists:foreach(
                fun(Gid) ->
                        msg:notify([kvs_group, leave, Gid], [{products, Id}])
                end,
                ToLeave),
              lists:foreach(
                fun(Gid) ->
                        msg:notify([kvs_group, join, Gid], [{products, Id}, Entry])
                end,
                ToJoin),
              %% The owner's feed is notified first, then every kept group.
              msg:notify([kvs_feed, user, Stored#product.owner, entry,
                          {Id, products}, edit],
                         [Entry]),
              lists:foreach(
                fun(G) ->
                        msg:notify([kvs_feed, group, G, entry, {Id, products}, edit],
                                   [Entry])
                end,
                Kept),
              msg:notify([kvs_product, product, Owner, updated], [Updated])
      end.

  %% Removes the product, asks every group it participates in to leave it,
  %% deletes the entry from the owner's `products' feed (if present) and
  %% terminates and deletes the product's worker.
  %% Returns the removed record, or `{error, Reason}' if removal failed.
  notice_delete(#product{id = Id} = Product, Owner, Feeds) ->
      case kvs:remove(Product) of
          {error, Reason} ->
              {error, Reason};
          ok ->
              GroupIds = [Gid || #group_subscription{where = Gid}
                                     <- kvs_group:participate(Id)],
              lists:foreach(
                fun(Gid) ->
                        msg:notify([kvs_group, leave, Gid], [{products, Id}])
                end,
                GroupIds),
              case lists:keyfind(products, 1, Feeds) of
                  false ->
                      skip;
                  {_, Fid} ->
                      msg:notify([kvs_feed, user, Owner, entry, {Id, Fid}, delete], [])
              end,
              supervisor:terminate_child(workers_sup, {product, Id}),
              supervisor:delete_child(workers_sup, {product, Id}),
              Product
      end.
  %% @doc Converts a `#product{}' into a feed `#entry{}' of type `product'.
  %%
  %% The entry takes its id, creation time, author, title and description
  %% from the product. When the product has a cover, the entry carries one
  %% `#media{}' whose thumbnail is the file of the same name inside a
  %% "thumbnail" directory next to the cover; otherwise the media list is
  %% empty. `shared' is always the empty string.
  to_entry(#product{id = Id,
                    created = Created,
                    owner = Owner,
                    cover = Cover,
                    title = Title,
                    brief = Brief}) ->
      Media = case Cover of
                  undefined ->
                      [];
                  _ ->
                      Dir = filename:dirname(Cover),
                      Base = filename:basename(Cover),
                      [#media{url = Cover,
                              thumbnail_url = filename:join([Dir, "thumbnail", Base])}]
              end,
      #entry{entry_id = Id,
             created = Created,
             from = Owner,
             type = product,
             media = Media,
             title = Title,
             description = Brief,
             shared = ""}.