Browse Source

Merge branch 'master' of https://github.com/synrc/kvs

Maxim Sokhatsky 10 years ago
parent
commit
4446f92843
7 changed files with 285 additions and 51 deletions
  1. 4 2
      include/metainfo.hrl
  2. 45 0
      include/sql.hrl
  3. 1 1
      samples/rels/files/sys.config
  4. 6 2
      src/kvs.erl
  5. 4 0
      src/kvs_feed.erl
  6. 36 46
      src/store_riak.erl
  7. 189 0
      src/store_sql.erl

+ 4 - 2
include/metainfo.hrl

@@ -1,7 +1,9 @@
 -ifndef(METAINFO_HRL).
 -define(METAINFO_HRL, true).
 
--record(schema,{name,tables=[]}).
--record(table,{name,container,fields=[],keys=[],copy_type=disc_copies}).
+-record(schema, {name,tables=[]}).
+-record(table,  {name,container=feed,fields=[],keys=[],copy_type=disc_copies,columns,order_by}).
+-record(column, {name,type,key=false,ro=false,transform}).
+-record(query,  {body,types=[],values=[],next_ph_num = 1}).
 
 -endif.

+ 45 - 0
include/sql.hrl

@@ -0,0 +1,45 @@
+-include("metainfo.hrl").
+
+% MEKAO SQL
+
+-record(sql_select,   {table,columns,where,order_by}).
+-record(sql_insert,   {table,columns,values,returning}).
+-record(sql_update,   {table,set,where,returning}).
+-record(sql_delete,   {table,where,returning}).
+-record(sql_settings, {placeholder,returning,is_null = fun(X) -> X == undefined end}).
+
+-spec insert(entity(), table(), s()) -> {ok, b_query()} | {error, empty_insert}.
+-spec select_pk(selector(), table(), s()) -> {ok, b_query()} | {error, pk_miss}.
+-spec select(selector(), table(), s()) -> {ok, b_query()}.
+-spec update_pk(selector(), table(), s()) -> {ok, b_query()} | {error, pk_miss} | {error, empty_update}.
+-spec update_pk_diff( Old :: entity(), New :: entity(), table(), s()) -> {ok, b_query()} | {error, pk_miss} | {error, empty_update}.
+-spec update(entity(), selector(), table(), s()) -> {ok, b_query()} | {error, empty_update}.
+-spec delete_pk(selector(), table(), s()) -> {ok, b_query()} | {error, pk_miss}.
+-spec delete(selector(), table(), s()) -> {ok, b_query()}.
+-spec prepare_insert(entity(), table(), s()) -> p_query().
+-spec prepare_select(selector(), table(), s()) -> p_query().
+-spec prepare_delete(selector(), table(), s()) -> p_query().
+-spec prepare_update(entity(), selector(), table(), s()) -> p_query().
+-spec returning(insert | update | delete, table(), s()) -> iolist().
+-spec build(p_query()) -> b_query().
+
+-type b_query() :: 'query'(iolist()).
+-type table()   :: #table{}.
+-type column()  :: #column{}.
+-type s()       :: #sql_settings{}.
+-type entity()      :: tuple() | list().
+-type selector()    :: tuple() | list(predicate(term())).
+-type predicate(Value) :: Value | { '$predicate', '=' | '<>' | '>' | '>=' | '<' | '<=', Value}.
+-type 'query'(Body) :: #query{body :: Body}.
+-type p_query() :: 'query'( #sql_insert{}
+                          | #sql_select{}
+                          | #sql_update{}
+                          | #sql_delete{}
+                          ).
+
+-export_type([
+    table/0, column/0, s/0,
+    p_query/0, b_query/0,
+    predicate/1
+]).
+

+ 1 - 1
samples/rels/files/sys.config

@@ -2,7 +2,7 @@
  {kvs,
      [{pass_init_db,true},
       {nodes,[]},
-      {dba, store_riak},
+      {dba, store_mnesia},
       {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription]}]},
  {sasl, [
          {sasl_error_logger, {file, "log/sasl-error.log"}},

+ 6 - 2
src/kvs.erl

@@ -76,7 +76,8 @@ add(Record) when is_tuple(Record) ->
                             list_to_tuple([CName|proplists:get_value(CName, kvs:containers())]), Cid),
                     NC1 = setelement(#container.entries_count, NC, 0),
 
-                    kvs:put(NC1),NC1;
+                    kvs:put(NC1),
+                    NC1;
 
                 _ -> error end,
 
@@ -196,12 +197,15 @@ traversal(RecordType2, Start, Count, Direction)->
     {ok, R} ->  Prev = element(Direction, R),
                 Count1 = case Count of C when is_integer(C) -> C - 1; _-> Count end,
                 [R | traversal(RecordType2, Prev, Count1, Direction)];
-    Error -> [] end.
+    Error -> 
+     io:format("Error: ~p~n",[Error]),
+      [] end.
 
 entries(Name) -> Table = kvs:table(Name), entries(kvs:get(Table#table.container,Name), Name, undefined).
 entries(Name, Count) -> Table = kvs:table(Name), entries(kvs:get(Table#table.container,Name), Name, Count).
 entries({ok, Container}, RecordType, Count) -> entries(Container, RecordType, Count);
 entries(Container, RecordType, Count) when is_tuple(Container) ->
+    io:format("Container: ~p~n",[Container]),
     traversal(RecordType, element(#container.top, Container), Count, #iterator.prev).
 
 entries(RecordType, Start, Count, Direction) ->

+ 4 - 0
src/kvs_feed.erl

@@ -2,11 +2,15 @@
 -copyright('Synrc Research Center, s.r.o.').
 -compile(export_all).
 -include("config.hrl").
+-include("entry.hrl").
+-include("comment.hrl").
 -include("feed.hrl").
 -include("metainfo.hrl").
 -include("state.hrl").
 
 metainfo() -> 
     #schema{name=kvs,tables=[
+        #table{name=entry,container=true,fields=record_info(fields,entry)},
+        #table{name=comment,container=true,fields=record_info(fields,comment)},
         #table{name=feed,container=true,fields=record_info(fields,feed)}
     ]}.

+ 36 - 46
src/store_riak.erl

@@ -14,20 +14,23 @@
 start() -> ok.
 stop() -> ok.
 version() -> {version,"KVS RIAK 2.0.2"}.
-join() -> initialize(), ok.
-join(_) -> initialize(), ok.
+join() -> ok.
+join(Ring) -> riak_core:join(Ring).
 initialize() -> riak:client_connect(node()).
 
 dir() ->
-    {ok,Buckets} = riak_client:list_buckets(),
+    {ok,C}=riak:local_client(),
+    {ok,Buckets} = C:list_buckets(),
     [{table,binary_to_list(X)}||X<-Buckets].
 
 riak_clean(Table) when is_list(Table)->
-    {ok,Keys}=riak_client:list_keys(erlang:list_to_binary(Table)),
-    [ riak_client:delete(erlang:list_to_binary(Table),Key) || Key <- Keys];
+    {ok,C}=riak:local_client(),
+    {ok,Keys}=C:list_keys(erlang:list_to_binary(Table)),
+    [ C:delete(erlang:list_to_binary(Table),Key) || Key <- Keys];
 riak_clean(Table) ->
+    {ok,C}=riak:local_client(),
     [TableStr] = io_lib:format("~p",[Table]),
-    {ok,Keys}=riak_client:list_keys(erlang:list_to_binary(TableStr)),
+    {ok,Keys}=C:list_keys(erlang:list_to_binary(TableStr)),
     [ kvs:delete(Table,key_to_bin(Key)) || Key <- Keys].
 
 make_object(T) ->
@@ -37,7 +40,7 @@ make_object(T) ->
     Indices = make_indices(T),
     Meta = dict:store(<<"index">>, Indices, dict:new()),
     Obj2 = riak_object:update_metadata(Obj1, Meta),
-    error_logger:info_msg("RIAK PUT IDX ~p",[Indices]),
+    kvs:info(?MODULE,"RIAK PUT IDX ~p",[Indices]),
     Obj2.
 
 make_indices(#subscription{who=Who, whom=Whom}) -> [
@@ -48,6 +51,9 @@ make_indices(#user{id=UId,zone=Zone}) -> [
     {<<"user_bin">>, key_to_bin(UId)},
     {<<"zone_bin">>, key_to_bin(Zone)}];
 
+make_indices(#feed{id=UId}) -> [
+    {<<"feed_bin">>, key_to_bin(UId)}];
+
 make_indices(#comment{id={CID,EID},from=Who}) -> [
     {<<"comment_bin">>, key_to_bin({CID,EID})},
     {<<"author_bin">>, key_to_bin(Who)}];
@@ -67,30 +73,15 @@ put(Record) -> riak_put(Record).
 
 riak_put(Record) ->
     {ok,C}=riak:local_client(),
+    Bucket = key_to_bin(element(1,Record)),
+    Key = key_to_bin(element(2,Record)),
     Object = make_object(Record),
-    Result = riak_client:put(Object,C),
-    Result.
-
-put_if_none_match(Record) ->
-    Object = make_object(Record),
-    case riak_client:put(Object, [if_none_match]) of
-        ok -> ok;
-        Error -> Error end.
-
-update(Record, Object) ->
-    NewObject = make_object(Record),
-    NewKey = riak_object:key(NewObject),
-    case riak_object:key(Object) of
-        NewKey ->
-            MetaInfo = riak_object:get_update_metatdata(NewObject),
-            UpdObject2 = riak_object:update_value(Object, Record),
-            UpdObject3 = riak_object:update_metadata(UpdObject2, MetaInfo),
-            case riak_client:put(UpdObject3, [if_not_modified]) of
-                ok -> ok;
-                Error -> Error
-            end;
-        _ -> {error, keys_not_equal}
-    end.
+    RiakAnswer = C:get(Bucket,Key),
+    case RiakAnswer of
+         {ok,O} ->
+              Obj = riak_object:update_value(riak_object:reconcile([O],false), Record),
+              C:put(Obj);
+         _ -> C:put(Object) end.
 
 get(Tab, Key) ->
     Bucket = key_to_bin(Tab),
@@ -99,16 +90,14 @@ get(Tab, Key) ->
 
 riak_get(Bucket,Key) ->
     {ok,C} = riak:local_client(),
-    RiakAnswer = riak_client:get(Bucket,Key,C),
+    RiakAnswer = C:get(Bucket,Key),
     case RiakAnswer of
-        {ok, O} -> {ok, riak_object:get_value(O)};
+        {ok, O} ->
+            % kvs:info(?MODULE,"Value Count: ~p~n",[riak_object:value_count(O)]),
+            {ok,riak_object:get_value(riak_object:reconcile([O],false))};
+        {error, notfound} -> {error, not_found};
         X -> X end.
 
-get_for_update(Tab, Key) ->
-    case riak_client:get(key_to_bin(Tab), key_to_bin(Key)) of
-        {ok, O} -> {ok, riak_object:get_value(O), O};
-        Error -> Error end.
-
 delete(Tab, Key) ->
     {ok,C}=riak:local_client(),
     Bucket = key_to_bin(Tab),
@@ -116,9 +105,10 @@ delete(Tab, Key) ->
     C:delete(Bucket, IntKey).
 
 delete_by_index(Tab, IndexId, IndexVal) ->
+    {ok,C}=riak:local_client(),
     Bucket = key_to_bin(Tab),
     {ok, Keys} = riak_client:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
-    [riak_client:delete(Bucket, Key) || Key <- Keys].
+    [C:delete(Bucket, Key) || Key <- Keys].
 
 key_to_bin(Key) ->
     if is_integer(Key) -> erlang:list_to_binary(integer_to_list(Key));
@@ -131,30 +121,30 @@ all(RecordName) ->
     {ok,C}=riak:local_client(),
     RecordBin = key_to_bin(RecordName),
     {ok,Keys} = C:list_keys(RecordBin),
-    io:format("RecordBin: ~p~n",[RecordBin]),
-    io:format("Keys: ~p~n",[Keys]),
     Results = [ riak_get_raw({RecordBin, Key, C}) || Key <- Keys ],
     [ Object || Object <- Results, Object =/= failure ].
 
 all_by_index(Tab, IndexId, IndexVal) ->
+    {ok,C}=riak:local_client(),
     Bucket = key_to_bin(Tab),
-    {ok, Keys} = riak_client:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
+    {ok, Keys} = C:get_index(Bucket, {eq, IndexId, key_to_bin(IndexVal)}),
     lists:foldl(fun(Key, Acc) ->
-        case riak_client:get(Bucket, Key, []) of
-            {ok, O} -> [riak_object:get_value(O) | Acc];
+        case C:get(Bucket, Key) of
+            {ok, O} -> {ok,riak_object:get_value(riak_object:reconcile([O],false))};
             {error, notfound} -> Acc end end, [], Keys).
 
 riak_get_raw({RecordBin, Key, C}) ->
     case C:get(RecordBin, Key) of
-        {ok,O} -> riak_object:get_value(O);
+        {ok, O} -> riak_object:get_value(riak_object:reconcile([O],false));
         _ -> failure end.
 
 next_id(CounterId) -> next_id(CounterId, 1).
 next_id(CounterId, Incr) -> next_id(CounterId, 0, Incr).
 next_id(CounterId, Default, Incr) ->
+    {ok,C}=riak:local_client(),
     CounterBin = key_to_bin(CounterId),
     {Object, Value, Options} =
-        case riak_client:get(key_to_bin(id_seq), CounterBin, []) of
+        case C:get(key_to_bin(id_seq), CounterBin) of
             {ok, CurObj} ->
                 R = #id_seq{id = CurVal} = riak_object:get_value(CurObj),
                 NewVal = CurVal + Incr,
@@ -164,7 +154,7 @@ next_id(CounterId, Default, Incr) ->
                 NewVal = Default + Incr,
                 Obj = riak_object:new(key_to_bin(id_seq), CounterBin, #id_seq{thing = CounterId, id = NewVal}),
                 {Obj, NewVal, [if_none_match]} end,
-    case riak_client:put(Object, Options) of
+    case C:put(Object) of
         ok -> Value;
         {error, _} -> next_id(CounterId, Incr) end.
 

+ 189 - 0
src/store_sql.erl

@@ -0,0 +1,189 @@
+-module(store_sql).
+-author('Daniil Churikov').
+-compile(export_all).
+-include("sql.hrl").
+-include("metainfo.hrl").
+
+insert(E, Table, S) ->
+    SkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
+    Q = prepare_insert(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
+    if Q#query.values /= [] -> {ok, build(Q)};
+    true -> {error, empty_insert} end.
+
+select_pk(E, Table, S) ->
+    SkipFun = fun(#column{key = Key}, _) -> not Key end,
+    Q = prepare_select(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
+    if Q#query.values /= [] -> {ok, build(Q)};
+    true -> {error, pk_miss} end.
+
+select(E, Table, S) ->
+    SkipFun = fun(_, V) -> V == '$skip' end,
+    {ok, build(prepare_select(skip(SkipFun, Table#table.columns, e2l(E)), Table, S))}.
+
+update_pk(E, Table = #table{columns = MekaoCols}, S) ->
+    SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
+    WhereSkipFun = fun(#column{key = Key}, _) -> not Key end,
+    Vals = e2l(E),
+    Q = prepare_update(skip(SetSkipFun, MekaoCols, Vals),
+                       skip(WhereSkipFun, MekaoCols, Vals),Table, S),
+    if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
+       (Q#query.body)#sql_update.where == [] -> {error, pk_miss};
+       true -> {ok, build(Q)} end.
+
+update_pk_diff(E1, E2, Table = #table{columns = MekaoCols}, S) ->
+    Vals1 = e2l(E1),
+    Vals2 = e2l(E2),
+    DiffVals = map2(
+        fun (V, V) -> '$skip';
+            (_, V2) -> V2 end, Vals1, Vals2),
+    SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
+    WhereSkipFun = fun(#column{key = Key}, _) -> not Key end,
+    Q = prepare_update(skip(SetSkipFun, MekaoCols, DiffVals),
+                       skip(WhereSkipFun, MekaoCols, Vals1),Table, S),
+    if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
+       (Q#query.body)#sql_update.where == [] -> {error, pk_miss};
+       true -> {ok, build(Q)} end.
+
+update(E, Selector, Table = #table{columns = MekaoCols}, S) ->
+    SetSkipFun = fun(#column{ro = RO}, V) -> RO orelse V == '$skip' end,
+    WhereSkipFun = fun(_, V) -> V == '$skip' end,
+    Q = prepare_update(skip(SetSkipFun, MekaoCols, e2l(E)),
+                       skip(WhereSkipFun, MekaoCols, e2l(Selector)),Table, S),
+    if (Q#query.body)#sql_update.set == [] -> {error, empty_update};
+       true -> {ok, build(Q)} end.
+
+delete_pk(E, Table, S) ->
+    SkipFun = fun(#column{key = Key}, _) -> not Key end,
+    Q = prepare_delete(skip(SkipFun, Table#table.columns, e2l(E)), Table, S),
+    if Q#query.values /= [] -> {ok, build(Q)};
+       true -> {error, pk_miss} end.
+
+delete(Selector, Table, S) ->
+    SkipFun = fun(_, V) -> V == '$skip' end,
+    Q = prepare_delete(skip(SkipFun, Table#table.columns, e2l(Selector)), Table, S),
+    {ok, build(Q)}.
+
+prepare_insert(E, Table, S) ->
+    {Cols, PHs, Types, Vals} = qdata(1, e2l(E), Table#table.columns, S),
+    Q = #sql_insert{table=Table#table.name,columns=intersperse(Cols, <<", ">>),
+                   values=intersperse(PHs, <<", ">>),returning=returning(insert, Table, S)},
+    #query{body=Q,types=Types,values=Vals,next_ph_num=length(PHs) + 1}.
+
+prepare_select(E, Table, S) ->
+    #table{columns = MekaoCols,order_by = OrderBy} = Table,
+    {Where, {_, PHs, Types, Vals}} = where(qdata(1, e2l(E), MekaoCols, S), S),
+    AllCols = intersperse(MekaoCols, <<", ">>, fun(#column{name = Name}) -> Name end),
+    Q = #sql_select{table=Table#table.name,columns=AllCols,where=Where,order_by=order_by(OrderBy)},
+    #query{body=Q,types=Types,values=Vals,next_ph_num = length(PHs) + 1}.
+
+prepare_update(SetE, WhereE, Table = #table{columns = MekaoCols}, S) ->
+    {SetCols, SetPHs, SetTypes, SetVals} = qdata(1, e2l(SetE), MekaoCols, S),
+    SetPHsLen = length(SetPHs),
+    {Where, {_, WherePHs, WhereTypes, WhereVals}} = where(qdata(SetPHsLen + 1, e2l(WhereE), MekaoCols, S), S),
+    WherePHsLen = length(WherePHs),
+    Set = intersperse2(fun (C, PH) -> [C, <<" = ">>, PH] end,<<", ">>, SetCols, SetPHs),
+    Q = #sql_update{table=Table#table.name,set=Set,where=Where,returning=returning(update, Table, S)},
+    #query{body=Q,types=SetTypes++WhereTypes,values=SetVals++WhereVals,next_ph_num = SetPHsLen + WherePHsLen + 1}.
+
+prepare_delete(E, Table, S) ->
+    {Where, {_, PHs, Types, Vals}} = where(qdata(1, e2l(E), Table#table.columns, S), S),
+    Q = #sql_delete{table=Table#table.name,where=Where,returning=returning(delete, Table, S)},
+    #query{body=Q,types=Types,values=Vals,next_ph_num = length(PHs) + 1}.
+
+build(Q = #query{body = Select}) when is_record(Select, sql_select) ->
+    #sql_select{columns=Columns,table=Table,where=Where,order_by=OrderBy} = Select,
+    Q#query{body = [<<"SELECT ">>, Columns, <<" FROM ">>, Table, build_where(Where),build_order_by(OrderBy)]};
+build(Q = #query{body = Insert}) when is_record(Insert, sql_insert) ->
+    #sql_insert{table=Table,columns=Columns,values=Values,returning=Return} = Insert,
+    Q#query{body = [<<"INSERT INTO ">>, Table, <<" (">>, Columns, <<") VALUES (">>,Values, <<")">>, build_return(Return)]};
+build(Q = #query{body = Update}) when is_record(Update, sql_update) ->
+    #sql_update{table=Table,set=Set,where=Where,returning=Return} = Update,
+    Q#query{body = [<<"UPDATE ">>, Table, <<" SET ">>, Set,build_where(Where), build_return(Return)]};
+build(Q = #query{body = Delete}) when is_record(Delete, sql_delete) ->
+    #sql_delete{table=Table,where=Where,returning=Return} = Delete,
+    Q#query{body = [<<"DELETE FROM ">>, Table, build_where(Where),build_return(Return)]}.
+
+e2l(Vals) when is_list(Vals) -> Vals;
+e2l(E) when is_tuple(E) -> [_EntityName | Vals] = tuple_to_list(E), Vals.
+
+skip(SkipFun, Cols, Vals) -> map2(fun(C, V) -> Skip = SkipFun(C, V),if Skip -> '$skip'; true -> V end end, Cols, Vals).
+
+qdata(_, [], [], _) -> {[], [], [], []};
+qdata(Num, ['$skip' | Vals], [_Col | Cols], S) -> qdata(Num, Vals, Cols, S);
+qdata(Num, [Pred | Vals], [Col | Cols], S) ->
+    #column{type = T, name = CName, transform = TrFun} = Col,
+    V = predicate_val(Pred),
+    NewV = if TrFun == undefined -> V;
+                            true -> TrFun(V) end,
+    PH = (S#sql_settings.placeholder)(Col, Num, NewV),
+    NewPred = set_predicate_val(Pred, NewV), {ResCols, ResPHs, ResTypes, ResVals} = qdata(Num + 1, Vals, Cols, S),
+    {[CName | ResCols], [PH | ResPHs], [T | ResTypes], [NewPred | ResVals]}.
+
+returning(_QType, _Table, #sql_settings{returning = undefined}) -> [];
+returning(QType, Table, #sql_settings{returning = RetFun}) -> RetFun(QType, Table).
+
+where(QData = {[], [], [], []}, _S) -> {[], QData};
+where({[C], [PH], [T], [V]}, S) ->
+    {W, {NewC, NewPH, NewT, NewV}} = predicate({C, PH, T, V}, S),
+    {[W], {[NewC], [NewPH], [NewT], [NewV]}};
+where({[C | Cs], [PH | PHs], [T | Types], [V | Vals]}, S) ->
+    {W, {NewC, NewPH, NewT, NewV}} = predicate({C, PH, T, V}, S),
+    {Ws, {NewCs, NewPHs, NewTypes, NewVals}} = where({Cs, PHs, Types, Vals}, S),
+    {[W, <<" AND ">> | Ws], {[NewC | NewCs], [NewPH | NewPHs], [NewT | NewTypes], [NewV | NewVals]}}.
+
+%% TODO: add NOT, IN, ANY, ALL, BETWEEN, LIKE handling
+predicate({C, PH, T, {'$predicate', Op, V}}, S) when Op == '='; Op == '<>' ->
+    IsNull = (S#sql_settings.is_null)(V),
+    if not IsNull -> {[C, op_to_bin(Op), PH], {C, PH, T, V}};
+    Op == '=' -> {[C, <<" IS NULL">>], {C, PH, T, V}};
+    Op == '<>' -> {[C, <<" IS NOT NULL">>], {C, PH, T, V}} end;
+predicate({C, PH, T, {'$predicate', OP, V}},  _S) -> {[C, op_to_bin(OP), PH],  {C, PH, T, V}};
+predicate({C, PH, T, V}, S) -> predicate({C, PH, T, {'$predicate', '=', V}}, S).
+
+op_to_bin('=')  -> <<" = ">>;
+op_to_bin('<>') -> <<" <> ">>;
+op_to_bin('>')  -> <<" > ">>;
+op_to_bin('>=') -> <<" >= ">>;
+op_to_bin('<')  -> <<" < ">>;
+op_to_bin('<=') -> <<" <= ">>.
+
+order_by([]) -> [];
+order_by([O]) -> [order_by_1(O)];
+order_by([O | OrderBy]) -> [order_by_1(O), <<", ">> | order_by(OrderBy)].
+order_by_1(E) when not is_tuple(E) -> order_by_1({E, {default, default}});
+order_by_1({Pos, Opts}) when is_integer(Pos) -> order_by_1({integer_to_list(Pos - 1), Opts});
+order_by_1({Expr, Opts}) when is_list(Expr); is_binary(Expr) -> [Expr, order_by_opts(Opts)].
+order_by_opts({Ordering, Nulls}) ->
+    O = case Ordering of
+        default -> <<"">>;
+        asc -> <<" ASC">>;
+        desc -> <<" DESC">> end,
+    case Nulls of
+        default -> O;
+        nulls_first -> <<O/binary," NULLS FIRST">>;
+        nulls_last -> <<O/binary, " NULLS LAST">> end.
+
+build_return([]) -> <<>>;
+build_return(Return) -> [<<" ">> | Return].
+build_where([]) -> <<>>;
+build_where(Where) -> [<<" WHERE ">> | Where].
+build_order_by([]) -> <<>>;
+build_order_by(OrderBy) -> [<<" ORDER BY ">>, OrderBy].
+
+predicate_val({'$predicate', _, V}) -> V;
+predicate_val(V) -> V.
+
+set_predicate_val({'$predicate', Op, _}, NewV) -> {'$predicate', Op, NewV};
+set_predicate_val(_, NewV) -> NewV.
+
+map2(_Fun, [], []) -> [];
+map2(Fun, [V1 | L1], [V2 | L2]) -> [Fun(V1, V2) | map2(Fun, L1, L2)].
+
+intersperse(List, Sep) -> intersperse(List, Sep, fun (X) -> X end).
+intersperse([], _, _) -> [];
+intersperse([Item], _, Fun) -> [Fun(Item)];
+intersperse([Item | Items], Sep, Fun) -> [Fun(Item), Sep | intersperse(Items, Sep, Fun)].
+
+intersperse2(_Fun, _Sep, [], []) -> [];
+intersperse2(Fun, _Sep, [I1], [I2]) -> [Fun(I1, I2)];
+intersperse2(Fun, Sep, [I1 | I1s], [I2 | I2s]) -> [Fun(I1, I2), Sep | intersperse2(Fun, Sep, I1s, I2s)].