Browse Source

Merge pull request #53 from synrc/b2

close key formatting options. update feed ids in tests to expected ids
Andrii Zadorozhnii 4 years ago
parent
commit
8d62772edb
3 changed files with 73 additions and 84 deletions
  1. 17 21
      src/stores/kvs_rocks.erl
  2. 13 19
      src/stores/kvs_st.erl
  3. 43 44
      test/old_test.exs

+ 17 - 21
src/stores/kvs_rocks.erl

@@ -4,10 +4,22 @@
 -include("metainfo.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -export(?BACKEND).
--export([ref/0,cut/8,next/8,prev/8,prev2/8,next2/8,format/1,bt/1]).
+-export([ref/0,cut/8,next/8,prev/8,prev2/8,next2/8,bt/1,key/2,key/1]).
 
+e(X,Y)     -> element(X,Y).
 bt([])     -> [];
 bt(X)      -> binary_to_term(X).
+tb([])     -> [];
+tb(T) when is_list(T) -> list_to_binary(T);
+tb(T) when is_atom(T) -> atom_to_binary(T);
+tb(T) when is_binary(T) -> T;
+tb(T)      -> term_to_binary(T).
+
+key(R)     when is_tuple(R) andalso tuple_size(R) > 1 -> key(e(1,R), e(2,R));
+key(R)     -> key(R,[]).
+key(Tab,R) when is_tuple(R) andalso tuple_size(R) > 1 -> key(Tab, e(2,R));
+key(Tab,R) -> iolist_to_binary([lists:join(<<"/">>, lists:flatten([<<>>, tb(Tab), tb(R)]))]).
+
 start()    -> ok.
 stop()     -> ok.
 destroy()  -> rocksdb:destroy(application:get_env(kvs,rocks_name,"rocksdb"), []).
@@ -22,33 +34,17 @@ initialize() -> [ kvs:initialize(kvs_rocks,Module) || Module <- kvs:modules() ].
 ref() -> application:get_env(kvs,rocks_ref,[]).
 index(_,_,_) -> [].
 get(Tab, Key) ->
-    Address = <<(list_to_binary(lists:concat(["/",format(Tab),"/"])))/binary,
-                (term_to_binary(Key))/binary>>,
-%    io:format("KVS.GET.Address: ~s~n",[Address]),
-    case rocksdb:get(ref(), Address, []) of
+    case rocksdb:get(ref(), key(Tab,Key), []) of
          not_found -> {error,not_found};
          {ok,Bin} -> {ok,bt(Bin)} end.
 
 put(Records) when is_list(Records) -> lists:map(fun(Record) -> put(Record) end, Records);
-put(Record) -> 
-    Address = <<(list_to_binary(lists:concat(["/",format(element(1,Record)),"/"])))/binary,
-                         (term_to_binary(element(2,Record)))/binary>>,
-%    io:format("KVS.PUT.Address: ~s~n",[Address]),
-    rocksdb:put(ref(), Address, term_to_binary(Record), [{sync,true}]).
-
-format(X) when is_list(X) -> X;
-format(X) when is_atom(X) -> atom_to_list(X);
-format(X) when is_binary(X) -> binary_to_list(X);
-format(X) -> io_lib:format("~p",[X]).
-
-delete(Feed, Id) ->
-    Key    = list_to_binary(lists:concat(["/",format(Feed),"/"])),
-    A      = <<Key/binary,(term_to_binary(Id))/binary>>,
-    rocksdb:delete(ref(), A, []).
+put(Record) -> rocksdb:put(ref(), key(Record), term_to_binary(Record), [{sync,true}]).
+delete(Feed, Id) -> rocksdb:delete(ref(), key(Feed,Id), []).
 
 count(_) -> 0.
 all(R) -> {ok,I} = rocksdb:iterator(ref(), []),
-           Key = list_to_binary(lists:concat(["/",format(R)])),
+           Key = key(R),
            First = rocksdb:iterator_move(I, {seek,Key}),
            lists:reverse(next(I,Key,size(Key),First,[],[],-1,0)).
 

+ 13 - 19
src/stores/kvs_st.erl

@@ -4,10 +4,7 @@
 -include("stream.hrl").
 -include("metainfo.hrl").
 -export(?STREAM).
--export([ref/0,feed_key/2]).
-
-bt(X) -> kvs_rocks:bt(X).
-ref() -> kvs_rocks:ref().
+-import(kvs_rocks, [key/2, key/1, bt/1, ref/0]).
 
 % section: kvs_stream prelude
 
@@ -47,17 +44,17 @@ read_it(C, Feed, Move) ->
   end.
 
 next(#reader{cache=[]}) -> {error,empty};
-next(#reader{feed=Feed,cache=I}=C) when is_tuple(I) -> read_it(C,Feed,move_it(feed_key(I,Feed),next)).
+next(#reader{feed=Feed,cache=I}=C) when is_tuple(I) -> read_it(C,Feed,move_it(key(Feed,I),next)).
 
 prev(#reader{cache=[]}) -> {error,empty};
-prev(#reader{cache=I,feed=Feed}=C) when is_tuple(I) -> read_it(C,Feed,move_it(feed_key(I,Feed),prev)).
+prev(#reader{cache=I,feed=Feed}=C) when is_tuple(I) -> read_it(C,Feed,move_it(key(Feed,I),prev)).
 
 % section: take, drop
 
 drop(#reader{args=N}) when N < 0 -> #reader{};
 drop(#reader{args=N}=C) when N == 0 -> C;
 drop(#reader{args=N,feed=Feed,cache=I}=C) when N > 0 ->
-   Key   = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
+   Key = key(Feed),
    {ok, H} = rocksdb:iterator(ref(), []),
    First = rocksdb:iterator_move(H, {seek,Key}),
 
@@ -82,9 +79,9 @@ drop(#reader{args=N,feed=Feed,cache=I}=C) when N > 0 ->
 
 take(#reader{pos='end',dir=0}=C) -> C#reader{args=[]}; % 4
 take(#reader{args=N,feed=Feed,cache={T,O},dir=0}=C) -> % 1
-   Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
+   Key = key(Feed),
    {ok,I} = rocksdb:iterator(ref(), []),
-   {ok,K,BERT} = rocksdb:iterator_move(I, {seek,feed_key({T,O},Feed)}),
+   {ok,K,BERT} = rocksdb:iterator_move(I, {seek,key(Feed,{T,O})}),
    {KK,Res} = kvs_rocks:next2(I,Key,size(Key),K,BERT,[],case N of -1 -> -1; J -> J + 1 end,0),
    Last = last(KK,O,'end'),
    case {Res,length(Res)} of
@@ -100,9 +97,9 @@ take(#reader{pos='begin',dir=1}=C) -> C#reader{args=[]}; % 4
 
 % TODO: try to remove lists:reverse and abstract both branches
 take(#reader{args=N,feed=Feed,cache={T,O},dir=1}=C) -> % 1
-   Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
+   Key = key(Feed),
    {ok,I} = rocksdb:iterator(ref(), []),
-   {ok,K,BERT} = rocksdb:iterator_move(I, {seek,feed_key({T,O},Feed)}),
+   {ok,K,BERT} = rocksdb:iterator_move(I, {seek,key(Feed,{T,O})}),
    {KK,Res} = kvs_rocks:prev2(I,Key,size(Key),K,BERT,[],case N of -1 -> -1; J -> J + 1 end,0),
    Last = last(KK,O,'begin'),
    case {lists:reverse(Res),length(Res)} of
@@ -132,7 +129,7 @@ writer(Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id
 reader(Id) ->
     case kvs:get(writer,Id) of
          {ok,#writer{id=Feed}} ->
-             Key = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed)])),
+             Key = key(Feed),
              {ok,I} = rocksdb:iterator(ref(), []),
              {ok,_,BERT} = rocksdb:iterator_move(I, {seek,Key}),
              F = bt(BERT),
@@ -147,9 +144,6 @@ add(#writer{args=M}=C) -> add(M,C).
 
 add(M,#writer{id=Feed,count=S}=C) -> NS=S+1, raw_append(M,Feed), C#writer{cache=M,count=NS}.
 
-feed_key(M,Feed) -> <<(list_to_binary(lists:concat(["/",kvs_rocks:format(Feed),"/"])))/binary,
-                      (term_to_binary(id(M)))/binary>>.
-
 remove(Rec,Feed) ->
    kvs:ensure(#writer{id=Feed}),
    W = #writer{count=C} = kvs:writer(Feed),
@@ -161,19 +155,19 @@ remove(Rec,Feed) ->
          _ -> C end.
 
 raw_append(M,Feed) ->
-   rocksdb:put(ref(), feed_key(M,Feed), term_to_binary(M), [{sync,true}]).
+   rocksdb:put(ref(), key(Feed,M), term_to_binary(M), [{sync,true}]).
 
 append(Rec,Feed) ->
    kvs:ensure(#writer{id=Feed}),
-   Id = element(2,Rec),
+   Id = e(2,Rec),
    W = kvs:writer(Feed),
    case kvs:get(Feed,Id) of
         {ok,_} -> raw_append(Rec,Feed), kvs:save(W#writer{cache=Rec,count=W#writer.count + 1}), Id;
         {error,_} -> kvs:save(kvs:add(W#writer{args=Rec,cache=Rec})), Id end.
 
 cut(Feed,Id) ->
-    Key    = list_to_binary(lists:concat(["/",kvs_rocks:format(Feed),"/"])),
-    A      = <<Key/binary,(term_to_binary(Id))/binary>>,
+    Key    = key(Feed),
+    A      = key(Feed,Id),
     {ok,I} = rocksdb:iterator(ref(), []),
     case rocksdb:iterator_move(I, {seek,A}) of
          {ok,A,X} -> {ok,kvs_rocks:cut(I,Key,size(Key),A,X,[],-1,0)};

+ 43 - 44
test/old_test.exs

@@ -7,8 +7,8 @@ defmodule OLD.Test do
   setup do: (on_exit(fn -> :ok = :kvs.leave();:ok = :kvs.destroy() end);:kvs.join())
 
   test "basic" do
-    id1 = {:basic, :kvs.seq([], [])}
-    id2 = {:basic, :kvs.seq([], [])}
+    id1 = "/basic/one"
+    id2 = "/basic/two"
     x = 5
     :kvs.save(:kvs.writer(id1))
     :kvs.save(:kvs.writer(id2))
@@ -29,9 +29,8 @@ defmodule OLD.Test do
 
     case :application.get_env(:kvs, :dba_st, :kvs_st) do
       :kvs_st ->
-        c = :kvs.all(id2)
+        c = :kvs.all("/basic/two")
         assert :lists.reverse(c) == KVS.reader(x2, :args)
-
       _ ->
         # mnesia doesn't support `all` over feeds (only for tables)
         []
@@ -64,29 +63,29 @@ defmodule OLD.Test do
   end
 
   test "take" do
-    id = {:partial, :kvs.seq([], [])}
+    feed = :partial
     x = 5
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
-    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
+    :kvs.save(:kvs.writer(feed))
+    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, feed) end, :lists.seq(1, x))
+    KVS.reader(id: rid) = :kvs.save(:kvs.reader(feed))
     t = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 20))
-    b = KVS.reader(:kvs.feed(id), :args)
+    b = KVS.reader(:kvs.feed(feed), :args)
     #: mnesia
     assert KVS.reader(t, :args) == b
   end
 
   test "take back full" do
     log(:st, "take back full")
-    id = {:partial, :kvs.seq([], [])}
+    feed = :partial
     x = 5
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
-    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
+    :kvs.save(:kvs.writer(feed))
+    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, feed) end, :lists.seq(1, x))
+    KVS.reader(id: rid) = :kvs.save(:kvs.reader(feed))
     t = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 5))
     :kvs.save(KVS.reader(t, dir: 1))
     log("t:", t)
     n = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: 5))
-    b = KVS.reader(:kvs.feed(id), :args)
+    b = KVS.reader(:kvs.feed(feed), :args)
     log("n:", n)
     assert KVS.reader(n, :args) == KVS.reader(t, :args)
     assert KVS.reader(t, :args) == b
@@ -94,12 +93,12 @@ defmodule OLD.Test do
   end
 
   test "partial take back" do
-    id = {:partial, :kvs.seq([], [])}
+    feed = :partial
     x = 3
     p = 2
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
-    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
+    :kvs.save(:kvs.writer(feed))
+    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, feed) end, :lists.seq(1, x))
+    KVS.reader(id: rid) = :kvs.save(:kvs.reader(feed))
     t = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
     :kvs.save(KVS.reader(t, dir: 1))
     n = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p + 1))
@@ -108,16 +107,16 @@ defmodule OLD.Test do
 
   test "partial full bidirectional" do
     log(:st, "partial full bidirectional")
-    id = {:partial, :kvs.seq([], [])}
+    feed = :partial
     x = 5
     p =2
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
-    r = :kvs.save(:kvs.reader(id))
+    :kvs.save(:kvs.writer(feed))
+    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, feed) end, :lists.seq(1, x))
+    r = :kvs.save(:kvs.reader(feed))
     rid = KVS.reader(r, :id)
     t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
     z1 = KVS.reader(t1, :args)
-    IO.inspect :kvs.all(id)
+    IO.inspect :kvs.all(feed)
     r = :kvs.save(t1)
     log("next t1:", t1)
 
@@ -151,14 +150,14 @@ defmodule OLD.Test do
 
   test "test bidirectional (new)" do
     log(:st, "test bidirectional (new)")
-    id = {:partial, :kvs.seq([], [])}
+    feed = :partial
     x = 6
     p = 3
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
-    r = :kvs.save(:kvs.reader(id))
+    :kvs.save(:kvs.writer(feed))
+    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, feed) end, :lists.seq(1, x))
+    r = :kvs.save(:kvs.reader(feed))
     rid = KVS.reader(r, :id)
-    IO.inspect :kvs.all(id)
+    IO.inspect :kvs.all(feed)
 
     t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
     z1 = KVS.reader(t1, :args)
@@ -205,13 +204,13 @@ defmodule OLD.Test do
 
   test "partial take forward full" do
     log(:st, "partial take forward full")
-    id = {:partial, :kvs.seq([], [])}
+    feed = :partial
     x = 7
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, id) end, :lists.seq(1, x))
-    KVS.reader(id: rid) = :kvs.save(:kvs.reader(id))
+    :kvs.save(:kvs.writer(feed))
+    :lists.map(fn _ -> :kvs.append({:"$msg", [], [], [], [], []}, feed) end, :lists.seq(1, x))
+    KVS.reader(id: rid) = :kvs.save(:kvs.reader(feed))
     p = 3
-    IO.inspect :kvs.all id
+    IO.inspect :kvs.all(feed)
 
     t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p))
     z1 = KVS.reader(t1, :args)
@@ -229,19 +228,19 @@ defmodule OLD.Test do
     log("next t3:", t3)
 
     assert length(z3) == 1
-    assert :lists.reverse(z1) ++ :lists.reverse(z2) ++ z3 == :kvs.all(id)
+    assert :lists.reverse(z1) ++ :lists.reverse(z2) ++ z3 == :kvs.all(:partial)
     log(:end, "partial take forward full")
   end
 
   test "take with empy" do
     log(:st, "take with empy")
-    id = {:partial, :kvs.seq([], [])}
+    feed = :partial
     x = 6
     p = 3
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
-    r = :kvs.save(:kvs.reader(id))
-    IO.inspect :kvs.all(id)
+    :kvs.save(:kvs.writer(feed))
+    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, feed) end, :lists.seq(1, x))
+    r = :kvs.save(:kvs.reader(feed))
+    IO.inspect :kvs.all(feed)
     rid = KVS.reader(r, :id)
     t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
     z1 = KVS.reader(t1, :args)
@@ -276,14 +275,14 @@ defmodule OLD.Test do
 
   test "test prev" do
     log(:st, "test prev")
-    id = {:partial, :kvs.seq([], [])}
+    feed = :partial
     x = 6
     p = 3
-    :kvs.save(:kvs.writer(id))
-    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, id) end, :lists.seq(1, x))
-    r = :kvs.save(:kvs.reader(id))
+    :kvs.save(:kvs.writer(feed))
+    :lists.map(fn _ -> :kvs.append({:"$msg", :kvs.seq([],[]), [], [], [], []}, feed) end, :lists.seq(1, x))
+    r = :kvs.save(:kvs.reader(feed))
     rid = KVS.reader(r, :id)
-    IO.inspect :kvs.all(id)
+    IO.inspect :kvs.all(feed)
 
     t1 = :kvs.take(KVS.reader(:kvs.load_reader(rid), args: p, dir: 0))
     z1 = KVS.reader(t1, :args)