test_social_pipeline.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798
  1. import json
  2. from django.contrib.auth import get_user_model
  3. from django.core import mail
  4. from django.test import RequestFactory, override_settings
  5. from social_core.backends.github import GithubOAuth2
  6. from social_django.utils import load_strategy
  7. from misago.acl import ACL_CACHE
  8. from misago.acl.useracl import get_user_acl
  9. from misago.conf.dynamicsettings import DynamicSettings
  10. from misago.core.exceptions import SocialAuthFailed, SocialAuthBanned
  11. from misago.conftest import get_cache_versions
  12. from misago.legal.models import Agreement
  13. from misago.users.models import AnonymousUser, Ban, BanCache
  14. from misago.users.social.pipeline import (
  15. associate_by_email, create_user, create_user_with_form, get_username, require_activation,
  16. validate_ip_not_banned, validate_user_not_banned
  17. )
  18. from misago.users.testutils import UserTestCase
  # Resolve the active user model once at import time; the tests below use it
  # for its ACTIVATION_* constants and for database lookups.
  UserModel = get_user_model()
  def create_request(user_ip='0.0.0.0', data=None):
      """Build a fake anonymous request carrying the attributes these tests rely on.

      When ``data`` is None a GET request to ``/`` is produced; otherwise
      ``data`` is serialized to JSON and sent as a POST to ``/``.
      """
      request_factory = RequestFactory()
      if data is not None:
          request = request_factory.post(
              '/',
              data=json.dumps(data),
              content_type='application/json',
          )
      else:
          request = request_factory.get('/')

      request.include_frontend_context = True

      # Cache versions are reused below by both the settings and the ACL lookup.
      versions = get_cache_versions()
      request.cache_versions = versions
      request.frontend_context = {}
      request.session = {}
      request.settings = DynamicSettings(versions)

      request.user = AnonymousUser()
      request.user_acl = get_user_acl(request.user, versions)
      request.user_ip = user_ip
      return request
  class MockStrategy:
      """Minimal strategy stand-in: exposes ``request`` and records cleaned partial tokens."""

      def __init__(self, user_ip='0.0.0.0'):
          self.request = create_request(user_ip)
          # Holds the token last passed to clean_partial_pipeline().
          self.cleaned_partial_token = None

      def clean_partial_pipeline(self, token):
          """Remember ``token`` instead of cleaning any real partial pipeline state."""
          self.cleaned_partial_token = token
  class PipelineTestCase(UserTestCase):
      """Shared base for pipeline tests: user fixture plus custom assertions."""

      def get_initial_user(self):
          """Store an authenticated user on ``self.user``."""
          self.user = self.get_authenticated_user()

      def assertNewUserIsCorrect(
              self, new_user, form_data=None, activation=None, email_verified=False):
          """Assert that ``new_user`` looks like a freshly registered social user.

          Checks the password is unusable, the first outgoing mail is the welcome
          message, the optional ``form_data`` was applied, the activation state
          matches ``activation``/``email_verified`` and one audit trail exists.
          """
          self.assertFalse(new_user.has_usable_password())
          self.assertIn('Welcome', mail.outbox[0].subject)

          if form_data:
              for field in ('email', 'username'):
                  self.assertEqual(getattr(new_user, field), form_data[field])

          # Expected activation state for every supported activation mode;
          # a verified email satisfies the "user" mode without further action.
          expected_states = {
              'none': UserModel.ACTIVATION_NONE,
              'user': (
                  UserModel.ACTIVATION_NONE if email_verified else UserModel.ACTIVATION_USER
              ),
              'admin': UserModel.ACTIVATION_ADMIN,
          }
          if activation in expected_states:
              self.assertEqual(new_user.requires_activation, expected_states[activation])

          self.assertEqual(new_user.audittrail_set.count(), 1)

      def assertJsonResponseEquals(self, response, value):
          """Assert that the UTF-8 JSON body of ``response`` decodes to ``value``."""
          decoded = json.loads(response.content.decode("utf-8"))
          self.assertEqual(decoded, value)
  class AssociateByEmailTests(PipelineTestCase):
      """Tests for the ``associate_by_email`` pipeline step."""

      def test_skip_if_user_is_already_set(self):
          """pipeline step is skipped if user was found by previous step"""
          result = associate_by_email(None, {}, GithubOAuth2, self.user)
          self.assertIsNone(result)

      def test_skip_if_no_email_passed(self):
          """pipeline step is skipped if no email was passed"""
          result = associate_by_email(None, {}, GithubOAuth2)
          self.assertIsNone(result)

      def test_skip_if_user_with_email_not_found(self):
          """pipeline step is skipped if no user with passed email exists"""
          result = associate_by_email(None, {'email': 'not@found.com'}, GithubOAuth2)
          self.assertIsNone(result)

      def test_raise_if_user_is_inactive(self):
          """pipeline raises if user was inactive"""
          self.user.is_active = False
          self.user.save()

          with self.assertRaises(SocialAuthFailed) as raised:
              associate_by_email(None, {'email': self.user.email}, GithubOAuth2)

          self.assertEqual(
              raised.exception.message,
              (
                  "The e-mail address associated with your GitHub account is not available for "
                  "use on this site."
              ),
          )

      def test_raise_if_user_needs_admin_activation(self):
          """pipeline raises if user needs admin activation"""
          self.user.requires_activation = UserModel.ACTIVATION_ADMIN
          self.user.save()

          with self.assertRaises(SocialAuthFailed) as raised:
              associate_by_email(None, {'email': self.user.email}, GithubOAuth2)

          self.assertEqual(
              raised.exception.message,
              (
                  "Your account has to be activated by site administrator before you will be "
                  "able to sign in with GitHub."
              ),
          )

      def test_return_user(self):
          """pipeline returns user if email was found"""
          result = associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
          self.assertEqual(result, {'user': self.user, 'is_new': False})

      def test_return_user_email_inactive(self):
          """pipeline returns user even if they didn't activate their account manually"""
          self.user.requires_activation = UserModel.ACTIVATION_USER
          self.user.save()

          result = associate_by_email(None, {'email': self.user.email}, GithubOAuth2)
          self.assertEqual(result, {'user': self.user, 'is_new': False})
  class CreateUser(PipelineTestCase):
      """Tests for the ``create_user`` pipeline step."""

      def _run_step(self, details, **kwargs):
          """Call ``create_user`` with a mock strategy and a bare GitHub backend."""
          return create_user(MockStrategy(), details, GithubOAuth2(), **kwargs)

      def _assert_new_user_created(self, activation):
          """Run the step with valid data and verify the user it created."""
          result = self._run_step({'email': 'new@example.com'}, clean_username='NewUser')

          new_user = UserModel.objects.get(email='new@example.com')
          self.assertEqual(result, {'user': new_user, 'is_new': True})
          self.assertEqual(new_user.username, 'NewUser')
          self.assertNewUserIsCorrect(new_user, email_verified=True, activation=activation)

      def test_skip_if_user_is_set(self):
          """pipeline step is skipped if user was passed"""
          self.assertIsNone(self._run_step({}, user=self.user))

      def test_skip_if_no_email_passed(self):
          """pipeline step is skipped if no email was passed"""
          self.assertIsNone(self._run_step({}, clean_username='TestBob'))

      def test_skip_if_no_clean_username_passed(self):
          """pipeline step is skipped if cleaned username wasn't passed"""
          self.assertIsNone(self._run_step({'email': 'hello@example.com'}))

      def test_skip_if_email_is_taken(self):
          """pipeline step is skipped if email was taken"""
          outcome = self._run_step({'email': self.user.email}, clean_username='NewUser')
          self.assertIsNone(outcome)

      @override_settings(account_activation='none')
      def test_user_created_no_activation(self):
          """pipeline step creates active user for valid data and disabled activation"""
          self._assert_new_user_created('none')

      @override_settings(account_activation='user')
      def test_user_created_activation_by_user(self):
          """pipeline step creates active user for valid data and user activation"""
          self._assert_new_user_created('user')

      @override_settings(account_activation='admin')
      def test_user_created_activation_by_admin(self):
          """pipeline step creates inactive user for valid data and admin activation"""
          self._assert_new_user_created('admin')
  class CreateUserWithFormTests(PipelineTestCase):
      """Tests for the ``create_user_with_form`` pipeline step."""

      def setUp(self):
          super().setUp()
          Agreement.objects.invalidate_cache()

      def tearDown(self):
          super().tearDown()
          Agreement.objects.invalidate_cache()

      def _form_data(self, **extra):
          """Return a fresh registration payload, optionally extended with ``extra``."""
          payload = {
              'email': 'social@auth.com',
              'username': 'SocialUser',
          }
          payload.update(extra)
          return payload

      def _prepare(self, data=None):
          """Build the strategy and GitHub backend for a request carrying ``data``."""
          request = create_request(data=data)
          strategy = load_strategy(request=request)
          return strategy, GithubOAuth2(strategy, '/')

      def _call_step(self, strategy, backend, details, user=None):
          """Invoke ``create_user_with_form`` as the first pipeline step."""
          return create_user_with_form(
              strategy=strategy,
              details=details,
              backend=backend,
              user=user,
              pipeline_index=1,
          )

      def _submit(self, details, data=None, user=None):
          """Prepare a request for ``data`` and immediately run the step on it."""
          strategy, backend = self._prepare(data)
          return self._call_step(strategy, backend, details, user=user)

      def _assert_user_created(self, activation, email_verified):
          """Submit valid form data and verify the user that gets created.

          A verified email is simulated by passing the same address in the
          backend ``details``; a non-verified one by passing an empty string.
          """
          form_data = self._form_data()
          details = {'email': form_data['email'] if email_verified else ''}

          result = self._submit(details, data=form_data)

          new_user = UserModel.objects.get(email='social@auth.com')
          self.assertEqual(result, {'user': new_user, 'is_new': True})
          self.assertNewUserIsCorrect(
              new_user, form_data, activation=activation, email_verified=email_verified)

      def test_skip_if_user_is_set(self):
          """pipeline step is skipped if user was passed"""
          result = self._submit({}, user=self.user)
          self.assertEqual(result, {})

      def test_renders_form_if_not_post(self):
          """pipeline step renders form if not POST"""
          response = self._submit({})
          self.assertContains(response, "GitHub")

      def test_empty_data_rejected(self):
          """form rejects empty data"""
          response = self._submit({}, data={})

          self.assertEqual(response.status_code, 400)
          self.assertJsonResponseEquals(response, {
              'email': ["This field is required."],
              'username': ["This field is required."],
          })

      def test_taken_data_rejected(self):
          """form rejects taken data"""
          taken = {
              'email': self.user.email,
              'username': self.user.username,
          }
          response = self._submit({}, data=taken)

          self.assertEqual(response.status_code, 400)
          self.assertJsonResponseEquals(response, {
              'email': ["This e-mail address is not available."],
              'username': ["This username is not available."],
          })

      @override_settings(account_activation='none')
      def test_user_created_no_activation_verified_email(self):
          """active user is created for verified email and activation disabled"""
          self._assert_user_created('none', email_verified=True)

      @override_settings(account_activation='none')
      def test_user_created_no_activation_nonverified_email(self):
          """active user is created for non-verified email and activation disabled"""
          self._assert_user_created('none', email_verified=False)

      @override_settings(account_activation='user')
      def test_user_created_activation_by_user_verified_email(self):
          """active user is created for verified email and activation by user"""
          self._assert_user_created('user', email_verified=True)

      @override_settings(account_activation='user')
      def test_user_created_activation_by_user_nonverified_email(self):
          """inactive user is created for non-verified email and activation by user"""
          self._assert_user_created('user', email_verified=False)

      @override_settings(account_activation='admin')
      def test_user_created_activation_by_admin_verified_email(self):
          """inactive user is created for verified email and activation by admin"""
          self._assert_user_created('admin', email_verified=True)

      @override_settings(account_activation='admin')
      def test_user_created_activation_by_admin_nonverified_email(self):
          """inactive user is created for non-verified email and activation by admin"""
          self._assert_user_created('admin', email_verified=False)

      def test_form_check_agreement(self):
          """social register checks agreement"""
          agreement_required = {
              'terms_of_service': ['This agreement is required.'],
          }

          # missing agreement id; the request is built before the agreement exists
          form_data = self._form_data()
          strategy, backend = self._prepare(form_data)

          agreement = Agreement.objects.create(
              type=Agreement.TYPE_TOS,
              text="Lorem ipsum",
              is_active=True,
          )

          response = self._call_step(strategy, backend, {'email': form_data['email']})
          self.assertEqual(response.status_code, 400)
          self.assertJsonResponseEquals(response, agreement_required)

          # invalid agreement id
          form_data = self._form_data(terms_of_service=agreement.id + 1)
          response = self._submit({'email': form_data['email']}, data=form_data)
          self.assertEqual(response.status_code, 400)
          self.assertJsonResponseEquals(response, agreement_required)

          # valid agreement id
          form_data = self._form_data(terms_of_service=agreement.id)
          result = self._submit({'email': form_data['email']}, data=form_data)

          new_user = UserModel.objects.get(email='social@auth.com')
          self.assertEqual(result, {'user': new_user, 'is_new': True})
          self.assertEqual(new_user.agreements, [agreement.id])
          self.assertEqual(new_user.useragreement_set.count(), 1)

      def test_form_ignore_inactive_agreement(self):
          """social register ignores inactive agreement"""
          form_data = self._form_data(terms_of_service=None)
          strategy, backend = self._prepare(form_data)

          Agreement.objects.create(
              type=Agreement.TYPE_TOS,
              text="Lorem ipsum",
              is_active=False,
          )

          result = self._call_step(strategy, backend, {'email': form_data['email']})

          new_user = UserModel.objects.get(email='social@auth.com')
          self.assertEqual(result, {'user': new_user, 'is_new': True})
          self.assertEqual(new_user.agreements, [])
          self.assertEqual(new_user.useragreement_set.count(), 0)
  class GetUsernameTests(PipelineTestCase):
      """Tests for the ``get_username`` pipeline step."""

      def _resolve(self, details, **kwargs):
          """Call ``get_username`` with no strategy and no backend."""
          return get_username(None, details, None, **kwargs)

      def test_skip_if_user_is_set(self):
          """pipeline step is skipped if user was passed"""
          self.assertIsNone(self._resolve({}, user=self.user))

      def test_skip_if_no_names(self):
          """pipeline step is skipped if API returned no names"""
          self.assertIsNone(self._resolve({}))

      def test_resolve_to_username(self):
          """pipeline step resolves username"""
          resolved = self._resolve({'username': 'BobBoberson'})
          self.assertEqual(resolved, {'clean_username': 'BobBoberson'})

      def test_normalize_username(self):
          """pipeline step normalizes username"""
          resolved = self._resolve({'username': 'Błop Błoperson'})
          self.assertEqual(resolved, {'clean_username': 'BlopBloperson'})

      def test_resolve_to_first_name(self):
          """pipeline attempts to use first name because username is taken"""
          resolved = self._resolve({
              'username': self.user.username,
              'first_name': 'Błob',
          })
          self.assertEqual(resolved, {'clean_username': 'Blob'})

      def test_dont_resolve_to_last_name(self):
          """pipeline will not fallback to last name because username is taken"""
          resolved = self._resolve({
              'username': self.user.username,
              'last_name': 'Błob',
          })
          self.assertIsNone(resolved)

      def test_resolve_to_first_last_name_first_char(self):
          """pipeline will construct username from first name and first char of surname"""
          resolved = self._resolve({
              'first_name': self.user.username,
              'last_name': 'Błob',
          })
          self.assertEqual(resolved, {'clean_username': self.user.username + 'B'})

      def test_dont_resolve_to_banned_name(self):
          """pipeline will not resolve to banned name"""
          Ban.objects.create(banned_value='*Admin*', check_type=Ban.USERNAME)

          resolved = self._resolve({
              'username': 'Misago Admin',
              'first_name': 'Błob',
          })
          self.assertEqual(resolved, {'clean_username': 'Blob'})

      def test_resolve_full_name(self):
          """pipeline will resolve to full name"""
          Ban.objects.create(banned_value='*Admin*', check_type=Ban.USERNAME)

          resolved = self._resolve({
              'username': 'Misago Admin',
              'full_name': 'Błob Błopo',
          })
          self.assertEqual(resolved, {'clean_username': 'BlobBlopo'})

      def test_resolve_to_cut_name(self):
          """pipeline will resolve cut too long name on second pass"""
          resolved = self._resolve({
              'username': 'Abrakadabrapokuskonstantynopolitańczykowianeczkatrzy',
          })
          self.assertEqual(resolved, {'clean_username': 'Abrakadabrapok'})
  class RequireActivationTests(PipelineTestCase):
      """Tests for the ``require_activation`` pipeline step."""

      def setUp(self):
          super().setUp()

          # Every test starts with a user awaiting admin activation.
          self.user.requires_activation = UserModel.ACTIVATION_ADMIN
          self.user.save()

      def _make_strategy(self, data=None):
          """Load a strategy for a fresh request (POST with JSON when ``data`` is given)."""
          return load_strategy(request=create_request(data=data))

      def _run_step(self, strategy, user):
          """Build the GitHub backend and invoke ``require_activation`` as step one."""
          backend = GithubOAuth2(strategy, '/')
          return require_activation(
              strategy=strategy,
              details={},
              backend=backend,
              user=user,
              pipeline_index=1,
          )

      def test_skip_if_user_not_set(self):
          """pipeline step is skipped if user is not set"""
          outcome = self._run_step(self._make_strategy(), None)
          self.assertEqual(outcome, {})

      def test_partial_token_if_user_not_set_no_showstopper(self):
          """pipeline step handles set session token if user is not set"""
          strategy = self._make_strategy()
          strategy.request.session['partial_pipeline_token'] = 'test-token'

          # No assertion on purpose: the step only has to complete without raising.
          self._run_step(strategy, None)

      def test_skip_if_user_is_active(self):
          """pipeline step is skipped if user is active"""
          self.user.requires_activation = UserModel.ACTIVATION_NONE
          self.user.save()
          self.assertFalse(self.user.requires_activation)

          outcome = self._run_step(self._make_strategy(), self.user)
          self.assertEqual(outcome, {})

      def test_pipeline_returns_html_responseon_get(self):
          """pipeline step renders http response for GET request and inactive user"""
          response = self._run_step(self._make_strategy(), self.user)

          self.assertEqual(response.status_code, 200)
          self.assertEqual(response['content-type'], 'text/html; charset=utf-8')

      def test_pipeline_returns_json_response_on_post(self):
          """pipeline step renders json response for POST request and inactive user"""
          strategy = self._make_strategy(data={'username': 'anything'})
          response = self._run_step(strategy, self.user)

          self.assertEqual(response.status_code, 200)
          self.assertEqual(response['content-type'], 'application/json')
          self.assertJsonResponseEquals(response, {
              'step': 'done',
              'backend_name': 'GitHub',
              'activation': 'admin',
              'email': 'test@user.com',
              'username': 'TestUser',
          })
  class ValidateIpNotBannedTests(PipelineTestCase):
      """Tests for the ``validate_ip_not_banned`` pipeline step."""

      def test_skip_if_user_not_set(self):
          """pipeline step is skipped if no user was passed"""
          result = validate_ip_not_banned(None, {}, GithubOAuth2)
          self.assertIsNone(result)

      def test_raise_if_banned(self):
          """pipeline raises if user's IP is banned"""
          Ban.objects.create(banned_value='188.*', check_type=Ban.IP)

          with self.assertRaises(SocialAuthBanned) as raised:
              validate_ip_not_banned(
                  MockStrategy(user_ip='188.1.2.3'), {}, GithubOAuth2, self.user)

          # assertIsInstance reports the actual type on failure, unlike assertTrue(isinstance()).
          self.assertIsInstance(raised.exception.ban, Ban)

      def test_exclude_staff(self):
          """pipeline excludes staff from bans"""
          self.user.is_staff = True
          self.user.save()

          Ban.objects.create(banned_value='188.*', check_type=Ban.IP)

          result = validate_ip_not_banned(
              MockStrategy(user_ip='188.1.2.3'), {}, GithubOAuth2, self.user)
          self.assertIsNone(result)
  class ValidateUserNotBannedTests(PipelineTestCase):
      """Tests for the ``validate_user_not_banned`` pipeline step."""

      def test_skip_if_user_not_set(self):
          """pipeline step is skipped if no user was passed"""
          result = validate_user_not_banned(None, {}, GithubOAuth2)
          self.assertIsNone(result)

      def test_raise_if_banned(self):
          """pipeline raises if user's username is banned"""
          Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)

          # assertRaises names the expected exception itself on failure, which
          # replaces the old hand-written message that cited the wrong function.
          with self.assertRaises(SocialAuthBanned) as raised:
              validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)

          ban = raised.exception.ban
          self.assertEqual(ban.user, self.user)
          self.assertIsInstance(ban, BanCache)

      def test_exclude_staff(self):
          """pipeline excludes staff from bans"""
          self.user.is_staff = True
          self.user.save()

          Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)

          result = validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
          self.assertIsNone(result)