epgsql_pool.erl 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. -module(epgsql_pool).
  2. -export([start/4, stop/1,
  3. validate_connection_params/1,
  4. query/2, query/3, query/4,
  5. transaction/2,
  6. get_settings/0, set_settings/1
  7. ]).
  8. -include("epgsql_pool.hrl").
  9. -type(pool_name() :: binary() | string() | atom()).
  10. -export_type([pool_name/0]).
  11. %% Module API
  12. -spec start(pool_name(), integer(), integer(), map() | #epgsql_connection_params{}) -> {ok, pid()} | {error, term()}.
  13. start(PoolName, InitCount, MaxCount, ConnectionParams) when is_map(ConnectionParams) ->
  14. Params2 = #epgsql_connection_params{
  15. host = maps:get(host, ConnectionParams),
  16. port = maps:get(port, ConnectionParams),
  17. username = maps:get(username, ConnectionParams),
  18. password = maps:get(password, ConnectionParams),
  19. database = maps:get(database, ConnectionParams)
  20. },
  21. start(PoolName, InitCount, MaxCount, Params2);
  22. start(PoolName0, InitCount, MaxCount, #epgsql_connection_params{} = ConnectionParams) ->
  23. epgsql_pool_settings:set_connection_params(my_pool, ConnectionParams),
  24. PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
  25. MaxQueue = epgsql_pool_settings:get(pooler_max_queue),
  26. case epgsql_pool_settings:get_connection_params(PoolName) of
  27. {ok, _} -> PoolConfig = [{name, PoolName},
  28. {init_count, InitCount},
  29. {max_count, MaxCount},
  30. {queue_max, MaxQueue},
  31. {start_mfa, {epgsql_pool_worker, start_link, [PoolName]}},
  32. {stop_mfa, {epgsql_pool_worker, stop, []}}
  33. ],
  34. pooler:new_pool(PoolConfig);
  35. {error, not_found} -> {error, connection_params_not_found}
  36. end.
  37. -spec stop(pool_name()) -> ok | {error, term()}.
  38. stop(PoolName) ->
  39. pooler:rm_pool(epgsql_pool_utils:pool_name_to_atom(PoolName)).
  40. -spec validate_connection_params(#epgsql_connection_params{}) -> ok | {error, term()}.
  41. validate_connection_params(#epgsql_connection_params{host = Host, port = Port, username = Username,
  42. password = Password, database = Database}) ->
  43. ConnectionTimeout = epgsql_pool_settings:get(connection_timeout),
  44. Res = epgsql:connect(Host, Username, Password,
  45. [{port, Port},
  46. {database, Database},
  47. {timeout, ConnectionTimeout}]),
  48. case Res of
  49. {ok, Sock} -> epgsql:close(Sock), ok;
  50. {error, Reason} -> {error, Reason}
  51. end.
  52. -spec query(pool_name() | pid(), epgsql:sql_query()) -> epgsql:reply().
  53. query(PoolNameOrWorker, Stmt) ->
  54. query(PoolNameOrWorker, Stmt, [], []).
  55. -spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()]) -> epgsql:reply().
  56. query(PoolNameOrWorker, Stmt, Params) ->
  57. query(PoolNameOrWorker, Stmt, Params, []).
  58. -spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()], [proplists:option()]) -> epgsql:reply().
  59. query(Worker, Stmt, Params, Options) when is_pid(Worker) ->
  60. Timeout = case proplists:get_value(timeout, Options) of
  61. undefined -> epgsql_pool_settings:get(query_timeout);
  62. V -> V
  63. end,
  64. try
  65. gen_server:call(Worker, {equery, Stmt, Params}, Timeout)
  66. catch
  67. exit:{timeout, _} ->
  68. gen_server:call(Worker, cancel),
  69. error_logger:error_msg("query timeout ~p ~p", [Stmt, Params]),
  70. {error, timeout}
  71. end;
  72. query(PoolName0, Stmt, Params, Options) ->
  73. PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
  74. case get_worker(PoolName) of
  75. {ok, Worker} ->
  76. try
  77. query(Worker, Stmt, Params, Options)
  78. catch
  79. Err:Reason ->
  80. erlang:raise(Err, Reason, erlang:get_stacktrace())
  81. after
  82. pooler:return_member(PoolName, Worker, ok)
  83. end;
  84. {error, Reason} -> {error, Reason}
  85. end.
  86. -spec transaction(pool_name(), fun()) -> epgsql:reply() | {error, term()}.
  87. transaction(PoolName0, Fun) ->
  88. PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
  89. case get_worker(PoolName) of
  90. {ok, Worker} ->
  91. try
  92. gen_server:call(Worker, {squery, "BEGIN"}),
  93. Result = Fun(Worker),
  94. gen_server:call(Worker, {squery, "COMMIT"}),
  95. Result
  96. catch
  97. Err:Reason ->
  98. gen_server:call(Worker, {squery, "ROLLBACK"}),
  99. erlang:raise(Err, Reason, erlang:get_stacktrace())
  100. after
  101. pooler:return_member(PoolName, Worker, ok)
  102. end;
  103. {error, Reason} -> {error, Reason}
  104. end.
  105. -spec get_settings() -> map().
  106. get_settings() ->
  107. lists:foldl(fun(Key, Map) ->
  108. maps:put(Key, epgsql_pool_settings:get(Key), Map)
  109. end, maps:new(), epgsql_pool_settings:all_keys()).
  110. -spec set_settings(map()) -> ok.
  111. set_settings(Map) ->
  112. lists:foreach(fun(Key) ->
  113. case maps:find(Key, Map) of
  114. {ok, Value} -> epgsql_pool_settings:set(Key, Value);
  115. error -> do_nothing
  116. end
  117. end, epgsql_pool_settings:all_keys()),
  118. ok.
  119. %%% inner functions
  120. get_worker(PoolName) ->
  121. Timeout = epgsql_pool_settings:get(pooler_get_worker_timeout),
  122. case pooler:take_member(PoolName, Timeout) of
  123. Worker when is_pid(Worker) -> {ok, Worker};
  124. error_no_members ->
  125. PoolStats = pooler:pool_stats(PoolName),
  126. error_logger:error_msg("Pool ~p overload: ~p", [PoolName, PoolStats]),
  127. {error, pool_overload}
  128. end.