Browse Source

sync_required

Anton Lebedevich 13 years ago
parent
commit
43baf6666a
1 changed files with 24 additions and 4 deletions
  1. 24 4
      src/pgsql_sock.erl

+ 24 - 4
src/pgsql_sock.erl

@@ -36,6 +36,7 @@
                 async,
                 parameters = [],
                 columns,
+                sync_required,
                 txstatus}).
 
 %% -- client interface --
@@ -92,8 +93,13 @@ handle_call({get_parameter, Name}, _From, State) ->
     end,
     {reply, {ok, Value}, State}.
 
-handle_cast(Req = {{_, _}, {connect, Host, Username, Password, Opts}},
-            #state{queue = Q} = State) ->
+handle_cast({{From, Ref}, Command}, State = #state{sync_required = true})
+  when Command /= sync ->
+    From ! {Ref, {error, sync_required}},
+    {noreply, State};
+
+handle_cast(Req = {_, {connect, Host, Username, Password, Opts}}, State) ->
+    #state{queue = Q} = State,
     Timeout = proplists:get_value(timeout, Opts, 5000),
     Port = proplists:get_value(port, Opts, 5432),
     SockOpts = [{active, false}, {packet, raw}, binary, {nodelay, true}],
@@ -190,7 +196,7 @@ handle_cast(Req = {_, {close, Type, Name}}, State) ->
 handle_cast(Req = {_, sync}, State) ->
     #state{queue = Q} = State,
     send(State, $S, []),
-    {noreply, State#state{queue = queue:in(Req, Q)}};
+    {noreply, State#state{queue = queue:in(Req, Q), sync_required = false}};
 
 handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
     {ok, {Addr, Port}} = inet:peername(State#state.sock),
@@ -457,7 +463,7 @@ on_message(Error = {error, _}, State) ->
                  C when C == squery; C == equery ->
                      State;
                  _ ->
-                     State#state{queue = queue:drop(Q)}
+                     sync_required(State#state{queue = queue:drop(Q)})
              end,
     {noreply, State2};
 
@@ -483,6 +489,20 @@ on_message({$A, <<Pid:?int32, Strings/binary>>}, State) ->
     notify_async(State, {notification, Channel, Pid, Payload}),
     {noreply, State}.
 
+sync_required(#state{queue = Q} = State) ->
+    case queue:is_empty(Q) of
+        false ->
+            case request_tag(State) of
+                T when T == sync ->
+                    State;
+                _ ->
+                    notify(State, {error, sync_required}),
+                    sync_required(State#state{queue = queue:drop(Q)})
+            end;
+        true ->
+            State#state{sync_required = true}
+    end.
+
 to_binary(B) when is_binary(B) -> B;
 to_binary(L) when is_list(L)   -> list_to_binary(L).