epgsql_cmd_start_replication.erl 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. %% @doc Requests server to start sending replication packets
  2. %%
  3. %% See {@link epgsql:connect/1} `replication' parameter.
  4. %% ```
  5. %% > SimpleQuery "START_REPLICATION ..."
  6. %% < CopyBothResponse | Error
  7. %% '''
  8. -module(epgsql_cmd_start_replication).
  9. -behaviour(epgsql_command).
  10. -export([init/1, execute/2, handle_message/4]).
  11. -export_type([response/0]).
  12. -type response() :: ok | {error, epgsql:query_error()}.
  13. -include("epgsql.hrl").
  14. -include("protocol.hrl").
  15. -include("../epgsql_replication.hrl").
  16. -record(start_repl,
  17. {slot,
  18. callback,
  19. cb_state,
  20. wal_pos,
  21. plugin_opts,
  22. opts}).
  23. init({ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts, Opts}) ->
  24. #start_repl{slot = ReplicationSlot,
  25. callback = Callback,
  26. cb_state = CbInitState,
  27. wal_pos = WALPosition,
  28. plugin_opts = PluginOpts,
  29. opts = Opts}.
  30. execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
  31. cb_state = CbInitState, wal_pos = WALPosition,
  32. plugin_opts = PluginOpts, opts = Opts} = St) ->
  33. %% Connection should be started with 'replication' option. Then
  34. %% 'replication_state' will be initialized
  35. Repl = #repl{} = epgsql_sock:get_replication_state(Sock),
  36. Sql1 = ["START_REPLICATION SLOT ", ReplicationSlot, " LOGICAL ", WALPosition],
  37. Sql2 =
  38. case PluginOpts of
  39. [] -> Sql1;
  40. PluginOpts -> [Sql1 , " (", PluginOpts, ")"]
  41. end,
  42. Repl2 =
  43. case Callback of
  44. Pid when is_pid(Pid) -> Repl#repl{receiver = Pid};
  45. Module -> Repl#repl{cbmodule = Module, cbstate = CbInitState}
  46. end,
  47. Hex = [H || H <- WALPosition, H =/= $/],
  48. {ok, [LSN], _} = io_lib:fread("~16u", Hex),
  49. AlignLsn = maps:get(align_lsn, Opts, false),
  50. Repl3 = Repl2#repl{last_flushed_lsn = LSN,
  51. last_applied_lsn = LSN,
  52. align_lsn = AlignLsn},
  53. Sock2 = epgsql_sock:set_attr(replication_state, Repl3, Sock),
  54. %% handler = on_replication},
  55. epgsql_sock:send(Sock2, ?SIMPLEQUERY, [Sql2, 0]),
  56. {ok, Sock2, St}.
  57. %% CopyBothResponse
  58. handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->
  59. {finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)};
  60. handle_message(?ERROR, Error, _Sock, _State) ->
  61. Result = {error, Error},
  62. {sync_required, Result};
  63. handle_message(_, _, _, _) ->
  64. unknown.