test_social_pipeline.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767
  1. import json
  2. from django.contrib.auth import get_user_model
  3. from django.core import mail
  4. from django.test import RequestFactory
  5. from social_core.backends.github import GithubOAuth2
  6. from social_django.utils import load_strategy
  7. from ...acl import ACL_CACHE
  8. from ...acl.useracl import get_user_acl
  9. from ...conf.dynamicsettings import DynamicSettings
  10. from ...conf.test import override_dynamic_settings
  11. from ...conftest import get_cache_versions
  12. from ...core.exceptions import SocialAuthBanned, SocialAuthFailed
  13. from ...legal.models import Agreement
  14. from ..models import AnonymousUser, Ban, BanCache
  15. from ..social.pipeline import (
  16. associate_by_email,
  17. create_user,
  18. create_user_with_form,
  19. get_username,
  20. require_activation,
  21. validate_ip_not_banned,
  22. validate_user_not_banned,
  23. )
  24. from ..test import UserTestCase
  25. User = get_user_model()
  26. def create_request(user_ip="0.0.0.0", data=None):
  27. factory = RequestFactory()
  28. if data is None:
  29. request = factory.get("/")
  30. else:
  31. request = factory.post(
  32. "/", data=json.dumps(data), content_type="application/json"
  33. )
  34. request.include_frontend_context = True
  35. request.cache_versions = get_cache_versions()
  36. request.frontend_context = {}
  37. request.session = {}
  38. request.settings = DynamicSettings(request.cache_versions)
  39. request.user = AnonymousUser()
  40. request.user_acl = get_user_acl(request.user, request.cache_versions)
  41. request.user_ip = user_ip
  42. return request
  43. def create_strategy():
  44. request = create_request()
  45. return load_strategy(request=request)
  46. class MockStrategy(object):
  47. def __init__(self, user_ip="0.0.0.0"):
  48. self.cleaned_partial_token = None
  49. self.request = create_request(user_ip)
  50. def clean_partial_pipeline(self, token):
  51. self.cleaned_partial_token = token
  52. class PipelineTestCase(UserTestCase):
  53. def get_initial_user(self):
  54. self.user = self.get_authenticated_user()
  55. def assertNewUserIsCorrect(
  56. self, new_user, form_data=None, activation=None, email_verified=False
  57. ):
  58. self.assertFalse(new_user.has_usable_password())
  59. self.assertIn("Welcome", mail.outbox[0].subject)
  60. if form_data:
  61. self.assertEqual(new_user.email, form_data["email"])
  62. self.assertEqual(new_user.username, form_data["username"])
  63. if activation == "none":
  64. self.assertEqual(new_user.requires_activation, User.ACTIVATION_NONE)
  65. if activation == "user":
  66. if email_verified:
  67. self.assertEqual(new_user.requires_activation, User.ACTIVATION_NONE)
  68. else:
  69. self.assertEqual(new_user.requires_activation, User.ACTIVATION_USER)
  70. if activation == "admin":
  71. self.assertEqual(new_user.requires_activation, User.ACTIVATION_ADMIN)
  72. self.assertEqual(new_user.audittrail_set.count(), 1)
  73. def assertJsonResponseEquals(self, response, value):
  74. response_content = response.content.decode("utf-8")
  75. response_json = json.loads(response_content)
  76. self.assertEqual(response_json, value)
  77. class AssociateByEmailTests(PipelineTestCase):
  78. def test_skip_if_user_is_already_set(self):
  79. """pipeline step is skipped if user was found by previous step"""
  80. result = associate_by_email(None, {}, GithubOAuth2, self.user)
  81. self.assertIsNone(result)
  82. def test_skip_if_no_email_passed(self):
  83. """pipeline step is skipped if no email was passed"""
  84. result = associate_by_email(None, {}, GithubOAuth2)
  85. self.assertIsNone(result)
  86. def test_skip_if_user_with_email_not_found(self):
  87. """pipeline step is skipped if no email was passed"""
  88. result = associate_by_email(None, {"email": "not@found.com"}, GithubOAuth2)
  89. self.assertIsNone(result)
  90. def test_raise_if_user_is_inactive(self):
  91. """pipeline raises if user was inactive"""
  92. self.user.is_active = False
  93. self.user.save()
  94. try:
  95. associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  96. self.fail("associate_by_email should raise SocialAuthFailed")
  97. except SocialAuthFailed as e:
  98. self.assertEqual(
  99. e.message,
  100. (
  101. "The e-mail address associated with your GitHub account is not available for "
  102. "use on this site."
  103. ),
  104. )
  105. def test_raise_if_user_needs_admin_activation(self):
  106. """pipeline raises if user needs admin activation"""
  107. self.user.requires_activation = User.ACTIVATION_ADMIN
  108. self.user.save()
  109. try:
  110. associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  111. self.fail("associate_by_email should raise SocialAuthFailed")
  112. except SocialAuthFailed as e:
  113. self.assertEqual(
  114. e.message,
  115. (
  116. "Your account has to be activated by site administrator before you will be "
  117. "able to sign in with GitHub."
  118. ),
  119. )
  120. def test_return_user(self):
  121. """pipeline returns user if email was found"""
  122. result = associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  123. self.assertEqual(result, {"user": self.user, "is_new": False})
  124. def test_return_user_email_inactive(self):
  125. """pipeline returns user even if they didn't activate their account manually"""
  126. self.user.requires_activation = User.ACTIVATION_USER
  127. self.user.save()
  128. result = associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  129. self.assertEqual(result, {"user": self.user, "is_new": False})
  130. class CreateUser(PipelineTestCase):
  131. def test_skip_if_user_is_set(self):
  132. """pipeline step is skipped if user was passed"""
  133. result = create_user(MockStrategy(), {}, GithubOAuth2(), user=self.user)
  134. self.assertIsNone(result)
  135. def test_skip_if_no_email_passed(self):
  136. """pipeline step is skipped if no email was passed"""
  137. result = create_user(
  138. MockStrategy(), {}, GithubOAuth2(), clean_username="TestBob"
  139. )
  140. self.assertIsNone(result)
  141. def test_skip_if_no_clean_username_passed(self):
  142. """pipeline step is skipped if cleaned username wasnt passed"""
  143. result = create_user(
  144. MockStrategy(), {"email": "hello@example.com"}, GithubOAuth2()
  145. )
  146. self.assertIsNone(result)
  147. def test_skip_if_email_is_taken(self):
  148. """pipeline step is skipped if email was taken"""
  149. result = create_user(
  150. MockStrategy(),
  151. {"email": self.user.email},
  152. GithubOAuth2(),
  153. clean_username="NewUser",
  154. )
  155. self.assertIsNone(result)
  156. @override_dynamic_settings(account_activation="none")
  157. def test_user_created_no_activation(self):
  158. """pipeline step creates active user for valid data and disabled activation"""
  159. result = create_user(
  160. MockStrategy(),
  161. {"email": "new@example.com"},
  162. GithubOAuth2(),
  163. clean_username="NewUser",
  164. )
  165. new_user = User.objects.get(email="new@example.com")
  166. self.assertEqual(result, {"user": new_user, "is_new": True})
  167. self.assertEqual(new_user.username, "NewUser")
  168. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="none")
  169. @override_dynamic_settings(account_activation="user")
  170. def test_user_created_activation_by_user(self):
  171. """pipeline step creates active user for valid data and user activation"""
  172. result = create_user(
  173. MockStrategy(),
  174. {"email": "new@example.com"},
  175. GithubOAuth2(),
  176. clean_username="NewUser",
  177. )
  178. new_user = User.objects.get(email="new@example.com")
  179. self.assertEqual(result, {"user": new_user, "is_new": True})
  180. self.assertEqual(new_user.username, "NewUser")
  181. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="user")
  182. @override_dynamic_settings(account_activation="admin")
  183. def test_user_created_activation_by_admin(self):
  184. """pipeline step creates in user for valid data and admin activation"""
  185. result = create_user(
  186. MockStrategy(),
  187. {"email": "new@example.com"},
  188. GithubOAuth2(),
  189. clean_username="NewUser",
  190. )
  191. new_user = User.objects.get(email="new@example.com")
  192. self.assertEqual(result, {"user": new_user, "is_new": True})
  193. self.assertEqual(new_user.username, "NewUser")
  194. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="admin")
  195. class CreateUserWithFormTests(PipelineTestCase):
  196. def setUp(self):
  197. super().setUp()
  198. Agreement.objects.invalidate_cache()
  199. def tearDown(self):
  200. super().tearDown()
  201. Agreement.objects.invalidate_cache()
  202. def test_skip_if_user_is_set(self):
  203. """pipeline step is skipped if user was passed"""
  204. request = create_request()
  205. strategy = load_strategy(request=request)
  206. backend = GithubOAuth2(strategy, "/")
  207. result = create_user_with_form(
  208. strategy=strategy,
  209. details={},
  210. backend=backend,
  211. user=self.user,
  212. pipeline_index=1,
  213. )
  214. self.assertEqual(result, {})
  215. def test_renders_form_if_not_post(self):
  216. """pipeline step renders form if not POST"""
  217. request = create_request()
  218. strategy = load_strategy(request=request)
  219. backend = GithubOAuth2(strategy, "/")
  220. response = create_user_with_form(
  221. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  222. )
  223. self.assertContains(response, "GitHub")
  224. def test_empty_data_rejected(self):
  225. """form rejects empty data"""
  226. request = create_request(data={})
  227. strategy = load_strategy(request=request)
  228. backend = GithubOAuth2(strategy, "/")
  229. response = create_user_with_form(
  230. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  231. )
  232. self.assertEqual(response.status_code, 400)
  233. self.assertJsonResponseEquals(
  234. response,
  235. {
  236. "email": ["This field is required."],
  237. "username": ["This field is required."],
  238. },
  239. )
  240. def test_taken_data_rejected(self):
  241. """form rejects taken data"""
  242. request = create_request(
  243. data={"email": self.user.email, "username": self.user.username}
  244. )
  245. strategy = load_strategy(request=request)
  246. backend = GithubOAuth2(strategy, "/")
  247. response = create_user_with_form(
  248. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  249. )
  250. self.assertEqual(response.status_code, 400)
  251. self.assertJsonResponseEquals(
  252. response,
  253. {
  254. "email": ["This e-mail address is not available."],
  255. "username": ["This username is not available."],
  256. },
  257. )
  258. @override_dynamic_settings(account_activation="none")
  259. def test_user_created_no_activation_verified_email(self):
  260. """active user is created for verified email and activation disabled"""
  261. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  262. request = create_request(data=form_data)
  263. strategy = load_strategy(request=request)
  264. backend = GithubOAuth2(strategy, "/")
  265. result = create_user_with_form(
  266. strategy=strategy,
  267. details={"email": form_data["email"]},
  268. backend=backend,
  269. user=None,
  270. pipeline_index=1,
  271. )
  272. new_user = User.objects.get(email="social@auth.com")
  273. self.assertEqual(result, {"user": new_user, "is_new": True})
  274. self.assertNewUserIsCorrect(
  275. new_user, form_data, activation="none", email_verified=True
  276. )
  277. @override_dynamic_settings(account_activation="none")
  278. def test_user_created_no_activation_nonverified_email(self):
  279. """active user is created for non-verified email and activation disabled"""
  280. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  281. request = create_request(data=form_data)
  282. strategy = load_strategy(request=request)
  283. backend = GithubOAuth2(strategy, "/")
  284. result = create_user_with_form(
  285. strategy=strategy,
  286. details={"email": ""},
  287. backend=backend,
  288. user=None,
  289. pipeline_index=1,
  290. )
  291. new_user = User.objects.get(email="social@auth.com")
  292. self.assertEqual(result, {"user": new_user, "is_new": True})
  293. self.assertNewUserIsCorrect(
  294. new_user, form_data, activation="none", email_verified=False
  295. )
  296. @override_dynamic_settings(account_activation="user")
  297. def test_user_created_activation_by_user_verified_email(self):
  298. """active user is created for verified email and activation by user"""
  299. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  300. request = create_request(data=form_data)
  301. strategy = load_strategy(request=request)
  302. backend = GithubOAuth2(strategy, "/")
  303. result = create_user_with_form(
  304. strategy=strategy,
  305. details={"email": form_data["email"]},
  306. backend=backend,
  307. user=None,
  308. pipeline_index=1,
  309. )
  310. new_user = User.objects.get(email="social@auth.com")
  311. self.assertEqual(result, {"user": new_user, "is_new": True})
  312. self.assertNewUserIsCorrect(
  313. new_user, form_data, activation="user", email_verified=True
  314. )
  315. @override_dynamic_settings(account_activation="user")
  316. def test_user_created_activation_by_user_nonverified_email(self):
  317. """inactive user is created for non-verified email and activation by user"""
  318. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  319. request = create_request(data=form_data)
  320. strategy = load_strategy(request=request)
  321. backend = GithubOAuth2(strategy, "/")
  322. result = create_user_with_form(
  323. strategy=strategy,
  324. details={"email": ""},
  325. backend=backend,
  326. user=None,
  327. pipeline_index=1,
  328. )
  329. new_user = User.objects.get(email="social@auth.com")
  330. self.assertEqual(result, {"user": new_user, "is_new": True})
  331. self.assertNewUserIsCorrect(
  332. new_user, form_data, activation="user", email_verified=False
  333. )
  334. @override_dynamic_settings(account_activation="admin")
  335. def test_user_created_activation_by_admin_verified_email(self):
  336. """inactive user is created for verified email and activation by admin"""
  337. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  338. request = create_request(data=form_data)
  339. strategy = load_strategy(request=request)
  340. backend = GithubOAuth2(strategy, "/")
  341. result = create_user_with_form(
  342. strategy=strategy,
  343. details={"email": form_data["email"]},
  344. backend=backend,
  345. user=None,
  346. pipeline_index=1,
  347. )
  348. new_user = User.objects.get(email="social@auth.com")
  349. self.assertEqual(result, {"user": new_user, "is_new": True})
  350. self.assertNewUserIsCorrect(
  351. new_user, form_data, activation="admin", email_verified=True
  352. )
  353. @override_dynamic_settings(account_activation="admin")
  354. def test_user_created_activation_by_admin_nonverified_email(self):
  355. """inactive user is created for non-verified email and activation by admin"""
  356. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  357. request = create_request(data=form_data)
  358. strategy = load_strategy(request=request)
  359. backend = GithubOAuth2(strategy, "/")
  360. result = create_user_with_form(
  361. strategy=strategy,
  362. details={"email": ""},
  363. backend=backend,
  364. user=None,
  365. pipeline_index=1,
  366. )
  367. new_user = User.objects.get(email="social@auth.com")
  368. self.assertEqual(result, {"user": new_user, "is_new": True})
  369. self.assertNewUserIsCorrect(
  370. new_user, form_data, activation="admin", email_verified=False
  371. )
  372. def test_form_check_agreement(self):
  373. """social register checks agreement"""
  374. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  375. request = create_request(data=form_data)
  376. strategy = load_strategy(request=request)
  377. backend = GithubOAuth2(strategy, "/")
  378. agreement = Agreement.objects.create(
  379. type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=True
  380. )
  381. response = create_user_with_form(
  382. strategy=strategy,
  383. details={"email": form_data["email"]},
  384. backend=backend,
  385. user=None,
  386. pipeline_index=1,
  387. )
  388. self.assertEqual(response.status_code, 400)
  389. self.assertJsonResponseEquals(
  390. response, {"terms_of_service": ["This agreement is required."]}
  391. )
  392. # invalid agreement id
  393. form_data = {
  394. "email": "social@auth.com",
  395. "username": "SocialUser",
  396. "terms_of_service": agreement.id + 1,
  397. }
  398. request = create_request(data=form_data)
  399. strategy = load_strategy(request=request)
  400. backend = GithubOAuth2(strategy, "/")
  401. response = create_user_with_form(
  402. strategy=strategy,
  403. details={"email": form_data["email"]},
  404. backend=backend,
  405. user=None,
  406. pipeline_index=1,
  407. )
  408. self.assertEqual(response.status_code, 400)
  409. self.assertJsonResponseEquals(
  410. response, {"terms_of_service": ["This agreement is required."]}
  411. )
  412. # valid agreement id
  413. form_data = {
  414. "email": "social@auth.com",
  415. "username": "SocialUser",
  416. "terms_of_service": agreement.id,
  417. }
  418. request = create_request(data=form_data)
  419. strategy = load_strategy(request=request)
  420. backend = GithubOAuth2(strategy, "/")
  421. result = create_user_with_form(
  422. strategy=strategy,
  423. details={"email": form_data["email"]},
  424. backend=backend,
  425. user=None,
  426. pipeline_index=1,
  427. )
  428. new_user = User.objects.get(email="social@auth.com")
  429. self.assertEqual(result, {"user": new_user, "is_new": True})
  430. self.assertEqual(new_user.agreements, [agreement.id])
  431. self.assertEqual(new_user.useragreement_set.count(), 1)
  432. def test_form_ignore_inactive_agreement(self):
  433. """social register ignores inactive agreement"""
  434. form_data = {
  435. "email": "social@auth.com",
  436. "username": "SocialUser",
  437. "terms_of_service": None,
  438. }
  439. request = create_request(data=form_data)
  440. strategy = load_strategy(request=request)
  441. backend = GithubOAuth2(strategy, "/")
  442. Agreement.objects.create(
  443. type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=False
  444. )
  445. result = create_user_with_form(
  446. strategy=strategy,
  447. details={"email": form_data["email"]},
  448. backend=backend,
  449. user=None,
  450. pipeline_index=1,
  451. )
  452. new_user = User.objects.get(email="social@auth.com")
  453. self.assertEqual(result, {"user": new_user, "is_new": True})
  454. self.assertEqual(new_user.agreements, [])
  455. self.assertEqual(new_user.useragreement_set.count(), 0)
  456. class GetUsernameTests(PipelineTestCase):
  457. def test_skip_if_user_is_set(self):
  458. """pipeline step is skipped if user was passed"""
  459. strategy = create_strategy()
  460. result = get_username(strategy, {}, None, user=self.user)
  461. self.assertIsNone(result)
  462. def test_skip_if_no_names(self):
  463. """pipeline step is skipped if API returned no names"""
  464. strategy = create_strategy()
  465. result = get_username(strategy, {}, None)
  466. self.assertIsNone(result)
  467. def test_resolve_to_username(self):
  468. """pipeline step resolves username"""
  469. strategy = create_strategy()
  470. result = get_username(strategy, {"username": "BobBoberson"}, None)
  471. self.assertEqual(result, {"clean_username": "BobBoberson"})
  472. def test_normalize_username(self):
  473. """pipeline step normalizes username"""
  474. strategy = create_strategy()
  475. result = get_username(strategy, {"username": "Błop Błoperson"}, None)
  476. self.assertEqual(result, {"clean_username": "BlopBloperson"})
  477. def test_resolve_to_first_name(self):
  478. """pipeline attempts to use first name because username is taken"""
  479. strategy = create_strategy()
  480. details = {"username": self.user.username, "first_name": "Błob"}
  481. result = get_username(strategy, details, None)
  482. self.assertEqual(result, {"clean_username": "Blob"})
  483. def test_dont_resolve_to_last_name(self):
  484. """pipeline will not fallback to last name because username is taken"""
  485. strategy = create_strategy()
  486. details = {"username": self.user.username, "last_name": "Błob"}
  487. result = get_username(strategy, details, None)
  488. self.assertIsNone(result)
  489. def test_resolve_to_first_last_name_first_char(self):
  490. """pipeline will construct username from first name and first char of surname"""
  491. strategy = create_strategy()
  492. details = {"first_name": self.user.username, "last_name": "Błob"}
  493. result = get_username(strategy, details, None)
  494. self.assertEqual(result, {"clean_username": self.user.username + "B"})
  495. def test_dont_resolve_to_banned_name(self):
  496. """pipeline will not resolve to banned name"""
  497. strategy = create_strategy()
  498. Ban.objects.create(banned_value="*Admin*", check_type=Ban.USERNAME)
  499. details = {"username": "Misago Admin", "first_name": "Błob"}
  500. result = get_username(strategy, details, None)
  501. self.assertEqual(result, {"clean_username": "Blob"})
  502. def test_resolve_full_name(self):
  503. """pipeline will resolve to full name"""
  504. strategy = create_strategy()
  505. Ban.objects.create(banned_value="*Admin*", check_type=Ban.USERNAME)
  506. details = {"username": "Misago Admin", "full_name": "Błob Błopo"}
  507. result = get_username(strategy, details, None)
  508. self.assertEqual(result, {"clean_username": "BlobBlopo"})
  509. def test_resolve_to_cut_name(self):
  510. """pipeline will resolve cut too long name on second pass"""
  511. strategy = create_strategy()
  512. details = {"username": "Abrakadabrapokuskonstantynopolitańczykowianeczkatrzy"}
  513. result = get_username(strategy, details, None)
  514. self.assertEqual(result, {"clean_username": "Abrakadabrapok"})
  515. class RequireActivationTests(PipelineTestCase):
  516. def setUp(self):
  517. super().setUp()
  518. self.user.requires_activation = User.ACTIVATION_ADMIN
  519. self.user.save()
  520. def test_skip_if_user_not_set(self):
  521. """pipeline step is skipped if user is not set"""
  522. request = create_request()
  523. strategy = load_strategy(request=request)
  524. backend = GithubOAuth2(strategy, "/")
  525. result = require_activation(
  526. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  527. )
  528. self.assertEqual(result, {})
  529. def test_partial_token_if_user_not_set_no_showstopper(self):
  530. """pipeline step handles set session token if user is not set"""
  531. request = create_request()
  532. strategy = load_strategy(request=request)
  533. strategy.request.session["partial_pipeline_token"] = "test-token"
  534. backend = GithubOAuth2(strategy, "/")
  535. require_activation(
  536. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  537. )
  538. def test_skip_if_user_is_active(self):
  539. """pipeline step is skipped if user is active"""
  540. self.user.requires_activation = User.ACTIVATION_NONE
  541. self.user.save()
  542. self.assertFalse(self.user.requires_activation)
  543. request = create_request()
  544. strategy = load_strategy(request=request)
  545. backend = GithubOAuth2(strategy, "/")
  546. result = require_activation(
  547. strategy=strategy,
  548. details={},
  549. backend=backend,
  550. user=self.user,
  551. pipeline_index=1,
  552. )
  553. self.assertEqual(result, {})
  554. def test_pipeline_returns_html_responseon_get(self):
  555. """pipeline step renders http response for GET request and inactive user"""
  556. request = create_request()
  557. strategy = load_strategy(request=request)
  558. backend = GithubOAuth2(strategy, "/")
  559. response = require_activation(
  560. strategy=strategy,
  561. details={},
  562. backend=backend,
  563. user=self.user,
  564. pipeline_index=1,
  565. )
  566. self.assertEqual(response.status_code, 200)
  567. self.assertEqual(response["content-type"], "text/html; charset=utf-8")
  568. def test_pipeline_returns_json_response_on_post(self):
  569. """pipeline step renders json response for POST request and inactive user"""
  570. request = create_request(data={"username": "anything"})
  571. strategy = load_strategy(request=request)
  572. backend = GithubOAuth2(strategy, "/")
  573. response = require_activation(
  574. strategy=strategy,
  575. details={},
  576. backend=backend,
  577. user=self.user,
  578. pipeline_index=1,
  579. )
  580. self.assertEqual(response.status_code, 200)
  581. self.assertEqual(response["content-type"], "application/json")
  582. self.assertJsonResponseEquals(
  583. response,
  584. {
  585. "step": "done",
  586. "backend_name": "GitHub",
  587. "activation": "admin",
  588. "email": "test@user.com",
  589. "username": "TestUser",
  590. },
  591. )
  592. class ValidateIpNotBannedTests(PipelineTestCase):
  593. def test_skip_if_user_not_set(self):
  594. """pipeline step is skipped if no user was passed"""
  595. result = validate_ip_not_banned(None, {}, GithubOAuth2)
  596. self.assertIsNone(result)
  597. def test_raise_if_banned(self):
  598. """pipeline raises if user's IP is banned"""
  599. Ban.objects.create(banned_value="188.*", check_type=Ban.IP)
  600. try:
  601. validate_ip_not_banned(
  602. MockStrategy(user_ip="188.1.2.3"), {}, GithubOAuth2, self.user
  603. )
  604. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  605. except SocialAuthBanned as e:
  606. self.assertTrue(isinstance(e.ban, Ban))
  607. def test_exclude_staff(self):
  608. """pipeline excludes staff from bans"""
  609. self.user.is_staff = True
  610. self.user.save()
  611. Ban.objects.create(banned_value="188.*", check_type=Ban.IP)
  612. result = validate_ip_not_banned(
  613. MockStrategy(user_ip="188.1.2.3"), {}, GithubOAuth2, self.user
  614. )
  615. self.assertIsNone(result)
  616. class ValidateUserNotBannedTests(PipelineTestCase):
  617. def test_skip_if_user_not_set(self):
  618. """pipeline step is skipped if no user was passed"""
  619. result = validate_user_not_banned(None, {}, GithubOAuth2)
  620. self.assertIsNone(result)
  621. def test_raise_if_banned(self):
  622. """pipeline raises if user's IP is banned"""
  623. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  624. try:
  625. validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  626. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  627. except SocialAuthBanned as e:
  628. self.assertEqual(e.ban.user, self.user)
  629. self.assertTrue(isinstance(e.ban, BanCache))
  630. def test_exclude_staff(self):
  631. """pipeline excludes staff from bans"""
  632. self.user.is_staff = True
  633. self.user.save()
  634. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  635. result = validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  636. self.assertIsNone(result)