Maxim Sokhatsky 12 лет назад
Родитель
Сommit
d52eb9c94d
8 измененных файлов с 52 добавлено и 135 удалено
  1. 3 3
      include/accounts.hrl
  2. 7 9
      src/kvs.erl
  3. 3 14
      src/kvs_account.erl
  4. 4 7
      src/kvs_feed.erl
  5. 5 8
      src/kvs_group.erl
  6. 24 39
      src/kvs_user.erl
  7. 0 43
      src/map_reduce.erl
  8. 6 12
      src/store_riak.erl

+ 3 - 3
include/accounts.hrl

@@ -4,9 +4,9 @@
 
 -record(account, {
         id :: account_id(),
-        debet       :: integer(),
-        credit      :: integer(),
-        last_change :: integer() }).
+        debet = 0 :: integer(),
+        credit = 0 :: integer(),
+        last_change = 0 :: integer() }).
 
 -record(tx_payment,{ id :: integer() }).
 -record(tx_admin_change,{ reason :: binary() }).

+ 7 - 9
src/kvs.erl

@@ -45,6 +45,8 @@ init_db() ->
             add_translations();
         {ok,_} -> ignore end.
 
+add_sample_packages() -> kvs_membership:add_sample_data().
+
 add_sample_payments() ->
     {ok, Pkg1} = kvs:get(membership,1),
     {ok, Pkg2} = kvs:get(membership,2),
@@ -83,10 +85,8 @@ add_seq_ids() ->
 
 add_translations() ->
     lists:foreach(fun({English, Lang, Word}) ->
-                          ok = kvs:put(#translation{english = English, lang = "en",  word = English}),
-                          ok = kvs:put(#translation{english = English, lang = Lang,  word = Word}),
-              ok
-    end, ?URL_DICTIONARY).
+        kvs:put(#translation{english = English, lang = "en",  word = English}),
+        kvs:put(#translation{english = English, lang = Lang,  word = Word}) end, ?URL_DICTIONARY).
 
 add_sample_users() ->
 
@@ -117,8 +117,8 @@ add_sample_users() ->
         [ kvs_group:join(Me#user.username,G#group.id) || G <- Groups ],
           kvs_account:create_account(Me#user.username),
           kvs_account:transaction(Me#user.username, quota, Quota, #tx_default_assignment{}),
-          kvs:put(Me#user{password = kvs:sha(Me#user.password), starred = kvs_feed:create(), pinned = kvs_feed:create()})
-      end || Me <- UserList],
+          kvs:put(Me#user{password = sha(Me#user.password), starred = kvs_feed:create(), pinned = kvs_feed:create()})
+      end || Me <- UserList ],
 
     kvs_acl:define_access({user, "maxim"},    {feature, admin}, allow),
     kvs_acl:define_access({user_type, admin}, {feature, admin}, allow),
@@ -128,7 +128,6 @@ add_sample_users() ->
 
     ok.
 
-add_sample_packages() -> kvs_membership:add_sample_data().
 version() -> DBA=?DBA, DBA:version().
 
 add_configs() ->
@@ -225,8 +224,7 @@ load_db(Path) ->
 make_paid_fake(UId) ->
     put(#payment{user_id=UId,info= "fake_purchase"}).
 
-save(Key, Value) ->
-    Dir = ling:trim_from_last(Key, "/"),
+save(Dir, Value) ->
     filelib:ensure_dir(Dir),
     file:write_file(Key, term_to_binary(Value)).
 

+ 3 - 14
src/kvs_account.erl

@@ -30,18 +30,7 @@ balance(AccountId, Currency) ->
          {ok, #account{debet = Debet, credit = Credit}} -> {ok, Debet - Credit};
          Error -> Error end.
 
-create_account(AccountId) ->
-    Currencies = get_currencies(),
-    try [{ok, Currency} = {create_account(AccountId, Currency), Currency} || Currency <- Currencies]
-    catch _:_ -> {error, unable_create_account} end.
-
-create_account(AccountId, Currency) ->
-    Account = #account{id = {AccountId, Currency}, credit = 0, debet = 0, last_change = 0},
-
-    case kvs:put(Account) of
-         ok -> ok;
-         Error -> ?ERROR("create_account: put to db error: ~p", [Error]),
-                  {error, unable_to_store_account} end.
+create_account(AccountId) -> [ kvs:put(#account{id={AccountId, Currency}) || Currency <- get_currencies() ].
 
 check_quota(User) -> check_quota(User, 0).
 check_quota(User, Amount) ->
@@ -56,8 +45,8 @@ check_quota(User, Amount) ->
 
 commit_transaction(#transaction{remitter = R, acceptor = A,  currency = Currency, amount = Amount} = TX) ->
     case change_accounts(R, A, Currency, Amount) of
-         ok -> mqs:notify([transaction, user, R, add_transaction], TX),
-               mqs:notify([transaction, user, A, add_transaction], TX);
+         ok -> mqs:notify([kvs_account, user, R, transaction], TX),
+               mqs:notify([kvs_account, user, A, transaction], TX);
          Error -> skip end.
 
 check_remitter_balance(RA, Amount) -> ok.

+ 4 - 7
src/kvs_feed.erl

@@ -57,14 +57,11 @@ entry_traversal(Next, Count)->
             Prev = element(#entry.prev, R),
             Count1 = case Count of 
                 C when is_integer(C) -> case R#entry.type of
-                    {_, system} -> C;   % temporal entries are entries too, but they shouldn't be counted
+                    {_, system} -> C; % temporal entries are entries too, but they shouldn't be counted
                     {_, system_note} -> C;
-                    _ -> C - 1
-                end;
-                _-> Count 
-            end,
-            [R | entry_traversal(Prev, Count1)]
-    end.
+                    _ -> C - 1 end;
+                _-> Count end,
+            [R | entry_traversal(Prev, Count1)] end.
 
 entries(FeedId, undefined, PageAmount) ->
     case kvs:get(feed, FeedId) of

+ 5 - 8
src/kvs_group.erl

@@ -180,20 +180,17 @@ init_mq(Group=#group{}) ->
         {ok, Channel} ->
             mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
             Relations = build_group_relations(Group),
-            [bind_group_exchange(Channel, Group, RK) || RK <- Relations],
+            [mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Group#group.id), ?NOTIFICATIONS_EX, Route) || Route <- Relations],
             mqs_channel:close(Channel);
         {error, Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
 
-rk_group_feed(Group) -> mqs_lib:list_to_key([feed, group, Group, '*', '*', '*']).
+rk_group_feed(Group) -> mqs_lib:list_to_key([kvs_feed, group, Group, '*', '*', '*']).
 
-bind_group_exchange(Channel, Group, Route) -> {bind, Route, mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, Route)}.
-unbind_group_exchange(Channel, Group, Route) -> {unbind, Route, mqs_channel:unbind_exchange(Channel, ?GROUP_EXCHANGE(Group), ?NOTIFICATIONS_EX, Route)}.
-
-subscription_mq(Type, Action, MeId, ToId) ->
+subscription_mq(Type, Action, Who, Where) ->
     case mqs:open([]) of
         {ok,Channel} ->
             case {Type,Action} of 
-                {group,add} -> bind_group_exchange(Channel, MeId, rk_group_feed(ToId));
-                {group,remove} -> unbind_group_exchange(Channel, MeId, rk_group_feed(ToId)) end,
+                {group,add}     -> mqs_channel:bind_exchange(Channel, ?GROUP_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_group_feed(Where));
+                {groupr,remove}  -> mqs_channel:unbind_exchange(Channel, ?GROUP_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_group_feed(Where));
             mqs_channel:close(Channel);
         {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.

+ 24 - 39
src/kvs_user.erl

@@ -9,29 +9,28 @@
 -include_lib("mqs/include/mqs.hrl").
 -compile(export_all).
 
+register(#user{username=UserName, email=Email, facebook_id = FacebookId} = Registeration) ->
 
-register(#user{username=U, email=Email, facebook_id = FbId} = RegisterData0) ->
-    FindUser = case check_username(U, FbId) of
-        {error, E} -> {error, E};
-        {ok, NewName} -> case kvs_users:get({email, Email}) of
-            {error, _} -> {ok, NewName};
+    EmailUser = case check_username(UserName, FacebookId) of
+        {error, Reason} -> {error, Reason};
+        {ok, Name} -> case kvs_users:get({email, Email}) of
+            {error, _} -> {ok, Name};
             {ok, _} -> {error, email_taken} end end,
 
-    FindUser2 = case FindUser of
-        {ok, UserName} -> case kvs_group:get(UserName) of
-            {error, _} -> {ok, UserName};
-            _ -> {error, username_taken} end;
-        A -> A end,
+    GroupUser = case EmailUser of
+        {error, Reason} -> {error, Reason};
+        {ok, Name} -> case kvs_group:get(Name) of
+            {error, _} -> {ok, Name};
+            {ok,_} -> {error, username_taken} end end,
 
-    case FindUser2 of
-        {ok, Name} -> process_register(RegisterData0#user{username=Name});
-        {error, username_taken} -> {error, user_exist};
-        {error, email_taken} ->    {error, email_taken} end.
+    case Group of
+        {ok, Name} -> process_register(Registeration#user{username=Name});
+        Error -> Error end.
 
 process_register(#user{username=U} = RegisterData0) ->
     HashedPassword = case RegisterData0#user.password of
         undefined -> undefined;
-        PlainPassword -> utils:sha(PlainPassword) end,
+        PlainPassword -> sha(PlainPassword) end,
     RegisterData = RegisterData0#user {
         feed     = kvs:feed_create(),
         direct   = kvs:feed_create(),
@@ -50,13 +49,14 @@ process_register(#user{username=U} = RegisterData0) ->
 check_username(Name, FbId) ->
     case kvs_users:get(Name) of
         {error, _} -> {ok, Name};
-        {ok, User} when FbId =/= undefined -> check_username(User#user.username  ++ integer_to_list(crypto:rand_uniform(0,10)), FbId);
+        {ok, User} when FbId =/= undefined ->
+            check_username(User#user.username  ++ integer_to_list(crypto:rand_uniform(0,10)), FbId);
         {ok, _}-> {error, username_taken} end.
 
 delete(UserName) ->
     case kvs_users:get(UserName) of
         {ok, User} ->
-            GIds = kvs_group:list_groups_per_user(UserName),
+            GIds = kvs_group:participate(UserName),
             [ mqs:notify(["subscription", "user", UserName, "remove_from_group"], {GId}) || GId <- GIds ],
             F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(User) ],
             [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
@@ -66,9 +66,8 @@ delete(UserName) ->
             {ok, User};
         E -> E end.
 
-get({username, UserName}) -> kvs:user_by_username(UserName);
-get({facebook, FBId}) -> kvs:user_by_facebook_id(FBId);
-get({email, Email}) -> kvs:user_by_email(Email);
+get({facebook, FBId}) -> user_by_facebook_id(FBId);
+get({email, Email}) -> user_by_email(Email);
 get(UId) -> kvs:get(user, UId).
 
 subscribe(Who, Whom) ->
@@ -91,18 +90,17 @@ subscribed(Who, Whom) ->
         {ok, _} -> true;
         _ -> false end.
 
-subscription_mq(Type, Action, MeId, ToId) ->
+subscription_mq(Type, Action, Who, Whom) ->
     case mqs:open([]) of
         {ok,Channel} ->
             case {Type,Action} of 
-                {user,add}     -> bind_user_exchange(Channel, MeId, rk_user_feed(ToId));
-                {user,remove}  -> unbind_user_exchange(Channel, MeId, rk_user_feed(ToId)) end,
+                {user,add}     -> mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_user_feed(Whom));
+                {user,remove}  -> mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(Who), ?NOTIFICATIONS_EX, rk_user_feed(Whom));
             mqs_channel:close(Channel);
         {error,Reason} -> ?ERROR("subscription_mq error: ~p",[Reason]) end.
 
 init_mq(User=#user{}) ->
     Groups = kvs_group:participate(User),
-%    ?INFO("~p init mq. users: ~p", [User, Groups]),
     UserExchange = ?USER_EXCHANGE(User#user.username),
     ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
     case mqs:open([]) of
@@ -110,7 +108,7 @@ init_mq(User=#user{}) ->
             ?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
             mqs_channel:create_exchange(Channel, UserExchange, ExchangeOptions),
             Relations = build_user_relations(User, Groups),
-            [bind_user_exchange(Channel, User, RK) || RK <- Relations],
+            [ mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User#user.username), ?NOTIFICATIONS_EX, Route) || Route <- Relations],
             mqs_channel:close(Channel);
         {error,Reason} -> ?ERROR("init_mq error: ~p",[Reason]) end.
 
@@ -125,10 +123,7 @@ build_user_relations(User, Groups) -> [
   [ mqs:key( [kvs_feed, group, G, '*', '*', '*']) || G <- Groups ]
     ].
 
-rk_user_feed(User) -> mqs:key([feed, user, User, '*', '*', '*']).
-
-bind_user_exchange(Channel, User, Route) -> {bind, Route, mqs_channel:bind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, Route)}.
-unbind_user_exchange(Channel, User, Route) -> {unbind, Route, mqs_channel:unbind_exchange(Channel, ?USER_EXCHANGE(User), ?NOTIFICATIONS_EX, Route)}.
+rk_user_feed(User) -> mqs:key([kvs_feed, user, User, '*', '*', '*']).
 
 retrieve_connections(Id,Type) ->
     Friends = case Type of 
@@ -148,11 +143,6 @@ retrieve_connections(Id,Type) ->
                         _ -> undefined end end || Who <- Sub],
                     [ X || X <- Data, X/=undefined ] end end.
 
-user_by_verification_code(Code) ->
-    case kvs:get(code,Code) of
-        {ok,{_,User,_}} -> kvs:get(user,User);
-        Else -> Else end.
-
 user_by_facebook_id(FBId) ->
     case kvs:get(facebook,FBId) of
         {ok,{_,User,_}} -> kvs:get(user,User);
@@ -163,11 +153,6 @@ user_by_email(Email) ->
         {ok,{_,User,_}} -> kvs:get(user,User);
         Else -> Else end.
 
-user_by_username(Name) ->
-    case X = kvs:get(user,Name) of
-        {ok,_Res} -> X;
-        Else -> Else end.
-
 handle_notice(["kvs_user", "subscribe", Who] = Route,
     Message, #state{owner = Owner, type =Type} = State) ->
     {Whom} = Message,

+ 0 - 43
src/map_reduce.erl

@@ -1,43 +0,0 @@
--module(map_reduce).
--author('Maxim Sokhatsky').
--include_lib("kvs/include/log.hrl").
--include_lib("stdlib/include/qlc.hrl").
--compile(export_all).
-
-map_reduce(Module, Fun, Args)->
-    lists:flatten([ case rpc:call(Node, Module, Fun, Args) of
-                       {badrpc, _Reason} -> [];
-                       R -> R end || Node <- nodes()]).
-
-map_call(NodeType,Module,Fun,Args=[ID|Rest],NodeHash0) ->
-
-    Node = nsx_opt:get_env(nsx_idgen,game_pool,5000000) div 1000000,
-
-    NodeHash = case NodeHash0 of
-                    string_map -> fun(X) -> string_map(X) end;
-                    long_map -> fun(X) -> long_map(X) end end,
-
-    DefaultNodeAtom = case NodeType of
-                           "app" -> app_srv_node;
-                           "game" -> game_srv_node;
-                           "public" -> web_srv_node end,
-
-    ServerNode = case Node of
-                      4 -> nsx_opt:get_env(store,DefaultNodeAtom,'maxim@synrc.com');
-                      5 -> nsx_opt:get_env(store,DefaultNodeAtom,'maxim@synrc.com');
-                      _ -> list_to_atom(NodeType ++ "@srv" ++ 
-                           integer_to_list(NodeHash(ID)) ++
-                           ".synrc.com") end,
-
-    ?INFO("map_call: ~p",[{ServerNode,Module,Fun,Args}]),
-    rpc:call(ServerNode,Module,Fun,Args).
-
-string_map(X) -> lists:foldl(fun(A,Sum)->A+Sum end,0,X) rem 3+1.
-long_map(X)   -> X div 1000000.
- 
-consumer_pid(Args)       -> map_call("app", feed_server,pid,Args,string_map).
-cached_feed(Args)        -> map_call("app", feed_writer,cached_feed,Args,string_map).
-cached_direct(Args)      -> map_call("app", feed_writer,cached_direct,Args,string_map).
-cached_friends(Args)     -> map_call("app", feed_writer,cached_friends,Args,string_map).
-cached_groups(Args)      -> map_call("app", feed_writer,cached_groups,Args,string_map).
-start_worker(Args)       -> map_call("app", feed_launcher,start_worker,Args,string_map).

+ 6 - 12
src/store_riak.erl

@@ -125,9 +125,6 @@ post_write_hooks(R,C) ->
         user -> case R#user.email of
                     undefined -> nothing;
                     _ -> C:put(make_object({email, R#user.username, R#user.email})) end,
-                case R#user.verification_code of
-                    undefined -> nothing;
-                    _ -> C:put(make_object({code, R#user.username, R#user.verification_code})) end,
                 case R#user.facebook_id of
                   undefined -> nothing;
                   _ -> C:put(make_object({facebook, R#user.username, R#user.facebook_id})) end;
@@ -161,16 +158,14 @@ delete_by_index(Tab, IndexId, IndexVal) ->
     Riak = riak_client(),
     Bucket = key_to_bin(Tab),
     {ok, Keys} = Riak:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
-    [Riak:delete(Bucket, Key) || Key <- Keys],
-    ok.
+    [Riak:delete(Bucket, Key) || Key <- Keys].
 
 key_to_bin(Key) ->
     if is_integer(Key) -> erlang:list_to_binary(integer_to_list(Key));
        is_list(Key) -> erlang:list_to_binary(Key);
        is_atom(Key) -> erlang:list_to_binary(erlang:atom_to_list(Key));
        is_binary(Key) -> Key;
-       true ->  [ListKey] = io_lib:format("~p", [Key]), erlang:list_to_binary(ListKey)
-    end.
+       true ->  [ListKey] = io_lib:format("~p", [Key]), erlang:list_to_binary(ListKey) end.
 
 all(RecordName) ->
     Riak = riak_client(),
@@ -183,11 +178,10 @@ all_by_index(Tab, IndexId, IndexVal) ->
     Riak = riak_client(),
     Bucket = key_to_bin(Tab),
     {ok, Keys} = Riak:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
-    F = fun(Key, Acc) ->
-                case Riak:get(Bucket, Key, []) of
-                    {ok, O} -> [riak_object:get_value(O) | Acc];
-                    {error, notfound} -> Acc end end,
-    lists:foldl(F, [], Keys).
+    lists:foldl(fun(Key, Acc) ->
+        case Riak:get(Bucket, Key, []) of
+            {ok, O} -> [riak_object:get_value(O) | Acc];
+            {error, notfound} -> Acc end end, [], Keys).
 
 riak_get_raw({RecordBin, Key, Riak}) ->
     case Riak:get(RecordBin, Key) of