epgsql_pool.erl 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. -module(epgsql_pool).
  2. -export([start/4, stop/1,
  3. validate_connection_params/1,
  4. query/2, query/3, query/4,
  5. squery/2, squery/3,
  6. transaction/2,
  7. get_settings/0, set_settings/1,
  8. test_run/0
  9. ]).
  10. -include("epgsql_pool.hrl").
  11. -type(pool_name() :: binary() | string() | atom()).
  12. -export_type([pool_name/0]).
  13. %% Module API
  %% @doc Create a new connection pool registered under `PoolName'.
  %%
  %% `ConnectionParams' is either a map holding the keys `host', `port',
  %% `username', `password' and `database' (all are read with `maps:get/2', so
  %% a missing key raises `{badkey, Key}'), or an already built
  %% `#epgsql_connection_params{}' record.
  %%
  %% The connection parameters are stored in the `epgsql_pool' application
  %% environment under the pool name, therefore a pool name that collides with
  %% one of the existing setting keys is rejected with
  %% `{error, invalid_pool_name}'. Otherwise the result of `pooler:new_pool/1'
  %% is returned unchanged.
  -spec start(pool_name(), integer(), integer(), map() | #epgsql_connection_params{}) ->
      {ok, pid()} | {error, term()}.
  start(PoolName, InitCount, MaxCount, ConnectionParams) when is_map(ConnectionParams) ->
      %% Normalise the map form into the record form and re-dispatch.
      Params2 = #epgsql_connection_params{
          host = maps:get(host, ConnectionParams),
          port = maps:get(port, ConnectionParams),
          username = maps:get(username, ConnectionParams),
          password = maps:get(password, ConnectionParams),
          database = maps:get(database, ConnectionParams)
      },
      start(PoolName, InitCount, MaxCount, Params2);
  start(PoolName0, InitCount, MaxCount, #epgsql_connection_params{} = ConnectionParams) ->
      PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
      case lists:member(PoolName, all_keys()) of
          true ->
              %% Storing the params under this name would overwrite a setting.
              {error, invalid_pool_name};
          false ->
              application:set_env(epgsql_pool, PoolName, ConnectionParams),
              {ok, MaxQueue} = application:get_env(epgsql_pool, pooler_max_queue),
              PoolConfig = [
                  {name, PoolName},
                  {init_count, InitCount},
                  {max_count, MaxCount},
                  {queue_max, MaxQueue},
                  {start_mfa, {epgsql_pool_worker, start_link, [PoolName]}},
                  {stop_mfa, {epgsql_pool_worker, stop, ['$pooler_pid']}}
              ],
              pooler:new_pool(PoolConfig)
      end.
  %% @doc Remove the pool registered under `PoolName'.
  %%
  %% The name is converted to an atom first and then handed to
  %% `pooler:rm_pool/1', whose result is returned as is.
  -spec stop(pool_name()) -> ok | {error, term()}.
  stop(PoolName) ->
      PoolAtom = epgsql_pool_utils:pool_name_to_atom(PoolName),
      pooler:rm_pool(PoolAtom).
  %% @doc Check that a database connection can be opened with the given
  %% parameters.
  %%
  %% Accepts the same map or record forms as `start/4'. A connection is opened
  %% with `epgsql:connect/4' (using the configured `connection_timeout') and
  %% closed again straight away. Returns `ok' when the connection succeeded,
  %% otherwise the `{error, Reason}' reported by `epgsql:connect/4'.
  -spec validate_connection_params(map() | #epgsql_connection_params{}) -> ok | {error, term()}.
  validate_connection_params(ConnectionParams) when is_map(ConnectionParams) ->
      Host = maps:get(host, ConnectionParams),
      Port = maps:get(port, ConnectionParams),
      Username = maps:get(username, ConnectionParams),
      Password = maps:get(password, ConnectionParams),
      Database = maps:get(database, ConnectionParams),
      validate_connection_params(
          #epgsql_connection_params{
              host = Host,
              port = Port,
              username = Username,
              password = Password,
              database = Database
          }
      );
  validate_connection_params(#epgsql_connection_params{} = Params) ->
      {ok, ConnectionTimeout} = application:get_env(epgsql_pool, connection_timeout),
      ConnectOpts = [
          {port, Params#epgsql_connection_params.port},
          {database, Params#epgsql_connection_params.database},
          {timeout, ConnectionTimeout}
      ],
      case
          epgsql:connect(
              Params#epgsql_connection_params.host,
              Params#epgsql_connection_params.username,
              Params#epgsql_connection_params.password,
              ConnectOpts
          )
      of
          {ok, Sock} ->
              %% The connection was only a probe; drop it immediately.
              epgsql:close(Sock),
              ok;
          {error, _} = Error ->
              Error
      end.
  %% @doc Run `Stmt' without bind parameters and with default options.
  %%
  %% Shorthand for `query(PoolNameOrWorker, Stmt, [], [])'. Besides the reply
  %% of the worker, `{error, timeout}' (query timed out) and
  %% `{error, pool_overload}' (no worker could be taken from the pool) may be
  %% returned.
  -spec query(pool_name() | pid(), epgsql:sql_query()) ->
      epgsql:reply() | {error, timeout | pool_overload}.
  query(PoolNameOrWorker, Stmt) ->
      query(PoolNameOrWorker, Stmt, [], []).
  %% @doc Run `Stmt' with bind parameters `Params' and default options.
  %%
  %% Shorthand for `query(PoolNameOrWorker, Stmt, Params, [])'. Besides the
  %% reply of the worker, `{error, timeout}' (query timed out) and
  %% `{error, pool_overload}' (no worker could be taken from the pool) may be
  %% returned.
  -spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()]) ->
      epgsql:reply() | {error, timeout | pool_overload}.
  query(PoolNameOrWorker, Stmt, Params) ->
      query(PoolNameOrWorker, Stmt, Params, []).
  %% @doc Run `Stmt' with bind parameters `Params'.
  %%
  %% The request is sent to the worker as `{equery, Stmt, Params}'.
  %% `PoolNameOrWorker' is either a pool name (a worker is borrowed for the
  %% duration of the call) or the pid of a worker already held by the caller.
  %% `Options' is a proplist; the `timeout' entry, when present, overrides the
  %% configured `query_timeout'.
  %%
  %% Besides the reply of the worker, `{error, timeout}' (query timed out) and
  %% `{error, pool_overload}' (no worker could be taken from the pool) may be
  %% returned.
  -spec query(pool_name() | pid(), epgsql:sql_query(), [epgsql:bind_param()], [proplists:option()]) ->
      epgsql:reply() | {error, timeout | pool_overload}.
  query(PoolNameOrWorker, Stmt, Params, Options) ->
      do_query(PoolNameOrWorker, {equery, Stmt, Params}, Options).
  %% @doc Run `Stmt' as a simple query with default options.
  %%
  %% Shorthand for `squery(PoolNameOrWorker, Stmt, [])'. Besides the reply of
  %% the worker, `{error, timeout}' (query timed out) and
  %% `{error, pool_overload}' (no worker could be taken from the pool) may be
  %% returned.
  -spec squery(pool_name() | pid(), epgsql:sql_query()) ->
      epgsql:reply(epgsql:squery_row()) | {error, timeout | pool_overload}.
  squery(PoolNameOrWorker, Stmt) ->
      squery(PoolNameOrWorker, Stmt, []).
  %% @doc Run `Stmt' as a simple query.
  %%
  %% The request is sent to the worker as `{squery, Stmt}'. `PoolNameOrWorker'
  %% is either a pool name or the pid of a worker already held by the caller.
  %% `Options' is a proplist; the `timeout' entry, when present, overrides the
  %% configured `query_timeout'.
  %%
  %% Besides the reply of the worker, `{error, timeout}' (query timed out) and
  %% `{error, pool_overload}' (no worker could be taken from the pool) may be
  %% returned.
  -spec squery(pool_name() | pid(), epgsql:sql_query(), [proplists:option()]) ->
      epgsql:reply(epgsql:squery_row()) | {error, timeout | pool_overload}.
  squery(PoolNameOrWorker, Stmt, Options) ->
      do_query(PoolNameOrWorker, {squery, Stmt}, Options).
  %% Send `QueryTuple' to a worker and return its reply.
  %%
  %% First clause: the caller already holds a worker pid, so the request goes
  %% straight to that process. Second clause: the first argument is a pool
  %% name; a worker is borrowed through `with_worker/2' and the first clause is
  %% applied to it.
  %%
  %% When the worker does not answer within the timeout, the timeout is
  %% logged, a cancel is issued on the worker's socket and `{error, timeout}'
  %% is returned.
  do_query(Worker, QueryTuple, Options) when is_pid(Worker) ->
      Timeout = query_timeout(Options),
      %% The socket is requested before the query is sent.
      %% NOTE(review): presumably because the worker cannot answer `get_sock'
      %% while it is busy with the query - confirm against epgsql_pool_worker.
      Sock = gen_server:call(Worker, get_sock),
      try gen_server:call(Worker, QueryTuple, Timeout) of
          Reply ->
              Reply
      catch
          exit:{timeout, _} ->
              error_logger:error_msg("query timeout ~p", [QueryTuple]),
              epgsql_sock:cancel(Sock),
              {error, timeout}
      end;
  do_query(PoolName0, QueryTuple, Options) ->
      PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
      RunOnWorker = fun(Worker) -> do_query(Worker, QueryTuple, Options) end,
      with_worker(PoolName, RunOnWorker).

  %% Pick the query timeout: the `timeout' entry of `Options' when it is set,
  %% otherwise the `query_timeout' value of the application environment.
  query_timeout(Options) ->
      case proplists:get_value(timeout, Options) of
          undefined -> element(2, application:get_env(epgsql_pool, query_timeout));
          Explicit -> Explicit
      end.
  %% @doc Run `Fun' inside a database transaction on a single pooled worker.
  %%
  %% A worker is borrowed from `PoolName0', `BEGIN' is issued, `Fun' is called
  %% with the worker pid (pass that pid to `query/2,3,4' or `squery/2,3' so the
  %% statements run on the same connection), and `COMMIT' is issued afterwards.
  %% The value returned by `Fun' is the result of the call.
  %%
  %% If anything inside raises, `ROLLBACK' is issued and the original exception
  %% is re-raised with its original stacktrace. When no worker can be taken
  %% from the pool the error tuple from `with_worker/2' is returned and `Fun'
  %% is never called.
  %%
  %% Note: the replies to `BEGIN' and `COMMIT' are not inspected.
  -spec transaction(pool_name(), fun((pid()) -> term())) -> epgsql:reply() | {error, term()}.
  transaction(PoolName0, Fun) ->
      PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
      with_worker(
          PoolName,
          fun(Worker) ->
              try
                  gen_server:call(Worker, {squery, "BEGIN"}),
                  Result = Fun(Worker),
                  gen_server:call(Worker, {squery, "COMMIT"}),
                  Result
              catch
                  %% The stacktrace is bound in the catch head (OTP 21+):
                  %% erlang:get_stacktrace/0 is no longer available in current
                  %% OTP releases, and reading it after the ROLLBACK call could
                  %% pick up a trace other than the one of the original failure.
                  Class:Reason:Stacktrace ->
                      gen_server:call(Worker, {squery, "ROLLBACK"}),
                      erlang:raise(Class, Reason, Stacktrace)
              end
          end
      ).
  %% @doc Return the current settings as a map.
  %%
  %% Every key listed by `all_keys/0' is looked up in the `epgsql_pool'
  %% application environment and stored in the map under the same key.
  -spec get_settings() -> map().
  get_settings() ->
      Pairs = [
          {Key, element(2, application:get_env(epgsql_pool, Key))}
       || Key <- all_keys()
      ],
      maps:from_list(Pairs).
  %% @doc Update settings from `Map'.
  %%
  %% Only keys already listed by `all_keys/0' are considered: for each of them
  %% that is present in `Map', the new value is written to the `epgsql_pool'
  %% application environment. Any other key in `Map' is ignored.
  -spec set_settings(map()) -> ok.
  set_settings(Map) ->
      %% The generator over a one-element list skips keys missing from Map
      %% (maps:find/2 returns `error', which does not match `{ok, Value}').
      _ = [
          application:set_env(epgsql_pool, Key, Value)
       || Key <- all_keys(),
          {ok, Value} <- [maps:find(Key, Map)]
      ],
      ok.
  %% @doc Manual smoke test.
  %%
  %% Starts the application, creates the pool `my_pool' (2 initial / 2 max
  %% workers) against a hard-coded local database, runs one query through it
  %% and logs the reply. Crashes with `badmatch' when the pool cannot be
  %% started.
  -spec test_run() -> ok.
  test_run() ->
      application:ensure_all_started(epgsql_pool),
      ConnParams = #{
          host => "localhost",
          port => 5432,
          username => "test",
          password => "test",
          database => "testdb"
      },
      {ok, _} = epgsql_pool:start(my_pool, 2, 2, ConnParams),
      Reply = epgsql_pool:query(my_pool, "select * from category"),
      error_logger:info_msg("Res1: ~p", [Reply]),
      ok.
  144. %%% inner functions
  %% Take a worker from `PoolName', waiting at most the configured
  %% `pooler_get_worker_timeout'.
  %%
  %% Returns `{ok, Pid}' on success. When pooler reports `error_no_members' the
  %% pool statistics are logged and `{error, pool_overload}' is returned.
  -spec get_worker(pool_name()) -> {ok, pid()} | {error, term()}.
  get_worker(PoolName) ->
      {ok, Timeout} = application:get_env(epgsql_pool, pooler_get_worker_timeout),
      case pooler:take_member(PoolName, Timeout) of
          error_no_members ->
              Stats = pooler:pool_stats(PoolName),
              error_logger:error_msg("Pool ~p overload: ~p", [PoolName, Stats]),
              {error, pool_overload};
          Member when is_pid(Member) ->
              {ok, Member}
      end.
  %% Borrow a worker from `PoolName', apply `Fun' to it and give it back.
  %%
  %% On normal completion the worker is returned to the pool with status `ok'
  %% and the value of `Fun' is the result. If `Fun' raises, the worker is
  %% returned with status `fail' and the exception is re-raised with its
  %% original stacktrace. When no worker can be obtained, the error tuple from
  %% `get_worker/1' is returned and `Fun' is not called.
  -spec with_worker(atom(), fun((pid()) -> Result)) -> Result | {error, term()}.
  with_worker(PoolName, Fun) ->
      case get_worker(PoolName) of
          {ok, Worker} ->
              %% Only Fun(Worker) is protected; the `of' section runs outside
              %% the try, so a failure while handing the worker back is not
              %% mistaken for a failure of Fun.
              try Fun(Worker) of
                  Response ->
                      pooler:return_member(PoolName, Worker, ok),
                      Response
              catch
                  %% The stacktrace is bound in the catch head (OTP 21+):
                  %% erlang:get_stacktrace/0 is no longer available in current
                  %% OTP releases, and reading it after return_member/3 could
                  %% pick up a trace other than the one of the original failure.
                  Class:Reason:Stacktrace ->
                      pooler:return_member(PoolName, Worker, fail),
                      erlang:raise(Class, Reason, Stacktrace)
              end;
          {error, _} = Error ->
              Error
      end.
  %% List the setting keys of the `epgsql_pool' application environment.
  %%
  %% Two kinds of entries are left out: those whose value is an
  %% `#epgsql_connection_params{}' record (these are the per-pool connection
  %% parameters stored by `start/4') and the `included_applications' entry.
  -spec all_keys() -> [atom()].
  all_keys() ->
      [
          Key
       || {Key, Value} <- application:get_all_env(epgsql_pool),
          not is_record(Value, epgsql_connection_params),
          Key =/= included_applications
      ].