Просмотр исходного кода

limit max number of operations based on estimate

dxt 3 лет назад
Родитель
Сommit
d52ffb6e03
3 измененных файлов с 38 добавлено и 8 удалено
  1. 7 4
      src/stores/kvs_rocks.erl
  2. 12 4
      src/stores/kvs_st.erl
  3. 19 0
      test/fd_test.exs

+ 7 - 4
src/stores/kvs_rocks.erl

@@ -4,7 +4,7 @@
 -include("metainfo.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -export(?BACKEND).
--export([ref/0,bt/1,key/2,key/1,fd/1,tb/1]).
+-export([ref/0,bt/1,key/2,key/1,fd/1,tb/1,estimate/0]).
 -export([seek_it/1, move_it/3, take_it/4]).
 
 e(X,Y) -> element(X,Y).
@@ -60,9 +60,6 @@ run(Key, % key
   S = sz(SK),
   Initial_Object = {ref(), []},
   
-  % refresh memtables/sst/cached data in current position
-  Refresh = fun(H) -> rocksdb:iterator_refresh(H) end,
-
   Run = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == SK -> {F(H,Dir),H,[V|Acc]}; % continue +------------+
             (_,K,H,V,Acc) -> stop_it(H),                                           % fail-safe closing     |
                              throw({ok, fd(K), bt(V), [bt(A1) || A1 <- Acc]}) end, % acc unfold            |
@@ -129,6 +126,12 @@ put(Records) when is_list(Records) -> lists:map(fun(Record) -> put(Record) end,
 put(Record) -> rocksdb:put(ref(), key(Record), term_to_binary(Record), [{sync,true}]).
 delete(Feed, Id) -> rocksdb:delete(ref(), key(Feed,Id), []).
 count(_) -> 0.
+estimate() -> case rocksdb:get_property(ref(), <<"rocksdb.estimate-num-keys">>) of
+                {ok, Est} when is_binary(Est)  -> binary_to_integer(Est);
+                {ok, Est} when is_list(Est)    -> list_to_integer(Est);
+                {ok, Est} when is_integer(Est) -> Est;
+                _ -> 0 
+            end.
 
 shd([]) -> [];
 shd(X) -> hd(X).

+ 12 - 4
src/stores/kvs_st.erl

@@ -3,7 +3,7 @@
 -include("stream.hrl").
 -include("metainfo.hrl").
 -export(?STREAM).
--import(kvs_rocks, [key/2, key/1, bt/1, tb/1, ref/0, seek_it/1, move_it/3, take_it/4]).
+-import(kvs_rocks, [key/2, key/1, bt/1, tb/1, ref/0, seek_it/1, move_it/3, take_it/4, estimate/0]).
 -export([raw_append/2]).
 
 se(X,Y,Z) -> setelement(X,Y,Z).
@@ -34,14 +34,22 @@ take(#reader{args=N,feed=Feed,cache=I,dir=_}=C) -> read_it(C,take_it(k(Feed,I),F
 drop(#reader{args=N}=C) when N =< 0 -> C;
 drop(#reader{}=C) -> (take(C#reader{dir=0}))#reader{args=[]}.
 
-feed(Feed) -> feed(fun(#reader{}=R) -> take(R#reader{args=4}) end, top(reader(Feed)),[]).
-feed(F,#reader{cache=C1}=R,Acc) ->
+feed(Feed) ->
+  #reader{count=Cn} = Top = top(reader(Feed)),
+  Halt = case {estimate(),Cn} of 
+          {E,C} when E =< 0 -> max(C,4);
+          {E,_} -> E
+         end,
+  feed(fun(#reader{}=R) -> take(R#reader{args=4}) end,Top,[],Halt).
+
+feed(F,#reader{},Acc,H) when H =< 0 -> Acc;
+feed(F,#reader{cache=C1,count=Cn}=R,Acc,H) ->
   #reader{args=A, cache=Ch, feed=Feed} = R1 = F(R),
   case Ch of
     C1 -> Acc ++ A;
     {_,_,K} when binary_part(K,{0,byte_size(Feed)}) == Feed
             andalso length(A) == 4
-      -> feed(F, R1, Acc ++ A);
+      -> feed(F, R1, Acc ++ A, H-4);
     _ -> Acc ++ A
   end.
 

+ 19 - 0
test/fd_test.exs

@@ -113,6 +113,25 @@ defmodule Fd.Test do
         assert KVS.reader(cache: {:msg, _, "//one/two"}) = :kvs.reader("/one/two")
     end
 
+    test "corrupted writers doesn't affect all" do
+        prev = :kvs.all("/crm/duck")
+        
+        KVS.writer(cache: ch) = w = :kvs.writer("/crm/duck")
+        w1 = KVS.writer(w, cache: {:msg, "unknown", "/corrupted"})
+        
+        :ok = :kvs_rocks.put(w1)
+        w2 = :kvs.writer("/crm/duck")
+        assert {:ok, ^w2} = :kvs.get(:writer, "/crm/duck")
+        assert w1 == w2
+
+        assert prev = :kvs.all("/crm/duck")
+        
+        {:ok,_} = :kvs.get(:writer, "/crm/duck")
+        :ok = :kvs.delete(:writer, "/crm/duck")
+        {:error, :not_found} = :kvs.get(:writer, "/crm/duck")
+        assert prev = :kvs.all("/crm/duck")
+    end
+
     defp log(x), do: IO.puts '#{inspect(x)}'
     defp log(m, x), do: IO.puts '#{m} #{inspect(x)}'
 end