Namdak Tonpa 4 лет назад
Родитель
Сommit
72e187c8d0
4 измененных файлов с 76 добавлено и 86 удалено
  1. 1 1
      include/api.hrl
  2. 2 0
      include/cursors.hrl
  3. 67 79
      src/kvs.erl
  4. 6 6
      src/stores/kvs_st.erl

+ 1 - 1
include/api.hrl

@@ -1,6 +1,6 @@
 -ifndef(API_HRL).
 -define(API_HRL, true).
--define(API,[start/0,stop/0,leave/0,leave/1,destroy/0,
+-define(API,[start/0,stop/0,leave/0,leave/1,destroy/0,feed/2,
              join/0,join/1,modules/0,cursors/0,get/2,get/3,put/1,put/2,index/3,delete/2,
              table/1,tables/0,dir/0,initialize/2,seq/2,all/1,all/2,count/1,ver/0]).
 -include("metainfo.hrl").

+ 2 - 0
include/cursors.hrl

@@ -6,6 +6,7 @@
                   cache = [] :: [] | {term(),term()} | {term(),term(),term()},
                   args  = [] :: [] | term(),
                   first = [] :: [] | tuple() } ).
+
 -record(reader, { id    = [] :: [] | integer(),
                 pos   =  0 :: integer() | atom(),
                 cache = [] :: [] | {term(),term()} | {term(),term(),term()},
@@ -14,4 +15,5 @@
                 seek = [] :: term(),
                 count = 0 :: integer(),
                 dir   =  0 :: 0 | 1 } ).
+
 -endif.

+ 67 - 79
src/kvs.erl

@@ -1,7 +1,6 @@
 -module(kvs).
 -behaviour(application).
 -behaviour(supervisor).
--description('KVS Abstract Chain Store').
 -include_lib("stdlib/include/assert.hrl").
 -include("api.hrl").
 -include("metainfo.hrl").
@@ -9,44 +8,23 @@
 -include("cursors.hrl").
 -include("kvs.hrl").
 -include("backend.hrl").
--export([dump/0,metainfo/0,ensure/1,seq_gen/0,fold/6,fold/7,head/1,head/2,feed/2,fields/1,defined/2,field/2,setfield/3,cut/2]).
+-export([dump/0,metainfo/0,ensure/1,seq_gen/0,fields/1,defined/2,field/2,setfield/3,cut/2]).
 -export(?API).
 -export(?STREAM).
 -export([init/1, start/2, stop/1]).
 -record('$msg', {id,next,prev,user,msg}).
 
-init([]) -> {ok, { {one_for_one, 5, 10}, []} }.
-start(_,_) -> supervisor:start_link({local, kvs}, kvs, []).
-stop(_) -> ok.
-test_tabs() -> [ #table{name='$msg', fields=record_info(fields,'$msg')} ].
-
-fields(Table) when is_atom(Table) ->
-  case table(Table) of
-    false -> [];
-    T -> T#table.fields
-  end.
-
-defined(TableRecord, Field) ->
-  FieldsList = fields(element(1, TableRecord)),
-  lists:member(Field, FieldsList).
-
-field(TableRecord, Field) ->
-  FieldsList = fields(element(1, TableRecord)),
-  Index = string:str(FieldsList, [Field]) + 1,
-  element(Index, TableRecord).
-
-setfield(TableRecord, Field, Value) ->
-  FieldsList = fields(element(1, TableRecord)),
-  Index = string:str(FieldsList, [Field]) + 1,
-  setelement(Index, TableRecord, Value).
+init([])     -> {ok, { {one_for_one, 5, 10}, []} }.
+start(_,_)   -> supervisor:start_link({local, kvs}, kvs, []).
+stop(_)      -> ok.
+dba()        -> application:get_env(kvs,dba,kvs_mnesia).
+kvs_stream() -> application:get_env(kvs,dba_st,kvs_stream).
 
 % kvs api
 
-dba()              -> application:get_env(kvs,dba,kvs_mnesia).
-kvs_stream()       -> application:get_env(kvs,dba_st,kvs_stream).
 all(Table)         -> all     (Table, #kvs{mod=dba()}).
 delete(Table,Key)  -> delete  (Table, Key, #kvs{mod=dba()}).
-get(Table,Key)     -> ?MODULE:get     (Table, Key, #kvs{mod=dba()}).
+get(Table,Key)     -> ?MODULE:get (Table, Key, #kvs{mod=dba()}).
 index(Table,K,V)   -> index   (Table, K,V, #kvs{mod=dba()}).
 join()             -> join    ([],    #kvs{mod=dba()}).
 dump()             -> dump    (#kvs{mod=dba()}).
@@ -54,14 +32,34 @@ join(Node)         -> join    (Node,  #kvs{mod=dba()}).
 leave()            -> leave   (#kvs{mod=dba()}).
 destroy()          -> destroy (#kvs{mod=dba()}).
 count(Table)       -> count   (Table, #kvs{mod=dba()}).
-put(Record)        -> ?MODULE:put     (Record, #kvs{mod=dba()}).
-fold(Fun,Acc,T,S,C,D) -> fold (Fun,Acc,T,S,C,D, #kvs{mod=dba()}).
+put(Record)        -> ?MODULE:put (Record, #kvs{mod=dba()}).
 stop()             -> stop_kvs(#kvs{mod=dba()}).
 start()            -> start   (#kvs{mod=dba()}).
 ver()              -> ver(#kvs{mod=dba()}).
 dir()              -> dir     (#kvs{mod=dba()}).
 feed(Key)          -> feed    (Key, #kvs{mod=dba(),st=kvs_stream()}).
 seq(Table,DX)      -> seq     (Table, DX, #kvs{mod=dba()}).
+remove(Rec,Feed)   -> remove  (Rec, Feed, #kvs{mod=dba(),st=kvs_stream()}).
+
+put(Records,#kvs{mod=Mod}) when is_list(Records) -> Mod:put(Records);
+put(Record,#kvs{mod=Mod})            -> Mod:put(Record).
+get(RecordName, Key, #kvs{mod=Mod})  -> Mod:get(RecordName, Key).
+delete(Tab, Key, #kvs{mod=Mod})      -> Mod:delete(Tab, Key).
+count(Tab,#kvs{mod=DBA})             -> DBA:count(Tab).
+index(Tab, Key, Value,#kvs{mod=DBA}) -> DBA:index(Tab, Key, Value).
+seq(Tab, Incr,#kvs{mod=DBA})         -> DBA:seq(Tab, Incr).
+dump(#kvs{mod=Mod})                  -> Mod:dump().
+feed(Tab,#kvs{st=Mod})               -> Mod:feed(Tab).
+remove(Rec,Feed, #kvs{st=Mod})       -> Mod:remove(Rec,Feed).
+
+all(Tab,#kvs{mod=DBA})   -> DBA:all(Tab).
+start(#kvs{mod=DBA})     -> DBA:start().
+stop_kvs(#kvs{mod=DBA})  -> DBA:stop().
+join(Node,#kvs{mod=DBA}) -> DBA:join(Node).
+leave(#kvs{mod=DBA})     -> DBA:leave().
+destroy(#kvs{mod=DBA})   -> DBA:destroy().
+ver(#kvs{mod=DBA})       -> DBA:version().
+dir(#kvs{mod=DBA})       -> DBA:dir().
 
 % stream api
 
@@ -74,54 +72,32 @@ take (X) -> (kvs_stream()):take(X).
 save (X) -> (kvs_stream()):save(X).
 cut  (X,Y) -> (kvs_stream()):cut (X,Y).
 add  (X) -> (kvs_stream()):add (X).
-append  (X, Y) -> (kvs_stream()):append (X, Y).
+append   (X, Y) -> (kvs_stream()):append (X, Y).
 load_reader (X) -> (kvs_stream()):load_reader(X).
 writer      (X) -> (kvs_stream()):writer(X).
 reader      (X) -> (kvs_stream()):reader(X).
+
+% unrevisited
+
 ensure(#writer{id=Id}) ->
    case kvs:get(writer,Id) of
         {error,_} -> kvs:save(kvs:writer(Id)), ok;
         {ok,_}    -> ok end.
 
-metainfo() ->  #schema { name = kvs, tables = core() ++ test_tabs() }.
-core()    -> [ #table { name = id_seq, fields = record_info(fields,id_seq), keys=[thing]} ].
-
-initialize(Backend, Module) ->
-    [ begin
-        Backend:create_table(T#table.name, [{attributes,T#table.fields},
-               {T#table.copy_type, [node()]},{type,T#table.type}]),
-        [ Backend:add_table_index(T#table.name, Key) || Key <- T#table.keys ],
-        T
-    end || T <- (Module:metainfo())#schema.tables ].
-
-all(Tab,#kvs{mod=DBA}) -> DBA:all(Tab).
-start(#kvs{mod=DBA}) -> DBA:start().
-stop_kvs(#kvs{mod=DBA}) -> DBA:stop().
-join(Node,#kvs{mod=DBA}) -> DBA:join(Node).
-leave(#kvs{mod=DBA}) -> DBA:leave().
-destroy(#kvs{mod=DBA}) -> DBA:destroy().
-ver(#kvs{mod=DBA}) -> DBA:version().
-tables() -> lists:flatten([ (M:metainfo())#schema.tables || M <- modules() ]).
-table(Name) when is_atom(Name) -> lists:keyfind(Name,#table.name,tables());
-table(_) -> false.
-dir(#kvs{mod=DBA}) -> DBA:dir().
-modules() -> application:get_env(kvs,schema,[]).
 cursors() ->
     lists:flatten([ [ {T#table.name,T#table.fields}
         || #table{name=Name}=T <- (M:metainfo())#schema.tables, Name == reader orelse Name == writer  ]
     || M <- modules() ]).
 
-fold(___,Acc,_,[],_,_,_) -> Acc;
-fold(___,Acc,_,undefined,_,_,_) -> Acc;
-fold(___,Acc,_,_,0,_,_) -> Acc;
-fold(Fun,Acc,Table,Start,Count,Direction,Driver) ->
-    try
-    case kvs:get(Table, Start, Driver) of
-         {ok, R} -> Prev = element(Direction, R),
-                    Count1 = case Count of C when is_integer(C) -> C - 1; _-> Count end,
-                    fold(Fun, Fun(R,Acc), Table, Prev, Count1, Direction, Driver);
-          _Error -> Acc
-    end catch _:_ -> Acc end.
+% metainfo api
+
+tables()    -> lists:flatten([ (M:metainfo())#schema.tables || M <- modules() ]).
+modules()   -> application:get_env(kvs,schema,[]).
+metainfo()  ->  #schema { name = kvs, tables = core() ++ test_tabs() }.
+core()      -> [ #table { name = id_seq, fields = record_info(fields,id_seq), keys=[thing]} ].
+test_tabs() -> [ #table{name='$msg', fields=record_info(fields,'$msg')} ].
+table(Name) when is_atom(Name) -> lists:keyfind(Name,#table.name,tables());
+table(_)    -> false.
 
 seq_gen() ->
     Init = fun(Key) ->
@@ -130,16 +106,28 @@ seq_gen() ->
                 {ok, _} -> {Key,skip} end end,
     [ Init(atom_to_list(Name))  || {Name,_Fields} <- cursors() ].
 
-put(Records,#kvs{mod=Mod}) when is_list(Records) -> Mod:put(Records);
-put(Record,#kvs{mod=Mod}) -> Mod:put(Record).
-get(RecordName, Key, #kvs{mod=Mod}) -> Mod:get(RecordName, Key).
-delete(Tab, Key, #kvs{mod=Mod}) -> Mod:delete(Tab, Key).
-count(Tab,#kvs{mod=DBA}) -> DBA:count(Tab).
-index(Tab, Key, Value,#kvs{mod=DBA}) -> DBA:index(Tab, Key, Value).
-seq(Tab, Incr,#kvs{mod=DBA}) -> DBA:seq(Tab, Incr).
-dump(#kvs{mod=Mod}) -> Mod:dump().
-feed(Tab,#kvs{st=Mod}) -> Mod:feed(Tab).
-remove(Rec,Feed) -> remove(Rec,Feed,#kvs{mod=dba(),st=kvs_stream()}).
-remove(Rec,Feed, #kvs{st=Mod}) -> Mod:remove(Rec,Feed).
-head(Key) -> case (kvs:take((kvs:reader(Key))#reader{args=1}))#reader.args of [X] -> X; [] -> [] end.
-head(Key,Count) -> (kvs:take((kvs:reader(Key))#reader{args=Count,dir=1}))#reader.args.
+initialize(Backend, Module) ->
+    [ begin
+        Backend:create_table(T#table.name, [{attributes,T#table.fields},
+               {T#table.copy_type, [node()]},{type,T#table.type}]),
+        [ Backend:add_table_index(T#table.name, Key) || Key <- T#table.keys ],
+        T
+    end || T <- (Module:metainfo())#schema.tables ].
+
+fields(Table) when is_atom(Table) ->
+  case table(Table) of false -> [];
+    T -> T#table.fields end.
+
+defined(TableRecord, Field) ->
+  FieldsList = fields(element(1, TableRecord)),
+  lists:member(Field, FieldsList).
+
+field(TableRecord, Field) ->
+  FieldsList = fields(element(1, TableRecord)),
+  Index = string:str(FieldsList, [Field]) + 1,
+  element(Index, TableRecord).
+
+setfield(TableRecord, Field, Value) ->
+  FieldsList = fields(element(1, TableRecord)),
+  Index = string:str(FieldsList, [Field]) + 1,
+  setelement(Index, TableRecord, Value).

+ 6 - 6
src/stores/kvs_st.erl

@@ -1,13 +1,10 @@
 -module(kvs_st).
--description('KVS STREAM NATIVE ROCKS').
 -include("kvs.hrl").
 -include("stream.hrl").
 -include("metainfo.hrl").
 -export(?STREAM).
 -import(kvs_rocks, [key/2, key/1, bt/1, tb/1, ref/0, fd/1, seek_it/1, move_it/3, take_it/4]).
 
-% section: kvs_stream prelude
-
 se(X,Y,Z) -> setelement(X,Y,Z).
 e(X,Y) -> element(X,Y).
 c4(R,V) -> se(#reader.args,  R, V).
@@ -56,10 +53,15 @@ save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
 
 % add
 
+raw_append(M,Feed) -> rocksdb:put(ref(), key(Feed,M), term_to_binary(M), [{sync,true}]).
+
 add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
 add(#writer{args=M}=C) -> add(M,C).
 
-add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache={e(1,M),e(2,M),key(Feed)},count=NS}.
+add(M,#writer{id=Feed,count=S}=C) ->
+   NS=S+1,
+   raw_append(M,Feed),
+   C#writer{cache={e(1,M),e(2,M),key(Feed)},count=NS}.
 
 remove(Rec,Feed) ->
    kvs:ensure(#writer{id=Feed}),
@@ -71,8 +73,6 @@ remove(Rec,Feed) ->
               Count;
          _ -> C end.
 
-raw_append(M,Feed) -> rocksdb:put(ref(), key(Feed,M), term_to_binary(M), [{sync,true}]).
-
 append(Rec,Feed) ->
    kvs:ensure(#writer{id=Feed}),
    Id = e(2,Rec),