kvs_rocks.erl 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. -module(kvs_rocks).
  2. -include("backend.hrl").
  3. -include("kvs.hrl").
  4. -include("metainfo.hrl").
  5. -include_lib("stdlib/include/qlc.hrl").
  6. -export(?BACKEND).
  7. -export([ref/0,bt/1,key/2,key/1,fd/1,tb/1]).
  8. -export([seek_it/1, move_it/3, take_it/4]).
  9. e(X,Y) -> element(X,Y).
  10. bt([]) -> [];
  11. bt(X) -> binary_to_term(X).
  12. tb([]) -> <<>>;
  13. tb(T) when is_list(T) -> unicode:characters_to_nfkc_binary(T);
  14. tb(T) when is_atom(T) -> atom_to_binary(T, utf8);
  15. tb(T) when is_binary(T) -> T;
  16. tb(T) -> term_to_binary(T).
  17. sz([]) -> 0;
  18. sz(B) -> byte_size(B).
  19. key(R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(e(1,R), e(2,R));
  20. key(R) -> key(R,[]).
  21. key(Tab,R) -> Fd = case Tab of [] -> []; _ -> tb(Tab) end,
  22. iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, Fd, fmt(R)]))]).
  23. fmt([]) -> [];
  24. fmt(K) -> Key = tb(K),
  25. End = sz(Key),
  26. {S,E} = case binary:matches(Key, [<<"/">>], []) of
  27. [{0,1}] -> {1, End-1};
  28. [{0,1},{1,1}] -> {2, End-2};
  29. [{0,1},{1,1}|_] -> {2, End-2};
  30. [{0,1}|_] -> {1, End-1};
  31. _ -> {0, End}
  32. end,
  33. binary:part(Key,{S,E}).
  34. fd(K) -> Key = tb(K),
  35. End = sz(Key),
  36. {S,_} = case binary:matches(Key,[<<"/">>],[]) of
  37. [{0,1}] -> {End,End};
  38. [{0,1},{1,1}] -> {End,End};
  39. [{0,1},{1,1}|T] -> hd(lists:reverse(T));
  40. [{0,1}|T] -> hd(lists:reverse(T));
  41. _ -> {End,End}
  42. end,
  43. binary:part(Key,{0,S}).
  44. run(<<>>,SK,_,_) -> {ok,SK,[],[]};
  45. run(Key, % key
  46. SK, % sup-key
  47. Dir, % direction next/prev
  48. Compiled_Operations) ->
  49. % H is iterator reference
  50. S = sz(SK),
  51. Initial_Object = {ref(), []},
  52. Run = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == SK -> {F(H,Dir),H,[V|Acc]}; % continue +------------+
  53. (_,K,H,V,Acc) -> stop_it(H), % fail-safe closing |
  54. throw({ok, fd(K), bt(V), [bt(A1) || A1 <- Acc]}) end, % acc unfold |
  55. % |
  56. Range_Check = fun(F,K,H,V) -> case F(H,prev) of % backward prefetch |
  57. {ok,K1,V1} when binary_part(K,{0,S}) == SK -> {{ok,K1,V1},H,[V]}; % return (range-check error) |
  58. {ok,K1,V1} -> Run(F,K1,H,V1,[]); % run prev-take chain | loop
  59. _ -> stop_it(H), % fail-safe closing |
  60. throw({ok, fd(K), bt(V), [bt(V)]}) % acc unfold |
  61. end end, % |
  62. % |
  63. State_Machine = fun % |
  64. (F,{ok,H}) -> {F(H,{seek,Key}),H}; % first move (seek) |
  65. (F,{{ok,K,V},H}) when Dir =:= prev -> Range_Check(F,K,H,V); % first chained prev-take |
  66. (F,{{ok,K,V},H}) -> Run(F,K,H,V,[]); % first chained next-take |
  67. (F,{{ok,K,V},H,A}) -> Run(F,K,H,V,A); % chained CPS-take continuator +---+
  68. (_,{{error,E},H,Acc}) -> {{error,E},H,Acc}; % error effects
  69. (F,{I,O}) -> F(I,O) % chain constructor from initial object
  70. end,
  71. catch case lists:foldl(State_Machine, Initial_Object, Compiled_Operations) of
  72. {{ok,K,Bin},_,A} -> {ok, fd(K), bt(Bin), [bt(A1) || A1 <- A]};
  73. {{ok,K,Bin},_} -> {ok, fd(K), bt(Bin), []};
  74. {{error,_},_,Acc} -> {ok, fd(SK), bt(shd(Acc)), [bt(A1) || A1 <- Acc]};
  75. {{error,_},_} -> {ok, fd(SK), [], []}
  76. end.
  77. initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
  78. index(_,_,_) -> [].
  79. start() -> ok.
  80. stop() -> ok.
  81. destroy() -> rocksdb:destroy(application:get_env(kvs,rocks_name,"rocksdb"), []).
  82. version() -> {version,"KVS ROCKSDB"}.
  83. dir() -> [].
  84. ref() -> application:get_env(kvs,rocks_ref,[]).
  85. leave() -> case ref() of [] -> skip; X -> rocksdb:close(X), application:set_env(kvs,rocks_ref,[]), ok end.
  86. join(_) -> application:start(rocksdb),
  87. leave(), {ok, Ref} = rocksdb:open(application:get_env(kvs,rocks_name,"rocksdb"), [{create_if_missing, true}]),
  88. initialize(),
  89. application:set_env(kvs,rocks_ref,Ref).
  90. compile(seek) -> [fun rocksdb:iterator/2,fun rocksdb:iterator_move/2];
  91. compile(move) -> [fun rocksdb:iterator_move/2];
  92. compile(close) -> [fun rocksdb:iterator_close/1].
  93. compile(take,N) -> lists:map(fun(_) -> fun rocksdb:iterator_move/2 end, lists:seq(1, N)).
  94. stop_it(H) -> try begin [F]=compile(close), F(H) end catch error:badarg -> ok end.
  95. seek_it(K) -> run(K,K,ok,compile(seek)).
  96. move_it(Key,SK,Dir) -> run(Key,SK,Dir,compile(seek) ++ compile(move)).
  97. take_it(Key,SK,Dir,N) when is_integer(N) andalso N >= 0 -> run(Key,SK,Dir,compile(seek) ++ compile(take,N));
  98. take_it(Key,SK,Dir,_) -> take_it(Key,SK,Dir,0).
  99. all(R) -> kvs_st:feed(R).
  100. get(Tab, Key) ->
  101. case rocksdb:get(ref(), key(Tab,Key), []) of
  102. not_found -> {error,not_found};
  103. {ok,Bin} -> {ok,bt(Bin)} end.
  104. put(Records) when is_list(Records) -> lists:map(fun(Record) -> put(Record) end, Records);
  105. put(Record) -> rocksdb:put(ref(), key(Record), term_to_binary(Record), [{sync,true}]).
  106. delete(Feed, Id) -> rocksdb:delete(ref(), key(Feed,Id), []).
  107. count(_) -> 0.
  108. shd([]) -> [];
  109. shd(X) -> hd(X).
  110. create_table(_,_) -> [].
  111. add_table_index(_, _) -> ok.
  112. dump() -> ok.
  113. seq(_,_) ->
  114. case os:type() of
  115. {win32,nt} -> {Mega,Sec,Micro} = erlang:timestamp(), integer_to_list((Mega*1000000+Sec)*1000000+Micro);
  116. _ -> erlang:integer_to_list(element(2,hd(lists:reverse(erlang:system_info(os_monotonic_time_source)))))
  117. end.