Alexander Verbitsky 9 лет назад
Родитель
Сommit
6d0bdbddd4
7 измененных файлов с 190 добавлено и 131 удалено
  1. 4 0
      .gitignore
  2. 11 4
      include/epgsql_pool.hrl
  3. 4 2
      src/epgsql_pool.app.src
  4. 21 18
      src/epgsql_pool.erl
  5. 16 26
      src/epgsql_pool_app.erl
  6. 83 0
      src/epgsql_pool_utils.erl
  7. 51 81
      src/epgsql_pool_worker.erl

+ 4 - 0
.gitignore

@@ -0,0 +1,4 @@
+.eunit
+*.beam
+ebin
+deps/

+ 11 - 4
include/epgsql_pool.hrl

@@ -1,12 +1,19 @@
+-define(MAX_RECONNECT_TIMEOUT, 1000*30).
+-define(MIN_RECONNECT_TIMEOUT, 100).
 
 -record(epgsql_params, {
     host                :: string(),
     port                :: non_neg_integer(),
     username            :: string(),
     password            :: string(),
-    database            :: string(),
-    connection_timeout  :: non_neg_integer(),
-    query_timeout       :: non_neg_integer()
+    database            :: string()
 }).
 
--define(POOL_NAME, epgsql_pool).
+-record(epgsql_connection, {
+    connection            :: pid(),
+    params                :: #epgsql_params{},
+    connection_timeout  :: non_neg_integer(),
+    query_timeout       :: non_neg_integer(),
+    reconnect_attempt = 0 :: non_neg_integer(),
+    reconnect_timeout = 0 :: non_neg_integer()
+}).

+ 4 - 2
src/epgsql_pool.app.src

@@ -3,9 +3,11 @@
     {vsn, "1"},
     {registered, []},
     {applications, [
-        wgconfig,
+        epgsql,
+        lager,
         pooler,
-        epgsql
+        folsom,
+        wgconfig
     ]},
     {mod, { epgsql_pool_app, []}},
     {env, []}

+ 21 - 18
src/epgsql_pool.erl

@@ -1,44 +1,47 @@
 -module(epgsql_pool).
 
 -export([
-    transaction/1,
-    equery/2
+    equery/3,
+    transaction/2
 ]).
 
 -include("epgsql_pool.hrl").
 
+% TODO: move parameter into config
 -define(TIMEOUT, 1000).
 
-equery(Stmt, Params) ->
+equery({worker, _PoolName, Worker}, Stmt, Params) ->
+    % TODO: infinity - looks as dangerous
+    gen_server:call(Worker, {equery, Stmt, Params}, infinity);
+equery(PoolName, Stmt, Params) ->
     % Example
-    % epgsql_pool:equery("SELECT NOW() as now", []).
+    % epgsql_pool:equery(<<"default">>, "SELECT NOW() as now", []).
     transaction(
+        PoolName,
         fun(Worker) ->
-            equery({worker, Worker}, Stmt, Params)
+            equery(Worker, Stmt, Params)
         end).
 
-equery({worker, Worker}, Stmt, Params) ->
-    gen_server:call(Worker, {equery, Stmt, Params}, infinity).
-
-transaction(Fun) ->
+transaction(PoolName, Fun) ->
+    FullPoolName = list_to_atom("epgsql_pool." ++ binary_to_list(PoolName)),
     % TODO: logging with time of execution
-    case pooler:take_member(?POOL_NAME, ?TIMEOUT) of
-        Worker when is_pid(Worker) ->
-            W = {worker, Worker},
+    case pooler:take_member(FullPoolName, ?TIMEOUT) of
+        Pid when is_pid(Pid) ->
+            Worker = {worker, FullPoolName, Pid},
             try
-                equery(W, "BEGIN", []),
+                equery(Worker, "BEGIN", []),
                 Result = Fun(Worker),
-                equery(W, "COMMIT", []),
+                equery(Worker, "COMMIT", []),
                 Result
             catch
                 Err:Reason ->
-                    equery(W, "ROLLBACK", []),
+                    equery(Worker, "ROLLBACK", []),
                     erlang:raise(Err, Reason, erlang:get_stacktrace())
             after
-                pooler:return_member(?POOL_NAME, Worker, ok)
+                pooler:return_member(FullPoolName, Pid, ok)
             end;
         error_no_members ->
-            PoolStats = pooler:pool_stats(?POOL_NAME),
-            lager:warning("Pool overload: ~p", [PoolStats]),
+            PoolStats = pooler:pool_stats(FullPoolName),
+            lager:warning("Pool ~p overload: ~p", [FullPoolName, PoolStats]),
             {error, no_members}
     end.

+ 16 - 26
src/epgsql_pool_app.erl

@@ -11,43 +11,33 @@
 
 start(_StartType, _StartArgs) ->
     register(?MODULE, self()),
-    start_pool(),
+    lists:foreach(fun start_pool/1, wgconfig:list_sections("epgsql_pool")),
+
+    folsom_metrics:new_spiral(<<"db_connection_errors">>),
+    folsom_metrics:new_spiral(<<"db_common_errors">>),
+    folsom_metrics:new_spiral(<<"db_request_timeouts">>),
     {ok, self()}.
 
 stop(_State) ->
-    stop_pool(),
+    lists:foreach(fun stop_pool/1, wgconfig:list_sections("epgsql_pool")),
     ok.
 
+%% Internal functions
+
 -include("epgsql_pool.hrl").
 
-start_pool() ->
-    InitCount = wgconfig:get_int(?POOL_NAME, init_count),
-    MaxCount = wgconfig:get_int(?POOL_NAME, max_count),
-
-    % Connection parameters
-    Hots = wgconfig:get_string(?POOL_NAME, host),
-    Port = wgconfig:get_int(?POOL_NAME, port),
-    Username = wgconfig:get_string(?POOL_NAME, username),
-    Password = wgconfig:get_string(?POOL_NAME, password),
-    Database = wgconfig:get_string(?POOL_NAME, database),
-    ConnectionTimeout = wgconfig:get_int(?POOL_NAME, connection_timeout),
-    QueryTimeout = wgconfig:get_int(?POOL_NAME, query_timeout),
-
-    Params = #epgsql_params{
-        host=Hots, port=Port,
-        username=Username, password=Password,
-        database=Database,
-        connection_timeout=ConnectionTimeout,
-        query_timeout=QueryTimeout
-    },
+start_pool(PoolName) ->
+    InitCount = wgconfig:get_int(PoolName, init_count),
+    MaxCount = wgconfig:get_int(PoolName, max_count),
 
     PoolConfig = [
-        {name, ?POOL_NAME},
+        {name, erlang:binary_to_atom(PoolName, utf8)},
         {init_count, InitCount},
         {max_count, MaxCount},
-        {start_mfa, {epgsql_pool_worker, start_link, [Params]}}
+        {start_mfa, {epgsql_pool_worker, start_link, [PoolName]}}
     ],
     pooler:new_pool(PoolConfig).
 
-stop_pool() ->
-    pooler:rm_pool(?POOL_NAME).
+stop_pool(PoolName) ->
+    APoolName = erlang:binary_to_atom(PoolName, utf8),
+    pooler:rm_pool(APoolName).

+ 83 - 0
src/epgsql_pool_utils.erl

@@ -0,0 +1,83 @@
+-module(epgsql_pool_utils).
+
+-export([
+    new_connection/1,
+    open_connection/1,
+    close_connection/1,
+    reconnect/1
+]).
+
+-include_lib("epgsql/include/epgsql.hrl").
+-include("epgsql_pool.hrl").
+
+new_connection(SectionName) ->
+    HostSection = wgconfig:get_string(SectionName, "db_host"),
+    % Connection parameters
+    Params = #epgsql_params{
+        host                 = wgconfig:get_string(HostSection, "host"),
+        port                 = wgconfig:get_int(HostSection, "port"),
+        username             = wgconfig:get_string(HostSection, "username"),
+        password             = wgconfig:get_string(HostSection, "password"),
+        database             = wgconfig:get_string(HostSection, "database")
+    },
+    #epgsql_connection{
+        connection_timeout   = wgconfig:get_int(SectionName, "connection_timeout"),
+        query_timeout        = wgconfig:get_int(SectionName, "query_timeout"),
+        params=Params
+    }.
+
+open_connection(State) ->
+    Params = State#epgsql_connection.params,
+    lager:info("Connect ~p", [Params]),
+    #epgsql_params{
+        host               = Host,
+        port               = Port,
+        username           = Username,
+        password           = Password,
+        database           = Database
+    } = Params,
+    ConnectionTimeout = State#epgsql_connection.connection_timeout,
+
+    Res = epgsql:connect(Host, Username, Password, [        
+        {port, Port},
+        {database, Database},
+        {timeout, ConnectionTimeout}
+    ]),
+    case Res of
+        {ok, Sock} ->
+            {ok, State#epgsql_connection{
+                connection=Sock,
+                reconnect_attempt=0}};
+        {error, Reason} ->
+            lager:error("Connect fail: ~p", [Reason]),
+            {error, State}
+    end.
+
+close_connection(State) ->
+    Connection = State#epgsql_connection.connection,
+    epgsql:close(Connection),
+    #epgsql_connection{connection = undefined}.
+
+reconnect(#epgsql_connection{
+        reconnect_attempt = R,
+        reconnect_timeout = T} = State) ->
+    case T > ?MAX_RECONNECT_TIMEOUT of
+        true ->
+            reconnect_after(R, ?MIN_RECONNECT_TIMEOUT, T),
+            State#epgsql_connection{reconnect_attempt = R + 1};
+        _ ->
+            T2 = exponential_backoff(R, ?MIN_RECONNECT_TIMEOUT),
+            reconnect_after(R, ?MIN_RECONNECT_TIMEOUT, T2),
+            State#epgsql_connection{reconnect_attempt=R + 1, reconnect_timeout=T2}
+    end.
+
+reconnect_after(R, Tmin, Tmax) ->
+    Delay = rand_range(Tmin, Tmax),
+    lager:info("Reconnect after ~w ms (attempt ~w)", [Delay, R]),
+    erlang:send_after(Delay, self(), open_connection).
+
+rand_range(Min, Max) ->
+    max(random:uniform(Max), Min).
+
+exponential_backoff(N, T) ->
+    erlang:round(math:pow(2, N)) * T.

+ 51 - 81
src/epgsql_pool_worker.erl

@@ -13,37 +13,51 @@
     code_change/3
 ]).
 
+-import(epgsql_pool_utils, [
+    new_connection/1,
+    open_connection/1,
+    close_connection/1,
+    reconnect/1
+]).
+
 -include("epgsql_pool.hrl").
--include_lib("epgsql/include/epgsql.hrl").
 
--define(MAX_RECONNECT_TIMEOUT, 1000*30).
--define(MIN_RECONNECT_TIMEOUT, 200).
+-include_lib("epgsql/include/epgsql.hrl").
 
 -record(state, {
-    connection            :: pid(),
-    params                :: #epgsql_params{},
-    reconnect_attempt = 0 :: non_neg_integer(),
-    reconnect_timeout = 0 :: non_neg_integer()
+    config_section :: string(),
+    connection     :: #epgsql_connection{},
+    connection_timeout  :: non_neg_integer(),
+    query_timeout       :: non_neg_integer()
 }).
 
 start_link(Params) ->
     gen_server:start_link(?MODULE, Params, []).
 
-init(Params) -> 
+init(SectionName) -> 
+    lager:debug("Init epgsql pool worker: ~p", [SectionName]),
     process_flag(trap_exit, true),
-    random:seed(now()),
-    self() ! open_connection,
-    {ok, #state{params=Params}}.
+    random:seed(os:timestamp()),
+
+    State = #state{
+        config_section       = SectionName,
+        connection           = new_connection(SectionName)
+    },
+    erlang:send(self(), open_connection),
+    {ok, State}.
 
 handle_call({_Message}, _From, #state{connection = undefined} = State) ->
     {reply, {error, no_connection}, State};
-handle_call({equery, Stmt, Params}, From, State) ->
-    TStart = now(),
-    Result = epgsql:equery(State#state.connection, Stmt, Params),
-    Time = timer:now_diff(now(), TStart),
+handle_call({equery, Stmt, Params}, _From, State) ->
+    ConnState = State#state.connection,
+    Conn = ConnState#epgsql_connection.connection,
+    TStart = os:timestamp(),
+    %TODO: query_timeout
+    Result = epgsql:equery(Conn, Stmt, Params),
+    Time = timer:now_diff(os:timestamp(), TStart),
     lager:debug(
-        "Stmt=~p, Params=~p, Time=~p ms, Result=~p",
-        [Stmt, Params, Time / 1.0e3, Result]),
+        "Stmt=~p, Params=~p, Time=~p s, Result=~p",
+        [Stmt, Params, Time / 1.0e6, Result]),
     {reply, Result, State};
 handle_call(Message, From, State) ->
     lager:info(
@@ -55,17 +69,28 @@ handle_cast(Message, State) ->
     {noreply, State}.
 
 handle_info(open_connection, State) ->
-    case open_connection(State) of
-        {ok, UpdState} ->
-            {noreply, UpdState};
-        {error, UpdState} ->
-            {noreply, reconnect(UpdState)}
+    ConnState = State#state.connection,
+    case open_connection(ConnState) of
+        {ok, UpdConnState} ->
+            lager:debug("Connected: ~p", [UpdConnState]),
+            {noreply, State#state{connection = UpdConnState}};
+        {error, UpdConnState} ->
+            lager:error(
+                "Pool ~p could not to connect",
+                [State#state.config_section]),
+            folsom_metrics:notify({<<"db_connection_errors">>, 1}),
+            {noreply, State#state{connection = reconnect(UpdConnState)}}
     end;
 
-handle_info({'EXIT', Pid, Reason}, #state{connection = C} = State) when Pid == C ->
-    lager:error("Exit with reason: ~p", [Reason]),
-    close_connection(State),
-    {noreply, reconnect(State)};
+handle_info(
+        {'EXIT', Pid, Reason},
+        #state{connection = #epgsql_connection{connection = C}} = State)
+        when Pid == C ->
+
+    lager:debug("EXIT: Connection ~p, Reason: ~p", [Pid, Reason]),
+    UpdConnState = close_connection(State#state.connection),
+    folsom_metrics:notify({<<"db_connection_errors">>, 1}),
+    {noreply, State#state{connection = reconnect(UpdConnState)}};
 
 handle_info(Message, State) ->
     lager:debug("Info / Msg: ~p, State: ~p", [Message, State]),
@@ -79,58 +104,3 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 %% -- internal functions --
-
-open_connection(#state{params = Params} = State) ->
-
-    #epgsql_params{
-        host               = Host,
-        port               = Port,
-        username           = Username,
-        password           = Password,
-        database           = Database,
-        connection_timeout = ConnectionTimeout
-    } = Params,
-
-    Res = epgsql:connect(Host, Username, Password, [        
-        {port, Port},
-        {database, Database},
-        {timeout, ConnectionTimeout}
-    ]),
-    case Res of
-        {ok, Sock} ->
-            {ok, State#state{
-                connection=Sock,
-                reconnect_attempt=0}};
-        {error, Reason} ->
-            lager:error("Connect fail: ~p", [Reason]),
-            {error, State}
-    end.
-
-close_connection(State) ->
-    Connection = State#state.connection,
-    epgsql:close(Connection),
-    #state{connection = undefined}.
-
-reconnect(#state{
-        reconnect_attempt = R,
-        reconnect_timeout = T} = State) ->
-    case T > ?MAX_RECONNECT_TIMEOUT of
-        true ->
-            reconnect_after(R, ?MIN_RECONNECT_TIMEOUT, T),
-            State#state{reconnect_attempt = R + 1};
-        _ ->
-            T2 = exponential_backoff(R, ?MIN_RECONNECT_TIMEOUT),
-            reconnect_after(R, ?MIN_RECONNECT_TIMEOUT, T2),
-            State#state{reconnect_attempt=R + 1, reconnect_timeout=T2}
-    end.
-
-reconnect_after(R, Tmin, Tmax) ->
-    Delay = rand_range(Tmin, Tmax),
-    lager:error("Reconnect after ~w ms (attempt ~w)", [Delay, R]),
-    erlang:send_after(Delay, self(), open_connection).
-
-rand_range(Min, Max) ->
-    max(random:uniform(Max), Min).
-
-exponential_backoff(N, T) ->
-    erlang:round(math:pow(2, N)) * T.