Browse Source

remove by iteration (for small feeds)

dxt 2 years ago
parent
commit
572443cb23
4 changed files with 54 additions and 15 deletions
  1. 4 1
      src/kvs.erl
  2. 8 6
      src/layers/kvs_st.erl
  3. 23 4
      src/stores/kvs_rocks.erl
  4. 19 4
      test/fd_test.exs

+ 4 - 1
src/kvs.erl

@@ -28,7 +28,8 @@
          fields/1,
          defined/2,
          field/2,
-         setfield/3]).
+         setfield/3,
+         remove/1]).
 
 -export([join/2, seq/3]).
 
@@ -195,6 +196,8 @@ save(X)                      -> (kvs_stream()):save(X).
 
 save(X,#kvs{db = Db})        -> (kvs_stream()):save(X,Db).
 
+remove(X)                    -> (kvs_stream()):remove(X).
+
 cut(X)                       -> (kvs_stream()):cut(X).
 
 cut(X,#kvs{db = Db})         -> (kvs_stream()):cut(X, Db).

+ 8 - 6
src/layers/kvs_st.erl

@@ -3,8 +3,8 @@
 -include("stream.hrl").
 -include("metainfo.hrl").
 -export(?STREAM).
--import(kvs_rocks, [fmt/1, key/2, key/1, bt/1, tb/1, ref/0, ref/1, seek_it/1, seek_it/2, move_it/3, move_it/4, take_it/4, take_it/5, estimate/0, estimate/1]).
--export([raw_append/2,raw_append/3]).
+-import(kvs_rocks, [fmt/1, key/2, key/1, bt/1, tb/1, ref/0, ref/1, seek_it/1, seek_it/2, move_it/3, move_it/4, take_it/4, take_it/5, delete_it/2, estimate/0, estimate/1]).
+-export([raw_append/2,raw_append/3, remove/1]).
 
 db() -> application:get_env(kvs,rocks_name,"rocksdb").
 
@@ -41,6 +41,9 @@ take(#reader{args=N,feed=Feed,cache=I,dir=_}=C,Db) -> read_it(C,take_it(k(Feed,I
 drop(#reader{}=X) -> drop(X,db()).
 drop(#reader{args=N}=C,_) when N =< 0 -> C;
 drop(#reader{}=C,Db) -> (take(C#reader{dir=0},Db))#reader{args=[]}.
+remove(#reader{}=C) -> remove(C, db()).
+remove(#reader{feed=Feed}=C,Db) -> read_it(C, delete_it(Feed,Db));
+remove(Rec,Feed) -> remove(Rec,Feed,db()).
 
 feed(Feed) -> feed(Feed,db()).
 feed(Feed,Db) ->
@@ -95,13 +98,12 @@ add(M,#writer{id=Feed,count=S}=C,Db) ->
 
 cut(Feed) -> cut(Feed,db()).
 cut(Feed,Db) ->
-  #writer{cache={_,Key,Fd}=Ch} = W = kvs:writer(Feed, #kvs{db=Db,mod=kvs_rocks}),
-  #reader{cache=Prev} = kvs:prev(reader(Feed, Db)),
-  #reader{cache=Next} = kvs:next(#reader{feed=key(Feed), cache=Ch}),
+  #writer{cache={_,Key,Fd}=Ch} = kvs:writer(Feed, #kvs{db=Db,mod=kvs_rocks}),
+  #reader{} = kvs:prev(reader(Feed, Db)),
+  #reader{} = kvs:next(#reader{feed=key(Feed), cache=Ch}),
 
   kvs:delete_range(Feed,{Fd,Key},#kvs{db=Db,mod=kvs_rocks}).
 
-remove(Rec,Feed) -> remove(Rec,Feed,db()).
 remove(Rec,Feed,Db) ->
   kvs:ensure(#writer{id=Feed},#kvs{db=Db,mod=kvs_rocks}),
   W = #writer{count=C, cache=Ch} = kvs:writer(Feed,#kvs{db=Db,mod=kvs_rocks}),

+ 23 - 4
src/stores/kvs_rocks.erl

@@ -5,7 +5,7 @@
 -include_lib("stdlib/include/qlc.hrl").
 -export(?BACKEND).
 -export([ref/0,ref/1,bt/1,key/2,key/1,fd/1,tb/1,estimate/0,estimate/1]).
--export([seek_it/1, seek_it/2, move_it/3, move_it/4, take_it/4, take_it/5]).
+-export([seek_it/1, seek_it/2, move_it/3, move_it/4, take_it/4, take_it/5, delete_it/1, delete_it/2]).
 
 e(X,Y) -> element(X,Y).
 
@@ -123,10 +123,27 @@ join(_,Db)       ->
               initialize(),
               application:set_env(kvs,ref_env(Db),Ref).
 
-compile(seek) -> [fun rocksdb:iterator/2,fun rocksdb:iterator_move/2];
-compile(move) -> [fun rocksdb:iterator_move/2];
-compile(close) -> [fun rocksdb:iterator_close/1].
+compile(it)     -> [fun rocksdb:iterator/2];
+compile(seek)   -> [fun rocksdb:iterator/2,fun rocksdb:iterator_move/2];
+compile(move)   -> [fun rocksdb:iterator_move/2];
+compile(close)  -> [fun rocksdb:iterator_close/1].
 compile(take,N) -> lists:map(fun(_) -> fun rocksdb:iterator_move/2 end, lists:seq(1, N)).
+compile(delete,_, {error,E},_) -> {error,E};
+compile(delete,SK,{ok,_,V1,_},Db) ->
+  F1 = key(key(fmt(SK),e(2,V1))), S = sz(SK),
+  [fun Del(H,Dir) ->
+    case rocksdb:delete(ref(Db), F1, []) of ok ->      
+      % {ok, K} case exist only in api, but never actually used
+      case rocksdb:iterator_move(H,Dir) of
+        {ok,K,_} when binary_part(K,{0,S}) == SK -> case rocksdb:delete(ref(Db), K, []) of ok -> Del(H,Dir); E -> E end;
+        {ok,K}   when binary_part(K,{0,S}) == SK -> case rocksdb:delete(ref(Db), K, []) of ok -> Del(H,Dir); E -> E end;
+        {ok,K,V} -> {ok,K,V};
+        {ok,K}   -> {ok, K};
+        E -> E
+      end;
+      E -> E
+    end
+  end].
 
 stop_it(H) -> try begin [F]=compile(close), F(H) end catch error:badarg -> ok end.
 seek_it(K) -> seek_it(K,db()).
@@ -136,6 +153,8 @@ move_it(Key,SK,Dir,Db) -> run(Key,SK,Dir,compile(seek) ++ compile(move),Db).
 take_it(Key,SK,Dir,N) -> take_it(Key,SK,Dir,N,db()).
 take_it(Key,SK,Dir,N,Db) when is_integer(N) andalso N >= 0 -> run(Key,SK,Dir,compile(seek) ++ compile(take,N),Db);
 take_it(Key,SK,Dir,_,Db) -> take_it(Key,SK,Dir,0,Db).
+delete_it(Fd) -> delete_it(Fd, db()).
+delete_it(Fd,Db) -> run(Fd,Fd,next,compile(seek) ++ compile(delete,Fd,seek_it(Fd),Db),Db).
 
 all(R,Db) -> kvs_st:feed(R,Db).
 

+ 19 - 4
test/fd_test.exs

@@ -106,10 +106,6 @@ defmodule Fd.Test do
     end
 
     test "cut the *uck", kvs do
-        log "1", kvs[:id0]
-        log "2", kvs[:id1]
-        log "3", kvs[:id2]
-
         :kvs.cut("/crm/luck")
 
         all = :kvs.all("/crm")
@@ -128,6 +124,25 @@ defmodule Fd.Test do
         assert 0 = length(all)
     end
 
+    test "remove the *uck with readers", kvs do
+        :kvs.remove(:kvs.reader("/crm/luck"))
+
+        all = :kvs.all("/crm")
+        assert 20 = length(all)
+        assert all = :kvs.all("/crm/duck") ++ :kvs.all("/crm/truck")
+
+        :kvs.remove(:kvs.reader("/crm/duck"))
+
+        all = :kvs.all("/crm")
+        assert 10 = length(all)
+        assert all = :kvs.all("/crm/truck")
+
+        :kvs.remove(:kvs.reader("/crm/truck"))
+
+        all = :kvs.all("/crm")
+        assert 0 = length(all)
+    end
+
     @tag :skip # can`t manage this within current implementation. create correct keys!
     test "keys with feeds separator" do
         :kvs.append(msg(id: "1/1"), "/one/two")