Browse Source

introduce o, seek iterator, top+bottom, sync cache

dxt 4 years ago
parent
commit
53c453c8ce
3 changed files with 48 additions and 11 deletions
  1. 30 0
      src/stores/kvs_rocks.erl
  2. 13 11
      src/stores/kvs_st.erl
  3. 5 0
      test/st_test.exs

+ 30 - 0
src/stores/kvs_rocks.erl

@@ -5,6 +5,7 @@
 -include_lib("stdlib/include/qlc.hrl").
 -export(?BACKEND).
 -export([ref/0,cut/8,next/8,prev/8,prev2/8,next2/8,bt/1,key/2,key/1]).
+-export([seek_it/1]).
 
 e(X,Y)     -> element(X,Y).
 bt([])     -> [];
@@ -20,6 +21,31 @@ key(R)     -> key(R,[]).
 key(Tab,R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(Tab, e(2,R));
 key(Tab,R) -> iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, tb(Tab), tb(R)]))]).
 
+fd(Key) ->
+  B = lists:reverse(binary:split(tb(Key), [<<"/">>, <<"//">>], [global, trim_all])),
+  B1 = lists:reverse(case B of [] -> [];[X] -> [X];[_|T] -> T end),
+  iolist_to_binary(lists:join(<<"/">>, [<<>>]++B1)).
+
+o(<<>>,FK,_,_) -> {ok,FK,[],[]};
+o(Key,FK,Dir,Fx) ->
+  S = size(FK),
+
+  Sheaf = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == FK -> {F(H,Dir),H,[V|Acc]};
+                                              (_,K,H,V,Acc) -> close_it(H),
+                                                               throw({ok,fd(K),bt(V),[bt(A1)||A1<-Acc]}) end,
+
+  It = fun(F,{ok,H})            -> {F(H,{seek,Key}),H};
+          (F,{{ok,K,V},H})      -> Sheaf(F,K,H,V,[]);
+          (F,{{ok,K,V},H,A})    -> Sheaf(F,K,H,V,A);
+          (_,{{error,_},H,Acc}) -> {{ok,[],[]},H,Acc};
+          (F,{R,O})             -> F(R,O);
+          (F,H)                 -> F(H) end,
+  catch case lists:foldl(It, {ref(),[]}, Fx) of
+    {{ok,K,Bin},_,A} when binary_part(K,{0,S}) == FK  -> {ok,fd(K),bt(Bin),[bt(A1)||A1<-A]};
+    {{ok,K,Bin},_,_}                                  -> {ok,fd(K),bt(Bin),[]};
+    {{ok,K,Bin},_}                                    -> {ok,fd(K),bt(Bin),[]}
+  end.
+
 start()    -> ok.
 stop()     -> ok.
 destroy()  -> rocksdb:destroy(application:get_env(kvs,rocks_name,"rocksdb"), []).
@@ -33,6 +59,10 @@ join(_) -> application:start(rocksdb),
 initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
 ref() -> application:get_env(kvs,rocks_ref,[]).
 index(_,_,_) -> [].
+
+close_it(H) -> try rocksdb:iterator_close(H) catch error:badarg -> ok end.
+seek_it(K) -> o(K,K,ok,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2]).
+
 get(Tab, Key) ->
     case rocksdb:get(ref(), key(Tab,Key), []) of
          not_found -> {error,not_found};

+ 13 - 11
src/stores/kvs_st.erl

@@ -4,7 +4,7 @@
 -include("stream.hrl").
 -include("metainfo.hrl").
 -export(?STREAM).
--import(kvs_rocks, [key/2, key/1, bt/1, ref/0]).
+-import(kvs_rocks, [key/2, key/1, bt/1, ref/0, seek_it/1]).
 
 % section: kvs_stream prelude
 
@@ -17,8 +17,8 @@ id(T) -> e(#it.id, T).
 % section: next, prev
 feed(Feed) -> take((reader(Feed))#reader{args=-1}).
 
-top(#reader{}=C) -> C#reader{dir=1}.
-bot(#reader{}=C) -> C#reader{dir=0}.
+top(#reader{feed=Feed}=C) -> #writer{count=Cn} = writer(Feed), read_it(C#reader{count=Cn},seek_it(key(Feed))).
+bot(#reader{feed=Feed}=C) -> #writer{cache=Ch, count=Cn} = writer(Feed), C#reader{cache=Ch, count=Cn, dir=1}.
 
 % handle -> seek -> move
 move_it(Key,Dir) -> 
@@ -36,6 +36,8 @@ move_it(Key,Dir) ->
   end.
 
 % iterator -> specific feed reader
+read_it(C,{ok,F,V,H}) -> C#reader{cache={e(1,V),id(V)}, args=lists:reverse(H)}; % real cache {F,e(1,V),id(V)}
+read_it(C,_) -> C.
 read_it(C, Feed, Move) ->
   case Move of 
     {ok, Bin} when element(1,Bin) =:= Feed -> C#reader{cache=Bin};
@@ -126,11 +128,11 @@ load_reader(Id) ->
 
 writer(Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
 reader(Id) -> case kvs:get(writer,Id) of
-  {ok,#writer{id=Feed}} ->
+  {ok,#writer{id=Feed, count=Cn}} ->
     {ok,I} = rocksdb:iterator(ref(), []),
     {ok,_,BERT} = rocksdb:iterator_move(I, {seek,key(Feed)}),
     F = bt(BERT),
-    #reader{id=kvs:seq([],[]),feed=Id,cache={e(1,F),e(2,F)}};
+    #reader{id=kvs:seq([],[]),feed=Id,count=Cn,cache={e(1,F),e(2,F)}};
   {error,_} -> save(#writer{id=Id}), reader(Id) end.
 save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
 
@@ -139,15 +141,15 @@ save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
 add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
 add(#writer{args=M}=C) -> add(M,C).
 
-add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache=M,count=NS}.
+add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache={e(1,M),e(2,M)},count=NS}.
 
 remove(Rec,Feed) ->
    kvs:ensure(#writer{id=Feed}),
-   W = #writer{count=C} = kvs:writer(Feed),
-   {ok,I} = rocksdb:iterator(ref(), []),
+   W = #writer{count=C, cache=Ch} = kvs:writer(Feed),
+   Ch1 = case {e(1,Rec),e(2,Rec)} of Ch -> Ch;_ -> [] end, % need to keep reference for next element
    case kvs:delete(Feed,id(Rec)) of
         ok -> Count = C - 1,
-              kvs:save(W#writer{count = Count, cache = I}),
+              kvs:save(W#writer{count = Count, cache=Ch1}),
               Count;
          _ -> C end.
 
@@ -159,8 +161,8 @@ append(Rec,Feed) ->
    Id = e(2,Rec),
    W = kvs:writer(Feed),
    case kvs:get(Feed,Id) of
-        {ok,_} -> raw_append(Rec,Feed), kvs:save(W#writer{cache=Rec,count=W#writer.count + 1}), Id;
-        {error,_} -> kvs:save(kvs:add(W#writer{args=Rec,cache=Rec})), Id end.
+        {ok,_} -> raw_append(Rec,Feed), kvs:save(W#writer{cache={e(1,Rec),Id},count=W#writer.count + 1}), Id;
+        {error,_} -> kvs:save(kvs:add(W#writer{args=Rec,cache={e(1,Rec),Id}})), Id end.
 
 cut(Feed,Id) ->
     Key    = key(Feed),

+ 5 - 0
test/st_test.exs

@@ -16,6 +16,11 @@ defmodule ST.Test do
     test "al0", kvs, do: assert kvs[:ids] |> Enum.map(&msg(id: &1)) == :kvs.all(:feed)
     test "al1", kvs, do: assert (kvs[:id0] ++ kvs[:id2] ++ kvs[:id1]) |> Enum.map(&msg(id: &1)) == :kvs.all("/crm/personal/Реєстратор А1/in")
 
+    #: real cache {:feed, :msg, id}
+    test "top",  kvs, do: (r0=:kvs.reader(:feed); assert KVS.reader(r0, cache: {:msg, Enum.at(kvs[:ids],0)}, dir: 0) == :kvs.top(r0))
+    test "bot",  kvs, do: (r0=:kvs.reader(:feed); assert KVS.reader(r0, cache: {:msg, Enum.at(kvs[:ids],9)}, dir: 1) == :kvs.bot(r0))
+    
+
     test "take-ø", kvs do
         r = KVS.reader() = :kvs.reader("/empty-feed")
         assert r1 = KVS.reader(feed: "/empty-feed", args: []) = :kvs.take(KVS.reader(r, args: 1))