Browse Source

siblings reconcilation in riak backend

Maxim Sokhatsky 10 years ago
parent
commit
b9b991ec04
2 changed files with 36 additions and 46 deletions
  1. 2 1
      src/kvs.erl
  2. 34 45
      src/store_riak.erl

+ 2 - 1
src/kvs.erl

@@ -76,7 +76,8 @@ add(Record) when is_tuple(Record) ->
                             list_to_tuple([CName|proplists:get_value(CName, kvs:containers())]), Cid),
                     NC1 = setelement(#container.entries_count, NC, 0),
 
-                    kvs:put(NC1),NC1;
+                    kvs:put(NC1),
+                    NC1;
 
                 _ -> error end,
 

+ 34 - 45
src/store_riak.erl

@@ -14,8 +14,8 @@
 start() -> ok.
 stop() -> ok.
 version() -> {version,"KVS RIAK 2.0.2"}.
-join() -> initialize(), ok.
-join(_) -> initialize(), ok.
+join() -> ok.
+join(Ring) -> riak_core:join(Ring).
 initialize() -> riak:client_connect(node()).
 
 dir() ->
@@ -23,11 +23,13 @@ dir() ->
     [{table,binary_to_list(X)}||X<-Buckets].
 
 riak_clean(Table) when is_list(Table)->
-    {ok,Keys}=riak_client:list_keys(erlang:list_to_binary(Table)),
-    [ riak_client:delete(erlang:list_to_binary(Table),Key) || Key <- Keys];
+    {ok,C}=riak:local_client(),
+    {ok,Keys}=C:list_keys(erlang:list_to_binary(Table)),
+    [ C:delete(erlang:list_to_binary(Table),Key) || Key <- Keys];
 riak_clean(Table) ->
+    {ok,C}=riak:local_client(),
     [TableStr] = io_lib:format("~p",[Table]),
-    {ok,Keys}=riak_client:list_keys(erlang:list_to_binary(TableStr)),
+    {ok,Keys}=C:list_keys(erlang:list_to_binary(TableStr)),
     [ kvs:delete(Table,key_to_bin(Key)) || Key <- Keys].
 
 make_object(T) ->
@@ -37,7 +39,7 @@ make_object(T) ->
     Indices = make_indices(T),
     Meta = dict:store(<<"index">>, Indices, dict:new()),
     Obj2 = riak_object:update_metadata(Obj1, Meta),
-    error_logger:info_msg("RIAK PUT IDX ~p",[Indices]),
+    kvs:info(?MODULE,"RIAK PUT IDX ~p",[Indices]),
     Obj2.
 
 make_indices(#subscription{who=Who, whom=Whom}) -> [
@@ -48,6 +50,9 @@ make_indices(#user{id=UId,zone=Zone}) -> [
     {<<"user_bin">>, key_to_bin(UId)},
     {<<"zone_bin">>, key_to_bin(Zone)}];
 
+make_indices(#feed{id=UId}) -> [
+    {<<"feed_bin">>, key_to_bin(UId)}];
+
 make_indices(#comment{id={CID,EID},from=Who}) -> [
     {<<"comment_bin">>, key_to_bin({CID,EID})},
     {<<"author_bin">>, key_to_bin(Who)}];
@@ -67,30 +72,15 @@ put(Record) -> riak_put(Record).
 
 riak_put(Record) ->
     {ok,C}=riak:local_client(),
+    Bucket = key_to_bin(element(1,Record)),
+    Key = key_to_bin(element(2,Record)),
     Object = make_object(Record),
-    Result = riak_client:put(Object,C),
-    Result.
-
-put_if_none_match(Record) ->
-    Object = make_object(Record),
-    case riak_client:put(Object, [if_none_match]) of
-        ok -> ok;
-        Error -> Error end.
-
-update(Record, Object) ->
-    NewObject = make_object(Record),
-    NewKey = riak_object:key(NewObject),
-    case riak_object:key(Object) of
-        NewKey ->
-            MetaInfo = riak_object:get_update_metatdata(NewObject),
-            UpdObject2 = riak_object:update_value(Object, Record),
-            UpdObject3 = riak_object:update_metadata(UpdObject2, MetaInfo),
-            case riak_client:put(UpdObject3, [if_not_modified]) of
-                ok -> ok;
-                Error -> Error
-            end;
-        _ -> {error, keys_not_equal}
-    end.
+    RiakAnswer = C:get(Bucket,Key),
+    case RiakAnswer of
+         {ok,O} ->
+              Obj = riak_object:update_value(riak_object:reconcile([O],false), Record),
+              C:put(Obj);
+         _ -> C:put(Object) end.
 
 get(Tab, Key) ->
     Bucket = key_to_bin(Tab),
@@ -99,16 +89,14 @@ get(Tab, Key) ->
 
 riak_get(Bucket,Key) ->
     {ok,C} = riak:local_client(),
-    RiakAnswer = riak_client:get(Bucket,Key,C),
+    RiakAnswer = C:get(Bucket,Key),
     case RiakAnswer of
-        {ok, O} -> {ok, riak_object:get_value(O)};
+        {ok, O} ->
+            % kvs:info(?MODULE,"Value Count: ~p~n",[riak_object:value_count(O)]),
+            {ok,riak_object:get_value(riak_object:reconcile([O],false))};
+        {error, notfound} -> {error, not_found};
         X -> X end.
 
-get_for_update(Tab, Key) ->
-    case riak_client:get(key_to_bin(Tab), key_to_bin(Key)) of
-        {ok, O} -> {ok, riak_object:get_value(O), O};
-        Error -> Error end.
-
 delete(Tab, Key) ->
     {ok,C}=riak:local_client(),
     Bucket = key_to_bin(Tab),
@@ -116,9 +104,10 @@ delete(Tab, Key) ->
     C:delete(Bucket, IntKey).
 
 delete_by_index(Tab, IndexId, IndexVal) ->
+    {ok,C}=riak:local_client(),
     Bucket = key_to_bin(Tab),
     {ok, Keys} = riak_client:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
-    [riak_client:delete(Bucket, Key) || Key <- Keys].
+    [C:delete(Bucket, Key) || Key <- Keys].
 
 key_to_bin(Key) ->
     if is_integer(Key) -> erlang:list_to_binary(integer_to_list(Key));
@@ -131,30 +120,30 @@ all(RecordName) ->
     {ok,C}=riak:local_client(),
     RecordBin = key_to_bin(RecordName),
     {ok,Keys} = C:list_keys(RecordBin),
-    io:format("RecordBin: ~p~n",[RecordBin]),
-    io:format("Keys: ~p~n",[Keys]),
     Results = [ riak_get_raw({RecordBin, Key, C}) || Key <- Keys ],
     [ Object || Object <- Results, Object =/= failure ].
 
 all_by_index(Tab, IndexId, IndexVal) ->
+    {ok,C}=riak:local_client(),
     Bucket = key_to_bin(Tab),
-    {ok, Keys} = riak_client:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
+    {ok, Keys} = C:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
     lists:foldl(fun(Key, Acc) ->
-        case riak_client:get(Bucket, Key, []) of
-            {ok, O} -> [riak_object:get_value(O) | Acc];
+        case C:get(Bucket, Key) of
+            {ok, O} -> {ok,riak_object:get_value(riak_object:reconcile([O],false))};
             {error, notfound} -> Acc end end, [], Keys).
 
 riak_get_raw({RecordBin, Key, C}) ->
     case C:get(RecordBin, Key) of
-        {ok,O} -> riak_object:get_value(O);
+        {ok, O} -> riak_object:get_value(riak_object:reconcile([O],false));
         _ -> failure end.
 
 next_id(CounterId) -> next_id(CounterId, 1).
 next_id(CounterId, Incr) -> next_id(CounterId, 0, Incr).
 next_id(CounterId, Default, Incr) ->
+    {ok,C}=riak:local_client(),
     CounterBin = key_to_bin(CounterId),
     {Object, Value, Options} =
-        case riak_client:get(key_to_bin(id_seq), CounterBin, []) of
+        case C:get(key_to_bin(id_seq), CounterBin) of
             {ok, CurObj} ->
                 R = #id_seq{id = CurVal} = riak_object:get_value(CurObj),
                 NewVal = CurVal + Incr,
@@ -164,7 +153,7 @@ next_id(CounterId, Default, Incr) ->
                 NewVal = Default + Incr,
                 Obj = riak_object:new(key_to_bin(id_seq), CounterBin, #id_seq{thing = CounterId, id = NewVal}),
                 {Obj, NewVal, [if_none_match]} end,
-    case riak_client:put(Object, Options) of
+    case C:put(Object) of
         ok -> Value;
         {error, _} -> next_id(CounterId, Incr) end.