|
@@ -4,7 +4,7 @@
|
|
-include("stream.hrl").
|
|
-include("stream.hrl").
|
|
-include("metainfo.hrl").
|
|
-include("metainfo.hrl").
|
|
-export(?STREAM).
|
|
-export(?STREAM).
|
|
--import(kvs_rocks, [key/2, key/1, bt/1, ref/0, seek_it/1, move_it/3]).
|
|
|
|
|
|
+-import(kvs_rocks, [key/2, key/1, bt/1, ref/0, fd/1, seek_it/1, move_it/3]).
|
|
|
|
|
|
% section: kvs_stream prelude
|
|
% section: kvs_stream prelude
|
|
|
|
|
|
@@ -21,7 +21,7 @@ top(#reader{feed=Feed}=C) -> #writer{count=Cn} = writer(Feed), read_it(C#reader{
|
|
bot(#reader{feed=Feed}=C) -> #writer{cache=Ch, count=Cn} = writer(Feed), C#reader{cache=Ch, count=Cn, dir=1}.
|
|
bot(#reader{feed=Feed}=C) -> #writer{cache=Ch, count=Cn} = writer(Feed), C#reader{cache=Ch, count=Cn, dir=1}.
|
|
|
|
|
|
% iterator -> specific feed reader
|
|
% iterator -> specific feed reader
|
|
-read_it(C,{ok,_F,V,H}) -> C#reader{cache={e(1,V),id(V)}, args=lists:reverse(H)}; % real cache {F,e(1,V),id(V)}
|
|
|
|
|
|
+read_it(C,{ok,F,V,H}) -> C#reader{cache={e(1,V),id(V), F}, args=lists:reverse(H)};
|
|
read_it(C,_) -> C.
|
|
read_it(C,_) -> C.
|
|
|
|
|
|
next(#reader{feed=Feed,cache=I}=C) -> read_it(C,move_it(key(Feed,I),key(Feed),next)).
|
|
next(#reader{feed=Feed,cache=I}=C) -> read_it(C,move_it(key(Feed,I),key(Feed),next)).
|
|
@@ -40,6 +40,7 @@ drop(#reader{args=N,feed=Feed,cache=I}=C) -> (take(C#reader{dir=0}))#reader{args
|
|
|
|
|
|
|
|
|
|
take(#reader{pos='end',dir=0}=C) -> C#reader{args=[]}; % 4
|
|
take(#reader{pos='end',dir=0}=C) -> C#reader{args=[]}; % 4
|
|
|
|
+take(#reader{args=N,feed=Feed,cache={T,O,_},dir=0}=C) -> take(C#reader{cache={T,O}});
|
|
take(#reader{args=N,feed=Feed,cache={T,O},dir=0}=C) -> % 1
|
|
take(#reader{args=N,feed=Feed,cache={T,O},dir=0}=C) -> % 1
|
|
Key = key(Feed),
|
|
Key = key(Feed),
|
|
{ok,I} = rocksdb:iterator(ref(), []),
|
|
{ok,I} = rocksdb:iterator(ref(), []),
|
|
@@ -60,6 +61,7 @@ take(#reader{pos='begin',dir=1}=C) -> C#reader{args=[]}; % 4
|
|
take(#reader{pos=0,cache=[],dir=1}=C) -> C#reader{args=[]};
|
|
take(#reader{pos=0,cache=[],dir=1}=C) -> C#reader{args=[]};
|
|
|
|
|
|
% TODO: try to remove lists:reverse and abstract both branches
|
|
% TODO: try to remove lists:reverse and abstract both branches
|
|
|
|
+take(#reader{args=N,feed=Feed,cache={T,O,_},dir=1}=C) -> take(C#reader{cache={T,O}});
|
|
take(#reader{args=N,feed=Feed,cache={T,O},dir=1}=C) -> % 1
|
|
take(#reader{args=N,feed=Feed,cache={T,O},dir=1}=C) -> % 1
|
|
Key = key(Feed),
|
|
Key = key(Feed),
|
|
{ok,I} = rocksdb:iterator(ref(), []),
|
|
{ok,I} = rocksdb:iterator(ref(), []),
|
|
@@ -93,9 +95,9 @@ writer(Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id
|
|
reader(Id) -> case kvs:get(writer,Id) of
|
|
reader(Id) -> case kvs:get(writer,Id) of
|
|
{ok,#writer{id=Feed, count=Cn}} ->
|
|
{ok,#writer{id=Feed, count=Cn}} ->
|
|
{ok,I} = rocksdb:iterator(ref(), []),
|
|
{ok,I} = rocksdb:iterator(ref(), []),
|
|
- {ok,_,BERT} = rocksdb:iterator_move(I, {seek,key(Feed)}),
|
|
|
|
|
|
+ {ok,F1,BERT} = rocksdb:iterator_move(I, {seek,key(Feed)}),
|
|
F = bt(BERT),
|
|
F = bt(BERT),
|
|
- #reader{id=kvs:seq([],[]),feed=Id,count=Cn,cache={e(1,F),e(2,F)}};
|
|
|
|
|
|
+ #reader{id=kvs:seq([],[]),feed=Id,count=Cn,cache={e(1,F),e(2,F),fd(F1)}};
|
|
{error,_} -> save(#writer{id=Id}), reader(Id) end.
|
|
{error,_} -> save(#writer{id=Id}), reader(Id) end.
|
|
save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
|
|
save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
|
|
|
|
|
|
@@ -104,7 +106,7 @@ save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
|
|
add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
|
|
add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
|
|
add(#writer{args=M}=C) -> add(M,C).
|
|
add(#writer{args=M}=C) -> add(M,C).
|
|
|
|
|
|
-add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache={e(1,M),e(2,M)},count=NS}.
|
|
|
|
|
|
+add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache={e(1,M),e(2,M),fd(Feed)},count=NS}.
|
|
|
|
|
|
remove(Rec,Feed) ->
|
|
remove(Rec,Feed) ->
|
|
kvs:ensure(#writer{id=Feed}),
|
|
kvs:ensure(#writer{id=Feed}),
|
|
@@ -112,7 +114,7 @@ remove(Rec,Feed) ->
|
|
Ch1 = case {e(1,Rec),e(2,Rec)} of Ch -> Ch;_ -> [] end, % need to keep reference for next element
|
|
Ch1 = case {e(1,Rec),e(2,Rec)} of Ch -> Ch;_ -> [] end, % need to keep reference for next element
|
|
case kvs:delete(Feed,id(Rec)) of
|
|
case kvs:delete(Feed,id(Rec)) of
|
|
ok -> Count = C - 1,
|
|
ok -> Count = C - 1,
|
|
- kvs:save(W#writer{count = Count, cache=Ch1}),
|
|
|
|
|
|
+ save(W#writer{count = Count, cache=Ch1}),
|
|
Count;
|
|
Count;
|
|
_ -> C end.
|
|
_ -> C end.
|
|
|
|
|
|
@@ -122,10 +124,10 @@ raw_append(M,Feed) ->
|
|
append(Rec,Feed) ->
|
|
append(Rec,Feed) ->
|
|
kvs:ensure(#writer{id=Feed}),
|
|
kvs:ensure(#writer{id=Feed}),
|
|
Id = e(2,Rec),
|
|
Id = e(2,Rec),
|
|
- W = kvs:writer(Feed),
|
|
|
|
|
|
+ W = writer(Feed),
|
|
case kvs:get(Feed,Id) of
|
|
case kvs:get(Feed,Id) of
|
|
- {ok,_} -> raw_append(Rec,Feed), kvs:save(W#writer{cache={e(1,Rec),Id},count=W#writer.count + 1}), Id;
|
|
|
|
- {error,_} -> kvs:save(kvs:add(W#writer{args=Rec,cache={e(1,Rec),Id}})), Id end.
|
|
|
|
|
|
+ {ok,_} -> raw_append(Rec,Feed), save(W#writer{cache={e(1,Rec),Id,fd(Feed)},count=W#writer.count + 1}), Id;
|
|
|
|
+ {error,_} -> save(add(W#writer{args=Rec})), Id end.
|
|
|
|
|
|
cut(Feed,Id) ->
|
|
cut(Feed,Id) ->
|
|
Key = key(Feed),
|
|
Key = key(Feed),
|