tests.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. from urllib.parse import urlparse
  2. from itsdangerous.timed import TimestampSigner
  3. from requests import Response
  4. from requests.sessions import Session
  5. from django.contrib.auth import get_user_model
  6. from django.shortcuts import reverse
  7. from django.test import override_settings, TestCase
  8. from django.utils.timezone import now
  9. from ..conf.test import override_dynamic_settings
  10. User = get_user_model()
  11. TEST_SSO_SETTINGS = {
  12. "enable_sso": True,
  13. "sso_private_key": "priv1",
  14. "sso_public_key": "fakeSsoPublicKey",
  15. "sso_server": "http://example.com/server/",
  16. }
  17. SSO_USER_EMAIL = "jkowalski@example.com"
  18. class ConnectionMock:
  19. def __init__(self):
  20. self.session = Session
  21. def __enter__(self):
  22. self.origin_post = Session.post
  23. def mocked_post(*args, **kwargs):
  24. mocked_response = Response()
  25. requested_url = args[1]
  26. if "/server/request-token/" == urlparse(requested_url).path:
  27. # token generated for private key settings.SSO_PRIVATE_KEY = 'priv1'
  28. mocked_response._content = (
  29. b'{"request_token": "XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFToF0YGEoIYu3'
  30. b'7QOajkc"}.XTd9sA.quRsXFxqMk-ufwSc79q-_YLDNzg'
  31. )
  32. elif "/server/verify/" == urlparse(requested_url).path:
  33. mocked_response._content = (
  34. (
  35. '{"username": "jkowalski", "email": "%s", "first_name": '
  36. '"Jan", "last_name": "Kowalski", "is_staff": false, "is_superuser": false, '
  37. '"is_active": true}.XTg4IQ._cANZR5jHvtwhNzcnNYDfE1nLHE'
  38. )
  39. % SSO_USER_EMAIL
  40. ).encode("utf-8")
  41. mocked_response.status_code = 200
  42. return mocked_response
  43. setattr(self.session, "post", mocked_post)
  44. return self.session
  45. def __exit__(self, type, value, traceback):
  46. setattr(self.session, "post", self.origin_post)
  47. class TimestampSignerMock:
  48. def __init__(self):
  49. self.TimestampSigner = TimestampSigner
  50. def __enter__(self):
  51. self.origin_unsign = TimestampSigner.unsign
  52. def mocked_unsign(*args, **kwargs):
  53. s = args[1]
  54. if b'"username": "jkowalski"' in s:
  55. value = s[:166] # {...}
  56. timestamp_to_datetime = now()
  57. return value, timestamp_to_datetime
  58. else:
  59. return self.origin_unsign(*args, **kwargs)
  60. setattr(self.TimestampSigner, "unsign", mocked_unsign)
  61. return self.TimestampSigner
  62. def __exit__(self, type, value, traceback):
  63. setattr(self.TimestampSigner, "unsign", self.origin_unsign)
  64. @override_dynamic_settings(enable_sso=False)
  65. def test_sso_login_view_returns_404_if_sso_is_disabled(db, client):
  66. url_to_external_logging = reverse("simple-sso-login")
  67. assert url_to_external_logging == "/sso/client/"
  68. response = client.get(url_to_external_logging)
  69. assert response.status_code == 404
  70. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  71. def test_sso_login_view_initiates_auth_flow(db, client):
  72. url_to_external_logging = reverse("simple-sso-login")
  73. assert url_to_external_logging == "/sso/client/"
  74. with ConnectionMock():
  75. response = client.get(url_to_external_logging)
  76. assert response.status_code == 302
  77. url_parsed = urlparse(response.url)
  78. assert url_parsed.path == "/server/authorize/"
  79. assert url_parsed.query == (
  80. "token=XcHtuemqcjnIT6J2WHTFswLQP0W07nI96XfxqGkm6b1zFToF0YGEoIYu37QOajkc"
  81. )
  82. @override_dynamic_settings(enable_sso=False)
  83. def test_sso_auth_view_returns_404_if_sso_is_disabled(db, client):
  84. url_to_authenticate = reverse("simple-sso-authenticate")
  85. assert url_to_authenticate == "/sso/client/authenticate/"
  86. response = client.get(url_to_authenticate)
  87. assert response.status_code == 404
  88. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  89. def test_sso_auth_view_creates_new_user(db, client):
  90. url_to_authenticate = reverse("simple-sso-authenticate")
  91. assert url_to_authenticate == "/sso/client/authenticate/"
  92. query = (
  93. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  94. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  95. )
  96. url_to_authenticate += "?" + query
  97. with ConnectionMock():
  98. with TimestampSignerMock():
  99. response = client.get(url_to_authenticate)
  100. assert response.status_code == 302
  101. assert response.url == "/"
  102. user = User.objects.first()
  103. assert user.username == "jkowalski"
  104. @override_dynamic_settings(**TEST_SSO_SETTINGS)
  105. def test_sso_auth_view_authenticates_existing_user(user, client):
  106. user.set_email(SSO_USER_EMAIL)
  107. user.save()
  108. url_to_authenticate = reverse("simple-sso-authenticate")
  109. assert url_to_authenticate == "/sso/client/authenticate/"
  110. query = (
  111. "next=%2F&access_token=InBBMjllMlNla2ZWdDdJMnR0c3R3QWIxcjQwRzV6TmphZDRSaEprbjlMbnR0TnF"
  112. "Ka3Q2d1dNR1lVYkhzVThvZU0i.XTeRVQ.3XiIMg0AFcJKDFCekse6s43uNLI"
  113. )
  114. url_to_authenticate += "?" + query
  115. with ConnectionMock():
  116. with TimestampSignerMock():
  117. response = client.get(url_to_authenticate)
  118. assert response.status_code == 302
  119. assert response.url == "/"
  120. assert User.objects.count() == 1