kvs.erl 7.4 KB


  1. -module(kvs).
  2. -behaviour(application).
  3. -behaviour(supervisor).
  4. -include_lib("stdlib/include/assert.hrl").
  5. -include("api.hrl").
  6. -include("metainfo.hrl").
  7. -include("stream.hrl").
  8. -include("cursors.hrl").
  9. -include("kvs.hrl").
  10. -include("backend.hrl").
  11. -export([dump/0,
  12. db/0,
  13. metainfo/0,
  14. ensure/1,
  15. ensure/2,
  16. seq_gen/0,
  17. keys/1,
  18. fields/1,
  19. defined/2,
  20. field/2,
  21. setfield/3]).
  22. -export([join/2, seq/3]).
  23. -export(?API)
  24. .
  25. -export(?STREAM)
  26. .
  27. -export([init/1, start/2, stop/1]).
  28. -record('$msg', {id, next, prev, user, msg}).
  29. init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
  30. start(_, _) ->
  31. supervisor:start_link({local, kvs}, kvs, []).
  32. stop(_) -> ok.
  33. dba() -> application:get_env(kvs, dba, kvs_mnesia).
  34. seq_dba() -> application:get_env(kvs, dba_seq, kvs_mnesia).
  35. db() -> (dba()):db().
  36. kvs_stream() ->
  37. application:get_env(kvs, dba_st, kvs_stream).
  38. % kvs api
  39. all(Table) -> all(Table, #kvs{mod = dba(), db = db()}).
  40. delete(Table, Key) ->
  41. delete(Table, Key, #kvs{mod = dba(), db = db()}).
  42. get(Table, Key) ->
  43. (?MODULE):get(Table, Key, #kvs{mod = dba(), db = db()}).
  44. index(Table, K, V) ->
  45. index(Table, K, V, #kvs{mod = dba()}).
  46. keys(Feed) ->
  47. keys(Feed, #kvs{mod = dba(), db = db()}).
  48. match(Record) ->
  49. match(Record, #kvs{mod = dba()}).
  50. index_match(Record, Index) ->
  51. index_match(Record, Index, #kvs{mod = dba()}).
  52. join() -> join([], #kvs{mod = dba(), db = db()}).
  53. dump() -> dump(#kvs{mod = dba()}).
  54. join(Node) -> join(Node, #kvs{mod = dba(), db = db()}).
  55. leave() -> leave(#kvs{mod = dba(), db = db()}).
  56. destroy() -> destroy(#kvs{mod = dba(), db = db()}).
  57. count(Table) -> count(Table, #kvs{mod = dba()}).
  58. put(Record) -> (?MODULE):put(Record, #kvs{mod = dba(), db = db()}).
  59. stop() -> stop_kvs(#kvs{mod = dba()}).
  60. start() -> start(#kvs{mod = dba()}).
  61. ver() -> ver(#kvs{mod = dba()}).
  62. dir() -> dir(#kvs{mod = dba()}).
  63. feed(Key) ->
  64. feed(Key, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
  65. seq([], DX) -> seq([], DX, #kvs{mod = kvs_rocks});
  66. seq(Table, DX) -> seq(Table, DX, #kvs{mod = seq_dba()}).
  67. remove(Rec, Feed) ->
  68. remove(Rec, Feed, #kvs{mod = dba(), st = kvs_stream(), db = db()}).
  69. put(Records, #kvs{mod = Mod, db = Db}) when is_list(Records) ->
  70. Mod:put(Records, Db);
  71. put(Record, #kvs{mod = Mod, db = Db}) -> Mod:put(Record, Db).
  72. get(RecordName, Key, #kvs{mod = Mod, db = Db}) ->
  73. Mod:get(RecordName, Key, Db).
  74. delete(Tab, Key, #kvs{mod = Mod, db = Db}) ->
  75. Mod:delete(Tab, Key, Db).
  76. delete_range(Feed, Last, #kvs{mod=DBA, db=Db}) ->
  77. DBA:delete_range(Feed,Last,Db).
  78. count(Tab, #kvs{mod = DBA}) -> DBA:count(Tab).
  79. index(Tab, Key, Value, #kvs{mod = DBA}) ->
  80. DBA:index(Tab, Key, Value).
  81. keys(Feed, #kvs{mod = DBA, db = Db}) ->
  82. DBA:keys(Feed, Db).
  83. match(Record, #kvs{mod = DBA}) ->
  84. DBA:match(Record).
  85. index_match(Record, Index, #kvs{mod = DBA}) ->
  86. DBA:index_match(Record, Index).
  87. seq(Tab, Incr, #kvs{mod = DBA}) -> DBA:seq(Tab, Incr).
  88. dump(#kvs{mod = Mod}) -> Mod:dump().
  89. feed(Tab, #kvs{st = Mod, db = Db}) -> Mod:feed(Tab, Db).
  90. remove(Rec, Feed, #kvs{st = Mod, db = Db}) ->
  91. Mod:remove(Rec, Feed, Db).
  92. all(Tab, #kvs{mod = DBA, db = Db}) -> DBA:all(Tab, Db).
  93. start(#kvs{mod = DBA}) -> DBA:start().
  94. stop_kvs(#kvs{mod = DBA}) -> DBA:stop().
  95. join(Node, #kvs{mod = DBA, db = Db}) -> DBA:join(Node, Db).
  96. leave(#kvs{mod = DBA, db = Db}) -> DBA:leave(Db).
  97. destroy(#kvs{mod = DBA, db = Db}) -> DBA:destroy(Db).
  98. ver(#kvs{mod = DBA}) -> DBA:version().
  99. dir(#kvs{mod = DBA}) -> DBA:dir().
  100. % stream api
  101. top(X) -> (kvs_stream()):top(X).
  102. top(X,#kvs{db = Db}) -> (kvs_stream()):top(X,Db).
  103. bot(X) -> (kvs_stream()):bot(X).
  104. bot(X,#kvs{db = Db}) -> (kvs_stream()):bot(X,Db).
  105. next(X) -> (kvs_stream()):next(X).
  106. next(X,#kvs{db = Db}) -> (kvs_stream()):next(X,Db).
  107. prev(X) -> (kvs_stream()):prev(X).
  108. prev(X,#kvs{db = Db}) -> (kvs_stream()):prev(X,Db).
  109. drop(X) -> (kvs_stream()):drop(X).
  110. drop(X,#kvs{db = Db}) -> (kvs_stream()):drop(X,Db).
  111. take(X) -> (kvs_stream()):take(X).
  112. take(X,#kvs{db = Db}) -> (kvs_stream()):take(X,Db).
  113. save(X) -> (kvs_stream()):save(X).
  114. save(X,#kvs{db = Db}) -> (kvs_stream()):save(X,Db).
  115. cut(X) -> (kvs_stream()):cut(X).
  116. cut(X,#kvs{db = Db}) -> (kvs_stream()):cut(X, Db).
  117. add(X) -> (kvs_stream()):add(X).
  118. add(X,#kvs{db = Db}) -> (kvs_stream()):add(X,Db).
  119. append(X, Y) -> (kvs_stream()):append(X, Y).
  120. append(X, Y, #kvs{db = Db}) -> (kvs_stream()):append(X, Y, Db).
  121. load_reader(X) -> (kvs_stream()):load_reader(X).
  122. load_reader(X,#kvs{db = Db}) -> (kvs_stream()):load_reader(X,Db).
  123. writer(X) -> (kvs_stream()):writer(X).
  124. writer(X,#kvs{db = Db}) -> (kvs_stream()):writer(X,Db).
  125. reader(X) -> (kvs_stream()):reader(X).
  126. reader(X,#kvs{db = Db}) -> (kvs_stream()):reader(X,Db).
  127. % unrevisited
  128. ensure(#writer{} = X) -> ensure(X,#kvs{mod = dba(), db = db()}).
  129. ensure(#writer{id = Id},#kvs{} = X) ->
  130. case kvs:get(writer, Id, X) of
  131. {error, _} ->
  132. kvs:save(kvs:writer(Id, X)),
  133. ok;
  134. {ok, _} -> ok
  135. end.
  136. cursors() ->
  137. lists:flatten([[{T#table.name, T#table.fields}
  138. || #table{name = Name} = T
  139. <- (M:metainfo())#schema.tables,
  140. Name == reader orelse Name == writer]
  141. || M <- modules()]).
  142. % metainfo api
  143. tables() ->
  144. lists:flatten([(M:metainfo())#schema.tables
  145. || M <- modules()]).
  146. modules() -> application:get_env(kvs, schema, []).
  147. metainfo() ->
  148. #schema{name = kvs, tables = core() ++ test_tabs()}.
  149. core() ->
  150. [#table{name = id_seq,
  151. fields = record_info(fields, id_seq), keys = [thing]}].
  152. test_tabs() ->
  153. [#table{name = '$msg',
  154. fields = record_info(fields, '$msg')}].
  155. table(Name) when is_atom(Name) ->
  156. lists:keyfind(Name, #table.name, tables());
  157. table(_) -> false.
  158. seq_gen() ->
  159. Init = fun (Key) ->
  160. case kvs:get(id_seq, Key) of
  161. {error, _} ->
  162. {Key, kvs:put(#id_seq{thing = Key, id = 0})};
  163. {ok, _} -> {Key, skip}
  164. end
  165. end,
  166. [Init(atom_to_list(Name))
  167. || {Name, _Fields} <- cursors()].
  168. initialize(Backend, Module) ->
  169. [begin
  170. Backend:create_table(T#table.name,
  171. [{attributes, T#table.fields},
  172. {T#table.copy_type, [node()]},
  173. {type, T#table.type}]),
  174. [Backend:add_table_index(T#table.name, Key)
  175. || Key <- T#table.keys],
  176. T
  177. end
  178. || T <- (Module:metainfo())#schema.tables].
  179. fields(Table) when is_atom(Table) ->
  180. case table(Table) of
  181. false -> [];
  182. T -> T#table.fields
  183. end.
  184. defined(TableRecord, Field) ->
  185. FieldsList = fields(element(1, TableRecord)),
  186. lists:member(Field, FieldsList).
  187. field(TableRecord, Field) ->
  188. FieldsList = fields(element(1, TableRecord)),
  189. Index = string:str(FieldsList, [Field]) + 1,
  190. element(Index, TableRecord).
  191. setfield(TableRecord, Field, Value) ->
  192. FieldsList = fields(element(1, TableRecord)),
  193. Index = string:str(FieldsList, [Field]) + 1,
  194. setelement(Index, TableRecord, Value).