|
@@ -6,6 +6,7 @@
|
|
|
-export(?STREAM).
|
|
|
-export([prev/8,ref/0,feed_key/2]).
|
|
|
|
|
|
+bt(X) -> kvs_rocks:bt(X).
|
|
|
ref() -> kvs_rocks:ref().
|
|
|
|
|
|
% section: kvs_stream prelude
|
|
@@ -26,11 +27,11 @@ next (#reader{feed=Feed,cache=I}=C) when is_tuple(I) ->
|
|
|
Key = feed_key(I,Feed),
|
|
|
rocksdb:iterator_move(I, {seek,Key}),
|
|
|
case rocksdb:iterator_move(I, next) of
|
|
|
- {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
|
|
|
+ {ok,_,Bin} -> C#reader{cache=bt(Bin)};
|
|
|
{error,Reason} -> {error,Reason} end;
|
|
|
next (#reader{cache=I}=C) when is_reference(I) ->
|
|
|
case rocksdb:iterator_move(I, next) of
|
|
|
- {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
|
|
|
+ {ok,_,Bin} -> C#reader{cache=bt(Bin)};
|
|
|
{error,Reason} -> {error,Reason} end.
|
|
|
|
|
|
prev (#reader{cache=[]}) -> {error,empty};
|
|
@@ -38,11 +39,11 @@ prev (#reader{cache=I,id=Feed}=C) when is_tuple(I) ->
|
|
|
Key = feed_key(I,Feed),
|
|
|
rocksdb:iterator_move(I, {seek,Key}),
|
|
|
case rocksdb:iterator_move(I, prev) of
|
|
|
- {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
|
|
|
+ {ok,_,Bin} -> C#reader{cache=bt(Bin)};
|
|
|
{error,Reason} -> {error,Reason} end;
|
|
|
prev (#reader{cache=I}=C) when is_reference(I) ->
|
|
|
case rocksdb:iterator_move(I, prev) of
|
|
|
- {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
|
|
|
+ {ok,_,Bin} -> C#reader{cache=bt(Bin)};
|
|
|
{error,Reason} -> {error,Reason} end.
|
|
|
|
|
|
% section: take, drop
|
|
@@ -52,7 +53,7 @@ drop(#reader{args=N}) when N < 0 -> #reader{};
|
|
|
drop(#reader{args=N,feed=Feed,cache=I}=C) when N == 0 ->
|
|
|
Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
|
|
|
case rocksdb:iterator_move(I, {seek,Key}) of
|
|
|
- {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
|
|
|
+ {ok,_,Bin} -> C#reader{cache=bt(Bin)};
|
|
|
_ -> C#reader{cache=[]} end;
|
|
|
|
|
|
drop(#reader{args=N,feed=Feed,cache=I}=C) when N > 0 ->
|
|
@@ -68,30 +69,15 @@ drop(#reader{args=N,feed=Feed,cache=I}=C) when N > 0 ->
|
|
|
end,
|
|
|
{First,{<<131,106>>,1}},
|
|
|
lists:seq(0,N)),
|
|
|
- C#reader{cache=binary_to_term(element(1,element(2,Term)))}.
|
|
|
+ C#reader{cache=bt(element(1,element(2,Term)))}.
|
|
|
|
|
|
take(#reader{args=N,feed=Feed,cache={T,O}}=C) ->
|
|
|
Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
|
|
|
{ok,I} = rocksdb:iterator(ref(), []),
|
|
|
{ok,K,BERT} = rocksdb:iterator_move(I, {seek,feed_key({T,O},Feed)}),
|
|
|
- Fir = binary_to_term(BERT),
|
|
|
- Res = kvs_rocks:next(I,Key,size(Key),K,BERT,[],N+1,0),
|
|
|
- io:format("Fir: ~p~n",[Fir]),
|
|
|
- io:format("Res: ~p~n",[Res]),
|
|
|
- case {Res,length(Res) < N + 1} of
|
|
|
- {[],_} -> C#reader{args=[],cache=I};
|
|
|
- {[H|X],false} -> C#reader{args=X,cache={e(1,H),e(2,H)}};
|
|
|
- {[H|X],true} -> C#reader{args=Res,cache=[]} end;
|
|
|
-
|
|
|
-take(#reader{args=N,feed=Feed,cache=I}=C) when is_reference(I) ->
|
|
|
- Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
|
|
|
- {ok,K,BERT} = rocksdb:iterator_move(I, {seek,Key}),
|
|
|
- Fir = binary_to_term(BERT),
|
|
|
- Res = kvs_rocks:next(I,Key,size(Key),K,BERT,[],N+1,0),
|
|
|
- io:format("Fir: ~p~n",[Fir]),
|
|
|
- io:format("Res: ~p~n",[Res]),
|
|
|
- case {Res,length(Res) < N + 1} of
|
|
|
- {[],_} -> C#reader{args=[],cache=I};
|
|
|
+ Res = kvs_rocks:next(I,Key,size(Key),K,BERT,[],case N of -1 -> -1; J -> J + 1 end,0),
|
|
|
+ case {Res,length(Res) < N + 1 orelse N == -1} of
|
|
|
+ {[],_} -> C#reader{args=[],cache=[]};
|
|
|
{[H|X],false} -> C#reader{args=X,cache={e(1,H),e(2,H)}};
|
|
|
{[H|X],true} -> C#reader{args=Res,cache=[]} end.
|
|
|
|
|
@@ -99,7 +85,7 @@ take(#reader{args=N,feed=Feed,cache=I}=C) when is_reference(I) ->
|
|
|
|
|
|
load_reader (Id) ->
|
|
|
case kvs:get(reader,Id) of
|
|
|
- {ok,#reader{}=C} -> C#reader{cache=element(2,rocksdb:iterator(ref(),[]))};
|
|
|
+ {ok,#reader{}=C} -> C;
|
|
|
_ -> #reader{id=[]} end.
|
|
|
|
|
|
writer (Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
|
|
@@ -109,7 +95,7 @@ reader (Id) ->
|
|
|
Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
|
|
|
{ok,I} = rocksdb:iterator(ref(), []),
|
|
|
{ok,K,BERT} = rocksdb:iterator_move(I, {seek,Key}),
|
|
|
- F = binary_to_term(BERT),
|
|
|
+ F = bt(BERT),
|
|
|
#reader{id=kvs:seq([],[]),feed=Id,cache={e(1,F),e(2,F)}};
|
|
|
{error,_} -> #reader{} end.
|
|
|
save (C) -> NC = c4(C,[]), kvs:put(NC), NC.
|