%% kvs_product.erl
  1. -module(kvs_product).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -include_lib("kvs/include/products.hrl").
  4. -include_lib("kvs/include/users.hrl").
  5. -include_lib("kvs/include/groups.hrl").
  6. -include_lib("kvs/include/feeds.hrl").
  7. -include_lib("kvs/include/accounts.hrl").
  8. -include_lib("kvs/include/log.hrl").
  9. -include_lib("kvs/include/config.hrl").
  10. -include_lib("kvs/include/feed_state.hrl").
  11. -include_lib("mqs/include/mqs.hrl").
  12. -compile(export_all).
  %% @doc Deletes the product stored under `Name'.
  %%
  %% The product is looked up first. When it exists, kvs_group:leave/2 is
  %% called for every group id returned by kvs_group:participate(Name), the
  %% product entry is deleted, and `{ok, Product}' is returned with the record
  %% as it was read. Any other result of kvs:get/2 is returned unchanged.
  delete(Name) ->
      case kvs:get(product, Name) of
          {ok, Found} ->
              GroupIds = kvs_group:participate(Name),
              lists:foreach(
                  fun(GroupId) -> kvs_group:leave(Name, GroupId) end,
                  GroupIds
              ),
              kvs:delete(product, Name),
              {ok, Found};
          Other ->
              Other
      end.
  %% @doc Adds or removes an exchange binding for a subscription.
  %%
  %% Opens a channel with mqs:open/1 and, depending on `{Type, Action}':
  %%   {user, add}    - calls mqs_channel:bind_exchange/4
  %%   {user, remove} - calls mqs_channel:unbind_exchange/4
  %% in both cases with ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX and the routing
  %% key rk_product_feed(Whom).
  %%
  %% Returns the result of mqs_channel:close/1 when a channel was opened, or
  %% the result of the ?ERROR log call when mqs:open/1 returned an error.
  %% Any other `{Type, Action}' raises `{case_clause, {Type, Action}}'.
  subscription_mq(Type, Action, Who, Whom) ->
      case mqs:open([]) of
          {ok, Channel} ->
              case {Type, Action} of
                  {user, add} ->
                      mqs_channel:bind_exchange(
                          Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom));
                  {user, remove} ->
                      mqs_channel:unbind_exchange(
                          Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom));
                  Unsupported ->
                      %% Close the channel before failing so that an unsupported
                      %% request does not leave it open; the error reason is the
                      %% same one the bare case expression would have raised.
                      mqs_channel:close(Channel),
                      erlang:error({case_clause, Unsupported})
              end,
              mqs_channel:close(Channel);
          {error, Reason} ->
              ?ERROR("subscription_mq error: ~p", [Reason])
      end.
  %% @doc Creates the message exchange for a product and binds its routes.
  %%
  %% Steps, in order:
  %%   1. fetch the groups via kvs_group:participate/1;
  %%   2. open a channel with mqs:open/1;
  %%   3. create a durable, non-auto-delete fanout exchange named
  %%      ?USER_EXCHANGE(Product#product.id);
  %%   4. call mqs_channel:bind_exchange/4 with that exchange and
  %%      ?NOTIFICATIONS_EX once per routing key from build_product_relations/2;
  %%   5. close the channel.
  %%
  %% Returns the result of mqs_channel:close/1, or the result of the ?ERROR
  %% log call when the channel could not be opened.
  %%
  %% NOTE(review): the whole #product{} record is passed to
  %% kvs_group:participate/1 and to build_product_relations/2, whereas
  %% delete/1 passes the product key to kvs_group:participate/1 - confirm
  %% which form those functions expect.
  init_mq(#product{id = ProductId} = Product) ->
      Groups = kvs_group:participate(Product),
      Exchange = ?USER_EXCHANGE(ProductId),
      Options = [{type, <<"fanout">>}, durable, {auto_delete, false}],
      case mqs:open([]) of
          {error, Reason} ->
              ?ERROR("init_mq error: ~p", [Reason]);
          {ok, Channel} ->
              ?INFO("Cration Exchange: ~p,", [{Channel, Exchange, Options}]),
              mqs_channel:create_exchange(Channel, Exchange, Options),
              RoutingKeys = build_product_relations(Product, Groups),
              lists:foreach(
                  fun(RoutingKey) ->
                      mqs_channel:bind_exchange(
                          Channel, Exchange, ?NOTIFICATIONS_EX, RoutingKey)
                  end,
                  RoutingKeys
              ),
              mqs_channel:close(Channel)
      end.
  %% @doc Builds the list of routing keys for a product.
  %%
  %% The result starts with seven keys built by mqs:key/1 that mention
  %% `Product' (kvs_product, two kvs_feed forms, kvs_payment, kvs_account,
  %% kvs_meeting and kvs_purchase), followed by one kvs_feed group key for
  %% each element of `Groups', in the order given.
  build_product_relations(Product, Groups) ->
      ProductKeys = [
          mqs:key([kvs_product, '*', Product]),
          mqs:key([kvs_feed, product, Product, '*', '*', '*']),
          mqs:key([kvs_feed, product, Product, '*']),
          mqs:key([kvs_payment, product, Product, '*']),
          mqs:key([kvs_account, product, Product, '*']),
          mqs:key([kvs_meeting, product, Product, '*']),
          mqs:key([kvs_purchase, product, Product, '*'])
      ],
      GroupKeys = [mqs:key([kvs_feed, group, Group, '*', '*', '*']) || Group <- Groups],
      ProductKeys ++ GroupKeys.
  %% @doc Returns the routing key, built by mqs:key/1, for the kvs_feed
  %% product entry of `Product' with the three trailing segments wildcarded.
  rk_product_feed(Product) ->
      KeyParts = [kvs_feed, product, Product, '*', '*', '*'],
      mqs:key(KeyParts).
  %% @doc Returns at most ten `{Who, Paid, RealName}' tuples for the
  %% connections of `Id'.
  %%
  %% When `Type' is `user' the candidate ids come from
  %% kvs_product:list_subscr_usernames/1; for any other `Type' they come from
  %% kvs_group:list_group_members/1. Only the first ten candidates are
  %% considered. A candidate for which kvs:get(user, Who) does not return
  %% `{ok, _}' is skipped. An empty candidate list yields `[]'.
  retrieve_connections(Id, Type) ->
      Members =
          case Type of
              user -> kvs_product:list_subscr_usernames(Id);
              _ -> kvs_group:list_group_members(Id)
          end,
      %% The `{ok, User} <- [...]' generator drops candidates whose lookup
      %% fails, so no sentinel values or second filtering pass are needed.
      [begin
           RealName = kvs_product:user_realname_user(User),
           Paid = kvs_payment:user_paid(Who),
           {Who, Paid, RealName}
       end
       || Who <- lists:sublist(Members, 10),
          {ok, User} <- [kvs:get(user, Who)]].
  %% @doc Handles a notice addressed to this module.
  %%
  %% Recognised routes (the state must be a #state{} record):
  %%   ["kvs_product", "subscribe", Who]   - `Message' is `{Whom}'; calls
  %%       kvs_product:subscribe/2 and then adds the exchange binding.
  %%   ["kvs_product", "unsubscribe", Who] - `Message' is `{Whom}'; calls
  %%       kvs_product:unsubscribe/2 and then removes the exchange binding.
  %%   ["kvs_product", "update", _]        - `Message' is `{NewProduct}';
  %%       stores it with kvs:put/1.
  %% Every clause returns `{noreply, State}' with the state unchanged.
  handle_notice(["kvs_product", "subscribe", Who], Message, #state{} = State) ->
      {Whom} = Message,
      kvs_product:subscribe(Who, Whom),
      subscription_mq(user, add, Who, Whom),
      {noreply, State};
  handle_notice(["kvs_product", "unsubscribe", Who], Message, #state{} = State) ->
      {Whom} = Message,
      kvs_product:unsubscribe(Who, Whom),
      subscription_mq(user, remove, Who, Whom),
      {noreply, State};
  handle_notice(["kvs_product", "update", _Who], Message, #state{} = State) ->
      {UpdatedProduct} = Message,
      kvs:put(UpdatedProduct),
      {noreply, State};
  handle_notice(_Route, _Message, State) ->
      %% Anything that did not match a clause above is ignored without logging.
      {noreply, State}.