|
@@ -134,9 +134,8 @@ handle_cast(Req = {_, {connect, Host, Username, Password, Opts}}, State) ->
|
|
|
queue = queue:in(Req, Q),
|
|
|
async = Async}};
|
|
|
|
|
|
-handle_cast(stop, #state{queue = Q} = State) ->
|
|
|
- %% TODO flush queue
|
|
|
- {stop, normal, State};
|
|
|
+handle_cast(stop, State) ->
|
|
|
+ {stop, normal, flush_queue(State, {error, stop})};
|
|
|
|
|
|
handle_cast(Req = {_, {squery, Sql}}, State) ->
|
|
|
#state{queue = Q} = State,
|
|
@@ -220,13 +219,12 @@ handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
|
|
|
|
|
|
handle_info({Closed, Sock}, #state{sock = Sock} = State)
|
|
|
when Closed == tcp_closed; Closed == ssl_closed ->
|
|
|
- %% TODO flush queue
|
|
|
- {stop, sock_closed, State};
|
|
|
+ {stop, sock_closed, flush_queue(State, {error, sock_closed})};
|
|
|
|
|
|
handle_info({Error, Sock, Reason}, #state{sock = Sock} = State)
|
|
|
when Error == tcp_error; Error == ssl_error ->
|
|
|
- %% TODO flush queue
|
|
|
- {stop, {sock_error, Reason}, State};
|
|
|
+ Why = {sock_error, Reason},
|
|
|
+ {stop, Why, flush_queue(State, {error, Why})};
|
|
|
|
|
|
handle_info({_, Sock, Data2}, #state{data = Data, sock = Sock} = State) ->
|
|
|
loop(State#state{data = <<Data/binary, Data2/binary>>}).
|
|
@@ -340,6 +338,13 @@ sync_required(#state{queue = Q} = State) ->
|
|
|
State#state{sync_required = true}
|
|
|
end.
|
|
|
|
|
|
+flush_queue(#state{queue = Q} = State, Error) ->
|
|
|
+ case queue:is_empty(Q) of
|
|
|
+ false ->
|
|
|
+ flush_queue(reply(State, Error), Error);
|
|
|
+ true -> State
|
|
|
+ end.
|
|
|
+
|
|
|
to_binary(B) when is_binary(B) -> B;
|
|
|
to_binary(L) when is_list(L) -> list_to_binary(L).
|
|
|
|