test_social_pipeline.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. import json
  2. from unittest.mock import Mock
  3. from django.contrib.auth import get_user_model
  4. from django.core import mail
  5. from django.test import RequestFactory
  6. from social_core.backends.github import GithubOAuth2
  7. from social_django.utils import load_strategy
  8. from ...acl.useracl import get_user_acl
  9. from ...conf.dynamicsettings import DynamicSettings
  10. from ...conf.test import override_dynamic_settings
  11. from ...conftest import get_cache_versions
  12. from ...core.exceptions import SocialAuthBanned, SocialAuthFailed
  13. from ...legal.models import Agreement
  14. from ...users.models import AnonymousUser, Ban, BanCache
  15. from ...users.test import UserTestCase
  16. from ..pipeline import (
  17. associate_by_email,
  18. create_user,
  19. create_user_with_form,
  20. get_username,
  21. require_activation,
  22. validate_ip_not_banned,
  23. validate_user_not_banned,
  24. )
  25. User = get_user_model()
  26. def create_request(user_ip="0.0.0.0", data=None):
  27. factory = RequestFactory()
  28. if data is None:
  29. request = factory.get("/")
  30. else:
  31. request = factory.post(
  32. "/", data=json.dumps(data), content_type="application/json"
  33. )
  34. request.include_frontend_context = True
  35. request.cache_versions = get_cache_versions()
  36. request.frontend_context = {}
  37. request.socialauth = {}
  38. request.session = {}
  39. request.settings = DynamicSettings(request.cache_versions)
  40. request.user = AnonymousUser()
  41. request.user_acl = get_user_acl(request.user, request.cache_versions)
  42. request.user_ip = user_ip
  43. return request
  44. def create_strategy():
  45. request = create_request()
  46. return load_strategy(request=request)
  47. class MockStrategy:
  48. def __init__(self, user_ip="0.0.0.0"):
  49. self.cleaned_partial_token = None
  50. self.request = create_request(user_ip)
  51. def clean_partial_pipeline(self, token):
  52. self.cleaned_partial_token = token
  53. class PipelineTestCase(UserTestCase):
  54. def get_initial_user(self):
  55. self.user = self.get_authenticated_user()
  56. def assertNewUserIsCorrect(
  57. self, new_user, form_data=None, activation=None, email_verified=False
  58. ):
  59. self.assertFalse(new_user.has_usable_password())
  60. self.assertIn("Welcome", mail.outbox[0].subject)
  61. if form_data:
  62. self.assertEqual(new_user.email, form_data["email"])
  63. self.assertEqual(new_user.username, form_data["username"])
  64. if activation == "none":
  65. self.assertEqual(new_user.requires_activation, User.ACTIVATION_NONE)
  66. if activation == "user":
  67. if email_verified:
  68. self.assertEqual(new_user.requires_activation, User.ACTIVATION_NONE)
  69. else:
  70. self.assertEqual(new_user.requires_activation, User.ACTIVATION_USER)
  71. if activation == "admin":
  72. self.assertEqual(new_user.requires_activation, User.ACTIVATION_ADMIN)
  73. self.assertEqual(new_user.audittrail_set.count(), 1)
  74. def assertJsonResponseEquals(self, response, value):
  75. response_content = response.content.decode("utf-8")
  76. response_json = json.loads(response_content)
  77. self.assertEqual(response_json, value)
  78. class AssociateByEmailTests(PipelineTestCase):
  79. def test_skip_if_user_is_already_set(self):
  80. """pipeline step is skipped if user was found by previous step"""
  81. result = associate_by_email(Mock(), {}, GithubOAuth2, self.user)
  82. self.assertIsNone(result)
  83. def test_skip_if_no_email_passed(self):
  84. """pipeline step is skipped if no email was passed"""
  85. result = associate_by_email(Mock(), {}, GithubOAuth2)
  86. self.assertIsNone(result)
  87. def test_skip_if_user_with_email_not_found(self):
  88. """pipeline step is skipped if no email was passed"""
  89. result = associate_by_email(Mock(), {"email": "not@found.com"}, GithubOAuth2)
  90. self.assertIsNone(result)
  91. def test_raise_if_user_is_inactive(self):
  92. """pipeline raises if user was inactive"""
  93. strategy = Mock(setting=Mock(return_value=True))
  94. self.user.is_active = False
  95. self.user.save()
  96. try:
  97. associate_by_email(strategy, {"email": self.user.email}, GithubOAuth2)
  98. self.fail("associate_by_email should raise SocialAuthFailed")
  99. except SocialAuthFailed as e:
  100. self.assertEqual(
  101. e.message,
  102. (
  103. "The e-mail address associated with your GitHub account "
  104. "is not available for use on this site."
  105. ),
  106. )
  107. def test_raise_if_user_needs_admin_activation(self):
  108. """pipeline raises if user needs admin activation"""
  109. strategy = Mock(setting=Mock(return_value=True))
  110. self.user.requires_activation = User.ACTIVATION_ADMIN
  111. self.user.save()
  112. try:
  113. associate_by_email(strategy, {"email": self.user.email}, GithubOAuth2)
  114. self.fail("associate_by_email should raise SocialAuthFailed")
  115. except SocialAuthFailed as e:
  116. self.assertEqual(
  117. e.message,
  118. (
  119. "Your account has to be activated by site administrator "
  120. "before you will be able to sign in with GitHub."
  121. ),
  122. )
  123. def test_no_user_is_returned_if_pipeline_is_disabled(self):
  124. strategy = Mock(setting=Mock(return_value=False))
  125. result = associate_by_email(strategy, {"email": self.user.email}, GithubOAuth2)
  126. self.assertIsNone(result)
  127. strategy.setting.assert_called_once_with(
  128. "ASSOCIATE_BY_EMAIL", default=False, backend=GithubOAuth2
  129. )
  130. def test_return_user(self):
  131. """pipeline returns user if email was found"""
  132. strategy = Mock(setting=Mock(return_value=True))
  133. result = associate_by_email(strategy, {"email": self.user.email}, GithubOAuth2)
  134. self.assertEqual(result, {"user": self.user, "is_new": False})
  135. def test_return_user_email_inactive(self):
  136. """pipeline returns user even if they didn't activate their account manually"""
  137. strategy = Mock(setting=Mock(return_value=True))
  138. self.user.requires_activation = User.ACTIVATION_USER
  139. self.user.save()
  140. result = associate_by_email(strategy, {"email": self.user.email}, GithubOAuth2)
  141. self.assertEqual(result, {"user": self.user, "is_new": False})
  142. class CreateUser(PipelineTestCase):
  143. def test_skip_if_user_is_set(self):
  144. """pipeline step is skipped if user was passed"""
  145. result = create_user(MockStrategy(), {}, GithubOAuth2(), user=self.user)
  146. self.assertIsNone(result)
  147. def test_skip_if_no_email_passed(self):
  148. """pipeline step is skipped if no email was passed"""
  149. result = create_user(
  150. MockStrategy(), {}, GithubOAuth2(), clean_username="TestBob"
  151. )
  152. self.assertIsNone(result)
  153. def test_skip_if_no_clean_username_passed(self):
  154. """pipeline step is skipped if cleaned username wasnt passed"""
  155. result = create_user(
  156. MockStrategy(), {"email": "hello@example.com"}, GithubOAuth2()
  157. )
  158. self.assertIsNone(result)
  159. def test_skip_if_email_is_taken(self):
  160. """pipeline step is skipped if email was taken"""
  161. result = create_user(
  162. MockStrategy(),
  163. {"email": self.user.email},
  164. GithubOAuth2(),
  165. clean_username="NewUser",
  166. )
  167. self.assertIsNone(result)
  168. @override_dynamic_settings(account_activation="none")
  169. def test_user_created_no_activation(self):
  170. """pipeline step creates active user for valid data and disabled activation"""
  171. result = create_user(
  172. MockStrategy(),
  173. {"email": "new@example.com"},
  174. GithubOAuth2(),
  175. clean_username="NewUser",
  176. )
  177. new_user = User.objects.get(email="new@example.com")
  178. self.assertEqual(result, {"user": new_user, "is_new": True})
  179. self.assertEqual(new_user.username, "NewUser")
  180. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="none")
  181. @override_dynamic_settings(account_activation="user")
  182. def test_user_created_activation_by_user(self):
  183. """pipeline step creates active user for valid data and user activation"""
  184. result = create_user(
  185. MockStrategy(),
  186. {"email": "new@example.com"},
  187. GithubOAuth2(),
  188. clean_username="NewUser",
  189. )
  190. new_user = User.objects.get(email="new@example.com")
  191. self.assertEqual(result, {"user": new_user, "is_new": True})
  192. self.assertEqual(new_user.username, "NewUser")
  193. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="user")
  194. @override_dynamic_settings(account_activation="admin")
  195. def test_user_created_activation_by_admin(self):
  196. """pipeline step creates in user for valid data and admin activation"""
  197. result = create_user(
  198. MockStrategy(),
  199. {"email": "new@example.com"},
  200. GithubOAuth2(),
  201. clean_username="NewUser",
  202. )
  203. new_user = User.objects.get(email="new@example.com")
  204. self.assertEqual(result, {"user": new_user, "is_new": True})
  205. self.assertEqual(new_user.username, "NewUser")
  206. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="admin")
  207. class CreateUserWithFormTests(PipelineTestCase):
  208. def setUp(self):
  209. super().setUp()
  210. Agreement.objects.invalidate_cache()
  211. def tearDown(self):
  212. super().tearDown()
  213. Agreement.objects.invalidate_cache()
  214. def test_skip_if_user_is_set(self):
  215. """pipeline step is skipped if user was passed"""
  216. request = create_request()
  217. strategy = load_strategy(request=request)
  218. backend = GithubOAuth2(strategy, "/")
  219. result = create_user_with_form(
  220. strategy=strategy,
  221. details={},
  222. backend=backend,
  223. user=self.user,
  224. pipeline_index=1,
  225. )
  226. self.assertEqual(result, {})
  227. def test_renders_form_if_not_post(self):
  228. """pipeline step renders form if not POST"""
  229. request = create_request()
  230. strategy = load_strategy(request=request)
  231. backend = GithubOAuth2(strategy, "/")
  232. response = create_user_with_form(
  233. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  234. )
  235. self.assertContains(response, "GitHub")
  236. def test_empty_data_rejected(self):
  237. """form rejects empty data"""
  238. request = create_request(data={})
  239. strategy = load_strategy(request=request)
  240. backend = GithubOAuth2(strategy, "/")
  241. response = create_user_with_form(
  242. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  243. )
  244. self.assertEqual(response.status_code, 400)
  245. self.assertJsonResponseEquals(
  246. response,
  247. {
  248. "email": ["This field is required."],
  249. "username": ["This field is required."],
  250. },
  251. )
  252. def test_taken_data_rejected(self):
  253. """form rejects taken data"""
  254. request = create_request(
  255. data={"email": self.user.email, "username": self.user.username}
  256. )
  257. strategy = load_strategy(request=request)
  258. backend = GithubOAuth2(strategy, "/")
  259. response = create_user_with_form(
  260. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  261. )
  262. self.assertEqual(response.status_code, 400)
  263. self.assertJsonResponseEquals(
  264. response,
  265. {
  266. "email": ["This e-mail address is not available."],
  267. "username": ["This username is not available."],
  268. },
  269. )
  270. @override_dynamic_settings(account_activation="none")
  271. def test_user_created_no_activation_verified_email(self):
  272. """active user is created for verified email and activation disabled"""
  273. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  274. request = create_request(data=form_data)
  275. strategy = load_strategy(request=request)
  276. backend = GithubOAuth2(strategy, "/")
  277. result = create_user_with_form(
  278. strategy=strategy,
  279. details={"email": form_data["email"]},
  280. backend=backend,
  281. user=None,
  282. pipeline_index=1,
  283. )
  284. new_user = User.objects.get(email="social@auth.com")
  285. self.assertEqual(result, {"user": new_user, "is_new": True})
  286. self.assertNewUserIsCorrect(
  287. new_user, form_data, activation="none", email_verified=True
  288. )
  289. @override_dynamic_settings(account_activation="none")
  290. def test_user_created_no_activation_nonverified_email(self):
  291. """active user is created for non-verified email and activation disabled"""
  292. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  293. request = create_request(data=form_data)
  294. strategy = load_strategy(request=request)
  295. backend = GithubOAuth2(strategy, "/")
  296. result = create_user_with_form(
  297. strategy=strategy,
  298. details={"email": ""},
  299. backend=backend,
  300. user=None,
  301. pipeline_index=1,
  302. )
  303. new_user = User.objects.get(email="social@auth.com")
  304. self.assertEqual(result, {"user": new_user, "is_new": True})
  305. self.assertNewUserIsCorrect(
  306. new_user, form_data, activation="none", email_verified=False
  307. )
  308. @override_dynamic_settings(account_activation="user")
  309. def test_user_created_activation_by_user_verified_email(self):
  310. """active user is created for verified email and activation by user"""
  311. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  312. request = create_request(data=form_data)
  313. strategy = load_strategy(request=request)
  314. backend = GithubOAuth2(strategy, "/")
  315. result = create_user_with_form(
  316. strategy=strategy,
  317. details={"email": form_data["email"]},
  318. backend=backend,
  319. user=None,
  320. pipeline_index=1,
  321. )
  322. new_user = User.objects.get(email="social@auth.com")
  323. self.assertEqual(result, {"user": new_user, "is_new": True})
  324. self.assertNewUserIsCorrect(
  325. new_user, form_data, activation="user", email_verified=True
  326. )
  327. @override_dynamic_settings(account_activation="user")
  328. def test_user_created_activation_by_user_nonverified_email(self):
  329. """inactive user is created for non-verified email and activation by user"""
  330. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  331. request = create_request(data=form_data)
  332. strategy = load_strategy(request=request)
  333. backend = GithubOAuth2(strategy, "/")
  334. result = create_user_with_form(
  335. strategy=strategy,
  336. details={"email": ""},
  337. backend=backend,
  338. user=None,
  339. pipeline_index=1,
  340. )
  341. new_user = User.objects.get(email="social@auth.com")
  342. self.assertEqual(result, {"user": new_user, "is_new": True})
  343. self.assertNewUserIsCorrect(
  344. new_user, form_data, activation="user", email_verified=False
  345. )
  346. @override_dynamic_settings(account_activation="admin")
  347. def test_user_created_activation_by_admin_verified_email(self):
  348. """inactive user is created for verified email and activation by admin"""
  349. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  350. request = create_request(data=form_data)
  351. strategy = load_strategy(request=request)
  352. backend = GithubOAuth2(strategy, "/")
  353. result = create_user_with_form(
  354. strategy=strategy,
  355. details={"email": form_data["email"]},
  356. backend=backend,
  357. user=None,
  358. pipeline_index=1,
  359. )
  360. new_user = User.objects.get(email="social@auth.com")
  361. self.assertEqual(result, {"user": new_user, "is_new": True})
  362. self.assertNewUserIsCorrect(
  363. new_user, form_data, activation="admin", email_verified=True
  364. )
  365. @override_dynamic_settings(account_activation="admin")
  366. def test_user_created_activation_by_admin_nonverified_email(self):
  367. """inactive user is created for non-verified email and activation by admin"""
  368. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  369. request = create_request(data=form_data)
  370. strategy = load_strategy(request=request)
  371. backend = GithubOAuth2(strategy, "/")
  372. result = create_user_with_form(
  373. strategy=strategy,
  374. details={"email": ""},
  375. backend=backend,
  376. user=None,
  377. pipeline_index=1,
  378. )
  379. new_user = User.objects.get(email="social@auth.com")
  380. self.assertEqual(result, {"user": new_user, "is_new": True})
  381. self.assertNewUserIsCorrect(
  382. new_user, form_data, activation="admin", email_verified=False
  383. )
  384. def test_form_check_agreement(self):
  385. """social register checks agreement"""
  386. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  387. request = create_request(data=form_data)
  388. strategy = load_strategy(request=request)
  389. backend = GithubOAuth2(strategy, "/")
  390. agreement = Agreement.objects.create(
  391. type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=True
  392. )
  393. response = create_user_with_form(
  394. strategy=strategy,
  395. details={"email": form_data["email"]},
  396. backend=backend,
  397. user=None,
  398. pipeline_index=1,
  399. )
  400. self.assertEqual(response.status_code, 400)
  401. self.assertJsonResponseEquals(
  402. response, {"terms_of_service": ["This agreement is required."]}
  403. )
  404. # invalid agreement id
  405. form_data = {
  406. "email": "social@auth.com",
  407. "username": "SocialUser",
  408. "terms_of_service": agreement.id + 1,
  409. }
  410. request = create_request(data=form_data)
  411. strategy = load_strategy(request=request)
  412. backend = GithubOAuth2(strategy, "/")
  413. response = create_user_with_form(
  414. strategy=strategy,
  415. details={"email": form_data["email"]},
  416. backend=backend,
  417. user=None,
  418. pipeline_index=1,
  419. )
  420. self.assertEqual(response.status_code, 400)
  421. self.assertJsonResponseEquals(
  422. response, {"terms_of_service": ["This agreement is required."]}
  423. )
  424. # valid agreement id
  425. form_data = {
  426. "email": "social@auth.com",
  427. "username": "SocialUser",
  428. "terms_of_service": agreement.id,
  429. }
  430. request = create_request(data=form_data)
  431. strategy = load_strategy(request=request)
  432. backend = GithubOAuth2(strategy, "/")
  433. result = create_user_with_form(
  434. strategy=strategy,
  435. details={"email": form_data["email"]},
  436. backend=backend,
  437. user=None,
  438. pipeline_index=1,
  439. )
  440. new_user = User.objects.get(email="social@auth.com")
  441. self.assertEqual(result, {"user": new_user, "is_new": True})
  442. self.assertEqual(new_user.agreements, [agreement.id])
  443. self.assertEqual(new_user.useragreement_set.count(), 1)
  444. def test_form_ignore_inactive_agreement(self):
  445. """social register ignores inactive agreement"""
  446. form_data = {
  447. "email": "social@auth.com",
  448. "username": "SocialUser",
  449. "terms_of_service": None,
  450. }
  451. request = create_request(data=form_data)
  452. strategy = load_strategy(request=request)
  453. backend = GithubOAuth2(strategy, "/")
  454. Agreement.objects.create(
  455. type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=False
  456. )
  457. result = create_user_with_form(
  458. strategy=strategy,
  459. details={"email": form_data["email"]},
  460. backend=backend,
  461. user=None,
  462. pipeline_index=1,
  463. )
  464. new_user = User.objects.get(email="social@auth.com")
  465. self.assertEqual(result, {"user": new_user, "is_new": True})
  466. self.assertEqual(new_user.agreements, [])
  467. self.assertEqual(new_user.useragreement_set.count(), 0)
  468. class GetUsernameTests(PipelineTestCase):
  469. def test_skip_if_user_is_set(self):
  470. """pipeline step is skipped if user was passed"""
  471. strategy = create_strategy()
  472. result = get_username(strategy, {}, None, user=self.user)
  473. self.assertIsNone(result)
  474. def test_skip_if_no_names(self):
  475. """pipeline step is skipped if API returned no names"""
  476. strategy = create_strategy()
  477. result = get_username(strategy, {}, None)
  478. self.assertIsNone(result)
  479. def test_resolve_to_username(self):
  480. """pipeline step resolves username"""
  481. strategy = create_strategy()
  482. result = get_username(strategy, {"username": "BobBoberson"}, None)
  483. self.assertEqual(result, {"clean_username": "BobBoberson"})
  484. def test_normalize_username(self):
  485. """pipeline step normalizes username"""
  486. strategy = create_strategy()
  487. result = get_username(strategy, {"username": "Błop Błoperson"}, None)
  488. self.assertEqual(result, {"clean_username": "BlopBloperson"})
  489. def test_resolve_to_first_name(self):
  490. """pipeline attempts to use first name because username is taken"""
  491. strategy = create_strategy()
  492. details = {"username": self.user.username, "first_name": "Błob"}
  493. result = get_username(strategy, details, None)
  494. self.assertEqual(result, {"clean_username": "Blob"})
  495. def test_dont_resolve_to_last_name(self):
  496. """pipeline will not fallback to last name because username is taken"""
  497. strategy = create_strategy()
  498. details = {"username": self.user.username, "last_name": "Błob"}
  499. result = get_username(strategy, details, None)
  500. self.assertIsNone(result)
  501. def test_resolve_to_first_last_name_first_char(self):
  502. """pipeline will construct username from first name and first char of surname"""
  503. strategy = create_strategy()
  504. details = {"first_name": self.user.username, "last_name": "Błob"}
  505. result = get_username(strategy, details, None)
  506. self.assertEqual(result, {"clean_username": self.user.username + "B"})
  507. def test_dont_resolve_to_banned_name(self):
  508. """pipeline will not resolve to banned name"""
  509. strategy = create_strategy()
  510. Ban.objects.create(banned_value="*Admin*", check_type=Ban.USERNAME)
  511. details = {"username": "Misago Admin", "first_name": "Błob"}
  512. result = get_username(strategy, details, None)
  513. self.assertEqual(result, {"clean_username": "Blob"})
  514. def test_resolve_full_name(self):
  515. """pipeline will resolve to full name"""
  516. strategy = create_strategy()
  517. Ban.objects.create(banned_value="*Admin*", check_type=Ban.USERNAME)
  518. details = {"username": "Misago Admin", "full_name": "Błob Błopo"}
  519. result = get_username(strategy, details, None)
  520. self.assertEqual(result, {"clean_username": "BlobBlopo"})
  521. def test_resolve_to_cut_name(self):
  522. """pipeline will resolve cut too long name on second pass"""
  523. strategy = create_strategy()
  524. details = {"username": "Abrakadabrapokuskonstantynopolitańczykowianeczkatrzy"}
  525. result = get_username(strategy, details, None)
  526. self.assertEqual(result, {"clean_username": "Abrakadabrapok"})
  527. class RequireActivationTests(PipelineTestCase):
  528. def setUp(self):
  529. super().setUp()
  530. self.user.requires_activation = User.ACTIVATION_ADMIN
  531. self.user.save()
  532. def test_skip_if_user_not_set(self):
  533. """pipeline step is skipped if user is not set"""
  534. request = create_request()
  535. strategy = load_strategy(request=request)
  536. backend = GithubOAuth2(strategy, "/")
  537. result = require_activation(
  538. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  539. )
  540. self.assertEqual(result, {})
  541. def test_partial_token_if_user_not_set_no_showstopper(self):
  542. """pipeline step handles set session token if user is not set"""
  543. request = create_request()
  544. strategy = load_strategy(request=request)
  545. strategy.request.session["partial_pipeline_token"] = "test-token"
  546. backend = GithubOAuth2(strategy, "/")
  547. require_activation(
  548. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  549. )
  550. def test_skip_if_user_is_active(self):
  551. """pipeline step is skipped if user is active"""
  552. self.user.requires_activation = User.ACTIVATION_NONE
  553. self.user.save()
  554. self.assertFalse(self.user.requires_activation)
  555. request = create_request()
  556. strategy = load_strategy(request=request)
  557. backend = GithubOAuth2(strategy, "/")
  558. result = require_activation(
  559. strategy=strategy,
  560. details={},
  561. backend=backend,
  562. user=self.user,
  563. pipeline_index=1,
  564. )
  565. self.assertEqual(result, {})
  566. def test_pipeline_returns_html_response_on_get(self):
  567. """pipeline step renders http response for GET request and inactive user"""
  568. request = create_request()
  569. strategy = load_strategy(request=request)
  570. backend = GithubOAuth2(strategy, "/")
  571. response = require_activation(
  572. strategy=strategy,
  573. details={},
  574. backend=backend,
  575. user=self.user,
  576. pipeline_index=1,
  577. )
  578. self.assertEqual(response.status_code, 200)
  579. self.assertEqual(response["content-type"], "text/html; charset=utf-8")
  580. def test_pipeline_returns_json_response_on_post(self):
  581. """pipeline step renders json response for POST request and inactive user"""
  582. request = create_request(data={"username": "anything"})
  583. strategy = load_strategy(request=request)
  584. backend = GithubOAuth2(strategy, "/")
  585. response = require_activation(
  586. strategy=strategy,
  587. details={},
  588. backend=backend,
  589. user=self.user,
  590. pipeline_index=1,
  591. )
  592. self.assertEqual(response.status_code, 200)
  593. self.assertEqual(response["content-type"], "application/json")
  594. self.assertJsonResponseEquals(
  595. response,
  596. {
  597. "step": "done",
  598. "backend_name": "GitHub",
  599. "activation": "admin",
  600. "email": "test@user.com",
  601. "username": "TestUser",
  602. },
  603. )
  604. class ValidateIpNotBannedTests(PipelineTestCase):
  605. def test_skip_if_user_not_set(self):
  606. """pipeline step is skipped if no user was passed"""
  607. result = validate_ip_not_banned(None, {}, GithubOAuth2)
  608. self.assertIsNone(result)
  609. def test_raise_if_banned(self):
  610. """pipeline raises if user's IP is banned"""
  611. Ban.objects.create(banned_value="188.*", check_type=Ban.IP)
  612. try:
  613. validate_ip_not_banned(
  614. MockStrategy(user_ip="188.1.2.3"), {}, GithubOAuth2, self.user
  615. )
  616. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  617. except SocialAuthBanned as e:
  618. self.assertTrue(isinstance(e.ban, Ban))
  619. def test_exclude_staff(self):
  620. """pipeline excludes staff from bans"""
  621. self.user.is_staff = True
  622. self.user.save()
  623. Ban.objects.create(banned_value="188.*", check_type=Ban.IP)
  624. result = validate_ip_not_banned(
  625. MockStrategy(user_ip="188.1.2.3"), {}, GithubOAuth2, self.user
  626. )
  627. self.assertIsNone(result)
  628. class ValidateUserNotBannedTests(PipelineTestCase):
  629. def test_skip_if_user_not_set(self):
  630. """pipeline step is skipped if no user was passed"""
  631. result = validate_user_not_banned(None, {}, GithubOAuth2)
  632. self.assertIsNone(result)
  633. def test_raise_if_banned(self):
  634. """pipeline raises if user's IP is banned"""
  635. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  636. try:
  637. validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  638. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  639. except SocialAuthBanned as e:
  640. self.assertEqual(e.ban.user, self.user)
  641. self.assertTrue(isinstance(e.ban, BanCache))
  642. def test_exclude_staff(self):
  643. """pipeline excludes staff from bans"""
  644. self.user.is_staff = True
  645. self.user.save()
  646. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  647. result = validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  648. self.assertIsNone(result)