kvs_product.erl 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. -module(kvs_product).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -include_lib("kvs/include/kvs.hrl").
  4. -include_lib("kvs/include/products.hrl").
  5. -include_lib("kvs/include/purchases.hrl").
  6. -include_lib("kvs/include/users.hrl").
  7. -include_lib("kvs/include/groups.hrl").
  8. -include_lib("kvs/include/feeds.hrl").
  9. -include_lib("kvs/include/accounts.hrl").
  10. -include_lib("kvs/include/config.hrl").
  11. -include_lib("kvs/include/feed_state.hrl").
  12. -include_lib("mqs/include/mqs.hrl").
  13. -compile(export_all).
  %% @doc Applies the ?CREATE_TAB macro to the `product', `user_product' and
  %% `product_category' record names, in that order, and returns `ok'.
  %%
  %% NOTE(review): ?CREATE_TAB is defined outside this file (presumably in one
  %% of the included kvs headers); judging by its name it creates one storage
  %% table per record -- confirm against the macro definition.
  %% NOTE(review): `Backend' is not referenced directly in this body; it may be
  %% used inside the macro expansion -- confirm before renaming it to `_Backend'.
  14. init(Backend) ->
  15. ?CREATE_TAB(product),
  16. ?CREATE_TAB(user_product),
  17. ?CREATE_TAB(product_category),
  18. ok.
  19. delete(Name) ->
  20. case kvs:get(product, Name) of
  21. {ok, Product} ->
  22. [kvs_group:leave(Name, Gid) || Gid <- kvs_group:participate(Name)],
  23. kvs:remove(product, Name),
  24. {ok, Product};
  25. E -> E end.
  26. subscription_mq(Type, Action, Who, Whom) ->
  27. case mqs:open([]) of
  28. {ok,Channel} ->
  29. case {Type,Action} of
  30. {user,add} -> mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom));
  31. {user,remove} -> mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom)) end,
  32. mqs_channel:close(Channel);
  33. {error,Reason} -> error_logger:info_msg("subscription_mq error: ~p",[Reason]) end.
  34. init_mq(Product=#product{}) ->
  35. Groups = kvs_group:participate(Product),
  36. ProductExchange = ?USER_EXCHANGE(Product#product.id),
  37. ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
  38. case mqs:open([]) of
  39. {ok, Channel} ->
  40. error_logger:info_msg("Cration Exchange: ~p,",[{Channel,ProductExchange,ExchangeOptions}]),
  41. mqs_channel:create_exchange(Channel, ProductExchange, ExchangeOptions),
  42. Relations = build_product_relations(Product, Groups),
  43. [ mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Product#product.id), ?NOTIFICATIONS_EX, Route) || Route <- Relations],
  44. mqs_channel:close(Channel);
  45. {error,Reason} -> error_logger:info_msg("init_mq error: ~p",[Reason]) end.
  46. build_product_relations(Product, Groups) -> [
  47. mqs:key( [kvs_product, '*', Product]),
  48. mqs:key( [kvs_feed, product, Product, '*', '*', '*']),
  49. mqs:key( [kvs_feed, product, Product, '*'] ),
  50. mqs:key( [kvs_payment, product, Product, '*']),
  51. mqs:key( [kvs_account, product, Product, '*']),
  52. mqs:key( [kvs_meeting, product, Product, '*']),
  53. mqs:key( [kvs_purchase, product, Product, '*']) |
  54. [ mqs:key( [kvs_feed, group, G, '*', '*', '*']) || G <- Groups ]
  55. ].
  56. rk_product_feed(Product) -> mqs:key([kvs_feed, product, Product, '*', '*', '*']).
  57. retrieve_connections(Id,Type) ->
  58. Friends = case Type of
  59. user -> kvs_product:list_subscr_usernames(Id);
  60. _ -> kvs_group:list_group_members(Id) end,
  61. case Friends of
  62. [] -> [];
  63. Full -> Sub = lists:sublist(Full, 10),
  64. case Sub of
  65. [] -> [];
  66. _ ->
  67. Data = [begin
  68. case kvs:get(user,Who) of
  69. {ok,Product} -> RealName = kvs_product:user_realname_user(Product),
  70. Paid = kvs_payment:user_paid(Who),
  71. {Who,Paid,RealName};
  72. _ -> undefined end end || Who <- Sub],
  73. [ X || X <- Data, X/=undefined ] end end.
  74. handle_notice(["kvs_product", "subscribe", Who],
  75. Message, #state{owner = _Owner, type =_Type} = State) ->
  76. {Whom} = Message,
  77. kvs_product:subscribe(Who, Whom),
  78. subscription_mq(user, add, Who, Whom),
  79. {noreply, State};
  80. handle_notice(["kvs_product", "unsubscribe", Who],
  81. Message, #state{owner = _Owner, type =_Type} = State) ->
  82. {Whom} = Message,
  83. kvs_product:unsubscribe(Who, Whom),
  84. subscription_mq(user, remove, Who, Whom),
  85. {noreply, State};
  86. handle_notice(["kvs_product", "update", _Who],
  87. Message, #state{owner = _Owner, type =_Type} = State) ->
  88. {NewProduct} = Message,
  89. kvs:put(NewProduct),
  90. {noreply, State};
  91. handle_notice(_Route, _Message, State) ->
  92. %error_logger:info_msg("Unknown USERS notice"),
  93. {noreply, State}.