Browse Source

Handle socket_passive messages internally when the replication mode use synchronous handler

Yury Yantsevich 2 years ago
parent
commit
3d5d3d207a
4 changed files with 20 additions and 45 deletions
  1. 2 4
      doc/streaming.md
  2. 5 18
      src/epgsql.erl
  3. 12 17
      src/epgsql_sock.erl
  4. 1 6
      test/epgsql_replication_SUITE.erl

+ 2 - 4
doc/streaming.md

@@ -173,7 +173,5 @@ epgsql:activate(Connection).
 The `active` parameter of the socket will be set to the same value as it was configured in
 the connection's options.
 
-When the connection is in the synchronous mode, a provided callback module must implement
-`handle_socket_passive/1` function, which receives a current callback state and should
-return `{ok, NewCallbackState}`. The callback should not call `epgsql:activate/1` directly
-because it results in a deadlock.
+In the case of synchronous handler for replication messages `epgsql` will handle `socket_passive`
+messages internally.

+ 5 - 18
src/epgsql.erl

@@ -40,9 +40,7 @@
          to_map/1,
          activate/1]).
 %% private
--export([ handle_x_log_data/5
-        , handle_socket_passive/2
-        ]).
+-export([handle_x_log_data/5]).
 
 -export_type([connection/0, connect_option/0, connect_opts/0,
               connect_error/0, query_error/0, sql_query/0, column/0,
@@ -164,10 +162,6 @@
 %% Handles a XLogData Message (StartLSN, EndLSN, WALRecord, CbState).
 %% Return: {ok, LastFlushedLSN, LastAppliedLSN, NewCbState}
 -callback handle_x_log_data(lsn(), lsn(), binary(), cb_state()) -> {ok, lsn(), lsn(), cb_state()}.
-
-%% Handles socket_passive message.
-%% Return: {ok, NewCbState}.
--callback handle_socket_passive(cb_state()) -> {ok, cb_state()}.
 %% -------------
 
 %% -- client interface --
@@ -544,10 +538,6 @@ standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
 handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, Repl) ->
     Mod:handle_x_log_data(StartLSN, EndLSN, WALRecord, Repl).
 
--spec handle_socket_passive(atom(), cb_state()) -> {ok, cb_state()}.
-handle_socket_passive(Mod, CbState) ->
-    Mod:handle_socket_passive(CbState).
-
 -type replication_option() ::
     {align_lsn, boolean()}. %% Align last applied and flushed LSN with last received LSN
                             %%  after Primary keepalive message with ReplyRequired flag
@@ -587,18 +577,15 @@ to_map(Map) when is_map(Map) ->
 to_map(List) when is_list(List) ->
     maps:from_list(List).
 
-%% @doc Activates TCP or SSL socket of the connection.
+%% @doc Activates TCP or SSL socket of a connection.
 %%
-%% If `{active, X}' is set in:
-%% - `tcp_opts' and the current mode is TCP or
-%% - `ssl_opts' and the current mode is SSL
-%% the function sets `{active, X}' on the connection socket.
-%% It sets `{active, true}' otherwise.
+%% If the `socket_active` connection option is supplied the function sets
+%% `{active, X}' the connection's SSL or TCP socket. It sets `{active, true}' otherwise.
 %%
 %% @param Connection connection
 %% @returns `ok' or `{error, Reason}'
 %%
-%% The ssl:reason() type is not exported.
+%% Note: The ssl:reason() type is not exported so that we use `any()' on the spec.
 -spec activate(connection()) -> ok | {error, inet:posix() | any()}.
 activate(Connection) ->
     epgsql_sock:activate(Connection).

+ 12 - 17
src/epgsql_sock.erl

@@ -169,8 +169,7 @@ activate(C) ->
 -spec set_net_socket(gen_tcp | ssl, tcp_socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
 set_net_socket(Mod, Socket, State) ->
     State1 = State#state{mod = Mod, sock = Socket},
-    Active = get_socket_active(State1),
-    setopts(State1, [{active, Active}]),
+    ok = activate_socket(State1),
     State1.
 
 -spec init_replication_state(pg_sock()) -> pg_sock().
@@ -197,8 +196,7 @@ set_attr(connect_opts, ConnectOpts, State) ->
 -spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
 set_packet_handler(Handler, State0) ->
     State = State0#state{handler = Handler},
-    Active = get_socket_active(State),
-    setopts(State, [{active, Active}]),
+    ok = activate_socket(State),
     State.
 
 -spec get_codec(pg_sock()) -> epgsql_binary:codec().
@@ -259,8 +257,7 @@ handle_call({copy_send_rows, Rows}, _From,
     {reply, Response, State};
 
 handle_call(activate, _From, State) ->
-    Active = get_socket_active(State),
-    Res = setopts(State, [{active, Active}]),
+    Res = activate_socket(State),
     {reply, Res, State}.
 
 handle_cast({{Method, From, Ref} = Transport, Command, Args}, State)
@@ -335,12 +332,9 @@ format_status(terminate, [_PDict, State]) ->
 send_socket_pasive(#state{subproto_state = #repl{receiver = Rec}} = State) when Rec =/= undefined ->
     Rec ! {epgsql, self(), socket_passive},
     State;
-send_socket_pasive(#state{subproto_state = #repl{ cbmodule = CbMod
-                                                , cbstate = CbState} = Repl
-                         } = State) ->
-    {ok, NewCbState} = epgsql:handle_socket_passive(CbMod, CbState),
-    NewRepl = Repl#repl{cbstate = NewCbState},
-    State#state{subproto_state = NewRepl}.
+send_socket_pasive(State) ->
+    ok = activate_socket(State),
+    State.
 
 -spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
                          Result when
@@ -442,15 +436,16 @@ setopts(#state{mod = Mod, sock = Sock}, Opts) ->
     end.
 
 -spec get_socket_active(pg_sock()) -> epgsql:socket_active().
-get_socket_active(#state{handler = H}) when H =/= on_replication ->
-    true;
-get_socket_active(#state{connect_opts = #{socket_active := Active}}) ->
+get_socket_active(#state{handler = on_replication, connect_opts = #{socket_active := Active}}) ->
     Active;
-get_socket_active(#state{connect_opts = Opts}) when is_list(Opts) ->
-    proplists:get_value(socket_active, Opts, true);
 get_socket_active(_State) ->
     true.
 
+-spec activate_socket(pg_sock()) -> ok | {error, inet:posix() | any()}.
+activate_socket(State) ->
+  Active = get_socket_active(State),
+  setopts(State, [{active, Active}]).
+
 %% This one only used in connection initiation to send client's
 %% `StartupMessage' and `SSLRequest' packets
 -spec send(pg_sock(), iodata()) -> ok | {error, any()}.

+ 1 - 6
test/epgsql_replication_SUITE.erl

@@ -16,8 +16,7 @@
          replication_sync_active_n_socket/1,
 
          %% Callbacks
-         handle_x_log_data/4,
-         handle_socket_passive/1
+         handle_x_log_data/4
         ]).
 
 init_per_suite(Config) ->
@@ -168,7 +167,3 @@ handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
   {C, Pid} = CbState,
   Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
   {ok, EndLSN, EndLSN, CbState}.
-
-handle_socket_passive({C, _Pid} = CbState) ->
-  spawn(fun() -> epgsql:activate(C) end),
-  {ok, CbState}.