# ban.py (4.6 KB)
  1. import re
  2. from django.conf import settings
  3. from django.db import IntegrityError, models
  4. from django.utils import timezone
  5. from django.utils.translation import ugettext_lazy as _
  6. from misago.core import cachebuster
  7. from misago.users.constants import BANS_CACHEBUSTER
  8. class BansManager(models.Manager):
  9. def get_ip_ban(self, ip):
  10. return self.get_ban(ip=ip)
  11. def get_username_ban(self, username):
  12. return self.get_ban(username=username)
  13. def get_email_ban(self, email):
  14. return self.get_ban(email=email)
  15. def invalidate_cache(self):
  16. cachebuster.invalidate(BANS_CACHEBUSTER)
  17. def get_ban(self, username=None, email=None, ip=None):
  18. checks = []
  19. if username:
  20. username = username.lower()
  21. checks.append(self.model.USERNAME)
  22. if email:
  23. email = email.lower()
  24. checks.append(self.model.EMAIL)
  25. if ip:
  26. checks.append(self.model.IP)
  27. queryset = self.filter(is_checked=True)
  28. if len(checks) == 1:
  29. queryset = queryset.filter(check_type=checks[0])
  30. elif checks:
  31. queryset = queryset.filter(check_type__in=checks)
  32. for ban in queryset.order_by('-id').iterator():
  33. if ban.is_expired:
  34. continue
  35. elif (ban.check_type == self.model.USERNAME and username and ban.check_value(username)):
  36. return ban
  37. elif (ban.check_type == self.model.EMAIL and email and ban.check_value(email)):
  38. return ban
  39. elif ban.check_type == self.model.IP and ip and ban.check_value(ip):
  40. return ban
  41. else:
  42. raise Ban.DoesNotExist('specified values are not banned')
  43. class Ban(models.Model):
  44. USERNAME = 0
  45. EMAIL = 1
  46. IP = 2
  47. CHOICES = [
  48. (USERNAME, _('Username')),
  49. (EMAIL, _('E-mail address')),
  50. (IP, _('IP address')),
  51. ]
  52. check_type = models.PositiveIntegerField(default=USERNAME, db_index=True)
  53. banned_value = models.CharField(max_length=255, db_index=True)
  54. user_message = models.TextField(null=True, blank=True)
  55. staff_message = models.TextField(null=True, blank=True)
  56. expires_on = models.DateTimeField(null=True, blank=True, db_index=True)
  57. is_checked = models.BooleanField(default=True, db_index=True)
  58. objects = BansManager()
  59. def save(self, *args, **kwargs):
  60. self.banned_value = self.banned_value.lower()
  61. self.is_checked = not self.is_expired
  62. return super(Ban, self).save(*args, **kwargs)
  63. def get_serialized_message(self):
  64. from misago.users.serializers import BanMessageSerializer
  65. return BanMessageSerializer(self).data
  66. @property
  67. def check_name(self):
  68. return self.CHOICES[self.check_type][1]
  69. @property
  70. def name(self):
  71. return self.banned_value
  72. @property
  73. def is_expired(self):
  74. if self.expires_on:
  75. return self.expires_on < timezone.now()
  76. else:
  77. return False
  78. def check_value(self, value):
  79. if '*' in self.banned_value:
  80. regex = re.escape(self.banned_value).replace('\*', '(.*?)')
  81. return re.search('^%s$' % regex, value) is not None
  82. else:
  83. return self.banned_value == value
  84. def lift(self):
  85. self.expires_on = timezone.now()
  86. class BanCache(models.Model):
  87. user = models.OneToOneField(
  88. settings.AUTH_USER_MODEL,
  89. primary_key=True,
  90. related_name='ban_cache',
  91. )
  92. ban = models.ForeignKey(Ban, null=True, blank=True, on_delete=models.SET_NULL)
  93. bans_version = models.PositiveIntegerField(default=0)
  94. user_message = models.TextField(null=True, blank=True)
  95. staff_message = models.TextField(null=True, blank=True)
  96. expires_on = models.DateTimeField(null=True, blank=True)
  97. def save(self, *args, **kwargs):
  98. try:
  99. super(BanCache, self).save(*args, **kwargs)
  100. except IntegrityError:
  101. pass # first come is first serve with ban cache
  102. def get_serialized_message(self):
  103. from misago.users.serializers import BanMessageSerializer
  104. temp_ban = Ban(
  105. id=1,
  106. check_type=Ban.USERNAME,
  107. user_message=self.user_message,
  108. staff_message=self.staff_message,
  109. expires_on=self.expires_on,
  110. )
  111. return BanMessageSerializer(temp_ban).data
  112. @property
  113. def is_banned(self):
  114. return bool(self.ban)
  115. @property
  116. def is_valid(self):
  117. version_is_valid = cachebuster.is_valid(BANS_CACHEBUSTER, self.bans_version)
  118. expired = self.expires_on and self.expires_on < timezone.now()
  119. return version_is_valid and not expired