Browse Source

refactor utils and worker

Yuriy Zhloba 9 years ago
parent
commit
cc70993b3e
4 changed files with 117 additions and 192 deletions
  1. 11 9
      include/epgsql_pool.hrl
  2. 25 49
      src/epgsql_pool_settings.erl
  3. 34 63
      src/epgsql_pool_utils.erl
  4. 47 71
      src/epgsql_pool_worker.erl

+ 11 - 9
include/epgsql_pool.hrl

@@ -9,19 +9,21 @@
 -type(db_reply() :: term()). % TODO: narrow type
 
 
+-type(epgsql_pool_settings_key() ::
+        connection_timeout | query_timeout | pooler_get_worker_timeout | max_reconnect_timeout | min_reconnect_timeout).
+
+
 -record(epgsql_connection_params, {
-    host                :: string() | binary(),
-    port                :: non_neg_integer(),
-    username            :: string() | binary(),
-    password            :: string() | binary(),
-    database            :: string() | binary()
+    host :: string() | binary(),
+    port :: non_neg_integer(),
+    username :: string() | binary(),
+    password :: string() | binary(),
+    database :: string() | binary()
 }).
 
 -record(epgsql_connection, {
-    connection            :: pid(),
-    params                :: #epgsql_connection_params{},
-    connection_timeout    :: non_neg_integer(),
-    query_timeout         :: non_neg_integer(),
+    connection_sock :: pid(),
+    params :: #epgsql_connection_params{},
     reconnect_attempt = 0 :: non_neg_integer(),
     reconnect_timeout = 0 :: non_neg_integer()
 }).

+ 25 - 49
src/epgsql_pool_settings.erl

@@ -7,13 +7,6 @@
 -include("epgsql_pool.hrl").
 -include("otp_types.hrl").
 
--import(epgsql_pool_utils, [pool_name_to_atom/1]).
-
--record(state, {
-          connection_params :: orddict:orddictr(),
-          settings :: orddict:orddict()
-         }).
-
 
 %%% module API
 
@@ -22,68 +15,51 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
--spec get_connection_params(pool_name()) -> #epgsql_connection_params{}. % or throw({connection_params_not_found, PoolName})
+-spec get_connection_params(pool_name()) -> #epgsql_connection_params{}.
 get_connection_params(PoolName) ->
-    gen_server:call(?MODULE, {get_connection_params, pool_name_to_atom(PoolName)}).
+    Key = {connection, epgsql_pool_utils:pool_name_to_atom(PoolName)},
+    case ets:lookup(?MODULE, Key) of
+        [] -> throw({connection_params_not_found, PoolName});
+        [ConnectionParams] -> ConnectionParams
+    end.
 
 
 -spec set_connection_params(pool_name(), #epgsql_connection_params{}) -> ok.
 set_connection_params(PoolName, Params) ->
-    gen_server:call(?MODULE, {set_connection_params, pool_name_to_atom(PoolName), Params}).
+    Key = {connection, epgsql_pool_utils:pool_name_to_atom(PoolName)},
+    gen_server:call(?MODULE, {save, Key, Params}).
 
 
--spec get(atom()) -> integer(). % or throw({settings_not_found, Key})
+-spec get(epgsql_pool_settings_key()) -> integer().
 get(Key) ->
-    gen_server:call(?MODULE, {get, Key}).
+    case ets:lookup(?MODULE, {settings, Key}) of
+        [] -> throw({settings_not_found, Key});
+        [Value] -> Value
+    end.
 
 
--spec set(atom(), integer()) -> ok. % or throw({settings_not_found, Key})
+-spec set(epgsql_pool_settings_key(), integer()) -> ok.
 set(Key, Value) ->
-    gen_server:call(?MODULE, {set, Key, Value}).
+    gen_server:call(?MODULE, {save, {settings, Key}, Value}).
 
 
 %%% gen_server API
 
 -spec init(gs_args()) -> gs_init_reply().
 init([]) ->
-    {ok, #state{connection_params = orddict:new(),
-                settings = orddict:from_list(
-                             [{query_timeout, 10000},
-                              {pooler_get_worker_timeout, 1000},
-                              {max_reconnect_timeout, 3000},
-                              {min_reconnect_timeout, 100}
-                             ])}}.
+    T = ets:new(?MODULE, [protected, named_table]),
+    ets:insert(T, {{settings, connection_timeout}, 10000}),
+    ets:insert(T, {{settings, query_timeout}, 10000}),
+    ets:insert(T, {{settings, pooler_get_worker_timeout}, 1000}),
+    ets:insert(T, {{settings, max_reconnect_timeout}, 3000}),
+    ets:insert(T, {{settings, min_reconnect_timeout}, 100}),
+    {ok, T}.
 
 
 -spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
-handle_call({get_connection_params, PoolName}, _From,
-            #state{connection_params = ConnectionParams} = State) ->
-    Reply = case orddict:find(PoolName, ConnectionParams) of
-                {ok, Params} -> Params;
-                error -> throw({connection_params_not_found, PoolName})
-            end,
-    {reply, Reply, State};
-
-handle_call({set_connection_params, PoolName, Params}, _From,
-            #state{connection_params = ConnectionParams} = State) ->
-    ConnectionParams2 = orddict:store(PoolName, Params, ConnectionParams),
-    {reply, ok, State#state{connection_params = ConnectionParams2}};
-
-handle_call({get, Key}, _From,
-            #state{settings = Settings} = State) ->
-    Reply = case orddict:find(Key, Settings) of
-                {ok, Value} -> Value;
-                error -> throw({settings_not_found, Key})
-            end,
-    {reply, Reply, State};
-
-handle_call({set, Key, Value}, _From,
-            #state{settings = Settings} = State) ->
-    Settings2 = case orddict:find(Key, Settings) of
-                    {ok, _} -> orddict:store(Key, Value, Settings);
-                    error -> throw({settings_not_found, Key})
-                end,
-    {reply, ok, State#state{settings = Settings2}};
+handle_call({save, Key, Value}, _From, Table) ->
+    ets:insert(Table, Key, Value),
+    {reply, ok, Table};
 
 handle_call(Any, _From, State) ->
     error_logger:error_msg("unknown call ~p in ~p ~n", [Any, ?MODULE]),

+ 34 - 63
src/epgsql_pool_utils.erl

@@ -1,9 +1,6 @@
 -module(epgsql_pool_utils).
 
--export([
-         get_host_params/1,
-         new_connection/1,
-         open_connection/1,
+-export([open_connection/1,
          close_connection/1,
          reconnect/1,
          pool_name_to_atom/1
@@ -12,78 +9,52 @@
 -include_lib("epgsql/include/epgsql.hrl").
 -include("epgsql_pool.hrl").
 
-get_host_params(HostSection) ->
-    #epgsql_params{
-        host                 = wgconfig:get_string(HostSection, "host"),
-        port                 = wgconfig:get_int(HostSection, "port"),
-        username             = wgconfig:get_string(HostSection, "username"),
-        password             = wgconfig:get_string(HostSection, "password"),
-        database             = wgconfig:get_string(HostSection, "database")
-    }.
 
-new_connection(SectionName) ->
-    HostSection = wgconfig:get_string(SectionName, "db_host"),
-    % Connection parameters
-    Params = get_host_params(HostSection),
-    #epgsql_connection{
-        connection_timeout   = wgconfig:get_int(SectionName, "connection_timeout"),
-        query_timeout        = wgconfig:get_int(SectionName, "query_timeout"),
-        params=Params
-    }.
-
-open_connection(State) ->
-    Params = State#epgsql_connection.params,
-    lager:info("Connect ~p", [Params]),
-    #epgsql_params{
-        host               = Host,
-        port               = Port,
-        username           = Username,
-        password           = Password,
-        database           = Database
-    } = Params,
-    ConnectionTimeout = State#epgsql_connection.connection_timeout,
-
-    Res = epgsql:connect(Host, Username, Password, [
-        {port, Port},
-        {database, Database},
-        {timeout, ConnectionTimeout}
-    ]),
+-spec open_connection(#epgsql_connection_params{}) -> #epgsql_connection{}.
+open_connection(#epgsql_params{host = Host, port = Port,
+                               username = Username,
+                               password = Password,
+                               database = Database} = ConnectionParams) ->
+    Connection0 = #epgsql_connection{params = ConnectionParams, reconnect_attempt=0},
+    ConnectionTimeout = epgsql_pool_settings:get(connection_timeout),
+    Res = epgsql:connect(Host, Username, Password,
+                         [{port, Port},
+                          {database, Database},
+                          {timeout, ConnectionTimeout}]),
     case Res of
-        {ok, Sock} ->
-            {ok, State#epgsql_connection{
-                connection=Sock,
-                reconnect_attempt=0}};
-        {error, Reason} ->
-            lager:error("Connect fail: ~p", [Reason]),
-            {error, State}
+        {ok, Sock} -> {ok, Connection0#epgsql_connection{connection_sock = Sock}};
+        {error, Reason} -> {error, Reason, Connection0}
     end.
 
-close_connection(State) ->
-    Connection = State#epgsql_connection.connection,
-    epgsql:close(Connection),
-    #epgsql_connection{connection = undefined}.
 
-reconnect(#epgsql_connection{
-        reconnect_attempt = R,
-        reconnect_timeout = T} = State) ->
+-spec close_connection(#epgsql_connection{}) -> #epgsql_connection{}.
+close_connection(#epgsql_connection{connection_sock = Sock} = Connection) ->
+    epgsql:close(Sock),
+    Connection#epgsql_connection{connection_pid = undefined}.
+
+
+-spec reconnect(#epgsql_connection{}) -> #epgsql_connection{}.
+reconnect(#epgsql_connection{reconnect_attempt = R,
+                             reconnect_timeout = T} = Connection) ->
     case T > ?DB_MAX_RECONNECT_TIMEOUT of
         true ->
-            reconnect_after(R, ?DB_MIN_RECONNECT_TIMEOUT, T),
-            State#epgsql_connection{reconnect_attempt = R + 1};
+            reconnect_after(?DB_MIN_RECONNECT_TIMEOUT, T),
+            Connection#epgsql_connection{reconnect_attempt = R + 1};
         _ ->
             T2 = exponential_backoff(R, ?DB_MIN_RECONNECT_TIMEOUT),
-            reconnect_after(R, ?DB_MIN_RECONNECT_TIMEOUT, T2),
-            State#epgsql_connection{reconnect_attempt=R + 1, reconnect_timeout=T2}
+            reconnect_after(?DB_MIN_RECONNECT_TIMEOUT, T2),
+            Connection#epgsql_connection{reconnect_attempt = R + 1, reconnect_timeout = T2}
     end.
 
-reconnect_after(R, Tmin, Tmax) ->
-    Delay = rand_range(Tmin, Tmax),
-    lager:info("Reconnect after ~w ms (attempt ~w)", [Delay, R]),
-    erlang:send_after(Delay, self(), open_connection).
 
-rand_range(Min, Max) ->
-    max(random:uniform(Max), Min).
+-spec reconnect_after(integer(), integer()) -> ok.
+reconnect_after(TMin, TMax) ->
+    Delay = max(random:uniform(TMax), TMin),
+    erlang:send_after(Delay, self(), open_connection),
+    ok.
+
 
+-spec exponential_backoff(integer(), integer()) -> integer().
 exponential_backoff(N, T) ->
     erlang:round(math:pow(2, N)) * T.
 

+ 47 - 71
src/epgsql_pool_worker.erl

@@ -3,104 +3,80 @@
 -behaviour(gen_server).
 
 -export([start_link/1]).
-
--export([
-    init/1,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2,
-    terminate/2,
-    code_change/3
-]).
-
--import(epgsql_pool_utils, [
-    new_connection/1,
-    open_connection/1,
-    close_connection/1,
-    reconnect/1
-]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
 -include("epgsql_pool.hrl").
-
 -include_lib("epgsql/include/epgsql.hrl").
 
--record(state, {
-    config_section :: string(),
-    connection     :: #epgsql_connection{},
-    connection_timeout  :: non_neg_integer(),
-    query_timeout       :: non_neg_integer()
-}).
+-record(state, {pool_name :: atom(),
+                connection :: #epgsql_connection{}
+               }).
+
+%% Module API
+
+start_link(PoolName0) ->
+    PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
+    gen_server:start_link(?MODULE, PoolName, []).
 
-start_link(Params) ->
-    gen_server:start_link(?MODULE, Params, []).
 
-init(SectionName) -> 
-    lager:debug("Init epgsql pool worker: ~p", [SectionName]),
+%%% gen_server API
+
+init(PoolName) ->
+    error_logger:info_message("Init epgsql pool worker: ~p", [PoolName]),
     process_flag(trap_exit, true),
-    random:seed(os:timestamp()),
+    self() ! open_connection,
+    {ok, #state{pool_name = PoolName}}.
 
-    State = #state{
-        config_section       = SectionName,
-        connection           = new_connection(SectionName)
-    },
-    erlang:send(self(), open_connection),
-    {ok, State}.
 
-handle_call({_Message}, _From, #state{connection = undefined} = State) ->
+handle_call({equery, _, _}, _From, #state{connection = undefined} = State) ->
     {reply, {error, no_connection}, State};
+
 handle_call({equery, Stmt, Params}, _From, State) ->
     ConnState = State#state.connection,
     Conn = ConnState#epgsql_connection.connection,
-    TStart = os:timestamp(),
-    %TODO: query_timeout
+    %% TStart = os:timestamp(),
+    %% TODO: query_timeout
     Result = epgsql:equery(Conn, Stmt, Params),
-    Time = timer:now_diff(os:timestamp(), TStart),
-    lager:debug(
-        "Stmt=~p, Params=~p, Time=~p s, Result=~p",
-        [Stmt, Params, Time / 1.0e6, Result]),
+    %% Time = timer:now_diff(os:timestamp(), TStart),
     {reply, Result, State};
-handle_call(Message, From, State) ->
-    lager:info(
-        "Call / Message: ~p, From: ~p, State: ~p", [Message, From, State]),
+
+handle_call(Message, _From, State) ->
+    error_logger:error_msg("unknown call ~p in ~p ~n", [Message, ?MODULE]),
     {reply, ok, State}.
 
+
 handle_cast(Message, State) ->
-    lager:info("Cast / Message: ~p, State: ~p", [Message, State]),
+    error_logger:error_msg("unknown cast ~p in ~p ~n", [Message, ?MODULE]),
     {noreply, State}.
 
-handle_info(open_connection, State) ->
-    ConnState = State#state.connection,
-    case open_connection(ConnState) of
-        {ok, UpdConnState} ->
-            lager:debug("Connected: ~p", [UpdConnState]),
-            {noreply, State#state{connection = UpdConnState}};
-        {error, UpdConnState} ->
-            lager:error(
-                "Pool ~p could not to connect",
-                [State#state.config_section]),
-            folsom_metrics:notify({<<"db_connection_errors">>, 1}),
-            {noreply, State#state{connection = reconnect(UpdConnState)}}
-    end;
 
-handle_info(
-        {'EXIT', Pid, Reason},
-        #state{connection = #epgsql_connection{connection = C}} = State)
-        when Pid == C ->
+handle_info(open_connection, #state{pool_name = PoolName} = State) ->
+    ConnectionParams = epgsql_pool_settings:get_connection_params(PoolName),
+    case open_connection(ConnectionParams) of
+        {ok, Connection} ->
+            {noreply, State#state{connection = Connection}};
+        {error, Reason, Connection} ->
+            error_logger:error_msg("Pool ~p could not to connect to DB:~p", [PoolName, Reason]),
+            Connection2 = epgsql_pool_utils:reconnect(Connection),
+            {noreply, State#state{connection = Connection2}}
+    end;
 
-    lager:debug("EXIT: Connection ~p, Reason: ~p", [Pid, Reason]),
-    UpdConnState = close_connection(State#state.connection),
-    folsom_metrics:notify({<<"db_connection_errors">>, 1}),
-    {noreply, State#state{connection = reconnect(UpdConnState)}};
+handle_info({'EXIT', Pid, Reason},
+            #state{connection = #epgsql_connection{connection_sock = Sock}} = State)
+  when Pid == Sock ->
+    error_logger:error_msg("DB Connection ~p EXIT with reason: ~p", [Pid, Reason]),
+    Connection = epgsql_pool_utils:close_connection(State#state.connection),
+    Connection2 = epgsql_pool_utils:reconnect(Connection),
+    {noreply, State#state{connection = Connection2}};
 
 handle_info(Message, State) ->
-    lager:debug("Info / Msg: ~p, State: ~p", [Message, State]),
+    error_logger:error_msg("unknown info ~p in ~p ~n", [Message, ?MODULE]),
     {noreply, State}.
 
-terminate(Reason, State) ->
-    lager:debug("Terminate / Reason: ~p, State: ~p", [Reason, State]),
+
+terminate(_Reason, _State) ->
     normal.
 
+
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
-
-%% -- internal functions --