|
@@ -1,6 +1,6 @@
|
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
-from itsdangerous.timed import TimestampSigner
|
|
|
+from itsdangerous.timed import TimedSerializer, TimestampSigner
|
|
|
from requests import Response
|
|
|
from requests.sessions import Session
|
|
|
|
|
@@ -9,7 +9,7 @@ from django.shortcuts import reverse
|
|
|
from django.test import override_settings, TestCase
|
|
|
from django.utils.timezone import now
|
|
|
|
|
|
-from ..conf.test import override_dynamic_settings
|
|
|
+from ...conf.test import override_dynamic_settings
|
|
|
|
|
|
User = get_user_model()
|
|
|
|
|
@@ -20,12 +20,18 @@ TEST_SSO_SETTINGS = {
|
|
|
"sso_server": "http://example.com/server/",
|
|
|
}
|
|
|
|
|
|
-SSO_USER_EMAIL = "jkowalski@example.com"
|
|
|
+SSO_USER_ID = 1
|
|
|
+
|
|
|
+
|
|
|
+def create_verify_response(data):
|
|
|
+ signer = TimedSerializer(TEST_SSO_SETTINGS["sso_private_key"])
|
|
|
+ return signer.dumps(data)
|
|
|
|
|
|
|
|
|
class ConnectionMock:
|
|
|
- def __init__(self):
|
|
|
+ def __init__(self, user_data=None):
|
|
|
self.session = Session
|
|
|
+ self.user_data = user_data
|
|
|
|
|
|
def __enter__(self):
|
|
|
self.origin_post = Session.post
|
|
@@ -36,18 +42,25 @@ class ConnectionMock:
|
|
|
if "/server/request-token/" == urlparse(requested_url).path:
|
|
|
# token generated for private key settings.SSO_PRIVATE_KEY = 'priv1'
|
|
|
mocked_response._content = (
|
|
|
- b'{"request_token": "XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFToF0YGEoIYu3'
|
|
|
- b'7QOajkc"}.XTd9sA.quRsXFxqMk-ufwSc79q-_YLDNzg'
|
|
|
+ b'{"request_token": "XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFT'
|
|
|
+ b'oF0YGEoIYu37QOajkc"}.XTd9sA.quRsXFxqMk-ufwSc79q-_YLDNzg'
|
|
|
)
|
|
|
elif "/server/verify/" == urlparse(requested_url).path:
|
|
|
- mocked_response._content = (
|
|
|
- (
|
|
|
- '{"username": "jkowalski", "email": "%s", "first_name": '
|
|
|
- '"Jan", "last_name": "Kowalski", "is_staff": false, "is_superuser": false, '
|
|
|
- '"is_active": true}.XTg4IQ._cANZR5jHvtwhNzcnNYDfE1nLHE'
|
|
|
- )
|
|
|
- % SSO_USER_EMAIL
|
|
|
- ).encode("utf-8")
|
|
|
+ user_data = {
|
|
|
+ "id": SSO_USER_ID,
|
|
|
+ "username": "jkowalski",
|
|
|
+ "email": "jkowalski@example.com",
|
|
|
+ "first_name": "Jan",
|
|
|
+ "last_name": "Kowalski",
|
|
|
+ "is_staff": False,
|
|
|
+ "is_superuser": False,
|
|
|
+ "is_active": True,
|
|
|
+ }
|
|
|
+
|
|
|
+ if self.user_data:
|
|
|
+ user_data.update(self.user_data)
|
|
|
+
|
|
|
+ mocked_response._content = create_verify_response(user_data)
|
|
|
|
|
|
mocked_response.status_code = 200
|
|
|
return mocked_response
|
|
@@ -69,7 +82,7 @@ class TimestampSignerMock:
|
|
|
def mocked_unsign(*args, **kwargs):
|
|
|
s = args[1]
|
|
|
if b'"username": "jkowalski"' in s:
|
|
|
- value = s[:166] # {...}
|
|
|
+ value = s[: s.index(b"}.") + 1] # {...}
|
|
|
timestamp_to_datetime = now()
|
|
|
return value, timestamp_to_datetime
|
|
|
else:
|
|
@@ -141,7 +154,7 @@ def test_sso_auth_view_creates_new_user(db, client):
|
|
|
|
|
|
@override_dynamic_settings(**TEST_SSO_SETTINGS)
|
|
|
def test_sso_auth_view_authenticates_existing_user(user, client):
|
|
|
- user.set_email(SSO_USER_EMAIL)
|
|
|
+ user.sso_id = SSO_USER_ID
|
|
|
user.save()
|
|
|
|
|
|
url_to_authenticate = reverse("simple-sso-authenticate")
|
|
@@ -161,3 +174,46 @@ def test_sso_auth_view_authenticates_existing_user(user, client):
|
|
|
assert response.url == "/"
|
|
|
|
|
|
assert User.objects.count() == 1
|
|
|
+
|
|
|
+
|
|
|
+@override_dynamic_settings(**TEST_SSO_SETTINGS)
|
|
|
+def test_sso_auth_view_updates_existing_user_using_data_from_sso(user, client):
|
|
|
+ user.sso_id = SSO_USER_ID
|
|
|
+ user.is_active = False
|
|
|
+ user.save()
|
|
|
+
|
|
|
+ url_to_authenticate = reverse("simple-sso-authenticate")
|
|
|
+ assert url_to_authenticate == "/sso/client/authenticate/"
|
|
|
+
|
|
|
+ query = (
|
|
|
+ "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
|
|
|
+ "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
|
|
|
+ )
|
|
|
+ url_to_authenticate += "?" + query
|
|
|
+
|
|
|
+ with ConnectionMock():
|
|
|
+ with TimestampSignerMock():
|
|
|
+ client.get(url_to_authenticate)
|
|
|
+
|
|
|
+ user.refresh_from_db()
|
|
|
+ assert user.username == "jkowalski"
|
|
|
+ assert user.email == "jkowalski@example.com"
|
|
|
+ assert user.is_active is True
|
|
|
+
|
|
|
+
|
|
|
+@override_dynamic_settings(**TEST_SSO_SETTINGS)
|
|
|
+def test_sso_auth_view_returns_bad_request_error_for_invalid_user_data(db, client):
|
|
|
+
|
|
|
+ url_to_authenticate = reverse("simple-sso-authenticate")
|
|
|
+ assert url_to_authenticate == "/sso/client/authenticate/"
|
|
|
+
|
|
|
+ query = (
|
|
|
+ "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
|
|
|
+ "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
|
|
|
+ )
|
|
|
+ url_to_authenticate += "?" + query
|
|
|
+
|
|
|
+ with ConnectionMock({"email": "invalid"}):
|
|
|
+ with TimestampSignerMock():
|
|
|
+ response = client.get(url_to_authenticate)
|
|
|
+ assert response.status_code == 400
|