store_riak.erl 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. -module(store_riak).
  2. -author('Maxim Sokhatsky <maxim@synrc.com>').
  3. -copyright('Synrc Research Center s.r.o.').
  4. -include_lib("kvs/include/config.hrl").
  5. -include_lib("kvs/include/users.hrl").
  6. -include_lib("kvs/include/groups.hrl").
  7. -include_lib("kvs/include/feeds.hrl").
  8. -include_lib("kvs/include/acls.hrl").
  9. -include_lib("kvs/include/invites.hrl").
  10. -include_lib("kvs/include/meetings.hrl").
  11. -include_lib("kvs/include/membership_packages.hrl").
  12. -include_lib("kvs/include/accounts.hrl").
  13. -include_lib("kvs/include/log.hrl").
  14. -include_lib("stdlib/include/qlc.hrl").
  15. -compile(export_all).
  %% @doc Store-wide delete hook. This backend does nothing and returns `ok'.
  delete() ->
      ok.

  %% @doc Backend start hook. This backend does nothing and returns `ok'.
  start() ->
      ok.

  %% @doc Backend stop hook. This backend does nothing and returns `stopped'.
  stop() ->
      stopped.
  %% @doc Connects to Riak on the local node and caches the connection result.
  %%
  %% The term returned by riak:client_connect/1 is stored unchanged under the
  %% key "riak_client" in a named ETS table called `config'; riak_client/0
  %% reads it back from there. The table is created here, so it is owned by
  %% the calling process, and ets:new/2 raises badarg if a table with that
  %% name already exists.
  %%
  %% Returns `ok'.
  initialize() ->
      Connection = riak:client_connect(node()),
      ets:new(config, [named_table, {keypos, #config.key}]),
      Entry = #config{key = "riak_client", value = Connection},
      ets:insert(config, Entry),
      ok.
  %% @doc Sets the `leveldb_backend' bucket property on every bucket listed
  %% below, in the listed order.
  %%
  %% The result of each set_bucket call is ignored. Returns `ok'.
  init_indexes() ->
      Riak = riak_client(),
      Props = [{backend, leveldb_backend}],
      Buckets = [id_seq,
                 subscription,
                 user,
                 group,
                 translation,
                 group_subscription,
                 user_bought_gifts,
                 play_record],
      lists:foreach(
          fun(Name) -> Riak:set_bucket(key_to_bin(Name), Props) end,
          Buckets),
      ok.
  %% @doc Lists the names of all buckets known to Riak as strings.
  %%
  %% Crashes with badmatch if list_buckets does not return `{ok, Buckets}'.
  dir() ->
      Riak = riak_client(),
      {ok, BucketBins} = Riak:list_buckets(),
      lists:map(fun erlang:binary_to_list/1, BucketBins).
  %% @doc Removes every key of a bucket and returns the list of per-key
  %% delete results.
  %%
  %% When Table is a string, it is used as the bucket name and keys are
  %% deleted directly through the Riak client. For any other term the bucket
  %% name is the "~p" rendering of Table and each key is deleted through
  %% kvs:delete/2.
  riak_clean(Table) when is_list(Table) ->
      Riak = riak_client(),
      Bucket = erlang:list_to_binary(Table),
      {ok, Keys} = Riak:list_keys(Bucket),
      lists:map(fun(Key) -> Riak:delete(Bucket, Key) end, Keys);
  riak_clean(Table) ->
      Riak = riak_client(),
      [Printed] = io_lib:format("~p", [Table]),
      Bucket = erlang:list_to_binary(Printed),
      {ok, Keys} = Riak:list_keys(Bucket),
      lists:map(fun(Key) -> kvs:delete(Table, key_to_bin(Key)) end, Keys).
  %% @doc Builds a riak_object for the tuple T.
  %%
  %% The first element of T names the bucket and the second is the key; both
  %% are converted with key_to_bin/1. The whole tuple is stored as the value.
  %% The index entries from make_indices/1 are attached as metadata under the
  %% <<"index">> key and are also written to the info log.
  make_object(T) ->
      BucketBin = key_to_bin(element(1, T)),
      KeyBin = key_to_bin(element(2, T)),
      Fresh = riak_object:new(BucketBin, KeyBin, T),
      Indices = make_indices(T),
      IndexMeta = dict:store(<<"index">>, Indices, dict:new()),
      Indexed = riak_object:update_metadata(Fresh, IndexMeta),
      error_logger:info_msg("RIAK PUT IDX ~p",[Indices]),
      Indexed.
  %% @doc Returns the list of `{IndexName, IndexValue}' binaries to attach to
  %% a stored record.
  %%
  %% Subscriptions, group subscriptions, bought gifts and users have
  %% dedicated index names. Any other tuple gets a single index named
  %% "<tag>_bin" whose value is the tuple's second element.
  make_indices(#subscription{who = Who, whom = Whom}) ->
      [{<<"subs_who_bin">>, key_to_bin(Who)},
       {<<"subs_whom_bin">>, key_to_bin(Whom)}];
  make_indices(#group_subscription{user_id = UId, group_id = GId}) ->
      [{<<"group_subs_user_bin">>, key_to_bin(UId)},
       {<<"group_subs_group_bin">>, key_to_bin(GId)}];
  make_indices(#user_bought_gifts{username = UId}) ->
      [{<<"user_bought_gifts_username_bin">>, key_to_bin(UId)}];
  %% Only the username is indexed for users; the zone field was previously
  %% bound here but never used, which produced an unused-variable warning.
  make_indices(#user{username = UId}) ->
      [{<<"user_bin">>, key_to_bin(UId)}];
  make_indices(Record) ->
      IndexName = atom_to_list(element(1, Record)) ++ "_bin",
      [{key_to_bin(IndexName), key_to_bin(element(2, Record))}].
  %% @doc Returns the Riak client cached by initialize/0.
  %%
  %% Expects exactly one entry under "riak_client" in the `config' ETS table
  %% whose value is a 2-tuple; the second element of that tuple is returned.
  %% Any other shape crashes with badmatch.
  riak_client() ->
      [{_, _, {_, Client}}] = ets:lookup(config, "riak_client"),
      Client.
  %% @doc Stores one record or a list of records via riak_put/1.
  %%
  %% A single record is wrapped in a list and handled by the list clause.
  %% Individual put results are discarded; the list clause returns `ok'.
  put(Records) when is_list(Records) ->
      lists:foreach(fun(Record) -> riak_put(Record) end, Records);
  put(Record) ->
      store_riak:put([Record]).
  %% @doc Writes a single record to Riak and returns the client's put result.
  %%
  %% The result is logged, and post_write_hooks/2 runs afterwards regardless
  %% of whether the put succeeded.
  riak_put(Record) ->
      Object = make_object(Record),
      Client = riak_client(),
      Outcome = Client:put(Object),
      error_logger:info_msg("RIAK PUT RES ~p",[Outcome]),
      post_write_hooks(Record, Client),
      Outcome.
  %% @doc Writes a record with the `if_none_match' put option.
  %%
  %% Returns `ok' after running post_write_hooks/2 when the put succeeds;
  %% any other put result is returned unchanged and the hooks are skipped.
  put_if_none_match(Record) ->
      Object = make_object(Record),
      Client = riak_client(),
      Outcome = Client:put(Object, [if_none_match]),
      case Outcome of
          ok -> post_write_hooks(Record, Client);
          _ -> skipped
      end,
      Outcome.
  %% @doc Replaces the value and metadata of an already-fetched riak_object
  %% with those derived from Record, then writes it with `if_not_modified'.
  %%
  %% Record is the new value; Object is the riak_object previously read for
  %% it (for example the third element returned by get_for_update/2).
  %%
  %% Returns `ok' on success (after running post_write_hooks/2),
  %% `{error, keys_not_equal}' when Record's key differs from Object's key,
  %% or whatever non-ok term the put returned.
  update(Record, Object) ->
      NewObject = make_object(Record),
      NewKey = riak_object:key(NewObject),
      case riak_object:key(Object) of
          NewKey ->
              %% Fixed: this used to call the misspelled
              %% riak_object:get_update_metatdata/1, which does not exist,
              %% so every same-key update crashed with undef.
              MetaInfo = riak_object:get_update_metadata(NewObject),
              WithValue = riak_object:update_value(Object, Record),
              WithMeta = riak_object:update_metadata(WithValue, MetaInfo),
              Riak = riak_client(),
              case Riak:put(WithMeta, [if_not_modified]) of
                  ok ->
                      post_write_hooks(Record, Riak),
                      ok;
                  Error ->
                      Error
              end;
          _ ->
              {error, keys_not_equal}
      end.
  %% @doc Writes secondary lookup objects after a record has been stored.
  %%
  %% For a `user' record, each of the email, verification code and facebook
  %% id that is not `undefined' is stored as `{Bucket, Username, Value}' in
  %% the `email', `code' and `facebook' buckets respectively. The result is
  %% that of the facebook step (`nothing' when the id is undefined).
  %% For every other record tag nothing is written and `continue' is returned.
  post_write_hooks(R, C) ->
      case element(1, R) of
          user ->
              PutLookup =
                  fun(_Bucket, undefined) ->
                          nothing;
                     (Bucket, Value) ->
                          C:put(make_object({Bucket, R#user.username, Value}))
                  end,
              PutLookup(email, R#user.email),
              PutLookup(code, R#user.verification_code),
              PutLookup(facebook, R#user.facebook_id);
          _ ->
              continue
      end.
  %% @doc Fetches the value stored under Key in the bucket named by Tab.
  %%
  %% Both arguments are converted with key_to_bin/1 before the lookup; the
  %% result is whatever riak_get/2 returns.
  get(Tab, Key) ->
      riak_get(key_to_bin(Tab), key_to_bin(Key)).
  %% @doc Reads an object by binary bucket and key.
  %%
  %% Returns `{ok, Value}' with the object's stored value on success; any
  %% other client reply is passed through unchanged.
  riak_get(Bucket, Key) ->
      Riak = riak_client(),
      case Riak:get(Bucket, Key) of
          {ok, Object} -> {ok, riak_object:get_value(Object)};
          Other -> Other
      end.
  %% @doc Reads an object together with its riak_object wrapper.
  %%
  %% Returns `{ok, Value, Object}' on success; the riak_object is what
  %% update/2 takes as its second argument. Any other client reply is
  %% returned unchanged.
  get_for_update(Tab, Key) ->
      Riak = riak_client(),
      ReadOpts = [{last_write_wins, true}, {allow_mult, false}],
      case Riak:get(key_to_bin(Tab), key_to_bin(Key), ReadOpts) of
          {ok, Object} -> {ok, riak_object:get_value(Object), Object};
          Other -> Other
      end.
  %% @doc Looks up Word in the `ut_word' bucket.
  get_word(Word) ->
      store_riak:get(ut_word, Word).

  %% @doc Looks up a translation in the `ut_translation' bucket.
  %%
  %% The key is Lang and Word joined with an underscore, so both must be
  %% strings (lists).
  get_translation({Lang, Word}) ->
      TranslationKey = Lang ++ "_" ++ Word,
      store_riak:get(ut_translation, TranslationKey).
  %% @doc Deletes one record or a list of records from Riak.
  %%
  %% Each record is removed from the bucket named by its first element under
  %% the key held in its second element, the same derivation make_object/1
  %% uses when storing. A single record is wrapped in a list first.
  %% Per-record delete results are discarded; the list clause returns `ok'.
  %%
  %% Fixed: this used to call mnesia:delete_object/1, which targets a
  %% different store and aborts when run outside an Mnesia transaction.
  delete(Keys) when is_list(Keys) ->
      lists:foreach(
          fun(Record) -> delete(element(1, Record), element(2, Record)) end,
          Keys);
  delete(Keys) ->
      delete([Keys]).
  %% @doc Deletes the object stored under Key in the bucket named by Tab.
  %%
  %% Both arguments are converted with key_to_bin/1. Returns the Riak
  %% client's delete result.
  delete(Tab, Key) ->
      Riak = riak_client(),
      Riak:delete(key_to_bin(Tab), key_to_bin(Key)).
  %% @doc Deletes every object in bucket Tab whose secondary index IndexId
  %% equals IndexVal.
  %%
  %% Crashes with badmatch if the index query does not return `{ok, Keys}'.
  %% Individual delete results are ignored. Returns `ok'.
  delete_by_index(Tab, IndexId, IndexVal) ->
      Riak = riak_client(),
      Bucket = key_to_bin(Tab),
      Query = {eq, IndexId, key_to_bin(IndexVal)},
      {ok, Keys} = Riak:get_index(Bucket, Query),
      lists:foreach(fun(Key) -> Riak:delete(Bucket, Key) end, Keys),
      ok.
  %% @doc Converts a key or bucket name to a binary.
  %%
  %% Integers become their decimal text, strings and atoms their characters,
  %% binaries are returned as is, and any other term is rendered with "~p".
  key_to_bin(Key) when is_integer(Key) ->
      erlang:list_to_binary(integer_to_list(Key));
  key_to_bin(Key) when is_list(Key) ->
      erlang:list_to_binary(Key);
  key_to_bin(Key) when is_atom(Key) ->
      erlang:list_to_binary(erlang:atom_to_list(Key));
  key_to_bin(Key) when is_binary(Key) ->
      Key;
  key_to_bin(Key) ->
      [Printed] = io_lib:format("~p", [Key]),
      erlang:list_to_binary(Printed).
  %% @doc Selects records of type RecordName.
  %%
  %% With a predicate fun, returns every record from all/1 for which the
  %% predicate is true.
  %%
  %% With an option list, the recognised options are:
  %%   {where, Pred}            - filter predicate (default: keep everything)
  %%   {order, {Pos, Dir}}      - sort on tuple element Pos; Dir is
  %%                              `descending' or anything else for ascending
  %%                              (default: {1, descending})
  %%   {limit, {Offset, Count}} - passed to lists:sublist/3, so Offset is
  %%                              1-based (default: `all')
  %%
  %% Fixed: the sort direction used to be ignored, so `descending' requests
  %% came back ascending. The default order is `descending', so callers that
  %% omit `order' now get the list reversed relative to the old behaviour.
  select(RecordName, Pred) when is_function(Pred) ->
      lists:filter(Pred, all(RecordName));
  select(RecordName, Select) when is_list(Select) ->
      Where = proplists:get_value(where, Select, fun(_) -> true end),
      {Position, Order} = proplists:get_value(order, Select, {1, descending}),
      Limit = proplists:get_value(limit, Select, all),
      Ascending = lists:keysort(Position, select(RecordName, Where)),
      Sorted =
          case Order of
              descending -> lists:reverse(Ascending);
              _ -> Ascending
          end,
      case Limit of
          all -> Sorted;
          {Offset, Amount} -> lists:sublist(Sorted, Offset, Amount)
      end.
  %% @doc Returns how many records of type RecordName can be read.
  %%
  %% This fetches every record through all/1 and counts the result, so keys
  %% whose object could not be read are not counted.
  count(RecordName) ->
      Records = all(RecordName),
      erlang:length(Records).
  %% @doc Returns the values of all readable objects in the bucket for
  %% RecordName.
  %%
  %% The bucket name is the "~p" rendering of RecordName. Every key in the
  %% bucket is listed and fetched; keys whose fetch fails are dropped from
  %% the result.
  all(RecordName) ->
      Riak = riak_client(),
      [Printed] = io_lib:format("~p", [RecordName]),
      Bucket = erlang:list_to_binary(Printed),
      {ok, Keys} = Riak:list_keys(Bucket),
      Fetched = lists:map(
          fun(Key) -> get_record_from_table({Bucket, Key, Riak}) end,
          Keys),
      lists:filter(fun(Item) -> Item =/= failure end, Fetched).
  %% @doc Returns the values of all objects in bucket Tab whose secondary
  %% index IndexId equals IndexVal.
  %%
  %% Keys that are no longer found are skipped; any other read error crashes
  %% with case_clause. The values come back in the reverse of the order in
  %% which the index query returned their keys.
  all_by_index(Tab, IndexId, IndexVal) ->
      Riak = riak_client(),
      Bucket = key_to_bin(Tab),
      Query = {eq, IndexId, key_to_bin(IndexVal)},
      {ok, Keys} = Riak:get_index(Bucket, Query),
      Found = lists:flatmap(
          fun(Key) ->
              case Riak:get(Bucket, Key, []) of
                  {ok, Object} -> [riak_object:get_value(Object)];
                  {error, notfound} -> []
              end
          end,
          Keys),
      lists:reverse(Found).
  %% @doc Fetches one object and returns its value, or the atom `failure'
  %% when the read does not return `{ok, Object}'.
  %%
  %% Takes a `{Bucket, Key, RiakClient}' tuple; all/1 filters the `failure'
  %% results out.
  get_record_from_table({RecordBin, Key, Riak}) ->
      case Riak:get(RecordBin, Key) of
          {ok, Object} -> riak_object:get_value(Object);
          %% The failed reply is deliberately discarded; it was previously
          %% bound to an unused variable, which caused a compiler warning.
          _ -> failure
      end.
  %% @doc Increments the counter CounterId by 1 and returns the new value.
  next_id(CounterId) ->
      next_id(CounterId, 1).

  %% @doc Increments the counter CounterId by Incr, starting from 0 when the
  %% counter does not exist yet. Returns the new value.
  next_id(CounterId, Incr) ->
      next_id(CounterId, 0, Incr).

  %% @doc Increments the counter CounterId by Incr and returns the new value.
  %%
  %% Counters live in the `id_seq' bucket as #id_seq{} records. An existing
  %% counter is rewritten with `if_not_modified'; a missing one is created
  %% at Default + Incr with `if_none_match'. If the put is rejected, the
  %% whole read-modify-write is retried.
  %%
  %% Fixed: the retry used to call next_id/2, which reset Default to 0 and
  %% lost the caller's starting value.
  %%
  %% NOTE(review): the retry is unbounded, so a persistent put error loops
  %% forever; a get reply other than `{ok, _}' or `{error, notfound}'
  %% crashes with case_clause.
  next_id(CounterId, Default, Incr) ->
      Riak = riak_client(),
      CounterBin = key_to_bin(CounterId),
      {Object, Value, Options} =
          case Riak:get(key_to_bin(id_seq), CounterBin, []) of
              {ok, CurObj} ->
                  Current = #id_seq{id = CurVal} = riak_object:get_value(CurObj),
                  Bumped = CurVal + Incr,
                  Updated = riak_object:update_value(CurObj, Current#id_seq{id = Bumped}),
                  {Updated, Bumped, [if_not_modified]};
              {error, notfound} ->
                  Initial = Default + Incr,
                  Created = riak_object:new(key_to_bin(id_seq), CounterBin,
                                            #id_seq{thing = CounterId, id = Initial}),
                  {Created, Initial, [if_none_match]}
          end,
      case Riak:put(Object, Options) of
          ok -> Value;
          {error, _} -> next_id(CounterId, Default, Incr)
      end.