bans.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. """
  2. API for checking values for bans
  3. Calling this instead of Ban.objects.find_ban is preffered, if you don't want
  4. to use validate_X_banned validators
  5. """
  6. from datetime import timedelta
  7. from django.utils import timezone
  8. from django.utils.dateparse import parse_datetime
  9. from . import BANS_CACHE
  10. from .models import Ban, BanCache
  11. CACHE_SESSION_KEY = "misago_ip_check"
  12. def get_username_ban(username, registration_only=False):
  13. try:
  14. return Ban.objects.get_username_ban(username, registration_only)
  15. except Ban.DoesNotExist:
  16. return None
  17. def get_email_ban(email, registration_only=False):
  18. try:
  19. return Ban.objects.get_email_ban(email, registration_only)
  20. except Ban.DoesNotExist:
  21. return None
  22. def get_ip_ban(ip, registration_only=False):
  23. try:
  24. return Ban.objects.get_ip_ban(ip, registration_only)
  25. except Ban.DoesNotExist:
  26. return None
  27. def get_user_ban(user, cache_versions):
  28. """
  29. This function checks if user is banned
  30. When user model is available, this is preffered to calling
  31. get_email_ban(user.email) and get_username_ban(user.username)
  32. because it sets ban cache on user model
  33. """
  34. try:
  35. ban_cache = user.ban_cache
  36. if not ban_cache.is_valid(cache_versions):
  37. _set_user_ban_cache(user, cache_versions)
  38. except BanCache.DoesNotExist:
  39. user.ban_cache = BanCache(user=user)
  40. user.ban_cache = _set_user_ban_cache(user, cache_versions)
  41. if user.ban_cache.ban:
  42. return user.ban_cache
  43. def _set_user_ban_cache(user, cache_versions):
  44. ban_cache = user.ban_cache
  45. ban_cache.cache_version = cache_versions[BANS_CACHE]
  46. try:
  47. user_ban = Ban.objects.get_ban(
  48. username=user.username, email=user.email, registration_only=False
  49. )
  50. ban_cache.ban = user_ban
  51. ban_cache.expires_on = user_ban.expires_on
  52. ban_cache.user_message = user_ban.user_message
  53. ban_cache.staff_message = user_ban.staff_message
  54. except Ban.DoesNotExist:
  55. ban_cache.ban = None
  56. ban_cache.expires_on = None
  57. ban_cache.user_message = None
  58. ban_cache.staff_message = None
  59. ban_cache.save()
  60. return ban_cache
  61. def get_request_ip_ban(request):
  62. """
  63. Utility for checking if request came from banned IP
  64. This check may be performed frequently, which is why there is extra
  65. boilerplate that caches ban check result in session
  66. """
  67. session_ban_cache = _get_session_bancache(request)
  68. if session_ban_cache:
  69. if session_ban_cache["is_banned"]:
  70. return session_ban_cache
  71. return False
  72. found_ban = get_ip_ban(request.user_ip)
  73. ban_cache = request.session[CACHE_SESSION_KEY] = {
  74. "version": request.cache_versions[BANS_CACHE],
  75. "ip": request.user_ip,
  76. }
  77. if found_ban:
  78. if found_ban.expires_on:
  79. ban_cache["expires_on"] = found_ban.expires_on.isoformat()
  80. else:
  81. ban_cache["expires_on"] = None
  82. ban_cache.update({"is_banned": True, "message": found_ban.user_message})
  83. request.session[CACHE_SESSION_KEY] = ban_cache
  84. return _hydrate_session_cache(request.session[CACHE_SESSION_KEY])
  85. ban_cache["is_banned"] = False
  86. request.session[CACHE_SESSION_KEY] = ban_cache
  87. def _get_session_bancache(request):
  88. try:
  89. ban_cache = request.session[CACHE_SESSION_KEY]
  90. ban_cache = _hydrate_session_cache(ban_cache)
  91. if ban_cache["ip"] != request.user_ip:
  92. return None
  93. if ban_cache["version"] != request.cache_versions[BANS_CACHE]:
  94. return None
  95. if ban_cache.get("expires_on"):
  96. if ban_cache["expires_on"] < timezone.today():
  97. return None
  98. return ban_cache
  99. except KeyError:
  100. return None
  101. def _hydrate_session_cache(ban_cache):
  102. hydrated = ban_cache.copy()
  103. if hydrated.get("expires_on"):
  104. hydrated["expires_on"] = parse_datetime(hydrated["expires_on"])
  105. return hydrated
  106. # Utilities for front-end based bans
  107. def ban_user(user, user_message=None, staff_message=None, length=None, expires_on=None):
  108. if not expires_on and length:
  109. expires_on = timezone.now() + timedelta(**length)
  110. ban = Ban.objects.create(
  111. banned_value=user.username.lower(),
  112. user_message=user_message,
  113. staff_message=staff_message,
  114. expires_on=expires_on,
  115. )
  116. Ban.objects.invalidate_cache()
  117. return ban
  118. def ban_ip(ip, user_message=None, staff_message=None, length=None, expires_on=None):
  119. if not expires_on and length:
  120. expires_on = timezone.now() + timedelta(**length)
  121. ban = Ban.objects.create(
  122. check_type=Ban.IP,
  123. banned_value=ip,
  124. user_message=user_message,
  125. staff_message=staff_message,
  126. expires_on=expires_on,
  127. )
  128. Ban.objects.invalidate_cache()
  129. return ban