test_sso_views.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. from urllib.parse import urlparse
  2. from itsdangerous.timed import TimedSerializer, TimestampSigner
  3. from requests import Response
  4. from requests.sessions import Session
  5. from django.contrib.auth import get_user_model
  6. from django.shortcuts import reverse
  7. from django.test import override_settings, TestCase
  8. from django.utils.timezone import now
  9. from ...conf.test import override_dynamic_settings
  10. from .utils import TEST_SSO_SETTINGS
  11. User = get_user_model()
  12. SSO_USER_ID = 1
  13. def create_verify_response(data):
  14. signer = TimedSerializer(TEST_SSO_SETTINGS["sso_private_key"])
  15. return signer.dumps(data)
  16. class ConnectionMock:
  17. def __init__(self, user_data=None):
  18. self.session = Session
  19. self.user_data = user_data
  20. def __enter__(self):
  21. self.origin_post = Session.post
  22. def mocked_post(*args, **kwargs):
  23. mocked_response = Response()
  24. requested_url = args[1]
  25. if "/server/request-token/" == urlparse(requested_url).path:
  26. # token generated for private key settings.SSO_PRIVATE_KEY = 'priv1'
  27. mocked_response._content = (
  28. b'{"request_token": "XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFT'
  29. b'oF0YGEoIYu37QOajkc"}.XTd9sA.quRsXFxqMk-ufwSc79q-_YLDNzg'
  30. )
  31. elif "/server/verify/" == urlparse(requested_url).path:
  32. user_data = {
  33. "id": SSO_USER_ID,
  34. "username": "jkowalski",
  35. "email": "jkowalski@example.com",
  36. "first_name": "Jan",
  37. "last_name": "Kowalski",
  38. "is_staff": False,
  39. "is_superuser": False,
  40. "is_active": True,
  41. }
  42. if self.user_data:
  43. user_data.update(self.user_data)
  44. mocked_response._content = create_verify_response(user_data)
  45. mocked_response.status_code = 200
  46. return mocked_response
  47. setattr(self.session, "post", mocked_post)
  48. return self.session
  49. def __exit__(self, type, value, traceback):
  50. setattr(self.session, "post", self.origin_post)
  51. class TimestampSignerMock:
  52. def __init__(self):
  53. self.TimestampSigner = TimestampSigner
  54. def __enter__(self):
  55. self.origin_unsign = TimestampSigner.unsign
  56. def mocked_unsign(*args, **kwargs):
  57. s = args[1]
  58. if b'"username": "jkowalski"' in s:
  59. value = s[: s.index(b"}.") + 1] # {...}
  60. timestamp_to_datetime = now()
  61. return value, timestamp_to_datetime
  62. else:
  63. return self.origin_unsign(*args, **kwargs)
  64. setattr(self.TimestampSigner, "unsign", mocked_unsign)
  65. return self.TimestampSigner
  66. def __exit__(self, type, value, traceback):
  67. setattr(self.TimestampSigner, "unsign", self.origin_unsign)
  68. @override_dynamic_settings(enable_sso=False)
  69. def test_sso_login_view_returns_404_if_sso_is_disabled(db, client):
  70. url_to_external_logging = reverse("simple-sso-login")
  71. assert url_to_external_logging == "/sso/client/"
  72. response = client.get(url_to_external_logging)
  73. assert response.status_code == 404
  74. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  75. def test_sso_login_view_initiates_auth_flow(db, client):
  76. url_to_external_logging = reverse("simple-sso-login")
  77. assert url_to_external_logging == "/sso/client/"
  78. with ConnectionMock():
  79. response = client.get(url_to_external_logging)
  80. assert response.status_code == 302
  81. url_parsed = urlparse(response.url)
  82. assert url_parsed.path == "/server/authorize/"
  83. assert url_parsed.query == (
  84. "token=XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFToF0YGEoIYu37QOajkc"
  85. )
  86. @override_dynamic_settings(enable_sso=False)
  87. def test_sso_auth_view_returns_404_if_sso_is_disabled(db, client):
  88. url_to_authenticate = reverse("simple-sso-authenticate")
  89. assert url_to_authenticate == "/sso/client/authenticate/"
  90. response = client.get(url_to_authenticate)
  91. assert response.status_code == 404
  92. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  93. def test_sso_auth_view_creates_new_user(db, client):
  94. url_to_authenticate = reverse("simple-sso-authenticate")
  95. assert url_to_authenticate == "/sso/client/authenticate/"
  96. query = (
  97. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  98. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  99. )
  100. url_to_authenticate += "?" + query
  101. with ConnectionMock():
  102. with TimestampSignerMock():
  103. response = client.get(url_to_authenticate)
  104. assert response.status_code == 302
  105. assert response.url == "/"
  106. user = User.objects.first()
  107. assert user.username == "jkowalski"
  108. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  109. def test_sso_auth_view_authenticates_existing_user(user, client):
  110. user.sso_id = SSO_USER_ID
  111. user.save()
  112. url_to_authenticate = reverse("simple-sso-authenticate")
  113. assert url_to_authenticate == "/sso/client/authenticate/"
  114. query = (
  115. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  116. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  117. )
  118. url_to_authenticate += "?" + query
  119. with ConnectionMock():
  120. with TimestampSignerMock():
  121. response = client.get(url_to_authenticate)
  122. assert response.status_code == 302
  123. assert response.url == "/"
  124. assert User.objects.count() == 1
  125. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  126. def test_sso_auth_view_updates_existing_user_using_data_from_sso(user, client):
  127. user.sso_id = SSO_USER_ID
  128. user.is_active = False
  129. user.save()
  130. url_to_authenticate = reverse("simple-sso-authenticate")
  131. assert url_to_authenticate == "/sso/client/authenticate/"
  132. query = (
  133. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  134. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  135. )
  136. url_to_authenticate += "?" + query
  137. with ConnectionMock():
  138. with TimestampSignerMock():
  139. client.get(url_to_authenticate)
  140. user.refresh_from_db()
  141. assert user.username == "jkowalski"
  142. assert user.email == "jkowalski@example.com"
  143. assert user.is_active is True
  144. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  145. def test_sso_auth_view_returns_bad_request_error_for_invalid_user_data(db, client):
  146. url_to_authenticate = reverse("simple-sso-authenticate")
  147. assert url_to_authenticate == "/sso/client/authenticate/"
  148. query = (
  149. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  150. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  151. )
  152. url_to_authenticate += "?" + query
  153. with ConnectionMock({"email": "invalid"}):
  154. with TimestampSignerMock():
  155. response = client.get(url_to_authenticate)
  156. assert response.status_code == 400