Browse Source

allow to brake back into feed traversal when cache is out.

dxt 4 years ago
parent
commit
00a08a7afd
3 changed files with 23 additions and 5 deletions
  1. 10 2
      src/stores/kvs_rocks.erl
  2. 1 1
      test/fd_test.exs
  3. 12 2
      test/sc_test.exs

+ 10 - 2
src/stores/kvs_rocks.erl

@@ -45,16 +45,24 @@ o(Key,FK,Dir,Fx) ->
   Sheaf = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == FK -> {F(H,Dir),H,[V|Acc]};
                                               (_,K,H,V,Acc) -> close_it(H),
                                                                throw({ok,fd(K),bt(V),[bt(A1)||A1<-Acc]}) end,
+  Break = fun(F,K,V,H) -> case F(H,prev) of
+      {ok,K1,V1} when binary_part(K,{0,S}) == FK -> {{ok,K1,V1},H,[V]};
+      {ok,K1,V1} -> Sheaf(F,K1,H,V1,[]);
+      E -> E
+  end end,
 
   It = fun(F,{ok,H})            -> {F(H,{seek,Key}),H};
+          (F,{{ok,K,V},H})
+              when Dir =:= prev -> Break(F,K,V,H);
           (F,{{ok,K,V},H})      -> Sheaf(F,K,H,V,[]);
           (F,{{ok,K,V},H,A})    -> Sheaf(F,K,H,V,A);
           (_,{{error,_},H,Acc}) -> {{ok,[],[]},H,Acc};
           (F,{R,O})             -> F(R,O);
           (F,H)                 -> F(H) end,
   catch case lists:foldl(It, {ref(),[]}, Fx) of
-    {{ok,K,Bin},_,A} -> {ok,fd(K),bt(Bin),[bt(A1)||A1<-A]};
-    {{ok,K,Bin},_}   -> {ok,fd(K),bt(Bin),[]}
+    {{ok,K,Bin},_,A}  -> {ok,fd(K), bt(Bin),[bt(A1)||A1<-A]};
+    {{ok,K,Bin},_}    -> {ok,fd(K), bt(Bin),[]};
+    {{error,_},H,Acc} -> {ok,fd(FK),bt(shd(Acc)),[bt(A1) ||A1<-Acc]}
   end.
 
 start()    -> ok.

+ 1 - 1
test/fd_test.exs

@@ -1,7 +1,7 @@
 ExUnit.start()
 
 defmodule Fd.Test do
-    use ExUnit.Case, async: true
+    use ExUnit.Case, async: false
     require KVS
     import Record
 

+ 12 - 2
test/sc_test.exs

@@ -1,7 +1,7 @@
 ExUnit.start()
 
 defmodule Sc.Test do
-    use ExUnit.Case, async: true
+    use ExUnit.Case, async: false
     require KVS
     import Record
     @moduledoc """
@@ -17,7 +17,6 @@ defmodule Sc.Test do
         id2: :lists.map(fn _ -> :kvs.append(msg(id: :kvs.seq([],[])), "/crm/truck") end, :lists.seq(1,10)),
         id3: :lists.map(fn _ -> :kvs.save(:kvs.add(KVS.writer(:kvs.writer(:sym),
                                         args: msg(id: :kvs.seq([],[]))))) end, :lists.seq(1,10))]
-
     test "basic", kvs do
         id1 = "/crm/luck"
         id2 = "/crm/truck"
@@ -41,6 +40,17 @@ defmodule Sc.Test do
         {:ok, KVS.writer(id: :sym, count: 10, cache: last)} = :kvs.get(:writer, :sym)
     end
 
+    test "take back full" do
+        feed = "/crm/duck"
+        :kvs.save(:kvs.writer(feed))
+        KVS.reader(id: rid) = :kvs.save(:kvs.reader(feed))
+        t = KVS.reader(args: a1) = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 10))
+        assert a1 == :kvs.feed(feed)
+        :kvs.save(KVS.reader(t, dir: 1))
+        KVS.reader(args: a2) = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 10))
+        assert :lists.reverse(a2) == :kvs.feed(feed)
+  end
+
     defp log(x), do: IO.puts '#{inspect(x)}'
     defp log(m, x), do: IO.puts '#{m} #{inspect(x)}'
 end