ban.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import re
  2. from django.conf import settings
  3. from django.db import IntegrityError, models
  4. from django.utils import timezone
  5. from django.utils.translation import ugettext_lazy as _
  6. from misago.core import cachebuster
  7. __all__ = [
  8. 'BAN_USERNAME', 'BAN_EMAIL', 'BAN_IP', 'BANS_CHOICES',
  9. 'Ban', 'BanCache'
  10. ]
  11. BAN_CACHEBUSTER = 'misago_bans'
  12. BAN_USERNAME = 0
  13. BAN_EMAIL = 1
  14. BAN_IP = 2
  15. BANS_CHOICES = (
  16. (BAN_USERNAME, _('Username')),
  17. (BAN_EMAIL, _('E-mail address')),
  18. (BAN_IP, _('IP address')),
  19. )
  20. class BansManager(models.Manager):
  21. def get_ip_ban(self, ip):
  22. return self.get_ban(ip=ip)
  23. def get_username_ban(self, username):
  24. return self.get_ban(username=username)
  25. def get_email_ban(self, email):
  26. return self.get_ban(email=email)
  27. def invalidate_cache(self):
  28. cachebuster.invalidate(BAN_CACHEBUSTER)
  29. def get_ban(self, username=None, email=None, ip=None):
  30. checks = []
  31. if username:
  32. username = username.lower()
  33. checks.append(BAN_USERNAME)
  34. if email:
  35. email = email.lower()
  36. checks.append(BAN_EMAIL)
  37. if ip:
  38. checks.append(BAN_IP)
  39. queryset = self.filter(is_checked=True)
  40. if len(checks) == 1:
  41. queryset = queryset.filter(check_type=checks[0])
  42. elif checks:
  43. queryset = queryset.filter(check_type__in=checks)
  44. for ban in queryset.order_by('-id').iterator():
  45. if ban.is_expired:
  46. continue
  47. elif (ban.check_type == BAN_USERNAME and username and
  48. ban.check_value(username)):
  49. return ban
  50. elif (ban.check_type == BAN_EMAIL and email and
  51. ban.check_value(email)):
  52. return ban
  53. elif ban.check_type == BAN_IP and ip and ban.check_value(ip):
  54. return ban
  55. else:
  56. raise Ban.DoesNotExist('specified values are not banned')
  57. class Ban(models.Model):
  58. check_type = models.PositiveIntegerField(default=BAN_USERNAME, db_index=True)
  59. banned_value = models.CharField(max_length=255, db_index=True)
  60. user_message = models.TextField(null=True, blank=True)
  61. staff_message = models.TextField(null=True, blank=True)
  62. expires_on = models.DateTimeField(null=True, blank=True, db_index=True)
  63. is_checked = models.BooleanField(default=True, db_index=True)
  64. objects = BansManager()
  65. def save(self, *args, **kwargs):
  66. self.banned_value = self.banned_value.lower()
  67. self.is_checked = not self.is_expired
  68. return super(Ban, self).save(*args, **kwargs)
  69. def get_serialized_message(self):
  70. from ..serializers import BanMessageSerializer
  71. return BanMessageSerializer(self).data
  72. @property
  73. def check_name(self):
  74. return BANS_CHOICES[self.check_type][1]
  75. @property
  76. def name(self):
  77. return self.banned_value
  78. @property
  79. def is_expired(self):
  80. if self.expires_on:
  81. return self.expires_on < timezone.now()
  82. else:
  83. return False
  84. def check_value(self, value):
  85. if '*' in self.banned_value:
  86. regex = re.escape(self.banned_value).replace('\*', '(.*?)')
  87. return re.search('^%s$' % regex, value) is not None
  88. else:
  89. return self.banned_value == value
  90. def lift(self):
  91. self.expires_on = timezone.now()
  92. class BanCache(models.Model):
  93. user = models.OneToOneField(settings.AUTH_USER_MODEL, primary_key=True, related_name='ban_cache')
  94. ban = models.ForeignKey(Ban, null=True, blank=True, on_delete=models.SET_NULL)
  95. bans_version = models.PositiveIntegerField(default=0)
  96. user_message = models.TextField(null=True, blank=True)
  97. staff_message = models.TextField(null=True, blank=True)
  98. expires_on = models.DateTimeField(null=True, blank=True)
  99. def save(self, *args, **kwargs):
  100. try:
  101. super(BanCache, self).save(*args, **kwargs)
  102. except IntegrityError:
  103. pass # first come is first serve with ban cache
  104. def get_serialized_message(self):
  105. from ..serializers import BanMessageSerializer
  106. temp_ban = Ban(
  107. id=1,
  108. check_type=BAN_USERNAME,
  109. user_message=self.user_message,
  110. staff_message=self.staff_message,
  111. expires_on=self.expires_on
  112. )
  113. return BanMessageSerializer(temp_ban).data
  114. @property
  115. def is_banned(self):
  116. return bool(self.ban)
  117. @property
  118. def is_valid(self):
  119. version_is_valid = cachebuster.is_valid(BAN_CACHEBUSTER,
  120. self.bans_version)
  121. expired = self.expires_on and self.expires_on < timezone.now()
  122. return version_is_valid and not expired