prop_pooler.erl 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. -module(prop_pooler).
  2. -export([
  3. prop_fixed_start/1,
  4. prop_fixed_checkout_all/1,
  5. prop_dynamic_checkout/1,
  6. prop_fixed_take_return/1,
  7. prop_fixed_take_return_broken/1,
  8. prop_fixed_client_died/1,
  9. prop_group_take_return/1
  10. ]).
  11. -include_lib("proper/include/proper.hrl").
  12. -include_lib("stdlib/include/assert.hrl").
  %% Description clause: selected by the atom `doc', returns the property's
  %% human-readable summary string.
  prop_fixed_start(doc) ->
      "Check that the pool of any fixed size can be started, internal statistics is correct".

  %% Property: for every generated positive Size, a pool configured with
  %% init_count = max_count = Size is started through with_pool/2 and must
  %% report all Size members as free.
  prop_fixed_start() ->
      Pool = ?FUNCTION_NAME,
      ?FORALL(
          Size,
          pos_integer(),
          begin
              Conf = #{
                  name => Pool,
                  start_mfa => {pooled_gs, start_link, [{Pool}]},
                  init_count => Size,
                  max_count => Size
              },
              CheckIdle = fun() ->
                  %% Nothing has been checked out yet, so the pool is not utilized
                  pool_is_free(Pool, Size),
                  true
              end,
              with_pool(Conf, CheckIdle)
          end
      ).
  %% Description clause: selected by the atom `doc', returns the property's
  %% human-readable summary string.
  prop_fixed_checkout_all(doc) ->
      "Can take all members from fixed-size pool. Following attempts will return error. Stats is correct.".

  %% Property: with init_count = max_count = Size, exactly Size members can be
  %% taken; one more attempt yields `error_no_members' and the pool reports
  %% every member as held by the calling process.
  prop_fixed_checkout_all() ->
      Pool = ?FUNCTION_NAME,
      BaseConf = #{
          name => Pool,
          start_mfa => {pooled_gs, start_link, [{Pool}]}
      },
      ?FORALL(
          Size,
          pos_integer(),
          with_pool(
              BaseConf#{init_count => Size, max_count => Size},
              fun() ->
                  %% Every one of the Size checkouts must hand back a pid
                  Members = take_n(Pool, 0, Size),
                  ?assert(lists:all(fun erlang:is_pid/1, Members)),
                  %% Fixed pool - can't take more members than pool size
                  Overflow = pooler:take_member(Pool, 10),
                  ?assertEqual(error_no_members, Overflow),
                  %% Pool is fully utilized by this process
                  pool_is_utilized(Pool, self(), Size),
                  true
              end
          )
      ).
  %% Description clause: selected by the atom `doc', returns the property's
  %% human-readable summary string.
  prop_dynamic_checkout(doc) ->
      "It's possible to take all fixed and then all dynamic members, but no more than max_count; stats is correct".

  %% Property: with init_count = Size and max_count = Size + Extra, the first
  %% Size members can be taken without waiting, a further Extra members can be
  %% taken with a 1000 timeout, and any checkout beyond max_count returns
  %% `error_no_members'.
  prop_dynamic_checkout() ->
      Pool = ?FUNCTION_NAME,
      BaseConf = #{
          name => Pool,
          max_age => {1, min},
          start_mfa => {pooled_gs, start_link, [{Pool}]}
      },
      AllPids = fun(Results) -> lists:all(fun erlang:is_pid/1, Results) end,
      ?FORALL(
          {Size, Extra},
          {pos_integer(), pos_integer()},
          begin
              MaxCount = Size + Extra,
              with_pool(
                  BaseConf#{init_count => Size, max_count => MaxCount},
                  fun() ->
                      ?assert(AllPids(take_n(Pool, 0, Size))),
                      %% Fixed part of the pool is fully utilized up to init_count
                      pool_is_utilized(Pool, self(), Size),
                      %% Take all dynamic workers
                      ?assert(AllPids(take_n(Pool, 1000, Extra))),
                      %% Pool is fully utilized now
                      ?assertEqual(error_no_members, pooler:take_member(Pool, 10)),
                      %% Dynamic pool is fully utilized up to max_count
                      pool_is_utilized(Pool, self(), MaxCount),
                      true
                  end
              )
          end
      ).
  %% Description clause: selected by the atom `doc', returns the property's
  %% human-readable summary string.
  prop_fixed_take_return(doc) ->
      "The state of the pool is same before all members are taken and after they are returned".

  %% Property: taking every member of a fixed-size pool and returning them all
  %% leaves both the utilization map and the sorted {Pid, State} snapshot
  %% exactly as they were before the checkout.
  prop_fixed_take_return() ->
      Pool = ?FUNCTION_NAME,
      BaseConf = #{
          name => Pool,
          start_mfa => {pooled_gs, start_link, [{Pool}]}
      },
      %% Sorted {Pid, State} pairs extracted from pooler:pool_stats/1
      Snapshot = fun() ->
          Pairs = [{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(Pool)],
          lists:sort(Pairs)
      end,
      ?FORALL(
          Size,
          pos_integer(),
          with_pool(
              BaseConf#{init_count => Size, max_count => Size},
              fun() ->
                  UtilBefore = utilization(Pool),
                  SnapBefore = Snapshot(),
                  Members = take_n(Pool, 0, Size),
                  ?assert(lists:all(fun erlang:is_pid/1, Members)),
                  pool_is_utilized(Pool, self(), Size),
                  %% Hand every member back; the return values are not inspected
                  lists:foreach(
                      fun(Member) -> pooler:return_member(Pool, Member) end,
                      Members
                  ),
                  pool_is_free(Pool, Size),
                  UtilAfter = utilization(Pool),
                  SnapAfter = Snapshot(),
                  ?assertEqual(UtilBefore, UtilAfter),
                  ?assertEqual(SnapBefore, SnapAfter),
                  true
              end
          )
      ).
  %% Description clause: selected by the atom `doc', returns the property's
  %% human-readable summary string.
  prop_fixed_take_return_broken(doc) ->
      "Pool recovers to initial state when all members are returned with 'fail' flag, but workers are replaced".

  %% Property: when every member is returned with the `fail' status, the pool
  %% eventually reports the same utilization and the same member states as
  %% before, while none of the original worker pids are still present.
  prop_fixed_take_return_broken() ->
      Pool = ?FUNCTION_NAME,
      BaseConf = #{
          name => Pool,
          start_mfa => {pooled_gs, start_link, [{Pool}]}
      },
      %% Sorted {Pid, State} pairs extracted from pooler:pool_stats/1
      Snapshot = fun() ->
          Pairs = [{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(Pool)],
          lists:sort(Pairs)
      end,
      ?FORALL(
          Size,
          pos_integer(),
          with_pool(
              BaseConf#{init_count => Size, max_count => Size},
              fun() ->
                  UtilBefore = utilization(Pool),
                  SnapBefore = Snapshot(),
                  Members = take_n(Pool, 0, Size),
                  ?assert(lists:all(fun erlang:is_pid/1, Members)),
                  pool_is_utilized(Pool, self(), Size),
                  lists:foreach(
                      fun(Member) -> pooler:return_member(Pool, Member, fail) end,
                      Members
                  ),
                  %% Since failed workers are replaced asynchronously, we need to
                  %% wait for pool to recover
                  Recovered = fun(#{free_count := Free, starting_count := Starting}) ->
                      Free =:= Size andalso Starting =:= 0
                  end,
                  UtilAfter = wait_for_utilization(Pool, 5000, Recovered),
                  pool_is_free(Pool, Size),
                  SnapAfter = Snapshot(),
                  ?assertEqual(UtilBefore, UtilAfter),
                  {PidsBefore, StatesBefore} = lists:unzip(SnapBefore),
                  {PidsAfter, StatesAfter} = lists:unzip(SnapAfter),
                  %% all workers have status `free` before and after
                  ?assertEqual(StatesBefore, StatesAfter),
                  %% however, all workers are new processes, none reused
                  OldSet = ordsets:from_list(PidsBefore),
                  NewSet = ordsets:from_list(PidsAfter),
                  ?assertEqual([], ordsets:intersection(OldSet, NewSet)),
                  true
              end
          )
      ).
  %% Description clause: selected by the atom `doc', returns the property's
  %% human-readable summary string.
  prop_fixed_client_died(doc) ->
      "Pool recovers to initial state when client that have taken processes have died with reason 'normal'".

  %% Property: a separate client process takes every member of a fixed-size
  %% pool and then exits with reason `normal'. Afterwards the pool must return
  %% to the utilization and member snapshot it had before the checkout.
  %%
  %% The client is started with spawn_monitor, so if it crashes (for example
  %% because its own assertion fails) or exits with an unexpected reason, the
  %% property fails immediately with that reason instead of waiting for the
  %% 5000 ms receive timeout and reporting a bare `timeout'.
  prop_fixed_client_died() ->
      Conf0 = #{
          name => ?FUNCTION_NAME,
          start_mfa => {pooled_gs, start_link, [{?FUNCTION_NAME}]}
      },
      %% Sorted {Pid, State} pairs extracted from pooler:pool_stats/1
      Stats = fun() ->
          lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(?FUNCTION_NAME)])
      end,
      ?FORALL(
          Size,
          pos_integer(),
          with_pool(
              Conf0#{
                  init_count => Size,
                  max_count => Size
              },
              fun() ->
                  Main = self(),
                  UtilizationBefore = utilization(?FUNCTION_NAME),
                  StatsBefore = Stats(),
                  {Pid, MRef} =
                      erlang:spawn_monitor(
                          fun() ->
                              Taken = take_n(?FUNCTION_NAME, 0, Size),
                              ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
                              Main ! {taken, self()},
                              %% Hold the members until the parent says to finish
                              receive
                                  {finish, Main} -> ok
                              after 5000 ->
                                  exit(timeout)
                              end,
                              exit(normal)
                          end
                      ),
                  %% Wait for spawned client to take all workers; a 'DOWN' at this
                  %% point means the client died before reporting success
                  receive
                      {taken, Pid} ->
                          ok;
                      {'DOWN', MRef, process, Pid, EarlyReason} ->
                          error({client_died_before_checkout, EarlyReason})
                  after 5000 ->
                      error(timeout)
                  end,
                  pool_is_utilized(?FUNCTION_NAME, Pid, Size),
                  %% Wait for the client to die
                  Pid ! {finish, self()},
                  receive
                      {'DOWN', MRef, process, Pid, ExitReason} ->
                          %% Any exit reason other than `normal' is reported as-is
                          ?assertEqual(normal, ExitReason)
                  after 5000 ->
                      error(timeout)
                  end,
                  %% Since worker monitors are asynchronous, we need to wait for pool to recover
                  UtilizationAfter =
                      wait_for_utilization(
                          ?FUNCTION_NAME,
                          5000,
                          fun(#{free_count := Free, in_use_count := InUse}) ->
                              Free =:= Size andalso InUse =:= 0
                          end
                      ),
                  pool_is_free(?FUNCTION_NAME, Size),
                  StatsAfter = Stats(),
                  ?assertEqual(UtilizationBefore, UtilizationAfter),
                  ?assertEqual(StatsBefore, StatsAfter),
                  true
              end
          )
      ).
  %% Description clause: selected by the atom `doc', returns the property's
  %% human-readable summary string.
  prop_group_take_return(doc) ->
      "Take all workers from all group members - no more workers can be taken. Return them - pools are free.".

  %% Property: NumPools fixed-size pools of NumWorkers members each are placed
  %% in one group. All NumWorkers * NumPools members can be taken through the
  %% group, the next group checkout returns `error_no_members', and after all
  %% members are returned every pool reports itself free again.
  prop_group_take_return() ->
      Group = ?FUNCTION_NAME,
      Prefix = atom_to_list(Group),
      %% Pool names are the group name followed by the pool index
      NameOf = fun(I) -> list_to_atom(Prefix ++ integer_to_list(I)) end,
      ?FORALL(
          {NumWorkers, NumPools},
          {pos_integer(), pos_integer()},
          begin
              Confs = lists:map(
                  fun(I) ->
                      #{
                          start_mfa => {pooled_gs, start_link, [{Group}]},
                          name => NameOf(I),
                          init_count => NumWorkers,
                          max_count => NumWorkers,
                          group => Group
                      }
                  end,
                  lists:seq(1, NumPools)
              ),
              with_pools(
                  Confs,
                  fun() ->
                      Client = self(),
                      %% Group registration is asynchronous, so, need to wait for it to happen
                      GroupPoolPids = wait_for_group_size(Group, NumPools, 5000),
                      %% All pools are members of the group
                      ?assertEqual(NumPools, length(GroupPoolPids)),
                      %% It's possible to take all workers from all members of a group
                      Taken = group_take_n(Group, NumWorkers * NumPools),
                      ?assert(lists:all(fun erlang:is_pid/1, Taken)),
                      %% All pools are saturated
                      ?assertEqual(error_no_members, pooler:take_group_member(Group)),
                      %% All pools are utilized
                      [pool_is_utilized(Pool, Client, NumWorkers) || Pool <- GroupPoolPids],
                      %% Now return all the workers
                      lists:foreach(
                          fun(Member) -> ok = pooler:return_group_member(Group, Member) end,
                          Taken
                      ),
                      %% All pools are free
                      [pool_is_free(Pool, NumWorkers) || Pool <- GroupPoolPids],
                      true
                  end
              )
          end
      ).
  303. %% Helpers
  %% Calls pooler:take_member(Pool, Timeout) N times, in order, and returns the
  %% list of results (first call first). N = 0 yields the empty list.
  take_n(Pool, Timeout, N) when is_integer(N), N >= 0 ->
      [pooler:take_member(Pool, Timeout) || _ <- lists:seq(1, N)].
  %% Calls pooler:take_group_member(Group) N times, in order, and returns the
  %% list of results (first call first). N = 0 yields the empty list.
  group_take_n(Group, N) when is_integer(N), N >= 0 ->
      [pooler:take_group_member(Group) || _ <- lists:seq(1, N)].
  %% Single-pool convenience wrapper: runs Fun via with_pools/2 with Conf as
  %% the only pool configuration and returns whatever with_pools/2 returns.
  with_pool(Conf, Fun) ->
      Confs = [Conf],
      with_pools(Confs, Fun).
  %% Runs Fun with one pool created per configuration map in Confs.
  %%
  %% Starts the process-group backend and the pooler application, creates the
  %% pools, calls Fun(), removes the pools (looked up by their `name' key) and
  %% returns Fun's result. The pooler application is stopped in the `after'
  %% section, so it is stopped even when Fun or any setup step raises.
  with_pools(Confs, Fun) ->
      pg_start(),
      %% Disable SASL logs
      logger:set_handler_config(default, filters, []),
      try
          {ok, _} = application:ensure_all_started(pooler),
          lists:foreach(
              fun(Conf) -> {ok, _} = pooler:new_pool(Conf) end,
              Confs
          ),
          Result = Fun(),
          lists:foreach(
              fun(Conf) -> ok = pooler:rm_pool(maps:get(name, Conf)) end,
              Confs
          ),
          Result
      after
          application:stop(pooler)
      end.
  %% Polls utilization(Pool) until Fun returns `true' for it, then returns that
  %% utilization map. Each unsuccessful poll sleeps 50 ms and subtracts 50 from
  %% the remaining budget; once the budget is no longer positive the call
  %% raises error(timeout).
  wait_for_utilization(_Pool, Remaining, _Pred) when Remaining =< 0 ->
      error(timeout);
  wait_for_utilization(Pool, Remaining, Pred) ->
      Current = utilization(Pool),
      case Pred(Current) of
          false ->
              timer:sleep(50),
              wait_for_utilization(Pool, Remaining - 50, Pred);
          true ->
              Current
      end.
  %% Polls pooler:group_pools(GroupName) until it returns exactly Size entries
  %% and then returns that list. Raises error(group_size_exceeded) if more than
  %% Size entries are seen. While fewer are seen it sleeps 50 ms per attempt,
  %% and raises error(timeout) once the remaining budget is no longer positive.
  wait_for_group_size(_GroupName, _Size, Remaining) when Remaining =< 0 ->
      error(timeout);
  wait_for_group_size(GroupName, Size, Remaining) ->
      Members = pooler:group_pools(GroupName),
      case length(Members) of
          Fewer when Fewer < Size ->
              timer:sleep(50),
              wait_for_group_size(GroupName, Size, Remaining - 50);
          More when More > Size ->
              error(group_size_exceeded);
          Size ->
              Members
      end.
  %% Returns the result of pooler:pool_utilization(Pool) converted from a
  %% key/value list into a map.
  utilization(Pool) ->
      Pairs = pooler:pool_utilization(Pool),
      maps:from_list(Pairs).
  %% Asserts that Pool is fully checked out by Client: the utilization map must
  %% show NumWorkers in use, none free and none queued, and every entry of
  %% pooler:pool_stats(Pool) must carry Client as its state element.
  %% Returns `true' when all assertions hold.
  pool_is_utilized(Pool, Client, NumWorkers) ->
      ?assertMatch(
          #{
              in_use_count := NumWorkers,
              free_count := 0,
              queued_count := 0
          },
          utilization(Pool)
      ),
      %% All members are taken by Client
      HeldByClient = fun({_, {_, Owner, _}}) -> Owner =:= Client end,
      ?assert(lists:all(HeldByClient, pooler:pool_stats(Pool))),
      true.
  %% Asserts that Pool is completely idle: the utilization map must show
  %% nothing in use, NumWorkers free and none queued, and every entry of
  %% pooler:pool_stats(Pool) must carry the state `free'.
  %% Returns `true' when all assertions hold.
  pool_is_free(Pool, NumWorkers) ->
      ?assertMatch(
          #{
              in_use_count := 0,
              free_count := NumWorkers,
              queued_count := 0
          },
          utilization(Pool)
      ),
      %% All members are free
      IsFree = fun({_, {_, State, _}}) -> State =:= free end,
      ?assert(lists:all(IsFree, pooler:pool_stats(Pool))),
      true.
  %% Starts the process-group backend, chosen at compile time from the OTP
  %% release: pg2 before OTP 23, the `pg' scope of the pg module from OTP 23 on.
  -if(?OTP_RELEASE < 23).
  pg_start() ->
      pg2:start().
  -else.
  pg_start() ->
      pg:start(pg).
  -endif.