kvs.erl 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. -module(kvs).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -compile(export_all).
  4. %-include_lib("stdlib/include/qlc.hrl").
  5. -include("config.hrl").
  6. -include("metainfo.hrl").
  7. -include("kvs.hrl").
  8. -include("user.hrl").
  9. -include("api.hrl").
  10. % Public Main Backend is given in sys.config and
  11. % could be obtained with application:get_env(kvs,dba,store_mnesia).
  12. delete(Table,Key) -> delete (Table, Key, #kvs{mod=?DBA}).
  13. remove(Table,Key) -> remove (Table, Key, #kvs{mod=?DBA}).
  14. get(Table,Key) -> get (Table, Key, #kvs{mod=?DBA}).
  15. index(Table,K,V) -> index (Table, K,V, #kvs{mod=?DBA}).
  16. change_storage(Table,Type) -> change_storage(Table,Type, #kvs{mod=?DBA}).
  17. entries(A,B,C) -> entries (A,B,C, #kvs{mod=?DBA}).
  18. join() -> join ([], #kvs{mod=?DBA}).
  19. join(Node) -> join (Node, #kvs{mod=?DBA}).
  20. count(Table) -> count (Table, #kvs{mod=?DBA}).
  21. add(Table) -> add (Table, #kvs{mod=?DBA}).
  22. all(Table) -> all (Table, #kvs{mod=?DBA}).
  23. put(Table) -> put (Table, #kvs{mod=?DBA}).
  24. link(Table) -> link (Table, #kvs{mod=?DBA}).
  25. fold(Fun,Acc,T,S,C,D) -> fold (Fun,Acc,T,S,C,D,#kvs{mod=?DBA}).
  26. traversal(T,S,C,D) -> traversal(T,S,C,D, #kvs{mod=?DBA}).
  27. info(T) -> info (T, #kvs{mod=?DBA}).
  28. start() -> start (#kvs{mod=?DBA}).
  29. stop() -> stop (#kvs{mod=?DBA}).
  30. destroy() -> destroy (#kvs{mod=?DBA}).
  31. version() -> version (#kvs{mod=?DBA}).
  32. dir() -> dir (#kvs{mod=?DBA}).
  33. next_id(Table,DX) -> next_id(Table, DX, #kvs{mod=?DBA}).
  34. generation(Table,Key) ->
  35. case Key - topleft(Table,Key) < norm(application:get_env(kvs,generation,{?MODULE,limit}),Table,Key) of
  36. true -> skip;
  37. false -> kvs:rotate(Table) end.
  38. norm({A,B},Table,Key) -> A:B(Table,Key);
  39. norm(_,Table,Key) -> limit(Table,Key).
  40. limit(user,_Key) -> 2;
  41. limit(comment,_Key) -> 2;
  42. limit(_Table,_Key) -> 250000.
  43. forbid(user) -> 3;
  44. forbid(comment) -> 3;
  45. forbid(____) -> 100000.
  46. % Implementation
  47. init(Backend, Module) ->
  48. [ begin
  49. Backend:create_table(T#table.name, [{attributes,T#table.fields},{T#table.copy_type, [node()]}]),
  50. [ Backend:add_table_index(T#table.name, Key) || Key <- T#table.keys ],
  51. T
  52. end || T <- (Module:metainfo())#schema.tables ].
  53. start(#kvs{mod=DBA}) -> DBA:start().
  54. stop(#kvs{mod=DBA}) -> DBA:stop().
  55. change_storage(Type) -> [ change_storage(Name,Type) || #table{name=Name} <- kvs:tables() ].
  56. change_storage(Table,Type,#kvs{mod=DBA}) -> DBA:change_storage(Table,Type).
  57. destroy(#kvs{mod=DBA}) -> DBA:destroy().
  58. join(Node,#kvs{mod=DBA}) -> DBA:join(Node), rotate_new(), load_partitions(), load_config().
  59. version(#kvs{mod=DBA}) -> DBA:version().
  60. tables() -> lists:flatten([ (M:metainfo())#schema.tables || M <- modules() ]).
  61. table(Name) when is_atom(Name) -> lists:keyfind(rname(Name),#table.name,tables());
  62. table(_) -> false.
  63. dir(#kvs{mod=DBA}) -> DBA:dir().
  64. info(T,#kvs{mod=DBA}) -> DBA:info(T).
  65. modules() -> kvs:config(schema).
  66. containers() ->
  67. lists:flatten([ [ {T#table.name,T#table.fields}
  68. || T=#table{container=true} <- (M:metainfo())#schema.tables ]
  69. || M <- modules() ]).
  70. create(ContainerName) -> create(ContainerName, kvs:next_id(atom_to_list(ContainerName), 1), #kvs{mod=?DBA}).
  71. create(ContainerName, Id, Driver) ->
  72. kvs:info(?MODULE,"Create: ~p",[ContainerName]),
  73. Instance = list_to_tuple([ContainerName|proplists:get_value(ContainerName, kvs:containers())]),
  74. Top = setelement(#container.id,Instance,Id),
  75. Top2 = setelement(#container.top,Top,[]),
  76. Top3 = setelement(#container.count,Top2,0),
  77. ok = kvs:put(Top3, Driver),
  78. Id.
  79. ensure_link(Record, #kvs{mod=_Store}=Driver) ->
  80. Id = element(2,Record),
  81. Type = rname(element(1,Record)),
  82. CName = element(#iterator.container, Record),
  83. Cid = case element(#iterator.feed_id, Record) of
  84. [] -> rname(element(1,Record));
  85. undefined -> rname(element(1,Record));
  86. Fid -> Fid end,
  87. Container = case kvs:get(CName, Cid, Driver) of
  88. {ok,Res} -> Res;
  89. {error, _} when Cid /= undefined andalso Cid /= [] ->
  90. NC = setelement(#container.id,
  91. list_to_tuple([CName|
  92. proplists:get_value(CName, kvs:containers())]), Cid),
  93. NC1 = setelement(#container.count, NC, 0),
  94. NC1;
  95. _Error -> error end,
  96. case Container of
  97. error -> {error, no_container};
  98. _ when element(#container.top,Container) == Id -> {error,just_added};
  99. _ ->
  100. Top = case element(#container.top, Container) of
  101. undefined -> [];
  102. [] -> [];
  103. Tid -> case kvs:get(Type, Tid, Driver) of
  104. {error, _} -> [];
  105. {ok, T} -> setelement(#iterator.next, T, Id) end end,
  106. Prev = case Top of undefined -> [];
  107. [] -> [];
  108. E -> element(#iterator.id, E) end,
  109. Next = [],
  110. C1 = setelement(#container.top, Container, Id),
  111. C2 = setelement(#container.count, C1,
  112. element(#container.count, Container)+1),
  113. R = setelement(#iterator.feeds, Record,
  114. [ case F1 of
  115. {FN, Fd} -> {FN, Fd};
  116. _-> {F1, kvs:create(CName,{F1,element(#iterator.id,Record)},Driver)}
  117. end || F1 <- element(#iterator.feeds, Record)]),
  118. R1 = setelement(#iterator.next, R, Next),
  119. R2 = setelement(#iterator.prev, R1, Prev),
  120. R3 = setelement(#iterator.feed_id, R2, element(#container.id, Container)),
  121. case {kvs:put(R3, Driver),Top} of % Iterator
  122. {ok,[]} -> kvs:put(C2, Driver); % Container
  123. {ok,Top} -> kvs:put(C2, Driver),
  124. kvs:put(Top, Driver);
  125. __ -> kvs:error(?MODULE,"Error Updating Iterator: ~p~n",
  126. [element(#container.id,R3)]) end,
  127. kvs:info(?MODULE,"Put: ~p~n", [element(#container.id,R3)]),
  128. {ok, R3}
  129. end.
  130. link(Record,#kvs{mod=_Store}=Driver) ->
  131. Id = element(#iterator.id, Record),
  132. case kvs:get(rname(element(1,Record)), Id, Driver) of
  133. {ok, Exists} -> ensure_link(Exists, Driver);
  134. {error, not_found} -> {error, not_found} end.
  135. %add(Record, #kvs{mod=store_mnesia}=Driver) when is_tuple(Record) -> store_mnesia:add(Record);
  136. add(Record, #kvs{mod=Store}=Driver) when is_tuple(Record) -> append(Record,Driver).
  137. append(Record, #kvs{mod=_Store}=Driver) when is_tuple(Record) ->
  138. Id = element(#iterator.id, Record),
  139. Name = rname(element(1,Record)),
  140. generation(Name, Id),
  141. case kvs:get(Name, Id, Driver) of
  142. {error, _} -> ensure_link(Record, Driver);
  143. {aborted, Reason} -> {aborted, Reason};
  144. {ok, _} -> {error, exist} end.
  145. reverse(#iterator.prev) -> #iterator.next;
  146. reverse(#iterator.next) -> #iterator.prev.
  147. relink(Container, E, Driver) ->
  148. Id = element(#iterator.id, E),
  149. Next = element(#iterator.next, E),
  150. Prev = element(#iterator.prev, E),
  151. Top = element(#container.top, Container),
  152. case kvs:get(element(1,E), Prev, Driver) of
  153. {ok, PE} -> kvs:put(setelement(#iterator.next, PE, Next), Driver);
  154. _ -> ok end,
  155. case kvs:get(element(1,E), Next, Driver) of
  156. {ok, NE} -> kvs:put(setelement(#iterator.prev, NE, Prev), Driver);
  157. _ -> ok end,
  158. C = case Top of
  159. Id -> setelement(#container.top, Container, Prev);
  160. _ -> Container end,
  161. case element(#container.top,C) of
  162. undefined -> kvs:delete(element(1,C),element(#container.id,C));
  163. _ -> kvs:put(setelement(#container.count,C,element(#container.count,C)-1), Driver) end.
  164. delete(Tab, Key, #kvs{mod=Mod}) ->
  165. case range(Tab,Key) of
  166. [] -> Mod:delete(Tab, Key);
  167. T -> Mod:delete(T, Key) end.
  168. remove(Record, Id,#kvs{mod=store_mnesia}=Driver) -> store_mnesia:remove(Record,Id);
  169. remove(Record, Id,#kvs{mod=Store}=Driver) -> takeoff(Record,Id,Driver).
  170. takeoff(Record,Id,#kvs{mod=Mod}=Driver) ->
  171. case get(Record,Id) of
  172. {error, not_found} -> kvs:error(?MODULE,"Can't remove ~p~n",[{Record,Id}]);
  173. {ok,R} -> do_remove(R,Driver) end.
  174. do_remove(E,#kvs{mod=Mod}=Driver) ->
  175. case get(element(#iterator.container,E),element(#iterator.feed_id,E)) of
  176. {ok, C} -> relink(C,E,Driver);
  177. _ -> skip end,
  178. kvs:info(?MODULE,"Delete: ~p", [E]),
  179. kvs:delete(element(1,E),element(2,E), Driver).
  180. traversal(Table, Start, Count, Direction, Driver)->
  181. fold(fun(A,Acc) -> [A|Acc] end,[],Table,Start,Count,Direction,Driver).
  182. % kvs:fold(fun(X,A)->[X|A]end,[],process,2152,-1,#iterator.next,#kvs{mod=store_mnesia}).
  183. fold(___,___,_,undefined,_,_,_) -> [];
  184. fold(___,Acc,_,_,0,_,_) -> Acc;
  185. fold(Fun,Acc,Table,Start,Count,Direction,Driver) ->
  186. %io:format("fold: ~p~n",[{Table, Start, Driver}]),
  187. try
  188. case kvs:get(rname(Table), Start, Driver) of
  189. {ok, R} -> Prev = element(Direction, R),
  190. Count1 = case Count of C when is_integer(C) -> C - 1; _-> Count end,
  191. fold(Fun, Fun(R,Acc), Table, Prev, Count1, Direction, Driver);
  192. Error -> %kvs:error(?MODULE,"Error: ~p~n",[Error]),
  193. Acc end catch _:_ -> Acc end.
  194. entries({error,_},_,_,_) -> [];
  195. entries({ok,Container},N,C,Driver) -> entries(Container,N,C,Driver);
  196. entries(T,N,C,Driver) -> traversal(N,element(#container.top,T),C,#iterator.prev,Driver).
  197. entries(N, Start, Count, Direction, Driver) ->
  198. E = traversal(N, Start, Count, Direction, Driver),
  199. case Direction of #iterator.next -> lists:reverse(E);
  200. #iterator.prev -> E end.
  201. add_seq_ids() ->
  202. Init = fun(Key) ->
  203. case kvs:get(id_seq, Key) of
  204. {error, _} -> {Key,kvs:put(#id_seq{thing = Key, id = 0})};
  205. {ok, _} -> {Key,skip} end end,
  206. [ Init(atom_to_list(Name)) || {Name,_Fields} <- containers() ].
  207. put(Record,#kvs{mod=Mod}) ->
  208. case range(element(1,Record),element(2,Record)) of
  209. [] -> Mod:put(Record);
  210. Name -> Mod:put(setelement(1,Record,Name)) end.
  211. get(RecordName, Key, #kvs{mod=Mod}) ->
  212. case range(RecordName,Key) of
  213. [] -> Mod:get(RecordName, Key);
  214. Name -> case Mod:get(Name, Key) of
  215. {ok,Record} -> {ok,setelement(1,Record,kvs:last(RecordName,Key))};
  216. Else -> Else end end.
  217. count(Tab,#kvs{mod=DBA}) -> lists:foldl(fun(T,A) -> DBA:count(T) + A end, 0, rlist(Tab)).
  218. all(Tab,#kvs{mod=DBA}) ->
  219. lists:flatten([ rnorm(rname(Tab),DBA:all(T)) || T <- rlist(Tab) ]).
  220. index(Tab, Key, Value,#kvs{mod=DBA}) ->
  221. lists:flatten([ rnorm(rname(Tab),DBA:index(T, Key, Value)) || T <- rlist(Tab) ]).
  222. next_id(Tab, Incr,#kvs{mod=DBA}) ->
  223. DBA:next_id(case table(Tab) of #table{} -> atom_to_list(Tab); _ -> Tab end, Incr).
  224. save_db(Path) ->
  225. Data = lists:append([all(B) || B <- [list_to_atom(Name) || {table,Name} <- kvs:dir()] ]),
  226. kvs:save(Path, Data).
  227. load_db(Path) ->
  228. add_seq_ids(),
  229. AllEntries = kvs:load(Path),
  230. [kvs:put(E) || E <- lists:filter(fun(E) -> is_tuple(E) end ,AllEntries)].
  231. save(Dir, Value) ->
  232. filelib:ensure_dir(Dir),
  233. file:write_file(Dir, term_to_binary(Value)).
  234. load(Key) ->
  235. {ok, Bin} = file:read_file(Key),
  236. binary_to_term(Bin).
  237. notify(_EventPath, _Data) -> skip.
  238. config(Key) -> config(kvs, Key, "").
  239. config(App,Key) -> config(App,Key, "").
  240. config(App, Key, Default) -> case application:get_env(App,Key) of
  241. undefined -> Default;
  242. {ok,V} -> V end.
  243. log_modules() -> [].
  244. -define(ALLOWED, (config(kvs,log_modules,kvs))).
  245. log(Module, String, Args, Fun) ->
  246. case lists:member(Module,?ALLOWED:log_modules()) of
  247. true -> error_logger:Fun("~p:"++String, [Module|Args]);
  248. false -> skip end.
  249. info(Module, String, Args) -> log(Module, String, Args, info_msg).
  250. warning(Module,String, Args) -> log(Module, String, Args, warning_msg).
  251. error(Module, String, Args) -> log(Module, String, Args, error_msg).
  252. dump() -> dump([ rlist(N) || #table{name=N} <- kvs:tables() ]).
  253. dump(short) ->
  254. Gen = fun(T) ->
  255. {S,M,C}=lists:unzip3([ dump_info(R) || R <- rlist(T) ]),
  256. {lists:usort(S),lists:sum(M),lists:sum(C)}
  257. end,
  258. dump_format([ {T,Gen(T)} || T <- [ N || #table{name=N} <- kvs:tables() ] ]);
  259. dump(Table) when is_atom(Table) -> dump(rlist(Table));
  260. dump(Tables) ->
  261. dump_format([{case nname(T) of 1 -> rname(T); _ -> T end,dump_info(T)} || T <- lists:flatten(Tables) ]).
  262. dump_info(T) ->
  263. {mnesia:table_info(T,storage_type),
  264. mnesia:table_info(T,memory) * erlang:system_info(wordsize) / 1024 / 1024,
  265. mnesia:table_info(T,size)}.
  266. dump_format(List) ->
  267. io:format("~20s ~32s ~14s ~10s~n~n",["NAME","STORAGE TYPE","MEMORY (MB)","ELEMENTS"]),
  268. [ io:format("~20s ~32w ~14.2f ~10b~n",[T,S,M,C]) || {T,{S,M,C}} <- List ],
  269. io:format("~nSnapshot taken: ~p~n",[calendar:now_to_datetime(os:timestamp())]).
  270. % Table Partitions
  271. range(RecordName,Id) -> (find(kvs:config(kvs:rname(RecordName)),RecordName,Id))#interval.name.
  272. topleft(RecordName,Id) -> (find(kvs:config(kvs:rname(RecordName)),RecordName,Id))#interval.left.
  273. last(RecordName,Id) -> (find(kvs:config(kvs:rname(RecordName)),RecordName,Id))#interval.last.
  274. find([],_,_Id) -> #interval{left=1,right=infinity,name=[],last=[]};
  275. find([Range|T],RecordName,Id) ->
  276. case lookup(Range,Id) of
  277. [] -> find(T,RecordName,Id);
  278. Interval -> Interval end.
  279. lookup(#interval{left=Left,right=Right,name=Name}=I,Id) when Id =< Right, Id >= Left -> I;
  280. lookup(#interval{},_) -> [].
  281. rotate_new() ->
  282. N = [ kvs:rotate(kvs:table(T)) || {T,_} <- fold_tables(),
  283. length(proplists:get_value(attributes,info(last_disc(T)),[])) /= length((table(rname(T)))#table.fields) ],
  284. io:format("Nonexistent: ~p~n",[N]), N.
  285. rotate(#table{name=N}) ->
  286. R = name(rname(N)),
  287. init(setelement(N,kvs:table(N),R)),
  288. update_config(rname(N),R);
  289. rotate(Table) ->
  290. Intervals = kvs:config(Table),
  291. {M,F} = application:get_env(kvs,forbidding,{?MODULE,forbid}),
  292. New = lists:sublist(Intervals,M:F(Table)),
  293. Delete = Intervals -- New,
  294. [ mnesia:change_table_copy_type(Name, node(), disc_only_copies) || #interval{name=Name}
  295. <- shd(Delete) ],
  296. rotate(kvs:table(Table)), ok.
  297. load_partitions() ->
  298. [ case kvs:get(config,Table) of
  299. {ok,{config,_,List}} -> application:set_env(kvs,Table,List);
  300. Else -> ok end || {table,Table} <- kvs:dir() ].
  301. rnorm(Tag,List) -> [ setelement(1,R,Tag) || R <- List ].
  302. rlist(Table) -> [ N || #interval{name=N} <- kvs:config(Table) ]++[Table].
  303. shd([]) -> [];
  304. shd(X) -> [hd(X)].
  305. wait() -> timer:tc(fun() -> mnesia:wait_for_tables([ T#table.name || T <- kvs:tables()],infinity) end).
  306. stl([]) -> [];
  307. stl(S) -> tl(S).
  308. dat(T) -> [ mnesia:change_table_copy_type(Name, node(), disc_only_copies) || #interval{name=Name} <- stl((element(2,kvs:get(config,T)))#config.value) ].
  309. omitone(1) -> [];
  310. omitone(X) -> X.
  311. limit() -> infinity.
  312. load_config() -> [ application:set_env(kvs,Key,Value) || #config{key=Key,value=Value}<- kvs:all(config) ].
  313. store(Table,X) -> application:set_env(kvs,Table,X), X.
  314. rname(Table) -> list_to_atom(lists:filter(fun(X) -> not lists:member(X,"1234567890") end, atom_to_list(Table))).
  315. nname(Table) -> list_to_integer(case lists:filter(fun(X) -> lists:member(X,"1234567890") end, atom_to_list(Table)) of [] -> "1"; E -> E end).
  316. fold(N) -> kvs:fold(fun(X,A)->[X|A]end,[],process,N,-1,#iterator.next,#kvs{mod=store_mnesia}).
  317. top(acl) -> id_seq(conv);
  318. top(i) -> id_seq(conv);
  319. top(Table) -> id_seq(Table).
  320. name(T) -> list_to_atom(lists:concat([T,omitone(kvs:next_id(lists:concat([T,".tables"]),1))])).
  321. init(T) -> store_mnesia:create_table(T#table.name, [{attributes,T#table.fields},{T#table.copy_type, [node()]}]),
  322. [ store_mnesia:add_table_index(T#table.name, Key) || Key <- T#table.keys ].
  323. id_seq(Tab) -> T = atom_to_list(Tab), case kvs:get(id_seq,T) of {ok,#id_seq{id=Id}} -> Id; _ -> kvs:next_id(T,1) end.
  324. last_disc(T) -> list_to_atom(lists:concat([T,omitone(kvs:id_seq(list_to_atom(lists:concat([T,".tables"]))))])).
  325. last_table(T) -> list_to_atom(lists:concat([T,omitone(lists:max(proplists:get_value(T,fold_tables(),[1])))])).
  326. fold_tables() -> lists:foldl(fun(#table{name=X},Acc) ->
  327. setkey(kvs:rname(X),1,Acc,{kvs:rname(X),[kvs:nname(X)|proplists:get_value(kvs:rname(X),Acc,[])]}) end,
  328. [], kvs:tables()).
  329. interval(L,R,Name) -> #interval{left=L,right=R,name=Name,last=last_table(rname(Name))}.
  330. update_config(Table,Name) ->
  331. kvs:put(#config{key = Table,
  332. value = store(Table,case kvs:get(config,Table) of
  333. {error,not_found} -> update_list(Table,[],Name);
  334. {ok,#config{value=List}} -> update_list(Table,List,Name) end)}).
  335. update_list(Table,[],Name) -> [ interval(top(Table)+1,limit(),Name) ];
  336. update_list(Table,[#interval{}=CI|Tail],Name) -> [ interval(top(Table)+1,limit(),Name) ] ++
  337. [ CI#interval{right=top(Table)} ] ++ Tail.
  338. setkey(Name,Pos,List,New) ->
  339. case lists:keyfind(Name,Pos,List) of
  340. false -> [New|List];
  341. _Element -> lists:keyreplace(Name,Pos,List,New) end.
  342. test() ->
  343. kvs:join(),
  344. [ kvs:add(#user{id=kvs:next_id("user",1)}) || _ <- lists:seq(1,20) ],
  345. io:format("Config: ~p~n",[kvs:all(config)]),
  346. io:format("Fetch: ~p~n",[kvs:entries(kvs:get(feed,user),user,infinity)]).