epgsql_pool.erl 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. -module(epgsql_pool).
  2. -export([start/4, stop/1,
  3. validate_connection_params/1,
  4. query/2, query/3, query/4,
  5. squery/2, squery/3,
  6. transaction/2,
  7. get_settings/0, set_settings/1,
  8. get_worker/1,
  9. set_notice/2
  10. ]).
  11. -include("epgsql_pool.hrl").
  12. -include("otp_types.hrl").
  13. -type(pool_name() :: binary() | string() | atom()).
  14. -export_type([pool_name/0]).
  15. %% Module API
  %% @doc Start a connection pool.
  %%
  %% `ConnectionParams' is either an `#epgsql_connection_params{}' record or a
  %% map holding the keys `host', `port', `username', `password' and `database'
  %% (all required; a missing key raises `{badkey, Key}').
  %%
  %% The pool name is normalised to an atom. The connection parameters are
  %% stored in the `epgsql_pool' application environment under that atom, so a
  %% name that collides with an existing settings key is rejected with
  %% `{error, invalid_pool_name}'. Otherwise the result of `pooler:new_pool/1'
  %% is returned unchanged.
  %%
  %% Crashes with `badmatch' if the `pooler_max_queue' setting is not defined.
  -spec start(pool_name(), integer(), integer(), map() | #epgsql_connection_params{}) ->
            {ok, pid()} | {error, term()}.
  start(PoolName, InitCount, MaxCount, ConnectionParams) when is_map(ConnectionParams) ->
      Params2 = #epgsql_connection_params{
                   host = maps:get(host, ConnectionParams),
                   port = maps:get(port, ConnectionParams),
                   username = maps:get(username, ConnectionParams),
                   password = maps:get(password, ConnectionParams),
                   database = maps:get(database, ConnectionParams)
                  },
      start(PoolName, InitCount, MaxCount, Params2);
  start(PoolName0, InitCount, MaxCount, #epgsql_connection_params{} = ConnectionParams) ->
      PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
      case lists:member(PoolName, all_keys()) of
          true ->
              %% Using this name would overwrite a regular setting below.
              {error, invalid_pool_name};
          false ->
              %% Stored under the pool name; all_keys/0 filters such entries out.
              application:set_env(epgsql_pool, PoolName, ConnectionParams),
              {ok, MaxQueue} = application:get_env(epgsql_pool, pooler_max_queue),
              PoolConfig = [{name, PoolName},
                            {init_count, InitCount},
                            {max_count, MaxCount},
                            {queue_max, MaxQueue},
                            {start_mfa, {epgsql_pool_worker, start_link, [PoolName]}},
                            {stop_mfa, {epgsql_pool_worker, stop, ['$pooler_pid']}}
                           ],
              pooler:new_pool(PoolConfig)
      end.
  %% @doc Remove the pool registered under `PoolName' from pooler.
  %% The name is normalised to an atom before being handed to `pooler:rm_pool/1'.
  -spec stop(pool_name()) -> ok | {error, term()}.
  stop(PoolName) ->
      PoolAtom = epgsql_pool_utils:pool_name_to_atom(PoolName),
      pooler:rm_pool(PoolAtom).
  %% @doc Check that a database connection can be opened with the given parameters.
  %%
  %% Accepts the same map or record as `start/4'. A connection is opened with
  %% the `connection_timeout' setting and closed again immediately on success.
  %% Returns `ok', or the `{error, Reason}' reported by `epgsql:connect/4'.
  -spec validate_connection_params(map() | #epgsql_connection_params{}) -> ok | {error, term()}.
  validate_connection_params(#{} = ConnectionParams) ->
      validate_connection_params(
        #epgsql_connection_params{
           host = maps:get(host, ConnectionParams),
           port = maps:get(port, ConnectionParams),
           username = maps:get(username, ConnectionParams),
           password = maps:get(password, ConnectionParams),
           database = maps:get(database, ConnectionParams)
          });
  validate_connection_params(#epgsql_connection_params{} = Params) ->
      {ok, ConnectTimeout} = application:get_env(epgsql_pool, connection_timeout),
      ConnectOpts = [{port, Params#epgsql_connection_params.port},
                     {database, Params#epgsql_connection_params.database},
                     {timeout, ConnectTimeout}],
      case epgsql:connect(Params#epgsql_connection_params.host,
                          Params#epgsql_connection_params.username,
                          Params#epgsql_connection_params.password,
                          ConnectOpts) of
          {ok, Sock} ->
              %% Only reachability is being tested, so drop the connection at once.
              epgsql:close(Sock),
              ok;
          {error, _Reason} = Error ->
              Error
      end.
  %% @doc Run an extended query with no bind parameters and default options.
  %%
  %% The first argument is either a pool name or the pid of a worker. Besides
  %% the epgsql reply, the call can return `{error, timeout}' (the worker did
  %% not answer in time) or `{error, pool_overload}' (no worker could be taken
  %% from the pool).
  -spec query(pool_name() | pid(), epgsql:sql_query()) ->
            epgsql:reply() | {error, timeout | pool_overload}.
  query(PoolNameOrWorker, Stmt) ->
      query(PoolNameOrWorker, Stmt, [], []).
  %% @doc Run an extended query with bind parameters and default options.
  %%
  %% The first argument is either a pool name or the pid of a worker. Besides
  %% the epgsql reply, the call can return `{error, timeout}' (the worker did
  %% not answer in time) or `{error, pool_overload}' (no worker could be taken
  %% from the pool).
  -spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()]) ->
            epgsql:reply() | {error, timeout | pool_overload}.
  query(PoolNameOrWorker, Stmt, Params) ->
      query(PoolNameOrWorker, Stmt, Params, []).
  %% @doc Run an extended query (`equery') with bind parameters and options.
  %%
  %% The only option read by this module is `{timeout, T}', which overrides the
  %% `query_timeout' application setting for this call. The first argument is
  %% either a pool name or the pid of a worker. Besides the epgsql reply, the
  %% call can return `{error, timeout}' (the worker did not answer in time) or
  %% `{error, pool_overload}' (no worker could be taken from the pool).
  -spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()], [proplists:option()]) ->
            epgsql:reply() | {error, timeout | pool_overload}.
  query(PoolNameOrWorker, Stmt, Params, Options) ->
      do_query(PoolNameOrWorker, {equery, Stmt, Params}, Options).
  %% @doc Run a simple query with default options.
  %%
  %% The first argument is either a pool name or the pid of a worker. Besides
  %% the epgsql reply, the call can return `{error, timeout}' (the worker did
  %% not answer in time) or `{error, pool_overload}' (no worker could be taken
  %% from the pool).
  -spec squery(pool_name() | pid(), epgsql:sql_query()) ->
            epgsql:reply(epgsql:squery_row()) | {error, timeout | pool_overload}.
  squery(PoolNameOrWorker, Stmt) ->
      squery(PoolNameOrWorker, Stmt, []).
  %% @doc Run a simple query (`squery') with options.
  %%
  %% The only option read by this module is `{timeout, T}', which overrides the
  %% `query_timeout' application setting for this call. The first argument is
  %% either a pool name or the pid of a worker. Besides the epgsql reply, the
  %% call can return `{error, timeout}' (the worker did not answer in time) or
  %% `{error, pool_overload}' (no worker could be taken from the pool).
  -spec squery(pool_name() | pid(), epgsql:sql_query(), [proplists:option()]) ->
            epgsql:reply(epgsql:squery_row()) | {error, timeout | pool_overload}.
  squery(PoolNameOrWorker, Stmt, Options) ->
      do_query(PoolNameOrWorker, {squery, Stmt}, Options).
  %% Execute `QueryTuple' either on a given worker pid or on a worker checked
  %% out of the named pool.
  %%
  %% For a worker pid: the worker's socket is fetched first so that the running
  %% statement can be cancelled if the query call times out. Both timeouts are
  %% logged and reported as `{error, timeout}'.
  %% For a pool name: the name is normalised to an atom and the query runs
  %% inside with_worker/2, so the worker is returned to the pool afterwards.
  do_query(Worker, QueryTuple, Options) when is_pid(Worker) ->
      Timeout = resolve_query_timeout(Options),
      %% `try ... of' keeps this handler limited to the get_sock call; the query
      %% phase has its own handler in call_with_cancel/4.
      try gen_server:call(Worker, get_sock) of
          Sock ->
              call_with_cancel(Worker, Sock, QueryTuple, Timeout)
      catch
          exit:{timeout, _} ->
              error_logger:error_msg("get_sock timeout ~p", [QueryTuple]),
              {error, timeout}
      end;
  do_query(PoolName0, QueryTuple, Options) ->
      PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
      with_worker(
        PoolName,
        fun(Worker) ->
                do_query(Worker, QueryTuple, Options)
        end).

  %% Pick the per-call `timeout' option, falling back to the `query_timeout'
  %% setting. The 5000 default mirrors the one set_notice/2 uses for the same
  %% key, so an unset setting no longer crashes with an opaque `badarg'.
  resolve_query_timeout(Options) ->
      case proplists:get_value(timeout, Options) of
          undefined -> application:get_env(epgsql_pool, query_timeout, 5000);
          Timeout -> Timeout
      end.

  %% Send the query to the worker; on a call timeout, log it, ask the socket to
  %% cancel the statement and report `{error, timeout}'.
  call_with_cancel(Worker, Sock, QueryTuple, Timeout) ->
      try
          gen_server:call(Worker, QueryTuple, Timeout)
      catch
          exit:{timeout, _} ->
              error_logger:error_msg("query timeout ~p", [QueryTuple]),
              epgsql_sock:cancel(Sock),
              {error, timeout}
      end.
  %% @doc Run `Fun' between BEGIN and COMMIT on a worker taken from the pool.
  %%
  %% `Fun' receives the worker pid and its return value becomes the result of
  %% the transaction. Any exception raised inside (including by the BEGIN or
  %% COMMIT calls) triggers a ROLLBACK and is then re-raised with its original
  %% class, reason and stacktrace. Each control statement uses the
  %% `transaction_timeout' setting (20000 if unset). If no worker can be taken,
  %% the error from with_worker/2 is returned instead.
  %%
  %% NOTE: the replies to BEGIN and COMMIT are not inspected, and if the
  %% ROLLBACK call itself raises, that exception replaces the original one.
  -spec transaction(pool_name(), fun()) -> epgsql:reply() | {error, term()}.
  transaction(PoolName0, Fun) ->
      Pool = epgsql_pool_utils:pool_name_to_atom(PoolName0),
      TxTimeout = application:get_env(epgsql_pool, transaction_timeout, 20000),
      InTransaction =
          fun(Worker) ->
                  Exec = fun(Sql) ->
                                 gen_server:call(Worker, {squery, Sql}, TxTimeout)
                         end,
                  try
                      Exec("BEGIN"),
                      Outcome = Fun(Worker),
                      Exec("COMMIT"),
                      Outcome
                  catch
                      Class:Reason:Stack ->
                          Exec("ROLLBACK"),
                          erlang:raise(Class, Reason, Stack)
                  end
          end,
      with_worker(Pool, InTransaction).
  %% @doc Return the current `epgsql_pool' settings as a map.
  %% Covers exactly the keys listed by all_keys/0, each read from the
  %% application environment.
  -spec get_settings() -> map().
  get_settings() ->
      Pairs = [{Key, element(2, application:get_env(epgsql_pool, Key))}
               || Key <- all_keys()],
      maps:from_list(Pairs).
  %% @doc Update `epgsql_pool' settings from `Map'.
  %% Only keys already listed by all_keys/0 are written; any other key in
  %% `Map' is ignored. Always returns `ok'.
  -spec set_settings(map()) -> ok.
  set_settings(Map) ->
      _ = [application:set_env(epgsql_pool, Key, Value)
           || Key <- all_keys(),
              {ok, Value} <- [maps:find(Key, Map)]],
      ok.
  %% @doc Send `{set_async_receiver, Pid}' to the worker and return its reply.
  %% The call waits up to the `query_timeout' setting (5000 if unset).
  -spec set_notice(pid(), pid()) -> gs_call_reply().
  set_notice(Worker, Pid) ->
      gen_server:call(Worker,
                      {set_async_receiver, Pid},
                      application:get_env(epgsql_pool, query_timeout, 5000)).
  %%% Worker checkout (get_worker/1 is exported) and internal helpers
  %% @doc Take a worker out of the pool.
  %%
  %% The pool name is normalised to an atom first, so this exported function
  %% accepts every `pool_name()' form, as the rest of the API does. The
  %% `pooler_get_worker_timeout' setting is passed to `pooler:take_member/2';
  %% the function crashes with `badmatch' if that setting is not defined.
  %%
  %% Returns `{ok, Worker}', or `{error, pool_overload}' after logging the pool
  %% statistics when pooler has no member to hand out.
  %% NOTE(review): nothing here returns the worker to the pool; with_worker/2
  %% does that for internal callers, so direct callers presumably must call
  %% `pooler:return_member/3' themselves - confirm.
  -spec get_worker(pool_name()) -> {ok, pid()} | {error, term()}.
  get_worker(PoolName0) ->
      PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
      {ok, Timeout} = application:get_env(epgsql_pool, pooler_get_worker_timeout),
      case pooler:take_member(PoolName, Timeout) of
          Worker when is_pid(Worker) ->
              {ok, Worker};
          error_no_members ->
              PoolStats = pooler:pool_stats(PoolName),
              error_logger:error_msg("Pool ~p overload: ~p", [PoolName, PoolStats]),
              {error, pool_overload}
      end.
  %% Check a worker out of `PoolName', apply `Fun' to it and hand it back.
  %%
  %% When `Fun' returns, the worker goes back to pooler with status `ok' and the
  %% value is returned. When `Fun' raises, the worker goes back with status
  %% `fail' and the exception is re-raised unchanged. If no worker could be
  %% taken, the result of get_worker/1 is returned as is.
  with_worker(PoolName, Fun) ->
      case get_worker(PoolName) of
          {ok, Worker} ->
              try Fun(Worker) of
                  Response ->
                      pooler:return_member(PoolName, Worker, ok),
                      Response
              catch
                  Class:Reason:Stack ->
                      pooler:return_member(PoolName, Worker, fail),
                      erlang:raise(Class, Reason, Stack)
              end;
          NoWorker ->
              NoWorker
      end.
  %% List the keys of the `epgsql_pool' application environment that are plain
  %% settings: entries whose value is an `#epgsql_connection_params{}' record
  %% (stored per pool by start/4) and the `included_applications' entry are
  %% left out.
  -spec all_keys() -> [atom()].
  all_keys() ->
      [Key || {Key, Value} <- application:get_all_env(epgsql_pool),
              Key =/= included_applications,
              not is_record(Value, epgsql_connection_params)].