Просмотр исходного кода

handle_cast with From, Ref, implement get_parameter

Anton Lebedevich 13 лет назад
Родитель
Сommit
55f94ebe17
1 измененных файлов с 54 добавлено и 47 удалено
  1. 54 47
      src/pgsql_sock.erl

+ 54 - 47
src/pgsql_sock.erl

@@ -5,16 +5,17 @@
 -behavior(gen_server).
 
 -export([start_link/0,
-         connect/5,
+         connect/7,
          close/1,
          get_parameter/2,
-         squery/2,
-         parse/4,
-         bind/4,
-         execute/4,
-         describe/3,
-         sync/1,
-         close/3,
+         squery/4,
+         equery/5,
+         parse/6,
+         bind/6,
+         execute/6,
+         describe/5,
+         close/5,
+         sync/3,
          cancel/1]).
 
 -export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -33,7 +34,6 @@
                 handler,
                 queue = queue:new(),
                 async,
-                ready,
                 parameters = [],
                 txstatus}).
 
@@ -42,39 +42,39 @@
 start_link() ->
     gen_server:start_link(?MODULE, [], []).
 
-connect(C, Host, Username, Password, Opts) ->
-    gen_server:cast(C, {connect, Host, Username, Password, Opts}).
+connect(C, From, Ref, Host, Username, Password, Opts) ->
+    gen_server:cast(C, {connect, From, Ref, Host, Username, Password, Opts}).
 
 close(C) when is_pid(C) ->
     catch gen_server:cast(C, stop),
     ok.
 
 get_parameter(C, Name) ->
-    gen_server:call(C, {get_parameter, to_binary(Name)}).
+    gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
 
-squery(C, Sql) ->
-    gen_server:cast(C, {squery, Sql}).
+squery(C, From, Ref, Sql) ->
+    gen_server:cast(C, {squery, From, Ref, Sql}).
 
-equery(C, Statement, Parameters) ->
-    gen_server:cast(C, {equery, Statement, Parameters}).
+equery(C, From, Ref, Statement, Parameters) ->
+    gen_server:cast(C, {equery, From, Ref, Statement, Parameters}).
 
-parse(C, Name, Sql, Types) ->
-    gen_server:cast(C, {parse, Name, Sql, Types}).
+parse(C, From, Ref, Name, Sql, Types) ->
+    gen_server:cast(C, {parse, From, Ref, Name, Sql, Types}).
 
-bind(C, Statement, PortalName, Parameters) ->
-    gen_server:cast(C, {bind, Statement, PortalName, Parameters}).
+bind(C, From, Ref, Statement, PortalName, Parameters) ->
+    gen_server:cast(C, {bind, From, Ref, Statement, PortalName, Parameters}).
 
-execute(C, Statement, PortalName, MaxRows) ->
-    gen_server:cast(C, {execute, Statement, PortalName, MaxRows}).
+execute(C, From, Ref, Statement, PortalName, MaxRows) ->
+    gen_server:cast(C, {execute, From, Ref, Statement, PortalName, MaxRows}).
 
-describe(C, Type, Name) ->
-    gen_server:cast(C, {describe, Type, Name}).
+describe(C, From, Ref, Type, Name) ->
+    gen_server:cast(C, {describe, From, Ref, Type, Name}).
 
-close(C, Type, Name) ->
-    gen_server:cast(C, {close, Type, Name}).
+close(C, From, Ref, Type, Name) ->
+    gen_server:cast(C, {close, From, Ref, Type, Name}).
 
-sync(C) ->
-    gen_server:cast(C, sync).
+sync(C, From, Ref) ->
+    gen_server:cast(C, {sync, From, Ref}).
 
 cancel(S) ->
     gen_server:cast(S, cancel).
@@ -84,8 +84,14 @@ cancel(S) ->
 init([]) ->
     {ok, #state{}}.
 
-handle_call({connect, Host, Username, Password, Opts},
-            From,
+handle_call({get_parameter, Name}, From, State) ->
+    case lists:keysearch(Name, 1, State#state.parameters) of
+        {value, {Name, Value}} -> Value;
+        false                  -> Value = undefined
+    end,
+    {reply, {ok, Value}, State}.
+
+handle_cast({connect, From, Ref, Host, Username, Password, Opts},
             #state{queue = Queue} = State) ->
     Timeout = proplists:get_value(timeout, Opts, 5000),
     Port = proplists:get_value(port, Opts, 5432),
@@ -111,22 +117,21 @@ handle_call({connect, Host, Username, Password, Opts},
     put(password, Password),
     {noreply,
      State2#state{handler = auth,
-                  queue = queue:in(From, Queue),
+                  queue = queue:in({connect, From, Ref}, Queue),
                   async = Async}};
 
-handle_call(stop, From, #state{queue = Queue} = State) ->
+handle_cast(stop, #state{queue = Queue} = State) ->
     %% TODO flush queue
-    {stop, normal, ok, State};
+    {stop, normal, State};
 
-handle_call({parse, Name, Sql, Types}, From, State) ->
+handle_cast({parse, From, Ref, Name, Sql, Types}, State) ->
     #state{queue = Queue} = State,
     Bin = pgsql_wire:encode_types(Types),
     send(State, $P, [Name, 0, Sql, 0, Bin]),
     send(State, $D, [$S, Name, 0]),
     send(State, $H, []),
     S = #statement{name = Name},
-    State2 = State#state{queue = queue:in(From, Queue)},
-    {noreply, State2}.
+    {noreply, State#state{queue = queue:in({parse, From, Ref}, Queue)}};
 
 handle_cast(cancel, State = #state{backend = {Pid, Key}}) ->
     {ok, {Addr, Port}} = inet:peername(State#state.sock),
@@ -199,8 +204,9 @@ send(#state{mod = Mod, sock = Sock}, Data) ->
 send(#state{mod = Mod, sock = Sock}, Type, Data) ->
     Mod:send(Sock, pgsql_wire:encode(Type, Data)).
 
-gen_reply(#state{queue = Q} = State, Message) ->
-    gen_server:reply(queue:get(Q), Message).
+notify(#state{queue = Q} = State, Message) ->
+    {_, From, Ref} = queue:get(Q),
+    From ! {Ref, Message}.
 
 notify_async(#state{async = Pid}, Msg) ->
     case is_pid(Pid) of
@@ -235,9 +241,8 @@ auth({$R, <<M:?int32, _/binary>>}, State) ->
         8 -> Method = sspi;
         _ -> Method = unknown
     end,
-    {stop,
-     normal,
-     gen_reply(State, {error, {unsupported_auth_method, Method}})};
+    notify(State, {error, {unsupported_auth_method, Method}}),
+    {stop, normal, State};
 
 %% ErrorResponse
 auth({error, E}, State) ->
@@ -246,7 +251,8 @@ auth({error, E}, State) ->
         <<"28P01">> -> Why = invalid_password;
         Any         -> Why = Any
     end,
-    {stop, normal, gen_reply(State, {error, Why})};
+    notify(State, {error, Why}),
+    {stop, normal, State};
 
 auth(Other, State) ->
     on_message(Other, State).
@@ -258,7 +264,7 @@ initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
 
 %% ReadyForQuery
 initializing({$Z, <<Status:8>>}, State) ->
-    #state{parameters = Parameters} = State,
+    #state{parameters = Parameters, queue = Queue} = State,
     erase(username),
     erase(password),
     %% TODO decode dates to now() format
@@ -266,13 +272,14 @@ initializing({$Z, <<Status:8>>}, State) ->
         {value, {_, <<"on">>}}  -> put(datetime_mod, pgsql_idatetime);
         {value, {_, <<"off">>}} -> put(datetime_mod, pgsql_fdatetime)
     end,
-    State2 = State#state{handler = on_message,
+    notify(State, ok),
+    {noreply, State#state{handler = on_message,
                          txstatus = Status,
-                         ready = true},
-    {noreply, gen_reply(State2, {ok, self()})};
+                         queue = queue:drop(Queue)}};
 
 initializing({error, _} = Error, State) ->
-    {stop, normal, gen_reply(State, Error)};
+    notify(State, Error), 
+    {stop, normal, State};
 
 initializing(Other, State) ->
     on_message(Other, State).