kvs.erl 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. -module(kvs).
  2. -behaviour(application).
  3. -behaviour(supervisor).
  4. -description('KVS Abstract Chain Store').
  5. -include_lib("stdlib/include/assert.hrl").
  6. -include("api.hrl").
  7. -include("metainfo.hrl").
  8. -include("stream.hrl").
  9. -include("cursors.hrl").
  10. -include("kvs.hrl").
  11. -include("backend.hrl").
  12. -export([dump/0,metainfo/0,ensure/1,seq_gen/0,fold/6,fold/7,head/1,head/2,fetch/2,fetch/3,feed/2]).
  13. -export(?API).
  14. -export(?STREAM).
  15. -compile(export_all).
  16. -export([init/1, start/2, stop/1]).
  17. -record('$msg', {id,next,prev,user,msg}).
  %% supervisor callback: describes an empty one_for_one supervision tree
  %% with a restart intensity of 5 over a period of 10.
  init([]) ->
      SupFlags = {one_for_one, 5, 10},
      ChildSpecs = [],
      {ok, {SupFlags, ChildSpecs}}.

  %% application callback: starts this module as a supervisor registered
  %% locally under the name kvs. Both arguments are ignored.
  start(_StartType, _StartArgs) ->
      supervisor:start_link({local, kvs}, kvs, []).

  %% application callback: there is nothing to clean up, so the state
  %% argument is ignored and ok is returned.
  stop(_State) ->
      ok.
  %% Table descriptors contributed for testing: a single '$msg' table whose
  %% field list is taken from the '$msg' record declared in this module.
  test_tabs() ->
      MsgFields = record_info(fields, '$msg'),
      [#table{name = '$msg', fields = MsgFields}].
  22. % kvs api
  %% Backend module used for storage calls: the value of the `dba`
  %% environment key of the kvs application, or kvs_mnesia when unset.
  dba() ->
      case application:get_env(kvs, dba) of
          {ok, Backend} -> Backend;
          undefined -> kvs_mnesia
      end.

  %% Module used for stream calls: the value of the `dba_st` environment
  %% key of the kvs application, or kvs_stream when unset.
  kvs_stream() ->
      case application:get_env(kvs, dba_st) of
          {ok, Stream} -> Stream;
          undefined -> kvs_stream
      end.
  %% Convenience API: each function below builds a #kvs{} descriptor that
  %% points at the configured backend (see dba/0) and forwards to the
  %% variant of the same operation that takes the descriptor explicitly.

  all(Table) ->
      Driver = #kvs{mod = dba()},
      all(Table, Driver).

  delete(Table, Key) ->
      Driver = #kvs{mod = dba()},
      delete(Table, Key, Driver).

  %% Called through ?MODULE, exactly as in the original code.
  get(Table, Key) ->
      Driver = #kvs{mod = dba()},
      ?MODULE:get(Table, Key, Driver).

  index(Table, K, V) ->
      Driver = #kvs{mod = dba()},
      index(Table, K, V, Driver).

  %% Joining without an argument passes an empty list as the node.
  join() ->
      Driver = #kvs{mod = dba()},
      join([], Driver).

  dump() ->
      Driver = #kvs{mod = dba()},
      dump(Driver).

  join(Node) ->
      Driver = #kvs{mod = dba()},
      join(Node, Driver).

  leave() ->
      Driver = #kvs{mod = dba()},
      leave(Driver).

  count(Table) ->
      Driver = #kvs{mod = dba()},
      count(Table, Driver).

  %% Called through ?MODULE so that this module's put/2 is used rather
  %% than the auto-imported erlang:put/2.
  put(Record) ->
      Driver = #kvs{mod = dba()},
      ?MODULE:put(Record, Driver).

  fold(Fun, Acc, T, S, C, D) ->
      Driver = #kvs{mod = dba()},
      fold(Fun, Acc, T, S, C, D, Driver).

  %% stop/0 forwards to stop_kvs/1 because stop/1 is the application callback.
  stop() ->
      Driver = #kvs{mod = dba()},
      stop_kvs(Driver).

  start() ->
      Driver = #kvs{mod = dba()},
      start(Driver).

  ver() ->
      Driver = #kvs{mod = dba()},
      ver(Driver).

  dir() ->
      Driver = #kvs{mod = dba()},
      dir(Driver).

  %% feed/1 is the only wrapper that also fills in the stream module.
  feed(Key) ->
      Driver = #kvs{mod = dba(), st = kvs_stream()},
      feed(Key, Driver).

  seq(Table, DX) ->
      Driver = #kvs{mod = dba()},
      seq(Table, DX, Driver).
  42. % stream api
  %% Stream API: every function resolves the configured stream module via
  %% kvs_stream/0 and calls the function of the same name on it, passing
  %% the arguments through unchanged.

  top(X) ->
      Stream = kvs_stream(),
      Stream:top(X).

  bot(X) ->
      Stream = kvs_stream(),
      Stream:bot(X).

  next(X) ->
      Stream = kvs_stream(),
      Stream:next(X).

  prev(X) ->
      Stream = kvs_stream(),
      Stream:prev(X).

  drop(X) ->
      Stream = kvs_stream(),
      Stream:drop(X).

  take(X) ->
      Stream = kvs_stream(),
      Stream:take(X).

  save(X) ->
      Stream = kvs_stream(),
      Stream:save(X).

  cut(X, Y) ->
      Stream = kvs_stream(),
      Stream:cut(X, Y).

  add(X) ->
      Stream = kvs_stream(),
      Stream:add(X).

  append(X, Y) ->
      Stream = kvs_stream(),
      Stream:append(X, Y).

  load_reader(X) ->
      Stream = kvs_stream(),
      Stream:load_reader(X).

  writer(X) ->
      Stream = kvs_stream(),
      Stream:writer(X).

  reader(X) ->
      Stream = kvs_stream(),
      Stream:reader(X).
  %% Makes sure a writer with the given id is stored.
  %% Looks the id up in the writer table; when the lookup reports an error
  %% a writer is built with kvs:writer/1 and saved. Returns ok either way.
  ensure(#writer{id = Id}) ->
      case kvs:get(writer, Id) of
          {ok, _Existing} ->
              ok;
          {error, _Reason} ->
              Fresh = kvs:writer(Id),
              kvs:save(Fresh),
              ok
      end.
  %% Schema of this module: named kvs, containing the core tables followed
  %% by the test tables.
  metainfo() ->
      Tables = core() ++ test_tabs(),
      #schema{name = kvs, tables = Tables}.

  %% Core tables: only id_seq, with fields taken from the id_seq record
  %% and `thing` listed as its key.
  core() ->
      IdSeq = #table{name = id_seq,
                     fields = record_info(fields, id_seq),
                     keys = [thing]},
      [IdSeq].
  %% Creates every table listed in Module:metainfo() through Backend.
  %% For each table descriptor, in schema order:
  %%   1. Backend:create_table/2 is called with the table name and an
  %%      option list built from the descriptor's fields, copy_type and type;
  %%   2. Backend:add_table_index/2 is called once per entry in keys.
  %% Returns the list of table descriptors that were processed.
  initialize(Backend, Module) ->
      Schema = Module:metainfo(),
      CreateTable =
          fun(Tab) ->
              Options = [{attributes, Tab#table.fields},
                         {Tab#table.copy_type, [node()]},
                         {type, Tab#table.type}],
              Backend:create_table(Tab#table.name, Options),
              %% The results of the index calls are not used.
              [Backend:add_table_index(Tab#table.name, Key) || Key <- Tab#table.keys],
              Tab
          end,
      [CreateTable(Tab) || Tab <- Schema#schema.tables].
  %% Descriptor-taking variants: each one extracts the backend module from
  %% the #kvs{} descriptor and calls the matching backend function.

  all(Tab, #kvs{mod = Backend}) ->
      Backend:all(Tab).

  start(#kvs{mod = Backend}) ->
      Backend:start().

  stop_kvs(#kvs{mod = Backend}) ->
      Backend:stop().

  join(Node, #kvs{mod = Backend}) ->
      Backend:join(Node).

  leave(#kvs{mod = Backend}) ->
      Backend:leave().

  %% Note the backend function is named version/0, not ver/0.
  ver(#kvs{mod = Backend}) ->
      Backend:version().
  %% All table descriptors from the schemas of every configured module
  %% (see modules/0), flattened into a single list.
  tables() ->
      TablesOf = fun(Mod) -> (Mod:metainfo())#schema.tables end,
      lists:flatten([TablesOf(Mod) || Mod <- modules()]).

  %% Finds a table descriptor by name. Returns the #table{} record, or
  %% false when no table has that name or when Name is not an atom.
  table(Name) when not is_atom(Name) ->
      false;
  table(Name) ->
      lists:keyfind(Name, #table.name, tables()).
  %% Returns whatever the backend module stored in the descriptor
  %% reports from its dir/0.
  dir(#kvs{mod = Backend}) ->
      Backend:dir().
  %% Schema modules configured under the `schema` environment key of the
  %% kvs application; an empty list when the key is not set.
  modules() ->
      case application:get_env(kvs, schema) of
          {ok, Modules} -> Modules;
          undefined -> []
      end.
  %% Collects {Name, Fields} pairs for the tables named reader or writer
  %% across the schemas of all configured modules (see modules/0).
  %% Schema entries that are not #table{} records are skipped.
  cursors() ->
      IsCursor = fun(Name) -> lists:member(Name, [reader, writer]) end,
      PerModule =
          [[{Tab#table.name, Tab#table.fields}
            || Tab = #table{name = Name} <- (Mod:metainfo())#schema.tables,
               IsCursor(Name)]
           || Mod <- modules()],
      lists:flatten(PerModule).
  %% Folds Fun over a chain of linked records.
  %%
  %% Starting at key Start in Table, each record is fetched with kvs:get/3,
  %% Fun(Record, Acc) is applied, and traversal continues with the key found
  %% at tuple position Direction of the record.
  %%
  %% Traversal stops and the accumulator built so far is returned when:
  %%   * the next key is [] or undefined,
  %%   * Count reaches 0 (an integer Count is decremented per record; any
  %%     other Count value is passed along unchanged and never limits),
  %%   * kvs:get/3 returns anything other than {ok, Record},
  %%   * the lookup, element/2 or Fun raises (deliberately best-effort:
  %%     exceptions of every class are swallowed, as before).
  %%
  %% Only a single step is protected by `try`; the recursive call is made
  %% outside of it, so it is a real tail call and long chains no longer
  %% grow the stack.
  fold(_Fun, Acc, _Table, [], _Count, _Direction, _Driver) -> Acc;
  fold(_Fun, Acc, _Table, undefined, _Count, _Direction, _Driver) -> Acc;
  fold(_Fun, Acc, _Table, _Start, 0, _Direction, _Driver) -> Acc;
  fold(Fun, Acc, Table, Start, Count, Direction, Driver) ->
      Step =
          try
              case kvs:get(Table, Start, Driver) of
                  {ok, R} ->
                      Next = element(Direction, R),
                      Count1 = case Count of
                                   C when is_integer(C) -> C - 1;
                                   _ -> Count
                               end,
                      {continue, Fun(R, Acc), Next, Count1};
                  _Error ->
                      stop
              end
          catch
              _:_ -> stop
          end,
      case Step of
          {continue, NewAcc, NextKey, Remaining} ->
              fold(Fun, NewAcc, Table, NextKey, Remaining, Direction, Driver);
          stop ->
              Acc
      end.
  %% Ensures an id_seq counter row exists for every cursor table.
  %% For each table name returned by cursors/0 the name is converted to a
  %% string and used as the key: when kvs:get/2 reports an error a new
  %% #id_seq{} with id 0 is stored, otherwise nothing is written.
  %% Returns a list of {Key, PutResult} or {Key, skip} pairs.
  seq_gen() ->
      EnsureCounter =
          fun(Key) ->
              case kvs:get(id_seq, Key) of
                  {ok, _Existing} ->
                      {Key, skip};
                  {error, _Reason} ->
                      {Key, kvs:put(#id_seq{thing = Key, id = 0})}
              end
          end,
      [EnsureCounter(atom_to_list(TableName)) || {TableName, _Fields} <- cursors()].
  %% Stores a record, or a list of records, through the backend module of
  %% the descriptor. Lists and single records are handed to Backend:put/1
  %% in exactly the same way.
  put(RecordOrRecords, #kvs{mod = Backend}) ->
      Backend:put(RecordOrRecords).
  %% Descriptor-taking variants of the storage operations: each extracts
  %% the backend module from the #kvs{} descriptor and forwards the
  %% remaining arguments to the backend function of the same name.

  get(RecordName, Key, #kvs{mod = Backend}) ->
      Backend:get(RecordName, Key).

  delete(Tab, Key, #kvs{mod = Backend}) ->
      Backend:delete(Tab, Key).

  count(Tab, #kvs{mod = Backend}) ->
      Backend:count(Tab).

  index(Tab, Key, Value, #kvs{mod = Backend}) ->
      Backend:index(Tab, Key, Value).

  seq(Tab, Incr, #kvs{mod = Backend}) ->
      Backend:seq(Tab, Incr).

  dump(#kvs{mod = Backend}) ->
      Backend:dump().
  %% Returns the `args` field of the reader produced by the stream module's
  %% take/1 for the feed identified by Key.
  %% The reader is obtained from kvs:reader/1 and its args field is set to -1
  %% before the call (NOTE(review): presumably -1 means "no limit" for the
  %% stream module - confirm against its take/1).
  %% Only the `st` field of the descriptor is used; the whole-record binding
  %% was never read and has been dropped to silence the compiler warning.
  feed(Key, #kvs{st = Stream}) ->
      Reader = (kvs:reader(Key))#reader{args = -1},
      Taken = Stream:take(Reader),
      Taken#reader.args.
  %% Removes Rec from Feed using the configured backend and stream modules.
  remove(Rec, Feed) ->
      Driver = #kvs{mod = dba(), st = kvs_stream()},
      remove(Rec, Feed, Driver).

  %% Removes Rec from Feed by calling remove/2 on the stream module stored
  %% in the descriptor's `st` field. The whole-record binding was never
  %% read and has been dropped to silence the compiler warning.
  remove(Rec, Feed, #kvs{st = Stream}) ->
      Stream:remove(Rec, Feed).
  %% Takes one element from the feed identified by Key.
  %% A reader from kvs:reader/1 has its args field set to 1 and is passed
  %% to kvs:take/1. Returns the single element when the resulting args is a
  %% one-element list, or [] when it is empty; any other shape raises
  %% case_clause.
  head(Key) ->
      Reader = (kvs:reader(Key))#reader{args = 1},
      Taken = kvs:take(Reader),
      case Taken#reader.args of
          [] -> [];
          [X] -> X
      end.
  %% Returns the `args` field of the reader produced by kvs:take/1 for the
  %% feed identified by Key, after setting the reader's args to Count and
  %% its dir to 1.
  head(Key, Count) ->
      Reader = (kvs:reader(Key))#reader{args = Count, dir = 1},
      Taken = kvs:take(Reader),
      Taken#reader.args.
  %% Same as fetch/3 with [] as the default value.
  fetch(Table, Key) ->
      fetch(Table, Key, []).

  %% Looks Key up in Table with get/2 and returns the stored value directly
  %% when the lookup yields {ok, Value}; any other result yields Default.
  fetch(Table, Key, Default) ->
      Lookup = get(Table, Key),
      case Lookup of
          {ok, Found} -> Found;
          _Miss -> Default
      end.