|
@@ -5,7 +5,7 @@
|
|
-include_lib("stdlib/include/qlc.hrl").
|
|
-include_lib("stdlib/include/qlc.hrl").
|
|
-export(?BACKEND).
|
|
-export(?BACKEND).
|
|
-export([ref/0,ref/1,bt/1,key/2,key/1,fd/1,tb/1,estimate/0,estimate/1]).
|
|
-export([ref/0,ref/1,bt/1,key/2,key/1,fd/1,tb/1,estimate/0,estimate/1]).
|
|
--export([seek_it/1, seek_it/2, move_it/3, move_it/4, take_it/4, take_it/5]).
|
|
|
|
|
|
+-export([seek_it/1, seek_it/2, move_it/3, move_it/4, take_it/4, take_it/5, delete_it/1, delete_it/2]).
|
|
|
|
|
|
e(X,Y) -> element(X,Y).
|
|
e(X,Y) -> element(X,Y).
|
|
|
|
|
|
@@ -26,6 +26,35 @@ key(writer,R) -> % allow old writers
|
|
iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, erlang:atom_to_binary(writer, utf8), tb(R)]))]);
|
|
iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, erlang:atom_to_binary(writer, utf8), tb(R)]))]);
|
|
key(Tab,R) -> Fd = case Tab of [] -> []; _ -> tb(Tab) end,
|
|
key(Tab,R) -> Fd = case Tab of [] -> []; _ -> tb(Tab) end,
|
|
iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, Fd, fmt(R)]))]).
|
|
iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, Fd, fmt(R)]))]).
|
|
|
|
+keys(Tab, Db) ->
|
|
|
|
+ Feed = key(Tab,[]),
|
|
|
|
+ {ok, H} = rocksdb:iterator(ref(Db), []),
|
|
|
|
+ Keys = fun KEY(K1,Acc) when binary_part(K1,{0,byte_size(Feed)}) =:= Feed ->
|
|
|
|
+ case rocksdb:iterator_move(H, next) of
|
|
|
|
+ {ok,K2,_} -> KEY(K2,[tb(K1)|Acc]);
|
|
|
|
+ _ -> lists:reverse([tb(K1)|Acc])
|
|
|
|
+ end;
|
|
|
|
+ KEY(_,Acc) -> rocksdb:iterator_close(H), lists:reverse(Acc)
|
|
|
|
+ end,
|
|
|
|
+ {ok, K, _} = rocksdb:iterator_move(H, {seek, Feed}),
|
|
|
|
+ Keys(K,[]).
|
|
|
|
+
|
|
|
|
+ key_match(Tab, Id, Db) ->
|
|
|
|
+ Feed = key(Tab,[]),
|
|
|
|
+ {ok, H} = rocksdb:iterator(ref(Db), []),
|
|
|
|
+ Keys = fun KEY(K1) when
|
|
|
|
+ binary_part(K1,{0,byte_size(Feed)}) =:= Feed andalso
|
|
|
|
+ binary_part(K1,{byte_size(K1), -byte_size(Id)}) =:= Id ->
|
|
|
|
+ rocksdb:iterator_close(H), [K1];
|
|
|
|
+ KEY(K1) when binary_part(K1,{0,byte_size(Feed)}) =:= Feed ->
|
|
|
|
+ case rocksdb:iterator_move(H, next) of
|
|
|
|
+ {ok,K2,_} -> KEY(K2);
|
|
|
|
+ _ -> []
|
|
|
|
+ end;
|
|
|
|
+ KEY(_) -> rocksdb:iterator_close(H), []
|
|
|
|
+ end,
|
|
|
|
+ {ok, K, _} = rocksdb:iterator_move(H, {seek, Feed}),
|
|
|
|
+ Keys(K).
|
|
|
|
|
|
fmt([]) -> [];
|
|
fmt([]) -> [];
|
|
fmt(K) -> Key = tb(K),
|
|
fmt(K) -> Key = tb(K),
|
|
@@ -111,10 +140,27 @@ join(_,Db) ->
|
|
initialize(),
|
|
initialize(),
|
|
application:set_env(kvs,ref_env(Db),Ref).
|
|
application:set_env(kvs,ref_env(Db),Ref).
|
|
|
|
|
|
-compile(seek) -> [fun rocksdb:iterator/2,fun rocksdb:iterator_move/2];
|
|
|
|
-compile(move) -> [fun rocksdb:iterator_move/2];
|
|
|
|
-compile(close) -> [fun rocksdb:iterator_close/1].
|
|
|
|
|
|
+compile(it) -> [fun rocksdb:iterator/2];
|
|
|
|
+compile(seek) -> [fun rocksdb:iterator/2,fun rocksdb:iterator_move/2];
|
|
|
|
+compile(move) -> [fun rocksdb:iterator_move/2];
|
|
|
|
+compile(close) -> [fun rocksdb:iterator_close/1].
|
|
compile(take,N) -> lists:map(fun(_) -> fun rocksdb:iterator_move/2 end, lists:seq(1, N)).
|
|
compile(take,N) -> lists:map(fun(_) -> fun rocksdb:iterator_move/2 end, lists:seq(1, N)).
|
|
|
|
+compile(delete,_, {error,E},_) -> {error,E};
|
|
|
|
+compile(delete,SK,{ok,_,V1,_},Db) ->
|
|
|
|
+ F1 = key(key(fmt(SK),e(2,V1))), S = sz(SK),
|
|
|
|
+ [fun Del(H,Dir) ->
|
|
|
|
+ case rocksdb:delete(ref(Db), F1, []) of ok ->
|
|
|
|
+ % {ok, K} case exist only in api, but never actually used
|
|
|
|
+ case rocksdb:iterator_move(H,Dir) of
|
|
|
|
+ {ok,K,_} when binary_part(K,{0,S}) == SK -> case rocksdb:delete(ref(Db), K, []) of ok -> Del(H,Dir); E -> E end;
|
|
|
|
+ {ok,K} when binary_part(K,{0,S}) == SK -> case rocksdb:delete(ref(Db), K, []) of ok -> Del(H,Dir); E -> E end;
|
|
|
|
+ {ok,K,V} -> {ok,K,V};
|
|
|
|
+ {ok,K} -> {ok, K};
|
|
|
|
+ E -> E
|
|
|
|
+ end;
|
|
|
|
+ E -> E
|
|
|
|
+ end
|
|
|
|
+ end].
|
|
|
|
|
|
stop_it(H) -> try begin [F]=compile(close), F(H) end catch error:badarg -> ok end.
|
|
stop_it(H) -> try begin [F]=compile(close), F(H) end catch error:badarg -> ok end.
|
|
seek_it(K) -> seek_it(K,db()).
|
|
seek_it(K) -> seek_it(K,db()).
|
|
@@ -124,6 +170,8 @@ move_it(Key,SK,Dir,Db) -> run(Key,SK,Dir,compile(seek) ++ compile(move),Db).
|
|
take_it(Key,SK,Dir,N) -> take_it(Key,SK,Dir,N,db()).
|
|
take_it(Key,SK,Dir,N) -> take_it(Key,SK,Dir,N,db()).
|
|
take_it(Key,SK,Dir,N,Db) when is_integer(N) andalso N >= 0 -> run(Key,SK,Dir,compile(seek) ++ compile(take,N),Db);
|
|
take_it(Key,SK,Dir,N,Db) when is_integer(N) andalso N >= 0 -> run(Key,SK,Dir,compile(seek) ++ compile(take,N),Db);
|
|
take_it(Key,SK,Dir,_,Db) -> take_it(Key,SK,Dir,0,Db).
|
|
take_it(Key,SK,Dir,_,Db) -> take_it(Key,SK,Dir,0,Db).
|
|
|
|
+delete_it(Fd) -> delete_it(Fd, db()).
|
|
|
|
+delete_it(Fd,Db) -> run(Fd,Fd,next,compile(seek) ++ compile(delete,Fd,seek_it(Fd),Db),Db).
|
|
|
|
|
|
all(R,Db) -> kvs_st:feed(R,Db).
|
|
all(R,Db) -> kvs_st:feed(R,Db).
|
|
|
|
|
|
@@ -138,6 +186,25 @@ put(Record) -> put(Record,db()).
|
|
put(Records,Db) when is_list(Records) -> lists:map(fun(Record) -> put(Record,Db) end, Records);
|
|
put(Records,Db) when is_list(Records) -> lists:map(fun(Record) -> put(Record,Db) end, Records);
|
|
put(Record,Db) -> rocksdb:put(ref(Db), key(Record), term_to_binary(Record), [{sync,true}]).
|
|
put(Record,Db) -> rocksdb:put(ref(Db), key(Record), term_to_binary(Record), [{sync,true}]).
|
|
delete(Feed, Id, Db) -> rocksdb:delete(ref(Db), key(Feed,Id), []).
|
|
delete(Feed, Id, Db) -> rocksdb:delete(ref(Db), key(Feed,Id), []).
|
|
|
|
+delete_range(Feed,{Fd,Key},Db) ->
|
|
|
|
+ Last = key(key(fmt(Fd),Key)),
|
|
|
|
+ ReadOps = [{'prefix_same_as_start', true}],
|
|
|
|
+ CompactOps = [{change_level, true}],
|
|
|
|
+ Feed1 = key(Feed),
|
|
|
|
+ Sz = size(Feed1),
|
|
|
|
+ Reopen = case ref(Db) of [] -> skip; _ -> leave(Db), ok end,
|
|
|
|
+
|
|
|
|
+ {ok, R} = rocksdb:open(Db, [{prefix_extractor, {capped_prefix_transform, Sz}}]),
|
|
|
|
+ {ok, H} = rocksdb:iterator(R, ReadOps),
|
|
|
|
+ {ok, Start, _} = rocksdb:iterator_move(H, {seek, Feed1}),
|
|
|
|
+
|
|
|
|
+ ok = rocksdb:delete_range(R, Start, Last, []),
|
|
|
|
+ ok = rocksdb:delete(R, Last, []),
|
|
|
|
+ ok = rocksdb:delete(R, key(writer,Feed), []),
|
|
|
|
+ ok = rocksdb:compact_range(R, Start, undefined, CompactOps),
|
|
|
|
+ ok = rocksdb:iterator_close(H),
|
|
|
|
+ ok = rocksdb:close(R),
|
|
|
|
+ case Reopen of skip -> ok; ok -> join([],Db) end.
|
|
count(_) -> 0.
|
|
count(_) -> 0.
|
|
estimate() -> estimate(db()).
|
|
estimate() -> estimate(db()).
|
|
estimate(Db) -> case rocksdb:get_property(ref(Db), <<"rocksdb.estimate-num-keys">>) of
|
|
estimate(Db) -> case rocksdb:get_property(ref(Db), <<"rocksdb.estimate-num-keys">>) of
|