123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- -module(store_mongo).
- -author("Vitaly Shutko").
- -author("Oleg Zinchenko").
- -copyright('Synrc Research Center s.r.o.').
- -include("metainfo.hrl").
- -compile(export_all).
- -define(POOL_NAME,mongo_pool).
- start() -> ok.
- stop() -> stopped.
- version() -> {version,"KVS MONGO"}.
- join([]) -> connect(),[kvs:init(?MODULE,M) || M <- kvs:modules()],ok;
- join(_Node) -> {error,not_implemented}.
- connect() ->
- {Conn,Pool} = config(),
- case Pool of none -> connect(Conn); _ -> connect(Pool,Conn) end.
- connect(Conn) ->
- Spec = {?POOL_NAME,{gen_server,start_link,[{local,?POOL_NAME},mc_worker,Conn,[]]},
- permanent,5000,worker,[kvs_sup]},
- {ok,_} = supervisor:start_child(kvs_sup,Spec),put(no_pool,true),ok.
- connect(Pool,Conn) ->
- Spec = poolboy:child_spec(?POOL_NAME,[{name,{local,?POOL_NAME}},{worker_module,mc_worker}]++Pool,Conn),
- {ok,_} = supervisor:start_child(kvs_sup,Spec),ok.
- config() ->
- Config = kvs:config(kvs,mongo),
- {connection,Conn} = proplists:lookup(connection,Config),
- P = proplists:lookup(pool,Config),
- Pool = case P of {pool,Pl} -> Pl; _ -> none end,
- {Conn,Pool}.
- transaction(Fun) ->
- case get(no_pool) of true -> Fun(?POOL_NAME); _ -> poolboy:transaction(?POOL_NAME,Fun) end.
- dir() ->
- {_,{_,{_,_,_,_,_,Colls}}} = transaction(fun (W) -> mongo:command(W,{<<"listCollections">>,1}) end),
- [{table,binary_to_list(C)} || {_,C,_,_} <- Colls].
- destroy() -> transaction(fun (W) -> [mongo:command(W,{<<"drop">>,to_binary(T)}) || {_,T} <- dir()] end).
- next_id(_Tab,_Incr) -> mongo_id_server:object_id().
- to_binary({<<ObjectId:12/binary>>}) -> {ObjectId};
- to_binary({Key,Value}) -> {Key,to_binary(Value)};
- to_binary(Value) -> to_binary(Value,false).
- to_binary(V,ForceList) ->
- if is_integer(V) -> V;
- is_list(V) -> case io_lib:printable_unicode_list(V) of
- false -> lists:map(fun (X) -> to_binary(X) end, V);
- true -> unicode:characters_to_binary(V,utf8,utf8) end;
- is_atom(V) -> list_to_binary(atom_to_list(V));
- is_pid(V) -> {pid,list_to_binary(pid_to_list(V))};
- true -> case ForceList of true -> [P] = io_lib:format("~p",[V]),list_to_binary(P); _ -> V end
- end.
- make_document(Tab,Key,Values) ->
- Table = kvs:table(Tab),
- list_to_tuple(['_id',make_id(Key)|list_to_doc(tl(Table#table.fields),Values)]).
- list_to_doc([],[]) -> [];
- list_to_doc([F|Fields],[V|Values]) ->
- case V of
- undefined -> list_to_doc(Fields,Values);
- _ ->
- case F of
- feed_id -> [F,make_id(V)|list_to_doc(Fields,Values)];
- _ -> [F,make_field(V)|list_to_doc(Fields,Values)]
- end
- end.
- make_id({<<ObjectId:12/binary>>}) -> {ObjectId};
- make_id(Term) -> to_binary(Term, true).
- make_field({geo_point, Coords}) when length(Coords) == 2; length(Coords) == 0 ->
- {<<"type">>, <<"Point">>, <<"coordinates">>, lists:reverse(Coords)};
- make_field({geo_polygon, Coords}) when is_list(Coords) ->
- {<<"type">>, <<"Polygon">>, <<"coordinates">>, [lists:reverse(Coord) || Coord <- Coords]};
- make_field(V) ->
- if is_atom(V) -> case V of
- true -> to_binary(V);
- false -> to_binary(V);
- _ -> {atom,atom_to_binary(V,utf8)} end;
- is_pid(V) -> {pid,list_to_binary(pid_to_list(V))};
- is_list(V) -> case io_lib:printable_unicode_list(V) of
- false -> lists:foldl(fun (X, Acc) -> [make_field(X)|Acc] end, [], V);
- true -> to_binary(V) end;
- true -> to_binary(V) end.
- make_record(Tab,Doc) ->
- Table = kvs:table(Tab),
- DocPropList = doc_to_proplist(tuple_to_list(Doc)),
- list_to_tuple([Tab|[proplists:get_value(atom_to_binary(F,utf8),DocPropList) || F <- Table#table.fields]]).
- decode_value({<<"type">>, <<"Point">>, <<"coordinates">>, Coords}) -> {geo_point, lists:reverse(Coords)};
- decode_value({<<"type">>, <<"Polygon">>, <<"coordinates">>, Coords}) -> {geo_polygon, [lists:reverse(Coord) || Coord <- Coords]};
- decode_value(<<"true">>) -> true;
- decode_value(<<"false">>) -> false;
- decode_value({<<"atom">>,Atom}) -> binary_to_atom(Atom,utf8);
- decode_value({<<"pid">>,Pid}) -> list_to_pid(binary_to_list(Pid));
- decode_value(V) when is_binary(V) -> unicode:characters_to_list(V,utf8);
- decode_value(V) -> V.
- decode_id({<<ObjectId:12/binary>>}) -> {ObjectId};
- decode_id(List) ->
- {ok,Tokens,_EndLine} = erl_scan:string(lists:append(binary_to_list(List), ".")),
- {ok,AbsForm} = erl_parse:parse_exprs(Tokens),
- {value,Value,_Bs} = erl_eval:exprs(AbsForm, erl_eval:new_bindings()),
- Value.
- doc_to_proplist(Doc) -> doc_to_proplist(Doc,[]).
- doc_to_proplist([],Acc) -> Acc;
- doc_to_proplist([<<"_id">>,V|Doc],Acc) -> doc_to_proplist(Doc,[{<<"id">>,decode_id(V)}|Acc]);
- doc_to_proplist([<<"feed_id">>,V|Doc],Acc) -> doc_to_proplist(Doc,[{<<"feed_id">>,decode_id(V)}|Acc]);
- doc_to_proplist([F,V|Doc],Acc) -> doc_to_proplist(Doc,[{F,decode_value(V)}|Acc]).
- get(Tab,Key) ->
- Result = transaction(fun (W) -> mongo:find_one(W,to_binary(Tab),{'_id',make_id(Key)}) end),
- case Result of {} -> {error,not_found}; {Doc} -> {ok, make_record(Tab,Doc)} end.
- put(Records) when is_list(Records) ->
- try lists:foreach(fun mongo_put/1,Records) catch error:Reason -> {error,Reason} end;
- put(Record) -> put([Record]).
- mongo_put(Record) ->
- Tab = element(1,Record),
- Key = element(2,Record),
- [_,_|Values] = tuple_to_list(Record),
- Sel = {'_id', make_id(Key)},
- transaction(fun (W) -> mongo:update(W,to_binary(Tab),Sel,{<<"$set">>, make_document(Tab,Key,Values)},true) end).
- delete(Tab,Key) ->
- transaction(fun (W) -> mongo:delete_one(W,to_binary(Tab),{'_id',Key}) end),ok.
- mongo_find(Tab,Sel) ->
- Cursor = transaction(fun (W) -> mongo:find(W,to_binary(Tab),Sel) end),
- Result = mc_cursor:rest(Cursor),
- mc_cursor:close(Cursor),
- case Result of [] -> []; _ -> [make_record(Tab,Doc) || Doc <- Result] end.
- all(Tab) -> mongo_find(Tab,{}).
- index(Tab,Key,Value) -> mongo_find(Tab,{to_binary(Key),to_binary(Value)}).
- create_table(Tab,_Options) -> transaction(fun (W) -> mongo:command(W,{<<"create">>,to_binary(Tab)}) end).
- add_table_index(Tab,Key) ->
- transaction(fun (W) -> mongo:ensure_index(W,to_binary(Tab),{key,{to_binary(Key,true),1}}) end).
- count(Tab) -> {_,{_,N}} = transaction(fun (W) -> mongo:command(W,{<<"count">>,to_binary(Tab)}) end),N.
|