store_riak.erl 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. -module(store_riak).
  2. -author('Maxim Sokhatsky <maxim@synrc.com>').
  3. -copyright('Synrc Research Center s.r.o.').
  4. -include_lib("kvs/include/config.hrl").
  5. -include_lib("kvs/include/users.hrl").
  6. -include_lib("kvs/include/groups.hrl").
  7. -include_lib("kvs/include/feeds.hrl").
  8. -include_lib("kvs/include/acls.hrl").
  9. -include_lib("kvs/include/invites.hrl").
  10. -include_lib("kvs/include/meetings.hrl").
  11. -include_lib("kvs/include/membership.hrl").
  12. -include_lib("kvs/include/payments.hrl").
  13. -include_lib("kvs/include/purchases.hrl").
  14. -include_lib("kvs/include/accounts.hrl").
  15. -include_lib("stdlib/include/qlc.hrl").
  16. -compile(export_all).
  %% @doc Backend start hook. Nothing has to be started here, so this
  %% always returns `ok'.
  start() ->
      ok.
  %% @doc Backend stop hook. Nothing has to be torn down here, so this
  %% always returns `ok'.
  stop() ->
      ok.
  %% @doc Returns the backend version as a `{version, String}' tuple.
  version() ->
      Label = "KVS RIAK 1.3.2-voxoz",
      {version, Label}.
  %% @doc Joins the store by running initialize/0. Always returns `ok'.
  join() ->
      initialize(),
      ok.
  %% @doc Joins the store. The node argument is accepted for interface
  %% compatibility but is not used: this backend only runs initialize/0.
  %% Always returns `ok'.
  join(_Node) ->
      initialize(),
      ok.
  %% @doc Connects a Riak client for the local node and caches the result
  %% in the named ETS table `config' under the key "riak_client".
  %%
  %% The table is created only when it does not exist yet, so calling this
  %% function more than once (for example through both join/0 and join/1)
  %% refreshes the cached client instead of crashing in ets:new/2.
  %% Always returns `ok'.
  initialize() ->
      Client = riak:client_connect(node()),
      case ets:info(config) of
          undefined ->
              ets:new(config, [named_table, {keypos, #config.key}]);
          _AlreadyCreated ->
              config
      end,
      ets:insert(config, #config{key = "riak_client", value = Client}),
      ok.
  %% @doc Lists the buckets reported by the Riak client as a list of
  %% `{table, Name}' tuples, where Name is the bucket name converted to a
  %% string.
  dir() ->
      Client = riak_client(),
      {ok, Buckets} = Client:list_buckets(),
      lists:map(fun(Bucket) -> {table, binary_to_list(Bucket)} end, Buckets).
  %% @doc Deletes every key found in the bucket that belongs to Table.
  %%
  %% When Table is a string the keys are removed directly through the Riak
  %% client; for any other term each key is removed through kvs:delete/2.
  %% The bucket name is derived with key_to_bin/1, the same conversion used
  %% when objects are written, so the clean always targets the bucket that
  %% was actually populated.
  %% Returns the list of per-key delete results.
  riak_clean(Table) when is_list(Table) ->
      Client = riak_client(),
      Bucket = key_to_bin(Table),
      {ok, Keys} = Client:list_keys(Bucket),
      [Client:delete(Bucket, Key) || Key <- Keys];
  riak_clean(Table) ->
      Client = riak_client(),
      {ok, Keys} = Client:list_keys(key_to_bin(Table)),
      [kvs:delete(Table, key_to_bin(Key)) || Key <- Keys].
  %% @doc Wraps the tuple T in a riak_object.
  %%
  %% The first element of T names the bucket and the second element is the
  %% key; both are converted with key_to_bin/1. The index list produced by
  %% make_indices/1 is stored in the object's metadata under `<<"index">>'
  %% and is also written to the info log.
  make_object(T) ->
      BucketBin = key_to_bin(element(1, T)),
      KeyBin = key_to_bin(element(2, T)),
      Fresh = riak_object:new(BucketBin, KeyBin, T),
      Indices = make_indices(T),
      IndexMeta = dict:store(<<"index">>, Indices, dict:new()),
      Indexed = riak_object:update_metadata(Fresh, IndexMeta),
      error_logger:info_msg("RIAK PUT IDX ~p",[Indices]),
      Indexed.
  %% @doc Builds the secondary-index list for a record as a list of
  %% `{IndexName, IndexValue}' binaries.
  %%
  %% Known record types get a fixed set of indices; any other tuple gets a
  %% single index named `<tag>_bin' whose value is its second element.
  make_indices(#subscription{who = Follower, whom = Followed}) ->
      [{<<"who_bin">>, key_to_bin(Follower)},
       {<<"whom_bin">>, key_to_bin(Followed)}];
  make_indices(#group_subscription{who = UserId, where = GroupId}) ->
      [{<<"who_bin">>, key_to_bin(UserId)},
       {<<"where_bin">>, key_to_bin(GroupId)}];
  make_indices(#user{username = UserId, zone = Zone}) ->
      [{<<"user_bin">>, key_to_bin(UserId)},
       {<<"zone_bin">>, key_to_bin(Zone)}];
  make_indices(#user_product{username = UserId, product_id = ProductId}) ->
      [{<<"user_bin">>, key_to_bin(UserId)},
       {<<"product_bin">>, key_to_bin(ProductId)}];
  make_indices(#payment{id = PaymentId, user_id = UserId}) ->
      [{<<"payment_bin">>, key_to_bin(PaymentId)},
       {<<"user_bin">>, key_to_bin(UserId)}];
  make_indices(#comment{id = {CommentId, EntryId}, from = Author}) ->
      [{<<"comment_bin">>, key_to_bin({CommentId, EntryId})},
       {<<"author_bin">>, key_to_bin(Author)}];
  make_indices(#entry{id = {EntryPart, FeedPart}, entry_id = EntryId,
                      feed_id = FeedId, from = Sender, to = Recipient}) ->
      [{<<"entry_feed_bin">>, key_to_bin({EntryPart, FeedPart})},
       {<<"entry_bin">>, key_to_bin(EntryId)},
       {<<"from_bin">>, key_to_bin(Sender)},
       {<<"to_bin">>, key_to_bin(Recipient)},
       {<<"feed_bin">>, key_to_bin(FeedId)}];
  make_indices(Record) ->
      %% Fallback: index the record under its own tag by its key element.
      IndexName = atom_to_list(element(1, Record)) ++ "_bin",
      [{key_to_bin(IndexName), key_to_bin(element(2, Record))}].
  %% @doc Returns the cached Riak client.
  %%
  %% Looks up the single entry stored under "riak_client" in the `config'
  %% ETS table and returns the second element of the 2-tuple held in its
  %% third field (presumably the `{ok, Client}' result of the connect call
  %% made in initialize/0 -- confirm).
  riak_client() ->
      [Entry] = ets:lookup(config, "riak_client"),
      {_, _, {_, Client}} = Entry,
      Client.
  %% @doc Stores one record, or every record of a list, through riak_put/1.
  %% A list yields `ok'; a single record yields the result of riak_put/1.
  put(Records) when is_list(Records) ->
      lists:foreach(fun(Record) -> riak_put(Record) end, Records);
  put(Record) ->
      riak_put(Record).
  %% @doc Writes Record to Riak and, when the write succeeds, runs the
  %% post-write hooks.
  %%
  %% The hooks are skipped when the put fails, matching put_if_none_match/1
  %% and update/2, so derived lookup objects are never written for a record
  %% that was not stored.
  %% Returns `ok' on success, otherwise whatever the client's put returned.
  riak_put(Record) ->
      Object = make_object(Record),
      Riak = riak_client(),
      case Riak:put(Object) of
          ok ->
              post_write_hooks(Record, Riak),
              ok;
          Error ->
              Error
      end.
  %% @doc Writes Record with the `if_none_match' put option. The post-write
  %% hooks run only when the put returns `ok'; any other result is returned
  %% to the caller unchanged.
  put_if_none_match(Record) ->
      Object = make_object(Record),
      Riak = riak_client(),
      Outcome = Riak:put(Object, [if_none_match]),
      Outcome =:= ok andalso post_write_hooks(Record, Riak),
      Outcome.
  %% @doc Replaces the value of a previously fetched riak_object with Record.
  %%
  %% Object is the riak_object obtained earlier (see get_for_update/2).
  %% The key derived from Record must equal the key of Object, otherwise
  %% `{error, keys_not_equal}' is returned and nothing is written. The
  %% metadata (including the index list) is rebuilt from Record and the
  %% object is written with the `if_not_modified' option.
  %% Returns `ok' after running the post-write hooks, or the put error.
  update(Record, Object) ->
      NewObject = make_object(Record),
      case riak_object:key(Object) =:= riak_object:key(NewObject) of
          false ->
              {error, keys_not_equal};
          true ->
              %% The pending metadata of the freshly built object carries
              %% the index list that matches the new record contents.
              MetaInfo = riak_object:get_update_metadata(NewObject),
              WithValue = riak_object:update_value(Object, Record),
              WithMeta = riak_object:update_metadata(WithValue, MetaInfo),
              Riak = riak_client(),
              case Riak:put(WithMeta, [if_not_modified]) of
                  ok ->
                      post_write_hooks(Record, Riak),
                      ok;
                  Error ->
                      Error
              end
      end.
  %% @doc Runs the follow-up writes for a stored record.
  %%
  %% For `user' records, each of email, facebook_id, googleplus_id,
  %% twitter_id and github_id that is not `undefined' is written as a
  %% `{Bucket, Username, Value}' object. Any other record returns `continue'.
  post_write_hooks(R, C) ->
      case element(1, R) of
          user ->
              Username = R#user.username,
              write_user_link(C, email, Username, R#user.email),
              write_user_link(C, facebook, Username, R#user.facebook_id),
              write_user_link(C, googleplus, Username, R#user.googleplus_id),
              write_user_link(C, twitter, Username, R#user.twitter_id),
              write_user_link(C, github, Username, R#user.github_id);
          _ ->
              continue
      end.

  %% Writes one `{Bucket, Username, Value}' object, or returns `nothing'
  %% when the value is `undefined'.
  write_user_link(_C, _Bucket, _Username, undefined) ->
      nothing;
  write_user_link(C, Bucket, Username, Value) ->
      C:put(make_object({Bucket, Username, Value})).
  %% @doc Fetches the value stored for Key in table Tab. Both arguments are
  %% converted with key_to_bin/1 and the lookup is delegated to riak_get/2.
  get(Tab, Key) ->
      riak_get(key_to_bin(Tab), key_to_bin(Key)).
  %% @doc Reads Bucket/Key through the Riak client. Returns `{ok, Value}'
  %% with the object's value on success; any other client reply is passed
  %% through unchanged.
  riak_get(Bucket, Key) ->
      Client = riak_client(),
      case Client:get(Bucket, Key) of
          {ok, Object} ->
              {ok, riak_object:get_value(Object)};
          Other ->
              Other
      end.
  %% @doc Like get/2, but on success also returns the riak_object itself:
  %% `{ok, Value, Object}'. Any other client reply is passed through
  %% unchanged.
  get_for_update(Tab, Key) ->
      Client = riak_client(),
      Bucket = key_to_bin(Tab),
      BinKey = key_to_bin(Key),
      case Client:get(Bucket, BinKey) of
          {ok, Object} ->
              {ok, riak_object:get_value(Object), Object};
          Other ->
              Other
      end.
  %% @doc Deletes Key from table Tab. Both arguments are converted with
  %% key_to_bin/1; the result of the client's delete call is returned.
  delete(Tab, Key) ->
      Client = riak_client(),
      Client:delete(key_to_bin(Tab), key_to_bin(Key)).
  %% @doc Deletes every object in table Tab whose secondary index IndexId
  %% equals IndexVal. Returns the list of per-key delete results.
  delete_by_index(Tab, IndexId, IndexVal) ->
      Riak = riak_client(),
      Bucket = key_to_bin(Tab),
      Query = {eq, IndexId, key_to_bin(IndexVal)},
      {ok, Keys} = Riak:get_index(Bucket, Query),
      lists:map(fun(Key) -> Riak:delete(Bucket, Key) end, Keys).
  %% @doc Converts a key term to the binary form used for bucket names,
  %% keys and index values.
  %%
  %% Integers, strings/iolists and atoms are converted from their textual
  %% form, binaries are returned as they are, and any other term is
  %% rendered with the `~p' format directive. The formatted output is
  %% converted with iolist_to_binary/1 rather than by matching on the shape
  %% of io_lib:format/2's result, which is only documented as chars().
  key_to_bin(Key) when is_integer(Key) ->
      erlang:list_to_binary(integer_to_list(Key));
  key_to_bin(Key) when is_list(Key) ->
      erlang:list_to_binary(Key);
  key_to_bin(Key) when is_atom(Key) ->
      erlang:list_to_binary(erlang:atom_to_list(Key));
  key_to_bin(Key) when is_binary(Key) ->
      Key;
  key_to_bin(Key) ->
      erlang:iolist_to_binary(io_lib:format("~p", [Key])).
  %% @doc Returns the values of all objects in the bucket named after
  %% RecordName. Keys whose read does not succeed (riak_get_raw/1 returns
  %% `failure') are left out of the result.
  all(RecordName) ->
      Riak = riak_client(),
      Bucket = key_to_bin(RecordName),
      {ok, Keys} = Riak:list_keys(Bucket),
      [Value || Key <- Keys,
                Value <- [riak_get_raw({Bucket, Key, Riak})],
                Value =/= failure].
  %% @doc Returns the values of all objects in table Tab whose secondary
  %% index IndexId equals IndexVal.
  %%
  %% Keys that are reported by the index but are no longer found are
  %% skipped. The values come back in the reverse of the order in which the
  %% index query listed their keys.
  all_by_index(Tab, IndexId, IndexVal) ->
      Riak = riak_client(),
      Bucket = key_to_bin(Tab),
      {ok, Keys} = Riak:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
      Fetch = fun(Key) ->
                  case Riak:get(Bucket, Key, []) of
                      {ok, Object} -> [riak_object:get_value(Object)];
                      {error, notfound} -> []
                  end
              end,
      lists:reverse(lists:flatmap(Fetch, Keys)).
  %% @doc Reads one object and returns its bare value.
  %%
  %% Takes a `{Bucket, Key, Client}' tuple. Returns the object's value when
  %% the client replies `{ok, Object}', and the atom `failure' for any
  %% other reply.
  riak_get_raw({RecordBin, Key, Riak}) ->
      case Riak:get(RecordBin, Key) of
          {ok, O} -> riak_object:get_value(O);
          _Other -> failure
      end.
  %% @doc Advances the counter CounterId by 1 and returns the new value.
  next_id(CounterId) ->
      Step = 1,
      next_id(CounterId, Step).
  %% @doc Advances the counter CounterId by Incr and returns the new value.
  %% A counter that does not exist yet starts from 0.
  next_id(CounterId, Incr) ->
      Default = 0,
      next_id(CounterId, Default, Incr).
  %% @doc Advances the counter CounterId by Incr and returns the new value.
  %%
  %% Counters are `#id_seq{}' records stored in the `id_seq' bucket. An
  %% existing counter is incremented and written with `if_not_modified';
  %% a missing counter is created at Default + Incr and written with
  %% `if_none_match'. When the conditional put is rejected the whole
  %% operation is retried with the same Default, so a caller-supplied
  %% starting value is not lost on retry.
  %%
  %% NOTE(review): the retry is unbounded -- a persistent put error keeps
  %% this function looping.
  next_id(CounterId, Default, Incr) ->
      Riak = riak_client(),
      SeqBucket = key_to_bin(id_seq),
      CounterBin = key_to_bin(CounterId),
      {Object, Value, Options} =
          case Riak:get(SeqBucket, CounterBin, []) of
              {ok, CurObj} ->
                  #id_seq{id = CurVal} = Seq = riak_object:get_value(CurObj),
                  Bumped = CurVal + Incr,
                  Updated = riak_object:update_value(CurObj, Seq#id_seq{id = Bumped}),
                  {Updated, Bumped, [if_not_modified]};
              {error, notfound} ->
                  Initial = Default + Incr,
                  Created = riak_object:new(SeqBucket, CounterBin,
                                            #id_seq{thing = CounterId, id = Initial}),
                  {Created, Initial, [if_none_match]}
          end,
      case Riak:put(Object, Options) of
          ok -> Value;
          {error, _} -> next_id(CounterId, Default, Incr)
      end.
  191. % index funs
  %% @doc Returns the `user_product' values indexed under `user_bin' for
  %% the user id UId, which must be a string (or iolist).
  products(UId) ->
      UserBin = list_to_binary(UId),
      all_by_index(user_product, <<"user_bin">>, UserBin).
  %% @doc Returns the `subscription' values whose `who' side is UId, which
  %% must be a string (or iolist).
  %%
  %% The bucket and index name match what make_indices/1 writes for
  %% `#subscription{}' records (`who_bin').
  subscriptions(UId) ->
      all_by_index(subscription, <<"who_bin">>, list_to_binary(UId)).
  %% @doc Returns the `subscription' values whose `whom' side is Who, which
  %% must be a string (or iolist).
  %%
  %% The index name matches what make_indices/1 writes for
  %% `#subscription{}' records (`whom_bin').
  subscribed(Who) ->
      all_by_index(subscription, <<"whom_bin">>, list_to_binary(Who)).
  %% @doc Returns the `group_subscription' values indexed under `who_bin'
  %% for UserName.
  participate(UserName) ->
      all_by_index(
        group_subscription,
        <<"who_bin">>,
        UserName).
  %% @doc Returns the `group_subscription' values indexed under `where_bin'
  %% for GroupName.
  members(GroupName) ->
      all_by_index(
        group_subscription,
        <<"where_bin">>,
        GroupName).
  %% @doc Returns the `play_record' values indexed under
  %% `play_record_who_bin' for UId, which must be a string (or iolist).
  %%
  %% NOTE(review): make_indices/1 in this module has no clause that writes
  %% `play_record_who_bin' -- confirm where this index is populated.
  user_tournaments(UId) ->
      UserBin = list_to_binary(UId),
      all_by_index(play_record, <<"play_record_who_bin">>, UserBin).
  %% @doc Returns the `play_record' values indexed under
  %% `play_record_tournament_bin' for the integer tournament id TId.
  %%
  %% NOTE(review): make_indices/1 in this module has no clause that writes
  %% `play_record_tournament_bin' -- confirm where this index is populated.
  tournament_users(TId) ->
      TournamentBin = list_to_binary(integer_to_list(TId)),
      all_by_index(play_record, <<"play_record_tournament_bin">>, TournamentBin).
  %% @doc Returns the entries that Who has commented on.
  %%
  %% Looks up the comments indexed under `author_bin' for Who, takes the
  %% entry_id of each, fetches the entries indexed under `entry_bin' for
  %% every such id, and returns them as one flat list.
  author_comments(Who) ->
      Comments = all_by_index(comment, <<"author_bin">>, Who),
      EntryIds = [EntryId || #comment{entry_id = EntryId} <- Comments],
      PerEntry = lists:map(fun(EntryId) ->
                               all_by_index(entry, <<"entry_bin">>, EntryId)
                           end, EntryIds),
      lists:flatten(PerEntry).