epgsql_sock.erl 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. %%% Copyright (C) 2009 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. %%% @doc GenServer holding all connection state (including socket).
  4. %%%
  5. %%% See https://www.postgresql.org/docs/current/static/protocol-flow.html
  6. %%% Commands in PostgreSQL are pipelined: you don't need to wait for reply to
  7. %%% be able to send next command.
  8. %%% Commands are processed (and responses to them are generated) in FIFO order.
  9. %%% eg, if you execute 2 SimpleQuery: #1 and #2, first you get all response
  10. %%% packets for #1 and then all for #2:
  11. %%% > SQuery #1
  12. %%% > SQuery #2
  13. %%% < RowDescription #1
  14. %%% < DataRow #1
  15. %%% < CommandComplete #1
  16. %%% < RowDescription #2
  17. %%% < DataRow #2
  18. %%% < CommandComplete #2
  19. %%%
  20. %%% See epgsql_cmd_connect for network connection and authentication setup
  21. -module(epgsql_sock).
  22. -behavior(gen_server).
  23. -export([start_link/0,
  24. close/1,
  25. sync_command/3,
  26. async_command/4,
  27. get_parameter/2,
  28. set_notice_receiver/2,
  29. get_cmd_status/1,
  30. cancel/1]).
  31. -export([handle_call/3, handle_cast/2, handle_info/2]).
  32. -export([init/1, code_change/3, terminate/2]).
  33. %% loop callback
  34. -export([on_message/3, on_replication/3]).
  35. %% Command's APIs
  36. -export([set_net_socket/3, init_replication_state/1, set_attr/3, get_codec/1,
  37. get_rows/1, get_results/1, notify/2, send/2, send/3, send_multi/2,
  38. get_parameter_internal/2,
  39. get_replication_state/1, set_packet_handler/2]).
  40. -export_type([transport/0, pg_sock/0]).
  41. -include("epgsql.hrl").
  42. -include("protocol.hrl").
  43. -include("epgsql_replication.hrl").
  44. -type transport() :: {call, any()}
  45. | {cast, pid(), reference()}
  46. | {incremental, pid(), reference()}.
  47. -type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
  48. -type repl_state() :: #repl{}.
  %% State of the connection gen_server. Field order is significant (records
  %% are tuples at runtime) and must not be changed.
  -record(state, {
            %% Module used for socket operations; set by set_net_socket/3.
            mod :: gen_tcp | ssl | undefined,
            %% The network socket handled through `mod'.
            sock :: tcp_socket() | ssl:sslsocket() | undefined,
            %% Received bytes that have not yet been decoded into a message.
            data = <<>>,
            %% Pair that is sent to the server in a cancel request.
            backend :: {Pid :: integer(), Key :: integer()} | undefined,
            %% Name of the exported function loop/1 calls for each decoded message.
            handler = on_message :: on_message | on_replication | undefined,
            codec :: epgsql_binary:codec() | undefined,
            %% Commands already executed and waiting behind `current_cmd'.
            queue = queue:new() :: queue:queue({epgsql_command:command(), any(), transport()}),
            %% Command whose responses are currently being processed.
            current_cmd :: epgsql_command:command() | undefined,
            current_cmd_state :: any() | undefined,
            current_cmd_transport :: transport() | undefined,
            %% Receiver of asynchronous notices / notifications.
            async :: undefined | atom() | pid(),
            %% Name/value pairs collected from ParameterStatus messages.
            parameters = [] :: [{Key :: binary(), Value :: binary()}],
            %% Rows of the current result, newest first (see get_rows/1).
            rows = [] :: [tuple()],
            %% Results of the current command, newest first (see get_results/1).
            results = [],
            %% When true, only epgsql_cmd_sync is accepted by command_exec/4.
            sync_required :: boolean() | undefined,
            %% Status byte taken from the last ReadyForQuery message.
            txstatus :: byte() | undefined, % $I | $T | $E,
            %% Decoded payload of the last CommandComplete message.
            complete_status :: atom() | {atom(), integer()} | undefined,
            %% Replication-mode state, see init_replication_state/1.
            repl :: repl_state() | undefined
           }).
  67. -opaque pg_sock() :: #state{}.
  68. %% -- client interface --
  %% @doc Starts a linked gen_server running this module, with an empty
  %% argument list and no start options.
  -spec start_link() -> {ok, pid()} | ignore | {error, term()}.
  start_link() ->
      InitArgs = [],
      StartOpts = [],
      gen_server:start_link(?MODULE, InitArgs, StartOpts).
  %% @doc Asynchronously asks the connection process `C' to stop.
  %% Always returns `ok'; it does not wait for the process to terminate.
  -spec close(pid()) -> ok.
  close(C) when is_pid(C) ->
      %% gen_server:cast/2 always returns `ok' (even if the destination is
      %% dead), so no old-style `catch' is needed around it.
      gen_server:cast(C, stop).
  %% @doc Runs `Command' with `Args' on connection `C' and blocks, without a
  %% timeout, until the connection process replies.
  -spec sync_command(epgsql:connection(), epgsql_command:command(), any()) -> any().
  sync_command(C, Command, Args) ->
      Request = {command, Command, Args},
      gen_server:call(C, Request, infinity).
  %% @doc Submits `Command' to connection `C' without waiting for the result.
  %% Returns a fresh reference; replies are sent to the calling process as
  %% messages tagged with that reference (see reply/3).
  -spec async_command(epgsql:connection(), cast | incremental,
                      epgsql_command:command(), any()) -> reference().
  async_command(C, Transport, Command, Args) ->
      Ref = make_ref(),
      ReplyTo = {Transport, self(), Ref},
      ok = gen_server:cast(C, {ReplyTo, Command, Args}),
      Ref.
  %% @doc Asks connection `C' for the stored value of parameter `Name'
  %% (a binary or a list). The reply is `{ok, Value}' where `Value' is
  %% `undefined' when the parameter is not known.
  get_parameter(C, Name) ->
      Key = to_binary(Name),
      gen_server:call(C, {get_parameter, Key}, infinity).
  %% @doc Sets the pid or registered name that receives asynchronous
  %% messages from connection `C'. The connection replies with
  %% `{ok, Previous}', the receiver that was configured before.
  set_notice_receiver(C, PidOrName)
    when is_pid(PidOrName) orelse is_atom(PidOrName) ->
      Request = {set_async_receiver, PidOrName},
      gen_server:call(C, Request, infinity).
  %% @doc Fetches the stored completion status from connection `C'.
  %% The connection replies with `{ok, Status}'.
  get_cmd_status(C) ->
      Request = get_cmd_status,
      gen_server:call(C, Request, infinity).
  %% @doc Asynchronously asks connection process `S' to send a cancel
  %% request to the server. Returns `ok' immediately.
  cancel(S) ->
      Request = cancel,
      gen_server:cast(S, Request).
  93. %% -- command APIs --
  94. %% send()
  95. %% send_multi()
  %% Stores the transport module and socket in the state and switches the
  %% socket to active mode. The result of setopts/2 is not checked.
  -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
  set_net_socket(Mod, Socket, State) ->
      Connected = State#state{mod = Mod, sock = Socket},
      setopts(Connected, [{active, true}]),
      Connected.
  %% Installs a fresh `#repl{}' record (all fields at their defaults) as the
  %% replication state.
  -spec init_replication_state(pg_sock()) -> pg_sock().
  init_replication_state(State) ->
      FreshRepl = #repl{},
      State#state{repl = FreshRepl}.
  %% Sets one named attribute of the connection state. Only the attributes
  %% listed below are supported; `backend' must be a 2-tuple. Any other
  %% attribute name fails with `function_clause'.
  -spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
  set_attr(backend, {_Pid, _Key} = PidAndKey, State) ->
      State#state{backend = PidAndKey};
  set_attr(async, Receiver, State) ->
      State#state{async = Receiver};
  set_attr(txstatus, TxStatus, State) ->
      State#state{txstatus = TxStatus};
  set_attr(codec, NewCodec, State) ->
      State#state{codec = NewCodec};
  set_attr(sync_required, Flag, State) ->
      State#state{sync_required = Flag};
  set_attr(replication_state, ReplState, State) ->
      State#state{repl = ReplState}.
  %% XXX: be careful!
  %% Replaces the name of the function that loop/1 invokes (as
  %% `?MODULE:Handler/3') for every decoded message. The name is not validated.
  -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
  set_packet_handler(Handler, State) ->
      Updated = State#state{handler = Handler},
      Updated.
  %% Returns the codec stored in the connection state.
  -spec get_codec(pg_sock()) -> epgsql_binary:codec().
  get_codec(#state{codec = CurrentCodec}) ->
      CurrentCodec.
  %% Returns the replication state stored in the connection state.
  -spec get_replication_state(pg_sock()) -> repl_state().
  get_replication_state(#state{repl = ReplState}) ->
      ReplState.
  %% Returns the accumulated rows in arrival order. They are stored newest
  %% first (see add_row/2), hence the reversal.
  -spec get_rows(pg_sock()) -> [tuple()].
  get_rows(#state{rows = NewestFirst}) ->
      lists:reverse(NewestFirst).
  %% Returns the accumulated results in arrival order. They are stored newest
  %% first (see add_result/3), hence the reversal.
  -spec get_results(pg_sock()) -> [any()].
  get_results(#state{results = NewestFirst}) ->
      lists:reverse(NewestFirst).
  %% Looks up `Name' among the stored `{Name, Value}' parameter pairs.
  %% Returns the value, or `undefined' when there is no entry for `Name'.
  -spec get_parameter_internal(binary(), pg_sock()) -> binary() | undefined.
  get_parameter_internal(Name, #state{parameters = Known}) ->
      case lists:keyfind(Name, 1, Known) of
          {Name, Found} -> Found;
          false -> undefined
      end.
  139. %% -- gen_server implementation --
  %% gen_server callback: starts with a default `#state{}'. No socket is
  %% opened here; it is installed later through set_net_socket/3.
  init([]) ->
      InitialState = #state{},
      {ok, InitialState}.
  %% gen_server callback for synchronous requests.
  %%
  %% `{get_parameter, Name}'           - reply `{ok, Value | undefined}'.
  %% `{set_async_receiver, PidOrName}' - store the new receiver and reply
  %%                                     `{ok, Previous}'.
  %% `get_cmd_status'                  - reply `{ok, complete_status}'.
  %% `{standby_status_update, F, A}'   - only while the replication handler
  %%                                     is active: send a status update and
  %%                                     remember the flushed/applied LSNs.
  %% `{command, Command, Args}'        - start a command; the caller is
  %%                                     answered later via gen_server:reply/2.
  handle_call({get_parameter, Name}, _From, State) ->
      Value = get_parameter_internal(Name, State),
      {reply, {ok, Value}, State};
  handle_call({set_async_receiver, PidOrName}, _From, #state{async = Previous} = State) ->
      NewState = State#state{async = PidOrName},
      {reply, {ok, Previous}, NewState};
  handle_call(get_cmd_status, _From, #state{complete_status = Status} = State) ->
      {reply, {ok, Status}, State};
  handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
              #state{handler = on_replication,
                     repl = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
      StatusUpdate =
          epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN),
      send(State, ?COPY_DATA, StatusUpdate),
      NewRepl = Repl#repl{last_flushed_lsn = FlushedLSN,
                          last_applied_lsn = AppliedLSN},
      {reply, ok, State#state{repl = NewRepl}};
  handle_call({command, Command, Args}, From, State) ->
      command_new({call, From}, Command, Args, State).
  %% gen_server callback for asynchronous requests.
  %%
  %% `{{cast | incremental, Pid, Ref}, Command, Args}' - start a command whose
  %%     replies are sent to `Pid' tagged with `Ref'.
  %% `stop'   - fail every pending command with `{error, closed}' and stop.
  %% `cancel' - send a cancel request for the current backend over a
  %%     separate, short-lived TCP connection to the same peer.
  handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
    when (Method =:= cast orelse Method =:= incremental),
         is_pid(From),
         is_reference(Ref) ->
      command_new(Transport, Command, Args, State);
  handle_cast(stop, State) ->
      {stop, normal, flush_queue(State, {error, closed})};
  handle_cast(cancel, State = #state{backend = {Pid, Key},
                                     mod = Mod,
                                     sock = TimedOutSock}) ->
      {ok, {Addr, Port}} = case Mod of
                               gen_tcp -> inet:peername(TimedOutSock);
                               ssl -> ssl:peername(TimedOutSock)
                           end,
      Msg = <<16:?int32, 80877102:?int32, Pid:?int32, Key:?int32>>,
      %% Cancellation is best effort: a failure to deliver the request must
      %% not take down the main connection, so the result is ignored on purpose.
      _ = send_cancel_request(Addr, Port, Msg),
      {noreply, State}.

  %% Opens a temporary plain TCP connection to `Addr:Port', writes `Msg' and
  %% closes the socket again. The connect attempt is bounded so that it cannot
  %% block the connection process forever.
  %% Returns `ok' or `{error, Reason}'.
  send_cancel_request(Addr, Port, Msg) ->
      SockOpts = [{active, false}, {packet, raw}, binary],
      ConnectTimeoutMs = 5000,
      case gen_tcp:connect(Addr, Port, SockOpts, ConnectTimeoutMs) of
          {ok, Sock} ->
              SendResult = gen_tcp:send(Sock, Msg),
              %% Close in every case so the socket is not leaked when send fails.
              ok = gen_tcp:close(Sock),
              SendResult;
          {error, _Reason} = Error ->
              Error
      end.
  %% gen_server callback for raw messages from the socket.
  %%
  %% Socket closed or in error: fail all pending commands and stop.
  %% `inet_reply': answer to the port command issued by do_send/3; anything
  %% other than `ok' stops the process.
  %% Any other 3-tuple tagged with our socket is treated as incoming data: it
  %% is appended to the receive buffer and then processed by loop/1.
  handle_info({ClosedTag, Sock}, #state{sock = Sock} = State)
    when ClosedTag =:= tcp_closed; ClosedTag =:= ssl_closed ->
      NoSocket = State#state{sock = undefined},
      {stop, sock_closed, flush_queue(NoSocket, {error, sock_closed})};
  handle_info({ErrorTag, Sock, Reason}, #state{sock = Sock} = State)
    when ErrorTag =:= tcp_error; ErrorTag =:= ssl_error ->
      StopReason = {sock_error, Reason},
      {stop, StopReason, flush_queue(State, {error, StopReason})};
  handle_info({inet_reply, _, ok}, State) ->
      {noreply, State};
  handle_info({inet_reply, _, Status}, State) ->
      {stop, Status, flush_queue(State, {error, Status})};
  handle_info({_, Sock, Incoming}, #state{data = Buffered, sock = Sock} = State) ->
      Combined = <<Buffered/binary, Incoming/binary>>,
      loop(State#state{data = Combined}).
  %% gen_server callback: closes the socket with the module that owns it.
  %% Nothing to do when no socket is set.
  terminate(_Reason, #state{sock = undefined}) ->
      ok;
  terminate(_Reason, #state{mod = gen_tcp, sock = TcpSock}) ->
      gen_tcp:close(TcpSock);
  terminate(_Reason, #state{mod = ssl, sock = SslSock}) ->
      ssl:close(SslSock).
  %% gen_server callback: the state is kept unchanged on code upgrade.
  code_change(_OldVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  196. %% -- internal functions --
  %% Initialises the state of a new command from its arguments and hands it
  %% to command_exec/4.
  -spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
            Result when
        Result :: {noreply, pg_sock()}
                | {stop, Reason :: any(), pg_sock()}.
  command_new(Transport, Command, Args, State) ->
      InitialCmdState = epgsql_command:init(Command, Args),
      command_exec(Transport, Command, InitialCmdState, State).
  %% Executes a command, or rejects it while a sync is required.
  %%
  %% While `sync_required' is set, every command except `epgsql_cmd_sync' is
  %% answered with `{error, sync_required}' and is neither executed nor queued.
  %% Otherwise the command is executed and registered through
  %% command_enqueue/4, or the process is stopped if execution asks for it.
  -spec command_exec(transport(), epgsql_command:command(), any(), pg_sock()) ->
            Result when
        Result :: {noreply, pg_sock()}
                | {stop, Reason :: any(), pg_sock()}.
  command_exec(Transport, Command, _CmdState, State = #state{sync_required = true})
    when Command /= epgsql_cmd_sync ->
      %% Reply directly and leave the state untouched. Going through finish/2
      %% here would overwrite `current_cmd' / `current_cmd_transport' and run
      %% command_next/1, dropping a command that is still waiting for its
      %% response (e.g. an in-flight epgsql_cmd_sync) and popping the queue.
      Rejection = {error, sync_required},
      reply(Transport, Rejection, Rejection),
      {noreply, State};
  command_exec(Transport, Command, CmdState, State) ->
      case epgsql_command:execute(Command, State, CmdState) of
          {ok, State1, CmdState1} ->
              {noreply, command_enqueue(Transport, Command, CmdState1, State1)};
          {stop, StopReason, Response, State1} ->
              reply(Transport, Response, Response),
              {stop, StopReason, State1}
      end.
  %% Registers an already executed command. If nothing is in progress it
  %% becomes the current command, otherwise it is appended to the queue.
  %% In both cases the stored completion status is cleared.
  -spec command_enqueue(transport(), epgsql_command:command(), epgsql_command:state(), pg_sock()) -> pg_sock().
  command_enqueue(Transport, Command, CmdState, #state{current_cmd = undefined} = Idle) ->
      Idle#state{complete_status = undefined,
                 current_cmd = Command,
                 current_cmd_state = CmdState,
                 current_cmd_transport = Transport};
  command_enqueue(Transport, Command, CmdState, #state{queue = Pending} = Busy) ->
      Entry = {Command, CmdState, Transport},
      Busy#state{complete_status = undefined,
                 queue = queue:in(Entry, Pending)}.
  %% Passes one backend message to the current command and applies the action
  %% the command asks for: collect a row or a result, finish, do nothing,
  %% re-execute the command, stop the process, or enter the sync-required state.
  %% A message the command does not recognise stops the process.
  -spec command_handle_message(byte(), binary() | epgsql:query_error(), pg_sock()) ->
            {noreply, pg_sock()}
          | {stop, any(), pg_sock()}.
  command_handle_message(Msg, Payload,
                         #state{current_cmd = Cmd,
                                current_cmd_state = CmdSt0} = State0) ->
      Outcome = epgsql_command:handle_message(Cmd, Msg, Payload, State0, CmdSt0),
      case Outcome of
          {add_row, Row, State, CmdSt} ->
              {noreply, add_row(State#state{current_cmd_state = CmdSt}, Row)};
          {add_result, Result, Notice, State, CmdSt} ->
              WithCmdState = State#state{current_cmd_state = CmdSt},
              {noreply, add_result(WithCmdState, Notice, Result)};
          {finish, Result, Notice, State} ->
              {noreply, finish(State, Notice, Result)};
          {noaction, State} ->
              {noreply, State};
          {noaction, State, CmdSt} ->
              {noreply, State#state{current_cmd_state = CmdSt}};
          {requeue, State, CmdSt} ->
              %% Execute the same command again; clearing `current_cmd' lets
              %% command_enqueue/4 install it as the current command again.
              Transport = State#state.current_cmd_transport,
              command_exec(Transport, Cmd, CmdSt,
                           State#state{current_cmd = undefined});
          {stop, Reason, Response, State} ->
              {stop, Reason, finish(State, Response)};
          {sync_required, Why} ->
              %% Protocol error. Finish and flush all pending commands.
              Flagged = State0#state{sync_required = true},
              {noreply, sync_required(finish(Flagged, Why))};
          unknown ->
              {stop, {error, {unexpected_message, Msg, Cmd, CmdSt0}}, State0}
      end.
  %% Moves on after the current command has finished: the next queued command
  %% (if any) becomes current, and the accumulated rows and results are
  %% cleared. Must only be called while a command is current.
  command_next(#state{current_cmd = Finished, queue = Pending} = State)
    when Finished =/= undefined ->
      case queue:out(Pending) of
          {{value, {Command, CmdState, Transport}}, Remaining} ->
              make_current(State#state{queue = Remaining},
                           Command, CmdState, Transport);
          {empty, _} ->
              make_current(State, undefined, undefined, undefined)
      end.

  %% Installs the given command triple as the current command and resets the
  %% per-command accumulators.
  make_current(State, Command, CmdState, Transport) ->
      State#state{current_cmd = Command,
                  current_cmd_state = CmdState,
                  current_cmd_transport = Transport,
                  rows = [],
                  results = []}.
  %% Applies socket options using the API that matches the transport module.
  setopts(#state{mod = Transport, sock = Socket}, Options) ->
      case Transport of
          ssl -> ssl:setopts(Socket, Options);
          gen_tcp -> inet:setopts(Socket, Options)
      end.
  %% Sends a packet that has no message-type byte. Only used during
  %% connection initiation, for the client's `StartupMessage' and
  %% `SSLRequest' packets.
  -spec send(pg_sock(), iodata()) -> ok | {error, any()}.
  send(#state{mod = Transport, sock = Socket}, Data) ->
      Packet = epgsql_wire:encode_command(Data),
      do_send(Transport, Socket, Packet).
  %% Encodes `Data' as a message of type `Type' and writes it to the socket.
  -spec send(pg_sock(), byte(), iodata()) -> ok | {error, any()}.
  send(#state{mod = Transport, sock = Socket}, Type, Data) ->
      Packet = epgsql_wire:encode_command(Type, Data),
      do_send(Transport, Socket, Packet).
  %% Encodes every `{Type, Data}' pair and writes all resulting packets to
  %% the socket with a single send.
  -spec send_multi(pg_sock(), [{byte(), iodata()}]) -> ok | {error, any()}.
  send_multi(#state{mod = Transport, sock = Socket}, List) ->
      Encode = fun({Type, Data}) -> epgsql_wire:encode_command(Type, Data) end,
      Packets = lists:map(Encode, List),
      do_send(Transport, Socket, Packets).
  %% Writes already encoded data to the socket.
  %%
  %% For gen_tcp the data is written with erlang:port_command/2 instead of
  %% gen_tcp:send/2; see
  %% https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
  %% The port answers with an `inet_reply' message, which is why
  %% `handle_info({inet_reply, ...' exists.
  do_send(gen_tcp, Port, Payload) ->
      try erlang:port_command(Port, Payload) of
          true -> ok
      catch
          error:_Reason -> {error, einval}
      end;
  do_send(ssl, SslSocket, Payload) ->
      ssl:send(SslSocket, Payload).
  %% Decodes and dispatches buffered messages one by one until the buffer no
  %% longer yields a message or a handler asks to stop. Each message is passed
  %% to the exported handler function named in the state.
  loop(#state{data = Buffer, handler = Handler} = State) ->
      case epgsql_wire:decode_message(Buffer) of
          {Type, Payload, Rest} ->
              case ?MODULE:Handler(Type, Payload, State#state{data = Rest}) of
                  {noreply, NextState} ->
                      loop(NextState);
                  {stop, _Reason, _FinalState} = Stop ->
                      Stop
              end;
          _ ->
              flush_replication_feedback(State)
      end.

  %% In replication mode, sends feedback after each batch of messages: if the
  %% replication state has `feedback_required' set, a standby status update
  %% with the stored LSNs is sent and the flag is cleared.
  flush_replication_feedback(#state{repl = Repl} = State) ->
      case (Repl =/= undefined) andalso (Repl#repl.feedback_required) of
          true ->
              #repl{last_received_lsn = ReceivedLSN,
                    last_flushed_lsn = FlushedLSN,
                    last_applied_lsn = AppliedLSN} = Repl,
              Update = epgsql_wire:encode_standby_status_update(
                         ReceivedLSN, FlushedLSN, AppliedLSN),
              send(State, ?COPY_DATA, Update),
              {noreply, State#state{repl = Repl#repl{feedback_required = false}}};
          _ ->
              {noreply, State}
      end.
  %% finish/2: same as finish/3 with the same term used as notice and result.
  finish(State, Outcome) ->
      finish(State, Outcome, Outcome).

  %% finish/3: delivers the final answer of the current command to its
  %% requester and moves on to the next queued command.
  finish(#state{current_cmd_transport = Requester} = State, Notice, Result) ->
      reply(Requester, Notice, Result),
      command_next(State).
  %% Delivers an answer according to the transport of the request:
  %% `cast' requesters get `Result' as a message, `incremental' requesters get
  %% `Notice' as a message, and `call' requesters get `Result' as the
  %% gen_server reply. Messages have the form `{ConnectionPid, Ref, Term}'.
  reply({cast, Requester, Tag}, _Notice, Result) ->
      Requester ! {self(), Tag, Result};
  reply({incremental, Requester, Tag}, Notice, _Result) ->
      Requester ! {self(), Tag, Notice};
  reply({call, Caller}, _Notice, Result) ->
      gen_server:reply(Caller, Result).
  %% Records one finished result of the current command and clears the row
  %% accumulator. For an `incremental' transport the notice is sent to the
  %% requester right away and nothing is stored; otherwise the result is
  %% pushed onto the (reversed) result list.
  add_result(#state{current_cmd_transport = {incremental, From, Ref}} = State,
             Notice, _Result) ->
      From ! {self(), Ref, Notice},
      State#state{rows = []};
  add_result(#state{results = Collected} = State, _Notice, Result) ->
      State#state{rows = [],
                  results = [Result | Collected]}.
  %% Handles one data row of the current command. For an `incremental'
  %% transport the row is sent to the requester as `{data, Row}' and nothing
  %% is stored; otherwise the row is pushed onto the (reversed) row list.
  add_row(#state{current_cmd_transport = {incremental, From, Ref}} = State, Data) ->
      From ! {self(), Ref, {data, Data}},
      State;
  add_row(#state{rows = Collected} = State, Data) ->
      State#state{rows = [Data | Collected]}.
  %% Sends `Notice' to the requester of the current command when that command
  %% uses the `incremental' transport; otherwise does nothing.
  %% The state is returned unchanged in both cases.
  notify(State, Notice) ->
      case State of
          #state{current_cmd_transport = {incremental, From, Ref}} ->
              From ! {self(), Ref, Notice},
              State;
          _ ->
              State
      end.
  %% Send asynchronous messages (notice / notification) to the configured
  %% receiver as `{epgsql, ConnectionPid, Msg}'.
  %% Returns `true' when the message was sent, and `false' when no receiver
  %% is configured or no process is registered under the receiver's name.
  notify_async(#state{async = Receiver}, Msg) ->
      case Receiver of
          undefined ->
              false;
          _ ->
              try Receiver ! {epgsql, self(), Msg} of
                  _ -> true
              catch
                  error:badarg ->
                      %% no process registered under this name
                      false
              end
      end.
  %% Fails pending commands with `{error, sync_required}' until either an
  %% `epgsql_cmd_sync' command becomes current (state returned as is) or no
  %% command is left (the `sync_required' flag is set).
  sync_required(#state{current_cmd = Current} = State) ->
      case Current of
          epgsql_cmd_sync ->
              State;
          undefined ->
              State#state{sync_required = true};
          _ ->
              sync_required(finish(State, {error, sync_required}))
      end.
  %% Answers the current command and every queued command with `Error',
  %% until no command is left.
  flush_queue(#state{current_cmd = Current} = State, Error) ->
      case Current of
          undefined ->
              State;
          _ ->
              flush_queue(finish(State, Error), Error)
      end.
  %% Normalises a name to a binary: lists are converted with
  %% list_to_binary/1, binaries are returned unchanged.
  to_binary(Value) when is_list(Value) ->
      list_to_binary(Value);
  to_binary(Value) when is_binary(Value) ->
      Value.
  390. %% -- backend message handling --
  %% Default handler for decoded backend messages (called from loop/1).
  %% Connection-level messages are handled here; everything else is passed
  %% to the current command.

  %% CommandComplete: remember the decoded status, then pass to the command.
  on_message(?COMMAND_COMPLETE = Msg, Bin, State) ->
      Status = epgsql_wire:decode_complete(Bin),
      command_handle_message(Msg, Bin, State#state{complete_status = Status});
  %% ReadyForQuery: remember the status byte, then pass to the command.
  on_message(?READY_FOR_QUERY = Msg, <<TxStatus:8>> = Bin, State) ->
      command_handle_message(Msg, Bin, State#state{txstatus = TxStatus});
  %% Error
  on_message(?ERROR = Msg, Err, #state{current_cmd = undefined} = State) ->
      %% Message generated by server asynchronously
      {stop, {shutdown, epgsql_wire:decode_error(Err)}, State};
  on_message(?ERROR = Msg, Err, State) ->
      command_handle_message(Msg, epgsql_wire:decode_error(Err), State);
  %% NoticeResponse: forwarded to the async receiver, if any.
  on_message(?NOTICE, Data, State) ->
      Notice = epgsql_wire:decode_error(Data),
      notify_async(State, {notice, Notice}),
      {noreply, State};
  %% ParameterStatus: insert or replace the parameter value.
  on_message(?PARAMETER_STATUS, Data, #state{parameters = Known} = State) ->
      [Name, Value] = epgsql_wire:decode_strings(Data),
      Updated = lists:keystore(Name, 1, Known, {Name, Value}),
      {noreply, State#state{parameters = Updated}};
  %% NotificationResponse: the payload string may be absent.
  on_message(?NOTIFICATION, <<Pid:?int32, Strings/binary>>, State) ->
      {Channel, Payload} =
          case epgsql_wire:decode_strings(Strings) of
              [Chan, Text] -> {Chan, Text};
              [Chan] -> {Chan, <<>>}
          end,
      notify_async(State, {notification, Channel, Pid, Payload}),
      {noreply, State};
  %% ParseComplete
  %% ParameterDescription
  %% RowDescription
  %% NoData
  %% BindComplete
  %% CloseComplete
  %% DataRow
  %% PortalSuspended
  %% EmptyQueryResponse
  %% CopyData
  %% CopyBothResponse
  on_message(Msg, Payload, State) ->
      command_handle_message(Msg, Payload, State).
  %% Handler for decoded backend messages in replication mode.

  %% CopyData carrying a primary keepalive message. When the server requests
  %% a reply, a standby status update is sent immediately: with `align_lsn'
  %% set all three LSNs are aligned to the keepalive LSN, otherwise the stored
  %% flushed/applied LSNs are reported. Without a reply request only the
  %% received LSN is stored and feedback is marked as required.
  on_replication(?COPY_DATA,
                 <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
                 #state{repl = #repl{last_flushed_lsn = FlushedLSN,
                                     last_applied_lsn = AppliedLSN,
                                     align_lsn = AlignLsn} = Repl0} = State) ->
      Repl =
          case {ReplyRequired, AlignLsn} of
              {1, true} ->
                  send(State, ?COPY_DATA,
                       epgsql_wire:encode_standby_status_update(LSN, LSN, LSN)),
                  Repl0#repl{feedback_required = false,
                             last_received_lsn = LSN,
                             last_applied_lsn = LSN,
                             last_flushed_lsn = LSN};
              {1, false} ->
                  send(State, ?COPY_DATA,
                       epgsql_wire:encode_standby_status_update(LSN, FlushedLSN, AppliedLSN)),
                  Repl0#repl{feedback_required = false,
                             last_received_lsn = LSN};
              _ ->
                  Repl0#repl{feedback_required = true,
                             last_received_lsn = LSN}
          end,
      {noreply, State#state{repl = Repl}};
  %% CopyData carrying WAL data: delegated to handle_xlog_data/4.
  on_replication(?COPY_DATA,
                 <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
                   _Timestamp:?int64, WALRecord/binary>>,
                 #state{repl = Repl0} = State) ->
      Repl = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl0),
      {noreply, State#state{repl = Repl}};
  %% Error: stop the process with the decoded error.
  on_replication(?ERROR, Err, State) ->
      {stop, {error, epgsql_wire:decode_error(Err)}, State};
  %% Notices, notifications and parameter changes are handled as usual.
  on_replication(Msg, Data, State)
    when Msg == ?NOTICE; Msg == ?NOTIFICATION; Msg == ?PARAMETER_STATUS ->
      on_message(Msg, Data, State).
  %% Delivers one WAL record and returns the updated replication state, with
  %% `feedback_required' set and `last_received_lsn' moved to `EndLSN'.
  %%
  %% Without a callback module the record is sent to the receiver as an
  %% `{epgsql, ConnectionPid, {x_log_data, ...}}' message. With a callback
  %% module (and no receiver) the callback is invoked, and the LSNs and
  %% callback state it returns are stored as well.
  handle_xlog_data(StartLSN, EndLSN, WALRecord,
                   #repl{cbmodule = undefined, receiver = Receiver} = Repl) ->
      %% with async messages
      XLogMsg = {x_log_data, StartLSN, EndLSN, WALRecord},
      Receiver ! {epgsql, self(), XLogMsg},
      Repl#repl{feedback_required = true,
                last_received_lsn = EndLSN};
  handle_xlog_data(StartLSN, EndLSN, WALRecord,
                   #repl{cbmodule = CbModule, cbstate = CbState0, receiver = undefined} = Repl) ->
      %% with callback method
      CallbackResult =
          epgsql:handle_x_log_data(CbModule, StartLSN, EndLSN, WALRecord, CbState0),
      {ok, FlushedLSN, AppliedLSN, CbState} = CallbackResult,
      Repl#repl{feedback_required = true,
                last_received_lsn = EndLSN,
                last_flushed_lsn = FlushedLSN,
                last_applied_lsn = AppliedLSN,
                cbstate = CbState}.