kvs_stream.erl 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. -module(kvs_stream).
  2. -description('KVS STREAM').
  3. -copyrihgt('Synrc Research Center').
  4. -author('Maxim Sokhatsky').
  5. -license('ISC').
  6. -include("kvs.hrl").
  7. -export([ new/0, top/1, bot/1, take/1, drop/1, load/1, save/1, down/1, up/1, cons/1, snoc/1,
  8. check/0, seek/1, rewind/1, next/1, prev/1, add/1, remove/1 ]).
  9. % n2o stream protocol
  10. info(#cur{id=I,status=load}=C,R,S) -> {reply, {bert, kvs_stream:load(I)}, R, S};
  11. info(#cur{status=Method} =C,R,S) -> {reply, {bert, kvs_stream:Method(C)}, R, S};
  12. info( C,R,S) -> {reply, {unknown,C}, R, S}.
  13. % section: kvs_stream prelude
  14. se(X,Y,Z) -> setelement(X,Y,Z).
  15. set(X,Y,Z) -> setelement(X,Z,Y).
  16. e(X,Y) -> element(X,Y).
  17. cv(R,V) -> se(#cur.writer,R, V).
  18. cb(R,V) -> se(#cur.bot, R, V).
  19. ct(R,V) -> se(#cur.top, R, V).
  20. cl(R,V) -> se(#cur.left, R, V).
  21. cr(R,V) -> se(#cur.right, R, V).
  22. cd(R,V) -> se(#cur.dir, R, V).
  23. sn(M,T) -> se(#iter.next, M, T).
  24. sp(M,T) -> se(#iter.prev, M, T).
  25. si(M,T) -> se(#iter.id, M, T).
  26. el(X,T) -> e(X, T).
  27. tab(T) -> e(1, T).
  28. et(T) -> e(#cur.top, T).
  29. eb(T) -> e(#cur.bot, T).
  30. id(T) -> e(#iter.id, T).
  31. en(T) -> e(#iter.next, T).
  32. ep(T) -> e(#iter.prev, T).
  33. dir(0) -> top;
  34. dir(1) -> bot.
  35. acc(0) -> prev;
  36. acc(1) -> next.
  37. % section: next, prev
  38. next (#cur{reader=[]}=C) -> {error,[]};
  39. next (#cur{reader=B} =C) -> pos(kvs:get(tab(B),en(B)),C,right(C)).
  40. prev (#cur{reader=[]}=C) -> {error,[]};
  41. prev (#cur{reader=B} =C) -> pos(kvs:get(tab(B),ep(B)),C,left(C)).
  42. left (#cur{left=0,right=0,dir=D}) -> swap(D,{0, 0});
  43. left (#cur{left=0,right=R,dir=D}) -> swap(D,{0, R});
  44. left (#cur{left=L,right=R,dir=D}) -> swap(D,{L-1,R+1}).
  45. right(#cur{left=0,right=0,dir=D}) -> swap(D,{0, 0});
  46. right(#cur{left=L,right=0,dir=D}) -> swap(D,{L, 0});
  47. right(#cur{left=L,right=R,dir=D}) -> swap(D,{L+1,R-1}).
  48. swap(1,{L,R}) -> {R,L};
  49. swap(0,{L,R}) -> {L,R}.
  50. pos({ok,R},C,{X,Y}) -> C#cur{reader=R,left=X,right=Y};
  51. pos({error,X},C,_) -> {error,X}.
  52. % section: take, drop
  53. drop(#cur{dir=D,reader=P,args=N}=C) -> drop(acc(D),N,C,C).
  54. take(#cur{dir=D,reader=P,args=N}=C) -> take(acc(D),N,C,C,[]).
  55. take(_,_,{error,C},C2,R) -> C2#cur{args=lists:flatten(R)};
  56. take(_,0,C,C2,R) -> C2#cur{args=lists:flatten(R)};
  57. take(A,N,#cur{reader=B}=C,C2,R) -> take(A,N-1,?MODULE:A(C),C2,[B|R]).
  58. drop(_,_,{error,C},C2) -> C2;
  59. drop(_,0,C,C2) -> C2;
  60. drop(A,N,#cur{reader=B}=C,C2) -> drop(A,N-1,?MODULE:A(C),C2).
  61. % rewind (moves writer)
  62. rewind(#cur{writer=[]}=C) -> {error,[]};
  63. rewind(#cur{dir=D,top=T,bot=B,writer=V}=C) ->
  64. C#cur{writer=id(kvs:get(tab(V),select(D,T,B)))}.
  65. select(0,T,B) -> T;
  66. select(1,T,B) -> B;
  67. select(P,P,X) -> X;
  68. select(P,N,X) -> N.
  69. % seek (moves reader)
  70. seek(#cur{writer=[]}=C) -> C;
  71. seek(#cur{bot=X,reader=P,dir=0}=C) when element(2,P) == X -> C;
  72. seek(#cur{top=X,reader=P,dir=1}=C) when element(2,P) == X -> C;
  73. seek(#cur{top=T,bot=B,left=L,right=R,dir=0,reader=P}=C) ->
  74. C#cur{reader=id(kvs:get(tab(P),B)),left=0,right=L+R};
  75. seek(#cur{top=T,bot=B,left=L,right=R,dir=1,reader=P}=C) ->
  76. C#cur{reader=id(kvs:get(tab(P),T)),left=L+R,right=0}.
  77. % new, save, load, up, down, top, bot
  78. new () -> #cur{id=kvs:next_id(cur,1)}.
  79. save (C) -> NC = C#cur{args=[]}, kvs:put(NC), NC.
  80. load (K) -> case kvs:get(cur,K) of {ok,C} -> C; E -> E end.
  81. up (C) -> C#cur{dir=0}.
  82. down (C) -> C#cur{dir=1}.
  83. top (C) -> seek(down(C)).
  84. bot (C) -> seek(up(C)).
  85. % add
  86. add(#cur{dir=D,args=M}=C) when element(2,M) == [] ->
  87. add(dir(D),si(M,kvs:next_id(tab(M),1)),C);
  88. add(#cur{dir=D,args=M}=C) ->
  89. add(dir(D),M,C).
  90. inc(#cur{left=L,right=R,dir=D}) -> swap(D,{L+1,R}).
  91. cons(#cur{args=M}=C) -> add(top,M,C).
  92. snoc(#cur{args=M}=C) -> add(bot,M,C).
  93. add(bot,M,#cur{bot=T,writer=[]}=C) ->
  94. Id=id(M), N=sn(sp(M,T),[]), kvs:put(N),
  95. C#cur{writer=N,reader=N,bot=Id,top=Id};
  96. add(top,M,#cur{top=B,writer=[]}=C) ->
  97. Id=id(M), N=sp(sn(M,B),[]), kvs:put(N),
  98. C#cur{writer=N,reader=N,top=Id,bot=Id};
  99. add(top,M,#cur{top=T, writer=V}=C) when element(2,V) /= T ->
  100. add(top, M, rewind(C));
  101. add(bot,M,#cur{bot=B, writer=V}=C) when element(2,V) /= B ->
  102. add(bot, M, rewind(C));
  103. add(bot,M,#cur{bot=T,writer=V,reader=P}=C) ->
  104. Id=id(M), H=sn(sp(M,T),[]), N=sn(V,Id), kvs:put([H,N]),
  105. {L,R} = inc(C), C#cur{reader=select(V,P,N),writer=H,bot=Id,left=L,right=R};
  106. add(top,M,#cur{top=B,writer=V,reader=P}=C) ->
  107. Id=id(M), H=sp(sn(M,B),[]), N=sp(V,Id), kvs:put([H,N]),
  108. {L,R} = inc(C), C#cur{reader=select(V,P,N),writer=H,top=Id,left=L,right=R}.
  109. % remove
  110. remove(#cur{writer=[]}=C) -> {error,val};
  111. remove(#cur{writer=B,reader=X,args=I}=C) ->
  112. {ok,R}=kvs:get(tab(B),I), kvs:delete(tab(B),I),
  113. join(I,[fix(tab(B),X)||X<-[ep(R),en(R)]],C).
  114. fix(M,[]) -> [];
  115. fix(M,X) -> fix(kvs:get(M,X)).
  116. fix({ok,O}) -> O;
  117. fix(_) -> [].
  118. dec(#cur{left=0,right=0,dir=D}) -> swap(D,{0, 0});
  119. dec(#cur{left=L,right=0,dir=D}) -> swap(D,{L-1,0});
  120. dec(#cur{left=0,right=R,dir=D}) -> swap(D,{0,R-1});
  121. dec(#cur{left=L,right=R,dir=D}) -> swap(D,{L-1,R}).
  122. m(I,_,I,_,I,L,R,P,V) -> {R,R};
  123. m(_,I,I,_,I,L,R,P,V) -> {R,L};
  124. m(I,_,_,I,I,L,R,P,V) -> {L,R};
  125. m(_,I,_,I,I,L,R,P,V) -> {L,L};
  126. m(I,_,_,_,I,L,R,P,V) -> {sn(V,id(R)),P};
  127. m(_,I,_,_,I,L,R,P,V) -> {sp(V,id(R)),P};
  128. m(_,_,I,_,I,L,R,P,V) -> {V,sn(P,id(L))};
  129. m(_,_,_,I,I,L,R,P,V) -> {V,sp(P,id(L))};
  130. m(_,_,_,_,I,L,R,P,V) -> {V,P}.
  131. join(I,[[],[]],C) ->
  132. {X,Y} = dec(C),
  133. C#cur{top=[],bot=[],writer=[],reader=[],left=X,right=Y};
  134. join(I,[[], R],#cur{reader=P,writer=V}=Cur) ->
  135. N=sp(R,[]), kvs:put(N), {X,Y} = dec(Cur),
  136. {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,[],N,P,V),
  137. Cur#cur{top=id(N), writer=NV, reader=NP, left=X, right=Y};
  138. join(I,[L, []],#cur{reader=P,writer=V}=Cur) ->
  139. N=sn(L,[]), kvs:put(N), {X,Y} = dec(Cur),
  140. {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,N,[],P,V),
  141. Cur#cur{bot=id(N), writer=NV, left=X, reader=NP, right=Y};
  142. join(I,[L, R],#cur{reader=P,writer=V}=Cur) ->
  143. N=sp(R,id(L)), M=sn(L,id(R)), kvs:put([N,M]), {X,Y} = dec(Cur),
  144. {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,N,M,P,V),
  145. Cur#cur{left=X, reader=NP, writer=NV, right=Y}.
  146. % TESTS
  147. check() ->
  148. te_remove(),
  149. test1(),
  150. test2(),
  151. drop(),
  152. create_destroy(),
  153. next_prev_duality(),
  154. test_sides(),
  155. rewind(),
  156. ok.
  157. rewind() ->
  158. Empty = {'user2',[],[],[],[],[],[],[],[]},
  159. C = #cur{top=T,bot=B,left=L,right=R,writer=V,reader=P} =
  160. save(add(set(#cur.args,Empty,down(
  161. add(set(#cur.args,Empty,up(
  162. add(set(#cur.args,Empty,down(
  163. add(set(#cur.args,Empty,down(
  164. add(set(#cur.args,Empty,up(new())))))))))))))))),
  165. PId = id(P),
  166. VId = id(V),
  167. B = VId,
  168. PId = B - 4,
  169. ok.
  170. test_sides() ->
  171. Empty = {'user2',[],[],[],[],[],[],[],[]},
  172. #cur{top=T,bot=B,left=L,right=R,writer=V,reader=P} =
  173. save(
  174. add(set(#cur.args,Empty,up(
  175. add(set(#cur.args,Empty,down(
  176. add(set(#cur.args,Empty,new()))))))))),
  177. PId = id(P),
  178. VId = id(V),
  179. VId = T,
  180. PId = B - 1,
  181. 1 = T - B,
  182. L = R = 1.
  183. next_prev_duality() ->
  184. Cur = new(),
  185. [A,B,C] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,3) ],
  186. R = save(
  187. add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
  188. add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
  189. add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
  190. Cur))))))),
  191. X = load(id(Cur)),
  192. X = next(
  193. next(
  194. prev(
  195. prev(X)))).
  196. test2() ->
  197. Cur = new(),
  198. [A,B,C,D] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,4) ],
  199. #cur{args=[]} = take(
  200. up(
  201. bot(
  202. remove(set(#cur.args,A,
  203. remove(set(#cur.args,B,
  204. remove(set(#cur.args,C,
  205. remove(set(#cur.args,D,
  206. add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
  207. add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
  208. add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
  209. add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
  210. up(Cur#cur{args=-1})))))))))))))))))))).
  211. create_destroy() ->
  212. Cur = new(),
  213. [A,B,C,D] = [ kvs:next_id('user2',1)
  214. || _ <- lists:seq(1,4) ],
  215. #cur{args=[]} = take(
  216. remove(set(#cur.args,B,
  217. remove(set(#cur.args,D,
  218. remove(set(#cur.args,A,
  219. remove(set(#cur.args,C,
  220. add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
  221. add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
  222. add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
  223. add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
  224. up(new())))))))))))))))))).
  225. test1() ->
  226. [A,B,C,D] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,4) ],
  227. R = save(
  228. add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
  229. add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
  230. add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
  231. add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
  232. new() ))))))))),
  233. #cur{args=X} = take(top(R#cur{args=-1})),
  234. #cur{args=Y} = take(bot(R#cur{args=-1})),
  235. X = lists:reverse(Y),
  236. L = length(X).
  237. drop() ->
  238. #cur{id=S}=save(new()),
  239. P = {'user2',[],[],[],[],[],[],[],[]},
  240. S1 = save(
  241. add(set(#cur.args,P,
  242. add(set(#cur.args,P,
  243. add(set(#cur.args,P,
  244. add(set(#cur.args,P,
  245. load(S)))))))))),
  246. S2= drop(S1#cur{args=2}),
  247. 4 = length(e(#cur.args,take(S2#cur{args=-1}))),
  248. 4 = length(e(#cur.args,take(S1#cur{args=-1}))),
  249. ok.
  250. te_remove() ->
  251. #cur{id=S}=save(new()),
  252. P = {'user2',[],[],[],[],[],[],[],[]},
  253. S1 = save(
  254. add(set(#cur.args,P,
  255. add(set(#cur.args,P,
  256. add(set(#cur.args,P,
  257. add(set(#cur.args,P,
  258. load(S)))))))))),
  259. Res = e(#cur.args,take(S1#cur{args=-1})),
  260. 4 = length(Res),
  261. S2 = save(top(S1)),
  262. S3 = save(remove(S2#cur{args=S2#cur.top-1})),
  263. #cur{args=List} = take(top(S3#cur{args=-1})),
  264. #cur{args=Rev} = take(bot(S3#cur{args=-1})),
  265. List = lists:reverse(Rev),
  266. 3 = length(List),
  267. {S3,List}.