kvs_st.erl 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. -module(kvs_st).
  2. -description('KVS STREAM NATIVE ROCKS').
  3. -include("kvs.hrl").
  4. -include("stream.hrl").
  5. -include("metainfo.hrl").
  6. -export(?STREAM).
  7. -import(kvs_rocks, [key/2, key/1, bt/1, ref/0, fd/1, seek_it/1, move_it/3]).
  8. % section: kvs_stream prelude
  % Shorthand for setelement/3: returns Tuple with the element at Index replaced by Value.
  se(Index, Tuple, Value) -> erlang:setelement(Index, Tuple, Value).
  % Shorthand for element/2: returns the element of Tuple at position Index.
  e(Index, Tuple) -> erlang:element(Index, Tuple).
  % Returns Cursor with the tuple slot at the #reader.args position set to Args.
  % Works positionally, so any tuple wide enough is accepted (save/1 passes
  % both readers and writers through it).
  c4(Cursor, Args) -> setelement(#reader.args, Cursor, Args).
  % Returns Msg with the tuple slot at the #it.id position set to Id.
  si(Msg, Id) -> setelement(#it.id, Msg, Id).
  % Returns the value stored at the #it.id position of the tuple Rec.
  id(Rec) -> element(#it.id, Rec).
  14. % section: next, prev
  % Builds a reader for Feed, runs take/1 on it with args = -1 and returns the
  % resulting args field.
  % NOTE(review): -1 is handed to kvs_rocks unchanged by take/1; presumably it
  % means "no limit" -- confirm against kvs_rocks.
  feed(Feed) ->
      Unbounded = (reader(Feed))#reader{args = -1},
      #reader{args = Items} = take(Unbounded),
      Items.
  % Copies the writer's current count into the reader, then positions the
  % reader from the result of seek_it/1 on the feed key (see read_it/2).
  top(#reader{feed = Feed} = Reader) ->
      #writer{count = Count} = writer(Feed),
      Counted = Reader#reader{count = Count},
      read_it(Counted, seek_it(key(Feed))).
  % Copies the writer's cache and count into the reader and sets dir to 1
  % (the direction handled by the prev2-based clauses of take/1).
  bot(#reader{feed = Feed} = Reader) ->
      #writer{cache = Cache, count = Count} = writer(Feed),
      Reader#reader{cache = Cache, count = Count, dir = 1}.
  18. % iterator -> specific feed reader
  % Applies an iterator result to a reader.
  % On {ok, Fd, Value, Seen} the cache becomes {element(1,Value), id(Value), Fd}
  % and args becomes Seen reversed; any other result leaves the reader as is.
  read_it(Reader, {ok, Fd, Value, Seen}) ->
      Cache = {e(1, Value), id(Value), Fd},
      Reader#reader{cache = Cache, args = lists:reverse(Seen)};
  read_it(Reader, _Other) ->
      Reader.
  % Moves the reader one step with move_it/3 in the `next` direction, starting
  % from the key of the cached position within the feed.
  next(#reader{feed = Feed, cache = Cursor} = Reader) ->
      From = key(Feed, Cursor),
      Prefix = key(Feed),
      read_it(Reader, move_it(From, Prefix, next)).
  % Moves the reader one step with move_it/3 in the `prev` direction, starting
  % from the key of the cached position within the feed.
  prev(#reader{cache = Cursor, feed = Feed} = Reader) ->
      From = key(Feed, Cursor),
      Prefix = key(Feed),
      read_it(Reader, move_it(From, Prefix, prev)).
  23. % section: take, drop
  % Skips elements: advances the reader with take/1 in direction 0 and discards
  % what was read (args is reset to []).
  % A reader whose args is =< 0 is returned unchanged.
  drop(#reader{args = N} = C) when N =< 0 -> C;
  % The original second clause bound args/feed/cache to variables it never used;
  % any #reader record reaches this clause, exactly as before.
  drop(#reader{} = C) -> (take(C#reader{dir = 0}))#reader{args = []}.
  % take/1 reads up to args elements from the feed, starting at the cached
  % position, and returns the reader with the result in args.
  % dir = 0 walks with kvs_rocks:next2, dir = 1 walks with kvs_rocks:prev2.
  %
  % Rules (referenced below as "% 1" .. "% 4"):
  % 1. The cursor always points at the next unread element.
  % 2. If, after a read, the cursor points at the element that was just read,
  %    the end of the list has been reached.
  % 3. If a read returns fewer elements than requested, the end of the list
  %    has been reached.
  % 4. If the cursor is at the end of the list and its last element has already
  %    been returned, the read returns an empty list.
  %
  % NOTE(review): the RocksDB iterator opened in both main clauses is never
  % closed in this function; confirm whether kvs_rocks:next2/prev2 release it.
  take(#reader{pos='end',dir=0}=C) -> C#reader{args=[]}; % 4
  % A 3-tuple cache is reduced to {T,O} before reading.
  take(#reader{cache={T,O,_},dir=0}=C) -> take(C#reader{cache={T,O}});
  take(#reader{args=N,feed=Feed,cache={T,O},dir=0}=C) -> % 1
      Key = key(Feed),
      {ok,I} = rocksdb:iterator(ref(), []),
      {ok,K,BERT} = rocksdb:iterator_move(I, {seek,key(Feed,{T,O})}),
      % One element more than requested is asked for, unless N is -1.
      {KK,Res} = kvs_rocks:next2(I,Key,size(Key),K,BERT,[],case N of -1 -> -1; J -> J + 1 end,0),
      Last = last(KK,O,'end'),
      case {Res,length(Res)} of
          {[],_} -> C#reader{args=[],cache=[]};
          {[H], _A} when element(2,KK) == O -> C#reader{args=Res,pos=Last,cache={e(1,H),e(2,H)}}; % 2
          {[H|_X],A} when A < N + 1 orelse N == -1 -> C#reader{args=Res,cache={e(1,H),e(2,H)},pos=Last};
          {[H| X],A} when A == N -> C#reader{args=[bt(BERT)|X],cache={e(1,H),e(2,H)},pos=Last};
          {[H|_X],A} when A =< N andalso Last == 'end'-> C#reader{args=Res,cache={e(1,H),e(2,H)},pos=Last};
          % Otherwise the head becomes the new cursor and is dropped from the result.
          {[H| X],_} -> C#reader{args=X,cache={e(1,H),e(2,H)}}
      end;
  take(#reader{pos=0,dir=0}=C) -> C#reader{pos='begin',args=[]};
  take(#reader{pos='begin',dir=1}=C) -> C#reader{args=[]}; % 4
  take(#reader{pos=0,cache=[],dir=1}=C) -> C#reader{args=[]};
  % TODO: try to remove lists:reverse and abstract both branches
  % A 3-tuple cache is reduced to {T,O} before reading.
  take(#reader{cache={T,O,_},dir=1}=C) -> take(C#reader{cache={T,O}});
  take(#reader{args=N,feed=Feed,cache={T,O},dir=1}=C) -> % 1
      Key = key(Feed),
      {ok,I} = rocksdb:iterator(ref(), []),
      {ok,K,BERT} = rocksdb:iterator_move(I, {seek,key(Feed,{T,O})}),
      {KK,Res} = kvs_rocks:prev2(I,Key,size(Key),K,BERT,[],case N of -1 -> -1; J -> J + 1 end,0),
      Last = last(KK,O,'begin'),
      % NOTE(review): the forward branch compares against N + 1 where this one
      % uses N - 1, and only one branch here reverses args -- confirm intended.
      case {lists:reverse(Res),length(Res)} of
          {[],_} -> C#reader{args=[],cache=[]};
          {[H],_} when element(2,KK) == O -> C#reader{args=Res,pos=Last,cache={e(1,H),e(2,H)}}; % 2
          {[_|_],A} when A < N - 1 orelse N == -1 -> [HX|_] = Res, C#reader{args=Res,cache={e(1,HX),e(2,HX)},pos=Last};
          {[_|X],A} when A == N -> [HX|_] = Res, C#reader{args=[bt(BERT)|X],cache={e(1,HX),e(2,HX)},pos=Last};
          {[_|_],A} when A =< N andalso Last == 'begin'-> [HX|_] = Res, C#reader{args=lists:reverse(Res),cache={e(1,HX),e(2,HX)},pos=Last};
          {[_|_],_} -> [HX|TL] = Res, C#reader{args=lists:reverse(TL),cache={e(1,HX),e(2,HX)}}
      end.
  % Computes the pos marker after a read.
  % Returns Atom when KK is [] or when the second element of KK equals O;
  % returns 0 otherwise (including when KK has no second element, because a
  % failing guard simply does not match).
  last([], _O, Atom) -> Atom;
  last(KK, O, Atom) when element(2, KK) == O -> Atom;
  last(_KK, _O, _Atom) -> 0.
  71. % new, save, load, up, down, top, bot
  % Fetches the stored reader with the given Id via kvs:get/2.
  % Any result other than {ok, #reader{}} yields a blank reader with id = [].
  load_reader(Id) ->
      Stored = kvs:get(reader, Id),
      case Stored of
          {ok, #reader{} = Reader} -> Reader;
          _Missing -> #reader{id = []}
      end.
  % Returns the stored writer for Id, or a fresh #writer{id = Id} (not
  % persisted here) when kvs:get/2 reports an error.
  writer(Id) ->
      case kvs:get(writer, Id) of
          {ok, Writer} -> Writer;
          {error, _Reason} -> #writer{id = Id}
      end.
  % Creates a new reader for the feed Id.
  % If a writer exists, seeks to the feed's key, decodes the value found there
  % with bt/1 and uses it to seed the reader cache as
  % {element 1, element 2, fd(FoundKey)}. The reader gets a fresh id from
  % kvs:seq/2 and the writer's count.
  % If no writer exists, one is saved first and the call is retried.
  % Crashes with badmatch if the seek does not return {ok, Key, Value}.
  reader(Id) ->
      case kvs:get(writer, Id) of
          {ok, #writer{id = Feed, count = Cn}} ->
              {ok, I} = rocksdb:iterator(ref(), []),
              Moved = rocksdb:iterator_move(I, {seek, key(Feed)}),
              % The iterator is needed for this single seek only; close it
              % before matching so it is released on the failure path too,
              % instead of lingering until garbage collection.
              rocksdb:iterator_close(I),
              {ok, F1, BERT} = Moved,
              F = bt(BERT),
              #reader{id = kvs:seq([],[]), feed = Id, count = Cn, cache = {e(1,F), e(2,F), fd(F1)}};
          {error, _} ->
              save(#writer{id = Id}),
              reader(Id)
      end.
  % Persists a cursor (reader or writer) with kvs:put/1 after clearing its args
  % slot to [], and returns the cleared cursor.
  save(Cursor) ->
      Cleared = c4(Cursor, []),
      kvs:put(Cleared),
      Cleared.
  85. % add
  % Appends the message held in the writer's args to the writer's feed.
  % When the message's second element is [], a fresh id from kvs:seq/2 is
  % written into its #it.id slot first.
  add(#writer{args = Msg} = Writer) when element(2, Msg) == [] ->
      Fresh = kvs:seq([], []),
      add(si(Msg, Fresh), Writer);
  add(#writer{args = Msg} = Writer) ->
      add(Msg, Writer).
  % Writes Msg into the writer's feed with raw_append/2 and returns the writer
  % with count incremented and cache set to {element 1, element 2, fd(Feed)}
  % of Msg. The returned writer is not persisted here.
  add(Msg, #writer{id = Feed, count = Count} = Writer) ->
      Bumped = Count + 1,
      raw_append(Msg, Feed),
      Cache = {e(1, Msg), e(2, Msg), fd(Feed)},
      Writer#writer{cache = Cache, count = Bumped}.
  % Deletes Rec from Feed via kvs:delete/2 and returns the writer's count.
  % On success the count is decremented, the writer is saved and the new count
  % is returned; otherwise the old count is returned and nothing is saved.
  remove(Rec, Feed) ->
      kvs:ensure(#writer{id = Feed}),
      #writer{count = Count, cache = Cache} = Writer = kvs:writer(Feed),
      % need to keep reference for next element: the cache survives only when
      % it equals {element 1, element 2} of Rec, otherwise it is reset to [].
      % NOTE(review): add/2 and append/2 store a 3-tuple cache, which can never
      % equal this 2-tuple -- confirm the intended comparison.
      Kept = case {e(1, Rec), e(2, Rec)} of
                 Cache -> Cache;
                 _ -> []
             end,
      case kvs:delete(Feed, id(Rec)) of
          ok ->
              Remaining = Count - 1,
              save(Writer#writer{count = Remaining, cache = Kept}),
              Remaining;
          _ ->
              Count
      end.
  % Stores M in RocksDB under key(Feed, M), serialised with term_to_binary/1,
  % using a synchronous write. Returns the result of rocksdb:put/4.
  raw_append(M, Feed) ->
      Db = ref(),
      Key = key(Feed, M),
      Value = term_to_binary(M),
      rocksdb:put(Db, Key, Value, [{sync,true}]).
  % Appends Rec to Feed and returns the record's id (its second element).
  % When kvs:get(Feed, Id) finds an entry, Rec is written with raw_append/2 and
  % the writer is saved with an updated cache and count + 1.
  % When it reports an error, the record goes through add/1 and the resulting
  % writer is saved.
  append(Rec, Feed) ->
      kvs:ensure(#writer{id = Feed}),
      Id = e(2, Rec),
      Writer = writer(Feed),
      case kvs:get(Feed, Id) of
          {ok, _} ->
              raw_append(Rec, Feed),
              save(Writer#writer{cache = {e(1, Rec), Id, fd(Feed)},
                                 count = Writer#writer.count + 1}),
              Id;
          {error, _} ->
              save(add(Writer#writer{args = Rec})),
              Id
      end.
  % Cuts the feed starting at the element with the given Id.
  % Seeks to key(Feed, Id); only an exact key match is accepted. On a match the
  % work is delegated to kvs_rocks:cut/8 and its result is returned as
  % {ok, Result}; otherwise {error, not_found} is returned.
  cut(Feed, Id) ->
      Key = key(Feed),
      A = key(Feed, Id),
      {ok, I} = rocksdb:iterator(ref(), []),
      case rocksdb:iterator_move(I, {seek, A}) of
          {ok, A, X} ->
              % NOTE(review): the iterator is handed to kvs_rocks:cut/8 here;
              % confirm that it (or its caller) releases it.
              {ok, kvs_rocks:cut(I, Key, size(Key), A, X, [], -1, 0)};
          _ ->
              % Nothing else receives the iterator on this path, so close it
              % here rather than leaving it open until garbage collection.
              rocksdb:iterator_close(I),
              {error, not_found}
      end.