Browse Source

key formatting + takeit

dxt 4 years ago
parent
commit
b3bf7bf4b1
1 changed files with 20 additions and 5 deletions
  1. 20 5
      src/stores/kvs_rocks.erl

+ 20 - 5
src/stores/kvs_rocks.erl

@@ -5,7 +5,7 @@
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -export(?BACKEND).
 -export(?BACKEND).
 -export([ref/0,cut/8,next/8,prev/8,prev2/8,next2/8,bt/1,key/2,key/1,fd/1]).
 -export([ref/0,cut/8,next/8,prev/8,prev2/8,next2/8,bt/1,key/2,key/1,fd/1]).
--export([seek_it/1, move_it/3]).
+-export([seek_it/1, move_it/3, take_it/4]).
 
 
 e(X,Y)     -> element(X,Y).
 e(X,Y)     -> element(X,Y).
 bt([])     -> [];
 bt([])     -> [];
@@ -16,10 +16,22 @@ tb(T) when is_atom(T) -> atom_to_binary(T);
 tb(T) when is_binary(T) -> T;
 tb(T) when is_binary(T) -> T;
 tb(T)      -> term_to_binary(T).
 tb(T)      -> term_to_binary(T).
 
 
+fmt([]) -> [];
+fmt(K) ->
+  B = tb(K),
+  S = if byte_size(B) > 1 -> 2;true -> byte_size(B) end,
+  B1 = binary:split(B,[<<"/">>,<<"//">>], [{scope,{0,S}},trim_all]),
+  B2 = case B1 of [] -> <<>>;[X|_] -> X end,
+  S1 = if byte_size(B2) > 1 -> -2; true -> 0 end,
+  B3 = binary:split(B2,[<<"/">>,<<"//">>], [{scope,{byte_size(B2),S1}},trim_all]),
+  B4 = case B3 of [] -> <<>>;[X1|_] -> X1 end,
+  B4.
+
 key(R)     when is_tuple(R) andalso tuple_size(R) > 1 -> key(e(1,R), e(2,R));
 key(R)     when is_tuple(R) andalso tuple_size(R) > 1 -> key(e(1,R), e(2,R));
 key(R)     -> key(R,[]).
 key(R)     -> key(R,[]).
 key(Tab,R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(Tab, e(2,R));
 key(Tab,R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(Tab, e(2,R));
-key(Tab,R) -> iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, tb(Tab), tb(R)]))]).
+key(Tab,R) -> iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, fmt(Tab), fmt(R)]))]).
+
 
 
 fd(Key) ->
 fd(Key) ->
   B = lists:reverse(binary:split(tb(Key), [<<"/">>, <<"//">>], [global, trim_all])),
   B = lists:reverse(binary:split(tb(Key), [<<"/">>, <<"//">>], [global, trim_all])),
@@ -41,9 +53,8 @@ o(Key,FK,Dir,Fx) ->
           (F,{R,O})             -> F(R,O);
           (F,{R,O})             -> F(R,O);
           (F,H)                 -> F(H) end,
           (F,H)                 -> F(H) end,
   catch case lists:foldl(It, {ref(),[]}, Fx) of
   catch case lists:foldl(It, {ref(),[]}, Fx) of
-    {{ok,K,Bin},_,A} when binary_part(K,{0,S}) == FK  -> {ok,fd(K),bt(Bin),[bt(A1)||A1<-A]};
-    {{ok,K,Bin},_,_}                                  -> {ok,fd(K),bt(Bin),[]};
-    {{ok,K,Bin},_}                                    -> {ok,fd(K),bt(Bin),[]}
+    {{ok,K,Bin},_,A} -> {ok,fd(K),bt(Bin),[bt(A1)||A1<-A]};
+    {{ok,K,Bin},_}   -> {ok,fd(K),bt(Bin),[]}
   end.
   end.
 
 
 start()    -> ok.
 start()    -> ok.
@@ -63,6 +74,10 @@ index(_,_,_) -> [].
 close_it(H) -> try rocksdb:iterator_close(H) catch error:badarg -> ok end.
 close_it(H) -> try rocksdb:iterator_close(H) catch error:badarg -> ok end.
 seek_it(K) -> o(K,K,ok,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2]).
 seek_it(K) -> o(K,K,ok,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2]).
 move_it(K,FK,Dir) -> o(K,FK,Dir,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2,fun rocksdb:iterator_move/2]).
 move_it(K,FK,Dir) -> o(K,FK,Dir,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2,fun rocksdb:iterator_move/2]).
+take_it(Key,FK,Dir,N) when is_integer(N) andalso N >= 0 ->
+  o(Key,FK,Dir,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2] ++
+               lists:map(fun(_) -> fun rocksdb:iterator_move/2 end,lists:seq(1,N)));
+take_it(Key,FK,Dir,_) -> take_it(Key,FK,Dir,0).
 
 
 get(Tab, Key) ->
 get(Tab, Key) ->
     case rocksdb:get(ref(), key(Tab,Key), []) of
     case rocksdb:get(ref(), key(Tab,Key), []) of