Yuriy Zhloba 9 лет назад
Родитель
Сommit
eed8c642eb
4 измененных файлов с 97 добавлено и 9 удалено
  1. 26 3
      src/epgsql_pool.erl
  2. 19 1
      src/epgsql_pool_settings.erl
  3. 41 5
      test/epgsql_pool_SUITE.erl
  4. 11 0
      test/epgsql_pool_settings_tests.erl

+ 26 - 3
src/epgsql_pool.erl

@@ -2,6 +2,7 @@
 
 -export([start/3, stop/1,
          validate_connection_params/1,
+         set_monitoring_callback/1,
          query/2, query/3, query/4,
          transaction/2
         ]).
@@ -45,6 +46,11 @@ validate_connection_params(#epgsql_connection_params{host = Host, port = Port, u
     end.
 
 
+-spec set_monitoring_callback(fun()) -> ok.
+set_monitoring_callback(Callback) ->
+    epgsql_pool_settings:set_monitoring_callback(Callback).
+
+
 -spec query(pool_name() | pid(), epgsql:sql_query()) -> epgsql:reply().
 query(PoolNameOrWorker, Stmt) ->
     query(PoolNameOrWorker, Stmt, [], []).
@@ -61,14 +67,22 @@ query(Worker, Stmt, Params, Options) when is_pid(Worker) ->
                   undefined -> epgsql_pool_settings:get(query_timeout);
                   V -> V
               end,
-    %% TStart = os:timestamp(),
-    %% Time = timer:now_diff(os:timestamp(), TStart),
     try
-        gen_server:call(Worker, {equery, Stmt, Params}, Timeout)
+        TStart = os:timestamp(),
+        Res = gen_server:call(Worker, {equery, Stmt, Params}, Timeout),
+        Time = timer:now_diff(os:timestamp(), TStart),
+        notify_db_event({db_query_time, Time}),
+        case Res of
+            {error, Error} -> notify_db_event({db_query_error, Stmt, Params, Error});
+            _ -> do_nothing
+        end,
+        %% TODO monitor query time
+        Res
     catch
         exit:{timeout, _} ->
             gen_server:call(Worker, cancel),
             error_logger:error_msg("query timeout ~p ~p", [Stmt, Params]),
+            notify_db_event({db_query_timeout, Stmt, Params}),
             {error, timeout}
     end;
 
@@ -99,6 +113,8 @@ transaction(PoolName, Fun) ->
     end.
 
 
+%%% inner functions
+
 get_worker(PoolName0) ->
     PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
     Timeout = epgsql_pool_settings:get(pooler_get_worker_timeout),
@@ -109,3 +125,10 @@ get_worker(PoolName0) ->
             error_logger:error_msg("Pool ~p overload: ~p", [PoolName, PoolStats]),
             {error, pool_overload}
     end.
+
+
+notify_db_event(Event) ->
+    case epgsql_pool_settings:get_monitoring_callback() of
+        undefined -> do_nothing;
+        F when is_function(F) -> F(Event)
+    end.

+ 19 - 1
src/epgsql_pool_settings.erl

@@ -1,7 +1,12 @@
 -module(epgsql_pool_settings).
 -behavior(gen_server).
 
--export([start_link/0, get_connection_params/1, set_connection_params/2, get/1, set/2]).
+-export([start_link/0,
+         get_connection_params/1,
+         set_connection_params/2,
+         get_monitoring_callback/0,
+         set_monitoring_callback/1,
+         get/1, set/2]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
 -include("epgsql_pool.hrl").
@@ -30,6 +35,19 @@ set_connection_params(PoolName, Params) ->
     gen_server:call(?MODULE, {save, Key, Params}).
 
 
+-spec get_monitoring_callback() -> fun() | undefined.
+get_monitoring_callback() ->
+    case ets:lookup(?MODULE, monitoring_callback) of
+        [] -> undefined;
+        [{monitoring_callback, Callback}] -> Callback
+    end.
+
+
+-spec set_monitoring_callback(fun()) -> ok.
+set_monitoring_callback(Callback) ->
+    gen_server:call(?MODULE, {save, monitoring_callback, Callback}).
+
+
 -spec get(atom()) -> integer().
 get(Key) ->
     case ets:lookup(?MODULE, {settings, Key}) of

+ 41 - 5
test/epgsql_pool_SUITE.erl

@@ -11,7 +11,8 @@
 
 -export([all/0,
          init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2,
-         query_test/1, transaction_test/1, reconnect_test/1, timeout_test/1, validate_connection_params_test/1
+         query_test/1, transaction_test/1, reconnect_test/1, timeout_test/1,
+         validate_connection_params_test/1, monitoring_callback_test/1
         ]).
 
 -define(SELECT_ITEMS_QUERY, "SELECT id, category_id, title, num FROM item ORDER by id ASC").
@@ -22,7 +23,8 @@ all() ->
      transaction_test,
      reconnect_test,
      timeout_test,
-     validate_connection_params_test
+     validate_connection_params_test,
+     monitoring_callback_test
     ].
 
 
@@ -177,19 +179,53 @@ timeout_test(_Config) ->
 
 
 validate_connection_params_test(_Config) ->
-    Params1 = #epgsql_connection_params{host = "localhost", port = 5432, username = "test", password = "test", database = "testdb"},
+    Params1 = #epgsql_connection_params{host = "localhost", port = 5432,
+                                        username = "test", password = "test", database = "testdb"},
     Res1 = epgsql_pool:validate_connection_params(Params1),
     ct:pal("Res1: ~p", [Res1]),
     ?assertEqual(ok, Res1),
 
-    Params2 = #epgsql_connection_params{host = "localhost", port = 5432, username = "test", password = "some", database = "testdb"},
+    Params2 = #epgsql_connection_params{host = "localhost", port = 5432,
+                                        username = "test", password = "some", database = "testdb"},
     Res2 = epgsql_pool:validate_connection_params(Params2),
     ct:pal("Res2: ~p", [Res2]),
     ?assertEqual({error,invalid_password}, Res2),
 
-    Params3 = #epgsql_connection_params{host = "localhost", port = 5432, username = "test", password = "test", database = "some"},
+    Params3 = #epgsql_connection_params{host = "localhost", port = 5432,
+                                        username = "test", password = "test", database = "some"},
     Res3 = epgsql_pool:validate_connection_params(Params3),
     ct:pal("Res3: ~p", [Res3]),
     ?assertEqual({error,{error,fatal,<<"3D000">>,<<"database \"some\" does not exist">>, []}}, Res3),
 
     ok.
+
+
+monitoring_callback_test(_Config) ->
+    Callback = fun({db_query_time, Time}) ->
+                       ct:pal("db_query_time ~p", [Time]),
+                       put(db_query_time, true); % use process dictionary
+                  ({db_query_error, _Stmt, _Params, Error}) ->
+                       ct:pal("db_query_error ~p", [Error]),
+                       put(db_query_error, true);
+                  ({db_query_timeout, _Stmt, _Params}) ->
+                       ct:pal("db_query_timeout"),
+                       put(db_query_timeout, true)
+               end,
+    epgsql_pool:set_monitoring_callback(Callback),
+
+    Res1 = epgsql_pool:query(my_pool, ?SELECT_ITEMS_QUERY),
+    ct:pal("Res1:~p", [Res1]),
+    ?assertMatch({ok, _, _}, Res1),
+    ?assertEqual(true, get(db_query_time)),
+
+    Res2 = epgsql_pool:query(my_pool, "SELECT * FROM blablabla"),
+    ct:pal("Res2:~p", [Res2]),
+    ?assertMatch({error, #error{}}, Res2),
+    ?assertEqual(true, get(db_query_error)),
+
+    Res3 = epgsql_pool:query(my_pool, "SELECT pg_sleep(1)", [], [{timeout, 500}]),
+    ct:pal("Res3:~p", [Res3]),
+    ?assertEqual({error, timeout}, Res3),
+    ?assertEqual(true, get(db_query_timeout)),
+
+    ok.

+ 11 - 0
test/epgsql_pool_settings_tests.erl

@@ -48,3 +48,14 @@ connection_params_test() ->
 
     epgsql_pool_settings ! stop,
     ok.
+
+
+monitoring_callback_test() ->
+    epgsql_pool_settings:start_link(),
+
+    ?assertEqual(undefined, epgsql_pool_settings:get_monitoring_callback()),
+    Callback = fun(Event) -> Event end,
+    ?assertEqual(ok, epgsql_pool_settings:set_monitoring_callback(Callback)),
+    ?assertEqual(Callback, epgsql_pool_settings:get_monitoring_callback()),
+
+    ok.