kvs.erl 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. -module(kvs).
  2. -behaviour(application).
  3. -behaviour(supervisor).
  4. -include_lib("stdlib/include/assert.hrl").
  5. -include("api.hrl").
  6. -include("metainfo.hrl").
  7. -include("stream.hrl").
  8. -include("cursors.hrl").
  9. -include("kvs.hrl").
  10. -include("backend.hrl").
  11. -export([dump/0,
  12. metainfo/0,
  13. ensure/1,
  14. ensure/2,
  15. seq_gen/0,
  16. fields/1,
  17. defined/2,
  18. field/2,
  19. setfield/3,
  20. cut/2]).
  21. -export([join/2, seq/3]).
  22. -export(?API)
  23. .
  24. -export(?STREAM)
  25. .
  26. -export([init/1, start/2, stop/1]).
  27. -record('$msg', {id, next, prev, user, msg}).
  %% Supervisor callback. Declares a one_for_one supervisor with a restart
  %% intensity of 5 restarts per 10 seconds and no static children.
  init([]) ->
      RestartStrategy = {one_for_one, 5, 10},
      Children = [],
      {ok, {RestartStrategy, Children}}.

  %% Application callback. Starts the supervisor implemented by this module
  %% and registers it locally under the name `kvs`; both arguments are ignored.
  start(_StartType, _StartArgs) ->
      SupName = {local, kvs},
      supervisor:start_link(SupName, kvs, []).

  %% Application callback. Ignores its argument and always returns ok.
  stop(_State) ->
      ok.
  %% Backend module for regular storage calls: the `dba` key of the kvs
  %% application environment, or kvs_mnesia when the key is not set.
  dba() ->
      case application:get_env(kvs, dba) of
          {ok, Backend} -> Backend;
          undefined -> kvs_mnesia
      end.

  %% Backend module for sequence counters: the `seq_dba` key of the kvs
  %% application environment, or kvs_mnesia when the key is not set.
  seq_dba() ->
      case application:get_env(kvs, seq_dba) of
          {ok, Backend} -> Backend;
          undefined -> kvs_mnesia
      end.

  %% Whatever the configured backend module returns from its db/0.
  db() ->
      Backend = dba(),
      Backend:db().

  %% Stream module: the `dba_st` key of the kvs application environment,
  %% or kvs_stream when the key is not set.
  kvs_stream() ->
      case application:get_env(kvs, dba_st) of
          {ok, Stream} -> Stream;
          undefined -> kvs_stream
      end.
  37. % kvs api
  %% Delegates to all/2 with a handle carrying the configured backend
  %% module and its database.
  all(Table) ->
      Handle = #kvs{mod = dba(), db = db()},
      all(Table, Handle).

  %% Delegates to delete/3 with a handle carrying the configured backend
  %% module and its database.
  delete(Table, Key) ->
      Handle = #kvs{mod = dba(), db = db()},
      delete(Table, Key, Handle).

  %% Delegates to get/3 with a handle carrying the configured backend
  %% module and its database. The call is module-qualified, exactly as in
  %% the other get/put wrappers of this module.
  get(Table, Key) ->
      Handle = #kvs{mod = dba(), db = db()},
      (?MODULE):get(Table, Key, Handle).

  %% Delegates to index/4 with a handle that only sets the backend module.
  index(Table, K, V) ->
      Handle = #kvs{mod = dba()},
      index(Table, K, V, Handle).
  %% join/0: delegates to join/2 with an empty node list and the default
  %% backend/database handle.
  join() ->
      Handle = #kvs{mod = dba(), db = db()},
      join([], Handle).

  %% dump/0: delegates to dump/1 with a handle that only sets the backend.
  dump() ->
      Handle = #kvs{mod = dba()},
      dump(Handle).

  %% join/1: delegates to join/2 with the default backend/database handle.
  join(Node) ->
      Handle = #kvs{mod = dba(), db = db()},
      join(Node, Handle).

  %% leave/0: delegates to leave/1 with the default backend/database handle.
  leave() ->
      Handle = #kvs{mod = dba(), db = db()},
      leave(Handle).

  %% destroy/0: delegates to destroy/1 with the default backend/database
  %% handle.
  destroy() ->
      Handle = #kvs{mod = dba(), db = db()},
      destroy(Handle).

  %% count/1: delegates to count/2 with a handle that only sets the backend.
  count(Table) ->
      Handle = #kvs{mod = dba()},
      count(Table, Handle).

  %% put/1: delegates to put/2 with the default backend/database handle.
  %% The module-qualified call is required: a local put/2 call would resolve
  %% to the auto-imported process-dictionary BIF.
  put(Record) ->
      Handle = #kvs{mod = dba(), db = db()},
      (?MODULE):put(Record, Handle).

  %% stop/0: delegates to stop_kvs/1 (stop/1 is the application callback).
  stop() ->
      Handle = #kvs{mod = dba()},
      stop_kvs(Handle).

  %% start/0: delegates to start/1 with a handle that only sets the backend.
  start() ->
      Handle = #kvs{mod = dba()},
      start(Handle).

  %% ver/0: delegates to ver/1 with a handle that only sets the backend.
  ver() ->
      Handle = #kvs{mod = dba()},
      ver(Handle).

  %% dir/0: delegates to dir/1 with a handle that only sets the backend.
  dir() ->
      Handle = #kvs{mod = dba()},
      dir(Handle).
  %% feed/1: delegates to feed/2 with a handle carrying the configured
  %% backend, stream module and database.
  feed(Key) ->
      Handle = #kvs{mod = dba(), st = kvs_stream(), db = db()},
      feed(Key, Handle).

  %% seq/2: delegates to seq/3 with a handle whose backend is the sequence
  %% backend (seq_dba/0), not the regular one.
  seq(Table, DX) ->
      Handle = #kvs{mod = seq_dba()},
      seq(Table, DX, Handle).

  %% remove/2: delegates to remove/3 with a handle carrying the configured
  %% backend, stream module and database.
  remove(Rec, Feed) ->
      Handle = #kvs{mod = dba(), st = kvs_stream(), db = db()},
      remove(Rec, Feed, Handle).
  %% put/2: hands the value (a single record or a list of records, passed
  %% through unchanged either way) to the backend's put/2 together with the
  %% handle's database.
  put(RecordOrRecords, #kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:put(RecordOrRecords, Handle#kvs.db).

  %% get/3: forwards to the backend's get/3 with the handle's database.
  get(RecordName, Key, #kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:get(RecordName, Key, Handle#kvs.db).

  %% delete/3: forwards to the backend's delete/3 with the handle's database.
  delete(Tab, Key, #kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:delete(Tab, Key, Handle#kvs.db).
  %% count/2: forwards to the backend's count/1; the database is not passed.
  count(Tab, #kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:count(Tab).

  %% index/4: forwards to the backend's index/3; the database is not passed.
  index(Tab, Key, Value, #kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:index(Tab, Key, Value).

  %% seq/3: forwards to the backend's seq/2; the database is not passed.
  seq(Tab, Incr, #kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:seq(Tab, Incr).

  %% dump/1: forwards to the backend's dump/0.
  dump(#kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:dump().

  %% feed/2: forwards to the handle's stream module (the `st` field, not
  %% `mod`) with the handle's database.
  feed(Tab, #kvs{} = Handle) ->
      Stream = Handle#kvs.st,
      Stream:feed(Tab, Handle#kvs.db).

  %% remove/3: forwards to the handle's stream module (the `st` field, not
  %% `mod`) with the handle's database.
  remove(Rec, Feed, #kvs{} = Handle) ->
      Stream = Handle#kvs.st,
      Stream:remove(Rec, Feed, Handle#kvs.db).
  %% all/2: forwards to the backend's all/2 with the handle's database.
  all(Tab, #kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:all(Tab, Handle#kvs.db).

  %% start/1: forwards to the backend's start/0.
  start(#kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:start().

  %% stop_kvs/1: forwards to the backend's stop/0. Named stop_kvs because
  %% stop/1 is already taken by the application callback.
  stop_kvs(#kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:stop().

  %% join/2: forwards to the backend's join/2 with the handle's database.
  join(Node, #kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:join(Node, Handle#kvs.db).

  %% leave/1: forwards to the backend's leave/1 with the handle's database.
  leave(#kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:leave(Handle#kvs.db).

  %% destroy/1: forwards to the backend's destroy/1 with the handle's
  %% database.
  destroy(#kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:destroy(Handle#kvs.db).

  %% ver/1: forwards to the backend's version/0.
  ver(#kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:version().

  %% dir/1: forwards to the backend's dir/0.
  dir(#kvs{} = Handle) ->
      Backend = Handle#kvs.mod,
      Backend:dir().
  84. % stream api
  %% Cursor positioning wrappers. Every function forwards to the function of
  %% the same name in the module returned by kvs_stream/0. The two-argument
  %% forms also pass the `db` field of the given #kvs{} handle; the handle's
  %% `st` field is not consulted.
  top(X) ->
      Stream = kvs_stream(),
      Stream:top(X).

  top(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:top(X, Handle#kvs.db).

  bot(X) ->
      Stream = kvs_stream(),
      Stream:bot(X).

  bot(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:bot(X, Handle#kvs.db).

  next(X) ->
      Stream = kvs_stream(),
      Stream:next(X).

  next(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:next(X, Handle#kvs.db).

  prev(X) ->
      Stream = kvs_stream(),
      Stream:prev(X).

  prev(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:prev(X, Handle#kvs.db).
  %% drop/take/save wrappers. Every function forwards to the function of the
  %% same name in the module returned by kvs_stream/0. The two-argument forms
  %% also pass the `db` field of the given #kvs{} handle.
  drop(X) ->
      Stream = kvs_stream(),
      Stream:drop(X).

  drop(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:drop(X, Handle#kvs.db).

  take(X) ->
      Stream = kvs_stream(),
      Stream:take(X).

  take(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:take(X, Handle#kvs.db).

  save(X) ->
      Stream = kvs_stream(),
      Stream:save(X).

  save(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:save(X, Handle#kvs.db).
  %% cut/add/append wrappers. Every function forwards to the function of the
  %% same name in the module returned by kvs_stream/0. add/2 and append/3
  %% also pass the `db` field of the given #kvs{} handle; cut has no
  %% handle-taking variant here.
  cut(X, Y) ->
      Stream = kvs_stream(),
      Stream:cut(X, Y).

  add(X) ->
      Stream = kvs_stream(),
      Stream:add(X).

  add(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:add(X, Handle#kvs.db).

  append(X, Y) ->
      Stream = kvs_stream(),
      Stream:append(X, Y).

  append(X, Y, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:append(X, Y, Handle#kvs.db).
  %% Reader/writer cursor wrappers. Every function forwards to the function
  %% of the same name in the module returned by kvs_stream/0. The
  %% two-argument forms also pass the `db` field of the given #kvs{} handle.
  load_reader(X) ->
      Stream = kvs_stream(),
      Stream:load_reader(X).

  load_reader(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:load_reader(X, Handle#kvs.db).

  writer(X) ->
      Stream = kvs_stream(),
      Stream:writer(X).

  writer(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:writer(X, Handle#kvs.db).

  reader(X) ->
      Stream = kvs_stream(),
      Stream:reader(X).

  reader(X, #kvs{} = Handle) ->
      Stream = kvs_stream(),
      Stream:reader(X, Handle#kvs.db).
  110. % unrevisited
  %% ensure/1: delegates to ensure/2 with a handle carrying the configured
  %% backend module and its database. Only #writer{} records are accepted.
  ensure(#writer{} = Writer) ->
      Handle = #kvs{mod = dba(), db = db()},
      ensure(Writer, Handle).
  %% ensure/2: makes sure a writer with the given id is stored.
  %%
  %% Looks the writer up through the supplied handle. When the lookup
  %% returns {error, _}, a writer is obtained from kvs:writer/2 and saved.
  %% Returns ok in both cases; any other lookup result raises case_clause.
  %%
  %% The save goes through the same handle as the lookup and the writer
  %% construction (kvs:save/2), so a caller passing a non-default database
  %% does not have the new writer written through the default one.
  ensure(#writer{id = Id}, #kvs{} = X) ->
      case kvs:get(writer, Id, X) of
          {ok, _} ->
              ok;
          {error, _} ->
              kvs:save(kvs:writer(Id, X), X),
              ok
      end.
  %% cursors/0: returns {Name, Fields} pairs for every table named `reader`
  %% or `writer` found in the metainfo of the configured schema modules,
  %% in module order and then table order.
  cursors() ->
      [{T#table.name, T#table.fields}
       || M <- modules(),
          #table{name = Name} = T <- (M:metainfo())#schema.tables,
          Name == reader orelse Name == writer].
  125. % metainfo api
  %% tables/0: the flattened concatenation of the `tables` field of every
  %% configured schema module's metainfo/0 result.
  tables() ->
      PerModule = [begin
                       Schema = M:metainfo(),
                       Schema#schema.tables
                   end
                   || M <- modules()],
      lists:flatten(PerModule).

  %% modules/0: the `schema` key of the kvs application environment, or an
  %% empty list when the key is not set.
  modules() ->
      case application:get_env(kvs, schema) of
          {ok, Modules} -> Modules;
          undefined -> []
      end.
  %% metainfo/0: this module's own schema, named `kvs`, made of the core
  %% tables followed by the test tables.
  metainfo() ->
      Tables = core() ++ test_tabs(),
      #schema{name = kvs, tables = Tables}.

  %% core/0: a single-element list describing the id_seq table, with an
  %% index key on `thing`.
  core() ->
      IdSeq = #table{name = id_seq,
                     fields = record_info(fields, id_seq),
                     keys = [thing]},
      [IdSeq].

  %% test_tabs/0: a single-element list describing the '$msg' table.
  test_tabs() ->
      Msg = #table{name = '$msg',
                   fields = record_info(fields, '$msg')},
      [Msg].
  %% table/1: finds the table description whose name is Name among tables/0.
  %% Returns the matching tuple, or false when nothing matches or when Name
  %% is not an atom.
  table(Name) when not is_atom(Name) ->
      false;
  table(Name) ->
      Known = tables(),
      lists:keyfind(Name, #table.name, Known).
  %% seq_gen/0: for every table reported by cursors/0, looks up an id_seq
  %% record keyed by the table name converted to a string. When the lookup
  %% returns {error, _} a record with id 0 is stored.
  %% Returns a list of {Key, Outcome}, where Outcome is `skip` for an
  %% existing record and the result of kvs:put/1 otherwise.
  seq_gen() ->
      EnsureCounter =
          fun (Key) ->
                  Outcome =
                      case kvs:get(id_seq, Key) of
                          {ok, _} ->
                              skip;
                          {error, _} ->
                              kvs:put(#id_seq{thing = Key, id = 0})
                      end,
                  {Key, Outcome}
          end,
      [EnsureCounter(atom_to_list(Table)) || {Table, _} <- cursors()].
  %% initialize/2: walks the tables of Module's metainfo in order. For each
  %% one it calls Backend:create_table/2 with the table's attributes, copy
  %% type (for the local node) and type, then Backend:add_table_index/2 once
  %% per declared key. The backend results are discarded.
  %% Returns the list of table descriptions that were processed.
  initialize(Backend, Module) ->
      Schema = Module:metainfo(),
      [begin
           Name = Table#table.name,
           Options = [{attributes, Table#table.fields},
                      {Table#table.copy_type, [node()]},
                      {type, Table#table.type}],
           Backend:create_table(Name, Options),
           _ = [Backend:add_table_index(Table#table.name, Key)
                || Key <- Table#table.keys],
           Table
       end
       || Table <- Schema#schema.tables].
  %% fields/1: the field list of the table named Table, or [] when table/1
  %% does not find it. Only atoms are accepted.
  fields(Table) when is_atom(Table) ->
      Found = table(Table),
      if
          Found =:= false -> [];
          true -> Found#table.fields
      end.
  %% defined/2: true when Field is among the fields registered for the
  %% record's tag (its first element), false otherwise.
  defined(TableRecord, Field) ->
      RecordName = element(1, TableRecord),
      lists:member(Field, fields(RecordName)).
  %% field/2: returns the value stored in TableRecord under Field.
  %% Raises error({undefined_field, Field}) when Field is not among the
  %% fields registered for the record's tag. Previously an unknown field
  %% resolved to tuple position 1 and silently returned the record tag.
  field(TableRecord, Field) ->
      element(field_index(TableRecord, Field), TableRecord).

  %% setfield/3: returns a copy of TableRecord with Field set to Value.
  %% Raises error({undefined_field, Field}) for an unknown field.
  %% Previously an unknown field resolved to tuple position 1, so the record
  %% tag itself was overwritten with Value.
  setfield(TableRecord, Field, Value) ->
      setelement(field_index(TableRecord, Field), TableRecord, Value).

  %% field_index/2: tuple position of Field inside TableRecord. Fields start
  %% at position 2 because position 1 holds the record tag.
  field_index(TableRecord, Field) ->
      field_index(Field, fields(element(1, TableRecord)), 2).

  %% field_index/3: linear scan over the field names, counting positions.
  field_index(Field, [Field | _], Index) ->
      Index;
  field_index(Field, [_ | Rest], Index) ->
      field_index(Field, Rest, Index + 1);
  field_index(Field, [], _Index) ->
      error({undefined_field, Field}).