|
@@ -4,7 +4,7 @@
|
|
-include("stream.hrl").
|
|
-include("stream.hrl").
|
|
-include("metainfo.hrl").
|
|
-include("metainfo.hrl").
|
|
-export(?STREAM).
|
|
-export(?STREAM).
|
|
--import(kvs_rocks, [key/2, key/1, bt/1, ref/0, seek_it/1]).
|
|
|
|
|
|
+-import(kvs_rocks, [key/2, key/1, bt/1, ref/0, seek_it/1, move_it/3]).
|
|
|
|
|
|
% section: kvs_stream prelude
|
|
% section: kvs_stream prelude
|
|
|
|
|
|
@@ -20,33 +20,12 @@ feed(Feed) -> take((reader(Feed))#reader{args=-1}).
|
|
top(#reader{feed=Feed}=C) -> #writer{count=Cn} = writer(Feed), read_it(C#reader{count=Cn},seek_it(key(Feed))).
|
|
top(#reader{feed=Feed}=C) -> #writer{count=Cn} = writer(Feed), read_it(C#reader{count=Cn},seek_it(key(Feed))).
|
|
bot(#reader{feed=Feed}=C) -> #writer{cache=Ch, count=Cn} = writer(Feed), C#reader{cache=Ch, count=Cn, dir=1}.
|
|
bot(#reader{feed=Feed}=C) -> #writer{cache=Ch, count=Cn} = writer(Feed), C#reader{cache=Ch, count=Cn, dir=1}.
|
|
|
|
|
|
-% handle -> seek -> move
|
|
|
|
-move_it(Key,Dir) ->
|
|
|
|
- Seek = fun(F,{ok,H}) -> {F(H,{seek,Key}),H};
|
|
|
|
- (F,{{ok,_,_},H}) -> F(H,Dir);
|
|
|
|
- (F,{{ok,_}, H}) -> F(H,Dir);
|
|
|
|
- (_,{error,Error}) -> {error,Error};
|
|
|
|
- (_,{{error,Error},_}) -> {error,Error};
|
|
|
|
- (F,{R,O}) -> F(R,O) end,
|
|
|
|
-
|
|
|
|
- case lists:foldl(Seek, {ref(),[]},
|
|
|
|
- [fun rocksdb:iterator/2, fun rocksdb:iterator_move/2, fun rocksdb:iterator_move/2]) of
|
|
|
|
- {ok,_,Bin} -> {ok,bt(Bin)};
|
|
|
|
- {error, Error} -> {error,Error}
|
|
|
|
- end.
|
|
|
|
-
|
|
|
|
% iterator -> specific feed reader
|
|
% iterator -> specific feed reader
|
|
-read_it(C,{ok,F,V,H}) -> C#reader{cache={e(1,V),id(V)}, args=lists:reverse(H)}; % real cache {F,e(1,V),id(V)}
|
|
|
|
|
|
+read_it(C,{ok,_F,V,H}) -> C#reader{cache={e(1,V),id(V)}, args=lists:reverse(H)}; % real cache {F,e(1,V),id(V)}
|
|
read_it(C,_) -> C.
|
|
read_it(C,_) -> C.
|
|
-read_it(C, Feed, Move) ->
|
|
|
|
- case Move of
|
|
|
|
- {ok, Bin} when element(1,Bin) =:= Feed -> C#reader{cache=Bin};
|
|
|
|
- {ok,_} -> C;
|
|
|
|
- {error,_} -> C
|
|
|
|
- end.
|
|
|
|
-
|
|
|
|
-next(#reader{feed=Feed,cache=I}=C) -> read_it(C,Feed,move_it(key(Feed,I),next)).
|
|
|
|
-prev(#reader{cache=I,feed=Feed}=C) -> read_it(C,Feed,move_it(key(Feed,I),prev)).
|
|
|
|
|
|
+
|
|
|
|
+next(#reader{feed=Feed,cache=I}=C) -> read_it(C,move_it(key(Feed,I),key(Feed),next)).
|
|
|
|
+prev(#reader{cache=I,feed=Feed}=C) -> read_it(C,move_it(key(Feed,I),key(Feed),prev)).
|
|
|
|
|
|
% section: take, drop
|
|
% section: take, drop
|
|
|
|
|