Browse Source

change riak to 1.3

Maxim Sokhatsky 12 years ago
parent
commit
66587ffd60
7 changed files with 57 additions and 45 deletions
  1. 5 4
      src/accounts.erl
  2. 7 6
      src/groups.erl
  3. 31 21
      src/kvs.erl
  4. 1 0
      src/kvs_sup.erl
  5. 3 3
      src/kvs_users.erl
  6. 2 2
      src/membership_packages.erl
  7. 8 9
      src/store_riak.erl

+ 5 - 4
src/accounts.erl

@@ -99,13 +99,14 @@ check_quota(User, Amount) ->
 
 
 commit_transaction(#transaction{remitter = R, acceptor = A,  currency = Currency, amount = Amount} = TX) ->
 commit_transaction(#transaction{remitter = R, acceptor = A,  currency = Currency, amount = Amount} = TX) ->
     case change_accounts(R, A, Currency, Amount) of
     case change_accounts(R, A, Currency, Amount) of
-         ok -> nsx_msg:notify_transaction(R,TX),
-               nsx_msg:notify_transaction(A,TX);
+         ok -> skip;
+              %mqs:notify([transaction, user, R, add_transaction], TX),% notify_transaction(R,TX),
+              %mqs:notify([transaction, user, A, add_transaction], TX);%notify_transaction(A,TX);
          Error ->  skip
          Error ->  skip
 %            case TX#transaction.info of
 %            case TX#transaction.info of
 %                #tx_game_event{} ->
 %                #tx_game_event{} ->
-%                    nsx_msg:notify_transaction(R,TX),
-%                    nsx_msg:notify_transaction(A,TX);
+%                    mqs:notify_transaction(R,TX),
+%                    mqs:notify_transaction(A,TX);
 %                _ ->
 %                _ ->
 %                    ?ERROR("commit transaction error: change accounts ~p", [Error]),
 %                    ?ERROR("commit transaction error: change accounts ~p", [Error]),
 %                    Error
 %                    Error

+ 7 - 6
src/groups.erl

@@ -20,19 +20,20 @@ retrieve_groups(User) ->
 create_group_directly_to_db(UId, GId, Name, Desc, Publicity) ->
 create_group_directly_to_db(UId, GId, Name, Desc, Publicity) ->
     FId = kvs:feed_create(),
     FId = kvs:feed_create(),
     CTime = erlang:now(),
     CTime = erlang:now(),
-    kvs:put(#group{username = GId,
+    Group = #group{username = GId,
                       name = Name,
                       name = Name,
                       description = Desc,
                       description = Desc,
                       publicity = Publicity,
                       publicity = Publicity,
                       creator = UId,
                       creator = UId,
                       created = CTime,
                       created = CTime,
                       owner = UId,
                       owner = UId,
-                     feed = FId}),
-    kvs_users:init_mq_for_group(GId),
+                     feed = FId},
+    kvs:put(Group),
+    kvs_users:init_mq(Group),
     add_to_group_directly_to_db(UId, GId, member),
     add_to_group_directly_to_db(UId, GId, member),
     GId.
     GId.
 
 
-add_to_group(Who, GId, Type, Owner) -> nsx_msg:notify(["subscription", "user", Owner, "add_to_group"], {GId, Who, Type}).
+add_to_group(Who, GId, Type, Owner) -> mqs:notify(["subscription", "user", Owner, "add_to_group"], {GId, Who, Type}).
 
 
 add_to_group_directly_to_db(UId, GId, Type) ->
 add_to_group_directly_to_db(UId, GId, Type) ->
     kvs:put(#group_subscription{key={UId,GId},user_id=UId, group_id=GId, user_type=Type}),
     kvs:put(#group_subscription{key={UId,GId},user_id=UId, group_id=GId, user_type=Type}),
@@ -45,7 +46,7 @@ delete_group(GId) ->
     case Group of 
     case Group of 
         notfound -> ok;
         notfound -> ok;
         _ ->
         _ ->
-            nsx_msg:notify([feed, delete, GId], empty),
+            mqs:notify([feed, delete, GId], empty),
             kvs:delete_by_index(group_subscription, <<"group_subs_group_id_bin">>, GId),         
             kvs:delete_by_index(group_subscription, <<"group_subs_group_id_bin">>, GId),         
             kvs:delete(feed, Group#group.feed),
             kvs:delete(feed, Group#group.feed),
             kvs:delete(group, GId),
             kvs:delete(group, GId),
@@ -105,7 +106,7 @@ join_group(GId, User) ->
 
 
 approve_request(UId, GId, Owner) -> add_to_group(UId, GId, member, Owner).
 approve_request(UId, GId, Owner) -> add_to_group(UId, GId, member, Owner).
 reject_request(UId, GId, Owner) -> add_to_group(UId, GId, reqrejected, Owner).
 reject_request(UId, GId, Owner) -> add_to_group(UId, GId, reqrejected, Owner).
-change_group_user_type(UId, GId, Type) -> nsx_msg:notify(["subscription", "user", UId, "add_to_group"], {GId, UId, Type}).
+change_group_user_type(UId, GId, Type) -> mqs:notify(["subscription", "user", UId, "add_to_group"], {GId, UId, Type}).
 
 
 group_exists(GId) ->
 group_exists(GId) ->
     {R, _} = get_group(GId),
     {R, _} = get_group(GId),

+ 31 - 21
src/kvs.erl

@@ -29,19 +29,20 @@ init_indexes() -> DBA = ?DBA, DBA:init_indexes().
 init_db() ->
 init_db() ->
     case kvs:get(user,"alice") of
     case kvs:get(user,"alice") of
        {error,_} ->
        {error,_} ->
-            DBA = ?DBA,
-            DBA:init_db(),
+%            DBA = ?DBA,
+%            DBA:init_db(),
+%            membership_packages:add_sample_data(),
             add_seq_ids(),
             add_seq_ids(),
             accounts:create_account(system),
             accounts:create_account(system),
             add_sample_users(),
             add_sample_users(),
             add_sample_packages(),
             add_sample_packages(),
-            add_translations(),
-            case is_production() of
-                false ->
-                    add_purchases();
-                true ->
-                    do_nothing
-            end;
+            add_translations();
+%            case is_production() of
+%                false ->
+%                    add_purchases();
+%                true ->
+%                    do_nothing
+%            end;
        {ok,_} -> ignore
        {ok,_} -> ignore
     end.
     end.
 
 
@@ -119,30 +120,31 @@ add_sample_users() ->
 
 
     ?INFO("creating groups"),
     ?INFO("creating groups"),
 
 
-    GId1  = groups:create_group_directly_to_db("maxim", "kakaranet", "Kakaranet", "Kakaranet'e Hoşgeldiniz", public),
-    GId2  = groups:create_group_directly_to_db("maxim", "yeniler", "Yeniler", "So, you must be new here.", public),
+%    GId1  = groups:create_group_directly_to_db("maxim", "kakaranet", "Kakaranet", "Kakaranet'e Hoşgeldiniz", public),
+%    GId2  = groups:create_group_directly_to_db("maxim", "yeniler", "Yeniler", "So, you must be new here.", public),
 
 
     ?INFO("adding users accounts"),
     ?INFO("adding users accounts"),
     [ begin
     [ begin
           accounts:create_account(Me#user.username),
           accounts:create_account(Me#user.username),
           accounts:transaction(Me#user.username, quota, kvs:get_config("accounts/default_quota", 300), #tx_default_assignment{}),
           accounts:transaction(Me#user.username, quota, kvs:get_config("accounts/default_quota", 300), #tx_default_assignment{}),
-          kvs:put(Me#user{password = utils:sha(Me#user.password),
+          kvs:put(Me#user{password = kvs:sha(Me#user.password),
                                 starred = feed_create(),
                                 starred = feed_create(),
                                 pinned = feed_create()})
                                 pinned = feed_create()})
       end || Me <- UserList],
       end || Me <- UserList],
     ?INFO("adding users to groups"),
     ?INFO("adding users to groups"),
     [ begin
     [ begin
-          kvs_users:init_mq(Me#user.username, [GId1, GId2]),
-          groups:add_to_group_directly_to_db(Me#user.username, GId1, member),
-          groups:add_to_group_directly_to_db(Me#user.username, GId2, member)
+        ok
+%          kvs_users:init_mq(Me#user.username, [GId1, GId2]),
+%          groups:add_to_group_directly_to_db(Me#user.username, GId1, member),
+%          groups:add_to_group_directly_to_db(Me#user.username, GId2, member)
       end || Me <- UserList ],
       end || Me <- UserList ],
     acls:define_access({user, "maxim"},    {feature, admin}, allow),
     acls:define_access({user, "maxim"},    {feature, admin}, allow),
     acls:define_access({user_type, admin}, {feature, admin}, allow),
     acls:define_access({user_type, admin}, {feature, admin}, allow),
-    ?INFO("making all users each other friends"),
-    [[case Me == Her of
-        true -> ok;
-        false -> kvs_users:subscr_user(Me#user.username, Her#user.username)
-    end || Her <- UserList] || Me <- UserList].
+    ?INFO("making all users each other friends").
+%    [[case Me == Her of
+%        true -> ok;
+%        false -> kvs_users:subscribe(Me#user.username, Her#user.username)
+%    end || Her <- UserList] || Me <- UserList].
 
 
 add_sample_packages() -> membership_packages:add_sample_data().
 add_sample_packages() -> membership_packages:add_sample_data().
 version() -> ?INFO("version: ~p", [1]).
 version() -> ?INFO("version: ~p", [1]).
@@ -386,4 +388,12 @@ handle_notice(["db", "group", GroupId, "update_group"] = Route,
 handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown KVS notice").
 handle_notice(Route, Message, State) -> error_logger:info_msg("Unknown KVS notice").
 
 
 coalesce(undefined, B) -> B;
 coalesce(undefined, B) -> B;
-coalesce(A, _) -> A.
+coalesce(A, _) -> A.
+
+sha(Raw) ->
+    lists:flatten(
+      [io_lib:format("~2.16.0b", [N]) || <<N>> <= crypto:sha(Raw)]).
+
+sha_upper(Raw) ->
+    SHA = sha(Raw),
+    string:to_upper(SHA).

+ 1 - 0
src/kvs_sup.erl

@@ -35,6 +35,7 @@ init([]) ->
     {ok, _Value} -> skip end,
     {ok, _Value} -> skip end,
 
 
   kvs:initialize(),
   kvs:initialize(),
+  kvs:init_indexes(),
 
 
   case application:get_env(kvs, riak_srv_node) of
   case application:get_env(kvs, riak_srv_node) of
     undefined ->
     undefined ->

+ 3 - 3
src/kvs_users.erl

@@ -52,7 +52,7 @@ check_username(Name, FbId) ->
 delete(UserName) ->
 delete(UserName) ->
     case kvs_users:get(UserName) of
     case kvs_users:get(UserName) of
         {ok, User} -> GIds = groups:list_groups_per_user(UserName),
         {ok, User} -> GIds = groups:list_groups_per_user(UserName),
-                      [nsx_msg:notify(["subscription", "user", UserName, "remove_from_group"], {GId}) || GId <- GIds],
+                      [mqs:notify(["subscription", "user", UserName, "remove_from_group"], {GId}) || GId <- GIds],
                       F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(User) ],
                       F2U = [ {MeId, FrId} || #subscription{who = MeId, whom = FrId} <- subscriptions(User) ],
                       [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
                       [ unsubscribe(MeId, FrId) || {MeId, FrId} <- F2U ],
                       [ unsubscribe(FrId, MeId) || {MeId, FrId} <- F2U ],
                       [ unsubscribe(FrId, MeId) || {MeId, FrId} <- F2U ],
@@ -112,7 +112,7 @@ subscription_mq(Type, Action, MeId, ToId) ->
 init_mq(User=#user{}) ->
 init_mq(User=#user{}) ->
     Groups = groups:list_groups_per_user(User),
     Groups = groups:list_groups_per_user(User),
     ?INFO("~p init mq. users: ~p", [User, Groups]),
     ?INFO("~p init mq. users: ~p", [User, Groups]),
-    UserExchange = ?USER_EXCHANGE(User),
+    UserExchange = ?USER_EXCHANGE(User#user.username),
     ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
     ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
     {ok, Channel} = mqs:open([]),
     {ok, Channel} = mqs:open([]),
     ?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
     ?INFO("Cration Exchange: ~p,",[{Channel,UserExchange,ExchangeOptions}]),
@@ -122,7 +122,7 @@ init_mq(User=#user{}) ->
     mqs_channel:close(Channel);
     mqs_channel:close(Channel);
 
 
 init_mq(Group=#group{}) ->
 init_mq(Group=#group{}) ->
-    GroupExchange = ?GROUP_EXCHANGE(Group),
+    GroupExchange = ?GROUP_EXCHANGE(Group#group.username),
     ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
     ExchangeOptions = [{type, <<"fanout">>}, durable, {auto_delete, false}],
     {ok, Channel} = mqs:open([]),
     {ok, Channel} = mqs:open([]),
     ok = mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),
     ok = mqs_channel:create_exchange(Channel, GroupExchange, ExchangeOptions),

+ 2 - 2
src/membership_packages.erl

@@ -59,7 +59,7 @@ add_purchase(#membership_purchase{} = MP, State0, Info) ->
                                               state_log = StateLog},
                                               state_log = StateLog},
 
 
             %% notify about purchase added
             %% notify about purchase added
-%            nsx_msg:notify_purchase(Purchase),
+%            mqs:notify_purchase(Purchase),
 
 
             ?INFO("Purchase added ~p ~p",[Purchase#membership_purchase.user_id, Purchase]),
             ?INFO("Purchase added ~p ~p",[Purchase#membership_purchase.user_id, Purchase]),
 
 
@@ -91,7 +91,7 @@ set_purchase_state(MPId, NewState, Info) ->
                                       state_log = NewStateLog},
                                       state_log = NewStateLog},
 
 
     %% notify aboput state change
     %% notify aboput state change
-%    nsx_msg:notify_purchase(Purchase),
+%    mqs:notify_purchase(Purchase),
     NewMP=MP#membership_purchase{state = NewState,
     NewMP=MP#membership_purchase{state = NewState,
                                          end_time = EndTime,
                                          end_time = EndTime,
                                          state_log = NewStateLog},
                                          state_log = NewStateLog},

+ 8 - 9
src/store_riak.erl

@@ -13,15 +13,12 @@
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -compile(export_all).
 -compile(export_all).
 
 
--define(BUCKET_INDEX, "bucket_bin").
--define(MD_INDEX, <<"index">>).
-
 delete() -> ok.
 delete() -> ok.
 start() -> ok.
 start() -> ok.
 stop() -> stopped.
 stop() -> stopped.
 
 
 initialize() ->
 initialize() ->
-    C = riak:client_connect("n2o@127.0.0.1"),
+    C = riak:client_connect(node()),
     ets:new(config, [named_table,{keypos,#config.key}]),
     ets:new(config, [named_table,{keypos,#config.key}]),
     ets:insert(config, #config{ key = "riak_client", value = C}),
     ets:insert(config, #config{ key = "riak_client", value = C}),
     ok.
     ok.
@@ -57,16 +54,17 @@ make_object(T) ->
     Bucket = element(1,T),
     Bucket = element(1,T),
     Key = element(2,T),
     Key = element(2,T),
     Obj1 = riak_object:new(key_to_bin(Bucket), key_to_bin(Key), T),
     Obj1 = riak_object:new(key_to_bin(Bucket), key_to_bin(Key), T),
-    Indices = [{?BUCKET_INDEX, Bucket} | make_indices(T)], %% Usefull only for level_db buckets
-    Meta = dict:store(?MD_INDEX, Indices, dict:new()),
+    Indices = [<<"index">>|make_indices(T)], %% Usefull only for level_db buckets
+    Meta = dict:store("index", Indices, dict:new()),
     Obj2 = riak_object:update_metadata(Obj1, Meta),
     Obj2 = riak_object:update_metadata(Obj1, Meta),
+    error_logger:info_msg("RIAK PUT IDX ~p",[Indices]),
     Obj2.
     Obj2.
 
 
 make_indices(#subscription{who=Who, whom=Whom}) -> [{<<"subs_who_bin">>, key_to_bin(Who)}, {<<"subs_whom_bin">>, key_to_bin(Whom)}];
 make_indices(#subscription{who=Who, whom=Whom}) -> [{<<"subs_who_bin">>, key_to_bin(Who)}, {<<"subs_whom_bin">>, key_to_bin(Whom)}];
 make_indices(#group_subscription{user_id=UId, group_id=GId}) -> [{<<"group_subs_user_bin">>, key_to_bin(UId)}, {<<"group_subs_group_bin">>, key_to_bin(GId)}];
 make_indices(#group_subscription{user_id=UId, group_id=GId}) -> [{<<"group_subs_user_bin">>, key_to_bin(UId)}, {<<"group_subs_group_bin">>, key_to_bin(GId)}];
 make_indices(#user_bought_gifts{username=UId}) -> [{<<"user_bought_gifts_username_bin">>, key_to_bin(UId)}];
 make_indices(#user_bought_gifts{username=UId}) -> [{<<"user_bought_gifts_username_bin">>, key_to_bin(UId)}];
-make_indices(#user{username=UId,zone=Zone}) -> [{<<"user_bin">>, key_to_bin(UId)},{<<"user_zone_bin">>, key_to_bin(Zone)}];
-make_indices(_Record) -> [].
+make_indices(#user{username=UId,zone=Zone}) -> [{<<"user_bin">>, key_to_bin(UId)}];
+make_indices(Record) -> [{key_to_bin(atom_to_list(element(1,Record))++"_bin"),key_to_bin(element(2,Record))}].
 
 
 riak_client() -> [{_,_,{_,C}}] = ets:lookup(config, "riak_client"), C.
 riak_client() -> [{_,_,{_,C}}] = ets:lookup(config, "riak_client"), C.
 
 
@@ -76,7 +74,8 @@ put(Record) -> store_riak:put([Record]).
 riak_put(Record) ->
 riak_put(Record) ->
     Object = make_object(Record),
     Object = make_object(Record),
     Riak = riak_client(),
     Riak = riak_client(),
-    Result = Riak:put(Object, [{allow_mult,false},{last_write_wins,true}]),
+    Result = Riak:put(Object),
+    error_logger:info_msg("RIAK PUT RES ~p",[Result]),
     post_write_hooks(Record, Riak),
     post_write_hooks(Record, Riak),
     Result.
     Result.