Browse Source

initial table partitions support

Namdak Tonpa 9 years ago
parent
commit
88dd14305c
3 changed files with 53 additions and 16 deletions
  1. 1 1
      include/metainfo.hrl
  2. 51 14
      src/kvs.erl
  3. 1 1
      src/store/store_mnesia.erl

+ 1 - 1
include/metainfo.hrl

@@ -2,7 +2,7 @@
 -define(METAINFO_HRL, true).
 
 -record(schema, {name,tables=[]}).
--record(table,  {name,container=feed,fields=[],keys=[],copy_type=disc_copies,columns,order_by}).
+-record(table,  {name,container=feed,fields=[],keys=[],copy_type=application:get_env(kvs,mnesia_media,disc_copies),columns,order_by}).
 -record(column, {name,type,key=false,ro=false,transform}).
 -record(query,  {body,types=[],values=[],next_ph_num = 1}).
 

+ 51 - 14
src/kvs.erl

@@ -45,7 +45,7 @@ stop(#kvs{mod=DBA}) -> DBA:stop().
 change_storage(Type) -> [ change_storage(Name,Type) || #table{name=Name} <- kvs:tables() ].
 change_storage(Table,Type,#kvs{mod=DBA}) -> DBA:change_storage(Table,Type).
 destroy(#kvs{mod=DBA}) -> DBA:destroy().
-join(Node,#kvs{mod=DBA}) -> DBA:join(Node).
+join(Node,#kvs{mod=DBA}) -> DBA:join(Node), load_partitions().
 version(#kvs{mod=DBA}) -> DBA:version().
 tables() -> lists:flatten([ (M:metainfo())#schema.tables || M <- modules() ]).
 table(Name) -> lists:keyfind(Name,#table.name,tables()).
@@ -73,7 +73,7 @@ ensure_link(Record, #kvs{mod=_Store}=Driver) ->
     Type  = table_type(element(1,Record)),
     CName = element(#iterator.container, Record),
     Cid   = table_type(case element(#iterator.feed_id, Record) of
-               undefined -> element(1,Record);
+               undefined -> table_type(element(1,Record));
                      Fid -> Fid end),
 
     Container = case kvs:get(CName, Cid, Driver) of
@@ -94,11 +94,10 @@ ensure_link(Record, #kvs{mod=_Store}=Driver) ->
                        Prev = case element(#container.top, Container) of
                                    undefined -> undefined;
                                    Tid -> case kvs:get(Type, Tid, Driver) of
-                                              {error, _} -> undefined;
-                                                       {ok, Top} ->
-                                        NewTop = setelement(#iterator.next, Top, Id),
-                                        kvs:put(NewTop, Driver),
-                                        element(#iterator.id, NewTop) end end,
+                                               {error, _} -> undefined;
+                                               {ok, Top}  -> NewTop = setelement(#iterator.next, Top, Id),
+                                                             kvs:put(NewTop, Driver),
+                                                             element(#iterator.id, NewTop) end end,
 
                        C1 = setelement(#container.top, Container, Id),
                        C2 = setelement(#container.count, C1,
@@ -125,13 +124,13 @@ ensure_link(Record, #kvs{mod=_Store}=Driver) ->
 
 link(Record,#kvs{mod=_Store}=Driver) ->
     Id = element(#iterator.id, Record),
-    case kvs:get(element(1,Record), Id, Driver) of
+    case kvs:get(table_type(element(1,Record)), Id, Driver) of
               {ok, Exists} -> ensure_link(Exists, Driver);
         {error, not_found} -> {error, not_found} end.
 
 add(Record, #kvs{mod=_Store}=Driver) when is_tuple(Record) ->
     Id = element(#iterator.id, Record),
-    case kvs:get(element(1,Record), Id, Driver) of
+    case kvs:get(table_type(element(1,Record)), Id, Driver) of
                 {error, _} -> ensure_link(Record, Driver);
          {aborted, Reason} -> {aborted, Reason};
                    {ok, _} -> {error, exist} end.
@@ -173,10 +172,13 @@ do_remove(E,#kvs{mod=Mod}=Driver) ->
 traversal(Table, Start, Count, Direction, Driver)->
     fold(fun(A,Acc) -> [A|Acc] end,[],Table,Start,Count,Direction,Driver).
 
+% kvs:fold(fun(X,A)->[X|A]end,[],process,2152,-1,#iterator.next,#kvs{mod=store_mnesia}).
+
 fold(___,___,_,undefined,_,_,_) -> [];
 fold(___,Acc,_,_,0,_,_) -> Acc;
 fold(Fun,Acc,Table,Start,Count,Direction,Driver) ->
     RecordType = table_type(Table),
+    %io:format("fold: ~p~n",[{RecordType, Start, Driver}]),
     case kvs:get(RecordType, Start, Driver) of
          {ok, R} -> Prev = element(Direction, R),
                     Count1 = case Count of C when is_integer(C) -> C - 1; _-> Count end,
@@ -198,13 +200,16 @@ add_seq_ids() ->
                 {ok, _} -> {Key,skip} end end,
     [ Init(atom_to_list(Name))  || {Name,_Fields} <- containers() ].
 
-
-
-put(Record,#kvs{mod=DBA}) -> DBA:put(Record).
+put(Record,#kvs{mod=Mod}) ->
+    case range(element(1,Record),element(2,Record)) of
+         [] -> Mod:put(Record);
+         Name ->  Mod:put(setelement(1,Record,Name)) end.
 
 table_type(user2) -> user;
 table_type(A) -> A.
 
+
+
 range(RecordName,Id) -> Ranges = kvs:config(RecordName), find(Ranges,RecordName,Id).
 
 find([],_,_Id) -> [];
@@ -218,8 +223,10 @@ lookup(#interval{},_Id) -> [].
 
 get(RecordName, Key, #kvs{mod=Mod}) ->
     case range(RecordName,Key) of
-         [] -> Mod:get(RecordName, Key);
-         Name ->  Mod:get(Name, Key) end.
+         []   -> Mod:get(RecordName, Key);
+         Name -> case Mod:get(Name, Key) of
+                      {ok,Record} -> {ok,setelement(1,Record,RecordName)};
+                      Else -> Else end end.
 
 count(RecordName,#kvs{mod=DBA}) -> DBA:count(RecordName).
 all(RecordName,#kvs{mod=DBA}) -> DBA:all(RecordName).
@@ -270,3 +277,33 @@ dump() ->
          mnesia:table_info(Name,memory),
          mnesia:table_info(Name,size)]) || #table{name=Name} <- kvs:tables()],
      io:format("Snapshot taken: ~p~n",[calendar:now_to_datetime(os:timestamp())]).
+
+                % Table Partitions
+
+load_partitions() -> [ case kvs:get(config,Table) of
+                            {ok,{config,_,List}} -> application:set_env(kvs,Table,List);
+                             Else -> ok end || {table,Table} <- kvs:dir() ].
+
+limit()        -> 10000000000000000000.
+store(Table,X) -> application:set_env(kvs,Table,X), X.
+cname(Table)   -> list_to_atom(lists:concat([process,(element(2,kvs:get(id_seq,lists:concat([process,".tables"]))))#id_seq.id-1])).
+fold(N)        -> kvs:fold(fun(X,A)->[X|A]end,[],process,N,-1,#iterator.next,#kvs{mod=store_mnesia}).
+top(Table)     -> (element(2,kvs:get(id_seq,atom_to_list(Table))))#id_seq.id.
+name(T)        -> list_to_atom(lists:concat([T,kvs:next_id(lists:concat([T,".tables"]),1)])).
+init(T)        -> store_mnesia:create_table(T#table.name, [{attributes,T#table.fields},{T#table.copy_type, [node()]}]),
+                [ store_mnesia:add_table_index(T#table.name, Key) || Key <- T#table.keys ].
+
+                    % rotate DETS table
+
+interval(L,R,Name) -> #interval{left=L,right=R,name=Name}.
+rotate(Table)      -> Name = name(Table), init(setelement(#table.name,kvs:table(Table),Name)),
+                      kvs:put(#config{key   = Table,
+                                      value = store(Table,case kvs:get(config,Table)  of
+                                              {error,not_found}        -> update_list(Table,[],Name);
+                                              {ok,#config{value=List}} -> update_list(Table,List,Name) end)}).
+
+update_list(Table,List,Name) ->
+    [ interval(top(Table)+1,limit(),Name) ] ++
+    case lists:keyfind(cname(Table),#interval.name,List) of
+         false -> List;
+         CI -> lists:keyreplace(cname(Table),#interval.name,List,CI#interval{right=top(Table)}) end.

+ 1 - 1
src/store/store_mnesia.erl

@@ -13,7 +13,7 @@ dir()      -> [{table,T}||T<-mnesia:system_info(local_tables)].
 join([])   -> mnesia:change_table_copy_type(schema, node(), disc_copies), initialize();
 join(Node) ->
     mnesia:change_config(extra_db_nodes, [Node]),
-    mnesia:change_table_copy_type(schema, node(), kvs:config(kvs,mnesia_media,disc_copies)),
+    mnesia:change_table_copy_type(schema, node(), disc_copies),
     [{Tb, mnesia:add_table_copy(Tb, node(), Type)}
      || {Tb, [{N, Type}]} <- [{T, mnesia:table_info(T, where_to_commit)}
                                || T <- mnesia:system_info(tables)], Node==N].