Browse Source

Support {active, N} on TCP and SSL sockets for replication streaming mode

Yury Yantsevich 2 years ago
parent
commit
5894d3c04e
7 changed files with 263 additions and 102 deletions
  1. 4 2
      README.md
  2. 36 1
      doc/streaming.md
  3. 3 2
      src/commands/epgsql_cmd_connect.erl
  4. 16 1
      src/epgsql.erl
  5. 48 8
      src/epgsql_sock.erl
  6. 7 3
      test/epgsql_cth.erl
  7. 149 85
      test/epgsql_replication_SUITE.erl

+ 4 - 2
README.md

@@ -108,9 +108,11 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
 - `ssl` if set to `true`, perform an attempt to connect in ssl mode, but continue unencrypted
   if encryption isn't supported by server. if set to `required` connection will fail if encryption
   is not available.
-- `ssl_opts` will be passed as is to `ssl:connect/3`
+- `ssl_opts` will be passed as is to `ssl:connect/3`. The `active` option is only available in
+  the `replication` mode and OTP >= 21.3.
 - `tcp_opts` will be passed as is to `gen_tcp:connect/3`. Some options are forbidden, such as
-  `mode`, `packet`, `header`, `active`. When `tcp_opts` is not provided, epgsql does some tuning
+  `mode`, `packet`, `header`. The `active` option is only available in the `replication` mode.
+  When `tcp_opts` is not provided, epgsql does some tuning
   (eg, sets TCP `keepalive` and auto-tunes `buffer`), but when `tcp_opts` is provided, no
   additional tweaks are added by epgsql itself, other than necessary ones (`active`, `packet` and `mode`).
 - `async` see [Server notifications](#server-notifications)

+ 36 - 1
doc/streaming.md

@@ -138,4 +138,39 @@ handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
     then you do not have any risk to lose data. 
     
     Otherwise (if you do not load all data from tables during erlang app startup) 
-    it is not recommended to set align_lsn to true. In this case to stop PG server stop epgsql replication first.
+    it is not recommended to set align_lsn to true. In this case to stop PG server stop epgsql replication first.
+    
+## Flow control
+
+It is possible to set `{active, N}` on a TCP or SSL (since OTP 21.3) socket. E.g. for SSL:
+```erlang
+Opts = #{ host => "localhost"
+        , username => "me"
+        , password => "pwd"
+        , database => "test"
+        , ssl => true
+        , ssl_opts => [{active, 10}]
+        },
+{ok, Conn} = epgsql:connect(Opts).
+```
+
+It is currently allowed only in the replication mode. Its main purpose is to control the flow of
+replication messages from Postgresql database. If a database is under a high load and a process, which
+handles the message stream, cannot keep up with it then setting this option gives the handling process
+ability to get messages on-demand.
+
+When the connection is in the asynchronous mode, a process which owns a connection will receive
+```erlang
+{epgsql, Connection, socket_passive}
+```
+The process decides when to activate connection's socket again. To do that it should call:
+```erlang
+epgsql:activate(Connection).
+```
+The `active` parameter of the socket will be set to the same value as it was configured in
+the connection's options.
+
+When the connection is in the synchronous mode, a provided callback module must implement
+`socket_passive/1` function, which receives a current callback state and should
+return `{ok, NewCallbackState}`. The callback should not call `epgsql:activate/1` directly
+because it results in a deadlock.

+ 3 - 2
src/commands/epgsql_cmd_connect.erl

@@ -295,11 +295,12 @@ prepare_tcp_opts(Opts0) ->
                          ({packet, _}) -> true;
                          ({packet_size, _}) -> true;
                          ({header, _}) -> true;
-                         ({active, _}) -> true;
                          (_) -> false
                       end, Opts0) of
         [] ->
-            [{active, false}, {packet, raw}, {mode, binary} | Opts0];
+            Opts = [{packet, raw}, {mode, binary} | Opts0],
+            %% Make sure that active is set to false while establishing a connection
+            lists:keystore(active, 1, Opts, {active, false});
         Forbidden ->
             error({forbidden_tcp_opts, Forbidden})
     end.

+ 16 - 1
src/epgsql.erl

@@ -37,7 +37,8 @@
          start_replication/5,
          start_replication/6,
          start_replication/7,
-         to_map/1]).
+         to_map/1,
+         activate/1]).
 -export([handle_x_log_data/5]).                 % private
 
 -export_type([connection/0, connect_option/0, connect_opts/0,
@@ -571,3 +572,17 @@ to_map(Map) when is_map(Map) ->
     Map;
 to_map(List) when is_list(List) ->
     maps:from_list(List).
+
+%% @doc Activates TCP or SSL socket of the connection.
+%%
+%% If {active, X} is set in:
+%% - `tcp_opts` and the current mode is TCP or
+%% - `ssl_opts` and the current mode is SSL
+%% the function sets {active, X} on the connection socket.
+%% It sets {active, true} otherwise.
+%%
+%% @param Connection connection
+%% @returns `ok' or `{error, Reason}'
+-spec activate(connection()) -> ok | {error, any()}.
+activate(Connection) ->
+  epgsql_sock:activate(Connection).

+ 48 - 8
src/epgsql_sock.erl

@@ -53,7 +53,8 @@
          cancel/1,
          copy_send_rows/3,
          standby_status_update/3,
-         get_backend_pid/1]).
+         get_backend_pid/1,
+         activate/1]).
 
 -export([handle_call/3, handle_cast/2, handle_info/2, format_status/2]).
 -export([init/1, code_change/3, terminate/2]).
@@ -69,7 +70,6 @@
 
 -export_type([transport/0, pg_sock/0, error/0]).
 
--include("epgsql.hrl").
 -include("protocol.hrl").
 -include("epgsql_replication.hrl").
 -include("epgsql_copy.hrl").
@@ -156,6 +156,10 @@ standby_status_update(C, FlushedLSN, AppliedLSN) ->
 get_backend_pid(C) ->
     gen_server:call(C, get_backend_pid).
 
+-spec activate(epgsql:connection()) -> ok | {error, any()}.
+activate(C) ->
+    gen_server:call(C, activate).
+
 %% -- command APIs --
 
 %% send()
@@ -189,8 +193,10 @@ set_attr(connect_opts, ConnectOpts, State) ->
 
 %% XXX: be careful!
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
-set_packet_handler(Handler, State) ->
-    State#state{handler = Handler}.
+set_packet_handler(Handler, State0) ->
+    State = State0#state{handler = Handler},
+    setopts(State, [{active, true}]),
+    State.
 
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
 get_codec(#state{codec = Codec}) ->
@@ -215,7 +221,6 @@ get_parameter_internal(Name, #state{parameters = Parameters}) ->
         false                  -> undefined
     end.
 
-
 %% -- gen_server implementation --
 
 init([]) ->
@@ -248,7 +253,11 @@ handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
 handle_call({copy_send_rows, Rows}, _From,
            #state{handler = Handler, subproto_state = CopyState} = State) ->
     Response = handle_copy_send_rows(Rows, Handler, CopyState, State),
-    {reply, Response, State}.
+    {reply, Response, State};
+
+handle_call(activate, _From, State) ->
+    setopts(State, [{active, true}]),
+    {reply, ok, State}.
 
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
   when ((Method == cast) or (Method == incremental)),
@@ -278,6 +287,11 @@ handle_info({DataTag, Sock, Data2}, #state{data = Data, sock = Sock} = State)
   when DataTag == tcp; DataTag == ssl ->
     loop(State#state{data = <<Data/binary, Data2/binary>>});
 
+handle_info({Passive, Sock}, #state{sock = Sock} = State)
+  when Passive == ssl_passive; Passive == tcp_passive ->
+    NewState = send_socket_pasive(State),
+    {noreply, NewState};
+
 handle_info({Closed, Sock}, #state{sock = Sock} = State)
   when Closed == tcp_closed; Closed == ssl_closed ->
     {stop, sock_closed, flush_queue(State#state{sock = undefined}, {error, sock_closed})};
@@ -313,6 +327,16 @@ format_status(terminate, [_PDict, State]) ->
   State#state{rows = information_redacted}.
 
 %% -- internal functions --
+-spec send_socket_pasive(pg_sock()) -> pg_sock().
+send_socket_pasive(#state{subproto_state = #repl{receiver = Rec}} = State) when Rec =/= undefined ->
+    Rec ! {epgsql, self(), socket_passive},
+    State;
+send_socket_pasive(#state{subproto_state = #repl{ cbmodule = CbMod
+                                                , cbstate = CbState} = Repl
+                         } = State) ->
+    {ok, NewCbState} = CbMod:socket_passive(CbState),
+    NewRepl = Repl#repl{cbstate = NewCbState},
+    State#state{subproto_state = NewRepl}.
 
 -spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
                          Result when
@@ -407,13 +431,29 @@ command_next(#state{current_cmd = PrevCmd,
                         results = []}
     end.
 
-
-setopts(#state{mod = Mod, sock = Sock}, Opts) ->
+setopts(#state{mod = Mod, sock = Sock} = State, DefaultOpts) ->
+    Opts = update_active(State, DefaultOpts),
     case Mod of
         gen_tcp -> inet:setopts(Sock, Opts);
         ssl     -> ssl:setopts(Sock, Opts)
     end.
 
+update_active(#state{handler = H}, DefaultOpts) when H =/= on_replication ->
+    %% Ignore active option in tcp_opts or ssl_opts unless in the replication mode
+    DefaultOpts;
+update_active(#state{mod = gen_tcp, connect_opts = #{tcp_opts := Opts}}, DefaultOpts) ->
+    update_active_opt(Opts, DefaultOpts);
+update_active(#state{mod = ssl, connect_opts = #{ssl_opts := Opts}}, DefaultOpts) ->
+    update_active_opt(Opts, DefaultOpts);
+update_active(_State, DefaultOpts) ->
+    DefaultOpts.
+
+update_active_opt(Opts, DefaultOpts) ->
+    case proplists:lookup(active, Opts) of
+        none -> DefaultOpts;
+        Active -> lists:keystore(active, 1, DefaultOpts, Active)
+    end.
+
 %% This one only used in connection initiation to send client's
 %% `StartupMessage' and `SSLRequest' packets
 -spec send(pg_sock(), iodata()) -> ok | {error, any()}.

+ 7 - 3
test/epgsql_cth.erl

@@ -150,7 +150,7 @@ init_database(Config) ->
 
     {ok, Cwd} = file:get_cwd(),
     PgDataDir = filename:append(Cwd, "datadir"),
-    {ok, _} = exec:run(Initdb ++ " --locale en_US.UTF8 " ++ PgDataDir, [sync,stdout,stderr]),
+    {ok, _} = exec:run(Initdb ++ " --locale en_US.UTF-8 " ++ PgDataDir, [sync, stdout, stderr]),
     [{datadir, PgDataDir}|Config].
 
 get_version(Config) ->
@@ -200,6 +200,10 @@ write_pg_hba_config(Config) ->
     Version = ?config(version, Config),
 
     User = os:getenv("USER"),
+    ClientCert = case Version >= [12] of
+                   true -> "cert";
+                   false -> "cert clientcert=1"
+                 end,
     PGConfig = [
         "local   all             ", User, "                              trust\n",
         "host    template1       ", User, "              127.0.0.1/32    trust\n",
@@ -209,7 +213,7 @@ write_pg_hba_config(Config) ->
         "host    epgsql_test_db1 epgsql_test             127.0.0.1/32    trust\n",
         "host    epgsql_test_db1 epgsql_test_md5         127.0.0.1/32    md5\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   127.0.0.1/32    password\n",
-        "hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    cert clientcert=1\n",
+        "hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    ", ClientCert, "\n",
         "host    template1       ", User, "              ::1/128    trust\n",
         "host    ", User, "      ", User, "              ::1/128    trust\n",
         "hostssl postgres        ", User, "              ::1/128    trust\n",
@@ -217,7 +221,7 @@ write_pg_hba_config(Config) ->
         "host    epgsql_test_db1 epgsql_test             ::1/128    trust\n",
         "host    epgsql_test_db1 epgsql_test_md5         ::1/128    md5\n",
         "host    epgsql_test_db1 epgsql_test_cleartext   ::1/128    password\n",
-        "hostssl epgsql_test_db1 epgsql_test_cert        ::1/128    cert clientcert=1\n" |
+        "hostssl epgsql_test_db1 epgsql_test_cert        ::1/128    ", ClientCert, "\n" |
         case Version >= [10] of
             true ->
                 %% See

+ 149 - 85
test/epgsql_replication_SUITE.erl

@@ -1,110 +1,174 @@
 -module(epgsql_replication_SUITE).
+
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include("epgsql.hrl").
 
--export([
-    init_per_suite/1,
-    all/0,
-    end_per_suite/1,
+-export([ all/0
+        , init_per_suite/1
+        , end_per_suite/1
 
-    connect_in_repl_mode/1,
-    create_drop_replication_slot/1,
-    replication_sync/1,
-    replication_async/1,
+        , connect_in_repl_mode/1
+        , create_drop_replication_slot/1
+        , replication_sync/1
+        , replication_async/1
+        , replication_async_active_n_socket/1
+        , replication_sync_active_n_socket/1
 
-    %% Callbacks
-    handle_x_log_data/4
-]).
+          %% Callbacks
+        , handle_x_log_data/4
+        , socket_passive/1
+        ]).
 
 init_per_suite(Config) ->
-    [{module, epgsql}|Config].
+  [{module, epgsql} | Config].
 
 end_per_suite(_Config) ->
-    ok.
+  ok.
 
 all() ->
-    [
-     connect_in_repl_mode,
-     create_drop_replication_slot,
-     replication_async,
-     replication_sync
-    ].
+  [ connect_in_repl_mode
+  , create_drop_replication_slot
+  , replication_async
+  , replication_sync
+  , replication_async_active_n_socket
+  , replication_sync_active_n_socket
+  ].
 
 connect_in_repl_mode(Config) ->
-    epgsql_ct:connect_only(Config, ["epgsql_test_replication",
-        "epgsql_test_replication",
-        [{database, "epgsql_test_db1"}, {replication, "database"}]]).
+  epgsql_ct:connect_only(
+    Config,
+    [ "epgsql_test_replication"
+    , "epgsql_test_replication"
+    , [{database, "epgsql_test_db1"}, {replication, "database"}]
+    ]).
 
 create_drop_replication_slot(Config) ->
-    Module = ?config(module, Config),
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            {ok, Cols, Rows} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
-            [#column{name = <<"slot_name">>}, #column{name = <<"consistent_point">>},
-                #column{name = <<"snapshot_name">>}, #column{name = <<"output_plugin">>}] = Cols,
-            [{<<"epgsql_test">>, _, _, <<"test_decoding">>}] = Rows,
-            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]).
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        create_replication_slot(Config, C),
+        drop_replication_slot(Config, C)
+    end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
 
 replication_async(Config) ->
-    replication_test_run(Config, self()).
+  replication_test_run(Config, self()).
 
 replication_sync(Config) ->
-    replication_test_run(Config, ?MODULE).
+  replication_test_run(Config, ?MODULE).
+
+replication_async_active_n_socket(Config) ->
+  replication_test_run(Config, self(), [{tcp_opts, [{active, 1}]}, {ssl_opts, [{active, 1}]}]).
+
+replication_sync_active_n_socket(Config) ->
+  replication_test_run(Config, ?MODULE, [{tcp_opts, [{active, 1}]}, {ssl_opts, [{active, 1}]}]).
 
 replication_test_run(Config, Callback) ->
-    Module = ?config(module, Config),
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            {ok, _, _} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
-
-            %% new connection because main id in a replication mode
-            epgsql_ct:with_connection(
-                Config,
-                fun(C2) ->
-                    [{ok, 1},{ok, 1}] = Module:squery(C2,
-                        "insert into test_table1 (id, value) values (5, 'five');delete from test_table1 where id = 5;")
-                end),
-
-            Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
-            ok = receive_replication_msgs(
-                [<<"table public.test_table1: INSERT: id[integer]:5 value[text]:'five'">>,
-                    <<"table public.test_table1: DELETE: id[integer]:5">>], C, [])
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]),
-    %% cleanup
-    epgsql_ct:with_connection(
-        Config,
-        fun(C) ->
-            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
-        end,
-        "epgsql_test_replication",
-        [{replication, "database"}]).
-
-receive_replication_msgs(Pattern, Pid, ReceivedMsgs) ->
-    receive
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
-            receive_replication_msgs(Pattern, Pid, [begin_msg | ReceivedMsgs]);
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
-            case lists:reverse(ReceivedMsgs) of
-                [begin_msg, row_msg | _] -> ok;
-                _ -> error_replication_messages_not_received
-            end;
-        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
-            [Msg | T] = Pattern,
-            receive_replication_msgs(T, Pid, [row_msg | ReceivedMsgs])
-    after
-        60000 ->
-            error_timeout
-    end.
+  replication_test_run(Config, Callback, []).
+
+replication_test_run(Config, Callback, ExtOpts) ->
+  Module = ?config(module, Config),
+  {Queries, ReplicationMsgs} = gen_query_and_replication_msgs(lists:seq(100, 110)),
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) ->
+        create_replication_slot(Config, C),
+        %% new connection because main is in the replication mode
+        epgsql_ct:with_connection(
+          Config,
+          fun(C2) ->
+              ExpectedResult = lists:duplicate(length(Queries), {ok, 1}),
+              Res = Module:squery(C2, lists:flatten(Queries)),
+              ?assertEqual(ExpectedResult, Res)
+          end),
+        Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
+        ok = receive_replication_msgs(Module, ReplicationMsgs, C, [])
+    end,
+    "epgsql_test_replication",
+    [{replication, "database"} | ExtOpts]),
+  %% cleanup
+  epgsql_ct:with_connection(
+    Config,
+    fun(C) -> drop_replication_slot(Config, C) end,
+    "epgsql_test_replication",
+    [{replication, "database"}]).
+
+create_replication_slot(Config, Connection) ->
+  Module = ?config(module, Config),
+  {ok, Cols, Rows} =
+    Module:squery(Connection,
+                  "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
+  ?assertMatch([ #column{name = <<"slot_name">>}
+               , #column{name = <<"consistent_point">>}
+               , #column{name = <<"snapshot_name">>}
+               , #column{name = <<"output_plugin">>}
+               ],
+               Cols),
+  ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).
+
+drop_replication_slot(Config, Connection) ->
+  Module = ?config(module, Config),
+  Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
+  case ?config(version, ?config(pg_config, Config)) >= [13, 0] of
+    true -> ?assertMatch({ok, _, _}, Result);
+    false -> ?assertMatch([{ok, _, _}, {ok, _, _}], Result)
+  end.
+
+gen_query_and_replication_msgs(Ids) ->
+  QInsFmt = "INSERT INTO test_table1 (id, value) VALUES (~b, '~s');",
+  QDelFmt = "DELETE FROM test_table1 WHERE id = ~b;",
+  RmInsFmt = "table public.test_table1: INSERT: id[integer]:~b value[text]:'~s'",
+  RmDelFmt = "table public.test_table1: DELETE: id[integer]:~b",
+  LongBin = base64:encode(crypto:strong_rand_bytes(254)),
+  lists:foldl(
+    fun(Id, {Qs, RMs}) ->
+        QIns = lists:flatten(io_lib:format(QInsFmt, [Id, LongBin])),
+        QDel = lists:flatten(io_lib:format(QDelFmt, [Id])),
+        RmIns = iolist_to_binary(io_lib:format(RmInsFmt, [Id, LongBin])),
+        RmDel = iolist_to_binary(io_lib:format(RmDelFmt, [Id])),
+        {Qs ++ [QIns, QDel], RMs ++ [RmIns, RmDel]}
+    end,
+    {[], []},
+    Ids).
+
+receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs) ->
+  receive
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
+      receive_replication_msgs(Module, Pattern, Pid, [begin_msg | ReceivedMsgs]);
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
+      ensure_no_socket_passive_msgs(Module, Pid),
+      case lists:reverse(ReceivedMsgs) of
+        [begin_msg, row_msg | _] -> ok;
+        _ -> error_replication_messages_not_received
+      end;
+    {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
+      [Msg | T] = Pattern,
+      receive_replication_msgs(Module, T, Pid, [row_msg | ReceivedMsgs]);
+    {epgsql, Pid, socket_passive} ->
+      Module:activate(Pid),
+      receive_replication_msgs(Module, Pattern, Pid, ReceivedMsgs)
+  after
+    60000 ->
+      error_timeout
+  end.
+
+ensure_no_socket_passive_msgs(Module, Pid) ->
+  receive
+    {epgsql, Pid, socket_passive} ->
+      Module:activate(Pid),
+      ensure_no_socket_passive_msgs(Module, Pid)
+  after
+    100 ->
+      ok
+  end.
 
 handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
-    {C, Pid} = CbState,
-    Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
-    {ok, EndLSN, EndLSN, CbState}.
+  {C, Pid} = CbState,
+  Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
+  {ok, EndLSN, EndLSN, CbState}.
+
+socket_passive({C, _Pid} = CbState) ->
+  spawn(fun() -> epgsql:activate(C) end),
+  {ok, CbState}.