prop_pooler.erl 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. -module(prop_pooler).
  2. -export([
  3. prop_fixed_start/1,
  4. prop_fixed_checkout_all/1,
  5. prop_dynamic_checkout/1,
  6. prop_fixed_take_return/1,
  7. prop_fixed_take_return_broken/1,
  8. prop_fixed_client_died/1,
  9. prop_group_take_return/1
  10. ]).
  11. -include_lib("proper/include/proper.hrl").
  12. -include_lib("stdlib/include/assert.hrl").
  13. -include("pooler.hrl").
  14. prop_fixed_start(doc) ->
  15. "Check that the pool of any fixed size can be started, internal statistics is correct".
  16. prop_fixed_start() ->
  17. Conf0 = [
  18. {name, ?FUNCTION_NAME},
  19. {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
  20. ],
  21. ?FORALL(
  22. Size,
  23. pos_integer(),
  24. with_pool(
  25. [
  26. {init_count, Size},
  27. {max_count, Size}
  28. | Conf0
  29. ],
  30. fun() ->
  31. %% Pool is not utilized
  32. pool_is_free(?FUNCTION_NAME, Size),
  33. true
  34. end
  35. )
  36. ).
  37. prop_fixed_checkout_all(doc) ->
  38. "Can take all members from fixed-size pool. Following attempts will return error. Stats is correct.".
  39. prop_fixed_checkout_all() ->
  40. Conf0 = [
  41. {name, ?FUNCTION_NAME},
  42. {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
  43. ],
  44. ?FORALL(
  45. Size,
  46. pos_integer(),
  47. with_pool(
  48. [
  49. {init_count, Size},
  50. {max_count, Size}
  51. | Conf0
  52. ],
  53. fun() ->
  54. ?assert(
  55. lists:all(
  56. fun(Res) -> is_pid(Res) end,
  57. take_n(?FUNCTION_NAME, 0, Size)
  58. )
  59. ),
  60. %% Fixed pool - can't take more members than pool size
  61. ?assertEqual(error_no_members, pooler:take_member(?FUNCTION_NAME, 10)),
  62. %% Pool is fully utilized
  63. pool_is_utilized(?FUNCTION_NAME, self(), Size),
  64. true
  65. end
  66. )
  67. ).
  68. prop_dynamic_checkout(doc) ->
  69. "It's possible to take all fixed and then all dynamic members, but no more than max_count; stats is correct".
  70. prop_dynamic_checkout() ->
  71. Conf0 = [
  72. {name, ?FUNCTION_NAME},
  73. {max_age, {1, min}},
  74. {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
  75. ],
  76. ?FORALL(
  77. {Size, Extra},
  78. {pos_integer(), pos_integer()},
  79. with_pool(
  80. [
  81. {init_count, Size},
  82. {max_count, Size + Extra}
  83. | Conf0
  84. ],
  85. fun() ->
  86. MaxCount = Size + Extra,
  87. ?assert(
  88. lists:all(
  89. fun(Res) -> is_pid(Res) end,
  90. take_n(?FUNCTION_NAME, 0, Size)
  91. )
  92. ),
  93. %% Fixed pool is fully utilized up to init_count
  94. pool_is_utilized(?FUNCTION_NAME, self(), Size),
  95. %% Take all dynamic workers
  96. ?assert(
  97. lists:all(
  98. fun(Res) -> is_pid(Res) end,
  99. take_n(?FUNCTION_NAME, 1000, Extra)
  100. )
  101. ),
  102. %% Pool is fully utilized now
  103. ?assertEqual(error_no_members, pooler:take_member(?FUNCTION_NAME, 10)),
  104. %% Dynamic pool is fully utilized up to max_count
  105. pool_is_utilized(?FUNCTION_NAME, self(), MaxCount),
  106. true
  107. end
  108. )
  109. ).
  110. prop_fixed_take_return(doc) ->
  111. "The state of the pool is same before all members are taken and after they are returned".
  112. prop_fixed_take_return() ->
  113. Conf0 = [
  114. {name, ?FUNCTION_NAME},
  115. {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
  116. ],
  117. Stats = fun() ->
  118. lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(?FUNCTION_NAME)])
  119. end,
  120. ?FORALL(
  121. Size,
  122. pos_integer(),
  123. with_pool(
  124. [
  125. {init_count, Size},
  126. {max_count, Size}
  127. | Conf0
  128. ],
  129. fun() ->
  130. UtilizationBefore = utilization(?FUNCTION_NAME),
  131. StatsBefore = Stats(),
  132. Taken = take_n(?FUNCTION_NAME, 0, Size),
  133. ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
  134. pool_is_utilized(?FUNCTION_NAME, self(), Size),
  135. [pooler:return_member(?FUNCTION_NAME, Pid) || Pid <- Taken],
  136. pool_is_free(?FUNCTION_NAME, Size),
  137. UtilizationAfter = utilization(?FUNCTION_NAME),
  138. StatsAfter = Stats(),
  139. ?assertEqual(UtilizationBefore, UtilizationAfter),
  140. ?assertEqual(StatsBefore, StatsAfter),
  141. true
  142. end
  143. )
  144. ).
  145. prop_fixed_take_return_broken(doc) ->
  146. "Pool recovers to initial state when all members are returned with 'fail' flag, but workers are replaced".
  147. prop_fixed_take_return_broken() ->
  148. Conf0 = [
  149. {name, ?FUNCTION_NAME},
  150. {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
  151. ],
  152. Stats = fun() ->
  153. lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(?FUNCTION_NAME)])
  154. end,
  155. ?FORALL(
  156. Size,
  157. pos_integer(),
  158. with_pool(
  159. [
  160. {init_count, Size},
  161. {max_count, Size}
  162. | Conf0
  163. ],
  164. fun() ->
  165. UtilizationBefore = utilization(?FUNCTION_NAME),
  166. StatsBefore = Stats(),
  167. Taken = take_n(?FUNCTION_NAME, 0, Size),
  168. ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
  169. pool_is_utilized(?FUNCTION_NAME, self(), Size),
  170. [pooler:return_member(?FUNCTION_NAME, Pid, fail) || Pid <- Taken],
  171. %% Since failed workers are replaced asynchronously, we need to wait for pool to recover
  172. UtilizationAfter =
  173. wait_for_utilization(
  174. ?FUNCTION_NAME,
  175. 5000,
  176. fun(#{free_count := Free, starting_count := Starting}) ->
  177. Free =:= Size andalso Starting =:= 0
  178. end
  179. ),
  180. pool_is_free(?FUNCTION_NAME, Size),
  181. StatsAfter = Stats(),
  182. ?assertEqual(UtilizationBefore, UtilizationAfter),
  183. {PidsBefore, StatusBefore} = lists:unzip(StatsBefore),
  184. {PidsAfter, StatusAfter} = lists:unzip(StatsAfter),
  185. %% all workers have status `free` before and after
  186. ?assertEqual(StatusBefore, StatusAfter),
  187. %% however, all workers are new processes, none reused
  188. ?assertEqual([], ordsets:intersection(ordsets:from_list(PidsBefore), ordsets:from_list(PidsAfter))),
  189. true
  190. end
  191. )
  192. ).
  193. prop_fixed_client_died(doc) ->
  194. "Pool recovers to initial state when client that have taken processes have died with reason 'normal'".
  195. prop_fixed_client_died() ->
  196. Conf0 = [
  197. {name, ?FUNCTION_NAME},
  198. {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
  199. ],
  200. Stats = fun() ->
  201. lists:sort([{Pid, State} || {Pid, {_, State, _}} <- pooler:pool_stats(?FUNCTION_NAME)])
  202. end,
  203. ?FORALL(
  204. Size,
  205. pos_integer(),
  206. with_pool(
  207. [
  208. {init_count, Size},
  209. {max_count, Size}
  210. | Conf0
  211. ],
  212. fun() ->
  213. Main = self(),
  214. UtilizationBefore = utilization(?FUNCTION_NAME),
  215. StatsBefore = Stats(),
  216. {Pid, MRef} =
  217. erlang:spawn_monitor(
  218. fun() ->
  219. Taken = take_n(?FUNCTION_NAME, 0, Size),
  220. ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
  221. Main ! {taken, self()},
  222. receive
  223. {finish, Main} -> ok
  224. after 5000 ->
  225. exit(timeout)
  226. end,
  227. exit(normal)
  228. end
  229. ),
  230. %% Wait for spawned client to take all workers
  231. receive
  232. {taken, Pid} -> ok
  233. after 5000 ->
  234. error(timeout)
  235. end,
  236. pool_is_utilized(?FUNCTION_NAME, Pid, Size),
  237. %% Wait for the client to die
  238. Pid ! {finish, self()},
  239. receive
  240. {'DOWN', MRef, process, Pid, normal} ->
  241. ok
  242. after 5000 ->
  243. error(timeout)
  244. end,
  245. %% Since worker monitors are asynchronous, we need to wait for pool to recover
  246. UtilizationAfter =
  247. wait_for_utilization(
  248. ?FUNCTION_NAME,
  249. 5000,
  250. fun(#{free_count := Free, in_use_count := InUse}) ->
  251. Free =:= Size andalso InUse =:= 0
  252. end
  253. ),
  254. pool_is_free(?FUNCTION_NAME, Size),
  255. StatsAfter = Stats(),
  256. ?assertEqual(UtilizationBefore, UtilizationAfter),
  257. ?assertEqual(StatsBefore, StatsAfter),
  258. true
  259. end
  260. )
  261. ).
  262. prop_group_take_return(doc) ->
  263. "Take all workers from all group members - no more workers can be taken. Return them - pools are free.".
  264. prop_group_take_return() ->
  265. Conf0 = [
  266. {start_mfa, {pooled_gs, start_link, [{?FUNCTION_NAME}]}}
  267. ],
  268. PoolName = fun(I) -> list_to_atom(atom_to_list(?FUNCTION_NAME) ++ integer_to_list(I)) end,
  269. ?FORALL(
  270. {NumWorkers, NumPools},
  271. {pos_integer(), pos_integer()},
  272. begin
  273. with_pools(
  274. [
  275. [
  276. {name, PoolName(I)},
  277. {init_count, NumWorkers},
  278. {max_count, NumWorkers},
  279. {group, ?FUNCTION_NAME}
  280. | Conf0
  281. ]
  282. || I <- lists:seq(1, NumPools)
  283. ],
  284. fun() ->
  285. Client = self(),
  286. %% Group registration is asynchronous, so, need to wait for it to happen
  287. GroupPoolPids = wait_for_group_size(?FUNCTION_NAME, NumPools, 5000),
  288. %% All pools are members of the group
  289. ?assertEqual(NumPools, length(GroupPoolPids)),
  290. %% It's possible to take all workers from all members of a group
  291. Taken = group_take_n(?FUNCTION_NAME, NumWorkers * NumPools),
  292. ?assert(lists:all(fun(Res) -> is_pid(Res) end, Taken)),
  293. %% All pools are saturated
  294. ?assertEqual(error_no_members, pooler:take_group_member(?FUNCTION_NAME)),
  295. %% All pools are utilized
  296. lists:foreach(
  297. fun(Pool) -> pool_is_utilized(Pool, Client, NumWorkers) end,
  298. GroupPoolPids
  299. ),
  300. %% Now return all the workers
  301. [ok = pooler:return_group_member(?FUNCTION_NAME, Pid) || Pid <- Taken],
  302. %% All pools are free
  303. lists:foreach(
  304. fun(Pool) -> pool_is_free(Pool, NumWorkers) end,
  305. GroupPoolPids
  306. ),
  307. true
  308. end
  309. )
  310. end
  311. ).
  312. %% Helpers
  313. take_n(Pool, Timeout, N) when N > 0 ->
  314. [pooler:take_member(Pool, Timeout) | take_n(Pool, Timeout, N - 1)];
  315. take_n(_Pool, _Timeout, 0) ->
  316. [].
  317. group_take_n(Group, N) when N > 0 ->
  318. [pooler:take_group_member(Group) | group_take_n(Group, N - 1)];
  319. group_take_n(_Group, 0) ->
  320. [].
  321. with_pool(Conf, Fun) ->
  322. with_pools([Conf], Fun).
  323. with_pools(Confs, Fun) ->
  324. pg_start(),
  325. %% Disable SASL logs
  326. logger:set_handler_config(default, filters, []),
  327. try
  328. {ok, _} = application:ensure_all_started(pooler),
  329. [{ok, _} = pooler:new_pool(Conf) || Conf <- Confs],
  330. Res = Fun(),
  331. [ok = pooler:rm_pool(proplists:get_value(name, Conf)) || Conf <- Confs],
  332. Res
  333. after
  334. application:stop(pooler)
  335. end.
  336. wait_for_utilization(Pool, Timeout, Fun) when Timeout > 0 ->
  337. Utilization = utilization(Pool),
  338. case Fun(Utilization) of
  339. true ->
  340. Utilization;
  341. false ->
  342. timer:sleep(50),
  343. wait_for_utilization(Pool, Timeout - 50, Fun)
  344. end;
  345. wait_for_utilization(_, _, _) ->
  346. error(timeout).
  347. wait_for_group_size(GroupName, Size, Timeout) when Timeout > 0 ->
  348. Pools = pooler:group_pools(GroupName),
  349. case length(Pools) of
  350. Size ->
  351. Pools;
  352. Larger when Larger > Size ->
  353. error(group_size_exceeded);
  354. Smaller when Smaller < Size ->
  355. timer:sleep(50),
  356. wait_for_group_size(GroupName, Size, Timeout - 50)
  357. end;
  358. wait_for_group_size(_, _, _) ->
  359. error(timeout).
  360. utilization(Pool) ->
  361. maps:from_list(pooler:pool_utilization(Pool)).
  362. pool_is_utilized(Pool, Client, NumWorkers) ->
  363. Utilization = utilization(Pool),
  364. ?assertMatch(
  365. #{
  366. in_use_count := NumWorkers,
  367. free_count := 0,
  368. queued_count := 0
  369. },
  370. Utilization
  371. ),
  372. %% All members are taken by Client
  373. ?assert(
  374. lists:all(
  375. fun({_, {_, State, _}}) -> State =:= Client end,
  376. pooler:pool_stats(Pool)
  377. )
  378. ),
  379. true.
  380. pool_is_free(Pool, NumWorkers) ->
  381. Utilization = utilization(Pool),
  382. ?assertMatch(
  383. #{
  384. in_use_count := 0,
  385. free_count := NumWorkers,
  386. queued_count := 0
  387. },
  388. Utilization
  389. ),
  390. %% All members are free
  391. ?assert(
  392. lists:all(
  393. fun({_, {_, State, _}}) -> State =:= free end,
  394. pooler:pool_stats(Pool)
  395. )
  396. ),
  397. true.
  398. -if(?OTP_RELEASE >= 23).
  399. pg_start() ->
  400. pg:start(pg).
  401. -else.
  402. pg_start() ->
  403. pg2:start().
  404. -endif.