Browse Source

Add general socket_active option instead of using tcp_opts and ssl_opts, update docs

Yury Yantsevich 2 years ago
parent
commit
d6f6f472cf
6 changed files with 77 additions and 71 deletions
  1. 6 4
      README.md
  2. 10 7
      doc/streaming.md
  3. 2 3
      src/commands/epgsql_cmd_connect.erl
  4. 10 5
      src/epgsql.erl
  5. 19 22
      src/epgsql_sock.erl
  6. 30 30
      test/epgsql_replication_SUITE.erl

+ 6 - 4
README.md

@@ -108,11 +108,9 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `ssl` if set to `true`, perform an attempt to connect in ssl mode, but continue unencrypted
 - `ssl` if set to `true`, perform an attempt to connect in ssl mode, but continue unencrypted
   if encryption isn't supported by server. if set to `required` connection will fail if encryption
   if encryption isn't supported by server. if set to `required` connection will fail if encryption
   is not available.
   is not available.
-- `ssl_opts` will be passed as is to `ssl:connect/3`. The `active` option is only available in
-  the `replication` mode and OTP >= 21.3.
+- `ssl_opts` will be passed as is to `ssl:connect/3`.
 - `tcp_opts` will be passed as is to `gen_tcp:connect/3`. Some options are forbidden, such as
 - `tcp_opts` will be passed as is to `gen_tcp:connect/3`. Some options are forbidden, such as
-  `mode`, `packet`, `header`. The `active` option is only available in the `replication` mode.
-  When `tcp_opts` is not provided, epgsql does some tuning
+  `mode`, `packet`, `header`, `active`. When `tcp_opts` is not provided, epgsql does some tuning
   (eg, sets TCP `keepalive` and auto-tunes `buffer`), but when `tcp_opts` is provided, no
   (eg, sets TCP `keepalive` and auto-tunes `buffer`), but when `tcp_opts` is provided, no
   additional tweaks are added by epgsql itself, other than necessary ones (`active`, `packet` and `mode`).
   additional tweaks are added by epgsql itself, other than necessary ones (`active`, `packet` and `mode`).
 - `async` see [Server notifications](#server-notifications)
 - `async` see [Server notifications](#server-notifications)
@@ -127,6 +125,10 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `application_name` is an optional string parameter. It is usually set by an application upon
 - `application_name` is an optional string parameter. It is usually set by an application upon
    connection to the server. The name will be displayed in the `pg_stat_activity`
    connection to the server. The name will be displayed in the `pg_stat_activity`
    view and included in CSV log entries.
    view and included in CSV log entries.
+- `socket_active` is an optional parameter, which can be true or integer in the range -32768
+  to 32767 (inclusive). This option only works in the replication mode and is used to control
+  the flow of incoming messages. See [Streaming replication protocol](#streaming-replication-protocol)
+  for more details.
 
 
 Options may be passed as proplist or as map with the same key names.
 Options may be passed as proplist or as map with the same key names.
 
 

+ 10 - 7
doc/streaming.md

@@ -142,14 +142,15 @@ handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
     
     
 ## Flow control
 ## Flow control
 
 
-It is possible to set `{active, N}` on a TCP or SSL (since OTP 21.3) socket. E.g. for SSL:
+It is possible to set `{socket_active, N}` on a [TCP](https://www.erlang.org/doc/man/inet.html#setopts-2)
+or [SSL](https://www.erlang.org/doc/man/ssl.html#setopts-2) (since OTP 21.3) socket. E.g. for SSL:
 ```erlang
 ```erlang
-Opts = #{ host => "localhost"
-        , username => "me"
-        , password => "pwd"
-        , database => "test"
-        , ssl => true
-        , ssl_opts => [{active, 10}]
+Opts = #{host => "localhost",
+         username => "me",
+         password => "pwd",
+         database => "test",
+         ssl => require,
+         socket_active => 10
         },
         },
 {ok, Conn} = epgsql:connect(Opts).
 {ok, Conn} = epgsql:connect(Opts).
 ```
 ```
@@ -163,6 +164,8 @@ When the connection is in the asynchronous mode, a process which owns a connecti
 ```erlang
 ```erlang
 {epgsql, Connection, socket_passive}
 {epgsql, Connection, socket_passive}
 ```
 ```
+as soon as underlying socket received N messages from network.
+
 The process decides when to activate connection's socket again. To do that it should call:
 The process decides when to activate connection's socket again. To do that it should call:
 ```erlang
 ```erlang
 epgsql:activate(Connection).
 epgsql:activate(Connection).

+ 2 - 3
src/commands/epgsql_cmd_connect.erl

@@ -295,12 +295,11 @@ prepare_tcp_opts(Opts0) ->
                          ({packet, _}) -> true;
                          ({packet, _}) -> true;
                          ({packet_size, _}) -> true;
                          ({packet_size, _}) -> true;
                          ({header, _}) -> true;
                          ({header, _}) -> true;
+                         ({active, _}) -> true;
                          (_) -> false
                          (_) -> false
                       end, Opts0) of
                       end, Opts0) of
         [] ->
         [] ->
-            Opts = [{packet, raw}, {mode, binary} | Opts0],
-            %% Make sure that active is set to false while establishing a connection
-            lists:keystore(active, 1, Opts, {active, false});
+            [{active, false}, {packet, raw}, {mode, binary} | Opts0];
         Forbidden ->
         Forbidden ->
             error({forbidden_tcp_opts, Forbidden})
             error({forbidden_tcp_opts, Forbidden})
     end.
     end.

+ 10 - 5
src/epgsql.erl

@@ -47,7 +47,7 @@
 -export_type([connection/0, connect_option/0, connect_opts/0,
 -export_type([connection/0, connect_option/0, connect_opts/0,
               connect_error/0, query_error/0, sql_query/0, column/0,
               connect_error/0, query_error/0, sql_query/0, column/0,
               type_name/0, epgsql_type/0, statement/0,
               type_name/0, epgsql_type/0, statement/0,
-              transaction_option/0, transaction_opts/0]).
+              transaction_option/0, transaction_opts/0, socket_active/0]).
 
 
 %% Deprecated types
 %% Deprecated types
 -export_type([bind_param/0, typed_param/0,
 -export_type([bind_param/0, typed_param/0,
@@ -66,6 +66,7 @@
 -type host() :: inet:ip_address() | inet:hostname().
 -type host() :: inet:ip_address() | inet:hostname().
 -type password() :: string() | iodata() | fun( () -> iodata() ).
 -type password() :: string() | iodata() | fun( () -> iodata() ).
 -type connection() :: pid().
 -type connection() :: pid().
+-type socket_active() :: true | -32768..32767.
 -type connect_option() ::
 -type connect_option() ::
     {host, host()}                                 |
     {host, host()}                                 |
     {username, string()}                           |
     {username, string()}                           |
@@ -80,7 +81,8 @@
     {codecs,   Codecs     :: [{epgsql_codec:codec_mod(), any()}]} |
     {codecs,   Codecs     :: [{epgsql_codec:codec_mod(), any()}]} |
     {nulls,    Nulls      :: [any(), ...]} |    % terms to be used as NULL
     {nulls,    Nulls      :: [any(), ...]} |    % terms to be used as NULL
     {replication, Replication :: string()} | % Pass "database" to connect in replication mode
     {replication, Replication :: string()} | % Pass "database" to connect in replication mode
-    {application_name, ApplicationName :: string()}.
+    {application_name, ApplicationName :: string()} |
+    {socket_active, Active :: socket_active()}.
 
 
 -type connect_opts() ::
 -type connect_opts() ::
         [connect_option()]
         [connect_option()]
@@ -97,7 +99,8 @@
           codecs => [{epgsql_codec:codec_mod(), any()}],
           codecs => [{epgsql_codec:codec_mod(), any()}],
           nulls => [any(), ...],
           nulls => [any(), ...],
           replication => string(),
           replication => string(),
-          application_name => string()
+          application_name => string(),
+          socket_active => socket_active()
           }.
           }.
 
 
 -type transaction_option() ::
 -type transaction_option() ::
@@ -594,6 +597,8 @@ to_map(List) when is_list(List) ->
 %%
 %%
 %% @param Connection connection
 %% @param Connection connection
 %% @returns `ok' or `{error, Reason}'
 %% @returns `ok' or `{error, Reason}'
--spec activate(connection()) -> ok | {error, any()}.
+%%
+%% The ssl:reason() type is not exported.
+-spec activate(connection()) -> ok | {error, inet:posix() | any()}.
 activate(Connection) ->
 activate(Connection) ->
-  epgsql_sock:activate(Connection).
+    epgsql_sock:activate(Connection).

+ 19 - 22
src/epgsql_sock.erl

@@ -156,7 +156,8 @@ standby_status_update(C, FlushedLSN, AppliedLSN) ->
 get_backend_pid(C) ->
 get_backend_pid(C) ->
     gen_server:call(C, get_backend_pid).
     gen_server:call(C, get_backend_pid).
 
 
--spec activate(epgsql:connection()) -> ok | {error, any()}.
+%% The ssl:reason() type is not exported
+-spec activate(epgsql:connection()) -> ok | {error, inet:posix() | any()}.
 activate(C) ->
 activate(C) ->
     gen_server:call(C, activate).
     gen_server:call(C, activate).
 
 
@@ -168,7 +169,8 @@ activate(C) ->
 -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
 -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
 set_net_socket(Mod, Socket, State) ->
 set_net_socket(Mod, Socket, State) ->
     State1 = State#state{mod = Mod, sock = Socket},
     State1 = State#state{mod = Mod, sock = Socket},
-    setopts(State1, [{active, true}]),
+    Active = get_socket_active(State1),
+    setopts(State1, [{active, Active}]),
     State1.
     State1.
 
 
 -spec init_replication_state(pg_sock()) -> pg_sock().
 -spec init_replication_state(pg_sock()) -> pg_sock().
@@ -195,7 +197,8 @@ set_attr(connect_opts, ConnectOpts, State) ->
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
 set_packet_handler(Handler, State0) ->
 set_packet_handler(Handler, State0) ->
     State = State0#state{handler = Handler},
     State = State0#state{handler = Handler},
-    setopts(State, [{active, true}]),
+    Active = get_socket_active(State),
+    setopts(State, [{active, Active}]),
     State.
     State.
 
 
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
@@ -256,8 +259,9 @@ handle_call({copy_send_rows, Rows}, _From,
     {reply, Response, State};
     {reply, Response, State};
 
 
 handle_call(activate, _From, State) ->
 handle_call(activate, _From, State) ->
-    setopts(State, [{active, true}]),
-    {reply, ok, State}.
+    Active = get_socket_active(State),
+    Res = setopts(State, [{active, Active}]),
+    {reply, Res, State}.
 
 
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
   when ((Method == cast) or (Method == incremental)),
   when ((Method == cast) or (Method == incremental)),
@@ -431,28 +435,21 @@ command_next(#state{current_cmd = PrevCmd,
                         results = []}
                         results = []}
     end.
     end.
 
 
-setopts(#state{mod = Mod, sock = Sock} = State, DefaultOpts) ->
-    Opts = update_active(State, DefaultOpts),
+setopts(#state{mod = Mod, sock = Sock}, Opts) ->
     case Mod of
     case Mod of
         gen_tcp -> inet:setopts(Sock, Opts);
         gen_tcp -> inet:setopts(Sock, Opts);
         ssl     -> ssl:setopts(Sock, Opts)
         ssl     -> ssl:setopts(Sock, Opts)
     end.
     end.
 
 
-update_active(#state{handler = H}, DefaultOpts) when H =/= on_replication ->
-    %% Ignore active option in tcp_opts or ssl_opts unless in the replication mode
-    DefaultOpts;
-update_active(#state{mod = gen_tcp, connect_opts = #{tcp_opts := Opts}}, DefaultOpts) ->
-    update_active_opt(Opts, DefaultOpts);
-update_active(#state{mod = ssl, connect_opts = #{ssl_opts := Opts}}, DefaultOpts) ->
-    update_active_opt(Opts, DefaultOpts);
-update_active(_State, DefaultOpts) ->
-    DefaultOpts.
-
-update_active_opt(Opts, DefaultOpts) ->
-    case proplists:lookup(active, Opts) of
-        none -> DefaultOpts;
-        Active -> lists:keystore(active, 1, DefaultOpts, Active)
-    end.
+-spec get_socket_active(pg_sock()) -> epgsql:socket_active().
+get_socket_active(#state{handler = H}) when H =/= on_replication ->
+    true;
+get_socket_active(#state{connect_opts = #{socket_active := Active}}) ->
+    Active;
+get_socket_active(#state{connect_opts = Opts}) when is_list(Opts) ->
+    proplists:get_value(socket_active, Opts, true);
+get_socket_active(_State) ->
+    true.
 
 
 %% This one only used in connection initiation to send client's
 %% This one only used in connection initiation to send client's
 %% `StartupMessage' and `SSLRequest' packets
 %% `StartupMessage' and `SSLRequest' packets

+ 30 - 30
test/epgsql_replication_SUITE.erl

@@ -4,20 +4,20 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include("epgsql.hrl").
 -include("epgsql.hrl").
 
 
--export([ all/0
-        , init_per_suite/1
-        , end_per_suite/1
-
-        , connect_in_repl_mode/1
-        , create_drop_replication_slot/1
-        , replication_sync/1
-        , replication_async/1
-        , replication_async_active_n_socket/1
-        , replication_sync_active_n_socket/1
-
-          %% Callbacks
-        , handle_x_log_data/4
-        , handle_socket_passive/1
+-export([all/0,
+         init_per_suite/1,
+         end_per_suite/1,
+
+         connect_in_repl_mode/1,
+         create_drop_replication_slot/1,
+         replication_sync/1,
+         replication_async/1,
+         replication_async_active_n_socket/1,
+         replication_sync_active_n_socket/1,
+
+         %% Callbacks
+         handle_x_log_data/4,
+         handle_socket_passive/1
         ]).
         ]).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
@@ -27,20 +27,20 @@ end_per_suite(_Config) ->
   ok.
   ok.
 
 
 all() ->
 all() ->
-  [ connect_in_repl_mode
-  , create_drop_replication_slot
-  , replication_async
-  , replication_sync
-  , replication_async_active_n_socket
-  , replication_sync_active_n_socket
+  [connect_in_repl_mode,
+   create_drop_replication_slot,
+   replication_async,
+   replication_sync,
+   replication_async_active_n_socket,
+   replication_sync_active_n_socket
   ].
   ].
 
 
 connect_in_repl_mode(Config) ->
 connect_in_repl_mode(Config) ->
   epgsql_ct:connect_only(
   epgsql_ct:connect_only(
     Config,
     Config,
-    [ "epgsql_test_replication"
-    , "epgsql_test_replication"
-    , [{database, "epgsql_test_db1"}, {replication, "database"}]
+    ["epgsql_test_replication",
+     "epgsql_test_replication",
+     [{database, "epgsql_test_db1"}, {replication, "database"}]
     ]).
     ]).
 
 
 create_drop_replication_slot(Config) ->
 create_drop_replication_slot(Config) ->
@@ -60,10 +60,10 @@ replication_sync(Config) ->
   replication_test_run(Config, ?MODULE).
   replication_test_run(Config, ?MODULE).
 
 
 replication_async_active_n_socket(Config) ->
 replication_async_active_n_socket(Config) ->
-  replication_test_run(Config, self(), [{tcp_opts, [{active, 1}]}, {ssl_opts, [{active, 1}]}]).
+  replication_test_run(Config, self(), [{socket_active, 1}]).
 
 
 replication_sync_active_n_socket(Config) ->
 replication_sync_active_n_socket(Config) ->
-  replication_test_run(Config, ?MODULE, [{tcp_opts, [{active, 1}]}, {ssl_opts, [{active, 1}]}]).
+  replication_test_run(Config, ?MODULE, [{socket_active, 1}]).
 
 
 replication_test_run(Config, Callback) ->
 replication_test_run(Config, Callback) ->
   replication_test_run(Config, Callback, []).
   replication_test_run(Config, Callback, []).
@@ -80,7 +80,7 @@ replication_test_run(Config, Callback, ExtOpts) ->
           Config,
           Config,
           fun(C2) ->
           fun(C2) ->
               ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
               ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
-              Res = Module:squery(C2, lists:flatten(Queries)),
+              Res = Module:squery(C2, Queries),
               ?assertEqual(ExpectedResult, Res)
               ?assertEqual(ExpectedResult, Res)
           end),
           end),
         Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
         Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
@@ -100,10 +100,10 @@ create_replication_slot(Config, Connection) ->
   {ok, Cols, Rows} =
   {ok, Cols, Rows} =
     Module:squery(Connection,
     Module:squery(Connection,
                   "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
                   "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
-  ?assertMatch([ #column{name = <<"slot_name">>}
-               , #column{name = <<"consistent_point">>}
-               , #column{name = <<"snapshot_name">>}
-               , #column{name = <<"output_plugin">>}
+  ?assertMatch([#column{name = <<"slot_name">>},
+                #column{name = <<"consistent_point">>},
+                #column{name = <<"snapshot_name">>},
+                #column{name = <<"output_plugin">>}
                ],
                ],
                Cols),
                Cols),
   ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
   ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).