|
@@ -22,6 +22,7 @@ connect(Host, Username, Opts) ->
|
|
|
connect(Host, Username, Password, Opts) ->
|
|
|
{ok, C} = pgsql_sock:start_link(),
|
|
|
Ref = pgsql_sock:connect(C, Host, Username, Password, Opts),
|
|
|
+ %% TODO connect timeout
|
|
|
receive
|
|
|
{Ref, connected} ->
|
|
|
{ok, C};
|
|
@@ -38,8 +39,8 @@ get_parameter(C, Name) ->
|
|
|
pgsql_sock:get_parameter(C, Name).
|
|
|
|
|
|
squery(C, Sql) ->
|
|
|
- ok = pgsql_sock:squery(C, Sql),
|
|
|
- case receive_results(C, []) of
|
|
|
+ Ref = pgsql_sock:squery(C, Sql),
|
|
|
+ case receive_results(C, Ref, []) of
|
|
|
[Result] -> Result;
|
|
|
Results -> Results
|
|
|
end.
|
|
@@ -49,12 +50,10 @@ equery(C, Sql) ->
|
|
|
|
|
|
equery(C, Sql, Parameters) ->
|
|
|
case parse(C, Sql) of
|
|
|
- {ok, S} ->
|
|
|
- ok = bind(C, S, Parameters),
|
|
|
- ok = pgsql_sock:execute(C, S, "", 0),
|
|
|
- ok = close(C, S),
|
|
|
- ok = sync(C),
|
|
|
- receive_result(C, undefined);
|
|
|
+ {ok, #statement{types = Types} = S} ->
|
|
|
+ Typed_Parameters = lists:zip(Types, Parameters),
|
|
|
+ Ref = pgsql_sock:equery(C, S, Typed_Parameters),
|
|
|
+ receive_result(C, Ref, undefined);
|
|
|
Error ->
|
|
|
Error
|
|
|
end.
|
|
@@ -68,7 +67,8 @@ parse(C, Sql, Types) ->
|
|
|
parse(C, "", Sql, Types).
|
|
|
|
|
|
parse(C, Name, Sql, Types) ->
|
|
|
- pgsql_sock:parse(C, Name, Sql, Types).
|
|
|
+ Ref = pgsql_sock:parse(C, Name, Sql, Types),
|
|
|
+ receive_describe(C, Ref, #statement{name = Name}).
|
|
|
|
|
|
%% bind
|
|
|
|
|
@@ -87,13 +87,18 @@ execute(C, S, N) ->
|
|
|
execute(C, S, "", N).
|
|
|
|
|
|
execute(C, S, PortalName, N) ->
|
|
|
- pgsql_sock:execute(C, S, PortalName, N),
|
|
|
- receive_extended_result(C).
|
|
|
+ Ref = pgsql_sock:execute(C, S, PortalName, N),
|
|
|
+ receive_extended_result(C, Ref).
|
|
|
|
|
|
%% statement/portal functions
|
|
|
|
|
|
-describe(C, #statement{name = Name}) ->
|
|
|
- pgsql_sock:describe(C, statement, Name).
|
|
|
+describe(C, Statement = #statement{name = Name}) ->
|
|
|
+ Ref = pgsql_sock:describe(C, statement, Name),
|
|
|
+ receive_describe(C, Ref, Statement).
|
|
|
+
|
|
|
+describe(C, statement, Name) ->
|
|
|
+ Ref = pgsql_sock:describe(C, statement, Name),
|
|
|
+ receive_describe(C, Ref, #statement{name = Name});
|
|
|
|
|
|
describe(C, Type, Name) ->
|
|
|
pgsql_sock:describe(C, Type, Name).
|
|
@@ -121,65 +126,79 @@ with_transaction(C, F) ->
|
|
|
|
|
|
%% -- internal functions --
|
|
|
|
|
|
-receive_result(C, Result) ->
|
|
|
- try receive_result(C, [], []) of
|
|
|
+receive_result(C, Ref, Result) ->
|
|
|
+ try receive_result(C, Ref, [], []) of
|
|
|
done -> Result;
|
|
|
- R -> receive_result(C, R)
|
|
|
+ R -> receive_result(C, Ref, R)
|
|
|
catch
|
|
|
throw:E -> E
|
|
|
end.
|
|
|
|
|
|
-receive_results(C, Results) ->
|
|
|
- try receive_result(C, [], []) of
|
|
|
+receive_results(C, Ref, Results) ->
|
|
|
+ try receive_result(C, Ref, [], []) of
|
|
|
done -> lists:reverse(Results);
|
|
|
- R -> receive_results(C, [R | Results])
|
|
|
+ R -> receive_results(C, Ref, [R | Results])
|
|
|
catch
|
|
|
throw:E -> E
|
|
|
end.
|
|
|
|
|
|
-receive_result(C, Cols, Rows) ->
|
|
|
+receive_result(C, Ref, Cols, Rows) ->
|
|
|
receive
|
|
|
- {pgsql, C, {columns, Cols2}} ->
|
|
|
- receive_result(C, Cols2, Rows);
|
|
|
- {pgsql, C, {data, Row}} ->
|
|
|
- receive_result(C, Cols, [Row | Rows]);
|
|
|
- {pgsql, C, {error, _E} = Error} ->
|
|
|
+ {Ref, {columns, Cols2}} ->
|
|
|
+ receive_result(C, Ref, Cols2, Rows);
|
|
|
+ {Ref, {data, Row}} ->
|
|
|
+ receive_result(C, Ref, Cols, [Row | Rows]);
|
|
|
+ {Ref, {error, _E} = Error} ->
|
|
|
Error;
|
|
|
- {pgsql, C, {complete, {_Type, Count}}} ->
|
|
|
+ {Ref, {complete, {_Type, Count}}} ->
|
|
|
case Rows of
|
|
|
[] -> {ok, Count};
|
|
|
_L -> {ok, Count, Cols, lists:reverse(Rows)}
|
|
|
end;
|
|
|
- {pgsql, C, {complete, _Type}} ->
|
|
|
+ {Ref, {complete, _Type}} ->
|
|
|
{ok, Cols, lists:reverse(Rows)};
|
|
|
- {pgsql, C, done} ->
|
|
|
+ {Ref, done} ->
|
|
|
done;
|
|
|
- {pgsql, C, timeout} ->
|
|
|
+ {Ref, timeout} ->
|
|
|
throw({error, timeout});
|
|
|
{'EXIT', C, _Reason} ->
|
|
|
throw({error, closed})
|
|
|
end.
|
|
|
|
|
|
-receive_extended_result(C)->
|
|
|
- receive_extended_result(C, []).
|
|
|
+receive_extended_result(C, Ref)->
|
|
|
+ receive_extended_result(C, Ref, []).
|
|
|
|
|
|
-receive_extended_result(C, Rows) ->
|
|
|
+receive_extended_result(C, Ref, Rows) ->
|
|
|
receive
|
|
|
- {pgsql, C, {data, Row}} ->
|
|
|
- receive_extended_result(C, [Row | Rows]);
|
|
|
- {pgsql, C, {error, _E} = Error} ->
|
|
|
+ {Ref, {data, Row}} ->
|
|
|
+ receive_extended_result(C, Ref, [Row | Rows]);
|
|
|
+ {Ref, {error, _E} = Error} ->
|
|
|
Error;
|
|
|
- {pgsql, C, suspended} ->
|
|
|
+ {Ref, suspended} ->
|
|
|
{partial, lists:reverse(Rows)};
|
|
|
- {pgsql, C, {complete, {_Type, Count}}} ->
|
|
|
+ {Ref, {complete, {_Type, Count}}} ->
|
|
|
case Rows of
|
|
|
[] -> {ok, Count};
|
|
|
_L -> {ok, Count, lists:reverse(Rows)}
|
|
|
end;
|
|
|
- {pgsql, C, {complete, _Type}} ->
|
|
|
+ {Ref, {complete, _Type}} ->
|
|
|
{ok, lists:reverse(Rows)};
|
|
|
- {pgsql, C, timeout} ->
|
|
|
+ {Ref, timeout} ->
|
|
|
{error, timeout};
|
|
|
{'EXIT', C, _Reason} ->
|
|
|
{error, closed}
|
|
|
end.
|
|
|
+
|
|
|
+receive_describe(C, Ref, Statement = #statement{}) ->
|
|
|
+ receive
|
|
|
+ {Ref, {types, Types}} ->
|
|
|
+ receive_describe(C, Ref, Statement#statement{types = Types});
|
|
|
+ {Ref, {columns, Columns}} ->
|
|
|
+ Statement#statement{columns = Columns};
|
|
|
+ {Ref, no_data} ->
|
|
|
+ Statement#statement{columns = []};
|
|
|
+ {Ref, Error = {error, _}} ->
|
|
|
+ Error;
|
|
|
+ {'EXIT', C, _Reason} ->
|
|
|
+ {error, closed}
|
|
|
+ end.
|