kvs_stream.erl 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. -module(kvs_stream).
  2. -include("kvs.hrl").
  3. -include("user.hrl").
  4. -compile(export_all).
  5. -export([ new/1, top/1, bot/1, take/2, load/1, save/1, seek/2, next/1, prev/1, add/2 ]).
  6. % PUBLIC
  7. new(T) -> #cur{feed=kvs:next_id(cur,1),tab=T}.
  8. top(#cur{top=[]}=C) -> C#cur{val=[]};
  9. top(#cur{top=T}=C) -> seek(T,C).
  10. bot(#cur{bot=[]}=C) -> C#cur{val=[]};
  11. bot(#cur{bot=B}=C) -> seek(B,C).
  12. save(#cur{}=C) -> kvs:put(C), C.
  13. load(#cur{feed=K}) -> kvs:get(cur,K).
  14. next(#cur{tab=T,val=[]}=C) -> {error,[]};
  15. next(#cur{tab=T,val=B}=C) -> lookup(kvs:get(T,en(B)),C).
  16. prev(#cur{tab=T,val=[]}=C) -> {error,[]};
  17. prev(#cur{tab=T,val=B}=C) -> lookup(kvs:get(T,ep(B)),C).
  18. take(N,#cur{dir=D}=C) -> take(D,N,C,[]).
  19. seek(Id, #cur{tab=T}=C) -> {ok,R}=kvs:get(T,Id), C#cur{val=R}.
  20. add(M,#cur{dir=D}=C) -> add(dir(D),M,C).
  21. remove(Id, #cur{tab=M}=C) -> {ok,R}=kvs:get(M,Id), kvs:delete(M,Id),
  22. join([fix(M,X)||X<-[ep(R),en(R)]],C).
  23. % PRIVATE
  24. add(top,M,#cur{top=T,val=[]}=C) -> Id=id(M), N=sp(sn(M,T),[]), kvs:put(N), C#cur{val=N,bot=Id,top=Id};
  25. add(bot,M,#cur{bot=B,val=[]}=C) -> Id=id(M), N=sn(sp(M,B),[]), kvs:put(N), C#cur{val=N,bot=Id,top=Id};
  26. add(top,M,#cur{top=T, val=V}=C) when element(2,V) /=T -> add(top, M, top(C));
  27. add(bot,M,#cur{bot=B, val=V}=C) when element(2,V) /=B -> add(bot, M, bot(C));
  28. add(top,M,#cur{top=T, val=V}=C) -> Id=id(M), N=sp(sn(M,T),[]), kvs:put([N,sp(V,Id)]), C#cur{val=N,top=Id};
  29. add(bot,M,#cur{bot=B, val=V}=C) -> Id=id(M), N=sn(sp(M,B),[]), kvs:put([N,sn(V,Id)]), C#cur{val=N,bot=Id}.
  30. join([[],[]],C) -> C#cur{top=[],bot=[],val=[]};
  31. join([[], R],C) -> N=sp(R,[]), kvs:put(N), C#cur{top=id(N),val=N};
  32. join([L, []],C) -> N=sn(L,[]), kvs:put(N), C#cur{bot=id(N),val=N};
  33. join([L, R],C) -> N=sp(R,id(L)), kvs:put([N,sn(L,id(R))]), C#cur{val=N}.
  34. sn(M,T) -> setelement(#iterator.next, M, T).
  35. sp(M,T) -> setelement(#iterator.prev, M, T).
  36. el(X,T) -> element(X,T).
  37. tab(T) -> element(1,T).
  38. id(T) -> element(2,T).
  39. en(T) -> element(#iterator.next, T).
  40. ep(T) -> element(#iterator.prev, T).
  41. dir(next) -> top;
  42. dir(prev) -> bot.
  43. down(C) -> C#cur{dir=next}.
  44. up(C) -> C#cur{dir=prev}.
  45. fix(M,[]) -> [];
  46. fix(M,X) -> fix(kvs:get(M,X)).
  47. fix({ok,O}) -> O;
  48. fix(_) -> [].
  49. lookup({ok,R},C) -> C#cur{val=R};
  50. lookup(X,C) -> X.
  51. take(_,_,{error,_},R) -> lists:flatten(R);
  52. take(_,0,_,R) -> lists:flatten(R);
  53. take(A,N,#cur{val=B}=C,R) -> take(A,N-1,?MODULE:A(C),[B|R]).
  54. % TESTS
  55. check() -> test(), test2(), test3(), ok.
  56. test2() ->
  57. Cur = new(user),
  58. [A,B,C,D] = [ kvs:next_id(user,1) || _ <- lists:seq(1,4) ],
  59. R = save(add(#user{id=A},
  60. down(add(#user{id=B},
  61. up(add(#user{id=C},
  62. down(add(#user{id=D},
  63. up(Cur))))))))),
  64. X = remove(A,remove(B,remove(C,remove(D,R)))),
  65. [] = take(-1,down(top(X))).
  66. test3() ->
  67. Cur = new(user),
  68. [A,B,C,D] = [ kvs:next_id(user,1) || _ <- lists:seq(1,4) ],
  69. S = save(add(#user{id=A},
  70. down(add(#user{id=B},
  71. up(add(#user{id=C},
  72. down(add(#user{id=D},
  73. up(Cur))))))))),
  74. Y = remove(B,remove(D,remove(A,remove(C,S)))),
  75. [] = take(-1,down(top(Y))).
  76. test() ->
  77. Cur = new(user),
  78. take(-1,down(top(Cur))),
  79. [A,B,C,D] = [ kvs:next_id(user,1) || _ <- lists:seq(1,4) ],
  80. R = save(add(top,#user{id=A},
  81. add(bot,#user{id=B},
  82. add(top,#user{id=C},
  83. add(bot,#user{id=D}, Cur ))))),
  84. X = take(-1,down(top(R))),
  85. Y = take(-1,up(bot(R))),
  86. X = lists:reverse(Y),
  87. L = length(X).