Browse Source

refactor, add epgsql_pool_settings

Yuriy Zhloba 9 years ago
parent
commit
bcba1b6ba2

+ 1 - 0
.gitignore

@@ -2,3 +2,4 @@
 *.beam
 *.beam
 ebin
 ebin
 deps/
 deps/
+.rebar

+ 18 - 10
include/epgsql_pool.hrl

@@ -1,19 +1,27 @@
--define(MAX_RECONNECT_TIMEOUT, 1000*30).
--define(MIN_RECONNECT_TIMEOUT, 100).
+-define(DB_QUERY_TIMEOUT, 10000).
+-define(DB_POOLER_GET_WORKER_TIMEOUT, 1000).
+-define(DB_MAX_RECONNECT_TIMEOUT, 3000).
+-define(DB_MIN_RECONNECT_TIMEOUT, 100).
 
 
--record(epgsql_params, {
-    host                :: string(),
+
+-type(pool_name() :: binary() | string() | atom()).
+-type(db_query() :: binary() | string()). % TODO: did driver accepts string?
+-type(db_reply() :: term()). % TODO: narrow type
+
+
+-record(epgsql_connection_params, {
+    host                :: string() | binary(),
     port                :: non_neg_integer(),
     port                :: non_neg_integer(),
-    username            :: string(),
-    password            :: string(),
-    database            :: string()
+    username            :: string() | binary(),
+    password            :: string() | binary(),
+    database            :: string() | binary()
 }).
 }).
 
 
 -record(epgsql_connection, {
 -record(epgsql_connection, {
     connection            :: pid(),
     connection            :: pid(),
-    params                :: #epgsql_params{},
-    connection_timeout  :: non_neg_integer(),
-    query_timeout       :: non_neg_integer(),
+    params                :: #epgsql_connection_params{},
+    connection_timeout    :: non_neg_integer(),
+    query_timeout         :: non_neg_integer(),
     reconnect_attempt = 0 :: non_neg_integer(),
     reconnect_attempt = 0 :: non_neg_integer(),
     reconnect_timeout = 0 :: non_neg_integer()
     reconnect_timeout = 0 :: non_neg_integer()
 }).
 }).

+ 44 - 0
include/otp_types.hrl

@@ -0,0 +1,44 @@
+%% shorter definitions for gen_server and supervisor callback types
+
+-type(gs_args() :: term()).
+-type(gs_state() :: term()).
+-type(gs_reason() :: term()).
+
+-type(gs_start_link_reply() :: {ok, pid()} | ignore | {error, term()}).
+
+-type(gs_init_reply() ::
+    {ok, gs_state()} | {ok, gs_state(), timeout() | hibernate} |
+    {stop, gs_reason()} | ignore).
+
+-type(gs_request() :: term()).
+-type(gs_from() :: {pid(), term()}).
+-type(gs_reply() :: term()).
+
+-type(gs_call_reply() ::
+    {reply, gs_reply(), gs_state()} |
+    {reply, gs_reply(), gs_state(), timeout() | hibernate} |
+    {noreply, gs_state()} |
+    {noreply, gs_state(), timeout() | hibernate} |
+    {stop, gs_reason(), gs_reply(), gs_state()} |
+    {stop, gs_reason(), gs_state()}).
+
+-type(gs_cast_reply() ::
+    {noreply, gs_state()} |
+    {noreply, gs_state(), timeout() | hibernate} |
+    {stop, gs_reason(), gs_state()}).
+
+-type(gs_info_reply() ::
+    {noreply, gs_state()} |
+    {noreply, gs_state(), timeout() | hibernate} |
+    {stop, gs_reason(), gs_state()}).
+
+-type(terminate_reason() :: normal | shutdown | {shutdown, term()} | term()).
+
+-type(gs_code_change_reply() ::
+    {ok, gs_state()} | {error, gs_reason()}).
+
+
+-type(sup_init_reply() ::
+        {ok, {{supervisor:strategy(), non_neg_integer(), non_neg_integer()},
+              [supervisor:child_spec()]}}
+      | ignore).

+ 2 - 6
src/epgsql_pool.app.src

@@ -1,14 +1,10 @@
 {application, epgsql_pool, [
 {application, epgsql_pool, [
     {description, "Connection pool for PostgreSQL"},
     {description, "Connection pool for PostgreSQL"},
-    {vsn, "1"},
+    {vsn, "1.0.0"},
     {registered, []},
     {registered, []},
     {applications, [
     {applications, [
         epgsql,
         epgsql,
-        lager,
-        pooler,
-        folsom,
-        wgconfig
+        pooler
     ]},
     ]},
-    {mod, { epgsql_pool_app, []}},
     {env, []}
     {env, []}
 ]}.
 ]}.

+ 56 - 27
src/epgsql_pool.erl

@@ -1,33 +1,55 @@
 -module(epgsql_pool).
 -module(epgsql_pool).
 
 
 -export([
 -export([
-    equery/3,
-    transaction/2
-]).
+         start/3, stop/1,
+         equery/2, equery/3,
+         transaction/2
+        ]).
 
 
 -include("epgsql_pool.hrl").
 -include("epgsql_pool.hrl").
 
 
-% TODO: move parameter into config
--define(TIMEOUT, 1000).
+-import(epgsql_pool_utils, [pool_name_to_atom/1]).
 
 
-equery({worker, _PoolName, Worker}, Stmt, Params) ->
-    % TODO: infinity - looks as dangerous
-    gen_server:call(Worker, {equery, Stmt, Params}, infinity);
+
+%% Module API
+
+-spec start(pool_name(), integer(), integer()) -> ok. % TODO what returns from pooler:new_pool?
+start(PoolName0, InitCount, MaxCount) ->
+    PoolName = pool_name_to_atom(PoolName0),
+    PoolConfig = [
+        {name, PoolName},
+        {init_count, InitCount},
+        {max_count, MaxCount},
+        {start_mfa, {epgsql_pool_worker, start_link, [PoolName]}}
+    ],
+    pooler:new_pool(PoolConfig).
+
+
+-spec stop(pool_name()) -> ok. % TODO what returns from pooler:stop?
+stop(PoolName) ->
+    pooler:rm_pool(pool_name_to_atom(PoolName)).
+
+
+-spec equery(pool_name(), db_query()) -> db_reply().
+equery(PoolName, Stmt) ->
+    equery(PoolName, Stmt, []).
+
+
+%% Example
+%% epgsql_pool:equery("my_db_pool", "SELECT NOW() as now", []).
+-spec equery(pool_name(), db_query(), list()) -> db_reply().
 equery(PoolName, Stmt, Params) ->
 equery(PoolName, Stmt, Params) ->
-    % Example
-    % epgsql_pool:equery(<<"default">>, "SELECT NOW() as now", []).
-    transaction(
-        PoolName,
-        fun(Worker) ->
-            equery(Worker, Stmt, Params)
-        end).
-
-transaction(PoolName, Fun) ->
-    FullPoolName = list_to_atom("epgsql_pool." ++ binary_to_list(PoolName)),
-    % TODO: logging with time of execution
-    case pooler:take_member(FullPoolName, ?TIMEOUT) of
-        Pid when is_pid(Pid) ->
-            Worker = {worker, FullPoolName, Pid},
+    transaction(PoolName,
+                fun(Worker) ->
+                        equery_with_worker(Worker, Stmt, Params)
+                end).
+
+
+-spec transaction(pool_name(), fun()) -> db_reply() | {error, term()}.
+transaction(PoolName0, Fun) ->
+    PoolName = pool_name_to_atom(PoolName0),
+    case pooler:take_member(PoolName, ?DB_POOLER_GET_WORKER_TIMEOUT) of
+        Worker when is_pid(Worker) ->
             try
             try
                 equery(Worker, "BEGIN", []),
                 equery(Worker, "BEGIN", []),
                 Result = Fun(Worker),
                 Result = Fun(Worker),
@@ -38,10 +60,17 @@ transaction(PoolName, Fun) ->
                     equery(Worker, "ROLLBACK", []),
                     equery(Worker, "ROLLBACK", []),
                     erlang:raise(Err, Reason, erlang:get_stacktrace())
                     erlang:raise(Err, Reason, erlang:get_stacktrace())
             after
             after
-                pooler:return_member(FullPoolName, Pid, ok)
+                pooler:return_member(PoolName, Worker, ok)
             end;
             end;
         error_no_members ->
         error_no_members ->
-            PoolStats = pooler:pool_stats(FullPoolName),
-            lager:warning("Pool ~p overload: ~p", [FullPoolName, PoolStats]),
-            {error, no_members}
-    end.
+            PoolStats = pooler:pool_stats(PoolName),
+            error_logger:error_msg("Pool ~p overload: ~p", [PoolName, PoolStats]),
+            {error, pool_overload}
+    end.
+
+
+%% Inner functions
+
+-spec equery_with_worker(pid(), db_query(), list()) -> db_reply().
+equery_with_worker(Worker, Stmt, Params) ->
+    gen_server:call(Worker, {equery, Stmt, Params}, ?DB_QUERY_TIMEOUT).

+ 1 - 31
src/epgsql_pool_app.erl

@@ -2,42 +2,12 @@
 
 
 -behaviour(application).
 -behaviour(application).
 
 
-%% Application callbacks
 -export([start/2, stop/1]).
 -export([start/2, stop/1]).
 
 
-%% ===================================================================
-%% Application callbacks
-%% ===================================================================
 
 
 start(_StartType, _StartArgs) ->
 start(_StartType, _StartArgs) ->
-    register(?MODULE, self()),
-    lists:foreach(fun start_pool/1, wgconfig:list_sections("epgsql_pool")),
+    epgsql_pool_sup:start_link().
 
 
-    folsom_metrics:new_spiral(<<"db_connection_errors">>),
-    folsom_metrics:new_spiral(<<"db_common_errors">>),
-    folsom_metrics:new_spiral(<<"db_request_timeouts">>),
-    {ok, self()}.
 
 
 stop(_State) ->
 stop(_State) ->
-    lists:foreach(fun stop_pool/1, wgconfig:list_sections("epgsql_pool")),
     ok.
     ok.
-
-%% Internal functions
-
--include("epgsql_pool.hrl").
-
-start_pool(PoolName) ->
-    InitCount = wgconfig:get_int(PoolName, init_count),
-    MaxCount = wgconfig:get_int(PoolName, max_count),
-
-    PoolConfig = [
-        {name, erlang:binary_to_atom(PoolName, utf8)},
-        {init_count, InitCount},
-        {max_count, MaxCount},
-        {start_mfa, {epgsql_pool_worker, start_link, [PoolName]}}
-    ],
-    pooler:new_pool(PoolConfig).
-
-stop_pool(PoolName) ->
-    APoolName = erlang:binary_to_atom(PoolName, utf8),
-    pooler:rm_pool(APoolName).

+ 112 - 0
src/epgsql_pool_settings.erl

@@ -0,0 +1,112 @@
+-module(epgsql_pool_settings).
+-behavior(gen_server).
+
+-export([start_link/0, get_connection_params/1, set_connection_params/2, get/1, set/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-include("epgsql_pool.hrl").
+-include("otp_types.hrl").
+
+-import(epgsql_pool_utils, [pool_name_to_atom/1]).
+
+-record(state, {
+          connection_params :: orddict:orddictr(),
+          settings :: orddict:orddict()
+         }).
+
+
+%%% module API
+
+-spec start_link() -> gs_start_link_reply().
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+-spec get_connection_params(pool_name()) -> #epgsql_connection_params{}. % or throw({connection_params_not_found, PoolName})
+get_connection_params(PoolName) ->
+    gen_server:call(?MODULE, {get_connection_params, pool_name_to_atom(PoolName)}).
+
+
+-spec set_connection_params(pool_name(), #epgsql_connection_params{}) -> ok.
+set_connection_params(PoolName, Params) ->
+    gen_server:call(?MODULE, {set_connection_params, pool_name_to_atom(PoolName), Params}).
+
+
+-spec get(atom()) -> integer(). % or throw({settings_not_found, Key})
+get(Key) ->
+    gen_server:call(?MODULE, {get, Key}).
+
+
+-spec set(atom(), integer()) -> ok. % or throw({settings_not_found, Key})
+set(Key, Value) ->
+    gen_server:call(?MODULE, {set, Key, Value}).
+
+
+%%% gen_server API
+
+-spec init(gs_args()) -> gs_init_reply().
+init([]) ->
+    {ok, #state{connection_params = orddict:new(),
+                settings = orddict:from_list(
+                             [{query_timeout, 10000},
+                              {pooler_get_worker_timeout, 1000},
+                              {max_reconnect_timeout, 3000},
+                              {min_reconnect_timeout, 100}
+                             ])}}.
+
+
+-spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
+handle_call({get_connection_params, PoolName}, _From,
+            #state{connection_params = ConnectionParams} = State) ->
+    Reply = case orddict:find(PoolName, ConnectionParams) of
+                {ok, Params} -> Params;
+                error -> throw({connection_params_not_found, PoolName})
+            end,
+    {reply, Reply, State};
+
+handle_call({set_connection_params, PoolName, Params}, _From,
+            #state{connection_params = ConnectionParams} = State) ->
+    ConnectionParams2 = orddict:store(PoolName, Params, ConnectionParams),
+    {reply, ok, State#state{connection_params = ConnectionParams2}};
+
+handle_call({get, Key}, _From,
+            #state{settings = Settings} = State) ->
+    Reply = case orddict:find(Key, Settings) of
+                {ok, Value} -> Value;
+                error -> throw({settings_not_found, Key})
+            end,
+    {reply, Reply, State};
+
+handle_call({set, Key, Value}, _From,
+            #state{settings = Settings} = State) ->
+    Settings2 = case orddict:find(Key, Settings) of
+                    {ok, _} -> orddict:store(Key, Value, Settings);
+                    error -> throw({settings_not_found, Key})
+                end,
+    {reply, ok, State#state{settings = Settings2}};
+
+handle_call(Any, _From, State) ->
+    error_logger:error_msg("unknown call ~p in ~p ~n", [Any, ?MODULE]),
+    {noreply, State}.
+
+
+-spec handle_cast(gs_request(), gs_state()) -> gs_cast_reply().
+handle_cast(Any, State) ->
+    error_logger:error_msg("unknown cast ~p in ~p ~n", [Any, ?MODULE]),
+    {noreply, State}.
+
+
+-spec handle_info(gs_request(), gs_state()) -> gs_info_reply().
+handle_info(Request, State) ->
+    error_logger:error_msg("unknown info ~p in ~p ~n", [Request, ?MODULE]),
+    {noreply, State}.
+
+
+-spec terminate(terminate_reason(), gs_state()) -> ok.
+terminate(_Reason, _State) ->
+    ok.
+
+
+-spec code_change(term(), term(), term()) -> gs_code_change_reply().
+code_change(_OldVersion, State, _Extra) ->
+    {ok, State}.

+ 33 - 0
src/epgsql_pool_sup.erl

@@ -0,0 +1,33 @@
+-module(epgsql_pool_sup).
+-behaviour(supervisor).
+
+-export([start_link/0, init/1]).
+
+-include("otp_types.hrl").
+
+
+-spec(start_link() -> {ok, pid()}).
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+-spec(init(gs_args()) -> sup_init_reply()).
+init(_Args) ->
+    RestartStrategy = one_for_one, % one_for_one | one_for_all | rest_for_one
+    Intensity = 10, %% max restarts
+    Period = 60, %% in period of time
+    SupervisorSpecification = {RestartStrategy, Intensity, Period},
+
+    Restart = permanent, % permanent | transient | temporary
+    Shutdown = 2000, % milliseconds | brutal_kill | infinity
+
+    ChildSpecifications =
+        [
+         {epgsql_pool_settings,
+          {epgsql_pool_settings, start_link, []},
+          Restart,
+          Shutdown,
+          worker,
+          [epgsql_pool_settings]}
+        ],
+    {ok, {SupervisorSpecification, ChildSpecifications}}.

+ 20 - 11
src/epgsql_pool_utils.erl

@@ -1,12 +1,13 @@
 -module(epgsql_pool_utils).
 -module(epgsql_pool_utils).
 
 
 -export([
 -export([
-    get_host_params/1,
-    new_connection/1,
-    open_connection/1,
-    close_connection/1,
-    reconnect/1
-]).
+         get_host_params/1,
+         new_connection/1,
+         open_connection/1,
+         close_connection/1,
+         reconnect/1,
+         pool_name_to_atom/1
+        ]).
 
 
 -include_lib("epgsql/include/epgsql.hrl").
 -include_lib("epgsql/include/epgsql.hrl").
 -include("epgsql_pool.hrl").
 -include("epgsql_pool.hrl").
@@ -42,7 +43,7 @@ open_connection(State) ->
     } = Params,
     } = Params,
     ConnectionTimeout = State#epgsql_connection.connection_timeout,
     ConnectionTimeout = State#epgsql_connection.connection_timeout,
 
 
-    Res = epgsql:connect(Host, Username, Password, [        
+    Res = epgsql:connect(Host, Username, Password, [
         {port, Port},
         {port, Port},
         {database, Database},
         {database, Database},
         {timeout, ConnectionTimeout}
         {timeout, ConnectionTimeout}
@@ -65,13 +66,13 @@ close_connection(State) ->
 reconnect(#epgsql_connection{
 reconnect(#epgsql_connection{
         reconnect_attempt = R,
         reconnect_attempt = R,
         reconnect_timeout = T} = State) ->
         reconnect_timeout = T} = State) ->
-    case T > ?MAX_RECONNECT_TIMEOUT of
+    case T > ?DB_MAX_RECONNECT_TIMEOUT of
         true ->
         true ->
-            reconnect_after(R, ?MIN_RECONNECT_TIMEOUT, T),
+            reconnect_after(R, ?DB_MIN_RECONNECT_TIMEOUT, T),
             State#epgsql_connection{reconnect_attempt = R + 1};
             State#epgsql_connection{reconnect_attempt = R + 1};
         _ ->
         _ ->
-            T2 = exponential_backoff(R, ?MIN_RECONNECT_TIMEOUT),
-            reconnect_after(R, ?MIN_RECONNECT_TIMEOUT, T2),
+            T2 = exponential_backoff(R, ?DB_MIN_RECONNECT_TIMEOUT),
+            reconnect_after(R, ?DB_MIN_RECONNECT_TIMEOUT, T2),
             State#epgsql_connection{reconnect_attempt=R + 1, reconnect_timeout=T2}
             State#epgsql_connection{reconnect_attempt=R + 1, reconnect_timeout=T2}
     end.
     end.
 
 
@@ -85,3 +86,11 @@ rand_range(Min, Max) ->
 
 
 exponential_backoff(N, T) ->
 exponential_backoff(N, T) ->
     erlang:round(math:pow(2, N)) * T.
     erlang:round(math:pow(2, N)) * T.
+
+
+-spec pool_name_to_atom(pool_name()) -> atom().
+pool_name_to_atom(PoolName) when is_binary(PoolName) ->
+    pool_name_to_atom(erlang:binary_to_atom(PoolName, utf8));
+pool_name_to_atom(PoolName) when is_list(PoolName) ->
+    pool_name_to_atom(list_to_atom(PoolName));
+pool_name_to_atom(PoolName) -> PoolName.