|
@@ -34,23 +34,29 @@ fd(K) -> Key = tb(K),
|
|
|
end,
|
|
|
binary:part(Key,{0,S}).
|
|
|
|
|
|
-o(<<>>,FK,_,_) -> {ok,FK,[],[]};
|
|
|
-o(Key,FK,Dir,CompiledOperations) ->
|
|
|
- S = size(FK),
|
|
|
+o(<<>>,SK,_,_) -> {ok,SK,[],[]};
|
|
|
+o(Key, % key
|
|
|
+ SK, % sub-key
|
|
|
+ Dir, % direction next/prev
|
|
|
+ Compiled_Operations) ->
|
|
|
|
|
|
- Run = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == FK -> {F(H,Dir),H,[V|Acc]}; % continue +------------+
|
|
|
+ S = size(SK),
|
|
|
+
|
|
|
+% H is cache prefix
|
|
|
+
|
|
|
+ Run = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == SK -> {F(H,Dir),H,[V|Acc]}; % continue +------------+
|
|
|
(_,K,H,V,Acc) -> stop_it(H), % fail-safe closing |
|
|
|
- throw({ok,fd(K),bt(V),[bt(A1)||A1<-Acc]}) end, % acc unfold |
|
|
|
+ throw({ok, fd(K), bt(V), [bt(A1) || A1 <- Acc]}) end, % acc unfold |
|
|
|
% |
|
|
|
- RangeCheckRun = fun(F,K,V,H) -> case F(H,prev) of % |
|
|
|
- {ok,K1,V1} when binary_part(K,{0,S}) == FK -> {{ok,K1,V1},H,[V]}; % return (range-check error) |
|
|
|
+ Range_Check = fun(F,K,H,V) -> case F(H,prev) of % |
|
|
|
+ {ok,K1,V1} when binary_part(K,{0,S}) == SK -> {{ok,K1,V1},H,[V]}; % return (range-check error) |
|
|
|
{ok,K1,V1} -> Run(F,K1,H,V1,[]); % run prev-take chain | loop
|
|
|
E -> E % violation |
|
|
|
end end, % |
|
|
|
% |
|
|
|
- StateMachine = fun % |
|
|
|
+ State_Machine = fun % |
|
|
|
(F,{ok,H}) -> {F(H,{seek,Key}),H}; % first move (seek) |
|
|
|
- (F,{{ok,K,V},H}) when Dir =:= prev -> RangeCheckRun(F,K,V,H); % first chained prev-take |
|
|
|
+ (F,{{ok,K,V},H}) when Dir =:= prev -> Range_Check(F,K,H,V); % first chained prev-take |
|
|
|
(F,{{ok,K,V},H}) -> Run(F,K,H,V,[]); % first chained next-take |
|
|
|
(F,{{ok,K,V},H,A}) -> Run(F,K,H,V,A); % chained CPS-take continuator +---+
|
|
|
(_,{{error,_},H,Acc}) -> {{ok,[],[]},H,Acc}; % error effects
|
|
@@ -58,10 +64,10 @@ o(Key,FK,Dir,CompiledOperations) ->
|
|
|
(F,H) -> F(H)
|
|
|
end,
|
|
|
|
|
|
- catch case lists:foldl(StateMachine, {ref(),[]}, CompiledOperations) of
|
|
|
- {{ok,K,Bin},_,A} -> {ok,fd(K), bt(Bin),[bt(A1)||A1<-A]};
|
|
|
- {{ok,K,Bin},_} -> {ok,fd(K), bt(Bin),[]};
|
|
|
- {{error,_},_,Acc} -> {ok,fd(FK),bt(shd(Acc)),[bt(A1) ||A1<-Acc]}
|
|
|
+ catch case lists:foldl(State_Machine, {ref(), []}, Compiled_Operations) of
|
|
|
+ {{ok,K,Bin},_,A} -> {ok, fd(K), bt(Bin), [bt(A1) || A1 <- A]};
|
|
|
+ {{ok,K,Bin},_} -> {ok, fd(K), bt(Bin), []};
|
|
|
+ {{error,_},_,Acc} -> {ok, fd(SK), bt(shd(Acc)), [bt(A1) || A1 <- Acc]}
|
|
|
end.
|
|
|
|
|
|
initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
|
|
@@ -84,11 +90,11 @@ compile(move) -> [fun rocksdb:iterator_move/2];
|
|
|
compile(close) -> [fun rocksdb:iterator_close/1].
|
|
|
compile(take,N) -> lists:map(fun(_) -> fun rocksdb:iterator_move/2 end, lists:seq(1, N)).
|
|
|
|
|
|
-stop_it(H) -> try begin [F]=compile(close), F(H) end catch error:badarg -> ok end.
|
|
|
-seek_it(K) -> o(K,K,ok,compile(seek)).
|
|
|
-move_it(K,FK,Dir) -> o(K,FK,Dir,compile(seek) ++ compile(move)).
|
|
|
-take_it(Key,FK,Dir,N) when is_integer(N) andalso N >= 0 -> o(Key,FK,Dir,compile(seek) ++ compile(take,N));
|
|
|
-take_it(Key,FK,Dir,_) -> take_it(Key,FK,Dir,0).
|
|
|
+stop_it(H) -> try begin [F]=compile(close), F(H) end catch error:badarg -> ok end.
|
|
|
+seek_it(K) -> o(K,K,ok,compile(seek)).
|
|
|
+move_it(Key,SK,Dir) -> o(Key,SK,Dir,compile(seek) ++ compile(move)).
|
|
|
+take_it(Key,SK,Dir,N) when is_integer(N) andalso N >= 0 -> o(Key,SK,Dir,compile(seek) ++ compile(take,N));
|
|
|
+take_it(Key,SK,Dir,_) -> take_it(Key,SK,Dir,0).
|
|
|
|
|
|
all(R) -> kvs_st:feed(R).
|
|
|
|