123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
- -module(kvs).
- -behaviour(application).
- -behaviour(supervisor).
- -include_lib("stdlib/include/assert.hrl").
- -include("api.hrl").
- -include("metainfo.hrl").
- -include("stream.hrl").
- -include("cursors.hrl").
- -include("kvs.hrl").
- -include("backend.hrl").
- -export([dump/0,
- db/0,
- metainfo/0,
- ensure/1,
- ensure/2,
- seq_gen/0,
- keys/1,
- find/2,
- fields/1,
- defined/2,
- field/2,
- setfield/3,
- remove/1]).
- -export([join/2, seq/3]).
- -export(?API)
- .
- -export(?STREAM)
- .
- -export([init/1, start/2, stop/1]).
- -record('$msg', {id, next, prev, user, msg}).
- init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
- start(_, _) ->
- supervisor:start_link({local, kvs}, kvs, []).
- stop(_) -> ok.
- dba() -> application:get_env(kvs, dba, kvs_mnesia).
- seq_dba() -> application:get_env(kvs, dba_seq, kvs_mnesia).
- db() -> (dba()):db().
- kvs_stream() ->
- application:get_env(kvs, dba_st, kvs_stream).
- % kvs api
- all(Table) -> all(Table, #kvs{mod = dba(), db = db()}).
- delete(Table, Key) ->
- delete(Table, Key, #kvs{mod = dba(), db = db()}).
- get(Table, Key) ->
- (?MODULE):get(Table, Key, #kvs{mod = dba(), db = db()}).
- index(Table, K, V) ->
- index(Table, K, V, #kvs{mod = dba()}).
- keys(Feed) ->
- keys(Feed, #kvs{mod = dba(), db = db()}).
- find(Feed, Id) ->
- find(Feed,Id, #kvs{mod = dba(), db=db()}).
- match(Record) ->
- match(Record, #kvs{mod = dba()}).
- index_match(Record, Index) ->
- index_match(Record, Index, #kvs{mod = dba()}).
- join() -> join([], #kvs{mod = dba(), db = db()}).
- dump() -> dump(#kvs{mod = dba()}).
- join(Node) -> join(Node, #kvs{mod = dba(), db = db()}).
- leave() -> leave(#kvs{mod = dba(), db = db()}).
- destroy() -> destroy(#kvs{mod = dba(), db = db()}).
- count(Table) -> count(Table, #kvs{mod = dba()}).
- put(Record) -> (?MODULE):put(Record, #kvs{mod = dba(), db = db()}).
- stop() -> stop_kvs(#kvs{mod = dba()}).
- start() -> start(#kvs{mod = dba()}).
- ver() -> ver(#kvs{mod = dba()}).
- dir() -> dir(#kvs{mod = dba()}).
- feed(Key) ->
- feed(Key, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
- seq([], DX) -> seq([], DX, #kvs{mod = kvs_rocks});
- seq(Table, DX) -> seq(Table, DX, #kvs{mod = seq_dba()}).
- remove(Rec, Feed) ->
- remove(Rec, Feed, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
- put(Records, #kvs{mod = Mod, db = Db}) when is_list(Records) ->
- Mod:put(Records, Db);
- put(Record, #kvs{mod = Mod, db = Db}) -> Mod:put(Record, Db).
- get(RecordName, Key, #kvs{mod = Mod, db = Db}) ->
- Mod:get(RecordName, Key, Db).
- delete(Tab, Key, #kvs{mod = Mod, db = Db}) ->
- Mod:delete(Tab, Key, Db).
- delete_range(Feed, Last, #kvs{mod=DBA, db=Db}) ->
- DBA:delete_range(Feed,Last,Db).
- count(Tab, #kvs{mod = DBA}) -> DBA:count(Tab).
- index(Tab, Key, Value, #kvs{mod = DBA}) ->
- DBA:index(Tab, Key, Value).
- keys(Feed, #kvs{mod = DBA, db = Db}) ->
- DBA:keys(Feed, Db).
- find(Feed, Id, #kvs{mod = DBA, db = Db}) ->
- DBA:find(Feed, Id, Db).
- match(Record, #kvs{mod = DBA}) ->
- DBA:match(Record).
- index_match(Record, Index, #kvs{mod = DBA}) ->
- DBA:index_match(Record, Index).
- seq(Tab, Incr, #kvs{mod = DBA}) -> DBA:seq(Tab, Incr).
- dump(#kvs{mod = Mod}) -> Mod:dump().
- feed(Tab, #kvs{st = Mod, db = Db}) -> Mod:feed(Tab, Db).
- remove(Rec, Feed, #kvs{st = Mod, db = Db}) ->
- Mod:remove(Rec, Feed, Db).
- all(Tab, #kvs{mod = DBA, db = Db}) -> DBA:all(Tab, Db).
- start(#kvs{mod = DBA}) -> DBA:start().
- stop_kvs(#kvs{mod = DBA}) -> DBA:stop().
- join(Node, #kvs{mod = DBA, db = Db}) -> DBA:join(Node, Db).
- leave(#kvs{mod = DBA, db = Db}) -> DBA:leave(Db).
- destroy(#kvs{mod = DBA, db = Db}) -> DBA:destroy(Db).
- ver(#kvs{mod = DBA}) -> DBA:version().
- dir(#kvs{mod = DBA}) -> DBA:dir().
- % stream api
- top(X) -> (kvs_stream()):top(X).
- top(X,#kvs{db = Db}) -> (kvs_stream()):top(X,Db).
- bot(X) -> (kvs_stream()):bot(X).
- bot(X,#kvs{db = Db}) -> (kvs_stream()):bot(X,Db).
- next(X) -> (kvs_stream()):next(X).
- next(X,#kvs{db = Db}) -> (kvs_stream()):next(X,Db).
- prev(X) -> (kvs_stream()):prev(X).
- prev(X,#kvs{db = Db}) -> (kvs_stream()):prev(X,Db).
- drop(X) -> (kvs_stream()):drop(X).
- drop(X,#kvs{db = Db}) -> (kvs_stream()):drop(X,Db).
- take(X) -> (kvs_stream()):take(X).
- take(X,#kvs{db = Db}) -> (kvs_stream()):take(X,Db).
- save(X) -> (kvs_stream()):save(X).
- save(X,#kvs{db = Db}) -> (kvs_stream()):save(X,Db).
- remove(X) -> (kvs_stream()):remove(X).
- cut(X) -> (kvs_stream()):cut(X).
- cut(X,#kvs{db = Db}) -> (kvs_stream()):cut(X, Db).
- add(X) -> (kvs_stream()):add(X).
- add(X,#kvs{db = Db}) -> (kvs_stream()):add(X,Db).
- append(X, Y) -> (kvs_stream()):append(X, Y).
- append(X, Y, #kvs{db = Db}) -> (kvs_stream()):append(X, Y, Db).
- load_reader(X) -> (kvs_stream()):load_reader(X).
- load_reader(X,#kvs{db = Db}) -> (kvs_stream()):load_reader(X,Db).
- writer(X) -> (kvs_stream()):writer(X).
- writer(X,#kvs{db = Db}) -> (kvs_stream()):writer(X,Db).
- reader(X) -> (kvs_stream()):reader(X).
- reader(X,#kvs{db = Db}) -> (kvs_stream()):reader(X,Db).
- % unrevisited
- ensure(#writer{} = X) -> ensure(X,#kvs{mod = dba(), db = db()}).
- ensure(#writer{id = Id},#kvs{} = X) ->
- case kvs:get(writer, Id, X) of
- {error, _} ->
- kvs:save(kvs:writer(Id, X)),
- ok;
- {ok, _} -> ok
- end.
- cursors() ->
- lists:flatten([[{T#table.name, T#table.fields}
- || #table{name = Name} = T
- <- (M:metainfo())#schema.tables,
- Name == reader orelse Name == writer]
- || M <- modules()]).
- % metainfo api
- tables() ->
- lists:flatten([(M:metainfo())#schema.tables
- || M <- modules()]).
- modules() -> application:get_env(kvs, schema, []).
- metainfo() ->
- #schema{name = kvs, tables = core() ++ test_tabs()}.
- core() ->
- [#table{name = id_seq,
- fields = record_info(fields, id_seq), keys = [thing]}].
- test_tabs() ->
- [#table{name = '$msg',
- fields = record_info(fields, '$msg')}].
- table(Name) when is_atom(Name) ->
- lists:keyfind(Name, #table.name, tables());
- table(_) -> false.
- seq_gen() ->
- Init = fun (Key) ->
- case kvs:get(id_seq, Key) of
- {error, _} ->
- {Key, kvs:put(#id_seq{thing = Key, id = 0})};
- {ok, _} -> {Key, skip}
- end
- end,
- [Init(atom_to_list(Name))
- || {Name, _Fields} <- cursors()].
- initialize(Backend, Module) ->
- [begin
- Backend:create_table(T#table.name,
- [{attributes, T#table.fields},
- {T#table.copy_type, [node()]},
- {type, T#table.type}]),
- [Backend:add_table_index(T#table.name, Key)
- || Key <- T#table.keys],
- T
- end
- || T <- (Module:metainfo())#schema.tables].
- fields(Table) when is_atom(Table) ->
- case table(Table) of
- false -> [];
- T -> T#table.fields
- end.
- defined(TableRecord, Field) ->
- FieldsList = fields(element(1, TableRecord)),
- lists:member(Field, FieldsList).
- field(TableRecord, Field) ->
- FieldsList = fields(element(1, TableRecord)),
- Index = string:str(FieldsList, [Field]) + 1,
- element(Index, TableRecord).
- setfield(TableRecord, Field, Value) ->
- FieldsList = fields(element(1, TableRecord)),
- Index = string:str(FieldsList, [Field]) + 1,
- setelement(Index, TableRecord, Value).
|