Namdak Tonpa 10 лет назад
Родитель
Сommit
2e06582070
10 измененных файлов с 57 добавлено и 53 удалено
  1. 7 5
      src/kvs.erl
  2. 2 2
      src/kvs_acl.erl
  3. 1 1
      src/kvs_feed.erl
  4. 0 2
      src/kvs_sup.erl
  5. 2 2
      src/kvs_user.erl
  6. 24 26
      src/store_fs.erl
  7. 3 3
      src/store_kai.erl
  8. 16 10
      src/store_mnesia.erl
  9. 1 1
      src/store_redis.erl
  10. 1 1
      src/store_riak.erl

+ 7 - 5
src/kvs.erl

@@ -59,6 +59,8 @@ add(Record) when is_tuple(Record) ->
 
     Id = element(#iterator.id, Record),
 
+    kvs:get(element(1,Record), Id),
+
     case kvs:get(element(1,Record), Id) of
         {error, not_found} ->
 
@@ -87,7 +89,7 @@ add(Record) when is_tuple(Record) ->
                     Next = undefined,
                     Prev = case element(#container.top, Container) of
                         undefined -> undefined;
-                        Tid -> 
+                        Tid ->
                             case kvs:get(Type, Tid) of
                             {error, not_found} -> undefined;
                             {ok, Top} ->
@@ -197,7 +199,7 @@ traversal(RecordType2, Start, Count, Direction)->
     {ok, R} ->  Prev = element(Direction, R),
                 Count1 = case Count of C when is_integer(C) -> C - 1; _-> Count end,
                 [R | traversal(RecordType2, Prev, Count1, Direction)];
-    Error -> 
+    Error ->
      io:format("Error: ~p~n",[Error]),
       [] end.
 
@@ -228,14 +230,14 @@ table_type(A) -> A.
 
 range(RecordName,Id) -> Ranges = kvs:config(RecordName), find(Ranges,RecordName,Id).
 
-find([],_,Id) -> [];
-find([Range|T],RecordName,Id) -> 
+find([],_,_Id) -> [];
+find([Range|T],RecordName,Id) ->
      case lookup(Range,Id) of
           [] -> find(T,RecordName,Id);
           Name -> Name end.
 
 lookup(#interval{left=Left,right=Right,name=Name},Id) when Id =< Right, Id >= Left -> Name;
-lookup(#interval{},Id) -> [].
+lookup(#interval{},_Id) -> [].
 
 get(RecordName, Key) ->
     DBA=?DBA,

+ 2 - 2
src/kvs_acl.erl

@@ -8,13 +8,13 @@
 -include("group.hrl").
 -include("feed.hrl").
 
-metainfo() -> 
+metainfo() ->
     #schema{name=kvs,tables=[
         #table{name=acl,container=true,fields=record_info(fields,acl),keys=[id,accessor]},
         #table{name=access,container=acl,fields=record_info(fields,access)}
     ]}.
 
-define_access(Accessor, Resource, Action) -> 
+define_access(Accessor, Resource, Action) ->
     Entry = #access{ id={Accessor, Resource}, accessor=Accessor, action=Action},
     case kvs:add(Entry) of
         {error, exist} -> kvs:put(Entry#access{action=Action});

+ 1 - 1
src/kvs_feed.erl

@@ -8,7 +8,7 @@
 -include("metainfo.hrl").
 -include("state.hrl").
 
-metainfo() -> 
+metainfo() ->
     #schema{name=kvs,tables=[
         #table{name=entry,container=true,fields=record_info(fields,entry)},
         #table{name=comment,container=true,fields=record_info(fields,comment)},

+ 0 - 2
src/kvs_sup.erl

@@ -8,6 +8,4 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
 
-    kvs:start(),
-
     {ok, { {one_for_one, 5, 10}, []} }.

+ 2 - 2
src/kvs_user.erl

@@ -6,7 +6,7 @@
 -include("metainfo.hrl").
 -compile(export_all).
 
-metainfo() -> 
+metainfo() ->
     #schema{name=kvs,tables=[
         #table{name=user2,container=feed,fields=record_info(fields,user2)},
         #table{name=user,container=feed,fields=record_info(fields,user),
@@ -21,6 +21,6 @@ handle_notice([kvs_user, user, Owner, delete], [#user{}=U], #state{owner=Owner}=
     kvs:info(?MODULE,"[kvs_user] delete user: ~p", [U]),
     {noreply, State};
 
-handle_notice(_Route, _Message, State) -> 
+handle_notice(_Route, _Message, State) ->
     kvs:info(?MODULE,"[kvs_user] unknown notice."),
     {noreply, State}.

+ 24 - 26
src/store_fs.erl

@@ -9,8 +9,8 @@ start()    -> ok.
 stop()     -> ok.
 destroy()  -> ok.
 version()  -> {version,"KVS FS"}.
-dir()      -> filelib:fold_files("data", "",true, fun(A,Acc)-> [{table,A}|Acc] end, []).
-join()     -> ensure_dir("data").
+dir()      -> [ {table,F} || F <- filelib:wildcard("data/*"), filelib:is_dir(F) ].
+join()     -> filelib:ensure_dir("data/").
 join(Node) -> ok. % should be rsync or smth
 change_storage(Table,Type) -> ok.
 
@@ -20,30 +20,28 @@ initialize() ->
     [ kvs:init(store_fs,Module) || Module <- kvs:modules() ],
     mnesia:wait_for_tables([ T#table.name || T <- kvs:tables()],infinity).
 
-index(Tab,Key,Value) ->
-    Table = kvs:table(Tab),
-    Index = string:str(Table#table.fields,[Key]),
-    lists:flatten(many(fun() -> mnesia:index_read(Tab,Value,Index+1) end)).
+index(Tab,Key,Value) -> ok.
+get(TableName, Key) ->
+    HashKey = wf:url_encode(base64:encode(crypto:sha(term_to_binary(Key)))),
+    Dir = lists:concat(["data/",TableName,"/"]),
+    case file:read_file(lists:concat([Dir,HashKey])) of
+         {ok,Binary} -> {ok,binary_to_term(Binary,[safe])};
+         {error,Reason} -> {error,Reason} end.
 
-get(RecordName, Key) -> just_one(fun() -> mnesia:read(RecordName, Key) end).
-put(Records) when is_list(Records) -> void(fun() -> lists:foreach(fun mnesia:write/1, Records) end);
-put(Record) -> put([Record]).
-delete(Tab, Key) ->
-    case mnesia:transaction(fun()-> mnesia:delete({Tab, Key}) end) of
-        {aborted,Reason} -> {error,Reason};
-        {atomic,_Result} -> ok end.
+put(Records) when is_list(Records) -> lists:map(fun(Record) -> put(Record) end, Records);
+put(Record) ->
+    TableName = element(1,Record),
+    HashKey = wf:url_encode(base64:encode(crypto:sha(term_to_binary(element(2,Record))))),
+    BinaryValue = term_to_binary(Record),
+    Dir = lists:concat(["data/",TableName,"/"]),
+    filelib:ensure_dir(Dir),
+    File = lists:concat([Dir,HashKey]),
+    io:format("File: ~p~n",[File]),
+    file:write_file(File,BinaryValue,[write,raw,binary,exclusive,sync]).
+
+delete(Tab, Key) -> ok.
 count(RecordName) -> length(filelib:fold_files(lists:concat(["data/",RecordName]), "",true, fun(A,Acc)-> [A|Acc] end, [])).
-all(R) -> filelib:fold_files(lists:concat(["data/",RecordName]), "",true, fun(A,Acc)-> [A|Acc] end, []).
+all(R) -> filelib:fold_files(lists:concat(["data/",R]), "",true, fun(A,Acc)-> [A|Acc] end, []).
 next_id(RecordName, Incr) -> mnesia:dirty_update_counter({id_seq, RecordName}, Incr).
-create_table(Name,Options) ->
-    X = mnesia:create_table(Name, Options),
-    kvs:info("Create table ~p ~nOptions ~p~nReturn ~p~n",[Name, Options,X]),
-    X.
-add_table_index(Record, Field) -> mnesia:add_table_index(Record, Field).
-exec(Q) -> F = fun() -> qlc:e(Q) end, {atomic, Val} = mnesia:transaction(F), Val.
-just_one(Fun) ->
-    case mnesia:transaction(Fun) of
-        {atomic, []} -> {error, not_found};
-        {atomic, [R]} -> {ok, R};
-        {atomic, [_|_]} -> {error, duplicated};
-        Error -> Error end.
+create_table(Name,Options) -> filelib:ensure_dir(lists:concat(["data/",Name,"/"])).
+add_table_index(Record, Field) -> ok.

+ 3 - 3
src/store_kai.erl

@@ -11,7 +11,7 @@ start() -> kai:start(), ok.
 stop() -> kai_store:stop(), ok.
 version() -> {version,"KVS KAI"}.
 join() -> initialize(), ok.
-join(Node) -> initialize(), ok.
+join(_Node) -> initialize(), ok.
 initialize() -> ok.
 dir() -> [{table,T}||T<-kvs:modules()].
 
@@ -24,7 +24,7 @@ kai_put(Record) ->
         vector_clocks = vclock:fresh(), value = Record },
     kai_store:put(Data).
 
-update(Record, Object) -> ok.
+update(_Record, _Object) -> ok.
 
 get(Tab, Key) ->
     Data = #data{key=Key,bucket=table_to_num(Tab)},
@@ -56,7 +56,7 @@ all(RecordName) ->
     {list_of_data,List} = kai_store:list(table_to_num(RecordName)),
     [ begin {ok,Val}=kai_get(Data),Val end || Data <- List ].
 
-all_by_index(Tab, IndexId, IndexVal) -> [].
+all_by_index(_Tab, _IndexId, _IndexVal) -> [].
 
 table_to_num(user)         -> 10;
 table_to_num(subscription) -> 11;

+ 16 - 10
src/store_mnesia.erl

@@ -13,7 +13,7 @@ dir()      -> [{table,T}||T<-mnesia:system_info(local_tables)].
 join()     -> mnesia:change_table_copy_type(schema, node(), disc_copies), initialize().
 join(Node) ->
     mnesia:change_config(extra_db_nodes, [Node]),
-    mnesia:change_table_copy_type(schema, node(), disc_copies),
+    mnesia:change_table_copy_type(schema, node(), kvs:config(kvs,mnesia_media,disc_copies)),
     [{Tb, mnesia:add_table_copy(Tb, node(), Type)}
      || {Tb, [{N, Type}]} <- [{T, mnesia:table_info(T, where_to_commit)}
                                || T <- mnesia:system_info(tables)], Node==N].
@@ -21,7 +21,7 @@ join(Node) ->
 change_storage(Table,Type) -> mnesia:change_table_copy_type(Table, node(), Type).
 
 initialize() ->
-    kvs:info(?MODULE,"[store_mnesia] mnesia init.~n",[]),
+    kvs:info(?MODULE,"mnesia init.~n",[]),
     mnesia:create_schema([node()]),
     [ kvs:init(store_mnesia,Module) || Module <- kvs:modules() ],
     mnesia:wait_for_tables([ T#table.name || T <- kvs:tables()],infinity).
@@ -35,23 +35,29 @@ get(RecordName, Key) -> just_one(fun() -> mnesia:read(RecordName, Key) end).
 put(Records) when is_list(Records) -> void(fun() -> lists:foreach(fun mnesia:write/1, Records) end);
 put(Record) -> put([Record]).
 delete(Tab, Key) ->
-    case mnesia:transaction(fun()-> mnesia:delete({Tab, Key}) end) of
+    case mnesia:activity(context(),fun()-> mnesia:delete({Tab, Key}) end) of
         {aborted,Reason} -> {error,Reason};
-        {atomic,_Result} -> ok end.
+        {atomic,_Result} -> ok;
+        X -> X end.
 count(RecordName) -> mnesia:table_info(RecordName, size).
 all(R) -> lists:flatten(many(fun() -> L= mnesia:all_keys(R), [ mnesia:read({R, G}) || G <- L ] end)).
 next_id(RecordName, Incr) -> mnesia:dirty_update_counter({id_seq, RecordName}, Incr).
-many(Fun) -> case mnesia:transaction(Fun) of {atomic, R} -> R; _ -> [] end.
-void(Fun) -> case mnesia:transaction(Fun) of {atomic, ok} -> ok; {aborted, Error} -> {error, Error} end.
-create_table(Name,Options) -> 
+many(Fun) -> case mnesia:activity(context(),Fun) of {atomic, R} -> R; X -> X end.
+void(Fun) -> case mnesia:activity(context(),Fun) of {atomic, ok} -> ok; {aborted, Error} -> {error, Error}; X -> X end.
+create_table(Name,Options) ->
     X = mnesia:create_table(Name, Options),
-    kvs:info("Create table ~p ~nOptions ~p~nReturn ~p~n",[Name, Options,X]),
+    kvs:info(?MODULE,"Create table ~p ~nOptions ~p~nReturn ~p~n",[Name, Options,X]),
     X.
 add_table_index(Record, Field) -> mnesia:add_table_index(Record, Field).
-exec(Q) -> F = fun() -> qlc:e(Q) end, {atomic, Val} = mnesia:transaction(F), Val.
+exec(Q) -> F = fun() -> qlc:e(Q) end, {atomic, Val} = mnesia:activity(context(),F), Val.
 just_one(Fun) ->
-    case mnesia:transaction(Fun) of
+    case mnesia:activity(context(),Fun) of
         {atomic, []} -> {error, not_found};
         {atomic, [R]} -> {ok, R};
         {atomic, [_|_]} -> {error, duplicated};
+        [] -> {error, not_found};
+        [R] -> {ok,R};
+        [_|_] -> {error, duplicated};
         Error -> Error end.
+
+context() -> kvs:config(kvs,mnesia_context,async_dirty).

+ 1 - 1
src/store_redis.erl

@@ -63,7 +63,7 @@ delete(RecordName,Key) ->
         {ok,<<"1">>} -> ok;
         E -> {error, E} end.
 count(RecordName) -> length(redis_keys(RecordName)).
-all(RecordName) -> 
+all(RecordName) ->
     Keys = redis_keys(RecordName),
     List = redis_transaction(fun() -> [redis_get(Key) || Key <- Keys] end),
     case RecordName of

+ 1 - 1
src/store_riak.erl

@@ -143,7 +143,7 @@ next_id(CounterId, Incr) -> next_id(CounterId, 0, Incr).
 next_id(CounterId, Default, Incr) ->
     {ok,C}=riak:local_client(),
     CounterBin = key_to_bin(CounterId),
-    {Object, Value, Options} =
+    {Object, Value, _Options} =
         case C:get(key_to_bin(id_seq), CounterBin) of
             {ok, CurObj} ->
                 R = #id_seq{id = CurVal} = riak_object:get_value(CurObj),