store_riak.erl 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. -module(store_riak).
  2. -author('Maxim Sokhatsky <maxim@synrc.com>').
  3. -copyright('Synrc Research Center s.r.o.').
  4. -include("config.hrl").
  5. -include("user.hrl").
  6. -include("subscription.hrl").
  7. -include("group.hrl").
  8. -include("comment.hrl").
  9. -include("entry.hrl").
  10. -include("feed.hrl").
  11. -include("acl.hrl").
  12. -compile(export_all).
  13. start() -> ok.
  14. stop() -> ok.
  15. version() -> {version,"KVS RIAK 2.0.2"}.
  16. join() -> initialize(), ok.
  17. join(_) -> initialize(), ok.
  18. initialize() -> riak:client_connect(node()).
  19. dir() ->
  20. {ok,Buckets} = riak_client:list_buckets(),
  21. [{table,binary_to_list(X)}||X<-Buckets].
  22. riak_clean(Table) when is_list(Table)->
  23. {ok,Keys}=riak_client:list_keys(erlang:list_to_binary(Table)),
  24. [ riak_client:delete(erlang:list_to_binary(Table),Key) || Key <- Keys];
  25. riak_clean(Table) ->
  26. [TableStr] = io_lib:format("~p",[Table]),
  27. {ok,Keys}=riak_client:list_keys(erlang:list_to_binary(TableStr)),
  28. [ kvs:delete(Table,key_to_bin(Key)) || Key <- Keys].
  29. make_object(T) ->
  30. Bucket = element(1,T),
  31. Key = element(2,T),
  32. Obj1 = riak_object:new(key_to_bin(Bucket), key_to_bin(Key), T),
  33. Indices = make_indices(T),
  34. Meta = dict:store(<<"index">>, Indices, dict:new()),
  35. Obj2 = riak_object:update_metadata(Obj1, Meta),
  36. error_logger:info_msg("RIAK PUT IDX ~p",[Indices]),
  37. Obj2.
  38. make_indices(#subscription{who=Who, whom=Whom}) -> [
  39. {<<"who_bin">>, key_to_bin(Who)},
  40. {<<"whom_bin">>, key_to_bin(Whom)}];
  41. make_indices(#user{id=UId,zone=Zone}) -> [
  42. {<<"user_bin">>, key_to_bin(UId)},
  43. {<<"zone_bin">>, key_to_bin(Zone)}];
  44. make_indices(#comment{id={CID,EID},from=Who}) -> [
  45. {<<"comment_bin">>, key_to_bin({CID,EID})},
  46. {<<"author_bin">>, key_to_bin(Who)}];
  47. make_indices(#entry{id={EID,FID},entry_id=EntryId,feed_id=Feed,from=From,to=To}) -> [
  48. {<<"entry_feed_bin">>, key_to_bin({EID,FID})},
  49. {<<"entry_bin">>, key_to_bin(EntryId)},
  50. {<<"from_bin">>, key_to_bin(From)},
  51. {<<"to_bin">>, key_to_bin(To)},
  52. {<<"feed_bin">>, key_to_bin(Feed)}];
  53. make_indices(Record) -> [
  54. {key_to_bin(atom_to_list(element(1,Record))++"_bin"),key_to_bin(element(2,Record))}].
  55. put(Records) when is_list(Records) -> lists:foreach(fun riak_put/1, Records);
  56. put(Record) -> riak_put(Record).
  57. riak_put(Record) ->
  58. {ok,C}=riak:local_client(),
  59. Object = make_object(Record),
  60. Result = riak_client:put(Object,C),
  61. Result.
  62. put_if_none_match(Record) ->
  63. Object = make_object(Record),
  64. case riak_client:put(Object, [if_none_match]) of
  65. ok -> ok;
  66. Error -> Error end.
  67. update(Record, Object) ->
  68. NewObject = make_object(Record),
  69. NewKey = riak_object:key(NewObject),
  70. case riak_object:key(Object) of
  71. NewKey ->
  72. MetaInfo = riak_object:get_update_metatdata(NewObject),
  73. UpdObject2 = riak_object:update_value(Object, Record),
  74. UpdObject3 = riak_object:update_metadata(UpdObject2, MetaInfo),
  75. case riak_client:put(UpdObject3, [if_not_modified]) of
  76. ok -> ok;
  77. Error -> Error
  78. end;
  79. _ -> {error, keys_not_equal}
  80. end.
  81. get(Tab, Key) ->
  82. Bucket = key_to_bin(Tab),
  83. IntKey = key_to_bin(Key),
  84. riak_get(Bucket, IntKey).
  85. riak_get(Bucket,Key) ->
  86. {ok,C} = riak:local_client(),
  87. RiakAnswer = riak_client:get(Bucket,Key,C),
  88. case RiakAnswer of
  89. {ok, O} -> {ok, riak_object:get_value(O)};
  90. X -> X end.
  91. get_for_update(Tab, Key) ->
  92. case riak_client:get(key_to_bin(Tab), key_to_bin(Key)) of
  93. {ok, O} -> {ok, riak_object:get_value(O), O};
  94. Error -> Error end.
  95. delete(Tab, Key) ->
  96. Bucket = key_to_bin(Tab),
  97. IntKey = key_to_bin(Key),
  98. riak_client:delete(Bucket, IntKey).
  99. delete_by_index(Tab, IndexId, IndexVal) ->
  100. Bucket = key_to_bin(Tab),
  101. {ok, Keys} = riak_client:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
  102. [riak_client:delete(Bucket, Key) || Key <- Keys].
  103. key_to_bin(Key) ->
  104. if is_integer(Key) -> erlang:list_to_binary(integer_to_list(Key));
  105. is_list(Key) -> erlang:list_to_binary(Key);
  106. is_atom(Key) -> erlang:list_to_binary(erlang:atom_to_list(Key));
  107. is_binary(Key) -> Key;
  108. true -> [ListKey] = io_lib:format("~p", [Key]), erlang:list_to_binary(ListKey) end.
  109. all(RecordName) ->
  110. RecordBin = key_to_bin(RecordName),
  111. {ok,Keys} = riak_client:list_keys(RecordBin),
  112. Results = [ riak_get_raw({RecordBin, Key, riak_client}) || Key <- Keys ],
  113. [ Object || Object <- Results, Object =/= failure ].
  114. all_by_index(Tab, IndexId, IndexVal) ->
  115. Bucket = key_to_bin(Tab),
  116. {ok, Keys} = riak_client:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
  117. lists:foldl(fun(Key, Acc) ->
  118. case riak_client:get(Bucket, Key, []) of
  119. {ok, O} -> [riak_object:get_value(O) | Acc];
  120. {error, notfound} -> Acc end end, [], Keys).
  121. riak_get_raw({RecordBin, Key, Riak}) ->
  122. case Riak:get(RecordBin, Key) of
  123. {ok,O} -> riak_object:get_value(O);
  124. _ -> failure end.
  125. next_id(CounterId) -> next_id(CounterId, 1).
  126. next_id(CounterId, Incr) -> next_id(CounterId, 0, Incr).
  127. next_id(CounterId, Default, Incr) ->
  128. CounterBin = key_to_bin(CounterId),
  129. {Object, Value, Options} =
  130. case riak_client:get(key_to_bin(id_seq), CounterBin, []) of
  131. {ok, CurObj} ->
  132. R = #id_seq{id = CurVal} = riak_object:get_value(CurObj),
  133. NewVal = CurVal + Incr,
  134. Obj = riak_object:update_value(CurObj, R#id_seq{id = NewVal}),
  135. {Obj, NewVal, [if_not_modified]};
  136. {error, notfound} ->
  137. NewVal = Default + Incr,
  138. Obj = riak_object:new(key_to_bin(id_seq), CounterBin, #id_seq{thing = CounterId, id = NewVal}),
  139. {Obj, NewVal, [if_none_match]} end,
  140. case riak_client:put(Object, Options) of
  141. ok -> Value;
  142. {error, _} -> next_id(CounterId, Incr) end.
  143. % index funs
  144. subscriptions(UId) -> all_by_index(subsciption, <<"subs_who_bin">>, list_to_binary(UId)).
  145. subscribed(Who) -> all_by_index(subscription, <<"subs_whom_bin">>, list_to_binary(Who)).
  146. author_comments(Who) ->
  147. EIDs = [E || #comment{entry_id=E} <- all_by_index(comment,<<"author_bin">>, Who) ],
  148. lists:flatten([ all_by_index(entry,<<"entry_bin">>,EID) || EID <- EIDs]).