Namdak Tonpa 7 years ago
parent
commit
5e793fffda
6 changed files with 259 additions and 407 deletions
  1. 11 6
      include/kvs.hrl
  2. 6 7
      include/user.hrl
  3. 0 335
      man/kvs.htm
  4. 166 0
      man/kvs_stream.htm
  5. 75 58
      src/kvs_stream.erl
  6. 1 1
      src/kvs_user.erl

+ 11 - 6
include/kvs.hrl

@@ -1,12 +1,17 @@
 -ifndef(KVS_HRL).
 -define(KVS_HRL, true).
 
--record(cur, {feed=[]::term(),
-              tab= []::atom(),
-              val= []::[]|tuple(),
-              dir= next::next|prev,
-              top= []::[]|integer(),
-              bot= []::[]|integer()}).
+-record(cur, {id =  [] :: term(),
+              val=  [] :: [] | tuple(),
+              dir=   0 ::  0 | 1,
+              top=  [] :: [] | integer(),
+              bot=  [] :: [] | integer()}).
+
+-define(ITER, id=   [] :: term(),
+              next= [] :: [] | integer(),
+              prev= [] :: [] | integer()).
+
+-record(iter, {?ITER}).
 
 -define(CONTAINER, id=[] :: [] | integer(),
                    top=[] :: [] | integer(),

+ 6 - 7
include/user.hrl

@@ -23,12 +23,11 @@
         zone=[],
         type=[] }).
 
--record(user2, {?ITERATOR(feed), % version 2
-        everyting_getting_small,
-        email,
-        username,
-        password,
-        zone,
-        type }).
+-record(person, {?ITER, % version 2
+        mail=[]::[]|binary(),
+        name=[]::[]|binary(),
+        pass=[]::[]|binary(),
+        zone=[]::[]|binary(),
+        type=[]::[]|atom() }).
 
 -endif.

+ 0 - 335
man/kvs.htm

@@ -1,335 +0,0 @@
-<html>
-
-<head>
-    <meta charset="utf-8" />
-    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
-    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
-    <meta name="description" content="" />
-    <meta name="author" content="Maxim Sokhatsky" />
-    <title>N2O</title>
-    <link rel="stylesheet" href="https://n2o.space/n2o.css" />
-</head>
-
-<body>
-
-<header>
-    <a href="../index.html"><img src="https://n2o.space/img/Synrc Neo.svg"></a>
-    <h1>N2O API</h1>
-</header>
-
-<main>
-    <section>
-
-<h3>INTRO</h3>
-
-<p>The <b>n2o</b> defines the way you create, configure and run
-arbitrary applications and protocols inside some hosts, into
-which N2O can be injected, such as <b>cowboy</b> and <b>emqttd</b>.
-Each application can spawn its instance in its own way, like
-web pages spawn WebSocket connections, workflow engines
-spawn business proceseses and chat applications spawns roster
-and chatroom processes. With N2O everything is managed by protocols.</p>
-
-<p>The N2O itself a an enbeddable protocol loop. However besides that
-it handles cache and session ETS tables along with flexible async
-processes with no ownership restriction. It also introduces logging
-approach, AES/CBC&mdash;128 pickling and with BERT/JSON formatter.</p>
-
-    </section>
-    <section>
-
-<h3>RECORDS</h3>
-
-<figure><figcaption>Listing 1. Erlang/OTP records</figcaption><code>
-      #ok { data  = [] :: term() }.
-   #error { data  = [] :: term() }.
-
-</code></figure>
-
-<figure><figcaption>Listing 2. N2O Protocol</figcaption><code>
-   #reply { data  = [] :: [] | binary(),
-            req   = [] :: [] | term(),
-            state = [] :: [] | term() }.
-
- #unknown { data  = [] :: [] | binary(),
-            req   = [] :: [] | term(),
-            state = [] :: [] | term() }.
-
-</code></figure>
-
-<figure><figcaption>Listing 3. N2O State</figcaption><code>
-      #cx { session   = [] :: [] | binary(),
-            formatter = bert :: bert | json,
-            actions   = [] :: list(tuple()),
-            state     = [] :: [] | term(),
-            module    = [] :: [] | atom(),
-            lang      = [] :: [] | atom(),
-            path      = [] :: [] | binary(),
-            node      = [] :: [] | atom(),
-            pid       = [] :: [] | pid(),
-            vsn       = [] :: [] | integer() }).
-
-</code></figure>
-
-    </section>
-    <section>
-
-<h3>PROTOCOLS</h3>
-
-<p>While all application protocols in the system
-are desired to be placed in the single effectful
-environment or same error handling path, <b>n2o</b> defines
-single protocol loop for all applications in its federation of protocols.</p>
-
-<p>In core bundle <b>n2o</b> is shipped with NITRO and FTP protocols which
-allows you to create real-time web applications with binary-based protocols,
-priving also robust and performant upload client and file transfer protocol.
-For bulding web based NITRO applications you need to include
-<b>nitro</b> dependency.</p>
-
-<h4>info(term(),term(),#cx{}) -> #reply{} | #unknown{}.</h4>
-
-<p>The <b>info/2</b> is a N2O protocol callback that will be called on each
-incoming request. N2O code should be embedded into applications host:
-MQTT (as ring of MQTT clients), or HTTP server or WebSocket, or raw TCP.</p>
-
-    </section>
-    <section>
-
-<h3>RPC MQTT</h3>
-
-<p>N2O provides RPC over MQ mechanism for MQTT devices. N2O spawn a set of
-<a href="n2o_vnode.htm">n2o_vnode</a> workers as <a href="n2o_async.htm">n2o_async</a>
-processes that listen to <b>events</b> topic. Response are sent to <b>actions</b> topic,
-which is subscribed automatically on MQTT session init.</p>
-
-<figure><figcaption>Listing 4. MQTT RPC Topics</figcaption><code>
- actions/:vsn/:module/:client
-  events/:vsn/:node/:module/:client
-
-</code></figure>
-
-    </section>
-    <section>
-
-<h3>RPC WebSocket</h3>
-
-<p>In pure WebSocket case N2O implements <a href="n2o_proto.htm">n2o_proto</a> as cowboy module supporting binary and text messages.</p>
-
-<figure><figcaption>Listing 5. Cowboy stream protocol</figcaption><code>
- #binary { data :: binary() }.
-   #text { data :: binary() }.
-
-</code></figure>
-
-    </section>
-    <section>
-
-<h3>EXAMPLE</h3>
-
-<p>Here is little example of overriding INIT protocol message from NITRO protocol
-and generate standart token stored in KVS.</p>
-
-<figure><figcaption>Listing 6. Custom INIT Protocol</figcaption><code>
- -module(custom_init).
- -compile(export_all).
-
- info({init, <<>>}, Req, State = #cx{session = Session}) ->
-      {'Token', Token} = n2o_auth:gen_token([], Session),
-      #cx{params = Client} = get(context),
-      kvs:put(#'Token'{token = Token, client = Client}),
-      n2o_nitro:info({init, n2o:format(Token)}, Req, State);
-
- info(Message,Req,State) -> {unknown,Message,Req,State}.
-
-</code></figure>
-
-    </section>
-    <section>
-
-<h3>CONFIG</h3>
-
-<p>Just put protocol implementation module name to <b>protocol</b> option in sys.config.</p>
-
-<figure><code>
-  [{n2o,[{cache,n2o},
-         {mq,n2o},
-         {formatter,n2o_format},
-         {logging,n2o_io},
-         {log_modules,n2o},
-         {log_level,n2o},
-         {session,n2o_session},
-         {pickler,n2o_secret},
-         {protocols,[custom_init,n2o_ftp,n2o_nitro]},
-         {timer,{0,10,0}}]}].
-
-</code></figure>
-
-<p>N2O is the facade of the following services: cache, mq, message formating, loging,
-   sessions, pickling and protocol loops. The other part of N2O is <a href="n2o_async.htm">n2o_async</a> module
-   for spawning supervised application processes tha use N2O API. In this simple
-   configuration you may set any implementation to any service.</p>
-
-    </section>
-    <section>
-
-<h3>CACHE</h3>
-
-<p>Cache is fast expirable memory store. Just put values onto keys using these
-functions and system timer will clear expired entries eventually.
-You can select caching module implementation by seting <b>cache</b> n2o parameter to module name.
-Default n2o cache implementation just turns each ets store into expirable.</p>
-
-<h4>cache(Tab, Key, Value, Till) -> term().</h4>
-
-<p>Sets a Value with a given TTL.</p>
-
-<h4>cache(Tab, Key) -> term().</h4>
-
-<p>Gets a Value.</p>
-
-    </section>
-    <section>
-
-<h3>PUBSUB</h3>
-
-<p>The minimal requirement for any framework is to pub/sub API.
-   N2O provides selectable API through <b>mq</b> environment parameter.</p>
-
-<h4>subscribe(Client, Topic, Options) -> term().</h4>
-
-<p>Subscribe an absctract client to a transient topic. In particular
-   implementation the semantics could differ. In MQTT you can subscribe
-   offline/online clients to any persistent topics.</p>
-
-<h4>unsubscribe(Client, Topic, Options) -> term().</h4>
-
-<p>Unsubscribe an abstract client from a transient topic. In MQTT
-   we remove the subscription from persistent database.</p>
-
-<h4>publish(Topic, Message, Options) -> term().</h4>
-
-<p>Publish a message to a topic. In MQTT if clients are offline
-they will receive offline messages from the
-inflight srotarge once they become online.</p>
-
-    </section>
-    <section>
-
-<h3>FORMAT</h3>
-
-<p>Call this function for changeable at runtime socket binary formatting.
-   Used to format data field of the returned values in protocol implementations.
-   N2O provides selectable API through <b>formatter</b> environment parameter.</p>
-
-<h4>format(Message, bert | json) -> binary().</h4>
-
-<figure><code>
- > io:format("~ts~n",[n2o:format(#io{},json)]).
- {"t":104,"v":[{"t":100,"v":"io"},{"t":109,"v":""},{"t":109,"v":[131,106]}]}
- ok
-
- > n2o:format(#ok{code=undefined},bert).
- {binary,<<131,104,2,100,0,2,111,107,100,0,9,117,110,100,
-           101,102,105,110,101,100>>}
-
-</code></figure>
-
-    </section>
-    <section>
-
-<h3>LOG</h3>
-
-<p>First you need specify global module in sys.config,
-where functions <b>log_level</b> and <b>log_modules</b> are placed.
-See options in config with with same names as functions.</p>
-
-<p>Then implement these function in way of returning the list
-modules you want to trace, and global log level for them.</p>
-
-<figure><figcaption>Listing 7. Log Framework</figcaption>
-<code>
- log_modules() -> [n2o,n2o_async,n2o_proto].
- log_level() -> info.
-
-</code></figure>
-
-<p>In your code you should use following trace functions
-   which are the same as callback API for n2o logging
-   environment variable.</p>
-
-<h4>error(Module, Format, Args) -> ok | skip.</h4>
-
-<h4>info(Module, Format, Args) -> ok | skip.</h4>
-
-<h4>warning(Module, Format, Args) -> ok | skip.</h4>
-
-    </section>
-    <section>
-
-<h3>SESSION</h3>
-
-<p>Sessions are stored in <b>cookies</b> table and indexed by security token which
-is usually a password based token. All session variables from all users are stored in this table.
-each user see only its variables indexed by his token. Sessions like a cache are expirable.
-Technically, N2O sessions are the server controlling mechanism of JavaScript cookies.</p>
-
-<h4>session(Key, Value) -> term().</h4>
-
-<p>Sets a Value into ETS table <b>cookies</b> for a token from #cx.session which
-is set there earlier from INIT message or MQTT headers,
-before entering the top level N2O loop.</p>
-
-<figure><figcaption>Listing 8. Sessions</figcaption>
-<code>
- 1> rr(n2o).
- [bin,client,cx,direct,ev,flush,ftp,ftpack,handler,
-  mqtt_client,mqtt_message,pickle,server]
- 2> put(context,#cx{session=10}).
- undefined
- 3> n2o:session(user,maxim).
- maxim
- 4> ets:tab2list(cookies).
- [{{10,user},
-   <<"/">>,
-   {1504,977449,476430},
-   {{2017,9,9},{20,32,29}},
-   maxim}]
-
-</code></figure>
-
-<h4>session(Key) -> term().</h4>
-
-<p>Gets a Value by any Key.</p>
-
-    </section>
-    <section>
-
-<h3>PICKLE</h3>
-
-<p>Call this function for changeable at runtime term pickling.</p>
-
-<h4>pickle(term()) -> binary().</h4>
-
-<h4>depickle(binary()) -> term().</h4>
-
-    </section>
-    <section>
-
-<p>This module may refer to:
-<a href="http://erlang.org/doc/man/io.html">io</a></b>,
-<a href="http://erlang.org/doc/man/ets.html">ets</a></b>,
-<a href="n2o_async.htm">n2o_asynrc</a></b>,
-<a href="n2o_vnode.htm">n2o_vnode</a></b>,
-<a href="n2o_proto.htm"><b>n2o_proto</b></a>.
-</p>
-
-    </section>
-</main>
-
-<footer>
-    2005&mdash;2017 &copy; Synrc Research Center
-</footer>
-
-</body>
-</html>

+ 166 - 0
man/kvs_stream.htm

@@ -0,0 +1,166 @@
+<html>
+
+<head>
+    <meta charset="utf-8" />
+    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
+    <meta name="description" content="" />
+    <meta name="author" content="Maxim Sokhatsky" />
+    <title>STREAM</title>
+    <link rel="stylesheet" href="https://n2o.space/n2o.css" />
+</head>
+
+<body>
+
+<header>
+    <a href="../index.html"><img src="https://n2o.space/img/Synrc Neo.svg"></a>
+    <h1>STREAM</h1>
+</header>
+
+<main>
+    <section>
+
+<h3>INTRO</h3>
+
+<p>The <b>kvs_stream</b> is intended to store and retrieve doubly-linked lists using
+   simple key-value access to different databases through its backends:
+   redis, mongo, mnesia, riak, kai, fs.
+   The main descriptor of the list is the cursor which holds the cached value of one of list elements
+   and also two pointers to first and last elements along with default traversal direction.
+   Cursor should be stored in databse, if there is no cursor for some data then this data is not alive yet.
+   The data could be added only from list ends.
+   The data in list could be removed by record id.
+   The list could not contain duplicates or even records with the same id.
+   When you consume the stream, the data is not deleted, you need to remove it manually.</p>
+    </section>
+    <section>
+
+<h3>RECORDS</h3>
+
+<figure><figcaption>KVS CORE</figcaption><code>
+    #ok { data= [] :: term() }.
+
+ #error { data= [] :: term() }.
+
+   #cur { id=   [] :: term(),
+          val=  [] :: [] | tuple(),
+          dir=   0 ::  0 | 1,
+          top=  [] :: [] | integer(),
+          bot=  [] :: [] | integer()}).
+
+  #iter { id=   [] :: [] | integer(),
+          prev= [] :: [] | integer(),
+          next= [] :: [] | integer()).
+
+</code></figure>
+
+    </section>
+    <section>
+
+<h3>EXAMPLE</h3>
+
+<figure><code>
+ create_destroy() ->
+    Cur = new(),
+    [A,B,C,D] = [ kvs:next_id(person,1)
+             || _ <- lists:seq(1,4) ],
+    S  = kvs_stream:save(
+         kvs_stream:add(#person{id=A},
+         kvs_stream:down(
+         kvs_stream:add(#person{id=B},
+         kvs_stream:up(
+         kvs_stream:add(#person{id=C},
+         kvs_stream:down(
+         kvs_stream:add(#person{id=D},
+         kvs_stream:up(
+         kvs_stream:new()))))))))),
+    Y  = kvs_stream:remove(B,
+         kvs_stream:remove(D,
+         kvs_stream:remove(A,
+         kvs_stream:remove(C,S)))),
+    [] = kvs_stream:take(-1,
+         kvs_stream:down(
+         kvs_stream:top(Y))).
+
+</code></figure>
+    </section>
+    <section>
+
+<h3>API</h3>
+
+<h4>new() -> #cur{}.</h4>
+
+<p>Creates a KVS cursor.</p>
+
+<h4>save(#cur{}) -> #cur{}.</h4>
+
+<p>Saves cursor to database.<?p>
+
+<h4>load() -> #ok{data::#cur{}} | #error{}.</h4>
+
+<p>Gets a curson from database.</p>
+
+<h4>next(#cur{}) -> #cur{}.</h4>
+
+<p>Moves cursor to next. Consume data top down.</p>
+
+<h4>prev(#cur{}) -> #cur{}.</h4>
+
+<p>Moves cursor to prev. Consume data bottom up.</p>
+
+<h4>seek(Id,#cur{}) -> #cur{} | #error{}.</h4>
+
+<p>Moves cursor to Id record. If cursor has no cached value then function returns error.</p>
+
+<h4>top(#cur{}) -> #cur{}.</h4>
+
+<p>Moves cursor to top of the list.</p>
+
+<h4>bot(#cur{}) -> #cur{}.</h4>
+
+<p>Moves cursor to bottom of the list.</p>
+
+<h4>add(Message,#cur{}) -> #cur{}.</h4>
+
+<p>Adds message to datatabase and update cursor to new data.
+   Message is linked on next prev fields with existed data under cursor.</p>
+
+<h4>remove(Id,#cur{}) -> #cur{} | #error{}.</h4>
+
+<p>Removes record by id from database and unlink it from list.
+   If cursor has no cached value then function returns error.</p>
+
+<h4>take(N,#cur{}) -> list().</h4>
+
+<p>Consumes records from stream using its current value and direction. Returns consumed data.</p>
+
+    </section>
+    <section>
+
+<h3>CONFIG</h3>
+
+<p>In sys.config you should specify kvs backend and list of modules containing <b>metainfo/0</b> exported function.</p>
+
+<figure><code>
+  [{kvs, [{dba, store_mnesia},
+          {schema, [kvs]} ]}].
+
+</code></figure>
+
+    </section>
+    <section>
+
+<p>This module may refer to:
+<a href="http://erlang.org/doc/man/mnesia.html">mnesia</a></b>,
+<a href="kvs.htm"><b>kvs</b></a>.
+</p>
+
+    </section>
+</main>
+
+<footer>
+    2005&mdash;2017 &copy; Synrc Research Center
+</footer>
+
+</body>
+</html>

+ 75 - 58
src/kvs_stream.erl

@@ -2,52 +2,58 @@
 -include("kvs.hrl").
 -include("user.hrl").
 -compile(export_all).
--export([ new/1, top/1, bot/1, take/2, load/1, save/1, seek/2, next/1, prev/1, add/2 ]).
+-export([ new/0, top/1, bot/1, take/2, load/1, save/1, seek/2, next/1, prev/1, add/2, remove/2 ]).
 
 % PUBLIC
 
-new(T)                     -> #cur{feed=kvs:next_id(cur,1),tab=T}.
-top(#cur{top=[]}=C)        -> C#cur{val=[]};
-top(#cur{top=T}=C)         -> seek(T,C).
-bot(#cur{bot=[]}=C)        -> C#cur{val=[]};
-bot(#cur{bot=B}=C)         -> seek(B,C).
-save(#cur{}=C)             -> kvs:put(C), C.
-load(#cur{feed=K})         -> kvs:get(cur,K).
-next(#cur{tab=T,val=[]}=C) -> {error,[]};
-next(#cur{tab=T,val=B}=C)  -> lookup(kvs:get(T,en(B)),C).
-prev(#cur{tab=T,val=[]}=C) -> {error,[]};
-prev(#cur{tab=T,val=B}=C)  -> lookup(kvs:get(T,ep(B)),C).
-take(N,#cur{dir=D}=C)      -> take(D,N,C,[]).
-seek(Id, #cur{tab=T}=C)    -> {ok,R}=kvs:get(T,Id), C#cur{val=R}.
-add(M,#cur{dir=D}=C)       -> add(dir(D),M,C).
-remove(Id, #cur{tab=M}=C)  -> {ok,R}=kvs:get(M,Id), kvs:delete(M,Id),
-                              join([fix(M,X)||X<-[ep(R),en(R)]],C).
+new() -> #cur{id=kvs:next_id(cur,1)}.
+top(#cur{top=[]}=C)   -> C#cur{val=[]};
+top(#cur{top=T}=C)    -> seek(T,C).
+bot(#cur{bot=[]}=C)   -> C#cur{val=[]};
+bot(#cur{bot=B}=C)    -> seek(B,C).
+add(M,#cur{dir=D}=C)  -> add(dir(D),M,C).
+save(#cur{}=C)        -> kvs:put(C), C.
+load(K)               -> kvs:get(cur,K).
+next(#cur{val=[]}=C)  -> {error,[]};
+next(#cur{val=B}=C)   -> lookup(kvs:get(tab(B),en(B)),C).
+prev(#cur{val=[]}=C)  -> {error,[]};
+prev(#cur{val=B}=C)   -> lookup(kvs:get(tab(B),ep(B)),C).
+take(N,#cur{dir=D}=C) -> take(acc(D),N,C,[]).
+
+seek(I,  #cur{val=[]}=C) -> {error,val};
+seek(I,   #cur{val=B}=C) -> {ok,R}=kvs:get(tab(B),I), C#cur{val=R}.
+remove(I,#cur{val=[]}=C) -> {error,val};
+remove(I, #cur{val=B}=C) -> {ok,R}=kvs:get(tab(B),I), kvs:delete(tab(B),I),
+                            join([fix(tab(B),X)||X<-[ep(R),en(R)]],C).
 
 % PRIVATE
 
 add(top,M,#cur{top=T,val=[]}=C) -> Id=id(M), N=sp(sn(M,T),[]), kvs:put(N),     C#cur{val=N,bot=Id,top=Id};
 add(bot,M,#cur{bot=B,val=[]}=C) -> Id=id(M), N=sn(sp(M,B),[]), kvs:put(N),     C#cur{val=N,bot=Id,top=Id};
-add(top,M,#cur{top=T, val=V}=C) when element(2,V) /=T -> add(top, M, top(C));
-add(bot,M,#cur{bot=B, val=V}=C) when element(2,V) /=B -> add(bot, M, bot(C));
+add(top,M,#cur{top=T, val=V}=C) when element(2,V)/=T -> n2o:warning(?MODULE,"cur:~p",[V]), add(top, M, top(C));
+add(bot,M,#cur{bot=B, val=V}=C) when element(2,V)/=B -> n2o:warning(?MODULE,"cur:~p",[V]), add(bot, M, bot(C));
 add(top,M,#cur{top=T, val=V}=C) -> Id=id(M), N=sp(sn(M,T),[]), kvs:put([N,sp(V,Id)]), C#cur{val=N,top=Id};
 add(bot,M,#cur{bot=B, val=V}=C) -> Id=id(M), N=sn(sp(M,B),[]), kvs:put([N,sn(V,Id)]), C#cur{val=N,bot=Id}.
 
-join([[],[]],C) ->                                          C#cur{top=[],bot=[],val=[]};
-join([[], R],C) -> N=sp(R,[]),    kvs:put(N),               C#cur{top=id(N),val=N};
-join([L, []],C) -> N=sn(L,[]),    kvs:put(N),               C#cur{bot=id(N),val=N};
-join([L,  R],C) -> N=sp(R,id(L)), kvs:put([N,sn(L,id(R))]), C#cur{val=N}.
-
-sn(M,T)   -> setelement(#iterator.next, M, T).
-sp(M,T)   -> setelement(#iterator.prev, M, T).
-el(X,T)   -> element(X,T).
-tab(T)    -> element(1,T).
-id(T)     -> element(2,T).
-en(T)     -> element(#iterator.next, T).
-ep(T)     -> element(#iterator.prev, T).
-dir(next) -> top;
-dir(prev) -> bot.
-down(C)   -> C#cur{dir=next}.
-up(C)     -> C#cur{dir=prev}.
+join([[],[]],C) ->                                          C#cur{top=[],bot=[],   val=[]};
+join([[], R],C) -> N=sp(R,[]),    kvs:put(N),               C#cur{top=id(N),       val=N};
+join([L, []],C) -> N=sn(L,[]),    kvs:put(N),               C#cur{       bot=id(N),val=N};
+join([L,  R],C) -> N=sp(R,id(L)), kvs:put([N,sn(L,id(R))]), C#cur{                 val=N}.
+
+sn(M,T) -> setelement(#iter.next, M, T).
+sp(M,T) -> setelement(#iter.prev, M, T).
+el(X,T) -> element(X,T).
+tab(T)  -> element(1,T).
+id(T)   -> element(2,T).
+
+en(T)   -> element(#iter.next, T).
+ep(T)   -> element(#iter.prev, T).
+dir(0)  -> top;
+dir(1)  -> bot.
+acc(0)  -> next;
+acc(1)  -> prev.
+down(C) -> C#cur{dir=0}.
+up(C)   -> C#cur{dir=1}.
 
 fix(M,[])   -> [];
 fix(M,X)    -> fix(kvs:get(M,X)).
@@ -63,38 +69,49 @@ take(A,N,#cur{val=B}=C,R) -> take(A,N-1,?MODULE:A(C),[B|R]).
 
 % TESTS
 
-check() -> test1(), test2(), test3(), ok.
+check() -> test1(), test2(), create_destroy(), ok.
 
 test2() ->
-    Cur = new(user),
-    [A,B,C,D] = [ kvs:next_id(user,1) || _ <- lists:seq(1,4) ],
-    R = save(add(#user{id=A},
-        down(add(#user{id=B},
-          up(add(#user{id=C},
-        down(add(#user{id=D},
+    Cur = new(),
+    [A,B,C,D] = [ kvs:next_id(person,1) || _ <- lists:seq(1,4) ],
+    R = save(add(#person{id=A},
+        down(add(#person{id=B},
+          up(add(#person{id=C},
+        down(add(#person{id=D},
         up(Cur))))))))),
     X = remove(A,remove(B,remove(C,remove(D,R)))),
     [] = take(-1,down(top(X))).
 
-test3() ->
-    Cur = new(user),
-    [A,B,C,D] = [ kvs:next_id(user,1) || _ <- lists:seq(1,4) ],
-    S = save(add(#user{id=A},
-        down(add(#user{id=B},
-          up(add(#user{id=C},
-        down(add(#user{id=D},
-        up(Cur))))))))),
-    Y = remove(B,remove(D,remove(A,remove(C,S)))),
-    [] = take(-1,down(top(Y))).
+create_destroy() ->
+    Cur = new(),
+    [A,B,C,D] = [ kvs:next_id(person,1)
+             || _ <- lists:seq(1,4) ],
+    S  = kvs_stream:save(
+         kvs_stream:add(#person{id=A},
+         kvs_stream:down(
+         kvs_stream:add(#person{id=B},
+         kvs_stream:up(
+         kvs_stream:add(#person{id=C},
+         kvs_stream:down(
+         kvs_stream:add(#person{id=D},
+         kvs_stream:up(
+         kvs_stream:new()))))))))),
+    Y  = kvs_stream:remove(B,
+         kvs_stream:remove(D,
+         kvs_stream:remove(A,
+         kvs_stream:remove(C,S)))),
+    [] = kvs_stream:take(-1,
+         kvs_stream:down(
+         kvs_stream:top(Y))).
 
 test1() ->
-    Cur = new(user),
+    Cur = new(),
     take(-1,down(top(Cur))),
-    [A,B,C,D] = [ kvs:next_id(user,1) || _ <- lists:seq(1,4) ],
-    R = save(add(top,#user{id=A},
-             add(bot,#user{id=B},
-             add(top,#user{id=C},
-             add(bot,#user{id=D}, Cur ))))),
+    [A,B,C,D] = [ kvs:next_id(person,1) || _ <- lists:seq(1,4) ],
+    R = save(add(top,#person{id=A},
+             add(bot,#person{id=B},
+             add(top,#person{id=C},
+             add(bot,#person{id=D}, Cur ))))),
     X = take(-1,down(top(R))),
     Y = take(-1,up(bot(R))),
     X = lists:reverse(Y),

+ 1 - 1
src/kvs_user.erl

@@ -8,7 +8,7 @@
 
 metainfo() ->
     #schema{name=kvs,tables=[
-%        #table{name=user2,container=feed,fields=record_info(fields,user2)},
+        #table{name=person,container=feed,fields=record_info(fields,person)},
         #table{name=group,container=feed,fields=record_info(fields,group)},
         #table{name=cur,container=feed,fields=record_info(fields,cur)},
         #table{name=user,container=feed,fields=record_info(fields,user),keys=[email]}