%% epgsql_cmd_start_replication.erl
%%
%% > SimpleQuery "START_REPLICATION ..."
%% < CopyBothResponse | Error
  3. -module(epgsql_cmd_start_replication).
  4. -behaviour(epgsql_command).
  5. -export([init/1, execute/2, handle_message/4]).
  6. -export_type([response/0]).
  7. -type response() :: ok | {error, epgsql:query_error()}.
  8. -include("epgsql.hrl").
  9. -include("protocol.hrl").
  10. -include("../epgsql_replication.hrl").
  %% Command state: created by init/1 from the caller's argument tuple and
  %% unpacked again by execute/2.
  -record(start_repl, {
      slot,        %% replication slot name, spliced into the START_REPLICATION statement
      callback,    %% either the pid of a receiver process or a callback module
      cb_state,    %% initial callback state; only stored when `callback' is a module
      wal_pos,     %% start position as a string (list of chars), e.g. "0/0"
      plugin_opts  %% output plugin options; `[]' means "send no option list"
  }).
  %% @doc epgsql_command callback: packs the 5-tuple of command arguments
  %% `{ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}'
  %% into the `#start_repl{}' record that execute/2 later unpacks.
  init({Slot, Cb, CbState0, StartPos, Opts}) ->
      #start_repl{plugin_opts = Opts,
                  wal_pos = StartPos,
                  cb_state = CbState0,
                  callback = Cb,
                  slot = Slot}.
  %% @doc epgsql_command callback: issues the START_REPLICATION statement.
  %%
  %% Builds `START_REPLICATION SLOT <slot> LOGICAL <wal_pos>', appending
  %% ` (<plugin_opts>)' when the option list is not `[]', stores the receiver
  %% (a pid, or a callback module plus its initial state) and the numeric start
  %% LSN in the socket's replication state, and sends the statement as a
  %% simple query.
  %%
  %% Returns `{ok, Sock2, St}' where `Sock2' carries the updated replication
  %% state. Crashes with `badmatch' when the socket has no `#repl{}' state or
  %% when `WALPosition' does not contain parsable hexadecimal digits.
  execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
                            cb_state = CbInitState, wal_pos = WALPosition,
                            plugin_opts = PluginOpts} = St) ->
      %% Connection should be started with 'replication' option. Then
      %% 'replication_state' will be initialized
      Repl = #repl{} = epgsql_sock:get_replication_state(Sock),
      Sql1 = ["START_REPLICATION SLOT ", ReplicationSlot, " LOGICAL ", WALPosition],
      Sql2 =
          case PluginOpts of
              [] -> Sql1;
              _ -> [Sql1, " (", PluginOpts, ")"]
          end,
      Repl2 =
          case Callback of
              Pid when is_pid(Pid) -> Repl#repl{receiver = Pid};
              Module -> Repl#repl{cbmodule = Module, cbstate = CbInitState}
          end,
      LSN = parse_lsn(WALPosition),
      Repl3 = Repl2#repl{last_flushed_lsn = LSN,
                         last_applied_lsn = LSN},
      Sock2 = epgsql_sock:set_attr(replication_state, Repl3, Sock),
      epgsql_sock:send(Sock2, ?SIMPLEQUERY, [Sql2, 0]),
      {ok, Sock2, St}.

  %% Converts a textual WAL position ("Hi/Lo", both halves hexadecimal) into
  %% the integer `(Hi bsl 32) bor Lo'.
  %%
  %% The halves must be parsed separately: PostgreSQL does not zero-pad the low
  %% half, so simply deleting the slash and reading the remaining digits as one
  %% number turns "1/ABC" into 16#1ABC instead of 16#100000ABC.
  %% A position without a slash is still read as a single hexadecimal number.
  parse_lsn(WALPosition) ->
      case lists:splitwith(fun(C) -> C =/= $/ end, WALPosition) of
          {Hi, [$/ | Lo]} ->
              (hex_to_integer(Hi) bsl 32) bor hex_to_integer(Lo);
          {Hex, []} ->
              hex_to_integer(Hex)
      end.

  %% Reads an unsigned base-16 integer from the start of a string; crashes with
  %% `badmatch' if no digits can be read.
  hex_to_integer(Hex) ->
      {ok, [Int], _} = io_lib:fread("~16u", Hex),
      Int.
  %% @doc epgsql_command callback: handles the server's reply to START_REPLICATION.
  %%
  %% <ul>
  %%  <li>CopyBothResponse: installs the `on_replication' packet handler on the
  %%      socket and returns `{finish, ok, ok, Sock}' with the updated socket;</li>
  %%  <li>Error: returns `{sync_required, {error, Error}}';</li>
  %%  <li>any other message: returns `unknown'.</li>
  %% </ul>
  handle_message(?COPY_BOTH_RESPONSE, _Payload, Sock0, _CmdState) ->
      Sock1 = epgsql_sock:set_packet_handler(on_replication, Sock0),
      {finish, ok, ok, Sock1};
  handle_message(?ERROR, Reason, _Sock0, _CmdState) ->
      {sync_required, {error, Reason}};
  handle_message(_Type, _Payload, _Sock0, _CmdState) ->
      unknown.