epgsql_pool.erl 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. -module(epgsql_pool).
  2. -export([start/4, stop/1,
  3. validate_connection_params/1,
  4. query/2, query/3, query/4,
  5. squery/2, squery/3,
  6. transaction/2,
  7. get_settings/0, set_settings/1
  8. ]).
  9. -include("epgsql_pool.hrl").
  10. -type(pool_name() :: binary() | string() | atom()).
  11. -export_type([pool_name/0]).
  12. %% Module API
  13. -spec start(pool_name(), integer(), integer(), map() | #epgsql_connection_params{}) -> {ok, pid()}.
  14. start(PoolName, InitCount, MaxCount, ConnectionParams) when is_map(ConnectionParams) ->
  15. Params2 = #epgsql_connection_params{
  16. host = maps:get(host, ConnectionParams),
  17. port = maps:get(port, ConnectionParams),
  18. username = maps:get(username, ConnectionParams),
  19. password = maps:get(password, ConnectionParams),
  20. database = maps:get(database, ConnectionParams)
  21. },
  22. start(PoolName, InitCount, MaxCount, Params2);
  23. start(PoolName0, InitCount, MaxCount, #epgsql_connection_params{} = ConnectionParams) ->
  24. PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
  25. case lists:member(PoolName, all_keys()) of
  26. true -> {error, invalid_pool_name};
  27. false ->
  28. application:set_env(epgsql_pool, PoolName, ConnectionParams),
  29. {ok, MaxQueue} = application:get_env(epgsql_pool, pooler_max_queue),
  30. PoolConfig = [{name, PoolName},
  31. {init_count, InitCount},
  32. {max_count, MaxCount},
  33. {queue_max, MaxQueue},
  34. {start_mfa, {epgsql_pool_worker, start_link, [PoolName]}},
  35. {stop_mfa, {epgsql_pool_worker, stop, ['$pooler_pid']}}
  36. ],
  37. pooler:new_pool(PoolConfig)
  38. end.
  39. -spec stop(pool_name()) -> ok | {error, term()}.
  40. stop(PoolName) ->
  41. pooler:rm_pool(epgsql_pool_utils:pool_name_to_atom(PoolName)).
  42. -spec validate_connection_params(map() | #epgsql_connection_params{}) -> ok | {error, term()}.
  43. validate_connection_params(ConnectionParams) when is_map(ConnectionParams) ->
  44. Params2 = #epgsql_connection_params{
  45. host = maps:get(host, ConnectionParams),
  46. port = maps:get(port, ConnectionParams),
  47. username = maps:get(username, ConnectionParams),
  48. password = maps:get(password, ConnectionParams),
  49. database = maps:get(database, ConnectionParams)
  50. },
  51. validate_connection_params(Params2);
  52. validate_connection_params(#epgsql_connection_params{host = Host, port = Port, username = Username,
  53. password = Password, database = Database}) ->
  54. {ok,ConnectionTimeout} = application:get_env(epgsql_pool, connection_timeout),
  55. Res = epgsql:connect(Host, Username, Password,
  56. [{port, Port},
  57. {database, Database},
  58. {timeout, ConnectionTimeout}]),
  59. case Res of
  60. {ok, Sock} -> epgsql:close(Sock), ok;
  61. {error, Reason} -> {error, Reason}
  62. end.
  63. -spec query(pool_name() | pid(), epgsql:sql_query()) -> epgsql:reply().
  64. query(PoolNameOrWorker, Stmt) ->
  65. query(PoolNameOrWorker, Stmt, [], []).
  66. -spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()]) -> epgsql:reply().
  67. query(PoolNameOrWorker, Stmt, Params) ->
  68. query(PoolNameOrWorker, Stmt, Params, []).
  69. -spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()], [proplists:option()]) -> epgsql:reply().
  70. query(PoolNameOrWorker, Stmt, Params, Options) ->
  71. do_query(PoolNameOrWorker, {equery, Stmt, Params}, Options).
  72. -spec squery(pool_name() | pid(), epgsql:sql_query()) -> epgsql:reply(epgsql:squery_row()) | {error, timeout}.
  73. squery(PoolNameOrWorker, Stmt) ->
  74. squery(PoolNameOrWorker, Stmt, []).
  75. -spec squery(pool_name() | pid(), epgsql:sql_query(), [proplists:option()]) ->
  76. epgsql:reply(epgsql:squery_row()) | {error, timeout}.
  77. squery(PoolNameOrWorker, Stmt, Options) ->
  78. do_query(PoolNameOrWorker, {squery, Stmt}, Options).
  79. do_query(Worker, QueryTuple, Options) when is_pid(Worker) ->
  80. Timeout = case proplists:get_value(timeout, Options) of
  81. undefined -> element(2, application:get_env(epgsql_pool, query_timeout));
  82. V -> V
  83. end,
  84. try
  85. Sock = gen_server:call(Worker, get_sock),
  86. try
  87. gen_server:call(Worker, QueryTuple, Timeout)
  88. catch
  89. exit:{timeout, _} ->
  90. error_logger:error_msg("query timeout ~p", [QueryTuple]),
  91. epgsql_sock:cancel(Sock),
  92. {error, timeout}
  93. end
  94. catch
  95. exit:{timeout, _} ->
  96. error_logger:error_msg("get_sock timeout ~p", [QueryTuple]),
  97. {error, timeout}
  98. end;
  99. do_query(PoolName0, QueryTuple, Options) ->
  100. PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
  101. with_worker(
  102. PoolName,
  103. fun(Worker) ->
  104. do_query(Worker, QueryTuple, Options)
  105. end).
  106. -spec transaction(pool_name(), fun()) -> epgsql:reply() | {error, term()}.
  107. transaction(PoolName0, Fun) ->
  108. PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
  109. Timeout = application:get_env(epgsql_pool, transaction_timeout, 20000),
  110. with_worker(
  111. PoolName,
  112. fun(Worker) ->
  113. try
  114. gen_server:call(Worker, {squery, "BEGIN"}, Timeout),
  115. Result = Fun(Worker),
  116. gen_server:call(Worker, {squery, "COMMIT"}, Timeout),
  117. Result
  118. catch
  119. Err:Reason ->
  120. gen_server:call(Worker, {squery, "ROLLBACK"}, Timeout),
  121. erlang:raise(Err, Reason, erlang:get_stacktrace())
  122. end
  123. end).
  124. -spec get_settings() -> map().
  125. get_settings() ->
  126. lists:foldl(fun(Key, Map) ->
  127. maps:put(Key, element(2, application:get_env(epgsql_pool, Key)), Map)
  128. end, maps:new(), all_keys()).
  129. -spec set_settings(map()) -> ok.
  130. set_settings(Map) ->
  131. lists:foreach(fun(Key) ->
  132. case maps:find(Key, Map) of
  133. {ok, Value} -> application:set_env(epgsql_pool, Key, Value);
  134. error -> do_nothing
  135. end
  136. end, all_keys()),
  137. ok.
  138. %%% inner functions
  139. -spec get_worker(pool_name()) -> {ok, pid()} | {error, term()}.
  140. get_worker(PoolName) ->
  141. {ok, Timeout} = application:get_env(epgsql_pool, pooler_get_worker_timeout),
  142. case pooler:take_member(PoolName, Timeout) of
  143. Worker when is_pid(Worker) -> {ok, Worker};
  144. error_no_members ->
  145. PoolStats = pooler:pool_stats(PoolName),
  146. error_logger:error_msg("Pool ~p overload: ~p", [PoolName, PoolStats]),
  147. {error, pool_overload}
  148. end.
  149. with_worker(PoolName, Fun) ->
  150. case get_worker(PoolName) of
  151. {ok, Worker} ->
  152. Response =
  153. try
  154. Fun(Worker)
  155. catch
  156. Err:Reason ->
  157. pooler:return_member(PoolName, Worker, fail),
  158. erlang:raise(Err, Reason, erlang:get_stacktrace())
  159. end,
  160. pooler:return_member(PoolName, Worker, ok),
  161. Response;
  162. Err ->
  163. Err
  164. end.
  165. -spec all_keys() -> [atom()].
  166. all_keys() ->
  167. lists:filtermap(fun({_Key, #epgsql_connection_params{}}) -> false;
  168. ({included_applications, _}) -> false;
  169. ({Key, _Value}) -> {true, Key}
  170. end,
  171. application:get_all_env(epgsql_pool)).