%% store_redis.erl
  1. -module(store_redis).
  2. %%-author('Andrey Martemyanov').
  3. %%-copyright('Synrc Research Center s.r.o.').
  4. -include_lib("kvs/include/config.hrl").
  5. -include_lib("kvs/include/kvs.hrl").
  6. -include_lib("kvs/include/metainfo.hrl").
  7. -export([
  8. start/0,
  9. stop/0,
  10. c/0,
  11. destroy/0,
  12. version/0,
  13. dir/0,
  14. join/1,
  15. change_storage/2,
  16. initialize/0,
  17. b2i/1,
  18. redis_table/1,
  19. redis_key/2,
  20. redis_keys/1,
  21. redis_put/1,
  22. redis_get/2,
  23. redis_get/1,
  24. redis_transaction/1,
  25. index/3,
  26. get/2,
  27. put/1,
  28. delete/2,
  29. count/1,
  30. all/1,
  31. next_id/2,
  32. create_table/2,
  33. add_table_index/2
  34. ]).
  35. start() -> erase(eredis_pid), {ok,C}=eredis:start_link(), erlang:put(eredis_pid,C), ok.
  36. stop() -> P=erase(eredis_pid), eredis:stop(P), ok.
  37. c() -> case get(eredis_pid) of
  38. P when is_pid(P) ->
  39. case is_process_alive(P) of true -> P; _ -> start(), erlang:get(eredis_pid) end;
  40. _ -> start(), get(eredis_pid) end.
  41. destroy() -> ok.
  42. version() -> {version,"KVS REDIS"}.
  43. dir() -> [{table,T}||T<-kvs:modules()].
  44. join(_Node) -> initialize(), ok.
  45. change_storage(_Table,_Type) -> ok.
  46. initialize() -> ok.
  47. b2i(B) -> list_to_integer(binary_to_list(B)).
  48. redis_table(RecordName) ->
  49. list_to_binary(atom_to_list(RecordName)).
  50. redis_key(RecordName,Key) ->
  51. <<(redis_table(RecordName))/binary,$:,(term_to_binary(Key))/binary>>.
  52. redis_keys(RecordName) ->
  53. case eredis:q(c(), ["keys", <<(redis_table(RecordName))/binary,$:,$*>> ]) of
  54. {ok,KeyList} when is_list(KeyList) -> KeyList;
  55. _ -> [] end.
  56. redis_put(#id_seq{thing=Thing,id=Incr}) when is_integer(Incr)->
  57. eredis:q(c(), ["SET", redis_key(id_seq,Thing), Incr]);
  58. redis_put(Record) ->
  59. Key = redis_key(element(1,Record),element(2,Record)),
  60. Value = term_to_binary(Record),
  61. eredis:q(c(), ["SET", Key, Value]).
  62. redis_get(Key, Fun) ->
  63. case eredis:q(c(), ["GET", Key]) of
  64. {ok, undefined} -> {error,not_found};
  65. {ok, <<"QUEUED">>} -> transaction;
  66. {ok, Value} ->
  67. if is_function(Fun) -> {ok,Fun(Value)};
  68. true -> {ok,binary_to_term(Value)} end;
  69. E -> {error, E} end.
  70. redis_get(Key) -> redis_get(Key, undefined).
  71. redis_transaction(Fun) ->
  72. {ok, <<"OK">>} = eredis:q(c(), ["MULTI"]),
  73. Fun(),
  74. {ok,List} = eredis:q(c(), ["EXEC"]),
  75. List.
  76. index(_RecordName,_Key,_Value) -> not_implemented.
  77. get(id_seq,Key) ->
  78. redis_get(redis_key(id_seq,Key), fun(Value) ->
  79. #id_seq{thing=Key,id=b2i(Value)} end);
  80. get(RecordName,Key) -> redis_get(redis_key(RecordName,Key)).
  81. put(Records) when is_list(Records) ->
  82. redis_transaction(fun() -> lists:foreach(fun put/1, Records) end);
  83. put(Record) -> redis_put(Record).
  84. delete(RecordName,Key) ->
  85. case eredis:q(c(), ["DEL", redis_key(RecordName,Key)]) of
  86. {ok,<<"1">>} -> ok;
  87. E -> {error, E} end.
  88. count(RecordName) -> length(redis_keys(RecordName)).
  89. all(RecordName) ->
  90. Keys = redis_keys(RecordName),
  91. List = redis_transaction(fun() -> [redis_get(Key) || Key <- Keys] end),
  92. case RecordName of
  93. id_seq ->
  94. lists:zipwith(fun(K,R) ->
  95. #id_seq{thing=binary_to_term(binary_part(K,7,size(K)-7)),id=b2i(R)}
  96. end, Keys, List);
  97. _ -> [ binary_to_term(R) || R <- List ] end.
  98. next_id(RecordName,Incr) ->
  99. Key = redis_key(id_seq,RecordName),
  100. {ok, Value} = eredis:q(c(), ["INCRBY", Key, Incr]),
  101. b2i(Value).
  102. create_table(_Name,_Options) -> ok.
  103. add_table_index(_Record,_Field) -> not_implemented.