kvs.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. -module(kvs).
  2. -author('Maxim Sokhatsky <maxim@synrc.com>').
  3. -include_lib("kvs/include/users.hrl").
  4. -include_lib("kvs/include/translations.hrl").
  5. -include_lib("kvs/include/groups.hrl").
  6. -include_lib("kvs/include/feeds.hrl").
  7. -include_lib("kvs/include/acls.hrl").
  8. -include_lib("kvs/include/meetings.hrl").
  9. -include_lib("kvs/include/invites.hrl").
  10. -include_lib("kvs/include/config.hrl").
  11. -include_lib("kvs/include/accounts.hrl").
  12. -include_lib("kvs/include/membership.hrl").
  13. -include_lib("kvs/include/payments.hrl").
  14. -include_lib("kvs/include/products.hrl").
  15. -include_lib("stdlib/include/qlc.hrl").
  16. -include_lib("kvs/include/feed_state.hrl").
  17. -compile(export_all).
  18. start() -> DBA = ?DBA, DBA:start().
  19. dir() -> DBA = ?DBA, DBA:dir().
  20. stop() -> DBA = ?DBA, DBA:stop().
  21. initialize() -> DBA = ?DBA, DBA:initialize().
  22. delete() -> DBA = ?DBA, DBA:delete().
  23. init_indexes() -> DBA = ?DBA, DBA:init_indexes().
  24. wait_for_tables() -> DBA=?DBA, DBA:wait_for_tables().
  25. add(Record) when is_tuple(Record) ->
  26. Id = element(#iterator.id, Record),
  27. case kvs:get(element(1,Record), Id) of {ok, _} -> error_logger:info_msg("Entry exist: ~p", [Id]),{error, exist};
  28. {error, not_found} ->
  29. Type = element(1, Record),
  30. CName = element(#iterator.container, Record),
  31. Cid = case element(#iterator.feed_id, Record) of undefined -> ?FEED(Type); Fid -> Fid end,
  32. Container = case kvs:get(CName, Cid) of {ok,C} -> C;
  33. {error, not_found} when Cid /= undefined ->
  34. NC = setelement(#container.id, erlang:list_to_tuple([CName|proplists:get_value(CName, ?CONTAINERS)]), Cid),
  35. NC1 = setelement(#container.entries_count, NC, 0),
  36. kvs:put(NC1),NC1;
  37. _ -> error end,
  38. if Container == error -> {error, no_container}; true ->
  39. Next = undefined,
  40. Prev = case element(#container.top, Container) of undefined -> undefined;
  41. Tid -> case kvs:get(Type, Tid) of {error, not_found} -> undefined;
  42. {ok, Top} -> NewTop = setelement(#iterator.next, Top, Id), kvs:put(NewTop), element(#iterator.id, NewTop) end end,
  43. C1 = setelement(#container.top, Container, Id),
  44. C2 = setelement(#container.entries_count, C1, element(#container.entries_count, Container)+1),
  45. kvs:put(C2),
  46. R = setelement(#iterator.feeds, Record, [ case F1 of {FN, Fd} -> {FN, Fd} ; _-> {F1, kvs_feed:create()} end || F1 <- element(#iterator.feeds, Record)]),
  47. R1 = setelement(#iterator.next, R, Next),
  48. R2 = setelement(#iterator.prev, R1, Prev),
  49. R3 = setelement(#iterator.feed_id, R2, element(#container.id, Container)),
  50. kvs:put(R3),
  51. error_logger:info_msg("[kvs] PUT: ~p", [element(#container.id,R3)]),
  52. {ok, R3} end end.
  53. remove(RecordName, RecordId) ->
  54. case kvs:get(RecordName, RecordId) of {error, not_found} -> error_logger:info_msg("not found");
  55. {ok, E} ->
  56. Id = element(#iterator.id, E),
  57. CName = element(#iterator.container, E),
  58. Cid = element(#iterator.feed_id, E),
  59. {ok, Container} = kvs:get(CName, Cid),
  60. Top = element(#container.top, Container),
  61. Next = element(#iterator.next, E),
  62. Prev = element(#iterator.prev, E),
  63. case kvs:get(RecordName, Next) of {ok, NE} -> NewNext = setelement(#iterator.prev, NE, Prev), kvs:put(NewNext); _ -> ok end,
  64. case kvs:get(RecordName, Prev) of {ok, PE} -> NewPrev = setelement(#iterator.next, PE, Next), kvs:put(NewPrev); _ -> ok end,
  65. C1 = case Top of Id -> setelement(#container.top, Container, Prev); _ -> Container end,
  66. C2 = setelement(#container.entries_count, C1, element(#container.entries_count, Container)-1),
  67. kvs:put(C2),
  68. error_logger:info_msg("[kvs] DELETE: ~p id: ~p", [RecordName, Id]),
  69. kvs:delete(RecordName, Id) end.
  70. remove(E) when is_tuple(E) ->
  71. Id = element(#iterator.id, E),
  72. CName = element(#iterator.container, E),
  73. Cid = element(#iterator.feed_id, E),
  74. {ok, Container} = kvs:get(CName, Cid),
  75. Top = element(#container.top, Container),
  76. Next = element(#iterator.next, E),
  77. Prev = element(#iterator.prev, E),
  78. case kvs:get(element(1,E), Next) of {ok, NE} -> NewNext = setelement(#iterator.prev, NE, Prev), kvs:put(NewNext); _ -> ok end,
  79. case kvs:get(element(1,E), Prev) of {ok, PE} -> NewPrev = setelement(#iterator.next, PE, Next), kvs:put(NewPrev); _ -> ok end,
  80. C1 = case Top of Id -> setelement(#container.top, Container, Prev); _ -> Container end,
  81. C2 = setelement(#container.entries_count, C1, element(#container.entries_count, Container)-1),
  82. kvs:put(C2),
  83. error_logger:info_msg("[kvs] DELETE: ~p", [Id]),
  84. kvs:delete(E).
  85. %purge_feed(FeedId) ->
  86. % {ok,Feed} = kvs:get(feed,FeedId),
  87. % Removal = entry_traversal(Feed#feed.top, -1),
  88. % [kvs:delete(entry,Id)||#entry{id=Id}<-Removal],
  89. % kvs:put(Feed#feed{top=undefined}).
  90. %purge_unverified_feeds() ->
  91. % [ [purge_feed(Fid)|| {_, Fid} <- Feeds ] || #user{feeds=Feeds, email=E} <- kvs:all(user), E==undefined].
  92. traversal( _,undefined,_,_) -> [];
  93. traversal(_,_,0,_) -> [];
  94. traversal(RecordType, Start, Count, Direction)->
  95. case kvs:get(RecordType, Start) of {error,_} -> [];
  96. {ok, R} -> Prev = element(Direction, R),
  97. Count1 = case Count of C when is_integer(C) -> C - 1; _-> Count end,
  98. [R | traversal(RecordType, Prev, Count1, Direction)] end.
  99. entries({ok, Container}, RecordType, Count) -> entries(Container, RecordType, Count);
  100. entries(Container, RecordType, Count) when is_tuple(Container) -> traversal(RecordType, element(#container.top, Container), Count, #iterator.prev);
  101. entries(_,_,_) -> error_logger:info_msg("=> ENTRIES ARGS NOT MATCH!"), [].
  102. entries(RecordType, Start, Count, Direction) ->
  103. E = traversal(RecordType, Start, Count, Direction),
  104. case Direction of #iterator.next -> lists:reverse(E); #iterator.prev -> E end.
  105. init_db() ->
  106. case kvs:get(user,"joe") of
  107. {error,_} ->
  108. add_seq_ids(),
  109. kvs_account:create_account(system),
  110. %add_sample_users(),
  111. % add_sample_packages(),
  112. % add_sample_payments(),
  113. add_translations();
  114. {ok,_} -> ignore end.
  115. %add_sample_packages() -> kvs_membership:add_sample_data().
  116. %add_sample_payments() ->
  117. % {ok, Pkg1} = kvs:get(membership,1),
  118. % {ok, Pkg2} = kvs:get(membership,2),
  119. % {ok, Pkg3} = kvs:get(membership,3),
  120. % {ok, Pkg4} = kvs:get(membership,4),
  121. % PList = [{"doxtop", Pkg1},{"maxim", Pkg2},{"maxim",Pkg4}, {"kate", Pkg3} ],
  122. % [ok = add_payment(U, P) || {U, P} <- PList],
  123. % ok.
  124. %add_payment(UserId, Package) ->
  125. % {ok, MPId} = kvs_payment:add_payment(#payment{user_id=UserId, membership=Package}),
  126. % kvs_payment:set_payment_state(MPId, ?MP_STATE_DONE, undefined).
  127. add_seq_ids() ->
  128. Init = fun(Key) ->
  129. case kvs:get(id_seq, Key) of
  130. {error, _} -> ok = kvs:put(#id_seq{thing = Key, id = 0});
  131. {ok, _} -> ignore
  132. end
  133. end,
  134. Init("meeting"),
  135. Init("user_transaction"),
  136. Init("user_product"),
  137. Init("user_payment"),
  138. Init("user_status"),
  139. Init("transaction"),
  140. Init("membership"),
  141. Init("payment"),
  142. Init("acl"),
  143. Init("acl_entry"),
  144. Init("feed"),
  145. Init("entry"),
  146. Init("like_entry"),
  147. Init("likes"),
  148. Init("one_like"),
  149. Init("comment").
  150. add_translations() ->
  151. lists:foreach(fun({English, Lang, Word}) ->
  152. kvs:put(#translation{english = English, lang = "en", word = English}),
  153. kvs:put(#translation{english = English, lang = Lang, word = Word}) end, ?URL_DICTIONARY).
  154. add_sample_users() ->
  155. Groups = [],
  156. UserList = [],
  157. kvs:put(Groups),
  158. {ok, Quota} = kvs:get(config,"accounts/default_quota", 300),
  159. [ begin
  160. [ kvs_group:join(Me#user.username,G#group.id) || G <- Groups ],
  161. kvs_account:create_account(Me#user.username),
  162. kvs_account:transaction(Me#user.username, quota, Quota, #tx_default_assignment{}),
  163. kvs:put(Me#user{password = kvs:sha(Me#user.password)})
  164. end || Me <- UserList ],
  165. %kvs_acl:define_access({user, "maxim"}, {feature, admin}, allow),
  166. %kvs_acl:define_access({user_type, admin}, {feature, admin}, allow),
  167. [ kvs_user:subscribe(Me#user.username, Her#user.username) || Her <- UserList, Me <- UserList, Her /= Me ],
  168. [ kvs_user:init_mq(U) || U <- UserList ],
  169. ok.
  170. version() -> DBA=?DBA, DBA:version().
  171. add_configs() ->
  172. %% smtp
  173. kvs:put(#config{key="smtp/user", value="noreply@synrc.com"}),
  174. kvs:put(#config{key="smtp/password", value="maxim@synrc.com"}),
  175. kvs:put(#config{key="smtp/host", value="mail.synrc.com"}),
  176. kvs:put(#config{key="smtp/port", value=465}),
  177. kvs:put(#config{key="smtp/with_ssl", value=true}),
  178. kvs:put(#config{key="accounts/default_quota", value=2000}),
  179. kvs:put(#config{key="accounts/quota_limit/soft", value=-30}),
  180. kvs:put(#config{key="accounts/quota_limit/hard", value=-100}),
  181. kvs:put(#config{key="purchase/notifications/email", value=["maxim@synrc.com"]}),
  182. kvs:put(#config{key="delivery/notifications/email", value=["maxim@synrc.com"]}).
  183. put(Record) ->
  184. DBA=?DBA,
  185. DBA:put(Record).
  186. put_if_none_match(Record) ->
  187. DBA=?DBA,
  188. DBA:put_if_none_match(Record).
  189. update(Record, Meta) ->
  190. DBA=?DBA,
  191. DBA:update(Record, Meta).
  192. get(RecordName, Key) ->
  193. DBA=?DBA,
  194. DBA:get(RecordName, Key).
  195. get_for_update(RecordName, Key) ->
  196. DBA=?DBA,
  197. DBA:get_for_update(RecordName, Key).
  198. get(RecordName, Key, Default) ->
  199. DBA=?DBA,
  200. case DBA:get(RecordName, Key) of
  201. {ok,{RecordName,Key,Value}} ->
  202. error_logger:info_msg("db:get config value ~p,", [{RecordName, Key, Value}]),
  203. {ok,Value};
  204. {error, _B} ->
  205. error_logger:info_msg("db:get new config value ~p,", [{RecordName, Key, Default}]),
  206. DBA:put({RecordName,Key,Default}),
  207. {ok,Default} end.
  208. delete(Keys) -> DBA=?DBA, DBA:delete(Keys).
  209. delete(Tab, Key) -> error_logger:info_msg("db:delete ~p:~p",[Tab, Key]), DBA=?DBA,DBA:delete(Tab, Key).
  210. delete_by_index(Tab, IndexId, IndexVal) -> DBA=?DBA,DBA:delete_by_index(Tab, IndexId, IndexVal).
  211. multi_select(RecordName, Keys) -> DBA=?DBA,DBA:multi_select(RecordName, Keys).
  212. select(From, PredicateFunction) -> error_logger:info_msg("db:select ~p, ~p",[From,PredicateFunction]), DBA=?DBA, DBA:select(From, PredicateFunction).
  213. count(RecordName) -> DBA=?DBA,DBA:count(RecordName).
  214. all(RecordName) -> DBA=?DBA,DBA:all(RecordName).
  215. all_by_index(RecordName, Index, IndexValue) -> DBA=?DBA,DBA:all_by_index(RecordName, Index, IndexValue).
  216. next_id(RecordName) -> DBA=?DBA,DBA:next_id(RecordName).
  217. next_id(RecordName, Incr) -> DBA=?DBA,DBA:next_id(RecordName, Incr).
  218. next_id(RecordName, Default, Incr) -> DBA=?DBA,DBA:next_id(RecordName, Default, Incr).
  219. author_comments(Who) -> DBA=?DBA,DBA:author_comments(Who).
  220. make_admin(User) ->
  221. {ok,U} = kvs:get(user, User),
  222. kvs_acl:define_access({user, U#user.id}, {feature, admin}, allow),
  223. ok.
  224. make_rich(User) ->
  225. Q = kvs:get_config("accounts/default_quota", 300),
  226. kvs_account:transaction(User, quota, Q * 100, #tx_default_assignment{}),
  227. kvs_account:transaction(User, internal, Q, #tx_default_assignment{}),
  228. kvs_account:transaction(User, currency, Q * 2, #tx_default_assignment{}).
  229. list_to_term(String) ->
  230. {ok, T, _} = erl_scan:string(String++"."),
  231. case erl_parse:parse_term(T) of
  232. {ok, Term} ->
  233. Term;
  234. {error, Error} ->
  235. Error
  236. end.
  237. save_db(Path) ->
  238. Data = lists:append([all(B) || B <- [list_to_term(B) || B <- store_riak:dir()] ]),
  239. kvs:save(Path, Data).
  240. load_db(Path) ->
  241. add_seq_ids(),
  242. AllEntries = kvs:load(Path),
  243. [{_,_,{_,Handler}}] = ets:lookup(config, "riak_client"),
  244. [case is_tuple(E) of
  245. false -> skip;
  246. true -> put(E)
  247. end || E <- AllEntries].
  248. make_paid_fake(UId) ->
  249. put(#payment{user_id=UId,info= "fake_purchase"}).
  250. save(Dir, Value) ->
  251. filelib:ensure_dir(Dir),
  252. file:write_file(Dir, term_to_binary(Value)).
  253. load(Key) ->
  254. {ok, Bin} = file:read_file(Key),
  255. binary_to_term(Bin).
  256. coalesce(undefined, B) -> B;
  257. coalesce(A, _) -> A.
  258. uuid() ->
  259. R1 = random:uniform(round(math:pow(2, 48))) - 1,
  260. R2 = random:uniform(round(math:pow(2, 12))) - 1,
  261. R3 = random:uniform(round(math:pow(2, 32))) - 1,
  262. R4 = random:uniform(round(math:pow(2, 30))) - 1,
  263. R5 = erlang:phash({node(), now()}, round(math:pow(2, 32))),
  264. UUIDBin = <<R1:48, 4:4, R2:12, 2:2, R3:32, R4: 30>>,
  265. <<TL:32, TM:16, THV:16, CSR:8, CSL:8, N:48>> = UUIDBin,
  266. lists:flatten(io_lib:format("~8.16.0b-~4.16.0b-~4.16.0b-~2.16.0b~2.16.0b-~12.16.0b-~8.16.0b",
  267. [TL, TM, THV, CSR, CSL, N, R5])).
  268. uuname() ->
  269. lists:flatten(io_lib:format("~8.16.0b",[erlang:phash2({node(), now()}, round(math:pow(2, 32)))])).
  270. sha(Raw) ->
  271. lists:flatten([io_lib:format("~2.16.0b", [N]) || <<N>> <= crypto:sha(Raw)]).
  272. sha_upper(Raw) ->
  273. SHA = sha(Raw),
  274. string:to_upper(SHA).