pidq_test.erl 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. -module(pidq_test).
  2. -include_lib("eunit/include/eunit.hrl").
  3. -compile([export_all]).
  4. % The `user' processes represent users of the pidq library. A user
  5. % process will take a pid, report details on the pid it has, release
  6. % and take a new pid, stop cleanly, and crash.
  % Starts a user process and returns its pid.
  %
  % The new process first takes a pid from pidq via pidq:take_pid/0 and then
  % enters user_loop/1 holding whatever that call returned. spawn/1 is used
  % (no link), so a crash of the user process does not propagate to the
  % caller.
  start_user() ->
      UserBody = fun() ->
          user_loop(pidq:take_pid())
      end,
      spawn(UserBody).
  % Asks the user process Pid which tc it currently holds and returns the
  % {Type, Id} tuple the user replies with.
  %
  % The user process is monitored for the duration of the request so that a
  % dead or dying user is reported immediately instead of blocking the
  % caller forever. Failure modes (all raised with erlang:error/1):
  %   {user_down, Pid, Reason}  - the user process is not alive or died
  %                               before replying.
  %   {user_tc_timeout, Pid}    - the user replied with the atom `timeout',
  %                               which is what user_loop/1 forwards when
  %                               get_tc_id/1 gave up on the tc process.
  %   {user_id_timeout, Pid}    - nothing arrived within 1000 ms.
  user_id(Pid) ->
      MRef = erlang:monitor(process, Pid),
      Pid ! {get_tc_id, self()},
      receive
          {'DOWN', MRef, process, _Object, Reason} ->
              erlang:error({user_down, Pid, Reason});
          timeout ->
              % Consume the forwarded timeout so it does not linger in the
              % mailbox, then fail loudly.
              erlang:demonitor(MRef, [flush]),
              erlang:error({user_tc_timeout, Pid});
          {Type, Id} ->
              erlang:demonitor(MRef, [flush]),
              {Type, Id}
      % Must exceed the 200 ms that get_tc_id/1 may spend inside user_loop/1.
      after 1000 ->
          erlang:demonitor(MRef, [flush]),
          erlang:error({user_id_timeout, Pid})
      end.
  % Tells the user process to swap its tc: user_loop/1 reacts to `new_tc' by
  % returning its current pid to pidq and taking another one. The message is
  % sent asynchronously; the return value is the message itself (new_tc).
  user_new_tc(UserPid) ->
      erlang:send(UserPid, new_tc).
  % Tells the user process to stop: user_loop/1 reacts to `stop' by returning
  % its pid to pidq and leaving the loop. The message is sent asynchronously;
  % the return value is the message itself (stop).
  user_stop(UserPid) ->
      erlang:send(UserPid, stop).
  % Tells the user process to crash: user_loop/1 reacts to `crash' by raising
  % {user_loop, kaboom}. The message is sent asynchronously; the return value
  % is the message itself (crash).
  user_crash(UserPid) ->
      erlang:send(UserPid, crash).
  % Main loop of a user process. Tc is the pid currently held by this user.
  %
  % Messages understood:
  %   stop               - return Tc to pidq with status ok, then leave the
  %                        loop with the value `stopped'.
  %   crash              - raise {user_loop, kaboom} without returning Tc.
  %   new_tc             - return Tc to pidq with status ok, take a new pid
  %                        and keep looping with it.
  %   {get_tc_id, From}  - send From the result of get_tc_id(Tc) and keep
  %                        looping with the same Tc.
  user_loop(Tc) ->
      receive
          stop ->
              pidq:return_pid(Tc, ok),
              stopped;
          crash ->
              erlang:error({user_loop, kaboom});
          new_tc ->
              pidq:return_pid(Tc, ok),
              user_loop(pidq:take_pid());
          {get_tc_id, ReplyTo} ->
              ReplyTo ! get_tc_id(Tc),
              user_loop(Tc)
      end.
  39. % The `tc' processes represent the pids tracked by pidq for testing.
  40. % They have a type and an ID and can report their type and ID and
  41. % stop.
  % Main loop of a tc process identified by {Type, Id}.
  %
  % Messages understood:
  %   {get_id, From} - reply to From with {ok, Type, Id} and keep looping.
  %   stop           - leave the loop with the value `stopped'.
  %   crash          - raise {tc_loop, kaboom}.
  tc_loop({Type, Id} = State) ->
      receive
          crash ->
              erlang:error({tc_loop, kaboom});
          stop ->
              stopped;
          {get_id, Caller} ->
              Caller ! {ok, Type, Id},
              tc_loop(State)
      end.
  % Asks the tc process Pid for its identity.
  %
  % Sends {get_id, self()} and waits up to 200 ms for an {ok, Type, Id}
  % reply. Returns {Type, Id} on success, or the atom `timeout' if no such
  % reply arrives in time.
  get_tc_id(Pid) ->
      erlang:send(Pid, {get_id, self()}),
      receive
          {ok, Type, Id} -> {Type, Id}
      after 200 ->
          timeout
      end.
  % Asks the tc process Pid to stop by sending it `stop'. Asynchronous; the
  % return value is the message itself (stop). Configured as the pid_stopper
  % in the pidq fixtures of this module.
  stop_tc(TcPid) ->
      erlang:send(TcPid, stop).
  % Starts a tc process of the given Type, linked to the caller, and returns
  % its pid. Each tc gets a fresh reference as its Id. Configured as the
  % pid_starter in the pidq fixtures of this module.
  tc_starter(Type) ->
      State = {Type, make_ref()},
      spawn_link(fun() -> tc_loop(State) end).
  % Asserts that Pid answers get_tc_id/1 with a two-element tuple (i.e. it
  % did not time out). Returns ok when the assertion holds.
  assert_tc_valid(Pid) ->
      TcId = get_tc_id(Pid),
      ?assertMatch({_Type, _Ref}, TcId),
      ok.
  % Sanity check of the tc helpers on their own: two tc processes started
  % with the same type report that type and have distinct ids.
  tc_sanity_test() ->
      First = tc_starter("1"),
      {"1", FirstId} = get_tc_id(First),
      Second = tc_starter("1"),
      {"1", SecondId} = get_tc_id(Second),
      ?assert(FirstId /= SecondId),
      stop_tc(First),
      stop_tc(Second).
  % Sanity check of the user helpers without pidq: a user process is handed
  % a tc directly, must report that tc's identity, and is then told to
  % crash before the tc itself is stopped.
  user_sanity_test() ->
      Tc = tc_starter("1"),
      UserPid = spawn(fun() -> user_loop(Tc) end),
      ?assertMatch({"1", _Ref}, user_id(UserPid)),
      user_crash(UserPid),
      stop_tc(Tc).
  % Basic pidq behaviour, exercised directly from the test process.
  %
  % Every case runs against a freshly started pidq with a single pool "p1"
  % (max_pids 3, min_free 1, init_size 2) whose pids are tc processes of type
  % "type-0"; pidq is stopped again after each case.
  pidq_basics_test_() ->
      Setup = fun() ->
          PoolP1 = [{name, "p1"},
                    {max_pids, 3},
                    {min_free, 1},
                    {init_size, 2},
                    {pid_starter_args, ["type-0"]}],
          pidq:start([{pid_starter, {?MODULE, tc_starter}},
                      {pid_stopper, {?MODULE, stop_tc}},
                      {pools, [PoolP1]}])
      end,
      Cleanup = fun(_StartResult) ->
          pidq:stop()
      end,
      {foreach, Setup, Cleanup,
       [
        {"take and return one",
         fun() ->
             Taken = pidq:take_pid(),
             ?assertMatch({"type-0", _Id}, get_tc_id(Taken)),
             ok = pidq:return_pid(Taken, ok)
         end},

        {"pids are created on demand until max",
         fun() ->
             Taken = [ pidq:take_pid() || _ <- lists:seq(1, 3) ],
             % the pool is now exhausted
             ?assertMatch(error_no_pids, pidq:take_pid()),
             ?assertMatch(error_no_pids, pidq:take_pid()),
             TcIds = lists:map(fun get_tc_id/1, Taken),
             Refs = [ Ref || {_Type, Ref} <- TcIds ],
             % no duplicates
             ?assertEqual(length(Refs), length(lists:usort(Refs)))
         end},

        {"pids are reused most recent return first",
         fun() ->
             First = pidq:take_pid(),
             Second = pidq:take_pid(),
             ?assertNot(First == Second),
             ok = pidq:return_pid(First, ok),
             ok = pidq:return_pid(Second, ok),
             % the pid returned last comes back first
             ?assertEqual(Second, pidq:take_pid()),
             ?assertEqual(First, pidq:take_pid())
         end},

        {"if a pid crashes it is replaced",
         fun() ->
             OldPids = [ pidq:take_pid() || _ <- lists:seq(1, 3) ],
             OldIds = lists:map(fun get_tc_id/1, OldPids),
             % crash them all
             lists:foreach(fun(P) -> P ! crash end, OldPids),
             NewPids = get_n_pids(3, []),
             NewIds = lists:map(fun get_tc_id/1, NewPids),
             lists:foreach(
               fun(Id) -> ?assertNot(lists:member(Id, OldIds)) end,
               NewIds)
         end},

        {"if a pid is returned with bad status it is replaced",
         fun() ->
             OldPids = [ pidq:take_pid() || _ <- lists:seq(1, 3) ],
             OldIds = lists:map(fun get_tc_id/1, OldPids),
             % return them all marking as bad
             lists:foreach(fun(P) -> pidq:return_pid(P, fail) end, OldPids),
             NewPids = get_n_pids(3, []),
             NewIds = lists:map(fun get_tc_id/1, NewPids),
             lists:foreach(
               fun(Id) -> ?assertNot(lists:member(Id, OldIds)) end,
               NewIds)
         end},

        {"if a consumer crashes, pid is replaced",
         fun() ->
             Consumer = start_user(),
             IdBefore = user_id(Consumer),
             ?debugVal(pidq:pool_stats("p1")),
             user_crash(Consumer),
             [Replacement | _] = get_n_pids(1, []),
             IdAfter = get_tc_id(Replacement),
             ?debugVal(pidq:pool_stats("p1")),
             ?assertNot(IdAfter == IdBefore)
         end}
       ]}.
  % Integration tests driving pidq through separate user processes.
  %
  % Every case runs against a freshly started pidq with a single pool "p1"
  % (max_pids 10, min_free 3, init_size 10) and ten user processes, each of
  % which takes a pid on start-up. The list of user pids is the fixture value
  % handed to each case; cleanup sends every user `stop' and stops pidq.
  pidq_integration_test_() ->
      Setup = fun() ->
          PoolP1 = [{name, "p1"},
                    {max_pids, 10},
                    {min_free, 3},
                    {init_size, 10},
                    {pid_starter_args, ["type-0"]}],
          pidq:start([{pid_starter, {?MODULE, tc_starter}},
                      {pid_stopper, {?MODULE, stop_tc}},
                      {pools, [PoolP1]}]),
          [ start_user() || _ <- lists:seq(1, 10) ]
      end,
      Cleanup = fun(Users) ->
          lists:foreach(fun user_stop/1, Users),
          pidq:stop()
      end,

      % each user has a different tc ID
      UniqueIds = fun(Users) ->
          fun() ->
              Sorted = lists:sort(lists:map(fun user_id/1, Users)),
              ?assertEqual(lists:usort(Sorted), Sorted)
          end
      end,

      % users still unique after a renew cycle
      UniqueAfterRenew = fun(Users) ->
          fun() ->
              lists:foreach(fun user_new_tc/1, Users),
              Sorted = lists:sort(lists:map(fun user_id/1, Users)),
              ?assertEqual(lists:usort(Sorted), Sorted)
          end
      end,

      % all users crash, pids are replaced
      ReplacedAfterCrash = fun(Users) ->
          fun() ->
              IdsBefore = lists:sort(lists:map(fun user_id/1, Users)),
              lists:foreach(fun user_crash/1, Users),
              NewUsers = [ start_user() || _ <- lists:seq(1, 5) ],
              IdsAfter = lists:sort(lists:map(fun user_id/1, NewUsers)),
              Shared = sets:intersection(sets:from_list(IdsBefore),
                                         sets:from_list(IdsAfter)),
              ?assertEqual([], sets:to_list(Shared))
          end
      end,

      {foreach, Setup, Cleanup,
       [UniqueIds, UniqueAfterRenew, ReplacedAfterCrash]}.
  % Testing crash recovery is inherently racy: the pids may not have crashed
  % yet, or pidq may not have recovered yet. This helper therefore keeps
  % calling pidq:take_pid/0, ignoring error_no_pids, and loops forever until
  % N pids have been obtained.
  %
  % Returns Acc with the newly taken pids prepended, most recently taken
  % first.
  get_n_pids(0, Acc) ->
      Acc;
  get_n_pids(N, Acc) ->
      {Remaining, NextAcc} =
          case pidq:take_pid() of
              error_no_pids -> {N, Acc};
              Pid -> {N - 1, [Pid | Acc]}
          end,
      get_n_pids(Remaining, NextAcc).