test_social_pipeline.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766
  1. import json
  2. from django.contrib.auth import get_user_model
  3. from django.core import mail
  4. from django.test import RequestFactory
  5. from social_core.backends.github import GithubOAuth2
  6. from social_django.utils import load_strategy
  7. from ...acl.useracl import get_user_acl
  8. from ...conf.dynamicsettings import DynamicSettings
  9. from ...conf.test import override_dynamic_settings
  10. from ...conftest import get_cache_versions
  11. from ...core.exceptions import SocialAuthBanned, SocialAuthFailed
  12. from ...legal.models import Agreement
  13. from ..models import AnonymousUser, Ban, BanCache
  14. from ..social.pipeline import (
  15. associate_by_email,
  16. create_user,
  17. create_user_with_form,
  18. get_username,
  19. require_activation,
  20. validate_ip_not_banned,
  21. validate_user_not_banned,
  22. )
  23. from ..test import UserTestCase
  24. User = get_user_model()
  25. def create_request(user_ip="0.0.0.0", data=None):
  26. factory = RequestFactory()
  27. if data is None:
  28. request = factory.get("/")
  29. else:
  30. request = factory.post(
  31. "/", data=json.dumps(data), content_type="application/json"
  32. )
  33. request.include_frontend_context = True
  34. request.cache_versions = get_cache_versions()
  35. request.frontend_context = {}
  36. request.session = {}
  37. request.settings = DynamicSettings(request.cache_versions)
  38. request.user = AnonymousUser()
  39. request.user_acl = get_user_acl(request.user, request.cache_versions)
  40. request.user_ip = user_ip
  41. return request
  42. def create_strategy():
  43. request = create_request()
  44. return load_strategy(request=request)
  45. class MockStrategy:
  46. def __init__(self, user_ip="0.0.0.0"):
  47. self.cleaned_partial_token = None
  48. self.request = create_request(user_ip)
  49. def clean_partial_pipeline(self, token):
  50. self.cleaned_partial_token = token
  51. class PipelineTestCase(UserTestCase):
  52. def get_initial_user(self):
  53. self.user = self.get_authenticated_user()
  54. def assertNewUserIsCorrect(
  55. self, new_user, form_data=None, activation=None, email_verified=False
  56. ):
  57. self.assertFalse(new_user.has_usable_password())
  58. self.assertIn("Welcome", mail.outbox[0].subject)
  59. if form_data:
  60. self.assertEqual(new_user.email, form_data["email"])
  61. self.assertEqual(new_user.username, form_data["username"])
  62. if activation == "none":
  63. self.assertEqual(new_user.requires_activation, User.ACTIVATION_NONE)
  64. if activation == "user":
  65. if email_verified:
  66. self.assertEqual(new_user.requires_activation, User.ACTIVATION_NONE)
  67. else:
  68. self.assertEqual(new_user.requires_activation, User.ACTIVATION_USER)
  69. if activation == "admin":
  70. self.assertEqual(new_user.requires_activation, User.ACTIVATION_ADMIN)
  71. self.assertEqual(new_user.audittrail_set.count(), 1)
  72. def assertJsonResponseEquals(self, response, value):
  73. response_content = response.content.decode("utf-8")
  74. response_json = json.loads(response_content)
  75. self.assertEqual(response_json, value)
  76. class AssociateByEmailTests(PipelineTestCase):
  77. def test_skip_if_user_is_already_set(self):
  78. """pipeline step is skipped if user was found by previous step"""
  79. result = associate_by_email(None, {}, GithubOAuth2, self.user)
  80. self.assertIsNone(result)
  81. def test_skip_if_no_email_passed(self):
  82. """pipeline step is skipped if no email was passed"""
  83. result = associate_by_email(None, {}, GithubOAuth2)
  84. self.assertIsNone(result)
  85. def test_skip_if_user_with_email_not_found(self):
  86. """pipeline step is skipped if no email was passed"""
  87. result = associate_by_email(None, {"email": "not@found.com"}, GithubOAuth2)
  88. self.assertIsNone(result)
  89. def test_raise_if_user_is_inactive(self):
  90. """pipeline raises if user was inactive"""
  91. self.user.is_active = False
  92. self.user.save()
  93. try:
  94. associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  95. self.fail("associate_by_email should raise SocialAuthFailed")
  96. except SocialAuthFailed as e:
  97. self.assertEqual(
  98. e.message,
  99. (
  100. "The e-mail address associated with your GitHub account is not available for "
  101. "use on this site."
  102. ),
  103. )
  104. def test_raise_if_user_needs_admin_activation(self):
  105. """pipeline raises if user needs admin activation"""
  106. self.user.requires_activation = User.ACTIVATION_ADMIN
  107. self.user.save()
  108. try:
  109. associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  110. self.fail("associate_by_email should raise SocialAuthFailed")
  111. except SocialAuthFailed as e:
  112. self.assertEqual(
  113. e.message,
  114. (
  115. "Your account has to be activated by site administrator before you will be "
  116. "able to sign in with GitHub."
  117. ),
  118. )
  119. def test_return_user(self):
  120. """pipeline returns user if email was found"""
  121. result = associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  122. self.assertEqual(result, {"user": self.user, "is_new": False})
  123. def test_return_user_email_inactive(self):
  124. """pipeline returns user even if they didn't activate their account manually"""
  125. self.user.requires_activation = User.ACTIVATION_USER
  126. self.user.save()
  127. result = associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  128. self.assertEqual(result, {"user": self.user, "is_new": False})
  129. class CreateUser(PipelineTestCase):
  130. def test_skip_if_user_is_set(self):
  131. """pipeline step is skipped if user was passed"""
  132. result = create_user(MockStrategy(), {}, GithubOAuth2(), user=self.user)
  133. self.assertIsNone(result)
  134. def test_skip_if_no_email_passed(self):
  135. """pipeline step is skipped if no email was passed"""
  136. result = create_user(
  137. MockStrategy(), {}, GithubOAuth2(), clean_username="TestBob"
  138. )
  139. self.assertIsNone(result)
  140. def test_skip_if_no_clean_username_passed(self):
  141. """pipeline step is skipped if cleaned username wasnt passed"""
  142. result = create_user(
  143. MockStrategy(), {"email": "hello@example.com"}, GithubOAuth2()
  144. )
  145. self.assertIsNone(result)
  146. def test_skip_if_email_is_taken(self):
  147. """pipeline step is skipped if email was taken"""
  148. result = create_user(
  149. MockStrategy(),
  150. {"email": self.user.email},
  151. GithubOAuth2(),
  152. clean_username="NewUser",
  153. )
  154. self.assertIsNone(result)
  155. @override_dynamic_settings(account_activation="none")
  156. def test_user_created_no_activation(self):
  157. """pipeline step creates active user for valid data and disabled activation"""
  158. result = create_user(
  159. MockStrategy(),
  160. {"email": "new@example.com"},
  161. GithubOAuth2(),
  162. clean_username="NewUser",
  163. )
  164. new_user = User.objects.get(email="new@example.com")
  165. self.assertEqual(result, {"user": new_user, "is_new": True})
  166. self.assertEqual(new_user.username, "NewUser")
  167. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="none")
  168. @override_dynamic_settings(account_activation="user")
  169. def test_user_created_activation_by_user(self):
  170. """pipeline step creates active user for valid data and user activation"""
  171. result = create_user(
  172. MockStrategy(),
  173. {"email": "new@example.com"},
  174. GithubOAuth2(),
  175. clean_username="NewUser",
  176. )
  177. new_user = User.objects.get(email="new@example.com")
  178. self.assertEqual(result, {"user": new_user, "is_new": True})
  179. self.assertEqual(new_user.username, "NewUser")
  180. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="user")
  181. @override_dynamic_settings(account_activation="admin")
  182. def test_user_created_activation_by_admin(self):
  183. """pipeline step creates in user for valid data and admin activation"""
  184. result = create_user(
  185. MockStrategy(),
  186. {"email": "new@example.com"},
  187. GithubOAuth2(),
  188. clean_username="NewUser",
  189. )
  190. new_user = User.objects.get(email="new@example.com")
  191. self.assertEqual(result, {"user": new_user, "is_new": True})
  192. self.assertEqual(new_user.username, "NewUser")
  193. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="admin")
  194. class CreateUserWithFormTests(PipelineTestCase):
  195. def setUp(self):
  196. super().setUp()
  197. Agreement.objects.invalidate_cache()
  198. def tearDown(self):
  199. super().tearDown()
  200. Agreement.objects.invalidate_cache()
  201. def test_skip_if_user_is_set(self):
  202. """pipeline step is skipped if user was passed"""
  203. request = create_request()
  204. strategy = load_strategy(request=request)
  205. backend = GithubOAuth2(strategy, "/")
  206. result = create_user_with_form(
  207. strategy=strategy,
  208. details={},
  209. backend=backend,
  210. user=self.user,
  211. pipeline_index=1,
  212. )
  213. self.assertEqual(result, {})
  214. def test_renders_form_if_not_post(self):
  215. """pipeline step renders form if not POST"""
  216. request = create_request()
  217. strategy = load_strategy(request=request)
  218. backend = GithubOAuth2(strategy, "/")
  219. response = create_user_with_form(
  220. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  221. )
  222. self.assertContains(response, "GitHub")
  223. def test_empty_data_rejected(self):
  224. """form rejects empty data"""
  225. request = create_request(data={})
  226. strategy = load_strategy(request=request)
  227. backend = GithubOAuth2(strategy, "/")
  228. response = create_user_with_form(
  229. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  230. )
  231. self.assertEqual(response.status_code, 400)
  232. self.assertJsonResponseEquals(
  233. response,
  234. {
  235. "email": ["This field is required."],
  236. "username": ["This field is required."],
  237. },
  238. )
  239. def test_taken_data_rejected(self):
  240. """form rejects taken data"""
  241. request = create_request(
  242. data={"email": self.user.email, "username": self.user.username}
  243. )
  244. strategy = load_strategy(request=request)
  245. backend = GithubOAuth2(strategy, "/")
  246. response = create_user_with_form(
  247. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  248. )
  249. self.assertEqual(response.status_code, 400)
  250. self.assertJsonResponseEquals(
  251. response,
  252. {
  253. "email": ["This e-mail address is not available."],
  254. "username": ["This username is not available."],
  255. },
  256. )
  257. @override_dynamic_settings(account_activation="none")
  258. def test_user_created_no_activation_verified_email(self):
  259. """active user is created for verified email and activation disabled"""
  260. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  261. request = create_request(data=form_data)
  262. strategy = load_strategy(request=request)
  263. backend = GithubOAuth2(strategy, "/")
  264. result = create_user_with_form(
  265. strategy=strategy,
  266. details={"email": form_data["email"]},
  267. backend=backend,
  268. user=None,
  269. pipeline_index=1,
  270. )
  271. new_user = User.objects.get(email="social@auth.com")
  272. self.assertEqual(result, {"user": new_user, "is_new": True})
  273. self.assertNewUserIsCorrect(
  274. new_user, form_data, activation="none", email_verified=True
  275. )
  276. @override_dynamic_settings(account_activation="none")
  277. def test_user_created_no_activation_nonverified_email(self):
  278. """active user is created for non-verified email and activation disabled"""
  279. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  280. request = create_request(data=form_data)
  281. strategy = load_strategy(request=request)
  282. backend = GithubOAuth2(strategy, "/")
  283. result = create_user_with_form(
  284. strategy=strategy,
  285. details={"email": ""},
  286. backend=backend,
  287. user=None,
  288. pipeline_index=1,
  289. )
  290. new_user = User.objects.get(email="social@auth.com")
  291. self.assertEqual(result, {"user": new_user, "is_new": True})
  292. self.assertNewUserIsCorrect(
  293. new_user, form_data, activation="none", email_verified=False
  294. )
  295. @override_dynamic_settings(account_activation="user")
  296. def test_user_created_activation_by_user_verified_email(self):
  297. """active user is created for verified email and activation by user"""
  298. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  299. request = create_request(data=form_data)
  300. strategy = load_strategy(request=request)
  301. backend = GithubOAuth2(strategy, "/")
  302. result = create_user_with_form(
  303. strategy=strategy,
  304. details={"email": form_data["email"]},
  305. backend=backend,
  306. user=None,
  307. pipeline_index=1,
  308. )
  309. new_user = User.objects.get(email="social@auth.com")
  310. self.assertEqual(result, {"user": new_user, "is_new": True})
  311. self.assertNewUserIsCorrect(
  312. new_user, form_data, activation="user", email_verified=True
  313. )
  314. @override_dynamic_settings(account_activation="user")
  315. def test_user_created_activation_by_user_nonverified_email(self):
  316. """inactive user is created for non-verified email and activation by user"""
  317. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  318. request = create_request(data=form_data)
  319. strategy = load_strategy(request=request)
  320. backend = GithubOAuth2(strategy, "/")
  321. result = create_user_with_form(
  322. strategy=strategy,
  323. details={"email": ""},
  324. backend=backend,
  325. user=None,
  326. pipeline_index=1,
  327. )
  328. new_user = User.objects.get(email="social@auth.com")
  329. self.assertEqual(result, {"user": new_user, "is_new": True})
  330. self.assertNewUserIsCorrect(
  331. new_user, form_data, activation="user", email_verified=False
  332. )
  333. @override_dynamic_settings(account_activation="admin")
  334. def test_user_created_activation_by_admin_verified_email(self):
  335. """inactive user is created for verified email and activation by admin"""
  336. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  337. request = create_request(data=form_data)
  338. strategy = load_strategy(request=request)
  339. backend = GithubOAuth2(strategy, "/")
  340. result = create_user_with_form(
  341. strategy=strategy,
  342. details={"email": form_data["email"]},
  343. backend=backend,
  344. user=None,
  345. pipeline_index=1,
  346. )
  347. new_user = User.objects.get(email="social@auth.com")
  348. self.assertEqual(result, {"user": new_user, "is_new": True})
  349. self.assertNewUserIsCorrect(
  350. new_user, form_data, activation="admin", email_verified=True
  351. )
  352. @override_dynamic_settings(account_activation="admin")
  353. def test_user_created_activation_by_admin_nonverified_email(self):
  354. """inactive user is created for non-verified email and activation by admin"""
  355. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  356. request = create_request(data=form_data)
  357. strategy = load_strategy(request=request)
  358. backend = GithubOAuth2(strategy, "/")
  359. result = create_user_with_form(
  360. strategy=strategy,
  361. details={"email": ""},
  362. backend=backend,
  363. user=None,
  364. pipeline_index=1,
  365. )
  366. new_user = User.objects.get(email="social@auth.com")
  367. self.assertEqual(result, {"user": new_user, "is_new": True})
  368. self.assertNewUserIsCorrect(
  369. new_user, form_data, activation="admin", email_verified=False
  370. )
  371. def test_form_check_agreement(self):
  372. """social register checks agreement"""
  373. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  374. request = create_request(data=form_data)
  375. strategy = load_strategy(request=request)
  376. backend = GithubOAuth2(strategy, "/")
  377. agreement = Agreement.objects.create(
  378. type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=True
  379. )
  380. response = create_user_with_form(
  381. strategy=strategy,
  382. details={"email": form_data["email"]},
  383. backend=backend,
  384. user=None,
  385. pipeline_index=1,
  386. )
  387. self.assertEqual(response.status_code, 400)
  388. self.assertJsonResponseEquals(
  389. response, {"terms_of_service": ["This agreement is required."]}
  390. )
  391. # invalid agreement id
  392. form_data = {
  393. "email": "social@auth.com",
  394. "username": "SocialUser",
  395. "terms_of_service": agreement.id + 1,
  396. }
  397. request = create_request(data=form_data)
  398. strategy = load_strategy(request=request)
  399. backend = GithubOAuth2(strategy, "/")
  400. response = create_user_with_form(
  401. strategy=strategy,
  402. details={"email": form_data["email"]},
  403. backend=backend,
  404. user=None,
  405. pipeline_index=1,
  406. )
  407. self.assertEqual(response.status_code, 400)
  408. self.assertJsonResponseEquals(
  409. response, {"terms_of_service": ["This agreement is required."]}
  410. )
  411. # valid agreement id
  412. form_data = {
  413. "email": "social@auth.com",
  414. "username": "SocialUser",
  415. "terms_of_service": agreement.id,
  416. }
  417. request = create_request(data=form_data)
  418. strategy = load_strategy(request=request)
  419. backend = GithubOAuth2(strategy, "/")
  420. result = create_user_with_form(
  421. strategy=strategy,
  422. details={"email": form_data["email"]},
  423. backend=backend,
  424. user=None,
  425. pipeline_index=1,
  426. )
  427. new_user = User.objects.get(email="social@auth.com")
  428. self.assertEqual(result, {"user": new_user, "is_new": True})
  429. self.assertEqual(new_user.agreements, [agreement.id])
  430. self.assertEqual(new_user.useragreement_set.count(), 1)
  431. def test_form_ignore_inactive_agreement(self):
  432. """social register ignores inactive agreement"""
  433. form_data = {
  434. "email": "social@auth.com",
  435. "username": "SocialUser",
  436. "terms_of_service": None,
  437. }
  438. request = create_request(data=form_data)
  439. strategy = load_strategy(request=request)
  440. backend = GithubOAuth2(strategy, "/")
  441. Agreement.objects.create(
  442. type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=False
  443. )
  444. result = create_user_with_form(
  445. strategy=strategy,
  446. details={"email": form_data["email"]},
  447. backend=backend,
  448. user=None,
  449. pipeline_index=1,
  450. )
  451. new_user = User.objects.get(email="social@auth.com")
  452. self.assertEqual(result, {"user": new_user, "is_new": True})
  453. self.assertEqual(new_user.agreements, [])
  454. self.assertEqual(new_user.useragreement_set.count(), 0)
  455. class GetUsernameTests(PipelineTestCase):
  456. def test_skip_if_user_is_set(self):
  457. """pipeline step is skipped if user was passed"""
  458. strategy = create_strategy()
  459. result = get_username(strategy, {}, None, user=self.user)
  460. self.assertIsNone(result)
  461. def test_skip_if_no_names(self):
  462. """pipeline step is skipped if API returned no names"""
  463. strategy = create_strategy()
  464. result = get_username(strategy, {}, None)
  465. self.assertIsNone(result)
  466. def test_resolve_to_username(self):
  467. """pipeline step resolves username"""
  468. strategy = create_strategy()
  469. result = get_username(strategy, {"username": "BobBoberson"}, None)
  470. self.assertEqual(result, {"clean_username": "BobBoberson"})
  471. def test_normalize_username(self):
  472. """pipeline step normalizes username"""
  473. strategy = create_strategy()
  474. result = get_username(strategy, {"username": "Błop Błoperson"}, None)
  475. self.assertEqual(result, {"clean_username": "BlopBloperson"})
  476. def test_resolve_to_first_name(self):
  477. """pipeline attempts to use first name because username is taken"""
  478. strategy = create_strategy()
  479. details = {"username": self.user.username, "first_name": "Błob"}
  480. result = get_username(strategy, details, None)
  481. self.assertEqual(result, {"clean_username": "Blob"})
  482. def test_dont_resolve_to_last_name(self):
  483. """pipeline will not fallback to last name because username is taken"""
  484. strategy = create_strategy()
  485. details = {"username": self.user.username, "last_name": "Błob"}
  486. result = get_username(strategy, details, None)
  487. self.assertIsNone(result)
  488. def test_resolve_to_first_last_name_first_char(self):
  489. """pipeline will construct username from first name and first char of surname"""
  490. strategy = create_strategy()
  491. details = {"first_name": self.user.username, "last_name": "Błob"}
  492. result = get_username(strategy, details, None)
  493. self.assertEqual(result, {"clean_username": self.user.username + "B"})
  494. def test_dont_resolve_to_banned_name(self):
  495. """pipeline will not resolve to banned name"""
  496. strategy = create_strategy()
  497. Ban.objects.create(banned_value="*Admin*", check_type=Ban.USERNAME)
  498. details = {"username": "Misago Admin", "first_name": "Błob"}
  499. result = get_username(strategy, details, None)
  500. self.assertEqual(result, {"clean_username": "Blob"})
  501. def test_resolve_full_name(self):
  502. """pipeline will resolve to full name"""
  503. strategy = create_strategy()
  504. Ban.objects.create(banned_value="*Admin*", check_type=Ban.USERNAME)
  505. details = {"username": "Misago Admin", "full_name": "Błob Błopo"}
  506. result = get_username(strategy, details, None)
  507. self.assertEqual(result, {"clean_username": "BlobBlopo"})
  508. def test_resolve_to_cut_name(self):
  509. """pipeline will resolve cut too long name on second pass"""
  510. strategy = create_strategy()
  511. details = {"username": "Abrakadabrapokuskonstantynopolitańczykowianeczkatrzy"}
  512. result = get_username(strategy, details, None)
  513. self.assertEqual(result, {"clean_username": "Abrakadabrapok"})
  514. class RequireActivationTests(PipelineTestCase):
  515. def setUp(self):
  516. super().setUp()
  517. self.user.requires_activation = User.ACTIVATION_ADMIN
  518. self.user.save()
  519. def test_skip_if_user_not_set(self):
  520. """pipeline step is skipped if user is not set"""
  521. request = create_request()
  522. strategy = load_strategy(request=request)
  523. backend = GithubOAuth2(strategy, "/")
  524. result = require_activation(
  525. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  526. )
  527. self.assertEqual(result, {})
  528. def test_partial_token_if_user_not_set_no_showstopper(self):
  529. """pipeline step handles set session token if user is not set"""
  530. request = create_request()
  531. strategy = load_strategy(request=request)
  532. strategy.request.session["partial_pipeline_token"] = "test-token"
  533. backend = GithubOAuth2(strategy, "/")
  534. require_activation(
  535. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  536. )
  537. def test_skip_if_user_is_active(self):
  538. """pipeline step is skipped if user is active"""
  539. self.user.requires_activation = User.ACTIVATION_NONE
  540. self.user.save()
  541. self.assertFalse(self.user.requires_activation)
  542. request = create_request()
  543. strategy = load_strategy(request=request)
  544. backend = GithubOAuth2(strategy, "/")
  545. result = require_activation(
  546. strategy=strategy,
  547. details={},
  548. backend=backend,
  549. user=self.user,
  550. pipeline_index=1,
  551. )
  552. self.assertEqual(result, {})
  553. def test_pipeline_returns_html_responseon_get(self):
  554. """pipeline step renders http response for GET request and inactive user"""
  555. request = create_request()
  556. strategy = load_strategy(request=request)
  557. backend = GithubOAuth2(strategy, "/")
  558. response = require_activation(
  559. strategy=strategy,
  560. details={},
  561. backend=backend,
  562. user=self.user,
  563. pipeline_index=1,
  564. )
  565. self.assertEqual(response.status_code, 200)
  566. self.assertEqual(response["content-type"], "text/html; charset=utf-8")
  567. def test_pipeline_returns_json_response_on_post(self):
  568. """pipeline step renders json response for POST request and inactive user"""
  569. request = create_request(data={"username": "anything"})
  570. strategy = load_strategy(request=request)
  571. backend = GithubOAuth2(strategy, "/")
  572. response = require_activation(
  573. strategy=strategy,
  574. details={},
  575. backend=backend,
  576. user=self.user,
  577. pipeline_index=1,
  578. )
  579. self.assertEqual(response.status_code, 200)
  580. self.assertEqual(response["content-type"], "application/json")
  581. self.assertJsonResponseEquals(
  582. response,
  583. {
  584. "step": "done",
  585. "backend_name": "GitHub",
  586. "activation": "admin",
  587. "email": "test@user.com",
  588. "username": "TestUser",
  589. },
  590. )
  591. class ValidateIpNotBannedTests(PipelineTestCase):
  592. def test_skip_if_user_not_set(self):
  593. """pipeline step is skipped if no user was passed"""
  594. result = validate_ip_not_banned(None, {}, GithubOAuth2)
  595. self.assertIsNone(result)
  596. def test_raise_if_banned(self):
  597. """pipeline raises if user's IP is banned"""
  598. Ban.objects.create(banned_value="188.*", check_type=Ban.IP)
  599. try:
  600. validate_ip_not_banned(
  601. MockStrategy(user_ip="188.1.2.3"), {}, GithubOAuth2, self.user
  602. )
  603. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  604. except SocialAuthBanned as e:
  605. self.assertTrue(isinstance(e.ban, Ban))
  606. def test_exclude_staff(self):
  607. """pipeline excludes staff from bans"""
  608. self.user.is_staff = True
  609. self.user.save()
  610. Ban.objects.create(banned_value="188.*", check_type=Ban.IP)
  611. result = validate_ip_not_banned(
  612. MockStrategy(user_ip="188.1.2.3"), {}, GithubOAuth2, self.user
  613. )
  614. self.assertIsNone(result)
  615. class ValidateUserNotBannedTests(PipelineTestCase):
  616. def test_skip_if_user_not_set(self):
  617. """pipeline step is skipped if no user was passed"""
  618. result = validate_user_not_banned(None, {}, GithubOAuth2)
  619. self.assertIsNone(result)
  620. def test_raise_if_banned(self):
  621. """pipeline raises if user's IP is banned"""
  622. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  623. try:
  624. validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  625. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  626. except SocialAuthBanned as e:
  627. self.assertEqual(e.ban.user, self.user)
  628. self.assertTrue(isinstance(e.ban, BanCache))
  629. def test_exclude_staff(self):
  630. """pipeline excludes staff from bans"""
  631. self.user.is_staff = True
  632. self.user.save()
  633. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  634. result = validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  635. self.assertIsNone(result)