INTRO
The kvs_stream module stores and retrieves doubly-linked lists through simple key-value access to different databases via the KVS backends: redis, mongo, mnesia, riak, kai, fs. The main descriptor of a list is the cursor, which holds the cached value of one list element, two pointers to the first and last elements, and the default traversal direction. The cursor should be stored in the database; if there is no cursor for some data, that data is not alive yet. Data can be added only at the ends of the list and removed only by record id. The list cannot contain duplicates, or even different records with the same id. Consuming the stream does not delete the data.
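The rules above can be illustrated with a small in-memory model. This is only a sketch: the module name stream_sketch, the map layout, and the functions new/0, add/2 and take_all/1 are hypothetical and are not the kvs_stream implementation. The sketch adds only at the top, which is one of the two list ends.

```erlang
-module(stream_sketch).
-export([new/0, add/2, take_all/1, demo/0]).

%% Hypothetical model, not kvs_stream itself.
%% items maps Id => {Prev, Next}; [] marks a missing link.
new() -> #{top => [], bot => [], items => #{}}.

%% Add Id at the top of the list; an id already present is rejected.
add(Id, #{top := Top, items := Items} = S) ->
    case maps:is_key(Id, Items) of
        true ->
            {error, duplicate};
        false when Top =:= [] ->
            %% First element: it is both top and bottom.
            S#{top := Id, bot := Id, items := Items#{Id => {[], []}}};
        false ->
            %% Link the new element in front of the old top.
            {_, OldNext} = maps:get(Top, Items),
            S#{top := Id,
               items := Items#{Top := {Id, OldNext}, Id => {[], Top}}}
    end.

%% Walk from the top via the next links; nothing is deleted.
take_all(#{top := Top, items := Items}) -> walk(Top, Items).

walk([], _Items) -> [];
walk(Id, Items) ->
    {_Prev, Next} = maps:get(Id, Items),
    [Id | walk(Next, Items)].

demo() ->
    S = lists:foldl(fun add/2, new(), [1, 2, 3]),
    [3, 2, 1] = take_all(S),
    [3, 2, 1] = take_all(S),          % consuming twice gives the same data
    {error, duplicate} = add(2, S),   % same id cannot be added again
    ok.
```

Calling stream_sketch:demo() walks the list twice and gets the same result both times, which mirrors the statement that consuming the stream does not delete data.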
RECORDS
#ok { data= [] :: term() }.
#error { data= [] :: term() }.
#cur { id= [] :: term(),
val= [] :: [] | tuple(),
dir= 0 :: 0 | 1,
top= [] :: [] | integer(),
bot= [] :: [] | integer() }.
#iter { id= [] :: [] | integer(),
prev= [] :: [] | integer(),
next= [] :: [] | integer() }.
Fields of #cur:
- id: Unique key of the cursor
- val: Cached value of the current element of the list
- dir: Traversal direction: 0 starts from top and follows next, 1 starts from bot and follows prev
- top: The top of the list
- bot: The bottom of the list
Fields of #iter:
- id: Unique key of the record in the list
- next: The next element of the list
- prev: The previous element of the list
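The record notation above is shorthand. As a rough sketch, the same fields could be written as ordinary Erlang record declarations like the ones below. The real definitions come from kvs.hrl; the module name records_sketch is hypothetical and exists only to make the fragment compile on its own.

```erlang
-module(records_sketch).
-export([demo/0]).

%% Illustrative declarations only; use kvs.hrl in real code.
-record(cur,  {id  = [] :: term(),
               val = [] :: [] | tuple(),
               dir = 0  :: 0 | 1,
               top = [] :: [] | integer(),
               bot = [] :: [] | integer()}).

-record(iter, {id   = [] :: [] | integer(),
               prev = [] :: [] | integer(),
               next = [] :: [] | integer()}).

demo() ->
    %% A record is a tagged tuple, so a fresh cursor with id 1
    %% has the shape {cur, Id, Val, Dir, Top, Bot}.
    {cur, 1, [], 0, [], []} = #cur{id = 1},
    {iter, 7, [], 6} = #iter{id = 7, next = 6},
    ok.
```

The tuple shape in demo/0 follows directly from the field order id, val, dir, top, bot.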
EXAMPLE
-module(test).
-include_lib("kvs/include/kvs.hrl").
-include_lib("kvs/include/user.hrl").
-compile(export_all).
check() ->
    Cur = kvs_stream:new(),
    [A,B,C,D] = [ kvs:next_id(person,1)
                  || _ <- lists:seq(1,4) ],
    R = kvs_stream:save(
          kvs_stream:add(#person{id=A},
          kvs_stream:add(#person{id=B},
          kvs_stream:add(#person{id=C},
          kvs_stream:add(#person{id=D}, Cur ))))),
    X = kvs_stream:take(-1,
          kvs_stream:down(
          kvs_stream:top(R))),
    Y = kvs_stream:take(-1,
          kvs_stream:up(
          kvs_stream:bot(R))),
    X = lists:reverse(Y),
    L = length(X).
> test:check().
4
> kvs:all(cur).
[{cur,1,{person,1,3,[],[],[],[],[],[]},0,1,2}]
> kvs_stream:take(-1,
kvs_stream:top(kvs_stream:load(1))).
[{person,2,[],4,[],[],[],[],[]},
{person,4,2,3,[],[],[],[],[]},
{person,3,4,1,[],[],[],[],[]},
{person,1,3,[],[],[],[],[],[]}]
API
The steps below form a sequential walkthrough. Enter the Erlang commands in the shell in the given order and check the results.
new() -> #cur{}.
Creates a KVS cursor.
> kvs_stream:new().
{cur,1,[],0,[],[]}
> kvs:get(id_seq,"cur").
{ok,{id_seq,"cur",1}}
> kvs_stream:new().
{cur,2,[],0,[],[]}
> kvs:get(id_seq,"cur").
{ok,{id_seq,"cur",2}}
save(#cur{}) -> #cur{}.
Saves the cursor to the database.
> kvs:all(cur).
[]
> kvs_stream:save(kvs_stream:new()).
{cur,3,[],0,[],[]}
> kvs:all(cur).
[{cur,3,[],0,[],[]}]
load(Id) -> #ok{data::#cur{}} | #error{}.
Loads a cursor from the database.
> kvs_stream:load(3).
{cur,3,[],0,[],[]}
add(Message,#cur{}) -> #cur{}.
Adds a message to the database and updates the cursor to point at the new data. The message is linked through its next and prev fields to the existing data under the cursor. If the cursor doesn't contain a top or bottom value, an additional seek to the end of the list is performed according to the cursor direction.
> rr(kvs_user).
[block,column,container,cur,group,id_seq,iter,iterator,kvs,
log,operation,person,query,schema,table,user]
> kvs_stream:save(
kvs_stream:add(#person{},
kvs_stream:load(3))).
#cur{id = 3,top = 1,bot = 1,dir = 0,
val = #person{id = 1,next = [],prev = [],mail = [],
name = [],pass = [],zone = [],type = []}}
prev(#cur{}) -> #cur{} | #error{}.
Moves the cursor to the previous element, consuming data bottom up. Returns an error if the list is empty, otherwise the previous element or the last one.
> kvs_stream:prev(kvs_stream:load(3)).
{error,not_found}
next(#cur{}) -> #cur{} | #error{}.
Moves the cursor to the next element, consuming data top down. Returns an error if the list is empty, otherwise the next element or the last one.
> kvs_stream:next(
kvs_stream:save(
kvs_stream:add(#person{},
kvs_stream:load(3)))).
#cur{id = 3,top = 2,bot = 1,dir = 0,
val = #person{id = 1,next = [],prev = 2,mail = [],name = [],
pass = [],zone = [],type = []}}
seek(Id,#cur{}) -> #cur{} | #error{}.
Moves the cursor to a record by its id. If the cursor has no cached value, the function returns an error.
> kvs_stream:seek(2,
kvs_stream:save(
kvs_stream:add(#person{id=kvs:next_id(person,1)},
kvs_stream:load(3)))).
#cur{id = 3,top = 3,bot = 1,dir = 0,
val = #person{id = 2,next = 1,prev = 3,mail = [],name = [],
pass = [],zone = [],type = []}}
top(#cur{}) -> #cur{} | #error{}.
Moves cursor to top of the list.
> kvs_stream:top(
kvs_stream:load(3)).
#cur{id = 3,top = 3,bot = 1,dir = 0,
val = #person{id = 3,next = 2,prev = [],mail = [],name = [],
pass = [],zone = [],type = []}}
bot(#cur{}) -> #cur{} | #error{}.
Moves cursor to bottom of the list.
> kvs_stream:bot(kvs_stream:load(3)).
#cur{id = 3,top = 3,bot = 1,dir = 0,
val = #person{id = 1,next = [],prev = 2,mail = [],name = [],
pass = [],zone = [],type = []}}
take(N,#cur{}) -> list().
Tries to consume N records from the stream, starting at the cursor's current value and following its direction, and returns the consumed data. Usually you seek to some position first and then consume some data.
> kvs_stream:take(-1,kvs_stream:load(3)).
[#person{id = 1,next = [],prev = 2, mail = [],name = [],
pass = [],zone = [],type = []},
#person{id = 2,next = 1,prev = 3,mail = [],name = [],
pass = [],zone = [],type = []},
#person{id = 3,next = 2,prev = [],mail = [],name = [],
pass = [],zone = [],type = []}]
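The idea behind take can be sketched over a plain map of links. Everything here is an assumption made for illustration: the module take_sketch, the four-argument take/4, the treatment of a negative N as "no limit", and the order of the returned list are not taken from kvs_stream. The direction values follow the dir field described in RECORDS.

```erlang
-module(take_sketch).
-export([take/4, demo/0]).

%% Hypothetical model: Items maps Id => {Prev, Next}, [] ends the list.
%% Dir 0 follows next links, Dir 1 follows prev links.
take(_N, [], _Dir, _Items) -> [];      % ran off the end of the list
take(0, _Id, _Dir, _Items) -> [];      % consumed the requested amount
take(N, Id, Dir, Items) ->
    {Prev, Next} = maps:get(Id, Items),
    Following = case Dir of 0 -> Next; 1 -> Prev end,
    %% A negative N never reaches 0, so it reads until the list ends.
    [Id | take(N - 1, Following, Dir, Items)].

demo() ->
    Items = #{3 => {[], 2}, 2 => {3, 1}, 1 => {2, []}},
    [3, 2, 1] = take(-1, 3, 0, Items),  % whole list, top down
    [3, 2]    = take(2, 3, 0, Items),   % at most two records
    [1, 2, 3] = take(-1, 1, 1, Items),  % whole list, bottom up
    ok.
```

Starting from a middle element, for example take(-1, 2, 0, Items), reads only the part of the list that lies in the chosen direction, which is why a seek usually comes first.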
remove(Id,#cur{}) -> #cur{} | #error{}.
Removes a record by id from the database and unlinks it from the list. If the cursor has no cached value, the function returns an error. Please do not use remove; keep your data immutable :-)
> kvs_stream:take(-1,
kvs_stream:save(
kvs_stream:remove(2,
kvs_stream:load(3)))).
[#person{id = 1,next = [],prev = 3,mail = [],name = [],
pass = [],zone = [],type = []},
#person{id = 3,next = 1,prev = [],mail = [],name = [],
pass = [],zone = [],type = []}]
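Unlinking a record from a doubly-linked list means pointing its two neighbours at each other. The sketch below shows that step over a plain map of links. The module remove_sketch and its functions are hypothetical and do not touch the database or the cursor, unlike kvs_stream:remove/2.

```erlang
-module(remove_sketch).
-export([remove/2, demo/0]).

%% Hypothetical model: Items maps Id => {Prev, Next}, [] ends the list.
remove(Id, Items) ->
    case maps:find(Id, Items) of
        error ->
            {error, not_found};
        {ok, {Prev, Next}} ->
            Rest = maps:remove(Id, Items),
            %% The previous neighbour now points forward to Next ...
            Rest1 = relink(Prev, fun({P, _}) -> {P, Next} end, Rest),
            %% ... and the next neighbour points back to Prev.
            relink(Next, fun({_, N}) -> {Prev, N} end, Rest1)
    end.

%% Nothing to update when the removed record was at a list end.
relink([], _Fun, Items) -> Items;
relink(Id, Fun, Items) -> maps:update_with(Id, Fun, Items).

demo() ->
    Items = #{3 => {[], 2}, 2 => {3, 1}, 1 => {2, []}},
    Rest = remove(2, Items),
    #{3 := {[], 1}, 1 := {3, []}} = Rest,   % 3 and 1 are now neighbours
    false = maps:is_key(2, Rest),
    {error, not_found} = remove(9, Items),
    ok.
```

The links left after removing id 2 match the shell output above: record 1 has prev = 3 and record 3 has next = 1.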
down(#cur{}) -> #cur{}.
Switches the cursor to downward traversal.
up(#cur{}) -> #cur{}.
Switches the cursor to upward traversal.
CONFIG
In sys.config, specify the KVS backend and the list of modules that export a metainfo/0 function.
[{kvs, [{dba, store_mnesia},
{schema, [kvs]} ]}].