|
@@ -56,33 +56,24 @@ save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
|
|
|
add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
|
|
|
add(#writer{args=M}=C) -> add(M,C).
|
|
|
|
|
|
-add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache={e(1,M),e(2,M),Feed},count=NS}.
|
|
|
+add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache={e(1,M),e(2,M),key(Feed)},count=NS}.
|
|
|
|
|
|
remove(Rec,Feed) ->
|
|
|
kvs:ensure(#writer{id=Feed}),
|
|
|
W = #writer{count=C, cache=Ch} = kvs:writer(Feed),
|
|
|
- Ch1 = case {e(1,Rec),e(2,Rec),Feed} of Ch -> Ch;_ -> [] end, % need to keep reference for next element
|
|
|
+ Ch1 = case {e(1,Rec),e(2,Rec),key(Feed)} of Ch -> Ch;_ -> [] end, % need to keep reference for next element
|
|
|
case kvs:delete(Feed,id(Rec)) of
|
|
|
ok -> Count = C - 1,
|
|
|
save(W#writer{count = Count, cache=Ch1}),
|
|
|
Count;
|
|
|
_ -> C end.
|
|
|
|
|
|
-raw_append(M,Feed) ->
|
|
|
- rocksdb:put(ref(), key(Feed,M), term_to_binary(M), [{sync,true}]).
|
|
|
+raw_append(M,Feed) -> rocksdb:put(ref(), key(Feed,M), term_to_binary(M), [{sync,true}]).
|
|
|
|
|
|
append(Rec,Feed) ->
|
|
|
kvs:ensure(#writer{id=Feed}),
|
|
|
Id = e(2,Rec),
|
|
|
W = writer(Feed),
|
|
|
case kvs:get(Feed,Id) of
|
|
|
- {ok,_} -> raw_append(Rec,Feed), save(W#writer{cache={e(1,Rec),Id,Feed},count=W#writer.count + 1}), Id;
|
|
|
+ {ok,_} -> raw_append(Rec,Feed), save(W#writer{cache={e(1,Rec),Id,key(Feed)},count=W#writer.count + 1}), Id;
|
|
|
{error,_} -> save(add(W#writer{args=Rec})), Id end.
|
|
|
-
|
|
|
-cut(Feed,Id) ->
|
|
|
- Key = key(Feed),
|
|
|
- A = key(Feed,Id),
|
|
|
- {ok,I} = rocksdb:iterator(ref(), []),
|
|
|
- case rocksdb:iterator_move(I, {seek,A}) of
|
|
|
- {ok,A,X} -> {ok,kvs_rocks:cut(I,Key,size(Key),A,X,[],-1,0)};
|
|
|
- _ -> {error,not_found} end.
|