Browse Source

separate readers and writers

Namdak Tonpa 7 years ago
parent
commit
2f8c8f17f3
3 changed files with 418 additions and 287 deletions
  1. 17 5
      include/kvs.hrl
  2. 317 0
      src/kvs_double.erl
  3. 84 282
      src/kvs_stream.erl

+ 17 - 5
include/kvs.hrl

@@ -1,6 +1,19 @@
 -ifndef(KVS_HRL).
 -define(KVS_HRL, true).
 
+-record(writer,  {id    = [] :: term(), % {p2p,_,_} | {muc,_}
+                  count =  0 :: integer(),
+                  cache = [] :: [] | tuple(),
+                  args  = [] :: term(),
+                  first = [] :: [] | tuple()}).
+
+-record(reader,  {id    = [] :: term(), % phone_id
+                  pos   = [] :: [] | integer(),
+                  cache = [] :: [] | integer(),
+                  args  = [] :: term(),
+                  feed  = [] :: term(), % {p2p,_,_} | {muc,_} -- link to writer
+                  dir   =  0 :: 0 | 1}).
+
 -define(CUR,  id =  [] :: term(),
               top=  [] :: [] | integer(),
               bot=  [] :: [] | integer(),
@@ -9,11 +22,10 @@
               writer=  [] :: [] | tuple()).
 -record(cur,  {?CUR, left=0, right=0, args=[]::list(tuple()|integer()), money=0, status=[]}).
 
--define(ITER, id=   [] :: [] | integer(),
-              container = cur :: atom(),
-              feed_id = [] :: term(),
-              next= [] :: [] | integer(),
-              prev= [] :: [] | integer()).
+-define(ITER, id   = [] :: [] | integer(),
+              feed = [] :: term(),
+              next = [] :: [] | integer(),
+              prev = [] :: [] | integer()).
 -record(iter, {?ITER}).
 
 -define(CONTAINER, id=[] :: [] | integer(),

+ 317 - 0
src/kvs_double.erl

@@ -0,0 +1,317 @@
+-module(kvs_double).
+-description('KVS STREAM').
+-copyrihgt('Synrc Research Center').
+-author('Maxim Sokhatsky').
+-license('ISC').
+-include("kvs.hrl").
+-export([ new/0, top/1, bot/1, take/1, drop/1, load/1, save/1, down/1, up/1, cons/1, snoc/1,
+          check/0, seek/1, rewind/1, next/1, prev/1, add/1, remove/1 ]).
+
+% n2o stream protocol
+
+info(#cur{id=I,status=load}=C,R,S) -> {reply, {bert, kvs_stream:load(I)},   R, S};
+info(#cur{status=Method}   =C,R,S) -> {reply, {bert, kvs_stream:Method(C)}, R, S};
+info(                       C,R,S) -> {reply, {unknown,C},                  R, S}.
+
+% section: kvs_stream prelude
+
+se(X,Y,Z) -> setelement(X,Y,Z).
+set(X,Y,Z) -> setelement(X,Z,Y).
+e(X,Y) -> element(X,Y).
+cv(R,V) -> se(#cur.writer,R, V).
+cb(R,V) -> se(#cur.bot,   R, V).
+ct(R,V) -> se(#cur.top,   R, V).
+cl(R,V) -> se(#cur.left,  R, V).
+cr(R,V) -> se(#cur.right, R, V).
+cd(R,V) -> se(#cur.dir,   R, V).
+sn(M,T) -> se(#iter.next, M, T).
+sp(M,T) -> se(#iter.prev, M, T).
+si(M,T) -> se(#iter.id, M, T).
+el(X,T) -> e(X, T).
+tab(T) -> e(1, T).
+et(T) -> e(#cur.top, T).
+eb(T) -> e(#cur.bot, T).
+id(T) -> e(#iter.id, T).
+en(T) -> e(#iter.next, T).
+ep(T) -> e(#iter.prev, T).
+dir(0) -> top;
+dir(1) -> bot.
+acc(0) -> prev;
+acc(1) -> next.
+
+% section: next, prev
+
+next (#cur{reader=[]}=C) -> {error,[]};
+next (#cur{reader=B} =C) -> pos(kvs:get(tab(B),en(B)),C,right(C)).
+prev (#cur{reader=[]}=C) -> {error,[]};
+prev (#cur{reader=B} =C) -> pos(kvs:get(tab(B),ep(B)),C,left(C)).
+
+left (#cur{left=0,right=0,dir=D}) -> swap(D,{0,  0});
+left (#cur{left=0,right=R,dir=D}) -> swap(D,{0,  R});
+left (#cur{left=L,right=R,dir=D}) -> swap(D,{L-1,R+1}).
+right(#cur{left=0,right=0,dir=D}) -> swap(D,{0,  0});
+right(#cur{left=L,right=0,dir=D}) -> swap(D,{L,  0});
+right(#cur{left=L,right=R,dir=D}) -> swap(D,{L+1,R-1}).
+
+swap(1,{L,R}) -> {R,L};
+swap(0,{L,R}) -> {L,R}.
+
+pos({ok,R},C,{X,Y}) -> C#cur{reader=R,left=X,right=Y};
+pos({error,X},C,_)  -> {error,X}.
+
+% section: take, drop
+
+drop(#cur{dir=D,reader=P,args=N}=C) -> drop(acc(D),N,C,C).
+take(#cur{dir=D,reader=P,args=N}=C) -> take(acc(D),N,C,C,[]).
+
+take(_,_,{error,C},C2,R) -> C2#cur{args=lists:flatten(R)};
+take(_,0,C,C2,R) -> C2#cur{args=lists:flatten(R)};
+take(A,N,#cur{reader=B}=C,C2,R) -> take(A,N-1,?MODULE:A(C),C2,[B|R]).
+
+drop(_,_,{error,C},C2) -> C2;
+drop(_,0,C,C2) -> C2;
+drop(A,N,#cur{reader=B}=C,C2) -> drop(A,N-1,?MODULE:A(C),C2).
+
+% rewind (moves writer)
+
+rewind(#cur{writer=[]}=C) -> {error,[]};
+rewind(#cur{dir=D,top=T,bot=B,writer=V}=C) ->
+    C#cur{writer=id(kvs:get(tab(V),select(D,T,B)))}.
+
+select(0,T,B) -> T;
+select(1,T,B) -> B;
+select(P,P,X) -> X;
+select(P,N,X) -> N.
+
+% seek (moves reader)
+
+seek(#cur{writer=[]}=C) -> C;
+seek(#cur{bot=X,reader=P,dir=0}=C) when element(2,P) == X -> C;
+seek(#cur{top=X,reader=P,dir=1}=C) when element(2,P) == X -> C;
+seek(#cur{top=T,bot=B,left=L,right=R,dir=0,reader=P}=C) ->
+     C#cur{reader=id(kvs:get(tab(P),B)),left=0,right=L+R};
+seek(#cur{top=T,bot=B,left=L,right=R,dir=1,reader=P}=C) ->
+     C#cur{reader=id(kvs:get(tab(P),T)),left=L+R,right=0}.
+
+% new, save, load, up, down, top, bot
+
+new   () -> #cur{id=kvs:next_id(cur,1)}.
+save (C) -> NC = C#cur{args=[]}, kvs:put(NC), NC.
+load (K) -> case kvs:get(cur,K) of {ok,C} -> C; E -> E end.
+up   (C) -> C#cur{dir=0}.
+down (C) -> C#cur{dir=1}.
+top  (C) -> seek(down(C)).
+bot  (C) -> seek(up(C)).
+
+% add
+
+add(#cur{dir=D,args=M}=C) when element(2,M) == [] ->
+    add(dir(D),si(M,kvs:next_id(tab(M),1)),C);
+add(#cur{dir=D,args=M}=C) ->
+    add(dir(D),M,C).
+
+inc(#cur{left=L,right=R,dir=D}) -> swap(D,{L+1,R}).
+
+cons(#cur{args=M}=C) -> add(top,M,C).
+snoc(#cur{args=M}=C) -> add(bot,M,C).
+
+add(bot,M,#cur{bot=T,writer=[]}=C) ->
+    Id=id(M), N=sn(sp(M,T),[]), kvs:put(N),
+    C#cur{writer=N,reader=N,bot=Id,top=Id};
+
+add(top,M,#cur{top=B,writer=[]}=C) ->
+    Id=id(M), N=sp(sn(M,B),[]), kvs:put(N),
+    C#cur{writer=N,reader=N,top=Id,bot=Id};
+
+add(top,M,#cur{top=T, writer=V}=C) when element(2,V) /= T ->
+    add(top, M, rewind(C));
+
+add(bot,M,#cur{bot=B, writer=V}=C) when element(2,V) /= B ->
+    add(bot, M, rewind(C));
+
+add(bot,M,#cur{bot=T,writer=V,reader=P}=C) ->
+    Id=id(M), H=sn(sp(M,T),[]), N=sn(V,Id), kvs:put([H,N]),
+    {L,R} = inc(C), C#cur{reader=select(V,P,N),writer=H,bot=Id,left=L,right=R};
+
+add(top,M,#cur{top=B,writer=V,reader=P}=C) ->
+    Id=id(M), H=sp(sn(M,B),[]), N=sp(V,Id), kvs:put([H,N]),
+    {L,R} = inc(C), C#cur{reader=select(V,P,N),writer=H,top=Id,left=L,right=R}.
+
+% remove
+
+remove(#cur{writer=[]}=C) -> {error,val};
+remove(#cur{writer=B,reader=X,args=I}=C) ->
+    {ok,R}=kvs:get(tab(B),I), kvs:delete(tab(B),I),
+    join(I,[fix(tab(B),X)||X<-[ep(R),en(R)]],C).
+
+fix(M,[])     -> [];
+fix(M,X)      -> fix(kvs:get(M,X)).
+fix({ok,O})   -> O;
+fix(_)        -> [].
+
+dec(#cur{left=0,right=0,dir=D}) -> swap(D,{0,  0});
+dec(#cur{left=L,right=0,dir=D}) -> swap(D,{L-1,0});
+dec(#cur{left=0,right=R,dir=D}) -> swap(D,{0,R-1});
+dec(#cur{left=L,right=R,dir=D}) -> swap(D,{L-1,R}).
+
+m(I,_,I,_,I,L,R,P,V) -> {R,R};
+m(_,I,I,_,I,L,R,P,V) -> {R,L};
+m(I,_,_,I,I,L,R,P,V) -> {L,R};
+m(_,I,_,I,I,L,R,P,V) -> {L,L};
+m(I,_,_,_,I,L,R,P,V) -> {sn(V,id(R)),P};
+m(_,I,_,_,I,L,R,P,V) -> {sp(V,id(R)),P};
+m(_,_,I,_,I,L,R,P,V) -> {V,sn(P,id(L))};
+m(_,_,_,I,I,L,R,P,V) -> {V,sp(P,id(L))};
+m(_,_,_,_,I,L,R,P,V) -> {V,P}.
+
+join(I,[[],[]],C) ->
+    {X,Y} = dec(C),
+    C#cur{top=[],bot=[],writer=[],reader=[],left=X,right=Y};
+
+join(I,[[], R],#cur{reader=P,writer=V}=Cur) ->
+    N=sp(R,[]), kvs:put(N), {X,Y} = dec(Cur),
+    {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,[],N,P,V),
+    Cur#cur{top=id(N), writer=NV, reader=NP, left=X, right=Y};
+
+join(I,[L, []],#cur{reader=P,writer=V}=Cur) ->
+    N=sn(L,[]), kvs:put(N), {X,Y} = dec(Cur),
+    {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,N,[],P,V),
+    Cur#cur{bot=id(N), writer=NV, left=X, reader=NP, right=Y};
+
+join(I,[L,  R],#cur{reader=P,writer=V}=Cur) ->
+    N=sp(R,id(L)), M=sn(L,id(R)), kvs:put([N,M]), {X,Y} = dec(Cur),
+    {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,N,M,P,V),
+    Cur#cur{left=X, reader=NP, writer=NV, right=Y}.
+
+% TESTS
+
+check() ->
+    te_remove(),
+    test1(),
+    test2(),
+    drop(),
+    create_destroy(),
+    next_prev_duality(),
+    test_sides(),
+    rewind(),
+    ok.
+
+rewind() ->
+    Empty = {'user2',[],[],[],[],[],[],[],[]},
+    C = #cur{top=T,bot=B,left=L,right=R,writer=V,reader=P} =
+    save(add(set(#cur.args,Empty,down(
+         add(set(#cur.args,Empty,up(
+         add(set(#cur.args,Empty,down(
+         add(set(#cur.args,Empty,down(
+         add(set(#cur.args,Empty,up(new())))))))))))))))),
+    PId = id(P),
+    VId = id(V),
+    B = VId,
+    PId = B - 4,
+    ok.
+
+test_sides() ->
+    Empty = {'user2',[],[],[],[],[],[],[],[]},
+    #cur{top=T,bot=B,left=L,right=R,writer=V,reader=P} =
+    save(
+    add(set(#cur.args,Empty,up(
+    add(set(#cur.args,Empty,down(
+    add(set(#cur.args,Empty,new()))))))))),
+    PId = id(P),
+    VId = id(V),
+    VId = T,
+    PId = B - 1,
+    1 = T - B,
+    L = R = 1.
+
+next_prev_duality() ->
+    Cur = new(),
+    [A,B,C] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,3) ],
+    R = save(
+        add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
+        add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
+        add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
+        Cur))))))),
+    X = load(id(Cur)),
+    X = next(
+        next(
+        prev(
+        prev(X)))).
+
+test2() ->
+    Cur = new(),
+    [A,B,C,D] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,4) ],
+    #cur{args=[]} = take(
+         up(
+         bot(
+         remove(set(#cur.args,A,
+         remove(set(#cur.args,B,
+         remove(set(#cur.args,C,
+         remove(set(#cur.args,D,
+         add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
+         add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
+         add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
+         add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
+         up(Cur#cur{args=-1})))))))))))))))))))).
+
+create_destroy() ->
+    Cur = new(),
+    [A,B,C,D] = [ kvs:next_id('user2',1)
+             || _ <- lists:seq(1,4) ],
+    #cur{args=[]} = take(
+         remove(set(#cur.args,B,
+         remove(set(#cur.args,D,
+         remove(set(#cur.args,A,
+         remove(set(#cur.args,C,
+         add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
+         add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
+         add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
+         add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
+         up(new())))))))))))))))))).
+
+test1() ->
+    [A,B,C,D] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,4) ],
+    R  = save(
+         add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
+         add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
+         add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
+         add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
+         new() ))))))))),
+    #cur{args=X}  = take(top(R#cur{args=-1})),
+    #cur{args=Y}  = take(bot(R#cur{args=-1})),
+    X  = lists:reverse(Y),
+    L  = length(X).
+
+drop() ->
+    #cur{id=S}=save(new()),
+    P = {'user2',[],[],[],[],[],[],[],[]},
+    S1 = save(
+         add(set(#cur.args,P,
+         add(set(#cur.args,P,
+         add(set(#cur.args,P,
+         add(set(#cur.args,P,
+         load(S)))))))))),
+    S2= drop(S1#cur{args=2}),
+    4 = length(e(#cur.args,take(S2#cur{args=-1}))),
+    4 = length(e(#cur.args,take(S1#cur{args=-1}))),
+    ok.
+
+te_remove() ->
+    #cur{id=S}=save(new()),
+    P = {'user2',[],[],[],[],[],[],[],[]},
+    S1 = save(
+         add(set(#cur.args,P,
+         add(set(#cur.args,P,
+         add(set(#cur.args,P,
+         add(set(#cur.args,P,
+         load(S)))))))))),
+
+    Res = e(#cur.args,take(S1#cur{args=-1})),
+    4 = length(Res),
+    S2 = save(top(S1)),
+    S3 = save(remove(S2#cur{args=S2#cur.top-1})),
+    #cur{args=List} = take(top(S3#cur{args=-1})),
+    #cur{args=Rev}  = take(bot(S3#cur{args=-1})),
+    List = lists:reverse(Rev),
+    3 = length(List),
+    {S3,List}.

+ 84 - 282
src/kvs_stream.erl

@@ -3,315 +3,117 @@
 -copyrihgt('Synrc Research Center').
 -author('Maxim Sokhatsky').
 -license('ISC').
--include("kvs.hrl").
--export([ new/0, top/1, bot/1, take/1, drop/1, load/1, save/1, down/1, up/1, cons/1, snoc/1,
-          check/0, seek/1, rewind/1, next/1, prev/1, add/1, remove/1 ]).
-
-% n2o stream protocol
-
-info(#cur{id=I,status=load}=C,R,S) -> {reply, {bert, kvs_stream:load(I)},   R, S};
-info(#cur{status=Method}   =C,R,S) -> {reply, {bert, kvs_stream:Method(C)}, R, S};
-info(                       C,R,S) -> {reply, {unknown,C},                  R, S}.
+-include_lib("kvs/include/kvs.hrl").
+-compile(export_all).
 
 % section: kvs_stream prelude
 
-se(X,Y,Z) -> setelement(X,Y,Z).
+se(X,Y,Z)  -> setelement(X,Y,Z).
 set(X,Y,Z) -> setelement(X,Z,Y).
-e(X,Y) -> element(X,Y).
-cv(R,V) -> se(#cur.writer,R, V).
-cb(R,V) -> se(#cur.bot,   R, V).
-ct(R,V) -> se(#cur.top,   R, V).
-cl(R,V) -> se(#cur.left,  R, V).
-cr(R,V) -> se(#cur.right, R, V).
-cd(R,V) -> se(#cur.dir,   R, V).
+e(X,Y)  -> element(X,Y).
+c0(R,V) -> se(1, R, V).
+c1(R,V) -> se(#reader.id,    R, V).
+c2(R,V) -> se(#reader.pos,   R, V).
+c3(R,V) -> se(#reader.cache, R, V).
+c4(R,V) -> se(#reader.args,  R, V).
+c5(R,V) -> se(#reader.feed,  R, V).
+c6(R,V) -> se(#reader.dir,   R, V).
+wf(R,V) -> se(#writer.first, R, V).
 sn(M,T) -> se(#iter.next, M, T).
 sp(M,T) -> se(#iter.prev, M, T).
 si(M,T) -> se(#iter.id, M, T).
+sf(M,T) -> se(#iter.feed, M, T).
 el(X,T) -> e(X, T).
-tab(T) -> e(1, T).
-et(T) -> e(#cur.top, T).
-eb(T) -> e(#cur.bot, T).
-id(T) -> e(#iter.id, T).
-en(T) -> e(#iter.next, T).
-ep(T) -> e(#iter.prev, T).
-dir(0) -> top;
-dir(1) -> bot.
-acc(0) -> prev;
-acc(1) -> next.
+tab(T)  -> e(1, T).
+id(T)   -> e(#iter.id, T).
+en(T)   -> e(#iter.next, T).
+ep(T)   -> e(#iter.prev, T).
+pos(T)  -> e(#reader.pos, T).
+args(T) -> e(#writer.args, T).
+dir(0)  -> top;
+dir(1)  -> bot.
+acc(0)  -> next;
+acc(1)  -> prev.
 
 % section: next, prev
 
-next (#cur{reader=[]}=C) -> {error,[]};
-next (#cur{reader=B} =C) -> pos(kvs:get(tab(B),en(B)),C,right(C)).
-prev (#cur{reader=[]}=C) -> {error,[]};
-prev (#cur{reader=B} =C) -> pos(kvs:get(tab(B),ep(B)),C,left(C)).
+top(#reader{feed=F}=C) -> w(kvs:get(writer,F),top,C).
+bot(#reader{feed=F}=C) -> w(kvs:get(writer,F),bot,C).
 
-left (#cur{left=0,right=0,dir=D}) -> swap(D,{0,  0});
-left (#cur{left=0,right=R,dir=D}) -> swap(D,{0,  R});
-left (#cur{left=L,right=R,dir=D}) -> swap(D,{L-1,R+1}).
-right(#cur{left=0,right=0,dir=D}) -> swap(D,{0,  0});
-right(#cur{left=L,right=0,dir=D}) -> swap(D,{L,  0});
-right(#cur{left=L,right=R,dir=D}) -> swap(D,{L+1,R-1}).
+next(#reader{feed=F,cache=[]}=C)          -> {error,empty};
+next(#reader{feed=F,cache={T,R},pos=P}=C) -> n(kvs:get(T,R),C,P+1).
+prev(#reader{feed=F,cache=[]}=C)          -> {error,empty};
+prev(#reader{feed=F,cache={T,R},pos=P}=C) -> p(kvs:get(T,R),C,P-1).
 
-swap(1,{L,R}) -> {R,L};
-swap(0,{L,R}) -> {L,R}.
-
-pos({ok,R},C,{X,Y}) -> C#cur{reader=R,left=X,right=Y};
-pos({error,X},C,_)  -> {error,X}.
+n({ok,R},C,P)    -> r(kvs:get(tab(R),en(R)),C,P);
+n({error,X},C,_) -> {error,X}.
+p({ok,R},C,P)    -> r(kvs:get(tab(R),ep(R)),C,P);
+p({error,X},C,_) -> {error,X}.
+r({ok,R},C,P)    -> C#reader{cache={tab(R),id(R)},pos=P};
+r({error,X},C,_) -> {error,X}.
+w({ok,#writer{first=B}},bot,C)            -> C#reader{cache={tab(B),id(B)},pos=1};
+w({ok,#writer{cache=B,count=Size}}=X,top,C) -> C#reader{cache={tab(B),id(B)},pos=Size};
+w({error,X},_,C)                          -> {error,X}.
 
 % section: take, drop
 
-drop(#cur{dir=D,reader=P,args=N}=C) -> drop(acc(D),N,C,C).
-take(#cur{dir=D,reader=P,args=N}=C) -> take(acc(D),N,C,C,[]).
-
-take(_,_,{error,C},C2,R) -> C2#cur{args=lists:flatten(R)};
-take(_,0,C,C2,R) -> C2#cur{args=lists:flatten(R)};
-take(A,N,#cur{reader=B}=C,C2,R) -> take(A,N-1,?MODULE:A(C),C2,[B|R]).
-
-drop(_,_,{error,C},C2) -> C2;
-drop(_,0,C,C2) -> C2;
-drop(A,N,#cur{reader=B}=C,C2) -> drop(A,N-1,?MODULE:A(C),C2).
-
-% rewind (moves writer)
-
-rewind(#cur{writer=[]}=C) -> {error,[]};
-rewind(#cur{dir=D,top=T,bot=B,writer=V}=C) ->
-    C#cur{writer=id(kvs:get(tab(V),select(D,T,B)))}.
-
-select(0,T,B) -> T;
-select(1,T,B) -> B;
-select(P,P,X) -> X;
-select(P,N,X) -> N.
+drop(#reader{dir=D,cache=B,args=N,pos=P}=C) -> drop(acc(D),N,C,C,P,B).
+take(#reader{dir=D,cache=B,args=N,pos=P}=C) -> take(acc(D),N,C,C,[],P,B).
 
-% seek (moves reader)
+take(_,_,{error,C},C2,R,P,B) -> C2#reader{args=lists:flatten(R),pos=P,cache=B};
+take(_,0,C,C2,R,P,B)         -> C2#reader{args=lists:flatten(R),pos=P,cache=B};
+take(A,N,#reader{cache={T,I},pos=P}=C,C2,R,_,_) ->
+    take(A,N-1,?MODULE:A(C),C2,[element(2,kvs:get(T,I))|R],P,{T,I}).
 
-seek(#cur{writer=[]}=C) -> C;
-seek(#cur{bot=X,reader=P,dir=0}=C) when element(2,P) == X -> C;
-seek(#cur{top=X,reader=P,dir=1}=C) when element(2,P) == X -> C;
-seek(#cur{top=T,bot=B,left=L,right=R,dir=0,reader=P}=C) ->
-     C#cur{reader=id(kvs:get(tab(P),B)),left=0,right=L+R};
-seek(#cur{top=T,bot=B,left=L,right=R,dir=1,reader=P}=C) ->
-     C#cur{reader=id(kvs:get(tab(P),T)),left=L+R,right=0}.
+drop(_,_,{error,C},C2,P,B)     -> C2#reader{pos=P};
+drop(_,0,C,C2,P,B)             -> C2#reader{pos=P};
+drop(A,N,#reader{cache=B,pos=P}=C,C2,_,_) ->
+    drop(A,N-1,?MODULE:A(C),C2,P,B).
 
 % new, save, load, up, down, top, bot
 
-new   () -> #cur{id=kvs:next_id(cur,1)}.
-save (C) -> NC = C#cur{args=[]}, kvs:put(NC), NC.
-load (K) -> case kvs:get(cur,K) of {ok,C} -> C; E -> E end.
-up   (C) -> C#cur{dir=0}.
-down (C) -> C#cur{dir=1}.
-top  (C) -> seek(down(C)).
-bot  (C) -> seek(up(C)).
+load_writer (Id) -> case kvs:get(writer,Id) of {ok,C} -> C; E -> E end.
+load_reader (Id) -> case kvs:get(reader,Id) of {ok,C} -> C; E -> E end.
+writer (Id) -> #writer{id=Id}.
+reader (Id) ->
+    case kvs:get(writer,Id) of
+         {ok,#writer{first=[]}} -> #reader{id=kvs:next_id(reader,1),feed=Id,cache=[]};
+         {ok,#writer{first=F}}  -> #reader{id=kvs:next_id(reader,1),feed=Id,cache={tab(F),id(F)}};
+         {error,X} -> {error,X} end.
+save (C) -> NC = c4(C,[]), kvs:put(NC), NC.
+up   (C) -> C#reader{dir=0}.
+down (C) -> C#reader{dir=1}.
 
 % add
 
-add(#cur{dir=D,args=M}=C) when element(2,M) == [] ->
-    add(dir(D),si(M,kvs:next_id(tab(M),1)),C);
-add(#cur{dir=D,args=M}=C) ->
-    add(dir(D),M,C).
-
-inc(#cur{left=L,right=R,dir=D}) -> swap(D,{L+1,R}).
-
-cons(#cur{args=M}=C) -> add(top,M,C).
-snoc(#cur{args=M}=C) -> add(bot,M,C).
-
-add(bot,M,#cur{bot=T,writer=[]}=C) ->
-    Id=id(M), N=sn(sp(M,T),[]), kvs:put(N),
-    C#cur{writer=N,reader=N,bot=Id,top=Id};
-
-add(top,M,#cur{top=B,writer=[]}=C) ->
-    Id=id(M), N=sp(sn(M,B),[]), kvs:put(N),
-    C#cur{writer=N,reader=N,top=Id,bot=Id};
-
-add(top,M,#cur{top=T, writer=V}=C) when element(2,V) /= T ->
-    add(top, M, rewind(C));
-
-add(bot,M,#cur{bot=B, writer=V}=C) when element(2,V) /= B ->
-    add(bot, M, rewind(C));
-
-add(bot,M,#cur{bot=T,writer=V,reader=P}=C) ->
-    Id=id(M), H=sn(sp(M,T),[]), N=sn(V,Id), kvs:put([H,N]),
-    {L,R} = inc(C), C#cur{reader=select(V,P,N),writer=H,bot=Id,left=L,right=R};
-
-add(top,M,#cur{top=B,writer=V,reader=P}=C) ->
-    Id=id(M), H=sp(sn(M,B),[]), N=sp(V,Id), kvs:put([H,N]),
-    {L,R} = inc(C), C#cur{reader=select(V,P,N),writer=H,top=Id,left=L,right=R}.
+add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:next_id(tab(M),1)),C);
+add(#writer{args=M}=C) -> add(M,C).
 
-% remove
+add(M,#writer{cache=[]}=C) ->
+    Id=id(M), N=sp(sn(M,[]),[]), kvs:put(N),
+    C#writer{cache=N,count=1,first=N};
 
-remove(#cur{writer=[]}=C) -> {error,val};
-remove(#cur{writer=B,reader=X,args=I}=C) ->
-    {ok,R}=kvs:get(tab(B),I), kvs:delete(tab(B),I),
-    join(I,[fix(tab(B),X)||X<-[ep(R),en(R)]],C).
+add(M,#writer{cache=V,count=S}=C) ->
+    N=sp(sn(M,[]),id(V)), P=sn(V,id(M)), kvs:put([N,P]),
+    C#writer{cache=N,count=S+1}.
 
-fix(M,[])     -> [];
-fix(M,X)      -> fix(kvs:get(M,X)).
-fix({ok,O})   -> O;
-fix(_)        -> [].
+% tests
 
-dec(#cur{left=0,right=0,dir=D}) -> swap(D,{0,  0});
-dec(#cur{left=L,right=0,dir=D}) -> swap(D,{L-1,0});
-dec(#cur{left=0,right=R,dir=D}) -> swap(D,{0,R-1});
-dec(#cur{left=L,right=R,dir=D}) -> swap(D,{L-1,R}).
-
-m(I,_,I,_,I,L,R,P,V) -> {R,R};
-m(_,I,I,_,I,L,R,P,V) -> {R,L};
-m(I,_,_,I,I,L,R,P,V) -> {L,R};
-m(_,I,_,I,I,L,R,P,V) -> {L,L};
-m(I,_,_,_,I,L,R,P,V) -> {sn(V,id(R)),P};
-m(_,I,_,_,I,L,R,P,V) -> {sp(V,id(R)),P};
-m(_,_,I,_,I,L,R,P,V) -> {V,sn(P,id(L))};
-m(_,_,_,I,I,L,R,P,V) -> {V,sp(P,id(L))};
-m(_,_,_,_,I,L,R,P,V) -> {V,P}.
-
-join(I,[[],[]],C) ->
-    {X,Y} = dec(C),
-    C#cur{top=[],bot=[],writer=[],reader=[],left=X,right=Y};
-
-join(I,[[], R],#cur{reader=P,writer=V}=Cur) ->
-    N=sp(R,[]), kvs:put(N), {X,Y} = dec(Cur),
-    {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,[],N,P,V),
-    Cur#cur{top=id(N), writer=NV, reader=NP, left=X, right=Y};
-
-join(I,[L, []],#cur{reader=P,writer=V}=Cur) ->
-    N=sn(L,[]), kvs:put(N), {X,Y} = dec(Cur),
-    {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,N,[],P,V),
-    Cur#cur{bot=id(N), writer=NV, left=X, reader=NP, right=Y};
-
-join(I,[L,  R],#cur{reader=P,writer=V}=Cur) ->
-    N=sp(R,id(L)), M=sn(L,id(R)), kvs:put([N,M]), {X,Y} = dec(Cur),
-    {NV,NP} = m(en(V),ep(V),en(P),ep(P),I,N,M,P,V),
-    Cur#cur{left=X, reader=NP, writer=NV, right=Y}.
-
-% TESTS
-
-check() ->
-    te_remove(),
-    test1(),
-    test2(),
-    drop(),
-    create_destroy(),
-    next_prev_duality(),
-    test_sides(),
-    rewind(),
-    ok.
-
-rewind() ->
-    Empty = {'user2',[],[],[],[],[],[],[],[]},
-    C = #cur{top=T,bot=B,left=L,right=R,writer=V,reader=P} =
-    save(add(set(#cur.args,Empty,down(
-         add(set(#cur.args,Empty,up(
-         add(set(#cur.args,Empty,down(
-         add(set(#cur.args,Empty,down(
-         add(set(#cur.args,Empty,up(new())))))))))))))))),
-    PId = id(P),
-    VId = id(V),
-    B = VId,
-    PId = B - 4,
-    ok.
-
-test_sides() ->
-    Empty = {'user2',[],[],[],[],[],[],[],[]},
-    #cur{top=T,bot=B,left=L,right=R,writer=V,reader=P} =
-    save(
-    add(set(#cur.args,Empty,up(
-    add(set(#cur.args,Empty,down(
-    add(set(#cur.args,Empty,new()))))))))),
-    PId = id(P),
-    VId = id(V),
-    VId = T,
-    PId = B - 1,
-    1 = T - B,
-    L = R = 1.
-
-next_prev_duality() ->
-    Cur = new(),
-    [A,B,C] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,3) ],
-    R = save(
-        add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
-        add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
-        add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
-        Cur))))))),
-    X = load(id(Cur)),
-    X = next(
-        next(
-        prev(
-        prev(X)))).
-
-test2() ->
-    Cur = new(),
-    [A,B,C,D] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,4) ],
-    #cur{args=[]} = take(
-         up(
-         bot(
-         remove(set(#cur.args,A,
-         remove(set(#cur.args,B,
-         remove(set(#cur.args,C,
-         remove(set(#cur.args,D,
-         add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
-         add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
-         add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
-         add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
-         up(Cur#cur{args=-1})))))))))))))))))))).
-
-create_destroy() ->
-    Cur = new(),
-    [A,B,C,D] = [ kvs:next_id('user2',1)
-             || _ <- lists:seq(1,4) ],
-    #cur{args=[]} = take(
-         remove(set(#cur.args,B,
-         remove(set(#cur.args,D,
-         remove(set(#cur.args,A,
-         remove(set(#cur.args,C,
-         add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
-         add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
-         add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
-         add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
-         up(new())))))))))))))))))).
+check() -> test1().
 
 test1() ->
-    [A,B,C,D] = [ kvs:next_id('user2',1) || _ <- lists:seq(1,4) ],
-    R  = save(
-         add(set(#cur.args,{'user2',D,[],[],[],[],[],[],[]},
-         add(set(#cur.args,{'user2',C,[],[],[],[],[],[],[]},
-         add(set(#cur.args,{'user2',B,[],[],[],[],[],[],[]},
-         add(set(#cur.args,{'user2',A,[],[],[],[],[],[],[]},
-         new() ))))))))),
-    #cur{args=X}  = take(top(R#cur{args=-1})),
-    #cur{args=Y}  = take(bot(R#cur{args=-1})),
-    X  = lists:reverse(Y),
-    L  = length(X).
-
-drop() ->
-    #cur{id=S}=save(new()),
-    P = {'user2',[],[],[],[],[],[],[],[]},
-    S1 = save(
-         add(set(#cur.args,P,
-         add(set(#cur.args,P,
-         add(set(#cur.args,P,
-         add(set(#cur.args,P,
-         load(S)))))))))),
-    S2= drop(S1#cur{args=2}),
-    4 = length(e(#cur.args,take(S2#cur{args=-1}))),
-    4 = length(e(#cur.args,take(S1#cur{args=-1}))),
-    ok.
-
-te_remove() ->
-    #cur{id=S}=save(new()),
-    P = {'user2',[],[],[],[],[],[],[],[]},
-    S1 = save(
-         add(set(#cur.args,P,
-         add(set(#cur.args,P,
-         add(set(#cur.args,P,
-         add(set(#cur.args,P,
-         load(S)))))))))),
-
-    Res = e(#cur.args,take(S1#cur{args=-1})),
-    4 = length(Res),
-    S2 = save(top(S1)),
-    S3 = save(remove(S2#cur{args=S2#cur.top-1})),
-    #cur{args=List} = take(top(S3#cur{args=-1})),
-    #cur{args=Rev}  = take(bot(S3#cur{args=-1})),
-    List = lists:reverse(Rev),
-    3 = length(List),
-    {S3,List}.
+    Id  = {p2p,1,2},
+    X   = 5,
+    W   = kvs_stream:save(kvs_stream:writer(Id)),
+    #reader{id=R1} = kvs_stream:save(kvs_stream:reader(Id)),
+    #reader{id=R2} = kvs_stream:save(kvs_stream:reader(Id)),
+    [ kvs_stream:save(
+      kvs_stream:add((
+      kvs_stream:load_writer(Id))
+      #writer{args={user2,[],[],[],[],[],[],[],[],[]}})) || _ <- lists:seq(1,X) ],
+    Bot = kvs_stream:bot(kvs_stream:load_reader(R1)),
+    Top = kvs_stream:top(kvs_stream:load_reader(R2)),
+    #reader{args=F} = kvs_stream:take(Bot#reader{args=20,dir=0}),
+    #reader{args=B} = kvs_stream:take(Top#reader{args=20,dir=1}),
+    X == length(F),
+    F == lists:reverse(B).