|
@@ -4,34 +4,34 @@
|
|
|
-include("stream.hrl").
|
|
|
-include("metainfo.hrl").
|
|
|
-export(?STREAM).
|
|
|
--export([prev/8,ref/0,feed_key/2]).
|
|
|
+-export([ref/0,feed_key/2]).
|
|
|
|
|
|
bt(X) -> kvs_rocks:bt(X).
|
|
|
ref() -> kvs_rocks:ref().
|
|
|
|
|
|
% section: kvs_stream prelude
|
|
|
|
|
|
-se(X,Y,Z) -> setelement(X,Y,Z).
|
|
|
-e(X,Y) -> element(X,Y).
|
|
|
+se(X,Y,Z) -> setelement(X,Y,Z).
|
|
|
+e(X,Y) -> element(X,Y).
|
|
|
c4(R,V) -> se(#reader.args, R, V).
|
|
|
si(M,T) -> se(#it.id, M, T).
|
|
|
-id(T) -> e(#it.id, T).
|
|
|
+id(T) -> e(#it.id, T).
|
|
|
|
|
|
% section: next, prev
|
|
|
|
|
|
-top (#reader{}=C) -> C#reader{dir=1}.
|
|
|
-bot (#reader{}=C) -> C#reader{dir=0}.
|
|
|
+top(#reader{}=C) -> C#reader{dir=1}.
|
|
|
+bot(#reader{}=C) -> C#reader{dir=0}.
|
|
|
|
|
|
-next (#reader{cache=[]}) -> {error,empty};
|
|
|
-next (#reader{feed=Feed,cache=I}=C) when is_tuple(I) ->
|
|
|
+next(#reader{cache=[]}) -> {error,empty};
|
|
|
+next(#reader{feed=Feed,cache=I}=C) when is_tuple(I) ->
|
|
|
Key = feed_key(I,Feed),
|
|
|
rocksdb:iterator_move(I, {seek,Key}),
|
|
|
case rocksdb:iterator_move(I, next) of
|
|
|
{ok,_,Bin} -> C#reader{cache=bt(Bin)};
|
|
|
{error,Reason} -> {error,Reason} end.
|
|
|
|
|
|
-prev (#reader{cache=[]}) -> {error,empty};
|
|
|
-prev (#reader{cache=I,id=Feed}=C) when is_tuple(I) ->
|
|
|
+prev(#reader{cache=[]}) -> {error,empty};
|
|
|
+prev(#reader{cache=I,id=Feed}=C) when is_tuple(I) ->
|
|
|
Key = feed_key(I,Feed),
|
|
|
rocksdb:iterator_move(I, {seek,Key}),
|
|
|
case rocksdb:iterator_move(I, prev) of
|
|
@@ -41,7 +41,6 @@ prev (#reader{cache=I,id=Feed}=C) when is_tuple(I) ->
|
|
|
% section: take, drop
|
|
|
|
|
|
drop(#reader{args=N}) when N < 0 -> #reader{};
|
|
|
-
|
|
|
drop(#reader{args=N,feed=Feed,cache=I}=C) when N == 0 ->
|
|
|
Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
|
|
|
case rocksdb:iterator_move(I, {seek,Key}) of
|
|
@@ -97,8 +96,7 @@ take(#reader{args=N,feed=Feed,cache={T,O},dir=1}=C) -> % 1
|
|
|
{[H],A} when element(2,KK) == O -> C#reader{args=Res,pos=Last,cache={e(1,H),e(2,H)}}; % 2
|
|
|
{[H|X],A} when A < N - 1 orelse N == -1 -> [HX|_] = Res, C#reader{args=Res,cache={e(1,HX),e(2,HX)},pos=Last};
|
|
|
{[H|X],A} when A == N -> [HX|TL] = Res, C#reader{args=[bt(BERT)|X],cache={e(1,HX),e(2,HX)}};
|
|
|
- {[H|X],_} -> [HX|TL] = Res, C#reader{args=lists:reverse(TL),cache={e(1,HX),e(2,HX)}}
|
|
|
- end.
|
|
|
+ {[H|X],_} -> [HX|TL] = Res, C#reader{args=lists:reverse(TL),cache={e(1,HX),e(2,HX)}} end.
|
|
|
|
|
|
last(KK,O,Atom) ->
|
|
|
Last = case KK of
|
|
@@ -109,13 +107,13 @@ last(KK,O,Atom) ->
|
|
|
|
|
|
% new, save, load, up, down, top, bot
|
|
|
|
|
|
-load_reader (Id) ->
|
|
|
+load_reader(Id) ->
|
|
|
case kvs:get(reader,Id) of
|
|
|
{ok,#reader{}=C} -> C;
|
|
|
_ -> #reader{id=[]} end.
|
|
|
|
|
|
-writer (Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
|
|
|
-reader (Id) ->
|
|
|
+writer(Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
|
|
|
+reader(Id) ->
|
|
|
case kvs:get(writer,Id) of
|
|
|
{ok,#writer{id=Feed}} ->
|
|
|
Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
|
|
@@ -124,19 +122,17 @@ reader (Id) ->
|
|
|
F = bt(BERT),
|
|
|
#reader{id=kvs:seq([],[]),feed=Id,cache={e(1,F),e(2,F)}};
|
|
|
{error,_} -> #reader{} end.
|
|
|
-save (C) -> NC = c4(C,[]), kvs:put(NC), NC.
|
|
|
+save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
|
|
|
|
|
|
% add
|
|
|
|
|
|
add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
|
|
|
add(#writer{args=M}=C) -> add(M,C).
|
|
|
|
|
|
-add(M,#writer{id=Feed,count=S}=C) -> NS=S+1,
|
|
|
- raw_append(M,Feed),
|
|
|
- C#writer{cache=M,count=NS}.
|
|
|
+add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache=M,count=NS}.
|
|
|
|
|
|
-feed_key(M,Feed) -> <<(list_to_binary(lists:concat(["/",kvs_rocks:format(Feed),"/"])))/binary,(term_to_binary(id(M)))/binary>>.
|
|
|
-raw_append(M,Feed) -> rocksdb:put(ref(), feed_key(M,Feed), term_to_binary(M), [{sync,true}]).
|
|
|
+feed_key(M,Feed) -> <<(list_to_binary(lists:concat(["/",kvs_rocks:format(Feed),"/"])))/binary,
|
|
|
+ (term_to_binary(id(M)))/binary>>.
|
|
|
|
|
|
remove(Rec,Feed) ->
|
|
|
kvs:ensure(#writer{id=Feed}),
|
|
@@ -148,29 +144,21 @@ remove(Rec,Feed) ->
|
|
|
Count;
|
|
|
_ -> C end.
|
|
|
|
|
|
+raw_append(M,Feed) ->
|
|
|
+ rocksdb:put(ref(), feed_key(M,Feed), term_to_binary(M), [{sync,true}]).
|
|
|
+
|
|
|
append(Rec,Feed) ->
|
|
|
kvs:ensure(#writer{id=Feed}),
|
|
|
Id = element(2,Rec),
|
|
|
W = kvs:writer(Feed),
|
|
|
case kvs:get(Feed,Id) of
|
|
|
- {ok,_} -> raw_append(Rec,Feed), kvs:save(W#writer{cache=Rec,count=W#writer.count + 1}), Id;
|
|
|
+ {ok,_} -> raw_append(Rec,Feed), kvs:save(W#writer{cache=Rec,count=W#writer.count + 1}), Id;
|
|
|
{error,_} -> kvs:save(kvs:add(W#writer{args=Rec,cache=Rec})), Id end.
|
|
|
|
|
|
-prev(_,_,_,_,_,_,N,C) when C == N -> C;
|
|
|
-prev(I,Key,S,{ok,A,X},_,T,N,C) -> prev(I,Key,S,A,X,T,N,C);
|
|
|
-prev(_,___,_,{error,_},_,_,_,C) -> C;
|
|
|
-prev(I,Key,S,A,_,_,N,C) when size(A) > S ->
|
|
|
- case binary:part(A,0,S) of Key ->
|
|
|
- rocksdb:delete(ref(), A, []),
|
|
|
- Next = rocksdb:iterator_move(I, prev),
|
|
|
- prev(I,Key, S, Next, [], A, N, C + 1);
|
|
|
- _ -> C end;
|
|
|
-prev(_,_,_,_,_,_,_,C) -> C.
|
|
|
-
|
|
|
cut(Feed,Id) ->
|
|
|
Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed),"/"])),
|
|
|
A = <<Key/binary,(term_to_binary(Id))/binary>>,
|
|
|
{ok,I} = rocksdb:iterator(ref(), []),
|
|
|
case rocksdb:iterator_move(I, {seek,A}) of
|
|
|
- {ok,A,X} -> {ok,prev(I,Key,size(Key),A,X,[],-1,0)};
|
|
|
+ {ok,A,X} -> {ok,kvs_rocks:cut(I,Key,size(Key),A,X,[],-1,0)};
|
|
|
_ -> {error,not_found} end.
|