1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- %% > SimpleQuery "START_REPLICATION ..."
- %% < CopyBothResponse | Error
- -module(epgsql_cmd_start_replication).
- -behaviour(epgsql_command).
- -export([init/1, execute/2, handle_message/4]).
- -export_type([response/0]).
- -type response() :: ok | {error, epgsql:query_error()}.
- -include("epgsql.hrl").
- -include("protocol.hrl").
- -include("../epgsql_replication.hrl").
- -record(start_repl,
- {slot,
- callback,
- cb_state,
- wal_pos,
- plugin_opts}).
- init({ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}) ->
- #start_repl{slot = ReplicationSlot,
- callback = Callback,
- cb_state = CbInitState,
- wal_pos = WALPosition,
- plugin_opts = PluginOpts}.
- execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
- cb_state = CbInitState, wal_pos = WALPosition,
- plugin_opts = PluginOpts} = St) ->
- %% Connection should be started with 'replication' option. Then
- %% 'replication_state' will be initialized
- Repl = #repl{} = epgsql_sock:get_replication_state(Sock),
- Sql1 = ["START_REPLICATION SLOT ", ReplicationSlot, " LOGICAL ", WALPosition],
- Sql2 =
- case PluginOpts of
- [] -> Sql1;
- PluginOpts -> [Sql1 , " (", PluginOpts, ")"]
- end,
- Repl2 =
- case Callback of
- Pid when is_pid(Pid) -> Repl#repl{receiver = Pid};
- Module -> Repl#repl{cbmodule = Module, cbstate = CbInitState}
- end,
- Hex = [H || H <- WALPosition, H =/= $/],
- {ok, [LSN], _} = io_lib:fread("~16u", Hex),
- Repl3 = Repl2#repl{last_flushed_lsn = LSN,
- last_applied_lsn = LSN},
- Sock2 = epgsql_sock:set_attr(replication_state, Repl3, Sock),
- %% handler = on_replication},
- epgsql_sock:send(Sock2, ?SIMPLEQUERY, [Sql2, 0]),
- {ok, Sock2, St}.
- %% CopyBothResponse
- handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->
- {finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)};
- handle_message(?ERROR, Error, _Sock, _State) ->
- Result = {error, Error},
- {sync_required, Result};
- handle_message(_, _, _, _) ->
- unknown.
|