Просмотр исходного кода

Add replication_opts align_lsn to start_replication

Mikhail Kalashnikov 6 лет назад
Родитель
Сommit
baf39661ac
3 измененных файлов с 42 добавлено и 11 удалено
  1. 22 3
      src/epgsql.erl
  2. 15 6
      src/epgsql_sock.erl
  3. 5 2
      streaming.md

+ 22 - 3
src/epgsql.erl

@@ -26,6 +26,7 @@
          standby_status_update/3,
          start_replication/5,
          start_replication/6,
+         start_replication/7,
          to_proplist/1]).
 
 -export_type([connection/0, connect_option/0, connect_opts/0,
@@ -351,7 +352,20 @@ sync_on_error(_C, R) ->
 standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
     gen_server:call(Connection, {standby_status_update, FlushedLSN, AppliedLSN}).
 
--spec start_replication(connection(), string(), Callback, cb_state(), string(), string()) -> ok | error_reply() when
+-type replication_option() ::
+    {align_lsn, boolean()}. %% Align last applied and flushed LSN with last received LSN
+                            %%  after Primary keepalive message with ReplyRequired flag
+
+-ifdef(have_maps).
+-type replication_opts() ::
+    [replication_option()]
+    | #{align_lsn => boolean()}.
+-else.
+    -type replication_opts() :: [replication_option()].
+-endif.
+
+-spec start_replication(connection(), string(), Callback, cb_state(), string(), string(), replication_opts()) ->
+    ok | error_reply() when
     Callback :: module() | pid().
 %% @doc instructs Postgres server to start streaming WAL for logical replication
 %% where
@@ -364,11 +378,16 @@ standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
 %%                      "0/0" to let the server determine the start point.
 %% `PluginOpts'      - optional options passed to the slot's logical decoding plugin.
 %%                      For example: "option_name1 'value1', option_name2 'value2'"
+%% `Opts'            - options of logical replication
 %% returns `ok' otherwise `{error, Reason}'
+start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, Opts0) ->
+    Opts = to_proplist(Opts0),
+    gen_server:call(Connection,
+        {start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, Opts}).
 start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
-    gen_server:call(Connection, {start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}).
+    start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, []).
 start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
-    start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, []).
+    start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, [], []).
 
 %% @private
 to_proplist(List) when is_list(List) ->

+ 15 - 6
src/epgsql_sock.erl

@@ -89,7 +89,8 @@
                 repl_feedback_required,
                 repl_cbmodule,
                 repl_cbstate,
-                repl_receiver}).
+                repl_receiver,
+                repl_align_lsn}).
 
 %% -- client interface --
 
@@ -353,7 +354,7 @@ command(sync, State) ->
     send(State, ?SYNC, []),
     {noreply, State#state{sync_required = false}};
 
-command({start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}, State) ->
+command({start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, Opts}, State) ->
     Sql1 = ["START_REPLICATION SLOT """, ReplicationSlot, """ LOGICAL ", WALPosition],
     Sql2 =
         case PluginOpts of
@@ -369,8 +370,8 @@ command({start_replication, ReplicationSlot, Callback, CbInitState, WALPosition,
 
     Hex = [H || H <- WALPosition, H =/= $/],
     {ok, [LSN], _} = io_lib:fread("~16u", Hex),
-
-    State3 = State2#state{repl_last_flushed_lsn = LSN, repl_last_applied_lsn = LSN},
+    AlignLsn = proplists:get_value(align_lsn, Opts, false),
+    State3 = State2#state{repl_last_flushed_lsn = LSN, repl_last_applied_lsn = LSN, repl_align_lsn = AlignLsn},
 
     send(State3, ?SIMPLEQUERY, [Sql2, 0]),
     {noreply, State3}.
@@ -835,9 +836,17 @@ on_message({?COPY_DATA, _Data}, #state{repl_cbmodule = undefined, repl_receiver
 
 %% CopyData for Replication mode
 on_message({?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>},
-    #state{repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN} = State) ->
+    #state{repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
+        repl_align_lsn = AlignLsn} = State) ->
     case ReplyRequired of
-        1 ->
+        1 when AlignLsn ->
+            send(State, ?COPY_DATA,
+                epgsql_wire:encode_standby_status_update(LSN, LSN, LSN)),
+            {noreply, State#state{repl_feedback_required = false,
+                repl_last_received_lsn = LSN,
+                repl_last_applied_lsn = LSN,
+                repl_last_flushed_lsn = LSN}};
+        1 when not AlignLsn ->
             send(State, ?COPY_DATA,
                 epgsql_wire:encode_standby_status_update(LSN, LastFlushedLSN, LastAppliedLSN)),
             {noreply, State#state{repl_feedback_required = false, repl_last_received_lsn = LSN}};

+ 5 - 2
streaming.md

@@ -61,7 +61,7 @@ only special commands accepted in this mode
 
     ```erlang
     ok = epgsql:start_replication(Connection, ReplicationSlot, Callback, CbInitState, 
-            WALPosition, PluginOpts).
+            WALPosition, PluginOpts, Opts).
     ```
     - `Connection`           - connection in replication mode
     - `ReplicationSlot`      - the name of the replication slot to stream changes from
@@ -70,7 +70,10 @@ only special commands accepted in this mode
     - `CbInitState`          - initial state of callback module. 
     - `WALPosition`          - the WAL position XXX/XXX to begin streaming at.
                                "0/0" to let the server determine the start point.
-    - `PluginOpts`           - optional options passed to the slot's logical decoding plugin. 
+    - `PluginOpts`           - optional options passed to the slot's logical decoding plugin.
+    - `Opts`                 - options of logical replication. If decoding plugin filter some WAL records
+                                align_lsn = True must be set, otherwise it will not be possible 
+                                to stop PostgreSql DB correctly when logical replication is running.
     For example: "option_name1 'value1', option_name2 'value2'"
 
     On success, PostgreSQL server responds with a CopyBothResponse message, and then starts to stream WAL records.