Browse Source

fix take for rocksdb

Namdak Tonpa 5 years ago
parent
commit
435cc09242
6 changed files with 85 additions and 32 deletions
  1. 2 1
      include/stream.hrl
  2. 5 2
      mix.exs
  3. 1 1
      src/kvs.app.src
  4. 16 8
      src/kvs.erl
  5. 2 2
      src/layers/kvs_stream.erl
  6. 59 18
      src/stores/kvs_st.erl

+ 2 - 1
include/stream.hrl

@@ -3,7 +3,7 @@
 -include("kvs.hrl").
 -include("kvs.hrl").
 -include("cursors.hrl").
 -include("cursors.hrl").
 -define(STREAM, [top/1, bot/1, next/1, prev/1, drop/1, take/1, append/2, cut/2,
 -define(STREAM, [top/1, bot/1, next/1, prev/1, drop/1, take/1, append/2, cut/2,
-                 load_reader/1, writer/1, reader/1, save/1, add/1, feed/1]).
+                 load_reader/1, writer/1, reader/1, save/1, add/1, remove/2]).
 -spec top(#reader{}) -> #reader{}.
 -spec top(#reader{}) -> #reader{}.
 -spec bot(#reader{}) -> #reader{}.
 -spec bot(#reader{}) -> #reader{}.
 -spec next(#reader{}) -> #reader{} | {error,not_found | empty}.
 -spec next(#reader{}) -> #reader{} | {error,not_found | empty}.
@@ -16,5 +16,6 @@
 -spec save (#reader{} | #writer{}) -> #reader{} | #writer{}.
 -spec save (#reader{} | #writer{}) -> #reader{} | #writer{}.
 -spec add(#writer{}) -> #writer{}.
 -spec add(#writer{}) -> #writer{}.
 -spec append(tuple(),term()) -> any().
 -spec append(tuple(),term()) -> any().
+-spec remove(tuple(),term()) -> integer().
 -spec cut(term(),term()) -> {ok,non_neg_integer()} | {error, not_found}.
 -spec cut(term(),term()) -> {ok,non_neg_integer()} | {error, not_found}.
 -endif.
 -endif.

+ 5 - 2
mix.exs

@@ -2,7 +2,7 @@ defmodule KVS.Mixfile do
   use Mix.Project
   use Mix.Project
 
 
   def project do
   def project do
-    [app: :kvs, version: "6.10.2", description: "KVS Abstract Chain Database", package: package(), deps: deps()]
+    [app: :kvs, version: "6.11.0", description: "KVS Abstract Chain Database", package: package(), deps: deps()]
   end
   end
 
 
   def application do
   def application do
@@ -18,6 +18,9 @@ defmodule KVS.Mixfile do
   end
   end
 
 
   defp deps do
   defp deps do
-    [{:ex_doc, "~> 0.11", only: :dev}]
+    [
+     {:ex_doc, "~> 0.11", only: :dev},
+    # {:rocksdb, "~> 1.3.2"}
+    ]
   end
   end
 end
 end

+ 1 - 1
src/kvs.app.src

@@ -1,6 +1,6 @@
 {application, kvs,
 {application, kvs,
    [{description, "KVS Abstract Chain Database"},
    [{description, "KVS Abstract Chain Database"},
-    {vsn, "6.10.2"},
+    {vsn, "6.11.0"},
     {registered, []},
     {registered, []},
     {applications, [kernel,stdlib]},
     {applications, [kernel,stdlib]},
     {mod, { kvs, []}},
     {mod, { kvs, []}},

+ 16 - 8
src/kvs.erl

@@ -9,7 +9,7 @@
 -include("cursors.hrl").
 -include("cursors.hrl").
 -include("kvs.hrl").
 -include("kvs.hrl").
 -include("backend.hrl").
 -include("backend.hrl").
--export([dump/0,check/0,metainfo/0,ensure/1,seq_gen/0,fold/6,fold/7,head/1,head/2,fetch/2,fetch/3]).
+-export([dump/0,check/0,metainfo/0,ensure/1,seq_gen/0,fold/6,fold/7,head/1,head/2,fetch/2,fetch/3,feed/2]).
 -export(?API).
 -export(?API).
 -export(?STREAM).
 -export(?STREAM).
 -compile(export_all).
 -compile(export_all).
@@ -41,7 +41,7 @@ stop()             -> stop_kvs(#kvs{mod=dba()}).
 start()            -> start   (#kvs{mod=dba()}).
 start()            -> start   (#kvs{mod=dba()}).
 ver()              -> ver(#kvs{mod=dba()}).
 ver()              -> ver(#kvs{mod=dba()}).
 dir()              -> dir     (#kvs{mod=dba()}).
 dir()              -> dir     (#kvs{mod=dba()}).
-feed(Key)          -> feed    (Key, #kvs{mod=dba()}).
+feed(Key)          -> feed    (Key, #kvs{mod=dba(),st=kvs_stream()}).
 seq(Table,DX)      -> seq     (Table, DX, #kvs{mod=dba()}).
 seq(Table,DX)      -> seq     (Table, DX, #kvs{mod=dba()}).
 
 
 % stream api
 % stream api
@@ -119,7 +119,12 @@ count(Tab,#kvs{mod=DBA}) -> DBA:count(Tab).
 index(Tab, Key, Value,#kvs{mod=DBA}) -> DBA:index(Tab, Key, Value).
 index(Tab, Key, Value,#kvs{mod=DBA}) -> DBA:index(Tab, Key, Value).
 seq(Tab, Incr,#kvs{mod=DBA}) -> DBA:seq(Tab, Incr).
 seq(Tab, Incr,#kvs{mod=DBA}) -> DBA:seq(Tab, Incr).
 dump(#kvs{mod=Mod}) -> Mod:dump().
 dump(#kvs{mod=Mod}) -> Mod:dump().
-feed(Key,#kvs{st=Mod}) -> Mod:feed(Key).
+feed(Key,#kvs{st=Mod}=KVS) -> (Mod:take((kvs:reader(Key))#reader{args=-1}))#reader.args.
+
+remove(Rec,Feed) -> remove(Rec,Feed,#kvs{mod=dba(),st=kvs_stream()}).
+
+remove(Rec,Feed, #kvs{st=Mod}=KVS) -> Mod:remove(Rec,Feed).
+
 head(Key) -> case (kvs:take((kvs:reader(Key))#reader{args=1}))#reader.args of [X] -> X; [] -> [] end.
 head(Key) -> case (kvs:take((kvs:reader(Key))#reader{args=1}))#reader.args of [X] -> X; [] -> [] end.
 head(Key,Count) -> (kvs:take((kvs:reader(Key))#reader{args=Count,dir=1}))#reader.args.
 head(Key,Count) -> (kvs:take((kvs:reader(Key))#reader{args=Count,dir=1}))#reader.args.
 
 
@@ -129,15 +134,18 @@ check() ->
     Id1 = {list1,kvs:seq([],[])},
     Id1 = {list1,kvs:seq([],[])},
     Id2 = {list2,kvs:seq([],[])},
     Id2 = {list2,kvs:seq([],[])},
     X   = 5,
     X   = 5,
-    _   = kvs:save(kvs:writer(Id1)),
-    _   = kvs:save(kvs:writer(Id2)),
+    W1   = kvs:save(kvs:writer(Id1)),
+    W2   = kvs:save(kvs:writer(Id2)),
     [ kvs:save(kvs:add((kvs:writer(Id1))#writer{args={'$msg',[],[],[],[],[]}})) || _ <- lists:seq(1,X) ],
     [ kvs:save(kvs:add((kvs:writer(Id1))#writer{args={'$msg',[],[],[],[],[]}})) || _ <- lists:seq(1,X) ],
     [ kvs:append({'$msg',[],[],[],[],[]},Id2) || _ <- lists:seq(1,X) ],
     [ kvs:append({'$msg',[],[],[],[],[]},Id2) || _ <- lists:seq(1,X) ],
-    #reader{args=A} = (kvs:take(kvs:reader(Id1)))#reader{args=20},
+    R1  = kvs:save(kvs:reader(Id1)),
+    R2  = kvs:save(kvs:reader(Id2)),
+    R = kvs:take((kvs:load_reader(R2#reader.id))#reader{args=20}),
     B = kvs:feed(Id1),
     B = kvs:feed(Id1),
     C = kvs:feed(Id2),
     C = kvs:feed(Id2),
-    ?assertMatch(A,20),
-    ?assertMatch(X,length(B)).
+    ?assertMatch(5,length(R#reader.args)),
+ %   ?assertMatch(X,length(B)),
+ ok.
 
 
 fetch(Table, Key) -> fetch(Table, Key, []).
 fetch(Table, Key) -> fetch(Table, Key, []).
 fetch(Table, Key, Default) -> case get(Table, Key) of
 fetch(Table, Key, Default) -> case get(Table, Key) of

+ 2 - 2
src/layers/kvs_stream.erl

@@ -79,8 +79,6 @@ save (C) -> NC = c4(C,[]), kvs:put(NC), NC.
 
 
 % add
 % add
 
 
-feed(Key) -> (kvs:take((kvs:reader(Key))#reader{args=-1}))#reader.args.
-
 add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq(tab(M),1)),C);
 add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq(tab(M),1)),C);
 add(#writer{args=M}=C) -> add(M,C).
 add(#writer{args=M}=C) -> add(M,C).
 
 
@@ -97,6 +95,8 @@ add(M,#writer{cache=V1,count=S}=C) ->
     N=sp(sn(M,[]),id(V)), P=sn(V,id(M)), kvs:put([N,P]),
     N=sp(sn(M,[]),id(V)), P=sn(V,id(M)), kvs:put([N,P]),
     C#writer{cache=N,count=S+1}.
     C#writer{cache=N,count=S+1}.
 
 
+remove(Rec,Feed) -> -1.
+
 append(Rec,Feed) ->
 append(Rec,Feed) ->
    kvs:ensure(#writer{id=Feed}),
    kvs:ensure(#writer{id=Feed}),
    Name = element(1,Rec),
    Name = element(1,Rec),

+ 59 - 18
src/stores/kvs_st.erl

@@ -4,7 +4,7 @@
 -include("stream.hrl").
 -include("stream.hrl").
 -include("metainfo.hrl").
 -include("metainfo.hrl").
 -export(?STREAM).
 -export(?STREAM).
--export([prev/8]).
+-export([prev/8,ref/0,feed_key/2]).
 
 
 ref() -> kvs_rocks:ref().
 ref() -> kvs_rocks:ref().
 
 
@@ -23,13 +23,25 @@ top  (#reader{}=C) -> C.
 bot  (#reader{}=C) -> C.
 bot  (#reader{}=C) -> C.
 
 
 next (#reader{cache=[]}) -> {error,empty};
 next (#reader{cache=[]}) -> {error,empty};
-next (#reader{cache=I}=C) ->
+next (#reader{feed=Feed,cache=I}=C) when is_tuple(I) ->
+   Key = feed_key(I,Feed),
+   rocksdb:iterator_move(I, {seek,Key}),
+   case rocksdb:iterator_move(I, next) of
+        {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
+            {error,Reason} -> {error,Reason} end;
+next (#reader{cache=I}=C) when is_reference(I) ->
    case rocksdb:iterator_move(I, next) of
    case rocksdb:iterator_move(I, next) of
         {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
         {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
             {error,Reason} -> {error,Reason} end.
             {error,Reason} -> {error,Reason} end.
 
 
 prev (#reader{cache=[]}) -> {error,empty};
 prev (#reader{cache=[]}) -> {error,empty};
-prev (#reader{cache=I}=C) ->
+prev (#reader{cache=I,id=Feed}=C) when is_tuple(I) ->
+   Key = feed_key(I,Feed),
+   rocksdb:iterator_move(I, {seek,Key}),
+   case rocksdb:iterator_move(I, prev) of
+        {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
+            {error,Reason} -> {error,Reason} end;
+prev (#reader{cache=I}=C) when is_reference(I) ->
    case rocksdb:iterator_move(I, prev) of
    case rocksdb:iterator_move(I, prev) of
         {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
         {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
             {error,Reason} -> {error,Reason} end.
             {error,Reason} -> {error,Reason} end.
@@ -59,11 +71,30 @@ drop(#reader{args=N,feed=Feed,cache=I}=C) when N > 0 ->
            lists:seq(0,N)),
            lists:seq(0,N)),
    C#reader{cache=binary_to_term(element(1,element(2,Term)))}.
    C#reader{cache=binary_to_term(element(1,element(2,Term)))}.
 
 
-take(#reader{args=N,feed=Feed,cache=I}=C) ->
+take(#reader{args=N,feed=Feed,cache={T,O}}=C) ->
    Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
    Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
-   Fir = rocksdb:iterator_move(I, {seek,Key}),
-   Res = kvs_rocks:next(I,Key,size(Key),Fir,[],[],N,0),
-   C#reader{args=Res}.
+   {ok,I} = rocksdb:iterator(ref(), []),
+   {ok,K,BERT} = rocksdb:iterator_move(I, {seek,feed_key({T,O},Feed)}),
+   Fir = binary_to_term(BERT),
+   Res = kvs_rocks:next(I,Key,size(Key),K,BERT,[],N+1,0),
+   io:format("Fir: ~p~n",[Fir]),
+   io:format("Res: ~p~n",[Res]),
+   case {Res,length(Res) < N + 1} of
+        {[],_}    -> C#reader{args=[],cache=I};
+        {[H|X],false} -> C#reader{args=X,cache={e(1,H),e(2,H)}};
+        {[H|X],true} -> C#reader{args=Res,cache=[]} end;
+
+take(#reader{args=N,feed=Feed,cache=I}=C) when is_reference(I) ->
+   Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
+   {ok,K,BERT} = rocksdb:iterator_move(I, {seek,Key}),
+   Fir = binary_to_term(BERT),
+   Res = kvs_rocks:next(I,Key,size(Key),K,BERT,[],N+1,0),
+   io:format("Fir: ~p~n",[Fir]),
+   io:format("Res: ~p~n",[Res]),
+   case {Res,length(Res) < N + 1} of
+        {[],_}    -> C#reader{args=[],cache=I};
+        {[H|X],false} -> C#reader{args=X,cache={e(1,H),e(2,H)}};
+        {[H|X],true} -> C#reader{args=Res,cache=[]} end.
 
 
 % new, save, load, up, down, top, bot
 % new, save, load, up, down, top, bot
 
 
@@ -75,13 +106,14 @@ load_reader (Id) ->
 writer (Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
 writer (Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
 reader (Id) ->
 reader (Id) ->
     case kvs:get(writer,Id) of
     case kvs:get(writer,Id) of
-         {ok,#writer{}} ->
+         {ok,#writer{id=Feed}} ->
+             Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
              {ok,I} = rocksdb:iterator(ref(), []),
              {ok,I} = rocksdb:iterator(ref(), []),
-             #reader{id=kvs:seq([],[]),feed=Id,cache=I};
+             {ok,K,BERT} = rocksdb:iterator_move(I, {seek,Key}),
+             F = binary_to_term(BERT),
+             #reader{id=kvs:seq([],[]),feed=Id,cache={e(1,F),e(2,F)}};
          {error,_} -> #reader{} end.
          {error,_} -> #reader{} end.
-save (C) -> NC = c4(C,[]), N2 = c3(NC,[]), kvs:put(N2), N2.
-
-feed(Key) -> kvs:all(Key).
+save (C) -> NC = c4(C,[]), kvs:put(NC), NC.
 
 
 % add
 % add
 
 
@@ -92,17 +124,26 @@ add(M,#writer{id=Feed,count=S}=C) -> NS=S+1,
     raw_append(M,Feed),
     raw_append(M,Feed),
     C#writer{cache=M,count=NS}.
     C#writer{cache=M,count=NS}.
 
 
-raw_append(M,Feed) ->
-    rocksdb:put(ref(),
-       <<(list_to_binary(lists:concat(["/",kvs_rocks:format(Feed),"/"])))/binary,
-         (term_to_binary(id(M)))/binary>>, term_to_binary(M), [{sync,true}]).
+feed_key(M,Feed) -> <<(list_to_binary(lists:concat(["/",kvs_rocks:format(Feed),"/"])))/binary,(term_to_binary(id(M)))/binary>>.
+raw_append(M,Feed) -> rocksdb:put(ref(), feed_key(M,Feed), term_to_binary(M), [{sync,true}]).
+
+remove(Rec,Feed) ->
+   kvs:ensure(#writer{id=Feed}),
+   W = #writer{count=C} = kvs:writer(Feed),
+   {ok,I} = rocksdb:iterator(ref(), []),
+   case kvs:delete(Feed,id(Rec)) of
+        ok -> Count = C - 1,
+              kvs:save(W#writer{count = Count, cache = I}),
+              Count;
+         _ -> C end.
 
 
 append(Rec,Feed) ->
 append(Rec,Feed) ->
    kvs:ensure(#writer{id=Feed}),
    kvs:ensure(#writer{id=Feed}),
    Id = element(2,Rec),
    Id = element(2,Rec),
+   W = kvs:writer(Feed),
    case kvs:get(Feed,Id) of
    case kvs:get(Feed,Id) of
-        {ok,_}    -> raw_append(Rec,Feed), Id;
-        {error,_} -> kvs:save(kvs:add((kvs:writer(Feed))#writer{args=Rec})), Id end.
+        {ok,_}    -> raw_append(Rec,Feed), kvs:save(W#writer{cache=Rec,count=W#writer.count + 1}), Id;
+        {error,_} -> kvs:save(kvs:add(W#writer{args=Rec,cache=Rec})), Id end.
 
 
 prev(_,_,_,_,_,_,N,C) when C == N -> C;
 prev(_,_,_,_,_,_,N,C) when C == N -> C;
 prev(I,Key,S,{ok,A,X},_,T,N,C) -> prev(I,Key,S,A,X,T,N,C);
 prev(I,Key,S,{ok,A,X},_,T,N,C) -> prev(I,Key,S,A,X,T,N,C);