%% store_redis.erl (3.0 KB)

  %% Redis-backed storage module for KVS, talking to Redis through the eredis client.
  -module(store_redis).
  -author('Andrey Martemyanov').
  -copyright('Synrc Research Center s.r.o.').
  -include("config.hrl").
  -include("kvs.hrl").
  -include("metainfo.hrl").
  %% Explicit export list instead of -compile(export_all). It names every function
  %% visible in this file, so the set of callable functions is unchanged.
  %% Connection / lifecycle.
  -export([start/0, stop/0, c/0, destroy/0, version/0, dir/0,
           join/0, join/1, change_storage/2, initialize/0]).
  %% Redis helpers.
  -export([b2i/1, redis_table/1, redis_key/2, redis_keys/1, redis_put/1,
           redis_get/1, redis_get/2, redis_transaction/1]).
  %% Storage operations.
  -export([index/3, get/2, put/1, delete/2, count/1, all/1, next_id/2,
           create_table/2, add_table_index/2]).
  %% @doc Open a new eredis connection and store its pid in the process
  %% dictionary under `eredis_pid', discarding any value stored there before.
  %% Crashes with badmatch unless eredis:start_link/0 returns {ok, Pid}.
  start() ->
      erase(eredis_pid),
      {ok, Client} = eredis:start_link(),
      put(eredis_pid, Client),
      ok.
  %% @doc Stop the eredis connection remembered in the process dictionary and
  %% forget it. Always returns `ok'.
  %%
  %% When nothing is stored under `eredis_pid' (stop/0 called twice, or before
  %% start/0), erase/1 returns `undefined'; that case is a no-op rather than
  %% passing `undefined' to eredis:stop/1.
  stop() ->
      case erase(eredis_pid) of
          Client when is_pid(Client) ->
              eredis:stop(Client),
              ok;
          undefined ->
              ok
      end.
  %% @doc Return the eredis connection pid for the calling process.
  %% Reuses the pid stored under `eredis_pid' while that process is alive;
  %% otherwise calls start/0 and returns the freshly stored pid.
  c() ->
      Stored = get(eredis_pid),
      case is_pid(Stored) andalso is_process_alive(Stored) of
          true ->
              Stored;
          false ->
              start(),
              get(eredis_pid)
      end.
  %% @doc No-op in this backend; always returns `ok'.
  destroy() ->
      ok.
  %% @doc Return the backend identification tuple.
  version() ->
      {version, "KVS REDIS"}.
  %% @doc Return one `{table, T}' tuple for every element returned by
  %% kvs:modules/0.
  dir() ->
      lists:map(fun(T) -> {table, T} end, kvs:modules()).
  %% @doc Run initialize/0 and return `ok'.
  join() ->
      join(node()).

  %% @doc Same as join/0; the node argument is ignored.
  join(_Node) ->
      initialize(),
      ok.
  %% @doc No-op in this backend: both arguments are ignored and `ok' is returned.
  change_storage(_Table, _Type) ->
      ok.
  %% @doc No-op initialisation hook; always returns `ok'.
  initialize() ->
      ok.
  %% @doc Convert a binary holding the text form of an integer (e.g. <<"42">>)
  %% into that integer. Raises badarg when the binary is not an integer.
  b2i(B) ->
      binary_to_integer(B).
  %% @doc Return the record name (an atom) as a binary; this is the table
  %% prefix used when building Redis keys.
  redis_table(RecordName) ->
      atom_to_binary(RecordName, latin1).
  %% @doc Build the Redis key for a record: the table name, a `:' separator,
  %% then the key term serialised with term_to_binary/1.
  redis_key(RecordName, Key) ->
      Table = redis_table(RecordName),
      Encoded = term_to_binary(Key),
      <<Table/binary, $:, Encoded/binary>>.
  %% @doc List every Redis key matching the pattern `Table:*' for the given
  %% record name. Any reply other than `{ok, List}' (including errors) is
  %% reported as an empty list.
  redis_keys(RecordName) ->
      Pattern = <<(redis_table(RecordName))/binary, ":*">>,
      case eredis:q(c(), ["keys", Pattern]) of
          {ok, Found} when is_list(Found) ->
              Found;
          _Other ->
              []
      end.
  %% @doc Store one record with a Redis SET and return eredis:q/2's reply.
  %%
  %% An #id_seq{} record with an integer id is stored as the bare integer
  %% under its `thing' key. Any other record is stored as term_to_binary/1 of
  %% the whole tuple, keyed by its first two elements (record name and key).
  redis_put(#id_seq{thing = Thing, id = Incr}) when is_integer(Incr) ->
      eredis:q(c(), ["SET", redis_key(id_seq, Thing), Incr]);
  redis_put(Record) ->
      Table = element(1, Record),
      Id = element(2, Record),
      RedisKey = redis_key(Table, Id),
      Payload = term_to_binary(Record),
      eredis:q(c(), ["SET", RedisKey, Payload]).
  %% @doc Fetch a value with a Redis GET and decode it.
  %%
  %% Returns:
  %%   {error, not_found}  - the reply was `{ok, undefined}';
  %%   transaction         - the reply was `{ok, <<"QUEUED">>}';
  %%   {ok, Decoded}       - Decoded is Fun(Value) when Fun is a function,
  %%                         otherwise binary_to_term(Value);
  %%   {error, Reply}      - any other reply, wrapped as-is.
  redis_get(Key, Fun) ->
      case eredis:q(c(), ["GET", Key]) of
          {ok, undefined} ->
              {error, not_found};
          {ok, <<"QUEUED">>} ->
              transaction;
          {ok, Raw} ->
              Decoded =
                  case is_function(Fun) of
                      true -> Fun(Raw);
                      false -> binary_to_term(Raw)
                  end,
              {ok, Decoded};
          Other ->
              {error, Other}
      end.

  %% @doc Same as redis_get/2 with no decoder: the value is decoded with
  %% binary_to_term/1.
  redis_get(Key) ->
      redis_get(Key, undefined).
  %% @doc Run Fun between MULTI and EXEC and return the reply list from EXEC.
  %% Fun's own return value is ignored.
  %%
  %% Crashes with badmatch when MULTI does not answer `{ok, <<"OK">>}' or EXEC
  %% does not answer `{ok, List}'.
  %%
  %% If Fun raises, DISCARD is sent before the exception is re-raised with its
  %% original class, reason and stacktrace. Without it the connection would
  %% stay inside the open MULTI.
  redis_transaction(Fun) ->
      {ok, <<"OK">>} = eredis:q(c(), ["MULTI"]),
      try
          Fun()
      catch
          Class:Reason:Stacktrace ->
              %% Best effort: the reply to DISCARD is not inspected, the
              %% original failure is what the caller needs to see.
              eredis:q(c(), ["DISCARD"]),
              erlang:raise(Class, Reason, Stacktrace)
      end,
      {ok, Replies} = eredis:q(c(), ["EXEC"]),
      Replies.
  %% @doc Index lookup is not supported by this backend; always returns
  %% `not_implemented' regardless of the arguments.
  index(_RecordName, _Key, _Value) ->
      not_implemented.
  %% @doc Read one record by record name and key via redis_get.
  %%
  %% `id_seq' entries are stored as plain integers, so the reply is rebuilt
  %% into an #id_seq{} record with b2i/1. Every other record name is decoded
  %% by redis_get/1.
  get(id_seq, Key) ->
      ToRecord = fun(Raw) -> #id_seq{thing = Key, id = b2i(Raw)} end,
      redis_get(redis_key(id_seq, Key), ToRecord);
  get(RecordName, Key) ->
      RedisKey = redis_key(RecordName, Key),
      redis_get(RedisKey).
  %% @doc Store a single record, or a list of records inside one
  %% redis_transaction/1.
  %%
  %% For a list the result is whatever redis_transaction/1 returns; for a
  %% single record it is the reply of redis_put/1.
  put(Records) when is_list(Records) ->
      StoreAll = fun() -> lists:foreach(fun put/1, Records) end,
      redis_transaction(StoreAll);
  put(Record) ->
      redis_put(Record).
  %% @doc Delete the entry for RecordName/Key with a Redis DEL.
  %% Returns `ok' only when the reply is `{ok, <<"1">>}'; every other reply
  %% is returned as `{error, Reply}'.
  delete(RecordName, Key) ->
      Reply = eredis:q(c(), ["DEL", redis_key(RecordName, Key)]),
      case Reply of
          {ok, <<"1">>} ->
              ok;
          _ ->
              {error, Reply}
      end.
  %% @doc Number of keys redis_keys/1 reports for the given record name.
  count(RecordName) ->
      Keys = redis_keys(RecordName),
      length(Keys).
  %% @doc Return every stored record of the given record name.
  %%
  %% Keys come from redis_keys/1 and the values are fetched with one GET per
  %% key inside a single redis_transaction/1. `id_seq' entries are rebuilt
  %% into #id_seq{} records: the `thing' is decoded from the part of the key
  %% after the `id_seq:' prefix and the id from the integer text value. All
  %% other record names are decoded with binary_to_term/1.
  %%
  %% `undefined' replies are skipped instead of being decoded. redis_get/2
  %% treats `undefined' as "key not found"; presumably the same value appears
  %% in the EXEC reply list when a key was deleted after the key listing --
  %% confirm against the eredis reply format.
  all(RecordName) ->
      Keys = redis_keys(RecordName),
      Values = redis_transaction(fun() -> [redis_get(Key) || Key <- Keys] end),
      case RecordName of
          id_seq ->
              %% Length of the `id_seq:' prefix, derived rather than hard-coded.
              PrefixLen = byte_size(redis_table(id_seq)) + 1,
              [#id_seq{thing = binary_to_term(
                                   binary_part(K, PrefixLen, byte_size(K) - PrefixLen)),
                       id = b2i(V)}
               || {K, V} <- lists:zip(Keys, Values), V =/= undefined];
          _ ->
              [binary_to_term(V) || V <- Values, V =/= undefined]
      end.
  %% @doc Increment the id_seq counter for RecordName by Incr with a Redis
  %% INCRBY and return the reply converted to an integer.
  %% Crashes with badmatch when the reply is not `{ok, Value}'.
  next_id(RecordName, Incr) ->
      SeqKey = redis_key(id_seq, RecordName),
      {ok, Raw} = eredis:q(c(), ["INCRBY", SeqKey, Incr]),
      b2i(Raw).
  %% @doc No-op in this backend: both arguments are ignored and `ok' is returned.
  create_table(_Name, _Options) ->
      ok.
  %% @doc Secondary indexes are not supported by this backend; always returns
  %% `not_implemented'.
  add_table_index(_Record, _Field) ->
      not_implemented.