SunRiseGG 3 years ago
parent
commit
f5fce08379
7 changed files with 43 additions and 40 deletions
  1. 1 1
      include/stream.hrl
  2. 3 1
      src/kvs.erl
  3. 21 21
      src/layers/kvs_stream.erl
  4. 1 1
      src/stores/kvs_fs.erl
  5. 1 1
      src/stores/kvs_mnesia.erl
  6. 1 1
      src/stores/kvs_rocks.erl
  7. 15 14
      src/stores/kvs_st.erl

+ 1 - 1
include/stream.hrl

@@ -3,7 +3,7 @@
 -include("kvs.hrl").
 -include("cursors.hrl").
 -define(STREAM, [top/1, top/2, bot/1, bot/2, next/1, next/2, prev/1, prev/2, drop/1, drop/2, take/1, take/2, append/2, append/3, feed/1, feed/2,
-                 load_reader/1, load_reader/2, writer/1, writer/2, reader/1, reader/2, save/1, add/1, add/2, remove/2, remove/3]).
+                 load_reader/1, load_reader/2, writer/1, writer/2, reader/1, reader/2, save/1, save/2, add/1, add/2, remove/2, remove/3]).
 
 -spec top(#reader{})  -> #reader{}.
 -spec bot(#reader{})  -> #reader{}.

+ 3 - 1
src/kvs.erl

@@ -168,6 +168,8 @@ take(X,#kvs{db = Db})        -> (kvs_stream()):take(X,Db).
 
 save(X)                      -> (kvs_stream()):save(X).
 
+save(X,#kvs{db = Db})        -> (kvs_stream()):save(X,Db).
+
 cut(X, Y)                    -> (kvs_stream()):cut(X, Y).
 
 add(X)                       -> (kvs_stream()):add(X).
@@ -192,7 +194,7 @@ reader(X,#kvs{db = Db})      -> (kvs_stream()):reader(X,Db).
 
 % unrevisited
 
-ensure(#writer{} = X) -> ensure(X,#kvs{db = db()}).
+ensure(#writer{} = X) -> ensure(X,#kvs{mod = dba(), db = db()}).
 ensure(#writer{id = Id},#kvs{} = X) ->
     case kvs:get(writer, Id, X) of
         {error, _} ->

+ 21 - 21
src/layers/kvs_stream.erl

@@ -29,11 +29,11 @@ ep(T)   -> e(#iter.prev, T).
 acc(0)  -> next;
 acc(1)  -> prev.
 
-n({ok,R},C,P)    -> r(kvs:get(tab(R),en(R)),C,P);
-n({error,X},_,_) -> {error,X}.
+n({ok,R},C,P,Db)   -> r(kvs:get(tab(R),en(R),#kvs{db=Db}),C,P);
+n({error,X},_,_,_) -> {error,X}.
 
-p({ok,R},C,P)    -> r(kvs:get(tab(R),ep(R)),C,P);
-p({error,X},_,_) -> {error,X}.
+p({ok,R},C,P,Db)   -> r(kvs:get(tab(R),ep(R),#kvs{db=Db}),C,P);
+p({error,X},_,_,_) -> {error,X}.
 
 r({ok,R},C,P)    -> C#reader{cache={tab(R),id(R)},pos=P};
 r({error,X},_,_) -> {error,X}.
@@ -45,17 +45,17 @@ w({error,X},_,_)                          -> {error,X}.
 
 % next, prev, top, bot
 top(#reader{}=X)          -> top(X,db()).
-top(#reader{feed=F}=C,Db) -> w(kvs:get(writer,F,Db),top,C).
+top(#reader{feed=F}=C,Db) -> w(kvs:get(writer,F,#kvs{db=Db}),top,C).
 bot(#reader{}=X)          -> bot(X,db()).
-bot(#reader{feed=F}=C,Db) -> w(kvs:get(writer,F,Db),bot,C).
+bot(#reader{feed=F}=C,Db) -> w(kvs:get(writer,F,#kvs{db=Db}),bot,C).
 
 next(#reader{}=X)         -> next(X,db()).
 next(#reader{cache=[]},_) -> {error,empty};
-next(#reader{cache={T,R},pos=P}=C,Db) -> n(kvs:get(T,R,Db),C,P+1).
+next(#reader{cache={T,R},pos=P}=C,Db) -> n(kvs:get(T,R,#kvs{db=Db}),C,P+1,Db).
 
 prev(#reader{}=X)       -> prev(X,db()).
 prev(#reader{cache=[]},_) -> {error,empty};
-prev(#reader{cache={T,R},pos=P}=C,Db) -> p(kvs:get(T,R,Db),C,P-1).
+prev(#reader{cache={T,R},pos=P}=C,Db) -> p(kvs:get(T,R,#kvs{db=Db}),C,P-1,Db).
 
 % take, drop, feed
 
@@ -68,7 +68,7 @@ take(#reader{dir=D,cache=_B,args=N,pos=P}=C,Db)  -> take(acc(D),N,C,C,[],P,Db).
 
 take(_,_,{error,_},C2,R,P,_) -> C2#reader{args=lists:flatten(R),pos=P,cache={tab(hd(R)),en(hd(R))}};
 take(_,0,_,C2,R,P,_)         -> C2#reader{args=lists:flatten(R),pos=P,cache={tab(hd(R)),en(hd(R))}};
-take(A,N,#reader{cache={T,I},pos=P}=C,C2,R,_,Db) -> take(A,N-1,?MODULE:A(C,Db),C2,[element(2,kvs:get(T,I,Db))|R],P,Db).
+take(A,N,#reader{cache={T,I},pos=P}=C,C2,R,_,Db) -> take(A,N-1,?MODULE:A(C,Db),C2,[element(2,kvs:get(T,I,#kvs{db=Db}))|R],P,Db).
 
 drop(_,_,{error,_},C2,P,B,_)     -> C2#reader{pos=P,cache=B};
 drop(_,0,_,C2,P,B,_)             -> C2#reader{pos=P,cache=B};
@@ -80,14 +80,14 @@ feed(Feed,Db) -> #reader{args=Args} = take((reader(Feed,Db))#reader{args=-1},Db)
 % new, save, load, writer, reader
 
 load_reader (Id) -> load_reader(Id,db()).
-load_reader (Id,Db) -> case kvs:get(reader,Id,Db) of {ok,C} -> C; _ -> #reader{id=[]} end.
+load_reader (Id,Db) -> case kvs:get(reader,Id,#kvs{db=Db}) of {ok,C} -> C; _ -> #reader{id=[]} end.
 
 save (C) -> save(C,db()).
-save (C,Db) -> NC = c4(C,[]), kvs:put(NC,Db), NC.
+save (C,Db) -> NC = c4(C,[]), kvs:put(NC,#kvs{db=Db}), NC.
 writer (Id) -> writer(Id,db()).
-writer (Id,Db) -> case kvs:get(writer,Id,Db) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
+writer (Id,Db) -> case kvs:get(writer,Id,#kvs{db=Db}) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
 reader (Id) -> reader(Id,db()).
-reader (Id,Db) -> case kvs:get(writer,Id,Db) of
+reader (Id,Db) -> case kvs:get(writer,Id,#kvs{db=Db}) of
          {ok,#writer{first=[]}} -> #reader{id=kvs:seq(reader,1),feed=Id,cache=[]};
          {ok,#writer{first=F}}  -> #reader{id=kvs:seq(reader,1),feed=Id,cache={tab(F),id(F)}};
          {error,_} -> save(#writer{id=Id},Db), reader(Id,Db) end.
@@ -99,26 +99,26 @@ add(#writer{args=M}=C,Db) when element(2,M) == [] -> add(si(M,kvs:seq(tab(M),1))
 add(#writer{args=M}=C,Db) -> add(M,C,Db).
 
 add(M,#writer{cache=[]}=C,Db) ->
-    _Id=id(M), N=sp(sn(M,[]),[]), kvs:put(N,Db),
+    _Id=id(M), N=sp(sn(M,[]),[]), kvs:put(N,#kvs{db=Db}),
     C#writer{cache=N,count=1,first=N};
 
 add(M,#writer{cache=V1,count=S}=C,Db) ->
-    {ok,V} = kvs:get(tab(V1),id(V1),Db),
-    N=sp(sn(M,[]),id(V)), P=sn(V,id(M)), kvs:put([N,P],Db),
+    {ok,V} = kvs:get(tab(V1),id(V1),#kvs{db=Db}),
+    N=sp(sn(M,[]),id(V)), P=sn(V,id(M)), kvs:put([N,P],#kvs{db=Db}),
     C#writer{cache=N,count=S+1}.
 
 remove(Rec,Feed)  -> remove(Rec,Feed,db()).
 remove(_Rec,Feed,Db) ->
-   {ok,W=#writer{count=Count}} = kvs:get(writer,Feed,Db),
+   {ok,W=#writer{count=Count}} = kvs:get(writer,Feed,#kvs{db=Db}),
    NC = Count-1,
-   kvs:save(W#writer{count=NC},Db),
+   kvs:save(W#writer{count=NC},#kvs{db=Db}),
    NC.
 
 append(Rec,Feed)    -> append(Rec,Feed,db()).
 append(Rec,Feed,Db) ->
-   kvs:ensure(#writer{id=Feed},Db),
+   kvs:ensure(#writer{id=Feed},#kvs{db=Db}),
    Name = element(1,Rec),
    Id = element(2,Rec),
-   case kvs:get(Name,Id,Db) of
+   case kvs:get(Name,Id,#kvs{db=Db}) of
         {ok,_}    -> Id;
-        {error,_} -> kvs:save(kvs:add((kvs:writer(Feed,Db))#writer{args=Rec},Db),Db), Id end.
+        {error,_} -> kvs:save(kvs:add((kvs:writer(Feed,#kvs{db=Db}))#writer{args=Rec},#kvs{db=Db}),#kvs{db=Db}), Id end.

+ 1 - 1
src/stores/kvs_fs.erl

@@ -5,7 +5,7 @@
 -include_lib("stdlib/include/qlc.hrl").
 -export(?BACKEND).
 
-db()        -> [].
+db()        -> "".
 
 start()     -> ok.
 

+ 1 - 1
src/stores/kvs_mnesia.erl

@@ -7,7 +7,7 @@
 -export(?BACKEND).
 -export([info/1,exec/1,dump/1]).
 
-db()       -> [].
+db()       -> "".
 start()    -> mnesia:start().
 stop()     -> mnesia:stop().
 destroy(_)  -> [mnesia:delete_table(T)||{_,T}<-kvs:dir()], mnesia:delete_schema([node()]), ok.

+ 1 - 1
src/stores/kvs_rocks.erl

@@ -91,7 +91,7 @@ run(Key, % key
 initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
 index(_,_,_) -> [].
 
-ref_env(Db)       -> list_to_atom("rocks_ref" ++ Db).
+ref_env(Db)       -> list_to_atom("rocks_ref_" ++ Db).
 db()              -> application:get_env(kvs,rocks_name,"rocksdb").
 start()           -> ok.
 stop()            -> ok.

+ 15 - 14
src/stores/kvs_st.erl

@@ -51,8 +51,8 @@ feed(Feed,Db) ->
          end,
   feed(fun(#reader{}=R) -> take(R#reader{args=4},Db) end,Top,[],Halt).
 
-feed(F,#reader{},Acc,H) when H =< 0 -> Acc;
-feed(F,#reader{cache=C1,count=Cn}=R,Acc,H) ->
+feed(_F,#reader{},Acc,H) when H =< 0 -> Acc;
+feed(F,#reader{cache=C1}=R,Acc,H) ->
   #reader{args=A, cache=Ch, feed=Feed} = R1 = F(R),
   case Ch of
     C1 -> Acc ++ A;
@@ -63,21 +63,22 @@ feed(F,#reader{cache=C1,count=Cn}=R,Acc,H) ->
   end.
 
 load_reader(Id) -> load_reader(Id,db()).
-load_reader(Id,Db) -> case kvs:get(reader,Id,Db) of {ok,#reader{}=C} -> C; _ -> #reader{id=kvs:seq([],[])} end.
+load_reader(Id,Db) -> case kvs:get(reader,Id,#kvs{db=Db,mod=kvs_rocks}) of {ok,#reader{}=C} -> C; _ -> #reader{id=kvs:seq([],[])} end.
 
 writer(Id) -> writer(Id,db()).
-writer(Id,Db) -> case kvs:get(writer,Id,Db) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
+writer(Id,Db) -> case kvs:get(writer,Id,#kvs{db=Db,mod=kvs_rocks}) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
 reader(Id) -> reader(Id,db()).
-reader(Id,Db) -> case kvs:get(writer,Id,Db) of
+reader(Id,Db) -> case kvs:get(writer,Id,#kvs{db=Db,mod=kvs_rocks}) of
   {ok,#writer{id=Feed, count=Cn, cache=Ch}} ->
     read_it(#reader{id=kvs:seq([],[]),feed=key(Feed),count=Cn,cache=Ch},seek_it(key(Feed),Db));
   {error,_} ->
     read_it(#reader{id=kvs:seq([],[]),feed=key(Id),count=0,cache=[]},seek_it(key(Id),Db))
   end.
-save(C) ->
+save(C)    -> save(C,db()).
+save(C,Db) ->
   N1 = case id(C) of [] -> si(C,kvs:seq([],[])); _ -> C end,
   NC = c4(N1,[]),
-  kvs:put(NC), NC.
+  kvs:put(NC,#kvs{db=Db,mod=kvs_rocks}), NC.
 
 % add
 raw_append(M,Feed) -> raw_append(M,Feed,db()).
@@ -94,22 +95,22 @@ add(M,#writer{id=Feed,count=S}=C,Db) ->
 
 remove(Rec,Feed) -> remove(Rec,Feed,db()).
 remove(Rec,Feed,Db) ->
-  kvs:ensure(#writer{id=Feed},#kvs{db=Db}),
-  W = #writer{count=C, cache=Ch} = kvs:writer(Feed,Db),
+  kvs:ensure(#writer{id=Feed},#kvs{db=Db,mod=kvs_rocks}),
+  W = #writer{count=C, cache=Ch} = kvs:writer(Feed,#kvs{db=Db,mod=kvs_rocks}),
   Ch1 = case {e(1,Rec),e(2,Rec),key(Feed)} of % need to keep reference for next element
               Ch -> R = reader(Feed,Db), e(4, prev(R#reader{cache=Ch},Db));
               _ -> Ch end,
-  case kvs:delete(Feed,id(Rec),Db) of
+  case kvs:delete(Feed,id(Rec),#kvs{db=Db,mod=kvs_rocks}) of
         ok -> Count = C - 1,
-              save(W#writer{count = Count, cache=Ch1}),
+              save(W#writer{count = Count, cache=Ch1},Db),
               Count;
         _ -> C end.
 
 append(Rec,Feed) -> append(Rec,Feed,db()).
 append(Rec,Feed,Db) ->
-   kvs:ensure(#writer{id=Feed},#kvs{db=Db}),
+   kvs:ensure(#writer{id=Feed},#kvs{db=Db,mod=kvs_rocks}),
    Id = e(2,Rec),
    W = writer(Feed,Db),
-   case kvs:get(Feed,Id,Db) of
+   case kvs:get(Feed,Id,#kvs{db=Db,mod=kvs_rocks}) of
         {ok,_} -> raw_append(Rec,Feed,Db), Id;
-        {error,_} -> save(add(W#writer{args=Rec},Db)), Id end.
+        {error,_} -> save(add(W#writer{args=Rec},Db),Db), Id end.