store_riak.erl 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691
  1. -module(store_riak).
  2. -author('Maxim Sokhatsky <maxim@synrc.com>').
  3. -include_lib("kvs/include/config.hrl").
  4. -include_lib("kvs/include/users.hrl").
  5. -include_lib("kvs/include/groups.hrl").
  6. -include_lib("kvs/include/feeds.hrl").
  7. -include_lib("kvs/include/acls.hrl").
  8. -include_lib("kvs/include/invites.hrl").
  9. -include_lib("kvs/include/meetings.hrl").
  10. -include_lib("kvs/include/membership_packages.hrl").
  11. -include_lib("kvs/include/accounts.hrl").
  12. -include_lib("kvs/include/log.hrl").
  13. -include_lib("stdlib/include/qlc.hrl").
  14. -compile(export_all).
  15. -define(BUCKET_INDEX, "bucket_bin").
  16. -define(MD_INDEX, <<"index">>).
  %% No-op lifecycle stubs for this backend: each one immediately
  %% returns a fixed atom and touches no state.
  delete() ->
      ok.

  start() ->
      ok.

  stop() ->
      stopped.
  %% Connects to the Riak node 'node_runner@127.0.0.1', creates the named
  %% ETS table `config' (keyed on #config.key) and stores the connection
  %% result under the key "riak_client". Always returns ok.
  initialize() ->
      Connection = riak:client_connect('node_runner@127.0.0.1'),
      TableOptions = [named_table, {keypos, #config.key}],
      ets:new(config, TableOptions),
      ClientEntry = #config{key = "riak_client", value = Connection},
      ets:insert(config, ClientEntry),
      ok.
  %% Switches every bucket listed below to the leveldb backend, in the
  %% order given. Returns ok.
  init_indexes() ->
      Client = riak_client(),
      BucketProps = [{backend, leveldb_backend}],
      Buckets = [id_seq,
                 subscription,
                 user,
                 group,
                 translation,
                 group_subscription,
                 user_bought_gifts,
                 play_record],
      lists:foreach(
          fun(Bucket) -> Client:set_bucket(key_to_bin(Bucket), BucketProps) end,
          Buckets),
      ok.
  %% Lists all bucket names known to the Riak client, as strings.
  dir() ->
      Client = riak_client(),
      {ok, Buckets} = Client:list_buckets(),
      lists:map(fun erlang:binary_to_list/1, Buckets).
  %% Deletes every key of a bucket.
  %% - Table given as a string: keys are removed directly through the
  %%   Riak client.
  %% - Any other term: the bucket name is the ~p rendering of Table and
  %%   each key is removed through kvs:delete/2.
  %% Returns the list of per-key delete results.
  riak_clean(Table) when is_list(Table) ->
      Client = riak_client(),
      {ok, Keys} = Client:list_keys(erlang:list_to_binary(Table)),
      lists:map(
          fun(Key) -> Client:delete(erlang:list_to_binary(Table), Key) end,
          Keys);
  riak_clean(Table) ->
      Client = riak_client(),
      [Printed] = io_lib:format("~p", [Table]),
      Bucket = erlang:list_to_binary(Printed),
      {ok, Keys} = Client:list_keys(Bucket),
      lists:map(fun(Key) -> kvs:delete(Table, key_to_bin(Key)) end, Keys).
  %% Wraps a record tuple in a riak_object.
  %% The first tuple element names the bucket and the second is the key;
  %% the whole tuple is stored as the value. Secondary-index entries
  %% (the bucket index plus whatever make_indices/1 yields) are attached
  %% as update metadata under ?MD_INDEX.
  make_object(T) ->
      Bucket = element(1, T),
      Key = element(2, T),
      Fresh = riak_object:new(key_to_bin(Bucket), key_to_bin(Key), T),
      %% Indexes are only useful for leveldb-backed buckets.
      Indices = [{?BUCKET_INDEX, Bucket} | make_indices(T)],
      riak_object:update_metadata(Fresh, dict:store(?MD_INDEX, Indices, dict:new())).
  %% Returns the secondary-index entries ({IndexName, BinaryValue}) to be
  %% stored with a record. Records without a dedicated clause get none.
  %%
  %% The #play_record{} clause supplies the "play_record_who_bin" and
  %% "play_record_tournament_bin" indexes that user_tournaments/1 and
  %% tournament_waiting_queue/1 query; without it those lookups have
  %% nothing to match.
  make_indices(#subscription{who = Who, whom = Whom}) ->
      [{<<"subs_who_bin">>, key_to_bin(Who)},
       {<<"subs_whom_bin">>, key_to_bin(Whom)}];
  make_indices(#group_subscription{user_id = UId, group_id = GId}) ->
      [{<<"group_subs_user_bin">>, key_to_bin(UId)},
       {<<"group_subs_group_bin">>, key_to_bin(GId)}];
  make_indices(#user_bought_gifts{username = UId}) ->
      [{<<"user_bought_gifts_username_bin">>, key_to_bin(UId)}];
  make_indices(#user{username = UId, zone = Zone}) ->
      [{<<"user_bin">>, key_to_bin(UId)},
       {<<"user_zone_bin">>, key_to_bin(Zone)}];
  make_indices(#play_record{who = Who, tournament = Tournament}) ->
      [{<<"play_record_who_bin">>, key_to_bin(Who)},
       {<<"play_record_tournament_bin">>, key_to_bin(Tournament)}];
  make_indices(_Record) ->
      [].
  %% Fetches the Riak client stored by initialize/0 in the `config' ETS
  %% table. The stored value is a 2-tuple whose second element is the
  %% client; anything else is a badmatch.
  riak_client() ->
      [ConfigEntry] = ets:lookup(config, "riak_client"),
      {_, _, {_, Client}} = ConfigEntry,
      Client.
  %% Stores one record or a list of records; a single record is wrapped
  %% in a list and re-dispatched. Returns ok.
  put(Records) when is_list(Records) ->
      lists:foreach(fun(Record) -> riak_put(Record) end, Records);
  put(Record) ->
      store_riak:put([Record]).
  %% Writes a single record with last-write-wins semantics, runs the
  %% post-write hooks regardless of the outcome, and returns the result
  %% of the Riak put.
  riak_put(Record) ->
      Stored = make_object(Record),
      Client = riak_client(),
      WriteOptions = [{allow_mult, false}, {last_write_wins, true}],
      Outcome = Client:put(Stored, WriteOptions),
      post_write_hooks(Record, Client),
      Outcome.
  %% Writes a record with the `if_none_match' put option. Post-write
  %% hooks run only when the put returns ok; any other result is handed
  %% back to the caller untouched.
  put_if_none_match(Record) ->
      Stored = make_object(Record),
      Client = riak_client(),
      Outcome = Client:put(Stored, [if_none_match]),
      case Outcome of
          ok ->
              post_write_hooks(Record, Client),
              ok;
          _ ->
              Outcome
      end.
  %% update(Record, Object) -> ok | {error, Reason}
  %%
  %% Replaces the value of a previously fetched riak_object (see
  %% get_for_update/2) with Record, using the `if_not_modified' put
  %% option. The record's key must equal the object's key, otherwise
  %% {error, keys_not_equal} is returned. Index metadata is rebuilt from
  %% the new record. Post-write hooks run only after a successful put.
  update(Record, Object) ->
      NewObject = make_object(Record),
      NewKey = riak_object:key(NewObject),
      case riak_object:key(Object) of
          NewKey ->
              %% Fixed: the function is get_update_metadata/1; the
              %% previous spelling ("metatdata") raised undef.
              MetaInfo = riak_object:get_update_metadata(NewObject),
              UpdObject2 = riak_object:update_value(Object, Record),
              UpdObject3 = riak_object:update_metadata(UpdObject2, MetaInfo),
              Riak = riak_client(),
              case Riak:put(UpdObject3, [if_not_modified]) of
                  ok ->
                      post_write_hooks(Record, Riak),
                      ok;
                  Error ->
                      Error
              end;
          _ ->
              {error, keys_not_equal}
      end.
  %% Maintains backlink objects after a write. For `user' records, each
  %% defined email / verification code / facebook id gets a
  %% {Tag, Username, Value} object stored via the client C. Other records
  %% yield `continue'. For users the result is that of the last
  %% (facebook) step: `nothing' or the put result.
  post_write_hooks(R, C) ->
      case element(1, R) of
          user ->
              StoreBacklink =
                  fun(_Tag, undefined) ->
                          nothing;
                     (Tag, Value) ->
                          C:put(make_object({Tag, R#user.username, Value}))
                  end,
              StoreBacklink(email, R#user.email),
              StoreBacklink(code, R#user.verification_code),
              StoreBacklink(facebook, R#user.facebook_id);
          _ ->
              continue
      end.
  %% Reads the value stored under Key in table Tab; both are converted
  %% to binaries before the lookup.
  get(Tab, Key) ->
      riak_get(key_to_bin(Tab), key_to_bin(Key)).
  %% Fetches an object by binary bucket/key and unwraps its value.
  %% Returns {ok, Value}, or whatever non-ok result the client produced.
  %% TODO: add state monad here for conflict resolution when not last_win strategy used
  riak_get(Bucket, Key) ->
      Client = riak_client(),
      ReadOptions = [{last_write_wins, true}, {allow_mult, false}],
      case Client:get(Bucket, Key, ReadOptions) of
          {ok, Object} -> {ok, riak_object:get_value(Object)};
          Other -> Other
      end.
  %% Like get/2, but on success also returns the underlying riak_object
  %% so it can later be passed to update/2:
  %% {ok, Value, Object} | ErrorFromClient.
  get_for_update(Tab, Key) ->
      Client = riak_client(),
      Bucket = key_to_bin(Tab),
      BinKey = key_to_bin(Key),
      ReadOptions = [{last_write_wins, true}, {allow_mult, false}],
      case Client:get(Bucket, BinKey, ReadOptions) of
          {ok, Object} -> {ok, riak_object:get_value(Object), Object};
          Failure -> Failure
      end.
  %% Looks a word up in the `ut_word' table.
  get_word(Word) ->
      store_riak:get(ut_word, Word).

  %% Looks a translation up in `ut_translation'; the key is the string
  %% Lang ++ "_" ++ Word.
  get_translation({Lang, Word}) ->
      TranslationKey = Lang ++ "_" ++ Word,
      store_riak:get(ut_translation, TranslationKey).
  % delete
  %% delete(RecordOrRecords) -> ok
  %%
  %% Deletes one record or a list of records from Riak. As in
  %% make_object/1, the first tuple element is taken as the table and
  %% the second as the key.
  %%
  %% The previous implementation delegated to mnesia:delete_object/1,
  %% which never touches this Riak backend.
  %% NOTE(review): assumes callers pass full record tuples (as
  %% mnesia:delete_object/1 required) -- confirm against callers.
  delete(Keys) when is_list(Keys) ->
      lists:foreach(
          fun(Record) ->
              store_riak:delete(element(1, Record), element(2, Record))
          end,
          Keys);
  delete(Keys) ->
      delete([Keys]).
  %% Deletes the object stored under Key in table Tab. The client's
  %% result is ignored; always returns ok.
  delete(Tab, Key) ->
      Client = riak_client(),
      Client:delete(key_to_bin(Tab), key_to_bin(Key)),
      ok.
  %% Deletes every object of table Tab whose secondary index IndexId
  %% equals IndexVal. Returns ok.
  delete_by_index(Tab, IndexId, IndexVal) ->
      Client = riak_client(),
      Bucket = key_to_bin(Tab),
      Query = {eq, IndexId, key_to_bin(IndexVal)},
      {ok, Keys} = Client:get_index(Bucket, Query),
      lists:foreach(fun(Key) -> Client:delete(Bucket, Key) end, Keys),
      ok.
  %% Converts a key of any type to the binary used as a Riak bucket or
  %% key name: integers, strings and atoms are rendered as text,
  %% binaries pass through, and any other term is printed with ~p.
  key_to_bin(Key) when is_integer(Key) ->
      erlang:list_to_binary(integer_to_list(Key));
  key_to_bin(Key) when is_list(Key) ->
      erlang:list_to_binary(Key);
  key_to_bin(Key) when is_atom(Key) ->
      erlang:list_to_binary(erlang:atom_to_list(Key));
  key_to_bin(Key) when is_binary(Key) ->
      Key;
  key_to_bin(Key) ->
      [Printed] = io_lib:format("~p", [Key]),
      erlang:list_to_binary(Printed).
  %% select(RecordName, Pred | Options) -> [Record]
  %%
  %% With a predicate fun: loads the whole table and filters it.
  %% With a proplist: recognises `where' (predicate, default accepts
  %% all), `order' ({Position, Order}, default {1, descending}) and
  %% `limit' (`all' or {Offset, Amount}). Results are key-sorted on
  %% Position; the Order component is read but not applied.
  select(RecordName, Pred) when is_function(Pred) ->
      %% FIXME: bruteforce select
      lists:filter(Pred, all(RecordName));
  select(RecordName, Select) when is_list(Select) ->
      %% FIXME: dummy select!
      Where = proplists:get_value(where, Select, fun(_) -> true end),
      {Position, _Order} = proplists:get_value(order, Select, {1, descending}),
      Limit = proplists:get_value(limit, Select, all),
      Sorted = lists:keysort(Position, select(RecordName, Where)),
      case Limit of
          all -> Sorted;
          {Offset, Amount} -> lists:sublist(Sorted, Offset, Amount)
      end.
  %% Number of readable records in a table; loads them all to count.
  count(RecordName) ->
      Records = all(RecordName),
      erlang:length(Records).
  %% Returns every record of a table. The bucket name is the ~p
  %% rendering of RecordName; keys that fail to load are skipped.
  all(RecordName) ->
      Client = riak_client(),
      [Printed] = io_lib:format("~p", [RecordName]),
      Bucket = erlang:list_to_binary(Printed),
      {ok, Keys} = Client:list_keys(Bucket),
      Fetched = lists:map(
                    fun(Key) -> get_record_from_table({Bucket, Key, Client}) end,
                    Keys),
      lists:filter(fun(Item) -> Item =/= failure end, Fetched).
  %% get by index
  %% Returns the values of all objects in table Tab whose secondary
  %% index IndexId equals IndexVal. Keys that have since disappeared
  %% ({error, notfound}) are skipped. Values are accumulated by
  %% prepending, so they come out in reverse key order.
  all_by_index(Tab, IndexId, IndexVal) ->
      Client = riak_client(),
      Bucket = key_to_bin(Tab),
      Query = {eq, IndexId, key_to_bin(IndexVal)},
      {ok, Keys} = Client:get_index(Bucket, Query),
      Collect =
          fun(Key, Found) ->
              case Client:get(Bucket, Key, []) of
                  {error, notfound} -> Found;
                  {ok, Object} -> [riak_object:get_value(Object) | Found]
              end
          end,
      lists:foldl(Collect, [], Keys).
  %% Loads one object's value; any non-ok client result is collapsed
  %% into the atom `failure'.
  get_record_from_table({RecordBin, Key, Riak}) ->
      case Riak:get(RecordBin, Key) of
          {ok, Object} -> riak_object:get_value(Object);
          _ -> failure
      end.
  % id generator
  %% next_id(CounterId)                -> next_id(CounterId, 1)
  %% next_id(CounterId, Incr)          -> next_id(CounterId, 0, Incr)
  %% next_id(CounterId, Default, Incr) -> NewValue
  %%
  %% Increments the counter CounterId in the `id_seq' bucket by Incr and
  %% returns the new value. A missing counter is created starting from
  %% Default. The write is conditional (`if_not_modified' for an existing
  %% counter, `if_none_match' for a new one); when the put fails the
  %% whole read-modify-write is retried.
  next_id(CounterId) ->
      next_id(CounterId, 1).

  next_id(CounterId, Incr) ->
      next_id(CounterId, 0, Incr).

  next_id(CounterId, Default, Incr) ->
      Riak = riak_client(),
      CounterBin = key_to_bin(CounterId),
      {Object, Value, Options} =
          case Riak:get(key_to_bin(id_seq), CounterBin, []) of
              {ok, CurObj} ->
                  Seq = #id_seq{id = CurVal} = riak_object:get_value(CurObj),
                  Bumped = CurVal + Incr,
                  {riak_object:update_value(CurObj, Seq#id_seq{id = Bumped}),
                   Bumped,
                   [if_not_modified]};
              {error, notfound} ->
                  Initial = Default + Incr,
                  {riak_object:new(key_to_bin(id_seq), CounterBin,
                                   #id_seq{thing = CounterId, id = Initial}),
                   Initial,
                   [if_none_match]}
          end,
      case Riak:put(Object, Options) of
          ok ->
              Value;
          {error, _} ->
              %% Retry with the caller's Default. The earlier code retried
              %% through next_id/2 and so reset Default to 0.
              next_id(CounterId, Default, Incr)
      end.
  % user backlinks
  %% Resolves a user through the `code' backlink table: the backlink's
  %% second element is the username. A failed backlink lookup is
  %% returned as is.
  user_by_verification_code(Code) ->
      Backlink = kvs:get(code, Code),
      case Backlink of
          {ok, {_, User, _}} -> kvs:get(user, User);
          _ -> Backlink
      end.
  %% Resolves a user through the `facebook' backlink table; a failed
  %% backlink lookup is returned as is.
  user_by_facebook_id(FBId) ->
      Backlink = kvs:get(facebook, FBId),
      case Backlink of
          {ok, {_, User, _}} -> kvs:get(user, User);
          _ -> Backlink
      end.
  %% Resolves a user through the `email' backlink table; a failed
  %% backlink lookup is returned as is.
  user_by_email(Email) ->
      Backlink = kvs:get(email, Email),
      case Backlink of
          {ok, {_, User, _}} -> kvs:get(user, User);
          _ -> Backlink
      end.
  %% Direct user lookup by username; the kvs:get/2 result is returned
  %% unchanged whether it succeeded or not.
  user_by_username(Name) ->
      kvs:get(user, Name).
  % feeds
  %% Adds a direct message to a feed: an entry of type {user, direct}
  %% addressed to To, with an empty "shared by" string.
  feed_add_direct_message(FId, User, To, EntryId, Desc, Medias) ->
      EntryType = {user, direct},
      feed_add_entry(FId, User, To, EntryId, Desc, Medias, EntryType, "").
  %% feed_add_entry/5: adds a normal entry ({user, normal}) with no
  %% recipient and an empty "shared by" string.
  feed_add_entry(FId, From, EntryId, Desc, Medias) ->
      EntryType = {user, normal},
      feed_add_entry(FId, From, undefined, EntryId, Desc, Medias, EntryType, "").

  %% feed_add_entry/8: adds an entry unless one with id {EntryId, FId}
  %% already exists, in which case it returns ok without writing.
  feed_add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy) ->
      %% prevent adding of duplicate records to feed
      Existing = kvs:entry_by_id({EntryId, FId}),
      case Existing of
          {ok, _} ->
              ok;
          _ ->
              do_feed_add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy)
      end.
  %% Pushes a new entry onto the top of a feed's linked list.
  %% Steps, in order:
  %%   1. load the feed (key is the feed id rendered as a string);
  %%   2. if the feed has a readable top entry, point its `next' at the
  %%      new entry and remember it as the new entry's `prev';
  %%   3. store a feed record whose top is the new entry id;
  %%   4. build the entry, run it through feedformat:format/1 (falling
  %%      back to the unformatted entry when that yields a 2-tuple,
  %%      e.g. an 'EXIT'), and store it.
  %% Returns {ok, StoredEntry}.
  do_feed_add_entry(FId, User, To, EntryId, Desc, Medias, Type, SharedBy) ->
      {ok, Feed} = kvs:get(feed, erlang:integer_to_list(FId)),
      Id = {EntryId, FId},
      Prev =
          case Feed#feed.top of
              undefined ->
                  undefined;
              TopId ->
                  case kvs:get(entry, TopId) of
                      {error, notfound} ->
                          undefined;
                      {ok, TopEntry} ->
                          % update prev entry
                          kvs:put(TopEntry#entry{next = Id}),
                          TopEntry#entry.id
                  end
          end,
      % update feed top with current
      kvs:put(#feed{id = FId, top = {EntryId, FId}}),
      Entry = #entry{id = {EntryId, FId},
                     entry_id = EntryId,
                     feed_id = FId,
                     from = User,
                     to = To,
                     type = Type,
                     media = Medias,
                     created_time = now(),
                     description = Desc,
                     raw_description = Desc,
                     shared = SharedBy,
                     next = undefined,
                     prev = Prev},
      ModEntry =
          case catch feedformat:format(Entry) of
              {_, Reason} ->
                  ?ERROR("feedformat error: ~p", [Reason]),
                  Entry;
              #entry{} = Formatted ->
                  Formatted
          end,
      kvs:put(ModEntry),
      {ok, ModEntry}.
  %% Adds a comment either directly to an entry (ParentComment is
  %% undefined) or under a parent comment.
  %% The owner's `comments' field always ends up pointing at the new
  %% comment. If the owner had no comments yet, its `comments_rear' is
  %% set too; otherwise the previous top comment's `next' is pointed at
  %% the new comment and that previous top becomes the new comment's
  %% `prev'. Returns {ok, Comment}.
  %% NOTE(review): the comment id is built as {CommentId, {EntryId, FId}}
  %% while the parent is looked up as {{EntryId, FId}, ParentComment} --
  %% confirm which key order is intended.
  feed_add_comment(FId, User, EntryId, ParentComment, CommentId, Content, Medias) ->
      FullId = {CommentId, {EntryId, FId}},
      Prev =
          case ParentComment of
              undefined ->
                  {ok, Entry} = kvs:entry_by_id({EntryId, FId}),
                  {EntryPrev, EntryToStore} =
                      case Entry#entry.comments of
                          undefined ->
                              {undefined, Entry#entry{comments_rear = FullId}};
                          EntryTopId ->
                              {ok, EntryTop} = kvs:get(comment, EntryTopId),
                              kvs:put(EntryTop#comment{next = FullId}),
                              {EntryTopId, Entry}
                      end,
                  kvs:put(EntryToStore#entry{comments = FullId}),
                  EntryPrev;
              _ ->
                  {ok, Parent} = kvs:get(comment, {{EntryId, FId}, ParentComment}),
                  {ParentPrev, ParentToStore} =
                      case Parent#comment.comments of
                          undefined ->
                              {undefined, Parent#comment{comments_rear = FullId}};
                          ParentTopId ->
                              {ok, ParentTop} = kvs:get(comment, ParentTopId),
                              kvs:put(ParentTop#comment{next = FullId}),
                              {ParentTopId, Parent}
                      end,
                  kvs:put(ParentToStore#comment{comments = FullId}),
                  ParentPrev
          end,
      Comment = #comment{id = FullId,
                         author_id = User,
                         comment_id = CommentId,
                         entry_id = EntryId,
                         raw_content = Content,
                         content = Content,
                         media = Medias,
                         create_time = now(),
                         prev = Prev,
                         next = undefined},
      kvs:put(Comment),
      {ok, Comment}.
  %% Prepends a transaction to a user's transaction list.
  %% The list head (#user_transaction{}) is created on demand. If the
  %% current top transaction is readable it is re-stored with `prev' set
  %% to the new id, and becomes the new transaction's `next'. The head
  %% is repointed only after the new transaction was stored.
  %% Returns {ok, EntryId} | {failure, PutResult}.
  add_transaction_to_user(UserId, Purchase) ->
      {ok, Team} =
          case kvs:get(user_transaction, UserId) of
              {ok, _} = Found ->
                  Found;
              _ ->
                  ?INFO("user_transaction not found"),
                  Head = #user_transaction{user = UserId, top = undefined},
                  %% relies on kvs:put/1 returning ok
                  {kvs:put(Head), Head}
          end,
      EntryId = Purchase#transaction.id, %kvs:next_id("membership_purchase",1),
      Next =
          case Team#user_transaction.top of
              undefined ->
                  undefined;
              TopId ->
                  case kvs:get(transaction, TopId) of
                      {ok, TopEntry} ->
                          % update prev entry
                          kvs:put(#transaction{
                                      commit_time = TopEntry#transaction.commit_time,
                                      amount = TopEntry#transaction.amount,
                                      remitter = TopEntry#transaction.remitter,
                                      acceptor = TopEntry#transaction.acceptor,
                                      currency = TopEntry#transaction.currency,
                                      info = TopEntry#transaction.info,
                                      id = TopEntry#transaction.id,
                                      next = TopEntry#transaction.next,
                                      prev = EntryId}),
                          TopEntry#transaction.id;
                      {error, notfound} ->
                          undefined
                  end
          end,
      Entry = #transaction{id = EntryId,
                           commit_time = Purchase#transaction.commit_time,
                           amount = Purchase#transaction.amount,
                           remitter = Purchase#transaction.remitter,
                           acceptor = Purchase#transaction.acceptor,
                           currency = Purchase#transaction.currency,
                           info = Purchase#transaction.info,
                           next = Next,
                           prev = undefined},
      case kvs:put(Entry) of
          ok ->
              kvs:put(#user_transaction{user = UserId, top = EntryId}),
              {ok, EntryId};
          Error ->
              ?INFO("Cant write transaction"),
              {failure, Error}
      end.
  %% Prepends a membership purchase to a user's purchase list.
  %% The list head (#user_purchase{}) is created on demand. If the
  %% current top purchase is readable it is re-stored with `prev' set to
  %% the new id, and becomes the new purchase's `next'. The head is
  %% repointed before the new purchase itself is written.
  %% Returns {ok, EntryId} | {failure, PutResult}.
  add_purchase_to_user(UserId, Purchase) ->
      {ok, Team} =
          case kvs:get(user_purchase, UserId) of
              {ok, _} = Found ->
                  ?INFO("user_purchase found"),
                  Found;
              _ ->
                  ?INFO("user_purchase not found"),
                  Head = #user_purchase{user = UserId, top = undefined},
                  %% relies on kvs:put/1 returning ok
                  {kvs:put(Head), Head}
          end,
      EntryId = Purchase#membership_purchase.id, %kvs:next_id("membership_purchase",1),
      Next =
          case Team#user_purchase.top of
              undefined ->
                  undefined;
              TopId ->
                  case kvs:get(membership_purchase, TopId) of
                      {ok, TopEntry} ->
                          % update prev entry
                          kvs:put(#membership_purchase{
                                      external_id = TopEntry#membership_purchase.external_id,
                                      user_id = TopEntry#membership_purchase.user_id,
                                      state = TopEntry#membership_purchase.state,
                                      membership_package = TopEntry#membership_purchase.membership_package,
                                      start_time = TopEntry#membership_purchase.start_time,
                                      end_time = TopEntry#membership_purchase.end_time,
                                      state_log = TopEntry#membership_purchase.state_log,
                                      info = TopEntry#membership_purchase.info,
                                      id = TopEntry#membership_purchase.id,
                                      next = TopEntry#membership_purchase.next,
                                      prev = EntryId}),
                          TopEntry#membership_purchase.id;
                      {error, notfound} ->
                          undefined
                  end
          end,
      % update team top with current
      kvs:put(#user_purchase{user = UserId, top = EntryId}),
      Entry = #membership_purchase{id = EntryId,
                                   user_id = UserId,
                                   external_id = Purchase#membership_purchase.external_id,
                                   state = Purchase#membership_purchase.state,
                                   membership_package = Purchase#membership_purchase.membership_package,
                                   start_time = Purchase#membership_purchase.start_time,
                                   end_time = Purchase#membership_purchase.end_time,
                                   state_log = Purchase#membership_purchase.state_log,
                                   info = Purchase#membership_purchase.info,
                                   next = Next,
                                   prev = undefined},
      case kvs:put(Entry) of
          ok ->
              {ok, EntryId};
          Error ->
              ?INFO("Cant write purchase"),
              {failure, Error}
      end.
  %% Grants or changes Action for Accessor on Resource.
  %% The ACL record is created if missing. When an entry for
  %% {Accessor, Resource} already exists only its action is rewritten,
  %% and the entry as it was before the change is returned. Otherwise a
  %% new entry is pushed on top of the ACL's entry list and returned.
  acl_add_entry(Resource, Accessor, Action) ->
      Acl =
          case kvs:get(acl, Resource) of
              {ok, Existing} ->
                  Existing;
              %% if acl record wasn't created already
              {error, notfound} ->
                  Created = #acl{id = Resource, resource = Resource},
                  kvs:put(Created),
                  Created
          end,
      EntryId = {Accessor, Resource},
      case kvs:get(acl_entry, EntryId) of
          %% if acl entry for Accessor and Acl is defined - just change action
          {ok, AclEntry} ->
              kvs:put(AclEntry#acl_entry{action = Action}),
              AclEntry;
          %% there is no entries for specified Acl and Accessor, we have to add it
          {error, notfound} ->
              Prev =
                  case Acl#acl.top of
                      undefined ->
                          undefined;
                      Top ->
                          case kvs:get(acl_entry, Top) of
                              {ok, TopEntry} ->
                                  % update prev entry
                                  kvs:put(TopEntry#acl_entry{next = EntryId}),
                                  TopEntry#acl_entry.id;
                              {error, notfound} ->
                                  undefined
                          end
                  end,
              %% update acl with top of acl entries list
              kvs:put(Acl#acl{top = EntryId}),
              Entry = #acl_entry{id = EntryId,
                                 entry_id = EntryId,
                                 accessor = Accessor,
                                 action = Action,
                                 next = undefined,
                                 prev = Prev},
              ok = kvs:put(Entry),
              Entry
      end.
  %% Registers a user for a tournament by storing a #play_record{} with
  %% the user's team, real name and current points/quota balances (0
  %% when a balance lookup fails). Logs and does nothing else when the
  %% user cannot be loaded.
  join_tournament(UserId, TournamentId) ->
      case kvs:get(user, UserId) of
          {ok, User} ->
              BalanceOf =
                  fun(Currency) ->
                      case accounts:balance(UserId, Currency) of
                          {ok, Amount} -> Amount;
                          {error, _} -> 0
                      end
                  end,
              Points = BalanceOf(points),
              Quota = BalanceOf(quota),
              RealName = users:user_realname(UserId),
              kvs:put(#play_record{who = UserId,
                                   tournament = TournamentId,
                                   team = User#user.team,
                                   game_id = undefined,
                                   other = now(),
                                   realname = RealName,
                                   points = Points,
                                   quota = Quota});
          _ ->
              ?INFO(" User ~p not found for joining tournament ~p", [UserId, TournamentId])
      end.
  %% Removes a user's play record for a tournament, repeating the
  %% delete until the record can no longer be read. Returns ok.
  leave_tournament(UserId, TournamentId) ->
      RecordKey = {UserId, TournamentId},
      case kvs:get(play_record, RecordKey) of
          {ok, _} ->
              kvs:delete(play_record, RecordKey),
              % due to WTF error with old records
              leave_tournament(UserId, TournamentId);
          _ ->
              ok
      end.
  %% All play records of a user (UId is a string), looked up through
  %% the "play_record_who_bin" index.
  user_tournaments(UId) ->
      Who = list_to_binary(UId),
      kvs:all_by_index(play_record, <<"play_record_who_bin">>, Who).

  %% All play records of a tournament (TId is an integer), looked up
  %% through the "play_record_tournament_bin" index.
  tournament_waiting_queue(TId) ->
      Tournament = list_to_binary(integer_to_list(TId)),
      kvs:all_by_index(play_record, <<"play_record_tournament_bin">>, Tournament).
  %% Loads a feed entry by its id ({EntryId, FeedId} elsewhere in this
  %% module). The spec's error reason is `notfound', the atom the rest
  %% of this module matches on for failed kvs:get/2 lookups.
  -spec entry_by_id(term()) -> {ok, #entry{}} | {error, notfound}.
  entry_by_id(EntryId) ->
      kvs:get(entry, EntryId).
  %% Loads a comment by its id from the `comment' table.
  %% Fixed: the lookup previously called kvs:get/1 without a table name;
  %% every other lookup in this module uses kvs:get(Table, Key).
  -spec comment_by_id({{EntryId::term(), FeedId::term()}, CommentId::term()}) ->
            {ok, #comment{}} | {error, notfound}.
  comment_by_id(CommentId) ->
      kvs:get(comment, CommentId).
  %% Returns the flattened comment list of an entry, walking from the
  %% entry's `comments_rear' along `next' links. Yields [] when the
  %% entry has no rear comment or cannot be loaded.
  -spec comments_by_entry(EId::{string(), term()}) -> [#comment{}].
  comments_by_entry({EId, FId}) ->
      case kvs:entry_by_id({EId, FId}) of
          {ok, #entry{comments_rear = Rear}} when Rear =/= undefined ->
              lists:flatten(read_comments_rev(Rear));
          _ ->
              []
      end.
  %% Purchases of a user, starting at the list top, with a page amount
  %% of 1000.
  purchases(UserId) ->
      purchases_in_basket(UserId, undefined, 1000).

  %% get_purchases_by_user/3: same as /4, starting at the list top.
  get_purchases_by_user(UserId, Count, States) ->
      get_purchases_by_user(UserId, undefined, Count, States).

  %% get_purchases_by_user/4: a page of purchases starting at Start,
  %% optionally restricted to the given list of states (`all' keeps
  %% every purchase).
  get_purchases_by_user(UserId, Start, Count, States) ->
      Page = purchases_in_basket(UserId, Start, Count),
      if
          States == all ->
              Page;
          true ->
              [Purchase || Purchase <- Page,
                           lists:member(Purchase#membership_purchase.state, States)]
      end.
  %% purchases_in_basket(UserId, StartFrom, Limit) -> [#membership_purchase{}]
  %%
  %% With StartFrom = undefined the walk starts at the top of the user's
  %% purchase list; otherwise at the purchase with id StartFrom. The
  %% starting purchase is returned first, followed by up to Limit more
  %% reached through `next' links.
  %%
  %% Fixed: a list head whose `top' is undefined (a user without any
  %% purchases yet) used to crash with case_clause; it now yields [].
  purchases_in_basket(UserId, undefined, PageAmount) ->
      case kvs:get(user_purchase, UserId) of
          {ok, #user_purchase{top = Top}} when Top =/= undefined ->
              purchases_in_basket(UserId, Top, PageAmount);
          {ok, _} ->
              [];
          {error, notfound} ->
              []
      end;
  purchases_in_basket(_UserId, StartFrom, Limit) ->
      case kvs:get(membership_purchase, StartFrom) of
          {ok, #membership_purchase{next = N} = P} ->
              [P | riak_traversal(membership_purchase, #membership_purchase.next, N, Limit)];
          _ ->
              []
      end.
  %% Transactions of a user, starting at the list top, with a page
  %% amount of 10000.
  transactions(UserId) ->
      tx_list(UserId, undefined, 10000).
  %% tx_list(UserId, StartFrom, Limit) -> [#transaction{}]
  %%
  %% With StartFrom = undefined the walk starts at the top of the user's
  %% transaction list; otherwise at the transaction with id StartFrom.
  %% The starting transaction is returned first, followed by up to Limit
  %% more reached through `next' links.
  %%
  %% Fixed: a list head whose `top' is undefined used to crash with
  %% case_clause; it now yields [].
  tx_list(UserId, undefined, PageAmount) ->
      case kvs:get(user_transaction, UserId) of
          {ok, #user_transaction{top = Top}} when Top =/= undefined ->
              tx_list(UserId, Top, PageAmount);
          {ok, _} ->
              [];
          {error, notfound} ->
              []
      end;
  tx_list(_UserId, StartFrom, Limit) ->
      case kvs:get(transaction, StartFrom) of
          {ok, #transaction{next = N} = P} ->
              [P | riak_traversal(transaction, #transaction.next, N, Limit)];
          _ ->
              []
      end.
  %% Reads comments following `prev' links.
  %% - undefined: no comments.
  %% - a list headed by a #comment{}: recurses into the head's
  %%   `comments' and then into the rest of the list (nested result).
  %% - anything else: treated as a comment id to start the walk from.
  read_comments(undefined) ->
      [];
  read_comments([#comment{comments = Nested} | Others]) ->
      [read_comments(Nested) | read_comments(Others)];
  read_comments(StartId) ->
      riak_traversal(comment, #comment.prev, StartId, all).
  %% Reads comments following `next' links.
  %% - undefined: no comments.
  %% - a list headed by a #comment{}: recurses into the head's
  %%   `comments' and then into the rest of the list (nested result).
  %% - anything else: treated as a comment id to start the walk from.
  read_comments_rev(undefined) ->
      [];
  read_comments_rev([#comment{comments = Nested} | Others]) ->
      [read_comments_rev(Nested) | read_comments_rev(Others)];
  read_comments_rev(StartId) ->
      riak_traversal(comment, #comment.next, StartId, all).
  %% riak_traversal(RecordType, LinkPos, StartKey, Count) -> [Record]
  %%
  %% Walks a linked list of records stored in table RecordType, starting
  %% at StartKey and following the tuple element at position LinkPos
  %% (e.g. #comment.next) until the link is undefined, a record is
  %% missing, or Count records were collected. A non-integer Count (such
  %% as `all') means no limit.
  %%
  %% Fixed: the lookup called the misspelled module `srore_riak', which
  %% raised undef on the first record.
  riak_traversal(_, _, undefined, _) ->
      [];
  riak_traversal(_, _, _, 0) ->
      [];
  riak_traversal(RecordType, PrevPos, Next, Count) ->
      case store_riak:get(RecordType, Next) of
          {error, notfound} ->
              [];
          {ok, R} ->
              Prev = element(PrevPos, R),
              Count1 =
                  case Count of
                      C when is_integer(C) -> C - 1;
                      _ -> Count
                  end,
              [R | riak_traversal(RecordType, PrevPos, Prev, Count1)]
      end.
  %% riak_read_acl_entries(Client, StartId, Result) -> Result ++ Entries
  %%
  %% Walks ACL entries from StartId along `prev' links, reading the
  %% "acl_entry" bucket directly through the client C, and appends them
  %% to Result in visiting order. The walk stops at an undefined link or
  %% a missing entry.
  %%
  %% Entries are collected by prepending and reversed once, instead of
  %% appending to the result on every step (quadratic in list length).
  riak_read_acl_entries(_, undefined, Result) ->
      Result;
  riak_read_acl_entries(C, Next, Result) ->
      Result ++ lists:reverse(riak_collect_acl_entries(C, Next, [])).

  %% Collects ACL entries in reverse visiting order.
  riak_collect_acl_entries(_, undefined, Acc) ->
      Acc;
  riak_collect_acl_entries(C, Next, Acc) ->
      NextStr = io_lib:format("~p", [Next]),
      case C:get(<<"acl_entry">>, erlang:list_to_binary(NextStr)) of
          {ok, RO} ->
              O = riak_object:get_value(RO),
              riak_collect_acl_entries(C, O#acl_entry.prev, [O | Acc]);
          {error, notfound} ->
              Acc
      end.
  %% Deletes every entry reachable from a feed's top (the -1 count never
  %% reaches the 0 stop condition) and then stores the feed with its top
  %% cleared. Returns the result of that final put.
  purge_feed(FeedId) ->
      {ok, Feed} = kvs:get(feed, FeedId),
      Doomed = riak_entry_traversal(Feed#feed.top, -1),
      lists:foreach(
          fun(#entry{id = Id}) -> kvs:delete(entry, Id);
             (_) -> ok
          end,
          Doomed),
      kvs:put(Feed#feed{top = undefined}).
  %% Purges the feed of every user whose email is undefined and returns
  %% the list of purge_feed/1 results.
  purge_unverified_feeds() ->
      FeedIds = [FeedId || #user{feed = FeedId, email = Email} <- kvs:all(user),
                           Email == undefined],
      lists:map(fun purge_feed/1, FeedIds).
  %% riak_entry_traversal(StartId, Count) -> [#entry{}]
  %%
  %% Walks feed entries from StartId along `prev' links until the link
  %% is undefined, an entry is missing, or Count reaches 0. With an
  %% integer Count, entries of type {_, system} or {_, system_note} are
  %% returned but do not consume the count. A non-integer Count is
  %% passed along unchanged.
  riak_entry_traversal(undefined, _) ->
      [];
  riak_entry_traversal(_, 0) ->
      [];
  riak_entry_traversal(Next, Count) ->
      case store_riak:get(entry, Next) of
          {error, notfound} ->
              [];
          {ok, R} ->
              Prev = element(#entry.prev, R),
              Remaining =
                  if
                      is_integer(Count) ->
                          case R#entry.type of
                              % temporal entries are entries too, but they shouldn't be counted
                              {_, system} -> Count;
                              {_, system_note} -> Count;
                              _ -> Count - 1
                          end;
                      true ->
                          Count
                  end,
              [R | riak_entry_traversal(Prev, Remaining)]
      end.
  %% entries_in_feed(FeedId, StartFrom, PageAmount) -> [#entry{}]
  %%
  %% With StartFrom = undefined: a page of entries beginning at the
  %% feed's top. Otherwise: a page beginning at the entry *after*
  %% {StartFrom, FeedId}, i.e. at its `prev' link. Yields [] when the
  %% feed or the starting entry cannot be loaded.
  entries_in_feed(FeedId, undefined, PageAmount) ->
      case kvs:get(feed, FeedId) of
          {error, notfound} -> [];
          {ok, Feed} -> riak_entry_traversal(Feed#feed.top, PageAmount)
      end;
  entries_in_feed(FeedId, StartFrom, PageAmount) ->
      %% construct entry unique id
      StartKey = {StartFrom, FeedId},
      case kvs:get(entry, StartKey) of
          {ok, #entry{prev = Prev}} -> riak_entry_traversal(Prev, PageAmount);
          _ -> []
      end.
  %% Returns all entries of an ACL, read directly through the Riak
  %% client: the ACL is loaded from the "acl" bucket (key is the ~p
  %% rendering of AclId) and its entry list is walked from `top'.
  %% Yields [] when the ACL does not exist.
  acl_entries(AclId) ->
      Client = riak_client(),
      [Printed] = io_lib:format("~p", [AclId]),
      AclKey = erlang:list_to_binary(Printed),
      case Client:get(<<"acl">>, AclKey) of
          {error, notfound} ->
              [];
          {ok, Object} ->
              Acl = riak_object:get_value(Object),
              riak_read_acl_entries(Client, Acl#acl.top, [])
      end.
  %% Placeholder: all arguments are ignored and the result is always
  %% the empty list.
  feed_direct_messages(_FId, _Page, _PageAmount, _CurrentUser, _CurrentFId) ->
      [].