import json from django.contrib.auth import get_user_model from django.core import mail from django.test import RequestFactory, override_settings from social_core.backends.github import GithubOAuth2 from social_django.utils import load_strategy from misago.core.exceptions import SocialAuthFailed, SocialAuthBanned from misago.legal.models import Agreement from misago.users.models import AnonymousUser, Ban, BanCache from misago.users.social.pipeline import ( associate_by_email, create_user, create_user_with_form, get_username, require_activation, validate_ip_not_banned, validate_user_not_banned ) from misago.users.testutils import UserTestCase UserModel = get_user_model() def create_request(user_ip='0.0.0.0', data=None): factory = RequestFactory() if data is None: request = factory.get('/') else: request = factory.post('/', data=json.dumps(data), content_type='application/json') request.include_frontend_context = True request.frontend_context = {} request.session = {} request.user = AnonymousUser() request.user_ip = user_ip return request class MockStrategy(object): def __init__(self, user_ip='0.0.0.0'): self.cleaned_partial_token = None self.request = create_request(user_ip) def clean_partial_pipeline(self, token): self.cleaned_partial_token = token class PipelineTestCase(UserTestCase): def get_initial_user(self): self.user = self.get_authenticated_user() def assertNewUserIsCorrect( self, new_user, form_data=None, activation=None, email_verified=False): self.assertFalse(new_user.has_usable_password()) self.assertIn('Welcome', mail.outbox[0].subject) if form_data: self.assertEqual(new_user.email, form_data['email']) self.assertEqual(new_user.username, form_data['username']) if activation == 'none': self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_NONE) if activation == 'user': if email_verified: self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_NONE) else: self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_USER) if activation == 'admin': self.assertEqual(new_user.requires_activation, UserModel.ACTIVATION_ADMIN) self.assertEqual(new_user.audittrail_set.count(), 1) def assertJsonResponseEquals(self, response, value): response_content = response.content.decode("utf-8") response_json = json.loads(response_content) self.assertEqual(response_json, value) class AssociateByEmailTests(PipelineTestCase): def test_skip_if_user_is_already_set(self): """pipeline step is skipped if user was found by previous step""" result = associate_by_email(None, {}, GithubOAuth2, self.user) self.assertIsNone(result) def test_skip_if_no_email_passed(self): """pipeline step is skipped if no email was passed""" result = associate_by_email(None, {}, GithubOAuth2) self.assertIsNone(result) def test_skip_if_user_with_email_not_found(self): """pipeline step is skipped if no email was passed""" result = associate_by_email(None, {'email': 'not@found.com'}, GithubOAuth2) self.assertIsNone(result) def test_raise_if_user_is_inactive(self): """pipeline raises if user was inactive""" self.user.is_active = False self.user.save() try: associate_by_email(None, {'email': self.user.email}, GithubOAuth2) self.fail("associate_by_email should raise SocialAuthFailed") except SocialAuthFailed as e: self.assertEqual( e.message, ( "The e-mail address associated with your GitHub account is not available for " "use on this site." ), ) def test_raise_if_user_needs_admin_activation(self): """pipeline raises if user needs admin activation""" self.user.requires_activation = UserModel.ACTIVATION_ADMIN self.user.save() try: associate_by_email(None, {'email': self.user.email}, GithubOAuth2) self.fail("associate_by_email should raise SocialAuthFailed") except SocialAuthFailed as e: self.assertEqual( e.message, ( "Your account has to be activated by site administrator before you will be " "able to sign in with GitHub." ), ) def test_return_user(self): """pipeline returns user if email was found""" result = associate_by_email(None, {'email': self.user.email}, GithubOAuth2) self.assertEqual(result, {'user': self.user, 'is_new': False}) def test_return_user_email_inactive(self): """pipeline returns user even if they didn't activate their account manually""" self.user.requires_activation = UserModel.ACTIVATION_USER self.user.save() result = associate_by_email(None, {'email': self.user.email}, GithubOAuth2) self.assertEqual(result, {'user': self.user, 'is_new': False}) class CreateUser(PipelineTestCase): def test_skip_if_user_is_set(self): """pipeline step is skipped if user was passed""" result = create_user(MockStrategy(), {}, GithubOAuth2(), user=self.user) self.assertIsNone(result) def test_skip_if_no_email_passed(self): """pipeline step is skipped if no email was passed""" result = create_user( MockStrategy(), {}, GithubOAuth2(), clean_username='TestBob', ) self.assertIsNone(result) def test_skip_if_no_clean_username_passed(self): """pipeline step is skipped if cleaned username wasnt passed""" result = create_user( MockStrategy(), {'email': 'hello@example.com'}, GithubOAuth2(), ) self.assertIsNone(result) def test_skip_if_email_is_taken(self): """pipeline step is skipped if email was taken""" result = create_user( MockStrategy(), {'email': self.user.email}, GithubOAuth2(), clean_username='NewUser', ) self.assertIsNone(result) @override_settings(account_activation='none') def test_user_created_no_activation(self): """pipeline step creates active user for valid data and disabled activation""" result = create_user( MockStrategy(), {'email': 'new@example.com'}, GithubOAuth2(), clean_username='NewUser', ) new_user = UserModel.objects.get(email='new@example.com') self.assertEqual(result, { 'user': new_user, 'is_new': True, }) self.assertEqual(new_user.username, 'NewUser') self.assertNewUserIsCorrect(new_user, email_verified=True, activation='none') @override_settings(account_activation='user') def test_user_created_activation_by_user(self): """pipeline step creates active user for valid data and user activation""" result = create_user( MockStrategy(), {'email': 'new@example.com'}, GithubOAuth2(), clean_username='NewUser', ) new_user = UserModel.objects.get(email='new@example.com') self.assertEqual(result, { 'user': new_user, 'is_new': True, }) self.assertEqual(new_user.username, 'NewUser') self.assertNewUserIsCorrect(new_user, email_verified=True, activation='user') @override_settings(account_activation='admin') def test_user_created_activation_by_admin(self): """pipeline step creates in user for valid data and admin activation""" result = create_user( MockStrategy(), {'email': 'new@example.com'}, GithubOAuth2(), clean_username='NewUser', ) new_user = UserModel.objects.get(email='new@example.com') self.assertEqual(result, { 'user': new_user, 'is_new': True, }) self.assertEqual(new_user.username, 'NewUser') self.assertNewUserIsCorrect(new_user, email_verified=True, activation='admin') class CreateUserWithFormTests(PipelineTestCase): def setUp(self): super(CreateUserWithFormTests, self).setUp() Agreement.objects.invalidate_cache() def tearDown(self): super(CreateUserWithFormTests, self).tearDown() Agreement.objects.invalidate_cache() def test_skip_if_user_is_set(self): """pipeline step is skipped if user was passed""" request = create_request() strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = create_user_with_form( strategy=strategy, details={}, backend=backend, user=self.user, pipeline_index=1, ) self.assertEqual(result, {}) def test_renders_form_if_not_post(self): """pipeline step renders form if not POST""" request = create_request() strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') response = create_user_with_form( strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1, ) self.assertContains(response, "GitHub") def test_empty_data_rejected(self): """form rejects empty data""" request = create_request(data={}) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') response = create_user_with_form( strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1, ) self.assertEqual(response.status_code, 400) self.assertJsonResponseEquals(response, { 'email': ["This field is required."], 'username': ["This field is required."], }) def test_taken_data_rejected(self): """form rejects taken data""" request = create_request(data={ 'email': self.user.email, 'username': self.user.username, }) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') response = create_user_with_form( strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1, ) self.assertEqual(response.status_code, 400) self.assertJsonResponseEquals(response, { 'email': ["This e-mail address is not available."], 'username': ["This username is not available."], }) @override_settings(account_activation='none') def test_user_created_no_activation_verified_email(self): """active user is created for verified email and activation disabled""" form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = create_user_with_form( strategy=strategy, details={'email': form_data['email']}, backend=backend, user=None, pipeline_index=1, ) new_user = UserModel.objects.get(email='social@auth.com') self.assertEqual(result, {'user': new_user, 'is_new': True}) self.assertNewUserIsCorrect(new_user, form_data, activation='none', email_verified=True) @override_settings(account_activation='none') def test_user_created_no_activation_nonverified_email(self): """active user is created for non-verified email and activation disabled""" form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = create_user_with_form( strategy=strategy, details={'email': ''}, backend=backend, user=None, pipeline_index=1, ) new_user = UserModel.objects.get(email='social@auth.com') self.assertEqual(result, {'user': new_user, 'is_new': True}) self.assertNewUserIsCorrect(new_user, form_data, activation='none', email_verified=False) @override_settings(account_activation='user') def test_user_created_activation_by_user_verified_email(self): """active user is created for verified email and activation by user""" form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = create_user_with_form( strategy=strategy, details={'email': form_data['email']}, backend=backend, user=None, pipeline_index=1, ) new_user = UserModel.objects.get(email='social@auth.com') self.assertEqual(result, {'user': new_user, 'is_new': True}) self.assertNewUserIsCorrect(new_user, form_data, activation='user', email_verified=True) @override_settings(account_activation='user') def test_user_created_activation_by_user_nonverified_email(self): """inactive user is created for non-verified email and activation by user""" form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = create_user_with_form( strategy=strategy, details={'email': ''}, backend=backend, user=None, pipeline_index=1, ) new_user = UserModel.objects.get(email='social@auth.com') self.assertEqual(result, {'user': new_user, 'is_new': True}) self.assertNewUserIsCorrect(new_user, form_data, activation='user', email_verified=False) @override_settings(account_activation='admin') def test_user_created_activation_by_admin_verified_email(self): """inactive user is created for verified email and activation by admin""" form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = create_user_with_form( strategy=strategy, details={'email': form_data['email']}, backend=backend, user=None, pipeline_index=1, ) new_user = UserModel.objects.get(email='social@auth.com') self.assertEqual(result, {'user': new_user, 'is_new': True}) self.assertNewUserIsCorrect(new_user, form_data, activation='admin', email_verified=True) @override_settings(account_activation='admin') def test_user_created_activation_by_admin_nonverified_email(self): """inactive user is created for non-verified email and activation by admin""" form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = create_user_with_form( strategy=strategy, details={'email': ''}, backend=backend, user=None, pipeline_index=1, ) new_user = UserModel.objects.get(email='social@auth.com') self.assertEqual(result, {'user': new_user, 'is_new': True}) self.assertNewUserIsCorrect(new_user, form_data, activation='admin', email_verified=False) def test_form_check_agreement(self): """social register checks agreement""" form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') agreement = Agreement.objects.create( type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=True, ) response = create_user_with_form( strategy=strategy, details={'email': form_data['email']}, backend=backend, user=None, pipeline_index=1, ) self.assertEqual(response.status_code, 400) self.assertJsonResponseEquals(response, { 'terms_of_service': ['This agreement is required.'], }) # invalid agreement id form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', 'terms_of_service': agreement.id + 1, } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') response = create_user_with_form( strategy=strategy, details={'email': form_data['email']}, backend=backend, user=None, pipeline_index=1, ) self.assertEqual(response.status_code, 400) self.assertJsonResponseEquals(response, { 'terms_of_service': ['This agreement is required.'], }) # valid agreement id form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', 'terms_of_service': agreement.id, } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = create_user_with_form( strategy=strategy, details={'email': form_data['email']}, backend=backend, user=None, pipeline_index=1, ) new_user = UserModel.objects.get(email='social@auth.com') self.assertEqual(result, {'user': new_user, 'is_new': True}) self.assertEqual(new_user.agreements, [agreement.id]) self.assertEqual(new_user.useragreement_set.count(), 1) def test_form_ignore_inactive_agreement(self): """social register ignores inactive agreement""" form_data = { 'email': 'social@auth.com', 'username': 'SocialUser', 'terms_of_service': None, } request = create_request(data=form_data) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') Agreement.objects.create( type=Agreement.TYPE_TOS, text="Lorem ipsum", is_active=False, ) result = create_user_with_form( strategy=strategy, details={'email': form_data['email']}, backend=backend, user=None, pipeline_index=1, ) new_user = UserModel.objects.get(email='social@auth.com') self.assertEqual(result, {'user': new_user, 'is_new': True}) self.assertEqual(new_user.agreements, []) self.assertEqual(new_user.useragreement_set.count(), 0) class GetUsernameTests(PipelineTestCase): def test_skip_if_user_is_set(self): """pipeline step is skipped if user was passed""" result = get_username(None, {}, None, user=self.user) self.assertIsNone(result) def test_skip_if_no_names(self): """pipeline step is skipped if API returned no names""" result = get_username(None, {}, None) self.assertIsNone(result) def test_resolve_to_username(self): """pipeline step resolves username""" result = get_username(None, {'username': 'BobBoberson'}, None) self.assertEqual(result, {'clean_username': 'BobBoberson'}) def test_normalize_username(self): """pipeline step normalizes username""" result = get_username(None, {'username': 'Błop Błoperson'}, None) self.assertEqual(result, {'clean_username': 'BlopBloperson'}) def test_resolve_to_first_name(self): """pipeline attempts to use first name because username is taken""" details = { 'username': self.user.username, 'first_name': 'Błob', } result = get_username(None, details, None) self.assertEqual(result, {'clean_username': 'Blob'}) def test_dont_resolve_to_last_name(self): """pipeline will not fallback to last name because username is taken""" details = { 'username': self.user.username, 'last_name': 'Błob', } result = get_username(None, details, None) self.assertIsNone(result) def test_resolve_to_first_last_name_first_char(self): """pipeline will construct username from first name and first char of surname""" details = { 'first_name': self.user.username, 'last_name': 'Błob', } result = get_username(None, details, None) self.assertEqual(result, {'clean_username': self.user.username + 'B'}) def test_dont_resolve_to_banned_name(self): """pipeline will not resolve to banned name""" Ban.objects.create(banned_value='*Admin*', check_type=Ban.USERNAME) details = { 'username': 'Misago Admin', 'first_name': 'Błob', } result = get_username(None, details, None) self.assertEqual(result, {'clean_username': 'Blob'}) def test_resolve_full_name(self): """pipeline will resolve to full name""" Ban.objects.create(banned_value='*Admin*', check_type=Ban.USERNAME) details = { 'username': 'Misago Admin', 'full_name': 'Błob Błopo', } result = get_username(None, details, None) self.assertEqual(result, {'clean_username': 'BlobBlopo'}) def test_resolve_to_cut_name(self): """pipeline will resolve cut too long name on second pass""" details = { 'username': 'Abrakadabrapokuskonstantynopolitańczykowianeczkatrzy', } result = get_username(None, details, None) self.assertEqual(result, {'clean_username': 'Abrakadabrapok'}) class RequireActivationTests(PipelineTestCase): def setUp(self): super(RequireActivationTests, self).setUp() self.user.requires_activation = UserModel.ACTIVATION_ADMIN self.user.save() def test_skip_if_user_not_set(self): """pipeline step is skipped if user is not set""" request = create_request() strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = require_activation( strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1, ) self.assertEqual(result, {}) def test_partial_token_if_user_not_set_no_showstopper(self): """pipeline step handles set session token if user is not set""" request = create_request() strategy = load_strategy(request=request) strategy.request.session['partial_pipeline_token'] = 'test-token' backend = GithubOAuth2(strategy, '/') require_activation( strategy=strategy, details={}, backend=backend, user=None, pipeline_index=1, ) def test_skip_if_user_is_active(self): """pipeline step is skipped if user is active""" self.user.requires_activation = UserModel.ACTIVATION_NONE self.user.save() self.assertFalse(self.user.requires_activation) request = create_request() strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') result = require_activation( strategy=strategy, details={}, backend=backend, user=self.user, pipeline_index=1, ) self.assertEqual(result, {}) def test_pipeline_returns_html_responseon_get(self): """pipeline step renders http response for GET request and inactive user""" request = create_request() strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') response = require_activation( strategy=strategy, details={}, backend=backend, user=self.user, pipeline_index=1, ) self.assertEqual(response.status_code, 200) self.assertEqual(response['content-type'], 'text/html; charset=utf-8') def test_pipeline_returns_json_response_on_post(self): """pipeline step renders json response for POST request and inactive user""" request = create_request(data={'username': 'anything'}) strategy = load_strategy(request=request) backend = GithubOAuth2(strategy, '/') response = require_activation( strategy=strategy, details={}, backend=backend, user=self.user, pipeline_index=1, ) self.assertEqual(response.status_code, 200) self.assertEqual(response['content-type'], 'application/json') self.assertJsonResponseEquals(response, { 'step': 'done', 'backend_name': 'GitHub', 'activation': 'admin', 'email': 'test@user.com', 'username': 'TestUser', }) class ValidateIpNotBannedTests(PipelineTestCase): def test_skip_if_user_not_set(self): """pipeline step is skipped if no user was passed""" result = validate_ip_not_banned(None, {}, GithubOAuth2) self.assertIsNone(result) def test_raise_if_banned(self): """pipeline raises if user's IP is banned""" Ban.objects.create(banned_value='188.*', check_type=Ban.IP) try: validate_ip_not_banned(MockStrategy(user_ip='188.1.2.3'), {}, GithubOAuth2, self.user) self.fail("validate_ip_not_banned should raise SocialAuthBanned") except SocialAuthBanned as e: self.assertTrue(isinstance(e.ban, Ban)) def test_exclude_staff(self): """pipeline excludes staff from bans""" self.user.is_staff = True self.user.save() Ban.objects.create(banned_value='188.*', check_type=Ban.IP) result = validate_ip_not_banned( MockStrategy(user_ip='188.1.2.3'), {}, GithubOAuth2, self.user) self.assertIsNone(result) class ValidateUserNotBannedTests(PipelineTestCase): def test_skip_if_user_not_set(self): """pipeline step is skipped if no user was passed""" result = validate_user_not_banned(None, {}, GithubOAuth2) self.assertIsNone(result) def test_raise_if_banned(self): """pipeline raises if user's IP is banned""" Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME) try: validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user) self.fail("validate_ip_not_banned should raise SocialAuthBanned") except SocialAuthBanned as e: self.assertEqual(e.ban.user, self.user) self.assertTrue(isinstance(e.ban, BanCache)) def test_exclude_staff(self): """pipeline excludes staff from bans""" self.user.is_staff = True self.user.save() Ban.objects.create(banned_value=self.user.username, check_type=Ban.USERNAME) result = validate_user_not_banned(MockStrategy(), {}, GithubOAuth2, self.user) self.assertIsNone(result)