test_social_pipeline.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796
  1. import json
  2. from django.contrib.auth import get_user_model
  3. from django.core import mail
  4. from django.test import RequestFactory, override_settings
  5. from social_core.backends.github import GithubOAuth2
  6. from social_django.utils import load_strategy
  7. from misago.acl import ACL_CACHE
  8. from misago.acl.useracl import get_user_acl
  9. from misago.core.exceptions import SocialAuthFailed, SocialAuthBanned
  10. from misago.legal.models import Agreement
  11. from misago.users.constants import BANS_CACHE
  12. from misago.users.models import AnonymousUser, Ban, BanCache
  13. from misago.users.social.pipeline import (
  14. associate_by_email, create_user, create_user_with_form, get_username, require_activation,
  15. validate_ip_not_banned, validate_user_not_banned
  16. )
  17. from misago.users.testutils import UserTestCase
  18. UserModel = get_user_model()
  19. def create_request(user_ip='0.0.0.0', data=None):
  20. factory = RequestFactory()
  21. if data is None:
  22. request = factory.get('/')
  23. else:
  24. request = factory.post('/', data=json.dumps(data), content_type='application/json')
  25. request.include_frontend_context = True
  26. request.cache_versions = {BANS_CACHE: "abcdefgh", ACL_CACHE: "abcdefgh"}
  27. request.frontend_context = {}
  28. request.session = {}
  29. request.user = AnonymousUser()
  30. request.user_acl = get_user_acl(request.user, request.cache_versions)
  31. request.user_ip = user_ip
  32. return request
  33. class MockStrategy(object):
  34. def __init__(self, user_ip='0.0.0.0'):
  35. self.cleaned_partial_token = None
  36. self.request = create_request(user_ip)
  37. def clean_partial_pipeline(self, token):
  38. self.cleaned_partial_token = token
  39. class PipelineTestCase(UserTestCase):
  40. def get_initial_user(self):
  41. self.user = self.get_authenticated_user()
  42. def assertNewUserIsCorrect(
  43. self, new_user, form_data=None, activation=None, email_verified=False):
  44. self.assertFalse(new_user.has_usable_password())
  45. self.assertIn('Welcome', mail.outbox[0].subject)
  46. if form_data:
  47. self.assertEqual(new_user.email, form_data['email'])
  48. self.assertEqual(new_user.username, form_data['username'])
  49. if activation == 'none':
  50. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_NONE)
  51. if activation == 'user':
  52. if email_verified:
  53. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_NONE)
  54. else:
  55. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_USER)
  56. if activation == 'admin':
  57. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_ADMIN)
  58. self.assertEqual(new_user.audittrail_set.count(), 1)
  59. def assertJsonResponseEquals(self, response, value):
  60. response_content = response.content.decode("utf-8")
  61. response_json = json.loads(response_content)
  62. self.assertEqual(response_json, value)
  63. class AssociateByEmailTests(PipelineTestCase):
  64. def test_skip_if_user_is_already_set(self):
  65. """pipeline step is skipped if user was found by previous step"""
  66. result = associate_by_email(None, {}, GithubOAuth2, self.user)
  67. self.assertIsNone(result)
  68. def test_skip_if_no_email_passed(self):
  69. """pipeline step is skipped if no email was passed"""
  70. result = associate_by_email(None, {}, GithubOAuth2)
  71. self.assertIsNone(result)
  72. def test_skip_if_user_with_email_not_found(self):
  73. """pipeline step is skipped if no email was passed"""
  74. result = associate_by_email(None, {'email': 'not@found.com'}, GithubOAuth2)
  75. self.assertIsNone(result)
  76. def test_raise_if_user_is_inactive(self):
  77. """pipeline raises if user was inactive"""
  78. self.user.is_active = False
  79. self.user.save()
  80. try:
  81. associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
  82. self.fail("associate_by_email should raise SocialAuthFailed")
  83. except SocialAuthFailed as e:
  84. self.assertEqual(
  85. e.message,
  86. (
  87. "The e-mail address associated with your GitHub account is not available for "
  88. "use on this site."
  89. ),
  90. )
  91. def test_raise_if_user_needs_admin_activation(self):
  92. """pipeline raises if user needs admin activation"""
  93. self.user.requires_activation = UserModel.ACTIVATION_ADMIN
  94. self.user.save()
  95. try:
  96. associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
  97. self.fail("associate_by_email should raise SocialAuthFailed")
  98. except SocialAuthFailed as e:
  99. self.assertEqual(
  100. e.message,
  101. (
  102. "Your account has to be activated by site administrator before you will be "
  103. "able to sign in with GitHub."
  104. ),
  105. )
  106. def test_return_user(self):
  107. """pipeline returns user if email was found"""
  108. result = associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
  109. self.assertEqual(result, {'user': self.user, 'is_new': False})
  110. def test_return_user_email_inactive(self):
  111. """pipeline returns user even if they didn't activate their account manually"""
  112. self.user.requires_activation = UserModel.ACTIVATION_USER
  113. self.user.save()
  114. result = associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
  115. self.assertEqual(result, {'user': self.user, 'is_new': False})
  116. class CreateUser(PipelineTestCase):
  117. def test_skip_if_user_is_set(self):
  118. """pipeline step is skipped if user was passed"""
  119. result = create_user(MockStrategy(), {}, GithubOAuth2(), user=self.user)
  120. self.assertIsNone(result)
  121. def test_skip_if_no_email_passed(self):
  122. """pipeline step is skipped if no email was passed"""
  123. result = create_user(
  124. MockStrategy(),
  125. {},
  126. GithubOAuth2(),
  127. clean_username='TestBob',
  128. )
  129. self.assertIsNone(result)
  130. def test_skip_if_no_clean_username_passed(self):
  131. """pipeline step is skipped if cleaned username wasnt passed"""
  132. result = create_user(
  133. MockStrategy(),
  134. {'email': 'hello@example.com'},
  135. GithubOAuth2(),
  136. )
  137. self.assertIsNone(result)
  138. def test_skip_if_email_is_taken(self):
  139. """pipeline step is skipped if email was taken"""
  140. result = create_user(
  141. MockStrategy(),
  142. {'email': self.user.email},
  143. GithubOAuth2(),
  144. clean_username='NewUser',
  145. )
  146. self.assertIsNone(result)
  147. @override_settings(account_activation='none')
  148. def test_user_created_no_activation(self):
  149. """pipeline step creates active user for valid data and disabled activation"""
  150. result = create_user(
  151. MockStrategy(),
  152. {'email': 'new@example.com'},
  153. GithubOAuth2(),
  154. clean_username='NewUser',
  155. )
  156. new_user = UserModel.objects.get(email='new@example.com')
  157. self.assertEqual(result, {
  158. 'user': new_user,
  159. 'is_new': True,
  160. })
  161. self.assertEqual(new_user.username, 'NewUser')
  162. self.assertNewUserIsCorrect(new_user, email_verified=True, activation='none')
  163. @override_settings(account_activation='user')
  164. def test_user_created_activation_by_user(self):
  165. """pipeline step creates active user for valid data and user activation"""
  166. result = create_user(
  167. MockStrategy(),
  168. {'email': 'new@example.com'},
  169. GithubOAuth2(),
  170. clean_username='NewUser',
  171. )
  172. new_user = UserModel.objects.get(email='new@example.com')
  173. self.assertEqual(result, {
  174. 'user': new_user,
  175. 'is_new': True,
  176. })
  177. self.assertEqual(new_user.username, 'NewUser')
  178. self.assertNewUserIsCorrect(new_user, email_verified=True, activation='user')
  179. @override_settings(account_activation='admin')
  180. def test_user_created_activation_by_admin(self):
  181. """pipeline step creates in user for valid data and admin activation"""
  182. result = create_user(
  183. MockStrategy(),
  184. {'email': 'new@example.com'},
  185. GithubOAuth2(),
  186. clean_username='NewUser',
  187. )
  188. new_user = UserModel.objects.get(email='new@example.com')
  189. self.assertEqual(result, {
  190. 'user': new_user,
  191. 'is_new': True,
  192. })
  193. self.assertEqual(new_user.username, 'NewUser')
  194. self.assertNewUserIsCorrect(new_user, email_verified=True, activation='admin')
  195. class CreateUserWithFormTests(PipelineTestCase):
  196. def setUp(self):
  197. super().setUp()
  198. Agreement.objects.invalidate_cache()
  199. def tearDown(self):
  200. super().tearDown()
  201. Agreement.objects.invalidate_cache()
  202. def test_skip_if_user_is_set(self):
  203. """pipeline step is skipped if user was passed"""
  204. request = create_request()
  205. strategy = load_strategy(request=request)
  206. backend = GithubOAuth2(strategy, '/')
  207. result = create_user_with_form(
  208. strategy=strategy,
  209. details={},
  210. backend=backend,
  211. user=self.user,
  212. pipeline_index=1,
  213. )
  214. self.assertEqual(result, {})
  215. def test_renders_form_if_not_post(self):
  216. """pipeline step renders form if not POST"""
  217. request = create_request()
  218. strategy = load_strategy(request=request)
  219. backend = GithubOAuth2(strategy, '/')
  220. response = create_user_with_form(
  221. strategy=strategy,
  222. details={},
  223. backend=backend,
  224. user=None,
  225. pipeline_index=1,
  226. )
  227. self.assertContains(response, "GitHub")
  228. def test_empty_data_rejected(self):
  229. """form rejects empty data"""
  230. request = create_request(data={})
  231. strategy = load_strategy(request=request)
  232. backend = GithubOAuth2(strategy, '/')
  233. response = create_user_with_form(
  234. strategy=strategy,
  235. details={},
  236. backend=backend,
  237. user=None,
  238. pipeline_index=1,
  239. )
  240. self.assertEqual(response.status_code, 400)
  241. self.assertJsonResponseEquals(response, {
  242. 'email': ["This field is required."],
  243. 'username': ["This field is required."],
  244. })
  245. def test_taken_data_rejected(self):
  246. """form rejects taken data"""
  247. request = create_request(data={
  248. 'email': self.user.email,
  249. 'username': self.user.username,
  250. })
  251. strategy = load_strategy(request=request)
  252. backend = GithubOAuth2(strategy, '/')
  253. response = create_user_with_form(
  254. strategy=strategy,
  255. details={},
  256. backend=backend,
  257. user=None,
  258. pipeline_index=1,
  259. )
  260. self.assertEqual(response.status_code, 400)
  261. self.assertJsonResponseEquals(response, {
  262. 'email': ["This e-mail address is not available."],
  263. 'username': ["This username is not available."],
  264. })
  265. @override_settings(account_activation='none')
  266. def test_user_created_no_activation_verified_email(self):
  267. """active user is created for verified email and activation disabled"""
  268. form_data = {
  269. 'email': 'social@auth.com',
  270. 'username': 'SocialUser',
  271. }
  272. request = create_request(data=form_data)
  273. strategy = load_strategy(request=request)
  274. backend = GithubOAuth2(strategy, '/')
  275. result = create_user_with_form(
  276. strategy=strategy,
  277. details={'email': form_data['email']},
  278. backend=backend,
  279. user=None,
  280. pipeline_index=1,
  281. )
  282. new_user = UserModel.objects.get(email='social@auth.com')
  283. self.assertEqual(result, {'user': new_user, 'is_new': True})
  284. self.assertNewUserIsCorrect(new_user, form_data, activation='none', email_verified=True)
  285. @override_settings(account_activation='none')
  286. def test_user_created_no_activation_nonverified_email(self):
  287. """active user is created for non-verified email and activation disabled"""
  288. form_data = {
  289. 'email': 'social@auth.com',
  290. 'username': 'SocialUser',
  291. }
  292. request = create_request(data=form_data)
  293. strategy = load_strategy(request=request)
  294. backend = GithubOAuth2(strategy, '/')
  295. result = create_user_with_form(
  296. strategy=strategy,
  297. details={'email': ''},
  298. backend=backend,
  299. user=None,
  300. pipeline_index=1,
  301. )
  302. new_user = UserModel.objects.get(email='social@auth.com')
  303. self.assertEqual(result, {'user': new_user, 'is_new': True})
  304. self.assertNewUserIsCorrect(new_user, form_data, activation='none', email_verified=False)
  305. @override_settings(account_activation='user')
  306. def test_user_created_activation_by_user_verified_email(self):
  307. """active user is created for verified email and activation by user"""
  308. form_data = {
  309. 'email': 'social@auth.com',
  310. 'username': 'SocialUser',
  311. }
  312. request = create_request(data=form_data)
  313. strategy = load_strategy(request=request)
  314. backend = GithubOAuth2(strategy, '/')
  315. result = create_user_with_form(
  316. strategy=strategy,
  317. details={'email': form_data['email']},
  318. backend=backend,
  319. user=None,
  320. pipeline_index=1,
  321. )
  322. new_user = UserModel.objects.get(email='social@auth.com')
  323. self.assertEqual(result, {'user': new_user, 'is_new': True})
  324. self.assertNewUserIsCorrect(new_user, form_data, activation='user', email_verified=True)
  325. @override_settings(account_activation='user')
  326. def test_user_created_activation_by_user_nonverified_email(self):
  327. """inactive user is created for non-verified email and activation by user"""
  328. form_data = {
  329. 'email': 'social@auth.com',
  330. 'username': 'SocialUser',
  331. }
  332. request = create_request(data=form_data)
  333. strategy = load_strategy(request=request)
  334. backend = GithubOAuth2(strategy, '/')
  335. result = create_user_with_form(
  336. strategy=strategy,
  337. details={'email': ''},
  338. backend=backend,
  339. user=None,
  340. pipeline_index=1,
  341. )
  342. new_user = UserModel.objects.get(email='social@auth.com')
  343. self.assertEqual(result, {'user': new_user, 'is_new': True})
  344. self.assertNewUserIsCorrect(new_user, form_data, activation='user', email_verified=False)
  345. @override_settings(account_activation='admin')
  346. def test_user_created_activation_by_admin_verified_email(self):
  347. """inactive user is created for verified email and activation by admin"""
  348. form_data = {
  349. 'email': 'social@auth.com',
  350. 'username': 'SocialUser',
  351. }
  352. request = create_request(data=form_data)
  353. strategy = load_strategy(request=request)
  354. backend = GithubOAuth2(strategy, '/')
  355. result = create_user_with_form(
  356. strategy=strategy,
  357. details={'email': form_data['email']},
  358. backend=backend,
  359. user=None,
  360. pipeline_index=1,
  361. )
  362. new_user = UserModel.objects.get(email='social@auth.com')
  363. self.assertEqual(result, {'user': new_user, 'is_new': True})
  364. self.assertNewUserIsCorrect(new_user, form_data, activation='admin', email_verified=True)
  365. @override_settings(account_activation='admin')
  366. def test_user_created_activation_by_admin_nonverified_email(self):
  367. """inactive user is created for non-verified email and activation by admin"""
  368. form_data = {
  369. 'email': 'social@auth.com',
  370. 'username': 'SocialUser',
  371. }
  372. request = create_request(data=form_data)
  373. strategy = load_strategy(request=request)
  374. backend = GithubOAuth2(strategy, '/')
  375. result = create_user_with_form(
  376. strategy=strategy,
  377. details={'email': ''},
  378. backend=backend,
  379. user=None,
  380. pipeline_index=1,
  381. )
  382. new_user = UserModel.objects.get(email='social@auth.com')
  383. self.assertEqual(result, {'user': new_user, 'is_new': True})
  384. self.assertNewUserIsCorrect(new_user, form_data, activation='admin', email_verified=False)
  385. def test_form_check_agreement(self):
  386. """social register checks agreement"""
  387. form_data = {
  388. 'email': 'social@auth.com',
  389. 'username': 'SocialUser',
  390. }
  391. request = create_request(data=form_data)
  392. strategy = load_strategy(request=request)
  393. backend = GithubOAuth2(strategy, '/')
  394. agreement = Agreement.objects.create(
  395. type=Agreement.TYPE_TOS,
  396. text="Lorem ipsum",
  397. is_active=True,
  398. )
  399. response = create_user_with_form(
  400. strategy=strategy,
  401. details={'email': form_data['email']},
  402. backend=backend,
  403. user=None,
  404. pipeline_index=1,
  405. )
  406. self.assertEqual(response.status_code, 400)
  407. self.assertJsonResponseEquals(response, {
  408. 'terms_of_service': ['This agreement is required.'],
  409. })
  410. # invalid agreement id
  411. form_data = {
  412. 'email': 'social@auth.com',
  413. 'username': 'SocialUser',
  414. 'terms_of_service': agreement.id + 1,
  415. }
  416. request = create_request(data=form_data)
  417. strategy = load_strategy(request=request)
  418. backend = GithubOAuth2(strategy, '/')
  419. response = create_user_with_form(
  420. strategy=strategy,
  421. details={'email': form_data['email']},
  422. backend=backend,
  423. user=None,
  424. pipeline_index=1,
  425. )
  426. self.assertEqual(response.status_code, 400)
  427. self.assertJsonResponseEquals(response, {
  428. 'terms_of_service': ['This agreement is required.'],
  429. })
  430. # valid agreement id
  431. form_data = {
  432. 'email': 'social@auth.com',
  433. 'username': 'SocialUser',
  434. 'terms_of_service': agreement.id,
  435. }
  436. request = create_request(data=form_data)
  437. strategy = load_strategy(request=request)
  438. backend = GithubOAuth2(strategy, '/')
  439. result = create_user_with_form(
  440. strategy=strategy,
  441. details={'email': form_data['email']},
  442. backend=backend,
  443. user=None,
  444. pipeline_index=1,
  445. )
  446. new_user = UserModel.objects.get(email='social@auth.com')
  447. self.assertEqual(result, {'user': new_user, 'is_new': True})
  448. self.assertEqual(new_user.agreements, [agreement.id])
  449. self.assertEqual(new_user.useragreement_set.count(), 1)
  450. def test_form_ignore_inactive_agreement(self):
  451. """social register ignores inactive agreement"""
  452. form_data = {
  453. 'email': 'social@auth.com',
  454. 'username': 'SocialUser',
  455. 'terms_of_service': None,
  456. }
  457. request = create_request(data=form_data)
  458. strategy = load_strategy(request=request)
  459. backend = GithubOAuth2(strategy, '/')
  460. Agreement.objects.create(
  461. type=Agreement.TYPE_TOS,
  462. text="Lorem ipsum",
  463. is_active=False,
  464. )
  465. result = create_user_with_form(
  466. strategy=strategy,
  467. details={'email': form_data['email']},
  468. backend=backend,
  469. user=None,
  470. pipeline_index=1,
  471. )
  472. new_user = UserModel.objects.get(email='social@auth.com')
  473. self.assertEqual(result, {'user': new_user, 'is_new': True})
  474. self.assertEqual(new_user.agreements, [])
  475. self.assertEqual(new_user.useragreement_set.count(), 0)
  476. class GetUsernameTests(PipelineTestCase):
  477. def test_skip_if_user_is_set(self):
  478. """pipeline step is skipped if user was passed"""
  479. result = get_username(None, {}, None, user=self.user)
  480. self.assertIsNone(result)
  481. def test_skip_if_no_names(self):
  482. """pipeline step is skipped if API returned no names"""
  483. result = get_username(None, {}, None)
  484. self.assertIsNone(result)
  485. def test_resolve_to_username(self):
  486. """pipeline step resolves username"""
  487. result = get_username(None, {'username': 'BobBoberson'}, None)
  488. self.assertEqual(result, {'clean_username': 'BobBoberson'})
  489. def test_normalize_username(self):
  490. """pipeline step normalizes username"""
  491. result = get_username(None, {'username': 'Błop Błoperson'}, None)
  492. self.assertEqual(result, {'clean_username': 'BlopBloperson'})
  493. def test_resolve_to_first_name(self):
  494. """pipeline attempts to use first name because username is taken"""
  495. details = {
  496. 'username': self.user.username,
  497. 'first_name': 'Błob',
  498. }
  499. result = get_username(None, details, None)
  500. self.assertEqual(result, {'clean_username': 'Blob'})
  501. def test_dont_resolve_to_last_name(self):
  502. """pipeline will not fallback to last name because username is taken"""
  503. details = {
  504. 'username': self.user.username,
  505. 'last_name': 'Błob',
  506. }
  507. result = get_username(None, details, None)
  508. self.assertIsNone(result)
  509. def test_resolve_to_first_last_name_first_char(self):
  510. """pipeline will construct username from first name and first char of surname"""
  511. details = {
  512. 'first_name': self.user.username,
  513. 'last_name': 'Błob',
  514. }
  515. result = get_username(None, details, None)
  516. self.assertEqual(result, {'clean_username': self.user.username + 'B'})
  517. def test_dont_resolve_to_banned_name(self):
  518. """pipeline will not resolve to banned name"""
  519. Ban.objects.create(banned_value='*Admin*', check_type=Ban.USERNAME)
  520. details = {
  521. 'username': 'Misago Admin',
  522. 'first_name': 'Błob',
  523. }
  524. result = get_username(None, details, None)
  525. self.assertEqual(result, {'clean_username': 'Blob'})
  526. def test_resolve_full_name(self):
  527. """pipeline will resolve to full name"""
  528. Ban.objects.create(banned_value='*Admin*', check_type=Ban.USERNAME)
  529. details = {
  530. 'username': 'Misago Admin',
  531. 'full_name': 'Błob Błopo',
  532. }
  533. result = get_username(None, details, None)
  534. self.assertEqual(result, {'clean_username': 'BlobBlopo'})
  535. def test_resolve_to_cut_name(self):
  536. """pipeline will resolve cut too long name on second pass"""
  537. details = {
  538. 'username': 'Abrakadabrapokuskonstantynopolitańczykowianeczkatrzy',
  539. }
  540. result = get_username(None, details, None)
  541. self.assertEqual(result, {'clean_username': 'Abrakadabrapok'})
  542. class RequireActivationTests(PipelineTestCase):
  543. def setUp(self):
  544. super().setUp()
  545. self.user.requires_activation = UserModel.ACTIVATION_ADMIN
  546. self.user.save()
  547. def test_skip_if_user_not_set(self):
  548. """pipeline step is skipped if user is not set"""
  549. request = create_request()
  550. strategy = load_strategy(request=request)
  551. backend = GithubOAuth2(strategy, '/')
  552. result = require_activation(
  553. strategy=strategy,
  554. details={},
  555. backend=backend,
  556. user=None,
  557. pipeline_index=1,
  558. )
  559. self.assertEqual(result, {})
  560. def test_partial_token_if_user_not_set_no_showstopper(self):
  561. """pipeline step handles set session token if user is not set"""
  562. request = create_request()
  563. strategy = load_strategy(request=request)
  564. strategy.request.session['partial_pipeline_token'] = 'test-token'
  565. backend = GithubOAuth2(strategy, '/')
  566. require_activation(
  567. strategy=strategy,
  568. details={},
  569. backend=backend,
  570. user=None,
  571. pipeline_index=1,
  572. )
  573. def test_skip_if_user_is_active(self):
  574. """pipeline step is skipped if user is active"""
  575. self.user.requires_activation = UserModel.ACTIVATION_NONE
  576. self.user.save()
  577. self.assertFalse(self.user.requires_activation)
  578. request = create_request()
  579. strategy = load_strategy(request=request)
  580. backend = GithubOAuth2(strategy, '/')
  581. result = require_activation(
  582. strategy=strategy,
  583. details={},
  584. backend=backend,
  585. user=self.user,
  586. pipeline_index=1,
  587. )
  588. self.assertEqual(result, {})
  589. def test_pipeline_returns_html_responseon_get(self):
  590. """pipeline step renders http response for GET request and inactive user"""
  591. request = create_request()
  592. strategy = load_strategy(request=request)
  593. backend = GithubOAuth2(strategy, '/')
  594. response = require_activation(
  595. strategy=strategy,
  596. details={},
  597. backend=backend,
  598. user=self.user,
  599. pipeline_index=1,
  600. )
  601. self.assertEqual(response.status_code, 200)
  602. self.assertEqual(response['content-type'], 'text/html; charset=utf-8')
  603. def test_pipeline_returns_json_response_on_post(self):
  604. """pipeline step renders json response for POST request and inactive user"""
  605. request = create_request(data={'username': 'anything'})
  606. strategy = load_strategy(request=request)
  607. backend = GithubOAuth2(strategy, '/')
  608. response = require_activation(
  609. strategy=strategy,
  610. details={},
  611. backend=backend,
  612. user=self.user,
  613. pipeline_index=1,
  614. )
  615. self.assertEqual(response.status_code, 200)
  616. self.assertEqual(response['content-type'], 'application/json')
  617. self.assertJsonResponseEquals(response, {
  618. 'step': 'done',
  619. 'backend_name': 'GitHub',
  620. 'activation': 'admin',
  621. 'email': 'test@user.com',
  622. 'username': 'TestUser',
  623. })
  624. class ValidateIpNotBannedTests(PipelineTestCase):
  625. def test_skip_if_user_not_set(self):
  626. """pipeline step is skipped if no user was passed"""
  627. result = validate_ip_not_banned(None, {}, GithubOAuth2)
  628. self.assertIsNone(result)
  629. def test_raise_if_banned(self):
  630. """pipeline raises if user's IP is banned"""
  631. Ban.objects.create(banned_value='188.*', check_type=Ban.IP)
  632. try:
  633. validate_ip_not_banned(MockStrategy(user_ip='188.1.2.3'), {}, GithubOAuth2, self.user)
  634. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  635. except SocialAuthBanned as e:
  636. self.assertTrue(isinstance(e.ban, Ban))
  637. def test_exclude_staff(self):
  638. """pipeline excludes staff from bans"""
  639. self.user.is_staff = True
  640. self.user.save()
  641. Ban.objects.create(banned_value='188.*', check_type=Ban.IP)
  642. result = validate_ip_not_banned(
  643. MockStrategy(user_ip='188.1.2.3'), {}, GithubOAuth2, self.user)
  644. self.assertIsNone(result)
  645. class ValidateUserNotBannedTests(PipelineTestCase):
  646. def test_skip_if_user_not_set(self):
  647. """pipeline step is skipped if no user was passed"""
  648. result = validate_user_not_banned(None, {}, GithubOAuth2)
  649. self.assertIsNone(result)
  650. def test_raise_if_banned(self):
  651. """pipeline raises if user's IP is banned"""
  652. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  653. try:
  654. validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  655. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  656. except SocialAuthBanned as e:
  657. self.assertEqual(e.ban.user, self.user)
  658. self.assertTrue(isinstance(e.ban, BanCache))
  659. def test_exclude_staff(self):
  660. """pipeline excludes staff from bans"""
  661. self.user.is_staff = True
  662. self.user.save()
  663. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  664. result = validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  665. self.assertIsNone(result)