ban.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. import re
  2. from django.conf import settings
  3. from django.db import IntegrityError, models
  4. from django.utils import timezone
  5. from django.utils.translation import gettext_lazy as _
  6. from misago.cache.versions import invalidate_cache
  7. from misago.users import BANS_CACHE
  8. class BansManager(models.Manager):
  9. def get_ip_ban(self, ip, registration_only=False):
  10. return self.get_ban(ip=ip, registration_only=registration_only)
  11. def get_username_ban(self, username, registration_only=False):
  12. return self.get_ban(username=username, registration_only=registration_only)
  13. def get_email_ban(self, email, registration_only=False):
  14. return self.get_ban(email=email, registration_only=registration_only)
  15. def invalidate_cache(self):
  16. invalidate_cache(BANS_CACHE)
  17. def get_ban(self, username=None, email=None, ip=None, registration_only=False):
  18. checks = []
  19. if username:
  20. username = username.lower()
  21. checks.append(self.model.USERNAME)
  22. if email:
  23. email = email.lower()
  24. checks.append(self.model.EMAIL)
  25. if ip:
  26. checks.append(self.model.IP)
  27. queryset = self.filter(is_checked=True)
  28. if not registration_only:
  29. queryset = self.filter(registration_only=False)
  30. if len(checks) == 1:
  31. queryset = queryset.filter(check_type=checks[0])
  32. elif checks:
  33. queryset = queryset.filter(check_type__in=checks)
  34. for ban in queryset.order_by("-id").iterator():
  35. if ban.is_expired:
  36. continue
  37. elif (
  38. ban.check_type == self.model.USERNAME
  39. and username
  40. and ban.check_value(username)
  41. ):
  42. return ban
  43. elif (
  44. ban.check_type == self.model.EMAIL and email and ban.check_value(email)
  45. ):
  46. return ban
  47. elif ban.check_type == self.model.IP and ip and ban.check_value(ip):
  48. return ban
  49. else:
  50. raise Ban.DoesNotExist("specified values are not banned")
  51. class Ban(models.Model):
  52. USERNAME = 0
  53. EMAIL = 1
  54. IP = 2
  55. CHOICES = [
  56. (USERNAME, _("Username")),
  57. (EMAIL, _("E-mail address")),
  58. (IP, _("IP address")),
  59. ]
  60. check_type = models.PositiveIntegerField(
  61. default=USERNAME, choices=CHOICES, db_index=True
  62. )
  63. registration_only = models.BooleanField(default=False, db_index=True)
  64. banned_value = models.CharField(max_length=255, db_index=True)
  65. user_message = models.TextField(null=True, blank=True)
  66. staff_message = models.TextField(null=True, blank=True)
  67. expires_on = models.DateTimeField(null=True, blank=True, db_index=True)
  68. is_checked = models.BooleanField(default=True, db_index=True)
  69. objects = BansManager()
  70. def save(self, *args, **kwargs):
  71. self.banned_value = self.banned_value.lower()
  72. self.is_checked = not self.is_expired
  73. return super().save(*args, **kwargs)
  74. def get_serialized_message(self):
  75. from misago.users.serializers import BanMessageSerializer
  76. return BanMessageSerializer(self).data
  77. @property
  78. def name(self):
  79. return self.banned_value
  80. @property
  81. def is_expired(self):
  82. if self.expires_on:
  83. return self.expires_on < timezone.now()
  84. else:
  85. return False
  86. def check_value(self, value):
  87. if "*" in self.banned_value:
  88. regex = re.escape(self.banned_value).replace("\*", "(.*?)")
  89. return re.search("^%s$" % regex, value, re.IGNORECASE) is not None
  90. else:
  91. return self.banned_value.lower() == value.lower()
  92. def lift(self):
  93. self.expires_on = timezone.now()
  94. class BanCache(models.Model):
  95. user = models.OneToOneField(
  96. settings.AUTH_USER_MODEL,
  97. primary_key=True,
  98. related_name="ban_cache",
  99. on_delete=models.CASCADE,
  100. )
  101. ban = models.ForeignKey(Ban, null=True, blank=True, on_delete=models.SET_NULL)
  102. cache_version = models.CharField(max_length=8)
  103. user_message = models.TextField(null=True, blank=True)
  104. staff_message = models.TextField(null=True, blank=True)
  105. expires_on = models.DateTimeField(null=True, blank=True)
  106. def save(self, *args, **kwargs):
  107. try:
  108. super().save(*args, **kwargs)
  109. except IntegrityError:
  110. pass # first come is first serve with ban cache
  111. def get_serialized_message(self):
  112. from misago.users.serializers import BanMessageSerializer
  113. temp_ban = Ban(
  114. id=1,
  115. check_type=Ban.USERNAME,
  116. user_message=self.user_message,
  117. staff_message=self.staff_message,
  118. expires_on=self.expires_on,
  119. )
  120. return BanMessageSerializer(temp_ban).data
  121. @property
  122. def is_banned(self):
  123. return bool(self.ban)
  124. def is_valid(self, cache_versions):
  125. is_versioned = self.cache_version == cache_versions[BANS_CACHE]
  126. is_expired = self.expires_on and self.expires_on < timezone.now()
  127. return is_versioned and not is_expired