Browse Source

receive single result

Anton Lebedevich 13 years ago
parent
commit
afd7110295
2 changed files with 20 additions and 97 deletions
  1. 15 95
      src/pgsql.erl
  2. 5 2
      src/pgsql_sock.erl

+ 15 - 95
src/pgsql.erl

@@ -40,10 +40,7 @@ get_parameter(C, Name) ->
 
 
 squery(C, Sql) ->
 squery(C, Sql) ->
     Ref = pgsql_sock:squery(C, Sql),
     Ref = pgsql_sock:squery(C, Sql),
-    case receive_results(C, Ref, []) of
-        [Result] -> Result;
-        Results  -> Results
-    end.
+    receive_result(C, Ref).
 
 
 equery(C, Sql) ->
 equery(C, Sql) ->
     equery(C, Sql, []).
     equery(C, Sql, []).
@@ -54,7 +51,7 @@ equery(C, Sql, Parameters) ->
         {ok, #statement{types = Types} = S} ->
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
             Typed_Parameters = lists:zip(Types, Parameters),
             Ref = pgsql_sock:equery(C, S, Typed_Parameters),
             Ref = pgsql_sock:equery(C, S, Typed_Parameters),
-            receive_result(C, Ref, undefined);
+            receive_result(C, Ref);
         Error ->
         Error ->
             Error
             Error
     end.
     end.
@@ -69,7 +66,7 @@ parse(C, Sql, Types) ->
 
 
 parse(C, Name, Sql, Types) ->
 parse(C, Name, Sql, Types) ->
     Ref = pgsql_sock:parse(C, Name, Sql, Types),
     Ref = pgsql_sock:parse(C, Name, Sql, Types),
-    sync_on_error(C, receive_describe(C, Ref, #statement{name = Name})).
+    sync_on_error(C, receive_result(C, Ref)).
 
 
 %% bind
 %% bind
 
 
@@ -78,7 +75,7 @@ bind(C, Statement, Parameters) ->
 
 
 bind(C, Statement, PortalName, Parameters) ->
 bind(C, Statement, PortalName, Parameters) ->
     Ref = pgsql_sock:bind(C, Statement, PortalName, Parameters),
     Ref = pgsql_sock:bind(C, Statement, PortalName, Parameters),
-    sync_on_error(C, receive_atom(C, Ref, ok, ok)).
+    sync_on_error(C, receive_result(C, Ref)).
 
 
 %% execute
 %% execute
 
 
@@ -90,31 +87,28 @@ execute(C, S, N) ->
 
 
 execute(C, S, PortalName, N) ->
 execute(C, S, PortalName, N) ->
     Ref = pgsql_sock:execute(C, S, PortalName, N),
     Ref = pgsql_sock:execute(C, S, PortalName, N),
-    receive_extended_result(C, Ref).
+    receive_result(C, Ref).
 
 
 %% statement/portal functions
 %% statement/portal functions
 
 
 describe(C, #statement{name = Name}) ->
 describe(C, #statement{name = Name}) ->
     describe(C, statement, Name).
     describe(C, statement, Name).
 
 
-describe(C, statement, Name) ->
-    Ref = pgsql_sock:describe(C, statement, Name),
-    sync_on_error(C, receive_describe(C, Ref, #statement{name = Name}));
-
 describe(C, Type, Name) ->
 describe(C, Type, Name) ->
+    Ref = pgsql_sock:describe(C, Type, Name),
     %% TODO unknown result format of Describe portal
     %% TODO unknown result format of Describe portal
-    pgsql_sock:describe(C, Type, Name).
+    sync_on_error(C, receive_result(C, Ref)).
 
 
 close(C, #statement{name = Name}) ->
 close(C, #statement{name = Name}) ->
     close(C, statement, Name).
     close(C, statement, Name).
 
 
 close(C, Type, Name) ->
 close(C, Type, Name) ->
     Ref = pgsql_sock:close(C, Type, Name),
     Ref = pgsql_sock:close(C, Type, Name),
-    receive_atom(C, Ref, ok, ok).
+    receive_result(C, Ref).
 
 
 sync(C) ->
 sync(C) ->
     Ref = pgsql_sock:sync(C),
     Ref = pgsql_sock:sync(C),
-    receive_atom(C, Ref, done, ok).
+    receive_result(C, Ref).
 
 
 %% misc helper functions
 %% misc helper functions
 with_transaction(C, F) ->
 with_transaction(C, F) ->
@@ -131,93 +125,19 @@ with_transaction(C, F) ->
 
 
 %% -- internal functions --
 %% -- internal functions --
 
 
-receive_result(C, Ref, Result) ->
-    try receive_result(C, Ref, [], []) of
-        done    -> Result;
-        R       -> receive_result(C, Ref, R)
-    catch
-        throw:E -> E
-    end.
-
-receive_results(C, Ref, Results) ->
-    try receive_result(C, Ref, [], []) of
-        done    -> lists:reverse(Results);
-        R       -> receive_results(C, Ref, [R | Results])
-    catch
-        throw:E -> E
-    end.
-
-receive_result(C, Ref, Cols, Rows) ->
+receive_result(C, Ref) ->
+    %% TODO timeout
     receive
     receive
-        {Ref, {columns, Cols2}} ->
-            receive_result(C, Ref, Cols2, Rows);
-        {Ref, {data, Row}} ->
-            receive_result(C, Ref, Cols, [Row | Rows]);
-        {Ref, {error, _E} = Error} ->
-            Error;
-        {Ref, {complete, {_Type, Count}}} ->
-            case Rows of
-                [] -> {ok, Count};
-                _L -> {ok, Count, Cols, lists:reverse(Rows)}
-            end;
-        {Ref, {complete, _Type}} ->
-            {ok, Cols, lists:reverse(Rows)};
-        {Ref, done} ->
-            done;
-        {'EXIT', C, _Reason} ->
-            throw({error, closed})
-    end.
-
-receive_extended_result(C, Ref)->
-    receive_extended_result(C, Ref, []).
-
-receive_extended_result(C, Ref, Rows) ->
-    receive
-        {Ref, {data, Row}} ->
-            receive_extended_result(C, Ref, [Row | Rows]);
-        {Ref, {error, _E} = Error} ->
-            Error;
-        {Ref, suspended} ->
-            {partial, lists:reverse(Rows)};
-        {Ref, {complete, {_Type, Count}}} ->
-            case Rows of
-                [] -> {ok, Count};
-                _L -> {ok, Count, lists:reverse(Rows)}
-            end;
-        {Ref, {complete, _Type}} ->
-            {ok, lists:reverse(Rows)};
-        {'EXIT', C, _Reason} ->
-            {error, closed}
-    end.
-
-receive_describe(C, Ref, Statement = #statement{}) ->
-    receive
-        {Ref, {types, Types}} ->
-            receive_describe(C, Ref, Statement#statement{types = Types});
-        {Ref, {columns, Columns}} ->
-            Columns2 = [Col#column{format = pgsql_wire:format(Col#column.type)} || Col <- Columns],
-            {ok, Statement#statement{columns = Columns2}};
-        {Ref, no_data} ->
-            {ok, Statement#statement{columns = []}};
-        {Ref, Error = {error, _}} ->
-            Error;
-        {'EXIT', C, _Reason} ->
-            {error, closed}
-    end.
-
-receive_atom(C, Ref, Receive, Return) ->
-    receive
-        {Ref, Receive} ->
-            Return;
-        {Ref, Error = {error, _}} ->
-            Error;
+        {Ref, Result} ->
+            Result;
+        %% TODO no 'EXIT' for not linked processes
         {'EXIT', C, _Reason} ->
         {'EXIT', C, _Reason} ->
             {error, closed}
             {error, closed}
     end.
     end.
 
 
 sync_on_error(C, Error = {error, _}) ->
 sync_on_error(C, Error = {error, _}) ->
     Ref = pgsql_sock:sync(C),
     Ref = pgsql_sock:sync(C),
-    receive_atom(C, Ref, done, ok),
+    receive_result(C, Ref),
     Error;
     Error;
 
 
 sync_on_error(_C, R) ->
 sync_on_error(_C, R) ->

+ 5 - 2
src/pgsql_sock.erl

@@ -394,10 +394,13 @@ on_message({$T, <<Count:?int16, Bin/binary>>}, State) ->
                  squery ->
                  squery ->
                      State#state{columns = Columns};
                      State#state{columns = Columns};
                  C when C == parse; C == describe_statement ->
                  C when C == parse; C == describe_statement ->
-                     %% TODO set binary format to supported columns
+                     Columns2 =
+                         [Col#column{format = pgsql_wire:format(
+                                                Col#column.type)}
+                          || Col <- Columns],
                      reply(State,
                      reply(State,
                            {ok, State#state.statement#statement{
                            {ok, State#state.statement#statement{
-                                              columns = Columns}});
+                                              columns = Columns2}});
                  describe_portal ->
                  describe_portal ->
                      reply(State, {ok, Columns})
                      reply(State, {ok, Columns})
              end,
              end,