kvs.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. -module(kvs).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -compile(export_all).
  4. -include("api.hrl").
  5. -include("config.hrl").
  6. -include("metainfo.hrl").
  7. -include("state.hrl").
  8. -include("kvs.hrl").
  9. -include_lib("stdlib/include/qlc.hrl").
  10. % NOTE: API Documentation
  11. -export([start/0,stop/0]). % service
  12. -export([destroy/0,join/0,join/1,init/2]). % schema change
  13. -export([modules/0,containers/0,tables/0,table/1,version/0]). % meta info
  14. -export([create/1,add/1,remove/2,remove/1]). % chain ops
  15. -export([put/1,delete/2,next_id/2]). % raw ops
  16. -export([get/2,get/3,index/3]). % read ops
  17. -export([load_db/1,save_db/1]). % import/export
  %% @doc Starts the storage backend selected by the `?DBA' macro.
  start() ->
      Backend = ?DBA,
      Backend:start().
  %% @doc Stops the storage backend selected by the `?DBA' macro.
  stop() ->
      Backend = ?DBA,
      Backend:stop().
  %% @doc Delegates to `destroy/0' of the backend selected by `?DBA'.
  destroy() ->
      Backend = ?DBA,
      Backend:destroy().
  %% @doc Delegates to `join/0' of the backend selected by `?DBA'.
  join() ->
      Backend = ?DBA,
      Backend:join().

  %% @doc Delegates to `join/1' of the backend selected by `?DBA',
  %% passing `Node' through unchanged.
  join(Node) ->
      Backend = ?DBA,
      Backend:join(Node).
  %% @doc Creates, through `Backend', every table listed in the schema
  %% returned by `Module:metainfo()' and adds an index for each of the
  %% table's keys. Returns the list of processed `#table{}' records.
  init(Backend, Module) ->
      Schema = Module:metainfo(),
      Setup =
          fun(Table) ->
              TableName = Table#table.name,
              Options = [{attributes, Table#table.fields},
                         {Table#table.copy_type, [node()]}],
              Backend:create_table(TableName, Options),
              %% Index results are discarded, exactly as before.
              lists:foreach(
                fun(Key) -> Backend:add_table_index(TableName, Key) end,
                Table#table.keys),
              Table
          end,
      lists:map(Setup, Schema#schema.tables).
  %% @doc Returns whatever `version/0' of the `?DBA' backend reports.
  version() ->
      Backend = ?DBA,
      Backend:version().
  %% @doc Collects the `tables' field of the schema returned by
  %% `metainfo/0' of every module in `modules()', flattened into one list.
  tables() ->
      PerModule = lists:map(fun(M) -> (M:metainfo())#schema.tables end,
                            modules()),
      lists:flatten(PerModule).
  %% @doc Looks up the `#table{}' record whose `name' field equals `Name'.
  %% Returns the record, or `false' when no such table is known.
  table(Name) ->
      Known = tables(),
      lists:keyfind(Name, #table.name, Known).
  %% @doc Delegates to `dir/0' of the backend selected by `?DBA'.
  dir() ->
      Backend = ?DBA,
      Backend:dir().
  %% @doc Returns the value of the `schema' configuration key
  %% (see `config/1'), used by this module as the list of schema modules.
  modules() ->
      kvs:config(schema).
  %% @doc Returns a flat list of `{Name, Fields}' pairs for every table
  %% marked `container = true' in the schemas of all `modules()'.
  containers() ->
      Pick =
          fun(M) ->
              Schema = M:metainfo(),
              [{Name, Fields}
               || #table{name = Name, fields = Fields, container = true}
                      <- Schema#schema.tables]
          end,
      lists:flatten(lists:map(Pick, modules())).
  %% @doc Creates a container of type `ContainerName' under an id obtained
  %% from `kvs:next_id/2', keyed by the container name as a string.
  create(ContainerName) ->
      Id = kvs:next_id(atom_to_list(ContainerName), 1),
      create(ContainerName, Id).
  %% @doc Creates and stores an empty container record of type
  %% `ContainerName' with the given `Id'.
  %%
  %% The record is built from the field list registered for the container
  %% in `kvs:containers()', then `id' is set to `Id', `top' to `undefined'
  %% and `entries_count' to `0'. Crashes with `badmatch' if `kvs:put/1'
  %% does not return `ok'. Returns `Id'.
  create(ContainerName, Id) ->
      %% Log through this module's own logger, like every other function
      %% here, instead of depending on the external `wf' module.
      kvs:info(?MODULE, "kvs:create: ~p", [ContainerName]),
      Fields = proplists:get_value(ContainerName, kvs:containers()),
      Blank = list_to_tuple([ContainerName | Fields]),
      WithId = setelement(#container.id, Blank, Id),
      WithTop = setelement(#container.top, WithId, undefined),
      Container = setelement(#container.entries_count, WithTop, 0),
      ok = kvs:put(Container),
      Id.
  %% @doc Appends `Record' (an iterator-shaped tuple) to its container chain.
  %%
  %% Returns `{ok, StoredRecord}' on success, `{error, exist}' when a record
  %% with the same id is already stored, `{error, no_container}' when the
  %% container can be neither read nor created, and `{aborted, Reason}' when
  %% the initial lookup reports that.
  add(Record) when is_tuple(Record) ->
      Id = element(#iterator.id, Record),
      case kvs:get(element(1, Record), Id) of
          {error, not_found} ->
              add_new_entry(Record, Id);
          {aborted, Reason} ->
              kvs:info(?MODULE, "[kvs] aborted: ~p~n", [Reason]),
              {aborted, Reason};
          {ok, _} ->
              kvs:info(?MODULE, "[kvs] entry exist while put: ~p~n", [Id]),
              {error, exist}
      end.

  %% Resolves the target container for a not-yet-stored record and links
  %% the record into it.
  add_new_entry(Record, Id) ->
      Type = table_type(element(1, Record)),
      CName = element(#iterator.container, Record),
      %% Without an explicit feed id the record name itself is the feed id.
      Cid = table_type(case element(#iterator.feed_id, Record) of
                           undefined -> element(1, Record);
                           Fid -> Fid
                       end),
      case add_fetch_container(CName, Cid) of
          error -> {error, no_container};
          Container -> add_link_entry(Record, Id, Type, CName, Container)
      end.

  %% Reads container `{CName, Cid}', creating and storing an empty one when
  %% it is missing. Returns the container record or the atom `error'.
  add_fetch_container(CName, Cid) ->
      case kvs:get(CName, Cid) of
          {ok, Container} ->
              Container;
          {error, not_found} when Cid /= undefined ->
              Fields = proplists:get_value(CName, kvs:containers()),
              Blank = setelement(#container.id,
                                 list_to_tuple([CName | Fields]), Cid),
              Fresh = setelement(#container.entries_count, Blank, 0),
              kvs:put(Fresh),
              Fresh;
          _ ->
              error
      end.

  %% Points the current top entry (if any) at the new id and returns the id
  %% the new entry must use as its `prev' link.
  add_relink_top(_Type, undefined, _Id) ->
      undefined;
  add_relink_top(Type, TopId, Id) ->
      case kvs:get(Type, TopId) of
          {error, not_found} ->
              undefined;
          {ok, Top} ->
              NewTop = setelement(#iterator.next, Top, Id),
              kvs:put(NewTop),
              element(#iterator.id, NewTop)
      end.

  %% Makes the record the new top of `Container', bumps the entry count,
  %% materialises the record's own feeds and stores the linked record.
  add_link_entry(Record, Id, Type, CName, Container) ->
      Prev = add_relink_top(Type, element(#container.top, Container), Id),
      Topped = setelement(#container.top, Container, Id),
      Counted = setelement(#container.entries_count, Topped,
                           element(#container.entries_count, Container) + 1),
      kvs:put(Counted),
      %% Feeds given as bare names get a freshly created sub-container;
      %% `{Name, Id}' pairs are kept as they are.
      Feeds = [case Feed of
                   {FeedName, FeedId} ->
                       {FeedName, FeedId};
                   _ ->
                       {Feed, kvs:create(CName,
                                         {Feed, element(#iterator.id, Record)})}
               end || Feed <- element(#iterator.feeds, Record)],
      WithFeeds = setelement(#iterator.feeds, Record, Feeds),
      WithNext = setelement(#iterator.next, WithFeeds, undefined),
      WithPrev = setelement(#iterator.prev, WithNext, Prev),
      Stored = setelement(#iterator.feed_id, WithPrev,
                          element(#container.id, Container)),
      kvs:put(Stored),
      %% The stored term is an iterator, so read its id via #iterator.id.
      kvs:info(?MODULE, "[kvs] put: ~p~n", [element(#iterator.id, Stored)]),
      {ok, Stored}.
  %% @doc Unlinks the entry `{RecordName, RecordId}' from its container
  %% chain and deletes it.
  %%
  %% When the entry is not found only an error is logged and the result of
  %% that log call is returned. Otherwise the neighbours' `prev'/`next'
  %% links are patched, the container's `top' and `entries_count' are
  %% updated, and the result of `kvs:delete/2' is returned. Crashes with
  %% `badmatch' when the entry's container cannot be read.
  remove(RecordName, RecordId) ->
      case kvs:get(RecordName, RecordId) of
          {error, not_found} ->
              kvs:error("[kvs] can't remove ~p~n", [{RecordName, RecordId}]);
          {ok, Entry} ->
              EntryId = element(#iterator.id, Entry),
              ContainerName = element(#iterator.container, Entry),
              FeedId = element(#iterator.feed_id, Entry),
              {ok, Feed} = kvs:get(ContainerName, FeedId),
              TopId = element(#container.top, Feed),
              NextId = element(#iterator.next, Entry),
              PrevId = element(#iterator.prev, Entry),
              %% Bridge the gap: next.prev := prev, prev.next := next.
              case kvs:get(RecordName, NextId) of
                  {ok, NextEntry} ->
                      kvs:put(setelement(#iterator.prev, NextEntry, PrevId));
                  _ ->
                      ok
              end,
              case kvs:get(RecordName, PrevId) of
                  {ok, PrevEntry} ->
                      kvs:put(setelement(#iterator.next, PrevEntry, NextId));
                  _ ->
                      ok
              end,
              %% Removing the top entry promotes its predecessor.
              Retopped = if
                             TopId =:= EntryId ->
                                 setelement(#container.top, Feed, PrevId);
                             true ->
                                 Feed
                         end,
              Shrunk = setelement(#container.entries_count, Retopped,
                                  element(#container.entries_count, Feed) - 1),
              kvs:put(Shrunk),
              kvs:info(?MODULE, "[kvs] delete: ~p id: ~p~n",
                       [RecordName, EntryId]),
              kvs:delete(RecordName, EntryId)
      end.
  %% @doc Unlinks the already-loaded entry `E' from its container chain and
  %% deletes it.
  %%
  %% When the container can be read, the neighbours' links and the
  %% container's `top' / `entries_count' are updated; otherwise that step is
  %% skipped. The entry itself is deleted in both cases and the result of
  %% `kvs:delete/2' is returned.
  remove(E) when is_tuple(E) ->
      EntryId = element(#iterator.id, E),
      ContainerName = element(#iterator.container, E),
      FeedId = element(#iterator.feed_id, E),
      case kvs:get(ContainerName, FeedId) of
          {ok, Feed} ->
              TopId = element(#container.top, Feed),
              NextId = element(#iterator.next, E),
              PrevId = element(#iterator.prev, E),
              %% Bridge the gap: next.prev := prev, prev.next := next.
              case kvs:get(element(1, E), NextId) of
                  {ok, NextEntry} ->
                      kvs:put(setelement(#iterator.prev, NextEntry, PrevId));
                  _ ->
                      ok
              end,
              case kvs:get(element(1, E), PrevId) of
                  {ok, PrevEntry} ->
                      kvs:put(setelement(#iterator.next, PrevEntry, NextId));
                  _ ->
                      ok
              end,
              %% Removing the top entry promotes its predecessor.
              Retopped = if
                             TopId =:= EntryId ->
                                 setelement(#container.top, Feed, PrevId);
                             true ->
                                 Feed
                         end,
              Shrunk = setelement(#container.entries_count, Retopped,
                                  element(#container.entries_count, Feed) - 1),
              kvs:put(Shrunk);
          _ ->
              skip
      end,
      kvs:info(?MODULE, "[kvs] delete: ~p", [EntryId]),
      kvs:delete(element(1, E), EntryId).
  %% @doc Walks a chain starting at id `Start', following the tuple element
  %% at position `Direction' of each entry, and returns the visited entries
  %% in visiting order.
  %%
  %% Stops at an `undefined' link, when `Count' reaches `0', or when an
  %% entry cannot be read. A non-integer `Count' is never decremented.
  traversal(_, undefined, _, _) ->
      [];
  traversal(_, _, 0, _) ->
      [];
  traversal(RecordType2, Start, Count, Direction) ->
      case kvs:get(table_type(RecordType2), Start) of
          {ok, Entry} ->
              Following = element(Direction, Entry),
              Remaining = if
                              is_integer(Count) -> Count - 1;
                              true -> Count
                          end,
              [Entry | traversal(RecordType2, Following, Remaining, Direction)];
          _ ->
              []
      end.
  %% @doc Returns the entries of the feed whose id is `Name', with no
  %% count limit (the count is passed on as `undefined').
  entries(Name) ->
      entries(Name, undefined).

  %% @doc Returns up to `Count' entries of the feed whose id is `Name'.
  %% The container type is taken from the `container' field of the table
  %% definition returned by `kvs:table(Name)'.
  entries(Name, Count) ->
      Table = kvs:table(Name),
      Lookup = kvs:get(Table#table.container, Name),
      entries(Lookup, Name, Count).
  %% @doc Returns up to `Count' entries of `RecordType' reachable from the
  %% top of a container, walking along the `prev' links.
  %%
  %% The first argument may be a container record, an `{ok, Container}'
  %% lookup result, or an `{error, Reason}' lookup result. The error case
  %% yields `[]'; previously it fell into the tuple clause and crashed with
  %% `badarg' inside `element/2'.
  entries({ok, Container}, RecordType, Count) ->
      entries(Container, RecordType, Count);
  entries({error, _Reason}, _RecordType, _Count) ->
      [];
  entries(Container, RecordType, Count) when is_tuple(Container) ->
      traversal(RecordType, element(#container.top, Container), Count,
                #iterator.prev).
  %% @doc Traverses from `Start' in `Direction' and returns the entries.
  %% A walk along `#iterator.next' is reversed before returning; a walk
  %% along `#iterator.prev' is returned as collected. Any other direction
  %% crashes with `case_clause'.
  entries(RecordType, Start, Count, Direction) ->
      Found = traversal(RecordType, Start, Count, Direction),
      case Direction of
          #iterator.next -> lists:reverse(Found);
          #iterator.prev -> Found
      end.
  %% @doc Ensures an `#id_seq{}' counter exists for every container name.
  %% Returns a list of `{Key, Result}' pairs where `Result' is `skip' for
  %% counters that already existed and the `kvs:put/1' result for new ones.
  add_seq_ids() ->
      Ensure =
          fun(Key) ->
              case kvs:get(id_seq, Key) of
                  {ok, _} ->
                      {Key, skip};
                  {error, _} ->
                      {Key, kvs:put(#id_seq{thing = Key, id = 0})}
              end
          end,
      [Ensure(atom_to_list(Name)) || {Name, _Fields} <- containers()].
  %% @doc Stores `Record' through the backend selected by `?DBA' and
  %% returns the backend's result.
  put(Record) ->
      Backend = ?DBA,
      Backend:put(Record).
  %% @doc Maps a record name to the table it is stored in: `user2' maps to
  %% `user', everything else maps to itself.
  table_type(Type) ->
      case Type of
          user2 -> user;
          _ -> Type
      end.
  %% @doc Reads the interval list configured under key `RecordName' and
  %% returns the name of the first interval containing `Id', or `[]' when
  %% none matches.
  range(RecordName, Id) ->
      Intervals = kvs:config(RecordName),
      find(Intervals, RecordName, Id).
  %% @doc Scans the interval list left to right and returns the first
  %% non-`[]' result of `lookup/2' for `Id'; `[]' when the list is exhausted.
  find([], _RecordName, _Id) ->
      [];
  find([Interval | Rest], RecordName, Id) ->
      case lookup(Interval, Id) of
          [] -> find(Rest, RecordName, Id);
          Found -> Found
      end.
  %% @doc Returns the interval's `name' when `Left =< Id =< Right'
  %% (inclusive on both ends), otherwise `[]'.
  lookup(#interval{left = Left, right = Right, name = Name}, Id)
    when Id >= Left, Id =< Right ->
      Name;
  lookup(#interval{}, _Id) ->
      [].
  %% @doc Reads `Key' from the backend. When `range/2' resolves
  %% `{RecordName, Key}' to an interval name, that name is used as the
  %% table; otherwise `RecordName' itself is used.
  get(RecordName, Key) ->
      Backend = ?DBA,
      Table = case range(RecordName, Key) of
                  [] -> RecordName;
                  Name -> Name
              end,
      Backend:get(Table, Key).
  %% @doc Reads a `{RecordName, Key, Value}' triple from the backend and
  %% returns `{ok, Value}'. When the backend reports an error, the triple
  %% `{RecordName, Key, Default}' is stored and `{ok, Default}' is returned.
  get(RecordName, Key, Default) ->
      Backend = ?DBA,
      case Backend:get(RecordName, Key) of
          {ok, {RecordName, Key, Stored}} ->
              kvs:info(?MODULE, "[kvs] get config value: ~p~n",
                       [{RecordName, Key, Stored}]),
              {ok, Stored};
          {error, _Why} ->
              kvs:info(?MODULE, "[kvs] new config value: ~p~n",
                       [{RecordName, Key, Default}]),
              Backend:put({RecordName, Key, Default}),
              {ok, Default}
      end.
  %% @doc Delegates to `delete/2' of the backend selected by `?DBA'.
  delete(Tab, Key) ->
      Backend = ?DBA,
      Backend:delete(Tab, Key).
  %% @doc Delegates to `count/1' of the backend selected by `?DBA'.
  count(RecordName) ->
      Backend = ?DBA,
      Backend:count(RecordName).
  %% @doc Delegates to `all/1' of the backend selected by `?DBA'.
  all(RecordName) ->
      Backend = ?DBA,
      Backend:all(RecordName).
  %% @doc Delegates to `index/3' of the backend selected by `?DBA'.
  index(RecordName, Key, Value) ->
      Backend = ?DBA,
      Backend:index(RecordName, Key, Value).
  %% @doc Delegates to `next_id/2' of the backend selected by `?DBA'.
  next_id(RecordName, Incr) ->
      Backend = ?DBA,
      Backend:next_id(RecordName, Incr).
  %% @doc Dumps the contents of every table reported by `kvs:dir()' as
  %% `{table, Name}' into the file at `Path' via `kvs:save/2'.
  save_db(Path) ->
      Tables = [list_to_atom(Name) || {table, Name} <- kvs:dir()],
      Data = lists:flatmap(fun(Table) -> all(Table) end, Tables),
      kvs:save(Path, Data).
  %% @doc Ensures id sequences exist, loads the term list stored at `Path'
  %% and puts every tuple in it into the store. Non-tuple entries are
  %% ignored. Returns the list of `kvs:put/1' results.
  load_db(Path) ->
      add_seq_ids(),
      Loaded = kvs:load(Path),
      Records = [Entry || Entry <- Loaded, is_tuple(Entry)],
      lists:map(fun(Record) -> kvs:put(Record) end, Records).
  %% @doc Serialises `Value' with `term_to_binary/1' and writes it to the
  %% file `Dir', creating missing parent directories first. Returns the
  %% result of `file:write_file/2'.
  save(Dir, Value) ->
      filelib:ensure_dir(Dir),
      Encoded = term_to_binary(Value),
      file:write_file(Dir, Encoded).
  %% @doc Reads the file `Key' and decodes its contents with
  %% `binary_to_term/1'. Crashes with `badmatch' when the file cannot be read.
  load(Key) ->
      {ok, Encoded} = file:read_file(Key),
      binary_to_term(Encoded).
  %% @doc No-op notification hook: ignores both arguments and returns `skip'.
  notify(_Path, _Payload) ->
      skip.
  %% @doc Reads `Key' from the `kvs' application environment; `""' when unset.
  config(Key) ->
      config(kvs, Key, "").

  %% @doc Reads `Key' from the environment of `App'; `""' when unset.
  config(App, Key) ->
      config(App, Key, "").

  %% @doc Reads `Key' from the environment of `App', returning `Default'
  %% when the key is not set.
  config(App, Key, Default) ->
      application:get_env(App, Key, Default).
  %% @doc Default list of modules whose messages are logged: none.
  log_modules() ->
      [].
  %% Module that supplies log_modules/0; read from the `log_modules' key of
  %% the `kvs' application environment, defaulting to `kvs'.
  -define(ALLOWED, (config(kvs,log_modules,kvs))).

  %% @doc Emits `String'/`Args' through `error_logger:Fun/2', prefixed with
  %% the module name, but only when `Module' is listed by `log_modules/0'
  %% of the configured provider. Returns `skip' when the module is not listed.
  log(Module, String, Args, Fun) ->
      Provider = ?ALLOWED,
      Enabled = lists:member(Module, Provider:log_modules()),
      if
          Enabled -> error_logger:Fun("~p:" ++ String, [Module | Args]);
          true -> skip
      end.
  %% @doc Logs an info message on behalf of `Module' (see `log/4').
  info(Module, String, Args) ->
      log(Module, String, Args, info_msg).

  %% @doc Logs an info message on behalf of this module.
  info(String, Args) ->
      info(?MODULE, String, Args).

  %% @doc Logs an info message with no format arguments.
  info(String) ->
      info(?MODULE, String, []).
  %% @doc Logs a warning message on behalf of `Module' (see `log/4').
  warning(Module, String, Args) ->
      log(Module, String, Args, warning_msg).

  %% @doc Logs a warning message on behalf of this module.
  warning(String, Args) ->
      warning(?MODULE, String, Args).

  %% @doc Logs a warning message with no format arguments.
  warning(String) ->
      warning(?MODULE, String, []).
  %% @doc Logs an error message on behalf of `Module' (see `log/4').
  error(Module, String, Args) ->
      Level = error_msg,
      log(Module, String, Args, Level).

  %% @doc Logs an error message on behalf of this module.
  %% Calls `log/4' directly: an unqualified `error(...)' call would clash
  %% with the auto-imported BIF of the same name.
  error(String, Args) ->
      Level = error_msg,
      log(?MODULE, String, Args, Level).

  %% @doc Logs an error message with no format arguments.
  error(String) ->
      Level = error_msg,
      log(?MODULE, String, [], Level).
  %% @doc Prints a table of name, storage type, memory and size (as reported
  %% by `mnesia:table_info/2') for every table in `kvs:tables()', followed
  %% by a UTC timestamp line. Returns the result of the last `io:format/2'.
  dump() ->
      RowFormat = "~20w ~20w ~10w ~10w~n",
      io:format(RowFormat, [name, storage_type, memory, size]),
      [io:format(RowFormat,
                 [Name,
                  mnesia:table_info(Name, storage_type),
                  mnesia:table_info(Name, memory),
                  mnesia:table_info(Name, size)])
       || #table{name = Name} <- kvs:tables()],
      %% calendar:universal_time/0 yields the same UTC datetime that
      %% calendar:now_to_datetime(now()) did, without the deprecated now/0.
      io:format("Snapshot taken: ~p~n", [calendar:universal_time()]).