|
@@ -4,7 +4,7 @@
|
|
|
-include("stream.hrl").
|
|
|
-include("metainfo.hrl").
|
|
|
-export(?STREAM).
|
|
|
--import(kvs_rocks, [key/2, key/1, bt/1, ref/0, fd/1, seek_it/1, move_it/3]).
|
|
|
+-import(kvs_rocks, [key/2, key/1, bt/1, ref/0, fd/1, seek_it/1, move_it/3, take_it/4]).
|
|
|
|
|
|
% section: kvs_stream prelude
|
|
|
|
|
@@ -14,77 +14,29 @@ c4(R,V) -> se(#reader.args, R, V).
|
|
|
si(M,T) -> se(#it.id, M, T).
|
|
|
id(T) -> e(#it.id, T).
|
|
|
|
|
|
-% section: next, prev
|
|
|
-feed(Feed) -> #reader{args=Args} = take((reader(Feed))#reader{args=-1}), Args.
|
|
|
+k(F,[]) -> key(F);
|
|
|
+k(_,{_,Id,SF}) -> key(SF,Id).
|
|
|
|
|
|
-top(#reader{feed=Feed}=C) -> #writer{count=Cn} = writer(Feed), read_it(C#reader{count=Cn},seek_it(key(Feed))).
|
|
|
-bot(#reader{feed=Feed}=C) -> #writer{cache=Ch, count=Cn} = writer(Feed), C#reader{cache=Ch, count=Cn, dir=1}.
|
|
|
-
|
|
|
-% iterator -> specific feed reader
|
|
|
-read_it(C,{ok,F,V,H}) -> C#reader{cache={e(1,V),id(V), F}, args=lists:reverse(H)};
|
|
|
+read_it(C,{ok,F,V,H}) -> C#reader{cache={e(1,V),id(V),F}, args=lists:reverse(H)};
|
|
|
read_it(C,_) -> C.
|
|
|
|
|
|
-next(#reader{feed=Feed,cache=I}=C) -> read_it(C,move_it(key(Feed,I),key(Feed),next)).
|
|
|
-prev(#reader{cache=I,feed=Feed}=C) -> read_it(C,move_it(key(Feed,I),key(Feed),prev)).
|
|
|
-
|
|
|
-% section: take, drop
|
|
|
-
|
|
|
+top(#reader{feed=Feed}=C) -> #writer{count=Cn} = writer(Feed), read_it(C#reader{count=Cn},seek_it(key(Feed))).
|
|
|
+bot(#reader{feed=Feed}=C) -> #writer{cache=Ch, count=Cn} = writer(Feed), C#reader{cache=Ch, count=Cn}.
|
|
|
+next(#reader{feed=Feed,cache=I}=C) -> read_it(C,move_it(k(Feed,I),key(Feed),next)).
|
|
|
+prev(#reader{cache=I,feed=Feed}=C) -> read_it(C,move_it(k(Feed,I),key(Feed),prev)).
|
|
|
+take(#reader{args=N,feed=Feed,cache=I,dir=1}=C) -> read_it(C,take_it(k(Feed,I),key(Feed),prev,N));
|
|
|
+take(#reader{args=N,feed=Feed,cache=I,dir=_}=C) -> read_it(C,take_it(k(Feed,I),key(Feed),next,N)).
|
|
|
drop(#reader{args=N}=C) when N =< 0 -> C;
|
|
|
-drop(#reader{args=N,feed=Feed,cache=I}=C) -> (take(C#reader{dir=0}))#reader{args=[]}.
|
|
|
-
|
|
|
-% 1. Курсор всегда выставлен на следущий невычитанный элемент
|
|
|
-% 2. Если после вычитки курсор указывает на недавно вычитаный элемент -- это признак конца списка
|
|
|
-% 3. Если результат вычитки меньше требуемого значения -- это признак конца списка
|
|
|
-% 4. Если курсор установлен в конец списка и уже вернул его последний элемент
|
|
|
-% то результат вычитки будет равным пустому списку
|
|
|
-
|
|
|
+drop(#reader{}=C) -> (take(C#reader{dir=0}))#reader{args=[]}.
|
|
|
|
|
|
-take(#reader{pos='end',dir=0}=C) -> C#reader{args=[]}; % 4
|
|
|
-take(#reader{args=N,feed=Feed,cache={T,O,_},dir=0}=C) -> take(C#reader{cache={T,O}});
|
|
|
-take(#reader{args=N,feed=Feed,cache={T,O},dir=0}=C) -> % 1
|
|
|
- Key = key(Feed),
|
|
|
- {ok,I} = rocksdb:iterator(ref(), []),
|
|
|
- {ok,K,BERT} = rocksdb:iterator_move(I, {seek,key(Feed,{T,O})}),
|
|
|
- {KK,Res} = kvs_rocks:next2(I,Key,size(Key),K,BERT,[],case N of -1 -> -1; J -> J + 1 end,0),
|
|
|
- Last = last(KK,O,'end'),
|
|
|
- case {Res,length(Res)} of
|
|
|
- {[],_} -> C#reader{args=[],cache=[]};
|
|
|
- {[H], _A} when element(2,KK) == O -> C#reader{args=Res,pos=Last,cache={e(1,H),e(2,H)}}; % 2
|
|
|
- {[H|_X],A} when A < N + 1 orelse N == -1 -> C#reader{args=Res,cache={e(1,H),e(2,H)},pos=Last};
|
|
|
- {[H| X],A} when A == N -> C#reader{args=[bt(BERT)|X],cache={e(1,H),e(2,H)},pos=Last};
|
|
|
- {[H|_X],A} when A =< N andalso Last == 'end'-> C#reader{args=Res,cache={e(1,H),e(2,H)},pos=Last};
|
|
|
- {[H| X],_} -> C#reader{args=X,cache={e(1,H),e(2,H)}} end;
|
|
|
-
|
|
|
-
|
|
|
-take(#reader{pos=0,dir=0}=C) -> C#reader{pos='begin',args=[]};
|
|
|
-take(#reader{pos='begin',dir=1}=C) -> C#reader{args=[]}; % 4
|
|
|
-take(#reader{pos=0,cache=[],dir=1}=C) -> C#reader{args=[]};
|
|
|
-
|
|
|
-% TODO: try to remove lists:reverse and abstract both branches
|
|
|
-take(#reader{args=N,feed=Feed,cache={T,O,_},dir=1}=C) -> take(C#reader{cache={T,O}});
|
|
|
-take(#reader{args=N,feed=Feed,cache={T,O},dir=1}=C) -> % 1
|
|
|
- Key = key(Feed),
|
|
|
- {ok,I} = rocksdb:iterator(ref(), []),
|
|
|
- {ok,K,BERT} = rocksdb:iterator_move(I, {seek,key(Feed,{T,O})}),
|
|
|
- {KK,Res} = kvs_rocks:prev2(I,Key,size(Key),K,BERT,[],case N of -1 -> -1; J -> J + 1 end,0),
|
|
|
- Last = last(KK,O,'begin'),
|
|
|
- case {lists:reverse(Res),length(Res)} of
|
|
|
- {[],_} -> C#reader{args=[],cache=[]};
|
|
|
- {[H],_} when element(2,KK) == O -> C#reader{args=Res,pos=Last,cache={e(1,H),e(2,H)}}; % 2
|
|
|
- {[_|_],A} when A < N - 1 orelse N == -1 -> [HX|_] = Res, C#reader{args=Res,cache={e(1,HX),e(2,HX)},pos=Last};
|
|
|
- {[_|X],A} when A == N -> [HX|_] = Res, C#reader{args=[bt(BERT)|X],cache={e(1,HX),e(2,HX)},pos=Last};
|
|
|
- {[_|_],A} when A =< N andalso Last == 'begin'-> [HX|_] = Res, C#reader{args=lists:reverse(Res),cache={e(1,HX),e(2,HX)},pos=Last};
|
|
|
- {[_|_],_} -> [HX|TL] = Res, C#reader{args=lists:reverse(TL),cache={e(1,HX),e(2,HX)}} end.
|
|
|
-
|
|
|
-last(KK,O,Atom) ->
|
|
|
- Last = case KK of
|
|
|
- [] -> Atom;
|
|
|
- _ when element(2,KK) == O -> Atom;
|
|
|
- _ -> 0
|
|
|
- end,
|
|
|
- Last.
|
|
|
-
|
|
|
-% new, save, load, up, down, top, bot
|
|
|
+feed(Feed) -> feed(fun(#reader{}=R) -> take(R#reader{args=4}) end, top(reader(key(Feed))),[]).
|
|
|
+feed(F,#reader{cache=C1}=R,Acc) ->
|
|
|
+ #reader{args=A, cache=Ch, feed=Feed} = R1 = F(R),
|
|
|
+ case Ch of
|
|
|
+ C1 -> lists:reverse(Acc ++ A);
|
|
|
+ {_,_,K} when binary_part(K,{0,byte_size(Feed)}) == Feed -> feed(F, R1, Acc ++ A);
|
|
|
+ _ -> lists:reverse(Acc ++ A)
|
|
|
+ end.
|
|
|
|
|
|
load_reader(Id) ->
|
|
|
case kvs:get(reader,Id) of
|
|
@@ -94,10 +46,7 @@ load_reader(Id) ->
|
|
|
writer(Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
|
|
|
reader(Id) -> case kvs:get(writer,Id) of
|
|
|
{ok,#writer{id=Feed, count=Cn}} ->
|
|
|
- {ok,I} = rocksdb:iterator(ref(), []),
|
|
|
- {ok,F1,BERT} = rocksdb:iterator_move(I, {seek,key(Feed)}),
|
|
|
- F = bt(BERT),
|
|
|
- #reader{id=kvs:seq([],[]),feed=Id,count=Cn,cache={e(1,F),e(2,F),fd(F1)}};
|
|
|
+ read_it(#reader{id=kvs:seq([],[]),feed=key(Feed),count=Cn},seek_it(key(Feed)));
|
|
|
{error,_} -> save(#writer{id=Id}), reader(Id) end.
|
|
|
save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
|
|
|
|
|
@@ -106,12 +55,12 @@ save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
|
|
|
add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
|
|
|
add(#writer{args=M}=C) -> add(M,C).
|
|
|
|
|
|
-add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache={e(1,M),e(2,M),fd(Feed)},count=NS}.
|
|
|
+add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache={e(1,M),e(2,M),Feed},count=NS}.
|
|
|
|
|
|
remove(Rec,Feed) ->
|
|
|
kvs:ensure(#writer{id=Feed}),
|
|
|
W = #writer{count=C, cache=Ch} = kvs:writer(Feed),
|
|
|
- Ch1 = case {e(1,Rec),e(2,Rec)} of Ch -> Ch;_ -> [] end, % need to keep reference for next element
|
|
|
+ Ch1 = case {e(1,Rec),e(2,Rec),Feed} of Ch -> Ch;_ -> [] end, % need to keep reference for next element
|
|
|
case kvs:delete(Feed,id(Rec)) of
|
|
|
ok -> Count = C - 1,
|
|
|
save(W#writer{count = Count, cache=Ch1}),
|
|
@@ -126,7 +75,7 @@ append(Rec,Feed) ->
|
|
|
Id = e(2,Rec),
|
|
|
W = writer(Feed),
|
|
|
case kvs:get(Feed,Id) of
|
|
|
- {ok,_} -> raw_append(Rec,Feed), save(W#writer{cache={e(1,Rec),Id,fd(Feed)},count=W#writer.count + 1}), Id;
|
|
|
+ {ok,_} -> raw_append(Rec,Feed), save(W#writer{cache={e(1,Rec),Id,Feed},count=W#writer.count + 1}), Id;
|
|
|
{error,_} -> save(add(W#writer{args=Rec})), Id end.
|
|
|
|
|
|
cut(Feed,Id) ->
|