|
@@ -4,6 +4,7 @@
|
|
|
-include("stream.hrl").
|
|
|
-include("metainfo.hrl").
|
|
|
-export(?STREAM).
|
|
|
+-export([prev/8]).
|
|
|
|
|
|
ref() -> kvs_rocks:ref().
|
|
|
|
|
@@ -38,13 +39,13 @@ prev (#reader{cache=I}=C) ->
|
|
|
drop(#reader{args=N}) when N < 0 -> #reader{};
|
|
|
|
|
|
drop(#reader{args=N,feed=Feed,cache=I}=C) when N == 0 ->
|
|
|
- Key = list_to_binary(lists:concat(["/",io_lib:format("~p",[Feed])])),
|
|
|
+ Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
|
|
|
case rocksdb:iterator_move(I, {seek,Key}) of
|
|
|
{ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
|
|
|
_ -> C#reader{cache=[]} end;
|
|
|
|
|
|
drop(#reader{args=N,feed=Feed,cache=I}=C) when N > 0 ->
|
|
|
- Key = list_to_binary(lists:concat(["/",io_lib:format("~p",[Feed])])),
|
|
|
+ Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
|
|
|
First = rocksdb:iterator_move(I, {seek,Key}),
|
|
|
Term = lists:foldl(
|
|
|
fun (_,{{ok,K,_},{_,X}}) when N > X -> {K,{<<131,106>>,N}};
|
|
@@ -58,11 +59,11 @@ drop(#reader{args=N,feed=Feed,cache=I}=C) when N > 0 ->
|
|
|
lists:seq(0,N)),
|
|
|
C#reader{cache=binary_to_term(element(1,element(2,Term)))}.
|
|
|
|
|
|
-take(#reader{args=N,feed=Feed,cache=I,dir=Dir}=C) ->
|
|
|
- Key = list_to_binary(lists:concat(["/",io_lib:format("~p",[Feed])])),
|
|
|
- First = rocksdb:iterator_move(I, {seek,Key}),
|
|
|
- Res = kvs_rocks:next(I,Key,size(Key),First,[],[],N,0),
|
|
|
- C#reader{args= case Dir of 0 -> Res; 1 -> lists:reverse(Res) end}.
|
|
|
+take(#reader{args=N,feed=Feed,cache=I}=C) ->
|
|
|
+ Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
|
|
|
+ Fir = rocksdb:iterator_move(I, {seek,Key}),
|
|
|
+ Res = kvs_rocks:next(I,Key,size(Key),Fir,[],[],N,0),
|
|
|
+ C#reader{args=Res}.
|
|
|
|
|
|
% new, save, load, up, down, top, bot
|
|
|
|
|
@@ -89,7 +90,7 @@ add(#writer{args=M}=C) -> add(M,C).
|
|
|
|
|
|
add(M,#writer{id=Feed,count=S}=C) -> NS=S+1,
|
|
|
rocksdb:put(ref(),
|
|
|
- <<(list_to_binary(lists:concat(["/",io_lib:format("~p",[Feed]),"/"])))/binary,
|
|
|
+ <<(list_to_binary(lists:concat(["/",kvs_rocks:format(Feed),"/"])))/binary,
|
|
|
(term_to_binary(id(M)))/binary>>, term_to_binary(M), [{sync,true}]),
|
|
|
C#writer{cache=M,count=NS}.
|
|
|
|