STREAM

INTRO

The kvs_stream is intended to store and retrieve doubly-linked lists using simple key-value access to different databases through its backends: redis, mongo, mnesia, riak, kai, fs. The main descriptor of the list is the cursor which holds the cached value of one of list elements and also two pointers to first and last elements along with default traversal direction. Cursor should be stored in databse, if there is no cursor for some data then this data is not alive yet. The data could be added only from list ends. The data in list could be removed by record id. The list could not contain duplicates or even records with the same id. When you consume the stream, the data is not deleted, you need to remove it manually.

RECORDS

KVS CORE
#ok { data= [] :: term() }. #error { data= [] :: term() }. #cur { id= [] :: term(), val= [] :: [] | tuple(), dir= 0 :: 0 | 1, top= [] :: [] | integer(), bot= [] :: [] | integer()}). #iter { id= [] :: [] | integer(), prev= [] :: [] | integer(), next= [] :: [] | integer()).

EXAMPLE

create_destroy() -> Cur = new(), [A,B,C,D] = [ kvs:next_id(person,1) || _ <- lists:seq(1,4) ], S = kvs_stream:save( kvs_stream:add(#person{id=A}, kvs_stream:down( kvs_stream:add(#person{id=B}, kvs_stream:up( kvs_stream:add(#person{id=C}, kvs_stream:down( kvs_stream:add(#person{id=D}, kvs_stream:up( kvs_stream:new()))))))))), Y = kvs_stream:remove(B, kvs_stream:remove(D, kvs_stream:remove(A, kvs_stream:remove(C,S)))), [] = kvs_stream:take(-1, kvs_stream:down( kvs_stream:top(Y))). > kvs_stream:check(). ok > kvs:all(cur). [{cur,8,[],0,[],[]}, {cur,4,[],0,[],[]}, {cur,9,{person,21,23,[],[],[],[],[],[]},0,21,22}, {cur,6,{person,13,15,[],[],[],[],[],[]},0,13,14}, {cur,5,{person,9,11,[],[],[],[],[],[]},0,9,10}] > kvs_stream:take(-1, kvs_stream:down( kvs_stream:top( kvs_stream:seek(24, kvs:get(cur,9))))). [{person,22,[],24,[],[],[],[],[]}, {person,24,22,23,[],[],[],[],[]}, {person,23,24,21,[],[],[],[],[]}, {person,21,23,[],[],[],[],[],[]}]

API

new() -> #cur{}.

Creates a KVS cursor.

save(#cur{}) -> #cur{}.

Saves cursor to database.

load() -> #ok{data::#cur{}} | #error{}.

Gets a curson from database.

next(#cur{}) -> #cur{}.

Moves cursor to next. Consume data top down.

prev(#cur{}) -> #cur{}.

Moves cursor to prev. Consume data bottom up.

seek(Id,#cur{}) -> #cur{} | #error{}.

Moves cursor to record by its id. If cursor has no cached value then function returns error.

top(#cur{}) -> #cur{}.

Moves cursor to top of the list.

bot(#cur{}) -> #cur{}.

Moves cursor to bottom of the list.

add(Message,#cur{}) -> #cur{}.

Adds message to datatabase and update cursor to new data. Message is linked on next prev fields with existed data under cursor. If cursor doesn't contain top or bottom value the additional seek to the end is performed according to cursor direction.

remove(Id,#cur{}) -> #cur{} | #error{}.

Removes record by id from database and unlink it from list. If cursor has no cached value then function returns error.

take(N,#cur{}) -> list().

Trying to consume N records from stream using its current value and direction. Returns consumed data.

CONFIG

In sys.config you should specify kvs backend and list of modules containing metainfo/0 exported function.

[{kvs, [{dba, store_mnesia}, {schema, [kvs]} ]}].

This module may refer to: mnesia, kvs.