Browse Source

extract cast api to apgsql, use gen_server:call in pgsql

Anton Lebedevich 13 years ago
parent
commit
974a6a201b
3 changed files with 90 additions and 86 deletions
  1. 69 0
      src/apgsql.erl
  2. 20 39
      src/pgsql.erl
  3. 1 47
      src/pgsql_sock.erl

+ 69 - 0
src/apgsql.erl

@@ -0,0 +1,69 @@
+%%% Copyright (C) 2011 - Anton Lebedevich.  All rights reserved.
+
+-module(apgsql).
+
+-export([start_link/0,
+         connect/5,
+         close/1,
+         get_parameter/2,
+         squery/2,
+         equery/3,
+         parse/4,
+         bind/4,
+         execute/4,
+         describe/3,
+         close/3,
+         sync/1,
+         cancel/1]).
+
+%% -- client interface --
+
+start_link() ->
+    pgqsq_sock:start_link().
+
+connect(C, Host, Username, Password, Opts) ->
+    cast(C, {connect, Host, Username, Password, Opts}).
+
+close(C) ->
+    pgsql_sock:close(C).
+
+get_parameter(C, Name) ->
+    pgsql_sock:get_parameter(C, Name).
+
+squery(C, Sql) ->
+    cast(C, {squery, Sql}).
+
+equery(C, Statement, Parameters) ->
+    cast(C, {equery, Statement, Parameters}).
+
+parse(C, Name, Sql, Types) ->
+    cast(C, {parse, Name, Sql, Types}).
+
+bind(C, Statement, PortalName, Parameters) ->
+    cast(C, {bind, Statement, PortalName, Parameters}).
+
+execute(C, Statement, PortalName, MaxRows) ->
+    cast(C, {execute, Statement, PortalName, MaxRows}).
+
+describe(C, statement, Name) ->
+    cast(C, {describe_statement, Name});
+
+describe(C, portal, Name) ->
+    cast(C, {describe_portal, Name}).
+
+close(C, Type, Name) ->
+    cast(C, {close, Type, Name}).
+
+sync(C) ->
+    cast(C, sync).
+
+cancel(C) ->
+    pgsql_sock:cancel(C).
+
+
+%% -- internal functions --
+
+cast(C, Command) ->
+    Ref = make_ref(),
+    gen_server:cast(C, {{self(), Ref}, Command}),
+    Ref.

+ 20 - 39
src/pgsql.erl

@@ -9,7 +9,7 @@
 -export([bind/3, bind/4, execute/2, execute/3, execute/4]).
 -export([bind/3, bind/4, execute/2, execute/3, execute/4]).
 -export([close/2, close/3, sync/1]).
 -export([close/2, close/3, sync/1]).
 -export([with_transaction/2]).
 -export([with_transaction/2]).
--export([receive_result/2, sync_on_error/2]).
+-export([sync_on_error/2]).
 
 
 -include("pgsql.hrl").
 -include("pgsql.hrl").
 
 
@@ -23,15 +23,14 @@ connect(Host, Username, Opts) ->
 
 
 connect(Host, Username, Password, Opts) ->
 connect(Host, Username, Password, Opts) ->
     {ok, C} = pgsql_sock:start_link(),
     {ok, C} = pgsql_sock:start_link(),
-    Ref = pgsql_sock:connect(C, Host, Username, Password, Opts),
     %% TODO connect timeout
     %% TODO connect timeout
-    receive
-        {Ref, connected} ->
+    case gen_server:call(C,
+                         {connect, Host, Username, Password, Opts},
+                         infinity) of
+        connected ->
             {ok, C};
             {ok, C};
-        {Ref, Error = {error, _}} ->
-            Error;
-        {'EXIT', C, _Reason} ->
-            {error, closed}
+        Error = {error, _} ->
+            Error
     end.
     end.
 
 
 close(C) ->
 close(C) ->
@@ -41,8 +40,7 @@ get_parameter(C, Name) ->
     pgsql_sock:get_parameter(C, Name).
     pgsql_sock:get_parameter(C, Name).
 
 
 squery(C, Sql) ->
 squery(C, Sql) ->
-    Ref = pgsql_sock:squery(C, Sql),
-    receive_result(C, Ref).
+    gen_server:call(C, {squery, Sql}, infinity).
 
 
 equery(C, Sql) ->
 equery(C, Sql) ->
     equery(C, Sql, []).
     equery(C, Sql, []).
@@ -52,8 +50,7 @@ equery(C, Sql, Parameters) ->
     case parse(C, Sql) of
     case parse(C, Sql) of
         {ok, #statement{types = Types} = S} ->
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
             Typed_Parameters = lists:zip(Types, Parameters),
-            Ref = pgsql_sock:equery(C, S, Typed_Parameters),
-            receive_result(C, Ref);
+            gen_server:call(C, {equery, S, Typed_Parameters}, infinity);
         Error ->
         Error ->
             Error
             Error
     end.
     end.
@@ -67,8 +64,7 @@ parse(C, Sql, Types) ->
     parse(C, "", Sql, Types).
     parse(C, "", Sql, Types).
 
 
 parse(C, Name, Sql, Types) ->
 parse(C, Name, Sql, Types) ->
-    Ref = pgsql_sock:parse(C, Name, Sql, Types),
-    sync_on_error(C, receive_result(C, Ref)).
+    sync_on_error(C, gen_server:call(C, {parse, Name, Sql, Types}, infinity)).
 
 
 %% bind
 %% bind
 
 
@@ -76,8 +72,9 @@ bind(C, Statement, Parameters) ->
     bind(C, Statement, "", Parameters).
     bind(C, Statement, "", Parameters).
 
 
 bind(C, Statement, PortalName, Parameters) ->
 bind(C, Statement, PortalName, Parameters) ->
-    Ref = pgsql_sock:bind(C, Statement, PortalName, Parameters),
-    sync_on_error(C, receive_result(C, Ref)).
+    sync_on_error(
+      C,
+      gen_server:call(C, {bind, Statement, PortalName, Parameters}, infinity)).
 
 
 %% execute
 %% execute
 
 
@@ -88,29 +85,25 @@ execute(C, S, N) ->
     execute(C, S, "", N).
     execute(C, S, "", N).
 
 
 execute(C, S, PortalName, N) ->
 execute(C, S, PortalName, N) ->
-    Ref = pgsql_sock:execute(C, S, PortalName, N),
-    receive_result(C, Ref).
+    gen_server:call(C, {execute, S, PortalName, N}, infinity).
 
 
 %% statement/portal functions
 %% statement/portal functions
 
 
 describe(C, #statement{name = Name}) ->
 describe(C, #statement{name = Name}) ->
     describe(C, statement, Name).
     describe(C, statement, Name).
 
 
+%% TODO unknown result format of Describe portal
 describe(C, Type, Name) ->
 describe(C, Type, Name) ->
-    Ref = pgsql_sock:describe(C, Type, Name),
-    %% TODO unknown result format of Describe portal
-    sync_on_error(C, receive_result(C, Ref)).
+    sync_on_error(C, gen_server:call(C, {describe, Type, Name}, infinity)).
 
 
 close(C, #statement{name = Name}) ->
 close(C, #statement{name = Name}) ->
     close(C, statement, Name).
     close(C, statement, Name).
 
 
 close(C, Type, Name) ->
 close(C, Type, Name) ->
-    Ref = pgsql_sock:close(C, Type, Name),
-    receive_result(C, Ref).
+    gen_server:call(C, {close, Type, Name}).
 
 
 sync(C) ->
 sync(C) ->
-    Ref = pgsql_sock:sync(C),
-    receive_result(C, Ref).
+    gen_server:call(C, sync).
 
 
 %% misc helper functions
 %% misc helper functions
 with_transaction(C, F) ->
 with_transaction(C, F) ->
@@ -125,20 +118,8 @@ with_transaction(C, F) ->
             {rollback, Why}
             {rollback, Why}
     end.
     end.
 
 
-receive_result(C, Ref) ->
-    %% TODO timeout
-    receive
-        {Ref, Result} ->
-            Result;
-        %% TODO no 'EXIT' for not linked processes
-        {'EXIT', C, _Reason} ->
-            {error, closed}
-    end.
-
-sync_on_error(C, Error = {error, _}) ->
-    Ref = pgsql_sock:sync(C),
-    receive_result(C, Ref),
-    Error;
+sync_on_error(C, {error, _}) ->
+    sync(C);
 
 
 sync_on_error(_C, R) ->
 sync_on_error(_C, R) ->
     R.
     R.

+ 1 - 47
src/pgsql_sock.erl

@@ -6,17 +6,8 @@
 -behavior(gen_server).
 -behavior(gen_server).
 
 
 -export([start_link/0,
 -export([start_link/0,
-         connect/5,
          close/1,
          close/1,
          get_parameter/2,
          get_parameter/2,
-         squery/2,
-         equery/3,
-         parse/4,
-         bind/4,
-         execute/4,
-         describe/3,
-         close/3,
-         sync/1,
          cancel/1]).
          cancel/1]).
 
 
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -48,10 +39,6 @@
 start_link() ->
 start_link() ->
     gen_server:start_link(?MODULE, [], []).
     gen_server:start_link(?MODULE, [], []).
 
 
-connect(C, Host, Username, Password, Opts) ->
-    cast(C, {connect, Host, Username, Password, Opts}).
-
-%% TODO extract API functions
 close(C) when is_pid(C) ->
 close(C) when is_pid(C) ->
     catch gen_server:cast(C, stop),
     catch gen_server:cast(C, stop),
     ok.
     ok.
@@ -59,33 +46,6 @@ close(C) when is_pid(C) ->
 get_parameter(C, Name) ->
 get_parameter(C, Name) ->
     gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
     gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
 
 
-squery(C, Sql) ->
-    cast(C, {squery, Sql}).
-
-equery(C, Statement, Parameters) ->
-    cast(C, {equery, Statement, Parameters}).
-
-parse(C, Name, Sql, Types) ->
-    cast(C, {parse, Name, Sql, Types}).
-
-bind(C, Statement, PortalName, Parameters) ->
-    cast(C, {bind, Statement, PortalName, Parameters}).
-
-execute(C, Statement, PortalName, MaxRows) ->
-    cast(C, {execute, Statement, PortalName, MaxRows}).
-
-describe(C, statement, Name) ->
-    cast(C, {describe_statement, Name});
-
-describe(C, portal, Name) ->
-    cast(C, {describe_portal, Name}).
-
-close(C, Type, Name) ->
-    cast(C, {close, Type, Name}).
-
-sync(C) ->
-    cast(C, sync).
-
 cancel(S) ->
 cancel(S) ->
     gen_server:cast(S, cancel).
     gen_server:cast(S, cancel).
 
 
@@ -145,11 +105,6 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 %% -- internal functions --
 %% -- internal functions --
 
 
-cast(C, Command) ->
-    Ref = make_ref(),
-    gen_server:cast(C, {{self(), Ref}, Command}),
-    Ref.
-
 command(Command, State = #state{sync_required = true})
 command(Command, State = #state{sync_required = true})
   when Command /= sync ->
   when Command /= sync ->
     {noreply, finish(State, {error, sync_required})};
     {noreply, finish(State, {error, sync_required})};
@@ -214,8 +169,7 @@ command({bind, Statement, PortalName, Parameters}, State) ->
     send(State, $H, []),
     send(State, $H, []),
     {noreply, State};
     {noreply, State};
 
 
-%% TODO unused parameter?
-command({execute, _, PortalName, MaxRows}, State) ->
+command({execute, _Statement, PortalName, MaxRows}, State) ->
     send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
     send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
     send(State, $H, []),
     send(State, $H, []),
     {noreply, State};
     {noreply, State};