Namdak Tonpa 4 years ago
parent
commit
a30f820211
1 changed files with 34 additions and 32 deletions
  1. 34 32
      src/stores/kvs_rocks.erl

+ 34 - 32
src/stores/kvs_rocks.erl

@@ -7,19 +7,19 @@
 -export([ref/0,bt/1,key/2,key/1,fd/1, tb/1]).
 -export([seek_it/1, move_it/3, take_it/4]).
 
-e(X,Y)     -> element(X,Y).
-bt([])     -> [];
-bt(X)      -> binary_to_term(X).
-tb([])     -> [];
+e(X,Y) -> element(X,Y).
+
+bt([]) -> [];
+bt(X)  -> binary_to_term(X).
+
+tb([]) -> [];
 tb(T) when is_list(T) -> list_to_binary(T);
 tb(T) when is_atom(T) -> atom_to_binary(T, utf8);
 tb(T) when is_binary(T) -> T;
-tb(T)      -> term_to_binary(T).
-
-% put
+tb(T) -> term_to_binary(T).
 
-key(R)     when is_tuple(R) andalso tuple_size(R) > 1 -> key(e(1,R), e(2,R));
-key(R)                                                -> key(R,[]).
+key(R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(e(1,R), e(2,R));
+key(R) -> key(R,[]).
 key(Tab,R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(Tab, e(2,R));
 key(Tab,R) -> iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, tb(Tab), tb(R)]))]).
 
@@ -39,7 +39,7 @@ o(Key,FK,Dir,CompiledOperations) ->
   S = size(FK),
 
   Run = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == FK -> {F(H,Dir),H,[V|Acc]}; % continue +------------+
-            (_,K,H,V,Acc) -> close_it(H),                                          % failsafe close        |
+            (_,K,H,V,Acc) -> stop_it(H),                                           % fail-safe closing     |
                              throw({ok,fd(K),bt(V),[bt(A1)||A1<-Acc]}) end,        % acc unfold            |
                                                                                    %                       |
   RangeCheckRun = fun(F,K,V,H) -> case F(H,prev) of                     %                                  |
@@ -64,28 +64,34 @@ o(Key,FK,Dir,CompiledOperations) ->
     {{error,_},_,Acc} -> {ok,fd(FK),bt(shd(Acc)),[bt(A1) ||A1<-Acc]}
   end.
 
+initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
+index(_,_,_) -> [].
+
 start()    -> ok.
 stop()     -> ok.
 destroy()  -> rocksdb:destroy(application:get_env(kvs,rocks_name,"rocksdb"), []).
 version()  -> {version,"KVS ROCKSDB"}.
 dir()      -> [].
-leave() -> case ref() of [] -> skip; X -> rocksdb:close(X), application:set_env(kvs,rocks_ref,[]), ok end.
-join(_) -> application:start(rocksdb),
-           leave(), {ok, Ref} = rocksdb:open(application:get_env(kvs,rocks_name,"rocksdb"), [{create_if_missing, true}]),
-           initialize(),
-           application:set_env(kvs,rocks_ref,Ref).
-initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
-ref() -> application:get_env(kvs,rocks_ref,[]).
-index(_,_,_) -> [].
-
-close_it(H) -> try rocksdb:iterator_close(H) catch error:badarg -> ok end.
-seek_it(K) -> o(K,K,ok,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2]).
-move_it(K,FK,Dir) -> o(K,FK,Dir,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2,fun rocksdb:iterator_move/2]).
-take_it(Key,FK,Dir,N) when is_integer(N) andalso N >= 0 ->
-  o(Key,FK,Dir,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2] ++
-               lists:map(fun(_) -> fun rocksdb:iterator_move/2 end,lists:seq(1,N)));
+ref()      -> application:get_env(kvs,rocks_ref,[]).
+leave()    -> case ref() of [] -> skip; X -> rocksdb:close(X), application:set_env(kvs,rocks_ref,[]), ok end.
+join(_)    -> application:start(rocksdb),
+              leave(), {ok, Ref} = rocksdb:open(application:get_env(kvs,rocks_name,"rocksdb"), [{create_if_missing, true}]),
+              initialize(),
+              application:set_env(kvs,rocks_ref,Ref).
+
+compile(seek)     -> [fun rocksdb:iterator/2,fun rocksdb:iterator_move/2];
+compile(move)     -> [fun rocksdb:iterator_move/2];
+compile(close)    -> [fun rocksdb:iterator_close/1].
+compile(take,N)   -> lists:map(fun(_) -> fun rocksdb:iterator_move/2 end, lists:seq(1, N)).
+
+stop_it(H)        -> try (compile(close))(H) catch error:badarg -> ok end.
+seek_it(K)        -> o(K,K,ok,compile(seek)).
+move_it(K,FK,Dir) -> o(K,FK,Dir,compile(seek) ++ compile(move)).
+take_it(Key,FK,Dir,N) when is_integer(N) andalso N >= 0 -> o(Key,FK,Dir,compile(seek) ++ compile(take,N));
 take_it(Key,FK,Dir,_) -> take_it(Key,FK,Dir,0).
 
+all(R) -> kvs_st:feed(R).
+
 get(Tab, Key) ->
     case rocksdb:get(ref(), key(Tab,Key), []) of
          not_found -> {error,not_found};
@@ -94,20 +100,16 @@ get(Tab, Key) ->
 put(Records) when is_list(Records) -> lists:map(fun(Record) -> put(Record) end, Records);
 put(Record) -> rocksdb:put(ref(), key(Record), term_to_binary(Record), [{sync,true}]).
 delete(Feed, Id) -> rocksdb:delete(ref(), key(Feed,Id), []).
-
 count(_) -> 0.
 
-all(R) -> kvs_st:feed(R).
-
 shd([]) -> [];
 shd(X) -> hd(X).
+create_table(_,_) -> [].
+add_table_index(_, _) -> ok.
+dump() -> ok.
 
 seq(_,_) ->
   case os:type() of
        {win32,nt} -> {Mega,Sec,Micro} = erlang:timestamp(), integer_to_list((Mega*1000000+Sec)*1000000+Micro);
                 _ -> erlang:integer_to_list(element(2,hd(lists:reverse(erlang:system_info(os_monotonic_time_source)))))
   end.
-
-create_table(_,_) -> [].
-add_table_index(_, _) -> ok.
-dump() -> ok.