Browse Source

Cleanup replication code

Сергей Прохоров 7 years ago
parent
commit
b1ca2542ea
1 changed files with 82 additions and 51 deletions
  1. 82 51
      src/epgsql_sock.erl

+ 82 - 51
src/epgsql_sock.erl

@@ -16,7 +16,7 @@
 -export([init/1, code_change/3, terminate/2]).
 
 %% state callbacks
--export([auth/2, initializing/2, on_message/2]).
+-export([auth/2, initializing/2, on_message/2, on_replication/2]).
 
 -include("epgsql.hrl").
 -include("epgsql_binary.hrl").
@@ -66,6 +66,17 @@
 -define(PRIMARY_KEEPALIVE_MESSAGE, $k).
 -define(STANDBY_STATUS_UPDATE, $r).
 
+-record(repl,
+        {
+          last_received_lsn :: integer() | undefined,
+          last_flushed_lsn :: integer() | undefined,
+          last_applied_lsn :: integer() | undefined,
+          feedback_required :: boolean() | undefined,
+          cbmodule :: module() | undefined,
+          cbstate :: any() | undefined,
+          receiver :: pid() | undefined
+        }).
+
 -record(state, {mod,
                 sock,
                 data = <<>>,
@@ -83,13 +94,7 @@
                 sync_required,
                 txstatus,
                 complete_status :: undefined | atom() | {atom(), integer()},
-                repl_last_received_lsn,
-                repl_last_flushed_lsn,
-                repl_last_applied_lsn,
-                repl_feedback_required,
-                repl_cbmodule,
-                repl_cbstate,
-                repl_receiver}).
+                repl :: #repl{} | undefined}).
 
 %% -- client interface --
 
@@ -136,9 +141,12 @@ handle_call(get_cmd_status, _From, #state{complete_status = Status} = State) ->
     {reply, {ok, Status}, State};
 
 handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
-    #state{repl_last_received_lsn = ReceivedLSN} = State) ->
+            #state{handler = on_replication,
+                   repl = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
-    {reply, ok, State#state{repl_last_flushed_lsn = FlushedLSN, repl_last_applied_lsn = AppliedLSN}};
+    Repl1 = Repl#repl{last_flushed_lsn = FlushedLSN,
+                      last_applied_lsn = AppliedLSN},
+    {reply, ok, State#state{repl = Repl1}};
 
 handle_call(Command, From, State) ->
     #state{queue = Q} = State,
@@ -231,10 +239,12 @@ command({connect, Host, Username, Password, Opts}, State) ->
                 Database  -> [Opts2 | ["database", 0, Database, 0]]
             end,
 
-            Opts4 = case proplists:get_value(replication, Opts, undefined) of
-                        undefined -> Opts3;
-                        Replication  -> [Opts3 | ["replication", 0, Replication, 0]]
-                    end,
+            {Opts4, Repl} = case proplists:get_value(replication, Opts, undefined) of
+                                undefined -> {Opts3, undefined};
+                                Replication  ->
+                                    {[Opts3 | ["replication", 0, Replication, 0]],
+                                     #repl{}}
+                            end,
 
             send(State2, [<<196608:?int32>>, Opts4, 0]),
             Async   = proplists:get_value(async, Opts, undefined),
@@ -243,6 +253,7 @@ command({connect, Host, Username, Password, Opts}, State) ->
             put(password, Password),
             {noreply,
              State2#state{handler = auth,
+                          repl = Repl,
                           async = Async}};
 
         {error, Reason} = Error ->
@@ -402,7 +413,8 @@ command(sync, State) ->
     send(State, ?SYNC, []),
     {noreply, State#state{sync_required = false}};
 
-command({start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}, State) ->
+command({start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts},
+        #state{repl = Repl} = State) ->
     %% > SIMPLEQUERY
     %% < CopyBothResponse
     %% < COPY_DATA+
@@ -414,19 +426,21 @@ command({start_replication, ReplicationSlot, Callback, CbInitState, WALPosition,
             PluginOpts -> [Sql1 , " (", PluginOpts, ")"]
         end,
 
-    State2 =
+    Repl2 =
         case Callback of
-            Pid when is_pid(Pid) -> State#state{repl_receiver = Pid};
-            Module -> State#state{repl_cbmodule = Module, repl_cbstate = CbInitState}
+            Pid when is_pid(Pid) -> Repl#repl{receiver = Pid};
+            Module -> Repl#repl{cbmodule = Module, cbstate = CbInitState}
         end,
 
     Hex = [H || H <- WALPosition, H =/= $/],
     {ok, [LSN], _} = io_lib:fread("~16u", Hex),
 
-    State3 = State2#state{repl_last_flushed_lsn = LSN, repl_last_applied_lsn = LSN},
+    State2 = State#state{repl = Repl2#repl{last_flushed_lsn = LSN,
+                                           last_applied_lsn = LSN},
+                         handler = on_replication},
 
-    send(State3, ?SIMPLEQUERY, [Sql2, 0]),
-    {noreply, State3}.
+    send(State2, ?SIMPLEQUERY, [Sql2, 0]),
+    {noreply, State2}.
 
 start_ssl(S, Flag, Opts, State) ->
     ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
@@ -475,9 +489,7 @@ do_send(gen_tcp, Sock, Bin) ->
 do_send(Mod, Sock, Bin) ->
     Mod:send(Sock, Bin).
 
-loop(#state{data = Data, handler = Handler, repl_last_received_lsn = LastReceivedLSN,
-    repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
-    repl_feedback_required = ReplFeedbackRequired} = State) ->
+loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
     case epgsql_wire:decode_message(Data) of
         {Message, Tail} ->
             case ?MODULE:Handler(Message, State#state{data = Tail}) of
@@ -488,11 +500,14 @@ loop(#state{data = Data, handler = Handler, repl_last_received_lsn = LastReceive
             end;
         _ ->
             %% in replication mode send feedback after each batch of messages
-            case ReplFeedbackRequired of
+            case (Repl =/= undefined) andalso (Repl#repl.feedback_required) of
                 true ->
+                    #repl{last_received_lsn = LastReceivedLSN,
+                          last_flushed_lsn = LastFlushedLSN,
+                          last_applied_lsn = LastAppliedLSN} = Repl,
                     send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(
                         LastReceivedLSN, LastFlushedLSN, LastAppliedLSN)),
-                    {noreply, State#state{repl_feedback_required = false}};
+                    {noreply, State#state{repl = Repl#repl{feedback_required = false}}};
                 _ ->
                     {noreply, State}
             end
@@ -877,38 +892,54 @@ on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
     notify_async(State, {notification, Channel1, Pid, Payload1}),
     {noreply, State};
 
+%% CopyData for COPY command. COPY command not supported yet.
+on_message({?COPY_DATA, _Data}, State) ->
+    {stop, {error, copy_command_not_supported}, State}.
+
+
 %% CopyBothResponse
-on_message({?COPY_BOTH_RESPONSE, _Data}, State) ->
+on_replication({?COPY_BOTH_RESPONSE, _Data}, State) ->
     State2 = finish(State, ok),
     {noreply, State2};
 
-%% CopyData for COPY command. COPY command not supported yet.
-on_message({?COPY_DATA, _Data}, #state{repl_cbmodule = undefined, repl_receiver = undefined} = State) ->
-    {stop, {error, copy_command_not_supported}, State};
+%% CopyData for Replication mode
+on_replication({?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>},
+    #state{repl = #repl{last_flushed_lsn = LastFlushedLSN,
+                        last_applied_lsn = LastAppliedLSN} = Repl} = State) ->
+    Repl1 =
+        case ReplyRequired of
+            1 ->
+                send(State, ?COPY_DATA,
+                     epgsql_wire:encode_standby_status_update(LSN, LastFlushedLSN, LastAppliedLSN)),
+                Repl#repl{feedback_required = false,
+                          last_received_lsn = LSN};
+            _ ->
+                Repl#repl{feedback_required = true,
+                          last_received_lsn = LSN}
+        end,
+    {noreply, State#state{repl = Repl1}};
 
 %% CopyData for Replication mode
-on_message({?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>},
-    #state{repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN} = State) ->
-    case ReplyRequired of
-        1 ->
-            send(State, ?COPY_DATA,
-                epgsql_wire:encode_standby_status_update(LSN, LastFlushedLSN, LastAppliedLSN)),
-            {noreply, State#state{repl_feedback_required = false, repl_last_received_lsn = LSN}};
-        _ ->
-            {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = LSN}}
-    end;
+on_replication({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
+                              _Timestamp:?int64, WALRecord/binary>>},
+               #state{repl = Repl} = State) ->
+    Repl1 = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
+    {noreply, State#state{repl = Repl1}}.
 
-%% CopyData for Replication mode. with async messages
-on_message({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64, _Timestamp:?int64, WALRecord/binary>>},
-    #state{repl_cbmodule = undefined, repl_receiver = Receiver} = State) ->
-    Receiver ! {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}},
-    {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = EndLSN}};
 
-%% CopyData for Replication mode. with callback method
-on_message({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64, _Timestamp:?int64, WALRecord/binary>>},
-    #state{repl_cbmodule = CbModule, repl_cbstate = CbState, repl_receiver = undefined} = State) ->
+handle_xlog_data(StartLSN, EndLSN, WALRecord, #repl{cbmodule = undefined,
+                                                    receiver = Receiver} = Repl) ->
+    %% with async messages
+    Receiver ! {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}},
+    Repl#repl{feedback_required = true,
+              last_received_lsn = EndLSN};
+handle_xlog_data(StartLSN, EndLSN, WALRecord,
+                 #repl{cbmodule = CbModule, cbstate = CbState, receiver = undefined} = Repl) ->
+    %% with callback method
     {ok, LastFlushedLSN, LastAppliedLSN, NewCbState} =
         CbModule:handle_x_log_data(StartLSN, EndLSN, WALRecord, CbState),
-    {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = EndLSN,
-        repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
-        repl_cbstate = NewCbState}}.
+    Repl#repl{feedback_required = true,
+              last_received_lsn = EndLSN,
+              last_flushed_lsn = LastFlushedLSN,
+              last_applied_lsn = LastAppliedLSN,
+              cbstate = NewCbState}.