kvs_rocks.erl 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. -module(kvs_rocks).
  2. -include("backend.hrl").
  3. -include("kvs.hrl").
  4. -include("metainfo.hrl").
  5. -include_lib("stdlib/include/qlc.hrl").
  6. -export(?BACKEND).
  7. -export([ref/0,cut/8,next/8,prev/8,prev2/8,next2/8,format/1,bt/1]).
  %% @doc Decode a stored binary back into an Erlang term.
  %% The empty list is returned untouched (the traversal helpers in this
  %% module pass [] where no value is available); anything else is handed
  %% to binary_to_term/1.
  bt([]) ->
      [];
  bt(Encoded) ->
      erlang:binary_to_term(Encoded).
  %% @doc Backend start hook. Nothing to do for this backend; returns `ok'.
  start() ->
      ok.
  %% @doc Backend stop hook. Nothing to do for this backend; returns `ok'.
  stop() ->
      ok.
  %% @doc Destroy the database whose name is read from the `rocks_name'
  %% environment key of the `kvs' application (default "rocksdb").
  %% Returns whatever rocksdb:destroy/2 returns.
  destroy() ->
      Name = application:get_env(kvs, rocks_name, "rocksdb"),
      rocksdb:destroy(Name, []).
  %% @doc Identify this backend as a tagged tuple.
  version() ->
      {version, "KVS ROCKSDB"}.
  %% @doc Table listing hook. This backend always reports an empty list.
  dir() ->
      [].
  %% @doc Close the currently open database handle, if any.
  %% Returns `skip' when no handle is stored (ref/0 yields []); otherwise
  %% closes the handle, resets the stored `rocks_ref' to [] and returns `ok'.
  leave() ->
      case ref() of
          [] ->
              skip;
          Handle ->
              rocksdb:close(Handle),
              application:set_env(kvs, rocks_ref, []),
              ok
      end.
  %% @doc Open the database and remember its handle.
  %% Starts the `rocksdb' application, closes any previously stored handle,
  %% opens (creating if missing) the database named by the `rocks_name' env
  %% key of `kvs' (default "rocksdb"), runs initialize/0 and finally stores
  %% the new handle under the `rocks_ref' env key. The argument is ignored.
  join(_) ->
      application:start(rocksdb),
      leave(),
      Name = application:get_env(kvs, rocks_name, "rocksdb"),
      {ok, Handle} = rocksdb:open(Name, [{create_if_missing, true}]),
      initialize(),
      application:set_env(kvs, rocks_ref, Handle).
  %% @doc Call kvs:initialize/2 with this backend for every module returned
  %% by kvs:modules/0 and return the list of results.
  initialize() ->
      Modules = kvs:modules(),
      [kvs:initialize(kvs_rocks, Mod) || Mod <- Modules].
  %% @doc Return the database handle stored under the `rocks_ref' env key of
  %% the `kvs' application, or [] when none has been stored.
  ref() ->
      case application:get_env(kvs, rocks_ref) of
          {ok, Handle} -> Handle;
          undefined -> []
      end.
  %% @doc Index lookup hook. Not implemented by this backend: all three
  %% arguments are ignored and the result is always [].
  index(_, _, _) ->
      [].
  %% @doc Fetch the term stored for `Key' in table `Tab'.
  %% The storage address is the binary "/" ++ format(Tab) ++ "/" followed by
  %% term_to_binary(Key).
  %% Returns `{ok, Term}' on a hit, `{error, not_found}' on a miss, and
  %% `{error, Reason}' when rocksdb itself reports an error.
  get(Tab, Key) ->
      Prefix = list_to_binary(lists:concat(["/", format(Tab), "/"])),
      Address = <<Prefix/binary, (term_to_binary(Key))/binary>>,
      case rocksdb:get(ref(), Address, []) of
          {ok, Bin} ->
              {ok, bt(Bin)};
          not_found ->
              {error, not_found};
          %% A storage-level failure used to have no matching clause and
          %% crashed with case_clause; hand it to the caller instead.
          {error, _Reason} = Error ->
              Error
      end.
  %% @doc Store one record, or every record of a list.
  %% For a list, put/1 is applied to each element and the list of results is
  %% returned. For a single record, element 1 is used as the table and
  %% element 2 as the key: the address is "/" ++ format(Table) ++ "/"
  %% followed by term_to_binary(Key), and the whole record is written as
  %% term_to_binary(Record) with the `{sync, true}' write option.
  put(Records) when is_list(Records) ->
      lists:map(fun put/1, Records);
  put(Record) ->
      Prefix = list_to_binary(lists:concat(["/", format(element(1, Record)), "/"])),
      Id = term_to_binary(element(2, Record)),
      Address = <<Prefix/binary, Id/binary>>,
      rocksdb:put(ref(), Address, term_to_binary(Record), [{sync, true}]).
  %% @doc Turn a table/feed name into character data for building addresses.
  %% Lists are returned as they are, atoms and binaries are converted to
  %% lists, and any other term is printed with "~p" via io_lib:format/2
  %% (which may yield a nested character list).
  format(Chars) when is_list(Chars) ->
      Chars;
  format(Atom) when is_atom(Atom) ->
      erlang:atom_to_list(Atom);
  format(Bin) when is_binary(Bin) ->
      erlang:binary_to_list(Bin);
  format(Other) ->
      io_lib:format("~p", [Other]).
  %% @doc Remove the entry stored for `Id' under `Feed'.
  %% The address is "/" ++ format(Feed) ++ "/" followed by
  %% term_to_binary(Id). Returns whatever rocksdb:delete/3 returns.
  delete(Feed, Id) ->
      Prefix = list_to_binary(lists:concat(["/", format(Feed), "/"])),
      Address = <<Prefix/binary, (term_to_binary(Id))/binary>>,
      rocksdb:delete(ref(), Address, []).
  %% @doc Record-count hook. Not implemented by this backend: always 0.
  count(_) ->
      0.
  %% @doc Return every stored term whose address starts with "/" ++ format(R),
  %% in iteration order.
  %% An iterator is opened on the current handle, positioned with a seek to
  %% the prefix, and walked forward by next/8 with no limit (N = -1).
  %% NOTE(review): the prefix has no trailing "/", so addresses of a table
  %% whose name merely starts with format(R) also match - confirm whether
  %% that is intended.
  all(R) ->
      {ok, I} = rocksdb:iterator(ref(), []),
      try
          Key = list_to_binary(lists:concat(["/", format(R)])),
          First = rocksdb:iterator_move(I, {seek, Key}),
          lists:reverse(next(I, Key, byte_size(Key), First, [], [], -1, 0))
      after
          %% The iterator was previously never released; close it on both
          %% the normal and the crash path.
          rocksdb:iterator_close(I)
      end.
  %% @doc Forward traversal returning only the accumulated list.
  %% Delegates to next2/8 with the same arguments and drops the first
  %% element of its result pair.
  next(I, Key, S, A, X, T, N, C) ->
      {_First, Collected} = next2(I, Key, S, A, X, T, N, C),
      Collected.
  %% @doc Backward traversal returning only the accumulated list.
  %% Delegates to prev2/8 with the same arguments and drops the first
  %% element of its result pair.
  prev(I, Key, S, A, X, T, N, C) ->
      {_Last, Collected} = prev2(I, Key, S, A, X, T, N, C),
      Collected.
  %% @doc "Safe head": [] for the empty list, otherwise hd/1 of the argument.
  shd([]) ->
      [];
  shd(List) ->
      erlang:hd(List).
  %% @doc Walk the iterator forward collecting decoded values whose address
  %% starts with `Key'.
  %%
  %% Arguments: `I' iterator, `Key' address prefix, `S' prefix size in bytes,
  %% 4th argument the current position (an iterator_move result or an
  %% already unwrapped address binary), `X' the value at that position,
  %% `T' accumulator (newest first), `N' maximum number of items (all/1
  %% passes -1 for "no limit"), `C' items collected so far.
  %%
  %% Returns `{First, T}' where `T' is the accumulator (newest first) and
  %% `First' is its oldest element, or [] when nothing was collected.
  %%
  %% Changes from the previous version: the `{ok,A,_}' clause that used to
  %% sit before the final catch-all was unreachable (every `{ok,_,_}' is
  %% consumed by the unwrap clause) and has been removed; unused head
  %% variables are underscored; the guard uses byte_size/1, so a non-binary
  %% position now falls through to the catch-all.
  next2(_, _Key, _, _, _X, T, N, C) when C == N ->
      {shd(lists:reverse(T)), T};
  %% Unwrap an iterator_move result into address and value.
  next2(I, Key, S, {ok, A, X}, _, T, N, C) ->
      next2(I, Key, S, A, X, T, N, C);
  next2(_, _Key, _, {error, _}, _X, T, _, _) ->
      {shd(lists:reverse(T)), T};
  next2(I, Key, S, A, X, T, N, C) when byte_size(A) > S ->
      case binary:part(A, 0, S) of
          Key ->
              next2(I, Key, S, rocksdb:iterator_move(I, next), [], [bt(X) | T], N, C + 1);
          _ ->
              %% Left the prefix range: stop.
              {shd(lists:reverse(T)), T}
      end;
  next2(_, _Key, _, _, _X, T, _, _) ->
      {shd(lists:reverse(T)), T}.
  %% @doc Walk the iterator backward collecting decoded values whose address
  %% starts with `Key'.
  %%
  %% Arguments mirror next2/8: `I' iterator, `Key' address prefix, `S' prefix
  %% size in bytes, 4th argument the current position (an iterator_move
  %% result or an already unwrapped address binary), `X' the value at that
  %% position, `T' accumulator, `N' maximum number of items, `C' items
  %% collected so far.
  %%
  %% Returns `{bt(X), T}' when the limit is reached, the iterator reports an
  %% error, or the position cannot be examined; returns
  %% `{shd(lists:reverse(T)), T}' when the walk leaves the prefix range.
  %%
  %% Changes from the previous version: the `{ok,A,_}' clause that used to
  %% sit before the final catch-all was unreachable (every `{ok,_,_}' is
  %% consumed by the unwrap clause) and has been removed; unused head
  %% variables are underscored; the guard uses byte_size/1, so a non-binary
  %% position now falls through to the catch-all.
  prev2(_, _Key, _, _, X, T, N, C) when C == N ->
      {bt(X), T};
  %% Unwrap an iterator_move result into address and value.
  prev2(I, Key, S, {ok, A, X}, _, T, N, C) ->
      prev2(I, Key, S, A, X, T, N, C);
  prev2(_, _Key, _, {error, _}, X, T, _, _) ->
      {bt(X), T};
  prev2(I, Key, S, A, X, T, N, C) when byte_size(A) > S ->
      case binary:part(A, 0, S) of
          Key ->
              prev2(I, Key, S, rocksdb:iterator_move(I, prev), [], [bt(X) | T], N, C + 1);
          _ ->
              %% Left the prefix range: stop.
              {shd(lists:reverse(T)), T}
      end;
  prev2(_, _Key, _, _, X, T, _, _) ->
      {bt(X), T}.
  %% @doc Delete entries whose address starts with `Key', walking the
  %% iterator backward, and return the number of entries deleted.
  %%
  %% Arguments: `I' iterator, `Key' address prefix, `S' prefix size in bytes,
  %% 4th argument the current position (an iterator_move result or an
  %% already unwrapped address binary), 5th and 6th arguments are carried
  %% along but not inspected, `N' maximum number of deletions, `C' deletions
  %% so far.
  %%
  %% Fix: the unwrap clause used to call prev/8, which only collects values.
  %% As soon as the iterator produced `{ok, Address, Value}' nothing more was
  %% deleted and a list was returned instead of the count. It now recurses
  %% into cut/8, matching the unwrap clauses of next2/8 and prev2/8.
  cut(_, _, _, _, _, _, N, C) when C == N ->
      C;
  cut(I, Key, S, {ok, A, X}, _, T, N, C) ->
      cut(I, Key, S, A, X, T, N, C);
  cut(_, _, _, {error, _}, _, _, _, C) ->
      C;
  cut(I, Key, S, A, _, _, N, C) when byte_size(A) > S ->
      case binary:part(A, 0, S) of
          Key ->
              rocksdb:delete(ref(), A, []),
              Next = rocksdb:iterator_move(I, prev),
              cut(I, Key, S, Next, [], A, N, C + 1);
          _ ->
              %% Left the prefix range: stop.
              C
      end;
  cut(_, _, _, _, _, _, _, C) ->
      C.
  %% @doc Produce an identifier as a decimal string; both arguments are
  %% ignored.
  %% On `{win32, nt}' the value is erlang:timestamp/0 expressed in
  %% microseconds. Elsewhere it is the `time' entry of
  %% erlang:system_info(os_monotonic_time_source).
  %%
  %% Fix: the time entry used to be taken as "the last tuple of the list",
  %% but the order of that list is documented as undefined. It is now looked
  %% up by its `time' key.
  seq(_, _) ->
      case os:type() of
          {win32, nt} ->
              {Mega, Sec, Micro} = erlang:timestamp(),
              integer_to_list((Mega * 1000000 + Sec) * 1000000 + Micro);
          _ ->
              Source = erlang:system_info(os_monotonic_time_source),
              {time, Now} = lists:keyfind(time, 1, Source),
              integer_to_list(Now)
      end.
  %% @doc Table creation hook. Not needed by this backend: both arguments
  %% are ignored and [] is returned.
  create_table(_, _) ->
      [].
  %% @doc Index creation hook. Not needed by this backend: both arguments
  %% are ignored and `ok' is returned.
  add_table_index(_, _) ->
      ok.
  %% @doc Dump hook. This backend prints nothing and returns `ok'.
  dump() ->
      ok.