Browse Source

Merge branch 'master' of github.com:synrc/kvs

Maxim Sokhatsky 12 years ago
parent
commit
d48360a0d7

+ 0 - 9
include/attachments.hrl

@@ -1,9 +0,0 @@
--record(uploads, {key, counter}).
--record(attachment, {
-        id, 
-        name,
-        type,
-        file,
-        thumb,
-        owner,
-        data}).

+ 7 - 1
include/feed_state.hrl

@@ -3,8 +3,14 @@
         type :: user | group | system | product,
         feed,
         direct,
+        blog,
+        features,
+        specs,
+        gallery,
+        videos,
+        bundles,
+        callback=feed_server_api,  % tmp field\part of behaviour callback state
         cached_feed,
         cached_direct,
         cached_friends,
         cached_groups }).
-

+ 1 - 0
include/feeds.hrl

@@ -9,6 +9,7 @@
         feed_id,  % are for secondary indexes
         from, % author
         to,
+        title,
         description,
         created, % time
         hidden,

+ 16 - 7
include/products.hrl

@@ -14,20 +14,29 @@
         user_price }).
 
 -record(product, {
-        id                     :: integer(), % auto
+        id,
         ext_id                 :: term(),    % ext
-        name                   :: binary(),  % admin (based on ext)
+        name                   :: string(),  % name
+        display_name           :: binary(),  % admin (based on ext)
         ext_name               :: binary(),  % ext
         vendor_id              :: integer(), % auto
         categories             :: list(integer()), % admin
+        creator,
+        owner,
         feed,
-        description_short      :: binary(),  % admin (based on ext)
-        description_long       :: binary(),  % admin (based on ext)
-        image_small_url        :: binary(),  % admin (based on ext)
-        image_big_url          :: binary(),  % admin (based on ext)
+        blog,
+        features,
+        specs,
+        gallery,
+        videos,
+        bundles,
+        title,
+        brief,
+        cover,
         publish_start_date     :: calendar:date_time(), % admin
         publish_end_date       :: calendar:date_time(), % admin
-        price                  :: integer(), % ext
+        currency,
+        price,
         retailer_price         :: integer(), % ext
         our_price              :: integer(), % auto
         enabled_on_site        :: boolean(), % admin

+ 2 - 0
include/users.hrl

@@ -105,3 +105,5 @@
         }).
 
 -define(USER_EXCHANGE(UserId), list_to_binary("user_exchange."++UserId++".fanout")).
+
+-record(uploads, {key, counter}).

+ 4 - 3
src/feed_server_api.erl

@@ -1,6 +1,7 @@
 -module(feed_server_api).
+-export([handle/3]).
 
 handle([Module|Parameters],Message,State) ->
-    Module = list_to_atom(binary_to_list(term_to_binary(Module))),
-    error_logger:info_msg("handle_notice Route: ~p Message ~p",[[Module|Parameters],Message]),
-    Module:handle_notice([Module|Parameters],Message,State).
+%    error_logger:info_msg("[feed_server_api]handle_notice Route: ~p Message ~p",[[Module|Parameters],Message]),
+    M = if is_atom(Module)->Module; true-> list_to_atom(binary_to_list(term_to_binary(Module))) end,
+    M:handle_notice([M|Parameters],Message,State).

+ 1 - 0
src/kvs.erl

@@ -12,6 +12,7 @@
 -include_lib("kvs/include/log.hrl").
 -include_lib("kvs/include/membership.hrl").
 -include_lib("kvs/include/payments.hrl").
+-include_lib("kvs/include/products.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("kvs/include/feed_state.hrl").
 -compile(export_all).

+ 3 - 2
src/kvs_account.erl

@@ -45,8 +45,9 @@ check_quota(User, Amount) ->
 
 commit_transaction(#transaction{remitter = R, acceptor = A,  currency = Currency, amount = Amount} = TX) ->
     case change_accounts(R, A, Currency, Amount) of
-         ok -> mqs:notify([kvs_account, user, R, transaction], TX),
-               mqs:notify([kvs_account, user, A, transaction], TX);
+         ok -> %mqs:notify([kvs_account, user, R, transaction], TX),
+               %mqs:notify([kvs_account, user, A, transaction], TX);
+                xen;
          Error -> skip end.
 
 check_remitter_balance(RA, Amount) -> ok.

+ 33 - 36
src/kvs_comment.erl

@@ -10,41 +10,38 @@ add(FId, User, EntryId, ParentComment, CommentId, Content, Medias) ->
          _ -> ok end.
 
 add(FId, User, EntryId, ParentComment, CommentId, Content, Medias, _) ->
-    FullId = {CommentId, {EntryId, FId}},
-
-    Prev = case ParentComment of
-        undefined ->
-            {ok, Entry} = kvs:get(entry,{EntryId, FId}),
-            {PrevC, E} = case Entry#entry.comments of
-                        undefined -> {undefined, Entry#entry{comments_rear = FullId}};
-                        Id -> {ok, PrevTop} = kvs:get(comment, Id),
-                              kvs:put(PrevTop#comment{next = FullId}),
-                              {Id, Entry} end,
-            kvs:put(E#entry{comments=FullId}),
-            PrevC;
-        _ ->
-            {ok, Parent} = kvs:get(comment, {{EntryId, FId}, ParentComment}),
-            {PrevC, CC} = case Parent#comment.comments of
-                        undefined -> {undefined, Parent#comment{comments_rear = FullId}};
-                        Id -> {ok, PrevTop} = kvs:get(comment, Id),
-                              kvs:put(PrevTop#comment{next = FullId}),
-                              {Id, Parent} end,
-            kvs:put(CC#comment{comments = FullId}),
-            PrevC end,
-
-    Comment = #comment{id = FullId,
-                       author_id = User,
-                       comment_id = CommentId,
-                       entry_id = EntryId,
-                       content = Content,
-                       media = Medias,
-                       creation_time = now(),
-                       prev = Prev,
-                       next = undefined},
-
-    kvs:put(Comment),
-    {ok, Comment}.
-
+  FullId = {CommentId, {EntryId, FId}},
+
+  Prev = case ParentComment of
+    undefined ->
+      {ok, Entry} = kvs:get(entry,{EntryId, FId}),
+      {PrevC, E} = case Entry#entry.comments of
+        undefined -> {undefined, Entry#entry{comments_rear = FullId}};
+        Id ->  case kvs:get(comment, Id) of {ok, PrevTop} -> kvs:put(PrevTop#comment{next = FullId}); {error, not_found} -> skip end, {Id, Entry} end,
+      kvs:put(E#entry{comments=FullId}),
+      PrevC;
+    P ->
+      case kvs:get(comment, {P, {EntryId, FId}}) of
+        {ok, Parent} ->
+          {PrevC, CC} = case Parent#comment.comments of
+            undefined -> {undefined, Parent#comment{comments_rear = FullId}};
+            Id -> {ok, PrevTop} = kvs:get(comment, Id), kvs:put(PrevTop#comment{next = FullId}), {Id, Parent} end,
+          kvs:put(CC#comment{comments = FullId}),
+          PrevC;
+        {error, not_found} -> undefined end end,
+
+  Comment = #comment{id = FullId,
+                     author_id = User,
+                     comment_id = CommentId,
+                     entry_id = EntryId,
+                     content = Content,
+                     media = Medias,
+                     creation_time = now(),
+                     prev = Prev,
+                     next = undefined},
+  error_logger:info_msg("PUT: ~p", [Comment]),
+  kvs:put(Comment),
+  {ok, Comment}.
 
 read_comments(undefined) -> [];
 read_comments([#comment{comments = C} | Rest]) -> [read_comments(C) | read_comments(Rest)];
@@ -60,5 +57,5 @@ author_comments(Who) -> DBA=?DBA,DBA:author_comments(Who).
 
 remove(FId, EId) ->
     AllComments = feed_comments({EId, FId}),
-    [begin kvs:delete(comment, ID) end || #comment{id = ID, media = M} <- AllComments].
+    [begin kvs:delete(comment, ID) end || #comment{id = ID, media = _M} <- AllComments].
 

+ 82 - 117
src/kvs_feed.erl

@@ -17,36 +17,29 @@ create() ->
     ok = kvs:put(#feed{id = FId} ),
     FId.
 
-add_entry(FId, User, To, EntryId,Desc,Medias,Type,SharedBy) ->
-    case kvs:get(entry,{EntryId, FId}) of
-        {ok, _} -> ok;
-        _ -> add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy, dont_check) end.
-
-add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy, _) ->
-    {ok,Feed} = kvs:get(feed, erlang:integer_to_list(FId)),
-
-    Id = {EntryId, FId},
-    Next = undefined,
-    Prev = case Feed#feed.top of
-               undefined -> undefined;
-               X -> case kvs:get(entry, X) of
-                       {ok, TopEntry} -> EditedEntry = TopEntry#entry{next = Id}, kvs:put(EditedEntry), TopEntry#entry.id;
-                       {error, _} -> undefined end end,
-
-    kvs:put(#feed{id = FId, top = {EntryId, FId}}), % update feed top with current
-
-    Entry  = #entry{id = {EntryId, FId}, entry_id = EntryId, feed_id = FId, from = User,
+add_entry(FId, From, To, EntryId, Title, Desc, Medias, Type, SharedBy) ->
+  case kvs:get(feed, FId) of
+    {ok,Feed} ->
+      Id = {EntryId, FId},
+      Next = undefined,
+      Prev = case Feed#feed.top of
+        undefined -> undefined;
+        X -> case kvs:get(entry, X) of
+          {ok, TopEntry} -> EditedEntry = TopEntry#entry{next = Id}, kvs:put(EditedEntry), TopEntry#entry.id;
+          {error, _} -> undefined end end,
+
+      kvs:put(#feed{id = FId, top = {EntryId, FId}}), % update feed top with current
+
+      Entry  = #entry{id = {EntryId, FId}, entry_id = EntryId, feed_id = FId, from = From,
                     to = To, type = Type, media = Medias, created = now(),
-                    description = Desc, shared = SharedBy,
+                    title=Title, description = Desc, shared = SharedBy,
                     next = Next, prev = Prev},
 
-    ModEntry = case catch feedformat:format(Entry) of
-                   {_, Reason} -> ?ERROR("feedformat error: ~p", [Reason]), Entry;
-                   #entry{} = ME -> ME end,
-
-    kvs:put(ModEntry),
-
-    {ok, ModEntry}.
+      kvs:put(Entry),
+      error_logger:info_msg("PUT entry: ~p", [Entry]),
+      {ok, Entry};
+    {error, not_found} -> error_logger:info_msg("Add entry failed. No feed ~p", [FId])
+  end.
 
 entry_traversal(undefined, _) -> [];
 entry_traversal(_, 0) -> [];
@@ -131,7 +124,7 @@ remove_entry(FeedId, EId) ->
             case kvs:get(entry, Next) of {ok, NE} -> kvs:put(NE#entry{prev = Prev});  _ -> ok end,
             case kvs:get(entry, Prev) of {ok, PE} -> kvs:put(PE#entry{next = Next});  _ -> ok end,
             case TopId of {EId, FeedId} -> kvs:put(Feed#feed{top = Prev}); _ -> ok end;
-        {error, _} -> ?INFO("Not found"), ok
+        {error, _} -> error_logger:info_msg("Not found"), ok
     end,
     kvs:delete(entry, {EId, FeedId}).
 
@@ -181,95 +174,81 @@ purge_feed(FeedId) ->
     kvs:put(Feed#feed{top=undefined}).
 
 purge_unverified_feeds() ->
-    [purge_feed(FeedId) || #user{feed=FeedId,status=S,email=E} <- kvs:all(user), E==undefined].
+    [purge_feed(FeedId) || #user{feed=FeedId, email=E} <- kvs:all(user), E==undefined].
 
 %% MQ API
 
-handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add"] = Route, [From|_] = Message, #state{owner = Owner, feed = Feed} = State) ->
-    ?INFO("feed(~p): group message: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
-    [From, _Destinations, Desc, Medias] = Message,
-    kvs_feed:add_entry(Feed, From, [{GroupId, group}], EntryId, Desc, Medias, {group, direct}, ""),
-    case Owner == GroupId of
-        false -> ok;
-        true ->
-            {ok, Group} = kvs:get(group, GroupId),
-            GE = Group#group.entries_count,
-            kvs:put(Group#group{entries_count = GE+1}),
-            {ok, Subs} = kvs:get(group_subscription, {From, GroupId}),
-            SE = Subs#group_subscription.posts_count,
-            kvs:put(Subs#group_subscription{posts_count = SE+1})
-    end,
-    self() ! {feed_refresh,Feed, ?CACHED_ENTRIES},
-    {noreply, State};
-
-
-handle_notice(["kvs_feed", "user", FeedOwner, "entry", EntryId, "add"] = Route,
-    [From|_] = Message, #state{owner = WorkerOwner, feed = Feed, direct = Direct} = State) ->
-    ?INFO("feed(~p): message: Owner=~p, Route=~p, Message=~p", [self(), WorkerOwner, Route, Message]),
-
-    [From, Destinations, Desc, Medias] = Message,
-
-    if
-        %% user added message to own feed
-        FeedOwner == From andalso FeedOwner == WorkerOwner->
-            FilteredDst = [D || {_, group} = D <- Destinations],
-            kvs_feed:add_entry(Feed, From, FilteredDst, EntryId, Desc, Medias, {user, normal},""),
-            self() ! {feed_refresh,Feed, ?CACHED_ENTRIES};
-
-        %% friend added message to public feed
-        FeedOwner == From -> 
-            kvs_feed:add_entry(Feed, From, [], EntryId, Desc, Medias, {user, normal},""),
-            self() ! {feed_refresh,Feed, ?CACHED_ENTRIES};
-
-        %% direct message to worker owner
-        FeedOwner == WorkerOwner -> 
-            kvs_feed:add_entry(Direct, From, [{FeedOwner, user}], EntryId, Desc, Medias, {user,direct}, ""),
-            self() ! {direct_refresh,Direct, ?CACHED_ENTRIES};
-
-        %% user sent direct message to friend, add copy to his direct feed
-        From == WorkerOwner ->
-            kvs_feed:add_entry(Direct, WorkerOwner, Destinations, EntryId, Desc, Medias, {user, direct}, ""),
-            self() ! {direct_refresh,Direct, ?CACHED_ENTRIES};
-
-        true -> ?INFO("not matched case in entry->add")
+handle_notice([kvs_feed, Totype, Toid, entry, EntryId, add],
+              [Fid, From, Title, Desc, Medias, EntryType, _, _, _],
+              #state{owner=Owner, feed=Feed}=State)->
+  if Owner == Toid ->
+    % handle user direct feed
+    error_logger:info_msg("Add: entry ~p worker ~p feed ~p", [EntryId, Owner, Feed]),
+    add_entry(case Totype of product -> Fid; _ -> Feed end, From, {Toid, Totype}, EntryId, Title, Desc, Medias, EntryType, ""),
+    case Totype of
+      group ->
+          {ok, Group} = kvs:get(group, Toid),
+          GE = Group#group.entries_count,
+          kvs:put(Group#group{entries_count = GE+1}),
+          {ok, Subs} = kvs:get(group_subscription, {From, Toid}),
+          SE = Subs#group_subscription.posts_count,
+          kvs:put(Subs#group_subscription{posts_count = SE+1});
+      _ -> skip
     end,
-
-    {noreply, State};
-
-handle_notice(["kvs_feed", "user", _FeedOwner, "entry", EntryId, "add_system"] = Route,
-    [From|_] = Message, #state{owner = WorkerOwner, feed = Feed, direct = _Direct} = State) ->
-    ?INFO("feed(~p): system message: Owner=~p, Route=~p, Message=~p", [self(), WorkerOwner, Route, Message]),
-    [From, _Destinations, Desc, Medias] = Message,
-    kvs_feed:add_entry(Feed, From, [], EntryId, Desc, Medias, {user, system}, ""),
-    {noreply, State};
-
-handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add_system"] = Route,
-              [From|_] = Message, #state{owner = Owner, feed = Feed} = State) ->
-    ?INFO("feed(~p): group system message: Owner=~p, Route=~p, Message=~p",
-          [self(), Owner, Route, Message]),
-    [From, _Destinations, Desc, Medias] = Message,
-    kvs_feed:add_entry(Feed, From, [{GroupId, group}], EntryId, Desc, Medias, {group, system}, ""),
-    {noreply, State};
+    self() ! {feed_refresh, Fid, ?CACHED_ENTRIES};
+    true -> skip end,
+  {noreply, State};
+
+handle_notice([kvs_feed, Totype, Toid, entry, {Eid, Fid}, edit],
+              [_, _, Title, Desc],
+              #state{owner=Owner, feed=Feed}=State) ->
+  if Owner == Toid ->
+    error_logger:info_msg("Edit: worker ~p entry ~p feed ~p" , [Owner, Eid, Fid] ),
+    case kvs:get(entry, {Eid, case Totype of product -> Fid; _ -> Feed end}) of {error, not_found}-> skip; {ok, Entry} -> error_logger:info_msg("ok!"),kvs:put(Entry#entry{title=Title, description=Desc}) end;
+    true -> skip end,
+  {noreply, State};
+
+handle_notice([kvs_feed, Totype, Toid, entry, {Eid,Fid}, delete],
+              [_From|_], #state{owner=Owner, feed=Feed} = State) ->
+  if Owner == Toid ->
+    error_logger:info_msg("Delete: worker ~p entry ~p feed ~p", [Owner, Eid, Fid]),
+    FeedId = case Totype of product -> Fid; _ -> Feed end, %kvs_acl:check_access(From, {feature, admin})
+    kvs_feed:remove_entry(FeedId, Eid),
+    self() ! {feed_refresh, FeedId, ?CACHED_ENTRIES};
+    true-> skip
+  end,
+  {noreply, State};
+
+handle_notice([kvs_feed, entry, {Eid, FeedId}, comment, Cid, add],
+              [From, Parent, Content, Medias, _, _],
+              #state{owner=Owner, feed=Fid} = State) ->
+  if FeedId == Fid ->
+    [begin error_logger:info_msg("Comment: worker ~p entry ~p cid ~p",[Owner, Eid, Cid]),
+      kvs_comment:add(E#entry.feed_id, From, E#entry.entry_id, Parent, Cid, Content, Medias)
+    end || E <- kvs:all_by_index(entry, entry_id, Eid)];
+
+    true -> skip end,
+  {noreply, State};
 
 handle_notice(["feed", "user", UId, "post_note"] = Route,
     Message, #state{owner = Owner, feed = Feed} = State) ->
-     ?INFO("feed(~p): post_note: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
+     error_logger:info_msg("feed(~p): post_note: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
     Note = Message,
     Id = utils:uuid_ex(),
     kvs_feed:add_entry(Feed, UId, [], Id, Note, [], {user, system_note}, ""),
     {noreply, State};
 
-handle_notice(["kvs_feed", _, WhoShares, "entry", NewEntryId, "share"] = Route,
+handle_notice(["kvs_feed", _, WhoShares, "entry", NewEntryId, "share"],
                 #entry{entry_id = _EntryId, description = Desc, media = Medias, to = Destinations,
                 from = From} = E, #state{feed = Feed, type = user} = State) ->
     %% FIXME: sharing is like posting to the wall
-    ?INFO("share: ~p, WhoShares: ~p", [E, WhoShares]),
+    error_logger:info_msg("share: ~p, WhoShares: ~p", [E, WhoShares]),
     kvs_feed:add_entry(Feed, From, Destinations, NewEntryId, Desc, Medias, {user, normal}, WhoShares),
     {noreply, State};
 
 handle_notice(["kvs_feed", "group", _Group, "entry", EntryId, "delete"] = Route,
               Message, #state{owner = Owner, feed = Feed} = State) ->
-    ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p",
+    error_logger:info_msg("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p",
           [self(), Owner, Route, Message]),
     %% all group subscribers shold delete entry from their feeds
     kvs_feed:remove_entry(Feed, EntryId),
@@ -281,12 +260,12 @@ handle_notice(["kvs_feed", _Type, EntryOwner, "entry", EntryId, "delete"] = Rout
     case {EntryOwner, Message} of
         %% owner of the antry has deleted entry, we will delete it too
         {_, [EntryOwner|_]} ->
-            ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
+            error_logger:info_msg("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
             kvs_feed:remove_entry(Feed, EntryId),
             kvs_feed:remove_entry(Direct, EntryId);
         %% we are owner of the entry - delete it
         {Owner, _} ->
-            ?INFO("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
+            error_logger:info_msg("feed(~p): remove entry: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
             kvs_feed:remove_entry(Feed, EntryId),
             kvs_feed:remove_entry(Direct, EntryId);
         %% one of the friends has deleted some entry from his feed. Ignore
@@ -294,23 +273,9 @@ handle_notice(["kvs_feed", _Type, EntryOwner, "entry", EntryId, "delete"] = Rout
     self() ! {feed_refresh, State#state.feed, ?CACHED_ENTRIES},
     {noreply, State};
 
-handle_notice(["kvs_feed", _Type, _EntryOwner, "entry", EntryId, "edit"] = Route,
-              Message, #state{owner = Owner, feed=Feed} = State) ->
-    [NewDescription|_] = Message,
-    ?INFO("feed(~p): edit: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
-    kvs_feed:edit_entry(Feed, EntryId, NewDescription),
-    {noreply, State};
-
-handle_notice(["kvs_feed", _Type, _EntryOwner, "comment", CommentId, "add"] = Route,
-              Message, #state{owner = Owner, feed=Feed} = State) ->
-    [From, EntryId, ParentComment, Content, Medias] = Message,
-    ?INFO("feed(~p): add comment: Owner=~p, Route=~p, Message=~p", [self(), Owner, Route, Message]),
-    kvs_comment:add(Feed, From, EntryId, ParentComment, CommentId, Content, Medias),
-    {noreply, State};
-
 handle_notice(["kvs_feed", "user", UId, "count_entry_in_statistics"] = Route, 
     Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO("queue_action(~p): count_entry_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
+    error_logger:info_msg("queue_action(~p): count_entry_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     case kvs:get(user_etries_count, UId) of
         {ok, UEC} -> 
             kvs:put(UEC#user_etries_count{entries = UEC#user_etries_count.entries+1 }),
@@ -322,7 +287,7 @@ handle_notice(["kvs_feed", "user", UId, "count_entry_in_statistics"] = Route,
 
 handle_notice(["kvs_feed", "user", UId, "count_comment_in_statistics"] = Route, 
     Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO("queue_action(~p): count_comment_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
+    error_logger:info_msg("queue_action(~p): count_comment_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     case kvs:get(user_etries_count, UId) of
         {ok, UEC} -> kvs:put(UEC#user_etries_count{comments = UEC#user_etries_count.comments+1 });
         {error, _} -> kvs:put(#user_etries_count{ user_id = UId, comments = 1 }) end,
@@ -330,10 +295,10 @@ handle_notice(["kvs_feed", "user", UId, "count_comment_in_statistics"] = Route,
 
 handle_notice(["kvs_feed","likes", _, _, "add_like"] = Route,  % _, _ is here beacause of the same message used for comet update
     Message, #state{owner = Owner, type =Type} = State) ->
-    ?INFO("queue_action(~p): add_like: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
+    error_logger:info_msg("queue_action(~p): add_like: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {UId, E} = Message,
     {EId, FId} = E#entry.id,
     kvs_feed:add_like(FId, EId, UId),
     {noreply, State};
 
-handle_notice(Route, Message, State) -> error_logger:error_msg("Unknown FEED notice").
+handle_notice(Route, _Message, State) -> error_logger:error_msg("Unknown FEED notice ~p", [Route]), {noreply, State}.

+ 19 - 16
src/kvs_group.erl

@@ -17,23 +17,24 @@ retrieve_groups(User) ->
                                    _ -> undefined end end || {UC, GId} <- UC_GId],
                [X||X<-Result,X/=undefined] end.
 
-create(Creator, GroupName, GroupFullName, Desc, Publicity) ->
+create(Creator, Id, Name, Desc, Publicity) ->
     Feed = kvs_feed:create(),
     Time = erlang:now(),
-    Group = #group{id = GroupName, name = GroupFullName, description = Desc, scope = Publicity,
+    Group = #group{id = Id, name = Name, description = Desc, scope = Publicity,
                    creator = Creator, created = Time, owner = Creator, feed = Feed},
+    error_logger:info_msg("PUT ~p", [Group]),
     kvs:put(Group),
-    init_mq(Group),
-    mqs:notify([group, init], {GroupName, Feed}),
-    add(Creator, GroupName, member),
-    GroupName.
+%    init_mq(Group),
+%    mqs:notify([group, init], {GroupName, Feed}),
+    add(Creator, Id, member),
+  {ok, Group}.
 
 
 delete(GroupName) ->
     case kvs:get(group,GroupName) of 
         {error,_} -> ok;
         {ok, Group} ->
-            mqs:notify([feed, delete, GroupName], empty),
+%            mqs:notify([feed, delete, GroupName], empty),
             kvs:delete_by_index(group_subscription, <<"where_bin">>, GroupName),
             kvs:delete(feed, Group#group.feed),
             kvs:delete(group, GroupName),
@@ -118,7 +119,7 @@ user_has_access(UserName, GroupName) ->
                 {private, member} -> true;
                 _ -> false end end.
 
-handle_notice(["kvs_group", "create"] = Route, 
+handle_notice([kvs_group, create] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): create_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {Creator, GroupName, FullName, Desc, Publicity} = Message,
@@ -145,22 +146,24 @@ handle_notice(["kvs_group", "remove", GroupName] = Route,
     delete(GroupName),
     {noreply, State};
 
-handle_notice(["kvs_group", "join", GroupName] = Route,
-    Message, #state{owner = Owner, type =Type} = State) ->
-    {GroupName, UserName, Type} = Message,
-    join(UserName, GroupName),
-    subscription_mq(group, add, UserName, GroupName),
-    {noreply, State};
+handle_notice([kvs_group, join, GroupName],
+    {UserName, Type}, #state{type=Type} = State) ->
+  error_logger:info_msg("Join group:  ~p State type:~p", [GroupName, Type]),
+  join(UserName, GroupName),
+%    subscription_mq(group, add, UserName, GroupName),
+  {noreply, State};
 
 handle_notice(["kvs_group", "leave", GroupName] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     ?INFO("queue_action(~p): remove_from_group: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     {UserName} = Message,
     leave(UserName,GroupName),
-    subscription_mq(group, remove, UserName, GroupName),
+%    subscription_mq(group, remove, UserName, GroupName),
     {noreply, State};
 
-handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown GROUP notice").
+handle_notice(_Route, _Message, State) ->
+%  error_logger:info_msg("Unknown GROUP notice"),
+  {noreply, State}.
 
 build_group_relations(Group) -> [
     mqs:key( [kvs_group, create] ),

+ 139 - 0
src/kvs_products.erl

@@ -0,0 +1,139 @@
+-module(kvs_products).
+-copyright('Synrc Research Center s.r.o.').
+-include_lib("kvs/include/products.hrl").
+-include_lib("kvs/include/users.hrl").
+-include_lib("kvs/include/groups.hrl").
+-include_lib("kvs/include/accounts.hrl").
+-include_lib("kvs/include/log.hrl").
+-include_lib("kvs/include/config.hrl").
+-include_lib("kvs/include/feed_state.hrl").
+-include_lib("mqs/include/mqs.hrl").
+-compile(export_all).
+
+register(#product{} = Registration) ->
+    Id = kvs:next_id("product", 1),
+    Product = Registration#product{id = Id,
+      name = "product"++integer_to_list(Id),
+      feed = kvs_feed:create(),
+      blog = kvs_feed:create(),
+      features = kvs_feed:create(),
+      specs = kvs_feed:create(),
+      gallery = kvs_feed:create(),
+      videos = kvs_feed:create(),
+      bundles = kvs_feed:create(),
+      creation_date = erlang:now()
+    },
+    error_logger:info_msg("PUT PRODUCT ~p", [Product]),
+    kvs:put(Product),
+%    init_mq(Product),
+    {ok, Product}.
+
+delete(Name) ->
+    case kvs:get(product, Name) of
+        {ok, Product} ->
+            GIds = kvs_group:participate(Name),
+            [ mqs:notify(["subscription", "product", Name, "remove_from_group"], {GId}) || GId <- GIds ],
+            F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(Product) ],
+            [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
+            [ unsubscribe(FrId, MeId) || {MeId, FrId} <- F2U ],
+%            kvs:delete(user_status, Name),
+            kvs:delete(product, Name),
+            {ok, Product};
+        E -> E end.
+
+subscribe(Who, Whom) ->
+    Record = #subscription{key={Who,Whom}, who = Who, whom = Whom},
+    kvs:put(Record).
+
+unsubscribe(Who, Whom) ->
+    case subscribed(Who, Whom) of
+        true  -> kvs:delete(subscription, {Who, Whom});
+        false -> skip end.
+
+subscriptions(undefined)-> [];
+subscriptions(#product{name = UId}) -> subscriptions(UId);
+
+subscriptions(UId) -> DBA=?DBA, DBA:subscriptions(UId).
+subscribed(Who) -> DBA=?DBA, DBA:subscribed(Who).
+
+subscribed(Who, Whom) ->
+    case kvs:get(subscription, {Who, Whom}) of
+        {ok, _} -> true;
+        _ -> false end.
+
+subscription_mq(Type, Action, Who, Whom) ->
+    case mqs:open([]) of
+        {ok,Channel} ->
+            case {Type,Action} of 
+                {user,add}     -> mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom));
+                {user,remove}  -> mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_product_feed(Whom)) end,
+            mqs_channel:close(Channel);
+        {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.
+
+init_mq(Product=#product{}) ->
+    Groups = kvs_group:participate(Product),
+    ProductExchange = ?USER_EXCHANGE(Product#product.name),
+    ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
+    case mqs:open([]) of
+        {ok, Channel} ->
+            ?INFO("Cration Exchange: ~p,",[{Channel,ProductExchange,ExchangeOptions}]),
+            mqs_channel:create_exchange(Channel, ProductExchange, ExchangeOptions),
+            Relations = build_user_relations(Product, Groups),
+            [ mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Product#product.name), ?NOTIFICATIONS_EX, Route) || Route <- Relations],
+            mqs_channel:close(Channel);
+        {error,Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
+
+build_user_relations(Product, Groups) -> [
+    mqs:key( [kvs_product, '*', Product]),
+    mqs:key( [kvs_feed, product, Product, '*', '*', '*']),
+    mqs:key( [kvs_feed, product, Product, '*'] ),
+    mqs:key( [kvs_payment, product, Product, '*']),
+    mqs:key( [kvs_account, product, Product, '*']),
+    mqs:key( [kvs_meeting, product, Product, '*']),
+    mqs:key( [kvs_purchase, product, Product, '*']) |
+  [ mqs:key( [kvs_feed, group, G, '*', '*', '*']) || G <- Groups ]
+    ].
+
+rk_product_feed(Product) -> mqs:key([kvs_feed, product, Product, '*', '*', '*']).
+
+retrieve_connections(Id,Type) ->
+    Friends = case Type of 
+        user -> kvs_product:list_subscr_usernames(Id);
+        _ -> kvs_group:list_group_members(Id) end,
+    case Friends of
+        [] -> [];
+        Full -> Sub = lists:sublist(Full, 10),
+            case Sub of
+                [] -> [];
+                _ ->
+                    Data = [begin
+                        case kvs:get(user,Who) of
+                            {ok,Product} -> RealName = kvs_product:user_realname_user(Product),
+                            Paid = kvs_payment:user_paid(Who),
+                            {Who,Paid,RealName};
+                        _ -> undefined end end || Who <- Sub],
+                    [ X || X <- Data, X/=undefined ] end end.
+
+handle_notice(["kvs_product", "subscribe", Who],
+    Message, #state{owner = _Owner, type =_Type} = State) ->
+    {Whom} = Message,
+    kvs_product:subscribe(Who, Whom),
+    subscription_mq(user, add, Who, Whom),
+    {noreply, State};
+
+handle_notice(["kvs_product", "unsubscribe", Who],
+    Message, #state{owner = _Owner, type =_Type} = State) ->
+    {Whom} = Message,
+    kvs_product:unsubscribe(Who, Whom),
+    subscription_mq(user, remove, Who, Whom),
+    {noreply, State};
+
+handle_notice(["kvs_product", "update", _Who],
+    Message, #state{owner = _Owner, type =_Type} = State) ->
+    {NewProduct} = Message,
+    kvs:put(NewProduct),
+    {noreply, State};
+
+handle_notice(_Route, _Message, State) ->
+  %error_logger:info_msg("Unknown USERS notice"),
+  {noreply, State}.

+ 3 - 1
src/kvs_user.erl

@@ -176,4 +176,6 @@ handle_notice(["kvs_user", "update", Who] = Route,
     kvs:put(NewUser),
     {noreply, State};
 
-handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown USERS notice").
+handle_notice(Route, Message, State) -> 
+  %error_logger:info_msg("Unknown USERS notice"), 
+  {noreply, State}.

+ 4 - 1
src/store_mnesia.erl

@@ -11,6 +11,7 @@
 -include_lib("kvs/include/membership.hrl").
 -include_lib("kvs/include/payments.hrl").
 -include_lib("kvs/include/purchases.hrl").
+-include_lib("kvs/include/products.hrl").
 -include_lib("kvs/include/accounts.hrl").
 -include_lib("kvs/include/log.hrl").
 -include_lib("kvs/include/translations.hrl").
@@ -69,6 +70,8 @@ initialize() ->
     ?CREATE_TAB(id_seq),
     ?CREATE_TAB(transaction),
     ?CREATE_TAB(translation),
+    ?CREATE_TAB(product),
+    ?CREATE_TAB(product_category),
     mnesia:wait_for_tables([comment,subscription,group,group_subscription,user,entry],5000),
     add_indexes(),
     ok.
@@ -162,7 +165,7 @@ exec(Q) -> F = fun() -> qlc:e(Q) end, {atomic, Val} = mnesia:transaction(F), Val
 % index funs
 
 products(UId) -> all_by_index(user_product, #user_product.username, UId).
-subscriptions(UId) -> all_by_index(subsciption, #subscription.who, UId).
+subscriptions(UId) -> all_by_index(subscription, #subscription.who, UId).
 subscribed(Who) -> all_by_index(subscription, #subscription.whom, Who).
 participate(UserName) -> all_by_index(group_subscription, #group_subscription.who, UserName).
 members(GroupName) -> all_by_index(group_subscription, #group_subscription.where, GroupName).