kvs_rocks.erl 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. -module(kvs_rocks).
  2. -include("backend.hrl").
  3. -include("kvs.hrl").
  4. -include("metainfo.hrl").
  5. -include_lib("stdlib/include/qlc.hrl").
  6. -export(?BACKEND).
  7. -export([ref/0,cut/8,next/8,prev/8,prev2/8,next2/8,bt/1,key/2,key/1,fd/1]).
  8. -export([seek_it/1, move_it/3]).
  9. e(X,Y) -> element(X,Y).
  10. bt([]) -> [];
  11. bt(X) -> binary_to_term(X).
  12. tb([]) -> [];
  13. tb(T) when is_list(T) -> list_to_binary(T);
  14. tb(T) when is_atom(T) -> atom_to_binary(T);
  15. tb(T) when is_binary(T) -> T;
  16. tb(T) -> term_to_binary(T).
  17. key(R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(e(1,R), e(2,R));
  18. key(R) -> key(R,[]).
  19. key(Tab,R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(Tab, e(2,R));
  20. key(Tab,R) -> iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, tb(Tab), tb(R)]))]).
  21. fd(Key) ->
  22. B = lists:reverse(binary:split(tb(Key), [<<"/">>, <<"//">>], [global, trim_all])),
  23. B1 = lists:reverse(case B of [] -> [];[X] -> [X];[_|T] -> T end),
  24. iolist_to_binary(lists:join(<<"/">>, [<<>>]++B1)).
  25. o(<<>>,FK,_,_) -> {ok,FK,[],[]};
  26. o(Key,FK,Dir,Fx) ->
  27. S = size(FK),
  28. Sheaf = fun (F,K,H,V,Acc) when binary_part(K,{0,S}) == FK -> {F(H,Dir),H,[V|Acc]};
  29. (_,K,H,V,Acc) -> close_it(H),
  30. throw({ok,fd(K),bt(V),[bt(A1)||A1<-Acc]}) end,
  31. It = fun(F,{ok,H}) -> {F(H,{seek,Key}),H};
  32. (F,{{ok,K,V},H}) -> Sheaf(F,K,H,V,[]);
  33. (F,{{ok,K,V},H,A}) -> Sheaf(F,K,H,V,A);
  34. (_,{{error,_},H,Acc}) -> {{ok,[],[]},H,Acc};
  35. (F,{R,O}) -> F(R,O);
  36. (F,H) -> F(H) end,
  37. catch case lists:foldl(It, {ref(),[]}, Fx) of
  38. {{ok,K,Bin},_,A} when binary_part(K,{0,S}) == FK -> {ok,fd(K),bt(Bin),[bt(A1)||A1<-A]};
  39. {{ok,K,Bin},_,_} -> {ok,fd(K),bt(Bin),[]};
  40. {{ok,K,Bin},_} -> {ok,fd(K),bt(Bin),[]}
  41. end.
  42. start() -> ok.
  43. stop() -> ok.
  44. destroy() -> rocksdb:destroy(application:get_env(kvs,rocks_name,"rocksdb"), []).
  45. version() -> {version,"KVS ROCKSDB"}.
  46. dir() -> [].
  47. leave() -> case ref() of [] -> skip; X -> rocksdb:close(X), application:set_env(kvs,rocks_ref,[]), ok end.
  48. join(_) -> application:start(rocksdb),
  49. leave(), {ok, Ref} = rocksdb:open(application:get_env(kvs,rocks_name,"rocksdb"), [{create_if_missing, true}]),
  50. initialize(),
  51. application:set_env(kvs,rocks_ref,Ref).
  52. initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
  53. ref() -> application:get_env(kvs,rocks_ref,[]).
  54. index(_,_,_) -> [].
  55. close_it(H) -> try rocksdb:iterator_close(H) catch error:badarg -> ok end.
  56. seek_it(K) -> o(K,K,ok,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2]).
  57. move_it(K,FK,Dir) -> o(K,FK,Dir,[fun rocksdb:iterator/2,fun rocksdb:iterator_move/2,fun rocksdb:iterator_move/2]).
  58. get(Tab, Key) ->
  59. case rocksdb:get(ref(), key(Tab,Key), []) of
  60. not_found -> {error,not_found};
  61. {ok,Bin} -> {ok,bt(Bin)} end.
  62. put(Records) when is_list(Records) -> lists:map(fun(Record) -> put(Record) end, Records);
  63. put(Record) -> rocksdb:put(ref(), key(Record), term_to_binary(Record), [{sync,true}]).
  64. delete(Feed, Id) -> rocksdb:delete(ref(), key(Feed,Id), []).
  65. count(_) -> 0.
  66. all(R) -> {ok,I} = rocksdb:iterator(ref(), []),
  67. Key = key(R),
  68. First = rocksdb:iterator_move(I, {seek,Key}),
  69. lists:reverse(next(I,Key,size(Key),First,[],[],-1,0)).
  70. next(I,Key,S,A,X,T,N,C) -> {_,L} = next2(I,Key,S,A,X,T,N,C), L.
  71. prev(I,Key,S,A,X,T,N,C) -> {_,L} = prev2(I,Key,S,A,X,T,N,C), L.
  72. shd([]) -> [];
  73. shd(X) -> hd(X).
  74. next2(_,Key,_,_,X,T,N,C) when C == N -> {shd(lists:reverse(T)),T};
  75. next2(I,Key,S,{ok,A,X},_,T,N,C) -> next2(I,Key,S,A,X,T,N,C);
  76. next2(_,Key,_,{error,_},X,T,_,_) -> {shd(lists:reverse(T)),T};
  77. next2(I,Key,S,A,X,T,N,C) when size(A) > S ->
  78. case binary:part(A, 0, S) of Key ->
  79. next2(I, Key, S, rocksdb:iterator_move(I, next), [], [bt(X)|T], N, C + 1);
  80. _ -> {shd(lists:reverse(T)),T} end;
  81. next2(_,Key,_,{ok,A,_},X,T,_,_) -> {bt(X),T};
  82. next2(_,Key,_,_,X,T,_,_) -> {shd(lists:reverse(T)),T}.
  83. prev2(_,Key,_,_,X,T,N,C) when C == N -> {bt(X),T};
  84. prev2(I,Key,S,{ok,A,X},_,T,N,C) -> prev2(I,Key,S,A,X,T,N,C);
  85. prev2(_,Key,_,{error,_},X,T,_,_) -> {bt(X),T};
  86. prev2(I,Key,S,A,X,T,N,C) when size(A) > S ->
  87. case binary:part(A, 0, S) of Key ->
  88. prev2(I, Key, S, rocksdb:iterator_move(I, prev), [], [bt(X)|T], N, C + 1);
  89. _ -> {shd(lists:reverse(T)),T} end;
  90. prev2(_,Key,_,{ok,A,_},X,T,_,_) -> {bt(X),T};
  91. prev2(_,Key,_,_,X,T,_,_) -> {bt(X),T}.
  92. cut(_,_,_,_,_,_,N,C) when C == N -> C;
  93. cut(I,Key,S,{ok,A,X},_,T,N,C) -> prev(I,Key,S,A,X,T,N,C);
  94. cut(_,___,_,{error,_},_,_,_,C) -> C;
  95. cut(I,Key,S,A,_,_,N,C) when size(A) > S ->
  96. case binary:part(A,0,S) of Key ->
  97. rocksdb:delete(ref(), A, []),
  98. Next = rocksdb:iterator_move(I, prev),
  99. cut(I,Key, S, Next, [], A, N, C + 1);
  100. _ -> C end;
  101. cut(_,_,_,_,_,_,_,C) -> C.
  102. seq(_,_) ->
  103. case os:type() of
  104. {win32,nt} -> {Mega,Sec,Micro} = erlang:timestamp(), integer_to_list((Mega*1000000+Sec)*1000000+Micro);
  105. _ -> erlang:integer_to_list(element(2,hd(lists:reverse(erlang:system_info(os_monotonic_time_source)))))
  106. end.
  107. create_table(_,_) -> [].
  108. add_table_index(_, _) -> ok.
  109. dump() -> ok.