epgsql_pool_utils.erl 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. -module(epgsql_pool_utils).
  2. -export([
  3. get_host_params/1,
  4. new_connection/1,
  5. open_connection/1,
  6. close_connection/1,
  7. reconnect/1,
  8. pool_name_to_atom/1
  9. ]).
  10. -include_lib("epgsql/include/epgsql.hrl").
  11. -include("epgsql_pool.hrl").
  12. get_host_params(HostSection) ->
  13. #epgsql_params{
  14. host = wgconfig:get_string(HostSection, "host"),
  15. port = wgconfig:get_int(HostSection, "port"),
  16. username = wgconfig:get_string(HostSection, "username"),
  17. password = wgconfig:get_string(HostSection, "password"),
  18. database = wgconfig:get_string(HostSection, "database")
  19. }.
  20. new_connection(SectionName) ->
  21. HostSection = wgconfig:get_string(SectionName, "db_host"),
  22. % Connection parameters
  23. Params = get_host_params(HostSection),
  24. #epgsql_connection{
  25. connection_timeout = wgconfig:get_int(SectionName, "connection_timeout"),
  26. query_timeout = wgconfig:get_int(SectionName, "query_timeout"),
  27. params=Params
  28. }.
  29. open_connection(State) ->
  30. Params = State#epgsql_connection.params,
  31. lager:info("Connect ~p", [Params]),
  32. #epgsql_params{
  33. host = Host,
  34. port = Port,
  35. username = Username,
  36. password = Password,
  37. database = Database
  38. } = Params,
  39. ConnectionTimeout = State#epgsql_connection.connection_timeout,
  40. Res = epgsql:connect(Host, Username, Password, [
  41. {port, Port},
  42. {database, Database},
  43. {timeout, ConnectionTimeout}
  44. ]),
  45. case Res of
  46. {ok, Sock} ->
  47. {ok, State#epgsql_connection{
  48. connection=Sock,
  49. reconnect_attempt=0}};
  50. {error, Reason} ->
  51. lager:error("Connect fail: ~p", [Reason]),
  52. {error, State}
  53. end.
  54. close_connection(State) ->
  55. Connection = State#epgsql_connection.connection,
  56. epgsql:close(Connection),
  57. #epgsql_connection{connection = undefined}.
  58. reconnect(#epgsql_connection{
  59. reconnect_attempt = R,
  60. reconnect_timeout = T} = State) ->
  61. case T > ?DB_MAX_RECONNECT_TIMEOUT of
  62. true ->
  63. reconnect_after(R, ?DB_MIN_RECONNECT_TIMEOUT, T),
  64. State#epgsql_connection{reconnect_attempt = R + 1};
  65. _ ->
  66. T2 = exponential_backoff(R, ?DB_MIN_RECONNECT_TIMEOUT),
  67. reconnect_after(R, ?DB_MIN_RECONNECT_TIMEOUT, T2),
  68. State#epgsql_connection{reconnect_attempt=R + 1, reconnect_timeout=T2}
  69. end.
  70. reconnect_after(R, Tmin, Tmax) ->
  71. Delay = rand_range(Tmin, Tmax),
  72. lager:info("Reconnect after ~w ms (attempt ~w)", [Delay, R]),
  73. erlang:send_after(Delay, self(), open_connection).
  74. rand_range(Min, Max) ->
  75. max(random:uniform(Max), Min).
  76. exponential_backoff(N, T) ->
  77. erlang:round(math:pow(2, N)) * T.
  78. -spec pool_name_to_atom(pool_name()) -> atom().
  79. pool_name_to_atom(PoolName) when is_binary(PoolName) ->
  80. pool_name_to_atom(erlang:binary_to_atom(PoolName, utf8));
  81. pool_name_to_atom(PoolName) when is_list(PoolName) ->
  82. pool_name_to_atom(list_to_atom(PoolName));
  83. pool_name_to_atom(PoolName) -> PoolName.