Browse Source

handle_cast for bind, execute, describe, close, sync; pgsql:receive_atom

Anton Lebedevich 13 years ago
parent
commit
2872a91221
2 changed files with 61 additions and 6 deletions
  1. 16 2
      src/pgsql.erl
  2. 45 4
      src/pgsql_sock.erl

+ 16 - 2
src/pgsql.erl

@@ -77,7 +77,8 @@ bind(C, Statement, Parameters) ->
     bind(C, Statement, "", Parameters).
     bind(C, Statement, "", Parameters).
 
 
 bind(C, Statement, PortalName, Parameters) ->
 bind(C, Statement, PortalName, Parameters) ->
-    pgsql_sock:bind(C, Statement, PortalName, Parameters).
+    Ref = pgsql_sock:bind(C, Statement, PortalName, Parameters),
+    receive_atom(C, Ref, ok, ok).
 
 
 %% execute
 %% execute
 
 
@@ -102,6 +103,7 @@ describe(C, statement, Name) ->
     receive_describe(C, Ref, #statement{name = Name});
     receive_describe(C, Ref, #statement{name = Name});
 
 
 describe(C, Type, Name) ->
 describe(C, Type, Name) ->
+    %% TODO unknown result format of Describe portal
     pgsql_sock:describe(C, Type, Name).
     pgsql_sock:describe(C, Type, Name).
 
 
 close(C, #statement{name = Name}) ->
 close(C, #statement{name = Name}) ->
@@ -111,7 +113,8 @@ close(C, Type, Name) ->
     pgsql_sock:close(C, Type, Name).
     pgsql_sock:close(C, Type, Name).
 
 
 sync(C) ->
 sync(C) ->
-    pgsql_sock:sync(C).
+    Ref = pgsql_sock:sync(C),
+    receive_atom(C, Ref, done, ok).
 
 
 %% misc helper functions
 %% misc helper functions
 with_transaction(C, F) ->
 with_transaction(C, F) ->
@@ -122,6 +125,7 @@ with_transaction(C, F) ->
     catch
     catch
         _:Why ->
         _:Why ->
             squery(C, "ROLLBACK"),
             squery(C, "ROLLBACK"),
+            %% TODO hides error stacktrace
             {rollback, Why}
             {rollback, Why}
     end.
     end.
 
 
@@ -201,3 +205,13 @@ receive_describe(C, Ref, Statement = #statement{}) ->
         {'EXIT', C, _Reason} ->
         {'EXIT', C, _Reason} ->
             {error, closed}
             {error, closed}
     end.
     end.
+
+receive_atom(C, Ref, Receive, Return) ->
+    receive
+        {Ref, Receive} ->
+            Return;
+        {Ref, Error = {error, _}} ->
+            Error;
+        {'EXIT', C, _Reason} ->
+            {error, closed}
+    end.

+ 45 - 4
src/pgsql_sock.erl

@@ -125,12 +125,12 @@ handle_cast(stop, #state{queue = Q} = State) ->
     %% TODO flush queue
     %% TODO flush queue
     {stop, normal, State};
     {stop, normal, State};
 
 
-handle_cast(Req = {{_, _}, {squery, Sql}}, State) ->
+handle_cast(Req = {_, {squery, Sql}}, State) ->
     #state{queue = Q} = State,
     #state{queue = Q} = State,
     send(State, $Q, [Sql, 0]),
     send(State, $Q, [Sql, 0]),
     {noreply, State#state{queue = queue:in(Req, Q)}};
     {noreply, State#state{queue = queue:in(Req, Q)}};
 
 
-handle_cast(Req = {{_,_}, {equery, Statement, Parameters}}, State) ->
+handle_cast(Req = {_, {equery, Statement, Parameters}}, State) ->
     #state{queue = Q} = State,
     #state{queue = Q} = State,
     #statement{name = StatementName, columns = Columns} = Statement,
     #statement{name = StatementName, columns = Columns} = Statement,
     Bin1 = pgsql_wire:encode_parameters(Parameters),
     Bin1 = pgsql_wire:encode_parameters(Parameters),
@@ -143,7 +143,7 @@ handle_cast(Req = {{_,_}, {equery, Statement, Parameters}}, State) ->
     send(State, $S, []),
     send(State, $S, []),
     {noreply, State#state{queue = queue:in(Req, Q)}};
     {noreply, State#state{queue = queue:in(Req, Q)}};
 
 
-handle_cast(Req = {{_, _}, {parse, Name, Sql, Types}}, State) ->
+handle_cast(Req = {_, {parse, Name, Sql, Types}}, State) ->
     #state{queue = Q} = State,
     #state{queue = Q} = State,
     Bin = pgsql_wire:encode_types(Types),
     Bin = pgsql_wire:encode_types(Types),
     send(State, $P, [Name, 0, Sql, 0, Bin]),
     send(State, $P, [Name, 0, Sql, 0, Bin]),
@@ -151,6 +151,47 @@ handle_cast(Req = {{_, _}, {parse, Name, Sql, Types}}, State) ->
     send(State, $H, []),
     send(State, $H, []),
     {noreply, State#state{queue = queue:in(Req, Q)}};
     {noreply, State#state{queue = queue:in(Req, Q)}};
 
 
+handle_cast(Req = {_, {bind, Statement, PortalName, Parameters}}, State) ->
+    #state{queue = Q} = State,
+    #statement{name = StatementName, columns = Columns, types = Types} = Statement,
+    Typed_Parameters = lists:zip(Types, Parameters),
+    Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
+    Bin2 = pgqsl_wire:encode_formats(Columns),
+    send(State, $B, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
+    send(State, $H, []),
+    {noreply, State#state{queue = queue:in(Req, Q)}};
+
+handle_cast(Req = {_, {execute, _, PortalName, MaxRows}}, State) ->
+    #state{queue = Q} = State,
+    send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
+    send(State, $H, []),
+    {noreply, State#state{queue = queue:in(Req, Q)}};
+
+handle_cast(Req = {_, {describe, Type, Name}}, State) ->
+    #state{queue = Q} = State,
+    case Type of
+        statement -> Type2 = $S;
+        portal    -> Type2 = $P
+    end,
+    send(State, $D, [Type2, Name, 0]),
+    send(State, $H, []),
+    {noreply, State#state{queue = queue:in(Req, Q)}};
+
+handle_cast(Req = {_, {close, Type, Name}}, State) ->
+    #state{queue = Q} = State,
+    case Type of
+        statement -> Type2 = $S;
+        portal    -> Type2 = $P
+    end,
+    send(State, $C, [Type2, Name, 0]),
+    send(State, $H, []),
+    {noreply, State#state{queue = queue:in(Req, Q)}};
+
+handle_cast(Req = {_, sync}, State) ->
+    #state{queue = Q} = State,
+    send(State, $S, []),
+    {noreply, State#state{queue = queue:in(Req, Q)}};
+
 handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
 handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
     {ok, {Addr, Port}} = inet:peername(State#state.sock),
     {ok, {Addr, Port}} = inet:peername(State#state.sock),
     SockOpts = [{active, false}, {packet, raw}, binary],
     SockOpts = [{active, false}, {packet, raw}, binary],
@@ -239,7 +280,7 @@ notify_async(#state{async = Pid}, Msg) ->
         false -> false
         false -> false
     end.
     end.
 
 
-request_tag(#state{queue = Q} = State) ->
+request_tag(#state{queue = Q}) ->
     {_, Req} = queue:get(Q),
     {_, Req} = queue:get(Q),
     element(1, Req).
     element(1, Req).