validation.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. from dataclasses import dataclass
  2. from django.contrib.auth import get_user_model
  3. from django.core.validators import validate_email
  4. from django.forms import ValidationError
  5. from django.utils.crypto import get_random_string
  6. from unidecode import unidecode
  7. from ..hooks import oauth2_user_data_filters
  8. from ..users.validators import (
  9. validate_new_registration,
  10. validate_username_content,
  11. validate_username_length,
  12. )
  13. from .exceptions import OAuth2UserDataValidationError
  14. User = get_user_model()
  15. class UsernameSettings:
  16. username_length_max: int = 200
  17. username_length_min: int = 1
  18. def validate_user_data(request, user, user_data):
  19. filtered_data = filter_user_data(request, user, user_data)
  20. try:
  21. validate_username_content(filtered_data["name"])
  22. validate_username_length(UsernameSettings, filtered_data["name"])
  23. validate_email(filtered_data["email"])
  24. errors_list = []
  25. def add_error(_field_unused: str | None, error: str | ValidationError):
  26. if isinstance(error, ValidationError):
  27. error = error.message
  28. errors_list.append(str(error))
  29. validate_new_registration(request, filtered_data, add_error)
  30. except ValidationError as exc:
  31. raise OAuth2UserDataValidationError(error_list=[str(exc.message)])
  32. if errors_list:
  33. raise OAuth2UserDataValidationError(error_list=errors_list)
  34. return filtered_data
  35. def filter_user_data(request, user, user_data):
  36. filtered_data = {
  37. "id": user_data["id"],
  38. "name": str(user_data["name"] or "").strip(),
  39. "email": str(user_data["email"] or "").strip(),
  40. "avatar": filter_user_avatar(user_data["avatar"]),
  41. }
  42. if oauth2_user_data_filters:
  43. return filter_user_data_with_filters(
  44. request, user, filtered_data, oauth2_user_data_filters
  45. )
  46. else:
  47. filtered_data["name"] = filter_name(user, filtered_data["name"])
  48. return filtered_data
  49. def filter_user_avatar(user_avatar):
  50. if user_avatar:
  51. return str(user_avatar).strip() or None
  52. return None
  53. def filter_user_data_with_filters(request, user, user_data, filters):
  54. for filter_user_data in filters:
  55. user_data = filter_user_data(request, user, user_data.copy()) or user_data
  56. return user_data
  57. def filter_name(user, name):
  58. if user and user.username == name:
  59. return name
  60. clean_name = "".join(
  61. [c for c in unidecode(name.replace(" ", "_")) if c.isalnum() or c == "_"]
  62. )
  63. if user and user.username == clean_name:
  64. return clean_name # No change in name
  65. if not clean_name.replace("_", ""):
  66. clean_name = "User_%s" % get_random_string(4)
  67. clean_name_root = clean_name
  68. while True:
  69. try:
  70. db_user = User.objects.get_by_username(clean_name)
  71. except User.DoesNotExist:
  72. return clean_name
  73. if not user or user.pk != db_user.pk:
  74. clean_name = f"{clean_name_root}_{get_random_string(4)}"
  75. else:
  76. return clean_name