Browse Source

accumulate results inside pgsql_sock

Anton Lebedevich 13 years ago
parent
commit
25f8e5303c
1 changed files with 88 additions and 68 deletions
  1. 88 68
      src/pgsql_sock.erl

+ 88 - 68
src/pgsql_sock.erl

@@ -38,6 +38,7 @@
                 statement,
                 columns,
                 rows = [],
+                results = [],
                 sync_required,
                 txstatus}).
 
@@ -284,9 +285,14 @@ send(#state{mod = Mod, sock = Sock}, Data) ->
 send(#state{mod = Mod, sock = Sock}, Type, Data) ->
     Mod:send(Sock, pgsql_wire:encode(Type, Data)).
 
-notify(#state{queue = Q}, Message) ->
+reply(State = #state{queue = Q}, Message) ->
     {{From, Ref}, _} = queue:get(Q),
-    From ! {Ref, Message}.
+    From ! {Ref, Message},
+    State#state{queue = queue:drop(Q),
+                statement = undefined,
+                columns = undefined,
+                rows = [],
+                results = []}.
 
 notify_async(#state{async = Pid}, Msg) ->
     case is_pid(Pid) of
@@ -294,7 +300,7 @@ notify_async(#state{async = Pid}, Msg) ->
         false -> false
     end.
 
-request_tag(#state{queue = Q}) ->
+command_tag(#state{queue = Q}) ->
     {_, Req} = queue:get(Q),
     element(1, Req).
 
@@ -325,8 +331,8 @@ auth({$R, <<M:?int32, _/binary>>}, State) ->
         8 -> Method = sspi;
         _ -> Method = unknown
     end,
-    notify(State, {error, {unsupported_auth_method, Method}}),
-    {stop, normal, State};
+    State2 = reply(State, {error, {unsupported_auth_method, Method}}),
+    {stop, normal, State2};
 
 %% ErrorResponse
 auth({error, E}, State) ->
@@ -335,20 +341,18 @@ auth({error, E}, State) ->
         <<"28P01">> -> Why = invalid_password;
         Any         -> Why = Any
     end,
-    notify(State, {error, Why}),
-    {stop, normal, State};
+    {stop, normal, reply(State, {error, Why})};
 
 auth(Other, State) ->
     on_message(Other, State).
 
 %% BackendKeyData
 initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
-    State2 = State#state{backend = {Pid, Key}},
-    {noreply, State2};
+    {noreply, State#state{backend = {Pid, Key}}};
 
 %% ReadyForQuery
 initializing({$Z, <<Status:8>>}, State) ->
-    #state{parameters = Parameters, queue = Q} = State,
+    #state{parameters = Parameters} = State,
     erase(username),
     erase(password),
     %% TODO decode dates to now() format
@@ -356,14 +360,13 @@ initializing({$Z, <<Status:8>>}, State) ->
         {value, {_, <<"on">>}}  -> put(datetime_mod, pgsql_idatetime);
         {value, {_, <<"off">>}} -> put(datetime_mod, pgsql_fdatetime)
     end,
-    notify(State, connected),
-    {noreply, State#state{handler = on_message,
-                         txstatus = Status,
-                         queue = queue:drop(Q)}};
+    State2 = reply(State#state{handler = on_message,
+                               txstatus = Status},
+                   connected),
+    {noreply, State2};
 
 initializing({error, _} = Error, State) ->
-    notify(State, Error), 
-    {stop, normal, State};
+    {stop, normal, reply(State, Error)};
 
 initializing(Other, State) ->
     on_message(Other, State).
@@ -377,57 +380,54 @@ on_message({$t, <<_Count:?int16, Bin/binary>>}, State) ->
     #state{queue = Q} = State,
     Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
     Name = case queue:get(Q) of
-                 {_, {parse, N, _, _}} -> N;
-                 {_, {describe_statement, N}} -> N
+               {_, {parse, N, _, _}} -> N;
+               {_, {describe_statement, N}} -> N
            end,
     {noreply, State#state{statement = #statement{name = Name,
                                                  types = Types}}};
 
 %% RowDescription
 on_message({$T, <<Count:?int16, Bin/binary>>}, State) ->
-    #state{queue = Q} = State,
     Columns = pgsql_wire:decode_columns(Count, Bin),
-    State2 = case request_tag(State) of
+    State2 = case command_tag(State) of
                  C when C == squery ->
                      State#state{columns = Columns};
                  C when C == parse; C == describe_statement ->
-                     notify(State,
-                            State#state.statement#statement{columns= Columns}),
-                     State#state{statement = undefined, queue = queue:drop(Q)};
+                     reply(State,
+                           {ok, State#state.statement#statement{
+                                              columns = Columns}});
                  C when C == describe_portal ->
-                     notify(State, Columns),
-                     State#state{queue = queue:drop(Q)}
+                     reply(State, {ok, Columns})
              end,
     {noreply, State2};
 
 %% NoData
 on_message({$n, <<>>}, State) ->
-    #state{queue = Q} = State,
-    notify(State, no_data),
-    {noreply, State#state{queue = queue:drop(Q)}};
+    State2 = case command_tag(State) of
+                 C when C == parse; C == describe_statement ->
+                     reply(State,
+                           {ok, State#state.statement#statement{columns= []}});
+                 describe_portal ->
+                     reply(State, {ok, []})
+             end,
+    {noreply, State2};
 
 %% BindComplete
 on_message({$2, <<>>}, State) ->
-    #state{queue = Q} = State,
-    State2 = case queue:get(Q) of
-                 {_, {equery, #statement{columns = C}, _}} ->
-                     %% TODO remove this workaround, send Describe on handle_cast(equery)
-                     notify(State, {columns, C}),
+    State2 = case command_tag(State) of
+                 equery ->
                      State;
-                 {_, {bind, _, _, _}} ->
-                     notify(State, ok),
-                     State#state{queue = queue:drop(Q)}
-    end,
+                 bind ->
+                     reply(State, ok)
+             end,
     {noreply, State2};
 
 %% CloseComplete
 on_message({$3, <<>>}, State) ->
-    #state{queue = Q} = State,
-    State2 = case request_tag(State) of
-                 C when C == close ->
-                     notify(State, ok),
-                     State#state{queue = queue:drop(Q)};
-                 _ ->
+    State2 = case command_tag(State) of
+                 close ->
+                     reply(State, ok);
+                 equery ->
                      State
              end,
     {noreply, State2};
@@ -448,42 +448,63 @@ on_message({$D, <<_Count:?int16, Bin/binary>>}, State) ->
 
 %% PortalSuspended
 on_message({$s, <<>>}, State) ->
-    #state{queue = Q} = State,
-    notify(State, suspended),
-    {noreply, State#state{queue = queue:drop(Q)}};
+    {noreply, reply(State, {partial, lists:reverse(State#state.rows)})};
 
 %% CommandComplete
 on_message({$C, Bin}, State) ->
-    #state{queue = Q} = State,
-    Complete = pgsql_wire:decode_complete(Bin),
-    notify(State, {complete, Complete}),
-    State2 = case request_tag(State) of
-                 C when C == execute ->
-                     State#state{queue = queue:drop(Q)};
-                 _ ->
-                     State
+    Result = case pgsql_wire:decode_complete(Bin) of
+                 {_Type, Count} ->
+                     case State#state.rows of
+                         [] -> {ok, Count};
+                         _ -> {ok, Count, State#state.columns,
+                               lists:reverse(State#state.rows)}
+                     end;
+                 _Type ->
+                     {ok, State#state.columns, lists:reverse(State#state.rows)}
+             end,
+    State2 = case command_tag(State) of
+                 execute ->
+                     reply(State, Result);
+                 C when C == squery; C == equery ->
+                     State#state{results = [Result | State#state.results]}
              end,
     {noreply, State2};
 
 %% EmptyQueryResponse
 on_message({$I, _Bin}, State) ->
-    notify(State, {complete, empty}),
-    {noreply, State};
+    %% TODO check expected result format
+    State2 = case command_tag(State) of
+                 execute ->
+                     reply(State, empty);
+                 C when C == squery; C == equery ->
+                     State#state{results = [empty | State#state.results]}
+             end,
+    {noreply, State2};
 
 %% ReadyForQuery
 on_message({$Z, <<_Status:8>>}, State) ->
-    #state{queue = Q} = State,
-    notify(State, done),
-    {noreply, State#state{queue = queue:drop(Q)}};
+    State2 = case command_tag(State) of
+                 squery ->
+                     case State#state.results of
+                         [Result] ->
+                             reply(State, Result);
+                         Results ->
+                             reply(State, lists:reverse(Results))
+                     end;
+                 equery ->
+                     [Result] = State#state.results,
+                     reply(State, Result);
+                 sync ->
+                     reply(State, ok)
+             end,
+    {noreply, State2};
 
 on_message(Error = {error, _}, State) ->
-    #state{queue = Q} = State,
-    notify(State, Error),
-    State2 = case request_tag(State) of
+    State2 = case command_tag(State) of
                  C when C == squery; C == equery ->
-                     State;
+                     State#state{results = [Error | State#state.results]};
                  _ ->
-                     sync_required(State#state{queue = queue:drop(Q)})
+                     sync_required(reply(State, Error))
              end,
     {noreply, State2};
 
@@ -512,12 +533,11 @@ on_message({$A, <<Pid:?int32, Strings/binary>>}, State) ->
 sync_required(#state{queue = Q} = State) ->
     case queue:is_empty(Q) of
         false ->
-            case request_tag(State) of
-                T when T == sync ->
+            case command_tag(State) of
+                sync ->
                     State;
                 _ ->
-                    notify(State, {error, sync_required}),
-                    sync_required(State#state{queue = queue:drop(Q)})
+                    sync_required(reply(State, {error, sync_required}))
             end;
         true ->
             State#state{sync_required = true}