test_social_pipeline.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773
  1. import json
  2. from django.contrib.auth import get_user_model
  3. from django.core import mail
  4. from django.test import RequestFactory
  5. from social_core.backends.github import GithubOAuth2
  6. from social_django.utils import load_strategy
  7. from misago.acl import ACL_CACHE
  8. from misago.acl.useracl import get_user_acl
  9. from misago.conf.dynamicsettings import DynamicSettings
  10. from misago.core.exceptions import SocialAuthFailed, SocialAuthBanned
  11. from misago.conf.test import override_dynamic_settings
  12. from misago.conftest import get_cache_versions
  13. from misago.legal.models import Agreement
  14. from misago.users.models import AnonymousUser, Ban, BanCache
  15. from misago.users.social.pipeline import (
  16. associate_by_email,
  17. create_user,
  18. create_user_with_form,
  19. get_username,
  20. require_activation,
  21. validate_ip_not_banned,
  22. validate_user_not_banned,
  23. )
  24. from misago.users.testutils import UserTestCase
  25. UserModel = get_user_model()
  26. def create_request(user_ip="0.0.0.0", data=None):
  27. factory = RequestFactory()
  28. if data is None:
  29. request = factory.get("/")
  30. else:
  31. request = factory.post(
  32. "/", data=json.dumps(data), content_type="application/json"
  33. )
  34. request.include_frontend_context = True
  35. request.cache_versions = get_cache_versions()
  36. request.frontend_context = {}
  37. request.session = {}
  38. request.settings = DynamicSettings(request.cache_versions)
  39. request.user = AnonymousUser()
  40. request.user_acl = get_user_acl(request.user, request.cache_versions)
  41. request.user_ip = user_ip
  42. return request
  43. def create_strategy():
  44. request = create_request()
  45. return load_strategy(request=request)
  46. class MockStrategy(object):
  47. def __init__(self, user_ip="0.0.0.0"):
  48. self.cleaned_partial_token = None
  49. self.request = create_request(user_ip)
  50. def clean_partial_pipeline(self, token):
  51. self.cleaned_partial_token = token
  52. class PipelineTestCase(UserTestCase):
  53. def get_initial_user(self):
  54. self.user = self.get_authenticated_user()
  55. def assertNewUserIsCorrect(
  56. self, new_user, form_data=None, activation=None, email_verified=False
  57. ):
  58. self.assertFalse(new_user.has_usable_password())
  59. self.assertIn("Welcome", mail.outbox[0].subject)
  60. if form_data:
  61. self.assertEqual(new_user.email, form_data["email"])
  62. self.assertEqual(new_user.username, form_data["username"])
  63. if activation == "none":
  64. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_NONE)
  65. if activation == "user":
  66. if email_verified:
  67. self.assertEqual(
  68. new_user.requires_activation, UserModel.ACTIVATION_NONE
  69. )
  70. else:
  71. self.assertEqual(
  72. new_user.requires_activation, UserModel.ACTIVATION_USER
  73. )
  74. if activation == "admin":
  75. self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_ADMIN)
  76. self.assertEqual(new_user.audittrail_set.count(), 1)
  77. def assertJsonResponseEquals(self, response, value):
  78. response_content = response.content.decode("utf-8")
  79. response_json = json.loads(response_content)
  80. self.assertEqual(response_json, value)
  81. class AssociateByEmailTests(PipelineTestCase):
  82. def test_skip_if_user_is_already_set(self):
  83. """pipeline step is skipped if user was found by previous step"""
  84. result = associate_by_email(None, {}, GithubOAuth2, self.user)
  85. self.assertIsNone(result)
  86. def test_skip_if_no_email_passed(self):
  87. """pipeline step is skipped if no email was passed"""
  88. result = associate_by_email(None, {}, GithubOAuth2)
  89. self.assertIsNone(result)
  90. def test_skip_if_user_with_email_not_found(self):
  91. """pipeline step is skipped if no email was passed"""
  92. result = associate_by_email(None, {"email": "not@found.com"}, GithubOAuth2)
  93. self.assertIsNone(result)
  94. def test_raise_if_user_is_inactive(self):
  95. """pipeline raises if user was inactive"""
  96. self.user.is_active = False
  97. self.user.save()
  98. try:
  99. associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  100. self.fail("associate_by_email should raise SocialAuthFailed")
  101. except SocialAuthFailed as e:
  102. self.assertEqual(
  103. e.message,
  104. (
  105. "The e-mail address associated with your GitHub account is not available for "
  106. "use on this site."
  107. ),
  108. )
  109. def test_raise_if_user_needs_admin_activation(self):
  110. """pipeline raises if user needs admin activation"""
  111. self.user.requires_activation = UserModel.ACTIVATION_ADMIN
  112. self.user.save()
  113. try:
  114. associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  115. self.fail("associate_by_email should raise SocialAuthFailed")
  116. except SocialAuthFailed as e:
  117. self.assertEqual(
  118. e.message,
  119. (
  120. "Your account has to be activated by site administrator before you will be "
  121. "able to sign in with GitHub."
  122. ),
  123. )
  124. def test_return_user(self):
  125. """pipeline returns user if email was found"""
  126. result = associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  127. self.assertEqual(result, {"user": self.user, "is_new": False})
  128. def test_return_user_email_inactive(self):
  129. """pipeline returns user even if they didn't activate their account manually"""
  130. self.user.requires_activation = UserModel.ACTIVATION_USER
  131. self.user.save()
  132. result = associate_by_email(None, {"email": self.user.email}, GithubOAuth2)
  133. self.assertEqual(result, {"user": self.user, "is_new": False})
  134. class CreateUser(PipelineTestCase):
  135. def test_skip_if_user_is_set(self):
  136. """pipeline step is skipped if user was passed"""
  137. result = create_user(MockStrategy(), {}, GithubOAuth2(), user=self.user)
  138. self.assertIsNone(result)
  139. def test_skip_if_no_email_passed(self):
  140. """pipeline step is skipped if no email was passed"""
  141. result = create_user(
  142. MockStrategy(), {}, GithubOAuth2(), clean_username="TestBob"
  143. )
  144. self.assertIsNone(result)
  145. def test_skip_if_no_clean_username_passed(self):
  146. """pipeline step is skipped if cleaned username wasnt passed"""
  147. result = create_user(
  148. MockStrategy(), {"email": "hello@example.com"}, GithubOAuth2()
  149. )
  150. self.assertIsNone(result)
  151. def test_skip_if_email_is_taken(self):
  152. """pipeline step is skipped if email was taken"""
  153. result = create_user(
  154. MockStrategy(),
  155. {"email": self.user.email},
  156. GithubOAuth2(),
  157. clean_username="NewUser",
  158. )
  159. self.assertIsNone(result)
  160. @override_dynamic_settings(account_activation="none")
  161. def test_user_created_no_activation(self):
  162. """pipeline step creates active user for valid data and disabled activation"""
  163. result = create_user(
  164. MockStrategy(),
  165. {"email": "new@example.com"},
  166. GithubOAuth2(),
  167. clean_username="NewUser",
  168. )
  169. new_user = UserModel.objects.get(email="new@example.com")
  170. self.assertEqual(result, {"user": new_user, "is_new": True})
  171. self.assertEqual(new_user.username, "NewUser")
  172. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="none")
  173. @override_dynamic_settings(account_activation="user")
  174. def test_user_created_activation_by_user(self):
  175. """pipeline step creates active user for valid data and user activation"""
  176. result = create_user(
  177. MockStrategy(),
  178. {"email": "new@example.com"},
  179. GithubOAuth2(),
  180. clean_username="NewUser",
  181. )
  182. new_user = UserModel.objects.get(email="new@example.com")
  183. self.assertEqual(result, {"user": new_user, "is_new": True})
  184. self.assertEqual(new_user.username, "NewUser")
  185. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="user")
  186. @override_dynamic_settings(account_activation="admin")
  187. def test_user_created_activation_by_admin(self):
  188. """pipeline step creates in user for valid data and admin activation"""
  189. result = create_user(
  190. MockStrategy(),
  191. {"email": "new@example.com"},
  192. GithubOAuth2(),
  193. clean_username="NewUser",
  194. )
  195. new_user = UserModel.objects.get(email="new@example.com")
  196. self.assertEqual(result, {"user": new_user, "is_new": True})
  197. self.assertEqual(new_user.username, "NewUser")
  198. self.assertNewUserIsCorrect(new_user, email_verified=True, activation="admin")
  199. class CreateUserWithFormTests(PipelineTestCase):
  200. def setUp(self):
  201. super().setUp()
  202. Agreement.objects.invalidate_cache()
  203. def tearDown(self):
  204. super().tearDown()
  205. Agreement.objects.invalidate_cache()
  206. def test_skip_if_user_is_set(self):
  207. """pipeline step is skipped if user was passed"""
  208. request = create_request()
  209. strategy = load_strategy(request=request)
  210. backend = GithubOAuth2(strategy, "/")
  211. result = create_user_with_form(
  212. strategy=strategy,
  213. details={},
  214. backend=backend,
  215. user=self.user,
  216. pipeline_index=1,
  217. )
  218. self.assertEqual(result, {})
  219. def test_renders_form_if_not_post(self):
  220. """pipeline step renders form if not POST"""
  221. request = create_request()
  222. strategy = load_strategy(request=request)
  223. backend = GithubOAuth2(strategy, "/")
  224. response = create_user_with_form(
  225. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  226. )
  227. self.assertContains(response, "GitHub")
  228. def test_empty_data_rejected(self):
  229. """form rejects empty data"""
  230. request = create_request(data={})
  231. strategy = load_strategy(request=request)
  232. backend = GithubOAuth2(strategy, "/")
  233. response = create_user_with_form(
  234. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  235. )
  236. self.assertEqual(response.status_code, 400)
  237. self.assertJsonResponseEquals(
  238. response,
  239. {
  240. "email": ["This field is required."],
  241. "username": ["This field is required."],
  242. },
  243. )
  244. def test_taken_data_rejected(self):
  245. """form rejects taken data"""
  246. request = create_request(
  247. data={"email": self.user.email, "username": self.user.username}
  248. )
  249. strategy = load_strategy(request=request)
  250. backend = GithubOAuth2(strategy, "/")
  251. response = create_user_with_form(
  252. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  253. )
  254. self.assertEqual(response.status_code, 400)
  255. self.assertJsonResponseEquals(
  256. response,
  257. {
  258. "email": ["This e-mail address is not available."],
  259. "username": ["This username is not available."],
  260. },
  261. )
  262. @override_dynamic_settings(account_activation="none")
  263. def test_user_created_no_activation_verified_email(self):
  264. """active user is created for verified email and activation disabled"""
  265. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  266. request = create_request(data=form_data)
  267. strategy = load_strategy(request=request)
  268. backend = GithubOAuth2(strategy, "/")
  269. result = create_user_with_form(
  270. strategy=strategy,
  271. details={"email": form_data["email"]},
  272. backend=backend,
  273. user=None,
  274. pipeline_index=1,
  275. )
  276. new_user = UserModel.objects.get(email="social@auth.com")
  277. self.assertEqual(result, {"user": new_user, "is_new": True})
  278. self.assertNewUserIsCorrect(
  279. new_user, form_data, activation="none", email_verified=True
  280. )
  281. @override_dynamic_settings(account_activation="none")
  282. def test_user_created_no_activation_nonverified_email(self):
  283. """active user is created for non-verified email and activation disabled"""
  284. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  285. request = create_request(data=form_data)
  286. strategy = load_strategy(request=request)
  287. backend = GithubOAuth2(strategy, "/")
  288. result = create_user_with_form(
  289. strategy=strategy,
  290. details={"email": ""},
  291. backend=backend,
  292. user=None,
  293. pipeline_index=1,
  294. )
  295. new_user = UserModel.objects.get(email="social@auth.com")
  296. self.assertEqual(result, {"user": new_user, "is_new": True})
  297. self.assertNewUserIsCorrect(
  298. new_user, form_data, activation="none", email_verified=False
  299. )
  300. @override_dynamic_settings(account_activation="user")
  301. def test_user_created_activation_by_user_verified_email(self):
  302. """active user is created for verified email and activation by user"""
  303. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  304. request = create_request(data=form_data)
  305. strategy = load_strategy(request=request)
  306. backend = GithubOAuth2(strategy, "/")
  307. result = create_user_with_form(
  308. strategy=strategy,
  309. details={"email": form_data["email"]},
  310. backend=backend,
  311. user=None,
  312. pipeline_index=1,
  313. )
  314. new_user = UserModel.objects.get(email="social@auth.com")
  315. self.assertEqual(result, {"user": new_user, "is_new": True})
  316. self.assertNewUserIsCorrect(
  317. new_user, form_data, activation="user", email_verified=True
  318. )
  319. @override_dynamic_settings(account_activation="user")
  320. def test_user_created_activation_by_user_nonverified_email(self):
  321. """inactive user is created for non-verified email and activation by user"""
  322. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  323. request = create_request(data=form_data)
  324. strategy = load_strategy(request=request)
  325. backend = GithubOAuth2(strategy, "/")
  326. result = create_user_with_form(
  327. strategy=strategy,
  328. details={"email": ""},
  329. backend=backend,
  330. user=None,
  331. pipeline_index=1,
  332. )
  333. new_user = UserModel.objects.get(email="social@auth.com")
  334. self.assertEqual(result, {"user": new_user, "is_new": True})
  335. self.assertNewUserIsCorrect(
  336. new_user, form_data, activation="user", email_verified=False
  337. )
  338. @override_dynamic_settings(account_activation="admin")
  339. def test_user_created_activation_by_admin_verified_email(self):
  340. """inactive user is created for verified email and activation by admin"""
  341. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  342. request = create_request(data=form_data)
  343. strategy = load_strategy(request=request)
  344. backend = GithubOAuth2(strategy, "/")
  345. result = create_user_with_form(
  346. strategy=strategy,
  347. details={"email": form_data["email"]},
  348. backend=backend,
  349. user=None,
  350. pipeline_index=1,
  351. )
  352. new_user = UserModel.objects.get(email="social@auth.com")
  353. self.assertEqual(result, {"user": new_user, "is_new": True})
  354. self.assertNewUserIsCorrect(
  355. new_user, form_data, activation="admin", email_verified=True
  356. )
  357. @override_dynamic_settings(account_activation="admin")
  358. def test_user_created_activation_by_admin_nonverified_email(self):
  359. """inactive user is created for non-verified email and activation by admin"""
  360. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  361. request = create_request(data=form_data)
  362. strategy = load_strategy(request=request)
  363. backend = GithubOAuth2(strategy, "/")
  364. result = create_user_with_form(
  365. strategy=strategy,
  366. details={"email": ""},
  367. backend=backend,
  368. user=None,
  369. pipeline_index=1,
  370. )
  371. new_user = UserModel.objects.get(email="social@auth.com")
  372. self.assertEqual(result, {"user": new_user, "is_new": True})
  373. self.assertNewUserIsCorrect(
  374. new_user, form_data, activation="admin", email_verified=False
  375. )
  376. def test_form_check_agreement(self):
  377. """social register checks agreement"""
  378. form_data = {"email": "social@auth.com", "username": "SocialUser"}
  379. request = create_request(data=form_data)
  380. strategy = load_strategy(request=request)
  381. backend = GithubOAuth2(strategy, "/")
  382. agreement = Agreement.objects.create(
  383. type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=True
  384. )
  385. response = create_user_with_form(
  386. strategy=strategy,
  387. details={"email": form_data["email"]},
  388. backend=backend,
  389. user=None,
  390. pipeline_index=1,
  391. )
  392. self.assertEqual(response.status_code, 400)
  393. self.assertJsonResponseEquals(
  394. response, {"terms_of_service": ["This agreement is required."]}
  395. )
  396. # invalid agreement id
  397. form_data = {
  398. "email": "social@auth.com",
  399. "username": "SocialUser",
  400. "terms_of_service": agreement.id + 1,
  401. }
  402. request = create_request(data=form_data)
  403. strategy = load_strategy(request=request)
  404. backend = GithubOAuth2(strategy, "/")
  405. response = create_user_with_form(
  406. strategy=strategy,
  407. details={"email": form_data["email"]},
  408. backend=backend,
  409. user=None,
  410. pipeline_index=1,
  411. )
  412. self.assertEqual(response.status_code, 400)
  413. self.assertJsonResponseEquals(
  414. response, {"terms_of_service": ["This agreement is required."]}
  415. )
  416. # valid agreement id
  417. form_data = {
  418. "email": "social@auth.com",
  419. "username": "SocialUser",
  420. "terms_of_service": agreement.id,
  421. }
  422. request = create_request(data=form_data)
  423. strategy = load_strategy(request=request)
  424. backend = GithubOAuth2(strategy, "/")
  425. result = create_user_with_form(
  426. strategy=strategy,
  427. details={"email": form_data["email"]},
  428. backend=backend,
  429. user=None,
  430. pipeline_index=1,
  431. )
  432. new_user = UserModel.objects.get(email="social@auth.com")
  433. self.assertEqual(result, {"user": new_user, "is_new": True})
  434. self.assertEqual(new_user.agreements, [agreement.id])
  435. self.assertEqual(new_user.useragreement_set.count(), 1)
  436. def test_form_ignore_inactive_agreement(self):
  437. """social register ignores inactive agreement"""
  438. form_data = {
  439. "email": "social@auth.com",
  440. "username": "SocialUser",
  441. "terms_of_service": None,
  442. }
  443. request = create_request(data=form_data)
  444. strategy = load_strategy(request=request)
  445. backend = GithubOAuth2(strategy, "/")
  446. Agreement.objects.create(
  447. type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=False
  448. )
  449. result = create_user_with_form(
  450. strategy=strategy,
  451. details={"email": form_data["email"]},
  452. backend=backend,
  453. user=None,
  454. pipeline_index=1,
  455. )
  456. new_user = UserModel.objects.get(email="social@auth.com")
  457. self.assertEqual(result, {"user": new_user, "is_new": True})
  458. self.assertEqual(new_user.agreements, [])
  459. self.assertEqual(new_user.useragreement_set.count(), 0)
  460. class GetUsernameTests(PipelineTestCase):
  461. def test_skip_if_user_is_set(self):
  462. """pipeline step is skipped if user was passed"""
  463. strategy = create_strategy()
  464. result = get_username(strategy, {}, None, user=self.user)
  465. self.assertIsNone(result)
  466. def test_skip_if_no_names(self):
  467. """pipeline step is skipped if API returned no names"""
  468. strategy = create_strategy()
  469. result = get_username(strategy, {}, None)
  470. self.assertIsNone(result)
  471. def test_resolve_to_username(self):
  472. """pipeline step resolves username"""
  473. strategy = create_strategy()
  474. result = get_username(strategy, {"username": "BobBoberson"}, None)
  475. self.assertEqual(result, {"clean_username": "BobBoberson"})
  476. def test_normalize_username(self):
  477. """pipeline step normalizes username"""
  478. strategy = create_strategy()
  479. result = get_username(strategy, {"username": "Błop Błoperson"}, None)
  480. self.assertEqual(result, {"clean_username": "BlopBloperson"})
  481. def test_resolve_to_first_name(self):
  482. """pipeline attempts to use first name because username is taken"""
  483. strategy = create_strategy()
  484. details = {"username": self.user.username, "first_name": "Błob"}
  485. result = get_username(strategy, details, None)
  486. self.assertEqual(result, {"clean_username": "Blob"})
  487. def test_dont_resolve_to_last_name(self):
  488. """pipeline will not fallback to last name because username is taken"""
  489. strategy = create_strategy()
  490. details = {"username": self.user.username, "last_name": "Błob"}
  491. result = get_username(strategy, details, None)
  492. self.assertIsNone(result)
  493. def test_resolve_to_first_last_name_first_char(self):
  494. """pipeline will construct username from first name and first char of surname"""
  495. strategy = create_strategy()
  496. details = {"first_name": self.user.username, "last_name": "Błob"}
  497. result = get_username(strategy, details, None)
  498. self.assertEqual(result, {"clean_username": self.user.username + "B"})
  499. def test_dont_resolve_to_banned_name(self):
  500. """pipeline will not resolve to banned name"""
  501. strategy = create_strategy()
  502. Ban.objects.create(banned_value="*Admin*", check_type=Ban.USERNAME)
  503. details = {"username": "Misago Admin", "first_name": "Błob"}
  504. result = get_username(strategy, details, None)
  505. self.assertEqual(result, {"clean_username": "Blob"})
  506. def test_resolve_full_name(self):
  507. """pipeline will resolve to full name"""
  508. strategy = create_strategy()
  509. Ban.objects.create(banned_value="*Admin*", check_type=Ban.USERNAME)
  510. details = {"username": "Misago Admin", "full_name": "Błob Błopo"}
  511. result = get_username(strategy, details, None)
  512. self.assertEqual(result, {"clean_username": "BlobBlopo"})
  513. def test_resolve_to_cut_name(self):
  514. """pipeline will resolve cut too long name on second pass"""
  515. strategy = create_strategy()
  516. details = {"username": "Abrakadabrapokuskonstantynopolitańczykowianeczkatrzy"}
  517. result = get_username(strategy, details, None)
  518. self.assertEqual(result, {"clean_username": "Abrakadabrapok"})
  519. class RequireActivationTests(PipelineTestCase):
  520. def setUp(self):
  521. super().setUp()
  522. self.user.requires_activation = UserModel.ACTIVATION_ADMIN
  523. self.user.save()
  524. def test_skip_if_user_not_set(self):
  525. """pipeline step is skipped if user is not set"""
  526. request = create_request()
  527. strategy = load_strategy(request=request)
  528. backend = GithubOAuth2(strategy, "/")
  529. result = require_activation(
  530. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  531. )
  532. self.assertEqual(result, {})
  533. def test_partial_token_if_user_not_set_no_showstopper(self):
  534. """pipeline step handles set session token if user is not set"""
  535. request = create_request()
  536. strategy = load_strategy(request=request)
  537. strategy.request.session["partial_pipeline_token"] = "test-token"
  538. backend = GithubOAuth2(strategy, "/")
  539. require_activation(
  540. strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1
  541. )
  542. def test_skip_if_user_is_active(self):
  543. """pipeline step is skipped if user is active"""
  544. self.user.requires_activation = UserModel.ACTIVATION_NONE
  545. self.user.save()
  546. self.assertFalse(self.user.requires_activation)
  547. request = create_request()
  548. strategy = load_strategy(request=request)
  549. backend = GithubOAuth2(strategy, "/")
  550. result = require_activation(
  551. strategy=strategy,
  552. details={},
  553. backend=backend,
  554. user=self.user,
  555. pipeline_index=1,
  556. )
  557. self.assertEqual(result, {})
  558. def test_pipeline_returns_html_responseon_get(self):
  559. """pipeline step renders http response for GET request and inactive user"""
  560. request = create_request()
  561. strategy = load_strategy(request=request)
  562. backend = GithubOAuth2(strategy, "/")
  563. response = require_activation(
  564. strategy=strategy,
  565. details={},
  566. backend=backend,
  567. user=self.user,
  568. pipeline_index=1,
  569. )
  570. self.assertEqual(response.status_code, 200)
  571. self.assertEqual(response["content-type"], "text/html; charset=utf-8")
  572. def test_pipeline_returns_json_response_on_post(self):
  573. """pipeline step renders json response for POST request and inactive user"""
  574. request = create_request(data={"username": "anything"})
  575. strategy = load_strategy(request=request)
  576. backend = GithubOAuth2(strategy, "/")
  577. response = require_activation(
  578. strategy=strategy,
  579. details={},
  580. backend=backend,
  581. user=self.user,
  582. pipeline_index=1,
  583. )
  584. self.assertEqual(response.status_code, 200)
  585. self.assertEqual(response["content-type"], "application/json")
  586. self.assertJsonResponseEquals(
  587. response,
  588. {
  589. "step": "done",
  590. "backend_name": "GitHub",
  591. "activation": "admin",
  592. "email": "test@user.com",
  593. "username": "TestUser",
  594. },
  595. )
  596. class ValidateIpNotBannedTests(PipelineTestCase):
  597. def test_skip_if_user_not_set(self):
  598. """pipeline step is skipped if no user was passed"""
  599. result = validate_ip_not_banned(None, {}, GithubOAuth2)
  600. self.assertIsNone(result)
  601. def test_raise_if_banned(self):
  602. """pipeline raises if user's IP is banned"""
  603. Ban.objects.create(banned_value="188.*", check_type=Ban.IP)
  604. try:
  605. validate_ip_not_banned(
  606. MockStrategy(user_ip="188.1.2.3"), {}, GithubOAuth2, self.user
  607. )
  608. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  609. except SocialAuthBanned as e:
  610. self.assertTrue(isinstance(e.ban, Ban))
  611. def test_exclude_staff(self):
  612. """pipeline excludes staff from bans"""
  613. self.user.is_staff = True
  614. self.user.save()
  615. Ban.objects.create(banned_value="188.*", check_type=Ban.IP)
  616. result = validate_ip_not_banned(
  617. MockStrategy(user_ip="188.1.2.3"), {}, GithubOAuth2, self.user
  618. )
  619. self.assertIsNone(result)
  620. class ValidateUserNotBannedTests(PipelineTestCase):
  621. def test_skip_if_user_not_set(self):
  622. """pipeline step is skipped if no user was passed"""
  623. result = validate_user_not_banned(None, {}, GithubOAuth2)
  624. self.assertIsNone(result)
  625. def test_raise_if_banned(self):
  626. """pipeline raises if user's IP is banned"""
  627. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  628. try:
  629. validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  630. self.fail("validate_ip_not_banned should raise SocialAuthBanned")
  631. except SocialAuthBanned as e:
  632. self.assertEqual(e.ban.user, self.user)
  633. self.assertTrue(isinstance(e.ban, BanCache))
  634. def test_exclude_staff(self):
  635. """pipeline excludes staff from bans"""
  636. self.user.is_staff = True
  637. self.user.save()
  638. Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME)
  639. result = validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user)
  640. self.assertIsNone(result)