kvs.erl 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. -module(kvs).
  2. -behaviour(application).
  3. -behaviour(supervisor).
  4. -include_lib("stdlib/include/assert.hrl").
  5. -include("api.hrl").
  6. -include("metainfo.hrl").
  7. -include("stream.hrl").
  8. -include("cursors.hrl").
  9. -include("kvs.hrl").
  10. -include("backend.hrl").
  11. -export([dump/0,
  12. metainfo/0,
  13. ensure/1,
  14. ensure/2,
  15. seq_gen/0,
  16. fields/1,
  17. defined/2,
  18. field/2,
  19. setfield/3,
  20. cut/2]).
  21. -export([join/2, seq/3]).
  22. -export(?API)
  23. .
  24. -export(?STREAM)
  25. .
  26. -export([init/1, start/2, stop/1]).
  27. -record('$msg', {id, next, prev, user, msg}).
  28. init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
  29. start(_, _) ->
  30. supervisor:start_link({local, kvs}, kvs, []).
  31. stop(_) -> ok.
  32. dba() -> application:get_env(kvs, dba, kvs_mnesia).
  33. db() -> (dba()):db().
  34. kvs_stream() ->
  35. application:get_env(kvs, dba_st, kvs_stream).
  36. % kvs api
  37. all(Table) -> all(Table, #kvs{mod = dba(), db = db()}).
  38. delete(Table, Key) ->
  39. delete(Table, Key, #kvs{mod = dba(), db = db()}).
  40. get(Table, Key) ->
  41. (?MODULE):get(Table, Key, #kvs{mod = dba(), db = db()}).
  42. index(Table, K, V) ->
  43. index(Table, K, V, #kvs{mod = dba()}).
  44. join() -> join([], #kvs{mod = dba(), db = db()}).
  45. dump() -> dump(#kvs{mod = dba()}).
  46. join(Node) -> join(Node, #kvs{mod = dba(), db = db()}).
  47. leave() -> leave(#kvs{mod = dba(), db = db()}).
  48. destroy() -> destroy(#kvs{mod = dba(), db = db()}).
  49. count(Table) -> count(Table, #kvs{mod = dba()}).
  50. put(Record) -> (?MODULE):put(Record, #kvs{mod = dba(), db = db()}).
  51. stop() -> stop_kvs(#kvs{mod = dba()}).
  52. start() -> start(#kvs{mod = dba()}).
  53. ver() -> ver(#kvs{mod = dba()}).
  54. dir() -> dir(#kvs{mod = dba()}).
  55. feed(Key) ->
  56. feed(Key, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
  57. seq(Table, DX) -> seq(Table, DX, #kvs{mod = dba()}).
  58. remove(Rec, Feed) ->
  59. remove(Rec, Feed, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
  60. put(Records, #kvs{mod = Mod, db = Db}) when is_list(Records) ->
  61. Mod:put(Records, Db);
  62. put(Record, #kvs{mod = Mod, db = Db}) -> Mod:put(Record, Db).
  63. get(RecordName, Key, #kvs{mod = Mod, db = Db}) ->
  64. Mod:get(RecordName, Key, Db).
  65. delete(Tab, Key, #kvs{mod = Mod, db = Db}) ->
  66. Mod:delete(Tab, Key, Db).
  67. count(Tab, #kvs{mod = DBA}) -> DBA:count(Tab).
  68. index(Tab, Key, Value, #kvs{mod = DBA}) ->
  69. DBA:index(Tab, Key, Value).
  70. seq(Tab, Incr, #kvs{mod = DBA}) -> DBA:seq(Tab, Incr).
  71. dump(#kvs{mod = Mod}) -> Mod:dump().
  72. feed(Tab, #kvs{st = Mod, db = Db}) -> Mod:feed(Tab, Db).
  73. remove(Rec, Feed, #kvs{st = Mod, db = Db}) ->
  74. Mod:remove(Rec, Feed, Db).
  75. all(Tab, #kvs{mod = DBA, db = Db}) -> DBA:all(Tab, Db).
  76. start(#kvs{mod = DBA}) -> DBA:start().
  77. stop_kvs(#kvs{mod = DBA}) -> DBA:stop().
  78. join(Node, #kvs{mod = DBA, db = Db}) -> DBA:join(Node, Db).
  79. leave(#kvs{mod = DBA, db = Db}) -> DBA:leave(Db).
  80. destroy(#kvs{mod = DBA, db = Db}) -> DBA:destroy(Db).
  81. ver(#kvs{mod = DBA}) -> DBA:version().
  82. dir(#kvs{mod = DBA}) -> DBA:dir().
  83. % stream api
  84. top(X) -> (kvs_stream()):top(X).
  85. top(X,#kvs{db = Db}) -> (kvs_stream()):top(X,Db).
  86. bot(X) -> (kvs_stream()):bot(X).
  87. bot(X,#kvs{db = Db}) -> (kvs_stream()):bot(X,Db).
  88. next(X) -> (kvs_stream()):next(X).
  89. next(X,#kvs{db = Db}) -> (kvs_stream()):next(X,Db).
  90. prev(X) -> (kvs_stream()):prev(X).
  91. prev(X,#kvs{db = Db}) -> (kvs_stream()):prev(X,Db).
  92. drop(X) -> (kvs_stream()):drop(X).
  93. drop(X,#kvs{db = Db}) -> (kvs_stream()):drop(X,Db).
  94. take(X) -> (kvs_stream()):take(X).
  95. take(X,#kvs{db = Db}) -> (kvs_stream()):take(X,Db).
  96. save(X) -> (kvs_stream()):save(X).
  97. save(X,#kvs{db = Db}) -> (kvs_stream()):save(X,Db).
  98. cut(X, Y) -> (kvs_stream()):cut(X, Y).
  99. add(X) -> (kvs_stream()):add(X).
  100. add(X,#kvs{db = Db}) -> (kvs_stream()):add(X,Db).
  101. append(X, Y) -> (kvs_stream()):append(X, Y).
  102. append(X, Y, #kvs{db = Db}) -> (kvs_stream()):append(X, Y, Db).
  103. load_reader(X) -> (kvs_stream()):load_reader(X).
  104. load_reader(X,#kvs{db = Db}) -> (kvs_stream()):load_reader(X,Db).
  105. writer(X) -> (kvs_stream()):writer(X).
  106. writer(X,#kvs{db = Db}) -> (kvs_stream()):writer(X,Db).
  107. reader(X) -> (kvs_stream()):reader(X).
  108. reader(X,#kvs{db = Db}) -> (kvs_stream()):reader(X,Db).
  109. % unrevisited
  110. ensure(#writer{} = X) -> ensure(X,#kvs{mod = dba(), db = db()}).
  111. ensure(#writer{id = Id},#kvs{} = X) ->
  112. case kvs:get(writer, Id, X) of
  113. {error, _} ->
  114. kvs:save(kvs:writer(Id, X)),
  115. ok;
  116. {ok, _} -> ok
  117. end.
  118. cursors() ->
  119. lists:flatten([[{T#table.name, T#table.fields}
  120. || #table{name = Name} = T
  121. <- (M:metainfo())#schema.tables,
  122. Name == reader orelse Name == writer]
  123. || M <- modules()]).
  124. % metainfo api
  125. tables() ->
  126. lists:flatten([(M:metainfo())#schema.tables
  127. || M <- modules()]).
  128. modules() -> application:get_env(kvs, schema, []).
  129. metainfo() ->
  130. #schema{name = kvs, tables = core() ++ test_tabs()}.
  131. core() ->
  132. [#table{name = id_seq,
  133. fields = record_info(fields, id_seq), keys = [thing]}].
  134. test_tabs() ->
  135. [#table{name = '$msg',
  136. fields = record_info(fields, '$msg')}].
  137. table(Name) when is_atom(Name) ->
  138. lists:keyfind(Name, #table.name, tables());
  139. table(_) -> false.
  140. seq_gen() ->
  141. Init = fun (Key) ->
  142. case kvs:get(id_seq, Key) of
  143. {error, _} ->
  144. {Key, kvs:put(#id_seq{thing = Key, id = 0})};
  145. {ok, _} -> {Key, skip}
  146. end
  147. end,
  148. [Init(atom_to_list(Name))
  149. || {Name, _Fields} <- cursors()].
  150. initialize(Backend, Module) ->
  151. [begin
  152. Backend:create_table(T#table.name,
  153. [{attributes, T#table.fields},
  154. {T#table.copy_type, [node()]},
  155. {type, T#table.type}]),
  156. [Backend:add_table_index(T#table.name, Key)
  157. || Key <- T#table.keys],
  158. T
  159. end
  160. || T <- (Module:metainfo())#schema.tables].
  161. fields(Table) when is_atom(Table) ->
  162. case table(Table) of
  163. false -> [];
  164. T -> T#table.fields
  165. end.
  166. defined(TableRecord, Field) ->
  167. FieldsList = fields(element(1, TableRecord)),
  168. lists:member(Field, FieldsList).
  169. field(TableRecord, Field) ->
  170. FieldsList = fields(element(1, TableRecord)),
  171. Index = string:str(FieldsList, [Field]) + 1,
  172. element(Index, TableRecord).
  173. setfield(TableRecord, Field, Value) ->
  174. FieldsList = fields(element(1, TableRecord)),
  175. Index = string:str(FieldsList, [Field]) + 1,
  176. setelement(Index, TableRecord, Value).