Namdak Tonpa 6 лет назад
Родитель
Сommit
9f006ba48c
7 измененных файлов с 34 добавлено и 31 удалено
  1. 1 1
      include/stream.hrl
  2. 1 1
      rebar.config
  3. 19 12
      src/kvs.erl
  4. 5 2
      src/layers/kvs_stream.erl
  5. 4 13
      src/stores/kvs_mnesia.erl
  6. 2 0
      src/stores/kvs_st.erl
  7. 2 2
      sys.config

+ 1 - 1
include/stream.hrl

@@ -3,7 +3,7 @@
 -include("kvs.hrl").
 -include("kvs.hrl").
 -include("cursors.hrl").
 -include("cursors.hrl").
 -define(STREAM, [top/1, bot/1, next/1, prev/1, drop/1, take/1, append/2, cut/2,
 -define(STREAM, [top/1, bot/1, next/1, prev/1, drop/1, take/1, append/2, cut/2,
-                 load_reader/1, writer/1, reader/1, save/1, add/1]).
+                 load_reader/1, writer/1, reader/1, save/1, add/1, feed/1]).
 -spec top(#reader{}) -> #reader{}.
 -spec top(#reader{}) -> #reader{}.
 -spec bot(#reader{}) -> #reader{}.
 -spec bot(#reader{}) -> #reader{}.
 -spec next(#reader{}) -> #reader{} | {error,not_found | empty}.
 -spec next(#reader{}) -> #reader{} | {error,not_found | empty}.

+ 1 - 1
rebar.config

@@ -1,2 +1,2 @@
 {erl_opts, [nowarn_unused_function,nowarn_duplicated_export]}.
 {erl_opts, [nowarn_unused_function,nowarn_duplicated_export]}.
-{deps, []}.
+{deps, [{rocksdb,".*",{git,"git://github.com/enterprizing/rocksdb",{tag,"master"}}}]}.

+ 19 - 12
src/kvs.erl

@@ -14,9 +14,14 @@
 -export(?STREAM).
 -export(?STREAM).
 -export([init/1, start/2, stop/1]).
 -export([init/1, start/2, stop/1]).
 
 
+-record('$msg', {id,next,prev,user,msg}).
+
 init([]) -> {ok, { {one_for_one, 5, 10}, []} }.
 init([]) -> {ok, { {one_for_one, 5, 10}, []} }.
 start(_,_) -> supervisor:start_link({local, kvs}, kvs, []).
 start(_,_) -> supervisor:start_link({local, kvs}, kvs, []).
 stop(_) -> ok.
 stop(_) -> ok.
+test_tabs() ->
+  case application:get_env(kvs,dba,[]) of
+    kvs_mnesia -> [ #table{name='$msg', fields=record_info(fields,'$msg')} ] end.
 
 
 % kvs api
 % kvs api
 
 
@@ -37,6 +42,7 @@ stop()             -> stop_kvs(#kvs{mod=dba()}).
 start()            -> start   (#kvs{mod=dba()}).
 start()            -> start   (#kvs{mod=dba()}).
 ver()              -> ver(#kvs{mod=dba()}).
 ver()              -> ver(#kvs{mod=dba()}).
 dir()              -> dir     (#kvs{mod=dba()}).
 dir()              -> dir     (#kvs{mod=dba()}).
+feed(Key)          -> feed    (Key, #kvs{mod=dba()}).
 seq(Table,DX)      -> seq     (Table, DX, #kvs{mod=dba()}).
 seq(Table,DX)      -> seq     (Table, DX, #kvs{mod=dba()}).
 
 
 % stream api
 % stream api
@@ -59,7 +65,7 @@ ensure(#writer{id=Id}) ->
         {error,_} -> kvs:save(kvs:writer(Id)), ok;
         {error,_} -> kvs:save(kvs:writer(Id)), ok;
         {ok,_}    -> ok end.
         {ok,_}    -> ok end.
 
 
-metainfo() ->  #schema { name = kvs, tables = core() }.
+metainfo() ->  #schema { name = kvs, tables = core() ++ test_tabs() }.
 core()    -> [ #table { name = id_seq, fields = record_info(fields,id_seq), keys=[thing]} ].
 core()    -> [ #table { name = id_seq, fields = record_info(fields,id_seq), keys=[thing]} ].
 
 
 initialize(Backend, Module) ->
 initialize(Backend, Module) ->
@@ -114,19 +120,20 @@ count(Tab,#kvs{mod=DBA}) -> DBA:count(Tab).
 index(Tab, Key, Value,#kvs{mod=DBA}) -> DBA:index(Tab, Key, Value).
 index(Tab, Key, Value,#kvs{mod=DBA}) -> DBA:index(Tab, Key, Value).
 seq(Tab, Incr,#kvs{mod=DBA}) -> DBA:seq(Tab, Incr).
 seq(Tab, Incr,#kvs{mod=DBA}) -> DBA:seq(Tab, Incr).
 dump(#kvs{mod=Mod}) -> Mod:dump().
 dump(#kvs{mod=Mod}) -> Mod:dump().
+feed(Key,#kvs{st=Mod}) -> Mod:feed(Key).
 
 
 % tests
 % tests
 
 
 check() ->
 check() ->
-    Id  = {list,kvs:seq(writer,1)},
+    Id1 = {list1,kvs:seq([],[])},
+    Id2 = {list2,kvs:seq([],[])},
     X   = 5,
     X   = 5,
-    _W   = kvs:save(kvs:writer(Id)),
-    #reader{id=R1} = kvs:save(kvs:reader(Id)),
-    #reader{id=R2} = kvs:save(kvs:reader(Id)),
-    [ kvs:save(kvs:add((kvs:writer(Id))#writer{args={emails,[],[],[],[]}})) || _ <- lists:seq(1,X) ],
-    Bot = kvs:bot(kvs:load_reader(R1)),
-    Top = kvs:top(kvs:load_reader(R2)),
-    #reader{args=F} = kvs:take(Bot#reader{args=20,dir=0}),
-    #reader{args=B} = kvs:take(Top#reader{args=20,dir=1}),
-    ?assertMatch(X,length(F)),
-    ?assertMatch(F,lists:reverse(B)).
+    _   = kvs:save(kvs:writer(Id1)),
+    _   = kvs:save(kvs:writer(Id2)),
+    [ kvs:save(kvs:add((kvs:writer(Id1))#writer{args={'$msg',[],[],[],[],[]}})) || _ <- lists:seq(1,X) ],
+    [ kvs:append({'$msg',[],[],[],[],[]},Id2) || _ <- lists:seq(1,X) ],
+    #reader{args=A} = kvs:take((kvs:bot(kvs:reader(Id1)))#reader{args=20}),
+    B = kvs:feed(Id1),
+    C = kvs:feed(Id2),
+    ?assertMatch(A,B),
+    ?assertMatch(X,length(C)).

+ 5 - 2
src/layers/kvs_stream.erl

@@ -69,15 +69,18 @@ drop(A,N,#reader{cache=B,pos=P}=C,C2,_,_) ->
 % new, save, load, up, down, top, bot
 % new, save, load, up, down, top, bot
 
 
 load_reader (Id) -> case kvs:get(reader,Id) of {ok,C} -> C; _ -> #reader{id=[]} end.
 load_reader (Id) -> case kvs:get(reader,Id) of {ok,C} -> C; _ -> #reader{id=[]} end.
-writer (Id) -> case kvs:get(writer,Id) of {ok,W} -> W; _ -> #writer{id=Id} end.
+
+writer (Id) -> case kvs:get(writer,Id) of {ok,W} -> W; {error,_} -> #writer{id=Id} end.
 reader (Id) -> case kvs:get(writer,Id) of
 reader (Id) -> case kvs:get(writer,Id) of
          {ok,#writer{first=[]}} -> #reader{id=kvs:seq(reader,1),feed=Id,cache=[]};
          {ok,#writer{first=[]}} -> #reader{id=kvs:seq(reader,1),feed=Id,cache=[]};
          {ok,#writer{first=F}}  -> #reader{id=kvs:seq(reader,1),feed=Id,cache={tab(F),id(F)}};
          {ok,#writer{first=F}}  -> #reader{id=kvs:seq(reader,1),feed=Id,cache={tab(F),id(F)}};
-         {error,_} -> kvs:save(#writer{id=Id}), reader(Id) end.
+         {error,_} -> save(#writer{id=Id}), reader(Id) end.
 save (C) -> NC = c4(C,[]), kvs:put(NC), NC.
 save (C) -> NC = c4(C,[]), kvs:put(NC), NC.
 
 
 % add
 % add
 
 
+feed(Key) -> (take((kvs:reader(Key))#reader{args=-1}))#reader.args.
+
 add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq(tab(M),1)),C);
 add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq(tab(M),1)),C);
 add(#writer{args=M}=C) -> add(M,C).
 add(#writer{args=M}=C) -> add(M,C).
 
 

+ 4 - 13
src/stores/kvs_mnesia.erl

@@ -5,7 +5,7 @@
 -include_lib("mnesia/src/mnesia.hrl").
 -include_lib("mnesia/src/mnesia.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -export(?BACKEND).
 -export(?BACKEND).
--export([info/1,exec/1,sync_indexes/0,sync_indexes/1,dump/1]).
+-export([info/1,exec/1,dump/1]).
 start()    -> mnesia:start().
 start()    -> mnesia:start().
 stop()     -> mnesia:stop().
 stop()     -> mnesia:stop().
 destroy()  -> [mnesia:delete_table(T)||{_,T}<-kvs:dir()], mnesia:delete_schema([node()]), ok.
 destroy()  -> [mnesia:delete_table(T)||{_,T}<-kvs:dir()], mnesia:delete_schema([node()]), ok.
@@ -43,7 +43,7 @@ count(RecordName) -> mnesia:table_info(RecordName, size).
 all(R) -> lists:flatten(many(fun() -> L= mnesia:all_keys(R), [ mnesia:read({R, G}) || G <- L ] end)).
 all(R) -> lists:flatten(many(fun() -> L= mnesia:all_keys(R), [ mnesia:read({R, G}) || G <- L ] end)).
 seq([],[]) -> os:system_time();
 seq([],[]) -> os:system_time();
 seq(RecordName, Incr) -> mnesia:dirty_update_counter({id_seq, RecordName}, Incr).
 seq(RecordName, Incr) -> mnesia:dirty_update_counter({id_seq, RecordName}, Incr).
-many(Fun) -> case mnesia:activity(context(),Fun) of {atomic, R} -> R; {aborted, Error} -> {error, Error}; X -> X end.
+many(Fun) -> case mnesia:activity(context(),Fun) of {atomic, [R]} -> R; {aborted, Error} -> {error, Error}; X -> X end.
 void(Fun) -> case mnesia:activity(context(),Fun) of {atomic, ok} -> ok; {aborted, Error} -> {error, Error}; X -> X end.
 void(Fun) -> case mnesia:activity(context(),Fun) of {atomic, ok} -> ok; {aborted, Error} -> {error, Error}; X -> X end.
 info(T) -> try mnesia:table_info(T,all) catch _:_ -> [] end.
 info(T) -> try mnesia:table_info(T,all) catch _:_ -> [] end.
 create_table(Name,Options) -> mnesia:create_table(Name, Options).
 create_table(Name,Options) -> mnesia:create_table(Name, Options).
@@ -52,24 +52,15 @@ exec(Q) -> F = fun() -> qlc:e(Q) end, {atomic, Val} = mnesia:activity(context(),
 just_one(Fun) ->
 just_one(Fun) ->
     case mnesia:activity(context(),Fun) of
     case mnesia:activity(context(),Fun) of
         {atomic, []} -> {error, not_found};
         {atomic, []} -> {error, not_found};
-        {atomic, R} -> {ok, R};
+        {atomic, [R]} -> {ok, R};
         [] -> {error, not_found};
         [] -> {error, not_found};
+        [R] -> {ok,R};
         R when is_list(R) -> {ok,R};
         R when is_list(R) -> {ok,R};
         Error -> Error end.
         Error -> Error end.
 
 
 %add(Record) -> mnesia:activity(context(),fun() -> kvs:append(Record,#kvs{mod=?MODULE}) end).
 %add(Record) -> mnesia:activity(context(),fun() -> kvs:append(Record,#kvs{mod=?MODULE}) end).
 context() -> application:get_env(kvs,mnesia_context,async_dirty).
 context() -> application:get_env(kvs,mnesia_context,async_dirty).
 
 
-sync_indexes() ->
-    lists:map(fun sync_indexes/1, kvs:tables()).
-sync_indexes(#table{name = Table, keys = Keys}) ->
-    mnesia:wait_for_tables(Table, 10000),
-    #cstruct{attributes = Attrs} = mnesia:table_info(Table, cstruct),
-    Indexes = mnesia:table_info(Table, index),
-    IndexedKeys = [lists:nth(I-1, Attrs)|| I <- Indexes],
-    [mnesia:del_table_index(Table, Key) || Key <- IndexedKeys -- Keys],
-    [mnesia:add_table_index(Table, Key) || Key <- Keys -- IndexedKeys].
-
 dump() -> dump([ N || #table{name=N} <- kvs:tables() ]), ok.
 dump() -> dump([ N || #table{name=N} <- kvs:tables() ]), ok.
 dump(short) ->
 dump(short) ->
     Gen = fun(T) ->
     Gen = fun(T) ->

+ 2 - 0
src/stores/kvs_st.erl

@@ -80,6 +80,8 @@ reader (Id) ->
          {error,_} -> #reader{} end.
          {error,_} -> #reader{} end.
 save (C) -> NC = c4(C,[]), N2 = c3(NC,[]), kvs:put(N2), N2.
 save (C) -> NC = c4(C,[]), N2 = c3(NC,[]), kvs:put(N2), N2.
 
 
+feed(Key) -> kvs:all(Key).
+
 % add
 % add
 
 
 add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);
 add(#writer{args=M}=C) when element(2,M) == [] -> add(si(M,kvs:seq([],[])),C);

+ 2 - 2
sys.config

@@ -1,4 +1,4 @@
-[ {kvs,[{dba,kvs_rocks},
-        {dba_st,kvs_st},
+[ {kvs,[{dba,kvs_mnesia},
+        {dba_st,kvs_stream},
         {schema,[kvs,kvs_stream]}]}
         {schema,[kvs,kvs_stream]}]}
 ].
 ].