store_sql.erl 16 KB


  1. -module(store_sql).
  2. %%-author('Daniil Churikov').
  3. %%-author('Max Davidenko').
  4. -include_lib("kvs/include/sql.hrl").
  5. -include_lib("kvs/include/metainfo.hrl").
  6. -export([
  7. start/0,
  8. stop/0,
  9. version/0,
  10. join/0,
  11. join/1,
  12. initialize/0,
  13. insert/3,
  14. select_pk/3,
  15. select/3,
  16. update_pk/3,
  17. update_pk_diff/4,
  18. update/4,
  19. delete_pk/3,
  20. delete/3,
  21. prepare_insert/3,
  22. prepare_select/3,
  23. prepare_update/4,
  24. prepare_delete/3,
  25. build/1,
  26. e2l/1,
  27. skip/3,
  28. qdata/4,
  29. returning/3,
  30. where/2,
  31. predicate/2,
  32. op_to_bin/1,
  33. order_by/1,
  34. build_return/1,
  35. build_where/1,
  36. build_order_by/1,
  37. predicate_val/1,
  38. set_predicate_val/2,
  39. map2/3,
  40. intersperse/2,
  41. intersperse2/4,
  42. put/1,
  43. sql_put/1,
  44. get/2,
  45. sql_get/2,
  46. proplistToRecord/2,
  47. convertBindingVal/1,
  48. mapResult/2,
  49. extendedQuery/2,
  50. connection/0
  51. ]).
  52. -define(SETTINGS, #sql_settings{placeholder = fun (_, Pos, _) -> PosBin = integer_to_binary(Pos), <<"$", PosBin/binary>> end}).
  53. start() -> ok.
  54. stop() -> ok.
  55. version() -> {version,"KVS SQL 1.0.0"}.
  56. join() -> ok.
  57. join(_) -> ok.
  58. initialize() -> ok.
  59. insert(E, Table, S) ->
  60. SkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
  61. Q = prepare_insert(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
  62. if
  63. Q#query.values /= [] ->
  64. Query = build(Q),
  65. BinFold = fun(Elem, Acc) ->
  66. <<Acc/binary, Elem/binary>>
  67. end,
  68. NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
  69. NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
  70. {ok, Query#query{body = NewBody, values = NewVals}};
  71. true ->
  72. {error, empty_insert}
  73. end.
  74. select_pk(E, Table, S) ->
  75. SkipFun = fun(#column{key = Key}, _) -> not Key end,
  76. Q = prepare_select(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
  77. if
  78. Q#query.values /= [] ->
  79. Query = build(Q),
  80. BinFold = fun(Elem, Acc) ->
  81. <<Acc/binary, Elem/binary>>
  82. end,
  83. NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
  84. NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
  85. {ok, Query#query{body = NewBody, values = NewVals}};
  86. true -> {error, pk_miss} end.
  87. select(E, Table, S) ->
  88. SkipFun = fun(_, V) -> V == '$skip' end,
  89. Query = build(prepare_select(skip(SkipFun, Table#table.columns, e2l(E)), Table, S)),
  90. BinFold = fun(Elem, Acc) ->
  91. <<Acc/binary, Elem/binary>>
  92. end,
  93. NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
  94. NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
  95. {ok, Query#query{body = NewBody, values = NewVals}}.
  96. update_pk(E, Table = #table{columns = MekaoCols}, S) ->
  97. SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
  98. WhereSkipFun = fun(#column{key = Key}, _) -> not Key end,
  99. Vals = e2l(E),
  100. Q = prepare_update(skip(SetSkipFun, MekaoCols, Vals),
  101. skip(WhereSkipFun, MekaoCols, Vals),Table, S),
  102. if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
  103. (Q#query.body)#sql_update.where == [] -> {error, pk_miss};
  104. true ->
  105. Query = build(Q),
  106. BinFold = fun(Elem, Acc) ->
  107. <<Acc/binary, Elem/binary>>
  108. end,
  109. NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
  110. NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
  111. {ok, Query#query{body = NewBody, values = NewVals}}
  112. end.
  113. update_pk_diff(E1, E2, Table = #table{columns = MekaoCols}, S) ->
  114. Vals1 = e2l(E1),
  115. Vals2 = e2l(E2),
  116. DiffVals = map2(
  117. fun (V, V) -> '$skip';
  118. (_, V2) -> V2 end, Vals1, Vals2),
  119. SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
  120. WhereSkipFun = fun(#column{key = Key}, _) -> not Key end,
  121. Q = prepare_update(skip(SetSkipFun, MekaoCols, DiffVals),
  122. skip(WhereSkipFun, MekaoCols, Vals1),Table, S),
  123. if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
  124. (Q#query.body)#sql_update.where == [] -> {error, pk_miss};
  125. true ->
  126. Query = build(Q),
  127. BinFold = fun(Elem, Acc) ->
  128. <<Acc/binary, Elem/binary>>
  129. end,
  130. NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
  131. NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
  132. {ok, Query#query{body = NewBody, values = NewVals}}
  133. end.
  134. update(E, Selector, Table = #table{columns = MekaoCols}, S) ->
  135. SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
  136. WhereSkipFun = fun(_, V) -> V == '$skip' end,
  137. Q = prepare_update(skip(SetSkipFun, MekaoCols, e2l(E)),
  138. skip(WhereSkipFun, MekaoCols, e2l(Selector)),Table, S),
  139. if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
  140. true ->
  141. Query = build(Q),
  142. BinFold = fun(Elem, Acc) ->
  143. <<Acc/binary, Elem/binary>>
  144. end,
  145. NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
  146. NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
  147. {ok, Query#query{body = NewBody, values = NewVals}}
  148. end.
  149. delete_pk(E, Table, S) ->
  150. SkipFun = fun(#column{key = Key}, _) -> not Key end,
  151. Q = prepare_delete(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
  152. if Q#query.values /= [] ->
  153. Query = build(Q),
  154. BinFold = fun(Elem, Acc) ->
  155. <<Acc/binary, Elem/binary>>
  156. end,
  157. NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
  158. NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
  159. {ok, Query#query{body = NewBody, values = NewVals}};
  160. true -> {error, pk_miss} end.
  161. delete(Selector, Table, S) ->
  162. SkipFun = fun(_, V) -> V == '$skip' end,
  163. Q = prepare_delete(skip(SkipFun, Table#table.columns, e2l(Selector)), Table, S),
  164. Query = build(Q),
  165. BinFold = fun(Elem, Acc) ->
  166. <<Acc/binary, Elem/binary>>
  167. end,
  168. NewBody = lists:foldl(BinFold, <<>>, lists:flatten(Query#query.body)),
  169. NewVals = lists:map(fun convertBindingVal/1, Query#query.values),
  170. {ok, Query#query{body = NewBody, values = NewVals}}.
  171. prepare_insert(E, Table, S) ->
  172. {Cols, PHs, Types, Vals} = qdata(1, e2l(E), Table#table.columns, S),
  173. Q = #sql_insert{table=atom_to_binary(Table#table.name, utf8),columns=intersperse(Cols, <<", ">>),
  174. values=intersperse(PHs, <<", ">>),returning=returning(insert, Table, S)},
  175. #query{body=Q,types=Types,values=Vals,next_ph_num=length(PHs) + 1}.
  176. prepare_select(E, Table, S) ->
  177. #table{columns = MekaoCols,order_by = OrderBy} = Table,
  178. {Where, {_, PHs, Types, Vals}} = where(qdata(1, e2l(E), MekaoCols, S), S),
  179. AllCols = intersperse(MekaoCols, <<", ">>, fun(#column{name = Name}) -> Name end),
  180. Q = #sql_select{table=atom_to_binary(Table#table.name, utf8),columns=AllCols,where=Where,order_by=order_by(OrderBy)},
  181. #query{body=Q,types=Types,values=Vals,next_ph_num = length(PHs) + 1}.
  182. prepare_update(SetE, WhereE, Table = #table{columns = MekaoCols}, S) ->
  183. {SetCols, SetPHs, SetTypes, SetVals} = qdata(1, e2l(SetE), MekaoCols, S),
  184. SetPHsLen = length(SetPHs),
  185. {Where, {_, WherePHs, WhereTypes, WhereVals}} = where(qdata(SetPHsLen + 1, e2l(WhereE), MekaoCols, S), S),
  186. WherePHsLen = length(WherePHs),
  187. Set = intersperse2(fun (C, PH) -> [C, <<" = ">>, PH] end,<<", ">>, SetCols, SetPHs),
  188. Q = #sql_update{table=atom_to_binary(Table#table.name, utf8),set=Set,where=Where,returning=returning(update, Table, S)},
  189. #query{body=Q,types=SetTypes++WhereTypes,values=SetVals++WhereVals,next_ph_num = SetPHsLen + WherePHsLen + 1}.
  190. prepare_delete(E, Table, S) ->
  191. {Where, {_, PHs, Types, Vals}} = where(qdata(1, e2l(E), Table#table.columns, S), S),
  192. Q = #sql_delete{table=atom_to_binary(Table#table.name, utf8),where=Where,returning=returning(delete, Table, S)},
  193. #query{body=Q,types=Types,values=Vals,next_ph_num = length(PHs) + 1}.
  194. build(Q = #query{body = Select}) when is_record(Select, sql_select) ->
  195. #sql_select{columns=Columns,table=Table,where=Where,order_by=OrderBy} = Select,
  196. Q#query{body = [<<"SELECT ">>, Columns, <<" FROM ">>, Table, build_where(Where),build_order_by(OrderBy)]};
  197. build(Q = #query{body = Insert}) when is_record(Insert, sql_insert) ->
  198. #sql_insert{table=Table,columns=Columns,values=Values,returning=Return} = Insert,
  199. Q#query{body = [<<"INSERT INTO ">>, Table, <<" (">>, Columns, <<") VALUES (">>,Values, <<")">>, build_return(Return)]};
  200. build(Q = #query{body = Update}) when is_record(Update, sql_update) ->
  201. #sql_update{table=Table,set=Set,where=Where,returning=Return} = Update,
  202. Q#query{body = [<<"UPDATE ">>, Table, <<" SET ">>, Set,build_where(Where), build_return(Return)]};
  203. build(Q = #query{body = Delete}) when is_record(Delete, sql_delete) ->
  204. #sql_delete{table=Table,where=Where,returning=Return} = Delete,
  205. Q#query{body = [<<"DELETE FROM ">>, Table, build_where(Where),build_return(Return)]}.
  206. e2l(Vals) when is_list(Vals) -> Vals;
  207. e2l(E) when is_tuple(E) -> [_EntityName | Vals] = tuple_to_list(E), Vals.
  208. skip(SkipFun, Cols, Vals) -> map2(fun(C, V) -> Skip = SkipFun(C, V),if Skip -> '$skip'; true -> V end end, Cols, Vals).
  209. qdata(_, [], [], _) -> {[], [], [], []};
  210. qdata(Num, ['$skip' | Vals], [_Col | Cols], S) -> qdata(Num, Vals, Cols, S);
  211. qdata(Num, [Pred | Vals], [Col | Cols], S) ->
  212. #column{type = T, name = CName, transform = TrFun} = Col,
  213. V = predicate_val(Pred),
  214. NewV = if TrFun == undefined -> V;
  215. true -> TrFun(V) end,
  216. PH = (S#sql_settings.placeholder)(Col, Num, NewV),
  217. NewPred = set_predicate_val(Pred, NewV), {ResCols, ResPHs, ResTypes, ResVals} = qdata(Num + 1, Vals, Cols, S),
  218. {[CName | ResCols], [PH | ResPHs], [T | ResTypes], [NewPred | ResVals]}.
  219. returning(_QType, _Table, #sql_settings{returning = undefined}) -> [];
  220. returning(QType, Table, #sql_settings{returning = RetFun}) -> RetFun(QType, Table).
  221. where(QData = {[], [], [], []}, _S) -> {[], QData};
  222. where({[C], [PH], [T], [V]}, S) ->
  223. {W, {NewC, NewPH, NewT, NewV}} = predicate({C, PH, T, V}, S),
  224. {[W], {[NewC], [NewPH], [NewT], [NewV]}};
  225. where({[C | Cs], [PH | PHs], [T | Types], [V | Vals]}, S) ->
  226. {W, {NewC, NewPH, NewT, NewV}} = predicate({C, PH, T, V}, S),
  227. {Ws, {NewCs, NewPHs, NewTypes, NewVals}} = where({Cs, PHs, Types, Vals}, S),
  228. {[W, <<" AND ">> | Ws], {[NewC | NewCs], [NewPH | NewPHs], [NewT | NewTypes], [NewV | NewVals]}}.
  229. %% TODO: add NOT, IN, ANY, ALL, BETWEEN, LIKE handling
  230. predicate({C, PH, T, {'$predicate', Op, V}}, S) when Op == '='; Op == '<>' ->
  231. IsNull = (S#sql_settings.is_null)(V),
  232. if not IsNull -> {[C, op_to_bin(Op), PH], {C, PH, T, V}};
  233. Op == '=' -> {[C, <<" IS NULL">>], {C, PH, T, V}};
  234. Op == '<>' -> {[C, <<" IS NOT NULL">>], {C, PH, T, V}} end;
  235. predicate({C, PH, T, {'$predicate', OP, V}}, _S) -> {[C, op_to_bin(OP), PH], {C, PH, T, V}};
  236. predicate({C, PH, T, V}, S) -> predicate({C, PH, T, {'$predicate', '=', V}}, S).
  237. op_to_bin('=') -> <<" = ">>;
  238. op_to_bin('<>') -> <<" <> ">>;
  239. op_to_bin('>') -> <<" > ">>;
  240. op_to_bin('>=') -> <<" >= ">>;
  241. op_to_bin('<') -> <<" < ">>;
  242. op_to_bin('<=') -> <<" <= ">>.
  243. order_by(undefined) -> [];
  244. order_by([]) -> [];
  245. order_by([O]) -> [order_by_1(O)];
  246. order_by([O | OrderBy]) -> [order_by_1(O), <<", ">> | order_by(OrderBy)].
  247. order_by_1(E) when not is_tuple(E) -> order_by_1({E, {default, default}});
  248. order_by_1({Pos, Opts}) when is_integer(Pos) -> order_by_1({integer_to_list(Pos - 1), Opts});
  249. order_by_1({Expr, Opts}) when is_list(Expr); is_binary(Expr) -> [Expr, order_by_opts(Opts)].
  250. order_by_opts({Ordering, Nulls}) ->
  251. O = case Ordering of
  252. default -> <<"">>;
  253. asc -> <<" ASC">>;
  254. desc -> <<" DESC">> end,
  255. case Nulls of
  256. default -> O;
  257. nulls_first -> <<O/binary," NULLS FIRST">>;
  258. nulls_last -> <<O/binary, " NULLS LAST">> end.
  259. build_return([]) -> <<>>;
  260. build_return(Return) -> [<<" ">> | Return].
  261. build_where([]) -> <<>>;
  262. build_where(Where) -> [<<" WHERE ">> | Where].
  263. build_order_by([]) -> <<>>;
  264. build_order_by(OrderBy) -> [<<" ORDER BY ">>, OrderBy].
  265. predicate_val({'$predicate', _, V}) -> V;
  266. predicate_val(V) -> V.
  267. set_predicate_val({'$predicate', Op, _}, NewV) -> {'$predicate', Op, NewV};
  268. set_predicate_val(_, NewV) -> NewV.
  269. map2(_Fun, [], []) -> [];
  270. map2(Fun, [V1 | L1], [V2 | L2]) -> [Fun(V1, V2) | map2(Fun, L1, L2)].
  271. intersperse(List, Sep) -> intersperse(List, Sep, fun (X) -> X end).
  272. intersperse([], _, _) -> [];
  273. intersperse([Item], _, Fun) -> [Fun(Item)];
  274. intersperse([Item | Items], Sep, Fun) -> [Fun(Item), Sep | intersperse(Items, Sep, Fun)].
  275. intersperse2(_Fun, _Sep, [], []) -> [];
  276. intersperse2(Fun, _Sep, [I1], [I2]) -> [Fun(I1, I2)];
  277. intersperse2(Fun, Sep, [I1 | I1s], [I2 | I2s]) -> [Fun(I1, I2), Sep | intersperse2(Fun, Sep, I1s, I2s)].
  278. put(Records) when is_list(Records) -> lists:foreach(fun sql_put/1, Records);
  279. put(Record) -> sql_put(Record).
  280. sql_put(Record) ->
  281. Table = kvs:table(element(1, Record)),
  282. io:format("Trying to put ~p ~n", [Record]),
  283. {ok, Query} = insert(Record, Table, ?SETTINGS),
  284. io:format("Query [~p], vals [~p] ~n", [Query#query.body, Query#query.values]),
  285. PutRes = extendedQuery(Query#query.body, Query#query.values),
  286. case PutRes of
  287. {ok, _} -> ok;
  288. _ -> PutRes end.
  289. get(Table, Key) ->
  290. io:format("Trying to get from ~p by ~p ~n", [Table, Key]),
  291. TableSpec = kvs:table(Table),
  292. SkipVals = lists:duplicate(length(TableSpec#table.fields) - 1, '$skip'),
  293. RecList = [Table | [Key | SkipVals]],
  294. Record = list_to_tuple(RecList),
  295. sql_get(TableSpec, Record).
  296. sql_get(TableSpec, Record) ->
  297. io:format("Record is: ~p ~n", [Record]),
  298. {ok, Query} = select(Record, TableSpec, ?SETTINGS),
  299. io:format("Query is: ~p ~n", [Query#query.body]),
  300. QueryRes = extendedQuery(Query#query.body, Query#query.values),
  301. case QueryRes of
  302. {ok, []} -> {error, not_found};
  303. {ok, Rows} -> [Row | _] = Rows, {ok, proplistToRecord(element(1, Record), Row)};
  304. {ok, _Count, Rows} -> [Row | _] = Rows,{ok, proplistToRecord(element(1, Record), Row)};
  305. _ -> QueryRes end.
  306. proplistToRecord(Tag, Proplist) ->
  307. ValsExt = fun(Elem) ->
  308. {_, Val} = Elem,
  309. Val
  310. end,
  311. Vals = lists:map(ValsExt, Proplist),
  312. RecList = [Tag | Vals],
  313. list_to_tuple(RecList).
  314. convertBindingVal(Val) when is_integer(Val) -> Val;
  315. convertBindingVal(Val) when is_float(Val) -> Val;
  316. convertBindingVal(Val) -> term_to_binary(Val).
  317. mapResult([], _Rows) -> [];
  318. mapResult(_Columns, []) -> [];
  319. mapResult(Columns, Rows) ->
  320. ColumnsExtractor = fun(Elem) -> Elem#column.name end,
  321. ColsNames = lists:map(ColumnsExtractor, Columns),
  322. ResultsMapper = fun(Row) -> lists:zip(ColsNames, tuple_to_list(Row)) end,
  323. lists:map(ResultsMapper, Rows).
  324. extendedQuery(SQL, Params) ->
  325. QueryRes = case Params of
  326. [] -> pgsql:equery(connection(), SQL);
  327. _ -> pgsql:equery(connection(), SQL, Params) end,
  328. _Reply = case QueryRes of
  329. {ok, Columns, Rows} -> {ok, mapResult(Columns, Rows)};
  330. {ok, Count, Columns, Rows} -> {ok, Count, mapResult(Columns, Rows)};
  331. _ -> QueryRes end.
  332. connection() ->
  333. case wf:cache(pgsql_conn) of
  334. unefined -> Host = kvs:config(pgsql,host,"localhost"),
  335. Port = kvs:config(pgsql,port, 5432),
  336. User = kvs:config(pgsql,user, "user"),
  337. Pass = kvs:config(pgsql,pass, "pass"),
  338. Db = kvs:config(pgsql,db, "test"),
  339. Timeout = kvs:config(pgsql,timeout,5000),
  340. {ok, Conn} = pgsql:connect(Host, User, Pass,
  341. [{database, Db}, {port, Port}, {timeout, Timeout}]),
  342. wf:cache(pgsql_conn,Conn),
  343. Conn;
  344. Connection -> Connection end.