Yuriy Zhloba 9 лет назад
Родитель
Сommit
c043e0337d
4 измененных файлов с 58 добавлено и 13 удалено
  1. 1 1
      src/epgsql_pool_app.erl
  2. 3 5
      src/epgsql_pool_settings.erl
  3. 1 1
      src/epgsql_pool_utils.erl
  4. 53 6
      src/epgsql_pool_worker.erl

+ 1 - 1
src/epgsql_pool_app.erl

@@ -23,7 +23,7 @@ test_run() ->
     Params = #epgsql_connection_params{host = "localhost",
                                        port = 5432,
                                        username = "test",
-                                       password = "invalidpass",
+                                       password = "test",
                                        database = "testdb"},
     epgsql_pool_settings:set_connection_params(my_pool, Params),
     {ok, _} = epgsql_pool:start(my_pool, 1, 2),

+ 3 - 5
src/epgsql_pool_settings.erl

@@ -7,9 +7,6 @@
 -include("epgsql_pool.hrl").
 -include("otp_types.hrl").
 
--type(epgsql_pool_settings_key() ::
-        connection_timeout | query_timeout | pooler_get_worker_timeout | max_reconnect_timeout | min_reconnect_timeout).
-
 
 %%% module API
 
@@ -33,7 +30,7 @@ set_connection_params(PoolName, Params) ->
     gen_server:call(?MODULE, {save, Key, Params}).
 
 
--spec get(epgsql_pool_settings_key()) -> integer().
+-spec get(atom()) -> integer().
 get(Key) ->
     case ets:lookup(?MODULE, {settings, Key}) of
         [] -> throw({settings_not_found, Key});
@@ -41,7 +38,7 @@ get(Key) ->
     end.
 
 
--spec set(epgsql_pool_settings_key(), integer()) -> ok.
+-spec set(atom(), integer()) -> ok.
 set(Key, Value) ->
     gen_server:call(?MODULE, {save, {settings, Key}, Value}).
 
@@ -56,6 +53,7 @@ init([]) ->
     ets:insert(T, {{settings, pooler_get_worker_timeout}, 1000}),
     ets:insert(T, {{settings, max_reconnect_timeout}, 5000}),
     ets:insert(T, {{settings, min_reconnect_timeout}, 100}),
+    ets:insert(T, {{settings, keep_alive_timeout}, 60000}),
     {ok, T}.
 
 

+ 1 - 1
src/epgsql_pool_utils.erl

@@ -31,7 +31,7 @@ open_connection(PoolName, Connection0) ->
 close_connection(#epgsql_connection{sock = undefined} = Connection) -> Connection;
 close_connection(#epgsql_connection{sock = Sock} = Connection) ->
     epgsql:close(Sock),
-    Connection#epgsql_connection{sock = undefined}.
+    Connection#epgsql_connection{sock = undefined, reconnect_attempt = 0}.
 
 
 -spec reconnect(#epgsql_connection{}) -> #epgsql_connection{}.

+ 53 - 6
src/epgsql_pool_worker.erl

@@ -9,7 +9,10 @@
 -include("otp_types.hrl").
 
 -record(state, {pool_name :: atom(),
-                connection :: #epgsql_connection{}
+                connection :: #epgsql_connection{},
+                keep_alive_query_ref :: reference(), % ref to async keep-alive query to DB
+                send_keep_alive_timer :: reference(), % timer to send keep-alive query to DB
+                no_reply_keep_alive_timer :: reference() % timer to wait for reply from DB
                }).
 
 %% Module API
@@ -26,11 +29,11 @@ start_link(PoolName0) ->
 init(PoolName) ->
     error_logger:info_msg("Init epgsql pool worker: ~p", [PoolName]),
     process_flag(trap_exit, true),
-
     random:seed(now()),
-
     self() ! open_connection,
-    {ok, #state{pool_name = PoolName}}.
+    {ok, #state{pool_name = PoolName,
+                send_keep_alive_timer = make_ref(), % no need to check for undefined in cancel_timer
+                no_reply_keep_alive_timer = make_ref()}}.
 
 
 -spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
@@ -63,15 +66,59 @@ handle_cast(Message, State) ->
 
 
 -spec handle_info(gs_request(), gs_state()) -> gs_info_reply().
-handle_info(open_connection, #state{pool_name = PoolName, connection = Connection} = State) ->
+handle_info(open_connection, #state{pool_name = PoolName, connection = Connection,
+                                    send_keep_alive_timer = Send_KA_Timer} = State) ->
     case epgsql_pool_utils:open_connection(PoolName, Connection) of
-        {ok, Connection2} -> {noreply, State#state{connection = Connection2}};
+        {ok, Connection2} ->
+            KeepAliveTimeout = epgsql_pool_settings:get(keep_alive_timeout),
+            erlang:cancel_timer(Send_KA_Timer),
+            Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
+            {noreply, State#state{connection = Connection2, send_keep_alive_timer = Send_KA_Timer2}};
         {error, Reason, Connection3} ->
             error_logger:error_msg("Pool:~p, Worker:~p could not to connect to DB:~p", [PoolName, self(), Reason]),
             Connection4 = epgsql_pool_utils:reconnect(Connection3),
             {noreply, State#state{connection = Connection4}}
     end;
 
+handle_info(keep_alive, #state{connection = #epgsql_connection{sock = undefined}} = State) ->
+    do_nothing,
+    {noreply, State};
+
+handle_info(keep_alive, #state{connection = #epgsql_connection{sock = Sock},
+                               no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
+    %% send async keep-alive query to DB
+    KA_Ref = epgsqli:squery(Sock, <<"SELECT 1">>),
+
+    KeepAliveTimeout = epgsql_pool_settings:get(keep_alive_timeout),
+    erlang:cancel_timer(NR_KA_Timer),
+    NR_KA_Timer2 = erlang:send_after(KeepAliveTimeout * 2, self(), no_reply_to_keep_alive),
+    {noreply, State#state{keep_alive_query_ref = KA_Ref, no_reply_keep_alive_timer = NR_KA_Timer2}};
+
+handle_info({_Pid, Ref, done}, #state{keep_alive_query_ref = Ref,
+                                      send_keep_alive_timer = Send_KA_Timer,
+                                      no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
+    %% got reply to asycn keep-alive query from DB
+    KeepAliveTimeout = epgsql_pool_settings:get(keep_alive_timeout),
+    erlang:cancel_timer(Send_KA_Timer),
+    erlang:cancel_timer(NR_KA_Timer),
+    Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
+    {noreply, State#state{send_keep_alive_timer = Send_KA_Timer2}};
+
+handle_info({_Pid, Ref, _Reply}, #state{keep_alive_query_ref = Ref} = State) ->
+    do_nothing,
+    {noreply, State};
+
+handle_info(no_reply_to_keep_alive, #state{connection = Connection,
+                                           send_keep_alive_timer = Send_KA_Timer,
+                                           no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
+    %% no reply to asycn keep-alive query from DB
+    error_logger:error_msg("DB Connection, no_reply_to_keep_alive"),
+    erlang:cancel_timer(Send_KA_Timer),
+    erlang:cancel_timer(NR_KA_Timer),
+    Connection2 = epgsql_pool_utils:close_connection(Connection),
+    Connection3 = epgsql_pool_utils:reconnect(Connection2),
+    {noreply, State#state{connection = Connection3}};
+
 handle_info({'EXIT', _Sock, normal},
             #state{connection = #epgsql_connection{sock = undefined}} = State) ->
     do_nothing,