Browse Source

Merge pull request #103 from SIfoxDevTeam/repl_protocol

Streaming replication protocol support
Sergey Prokhorov 8 years ago
parent
commit
6895536978
8 changed files with 388 additions and 14 deletions
  1. 3 3
      README.md
  2. 4 0
      setup_test_db.sh
  3. 46 4
      src/epgsql.erl
  4. 99 6
      src/epgsql_sock.erl
  5. 7 1
      src/epgsql_wire.erl
  6. 107 0
      streaming.md
  7. 121 0
      test/epgsql_replication_tests.erl
  8. 1 0
      test_data/test_schema.sql

+ 3 - 3
README.md

@@ -68,8 +68,9 @@ see `CHANGES` for full list.
     {ssl,      IsEnabled  :: boolean() | required} |
     {ssl,      IsEnabled  :: boolean() | required} |
     {ssl_opts, SslOptions :: [ssl:ssl_option()]}   | % @see OTP ssl app, ssl_api.hrl
     {ssl_opts, SslOptions :: [ssl:ssl_option()]}   | % @see OTP ssl app, ssl_api.hrl
     {timeout,  TimeoutMs  :: timeout()}            | % default: 5000 ms
     {timeout,  TimeoutMs  :: timeout()}            | % default: 5000 ms
-    {async,    Receiver   :: pid() | atom()}. % process to receive LISTEN/NOTIFY msgs
-
+    {async,    Receiver   :: pid() | atom()}       | % process to receive LISTEN/NOTIFY msgs
+    {replication, Replication :: string()}. % Pass "database" to connect in replication mode
+    
 -spec connect(host(), string(), string(), [connect_option()])
 -spec connect(host(), string(), string(), [connect_option()])
         -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.    
         -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.    
 %% @doc connects to Postgres
 %% @doc connects to Postgres
@@ -396,7 +397,6 @@ example:
 - `{C, Ref, {complete, _Type}}`
 - `{C, Ref, {complete, _Type}}`
 - `{C, Ref, done}` - execution of all queries from Batch has finished
 - `{C, Ref, done}` - execution of all queries from Batch has finished
 
 
-
 ## Data Representation
 ## Data Representation
 PG type       | Representation
 PG type       | Representation
 --------------|-------------------------------------
 --------------|-------------------------------------

+ 4 - 0
setup_test_db.sh

@@ -13,6 +13,9 @@ cat > datadir/postgresql.conf <<EOF
 ssl = on
 ssl = on
 ssl_ca_file = 'root.crt'
 ssl_ca_file = 'root.crt'
 lc_messages = 'en_US.UTF-8'
 lc_messages = 'en_US.UTF-8'
+wal_level = 'logical'
+max_replication_slots = 15
+max_wal_senders = 15
 EOF
 EOF
 
 
 cp test_data/epgsql.crt datadir/server.crt
 cp test_data/epgsql.crt datadir/server.crt
@@ -30,5 +33,6 @@ host    epgsql_test_db1 epgsql_test             127.0.0.1/32    trust
 host    epgsql_test_db1 epgsql_test_md5         127.0.0.1/32    md5
 host    epgsql_test_db1 epgsql_test_md5         127.0.0.1/32    md5
 host    epgsql_test_db1 epgsql_test_cleartext   127.0.0.1/32    password
 host    epgsql_test_db1 epgsql_test_cleartext   127.0.0.1/32    password
 hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    cert clientcert=1
 hostssl epgsql_test_db1 epgsql_test_cert        127.0.0.1/32    cert clientcert=1
+host    replication     epgsql_test_replication 127.0.0.1/32    trust
 EOF
 EOF
 
 

+ 46 - 4
src/epgsql.erl

@@ -21,7 +21,10 @@
          update_type_cache/1,
          update_type_cache/1,
          update_type_cache/2,
          update_type_cache/2,
          with_transaction/2,
          with_transaction/2,
-         sync_on_error/2]).
+         sync_on_error/2,
+         standby_status_update/3,
+         start_replication/5,
+         start_replication/6]).
 
 
 -export_type([connection/0, connect_option/0,
 -export_type([connection/0, connect_option/0,
               connect_error/0, query_error/0,
               connect_error/0, query_error/0,
@@ -39,7 +42,9 @@
     {ssl,      IsEnabled  :: boolean() | required} |
     {ssl,      IsEnabled  :: boolean() | required} |
     {ssl_opts, SslOptions :: [ssl:ssl_option()]}   | % @see OTP ssl app, ssl_api.hrl
     {ssl_opts, SslOptions :: [ssl:ssl_option()]}   | % @see OTP ssl app, ssl_api.hrl
     {timeout,  TimeoutMs  :: timeout()}            | % default: 5000 ms
     {timeout,  TimeoutMs  :: timeout()}            | % default: 5000 ms
-    {async,    Receiver   :: pid()}. % process to receive LISTEN/NOTIFY msgs
+    {async,    Receiver   :: pid()}                | % process to receive LISTEN/NOTIFY msgs
+    {replication, Replication :: string()}. % Pass "database" to connect in replication mode
+
 -type connect_error() :: #error{}.
 -type connect_error() :: #error{}.
 -type query_error() :: #error{}.
 -type query_error() :: #error{}.
 
 
@@ -68,6 +73,15 @@
     {ok, Count :: non_neg_integer(), ColumnsDescription :: [#column{}], RowsValues :: [RowType]}. % update/insert/delete + returning
     {ok, Count :: non_neg_integer(), ColumnsDescription :: [#column{}], RowsValues :: [RowType]}. % update/insert/delete + returning
 -type error_reply() :: {error, query_error()}.
 -type error_reply() :: {error, query_error()}.
 -type reply(RowType) :: ok_reply(RowType) | error_reply().
 -type reply(RowType) :: ok_reply(RowType) | error_reply().
+-type lsn() :: integer().
+-type cb_state() :: term().
+
+%% -- behaviour callbacks --
+
+%% Handles a XLogData Message (StartLSN, EndLSN, WALRecord, CbState).
+%% Return: {ok, LastFlushedLSN, LastAppliedLSN, NewCbState}
+-callback handle_x_log_data(lsn(), lsn(), binary(), cb_state()) -> {ok, lsn(), lsn(), cb_state()}.
+%% -------------
 
 
 %% -- client interface --
 %% -- client interface --
 connect(Settings) ->
 connect(Settings) ->
@@ -103,8 +117,12 @@ connect(C, Host, Username, Password, Opts) ->
                          {connect, Host, Username, Password, Opts},
                          {connect, Host, Username, Password, Opts},
                          infinity) of
                          infinity) of
         connected ->
         connected ->
-            update_type_cache(C),
-            {ok, C};
+            case proplists:get_value(replication, Opts, undefined) of
+                undefined ->
+                    update_type_cache(C),
+                    {ok, C};
+                _ -> {ok, C} %% do not update update_type_cache if connection is in replication mode
+            end;
         Error = {error, _} ->
         Error = {error, _} ->
             Error
             Error
     end.
     end.
@@ -274,3 +292,27 @@ sync_on_error(C, Error = {error, _}) ->
 
 
 sync_on_error(_C, R) ->
 sync_on_error(_C, R) ->
     R.
     R.
+
+-spec standby_status_update(connection(), lsn(), lsn()) -> ok | error_reply().
+%% @doc sends last flushed and applied WAL positions to the server in a standby status update message via given `Connection'
+standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
+    gen_server:call(Connection, {standby_status_update, FlushedLSN, AppliedLSN}).
+
+-spec start_replication(connection(), string(), Callback, cb_state(), string(), string()) -> ok | error_reply() when
+    Callback :: module() | pid().
+%% @doc instructs Postgres server to start streaming WAL for logical replication
+%% where
+%% `Connection'      - connection in replication mode
+%% `ReplicationSlot' - the name of the replication slot to stream changes from
+%% `Callback'        - Callback module which should have the callback functions implemented for message processing.
+%%                      or a process which should be able to receive replication messages.
+%% `CbInitState'     - Callback Module's initial state
+%% `WALPosition'     - the WAL position XXX/XXX to begin streaming at.
+%%                      "0/0" to let the server determine the start point.
+%% `PluginOpts'      - optional options passed to the slot's logical decoding plugin.
+%%                      For example: "option_name1 'value1', option_name2 'value2'"
+%% returns `ok' otherwise `{error, Reason}'
+start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
+    gen_server:call(Connection, {start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}).
+start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
+    start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, []).

+ 99 - 6
src/epgsql_sock.erl

@@ -57,6 +57,13 @@
 -define(PARAMETER_DESCRIPTION, $t).
 -define(PARAMETER_DESCRIPTION, $t).
 -define(ROW_DESCRIPTION, $T).
 -define(ROW_DESCRIPTION, $T).
 -define(READY_FOR_QUERY, $Z).
 -define(READY_FOR_QUERY, $Z).
+-define(COPY_BOTH_RESPONSE, $W).
+-define(COPY_DATA, $d).
+
+% CopyData replication messages
+-define(X_LOG_DATA, $w).
+-define(PRIMARY_KEEPALIVE_MESSAGE, $k).
+-define(STANDBY_STATUS_UPDATE, $r).
 
 
 -record(state, {mod,
 -record(state, {mod,
                 sock,
                 sock,
@@ -73,7 +80,14 @@
                 results = [],
                 results = [],
                 batch = [],
                 batch = [],
                 sync_required,
                 sync_required,
-                txstatus}).
+                txstatus,
+                repl_last_received_lsn,
+                repl_last_flushed_lsn,
+                repl_last_applied_lsn,
+                repl_feedback_required,
+                repl_cbmodule,
+                repl_cbstate,
+                repl_receiver}).
 
 
 %% -- client interface --
 %% -- client interface --
 
 
@@ -113,6 +127,11 @@ handle_call({get_parameter, Name}, _From, State) ->
 handle_call({set_async_receiver, PidOrName}, _From, #state{async = Previous} = State) ->
 handle_call({set_async_receiver, PidOrName}, _From, #state{async = Previous} = State) ->
     {reply, {ok, Previous}, State#state{async = PidOrName}};
     {reply, {ok, Previous}, State#state{async = PidOrName}};
 
 
+handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
+    #state{repl_last_received_lsn = ReceivedLSN} = State) ->
+    send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN)),
+    {reply, ok, State#state{repl_last_flushed_lsn = FlushedLSN, repl_last_applied_lsn = AppliedLSN}};
+
 handle_call(Command, From, State) ->
 handle_call(Command, From, State) ->
     #state{queue = Q} = State,
     #state{queue = Q} = State,
     Req = {{call, From}, Command},
     Req = {{call, From}, Command},
@@ -201,7 +220,13 @@ command({connect, Host, Username, Password, Opts}, State) ->
                 undefined -> Opts2;
                 undefined -> Opts2;
                 Database  -> [Opts2 | ["database", 0, Database, 0]]
                 Database  -> [Opts2 | ["database", 0, Database, 0]]
             end,
             end,
-            send(State2, [<<196608:?int32>>, Opts3, 0]),
+
+            Opts4 = case proplists:get_value(replication, Opts, undefined) of
+                        undefined -> Opts3;
+                        Replication  -> [Opts3 | ["replication", 0, Replication, 0]]
+                    end,
+
+            send(State2, [<<196608:?int32>>, Opts4, 0]),
             Async   = proplists:get_value(async, Opts, undefined),
             Async   = proplists:get_value(async, Opts, undefined),
             setopts(State2, [{active, true}]),
             setopts(State2, [{active, true}]),
             put(username, Username),
             put(username, Username),
@@ -316,7 +341,29 @@ command({close, Type, Name}, State) ->
 
 
 command(sync, State) ->
 command(sync, State) ->
     send(State, ?SYNC, []),
     send(State, ?SYNC, []),
-    {noreply, State#state{sync_required = false}}.
+    {noreply, State#state{sync_required = false}};
+
+command({start_replication, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}, State) ->
+    Sql1 = ["START_REPLICATION SLOT """, ReplicationSlot, """ LOGICAL ", WALPosition],
+    Sql2 =
+        case PluginOpts of
+            [] -> Sql1;
+            PluginOpts -> [Sql1 , " (", PluginOpts, ")"]
+        end,
+
+    State2 =
+        case Callback of
+            Pid when is_pid(Pid) -> State#state{repl_receiver = Pid};
+            Module -> State#state{repl_cbmodule = Module, repl_cbstate = CbInitState}
+        end,
+
+    Hex = [H || H <- WALPosition, H =/= $/],
+    {ok, [LSN], _} = io_lib:fread("~16u", Hex),
+
+    State3 = State2#state{repl_last_flushed_lsn = LSN, repl_last_applied_lsn = LSN},
+
+    send(State3, ?SIMPLEQUERY, [Sql2, 0]),
+    {noreply, State3}.
 
 
 start_ssl(S, Flag, Opts, State) ->
 start_ssl(S, Flag, Opts, State) ->
     ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
     ok = gen_tcp:send(S, <<8:?int32, 80877103:?int32>>),
@@ -364,7 +411,9 @@ do_send(gen_tcp, Sock, Bin) ->
 do_send(Mod, Sock, Bin) ->
 do_send(Mod, Sock, Bin) ->
     Mod:send(Sock, Bin).
     Mod:send(Sock, Bin).
 
 
-loop(#state{data = Data, handler = Handler} = State) ->
+loop(#state{data = Data, handler = Handler, repl_last_received_lsn = LastReceivedLSN,
+    repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
+    repl_feedback_required = ReplFeedbackRequired} = State) ->
     case epgsql_wire:decode_message(Data) of
     case epgsql_wire:decode_message(Data) of
         {Message, Tail} ->
         {Message, Tail} ->
             case ?MODULE:Handler(Message, State#state{data = Tail}) of
             case ?MODULE:Handler(Message, State#state{data = Tail}) of
@@ -374,7 +423,15 @@ loop(#state{data = Data, handler = Handler} = State) ->
                     R
                     R
             end;
             end;
         _ ->
         _ ->
-            {noreply, State}
+            %% in replication mode send feedback after each batch of messages
+            case ReplFeedbackRequired of
+                true ->
+                    send(State, ?COPY_DATA, epgsql_wire:encode_standby_status_update(
+                        LastReceivedLSN, LastFlushedLSN, LastAppliedLSN)),
+                    {noreply, State#state{repl_feedback_required = false}};
+                _ ->
+                    {noreply, State}
+            end
     end.
     end.
 
 
 finish(State, Result) ->
 finish(State, Result) ->
@@ -752,4 +809,40 @@ on_message({?NOTIFICATION, <<Pid:?int32, Strings/binary>>}, State) ->
         [Channel]          -> {Channel, <<>>}
         [Channel]          -> {Channel, <<>>}
     end,
     end,
     notify_async(State, {notification, Channel1, Pid, Payload1}),
     notify_async(State, {notification, Channel1, Pid, Payload1}),
-    {noreply, State}.
+    {noreply, State};
+
+%% CopyBothResponse
+on_message({?COPY_BOTH_RESPONSE, _Data}, State) ->
+    State2 = finish(State, ok),
+    {noreply, State2};
+
+%% CopyData for COPY command. COPY command not supported yet.
+on_message({?COPY_DATA, _Data}, #state{repl_cbmodule = undefined, repl_receiver = undefined} = State) ->
+    {stop, {error, copy_command_not_supported}, State};
+
+%% CopyData for Replication mode
+on_message({?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>},
+    #state{repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN} = State) ->
+    case ReplyRequired of
+        1 ->
+            send(State, ?COPY_DATA,
+                epgsql_wire:encode_standby_status_update(LSN, LastFlushedLSN, LastAppliedLSN)),
+            {noreply, State#state{repl_feedback_required = false, repl_last_received_lsn = LSN}};
+        _ ->
+            {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = LSN}}
+    end;
+
+%% CopyData for Replication mode. with async messages
+on_message({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64, _Timestamp:?int64, WALRecord/binary>>},
+    #state{repl_cbmodule = undefined, repl_receiver = Receiver} = State) ->
+    Receiver ! {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}},
+    {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = EndLSN}};
+
+%% CopyData for Replication mode. with callback method
+on_message({?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64, _Timestamp:?int64, WALRecord/binary>>},
+    #state{repl_cbmodule = CbModule, repl_cbstate = CbState, repl_receiver = undefined} = State) ->
+    {ok, LastFlushedLSN, LastAppliedLSN, NewCbState} =
+        CbModule:handle_x_log_data(StartLSN, EndLSN, WALRecord, CbState),
+    {noreply, State#state{repl_feedback_required = true, repl_last_received_lsn = EndLSN,
+        repl_last_flushed_lsn = LastFlushedLSN, repl_last_applied_lsn = LastAppliedLSN,
+        repl_cbstate = NewCbState}}.

+ 7 - 1
src/epgsql_wire.erl

@@ -14,7 +14,8 @@
          encode_types/2,
          encode_types/2,
          encode_formats/1,
          encode_formats/1,
          format/1,
          format/1,
-         encode_parameters/2]).
+         encode_parameters/2,
+         encode_standby_status_update/3]).
 
 
 -include("epgsql.hrl").
 -include("epgsql.hrl").
 -include("epgsql_binary.hrl").
 -include("epgsql_binary.hrl").
@@ -206,3 +207,8 @@ encode_parameter(L) when is_list(L)    -> {0, encode_list(L)}.
 encode_list(L) ->
 encode_list(L) ->
     Bin = list_to_binary(L),
     Bin = list_to_binary(L),
     <<(byte_size(Bin)):?int32, Bin/binary>>.
     <<(byte_size(Bin)):?int32, Bin/binary>>.
+
+encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN) ->
+    {MegaSecs, Secs, MicroSecs} = erlang:now(),
+    Timestamp = ((MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs) - 946684800*1000000, %% microseconds since midnight on 2000-01-01
+    <<$r:8, ReceivedLSN:?int64, FlushedLSN:?int64, AppliedLSN:?int64, Timestamp:?int64, 0:8>>.

+ 107 - 0
streaming.md

@@ -0,0 +1,107 @@
+# Streaming replication protocol
+
+EPGSQL supports PostgreSQL streaming replication protocol.
+See https://www.postgresql.org/docs/current/static/protocol-replication.html
+
+## Use cases
+***Consistent cache***
+
+Erlang application uses in read-only mode data from PostgreSql DB tables 
+(e.g. some configuration, rate/price plans, subscriber’s profiles) 
+and needs to store the data in some internal structure (e.g. ETS table). 
+The data in PostgreSql DB is updated by some third party applications. 
+Erlang application needs to know about modifications in a near real-time mode. 
+During start Erlang application uploads initial data from the tables 
+and subscribes to receive any modifications for data in these tables and update internal ETS tables accordingly.
+Using special output plugins for replication slots (for example: pglogical_output) 
+we can filter tables and receive only changes for some particular tables.
+
+***Consistent internal DB***
+
+This case is similar to previous “Consistent cache”, 
+but the difference is that amount of data in the table is huge 
+and it is inefficient and takes too much time to upload data every time on startup, 
+so Erlang application stores copy of data in some persistent internal storage(mnesia, riak, files). 
+In this case application uploads initial data from the table only once during the first start. 
+Afterwards it receives modifications and apply it in it’s storage. 
+Even if application is down it will not lose any changes from DB 
+and will receive all changes when it starts again.
+
+
+##Usage
+1. Initiate connection in replication mode.
+
+    To initiate streaming replication connection, `replication` parameter with 
+the value "database" should be set in the `epgsql:connect`.
+Only simple queries `squery` can be used in replication mode and 
+only special commands accepted in this mode 
+(e.g DROP_REPLICATION_SLOT, CREATE_REPLICATION_SLOT, IDENTIFY_SYSTEM).
+
+    Replication commands as well as replication connection mode available only in epgsql and not in epgsqla, epgsqli.
+
+2. Create replication slot. 
+
+    Replication slot will be updated as replication progresses so that the PostgreSQL server knows 
+    which WAL segments are still needed by the standby.
+    
+    Use `epgsql:squery` to send CREATE_REPLICATION_SLOT command.
+    
+    When a new replication slot is created, a snapshot is exported, 
+    which will show exactly the state of the database 
+    after which all changes will be included in the change stream. 
+    This snapshot can be used to create a new replica by using SET TRANSACTION SNAPSHOT 
+    to read the state of the database at the moment the slot was created. 
+    
+    Note: you have to create new connection with the not-replciation mode to select initial state of tables, 
+    since you cannot run SELECT in replication mode connection.
+
+3. Start replication.
+
+    Use `epgsql:start_replication` to start streaming. 
+
+    ```erlang
+    ok = epgsql:start_replication(Connection, ReplicationSlot, Callback, CbInitState, 
+            WALPosition, PluginOpts).
+    ```
+    - `Connection`           - connection in replication mode
+    - `ReplicationSlot`      - the name of the replication slot to stream changes from
+    - `Callback`             - callback module which should have the callback functions
+                                or a process which should be able to receive replication messages.
+    - `CbInitState`          - initial state of callback module. 
+    - `WALPosition`          - the WAL position XXX/XXX to begin streaming at.
+                               "0/0" to let the server determine the start point.
+    - `PluginOpts`           - optional options passed to the slot's logical decoding plugin. 
+    For example: "option_name1 'value1', option_name2 'value2'"
+
+    On success, PostgreSQL server responds with a CopyBothResponse message, and then starts to stream WAL records.
+    PostgreSQL sends CopyData messages which contain:
+    - Keepalive message. *epgsql* answers with standby status update message, to avoid a timeout disconnect.
+    - XLogData message. *epgsql* calls `CallbackModule:handle_x_log_data` for each XLogData 
+    or sends async messages `{epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}}`. 
+    In case of async mode, the receiving process should report last processed LSN by calling 
+    `standby_status_update(Connection, FlushedLSN, AppliedLSN)`.
+
+    ```erlang
+    %% Handles a XLogData Message (StartLSN, EndLSN, WALRecord, CbState).
+    %% Return: {ok, LastFlushedLSN, LastAppliedLSN, NewCbState}
+    -callback handle_x_log_data(lsn(), lsn(), binary(), cb_state()) -> {ok, lsn(), lsn(), cb_state()}.
+     ```
+ 
+Example:
+
+```erlang
+start_replication() -> 
+    {ok, C} = epgsql:connect("localhost", "username", "psss", [
+                {database, "test_db"},
+                {replication, "database"}
+            ]),
+    
+    Res = epgsql:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
+    io:format("~p~n", [Res]),
+    
+    ok = epgsql:start_replication(C, "epgsql_test", ?MODULE, [], "0/0").
+
+handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
+    io:format("~p~n", [{StartLSN, EndLSN, Data}]),
+    {ok, EndLSN, EndLSN, CbState}.
+```

+ 121 - 0
test/epgsql_replication_tests.erl

@@ -0,0 +1,121 @@
+-module(epgsql_replication_tests).
+
+-export([run_tests/0]).
+-compile([export_all]).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("epgsql.hrl").
+
+connect_in_repl_mode_test(Module) ->
+    epgsql_tests:connect_only(Module, ["epgsql_test_replication",
+        "epgsql_test_replication",
+        [{database, "epgsql_test_db1"}, {replication, "database"}]]).
+
+create_drop_replication_slot_test(Module) ->
+    epgsql_tests:with_connection(
+        Module,
+        fun(C) ->
+            {ok, Cols, Rows} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
+            [#column{name = <<"slot_name">>}, #column{name = <<"consistent_point">>},
+                #column{name = <<"snapshot_name">>}, #column{name = <<"output_plugin">>}] = Cols,
+            [{<<"epgsql_test">>, _, _, <<"test_decoding">>}] = Rows,
+            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
+        end,
+        "epgsql_test_replication",
+        [{replication, "database"}]).
+
+replication_async_test(Module) ->
+    replication_test_run(Module, self()).
+
+replication_sync_test(Module) ->
+    replication_test_run(Module, ?MODULE).
+
+%% -- run all tests --
+
+run_tests() ->
+    Files = filelib:wildcard(filename:dirname(code:which(epgsql_replication_tests))
+                             ++ "/*tests.beam"),
+    Mods = [list_to_atom(filename:basename(F, ".beam")) || F <- Files],
+    eunit:test(Mods, []).
+
+all_test_() ->
+    Tests =
+        lists:map(
+          fun({Name, _}) ->
+                  {Name, fun(X) -> ?MODULE:Name(X) end}
+          end,
+          lists:filter(
+            fun({Name, Arity}) ->
+                    case {lists:suffix("_test", atom_to_list(Name)), Arity} of
+                        {true, 1} -> true;
+                        _ -> false
+                    end
+            end,
+            ?MODULE:module_info(functions))),
+    WithModule =
+        fun(Module) ->
+                lists:map(
+                  fun({Name, Test}) ->
+                          {lists:flatten(
+                             io_lib:format("~s(~s)", [Name, Module])),
+                           fun() -> Test(Module) end}
+                  end,
+                  Tests)
+        end,
+    [WithModule(epgsql)
+    ].
+
+%% -- internal functions --
+
+replication_test_run(Module, Callback) ->
+    epgsql_tests:with_connection(
+        Module,
+        fun(C) ->
+            {ok, _, _} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
+
+            %% new connection because main id in a replication mode
+            epgsql_tests:with_connection(
+                Module,
+                fun(C2) ->
+                    [{ok, 1},{ok, 1}] = Module:squery(C2,
+                        "insert into test_table1 (id, value) values (5, 'five');delete from test_table1 where id = 5;")
+                end,
+                "epgsql_test_db1"),
+
+            Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
+            ok = receive_replication_msgs(
+                [<<"table public.test_table1: INSERT: id[integer]:5 value[text]:'five'">>,
+                    <<"table public.test_table1: DELETE: id[integer]:5">>], C, [])
+        end,
+        "epgsql_test_replication",
+        [{replication, "database"}]),
+    %% cleanup
+    epgsql_tests:with_connection(
+        Module,
+        fun(C) ->
+            [{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
+        end,
+        "epgsql_test_replication",
+        [{replication, "database"}]).
+
+receive_replication_msgs(Pattern, Pid, ReceivedMsgs) ->
+    receive
+        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
+            receive_replication_msgs(Pattern, Pid, [begin_msg | ReceivedMsgs]);
+        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
+            case lists:reverse(ReceivedMsgs) of
+                [begin_msg, row_msg | _] -> ok;
+                _ -> error_replication_messages_not_received
+            end;
+        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
+            [Msg | T] = Pattern,
+            receive_replication_msgs(T, Pid, [row_msg | ReceivedMsgs])
+    after
+        60000 ->
+            error_timeout
+    end.
+
+handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
+    {C, Pid} = CbState,
+    Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
+    {ok, EndLSN, EndLSN, CbState}.

+ 1 - 0
test_data/test_schema.sql

@@ -11,6 +11,7 @@ CREATE USER epgsql_test;
 CREATE USER epgsql_test_md5 WITH PASSWORD 'epgsql_test_md5';
 CREATE USER epgsql_test_md5 WITH PASSWORD 'epgsql_test_md5';
 CREATE USER epgsql_test_cleartext WITH PASSWORD 'epgsql_test_cleartext';
 CREATE USER epgsql_test_cleartext WITH PASSWORD 'epgsql_test_cleartext';
 CREATE USER epgsql_test_cert;
 CREATE USER epgsql_test_cert;
+CREATE USER epgsql_test_replication WITH REPLICATION PASSWORD 'epgsql_test_replication';
 
 
 DROP DATABASE epgsql_test_db1;
 DROP DATABASE epgsql_test_db1;
 DROP DATABASE epgsql_test_db2;
 DROP DATABASE epgsql_test_db2;