kvs.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. -module(kvs).
  2. -copyright('Synrc Research Center s.r.o.').
  3. -compile(export_all).
  4. -include_lib("stdlib/include/qlc.hrl").
  5. -include("config.hrl").
  6. -include("metainfo.hrl").
  7. -include("kvs.hrl").
  8. -include("api.hrl").
  % The public main backend is configured in sys.config and
  % can be obtained with application:get_env(kvs,dba,store_mnesia).
  % Public API. Every function below forwards to the driver-aware variant of
  % the same name, supplying the default driver record built from ?DBA.
  default_driver() -> #kvs{mod=?DBA}.

  % Backend lifecycle and introspection.
  start()   -> start(default_driver()).
  stop()    -> stop(default_driver()).
  destroy() -> destroy(default_driver()).
  version() -> version(default_driver()).
  dir()     -> dir(default_driver()).

  % Cluster membership and table storage type.
  join()     -> join([], default_driver()).
  join(Node) -> join(Node, default_driver()).
  change_storage(Table,Type) -> change_storage(Table, Type, default_driver()).

  % Plain key/value access.
  put(Table)        -> put(Table, default_driver()).
  get(Table,Key)    -> get(Table, Key, default_driver()).
  delete(Table,Key) -> delete(Table, Key, default_driver()).
  all(Table)        -> all(Table, default_driver()).
  count(Table)      -> count(Table, default_driver()).
  index(Table,K,V)  -> index(Table, K, V, default_driver()).
  next_id(Table,DX) -> next_id(Table, DX, default_driver()).

  % Linked-list (container/iterator) operations.
  add(Table)            -> add(Table, default_driver()).
  link(Table)           -> link(Table, default_driver()).
  remove(Table,Key)     -> remove(Table, Key, default_driver()).
  entries(A,B,C)        -> entries(A, B, C, default_driver()).
  traversal(T,S,C,D)    -> traversal(T, S, C, D, default_driver()).
  31. % Implementation
  % Creates on Backend every table listed in Module:metainfo(), then adds an
  % index for each of the table's keys. Returns the #table{} records in the
  % order the schema lists them.
  init(Backend, Module) ->
      Schema = Module:metainfo(),
      Setup =
          fun(Table) ->
              Name = Table#table.name,
              Options = [{attributes, Table#table.fields},
                         {Table#table.copy_type, [node()]}],
              Backend:create_table(Name, Options),
              _ = [ Backend:add_table_index(Name, Key) || Key <- Table#table.keys ],
              Table
          end,
      [ Setup(Table) || Table <- Schema#schema.tables ].
  % Driver-aware lifecycle calls: each one dispatches to the backend module
  % stored in the mod field of the #kvs{} driver record.
  start(#kvs{mod=Backend}) -> Backend:start().
  stop(#kvs{mod=Backend}) -> Backend:stop().

  % Applies Type to every table returned by kvs:tables(), one call per table,
  % and returns the list of per-table results.
  change_storage(Type) ->
      Names = [ Name || #table{name=Name} <- kvs:tables() ],
      [ change_storage(Name, Type) || Name <- Names ].

  change_storage(Table,Type,#kvs{mod=Backend}) -> Backend:change_storage(Table, Type).
  destroy(#kvs{mod=Backend}) -> Backend:destroy().
  join(Node,#kvs{mod=Backend}) -> Backend:join(Node).
  version(#kvs{mod=Backend}) -> Backend:version().
  % All #table{} records from the metainfo of every configured schema module.
  tables() ->
      PerModule = [ begin
                        Schema = Module:metainfo(),
                        Schema#schema.tables
                    end || Module <- modules() ],
      lists:flatten(PerModule).

  % The #table{} record whose name field equals Name, or false when none does.
  table(Name) ->
      Known = tables(),
      lists:keyfind(Name, #table.name, Known).

  dir(#kvs{mod=Backend}) -> Backend:dir().

  % Schema modules, read from the schema key of the kvs application env.
  modules() -> kvs:config(schema).
  % Returns {Name, Fields} for every table flagged container=true across all
  % configured schema modules.
  containers() ->
      PerModule =
          [ begin
                Schema = Module:metainfo(),
                [ {Table#table.name, Table#table.fields}
                  || #table{container=true}=Table <- Schema#schema.tables ]
            end || Module <- modules() ],
      lists:flatten(PerModule).
  % Creates a container of the given type under a freshly allocated id, using
  % the default driver. Returns the new id.
  create(ContainerName) ->
      Id = kvs:next_id(atom_to_list(ContainerName), 1),
      create(ContainerName, Id, #kvs{mod=?DBA}).

  % Builds a container tuple from the field list registered for ContainerName
  % in kvs:containers(), sets id = Id, top = undefined, count = 0, stores it
  % through Driver and returns Id. Crashes unless the put returns ok.
  create(ContainerName, Id, Driver) ->
      kvs:info(?MODULE,"Create: ~p",[ContainerName]),
      Fields = proplists:get_value(ContainerName, kvs:containers()),
      Blank = list_to_tuple([ContainerName|Fields]),
      Initial = [{#container.id, Id},
                 {#container.top, undefined},
                 {#container.count, 0}],
      Container = lists:foldl(fun({Pos, Value}, Tuple) -> setelement(Pos, Tuple, Value) end,
                              Blank, Initial),
      ok = kvs:put(Container, Driver),
      Id.
  % Links Record as the new top of its container.
  %
  % The container is looked up by the record's container name and feed id
  % (falling back to the record's own tag when feed_id is undefined); if the
  % lookup returns {error,_} a fresh container tuple with count 0 is built.
  % The previous top (if it can be read) gets its next pointer set to the new
  % id, the container's top/count are updated, any feed given as a bare name
  % is created, and finally the record itself is stored.
  %
  % Returns {ok, StoredRecord}, {error, no_container} or {error, just_added}
  % (the record is already the container's top).
  ensure_link(Record, #kvs{mod=_Store}=Driver) ->
      Id = element(2, Record),
      RecordName = element(1, Record),
      Type = table_type(RecordName),
      CName = element(#iterator.container, Record),
      FeedKey = case element(#iterator.feed_id, Record) of
                    undefined -> RecordName;
                    Given -> Given
                end,
      Cid = table_type(FeedKey),
      Container =
          case kvs:get(CName, Cid, Driver) of
              {ok, Found} -> Found;
              {error, _} when Cid /= undefined ->
                  Fields = proplists:get_value(CName, kvs:containers()),
                  Fresh = setelement(#container.id, list_to_tuple([CName|Fields]), Cid),
                  setelement(#container.count, Fresh, 0);
              _Error -> error
          end,
      case Container of
          error -> {error, no_container};
          _ when element(#container.top, Container) == Id -> {error, just_added};
          _ ->
              % Point the current top (when present and readable) at the new record.
              Prev =
                  case element(#container.top, Container) of
                      undefined -> undefined;
                      TopId ->
                          case kvs:get(Type, TopId, Driver) of
                              {error, _} -> undefined;
                              {ok, OldTop} ->
                                  Relinked = setelement(#iterator.next, OldTop, Id),
                                  kvs:put(Relinked, Driver),
                                  element(#iterator.id, Relinked)
                          end
                  end,
              Topped = setelement(#container.top, Container, Id),
              Counted = setelement(#container.count, Topped,
                                   element(#container.count, Container) + 1),
              kvs:put(Counted, Driver), % Container
              % Feeds already given as {Name, FeedId} pairs are kept; bare names
              % get a container created under the key {Name, RecordId}.
              Feeds = [ case Feed of
                            {_, _} -> Feed;
                            _ -> {Feed, kvs:create(CName, {Feed, element(#iterator.id, Record)}, Driver)}
                        end || Feed <- element(#iterator.feeds, Record) ],
              WithFeeds = setelement(#iterator.feeds, Record, Feeds),
              WithNext = setelement(#iterator.next, WithFeeds, undefined),
              WithPrev = setelement(#iterator.prev, WithNext, Prev),
              Linked = setelement(#iterator.feed_id, WithPrev,
                                  element(#container.id, Container)),
              kvs:put(Linked, Driver), % Iterator
              % NOTE(review): the id is read through #container.id although Linked
              % is an iterator; presumably both records keep id in the same slot.
              kvs:info(?MODULE,"Put: ~p~n", [element(#container.id,Linked)]),
              {ok, Linked}
      end.
  % Links an already stored record into its container.
  %
  % Looks the record up by its tag and id; when found, the stored copy is
  % passed to ensure_link/2 and that result is returned. Any failure reported
  % by the lookup -- {error, Reason} (including {error, not_found}) or
  % {aborted, Reason} -- is returned to the caller unchanged, the same shapes
  % add/2 already handles, instead of crashing with case_clause.
  link(Record,#kvs{mod=_Store}=Driver) ->
      Id = element(#iterator.id, Record),
      case kvs:get(element(1,Record), Id, Driver) of
          {ok, Exists} -> ensure_link(Exists, Driver);
          {error, _} = Error -> Error;
          {aborted, _} = Aborted -> Aborted
      end.
  % Adds a new record to its container. Returns {error, exist} when a record
  % with the same tag and id is already stored, passes {aborted, Reason}
  % through, and otherwise returns the result of ensure_link/2.
  add(Record, #kvs{mod=_Store}=Driver) when is_tuple(Record) ->
      Key = element(#iterator.id, Record),
      Table = element(1, Record),
      case kvs:get(Table, Key, Driver) of
          {ok, _} -> {error, exist};
          {aborted, Reason} -> {aborted, Reason};
          {error, _} -> ensure_link(Record, Driver)
      end.
  % Maps a traversal direction (an #iterator{} field index) to the opposite one.
  reverse(Direction) when Direction =:= #iterator.next -> #iterator.prev;
  reverse(Direction) when Direction =:= #iterator.prev -> #iterator.next.
  % Unhooks entry E from the doubly linked list of Container: the previous
  % neighbour's next and the next neighbour's prev are pointed past E (a
  % neighbour that cannot be read is skipped), the container's top moves to
  % E's prev when E was the top, and the count is decremented. Returns the
  % result of storing the updated container.
  relink(Container, E, Driver) ->
      Id = element(#iterator.id, E),
      Next = element(#iterator.next, E),
      Prev = element(#iterator.prev, E),
      Top = element(#container.top, Container),
      Repoint =
          fun(NeighbourId, Field, Target) ->
              case kvs:get(element(1,E), NeighbourId, Driver) of
                  {ok, Neighbour} -> kvs:put(setelement(Field, Neighbour, Target), Driver);
                  _ -> ok
              end
          end,
      Repoint(Prev, #iterator.next, Next),
      Repoint(Next, #iterator.prev, Prev),
      Retopped = case Top =:= Id of
                     true -> setelement(#container.top, Container, Prev);
                     false -> Container
                 end,
      Remaining = element(#container.count, Retopped) - 1,
      kvs:put(setelement(#container.count, Retopped, Remaining), Driver).
  % Deletes Key from Table through the driver's backend module.
  delete(Table, Key, #kvs{mod=Backend}) ->
      Backend:delete(Table, Key).
  % Removes the entry Id of table Record together with its list links.
  % When the backend reports {error, not_found} the failure is logged and the
  % logger's result is returned; otherwise the result of do_remove/2.
  remove(Record,Id,#kvs{mod=Mod}=Driver) ->
      case Mod:get(Record,Id) of
          {ok, Found} ->
              do_remove(Found, Driver);
          {error, not_found} ->
              kvs:error(?MODULE,"Can't remove ~p~n",[{Record,Id}])
      end.
  % Unlinks E from its container (when the container can be read) and then
  % deletes E itself. Returns the result of kvs:delete/3.
  do_remove(E,#kvs{mod=Mod}=Driver) ->
      ContainerName = element(#iterator.container, E),
      FeedId = element(#iterator.feed_id, E),
      case Mod:get(ContainerName, FeedId) of
          {ok, Container} -> relink(Container, E, Driver);
          _ -> skip
      end,
      kvs:info(?MODULE,"Delete: ~p", [E]),
      kvs:delete(element(1,E), element(2,E), Driver).
  % Collects up to Count records starting at Start and following Direction.
  % Each visited record is prepended, so the last record visited comes first.
  traversal(Table, Start, Count, Direction, Driver) ->
      Prepend = fun(Record, Collected) -> [Record|Collected] end,
      fold(Prepend, [], Table, Start, Count, Direction, Driver).
  % Folds Fun over a linked list of records.
  %
  %   Fun       - fun(Record, Acc) -> NewAcc
  %   Acc       - accumulator, returned when the walk ends
  %   Table     - record name; mapped through table_type/1 before each lookup
  %   Start     - id of the first record to visit, or undefined for "no more"
  %   Count     - remaining number of records to visit; a non-integer value
  %               is never decremented, so only the end of the list stops it
  %   Direction - #iterator{} field index holding the id of the next record
  %   Driver    - #kvs{} driver record
  %
  % The walk stops, returning the accumulator built so far, when the link is
  % undefined, when Count reaches 0, or when a lookup fails (which is logged).
  %
  % Every ignored argument has its own variable name: a named variable that
  % appears twice in one clause head only matches when both arguments are equal.
  fold(_Fun, Acc, _Table, undefined, _Count, _Direction, _Driver) -> Acc;
  fold(_Fun, Acc, _Table, _Start, 0, _Direction, _Driver) -> Acc;
  fold(Fun, Acc, Table, Start, Count, Direction, Driver) ->
      RecordType = table_type(Table),
      case kvs:get(RecordType, Start, Driver) of
          {ok, R} ->
              Following = element(Direction, R),
              Count1 = case Count of
                           C when is_integer(C) -> C - 1;
                           _ -> Count
                       end,
              fold(Fun, Fun(R,Acc), Table, Following, Count1, Direction, Driver);
          Error ->
              kvs:error(?MODULE,"Error: ~p~n",[Error]),
              Acc
      end.
  % Entries of a container, walked from its top along the prev links.
  % Accepts a container tuple or the {ok, Container} / {error, _} result of a
  % lookup; an error yields [].
  entries({error,_}, _RecordName, _Count, _Driver) ->
      [];
  entries({ok,Container}, RecordName, Count, Driver) ->
      entries(Container, RecordName, Count, Driver);
  entries(Container, RecordName, Count, Driver) ->
      Top = element(#container.top, Container),
      traversal(RecordName, Top, Count, #iterator.prev, Driver).
  % Like traversal/5, but the result is reversed when walking along the next
  % links and returned as collected when walking along the prev links.
  entries(N, Start, Count, Direction, Driver) ->
      Collected = traversal(N, Start, Count, Direction, Driver),
      case Direction of
          #iterator.prev -> Collected;
          #iterator.next -> lists:reverse(Collected)
      end.
  % Makes sure an id_seq counter exists for every container type. Returns one
  % {Key, Result} pair per container, where Key is the container name as a
  % string and Result is skip (counter already present) or the result of
  % storing a new counter starting at 0.
  add_seq_ids() ->
      [ begin
            Key = atom_to_list(Name),
            case kvs:get(id_seq, Key) of
                {ok, _} -> {Key, skip};
                {error, _} -> {Key, kvs:put(#id_seq{thing = Key, id = 0})}
            end
        end || {Name, _Fields} <- containers() ].
  % Stores Record through the driver's backend module.
  put(Record, #kvs{mod=Backend}) ->
      Backend:put(Record).
  % Maps a record name to the table it is stored in: user2 is stored as user,
  % every other name maps to itself.
  table_type(Name) when Name =:= user2 -> user;
  table_type(Name) -> Name.
  % Resolves the interval name that Id falls into for RecordName, using the
  % list of #interval{} records configured under the RecordName key.
  % Returns [] when no interval covers Id.
  range(RecordName,Id) ->
      find(kvs:config(RecordName), RecordName, Id).

  % Walks the intervals in order and returns the name of the first one that
  % contains Id, or [] when the list is exhausted.
  find([], _RecordName, _Id) ->
      [];
  find([Range|Rest], RecordName, Id) ->
      case lookup(Range, Id) of
          [] -> find(Rest, RecordName, Id);
          Name -> Name
      end.

  % Name of the interval when Left =< Id =< Right, otherwise [].
  lookup(#interval{left=Left,right=Right,name=Name}, Id) ->
      case Id =< Right andalso Id >= Left of
          true -> Name;
          false -> []
      end.
  % Reads Key through the driver's backend. When range/2 resolves an interval
  % name for the key, that name is used as the table; otherwise RecordName.
  get(RecordName, Key, #kvs{mod=Mod}) ->
      Table = case range(RecordName, Key) of
                  [] -> RecordName;
                  Name -> Name
              end,
      Mod:get(Table, Key).
  % Driver-aware queries: each one dispatches to the backend module stored in
  % the mod field of the #kvs{} driver record.
  count(RecordName, #kvs{mod=Backend}) ->
      Backend:count(RecordName).
  all(RecordName, #kvs{mod=Backend}) ->
      Backend:all(RecordName).
  index(RecordName, Key, Value, #kvs{mod=Backend}) ->
      Backend:index(RecordName, Key, Value).
  next_id(RecordName, Incr, #kvs{mod=Backend}) ->
      Backend:next_id(RecordName, Incr).
  % Dumps the contents of every table reported by kvs:dir() into the file at
  % Path, as one concatenated list of records.
  save_db(Path) ->
      Tables = [ list_to_atom(Name) || {table,Name} <- kvs:dir() ],
      Data = lists:append([ all(Table) || Table <- Tables ]),
      kvs:save(Path, Data).
  % Restores a dump written by save_db/1: makes sure the id counters exist,
  % reads the file at Path and stores every tuple found in it. Non-tuple
  % entries are ignored. Returns the list of put results.
  load_db(Path) ->
      add_seq_ids(),
      AllEntries = kvs:load(Path),
      Records = lists:filter(fun erlang:is_tuple/1, AllEntries),
      [ kvs:put(Record) || Record <- Records ].
  % Serialises Value with term_to_binary/1 and writes it to the file Dir,
  % creating missing parent directories first. Returns the result of
  % file:write_file/2.
  save(Dir, Value) ->
      filelib:ensure_dir(Dir),
      Payload = term_to_binary(Value),
      file:write_file(Dir, Payload).
  % Reads the file Key and decodes its content with binary_to_term/1.
  % Crashes with badmatch when the file cannot be read.
  % NOTE(review): binary_to_term/1 is applied without the safe option, so the
  % file must come from a trusted source.
  load(Key) ->
      {ok, Encoded} = file:read_file(Key),
      binary_to_term(Encoded).
  % Notification hook; this implementation ignores both arguments and
  % returns skip.
  notify(_EventPath, _Data) ->
      skip.
  % Application environment lookups.
  %   config(Key)               - key of the kvs application, "" when unset
  %   config(App, Key)          - key of App, "" when unset
  %   config(App, Key, Default) - key of App, Default when unset
  config(Key) -> config(kvs, Key).
  config(App,Key) -> config(App, Key, "").
  config(App, Key, Default) ->
      application:get_env(App, Key, Default).
  % Default list of modules whose messages are logged: none.
  log_modules() -> [].

  % Module that provides log_modules/0; taken from the log_modules key of the
  % kvs application environment, defaulting to kvs itself.
  -define(ALLOWED, (config(kvs,log_modules,kvs))).

  % Emits a message through error_logger:Fun/2 when Module is listed by the
  % configured log_modules/0; otherwise returns skip. The module name is
  % prepended to the message.
  log(Module, String, Args, Fun) ->
      Enabled = ?ALLOWED:log_modules(),
      case lists:member(Module, Enabled) of
          false -> skip;
          true -> error_logger:Fun("~p:"++String, [Module|Args])
      end.

  % Severity-specific wrappers around log/4.
  info(Module, String, Args) ->
      log(Module, String, Args, info_msg).
  warning(Module, String, Args) ->
      log(Module, String, Args, warning_msg).
  error(Module, String, Args) ->
      log(Module, String, Args, error_msg).
  % Prints a table of name, storage type, memory and size for every table in
  % kvs:tables(), as reported by mnesia:table_info/2, followed by a timestamp.
  dump() ->
      RowFormat = "~20w ~20w ~10w ~10w~n",
      io:format(RowFormat, [name,storage_type,memory,size]),
      [ io:format(RowFormat,
                  [Name | [ mnesia:table_info(Name, Item)
                            || Item <- [storage_type, memory, size] ]])
        || #table{name=Name} <- kvs:tables() ],
      io:format("Snapshot taken: ~p~n",[calendar:now_to_datetime(os:timestamp())]).