kvs_st.erl 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. -module(kvs_st).
  2. -description('KVS STREAM NATIVE ROCKS').
  3. -include("kvs.hrl").
  4. -include("stream.hrl").
  5. -include("metainfo.hrl").
  6. -export(?STREAM).
  7. ref() -> kvs_rocks:ref().
  8. % section: kvs_stream prelude
  9. se(X,Y,Z) -> setelement(X,Y,Z).
  10. e(X,Y) -> element(X,Y).
  11. c3(R,V) -> se(#reader.cache, R, V).
  12. c4(R,V) -> se(#reader.args, R, V).
  13. si(M,T) -> se(#it.id, M, T).
  14. id(T) -> e(#it.id, T).
  15. % section: next, prev
  16. top (#reader{}=C) -> C.
  17. bot (#reader{}=C) -> C.
  18. next (#reader{cache=[]}) -> {error,empty};
  19. next (#reader{cache=I}=C) ->
  20. case rocksdb:iterator_move(I, next) of
  21. {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
  22. {error,Reason} -> {error,Reason} end.
  23. prev (#reader{cache=[]}) -> {error,empty};
  24. prev (#reader{cache=I}=C) ->
  25. case rocksdb:iterator_move(I, prev) of
  26. {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
  27. {error,Reason} -> {error,Reason} end.
  28. % section: take, drop
  29. drop(#reader{args=N}) when N < 0 -> #reader{};
  30. drop(#reader{args=N,feed=Feed,cache=I}=C) when N == 0 ->
  31. Key = list_to_binary(lists:concat(["/",io_lib:format("~p",[Feed])])),
  32. case rocksdb:iterator_move(I, {seek,Key}) of
  33. {ok,_,Bin} -> C#reader{cache=binary_to_term(Bin,[safe])};
  34. _ -> C#reader{cache=[]} end;
  35. drop(#reader{args=N,feed=Feed,cache=I}=C) when N > 0 ->
  36. Key = list_to_binary(lists:concat(["/",io_lib:format("~p",[Feed])])),
  37. First = rocksdb:iterator_move(I, {seek,Key}),
  38. Term = lists:foldl(
  39. fun (_,{{ok,K,_},{_,X}}) when N > X -> {K,{<<131,106>>,N}};
  40. (_,{{ok,K,Bin},{A,X}}) when N =< X->
  41. case binary:part(K,0,size(Key)) of
  42. Key -> {rocksdb:iterator_move(I,next),{Bin,X+1}};
  43. _ -> {{error,range},{A,X}} end;
  44. (_,{_,{_,_}}) -> {[],{<<131,106>>,N}}
  45. end,
  46. {First,{<<131,106>>,1}},
  47. lists:seq(0,N)),
  48. C#reader{cache=binary_to_term(element(1,element(2,Term)))}.
  49. take(#reader{args=N,feed=Feed,cache=I,dir=Dir}=C) ->
  50. Key = list_to_binary(lists:concat(["/",io_lib:format("~p",[Feed])])),
  51. First = rocksdb:iterator_move(I, {seek,Key}),
  52. Res = kvs_rocks:next(I,Key,size(Key),First,[],[],N,0),
  53. C#reader{args= case Dir of 0 -> Res; 1 -> lists:reverse(Res) end}.
  54. % new, save, load, up, down, top, bot
  55. load_reader (Id) ->
  56. case kvs:get(reader,Id) of
  57. {ok,#reader{}=C} -> C#reader{cache=element(2,rocksdb:iterator(ref(),[]))};
  58. _ -> #reader{id=[]} end.
  59. writer (Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
  60. reader (Id) ->
  61. case kvs:get(writer,Id) of
  62. {ok,#writer{}} ->
  63. {ok,I} = rocksdb:iterator(ref(), []),
  64. #reader{id=kvs:seq([],[]),feed=Id,cache=I};
  65. {error,_} -> #reader{} end.
  66. save (C) -> NC = c4(C,[]), N2 = c3(NC,[]), kvs:put(N2), N2.
  67. feed(Key) -> kvs:all(Key).
  68. % add
  69. add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
  70. add(#writer{args=M}=C) -> add(M,C).
  71. add(M,#writer{id=Feed,count=S}=C) -> NS=S+1,
  72. rocksdb:put(ref(),
  73. <<(list_to_binary(lists:concat(["/",io_lib:format("~p",[Feed]),"/"])))/binary,
  74. (term_to_binary(id(M)))/binary>>, term_to_binary(M), [{sync,true}]),
  75. C#writer{cache=M,count=NS}.
  76. append(Rec,Feed) ->
  77. kvs:ensure(#writer{id=Feed}),
  78. Id = element(2,Rec),
  79. case kvs:get(Feed,Id) of
  80. {ok,_} -> Id;
  81. {error,_} -> kvs:save(kvs:add((kvs:writer(Feed))#writer{args=Rec})), Id end.
  82. prev(_,_,_,_,_,_,N,C) when C == N -> C;
  83. prev(I,Key,S,{ok,A,X},_,T,N,C) -> prev(I,Key,S,A,X,T,N,C);
  84. prev(_,___,_,{error,_},_,_,_,C) -> C;
  85. prev(I,Key,S,A,_,_,N,C) when size(A) > S ->
  86. case binary:part(A,0,S) of Key ->
  87. rocksdb:delete(ref(), A, []),
  88. Next = rocksdb:iterator_move(I, prev),
  89. prev(I,Key, S, Next, [], A, N, C + 1);
  90. _ -> C end;
  91. prev(_,_,_,_,_,_,_,C) -> C.
  92. cut(Feed,Id) ->
  93. Key = list_to_binary(lists:concat(["/",io_lib:format("~p",[Feed]),"/"])),
  94. A = <<Key/binary,(term_to_binary(Id))/binary>>,
  95. {ok,I} = rocksdb:iterator(ref(), []),
  96. case rocksdb:iterator_move(I, {seek,A}) of
  97. {ok,A,X} -> {ok,prev(I,Key,size(Key),A,X,[],-1,0)};
  98. _ -> {error,not_found} end.