# test_social_pipeline.py - tests for the misago.users.social.pipeline steps
  1. import json
  2. from django.contrib.auth import get_user_model
  3. from django.core import mail
  4. from django.test import RequestFactory
  5. from social_core.backends.github import GithubOAuth2
  6. from social_django.utils import load_strategy
  7. from misago.acl import ACL_CACHE
  8. from misago.acl.useracl import get_user_acl
  9. from misago.conf.dynamicsettings import DynamicSettings
  10. from misago.core.exceptions import SocialAuthFailed, SocialAuthBanned
  11. from misago.conf.test import override_dynamic_settings
  12. from misago.conftest import get_cache_versions
  13. from misago.legal.models import Agreement
  14. from misago.users.models import AnonymousUser, Ban, BanCache
  15. from misago.users.social.pipeline import (
  16. associate_by_email, create_user, create_user_with_form, get_username, require_activation,
  17. validate_ip_not_banned, validate_user_not_banned
  18. )
  19. from misago.users.testutils import UserTestCase
  20. UserModel = get_user_model()
  21. def create_request(user_ip='0.0.0.0', data=None):
  22. factory = RequestFactory()
  23. if data is None:
  24. request = factory.get('/')
  25. else:
  26. request = factory.post('/', data=json.dumps(data), content_type='application/json')
  27. request.include_frontend_context = True
  28. request.cache_versions = get_cache_versions()
  29. request.frontend_context = {}
  30. request.session = {}
  31. request.settings = DynamicSettings(request.cache_versions)
  32. request.user = AnonymousUser()
  33. request.user_acl = get_user_acl(request.user, request.cache_versions)
  34. request.user_ip = user_ip
  35. return request
  36. def create_strategy():
  37. request = create_request()
  38. return load_strategy(request=request)
  39. class MockStrategy(object):
  40. def __init__(self, user_ip='0.0.0.0'):
  41. self.cleaned_partial_token = None
  42. self.request = create_request(user_ip)
  43. def clean_partial_pipeline(self, token):
  44. self.cleaned_partial_token = token
  45. class PipelineTestCase(UserTestCase):
  46. def get_initial_user(self):
  47. self.user = self.get_authenticated_user()
  48. def assertNewUserIsCorrect(
  49. self, new_user, form_data=None, activation=None, email_verified=False
  50. ):
  51. self.assertFalse(new_user.has_usable_password())
  52. self.assertIn('Welcome', mail.outbox[0].subject)
  53. if form_data:
  54. self.assertEqual(new_user.email, form_data['email'])
  55. self.assertEqual(new_user.username, form_data['username'])
  56. if activation == 'none':
  57. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_NONE)
  58. if activation == 'user':
  59. if email_verified:
  60. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_NONE)
  61. else:
  62. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_USER)
  63. if activation == 'admin':
  64. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_ADMIN)
  65. self.assertEqual(new_user.audittrail_set.count(), 1)
  66. def assertJsonResponseEquals(self, response, value):
  67. response_content = response.content.decode("utf-8")
  68. response_json = json.loads(response_content)
  69. self.assertEqual(response_json, value)
  70. class AssociateByEmailTests(PipelineTestCase):
  71. def test_skip_if_user_is_already_set(self):
  72. """pipeline step is skipped if user was found by previous step"""
  73. result = associate_by_email(None, {}, GithubOAuth2, self.user)
  74. self.assertIsNone(result)
  75. def test_skip_if_no_email_passed(self):
  76. """pipeline step is skipped if no email was passed"""
  77. result = associate_by_email(None, {}, GithubOAuth2)
  78. self.assertIsNone(result)
  79. def test_skip_if_user_with_email_not_found(self):
  80. """pipeline step is skipped if no email was passed"""
  81. result = associate_by_email(None, {'email': 'not@found.com'}, GithubOAuth2)
  82. self.assertIsNone(result)
  83. def test_raise_if_user_is_inactive(self):
  84. """pipeline raises if user was inactive"""
  85. self.user.is_active = False
  86. self.user.save()
  87. try:
  88. associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
  89. self.fail("associate_by_email should raise SocialAuthFailed")
  90. except SocialAuthFailed as e:
  91. self.assertEqual(
  92. e.message,
  93. (
  94. "The e-mail address associated with your GitHub account is not available for "
  95. "use on this site."
  96. ),
  97. )
  98. def test_raise_if_user_needs_admin_activation(self):
  99. """pipeline raises if user needs admin activation"""
  100. self.user.requires_activation = UserModel.ACTIVATION_ADMIN
  101. self.user.save()
  102. try:
  103. associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
  104. self.fail("associate_by_email should raise SocialAuthFailed")
  105. except SocialAuthFailed as e:
  106. self.assertEqual(
  107. e.message,
  108. (
  109. "Your account has to be activated by site administrator before you will be "
  110. "able to sign in with GitHub."
  111. ),
  112. )
  113. def test_return_user(self):
  114. """pipeline returns user if email was found"""
  115. result = associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
  116. self.assertEqual(result, {'user': self.user, 'is_new': False})
  117. def test_return_user_email_inactive(self):
  118. """pipeline returns user even if they didn't activate their account manually"""
  119. self.user.requires_activation = UserModel.ACTIVATION_USER
  120. self.user.save()
  121. result = associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
  122. self.assertEqual(result, {'user': self.user, 'is_new': False})
  123. class CreateUser(PipelineTestCase):
  124. def test_skip_if_user_is_set(self):
  125. """pipeline step is skipped if user was passed"""
  126. result = create_user(MockStrategy(), {}, GithubOAuth2(), user=self.user)
  127. self.assertIsNone(result)
  128. def test_skip_if_no_email_passed(self):
  129. """pipeline step is skipped if no email was passed"""
  130. result = create_user(
  131. MockStrategy(),
  132. {},
  133. GithubOAuth2(),
  134. clean_username='TestBob',
  135. )
  136. self.assertIsNone(result)
  137. def test_skip_if_no_clean_username_passed(self):
  138. """pipeline step is skipped if cleaned username wasnt passed"""
  139. result = create_user(
  140. MockStrategy(),
  141. {'email': 'hello@example.com'},
  142. GithubOAuth2(),
  143. )
  144. self.assertIsNone(result)
  145. def test_skip_if_email_is_taken(self):
  146. """pipeline step is skipped if email was taken"""
  147. result = create_user(
  148. MockStrategy(),
  149. {'email': self.user.email},
  150. GithubOAuth2(),
  151. clean_username='NewUser',
  152. )
  153. self.assertIsNone(result)
  154. @override_dynamic_settings(account_activation='none')
  155. def test_user_created_no_activation(self):
  156. """pipeline step creates active user for valid data and disabled activation"""
  157. result = create_user(
  158. MockStrategy(),
  159. {'email': 'new@example.com'},
  160. GithubOAuth2(),
  161. clean_username='NewUser',
  162. )
  163. new_user = UserModel.objects.get(email='new@example.com')
  164. self.assertEqual(result, {
  165. 'user': new_user,
  166. 'is_new': True,
  167. })
  168. self.assertEqual(new_user.username, 'NewUser')
  169. self.assertNewUserIsCorrect(new_user, email_verified=True, activation='none')
  170. @override_dynamic_settings(account_activation='user')
  171. def test_user_created_activation_by_user(self):
  172. """pipeline step creates active user for valid data and user activation"""
  173. result = create_user(
  174. MockStrategy(),
  175. {'email': 'new@example.com'},
  176. GithubOAuth2(),
  177. clean_username='NewUser',
  178. )
  179. new_user = UserModel.objects.get(email='new@example.com')
  180. self.assertEqual(result, {
  181. 'user': new_user,
  182. 'is_new': True,
  183. })
  184. self.assertEqual(new_user.username, 'NewUser')
  185. self.assertNewUserIsCorrect(new_user, email_verified=True, activation='user')
  186. @override_dynamic_settings(account_activation='admin')
  187. def test_user_created_activation_by_admin(self):
  188. """pipeline step creates in user for valid data and admin activation"""
  189. result = create_user(
  190. MockStrategy(),
  191. {'email': 'new@example.com'},
  192. GithubOAuth2(),
  193. clean_username='NewUser',
  194. )
  195. new_user = UserModel.objects.get(email='new@example.com')
  196. self.assertEqual(result, {
  197. 'user': new_user,
  198. 'is_new': True,
  199. })
  200. self.assertEqual(new_user.username, 'NewUser')
  201. self.assertNewUserIsCorrect(new_user, email_verified=True, activation='admin')
  202. class CreateUserWithFormTests(PipelineTestCase):
  203. def setUp(self):
  204. super().setUp()
  205. Agreement.objects.invalidate_cache()
  206. def tearDown(self):
  207. super().tearDown()
  208. Agreement.objects.invalidate_cache()
  209. def test_skip_if_user_is_set(self):
  210. """pipeline step is skipped if user was passed"""
  211. request = create_request()
  212. strategy = load_strategy(request=request)
  213. backend = GithubOAuth2(strategy, '/')
  214. result = create_user_with_form(
  215. strategy=strategy,
  216. details={},
  217. backend=backend,
  218. user=self.user,
  219. pipeline_index=1,
  220. )
  221. self.assertEqual(result, {})
  222. def test_renders_form_if_not_post(self):
  223. """pipeline step renders form if not POST"""
  224. request = create_request()
  225. strategy = load_strategy(request=request)
  226. backend = GithubOAuth2(strategy, '/')
  227. response = create_user_with_form(
  228. strategy=strategy,
  229. details={},
  230. backend=backend,
  231. user=None,
  232. pipeline_index=1,
  233. )
  234. self.assertContains(response, "GitHub")
  235. def test_empty_data_rejected(self):
  236. """form rejects empty data"""
  237. request = create_request(data={})
  238. strategy = load_strategy(request=request)
  239. backend = GithubOAuth2(strategy, '/')
  240. response = create_user_with_form(
  241. strategy=strategy,
  242. details={},
  243. backend=backend,
  244. user=None,
  245. pipeline_index=1,
  246. )
  247. self.assertEqual(response.status_code, 400)
  248. self.assertJsonResponseEquals(response, {
  249. 'email': ["This field is required."],
  250. 'username': ["This field is required."],
  251. })
  252. def test_taken_data_rejected(self):
  253. """form rejects taken data"""
  254. request = create_request(data={
  255. 'email': self.user.email,
  256. 'username': self.user.username,
  257. })
  258. strategy = load_strategy(request=request)
  259. backend = GithubOAuth2(strategy, '/')
  260. response = create_user_with_form(
  261. strategy=strategy,
  262. details={},
  263. backend=backend,
  264. user=None,
  265. pipeline_index=1,
  266. )
  267. self.assertEqual(response.status_code, 400)
  268. self.assertJsonResponseEquals(response, {
  269. 'email': ["This e-mail address is not available."],
  270. 'username': ["This username is not available."],
  271. })
  272. @override_dynamic_settings(account_activation='none')
  273. def test_user_created_no_activation_verified_email(self):
  274. """active user is created for verified email and activation disabled"""
  275. form_data = {
  276. 'email': 'social@auth.com',
  277. 'username': 'SocialUser',
  278. }
  279. request = create_request(data=form_data)
  280. strategy = load_strategy(request=request)
  281. backend = GithubOAuth2(strategy, '/')
  282. result = create_user_with_form(
  283. strategy=strategy,
  284. details={'email': form_data['email']},
  285. backend=backend,
  286. user=None,
  287. pipeline_index=1,
  288. )
  289. new_user = UserModel.objects.get(email='social@auth.com')
  290. self.assertEqual(result, {'user': new_user, 'is_new': True})
  291. self.assertNewUserIsCorrect(new_user, form_data, activation='none', email_verified=True)
  292. @override_dynamic_settings(account_activation='none')
  293. def test_user_created_no_activation_nonverified_email(self):
  294. """active user is created for non-verified email and activation disabled"""
  295. form_data = {
  296. 'email': 'social@auth.com',
  297. 'username': 'SocialUser',
  298. }
  299. request = create_request(data=form_data)
  300. strategy = load_strategy(request=request)
  301. backend = GithubOAuth2(strategy, '/')
  302. result = create_user_with_form(
  303. strategy=strategy,
  304. details={'email': ''},
  305. backend=backend,
  306. user=None,
  307. pipeline_index=1,
  308. )
  309. new_user = UserModel.objects.get(email='social@auth.com')
  310. self.assertEqual(result, {'user': new_user, 'is_new': True})
  311. self.assertNewUserIsCorrect(new_user, form_data, activation='none', email_verified=False)
  312. @override_dynamic_settings(account_activation='user')
  313. def test_user_created_activation_by_user_verified_email(self):
  314. """active user is created for verified email and activation by user"""
  315. form_data = {
  316. 'email': 'social@auth.com',
  317. 'username': 'SocialUser',
  318. }
  319. request = create_request(data=form_data)
  320. strategy = load_strategy(request=request)
  321. backend = GithubOAuth2(strategy, '/')
  322. result = create_user_with_form(
  323. strategy=strategy,
  324. details={'email': form_data['email']},
  325. backend=backend,
  326. user=None,
  327. pipeline_index=1,
  328. )
  329. new_user = UserModel.objects.get(email='social@auth.com')
  330. self.assertEqual(result, {'user': new_user, 'is_new': True})
  331. self.assertNewUserIsCorrect(new_user, form_data, activation='user', email_verified=True)
  332. @override_dynamic_settings(account_activation='user')
  333. def test_user_created_activation_by_user_nonverified_email(self):
  334. """inactive user is created for non-verified email and activation by user"""
  335. form_data = {
  336. 'email': 'social@auth.com',
  337. 'username': 'SocialUser',
  338. }
  339. request = create_request(data=form_data)
  340. strategy = load_strategy(request=request)
  341. backend = GithubOAuth2(strategy, '/')
  342. result = create_user_with_form(
  343. strategy=strategy,
  344. details={'email': ''},
  345. backend=backend,
  346. user=None,
  347. pipeline_index=1,
  348. )
  349. new_user = UserModel.objects.get(email='social@auth.com')
  350. self.assertEqual(result, {'user': new_user, 'is_new': True})
  351. self.assertNewUserIsCorrect(new_user, form_data, activation='user', email_verified=False)
  352. @override_dynamic_settings(account_activation='admin')
  353. def test_user_created_activation_by_admin_verified_email(self):
  354. """inactive user is created for verified email and activation by admin"""
  355. form_data = {
  356. 'email': 'social@auth.com',
  357. 'username': 'SocialUser',
  358. }
  359. request = create_request(data=form_data)
  360. strategy = load_strategy(request=request)
  361. backend = GithubOAuth2(strategy, '/')
  362. result = create_user_with_form(
  363. strategy=strategy,
  364. details={'email': form_data['email']},
  365. backend=backend,
  366. user=None,
  367. pipeline_index=1,
  368. )
  369. new_user = UserModel.objects.get(email='social@auth.com')
  370. self.assertEqual(result, {'user': new_user, 'is_new': True})
  371. self.assertNewUserIsCorrect(new_user, form_data, activation='admin', email_verified=True)
  372. @override_dynamic_settings(account_activation='admin')
  373. def test_user_created_activation_by_admin_nonverified_email(self):
  374. """inactive user is created for non-verified email and activation by admin"""
  375. form_data = {
  376. 'email': 'social@auth.com',
  377. 'username': 'SocialUser',
  378. }
  379. request = create_request(data=form_data)
  380. strategy = load_strategy(request=request)
  381. backend = GithubOAuth2(strategy, '/')
  382. result = create_user_with_form(
  383. strategy=strategy,
  384. details={'email': ''},
  385. backend=backend,
  386. user=None,
  387. pipeline_index=1,
  388. )
  389. new_user = UserModel.objects.get(email='social@auth.com')
  390. self.assertEqual(result, {'user': new_user, 'is_new': True})
  391. self.assertNewUserIsCorrect(new_user, form_data, activation='admin', email_verified=False)
  392. def test_form_check_agreement(self):
  393. """social register checks agreement"""
  394. form_data = {
  395. 'email': 'social@auth.com',
  396. 'username': 'SocialUser',
  397. }
  398. request = create_request(data=form_data)
  399. strategy = load_strategy(request=request)
  400. backend = GithubOAuth2(strategy, '/')
  401. agreement = Agreement.objects.create(
  402. type=Agreement.TYPE_TOS,
  403. text="Lorem ipsum",
  404. is_active=True,
  405. )
  406. response = create_user_with_form(
  407. strategy=strategy,
  408. details={'email': form_data['email']},
  409. backend=backend,
  410. user=None,
  411. pipeline_index=1,
  412. )
  413. self.assertEqual(response.status_code, 400)
  414. self.assertJsonResponseEquals(response, {
  415. 'terms_of_service': ['This agreement is required.'],
  416. })
  417. # invalid agreement id
  418. form_data = {
  419. 'email': 'social@auth.com',
  420. 'username': 'SocialUser',
  421. 'terms_of_service': agreement.id + 1,
  422. }
  423. request = create_request(data=form_data)
  424. strategy = load_strategy(request=request)
  425. backend = GithubOAuth2(strategy, '/')
  426. response = create_user_with_form(
  427. strategy=strategy,
  428. details={'email': form_data['email']},
  429. backend=backend,
  430. user=None,
  431. pipeline_index=1,
  432. )
  433. self.assertEqual(response.status_code, 400)
  434. self.assertJsonResponseEquals(response, {
  435. 'terms_of_service': ['This agreement is required.'],
  436. })
  437. # valid agreement id
  438. form_data = {
  439. 'email': 'social@auth.com',
  440. 'username': 'SocialUser',
  441. 'terms_of_service': agreement.id,
  442. }
  443. request = create_request(data=form_data)
  444. strategy = load_strategy(request=request)
  445. backend = GithubOAuth2(strategy, '/')
  446. result = create_user_with_form(
  447. strategy=strategy,
  448. details={'email': form_data['email']},
  449. backend=backend,
  450. user=None,
  451. pipeline_index=1,
  452. )
  453. new_user = UserModel.objects.get(email='social@auth.com')
  454. self.assertEqual(result, {'user': new_user, 'is_new': True})
  455. self.assertEqual(new_user.agreements, [agreement.id])
  456. self.assertEqual(new_user.useragreement_set.count(), 1)
  457. def test_form_ignore_inactive_agreement(self):
  458. """social register ignores inactive agreement"""
  459. form_data = {
  460. 'email': 'social@auth.com',
  461. 'username': 'SocialUser',
  462. 'terms_of_service': None,
  463. }
  464. request = create_request(data=form_data)
  465. strategy = load_strategy(request=request)
  466. backend = GithubOAuth2(strategy, '/')
  467. Agreement.objects.create(
  468. type=Agreement.TYPE_TOS,
  469. text="Lorem ipsum",
  470. is_active=False,
  471. )
  472. result = create_user_with_form(
  473. strategy=strategy,
  474. details={'email': form_data['email']},
  475. backend=backend,
  476. user=None,
  477. pipeline_index=1,
  478. )
  479. new_user = UserModel.objects.get(email='social@auth.com')
  480. self.assertEqual(result, {'user': new_user, 'is_new': True})
  481. self.assertEqual(new_user.agreements, [])
  482. self.assertEqual(new_user.useragreement_set.count(), 0)
  483. class GetUsernameTests(PipelineTestCase):
  484. def test_skip_if_user_is_set(self):
  485. """pipeline step is skipped if user was passed"""
  486. strategy = create_strategy()
  487. result = get_username(strategy, {}, None, user=self.user)
  488. self.assertIsNone(result)
  489. def test_skip_if_no_names(self):
  490. """pipeline step is skipped if API returned no names"""
  491. strategy = create_strategy()
  492. result = get_username(strategy, {}, None)
  493. self.assertIsNone(result)
  494. def test_resolve_to_username(self):
  495. """pipeline step resolves username"""
  496. strategy = create_strategy()
  497. result = get_username(strategy, {'username': 'BobBoberson'}, None)
  498. self.assertEqual(result, {'clean_username': 'BobBoberson'})
  499. def test_normalize_username(self):
  500. """pipeline step normalizes username"""
  501. strategy = create_strategy()
  502. result = get_username(strategy, {'username': 'Błop Błoperson'}, None)
  503. self.assertEqual(result, {'clean_username': 'BlopBloperson'})
  504. def test_resolve_to_first_name(self):
  505. """pipeline attempts to use first name because username is taken"""
  506. strategy = create_strategy()
  507. details = {
  508. 'username': self.user.username,
  509. 'first_name': 'Błob',
  510. }
  511. result = get_username(strategy, details, None)
  512. self.assertEqual(result, {'clean_username': 'Blob'})
  513. def test_dont_resolve_to_last_name(self):
  514. """pipeline will not fallback to last name because username is taken"""
  515. strategy = create_strategy()
  516. details = {
  517. 'username': self.user.username,
  518. 'last_name': 'Błob',
  519. }
  520. result = get_username(strategy, details, None)
  521. self.assertIsNone(result)
  522. def test_resolve_to_first_last_name_first_char(self):
  523. """pipeline will construct username from first name and first char of surname"""
  524. strategy = create_strategy()
  525. details = {
  526. 'first_name': self.user.username,
  527. 'last_name': 'Błob',
  528. }
  529. result = get_username(strategy, details, None)
  530. self.assertEqual(result, {'clean_username': self.user.username + 'B'})
  531. def test_dont_resolve_to_banned_name(self):
  532. """pipeline will not resolve to banned name"""
  533. strategy = create_strategy()
  534. Ban.objects.create(banned_value='*Admin*', check_type=Ban.USERNAME)
  535. details = {
  536. 'username': 'Misago Admin',
  537. 'first_name': 'Błob',
  538. }
  539. result = get_username(strategy, details, None)
  540. self.assertEqual(result, {'clean_username': 'Blob'})
  541. def test_resolve_full_name(self):
  542. """pipeline will resolve to full name"""
  543. strategy = create_strategy()
  544. Ban.objects.create(banned_value='*Admin*', check_type=Ban.USERNAME)
  545. details = {
  546. 'username': 'Misago Admin',
  547. 'full_name': 'Błob Błopo',
  548. }
  549. result = get_username(strategy, details, None)
  550. self.assertEqual(result, {'clean_username': 'BlobBlopo'})
  551. def test_resolve_to_cut_name(self):
  552. """pipeline will resolve cut too long name on second pass"""
  553. strategy = create_strategy()
  554. details = {
  555. 'username': 'Abrakadabrapokuskonstantynopolitańczykowianeczkatrzy',
  556. }
  557. result = get_username(strategy, details, None)
  558. self.assertEqual(result, {'clean_username': 'Abrakadabrapok'})
  559. class RequireActivationTests(PipelineTestCase):
  560. def setUp(self):
  561. super().setUp()
  562. self.user.requires_activation = UserModel.ACTIVATION_ADMIN
  563. self.user.save()
  564. def test_skip_if_user_not_set(self):
  565. """pipeline step is skipped if user is not set"""
  566. request = create_request()
  567. strategy = load_strategy(request=request)
  568. backend = GithubOAuth2(strategy, '/')
  569. result = require_activation(
  570. strategy=strategy,
  571. details={},
  572. backend=backend,
  573. user=None,
  574. pipeline_index=1,
  575. )
  576. self.assertEqual(result, {})
  577. def test_partial_token_if_user_not_set_no_showstopper(self):
  578. """pipeline step handles set session token if user is not set"""
  579. request = create_request()
  580. strategy = load_strategy(request=request)
  581. strategy.request.session['partial_pipeline_token'] = 'test-token'
  582. backend = GithubOAuth2(strategy, '/')
  583. require_activation(
  584. strategy=strategy,
  585. details={},
  586. backend=backend,
  587. user=None,
  588. pipeline_index=1,
  589. )
  590. def test_skip_if_user_is_active(self):
  591. """pipeline step is skipped if user is active"""
  592. self.user.requires_activation = UserModel.ACTIVATION_NONE
  593. self.user.save()
  594. self.assertFalse(self.user.requires_activation)
  595. request = create_request()
  596. strategy = load_strategy(request=request)
  597. backend = GithubOAuth2(strategy, '/')
  598. result = require_activation(
  599. strategy=strategy,
  600. details={},
  601. backend=backend,
  602. user=self.user,
  603. pipeline_index=1,
  604. )
  605. self.assertEqual(result, {})
  606. def test_pipeline_returns_html_responseon_get(self):
  607. """pipeline step renders http response for GET request and inactive user"""
  608. request = create_request()
  609. strategy = load_strategy(request=request)
  610. backend = GithubOAuth2(strategy, '/')
  611. response = require_activation(
  612. strategy=strategy,
  613. details={},
  614. backend=backend,
  615. user=self.user,
  616. pipeline_index=1,
  617. )
  618. self.assertEqual(response.status_code, 200)
  619. self.assertEqual(response['content-type'], 'text/html; charset=utf-8')
  620. def test_pipeline_returns_json_response_on_post(self):
  621. """pipeline step renders json response for POST request and inactive user"""
  622. request = create_request(data={'username': 'anything'})
  623. strategy = load_strategy(request=request)
  624. backend = GithubOAuth2(strategy, '/')
  625. response = require_activation(
  626. strategy=strategy,
  627. details={},
  628. backend=backend,
  629. user=self.user,
  630. pipeline_index=1,
  631. )
  632. self.assertEqual(response.status_code, 200)
  633. self.assertEqual(response['content-type'], 'application/json')
  634. self.assertJsonResponseEquals(response, {
  635. 'step': 'done',
  636. 'backend_name': 'GitHub',
  637. 'activation': 'admin',
  638. 'email': 'test@user.com',
  639. 'username': 'TestUser',
  640. })
  641. class ValidateIpNotBannedTests(PipelineTestCase):
  642. def test_skip_if_user_not_set(self):
  643. """pipeline step is skipped if no user was passed"""
  644. result = validate_ip_not_banned(None, {}, GithubOAuth2)
  645. self.assertIsNone(result)
  646. def test_raise_if_banned(self):
  647. """pipeline raises if user's IP is banned"""
  648. Ban.objects.create(banned_value='188.*', check_type=Ban.IP)
  649. try:
  650. validate_ip_not_banned(MockStrategy(user_ip='188.1.2.3'), {}, GithubOAuth2, self.user)
  651. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  652. except SocialAuthBanned as e:
  653. self.assertTrue(isinstance(e.ban, Ban))
  654. def test_exclude_staff(self):
  655. """pipeline excludes staff from bans"""
  656. self.user.is_staff = True
  657. self.user.save()
  658. Ban.objects.create(banned_value='188.*', check_type=Ban.IP)
  659. result = validate_ip_not_banned(
  660. MockStrategy(user_ip='188.1.2.3'), {}, GithubOAuth2, self.user)
  661. self.assertIsNone(result)
  662. class ValidateUserNotBannedTests(PipelineTestCase):
  663. def test_skip_if_user_not_set(self):
  664. """pipeline step is skipped if no user was passed"""
  665. result = validate_user_not_banned(None, {}, GithubOAuth2)
  666. self.assertIsNone(result)
  667. def test_raise_if_banned(self):
  668. """pipeline raises if user's IP is banned"""
  669. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  670. try:
  671. validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  672. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  673. except SocialAuthBanned as e:
  674. self.assertEqual(e.ban.user, self.user)
  675. self.assertTrue(isinstance(e.ban, BanCache))
  676. def test_exclude_staff(self):
  677. """pipeline excludes staff from bans"""
  678. self.user.is_staff = True
  679. self.user.save()
  680. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  681. result = validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  682. self.assertIsNone(result)