Browse Source

Merge pull request #181 from SIfoxDevTeam/repl_fix

Add options to epgsql:start_replication
Sergey Prokhorov 6 years ago
parent
commit
93f6a14981
5 changed files with 74 additions and 18 deletions
  1. 9 6
      src/commands/epgsql_cmd_start_replication.erl
  2. 19 7
      src/epgsql.erl
  3. 2 1
      src/epgsql_replication.hrl
  4. 8 2
      src/epgsql_sock.erl
  5. 36 2
      streaming.md

+ 9 - 6
src/commands/epgsql_cmd_start_replication.erl

@@ -16,18 +16,20 @@
          callback,
          cb_state,
          wal_pos,
-         plugin_opts}).
+         plugin_opts,
+         opts}).
 
-init({ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}) ->
+init({ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, Opts}) ->
     #start_repl{slot = ReplicationSlot,
                 callback = Callback,
                 cb_state = CbInitState,
                 wal_pos = WALPosition,
-                plugin_opts = PluginOpts}.
+                plugin_opts = PluginOpts,
+                opts = Opts}.
 
 execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
                           cb_state = CbInitState, wal_pos = WALPosition,
-                          plugin_opts = PluginOpts} = St) ->
+                          plugin_opts = PluginOpts, opts = Opts} = St) ->
     %% Connection should be started with 'replication' option. Then
     %% 'replication_state' will be initialized
     Repl = #repl{} = epgsql_sock:get_replication_state(Sock),
@@ -46,9 +48,10 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
 
     Hex = [H || H <- WALPosition, H =/= $/],
     {ok, [LSN], _} = io_lib:fread("~16u", Hex),
-
+    AlignLsn = maps:get(align_lsn, Opts, false),
     Repl3 = Repl2#repl{last_flushed_lsn = LSN,
-                       last_applied_lsn = LSN},
+                       last_applied_lsn = LSN,
+                       align_lsn = AlignLsn},
     Sock2 = epgsql_sock:set_attr(replication_state, Repl3, Sock),
                          %% handler = on_replication},
 

+ 19 - 7
src/epgsql.erl

@@ -27,6 +27,7 @@
          standby_status_update/3,
          start_replication/5,
          start_replication/6,
+         start_replication/7,
          to_map/1]).
 -export([handle_x_log_data/5]).                 % private
 
@@ -414,9 +415,18 @@ standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
 handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, Repl) ->
     Mod:handle_x_log_data(StartLSN, EndLSN, WALRecord, Repl).
 
--spec start_replication(connection(), string(), Callback, cb_state(), string(), string()) -> Response when
-      Response :: epgsql_cmd_start_replication:response(),
-      Callback :: module() | pid().
+-type replication_option() ::
+    {align_lsn, boolean()}. %% Align last applied and flushed LSN with last received LSN
+                            %%  after Primary keepalive message with ReplyRequired flag
+
+-type replication_opts() ::
+    [replication_option()]
+    | #{align_lsn => boolean()}.
+
+-spec start_replication(connection(), string(), Callback, cb_state(), string(), string(), replication_opts()) ->
+    Response when
+    Response :: epgsql_cmd_start_replication:response(),
+    Callback :: module() | pid().
 %% @doc instructs Postgres server to start streaming WAL for logical replication
 %% where
 %% `Connection'      - connection in replication mode
@@ -428,13 +438,15 @@ handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, Repl) ->
 %%                      "0/0" to let the server determine the start point.
 %% `PluginOpts'      - optional options passed to the slot's logical decoding plugin.
 %%                      For example: "option_name1 'value1', option_name2 'value2'"
+%% `Opts'            - options of logical replication
 %% returns `ok' otherwise `{error, Reason}'
-start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
-    Command = {ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts},
+start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, Opts) ->
+    Command = {ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, to_map(Opts)},
     epgsql_sock:sync_command(Connection, epgsql_cmd_start_replication, Command).
-
+start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
+    start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, []).
 start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
-    start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, []).
+    start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, [], []).
 
 %% @private
 -spec to_map([{any(), any()}] | map()) -> map().

+ 2 - 1
src/epgsql_replication.hrl

@@ -6,5 +6,6 @@
           feedback_required :: boolean() | undefined,
           cbmodule :: module() | undefined,
           cbstate :: any() | undefined,
-          receiver :: pid() | undefined
+          receiver :: pid() | undefined,
+          align_lsn :: boolean() | undefined
         }).

+ 8 - 2
src/epgsql_sock.erl

@@ -525,10 +525,16 @@ on_message(Msg, Payload, State) ->
 %% CopyData for Replication mode
 on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
                #state{repl = #repl{last_flushed_lsn = LastFlushedLSN,
-                                   last_applied_lsn = LastAppliedLSN} = Repl} = State) ->
+                                   last_applied_lsn = LastAppliedLSN,
+                                   align_lsn = AlignLsn} = Repl} = State) ->
     Repl1 =
         case ReplyRequired of
-            1 ->
+            1 when AlignLsn ->
+                send(State, ?COPY_DATA,
+                     epgsql_wire:encode_standby_status_update(LSN, LSN, LSN)),
+                Repl#repl{feedback_required = false,
+                     last_received_lsn = LSN, last_applied_lsn = LSN, last_flushed_lsn = LSN};
+            1 when not AlignLsn ->
                 send(State, ?COPY_DATA,
                      epgsql_wire:encode_standby_status_update(LSN, LastFlushedLSN, LastAppliedLSN)),
                 Repl#repl{feedback_required = false,

+ 36 - 2
streaming.md

@@ -61,7 +61,7 @@ only special commands accepted in this mode
 
     ```erlang
     ok = epgsql:start_replication(Connection, ReplicationSlot, Callback, CbInitState, 
-            WALPosition, PluginOpts).
+            WALPosition, PluginOpts, Opts).
     ```
     - `Connection`           - connection in replication mode
     - `ReplicationSlot`      - the name of the replication slot to stream changes from
@@ -71,7 +71,10 @@ only special commands accepted in this mode
     - `WALPosition`          - the WAL position XXX/XXX to begin streaming at.
                                "0/0" to let the server determine the start point.
     - `PluginOpts`           - optional options passed to the slot's logical decoding plugin. 
-    For example: "option_name1 'value1', option_name2 'value2'"
+                               For example: "option_name1 'value1', option_name2 'value2'"
+    - `Opts`                 - options of logical replication. 
+                               See Logical replication options section for details.
+    
 
     On success, PostgreSQL server responds with a CopyBothResponse message, and then starts to stream WAL records.
     PostgreSQL sends CopyData messages which contain:
@@ -105,3 +108,34 @@ handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
     io:format("~p~n", [{StartLSN, EndLSN, Data}]),
     {ok, EndLSN, EndLSN, CbState}.
 ```
+
+## Logical replication options
+
+* **align_lsn** - Default - false.
+
+    During shutdown, the PG server does not exit until all XLOG records,
+    up to the shutdown checkpoint record, have been sent to the standby. It also sends a
+    `Primary keepalive message` with the special flag set (which means that the client
+    should reply to this message as soon as possible) to get the last applied LSN from the standby.
+
+    If epgsql replicates through a decoding plugin which filters out some WAL records
+    (for example pgoutput and PG publications with only some tables),
+    then epgsql will not receive all WAL records, and the LSN kept in its state will not be the latest one.
+    In such a case it is not possible to stop the PG server while epgsql replication is running,
+    because epgsql is not able to report the latest LSN.
+
+    To overcome this problem, use the option `align_lsn = true`.
+    If this option is enabled, then when epgsql gets a `Primary keepalive message` with the reply required flag,
+    it will send back to the PG server the LSN received in that `Primary keepalive message`.
+    The PG server will stop normally after receiving this latest LSN.
+    If epgsql has some replication delay during the PG server shutdown,
+    then there is a risk of skipping WAL segments that have not been received yet,
+    i.e. after restart the PG server will send only new WAL segments.
+    
+    However, if you use epgsql replication to implement a 'Consistent cache' use case,
+    re-create replication slots (or use temporary slots),
+    and load all data from tables on application startup,
+    then there is no risk of losing data.
+    
+    Otherwise (if you do not load all data from tables during Erlang app startup),
+    it is not recommended to set align_lsn to true. In this case, to stop the PG server, stop epgsql replication first.