kvs_st.erl 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. -module(kvs_st).
  2. -description('KVS STREAM NATIVE ROCKS').
  3. -include("kvs.hrl").
  4. -include("stream.hrl").
  5. -include("metainfo.hrl").
  6. -export(?STREAM).
  7. -import(kvs_rocks, [key/2, key/1, bt/1, ref/0]).
  8. % section: kvs_stream prelude
  9. se(X,Y,Z) -> setelement(X,Y,Z).
  10. e(X,Y) -> element(X,Y).
  11. c4(R,V) -> se(#reader.args, R, V).
  12. si(M,T) -> se(#it.id, M, T).
  13. id(T) -> e(#it.id, T).
  14. % section: next, prev
  15. feed(Feed) -> take((reader(Feed))#reader{args=-1}).
  16. top(#reader{}=C) -> C#reader{dir=1}.
  17. bot(#reader{}=C) -> C#reader{dir=0}.
  18. % handle -> seek -> move
  19. move_it(Key,Dir) ->
  20. Seek = fun(F,{ok,H}) -> {F(H,{seek,Key}),H};
  21. (F,{{ok,_,_},H}) -> F(H,Dir);
  22. (F,{{ok,_}, H}) -> F(H,Dir);
  23. (_,{error,Error}) -> {error,Error};
  24. (_,{{error,Error},_}) -> {error,Error};
  25. (F,{R,O}) -> F(R,O) end,
  26. case lists:foldl(Seek, {ref(),[]},
  27. [fun rocksdb:iterator/2, fun rocksdb:iterator_move/2, fun rocksdb:iterator_move/2]) of
  28. {ok,_,Bin} -> {ok,bt(Bin)};
  29. {error, Error} -> {error,Error}
  30. end.
  31. % iterator -> specific feed reader
  32. read_it(C, Feed, Move) ->
  33. case Move of
  34. {ok, Bin} when element(1,Bin) =:= Feed -> C#reader{cache=Bin};
  35. {ok,_} -> C;
  36. {error, Error} -> {error, Error}
  37. end.
  38. next(#reader{cache=[]}) -> {error,empty};
  39. next(#reader{feed=Feed,cache=I}=C) when is_tuple(I) -> read_it(C,Feed,move_it(key(Feed,I),next)).
  40. prev(#reader{cache=[]}) -> {error,empty};
  41. prev(#reader{cache=I,feed=Feed}=C) when is_tuple(I) -> read_it(C,Feed,move_it(key(Feed,I),prev)).
  42. % section: take, drop
  43. drop(#reader{args=N}) when N < 0 -> #reader{};
  44. drop(#reader{args=N}=C) when N == 0 -> C;
  45. drop(#reader{args=N,feed=Feed,cache=I}=C) when N > 0 ->
  46. Key = key(Feed),
  47. {ok, H} = rocksdb:iterator(ref(), []),
  48. First = rocksdb:iterator_move(H, {seek,Key}),
  49. Term = lists:foldl(
  50. fun (_,{{ok,K,_},{_,X}}) when N > X -> {K,{<<131,106>>,N}};
  51. (_,{{ok,K,Bin},{A,X}}) when N =< X->
  52. case binary:part(K,0,size(Key)) of
  53. Key -> {rocksdb:iterator_move(H,next),{Bin,X+1}};
  54. _ -> {{error,range},{A,X}} end;
  55. (_,{_,{_,_}}) -> {[],{<<131,106>>,N}}
  56. end,
  57. {First,{<<131,106>>,1}},
  58. lists:seq(0,N)),
  59. C#reader{cache=bt(element(1,element(2,Term)))}.
  60. % 1. Курсор всегда выставлен на следущий невычитанный элемент
  61. % 2. Если после вычитки курсор указывает на недавно вычитаный элемент -- это признак конца списка
  62. % 3. Если результат вычитки меньше требуемого значения -- это признак конца списка
  63. % 4. Если курсор установлен в конец списка и уже вернул его последний элемент
  64. % то результат вычитки будет равным пустому списку
  65. take(#reader{pos='end',dir=0}=C) -> C#reader{args=[]}; % 4
  66. take(#reader{args=N,feed=Feed,cache={T,O},dir=0}=C) -> % 1
  67. Key = key(Feed),
  68. {ok,I} = rocksdb:iterator(ref(), []),
  69. {ok,K,BERT} = rocksdb:iterator_move(I, {seek,key(Feed,{T,O})}),
  70. {KK,Res} = kvs_rocks:next2(I,Key,size(Key),K,BERT,[],case N of -1 -> -1; J -> J + 1 end,0),
  71. Last = last(KK,O,'end'),
  72. case {Res,length(Res)} of
  73. {[],_} -> C#reader{args=[],cache=[]};
  74. {[H], _A} when element(2,KK) == O -> C#reader{args=Res,pos=Last,cache={e(1,H),e(2,H)}}; % 2
  75. {[H|_X],A} when A < N + 1 orelse N == -1 -> C#reader{args=Res,cache={e(1,H),e(2,H)},pos=Last};
  76. {[H| X],A} when A == N -> C#reader{args=[bt(BERT)|X],cache={e(1,H),e(2,H)},pos=Last};
  77. {[H|_X],A} when A =< N andalso Last == 'end'-> C#reader{args=Res,cache={e(1,H),e(2,H)},pos=Last};
  78. {[H| X],_} -> C#reader{args=X,cache={e(1,H),e(2,H)}} end;
  79. take(#reader{pos=0,dir=0}=C) -> C#reader{pos='begin',args=[]};
  80. take(#reader{pos='begin',dir=1}=C) -> C#reader{args=[]}; % 4
  81. % TODO: try to remove lists:reverse and abstract both branches
  82. take(#reader{args=N,feed=Feed,cache={T,O},dir=1}=C) -> % 1
  83. Key = key(Feed),
  84. {ok,I} = rocksdb:iterator(ref(), []),
  85. {ok,K,BERT} = rocksdb:iterator_move(I, {seek,key(Feed,{T,O})}),
  86. {KK,Res} = kvs_rocks:prev2(I,Key,size(Key),K,BERT,[],case N of -1 -> -1; J -> J + 1 end,0),
  87. Last = last(KK,O,'begin'),
  88. case {lists:reverse(Res),length(Res)} of
  89. {[],_} -> C#reader{args=[],cache=[]};
  90. {[H],_} when element(2,KK) == O -> C#reader{args=Res,pos=Last,cache={e(1,H),e(2,H)}}; % 2
  91. {[_|_],A} when A < N - 1 orelse N == -1 -> [HX|_] = Res, C#reader{args=Res,cache={e(1,HX),e(2,HX)},pos=Last};
  92. {[_|X],A} when A == N -> [HX|_] = Res, C#reader{args=[bt(BERT)|X],cache={e(1,HX),e(2,HX)},pos=Last};
  93. {[_|_],A} when A =< N andalso Last == 'begin'-> [HX|_] = Res, C#reader{args=lists:reverse(Res),cache={e(1,HX),e(2,HX)},pos=Last};
  94. {[_|_],_} -> [HX|TL] = Res, C#reader{args=lists:reverse(TL),cache={e(1,HX),e(2,HX)}} end.
  95. last(KK,O,Atom) ->
  96. Last = case KK of
  97. [] -> Atom;
  98. _ when element(2,KK) == O -> Atom;
  99. _ -> 0
  100. end,
  101. Last.
  102. % new, save, load, up, down, top, bot
  103. load_reader(Id) ->
  104. case kvs:get(reader,Id) of
  105. {ok,#reader{}=C} -> C;
  106. _ -> #reader{id=[]} end.
  107. writer(Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
  108. reader(Id) ->
  109. case kvs:get(writer,Id) of
  110. {ok,#writer{id=Feed}} ->
  111. Key = key(Feed),
  112. {ok,I} = rocksdb:iterator(ref(), []),
  113. {ok,_,BERT} = rocksdb:iterator_move(I, {seek,Key}),
  114. F = bt(BERT),
  115. #reader{id=kvs:seq([],[]),feed=Id,cache={e(1,F),e(2,F)}};
  116. {error,_} -> #reader{} end.
  117. save(C) -> NC = c4(C,[]), kvs:put(NC), NC.
  118. % add
  119. add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
  120. add(#writer{args=M}=C) -> add(M,C).
  121. add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache=M,count=NS}.
  122. remove(Rec,Feed) ->
  123. kvs:ensure(#writer{id=Feed}),
  124. W = #writer{count=C} = kvs:writer(Feed),
  125. {ok,I} = rocksdb:iterator(ref(), []),
  126. case kvs:delete(Feed,id(Rec)) of
  127. ok -> Count = C - 1,
  128. kvs:save(W#writer{count = Count, cache = I}),
  129. Count;
  130. _ -> C end.
  131. raw_append(M,Feed) ->
  132. rocksdb:put(ref(), key(Feed,M), term_to_binary(M), [{sync,true}]).
  133. append(Rec,Feed) ->
  134. kvs:ensure(#writer{id=Feed}),
  135. Id = e(2,Rec),
  136. W = kvs:writer(Feed),
  137. case kvs:get(Feed,Id) of
  138. {ok,_} -> raw_append(Rec,Feed), kvs:save(W#writer{cache=Rec,count=W#writer.count + 1}), Id;
  139. {error,_} -> kvs:save(kvs:add(W#writer{args=Rec,cache=Rec})), Id end.
  140. cut(Feed,Id) ->
  141. Key = key(Feed),
  142. A = key(Feed,Id),
  143. {ok,I} = rocksdb:iterator(ref(), []),
  144. case rocksdb:iterator_move(I, {seek,A}) of
  145. {ok,A,X} -> {ok,kvs_rocks:cut(I,Key,size(Key),A,X,[],-1,0)};
  146. _ -> {error,not_found} end.