kvs.erl 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. -module(kvs).
  2. -behaviour(application).
  3. -behaviour(supervisor).
  4. -include_lib("stdlib/include/assert.hrl").
  5. -include("api.hrl").
  6. -include("metainfo.hrl").
  7. -include("stream.hrl").
  8. -include("cursors.hrl").
  9. -include("kvs.hrl").
  10. -include("backend.hrl").
  11. -export([dump/0,
  12. db/0,
  13. metainfo/0,
  14. ensure/1,
  15. ensure/2,
  16. seq_gen/0,
  17. keys/1,
  18. find/2,
  19. fields/1,
  20. defined/2,
  21. field/2,
  22. setfield/3,
  23. remove/1]).
  24. -export([join/2, seq/3]).
  25. -export(?API)
  26. .
  27. -export(?STREAM)
  28. .
  29. -export([init/1, start/2, stop/1]).
  30. -record('$msg', {id, next, prev, user, msg}).
  31. init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
  32. start(_, _) ->
  33. supervisor:start_link({local, kvs}, kvs, []).
  34. stop(_) -> ok.
  35. dba() -> application:get_env(kvs, dba, kvs_mnesia).
  36. seq_dba() -> application:get_env(kvs, dba_seq, kvs_mnesia).
  37. db() -> (dba()):db().
  38. kvs_stream() ->
  39. application:get_env(kvs, dba_st, kvs_stream).
  40. % kvs api
  41. all(Table) -> all(Table, #kvs{mod = dba(), db = db()}).
  42. delete(Table, Key) ->
  43. delete(Table, Key, #kvs{mod = dba(), db = db()}).
  44. get(Table, Key) ->
  45. (?MODULE):get(Table, Key, #kvs{mod = dba(), db = db()}).
  46. index(Table, K, V) ->
  47. index(Table, K, V, #kvs{mod = dba()}).
  48. keys(Feed) ->
  49. keys(Feed, #kvs{mod = dba(), db = db()}).
  50. find(Feed, Id) ->
  51. find(Feed,Id, #kvs{mod = dba(), db=db()}).
  52. match(Record) ->
  53. match(Record, #kvs{mod = dba()}).
  54. index_match(Record, Index) ->
  55. index_match(Record, Index, #kvs{mod = dba()}).
  56. join() -> join([], #kvs{mod = dba(), db = db()}).
  57. dump() -> dump(#kvs{mod = dba()}).
  58. join(Node) -> join(Node, #kvs{mod = dba(), db = db()}).
  59. leave() -> leave(#kvs{mod = dba(), db = db()}).
  60. destroy() -> destroy(#kvs{mod = dba(), db = db()}).
  61. count(Table) -> count(Table, #kvs{mod = dba()}).
  62. put(Record) -> (?MODULE):put(Record, #kvs{mod = dba(), db = db()}).
  63. stop() -> stop_kvs(#kvs{mod = dba()}).
  64. start() -> start(#kvs{mod = dba()}).
  65. ver() -> ver(#kvs{mod = dba()}).
  66. dir() -> dir(#kvs{mod = dba()}).
  67. feed(Key) ->
  68. feed(Key, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
  69. seq([], DX) -> seq([], DX, #kvs{mod = kvs_rocks});
  70. seq(Table, DX) -> seq(Table, DX, #kvs{mod = seq_dba()}).
  71. remove(Rec, Feed) ->
  72. remove(Rec, Feed, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
  73. put(Records, #kvs{mod = Mod, db = Db}) when is_list(Records) ->
  74. Mod:put(Records, Db);
  75. put(Record, #kvs{mod = Mod, db = Db}) -> Mod:put(Record, Db).
  76. get(RecordName, Key, #kvs{mod = Mod, db = Db}) ->
  77. Mod:get(RecordName, Key, Db).
  78. delete(Tab, Key, #kvs{mod = Mod, db = Db}) ->
  79. Mod:delete(Tab, Key, Db).
  80. delete_range(Feed, Last, #kvs{mod=DBA, db=Db}) ->
  81. DBA:delete_range(Feed,Last,Db).
  82. count(Tab, #kvs{mod = DBA}) -> DBA:count(Tab).
  83. index(Tab, Key, Value, #kvs{mod = DBA}) ->
  84. DBA:index(Tab, Key, Value).
  85. keys(Feed, #kvs{mod = DBA, db = Db}) ->
  86. DBA:keys(Feed, Db).
  87. find(Feed, Id, #kvs{mod = DBA, db = Db}) ->
  88. DBA:find(Feed, Id, Db).
  89. match(Record, #kvs{mod = DBA}) ->
  90. DBA:match(Record).
  91. index_match(Record, Index, #kvs{mod = DBA}) ->
  92. DBA:index_match(Record, Index).
  93. seq(Tab, Incr, #kvs{mod = DBA}) -> DBA:seq(Tab, Incr).
  94. dump(#kvs{mod = Mod}) -> Mod:dump().
  95. feed(Tab, #kvs{st = Mod, db = Db}) -> Mod:feed(Tab, Db).
  96. remove(Rec, Feed, #kvs{st = Mod, db = Db}) ->
  97. Mod:remove(Rec, Feed, Db).
  98. all(Tab, #kvs{mod = DBA, db = Db}) -> DBA:all(Tab, Db).
  99. start(#kvs{mod = DBA}) -> DBA:start().
  100. stop_kvs(#kvs{mod = DBA}) -> DBA:stop().
  101. join(Node, #kvs{mod = DBA, db = Db}) -> DBA:join(Node, Db).
  102. leave(#kvs{mod = DBA, db = Db}) -> DBA:leave(Db).
  103. destroy(#kvs{mod = DBA, db = Db}) -> DBA:destroy(Db).
  104. ver(#kvs{mod = DBA}) -> DBA:version().
  105. dir(#kvs{mod = DBA}) -> DBA:dir().
  106. % stream api
  107. top(X) -> (kvs_stream()):top(X).
  108. top(X,#kvs{db = Db}) -> (kvs_stream()):top(X,Db).
  109. bot(X) -> (kvs_stream()):bot(X).
  110. bot(X,#kvs{db = Db}) -> (kvs_stream()):bot(X,Db).
  111. next(X) -> (kvs_stream()):next(X).
  112. next(X,#kvs{db = Db}) -> (kvs_stream()):next(X,Db).
  113. prev(X) -> (kvs_stream()):prev(X).
  114. prev(X,#kvs{db = Db}) -> (kvs_stream()):prev(X,Db).
  115. drop(X) -> (kvs_stream()):drop(X).
  116. drop(X,#kvs{db = Db}) -> (kvs_stream()):drop(X,Db).
  117. take(X) -> (kvs_stream()):take(X).
  118. take(X,#kvs{db = Db}) -> (kvs_stream()):take(X,Db).
  119. save(X) -> (kvs_stream()):save(X).
  120. save(X,#kvs{db = Db}) -> (kvs_stream()):save(X,Db).
  121. remove(X) -> (kvs_stream()):remove(X).
  122. cut(X) -> (kvs_stream()):cut(X).
  123. cut(X,#kvs{db = Db}) -> (kvs_stream()):cut(X, Db).
  124. add(X) -> (kvs_stream()):add(X).
  125. add(X,#kvs{db = Db}) -> (kvs_stream()):add(X,Db).
  126. append(X, Y) -> (kvs_stream()):append(X, Y).
  127. append(X, Y, #kvs{db = Db}) -> (kvs_stream()):append(X, Y, Db).
  128. load_reader(X) -> (kvs_stream()):load_reader(X).
  129. load_reader(X,#kvs{db = Db}) -> (kvs_stream()):load_reader(X,Db).
  130. writer(X) -> (kvs_stream()):writer(X).
  131. writer(X,#kvs{db = Db}) -> (kvs_stream()):writer(X,Db).
  132. reader(X) -> (kvs_stream()):reader(X).
  133. reader(X,#kvs{db = Db}) -> (kvs_stream()):reader(X,Db).
  134. % unrevisited
  135. ensure(#writer{} = X) -> ensure(X,#kvs{mod = dba(), db = db()}).
  136. ensure(#writer{id = Id},#kvs{} = X) ->
  137. case kvs:get(writer, Id, X) of
  138. {error, _} ->
  139. kvs:save(kvs:writer(Id, X)),
  140. ok;
  141. {ok, _} -> ok
  142. end.
  143. cursors() ->
  144. lists:flatten([[{T#table.name, T#table.fields}
  145. || #table{name = Name} = T
  146. <- (M:metainfo())#schema.tables,
  147. Name == reader orelse Name == writer]
  148. || M <- modules()]).
  149. % metainfo api
  150. tables() ->
  151. lists:flatten([(M:metainfo())#schema.tables
  152. || M <- modules()]).
  153. modules() -> application:get_env(kvs, schema, []).
  154. metainfo() ->
  155. #schema{name = kvs, tables = core() ++ test_tabs()}.
  156. core() ->
  157. [#table{name = id_seq,
  158. fields = record_info(fields, id_seq), keys = [thing]}].
  159. test_tabs() ->
  160. [#table{name = '$msg',
  161. fields = record_info(fields, '$msg')}].
  162. table(Name) when is_atom(Name) ->
  163. lists:keyfind(Name, #table.name, tables());
  164. table(_) -> false.
  165. seq_gen() ->
  166. Init = fun (Key) ->
  167. case kvs:get(id_seq, Key) of
  168. {error, _} ->
  169. {Key, kvs:put(#id_seq{thing = Key, id = 0})};
  170. {ok, _} -> {Key, skip}
  171. end
  172. end,
  173. [Init(atom_to_list(Name))
  174. || {Name, _Fields} <- cursors()].
  175. initialize(Backend, Module) ->
  176. [begin
  177. Backend:create_table(T#table.name,
  178. [{attributes, T#table.fields},
  179. {T#table.copy_type, [node()]},
  180. {type, T#table.type}]),
  181. [Backend:add_table_index(T#table.name, Key)
  182. || Key <- T#table.keys],
  183. T
  184. end
  185. || T <- (Module:metainfo())#schema.tables].
  186. fields(Table) when is_atom(Table) ->
  187. case table(Table) of
  188. false -> [];
  189. T -> T#table.fields
  190. end.
  191. defined(TableRecord, Field) ->
  192. FieldsList = fields(element(1, TableRecord)),
  193. lists:member(Field, FieldsList).
  194. field(TableRecord, Field) ->
  195. FieldsList = fields(element(1, TableRecord)),
  196. Index = string:str(FieldsList, [Field]) + 1,
  197. element(Index, TableRecord).
  198. setfield(TableRecord, Field, Value) ->
  199. FieldsList = fields(element(1, TableRecord)),
  200. Index = string:str(FieldsList, [Field]) + 1,
  201. setelement(Index, TableRecord, Value).