Browse Source

reply -> finish, add_row, notify

Anton Lebedevich 13 years ago
parent
commit
c449cd43c0
1 changed files with 67 additions and 46 deletions
  1. 67 46
      src/pgsql_sock.erl

+ 67 - 46
src/pgsql_sock.erl

@@ -101,8 +101,8 @@ handle_call({get_parameter, Name}, _From, State) ->
     end,
     end,
     {reply, {ok, Value}, State}.
     {reply, {ok, Value}, State}.
 
 
-%% TODO request format broken
-handle_cast({{From, Ref}, Command}, State = #state{sync_required = true})
+%% TODO request formats
+handle_cast({{cast, From, Ref}, Command}, State = #state{sync_required = true})
   when Command /= sync ->
   when Command /= sync ->
     From ! {Ref, {error, sync_required}},
     From ! {Ref, {error, sync_required}},
     {noreply, State};
     {noreply, State};
@@ -242,7 +242,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 cast(C, Command) ->
 cast(C, Command) ->
     Ref = make_ref(),
     Ref = make_ref(),
-    gen_server:cast(C, {{self(), Ref}, Command}),
+    gen_server:cast(C, {{cast, self(), Ref}, Command}),
     Ref.
     Ref.
 
 
 start_ssl(S, Flag, Opts, State) ->
 start_ssl(S, Flag, Opts, State) ->
@@ -287,21 +287,31 @@ loop(#state{data = Data, handler = Handler} = State) ->
             {noreply, State}
             {noreply, State}
     end.
     end.
 
 
-reply(State = #state{queue = Q}, Message) ->
-    {{From, Ref}, _} = queue:get(Q),
-    From ! {Ref, Message},
+finish(State, Result) ->
+    finish(State, Result, Result).
+
+finish(State = #state{queue = Q}, Notice, Result) ->
+    {{cast, From, Ref}, _} = queue:get(Q),
+    From ! {Ref, Result},
     State#state{queue = queue:drop(Q),
     State#state{queue = queue:drop(Q),
                 types = [],
                 types = [],
                 columns = [],
                 columns = [],
                 rows = [],
                 rows = [],
                 results = []}.
                 results = []}.
 
 
-add_result(State = #state{results = Results}, Result) ->
+add_result(State = #state{results = Results}, Notice, Result) ->
     State#state{types = [],
     State#state{types = [],
                 columns = [],
                 columns = [],
                 rows = [],
                 rows = [],
                 results = [Result | Results]}.
                 results = [Result | Results]}.
 
 
+add_row(State = #state{rows = Rows}, Data) ->
+    State#state{rows = [Data | Rows]}.
+
+notify(State = #state{queue = Q}, Notice) ->
+    %% TODO handle 'incremental' request
+    State.
+
 notify_async(#state{async = Pid}, Msg) ->
 notify_async(#state{async = Pid}, Msg) ->
     case is_pid(Pid) of
     case is_pid(Pid) of
         true  -> Pid ! {pgsql, self(), Msg};
         true  -> Pid ! {pgsql, self(), Msg};
@@ -342,7 +352,7 @@ sync_required(#state{queue = Q} = State) ->
                 sync ->
                 sync ->
                     State;
                     State;
                 _ ->
                 _ ->
-                    sync_required(reply(State, {error, sync_required}))
+                    sync_required(finish(State, {error, sync_required}))
             end;
             end;
         true ->
         true ->
             State#state{sync_required = true}
             State#state{sync_required = true}
@@ -351,7 +361,7 @@ sync_required(#state{queue = Q} = State) ->
 flush_queue(#state{queue = Q} = State, Error) ->
 flush_queue(#state{queue = Q} = State, Error) ->
     case queue:is_empty(Q) of
     case queue:is_empty(Q) of
         false ->
         false ->
-            flush_queue(reply(State, Error), Error);
+            flush_queue(finish(State, Error), Error);
         true -> State
         true -> State
     end.
     end.
 
 
@@ -391,7 +401,7 @@ auth({$R, <<M:?int32, _/binary>>}, State) ->
         8 -> Method = sspi;
         8 -> Method = sspi;
         _ -> Method = unknown
         _ -> Method = unknown
     end,
     end,
-    State2 = reply(State, {error, {unsupported_auth_method, Method}}),
+    State2 = finish(State, {error, {unsupported_auth_method, Method}}),
     {stop, normal, State2};
     {stop, normal, State2};
 
 
 %% ErrorResponse
 %% ErrorResponse
@@ -401,7 +411,7 @@ auth({error, E}, State) ->
         <<"28P01">> -> Why = invalid_password;
         <<"28P01">> -> Why = invalid_password;
         Any         -> Why = Any
         Any         -> Why = Any
     end,
     end,
-    {stop, normal, reply(State, {error, Why})};
+    {stop, normal, finish(State, Error)};
 
 
 auth(Other, State) ->
 auth(Other, State) ->
     on_message(Other, State).
     on_message(Other, State).
@@ -420,13 +430,13 @@ initializing({$Z, <<Status:8>>}, State) ->
         {value, {_, <<"on">>}}  -> put(datetime_mod, pgsql_idatetime);
         {value, {_, <<"on">>}}  -> put(datetime_mod, pgsql_idatetime);
         {value, {_, <<"off">>}} -> put(datetime_mod, pgsql_fdatetime)
         {value, {_, <<"off">>}} -> put(datetime_mod, pgsql_fdatetime)
     end,
     end,
-    State2 = reply(State#state{handler = on_message,
+    State2 = finish(State#state{handler = on_message,
                                txstatus = Status},
                                txstatus = Status},
                    connected),
                    connected),
     {noreply, State2};
     {noreply, State2};
 
 
 initializing({error, _} = Error, State) ->
 initializing({error, _} = Error, State) ->
-    {stop, normal, reply(State, Error)};
+    {stop, normal, finish(State, Error)};
 
 
 initializing(Other, State) ->
 initializing(Other, State) ->
     on_message(Other, State).
     on_message(Other, State).
@@ -438,23 +448,29 @@ on_message({$1, <<>>}, State) ->
 %% ParameterDescription
 %% ParameterDescription
 on_message({$t, <<_Count:?int16, Bin/binary>>}, State) ->
 on_message({$t, <<_Count:?int16, Bin/binary>>}, State) ->
     Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
     Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
-    {noreply, State#state{types = Types}};
+    State2 = notify(State#state{types = Types}, {types, Types}),
+    {noreply, State2};
 
 
 %% RowDescription
 %% RowDescription
 on_message({$T, <<Count:?int16, Bin/binary>>}, State) ->
 on_message({$T, <<Count:?int16, Bin/binary>>}, State) ->
     Columns = pgsql_wire:decode_columns(Count, Bin),
     Columns = pgsql_wire:decode_columns(Count, Bin),
-    State3 = case command_tag(State) of
+    Columns2 =
+        case command_tag(State) of
+            C when C == describe_portal; C == squery ->
+                Columns;
+            C when C == parse; C == describe_statement ->
+                [Col#column{format = pgsql_wire:format(Col#column.type)}
+                 || Col <- Columns]
+        end,
+    State2 = State#state{columns = Columns2},
+    Message = {columns, Columns2},
+    State3 = case command_tag(State2) of
                  squery ->
                  squery ->
-                     State#state{columns = Columns};
-                 C when C == parse; C == describe_statement ->
-                     Columns2 =
-                         [Col#column{format = pgsql_wire:format(
-                                                Col#column.type)}
-                          || Col <- Columns],
-                     State2 = State#state{columns = Columns2},
-                     reply(State2, {ok, make_statement(State2)});
+                     notify(State2, Message);
+                 T when T == parse; T == describe_statement ->
+                     finish(State2, Message, {ok, make_statement(State2)});
                  describe_portal ->
                  describe_portal ->
-                     reply(State, {ok, Columns})
+                     finish(State2, Message, {ok, Columns})
              end,
              end,
     {noreply, State3};
     {noreply, State3};
 
 
@@ -462,9 +478,9 @@ on_message({$T, <<Count:?int16, Bin/binary>>}, State) ->
 on_message({$n, <<>>}, State) ->
 on_message({$n, <<>>}, State) ->
     State2 = case command_tag(State) of
     State2 = case command_tag(State) of
                  C when C == parse; C == describe_statement ->
                  C when C == parse; C == describe_statement ->
-                     reply(State, {ok, make_statement(State)});
+                     finish(State, no_data, {ok, make_statement(State)});
                  describe_portal ->
                  describe_portal ->
-                     reply(State, {ok, []})
+                     finish(State, no_data, {ok, []})
              end,
              end,
     {noreply, State2};
     {noreply, State2};
 
 
@@ -474,57 +490,62 @@ on_message({$2, <<>>}, State) ->
                  equery ->
                  equery ->
                      State;
                      State;
                  bind ->
                  bind ->
-                     reply(State, ok)
+                     finish(State, ok)
              end,
              end,
     {noreply, State2};
     {noreply, State2};
 
 
 %% CloseComplete
 %% CloseComplete
 on_message({$3, <<>>}, State) ->
 on_message({$3, <<>>}, State) ->
     State2 = case command_tag(State) of
     State2 = case command_tag(State) of
-                 close ->
-                     reply(State, ok);
                  equery ->
                  equery ->
-                     State
+                     State;
+                 close ->
+                     finish(State, ok)
              end,
              end,
     {noreply, State2};
     {noreply, State2};
 
 
 %% DataRow
 %% DataRow
 on_message({$D, <<_Count:?int16, Bin/binary>>}, State) ->
 on_message({$D, <<_Count:?int16, Bin/binary>>}, State) ->
     Data = pgsql_wire:decode_data(get_columns(State), Bin),
     Data = pgsql_wire:decode_data(get_columns(State), Bin),
-    {noreply, State#state{rows = [Data | State#state.rows]}};
+    {noreply, add_row(State, Data)};
 
 
 %% PortalSuspended
 %% PortalSuspended
 on_message({$s, <<>>}, State) ->
 on_message({$s, <<>>}, State) ->
-    {noreply, reply(State, {partial, lists:reverse(State#state.rows)})};
+    State2 = finish(State,
+                   suspended,
+                   {partial, lists:reverse(State#state.rows)}),
+    {noreply, State2};
 
 
 %% CommandComplete
 %% CommandComplete
 on_message({$C, Bin}, State) ->
 on_message({$C, Bin}, State) ->
     Complete = pgsql_wire:decode_complete(Bin),
     Complete = pgsql_wire:decode_complete(Bin),
     Command = command_tag(State),
     Command = command_tag(State),
+    Notice = {complete, Complete},
     Rows = lists:reverse(State#state.rows),
     Rows = lists:reverse(State#state.rows),
     State2 = case {Command, Complete, Rows} of
     State2 = case {Command, Complete, Rows} of
                  {execute, {_, Count}, []} ->
                  {execute, {_, Count}, []} ->
-                     reply(State, {ok, Count});
+                     finish(State, Notice, {ok, Count});
                  {execute, {_, Count}, _} ->
                  {execute, {_, Count}, _} ->
-                     reply(State, {ok, Count, Rows});
+                     finish(State, Notice, {ok, Count, Rows});
                  {execute, _, _} ->
                  {execute, _, _} ->
-                     reply(State, {ok, Rows});
+                     finish(State, Notice, {ok, Rows});
                  {C, {_, Count}, []} when C == squery; C == equery ->
                  {C, {_, Count}, []} when C == squery; C == equery ->
-                     add_result(State, {ok, Count});
+                     add_result(State, Notice, {ok, Count});
                  {C, {_, Count}, _} when C == squery; C == equery ->
                  {C, {_, Count}, _} when C == squery; C == equery ->
-                     add_result(State, {ok, Count, get_columns(State), Rows});
+                     add_result(State, Notice, {ok, Count, get_columns(State), Rows});
                  {C, _, _} when C == squery; C == equery ->
                  {C, _, _} when C == squery; C == equery ->
-                     add_result(State, {ok, get_columns(State), Rows})
+                     add_result(State, Notice, {ok, get_columns(State), Rows})
              end,
              end,
     {noreply, State2};
     {noreply, State2};
 
 
 %% EmptyQueryResponse
 %% EmptyQueryResponse
 on_message({$I, _Bin}, State) ->
 on_message({$I, _Bin}, State) ->
+    Notice = {complete, empty},
     State2 = case command_tag(State) of
     State2 = case command_tag(State) of
                  execute ->
                  execute ->
-                     reply(State, {ok, [], []});
+                     finish(State, Notice, {ok, [], []});
                  C when C == squery; C == equery ->
                  C when C == squery; C == equery ->
-                     add_result(State, {ok, [], []})
+                     add_result(State, Notice, {ok, [], []})
              end,
              end,
     {noreply, State2};
     {noreply, State2};
 
 
@@ -534,24 +555,24 @@ on_message({$Z, <<Status:8>>}, State) ->
                  squery ->
                  squery ->
                      case State#state.results of
                      case State#state.results of
                          [Result] ->
                          [Result] ->
-                             reply(State, Result);
+                             finish(State, done, Result);
                          Results ->
                          Results ->
-                             reply(State, lists:reverse(Results))
+                             finish(State, done, lists:reverse(Results))
                      end;
                      end;
                  equery ->
                  equery ->
                      [Result] = State#state.results,
                      [Result] = State#state.results,
-                     reply(State, Result);
+                     finish(State, done, Result);
                  sync ->
                  sync ->
-                     reply(State, ok)
+                     finish(State, ok)
              end,
              end,
     {noreply, State2#state{txstatus = Status}};
     {noreply, State2#state{txstatus = Status}};
 
 
 on_message(Error = {error, _}, State) ->
 on_message(Error = {error, _}, State) ->
     State2 = case command_tag(State) of
     State2 = case command_tag(State) of
                  C when C == squery; C == equery ->
                  C when C == squery; C == equery ->
-                     add_result(State, Error);
+                     add_result(State, Error, Error);
                  _ ->
                  _ ->
-                     sync_required(reply(State, Error))
+                     sync_required(finish(State, Error))
              end,
              end,
     {noreply, State2};
     {noreply, State2};