kvs_products.erl 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. -module(kvs_products).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -include_lib("kvs/include/products.hrl").
  4. -include_lib("kvs/include/users.hrl").
  5. -include_lib("kvs/include/groups.hrl").
  6. -include_lib("kvs/include/accounts.hrl").
  7. -include_lib("kvs/include/log.hrl").
  8. -include_lib("kvs/include/config.hrl").
  9. -include_lib("kvs/include/feed_state.hrl").
  10. -include_lib("mqs/include/mqs.hrl").
  11. -compile(export_all).
  12. register(#product{} = Registration) ->
  13. Id = kvs:next_id("product", 1),
  14. Product = Registration#product{id = Id,
  15. name = "product"++integer_to_list(Id),
  16. feed = kvs_feed:create(),
  17. blog = kvs_feed:create(),
  18. features = kvs_feed:create(),
  19. specs = kvs_feed:create(),
  20. gallery = kvs_feed:create(),
  21. videos = kvs_feed:create(),
  22. bundles = kvs_feed:create(),
  23. creation_date = erlang:now()
  24. },
  25. error_logger:info_msg("PUT PRODUCT ~p", [Product]),
  26. kvs:put(Product),
  27. % init_mq(Product),
  28. {ok, Product}.
  29. delete(Name) ->
  30. case kvs:get(product, Name) of
  31. {ok, Product} ->
  32. GIds = kvs_group:participate(Name),
  33. [ mqs:notify(["subscription", "product", Name, "remove_from_group"], {GId}) || GId <- GIds ],
  34. F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(Product) ],
  35. [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
  36. [ unsubscribe(FrId, MeId) || {MeId, FrId} <- F2U ],
  37. % kvs:delete(user_status, Name),
  38. kvs:delete(product, Name),
  39. {ok, Product};
  40. E -> E end.
  41. subscribe(Who, Whom) ->
  42. Record = #subscription{key={Who,Whom}, who = Who, whom = Whom},
  43. kvs:put(Record).
  44. unsubscribe(Who, Whom) ->
  45. case subscribed(Who, Whom) of
  46. true -> kvs:delete(subscription, {Who, Whom});
  47. false -> skip end.
  48. subscriptions(undefined)-> [];
  49. subscriptions(#product{name = UId}) -> subscriptions(UId);
  50. subscriptions(UId) -> DBA=?DBA, DBA:subscriptions(UId).
  51. subscribed(Who) -> DBA=?DBA, DBA:subscribed(Who).
  52. subscribed(Who, Whom) ->
  53. case kvs:get(subscription, {Who, Whom}) of
  54. {ok, _} -> true;
  55. _ -> false end.
  56. subscription_mq(Type, Action, Who, Whom) ->
  57. case mqs:open([]) of
  58. {ok,Channel} ->
  59. case {Type,Action} of
  60. {user,add} -> mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom));
  61. {user,remove} -> mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom)) end,
  62. mqs_channel:close(Channel);
  63. {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.
  64. init_mq(Product=#product{}) ->
  65. Groups = kvs_group:participate(Product),
  66. ProductExchange = ?USER_EXCHANGE(Product#product.name),
  67. ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
  68. case mqs:open([]) of
  69. {ok, Channel} ->
  70. ?INFO("Cration Exchange: ~p,",[{Channel,ProductExchange,ExchangeOptions}]),
  71. mqs_channel:create_exchange(Channel, ProductExchange, ExchangeOptions),
  72. Relations = build_user_relations(Product, Groups),
  73. [ mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Product#product.name), ?NOTIFICATIONS_EX, Route) || Route <- Relations],
  74. mqs_channel:close(Channel);
  75. {error,Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
  76. build_user_relations(Product, Groups) -> [
  77. mqs:key( [kvs_product, '*', Product]),
  78. mqs:key( [kvs_feed, product, Product, '*', '*', '*']),
  79. mqs:key( [kvs_feed, product, Product, '*'] ),
  80. mqs:key( [kvs_payment, product, Product, '*']),
  81. mqs:key( [kvs_account, product, Product, '*']),
  82. mqs:key( [kvs_meeting, product, Product, '*']),
  83. mqs:key( [kvs_purchase, product, Product, '*']) |
  84. [ mqs:key( [kvs_feed, group, G, '*', '*', '*']) || G <- Groups ]
  85. ].
  86. rk_product_feed(Product) -> mqs:key([kvs_feed, product, Product, '*', '*', '*']).
  87. retrieve_connections(Id,Type) ->
  88. Friends = case Type of
  89. user -> kvs_product:list_subscr_usernames(Id);
  90. _ -> kvs_group:list_group_members(Id) end,
  91. case Friends of
  92. [] -> [];
  93. Full -> Sub = lists:sublist(Full, 10),
  94. case Sub of
  95. [] -> [];
  96. _ ->
  97. Data = [begin
  98. case kvs:get(user,Who) of
  99. {ok,Product} -> RealName = kvs_product:user_realname_user(Product),
  100. Paid = kvs_payment:user_paid(Who),
  101. {Who,Paid,RealName};
  102. _ -> undefined end end || Who <- Sub],
  103. [ X || X <- Data, X/=undefined ] end end.
  104. handle_notice(["kvs_product", "subscribe", Who],
  105. Message, #state{owner = _Owner, type =_Type} = State) ->
  106. {Whom} = Message,
  107. kvs_product:subscribe(Who, Whom),
  108. subscription_mq(user, add, Who, Whom),
  109. {noreply, State};
  110. handle_notice(["kvs_product", "unsubscribe", Who],
  111. Message, #state{owner = _Owner, type =_Type} = State) ->
  112. {Whom} = Message,
  113. kvs_product:unsubscribe(Who, Whom),
  114. subscription_mq(user, remove, Who, Whom),
  115. {noreply, State};
  116. handle_notice(["kvs_product", "update", _Who],
  117. Message, #state{owner = _Owner, type =_Type} = State) ->
  118. {NewProduct} = Message,
  119. kvs:put(NewProduct),
  120. {noreply, State};
  121. handle_notice(_Route, _Message, State) ->
  122. %error_logger:info_msg("Unknown USERS notice"),
  123. {noreply, State}.