SunRiseGG 3 years ago
parent
commit
fc66a437cc
10 changed files with 239 additions and 169 deletions
  1. 1 1
      include/api.hrl
  2. 6 5
      include/backend.hrl
  3. 1 0
      include/kvs.hrl
  4. 2 2
      include/stream.hrl
  5. 64 38
      src/kvs.erl
  6. 52 37
      src/layers/kvs_stream.erl
  7. 14 12
      src/stores/kvs_fs.erl
  8. 10 9
      src/stores/kvs_mnesia.erl
  9. 39 30
      src/stores/kvs_rocks.erl
  10. 50 35
      src/stores/kvs_st.erl

+ 1 - 1
include/api.hrl

@@ -1,6 +1,6 @@
 -ifndef(API_HRL).
 -define(API_HRL, true).
--define(API,[start/0,stop/0,leave/0,leave/1,destroy/0,feed/2,
+-define(API,[start/0,stop/0,leave/0,leave/1,destroy/0,
              join/0,join/1,modules/0,cursors/0,get/2,get/3,put/1,put/2,index/3,delete/2,
              table/1,tables/0,dir/0,initialize/2,seq/2,all/1,all/2,count/1,ver/0]).
 -include("metainfo.hrl").

+ 6 - 5
include/backend.hrl

@@ -1,11 +1,12 @@
 -ifndef(BACKEND_HRL).
 -define(BACKEND_HRL, true).
--define(BACKEND, [get/2,put/1,delete/2,index/3,dump/0,start/0,stop/0,destroy/0,
-                  join/1,leave/0,dir/0,create_table/2,add_table_index/2,seq/2,all/1,count/1,version/0]).
+-define(BACKEND, [db/0,get/3,put/2,delete/3,index/3,dump/0,start/0,stop/0,destroy/1,
+                  join/2,leave/1,dir/0,create_table/2,add_table_index/2,seq/2,all/2,count/1,version/0]).
 -compile({no_auto_import,[get/1,put/2]}).
--spec put(tuple() | list(tuple())) -> ok | {error,any()}.
--spec get(term() | any(), any()) -> {ok,any()} | {error,not_found}.
--spec delete(term(), any()) -> ok | {error,not_found}.
+-include("kvs.hrl").
+-spec put(tuple() | list(tuple()), #kvs{}) -> ok | {error,any()}.
+-spec get(term() | any(), any(), #kvs{}) -> {ok,any()} | {error,not_found}.
+-spec delete(term(), any(), #kvs{}) -> ok | {error,not_found}.
 -spec dump() -> ok.
 -spec start() -> ok.
 -spec stop() -> ok.

+ 1 - 0
include/kvs.hrl

@@ -6,5 +6,6 @@
 -record(iter,   { id    = []::[] | integer(), next  = []::[] | integer(), prev  = []::[] | integer() } ).
 -record(kvs,    { mod   = kvs_mnesia :: kvs_mnesia | kvs_rocks | kvs_fs,
                   st    = kvs_stream :: kvs_stream | kvs_st,
+                  db    = []::list(),
                   cx    = []::term() }).
 -endif.

+ 2 - 2
include/stream.hrl

@@ -2,8 +2,8 @@
 -define(STREAM_HRL, true).
 -include("kvs.hrl").
 -include("cursors.hrl").
--define(STREAM, [top/1, bot/1, next/1, prev/1, drop/1, take/1, append/2, feed/1,
-                 load_reader/1, writer/1, reader/1, save/1, add/1, remove/2]).
+-define(STREAM, [top/1, top/2, bot/1, bot/2, next/1, next/2, prev/1, prev/2, drop/1, drop/2, take/1, take/2, append/2, append/3, feed/1, feed/2,
+                 load_reader/1, load_reader/2, writer/1, writer/2, reader/1, reader/2, save/1, add/1, add/2, remove/2, remove/3]).
 
 -spec top(#reader{})  -> #reader{}.
 -spec bot(#reader{})  -> #reader{}.

+ 64 - 38
src/kvs.erl

@@ -21,6 +21,7 @@
 -export([dump/0,
          metainfo/0,
          ensure/1,
+         ensure/2,
          seq_gen/0,
          fields/1,
          defined/2,
@@ -51,35 +52,37 @@ stop(_) -> ok.
 
 dba() -> application:get_env(kvs, dba, kvs_mnesia).
 
+db()  -> (dba()):db().
+
 kvs_stream() ->
     application:get_env(kvs, dba_st, kvs_stream).
 
 % kvs api
 
-all(Table) -> all(Table, #kvs{mod = dba()}).
+all(Table) -> all(Table, #kvs{mod = dba(), db = db()}).
 
 delete(Table, Key) ->
-    delete(Table, Key, #kvs{mod = dba()}).
+    delete(Table, Key, #kvs{mod = dba(), db = db()}).
 
 get(Table, Key) ->
-    (?MODULE):get(Table, Key, #kvs{mod = dba()}).
+    (?MODULE):get(Table, Key, #kvs{mod = dba(), db = db()}).
 
 index(Table, K, V) ->
     index(Table, K, V, #kvs{mod = dba()}).
 
-join() -> join([], #kvs{mod = dba()}).
+join() -> join([], #kvs{mod = dba(), db = db()}).
 
 dump() -> dump(#kvs{mod = dba()}).
 
-join(Node) -> join(Node, #kvs{mod = dba()}).
+join(Node) -> join(Node, #kvs{mod = dba(), db = db()}).
 
-leave() -> leave(#kvs{mod = dba()}).
+leave() -> leave(#kvs{mod = dba(), db = db()}).
 
-destroy() -> destroy(#kvs{mod = dba()}).
+destroy() -> destroy(#kvs{mod = dba(), db = db()}).
 
 count(Table) -> count(Table, #kvs{mod = dba()}).
 
-put(Record) -> (?MODULE):put(Record, #kvs{mod = dba()}).
+put(Record) -> (?MODULE):put(Record, #kvs{mod = dba(), db = db()}).
 
 stop() -> stop_kvs(#kvs{mod = dba()}).
 
@@ -90,22 +93,22 @@ ver() -> ver(#kvs{mod = dba()}).
 dir() -> dir(#kvs{mod = dba()}).
 
 feed(Key) ->
-    feed(Key, #kvs{mod = dba(), st = kvs_stream()}).
+    feed(Key, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
 
 seq(Table, DX) -> seq(Table, DX, #kvs{mod = dba()}).
 
 remove(Rec, Feed) ->
-    remove(Rec, Feed, #kvs{mod = dba(), st = kvs_stream()}).
+    remove(Rec, Feed, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
 
-put(Records, #kvs{mod = Mod}) when is_list(Records) ->
-    Mod:put(Records);
-put(Record, #kvs{mod = Mod}) -> Mod:put(Record).
+put(Records, #kvs{mod = Mod, db = Db}) when is_list(Records) ->
+    Mod:put(Records, Db);
+put(Record, #kvs{mod = Mod, db = Db}) -> Mod:put(Record, Db).
 
-get(RecordName, Key, #kvs{mod = Mod}) ->
-    Mod:get(RecordName, Key).
+get(RecordName, Key, #kvs{mod = Mod, db = Db}) ->
+    Mod:get(RecordName, Key, Db).
 
-delete(Tab, Key, #kvs{mod = Mod}) ->
-    Mod:delete(Tab, Key).
+delete(Tab, Key, #kvs{mod = Mod, db = Db}) ->
+    Mod:delete(Tab, Key, Db).
 
 count(Tab, #kvs{mod = DBA}) -> DBA:count(Tab).
 
@@ -116,22 +119,22 @@ seq(Tab, Incr, #kvs{mod = DBA}) -> DBA:seq(Tab, Incr).
 
 dump(#kvs{mod = Mod}) -> Mod:dump().
 
-feed(Tab, #kvs{st = Mod}) -> Mod:feed(Tab).
+feed(Tab, #kvs{st = Mod, db = Db}) -> Mod:feed(Tab, Db).
 
-remove(Rec, Feed, #kvs{st = Mod}) ->
-    Mod:remove(Rec, Feed).
+remove(Rec, Feed, #kvs{st = Mod, db = Db}) ->
+    Mod:remove(Rec, Feed, Db).
 
-all(Tab, #kvs{mod = DBA}) -> DBA:all(Tab).
+all(Tab, #kvs{mod = DBA, db = Db}) -> DBA:all(Tab, Db).
 
 start(#kvs{mod = DBA}) -> DBA:start().
 
 stop_kvs(#kvs{mod = DBA}) -> DBA:stop().
 
-join(Node, #kvs{mod = DBA}) -> DBA:join(Node).
+join(Node, #kvs{mod = DBA, db = Db}) -> DBA:join(Node, Db).
 
-leave(#kvs{mod = DBA}) -> DBA:leave().
+leave(#kvs{mod = DBA, db = Db}) -> DBA:leave(Db).
 
-destroy(#kvs{mod = DBA}) -> DBA:destroy().
+destroy(#kvs{mod = DBA, db = Db}) -> DBA:destroy(Db).
 
 ver(#kvs{mod = DBA}) -> DBA:version().
 
@@ -141,36 +144,59 @@ dir(#kvs{mod = DBA}) -> DBA:dir().
 
 top(X) -> (kvs_stream()):top(X).
 
+top(X,#kvs{db = Db}) -> (kvs_stream()):top(X,Db).
+
 bot(X) -> (kvs_stream()):bot(X).
 
-next(X) -> (kvs_stream()):next(X).
+bot(X,#kvs{db = Db})         -> (kvs_stream()):bot(X,Db).
+
+next(X)                      -> (kvs_stream()):next(X).
+
+next(X,#kvs{db = Db})        -> (kvs_stream()):next(X,Db).
+
+prev(X)                      -> (kvs_stream()):prev(X).
+
+prev(X,#kvs{db = Db})        -> (kvs_stream()):prev(X,Db).
+
+drop(X)                      -> (kvs_stream()):drop(X).
+
+drop(X,#kvs{db = Db})        -> (kvs_stream()):drop(X,Db).
+
+take(X)                      -> (kvs_stream()):take(X).
+
+take(X,#kvs{db = Db})        -> (kvs_stream()):take(X,Db).
+
+save(X)                      -> (kvs_stream()):save(X).
+
+cut(X, Y)                    -> (kvs_stream()):cut(X, Y).
 
-prev(X) -> (kvs_stream()):prev(X).
+add(X)                       -> (kvs_stream()):add(X).
 
-drop(X) -> (kvs_stream()):drop(X).
+add(X,#kvs{db = Db})         -> (kvs_stream()):add(X,Db).
 
-take(X) -> (kvs_stream()):take(X).
+append(X, Y)                 -> (kvs_stream()):append(X, Y).
 
-save(X) -> (kvs_stream()):save(X).
+append(X, Y, #kvs{db = Db})  -> (kvs_stream()):append(X, Y, Db).
 
-cut(X, Y) -> (kvs_stream()):cut(X, Y).
+load_reader(X)               -> (kvs_stream()):load_reader(X).
 
-add(X) -> (kvs_stream()):add(X).
+load_reader(X,#kvs{db = Db}) -> (kvs_stream()):load_reader(X,Db).
 
-append(X, Y) -> (kvs_stream()):append(X, Y).
+writer(X)                    -> (kvs_stream()):writer(X).
 
-load_reader(X) -> (kvs_stream()):load_reader(X).
+writer(X,#kvs{db = Db})      -> (kvs_stream()):writer(X,Db).
 
-writer(X) -> (kvs_stream()):writer(X).
+reader(X)                    -> (kvs_stream()):reader(X).
 
-reader(X) -> (kvs_stream()):reader(X).
+reader(X,#kvs{db = Db})      -> (kvs_stream()):reader(X,Db).
 
 % unrevisited
 
-ensure(#writer{id = Id}) ->
-    case kvs:get(writer, Id) of
+ensure(#writer{} = X) -> ensure(X,#kvs{db = db()}).
+ensure(#writer{id = Id},#kvs{} = X) ->
+    case kvs:get(writer, Id, X) of
         {error, _} ->
-            kvs:save(kvs:writer(Id)),
+            kvs:save(kvs:writer(Id, X)),
             ok;
         {ok, _} -> ok
     end.

+ 52 - 37
src/layers/kvs_stream.erl

@@ -6,6 +6,8 @@
 -export(?STREAM).
 -export([metainfo/0]).
 
+db() -> kvs:db().
+
 % boot for sample
 
 metainfo() -> #schema { name = kvs,    tables = tables() }.
@@ -42,68 +44,81 @@ w({ok,#writer{cache=B,count=Size}},top,C) -> C#reader{cache={tab(B),id(B)},pos=S
 w({error,X},_,_)                          -> {error,X}.
 
 % next, prev, top, bot
+top(#reader{}=X)          -> top(X,db()).
+top(#reader{feed=F}=C,Db) -> w(kvs:get(writer,F,Db),top,C).
+bot(#reader{}=X)          -> bot(X,db()).
+bot(#reader{feed=F}=C,Db) -> w(kvs:get(writer,F,Db),bot,C).
 
-top(#reader{feed=F}=C) -> w(kvs:get(writer,F),top,C).
-bot(#reader{feed=F}=C) -> w(kvs:get(writer,F),bot,C).
-
-next(#reader{cache=[]}) -> {error,empty};
-next(#reader{cache={T,R},pos=P}=C) -> n(kvs:get(T,R),C,P+1).
+next(#reader{}=X)         -> next(X,db()).
+next(#reader{cache=[]},_) -> {error,empty};
+next(#reader{cache={T,R},pos=P}=C,Db) -> n(kvs:get(T,R,Db),C,P+1).
 
-prev(#reader{cache=[]}) -> {error,empty};
-prev(#reader{cache={T,R},pos=P}=C) -> p(kvs:get(T,R),C,P-1).
+prev(#reader{}=X)       -> prev(X,db()).
+prev(#reader{cache=[]},_) -> {error,empty};
+prev(#reader{cache={T,R},pos=P}=C,Db) -> p(kvs:get(T,R,Db),C,P-1).
 
 % take, drop, feed
 
-drop(#reader{cache=[]}=C) -> C#reader{args=[]};
-drop(#reader{dir=D,cache=B,args=N,pos=P}=C)  -> drop(acc(D),N,C,C,P,B).
-take(#reader{cache=[]}=C) -> C#reader{args=[]};
-take(#reader{dir=D,cache=_B,args=N,pos=P}=C)  -> take(acc(D),N,C,C,[],P).
+drop(#reader{}=X)           -> drop(X,db()).
+drop(#reader{cache=[]}=C,_) -> C#reader{args=[]};
+drop(#reader{dir=D,cache=B,args=N,pos=P}=C,Db)  -> drop(acc(D),N,C,C,P,B,Db).
+take(#reader{}=X)           -> take(X,db()).
+take(#reader{cache=[]}=C,_) -> C#reader{args=[]};
+take(#reader{dir=D,cache=_B,args=N,pos=P}=C,Db)  -> take(acc(D),N,C,C,[],P,Db).
 
-take(_,_,{error,_},C2,R,P) -> C2#reader{args=lists:flatten(R),pos=P,cache={tab(hd(R)),en(hd(R))}};
-take(_,0,_,C2,R,P)         -> C2#reader{args=lists:flatten(R),pos=P,cache={tab(hd(R)),en(hd(R))}};
-take(A,N,#reader{cache={T,I},pos=P}=C,C2,R,_) -> take(A,N-1,?MODULE:A(C),C2,[element(2,kvs:get(T,I))|R],P).
+take(_,_,{error,_},C2,R,P,_) -> C2#reader{args=lists:flatten(R),pos=P,cache={tab(hd(R)),en(hd(R))}};
+take(_,0,_,C2,R,P,_)         -> C2#reader{args=lists:flatten(R),pos=P,cache={tab(hd(R)),en(hd(R))}};
+take(A,N,#reader{cache={T,I},pos=P}=C,C2,R,_,Db) -> take(A,N-1,?MODULE:A(C,Db),C2,[element(2,kvs:get(T,I,Db))|R],P,Db).
 
-drop(_,_,{error,_},C2,P,B)     -> C2#reader{pos=P,cache=B};
-drop(_,0,_,C2,P,B)             -> C2#reader{pos=P,cache=B};
-drop(A,N,#reader{cache=B,pos=P}=C,C2,_,_) -> drop(A,N-1,?MODULE:A(C),C2,P,B).
+drop(_,_,{error,_},C2,P,B,_)     -> C2#reader{pos=P,cache=B};
+drop(_,0,_,C2,P,B,_)             -> C2#reader{pos=P,cache=B};
+drop(A,N,#reader{cache=B,pos=P}=C,C2,_,_,Db) -> drop(A,N-1,?MODULE:A(C,Db),C2,P,B,Db).
 
-feed(Feed) -> #reader{args=Args} = take((reader(Feed))#reader{args=-1}), Args.
+feed(Feed)    -> feed(Feed,db()).
+feed(Feed,Db) -> #reader{args=Args} = take((reader(Feed,Db))#reader{args=-1},Db), Args.
 
 % new, save, load, writer, reader
 
-load_reader (Id) -> case kvs:get(reader,Id) of {ok,C} -> C; _ -> #reader{id=[]} end.
+load_reader (Id) -> load_reader(Id,db()).
+load_reader (Id,Db) -> case kvs:get(reader,Id,Db) of {ok,C} -> C; _ -> #reader{id=[]} end.
 
-save (C) -> NC = c4(C,[]), kvs:put(NC), NC.
-writer (Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
-reader (Id) -> case kvs:get(writer,Id) of
+save (C) -> save(C,db()).
+save (C,Db) -> NC = c4(C,[]), kvs:put(NC,Db), NC.
+writer (Id) -> writer(Id,db()).
+writer (Id,Db) -> case kvs:get(writer,Id,Db) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
+reader (Id) -> reader(Id,db()).
+reader (Id,Db) -> case kvs:get(writer,Id,Db) of
          {ok,#writer{first=[]}} -> #reader{id=kvs:seq(reader,1),feed=Id,cache=[]};
          {ok,#writer{first=F}}  -> #reader{id=kvs:seq(reader,1),feed=Id,cache={tab(F),id(F)}};
-         {error,_} -> save(#writer{id=Id}), reader(Id) end.
+         {error,_} -> save(#writer{id=Id},Db), reader(Id,Db) end.
 
 % add, remove, append
 
-add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq(tab(M),1)),C);
-add(#writer{args=M}=C) -> add(M,C).
+add(#writer{}=X) -> add(X,db()).
+add(#writer{args=M}=C,Db) when element(2,M) == [] -> add(si(M,kvs:seq(tab(M),1)),C,Db);
+add(#writer{args=M}=C,Db) -> add(M,C,Db).
 
-add(M,#writer{cache=[]}=C) ->
-    _Id=id(M), N=sp(sn(M,[]),[]), kvs:put(N),
+add(M,#writer{cache=[]}=C,Db) ->
+    _Id=id(M), N=sp(sn(M,[]),[]), kvs:put(N,Db),
     C#writer{cache=N,count=1,first=N};
 
-add(M,#writer{cache=V1,count=S}=C) ->
-    {ok,V} = kvs:get(tab(V1),id(V1)),
-    N=sp(sn(M,[]),id(V)), P=sn(V,id(M)), kvs:put([N,P]),
+add(M,#writer{cache=V1,count=S}=C,Db) ->
+    {ok,V} = kvs:get(tab(V1),id(V1),Db),
+    N=sp(sn(M,[]),id(V)), P=sn(V,id(M)), kvs:put([N,P],Db),
     C#writer{cache=N,count=S+1}.
 
-remove(_Rec,Feed) ->
-   {ok,W=#writer{count=Count}} = kvs:get(writer,Feed),
+remove(Rec,Feed)  -> remove(Rec,Feed,db()).
+remove(_Rec,Feed,Db) ->
+   {ok,W=#writer{count=Count}} = kvs:get(writer,Feed,Db),
    NC = Count-1,
-   kvs:save(W#writer{count=NC}),
+   kvs:save(W#writer{count=NC},Db),
    NC.
 
-append(Rec,Feed) ->
-   kvs:ensure(#writer{id=Feed}),
+append(Rec,Feed)    -> append(Rec,Feed,db()).
+append(Rec,Feed,Db) ->
+   kvs:ensure(#writer{id=Feed},Db),
    Name = element(1,Rec),
    Id = element(2,Rec),
-   case kvs:get(Name,Id) of
+   case kvs:get(Name,Id,Db) of
         {ok,_}    -> Id;
-        {error,_} -> kvs:save(kvs:add((kvs:writer(Feed))#writer{args=Rec})), Id end.
+        {error,_} -> kvs:save(kvs:add((kvs:writer(Feed,Db))#writer{args=Rec},Db),Db), Id end.

+ 14 - 12
src/stores/kvs_fs.erl

@@ -5,19 +5,21 @@
 -include_lib("stdlib/include/qlc.hrl").
 -export(?BACKEND).
 
-start()    -> ok.
+db()        -> [].
 
-stop()     -> ok.
+start()     -> ok.
 
-destroy()  -> ok.
+stop()      -> ok.
 
-version()  -> {version,"KVS FS"}.
+destroy(_)  -> ok.
 
-leave()    -> ok.
+version()   -> {version,"KVS FS"}.
 
-dir()      -> [ {table,F} || F <- filelib:wildcard("data/*"), filelib:is_dir(F) ].
+leave(_)     -> ok.
 
-join(_Node) -> filelib:ensure_dir(dir_name()), initialize(). % should be rsync or smth
+dir()       -> [ {table,F} || F <- filelib:wildcard("data/*"), filelib:is_dir(F) ].
+
+join(_Node,_) -> filelib:ensure_dir(dir_name()), initialize(). % should be rsync or smth
 
 initialize() ->
     mnesia:create_schema([node()]),
@@ -25,7 +27,7 @@ initialize() ->
     mnesia:wait_for_tables([ T#table.name || T <- kvs:tables()],infinity).
 
 index(_Tab,_Key,_Value) -> [].
-get(TableName, Key) ->
+get(TableName, Key, _) ->
     HashKey = hashkey(Key),
     {ok, Dir} = dir(TableName),
     File = filename:join([Dir,HashKey]),
@@ -33,8 +35,8 @@ get(TableName, Key) ->
          {ok,Binary} -> {ok,binary_to_term(Binary,[safe])};
          {error,Reason} -> {error,Reason} end.
 
-put(Records) when is_list(Records) -> lists:map(fun(Record) -> put(Record) end, Records);
-put(Record) ->
+put(Records,X) when is_list(Records) -> lists:map(fun(Record) -> put(Record,X) end, Records);
+put(Record,_) ->
     TableName = element(1,Record),
     HashKey = hashkey(element(2,Record)),
     BinaryValue = term_to_binary(Record),
@@ -55,7 +57,7 @@ dir(TableName) ->
 
 hashkey(Key) -> encode(base64:encode(crypto:hash(sha, term_to_binary(Key)))).
 
-delete(TableName, Key) ->
+delete(TableName, Key, _) ->
     case kvs_fs:get(TableName, Key) of
         {ok,_} ->
             {ok, Dir} = dir(TableName),
@@ -67,7 +69,7 @@ delete(TableName, Key) ->
 
 count(RecordName) -> length(filelib:fold_files(filename:join([dir_name(), RecordName]), "",true, fun(A,Acc)-> [A|Acc] end, [])).
 
-all(R) -> lists:flatten([ begin case file:read_file(File) of
+all(R,_) -> lists:flatten([ begin case file:read_file(File) of
                         {ok,Binary} -> binary_to_term(Binary,[safe]);
                         {error,_Reason} -> [] end end || File <-
       filelib:fold_files(filename:join([dir_name(), R]), "",true, fun(A,Acc)-> [A|Acc] end, []) ]).

+ 10 - 9
src/stores/kvs_mnesia.erl

@@ -7,15 +7,16 @@
 -export(?BACKEND).
 -export([info/1,exec/1,dump/1]).
 
+db()       -> [].
 start()    -> mnesia:start().
 stop()     -> mnesia:stop().
-destroy()  -> [mnesia:delete_table(T)||{_,T}<-kvs:dir()], mnesia:delete_schema([node()]), ok.
-leave()    -> ok.
+destroy(_)  -> [mnesia:delete_table(T)||{_,T}<-kvs:dir()], mnesia:delete_schema([node()]), ok.
+leave(_)    -> ok.
 version()  -> {version,"KVS MNESIA"}.
 dir()      -> [{table,T}||T<-mnesia:system_info(local_tables)].
-join([])   -> mnesia:start(), mnesia:change_table_copy_type(schema, node(), disc_copies), initialize();
+join([], _)   -> mnesia:start(), mnesia:change_table_copy_type(schema, node(), disc_copies), initialize();
 
-join(Node) ->
+join(Node, _) ->
     mnesia:start(),
     mnesia:change_config(extra_db_nodes, [Node]),
     mnesia:change_table_copy_type(schema, node(), disc_copies),
@@ -32,16 +33,16 @@ initialize() ->
 index(Tab,Key,Value) ->
     lists:flatten(many(fun() -> mnesia:index_read(Tab,Value,Key) end)).
 
-get(RecordName, Key) -> just_one(fun() -> mnesia:read(RecordName, Key) end).
-put(Records) when is_list(Records) -> void(fun() -> lists:foreach(fun mnesia:write/1, Records) end);
-put(Record) -> put([Record]).
-delete(Tab, Key) ->
+get(RecordName, Key, _) -> just_one(fun() -> mnesia:read(RecordName, Key) end).
+put(Records, _) when is_list(Records) -> void(fun() -> lists:foreach(fun mnesia:write/1, Records) end);
+put(Record, X) -> put([Record], X).
+delete(Tab, Key, _) ->
     case mnesia:activity(context(),fun()-> mnesia:delete({Tab, Key}) end) of
         {aborted,Reason} -> {error,Reason};
         {atomic,_Result} -> ok;
         _ -> ok end.
 count(RecordName) -> mnesia:table_info(RecordName, size).
-all(R) -> lists:flatten(many(fun() -> L= mnesia:all_keys(R), [ mnesia:read({R, G}) || G <- L ] end)).
+all(R, _) -> lists:flatten(many(fun() -> L= mnesia:all_keys(R), [ mnesia:read({R, G}) || G <- L ] end)).
 seq([],[]) ->
   case os:type() of
        {win32,nt} -> {Mega,Sec,Micro} = erlang:timestamp(), integer_to_list((Mega*1000000+Sec)*1000000+Micro);

+ 39 - 30
src/stores/kvs_rocks.erl

@@ -4,8 +4,8 @@
 -include("metainfo.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -export(?BACKEND).
--export([ref/0,bt/1,key/2,key/1,fd/1,tb/1,estimate/0]).
--export([seek_it/1, move_it/3, take_it/4]).
+-export([ref/0,ref/1,bt/1,key/2,key/1,fd/1,tb/1,estimate/0,estimate/1]).
+-export([seek_it/1, seek_it/2, move_it/3, move_it/4, take_it/4, take_it/5]).
 
 e(X,Y) -> element(X,Y).
 
@@ -50,15 +50,16 @@ fd(K) -> Key = tb(K),
   end,
   binary:part(Key,{0,S}).
 
-run(<<>>,SK,_,_) -> {ok,SK,[],[]};
+run(<<>>,SK,_,_,_) -> {ok,SK,[],[]};
 run(Key, % key
   SK,  % sup-key
   Dir, % direction next/prev
-  Compiled_Operations) ->
+  Compiled_Operations,
+  Db) ->
        % H is iterator reference
 
   S = sz(SK),
-  Initial_Object = {ref(), []},
+  Initial_Object = {ref(Db), []},
 
   Run = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == SK -> {F(H,Dir),H,[V|Acc]}; % continue +------------+
             (_,K,H,V,Acc) -> stop_it(H),                                           % fail-safe closing     |
@@ -90,17 +91,21 @@ run(Key, % key
 initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
 index(_,_,_) -> [].
 
-start()    -> ok.
-stop()     -> ok.
-destroy()  -> rocksdb:destroy(application:get_env(kvs,rocks_name,"rocksdb"), []).
-version()  -> {version,"KVS ROCKSDB"}.
-dir()      -> [].
-ref()      -> application:get_env(kvs,rocks_ref,[]).
-leave()    -> case ref() of [] -> skip; X -> rocksdb:close(X), application:set_env(kvs,rocks_ref,[]), ok end.
-join(_)    -> application:start(rocksdb),
-              leave(), {ok, Ref} = rocksdb:open(application:get_env(kvs,rocks_name,"rocksdb"), [{create_if_missing, true}]),
+ref_env(Db)       -> list_to_atom("rocks_ref" ++ Db).
+db()              -> application:get_env(kvs,rocks_name,"rocksdb").
+start()           -> ok.
+stop()            -> ok.
+destroy(Db)       -> rocksdb:destroy(Db, []).
+version()         -> {version,"KVS ROCKSDB"}.
+dir()             -> [].
+ref()             -> ref(db()).
+ref(Db)           -> application:get_env(kvs,ref_env(Db),[]).
+leave(Db)         -> case ref(Db) of [] -> skip; X -> rocksdb:close(X), application:set_env(kvs,ref_env(Db),[]), ok end.
+join(_,Db)        ->
+              application:start(rocksdb),
+              leave(Db), {ok, Ref} = rocksdb:open(Db, [{create_if_missing, true}]),
               initialize(),
-              application:set_env(kvs,rocks_ref,Ref).
+              application:set_env(kvs,ref_env(Db),Ref).
 
 compile(seek) -> [fun rocksdb:iterator/2,fun rocksdb:iterator_move/2];
 compile(move) -> [fun rocksdb:iterator_move/2];
@@ -108,25 +113,29 @@ compile(close) -> [fun rocksdb:iterator_close/1].
 compile(take,N) -> lists:map(fun(_) -> fun rocksdb:iterator_move/2 end, lists:seq(1, N)).
 
 stop_it(H) -> try begin [F]=compile(close), F(H) end catch error:badarg -> ok end.
-seek_it(K) -> run(K,K,ok,compile(seek)).
-move_it(Key,SK,Dir) -> run(Key,SK,Dir,compile(seek) ++ compile(move)).
-take_it(Key,SK,Dir,N) when is_integer(N) andalso N >= 0 -> run(Key,SK,Dir,compile(seek) ++ compile(take,N));
-take_it(Key,SK,Dir,_) -> take_it(Key,SK,Dir,0).
-
-all(R) -> kvs_st:feed(R).
-
-get(Tab, {step,N,[208|_]=Key}) -> get(Tab, {step,N,list_to_binary(Key)});
-get(Tab, [208|_]=Key) -> get(Tab, list_to_binary(Key));
-get(Tab, Key) ->
-    case rocksdb:get(ref(), key(Tab,Key), []) of
+seek_it(K) -> seek_it(K,db()).
+seek_it(K,Db) -> run(K,K,ok,compile(seek),Db).
+move_it(Key,SK,Dir) -> move_it(Key,SK,Dir,db()).
+move_it(Key,SK,Dir,Db) -> run(Key,SK,Dir,compile(seek) ++ compile(move),Db).
+take_it(Key,SK,Dir,N) -> take_it(Key,SK,Dir,N,db()).
+take_it(Key,SK,Dir,N,Db) when is_integer(N) andalso N >= 0 -> run(Key,SK,Dir,compile(seek) ++ compile(take,N),Db);
+take_it(Key,SK,Dir,_,Db) -> take_it(Key,SK,Dir,0,Db).
+
+all(R,Db) -> kvs_st:feed(R,Db).
+
+get(Tab, {step,N,[208|_]=Key}, Db) -> get(Tab, {step,N,list_to_binary(Key)},Db);
+get(Tab, [208|_]=Key, Db) -> get(Tab, list_to_binary(Key), Db);
+get(Tab, Key, Db) ->
+    case rocksdb:get(ref(Db), key(Tab,Key), []) of
          not_found -> {error,not_found};
          {ok,Bin} -> {ok,bt(Bin)} end.
 
-put(Records) when is_list(Records) -> lists:map(fun(Record) -> put(Record) end, Records);
-put(Record) -> rocksdb:put(ref(), key(Record), term_to_binary(Record), [{sync,true}]).
-delete(Feed, Id) -> rocksdb:delete(ref(), key(Feed,Id), []).
+put(Records,Db) when is_list(Records) -> lists:map(fun(Record) -> put(Record,Db) end, Records);
+put(Record,Db) -> rocksdb:put(ref(Db), key(Record), term_to_binary(Record), [{sync,true}]).
+delete(Feed, Id, Db) -> rocksdb:delete(ref(Db), key(Feed,Id), []).
 count(_) -> 0.
-estimate() -> case rocksdb:get_property(ref(), <<"rocksdb.estimate-num-keys">>) of
+estimate()   -> estimate(db()).
+estimate(Db) -> case rocksdb:get_property(ref(Db), <<"rocksdb.estimate-num-keys">>) of
                 {ok, Est} when is_binary(Est)  -> binary_to_integer(Est);
                 {ok, Est} when is_list(Est)    -> list_to_integer(Est);
                 {ok, Est} when is_integer(Est) -> Est;

+ 50 - 35
src/stores/kvs_st.erl

@@ -3,8 +3,10 @@
 -include("stream.hrl").
 -include("metainfo.hrl").
 -export(?STREAM).
--import(kvs_rocks, [key/2, key/1, bt/1, tb/1, ref/0, seek_it/1, move_it/3, take_it/4, estimate/0]).
--export([raw_append/2]).
+-import(kvs_rocks, [key/2, key/1, bt/1, tb/1, ref/0, ref/1, seek_it/1, seek_it/2, move_it/3, move_it/4, take_it/4, take_it/5, estimate/0, estimate/1]).
+-export([raw_append/2,raw_append/3]).
+
+db() -> application:get_env(kvs,rocks_name,"rocksdb").
 
 se(X,Y,Z) -> setelement(X,Y,Z).
 e(X,Y) -> element(X,Y).
@@ -25,22 +27,29 @@ read_it(C,{ok,_,[],H}) -> C#reader{cache=[], args=lists:reverse(H)};
 read_it(C,{ok,F,V,H})  -> C#reader{cache={e(1,V),id(V),F}, args=lists:reverse(H)};
 read_it(C,_) -> C#reader{args=[]}.
 
-top(#reader{feed=Feed}=C) -> #writer{count=Cn} = writer(f2(Feed)), read_it(C#reader{count=Cn},seek_it(Feed)).
-bot(#reader{feed=Feed}=C) -> #writer{cache=Ch, count=Cn} = writer(f2(Feed)), C#reader{cache=Ch, count=Cn}.
-next(#reader{feed=Feed,cache=I}=C) -> read_it(C,move_it(k(Feed,I),Feed,next)).
-prev(#reader{cache=I,feed=Feed}=C) -> read_it(C,move_it(k(Feed,I),Feed,prev)).
-take(#reader{args=N,feed=Feed,cache=I,dir=1}=C) -> read_it(C,take_it(k(Feed,I),Feed,prev,N));
-take(#reader{args=N,feed=Feed,cache=I,dir=_}=C) -> read_it(C,take_it(k(Feed,I),Feed,next,N)).
-drop(#reader{args=N}=C) when N =< 0 -> C;
-drop(#reader{}=C) -> (take(C#reader{dir=0}))#reader{args=[]}.
+top(#reader{}=X) -> top(X,db()).
+top(#reader{feed=Feed}=C,Db) -> #writer{count=Cn} = writer(f2(Feed),Db), read_it(C#reader{count=Cn},seek_it(Feed,Db)).
+bot(#reader{}=X) -> bot(X,db()).
+bot(#reader{feed=Feed}=C,Db) -> #writer{cache=Ch, count=Cn} = writer(f2(Feed),Db), C#reader{cache=Ch, count=Cn}.
+next(#reader{}=X) -> next(X,db()).
+next(#reader{feed=Feed,cache=I}=C,Db) -> read_it(C,move_it(k(Feed,I),Feed,next,Db)).
+prev(#reader{}=X) -> prev(X,db()).
+prev(#reader{cache=I,feed=Feed}=C,Db) -> read_it(C,move_it(k(Feed,I),Feed,prev,Db)).
+take(#reader{}=X) -> take(X,db()).
+take(#reader{args=N,feed=Feed,cache=I,dir=1}=C,Db) -> read_it(C,take_it(k(Feed,I),Feed,prev,N,Db));
+take(#reader{args=N,feed=Feed,cache=I,dir=_}=C,Db) -> read_it(C,take_it(k(Feed,I),Feed,next,N,Db)).
+drop(#reader{}=X) -> drop(X,db()).
+drop(#reader{args=N}=C,_) when N =< 0 -> C;
+drop(#reader{}=C,Db) -> (take(C#reader{dir=0},Db))#reader{args=[]}.
 
-feed(Feed) ->
-  #reader{count=Cn} = Top = top(reader(Feed)),
+feed(Feed) -> feed(Feed,db()).
+feed(Feed,Db) ->
+  #reader{count=Cn} = Top = top(reader(Feed,Db),Db),
   Halt = case {estimate(),Cn} of 
           {E,C} when E =< 0 -> max(C,4);
           {E,_} -> E
          end,
-  feed(fun(#reader{}=R) -> take(R#reader{args=4}) end,Top,[],Halt).
+  feed(fun(#reader{}=R) -> take(R#reader{args=4},Db) end,Top,[],Halt).
 
 feed(F,#reader{},Acc,H) when H =< 0 -> Acc;
 feed(F,#reader{cache=C1,count=Cn}=R,Acc,H) ->
@@ -53,14 +62,17 @@ feed(F,#reader{cache=C1,count=Cn}=R,Acc,H) ->
     _ -> Acc ++ A
   end.
 
-load_reader(Id) -> case kvs:get(reader,Id) of {ok,#reader{}=C} -> C; _ -> #reader{id=kvs:seq([],[])} end.
+load_reader(Id) -> load_reader(Id,db()).
+load_reader(Id,Db) -> case kvs:get(reader,Id,Db) of {ok,#reader{}=C} -> C; _ -> #reader{id=kvs:seq([],[])} end.
 
-writer(Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
-reader(Id) -> case kvs:get(writer,Id) of
+writer(Id) -> writer(Id,db()).
+writer(Id,Db) -> case kvs:get(writer,Id,Db) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
+reader(Id) -> reader(Id,db()).
+reader(Id,Db) -> case kvs:get(writer,Id,Db) of
   {ok,#writer{id=Feed, count=Cn, cache=Ch}} ->
-    read_it(#reader{id=kvs:seq([],[]),feed=key(Feed),count=Cn,cache=Ch},seek_it(key(Feed)));
+    read_it(#reader{id=kvs:seq([],[]),feed=key(Feed),count=Cn,cache=Ch},seek_it(key(Feed),Db));
   {error,_} ->
-    read_it(#reader{id=kvs:seq([],[]),feed=key(Id),count=0,cache=[]},seek_it(key(Id)))
+    read_it(#reader{id=kvs:seq([],[]),feed=key(Id),count=0,cache=[]},seek_it(key(Id),Db))
   end.
 save(C) ->
   N1 = case id(C) of [] -> si(C,kvs:seq([],[])); _ -> C end,
@@ -68,33 +80,36 @@ save(C) ->
   kvs:put(NC), NC.
 
 % add
+raw_append(M,Feed) -> raw_append(M,Feed,db()).
+raw_append(M,Feed,Db) -> rocksdb:put(ref(Db), key(Feed,e(2,M)), term_to_binary(M), [{sync,true}]).
 
-raw_append(M,Feed) -> rocksdb:put(ref(), key(Feed,e(2,M)), term_to_binary(M), [{sync,true}]).
-
-add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
-add(#writer{args=M}=C) -> add(M,C).
+add(#writer{}=X) -> add(X, db()).
+add(#writer{args=M}=C,Db) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C,Db);
+add(#writer{args=M}=C,Db) -> add(M,C,Db).
 
-add(M,#writer{id=Feed,count=S}=C) ->
+add(M,#writer{id=Feed,count=S}=C,Db) ->
    NS=S+1,
-   raw_append(M,Feed),
+   raw_append(M,Feed,Db),
    C#writer{cache={e(1,M),e(2,M),key(Feed)},count=NS}.
 
-remove(Rec,Feed) ->
-  kvs:ensure(#writer{id=Feed}),
-  W = #writer{count=C, cache=Ch} = kvs:writer(Feed),
+remove(Rec,Feed) -> remove(Rec,Feed,db()).
+remove(Rec,Feed,Db) ->
+  kvs:ensure(#writer{id=Feed},#kvs{db=Db}),
+  W = #writer{count=C, cache=Ch} = kvs:writer(Feed,Db),
   Ch1 = case {e(1,Rec),e(2,Rec),key(Feed)} of % need to keep reference for next element
-              Ch -> R = reader(Feed), e(4, prev(R#reader{cache=Ch}));
+              Ch -> R = reader(Feed,Db), e(4, prev(R#reader{cache=Ch},Db));
               _ -> Ch end,
-  case kvs:delete(Feed,id(Rec)) of
+  case kvs:delete(Feed,id(Rec),Db) of
         ok -> Count = C - 1,
               save(W#writer{count = Count, cache=Ch1}),
               Count;
         _ -> C end.
 
-append(Rec,Feed) ->
-   kvs:ensure(#writer{id=Feed}),
+append(Rec,Feed) -> append(Rec,Feed,db()).
+append(Rec,Feed,Db) ->
+   kvs:ensure(#writer{id=Feed},#kvs{db=Db}),
    Id = e(2,Rec),
-   W = writer(Feed),
-   case kvs:get(Feed,Id) of
-        {ok,_} -> raw_append(Rec,Feed), Id;
-        {error,_} -> save(add(W#writer{args=Rec})), Id end.
+   W = writer(Feed,Db),
+   case kvs:get(Feed,Id,Db) of
+        {ok,_} -> raw_append(Rec,Feed,Db), Id;
+        {error,_} -> save(add(W#writer{args=Rec},Db)), Id end.