epgsql_pool_utils.erl 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. -module(epgsql_pool_utils).
  2. -export([open_connection/1,
  3. close_connection/1,
  4. reconnect/1,
  5. pool_name_to_atom/1
  6. ]).
  7. -include("epgsql_pool.hrl").
  8. -spec open_connection(#epgsql_connection_params{}) -> {ok, #epgsql_connection{}} | {error, term(), #epgsql_connection{}}.
  9. open_connection(#epgsql_connection_params{host = Host, port = Port,
  10. username = Username,
  11. password = Password,
  12. database = Database} = ConnectionParams) ->
  13. Connection0 = #epgsql_connection{params = ConnectionParams, reconnect_attempt = 0},
  14. ConnectionTimeout = epgsql_pool_settings:get(connection_timeout),
  15. Res = epgsql:connect(Host, Username, Password,
  16. [{port, Port},
  17. {database, Database},
  18. {timeout, ConnectionTimeout}]),
  19. case Res of
  20. {ok, Sock} -> {ok, Connection0#epgsql_connection{sock = Sock}};
  21. {error, Reason} -> {error, Reason, Connection0}
  22. end.
  23. -spec close_connection(#epgsql_connection{}) -> #epgsql_connection{}.
  24. close_connection(#epgsql_connection{sock = Sock} = Connection) ->
  25. epgsql:close(Sock),
  26. Connection#epgsql_connection{sock = undefined}.
  27. -spec reconnect(#epgsql_connection{}) -> #epgsql_connection{}.
  28. reconnect(#epgsql_connection{reconnect_attempt = Attempt,
  29. reconnect_timeout = Timeout0} = Connection) ->
  30. MaxReconnectTimeout = epgsql_pool_settings:get(max_reconnect_timeout),
  31. MinReconnectTimeout = epgsql_pool_settings:get(min_reconnect_timeout),
  32. Timeout = if
  33. Timeout0 > MaxReconnectTimeout -> Timeout0;
  34. true -> exponential_backoff(Attempt, MinReconnectTimeout)
  35. end,
  36. reconnect_after(Attempt, MinReconnectTimeout, Timeout),
  37. Connection#epgsql_connection{reconnect_attempt = Attempt + 1, reconnect_timeout = Timeout}.
  38. -spec reconnect_after(integer(), integer(), integer()) -> ok.
  39. reconnect_after(Attempt, TMin, TMax) ->
  40. Delay = max(random:uniform(TMax), TMin),
  41. error_logger:warning_msg("epgsql_pool reconnect after ~p attempt ~p", [Delay, Attempt]),
  42. erlang:send_after(Delay, self(), open_connection),
  43. ok.
  44. -spec exponential_backoff(integer(), integer()) -> integer().
  45. exponential_backoff(N, T) ->
  46. erlang:round(math:pow(2, N)) * T.
  47. -spec pool_name_to_atom(epgsql_pool:pool_name()) -> atom().
  48. pool_name_to_atom(PoolName) when is_binary(PoolName) ->
  49. pool_name_to_atom(erlang:binary_to_atom(PoolName, utf8));
  50. pool_name_to_atom(PoolName) when is_list(PoolName) ->
  51. pool_name_to_atom(list_to_atom(PoolName));
  52. pool_name_to_atom(PoolName) -> PoolName.