kvs_rocks.erl 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. -module(kvs_rocks).
  2. -include("backend.hrl").
  3. -include("kvs.hrl").
  4. -include("metainfo.hrl").
  5. -include_lib("stdlib/include/qlc.hrl").
  6. -export(?BACKEND).
  7. -export([ref/0,ref/1,bt/1,key/2,key/1,fd/1,tb/1,estimate/0,estimate/1]).
  8. -export([seek_it/1, seek_it/2, move_it/3, move_it/4, take_it/4, take_it/5, delete_it/1, delete_it/2]).
  9. e(X,Y) -> element(X,Y).
  10. bt([]) -> [];
  11. bt(X) -> binary_to_term(X).
  12. tb([]) -> <<>>;
  13. tb(T) when is_list(T) -> unicode:characters_to_nfkc_binary(T);
  14. tb(T) when is_atom(T) -> erlang:atom_to_binary(T, utf8);
  15. tb(T) when is_binary(T) -> T;
  16. tb(T) -> term_to_binary(T).
  17. sz([]) -> 0;
  18. sz(B) -> byte_size(B).
  19. key(R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(e(1,R), e(2,R));
  20. key(R) -> key(R,[]).
  21. key(writer,R) -> % allow old writers
  22. iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, erlang:atom_to_binary(writer, utf8), tb(R)]))]);
  23. key(Tab,R) -> Fd = case Tab of [] -> []; _ -> tb(Tab) end,
  24. iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, Fd, fmt(R)]))]).
  25. keys(Tab, Db) ->
  26. Feed = key(Tab,[]),
  27. {ok, H} = rocksdb:iterator(ref(Db), []),
  28. Keys = fun KEY(K1,Acc) when binary_part(K1,{0,byte_size(Feed)}) =:= Feed ->
  29. case rocksdb:iterator_move(H, next) of
  30. {ok,K2,_} -> KEY(K2,[tb(K1)|Acc]);
  31. _ -> lists:reverse([tb(K1)|Acc])
  32. end;
  33. KEY(_,Acc) -> rocksdb:iterator_close(H), lists:reverse(Acc)
  34. end,
  35. {ok, K, _} = rocksdb:iterator_move(H, {seek, Feed}),
  36. Keys(K,[]).
  37. match(Tab, Id, Db) ->
  38. Feed = key(Tab,[]),
  39. {ok, H} = rocksdb:iterator(ref(Db), []),
  40. Keys = fun KEY(K1) when
  41. binary_part(K1,{0,byte_size(Feed)}) =:= Feed andalso
  42. binary_part(K1,{byte_size(K1), -byte_size(Id)}) =:= Id ->
  43. rocksdb:iterator_close(H), [K1];
  44. KEY(K1) when binary_part(K1,{0,byte_size(Feed)}) =:= Feed ->
  45. case rocksdb:iterator_move(H, next) of
  46. {ok,K2,_} -> KEY(K2);
  47. _ -> []
  48. end;
  49. KEY(_) -> rocksdb:iterator_close(H), []
  50. end,
  51. {ok, K, _} = rocksdb:iterator_move(H, {seek, Feed}),
  52. Keys(K).
  53. fmt([]) -> [];
  54. fmt(K) -> Key = tb(K),
  55. End = sz(Key),
  56. {S,E} = case binary:matches(Key, [<<"/">>], []) of
  57. [{0,1}] -> {1, End-1};
  58. [{0,1},{1,1}] -> {2, End-2};
  59. [{0,1},{1,1}|_] -> {2, End-2};
  60. [{0,1}|_] -> {1, End-1};
  61. _ -> {0, End}
  62. end,
  63. binary:part(Key,{S,E}).
  64. fd(K) -> Key = tb(K),
  65. End = sz(Key),
  66. {S,_} = case binary:matches(Key,[<<"/">>],[]) of
  67. [{0,1}] -> {End,End};
  68. [{0,1},{1,1}] -> {End,End};
  69. [{0,1},{1,1}|T] -> hd(lists:reverse(T));
  70. [{0,1}|T] -> hd(lists:reverse(T));
  71. _ -> {End,End}
  72. end,
  73. binary:part(Key,{0,S}).
%% Interpret a compiled iterator program against database Db.
%%
%% Compiled_Operations is a list of funs (built by compile/1, compile/2 or
%% compile/4) that State_Machine folds left to right, starting from
%% Initial_Object = {ref(Db), []}:
%%   - The first op is applied as F(Ref, []), yielding {ok, H}.
%%   - The next op seeks to Key.
%%   - Each further op advances the iterator in direction Dir while keys
%%     stay under the prefix SK.
%%
%% Returns {ok, Fd, Value, Acc}:
%%   Fd    - fd/1 of the key where the walk ended (fd(SK) on iterator error).
%%   Value - bt/1 of the value at that key. On iterator error it is the most
%%           recently collected value, or [] when none.
%%   Acc   - bt/1 of the values passed on the way, most recent first.
%% A Key of <<>> short-circuits to {ok, SK, [], []}. Stepping outside SK
%% ends the walk early via throw, and the old-style catch below turns the
%% thrown tuple into the return value.
%%
%% NOTE(review): because of that `catch`, any runtime error is also
%% returned (as {'EXIT', Reason}) instead of raising, and stop_it/1 is not
%% called on that path.
%% NOTE(review): after a failed seek the fold state is {{error, E}, H}. A
%% further op falls into the {I,O} clause and calls F({error, E}, H);
%% confirm this is the intended failure mode for move_it/take_it.
run(<<>>,SK,_,_,_) -> {ok,SK,[],[]};
run(Key, % key to seek to
    SK, % sup-key: prefix every visited key must share
    Dir, % direction next/prev
    Compiled_Operations,
    Db) ->
    % H is iterator reference
    S = sz(SK),
    Initial_Object = {ref(Db), []},
    %% Run: while K is still under SK, remember V and advance one step in
    %% Dir. Otherwise close the iterator and throw the final result.
    Run = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == SK -> {F(H,Dir),H,[V|Acc]}; % continue
              (_,K,H,V,Acc) -> stop_it(H), % fail-safe closing
                               throw({ok, fd(K), bt(V), [bt(A1) || A1 <- Acc]}) end, % acc unfold
    %% Range_Check (Dir =:= prev only): step back once from the seek position.
    %% NOTE(review): the first guard tests K (the seek position), not K1;
    %% K1 is range-checked only if another op follows. Confirm this is intended.
    Range_Check = fun(F,K,H,V) -> case F(H,prev) of % backward prefetch
                      {ok,K1,V1} when binary_part(K,{0,S}) == SK -> {{ok,K1,V1},H,[V]}; % seek hit inside SK: keep V, continue from K1
                      {ok,K1,V1} -> Run(F,K1,H,V1,[]); % seek hit outside SK: start the chain at K1
                      _ -> stop_it(H), % fail-safe closing
                           throw({ok, fd(K), bt(V), [bt(V)]}) % acc unfold
                  end end,
    %% Fold step: dispatch on the shape of the state produced by the previous op.
    State_Machine = fun
        (F,{ok,H}) -> {F(H,{seek,Key}),H}; % fresh iterator: seek to Key
        (F,{{ok,K,V},H}) when Dir =:= prev -> Range_Check(F,K,H,V); % first step after seek, backwards
        (F,{{ok,K,V},H}) -> Run(F,K,H,V,[]); % first step after seek, forwards
        (F,{{ok,K,V},H,A}) -> Run(F,K,H,V,A); % subsequent steps carry the accumulator
        (_,{{error,E},H,Acc}) -> {{error,E},H,Acc}; % iterator error: pass through unchanged
        (F,{I,O}) -> F(I,O) % first op applied to Initial_Object {Ref, []}
    end,
    catch case lists:foldl(State_Machine, Initial_Object, Compiled_Operations) of
        {{ok,K,Bin},H,A} -> stop_it(H), {ok, fd(K), bt(Bin), [bt(A1) || A1 <- A]};
        {{ok,K,Bin},H} -> stop_it(H), {ok, fd(K), bt(Bin), []};
        {{error,_},H,Acc} -> stop_it(H), {ok, fd(SK), bt(shd(Acc)), [bt(A1) || A1 <- Acc]};
        {{error,_},H} -> stop_it(H), {ok, fd(SK), [], []}
    end.
  108. initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
  109. index(_,_,_) -> [].
  110. ref_env(Db) -> list_to_atom("rocks_ref_" ++ Db).
  111. db() -> application:get_env(kvs,rocks_name,"rocksdb").
  112. start() -> ok.
  113. stop() -> ok.
  114. destroy() -> destroy(db()).
  115. destroy(Db) -> rocksdb:destroy(Db, []).
  116. version() -> {version,"KVS ROCKSDB"}.
  117. dir() -> [].
  118. match(_) -> [].
  119. index_match(_,_) -> [].
  120. ref() -> ref(db()).
  121. ref(Db) -> application:get_env(kvs,ref_env(Db),[]).
  122. leave() -> leave(db()).
  123. leave(Db) -> case ref(Db) of [] -> skip; X -> rocksdb:close(X), application:set_env(kvs,ref_env(Db),[]), ok end.
  124. join(_,Db) ->
  125. application:start(rocksdb),
  126. leave(Db), {ok, Ref} = rocksdb:open(Db, [{create_if_missing, true}]),
  127. initialize(),
  128. application:set_env(kvs,ref_env(Db),Ref).
  129. compile(it) -> [fun rocksdb:iterator/2];
  130. compile(seek) -> [fun rocksdb:iterator/2,fun rocksdb:iterator_move/2];
  131. compile(move) -> [fun rocksdb:iterator_move/2];
  132. compile(close) -> [fun rocksdb:iterator_close/1].
  133. compile(take,N) -> lists:map(fun(_) -> fun rocksdb:iterator_move/2 end, lists:seq(1, N)).
%% Build the compiled delete operation used by delete_it/2.
%%
%% The third argument is the result of a prior seek. On {error, E} that
%% error is returned unchanged (not a list). Otherwise V1 is the record
%% found by the seek, and a single-element list [Del] is returned.
%% Del(H, Dir) does the following:
%%   1. Deletes F1, a key built from fmt(SK) and element(2, V1).
%%   2. Moves iterator H in direction Dir.
%%   3. While the key reached starts with SK, deletes it and recurses.
%%      The recursion deletes F1 again before each move, which is redundant.
%%   4. Otherwise returns the move result ({ok,K,V} / {ok,K}) or the first
%%      non-ok delete/move result.
%% NOTE(review): delete_it/2 concatenates this result with ++, so the
%% {error, E} (non-list) branch would yield an improper operation list.
compile(delete,_, {error,E},_) -> {error,E};
compile(delete,SK,{ok,_,V1,_},Db) ->
    F1 = key(key(fmt(SK),e(2,V1))), S = sz(SK),
    [fun Del(H,Dir) ->
        case rocksdb:delete(ref(Db), F1, []) of ok ->
            % the {ok, K} shape exists only in the API spec and is never actually used
            case rocksdb:iterator_move(H,Dir) of
                {ok,K,_} when binary_part(K,{0,S}) == SK -> case rocksdb:delete(ref(Db), K, []) of ok -> Del(H,Dir); E -> E end;
                {ok,K} when binary_part(K,{0,S}) == SK -> case rocksdb:delete(ref(Db), K, []) of ok -> Del(H,Dir); E -> E end;
                {ok,K,V} -> {ok,K,V};
                {ok,K} -> {ok, K};
                E -> E
            end;
        E -> E
        end
    end].
  150. stop_it(H) -> try begin [F]=compile(close), F(H) end catch error:badarg -> ok end.
  151. seek_it(K) -> seek_it(K,db()).
  152. seek_it(K,Db) -> run(K,K,ok,compile(seek),Db).
  153. move_it(Key,SK,Dir) -> move_it(Key,SK,Dir,db()).
  154. move_it(Key,SK,Dir,Db) -> run(Key,SK,Dir,compile(seek) ++ compile(move),Db).
  155. take_it(Key,SK,Dir,N) -> take_it(Key,SK,Dir,N,db()).
  156. take_it(Key,SK,Dir,N,Db) when is_integer(N) andalso N >= 0 -> run(Key,SK,Dir,compile(seek) ++ compile(take,N),Db);
  157. take_it(Key,SK,Dir,_,Db) -> take_it(Key,SK,Dir,0,Db).
  158. delete_it(Fd) -> delete_it(Fd, db()).
  159. delete_it(Fd,Db) -> run(Fd,Fd,next,compile(seek) ++ compile(delete,Fd,seek_it(Fd),Db),Db).
  160. all(R,Db) -> kvs_st:feed(R,Db).
  161. get(Tab, {step,N,[208|_]=Key}, Db) -> get(Tab, {step,N,list_to_binary(Key)},Db);
  162. get(Tab, [208|_]=Key, Db) -> get(Tab, list_to_binary(Key), Db);
  163. get(Tab, Key, Db) ->
  164. case rocksdb:get(ref(Db), key(Tab,Key), []) of
  165. not_found -> {error,not_found};
  166. {ok,Bin} -> {ok,bt(Bin)} end.
  167. put(Record) -> put(Record,db()).
  168. put(Records,Db) when is_list(Records) -> lists:map(fun(Record) -> put(Record,Db) end, Records);
  169. put(Record,Db) -> rocksdb:put(ref(Db), key(Record), term_to_binary(Record), [{sync,true}]).
  170. delete(Feed, Id, Db) -> rocksdb:delete(ref(Db), key(Feed,Id), []).
  171. delete_range(Feed,{Fd,Key},Db) ->
  172. Last = key(key(fmt(Fd),Key)),
  173. ReadOps = [{'prefix_same_as_start', true}],
  174. CompactOps = [{change_level, true}],
  175. Feed1 = key(Feed),
  176. Sz = size(Feed1),
  177. Reopen = case ref(Db) of [] -> skip; _ -> leave(Db), ok end,
  178. {ok, R} = rocksdb:open(Db, [{prefix_extractor, {capped_prefix_transform, Sz}}]),
  179. {ok, H} = rocksdb:iterator(R, ReadOps),
  180. {ok, Start, _} = rocksdb:iterator_move(H, {seek, Feed1}),
  181. ok = rocksdb:delete_range(R, Start, Last, []),
  182. ok = rocksdb:delete(R, Last, []),
  183. ok = rocksdb:delete(R, key(writer,Feed), []),
  184. ok = rocksdb:compact_range(R, Start, undefined, CompactOps),
  185. ok = rocksdb:iterator_close(H),
  186. ok = rocksdb:close(R),
  187. case Reopen of skip -> ok; ok -> join([],Db) end.
  188. count(_) -> 0.
  189. estimate() -> estimate(db()).
  190. estimate(Db) -> case rocksdb:get_property(ref(Db), <<"rocksdb.estimate-num-keys">>) of
  191. {ok, Est} when is_binary(Est) -> binary_to_integer(Est);
  192. {ok, Est} when is_list(Est) -> list_to_integer(Est);
  193. {ok, Est} when is_integer(Est) -> Est;
  194. _ -> 0
  195. end.
  196. shd([]) -> [];
  197. shd(X) -> hd(X).
  198. create_table(_,_) -> [].
  199. add_table_index(_, _) -> ok.
  200. dump() -> ok.
  201. seq(_,_) ->
  202. Val =
  203. case os:type() of
  204. {win32,nt} -> {Mega,Sec,Micro} = erlang:timestamp(), integer_to_list((Mega*1000000+Sec)*1000000+Micro);
  205. _ -> integer_to_list(element(2,hd(lists:reverse(erlang:system_info(os_system_time_source)))))
  206. end,
  207. case 20 - length(Val) > 0 of true -> string:copies("0", 20 - length(Val)); _ -> "" end ++ Val.