123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- -module(kvs_stream).
- -description('KVS STREAM').
- -copyrihgt('Synrc Research Center').
- -author('Maxim Sokhatsky').
- -license('ISC').
- -include("kvs.hrl").
- -export([ new/0, top/1, bot/1, take/1, drop/1, load/1, save/1, down/1, up/1, cons/1, snoc/1,
- check/0, seek/1, rewind/1, next/1, prev/1, add/1, remove/1 ]).
- % n2o stream protocol
- info(#cur{id=I,status=load}=C,R,S) -> {reply, {bert, kvs_stream:load(I)}, R, S};
- info(#cur{status=Method} =C,R,S) -> {reply, {bert, kvs_stream:Method(C)}, R, S};
- info( C,R,S) -> {reply, {unknown,C}, R, S}.
- % section: kvs_stream prelude
- se(X,Y,Z) -> setelement(X,Y,Z).
- set(X,Y,Z) -> setelement(X,Z,Y).
- e(X,Y) -> element(X,Y).
- cv(R,V) -> se(#cur.writer,R, V).
- cb(R,V) -> se(#cur.bot, R, V).
- ct(R,V) -> se(#cur.top, R, V).
- cl(R,V) -> se(#cur.left, R, V).
- cr(R,V) -> se(#cur.right, R, V).
- cd(R,V) -> se(#cur.dir, R, V).
- sn(M,T) -> se(#iter.next, M, T).
- sp(M,T) -> se(#iter.prev, M, T).
- si(M,T) -> se(#iter.id, M, T).
- el(X,T) -> e(X, T).
- tab(T) -> e(1, T).
- et(T) -> e(#cur.top, T).
- eb(T) -> e(#cur.bot, T).
- id(T) -> e(#iter.id, T).
- en(T) -> e(#iter.next, T).
- ep(T) -> e(#iter.prev, T).
- dir(0) -> top;
- dir(1) -> bot.
- acc(0) -> prev;
- acc(1) -> next.
- % section: next, prev
- next (#cur{reader=[]}=C) -> {error,[]};
- next (#cur{reader=B} =C) -> pos(kvs:get(tab(B),en(B)),C,right(C)).
- prev (#cur{reader=[]}=C) -> {error,[]};
- prev (#cur{reader=B} =C) -> pos(kvs:get(tab(B),ep(B)),C,left(C)).
- left (#cur{left=0,right=0,dir=D}) -> swap(D,{0, 0});
- left (#cur{left=0,right=R,dir=D}) -> swap(D,{0, R});
- left (#cur{left=L,right=R,dir=D}) -> swap(D,{L-1,R+1}).
- right(#cur{left=0,right=0,dir=D}) -> swap(D,{0, 0});
- right(#cur{left=L,right=0,dir=D}) -> swap(D,{L, 0});
- right(#cur{left=L,right=R,dir=D}) -> swap(D,{L+1,R-1}).
- swap(1,{L,R}) -> {R,L};
- swap(0,{L,R}) -> {L,R}.
- pos({ok,R},C,{X,Y}) -> C#cur{reader=R,left=X,right=Y};
- pos({error,X},C,_) -> {error,X}.
- % section: take, drop
- drop(#cur{dir=D,reader=P,args=N}=C) -> drop(acc(D),N,C,C).
- take(#cur{dir=D,reader=P,args=N}=C) -> take(acc(D),N,C,C,[]).
- take(_,_,{error,C},C2,R) -> C2#cur{args=lists:flatten(R)};
- take(_,0,C,C2,R) -> C2#cur{args=lists:flatten(R)};
- take(A,N,#cur{reader=B}=C,C2,R) -> take(A,N-1,?MODULE:A(C),C2,[B|R]).
- drop(_,_,{error,C},C2) -> C2;
- drop(_,0,C,C2) -> C2;
- drop(A,N,#cur{reader=B}=C,C2) -> drop(A,N-1,?MODULE:A(C),C2).
- % rewind (moves writer)
- rewind(#cur{writer=[]}=C) -> {error,[]};
- rewind(#cur{dir=D,top=T,bot=B,writer=V}=C) ->
- C#cur{writer=id(kvs:get(tab(V),select(D,T,B)))}.
- select(0,T,B) -> T;
- select(1,T,B) -> B;
- select(P,P,X) -> X;
- select(P,N,X) -> N.
- % seek (moves reader)
- seek(#cur{writer=[]}=C) -> C;
- seek(#cur{bot=X,reader=P,dir=0}=C) when element(2,P) == X -> C;
- seek(#cur{top=X,reader=P,dir=1}=C) when element(2,P) == X -> C;
- seek(#cur{top=T,bot=B,left=L,right=R,dir=0,reader=P}=C) ->
- C#cur{reader=id(kvs:get(tab(P),B)),left=0,right=L+R};
- seek(#cur{top=T,bot=B,left=L,right=R,dir=1,reader=P}=C) ->
- C#cur{reader=id(kvs:get(tab(P),T)),left=L+R,right=0}.
- % new, save, load, up, down, top, bot
- new () -> #cur{id=kvs:next_id(cur,1)}.
- save (C) -> NC = C#cur{args=[]}, kvs:put(NC), NC.
- load (K) -> case kvs:get(cur,K) of {ok,C} -> C; E -> E end.
- up (C) -> C#cur{dir=0}.
- down (C) -> C#cur{dir=1}.
- top (C) -> seek(down(C)).
- bot (C) -> seek(up(C)).
- % add
- add(#cur{dir=D,args=M}=C) when element(2,M) == [] ->
- add(dir(D),si(M,kvs:next_id(tab(M),1)),C);
- add(#cur{dir=D,args=M}=C) ->
- add(dir(D),M,C).
- inc(#cur{left=L,right=R,dir=D}) -> swap(D,{L+1,R}).
- cons(#cur{args=M}=C) -> add(top,M,C).
- snoc(#cur{args=M}=C) -> add(bot,M,C).
- add(bot,M,#cur{bot=T,writer=[]}=C) ->
- Id=id(M), N=sn(sp(M,T),[]), kvs:put(N),
- C#cur{writer=N,reader=N,bot=Id,top=Id};
- add(top,M,#cur{top=B,writer=[]}=C) ->
- Id=id(M), N=sp(sn(M,B),[]), kvs:put(N),
- C#cur{writer=N,reader=N,top=Id,bot=Id};
- add(top,M,#cur{top=T, writer=V}=C) when element(2,V) /= T ->
- add(top, M, rewind(C));
- add(bot,M,#cur{bot=B, writer=V}=C) when element(2,V) /= B ->
- add(bot, M, rewind(C));
- add(bot,M,#cur{bot=T,writer=V,reader=P}=C) ->
- Id=id(M), H=sn(sp(M,T),[]), N=sn(V,Id), kvs:put([H,N]),
- {L,R} = inc(C), C#cur{reader=select(V,P,N),writer=H,bot=Id,left=L,right=R};
- add(top,M,#cur{top=B,writer=V,reader=P}=C) ->
- Id=id(M), H=sp(sn(M,B),[]), N=sp(V,Id), kvs:put([H,N]),
- {L,R} = inc(C), C#cur{reader=select(V,P,N),writer=H,top=Id,left=L,right=R}.
- % remove
- remove(#cur{writer=[]}=C) -> {error,val};
- remove(#cur{writer=B,reader=X,args=I}=C) ->
- {ok,R}=kvs:get(tab(B),I), kvs:delete(tab(B),I),
- join(I,[fix(tab(B),X)||X<-[ep(R),en(R)]],C).
- fix(M,[]) -> [];
- fix(M,X) -> fix(kvs:get(M,X)).
- fix({ok,O}) -> O;
- fix(_) -> [].
- dec(#cur{left=0,right=0,dir=D}) -> swap(D,{0, 0});
- dec(#cur{left=L,right=0,dir=D}) -> swap(D,{L-1,0});
- dec(#cur{left=0,right=R,dir=D}) -> swap(D,{0,R-1});
- dec(#cur{left=L,right=R,dir=D}) -> swap(D,{L-1,R}).
- m(I,_,I,_,I,L,R,P,V) -> {R,R};
- m(_,I,I,_,I,L,R,P,V) -> {R,L};
- m(I,_,_,I,I,L,R,P,V) -> {L,R};
- m(_,I,_,I,I,L,R,P,V) -> {L,L};
- m(I,_,_,_,I,L,R,P,V) -> {sn(V,id(R)),P};
- m(_,I,_,_,I,L,R,P,V) -> {sp(V,id(R)),P};
- m(_,_,I,_,I,L,R,P,V) -> {V,sn(P,id(L))};
- m(_,_,_,I,I,L,R,P,V) -> {V,sp(P,id(L))};
- m(_,_,_,_,I,L,R,P,V) -> {V,P}.
- join(I,[[],[]],C) ->
- {X,Y} = dec(C),
- C#cur{top=[],bot=[],writer=[],reader=[],left=X,right=Y};
- join(I,[[], R],#cur{reader=P,writer=V}=Cur) ->
- N=sp(R,[]), kvs:put(N), {X,Y} = dec(Cur),
- {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,[],N,P,V),
- Cur#cur{top=id(N), writer=NV, reader=NP, left=X, right=Y};
- join(I,[L, []],#cur{reader=P,writer=V}=Cur) ->
- N=sn(L,[]), kvs:put(N), {X,Y} = dec(Cur),
- {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,N,[],P,V),
- Cur#cur{bot=id(N), writer=NV, left=X, reader=NP, right=Y};
- join(I,[L, R],#cur{reader=P,writer=V}=Cur) ->
- N=sp(R,id(L)), M=sn(L,id(R)), kvs:put([N,M]), {X,Y} = dec(Cur),
- {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,N,M,P,V),
- Cur#cur{left=X, reader=NP, writer=NV, right=Y}.
- % TESTS
- check() ->
- te_remove(),
- test1(),
- test2(),
- drop(),
- create_destroy(),
- next_prev_duality(),
- test_sides(),
- rewind(),
- ok.
- rewind() ->
- Empty = {'user2',[],[],[],[],[],[],[],[]},
- C = #cur{top=T,bot=B,left=L,right=R,writer=V,reader=P} =
- save(add(set(#cur.args,Empty,down(
- add(set(#cur.args,Empty,up(
- add(set(#cur.args,Empty,down(
- add(set(#cur.args,Empty,down(
- add(set(#cur.args,Empty,up(new())))))))))))))))),
- PId = id(P),
- VId = id(V),
- B = VId,
- PId = B - 4,
- ok.
- test_sides() ->
- Empty = {'user2',[],[],[],[],[],[],[],[]},
- #cur{top=T,bot=B,left=L,right=R,writer=V,reader=P} =
- save(
- add(set(#cur.args,Empty,up(
- add(set(#cur.args,Empty,down(
- add(set(#cur.args,Empty,new()))))))))),
- PId = id(P),
- VId = id(V),
- VId = T,
- PId = B - 1,
- 1 = T - B,
- L = R = 1.
- next_prev_duality() ->
- Cur = new(),
- [A,B,C] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,3) ],
- R = save(
- add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
- Cur))))))),
- X = load(id(Cur)),
- X = next(
- next(
- prev(
- prev(X)))).
- test2() ->
- Cur = new(),
- [A,B,C,D] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,4) ],
- #cur{args=[]} = take(
- up(
- bot(
- remove(set(#cur.args,A,
- remove(set(#cur.args,B,
- remove(set(#cur.args,C,
- remove(set(#cur.args,D,
- add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
- up(Cur#cur{args=-1})))))))))))))))))))).
- create_destroy() ->
- Cur = new(),
- [A,B,C,D] = [ kvs:next_id('user2',1)
- || _ <- lists:seq(1,4) ],
- #cur{args=[]} = take(
- remove(set(#cur.args,B,
- remove(set(#cur.args,D,
- remove(set(#cur.args,A,
- remove(set(#cur.args,C,
- add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
- up(new())))))))))))))))))).
- test1() ->
- [A,B,C,D] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,4) ],
- R = save(
- add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
- add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
- new() ))))))))),
- #cur{args=X} = take(top(R#cur{args=-1})),
- #cur{args=Y} = take(bot(R#cur{args=-1})),
- X = lists:reverse(Y),
- L = length(X).
- drop() ->
- #cur{id=S}=save(new()),
- P = {'user2',[],[],[],[],[],[],[],[]},
- S1 = save(
- add(set(#cur.args,P,
- add(set(#cur.args,P,
- add(set(#cur.args,P,
- add(set(#cur.args,P,
- load(S)))))))))),
- S2= drop(S1#cur{args=2}),
- 4 = length(e(#cur.args,take(S2#cur{args=-1}))),
- 4 = length(e(#cur.args,take(S1#cur{args=-1}))),
- ok.
- te_remove() ->
- #cur{id=S}=save(new()),
- P = {'user2',[],[],[],[],[],[],[],[]},
- S1 = save(
- add(set(#cur.args,P,
- add(set(#cur.args,P,
- add(set(#cur.args,P,
- add(set(#cur.args,P,
- load(S)))))))))),
- Res = e(#cur.args,take(S1#cur{args=-1})),
- 4 = length(Res),
- S2 = save(top(S1)),
- S3 = save(remove(S2#cur{args=S2#cur.top-1})),
- #cur{args=List} = take(top(S3#cur{args=-1})),
- #cur{args=Rev} = take(bot(S3#cur{args=-1})),
- List = lists:reverse(Rev),
- 3 = length(List),
- {S3,List}.
|