Browse Source

add rebar.config, fix dialyzer errors

Yuriy Zhloba 9 years ago
parent
commit
41bba30b60
6 changed files with 72 additions and 64 deletions
  1. 0 14
      include/epgsql_pool.hrl
  2. 12 0
      rebar.config
  3. 19 17
      src/epgsql_pool.erl
  4. 6 3
      src/epgsql_pool_settings.erl
  5. 21 22
      src/epgsql_pool_utils.erl
  6. 14 8
      src/epgsql_pool_worker.erl

+ 0 - 14
include/epgsql_pool.hrl

@@ -1,17 +1,3 @@
--define(DB_QUERY_TIMEOUT, 10000).
--define(DB_POOLER_GET_WORKER_TIMEOUT, 1000).
--define(DB_MAX_RECONNECT_TIMEOUT, 3000).
--define(DB_MIN_RECONNECT_TIMEOUT, 100).
-
-
--type(pool_name() :: binary() | string() | atom()).
--type(db_query() :: binary() | string()). % TODO: did driver accepts string?
--type(db_reply() :: term()). % TODO: narrow type
-
-
--type(epgsql_pool_settings_key() ::
-        connection_timeout | query_timeout | pooler_get_worker_timeout | max_reconnect_timeout | min_reconnect_timeout).
-
 
 
 -record(epgsql_connection_params, {
 -record(epgsql_connection_params, {
     host :: string() | binary(),
     host :: string() | binary(),

+ 12 - 0
rebar.config

@@ -0,0 +1,12 @@
+%%-*- mode: erlang -*-
+{require_otp_vsn, "17"}.
+
+{erl_opts, [debug_info,
+            warn_missing_spec,
+            {i, "include"} %% need for emacs work properly with include_lib
+           ]}.
+
+{deps, [
+        {pooler, ".*", {git, "git://github.com/seth/pooler.git", "b6c522a67a1d067122705ef725535a8664dd8514"}},
+        {epgsql, ".*", {git, "git://github.com/epgsql/epgsql.git", {tag, "3.1.0"}}}
+       ]}.

+ 19 - 17
src/epgsql_pool.erl

@@ -1,21 +1,21 @@
 -module(epgsql_pool).
 -module(epgsql_pool).
 
 
--export([
-         start/3, stop/1,
+-export([start/3, stop/1,
          equery/2, equery/3,
          equery/2, equery/3,
          transaction/2
          transaction/2
         ]).
         ]).
 
 
 -include("epgsql_pool.hrl").
 -include("epgsql_pool.hrl").
 
 
--import(epgsql_pool_utils, [pool_name_to_atom/1]).
+-type(pool_name() :: binary() | string() | atom()).
+-export_type([pool_name/0]).
 
 
 
 
 %% Module API
 %% Module API
 
 
--spec start(pool_name(), integer(), integer()) -> ok. % TODO what returns from pooler:new_pool?
+-spec start(pool_name(), integer(), integer()) -> {ok, pid()} | {error, term()}.
 start(PoolName0, InitCount, MaxCount) ->
 start(PoolName0, InitCount, MaxCount) ->
-    PoolName = pool_name_to_atom(PoolName0),
+    PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
     PoolConfig = [
     PoolConfig = [
         {name, PoolName},
         {name, PoolName},
         {init_count, InitCount},
         {init_count, InitCount},
@@ -25,19 +25,19 @@ start(PoolName0, InitCount, MaxCount) ->
     pooler:new_pool(PoolConfig).
     pooler:new_pool(PoolConfig).
 
 
 
 
--spec stop(pool_name()) -> ok. % TODO what returns from pooler:stop?
+-spec stop(pool_name()) -> ok | {error, term()}.
 stop(PoolName) ->
 stop(PoolName) ->
-    pooler:rm_pool(pool_name_to_atom(PoolName)).
+    pooler:rm_pool(epgsql_pool_utils:pool_name_to_atom(PoolName)).
 
 
 
 
--spec equery(pool_name(), db_query()) -> db_reply().
+-spec equery(pool_name(), epgsql:sql_query()) -> epgsql:reply().
 equery(PoolName, Stmt) ->
 equery(PoolName, Stmt) ->
     equery(PoolName, Stmt, []).
     equery(PoolName, Stmt, []).
 
 
 
 
 %% Example
 %% Example
 %% epgsql_pool:equery("my_db_pool", "SELECT NOW() as now", []).
 %% epgsql_pool:equery("my_db_pool", "SELECT NOW() as now", []).
--spec equery(pool_name(), db_query(), list()) -> db_reply().
+-spec equery(pool_name(), epgsql:sql_query(), [epgsql:bind_param()]) -> epgsql:reply().
 equery(PoolName, Stmt, Params) ->
 equery(PoolName, Stmt, Params) ->
     transaction(PoolName,
     transaction(PoolName,
                 fun(Worker) ->
                 fun(Worker) ->
@@ -45,19 +45,20 @@ equery(PoolName, Stmt, Params) ->
                 end).
                 end).
 
 
 
 
--spec transaction(pool_name(), fun()) -> db_reply() | {error, term()}.
+-spec transaction(pool_name(), fun()) -> epgsql:reply() | {error, term()}.
 transaction(PoolName0, Fun) ->
 transaction(PoolName0, Fun) ->
-    PoolName = pool_name_to_atom(PoolName0),
-    case pooler:take_member(PoolName, ?DB_POOLER_GET_WORKER_TIMEOUT) of
+    PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
+    Timeout = epgsql_pool_settings:get(pooler_get_worker_timeout),
+    case pooler:take_member(PoolName, Timeout) of
         Worker when is_pid(Worker) ->
         Worker when is_pid(Worker) ->
             try
             try
-                equery(Worker, "BEGIN", []),
+                equery_with_worker(Worker, "BEGIN", []),
                 Result = Fun(Worker),
                 Result = Fun(Worker),
-                equery(Worker, "COMMIT", []),
+                equery_with_worker(Worker, "COMMIT", []),
                 Result
                 Result
             catch
             catch
                 Err:Reason ->
                 Err:Reason ->
-                    equery(Worker, "ROLLBACK", []),
+                    equery_with_worker(Worker, "ROLLBACK", []),
                     erlang:raise(Err, Reason, erlang:get_stacktrace())
                     erlang:raise(Err, Reason, erlang:get_stacktrace())
             after
             after
                 pooler:return_member(PoolName, Worker, ok)
                 pooler:return_member(PoolName, Worker, ok)
@@ -71,6 +72,7 @@ transaction(PoolName0, Fun) ->
 
 
 %% Inner functions
 %% Inner functions
 
 
--spec equery_with_worker(pid(), db_query(), list()) -> db_reply().
+-spec equery_with_worker(pid(), epgsql:sql_query(), [epgsql:bind_param()]) -> epgsql:reply().
 equery_with_worker(Worker, Stmt, Params) ->
 equery_with_worker(Worker, Stmt, Params) ->
-    gen_server:call(Worker, {equery, Stmt, Params}, ?DB_QUERY_TIMEOUT).
+    Timeout = epgsql_pool_settings:get(query_timeout),
+    gen_server:call(Worker, {equery, Stmt, Params}, Timeout). % TODO need other way to implement it

+ 6 - 3
src/epgsql_pool_settings.erl

@@ -7,6 +7,9 @@
 -include("epgsql_pool.hrl").
 -include("epgsql_pool.hrl").
 -include("otp_types.hrl").
 -include("otp_types.hrl").
 
 
+-type(epgsql_pool_settings_key() ::
+        connection_timeout | query_timeout | pooler_get_worker_timeout | max_reconnect_timeout | min_reconnect_timeout).
+
 
 
 %%% module API
 %%% module API
 
 
@@ -15,7 +18,7 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
 
 
--spec get_connection_params(pool_name()) -> #epgsql_connection_params{}.
+-spec get_connection_params(epgsql_pool:pool_name()) -> #epgsql_connection_params{}.
 get_connection_params(PoolName) ->
 get_connection_params(PoolName) ->
     Key = {connection, epgsql_pool_utils:pool_name_to_atom(PoolName)},
     Key = {connection, epgsql_pool_utils:pool_name_to_atom(PoolName)},
     case ets:lookup(?MODULE, Key) of
     case ets:lookup(?MODULE, Key) of
@@ -24,7 +27,7 @@ get_connection_params(PoolName) ->
     end.
     end.
 
 
 
 
--spec set_connection_params(pool_name(), #epgsql_connection_params{}) -> ok.
+-spec set_connection_params(egpsql_pool:pool_name(), #epgsql_connection_params{}) -> ok.
 set_connection_params(PoolName, Params) ->
 set_connection_params(PoolName, Params) ->
     Key = {connection, epgsql_pool_utils:pool_name_to_atom(PoolName)},
     Key = {connection, epgsql_pool_utils:pool_name_to_atom(PoolName)},
     gen_server:call(?MODULE, {save, Key, Params}).
     gen_server:call(?MODULE, {save, Key, Params}).
@@ -34,7 +37,7 @@ set_connection_params(PoolName, Params) ->
 get(Key) ->
 get(Key) ->
     case ets:lookup(?MODULE, {settings, Key}) of
     case ets:lookup(?MODULE, {settings, Key}) of
         [] -> throw({settings_not_found, Key});
         [] -> throw({settings_not_found, Key});
-        [Value] -> Value
+        [{_, Value}] -> Value
     end.
     end.
 
 
 
 

+ 21 - 22
src/epgsql_pool_utils.erl

@@ -6,16 +6,15 @@
          pool_name_to_atom/1
          pool_name_to_atom/1
         ]).
         ]).
 
 
--include_lib("epgsql/include/epgsql.hrl").
 -include("epgsql_pool.hrl").
 -include("epgsql_pool.hrl").
 
 
 
 
--spec open_connection(#epgsql_connection_params{}) -> #epgsql_connection{}.
-open_connection(#epgsql_params{host = Host, port = Port,
-                               username = Username,
-                               password = Password,
-                               database = Database} = ConnectionParams) ->
-    Connection0 = #epgsql_connection{params = ConnectionParams, reconnect_attempt=0},
+-spec open_connection(#epgsql_connection_params{}) -> {ok, #epgsql_connection{}} | {error, term(), #epgsql_connection{}}.
+open_connection(#epgsql_connection_params{host = Host, port = Port,
+                                          username = Username,
+                                          password = Password,
+                                          database = Database} = ConnectionParams) ->
+    Connection0 = #epgsql_connection{params = ConnectionParams, reconnect_attempt = 0},
     ConnectionTimeout = epgsql_pool_settings:get(connection_timeout),
     ConnectionTimeout = epgsql_pool_settings:get(connection_timeout),
     Res = epgsql:connect(Host, Username, Password,
     Res = epgsql:connect(Host, Username, Password,
                          [{port, Port},
                          [{port, Port},
@@ -30,26 +29,26 @@ open_connection(#epgsql_params{host = Host, port = Port,
 -spec close_connection(#epgsql_connection{}) -> #epgsql_connection{}.
 -spec close_connection(#epgsql_connection{}) -> #epgsql_connection{}.
 close_connection(#epgsql_connection{connection_sock = Sock} = Connection) ->
 close_connection(#epgsql_connection{connection_sock = Sock} = Connection) ->
     epgsql:close(Sock),
     epgsql:close(Sock),
-    Connection#epgsql_connection{connection_pid = undefined}.
+    Connection#epgsql_connection{connection_sock = undefined}.
 
 
 
 
 -spec reconnect(#epgsql_connection{}) -> #epgsql_connection{}.
 -spec reconnect(#epgsql_connection{}) -> #epgsql_connection{}.
-reconnect(#epgsql_connection{reconnect_attempt = R,
-                             reconnect_timeout = T} = Connection) ->
-    case T > ?DB_MAX_RECONNECT_TIMEOUT of
-        true ->
-            reconnect_after(?DB_MIN_RECONNECT_TIMEOUT, T),
-            Connection#epgsql_connection{reconnect_attempt = R + 1};
-        _ ->
-            T2 = exponential_backoff(R, ?DB_MIN_RECONNECT_TIMEOUT),
-            reconnect_after(?DB_MIN_RECONNECT_TIMEOUT, T2),
-            Connection#epgsql_connection{reconnect_attempt = R + 1, reconnect_timeout = T2}
-    end.
+reconnect(#epgsql_connection{reconnect_attempt = Attempt,
+                             reconnect_timeout = Timeout0} = Connection) ->
+    MaxReconnectTimeout = epgsql_pool_settings:get(max_reconnect_timeout),
+    MinReconnectTimeout = epgsql_pool_settings:get(min_reconnect_timeout),
+    Timeout = if
+                  Timeout0 > MaxReconnectTimeout -> Timeout0;
+                  true -> exponential_backoff(Attempt, MinReconnectTimeout)
+              end,
+    reconnect_after(Attempt, MinReconnectTimeout, Timeout),
+    Connection#epgsql_connection{reconnect_attempt = Attempt + 1, reconnect_timeout = Timeout}.
 
 
 
 
--spec reconnect_after(integer(), integer()) -> ok.
-reconnect_after(TMin, TMax) ->
+-spec reconnect_after(integer(), integer(), integer()) -> ok.
+reconnect_after(Attempt, TMin, TMax) ->
     Delay = max(random:uniform(TMax), TMin),
     Delay = max(random:uniform(TMax), TMin),
+    error_logger:warning_msg("epgsql_pool reconnect after ~p attempt ~p", [Delay, Attempt]),
     erlang:send_after(Delay, self(), open_connection),
     erlang:send_after(Delay, self(), open_connection),
     ok.
     ok.
 
 
@@ -59,7 +58,7 @@ exponential_backoff(N, T) ->
     erlang:round(math:pow(2, N)) * T.
     erlang:round(math:pow(2, N)) * T.
 
 
 
 
--spec pool_name_to_atom(pool_name()) -> atom().
+-spec pool_name_to_atom(epgsql_pool:pool_name()) -> atom().
 pool_name_to_atom(PoolName) when is_binary(PoolName) ->
 pool_name_to_atom(PoolName) when is_binary(PoolName) ->
     pool_name_to_atom(erlang:binary_to_atom(PoolName, utf8));
     pool_name_to_atom(erlang:binary_to_atom(PoolName, utf8));
 pool_name_to_atom(PoolName) when is_list(PoolName) ->
 pool_name_to_atom(PoolName) when is_list(PoolName) ->

+ 14 - 8
src/epgsql_pool_worker.erl

@@ -6,7 +6,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
 
 -include("epgsql_pool.hrl").
 -include("epgsql_pool.hrl").
--include_lib("epgsql/include/epgsql.hrl").
+-include("otp_types.hrl").
 
 
 -record(state, {pool_name :: atom(),
 -record(state, {pool_name :: atom(),
                 connection :: #epgsql_connection{}
                 connection :: #epgsql_connection{}
@@ -14,6 +14,7 @@
 
 
 %% Module API
 %% Module API
 
 
+-spec start_link(epgsql_pool:pool_name()) -> gs_start_link_reply().
 start_link(PoolName0) ->
 start_link(PoolName0) ->
     PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
     PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
     gen_server:start_link(?MODULE, PoolName, []).
     gen_server:start_link(?MODULE, PoolName, []).
@@ -21,22 +22,23 @@ start_link(PoolName0) ->
 
 
 %%% gen_server API
 %%% gen_server API
 
 
+-spec init(gs_args()) -> gs_init_reply().
 init(PoolName) ->
 init(PoolName) ->
-    error_logger:info_message("Init epgsql pool worker: ~p", [PoolName]),
+    error_logger:info_msg("Init epgsql pool worker: ~p", [PoolName]),
     process_flag(trap_exit, true),
     process_flag(trap_exit, true),
     self() ! open_connection,
     self() ! open_connection,
     {ok, #state{pool_name = PoolName}}.
     {ok, #state{pool_name = PoolName}}.
 
 
 
 
+-spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
 handle_call({equery, _, _}, _From, #state{connection = undefined} = State) ->
 handle_call({equery, _, _}, _From, #state{connection = undefined} = State) ->
     {reply, {error, no_connection}, State};
     {reply, {error, no_connection}, State};
 
 
-handle_call({equery, Stmt, Params}, _From, State) ->
-    ConnState = State#state.connection,
-    Conn = ConnState#epgsql_connection.connection,
+handle_call({equery, Stmt, Params}, _From, #state{connection = Connection} = State) ->
     %% TStart = os:timestamp(),
     %% TStart = os:timestamp(),
     %% TODO: query_timeout
     %% TODO: query_timeout
-    Result = epgsql:equery(Conn, Stmt, Params),
+    Sock = Connection#epgsql_connection.connection_sock,
+    Result = epgsql:equery(Sock, Stmt, Params),
     %% Time = timer:now_diff(os:timestamp(), TStart),
     %% Time = timer:now_diff(os:timestamp(), TStart),
     {reply, Result, State};
     {reply, Result, State};
 
 
@@ -45,14 +47,16 @@ handle_call(Message, _From, State) ->
     {reply, ok, State}.
     {reply, ok, State}.
 
 
 
 
+-spec handle_cast(gs_request(), gs_state()) -> gs_cast_reply().
 handle_cast(Message, State) ->
 handle_cast(Message, State) ->
     error_logger:error_msg("unknown cast ~p in ~p ~n", [Message, ?MODULE]),
     error_logger:error_msg("unknown cast ~p in ~p ~n", [Message, ?MODULE]),
     {noreply, State}.
     {noreply, State}.
 
 
 
 
+-spec handle_info(gs_request(), gs_state()) -> gs_info_reply().
 handle_info(open_connection, #state{pool_name = PoolName} = State) ->
 handle_info(open_connection, #state{pool_name = PoolName} = State) ->
     ConnectionParams = epgsql_pool_settings:get_connection_params(PoolName),
     ConnectionParams = epgsql_pool_settings:get_connection_params(PoolName),
-    case open_connection(ConnectionParams) of
+    case epgsql_pool_utils:open_connection(ConnectionParams) of
         {ok, Connection} ->
         {ok, Connection} ->
             {noreply, State#state{connection = Connection}};
             {noreply, State#state{connection = Connection}};
         {error, Reason, Connection} ->
         {error, Reason, Connection} ->
@@ -74,9 +78,11 @@ handle_info(Message, State) ->
     {noreply, State}.
     {noreply, State}.
 
 
 
 
+-spec terminate(terminate_reason(), gs_state()) -> ok.
 terminate(_Reason, _State) ->
 terminate(_Reason, _State) ->
-    normal.
+    ok.
 
 
 
 
+-spec code_change(term(), term(), term()) -> gs_code_change_reply().
 code_change(_OldVsn, State, _Extra) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.