123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- -module(kvs).
- -copyright('Synrc Research Center s.r.o.').
- -compile(export_all).
- -include("api.hrl").
- -include("config.hrl").
- -include("metainfo.hrl").
- -include("state.hrl").
- -include("kvs.hrl").
- -include_lib("stdlib/include/qlc.hrl").
- % NOTE: API Documentation
- -export([start/0,stop/0]). % service
- -export([destroy/0,join/0,join/1,init/2]). % schema change
- -export([modules/0,containers/0,tables/0,table/1,version/0]). % meta info
- -export([create/1,add/1,remove/2,remove/1]). % chain ops
- -export([put/1,delete/2,next_id/2]). % raw ops
- -export([get/2,get/3,index/3]). % read ops
- -export([load_db/1,save_db/1]). % import/export
- start() -> DBA = ?DBA, DBA:start().
- stop() -> DBA = ?DBA, DBA:stop().
- destroy() -> DBA = ?DBA, DBA:destroy().
- join() -> DBA = ?DBA, DBA:join().
- join(Node) -> DBA = ?DBA, DBA:join(Node).
- init(Backend, Module) ->
- [ begin
- Backend:create_table(T
- [ Backend:add_table_index(T
- T
- end || T <- (Module:metainfo())
- version() -> DBA=?DBA, DBA:version().
- tables() -> lists:flatten([ (M:metainfo())
- table(Name) -> lists:keyfind(Name,
- dir() -> DBA = ?DBA, DBA:dir().
- modules() -> kvs:config(schema).
- containers() ->
- lists:flatten([ [ {T
- || T=
- || M <- modules() ]).
- create(ContainerName) -> create(ContainerName, kvs:next_id(atom_to_list(ContainerName), 1)).
- create(ContainerName, Id) ->
- wf:info("kvs:create: ~p",[ContainerName]),
- Instance = list_to_tuple([ContainerName|proplists:get_value(ContainerName, kvs:containers())]),
- Top = setelement(
- Top2 = setelement(
- Top3 = setelement(
- ok = kvs:put(Top3),
- Id.
- add(Record) when is_tuple(Record) ->
- Id = element(
- case kvs:get(element(1,Record), Id) of
- {error, not_found} ->
- Type = table_type(element(1,Record)),
- CName = element(
- Cid = table_type(case element(
- undefined -> element(1,Record);
- Fid -> Fid end),
- Container = case kvs:get(CName, Cid) of
- {ok,C} -> C;
- {error, not_found} when Cid /= undefined ->
- NC = setelement(
- list_to_tuple([CName|proplists:get_value(CName, kvs:containers())]), Cid),
- NC1 = setelement(
- kvs:put(NC1),NC1;
- _ -> error end,
- if Container == error -> {error, no_container};
- true ->
- Next = undefined,
- Prev = case element(
- undefined -> undefined;
- Tid ->
- case kvs:get(Type, Tid) of
- {error, not_found} -> undefined;
- {ok, Top} ->
- NewTop = setelement(
- kvs:put(NewTop),
- element(
- C1 = setelement(
- C2 = setelement(
- element(
- kvs:put(C2),
- R = setelement(
- [ case F1 of
- {FN, Fd} -> {FN, Fd};
- _-> {F1, kvs:create(CName,{F1,element(
- end || F1 <- element(
- R1 = setelement(
- R2 = setelement(
- R3 = setelement(
- kvs:put(R3),
- kvs:info(?MODULE,"[kvs] put: ~p~n", [element(
- {ok, R3}
- end;
- {aborted, Reason} -> kvs:info(?MODULE,"[kvs] aborted: ~p~n", [Reason]), {aborted, Reason};
- {ok, _} -> kvs:info(?MODULE,"[kvs] entry exist while put: ~p~n", [Id]), {error, exist} end.
- remove(RecordName, RecordId) ->
- case kvs:get(RecordName, RecordId) of
- {error, not_found} -> kvs:error("[kvs] can't remove ~p~n",[{RecordName,RecordId}]);
- {ok, E} ->
- Id = element(
- CName = element(
- Cid = element(
- {ok, Container} = kvs:get(CName, Cid),
- Top = element(
- Next = element(
- Prev = element(
- case kvs:get(RecordName, Next) of
- {ok, NE} ->
- NewNext = setelement(
- kvs:put(NewNext);
- _ -> ok end,
- case kvs:get(RecordName, Prev) of
- {ok, PE} ->
- NewPrev = setelement(
- kvs:put(NewPrev);
- _ -> ok end,
- C1 = case Top of Id -> setelement(
- C2 = setelement(
- kvs:put(C2),
- kvs:info(?MODULE,"[kvs] delete: ~p id: ~p~n", [RecordName, Id]),
- kvs:delete(RecordName, Id) end.
- remove(E) when is_tuple(E) ->
- Id = element(
- CName = element(
- Cid = element(
- case kvs:get(CName, Cid) of {ok, Container} ->
- Top = element(
- Next = element(
- Prev = element(
- case kvs:get(element(1,E), Next) of
- {ok, NE} ->
- NewNext = setelement(
- kvs:put(NewNext); _ -> ok end,
- case kvs:get(element(1,E), Prev) of
- {ok, PE} ->
- NewPrev = setelement(
- kvs:put(NewPrev);
- _ -> ok end,
- C1 = case Top of Id -> setelement(
- C2 = setelement(
- kvs:put(C2);
- _ -> skip end,
- kvs:info(?MODULE,"[kvs] delete: ~p", [Id]),
- kvs:delete(element(1,E), Id).
- traversal( _,undefined,_,_) -> [];
- traversal(_,_,0,_) -> [];
- traversal(RecordType2, Start, Count, Direction)->
- RecordType = table_type(RecordType2),
- case kvs:get(RecordType, Start) of
- {ok, R} -> Prev = element(Direction, R),
- Count1 = case Count of C when is_integer(C) -> C - 1; _-> Count end,
- [R | traversal(RecordType2, Prev, Count1, Direction)];
- Error -> [] end.
- entries(Name) -> Table = kvs:table(Name), entries(kvs:get(Table
- entries(Name, Count) -> Table = kvs:table(Name), entries(kvs:get(Table
- entries({ok, Container}, RecordType, Count) -> entries(Container, RecordType, Count);
- entries(Container, RecordType, Count) when is_tuple(Container) ->
- traversal(RecordType, element(
- entries(RecordType, Start, Count, Direction) ->
- E = traversal(RecordType, Start, Count, Direction),
- case Direction of
- add_seq_ids() ->
- Init = fun(Key) ->
- case kvs:get(id_seq, Key) of
- {error, _} -> {Key,kvs:put(
- {ok, _} -> {Key,skip} end end,
- [ Init(atom_to_list(Name)) || {Name,_Fields} <- containers() ].
- put(Record) ->
- DBA=?DBA,
- DBA:put(Record).
- table_type(user2) -> user;
- table_type(A) -> A.
- range(RecordName,Id) -> Ranges = kvs:config(RecordName), find(Ranges,RecordName,Id).
- find([],_,Id) -> [];
- find([Range|T],RecordName,Id) ->
- case lookup(Range,Id) of
- [] -> find(T,RecordName,Id);
- Name -> Name end.
- lookup(
- lookup(#interval{},Id) -> [].
- get(RecordName, Key) ->
- DBA=?DBA,
- case range(RecordName,Key) of
- [] -> DBA:get(RecordName, Key);
- Name -> DBA:get(Name, Key) end.
- get(RecordName, Key, Default) ->
- DBA=?DBA,
- case DBA:get(RecordName, Key) of
- {ok,{RecordName,Key,Value}} ->
- kvs:info(?MODULE,"[kvs] get config value: ~p~n", [{RecordName, Key, Value}]),
- {ok,Value};
- {error, _B} ->
- kvs:info(?MODULE,"[kvs] new config value: ~p~n", [{RecordName, Key, Default}]),
- DBA:put({RecordName,Key,Default}),
- {ok,Default} end.
- delete(Tab, Key) -> DBA=?DBA,DBA:delete(Tab, Key).
- count(RecordName) -> DBA=?DBA,DBA:count(RecordName).
- all(RecordName) -> DBA=?DBA,DBA:all(RecordName).
- index(RecordName, Key, Value) -> DBA=?DBA,DBA:index(RecordName, Key, Value).
- next_id(RecordName, Incr) -> DBA=?DBA,DBA:next_id(RecordName, Incr).
- save_db(Path) ->
- Data = lists:append([all(B) || B <- [list_to_atom(Name) || {table,Name} <- kvs:dir()] ]),
- kvs:save(Path, Data).
- load_db(Path) ->
- add_seq_ids(),
- AllEntries = kvs:load(Path),
- [kvs:put(E) || E <- lists:filter(fun(E) -> is_tuple(E) end ,AllEntries)].
- save(Dir, Value) ->
- filelib:ensure_dir(Dir),
- file:write_file(Dir, term_to_binary(Value)).
- load(Key) ->
- {ok, Bin} = file:read_file(Key),
- binary_to_term(Bin).
- notify(_EventPath, _Data) -> skip.
- config(Key) -> config(kvs, Key, "").
- config(App,Key) -> config(App,Key, "").
- config(App, Key, Default) -> case application:get_env(App,Key) of
- undefined -> Default;
- {ok,V} -> V end.
- log_modules() -> [].
- -define(ALLOWED, (config(kvs,log_modules,kvs))).
- log(Module, String, Args, Fun) ->
- case lists:member(Module,?ALLOWED:log_modules()) of
- true -> error_logger:Fun("~p:"++String, [Module|Args]);
- false -> skip end.
- info(Module,String, Args) -> log(Module,String, Args, info_msg).
- info(String, Args) -> log(?MODULE, String, Args, info_msg).
- info(String) -> log(?MODULE, String, [], info_msg).
- warning(Module,String, Args) -> log(Module, String, Args, warning_msg).
- warning(String, Args) -> log(?MODULE, String, Args, warning_msg).
- warning(String) -> log(?MODULE,String, [], warning_msg).
- error(Module,String, Args) -> log(Module, String, Args, error_msg).
- error(String, Args) -> log(?MODULE, String, Args, error_msg).
- error(String) -> log(?MODULE, String, [], error_msg).
- dump() ->
- io:format("~20w ~20w ~10w ~10w~n",[name,storage_type,memory,size]),
- [ io:format("~20w ~20w ~10w ~10w~n",[Name,
- mnesia:table_info(Name,storage_type),
- mnesia:table_info(Name,memory),
- mnesia:table_info(Name,size)]) ||
- io:format("Snapshot taken: ~p~n",[calendar:now_to_datetime(now())]).
|