store_mongo.erl 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. -module(store_mongo).
  2. -author("Vitaly Shutko").
  3. -author("Oleg Zinchenko").
  4. -copyright('Synrc Research Center s.r.o.').
  5. -include("metainfo.hrl").
  6. -compile(export_all).
  7. -define(POOL_NAME,mongo_pool).
  %% @doc Backend start hook. Nothing is started here; the function simply
  %% returns `ok'.
  start() ->
      ok.
  %% @doc Backend stop hook. Nothing is torn down here; the function simply
  %% returns `stopped'.
  stop() ->
      stopped.
  %% @doc Returns the identification tuple of this backend.
  version() ->
      {version, "KVS MONGO"}.
  %% @doc Joins the store.
  %%
  %% With an empty node list the Mongo connection is started via connect/0 and
  %% `kvs:init/2' is called for every module returned by `kvs:modules/0'; the
  %% result is `ok'. Any other argument yields `{error, not_implemented}'.
  join([]) ->
      connect(),
      _ = [kvs:init(?MODULE, Mod) || Mod <- kvs:modules()],
      ok;
  join(_Node) ->
      {error, not_implemented}.
  %% @doc Starts the Mongo connection described by config/0.
  %%
  %% When no `pool' section is configured a single worker is started
  %% (connect/1); otherwise a poolboy pool is started (connect/2).
  connect() ->
      case config() of
          {Conn, none} -> connect(Conn);
          {Conn, Pool} -> connect(Pool, Conn)
      end.
  %% @doc Starts a single `mc_worker' registered locally as ?POOL_NAME under
  %% `kvs_sup', passing `Conn' as the worker's start argument.
  %%
  %% After the child has started, `no_pool' is set to `true' in the process
  %% dictionary of the calling process (and only of that process).
  %% Returns `ok'; a failed child start crashes with `badmatch'.
  connect(Conn) ->
      StartMFA = {gen_server, start_link, [{local, ?POOL_NAME}, mc_worker, Conn, []]},
      ChildSpec = {?POOL_NAME, StartMFA, permanent, 5000, worker, [kvs_sup]},
      {ok, _Child} = supervisor:start_child(kvs_sup, ChildSpec),
      erlang:put(no_pool, true),
      ok.
  %% @doc Starts a poolboy pool of `mc_worker' processes registered locally as
  %% ?POOL_NAME under `kvs_sup'.
  %%
  %% `Pool' is the list of extra poolboy options appended after the fixed
  %% `name' and `worker_module' entries; `Conn' is handed to poolboy as the
  %% worker arguments. Returns `ok'; a failed child start crashes with
  %% `badmatch'.
  connect(Pool, Conn) ->
      PoolArgs = [{name, {local, ?POOL_NAME}}, {worker_module, mc_worker}] ++ Pool,
      ChildSpec = poolboy:child_spec(?POOL_NAME, PoolArgs, Conn),
      {ok, _Child} = supervisor:start_child(kvs_sup, ChildSpec),
      ok.
  %% @doc Reads the `mongo' settings via `kvs:config(kvs, mongo)'.
  %%
  %% Returns `{Conn, Pool}' where `Conn' is the value of the mandatory
  %% `connection' entry (a missing entry crashes with `badmatch') and `Pool'
  %% is the value of the optional `pool' entry, or `none' when it is absent.
  config() ->
      Settings = kvs:config(kvs, mongo),
      {connection, Conn} = proplists:lookup(connection, Settings),
      Pool =
          case proplists:lookup(pool, Settings) of
              {pool, PoolOpts} -> PoolOpts;
              _ -> none
          end,
      {Conn, Pool}.
  %% @doc Runs `Fun' with a Mongo worker and returns the result of `Fun'.
  %%
  %% In single-worker mode `Fun' receives the registered name ?POOL_NAME
  %% directly; in pool mode it is run through `poolboy:transaction/2', which
  %% checks a worker out for the duration of the call.
  transaction(Fun) ->
      case single_worker_mode() of
          true -> Fun(?POOL_NAME);
          false -> poolboy:transaction(?POOL_NAME, Fun)
      end.

  %% Tells whether the store runs without a poolboy pool.
  %%
  %% connect/1 records `no_pool' in the process dictionary, but that flag is
  %% only visible to the process that called connect/1. Every other process
  %% would see `undefined' and wrongly go through poolboy, so when the flag is
  %% absent the decision is taken from config/0 - the same source connect/0
  %% uses to choose between a single worker and a pool.
  single_worker_mode() ->
      case erlang:get(no_pool) of
          true ->
              true;
          _ ->
              {_Conn, Pool} = config(),
              Pool =:= none
      end.
  %% @doc Lists the collections of the database as `[{table, Name}]', with
  %% `Name' converted to a string.
  %%
  %% The reply of the `listCollections' command is taken apart positionally;
  %% a reply of a different shape crashes with `badmatch'. Entries of the
  %% collection list that are not 4-tuples are skipped.
  dir() ->
      ListCollections = fun(Worker) -> mongo:command(Worker, {<<"listCollections">>, 1}) end,
      Reply = transaction(ListCollections),
      {_, {_, {_, _, _, _, _, Collections}}} = Reply,
      [{table, binary_to_list(Name)} || {_, Name, _, _} <- Collections].
  %% @doc Drops every collection reported by dir/0 and returns the list of
  %% `drop' command replies, one per collection.
  %%
  %% dir/0 opens its own transaction, so it is evaluated before the worker for
  %% the drops is acquired. Calling it from inside the transaction fun would
  %% request a second worker while one is already held, which blocks when the
  %% pool has a single worker and no overflow.
  destroy() ->
      Tables = dir(),
      transaction(fun(Worker) ->
          [mongo:command(Worker, {<<"drop">>, to_binary(Name)}) || {_, Name} <- Tables]
      end).
  %% @doc Returns a fresh id obtained from `mongo_id_server:object_id/0'.
  %% Both arguments (table and increment) are ignored.
  next_id(_Tab, _Incr) ->
      mongo_id_server:object_id().
  %% @doc Converts an Erlang term into the representation handed to the Mongo
  %% driver.
  %%
  %% <ul>
  %% <li>a 1-tuple holding a 12-byte binary is returned unchanged;</li>
  %% <li>a 2-tuple `{Key, Value}' keeps `Key' and converts `Value';</li>
  %% <li>anything else is converted by to_binary/2 with `ForceList = false'.</li>
  %% </ul>
  to_binary({<<_:12/binary>>} = ObjectId) ->
      ObjectId;
  to_binary({Key, Value}) ->
      {Key, to_binary(Value)};
  to_binary(Value) ->
      to_binary(Value, false).

  %% @doc Converts `V'; `ForceList' only matters for terms that have no
  %% dedicated rule.
  %%
  %% <ul>
  %% <li>integers are returned unchanged;</li>
  %% <li>printable Unicode strings become UTF-8 binaries, other lists are
  %%     converted element by element with to_binary/1;</li>
  %% <li>atoms become UTF-8 binaries. `atom_to_binary/2' is used instead of
  %%     `list_to_binary(atom_to_list(V))', which raises `badarg' for atoms
  %%     with code points above 255 and emits Latin-1 rather than UTF-8 bytes
  %%     for code points 128..255;</li>
  %% <li>pids become `{pid, Binary}';</li>
  %% <li>any other term is printed with `~p' into a binary when `ForceList' is
  %%     `true', and returned unchanged otherwise.</li>
  %% </ul>
  to_binary(V, _ForceList) when is_integer(V) ->
      V;
  to_binary(V, _ForceList) when is_list(V) ->
      case io_lib:printable_unicode_list(V) of
          true -> unicode:characters_to_binary(V, utf8, utf8);
          false -> lists:map(fun to_binary/1, V)
      end;
  to_binary(V, _ForceList) when is_atom(V) ->
      atom_to_binary(V, utf8);
  to_binary(V, _ForceList) when is_pid(V) ->
      {pid, list_to_binary(pid_to_list(V))};
  to_binary(V, true) ->
      %% io_lib:format/2 returns an iolist; convert it as a whole instead of
      %% relying on it being a one-element list.
      iolist_to_binary(io_lib:format("~p", [V]));
  to_binary(V, _ForceList) ->
      V.
  %% @doc Builds the flat document tuple for a record of table `Tab'.
  %%
  %% The tuple starts with `'_id'' and the converted `Key' (make_id/1),
  %% followed by the name/value pairs produced by list_to_doc/2 from the
  %% table's field names - with the first field dropped - and `Values'.
  make_document(Tab, Key, Values) ->
      Table = kvs:table(Tab),
      ValueFields = tl(Table#table.fields),
      Id = make_id(Key),
      Pairs = list_to_doc(ValueFields, Values),
      list_to_tuple(['_id', Id | Pairs]).
  %% @doc Zips field names with values into a flat `[Name, Value, ...]' list.
  %%
  %% Fields whose value is `undefined' are left out. The value of `feed_id' is
  %% converted with make_id/1, every other value with make_field/1. Lists of
  %% different length crash with `function_clause'.
  list_to_doc([], []) ->
      [];
  list_to_doc([_Field | Fields], [undefined | Values]) ->
      list_to_doc(Fields, Values);
  list_to_doc([feed_id | Fields], [Value | Values]) ->
      [feed_id, make_id(Value) | list_to_doc(Fields, Values)];
  list_to_doc([Field | Fields], [Value | Values]) ->
      [Field, make_field(Value) | list_to_doc(Fields, Values)].
  %% @doc Converts a record key into the value used for `_id'.
  %%
  %% A 1-tuple holding a 12-byte binary is returned unchanged; every other term
  %% goes through to_binary/2 with `ForceList = true'.
  make_id({<<_:12/binary>>} = ObjectId) ->
      ObjectId;
  make_id(Term) ->
      to_binary(Term, true).
  %% @doc Converts a record field value into the value stored in the document.
  %%
  %% <ul>
  %% <li>`{geo_point, Coords}' with zero or two coordinates becomes a
  %%     `Point' tuple with the coordinate order reversed;</li>
  %% <li>`{geo_polygon, Coords}' becomes a `Polygon' tuple with every
  %%     coordinate pair reversed;</li>
  %% <li>`true' and `false' are converted with to_binary/1, other atoms become
  %%     `{atom, Binary}';</li>
  %% <li>pids become `{pid, Binary}';</li>
  %% <li>printable strings are converted with to_binary/1; any other list is
  %%     converted element by element. The element order is preserved - the
  %%     previous fold prepended to its accumulator and so stored such lists
  %%     reversed;</li>
  %% <li>everything else is converted with to_binary/1.</li>
  %% </ul>
  make_field({geo_point, []}) ->
      {<<"type">>, <<"Point">>, <<"coordinates">>, []};
  make_field({geo_point, [_, _] = Coords}) ->
      {<<"type">>, <<"Point">>, <<"coordinates">>, lists:reverse(Coords)};
  make_field({geo_polygon, Coords}) when is_list(Coords) ->
      {<<"type">>, <<"Polygon">>, <<"coordinates">>, [lists:reverse(Coord) || Coord <- Coords]};
  make_field(V) when is_boolean(V) ->
      to_binary(V);
  make_field(V) when is_atom(V) ->
      {atom, atom_to_binary(V, utf8)};
  make_field(V) when is_pid(V) ->
      {pid, list_to_binary(pid_to_list(V))};
  make_field(V) when is_list(V) ->
      case io_lib:printable_unicode_list(V) of
          true -> to_binary(V);
          false -> lists:map(fun make_field/1, V)
      end;
  make_field(V) ->
      to_binary(V).
  %% @doc Rebuilds a record tuple of table `Tab' from a document tuple.
  %%
  %% The document is turned into a property list by doc_to_proplist/1 and each
  %% field name of the table is looked up by its UTF-8 binary name. Fields
  %% missing from the document come out as `undefined'
  %% (`proplists:get_value/2').
  make_record(Tab, Doc) ->
      Table = kvs:table(Tab),
      Props = doc_to_proplist(tuple_to_list(Doc)),
      Values = [proplists:get_value(atom_to_binary(Field, utf8), Props)
                || Field <- Table#table.fields],
      list_to_tuple([Tab | Values]).
  %% @doc Converts a stored document value back into an Erlang term.
  %%
  %% <ul>
  %% <li>`Point' / `Polygon' tuples become `{geo_point, _}' /
  %%     `{geo_polygon, _}' with the coordinate order reversed again;</li>
  %% <li>the binaries `true' and `false' become the corresponding atoms;</li>
  %% <li>`{atom, Bin}' and `{pid, Bin}' pairs become an atom and a pid;</li>
  %% <li>any other binary becomes a character list;</li>
  %% <li>everything else is returned unchanged.</li>
  %% </ul>
  %%
  %% NOTE(review): `binary_to_atom/2' creates atoms from stored data; if the
  %% database can hold untrusted content this can exhaust the atom table.
  decode_value({<<"type">>, <<"Point">>, <<"coordinates">>, Position}) ->
      {geo_point, lists:reverse(Position)};
  decode_value({<<"type">>, <<"Polygon">>, <<"coordinates">>, Positions}) ->
      {geo_polygon, [lists:reverse(Position) || Position <- Positions]};
  decode_value(<<"true">>) ->
      true;
  decode_value(<<"false">>) ->
      false;
  decode_value({<<"atom">>, Name}) ->
      binary_to_atom(Name, utf8);
  decode_value({<<"pid">>, Text}) ->
      list_to_pid(binary_to_list(Text));
  decode_value(Text) when is_binary(Text) ->
      unicode:characters_to_list(Text, utf8);
  decode_value(Other) ->
      Other.
  %% @doc Converts a stored `_id' / `feed_id' value back into the Erlang key.
  %%
  %% <ul>
  %% <li>a 1-tuple holding a 12-byte binary is returned unchanged;</li>
  %% <li>an integer is returned unchanged - make_id/1 stores integer keys as
  %%     integers, so they must not be treated as text;</li>
  %% <li>a binary is read as the text of one Erlang term (keys written with
  %%     `~p' by to_binary/2). If the text is not a single literal term, it is
  %%     returned as a character list, which is what a plain string key was
  %%     before it was stored.</li>
  %% </ul>
  %%
  %% The text is only parsed, never evaluated: a stored value that happens to
  %% look like a function call is returned as a string instead of being run.
  decode_id({<<_:12/binary>>} = ObjectId) ->
      ObjectId;
  decode_id(Id) when is_integer(Id) ->
      Id;
  decode_id(Text) when is_binary(Text) ->
      case parse_id_term(binary_to_list(Text)) of
          {ok, Term} -> Term;
          error -> unicode:characters_to_list(Text, utf8)
      end.

  %% Parses `Chars' as exactly one literal Erlang term.
  %% Returns `{ok, Term}', or `error' when scanning or parsing fails.
  parse_id_term(Chars) ->
      case erl_scan:string(Chars ++ ".") of
          {ok, Tokens, _EndLocation} ->
              case erl_parse:parse_term(Tokens) of
                  {ok, Term} -> {ok, Term};
                  {error, _Info} -> error
              end;
          {error, _Info, _Location} ->
              error
      end.
  %% @doc Turns a flat `[Name, Value, ...]' document list into a property
  %% list of `{Name, DecodedValue}' pairs.
  %%
  %% `_id' is renamed to `id' and, like `feed_id', decoded with decode_id/1;
  %% all other values are decoded with decode_value/1. Pairs are prepended
  %% while walking the document, so the result is in reverse document order.
  doc_to_proplist(Doc) ->
      doc_to_proplist(Doc, []).

  doc_to_proplist([], Props) ->
      Props;
  doc_to_proplist([Name, Value | Rest], Props) ->
      doc_to_proplist(Rest, [decode_pair(Name, Value) | Props]).

  %% Decodes one name/value pair of a document.
  decode_pair(<<"_id">>, Value) ->
      {<<"id">>, decode_id(Value)};
  decode_pair(<<"feed_id">>, Value) ->
      {<<"feed_id">>, decode_id(Value)};
  decode_pair(Name, Value) ->
      {Name, decode_value(Value)}.
  %% @doc Fetches the record of table `Tab' whose `_id' is `make_id(Key)'.
  %%
  %% Returns `{ok, Record}' when `mongo:find_one' yields `{Doc}' and
  %% `{error, not_found}' when it yields `{}'.
  get(Tab, Key) ->
      FindOne = fun(Worker) ->
          mongo:find_one(Worker, to_binary(Tab), {'_id', make_id(Key)})
      end,
      case transaction(FindOne) of
          {} -> {error, not_found};
          {Doc} -> {ok, make_record(Tab, Doc)}
      end.
  %% @doc Stores one record or a list of records via mongo_put/1.
  %%
  %% Returns `ok' once every record was written. An `error'-class exception
  %% raised while writing is returned as `{error, Reason}'; records written
  %% before the failure stay written.
  put(Records) when is_list(Records) ->
      try
          lists:foreach(fun mongo_put/1, Records)
      catch
          error:Reason -> {error, Reason}
      end;
  put(Record) ->
      put([Record]).
  %% @doc Writes a single record.
  %%
  %% The first tuple element names the table (and collection), the second is
  %% the key, the rest are the field values. The document is sent as a `$set'
  %% update selected by `_id'; the trailing `true' is passed to `mongo:update'
  %% as its last argument (presumably the upsert flag - confirm against the
  %% driver).
  mongo_put(Record) ->
      Tab = element(1, Record),
      Key = element(2, Record),
      [_Tab, _Key | Values] = tuple_to_list(Record),
      Selector = {'_id', make_id(Key)},
      Update = fun(Worker) ->
          Changes = {<<"$set">>, make_document(Tab, Key, Values)},
          mongo:update(Worker, to_binary(Tab), Selector, Changes, true)
      end,
      transaction(Update).
  %% @doc Deletes the document of table `Tab' stored under `Key'; always
  %% returns `ok'.
  %%
  %% The key is converted with make_id/1, the same conversion get/2 and
  %% mongo_put/1 apply to the `_id' they read and write. Selecting on the raw
  %% key only matches keys that make_id/1 leaves unchanged, so documents with
  %% string, atom or tuple keys were never removed.
  delete(Tab, Key) ->
      Selector = {'_id', make_id(Key)},
      transaction(fun(Worker) -> mongo:delete_one(Worker, to_binary(Tab), Selector) end),
      ok.
  %% @doc Runs a `find' with selector `Sel' on the collection of table `Tab'
  %% and returns the matching documents as records (make_record/2).
  %%
  %% The cursor is closed in an `after' section so it is also closed when
  %% reading from it raises. An empty result needs no special case: the
  %% comprehension over `[]' already yields `[]'.
  mongo_find(Tab, Sel) ->
      Cursor = transaction(fun(Worker) -> mongo:find(Worker, to_binary(Tab), Sel) end),
      Docs =
          try
              mc_cursor:rest(Cursor)
          after
              mc_cursor:close(Cursor)
          end,
      [make_record(Tab, Doc) || Doc <- Docs].
  %% @doc Returns every record of table `Tab' by running mongo_find/2 with the
  %% empty selector `{}'.
  all(Tab) ->
      mongo_find(Tab, {}).
  %% @doc Returns the records of table `Tab' whose field `Key' equals `Value'.
  %% Both the field name and the value are converted with to_binary/1 before
  %% they are used as the selector.
  index(Tab, Key, Value) ->
      Selector = {to_binary(Key), to_binary(Value)},
      mongo_find(Tab, Selector).
  %% @doc Issues the `create' command for the collection named after `Tab' and
  %% returns the driver's reply. `_Options' is ignored.
  create_table(Tab, _Options) ->
      Create = fun(Worker) -> mongo:command(Worker, {<<"create">>, to_binary(Tab)}) end,
      transaction(Create).
  %% @doc Calls `mongo:ensure_index' for field `Key' of table `Tab' with the
  %% index specification `{key, {Field, 1}}' and returns the driver's reply.
  %% The field name is converted with to_binary/2 (`ForceList = true').
  add_table_index(Tab, Key) ->
      IndexSpec = {key, {to_binary(Key, true), 1}},
      transaction(fun(Worker) -> mongo:ensure_index(Worker, to_binary(Tab), IndexSpec) end).
  %% @doc Returns the number reported by the `count' command for the
  %% collection of table `Tab'.
  %%
  %% The reply is expected to have the shape `{_, {_, N}}'; any other shape
  %% crashes with `badmatch'.
  count(Tab) ->
      Reply = transaction(fun(Worker) -> mongo:command(Worker, {<<"count">>, to_binary(Tab)}) end),
      {_, {_, Total}} = Reply,
      Total.