ban.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import re
  2. from django.conf import settings
  3. from django.db import IntegrityError, models
  4. from django.utils import timezone
  5. from django.utils.translation import ugettext_lazy as _
  6. from misago.core import cachebuster
  7. from misago.users.constants import BANS_CACHEBUSTER
  8. class BansManager(models.Manager):
  9. def get_ip_ban(self, ip, registration_only=False):
  10. return self.get_ban(
  11. ip=ip,
  12. registration_only=registration_only,
  13. )
  14. def get_username_ban(self, username, registration_only=False):
  15. return self.get_ban(
  16. username=username,
  17. registration_only=registration_only,
  18. )
  19. def get_email_ban(self, email, registration_only=False):
  20. return self.get_ban(
  21. email=email,
  22. registration_only=registration_only,
  23. )
  24. def invalidate_cache(self):
  25. cachebuster.invalidate(BANS_CACHEBUSTER)
  26. def get_ban(self, username=None, email=None, ip=None, registration_only=False):
  27. checks = []
  28. if username:
  29. username = username.lower()
  30. checks.append(self.model.USERNAME)
  31. if email:
  32. email = email.lower()
  33. checks.append(self.model.EMAIL)
  34. if ip:
  35. checks.append(self.model.IP)
  36. queryset = self.filter(
  37. is_checked=True,
  38. registration_only=registration_only,
  39. )
  40. if len(checks) == 1:
  41. queryset = queryset.filter(check_type=checks[0])
  42. elif checks:
  43. queryset = queryset.filter(check_type__in=checks)
  44. for ban in queryset.order_by('-id').iterator():
  45. if ban.is_expired:
  46. continue
  47. elif (ban.check_type == self.model.USERNAME and username and ban.check_value(username)):
  48. return ban
  49. elif (ban.check_type == self.model.EMAIL and email and ban.check_value(email)):
  50. return ban
  51. elif ban.check_type == self.model.IP and ip and ban.check_value(ip):
  52. return ban
  53. else:
  54. raise Ban.DoesNotExist('specified values are not banned')
  55. class Ban(models.Model):
  56. USERNAME = 0
  57. EMAIL = 1
  58. IP = 2
  59. CHOICES = [
  60. (USERNAME, _('Username')),
  61. (EMAIL, _('E-mail address')),
  62. (IP, _('IP address')),
  63. ]
  64. check_type = models.PositiveIntegerField(default=USERNAME, choices=CHOICES, db_index=True)
  65. registration_only = models.BooleanField(default=False, db_index=True)
  66. banned_value = models.CharField(max_length=255, db_index=True)
  67. user_message = models.TextField(null=True, blank=True)
  68. staff_message = models.TextField(null=True, blank=True)
  69. expires_on = models.DateTimeField(null=True, blank=True, db_index=True)
  70. is_checked = models.BooleanField(default=True, db_index=True)
  71. objects = BansManager()
  72. def save(self, *args, **kwargs):
  73. self.banned_value = self.banned_value.lower()
  74. self.is_checked = not self.is_expired
  75. return super(Ban, self).save(*args, **kwargs)
  76. def get_serialized_message(self):
  77. from misago.users.serializers import BanMessageSerializer
  78. return BanMessageSerializer(self).data
  79. @property
  80. def name(self):
  81. return self.banned_value
  82. @property
  83. def is_expired(self):
  84. if self.expires_on:
  85. return self.expires_on < timezone.now()
  86. else:
  87. return False
  88. def check_value(self, value):
  89. if '*' in self.banned_value:
  90. regex = re.escape(self.banned_value).replace('\*', '(.*?)')
  91. return re.search('^%s$' % regex, value) is not None
  92. else:
  93. return self.banned_value == value
  94. def lift(self):
  95. self.expires_on = timezone.now()
  96. class BanCache(models.Model):
  97. user = models.OneToOneField(
  98. settings.AUTH_USER_MODEL,
  99. primary_key=True,
  100. related_name='ban_cache',
  101. on_delete=models.CASCADE,
  102. )
  103. ban = models.ForeignKey(
  104. Ban,
  105. null=True,
  106. blank=True,
  107. on_delete=models.SET_NULL,
  108. )
  109. bans_version = models.PositiveIntegerField(default=0)
  110. user_message = models.TextField(null=True, blank=True)
  111. staff_message = models.TextField(null=True, blank=True)
  112. expires_on = models.DateTimeField(null=True, blank=True)
  113. def save(self, *args, **kwargs):
  114. try:
  115. super(BanCache, self).save(*args, **kwargs)
  116. except IntegrityError:
  117. pass # first come is first serve with ban cache
  118. def get_serialized_message(self):
  119. from misago.users.serializers import BanMessageSerializer
  120. temp_ban = Ban(
  121. id=1,
  122. check_type=Ban.USERNAME,
  123. user_message=self.user_message,
  124. staff_message=self.staff_message,
  125. expires_on=self.expires_on,
  126. )
  127. return BanMessageSerializer(temp_ban).data
  128. @property
  129. def is_banned(self):
  130. return bool(self.ban)
  131. @property
  132. def is_valid(self):
  133. version_is_valid = cachebuster.is_valid(BANS_CACHEBUSTER, self.bans_version)
  134. expired = self.expires_on and self.expires_on < timezone.now()
  135. return version_is_valid and not expired