epgsql_sock.erl 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779
  1. %%% @doc GenServer holding all the connection state (including socket).
  2. %%%
  3. %%% See [https://www.postgresql.org/docs/current/static/protocol-flow.html]
  4. %%%
  5. %%% Commands in PostgreSQL protocol are pipelined: you don't have to wait for
  6. %%% reply to be able to send next command.
  7. %%% Commands are processed (and responses to them are generated) in FIFO order.
  8. %%% eg, if you execute 2 SimpleQuery: #1 and #2, first you get all response
  9. %%% packets for #1 and then all for #2:
  10. %%% ```
  11. %%% > SQuery #1
  12. %%% > SQuery #2
  13. %%% < RowDescription #1
  14. %%% < DataRow #1.1
  15. %%% < ...
  16. %%% < DataRow #1.N
  17. %%% < CommandComplete #1
  18. %%% < RowDescription #2
  19. %%% < DataRow #2.1
  20. %%% < ...
  21. %%% < DataRow #2.N
  22. %%% < CommandComplete #2
  23. %%% '''
  24. %%% `epgsql_sock' is capable of utilizing the pipelining feature - as soon as
  25. %%% it receives a new command, it sends it to the server immediately and then
  26. %%% it puts command's callbacks and state into internal queue of all the commands
  27. %%% which were sent to the server and waiting for response. So it knows in which
  28. %%% order it should call each pipelined command's `handle_message' callback.
  29. %%% But it can be easily broken if high-level command is poorly implemented or
  30. %%% some conflicting low-level commands (such as `parse', `bind', `execute') are
  31. %%% executed in a wrong order. In this case server and epgsql states become out of
  32. %%% sync and {@link epgsql_cmd_sync} has to be executed in order to recover.
  33. %%%
  34. %%% {@link epgsql_cmd_copy_from_stdin} and {@link epgsql_cmd_start_replication} switch the
  35. %%% "state machine" of connection process to a special "COPY mode" subprotocol.
  36. %%% See [https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY].
  37. %%% @see epgsql_cmd_connect. epgsql_cmd_connect for network connection and authentication setup
  38. %%% @end
  39. %%% Copyright (C) 2009 - Will Glozer. All rights reserved.
  40. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  41. -module(epgsql_sock).
  42. -behavior(gen_server).
  43. -export([start_link/0,
  44. close/1,
  45. sync_command/3,
  46. async_command/4,
  47. get_parameter/2,
  48. set_notice_receiver/2,
  49. get_cmd_status/1,
  50. cancel/1,
  51. copy_send_rows/3,
  52. standby_status_update/3,
  53. get_backend_pid/1,
  54. activate/1]).
  55. -export([handle_call/3, handle_cast/2, handle_info/2, format_status/2]).
  56. -export([init/1, code_change/3, terminate/2]).
  57. %% loop callback
  58. -export([on_message/3, on_replication/3, on_copy_from_stdin/3]).
  59. %% Command's APIs
  60. -export([set_net_socket/3, init_replication_state/1, set_attr/3, get_codec/1,
  61. get_rows/1, get_results/1, notify/2, send/2, send/3, send_multi/2,
  62. get_parameter_internal/2,
  63. get_subproto_state/1, set_packet_handler/2]).
  64. -export_type([transport/0, pg_sock/0, error/0]).
  65. -include("protocol.hrl").
  66. -include("epgsql_replication.hrl").
  67. -include("epgsql_copy.hrl").
  68. -type transport() :: {call, any()}
  69. | {cast, pid(), reference()}
  70. | {incremental, pid(), reference()}.
  71. -type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
  72. -type repl_state() :: #repl{}.
  73. -type copy_state() :: #copy{}.
  74. -type error() :: {error, sync_required | closed | sock_closed | sock_error}.
  %% State of the connection process (also exposed to commands as the opaque
  %% `pg_sock()' type).
  -record(state, {
      %% transport module and the socket itself
      mod :: gen_tcp | ssl | undefined,
      sock :: tcp_socket() | ssl:sslsocket() | undefined,
      %% bytes received from the socket that are not decoded yet
      data = <<>>,
      backend :: {Pid :: integer(), Key :: integer()} | undefined,
      %% name of the exported callback that `loop/1' applies to each decoded packet
      handler = on_message :: on_message | on_replication | on_copy_from_stdin | undefined,
      codec :: epgsql_binary:codec() | undefined,
      %% commands that are enqueued behind the current one
      queue = queue:new() :: queue:queue({epgsql_command:command(), any(), transport()}),
      %% command whose responses are currently being handled
      current_cmd :: epgsql_command:command() | undefined,
      current_cmd_state :: any() | undefined,
      current_cmd_transport :: transport() | undefined,
      %% receiver of asynchronous messages (notices / notifications)
      async :: undefined | atom() | pid(),
      parameters = [] :: [{Key :: binary(), Value :: binary()}],
      %% accumulated in reverse order, see `get_rows/1' and `get_results/1'
      rows = [] :: [tuple()] | information_redacted,
      results = [],
      sync_required :: boolean() | undefined,
      txstatus :: byte() | undefined, % $I | $T | $E,
      complete_status :: atom() | {atom(), integer()} | undefined,
      subproto_state :: repl_state() | copy_state() | undefined,
      connect_opts :: epgsql:connect_opts() | undefined
  }).
  94. -opaque pg_sock() :: #state{}.
  95. -ifndef(OTP_RELEASE). % pre-OTP21
  96. -define(WITH_STACKTRACE(T, R, S), T:R -> S = erlang:get_stacktrace(), ).
  97. -else.
  98. -define(WITH_STACKTRACE(T, R, S), T:R:S ->).
  99. -endif.
  100. %% -- client interface --
  %% @doc Start a new, not yet connected, connection process linked to the caller.
  start_link() ->
      InitArgs = [],
      gen_server:start_link(?MODULE, InitArgs, []).
  %% @doc Ask the connection process to shut down. Always returns `ok'.
  %%
  %% The request is a cast, so this function does not wait for the process to
  %% actually terminate. `gen_server:cast/2' returns `ok' even when the
  %% destination pid is already dead, so no `catch' wrapper is required.
  -spec close(pid()) -> ok.
  close(C) when is_pid(C) ->
      gen_server:cast(C, stop).
  %% @doc Execute `Command' with `Args' and block (no timeout) until the
  %% connection process replies.
  -spec sync_command(epgsql:connection(), epgsql_command:command(), any()) -> any().
  sync_command(C, Command, Args) ->
      Request = {command, Command, Args},
      gen_server:call(C, Request, infinity).
  %% @doc Submit `Command' without waiting for its result.
  %%
  %% Results are later delivered to the calling process as messages tagged with
  %% the returned reference.
  -spec async_command(epgsql:connection(), cast | incremental,
                      epgsql_command:command(), any()) -> reference().
  async_command(C, Transport, Command, Args) ->
      Tag = make_ref(),
      Caller = self(),
      Request = {{Transport, Caller, Tag}, Command, Args},
      ok = gen_server:cast(C, Request),
      Tag.
  %% @doc Fetch a parameter stored by the connection process. `Name' may be a
  %% binary or a string; it is converted to a binary before the lookup.
  get_parameter(C, Name) ->
      Key = to_binary(Name),
      gen_server:call(C, {get_parameter, Key}, infinity).
  %% @doc Set the pid or registered name that receives asynchronous messages.
  %% The connection process replies with `{ok, PreviousReceiver}'.
  set_notice_receiver(C, PidOrName) when is_pid(PidOrName) orelse is_atom(PidOrName) ->
      Request = {set_async_receiver, PidOrName},
      gen_server:call(C, Request, infinity).
  %% @doc Ask the connection process for its stored `complete_status';
  %% the reply has the shape `{ok, Status}'.
  get_cmd_status(C) ->
      Request = get_cmd_status,
      gen_server:call(C, Request, infinity).
  %% @doc Asynchronously ask the connection process to send a cancel request
  %% for its backend.
  cancel(S) ->
      Request = cancel,
      gen_server:cast(S, Request).
  %% @doc Send a batch of rows while the connection is in COPY FROM STDIN mode.
  %% `Timeout' is the `gen_server:call/3' timeout.
  copy_send_rows(C, Rows, Timeout) ->
      Request = {copy_send_rows, Rows},
      gen_server:call(C, Request, Timeout).
  %% @doc Report flushed / applied LSNs to the server (replication mode).
  %% Uses the default `gen_server:call/2' timeout.
  standby_status_update(C, FlushedLSN, AppliedLSN) ->
      Request = {standby_status_update, FlushedLSN, AppliedLSN},
      gen_server:call(C, Request).
  %% @doc Return the backend pid stored in the connection state.
  -spec get_backend_pid(epgsql:connection()) -> integer().
  get_backend_pid(C) ->
      Request = get_backend_pid,
      gen_server:call(C, Request).
  %% @doc Ask the connection process to re-apply its `active' socket option.
  -spec activate(epgsql:connection()) -> ok | {error, any()}.
  activate(C) ->
      Request = activate,
      gen_server:call(C, Request).
  135. %% -- command APIs --
  136. %% send()
  137. %% send_multi()
  %% @doc Store the transport module and socket in the state and switch the
  %% socket to active mode. The result of `setopts/2' is ignored.
  -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
  set_net_socket(Mod, Socket, State) ->
      WithSocket = State#state{mod = Mod, sock = Socket},
      setopts(WithSocket, [{active, true}]),
      WithSocket.
  %% @doc Replace the sub-protocol state with a fresh `#repl{}' record.
  -spec init_replication_state(pg_sock()) -> pg_sock().
  init_replication_state(State) ->
      FreshRepl = #repl{},
      State#state{subproto_state = FreshRepl}.
  %% @doc Setter for the state fields that commands are allowed to modify.
  %% Any other attribute name fails with `function_clause'.
  -spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
  set_attr(backend, {_Pid, _Key} = PidAndKey, State) ->
      State#state{backend = PidAndKey};
  set_attr(async, Receiver, State) ->
      State#state{async = Receiver};
  set_attr(txstatus, TxStatus, State) ->
      State#state{txstatus = TxStatus};
  set_attr(codec, NewCodec, State) ->
      State#state{codec = NewCodec};
  set_attr(sync_required, Flag, State) ->
      State#state{sync_required = Flag};
  set_attr(subproto_state, SubState, State) ->
      State#state{subproto_state = SubState};
  set_attr(connect_opts, Opts, State) ->
      State#state{connect_opts = Opts}.
  %% XXX: be careful!
  %% Changes which `on_*' callback `loop/1' applies to incoming packets and
  %% re-applies the socket `active' option for the new handler.
  -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
  set_packet_handler(Handler, State0) ->
      Switched = State0#state{handler = Handler},
      setopts(Switched, [{active, true}]),
      Switched.
  %% @doc Return the codec stored in the connection state.
  -spec get_codec(pg_sock()) -> epgsql_binary:codec().
  get_codec(#state{codec = C}) -> C.
  %% @doc Return the replication / copy sub-protocol state, if any.
  -spec get_subproto_state(pg_sock()) -> repl_state() | copy_state() | undefined.
  get_subproto_state(#state{subproto_state = S}) -> S.
  %% @doc Rows collected for the current command, in arrival order
  %% (they are accumulated in reverse by `add_row/2').
  -spec get_rows(pg_sock()) -> [tuple()].
  get_rows(#state{rows = Reversed}) ->
      lists:reverse(Reversed).
  %% @doc Results collected for the current command, in arrival order
  %% (they are accumulated in reverse by `add_result/3').
  -spec get_results(pg_sock()) -> [any()].
  get_results(#state{results = Reversed}) ->
      lists:reverse(Reversed).
  %% @doc Look up a parameter by its binary name in the connection state.
  %%
  %% Returns the stored value, or `undefined' when there is no parameter
  %% with that name.
  -spec get_parameter_internal(binary(), pg_sock()) -> binary() | undefined.
  get_parameter_internal(Name, #state{parameters = Parameters}) ->
      %% lists:keysearch/3 is retained in stdlib only for backward
      %% compatibility; lists:keyfind/3 is the recommended replacement.
      case lists:keyfind(Name, 1, Parameters) of
          {_Name, Value} -> Value;
          false -> undefined
      end.
  185. %% -- gen_server implementation --
  %% gen_server callback: start with a default, unconnected state.
  init([]) ->
      InitialState = #state{},
      {ok, InitialState}.
  %% gen_server callback for synchronous requests.
  %%
  %% Run a command; the reply is delivered later through the `{call, From}' transport.
  handle_call({command, Command, Args}, From, State) ->
      command_new({call, From}, Command, Args, State);
  %% Parameter lookup.
  handle_call({get_parameter, Name}, _From, State) ->
      Value = get_parameter_internal(Name, State),
      {reply, {ok, Value}, State};
  %% Swap the receiver of asynchronous messages, replying with the old one.
  handle_call({set_async_receiver, PidOrName}, _From, #state{async = OldReceiver} = State) ->
      NewState = State#state{async = PidOrName},
      {reply, {ok, OldReceiver}, NewState};
  handle_call(get_cmd_status, _From, #state{complete_status = CmdStatus} = State) ->
      {reply, {ok, CmdStatus}, State};
  handle_call(get_backend_pid, _From, #state{backend = {BackendPid, _Key}} = State) ->
      {reply, BackendPid, State};
  %% Replication mode only: send a status update and remember the reported LSNs.
  handle_call({standby_status_update, FlushedLSN, AppliedLSN}, _From,
              #state{handler = on_replication,
                     subproto_state = #repl{last_received_lsn = ReceivedLSN} = Repl} = State) ->
      Packet = epgsql_wire:encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN),
      send(State, ?COPY_DATA, Packet),
      UpdatedRepl = Repl#repl{last_flushed_lsn = FlushedLSN,
                              last_applied_lsn = AppliedLSN},
      {reply, ok, State#state{subproto_state = UpdatedRepl}};
  handle_call({copy_send_rows, Rows}, _From,
              #state{handler = Handler, subproto_state = CopyState} = State) ->
      Reply = handle_copy_send_rows(Rows, Handler, CopyState, State),
      {reply, Reply, State};
  %% Re-apply the socket `active' option.
  handle_call(activate, _From, State) ->
      setopts(State, [{active, true}]),
      {reply, ok, State}.
  %% gen_server callback for asynchronous requests.
  %%
  %% Asynchronous command: results go to `From' as messages tagged with `Ref'.
  handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
    when ((Method == cast) or (Method == incremental)),
         is_pid(From),
         is_reference(Ref) ->
      command_new(Transport, Command, Args, State);
  %% Stop request on a process that has no socket (never connected): there is
  %% nobody to send `Terminate' to. Without this clause `send/3' would reach
  %% `do_send(undefined, undefined, _)' and crash with `function_clause'
  %% instead of stopping with reason `normal'.
  handle_cast(stop, #state{sock = undefined} = State) ->
      {stop, normal, flush_queue(State, {error, closed})};
  %% Stop request on a connected process: send `Terminate' to the server, fail
  %% all pending commands with `{error, closed}' and stop normally.
  handle_cast(stop, State) ->
      send(State, ?TERMINATE, []),
      {stop, normal, flush_queue(State, {error, closed})};
  %% Cancel request: open a separate socket with the original connect options
  %% and send the cancel message carrying the backend pid and key.
  %% Failure to open that socket is deliberately ignored (best effort).
  handle_cast(cancel, State = #state{backend = {Pid, Key},
                                     connect_opts = ConnectOpts,
                                     mod = Mode}) ->
      SockOpts = [{active, false}, {packet, raw}, binary],
      Msg = <<16:?int32, 80877102:?int32, Pid:?int32, Key:?int32>>,
      case epgsql_cmd_connect:open_socket(SockOpts, ConnectOpts) of
          {ok, Mode, Sock} ->
              ok = Mode:send(Sock, Msg),
              Mode:close(Sock);
          {error, _Reason} ->
              noop
      end,
      {noreply, State}.
  %% gen_server callback for socket events and io-protocol requests.
  %%
  %% New bytes from our socket: append them to the buffer and decode packets.
  handle_info({Tag, Sock, Chunk}, #state{data = Buffer, sock = Sock} = State)
    when Tag == tcp; Tag == ssl ->
      loop(State#state{data = <<Buffer/binary, Chunk/binary>>});
  %% Socket switched to passive mode.
  handle_info({Tag, Sock}, #state{sock = Sock} = State)
    when Tag == ssl_passive; Tag == tcp_passive ->
      {noreply, send_socket_pasive(State)};
  %% Socket closed: fail all pending commands and stop.
  handle_info({Tag, Sock}, #state{sock = Sock} = State)
    when Tag == tcp_closed; Tag == ssl_closed ->
      Flushed = flush_queue(State#state{sock = undefined}, {error, sock_closed}),
      {stop, sock_closed, Flushed};
  %% Socket error: fail all pending commands and stop.
  handle_info({Tag, Sock, Reason}, #state{sock = Sock} = State)
    when Tag == tcp_error; Tag == ssl_error ->
      StopReason = {sock_error, Reason},
      {stop, StopReason, flush_queue(State, {error, StopReason})};
  %% Replies to `erlang:port_command/2' issued by `do_send/3'.
  handle_info({inet_reply, _, ok}, State) ->
      {noreply, State};
  handle_info({inet_reply, _, Status}, State) ->
      {stop, Status, flush_queue(State, {error, Status})};
  %% Erlang io protocol request.
  handle_info({io_request, From, ReplyAs, Request}, State) ->
      Result = handle_io_request(Request, State),
      io_reply(Result, From, ReplyAs),
      {noreply, State}.
  %% gen_server callback: close the socket, if there is one, using the
  %% transport module it was opened with.
  terminate(_Reason, #state{sock = undefined}) ->
      ok;
  terminate(_Reason, #state{mod = gen_tcp, sock = TcpSock}) ->
      gen_tcp:close(TcpSock);
  terminate(_Reason, #state{mod = ssl, sock = SslSock}) ->
      ssl:close(SslSock).
  %% gen_server callback: the state needs no conversion between versions.
  code_change(_OldVsn, State, _Extra) -> {ok, State}.
  %% gen_server callback controlling how the state is printed.
  %%
  %% `normal' (sys:get_status/1,2): show the state unchanged.
  format_status(normal, [_PDict, #state{} = State]) ->
      [{data, [{"State", State}]}];
  %% `terminate' (abnormal termination): hide the `rows' attribute so that
  %% query data is not formatted.
  format_status(terminate, [_PDict, State]) ->
      State#state{rows = information_redacted}.
  267. %% -- internal functions --
  %% Tell the replication consumer that the socket became passive: either by a
  %% message to the receiver process (when one is set) or through the callback
  %% module, whose new state is stored.
  -spec send_socket_pasive(pg_sock()) -> pg_sock().
  send_socket_pasive(#state{subproto_state = #repl{receiver = Receiver}} = State)
    when Receiver =/= undefined ->
      Receiver ! {epgsql, self(), socket_passive},
      State;
  send_socket_pasive(#state{subproto_state = #repl{cbmodule = Module,
                                                   cbstate = OldCbState} = Repl} = State) ->
      {ok, CbState} = epgsql:handle_socket_passive(Module, OldCbState),
      State#state{subproto_state = Repl#repl{cbstate = CbState}}.
  %% Initialise the command's own state from `Args' and execute it.
  -spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
                           Result when
        Result :: {noreply, pg_sock()}
                | {stop, Reason :: any(), pg_sock()}.
  command_new(Transport, Command, Args, State) ->
      InitialCmdState = epgsql_command:init(Command, Args),
      command_exec(Transport, Command, InitialCmdState, State).
  %% Execute a command: let it produce its packets, send them, and enqueue the
  %% command so that its `handle_message' callback receives the responses.
  -spec command_exec(transport(), epgsql_command:command(), any(), pg_sock()) ->
                            Result when
        Result :: {noreply, pg_sock()}
                | {stop, Reason :: any(), pg_sock()}.
  %% While `sync_required' is set only `epgsql_cmd_sync' may run; anything else
  %% is answered immediately with `{error, sync_required}'.
  command_exec(Transport, Command, _CmdState, #state{sync_required = true} = State)
    when Command /= epgsql_cmd_sync ->
      Rejected = State#state{current_cmd = Command,
                             current_cmd_transport = Transport},
      {noreply, finish(Rejected, {error, sync_required})};
  command_exec(Transport, Command, CmdState, State) ->
      case epgsql_command:execute(Command, State, CmdState) of
          {ok, NextState, NextCmdState} ->
              %% nothing to send
              {noreply, command_enqueue(Transport, Command, NextCmdState, NextState)};
          {send, PktType, PktData, NextState, NextCmdState} ->
              ok = send(NextState, PktType, PktData),
              {noreply, command_enqueue(Transport, Command, NextCmdState, NextState)};
          {send_multi, Packets, NextState, NextCmdState} when is_list(Packets) ->
              ok = send_multi(NextState, Packets),
              {noreply, command_enqueue(Transport, Command, NextCmdState, NextState)};
          {stop, StopReason, Response, NextState} ->
              reply(Transport, Response, Response),
              {stop, StopReason, NextState}
      end.
  %% Register an executed command: it becomes the current command when none is
  %% active, otherwise it is appended to the queue. `complete_status' is reset
  %% in both cases.
  -spec command_enqueue(transport(), epgsql_command:command(), epgsql_command:state(), pg_sock()) -> pg_sock().
  command_enqueue(Transport, Command, CmdState, #state{current_cmd = undefined} = State) ->
      State#state{complete_status = undefined,
                  current_cmd = Command,
                  current_cmd_state = CmdState,
                  current_cmd_transport = Transport};
  command_enqueue(Transport, Command, CmdState, #state{queue = Pending} = State) ->
      Entry = {Command, CmdState, Transport},
      State#state{complete_status = undefined,
                  queue = queue:in(Entry, Pending)}.
  %% Pass a server packet to the current command's `handle_message' callback
  %% and act on the instruction it returns.
  -spec command_handle_message(byte(), binary() | epgsql:query_error(), pg_sock()) ->
                                      {noreply, pg_sock()}
                                    | {stop, any(), pg_sock()}.
  command_handle_message(Msg, Payload,
                         #state{current_cmd = Command,
                                current_cmd_state = CmdState} = State) ->
      case epgsql_command:handle_message(Command, Msg, Payload, State, CmdState) of
          {add_row, Row, NewState, NewCmdState} ->
              WithCmd = NewState#state{current_cmd_state = NewCmdState},
              {noreply, add_row(WithCmd, Row)};
          {add_result, Result, Notice, NewState, NewCmdState} ->
              WithCmd = NewState#state{current_cmd_state = NewCmdState},
              {noreply, add_result(WithCmd, Notice, Result)};
          {finish, Result, Notice, NewState} ->
              {noreply, finish(NewState, Notice, Result)};
          {noaction, NewState} ->
              {noreply, NewState};
          {noaction, NewState, NewCmdState} ->
              {noreply, NewState#state{current_cmd_state = NewCmdState}};
          {requeue, NewState, NewCmdState} ->
              %% execute the same command again with its updated state
              Transport = NewState#state.current_cmd_transport,
              Idle = NewState#state{current_cmd = undefined},
              command_exec(Transport, Command, NewCmdState, Idle);
          {stop, Reason, Response, NewState} ->
              {stop, Reason, finish(NewState, Response)};
          {sync_required, Why} ->
              %% Protocol error. Finish and flush all pending commands.
              Failed = finish(State#state{sync_required = true}, Why),
              {noreply, sync_required(Failed)};
          unknown ->
              {stop, {error, {unexpected_message, Msg, Command, CmdState}}, State}
      end.
  %% The current command is done: drop its accumulated rows / results and make
  %% the next queued command (if any) the current one.
  command_next(#state{current_cmd = Finished, queue = Pending} = State)
    when Finished =/= undefined ->
      Cleared = State#state{rows = [], results = []},
      case queue:out(Pending) of
          {{value, {Command, CmdState, Transport}}, Rest} ->
              Cleared#state{current_cmd = Command,
                            current_cmd_state = CmdState,
                            current_cmd_transport = Transport,
                            queue = Rest};
          {empty, _} ->
              Cleared#state{current_cmd = undefined,
                            current_cmd_state = undefined,
                            current_cmd_transport = undefined}
      end.
  %% Apply socket options through the API that matches the transport module.
  %% The `active' entry may be overridden first, see `update_active/2'.
  setopts(#state{mod = Transport, sock = Socket} = State, DefaultOpts) ->
      EffectiveOpts = update_active(State, DefaultOpts),
      case Transport of
          gen_tcp ->
              inet:setopts(Socket, EffectiveOpts);
          ssl ->
              ssl:setopts(Socket, EffectiveOpts)
      end.
  %% Choose the socket options to apply. A user-supplied `active' option from
  %% `tcp_opts' / `ssl_opts' is honoured in replication mode only.
  update_active(#state{handler = Handler}, DefaultOpts) when Handler =/= on_replication ->
      %% Ignore active option in tcp_opts or ssl_opts unless in the replication mode
      DefaultOpts;
  update_active(#state{mod = gen_tcp, connect_opts = #{tcp_opts := TcpOpts}}, DefaultOpts) ->
      update_active_opt(TcpOpts, DefaultOpts);
  update_active(#state{mod = ssl, connect_opts = #{ssl_opts := SslOpts}}, DefaultOpts) ->
      update_active_opt(SslOpts, DefaultOpts);
  update_active(_State, DefaultOpts) ->
      %% no transport-specific options were supplied
      DefaultOpts.
  %% When `Opts' carries an `active' entry, store it in `DefaultOpts'
  %% (replacing an existing `active' tuple, or appending when there is none);
  %% otherwise return `DefaultOpts' untouched.
  update_active_opt(Opts, DefaultOpts) ->
      override_active(proplists:lookup(active, Opts), DefaultOpts).

  %% Helper of `update_active_opt/2': `none' means there is no override.
  override_active(none, DefaultOpts) ->
      DefaultOpts;
  override_active(ActiveOpt, DefaultOpts) ->
      lists:keystore(active, 1, DefaultOpts, ActiveOpt).
  %% This one only used in connection initiation to send client's
  %% `StartupMessage' and `SSLRequest' packets
  -spec send(pg_sock(), iodata()) -> ok | {error, any()}.
  send(#state{mod = Transport, sock = Socket}, Data) ->
      Packet = epgsql_wire:encode_command(Data),
      do_send(Transport, Socket, Packet).
  %% Encode one packet of the given type and write it to the socket.
  -spec send(pg_sock(), epgsql_wire:packet_type(), iodata()) -> ok | {error, any()}.
  send(#state{mod = Transport, sock = Socket}, Type, Data) ->
      Packet = epgsql_wire:encode_command(Type, Data),
      do_send(Transport, Socket, Packet).
  %% Encode several `{Type, Data}' packets and write them with a single send.
  -spec send_multi(pg_sock(), [{epgsql_wire:packet_type(), iodata()}]) -> ok | {error, any()}.
  send_multi(#state{mod = Transport, sock = Socket}, List) ->
      Encode = fun({Type, Data}) -> epgsql_wire:encode_command(Type, Data) end,
      Packets = lists:map(Encode, List),
      do_send(Transport, Socket, Packets).
  %% Write already encoded data to the socket.
  do_send(gen_tcp, Socket, Payload) ->
      %% Why not gen_tcp:send/2?
      %% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
      %% Because of that we also have `handle_info({inet_reply, ...`
      try erlang:port_command(Socket, Payload) of
          true -> ok
      catch
          error:_ ->
              %% any `error'-class exception is reported as `einval'
              {error, einval}
      end;
  do_send(ssl, Socket, Payload) ->
      ssl:send(Socket, Payload).
  %% Decode and dispatch every complete packet held in the receive buffer.
  %% Stops as soon as a handler returns `stop' or no full packet is left.
  loop(#state{data = Buffer, handler = Handler, subproto_state = Repl} = State) ->
      case epgsql_wire:decode_message(Buffer) of
          {Type, Payload, Rest} ->
              case ?MODULE:Handler(Type, Payload, State#state{data = Rest}) of
                  {noreply, NextState} ->
                      loop(NextState);
                  {stop, _Reason, _NextState} = Stop ->
                      Stop
              end;
          _ ->
              %% in replication mode send feedback after each batch of messages
              case Handler == on_replication
                  andalso (Repl =/= undefined)
                  andalso (Repl#repl.feedback_required) of
                  true ->
                      #repl{last_received_lsn = ReceivedLSN,
                            last_flushed_lsn = FlushedLSN,
                            last_applied_lsn = AppliedLSN} = Repl,
                      Update = epgsql_wire:encode_standby_status_update(
                                 ReceivedLSN, FlushedLSN, AppliedLSN),
                      send(State, ?COPY_DATA, Update),
                      Acked = Repl#repl{feedback_required = false},
                      {noreply, State#state{subproto_state = Acked}};
                  _ ->
                      {noreply, State}
              end
      end.
  %% Complete the current command using the same term both as the incremental
  %% notice and as the final result.
  finish(State, Result) ->
      finish(State, Result, Result).

  %% Complete the current command: answer its client through the command's
  %% transport, then advance to the next queued command.
  finish(#state{current_cmd_transport = Transport} = State, Notice, Result) ->
      reply(Transport, Notice, Result),
      command_next(State).
  %% Deliver the outcome of a command to its client.
  %% `cast' and `call' clients get `Result'; `incremental' clients get `Notice'.
  reply({cast, Client, Ref}, _Notice, Result) ->
      Client ! {self(), Ref, Result};
  reply({incremental, Client, Ref}, Notice, _Result) ->
      Client ! {self(), Ref, Notice};
  reply({call, Client}, _Notice, Result) ->
      gen_server:reply(Client, Result).
  %% Record one finished result set of the current command and reset the row
  %% accumulator. Incremental clients are sent `Notice' right away and nothing
  %% is stored for them; for other transports `Result' is pushed onto the
  %% (reversed) result list.
  add_result(#state{current_cmd_transport = {incremental, Client, Ref}} = State,
             Notice, _Result) ->
      Client ! {self(), Ref, Notice},
      State#state{rows = []};
  add_result(#state{results = Collected} = State, _Notice, Result) ->
      State#state{rows = [],
                  results = [Result | Collected]}.
  %% Record one data row of the current command. Incremental clients are sent
  %% `{data, Row}' right away and nothing is stored for them; for other
  %% transports the row is pushed onto the (reversed) row list.
  add_row(#state{current_cmd_transport = {incremental, Client, Ref}} = State, Data) ->
      Client ! {self(), Ref, {data, Data}},
      State;
  add_row(#state{rows = Collected} = State, Data) ->
      State#state{rows = [Data | Collected]}.
  %% Send an intermediate notice to the client of the current command.
  %% Only incremental clients receive it; for every other transport this is a
  %% no-op. The state is returned unchanged in both cases.
  notify(#state{current_cmd_transport = {incremental, Client, Ref}} = State, Notice) ->
      Client ! {self(), Ref, Notice},
      State;
  notify(State, _Notice) ->
      State.
  %% Send asynchronous messages (notice / notification)
  %% Returns `true' when the message was sent, `false' when there is no
  %% receiver configured or its registered name does not exist.
  notify_async(#state{async = undefined}, _Msg) ->
      false;
  notify_async(#state{async = Receiver}, Msg) ->
      try Receiver ! {epgsql, self(), Msg} of
          _Sent -> true
      catch
          error:badarg ->
              %% no process registered under this name
              false
      end.
  %% Fail every pending command with `{error, sync_required}' until either an
  %% `epgsql_cmd_sync' becomes the current command (state is returned as is)
  %% or nothing is left (the `sync_required' flag is then set).
  sync_required(#state{current_cmd = epgsql_cmd_sync} = State) ->
      State;
  sync_required(#state{current_cmd = undefined} = State) ->
      State#state{sync_required = true};
  sync_required(State) ->
      Advanced = finish(State, {error, sync_required}),
      sync_required(Advanced).
  %% Answer the current command and all queued ones with `Error', one by one,
  %% until no command is left.
  flush_queue(#state{current_cmd = undefined} = State, _Error) ->
      State;
  flush_queue(State, Error) ->
      Advanced = finish(State, Error),
      flush_queue(Advanced, Error).
  %% @doc Handler for IO protocol version of COPY FROM STDIN
  %%
  %% COPY FROM STDIN is implemented as Erlang
  %% <a href="https://erlang.org/doc/apps/stdlib/io_protocol.html">io protocol</a>.
  %% Only `put_chars' (and `requests' made of them) is served; `setopts' and
  %% `getopts' are answered with `{error, request}'.
  handle_io_request(_Request, #state{handler = Handler}) when Handler =/= on_copy_from_stdin ->
      %% Received IO request when `epgsql_cmd_copy_from_stdin' haven't yet been called or it was
      %% terminated with error and already sent `ReadyForQuery'
      {error, not_in_copy_mode};
  handle_io_request(_Request, #state{subproto_state = #copy{last_error = LastError}})
    when LastError =/= undefined ->
      {error, LastError};
  handle_io_request({put_chars, Encoding, Chars}, State) ->
      Payload = encode_chars(Encoding, Chars),
      send(State, ?COPY_DATA, Payload);
  handle_io_request({put_chars, Encoding, Mod, Fun, Args}, State) ->
      %% the characters are produced by calling Mod:Fun(Args...)
      try apply(Mod, Fun, Args) of
          Produced when is_binary(Produced);
                        is_list(Produced) ->
              handle_io_request({put_chars, Encoding, Produced}, State);
          Unexpected ->
              {error, {fun_return_not_characters, Unexpected}}
      catch ?WITH_STACKTRACE(T, R, S)
              {error, {fun_exception, {T, R, S}}}
      end;
  handle_io_request({setopts, _Opts}, _State) ->
      {error, request};
  handle_io_request(getopts, _State) ->
      {error, request};
  handle_io_request({requests, Requests}, State) ->
      try_requests(Requests, State, ok).
  %% Run io requests one after another, stopping at the first `{error, _}'.
  %% Returns that error, or the result of the last request (the initial
  %% accumulator when the list is empty).
  try_requests([], _State, LastResult) ->
      LastResult;
  try_requests([Request | Remaining], State, _Previous) ->
      case handle_io_request(Request, State) of
          {error, _} = Failure ->
              Failure;
          Result ->
              try_requests(Remaining, State, Result)
      end.
  %% Send the io protocol reply for a request back to its originator.
  io_reply(Result, From, ReplyAs) ->
      Reply = {io_reply, ReplyAs, Result},
      From ! Reply.
  %% @doc Handler for `copy_send_rows' API
  %%
  %% Only supports binary protocol right now.
  %% But, in theory, can be used for text / csv formats as well, but we would need to add
  %% some more callbacks to `epgsql_type' behaviour (eg, `encode_text')
  handle_copy_send_rows(_Rows, Handler, _CopyState, _State)
    when Handler =/= on_copy_from_stdin ->
      {error, not_in_copy_mode};
  handle_copy_send_rows(_Rows, _Handler, #copy{format = Format}, _State)
    when Format =/= binary ->
      %% copy_send_rows only supports "binary" format
      {error, not_binary_format};
  handle_copy_send_rows(_Rows, _Handler, #copy{last_error = LastError}, _State)
    when LastError =/= undefined ->
      %% server already reported error in data stream asynchronously
      {error, LastError};
  handle_copy_send_rows(Rows, _Handler, #copy{binary_types = Types}, State) ->
      Codec = get_codec(State),
      Encoded = [epgsql_wire:encode_copy_row(Row, Types, Codec) || Row <- Rows],
      ok = send(State, ?COPY_DATA, Encoded).
  %% Convert io-protocol character data to a binary.
  %% Binaries are passed through untouched; lists are converted from the
  %% given input encoding (`unicode' or `latin1').
  encode_chars(_Encoding, Binary) when is_binary(Binary) ->
      Binary;
  encode_chars(Encoding, CharList)
    when is_list(CharList), (Encoding =:= unicode orelse Encoding =:= latin1) ->
      unicode:characters_to_binary(CharList, Encoding).
  %% Normalise a parameter name: binaries are returned unchanged, lists are
  %% converted with `list_to_binary/1'.
  to_binary(Bin) when is_binary(Bin) ->
      Bin;
  to_binary(List) when is_list(List) ->
      list_to_binary(List).
  556. %% -- backend message handling --
  %% Default packet handler.
  %%
  %% CommandComplete: remember the decoded completion status, then let the
  %% current command see the packet
  on_message(?COMMAND_COMPLETE = Msg, Bin, State) ->
      Status = epgsql_wire:decode_complete(Bin),
      command_handle_message(Msg, Bin, State#state{complete_status = Status});
  %% ReadyForQuery: remember the transaction status byte
  on_message(?READY_FOR_QUERY = Msg, <<TxStatus:8>> = Bin, State) ->
      command_handle_message(Msg, Bin, State#state{txstatus = TxStatus});
  %% Error
  on_message(?ERROR = Msg, Err, #state{current_cmd = undefined} = State) ->
      %% Message generated by server asynchronously
      {stop, {shutdown, epgsql_wire:decode_error(Err)}, State};
  on_message(?ERROR = Msg, Err, State) ->
      command_handle_message(Msg, epgsql_wire:decode_error(Err), State);
  %% NoticeResponse
  on_message(?NOTICE, Data, State) ->
      Notice = epgsql_wire:decode_error(Data),
      notify_async(State, {notice, Notice}),
      {noreply, State};
  %% ParameterStatus
  on_message(?PARAMETER_STATUS, Data, #state{parameters = Known} = State) ->
      [Name, Value] = epgsql_wire:decode_strings(Data),
      {noreply, State#state{parameters = lists:keystore(Name, 1, Known, {Name, Value})}};
  %% NotificationResponse
  on_message(?NOTIFICATION, <<Pid:?int32, Strings/binary>>, State) ->
      Notification =
          case epgsql_wire:decode_strings(Strings) of
              [Channel, Payload] -> {notification, Channel, Pid, Payload};
              [Channel] -> {notification, Channel, Pid, <<>>}
          end,
      notify_async(State, Notification),
      {noreply, State};
  %% ParseComplete
  %% ParameterDescription
  %% RowDescription
  %% NoData
  %% BindComplete
  %% CloseComplete
  %% DataRow
  %% PortalSuspended
  %% EmptyQueryResponse
  %% CopyData
  %% CopyBothResponse
  on_message(Msg, Payload, State) ->
      command_handle_message(Msg, Payload, State).
  %% @doc Handle "copy subprotocol" for COPY .. FROM STDIN
  %%
  %% Activated by `epgsql_cmd_copy_from_stdin', deactivated by `epgsql_cmd_copy_done' or error
  on_copy_from_stdin(?READY_FOR_QUERY, <<TxStatus:8>>,
                     #state{subproto_state = #copy{last_error = LastError,
                                                   initiator = Initiator}} = State)
    when LastError =/= undefined ->
      %% Reporting error from here and not from ?ERROR so it's easier to be in sync state
      Initiator ! {epgsql, self(), {error, LastError}},
      %% leave copy mode and go back to the default packet handler
      {noreply, State#state{handler = on_message,
                            subproto_state = undefined,
                            txstatus = TxStatus}};
  on_copy_from_stdin(?ERROR, Err, #state{subproto_state = CopyState} = State) ->
      %% only remember the error here; it is reported on ReadyForQuery
      Reason = epgsql_wire:decode_error(Err),
      Failed = CopyState#copy{last_error = Reason},
      {noreply, State#state{subproto_state = Failed}};
  on_copy_from_stdin(Msg, Data, State) when Msg == ?NOTICE;
                                            Msg == ?NOTIFICATION;
                                            Msg == ?PARAMETER_STATUS ->
      %% asynchronous server messages are handled the same way in every mode
      on_message(Msg, Data, State).
  %% Packet handler for replication mode.
  %%
  %% CopyData carrying a primary keepalive message. When the server asks for a
  %% reply a standby status update is sent immediately (with all LSNs aligned
  %% to the received one when `align_lsn' is set); otherwise feedback is only
  %% marked as required.
  on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestamp:?int64, ReplyRequired:8>>,
                 #state{subproto_state = #repl{last_flushed_lsn = FlushedLSN,
                                               last_applied_lsn = AppliedLSN,
                                               align_lsn = AlignLsn} = Repl} = State) ->
      NewRepl =
          case ReplyRequired of
              1 when AlignLsn ->
                  Aligned = epgsql_wire:encode_standby_status_update(LSN, LSN, LSN),
                  send(State, ?COPY_DATA, Aligned),
                  Repl#repl{feedback_required = false,
                            last_received_lsn = LSN,
                            last_applied_lsn = LSN,
                            last_flushed_lsn = LSN};
              1 when not AlignLsn ->
                  Update = epgsql_wire:encode_standby_status_update(LSN, FlushedLSN, AppliedLSN),
                  send(State, ?COPY_DATA, Update),
                  Repl#repl{feedback_required = false,
                            last_received_lsn = LSN};
              _ ->
                  Repl#repl{feedback_required = true,
                            last_received_lsn = LSN}
          end,
      {noreply, State#state{subproto_state = NewRepl}};
  %% CopyData carrying XLogData (a WAL record)
  on_replication(?COPY_DATA, <<?X_LOG_DATA, StartLSN:?int64, EndLSN:?int64,
                               _Timestamp:?int64, WALRecord/binary>>,
                 #state{subproto_state = Repl} = State) ->
      NewRepl = handle_xlog_data(StartLSN, EndLSN, WALRecord, Repl),
      {noreply, State#state{subproto_state = NewRepl}};
  %% An error in replication mode stops the connection process
  on_replication(?ERROR, Err, State) ->
      {stop, {error, epgsql_wire:decode_error(Err)}, State};
  on_replication(Msg, Data, State) when Msg == ?NOTICE;
                                        Msg == ?NOTIFICATION;
                                        Msg == ?PARAMETER_STATUS ->
      %% asynchronous server messages are handled the same way in every mode
      on_message(Msg, Data, State).
  %% Deliver one WAL record to the replication consumer and return the updated
  %% replication state, with feedback marked as required.
  handle_xlog_data(StartLSN, EndLSN, WALRecord,
                   #repl{cbmodule = undefined, receiver = Receiver} = Repl) ->
      %% with async messages
      Receiver ! {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}},
      Repl#repl{last_received_lsn = EndLSN,
                feedback_required = true};
  handle_xlog_data(StartLSN, EndLSN, WALRecord,
                   #repl{cbmodule = Module, cbstate = OldCbState, receiver = undefined} = Repl) ->
      %% with callback method
      {ok, FlushedLSN, AppliedLSN, CbState} =
          epgsql:handle_x_log_data(Module, StartLSN, EndLSN, WALRecord, OldCbState),
      Repl#repl{last_received_lsn = EndLSN,
                last_flushed_lsn = FlushedLSN,
                last_applied_lsn = AppliedLSN,
                cbstate = CbState,
                feedback_required = true}.