123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- -module(store_riak).
- -author('Maxim Sokhatsky <maxim@synrc.com>').
- -copyright('Synrc Research Center s.r.o.').
- -include_lib("kvs/include/config.hrl").
- -include_lib("kvs/include/users.hrl").
- -include_lib("kvs/include/groups.hrl").
- -include_lib("kvs/include/feeds.hrl").
- -include_lib("kvs/include/acls.hrl").
- -include_lib("kvs/include/invites.hrl").
- -include_lib("kvs/include/meetings.hrl").
- -include_lib("kvs/include/membership.hrl").
- -include_lib("kvs/include/payments.hrl").
- -include_lib("kvs/include/purchases.hrl").
- -include_lib("kvs/include/accounts.hrl").
- -include_lib("kvs/include/log.hrl").
- -include_lib("stdlib/include/qlc.hrl").
- -compile(export_all).
%% @doc Backend start hook. There is nothing to start here; always returns `ok'.
start() ->
    ok.
%% @doc Backend stop hook. There is nothing to tear down; always returns `ok'.
stop() ->
    ok.
%% @doc Report the backend version as a `{version, String}' tuple.
version() ->
    Label = "KVS RIAK 1.3.2-voxoz",
    {version, Label}.
%% @doc Connect to Riak on the local node and cache the connection result.
%%
%% Creates the named ETS table `config' (keyed on `#config.key') and stores
%% whatever `riak:client_connect/1' returned under the key "riak_client".
%% The table is created unconditionally, so calling this again while the
%% table still exists fails inside `ets:new/2'.
initialize() ->
    Connection = riak:client_connect(node()),
    Entry = #config{key = "riak_client", value = Connection},
    ets:new(config, [named_table, {keypos, #config.key}]),
    ets:insert(config, Entry),
    ok.
%% @doc List the buckets reported by Riak as `{table, Name}' tuples, with
%% each binary bucket name converted to a string.
dir() ->
    Client = riak_client(),
    {ok, Buckets} = Client:list_buckets(),
    lists:map(fun(Bucket) -> {table, binary_to_list(Bucket)} end, Buckets).
%% @doc Delete every key in the bucket named by `Table'.
%%
%% When `Table' is a string, the bucket is that string as a binary and keys
%% are deleted directly through the Riak client. For any other term, the
%% bucket name is the `~p' rendering of `Table' and each key is removed via
%% `kvs:delete/2'. Returns the list of individual delete results.
riak_clean(Table) when is_list(Table) ->
    Client = riak_client(),
    Bucket = erlang:list_to_binary(Table),
    {ok, Keys} = Client:list_keys(Bucket),
    lists:map(fun(Key) -> Client:delete(Bucket, Key) end, Keys);
riak_clean(Table) ->
    Client = riak_client(),
    [Printed] = io_lib:format("~p", [Table]),
    {ok, Keys} = Client:list_keys(erlang:list_to_binary(Printed)),
    lists:map(fun(Key) -> kvs:delete(Table, key_to_bin(Key)) end, Keys).
%% @doc Build the Riak object for a record tuple `T'.
%%
%% The bucket is the first tuple element (the record tag) and the key is the
%% second element, both converted with `key_to_bin/1'; the stored value is
%% the whole tuple. The secondary indices from `make_indices/1' are attached
%% as metadata under `<<"index">>' and also written to the info log.
make_object(T) ->
    Fresh = riak_object:new(key_to_bin(element(1, T)), key_to_bin(element(2, T)), T),
    Indices = make_indices(T),
    Meta = dict:store(<<"index">>, Indices, dict:new()),
    Indexed = riak_object:update_metadata(Fresh, Meta),
    error_logger:info_msg("RIAK PUT IDX ~p", [Indices]),
    Indexed.
%% @doc Compute the secondary-index entries for a record.
%%
%% Returns a list of `{IndexName, IndexValue}' binary pairs. Known record
%% types get a fixed set of indices; any other tuple gets a single index
%% named `<tag>_bin' whose value is the tuple's second element.
make_indices(#subscription{who = Who, whom = Whom}) ->
    [
        {<<"who_bin">>, key_to_bin(Who)},
        {<<"whom_bin">>, key_to_bin(Whom)}
    ];
make_indices(#group_subscription{who = UserId, where = GroupId}) ->
    [
        {<<"who_bin">>, key_to_bin(UserId)},
        {<<"where_bin">>, key_to_bin(GroupId)}
    ];
make_indices(#user{username = UserId, zone = Zone}) ->
    [
        {<<"user_bin">>, key_to_bin(UserId)},
        {<<"zone_bin">>, key_to_bin(Zone)}
    ];
make_indices(#user_product{username = UserId, product_id = ProductId}) ->
    [
        {<<"user_bin">>, key_to_bin(UserId)},
        {<<"product_bin">>, key_to_bin(ProductId)}
    ];
make_indices(#payment{id = PaymentId, user_id = UserId}) ->
    [
        {<<"payment_bin">>, key_to_bin(PaymentId)},
        {<<"user_bin">>, key_to_bin(UserId)}
    ];
make_indices(#comment{id = {CommentId, EntryId}, author_id = Author}) ->
    [
        {<<"comment_bin">>, key_to_bin({CommentId, EntryId})},
        {<<"author_bin">>, key_to_bin(Author)}
    ];
make_indices(#entry{id = {EID, FID}, entry_id = EntryId, feed_id = Feed, from = From, to = To}) ->
    [
        {<<"entry_feed_bin">>, key_to_bin({EID, FID})},
        {<<"entry_bin">>, key_to_bin(EntryId)},
        {<<"from_bin">>, key_to_bin(From)},
        {<<"to_bin">>, key_to_bin(To)},
        {<<"feed_bin">>, key_to_bin(Feed)}
    ];
make_indices(Record) ->
    %% Generic fallback: index name is derived from the record tag.
    Tag = atom_to_list(element(1, Record)),
    IndexName = erlang:list_to_binary(Tag ++ "_bin"),
    [{IndexName, key_to_bin(element(2, Record))}].
%% @doc Fetch the cached Riak client from the `config' ETS table.
%%
%% Expects exactly one three-element entry under "riak_client" whose value
%% is a two-element tuple; the second element of that tuple is returned
%% (presumably `{ok, Client}' as stored by `initialize/0' - confirm).
riak_client() ->
    [Entry] = ets:lookup(config, "riak_client"),
    {_Tag, _Key, {_Status, Client}} = Entry,
    Client.
%% @doc Store one record, or each record of a list in order.
%%
%% A list argument returns `ok' (individual write results are discarded);
%% a single record returns whatever `riak_put/1' returns.
put(Records) when is_list(Records) ->
    lists:foreach(fun(Record) -> riak_put(Record) end, Records);
put(Record) ->
    riak_put(Record).
%% @doc Write a single record to Riak and return the client's put result.
%%
%% The post-write hooks (secondary lookup objects for users) run only when
%% the primary write did not fail. This matches `put_if_none_match/1' and
%% `update/2', and avoids creating lookup objects for a record that was
%% never stored. Any `{error, _}' result is returned unchanged.
riak_put(Record) ->
    Object = make_object(Record),
    Riak = riak_client(),
    case Riak:put(Object) of
        {error, _} = Error ->
            Error;
        Result ->
            post_write_hooks(Record, Riak),
            Result
    end.
%% @doc Write a record with the `if_none_match' put option.
%%
%% Runs the post-write hooks and returns `ok' when the put returns `ok';
%% any other put result is returned as is.
put_if_none_match(Record) ->
    Object = make_object(Record),
    Client = riak_client(),
    Outcome = Client:put(Object, [if_none_match]),
    case Outcome of
        ok ->
            post_write_hooks(Record, Client),
            ok;
        _ ->
            Outcome
    end.
%% @doc Replace the value and index metadata of an already fetched object.
%%
%% `Object' is the Riak object previously read (see `get_for_update/2') and
%% `Record' is its new value. The key derived from `Record' must equal the
%% key of `Object'; otherwise `{error, keys_not_equal}' is returned and
%% nothing is written. The write uses the `if_not_modified' option. On `ok'
%% the post-write hooks run and `ok' is returned; any other put result is
%% returned unchanged.
update(Record, Object) ->
    NewObject = make_object(Record),
    NewKey = riak_object:key(NewObject),
    case riak_object:key(Object) of
        NewKey ->
            %% The pending metadata of the freshly built object carries the
            %% recomputed secondary indices; copy it onto the stored object.
            MetaInfo = riak_object:get_update_metadata(NewObject),
            WithValue = riak_object:update_value(Object, Record),
            Updated = riak_object:update_metadata(WithValue, MetaInfo),
            Riak = riak_client(),
            case Riak:put(Updated, [if_not_modified]) of
                ok ->
                    post_write_hooks(Record, Riak),
                    ok;
                Error ->
                    Error
            end;
        _ ->
            {error, keys_not_equal}
    end.
%% @doc Write the auxiliary lookup objects that accompany a stored record.
%%
%% For a `user' record, each of `email', `verification_code' and
%% `facebook_id' that is not `undefined' is stored as a
%% `{Bucket, Username, Value}' object in the `email', `code' and `facebook'
%% buckets respectively. The result is that of the last (facebook) step:
%% `nothing' or the put result. Any other record type returns `continue'.
post_write_hooks(R, C) ->
    case element(1, R) of
        user ->
            Username = R#user.username,
            Store =
                fun(_Bucket, undefined) -> nothing;
                   (Bucket, Value) -> C:put(make_object({Bucket, Username, Value}))
                end,
            Store(email, R#user.email),
            Store(code, R#user.verification_code),
            Store(facebook, R#user.facebook_id);
        _ ->
            continue
    end.
%% @doc Read the value stored under `Key' in table `Tab'.
%%
%% Both arguments are converted with `key_to_bin/1' before the lookup;
%% the result is whatever `riak_get/2' returns.
get(Tab, Key) ->
    riak_get(key_to_bin(Tab), key_to_bin(Key)).
%% @doc Read a value by binary bucket and key.
%%
%% Returns `{ok, Value}' with the stored value unwrapped from the Riak
%% object, or the client's reply unchanged when it is not `{ok, Object}'.
riak_get(Bucket, Key) ->
    Client = riak_client(),
    case Client:get(Bucket, Key) of
        {ok, Object} -> {ok, riak_object:get_value(Object)};
        Other -> Other
    end.
%% @doc Read a value together with its Riak object.
%%
%% Returns `{ok, Value, Object}' so the object can later be handed to
%% `update/2'; any reply other than `{ok, Object}' is returned unchanged.
get_for_update(Tab, Key) ->
    Client = riak_client(),
    Bucket = key_to_bin(Tab),
    BinKey = key_to_bin(Key),
    Reply = Client:get(Bucket, BinKey),
    case Reply of
        {ok, Object} -> {ok, riak_object:get_value(Object), Object};
        _ -> Reply
    end.
%% @doc Delete the object stored under `Key' in table `Tab'.
%%
%% Both arguments are converted with `key_to_bin/1'; the client's delete
%% result is returned.
delete(Tab, Key) ->
    Client = riak_client(),
    Client:delete(key_to_bin(Tab), key_to_bin(Key)).
%% @doc Delete every object in `Tab' whose index `IndexId' equals `IndexVal'.
%%
%% The matching keys come from an `eq' index query. Individual delete
%% results are ignored and `ok' is returned.
delete_by_index(Tab, IndexId, IndexVal) ->
    Client = riak_client(),
    Bucket = key_to_bin(Tab),
    Query = {eq, IndexId, key_to_bin(IndexVal)},
    {ok, Keys} = Client:get_index(Bucket, Query),
    lists:foreach(fun(Key) -> Client:delete(Bucket, Key) end, Keys).
%% @doc Convert a bucket name, key or index value to its binary form.
%%
%% Integers become their decimal text, strings and atoms their characters,
%% binaries are returned untouched, and any other term (tuples, floats, ...)
%% is rendered with the `~p' format directive.
%%
%% The fallback collapses the formatter output with `iolist_to_binary/1'
%% rather than matching on a one-element list, so it does not depend on the
%% internal shape of the `io_lib:format/2' result.
-spec key_to_bin(term()) -> binary().
key_to_bin(Key) when is_integer(Key) ->
    erlang:list_to_binary(integer_to_list(Key));
key_to_bin(Key) when is_list(Key) ->
    erlang:list_to_binary(Key);
key_to_bin(Key) when is_atom(Key) ->
    erlang:list_to_binary(erlang:atom_to_list(Key));
key_to_bin(Key) when is_binary(Key) ->
    Key;
key_to_bin(Key) ->
    erlang:iolist_to_binary(io_lib:format("~p", [Key])).
%% @doc Return the values of all objects in the bucket named by `RecordName'.
%%
%% Every key in the bucket is fetched with `riak_get_raw/1'; reads that
%% came back as `failure' are dropped from the result.
all(RecordName) ->
    Client = riak_client(),
    Bucket = key_to_bin(RecordName),
    {ok, Keys} = Client:list_keys(Bucket),
    Fetched = lists:map(fun(Key) -> riak_get_raw({Bucket, Key, Client}) end, Keys),
    lists:filter(fun(Value) -> Value =/= failure end, Fetched).
%% @doc Return the values of all objects in `Tab' whose index `IndexId'
%% equals `IndexVal'.
%%
%% Keys that the index returns but that are no longer found are skipped.
%% Any other read result is not handled and fails with `case_clause'.
%% Values are accumulated by prepending, so they come out in the reverse of
%% the key order returned by the index query.
all_by_index(Tab, IndexId, IndexVal) ->
    Client = riak_client(),
    Bucket = key_to_bin(Tab),
    Query = {eq, IndexId, key_to_bin(IndexVal)},
    {ok, Keys} = Client:get_index(Bucket, Query),
    Collect =
        fun(Key, Found) ->
            case Client:get(Bucket, Key, []) of
                {ok, Object} -> [riak_object:get_value(Object) | Found];
                {error, notfound} -> Found
            end
        end,
    lists:foldl(Collect, [], Keys).
%% @doc Fetch one value given a `{Bucket, Key, Client}' triple.
%%
%% Returns the stored value on `{ok, Object}' and the atom `failure' for
%% any other reply from the client.
riak_get_raw({RecordBin, Key, Riak}) ->
    case Riak:get(RecordBin, Key) of
        {ok, Object} -> riak_object:get_value(Object);
        _ -> failure
    end.
%% @doc Increment the counter `CounterId' by 1 and return the new value.
next_id(CounterId) ->
    next_id(CounterId, 1).

%% @doc Increment the counter by `Incr', starting from 0 if it does not
%% exist yet, and return the new value.
next_id(CounterId, Incr) ->
    next_id(CounterId, 0, Incr).

%% @doc Increment the counter by `Incr' and return the new value.
%%
%% The counter is an `#id_seq{}' record in the `id_seq' bucket. When it
%% already exists, its `id' is bumped and written with `if_not_modified'.
%% When it is not found, it is created with `Default + Incr' using
%% `if_none_match'. If the conditional write returns `{error, _}', the whole
%% read-modify-write is retried with the same `Default' and `Incr', so a
%% caller-supplied starting value is not lost on retry.
%%
%% NOTE: the retry is unbounded; a persistent write error keeps it looping.
next_id(CounterId, Default, Incr) ->
    Riak = riak_client(),
    Bucket = key_to_bin(id_seq),
    CounterBin = key_to_bin(CounterId),
    {Object, Value, Options} =
        case Riak:get(Bucket, CounterBin, []) of
            {ok, CurObj} ->
                #id_seq{id = CurVal} = Current = riak_object:get_value(CurObj),
                Bumped = CurVal + Incr,
                Updated = riak_object:update_value(CurObj, Current#id_seq{id = Bumped}),
                {Updated, Bumped, [if_not_modified]};
            {error, notfound} ->
                First = Default + Incr,
                Created = riak_object:new(Bucket, CounterBin,
                                          #id_seq{thing = CounterId, id = First}),
                {Created, First, [if_none_match]}
        end,
    case Riak:put(Object, Options) of
        ok -> Value;
        {error, _} -> next_id(CounterId, Default, Incr)
    end.
|