Browse Source

Merge pull request #52 from synrc/b1

api cleanup. tests
Andrii Zadorozhnii 4 years ago
parent
commit
e6a07a702f

+ 1 - 1
.github/workflows/elixir.yml

@@ -17,4 +17,4 @@ jobs:
     - name: Compilation
     - name: Compilation
       run: mix compile
       run: mix compile
     - name: Tests
     - name: Tests
-      run: mix test test/test_helper.exs
+      run: mix test

+ 4 - 0
.gitignore

@@ -3,3 +3,7 @@ ebin
 *.gz
 *.gz
 *.dump
 *.dump
 .applist
 .applist
+rocksdb/
+deps/
+_build/
+*.lock

+ 2 - 1
include/api.hrl

@@ -1,6 +1,6 @@
 -ifndef(API_HRL).
 -ifndef(API_HRL).
 -define(API_HRL, true).
 -define(API_HRL, true).
--define(API,[start/0,stop/0,leave/0,leave/1,
+-define(API,[start/0,stop/0,leave/0,leave/1,destroy/0,
              join/0,join/1,modules/0,cursors/0,get/2,get/3,put/1,put/2,index/3,delete/2,
              join/0,join/1,modules/0,cursors/0,get/2,get/3,put/1,put/2,index/3,delete/2,
              table/1,tables/0,dir/0,initialize/2,seq/2,all/1,all/2,count/1,ver/0]).
              table/1,tables/0,dir/0,initialize/2,seq/2,all/1,all/2,count/1,ver/0]).
 -include("metainfo.hrl").
 -include("metainfo.hrl").
@@ -9,6 +9,7 @@
 -spec dir() -> list({'table',atom()}).
 -spec dir() -> list({'table',atom()}).
 -spec ver() -> {'version',string()}.
 -spec ver() -> {'version',string()}.
 -spec leave() -> ok.
 -spec leave() -> ok.
+-spec destroy() -> ok.
 -spec join() -> ok | {error,any()}.
 -spec join() -> ok | {error,any()}.
 -spec join(Node :: string()) -> [{atom(),any()}].
 -spec join(Node :: string()) -> [{atom(),any()}].
 -spec modules() -> list(atom()).
 -spec modules() -> list(atom()).

+ 1 - 1
include/backend.hrl

@@ -1,7 +1,7 @@
 -ifndef(BACKEND_HRL).
 -ifndef(BACKEND_HRL).
 -define(BACKEND_HRL, true).
 -define(BACKEND_HRL, true).
 -define(BACKEND, [get/2,put/1,delete/2,index/3,dump/0,start/0,stop/0,destroy/0,
 -define(BACKEND, [get/2,put/1,delete/2,index/3,dump/0,start/0,stop/0,destroy/0,
-                  join/1,dir/0,create_table/2,add_table_index/2,seq/2,all/1,count/1,version/0]).
+                  join/1,leave/0,dir/0,create_table/2,add_table_index/2,seq/2,all/1,count/1,version/0]).
 -compile({no_auto_import,[get/1,put/2]}).
 -compile({no_auto_import,[get/1,put/2]}).
 -spec put(tuple() | list(tuple())) -> ok | {error,any()}.
 -spec put(tuple() | list(tuple())) -> ok | {error,any()}.
 -spec get(term() | any(), any()) -> {ok,any()} | {error,not_found}.
 -spec get(term() | any(), any()) -> {ok,any()} | {error,not_found}.

+ 4 - 4
include/cursors.hrl

@@ -3,13 +3,13 @@
 
 
 -record(writer, { id    = [] :: term(),
 -record(writer, { id    = [] :: term(),
                   count =  0 :: integer(),
                   count =  0 :: integer(),
-                  cache = [] :: [] | integer() | {term(),term()},
-                  args  = [] :: term(),
+                  cache = [] :: [] | {term(),term()} | {term(),term(),term()},
+                  args  = [] :: [] | term(),
                   first = [] :: [] | tuple() } ).
                   first = [] :: [] | tuple() } ).
 -record(reader, { id    = [] :: [] | integer(),
 -record(reader, { id    = [] :: [] | integer(),
                 pos   =  0 :: integer() | atom(),
                 pos   =  0 :: integer() | atom(),
-                cache = [] :: [] | integer() | {term(),term()},
-                args  = 0 :: term(),
+                cache = [] :: [] | {term(),term()} | {term(),term(),term()},
+                args  = [] :: [] | integer() | term(),
                 feed  = [] :: term(),
                 feed  = [] :: term(),
                 seek = [] :: term(),
                 seek = [] :: term(),
                 count = 0 :: integer(),
                 count = 0 :: integer(),

+ 7 - 5
include/stream.hrl

@@ -2,14 +2,16 @@
 -define(STREAM_HRL, true).
 -define(STREAM_HRL, true).
 -include("kvs.hrl").
 -include("kvs.hrl").
 -include("cursors.hrl").
 -include("cursors.hrl").
--define(STREAM, [top/1, bot/1, next/1, prev/1, drop/1, take/1, append/2, cut/2,
+-define(STREAM, [top/1, bot/1, next/1, prev/1, drop/1, take/1, append/2, cut/2, feed/1,
                  load_reader/1, writer/1, reader/1, save/1, add/1, remove/2]).
                  load_reader/1, writer/1, reader/1, save/1, add/1, remove/2]).
--spec top(#reader{}) -> #reader{}.
--spec bot(#reader{}) -> #reader{}.
--spec next(#reader{}) -> #reader{} | {error,not_found | empty}.
--spec prev(#reader{}) -> #reader{} | {error,not_found | empty}.
+
+-spec top(#reader{})  -> #reader{}.
+-spec bot(#reader{})  -> #reader{}.
+-spec next(#reader{}) -> #reader{}.
+-spec prev(#reader{}) -> #reader{}.
 -spec drop(#reader{}) -> #reader{}.
 -spec drop(#reader{}) -> #reader{}.
 -spec take(#reader{}) -> #reader{}.
 -spec take(#reader{}) -> #reader{}.
+-spec feed (term())   -> #reader{}.
 -spec load_reader (term()) -> #reader{}.
 -spec load_reader (term()) -> #reader{}.
 -spec writer (term()) -> #writer{}.
 -spec writer (term()) -> #writer{}.
 -spec reader (term()) -> #reader{}.
 -spec reader (term()) -> #reader{}.

+ 4 - 2
src/kvs.erl

@@ -54,6 +54,7 @@ join()             -> join    ([],    #kvs{mod=dba()}).
 dump()             -> dump    (#kvs{mod=dba()}).
 dump()             -> dump    (#kvs{mod=dba()}).
 join(Node)         -> join    (Node,  #kvs{mod=dba()}).
 join(Node)         -> join    (Node,  #kvs{mod=dba()}).
 leave()            -> leave   (#kvs{mod=dba()}).
 leave()            -> leave   (#kvs{mod=dba()}).
+destroy()          -> destroy (#kvs{mod=dba()}).
 count(Table)       -> count   (Table, #kvs{mod=dba()}).
 count(Table)       -> count   (Table, #kvs{mod=dba()}).
 put(Record)        -> ?MODULE:put     (Record, #kvs{mod=dba()}).
 put(Record)        -> ?MODULE:put     (Record, #kvs{mod=dba()}).
 fold(Fun,Acc,T,S,C,D) -> fold (Fun,Acc,T,S,C,D, #kvs{mod=dba()}).
 fold(Fun,Acc,T,S,C,D) -> fold (Fun,Acc,T,S,C,D, #kvs{mod=dba()}).
@@ -100,6 +101,7 @@ start(#kvs{mod=DBA}) -> DBA:start().
 stop_kvs(#kvs{mod=DBA}) -> DBA:stop().
 stop_kvs(#kvs{mod=DBA}) -> DBA:stop().
 join(Node,#kvs{mod=DBA}) -> DBA:join(Node).
 join(Node,#kvs{mod=DBA}) -> DBA:join(Node).
 leave(#kvs{mod=DBA}) -> DBA:leave().
 leave(#kvs{mod=DBA}) -> DBA:leave().
+destroy(#kvs{mod=DBA}) -> DBA:destroy().
 ver(#kvs{mod=DBA}) -> DBA:version().
 ver(#kvs{mod=DBA}) -> DBA:version().
 tables() -> lists:flatten([ (M:metainfo())#schema.tables || M <- modules() ]).
 tables() -> lists:flatten([ (M:metainfo())#schema.tables || M <- modules() ]).
 table(Name) when is_atom(Name) -> lists:keyfind(Name,#table.name,tables());
 table(Name) when is_atom(Name) -> lists:keyfind(Name,#table.name,tables());
@@ -138,9 +140,9 @@ count(Tab,#kvs{mod=DBA}) -> DBA:count(Tab).
 index(Tab, Key, Value,#kvs{mod=DBA}) -> DBA:index(Tab, Key, Value).
 index(Tab, Key, Value,#kvs{mod=DBA}) -> DBA:index(Tab, Key, Value).
 seq(Tab, Incr,#kvs{mod=DBA}) -> DBA:seq(Tab, Incr).
 seq(Tab, Incr,#kvs{mod=DBA}) -> DBA:seq(Tab, Incr).
 dump(#kvs{mod=Mod}) -> Mod:dump().
 dump(#kvs{mod=Mod}) -> Mod:dump().
-feed(Key,#kvs{st=Mod}=KVS) -> (Mod:take((kvs:reader(Key))#reader{args=-1}))#reader.args.
+feed(Tab,#kvs{st=Mod}) -> Mod:feed(Tab).
 remove(Rec,Feed) -> remove(Rec,Feed,#kvs{mod=dba(),st=kvs_stream()}).
 remove(Rec,Feed) -> remove(Rec,Feed,#kvs{mod=dba(),st=kvs_stream()}).
-remove(Rec,Feed, #kvs{st=Mod}=KVS) -> Mod:remove(Rec,Feed).
+remove(Rec,Feed, #kvs{st=Mod}) -> Mod:remove(Rec,Feed).
 head(Key) -> case (kvs:take((kvs:reader(Key))#reader{args=1}))#reader.args of [X] -> X; [] -> [] end.
 head(Key) -> case (kvs:take((kvs:reader(Key))#reader{args=1}))#reader.args of [X] -> X; [] -> [] end.
 head(Key,Count) -> (kvs:take((kvs:reader(Key))#reader{args=Count,dir=1}))#reader.args.
 head(Key,Count) -> (kvs:take((kvs:reader(Key))#reader{args=Count,dir=1}))#reader.args.
 fetch(Table, Key) -> fetch(Table, Key, []).
 fetch(Table, Key) -> fetch(Table, Key, []).

+ 1 - 0
src/layers/kvs_stream.erl

@@ -50,6 +50,7 @@ w({ok,#writer{cache=B,count=Size}},top,C) -> C#reader{cache={tab(B),id(B)},pos=S
 w({error,X},_,_)                          -> {error,X}.
 w({error,X},_,_)                          -> {error,X}.
 
 
 % section: take, drop
 % section: take, drop
+feed(Feed) -> take((reader(Feed))#reader{args=-1}).
 
 
 drop(#reader{cache=[]}=C) -> C#reader{args=[]};
 drop(#reader{cache=[]}=C) -> C#reader{args=[]};
 drop(#reader{dir=D,cache=B,args=N,pos=P}=C)  -> drop(acc(D),N,C,C,P,B).
 drop(#reader{dir=D,cache=B,args=N,pos=P}=C)  -> drop(acc(D),N,C,C,P,B).

+ 1 - 0
src/stores/kvs_fs.erl

@@ -8,6 +8,7 @@ start()    -> ok.
 stop()     -> ok.
 stop()     -> ok.
 destroy()  -> ok.
 destroy()  -> ok.
 version()  -> {version,"KVS FS"}.
 version()  -> {version,"KVS FS"}.
+leave()    -> ok.
 dir()      -> [ {table,F} || F <- filelib:wildcard("data/*"), filelib:is_dir(F) ].
 dir()      -> [ {table,F} || F <- filelib:wildcard("data/*"), filelib:is_dir(F) ].
 join(_Node) -> filelib:ensure_dir("data/"), initialize(). % should be rsync or smth
 join(_Node) -> filelib:ensure_dir("data/"), initialize(). % should be rsync or smth
 initialize() ->
 initialize() ->

+ 2 - 1
src/stores/kvs_mnesia.erl

@@ -9,6 +9,7 @@
 start()    -> mnesia:start().
 start()    -> mnesia:start().
 stop()     -> mnesia:stop().
 stop()     -> mnesia:stop().
 destroy()  -> [mnesia:delete_table(T)||{_,T}<-kvs:dir()], mnesia:delete_schema([node()]), ok.
 destroy()  -> [mnesia:delete_table(T)||{_,T}<-kvs:dir()], mnesia:delete_schema([node()]), ok.
+leave()    -> ok.
 version()  -> {version,"KVS MNESIA"}.
 version()  -> {version,"KVS MNESIA"}.
 dir()      -> [{table,T}||T<-mnesia:system_info(local_tables)].
 dir()      -> [{table,T}||T<-mnesia:system_info(local_tables)].
 join([])   -> mnesia:start(), mnesia:change_table_copy_type(schema, node(), disc_copies), initialize();
 join([])   -> mnesia:start(), mnesia:change_table_copy_type(schema, node(), disc_copies), initialize();
@@ -41,7 +42,7 @@ count(RecordName) -> mnesia:table_info(RecordName, size).
 all(R) -> lists:flatten(many(fun() -> L= mnesia:all_keys(R), [ mnesia:read({R, G}) || G <- L ] end)).
 all(R) -> lists:flatten(many(fun() -> L= mnesia:all_keys(R), [ mnesia:read({R, G}) || G <- L ] end)).
 seq([],[]) ->
 seq([],[]) ->
   case os:type() of
   case os:type() of
-       {win32,nt} -> {Mega,Sec,Micro} = erlang:now(), integer_to_list((Mega*1000000+Sec)*1000000+Micro);
+       {win32,nt} -> {Mega,Sec,Micro} = erlang:timestamp(), integer_to_list((Mega*1000000+Sec)*1000000+Micro);
                 _ -> erlang:integer_to_list(element(2,hd(lists:reverse(erlang:system_info(os_monotonic_time_source)))))
                 _ -> erlang:integer_to_list(element(2,hd(lists:reverse(erlang:system_info(os_monotonic_time_source)))))
   end;
   end;
 seq(RecordName, Incr) -> mnesia:dirty_update_counter({id_seq, RecordName}, Incr).
 seq(RecordName, Incr) -> mnesia:dirty_update_counter({id_seq, RecordName}, Incr).

+ 3 - 3
src/stores/kvs_rocks.erl

@@ -10,10 +10,10 @@ bt([])     -> [];
 bt(X)      -> binary_to_term(X).
 bt(X)      -> binary_to_term(X).
 start()    -> ok.
 start()    -> ok.
 stop()     -> ok.
 stop()     -> ok.
-destroy()  -> ok.
+destroy()  -> rocksdb:destroy(application:get_env(kvs,rocks_name,"rocksdb"), []).
 version()  -> {version,"KVS ROCKSDB"}.
 version()  -> {version,"KVS ROCKSDB"}.
 dir()      -> [].
 dir()      -> [].
-leave() -> case ref() of [] -> skip; X -> rocksdb:close(X) end.
+leave() -> case ref() of [] -> skip; X -> rocksdb:close(X), application:set_env(kvs,rocks_ref,[]), ok end.
 join(_) -> application:start(rocksdb),
 join(_) -> application:start(rocksdb),
            leave(), {ok, Ref} = rocksdb:open(application:get_env(kvs,rocks_name,"rocksdb"), [{create_if_missing, true}]),
            leave(), {ok, Ref} = rocksdb:open(application:get_env(kvs,rocks_name,"rocksdb"), [{create_if_missing, true}]),
            initialize(),
            initialize(),
@@ -91,7 +91,7 @@ cut(_,_,_,_,_,_,_,C) -> C.
 
 
 seq(_,_) ->
 seq(_,_) ->
   case os:type() of
   case os:type() of
-       {win32,nt} -> {Mega,Sec,Micro} = erlang:now(), integer_to_list((Mega*1000000+Sec)*1000000+Micro);
+       {win32,nt} -> {Mega,Sec,Micro} = erlang:timestamp(), integer_to_list((Mega*1000000+Sec)*1000000+Micro);
                 _ -> erlang:integer_to_list(element(2,hd(lists:reverse(erlang:system_info(os_monotonic_time_source)))))
                 _ -> erlang:integer_to_list(element(2,hd(lists:reverse(erlang:system_info(os_monotonic_time_source)))))
   end.
   end.
 
 

+ 1 - 0
src/stores/kvs_st.erl

@@ -18,6 +18,7 @@ si(M,T) -> se(#it.id, M, T).
 id(T) -> e(#it.id, T).
 id(T) -> e(#it.id, T).
 
 
 % section: next, prev
 % section: next, prev
+feed(Feed) -> take((reader(Feed))#reader{args=-1}).
 
 
 top(#reader{}=C) -> C#reader{dir=1}.
 top(#reader{}=C) -> C#reader{dir=1}.
 bot(#reader{}=C) -> C#reader{dir=0}.
 bot(#reader{}=C) -> C#reader{dir=0}.

+ 327 - 0
test/old_test.exs

@@ -0,0 +1,327 @@
+ExUnit.start()
+
+defmodule OLD.Test do
+  use ExUnit.Case, async: true
+  require KVS
+
+  setup do: (on_exit(fn -> :ok = :kvs.leave();:ok = :kvs.destroy() end);:kvs.join())
+
+  test "basic" do
+    id1 = {:basic, :kvs.seq([], [])}
+    id2 = {:basic, :kvs.seq([], [])}
+    x = 5
+    :kvs.save(:kvs.writer(id1))
+    :kvs.save(:kvs.writer(id2))
+
+    :lists.map(
+      fn _ ->
+        :kvs.save(:kvs.add(KVS.writer(:kvs.writer(id1), args: {:"$msg", [], [], [], [], []})))
+      end,
+      :lists.seq(1, x)
+    )
+
+    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id2) end, :lists.seq(1, x))
+    r1 = :kvs.save(:kvs.reader(id1))
+    r2 = :kvs.save(:kvs.reader(id2))
+    x1 = :kvs.take(KVS.reader(:kvs.load_reader(KVS.reader(r1, :id)), args: 20))
+    x2 = :kvs.take(KVS.reader(:kvs.load_reader(KVS.reader(r2, :id)), args: 20))
+    b = KVS.reader(:kvs.feed(id1), :args)
+
+    case :application.get_env(:kvs, :dba_st, :kvs_st) do
+      :kvs_st ->
+        c = :kvs.all(id2)
+        assert :lists.reverse(c) == KVS.reader(x2, :args)
+
+      _ ->
+        # mnesia doesn't support `all` over feeds (only for tables)
+        []
+    end
+
+    assert KVS.reader(x1, :args) == b
+
+    assert length(KVS.reader(x1, :args)) == length(KVS.reader(x2, :args))
+    assert x == length(b)
+  end
+
+  test "sym" do
+    id = {:sym, :kvs.seq([], [])}
+    :kvs.save(:kvs.writer(id))
+    x = 5
+
+    :lists.map(
+      fn
+        z ->
+          :kvs.remove(KVS.writer(z, :cache), id)
+      end, :lists.map(
+        fn _ ->
+          :kvs.save(:kvs.add(KVS.writer(:kvs.writer(id), args: {:"$msg", [], [], [], [], []})))
+        end,
+        :lists.seq(1, x)
+      )
+    )
+
+    {:ok, KVS.writer(count: 0)} = :kvs.get(:writer, id)
+  end
+
+  test "take" do
+    id = {:partial, :kvs.seq([], [])}
+    x = 5
+    :kvs.save(:kvs.writer(id))
+    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
+    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
+    t = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 20))
+    b = KVS.reader(:kvs.feed(id), :args)
+    #: mnesia
+    assert KVS.reader(t, :args) == b
+  end
+
+  test "take back full" do
+    log(:st, "take back full")
+    id = {:partial, :kvs.seq([], [])}
+    x = 5
+    :kvs.save(:kvs.writer(id))
+    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
+    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
+    t = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 5))
+    :kvs.save(KVS.reader(t, dir: 1))
+    log("t:", t)
+    n = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 5))
+    b = KVS.reader(:kvs.feed(id), :args)
+    log("n:", n)
+    assert KVS.reader(n, :args) == KVS.reader(t, :args)
+    assert KVS.reader(t, :args) == b
+    log(:end, "take back full")
+  end
+
+  test "partial take back" do
+    id = {:partial, :kvs.seq([], [])}
+    x = 3
+    p = 2
+    :kvs.save(:kvs.writer(id))
+    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
+    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
+    t = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    :kvs.save(KVS.reader(t, dir: 1))
+    n = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p + 1))
+    assert KVS.reader(t, :args) == tl(KVS.reader(n, :args))
+  end
+
+  test "partial full bidirectional" do
+    log(:st, "partial full bidirectional")
+    id = {:partial, :kvs.seq([], [])}
+    x = 5
+    p =2
+    :kvs.save(:kvs.writer(id))
+    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
+    r = :kvs.save(:kvs.reader(id))
+    rid = KVS.reader(r, :id)
+    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
+    z1 = KVS.reader(t1, :args)
+    IO.inspect :kvs.all(id)
+    r = :kvs.save(t1)
+    log("next t1:", t1)
+
+    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z2 = KVS.reader(t2, :args)
+    r = :kvs.save(t2)
+    log("next t2:", t2)
+
+    t3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z3 = KVS.reader(t3, :args)
+    :kvs.save(KVS.reader(t3, dir: 1, pos: 0))
+    log("next t3:", t3)
+
+    n1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    nz1 = KVS.reader(n1, :args)
+    :kvs.save n1
+    log("prev n1:", n1)
+
+    n2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    nz2 = KVS.reader(n2, :args)
+    :kvs.save n2
+    log("prev n2:", n2)
+
+    n3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    nz3 = KVS.reader(n3, :args)
+    log("prev n3:", n3)
+
+    assert z3 ++ z2 ++ z1 == nz1 ++ nz2 ++ nz3
+    log(:end, "partial full bidirectional")
+  end
+
+  test "test bidirectional (new)" do
+    log(:st, "test bidirectional (new)")
+    id = {:partial, :kvs.seq([], [])}
+    x = 6
+    p = 3
+    :kvs.save(:kvs.writer(id))
+    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
+    r = :kvs.save(:kvs.reader(id))
+    rid = KVS.reader(r, :id)
+    IO.inspect :kvs.all(id)
+
+    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
+    z1 = KVS.reader(t1, :args)
+    r = :kvs.save(t1)
+    log("next t1:", t1)
+
+    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z2 = KVS.reader(t2, :args)
+    r = :kvs.save(t2)
+    log("next t2:", t2)
+
+    t3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z3 = KVS.reader(t3, :args)
+    :kvs.save(KVS.reader(t3, dir: 1, pos: 0))
+    log("next t3:", t3)
+
+    assert z3 == []
+
+    n1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    nz1 = KVS.reader(n1, :args)
+    :kvs.save n1
+    log("prev n1:", n1)
+
+    n2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    nz2 = KVS.reader(n2, :args)
+    :kvs.save n2
+    log("prev n2:", n2)
+
+    n3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    nz3 = KVS.reader(n3, :args)
+    :kvs.save(KVS.reader(n3, dir: 0))
+    log("prev n3:", n3)
+
+    assert nz3 == []
+
+    t4 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
+    z4 = KVS.reader(t4, :args)
+    r = :kvs.save(t4)
+    log("next t4:", t4)
+
+    assert length(z4) == p
+    log(:end, "test bidirectional (new)")
+  end
+
+  test "partial take forward full" do
+    log(:st, "partial take forward full")
+    id = {:partial, :kvs.seq([], [])}
+    x = 7
+    :kvs.save(:kvs.writer(id))
+    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
+    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
+    p = 3
+    IO.inspect :kvs.all id
+
+    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z1 = KVS.reader(t1, :args)
+    :kvs.save(t1)
+    log("next t1:", t1)
+
+    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z2 = KVS.reader(t2, :args)
+    :kvs.save(t2)
+    log("next t2:", t2)
+
+    t3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z3 = KVS.reader(t3, :args)
+    :kvs.save(t3)
+    log("next t3:", t3)
+
+    assert length(z3) == 1
+    assert :lists.reverse(z1) ++ :lists.reverse(z2) ++ z3 == :kvs.all(id)
+    log(:end, "partial take forward full")
+  end
+
+  test "take with empy" do
+    log(:st, "take with empy")
+    id = {:partial, :kvs.seq([], [])}
+    x = 6
+    p = 3
+    :kvs.save(:kvs.writer(id))
+    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
+    r = :kvs.save(:kvs.reader(id))
+    IO.inspect :kvs.all(id)
+    rid = KVS.reader(r, :id)
+    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
+    z1 = KVS.reader(t1, :args)
+    r = :kvs.save(t1)
+    log("next t1:", t1)
+
+    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z2 = KVS.reader(t2, :args)
+    r = :kvs.save(t2)
+    log("next t2:", t2)
+
+    t3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z3 = KVS.reader(t3, :args)
+    r = :kvs.save(t3)
+    log("next t3:", t3)
+    assert  z3 == []
+
+    KVS.reader(id: tid) = :kvs.save(KVS.reader(t3, dir: 1, pos: 0))
+    n1 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
+    nz1 = KVS.reader(n1, :args)
+    :kvs.save n1
+    log("prev b1:", n1)
+
+    n2 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
+    nz2 = KVS.reader(n2, :args)
+    :kvs.save n2
+    log("prev b2:", n2)
+
+    assert z2 ++ z1 == nz1 ++ nz2
+    log(:end, "take with empy")
+  end
+
+  test "test prev" do
+    log(:st, "test prev")
+    id = {:partial, :kvs.seq([], [])}
+    x = 6
+    p = 3
+    :kvs.save(:kvs.writer(id))
+    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
+    r = :kvs.save(:kvs.reader(id))
+    rid = KVS.reader(r, :id)
+    IO.inspect :kvs.all(id)
+
+    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
+    z1 = KVS.reader(t1, :args)
+    r = :kvs.save(t1)
+    log("next z1:", z1)
+
+    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
+    z2 = KVS.reader(t2, :args)
+
+    KVS.reader(id: tid) = :kvs.save(KVS.reader(t2, dir: 1, pos: 0))
+    log("next z2:", z2)
+
+    n1 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
+    nz1 = tl(:lists.reverse(KVS.reader(n1, :args)))
+    :kvs.save(n1)
+    log("prev nz1:", nz1)
+
+    n2 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
+    nz2 = KVS.reader(n2, :args)
+    :kvs.save n2
+    log("prev n2:", n2)
+
+    assert length(nz2) == p
+    assert nz2 == z1
+
+    n3 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
+    nz3 = KVS.reader(n3, :args)
+    :kvs.save(KVS.reader(n3, dir: 0))
+    log("prev nz3:", nz3)
+
+    assert nz3 = []
+
+    log(:end, "test prev")
+
+  end
+
+  def log(x,cursor) do
+     IO.inspect {x,cursor}
+  end
+end
+

+ 0 - 324
test/test_helper.exs

@@ -1,325 +1 @@
-require KVS
-:kvs.join()
 ExUnit.start()
 ExUnit.start()
-
-defmodule BPE.Test do
-  use ExUnit.Case, async: true
-
-  test "basic" do
-    id1 = {:basic, :kvs.seq([], [])}
-    id2 = {:basic, :kvs.seq([], [])}
-    x = 5
-    :kvs.save(:kvs.writer(id1))
-    :kvs.save(:kvs.writer(id2))
-
-    :lists.map(
-      fn _ ->
-        :kvs.save(:kvs.add(KVS.writer(:kvs.writer(id1), args: {:"$msg", [], [], [], [], []})))
-      end,
-      :lists.seq(1, x)
-    )
-
-    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id2) end, :lists.seq(1, x))
-    r1 = :kvs.save(:kvs.reader(id1))
-    r2 = :kvs.save(:kvs.reader(id2))
-    x1 = :kvs.take(KVS.reader(:kvs.load_reader(KVS.reader(r1, :id)), args: 20))
-    x2 = :kvs.take(KVS.reader(:kvs.load_reader(KVS.reader(r2, :id)), args: 20))
-    b = :kvs.feed(id1)
-
-    case :application.get_env(:kvs, :dba_st, :kvs_st) do
-      :kvs_st ->
-        c = :kvs.all(id2)
-        assert :lists.reverse(c) == KVS.reader(x2, :args)
-
-      _ ->
-        # mnesia doesn't support `all` over feeds (only for tables)
-        []
-    end
-
-    assert KVS.reader(x1, :args) == b
-    assert length(KVS.reader(x1, :args)) == length(KVS.reader(x2, :args))
-    assert x == length(b)
-  end
-
-  test "sym" do
-    id = {:sym, :kvs.seq([], [])}
-    :kvs.save(:kvs.writer(id))
-    x = 5
-
-    :lists.map(
-      fn
-        z ->
-          :kvs.remove(KVS.writer(z, :cache), id)
-      end, :lists.map(
-        fn _ ->
-          :kvs.save(:kvs.add(KVS.writer(:kvs.writer(id), args: {:"$msg", [], [], [], [], []})))
-        end,
-        :lists.seq(1, x)
-      )
-    )
-
-    {:ok, KVS.writer(count: 0)} = :kvs.get(:writer, id)
-  end
-
-  test "take" do
-    id = {:partial, :kvs.seq([], [])}
-    x = 5
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
-    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
-    t = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 20))
-    b = :kvs.feed(id)
-    # mnesia
-    assert KVS.reader(t, :args) == b
-  end
-
-  test "take back full" do
-    log(:st, "take back full")
-    id = {:partial, :kvs.seq([], [])}
-    x = 5
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
-    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
-    t = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 5))
-    :kvs.save(KVS.reader(t, dir: 1))
-    log("t:", t)
-    n = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 5))
-    b = :kvs.feed(id)
-    log("n:", n)
-    assert KVS.reader(n, :args) == KVS.reader(t, :args)
-    assert KVS.reader(t, :args) == b
-    log(:end, "take back full")
-  end
-
-  test "partial take back" do
-    id = {:partial, :kvs.seq([], [])}
-    x = 3
-    p = 2
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
-    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
-    t = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    :kvs.save(KVS.reader(t, dir: 1))
-    n = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p + 1))
-    assert KVS.reader(t, :args) == tl(KVS.reader(n, :args))
-  end
-
-  test "partial full bidirectional" do
-    log(:st, "partial full bidirectional")
-    id = {:partial, :kvs.seq([], [])}
-    x = 5
-    p =2
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
-    r = :kvs.save(:kvs.reader(id))
-    rid = KVS.reader(r, :id)
-    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
-    z1 = KVS.reader(t1, :args)
-    IO.inspect :kvs.all(id)
-    r = :kvs.save(t1)
-    log("next t1:", t1)
-
-    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z2 = KVS.reader(t2, :args)
-    r = :kvs.save(t2)
-    log("next t2:", t2)
-
-    t3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z3 = KVS.reader(t3, :args)
-    :kvs.save(KVS.reader(t3, dir: 1, pos: 0))
-    log("next t3:", t3)
-
-    n1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    nz1 = KVS.reader(n1, :args)
-    :kvs.save n1
-    log("prev n1:", n1)
-
-    n2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    nz2 = KVS.reader(n2, :args)
-    :kvs.save n2
-    log("prev n2:", n2)
-
-    n3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    nz3 = KVS.reader(n3, :args)
-    log("prev n3:", n3)
-
-    assert z3 ++ z2 ++ z1 == nz1 ++ nz2 ++ nz3
-    log(:end, "partial full bidirectional")
-  end
-
-  test "test bidirectional (new)" do
-    log(:st, "test bidirectional (new)")
-    id = {:partial, :kvs.seq([], [])}
-    x = 6
-    p = 3
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
-    r = :kvs.save(:kvs.reader(id))
-    rid = KVS.reader(r, :id)
-    IO.inspect :kvs.all(id)
-
-    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
-    z1 = KVS.reader(t1, :args)
-    r = :kvs.save(t1)
-    log("next t1:", t1)
-
-    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z2 = KVS.reader(t2, :args)
-    r = :kvs.save(t2)
-    log("next t2:", t2)
-
-    t3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z3 = KVS.reader(t3, :args)
-    :kvs.save(KVS.reader(t3, dir: 1, pos: 0))
-    log("next t3:", t3)
-
-    assert z3 == []
-
-    n1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    nz1 = KVS.reader(n1, :args)
-    :kvs.save n1
-    log("prev n1:", n1)
-
-    n2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    nz2 = KVS.reader(n2, :args)
-    :kvs.save n2
-    log("prev n2:", n2)
-
-    n3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    nz3 = KVS.reader(n3, :args)
-    :kvs.save(KVS.reader(n3, dir: 0))
-    log("prev n3:", n3)
-
-    assert nz3 == []
-
-    t4 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
-    z4 = KVS.reader(t4, :args)
-    r = :kvs.save(t4)
-    log("next t4:", t4)
-
-    assert length(z4) == p
-    log(:end, "test bidirectional (new)")
-  end
-
-  test "partial take forward full" do
-    log(:st, "partial take forward full")
-    id = {:partial, :kvs.seq([], [])}
-    x = 7
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
-    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
-    p = 3
-    IO.inspect :kvs.all id
-
-    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z1 = KVS.reader(t1, :args)
-    :kvs.save(t1)
-    log("next t1:", t1)
-
-    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z2 = KVS.reader(t2, :args)
-    :kvs.save(t2)
-    log("next t2:", t2)
-
-    t3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z3 = KVS.reader(t3, :args)
-    :kvs.save(t3)
-    log("next t3:", t3)
-
-    assert length(z3) == 1
-    assert :lists.reverse(z1) ++ :lists.reverse(z2) ++ z3 == :kvs.all(id)
-    log(:end, "partial take forward full")
-  end
-
-  test "take with empy" do
-    log(:st, "take with empy")
-    id = {:partial, :kvs.seq([], [])}
-    x = 6
-    p = 3
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
-    r = :kvs.save(:kvs.reader(id))
-    IO.inspect :kvs.all(id)
-    rid = KVS.reader(r, :id)
-    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
-    z1 = KVS.reader(t1, :args)
-    r = :kvs.save(t1)
-    log("next t1:", t1)
-
-    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z2 = KVS.reader(t2, :args)
-    r = :kvs.save(t2)
-    log("next t2:", t2)
-
-    t3 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z3 = KVS.reader(t3, :args)
-    r = :kvs.save(t3)
-    log("next t3:", t3)
-    assert  z3 == []
-
-    KVS.reader(id: tid) = :kvs.save(KVS.reader(t3, dir: 1, pos: 0))
-    n1 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
-    nz1 = KVS.reader(n1, :args)
-    :kvs.save n1
-    log("prev b1:", n1)
-
-    n2 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
-    nz2 = KVS.reader(n2, :args)
-    :kvs.save n2
-    log("prev b2:", n2)
-
-    assert z2 ++ z1 == nz1 ++ nz2
-    log(:end, "take with empy")
-  end
-
-  test "test prev" do
-    log(:st, "test prev")
-    id = {:partial, :kvs.seq([], [])}
-    x = 6
-    p = 3
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
-    r = :kvs.save(:kvs.reader(id))
-    rid = KVS.reader(r, :id)
-    IO.inspect :kvs.all(id)
-
-    t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
-    z1 = KVS.reader(t1, :args)
-    r = :kvs.save(t1)
-    log("next z1:", z1)
-
-    t2 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
-    z2 = KVS.reader(t2, :args)
-
-    KVS.reader(id: tid) = :kvs.save(KVS.reader(t2, dir: 1, pos: 0))
-    log("next z2:", z2)
-
-    n1 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
-    nz1 = tl(:lists.reverse(KVS.reader(n1, :args)))
-    :kvs.save(n1)
-    log("prev nz1:", nz1)
-
-    n2 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
-    nz2 = KVS.reader(n2, :args)
-    :kvs.save n2
-    log("prev n2:", n2)
-
-    assert length(nz2) == p
-    assert nz2 == z1
-
-    n3 = :kvs.take(KVS.reader(:kvs.load_reader(tid), args: p))
-    nz3 = KVS.reader(n3, :args)
-    :kvs.save(KVS.reader(n3, dir: 0))
-    log("prev nz3:", nz3)
-
-    assert nz3 = []
-
-    log(:end, "test prev")
-
-  end
-
-  def log(x,cursor) do
-     IO.inspect {x,cursor}
-  end
-
-end