|
@@ -1,19 +1,31 @@
|
|
|
+from urllib.parse import urlparse
|
|
|
+
|
|
|
from itsdangerous.timed import TimestampSigner
|
|
|
from requests import Response
|
|
|
from requests.sessions import Session
|
|
|
-from urllib.parse import urlparse
|
|
|
|
|
|
from django.contrib.auth import get_user_model
|
|
|
from django.shortcuts import reverse
|
|
|
from django.test import override_settings, TestCase
|
|
|
from django.utils.timezone import now
|
|
|
|
|
|
+from ..conf.test import override_dynamic_settings
|
|
|
+
|
|
|
User = get_user_model()
|
|
|
|
|
|
+TEST_SSO_SETTINGS = {
|
|
|
+ "enable_sso": True,
|
|
|
+ "sso_private_key": "priv1",
|
|
|
+ "sso_public_key": "fakeSsoPublicKey",
|
|
|
+ "sso_server": "http://example.com/server/",
|
|
|
+}
|
|
|
+
|
|
|
+SSO_USER_EMAIL = "jkowalski@example.com"
|
|
|
+
|
|
|
|
|
|
class ConnectionMock:
|
|
|
def __init__(self):
|
|
|
- self.Session = Session
|
|
|
+ self.session = Session
|
|
|
|
|
|
def __enter__(self):
|
|
|
self.origin_post = Session.post
|
|
@@ -29,19 +41,22 @@ class ConnectionMock:
|
|
|
)
|
|
|
elif "/server/verify/" == urlparse(requested_url).path:
|
|
|
mocked_response._content = (
|
|
|
- b'{"username": "jkowalski", "email": "jkowalski@example.com", "first_name": '
|
|
|
- b'"Jan", "last_name": "Kowalski", "is_staff": false, "is_superuser": false, '
|
|
|
- b'"is_active": true}.XTg4IQ._cANZR5jHvtwhNzcnNYDfE1nLHE'
|
|
|
- )
|
|
|
+ (
|
|
|
+ '{"username": "jkowalski", "email": "%s", "first_name": '
|
|
|
+ '"Jan", "last_name": "Kowalski", "is_staff": false, "is_superuser": false, '
|
|
|
+ '"is_active": true}.XTg4IQ._cANZR5jHvtwhNzcnNYDfE1nLHE'
|
|
|
+ )
|
|
|
+ % SSO_USER_EMAIL
|
|
|
+ ).encode("utf-8")
|
|
|
|
|
|
mocked_response.status_code = 200
|
|
|
return mocked_response
|
|
|
|
|
|
- setattr(self.Session, "post", mocked_post)
|
|
|
- return self.Session
|
|
|
+ setattr(self.session, "post", mocked_post)
|
|
|
+ return self.session
|
|
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
|
- setattr(self.Session, "post", self.origin_post)
|
|
|
+ setattr(self.session, "post", self.origin_post)
|
|
|
|
|
|
|
|
|
class TimestampSignerMock:
|
|
@@ -67,38 +82,82 @@ class TimestampSignerMock:
|
|
|
setattr(self.TimestampSigner, "unsign", self.origin_unsign)
|
|
|
|
|
|
|
|
|
-class SsoModuleTestCase(TestCase):
|
|
|
- def test_sso_client(self):
|
|
|
- url_to_external_logging = reverse("simple-sso-login")
|
|
|
- self.assertEqual("/sso/client/", url_to_external_logging)
|
|
|
+@override_dynamic_settings(enable_sso=False)
|
|
|
+def test_sso_login_view_returns_404_if_sso_is_disabled(db, client):
|
|
|
+ url_to_external_logging = reverse("simple-sso-login")
|
|
|
+ assert url_to_external_logging == "/sso/client/"
|
|
|
+
|
|
|
+ response = client.get(url_to_external_logging)
|
|
|
+ assert response.status_code == 404
|
|
|
+
|
|
|
+
|
|
|
+@override_dynamic_settings(**TEST_SSO_SETTINGS)
|
|
|
+def test_sso_login_view_initiates_auth_flow(db, client):
|
|
|
+ url_to_external_logging = reverse("simple-sso-login")
|
|
|
+ assert url_to_external_logging == "/sso/client/"
|
|
|
+
|
|
|
+ with ConnectionMock():
|
|
|
+ response = client.get(url_to_external_logging)
|
|
|
+
|
|
|
+ assert response.status_code == 302
|
|
|
+
|
|
|
+ url_parsed = urlparse(response.url)
|
|
|
+ assert url_parsed.path == "/server/authorize/"
|
|
|
+ assert url_parsed.query == (
|
|
|
+ "token=XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFToF0YGEoIYu37QOajkc"
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+@override_dynamic_settings(enable_sso=False)
|
|
|
+def test_sso_auth_view_returns_404_if_sso_is_disabled(db, client):
|
|
|
+ url_to_authenticate = reverse("simple-sso-authenticate")
|
|
|
+ assert url_to_authenticate == "/sso/client/authenticate/"
|
|
|
+
|
|
|
+ response = client.get(url_to_authenticate)
|
|
|
+ assert response.status_code == 404
|
|
|
+
|
|
|
+
|
|
|
+@override_dynamic_settings(**TEST_SSO_SETTINGS)
|
|
|
+def test_sso_auth_view_creates_new_user(db, client):
|
|
|
+ url_to_authenticate = reverse("simple-sso-authenticate")
|
|
|
+ assert url_to_authenticate == "/sso/client/authenticate/"
|
|
|
+
|
|
|
+ query = (
|
|
|
+ "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
|
|
|
+ "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
|
|
|
+ )
|
|
|
+ url_to_authenticate += "?" + query
|
|
|
+
|
|
|
+ with ConnectionMock():
|
|
|
+ with TimestampSignerMock():
|
|
|
+ response = client.get(url_to_authenticate)
|
|
|
+
|
|
|
+ assert response.status_code == 302
|
|
|
+ assert response.url == "/"
|
|
|
+
|
|
|
+ user = User.objects.first()
|
|
|
+ assert user.username == "jkowalski"
|
|
|
|
|
|
- with ConnectionMock():
|
|
|
- response = self.client.get(url_to_external_logging)
|
|
|
|
|
|
- self.assertEqual(302, response.status_code)
|
|
|
+@override_dynamic_settings(**TEST_SSO_SETTINGS)
|
|
|
+def test_sso_auth_view_authenticates_existing_user(user, client):
|
|
|
+ user.set_email(SSO_USER_EMAIL)
|
|
|
+ user.save()
|
|
|
|
|
|
- url_parsed = urlparse(response.url)
|
|
|
- self.assertEqual("/server/authorize/", url_parsed.path)
|
|
|
- self.assertEqual(
|
|
|
- "token=XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFToF0YGEoIYu37QOajkc",
|
|
|
- url_parsed.query,
|
|
|
- )
|
|
|
+ url_to_authenticate = reverse("simple-sso-authenticate")
|
|
|
+ assert url_to_authenticate == "/sso/client/authenticate/"
|
|
|
|
|
|
- def test_sso_client_authenticate(self):
|
|
|
- url_to_authenticate = reverse("simple-sso-authenticate")
|
|
|
- self.assertEqual("/sso/client/authenticate/", url_to_authenticate)
|
|
|
- query = (
|
|
|
- "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
|
|
|
- "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
|
|
|
- )
|
|
|
- url_to_authenticate += "?" + query
|
|
|
+ query = (
|
|
|
+ "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
|
|
|
+ "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
|
|
|
+ )
|
|
|
+ url_to_authenticate += "?" + query
|
|
|
|
|
|
- with ConnectionMock():
|
|
|
- with TimestampSignerMock():
|
|
|
- response = self.client.get(url_to_authenticate)
|
|
|
+ with ConnectionMock():
|
|
|
+ with TimestampSignerMock():
|
|
|
+ response = client.get(url_to_authenticate)
|
|
|
|
|
|
- self.assertEqual(302, response.status_code)
|
|
|
- self.assertEqual("/", response.url)
|
|
|
+ assert response.status_code == 302
|
|
|
+ assert response.url == "/"
|
|
|
|
|
|
- u = User.objects.first()
|
|
|
- self.assertEqual("jkowalski", u.username)
|
|
|
+ assert User.objects.count() == 1
|