|
@@ -1,25 +1,59 @@
|
|
-module(store_sql).
|
|
-module(store_sql).
|
|
-author('Daniil Churikov').
|
|
-author('Daniil Churikov').
|
|
--license("MIT").
|
|
|
|
|
|
+-author('Max Davidenko').
|
|
-compile(export_all).
|
|
-compile(export_all).
|
|
--include("sql.hrl").
|
|
|
|
--include("metainfo.hrl").
|
|
|
|
|
|
+-include_lib("kvs/include/sql.hrl").
|
|
|
|
+-include_lib("kvs/include/metainfo.hrl").
|
|
|
|
+
|
|
|
|
+-define(SETTINGS, #sql_settings{placeholder = fun (_, Pos, _) -> PosBin = integer_to_binary(Pos), <<"$", PosBin/binary>> end}).
|
|
|
|
+
|
|
|
|
+start() -> ok.
|
|
|
|
+stop() -> ok.
|
|
|
|
+
|
|
|
|
+version() -> {version,"KVS SQL 1.0.0"}.
|
|
|
|
+join() -> ok.
|
|
|
|
+join(_) -> ok.
|
|
|
|
+initialize() -> ok.
|
|
|
|
|
|
insert(E, Table, S) ->
|
|
insert(E, Table, S) ->
|
|
SkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
|
|
SkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
|
|
Q = prepare_insert(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
|
|
Q = prepare_insert(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
|
|
- if Q#query.values /= [] -> {ok, build(Q)};
|
|
|
|
- true -> {error, empty_insert} end.
|
|
|
|
|
|
+ if
|
|
|
|
+ Q#query.values /= [] ->
|
|
|
|
+ Query = build(Q),
|
|
|
|
+ BinFold = fun(Elem, Acc) ->
|
|
|
|
+ <<Acc/binary, Elem/binary>>
|
|
|
|
+ end,
|
|
|
|
+ NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
|
|
|
|
+ NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
|
|
|
|
+ {ok, Query#query{body = NewBody, values = NewVals}};
|
|
|
|
+ true ->
|
|
|
|
+ {error, empty_insert}
|
|
|
|
+ end.
|
|
|
|
|
|
select_pk(E, Table, S) ->
|
|
select_pk(E, Table, S) ->
|
|
SkipFun = fun(#column{key = Key}, _) -> not Key end,
|
|
SkipFun = fun(#column{key = Key}, _) -> not Key end,
|
|
Q = prepare_select(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
|
|
Q = prepare_select(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
|
|
- if Q#query.values /= [] -> {ok, build(Q)};
|
|
|
|
|
|
+ if
|
|
|
|
+ Q#query.values /= [] ->
|
|
|
|
+ Query = build(Q),
|
|
|
|
+ BinFold = fun(Elem, Acc) ->
|
|
|
|
+ <<Acc/binary, Elem/binary>>
|
|
|
|
+ end,
|
|
|
|
+ NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
|
|
|
|
+ NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
|
|
|
|
+ {ok, Query#query{body = NewBody, values = NewVals}};
|
|
true -> {error, pk_miss} end.
|
|
true -> {error, pk_miss} end.
|
|
|
|
|
|
select(E, Table, S) ->
|
|
select(E, Table, S) ->
|
|
SkipFun = fun(_, V) -> V == '$skip' end,
|
|
SkipFun = fun(_, V) -> V == '$skip' end,
|
|
- {ok, build(prepare_select(skip(SkipFun, Table#table.columns, e2l(E)), Table, S))}.
|
|
|
|
|
|
+ Query = build(prepare_select(skip(SkipFun, Table#table.columns, e2l(E)), Table, S)),
|
|
|
|
+ BinFold = fun(Elem, Acc) ->
|
|
|
|
+ <<Acc/binary, Elem/binary>>
|
|
|
|
+ end,
|
|
|
|
+ NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
|
|
|
|
+ NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
|
|
|
|
+ {ok, Query#query{body = NewBody, values = NewVals}}.
|
|
|
|
|
|
update_pk(E, Table = #table{columns = MekaoCols}, S) ->
|
|
update_pk(E, Table = #table{columns = MekaoCols}, S) ->
|
|
SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
|
|
SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
|
|
@@ -29,7 +63,15 @@ update_pk(E, Table = #table{columns = MekaoCols}, S) ->
|
|
skip(WhereSkipFun, MekaoCols, Vals),Table, S),
|
|
skip(WhereSkipFun, MekaoCols, Vals),Table, S),
|
|
if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
|
|
if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
|
|
(Q#query.body)#sql_update.where == [] -> {error, pk_miss};
|
|
(Q#query.body)#sql_update.where == [] -> {error, pk_miss};
|
|
- true -> {ok, build(Q)} end.
|
|
|
|
|
|
+ true ->
|
|
|
|
+ Query = build(Q),
|
|
|
|
+ BinFold = fun(Elem, Acc) ->
|
|
|
|
+ <<Acc/binary, Elem/binary>>
|
|
|
|
+ end,
|
|
|
|
+ NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
|
|
|
|
+ NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
|
|
|
|
+ {ok, Query#query{body = NewBody, values = NewVals}}
|
|
|
|
+ end.
|
|
|
|
|
|
update_pk_diff(E1, E2, Table = #table{columns = MekaoCols}, S) ->
|
|
update_pk_diff(E1, E2, Table = #table{columns = MekaoCols}, S) ->
|
|
Vals1 = e2l(E1),
|
|
Vals1 = e2l(E1),
|
|
@@ -43,7 +85,15 @@ update_pk_diff(E1, E2, Table = #table{columns = MekaoCols}, S) ->
|
|
skip(WhereSkipFun, MekaoCols, Vals1),Table, S),
|
|
skip(WhereSkipFun, MekaoCols, Vals1),Table, S),
|
|
if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
|
|
if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
|
|
(Q#query.body)#sql_update.where == [] -> {error, pk_miss};
|
|
(Q#query.body)#sql_update.where == [] -> {error, pk_miss};
|
|
- true -> {ok, build(Q)} end.
|
|
|
|
|
|
+ true ->
|
|
|
|
+ Query = build(Q),
|
|
|
|
+ BinFold = fun(Elem, Acc) ->
|
|
|
|
+ <<Acc/binary, Elem/binary>>
|
|
|
|
+ end,
|
|
|
|
+ NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
|
|
|
|
+ NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
|
|
|
|
+ {ok, Query#query{body = NewBody, values = NewVals}}
|
|
|
|
+ end.
|
|
|
|
|
|
update(E, Selector, Table = #table{columns = MekaoCols}, S) ->
|
|
update(E, Selector, Table = #table{columns = MekaoCols}, S) ->
|
|
SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
|
|
SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
|
|
@@ -51,22 +101,43 @@ update(E, Selector, Table = #table{columns = MekaoCols}, S) ->
|
|
Q = prepare_update(skip(SetSkipFun, MekaoCols, e2l(E)),
|
|
Q = prepare_update(skip(SetSkipFun, MekaoCols, e2l(E)),
|
|
skip(WhereSkipFun, MekaoCols, e2l(Selector)),Table, S),
|
|
skip(WhereSkipFun, MekaoCols, e2l(Selector)),Table, S),
|
|
if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
|
|
if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
|
|
- true -> {ok, build(Q)} end.
|
|
|
|
|
|
+ true ->
|
|
|
|
+ Query = build(Q),
|
|
|
|
+ BinFold = fun(Elem, Acc) ->
|
|
|
|
+ <<Acc/binary, Elem/binary>>
|
|
|
|
+ end,
|
|
|
|
+ NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
|
|
|
|
+ NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
|
|
|
|
+ {ok, Query#query{body = NewBody, values = NewVals}}
|
|
|
|
+ end.
|
|
|
|
|
|
delete_pk(E, Table, S) ->
|
|
delete_pk(E, Table, S) ->
|
|
SkipFun = fun(#column{key = Key}, _) -> not Key end,
|
|
SkipFun = fun(#column{key = Key}, _) -> not Key end,
|
|
Q = prepare_delete(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
|
|
Q = prepare_delete(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
|
|
- if Q#query.values /= [] -> {ok, build(Q)};
|
|
|
|
|
|
+ if Q#query.values /= [] ->
|
|
|
|
+ Query = build(Q),
|
|
|
|
+ BinFold = fun(Elem, Acc) ->
|
|
|
|
+ <<Acc/binary, Elem/binary>>
|
|
|
|
+ end,
|
|
|
|
+ NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
|
|
|
|
+ NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
|
|
|
|
+ {ok, Query#query{body = NewBody, values = NewVals}};
|
|
true -> {error, pk_miss} end.
|
|
true -> {error, pk_miss} end.
|
|
|
|
|
|
delete(Selector, Table, S) ->
|
|
delete(Selector, Table, S) ->
|
|
SkipFun = fun(_, V) -> V == '$skip' end,
|
|
SkipFun = fun(_, V) -> V == '$skip' end,
|
|
Q = prepare_delete(skip(SkipFun, Table#table.columns, e2l(Selector)), Table, S),
|
|
Q = prepare_delete(skip(SkipFun, Table#table.columns, e2l(Selector)), Table, S),
|
|
- {ok, build(Q)}.
|
|
|
|
|
|
+ Query = build(Q),
|
|
|
|
+ BinFold = fun(Elem, Acc) ->
|
|
|
|
+ <<Acc/binary, Elem/binary>>
|
|
|
|
+ end,
|
|
|
|
+ NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
|
|
|
|
+ NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
|
|
|
|
+ {ok, Query#query{body = NewBody, values = NewVals}}.
|
|
|
|
|
|
prepare_insert(E, Table, S) ->
|
|
prepare_insert(E, Table, S) ->
|
|
{Cols, PHs, Types, Vals} = qdata(1, e2l(E), Table#table.columns, S),
|
|
{Cols, PHs, Types, Vals} = qdata(1, e2l(E), Table#table.columns, S),
|
|
- Q = #sql_insert{table=Table#table.name,columns=intersperse(Cols, <<", ">>),
|
|
|
|
|
|
+ Q = #sql_insert{table=atom_to_binary(Table#table.name, utf8),columns=intersperse(Cols, <<", ">>),
|
|
values=intersperse(PHs, <<", ">>),returning=returning(insert, Table, S)},
|
|
values=intersperse(PHs, <<", ">>),returning=returning(insert, Table, S)},
|
|
#query{body=Q,types=Types,values=Vals,next_ph_num=length(PHs) + 1}.
|
|
#query{body=Q,types=Types,values=Vals,next_ph_num=length(PHs) + 1}.
|
|
|
|
|
|
@@ -74,7 +145,7 @@ prepare_select(E, Table, S) ->
|
|
#table{columns = MekaoCols,order_by = OrderBy} = Table,
|
|
#table{columns = MekaoCols,order_by = OrderBy} = Table,
|
|
{Where, {_, PHs, Types, Vals}} = where(qdata(1, e2l(E), MekaoCols, S), S),
|
|
{Where, {_, PHs, Types, Vals}} = where(qdata(1, e2l(E), MekaoCols, S), S),
|
|
AllCols = intersperse(MekaoCols, <<", ">>, fun(#column{name = Name}) -> Name end),
|
|
AllCols = intersperse(MekaoCols, <<", ">>, fun(#column{name = Name}) -> Name end),
|
|
- Q = #sql_select{table=Table#table.name,columns=AllCols,where=Where,order_by=order_by(OrderBy)},
|
|
|
|
|
|
+ Q = #sql_select{table=atom_to_binary(Table#table.name, utf8),columns=AllCols,where=Where,order_by=order_by(OrderBy)},
|
|
#query{body=Q,types=Types,values=Vals,next_ph_num = length(PHs) + 1}.
|
|
#query{body=Q,types=Types,values=Vals,next_ph_num = length(PHs) + 1}.
|
|
|
|
|
|
prepare_update(SetE, WhereE, Table = #table{columns = MekaoCols}, S) ->
|
|
prepare_update(SetE, WhereE, Table = #table{columns = MekaoCols}, S) ->
|
|
@@ -83,12 +154,12 @@ prepare_update(SetE, WhereE, Table = #table{columns = MekaoCols}, S) ->
|
|
{Where, {_, WherePHs, WhereTypes, WhereVals}} = where(qdata(SetPHsLen + 1, e2l(WhereE), MekaoCols, S), S),
|
|
{Where, {_, WherePHs, WhereTypes, WhereVals}} = where(qdata(SetPHsLen + 1, e2l(WhereE), MekaoCols, S), S),
|
|
WherePHsLen = length(WherePHs),
|
|
WherePHsLen = length(WherePHs),
|
|
Set = intersperse2(fun (C, PH) -> [C, <<" = ">>, PH] end,<<", ">>, SetCols, SetPHs),
|
|
Set = intersperse2(fun (C, PH) -> [C, <<" = ">>, PH] end,<<", ">>, SetCols, SetPHs),
|
|
- Q = #sql_update{table=Table#table.name,set=Set,where=Where,returning=returning(update, Table, S)},
|
|
|
|
|
|
+ Q = #sql_update{table=atom_to_binary(Table#table.name, utf8),set=Set,where=Where,returning=returning(update, Table, S)},
|
|
#query{body=Q,types=SetTypes++WhereTypes,values=SetVals++WhereVals,next_ph_num = SetPHsLen + WherePHsLen + 1}.
|
|
#query{body=Q,types=SetTypes++WhereTypes,values=SetVals++WhereVals,next_ph_num = SetPHsLen + WherePHsLen + 1}.
|
|
|
|
|
|
prepare_delete(E, Table, S) ->
|
|
prepare_delete(E, Table, S) ->
|
|
{Where, {_, PHs, Types, Vals}} = where(qdata(1, e2l(E), Table#table.columns, S), S),
|
|
{Where, {_, PHs, Types, Vals}} = where(qdata(1, e2l(E), Table#table.columns, S), S),
|
|
- Q = #sql_delete{table=Table#table.name,where=Where,returning=returning(delete, Table, S)},
|
|
|
|
|
|
+ Q = #sql_delete{table=atom_to_binary(Table#table.name, utf8),where=Where,returning=returning(delete, Table, S)},
|
|
#query{body=Q,types=Types,values=Vals,next_ph_num = length(PHs) + 1}.
|
|
#query{body=Q,types=Types,values=Vals,next_ph_num = length(PHs) + 1}.
|
|
|
|
|
|
build(Q = #query{body = Select}) when is_record(Select, sql_select) ->
|
|
build(Q = #query{body = Select}) when is_record(Select, sql_select) ->
|
|
@@ -148,6 +219,7 @@ op_to_bin('>=') -> <<" >= ">>;
|
|
op_to_bin('<') -> <<" < ">>;
|
|
op_to_bin('<') -> <<" < ">>;
|
|
op_to_bin('<=') -> <<" <= ">>.
|
|
op_to_bin('<=') -> <<" <= ">>.
|
|
|
|
|
|
|
|
+order_by(undefined) -> [];
|
|
order_by([]) -> [];
|
|
order_by([]) -> [];
|
|
order_by([O]) -> [order_by_1(O)];
|
|
order_by([O]) -> [order_by_1(O)];
|
|
order_by([O | OrderBy]) -> [order_by_1(O), <<", ">> | order_by(OrderBy)].
|
|
order_by([O | OrderBy]) -> [order_by_1(O), <<", ">> | order_by(OrderBy)].
|
|
@@ -188,3 +260,79 @@ intersperse([Item | Items], Sep, Fun) -> [Fun(Item), Sep | intersperse(Items, Se
|
|
intersperse2(_Fun, _Sep, [], []) -> [];
|
|
intersperse2(_Fun, _Sep, [], []) -> [];
|
|
intersperse2(Fun, _Sep, [I1], [I2]) -> [Fun(I1, I2)];
|
|
intersperse2(Fun, _Sep, [I1], [I2]) -> [Fun(I1, I2)];
|
|
intersperse2(Fun, Sep, [I1 | I1s], [I2 | I2s]) -> [Fun(I1, I2), Sep | intersperse2(Fun, Sep, I1s, I2s)].
|
|
intersperse2(Fun, Sep, [I1 | I1s], [I2 | I2s]) -> [Fun(I1, I2), Sep | intersperse2(Fun, Sep, I1s, I2s)].
|
|
|
|
+
|
|
|
|
+put(Records) when is_list(Records) -> lists:foreach(fun sql_put/1, Records);
|
|
|
|
+put(Record) -> sql_put(Record).
|
|
|
|
+
|
|
|
|
+sql_put(Record) ->
|
|
|
|
+ Table = kvs:table(element(1, Record)),
|
|
|
|
+ io:format("Trying to put ~p ~n", [Record]),
|
|
|
|
+ {ok, Query} = insert(Record, Table, ?SETTINGS),
|
|
|
|
+ io:format("Query [~p], vals [~p] ~n", [Query#query.body, Query#query.values]),
|
|
|
|
+ PutRes = extendedQuery(Query#query.body, Query#query.values),
|
|
|
|
+ case PutRes of
|
|
|
|
+ {ok, _} -> ok;
|
|
|
|
+ _ -> PutRes end.
|
|
|
|
+
|
|
|
|
+get(Table, Key) ->
|
|
|
|
+ io:format("Trying to get from ~p by ~p ~n", [Table, Key]),
|
|
|
|
+ TableSpec = kvs:table(Table),
|
|
|
|
+ SkipVals = lists:duplicate(length(TableSpec#table.fields) - 1, '$skip'),
|
|
|
|
+ RecList = [Table | [Key | SkipVals]],
|
|
|
|
+ Record = list_to_tuple(RecList),
|
|
|
|
+ sql_get(TableSpec, Record).
|
|
|
|
+
|
|
|
|
+sql_get(TableSpec, Record) ->
|
|
|
|
+ io:format("Record is: ~p ~n", [Record]),
|
|
|
|
+ {ok, Query} = select(Record, TableSpec, ?SETTINGS),
|
|
|
|
+ io:format("Query is: ~p ~n", [Query#query.body]),
|
|
|
|
+ QueryRes = extendedQuery(Query#query.body, Query#query.values),
|
|
|
|
+ case QueryRes of
|
|
|
|
+ {ok, []} -> {error, not_found};
|
|
|
|
+ {ok, Rows} -> [Row | _] = Rows, {ok, proplistToRecord(element(1, Record), Row)};
|
|
|
|
+ {ok, _Count, Rows} -> [Row | _] = Rows,{ok, proplistToRecord(element(1, Record), Row)};
|
|
|
|
+ _ -> QueryRes end.
|
|
|
|
+
|
|
|
|
+proplistToRecord(Tag, Proplist) ->
|
|
|
|
+ ValsExt = fun(Elem) ->
|
|
|
|
+ {_, Val} = Elem,
|
|
|
|
+ Val
|
|
|
|
+ end,
|
|
|
|
+ Vals = lists:map(ValsExt, Proplist),
|
|
|
|
+ RecList = [Tag | Vals],
|
|
|
|
+ list_to_tuple(RecList).
|
|
|
|
+
|
|
|
|
+convertBindingVal(Val) when is_integer(Val) -> Val;
|
|
|
|
+convertBindingVal(Val) when is_float(Val) -> Val;
|
|
|
|
+convertBindingVal(Val) -> term_to_binary(Val).
|
|
|
|
+
|
|
|
|
+mapResult([], _Rows) -> [];
|
|
|
|
+mapResult(_Columns, []) -> [];
|
|
|
|
+mapResult(Columns, Rows) ->
|
|
|
|
+ ColumnsExtractor = fun(Elem) -> Elem#column.name end,
|
|
|
|
+ ColsNames = lists:map(ColumnsExtractor, Columns),
|
|
|
|
+ ResultsMapper = fun(Row) -> lists:zip(ColsNames, tuple_to_list(Row)) end,
|
|
|
|
+ lists:map(ResultsMapper, Rows).
|
|
|
|
+
|
|
|
|
+extendedQuery(SQL, Params) ->
|
|
|
|
+ QueryRes = case Params of
|
|
|
|
+ [] -> pgsql:equery(connection(), SQL);
|
|
|
|
+ _ -> pgsql:equery(connection(), SQL, Params) end,
|
|
|
|
+ _Reply = case QueryRes of
|
|
|
|
+ {ok, Columns, Rows} -> {ok, mapResult(Columns, Rows)};
|
|
|
|
+ {ok, Count, Columns, Rows} -> {ok, Count, mapResult(Columns, Rows)};
|
|
|
|
+ _ -> QueryRes end.
|
|
|
|
+
|
|
|
|
+connection() ->
|
|
|
|
+ case wf:cache(pgsql_conn) of
|
|
|
|
+ unefined -> Host = kvs:config(pgsql,host,"localhost"),
|
|
|
|
+ Port = kvs:config(pgsql,port, 5432),
|
|
|
|
+ User = kvs:config(pgsql,user, "user"),
|
|
|
|
+ Pass = kvs:config(pgsql,pass, "pass"),
|
|
|
|
+ Db = kvs:config(pgsql,db, "test"),
|
|
|
|
+ Timeout = kvs:config(pgsql,timeout,5000),
|
|
|
|
+ {ok, Conn} = pgsql:connect(Host, User, Pass,
|
|
|
|
+ [{database, Db}, {port, Port}, {timeout, Timeout}]),
|
|
|
|
+ wf:cache(pgsql_conn,Conn),
|
|
|
|
+ Conn;
|
|
|
|
+ Connection -> Connection end.
|