test_sso_views.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. from urllib.parse import urlparse
  2. from itsdangerous.timed import TimedSerializer, TimestampSigner
  3. from requests import Response
  4. from requests.sessions import Session
  5. from django.contrib.auth import get_user_model
  6. from django.shortcuts import reverse
  7. from django.test import override_settings, TestCase
  8. from django.utils.timezone import now
  9. from ...conf.test import override_dynamic_settings
  10. User = get_user_model()
  11. TEST_SSO_SETTINGS = {
  12. "enable_sso": True,
  13. "sso_private_key": "priv1",
  14. "sso_public_key": "fakeSsoPublicKey",
  15. "sso_server": "http://example.com/server/",
  16. }
  17. SSO_USER_ID = 1
  18. def create_verify_response(data):
  19. signer = TimedSerializer(TEST_SSO_SETTINGS["sso_private_key"])
  20. return signer.dumps(data)
  21. class ConnectionMock:
  22. def __init__(self, user_data=None):
  23. self.session = Session
  24. self.user_data = user_data
  25. def __enter__(self):
  26. self.origin_post = Session.post
  27. def mocked_post(*args, **kwargs):
  28. mocked_response = Response()
  29. requested_url = args[1]
  30. if "/server/request-token/" == urlparse(requested_url).path:
  31. # token generated for private key settings.SSO_PRIVATE_KEY = 'priv1'
  32. mocked_response._content = (
  33. b'{"request_token": "XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFT'
  34. b'oF0YGEoIYu37QOajkc"}.XTd9sA.quRsXFxqMk-ufwSc79q-_YLDNzg'
  35. )
  36. elif "/server/verify/" == urlparse(requested_url).path:
  37. user_data = {
  38. "id": SSO_USER_ID,
  39. "username": "jkowalski",
  40. "email": "jkowalski@example.com",
  41. "first_name": "Jan",
  42. "last_name": "Kowalski",
  43. "is_staff": False,
  44. "is_superuser": False,
  45. "is_active": True,
  46. }
  47. if self.user_data:
  48. user_data.update(self.user_data)
  49. mocked_response._content = create_verify_response(user_data)
  50. mocked_response.status_code = 200
  51. return mocked_response
  52. setattr(self.session, "post", mocked_post)
  53. return self.session
  54. def __exit__(self, type, value, traceback):
  55. setattr(self.session, "post", self.origin_post)
  56. class TimestampSignerMock:
  57. def __init__(self):
  58. self.TimestampSigner = TimestampSigner
  59. def __enter__(self):
  60. self.origin_unsign = TimestampSigner.unsign
  61. def mocked_unsign(*args, **kwargs):
  62. s = args[1]
  63. if b'"username": "jkowalski"' in s:
  64. value = s[: s.index(b"}.") + 1] # {...}
  65. timestamp_to_datetime = now()
  66. return value, timestamp_to_datetime
  67. else:
  68. return self.origin_unsign(*args, **kwargs)
  69. setattr(self.TimestampSigner, "unsign", mocked_unsign)
  70. return self.TimestampSigner
  71. def __exit__(self, type, value, traceback):
  72. setattr(self.TimestampSigner, "unsign", self.origin_unsign)
  73. @override_dynamic_settings(enable_sso=False)
  74. def test_sso_login_view_returns_404_if_sso_is_disabled(db, client):
  75. url_to_external_logging = reverse("simple-sso-login")
  76. assert url_to_external_logging == "/sso/client/"
  77. response = client.get(url_to_external_logging)
  78. assert response.status_code == 404
  79. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  80. def test_sso_login_view_initiates_auth_flow(db, client):
  81. url_to_external_logging = reverse("simple-sso-login")
  82. assert url_to_external_logging == "/sso/client/"
  83. with ConnectionMock():
  84. response = client.get(url_to_external_logging)
  85. assert response.status_code == 302
  86. url_parsed = urlparse(response.url)
  87. assert url_parsed.path == "/server/authorize/"
  88. assert url_parsed.query == (
  89. "token=XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFToF0YGEoIYu37QOajkc"
  90. )
  91. @override_dynamic_settings(enable_sso=False)
  92. def test_sso_auth_view_returns_404_if_sso_is_disabled(db, client):
  93. url_to_authenticate = reverse("simple-sso-authenticate")
  94. assert url_to_authenticate == "/sso/client/authenticate/"
  95. response = client.get(url_to_authenticate)
  96. assert response.status_code == 404
  97. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  98. def test_sso_auth_view_creates_new_user(db, client):
  99. url_to_authenticate = reverse("simple-sso-authenticate")
  100. assert url_to_authenticate == "/sso/client/authenticate/"
  101. query = (
  102. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  103. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  104. )
  105. url_to_authenticate += "?" + query
  106. with ConnectionMock():
  107. with TimestampSignerMock():
  108. response = client.get(url_to_authenticate)
  109. assert response.status_code == 302
  110. assert response.url == "/"
  111. user = User.objects.first()
  112. assert user.username == "jkowalski"
  113. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  114. def test_sso_auth_view_authenticates_existing_user(user, client):
  115. user.sso_id = SSO_USER_ID
  116. user.save()
  117. url_to_authenticate = reverse("simple-sso-authenticate")
  118. assert url_to_authenticate == "/sso/client/authenticate/"
  119. query = (
  120. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  121. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  122. )
  123. url_to_authenticate += "?" + query
  124. with ConnectionMock():
  125. with TimestampSignerMock():
  126. response = client.get(url_to_authenticate)
  127. assert response.status_code == 302
  128. assert response.url == "/"
  129. assert User.objects.count() == 1
  130. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  131. def test_sso_auth_view_updates_existing_user_using_data_from_sso(user, client):
  132. user.sso_id = SSO_USER_ID
  133. user.is_active = False
  134. user.save()
  135. url_to_authenticate = reverse("simple-sso-authenticate")
  136. assert url_to_authenticate == "/sso/client/authenticate/"
  137. query = (
  138. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  139. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  140. )
  141. url_to_authenticate += "?" + query
  142. with ConnectionMock():
  143. with TimestampSignerMock():
  144. client.get(url_to_authenticate)
  145. user.refresh_from_db()
  146. assert user.username == "jkowalski"
  147. assert user.email == "jkowalski@example.com"
  148. assert user.is_active is True
  149. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  150. def test_sso_auth_view_returns_bad_request_error_for_invalid_user_data(db, client):
  151. url_to_authenticate = reverse("simple-sso-authenticate")
  152. assert url_to_authenticate == "/sso/client/authenticate/"
  153. query = (
  154. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  155. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  156. )
  157. url_to_authenticate += "?" + query
  158. with ConnectionMock({"email": "invalid"}):
  159. with TimestampSignerMock():
  160. response = client.get(url_to_authenticate)
  161. assert response.status_code == 400