Browse Source

More internal typespecs; 'epgsql_sock:command*' functions

Сергей Прохоров 7 years ago
parent
commit
fed889b863
5 changed files with 73 additions and 36 deletions
  1. 17 16
      src/epgsql.erl
  2. 10 0
      src/epgsql_binary.erl
  3. 44 14
      src/epgsql_sock.erl
  4. 1 3
      src/epgsqla.erl
  5. 1 3
      src/epgsqli.erl

+ 17 - 16
src/epgsql.erl

@@ -157,9 +157,8 @@ connect(Host, Username, Password, Opts) ->
 connect(C, Host, Username, Password, Opts0) ->
 connect(C, Host, Username, Password, Opts0) ->
     Opts = to_proplist(Opts0),
     Opts = to_proplist(Opts0),
     %% TODO connect timeout
     %% TODO connect timeout
-    case gen_server:call(C,
-                         {connect, Host, Username, Password, Opts},
-                         infinity) of
+    case epgsql_sock:sync_command(
+           C, {connect, Host, Username, Password, Opts}) of
         connected ->
         connected ->
             case proplists:get_value(replication, Opts, undefined) of
             case proplists:get_value(replication, Opts, undefined) of
                 undefined ->
                 undefined ->
@@ -216,7 +215,7 @@ get_cmd_status(C) ->
 -spec squery(connection(), sql_query()) -> reply(squery_row()) | [reply(squery_row())].
 -spec squery(connection(), sql_query()) -> reply(squery_row()) | [reply(squery_row())].
 %% @doc runs simple `SqlQuery' via given `Connection'
 %% @doc runs simple `SqlQuery' via given `Connection'
 squery(Connection, SqlQuery) ->
 squery(Connection, SqlQuery) ->
-    gen_server:call(Connection, {squery, SqlQuery}, infinity).
+    epgsql_sock:sync_command(Connection, {squery, SqlQuery}).
 
 
 equery(C, Sql) ->
 equery(C, Sql) ->
     equery(C, Sql, []).
     equery(C, Sql, []).
@@ -226,7 +225,7 @@ equery(C, Sql, Parameters) ->
     case parse(C, "", Sql, []) of
     case parse(C, "", Sql, []) of
         {ok, #statement{types = Types} = S} ->
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
             Typed_Parameters = lists:zip(Types, Parameters),
-            gen_server:call(C, {equery, S, Typed_Parameters}, infinity);
+            epgsql_sock:sync_command(C, {equery, S, Typed_Parameters});
         Error ->
         Error ->
             Error
             Error
     end.
     end.
@@ -236,7 +235,7 @@ equery(C, Name, Sql, Parameters) ->
     case parse(C, Name, Sql, []) of
     case parse(C, Name, Sql, []) of
         {ok, #statement{types = Types} = S} ->
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
             Typed_Parameters = lists:zip(Types, Parameters),
-            gen_server:call(C, {equery, S, Typed_Parameters}, infinity);
+            epgsql_sock:sync_command(C, {equery, S, Typed_Parameters});
         Error ->
         Error ->
             Error
             Error
     end.
     end.
@@ -246,7 +245,7 @@ prepared_query(C, Name, Parameters) ->
     case describe(C, statement, Name) of
     case describe(C, statement, Name) of
         {ok, #statement{types = Types} = S} ->
         {ok, #statement{types = Types} = S} ->
             Typed_Parameters = lists:zip(Types, Parameters),
             Typed_Parameters = lists:zip(Types, Parameters),
-            gen_server:call(C, {prepared_query, S, Typed_Parameters}, infinity);
+            epgsql_sock:sync_command(C, {prepared_query, S, Typed_Parameters});
         Error ->
         Error ->
             Error
             Error
     end.
     end.
@@ -263,7 +262,7 @@ parse(C, Sql, Types) ->
 -spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
 -spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
                    {ok, #statement{}} | {error, query_error()}.
                    {ok, #statement{}} | {error, query_error()}.
 parse(C, Name, Sql, Types) ->
 parse(C, Name, Sql, Types) ->
-    sync_on_error(C, gen_server:call(C, {parse, Name, Sql, Types}, infinity)).
+    sync_on_error(C, epgsql_sock:sync_command(C, {parse, Name, Sql, Types})).
 
 
 %% bind
 %% bind
 
 
@@ -275,7 +274,7 @@ bind(C, Statement, Parameters) ->
 bind(C, Statement, PortalName, Parameters) ->
 bind(C, Statement, PortalName, Parameters) ->
     sync_on_error(
     sync_on_error(
       C,
       C,
-      gen_server:call(C, {bind, Statement, PortalName, Parameters}, infinity)).
+      epgsql_sock:sync_command(C, {bind, Statement, PortalName, Parameters})).
 
 
 %% execute
 %% execute
 
 
@@ -292,11 +291,11 @@ execute(C, S, N) ->
              | {ok, non_neg_integer(), [equery_row()]}
              | {ok, non_neg_integer(), [equery_row()]}
              | {error, query_error()}.
              | {error, query_error()}.
 execute(C, S, PortalName, N) ->
 execute(C, S, PortalName, N) ->
-    gen_server:call(C, {execute, S, PortalName, N}, infinity).
+    epgsql_sock:sync_command(C, {execute, S, PortalName, N}).
 
 
 -spec execute_batch(connection(), [{#statement{}, [bind_param()]}]) -> [reply(equery_row())].
 -spec execute_batch(connection(), [{#statement{}, [bind_param()]}]) -> [reply(equery_row())].
 execute_batch(C, Batch) ->
 execute_batch(C, Batch) ->
-    gen_server:call(C, {execute_batch, Batch}, infinity).
+    epgsql_sock:sync_command(C, {execute_batch, Batch}).
 
 
 %% statement/portal functions
 %% statement/portal functions
 
 
@@ -304,20 +303,20 @@ describe(C, #statement{name = Name}) ->
     describe(C, statement, Name).
     describe(C, statement, Name).
 
 
 describe(C, statement, Name) ->
 describe(C, statement, Name) ->
-    sync_on_error(C, gen_server:call(C, {describe_statement, Name}, infinity));
+    sync_on_error(C, epgsql_sock:sync_command(C, {describe_statement, Name}));
 
 
 %% TODO unknown result format of Describe portal
 %% TODO unknown result format of Describe portal
 describe(C, portal, Name) ->
 describe(C, portal, Name) ->
-    sync_on_error(C, gen_server:call(C, {describe_portal, Name}, infinity)).
+    sync_on_error(C, epgsql_sock:sync_command(C, {describe_portal, Name})).
 
 
 close(C, #statement{name = Name}) ->
 close(C, #statement{name = Name}) ->
     close(C, statement, Name).
     close(C, statement, Name).
 
 
 close(C, Type, Name) ->
 close(C, Type, Name) ->
-    gen_server:call(C, {close, Type, Name}).
+    epgsql_sock:sync_command(C, {close, Type, Name}).
 
 
 sync(C) ->
 sync(C) ->
-    gen_server:call(C, sync).
+    epgsql_sock:sync_command(C, sync).
 
 
 -spec cancel(connection()) -> ok.
 -spec cancel(connection()) -> ok.
 cancel(C) ->
 cancel(C) ->
@@ -405,7 +404,9 @@ standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
 %%                      For example: "option_name1 'value1', option_name2 'value2'"
 %%                      For example: "option_name1 'value1', option_name2 'value2'"
 %% returns `ok' otherwise `{error, Reason}'
 %% returns `ok' otherwise `{error, Reason}'
 start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
 start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
-    gen_server:call(Connection, {start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}).
+    Command = {start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts},
+    epgsql_sock:sync_command(Connection, Command).
+
 start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
 start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
     start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, []).
     start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, []).
 
 

+ 10 - 0
src/epgsql_binary.erl

@@ -7,6 +7,8 @@
          type2oid/2, oid2type/2,
          type2oid/2, oid2type/2,
          encode/3, decode/3, supports/1]).
          encode/3, decode/3, supports/1]).
 
 
+-export_type([codec/0]).
+
 -record(codec, {
 -record(codec, {
     type2oid = [],
     type2oid = [],
     oid2type = []
     oid2type = []
@@ -14,6 +16,8 @@
 
 
 -include("epgsql_binary.hrl").
 -include("epgsql_binary.hrl").
 
 
+-opaque codec() :: #codec{}.
+
 -define(datetime, (get(datetime_mod))).
 -define(datetime, (get(datetime_mod))).
 
 
 -define(INET, 2).
 -define(INET, 2).
@@ -24,8 +28,10 @@
 -define(MAX_IP6_MASK, 128).
 -define(MAX_IP6_MASK, 128).
 -define(JSONB_VERSION_1, 1).
 -define(JSONB_VERSION_1, 1).
 
 
+-spec new_codec(list()) -> codec().
 new_codec([]) -> #codec{}.
 new_codec([]) -> #codec{}.
 
 
+-spec update_type_cache(list(), codec()) -> codec().
 update_type_cache(TypeInfos, Codec) ->
 update_type_cache(TypeInfos, Codec) ->
     Type2Oid = lists:flatmap(
     Type2Oid = lists:flatmap(
         fun({NameBin, ElementOid, ArrayOid}) ->
         fun({NameBin, ElementOid, ArrayOid}) ->
@@ -36,6 +42,8 @@ update_type_cache(TypeInfos, Codec) ->
     Oid2Type = [{Oid, Type} || {Type, Oid} <- Type2Oid],
     Oid2Type = [{Oid, Type} || {Type, Oid} <- Type2Oid],
     Codec#codec{type2oid = Type2Oid, oid2type = Oid2Type}.
     Codec#codec{type2oid = Type2Oid, oid2type = Oid2Type}.
 
 
+-spec oid2type(integer(), codec()) -> Type | {unknown_oid, integer()} when
+      Type :: atom() | {array, atom()}.
 oid2type(Oid, #codec{oid2type = Oid2Type}) ->
 oid2type(Oid, #codec{oid2type = Oid2Type}) ->
     case epgsql_types:oid2type(Oid) of
     case epgsql_types:oid2type(Oid) of
         {unknown_oid, _} ->
         {unknown_oid, _} ->
@@ -43,6 +51,8 @@ oid2type(Oid, #codec{oid2type = Oid2Type}) ->
         Type -> Type
         Type -> Type
     end.
     end.
 
 
+-spec type2oid(Type, codec()) -> integer() | {unknown_type, Type} when
+      Type :: atom() | {array, atom()}.
 type2oid(Type, #codec{type2oid = Type2Oid}) ->
 type2oid(Type, #codec{type2oid = Type2Oid}) ->
     case epgsql_types:type2oid(Type) of
     case epgsql_types:type2oid(Type) of
         {unknown_type, _} ->
         {unknown_type, _} ->

+ 44 - 14
src/epgsql_sock.erl

@@ -7,6 +7,8 @@
 
 
 -export([start_link/0,
 -export([start_link/0,
          close/1,
          close/1,
+         sync_command/2,
+         async_command/3,
          get_parameter/2,
          get_parameter/2,
          set_notice_receiver/2,
          set_notice_receiver/2,
          get_cmd_status/1,
          get_cmd_status/1,
@@ -21,6 +23,21 @@
 -include("epgsql.hrl").
 -include("epgsql.hrl").
 -include("epgsql_binary.hrl").
 -include("epgsql_binary.hrl").
 
 
+-type command() ::
+        sync
+      | {connect, any(), any(), any(), list()}
+      | {squery, iodata()}
+      | {equery, #statement{}, list()}
+      | {prepared_query, #statement{}, list()}
+      | {parse, iodata(), iodata(), list()}
+      | {bind, #statement{}, iodata(), list()}
+      | {execute, #statement{}, iodata(), non_neg_integer()}
+      | {execute_batch, [{#statement{}, list()}]}
+      | {describe_statement, iodata()}
+      | {describe_portal, iodata()}
+      | {close, statement | portal, iodata()}
+      | {start_replication, iodata(), module() | pid(), any(), string(), string()}.
+
 %% Commands defined as per this page:
 %% Commands defined as per this page:
 %% http://www.postgresql.org/docs/9.2/static/protocol-message-formats.html
 %% http://www.postgresql.org/docs/9.2/static/protocol-message-formats.html
 
 
@@ -77,23 +94,24 @@
           receiver :: pid() | undefined
           receiver :: pid() | undefined
         }).
         }).
 
 
--record(state, {mod,
-                sock,
+-record(state, {mod :: gen_tcp | ssl | undefined,
+                sock :: gen_tcp:socket() | ssl:sslsocket() | undefined,
                 data = <<>>,
                 data = <<>>,
-                backend,
-                handler,
-                codec,
-                queue = queue:new(),
-                async,
-                parameters = [],
-                types = [],
-                columns = [],
-                rows = [],
+                backend :: {Pid :: integer(), Key :: integer()} | undefined,
+                handler :: auth | initializing | on_message | on_replication | undefined,
+                codec :: epgsql_binary:codec() | undefined,
+                queue = queue:new() :: queue:queue(command()),
+                command_state = dict:new() :: dict:dict(reference(), any()) | undefined,
+                async :: undefined | atom() | pid(),
+                parameters = [] :: [{Key :: binary(), Value :: binary()}],
+                types = [] :: [atom()],
+                columns = [] :: [#column{}],
+                rows = [] :: [tuple()],
                 results = [],
                 results = [],
                 batch = [],
                 batch = [],
-                sync_required,
-                txstatus,
-                complete_status :: undefined | atom() | {atom(), integer()},
+                sync_required :: boolean() | undefined,
+                txstatus :: byte() | undefined,  % $I | $T | $E,
+                complete_status :: atom() | {atom(), integer()} | undefined,
                 repl :: #repl{} | undefined}).
                 repl :: #repl{} | undefined}).
 
 
 %% -- client interface --
 %% -- client interface --
@@ -105,6 +123,17 @@ close(C) when is_pid(C) ->
     catch gen_server:cast(C, stop),
     catch gen_server:cast(C, stop),
     ok.
     ok.
 
 
+-spec sync_command(epgsql:conection(), command()) -> any().
+sync_command(C, Command) ->
+    gen_server:call(C, Command, infinity).
+
+-spec async_command(epgsql:conection(), cast | incremental, command()) -> reference().
+async_command(C, Method, Command) ->
+    Ref = make_ref(),
+    Pid = self(),
+    ok = gen_server:cast(C, {{Method, Pid, Ref}, Command}),
+    Ref.
+
 get_parameter(C, Name) ->
 get_parameter(C, Name) ->
     gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
     gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).
 
 
@@ -205,6 +234,7 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.
 
 
 %% -- internal functions --
 %% -- internal functions --
+-spec command(command(), #state{}) -> {noreply, #state{}} | {stop, any(), #state{}}.
 
 
 command(Command, State = #state{sync_required = true})
 command(Command, State = #state{sync_required = true})
   when Command /= sync ->
   when Command /= sync ->

+ 1 - 3
src/epgsqla.erl

@@ -140,9 +140,7 @@ cancel(C) ->
 %% -- internal functions --
 %% -- internal functions --
 
 
 cast(C, Command) ->
 cast(C, Command) ->
-    Ref = make_ref(),
-    gen_server:cast(C, {{cast, self(), Ref}, Command}),
-    Ref.
+    epgsql_sock:async_command(C, cast, Command).
 
 
 complete_connect(C, Ref) ->
 complete_connect(C, Ref) ->
     receive
     receive

+ 1 - 3
src/epgsqli.erl

@@ -140,6 +140,4 @@ cancel(C) ->
 %% -- internal functions --
 %% -- internal functions --
 
 
 incremental(C, Command) ->
 incremental(C, Command) ->
-    Ref = make_ref(),
-    gen_server:cast(C, {{incremental, self(), Ref}, Command}),
-    Ref.
+    epgsql_sock:async_command(C, incremental, Command).