kvs_stream.erl 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. -module(kvs_stream).
  2. -description('KVS STREAM').
  3. -copyrihgt('Synrc Research Center').
  4. -author('Maxim Sokhatsky').
  5. -license('ISC').
  6. -include_lib("kvs/include/kvs.hrl").
  7. -compile(export_all).
  8. -include_lib("stdlib/include/assert.hrl").
  9. % section: kvs_stream prelude
  10. se(X,Y,Z) -> setelement(X,Y,Z).
  11. set(X,Y,Z) -> setelement(X,Z,Y).
  12. e(X,Y) -> element(X,Y).
  13. c0(R,V) -> se(1, R, V).
  14. c1(R,V) -> se(#reader.id, R, V).
  15. c2(R,V) -> se(#reader.pos, R, V).
  16. c3(R,V) -> se(#reader.cache, R, V).
  17. c4(R,V) -> se(#reader.args, R, V).
  18. c5(R,V) -> se(#reader.feed, R, V).
  19. c6(R,V) -> se(#reader.dir, R, V).
  20. wf(R,V) -> se(#writer.first, R, V).
  21. sn(M,T) -> se(#iter.next, M, T).
  22. sp(M,T) -> se(#iter.prev, M, T).
  23. si(M,T) -> se(#iter.id, M, T).
  24. sf(M,T) -> se(#iter.feed, M, T).
  25. el(X,T) -> e(X, T).
  26. tab(T) -> e(1, T).
  27. id(T) -> e(#iter.id, T).
  28. en(T) -> e(#iter.next, T).
  29. ep(T) -> e(#iter.prev, T).
  30. pos(T) -> e(#reader.pos, T).
  31. args(T) -> e(#writer.args, T).
  32. dir(0) -> top;
  33. dir(1) -> bot.
  34. acc(0) -> next;
  35. acc(1) -> prev.
  36. % section: next, prev
  37. top(#reader{feed=F}=C) -> w(kvs:get(writer,F),top,C).
  38. bot(#reader{feed=F}=C) -> w(kvs:get(writer,F),bot,C).
  39. next(#reader{cache=[]}) -> {error,empty};
  40. next(#reader{cache={T,R},pos=P}=C) -> n(kvs:get(T,R),C,P+1).
  41. prev(#reader{cache=[]}) -> {error,empty};
  42. prev(#reader{cache={T,R},pos=P}=C) -> p(kvs:get(T,R),C,P-1).
  43. n({ok,R},C,P) -> r(kvs:get(tab(R),en(R)),C,P);
  44. n({error,X},_,_) -> {error,X}.
  45. p({ok,R},C,P) -> r(kvs:get(tab(R),ep(R)),C,P);
  46. p({error,X},_,_) -> {error,X}.
  47. r({ok,R},C,P) -> C#reader{cache={tab(R),id(R)},pos=P};
  48. r({error,X},_,_) -> {error,X}.
  49. w({ok,#writer{first=B}},bot,C) -> C#reader{cache={tab(B),id(B)},pos=1};
  50. w({ok,#writer{cache=B,count=Size}},top,C) -> C#reader{cache={tab(B),id(B)},pos=Size};
  51. w({error,X},_,_) -> {error,X}.
  52. % section: take, drop
  53. drop(#reader{dir=D,cache=B,args=N,pos=P}=C) -> drop(acc(D),N,C,C,P,B).
  54. take(#reader{dir=D,cache=B,args=N,pos=P}=C) -> take(acc(D),N,C,C,[],P,B).
  55. take(_,_,{error,_},C2,R,P,B) -> C2#reader{args=lists:flatten(R),pos=P,cache=B};
  56. take(_,0,_,C2,R,P,B) -> C2#reader{args=lists:flatten(R),pos=P,cache=B};
  57. take(A,N,#reader{cache={T,I},pos=P}=C,C2,R,_,_) ->
  58. take(A,N-1,?MODULE:A(C),C2,[element(2,kvs:get(T,I))|R],P,{T,I}).
  59. drop(_,_,{error,_},C2,P,B) -> C2#reader{pos=P,cache=B};
  60. drop(_,0,_,C2,P,B) -> C2#reader{pos=P,cache=B};
  61. drop(A,N,#reader{cache=B,pos=P}=C,C2,_,_) ->
  62. drop(A,N-1,?MODULE:A(C),C2,P,B).
  63. % new, save, load, up, down, top, bot
  64. load_writer (Id) -> case kvs:get(writer,Id) of {ok,C} -> C; E -> E end.
  65. load_reader (Id) -> case kvs:get(reader,Id) of {ok,C} -> C; E -> E end.
  66. writer (Id) -> #writer{id=Id}.
  67. reader (Id) ->
  68. case kvs:get(writer,Id) of
  69. {ok,#writer{first=[]}} -> #reader{id=kvs:next_id(reader,1),feed=Id,cache=[]};
  70. {ok,#writer{first=F}} -> #reader{id=kvs:next_id(reader,1),feed=Id,cache={tab(F),id(F)}};
  71. {error,X} -> {error,X} end.
  72. save (C) -> NC = c4(C,[]), kvs:put(NC), NC.
  73. up (C) -> C#reader{dir=0}.
  74. down (C) -> C#reader{dir=1}.
  75. % add
  76. add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:next_id(tab(M),1)),C);
  77. add(#writer{args=M}=C) -> add(M,C).
  78. add(M,#writer{cache=[]}=C) ->
  79. _Id=id(M), N=sp(sn(M,[]),[]), kvs:put(N),
  80. C#writer{cache=N,count=1,first=N};
  81. add(M,#writer{cache=V,count=S}=C) ->
  82. N=sp(sn(M,[]),id(V)), P=sn(V,id(M)), kvs:put([N,P]),
  83. C#writer{cache=N,count=S+1}.
  84. % tests
  85. check() -> test1().
  86. test1() ->
  87. Id = {p2p,1,2},
  88. X = 5,
  89. _W = kvs_stream:save(kvs_stream:writer(Id)),
  90. #reader{id=R1} = kvs_stream:save(kvs_stream:reader(Id)),
  91. #reader{id=R2} = kvs_stream:save(kvs_stream:reader(Id)),
  92. [ kvs_stream:save(
  93. kvs_stream:add((
  94. kvs_stream:load_writer(Id))
  95. #writer{args={user2,[],[],[],[],[],[],[],[],[]}})) || _ <- lists:seq(1,X) ],
  96. Bot = kvs_stream:bot(kvs_stream:load_reader(R1)),
  97. Top = kvs_stream:top(kvs_stream:load_reader(R2)),
  98. #reader{args=F} = kvs_stream:take(Bot#reader{args=20,dir=0}),
  99. #reader{args=B} = kvs_stream:take(Top#reader{args=20,dir=1}),
  100. ?assertMatch(X,length(F)),
  101. ?assertMatch(F,lists:reverse(B)).