Просмотр исходного кода

make riak's notfound and mnesia's no_found agnostic

Maxim Sokhatsky 12 лет назад
Родитель
Сommit
4d766bea29
11 измененных файлов с 51 добавлено и 66 удалено
  1. 4 4
      include/groups.hrl
  2. 1 0
      src/kvs.erl
  3. 5 16
      src/kvs_account.erl
  4. 2 2
      src/kvs_acl.erl
  5. 18 18
      src/kvs_feed.erl
  6. 7 7
      src/kvs_group.erl
  7. 1 3
      src/kvs_meeting.erl
  8. 6 9
      src/kvs_payment.erl
  9. 2 2
      src/kvs_user.erl
  10. 4 4
      src/store_mnesia.erl
  11. 1 1
      src/store_riak.erl

+ 4 - 4
include/groups.hrl

@@ -15,10 +15,10 @@
 
 -record(group_subscription, {
         key,
-        user_id,
-        group_id,
-        user_type,
-        user_posts_count = 0 :: integer() % we need this for sorting and counting is expensive
+        who,
+        where,
+        type,
+        posts_count = 0 :: integer() % we need this for sorting and counting is expensive
         }).
 
 -define(GROUP_EXCHANGE(GroupId), list_to_binary("group_exchange."++GroupId++".fanout")).

+ 1 - 0
src/kvs.erl

@@ -116,6 +116,7 @@ add_sample_users() ->
     {ok, Quota} = kvs:get(config,"accounts/default_quota", 300),
 
     [ begin
+        [ kvs_group:join(Me#user.username,G#group.username) || G <- Groups ],
           kvs_account:create_account(Me#user.username),
           kvs_account:transaction(Me#user.username, quota, Quota, #tx_default_assignment{}),
           kvs:put(Me#user{password = kvs:sha(Me#user.password), starred = kvs_feed:create(), pinned = kvs_feed:create()})

+ 5 - 16
src/kvs_account.erl

@@ -64,24 +64,13 @@ check_quota(User, Amount) ->
 
 commit_transaction(#transaction{remitter = R, acceptor = A,  currency = Currency, amount = Amount} = TX) ->
     case change_accounts(R, A, Currency, Amount) of
-         ok -> skip;
-              %mqs:notify([transaction, user, R, add_transaction], TX),% notify_transaction(R,TX),
-              %mqs:notify([transaction, user, A, add_transaction], TX);%notify_transaction(A,TX);
-         Error ->  skip
-%            case TX#transaction.info of
-%                #tx_game_event{} ->
-%                    mqs:notify_transaction(R,TX),
-%                    mqs:notify_transaction(A,TX);
-%                _ ->
-%                    ?ERROR("commit transaction error: change accounts ~p", [Error]),
-%                    Error
-%            end
-    end.
+         ok -> mqs:notify([transaction, user, R, add_transaction], TX),
+               mqs:notify([transaction, user, A, add_transaction], TX);
+         Error -> skip end.
 
 change_accounts(Remitter, Acceptor, Currency, Amount) ->
     case {kvs:get(account,{Remitter, Currency}), kvs:get(account,{Acceptor, Currency})} of
         {{ok, RA = #account{}}, {ok, AA = #account{}}}  ->
-            ?INFO("transacrion: RemitterAccount ~p, AcceptorAccount: ~p", [RA, AA]),
             %% check balance for remitter according to currency and amount
             case check_remitter_balance(RA, Amount) of
                 ok ->   RA1 = RA#account{credit = RA#account.credit + Amount, last_change = -Amount },
@@ -107,7 +96,7 @@ transactions(UserId) -> tx_list(UserId, undefined, 10000).
 tx_list(UserId, undefined, PageAmount) ->
     case kvs:get(user_transaction, UserId) of
         {ok, O} when O#user_transaction.top =/= undefined -> tx_list(UserId, O#user_transaction.top, PageAmount);
-        {error, notfound} -> [] end;
+        {error, _} -> [] end;
 tx_list(UserId, StartFrom, Limit) ->
     case kvs:get(transaction,StartFrom) of
         {ok, #transaction{next = N}=P} -> [ P | kvs:traversal(transaction, #transaction.next, N, Limit)];
@@ -152,7 +141,7 @@ add_transaction_to_user(UserId,Purchase) ->
                            next = TopEntry#transaction.next,
                            prev = EntryId },
                     kvs:put(EditedEntry); % update prev entry
-                 {error,notfound} -> Next = undefined
+                 {error, _} -> Next = undefined
              end
     end,
 

+ 2 - 2
src/kvs_acl.erl

@@ -78,7 +78,7 @@ acl_entries(AclId) ->
     RA = kvs:get(acl, erlang:list_to_binary(AclStr)),
     case RA of
         {ok,RO} -> riak_read_acl_entries(RO#acl.top, []);
-        {error, notfound} -> [] end.
+        {error, _} -> [] end.
 
 riak_read_acl_entries(undefined, Result) -> Result;
 riak_read_acl_entries(Next, Result) ->
@@ -86,7 +86,7 @@ riak_read_acl_entries(Next, Result) ->
     RA = kvs:get(acl_entry,erlang:list_to_binary(NextStr)),
     case RA of
          {ok,RO} -> riak_read_acl_entries(RO#acl_entry.prev, Result ++ [RO]);
-         {error,notfound} -> Result end.
+         {error, _} -> Result end.
 
 acl_add_entry(Resource, Accessor, Action) ->
     Acl = case kvs:get(acl, Resource) of

+ 18 - 18
src/kvs_feed.erl

@@ -29,7 +29,7 @@ add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy, _) ->
                undefined -> undefined;
                X -> case kvs:get(entry, X) of
                        {ok, TopEntry} -> EditedEntry = TopEntry#entry{next = Id}, kvs:put(EditedEntry), TopEntry#entry.id;
-                       {error,notfound} -> undefined end end,
+                       {error, _} -> undefined end end,
 
     kvs:put(#feed{id = FId, top = {EntryId, FId}}), % update feed top with current
 
@@ -50,7 +50,7 @@ entry_traversal(undefined, _) -> [];
 entry_traversal(_, 0) -> [];
 entry_traversal(Next, Count)->
     case kvs:get(entry, Next) of
-        {error,notfound} -> [];
+        {error, _} -> [];
         {ok, R} ->
             Prev = element(#entry.prev, R),
             Count1 = case Count of 
@@ -67,7 +67,7 @@ entry_traversal(Next, Count)->
 entries(FeedId, undefined, PageAmount) ->
     case kvs:get(feed, FeedId) of
         {ok, O} -> entry_traversal(O#feed.top, PageAmount);
-        {error, notfound} -> [] end;
+        {error, _} -> [] end;
 entries(FeedId, StartFrom, PageAmount) ->
     case kvs:get(entry,{StartFrom, FeedId}) of
         {ok, #entry{prev = Prev}} -> entry_traversal(Prev, PageAmount);
@@ -93,7 +93,7 @@ add_like(Fid, Eid, Uid) ->
                 one_like_head = Write_one_like(ELikes#entry_likes.one_like_head), 
                 total_count = ELikes#entry_likes.total_count + 1
             });
-        {error, notfound} ->
+        {error, _} ->
             kvs:put(#entry_likes{
                 entry_id = Eid,                
                 one_like_head = Write_one_like(undefined),
@@ -107,7 +107,7 @@ add_like(Fid, Eid, Uid) ->
                 one_like_head = Write_one_like(ULikes#user_likes.one_like_head),
                 total_count = ULikes#user_likes.total_count + 1
             });
-        {error, notfound} ->
+        {error, _} ->
             kvs:put(#user_likes{
                 user_id = Uid,                
                 one_like_head = Write_one_like(undefined),
@@ -118,12 +118,12 @@ add_like(Fid, Eid, Uid) ->
 entries_count(Uid) ->
     case kvs:get(user_etries_count, Uid) of
         {ok, UEC} -> UEC#user_etries_count.entries;
-        {error, notfound} -> 0 end.
+        {error, _} -> 0 end.
 
 comments_count(Uid) ->
     case kvs:get(user_etries_count, Uid) of
         {ok, UEC} -> UEC#user_etries_count.comments;
-        {error, notfound} -> 0 end.
+        {error, _} -> 0 end.
 
 remove_entry(FeedId, EId) ->
     {ok, #feed{top = TopId} = Feed} = kvs:get(feed,FeedId),
@@ -133,7 +133,7 @@ remove_entry(FeedId, EId) ->
             case kvs:get(entry, Next) of {ok, NE} -> kvs:put(NE#entry{prev = Prev});  _ -> ok end,
             case kvs:get(entry, Prev) of {ok, PE} -> kvs:put(PE#entry{next = Next});  _ -> ok end,
             case TopId of {EId, FeedId} -> kvs:put(Feed#feed{top = Prev}); _ -> ok end;
-        {error, notfound} -> ?INFO("Not found"), ok
+        {error, _} -> ?INFO("Not found"), ok
     end,
     kvs:delete(entry, {EId, FeedId}).
 
@@ -143,7 +143,7 @@ edit_entry(FeedId, EId, NewDescription) ->
             NewEntryRaw =  OldEntry#entry{description = NewDescription, raw_description = NewDescription},
             NewEntry = feedformat:format(NewEntryRaw),
             kvs:put(NewEntry);
-        {error, notfound}-> {error, notfound} end.
+        {error, Reason}-> {error, Reason} end.
 
 like_list(undefined) -> [];
 like_list(Id) -> {ok, OneLike} = kvs:get(one_like, Id), [OneLike] ++ like_list(OneLike#one_like.next).
@@ -154,27 +154,27 @@ like_list(Id, N) -> {ok, OneLike} = kvs:get(one_like, Id), [OneLike] ++ like_lis
 entry_likes(Entry_id) ->
     case kvs:get(entry_likes, Entry_id) of
         {ok, Likes} -> like_list(Likes#entry_likes.one_like_head);
-        {error, notfound} -> [] end.
+        {error, _} -> [] end.
 
 entry_likes_count(Entry_id) ->
     case kvs:get(entry_likes, Entry_id) of
         {ok, Likes} -> Likes#entry_likes.total_count;
-        {error, notfound} -> 0 end.
+        {error, _} -> 0 end.
 
 user_likes_count(UserId) ->
     case kvs:get(user_likes, UserId) of
         {ok, Likes} -> Likes#user_likes.total_count;
-        {error, notfound} -> 0 end.
+        {error, _} -> 0 end.
 
 user_likes(UserId) ->
     case kvs:get(user_likes, UserId) of
         {ok, Likes} -> like_list(Likes#user_likes.one_like_head);
-        {error, notfound} -> [] end.
+        {error, _} -> [] end.
 
 user_likes(UserId, {Page, PageAmount}) ->
     case kvs:get(user_likes, UserId) of
         {ok, Likes} -> lists:nthtail((Page-1)*PageAmount, like_list(Likes#user_likes.one_like_head, PageAmount*Page));
-        {error, notfound} -> [] end.
+        {error, _} -> [] end.
 
 purge_feed(FeedId) ->
     {ok,Feed} = kvs:get(feed,FeedId),
@@ -198,8 +198,8 @@ handle_notice(["kvs_feed", "group", GroupId, "entry", EntryId, "add"] = Route, [
             GE = Group#group.entries_count,
             kvs:put(Group#group{entries_count = GE+1}),
             {ok, Subs} = kvs:get(group_subscription, {From, GroupId}),
-            SE = Subs#group_subscription.user_posts_count,
-            kvs:put(Subs#group_subscription{user_posts_count = SE+1})
+            SE = Subs#group_subscription.posts_count,
+            kvs:put(Subs#group_subscription{posts_count = SE+1})
     end,
     self() ! {feed_refresh,Feed,20},
     {noreply, State};
@@ -316,7 +316,7 @@ handle_notice(["kvs_feed", "user", UId, "count_entry_in_statistics"] = Route,
         {ok, UEC} -> 
             kvs:put(UEC#user_etries_count{entries = UEC#user_etries_count.entries+1 }),
             kvs_users:attempt_active_user_top(UId, UEC#user_etries_count.entries+1);
-        {error, notfound} ->
+        {error, _} ->
             kvs:put(#user_etries_count{user_id = UId, entries = 1 }),
             kvs_users:attempt_active_user_top(UId, 1) end,
     {noreply, State};
@@ -326,7 +326,7 @@ handle_notice(["kvs_feed", "user", UId, "count_comment_in_statistics"] = Route,
     ?INFO("queue_action(~p): count_comment_in_statistics: Owner=~p, Route=~p, Message=~p", [self(), {Type, Owner}, Route, Message]),
     case kvs:get(user_etries_count, UId) of
         {ok, UEC} -> kvs:put(UEC#user_etries_count{comments = UEC#user_etries_count.comments+1 });
-        {error, notfound} -> kvs:put(#user_etries_count{ user_id = UId, comments = 1 }) end,
+        {error, _} -> kvs:put(#user_etries_count{ user_id = UId, comments = 1 }) end,
     {noreply, State};
 
 handle_notice(["kvs_feed","likes", _, _, "add_like"] = Route,  % _, _ is here beacause of the same message used for comet update

+ 7 - 7
src/kvs_group.erl

@@ -46,10 +46,10 @@ delete(GroupName) ->
                     mqs_channel:close(Channel);
                 {error,Reason} -> ?ERROR("delete group failed: ~p",[Reason]) end end.
 
-participate(UserName) -> [GroupName || #group_subscription{group_id=GroupName} <- kvs:all_by_index(group_subscription, <<"who_bin">>, UserName) ].
-members(GroupName) -> [UserName || #group_subscription{user_id=UserName} <- kvs:all_by_index(group_subscription, <<"where_bin">>, GroupName) ].
-members_by_type(GroupName, Type) -> [UserName || #group_subscription{user_id=UserName, user_type=T} <- kvs:all_by_index(group_subscription, <<"where_bin">>, GroupName), T == Type ].
-members_with_types(GroupName) -> [{UserName, Type} || #group_subscription{user_id=UserName, user_type=Type} <- kvs:all_by_index(group_subscriptioin, <<"where_bin">>, list_to_binary(GroupName)) ].
+participate(UserName) -> [GroupName || #group_subscription{where=GroupName} <- kvs:all_by_index(group_subscription, <<"who_bin">>, UserName) ].
+members(GroupName) -> [UserName || #group_subscription{who=UserName} <- kvs:all_by_index(group_subscription, <<"where_bin">>, GroupName) ].
+members_by_type(GroupName, Type) -> [UserName || #group_subscription{who=UserName, type=T} <- kvs:all_by_index(group_subscription, <<"where_bin">>, GroupName), T == Type ].
+members_with_types(GroupName) -> [{UserName, Type} || #group_subscription{who=UserName, type=Type} <- kvs:all_by_index(group_subscriptioin, <<"where_bin">>, list_to_binary(GroupName)) ].
 
 owner(UserName, GroupName) ->
     case kvs:get(group, GroupName) of
@@ -63,11 +63,11 @@ member(UserName, GroupName) ->
 
 member_type(UserName, GroupName) ->
     case kvs:get(group_subs, {UserName, GroupName}) of
-        {error, notfound} -> not_in_group;
-        {ok, #group_subscription{user_type=Type}} -> Type end.
+        {error, _} -> not_in_group;
+        {ok, #group_subscription{type=Type}} -> Type end.
 
 add(UserName, GroupName, Type) ->
-    kvs:put(#group_subscription{key={UserName,GroupName},user_id=UserName, group_id=GroupName, user_type=Type}),
+    kvs:put(#group_subscription{key={UserName,GroupName},who=UserName, where=GroupName, type=Type}),
     {ok, Group} = kvs:get(group, GroupName),
     Users = Group#group.users_count,
     kvs:put(Group#group{users_count = Users + 1}).

+ 1 - 3
src/kvs_meeting.erl

@@ -24,9 +24,7 @@ create(UID, Name, Desc, Date, Time, Players, Quota, Awards, Type, Game, Mode, To
 get(TID) ->
     case kvs:get(meeting, TID) of
         {ok, Tournament} -> Tournament;
-        {error, not_found} -> #meeting{};
-        {error, notfound} -> #meeting{}
-    end.
+        {error, _} -> #meeting{} end.
 
 start(_TID) -> ok.
 join(UID, TID) -> kvs:join_tournament(UID, TID).

+ 6 - 9
src/kvs_payment.erl

@@ -7,12 +7,10 @@
 -compile(export_all).
 
 user_paid(UId) ->
-    {_, UP} = kvs:get(user_payment, UId),
-    case UP of
-        notfound -> false;
-        #user_payment{top = undefined} -> false;
-        _ -> true
-    end.
+    case kvs:get(user_payment, UId) of
+        {error,_} -> false;
+        {ok,#user_payment{top = undefined}} -> false;
+        _ -> true end.
 
 default_if_undefined(Value, Undefined, Default) ->
     case Value of
@@ -53,7 +51,7 @@ add_payment(#payment{} = MP, State0, Info) ->
             Id = default_if_undefined(MP#payment.id, undefined, payment_id()),
             Purchase = MP#payment{id = Id, state = State, start_time = Start, state_log = StateLog},
             %mqs:notify_purchase(Purchase),
-            ?INFO("Purchase added ~p ~p",[Purchase#payment.user_id, Purchase]),
+%            ?INFO("Payment added ~p ~p",[Purchase#payment.user_id, Purchase]),
             add_to_user(Purchase#payment.user_id, Purchase)
     end.
 
@@ -74,8 +72,7 @@ add_to_user(UserId,Payment) ->
                      Next = TopEntry#payment.id,
                      EditedEntry = TopEntry#payment{next = TopEntry#payment.next, prev = EntryId},
                      kvs:put(EditedEntry);
-                {error,notfound} -> Next = undefined end
-    end,
+                {error, _} -> Next = undefined end end,
 
     kvs:put(#user_payment{ user = UserId, top = EntryId}), % update team top with current
 

+ 2 - 2
src/kvs_user.erl

@@ -46,7 +46,7 @@ process_register(#user{username=U} = RegisterData0) ->
 
 check_username(Name, FbId) ->
     case kvs_users:get(Name) of
-        {error, notfound} -> {ok, Name};
+        {error, _} -> {ok, Name};
         {ok, User} when FbId =/= undefined -> check_username(User#user.username  ++ integer_to_list(crypto:rand_uniform(0,10)), FbId);
         {ok, _}-> {error, username_taken} end.
 
@@ -97,7 +97,7 @@ subscription_mq(Type, Action, MeId, ToId) ->
 
 init_mq(User=#user{}) ->
     Groups = kvs_group:participate(User),
-    ?INFO("~p init mq. users: ~p", [User, Groups]),
+%    ?INFO("~p init mq. users: ~p", [User, Groups]),
     UserExchange = ?USER_EXCHANGE(User#user.username),
     ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
     case mqs:open([]) of

+ 4 - 4
src/store_mnesia.erl

@@ -18,7 +18,7 @@
 -compile(export_all).
 -define(CREATE_TAB(T), create_table(T, record_info(fields, T), [{storage, permanent}]) ).
 
-start() -> mnesia:change_table_copy_type(schema, node(), disc_copies).
+start() -> mnesia:start(), mnesia:change_table_copy_type(schema, node(), disc_copies).
 stop() -> mnesia:stop().
 delete() -> mnesia:delete_schema([node()]).
 version() -> {version,"KVS MNESIA Embedded"}.
@@ -46,6 +46,8 @@ initialize() ->
     ok = add_table_index(comment, entry_id),
     ok = add_table_index(subscription, who),
     ok = add_table_index(subscription, whom),
+    ok = add_table_index(group_subscription, who),
+    ok = add_table_index(group_subscription, where),
     ok = add_table_index(entry, feed_id),
     ok = add_table_index(entry, entry_id),
     ok = add_table_index(entry, from),
@@ -95,9 +97,7 @@ all(RecordName) -> flatten(fun() -> Lists = mnesia:all_keys(RecordName), [ mnesi
 all_by_index(RecordName,Key,Value) -> flatten(fun() -> mnesia:index_read(RecordName,Value,Key) end).
 
 next_id(RecordName) -> next_id(RecordName, 1).
-next_id(RecordName, Incr) ->
-%    [RecordStr] = io_lib:format("~p",[RecordName]),
-    mnesia:dirty_update_counter({id_seq, RecordName}, Incr).
+next_id(RecordName, Incr) -> mnesia:dirty_update_counter({id_seq, RecordName}, Incr).
 
 just_one(Fun) ->
     case mnesia:transaction(Fun) of

+ 1 - 1
src/store_riak.erl

@@ -55,7 +55,7 @@ make_indices(#subscription{who=Who, whom=Whom}) -> [
     {<<"who_bin">>, key_to_bin(Who)},
     {<<"whom_bin">>, key_to_bin(Whom)}];
 
-make_indices(#group_subscription{user_id=UId, group_id=GId}) -> [
+make_indices(#group_subscription{who=UId, where=GId}) -> [
     {<<"who_bin">>, key_to_bin(UId)},
     {<<"where_bin">>, key_to_bin(GId)}];